From 481d39c1766b160eb050a966f3084f1f4badc8d4 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 14 Aug 2019 17:00:52 -0700 Subject: [PATCH] add cross-repo push support Signed-off-by: Tonis Tiigi --- cache/remotecache/import.go | 22 +++++++++++++ cache/remotecache/registry/registry.go | 37 +++++++++++++++++++-- cmd/buildkitd/main.go | 7 +++- util/pull/pull.go | 7 ++++ util/push/push.go | 45 ++++++++++++++++++++++++-- worker/base/worker.go | 24 +++++++++++--- worker/worker.go | 2 ++ 7 files changed, 135 insertions(+), 9 deletions(-) diff --git a/cache/remotecache/import.go b/cache/remotecache/import.go index 88223ff9..b19ad880 100644 --- a/cache/remotecache/import.go +++ b/cache/remotecache/import.go @@ -27,6 +27,11 @@ type Importer interface { Resolve(ctx context.Context, desc ocispec.Descriptor, id string, w worker.Worker) (solver.CacheManager, error) } +type DistributionSourceLabelSetter interface { + SetDistributionSourceLabel(context.Context, digest.Digest) error + SetDistributionSourceAnnotation(desc ocispec.Descriptor) ocispec.Descriptor +} + func NewImporter(provider content.Provider) Importer { return &contentCacheImporter{provider: provider} } @@ -61,6 +66,15 @@ func (ci *contentCacheImporter) Resolve(ctx context.Context, desc ocispec.Descri } } + if dsls, ok := ci.provider.(DistributionSourceLabelSetter); ok { + for dgst, l := range allLayers { + err := dsls.SetDistributionSourceLabel(ctx, dgst) + _ = err // error ignored because layer may not exist + l.Descriptor = dsls.SetDistributionSourceAnnotation(l.Descriptor) + allLayers[dgst] = l + } + } + if configDesc.Digest == "" { return ci.importInlineCache(ctx, dt, id, w) } @@ -127,6 +141,14 @@ func (ci *contentCacheImporter) importInlineCache(ctx context.Context, dt []byte return nil } + if dsls, ok := ci.provider.(DistributionSourceLabelSetter); ok { + for i, l := range m.Layers { + err := dsls.SetDistributionSourceLabel(ctx, l.Digest) + _ = err // error ignored because layer may not exist + m.Layers[i] = dsls.SetDistributionSourceAnnotation(l) + } + } + p, err := content.ReadBlob(ctx, ci.provider, m.Config) if err != nil { return errors.WithStack(err) diff --git a/cache/remotecache/registry/registry.go b/cache/remotecache/registry/registry.go index 824d2f81..51e97c78 100644 --- a/cache/remotecache/registry/registry.go +++ b/cache/remotecache/registry/registry.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/docker/distribution/reference" @@ -12,6 +13,8 @@ import ( "github.com/moby/buildkit/session/auth" "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/resolver" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -46,7 +49,7 @@ func ResolveCacheExporterFunc(sm *session.Manager, resolverOpt resolver.ResolveO } } -func ResolveCacheImporterFunc(sm *session.Manager, resolverOpt resolver.ResolveOptionsFunc) remotecache.ResolveCacheImporterFunc { +func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, resolverOpt resolver.ResolveOptionsFunc) remotecache.ResolveCacheImporterFunc { return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) { ref, err := canonicalizeRef(attrs[attrRef]) if err != nil { @@ -61,10 +64,40 @@ func ResolveCacheImporterFunc(sm *session.Manager, resolverOpt resolver.ResolveO if err != nil { return nil, specs.Descriptor{}, err } - return remotecache.NewImporter(contentutil.FromFetcher(fetcher)), desc, nil + src := &withDistributionSourceLabel{ + Provider: contentutil.FromFetcher(fetcher), + ref: ref, + source: cs, + } + return remotecache.NewImporter(src), desc, nil } } +type withDistributionSourceLabel struct { + content.Provider + ref string + source content.Manager +} + +var _ remotecache.DistributionSourceLabelSetter = &withDistributionSourceLabel{} + +func (dsl *withDistributionSourceLabel) SetDistributionSourceLabel(ctx context.Context, dgst digest.Digest) error { + hf, err := docker.AppendDistributionSourceLabel(dsl.source, dsl.ref) + if err != nil { + return err + } + _, err = hf(ctx, ocispec.Descriptor{Digest: dgst}) + return err +} + +func (dsl *withDistributionSourceLabel) SetDistributionSourceAnnotation(desc ocispec.Descriptor) ocispec.Descriptor { + if desc.Annotations == nil { + desc.Annotations = map[string]string{} + } + desc.Annotations["containerd.io/distribution.source.ref"] = dsl.ref + return desc +} + func newRemoteResolver(ctx context.Context, resolverOpt resolver.ResolveOptionsFunc, sm *session.Manager, ref string) remotes.Resolver { opt := resolverOpt(ref) opt.Credentials = getCredentialsFunc(ctx, sm) diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index 078d8467..6e642e7d 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -591,13 +591,18 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err resolverFn := resolverFunc(cfg) + w, err := wc.GetDefault() + if err != nil { + return nil, err + } + remoteCacheExporterFuncs := map[string]remotecache.ResolveCacheExporterFunc{ "registry": registryremotecache.ResolveCacheExporterFunc(sessionManager, resolverFn), "local": localremotecache.ResolveCacheExporterFunc(sessionManager), "inline": inlineremotecache.ResolveCacheExporterFunc(), } remoteCacheImporterFuncs := map[string]remotecache.ResolveCacheImporterFunc{ - "registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, resolverFn), + "registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn), "local": localremotecache.ResolveCacheImporterFunc(sessionManager), } return control.NewController(control.Opt{ diff --git a/util/pull/pull.go b/util/pull/pull.go index 5d203ab0..eaf8770e 100644 --- a/util/pull/pull.go +++ b/util/pull/pull.go @@ -13,6 +13,7 @@ import ( "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/reference" "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/remotes/docker/schema1" "github.com/containerd/containerd/rootfs" ctdsnapshot "github.com/containerd/containerd/snapshots" @@ -138,9 +139,15 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) { // Limit manifests pulled to the best match in an index childrenHandler = images.LimitManifests(childrenHandler, platform, 1) + dslHandler, err := docker.AppendDistributionSourceLabel(p.ContentStore, p.ref) + if err != nil { + stopProgress() + return nil, err + } handlers = append(handlers, remotes.FetchHandler(p.ContentStore, fetcher), childrenHandler, + dslHandler, ) } diff --git a/util/push/push.go b/util/push/push.go index 54dc5061..eda3a691 100644 --- a/util/push/push.go +++ b/util/push/push.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "sync" "time" @@ -41,7 +42,7 @@ func getCredentialsFunc(ctx context.Context, sm *session.Manager) func(string) ( } } -func Push(ctx context.Context, sm *session.Manager, cs content.Provider, dgst digest.Digest, ref string, insecure bool, rfn resolver.ResolveOptionsFunc, byDigest bool) error { +func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst digest.Digest, ref string, insecure bool, rfn resolver.ResolveOptionsFunc, byDigest bool) error { desc := ocispec.Descriptor{ Digest: dgst, } @@ -91,7 +92,7 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Provider, dgst di pushHandler := remotes.PushHandler(pusher, cs) handlers := append([]images.Handler{}, - childrenHandler(cs), + images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))), filterHandler, pushHandler, ) @@ -129,6 +130,46 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Provider, dgst di return nil } +func annotateDistributionSourceHandler(cs content.Store, f images.HandlerFunc) func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + children, err := f(ctx, desc) + if err != nil { + return nil, err + } + + // only add distribution source for the config or blob data descriptor + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest, + images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + default: + return children, nil + } + + for i := range children { + child := children[i] + + info, err := cs.Info(ctx, child.Digest) + if err != nil { + return nil, err + } + + for k, v := range info.Labels { + if !strings.HasPrefix(k, "containerd.io/distribution.source.") { + continue + } + + if child.Annotations == nil { + child.Annotations = map[string]string{} + } + child.Annotations[k] = v + } + + children[i] = child + } + return children, nil + } +} + func oneOffProgress(ctx context.Context, id string) func(err error) error { pw, _, _ := progress.FromContext(ctx) now := time.Now() diff --git a/worker/base/worker.go b/worker/base/worker.go index 0a540a9d..f2dbce56 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -14,6 +14,7 @@ import ( "github.com/containerd/containerd/diff" "github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/rootfs" cdsnapshot "github.com/containerd/containerd/snapshots" "github.com/docker/docker/pkg/idtools" @@ -177,6 +178,10 @@ func NewWorker(opt WorkerOpt) (*Worker, error) { }, nil } +func (w *Worker) ContentStore() content.Store { + return w.WorkerOpt.ContentStore +} + func (w *Worker) ID() string { return w.WorkerOpt.ID } @@ -327,7 +332,7 @@ func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, } func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) { - diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore, w.Snapshotter, w.Differ, ref, createIfNeeded) + diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore(), w.Snapshotter, w.Differ, ref, createIfNeeded) if err != nil { return nil, errors.Wrap(err, "failed calculating diff pairs for exported snapshot") } @@ -343,7 +348,7 @@ func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIf descs := make([]ocispec.Descriptor, len(diffPairs)) for i, dp := range diffPairs { - info, err := w.ContentStore.Info(ctx, dp.Blobsum) + info, err := w.ContentStore().Info(ctx, dp.Blobsum) if err != nil { return nil, err } @@ -366,7 +371,7 @@ func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIf return &solver.Remote{ Descriptors: descs, - Provider: w.ContentStore, + Provider: w.ContentStore(), }, nil } @@ -391,7 +396,18 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.I func(desc ocispec.Descriptor) { eg.Go(func() error { done := oneOffProgress(ctx, fmt.Sprintf("pulling %s", desc.Digest)) - return done(contentutil.Copy(gctx, w.ContentStore, remote.Provider, desc)) + if err := contentutil.Copy(gctx, w.ContentStore(), remote.Provider, desc); err != nil { + return done(err) + } + if ref, ok := desc.Annotations["containerd.io/distribution.source.ref"]; ok { + hf, err := docker.AppendDistributionSourceLabel(w.ContentStore(), ref) + if err != nil { + return done(err) + } + _, err = hf(ctx, desc) + return done(err) + } + return done(nil) }) }(desc) } diff --git a/worker/worker.go b/worker/worker.go index 38cc7db0..2c3c616e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -4,6 +4,7 @@ import ( "context" "io" + "github.com/containerd/containerd/content" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/client" "github.com/moby/buildkit/executor" @@ -34,6 +35,7 @@ type Worker interface { GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) PruneCacheMounts(ctx context.Context, ids []string) error + ContentStore() content.Store } // Pre-defined label keys