348 lines
10 KiB
Go
348 lines
10 KiB
Go
package push
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/remotes"
|
|
"github.com/containerd/containerd/remotes/docker"
|
|
"github.com/docker/distribution/reference"
|
|
"github.com/moby/buildkit/session"
|
|
"github.com/moby/buildkit/util/flightcontrol"
|
|
"github.com/moby/buildkit/util/imageutil"
|
|
"github.com/moby/buildkit/util/progress"
|
|
"github.com/moby/buildkit/util/progress/logs"
|
|
"github.com/moby/buildkit/util/resolver"
|
|
resolverconfig "github.com/moby/buildkit/util/resolver/config"
|
|
"github.com/moby/buildkit/util/resolver/limited"
|
|
"github.com/moby/buildkit/util/resolver/retryhandler"
|
|
digest "github.com/opencontainers/go-digest"
|
|
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type pusher struct {
|
|
remotes.Pusher
|
|
}
|
|
|
|
// Pusher creates and new pusher instance for resolver
|
|
// containerd resolver.Pusher() method is broken and should not be called directly
|
|
// we need to wrap to mask interface detection
|
|
func Pusher(ctx context.Context, resolver remotes.Resolver, ref string) (remotes.Pusher, error) {
|
|
p, err := resolver.Pusher(ctx, ref)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &pusher{Pusher: p}, nil
|
|
}
|
|
|
|
func Push(ctx context.Context, sm *session.Manager, sid string, provider content.Provider, manager content.Manager, dgst digest.Digest, ref string, insecure bool, hosts docker.RegistryHosts, byDigest bool, annotations map[digest.Digest]map[string]string) error {
|
|
desc := ocispecs.Descriptor{
|
|
Digest: dgst,
|
|
}
|
|
parsed, err := reference.ParseNormalizedNamed(ref)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if byDigest && !reference.IsNameOnly(parsed) {
|
|
return errors.Errorf("can't push tagged ref %s by digest", parsed.String())
|
|
}
|
|
|
|
if byDigest {
|
|
ref = parsed.Name()
|
|
} else {
|
|
// add digest to ref, this is what containderd uses to choose root manifest from all manifests
|
|
r, err := reference.WithDigest(reference.TagNameOnly(parsed), dgst)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to combine ref %s with digest %s", ref, dgst)
|
|
}
|
|
ref = r.String()
|
|
}
|
|
|
|
scope := "push"
|
|
if insecure {
|
|
insecureTrue := true
|
|
httpTrue := true
|
|
hosts = resolver.NewRegistryConfig(map[string]resolverconfig.RegistryConfig{
|
|
reference.Domain(parsed): {
|
|
Insecure: &insecureTrue,
|
|
PlainHTTP: &httpTrue,
|
|
},
|
|
})
|
|
scope += ":insecure"
|
|
}
|
|
|
|
resolver := resolver.DefaultPool.GetResolver(hosts, ref, scope, sm, session.NewGroup(sid))
|
|
|
|
pusher, err := Pusher(ctx, resolver, ref)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var m sync.Mutex
|
|
manifestStack := []ocispecs.Descriptor{}
|
|
|
|
filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Manifest, ocispecs.MediaTypeImageManifest,
|
|
images.MediaTypeDockerSchema2ManifestList, ocispecs.MediaTypeImageIndex:
|
|
m.Lock()
|
|
manifestStack = append(manifestStack, desc)
|
|
m.Unlock()
|
|
return nil, images.ErrStopHandler
|
|
default:
|
|
return nil, nil
|
|
}
|
|
})
|
|
|
|
pushHandler := retryhandler.New(limited.PushHandler(pusher, provider, ref), logs.LoggerFromContext(ctx))
|
|
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
handlers := append([]images.Handler{},
|
|
images.HandlerFunc(annotateDistributionSourceHandler(manager, annotations, childrenHandler(provider))),
|
|
filterHandler,
|
|
dedupeHandler(pushUpdateSourceHandler),
|
|
)
|
|
|
|
ra, err := provider.ReaderAt(ctx, desc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mtype, err := imageutil.DetectManifestMediaType(ra)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
layersDone := oneOffProgress(ctx, "pushing layers")
|
|
err = images.Dispatch(ctx, skipNonDistributableBlobs(images.Handlers(handlers...)), nil, ocispecs.Descriptor{
|
|
Digest: dgst,
|
|
Size: ra.Size(),
|
|
MediaType: mtype,
|
|
})
|
|
if err := layersDone(err); err != nil {
|
|
return err
|
|
}
|
|
|
|
mfstDone := oneOffProgress(ctx, fmt.Sprintf("pushing manifest for %s", ref))
|
|
for i := len(manifestStack) - 1; i >= 0; i-- {
|
|
if _, err := pushHandler(ctx, manifestStack[i]); err != nil {
|
|
return mfstDone(err)
|
|
}
|
|
}
|
|
return mfstDone(nil)
|
|
}
|
|
|
|
// TODO: the containerd function for this is filtering too much, that needs to be fixed.
|
|
// For now we just carry this.
|
|
func skipNonDistributableBlobs(f images.HandlerFunc) images.HandlerFunc {
|
|
return func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
|
|
if images.IsNonDistributable(desc.MediaType) {
|
|
log.G(ctx).WithField("digest", desc.Digest).WithField("mediatype", desc.MediaType).Debug("Skipping non-distributable blob")
|
|
return nil, images.ErrSkipDesc
|
|
}
|
|
return f(ctx, desc)
|
|
}
|
|
}
|
|
|
|
func annotateDistributionSourceHandler(manager content.Manager, annotations map[digest.Digest]map[string]string, f images.HandlerFunc) func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
|
|
return func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
|
|
children, err := f(ctx, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// only add distribution source for the config or blob data descriptor
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Manifest, ocispecs.MediaTypeImageManifest,
|
|
images.MediaTypeDockerSchema2ManifestList, ocispecs.MediaTypeImageIndex:
|
|
default:
|
|
return children, nil
|
|
}
|
|
|
|
for i := range children {
|
|
child := children[i]
|
|
|
|
if m, ok := annotations[child.Digest]; ok {
|
|
for k, v := range m {
|
|
if !strings.HasPrefix(k, "containerd.io/distribution.source.") {
|
|
continue
|
|
}
|
|
if child.Annotations == nil {
|
|
child.Annotations = map[string]string{}
|
|
}
|
|
child.Annotations[k] = v
|
|
}
|
|
}
|
|
children[i] = child
|
|
|
|
info, err := manager.Info(ctx, child.Digest)
|
|
if errors.Is(err, errdefs.ErrNotFound) {
|
|
continue
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for k, v := range info.Labels {
|
|
if !strings.HasPrefix(k, "containerd.io/distribution.source.") {
|
|
continue
|
|
}
|
|
|
|
if child.Annotations == nil {
|
|
child.Annotations = map[string]string{}
|
|
}
|
|
child.Annotations[k] = v
|
|
}
|
|
|
|
children[i] = child
|
|
}
|
|
return children, nil
|
|
}
|
|
}
|
|
|
|
func oneOffProgress(ctx context.Context, id string) func(err error) error {
|
|
pw, _, _ := progress.NewFromContext(ctx)
|
|
now := time.Now()
|
|
st := progress.Status{
|
|
Started: &now,
|
|
}
|
|
pw.Write(id, st)
|
|
return func(err error) error {
|
|
// TODO: set error on status
|
|
now := time.Now()
|
|
st.Completed = &now
|
|
pw.Write(id, st)
|
|
pw.Close()
|
|
return err
|
|
}
|
|
}
|
|
|
|
func childrenHandler(provider content.Provider) images.HandlerFunc {
|
|
return func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
|
|
var descs []ocispecs.Descriptor
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Manifest, ocispecs.MediaTypeImageManifest:
|
|
p, err := content.ReadBlob(ctx, provider, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO(stevvooe): We just assume oci manifest, for now. There may be
|
|
// subtle differences from the docker version.
|
|
var manifest ocispecs.Manifest
|
|
if err := json.Unmarshal(p, &manifest); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
descs = append(descs, manifest.Config)
|
|
descs = append(descs, manifest.Layers...)
|
|
case images.MediaTypeDockerSchema2ManifestList, ocispecs.MediaTypeImageIndex:
|
|
p, err := content.ReadBlob(ctx, provider, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var index ocispecs.Index
|
|
if err := json.Unmarshal(p, &index); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, m := range index.Manifests {
|
|
if m.Digest != "" {
|
|
descs = append(descs, m)
|
|
}
|
|
}
|
|
case images.MediaTypeDockerSchema2Layer, images.MediaTypeDockerSchema2LayerGzip,
|
|
images.MediaTypeDockerSchema2Config, ocispecs.MediaTypeImageConfig,
|
|
ocispecs.MediaTypeImageLayer, ocispecs.MediaTypeImageLayerGzip:
|
|
// childless data types.
|
|
return nil, nil
|
|
default:
|
|
logrus.Warnf("encountered unknown type %v; children may not be fetched", desc.MediaType)
|
|
}
|
|
|
|
return descs, nil
|
|
}
|
|
}
|
|
|
|
// updateDistributionSourceHandler will update distribution source label after
|
|
// pushing layer successfully.
|
|
//
|
|
// FIXME(fuweid): There is race condition for current design of distribution
|
|
// source label if there are pull/push jobs consuming same layer.
|
|
func updateDistributionSourceHandler(manager content.Manager, pushF images.HandlerFunc, ref string) (images.HandlerFunc, error) {
|
|
updateF, err := docker.AppendDistributionSourceLabel(manager, ref)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return images.HandlerFunc(func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
|
|
var islayer bool
|
|
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Layer, images.MediaTypeDockerSchema2LayerGzip,
|
|
ocispecs.MediaTypeImageLayer, ocispecs.MediaTypeImageLayerGzip:
|
|
islayer = true
|
|
}
|
|
|
|
children, err := pushF(ctx, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// update distribution source to layer
|
|
if islayer {
|
|
if _, err := updateF(ctx, desc); err != nil {
|
|
logrus.Warnf("failed to update distribution source for layer %v: %v", desc.Digest, err)
|
|
}
|
|
}
|
|
return children, nil
|
|
}), nil
|
|
}
|
|
|
|
func dedupeHandler(h images.HandlerFunc) images.HandlerFunc {
|
|
var g flightcontrol.Group
|
|
res := map[digest.Digest][]ocispecs.Descriptor{}
|
|
var mu sync.Mutex
|
|
|
|
return images.HandlerFunc(func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
|
|
res, err := g.Do(ctx, desc.Digest.String(), func(ctx context.Context) (interface{}, error) {
|
|
mu.Lock()
|
|
if r, ok := res[desc.Digest]; ok {
|
|
mu.Unlock()
|
|
return r, nil
|
|
}
|
|
mu.Unlock()
|
|
|
|
children, err := h(ctx, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mu.Lock()
|
|
res[desc.Digest] = children
|
|
mu.Unlock()
|
|
return children, nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if res == nil {
|
|
return nil, nil
|
|
}
|
|
return res.([]ocispecs.Descriptor), nil
|
|
})
|
|
}
|