diff --git a/cache/manager.go b/cache/manager.go index 0c7ec789..ebf12e31 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -36,6 +36,7 @@ type Accessor interface { New(ctx context.Context, s ImmutableRef, opts ...RefOption) (MutableRef, error) GetMutable(ctx context.Context, id string) (MutableRef, error) // Rebase? IdentityMapping() *idtools.IdentityMapping + Metadata(string) *metadata.StorageItem } type Controller interface { @@ -124,6 +125,16 @@ func (cm *cacheManager) GetFromSnapshotter(ctx context.Context, id string, opts return cm.get(ctx, id, true, opts...) } +func (cm *cacheManager) Metadata(id string) *metadata.StorageItem { + cm.mu.Lock() + defer cm.mu.Unlock() + r, ok := cm.records[id] + if !ok { + return nil + } + return r.Metadata() +} + // get requires manager lock to be taken func (cm *cacheManager) get(ctx context.Context, id string, fromSnapshotter bool, opts ...RefOption) (*immutableRef, error) { rec, err := cm.getRecord(ctx, id, fromSnapshotter, opts...) diff --git a/cache/metadata/metadata.go b/cache/metadata/metadata.go index f43da001..42e8cb40 100644 --- a/cache/metadata/metadata.go +++ b/cache/metadata/metadata.go @@ -250,6 +250,10 @@ func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error { return s.storage.Update(s.id, fn) } +func (s *StorageItem) Metadata() *StorageItem { + return s +} + func (s *StorageItem) Keys() []string { keys := make([]string, 0, len(s.values)) for k := range s.values { @@ -333,6 +337,15 @@ func (s *StorageItem) Indexes() (out []string) { func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error { if v == nil { + if old, ok := s.values[key]; ok { + if old.Index != "" { + b, err := b.Tx().CreateBucketIfNotExists([]byte(indexBucket)) + if err != nil { + return errors.WithStack(err) + } + b.Delete([]byte(indexKey(old.Index, s.ID()))) // ignore error + } + } if err := b.Put([]byte(key), nil); err != nil { return err } diff --git a/client/client_test.go b/client/client_test.go index a502f788..78dce493 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -99,6 +99,7 @@ func TestClientIntegration(t *testing.T) { testBasicInlineCacheImportExport, testExportBusyboxLocal, testBridgeNetworking, + testCacheMountNoCache, }, mirrors) integration.Run(t, []integration.Test{ @@ -1989,6 +1990,44 @@ func testDuplicateCacheMount(t *testing.T, sb integration.Sandbox) { require.NoError(t, err) } +func testCacheMountNoCache(t *testing.T, sb integration.Sandbox) { + requiresLinux(t) + c, err := New(context.TODO(), sb.Address()) + require.NoError(t, err) + defer c.Close() + + busybox := llb.Image("busybox:latest") + + out := busybox.Run(llb.Shlex(`sh -e -c "touch /m1/foo; touch /m2/bar"`)) + out.AddMount("/m1", llb.Scratch(), llb.AsPersistentCacheDir("mycache1", llb.CacheMountLocked)) + out.AddMount("/m2", llb.Scratch(), llb.AsPersistentCacheDir("mycache2", llb.CacheMountLocked)) + + def, err := out.Marshal() + require.NoError(t, err) + + _, err = c.Solve(context.TODO(), def, SolveOpt{}, nil) + require.NoError(t, err) + + out = busybox.Run(llb.Shlex(`sh -e -c "[[ ! -f /m1/foo ]]; touch /m1/foo2;"`), llb.IgnoreCache) + out.AddMount("/m1", llb.Scratch(), llb.AsPersistentCacheDir("mycache1", llb.CacheMountLocked)) + + def, err = out.Marshal() + require.NoError(t, err) + + _, err = c.Solve(context.TODO(), def, SolveOpt{}, nil) + require.NoError(t, err) + + out = busybox.Run(llb.Shlex(`sh -e -c "[[ -f /m1/foo2 ]]; [[ -f /m2/bar ]];"`)) + out.AddMount("/m1", llb.Scratch(), llb.AsPersistentCacheDir("mycache1", llb.CacheMountLocked)) + out.AddMount("/m2", llb.Scratch(), llb.AsPersistentCacheDir("mycache2", llb.CacheMountLocked)) + + def, err = out.Marshal() + require.NoError(t, err) + + _, err = c.Solve(context.TODO(), def, SolveOpt{}, nil) + require.NoError(t, err) +} + // containerd/containerd#2119 func testDuplicateWhiteouts(t *testing.T, sb integration.Sandbox) { requiresLinux(t) diff --git a/solver/llbsolver/bridge.go b/solver/llbsolver/bridge.go index e5d362d8..42f2a8ce 100644 --- a/solver/llbsolver/bridge.go +++ b/solver/llbsolver/bridge.go @@ -29,6 +29,7 @@ type llbBridge struct { builder solver.Builder frontends map[string]frontend.Frontend resolveWorker func() (worker.Worker, error) + eachWorker func(func(worker.Worker) error) error resolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc cms map[string]solver.CacheManager cmsMu sync.Mutex @@ -91,11 +92,25 @@ func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (res * if err != nil { return nil, err } + dpc := &detectPrunedCacheID{} - edge, err := Load(req.Definition, ValidateEntitlements(ent), WithCacheSources(cms), RuntimePlatforms(b.platforms), WithValidateCaps()) + edge, err := Load(req.Definition, dpc.Load, ValidateEntitlements(ent), WithCacheSources(cms), RuntimePlatforms(b.platforms), WithValidateCaps()) if err != nil { return nil, errors.Wrap(err, "failed to load LLB") } + + if len(dpc.ids) > 0 { + ids := make([]string, 0, len(dpc.ids)) + for id := range dpc.ids { + ids = append(ids, id) + } + if err := b.eachWorker(func(w worker.Worker) error { + return w.PruneCacheMounts(ctx, ids) + }); err != nil { + return nil, err + } + } + ref, err := b.builder.Build(ctx, edge) if err != nil { return nil, errors.Wrap(err, "failed to build LLB") diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index 99902a83..d0de3802 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -221,11 +221,13 @@ func (e *execOp) getMountDeps() ([]dep, error) { } func (e *execOp) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, m *pb.Mount, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) { - key := "cache-dir:" + id if ref != nil { key += ":" + ref.ID() } + mu := CacheMountsLocker() + mu.Lock() + defer mu.Unlock() if ref, ok := e.cacheMounts[key]; ok { return ref.clone(), nil @@ -792,10 +794,17 @@ type cacheRefs struct { shares map[string]*cacheRefShare } -func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.MutableRef, error) { - r.mu.Lock() - defer r.mu.Unlock() +// ClearActiveCacheMounts clears shared cache mounts currently in use. +// Caller needs to hold CacheMountsLocker before calling +func ClearActiveCacheMounts() { + sharedCacheRefs.shares = nil +} +func CacheMountsLocker() sync.Locker { + return &sharedCacheRefs.mu +} + +func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.MutableRef, error) { if r.shares == nil { r.shares = map[string]*cacheRefShare{} } diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 9ae11639..9e4525a5 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -39,6 +39,7 @@ type Solver struct { workerController *worker.Controller solver *solver.Solver resolveWorker ResolveWorkerFunc + eachWorker func(func(worker.Worker) error) error frontends map[string]frontend.Frontend resolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc platforms []specs.Platform @@ -51,6 +52,7 @@ func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.Cac s := &Solver{ workerController: wc, resolveWorker: defaultResolver(wc), + eachWorker: allWorkers(wc), frontends: f, resolveCacheImporterFuncs: resolveCI, gatewayForwarder: gatewayForwarder, @@ -87,6 +89,7 @@ func (s *Solver) Bridge(b solver.Builder) frontend.FrontendLLBBridge { builder: b, frontends: s.frontends, resolveWorker: s.resolveWorker, + eachWorker: s.eachWorker, resolveCacheImporterFuncs: s.resolveCacheImporterFuncs, cms: map[string]solver.CacheManager{}, platforms: s.platforms, @@ -285,6 +288,20 @@ func defaultResolver(wc *worker.Controller) ResolveWorkerFunc { return wc.GetDefault() } } +func allWorkers(wc *worker.Controller) func(func(w worker.Worker) error) error { + return func(f func(worker.Worker) error) error { + all, err := wc.List() + if err != nil { + return err + } + for _, w := range all { + if err := f(w); err != nil { + return err + } + } + return nil + } +} func oneOffProgress(ctx context.Context, id string) func(err error) error { pw, _, _ := progress.FromContext(ctx) diff --git a/solver/llbsolver/vertex.go b/solver/llbsolver/vertex.go index 91875461..7de9fd0f 100644 --- a/solver/llbsolver/vertex.go +++ b/solver/llbsolver/vertex.go @@ -131,6 +131,34 @@ func ValidateEntitlements(ent entitlements.Set) LoadOpt { } } +type detectPrunedCacheID struct { + ids map[string]struct{} +} + +func (dpc *detectPrunedCacheID) Load(op *pb.Op, md *pb.OpMetadata, opt *solver.VertexOptions) error { + if md == nil || !md.IgnoreCache { + return nil + } + switch op := op.Op.(type) { + case *pb.Op_Exec: + for _, m := range op.Exec.GetMounts() { + if m.MountType == pb.MountType_CACHE { + if m.CacheOpt != nil { + id := m.CacheOpt.ID + if id == "" { + id = m.Dest + } + if dpc.ids == nil { + dpc.ids = map[string]struct{}{} + } + dpc.ids[id] = struct{}{} + } + } + } + } + return nil +} + func Load(def *pb.Definition, opts ...LoadOpt) (solver.Edge, error) { return loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (solver.Vertex, error)) (solver.Vertex, error) { opMetadata := def.Metadata[dgst] diff --git a/worker/base/worker.go b/worker/base/worker.go index cacb7807..0a540a9d 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "time" "github.com/containerd/containerd/content" @@ -51,6 +52,7 @@ import ( specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" + bolt "go.etcd.io/bbolt" "golang.org/x/sync/errgroup" ) @@ -217,6 +219,46 @@ func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *se return nil, errors.Errorf("could not resolve %v", v) } +func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error { + mu := ops.CacheMountsLocker() + mu.Lock() + defer mu.Unlock() + + for _, id := range ids { + id = "cache-dir:" + id + sis, err := w.MetadataStore.Search(id) + if err != nil { + return err + } + for _, si := range sis { + for _, k := range si.Indexes() { + if k == id || strings.HasPrefix(k, id+":") { + if siCached := w.CacheManager.Metadata(si.ID()); siCached != nil { + si = siCached + } + if err := cache.CachePolicyDefault(si); err != nil { + return err + } + si.Queue(func(b *bolt.Bucket) error { + return si.SetValue(b, k, nil) + }) + if err := si.Commit(); err != nil { + return err + } + // if ref is unused try to clean it up right away by releasing it + if mref, err := w.CacheManager.GetMutable(ctx, si.ID()); err == nil { + go mref.Release(context.TODO()) + } + break + } + } + } + } + + ops.ClearActiveCacheMounts() + return nil +} + func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) { // ImageSource is typically source/containerimage resolveImageConfig, ok := w.ImageSource.(resolveImageConfig) diff --git a/worker/worker.go b/worker/worker.go index 6485af57..38cc7db0 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -33,6 +33,7 @@ type Worker interface { Prune(ctx context.Context, ch chan client.UsageInfo, opt ...client.PruneInfo) error GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) + PruneCacheMounts(ctx context.Context, ids []string) error } // Pre-defined label keys