cache: update components to new lease based cache manager

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
v0.7
Tonis Tiigi 2019-09-17 17:18:32 -07:00
parent b9ec2bc4d4
commit 688e2c2272
18 changed files with 829 additions and 566 deletions

56
cache/blobs/blobs.go vendored
View File

@ -2,13 +2,11 @@ package blobs
import (
"context"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/mount"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/winlayers"
digest "github.com/opencontainers/go-digest"
@ -28,7 +26,7 @@ type DiffPair struct {
var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
func GetDiffPairs(ctx context.Context, contentStore content.Store, snapshotter snapshot.Snapshotter, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool) ([]DiffPair, error) {
func GetDiffPairs(ctx context.Context, contentStore content.Store, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool) ([]DiffPair, error) {
if ref == nil {
return nil, nil
}
@ -41,22 +39,23 @@ func GetDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s
ctx = winlayers.UseWindowsLayerMode(ctx)
}
return getDiffPairs(ctx, contentStore, snapshotter, differ, ref, createBlobs)
return getDiffPairs(ctx, contentStore, differ, ref, createBlobs)
}
func getDiffPairs(ctx context.Context, contentStore content.Store, snapshotter snapshot.Snapshotter, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool) ([]DiffPair, error) {
func getDiffPairs(ctx context.Context, contentStore content.Store, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool) ([]DiffPair, error) {
if ref == nil {
return nil, nil
}
baseCtx := ctx
eg, ctx := errgroup.WithContext(ctx)
var diffPairs []DiffPair
var currentPair DiffPair
var currentDescr ocispec.Descriptor
parent := ref.Parent()
if parent != nil {
defer parent.Release(context.TODO())
eg.Go(func() error {
dp, err := getDiffPairs(ctx, contentStore, snapshotter, differ, parent, createBlobs)
dp, err := getDiffPairs(ctx, contentStore, differ, parent, createBlobs)
if err != nil {
return err
}
@ -66,12 +65,9 @@ func getDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s
}
eg.Go(func() error {
dp, err := g.Do(ctx, ref.ID(), func(ctx context.Context) (interface{}, error) {
diffID, blob, err := snapshotter.GetBlob(ctx, ref.ID())
if err != nil {
return nil, err
}
if blob != "" {
return DiffPair{DiffID: diffID, Blobsum: blob}, nil
refInfo := ref.Info()
if refInfo.Blob != "" {
return nil, nil
} else if !createBlobs {
return nil, errors.WithStack(ErrNoBlobs)
}
@ -107,9 +103,6 @@ func getDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s
descr, err := differ.Compare(ctx, lower, upper,
diff.WithMediaType(ocispec.MediaTypeImageLayerGzip),
diff.WithReference(ref.ID()),
diff.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
}),
)
if err != nil {
return nil, err
@ -118,30 +111,37 @@ func getDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s
if err != nil {
return nil, err
}
diffIDStr, ok := info.Labels[containerdUncompressed]
if !ok {
if diffID, ok := info.Labels[containerdUncompressed]; !ok {
return nil, errors.Errorf("invalid differ response with no diffID: %v", descr.Digest)
} else {
if descr.Annotations == nil {
descr.Annotations = map[string]string{}
}
descr.Annotations[containerdUncompressed] = diffID
}
diffIDDigest, err := digest.Parse(diffIDStr)
if err != nil {
return nil, err
}
if err := snapshotter.SetBlob(ctx, ref.ID(), diffIDDigest, descr.Digest); err != nil {
return nil, err
}
return DiffPair{DiffID: diffIDDigest, Blobsum: descr.Digest}, nil
return descr, nil
})
if err != nil {
return err
}
currentPair = dp.(DiffPair)
if dp != nil {
currentDescr = dp.(ocispec.Descriptor)
}
return nil
})
err := eg.Wait()
if err != nil {
return nil, err
}
return append(diffPairs, currentPair), nil
if currentDescr.Digest != "" {
if err := ref.SetBlob(baseCtx, currentDescr); err != nil {
return nil, err
}
}
refInfo := ref.Info()
return append(diffPairs, DiffPair{DiffID: refInfo.DiffID, Blobsum: refInfo.Blob}), nil
}
func isTypeWindows(ref cache.ImmutableRef) bool {

82
cache/manager.go vendored
View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/leases"
@ -17,6 +18,7 @@ import (
"github.com/moby/buildkit/snapshot"
"github.com/opencontainers/go-digest"
imagespaceidentity "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
@ -29,16 +31,17 @@ var (
)
type ManagerOpt struct {
Snapshotter snapshot.SnapshotterBase
Snapshotter snapshot.Snapshotter
MetadataStore *metadata.Store
ContentStore content.Store
LeaseManager leases.Manager
PruneRefChecker ExternalRefCheckerFunc
GarbageCollect func(ctx context.Context) (gc.Stats, error)
Applier diff.Applier
}
type Accessor interface {
GetByBlob(ctx context.Context, diffID, blobID digest.Digest, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error)
GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error)
Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error)
New(ctx context.Context, parent ImmutableRef, opts ...RefOption) (MutableRef, error)
@ -58,7 +61,7 @@ type Manager interface {
Close() error
}
type ExternalRefCheckerFunc func() (ExternalRefChecker, error)
type ExternalRefCheckerFunc func(Accessor) (ExternalRefChecker, error)
type ExternalRefChecker interface {
Exists(key string) bool
@ -89,12 +92,16 @@ func NewManager(opt ManagerOpt) (Manager, error) {
return cm, nil
}
func (cm *cacheManager) GetByBlob(ctx context.Context, diffID, blobID digest.Digest, parent ImmutableRef, opts ...RefOption) (ir ImmutableRef, err error) {
func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ir ImmutableRef, err error) {
diffID, err := diffIDFromDescriptor(desc)
if err != nil {
return nil, err
}
chainID := diffID
blobChainID := imagespaceidentity.ChainID([]digest.Digest{blobID, diffID})
blobChainID := imagespaceidentity.ChainID([]digest.Digest{desc.Digest, diffID})
if _, err := cm.ContentStore.Info(ctx, blobID); err != nil {
return nil, errors.Wrapf(err, "failed to get blob %s", blobID)
if _, err := cm.ContentStore.Info(ctx, desc.Digest); err != nil {
return nil, errors.Wrapf(err, "failed to get blob %s", desc.Digest)
}
var p *immutableRef
@ -106,8 +113,15 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, diffID, blobID digest.Dig
}
chainID = imagespaceidentity.ChainID([]digest.Digest{pInfo.ChainID, chainID})
blobChainID = imagespaceidentity.ChainID([]digest.Digest{pInfo.BlobChainID, blobChainID})
p = parent.(*immutableRef)
parentID = p.ID()
p2, err := cm.Get(ctx, parent.ID(), NoUpdateLastUsed)
if err != nil {
return nil, err
}
if err := p2.Finalize(ctx, true); err != nil {
return nil, err
}
parentID = p2.ID()
p = p2.(*immutableRef)
}
defer func() {
@ -177,14 +191,14 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, diffID, blobID digest.Dig
}()
if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{
ID: id,
Type: "snapshots/" + snapshotID,
ID: snapshotID,
Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(),
}); err != nil {
return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id)
}
if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: id}, leases.Resource{
ID: blobID.String(),
ID: desc.Digest.String(),
Type: "content",
}); err != nil {
return nil, errors.Wrapf(err, "failed to add blob %s to lease", id)
@ -205,11 +219,13 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, diffID, blobID digest.Dig
}
queueDiffID(rec.md, diffID.String())
queueBlob(rec.md, blobID.String())
queueBlob(rec.md, desc.Digest.String())
queueChainID(rec.md, chainID.String())
queueBlobChainID(rec.md, blobChainID.String())
queueSnapshotID(rec.md, snapshotID)
queueBlobOnly(rec.md, blobOnly)
queueMediaType(rec.md, desc.MediaType)
queueCommitted(rec.md)
if err := rec.md.Commit(); err != nil {
return nil, err
@ -230,7 +246,7 @@ func (cm *cacheManager) init(ctx context.Context) error {
for _, si := range items {
if _, err := cm.getRecord(ctx, si.ID()); err != nil {
logrus.Debugf("could not load snapshot %s: %v", si.ID(), err)
logrus.Debugf("could not load snapshot %s: %+v", si.ID(), err)
cm.md.Clear(si.ID())
// TODO: make sure content is deleted as well
}
@ -338,14 +354,10 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
return rec, nil
}
info, err := cm.Snapshotter.Stat(ctx, id)
if err != nil {
return nil, errors.Wrap(errNotFound, err.Error())
}
var parent *immutableRef
if info.Parent != "" {
parent, err = cm.get(ctx, info.Parent, append(opts, NoUpdateLastUsed)...)
if parentID := getParent(md); parentID != "" {
var err error
parent, err = cm.get(ctx, parentID, append(opts, NoUpdateLastUsed)...)
if err != nil {
return nil, err
}
@ -388,6 +400,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti
var parent *immutableRef
var parentID string
var parentSnapshotID string
if s != nil {
p, err := cm.Get(ctx, s.ID(), NoUpdateLastUsed)
if err != nil {
@ -396,8 +409,9 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti
if err := p.Finalize(ctx, true); err != nil {
return nil, err
}
parentID = p.ID()
parent = p.(*immutableRef)
parentSnapshotID = getSnapshotID(parent.md)
parentID = parent.ID()
}
defer func() {
@ -406,7 +420,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti
}
}()
if err := cm.Snapshotter.Prepare(ctx, id, parentID); err != nil {
if err := cm.Snapshotter.Prepare(ctx, id, parentSnapshotID); err != nil {
return nil, errors.Wrapf(err, "failed to prepare %s", id)
}
l, err := cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error {
@ -504,8 +518,10 @@ func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opt
cm.muPrune.Unlock()
if _, err := cm.ManagerOpt.GarbageCollect(ctx); err != nil {
return err
if cm.GarbageCollect != nil {
if _, err := cm.GarbageCollect(ctx); err != nil {
return err
}
}
return nil
@ -519,7 +535,7 @@ func (cm *cacheManager) pruneOnce(ctx context.Context, ch chan client.UsageInfo,
var check ExternalRefChecker
if f := cm.PruneRefChecker; f != nil && (!opt.All || len(opt.Filter) > 0) {
c, err := f()
c, err := f(cm)
if err != nil {
return errors.WithStack(err)
}
@ -731,7 +747,7 @@ func (cm *cacheManager) markShared(m map[string]*cacheUsageInfo) error {
if cm.PruneRefChecker == nil {
return nil
}
c, err := cm.PruneRefChecker()
c, err := cm.PruneRefChecker(cm)
if err != nil {
return errors.WithStack(err)
}
@ -1059,3 +1075,15 @@ func sortDeleteRecords(toDelete []*deleteRecord) {
float64(toDelete[j].usageCountIndex)/float64(maxUsageCountIndex)
})
}
func diffIDFromDescriptor(desc ocispec.Descriptor) (digest.Digest, error) {
diffIDStr, ok := desc.Annotations["containerd.io/uncompressed"]
if !ok {
return "", errors.Errorf("missing uncompressed annotation for %s", desc.Digest)
}
diffID, err := digest.Parse(diffIDStr)
if err != nil {
return "", errors.Wrapf(err, "failed to parse diffID %q for %s", diffIDStr, desc.Digest)
}
return diffID, nil
}

401
cache/manager_test.go vendored
View File

@ -13,6 +13,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/diff/apply"
"github.com/containerd/containerd/leases"
ctdmetadata "github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
@ -122,6 +123,7 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func()
ContentStore: mdb.ContentStore(),
LeaseManager: leaseutil.WithNamespace(lm, ns),
GarbageCollect: mdb.GarbageCollect,
Applier: apply.NewFileSystemApplier(mdb.ContentStore()),
})
if err != nil {
return nil, nil, err
@ -266,6 +268,245 @@ func TestManager(t *testing.T) {
require.Equal(t, 0, len(dirs))
}
func TestSnapshotExtract(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
cm := co.manager
b, desc, err := mapToBlob(map[string]string{"foo": "bar"})
require.NoError(t, err)
err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc)
require.NoError(t, err)
snap, err := cm.GetByBlob(ctx, desc, nil)
require.NoError(t, err)
require.Equal(t, false, snap.Info().Extracted)
b2, desc2, err := mapToBlob(map[string]string{"foo": "bar123"})
require.NoError(t, err)
err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b2), desc2)
require.NoError(t, err)
snap2, err := cm.GetByBlob(ctx, desc2, snap)
require.NoError(t, err)
size, err := snap2.Size(ctx)
require.NoError(t, err)
require.Equal(t, int64(len(b2)), size)
require.Equal(t, false, snap2.Info().Extracted)
dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 0, len(dirs))
checkNumBlobs(ctx, t, co.cs, 2)
err = snap2.Extract(ctx)
require.NoError(t, err)
require.Equal(t, true, snap.Info().Extracted)
require.Equal(t, true, snap2.Info().Extracted)
dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 2, len(dirs))
buf := pruneResultBuffer()
err = cm.Prune(ctx, buf.C, client.PruneInfo{})
buf.close()
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 2, 0)
require.Equal(t, len(buf.all), 0)
dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 2, len(dirs))
checkNumBlobs(ctx, t, co.cs, 2)
id := snap.ID()
err = snap.Release(context.TODO())
require.NoError(t, err)
buf = pruneResultBuffer()
err = cm.Prune(ctx, buf.C, client.PruneInfo{})
buf.close()
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 2, 0)
dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 2, len(dirs))
snap, err = cm.Get(ctx, id)
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 2, 0)
err = snap2.Release(context.TODO())
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 1, 1)
buf = pruneResultBuffer()
err = cm.Prune(ctx, buf.C, client.PruneInfo{})
buf.close()
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 1, 0)
require.Equal(t, len(buf.all), 1)
dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 1, len(dirs))
checkNumBlobs(ctx, t, co.cs, 1)
err = snap.Release(context.TODO())
require.NoError(t, err)
buf = pruneResultBuffer()
err = cm.Prune(ctx, buf.C, client.PruneInfo{})
buf.close()
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 0, 0)
dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 0, len(dirs))
checkNumBlobs(ctx, t, co.cs, 0)
}
func TestExtractOnMutable(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
cm := co.manager
active, err := cm.New(ctx, nil)
require.NoError(t, err)
snap, err := active.Commit(ctx)
require.NoError(t, err)
b, desc, err := mapToBlob(map[string]string{"foo": "bar"})
require.NoError(t, err)
err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc)
require.NoError(t, err)
b2, desc2, err := mapToBlob(map[string]string{"foo2": "1"})
require.NoError(t, err)
err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), desc2)
require.NoError(t, err)
snap2, err := cm.GetByBlob(ctx, desc2, snap)
require.Error(t, err)
err = snap.SetBlob(ctx, desc)
require.NoError(t, err)
snap2, err = cm.GetByBlob(ctx, desc2, snap)
require.NoError(t, err)
err = snap.Release(context.TODO())
require.NoError(t, err)
require.Equal(t, false, snap2.Info().Extracted)
size, err := snap2.Size(ctx)
require.NoError(t, err)
require.Equal(t, int64(len(b2)), size)
dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 1, len(dirs))
checkNumBlobs(ctx, t, co.cs, 2)
err = snap2.Extract(ctx)
require.NoError(t, err)
require.Equal(t, true, snap.Info().Extracted)
require.Equal(t, true, snap2.Info().Extracted)
buf := pruneResultBuffer()
err = cm.Prune(ctx, buf.C, client.PruneInfo{})
buf.close()
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 2, 0)
require.Equal(t, len(buf.all), 0)
dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 2, len(dirs))
err = snap2.Release(context.TODO())
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 0, 2)
buf = pruneResultBuffer()
err = cm.Prune(ctx, buf.C, client.PruneInfo{})
buf.close()
require.NoError(t, err)
checkDiskUsage(ctx, t, cm, 0, 0)
require.Equal(t, len(buf.all), 2)
dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots"))
require.NoError(t, err)
require.Equal(t, 0, len(dirs))
checkNumBlobs(ctx, t, co.cs, 0)
}
func TestSetBlob(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
@ -302,27 +543,31 @@ func TestSetBlob(t *testing.T) {
ctx, clean, err := leaseutil.WithLease(ctx, co.lm)
b, diffID, err := mapToBlob(map[string]string{"foo": "bar"})
b, desc, err := mapToBlob(map[string]string{"foo": "bar"})
require.NoError(t, err)
err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), ocispec.Descriptor{})
err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc)
require.NoError(t, err)
err = snap.SetBlob(ctx, diffID, digest.FromBytes([]byte("foobar")))
err = snap.SetBlob(ctx, ocispec.Descriptor{
Digest: digest.FromBytes([]byte("foobar")),
Annotations: map[string]string{
"containerd.io/uncompressed": digest.FromBytes([]byte("foobar2")).String(),
},
})
require.Error(t, err)
blobDgst := digest.FromBytes(b)
err = snap.SetBlob(ctx, diffID, blobDgst)
err = snap.SetBlob(ctx, desc)
require.NoError(t, err)
info = snap.Info()
require.Equal(t, diffID, info.DiffID)
require.Equal(t, blobDgst, info.Blob)
require.Equal(t, diffID, info.ChainID)
require.Equal(t, digest.FromBytes([]byte(blobDgst+" "+diffID)), info.BlobChainID)
require.Equal(t, desc.Annotations["containerd.io/uncompressed"], string(info.DiffID))
require.Equal(t, desc.Digest, info.Blob)
require.Equal(t, desc.MediaType, info.MediaType)
require.Equal(t, info.DiffID, info.ChainID)
require.Equal(t, digest.FromBytes([]byte(desc.Digest+" "+info.DiffID)), info.BlobChainID)
require.Equal(t, snap.ID(), info.SnapshotID)
require.Equal(t, info.Extracted, true)
blobChain1 := info.BlobChainID
active, err = cm.New(ctx, snap)
require.NoError(t, err)
@ -330,92 +575,93 @@ func TestSetBlob(t *testing.T) {
snap2, err := active.Commit(ctx)
require.NoError(t, err)
b2, diffID2, err := mapToBlob(map[string]string{"foo2": "bar2"})
require.NoError(t, err)
blobDgst2 := digest.FromBytes(b2)
err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), ocispec.Descriptor{})
b2, desc2, err := mapToBlob(map[string]string{"foo2": "bar2"})
require.NoError(t, err)
err = snap2.SetBlob(ctx, diffID2, blobDgst2)
err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), desc2)
require.NoError(t, err)
info = snap2.Info()
require.Equal(t, diffID2, info.DiffID)
require.Equal(t, blobDgst2, info.Blob)
require.Equal(t, digest.FromBytes([]byte(diffID+" "+diffID2)), info.ChainID)
require.Equal(t, digest.FromBytes([]byte(blobChain1+" "+digest.FromBytes([]byte(blobDgst2+" "+diffID2)))), info.BlobChainID)
require.Equal(t, snap2.ID(), info.SnapshotID)
require.Equal(t, info.Extracted, true)
b3, diffID3, err := mapToBlob(map[string]string{"foo3": "bar3"})
require.NoError(t, err)
blobDgst3 := digest.FromBytes(b3)
err = content.WriteBlob(ctx, co.cs, "ref3", bytes.NewBuffer(b3), ocispec.Descriptor{})
err = snap2.SetBlob(ctx, desc2)
require.NoError(t, err)
snap3, err := cm.GetByBlob(ctx, diffID3, blobDgst3, snap)
info2 := snap2.Info()
require.Equal(t, desc2.Annotations["containerd.io/uncompressed"], string(info2.DiffID))
require.Equal(t, desc2.Digest, info2.Blob)
require.Equal(t, desc2.MediaType, info2.MediaType)
require.Equal(t, digest.FromBytes([]byte(info.ChainID+" "+info2.DiffID)), info2.ChainID)
require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc2.Digest+" "+info2.DiffID)))), info2.BlobChainID)
require.Equal(t, snap2.ID(), info2.SnapshotID)
require.Equal(t, info2.Extracted, true)
b3, desc3, err := mapToBlob(map[string]string{"foo3": "bar3"})
require.NoError(t, err)
info = snap3.Info()
require.Equal(t, diffID3, info.DiffID)
require.Equal(t, blobDgst3, info.Blob)
require.Equal(t, digest.FromBytes([]byte(diffID+" "+diffID3)), info.ChainID)
blobChain3 := digest.FromBytes([]byte(blobChain1 + " " + digest.FromBytes([]byte(blobDgst3+" "+diffID3))))
require.Equal(t, blobChain3, info.BlobChainID)
require.Equal(t, string(info.ChainID), info.SnapshotID)
require.Equal(t, info.Extracted, false)
err = content.WriteBlob(ctx, co.cs, "ref3", bytes.NewBuffer(b3), desc3)
require.NoError(t, err)
snap3, err := cm.GetByBlob(ctx, desc3, snap)
require.NoError(t, err)
info3 := snap3.Info()
require.Equal(t, desc3.Annotations["containerd.io/uncompressed"], string(info3.DiffID))
require.Equal(t, desc3.Digest, info3.Blob)
require.Equal(t, desc3.MediaType, info3.MediaType)
require.Equal(t, digest.FromBytes([]byte(info.ChainID+" "+info3.DiffID)), info3.ChainID)
require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc3.Digest+" "+info3.DiffID)))), info3.BlobChainID)
require.Equal(t, string(info3.ChainID), info3.SnapshotID)
require.Equal(t, info3.Extracted, false)
// snap4 is same as snap2
_, _, err = mapToBlob(map[string]string{"foo2": "bar2"})
require.NoError(t, err)
snap4, err := cm.GetByBlob(ctx, diffID2, blobDgst2, snap)
snap4, err := cm.GetByBlob(ctx, desc2, snap)
require.NoError(t, err)
require.Equal(t, snap2.ID(), snap4.ID())
// snap5 is same different blob but same diffID as snap2
b5, _, err := mapToBlob(map[string]string{"foo5": "bar5"})
b5, desc5, err := mapToBlob(map[string]string{"foo5": "bar5"})
require.NoError(t, err)
err = content.WriteBlob(ctx, co.cs, "ref5", bytes.NewBuffer(b5), ocispec.Descriptor{})
desc5.Annotations["containerd.io/uncompressed"] = info2.DiffID.String()
err = content.WriteBlob(ctx, co.cs, "ref5", bytes.NewBuffer(b5), desc5)
require.NoError(t, err)
blobDgst5 := digest.FromBytes(b5)
snap5, err := cm.GetByBlob(ctx, diffID2, blobDgst5, snap)
snap5, err := cm.GetByBlob(ctx, desc5, snap)
require.NoError(t, err)
require.NotEqual(t, snap2.ID(), snap5.ID())
require.Equal(t, snap2.Info().SnapshotID, snap5.Info().SnapshotID)
require.Equal(t, diffID2, snap5.Info().DiffID)
require.Equal(t, blobDgst5, snap5.Info().Blob)
require.Equal(t, info2.DiffID, snap5.Info().DiffID)
require.Equal(t, desc5.Digest, snap5.Info().Blob)
require.Equal(t, snap2.Info().ChainID, snap5.Info().ChainID)
require.NotEqual(t, snap2.Info().BlobChainID, snap5.Info().BlobChainID)
require.Equal(t, digest.FromBytes([]byte(blobChain1+" "+digest.FromBytes([]byte(blobDgst5+" "+diffID2)))), snap5.Info().BlobChainID)
require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc5.Digest+" "+info2.DiffID)))), snap5.Info().BlobChainID)
// snap6 is a child of snap3
b6, diffID6, err := mapToBlob(map[string]string{"foo6": "bar6"})
require.NoError(t, err)
blobDgst6 := digest.FromBytes(b6)
err = content.WriteBlob(ctx, co.cs, "ref6", bytes.NewBuffer(b6), ocispec.Descriptor{})
b6, desc6, err := mapToBlob(map[string]string{"foo6": "bar6"})
require.NoError(t, err)
snap6, err := cm.GetByBlob(ctx, diffID6, blobDgst6, snap3)
err = content.WriteBlob(ctx, co.cs, "ref6", bytes.NewBuffer(b6), desc6)
require.NoError(t, err)
info = snap6.Info()
require.Equal(t, diffID6, info.DiffID)
require.Equal(t, blobDgst6, info.Blob)
require.Equal(t, digest.FromBytes([]byte(snap3.Info().ChainID+" "+diffID6)), info.ChainID)
require.Equal(t, digest.FromBytes([]byte(blobChain3+" "+digest.FromBytes([]byte(blobDgst6+" "+diffID6)))), info.BlobChainID)
require.Equal(t, string(info.ChainID), info.SnapshotID)
require.Equal(t, info.Extracted, false)
snap6, err := cm.GetByBlob(ctx, desc6, snap3)
require.NoError(t, err)
_, err = cm.GetByBlob(ctx, diffID6, digest.FromBytes([]byte("notexist")), snap3)
info6 := snap6.Info()
require.Equal(t, desc6.Annotations["containerd.io/uncompressed"], string(info6.DiffID))
require.Equal(t, desc6.Digest, info6.Blob)
require.Equal(t, digest.FromBytes([]byte(snap3.Info().ChainID+" "+info6.DiffID)), info6.ChainID)
require.Equal(t, digest.FromBytes([]byte(info3.BlobChainID+" "+digest.FromBytes([]byte(info6.Blob+" "+info6.DiffID)))), info6.BlobChainID)
require.Equal(t, string(info6.ChainID), info6.SnapshotID)
require.Equal(t, info6.Extracted, false)
_, err = cm.GetByBlob(ctx, ocispec.Descriptor{
Digest: digest.FromBytes([]byte("notexist")),
Annotations: map[string]string{
"containerd.io/uncompressed": digest.FromBytes([]byte("notexist")).String(),
},
}, snap3)
require.Error(t, err)
clean(context.TODO())
@ -702,6 +948,16 @@ func checkDiskUsage(ctx context.Context, t *testing.T, cm Manager, inuse, unused
require.Equal(t, unused, unusedActual)
}
func checkNumBlobs(ctx context.Context, t *testing.T, cs content.Store, expected int) {
c := 0
err := cs.Walk(ctx, func(_ content.Info) error {
c++
return nil
})
require.NoError(t, err)
require.Equal(t, expected, c)
}
func pruneResultBuffer() *buf {
b := &buf{C: make(chan client.UsageInfo), closed: make(chan struct{})}
go func() {
@ -724,7 +980,7 @@ func (b *buf) close() {
<-b.closed
}
func mapToBlob(m map[string]string) ([]byte, digest.Digest, error) {
func mapToBlob(m map[string]string) ([]byte, ocispec.Descriptor, error) {
buf := bytes.NewBuffer(nil)
gz := gzip.NewWriter(buf)
sha := digest.SHA256.Digester()
@ -735,17 +991,24 @@ func mapToBlob(m map[string]string) ([]byte, digest.Digest, error) {
Name: k,
Size: int64(len(v)),
}); err != nil {
return nil, "", err
return nil, ocispec.Descriptor{}, err
}
if _, err := tw.Write([]byte(v)); err != nil {
return nil, "", err
return nil, ocispec.Descriptor{}, err
}
}
if err := tw.Close(); err != nil {
return nil, "", err
return nil, ocispec.Descriptor{}, err
}
if err := gz.Close(); err != nil {
return nil, "", err
return nil, ocispec.Descriptor{}, err
}
return buf.Bytes(), sha.Digest(), nil
return buf.Bytes(), ocispec.Descriptor{
Digest: digest.FromBytes(buf.Bytes()),
MediaType: ocispec.MediaTypeImageLayerGzip,
Size: int64(buf.Len()),
Annotations: map[string]string{
"containerd.io/uncompressed": sha.Digest().String(),
},
}, nil
}

29
cache/metadata.go vendored
View File

@ -27,6 +27,7 @@ const keyBlobChainID = "cache.blobChainID"
const keyBlob = "cache.blob"
const keySnapshot = "cache.snapshot"
const keyBlobOnly = "cache.blobonly"
const keyMediaType = "cache.mediatype"
const keyDeleted = "cache.deleted"
@ -44,6 +45,32 @@ func queueDiffID(si *metadata.StorageItem, str string) error {
return nil
}
func getMediaType(si *metadata.StorageItem) string {
v := si.Get(keyMediaType)
if v == nil {
return si.ID()
}
var str string
if err := v.Unmarshal(&str); err != nil {
return ""
}
return str
}
func queueMediaType(si *metadata.StorageItem, str string) error {
if str == "" {
return nil
}
v, err := metadata.NewValue(str)
if err != nil {
return errors.Wrap(err, "failed to create mediaType value")
}
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, keyMediaType, v)
})
return nil
}
func getSnapshotID(si *metadata.StorageItem) string {
v := si.Get(keySnapshot)
if v == nil {
@ -64,7 +91,7 @@ func queueSnapshotID(si *metadata.StorageItem, str string) error {
if err != nil {
return errors.Wrap(err, "failed to create chainID value")
}
si.Update(func(b *bolt.Bucket) error {
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, keySnapshot, v)
})
return nil

112
cache/refs.go vendored
View File

@ -2,12 +2,15 @@ package cache
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshots"
"github.com/docker/docker/pkg/idtools"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/identity"
@ -15,6 +18,7 @@ import (
"github.com/moby/buildkit/util/flightcontrol"
"github.com/opencontainers/go-digest"
imagespaceidentity "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -36,7 +40,7 @@ type ImmutableRef interface {
Clone() ImmutableRef
Info() RefInfo
SetBlob(ctx context.Context, diffID, blob digest.Digest) error
SetBlob(ctx context.Context, desc ocispec.Descriptor) error
Extract(ctx context.Context) error // +progress
}
@ -46,6 +50,7 @@ type RefInfo struct {
BlobChainID digest.Digest
DiffID digest.Digest
Blob digest.Digest
MediaType string
Extracted bool
}
@ -121,15 +126,27 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) {
driverID = getSnapshotID(cr.equalMutable.md)
}
cr.mu.Unlock()
usage, err := cr.cm.ManagerOpt.Snapshotter.Usage(ctx, driverID)
if err != nil {
cr.mu.Lock()
isDead := cr.isDead()
cr.mu.Unlock()
if isDead {
return int64(0), nil
var usage snapshots.Usage
if !getBlobOnly(cr.md) {
var err error
usage, err = cr.cm.ManagerOpt.Snapshotter.Usage(ctx, driverID)
if err != nil {
cr.mu.Lock()
isDead := cr.isDead()
cr.mu.Unlock()
if isDead {
return int64(0), nil
}
if !errdefs.IsNotFound(err) {
return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID())
}
}
}
if dgst := getBlob(cr.md); dgst != "" {
info, err := cr.cm.ContentStore.Info(ctx, digest.Digest(dgst))
if err == nil {
usage.Size += info.Size
}
return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID())
}
cr.mu.Lock()
setSize(cr.md, usage.Size)
@ -238,26 +255,84 @@ func (sr *immutableRef) Clone() ImmutableRef {
return ref
}
func (sr *immutableRef) Extract(ctx context.Context) error {
return errors.Errorf("extract not implemented")
}
func (sr *immutableRef) Info() RefInfo {
return RefInfo{
ChainID: digest.Digest(getChainID(sr.md)),
DiffID: digest.Digest(getDiffID(sr.md)),
Blob: digest.Digest(getBlob(sr.md)),
MediaType: getMediaType(sr.md),
BlobChainID: digest.Digest(getBlobChainID(sr.md)),
SnapshotID: getSnapshotID(sr.md),
Extracted: !getBlobOnly(sr.md),
}
}
func (sr *immutableRef) Extract(ctx context.Context) error {
_, err := sr.sizeG.Do(ctx, sr.ID()+"-extract", func(ctx context.Context) (interface{}, error) {
snapshotID := getSnapshotID(sr.md)
if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil {
queueBlobOnly(sr.md, false)
return nil, sr.md.Commit()
}
parentID := ""
if sr.parent != nil {
if err := sr.parent.Extract(ctx); err != nil {
return nil, err
}
parentID = getSnapshotID(sr.parent.md)
}
info := sr.Info()
key := fmt.Sprintf("extract-%s %s", identity.NewID(), info.ChainID)
err := sr.cm.Snapshotter.Prepare(ctx, key, parentID)
if err != nil {
return nil, err
}
mountable, err := sr.cm.Snapshotter.Mounts(ctx, key)
if err != nil {
return nil, err
}
mounts, unmount, err := mountable.Mount()
if err != nil {
return nil, err
}
_, err = sr.cm.Applier.Apply(ctx, ocispec.Descriptor{
Digest: info.Blob,
MediaType: info.MediaType,
}, mounts)
if err != nil {
unmount()
return nil, err
}
if err := unmount(); err != nil {
return nil, err
}
if err := sr.cm.Snapshotter.Commit(ctx, getSnapshotID(sr.md), key); err != nil {
if !errdefs.IsAlreadyExists(err) {
return nil, err
}
}
queueBlobOnly(sr.md, false)
if err := sr.md.Commit(); err != nil {
return nil, err
}
return nil, nil
})
return err
}
// SetBlob associates a blob with the cache record.
// A lease must be held for the blob when calling this function
// Caller should call Info() for knowing what current values are actually set
func (sr *immutableRef) SetBlob(ctx context.Context, diffID, blob digest.Digest) error {
if _, err := sr.cm.ContentStore.Info(ctx, blob); err != nil {
func (sr *immutableRef) SetBlob(ctx context.Context, desc ocispec.Descriptor) error {
diffID, err := diffIDFromDescriptor(desc)
if err != nil {
return err
}
if _, err := sr.cm.ContentStore.Info(ctx, desc.Digest); err != nil {
return err
}
@ -285,22 +360,23 @@ func (sr *immutableRef) SetBlob(ctx context.Context, diffID, blob digest.Digest)
}
if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{
ID: blob.String(),
ID: desc.Digest.String(),
Type: "content",
}); err != nil {
return err
}
queueDiffID(sr.md, diffID.String())
queueBlob(sr.md, blob.String())
queueBlob(sr.md, desc.Digest.String())
chainID := diffID
blobChainID := imagespaceidentity.ChainID([]digest.Digest{blob, diffID})
blobChainID := imagespaceidentity.ChainID([]digest.Digest{desc.Digest, diffID})
if parentChainID != "" {
chainID = imagespaceidentity.ChainID([]digest.Digest{parentChainID, chainID})
blobChainID = imagespaceidentity.ChainID([]digest.Digest{parentBlobChainID, blobChainID})
}
queueChainID(sr.md, chainID.String())
queueBlobChainID(sr.md, blobChainID.String())
queueMediaType(sr.md, desc.MediaType)
if err := sr.md.Commit(); err != nil {
return err
}

View File

@ -157,7 +157,7 @@ func (ic *ImageWriter) exportLayers(ctx context.Context, refs ...cache.Immutable
for i, ref := range refs {
func(i int, ref cache.ImmutableRef) {
eg.Go(func() error {
diffPairs, err := blobs.GetDiffPairs(ctx, ic.opt.ContentStore, ic.opt.Snapshotter, ic.opt.Differ, ref, true)
diffPairs, err := blobs.GetDiffPairs(ctx, ic.opt.ContentStore, ic.opt.Differ, ref, true)
if err != nil {
return errors.Wrap(err, "failed calculating diff pairs for exported snapshot")
}

View File

@ -9,6 +9,7 @@ import (
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/snapshot"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
)
@ -17,7 +18,7 @@ const blobKey = "blobmapping.blob"
type Opt struct {
Content content.Store
Snapshotter snapshot.SnapshotterBase
Snapshotter snapshot.Snapshotter
MetadataStore *metadata.Store
}
@ -34,14 +35,14 @@ type DiffPair struct {
// this snapshotter keeps an internal mapping between a snapshot and a blob
type Snapshotter struct {
snapshot.SnapshotterBase
snapshot.Snapshotter
opt Opt
}
func NewSnapshotter(opt Opt) snapshot.Snapshotter {
s := &Snapshotter{
SnapshotterBase: opt.Snapshotter,
opt: opt,
Snapshotter: opt.Snapshotter,
opt: opt,
}
return s
@ -60,7 +61,7 @@ func (s *Snapshotter) Remove(ctx context.Context, key string) error {
return err
}
if err := s.SnapshotterBase.Remove(ctx, key); err != nil {
if err := s.Snapshotter.Remove(ctx, key); err != nil {
return err
}
@ -73,7 +74,9 @@ func (s *Snapshotter) Remove(ctx context.Context, key string) error {
}
func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
u, err := s.SnapshotterBase.Usage(ctx, key)
return snapshots.Usage{}, errors.Errorf("to-be-removed")
u, err := s.Snapshotter.Usage(ctx, key)
if err != nil {
return snapshots.Usage{}, err
}
@ -92,6 +95,8 @@ func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, e
}
func (s *Snapshotter) GetBlob(ctx context.Context, key string) (digest.Digest, digest.Digest, error) {
return "", "", errors.Errorf("to-be-removed")
md, _ := s.opt.MetadataStore.Get(key)
v := md.Get(blobKey)
if v == nil {
@ -108,6 +113,7 @@ func (s *Snapshotter) GetBlob(ctx context.Context, key string) (digest.Digest, d
// Checks that there is a blob in the content store.
// If same blob has already been set then this is a noop.
func (s *Snapshotter) SetBlob(ctx context.Context, key string, diffID, blobsum digest.Digest) error {
return errors.Errorf("SetBlob should not be called")
info, err := s.opt.Content.Info(ctx, blobsum)
if err != nil {
return err

View File

@ -2,25 +2,23 @@ package containerd
import (
"context"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
type garbageCollectFn func(context.Context) error
func NewContentStore(store content.Store, ns string, gc func(context.Context) error) content.Store {
return &noGCContentStore{&nsContent{ns, store, gc}}
func NewContentStore(store content.Store, ns string) content.Store {
return &nsContent{ns, store}
}
type nsContent struct {
ns string
content.Store
gc garbageCollectFn
}
func (c *nsContent) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
@ -39,16 +37,7 @@ func (c *nsContent) Walk(ctx context.Context, fn content.WalkFunc, filters ...st
}
func (c *nsContent) Delete(ctx context.Context, dgst digest.Digest) error {
ctx = namespaces.WithNamespace(ctx, c.ns)
if _, err := c.Update(ctx, content.Info{
Digest: dgst,
}, "labels.containerd.io/gc.root"); err != nil {
return err
} // calling snapshotter.Remove here causes a race in containerd
if c.gc == nil {
return nil
}
return c.gc(ctx)
return errors.Errorf("contentstore.Delete usage is forbidden")
}
func (c *nsContent) Status(ctx context.Context, ref string) (content.Status, error) {
@ -76,31 +65,12 @@ func (c *nsContent) Writer(ctx context.Context, opts ...content.WriterOpt) (cont
}
func (c *nsContent) writer(ctx context.Context, retries int, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
_, noRoot := wOpts.Desc.Annotations["buildkit/noroot"]
delete(wOpts.Desc.Annotations, "buildkit/noroot")
opts = append(opts, content.WithDescriptor(wOpts.Desc))
ctx = namespaces.WithNamespace(ctx, c.ns)
w, err := c.Store.Writer(ctx, opts...)
if err != nil {
if !noRoot && errdefs.IsAlreadyExists(err) && wOpts.Desc.Digest != "" && retries > 0 {
_, err2 := c.Update(ctx, content.Info{
Digest: wOpts.Desc.Digest,
Labels: map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
},
}, "labels.containerd.io/gc.root")
if err2 != nil {
return c.writer(ctx, retries-1, opts...)
}
}
return nil, err
}
return &nsWriter{Writer: w, ns: c.ns}, err
return &nsWriter{Writer: w, ns: c.ns}, nil
}
type nsWriter struct {
@ -113,25 +83,25 @@ func (w *nsWriter) Commit(ctx context.Context, size int64, expected digest.Diges
return w.Writer.Commit(ctx, size, expected, opts...)
}
type noGCContentStore struct {
content.Store
}
type noGCWriter struct {
content.Writer
}
// type noGCContentStore struct {
// content.Store
// }
// type noGCWriter struct {
// content.Writer
// }
func (cs *noGCContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
w, err := cs.Store.Writer(ctx, opts...)
return &noGCWriter{w}, err
}
// func (cs *noGCContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
// w, err := cs.Store.Writer(ctx, opts...)
// return &noGCWriter{w}, err
// }
func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
opts = append(opts, func(info *content.Info) error {
if info.Labels == nil {
info.Labels = map[string]string{}
}
info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano)
return nil
})
return w.Writer.Commit(ctx, size, expected, opts...)
}
// func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
// opts = append(opts, func(info *content.Info) error {
// if info.Labels == nil {
// info.Labels = map[string]string{}
// }
// info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano)
// return nil
// })
// return w.Writer.Commit(ctx, size, expected, opts...)
// }

View File

@ -3,55 +3,38 @@ package containerd
import (
"context"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
ctdsnapshot "github.com/containerd/containerd/snapshots"
"github.com/containerd/containerd/snapshots"
"github.com/docker/docker/pkg/idtools"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/snapshot/blobmapping"
"github.com/pkg/errors"
)
func NewSnapshotter(name string, snapshotter ctdsnapshot.Snapshotter, store content.Store, mdstore *metadata.Store, ns string, gc func(context.Context) error, idmap *idtools.IdentityMapping) snapshot.Snapshotter {
return blobmapping.NewSnapshotter(blobmapping.Opt{
Content: store,
Snapshotter: snapshot.FromContainerdSnapshotter(name, &nsSnapshotter{ns, snapshotter, gc}, idmap),
MetadataStore: mdstore,
})
func NewSnapshotter(name string, snapshotter snapshots.Snapshotter, ns string, idmap *idtools.IdentityMapping) snapshot.Snapshotter {
return snapshot.FromContainerdSnapshotter(name, &nsSnapshotter{ns, snapshotter}, idmap)
}
func NSSnapshotter(ns string, snapshotter ctdsnapshot.Snapshotter) ctdsnapshot.Snapshotter {
func NSSnapshotter(ns string, snapshotter snapshots.Snapshotter) snapshots.Snapshotter {
return &nsSnapshotter{ns: ns, Snapshotter: snapshotter}
}
type nsSnapshotter struct {
ns string
ctdsnapshot.Snapshotter
gc garbageCollectFn
snapshots.Snapshotter
}
func (s *nsSnapshotter) Stat(ctx context.Context, key string) (ctdsnapshot.Info, error) {
func (s *nsSnapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
ctx = namespaces.WithNamespace(ctx, s.ns)
info, err := s.Snapshotter.Stat(ctx, key)
/* if err == nil {
if _, ok := info.Labels["labels.containerd.io/gc.root"]; !ok {
if err := addRootLabel()(&info); err != nil {
return info, err
}
return s.Update(ctx, info, "labels.containerd.io/gc.root")
}
}*/
return info, err
return s.Snapshotter.Stat(ctx, key)
}
func (s *nsSnapshotter) Update(ctx context.Context, info ctdsnapshot.Info, fieldpaths ...string) (ctdsnapshot.Info, error) {
func (s *nsSnapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
ctx = namespaces.WithNamespace(ctx, s.ns)
return s.Snapshotter.Update(ctx, info, fieldpaths...)
}
func (s *nsSnapshotter) Usage(ctx context.Context, key string) (ctdsnapshot.Usage, error) {
func (s *nsSnapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
ctx = namespaces.WithNamespace(ctx, s.ns)
return s.Snapshotter.Usage(ctx, key)
}
@ -59,47 +42,22 @@ func (s *nsSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount,
ctx = namespaces.WithNamespace(ctx, s.ns)
return s.Snapshotter.Mounts(ctx, key)
}
func (s *nsSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) {
func (s *nsSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
ctx = namespaces.WithNamespace(ctx, s.ns)
return s.Snapshotter.Prepare(ctx, key, parent, addRootLabel(opts...))
return s.Snapshotter.Prepare(ctx, key, parent, opts...)
}
func (s *nsSnapshotter) View(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) {
func (s *nsSnapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
ctx = namespaces.WithNamespace(ctx, s.ns)
return s.Snapshotter.View(ctx, key, parent, addRootLabel(opts...))
return s.Snapshotter.View(ctx, key, parent, opts...)
}
func (s *nsSnapshotter) Commit(ctx context.Context, name, key string, opts ...ctdsnapshot.Opt) error {
func (s *nsSnapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
ctx = namespaces.WithNamespace(ctx, s.ns)
return s.Snapshotter.Commit(ctx, name, key, addRootLabel(opts...))
return s.Snapshotter.Commit(ctx, name, key, opts...)
}
func (s *nsSnapshotter) Remove(ctx context.Context, key string) error {
return errors.Errorf("remove should not be called directly")
ctx = namespaces.WithNamespace(ctx, s.ns)
if _, err := s.Update(ctx, ctdsnapshot.Info{
Name: key,
}, "labels.containerd.io/gc.root"); err != nil {
return err
} // calling snapshotter.Remove here causes a race in containerd
if s.gc == nil {
return nil
}
return s.gc(ctx)
return errors.Errorf("calling snapshotter.Remove is forbidden")
}
func (s *nsSnapshotter) Walk(ctx context.Context, fn func(context.Context, ctdsnapshot.Info) error) error {
func (s *nsSnapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error {
ctx = namespaces.WithNamespace(ctx, s.ns)
return s.Snapshotter.Walk(ctx, fn)
}
func addRootLabel(opts ...ctdsnapshot.Opt) ctdsnapshot.Opt {
return func(info *ctdsnapshot.Info) error {
for _, opt := range opts {
if err := opt(info); err != nil {
return err
}
}
// if info.Labels == nil {
// info.Labels = map[string]string{}
// }
// info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano)
return nil
}
}

View File

@ -9,7 +9,6 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/snapshot"
digest "github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@ -20,27 +19,27 @@ const (
)
type Opt struct {
Snapshotter snapshot.Snapshotter
ImageStore images.Store
ContentStore content.Provider
ContentStore content.Store
}
// New creates new image reference checker that can be used to see if a reference
// is being used by any of the images in the image store
func New(opt Opt) cache.ExternalRefCheckerFunc {
return func() (cache.ExternalRefChecker, error) {
return &checker{opt: opt}, nil
return func(cm cache.Accessor) (cache.ExternalRefChecker, error) {
return &Checker{opt: opt, cm: cm}, nil
}
}
type checker struct {
type Checker struct {
cm cache.Accessor
opt Opt
once sync.Once
images map[string]struct{}
cache map[string]bool
}
func (c *checker) Exists(key string) bool {
func (c *Checker) Exists(key string) bool {
if c.opt.ImageStore == nil {
return false
}
@ -62,26 +61,26 @@ func (c *checker) Exists(key string) bool {
return ok
}
func (c *checker) getLayers(key string) ([]specs.Descriptor, error) {
_, blob, err := c.opt.Snapshotter.GetBlob(context.TODO(), key)
func (c *Checker) getLayers(key string) ([]specs.Descriptor, error) {
ref, err := c.cm.Get(context.TODO(), key)
if err != nil {
return nil, err
}
stat, err := c.opt.Snapshotter.Stat(context.TODO(), key)
if err != nil {
return nil, err
info := ref.Info()
if info.Blob == "" {
return nil, errors.Errorf("layer without blob")
}
var layers []specs.Descriptor
if parent := stat.Parent; parent != "" {
layers, err = c.getLayers(parent)
if parent := ref.Parent(); parent != nil {
layers, err = c.getLayers(parent.ID())
if err != nil {
return nil, err
}
}
return append(layers, specs.Descriptor{Digest: blob}), nil
return append(layers, specs.Descriptor{Digest: info.Blob}), nil
}
func (c *checker) init() {
func (c *Checker) init() {
c.images = map[string]struct{}{}
c.cache = map[string]bool{}
@ -103,7 +102,7 @@ func (c *checker) init() {
}
}
func (c *checker) registerLayers(l []specs.Descriptor) {
func (c *Checker) registerLayers(l []specs.Descriptor) {
if k := layerKey(l); k != "" {
c.images[k] = struct{}{}
}

View File

@ -9,7 +9,6 @@ import (
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshots"
"github.com/docker/docker/pkg/idtools"
digest "github.com/opencontainers/go-digest"
)
type Mountable interface {
@ -18,7 +17,8 @@ type Mountable interface {
IdentityMapping() *idtools.IdentityMapping
}
type SnapshotterBase interface {
// Snapshotter defines interface that any snapshot implementation should satisfy
type Snapshotter interface {
Name() string
Mounts(ctx context.Context, key string) (Mountable, error)
Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) error
@ -34,18 +34,7 @@ type SnapshotterBase interface {
IdentityMapping() *idtools.IdentityMapping
}
// Snapshotter defines interface that any snapshot implementation should satisfy
type Snapshotter interface {
Blobmapper
SnapshotterBase
}
type Blobmapper interface {
GetBlob(ctx context.Context, key string) (digest.Digest, digest.Digest, error)
SetBlob(ctx context.Context, key string, diffID, blob digest.Digest) error
}
func FromContainerdSnapshotter(name string, s snapshots.Snapshotter, idmap *idtools.IdentityMapping) SnapshotterBase {
func FromContainerdSnapshotter(name string, s snapshots.Snapshotter, idmap *idtools.IdentityMapping) Snapshotter {
return &fromContainerd{name: name, Snapshotter: s, idmap: idmap}
}

View File

@ -18,6 +18,7 @@ import (
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/pull"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/winlayers"
@ -104,19 +105,20 @@ func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *se
Src: imageIdentifier.Reference,
Resolver: pull.NewResolver(ctx, is.ResolverOpt, sm, is.ImageStore, imageIdentifier.ResolveMode, imageIdentifier.Reference.String()),
Platform: &platform,
LeaseManager: is.LeaseManager,
}
p := &puller{
CacheAccessor: is.CacheAccessor,
Puller: pullerUtil,
Platform: platform,
id: imageIdentifier,
LeaseManager: is.LeaseManager,
}
return p, nil
}
type puller struct {
CacheAccessor cache.Accessor
LeaseManager leases.Manager
Platform specs.Platform
id *source.ImageIdentifier
*pull.Puller
@ -188,33 +190,51 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
// workaround for gcr, authentication not supported on blob endpoints
pull.EnsureManifestRequested(ctx, p.Puller.Resolver, p.Puller.Src.String())
ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager)
if err != nil {
return nil, err
}
defer done(ctx)
pulled, err := p.Puller.Pull(ctx)
if err != nil {
return nil, err
}
if pulled.ChainID == "" {
if len(pulled.Layers) == 0 {
return nil, nil
}
ref, err := p.CacheAccessor.GetFromSnapshotter(ctx, string(pulled.ChainID), cache.WithDescription("pulled from "+pulled.Ref))
if err != nil {
return nil, err
var current cache.ImmutableRef
for _, l := range pulled.Layers {
ref, err := p.CacheAccessor.GetByBlob(ctx, l, current, cache.WithDescription("pulled from "+pulled.Ref))
if current != nil {
current.Release(context.TODO())
}
if err != nil {
return nil, err
}
if err := ref.Extract(ctx); err != nil {
ref.Release(context.TODO())
return nil, err
}
current = ref
}
if layerNeedsTypeWindows && ref != nil {
if err := markRefLayerTypeWindows(ref); err != nil {
ref.Release(context.TODO())
if layerNeedsTypeWindows && current != nil {
if err := markRefLayerTypeWindows(current); err != nil {
current.Release(context.TODO())
return nil, err
}
}
if p.id.RecordType != "" && cache.GetRecordType(ref) == "" {
if err := cache.SetRecordType(ref, p.id.RecordType); err != nil {
ref.Release(context.TODO())
if p.id.RecordType != "" && cache.GetRecordType(current) == "" {
if err := cache.SetRecordType(current, p.id.RecordType); err != nil {
current.Release(context.TODO())
return nil, err
}
}
return ref, nil
return current, nil
}
func markRefLayerTypeWindows(ref cache.ImmutableRef) error {

View File

@ -3,7 +3,6 @@ package imageutil
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@ -94,12 +93,9 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
}
children := childrenConfigHandler(cache, platform)
if m, ok := cache.(content.Manager); ok {
children = SetChildrenLabelsNonBlobs(m, children)
}
handlers := []images.Handler{
fetchWithoutRoot(remotes.FetchHandler(cache, fetcher)),
remotes.FetchHandler(cache, fetcher),
children,
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
@ -118,16 +114,6 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
return desc.Digest, dt, nil
}
func fetchWithoutRoot(fetch images.HandlerFunc) images.HandlerFunc {
return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}
desc.Annotations["buildkit/noroot"] = "true"
return fetch(ctx, desc)
}
}
func childrenConfigHandler(provider content.Provider, platform platforms.MatchComparer) images.HandlerFunc {
return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
var descs []specs.Descriptor
@ -207,39 +193,3 @@ func DetectManifestBlobMediaType(dt []byte) (string, error) {
}
return images.MediaTypeDockerSchema2ManifestList, nil
}
func SetChildrenLabelsNonBlobs(manager content.Manager, f images.HandlerFunc) images.HandlerFunc {
return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
children, err := f(ctx, desc)
if err != nil {
return children, err
}
if len(children) > 0 {
info := content.Info{
Digest: desc.Digest,
Labels: map[string]string{},
}
fields := []string{}
for i, ch := range children {
switch ch.MediaType {
case images.MediaTypeDockerSchema2Layer, images.MediaTypeDockerSchema2LayerGzip, specs.MediaTypeImageLayer, specs.MediaTypeImageLayerGzip:
continue
default:
}
info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String()
fields = append(fields, fmt.Sprintf("labels.containerd.io/gc.ref.content.%d", i))
}
if len(info.Labels) > 0 {
_, err := manager.Update(ctx, info, fields...)
if err != nil {
return nil, err
}
}
}
return children, err
}
}

View File

@ -28,25 +28,40 @@ func WithLease(ctx context.Context, ls leases.Manager, opts ...leases.Opt) (cont
}
func WithNamespace(lm leases.Manager, ns string) leases.Manager {
return &nsLM{Manager: lm, ns: ns}
return &nsLM{manager: lm, ns: ns}
}
type nsLM struct {
leases.Manager
ns string
manager leases.Manager
ns string
}
func (l *nsLM) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) {
ctx = namespaces.WithNamespace(ctx, l.ns)
return l.Manager.Create(ctx, opts...)
return l.manager.Create(ctx, opts...)
}
func (l *nsLM) Delete(ctx context.Context, lease leases.Lease, opts ...leases.DeleteOpt) error {
ctx = namespaces.WithNamespace(ctx, l.ns)
return l.Manager.Delete(ctx, lease, opts...)
return l.manager.Delete(ctx, lease, opts...)
}
func (l *nsLM) List(ctx context.Context, filters ...string) ([]leases.Lease, error) {
ctx = namespaces.WithNamespace(ctx, l.ns)
return l.Manager.List(ctx, filters...)
return l.manager.List(ctx, filters...)
}
func (l *nsLM) AddResource(ctx context.Context, lease leases.Lease, resource leases.Resource) error {
ctx = namespaces.WithNamespace(ctx, l.ns)
return l.manager.AddResource(ctx, lease, resource)
}
func (l *nsLM) DeleteResource(ctx context.Context, lease leases.Lease, resource leases.Resource) error {
ctx = namespaces.WithNamespace(ctx, l.ns)
return l.manager.DeleteResource(ctx, lease, resource)
}
func (l *nsLM) ListResources(ctx context.Context, lease leases.Lease) ([]leases.Resource, error) {
ctx = namespaces.WithNamespace(ctx, l.ns)
return l.manager.ListResources(ctx, lease)
}

View File

@ -9,20 +9,15 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
"github.com/containerd/containerd/rootfs"
ctdsnapshot "github.com/containerd/containerd/snapshots"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
@ -33,7 +28,6 @@ type Puller struct {
Applier diff.Applier
Src reference.Spec
Platform *ocispec.Platform
LeaseManager leases.Manager
// See NewResolver()
Resolver remotes.Resolver
resolveOnce sync.Once
@ -43,9 +37,10 @@ type Puller struct {
}
type Pulled struct {
Ref string
Descriptor ocispec.Descriptor
ChainID digest.Digest
Ref string
Descriptor ocispec.Descriptor
Layers []ocispec.Descriptor
MetadataBlobs []ocispec.Descriptor
}
func (p *Puller) Resolve(ctx context.Context) (string, ocispec.Descriptor, error) {
@ -98,12 +93,6 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
platform = platforms.Default()
}
ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager)
if err != nil {
return nil, err
}
defer done(ctx)
ongoing := newJobs(p.ref)
pctx, stopProgress := context.WithCancel(ctx)
@ -132,8 +121,6 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
} else {
// Get all the children for a descriptor
childrenHandler := images.ChildrenHandler(p.ContentStore)
// Set any children labels for that content
childrenHandler = imageutil.SetChildrenLabelsNonBlobs(p.ContentStore, childrenHandler)
// Filter the children by the platform
childrenHandler = images.FilterPlatforms(childrenHandler, platform)
// Limit manifests pulled to the best match in an index
@ -209,86 +196,92 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
}
}
for _, l := range layerBlobs {
labels := map[string]string{}
var fields []string
for _, nl := range notLayerBlobs {
k := "containerd.io/gc.ref.content." + nl.Digest.Hex()[:12]
labels[k] = nl.Digest.String()
fields = append(fields, "labels."+k)
}
if _, err := p.ContentStore.Update(ctx, content.Info{
Digest: l.Digest,
Labels: labels,
}, fields...); err != nil {
return nil, err
}
}
// for _, l := range layerBlobs {
// labels := map[string]string{}
// var fields []string
// for _, nl := range notLayerBlobs {
// k := "containerd.io/gc.ref.content." + nl.Digest.Hex()[:12]
// labels[k] = nl.Digest.String()
// fields = append(fields, "labels."+k)
// }
// if _, err := p.ContentStore.Update(ctx, content.Info{
// Digest: l.Digest,
// Labels: labels,
// }, fields...); err != nil {
// return nil, err
// }
// }
for _, nl := range append(notLayerBlobs, unusedBlobs...) {
if err := p.ContentStore.Delete(ctx, nl.Digest); err != nil {
return nil, err
}
}
// for _, nl := range append(notLayerBlobs, unusedBlobs...) {
// if err := p.ContentStore.Delete(ctx, nl.Digest); err != nil {
// return nil, err
// }
// }
csh, release := snapshot.NewContainerdSnapshotter(p.Snapshotter)
defer release()
unpackProgressDone := oneOffProgress(ctx, "unpacking "+p.Src.String())
chainid, err := unpack(ctx, p.desc, p.ContentStore, csh, p.Snapshotter, p.Applier, platform)
layers, err := getLayers(ctx, p.ContentStore, p.desc, platform)
if err != nil {
return nil, unpackProgressDone(err)
return nil, err
}
unpackProgressDone(nil)
// csh, release := snapshot.NewContainerdSnapshotter(p.Snapshotter)
// defer release()
// unpackProgressDone := oneOffProgress(ctx, "unpacking "+p.Src.String())
// chainid, err := unpack(ctx, p.desc, p.ContentStore, csh, p.Snapshotter, p.Applier, platform)
// if err != nil {
// return nil, unpackProgressDone(err)
// }
// unpackProgressDone(nil)
return &Pulled{
Ref: p.ref,
Descriptor: p.desc,
ChainID: chainid,
Ref: p.ref,
Descriptor: p.desc,
Layers: layers,
MetadataBlobs: notLayerBlobs,
}, nil
}
func unpack(ctx context.Context, desc ocispec.Descriptor, cs content.Store, csh ctdsnapshot.Snapshotter, s snapshot.Snapshotter, applier diff.Applier, platform platforms.MatchComparer) (digest.Digest, error) {
layers, err := getLayers(ctx, cs, desc, platform)
if err != nil {
return "", err
}
// func unpack(ctx context.Context, desc ocispec.Descriptor, cs content.Store, csh ctdsnapshot.Snapshotter, s snapshot.Snapshotter, applier diff.Applier, platform platforms.MatchComparer) (digest.Digest, error) {
// layers, err := getLayers(ctx, cs, desc, platform)
// if err != nil {
// return "", err
// }
var chain []digest.Digest
for _, layer := range layers {
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
}
if _, err := rootfs.ApplyLayer(ctx, layer, chain, csh, applier, ctdsnapshot.WithLabels(labels)); err != nil {
return "", err
}
chain = append(chain, layer.Diff.Digest)
}
chainID := identity.ChainID(chain)
if err != nil {
return "", err
}
// var chain []digest.Digest
// for _, layer := range layers {
// labels := map[string]string{
// "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
// }
// if _, err := rootfs.ApplyLayer(ctx, layer, chain, csh, applier, ctdsnapshot.WithLabels(labels)); err != nil {
// return "", err
// }
// chain = append(chain, layer.Diff.Digest)
// }
// chainID := identity.ChainID(chain)
// if err != nil {
// return "", err
// }
if err := fillBlobMapping(ctx, s, layers); err != nil {
return "", err
}
// if err := fillBlobMapping(ctx, s, layers); err != nil {
// return "", err
// }
return chainID, nil
}
// return chainID, nil
// }
func fillBlobMapping(ctx context.Context, s snapshot.Snapshotter, layers []rootfs.Layer) error {
var chain []digest.Digest
for _, l := range layers {
chain = append(chain, l.Diff.Digest)
chainID := identity.ChainID(chain)
if err := s.SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil {
return err
}
}
return nil
}
// func fillBlobMapping(ctx context.Context, s snapshot.Snapshotter, layers []rootfs.Layer) error {
// var chain []digest.Digest
// for _, l := range layers {
// chain = append(chain, l.Diff.Digest)
// chainID := identity.ChainID(chain)
// if err := s.SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil {
// return err
// }
// }
// return nil
// }
func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform platforms.MatchComparer) ([]rootfs.Layer, error) {
func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform platforms.MatchComparer) ([]ocispec.Descriptor, error) {
manifest, err := images.Manifest(ctx, provider, desc, platform)
if err != nil {
return nil, errors.WithStack(err)
@ -301,14 +294,14 @@ func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Desc
if len(diffIDs) != len(manifest.Layers) {
return nil, errors.Errorf("mismatched image rootfs and manifest layers %+v %+v", diffIDs, manifest.Layers)
}
layers := make([]rootfs.Layer, len(diffIDs))
layers := make([]ocispec.Descriptor, len(diffIDs))
for i := range diffIDs {
layers[i].Diff = ocispec.Descriptor{
// TODO: derive media type from compressed type
MediaType: ocispec.MediaTypeImageLayer,
Digest: diffIDs[i],
desc := manifest.Layers[i]
if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}
layers[i].Blob = manifest.Layers[i]
desc.Annotations["containerd.io/uncompressed"] = diffIDs[i].String()
layers[i] = desc
}
return layers, nil
}

View File

@ -12,11 +12,11 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/rootfs"
cdsnapshot "github.com/containerd/containerd/snapshots"
"github.com/docker/docker/pkg/idtools"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/blobs"
@ -48,7 +48,6 @@ import (
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
ociidentity "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@ -78,6 +77,7 @@ type WorkerOpt struct {
ResolveOptionsFunc resolver.ResolveOptionsFunc
IdentityMapping *idtools.IdentityMapping
LeaseManager leases.Manager
GarbageCollect func(context.Context) (gc.Stats, error)
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
@ -94,7 +94,6 @@ type Worker struct {
func NewWorker(opt WorkerOpt) (*Worker, error) {
imageRefChecker := imagerefchecker.New(imagerefchecker.Opt{
ImageStore: opt.ImageStore,
Snapshotter: opt.Snapshotter,
ContentStore: opt.ContentStore,
})
@ -102,6 +101,10 @@ func NewWorker(opt WorkerOpt) (*Worker, error) {
Snapshotter: opt.Snapshotter,
MetadataStore: opt.MetadataStore,
PruneRefChecker: imageRefChecker,
Applier: opt.Applier,
GarbageCollect: opt.GarbageCollect,
LeaseManager: opt.LeaseManager,
ContentStore: opt.ContentStore,
})
if err != nil {
return nil, err
@ -332,7 +335,7 @@ func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter,
}
func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) {
diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore(), w.Snapshotter, w.Differ, ref, createIfNeeded)
diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore(), w.Differ, ref, createIfNeeded)
if err != nil {
return nil, errors.Wrap(err, "failed calculating diff pairs for exported snapshot")
}
@ -384,7 +387,7 @@ func getCreatedTimes(ref cache.ImmutableRef) (out []time.Time) {
return append(out, cache.GetCreatedAt(ref.Metadata()))
}
func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) {
func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cache.ImmutableRef, err error) {
ctx, done, err := leaseutil.WithLease(ctx, w.LeaseManager)
if err != nil {
return nil, err
@ -416,85 +419,78 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.I
return nil, err
}
cd, release := snapshot.NewContainerdSnapshotter(w.Snapshotter)
defer release()
unpackProgressDone := oneOffProgress(ctx, "unpacking")
chainIDs, refs, err := w.unpack(ctx, w.CacheManager, remote.Descriptors, cd)
if err != nil {
return nil, unpackProgressDone(err)
}
defer func() {
for _, ref := range refs {
ref.Release(context.TODO())
}
err = unpackProgressDone(err)
}()
unpackProgressDone(nil)
for i, chainID := range chainIDs {
var current cache.ImmutableRef
for i, desc := range remote.Descriptors {
tm := time.Now()
if tmstr, ok := remote.Descriptors[i].Annotations[labelCreatedAt]; ok {
if tmstr, ok := desc.Annotations[labelCreatedAt]; ok {
if err := (&tm).UnmarshalText([]byte(tmstr)); err != nil {
return nil, err
}
}
descr := fmt.Sprintf("imported %s", remote.Descriptors[i].Digest)
if v, ok := remote.Descriptors[i].Annotations["buildkit/description"]; ok {
if v, ok := desc.Annotations["buildkit/description"]; ok {
descr = v
}
ref, err := w.CacheManager.Get(ctx, chainID, cache.WithDescription(descr), cache.WithCreationTime(tm))
ref, err := w.CacheManager.GetByBlob(ctx, desc, current, cache.WithDescription(descr), cache.WithCreationTime(tm))
if current != nil {
current.Release(context.TODO())
}
if err != nil {
return nil, err
}
if i == len(remote.Descriptors)-1 {
return ref, nil
if err := ref.Extract(ctx); err != nil {
return nil, err
}
ref.Release(context.TODO())
current = ref
}
return nil, errors.Errorf("unreachable")
return current, nil
}
func (w *Worker) unpack(ctx context.Context, cm cache.Manager, descs []ocispec.Descriptor, s cdsnapshot.Snapshotter) (ids []string, refs []cache.ImmutableRef, err error) {
defer func() {
if err != nil {
for _, r := range refs {
r.Release(context.TODO())
}
}
}()
// func (w *Worker) unpack(ctx context.Context, cm cache.Manager, descs []ocispec.Descriptor, s cdsnapshot.Snapshotter) (ids []string, refs []cache.ImmutableRef, err error) {
// defer func() {
// if err != nil {
// for _, r := range refs {
// r.Release(context.TODO())
// }
// }
// }()
layers, err := getLayers(ctx, descs)
if err != nil {
return nil, nil, err
}
// layers, err := getLayers(ctx, descs)
// if err != nil {
// return nil, nil, err
// }
var chain []digest.Digest
for _, layer := range layers {
newChain := append(chain, layer.Diff.Digest)
// var chain []digest.Digest
// for _, layer := range layers {
// newChain := append(chain, layer.Diff.Digest)
chainID := ociidentity.ChainID(newChain)
ref, err := cm.Get(ctx, string(chainID))
if err == nil {
refs = append(refs, ref)
} else {
if _, err := rootfs.ApplyLayer(ctx, layer, chain, s, w.Applier); err != nil {
return nil, nil, err
}
}
chain = newChain
// chainID := ociidentity.ChainID(newChain)
// ref, err := cm.Get(ctx, string(chainID))
// if err == nil {
// refs = append(refs, ref)
// } else {
// if _, err := rootfs.ApplyLayer(ctx, layer, chain, s, w.Applier); err != nil {
// return nil, nil, err
// }
// }
// chain = newChain
if err := w.Snapshotter.SetBlob(ctx, string(chainID), layer.Diff.Digest, layer.Blob.Digest); err != nil {
return nil, nil, err
}
}
// if err := w.Snapshotter.SetBlob(ctx, string(chainID), layer.Diff.Digest, layer.Blob.Digest); err != nil {
// return nil, nil, err
// }
// }
ids = make([]string, len(chain))
for i := range chain {
ids[i] = string(ociidentity.ChainID(chain[:i+1]))
}
// ids = make([]string, len(chain))
// for i := range chain {
// ids[i] = string(ociidentity.ChainID(chain[:i+1]))
// }
return ids, refs, nil
}
// return ids, refs, nil
// }
// Labels returns default labels
// utility function. could be moved to the constructor logic?

View File

@ -5,25 +5,21 @@ import (
"os"
"path/filepath"
"strings"
"time"
"github.com/containerd/containerd"
introspection "github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/leases"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/executor/containerdexecutor"
"github.com/moby/buildkit/executor/oci"
"github.com/moby/buildkit/identity"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/network/netproviders"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/util/winlayers"
"github.com/moby/buildkit/worker/base"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// NewWorkerOpt creates a WorkerOpt.
@ -64,28 +60,17 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
xlabels[k] = v
}
throttledGC := throttle.Throttle(time.Second, func() {
// TODO: how to avoid this?
ctx := context.TODO()
snapshotter := client.SnapshotService(snapshotterName)
ctx = namespaces.WithNamespace(ctx, ns)
key := identity.NewID()
if _, err := snapshotter.Prepare(ctx, key, "", snapshots.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
})); err != nil {
logrus.Errorf("GC error: %+v", err)
}
if err := snapshotter.Remove(ctx, key); err != nil {
logrus.Errorf("GC error: %+v", err)
}
})
lm := leaseutil.WithNamespace(client.LeasesService(), ns)
gc := func(ctx context.Context) error {
throttledGC()
return nil
gc := func(ctx context.Context) (gc.Stats, error) {
l, err := lm.Create(ctx)
if err != nil {
return nil, nil
}
return nil, lm.Delete(ctx, leases.Lease{ID: l.ID}, leases.SynchronousDelete)
}
cs := containerdsnapshot.NewContentStore(client.ContentStore(), ns, gc)
cs := containerdsnapshot.NewContentStore(client.ContentStore(), ns)
resp, err := client.IntrospectionService().Plugins(context.TODO(), &introspection.PluginsRequest{Filters: []string{"type==io.containerd.runtime.v1"}})
if err != nil {
@ -112,17 +97,18 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
}
opt := base.WorkerOpt{
ID: id,
Labels: xlabels,
MetadataStore: md,
Executor: containerdexecutor.New(client, root, "", np, dns),
Snapshotter: containerdsnapshot.NewSnapshotter(snapshotterName, client.SnapshotService(snapshotterName), cs, md, ns, gc, nil),
ContentStore: cs,
Applier: winlayers.NewFileSystemApplierWithWindows(cs, df),
Differ: winlayers.NewWalkingDiffWithWindows(cs, df),
ImageStore: client.ImageService(),
Platforms: platforms,
LeaseManager: leaseutil.WithNamespace(client.LeasesService(), ns),
ID: id,
Labels: xlabels,
MetadataStore: md,
Executor: containerdexecutor.New(client, root, "", np, dns),
Snapshotter: containerdsnapshot.NewSnapshotter(snapshotterName, client.SnapshotService(snapshotterName), ns, nil),
ContentStore: cs,
Applier: winlayers.NewFileSystemApplierWithWindows(cs, df),
Differ: winlayers.NewWalkingDiffWithWindows(cs, df),
ImageStore: client.ImageService(),
Platforms: platforms,
LeaseManager: lm,
GarbageCollect: gc,
}
return opt, nil
}

View File

@ -4,7 +4,6 @@ import (
"context"
"os"
"path/filepath"
"time"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/diff/apply"
@ -19,11 +18,9 @@ import (
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/network/netproviders"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/util/winlayers"
"github.com/moby/buildkit/worker/base"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
)
@ -85,18 +82,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
return opt, err
}
throttledGC := throttle.Throttle(time.Second, func() {
if _, err := mdb.GarbageCollect(context.TODO()); err != nil {
logrus.Errorf("GC error: %+v", err)
}
})
gc := func(ctx context.Context) error {
throttledGC()
return nil
}
c = containerdsnapshot.NewContentStore(mdb.ContentStore(), "buildkit", gc)
c = containerdsnapshot.NewContentStore(mdb.ContentStore(), "buildkit")
id, err := base.ID(root)
if err != nil {
@ -111,7 +97,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
Labels: xlabels,
MetadataStore: md,
Executor: exe,
Snapshotter: containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), c, md, "buildkit", gc, idmap),
Snapshotter: containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), "buildkit", idmap),
ContentStore: c,
Applier: winlayers.NewFileSystemApplierWithWindows(c, apply.NewFileSystemApplier(c)),
Differ: winlayers.NewWalkingDiffWithWindows(c, walking.NewWalkingDiff(c)),
@ -119,6 +105,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
Platforms: []specs.Platform{platforms.Normalize(platforms.DefaultSpec())},
IdentityMapping: idmap,
LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"),
GarbageCollect: mdb.GarbageCollect,
}
return opt, nil
}