diff --git a/util/contentutil/pusher.go b/util/contentutil/pusher.go index ab88128a..693dcfea 100644 --- a/util/contentutil/pusher.go +++ b/util/contentutil/pusher.go @@ -2,21 +2,34 @@ package contentutil import ( "context" + "runtime" + "sync" + "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/remotes" + digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) func FromPusher(p remotes.Pusher) content.Ingester { + var mu sync.Mutex + c := sync.NewCond(&mu) return &pushingIngester{ - p: p, + mu: &mu, + c: c, + p: p, + active: map[digest.Digest]struct{}{}, } } type pushingIngester struct { p remotes.Pusher + + mu *sync.Mutex + c *sync.Cond + active map[digest.Digest]struct{} } // Writer implements content.Ingester. desc.MediaType must be set for manifest blobs. @@ -30,20 +43,55 @@ func (i *pushingIngester) Writer(ctx context.Context, opts ...content.WriterOpt) if wOpts.Ref == "" { return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty") } + + st := time.Now() + + i.mu.Lock() + for { + if time.Since(st) > time.Hour { + i.mu.Unlock() + return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %v locked", wOpts.Desc.Digest) + } + if _, ok := i.active[wOpts.Desc.Digest]; ok { + i.c.Wait() + } else { + break + } + } + + i.active[wOpts.Desc.Digest] = struct{}{} + i.mu.Unlock() + + var once sync.Once + release := func() { + once.Do(func() { + i.mu.Lock() + delete(i.active, wOpts.Desc.Digest) + i.c.Broadcast() + i.mu.Unlock() + }) + } + // pusher requires desc.MediaType to determine the PUT URL, especially for manifest blobs. contentWriter, err := i.p.Push(ctx, wOpts.Desc) if err != nil { + release() return nil, err } + runtime.SetFinalizer(contentWriter, func(_ content.Writer) { + release() + }) return &writer{ Writer: contentWriter, contentWriterRef: wOpts.Ref, + release: release, }, nil } type writer struct { content.Writer // returned from pusher.Push contentWriterRef string // ref passed for Writer() + release func() } func (w *writer) Status() (content.Status, error) { @@ -56,3 +104,19 @@ func (w *writer) Status() (content.Status, error) { } return st, nil } + +func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + err := w.Writer.Commit(ctx, size, expected, opts...) + if w.release != nil { + w.release() + } + return err +} + +func (w *writer) Close() error { + err := w.Writer.Close() + if w.release != nil { + w.release() + } + return err +} diff --git a/util/push/push.go b/util/push/push.go index 428010c3..b8d372d7 100644 --- a/util/push/push.go +++ b/util/push/push.go @@ -14,6 +14,7 @@ import ( "github.com/containerd/containerd/remotes/docker" "github.com/docker/distribution/reference" "github.com/moby/buildkit/session" + "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/imageutil" "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/resolver" @@ -73,7 +74,7 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst diges handlers := append([]images.Handler{}, images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))), filterHandler, - pushUpdateSourceHandler, + dedupeHandler(pushUpdateSourceHandler), ) ra, err := cs.ReaderAt(ctx, desc) @@ -248,3 +249,37 @@ func updateDistributionSourceHandler(cs content.Store, pushF images.HandlerFunc, return children, nil }), nil } + +func dedupeHandler(h images.HandlerFunc) images.HandlerFunc { + var g flightcontrol.Group + res := map[digest.Digest][]ocispec.Descriptor{} + var mu sync.Mutex + + return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + res, err := g.Do(ctx, desc.Digest.String(), func(ctx context.Context) (interface{}, error) { + mu.Lock() + if r, ok := res[desc.Digest]; ok { + mu.Unlock() + return r, nil + } + mu.Unlock() + + children, err := h(ctx, desc) + if err != nil { + return nil, err + } + + mu.Lock() + res[desc.Digest] = children + mu.Unlock() + return children, nil + }) + if err != nil { + return nil, err + } + if res == nil { + return nil, nil + } + return res.([]ocispec.Descriptor), nil + }) +}