From eac79f7c7e936453a58508761638e16ffb255085 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Fri, 14 Jul 2017 11:59:31 -0700 Subject: [PATCH] cache: refactor reference reuse and caching Replaces previous mutable.Freeze logic with commits that can live together with mutable data. Finalize method is added if the implementation needs to make sure that the immutable ref is flushed to the driver. Refs are automaitcally finalized when writable layers are created on top of them. Signed-off-by: Tonis Tiigi --- cache/instructioncache/cache.go | 91 ++------ cache/manager.go | 76 +++++-- cache/manager_test.go | 334 +++++++++++++++++++--------- cache/metadata.go | 34 ++- cache/metadata/metadata.go | 19 +- cache/metadata/metadata_test.go | 6 +- cache/refs.go | 173 ++++++++------ client/llb/llb.go | 4 +- control/control_default.go | 1 + control/control_standalone_test.go | 2 +- snapshot/blobmapping/snapshotter.go | 2 +- snapshot/localmounter.go | 11 +- solver/exec.go | 24 +- solver/pb/const.go | 2 + solver/solver.go | 121 ++++++---- solver/source.go | 7 +- solver/state.go | 10 +- source/git/gitsource.go | 16 +- source/local/local.go | 61 ++++- 19 files changed, 635 insertions(+), 359 deletions(-) diff --git a/cache/instructioncache/cache.go b/cache/instructioncache/cache.go index f1180b6c..74b619b7 100644 --- a/cache/instructioncache/cache.go +++ b/cache/instructioncache/cache.go @@ -1,6 +1,7 @@ package instructioncache import ( + "github.com/Sirupsen/logrus" "github.com/boltdb/bolt" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" @@ -10,103 +11,51 @@ import ( const cacheKey = "buildkit.instructioncache" -type cacheGroup struct { - Snapshots []string `json:"snapshots"` -} - type LocalStore struct { MetadataStore *metadata.Store Cache cache.Accessor } -func (ls *LocalStore) Set(key string, refsAny []interface{}) error { - refs, err := toReferenceArray(refsAny) - if err != nil { - return err +func (ls *LocalStore) Set(key string, value interface{}) error { + ref, ok := value.(cache.ImmutableRef) + if !ok { + return errors.Errorf("invalid ref") } - cg := cacheGroup{} - for _, r := range refs { - cg.Snapshots = append(cg.Snapshots, r.ID()) - } - v, err := metadata.NewValue(cg) + v, err := metadata.NewValue(ref.ID()) if err != nil { return err } v.Index = index(key) - for _, r := range refs { - si, _ := ls.MetadataStore.Get(r.ID()) - if err := si.Update(func(b *bolt.Bucket) error { // TODO: should share transaction - return si.SetValue(b, index(key), *v) - }); err != nil { - return err - } - } - return nil + si, _ := ls.MetadataStore.Get(ref.ID()) + return si.Update(func(b *bolt.Bucket) error { + return si.SetValue(b, index(key), v) + }) } -func (ls *LocalStore) Lookup(ctx context.Context, key string) ([]interface{}, error) { +func (ls *LocalStore) Lookup(ctx context.Context, key string) (interface{}, error) { snaps, err := ls.MetadataStore.Search(index(key)) if err != nil { return nil, err } - refs := make([]cache.ImmutableRef, 0) - var retErr error -loop0: - for _, s := range snaps { - retErr = nil - for _, r := range refs { - r.Release(context.TODO()) - } - refs = nil + for _, s := range snaps { v := s.Get(index(key)) if v != nil { - var cg cacheGroup - if err = v.Unmarshal(&cg); err != nil { - retErr = err + var id string + if err = v.Unmarshal(&id); err != nil { continue } - for _, id := range cg.Snapshots { - r, err := ls.Cache.Get(ctx, id) - if err != nil { - retErr = err - continue loop0 - } - refs = append(refs, r) + r, err := ls.Cache.Get(ctx, id) + if err != nil { + logrus.Warnf("failed to get cached snapshot %s: %v", id, err) + continue } - retErr = nil - break + return r, nil } } - if retErr != nil { - for _, r := range refs { - r.Release(context.TODO()) - } - refs = nil - } - return toAny(refs), retErr + return nil, nil } func index(k string) string { return cacheKey + "::" + k } - -func toReferenceArray(in []interface{}) ([]cache.ImmutableRef, error) { - out := make([]cache.ImmutableRef, 0, len(in)) - for _, i := range in { - r, ok := i.(cache.ImmutableRef) - if !ok { - return nil, errors.Errorf("invalid reference") - } - out = append(out, r) - } - return out, nil -} - -func toAny(in []cache.ImmutableRef) []interface{} { - out := make([]interface{}, 0, len(in)) - for _, i := range in { - out = append(out, i) - } - return out -} diff --git a/cache/manager.go b/cache/manager.go index 2285b8b1..5d03d1d5 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -102,15 +102,10 @@ func (cm *cacheManager) get(ctx context.Context, id string) (ImmutableRef, error defer rec.mu.Unlock() if rec.mutable { - return nil, errors.Wrapf(errInvalid, "invalid mutable ref %s", id) - } - - if rec.mutable && !rec.frozen { if len(rec.refs) != 0 { return nil, errors.Wrapf(errLocked, "%s is locked", id) - } else { - rec.frozen = true } + return rec.mref().commit(ctx) } return rec.ref(), nil @@ -121,6 +116,24 @@ func (cm *cacheManager) load(ctx context.Context, id string) (*cacheRecord, erro return rec, nil } + md, _ := cm.md.Get(id) + if mutableID := getEqualMutable(&md); mutableID != "" { + mutable, err := cm.load(ctx, mutableID) + if err != nil { + return nil, err + } + rec := &cacheRecord{ + cm: cm, + refs: make(map[Mountable]struct{}), + parent: mutable.parent, + md: &md, + equalMutable: &mutableRef{cacheRecord: mutable}, + } + mutable.equalImmutable = &immutableRef{cacheRecord: rec} + cm.records[id] = rec + return rec, nil + } + info, err := cm.Snapshotter.Stat(ctx, id) if err != nil { return nil, errors.Wrap(errNotFound, err.Error()) @@ -134,8 +147,6 @@ func (cm *cacheManager) load(ctx context.Context, id string) (*cacheRecord, erro } } - md, _ := cm.md.Get(id) - rec := &cacheRecord{ mutable: info.Kind != cdsnapshot.KindCommitted, cm: cm, @@ -143,7 +154,7 @@ func (cm *cacheManager) load(ctx context.Context, id string) (*cacheRecord, erro parent: parent, md: &md, } - cm.records[id] = rec // TODO: store to db + cm.records[id] = rec return rec, nil } @@ -158,6 +169,9 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef) (MutableRef, er if err != nil { return nil, err } + if err := parent.Finalize(ctx); err != nil { + return nil, err + } parentID = parent.ID() } @@ -185,7 +199,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef) (MutableRef, er return rec.mref(), nil } -func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, error) { // Rebase? +func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, error) { cm.mu.Lock() defer cm.mu.Unlock() @@ -200,10 +214,18 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, return nil, errors.Wrapf(errInvalid, "%s is not mutable", id) } - if rec.frozen || len(rec.refs) != 0 { + if len(rec.refs) != 0 { return nil, errors.Wrapf(errLocked, "%s is locked", id) } + if rec.equalImmutable != nil { + delete(cm.records, rec.equalImmutable.ID()) + if err := rec.equalImmutable.remove(ctx, false); err != nil { + return nil, err + } + rec.equalImmutable = nil + } + return rec.mref(), nil } @@ -222,6 +244,11 @@ func (cm *cacheManager) DiskUsage(ctx context.Context) ([]*client.UsageInfo, err for id, cr := range cm.records { cr.mu.Lock() + // ignore duplicates that share data + if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 { + cr.mu.Unlock() + continue + } c := &cacheUsageInfo{ refs: len(cr.refs), mutable: cr.mutable, @@ -230,12 +257,12 @@ func (cm *cacheManager) DiskUsage(ctx context.Context) ([]*client.UsageInfo, err if cr.parent != nil { c.parent = cr.parent.ID() } - if cr.mutable && c.refs > 0 && !cr.frozen { + if cr.mutable && c.refs > 0 { c.size = 0 // size can not be determined because it is changing } m[id] = c - cr.mu.Unlock() rescan[id] = struct{}{} + cr.mu.Unlock() } cm.mu.Unlock() @@ -270,17 +297,20 @@ func (cm *cacheManager) DiskUsage(ctx context.Context) ([]*client.UsageInfo, err if d.Size == sizeUnknown { func(d *client.UsageInfo) { eg.Go(func() error { - ref, err := cm.Get(ctx, d.ID) - if err != nil { - d.Size = 0 - return nil + if !d.Mutable { + ref, err := cm.Get(ctx, d.ID) + if err != nil { + d.Size = 0 + return nil + } + s, err := ref.Size(ctx) + if err != nil { + return err + } + d.Size = s + return ref.Release(context.TODO()) } - s, err := ref.Size(ctx) - if err != nil { - return err - } - d.Size = s - return ref.Release(context.TODO()) + return nil }) }(d) } diff --git a/cache/manager_test.go b/cache/manager_test.go index 80bb488d..e5b190e0 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -2,6 +2,7 @@ package cache import ( "context" + "fmt" "io/ioutil" "os" "path/filepath" @@ -12,127 +13,254 @@ import ( "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/snapshot" "github.com/pkg/errors" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestManager(t *testing.T) { ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") tmpdir, err := ioutil.TempDir("", "cachemanager") - assert.NoError(t, err) + require.NoError(t, err) defer os.RemoveAll(tmpdir) + cm := getCacheManager(t, tmpdir) + + _, err = cm.Get(ctx, "foobar") + require.Error(t, err) + + checkDiskUsage(t, ctx, cm, 0, 0) + + active, err := cm.New(ctx, nil) + require.NoError(t, err) + + m, err := active.Mount(ctx) + require.NoError(t, err) + + lm := snapshot.LocalMounter(m) + target, err := lm.Mount() + require.NoError(t, err) + + fi, err := os.Stat(target) + require.NoError(t, err) + require.Equal(t, fi.IsDir(), true) + + err = lm.Unmount() + require.NoError(t, err) + + _, err = cm.GetMutable(ctx, active.ID()) + require.Error(t, err) + require.Equal(t, errLocked, errors.Cause(err)) + + checkDiskUsage(t, ctx, cm, 1, 0) + + snap, err := active.Commit(ctx) + require.NoError(t, err) + + checkDiskUsage(t, ctx, cm, 1, 0) + + _, err = cm.GetMutable(ctx, active.ID()) + require.Error(t, err) + require.Equal(t, errLocked, errors.Cause(err)) + + err = snap.Release(ctx) + require.NoError(t, err) + + checkDiskUsage(t, ctx, cm, 0, 1) + + active, err = cm.GetMutable(ctx, active.ID()) + require.NoError(t, err) + + checkDiskUsage(t, ctx, cm, 1, 0) + + snap, err = active.Commit(ctx) + require.NoError(t, err) + + checkDiskUsage(t, ctx, cm, 1, 0) + + err = snap.Finalize(ctx) + require.NoError(t, err) + + err = snap.Release(ctx) + require.NoError(t, err) + + _, err = cm.GetMutable(ctx, active.ID()) + require.Error(t, err) + require.Equal(t, errNotFound, errors.Cause(err)) + + _, err = cm.GetMutable(ctx, snap.ID()) + require.Error(t, err) + require.Equal(t, errInvalid, errors.Cause(err)) + + snap, err = cm.Get(ctx, snap.ID()) + require.NoError(t, err) + + snap2, err := cm.Get(ctx, snap.ID()) + require.NoError(t, err) + + checkDiskUsage(t, ctx, cm, 1, 0) + + err = snap.Release(ctx) + require.NoError(t, err) + + active2, err := cm.New(ctx, snap2) + require.NoError(t, err) + + checkDiskUsage(t, ctx, cm, 2, 0) + + snap3, err := active2.Commit(ctx) + require.NoError(t, err) + + err = snap2.Release(ctx) + require.NoError(t, err) + + checkDiskUsage(t, ctx, cm, 2, 0) + + err = snap3.Release(ctx) + require.NoError(t, err) + + checkDiskUsage(t, ctx, cm, 0, 2) + + err = cm.Close() + require.NoError(t, err) +} + +func TestLazyCommit(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") + + tmpdir, err := ioutil.TempDir("", "cachemanager") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + cm := getCacheManager(t, tmpdir) + + active, err := cm.New(ctx, nil) + require.NoError(t, err) + + // after commit mutable is locked + snap, err := active.Commit(ctx) + require.NoError(t, err) + + _, err = cm.GetMutable(ctx, active.ID()) + require.Error(t, err) + require.Equal(t, errLocked, errors.Cause(err)) + + // immutable refs still work + snap2, err := cm.Get(ctx, snap.ID()) + require.NoError(t, err) + require.Equal(t, snap.ID(), snap2.ID()) + + err = snap.Release(ctx) + require.NoError(t, err) + + err = snap2.Release(ctx) + require.NoError(t, err) + + // immutable work after final release as well + snap, err = cm.Get(ctx, snap.ID()) + require.NoError(t, err) + require.Equal(t, snap.ID(), snap2.ID()) + err = snap.Release(ctx) + require.NoError(t, err) + + // after release mutable becomes available again + active2, err := cm.GetMutable(ctx, active.ID()) + require.NoError(t, err) + require.Equal(t, active2.ID(), active.ID()) + + // because ref was took mutable old immutable are cleared + _, err = cm.Get(ctx, snap.ID()) + require.Error(t, err) + require.Equal(t, errNotFound, errors.Cause(err)) + + snap, err = active2.Commit(ctx) + require.NoError(t, err) + + // this time finalize commit + err = snap.Finalize(ctx) + require.NoError(t, err) + + err = snap.Release(ctx) + require.NoError(t, err) + + // mutable is gone after finalize + _, err = cm.GetMutable(ctx, active2.ID()) + require.Error(t, err) + require.Equal(t, errNotFound, errors.Cause(err)) + + // immutable still works + snap2, err = cm.Get(ctx, snap.ID()) + require.NoError(t, err) + require.Equal(t, snap.ID(), snap2.ID()) + + err = snap2.Release(ctx) + require.NoError(t, err) + + // test restarting after commit + active, err = cm.New(ctx, nil) + require.NoError(t, err) + + // after commit mutable is locked + snap, err = active.Commit(ctx) + require.NoError(t, err) + + err = cm.Close() + require.NoError(t, err) + + cm = getCacheManager(t, tmpdir) + + snap2, err = cm.Get(ctx, snap.ID()) + require.NoError(t, err) + + err = snap2.Release(ctx) + require.NoError(t, err) + + active, err = cm.GetMutable(ctx, active.ID()) + require.NoError(t, err) + + _, err = cm.Get(ctx, snap.ID()) + require.Error(t, err) + require.Equal(t, errNotFound, errors.Cause(err)) + + snap, err = active.Commit(ctx) + require.NoError(t, err) + + err = cm.Close() + require.NoError(t, err) + + cm = getCacheManager(t, tmpdir) + + snap2, err = cm.Get(ctx, snap.ID()) + require.NoError(t, err) + + err = snap2.Finalize(ctx) + require.NoError(t, err) + + err = snap2.Release(ctx) + require.NoError(t, err) + + active, err = cm.GetMutable(ctx, active.ID()) + require.Error(t, err) + require.Equal(t, errNotFound, errors.Cause(err)) +} + +func getCacheManager(t *testing.T, tmpdir string) Manager { snapshotter, err := naive.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) - assert.NoError(t, err) + require.NoError(t, err) md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) - assert.NoError(t, err) + require.NoError(t, err) cm, err := NewManager(ManagerOpt{ Snapshotter: snapshotter, MetadataStore: md, }) - assert.NoError(t, err) - - _, err = cm.Get(ctx, "foobar") - assert.Error(t, err) - - checkDiskUsage(t, ctx, cm, 0, 0) - - active, err := cm.New(ctx, nil) - assert.NoError(t, err) - - m, err := active.Mount(ctx) - assert.NoError(t, err) - - lm := snapshot.LocalMounter(m) - target, err := lm.Mount() - assert.NoError(t, err) - - fi, err := os.Stat(target) - assert.NoError(t, err) - assert.Equal(t, fi.IsDir(), true) - - err = lm.Unmount() - assert.NoError(t, err) - - _, err = cm.GetMutable(ctx, active.ID()) - assert.Error(t, err) - assert.Equal(t, errLocked, errors.Cause(err)) - - checkDiskUsage(t, ctx, cm, 1, 0) - - snap, err := active.Freeze() - assert.NoError(t, err) - - checkDiskUsage(t, ctx, cm, 1, 0) - - _, err = cm.GetMutable(ctx, active.ID()) - assert.Error(t, err) - assert.Equal(t, errLocked, errors.Cause(err)) - - err = snap.Release(ctx) - assert.NoError(t, err) - - checkDiskUsage(t, ctx, cm, 0, 1) - - active, err = cm.GetMutable(ctx, active.ID()) - assert.NoError(t, err) - - checkDiskUsage(t, ctx, cm, 1, 0) - - snap, err = active.ReleaseAndCommit(ctx) - assert.NoError(t, err) - - checkDiskUsage(t, ctx, cm, 1, 0) - - err = snap.Release(ctx) - assert.NoError(t, err) - - _, err = cm.GetMutable(ctx, active.ID()) - assert.Error(t, err) - assert.Equal(t, errNotFound, errors.Cause(err)) - - _, err = cm.GetMutable(ctx, snap.ID()) - assert.Error(t, err) - assert.Equal(t, errInvalid, errors.Cause(err)) - - snap, err = cm.Get(ctx, snap.ID()) - assert.NoError(t, err) - - snap2, err := cm.Get(ctx, snap.ID()) - assert.NoError(t, err) - - checkDiskUsage(t, ctx, cm, 1, 0) - - err = snap.Release(ctx) - assert.NoError(t, err) - - active2, err := cm.New(ctx, snap2) - assert.NoError(t, err) - - checkDiskUsage(t, ctx, cm, 2, 0) - - snap3, err := active2.Freeze() - assert.NoError(t, err) - - err = snap2.Release(ctx) - assert.NoError(t, err) - - checkDiskUsage(t, ctx, cm, 2, 0) - - err = snap3.Release(ctx) - assert.NoError(t, err) - - checkDiskUsage(t, ctx, cm, 0, 2) - - err = cm.Close() - assert.NoError(t, err) + require.NoError(t, err, fmt.Sprintf("error: %+v", err)) + return cm } func checkDiskUsage(t *testing.T, ctx context.Context, cm Manager, inuse, unused int) { du, err := cm.DiskUsage(ctx) - assert.NoError(t, err) + require.NoError(t, err) var inuseActual, unusedActual int for _, r := range du { if r.InUse { @@ -141,6 +269,6 @@ func checkDiskUsage(t *testing.T, ctx context.Context, cm Manager, inuse, unused unusedActual++ } } - assert.Equal(t, inuse, inuseActual) - assert.Equal(t, unused, unusedActual) + require.Equal(t, inuse, inuseActual) + require.Equal(t, unused, unusedActual) } diff --git a/cache/metadata.go b/cache/metadata.go index 5691daaa..71e817c4 100644 --- a/cache/metadata.go +++ b/cache/metadata.go @@ -15,6 +15,8 @@ import ( const sizeUnknown int64 = -1 const keySize = "snapshot.size" +const keyEqualMutable = "cache.equalMutable" +const keyEqualImmutable = "cache.equalImmutable" func setSize(si *metadata.StorageItem, s int64) error { v, err := metadata.NewValue(s) @@ -22,7 +24,7 @@ func setSize(si *metadata.StorageItem, s int64) error { return errors.Wrap(err, "failed to create size value") } si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keySize, *v) + return si.SetValue(b, keySize, v) }) return nil } @@ -38,3 +40,33 @@ func getSize(si *metadata.StorageItem) int64 { } return size } + +func getEqualMutable(si *metadata.StorageItem) string { + v := si.Get(keyEqualMutable) + if v == nil { + return "" + } + var str string + if err := v.Unmarshal(&str); err != nil { + return "" + } + return str +} + +func setEqualMutable(si *metadata.StorageItem, s string) error { + v, err := metadata.NewValue(s) + if err != nil { + return errors.Wrapf(err, "failed to create %s meta value", keyEqualMutable) + } + si.Queue(func(b *bolt.Bucket) error { + return si.SetValue(b, keyEqualMutable, v) + }) + return nil +} + +func clearEqualMutable(si *metadata.StorageItem) error { + si.Queue(func(b *bolt.Bucket) error { + return si.SetValue(b, keyEqualMutable, nil) + }) + return nil +} diff --git a/cache/metadata/metadata.go b/cache/metadata/metadata.go index 5035b7b2..7d4cc7ac 100644 --- a/cache/metadata/metadata.go +++ b/cache/metadata/metadata.go @@ -186,10 +186,12 @@ func newStorageItem(id string, b *bolt.Bucket, s *Store) (StorageItem, error) { if b != nil { if err := b.ForEach(func(k, v []byte) error { var sv Value - if err := json.Unmarshal(v, &sv); err != nil { - return err + if len(v) > 0 { + if err := json.Unmarshal(v, &sv); err != nil { + return err + } + si.values[string(k)] = &sv } - si.values[string(k)] = &sv return nil }); err != nil { return si, err @@ -247,7 +249,14 @@ func (s *StorageItem) Indexes() (out []string) { return } -func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v Value) error { +func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error { + if v == nil { + if err := b.Put([]byte(key), nil); err != nil { + return err + } + delete(s.values, key) + return nil + } dt, err := json.Marshal(v) if err != nil { return err @@ -264,7 +273,7 @@ func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v Value) error { return err } } - s.values[key] = &v + s.values[key] = v return nil } diff --git a/cache/metadata/metadata_test.go b/cache/metadata/metadata_test.go index fec67813..69c76fce 100644 --- a/cache/metadata/metadata_test.go +++ b/cache/metadata/metadata_test.go @@ -31,7 +31,7 @@ func TestGetSetSearch(t *testing.T) { require.NoError(t, err) si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, "bar", *v) + return si.SetValue(b, "bar", v) }) err = si.Commit() @@ -72,7 +72,7 @@ func TestGetSetSearch(t *testing.T) { require.NoError(t, err) si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, "bar2", *v) + return si.SetValue(b, "bar2", v) }) err = si.Commit() @@ -135,7 +135,7 @@ func TestIndexes(t *testing.T) { v.Index = tcase.index si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, tcase.value, *v) + return si.SetValue(b, tcase.value, v) }) err = si.Commit() diff --git a/cache/refs.go b/cache/refs.go index 67ac499f..263243d7 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -17,14 +17,15 @@ type ImmutableRef interface { Release(context.Context) error Size(ctx context.Context) (int64, error) Parent() ImmutableRef + Finalize(ctx context.Context) error // Make sure reference is flushed to driver // Prepare() / ChainID() / Meta() } type MutableRef interface { Mountable ID() string - Freeze() (ImmutableRef, error) - ReleaseAndCommit(ctx context.Context) (ImmutableRef, error) + Commit(context.Context) (ImmutableRef, error) + Release(context.Context) error Size(ctx context.Context) (int64, error) } @@ -33,10 +34,8 @@ type Mountable interface { } type cacheRecord struct { - mu sync.Mutex - mutable bool - frozen bool - // meta SnapMeta + mu sync.Mutex + mutable bool refs map[Mountable]struct{} cm *cacheManager parent ImmutableRef @@ -46,6 +45,10 @@ type cacheRecord struct { sizeG flightcontrol.Group // size int64 + + // these are filled if multiple refs point to same data + equalMutable *mutableRef + equalImmutable *immutableRef } // hold manager lock before calling @@ -67,11 +70,16 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) { cr.mu.Lock() s := getSize(cr.md) - cr.mu.Unlock() if s != sizeUnknown { + cr.mu.Unlock() return s, nil } - usage, err := cr.cm.ManagerOpt.Snapshotter.Usage(ctx, cr.ID()) + driverID := cr.ID() + if cr.equalMutable != nil { + driverID = cr.equalMutable.ID() + } + cr.mu.Unlock() + usage, err := cr.cm.ManagerOpt.Snapshotter.Usage(ctx, driverID) if err != nil { return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID()) } @@ -104,6 +112,9 @@ func (cr *cacheRecord) Mount(ctx context.Context) ([]mount.Mount, error) { } return m, nil } + if err := cr.finalize(ctx); err != nil { + return nil, err + } if cr.viewMount == nil { // TODO: handle this better cr.view = identity.NewID() m, err := cr.cm.Snapshotter.View(ctx, cr.view, cr.ID()) @@ -116,6 +127,18 @@ func (cr *cacheRecord) Mount(ctx context.Context) ([]mount.Mount, error) { return cr.viewMount, nil } +func (cr *cacheRecord) remove(ctx context.Context, removeSnapshot bool) error { + if err := cr.cm.md.Clear(cr.ID()); err != nil { + return err + } + if removeSnapshot { + if err := cr.cm.Snapshotter.Remove(ctx, cr.ID()); err != nil { + return err + } + } + return nil +} + func (cr *cacheRecord) ID() string { return cr.md.ID() } @@ -148,82 +171,100 @@ func (sr *immutableRef) release(ctx context.Context) error { } delete(sr.refs, sr) - sr.frozen = false if len(sr.refs) == 0 { - //go sr.cm.GC() + if sr.equalMutable != nil { + sr.equalMutable.release(ctx) + } + // go sr.cm.GC() } return nil } -func (sr *mutableRef) Freeze() (ImmutableRef, error) { +func (sr *immutableRef) Finalize(ctx context.Context) error { + sr.mu.Lock() + defer sr.mu.Unlock() + + return sr.finalize(ctx) +} + +func (sr *cacheRecord) finalize(ctx context.Context) error { + mutable := sr.equalMutable + if mutable == nil { + return nil + } + err := sr.cm.Snapshotter.Commit(ctx, sr.ID(), sr.equalMutable.ID()) + if err != nil { + return errors.Wrapf(err, "failed to commit %s", sr.equalMutable.ID()) + } + delete(sr.cm.records, sr.equalMutable.ID()) + if err := sr.equalMutable.remove(ctx, false); err != nil { + return err + } + sr.equalMutable = nil + clearEqualMutable(sr.md) + return sr.md.Commit() +} + +func (sr *mutableRef) commit(ctx context.Context) (ImmutableRef, error) { + if !sr.mutable || len(sr.refs) == 0 { + return nil, errors.Wrapf(errInvalid, "invalid mutable") + } + + id := identity.NewID() + md, _ := sr.cm.md.Get(id) + + rec := &cacheRecord{ + cm: sr.cm, + parent: sr.parent, + equalMutable: sr, + refs: make(map[Mountable]struct{}), + md: &md, + } + + sr.cm.records[id] = rec + + if err := sr.md.Commit(); err != nil { + return nil, err + } + + setSize(&md, sizeUnknown) + setEqualMutable(&md, sr.ID()) + if err := md.Commit(); err != nil { + return nil, err + } + + ref := rec.ref() + sr.equalImmutable = ref + return ref, nil +} + +func (sr *mutableRef) Commit(ctx context.Context) (ImmutableRef, error) { sr.cm.mu.Lock() defer sr.cm.mu.Unlock() sr.mu.Lock() defer sr.mu.Unlock() - if !sr.mutable || sr.frozen || len(sr.refs) != 1 { - return nil, errors.Wrapf(errInvalid, "invalid mutable") - } - - if _, ok := sr.refs[sr]; !ok { - return nil, errors.Wrapf(errInvalid, "invalid mutable") - } - - delete(sr.refs, sr) - - sri := sr.ref() - - sri.frozen = true - setSize(sr.md, sizeUnknown) - if err := sr.md.Commit(); err != nil { - return nil, err - } - - return sri, nil + return sr.commit(ctx) } -func (sr *mutableRef) ReleaseAndCommit(ctx context.Context) (ImmutableRef, error) { +func (sr *mutableRef) Release(ctx context.Context) error { sr.cm.mu.Lock() defer sr.cm.mu.Unlock() sr.mu.Lock() + defer sr.mu.Unlock() - if !sr.mutable || sr.frozen { - sr.mu.Unlock() - return nil, errors.Wrapf(errInvalid, "invalid mutable") - } - if len(sr.refs) != 1 { - sr.mu.Unlock() - return nil, errors.Wrapf(errInvalid, "multiple mutable references") - } - - sr.mu.Unlock() - - id := identity.NewID() - - err := sr.cm.Snapshotter.Commit(ctx, id, sr.ID()) - if err != nil { - return nil, errors.Wrapf(err, "failed to commit %s", sr.ID()) - } - - delete(sr.cm.records, sr.ID()) - - if err := sr.cm.md.Clear(sr.ID()); err != nil { - return nil, err - } - - md, _ := sr.cm.md.Get(id) - - rec := &cacheRecord{ - cm: sr.cm, - parent: sr.parent, - refs: make(map[Mountable]struct{}), - md: &md, - } - sr.cm.records[id] = rec // TODO: save to db - - return rec.ref(), nil + return sr.release(ctx) +} + +func (sr *mutableRef) release(ctx context.Context) error { + delete(sr.refs, sr) + // delete(sr.cm.records, sr.ID()) + // if err := sr.remove(ctx, true); err != nil { + // return err + // } + return nil } diff --git a/client/llb/llb.go b/client/llb/llb.go index aa1736b0..28e46e11 100644 --- a/client/llb/llb.go +++ b/client/llb/llb.go @@ -153,7 +153,7 @@ func (eo *exec) marshalTo(list [][]byte, cache map[digest.Digest]struct{}) (dige } } if dgst == "" { - inputIndex = -1 + inputIndex = pb.Empty } if inputIndex == len(pop.Inputs) { var mountIndex int64 @@ -174,7 +174,7 @@ func (eo *exec) marshalTo(list [][]byte, cache map[digest.Digest]struct{}) (dige pm.Output = outputIndex outputIndex++ } else { - pm.Output = -1 + pm.Output = pb.SkipOutput } m.outputIndex = outputIndex - 1 peo.Mounts = append(peo.Mounts, pm) diff --git a/control/control_default.go b/control/control_default.go index ab21bfaf..9b689be4 100644 --- a/control/control_default.go +++ b/control/control_default.go @@ -95,6 +95,7 @@ func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) { ss, err := local.NewSource(local.Opt{ SessionManager: sessm, CacheAccessor: cm, + MetadataStore: md, }) if err != nil { return nil, err diff --git a/control/control_standalone_test.go b/control/control_standalone_test.go index 7eaf4775..7f0be46c 100644 --- a/control/control_standalone_test.go +++ b/control/control_standalone_test.go @@ -124,7 +124,7 @@ func TestControl(t *testing.T) { err = w.Exec(ctx, meta, root, nil, nil, nil) assert.NoError(t, err) - rf, err := root.Freeze() + rf, err := root.Commit(ctx) assert.NoError(t, err) mounts, err = rf.Mount(ctx) diff --git a/snapshot/blobmapping/snapshotter.go b/snapshot/blobmapping/snapshotter.go index 79158a9a..05c3541a 100644 --- a/snapshot/blobmapping/snapshotter.go +++ b/snapshot/blobmapping/snapshotter.go @@ -114,7 +114,7 @@ func (s *Snapshotter) SetBlob(ctx context.Context, key string, blob digest.Diges v.Index = index(blob) return md.Update(func(b *bolt.Bucket) error { - return md.SetValue(b, blobKey, *v) + return md.SetValue(b, blobKey, v) }) } diff --git a/snapshot/localmounter.go b/snapshot/localmounter.go index c4372485..7d42c03d 100644 --- a/snapshot/localmounter.go +++ b/snapshot/localmounter.go @@ -31,7 +31,16 @@ func (lm *localMounter) Mount() (string, error) { defer lm.mu.Unlock() if len(lm.m) == 1 && lm.m[0].Type == "bind" { - return lm.m[0].Source, nil + ro := false + for _, opt := range lm.m[0].Options { + if opt == "ro" { + ro = true + break + } + } + if !ro { + return lm.m[0].Source, nil + } } dir, err := ioutil.TempDir("", "buildkit-mount") diff --git a/solver/exec.go b/solver/exec.go index 7df8db2a..b7ceef56 100644 --- a/solver/exec.go +++ b/solver/exec.go @@ -27,7 +27,7 @@ func newExecOp(op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) { }, nil } -func (e *execOp) CacheKey(ctx context.Context, inputs []string) (string, error) { +func (e *execOp) CacheKey(ctx context.Context, inputs []string) (string, int, error) { dt, err := json.Marshal(struct { Inputs []string Exec *pb.ExecOp @@ -36,9 +36,16 @@ func (e *execOp) CacheKey(ctx context.Context, inputs []string) (string, error) Exec: e.op, }) if err != nil { - return "", err + return "", 0, err } - return digest.FromBytes(dt).String(), nil + numRefs := 0 + for _, m := range e.op.Mounts { + if m.Output != pb.SkipOutput { + numRefs++ + } + } + + return digest.FromBytes(dt).String(), numRefs, nil } func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) { @@ -49,10 +56,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro defer func() { for _, o := range outputs { if o != nil { - s, err := o.Freeze() // TODO: log error - if err == nil { - go s.Release(ctx) - } + go o.Release(ctx) } } }() @@ -60,7 +64,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro for _, m := range e.op.Mounts { var mountable cache.Mountable var ref cache.ImmutableRef - if m.Input != -1 { + if m.Input != pb.Empty { if int(m.Input) > len(inputs) { return nil, errors.Errorf("missing input %d", m.Input) } @@ -72,7 +76,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro } mountable = ref } - if m.Output != -1 { + if m.Output != pb.SkipOutput { active, err := e.cm.New(ctx, ref) // TODO: should be method if err != nil { return nil, err @@ -107,7 +111,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro refs := []Reference{} for i, o := range outputs { - ref, err := o.ReleaseAndCommit(ctx) + ref, err := o.Commit(ctx) if err != nil { return nil, errors.Wrapf(err, "error committing %s", o.ID()) } diff --git a/solver/pb/const.go b/solver/pb/const.go index 71b972b4..2000ce0e 100644 --- a/solver/pb/const.go +++ b/solver/pb/const.go @@ -1,3 +1,5 @@ package pb const RootMount = "/" +const SkipOutput = -1 // TODO: custom type +const Empty = -1 // TODO: custom type diff --git a/solver/solver.go b/solver/solver.go index d03e0d46..ad2e414e 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -1,6 +1,8 @@ package solver import ( + "fmt" + "github.com/Sirupsen/logrus" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/client" @@ -44,13 +46,13 @@ type Reference interface { // Op is an implementation for running a vertex type Op interface { - CacheKey(context.Context, []string) (string, error) + CacheKey(context.Context, []string) (string, int, error) Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error) } type InstructionCache interface { - Lookup(ctx context.Context, key string) ([]interface{}, error) // TODO: regular ref - Set(key string, refs []interface{}) error + Lookup(ctx context.Context, key string) (interface{}, error) // TODO: regular ref + Set(key string, ref interface{}) error } type Solver struct { @@ -104,6 +106,16 @@ func (s *Solver) Solve(ctx context.Context, id string, v Vertex, exp exporter.Ex } }() + for _, ref := range refs { + immutable, ok := toImmutableRef(ref) + if !ok { + return errors.Errorf("invalid reference for exporting: %T", ref) + } + if err := immutable.Finalize(ctx); err != nil { + return err + } + } + if exp != nil { immutable, ok := toImmutableRef(refs[0]) if !ok { @@ -118,7 +130,6 @@ func (s *Solver) Solve(ctx context.Context, id string, v Vertex, exp exporter.Ex return err } } - return err } @@ -131,31 +142,31 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client. return j.pipe(ctx, statusChan) } -func (s *Solver) getCacheKey(ctx context.Context, j *job, g *vertex) (cacheKey string, retErr error) { +func (s *Solver) getCacheKey(ctx context.Context, j *job, g *vertex) (cacheKey string, numRefs int, retErr error) { state, err := s.activeState.vertexState(j, g.digest, func() (Op, error) { return s.resolve(g) }) if err != nil { - return "", err + return "", 0, err } inputs := make([]string, len(g.inputs)) if len(g.inputs) > 0 { eg, ctx := errgroup.WithContext(ctx) for i, in := range g.inputs { - func(i int, in *vertex) { + func(i int, in *vertex, index int) { eg.Go(func() error { - k, err := s.getCacheKey(ctx, j, in) + k, _, err := s.getCacheKey(ctx, j, in) if err != nil { return err } - inputs[i] = k + inputs[i] = fmt.Sprintf("%s.%d", k, index) return nil }) - }(i, in.vertex) + }(i, in.vertex, in.index) } if err := eg.Wait(); err != nil { - return "", err + return "", 0, err } } @@ -169,7 +180,7 @@ func (s *Solver) getCacheKey(ctx context.Context, j *job, g *vertex) (cacheKey s }() } - return state.GetCacheKey(ctx, func(ctx context.Context, op Op) (string, error) { + return state.GetCacheKey(ctx, func(ctx context.Context, op Op) (string, int, error) { return op.CacheKey(ctx, inputs) }) } @@ -185,21 +196,29 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer var cacheKey string if s.cache != nil { var err error - cacheKey, err = s.getCacheKey(ctx, j, g) + var numRefs int + cacheKey, numRefs, err = s.getCacheKey(ctx, j, g) if err != nil { return nil, err } - cacheRefsAny, err := s.cache.Lookup(ctx, cacheKey) - if err != nil { - return nil, err - } - if len(cacheRefsAny) > 0 { - cacheRefs, err := toReferenceArray(cacheRefsAny) + cacheRefs := make([]Reference, 0, numRefs) + // check if all current refs are already cached + for i := 0; i < numRefs; i++ { + ref, err := s.cache.Lookup(ctx, fmt.Sprintf("%s.%d", cacheKey, i)) if err != nil { return nil, err } - g.recursiveMarkCached(ctx) - return cacheRefs, nil + if ref == nil { // didn't find ref, release all + for _, ref := range cacheRefs { + ref.Release(context.TODO()) + } + break + } + cacheRefs = append(cacheRefs, ref.(Reference)) + if len(cacheRefs) == numRefs { // last item + g.recursiveMarkCached(ctx) + return cacheRefs, nil + } } } @@ -208,8 +227,28 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer if len(g.inputs) > 0 { eg, ctx := errgroup.WithContext(ctx) for i, in := range g.inputs { - func(i int, in *vertex) { + func(i int, in *vertex, index int) { eg.Go(func() error { + if s.cache != nil { + k, numRefs, err := s.getCacheKey(ctx, j, in) + if err != nil { + return err + } + ref, err := s.cache.Lookup(ctx, fmt.Sprintf("%s.%d", k, index)) + if err != nil { + return err + } + if ref != nil { + if ref, ok := toImmutableRef(ref.(Reference)); ok { + refs[i] = make([]*sharedRef, numRefs) + refs[i][index] = newSharedRef(ref) + in.recursiveMarkCached(ctx) + return nil + } + } + } + + // execute input vertex r, err := s.getRefs(ctx, j, in) if err != nil { return err @@ -219,13 +258,15 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer } return nil }) - }(i, in.vertex) + }(i, in.vertex, in.index) } err := eg.Wait() if err != nil { for _, r := range refs { for _, r := range r { - go r.Release(context.TODO()) + if r != nil { + go r.Release(context.TODO()) + } } } return nil, err @@ -247,7 +288,9 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer // release anything else for _, r := range refs { for _, r := range r { - go r.Release(context.TODO()) + if r != nil { + go r.Release(context.TODO()) + } } } @@ -265,34 +308,18 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer return nil, err } if s.cache != nil { - if err := s.cache.Set(cacheKey, toAny(refs)); err != nil { - logrus.Errorf("failed to save cache for %s: %v", cacheKey, err) + for i, ref := range refs { + if ref != nil { + if err := s.cache.Set(fmt.Sprintf("%s.%d", cacheKey, i), ref); err != nil { + logrus.Errorf("failed to save cache for %s: %v", cacheKey, err) + } + } } } return refs, nil }) } -func toReferenceArray(in []interface{}) ([]Reference, error) { - out := make([]Reference, 0, len(in)) - for _, i := range in { - r, ok := i.(Reference) - if !ok { - return nil, errors.Errorf("invalid reference") - } - out = append(out, r) - } - return out, nil -} - -func toAny(in []Reference) []interface{} { - out := make([]interface{}, 0, len(in)) - for _, i := range in { - out = append(out, i) - } - return out -} - func toImmutableRef(ref Reference) (cache.ImmutableRef, bool) { sysRef := ref if sys, ok := ref.(interface { diff --git a/solver/source.go b/solver/source.go index e551bf70..52d90adf 100644 --- a/solver/source.go +++ b/solver/source.go @@ -58,12 +58,13 @@ func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error) return s.src, nil } -func (s *sourceOp) CacheKey(ctx context.Context, _ []string) (string, error) { +func (s *sourceOp) CacheKey(ctx context.Context, _ []string) (string, int, error) { src, err := s.instance(ctx) if err != nil { - return "", err + return "", 0, err } - return src.CacheKey(ctx) + k, err := src.CacheKey(ctx) + return k, 1, err } func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) { diff --git a/solver/state.go b/solver/state.go index 5273569d..080ebe4f 100644 --- a/solver/state.go +++ b/solver/state.go @@ -24,6 +24,7 @@ type state struct { jobs map[*job]struct{} refs []*sharedRef cacheKey string + numRefs int op Op progressCtx context.Context cacheCtx context.Context @@ -96,7 +97,7 @@ func (s *state) GetRefs(ctx context.Context, cb func(context.Context, Op) ([]Ref return refs, nil } -func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (string, error)) (string, error) { +func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (string, int, error)) (string, int, error) { _, err := s.Do(ctx, "cache:"+s.key.String(), func(doctx context.Context) (interface{}, error) { if s.cacheKey != "" { if err := writeProgressSnapshot(s.cacheCtx, ctx); err != nil { @@ -104,18 +105,19 @@ func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (s } return nil, nil } - cacheKey, err := cb(doctx, s.op) + cacheKey, numRefs, err := cb(doctx, s.op) if err != nil { return nil, err } s.cacheKey = cacheKey + s.numRefs = numRefs s.cacheCtx = doctx return nil, nil }) if err != nil { - return "", err + return "", 0, err } - return s.cacheKey, nil + return s.cacheKey, s.numRefs, nil } func writeProgressSnapshot(srcCtx, destCtx context.Context) error { diff --git a/source/git/gitsource.go b/source/git/gitsource.go index 1ed72b3d..74382686 100644 --- a/source/git/gitsource.go +++ b/source/git/gitsource.go @@ -86,10 +86,7 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string) (target str } releaseRemoteRef := func() { - s, err := remoteRef.Freeze() // TODO: remove this - if err == nil { - s.Release(context.TODO()) - } + remoteRef.Release(context.TODO()) } defer func() { @@ -133,7 +130,7 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string) (target str } if err := si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, "git-remote", *v) + return si.SetValue(b, "git-remote", v) }); err != nil { return "", nil, err } @@ -270,10 +267,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe defer func() { if retErr != nil && checkoutRef != nil { - s, err := checkoutRef.Freeze() // TODO: remove this - if err != nil { - s.Release(context.TODO()) - } + checkoutRef.Release(context.TODO()) } }() @@ -326,7 +320,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe lm.Unmount() lm = nil - snap, err := checkoutRef.ReleaseAndCommit(ctx) + snap, err := checkoutRef.Commit(ctx) if err != nil { return nil, err } @@ -345,7 +339,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe return nil, err } if err := si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, "git-snapshot", *v) + return si.SetValue(b, "git-snapshot", v) }); err != nil { return nil, err } diff --git a/source/local/local.go b/source/local/local.go index 8eddf0b5..bded3fad 100644 --- a/source/local/local.go +++ b/source/local/local.go @@ -3,7 +3,10 @@ package local import ( "time" + "github.com/Sirupsen/logrus" + "github.com/boltdb/bolt" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/session" "github.com/moby/buildkit/session/filesync" "github.com/moby/buildkit/snapshot" @@ -12,15 +15,19 @@ import ( "golang.org/x/net/context" ) +const keySharedKey = "local.sharedKey" + type Opt struct { SessionManager *session.Manager CacheAccessor cache.Accessor + MetadataStore *metadata.Store } func NewSource(opt Opt) (source.Source, error) { ls := &localSource{ sm: opt.SessionManager, cm: opt.CacheAccessor, + md: opt.MetadataStore, } return ls, nil } @@ -28,6 +35,7 @@ func NewSource(opt Opt) (source.Source, error) { type localSource struct { sm *session.Manager cm cache.Accessor + md *metadata.Store } func (ls *localSource) ID() string { @@ -80,17 +88,33 @@ func (ls *localSourceHandler) Snapshot(ctx context.Context) (out cache.Immutable return nil, err } - mutable, err := ls.cm.New(ctx, nil) + sharedKey := keySharedKey + ":" + ls.src.Name + ":" + caller.SharedKey() + + var mutable cache.MutableRef + sis, err := ls.md.Search(sharedKey) if err != nil { return nil, err } + for _, si := range sis { + if m, err := ls.cm.GetMutable(ctx, si.ID()); err == nil { + logrus.Debugf("reusing ref for local: %s", m.ID()) + mutable = m + break + } + } + + if mutable == nil { + m, err := ls.cm.New(ctx, nil) + if err != nil { + return nil, err + } + mutable = m + logrus.Debugf("new ref for local: %s", mutable.ID()) + } defer func() { if retErr != nil && mutable != nil { - s, err := mutable.Freeze() - if err == nil { - go s.Release(context.TODO()) - } + go mutable.Release(context.TODO()) } }() @@ -128,11 +152,34 @@ func (ls *localSourceHandler) Snapshot(ctx context.Context) (out cache.Immutable } lm = nil - snap, err := mutable.ReleaseAndCommit(ctx) + skipStoreSharedKey := false + si, _ := ls.md.Get(mutable.ID()) + if v := si.Get(keySharedKey); v != nil { + var str string + if err := v.Unmarshal(&str); err != nil { + return nil, err + } + skipStoreSharedKey = str == sharedKey + } + if !skipStoreSharedKey { + v, err := metadata.NewValue(sharedKey) + if err != nil { + return nil, err + } + v.Index = sharedKey + if err := si.Update(func(b *bolt.Bucket) error { + return si.SetValue(b, sharedKey, v) + }); err != nil { + return nil, err + } + logrus.Debugf("saved %s as %s", mutable.ID(), sharedKey) + } + + snap, err := mutable.Commit(ctx) if err != nil { return nil, err } mutable = nil - return snap, err + return snap, nil }