251 lines
6.7 KiB
Go
251 lines
6.7 KiB
Go
package push
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/remotes"
|
|
"github.com/containerd/containerd/remotes/docker"
|
|
"github.com/docker/distribution/reference"
|
|
"github.com/moby/buildkit/session"
|
|
"github.com/moby/buildkit/util/imageutil"
|
|
"github.com/moby/buildkit/util/progress"
|
|
"github.com/moby/buildkit/util/resolver"
|
|
digest "github.com/opencontainers/go-digest"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst digest.Digest, ref string, insecure bool, hosts docker.RegistryHosts, byDigest bool) error {
|
|
desc := ocispec.Descriptor{
|
|
Digest: dgst,
|
|
}
|
|
parsed, err := reference.ParseNormalizedNamed(ref)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if byDigest && !reference.IsNameOnly(parsed) {
|
|
return errors.Errorf("can't push tagged ref %s by digest", parsed.String())
|
|
}
|
|
|
|
if byDigest {
|
|
ref = parsed.Name()
|
|
} else {
|
|
ref = reference.TagNameOnly(parsed).String()
|
|
}
|
|
|
|
resolver := resolver.New(ctx, hosts, sm)
|
|
|
|
pusher, err := resolver.Pusher(ctx, ref)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var m sync.Mutex
|
|
manifestStack := []ocispec.Descriptor{}
|
|
|
|
filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
|
|
images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
|
m.Lock()
|
|
manifestStack = append(manifestStack, desc)
|
|
m.Unlock()
|
|
return nil, images.ErrStopHandler
|
|
default:
|
|
return nil, nil
|
|
}
|
|
})
|
|
|
|
pushHandler := remotes.PushHandler(pusher, cs)
|
|
pushUpdateSourceHandler, err := updateDistributionSourceHandler(cs, pushHandler, ref)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
handlers := append([]images.Handler{},
|
|
images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))),
|
|
filterHandler,
|
|
pushUpdateSourceHandler,
|
|
)
|
|
|
|
ra, err := cs.ReaderAt(ctx, desc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mtype, err := imageutil.DetectManifestMediaType(ra)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
layersDone := oneOffProgress(ctx, "pushing layers")
|
|
err = images.Dispatch(ctx, images.Handlers(handlers...), nil, ocispec.Descriptor{
|
|
Digest: dgst,
|
|
Size: ra.Size(),
|
|
MediaType: mtype,
|
|
})
|
|
layersDone(err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mfstDone := oneOffProgress(ctx, fmt.Sprintf("pushing manifest for %s", ref))
|
|
for i := len(manifestStack) - 1; i >= 0; i-- {
|
|
_, err := pushHandler(ctx, manifestStack[i])
|
|
if err != nil {
|
|
mfstDone(err)
|
|
return err
|
|
}
|
|
}
|
|
mfstDone(nil)
|
|
return nil
|
|
}
|
|
|
|
func annotateDistributionSourceHandler(cs content.Store, f images.HandlerFunc) func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
children, err := f(ctx, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// only add distribution source for the config or blob data descriptor
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
|
|
images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
|
default:
|
|
return children, nil
|
|
}
|
|
|
|
for i := range children {
|
|
child := children[i]
|
|
|
|
info, err := cs.Info(ctx, child.Digest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for k, v := range info.Labels {
|
|
if !strings.HasPrefix(k, "containerd.io/distribution.source.") {
|
|
continue
|
|
}
|
|
|
|
if child.Annotations == nil {
|
|
child.Annotations = map[string]string{}
|
|
}
|
|
child.Annotations[k] = v
|
|
}
|
|
|
|
children[i] = child
|
|
}
|
|
return children, nil
|
|
}
|
|
}
|
|
|
|
func oneOffProgress(ctx context.Context, id string) func(err error) error {
|
|
pw, _, _ := progress.FromContext(ctx)
|
|
now := time.Now()
|
|
st := progress.Status{
|
|
Started: &now,
|
|
}
|
|
pw.Write(id, st)
|
|
return func(err error) error {
|
|
// TODO: set error on status
|
|
now := time.Now()
|
|
st.Completed = &now
|
|
pw.Write(id, st)
|
|
pw.Close()
|
|
return err
|
|
}
|
|
}
|
|
|
|
func childrenHandler(provider content.Provider) images.HandlerFunc {
|
|
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
var descs []ocispec.Descriptor
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
|
p, err := content.ReadBlob(ctx, provider, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO(stevvooe): We just assume oci manifest, for now. There may be
|
|
// subtle differences from the docker version.
|
|
var manifest ocispec.Manifest
|
|
if err := json.Unmarshal(p, &manifest); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
descs = append(descs, manifest.Config)
|
|
descs = append(descs, manifest.Layers...)
|
|
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
|
p, err := content.ReadBlob(ctx, provider, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var index ocispec.Index
|
|
if err := json.Unmarshal(p, &index); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, m := range index.Manifests {
|
|
if m.Digest != "" {
|
|
descs = append(descs, m)
|
|
}
|
|
}
|
|
case images.MediaTypeDockerSchema2Layer, images.MediaTypeDockerSchema2LayerGzip,
|
|
images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig,
|
|
ocispec.MediaTypeImageLayer, ocispec.MediaTypeImageLayerGzip:
|
|
// childless data types.
|
|
return nil, nil
|
|
default:
|
|
logrus.Warnf("encountered unknown type %v; children may not be fetched", desc.MediaType)
|
|
}
|
|
|
|
return descs, nil
|
|
}
|
|
}
|
|
|
|
// updateDistributionSourceHandler will update distribution source label after
|
|
// pushing layer successfully.
|
|
//
|
|
// FIXME(fuweid): There is race condition for current design of distribution
|
|
// source label if there are pull/push jobs consuming same layer.
|
|
func updateDistributionSourceHandler(cs content.Store, pushF images.HandlerFunc, ref string) (images.HandlerFunc, error) {
|
|
updateF, err := docker.AppendDistributionSourceLabel(cs, ref)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
var islayer bool
|
|
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Layer, images.MediaTypeDockerSchema2LayerGzip,
|
|
ocispec.MediaTypeImageLayer, ocispec.MediaTypeImageLayerGzip:
|
|
islayer = true
|
|
}
|
|
|
|
children, err := pushF(ctx, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// update distribution source to layer
|
|
if islayer {
|
|
if _, err := updateF(ctx, desc); err != nil {
|
|
logrus.Warnf("failed to update distribution source for layer %v: %v", desc.Digest, err)
|
|
}
|
|
}
|
|
return children, nil
|
|
}), nil
|
|
}
|