Merge pull request #2648 from ktock/cache-stargz-snapshotter
Enable estargz-based lazy pulling on registry cache importermaster
commit
0154dfcfaf
|
@ -6,10 +6,12 @@ import (
|
|||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/remotes/docker"
|
||||
"github.com/containerd/containerd/snapshots"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/moby/buildkit/cache/remotecache"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/util/contentutil"
|
||||
"github.com/moby/buildkit/util/estargz"
|
||||
"github.com/moby/buildkit/util/push"
|
||||
"github.com/moby/buildkit/util/resolver"
|
||||
"github.com/moby/buildkit/util/resolver/limited"
|
||||
|
@ -105,3 +107,17 @@ func (dsl *withDistributionSourceLabel) SetDistributionSourceAnnotation(desc oci
|
|||
desc.Annotations["containerd.io/distribution.source.ref"] = dsl.ref
|
||||
return desc
|
||||
}
|
||||
|
||||
func (dsl *withDistributionSourceLabel) SnapshotLabels(descs []ocispecs.Descriptor, index int) map[string]string {
|
||||
if len(descs) < index {
|
||||
return nil
|
||||
}
|
||||
labels := snapshots.FilterInheritedLabels(descs[index].Annotations)
|
||||
if labels == nil {
|
||||
labels = make(map[string]string)
|
||||
}
|
||||
for k, v := range estargz.SnapshotLabels(dsl.ref, descs, index) {
|
||||
labels[k] = v
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
|
|
@ -139,6 +139,7 @@ func TestIntegration(t *testing.T) {
|
|||
testSourceMapFromRef,
|
||||
testLazyImagePush,
|
||||
testStargzLazyPull,
|
||||
testStargzLazyInlineCacheImportExport,
|
||||
testFileOpInputSwap,
|
||||
testRelativeMountpoint,
|
||||
testLocalSourceDiffer,
|
||||
|
@ -2882,6 +2883,161 @@ func testBuildPushAndValidate(t *testing.T, sb integration.Sandbox) {
|
|||
require.False(t, ok)
|
||||
}
|
||||
|
||||
func testStargzLazyInlineCacheImportExport(t *testing.T, sb integration.Sandbox) {
|
||||
skipDockerd(t, sb)
|
||||
requiresLinux(t)
|
||||
|
||||
cdAddress := sb.ContainerdAddress()
|
||||
if cdAddress == "" || sb.Snapshotter() != "stargz" {
|
||||
t.Skip("test requires containerd worker with stargz snapshotter")
|
||||
}
|
||||
|
||||
client, err := newContainerd(cdAddress)
|
||||
require.NoError(t, err)
|
||||
defer client.Close()
|
||||
registry, err := sb.NewRegistry()
|
||||
if errors.Is(err, integration.ErrRequirements) {
|
||||
t.Skip(err.Error())
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
var (
|
||||
imageService = client.ImageService()
|
||||
contentStore = client.ContentStore()
|
||||
ctx = namespaces.WithNamespace(sb.Context(), "buildkit")
|
||||
)
|
||||
|
||||
c, err := New(sb.Context(), sb.Address())
|
||||
require.NoError(t, err)
|
||||
defer c.Close()
|
||||
|
||||
// Prepare stargz inline cache
|
||||
orgImage := "docker.io/library/alpine:latest"
|
||||
sgzImage := registry + "/stargz/alpine:" + identity.NewID()
|
||||
baseDef := llb.Image(orgImage).Run(llb.Args([]string{"/bin/touch", "/foo"}))
|
||||
def, err := baseDef.Marshal(sb.Context())
|
||||
require.NoError(t, err)
|
||||
_, err = c.Solve(sb.Context(), def, SolveOpt{
|
||||
Exports: []ExportEntry{
|
||||
{
|
||||
Type: ExporterImage,
|
||||
Attrs: map[string]string{
|
||||
"name": sgzImage,
|
||||
"push": "true",
|
||||
"compression": "estargz",
|
||||
"oci-mediatypes": "true",
|
||||
"force-compression": "true",
|
||||
},
|
||||
},
|
||||
},
|
||||
CacheExports: []CacheOptionsEntry{
|
||||
{
|
||||
Type: "inline",
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// clear all local state out
|
||||
err = imageService.Delete(ctx, sgzImage, images.SynchronousDelete())
|
||||
require.NoError(t, err)
|
||||
checkAllReleasable(t, c, sb, true)
|
||||
|
||||
// stargz layers should be lazy even for executing something on them
|
||||
def, err = baseDef.
|
||||
Run(llb.Args([]string{"/bin/touch", "/bar"})).
|
||||
Marshal(sb.Context())
|
||||
require.NoError(t, err)
|
||||
target := registry + "/buildkit/testlazyimage:" + identity.NewID()
|
||||
_, err = c.Solve(sb.Context(), def, SolveOpt{
|
||||
Exports: []ExportEntry{
|
||||
{
|
||||
Type: ExporterImage,
|
||||
Attrs: map[string]string{
|
||||
"name": target,
|
||||
"push": "true",
|
||||
"oci-mediatypes": "true",
|
||||
"compression": "estargz",
|
||||
},
|
||||
},
|
||||
},
|
||||
CacheExports: []CacheOptionsEntry{
|
||||
{
|
||||
Type: "inline",
|
||||
},
|
||||
},
|
||||
CacheImports: []CacheOptionsEntry{
|
||||
{
|
||||
Type: "registry",
|
||||
Attrs: map[string]string{
|
||||
"ref": sgzImage,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
img, err := imageService.Get(ctx, target)
|
||||
require.NoError(t, err)
|
||||
|
||||
manifest, err := images.Manifest(ctx, contentStore, img.Target, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check if image layers are lazy.
|
||||
// The topmost(last) layer created by `Run` isn't lazy so we skip the check for the layer.
|
||||
var sgzLayers []ocispecs.Descriptor
|
||||
for i, layer := range manifest.Layers[:len(manifest.Layers)-1] {
|
||||
_, err = contentStore.Info(ctx, layer.Digest)
|
||||
require.ErrorIs(t, err, ctderrdefs.ErrNotFound, "unexpected error %v on layer %+v (%d)", err, layer, i)
|
||||
sgzLayers = append(sgzLayers, layer)
|
||||
}
|
||||
require.NotEqual(t, 0, len(sgzLayers), "no layer can be used for checking lazypull")
|
||||
|
||||
// The topmost(last) layer created by `Run` shouldn't be lazy
|
||||
_, err = contentStore.Info(ctx, manifest.Layers[len(manifest.Layers)-1].Digest)
|
||||
require.NoError(t, err)
|
||||
|
||||
// clear all local state out
|
||||
err = imageService.Delete(ctx, img.Name, images.SynchronousDelete())
|
||||
require.NoError(t, err)
|
||||
checkAllReleasable(t, c, sb, true)
|
||||
|
||||
// stargz layers should be exportable
|
||||
destDir, err := ioutil.TempDir("", "buildkit")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(destDir)
|
||||
out := filepath.Join(destDir, "out.tar")
|
||||
outW, err := os.Create(out)
|
||||
require.NoError(t, err)
|
||||
_, err = c.Solve(sb.Context(), def, SolveOpt{
|
||||
Exports: []ExportEntry{
|
||||
{
|
||||
Type: ExporterOCI,
|
||||
Output: fixedWriteCloser(outW),
|
||||
},
|
||||
},
|
||||
CacheImports: []CacheOptionsEntry{
|
||||
{
|
||||
Type: "registry",
|
||||
Attrs: map[string]string{
|
||||
"ref": sgzImage,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check if image layers are un-lazied
|
||||
for _, layer := range sgzLayers {
|
||||
_, err = contentStore.Info(ctx, layer.Digest)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err = c.Prune(sb.Context(), nil, PruneAll)
|
||||
require.NoError(t, err)
|
||||
checkAllRemoved(t, c, sb)
|
||||
}
|
||||
|
||||
func testStargzLazyPull(t *testing.T, sb integration.Sandbox) {
|
||||
skipDockerd(t, sb)
|
||||
requiresLinux(t)
|
||||
|
|
|
@ -3,21 +3,17 @@ package containerimage
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/diff"
|
||||
containerderrdefs "github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/images"
|
||||
ctdlabels "github.com/containerd/containerd/labels"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/containerd/containerd/remotes/docker"
|
||||
"github.com/containerd/containerd/snapshots"
|
||||
"github.com/containerd/stargz-snapshotter/estargz"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/client/llb"
|
||||
"github.com/moby/buildkit/session"
|
||||
|
@ -26,6 +22,7 @@ import (
|
|||
"github.com/moby/buildkit/solver/errdefs"
|
||||
"github.com/moby/buildkit/source"
|
||||
srctypes "github.com/moby/buildkit/source/types"
|
||||
"github.com/moby/buildkit/util/estargz"
|
||||
"github.com/moby/buildkit/util/flightcontrol"
|
||||
"github.com/moby/buildkit/util/imageutil"
|
||||
"github.com/moby/buildkit/util/leaseutil"
|
||||
|
@ -218,31 +215,13 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach
|
|||
|
||||
p.descHandlers = cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
|
||||
for i, desc := range p.manifest.Descriptors {
|
||||
// Hints for remote/stargz snapshotter for searching for remote snapshots
|
||||
labels := snapshots.FilterInheritedLabels(desc.Annotations)
|
||||
if labels == nil {
|
||||
labels = make(map[string]string)
|
||||
}
|
||||
for _, k := range []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation} {
|
||||
labels[k] = desc.Annotations[k]
|
||||
for k, v := range estargz.SnapshotLabels(p.manifest.Ref, p.manifest.Descriptors, i) {
|
||||
labels[k] = v
|
||||
}
|
||||
labels["containerd.io/snapshot/remote/stargz.reference"] = p.manifest.Ref
|
||||
labels["containerd.io/snapshot/remote/stargz.digest"] = desc.Digest.String()
|
||||
var (
|
||||
layersKey = "containerd.io/snapshot/remote/stargz.layers"
|
||||
layers string
|
||||
)
|
||||
for _, l := range p.manifest.Descriptors[i:] {
|
||||
ls := fmt.Sprintf("%s,", l.Digest.String())
|
||||
// This avoids the label hits the size limitation.
|
||||
// Skipping layers is allowed here and only affects performance.
|
||||
if err := ctdlabels.Validate(layersKey, layers+ls); err != nil {
|
||||
break
|
||||
}
|
||||
layers += ls
|
||||
}
|
||||
labels[layersKey] = strings.TrimSuffix(layers, ",")
|
||||
|
||||
p.descHandlers[desc.Digest] = &cache.DescHandler{
|
||||
Provider: p.manifest.Provider,
|
||||
Progress: progressController,
|
||||
|
|
|
@ -26,6 +26,30 @@ type MultiProvider struct {
|
|||
sub map[digest.Digest]content.Provider
|
||||
}
|
||||
|
||||
func (mp *MultiProvider) SnapshotLabels(descs []ocispecs.Descriptor, index int) map[string]string {
|
||||
if len(descs) < index {
|
||||
return nil
|
||||
}
|
||||
desc := descs[index]
|
||||
type snapshotLabels interface {
|
||||
SnapshotLabels([]ocispecs.Descriptor, int) map[string]string
|
||||
}
|
||||
|
||||
mp.mu.RLock()
|
||||
if p, ok := mp.sub[desc.Digest]; ok {
|
||||
mp.mu.RUnlock()
|
||||
if cd, ok := p.(snapshotLabels); ok {
|
||||
return cd.SnapshotLabels(descs, index)
|
||||
}
|
||||
} else {
|
||||
mp.mu.RUnlock()
|
||||
}
|
||||
if cd, ok := mp.base.(snapshotLabels); ok {
|
||||
return cd.SnapshotLabels(descs, index)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *MultiProvider) CheckDescriptor(ctx context.Context, desc ocispecs.Descriptor) error {
|
||||
type checkDescriptor interface {
|
||||
CheckDescriptor(context.Context, ocispecs.Descriptor) error
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
package estargz
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
ctdlabels "github.com/containerd/containerd/labels"
|
||||
"github.com/containerd/stargz-snapshotter/estargz"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
func SnapshotLabels(ref string, descs []ocispecs.Descriptor, targetIndex int) map[string]string {
|
||||
if len(descs) < targetIndex {
|
||||
return nil
|
||||
}
|
||||
desc := descs[targetIndex]
|
||||
labels := make(map[string]string)
|
||||
for _, k := range []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation} {
|
||||
labels[k] = desc.Annotations[k]
|
||||
}
|
||||
labels["containerd.io/snapshot/remote/stargz.reference"] = ref
|
||||
labels["containerd.io/snapshot/remote/stargz.digest"] = desc.Digest.String()
|
||||
var (
|
||||
layersKey = "containerd.io/snapshot/remote/stargz.layers"
|
||||
layers string
|
||||
)
|
||||
for _, l := range descs[targetIndex:] {
|
||||
ls := fmt.Sprintf("%s,", l.Digest.String())
|
||||
// This avoids the label hits the size limitation.
|
||||
// Skipping layers is allowed here and only affects performance.
|
||||
if err := ctdlabels.Validate(layersKey, layers+ls); err != nil {
|
||||
break
|
||||
}
|
||||
layers += ls
|
||||
}
|
||||
labels[layersKey] = strings.TrimSuffix(layers, ",")
|
||||
return labels
|
||||
}
|
|
@ -397,9 +397,20 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cac
|
|||
WriterFactory: progress.FromContext(ctx),
|
||||
},
|
||||
}
|
||||
snapshotLabels := func([]ocispecs.Descriptor, int) map[string]string { return nil }
|
||||
if cd, ok := remote.Provider.(interface {
|
||||
SnapshotLabels([]ocispecs.Descriptor, int) map[string]string
|
||||
}); ok {
|
||||
snapshotLabels = cd.SnapshotLabels
|
||||
}
|
||||
descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
|
||||
for _, desc := range remote.Descriptors {
|
||||
descHandlers[desc.Digest] = descHandler
|
||||
for i, desc := range remote.Descriptors {
|
||||
descHandlers[desc.Digest] = &cache.DescHandler{
|
||||
Provider: descHandler.Provider,
|
||||
Progress: descHandler.Progress,
|
||||
Annotations: desc.Annotations,
|
||||
SnapshotLabels: snapshotLabels(remote.Descriptors, i),
|
||||
}
|
||||
}
|
||||
|
||||
var current cache.ImmutableRef
|
||||
|
@ -417,10 +428,17 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cac
|
|||
if v, ok := desc.Annotations["buildkit/description"]; ok {
|
||||
descr = v
|
||||
}
|
||||
ref, err := w.CacheMgr.GetByBlob(ctx, desc, current,
|
||||
opts := []cache.RefOption{
|
||||
cache.WithDescription(descr),
|
||||
cache.WithCreationTime(tm),
|
||||
descHandlers)
|
||||
descHandlers,
|
||||
}
|
||||
if dh, ok := descHandlers[desc.Digest]; ok {
|
||||
if ref, ok := dh.Annotations["containerd.io/distribution.source.ref"]; ok {
|
||||
opts = append(opts, cache.WithImageRef(ref)) // can set by registry cache importer
|
||||
}
|
||||
}
|
||||
ref, err := w.CacheMgr.GetByBlob(ctx, desc, current, opts...)
|
||||
if current != nil {
|
||||
current.Release(context.TODO())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue