Add support for lazily-pulled blobs in cache manager.

This allows the layers of images to only be pulled if/once they are actually
required.

Signed-off-by: Erik Sipsma <erik@sipsma.dev>
v0.8
Erik Sipsma 2020-05-28 13:46:33 -07:00
parent ba0150f7b2
commit 55cbd19dec
38 changed files with 1677 additions and 1039 deletions

235
cache/blobs.go vendored Normal file
View File

@ -0,0 +1,235 @@
package cache
import (
"context"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/winlayers"
digest "github.com/opencontainers/go-digest"
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
var g flightcontrol.Group
const containerdUncompressed = "containerd.io/uncompressed"
type CompareWithParent interface {
CompareWithParent(ctx context.Context, ref string, opts ...diff.Opt) (ocispec.Descriptor, error)
}
var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
// computeBlobChain ensures every ref in a parent chain has an associated blob in the content store. If
// a blob is missing and createIfNeeded is true, then the blob will be created, otherwise ErrNoBlobs will
// be returned. Caller must hold a lease when calling this function.
func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, compressionType compression.Type) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for computeBlobChain")
}
if err := sr.Finalize(ctx, true); err != nil {
return err
}
if isTypeWindows(sr) {
ctx = winlayers.UseWindowsLayerMode(ctx)
}
return computeBlobChain(ctx, sr, createIfNeeded, compressionType)
}
func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type) error {
baseCtx := ctx
eg, ctx := errgroup.WithContext(ctx)
var currentDescr ocispec.Descriptor
if sr.parent != nil {
eg.Go(func() error {
return computeBlobChain(ctx, sr.parent, createIfNeeded, compressionType)
})
}
eg.Go(func() error {
dp, err := g.Do(ctx, sr.ID(), func(ctx context.Context) (interface{}, error) {
refInfo := sr.Info()
if refInfo.Blob != "" {
return nil, nil
} else if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
}
var mediaType string
switch compressionType {
case compression.Uncompressed:
mediaType = ocispec.MediaTypeImageLayer
case compression.Gzip:
mediaType = ocispec.MediaTypeImageLayerGzip
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}
var descr ocispec.Descriptor
var err error
if pc, ok := sr.cm.Differ.(CompareWithParent); ok {
descr, err = pc.CompareWithParent(ctx, sr.ID(), diff.WithMediaType(mediaType))
if err != nil {
return nil, err
}
}
if descr.Digest == "" {
// reference needs to be committed
var lower []mount.Mount
if sr.parent != nil {
m, err := sr.parent.Mount(ctx, true)
if err != nil {
return nil, err
}
var release func() error
lower, release, err = m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
}
m, err := sr.Mount(ctx, true)
if err != nil {
return nil, err
}
upper, release, err := m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
descr, err = sr.cm.Differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
)
if err != nil {
return nil, err
}
}
if descr.Annotations == nil {
descr.Annotations = map[string]string{}
}
info, err := sr.cm.ContentStore.Info(ctx, descr.Digest)
if err != nil {
return nil, err
}
if diffID, ok := info.Labels[containerdUncompressed]; ok {
descr.Annotations[containerdUncompressed] = diffID
} else if compressionType == compression.Uncompressed {
descr.Annotations[containerdUncompressed] = descr.Digest.String()
} else {
return nil, errors.Errorf("unknown layer compression type")
}
return descr, nil
})
if err != nil {
return err
}
if dp != nil {
currentDescr = dp.(ocispec.Descriptor)
}
return nil
})
err := eg.Wait()
if err != nil {
return err
}
if currentDescr.Digest != "" {
if err := sr.setBlob(baseCtx, currentDescr); err != nil {
return err
}
}
return nil
}
// setBlob associates a blob with the cache record.
// A lease must be held for the blob when calling this function
// Caller should call Info() for knowing what current values are actually set
func (sr *immutableRef) setBlob(ctx context.Context, desc ocispec.Descriptor) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for setBlob")
}
diffID, err := diffIDFromDescriptor(desc)
if err != nil {
return err
}
if _, err := sr.cm.ContentStore.Info(ctx, desc.Digest); err != nil {
return err
}
sr.mu.Lock()
defer sr.mu.Unlock()
if getChainID(sr.md) != "" {
return nil
}
if err := sr.finalize(ctx, true); err != nil {
return err
}
p := sr.parent
var parentChainID digest.Digest
var parentBlobChainID digest.Digest
if p != nil {
pInfo := p.Info()
if pInfo.ChainID == "" || pInfo.BlobChainID == "" {
return errors.Errorf("failed to set blob for reference with non-addressable parent")
}
parentChainID = pInfo.ChainID
parentBlobChainID = pInfo.BlobChainID
}
if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{
ID: desc.Digest.String(),
Type: "content",
}); err != nil {
return err
}
queueDiffID(sr.md, diffID.String())
queueBlob(sr.md, desc.Digest.String())
chainID := diffID
blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID})
if parentChainID != "" {
chainID = imagespecidentity.ChainID([]digest.Digest{parentChainID, chainID})
blobChainID = imagespecidentity.ChainID([]digest.Digest{parentBlobChainID, blobChainID})
}
queueChainID(sr.md, chainID.String())
queueBlobChainID(sr.md, blobChainID.String())
queueMediaType(sr.md, desc.MediaType)
queueBlobSize(sr.md, desc.Size)
if err := sr.md.Commit(); err != nil {
return err
}
return nil
}
func isTypeWindows(sr *immutableRef) bool {
if GetLayerType(sr) == "windows" {
return true
}
if parent := sr.parent; parent != nil {
return isTypeWindows(parent)
}
return false
}

194
cache/blobs/blobs.go vendored
View File

@ -1,194 +0,0 @@
package blobs
import (
"context"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/winlayers"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
var g flightcontrol.Group
const containerdUncompressed = "containerd.io/uncompressed"
type DiffPair struct {
DiffID digest.Digest
Blobsum digest.Digest
}
type CompareWithParent interface {
CompareWithParent(ctx context.Context, ref string, opts ...diff.Opt) (ocispec.Descriptor, error)
}
var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
// GetDiffPairs returns the DiffID/Blobsum pairs for a giver reference and saves it.
// Caller must hold a lease when calling this function.
func GetDiffPairs(ctx context.Context, contentStore content.Store, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool, compression CompressionType) ([]DiffPair, error) {
if ref == nil {
return nil, nil
}
if _, ok := leases.FromContext(ctx); !ok {
return nil, errors.Errorf("missing lease requirement for GetDiffPairs")
}
if err := ref.Finalize(ctx, true); err != nil {
return nil, err
}
if isTypeWindows(ref) {
ctx = winlayers.UseWindowsLayerMode(ctx)
}
return getDiffPairs(ctx, contentStore, differ, ref, createBlobs, compression)
}
func getDiffPairs(ctx context.Context, contentStore content.Store, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool, compression CompressionType) ([]DiffPair, error) {
if ref == nil {
return nil, nil
}
baseCtx := ctx
eg, ctx := errgroup.WithContext(ctx)
var diffPairs []DiffPair
var currentDescr ocispec.Descriptor
parent := ref.Parent()
if parent != nil {
defer parent.Release(context.TODO())
eg.Go(func() error {
dp, err := getDiffPairs(ctx, contentStore, differ, parent, createBlobs, compression)
if err != nil {
return err
}
diffPairs = dp
return nil
})
}
eg.Go(func() error {
dp, err := g.Do(ctx, ref.ID(), func(ctx context.Context) (interface{}, error) {
refInfo := ref.Info()
if refInfo.Blob != "" {
return nil, nil
} else if !createBlobs {
return nil, errors.WithStack(ErrNoBlobs)
}
var mediaType string
var descr ocispec.Descriptor
var err error
switch compression {
case Uncompressed:
mediaType = ocispec.MediaTypeImageLayer
case Gzip:
mediaType = ocispec.MediaTypeImageLayerGzip
default:
return nil, errors.Errorf("unknown layer compression type")
}
if pc, ok := differ.(CompareWithParent); ok {
descr, err = pc.CompareWithParent(ctx, ref.ID(), diff.WithMediaType(mediaType))
if err != nil {
return nil, err
}
}
if descr.Digest == "" {
// reference needs to be committed
parent := ref.Parent()
var lower []mount.Mount
var release func() error
if parent != nil {
defer parent.Release(context.TODO())
m, err := parent.Mount(ctx, true)
if err != nil {
return nil, err
}
lower, release, err = m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
}
m, err := ref.Mount(ctx, true)
if err != nil {
return nil, err
}
upper, release, err := m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
descr, err = differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(ref.ID()),
)
if err != nil {
return nil, err
}
}
if descr.Annotations == nil {
descr.Annotations = map[string]string{}
}
info, err := contentStore.Info(ctx, descr.Digest)
if err != nil {
return nil, err
}
if diffID, ok := info.Labels[containerdUncompressed]; ok {
descr.Annotations[containerdUncompressed] = diffID
} else if compression == Uncompressed {
descr.Annotations[containerdUncompressed] = descr.Digest.String()
} else {
return nil, errors.Errorf("unknown layer compression type")
}
return descr, nil
})
if err != nil {
return err
}
if dp != nil {
currentDescr = dp.(ocispec.Descriptor)
}
return nil
})
err := eg.Wait()
if err != nil {
return nil, err
}
if currentDescr.Digest != "" {
if err := ref.SetBlob(baseCtx, currentDescr); err != nil {
return nil, err
}
}
refInfo := ref.Info()
return append(diffPairs, DiffPair{DiffID: refInfo.DiffID, Blobsum: refInfo.Blob}), nil
}
func isTypeWindows(ref cache.ImmutableRef) bool {
if cache.GetLayerType(ref) == "windows" {
return true
}
if parent := ref.Parent(); parent != nil {
defer parent.Release(context.TODO())
return isTypeWindows(parent)
}
return false
}

88
cache/manager.go vendored
View File

@ -8,6 +8,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/leases"
@ -16,6 +17,7 @@ import (
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/flightcontrol"
digest "github.com/opencontainers/go-digest"
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -38,6 +40,7 @@ type ManagerOpt struct {
PruneRefChecker ExternalRefCheckerFunc
GarbageCollect func(ctx context.Context) (gc.Stats, error)
Applier diff.Applier
Differ diff.Comparer
}
type Accessor interface {
@ -45,7 +48,7 @@ type Accessor interface {
Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error)
New(ctx context.Context, parent ImmutableRef, opts ...RefOption) (MutableRef, error)
GetMutable(ctx context.Context, id string) (MutableRef, error) // Rebase?
GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) // Rebase?
IdentityMapping() *idtools.IdentityMapping
Metadata(string) *metadata.StorageItem
}
@ -74,6 +77,7 @@ type cacheManager struct {
md *metadata.Store
muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group
}
func NewManager(opt ManagerOpt) (Manager, error) {
@ -92,7 +96,7 @@ func NewManager(opt ManagerOpt) (Manager, error) {
return cm, nil
}
func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ir ImmutableRef, err error) {
func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ir ImmutableRef, rerr error) {
diffID, err := diffIDFromDescriptor(desc)
if err != nil {
return nil, err
@ -100,9 +104,12 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor,
chainID := diffID
blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID})
if desc.Digest != "" {
if _, err := cm.ContentStore.Info(ctx, desc.Digest); err != nil {
return nil, errors.Wrapf(err, "failed to get blob %s", desc.Digest)
descHandlers := descHandlersOf(opts...)
if descHandlers == nil || descHandlers[desc.Digest] == nil {
if _, err := cm.ContentStore.Info(ctx, desc.Digest); errors.Is(err, errdefs.ErrNotFound) {
return nil, NeedsRemoteProvidersError([]digest.Digest{desc.Digest})
} else if err != nil {
return nil, err
}
}
@ -115,7 +122,8 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor,
}
chainID = imagespecidentity.ChainID([]digest.Digest{pInfo.ChainID, chainID})
blobChainID = imagespecidentity.ChainID([]digest.Digest{pInfo.BlobChainID, blobChainID})
p2, err := cm.Get(ctx, parent.ID(), NoUpdateLastUsed)
p2, err := cm.Get(ctx, parent.ID(), NoUpdateLastUsed, descHandlers)
if err != nil {
return nil, err
}
@ -128,7 +136,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor,
releaseParent := false
defer func() {
if releaseParent || err != nil && p != nil {
if releaseParent || rerr != nil && p != nil {
p.Release(context.TODO())
}
}()
@ -187,7 +195,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor,
}
defer func() {
if err != nil {
if rerr != nil {
if err := cm.ManagerOpt.LeaseManager.Delete(context.TODO(), leases.Lease{
ID: l.ID,
}); err != nil {
@ -233,6 +241,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor,
queueSnapshotID(rec.md, snapshotID)
queueBlobOnly(rec.md, blobOnly)
queueMediaType(rec.md, desc.MediaType)
queueBlobSize(rec.md, desc.Size)
queueCommitted(rec.md)
if err := rec.md.Commit(); err != nil {
@ -241,7 +250,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor,
cm.records[id] = rec
return rec.ref(true), nil
return rec.ref(true, descHandlers), nil
}
// init loads all snapshots from metadata state and tries to load the records
@ -307,25 +316,52 @@ func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (
}
}
descHandlers := descHandlersOf(opts...)
if rec.mutable {
if len(rec.refs) != 0 {
return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
}
if rec.equalImmutable != nil {
return rec.equalImmutable.ref(triggerUpdate), nil
return rec.equalImmutable.ref(triggerUpdate, descHandlers), nil
}
return rec.mref(triggerUpdate).commit(ctx)
return rec.mref(triggerUpdate, descHandlers).commit(ctx)
}
return rec.ref(triggerUpdate), nil
return rec.ref(triggerUpdate, descHandlers), nil
}
// getRecord returns record for id. Requires manager lock.
func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOption) (cr *cacheRecord, retErr error) {
checkLazyProviders := func(rec *cacheRecord) error {
missing := NeedsRemoteProvidersError(nil)
dhs := descHandlersOf(opts...)
for {
blob := digest.Digest(getBlob(rec.md))
if isLazy, err := rec.isLazy(ctx); err != nil {
return err
} else if isLazy && dhs[blob] == nil {
missing = append(missing, blob)
}
if rec.parent == nil {
break
}
rec = rec.parent.cacheRecord
}
if len(missing) > 0 {
return missing
}
return nil
}
if rec, ok := cm.records[id]; ok {
if rec.isDead() {
return nil, errors.Wrapf(errNotFound, "failed to get dead record %s", id)
}
if err := checkLazyProviders(rec); err != nil {
return nil, err
}
return rec, nil
}
@ -346,7 +382,8 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
mu: &sync.Mutex{},
cm: cm,
refs: make(map[ref]struct{}),
parent: mutable.parentRef(false),
// mutable refs are always non-lazy, so we can set parent desc handlers to nil
parent: mutable.parentRef(false, nil),
md: md,
equalMutable: &mutableRef{cacheRecord: mutable},
}
@ -393,6 +430,9 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
}
cm.records[id] = rec
if err := checkLazyProviders(rec); err != nil {
return nil, err
}
return rec, nil
}
@ -403,14 +443,21 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti
var parentID string
var parentSnapshotID string
if s != nil {
p, err := cm.Get(ctx, s.ID(), NoUpdateLastUsed)
if _, ok := s.(*immutableRef); ok {
parent = s.Clone().(*immutableRef)
} else {
p, err := cm.Get(ctx, s.ID(), append(opts, NoUpdateLastUsed)...)
if err != nil {
return nil, err
}
if err := p.Finalize(ctx, true); err != nil {
parent = p.(*immutableRef)
}
if err := parent.Finalize(ctx, true); err != nil {
return nil, err
}
if err := parent.Extract(ctx); err != nil {
return nil, err
}
parent = p.(*immutableRef)
parentSnapshotID = getSnapshotID(parent.md)
parentID = parent.ID()
}
@ -473,13 +520,14 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti
cm.records[id] = rec // TODO: save to db
return rec.mref(true), nil
return rec.mref(true, nil), nil
}
func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, error) {
func (cm *cacheManager) GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) {
cm.mu.Lock()
defer cm.mu.Unlock()
rec, err := cm.getRecord(ctx, id)
rec, err := cm.getRecord(ctx, id, opts...)
if err != nil {
return nil, err
}
@ -505,7 +553,7 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef,
rec.equalImmutable = nil
}
return rec.mref(true), nil
return rec.mref(true, descHandlersOf(opts...)), nil
}
func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {

16
cache/manager_test.go vendored
View File

@ -455,7 +455,11 @@ func TestExtractOnMutable(t *testing.T) {
_, err = cm.GetByBlob(ctx, desc2, snap)
require.Error(t, err)
err = snap.SetBlob(ctx, desc)
leaseCtx, done, err := leaseutil.WithLease(ctx, co.lm, leases.WithExpiration(0))
require.NoError(t, err)
err = snap.(*immutableRef).setBlob(leaseCtx, desc)
done(context.TODO())
require.NoError(t, err)
snap2, err := cm.GetByBlob(ctx, desc2, snap)
@ -535,6 +539,10 @@ func TestSetBlob(t *testing.T) {
defer cleanup()
ctx, done, err := leaseutil.WithLease(ctx, co.lm, leaseutil.MakeTemporary)
require.NoError(t, err)
defer done(context.TODO())
cm := co.manager
active, err := cm.New(ctx, nil)
@ -560,7 +568,7 @@ func TestSetBlob(t *testing.T) {
err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc)
require.NoError(t, err)
err = snap.SetBlob(ctx, ocispec.Descriptor{
err = snap.(*immutableRef).setBlob(ctx, ocispec.Descriptor{
Digest: digest.FromBytes([]byte("foobar")),
Annotations: map[string]string{
"containerd.io/uncompressed": digest.FromBytes([]byte("foobar2")).String(),
@ -568,7 +576,7 @@ func TestSetBlob(t *testing.T) {
})
require.Error(t, err)
err = snap.SetBlob(ctx, desc)
err = snap.(*immutableRef).setBlob(ctx, desc)
require.NoError(t, err)
info = snap.Info()
@ -592,7 +600,7 @@ func TestSetBlob(t *testing.T) {
err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), desc2)
require.NoError(t, err)
err = snap2.SetBlob(ctx, desc2)
err = snap2.(*immutableRef).setBlob(ctx, desc2)
require.NoError(t, err)
info2 := snap2.Info()

26
cache/metadata.go vendored
View File

@ -29,6 +29,9 @@ const keySnapshot = "cache.snapshot"
const keyBlobOnly = "cache.blobonly"
const keyMediaType = "cache.mediatype"
// BlobSize is the packed blob size as specified in the oci descriptor
const keyBlobSize = "cache.blobsize"
const keyDeleted = "cache.deleted"
func queueDiffID(si *metadata.StorageItem, str string) error {
@ -307,6 +310,29 @@ func getSize(si *metadata.StorageItem) int64 {
return size
}
func queueBlobSize(si *metadata.StorageItem, s int64) error {
v, err := metadata.NewValue(s)
if err != nil {
return errors.Wrap(err, "failed to create blobsize value")
}
si.Queue(func(b *bolt.Bucket) error {
return si.SetValue(b, keyBlobSize, v)
})
return nil
}
func getBlobSize(si *metadata.StorageItem) int64 {
v := si.Get(keyBlobSize)
if v == nil {
return sizeUnknown
}
var size int64
if err := v.Unmarshal(&size); err != nil {
return sizeUnknown
}
return size
}
func getEqualMutable(si *metadata.StorageItem) string {
v := si.Get(keyEqualMutable)
if v == nil {

34
cache/opts.go vendored Normal file
View File

@ -0,0 +1,34 @@
package cache
import (
"fmt"
"github.com/containerd/containerd/content"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
)
type DescHandler struct {
Provider content.Provider
ImageRef string
Progress progress.Controller
}
type DescHandlers map[digest.Digest]*DescHandler
func descHandlersOf(opts ...RefOption) DescHandlers {
for _, opt := range opts {
if opt, ok := opt.(DescHandlers); ok {
return opt
}
}
return nil
}
type DescHandlerKey digest.Digest
type NeedsRemoteProvidersError []digest.Digest
func (m NeedsRemoteProvidersError) Error() string {
return fmt.Sprintf("missing descriptor handlers for lazy blobs %+v", []digest.Digest(m))
}

101
cache/progress.go vendored Normal file
View File

@ -0,0 +1,101 @@
package cache
import (
"context"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/moby/buildkit/util/progress"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type providerWithProgress struct {
provider content.Provider
manager interface {
content.IngestManager
content.Manager
}
}
func (p *providerWithProgress) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
doneCh := make(chan struct{})
go func() {
ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop()
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
ingestRef := remotes.MakeRefKey(ctx, desc)
started := time.Now()
onFinalStatus := false
for {
if onFinalStatus {
return
}
select {
case <-doneCh:
onFinalStatus = true
case <-ctx.Done():
onFinalStatus = true
case <-ticker.C:
}
status, err := p.manager.Status(ctx, ingestRef)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Action: "downloading",
Current: int(status.Offset),
Total: int(status.Total),
Started: &started,
})
continue
} else if !errors.Is(err, errdefs.ErrNotFound) {
logrus.Errorf("unexpected error getting ingest status of %q: %v", ingestRef, err)
return
}
info, err := p.manager.Info(ctx, desc.Digest)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Action: "done",
Current: int(info.Size),
Total: int(info.Size),
Started: &started,
Completed: &info.CreatedAt,
})
return
}
if errors.Is(err, errdefs.ErrNotFound) {
pw.Write(desc.Digest.String(), progress.Status{
Action: "waiting",
})
} else {
logrus.Errorf("unexpected error getting content status of %q: %v", desc.Digest.String(), err)
return
}
}
}()
ra, err := p.provider.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
return readerAtWithCloseCh{ReaderAt: ra, closeCh: doneCh}, nil
}
type readerAtWithCloseCh struct {
content.ReaderAt
closeCh chan struct{}
}
func (ra readerAtWithCloseCh) Close() error {
close(ra.closeCh)
return ra.ReaderAt.Close()
}

247
cache/refs.go vendored
View File

@ -15,13 +15,16 @@ import (
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/winlayers"
digest "github.com/opencontainers/go-digest"
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
// Ref is a reference to cacheable objects.
@ -41,8 +44,8 @@ type ImmutableRef interface {
Clone() ImmutableRef
Info() RefInfo
SetBlob(ctx context.Context, desc ocispec.Descriptor) error
Extract(ctx context.Context) error // +progress
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type) (*solver.Remote, error)
}
type RefInfo struct {
@ -93,15 +96,23 @@ type cacheRecord struct {
}
// hold ref lock before calling
func (cr *cacheRecord) ref(triggerLastUsed bool) *immutableRef {
ref := &immutableRef{cacheRecord: cr, triggerLastUsed: triggerLastUsed}
func (cr *cacheRecord) ref(triggerLastUsed bool, descHandlers DescHandlers) *immutableRef {
ref := &immutableRef{
cacheRecord: cr,
triggerLastUsed: triggerLastUsed,
descHandlers: descHandlers,
}
cr.refs[ref] = struct{}{}
return ref
}
// hold ref lock before calling
func (cr *cacheRecord) mref(triggerLastUsed bool) *mutableRef {
ref := &mutableRef{cacheRecord: cr, triggerLastUsed: triggerLastUsed}
func (cr *cacheRecord) mref(triggerLastUsed bool, descHandlers DescHandlers) *mutableRef {
ref := &mutableRef{
cacheRecord: cr,
triggerLastUsed: triggerLastUsed,
descHandlers: descHandlers,
}
cr.refs[ref] = struct{}{}
return ref
}
@ -131,6 +142,17 @@ func (cr *cacheRecord) isDead() bool {
return cr.dead || (cr.equalImmutable != nil && cr.equalImmutable.dead) || (cr.equalMutable != nil && cr.equalMutable.dead)
}
func (cr *cacheRecord) isLazy(ctx context.Context) (bool, error) {
if !getBlobOnly(cr.md) {
return false, nil
}
_, err := cr.cm.ContentStore.Info(ctx, digest.Digest(getBlob(cr.md)))
if errors.Is(err, errdefs.ErrNotFound) {
return true, nil
}
return false, err
}
func (cr *cacheRecord) IdentityMapping() *idtools.IdentityMapping {
return cr.cm.IdentityMapping()
}
@ -186,27 +208,18 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) {
return s.(int64), nil
}
func (cr *cacheRecord) Parent() ImmutableRef {
if p := cr.parentRef(true); p != nil { // avoid returning typed nil pointer
return p
}
return nil
}
func (cr *cacheRecord) parentRef(hidden bool) *immutableRef {
func (cr *cacheRecord) parentRef(hidden bool, descHandlers DescHandlers) *immutableRef {
p := cr.parent
if p == nil {
return nil
}
p.mu.Lock()
defer p.mu.Unlock()
return p.ref(hidden)
return p.ref(hidden, descHandlers)
}
func (cr *cacheRecord) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
// must be called holding cacheRecord mu
func (cr *cacheRecord) mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
if cr.mutable {
m, err := cr.cm.Snapshotter.Mounts(ctx, getSnapshotID(cr.md))
if err != nil {
@ -282,20 +295,29 @@ func (cr *cacheRecord) ID() string {
type immutableRef struct {
*cacheRecord
triggerLastUsed bool
descHandlers DescHandlers
}
type mutableRef struct {
*cacheRecord
triggerLastUsed bool
descHandlers DescHandlers
}
func (sr *immutableRef) Clone() ImmutableRef {
sr.mu.Lock()
ref := sr.ref(false)
ref := sr.ref(false, sr.descHandlers)
sr.mu.Unlock()
return ref
}
func (sr *immutableRef) Parent() ImmutableRef {
if p := sr.parentRef(true, sr.descHandlers); p != nil { // avoid returning typed nil pointer
return p
}
return nil
}
func (sr *immutableRef) Info() RefInfo {
return RefInfo{
ChainID: digest.Digest(getChainID(sr.md)),
@ -308,25 +330,120 @@ func (sr *immutableRef) Info() RefInfo {
}
}
func (sr *immutableRef) Extract(ctx context.Context) error {
_, err := sr.sizeG.Do(ctx, sr.ID()+"-extract", func(ctx context.Context) (interface{}, error) {
func (sr *immutableRef) ociDesc() (ocispec.Descriptor, error) {
desc := ocispec.Descriptor{
Digest: digest.Digest(getBlob(sr.md)),
Size: getBlobSize(sr.md),
MediaType: getMediaType(sr.md),
Annotations: make(map[string]string),
}
diffID := getDiffID(sr.md)
if diffID != "" {
desc.Annotations["containerd.io/uncompressed"] = diffID
}
createdAt := GetCreatedAt(sr.md)
if !createdAt.IsZero() {
createdAt, err := createdAt.MarshalText()
if err != nil {
return ocispec.Descriptor{}, err
}
desc.Annotations["buildkit/createdat"] = string(createdAt)
}
return desc, nil
}
// order is from parent->child, sr will be at end of slice
func (sr *immutableRef) parentRefChain() []*immutableRef {
var count int
for ref := sr; ref != nil; ref = ref.parent {
count++
}
refs := make([]*immutableRef, count)
for i, ref := count-1, sr; ref != nil; i, ref = i-1, ref.parent {
refs[i] = ref
}
return refs
}
func (sr *immutableRef) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
if getBlobOnly(sr.md) {
if err := sr.Extract(ctx); err != nil {
return nil, err
}
}
sr.mu.Lock()
defer sr.mu.Unlock()
return sr.mount(ctx, readonly)
}
func (sr *immutableRef) Extract(ctx context.Context) (rerr error) {
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return err
}
defer done(ctx)
if GetLayerType(sr) == "windows" {
ctx = winlayers.UseWindowsLayerMode(ctx)
}
return sr.extract(ctx, sr.descHandlers)
}
func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers) error {
_, err := sr.sizeG.Do(ctx, sr.ID()+"-extract", func(ctx context.Context) (_ interface{}, rerr error) {
snapshotID := getSnapshotID(sr.md)
if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil {
queueBlobOnly(sr.md, false)
return nil, sr.md.Commit()
}
eg, egctx := errgroup.WithContext(ctx)
parentID := ""
if sr.parent != nil {
if err := sr.parent.Extract(ctx); err != nil {
return nil, err
eg.Go(func() error {
if err := sr.parent.extract(egctx, dhs); err != nil {
return err
}
parentID = getSnapshotID(sr.parent.md)
return nil
})
}
info := sr.Info()
key := fmt.Sprintf("extract-%s %s", identity.NewID(), info.ChainID)
err := sr.cm.Snapshotter.Prepare(ctx, key, parentID)
desc, err := sr.ociDesc()
if err != nil {
return nil, err
}
dh := dhs[desc.Digest]
eg.Go(func() error {
// unlazies if needed, otherwise a no-op
return lazyRefProvider{
ref: sr,
desc: desc,
dh: dh,
}.Unlazy(egctx)
})
if err := eg.Wait(); err != nil {
return nil, err
}
if dh != nil && dh.Progress != nil {
_, stopProgress := dh.Progress.Start(ctx)
defer stopProgress(rerr)
statusDone := dh.Progress.Status("extracting "+desc.Digest.String(), "extracting")
defer statusDone()
}
key := fmt.Sprintf("extract-%s %s", identity.NewID(), sr.Info().ChainID)
err = sr.cm.Snapshotter.Prepare(ctx, key, parentID)
if err != nil {
return nil, err
}
@ -339,10 +456,7 @@ func (sr *immutableRef) Extract(ctx context.Context) error {
if err != nil {
return nil, err
}
_, err = sr.cm.Applier.Apply(ctx, ocispec.Descriptor{
Digest: info.Blob,
MediaType: info.MediaType,
}, mounts)
_, err = sr.cm.Applier.Apply(ctx, desc, mounts)
if err != nil {
unmount()
return nil, err
@ -357,6 +471,7 @@ func (sr *immutableRef) Extract(ctx context.Context) error {
}
}
queueBlobOnly(sr.md, false)
setSize(sr.md, sizeUnknown)
if err := sr.md.Commit(); err != nil {
return nil, err
}
@ -365,65 +480,6 @@ func (sr *immutableRef) Extract(ctx context.Context) error {
return err
}
// SetBlob associates a blob with the cache record.
// A lease must be held for the blob when calling this function
// Caller should call Info() for knowing what current values are actually set
func (sr *immutableRef) SetBlob(ctx context.Context, desc ocispec.Descriptor) error {
diffID, err := diffIDFromDescriptor(desc)
if err != nil {
return err
}
if _, err := sr.cm.ContentStore.Info(ctx, desc.Digest); err != nil {
return err
}
sr.mu.Lock()
defer sr.mu.Unlock()
if getChainID(sr.md) != "" {
return nil
}
if err := sr.finalize(ctx, true); err != nil {
return err
}
p := sr.parent
var parentChainID digest.Digest
var parentBlobChainID digest.Digest
if p != nil {
pInfo := p.Info()
if pInfo.ChainID == "" || pInfo.BlobChainID == "" {
return errors.Errorf("failed to set blob for reference with non-addressable parent")
}
parentChainID = pInfo.ChainID
parentBlobChainID = pInfo.BlobChainID
}
if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{
ID: desc.Digest.String(),
Type: "content",
}); err != nil {
return err
}
queueDiffID(sr.md, diffID.String())
queueBlob(sr.md, desc.Digest.String())
chainID := diffID
blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID})
if parentChainID != "" {
chainID = imagespecidentity.ChainID([]digest.Digest{parentChainID, chainID})
blobChainID = imagespecidentity.ChainID([]digest.Digest{parentBlobChainID, blobChainID})
}
queueChainID(sr.md, chainID.String())
queueBlobChainID(sr.md, blobChainID.String())
queueMediaType(sr.md, desc.MediaType)
if err := sr.md.Commit(); err != nil {
return err
}
return nil
}
func (sr *immutableRef) Release(ctx context.Context) error {
sr.cm.mu.Lock()
defer sr.cm.mu.Unlock()
@ -555,7 +611,7 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) {
rec := &cacheRecord{
mu: sr.mu,
cm: sr.cm,
parent: sr.parentRef(false),
parent: sr.parentRef(false, sr.descHandlers),
equalMutable: sr,
refs: make(map[ref]struct{}),
md: md,
@ -588,11 +644,18 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) {
return nil, err
}
ref := rec.ref(true)
ref := rec.ref(true, sr.descHandlers)
sr.equalImmutable = ref
return ref, nil
}
func (sr *mutableRef) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
sr.mu.Lock()
defer sr.mu.Unlock()
return sr.mount(ctx, readonly)
}
func (sr *mutableRef) Commit(ctx context.Context) (ImmutableRef, error) {
sr.cm.mu.Lock()
defer sr.cm.mu.Unlock()

175
cache/remote.go vendored Normal file
View File

@ -0,0 +1,175 @@
package cache
import (
"context"
"fmt"
"net/url"
"strings"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/reference"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
type Unlazier interface {
Unlazy(ctx context.Context) error
}
func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type) (*solver.Remote, error) {
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
}
defer done(ctx)
err = sr.computeBlobChain(ctx, createIfNeeded, compressionType)
if err != nil {
return nil, err
}
mprovider := lazyMultiProvider{mprovider: contentutil.NewMultiProvider(nil)}
remote := &solver.Remote{
Provider: mprovider,
}
for _, ref := range sr.parentRefChain() {
desc, err := ref.ociDesc()
if err != nil {
return nil, err
}
// NOTE: The media type might be missing for some migrated ones
// from before lease based storage. If so, we should detect
// the media type from blob data.
//
// Discussion: https://github.com/moby/buildkit/pull/1277#discussion_r352795429
if desc.MediaType == "" {
desc.MediaType, err = compression.DetectLayerMediaType(ctx, sr.cm.ContentStore, desc.Digest, false)
if err != nil {
return nil, err
}
}
dh := sr.descHandlers[desc.Digest]
// update distribution source annotation
if dh != nil && dh.ImageRef != "" {
refspec, err := reference.Parse(dh.ImageRef)
if err != nil {
return nil, err
}
u, err := url.Parse("dummy://" + refspec.Locator)
if err != nil {
return nil, err
}
source, repo := u.Hostname(), strings.TrimPrefix(u.Path, "/")
if desc.Annotations == nil {
desc.Annotations = make(map[string]string)
}
dslKey := fmt.Sprintf("%s.%s", "containerd.io/distribution.source", source)
desc.Annotations[dslKey] = repo
}
remote.Descriptors = append(remote.Descriptors, desc)
mprovider.Add(lazyRefProvider{
ref: ref,
desc: desc,
dh: sr.descHandlers[desc.Digest],
})
}
return remote, nil
}
type lazyMultiProvider struct {
mprovider *contentutil.MultiProvider
plist []lazyRefProvider
}
func (mp lazyMultiProvider) Add(p lazyRefProvider) {
mp.mprovider.Add(p.desc.Digest, p)
mp.plist = append(mp.plist, p)
}
func (mp lazyMultiProvider) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
return mp.mprovider.ReaderAt(ctx, desc)
}
func (mp lazyMultiProvider) Unlazy(ctx context.Context) error {
eg, egctx := errgroup.WithContext(ctx)
for _, p := range mp.plist {
eg.Go(func() error {
return p.Unlazy(egctx)
})
}
return eg.Wait()
}
type lazyRefProvider struct {
ref *immutableRef
desc ocispec.Descriptor
dh *DescHandler
}
func (p lazyRefProvider) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
if desc.Digest != p.desc.Digest {
return nil, errdefs.ErrNotFound
}
if err := p.Unlazy(ctx); err != nil {
return nil, err
}
return p.ref.cm.ContentStore.ReaderAt(ctx, desc)
}
func (p lazyRefProvider) Unlazy(ctx context.Context) error {
_, err := p.ref.cm.unlazyG.Do(ctx, string(p.desc.Digest), func(ctx context.Context) (_ interface{}, rerr error) {
if isLazy, err := p.ref.isLazy(ctx); err != nil {
return nil, err
} else if !isLazy {
return nil, nil
}
if p.dh == nil {
// shouldn't happen, if you have a lazy immutable ref it already should be validated
// that descriptor handlers exist for it
return nil, errors.New("unexpected nil descriptor handler")
}
if p.dh.Progress != nil {
var stopProgress func(error)
ctx, stopProgress = p.dh.Progress.Start(ctx)
defer stopProgress(rerr)
}
// For now, just pull down the whole content and then return a ReaderAt from the local content
// store. If efficient partial reads are desired in the future, something more like a "tee"
// that caches remote partial reads to a local store may need to replace this.
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &providerWithProgress{
provider: p.dh.Provider,
manager: p.ref.cm.ContentStore,
}, p.desc)
if err != nil {
return nil, err
}
if p.dh.ImageRef != "" {
if GetDescription(p.ref.md) == "" {
queueDescription(p.ref.md, "pulled from "+p.dh.ImageRef)
err := p.ref.md.Commit()
if err != nil {
return nil, err
}
}
}
return nil, err
})
return err
}

View File

@ -23,6 +23,7 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
ctderrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
@ -117,6 +118,7 @@ func TestIntegration(t *testing.T) {
testMultipleRegistryCacheImportExport,
testSourceMap,
testSourceMapFromRef,
testLazyImagePush,
}, mirrors)
integration.Run(t, []integration.Test{
@ -2047,6 +2049,140 @@ func testBuildPushAndValidate(t *testing.T, sb integration.Sandbox) {
require.False(t, ok)
}
func testLazyImagePush(t *testing.T, sb integration.Sandbox) {
skipDockerd(t, sb)
requiresLinux(t)
cdAddress := sb.ContainerdAddress()
if cdAddress == "" {
t.Skip("test requires containerd worker")
}
client, err := newContainerd(cdAddress)
require.NoError(t, err)
defer client.Close()
ctx := namespaces.WithNamespace(context.Background(), "buildkit")
registry, err := sb.NewRegistry()
if errors.Is(err, integration.ErrorRequirements) {
t.Skip(err.Error())
}
require.NoError(t, err)
c, err := New(context.TODO(), sb.Address())
require.NoError(t, err)
defer c.Close()
// push the busybox image to the mutable registry
sourceImage := "busybox:latest"
def, err := llb.Image(sourceImage).Marshal(context.TODO())
require.NoError(t, err)
targetNoTag := registry + "/buildkit/testlazyimage:"
target := targetNoTag + "latest"
_, err = c.Solve(context.TODO(), def, SolveOpt{
Exports: []ExportEntry{
{
Type: ExporterImage,
Attrs: map[string]string{
"name": target,
"push": "true",
},
},
},
}, nil)
require.NoError(t, err)
imageService := client.ImageService()
contentStore := client.ContentStore()
img, err := imageService.Get(ctx, target)
require.NoError(t, err)
manifest, err := images.Manifest(ctx, contentStore, img.Target, nil)
require.NoError(t, err)
for _, layer := range manifest.Layers {
_, err = contentStore.Info(ctx, layer.Digest)
require.NoError(t, err)
}
// clear all local state out
err = imageService.Delete(ctx, img.Name, images.SynchronousDelete())
require.NoError(t, err)
checkAllReleasable(t, c, sb, true)
// retag the image we just pushed with no actual changes, which
// should not result in the image getting un-lazied
def, err = llb.Image(target).Marshal(context.TODO())
require.NoError(t, err)
target2 := targetNoTag + "newtag"
_, err = c.Solve(context.TODO(), def, SolveOpt{
Exports: []ExportEntry{
{
Type: ExporterImage,
Attrs: map[string]string{
"name": target2,
"push": "true",
},
},
},
}, nil)
require.NoError(t, err)
img, err = imageService.Get(ctx, target2)
require.NoError(t, err)
manifest, err = images.Manifest(ctx, contentStore, img.Target, nil)
require.NoError(t, err)
for _, layer := range manifest.Layers {
_, err = contentStore.Info(ctx, layer.Digest)
require.True(t, errors.Is(err, ctderrdefs.ErrNotFound), "unexpected error %v", err)
}
// clear all local state out again
err = imageService.Delete(ctx, img.Name, images.SynchronousDelete())
require.NoError(t, err)
checkAllReleasable(t, c, sb, true)
// try a cross-repo push to same registry, which should still result in the
// image remaining lazy
target3 := registry + "/buildkit/testlazycrossrepo:latest"
_, err = c.Solve(context.TODO(), def, SolveOpt{
Exports: []ExportEntry{
{
Type: ExporterImage,
Attrs: map[string]string{
"name": target3,
"push": "true",
},
},
},
}, nil)
require.NoError(t, err)
img, err = imageService.Get(ctx, target3)
require.NoError(t, err)
manifest, err = images.Manifest(ctx, contentStore, img.Target, nil)
require.NoError(t, err)
for _, layer := range manifest.Layers {
_, err = contentStore.Info(ctx, layer.Digest)
require.True(t, errors.Is(err, ctderrdefs.ErrNotFound), "unexpected error %v", err)
}
// check that a subsequent build can use the previously lazy image in an exec
def, err = llb.Image(target2).Run(llb.Args([]string{"true"})).Marshal(context.TODO())
require.NoError(t, err)
_, err = c.Solve(context.TODO(), def, SolveOpt{}, nil)
require.NoError(t, err)
}
func testBasicCacheImportExport(t *testing.T, sb integration.Sandbox, cacheOptionsEntryImport, cacheOptionsEntryExport []CacheOptionsEntry) {
requiresLinux(t)
c, err := New(context.TODO(), sb.Address())

View File

@ -14,10 +14,12 @@ import (
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/rootfs"
"github.com/moby/buildkit/cache/blobs"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/push"
digest "github.com/opencontainers/go-digest"
@ -62,7 +64,7 @@ func New(opt Opt) (exporter.Exporter, error) {
func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
i := &imageExporterInstance{
imageExporter: e,
layerCompression: blobs.DefaultCompression,
layerCompression: compression.Default,
}
for k, v := range opt {
@ -134,9 +136,9 @@ func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exp
case keyLayerCompression:
switch v {
case "gzip":
i.layerCompression = blobs.Gzip
i.layerCompression = compression.Gzip
case "uncompressed":
i.layerCompression = blobs.Uncompressed
i.layerCompression = compression.Uncompressed
default:
return nil, errors.Errorf("unsupported layer compression type: %v", v)
}
@ -160,7 +162,7 @@ type imageExporterInstance struct {
ociTypes bool
nameCanonical bool
danglingPrefix string
layerCompression blobs.CompressionType
layerCompression compression.Type
meta map[string][]byte
}
@ -231,13 +233,35 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
tagDone(nil)
if e.unpack {
if err := e.unpackImage(ctx, img); err != nil {
if err := e.unpackImage(ctx, img, src); err != nil {
return nil, err
}
}
}
if e.push {
if err := push.Push(ctx, e.opt.SessionManager, sessionID, e.opt.ImageWriter.ContentStore(), desc.Digest, targetName, e.insecure, e.opt.RegistryHosts, e.pushByDigest); err != nil {
mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore())
if src.Ref != nil {
remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression)
if err != nil {
return nil, err
}
for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider)
}
}
if len(src.Refs) > 0 {
for _, r := range src.Refs {
remote, err := r.GetRemote(ctx, false, e.layerCompression)
if err != nil {
return nil, err
}
for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider)
}
}
}
if err := push.Push(ctx, e.opt.SessionManager, sessionID, mprovider, e.opt.ImageWriter.ContentStore(), desc.Digest, targetName, e.insecure, e.opt.RegistryHosts, e.pushByDigest); err != nil {
return nil, err
}
}
@ -249,7 +273,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
return resp, nil
}
func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Image) (err0 error) {
func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Image, src exporter.Source) (err0 error) {
unpackDone := oneOffProgress(ctx, "unpacking to "+img.Name)
defer func() {
unpackDone(err0)
@ -267,7 +291,28 @@ func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Imag
return err
}
layers, err := getLayers(ctx, contentStore, manifest)
topLayerRef := src.Ref
if len(src.Refs) > 0 {
if r, ok := src.Refs[platforms.DefaultString()]; ok {
topLayerRef = r
} else {
return errors.Errorf("no reference for default platform %s", platforms.DefaultString())
}
}
remote, err := topLayerRef.GetRemote(ctx, true, e.layerCompression)
if err != nil {
return err
}
// ensure the content for each layer exists locally in case any are lazy
if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
if err := unlazier.Unlazy(ctx); err != nil {
return err
}
}
layers, err := getLayers(ctx, remote.Descriptors, manifest)
if err != nil {
return err
}
@ -297,21 +342,16 @@ func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Imag
return err
}
func getLayers(ctx context.Context, contentStore content.Store, manifest ocispec.Manifest) ([]rootfs.Layer, error) {
diffIDs, err := images.RootFS(ctx, contentStore, manifest.Config)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve rootfs")
}
if len(diffIDs) != len(manifest.Layers) {
func getLayers(ctx context.Context, descs []ocispec.Descriptor, manifest ocispec.Manifest) ([]rootfs.Layer, error) {
if len(descs) != len(manifest.Layers) {
return nil, errors.Errorf("mismatched image rootfs and manifest layers")
}
layers := make([]rootfs.Layer, len(diffIDs))
for i := range diffIDs {
layers := make([]rootfs.Layer, len(descs))
for i, desc := range descs {
layers[i].Diff = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayer,
Digest: diffIDs[i],
Digest: digest.Digest(desc.Annotations["containerd.io/uncompressed"]),
}
layers[i].Blob = manifest.Layers[i]
}

View File

@ -12,10 +12,11 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/blobs"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/system"
digest "github.com/opencontainers/go-digest"
@ -45,7 +46,7 @@ type ImageWriter struct {
opt WriterOpt
}
func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool, compression blobs.CompressionType) (*ocispec.Descriptor, error) {
func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool, compressionType compression.Type) (*ocispec.Descriptor, error) {
platformsBytes, ok := inp.Metadata[exptypes.ExporterPlatformsKey]
if len(inp.Refs) > 0 && !ok {
@ -53,11 +54,11 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool
}
if len(inp.Refs) == 0 {
layers, err := ic.exportLayers(ctx, compression, inp.Ref)
remotes, err := ic.exportLayers(ctx, compressionType, inp.Ref)
if err != nil {
return nil, err
}
return ic.commitDistributionManifest(ctx, inp.Ref, inp.Metadata[exptypes.ExporterImageConfigKey], layers[0], oci, inp.Metadata[exptypes.ExporterInlineCache])
return ic.commitDistributionManifest(ctx, inp.Ref, inp.Metadata[exptypes.ExporterImageConfigKey], &remotes[0], oci, inp.Metadata[exptypes.ExporterInlineCache])
}
var p exptypes.Platforms
@ -70,13 +71,13 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool
}
refs := make([]cache.ImmutableRef, 0, len(inp.Refs))
layersMap := make(map[string]int, len(inp.Refs))
remotesMap := make(map[string]int, len(inp.Refs))
for id, r := range inp.Refs {
layersMap[id] = len(refs)
remotesMap[id] = len(refs)
refs = append(refs, r)
}
layers, err := ic.exportLayers(ctx, compression, refs...)
remotes, err := ic.exportLayers(ctx, compressionType, refs...)
if err != nil {
return nil, err
}
@ -109,7 +110,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool
}
config := inp.Metadata[fmt.Sprintf("%s/%s", exptypes.ExporterImageConfigKey, p.ID)]
desc, err := ic.commitDistributionManifest(ctx, r, config, layers[layersMap[p.ID]], oci, inp.Metadata[fmt.Sprintf("%s/%s", exptypes.ExporterInlineCache, p.ID)])
desc, err := ic.commitDistributionManifest(ctx, r, config, &remotes[remotesMap[p.ID]], oci, inp.Metadata[fmt.Sprintf("%s/%s", exptypes.ExporterInlineCache, p.ID)])
if err != nil {
return nil, err
}
@ -141,20 +142,23 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool
return &idxDesc, nil
}
func (ic *ImageWriter) exportLayers(ctx context.Context, compression blobs.CompressionType, refs ...cache.ImmutableRef) ([][]blobs.DiffPair, error) {
func (ic *ImageWriter) exportLayers(ctx context.Context, compressionType compression.Type, refs ...cache.ImmutableRef) ([]solver.Remote, error) {
eg, ctx := errgroup.WithContext(ctx)
layersDone := oneOffProgress(ctx, "exporting layers")
out := make([][]blobs.DiffPair, len(refs))
out := make([]solver.Remote, len(refs))
for i, ref := range refs {
func(i int, ref cache.ImmutableRef) {
eg.Go(func() error {
diffPairs, err := blobs.GetDiffPairs(ctx, ic.opt.ContentStore, ic.opt.Differ, ref, true, compression)
if err != nil {
return errors.Wrap(err, "failed calculating diff pairs for exported snapshot")
if ref == nil {
return
}
out[i] = diffPairs
eg.Go(func() error {
remote, err := ref.GetRemote(ctx, true, compressionType)
if err != nil {
return err
}
out[i] = *remote
return nil
})
}(i, ref)
@ -167,7 +171,7 @@ func (ic *ImageWriter) exportLayers(ctx context.Context, compression blobs.Compr
return out, nil
}
func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, ref cache.ImmutableRef, config []byte, layers []blobs.DiffPair, oci bool, inlineCache []byte) (*ocispec.Descriptor, error) {
func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, ref cache.ImmutableRef, config []byte, remote *solver.Remote, oci bool, inlineCache []byte) (*ocispec.Descriptor, error) {
if len(config) == 0 {
var err error
config, err = emptyImageConfig()
@ -176,14 +180,20 @@ func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, ref cache
}
}
if remote == nil {
remote = &solver.Remote{
Provider: ic.opt.ContentStore,
}
}
history, err := parseHistoryFromConfig(config)
if err != nil {
return nil, err
}
diffPairs, history := normalizeLayersAndHistory(layers, history, ref)
remote, history = normalizeLayersAndHistory(remote, history, ref, oci)
config, err = patchImageConfig(config, diffPairs, history, inlineCache)
config, err = patchImageConfig(config, remote.Descriptors, history, inlineCache)
if err != nil {
return nil, err
}
@ -224,33 +234,9 @@ func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, ref cache
"containerd.io/gc.ref.content.0": configDigest.String(),
}
layerMediaTypes := blobs.GetMediaTypeForLayers(diffPairs, ref)
cs := ic.opt.ContentStore
for i, dp := range diffPairs {
info, err := cs.Info(ctx, dp.Blobsum)
if err != nil {
return nil, errors.Wrapf(err, "could not find blob %s from contentstore", dp.Blobsum)
}
layerType := blobs.ConvertLayerMediaType(layerMediaTypes[i], oci)
// NOTE: The media type might be missing for some migrated ones
// from before lease based storage. If so, we should detect
// the media type from blob data.
//
// Discussion: https://github.com/moby/buildkit/pull/1277#discussion_r352795429
if layerType == "" {
layerType, err = blobs.DetectLayerMediaType(ctx, cs, dp.Blobsum, oci)
if err != nil {
return nil, err
}
}
mfst.Layers = append(mfst.Layers, ocispec.Descriptor{
Digest: dp.Blobsum,
Size: info.Size,
MediaType: layerType,
})
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = dp.Blobsum.String()
for i, desc := range remote.Descriptors {
mfst.Layers = append(mfst.Layers, desc)
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = desc.Digest.String()
}
mfstJSON, err := json.MarshalIndent(mfst, "", " ")
@ -335,7 +321,7 @@ func parseHistoryFromConfig(dt []byte) ([]ocispec.History, error) {
return config.History, nil
}
func patchImageConfig(dt []byte, dps []blobs.DiffPair, history []ocispec.History, cache []byte) ([]byte, error) {
func patchImageConfig(dt []byte, descs []ocispec.Descriptor, history []ocispec.History, cache []byte) ([]byte, error) {
m := map[string]json.RawMessage{}
if err := json.Unmarshal(dt, &m); err != nil {
return nil, errors.Wrap(err, "failed to parse image config for patch")
@ -343,8 +329,8 @@ func patchImageConfig(dt []byte, dps []blobs.DiffPair, history []ocispec.History
var rootFS ocispec.RootFS
rootFS.Type = "layers"
for _, dp := range dps {
rootFS.DiffIDs = append(rootFS.DiffIDs, dp.DiffID)
for _, desc := range descs {
rootFS.DiffIDs = append(rootFS.DiffIDs, digest.Digest(desc.Annotations["containerd.io/uncompressed"]))
}
dt, err := json.Marshal(rootFS)
if err != nil {
@ -384,9 +370,9 @@ func patchImageConfig(dt []byte, dps []blobs.DiffPair, history []ocispec.History
return dt, errors.Wrap(err, "failed to marshal config after patch")
}
func normalizeLayersAndHistory(diffs []blobs.DiffPair, history []ocispec.History, ref cache.ImmutableRef) ([]blobs.DiffPair, []ocispec.History) {
func normalizeLayersAndHistory(remote *solver.Remote, history []ocispec.History, ref cache.ImmutableRef, oci bool) (*solver.Remote, []ocispec.History) {
refMeta := getRefMetadata(ref, len(diffs))
refMeta := getRefMetadata(ref, len(remote.Descriptors))
var historyLayers int
for _, h := range history {
@ -395,14 +381,14 @@ func normalizeLayersAndHistory(diffs []blobs.DiffPair, history []ocispec.History
}
}
if historyLayers > len(diffs) {
if historyLayers > len(remote.Descriptors) {
// this case shouldn't happen but if it does force set history layers empty
// from the bottom
logrus.Warn("invalid image config with unaccounted layers")
historyCopy := make([]ocispec.History, 0, len(history))
var l int
for _, h := range history {
if l >= len(diffs) {
if l >= len(remote.Descriptors) {
h.EmptyLayer = true
}
if !h.EmptyLayer {
@ -413,11 +399,11 @@ func normalizeLayersAndHistory(diffs []blobs.DiffPair, history []ocispec.History
history = historyCopy
}
if len(diffs) > historyLayers {
if len(remote.Descriptors) > historyLayers {
// some history items are missing. add them based on the ref metadata
for _, md := range refMeta[historyLayers:] {
history = append(history, ocispec.History{
Created: &md.createdAt,
Created: md.createdAt,
CreatedBy: md.description,
Comment: "buildkit.exporter.image.v0",
})
@ -428,11 +414,11 @@ func normalizeLayersAndHistory(diffs []blobs.DiffPair, history []ocispec.History
for i, h := range history {
if !h.EmptyLayer {
if h.Created == nil {
h.Created = &refMeta[layerIndex].createdAt
h.Created = refMeta[layerIndex].createdAt
}
if diffs[layerIndex].Blobsum == emptyGZLayer {
if remote.Descriptors[layerIndex].Digest == emptyGZLayer {
h.EmptyLayer = true
diffs = append(diffs[:layerIndex], diffs[layerIndex+1:]...)
remote.Descriptors = append(remote.Descriptors[:layerIndex], remote.Descriptors[layerIndex+1:]...)
} else {
layerIndex++
}
@ -471,21 +457,25 @@ func normalizeLayersAndHistory(diffs []blobs.DiffPair, history []ocispec.History
history[i] = h
}
return diffs, history
// convert between oci and docker media types (or vice versa) if needed
remote.Descriptors = compression.ConvertAllLayerMediaTypes(oci, remote.Descriptors...)
return remote, history
}
type refMetadata struct {
description string
createdAt time.Time
createdAt *time.Time
}
func getRefMetadata(ref cache.ImmutableRef, limit int) []refMetadata {
if limit <= 0 {
return nil
}
now := time.Now()
meta := refMetadata{
description: "created by buildkit", // shouldn't be shown but don't fail build
createdAt: time.Now(),
createdAt: &now,
}
if ref == nil {
return append(getRefMetadata(nil, limit-1), meta)
@ -493,7 +483,8 @@ func getRefMetadata(ref cache.ImmutableRef, limit int) []refMetadata {
if descr := cache.GetDescription(ref.Metadata()); descr != "" {
meta.description = descr
}
meta.createdAt = cache.GetCreatedAt(ref.Metadata())
createdAt := cache.GetCreatedAt(ref.Metadata())
meta.createdAt = &createdAt
p := ref.Parent()
if p != nil {
defer p.Release(context.TODO())

View File

@ -9,11 +9,13 @@ import (
archiveexporter "github.com/containerd/containerd/images/archive"
"github.com/containerd/containerd/leases"
"github.com/docker/distribution/reference"
"github.com/moby/buildkit/cache/blobs"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/exporter/containerimage"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/progress"
@ -52,7 +54,7 @@ func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exp
var ot *bool
i := &imageExporterInstance{
imageExporter: e,
layerCompression: blobs.DefaultCompression,
layerCompression: compression.Default,
}
for k, v := range opt {
switch k {
@ -61,9 +63,9 @@ func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exp
case keyLayerCompression:
switch v {
case "gzip":
i.layerCompression = blobs.Gzip
i.layerCompression = compression.Gzip
case "uncompressed":
i.layerCompression = blobs.Uncompressed
i.layerCompression = compression.Uncompressed
default:
return nil, errors.Errorf("unsupported layer compression type: %v", v)
}
@ -98,7 +100,7 @@ type imageExporterInstance struct {
meta map[string][]byte
name string
ociTypes bool
layerCompression blobs.CompressionType
layerCompression compression.Type
}
func (e *imageExporterInstance) Name() string {
@ -172,8 +174,43 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
if err != nil {
return nil, err
}
mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore())
if src.Ref != nil {
remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression)
if err != nil {
return nil, err
}
// unlazy before tar export as the tar writer does not handle
// layer blobs in parallel (whereas unlazy does)
if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
if err := unlazier.Unlazy(ctx); err != nil {
return nil, err
}
}
for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider)
}
}
if len(src.Refs) > 0 {
for _, r := range src.Refs {
remote, err := r.GetRemote(ctx, false, e.layerCompression)
if err != nil {
return nil, err
}
if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
if err := unlazier.Unlazy(ctx); err != nil {
return nil, err
}
}
for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider)
}
}
}
report := oneOffProgress(ctx, "sending tarball")
if err := archiveexporter.Export(ctx, e.opt.ImageWriter.ContentStore(), w, expOpts...); err != nil {
if err := archiveexporter.Export(ctx, mprovider, w, expOpts...); err != nil {
w.Close()
if grpcerrors.Code(err) == codes.AlreadyExists {
return resp, report(nil)

88
solver/cacheopts.go Normal file
View File

@ -0,0 +1,88 @@
package solver
import (
"context"
digest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
)
type CacheOpts map[interface{}]interface{}
type cacheOptGetterKey struct{}
func CacheOptGetterOf(ctx context.Context) func(keys ...interface{}) map[interface{}]interface{} {
if v := ctx.Value(cacheOptGetterKey{}); v != nil {
if getter, ok := v.(func(keys ...interface{}) map[interface{}]interface{}); ok {
return getter
}
}
return nil
}
func withAncestorCacheOpts(ctx context.Context, start *state) context.Context {
return context.WithValue(ctx, cacheOptGetterKey{}, func(keys ...interface{}) map[interface{}]interface{} {
keySet := make(map[interface{}]struct{})
for _, k := range keys {
keySet[k] = struct{}{}
}
values := make(map[interface{}]interface{})
walkAncestors(start, func(st *state) bool {
if st.clientVertex.Error != "" {
// don't use values from cancelled or otherwise error'd vertexes
return false
}
for _, res := range st.op.cacheRes {
if res.Opts == nil {
continue
}
for k := range keySet {
if v, ok := res.Opts[k]; ok {
values[k] = v
delete(keySet, k)
if len(keySet) == 0 {
return true
}
}
}
}
return false
})
return values
})
}
func walkAncestors(start *state, f func(*state) bool) {
stack := [][]*state{{start}}
cache := make(map[digest.Digest]struct{})
for len(stack) > 0 {
sts := stack[len(stack)-1]
if len(sts) == 0 {
stack = stack[:len(stack)-1]
continue
}
st := sts[len(sts)-1]
stack[len(stack)-1] = sts[:len(sts)-1]
if st == nil {
continue
}
if _, ok := cache[st.origDigest]; ok {
continue
}
cache[st.origDigest] = struct{}{}
if shouldStop := f(st); shouldStop {
return
}
stack = append(stack, []*state{})
for _, parentDgst := range st.clientVertex.Inputs {
st.solver.mu.RLock()
parent := st.solver.actives[parentDgst]
st.solver.mu.RUnlock()
if parent == nil {
logrus.Warnf("parent %q not found in active job list during cache opt search", parentDgst)
continue
}
stack[len(stack)-1] = append(stack[len(stack)-1], parent)
}
}
}

View File

@ -865,6 +865,7 @@ func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
res, err := e.op.LoadCache(ctx, rec)
if err != nil {
logrus.Debugf("load cache for %s err: %v", e.edge.Vertex.Name(), err)
return nil, errors.Wrap(err, "failed to load cache")
}

View File

@ -596,7 +596,7 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name())
notifyStarted(ctx, &s.st.clientVertex, true)
res, err := s.Cache().Load(ctx, rec)
res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec)
tracing.FinishWithError(span, err)
notifyCompleted(ctx, &s.st.clientVertex, err, true)
return res, err
@ -619,7 +619,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBased
}
s.slowMu.Unlock()
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
key, err := f(ctx, res)
key, err := f(withAncestorCacheOpts(ctx, s.st), res)
complete := true
if err != nil {
select {
@ -666,6 +666,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
return nil, s.cacheErr
}
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
ctx = withAncestorCacheOpts(ctx, s.st)
if len(s.st.vtx.Inputs()) == 0 {
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name())
@ -721,6 +722,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
}
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
ctx = withAncestorCacheOpts(ctx, s.st)
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name())

View File

@ -24,9 +24,10 @@ type sourceOp struct {
src source.SourceInstance
sessM *session.Manager
w worker.Worker
vtx solver.Vertex
}
func NewSourceOp(_ solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, sessM *session.Manager, w worker.Worker) (solver.Op, error) {
func NewSourceOp(vtx solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, sessM *session.Manager, w worker.Worker) (solver.Op, error) {
if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil {
return nil, err
}
@ -36,6 +37,7 @@ func NewSourceOp(_ solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *s
w: w,
sessM: sessM,
platform: platform,
vtx: vtx,
}, nil
}
@ -49,7 +51,7 @@ func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error)
if err != nil {
return nil, err
}
src, err := s.sm.Resolve(ctx, id, s.sessM)
src, err := s.sm.Resolve(ctx, id, s.sessM, s.vtx)
if err != nil {
return nil, err
}
@ -62,7 +64,7 @@ func (s *sourceOp) CacheMap(ctx context.Context, g session.Group, index int) (*s
if err != nil {
return nil, false, err
}
k, done, err := src.CacheKey(ctx, g, index)
k, cacheOpts, done, err := src.CacheKey(ctx, g, index)
if err != nil {
return nil, false, err
}
@ -76,6 +78,7 @@ func (s *sourceOp) CacheMap(ctx context.Context, g session.Group, index int) (*s
return &solver.CacheMap{
// TODO: add os/arch
Digest: dgst,
Opts: cacheOpts,
}, done, nil
}

View File

@ -7,6 +7,7 @@ import (
"github.com/moby/buildkit/cache/contenthash"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
@ -70,5 +71,5 @@ func workerRefConverter(ctx context.Context, res solver.Result) (*solver.Remote,
return nil, errors.Errorf("invalid result: %T", res.Sys())
}
return ref.Worker.GetRemote(ctx, ref.ImmutableRef, true)
return ref.ImmutableRef.GetRemote(ctx, true, compression.Default)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/moby/buildkit/frontend/gateway"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/entitlements"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/worker"
@ -267,7 +268,7 @@ func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedR
return nil, errors.Errorf("invalid reference: %T", res.Sys())
}
remote, err := workerRef.Worker.GetRemote(ctx, workerRef.ImmutableRef, true)
remote, err := workerRef.ImmutableRef.GetRemote(ctx, true, compression.Default)
if err != nil || remote == nil {
return nil, nil
}

View File

@ -172,6 +172,12 @@ type CacheMap struct {
// the checksum of file contents from input snapshots.
ComputeDigestFunc ResultBasedCacheFunc
}
// Opts specifies generic options that will be passed to cache load calls if/when
// the key associated with this CacheMap is used to load a ref. It allows options
// such as oci descriptor content providers and progress writers to be passed to
// the cache. Opts should not have any impact on the computed cache key.
Opts CacheOpts
}
// ExportableCacheKey is a cache key connected with an exporter that can export

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"runtime"
"sync"
"time"
"github.com/containerd/containerd/content"
@ -12,19 +13,20 @@ import (
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/docker/docker/errdefs"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/controller"
"github.com/moby/buildkit/util/pull"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/winlayers"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
specs "github.com/opencontainers/image-spec/specs-go/v1"
@ -98,7 +100,7 @@ func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.Re
return typed.dgst, typed.dt, nil
}
func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, vtx solver.Vertex) (source.SourceInstance, error) {
imageIdentifier, ok := id.(*source.ImageIdentifier)
if !ok {
return nil, errors.Errorf("invalid image identifier %v", id)
@ -110,18 +112,15 @@ func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session
}
pullerUtil := &pull.Puller{
Snapshotter: is.Snapshotter,
ContentStore: is.ContentStore,
Applier: is.Applier,
Src: imageIdentifier.Reference,
Platform: &platform,
Src: imageIdentifier.Reference,
}
p := &puller{
CacheAccessor: is.CacheAccessor,
Puller: pullerUtil,
Platform: platform,
id: imageIdentifier,
LeaseManager: is.LeaseManager,
Puller: pullerUtil,
id: imageIdentifier,
ResolverOpt: pull.ResolverOpt{
Hosts: is.RegistryHosts,
Auth: resolver.NewSessionAuthenticator(sm, nil),
@ -129,6 +128,7 @@ func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session
Mode: imageIdentifier.ResolveMode,
Ref: imageIdentifier.Reference.String(),
},
vtx: vtx,
}
return p, nil
}
@ -136,69 +136,125 @@ func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session
type puller struct {
CacheAccessor cache.Accessor
LeaseManager leases.Manager
Platform specs.Platform
ResolverOpt pull.ResolverOpt
id *source.ImageIdentifier
vtx solver.Vertex
cacheKeyOnce sync.Once
cacheKeyErr error
releaseTmpLeases func(context.Context) error
descHandlers cache.DescHandlers
manifest *pull.PulledManifests
manifestKey string
configKey string
*pull.Puller
}
func mainManifestKey(ctx context.Context, desc specs.Descriptor, platform specs.Platform) (digest.Digest, error) {
dt, err := json.Marshal(struct {
func mainManifestKey(ctx context.Context, desc specs.Descriptor, platform *specs.Platform) (digest.Digest, error) {
keyStruct := struct {
Digest digest.Digest
OS string
Arch string
Variant string `json:",omitempty"`
}{
Digest: desc.Digest,
OS: platform.OS,
Arch: platform.Architecture,
Variant: platform.Variant,
})
}
if platform != nil {
keyStruct.OS = platform.OS
keyStruct.Arch = platform.Architecture
keyStruct.Variant = platform.Variant
}
dt, err := json.Marshal(keyStruct)
if err != nil {
return "", err
}
return digest.FromBytes(dt), nil
}
func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cacheKey string, cacheOpts solver.CacheOpts, cacheDone bool, err error) {
if p.Puller.Resolver == nil {
p.Puller.Resolver = pull.NewResolver(g, p.ResolverOpt)
} else {
p.ResolverOpt.Auth.AddSession(g)
}
_, desc, err := p.Puller.Resolve(ctx)
p.cacheKeyOnce.Do(func() {
ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager, leases.WithExpiration(5*time.Minute), leaseutil.MakeTemporary)
if err != nil {
return "", false, err
p.cacheKeyErr = err
return
}
if index == 0 || desc.Digest == "" {
k, err := mainManifestKey(ctx, desc, p.Platform)
p.releaseTmpLeases = done
imageutil.AddLease(p.releaseTmpLeases)
defer func() {
if p.cacheKeyErr != nil {
p.releaseTmpLeases(ctx)
}
}()
resolveProgressDone := oneOffProgress(ctx, "resolve "+p.Src.String())
defer func() {
resolveProgressDone(err)
}()
p.manifest, err = p.PullManifests(ctx)
if err != nil {
return "", false, err
}
return k.String(), false, nil
}
ref, err := reference.ParseNormalizedNamed(p.Src.String())
if err != nil {
return "", false, err
}
ref, err = reference.WithDigest(ref, desc.Digest)
if err != nil {
return "", false, nil
}
_, dt, err := imageutil.Config(ctx, ref.String(), p.Resolver, p.ContentStore, p.LeaseManager, &p.Platform)
if err != nil {
return "", false, err
p.cacheKeyErr = err
return
}
k := cacheKeyFromConfig(dt).String()
if k == "" {
if len(p.manifest.Remote.Descriptors) > 0 {
pw, _, _ := progress.FromContext(ctx)
progressController := &controller.Controller{
Writer: pw,
}
if p.vtx != nil {
progressController.Digest = p.vtx.Digest()
progressController.Name = p.vtx.Name()
}
descHandler := &cache.DescHandler{
Provider: p.manifest.Remote.Provider,
ImageRef: p.manifest.Ref,
Progress: progressController,
}
p.descHandlers = cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
for _, desc := range p.manifest.Remote.Descriptors {
p.descHandlers[desc.Digest] = descHandler
}
}
desc := p.manifest.MainManifestDesc
k, err := mainManifestKey(ctx, desc, p.Platform)
if err != nil {
return "", false, err
p.cacheKeyErr = err
return
}
return k.String(), true, nil
p.manifestKey = k.String()
dt, err := content.ReadBlob(ctx, p.ContentStore, p.manifest.ConfigDesc)
if err != nil {
p.cacheKeyErr = err
return
}
return k, true, nil
p.configKey = cacheKeyFromConfig(dt).String()
})
if p.cacheKeyErr != nil {
return "", nil, false, p.cacheKeyErr
}
cacheOpts = solver.CacheOpts(make(map[interface{}]interface{}))
for dgst, descHandler := range p.descHandlers {
cacheOpts[cache.DescHandlerKey(dgst)] = descHandler
}
cacheDone = index > 0
if index == 0 || p.configKey == "" {
return p.manifestKey, cacheOpts, cacheDone, nil
}
return p.configKey, cacheOpts, cacheDone, nil
}
func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.ImmutableRef, err error) {
@ -208,55 +264,46 @@ func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.Immuta
p.ResolverOpt.Auth.AddSession(g)
}
layerNeedsTypeWindows := false
if platform := p.Puller.Platform; platform != nil {
if platform.OS == "windows" && runtime.GOOS != "windows" {
ctx = winlayers.UseWindowsLayerMode(ctx)
layerNeedsTypeWindows = true
if len(p.manifest.Remote.Descriptors) == 0 {
return nil, nil
}
defer p.releaseTmpLeases(ctx)
var current cache.ImmutableRef
defer func() {
if err != nil && current != nil {
current.Release(context.TODO())
}
}()
var parent cache.ImmutableRef
for _, layerDesc := range p.manifest.Remote.Descriptors {
parent = current
current, err = p.CacheAccessor.GetByBlob(ctx, layerDesc, parent, p.descHandlers)
if parent != nil {
parent.Release(context.TODO())
}
if err != nil {
return nil, err
}
}
// workaround for gcr, authentication not supported on blob endpoints
pull.EnsureManifestRequested(ctx, p.Puller.Resolver, p.Puller.Src.String())
for _, desc := range p.manifest.Nonlayers {
if _, err := p.ContentStore.Info(ctx, desc.Digest); errdefs.IsNotFound(err) {
// manifest or config must have gotten gc'd after CacheKey, re-pull them
ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
}
defer done(ctx)
pulled, err := p.Puller.Pull(ctx)
if err != nil {
if _, err := p.PullManifests(ctx); err != nil {
return nil, err
}
if len(pulled.Layers) == 0 {
return nil, nil
} else if err != nil {
return nil, err
}
extractDone := oneOffProgress(ctx, "unpacking "+pulled.Ref)
var current cache.ImmutableRef
defer func() {
if err != nil && current != nil {
current.Release(context.TODO())
}
extractDone(err)
}()
for _, l := range pulled.Layers {
ref, err := p.CacheAccessor.GetByBlob(ctx, l, current, cache.WithDescription("pulled from "+pulled.Ref))
if err != nil {
return nil, err
}
if err := ref.Extract(ctx); err != nil {
ref.Release(context.TODO())
return nil, err
}
if current != nil {
current.Release(context.TODO())
}
current = ref
}
for _, desc := range pulled.MetadataBlobs {
if err := p.LeaseManager.AddResource(ctx, leases.Lease{ID: current.ID()}, leases.Resource{
ID: desc.Digest.String(),
Type: "content",
@ -265,7 +312,7 @@ func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.Immuta
}
}
if layerNeedsTypeWindows && current != nil {
if current != nil && p.Platform != nil && p.Platform.OS == "windows" && runtime.GOOS != "windows" {
if err := markRefLayerTypeWindows(current); err != nil {
return nil, err
}

View File

@ -21,6 +21,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/secrets"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/progress/logs"
"github.com/pkg/errors"
@ -166,7 +167,7 @@ func (gs *gitSourceHandler) shaToCacheKey(sha string) string {
return key
}
func (gs *gitSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
func (gs *gitSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, _ solver.Vertex) (source.SourceInstance, error) {
gitIdentifier, ok := id.(*source.GitIdentifier)
if !ok {
return nil, errors.Errorf("invalid git identifier %v", id)
@ -231,7 +232,7 @@ func (gs *gitSourceHandler) getAuthToken(ctx context.Context, g session.Group) e
})
}
func (gs *gitSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
func (gs *gitSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, solver.CacheOpts, bool, error) {
remote := gs.src.Remote
ref := gs.src.Ref
if ref == "" {
@ -243,14 +244,14 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context, g session.Group, index
if isCommitSHA(ref) {
ref = gs.shaToCacheKey(ref)
gs.cacheKey = ref
return ref, true, nil
return ref, nil, true, nil
}
gs.getAuthToken(ctx, g)
gitDir, unmountGitDir, err := gs.mountRemote(ctx, remote, gs.auth)
if err != nil {
return "", false, err
return "", nil, false, err
}
defer unmountGitDir()
@ -258,21 +259,21 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context, g session.Group, index
buf, err := gitWithinDir(ctx, gitDir, "", gs.auth, "ls-remote", "origin", ref)
if err != nil {
return "", false, errors.Wrapf(err, "failed to fetch remote %s", remote)
return "", nil, false, errors.Wrapf(err, "failed to fetch remote %s", remote)
}
out := buf.String()
idx := strings.Index(out, "\t")
if idx == -1 {
return "", false, errors.Errorf("repository does not contain ref %s, output: %q", ref, string(out))
return "", nil, false, errors.Errorf("repository does not contain ref %s, output: %q", ref, string(out))
}
sha := string(out[:idx])
if !isCommitSHA(sha) {
return "", false, errors.Errorf("invalid commit sha %q", sha)
return "", nil, false, errors.Errorf("invalid commit sha %q", sha)
}
sha = gs.shaToCacheKey(sha)
gs.cacheKey = sha
return sha, true, nil
return sha, nil, true, nil
}
func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out cache.ImmutableRef, retErr error) {
@ -284,7 +285,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out
cacheKey := gs.cacheKey
if cacheKey == "" {
var err error
cacheKey, _, err = gs.CacheKey(ctx, g, 0)
cacheKey, _, _, err = gs.CacheKey(ctx, g, 0)
if err != nil {
return nil, err
}

View File

@ -57,10 +57,10 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
id := &source.GitIdentifier{Remote: repodir, KeepGitDir: keepGitDir}
g, err := gs.Resolve(ctx, id, nil)
g, err := gs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
key1, done, err := g.CacheKey(ctx, nil, 0)
key1, _, done, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.True(t, done)
@ -99,10 +99,10 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
// second fetch returns same dir
id = &source.GitIdentifier{Remote: repodir, Ref: "master", KeepGitDir: keepGitDir}
g, err = gs.Resolve(ctx, id, nil)
g, err = gs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
key2, _, err := g.CacheKey(ctx, nil, 0)
key2, _, _, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, key1, key2)
@ -115,10 +115,10 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
id = &source.GitIdentifier{Remote: repodir, Ref: "feature", KeepGitDir: keepGitDir}
g, err = gs.Resolve(ctx, id, nil)
g, err = gs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
key3, _, err := g.CacheKey(ctx, nil, 0)
key3, _, _, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.NotEqual(t, key1, key3)
@ -184,10 +184,10 @@ func testFetchBySHA(t *testing.T, keepGitDir bool) {
id := &source.GitIdentifier{Remote: repodir, Ref: sha, KeepGitDir: keepGitDir}
g, err := gs.Resolve(ctx, id, nil)
g, err := gs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
key1, done, err := g.CacheKey(ctx, nil, 0)
key1, _, done, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.True(t, done)
@ -267,10 +267,10 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) {
id := &source.GitIdentifier{Remote: repodir, KeepGitDir: keepGitDir}
id2 := &source.GitIdentifier{Remote: repodir2, KeepGitDir: keepGitDir}
g, err := gs.Resolve(ctx, id, nil)
g, err := gs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
g2, err := gs.Resolve(ctx, id2, nil)
g2, err := gs.Resolve(ctx, id2, nil, nil)
require.NoError(t, err)
expLen := 40
@ -278,11 +278,11 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) {
expLen += 4
}
key1, _, err := g.CacheKey(ctx, nil, 0)
key1, _, _, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expLen, len(key1))
key2, _, err := g2.CacheKey(ctx, nil, 0)
key2, _, _, err := g2.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expLen, len(key2))

View File

@ -21,6 +21,7 @@ import (
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/tracing"
digest "github.com/opencontainers/go-digest"
@ -67,7 +68,7 @@ type httpSourceHandler struct {
sm *session.Manager
}
func (hs *httpSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
func (hs *httpSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, _ solver.Vertex) (source.SourceInstance, error) {
httpIdentifier, ok := id.(*source.HTTPIdentifier)
if !ok {
return nil, errors.Errorf("invalid http identifier %v", id)
@ -122,26 +123,26 @@ func (hs *httpSourceHandler) formatCacheKey(filename string, dgst digest.Digest,
return digest.FromBytes(dt)
}
func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, solver.CacheOpts, bool, error) {
if hs.src.Checksum != "" {
hs.cacheKey = hs.src.Checksum
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, nil), hs.src.Checksum, "").String(), true, nil
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, nil), hs.src.Checksum, "").String(), nil, true, nil
}
uh, err := hs.urlHash()
if err != nil {
return "", false, nil
return "", nil, false, nil
}
// look up metadata(previously stored headers) for that URL
sis, err := hs.md.Search(uh.String())
if err != nil {
return "", false, errors.Wrapf(err, "failed to search metadata for %s", uh)
return "", nil, false, errors.Wrapf(err, "failed to search metadata for %s", uh)
}
req, err := http.NewRequest("GET", hs.src.URL, nil)
if err != nil {
return "", false, err
return "", nil, false, err
}
req = req.WithContext(ctx)
m := map[string]*metadata.StorageItem{}
@ -198,7 +199,7 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde
if dgst != "" {
modTime := getModTime(si)
resp.Body.Close()
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, modTime).String(), true, nil
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, modTime).String(), nil, true, nil
}
}
}
@ -209,10 +210,10 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde
resp, err := client.Do(req)
if err != nil {
return "", false, err
return "", nil, false, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return "", false, errors.Errorf("invalid response status %d", resp.StatusCode)
return "", nil, false, errors.Errorf("invalid response status %d", resp.StatusCode)
}
if resp.StatusCode == http.StatusNotModified {
respETag := resp.Header.Get("ETag")
@ -225,27 +226,27 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde
}
si, ok := m[respETag]
if !ok {
return "", false, errors.Errorf("invalid not-modified ETag: %v", respETag)
return "", nil, false, errors.Errorf("invalid not-modified ETag: %v", respETag)
}
hs.refID = si.ID()
dgst := getChecksum(si)
if dgst == "" {
return "", false, errors.Errorf("invalid metadata change")
return "", nil, false, errors.Errorf("invalid metadata change")
}
modTime := getModTime(si)
resp.Body.Close()
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, modTime).String(), true, nil
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, modTime).String(), nil, true, nil
}
ref, dgst, err := hs.save(ctx, resp)
if err != nil {
return "", false, err
return "", nil, false, err
}
ref.Release(context.TODO())
hs.cacheKey = dgst
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, resp.Header.Get("Last-Modified")).String(), true, nil
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, resp.Header.Get("Last-Modified")).String(), nil, true, nil
}
func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response) (ref cache.ImmutableRef, dgst digest.Digest, retErr error) {

View File

@ -51,10 +51,10 @@ func TestHTTPSource(t *testing.T) {
id := &source.HTTPIdentifier{URL: server.URL + "/foo"}
h, err := hs.Resolve(ctx, id, nil)
h, err := hs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
k, _, err := h.CacheKey(ctx, nil, 0)
k, _, _, err := h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
expectedContent1 := "sha256:0b1a154faa3003c1fbe7fda9c8a42d55fde2df2a2c405c32038f8ac7ed6b044a"
@ -80,10 +80,10 @@ func TestHTTPSource(t *testing.T) {
ref = nil
// repeat, should use the etag
h, err = hs.Resolve(ctx, id, nil)
h, err = hs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
k, _, err = h.CacheKey(ctx, nil, 0)
k, _, _, err = h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expectedContent1, k)
@ -116,10 +116,10 @@ func TestHTTPSource(t *testing.T) {
// update etag, downloads again
server.SetRoute("/foo", resp2)
h, err = hs.Resolve(ctx, id, nil)
h, err = hs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
k, _, err = h.CacheKey(ctx, nil, 0)
k, _, _, err = h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expectedContent2, k)
@ -169,10 +169,10 @@ func TestHTTPDefaultName(t *testing.T) {
id := &source.HTTPIdentifier{URL: server.URL}
h, err := hs.Resolve(ctx, id, nil)
h, err := hs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
k, _, err := h.CacheKey(ctx, nil, 0)
k, _, _, err := h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, "sha256:146f16ec8810a62a57ce314aba391f95f7eaaf41b8b1ebaf2ab65fd63b1ad437", k)
@ -212,10 +212,10 @@ func TestHTTPInvalidURL(t *testing.T) {
id := &source.HTTPIdentifier{URL: server.URL + "/foo"}
h, err := hs.Resolve(ctx, id, nil)
h, err := hs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
_, _, err = h.CacheKey(ctx, nil, 0)
_, _, _, err = h.CacheKey(ctx, nil, 0)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid response")
}
@ -246,10 +246,10 @@ func TestHTTPChecksum(t *testing.T) {
id := &source.HTTPIdentifier{URL: server.URL + "/foo", Checksum: digest.FromBytes([]byte("content-different"))}
h, err := hs.Resolve(ctx, id, nil)
h, err := hs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
k, _, err := h.CacheKey(ctx, nil, 0)
k, _, _, err := h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
expectedContentDifferent := "sha256:f25996f463dca69cffb580f8273ffacdda43332b5f0a8bea2ead33900616d44b"
@ -268,10 +268,10 @@ func TestHTTPChecksum(t *testing.T) {
id = &source.HTTPIdentifier{URL: server.URL + "/foo", Checksum: digest.FromBytes([]byte("content-correct"))}
h, err = hs.Resolve(ctx, id, nil)
h, err = hs.Resolve(ctx, id, nil, nil)
require.NoError(t, err)
k, _, err = h.CacheKey(ctx, nil, 0)
k, _, _, err = h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expectedContentCorrect, k)

View File

@ -14,6 +14,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
@ -51,7 +52,7 @@ func (ls *localSource) ID() string {
return source.LocalScheme
}
func (ls *localSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
func (ls *localSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, _ solver.Vertex) (source.SourceInstance, error) {
localIdentifier, ok := id.(*source.LocalIdentifier)
if !ok {
return nil, errors.Errorf("invalid local identifier %v", id)
@ -70,13 +71,13 @@ type localSourceHandler struct {
*localSource
}
func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, solver.CacheOpts, bool, error) {
sessionID := ls.src.SessionID
if sessionID == "" {
id := g.SessionIterator().NextSession()
if id == "" {
return "", false, errors.New("could not access local files without session")
return "", nil, false, errors.New("could not access local files without session")
}
sessionID = id
}
@ -87,9 +88,9 @@ func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, ind
FollowPaths []string
}{SessionID: sessionID, IncludePatterns: ls.src.IncludePatterns, ExcludePatterns: ls.src.ExcludePatterns, FollowPaths: ls.src.FollowPaths})
if err != nil {
return "", false, err
return "", nil, false, err
}
return "session:" + ls.src.Name + ":" + digest.FromBytes(dt).String(), true, nil
return "session:" + ls.src.Name + ":" + digest.FromBytes(dt).String(), nil, true, nil
}
func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {

View File

@ -6,16 +6,17 @@ import (
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/pkg/errors"
)
type Source interface {
ID() string
Resolve(ctx context.Context, id Identifier, sm *session.Manager) (SourceInstance, error)
Resolve(ctx context.Context, id Identifier, sm *session.Manager, vtx solver.Vertex) (SourceInstance, error)
}
type SourceInstance interface {
CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error)
CacheKey(ctx context.Context, g session.Group, index int) (string, solver.CacheOpts, bool, error)
Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error)
}
@ -36,7 +37,7 @@ func (sm *Manager) Register(src Source) {
sm.mu.Unlock()
}
func (sm *Manager) Resolve(ctx context.Context, id Identifier, sessM *session.Manager) (SourceInstance, error) {
func (sm *Manager) Resolve(ctx context.Context, id Identifier, sessM *session.Manager, vtx solver.Vertex) (SourceInstance, error) {
sm.mu.Lock()
src, ok := sm.sources[id.ID()]
sm.mu.Unlock()
@ -45,5 +46,5 @@ func (sm *Manager) Resolve(ctx context.Context, id Identifier, sessM *session.Ma
return nil, errors.Errorf("no handler for %s", id.ID())
}
return src.Resolve(ctx, id, sessM)
return src.Resolve(ctx, id, sessM, vtx)
}

View File

@ -1,4 +1,4 @@
package blobs
package compression
import (
"bytes"
@ -7,30 +7,29 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/moby/buildkit/cache"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// CompressionType represents compression type for blob data.
type CompressionType int
// Type represents compression type for blob data.
type Type int
const (
// Uncompressed indicates no compression.
Uncompressed CompressionType = iota
Uncompressed Type = iota
// Gzip is used for blob data.
Gzip
// UnknownCompression means not supported yet.
UnknownCompression CompressionType = -1
UnknownCompression Type = -1
)
var DefaultCompression = Gzip
var Default = Gzip
func (ct CompressionType) String() string {
func (ct Type) String() string {
switch ct {
case Uncompressed:
return "uncompressed"
@ -41,7 +40,7 @@ func (ct CompressionType) String() string {
}
}
// DetectCompressionType returns media type from existing blob data.
// DetectLayerMediaType returns media type from existing blob data.
func DetectLayerMediaType(ctx context.Context, cs content.Store, id digest.Digest, oci bool) (string, error) {
ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: id})
if err != nil {
@ -71,7 +70,7 @@ func DetectLayerMediaType(ctx context.Context, cs content.Store, id digest.Diges
}
// detectCompressionType detects compression type from real blob data.
func detectCompressionType(cr io.Reader) (CompressionType, error) {
func detectCompressionType(cr io.Reader) (Type, error) {
var buf [10]byte
var n int
var err error
@ -86,7 +85,7 @@ func detectCompressionType(cr io.Reader) (CompressionType, error) {
return UnknownCompression, err
}
for c, m := range map[CompressionType][]byte{
for c, m := range map[Type][]byte{
Gzip: {0x1F, 0x8B, 0x08},
} {
if n < len(m) {
@ -99,41 +98,6 @@ func detectCompressionType(cr io.Reader) (CompressionType, error) {
return Uncompressed, nil
}
// GetMediaTypeForLayers retrieves media type for layer from ref information.
// If there is a mismatch in diff IDs or blobsums between the diffPairs and
// corresponding ref, the returned slice will have an empty media type for
// that layer and all parents.
func GetMediaTypeForLayers(diffPairs []DiffPair, ref cache.ImmutableRef) []string {
layerTypes := make([]string, len(diffPairs))
if ref == nil {
return layerTypes
}
tref := ref.Clone()
// diffPairs is ordered parent->child, but we iterate over refs from child->parent,
// so iterate over diffPairs in reverse
for i := range diffPairs {
dp := diffPairs[len(diffPairs)-1-i]
info := tref.Info()
if !(info.DiffID == dp.DiffID && info.Blob == dp.Blobsum) {
break
}
layerTypes[len(diffPairs)-1-i] = info.MediaType
parent := tref.Parent()
tref.Release(context.TODO())
tref = parent
if tref == nil {
break
}
}
if tref != nil {
tref.Release(context.TODO())
}
return layerTypes
}
var toDockerLayerType = map[string]string{
ocispec.MediaTypeImageLayer: images.MediaTypeDockerSchema2Layer,
images.MediaTypeDockerSchema2Layer: images.MediaTypeDockerSchema2Layer,
@ -148,7 +112,7 @@ var toOCILayerType = map[string]string{
images.MediaTypeDockerSchema2LayerGzip: ocispec.MediaTypeImageLayerGzip,
}
func ConvertLayerMediaType(mediaType string, oci bool) string {
func convertLayerMediaType(mediaType string, oci bool) string {
var converted string
if oci {
converted = toOCILayerType[mediaType]
@ -161,3 +125,12 @@ func ConvertLayerMediaType(mediaType string, oci bool) string {
}
return converted
}
func ConvertAllLayerMediaTypes(oci bool, descs ...ocispec.Descriptor) []ocispec.Descriptor {
var converted []ocispec.Descriptor
for _, desc := range descs {
desc.MediaType = convertLayerMediaType(desc.MediaType, oci)
converted = append(converted, desc)
}
return converted
}

View File

@ -36,6 +36,12 @@ func CancelCacheLeases() {
leasesMu.Unlock()
}
func AddLease(f func(context.Context) error) {
leasesMu.Lock()
leasesF = append(leasesF, f)
leasesMu.Unlock()
}
func Config(ctx context.Context, str string, resolver remotes.Resolver, cache ContentCache, leaseManager leases.Manager, p *specs.Platform) (digest.Digest, []byte, error) {
// TODO: fix buildkit to take interface instead of struct
var platform platforms.MatchComparer
@ -57,9 +63,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
ctx = ctx2
defer func() {
// this lease is not deleted to allow other components to access manifest/config from cache. It will be deleted after 5 min deadline or on pruning inactive builder
leasesMu.Lock()
leasesF = append(leasesF, done)
leasesMu.Unlock()
AddLease(done)
}()
}

View File

@ -0,0 +1,72 @@
package controller
import (
"context"
"sync/atomic"
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/progress"
"github.com/opencontainers/go-digest"
)
type Controller struct {
count int64
started *time.Time
Digest digest.Digest
Name string
Writer progress.Writer
}
var _ progress.Controller = &Controller{}
func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) {
if c.Digest == "" {
return progress.WithProgress(ctx, c.Writer), func(error) {}
}
if atomic.AddInt64(&c.count, 1) == 1 {
if c.started == nil {
now := time.Now()
c.started = &now
}
c.Writer.Write(c.Digest.String(), client.Vertex{
Digest: c.Digest,
Name: c.Name,
Started: c.started,
})
}
return progress.WithProgress(ctx, c.Writer), func(err error) {
if atomic.AddInt64(&c.count, -1) == 0 {
now := time.Now()
var errString string
if err != nil {
errString = err.Error()
}
c.Writer.Write(c.Digest.String(), client.Vertex{
Digest: c.Digest,
Name: c.Name,
Started: c.started,
Completed: &now,
Error: errString,
})
}
}
}
func (c *Controller) Status(id string, action string) func() {
start := time.Now()
c.Writer.Write(id, progress.Status{
Action: action,
Started: &start,
})
return func() {
complete := time.Now()
c.Writer.Write(id, progress.Status{
Action: action,
Started: &start,
Completed: &complete,
})
}
}

View File

@ -62,6 +62,11 @@ func WithMetadata(key string, val interface{}) WriterOption {
}
}
type Controller interface {
Start(context.Context) (context.Context, func(error))
Status(id string, action string) func()
}
type Writer interface {
Write(id string, value interface{}) error
Close() error

View File

@ -3,89 +3,101 @@ package pull
import (
"context"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
type Puller struct {
Snapshotter snapshot.Snapshotter
ContentStore content.Store
Applier diff.Applier
Resolver remotes.Resolver
Src reference.Spec
Platform *ocispec.Platform
// See NewResolver()
Resolver remotes.Resolver
resolveOnce sync.Once
desc ocispec.Descriptor
ref string
resolveErr error
desc ocispec.Descriptor
configDesc ocispec.Descriptor
ref string
layers []ocispec.Descriptor
nonlayers []ocispec.Descriptor
}
type Pulled struct {
var _ content.Provider = &Puller{}
type PulledManifests struct {
Ref string
Descriptor ocispec.Descriptor
Layers []ocispec.Descriptor
MetadataBlobs []ocispec.Descriptor
MainManifestDesc ocispec.Descriptor
ConfigDesc ocispec.Descriptor
Nonlayers []ocispec.Descriptor
Remote *solver.Remote
}
func (p *Puller) Resolve(ctx context.Context) (string, ocispec.Descriptor, error) {
func (p *Puller) resolve(ctx context.Context) error {
p.resolveOnce.Do(func() {
resolveProgressDone := oneOffProgress(ctx, "resolve "+p.Src.String())
desc := ocispec.Descriptor{
Digest: p.Src.Digest(),
}
if desc.Digest != "" {
info, err := p.ContentStore.Info(ctx, desc.Digest)
if err == nil {
desc.Size = info.Size
p.ref = p.Src.String()
ra, err := p.ContentStore.ReaderAt(ctx, desc)
if err == nil {
mt, err := imageutil.DetectManifestMediaType(ra)
if err == nil {
desc.MediaType = mt
p.desc = desc
resolveProgressDone(nil)
if p.tryLocalResolve(ctx) == nil {
return
}
}
}
}
ref, desc, err := p.Resolver.Resolve(ctx, p.Src.String())
if err != nil {
p.resolveErr = err
resolveProgressDone(err)
return
}
p.desc = desc
p.ref = ref
resolveProgressDone(nil)
})
return p.ref, p.desc, p.resolveErr
return p.resolveErr
}
func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
if _, _, err := p.Resolve(ctx); err != nil {
func (p *Puller) tryLocalResolve(ctx context.Context) error {
desc := ocispec.Descriptor{
Digest: p.Src.Digest(),
}
if desc.Digest == "" {
return errors.New("empty digest")
}
info, err := p.ContentStore.Info(ctx, desc.Digest)
if err != nil {
return err
}
desc.Size = info.Size
p.ref = p.Src.String()
ra, err := p.ContentStore.ReaderAt(ctx, desc)
if err != nil {
return err
}
mt, err := imageutil.DetectManifestMediaType(ra)
if err != nil {
return err
}
desc.MediaType = mt
p.desc = desc
return nil
}
func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
err := p.resolve(ctx)
if err != nil {
return nil, err
}
// workaround for gcr, authentication not supported on blob endpoints
EnsureManifestRequested(ctx, p.Resolver, p.ref)
var platform platforms.MatchComparer
if p.Platform != nil {
platform = platforms.Only(*p.Platform)
@ -93,27 +105,19 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
platform = platforms.Default()
}
ongoing := newJobs(p.ref)
pctx, stopProgress := context.WithCancel(ctx)
go showProgress(pctx, ongoing, p.ContentStore)
fetcher, err := p.Resolver.Fetcher(ctx, p.ref)
if err != nil {
stopProgress()
return nil, err
}
var mu sync.Mutex // images.Dispatch calls handlers in parallel
metadata := make(map[digest.Digest]ocispec.Descriptor)
// TODO: need a wrapper snapshot interface that combines content
// and snapshots as 1) buildkit shouldn't have a dependency on contentstore
// or 2) cachemanager should manage the contentstore
handlers := []images.Handler{
images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(desc)
return nil, nil
}),
var handlers []images.Handler
fetcher, err := p.Resolver.Fetcher(ctx, p.ref)
if err != nil {
return nil, err
}
var schema1Converter *schema1.Converter
if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
schema1Converter = schema1.NewConverter(p.ContentStore, fetcher)
@ -128,10 +132,10 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
dslHandler, err := docker.AppendDistributionSourceLabel(p.ContentStore, p.ref)
if err != nil {
stopProgress()
return nil, err
}
handlers = append(handlers,
filterLayerBlobs(metadata, &mu),
remotes.FetchHandler(p.ContentStore, fetcher),
childrenHandler,
dslHandler,
@ -139,70 +143,81 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, p.desc); err != nil {
stopProgress()
return nil, err
}
stopProgress()
var usedBlobs []ocispec.Descriptor
if schema1Converter != nil {
ongoing.remove(p.desc) // Not left in the content store so this is sufficient.
p.desc, err = schema1Converter.Convert(ctx)
if err != nil {
return nil, err
}
ongoing.add(p.desc)
var mu sync.Mutex // images.Dispatch calls handlers in parallel
allBlobs := make(map[digest.Digest]ocispec.Descriptor)
for _, j := range ongoing.added {
allBlobs[j.Digest] = j.Descriptor
}
handlers := []images.Handler{
images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
mu.Lock()
defer mu.Unlock()
usedBlobs = append(usedBlobs, desc)
delete(allBlobs, desc.Digest)
return nil, nil
}),
filterLayerBlobs(metadata, &mu),
images.FilterPlatforms(images.ChildrenHandler(p.ContentStore), platform),
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, p.desc); err != nil {
return nil, err
}
} else {
for _, j := range ongoing.added {
usedBlobs = append(usedBlobs, j.Descriptor)
}
for _, desc := range metadata {
p.nonlayers = append(p.nonlayers, desc)
switch desc.MediaType {
case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
p.configDesc = desc
}
}
// split all pulled data to layers and rest. layers remain roots and are deleted with snapshots. rest will be linked to layers.
var notLayerBlobs []ocispec.Descriptor
for _, j := range usedBlobs {
switch j.MediaType {
case ocispec.MediaTypeImageLayer, images.MediaTypeDockerSchema2Layer, ocispec.MediaTypeImageLayerGzip, images.MediaTypeDockerSchema2LayerGzip, images.MediaTypeDockerSchema2LayerForeign, images.MediaTypeDockerSchema2LayerForeignGzip:
default:
notLayerBlobs = append(notLayerBlobs, j)
}
}
layers, err := getLayers(ctx, p.ContentStore, p.desc, platform)
p.layers, err = getLayers(ctx, p.ContentStore, p.desc, platform)
if err != nil {
return nil, err
}
return &Pulled{
return &PulledManifests{
Ref: p.ref,
Descriptor: p.desc,
Layers: layers,
MetadataBlobs: notLayerBlobs,
MainManifestDesc: p.desc,
ConfigDesc: p.configDesc,
Nonlayers: p.nonlayers,
Remote: &solver.Remote{
Descriptors: p.layers,
Provider: p,
},
}, nil
}
func (p *Puller) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
err := p.resolve(ctx)
if err != nil {
return nil, err
}
fetcher, err := p.Resolver.Fetcher(ctx, p.ref)
if err != nil {
return nil, err
}
return contentutil.FromFetcher(fetcher).ReaderAt(ctx, desc)
}
func filterLayerBlobs(metadata map[digest.Digest]ocispec.Descriptor, mu sync.Locker) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
switch desc.MediaType {
case ocispec.MediaTypeImageLayer, images.MediaTypeDockerSchema2Layer, ocispec.MediaTypeImageLayerGzip, images.MediaTypeDockerSchema2LayerGzip, images.MediaTypeDockerSchema2LayerForeign, images.MediaTypeDockerSchema2LayerForeignGzip:
return nil, images.ErrSkipDesc
default:
if metadata != nil {
mu.Lock()
metadata[desc.Digest] = desc
mu.Unlock()
}
}
return nil, nil
}
}
func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform platforms.MatchComparer) ([]ocispec.Descriptor, error) {
manifest, err := images.Manifest(ctx, provider, desc, platform)
if err != nil {
@ -227,185 +242,3 @@ func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Desc
}
return layers, nil
}
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
var (
ticker = time.NewTicker(150 * time.Millisecond)
statuses = map[string]statusInfo{}
done bool
)
defer ticker.Stop()
pw, _, ctx := progress.FromContext(ctx)
defer pw.Close()
for {
select {
case <-ticker.C:
case <-ctx.Done():
done = true
}
resolved := "resolved"
if !ongoing.isResolved() {
resolved = "resolving"
}
statuses[ongoing.name] = statusInfo{
Ref: ongoing.name,
Status: resolved,
}
actives := make(map[string]statusInfo)
if !done {
active, err := cs.ListStatuses(ctx, "")
if err != nil {
// log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
actives[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
}
}
// now, update the items in jobs that are not in active
for _, j := range ongoing.jobs() {
refKey := remotes.MakeRefKey(ctx, j.Descriptor)
if a, ok := actives[refKey]; ok {
started := j.started
pw.Write(j.Digest.String(), progress.Status{
Action: a.Status,
Total: int(a.Total),
Current: int(a.Offset),
Started: &started,
})
continue
}
if !j.done {
info, err := cs.Info(context.TODO(), j.Digest)
if err != nil {
if errors.Is(err, errdefs.ErrNotFound) {
pw.Write(j.Digest.String(), progress.Status{
Action: "waiting",
})
continue
}
} else {
j.done = true
}
if done || j.done {
started := j.started
createdAt := info.CreatedAt
pw.Write(j.Digest.String(), progress.Status{
Action: "done",
Current: int(info.Size),
Total: int(info.Size),
Completed: &createdAt,
Started: &started,
})
}
}
}
if done {
return
}
}
}
// jobs provides a way of identifying the download keys for a particular task
// encountering during the pull walk.
//
// This is very minimal and will probably be replaced with something more
// featured.
type jobs struct {
name string
added map[digest.Digest]*job
mu sync.Mutex
resolved bool
}
type job struct {
ocispec.Descriptor
done bool
started time.Time
}
func newJobs(name string) *jobs {
return &jobs{
name: name,
added: make(map[digest.Digest]*job),
}
}
func (j *jobs) add(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()
if _, ok := j.added[desc.Digest]; ok {
return
}
j.added[desc.Digest] = &job{
Descriptor: desc,
started: time.Now(),
}
}
func (j *jobs) remove(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()
delete(j.added, desc.Digest)
}
func (j *jobs) jobs() []*job {
j.mu.Lock()
defer j.mu.Unlock()
descs := make([]*job, 0, len(j.added))
for _, j := range j.added {
descs = append(descs, j)
}
return descs
}
func (j *jobs) isResolved() bool {
j.mu.Lock()
defer j.mu.Unlock()
return j.resolved
}
type statusInfo struct {
Ref string
Status string
Offset int64
Total int64
StartedAt time.Time
UpdatedAt time.Time
}
func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.FromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
}
pw.Write(id, st)
return func(err error) error {
// TODO: set error on status
now := time.Now()
st.Completed = &now
pw.Write(id, st)
pw.Close()
return err
}
}

View File

@ -16,10 +16,10 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
var cache *resolverCache
var resCache *resolverCache
func init() {
cache = newResolverCache()
resCache = newResolverCache()
}
type ResolverOpt struct {
@ -31,12 +31,12 @@ type ResolverOpt struct {
}
func NewResolver(g session.Group, opt ResolverOpt) remotes.Resolver {
if res := cache.Get(opt.Ref, g); res != nil {
if res := resCache.Get(opt.Ref, g); res != nil {
return withLocal(res, opt.ImageStore, opt.Mode)
}
r := resolver.New(opt.Hosts, opt.Auth)
r = cache.Add(opt.Ref, r, opt.Auth, g)
r = resCache.Add(opt.Ref, r, opt.Auth, g)
return withLocal(r, opt.ImageStore, opt.Mode)
}

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
@ -25,7 +26,7 @@ import (
"github.com/sirupsen/logrus"
)
func Push(ctx context.Context, sm *session.Manager, sid string, cs content.Store, dgst digest.Digest, ref string, insecure bool, hosts docker.RegistryHosts, byDigest bool) error {
func Push(ctx context.Context, sm *session.Manager, sid string, provider content.Provider, manager content.Manager, dgst digest.Digest, ref string, insecure bool, hosts docker.RegistryHosts, byDigest bool) error {
desc := ocispec.Descriptor{
Digest: dgst,
}
@ -77,19 +78,19 @@ func Push(ctx context.Context, sm *session.Manager, sid string, cs content.Store
}
})
pushHandler := remotes.PushHandler(pusher, cs)
pushUpdateSourceHandler, err := updateDistributionSourceHandler(cs, pushHandler, ref)
pushHandler := remotes.PushHandler(pusher, provider)
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
if err != nil {
return err
}
handlers := append([]images.Handler{},
images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))),
images.HandlerFunc(annotateDistributionSourceHandler(manager, childrenHandler(provider))),
filterHandler,
dedupeHandler(pushUpdateSourceHandler),
)
ra, err := cs.ReaderAt(ctx, desc)
ra, err := provider.ReaderAt(ctx, desc)
if err != nil {
return err
}
@ -118,7 +119,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, cs content.Store
return mfstDone(nil)
}
func annotateDistributionSourceHandler(cs content.Store, f images.HandlerFunc) func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
func annotateDistributionSourceHandler(manager content.Manager, f images.HandlerFunc) func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
children, err := f(ctx, desc)
if err != nil {
@ -135,9 +136,10 @@ func annotateDistributionSourceHandler(cs content.Store, f images.HandlerFunc) f
for i := range children {
child := children[i]
info, err := cs.Info(ctx, child.Digest)
if err != nil {
info, err := manager.Info(ctx, child.Digest)
if errors.Is(err, errdefs.ErrNotFound) {
continue
} else if err != nil {
return nil, err
}
@ -228,8 +230,8 @@ func childrenHandler(provider content.Provider) images.HandlerFunc {
//
// FIXME(fuweid): There is race condition for current design of distribution
// source label if there are pull/push jobs consuming same layer.
func updateDistributionSourceHandler(cs content.Store, pushF images.HandlerFunc, ref string) (images.HandlerFunc, error) {
updateF, err := docker.AppendDistributionSourceLabel(cs, ref)
func updateDistributionSourceHandler(manager content.Manager, pushF images.HandlerFunc, ref string) (images.HandlerFunc, error) {
updateF, err := docker.AppendDistributionSourceLabel(manager, ref)
if err != nil {
return nil, err
}

View File

@ -18,7 +18,6 @@ import (
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/docker/pkg/idtools"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/blobs"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
@ -42,17 +41,14 @@ import (
"github.com/moby/buildkit/source/http"
"github.com/moby/buildkit/source/local"
"github.com/moby/buildkit/util/archutil"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/controller"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/errgroup"
)
const labelCreatedAt = "buildkit/createdat"
@ -104,6 +100,7 @@ func NewWorker(opt WorkerOpt) (*Worker, error) {
GarbageCollect: opt.GarbageCollect,
LeaseManager: opt.LeaseManager,
ContentStore: opt.ContentStore,
Differ: opt.Differ,
})
if err != nil {
return nil, err
@ -220,12 +217,31 @@ func (w *Worker) GCPolicy() []client.PruneInfo {
return w.WorkerOpt.GCPolicy
}
func (w *Worker) LoadRef(id string, hidden bool) (cache.ImmutableRef, error) {
func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.ImmutableRef, error) {
var opts []cache.RefOption
if hidden {
opts = append(opts, cache.NoUpdateLastUsed)
}
return w.CacheMgr.Get(context.TODO(), id, opts...)
ref, err := w.CacheMgr.Get(ctx, id, opts...)
var needsRemoteProviders cache.NeedsRemoteProvidersError
if errors.As(err, &needsRemoteProviders) {
if optGetter := solver.CacheOptGetterOf(ctx); optGetter != nil {
var keys []interface{}
for _, dgst := range needsRemoteProviders {
keys = append(keys, cache.DescHandlerKey(dgst))
}
descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
for k, v := range optGetter(keys...) {
if v, ok := v.(*cache.DescHandler); ok {
descHandlers[k.(digest.Digest)] = v
}
}
opts = append(opts, descHandlers)
ref, err = w.CacheMgr.Get(ctx, id, opts...)
}
}
return ref, err
}
func (w *Worker) Executor() executor.Executor {
@ -343,121 +359,17 @@ func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter,
}
}
func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) {
ctx, done, err := leaseutil.WithLease(ctx, w.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
}
defer done(ctx)
// TODO(fuweid): add compression option or config for cache exporter.
diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore(), w.Differ, ref, createIfNeeded, blobs.DefaultCompression)
if err != nil {
return nil, errors.Wrap(err, "failed calculating diff pairs for exported snapshot")
}
if len(diffPairs) == 0 {
return nil, nil
}
createdTimes := getCreatedTimes(ref)
if len(createdTimes) != len(diffPairs) {
return nil, errors.Errorf("invalid createdtimes/diffpairs")
}
descs := make([]ocispec.Descriptor, len(diffPairs))
cs := w.ContentStore()
layerMediaTypes := blobs.GetMediaTypeForLayers(diffPairs, ref)
for i, dp := range diffPairs {
info, err := cs.Info(ctx, dp.Blobsum)
if err != nil {
return nil, err
}
tm, err := createdTimes[i].MarshalText()
if err != nil {
return nil, err
}
var mediaType string
if len(layerMediaTypes) > i {
mediaType = layerMediaTypes[i]
}
// NOTE: The media type might be missing for some migrated ones
// from before lease based storage. If so, we should detect
// the media type from blob data.
//
// Discussion: https://github.com/moby/buildkit/pull/1277#discussion_r352795429
if mediaType == "" {
mediaType, err = blobs.DetectLayerMediaType(ctx, cs, dp.Blobsum, false)
if err != nil {
return nil, err
}
}
descs[i] = ocispec.Descriptor{
Digest: dp.Blobsum,
Size: info.Size,
MediaType: mediaType,
Annotations: map[string]string{
"containerd.io/uncompressed": dp.DiffID.String(),
labelCreatedAt: string(tm),
},
}
}
return &solver.Remote{
Descriptors: descs,
Provider: cs,
}, nil
}
func getCreatedTimes(ref cache.ImmutableRef) (out []time.Time) {
parent := ref.Parent()
if parent != nil {
defer parent.Release(context.TODO())
out = getCreatedTimes(parent)
}
return append(out, cache.GetCreatedAt(ref.Metadata()))
}
func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cache.ImmutableRef, err error) {
ctx, done, err := leaseutil.WithLease(ctx, w.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
pw, _, _ := progress.FromContext(ctx)
descHandler := &cache.DescHandler{
Provider: remote.Provider,
Progress: &controller.Controller{Writer: pw},
}
defer done(ctx)
eg, gctx := errgroup.WithContext(ctx)
descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
for _, desc := range remote.Descriptors {
func(desc ocispec.Descriptor) {
eg.Go(func() error {
done := oneOffProgress(ctx, fmt.Sprintf("pulling %s", desc.Digest))
if err := contentutil.Copy(gctx, w.ContentStore(), remote.Provider, desc); err != nil {
return done(err)
}
if ref, ok := desc.Annotations["containerd.io/distribution.source.ref"]; ok {
hf, err := docker.AppendDistributionSourceLabel(w.ContentStore(), ref)
if err != nil {
return done(err)
}
_, err = hf(ctx, desc)
return done(err)
}
return done(nil)
})
}(desc)
descHandlers[desc.Digest] = descHandler
}
if err := eg.Wait(); err != nil {
return nil, err
}
unpackProgressDone := oneOffProgress(ctx, "unpacking")
defer func() {
err = unpackProgressDone(err)
}()
var current cache.ImmutableRef
for i, desc := range remote.Descriptors {
tm := time.Now()
@ -473,17 +385,16 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cac
if v, ok := desc.Annotations["buildkit/description"]; ok {
descr = v
}
ref, err := w.CacheMgr.GetByBlob(ctx, desc, current, cache.WithDescription(descr), cache.WithCreationTime(tm))
ref, err := w.CacheMgr.GetByBlob(ctx, desc, current,
cache.WithDescription(descr),
cache.WithCreationTime(tm),
descHandlers)
if current != nil {
current.Release(context.TODO())
}
if err != nil {
return nil, err
}
if err := ref.Extract(ctx); err != nil {
ref.Release(context.TODO())
return nil, err
}
current = ref
}
return current, nil
@ -519,20 +430,3 @@ func ID(root string) (string, error) {
}
return string(b), nil
}
func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.FromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
}
pw.Write(id, st)
return func(err error) error {
// TODO: set error on status
now := time.Now()
st.Completed = &now
pw.Write(id, st)
pw.Close()
return err
}
}

View File

@ -7,6 +7,7 @@ import (
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/pkg/errors"
)
@ -36,7 +37,7 @@ func (s *cacheResultStorage) Save(res solver.Result, createdAt time.Time) (solve
return solver.CacheResult{ID: ref.ID(), CreatedAt: createdAt}, nil
}
func (s *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult) (solver.Result, error) {
return s.load(res.ID, false)
return s.load(ctx, res.ID, false)
}
func (s *cacheResultStorage) getWorkerRef(id string) (Worker, string, error) {
@ -51,7 +52,7 @@ func (s *cacheResultStorage) getWorkerRef(id string) (Worker, string, error) {
return w, refID, nil
}
func (s *cacheResultStorage) load(id string, hidden bool) (solver.Result, error) {
func (s *cacheResultStorage) load(ctx context.Context, id string, hidden bool) (solver.Result, error) {
w, refID, err := s.getWorkerRef(id)
if err != nil {
return nil, err
@ -59,7 +60,7 @@ func (s *cacheResultStorage) load(id string, hidden bool) (solver.Result, error)
if refID == "" {
return NewWorkerRefResult(nil, w), nil
}
ref, err := w.LoadRef(refID, hidden)
ref, err := w.LoadRef(ctx, refID, hidden)
if err != nil {
return nil, err
}
@ -71,19 +72,19 @@ func (s *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheRes
if err != nil {
return nil, err
}
ref, err := w.LoadRef(refID, true)
ref, err := w.LoadRef(ctx, refID, true)
if err != nil {
return nil, err
}
defer ref.Release(context.TODO())
remote, err := w.GetRemote(ctx, ref, false)
remote, err := ref.GetRemote(ctx, false, compression.Default)
if err != nil {
return nil, nil // ignore error. loadRemote is best effort
}
return remote, nil
}
func (s *cacheResultStorage) Exists(id string) bool {
ref, err := s.load(id, true)
ref, err := s.load(context.TODO(), id, true)
if err != nil {
return false
}

View File

@ -23,7 +23,9 @@ import (
func NewBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef {
img, err := source.NewImageIdentifier("docker.io/library/busybox:latest")
require.NoError(t, err)
src, err := w.SourceManager.Resolve(ctx, img, sm)
src, err := w.SourceManager.Resolve(ctx, img, sm, nil)
require.NoError(t, err)
_, _, _, err = src.CacheKey(ctx, nil, 0)
require.NoError(t, err)
snap, err := src.Snapshot(ctx, nil)
require.NoError(t, err)

View File

@ -23,14 +23,13 @@ type Worker interface {
Platforms(noCache bool) []specs.Platform
GCPolicy() []client.PruneInfo
LoadRef(id string, hidden bool) (cache.ImmutableRef, error)
LoadRef(ctx context.Context, id string, hidden bool) (cache.ImmutableRef, error)
// ResolveOp resolves Vertex.Sys() to Op implementation.
ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error)
DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error)
Exporter(name string, sm *session.Manager) (exporter.Exporter, error)
Prune(ctx context.Context, ch chan client.UsageInfo, opt ...client.PruneInfo) error
GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error)
FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error)
PruneCacheMounts(ctx context.Context, ids []string) error
ContentStore() content.Store