cache: refactor reference reuse and caching

Replaces previous mutable.Freeze logic with
commits that can live together with mutable data.
Finalize method is added if the implementation
needs to make sure that the immutable ref is
flushed to the driver. Refs are automaitcally
finalized when writable layers are created on
top of them.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2017-07-14 11:59:31 -07:00
parent 2e48745f57
commit eac79f7c7e
19 changed files with 635 additions and 359 deletions

View File

@ -1,6 +1,7 @@
package instructioncache
import (
"github.com/Sirupsen/logrus"
"github.com/boltdb/bolt"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
@ -10,103 +11,51 @@ import (
const cacheKey = "buildkit.instructioncache"
type cacheGroup struct {
Snapshots []string `json:"snapshots"`
}
type LocalStore struct {
MetadataStore *metadata.Store
Cache cache.Accessor
}
func (ls *LocalStore) Set(key string, refsAny []interface{}) error {
refs, err := toReferenceArray(refsAny)
if err != nil {
return err
func (ls *LocalStore) Set(key string, value interface{}) error {
ref, ok := value.(cache.ImmutableRef)
if !ok {
return errors.Errorf("invalid ref")
}
cg := cacheGroup{}
for _, r := range refs {
cg.Snapshots = append(cg.Snapshots, r.ID())
}
v, err := metadata.NewValue(cg)
v, err := metadata.NewValue(ref.ID())
if err != nil {
return err
}
v.Index = index(key)
for _, r := range refs {
si, _ := ls.MetadataStore.Get(r.ID())
if err := si.Update(func(b *bolt.Bucket) error { // TODO: should share transaction
return si.SetValue(b, index(key), *v)
}); err != nil {
return err
}
}
return nil
si, _ := ls.MetadataStore.Get(ref.ID())
return si.Update(func(b *bolt.Bucket) error {
return si.SetValue(b, index(key), v)
})
}
func (ls *LocalStore) Lookup(ctx context.Context, key string) ([]interface{}, error) {
func (ls *LocalStore) Lookup(ctx context.Context, key string) (interface{}, error) {
snaps, err := ls.MetadataStore.Search(index(key))
if err != nil {
return nil, err
}
refs := make([]cache.ImmutableRef, 0)
var retErr error
loop0:
for _, s := range snaps {
retErr = nil
for _, r := range refs {
r.Release(context.TODO())
}
refs = nil
for _, s := range snaps {
v := s.Get(index(key))
if v != nil {
var cg cacheGroup
if err = v.Unmarshal(&cg); err != nil {
retErr = err
var id string
if err = v.Unmarshal(&id); err != nil {
continue
}
for _, id := range cg.Snapshots {
r, err := ls.Cache.Get(ctx, id)
if err != nil {
retErr = err
continue loop0
}
refs = append(refs, r)
r, err := ls.Cache.Get(ctx, id)
if err != nil {
logrus.Warnf("failed to get cached snapshot %s: %v", id, err)
continue
}
retErr = nil
break
return r, nil
}
}
if retErr != nil {
for _, r := range refs {
r.Release(context.TODO())
}
refs = nil
}
return toAny(refs), retErr
return nil, nil
}
func index(k string) string {
return cacheKey + "::" + k
}
func toReferenceArray(in []interface{}) ([]cache.ImmutableRef, error) {
out := make([]cache.ImmutableRef, 0, len(in))
for _, i := range in {
r, ok := i.(cache.ImmutableRef)
if !ok {
return nil, errors.Errorf("invalid reference")
}
out = append(out, r)
}
return out, nil
}
func toAny(in []cache.ImmutableRef) []interface{} {
out := make([]interface{}, 0, len(in))
for _, i := range in {
out = append(out, i)
}
return out
}

76
cache/manager.go vendored
View File

@ -102,15 +102,10 @@ func (cm *cacheManager) get(ctx context.Context, id string) (ImmutableRef, error
defer rec.mu.Unlock()
if rec.mutable {
return nil, errors.Wrapf(errInvalid, "invalid mutable ref %s", id)
}
if rec.mutable && !rec.frozen {
if len(rec.refs) != 0 {
return nil, errors.Wrapf(errLocked, "%s is locked", id)
} else {
rec.frozen = true
}
return rec.mref().commit(ctx)
}
return rec.ref(), nil
@ -121,6 +116,24 @@ func (cm *cacheManager) load(ctx context.Context, id string) (*cacheRecord, erro
return rec, nil
}
md, _ := cm.md.Get(id)
if mutableID := getEqualMutable(&md); mutableID != "" {
mutable, err := cm.load(ctx, mutableID)
if err != nil {
return nil, err
}
rec := &cacheRecord{
cm: cm,
refs: make(map[Mountable]struct{}),
parent: mutable.parent,
md: &md,
equalMutable: &mutableRef{cacheRecord: mutable},
}
mutable.equalImmutable = &immutableRef{cacheRecord: rec}
cm.records[id] = rec
return rec, nil
}
info, err := cm.Snapshotter.Stat(ctx, id)
if err != nil {
return nil, errors.Wrap(errNotFound, err.Error())
@ -134,8 +147,6 @@ func (cm *cacheManager) load(ctx context.Context, id string) (*cacheRecord, erro
}
}
md, _ := cm.md.Get(id)
rec := &cacheRecord{
mutable: info.Kind != cdsnapshot.KindCommitted,
cm: cm,
@ -143,7 +154,7 @@ func (cm *cacheManager) load(ctx context.Context, id string) (*cacheRecord, erro
parent: parent,
md: &md,
}
cm.records[id] = rec // TODO: store to db
cm.records[id] = rec
return rec, nil
}
@ -158,6 +169,9 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef) (MutableRef, er
if err != nil {
return nil, err
}
if err := parent.Finalize(ctx); err != nil {
return nil, err
}
parentID = parent.ID()
}
@ -185,7 +199,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef) (MutableRef, er
return rec.mref(), nil
}
func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, error) { // Rebase?
func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, error) {
cm.mu.Lock()
defer cm.mu.Unlock()
@ -200,10 +214,18 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef,
return nil, errors.Wrapf(errInvalid, "%s is not mutable", id)
}
if rec.frozen || len(rec.refs) != 0 {
if len(rec.refs) != 0 {
return nil, errors.Wrapf(errLocked, "%s is locked", id)
}
if rec.equalImmutable != nil {
delete(cm.records, rec.equalImmutable.ID())
if err := rec.equalImmutable.remove(ctx, false); err != nil {
return nil, err
}
rec.equalImmutable = nil
}
return rec.mref(), nil
}
@ -222,6 +244,11 @@ func (cm *cacheManager) DiskUsage(ctx context.Context) ([]*client.UsageInfo, err
for id, cr := range cm.records {
cr.mu.Lock()
// ignore duplicates that share data
if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 {
cr.mu.Unlock()
continue
}
c := &cacheUsageInfo{
refs: len(cr.refs),
mutable: cr.mutable,
@ -230,12 +257,12 @@ func (cm *cacheManager) DiskUsage(ctx context.Context) ([]*client.UsageInfo, err
if cr.parent != nil {
c.parent = cr.parent.ID()
}
if cr.mutable && c.refs > 0 && !cr.frozen {
if cr.mutable && c.refs > 0 {
c.size = 0 // size can not be determined because it is changing
}
m[id] = c
cr.mu.Unlock()
rescan[id] = struct{}{}
cr.mu.Unlock()
}
cm.mu.Unlock()
@ -270,17 +297,20 @@ func (cm *cacheManager) DiskUsage(ctx context.Context) ([]*client.UsageInfo, err
if d.Size == sizeUnknown {
func(d *client.UsageInfo) {
eg.Go(func() error {
ref, err := cm.Get(ctx, d.ID)
if err != nil {
d.Size = 0
return nil
if !d.Mutable {
ref, err := cm.Get(ctx, d.ID)
if err != nil {
d.Size = 0
return nil
}
s, err := ref.Size(ctx)
if err != nil {
return err
}
d.Size = s
return ref.Release(context.TODO())
}
s, err := ref.Size(ctx)
if err != nil {
return err
}
d.Size = s
return ref.Release(context.TODO())
return nil
})
}(d)
}

334
cache/manager_test.go vendored
View File

@ -2,6 +2,7 @@ package cache
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@ -12,127 +13,254 @@ import (
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/snapshot"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestManager(t *testing.T) {
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
assert.NoError(t, err)
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
cm := getCacheManager(t, tmpdir)
_, err = cm.Get(ctx, "foobar")
require.Error(t, err)
checkDiskUsage(t, ctx, cm, 0, 0)
active, err := cm.New(ctx, nil)
require.NoError(t, err)
m, err := active.Mount(ctx)
require.NoError(t, err)
lm := snapshot.LocalMounter(m)
target, err := lm.Mount()
require.NoError(t, err)
fi, err := os.Stat(target)
require.NoError(t, err)
require.Equal(t, fi.IsDir(), true)
err = lm.Unmount()
require.NoError(t, err)
_, err = cm.GetMutable(ctx, active.ID())
require.Error(t, err)
require.Equal(t, errLocked, errors.Cause(err))
checkDiskUsage(t, ctx, cm, 1, 0)
snap, err := active.Commit(ctx)
require.NoError(t, err)
checkDiskUsage(t, ctx, cm, 1, 0)
_, err = cm.GetMutable(ctx, active.ID())
require.Error(t, err)
require.Equal(t, errLocked, errors.Cause(err))
err = snap.Release(ctx)
require.NoError(t, err)
checkDiskUsage(t, ctx, cm, 0, 1)
active, err = cm.GetMutable(ctx, active.ID())
require.NoError(t, err)
checkDiskUsage(t, ctx, cm, 1, 0)
snap, err = active.Commit(ctx)
require.NoError(t, err)
checkDiskUsage(t, ctx, cm, 1, 0)
err = snap.Finalize(ctx)
require.NoError(t, err)
err = snap.Release(ctx)
require.NoError(t, err)
_, err = cm.GetMutable(ctx, active.ID())
require.Error(t, err)
require.Equal(t, errNotFound, errors.Cause(err))
_, err = cm.GetMutable(ctx, snap.ID())
require.Error(t, err)
require.Equal(t, errInvalid, errors.Cause(err))
snap, err = cm.Get(ctx, snap.ID())
require.NoError(t, err)
snap2, err := cm.Get(ctx, snap.ID())
require.NoError(t, err)
checkDiskUsage(t, ctx, cm, 1, 0)
err = snap.Release(ctx)
require.NoError(t, err)
active2, err := cm.New(ctx, snap2)
require.NoError(t, err)
checkDiskUsage(t, ctx, cm, 2, 0)
snap3, err := active2.Commit(ctx)
require.NoError(t, err)
err = snap2.Release(ctx)
require.NoError(t, err)
checkDiskUsage(t, ctx, cm, 2, 0)
err = snap3.Release(ctx)
require.NoError(t, err)
checkDiskUsage(t, ctx, cm, 0, 2)
err = cm.Close()
require.NoError(t, err)
}
func TestLazyCommit(t *testing.T) {
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
cm := getCacheManager(t, tmpdir)
active, err := cm.New(ctx, nil)
require.NoError(t, err)
// after commit mutable is locked
snap, err := active.Commit(ctx)
require.NoError(t, err)
_, err = cm.GetMutable(ctx, active.ID())
require.Error(t, err)
require.Equal(t, errLocked, errors.Cause(err))
// immutable refs still work
snap2, err := cm.Get(ctx, snap.ID())
require.NoError(t, err)
require.Equal(t, snap.ID(), snap2.ID())
err = snap.Release(ctx)
require.NoError(t, err)
err = snap2.Release(ctx)
require.NoError(t, err)
// immutable work after final release as well
snap, err = cm.Get(ctx, snap.ID())
require.NoError(t, err)
require.Equal(t, snap.ID(), snap2.ID())
err = snap.Release(ctx)
require.NoError(t, err)
// after release mutable becomes available again
active2, err := cm.GetMutable(ctx, active.ID())
require.NoError(t, err)
require.Equal(t, active2.ID(), active.ID())
// because ref was took mutable old immutable are cleared
_, err = cm.Get(ctx, snap.ID())
require.Error(t, err)
require.Equal(t, errNotFound, errors.Cause(err))
snap, err = active2.Commit(ctx)
require.NoError(t, err)
// this time finalize commit
err = snap.Finalize(ctx)
require.NoError(t, err)
err = snap.Release(ctx)
require.NoError(t, err)
// mutable is gone after finalize
_, err = cm.GetMutable(ctx, active2.ID())
require.Error(t, err)
require.Equal(t, errNotFound, errors.Cause(err))
// immutable still works
snap2, err = cm.Get(ctx, snap.ID())
require.NoError(t, err)
require.Equal(t, snap.ID(), snap2.ID())
err = snap2.Release(ctx)
require.NoError(t, err)
// test restarting after commit
active, err = cm.New(ctx, nil)
require.NoError(t, err)
// after commit mutable is locked
snap, err = active.Commit(ctx)
require.NoError(t, err)
err = cm.Close()
require.NoError(t, err)
cm = getCacheManager(t, tmpdir)
snap2, err = cm.Get(ctx, snap.ID())
require.NoError(t, err)
err = snap2.Release(ctx)
require.NoError(t, err)
active, err = cm.GetMutable(ctx, active.ID())
require.NoError(t, err)
_, err = cm.Get(ctx, snap.ID())
require.Error(t, err)
require.Equal(t, errNotFound, errors.Cause(err))
snap, err = active.Commit(ctx)
require.NoError(t, err)
err = cm.Close()
require.NoError(t, err)
cm = getCacheManager(t, tmpdir)
snap2, err = cm.Get(ctx, snap.ID())
require.NoError(t, err)
err = snap2.Finalize(ctx)
require.NoError(t, err)
err = snap2.Release(ctx)
require.NoError(t, err)
active, err = cm.GetMutable(ctx, active.ID())
require.Error(t, err)
require.Equal(t, errNotFound, errors.Cause(err))
}
func getCacheManager(t *testing.T, tmpdir string) Manager {
snapshotter, err := naive.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
assert.NoError(t, err)
require.NoError(t, err)
md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db"))
assert.NoError(t, err)
require.NoError(t, err)
cm, err := NewManager(ManagerOpt{
Snapshotter: snapshotter,
MetadataStore: md,
})
assert.NoError(t, err)
_, err = cm.Get(ctx, "foobar")
assert.Error(t, err)
checkDiskUsage(t, ctx, cm, 0, 0)
active, err := cm.New(ctx, nil)
assert.NoError(t, err)
m, err := active.Mount(ctx)
assert.NoError(t, err)
lm := snapshot.LocalMounter(m)
target, err := lm.Mount()
assert.NoError(t, err)
fi, err := os.Stat(target)
assert.NoError(t, err)
assert.Equal(t, fi.IsDir(), true)
err = lm.Unmount()
assert.NoError(t, err)
_, err = cm.GetMutable(ctx, active.ID())
assert.Error(t, err)
assert.Equal(t, errLocked, errors.Cause(err))
checkDiskUsage(t, ctx, cm, 1, 0)
snap, err := active.Freeze()
assert.NoError(t, err)
checkDiskUsage(t, ctx, cm, 1, 0)
_, err = cm.GetMutable(ctx, active.ID())
assert.Error(t, err)
assert.Equal(t, errLocked, errors.Cause(err))
err = snap.Release(ctx)
assert.NoError(t, err)
checkDiskUsage(t, ctx, cm, 0, 1)
active, err = cm.GetMutable(ctx, active.ID())
assert.NoError(t, err)
checkDiskUsage(t, ctx, cm, 1, 0)
snap, err = active.ReleaseAndCommit(ctx)
assert.NoError(t, err)
checkDiskUsage(t, ctx, cm, 1, 0)
err = snap.Release(ctx)
assert.NoError(t, err)
_, err = cm.GetMutable(ctx, active.ID())
assert.Error(t, err)
assert.Equal(t, errNotFound, errors.Cause(err))
_, err = cm.GetMutable(ctx, snap.ID())
assert.Error(t, err)
assert.Equal(t, errInvalid, errors.Cause(err))
snap, err = cm.Get(ctx, snap.ID())
assert.NoError(t, err)
snap2, err := cm.Get(ctx, snap.ID())
assert.NoError(t, err)
checkDiskUsage(t, ctx, cm, 1, 0)
err = snap.Release(ctx)
assert.NoError(t, err)
active2, err := cm.New(ctx, snap2)
assert.NoError(t, err)
checkDiskUsage(t, ctx, cm, 2, 0)
snap3, err := active2.Freeze()
assert.NoError(t, err)
err = snap2.Release(ctx)
assert.NoError(t, err)
checkDiskUsage(t, ctx, cm, 2, 0)
err = snap3.Release(ctx)
assert.NoError(t, err)
checkDiskUsage(t, ctx, cm, 0, 2)
err = cm.Close()
assert.NoError(t, err)
require.NoError(t, err, fmt.Sprintf("error: %+v", err))
return cm
}
func checkDiskUsage(t *testing.T, ctx context.Context, cm Manager, inuse, unused int) {
du, err := cm.DiskUsage(ctx)
assert.NoError(t, err)
require.NoError(t, err)
var inuseActual, unusedActual int
for _, r := range du {
if r.InUse {
@ -141,6 +269,6 @@ func checkDiskUsage(t *testing.T, ctx context.Context, cm Manager, inuse, unused
unusedActual++
}
}
assert.Equal(t, inuse, inuseActual)
assert.Equal(t, unused, unusedActual)
require.Equal(t, inuse, inuseActual)
require.Equal(t, unused, unusedActual)
}

34
cache/metadata.go vendored
View File

@ -15,6 +15,8 @@ import (
const sizeUnknown int64 = -1
const keySize = "snapshot.size"
const keyEqualMutable = "cache.equalMutable"
const keyEqualImmutable = "cache.equalImmutable"
func setSize(si *metadata.StorageItem, s int64) error {
v, err := metadata.NewValue(s)
@ -22,7 +24,7 @@ func setSize(si *metadata.StorageItem, s int64) error {
return errors.Wrap(err, "failed to create size value")
}
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, keySize, *v)
return si.SetValue(b, keySize, v)
})
return nil
}
@ -38,3 +40,33 @@ func getSize(si *metadata.StorageItem) int64 {
}
return size
}
func getEqualMutable(si *metadata.StorageItem) string {
v := si.Get(keyEqualMutable)
if v == nil {
return ""
}
var str string
if err := v.Unmarshal(&str); err != nil {
return ""
}
return str
}
func setEqualMutable(si *metadata.StorageItem, s string) error {
v, err := metadata.NewValue(s)
if err != nil {
return errors.Wrapf(err, "failed to create %s meta value", keyEqualMutable)
}
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, keyEqualMutable, v)
})
return nil
}
func clearEqualMutable(si *metadata.StorageItem) error {
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, keyEqualMutable, nil)
})
return nil
}

View File

@ -186,10 +186,12 @@ func newStorageItem(id string, b *bolt.Bucket, s *Store) (StorageItem, error) {
if b != nil {
if err := b.ForEach(func(k, v []byte) error {
var sv Value
if err := json.Unmarshal(v, &sv); err != nil {
return err
if len(v) > 0 {
if err := json.Unmarshal(v, &sv); err != nil {
return err
}
si.values[string(k)] = &sv
}
si.values[string(k)] = &sv
return nil
}); err != nil {
return si, err
@ -247,7 +249,14 @@ func (s *StorageItem) Indexes() (out []string) {
return
}
func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v Value) error {
func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
if v == nil {
if err := b.Put([]byte(key), nil); err != nil {
return err
}
delete(s.values, key)
return nil
}
dt, err := json.Marshal(v)
if err != nil {
return err
@ -264,7 +273,7 @@ func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v Value) error {
return err
}
}
s.values[key] = &v
s.values[key] = v
return nil
}

View File

@ -31,7 +31,7 @@ func TestGetSetSearch(t *testing.T) {
require.NoError(t, err)
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, "bar", *v)
return si.SetValue(b, "bar", v)
})
err = si.Commit()
@ -72,7 +72,7 @@ func TestGetSetSearch(t *testing.T) {
require.NoError(t, err)
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, "bar2", *v)
return si.SetValue(b, "bar2", v)
})
err = si.Commit()
@ -135,7 +135,7 @@ func TestIndexes(t *testing.T) {
v.Index = tcase.index
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, tcase.value, *v)
return si.SetValue(b, tcase.value, v)
})
err = si.Commit()

173
cache/refs.go vendored
View File

@ -17,14 +17,15 @@ type ImmutableRef interface {
Release(context.Context) error
Size(ctx context.Context) (int64, error)
Parent() ImmutableRef
Finalize(ctx context.Context) error // Make sure reference is flushed to driver
// Prepare() / ChainID() / Meta()
}
type MutableRef interface {
Mountable
ID() string
Freeze() (ImmutableRef, error)
ReleaseAndCommit(ctx context.Context) (ImmutableRef, error)
Commit(context.Context) (ImmutableRef, error)
Release(context.Context) error
Size(ctx context.Context) (int64, error)
}
@ -33,10 +34,8 @@ type Mountable interface {
}
type cacheRecord struct {
mu sync.Mutex
mutable bool
frozen bool
// meta SnapMeta
mu sync.Mutex
mutable bool
refs map[Mountable]struct{}
cm *cacheManager
parent ImmutableRef
@ -46,6 +45,10 @@ type cacheRecord struct {
sizeG flightcontrol.Group
// size int64
// these are filled if multiple refs point to same data
equalMutable *mutableRef
equalImmutable *immutableRef
}
// hold manager lock before calling
@ -67,11 +70,16 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) {
s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) {
cr.mu.Lock()
s := getSize(cr.md)
cr.mu.Unlock()
if s != sizeUnknown {
cr.mu.Unlock()
return s, nil
}
usage, err := cr.cm.ManagerOpt.Snapshotter.Usage(ctx, cr.ID())
driverID := cr.ID()
if cr.equalMutable != nil {
driverID = cr.equalMutable.ID()
}
cr.mu.Unlock()
usage, err := cr.cm.ManagerOpt.Snapshotter.Usage(ctx, driverID)
if err != nil {
return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID())
}
@ -104,6 +112,9 @@ func (cr *cacheRecord) Mount(ctx context.Context) ([]mount.Mount, error) {
}
return m, nil
}
if err := cr.finalize(ctx); err != nil {
return nil, err
}
if cr.viewMount == nil { // TODO: handle this better
cr.view = identity.NewID()
m, err := cr.cm.Snapshotter.View(ctx, cr.view, cr.ID())
@ -116,6 +127,18 @@ func (cr *cacheRecord) Mount(ctx context.Context) ([]mount.Mount, error) {
return cr.viewMount, nil
}
func (cr *cacheRecord) remove(ctx context.Context, removeSnapshot bool) error {
if err := cr.cm.md.Clear(cr.ID()); err != nil {
return err
}
if removeSnapshot {
if err := cr.cm.Snapshotter.Remove(ctx, cr.ID()); err != nil {
return err
}
}
return nil
}
func (cr *cacheRecord) ID() string {
return cr.md.ID()
}
@ -148,82 +171,100 @@ func (sr *immutableRef) release(ctx context.Context) error {
}
delete(sr.refs, sr)
sr.frozen = false
if len(sr.refs) == 0 {
//go sr.cm.GC()
if sr.equalMutable != nil {
sr.equalMutable.release(ctx)
}
// go sr.cm.GC()
}
return nil
}
func (sr *mutableRef) Freeze() (ImmutableRef, error) {
func (sr *immutableRef) Finalize(ctx context.Context) error {
sr.mu.Lock()
defer sr.mu.Unlock()
return sr.finalize(ctx)
}
func (sr *cacheRecord) finalize(ctx context.Context) error {
mutable := sr.equalMutable
if mutable == nil {
return nil
}
err := sr.cm.Snapshotter.Commit(ctx, sr.ID(), sr.equalMutable.ID())
if err != nil {
return errors.Wrapf(err, "failed to commit %s", sr.equalMutable.ID())
}
delete(sr.cm.records, sr.equalMutable.ID())
if err := sr.equalMutable.remove(ctx, false); err != nil {
return err
}
sr.equalMutable = nil
clearEqualMutable(sr.md)
return sr.md.Commit()
}
func (sr *mutableRef) commit(ctx context.Context) (ImmutableRef, error) {
if !sr.mutable || len(sr.refs) == 0 {
return nil, errors.Wrapf(errInvalid, "invalid mutable")
}
id := identity.NewID()
md, _ := sr.cm.md.Get(id)
rec := &cacheRecord{
cm: sr.cm,
parent: sr.parent,
equalMutable: sr,
refs: make(map[Mountable]struct{}),
md: &md,
}
sr.cm.records[id] = rec
if err := sr.md.Commit(); err != nil {
return nil, err
}
setSize(&md, sizeUnknown)
setEqualMutable(&md, sr.ID())
if err := md.Commit(); err != nil {
return nil, err
}
ref := rec.ref()
sr.equalImmutable = ref
return ref, nil
}
func (sr *mutableRef) Commit(ctx context.Context) (ImmutableRef, error) {
sr.cm.mu.Lock()
defer sr.cm.mu.Unlock()
sr.mu.Lock()
defer sr.mu.Unlock()
if !sr.mutable || sr.frozen || len(sr.refs) != 1 {
return nil, errors.Wrapf(errInvalid, "invalid mutable")
}
if _, ok := sr.refs[sr]; !ok {
return nil, errors.Wrapf(errInvalid, "invalid mutable")
}
delete(sr.refs, sr)
sri := sr.ref()
sri.frozen = true
setSize(sr.md, sizeUnknown)
if err := sr.md.Commit(); err != nil {
return nil, err
}
return sri, nil
return sr.commit(ctx)
}
func (sr *mutableRef) ReleaseAndCommit(ctx context.Context) (ImmutableRef, error) {
func (sr *mutableRef) Release(ctx context.Context) error {
sr.cm.mu.Lock()
defer sr.cm.mu.Unlock()
sr.mu.Lock()
defer sr.mu.Unlock()
if !sr.mutable || sr.frozen {
sr.mu.Unlock()
return nil, errors.Wrapf(errInvalid, "invalid mutable")
}
if len(sr.refs) != 1 {
sr.mu.Unlock()
return nil, errors.Wrapf(errInvalid, "multiple mutable references")
}
sr.mu.Unlock()
id := identity.NewID()
err := sr.cm.Snapshotter.Commit(ctx, id, sr.ID())
if err != nil {
return nil, errors.Wrapf(err, "failed to commit %s", sr.ID())
}
delete(sr.cm.records, sr.ID())
if err := sr.cm.md.Clear(sr.ID()); err != nil {
return nil, err
}
md, _ := sr.cm.md.Get(id)
rec := &cacheRecord{
cm: sr.cm,
parent: sr.parent,
refs: make(map[Mountable]struct{}),
md: &md,
}
sr.cm.records[id] = rec // TODO: save to db
return rec.ref(), nil
return sr.release(ctx)
}
func (sr *mutableRef) release(ctx context.Context) error {
delete(sr.refs, sr)
// delete(sr.cm.records, sr.ID())
// if err := sr.remove(ctx, true); err != nil {
// return err
// }
return nil
}

View File

@ -153,7 +153,7 @@ func (eo *exec) marshalTo(list [][]byte, cache map[digest.Digest]struct{}) (dige
}
}
if dgst == "" {
inputIndex = -1
inputIndex = pb.Empty
}
if inputIndex == len(pop.Inputs) {
var mountIndex int64
@ -174,7 +174,7 @@ func (eo *exec) marshalTo(list [][]byte, cache map[digest.Digest]struct{}) (dige
pm.Output = outputIndex
outputIndex++
} else {
pm.Output = -1
pm.Output = pb.SkipOutput
}
m.outputIndex = outputIndex - 1
peo.Mounts = append(peo.Mounts, pm)

View File

@ -95,6 +95,7 @@ func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) {
ss, err := local.NewSource(local.Opt{
SessionManager: sessm,
CacheAccessor: cm,
MetadataStore: md,
})
if err != nil {
return nil, err

View File

@ -124,7 +124,7 @@ func TestControl(t *testing.T) {
err = w.Exec(ctx, meta, root, nil, nil, nil)
assert.NoError(t, err)
rf, err := root.Freeze()
rf, err := root.Commit(ctx)
assert.NoError(t, err)
mounts, err = rf.Mount(ctx)

View File

@ -114,7 +114,7 @@ func (s *Snapshotter) SetBlob(ctx context.Context, key string, blob digest.Diges
v.Index = index(blob)
return md.Update(func(b *bolt.Bucket) error {
return md.SetValue(b, blobKey, *v)
return md.SetValue(b, blobKey, v)
})
}

View File

@ -31,7 +31,16 @@ func (lm *localMounter) Mount() (string, error) {
defer lm.mu.Unlock()
if len(lm.m) == 1 && lm.m[0].Type == "bind" {
return lm.m[0].Source, nil
ro := false
for _, opt := range lm.m[0].Options {
if opt == "ro" {
ro = true
break
}
}
if !ro {
return lm.m[0].Source, nil
}
}
dir, err := ioutil.TempDir("", "buildkit-mount")

View File

@ -27,7 +27,7 @@ func newExecOp(op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) {
}, nil
}
func (e *execOp) CacheKey(ctx context.Context, inputs []string) (string, error) {
func (e *execOp) CacheKey(ctx context.Context, inputs []string) (string, int, error) {
dt, err := json.Marshal(struct {
Inputs []string
Exec *pb.ExecOp
@ -36,9 +36,16 @@ func (e *execOp) CacheKey(ctx context.Context, inputs []string) (string, error)
Exec: e.op,
})
if err != nil {
return "", err
return "", 0, err
}
return digest.FromBytes(dt).String(), nil
numRefs := 0
for _, m := range e.op.Mounts {
if m.Output != pb.SkipOutput {
numRefs++
}
}
return digest.FromBytes(dt).String(), numRefs, nil
}
func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) {
@ -49,10 +56,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
defer func() {
for _, o := range outputs {
if o != nil {
s, err := o.Freeze() // TODO: log error
if err == nil {
go s.Release(ctx)
}
go o.Release(ctx)
}
}
}()
@ -60,7 +64,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
for _, m := range e.op.Mounts {
var mountable cache.Mountable
var ref cache.ImmutableRef
if m.Input != -1 {
if m.Input != pb.Empty {
if int(m.Input) > len(inputs) {
return nil, errors.Errorf("missing input %d", m.Input)
}
@ -72,7 +76,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
}
mountable = ref
}
if m.Output != -1 {
if m.Output != pb.SkipOutput {
active, err := e.cm.New(ctx, ref) // TODO: should be method
if err != nil {
return nil, err
@ -107,7 +111,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
refs := []Reference{}
for i, o := range outputs {
ref, err := o.ReleaseAndCommit(ctx)
ref, err := o.Commit(ctx)
if err != nil {
return nil, errors.Wrapf(err, "error committing %s", o.ID())
}

View File

@ -1,3 +1,5 @@
package pb
const RootMount = "/"
const SkipOutput = -1 // TODO: custom type
const Empty = -1 // TODO: custom type

View File

@ -1,6 +1,8 @@
package solver
import (
"fmt"
"github.com/Sirupsen/logrus"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client"
@ -44,13 +46,13 @@ type Reference interface {
// Op is an implementation for running a vertex
type Op interface {
CacheKey(context.Context, []string) (string, error)
CacheKey(context.Context, []string) (string, int, error)
Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error)
}
type InstructionCache interface {
Lookup(ctx context.Context, key string) ([]interface{}, error) // TODO: regular ref
Set(key string, refs []interface{}) error
Lookup(ctx context.Context, key string) (interface{}, error) // TODO: regular ref
Set(key string, ref interface{}) error
}
type Solver struct {
@ -104,6 +106,16 @@ func (s *Solver) Solve(ctx context.Context, id string, v Vertex, exp exporter.Ex
}
}()
for _, ref := range refs {
immutable, ok := toImmutableRef(ref)
if !ok {
return errors.Errorf("invalid reference for exporting: %T", ref)
}
if err := immutable.Finalize(ctx); err != nil {
return err
}
}
if exp != nil {
immutable, ok := toImmutableRef(refs[0])
if !ok {
@ -118,7 +130,6 @@ func (s *Solver) Solve(ctx context.Context, id string, v Vertex, exp exporter.Ex
return err
}
}
return err
}
@ -131,31 +142,31 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
return j.pipe(ctx, statusChan)
}
func (s *Solver) getCacheKey(ctx context.Context, j *job, g *vertex) (cacheKey string, retErr error) {
func (s *Solver) getCacheKey(ctx context.Context, j *job, g *vertex) (cacheKey string, numRefs int, retErr error) {
state, err := s.activeState.vertexState(j, g.digest, func() (Op, error) {
return s.resolve(g)
})
if err != nil {
return "", err
return "", 0, err
}
inputs := make([]string, len(g.inputs))
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
for i, in := range g.inputs {
func(i int, in *vertex) {
func(i int, in *vertex, index int) {
eg.Go(func() error {
k, err := s.getCacheKey(ctx, j, in)
k, _, err := s.getCacheKey(ctx, j, in)
if err != nil {
return err
}
inputs[i] = k
inputs[i] = fmt.Sprintf("%s.%d", k, index)
return nil
})
}(i, in.vertex)
}(i, in.vertex, in.index)
}
if err := eg.Wait(); err != nil {
return "", err
return "", 0, err
}
}
@ -169,7 +180,7 @@ func (s *Solver) getCacheKey(ctx context.Context, j *job, g *vertex) (cacheKey s
}()
}
return state.GetCacheKey(ctx, func(ctx context.Context, op Op) (string, error) {
return state.GetCacheKey(ctx, func(ctx context.Context, op Op) (string, int, error) {
return op.CacheKey(ctx, inputs)
})
}
@ -185,21 +196,29 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer
var cacheKey string
if s.cache != nil {
var err error
cacheKey, err = s.getCacheKey(ctx, j, g)
var numRefs int
cacheKey, numRefs, err = s.getCacheKey(ctx, j, g)
if err != nil {
return nil, err
}
cacheRefsAny, err := s.cache.Lookup(ctx, cacheKey)
if err != nil {
return nil, err
}
if len(cacheRefsAny) > 0 {
cacheRefs, err := toReferenceArray(cacheRefsAny)
cacheRefs := make([]Reference, 0, numRefs)
// check if all current refs are already cached
for i := 0; i < numRefs; i++ {
ref, err := s.cache.Lookup(ctx, fmt.Sprintf("%s.%d", cacheKey, i))
if err != nil {
return nil, err
}
g.recursiveMarkCached(ctx)
return cacheRefs, nil
if ref == nil { // didn't find ref, release all
for _, ref := range cacheRefs {
ref.Release(context.TODO())
}
break
}
cacheRefs = append(cacheRefs, ref.(Reference))
if len(cacheRefs) == numRefs { // last item
g.recursiveMarkCached(ctx)
return cacheRefs, nil
}
}
}
@ -208,8 +227,28 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
for i, in := range g.inputs {
func(i int, in *vertex) {
func(i int, in *vertex, index int) {
eg.Go(func() error {
if s.cache != nil {
k, numRefs, err := s.getCacheKey(ctx, j, in)
if err != nil {
return err
}
ref, err := s.cache.Lookup(ctx, fmt.Sprintf("%s.%d", k, index))
if err != nil {
return err
}
if ref != nil {
if ref, ok := toImmutableRef(ref.(Reference)); ok {
refs[i] = make([]*sharedRef, numRefs)
refs[i][index] = newSharedRef(ref)
in.recursiveMarkCached(ctx)
return nil
}
}
}
// execute input vertex
r, err := s.getRefs(ctx, j, in)
if err != nil {
return err
@ -219,13 +258,15 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer
}
return nil
})
}(i, in.vertex)
}(i, in.vertex, in.index)
}
err := eg.Wait()
if err != nil {
for _, r := range refs {
for _, r := range r {
go r.Release(context.TODO())
if r != nil {
go r.Release(context.TODO())
}
}
}
return nil, err
@ -247,7 +288,9 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer
// release anything else
for _, r := range refs {
for _, r := range r {
go r.Release(context.TODO())
if r != nil {
go r.Release(context.TODO())
}
}
}
@ -265,34 +308,18 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer
return nil, err
}
if s.cache != nil {
if err := s.cache.Set(cacheKey, toAny(refs)); err != nil {
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
for i, ref := range refs {
if ref != nil {
if err := s.cache.Set(fmt.Sprintf("%s.%d", cacheKey, i), ref); err != nil {
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
}
}
}
}
return refs, nil
})
}
func toReferenceArray(in []interface{}) ([]Reference, error) {
out := make([]Reference, 0, len(in))
for _, i := range in {
r, ok := i.(Reference)
if !ok {
return nil, errors.Errorf("invalid reference")
}
out = append(out, r)
}
return out, nil
}
func toAny(in []Reference) []interface{} {
out := make([]interface{}, 0, len(in))
for _, i := range in {
out = append(out, i)
}
return out
}
func toImmutableRef(ref Reference) (cache.ImmutableRef, bool) {
sysRef := ref
if sys, ok := ref.(interface {

View File

@ -58,12 +58,13 @@ func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error)
return s.src, nil
}
func (s *sourceOp) CacheKey(ctx context.Context, _ []string) (string, error) {
func (s *sourceOp) CacheKey(ctx context.Context, _ []string) (string, int, error) {
src, err := s.instance(ctx)
if err != nil {
return "", err
return "", 0, err
}
return src.CacheKey(ctx)
k, err := src.CacheKey(ctx)
return k, 1, err
}
func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) {

View File

@ -24,6 +24,7 @@ type state struct {
jobs map[*job]struct{}
refs []*sharedRef
cacheKey string
numRefs int
op Op
progressCtx context.Context
cacheCtx context.Context
@ -96,7 +97,7 @@ func (s *state) GetRefs(ctx context.Context, cb func(context.Context, Op) ([]Ref
return refs, nil
}
func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (string, error)) (string, error) {
func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (string, int, error)) (string, int, error) {
_, err := s.Do(ctx, "cache:"+s.key.String(), func(doctx context.Context) (interface{}, error) {
if s.cacheKey != "" {
if err := writeProgressSnapshot(s.cacheCtx, ctx); err != nil {
@ -104,18 +105,19 @@ func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (s
}
return nil, nil
}
cacheKey, err := cb(doctx, s.op)
cacheKey, numRefs, err := cb(doctx, s.op)
if err != nil {
return nil, err
}
s.cacheKey = cacheKey
s.numRefs = numRefs
s.cacheCtx = doctx
return nil, nil
})
if err != nil {
return "", err
return "", 0, err
}
return s.cacheKey, nil
return s.cacheKey, s.numRefs, nil
}
func writeProgressSnapshot(srcCtx, destCtx context.Context) error {

View File

@ -86,10 +86,7 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string) (target str
}
releaseRemoteRef := func() {
s, err := remoteRef.Freeze() // TODO: remove this
if err == nil {
s.Release(context.TODO())
}
remoteRef.Release(context.TODO())
}
defer func() {
@ -133,7 +130,7 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string) (target str
}
if err := si.Update(func(b *bolt.Bucket) error {
return si.SetValue(b, "git-remote", *v)
return si.SetValue(b, "git-remote", v)
}); err != nil {
return "", nil, err
}
@ -270,10 +267,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
defer func() {
if retErr != nil && checkoutRef != nil {
s, err := checkoutRef.Freeze() // TODO: remove this
if err != nil {
s.Release(context.TODO())
}
checkoutRef.Release(context.TODO())
}
}()
@ -326,7 +320,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
lm.Unmount()
lm = nil
snap, err := checkoutRef.ReleaseAndCommit(ctx)
snap, err := checkoutRef.Commit(ctx)
if err != nil {
return nil, err
}
@ -345,7 +339,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
return nil, err
}
if err := si.Update(func(b *bolt.Bucket) error {
return si.SetValue(b, "git-snapshot", *v)
return si.SetValue(b, "git-snapshot", v)
}); err != nil {
return nil, err
}

View File

@ -3,7 +3,10 @@ package local
import (
"time"
"github.com/Sirupsen/logrus"
"github.com/boltdb/bolt"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
@ -12,15 +15,19 @@ import (
"golang.org/x/net/context"
)
const keySharedKey = "local.sharedKey"
type Opt struct {
SessionManager *session.Manager
CacheAccessor cache.Accessor
MetadataStore *metadata.Store
}
func NewSource(opt Opt) (source.Source, error) {
ls := &localSource{
sm: opt.SessionManager,
cm: opt.CacheAccessor,
md: opt.MetadataStore,
}
return ls, nil
}
@ -28,6 +35,7 @@ func NewSource(opt Opt) (source.Source, error) {
type localSource struct {
sm *session.Manager
cm cache.Accessor
md *metadata.Store
}
func (ls *localSource) ID() string {
@ -80,17 +88,33 @@ func (ls *localSourceHandler) Snapshot(ctx context.Context) (out cache.Immutable
return nil, err
}
mutable, err := ls.cm.New(ctx, nil)
sharedKey := keySharedKey + ":" + ls.src.Name + ":" + caller.SharedKey()
var mutable cache.MutableRef
sis, err := ls.md.Search(sharedKey)
if err != nil {
return nil, err
}
for _, si := range sis {
if m, err := ls.cm.GetMutable(ctx, si.ID()); err == nil {
logrus.Debugf("reusing ref for local: %s", m.ID())
mutable = m
break
}
}
if mutable == nil {
m, err := ls.cm.New(ctx, nil)
if err != nil {
return nil, err
}
mutable = m
logrus.Debugf("new ref for local: %s", mutable.ID())
}
defer func() {
if retErr != nil && mutable != nil {
s, err := mutable.Freeze()
if err == nil {
go s.Release(context.TODO())
}
go mutable.Release(context.TODO())
}
}()
@ -128,11 +152,34 @@ func (ls *localSourceHandler) Snapshot(ctx context.Context) (out cache.Immutable
}
lm = nil
snap, err := mutable.ReleaseAndCommit(ctx)
skipStoreSharedKey := false
si, _ := ls.md.Get(mutable.ID())
if v := si.Get(keySharedKey); v != nil {
var str string
if err := v.Unmarshal(&str); err != nil {
return nil, err
}
skipStoreSharedKey = str == sharedKey
}
if !skipStoreSharedKey {
v, err := metadata.NewValue(sharedKey)
if err != nil {
return nil, err
}
v.Index = sharedKey
if err := si.Update(func(b *bolt.Bucket) error {
return si.SetValue(b, sharedKey, v)
}); err != nil {
return nil, err
}
logrus.Debugf("saved %s as %s", mutable.ID(), sharedKey)
}
snap, err := mutable.Commit(ctx)
if err != nil {
return nil, err
}
mutable = nil
return snap, err
return snap, nil
}