solver: add support for releasing cached results

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2018-02-26 17:39:18 -08:00
parent 666a5e4119
commit a9db799188
8 changed files with 407 additions and 20 deletions

View File

@ -135,7 +135,7 @@ func (s *Store) AddResult(id string, res solver.CacheResult) error {
})
}
func (s *Store) Release(resultID string) error {
func (s *Store) Release(id, resultID string) error {
return errors.Errorf("not-implemented")
}

View File

@ -21,11 +21,26 @@ var internalMemoryKey = internalMemoryKeyT("buildkit/memory-cache-id")
var NoSelector = digest.FromBytes(nil)
func NewInMemoryCacheManager() CacheManager {
return &inMemoryCacheManager{
id: identity.NewID(),
backend: NewInMemoryCacheStorage(),
results: NewInMemoryResultStorage(),
return NewCacheManager(identity.NewID(), NewInMemoryCacheStorage(), NewInMemoryResultStorage())
}
func NewCacheManager(id string, storage CacheKeyStorage, results CacheResultStorage) CacheManager {
cm := &inMemoryCacheManager{
id: id,
backend: storage,
results: results,
}
storage.Walk(func(id string) error {
return storage.WalkResults(id, func(cr CacheResult) error {
if !results.Exists(cr.ID) {
storage.Release(cr.ID)
}
return nil
})
})
return cm
}
type inMemoryCacheKey struct {
@ -180,7 +195,7 @@ func (c *inMemoryCacheManager) getBestResult(cki CacheKeyInfo) (*CacheResult, er
}
sort.Slice(results, func(i, j int) bool {
return results[i].CreatedAt.Before(results[j].CreatedAt)
return results[i].CreatedAt.After(results[j].CreatedAt)
})
if len(results) > 0 {
@ -243,16 +258,19 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs
refs[ck.CacheKeyInfo.ID] = struct{}{}
}
keys := make([]*inMemoryCacheKey, 0)
outs := make([]*CacheRecord, 0, len(refs))
for id := range refs {
cki, err := c.backend.Get(id)
if err == nil {
k := c.toInMemoryCacheKey(cki)
keys = append(keys, k)
if err := c.backend.WalkResults(id, func(r CacheResult) error {
outs = append(outs, &CacheRecord{
ID: id + "@" + r.ID,
CacheKey: withExporter(k, &r),
CacheManager: c,
Loadable: true,
CreatedAt: r.CreatedAt,
})
return nil
@ -262,6 +280,17 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs
}
}
if len(outs) == 0 {
for _, k := range keys {
outs = append(outs, &CacheRecord{
ID: k.CacheKeyInfo.ID,
CacheKey: withExporter(k, nil),
CacheManager: c,
Loadable: false,
})
}
}
return outs, nil
}

View File

@ -194,6 +194,87 @@ func TestInMemoryCacheSelectorNested(t *testing.T) {
require.Equal(t, len(matches), 1)
}
func TestInMemoryCacheReleaseParent(t *testing.T) {
storage := NewInMemoryCacheStorage()
results := NewInMemoryResultStorage()
m := NewCacheManager(identity.NewID(), storage, results)
res0 := testResult("result0")
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0)
require.NoError(t, err)
res1 := testResult("result1")
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo},
}), res1)
require.NoError(t, err)
matches, err := m.Query(nil, 0, dgst("foo"), 0, "")
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
err = storage.Release(res0.ID())
require.NoError(t, err)
// foo becomes unloadable
matches, err = m.Query(nil, 0, dgst("foo"), 0, "")
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.False(t, matches[0].Loadable)
matches, err = m.Query([]ExportableCacheKey{matches[0].CacheKey}, 0, dgst("bar"), 0, "")
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
// releasing bar releases both foo and bar
err = storage.Release(res1.ID())
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("foo"), 0, "")
require.NoError(t, err)
require.Equal(t, len(matches), 0)
}
// TestInMemoryCacheRestoreOfflineDeletion deletes a result while the
// cachemanager is not running and checks that it syncs up on restore
func TestInMemoryCacheRestoreOfflineDeletion(t *testing.T) {
storage := NewInMemoryCacheStorage()
results := NewInMemoryResultStorage()
m := NewCacheManager(identity.NewID(), storage, results)
res0 := testResult("result0")
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0)
require.NoError(t, err)
res1 := testResult("result1")
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo},
}), res1)
require.NoError(t, err)
results2 := NewInMemoryResultStorage()
_, err = results2.Save(res1) // only add bar
require.NoError(t, err)
m = NewCacheManager(identity.NewID(), storage, results2)
matches, err := m.Query(nil, 0, dgst("foo"), 0, "")
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.False(t, matches[0].Loadable)
matches, err = m.Query([]ExportableCacheKey{matches[0].CacheKey}, 0, dgst("bar"), 0, "")
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
}
func dgst(s string) digest.Digest {
return digest.FromBytes([]byte(s))
}

View File

@ -13,6 +13,7 @@ var ErrNotFound = errors.Errorf("not found")
// CacheKeyStorage is interface for persisting cache metadata
type CacheKeyStorage interface {
Get(id string) (CacheKeyInfo, error)
Walk(fn func(id string) error) error
Set(info CacheKeyInfo) error
WalkResults(id string, fn func(CacheResult) error) error
@ -59,4 +60,5 @@ type CacheResultStorage interface {
Save(Result) (CacheResult, error)
Load(ctx context.Context, res CacheResult) (Result, error)
LoadRemote(ctx context.Context, res CacheResult) (*Remote, error)
Exists(id string) bool
}

View File

@ -17,6 +17,8 @@ func RunCacheStorageTests(t *testing.T, st func() (CacheKeyStorage, func())) {
testGetSet,
testResults,
testLinks,
testResultReleaseSingleLevel,
testResultReleaseMultiLevel,
} {
runStorageTest(t, tc, st)
}
@ -144,13 +146,18 @@ func testLinks(t *testing.T, st CacheKeyStorage) {
err = st.Set(cki2)
require.NoError(t, err)
require.NoError(t, st.Set(CacheKeyInfo{ID: "target0"}))
require.NoError(t, st.Set(CacheKeyInfo{ID: "target0-second"}))
require.NoError(t, st.Set(CacheKeyInfo{ID: "target1"}))
require.NoError(t, st.Set(CacheKeyInfo{ID: "target1-second"}))
l0 := CacheInfoLink{
Input: 0, Output: 1, Digest: digest.FromBytes([]byte(">target0")),
}
err = st.AddLink(cki.ID, l0, "target0")
require.NoError(t, err)
err = st.AddLink(cki2.ID, l0, "target0-bar")
err = st.AddLink(cki2.ID, l0, "target0-second")
require.NoError(t, err)
m := map[string]struct{}{}
@ -205,6 +212,176 @@ func testLinks(t *testing.T, st CacheKeyStorage) {
require.True(t, ok)
}
func testResultReleaseSingleLevel(t *testing.T, st CacheKeyStorage) {
t.Parallel()
cki := CacheKeyInfo{
ID: "foo",
Base: digest.FromBytes([]byte("foo")),
}
err := st.Set(cki)
require.NoError(t, err)
err = st.AddResult(cki.ID, CacheResult{
ID: "foo0",
CreatedAt: time.Now(),
})
require.NoError(t, err)
err = st.AddResult(cki.ID, CacheResult{
ID: "foo1",
CreatedAt: time.Now(),
})
require.NoError(t, err)
err = st.Release("foo0")
require.NoError(t, err)
m := map[string]struct{}{}
st.WalkResults("foo", func(res CacheResult) error {
m[res.ID] = struct{}{}
return nil
})
require.Equal(t, len(m), 1)
_, ok := m["foo1"]
require.True(t, ok)
err = st.Release("foo1")
require.NoError(t, err)
m = map[string]struct{}{}
st.WalkResults("foo", func(res CacheResult) error {
m[res.ID] = struct{}{}
return nil
})
require.Equal(t, len(m), 0)
_, err = st.Get("foo")
require.Error(t, err)
require.Error(t, errors.Cause(err), ErrNotFound)
}
func testResultReleaseMultiLevel(t *testing.T, st CacheKeyStorage) {
t.Parallel()
cki := CacheKeyInfo{
ID: "foo",
Base: digest.FromBytes([]byte("foo")),
}
err := st.Set(cki)
require.NoError(t, err)
err = st.AddResult(cki.ID, CacheResult{
ID: "foo-result",
CreatedAt: time.Now(),
})
require.NoError(t, err)
sub0 := CacheKeyInfo{
ID: "sub0",
Base: digest.FromBytes([]byte("sub0")),
}
err = st.Set(sub0)
require.NoError(t, err)
err = st.AddResult(sub0.ID, CacheResult{
ID: "sub0-result",
CreatedAt: time.Now(),
})
require.NoError(t, err)
l0 := CacheInfoLink{
Input: 0, Output: 1, Digest: digest.FromBytes([]byte("to-sub0")),
}
err = st.AddLink(cki.ID, l0, "sub0")
require.NoError(t, err)
sub1 := CacheKeyInfo{
ID: "sub1",
Base: digest.FromBytes([]byte("sub1")),
}
err = st.Set(sub1)
require.NoError(t, err)
err = st.AddResult(sub1.ID, CacheResult{
ID: "sub1-result",
CreatedAt: time.Now(),
})
require.NoError(t, err)
err = st.AddLink(cki.ID, l0, "sub1")
require.NoError(t, err)
// delete one sub doesn't delete parent
err = st.Release("sub0-result")
require.NoError(t, err)
m := map[string]struct{}{}
err = st.WalkResults("foo", func(res CacheResult) error {
m[res.ID] = struct{}{}
return nil
})
require.NoError(t, err)
require.Equal(t, len(m), 1)
_, ok := m["foo-result"]
require.True(t, ok)
_, err = st.Get("sub0")
require.Error(t, err)
require.Equal(t, errors.Cause(err), ErrNotFound)
m = map[string]struct{}{}
err = st.WalkLinks("foo", l0, func(id string) error {
m[id] = struct{}{}
return nil
})
require.NoError(t, err)
require.Equal(t, len(m), 1)
_, ok = m["sub1"]
require.True(t, ok)
// release foo removes the result but doesn't break the chain
err = st.Release("foo-result")
require.NoError(t, err)
_, err = st.Get("foo")
require.NoError(t, err)
m = map[string]struct{}{}
err = st.WalkResults("foo", func(res CacheResult) error {
m[res.ID] = struct{}{}
return nil
})
require.NoError(t, err)
require.Equal(t, len(m), 0)
m = map[string]struct{}{}
err = st.WalkLinks("foo", l0, func(id string) error {
m[id] = struct{}{}
return nil
})
require.NoError(t, err)
require.Equal(t, len(m), 1)
// release sub1 now releases foo as well
err = st.Release("sub1-result")
require.NoError(t, err)
_, err = st.Get("sub1")
require.Error(t, err)
require.Equal(t, errors.Cause(err), ErrNotFound)
_, err = st.Get("foo")
require.Error(t, err)
require.Equal(t, errors.Cause(err), ErrNotFound)
}
func getFunctionName(i interface{}) string {
fullname := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
dot := strings.LastIndex(fullname, ".") + 1

View File

@ -302,7 +302,9 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
} else {
for _, r := range records {
e.cacheRecords[r.ID] = r
if r.Loadable {
e.cacheRecords[r.ID] = r
}
}
if len(records) > 0 {
e.keys = append(e.keys, records[0].CacheKey)
@ -423,7 +425,9 @@ func (e *edge) recalcCurrentState() {
for k, r := range newRecords {
e.keys = append(e.keys, r.CacheKey)
e.cacheRecords[k] = r
if r.Loadable {
e.cacheRecords[k] = r
}
}
// detect lower/upper bound for current state
@ -622,7 +626,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory)
// execIfPossible creates a request for getting the edge result if there is
// enough state
func (e *edge) execIfPossible(f *pipeFactory) {
if len(e.keys) > 0 {
if len(e.cacheRecords) > 0 {
if e.keysDidChange {
e.postpone(f)
return
@ -652,6 +656,9 @@ func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
var rec *CacheRecord
for _, r := range e.cacheRecords {
if !r.Loadable {
continue
}
if rec == nil || rec.CreatedAt.Before(r.CreatedAt) || (rec.CreatedAt.Equal(r.CreatedAt) && rec.Priority < r.Priority) {
rec = r
}

View File

@ -9,19 +9,24 @@ import (
)
func NewInMemoryCacheStorage() CacheKeyStorage {
return &inMemoryStore{byID: map[string]*inMemoryKey{}}
return &inMemoryStore{
byID: map[string]*inMemoryKey{},
byResult: map[string]map[string]struct{}{},
}
}
type inMemoryStore struct {
mu sync.RWMutex
byID map[string]*inMemoryKey
mu sync.RWMutex
byID map[string]*inMemoryKey
byResult map[string]map[string]struct{}
}
type inMemoryKey struct {
CacheKeyInfo
results map[string]CacheResult
links map[CacheInfoLink]map[string]struct{}
results map[string]CacheResult
links map[CacheInfoLink]map[string]struct{}
backlinks map[string]struct{}
}
func (s *inMemoryStore) Get(id string) (CacheKeyInfo, error) {
@ -40,8 +45,9 @@ func (s *inMemoryStore) Set(info CacheKeyInfo) error {
k, ok := s.byID[info.ID]
if !ok {
k = &inMemoryKey{
results: map[string]CacheResult{},
links: map[CacheInfoLink]map[string]struct{}{},
results: map[string]CacheResult{},
links: map[CacheInfoLink]map[string]struct{}{},
backlinks: map[string]struct{}{},
}
s.byID[info.ID] = k
}
@ -49,14 +55,37 @@ func (s *inMemoryStore) Set(info CacheKeyInfo) error {
return nil
}
func (s *inMemoryStore) Walk(fn func(string) error) error {
s.mu.RLock()
ids := make([]string, 0, len(s.byID))
for id := range s.byID {
ids = append(ids, id)
}
s.mu.RUnlock()
for _, id := range ids {
if err := fn(id); err != nil {
return err
}
}
return nil
}
func (s *inMemoryStore) WalkResults(id string, fn func(CacheResult) error) error {
s.mu.RLock()
defer s.mu.RUnlock()
k, ok := s.byID[id]
if !ok {
s.mu.RUnlock()
return nil
}
copy := make([]CacheResult, 0, len(k.results))
for _, res := range k.results {
copy = append(copy, res)
}
s.mu.RUnlock()
for _, res := range copy {
if err := fn(res); err != nil {
return err
}
@ -86,11 +115,63 @@ func (s *inMemoryStore) AddResult(id string, res CacheResult) error {
return errors.Wrapf(ErrNotFound, "no such key %s", id)
}
k.results[res.ID] = res
m, ok := s.byResult[res.ID]
if !ok {
m = map[string]struct{}{}
s.byResult[res.ID] = m
}
m[id] = struct{}{}
return nil
}
func (s *inMemoryStore) Release(resultID string) error {
return errors.Errorf("not-implemented")
s.mu.Lock()
defer s.mu.Unlock()
ids, ok := s.byResult[resultID]
if !ok {
return nil
}
for id := range ids {
k, ok := s.byID[id]
if !ok {
continue
}
delete(k.results, resultID)
delete(s.byResult[resultID], id)
if len(s.byResult[resultID]) == 0 {
delete(s.byResult, resultID)
}
if len(k.results) == 0 && len(k.links) == 0 {
s.emptyBranchWithParents(k)
}
}
return nil
}
func (s *inMemoryStore) emptyBranchWithParents(k *inMemoryKey) {
if len(k.results) != 0 && len(k.links) != 0 {
return
}
for id := range k.backlinks {
p, ok := s.byID[id]
if !ok {
continue
}
for l := range p.links {
delete(p.links[l], k.ID)
if len(p.links[l]) == 0 {
delete(p.links, l)
}
}
s.emptyBranchWithParents(p)
}
delete(s.byID, k.ID)
}
func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) error {
@ -100,12 +181,17 @@ func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) er
if !ok {
return errors.Wrapf(ErrNotFound, "no such key %s", id)
}
k2, ok := s.byID[target]
if !ok {
return errors.Wrapf(ErrNotFound, "no such key %s", target)
}
m, ok := k.links[link]
if !ok {
m = map[string]struct{}{}
k.links[link] = m
}
k2.backlinks[id] = struct{}{}
m[target] = struct{}{}
return nil
}
@ -149,3 +235,8 @@ func (s *inMemoryResultStore) Load(ctx context.Context, res CacheResult) (Result
func (s *inMemoryResultStore) LoadRemote(ctx context.Context, res CacheResult) (*Remote, error) {
return nil, nil
}
func (s *inMemoryResultStore) Exists(id string) bool {
_, ok := s.m.Load(id)
return ok
}

View File

@ -130,7 +130,7 @@ type CacheRecord struct {
ID string
CacheKey ExportableCacheKey
CacheManager CacheManager
// Loadable bool
Loadable bool
// Size int
CreatedAt time.Time
Priority int