From a9db7991887614ef7188984b44ecd639f5c8e558 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Mon, 26 Feb 2018 17:39:18 -0800 Subject: [PATCH] solver: add support for releasing cached results Signed-off-by: Tonis Tiigi --- solver-next/boltdbcachestorage/storage.go | 2 +- solver-next/cache.go | 39 ++++- solver-next/cache_test.go | 81 ++++++++++ solver-next/cachestorage.go | 2 + solver-next/cachestorage_testsuite.go | 179 +++++++++++++++++++++- solver-next/edge.go | 13 +- solver-next/memorycachestorage.go | 109 +++++++++++-- solver-next/types.go | 2 +- 8 files changed, 407 insertions(+), 20 deletions(-) diff --git a/solver-next/boltdbcachestorage/storage.go b/solver-next/boltdbcachestorage/storage.go index a25b38fa..40664a7d 100644 --- a/solver-next/boltdbcachestorage/storage.go +++ b/solver-next/boltdbcachestorage/storage.go @@ -135,7 +135,7 @@ func (s *Store) AddResult(id string, res solver.CacheResult) error { }) } -func (s *Store) Release(resultID string) error { +func (s *Store) Release(id, resultID string) error { return errors.Errorf("not-implemented") } diff --git a/solver-next/cache.go b/solver-next/cache.go index 92380528..8d526ab7 100644 --- a/solver-next/cache.go +++ b/solver-next/cache.go @@ -21,11 +21,26 @@ var internalMemoryKey = internalMemoryKeyT("buildkit/memory-cache-id") var NoSelector = digest.FromBytes(nil) func NewInMemoryCacheManager() CacheManager { - return &inMemoryCacheManager{ - id: identity.NewID(), - backend: NewInMemoryCacheStorage(), - results: NewInMemoryResultStorage(), + return NewCacheManager(identity.NewID(), NewInMemoryCacheStorage(), NewInMemoryResultStorage()) +} + +func NewCacheManager(id string, storage CacheKeyStorage, results CacheResultStorage) CacheManager { + cm := &inMemoryCacheManager{ + id: id, + backend: storage, + results: results, } + + storage.Walk(func(id string) error { + return storage.WalkResults(id, func(cr CacheResult) error { + if !results.Exists(cr.ID) { + storage.Release(cr.ID) + } + return nil + }) + }) + + return cm } type inMemoryCacheKey struct { @@ -180,7 +195,7 @@ func (c *inMemoryCacheManager) getBestResult(cki CacheKeyInfo) (*CacheResult, er } sort.Slice(results, func(i, j int) bool { - return results[i].CreatedAt.Before(results[j].CreatedAt) + return results[i].CreatedAt.After(results[j].CreatedAt) }) if len(results) > 0 { @@ -243,16 +258,19 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs refs[ck.CacheKeyInfo.ID] = struct{}{} } + keys := make([]*inMemoryCacheKey, 0) outs := make([]*CacheRecord, 0, len(refs)) for id := range refs { cki, err := c.backend.Get(id) if err == nil { k := c.toInMemoryCacheKey(cki) + keys = append(keys, k) if err := c.backend.WalkResults(id, func(r CacheResult) error { outs = append(outs, &CacheRecord{ ID: id + "@" + r.ID, CacheKey: withExporter(k, &r), CacheManager: c, + Loadable: true, CreatedAt: r.CreatedAt, }) return nil @@ -262,6 +280,17 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs } } + if len(outs) == 0 { + for _, k := range keys { + outs = append(outs, &CacheRecord{ + ID: k.CacheKeyInfo.ID, + CacheKey: withExporter(k, nil), + CacheManager: c, + Loadable: false, + }) + } + } + return outs, nil } diff --git a/solver-next/cache_test.go b/solver-next/cache_test.go index 4942861a..2c27e9e7 100644 --- a/solver-next/cache_test.go +++ b/solver-next/cache_test.go @@ -194,6 +194,87 @@ func TestInMemoryCacheSelectorNested(t *testing.T) { require.Equal(t, len(matches), 1) } +func TestInMemoryCacheReleaseParent(t *testing.T) { + storage := NewInMemoryCacheStorage() + results := NewInMemoryResultStorage() + m := NewCacheManager(identity.NewID(), storage, results) + + res0 := testResult("result0") + cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0) + require.NoError(t, err) + + res1 := testResult("result1") + _, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{ + {CacheKey: cacheFoo}, + }), res1) + require.NoError(t, err) + + matches, err := m.Query(nil, 0, dgst("foo"), 0, "") + require.NoError(t, err) + require.Equal(t, len(matches), 1) + + require.True(t, matches[0].Loadable) + + err = storage.Release(res0.ID()) + require.NoError(t, err) + + // foo becomes unloadable + matches, err = m.Query(nil, 0, dgst("foo"), 0, "") + require.NoError(t, err) + require.Equal(t, len(matches), 1) + + require.False(t, matches[0].Loadable) + + matches, err = m.Query([]ExportableCacheKey{matches[0].CacheKey}, 0, dgst("bar"), 0, "") + require.NoError(t, err) + require.Equal(t, len(matches), 1) + + require.True(t, matches[0].Loadable) + + // releasing bar releases both foo and bar + err = storage.Release(res1.ID()) + require.NoError(t, err) + + matches, err = m.Query(nil, 0, dgst("foo"), 0, "") + require.NoError(t, err) + require.Equal(t, len(matches), 0) +} + +// TestInMemoryCacheRestoreOfflineDeletion deletes a result while the +// cachemanager is not running and checks that it syncs up on restore +func TestInMemoryCacheRestoreOfflineDeletion(t *testing.T) { + storage := NewInMemoryCacheStorage() + results := NewInMemoryResultStorage() + m := NewCacheManager(identity.NewID(), storage, results) + + res0 := testResult("result0") + cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0) + require.NoError(t, err) + + res1 := testResult("result1") + _, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{ + {CacheKey: cacheFoo}, + }), res1) + require.NoError(t, err) + + results2 := NewInMemoryResultStorage() + _, err = results2.Save(res1) // only add bar + require.NoError(t, err) + + m = NewCacheManager(identity.NewID(), storage, results2) + + matches, err := m.Query(nil, 0, dgst("foo"), 0, "") + require.NoError(t, err) + require.Equal(t, len(matches), 1) + require.False(t, matches[0].Loadable) + + matches, err = m.Query([]ExportableCacheKey{matches[0].CacheKey}, 0, dgst("bar"), 0, "") + require.NoError(t, err) + require.Equal(t, len(matches), 1) + + require.True(t, matches[0].Loadable) +} + func dgst(s string) digest.Digest { return digest.FromBytes([]byte(s)) } diff --git a/solver-next/cachestorage.go b/solver-next/cachestorage.go index 0ade7017..7f56963a 100644 --- a/solver-next/cachestorage.go +++ b/solver-next/cachestorage.go @@ -13,6 +13,7 @@ var ErrNotFound = errors.Errorf("not found") // CacheKeyStorage is interface for persisting cache metadata type CacheKeyStorage interface { Get(id string) (CacheKeyInfo, error) + Walk(fn func(id string) error) error Set(info CacheKeyInfo) error WalkResults(id string, fn func(CacheResult) error) error @@ -59,4 +60,5 @@ type CacheResultStorage interface { Save(Result) (CacheResult, error) Load(ctx context.Context, res CacheResult) (Result, error) LoadRemote(ctx context.Context, res CacheResult) (*Remote, error) + Exists(id string) bool } diff --git a/solver-next/cachestorage_testsuite.go b/solver-next/cachestorage_testsuite.go index 82bedd39..7ef10cb3 100644 --- a/solver-next/cachestorage_testsuite.go +++ b/solver-next/cachestorage_testsuite.go @@ -17,6 +17,8 @@ func RunCacheStorageTests(t *testing.T, st func() (CacheKeyStorage, func())) { testGetSet, testResults, testLinks, + testResultReleaseSingleLevel, + testResultReleaseMultiLevel, } { runStorageTest(t, tc, st) } @@ -144,13 +146,18 @@ func testLinks(t *testing.T, st CacheKeyStorage) { err = st.Set(cki2) require.NoError(t, err) + require.NoError(t, st.Set(CacheKeyInfo{ID: "target0"})) + require.NoError(t, st.Set(CacheKeyInfo{ID: "target0-second"})) + require.NoError(t, st.Set(CacheKeyInfo{ID: "target1"})) + require.NoError(t, st.Set(CacheKeyInfo{ID: "target1-second"})) + l0 := CacheInfoLink{ Input: 0, Output: 1, Digest: digest.FromBytes([]byte(">target0")), } err = st.AddLink(cki.ID, l0, "target0") require.NoError(t, err) - err = st.AddLink(cki2.ID, l0, "target0-bar") + err = st.AddLink(cki2.ID, l0, "target0-second") require.NoError(t, err) m := map[string]struct{}{} @@ -205,6 +212,176 @@ func testLinks(t *testing.T, st CacheKeyStorage) { require.True(t, ok) } +func testResultReleaseSingleLevel(t *testing.T, st CacheKeyStorage) { + t.Parallel() + cki := CacheKeyInfo{ + ID: "foo", + Base: digest.FromBytes([]byte("foo")), + } + err := st.Set(cki) + require.NoError(t, err) + + err = st.AddResult(cki.ID, CacheResult{ + ID: "foo0", + CreatedAt: time.Now(), + }) + require.NoError(t, err) + + err = st.AddResult(cki.ID, CacheResult{ + ID: "foo1", + CreatedAt: time.Now(), + }) + require.NoError(t, err) + + err = st.Release("foo0") + require.NoError(t, err) + + m := map[string]struct{}{} + st.WalkResults("foo", func(res CacheResult) error { + m[res.ID] = struct{}{} + return nil + }) + + require.Equal(t, len(m), 1) + _, ok := m["foo1"] + require.True(t, ok) + + err = st.Release("foo1") + require.NoError(t, err) + + m = map[string]struct{}{} + st.WalkResults("foo", func(res CacheResult) error { + m[res.ID] = struct{}{} + return nil + }) + + require.Equal(t, len(m), 0) + + _, err = st.Get("foo") + require.Error(t, err) + require.Error(t, errors.Cause(err), ErrNotFound) +} + +func testResultReleaseMultiLevel(t *testing.T, st CacheKeyStorage) { + t.Parallel() + cki := CacheKeyInfo{ + ID: "foo", + Base: digest.FromBytes([]byte("foo")), + } + err := st.Set(cki) + require.NoError(t, err) + + err = st.AddResult(cki.ID, CacheResult{ + ID: "foo-result", + CreatedAt: time.Now(), + }) + require.NoError(t, err) + + sub0 := CacheKeyInfo{ + ID: "sub0", + Base: digest.FromBytes([]byte("sub0")), + } + err = st.Set(sub0) + require.NoError(t, err) + + err = st.AddResult(sub0.ID, CacheResult{ + ID: "sub0-result", + CreatedAt: time.Now(), + }) + require.NoError(t, err) + + l0 := CacheInfoLink{ + Input: 0, Output: 1, Digest: digest.FromBytes([]byte("to-sub0")), + } + err = st.AddLink(cki.ID, l0, "sub0") + require.NoError(t, err) + + sub1 := CacheKeyInfo{ + ID: "sub1", + Base: digest.FromBytes([]byte("sub1")), + } + err = st.Set(sub1) + require.NoError(t, err) + + err = st.AddResult(sub1.ID, CacheResult{ + ID: "sub1-result", + CreatedAt: time.Now(), + }) + require.NoError(t, err) + + err = st.AddLink(cki.ID, l0, "sub1") + require.NoError(t, err) + + // delete one sub doesn't delete parent + + err = st.Release("sub0-result") + require.NoError(t, err) + + m := map[string]struct{}{} + err = st.WalkResults("foo", func(res CacheResult) error { + m[res.ID] = struct{}{} + return nil + }) + require.NoError(t, err) + + require.Equal(t, len(m), 1) + _, ok := m["foo-result"] + require.True(t, ok) + + _, err = st.Get("sub0") + require.Error(t, err) + require.Equal(t, errors.Cause(err), ErrNotFound) + + m = map[string]struct{}{} + err = st.WalkLinks("foo", l0, func(id string) error { + m[id] = struct{}{} + return nil + }) + require.NoError(t, err) + require.Equal(t, len(m), 1) + + _, ok = m["sub1"] + require.True(t, ok) + + // release foo removes the result but doesn't break the chain + + err = st.Release("foo-result") + require.NoError(t, err) + + _, err = st.Get("foo") + require.NoError(t, err) + + m = map[string]struct{}{} + err = st.WalkResults("foo", func(res CacheResult) error { + m[res.ID] = struct{}{} + return nil + }) + require.NoError(t, err) + + require.Equal(t, len(m), 0) + + m = map[string]struct{}{} + err = st.WalkLinks("foo", l0, func(id string) error { + m[id] = struct{}{} + return nil + }) + require.NoError(t, err) + require.Equal(t, len(m), 1) + + // release sub1 now releases foo as well + + err = st.Release("sub1-result") + require.NoError(t, err) + + _, err = st.Get("sub1") + require.Error(t, err) + require.Equal(t, errors.Cause(err), ErrNotFound) + + _, err = st.Get("foo") + require.Error(t, err) + require.Equal(t, errors.Cause(err), ErrNotFound) +} + func getFunctionName(i interface{}) string { fullname := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() dot := strings.LastIndex(fullname, ".") + 1 diff --git a/solver-next/edge.go b/solver-next/edge.go index 94bea196..556a5b6b 100644 --- a/solver-next/edge.go +++ b/solver-next/edge.go @@ -302,7 +302,9 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) { logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error } else { for _, r := range records { - e.cacheRecords[r.ID] = r + if r.Loadable { + e.cacheRecords[r.ID] = r + } } if len(records) > 0 { e.keys = append(e.keys, records[0].CacheKey) @@ -423,7 +425,9 @@ func (e *edge) recalcCurrentState() { for k, r := range newRecords { e.keys = append(e.keys, r.CacheKey) - e.cacheRecords[k] = r + if r.Loadable { + e.cacheRecords[k] = r + } } // detect lower/upper bound for current state @@ -622,7 +626,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory) // execIfPossible creates a request for getting the edge result if there is // enough state func (e *edge) execIfPossible(f *pipeFactory) { - if len(e.keys) > 0 { + if len(e.cacheRecords) > 0 { if e.keysDidChange { e.postpone(f) return @@ -652,6 +656,9 @@ func (e *edge) loadCache(ctx context.Context) (interface{}, error) { var rec *CacheRecord for _, r := range e.cacheRecords { + if !r.Loadable { + continue + } if rec == nil || rec.CreatedAt.Before(r.CreatedAt) || (rec.CreatedAt.Equal(r.CreatedAt) && rec.Priority < r.Priority) { rec = r } diff --git a/solver-next/memorycachestorage.go b/solver-next/memorycachestorage.go index 1a0f2814..42f82192 100644 --- a/solver-next/memorycachestorage.go +++ b/solver-next/memorycachestorage.go @@ -9,19 +9,24 @@ import ( ) func NewInMemoryCacheStorage() CacheKeyStorage { - return &inMemoryStore{byID: map[string]*inMemoryKey{}} + return &inMemoryStore{ + byID: map[string]*inMemoryKey{}, + byResult: map[string]map[string]struct{}{}, + } } type inMemoryStore struct { - mu sync.RWMutex - byID map[string]*inMemoryKey + mu sync.RWMutex + byID map[string]*inMemoryKey + byResult map[string]map[string]struct{} } type inMemoryKey struct { CacheKeyInfo - results map[string]CacheResult - links map[CacheInfoLink]map[string]struct{} + results map[string]CacheResult + links map[CacheInfoLink]map[string]struct{} + backlinks map[string]struct{} } func (s *inMemoryStore) Get(id string) (CacheKeyInfo, error) { @@ -40,8 +45,9 @@ func (s *inMemoryStore) Set(info CacheKeyInfo) error { k, ok := s.byID[info.ID] if !ok { k = &inMemoryKey{ - results: map[string]CacheResult{}, - links: map[CacheInfoLink]map[string]struct{}{}, + results: map[string]CacheResult{}, + links: map[CacheInfoLink]map[string]struct{}{}, + backlinks: map[string]struct{}{}, } s.byID[info.ID] = k } @@ -49,14 +55,37 @@ func (s *inMemoryStore) Set(info CacheKeyInfo) error { return nil } +func (s *inMemoryStore) Walk(fn func(string) error) error { + s.mu.RLock() + ids := make([]string, 0, len(s.byID)) + for id := range s.byID { + ids = append(ids, id) + } + s.mu.RUnlock() + + for _, id := range ids { + if err := fn(id); err != nil { + return err + } + } + return nil +} + func (s *inMemoryStore) WalkResults(id string, fn func(CacheResult) error) error { s.mu.RLock() - defer s.mu.RUnlock() + k, ok := s.byID[id] if !ok { + s.mu.RUnlock() return nil } + copy := make([]CacheResult, 0, len(k.results)) for _, res := range k.results { + copy = append(copy, res) + } + s.mu.RUnlock() + + for _, res := range copy { if err := fn(res); err != nil { return err } @@ -86,11 +115,63 @@ func (s *inMemoryStore) AddResult(id string, res CacheResult) error { return errors.Wrapf(ErrNotFound, "no such key %s", id) } k.results[res.ID] = res + m, ok := s.byResult[res.ID] + if !ok { + m = map[string]struct{}{} + s.byResult[res.ID] = m + } + m[id] = struct{}{} return nil } func (s *inMemoryStore) Release(resultID string) error { - return errors.Errorf("not-implemented") + s.mu.Lock() + defer s.mu.Unlock() + + ids, ok := s.byResult[resultID] + if !ok { + return nil + } + + for id := range ids { + k, ok := s.byID[id] + if !ok { + continue + } + + delete(k.results, resultID) + delete(s.byResult[resultID], id) + if len(s.byResult[resultID]) == 0 { + delete(s.byResult, resultID) + } + + if len(k.results) == 0 && len(k.links) == 0 { + s.emptyBranchWithParents(k) + } + } + + return nil +} + +func (s *inMemoryStore) emptyBranchWithParents(k *inMemoryKey) { + if len(k.results) != 0 && len(k.links) != 0 { + return + } + for id := range k.backlinks { + p, ok := s.byID[id] + if !ok { + continue + } + for l := range p.links { + delete(p.links[l], k.ID) + if len(p.links[l]) == 0 { + delete(p.links, l) + } + } + s.emptyBranchWithParents(p) + } + + delete(s.byID, k.ID) } func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) error { @@ -100,12 +181,17 @@ func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) er if !ok { return errors.Wrapf(ErrNotFound, "no such key %s", id) } + k2, ok := s.byID[target] + if !ok { + return errors.Wrapf(ErrNotFound, "no such key %s", target) + } m, ok := k.links[link] if !ok { m = map[string]struct{}{} k.links[link] = m } + k2.backlinks[id] = struct{}{} m[target] = struct{}{} return nil } @@ -149,3 +235,8 @@ func (s *inMemoryResultStore) Load(ctx context.Context, res CacheResult) (Result func (s *inMemoryResultStore) LoadRemote(ctx context.Context, res CacheResult) (*Remote, error) { return nil, nil } + +func (s *inMemoryResultStore) Exists(id string) bool { + _, ok := s.m.Load(id) + return ok +} diff --git a/solver-next/types.go b/solver-next/types.go index 348b35f8..642a473d 100644 --- a/solver-next/types.go +++ b/solver-next/types.go @@ -130,7 +130,7 @@ type CacheRecord struct { ID string CacheKey ExportableCacheKey CacheManager CacheManager - // Loadable bool + Loadable bool // Size int CreatedAt time.Time Priority int