Merge pull request #299 from tonistiigi/cache-release
solver: add support for releasing cached resultsdocker-18.09
commit
1b31410147
|
@ -10,9 +10,11 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
mainBucket = "_main"
|
||||
resultBucket = "_result"
|
||||
linksBucket = "_links"
|
||||
mainBucket = "_main"
|
||||
resultBucket = "_result"
|
||||
linksBucket = "_links"
|
||||
byResultBucket = "_byresult"
|
||||
backlinksBucket = "_backlinks"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
|
@ -25,7 +27,7 @@ func NewStore(dbPath string) (*Store, error) {
|
|||
return nil, errors.Wrapf(err, "failed to open database file %s", dbPath)
|
||||
}
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
for _, b := range []string{mainBucket, resultBucket, linksBucket} {
|
||||
for _, b := range []string{mainBucket, resultBucket, linksBucket, byResultBucket, backlinksBucket} {
|
||||
if _, err := tx.CreateBucketIfNotExists([]byte(b)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -70,6 +72,25 @@ func (s *Store) Set(info solver.CacheKeyInfo) error {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *Store) Walk(fn func(id string) error) error {
|
||||
ids := make([]string, 0)
|
||||
if err := s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(mainBucket))
|
||||
return b.ForEach(func(k, v []byte) error {
|
||||
ids = append(ids, string(k))
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, id := range ids {
|
||||
if err := fn(id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) WalkResults(id string, fn func(solver.CacheResult) error) error {
|
||||
return s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(resultBucket))
|
||||
|
@ -119,11 +140,7 @@ func (s *Store) Load(id string, resultID string) (solver.CacheResult, error) {
|
|||
|
||||
func (s *Store) AddResult(id string, res solver.CacheResult) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(resultBucket))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
b, err := b.CreateBucketIfNotExists([]byte(id))
|
||||
b, err := tx.Bucket([]byte(resultBucket)).CreateBucketIfNotExists([]byte(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -131,21 +148,129 @@ func (s *Store) AddResult(id string, res solver.CacheResult) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put([]byte(res.ID), dt)
|
||||
if err := b.Put([]byte(res.ID), dt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, err = tx.Bucket([]byte(byResultBucket)).CreateBucketIfNotExists([]byte(res.ID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Put([]byte(id), []byte{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) Release(resultID string) error {
|
||||
return errors.Errorf("not-implemented")
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(byResultBucket))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
b = b.Bucket([]byte(resultID))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
if err := b.ForEach(func(k, v []byte) error {
|
||||
return s.releaseHelper(tx, string(k), resultID)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) releaseHelper(tx *bolt.Tx, id, resultID string) error {
|
||||
results := tx.Bucket([]byte(resultBucket)).Bucket([]byte(id))
|
||||
if results == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := results.Delete([]byte(resultID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ids := tx.Bucket([]byte(byResultBucket))
|
||||
|
||||
ids = ids.Bucket([]byte(resultID))
|
||||
if ids == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := ids.Delete([]byte(resultID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isEmptyBucket(ids) {
|
||||
if err := tx.Bucket([]byte(byResultBucket)).DeleteBucket([]byte(resultID)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
links := tx.Bucket([]byte(resultBucket))
|
||||
if results == nil {
|
||||
return nil
|
||||
}
|
||||
links = links.Bucket([]byte(id))
|
||||
|
||||
return s.emptyBranchWithParents(tx, []byte(id))
|
||||
}
|
||||
|
||||
func (s *Store) emptyBranchWithParents(tx *bolt.Tx, id []byte) error {
|
||||
results := tx.Bucket([]byte(resultBucket)).Bucket(id)
|
||||
if results == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
isEmptyLinks := true
|
||||
links := tx.Bucket([]byte(linksBucket)).Bucket(id)
|
||||
if links != nil {
|
||||
isEmptyLinks = isEmptyBucket(links)
|
||||
}
|
||||
|
||||
if !isEmptyBucket(results) || !isEmptyLinks {
|
||||
return nil
|
||||
}
|
||||
|
||||
if backlinks := tx.Bucket([]byte(backlinksBucket)).Bucket(id); backlinks != nil {
|
||||
if err := backlinks.ForEach(func(k, v []byte) error {
|
||||
if subLinks := tx.Bucket([]byte(linksBucket)).Bucket(k); subLinks != nil {
|
||||
if err := subLinks.ForEach(func(k, v []byte) error {
|
||||
parts := bytes.Split(k, []byte("@"))
|
||||
if len(parts) != 2 {
|
||||
return errors.Errorf("invalid key %s", k)
|
||||
}
|
||||
if bytes.Equal(id, parts[1]) {
|
||||
return subLinks.Delete(k)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isEmptyBucket(subLinks) {
|
||||
if err := tx.Bucket([]byte(linksBucket)).DeleteBucket(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return s.emptyBranchWithParents(tx, k)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tx.Bucket([]byte(backlinksBucket)).DeleteBucket(id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return tx.Bucket([]byte(mainBucket)).Delete(id)
|
||||
}
|
||||
|
||||
func (s *Store) AddLink(id string, link solver.CacheInfoLink, target string) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(linksBucket))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
b, err := b.CreateBucketIfNotExists([]byte(id))
|
||||
b, err := tx.Bucket([]byte(linksBucket)).CreateBucketIfNotExists([]byte(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -155,7 +280,20 @@ func (s *Store) AddLink(id string, link solver.CacheInfoLink, target string) err
|
|||
return err
|
||||
}
|
||||
|
||||
return b.Put(bytes.Join([][]byte{dt, []byte(target)}, []byte("@")), []byte{})
|
||||
if err := b.Put(bytes.Join([][]byte{dt, []byte(target)}, []byte("@")), []byte{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, err = tx.Bucket([]byte(backlinksBucket)).CreateBucketIfNotExists([]byte(target))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := b.Put([]byte(id), []byte{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -198,3 +336,8 @@ func (s *Store) WalkLinks(id string, link solver.CacheInfoLink, fn func(id strin
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isEmptyBucket(b *bolt.Bucket) bool {
|
||||
k, _ := b.Cursor().First()
|
||||
return k == nil
|
||||
}
|
||||
|
|
|
@ -21,11 +21,26 @@ var internalMemoryKey = internalMemoryKeyT("buildkit/memory-cache-id")
|
|||
var NoSelector = digest.FromBytes(nil)
|
||||
|
||||
func NewInMemoryCacheManager() CacheManager {
|
||||
return &inMemoryCacheManager{
|
||||
id: identity.NewID(),
|
||||
backend: NewInMemoryCacheStorage(),
|
||||
results: NewInMemoryResultStorage(),
|
||||
return NewCacheManager(identity.NewID(), NewInMemoryCacheStorage(), NewInMemoryResultStorage())
|
||||
}
|
||||
|
||||
func NewCacheManager(id string, storage CacheKeyStorage, results CacheResultStorage) CacheManager {
|
||||
cm := &inMemoryCacheManager{
|
||||
id: id,
|
||||
backend: storage,
|
||||
results: results,
|
||||
}
|
||||
|
||||
storage.Walk(func(id string) error {
|
||||
return storage.WalkResults(id, func(cr CacheResult) error {
|
||||
if !results.Exists(cr.ID) {
|
||||
storage.Release(cr.ID)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
return cm
|
||||
}
|
||||
|
||||
type inMemoryCacheKey struct {
|
||||
|
@ -180,7 +195,7 @@ func (c *inMemoryCacheManager) getBestResult(cki CacheKeyInfo) (*CacheResult, er
|
|||
}
|
||||
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
return results[i].CreatedAt.Before(results[j].CreatedAt)
|
||||
return results[i].CreatedAt.After(results[j].CreatedAt)
|
||||
})
|
||||
|
||||
if len(results) > 0 {
|
||||
|
@ -243,16 +258,19 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs
|
|||
refs[ck.CacheKeyInfo.ID] = struct{}{}
|
||||
}
|
||||
|
||||
keys := make([]*inMemoryCacheKey, 0)
|
||||
outs := make([]*CacheRecord, 0, len(refs))
|
||||
for id := range refs {
|
||||
cki, err := c.backend.Get(id)
|
||||
if err == nil {
|
||||
k := c.toInMemoryCacheKey(cki)
|
||||
keys = append(keys, k)
|
||||
if err := c.backend.WalkResults(id, func(r CacheResult) error {
|
||||
outs = append(outs, &CacheRecord{
|
||||
ID: id + "@" + r.ID,
|
||||
CacheKey: withExporter(k, &r),
|
||||
CacheManager: c,
|
||||
Loadable: true,
|
||||
CreatedAt: r.CreatedAt,
|
||||
})
|
||||
return nil
|
||||
|
@ -262,6 +280,17 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs
|
|||
}
|
||||
}
|
||||
|
||||
if len(outs) == 0 {
|
||||
for _, k := range keys {
|
||||
outs = append(outs, &CacheRecord{
|
||||
ID: k.CacheKeyInfo.ID,
|
||||
CacheKey: withExporter(k, nil),
|
||||
CacheManager: c,
|
||||
Loadable: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return outs, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -194,6 +194,87 @@ func TestInMemoryCacheSelectorNested(t *testing.T) {
|
|||
require.Equal(t, len(matches), 1)
|
||||
}
|
||||
|
||||
func TestInMemoryCacheReleaseParent(t *testing.T) {
|
||||
storage := NewInMemoryCacheStorage()
|
||||
results := NewInMemoryResultStorage()
|
||||
m := NewCacheManager(identity.NewID(), storage, results)
|
||||
|
||||
res0 := testResult("result0")
|
||||
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0)
|
||||
require.NoError(t, err)
|
||||
|
||||
res1 := testResult("result1")
|
||||
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
|
||||
{CacheKey: cacheFoo},
|
||||
}), res1)
|
||||
require.NoError(t, err)
|
||||
|
||||
matches, err := m.Query(nil, 0, dgst("foo"), 0, "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(matches), 1)
|
||||
|
||||
require.True(t, matches[0].Loadable)
|
||||
|
||||
err = storage.Release(res0.ID())
|
||||
require.NoError(t, err)
|
||||
|
||||
// foo becomes unloadable
|
||||
matches, err = m.Query(nil, 0, dgst("foo"), 0, "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(matches), 1)
|
||||
|
||||
require.False(t, matches[0].Loadable)
|
||||
|
||||
matches, err = m.Query([]ExportableCacheKey{matches[0].CacheKey}, 0, dgst("bar"), 0, "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(matches), 1)
|
||||
|
||||
require.True(t, matches[0].Loadable)
|
||||
|
||||
// releasing bar releases both foo and bar
|
||||
err = storage.Release(res1.ID())
|
||||
require.NoError(t, err)
|
||||
|
||||
matches, err = m.Query(nil, 0, dgst("foo"), 0, "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(matches), 0)
|
||||
}
|
||||
|
||||
// TestInMemoryCacheRestoreOfflineDeletion deletes a result while the
|
||||
// cachemanager is not running and checks that it syncs up on restore
|
||||
func TestInMemoryCacheRestoreOfflineDeletion(t *testing.T) {
|
||||
storage := NewInMemoryCacheStorage()
|
||||
results := NewInMemoryResultStorage()
|
||||
m := NewCacheManager(identity.NewID(), storage, results)
|
||||
|
||||
res0 := testResult("result0")
|
||||
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0)
|
||||
require.NoError(t, err)
|
||||
|
||||
res1 := testResult("result1")
|
||||
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
|
||||
{CacheKey: cacheFoo},
|
||||
}), res1)
|
||||
require.NoError(t, err)
|
||||
|
||||
results2 := NewInMemoryResultStorage()
|
||||
_, err = results2.Save(res1) // only add bar
|
||||
require.NoError(t, err)
|
||||
|
||||
m = NewCacheManager(identity.NewID(), storage, results2)
|
||||
|
||||
matches, err := m.Query(nil, 0, dgst("foo"), 0, "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(matches), 1)
|
||||
require.False(t, matches[0].Loadable)
|
||||
|
||||
matches, err = m.Query([]ExportableCacheKey{matches[0].CacheKey}, 0, dgst("bar"), 0, "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(matches), 1)
|
||||
|
||||
require.True(t, matches[0].Loadable)
|
||||
}
|
||||
|
||||
func dgst(s string) digest.Digest {
|
||||
return digest.FromBytes([]byte(s))
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ var ErrNotFound = errors.Errorf("not found")
|
|||
// CacheKeyStorage is interface for persisting cache metadata
|
||||
type CacheKeyStorage interface {
|
||||
Get(id string) (CacheKeyInfo, error)
|
||||
Walk(fn func(id string) error) error
|
||||
Set(info CacheKeyInfo) error
|
||||
|
||||
WalkResults(id string, fn func(CacheResult) error) error
|
||||
|
@ -59,4 +60,5 @@ type CacheResultStorage interface {
|
|||
Save(Result) (CacheResult, error)
|
||||
Load(ctx context.Context, res CacheResult) (Result, error)
|
||||
LoadRemote(ctx context.Context, res CacheResult) (*Remote, error)
|
||||
Exists(id string) bool
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@ func RunCacheStorageTests(t *testing.T, st func() (CacheKeyStorage, func())) {
|
|||
testGetSet,
|
||||
testResults,
|
||||
testLinks,
|
||||
testResultReleaseSingleLevel,
|
||||
testResultReleaseMultiLevel,
|
||||
} {
|
||||
runStorageTest(t, tc, st)
|
||||
}
|
||||
|
@ -144,13 +146,18 @@ func testLinks(t *testing.T, st CacheKeyStorage) {
|
|||
err = st.Set(cki2)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, st.Set(CacheKeyInfo{ID: "target0"}))
|
||||
require.NoError(t, st.Set(CacheKeyInfo{ID: "target0-second"}))
|
||||
require.NoError(t, st.Set(CacheKeyInfo{ID: "target1"}))
|
||||
require.NoError(t, st.Set(CacheKeyInfo{ID: "target1-second"}))
|
||||
|
||||
l0 := CacheInfoLink{
|
||||
Input: 0, Output: 1, Digest: digest.FromBytes([]byte(">target0")),
|
||||
}
|
||||
err = st.AddLink(cki.ID, l0, "target0")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddLink(cki2.ID, l0, "target0-bar")
|
||||
err = st.AddLink(cki2.ID, l0, "target0-second")
|
||||
require.NoError(t, err)
|
||||
|
||||
m := map[string]struct{}{}
|
||||
|
@ -205,6 +212,176 @@ func testLinks(t *testing.T, st CacheKeyStorage) {
|
|||
require.True(t, ok)
|
||||
}
|
||||
|
||||
func testResultReleaseSingleLevel(t *testing.T, st CacheKeyStorage) {
|
||||
t.Parallel()
|
||||
cki := CacheKeyInfo{
|
||||
ID: "foo",
|
||||
Base: digest.FromBytes([]byte("foo")),
|
||||
}
|
||||
err := st.Set(cki)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddResult(cki.ID, CacheResult{
|
||||
ID: "foo0",
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddResult(cki.ID, CacheResult{
|
||||
ID: "foo1",
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.Release("foo0")
|
||||
require.NoError(t, err)
|
||||
|
||||
m := map[string]struct{}{}
|
||||
st.WalkResults("foo", func(res CacheResult) error {
|
||||
m[res.ID] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
|
||||
require.Equal(t, len(m), 1)
|
||||
_, ok := m["foo1"]
|
||||
require.True(t, ok)
|
||||
|
||||
err = st.Release("foo1")
|
||||
require.NoError(t, err)
|
||||
|
||||
m = map[string]struct{}{}
|
||||
st.WalkResults("foo", func(res CacheResult) error {
|
||||
m[res.ID] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
|
||||
require.Equal(t, len(m), 0)
|
||||
|
||||
_, err = st.Get("foo")
|
||||
require.Error(t, err)
|
||||
require.Error(t, errors.Cause(err), ErrNotFound)
|
||||
}
|
||||
|
||||
func testResultReleaseMultiLevel(t *testing.T, st CacheKeyStorage) {
|
||||
t.Parallel()
|
||||
cki := CacheKeyInfo{
|
||||
ID: "foo",
|
||||
Base: digest.FromBytes([]byte("foo")),
|
||||
}
|
||||
err := st.Set(cki)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddResult(cki.ID, CacheResult{
|
||||
ID: "foo-result",
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
sub0 := CacheKeyInfo{
|
||||
ID: "sub0",
|
||||
Base: digest.FromBytes([]byte("sub0")),
|
||||
}
|
||||
err = st.Set(sub0)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddResult(sub0.ID, CacheResult{
|
||||
ID: "sub0-result",
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
l0 := CacheInfoLink{
|
||||
Input: 0, Output: 1, Digest: digest.FromBytes([]byte("to-sub0")),
|
||||
}
|
||||
err = st.AddLink(cki.ID, l0, "sub0")
|
||||
require.NoError(t, err)
|
||||
|
||||
sub1 := CacheKeyInfo{
|
||||
ID: "sub1",
|
||||
Base: digest.FromBytes([]byte("sub1")),
|
||||
}
|
||||
err = st.Set(sub1)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddResult(sub1.ID, CacheResult{
|
||||
ID: "sub1-result",
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddLink(cki.ID, l0, "sub1")
|
||||
require.NoError(t, err)
|
||||
|
||||
// delete one sub doesn't delete parent
|
||||
|
||||
err = st.Release("sub0-result")
|
||||
require.NoError(t, err)
|
||||
|
||||
m := map[string]struct{}{}
|
||||
err = st.WalkResults("foo", func(res CacheResult) error {
|
||||
m[res.ID] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(m), 1)
|
||||
_, ok := m["foo-result"]
|
||||
require.True(t, ok)
|
||||
|
||||
_, err = st.Get("sub0")
|
||||
require.Error(t, err)
|
||||
require.Equal(t, errors.Cause(err), ErrNotFound)
|
||||
|
||||
m = map[string]struct{}{}
|
||||
err = st.WalkLinks("foo", l0, func(id string) error {
|
||||
m[id] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(m), 1)
|
||||
|
||||
_, ok = m["sub1"]
|
||||
require.True(t, ok)
|
||||
|
||||
// release foo removes the result but doesn't break the chain
|
||||
|
||||
err = st.Release("foo-result")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = st.Get("foo")
|
||||
require.NoError(t, err)
|
||||
|
||||
m = map[string]struct{}{}
|
||||
err = st.WalkResults("foo", func(res CacheResult) error {
|
||||
m[res.ID] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(m), 0)
|
||||
|
||||
m = map[string]struct{}{}
|
||||
err = st.WalkLinks("foo", l0, func(id string) error {
|
||||
m[id] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(m), 1)
|
||||
|
||||
// release sub1 now releases foo as well
|
||||
|
||||
err = st.Release("sub1-result")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = st.Get("sub1")
|
||||
require.Error(t, err)
|
||||
require.Equal(t, errors.Cause(err), ErrNotFound)
|
||||
|
||||
_, err = st.Get("foo")
|
||||
require.Error(t, err)
|
||||
require.Equal(t, errors.Cause(err), ErrNotFound)
|
||||
}
|
||||
|
||||
func getFunctionName(i interface{}) string {
|
||||
fullname := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
|
||||
dot := strings.LastIndex(fullname, ".") + 1
|
||||
|
|
|
@ -302,7 +302,9 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
|
|||
logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
|
||||
} else {
|
||||
for _, r := range records {
|
||||
e.cacheRecords[r.ID] = r
|
||||
if r.Loadable {
|
||||
e.cacheRecords[r.ID] = r
|
||||
}
|
||||
}
|
||||
if len(records) > 0 {
|
||||
e.keys = append(e.keys, records[0].CacheKey)
|
||||
|
@ -423,7 +425,9 @@ func (e *edge) recalcCurrentState() {
|
|||
|
||||
for k, r := range newRecords {
|
||||
e.keys = append(e.keys, r.CacheKey)
|
||||
e.cacheRecords[k] = r
|
||||
if r.Loadable {
|
||||
e.cacheRecords[k] = r
|
||||
}
|
||||
}
|
||||
|
||||
// detect lower/upper bound for current state
|
||||
|
@ -622,7 +626,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory)
|
|||
// execIfPossible creates a request for getting the edge result if there is
|
||||
// enough state
|
||||
func (e *edge) execIfPossible(f *pipeFactory) {
|
||||
if len(e.keys) > 0 {
|
||||
if len(e.cacheRecords) > 0 {
|
||||
if e.keysDidChange {
|
||||
e.postpone(f)
|
||||
return
|
||||
|
@ -652,6 +656,9 @@ func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
|
|||
var rec *CacheRecord
|
||||
|
||||
for _, r := range e.cacheRecords {
|
||||
if !r.Loadable {
|
||||
continue
|
||||
}
|
||||
if rec == nil || rec.CreatedAt.Before(r.CreatedAt) || (rec.CreatedAt.Equal(r.CreatedAt) && rec.Priority < r.Priority) {
|
||||
rec = r
|
||||
}
|
||||
|
|
|
@ -9,19 +9,24 @@ import (
|
|||
)
|
||||
|
||||
func NewInMemoryCacheStorage() CacheKeyStorage {
|
||||
return &inMemoryStore{byID: map[string]*inMemoryKey{}}
|
||||
return &inMemoryStore{
|
||||
byID: map[string]*inMemoryKey{},
|
||||
byResult: map[string]map[string]struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
type inMemoryStore struct {
|
||||
mu sync.RWMutex
|
||||
byID map[string]*inMemoryKey
|
||||
mu sync.RWMutex
|
||||
byID map[string]*inMemoryKey
|
||||
byResult map[string]map[string]struct{}
|
||||
}
|
||||
|
||||
type inMemoryKey struct {
|
||||
CacheKeyInfo
|
||||
|
||||
results map[string]CacheResult
|
||||
links map[CacheInfoLink]map[string]struct{}
|
||||
results map[string]CacheResult
|
||||
links map[CacheInfoLink]map[string]struct{}
|
||||
backlinks map[string]struct{}
|
||||
}
|
||||
|
||||
func (s *inMemoryStore) Get(id string) (CacheKeyInfo, error) {
|
||||
|
@ -40,8 +45,9 @@ func (s *inMemoryStore) Set(info CacheKeyInfo) error {
|
|||
k, ok := s.byID[info.ID]
|
||||
if !ok {
|
||||
k = &inMemoryKey{
|
||||
results: map[string]CacheResult{},
|
||||
links: map[CacheInfoLink]map[string]struct{}{},
|
||||
results: map[string]CacheResult{},
|
||||
links: map[CacheInfoLink]map[string]struct{}{},
|
||||
backlinks: map[string]struct{}{},
|
||||
}
|
||||
s.byID[info.ID] = k
|
||||
}
|
||||
|
@ -49,14 +55,37 @@ func (s *inMemoryStore) Set(info CacheKeyInfo) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *inMemoryStore) Walk(fn func(string) error) error {
|
||||
s.mu.RLock()
|
||||
ids := make([]string, 0, len(s.byID))
|
||||
for id := range s.byID {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
for _, id := range ids {
|
||||
if err := fn(id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *inMemoryStore) WalkResults(id string, fn func(CacheResult) error) error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
k, ok := s.byID[id]
|
||||
if !ok {
|
||||
s.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
copy := make([]CacheResult, 0, len(k.results))
|
||||
for _, res := range k.results {
|
||||
copy = append(copy, res)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
for _, res := range copy {
|
||||
if err := fn(res); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -86,11 +115,61 @@ func (s *inMemoryStore) AddResult(id string, res CacheResult) error {
|
|||
return errors.Wrapf(ErrNotFound, "no such key %s", id)
|
||||
}
|
||||
k.results[res.ID] = res
|
||||
m, ok := s.byResult[res.ID]
|
||||
if !ok {
|
||||
m = map[string]struct{}{}
|
||||
s.byResult[res.ID] = m
|
||||
}
|
||||
m[id] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *inMemoryStore) Release(resultID string) error {
|
||||
return errors.Errorf("not-implemented")
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
ids, ok := s.byResult[resultID]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
for id := range ids {
|
||||
k, ok := s.byID[id]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
delete(k.results, resultID)
|
||||
delete(s.byResult[resultID], id)
|
||||
if len(s.byResult[resultID]) == 0 {
|
||||
delete(s.byResult, resultID)
|
||||
}
|
||||
|
||||
s.emptyBranchWithParents(k)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *inMemoryStore) emptyBranchWithParents(k *inMemoryKey) {
|
||||
if len(k.results) != 0 || len(k.links) != 0 {
|
||||
return
|
||||
}
|
||||
for id := range k.backlinks {
|
||||
p, ok := s.byID[id]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for l := range p.links {
|
||||
delete(p.links[l], k.ID)
|
||||
if len(p.links[l]) == 0 {
|
||||
delete(p.links, l)
|
||||
}
|
||||
}
|
||||
s.emptyBranchWithParents(p)
|
||||
}
|
||||
|
||||
delete(s.byID, k.ID)
|
||||
}
|
||||
|
||||
func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) error {
|
||||
|
@ -100,12 +179,17 @@ func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) er
|
|||
if !ok {
|
||||
return errors.Wrapf(ErrNotFound, "no such key %s", id)
|
||||
}
|
||||
k2, ok := s.byID[target]
|
||||
if !ok {
|
||||
return errors.Wrapf(ErrNotFound, "no such key %s", target)
|
||||
}
|
||||
m, ok := k.links[link]
|
||||
if !ok {
|
||||
m = map[string]struct{}{}
|
||||
k.links[link] = m
|
||||
}
|
||||
|
||||
k2.backlinks[id] = struct{}{}
|
||||
m[target] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
@ -149,3 +233,8 @@ func (s *inMemoryResultStore) Load(ctx context.Context, res CacheResult) (Result
|
|||
func (s *inMemoryResultStore) LoadRemote(ctx context.Context, res CacheResult) (*Remote, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *inMemoryResultStore) Exists(id string) bool {
|
||||
_, ok := s.m.Load(id)
|
||||
return ok
|
||||
}
|
||||
|
|
|
@ -130,7 +130,7 @@ type CacheRecord struct {
|
|||
ID string
|
||||
CacheKey ExportableCacheKey
|
||||
CacheManager CacheManager
|
||||
// Loadable bool
|
||||
Loadable bool
|
||||
// Size int
|
||||
CreatedAt time.Time
|
||||
Priority int
|
||||
|
|
Loading…
Reference in New Issue