Merge pull request #1092 from tonistiigi/cache-mount-clear
solver: support no-cache for exec cache mountsdocker-19.03
commit
9ae7b298e1
|
@ -36,6 +36,7 @@ type Accessor interface {
|
|||
New(ctx context.Context, s ImmutableRef, opts ...RefOption) (MutableRef, error)
|
||||
GetMutable(ctx context.Context, id string) (MutableRef, error) // Rebase?
|
||||
IdentityMapping() *idtools.IdentityMapping
|
||||
Metadata(string) *metadata.StorageItem
|
||||
}
|
||||
|
||||
type Controller interface {
|
||||
|
@ -124,6 +125,16 @@ func (cm *cacheManager) GetFromSnapshotter(ctx context.Context, id string, opts
|
|||
return cm.get(ctx, id, true, opts...)
|
||||
}
|
||||
|
||||
func (cm *cacheManager) Metadata(id string) *metadata.StorageItem {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
r, ok := cm.records[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return r.Metadata()
|
||||
}
|
||||
|
||||
// get requires manager lock to be taken
|
||||
func (cm *cacheManager) get(ctx context.Context, id string, fromSnapshotter bool, opts ...RefOption) (*immutableRef, error) {
|
||||
rec, err := cm.getRecord(ctx, id, fromSnapshotter, opts...)
|
||||
|
|
|
@ -250,6 +250,10 @@ func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error {
|
|||
return s.storage.Update(s.id, fn)
|
||||
}
|
||||
|
||||
func (s *StorageItem) Metadata() *StorageItem {
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *StorageItem) Keys() []string {
|
||||
keys := make([]string, 0, len(s.values))
|
||||
for k := range s.values {
|
||||
|
@ -333,6 +337,15 @@ func (s *StorageItem) Indexes() (out []string) {
|
|||
|
||||
func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
|
||||
if v == nil {
|
||||
if old, ok := s.values[key]; ok {
|
||||
if old.Index != "" {
|
||||
b, err := b.Tx().CreateBucketIfNotExists([]byte(indexBucket))
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
b.Delete([]byte(indexKey(old.Index, s.ID()))) // ignore error
|
||||
}
|
||||
}
|
||||
if err := b.Put([]byte(key), nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -99,6 +99,7 @@ func TestClientIntegration(t *testing.T) {
|
|||
testBasicInlineCacheImportExport,
|
||||
testExportBusyboxLocal,
|
||||
testBridgeNetworking,
|
||||
testCacheMountNoCache,
|
||||
}, mirrors)
|
||||
|
||||
integration.Run(t, []integration.Test{
|
||||
|
@ -1989,6 +1990,44 @@ func testDuplicateCacheMount(t *testing.T, sb integration.Sandbox) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func testCacheMountNoCache(t *testing.T, sb integration.Sandbox) {
|
||||
requiresLinux(t)
|
||||
c, err := New(context.TODO(), sb.Address())
|
||||
require.NoError(t, err)
|
||||
defer c.Close()
|
||||
|
||||
busybox := llb.Image("busybox:latest")
|
||||
|
||||
out := busybox.Run(llb.Shlex(`sh -e -c "touch /m1/foo; touch /m2/bar"`))
|
||||
out.AddMount("/m1", llb.Scratch(), llb.AsPersistentCacheDir("mycache1", llb.CacheMountLocked))
|
||||
out.AddMount("/m2", llb.Scratch(), llb.AsPersistentCacheDir("mycache2", llb.CacheMountLocked))
|
||||
|
||||
def, err := out.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = c.Solve(context.TODO(), def, SolveOpt{}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
out = busybox.Run(llb.Shlex(`sh -e -c "[[ ! -f /m1/foo ]]; touch /m1/foo2;"`), llb.IgnoreCache)
|
||||
out.AddMount("/m1", llb.Scratch(), llb.AsPersistentCacheDir("mycache1", llb.CacheMountLocked))
|
||||
|
||||
def, err = out.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = c.Solve(context.TODO(), def, SolveOpt{}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
out = busybox.Run(llb.Shlex(`sh -e -c "[[ -f /m1/foo2 ]]; [[ -f /m2/bar ]];"`))
|
||||
out.AddMount("/m1", llb.Scratch(), llb.AsPersistentCacheDir("mycache1", llb.CacheMountLocked))
|
||||
out.AddMount("/m2", llb.Scratch(), llb.AsPersistentCacheDir("mycache2", llb.CacheMountLocked))
|
||||
|
||||
def, err = out.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = c.Solve(context.TODO(), def, SolveOpt{}, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// containerd/containerd#2119
|
||||
func testDuplicateWhiteouts(t *testing.T, sb integration.Sandbox) {
|
||||
requiresLinux(t)
|
||||
|
|
|
@ -29,6 +29,7 @@ type llbBridge struct {
|
|||
builder solver.Builder
|
||||
frontends map[string]frontend.Frontend
|
||||
resolveWorker func() (worker.Worker, error)
|
||||
eachWorker func(func(worker.Worker) error) error
|
||||
resolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
|
||||
cms map[string]solver.CacheManager
|
||||
cmsMu sync.Mutex
|
||||
|
@ -91,11 +92,25 @@ func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (res *
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dpc := &detectPrunedCacheID{}
|
||||
|
||||
edge, err := Load(req.Definition, ValidateEntitlements(ent), WithCacheSources(cms), RuntimePlatforms(b.platforms), WithValidateCaps())
|
||||
edge, err := Load(req.Definition, dpc.Load, ValidateEntitlements(ent), WithCacheSources(cms), RuntimePlatforms(b.platforms), WithValidateCaps())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to load LLB")
|
||||
}
|
||||
|
||||
if len(dpc.ids) > 0 {
|
||||
ids := make([]string, 0, len(dpc.ids))
|
||||
for id := range dpc.ids {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if err := b.eachWorker(func(w worker.Worker) error {
|
||||
return w.PruneCacheMounts(ctx, ids)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ref, err := b.builder.Build(ctx, edge)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to build LLB")
|
||||
|
|
|
@ -221,11 +221,13 @@ func (e *execOp) getMountDeps() ([]dep, error) {
|
|||
}
|
||||
|
||||
func (e *execOp) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, m *pb.Mount, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) {
|
||||
|
||||
key := "cache-dir:" + id
|
||||
if ref != nil {
|
||||
key += ":" + ref.ID()
|
||||
}
|
||||
mu := CacheMountsLocker()
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if ref, ok := e.cacheMounts[key]; ok {
|
||||
return ref.clone(), nil
|
||||
|
@ -792,10 +794,17 @@ type cacheRefs struct {
|
|||
shares map[string]*cacheRefShare
|
||||
}
|
||||
|
||||
func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.MutableRef, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
// ClearActiveCacheMounts clears shared cache mounts currently in use.
|
||||
// Caller needs to hold CacheMountsLocker before calling
|
||||
func ClearActiveCacheMounts() {
|
||||
sharedCacheRefs.shares = nil
|
||||
}
|
||||
|
||||
func CacheMountsLocker() sync.Locker {
|
||||
return &sharedCacheRefs.mu
|
||||
}
|
||||
|
||||
func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.MutableRef, error) {
|
||||
if r.shares == nil {
|
||||
r.shares = map[string]*cacheRefShare{}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ type Solver struct {
|
|||
workerController *worker.Controller
|
||||
solver *solver.Solver
|
||||
resolveWorker ResolveWorkerFunc
|
||||
eachWorker func(func(worker.Worker) error) error
|
||||
frontends map[string]frontend.Frontend
|
||||
resolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
|
||||
platforms []specs.Platform
|
||||
|
@ -51,6 +52,7 @@ func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.Cac
|
|||
s := &Solver{
|
||||
workerController: wc,
|
||||
resolveWorker: defaultResolver(wc),
|
||||
eachWorker: allWorkers(wc),
|
||||
frontends: f,
|
||||
resolveCacheImporterFuncs: resolveCI,
|
||||
gatewayForwarder: gatewayForwarder,
|
||||
|
@ -87,6 +89,7 @@ func (s *Solver) Bridge(b solver.Builder) frontend.FrontendLLBBridge {
|
|||
builder: b,
|
||||
frontends: s.frontends,
|
||||
resolveWorker: s.resolveWorker,
|
||||
eachWorker: s.eachWorker,
|
||||
resolveCacheImporterFuncs: s.resolveCacheImporterFuncs,
|
||||
cms: map[string]solver.CacheManager{},
|
||||
platforms: s.platforms,
|
||||
|
@ -285,6 +288,20 @@ func defaultResolver(wc *worker.Controller) ResolveWorkerFunc {
|
|||
return wc.GetDefault()
|
||||
}
|
||||
}
|
||||
func allWorkers(wc *worker.Controller) func(func(w worker.Worker) error) error {
|
||||
return func(f func(worker.Worker) error) error {
|
||||
all, err := wc.List()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, w := range all {
|
||||
if err := f(w); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func oneOffProgress(ctx context.Context, id string) func(err error) error {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
|
|
|
@ -131,6 +131,34 @@ func ValidateEntitlements(ent entitlements.Set) LoadOpt {
|
|||
}
|
||||
}
|
||||
|
||||
type detectPrunedCacheID struct {
|
||||
ids map[string]struct{}
|
||||
}
|
||||
|
||||
func (dpc *detectPrunedCacheID) Load(op *pb.Op, md *pb.OpMetadata, opt *solver.VertexOptions) error {
|
||||
if md == nil || !md.IgnoreCache {
|
||||
return nil
|
||||
}
|
||||
switch op := op.Op.(type) {
|
||||
case *pb.Op_Exec:
|
||||
for _, m := range op.Exec.GetMounts() {
|
||||
if m.MountType == pb.MountType_CACHE {
|
||||
if m.CacheOpt != nil {
|
||||
id := m.CacheOpt.ID
|
||||
if id == "" {
|
||||
id = m.Dest
|
||||
}
|
||||
if dpc.ids == nil {
|
||||
dpc.ids = map[string]struct{}{}
|
||||
}
|
||||
dpc.ids[id] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Load(def *pb.Definition, opts ...LoadOpt) (solver.Edge, error) {
|
||||
return loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (solver.Vertex, error)) (solver.Vertex, error) {
|
||||
opMetadata := def.Metadata[dgst]
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
|
@ -51,6 +52,7 @@ import (
|
|||
specs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
|
@ -217,6 +219,46 @@ func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *se
|
|||
return nil, errors.Errorf("could not resolve %v", v)
|
||||
}
|
||||
|
||||
func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
|
||||
mu := ops.CacheMountsLocker()
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
for _, id := range ids {
|
||||
id = "cache-dir:" + id
|
||||
sis, err := w.MetadataStore.Search(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, si := range sis {
|
||||
for _, k := range si.Indexes() {
|
||||
if k == id || strings.HasPrefix(k, id+":") {
|
||||
if siCached := w.CacheManager.Metadata(si.ID()); siCached != nil {
|
||||
si = siCached
|
||||
}
|
||||
if err := cache.CachePolicyDefault(si); err != nil {
|
||||
return err
|
||||
}
|
||||
si.Queue(func(b *bolt.Bucket) error {
|
||||
return si.SetValue(b, k, nil)
|
||||
})
|
||||
if err := si.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
// if ref is unused try to clean it up right away by releasing it
|
||||
if mref, err := w.CacheManager.GetMutable(ctx, si.ID()); err == nil {
|
||||
go mref.Release(context.TODO())
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ops.ClearActiveCacheMounts()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) {
|
||||
// ImageSource is typically source/containerimage
|
||||
resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
|
||||
|
|
|
@ -33,6 +33,7 @@ type Worker interface {
|
|||
Prune(ctx context.Context, ch chan client.UsageInfo, opt ...client.PruneInfo) error
|
||||
GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error)
|
||||
FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error)
|
||||
PruneCacheMounts(ctx context.Context, ids []string) error
|
||||
}
|
||||
|
||||
// Pre-defined label keys
|
||||
|
|
Loading…
Reference in New Issue