Merge pull request #1147 from tonistiigi/cross-repo-push

add cross-repo push support
v0.7
Tõnis Tiigi 2019-08-27 10:48:43 -07:00 committed by GitHub
commit df7e93a159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 135 additions and 9 deletions

View File

@ -27,6 +27,11 @@ type Importer interface {
Resolve(ctx context.Context, desc ocispec.Descriptor, id string, w worker.Worker) (solver.CacheManager, error)
}
type DistributionSourceLabelSetter interface {
SetDistributionSourceLabel(context.Context, digest.Digest) error
SetDistributionSourceAnnotation(desc ocispec.Descriptor) ocispec.Descriptor
}
func NewImporter(provider content.Provider) Importer {
return &contentCacheImporter{provider: provider}
}
@ -61,6 +66,15 @@ func (ci *contentCacheImporter) Resolve(ctx context.Context, desc ocispec.Descri
}
}
if dsls, ok := ci.provider.(DistributionSourceLabelSetter); ok {
for dgst, l := range allLayers {
err := dsls.SetDistributionSourceLabel(ctx, dgst)
_ = err // error ignored because layer may not exist
l.Descriptor = dsls.SetDistributionSourceAnnotation(l.Descriptor)
allLayers[dgst] = l
}
}
if configDesc.Digest == "" {
return ci.importInlineCache(ctx, dt, id, w)
}
@ -127,6 +141,14 @@ func (ci *contentCacheImporter) importInlineCache(ctx context.Context, dt []byte
return nil
}
if dsls, ok := ci.provider.(DistributionSourceLabelSetter); ok {
for i, l := range m.Layers {
err := dsls.SetDistributionSourceLabel(ctx, l.Digest)
_ = err // error ignored because layer may not exist
m.Layers[i] = dsls.SetDistributionSourceAnnotation(l)
}
}
p, err := content.ReadBlob(ctx, ci.provider, m.Config)
if err != nil {
return errors.WithStack(err)

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
@ -12,6 +13,8 @@ import (
"github.com/moby/buildkit/session/auth"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/resolver"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
@ -46,7 +49,7 @@ func ResolveCacheExporterFunc(sm *session.Manager, resolverOpt resolver.ResolveO
}
}
func ResolveCacheImporterFunc(sm *session.Manager, resolverOpt resolver.ResolveOptionsFunc) remotecache.ResolveCacheImporterFunc {
func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, resolverOpt resolver.ResolveOptionsFunc) remotecache.ResolveCacheImporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
ref, err := canonicalizeRef(attrs[attrRef])
if err != nil {
@ -61,8 +64,38 @@ func ResolveCacheImporterFunc(sm *session.Manager, resolverOpt resolver.ResolveO
if err != nil {
return nil, specs.Descriptor{}, err
}
return remotecache.NewImporter(contentutil.FromFetcher(fetcher)), desc, nil
src := &withDistributionSourceLabel{
Provider: contentutil.FromFetcher(fetcher),
ref: ref,
source: cs,
}
return remotecache.NewImporter(src), desc, nil
}
}
type withDistributionSourceLabel struct {
content.Provider
ref string
source content.Manager
}
var _ remotecache.DistributionSourceLabelSetter = &withDistributionSourceLabel{}
func (dsl *withDistributionSourceLabel) SetDistributionSourceLabel(ctx context.Context, dgst digest.Digest) error {
hf, err := docker.AppendDistributionSourceLabel(dsl.source, dsl.ref)
if err != nil {
return err
}
_, err = hf(ctx, ocispec.Descriptor{Digest: dgst})
return err
}
func (dsl *withDistributionSourceLabel) SetDistributionSourceAnnotation(desc ocispec.Descriptor) ocispec.Descriptor {
if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}
desc.Annotations["containerd.io/distribution.source.ref"] = dsl.ref
return desc
}
func newRemoteResolver(ctx context.Context, resolverOpt resolver.ResolveOptionsFunc, sm *session.Manager, ref string) remotes.Resolver {

View File

@ -591,13 +591,18 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
resolverFn := resolverFunc(cfg)
w, err := wc.GetDefault()
if err != nil {
return nil, err
}
remoteCacheExporterFuncs := map[string]remotecache.ResolveCacheExporterFunc{
"registry": registryremotecache.ResolveCacheExporterFunc(sessionManager, resolverFn),
"local": localremotecache.ResolveCacheExporterFunc(sessionManager),
"inline": inlineremotecache.ResolveCacheExporterFunc(),
}
remoteCacheImporterFuncs := map[string]remotecache.ResolveCacheImporterFunc{
"registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, resolverFn),
"registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn),
"local": localremotecache.ResolveCacheImporterFunc(sessionManager),
}
return control.NewController(control.Opt{

View File

@ -13,6 +13,7 @@ import (
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
"github.com/containerd/containerd/rootfs"
ctdsnapshot "github.com/containerd/containerd/snapshots"
@ -138,9 +139,15 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
// Limit manifests pulled to the best match in an index
childrenHandler = images.LimitManifests(childrenHandler, platform, 1)
dslHandler, err := docker.AppendDistributionSourceLabel(p.ContentStore, p.ref)
if err != nil {
stopProgress()
return nil, err
}
handlers = append(handlers,
remotes.FetchHandler(p.ContentStore, fetcher),
childrenHandler,
dslHandler,
)
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
@ -41,7 +42,7 @@ func getCredentialsFunc(ctx context.Context, sm *session.Manager) func(string) (
}
}
func Push(ctx context.Context, sm *session.Manager, cs content.Provider, dgst digest.Digest, ref string, insecure bool, rfn resolver.ResolveOptionsFunc, byDigest bool) error {
func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst digest.Digest, ref string, insecure bool, rfn resolver.ResolveOptionsFunc, byDigest bool) error {
desc := ocispec.Descriptor{
Digest: dgst,
}
@ -91,7 +92,7 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Provider, dgst di
pushHandler := remotes.PushHandler(pusher, cs)
handlers := append([]images.Handler{},
childrenHandler(cs),
images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))),
filterHandler,
pushHandler,
)
@ -129,6 +130,46 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Provider, dgst di
return nil
}
func annotateDistributionSourceHandler(cs content.Store, f images.HandlerFunc) func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
children, err := f(ctx, desc)
if err != nil {
return nil, err
}
// only add distribution source for the config or blob data descriptor
switch desc.MediaType {
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
default:
return children, nil
}
for i := range children {
child := children[i]
info, err := cs.Info(ctx, child.Digest)
if err != nil {
return nil, err
}
for k, v := range info.Labels {
if !strings.HasPrefix(k, "containerd.io/distribution.source.") {
continue
}
if child.Annotations == nil {
child.Annotations = map[string]string{}
}
child.Annotations[k] = v
}
children[i] = child
}
return children, nil
}
}
func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.FromContext(ctx)
now := time.Now()

View File

@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/rootfs"
cdsnapshot "github.com/containerd/containerd/snapshots"
"github.com/docker/docker/pkg/idtools"
@ -177,6 +178,10 @@ func NewWorker(opt WorkerOpt) (*Worker, error) {
}, nil
}
func (w *Worker) ContentStore() content.Store {
return w.WorkerOpt.ContentStore
}
func (w *Worker) ID() string {
return w.WorkerOpt.ID
}
@ -327,7 +332,7 @@ func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter,
}
func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) {
diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore, w.Snapshotter, w.Differ, ref, createIfNeeded)
diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore(), w.Snapshotter, w.Differ, ref, createIfNeeded)
if err != nil {
return nil, errors.Wrap(err, "failed calculating diff pairs for exported snapshot")
}
@ -343,7 +348,7 @@ func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIf
descs := make([]ocispec.Descriptor, len(diffPairs))
for i, dp := range diffPairs {
info, err := w.ContentStore.Info(ctx, dp.Blobsum)
info, err := w.ContentStore().Info(ctx, dp.Blobsum)
if err != nil {
return nil, err
}
@ -366,7 +371,7 @@ func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIf
return &solver.Remote{
Descriptors: descs,
Provider: w.ContentStore,
Provider: w.ContentStore(),
}, nil
}
@ -391,7 +396,18 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.I
func(desc ocispec.Descriptor) {
eg.Go(func() error {
done := oneOffProgress(ctx, fmt.Sprintf("pulling %s", desc.Digest))
return done(contentutil.Copy(gctx, w.ContentStore, remote.Provider, desc))
if err := contentutil.Copy(gctx, w.ContentStore(), remote.Provider, desc); err != nil {
return done(err)
}
if ref, ok := desc.Annotations["containerd.io/distribution.source.ref"]; ok {
hf, err := docker.AppendDistributionSourceLabel(w.ContentStore(), ref)
if err != nil {
return done(err)
}
_, err = hf(ctx, desc)
return done(err)
}
return done(nil)
})
}(desc)
}

View File

@ -4,6 +4,7 @@ import (
"context"
"io"
"github.com/containerd/containerd/content"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/executor"
@ -34,6 +35,7 @@ type Worker interface {
GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error)
FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error)
PruneCacheMounts(ctx context.Context, ids []string) error
ContentStore() content.Store
}
// Pre-defined label keys