Propagate compression options to the inline cache export

Co-authored-by: Tonis Tiigi <tonistiigi@gmail.com>
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
master
Kohei Tokunaga 2021-10-05 15:43:22 +09:00
parent b2ff444122
commit f9e0346b34
21 changed files with 488 additions and 59 deletions

121
cache/manager_test.go vendored
View File

@ -14,6 +14,7 @@ import (
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -35,6 +36,7 @@ import (
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/snapshot"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil" "github.com/moby/buildkit/util/leaseutil"
@ -1125,7 +1127,9 @@ func TestConversion(t *testing.T) {
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
} }
func TestGetRemote(t *testing.T) { type idxToVariants []map[compression.Type]ocispecs.Descriptor
func TestGetRemotes(t *testing.T) {
t.Parallel() t.Parallel()
// windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'" // windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'"
if runtime.GOOS != "linux" { if runtime.GOOS != "linux" {
@ -1251,15 +1255,24 @@ func TestGetRemote(t *testing.T) {
checkNumBlobs(ctx, t, co.cs, 1) checkNumBlobs(ctx, t, co.cs, 1)
// Call GetRemote on all the refs variantsMap := make(map[string]idxToVariants)
var variantsMapMu sync.Mutex
// Call GetRemotes on all the refs
eg, egctx := errgroup.WithContext(ctx) eg, egctx := errgroup.WithContext(ctx)
for _, ir := range refs { for _, ir := range refs {
ir := ir.(*immutableRef) ir := ir.(*immutableRef)
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} { for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType compressionType := compressionType
compressionopt := solver.CompressionOpt{
Type: compressionType,
Force: true,
}
eg.Go(func() error { eg.Go(func() error {
remote, err := ir.GetRemote(egctx, true, compressionType, true, nil) remotes, err := ir.GetRemotes(egctx, true, compressionopt, false, nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, len(remotes))
remote := remotes[0]
refChain := ir.parentRefChain() refChain := ir.parentRefChain()
for i, desc := range remote.Descriptors { for i, desc := range remote.Descriptors {
switch compressionType { switch compressionType {
@ -1278,6 +1291,21 @@ func TestGetRemote(t *testing.T) {
require.Contains(t, expectedContent, dgst, "for %v", compressionType) require.Contains(t, expectedContent, dgst, "for %v", compressionType)
checkDescriptor(ctx, t, co.cs, desc, compressionType) checkDescriptor(ctx, t, co.cs, desc, compressionType)
variantsMapMu.Lock()
if len(variantsMap[ir.ID()]) == 0 {
variantsMap[ir.ID()] = make(idxToVariants, len(remote.Descriptors))
}
variantsMapMu.Unlock()
require.Equal(t, len(remote.Descriptors), len(variantsMap[ir.ID()]))
variantsMapMu.Lock()
if variantsMap[ir.ID()][i] == nil {
variantsMap[ir.ID()][i] = make(map[compression.Type]ocispecs.Descriptor)
}
variantsMap[ir.ID()][i][compressionType] = desc
variantsMapMu.Unlock()
r := refChain[i] r := refChain[i]
isLazy, err := r.isLazy(egctx) isLazy, err := r.isLazy(egctx)
require.NoError(t, err) require.NoError(t, err)
@ -1318,6 +1346,93 @@ func TestGetRemote(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, map[digest.Digest]struct{}{}, expectedContent) require.Equal(t, map[digest.Digest]struct{}{}, expectedContent)
// Check if "all" option returns all available blobs
for _, ir := range refs {
ir := ir.(*immutableRef)
variantsMapMu.Lock()
variants, ok := variantsMap[ir.ID()]
variantsMapMu.Unlock()
require.True(t, ok, ir.ID())
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := solver.CompressionOpt{Type: compressionType}
eg.Go(func() error {
remotes, err := ir.GetRemotes(egctx, false, compressionopt, true, nil)
require.NoError(t, err)
require.True(t, len(remotes) > 0, "for %s : %d", compressionType, len(remotes))
gotMain, gotVariants := remotes[0], remotes[1:]
// Check the main blob is compatible with all == false
mainOnly, err := ir.GetRemotes(egctx, false, compressionopt, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(mainOnly))
mainRemote := mainOnly[0]
require.Equal(t, len(mainRemote.Descriptors), len(gotMain.Descriptors))
for i := 0; i < len(mainRemote.Descriptors); i++ {
require.Equal(t, mainRemote.Descriptors[i].Digest, gotMain.Descriptors[i].Digest)
}
// Check all variants are covered
checkVariantsCoverage(egctx, t, variants, len(remotes[0].Descriptors)-1, gotVariants, &compressionType)
return nil
})
}
}
require.NoError(t, eg.Wait())
}
func checkVariantsCoverage(ctx context.Context, t *testing.T, variants idxToVariants, idx int, remotes []*solver.Remote, expectCompression *compression.Type) {
if idx < 0 {
for _, r := range remotes {
require.Equal(t, len(r.Descriptors), 0)
}
return
}
// check the contents of the topmost blob of each remote
got := make(map[digest.Digest][]*solver.Remote)
for _, r := range remotes {
require.Equal(t, len(r.Descriptors)-1, idx, "idx = %d", idx)
// record this variant
topmost, lower := r.Descriptors[idx], r.Descriptors[:idx]
got[topmost.Digest] = append(got[topmost.Digest], &solver.Remote{Descriptors: lower, Provider: r.Provider})
// check the contents
r, err := r.Provider.ReaderAt(ctx, topmost)
require.NoError(t, err)
dgstr := digest.Canonical.Digester()
_, err = io.Copy(dgstr.Hash(), io.NewSectionReader(r, 0, topmost.Size))
require.NoError(t, err)
require.NoError(t, r.Close())
require.Equal(t, dgstr.Digest(), topmost.Digest)
}
// check the lowers as well
eg, egctx := errgroup.WithContext(ctx)
for _, lowers := range got {
lowers := lowers
eg.Go(func() error {
checkVariantsCoverage(egctx, t, variants, idx-1, lowers, nil) // expect all compression variants
return nil
})
}
require.NoError(t, eg.Wait())
// check the coverage of the variants
targets := variants[idx]
if expectCompression != nil {
c, ok := variants[idx][*expectCompression]
require.True(t, ok, "idx = %d, compression = %q, variants = %+v, got = %+v", idx, *expectCompression, variants[idx], got)
targets = map[compression.Type]ocispecs.Descriptor{*expectCompression: c}
}
for c, d := range targets {
_, ok := got[d.Digest]
require.True(t, ok, "idx = %d, compression = %q, want = %+v, got = %+v", idx, c, d, got)
delete(got, d.Digest)
}
require.Equal(t, 0, len(got))
} }
func checkInfo(ctx context.Context, t *testing.T, cs content.Store, info content.Info) { func checkInfo(ctx context.Context, t *testing.T, cs content.Store, info content.Info) {

26
cache/refs.go vendored
View File

@ -42,7 +42,7 @@ type ImmutableRef interface {
Clone() ImmutableRef Clone() ImmutableRef
Extract(ctx context.Context, s session.Group) error // +progress Extract(ctx context.Context, s session.Group) error // +progress
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error) GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error)
} }
type MutableRef interface { type MutableRef interface {
@ -376,9 +376,29 @@ func compressionVariantDigestLabel(compressionType compression.Type) string {
return compressionVariantDigestLabelPrefix + compressionType.String() return compressionVariantDigestLabelPrefix + compressionType.String()
} }
func getCompressionVariants(ctx context.Context, cs content.Store, dgst digest.Digest) (res []compression.Type, _ error) {
info, err := cs.Info(ctx, dgst)
if errors.Is(err, errdefs.ErrNotFound) {
return nil, nil
} else if err != nil {
return nil, err
}
for k := range info.Labels {
if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) {
if t := compression.Parse(strings.TrimPrefix(k, compressionVariantDigestLabelPrefix)); t != compression.UnknownCompression {
res = append(res, t)
}
}
}
return
}
func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) { func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) {
cs := sr.cm.ContentStore return getCompressionVariantBlob(ctx, sr.cm.ContentStore, sr.getBlob(), compressionType)
info, err := cs.Info(ctx, sr.getBlob()) }
func getCompressionVariantBlob(ctx context.Context, cs content.Store, dgst digest.Digest, compressionType compression.Type) (ocispecs.Descriptor, error) {
info, err := cs.Info(ctx, dgst)
if err != nil { if err != nil {
return ocispecs.Descriptor{}, err return ocispecs.Descriptor{}, err
} }

114
cache/remote.go vendored
View File

@ -25,16 +25,122 @@ type Unlazier interface {
Unlazy(ctx context.Context) error Unlazy(ctx context.Context) error
} }
// GetRemote gets a *solver.Remote from content store for this ref (potentially pulling lazily). // GetRemotes gets []*solver.Remote from content store for this ref (potentially pulling lazily).
// Note: Use WorkerRef.GetRemote instead as moby integration requires custom GetRemote implementation. // Compressionopt can be used to specify the compression type of blobs. If Force is true, the compression
func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error) { // type is applied to all blobs in the chain. If Force is false, it's applied only to the newly created
// layers. If all is true, all available chains that has the specified compression type of topmost blob are
// appended to the result.
// Note: Use WorkerRef.GetRemotes instead as moby integration requires custom GetRemotes implementation.
func (sr *immutableRef) GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error) {
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary) ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer done(ctx) defer done(ctx)
err = sr.computeBlobChain(ctx, createIfNeeded, compressionType, forceCompression, s) // fast path if compression variants aren't required
// NOTE: compressionopt is applied only to *newly created layers* if Force != true.
remote, err := sr.getRemote(ctx, createIfNeeded, compressionopt, s)
if err != nil {
return nil, err
}
if !all || compressionopt.Force || len(remote.Descriptors) == 0 {
return []*solver.Remote{remote}, nil // early return if compression variants aren't required
}
// Search all available remotes that has the topmost blob with the specified
// compression with all combination of copmressions
res := []*solver.Remote{remote}
topmost, parentChain := remote.Descriptors[len(remote.Descriptors)-1], remote.Descriptors[:len(remote.Descriptors)-1]
vDesc, err := getCompressionVariantBlob(ctx, sr.cm.ContentStore, topmost.Digest, compressionopt.Type)
if err != nil {
return res, nil // compression variant doesn't exist. return the main blob only.
}
var variants []*solver.Remote
if len(parentChain) == 0 {
variants = append(variants, &solver.Remote{
Descriptors: []ocispecs.Descriptor{vDesc},
Provider: sr.cm.ContentStore,
})
} else {
// get parents with all combination of all available compressions.
parents, err := getAvailableBlobs(ctx, sr.cm.ContentStore, &solver.Remote{
Descriptors: parentChain,
Provider: remote.Provider,
})
if err != nil {
return nil, err
}
variants = appendRemote(parents, vDesc, sr.cm.ContentStore)
}
// Return the main remote and all its compression variants.
// NOTE: Because compressionopt is applied only to *newly created layers* in the main remote (i.e. res[0]),
// it's possible that the main remote doesn't contain any blobs of the compressionopt.Type.
// The topmost blob of the variants (res[1:]) is guaranteed to be the compressionopt.Type.
res = append(res, variants...)
return res, nil
}
func appendRemote(parents []*solver.Remote, desc ocispecs.Descriptor, p content.Provider) (res []*solver.Remote) {
for _, pRemote := range parents {
provider := contentutil.NewMultiProvider(pRemote.Provider)
provider.Add(desc.Digest, p)
res = append(res, &solver.Remote{
Descriptors: append(pRemote.Descriptors, desc),
Provider: provider,
})
}
return
}
func getAvailableBlobs(ctx context.Context, cs content.Store, chain *solver.Remote) ([]*solver.Remote, error) {
if len(chain.Descriptors) == 0 {
return nil, nil
}
target, parentChain := chain.Descriptors[len(chain.Descriptors)-1], chain.Descriptors[:len(chain.Descriptors)-1]
parents, err := getAvailableBlobs(ctx, cs, &solver.Remote{
Descriptors: parentChain,
Provider: chain.Provider,
})
if err != nil {
return nil, err
}
compressions, err := getCompressionVariants(ctx, cs, target.Digest)
if err != nil {
return nil, err
}
var res []*solver.Remote
for _, c := range compressions {
desc, err := getCompressionVariantBlob(ctx, cs, target.Digest, c)
if err != nil {
return nil, err
}
if len(parents) == 0 { // bottommost ref
res = append(res, &solver.Remote{
Descriptors: []ocispecs.Descriptor{desc},
Provider: cs,
})
continue
}
res = append(res, appendRemote(parents, desc, cs)...)
}
if len(res) == 0 {
// no available compression blobs for this blob. return the original blob.
if len(parents) == 0 { // bottommost ref
return []*solver.Remote{chain}, nil
}
return appendRemote(parents, target, chain.Provider), nil
}
return res, nil
}
func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, s session.Group) (*solver.Remote, error) {
compressionType := compressionopt.Type
forceCompression := compressionopt.Force
err := sr.computeBlobChain(ctx, createIfNeeded, compressionType, forceCompression, s)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -267,9 +267,25 @@ func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult)
return worker.NewWorkerRefResult(ref, cs.w), nil return worker.NewWorkerRefResult(ref, cs.w), nil
} }
func (cs *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult, _ session.Group) (*solver.Remote, error) { func (cs *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheResult, compressionopts *solver.CompressionOpt, _ session.Group) ([]*solver.Remote, error) {
if r := cs.byResultID(res.ID); r != nil && r.result != nil { if r := cs.byResultID(res.ID); r != nil && r.result != nil {
return r.result, nil if compressionopts == nil {
return []*solver.Remote{r.result}, nil
}
// Any of blobs in the remote must meet the specified compression option.
match := false
for _, desc := range r.result.Descriptors {
m := compressionopts.Type.IsMediaType(desc.MediaType)
match = match || m
if compressionopts.Force && !m {
match = false
break
}
}
if match {
return []*solver.Remote{r.result}, nil
}
return nil, nil // return nil as it's best effort.
} }
return nil, errors.WithStack(solver.ErrNotFound) return nil, errors.WithStack(solver.ErrNotFound)
} }

View File

@ -2231,6 +2231,12 @@ func testBuildExportZstd(t *testing.T, sb integration.Sandbox) {
}, },
}, },
}, },
// compression option should work even with inline cache exports
CacheExports: []CacheOptionsEntry{
{
Type: "inline",
},
},
}, nil) }, nil)
require.NoError(t, err) require.NoError(t, err)
@ -3034,6 +3040,50 @@ func testBasicInlineCacheImportExport(t *testing.T, sb integration.Sandbox) {
checkAllRemoved(t, c, sb) checkAllRemoved(t, c, sb)
// Export the cache again with compression
resp, err = c.Solve(sb.Context(), def, SolveOpt{
// specifying inline cache exporter is needed for reproducing containerimage.digest
// (not needed for reproducing rootfs/unique)
Exports: []ExportEntry{
{
Type: ExporterImage,
Attrs: map[string]string{
"name": target,
"push": "true",
"compression": "uncompressed", // inline cache should work with compression
"force-compression": "true",
},
},
},
CacheExports: []CacheOptionsEntry{
{
Type: "inline",
},
},
CacheImports: []CacheOptionsEntry{
{
Type: "registry",
Attrs: map[string]string{
"ref": target,
},
},
},
}, nil)
require.NoError(t, err)
dgst2uncompress, ok := resp.ExporterResponse[exptypes.ExporterImageDigestKey]
require.Equal(t, ok, true)
// dgst2uncompress != dgst, because the compression type is different
unique2uncompress, err := readFileInImage(sb.Context(), c, target+"@"+dgst2uncompress, "/unique")
require.NoError(t, err)
require.EqualValues(t, unique, unique2uncompress)
err = c.Prune(sb.Context(), nil, PruneAll)
require.NoError(t, err)
checkAllRemoved(t, c, sb)
resp, err = c.Solve(sb.Context(), def, SolveOpt{ resp, err = c.Solve(sb.Context(), def, SolveOpt{
Exports: []ExportEntry{ Exports: []ExportEntry{
{ {

View File

@ -19,6 +19,7 @@ import (
"github.com/moby/buildkit/exporter/containerimage/exptypes" "github.com/moby/buildkit/exporter/containerimage/exptypes"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/buildinfo" "github.com/moby/buildkit/util/buildinfo"
"github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/contentutil"
@ -207,6 +208,15 @@ func (e *imageExporterInstance) Name() string {
return "exporting to image" return "exporting to image"
} }
func (e *imageExporterInstance) Config() exporter.Config {
return exporter.Config{
Compression: solver.CompressionOpt{
Type: e.layerCompression,
Force: e.forceCompression,
},
}
}
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) { func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) {
if src.Metadata == nil { if src.Metadata == nil {
src.Metadata = make(map[string][]byte) src.Metadata = make(map[string][]byte)
@ -287,11 +297,16 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
if e.push { if e.push {
annotations := map[digest.Digest]map[string]string{} annotations := map[digest.Digest]map[string]string{}
mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore()) mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore())
compressionopt := solver.CompressionOpt{
Type: e.layerCompression,
Force: e.forceCompression,
}
if src.Ref != nil { if src.Ref != nil {
remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression, e.forceCompression, session.NewGroup(sessionID)) remotes, err := src.Ref.GetRemotes(ctx, false, compressionopt, false, session.NewGroup(sessionID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
remote := remotes[0]
for _, desc := range remote.Descriptors { for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider) mprovider.Add(desc.Digest, remote.Provider)
addAnnotations(annotations, desc) addAnnotations(annotations, desc)
@ -299,10 +314,11 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
} }
if len(src.Refs) > 0 { if len(src.Refs) > 0 {
for _, r := range src.Refs { for _, r := range src.Refs {
remote, err := r.GetRemote(ctx, false, e.layerCompression, e.forceCompression, session.NewGroup(sessionID)) remotes, err := r.GetRemotes(ctx, false, compressionopt, false, session.NewGroup(sessionID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
remote := remotes[0]
for _, desc := range remote.Descriptors { for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider) mprovider.Add(desc.Digest, remote.Provider)
addAnnotations(annotations, desc) addAnnotations(annotations, desc)
@ -352,10 +368,15 @@ func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Imag
} }
} }
remote, err := topLayerRef.GetRemote(ctx, true, e.layerCompression, e.forceCompression, s) compressionopt := solver.CompressionOpt{
Type: e.layerCompression,
Force: e.forceCompression,
}
remotes, err := topLayerRef.GetRemotes(ctx, true, compressionopt, false, s)
if err != nil { if err != nil {
return err return err
} }
remote := remotes[0]
// ensure the content for each layer exists locally in case any are lazy // ensure the content for each layer exists locally in case any are lazy
if unlazier, ok := remote.Provider.(cache.Unlazier); ok { if unlazier, ok := remote.Provider.(cache.Unlazier); ok {

View File

@ -175,6 +175,10 @@ func (ic *ImageWriter) exportLayers(ctx context.Context, compressionType compres
layersDone := oneOffProgress(ctx, "exporting layers") layersDone := oneOffProgress(ctx, "exporting layers")
out := make([]solver.Remote, len(refs)) out := make([]solver.Remote, len(refs))
compressionopt := solver.CompressionOpt{
Type: compressionType,
Force: forceCompression,
}
for i, ref := range refs { for i, ref := range refs {
func(i int, ref cache.ImmutableRef) { func(i int, ref cache.ImmutableRef) {
@ -182,10 +186,11 @@ func (ic *ImageWriter) exportLayers(ctx context.Context, compressionType compres
return return
} }
eg.Go(func() error { eg.Go(func() error {
remote, err := ref.GetRemote(ctx, true, compressionType, forceCompression, s) remotes, err := ref.GetRemotes(ctx, true, compressionopt, false, s)
if err != nil { if err != nil {
return err return err
} }
remote := remotes[0]
out[i] = *remote out[i] = *remote
return nil return nil
}) })

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache"
"github.com/moby/buildkit/solver"
) )
type Exporter interface { type Exporter interface {
@ -12,6 +13,7 @@ type Exporter interface {
type ExporterInstance interface { type ExporterInstance interface {
Name() string Name() string
Config() Config
Export(ctx context.Context, src Source, sessionID string) (map[string]string, error) Export(ctx context.Context, src Source, sessionID string) (map[string]string, error)
} }
@ -20,3 +22,7 @@ type Source struct {
Refs map[string]cache.ImmutableRef Refs map[string]cache.ImmutableRef
Metadata map[string][]byte Metadata map[string][]byte
} }
type Config struct {
Compression solver.CompressionOpt
}

View File

@ -46,6 +46,10 @@ func (e *localExporterInstance) Name() string {
return "exporting to client" return "exporting to client"
} }
func (e *localExporter) Config() exporter.Config {
return exporter.Config{}
}
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) { func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)

View File

@ -15,6 +15,7 @@ import (
"github.com/moby/buildkit/exporter/containerimage/exptypes" "github.com/moby/buildkit/exporter/containerimage/exptypes"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync" "github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/buildinfo" "github.com/moby/buildkit/util/buildinfo"
"github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/contentutil"
@ -144,6 +145,15 @@ func (e *imageExporterInstance) Name() string {
return "exporting to oci image format" return "exporting to oci image format"
} }
func (e *imageExporterInstance) Config() exporter.Config {
return exporter.Config{
Compression: solver.CompressionOpt{
Type: e.layerCompression,
Force: e.forceCompression,
},
}
}
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) { func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) {
if e.opt.Variant == VariantDocker && len(src.Refs) > 0 { if e.opt.Variant == VariantDocker && len(src.Refs) > 0 {
return nil, errors.Errorf("docker exporter does not currently support exporting manifest lists") return nil, errors.Errorf("docker exporter does not currently support exporting manifest lists")
@ -227,11 +237,16 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
} }
mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore()) mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore())
compressionopt := solver.CompressionOpt{
Type: e.layerCompression,
Force: e.forceCompression,
}
if src.Ref != nil { if src.Ref != nil {
remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression, e.forceCompression, session.NewGroup(sessionID)) remotes, err := src.Ref.GetRemotes(ctx, false, compressionopt, false, session.NewGroup(sessionID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
remote := remotes[0]
// unlazy before tar export as the tar writer does not handle // unlazy before tar export as the tar writer does not handle
// layer blobs in parallel (whereas unlazy does) // layer blobs in parallel (whereas unlazy does)
if unlazier, ok := remote.Provider.(cache.Unlazier); ok { if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
@ -245,10 +260,11 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
} }
if len(src.Refs) > 0 { if len(src.Refs) > 0 {
for _, r := range src.Refs { for _, r := range src.Refs {
remote, err := r.GetRemote(ctx, false, e.layerCompression, e.forceCompression, session.NewGroup(sessionID)) remotes, err := r.GetRemotes(ctx, false, compressionopt, false, session.NewGroup(sessionID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
remote := remotes[0]
if unlazier, ok := remote.Provider.(cache.Unlazier); ok { if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
if err := unlazier.Unlazy(ctx); err != nil { if err := unlazier.Unlazy(ctx); err != nil {
return nil, err return nil, err

View File

@ -45,6 +45,10 @@ func (e *localExporterInstance) Name() string {
return "exporting to client" return "exporting to client"
} }
func (e *localExporterInstance) Config() exporter.Config {
return exporter.Config{}
}
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) { func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
var defers []func() var defers []func()

View File

@ -47,6 +47,6 @@ type CacheInfoLink struct {
type CacheResultStorage interface { type CacheResultStorage interface {
Save(Result, time.Time) (CacheResult, error) Save(Result, time.Time) (CacheResult, error)
Load(ctx context.Context, res CacheResult) (Result, error) Load(ctx context.Context, res CacheResult) (Result, error)
LoadRemote(ctx context.Context, res CacheResult, s session.Group) (*Remote, error) LoadRemotes(ctx context.Context, res CacheResult, compression *CompressionOpt, s session.Group) ([]*Remote, error)
Exists(id string) bool Exists(id string) bool
} }

View File

@ -78,7 +78,8 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
selector digest.Digest selector digest.Digest
} }
rec := t.Add(rootKey(e.k.Digest(), e.k.Output())) recKey := rootKey(e.k.Digest(), e.k.Output())
rec := t.Add(recKey)
allRec := []CacheExporterRecord{rec} allRec := []CacheExporterRecord{rec}
addRecord := true addRecord := true
@ -93,6 +94,8 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
var remote *Remote var remote *Remote
if v := e.record; v != nil && len(e.k.Deps()) > 0 && addRecord { if v := e.record; v != nil && len(e.k.Deps()) > 0 && addRecord {
var variants []CacheExporterRecord
cm := v.cacheManager cm := v.cacheManager
key := cm.getID(v.key) key := cm.getID(v.key)
res, err := cm.backend.Load(key, v.ID) res, err := cm.backend.Load(key, v.ID)
@ -100,21 +103,41 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
return nil, err return nil, err
} }
remote, err = cm.results.LoadRemote(ctx, res, opt.Session) remotes, err := cm.results.LoadRemotes(ctx, res, opt.CompressionOpt, opt.Session)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(remotes) > 0 {
remote, remotes = remotes[0], remotes[1:] // pop the first element
}
if opt.CompressionOpt != nil {
for _, r := range remotes { // record all remaining remotes as well
rec := t.Add(recKey)
rec.AddResult(v.CreatedAt, r)
variants = append(variants, rec)
}
}
if remote == nil && opt.Mode != CacheExportModeRemoteOnly { if (remote == nil || opt.CompressionOpt != nil) && opt.Mode != CacheExportModeRemoteOnly {
res, err := cm.results.Load(ctx, res) res, err := cm.results.Load(ctx, res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
remote, err = opt.Convert(ctx, res) remotes, err := opt.ResolveRemotes(ctx, res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
res.Release(context.TODO()) res.Release(context.TODO())
if remote == nil {
remote, remotes = remotes[0], remotes[1:] // pop the first element
}
if opt.CompressionOpt != nil {
for _, r := range remotes { // record all remaining remotes as well
rec := t.Add(recKey)
rec.AddResult(v.CreatedAt, r)
variants = append(variants, rec)
}
}
} }
if remote != nil { if remote != nil {
@ -122,6 +145,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
rec.AddResult(v.CreatedAt, remote) rec.AddResult(v.CreatedAt, remote)
} }
} }
allRec = append(allRec, variants...)
} }
if remote != nil && opt.Mode == CacheExportModeMin { if remote != nil && opt.Mode == CacheExportModeMin {
@ -154,15 +178,17 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
} }
} }
for i, srcs := range srcs { for _, rec := range allRec {
for _, src := range srcs { for i, srcs := range srcs {
rec.LinkFrom(src.r, i, src.selector.String()) for _, src := range srcs {
rec.LinkFrom(src.r, i, src.selector.String())
}
} }
}
for cm, id := range e.k.ids { for cm, id := range e.k.ids {
if _, err := addBacklinks(t, rec, cm, id, bkm); err != nil { if _, err := addBacklinks(t, rec, cm, id, bkm); err != nil {
return nil, err return nil, err
}
} }
} }

View File

@ -8,7 +8,6 @@ import (
"github.com/moby/buildkit/cache/contenthash" "github.com/moby/buildkit/cache/contenthash"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/worker" "github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -82,13 +81,13 @@ func NewContentHashFunc(selectors []Selector) solver.ResultBasedCacheFunc {
} }
} }
func workerRefConverter(g session.Group) func(ctx context.Context, res solver.Result) (*solver.Remote, error) { func workerRefResolver(compressionopt solver.CompressionOpt, all bool, g session.Group) func(ctx context.Context, res solver.Result) ([]*solver.Remote, error) {
return func(ctx context.Context, res solver.Result) (*solver.Remote, error) { return func(ctx context.Context, res solver.Result) ([]*solver.Remote, error) {
ref, ok := res.Sys().(*worker.WorkerRef) ref, ok := res.Sys().(*worker.WorkerRef)
if !ok { if !ok {
return nil, errors.Errorf("invalid result: %T", res.Sys()) return nil, errors.Errorf("invalid result: %T", res.Sys())
} }
return ref.GetRemote(ctx, true, compression.Default, false, g) return ref.GetRemotes(ctx, true, compressionopt, all, g)
} }
} }

View File

@ -183,7 +183,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro
inp.Metadata[exptypes.ExporterBuildInfo] = dtbi inp.Metadata[exptypes.ExporterBuildInfo] = dtbi
} }
dtic, err := inlineCache(ctx, exp.CacheExporter, r, session.NewGroup(sessionID)) dtic, err := inlineCache(ctx, exp.CacheExporter, r, e.Config().Compression, session.NewGroup(sessionID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -215,7 +215,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro
inp.Metadata[fmt.Sprintf("%s/%s", exptypes.ExporterBuildInfo, k)] = dtbi inp.Metadata[fmt.Sprintf("%s/%s", exptypes.ExporterBuildInfo, k)] = dtbi
} }
dtic, err := inlineCache(ctx, exp.CacheExporter, r, session.NewGroup(sessionID)) dtic, err := inlineCache(ctx, exp.CacheExporter, r, e.Config().Compression, session.NewGroup(sessionID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -247,7 +247,9 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro
} }
// all keys have same export chain so exporting others is not needed // all keys have same export chain so exporting others is not needed
_, err = r.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{ _, err = r.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{
Convert: workerRefConverter(g), ResolveRemotes: workerRefResolver(solver.CompressionOpt{
Type: compression.Default, // TODO: make configurable
}, false, g),
Mode: exp.CacheExportMode, Mode: exp.CacheExportMode,
Session: g, Session: g,
}) })
@ -286,7 +288,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro
}, nil }, nil
} }
func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedResult, g session.Group) ([]byte, error) { func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedResult, compressionopt solver.CompressionOpt, g session.Group) ([]byte, error) {
if efl, ok := e.(interface { if efl, ok := e.(interface {
ExportForLayers([]digest.Digest) ([]byte, error) ExportForLayers([]digest.Digest) ([]byte, error)
}); ok { }); ok {
@ -295,10 +297,11 @@ func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedR
return nil, errors.Errorf("invalid reference: %T", res.Sys()) return nil, errors.Errorf("invalid reference: %T", res.Sys())
} }
remote, err := workerRef.GetRemote(ctx, true, compression.Default, false, g) remotes, err := workerRef.GetRemotes(ctx, true, compressionopt, false, g)
if err != nil || remote == nil { if err != nil || len(remotes) == 0 {
return nil, nil return nil, nil
} }
remote := remotes[0]
digests := make([]digest.Digest, 0, len(remote.Descriptors)) digests := make([]digest.Digest, 0, len(remote.Descriptors))
for _, desc := range remote.Descriptors { for _, desc := range remote.Descriptors {
@ -306,9 +309,10 @@ func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedR
} }
if _, err := res.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{ if _, err := res.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{
Convert: workerRefConverter(g), ResolveRemotes: workerRefResolver(compressionopt, true, g), // load as many compression blobs as possible
Mode: solver.CacheExportModeMin, Mode: solver.CacheExportModeMin,
Session: g, Session: g,
CompressionOpt: &compressionopt, // cache possible compression variants
}); err != nil { }); err != nil {
return nil, err return nil, err
} }

View File

@ -298,7 +298,7 @@ func (s *inMemoryResultStore) Load(ctx context.Context, res CacheResult) (Result
return v.(Result), nil return v.(Result), nil
} }
func (s *inMemoryResultStore) LoadRemote(_ context.Context, _ CacheResult, _ session.Group) (*Remote, error) { func (s *inMemoryResultStore) LoadRemotes(_ context.Context, _ CacheResult, _ *CompressionOpt, _ session.Group) ([]*Remote, error) {
return nil, nil return nil, nil
} }

View File

@ -3789,11 +3789,11 @@ func testExporterOpts(all bool) CacheExportOpt {
mode = CacheExportModeMax mode = CacheExportModeMax
} }
return CacheExportOpt{ return CacheExportOpt{
Convert: func(ctx context.Context, res Result) (*Remote, error) { ResolveRemotes: func(ctx context.Context, res Result) ([]*Remote, error) {
if dr, ok := res.Sys().(*dummyResult); ok { if dr, ok := res.Sys().(*dummyResult); ok {
return &Remote{Descriptors: []ocispecs.Descriptor{{ return []*Remote{{Descriptors: []ocispecs.Descriptor{{
Annotations: map[string]string{"value": fmt.Sprintf("%d", dr.intValue)}, Annotations: map[string]string{"value": fmt.Sprintf("%d", dr.intValue)},
}}}, nil }}}}, nil
} }
return nil, nil return nil, nil
}, },

View File

@ -7,6 +7,7 @@ import (
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1" ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
) )
@ -93,12 +94,21 @@ const (
// CacheExportOpt defines options for exporting build cache // CacheExportOpt defines options for exporting build cache
type CacheExportOpt struct { type CacheExportOpt struct {
// Convert can convert a build result to transferable object // ResolveRemotes can convert a build result to transferable objects
Convert func(context.Context, Result) (*Remote, error) ResolveRemotes func(context.Context, Result) ([]*Remote, error)
// Mode defines a cache export algorithm // Mode defines a cache export algorithm
Mode CacheExportMode Mode CacheExportMode
// Session is the session group to client (for auth credentials etc) // Session is the session group to client (for auth credentials etc)
Session session.Group Session session.Group
// CompressionOpt is an option to specify the compression of the object to load.
// If specified, all objects that meet the option will be cached.
CompressionOpt *CompressionOpt
}
// CompressionOpt is compression information of a blob
type CompressionOpt struct {
Type compression.Type
Force bool
} }
// CacheExporter can export the artifacts of the build chain // CacheExporter can export the artifacts of the build chain

View File

@ -41,6 +41,21 @@ const (
var Default = Gzip var Default = Gzip
func Parse(t string) Type {
switch t {
case "uncompressed":
return Uncompressed
case "gzip":
return Gzip
case "estargz":
return EStargz
case "zstd":
return Zstd
default:
return UnknownCompression
}
}
func (ct Type) String() string { func (ct Type) String() string {
switch ct { switch ct {
case Uncompressed: case Uncompressed:
@ -69,6 +84,14 @@ func (ct Type) DefaultMediaType() string {
} }
} }
func (ct Type) IsMediaType(mt string) bool {
mt, ok := toOCILayerType[mt]
if !ok {
return false
}
return mt == ct.DefaultMediaType()
}
func FromMediaType(mediaType string) Type { func FromMediaType(mediaType string) Type {
switch toOCILayerType[mediaType] { switch toOCILayerType[mediaType] {
case ocispecs.MediaTypeImageLayer: case ocispecs.MediaTypeImageLayer:

View File

@ -66,7 +66,7 @@ func (s *cacheResultStorage) load(ctx context.Context, id string, hidden bool) (
return NewWorkerRefResult(ref, w), nil return NewWorkerRefResult(ref, w), nil
} }
func (s *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult, g session.Group) (*solver.Remote, error) { func (s *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheResult, compressionopt *solver.CompressionOpt, g session.Group) ([]*solver.Remote, error) {
w, refID, err := s.getWorkerRef(res.ID) w, refID, err := s.getWorkerRef(res.ID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -77,11 +77,16 @@ func (s *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheRes
} }
defer ref.Release(context.TODO()) defer ref.Release(context.TODO())
wref := WorkerRef{ref, w} wref := WorkerRef{ref, w}
remote, err := wref.GetRemote(ctx, false, compression.Default, false, g) all := true // load as many compression blobs as possible
if compressionopt == nil {
compressionopt = &solver.CompressionOpt{Type: compression.Default}
all = false
}
remotes, err := wref.GetRemotes(ctx, false, *compressionopt, all, g)
if err != nil { if err != nil {
return nil, nil // ignore error. loadRemote is best effort return nil, nil // ignore error. loadRemote is best effort
} }
return remote, nil return remotes, nil
} }
func (s *cacheResultStorage) Exists(id string) bool { func (s *cacheResultStorage) Exists(id string) bool {
ref, err := s.load(context.TODO(), id, true) ref, err := s.load(context.TODO(), id, true)

View File

@ -6,7 +6,6 @@ import (
"github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
) )
func NewWorkerRefResult(ref cache.ImmutableRef, worker Worker) solver.Result { func NewWorkerRefResult(ref cache.ImmutableRef, worker Worker) solver.Result {
@ -26,16 +25,16 @@ func (wr *WorkerRef) ID() string {
return wr.Worker.ID() + "::" + refID return wr.Worker.ID() + "::" + refID
} }
// GetRemote method abstracts ImmutableRef's GetRemote to allow a Worker to override. // GetRemotes method abstracts ImmutableRef's GetRemotes to allow a Worker to override.
// This is needed for moby integration. // This is needed for moby integration.
// Use this method instead of calling ImmutableRef.GetRemote() directly. // Use this method instead of calling ImmutableRef.GetRemotes() directly.
func (wr *WorkerRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, g session.Group) (*solver.Remote, error) { func (wr *WorkerRef) GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, g session.Group) ([]*solver.Remote, error) {
if w, ok := wr.Worker.(interface { if w, ok := wr.Worker.(interface {
GetRemote(context.Context, cache.ImmutableRef, bool, compression.Type, bool, session.Group) (*solver.Remote, error) GetRemotes(context.Context, cache.ImmutableRef, bool, solver.CompressionOpt, bool, session.Group) ([]*solver.Remote, error)
}); ok { }); ok {
return w.GetRemote(ctx, wr.ImmutableRef, createIfNeeded, compressionType, forceCompression, g) return w.GetRemotes(ctx, wr.ImmutableRef, createIfNeeded, compressionopt, all, g)
} }
return wr.ImmutableRef.GetRemote(ctx, createIfNeeded, compressionType, forceCompression, g) return wr.ImmutableRef.GetRemotes(ctx, createIfNeeded, compressionopt, all, g)
} }
type workerRefResult struct { type workerRefResult struct {