cache: Allow sharing compression variants among refs

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
master
Kohei Tokunaga 2022-01-28 22:59:37 +09:00
parent 45fc3ed510
commit 4280dfd489
5 changed files with 603 additions and 152 deletions

22
cache/blobs.go vendored
View File

@ -241,7 +241,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.Errorf("unknown layer compression type")
}
if err := sr.setBlob(ctx, comp.Type, desc); err != nil {
if err := sr.setBlob(ctx, desc); err != nil {
return nil, err
}
return nil, nil
@ -267,10 +267,15 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
// setBlob associates a blob with the cache record.
// A lease must be held for the blob when calling this function
func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression.Type, desc ocispecs.Descriptor) error {
func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) (rerr error) {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for setBlob")
}
defer func() {
if rerr == nil {
rerr = sr.linkBlob(ctx, desc)
}
}()
diffID, err := diffIDFromDescriptor(desc)
if err != nil {
@ -280,10 +285,6 @@ func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression
return err
}
if compressionType == compression.UnknownCompression {
return errors.Errorf("unhandled layer media type: %q", desc.MediaType)
}
sr.mu.Lock()
defer sr.mu.Unlock()
@ -311,9 +312,6 @@ func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression
return err
}
if err := sr.addCompressionBlob(ctx, desc, compressionType); err != nil {
return err
}
return nil
}
@ -437,11 +435,11 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
// This ref can be used as the specified compressionType. Keep it lazy.
return nil, nil
}
return nil, ref.addCompressionBlob(ctx, desc, comp.Type)
return nil, ref.linkBlob(ctx, desc)
}
// First, lookup local content store
if _, err := ref.getCompressionBlob(ctx, comp.Type); err == nil {
if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil {
return nil, nil // found the compression variant. no need to convert.
}
@ -460,7 +458,7 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
}
// Start to track converted layer
if err := ref.addCompressionBlob(ctx, *newDesc, comp.Type); err != nil {
if err := ref.linkBlob(ctx, *newDesc); err != nil {
return nil, errors.Wrapf(err, "failed to add compression blob")
}
return nil, nil

2
cache/converter.go vendored
View File

@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/labels"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
@ -129,6 +130,7 @@ var bufioPool = sync.Pool{
}
func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
bklog.G(ctx).WithField("blob", desc).WithField("target", c.target).Debugf("converting blob to the target compression")
// prepare the source and destination
labelz := make(map[string]string)
ref := fmt.Sprintf("convert-from-%s-to-%s-%s", desc.Digest, c.target.Type.String(), identity.NewID())

435
cache/manager_test.go vendored
View File

@ -5,6 +5,7 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
@ -597,11 +598,7 @@ func TestExtractOnMutable(t *testing.T) {
leaseCtx, done, err := leaseutil.WithLease(ctx, co.lm, leases.WithExpiration(0))
require.NoError(t, err)
compressionType := compression.FromMediaType(desc.MediaType)
if compressionType == compression.UnknownCompression {
t.Errorf("unhandled layer media type: %q", desc.MediaType)
}
err = snap.(*immutableRef).setBlob(leaseCtx, compressionType, desc)
err = snap.(*immutableRef).setBlob(leaseCtx, desc)
done(context.TODO())
require.NoError(t, err)
err = snap.(*immutableRef).computeChainMetadata(leaseCtx, map[string]struct{}{snap.ID(): {}})
@ -713,7 +710,7 @@ func TestSetBlob(t *testing.T) {
err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc)
require.NoError(t, err)
err = snap.(*immutableRef).setBlob(ctx, compression.UnknownCompression, ocispecs.Descriptor{
err = snap.(*immutableRef).setBlob(ctx, ocispecs.Descriptor{
Digest: digest.FromBytes([]byte("foobar")),
Annotations: map[string]string{
"containerd.io/uncompressed": digest.FromBytes([]byte("foobar2")).String(),
@ -721,11 +718,7 @@ func TestSetBlob(t *testing.T) {
})
require.Error(t, err)
compressionType := compression.FromMediaType(desc.MediaType)
if compressionType == compression.UnknownCompression {
t.Errorf("unhandled layer media type: %q", desc.MediaType)
}
err = snap.(*immutableRef).setBlob(ctx, compressionType, desc)
err = snap.(*immutableRef).setBlob(ctx, desc)
require.NoError(t, err)
err = snap.(*immutableRef).computeChainMetadata(ctx, map[string]struct{}{snap.ID(): {}})
require.NoError(t, err)
@ -751,11 +744,7 @@ func TestSetBlob(t *testing.T) {
err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), desc2)
require.NoError(t, err)
compressionType2 := compression.FromMediaType(desc2.MediaType)
if compressionType2 == compression.UnknownCompression {
t.Errorf("unhandled layer media type: %q", desc2.MediaType)
}
err = snap2.(*immutableRef).setBlob(ctx, compressionType2, desc2)
err = snap2.(*immutableRef).setBlob(ctx, desc2)
require.NoError(t, err)
err = snap2.(*immutableRef).computeChainMetadata(ctx, map[string]struct{}{snap.ID(): {}, snap2.ID(): {}})
require.NoError(t, err)
@ -1110,6 +1099,396 @@ func TestLazyCommit(t *testing.T) {
require.Equal(t, true, errors.Is(err, errNotFound))
}
func TestLoopLeaseContent(t *testing.T) {
t.Parallel()
// windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'"
if runtime.GOOS != "linux" {
t.Skipf("unsupported GOOS: %s", runtime.GOOS)
}
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
cm := co.manager
ctx, done, err := leaseutil.WithLease(ctx, co.lm, leaseutil.MakeTemporary)
require.NoError(t, err)
defer done(ctx)
// store an uncompressed blob to the content store
compressionLoop := []compression.Type{compression.Uncompressed, compression.Gzip, compression.Zstd, compression.EStargz}
blobBytes, orgDesc, err := mapToBlob(map[string]string{"foo": "1"}, false)
require.NoError(t, err)
contentBuffer := contentutil.NewBuffer()
descHandlers := DescHandlers(map[digest.Digest]*DescHandler{})
cw, err := contentBuffer.Writer(ctx, content.WithRef(fmt.Sprintf("write-test-blob-%s", orgDesc.Digest)))
require.NoError(t, err)
_, err = cw.Write(blobBytes)
require.NoError(t, err)
require.NoError(t, cw.Commit(ctx, 0, cw.Digest()))
descHandlers[orgDesc.Digest] = &DescHandler{
Provider: func(_ session.Group) content.Provider { return contentBuffer },
}
// Create a compression loop
ref, err := cm.GetByBlob(ctx, orgDesc, nil, descHandlers)
require.NoError(t, err)
allRefs := []ImmutableRef{ref}
defer func() {
for _, ref := range allRefs {
ref.Release(ctx)
}
}()
var chain []ocispecs.Descriptor
for _, compressionType := range compressionLoop {
remotes, err := ref.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
require.Equal(t, 1, len(remotes[0].Descriptors))
desc := remotes[0].Descriptors[0]
chain = append(chain, desc)
ref, err = cm.GetByBlob(ctx, desc, nil, descHandlers)
require.NoError(t, err)
allRefs = append(allRefs, ref)
}
require.Equal(t, len(compressionLoop), len(chain))
require.NoError(t, ref.(*immutableRef).linkBlob(ctx, chain[0])) // This creates a loop
// Make sure a loop is created
visited := make(map[digest.Digest]struct{})
gotChain := []digest.Digest{orgDesc.Digest}
cur := orgDesc
previous := chain[len(chain)-1].Digest
for i := 0; i < 1000; i++ {
dgst := cur.Digest
visited[dgst] = struct{}{}
info, err := co.cs.Info(ctx, dgst)
if err != nil && !errors.Is(err, errdefs.ErrNotFound) {
require.NoError(t, err)
}
var children []ocispecs.Descriptor
for k, dgstS := range info.Labels {
if !strings.HasPrefix(k, blobVariantGCLabel) {
continue
}
cDgst, err := digest.Parse(dgstS)
if err != nil || cDgst == dgst || previous == cDgst {
continue
}
cDesc, err := getBlobDesc(ctx, co.cs, cDgst)
require.NoError(t, err)
children = append(children, cDesc)
}
require.Equal(t, 1, len(children), "previous=%v, cur=%v, labels: %+v", previous, cur, info.Labels)
previous = cur.Digest
cur = children[0]
if _, ok := visited[cur.Digest]; ok {
break
}
gotChain = append(gotChain, cur.Digest)
}
require.Equal(t, len(chain), len(gotChain))
// Prune all refs
require.NoError(t, done(ctx))
for _, ref := range allRefs {
ref.Release(ctx)
}
ensurePrune(ctx, t, cm, len(gotChain)-1, 10)
// Check if contents are cleaned up
for _, d := range gotChain {
_, err := co.cs.Info(ctx, d)
require.ErrorIs(t, err, errdefs.ErrNotFound)
}
}
func TestSharingCompressionVariant(t *testing.T) {
t.Parallel()
// windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'"
if runtime.GOOS != "linux" {
t.Skipf("unsupported GOOS: %s", runtime.GOOS)
}
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
ctx, done, err := leaseutil.WithLease(ctx, co.lm, leaseutil.MakeTemporary)
require.NoError(t, err)
defer done(context.TODO())
allCompressions := []compression.Type{compression.Uncompressed, compression.Gzip, compression.Zstd, compression.EStargz}
do := func(test func(testCaseSharingCompressionVariant)) {
for _, a := range exclude(allCompressions, compression.Uncompressed) {
for _, aV1 := range exclude(allCompressions, a) {
for _, aV2 := range exclude(allCompressions, a, aV1) {
for _, b := range []compression.Type{aV1, aV2} {
for _, bV1 := range exclude(allCompressions, a, aV1, aV2) {
test(testCaseSharingCompressionVariant{
a: a,
aVariants: []compression.Type{aV1, aV2},
b: b,
bVariants: []compression.Type{bV1, a},
})
}
}
}
}
}
}
t.Logf("Test cases with possible compression types")
do(func(testCase testCaseSharingCompressionVariant) {
testCase.checkPrune = true
testSharingCompressionVariant(ctx, t, co, testCase)
require.NoError(t, co.manager.Prune(ctx, nil, client.PruneInfo{All: true}))
checkDiskUsage(ctx, t, co.manager, 0, 0)
})
t.Logf("Test case with many parallel operation")
eg, egctx := errgroup.WithContext(ctx)
do(func(testCase testCaseSharingCompressionVariant) {
eg.Go(func() error {
testCase.checkPrune = false
testSharingCompressionVariant(egctx, t, co, testCase)
return nil
})
})
require.NoError(t, eg.Wait())
}
func exclude(s []compression.Type, ts ...compression.Type) (res []compression.Type) {
EachElem:
for _, v := range s {
for _, t := range ts {
if v == t {
continue EachElem
}
}
res = append(res, v)
}
return
}
// testCaseSharingCompressionVariant is one test case configuration for testSharingCompressionVariant.
// This configures two refs A and B.
// A creates compression variants configured by aVariants and
// B creates compression variants configured by bVariants.
// This test checks if aVariants are visible from B and bVariants are visible from A.
type testCaseSharingCompressionVariant struct {
// a is the compression of the initial immutableRef's (called A) blob
a compression.Type
// aVariants are the compression variants created from A
aVariants []compression.Type
// b is another immutableRef (called B) which has one of the compression variants of A
b compression.Type
// bVariants are compression variants created from B
bVariants []compression.Type
// checkPrune is whether checking prune API. must be false if run tests in parallel.
checkPrune bool
}
func testSharingCompressionVariant(ctx context.Context, t *testing.T, co *cmOut, testCase testCaseSharingCompressionVariant) {
var (
cm = co.manager
allCompressions = append(append([]compression.Type{testCase.a, testCase.b}, testCase.aVariants...), testCase.bVariants...)
orgContent = map[string]string{"foo": "1"}
)
test := func(customized bool) {
defer cm.Prune(ctx, nil, client.PruneInfo{})
// Prepare the original content
_, orgContentDesc, err := mapToBlob(orgContent, false)
require.NoError(t, err)
blobBytes, aDesc, err := mapToBlobWithCompression(orgContent, func(w io.Writer) (io.WriteCloser, string, error) {
cw, err := getCompressor(w, testCase.a, customized)
if err != nil {
return nil, "", err
}
return cw, testCase.a.DefaultMediaType(), nil
})
require.NoError(t, err)
contentBuffer := contentutil.NewBuffer()
descHandlers := DescHandlers(map[digest.Digest]*DescHandler{})
cw, err := contentBuffer.Writer(ctx, content.WithRef(fmt.Sprintf("write-test-blob-%s", aDesc.Digest)))
require.NoError(t, err)
_, err = cw.Write(blobBytes)
require.NoError(t, err)
require.NoError(t, cw.Commit(ctx, 0, cw.Digest()))
descHandlers[aDesc.Digest] = &DescHandler{
Provider: func(_ session.Group) content.Provider { return contentBuffer },
}
// Create compression variants
aRef, err := cm.GetByBlob(ctx, aDesc, nil, descHandlers)
require.NoError(t, err)
defer aRef.Release(ctx)
var bDesc ocispecs.Descriptor
for _, compressionType := range testCase.aVariants {
remotes, err := aRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
require.Equal(t, 1, len(remotes[0].Descriptors))
if compressionType == testCase.b {
bDesc = remotes[0].Descriptors[0]
}
}
require.NotEqual(t, "", bDesc.Digest, "compression B must be chosen from the variants of A")
bRef, err := cm.GetByBlob(ctx, bDesc, nil, descHandlers)
require.NoError(t, err)
defer bRef.Release(ctx)
for _, compressionType := range testCase.bVariants {
remotes, err := bRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
require.Equal(t, 1, len(remotes[0].Descriptors))
}
// check if all compression variables are available on the both refs
checkCompression := func(desc ocispecs.Descriptor, compressionType compression.Type) {
require.Equal(t, compressionType.DefaultMediaType(), desc.MediaType, "compression: %v", compressionType)
if compressionType == compression.EStargz {
ok, err := isEStargz(ctx, co.cs, desc.Digest)
require.NoError(t, err, "compression: %v", compressionType)
require.True(t, ok, "compression: %v", compressionType)
}
}
for _, c := range allCompressions {
aDesc, err := aRef.(*immutableRef).getBlobWithCompression(ctx, c)
require.NoError(t, err, "compression: %v", c)
bDesc, err := bRef.(*immutableRef).getBlobWithCompression(ctx, c)
require.NoError(t, err, "compression: %v", c)
checkCompression(aDesc, c)
checkCompression(bDesc, c)
}
// check if compression variables are availalbe on B still after A is released
if testCase.checkPrune && aRef.ID() != bRef.ID() {
require.NoError(t, aRef.Release(ctx))
ensurePrune(ctx, t, cm, 1, 10)
checkDiskUsage(ctx, t, co.manager, 1, 0)
for _, c := range allCompressions {
_, err = bRef.(*immutableRef).getBlobWithCompression(ctx, c)
require.NoError(t, err)
}
}
// check if contents are valid
for _, c := range allCompressions {
bDesc, err := bRef.(*immutableRef).getBlobWithCompression(ctx, c)
require.NoError(t, err, "compression: %v", c)
uDgst := bDesc.Digest
if c != compression.Uncompressed {
convertFunc, err := getConverter(ctx, co.cs, bDesc, compression.New(compression.Uncompressed))
require.NoError(t, err, "compression: %v", c)
uDesc, err := convertFunc(ctx, co.cs, bDesc)
require.NoError(t, err, "compression: %v", c)
uDgst = uDesc.Digest
}
require.Equal(t, uDgst, orgContentDesc.Digest, "compression: %v", c)
}
}
for _, customized := range []bool{true, false} {
// tests in two patterns: whether making the initial blob customized
test(customized)
}
}
func ensurePrune(ctx context.Context, t *testing.T, cm Manager, pruneNum, maxRetry int) {
sum := 0
for i := 0; i <= maxRetry; i++ {
buf := pruneResultBuffer()
require.NoError(t, cm.Prune(ctx, buf.C, client.PruneInfo{All: true}))
buf.close()
sum += len(buf.all)
if sum >= pruneNum {
return
}
time.Sleep(100 * time.Millisecond)
t.Logf("Retrying to prune (%v)", i)
}
require.Equal(t, true, sum >= pruneNum, "actual=%v, expected=%v", sum, pruneNum)
}
func getCompressor(w io.Writer, compressionType compression.Type, customized bool) (io.WriteCloser, error) {
switch compressionType {
case compression.Uncompressed:
return nil, fmt.Errorf("compression is not requested: %v", compressionType)
case compression.Gzip:
if customized {
gz, _ := gzip.NewWriterLevel(w, gzip.NoCompression)
gz.Header.Comment = "hello"
gz.Close()
}
return gzip.NewWriter(w), nil
case compression.EStargz:
done := make(chan struct{})
pr, pw := io.Pipe()
level := gzip.BestCompression
if customized {
level = gzip.BestSpeed
}
go func() {
defer close(done)
gw := estargz.NewWriterLevel(w, level)
if err := gw.AppendTarLossLess(pr); err != nil {
pr.CloseWithError(err)
return
}
if _, err := gw.Close(); err != nil {
pr.CloseWithError(err)
return
}
pr.Close()
}()
return &writeCloser{pw, func() error { <-done; return nil }}, nil
case compression.Zstd:
if customized {
skippableFrameMagic := []byte{0x50, 0x2a, 0x4d, 0x18}
s := []byte("hello")
size := make([]byte, 4)
binary.LittleEndian.PutUint32(size, uint32(len(s)))
if _, err := w.Write(append(append(skippableFrameMagic, size...), s...)); err != nil {
return nil, err
}
}
return zstd.NewWriter(w)
default:
return nil, fmt.Errorf("unknown compression type: %q", compressionType)
}
}
func TestConversion(t *testing.T) {
t.Parallel()
if runtime.GOOS != "linux" {
@ -1388,7 +1767,7 @@ func TestGetRemotes(t *testing.T) {
if needs {
require.False(t, isLazy, "layer %q requires conversion so it must be unlazied", desc.Digest)
}
bDesc, err := r.getCompressionBlob(egctx, compressionType)
bDesc, err := r.getBlobWithCompression(egctx, compressionType)
if isLazy {
require.Error(t, err)
} else {
@ -2076,12 +2455,26 @@ func (b bufferCloser) Close() error {
}
func mapToBlob(m map[string]string, compress bool) ([]byte, ocispecs.Descriptor, error) {
if !compress {
return mapToBlobWithCompression(m, nil)
}
return mapToBlobWithCompression(m, func(w io.Writer) (io.WriteCloser, string, error) {
return gzip.NewWriter(w), ocispecs.MediaTypeImageLayerGzip, nil
})
}
func mapToBlobWithCompression(m map[string]string, compress func(io.Writer) (io.WriteCloser, string, error)) ([]byte, ocispecs.Descriptor, error) {
buf := bytes.NewBuffer(nil)
sha := digest.SHA256.Digester()
var dest io.WriteCloser = bufferCloser{buf}
if compress {
dest = gzip.NewWriter(buf)
mediaType := ocispecs.MediaTypeImageLayer
if compress != nil {
var err error
dest, mediaType, err = compress(buf)
if err != nil {
return nil, ocispecs.Descriptor{}, err
}
}
tw := tar.NewWriter(io.MultiWriter(sha.Hash(), dest))
@ -2103,10 +2496,6 @@ func mapToBlob(m map[string]string, compress bool) ([]byte, ocispecs.Descriptor,
return nil, ocispecs.Descriptor{}, err
}
mediaType := ocispecs.MediaTypeImageLayer
if compress {
mediaType = ocispecs.MediaTypeImageLayerGzip
}
return buf.Bytes(), ocispecs.Descriptor{
Digest: digest.FromBytes(buf.Bytes()),
MediaType: mediaType,

256
cache/refs.go vendored
View File

@ -338,24 +338,21 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) {
}
}
if dgst := cr.getBlob(); dgst != "" {
added := make(map[digest.Digest]struct{})
info, err := cr.cm.ContentStore.Info(ctx, digest.Digest(dgst))
if err == nil {
usage.Size += info.Size
added[digest.Digest(dgst)] = struct{}{}
}
for k, v := range info.Labels {
// accumulate size of compression variant blobs
if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) {
if cdgst, err := digest.Parse(v); err == nil {
if digest.Digest(dgst) == cdgst {
// do not double count if the label points to this content itself.
continue
}
if info, err := cr.cm.ContentStore.Info(ctx, cdgst); err == nil {
walkBlobVariantsOnly(ctx, cr.cm.ContentStore, digest.Digest(dgst), func(desc ocispecs.Descriptor) bool {
if _, ok := added[desc.Digest]; !ok {
if info, err := cr.cm.ContentStore.Info(ctx, desc.Digest); err == nil {
usage.Size += info.Size
added[desc.Digest] = struct{}{}
}
}
}
}
return true
}, nil)
}
cr.mu.Lock()
cr.queueSize(usage.Size)
@ -687,92 +684,171 @@ func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers, preferNon
}
const (
compressionVariantDigestLabelPrefix = "buildkit.io/compression/digest."
compressionVariantAnnotationsLabelPrefix = "buildkit.io/compression/annotation."
compressionVariantMediaTypeLabel = "buildkit.io/compression/mediatype"
blobVariantGCLabel = "containerd.io/gc.ref.content.blob-"
blobAnnotationsLabelPrefix = "buildkit.io/blob/annotation."
blobMediaTypeLabel = "buildkit.io/blob/mediatype"
)
func compressionVariantDigestLabel(compressionType compression.Type) string {
return compressionVariantDigestLabelPrefix + compressionType.String()
// linkBlob makes a link between this ref and the passed blob. The linked blob can be
// acquired during walkBlob. This is useful to associate a compression variant blob to
// this ref. This doesn't record the blob to the cache record (i.e. the passed blob can't
// be acquired through getBlob). Use setBlob for that purpose.
func (sr *immutableRef) linkBlob(ctx context.Context, desc ocispecs.Descriptor) error {
cs := sr.cm.ContentStore
blobDigest := sr.getBlob()
info, err := cs.Info(ctx, blobDigest)
if err != nil {
return err
}
vInfo, err := cs.Info(ctx, desc.Digest)
if err != nil {
return err
}
vInfo.Labels = map[string]string{
blobVariantGCLabel + blobDigest.String(): blobDigest.String(),
}
vInfo = addBlobDescToInfo(desc, vInfo)
if _, err := cs.Update(ctx, vInfo, fieldsFromLabels(vInfo.Labels)...); err != nil {
return err
}
// let the future call to size() recalcultate the new size
sr.mu.Lock()
sr.queueSize(sizeUnknown)
if err := sr.commitMetadata(); err != nil {
sr.mu.Unlock()
return err
}
sr.mu.Unlock()
if desc.Digest == blobDigest {
return nil
}
info.Labels = map[string]string{
blobVariantGCLabel + desc.Digest.String(): desc.Digest.String(),
}
_, err = cs.Update(ctx, info, fieldsFromLabels(info.Labels)...)
return err
}
func getCompressionVariants(ctx context.Context, cs content.Store, dgst digest.Digest) (res []compression.Type, _ error) {
info, err := cs.Info(ctx, dgst)
if errors.Is(err, errdefs.ErrNotFound) {
return nil, nil
} else if err != nil {
return nil, err
func (sr *immutableRef) getBlobWithCompression(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) {
if _, err := sr.cm.ContentStore.Info(ctx, sr.getBlob()); err != nil {
return ocispecs.Descriptor{}, err
}
for k := range info.Labels {
if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) {
if t := compression.Parse(strings.TrimPrefix(k, compressionVariantDigestLabelPrefix)); t != compression.UnknownCompression {
res = append(res, t)
}
}
}
return
}
func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) {
return getCompressionVariantBlob(ctx, sr.cm.ContentStore, sr.getBlob(), compressionType)
}
func getCompressionVariantBlob(ctx context.Context, cs content.Store, dgst digest.Digest, compressionType compression.Type) (ocispecs.Descriptor, error) {
info, err := cs.Info(ctx, dgst)
desc, err := sr.ociDesc(ctx, nil, true)
if err != nil {
return ocispecs.Descriptor{}, err
}
dgstS, ok := info.Labels[compressionVariantDigestLabel(compressionType)]
if ok {
dgst, err := digest.Parse(dgstS)
if err != nil {
return ocispecs.Descriptor{}, err
return getBlobWithCompression(ctx, sr.cm.ContentStore, desc, compressionType)
}
return getBlobDesc(ctx, cs, dgst)
func getBlobWithCompression(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, compressionType compression.Type) (ocispecs.Descriptor, error) {
if compressionType == compression.UnknownCompression {
return ocispecs.Descriptor{}, fmt.Errorf("cannot get unknown compression type")
}
var target *ocispecs.Descriptor
if err := walkBlob(ctx, cs, desc, func(desc ocispecs.Descriptor) bool {
if needs, err := needsConversion(ctx, cs, desc, compressionType); err == nil && !needs {
target = &desc
return false
}
return true
}); err != nil || target == nil {
return ocispecs.Descriptor{}, errdefs.ErrNotFound
}
return *target, nil
}
func (sr *immutableRef) addCompressionBlob(ctx context.Context, desc ocispecs.Descriptor, compressionType compression.Type) error {
cs := sr.cm.ContentStore
if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{
ID: desc.Digest.String(),
Type: "content",
}); err != nil {
func walkBlob(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, f func(ocispecs.Descriptor) bool) error {
if !f(desc) {
return nil
}
if _, err := walkBlobVariantsOnly(ctx, cs, desc.Digest, func(desc ocispecs.Descriptor) bool { return f(desc) }, nil); err != nil {
return err
}
info, err := cs.Info(ctx, sr.getBlob())
return nil
}
func walkBlobVariantsOnly(ctx context.Context, cs content.Store, dgst digest.Digest, f func(ocispecs.Descriptor) bool, visited map[digest.Digest]struct{}) (bool, error) {
if visited == nil {
visited = make(map[digest.Digest]struct{})
}
visited[dgst] = struct{}{}
info, err := cs.Info(ctx, dgst)
if errors.Is(err, errdefs.ErrNotFound) {
return true, nil
} else if err != nil {
return false, err
}
var children []digest.Digest
for k, dgstS := range info.Labels {
if !strings.HasPrefix(k, blobVariantGCLabel) {
continue
}
cDgst, err := digest.Parse(dgstS)
if err != nil || cDgst == dgst {
continue
}
if cDesc, err := getBlobDesc(ctx, cs, cDgst); err == nil {
if !f(cDesc) {
return false, nil
}
}
children = append(children, cDgst)
}
for _, c := range children {
if _, isVisited := visited[c]; isVisited {
continue
}
if isContinue, err := walkBlobVariantsOnly(ctx, cs, c, f, visited); !isContinue || err != nil {
return isContinue, err
}
}
return true, nil
}
func getBlobDesc(ctx context.Context, cs content.Store, dgst digest.Digest) (ocispecs.Descriptor, error) {
info, err := cs.Info(ctx, dgst)
if err != nil {
return err
return ocispecs.Descriptor{}, err
}
if info.Labels == nil {
return ocispecs.Descriptor{}, fmt.Errorf("no blob metadata is stored for %q", info.Digest)
}
mt, ok := info.Labels[blobMediaTypeLabel]
if !ok {
return ocispecs.Descriptor{}, fmt.Errorf("no media type is stored for %q", info.Digest)
}
desc := ocispecs.Descriptor{
Digest: info.Digest,
Size: info.Size,
MediaType: mt,
}
for k, v := range info.Labels {
if strings.HasPrefix(k, blobAnnotationsLabelPrefix) {
if desc.Annotations == nil {
desc.Annotations = make(map[string]string)
}
desc.Annotations[strings.TrimPrefix(k, blobAnnotationsLabelPrefix)] = v
}
}
if len(desc.URLs) == 0 {
// If there are no URL's, there is no reason to have this be non-dsitributable
desc.MediaType = layerToDistributable(desc.MediaType)
}
return desc, nil
}
func addBlobDescToInfo(desc ocispecs.Descriptor, info content.Info) content.Info {
if _, ok := info.Labels[blobMediaTypeLabel]; ok {
return info // descriptor information already stored
}
if info.Labels == nil {
info.Labels = make(map[string]string)
}
cachedVariantLabel := compressionVariantDigestLabel(compressionType)
info.Labels[cachedVariantLabel] = desc.Digest.String()
if _, err := cs.Update(ctx, info, "labels."+cachedVariantLabel); err != nil {
return err
}
info, err = cs.Info(ctx, desc.Digest)
if err != nil {
return err
}
var fields []string
info.Labels = map[string]string{
compressionVariantMediaTypeLabel: desc.MediaType,
}
fields = append(fields, "labels."+compressionVariantMediaTypeLabel)
info.Labels[blobMediaTypeLabel] = desc.MediaType
for k, v := range filterAnnotationsForSave(desc.Annotations) {
k2 := compressionVariantAnnotationsLabelPrefix + k
info.Labels[k2] = v
fields = append(fields, "labels."+k2)
info.Labels[blobAnnotationsLabelPrefix+k] = v
}
if _, err := cs.Update(ctx, info, fields...); err != nil {
return err
}
return nil
return info
}
func filterAnnotationsForSave(a map[string]string) (b map[string]string) {
@ -792,33 +868,11 @@ func filterAnnotationsForSave(a map[string]string) (b map[string]string) {
return
}
func getBlobDesc(ctx context.Context, cs content.Store, dgst digest.Digest) (ocispecs.Descriptor, error) {
info, err := cs.Info(ctx, dgst)
if err != nil {
return ocispecs.Descriptor{}, err
func fieldsFromLabels(labels map[string]string) (fields []string) {
for k := range labels {
fields = append(fields, "labels."+k)
}
if info.Labels == nil {
return ocispecs.Descriptor{}, fmt.Errorf("no blob metadata is stored for %q", info.Digest)
}
mt, ok := info.Labels[compressionVariantMediaTypeLabel]
if !ok {
return ocispecs.Descriptor{}, fmt.Errorf("no media type is stored for %q", info.Digest)
}
desc := ocispecs.Descriptor{
Digest: info.Digest,
Size: info.Size,
MediaType: mt,
}
for k, v := range info.Labels {
if strings.HasPrefix(k, compressionVariantAnnotationsLabelPrefix) {
if desc.Annotations == nil {
desc.Annotations = make(map[string]string)
}
desc.Annotations[strings.TrimPrefix(k, compressionVariantAnnotationsLabelPrefix)] = v
}
}
return desc, nil
return
}
func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (_ snapshot.Mountable, rerr error) {

46
cache/remote.go vendored
View File

@ -12,6 +12,7 @@ import (
"github.com/moby/buildkit/cache/config"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
@ -53,7 +54,7 @@ func (sr *immutableRef) GetRemotes(ctx context.Context, createIfNeeded bool, ref
// compression with all combination of copmressions
res := []*solver.Remote{remote}
topmost, parentChain := remote.Descriptors[len(remote.Descriptors)-1], remote.Descriptors[:len(remote.Descriptors)-1]
vDesc, err := getCompressionVariantBlob(ctx, sr.cm.ContentStore, topmost.Digest, refCfg.Compression.Type)
vDesc, err := getBlobWithCompression(ctx, sr.cm.ContentStore, topmost, refCfg.Compression.Type)
if err != nil {
return res, nil // compression variant doesn't exist. return the main blob only.
}
@ -108,16 +109,16 @@ func getAvailableBlobs(ctx context.Context, cs content.Store, chain *solver.Remo
if err != nil {
return nil, err
}
compressions, err := getCompressionVariants(ctx, cs, target.Digest)
if err != nil {
return nil, err
var descs []ocispecs.Descriptor
if err := walkBlob(ctx, cs, target, func(desc ocispecs.Descriptor) bool {
descs = append(descs, desc)
return true
}); err != nil {
bklog.G(ctx).WithError(err).Warn("failed to walk variant blob") // is not a critical error at this moment.
}
var res []*solver.Remote
for _, c := range compressions {
desc, err := getCompressionVariantBlob(ctx, cs, target.Digest, c)
if err != nil {
return nil, err
}
for _, desc := range descs {
desc := desc
if len(parents) == 0 { // bottommost ref
res = append(res, &solver.Remote{
Descriptors: []ocispecs.Descriptor{desc},
@ -217,9 +218,9 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC
} else if needs {
// ensure the compression type.
// compressed blob must be created and stored in the content store.
blobDesc, err := ref.getCompressionBlob(ctx, refCfg.Compression.Type)
blobDesc, err := getBlobWithCompressionWithRetry(ctx, ref, refCfg.Compression, s)
if err != nil {
return nil, errors.Wrapf(err, "compression blob for %q not found", refCfg.Compression.Type)
return nil, errors.Wrapf(err, "failed to get compression blob %q", refCfg.Compression.Type)
}
newDesc := desc
newDesc.MediaType = blobDesc.MediaType
@ -251,6 +252,16 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC
return remote, nil
}
func getBlobWithCompressionWithRetry(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) (ocispecs.Descriptor, error) {
if blobDesc, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil {
return blobDesc, nil
}
if err := ensureCompression(ctx, ref, comp, s); err != nil {
return ocispecs.Descriptor{}, errors.Wrapf(err, "failed to get and ensure compression type of %q", comp.Type)
}
return ref.getBlobWithCompression(ctx, comp.Type)
}
type lazyMultiProvider struct {
mprovider *contentutil.MultiProvider
plist []lazyRefProvider
@ -300,6 +311,11 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
} else if !isLazy {
return nil, nil
}
defer func() {
if rerr == nil {
rerr = p.ref.linkBlob(ctx, p.desc)
}
}()
if p.dh == nil {
// shouldn't happen, if you have a lazy immutable ref it already should be validated
@ -334,14 +350,6 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
}
}
compressionType := compression.FromMediaType(p.desc.MediaType)
if compressionType == compression.UnknownCompression {
return nil, errors.Errorf("unhandled layer media type: %q", p.desc.MediaType)
}
if err := p.ref.addCompressionBlob(ctx, p.desc, compressionType); err != nil {
return nil, err
}
return nil, nil
})
return err