Refactor cache record mount cache.

This is mostly just preparation for merge-op. The existing
Extract method is updated to be usable for unlazying any type of refs
rather than just lazy blobs. The way views are created is simplified and
centralized in one location.

Signed-off-by: Erik Sipsma <erik@sipsma.dev>
master
Erik Sipsma 2021-10-28 12:59:26 -07:00
parent 03ed0548ef
commit 9321ec2f82
10 changed files with 297 additions and 231 deletions

View File

@ -1208,11 +1208,12 @@ func setupCacheManager(t *testing.T, tmpdir string, snapshotterName string, snap
md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db"))
require.NoError(t, err) require.NoError(t, err)
lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit")
cm, err := cache.NewManager(cache.ManagerOpt{ cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: snapshot.FromContainerdSnapshotter(snapshotterName, containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter(snapshotterName)), nil), Snapshotter: snapshot.FromContainerdSnapshotter(snapshotterName, containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter(snapshotterName)), nil),
MetadataStore: md, MetadataStore: md,
LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"), LeaseManager: lm,
ContentStore: mdb.ContentStore(), ContentStore: mdb.ContentStore(),
GarbageCollect: mdb.GarbageCollect, GarbageCollect: mdb.GarbageCollect,
}) })

25
cache/manager.go vendored
View File

@ -216,10 +216,10 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,
} }
}() }()
if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{ if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{
ID: snapshotID, ID: snapshotID,
Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(), Type: "snapshots/" + cm.Snapshotter.Name(),
}); err != nil { }); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id) return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id)
} }
@ -504,24 +504,25 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, sess session.Gr
} }
}() }()
if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{ snapshotID := id
ID: id, if err := cm.LeaseManager.AddResource(ctx, leases.Lease{ID: id}, leases.Resource{
Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(), ID: snapshotID,
}); err != nil { Type: "snapshots/" + cm.Snapshotter.Name(),
return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id) }); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", snapshotID)
} }
if cm.Snapshotter.Name() == "stargz" && parent != nil { if cm.Snapshotter.Name() == "stargz" && parent != nil {
if rerr := parent.withRemoteSnapshotLabelsStargzMode(ctx, sess, func() { if rerr := parent.withRemoteSnapshotLabelsStargzMode(ctx, sess, func() {
err = cm.Snapshotter.Prepare(ctx, id, parentSnapshotID) err = cm.Snapshotter.Prepare(ctx, snapshotID, parentSnapshotID)
}); rerr != nil { }); rerr != nil {
return nil, rerr return nil, rerr
} }
} else { } else {
err = cm.Snapshotter.Prepare(ctx, id, parentSnapshotID) err = cm.Snapshotter.Prepare(ctx, snapshotID, parentSnapshotID)
} }
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to prepare %s", id) return nil, errors.Wrapf(err, "failed to prepare %s", parentSnapshotID)
} }
cm.mu.Lock() cm.mu.Lock()
@ -538,7 +539,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, sess session.Gr
cacheMetadata: md, cacheMetadata: md,
} }
opts = append(opts, withSnapshotID(id)) opts = append(opts, withSnapshotID(snapshotID))
if err := initializeMetadata(rec.cacheMetadata, parentID, opts...); err != nil { if err := initializeMetadata(rec.cacheMetadata, parentID, opts...); err != nil {
return nil, err return nil, err
} }

265
cache/refs.go vendored
View File

@ -75,8 +75,7 @@ type cacheRecord struct {
// dead means record is marked as deleted // dead means record is marked as deleted
dead bool dead bool
view string mountCache snapshot.Mountable
viewMount snapshot.Mountable
sizeG flightcontrol.Group sizeG flightcontrol.Group
@ -164,6 +163,14 @@ func (cr *cacheRecord) IdentityMapping() *idtools.IdentityMapping {
return cr.cm.IdentityMapping() return cr.cm.IdentityMapping()
} }
func (cr *cacheRecord) viewLeaseID() string {
return cr.ID() + "-view"
}
func (cr *cacheRecord) viewSnapshotID() string {
return cr.getSnapshotID() + "-view"
}
func (cr *cacheRecord) size(ctx context.Context) (int64, error) { func (cr *cacheRecord) size(ctx context.Context) (int64, error) {
// this expects that usage() is implemented lazily // this expects that usage() is implemented lazily
s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) { s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) {
@ -239,52 +246,59 @@ func (cr *cacheRecord) parentRef(hidden bool, descHandlers DescHandlers) *immuta
return p.ref(hidden, descHandlers) return p.ref(hidden, descHandlers)
} }
// must be called holding cacheRecord mu // caller must hold cr.mu
func (cr *cacheRecord) mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { func (cr *cacheRecord) mount(ctx context.Context, s session.Group) (_ snapshot.Mountable, rerr error) {
if cr.mountCache != nil {
return cr.mountCache, nil
}
var mountSnapshotID string
if cr.mutable { if cr.mutable {
m, err := cr.cm.Snapshotter.Mounts(ctx, cr.getSnapshotID()) mountSnapshotID = cr.getSnapshotID()
if err != nil { } else if cr.equalMutable != nil {
return nil, errors.Wrapf(err, "failed to mount %s", cr.ID()) mountSnapshotID = cr.equalMutable.getSnapshotID()
} } else {
if readonly { mountSnapshotID = cr.viewSnapshotID()
m = setReadonly(m) if _, err := cr.cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
} l.ID = cr.viewLeaseID()
return m, nil
}
if cr.equalMutable != nil && readonly {
m, err := cr.cm.Snapshotter.Mounts(ctx, cr.equalMutable.getSnapshotID())
if err != nil {
return nil, errors.Wrapf(err, "failed to mount %s", cr.equalMutable.ID())
}
return setReadonly(m), nil
}
if err := cr.finalize(ctx); err != nil {
return nil, err
}
if cr.viewMount == nil { // TODO: handle this better
view := identity.NewID()
l, err := cr.cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
l.ID = view
l.Labels = map[string]string{ l.Labels = map[string]string{
"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
} }
return nil return nil
}, leaseutil.MakeTemporary) }, leaseutil.MakeTemporary); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
defer func() {
if rerr != nil {
cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.viewLeaseID()})
}
}()
if err := cr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.viewLeaseID()}, leases.Resource{
ID: mountSnapshotID,
Type: "snapshots/" + cr.cm.Snapshotter.Name(),
}); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
// Return the mount direct from View rather than setting it using the Mounts call below.
// The two are equivalent for containerd snapshotters but the moby snapshotter requires
// the use of the mountable returned by View in this case.
mnts, err := cr.cm.Snapshotter.View(ctx, mountSnapshotID, cr.getSnapshotID())
if err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
cr.mountCache = mnts
}
if cr.mountCache != nil {
return cr.mountCache, nil
}
mnts, err := cr.cm.Snapshotter.Mounts(ctx, mountSnapshotID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ctx = leases.WithLease(ctx, l.ID) cr.mountCache = mnts
m, err := cr.cm.Snapshotter.View(ctx, view, cr.getSnapshotID()) return cr.mountCache, nil
if err != nil {
cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: l.ID})
return nil, errors.Wrapf(err, "failed to mount %s", cr.ID())
}
cr.view = view
cr.viewMount = m
}
return cr.viewMount, nil
} }
// call when holding the manager lock // call when holding the manager lock
@ -299,8 +313,10 @@ func (cr *cacheRecord) remove(ctx context.Context, removeSnapshot bool) error {
} }
} }
if removeSnapshot { if removeSnapshot {
if err := cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}); err != nil { if err := cr.cm.LeaseManager.Delete(ctx, leases.Lease{
return errors.Wrapf(err, "failed to remove %s", cr.ID()) ID: cr.ID(),
}); err != nil && !errdefs.IsNotFound(err) {
return errors.Wrapf(err, "failed to delete lease for %s", cr.ID())
} }
} }
if err := cr.cm.MetadataStore.Clear(cr.ID()); err != nil { if err := cr.cm.MetadataStore.Clear(cr.ID()); err != nil {
@ -362,6 +378,11 @@ func (sr *immutableRef) Clone() ImmutableRef {
} }
func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers) (ocispecs.Descriptor, error) { func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers) (ocispecs.Descriptor, error) {
dgst := sr.getBlob()
if dgst == "" {
return ocispecs.Descriptor{}, errors.Errorf("no blob set for cache record %s", sr.ID())
}
desc := ocispecs.Descriptor{ desc := ocispecs.Descriptor{
Digest: sr.getBlob(), Digest: sr.getBlob(),
Size: sr.getBlobSize(), Size: sr.getBlobSize(),
@ -446,7 +467,7 @@ func getCompressionVariantBlob(ctx context.Context, cs content.Store, dgst diges
func (sr *immutableRef) addCompressionBlob(ctx context.Context, desc ocispecs.Descriptor, compressionType compression.Type) error { func (sr *immutableRef) addCompressionBlob(ctx context.Context, desc ocispecs.Descriptor, compressionType compression.Type) error {
cs := sr.cm.ContentStore cs := sr.cm.ContentStore
if err := sr.cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{ if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{
ID: desc.Digest.String(), ID: desc.Digest.String(),
Type: "content", Type: "content",
}); err != nil { }); err != nil {
@ -544,7 +565,13 @@ func (sr *immutableRef) parentRefChain() []*immutableRef {
return refs return refs
} }
func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (snapshot.Mountable, error) { func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (_ snapshot.Mountable, rerr error) {
if sr.equalMutable != nil && !readonly {
if err := sr.Finalize(ctx); err != nil {
return nil, err
}
}
if err := sr.Extract(ctx, s); err != nil { if err := sr.Extract(ctx, s); err != nil {
return nil, err return nil, err
} }
@ -552,35 +579,33 @@ func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Grou
sr.mu.Lock() sr.mu.Lock()
defer sr.mu.Unlock() defer sr.mu.Unlock()
if sr.mountCache != nil {
return sr.mountCache, nil
}
var mnt snapshot.Mountable
if sr.cm.Snapshotter.Name() == "stargz" { if sr.cm.Snapshotter.Name() == "stargz" {
var (
m snapshot.Mountable
rerr error
)
if err := sr.withRemoteSnapshotLabelsStargzMode(ctx, s, func() { if err := sr.withRemoteSnapshotLabelsStargzMode(ctx, s, func() {
m, rerr = sr.mount(ctx, readonly) mnt, rerr = sr.mount(ctx, s)
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
return m, rerr } else {
mnt, rerr = sr.mount(ctx, s)
}
if rerr != nil {
return nil, rerr
} }
return sr.mount(ctx, readonly) if readonly {
mnt = setReadonly(mnt)
}
return mnt, nil
} }
func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr error) { func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr error) {
if !sr.getBlobOnly() { if !sr.getBlobOnly() {
return return nil
}
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return err
}
defer done(ctx)
if sr.GetLayerType() == "windows" {
ctx = winlayers.UseWindowsLayerMode(ctx)
} }
if sr.cm.Snapshotter.Name() == "stargz" { if sr.cm.Snapshotter.Name() == "stargz" {
@ -588,14 +613,14 @@ func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr erro
if rerr = sr.prepareRemoteSnapshotsStargzMode(ctx, s); rerr != nil { if rerr = sr.prepareRemoteSnapshotsStargzMode(ctx, s); rerr != nil {
return return
} }
rerr = sr.extract(ctx, sr.descHandlers, s) rerr = sr.unlazy(ctx, sr.descHandlers, s)
}); err != nil { }); err != nil {
return err return err
} }
return rerr return rerr
} }
return sr.extract(ctx, sr.descHandlers, s) return sr.unlazy(ctx, sr.descHandlers, s)
} }
func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, s session.Group, f func()) error { func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, s session.Group, f func()) error {
@ -726,15 +751,38 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields
return return
} }
func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session.Group) error { func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, s session.Group) error {
_, err := sr.sizeG.Do(ctx, sr.ID()+"-extract", func(ctx context.Context) (_ interface{}, rerr error) { _, err := sr.sizeG.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ interface{}, rerr error) {
snapshotID := sr.getSnapshotID() if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil {
if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil {
return nil, nil return nil, nil
} }
return nil, sr.unlazyLayer(ctx, dhs, s)
})
return err
}
// should be called within sizeG.Do call for this ref's ID
func (sr *immutableRef) unlazyLayer(ctx context.Context, dhs DescHandlers, s session.Group) (rerr error) {
if !sr.getBlobOnly() {
return nil
}
if sr.cm.Applier == nil { if sr.cm.Applier == nil {
return nil, errors.New("extract requires an applier") return errors.New("unlazy requires an applier")
}
if _, ok := leases.FromContext(ctx); !ok {
leaseCtx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return err
}
defer done(leaseCtx)
ctx = leaseCtx
}
if sr.GetLayerType() == "windows" {
ctx = winlayers.UseWindowsLayerMode(ctx)
} }
eg, egctx := errgroup.WithContext(ctx) eg, egctx := errgroup.WithContext(ctx)
@ -742,7 +790,7 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session
parentID := "" parentID := ""
if sr.parent != nil { if sr.parent != nil {
eg.Go(func() error { eg.Go(func() error {
if err := sr.parent.extract(egctx, dhs, s); err != nil { if err := sr.parent.unlazy(egctx, dhs, s); err != nil {
return err return err
} }
parentID = sr.parent.getSnapshotID() parentID = sr.parent.getSnapshotID()
@ -752,7 +800,7 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session
desc, err := sr.ociDesc(ctx, dhs) desc, err := sr.ociDesc(ctx, dhs)
if err != nil { if err != nil {
return nil, err return err
} }
dh := dhs[desc.Digest] dh := dhs[desc.Digest]
@ -767,7 +815,7 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session
}) })
if err := eg.Wait(); err != nil { if err := eg.Wait(); err != nil {
return nil, err return err
} }
if dh != nil && dh.Progress != nil { if dh != nil && dh.Progress != nil {
@ -781,40 +829,38 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session
err = sr.cm.Snapshotter.Prepare(ctx, key, parentID) err = sr.cm.Snapshotter.Prepare(ctx, key, parentID)
if err != nil { if err != nil {
return nil, err return err
} }
mountable, err := sr.cm.Snapshotter.Mounts(ctx, key) mountable, err := sr.cm.Snapshotter.Mounts(ctx, key)
if err != nil { if err != nil {
return nil, err return err
} }
mounts, unmount, err := mountable.Mount() mounts, unmount, err := mountable.Mount()
if err != nil { if err != nil {
return nil, err return err
} }
_, err = sr.cm.Applier.Apply(ctx, desc, mounts) _, err = sr.cm.Applier.Apply(ctx, desc, mounts)
if err != nil { if err != nil {
unmount() unmount()
return nil, err return err
} }
if err := unmount(); err != nil { if err := unmount(); err != nil {
return nil, err return err
} }
if err := sr.cm.Snapshotter.Commit(ctx, sr.getSnapshotID(), key); err != nil { if err := sr.cm.Snapshotter.Commit(ctx, sr.getSnapshotID(), key); err != nil {
if !errors.Is(err, errdefs.ErrAlreadyExists) { if !errors.Is(err, errdefs.ErrAlreadyExists) {
return nil, err return err
} }
} }
sr.queueBlobOnly(false) sr.queueBlobOnly(false)
sr.queueSize(sizeUnknown) sr.queueSize(sizeUnknown)
if err := sr.commitMetadata(); err != nil { if err := sr.commitMetadata(); err != nil {
return nil, err
}
return nil, nil
})
return err return err
} }
return nil
}
func (sr *immutableRef) Release(ctx context.Context) error { func (sr *immutableRef) Release(ctx context.Context) error {
sr.cm.mu.Lock() sr.cm.mu.Lock()
@ -853,16 +899,13 @@ func (sr *immutableRef) release(ctx context.Context) error {
} }
if len(sr.refs) == 0 { if len(sr.refs) == 0 {
if sr.viewMount != nil { // TODO: release viewMount earlier if possible
if err := sr.cm.LeaseManager.Delete(ctx, leases.Lease{ID: sr.view}); err != nil {
return errors.Wrapf(err, "failed to remove view lease %s", sr.view)
}
sr.view = ""
sr.viewMount = nil
}
if sr.equalMutable != nil { if sr.equalMutable != nil {
sr.equalMutable.release(ctx) sr.equalMutable.release(ctx)
} else {
if err := sr.cm.LeaseManager.Delete(ctx, leases.Lease{ID: sr.viewLeaseID()}); err != nil && !errdefs.IsNotFound(err) {
return err
}
sr.mountCache = nil
} }
} }
@ -895,19 +938,20 @@ func (cr *cacheRecord) finalize(ctx context.Context) error {
} }
} }
if err := cr.cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.ID()}, leases.Resource{ if err := cr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.ID()}, leases.Resource{
ID: cr.ID(), ID: cr.getSnapshotID(),
Type: "snapshots/" + cr.cm.ManagerOpt.Snapshotter.Name(), Type: "snapshots/" + cr.cm.Snapshotter.Name(),
}); err != nil { }); err != nil {
cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}) cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()})
return errors.Wrapf(err, "failed to add snapshot %s to lease", cr.ID()) return errors.Wrapf(err, "failed to add snapshot %s to lease", cr.getSnapshotID())
} }
err = cr.cm.Snapshotter.Commit(ctx, cr.ID(), mutable.ID()) if err := cr.cm.Snapshotter.Commit(ctx, cr.getSnapshotID(), mutable.getSnapshotID()); err != nil {
if err != nil {
cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}) cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()})
return errors.Wrapf(err, "failed to commit %s", mutable.ID()) return errors.Wrapf(err, "failed to commit %s to %s", mutable.getSnapshotID(), cr.getSnapshotID())
} }
cr.mountCache = nil
mutable.dead = true mutable.dead = true
go func() { go func() {
cr.cm.mu.Lock() cr.cm.mu.Lock()
@ -919,7 +963,6 @@ func (cr *cacheRecord) finalize(ctx context.Context) error {
cr.equalMutable = nil cr.equalMutable = nil
cr.clearEqualMutable() cr.clearEqualMutable()
cr.queueSnapshotID(cr.ID())
return cr.commitMetadata() return cr.commitMetadata()
} }
@ -965,7 +1008,7 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) {
md.queueCommitted(true) md.queueCommitted(true)
md.queueSize(sizeUnknown) md.queueSize(sizeUnknown)
md.queueSnapshotID(sr.getSnapshotID()) md.queueSnapshotID(id)
md.setEqualMutable(sr.ID()) md.setEqualMutable(sr.ID())
if err := md.commitMetadata(); err != nil { if err := md.commitMetadata(); err != nil {
return nil, err return nil, err
@ -976,24 +1019,32 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) {
return ref, nil return ref, nil
} }
func (sr *mutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (snapshot.Mountable, error) { func (sr *mutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (_ snapshot.Mountable, rerr error) {
sr.mu.Lock() sr.mu.Lock()
defer sr.mu.Unlock() defer sr.mu.Unlock()
if sr.mountCache != nil {
return sr.mountCache, nil
}
var mnt snapshot.Mountable
if sr.cm.Snapshotter.Name() == "stargz" && sr.parent != nil { if sr.cm.Snapshotter.Name() == "stargz" && sr.parent != nil {
var (
m snapshot.Mountable
rerr error
)
if err := sr.parent.withRemoteSnapshotLabelsStargzMode(ctx, s, func() { if err := sr.parent.withRemoteSnapshotLabelsStargzMode(ctx, s, func() {
m, rerr = sr.mount(ctx, readonly) mnt, rerr = sr.mount(ctx, s)
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
return m, rerr } else {
mnt, rerr = sr.mount(ctx, s)
}
if rerr != nil {
return nil, rerr
} }
return sr.mount(ctx, readonly) if readonly {
mnt = setReadonly(mnt)
}
return mnt, nil
} }
func (sr *mutableRef) Commit(ctx context.Context) (ImmutableRef, error) { func (sr *mutableRef) Commit(ctx context.Context) (ImmutableRef, error) {

View File

@ -2,9 +2,7 @@ package snapshot
import ( import (
"context" "context"
"os"
"sync" "sync"
"sync/atomic"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots"
@ -70,29 +68,6 @@ func (s *fromContainerd) IdentityMapping() *idtools.IdentityMapping {
return s.idmap return s.idmap
} }
type staticMountable struct {
count int32
id string
mounts []mount.Mount
idmap *idtools.IdentityMapping
}
func (cm *staticMountable) Mount() ([]mount.Mount, func() error, error) {
atomic.AddInt32(&cm.count, 1)
return cm.mounts, func() error {
if atomic.AddInt32(&cm.count, -1) < 0 {
if v := os.Getenv("BUILDKIT_DEBUG_PANIC_ON_ERROR"); v == "1" {
panic("release of released mount " + cm.id)
}
}
return nil
}, nil
}
func (cm *staticMountable) IdentityMapping() *idtools.IdentityMapping {
return cm.idmap
}
// NewContainerdSnapshotter converts snapshotter to containerd snapshotter // NewContainerdSnapshotter converts snapshotter to containerd snapshotter
func NewContainerdSnapshotter(s Snapshotter) (snapshots.Snapshotter, func() error) { func NewContainerdSnapshotter(s Snapshotter) (snapshots.Snapshotter, func() error) {
cs := &containerdSnapshotter{Snapshotter: s} cs := &containerdSnapshotter{Snapshotter: s}

View File

@ -0,0 +1,36 @@
package snapshot
import (
"os"
"sync/atomic"
"github.com/containerd/containerd/mount"
"github.com/docker/docker/pkg/idtools"
)
type staticMountable struct {
count int32
id string
mounts []mount.Mount
idmap *idtools.IdentityMapping
}
func (cm *staticMountable) Mount() ([]mount.Mount, func() error, error) {
// return a copy to prevent changes to mount.Mounts in the slice from affecting cm
mounts := make([]mount.Mount, len(cm.mounts))
copy(mounts, cm.mounts)
atomic.AddInt32(&cm.count, 1)
return cm.mounts, func() error {
if atomic.AddInt32(&cm.count, -1) < 0 {
if v := os.Getenv("BUILDKIT_DEBUG_PANIC_ON_ERROR"); v == "1" {
panic("release of released mount " + cm.id)
}
}
return nil
}, nil
}
func (cm *staticMountable) IdentityMapping() *idtools.IdentityMapping {
return cm.idmap
}

View File

@ -108,7 +108,7 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func()
return nil, nil, err return nil, nil, err
} }
lm := ctdmetadata.NewLeaseManager(mdb) lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), ns)
md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db"))
if err != nil { if err != nil {
@ -119,7 +119,7 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func()
Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil), Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil),
MetadataStore: md, MetadataStore: md,
ContentStore: mdb.ContentStore(), ContentStore: mdb.ContentStore(),
LeaseManager: leaseutil.WithNamespace(lm, ns), LeaseManager: lm,
GarbageCollect: mdb.GarbageCollect, GarbageCollect: mdb.GarbageCollect,
Applier: apply.NewFileSystemApplier(mdb.ContentStore()), Applier: apply.NewFileSystemApplier(mdb.ContentStore()),
}) })

View File

@ -83,7 +83,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
require.NoError(t, err) require.NoError(t, err)
defer ref1.Release(context.TODO()) defer ref1.Release(context.TODO())
mount, err := ref1.Mount(ctx, false, nil) mount, err := ref1.Mount(ctx, true, nil)
require.NoError(t, err) require.NoError(t, err)
lm := snapshot.LocalMounter(mount) lm := snapshot.LocalMounter(mount)
@ -136,7 +136,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
require.NoError(t, err) require.NoError(t, err)
defer ref3.Release(context.TODO()) defer ref3.Release(context.TODO())
mount, err = ref3.Mount(ctx, false, nil) mount, err = ref3.Mount(ctx, true, nil)
require.NoError(t, err) require.NoError(t, err)
lm = snapshot.LocalMounter(mount) lm = snapshot.LocalMounter(mount)
@ -213,7 +213,7 @@ func testFetchBySHA(t *testing.T, keepGitDir bool) {
require.NoError(t, err) require.NoError(t, err)
defer ref1.Release(context.TODO()) defer ref1.Release(context.TODO())
mount, err := ref1.Mount(ctx, false, nil) mount, err := ref1.Mount(ctx, true, nil)
require.NoError(t, err) require.NoError(t, err)
lm := snapshot.LocalMounter(mount) lm := snapshot.LocalMounter(mount)
@ -306,7 +306,7 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) {
require.NoError(t, err) require.NoError(t, err)
defer ref1.Release(context.TODO()) defer ref1.Release(context.TODO())
mount, err := ref1.Mount(ctx, false, nil) mount, err := ref1.Mount(ctx, true, nil)
require.NoError(t, err) require.NoError(t, err)
lm := snapshot.LocalMounter(mount) lm := snapshot.LocalMounter(mount)
@ -318,7 +318,7 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) {
require.NoError(t, err) require.NoError(t, err)
defer ref2.Release(context.TODO()) defer ref2.Release(context.TODO())
mount, err = ref2.Mount(ctx, false, nil) mount, err = ref2.Mount(ctx, true, nil)
require.NoError(t, err) require.NoError(t, err)
lm = snapshot.LocalMounter(mount) lm = snapshot.LocalMounter(mount)
@ -420,7 +420,7 @@ func testSubdir(t *testing.T, keepGitDir bool) {
require.NoError(t, err) require.NoError(t, err)
defer ref1.Release(context.TODO()) defer ref1.Release(context.TODO())
mount, err := ref1.Mount(ctx, false, nil) mount, err := ref1.Mount(ctx, true, nil)
require.NoError(t, err) require.NoError(t, err)
lm := snapshot.LocalMounter(mount) lm := snapshot.LocalMounter(mount)
@ -455,11 +455,12 @@ func setupGitSource(t *testing.T, tmpdir string) source.Source {
md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db"))
require.NoError(t, err) require.NoError(t, err)
lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit")
cm, err := cache.NewManager(cache.ManagerOpt{ cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil), Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil),
MetadataStore: md, MetadataStore: md,
LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"), LeaseManager: lm,
ContentStore: mdb.ContentStore(), ContentStore: mdb.ContentStore(),
GarbageCollect: mdb.GarbageCollect, GarbageCollect: mdb.GarbageCollect,
}) })

View File

@ -313,7 +313,7 @@ func TestHTTPChecksum(t *testing.T) {
} }
func readFile(ctx context.Context, ref cache.ImmutableRef, fp string) ([]byte, error) { func readFile(ctx context.Context, ref cache.ImmutableRef, fp string) ([]byte, error) {
mount, err := ref.Mount(ctx, false, nil) mount, err := ref.Mount(ctx, true, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -358,11 +358,12 @@ func newHTTPSource(tmpdir string) (source.Source, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit")
cm, err := cache.NewManager(cache.ManagerOpt{ cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil), Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil),
MetadataStore: md, MetadataStore: md,
LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"), LeaseManager: lm,
ContentStore: mdb.ContentStore(), ContentStore: mdb.ContentStore(),
GarbageCollect: mdb.GarbageCollect, GarbageCollect: mdb.GarbageCollect,
}) })

View File

@ -101,8 +101,8 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
for k, v := range labels { for k, v := range labels {
xlabels[k] = v xlabels[k] = v
} }
snap := containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), "buildkit", idmap)
lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit") lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit")
snap := containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), "buildkit", idmap)
if err := cache.MigrateV2( if err := cache.MigrateV2(
context.TODO(), context.TODO(),

View File

@ -73,7 +73,7 @@ func TestRuncWorker(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
snap := tests.NewBusyboxSourceSnapshot(ctx, t, w, sm) snap := tests.NewBusyboxSourceSnapshot(ctx, t, w, sm)
mounts, err := snap.Mount(ctx, false, nil) mounts, err := snap.Mount(ctx, true, nil)
require.NoError(t, err) require.NoError(t, err)
lm := snapshot.LocalMounter(mounts) lm := snapshot.LocalMounter(mounts)
@ -98,7 +98,7 @@ func TestRuncWorker(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// for _, d := range du { // for _, d := range du {
// fmt.Printf("du: %+v\n", d) // t.Logf("du: %+v\n", d)
// } // }
for _, d := range du { for _, d := range du {
@ -111,16 +111,16 @@ func TestRuncWorker(t *testing.T) {
} }
stderr := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil)
err = w.WorkerOpt.Executor.Run(ctx, "", execMount(snap), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) err = w.WorkerOpt.Executor.Run(ctx, "", execMount(snap, true), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil)
require.Error(t, err) // Read-only root require.Error(t, err) // Read-only root
// typical error is like `mkdir /.../rootfs/proc: read-only file system`. // typical error is like `mkdir /.../rootfs/proc: read-only file system`.
// make sure the error is caused before running `echo foo > /bar`. // make sure the error is caused before running `echo foo > /bar`.
require.Contains(t, stderr.String(), "read-only file system") require.Contains(t, stderr.String(), "read-only file system")
root, err := w.CacheMgr.New(ctx, snap, nil) root, err := w.CacheMgr.New(ctx, snap, nil, cache.CachePolicyRetain)
require.NoError(t, err) require.NoError(t, err)
err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil)
require.NoError(t, err) require.NoError(t, err)
meta = executor.Meta{ meta = executor.Meta{
@ -128,13 +128,13 @@ func TestRuncWorker(t *testing.T) {
Cwd: "/", Cwd: "/",
} }
err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil)
require.NoError(t, err) require.NoError(t, err)
rf, err := root.Commit(ctx) rf, err := root.Commit(ctx)
require.NoError(t, err) require.NoError(t, err)
mounts, err = rf.Mount(ctx, false, nil) mounts, err = rf.Mount(ctx, true, nil)
require.NoError(t, err) require.NoError(t, err)
lm = snapshot.LocalMounter(mounts) lm = snapshot.LocalMounter(mounts)
@ -207,7 +207,7 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) {
} }
stdout := bytes.NewBuffer(nil) stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil)
err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root), nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}, nil) err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}, nil)
require.NoError(t, err, fmt.Sprintf("stdout=%q, stderr=%q", stdout.String(), stderr.String())) require.NoError(t, err, fmt.Sprintf("stdout=%q, stderr=%q", stdout.String(), stderr.String()))
require.Equal(t, string(selfCmdline), stdout.String()) require.Equal(t, string(selfCmdline), stdout.String())
} }
@ -244,8 +244,8 @@ func (n *nopCloser) Close() error {
return nil return nil
} }
func execMount(m cache.Mountable) executor.Mount { func execMount(m cache.Mountable, readonly bool) executor.Mount {
return executor.Mount{Src: &mountable{m: m}} return executor.Mount{Src: &mountable{m: m}, Readonly: readonly}
} }
type mountable struct { type mountable struct {