Merge pull request #1548 from tonistiigi/push-fix

push: dedupe push handlers
v0.8
Akihiro Suda 2020-06-30 12:49:37 +09:00 committed by GitHub
commit c25f2217dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 101 additions and 2 deletions

View File

@ -2,21 +2,34 @@ package contentutil
import ( import (
"context" "context"
"runtime"
"sync"
"time"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
func FromPusher(p remotes.Pusher) content.Ingester { func FromPusher(p remotes.Pusher) content.Ingester {
var mu sync.Mutex
c := sync.NewCond(&mu)
return &pushingIngester{ return &pushingIngester{
mu: &mu,
c: c,
p: p, p: p,
active: map[digest.Digest]struct{}{},
} }
} }
type pushingIngester struct { type pushingIngester struct {
p remotes.Pusher p remotes.Pusher
mu *sync.Mutex
c *sync.Cond
active map[digest.Digest]struct{}
} }
// Writer implements content.Ingester. desc.MediaType must be set for manifest blobs. // Writer implements content.Ingester. desc.MediaType must be set for manifest blobs.
@ -30,20 +43,55 @@ func (i *pushingIngester) Writer(ctx context.Context, opts ...content.WriterOpt)
if wOpts.Ref == "" { if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty") return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
} }
st := time.Now()
i.mu.Lock()
for {
if time.Since(st) > time.Hour {
i.mu.Unlock()
return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %v locked", wOpts.Desc.Digest)
}
if _, ok := i.active[wOpts.Desc.Digest]; ok {
i.c.Wait()
} else {
break
}
}
i.active[wOpts.Desc.Digest] = struct{}{}
i.mu.Unlock()
var once sync.Once
release := func() {
once.Do(func() {
i.mu.Lock()
delete(i.active, wOpts.Desc.Digest)
i.c.Broadcast()
i.mu.Unlock()
})
}
// pusher requires desc.MediaType to determine the PUT URL, especially for manifest blobs. // pusher requires desc.MediaType to determine the PUT URL, especially for manifest blobs.
contentWriter, err := i.p.Push(ctx, wOpts.Desc) contentWriter, err := i.p.Push(ctx, wOpts.Desc)
if err != nil { if err != nil {
release()
return nil, err return nil, err
} }
runtime.SetFinalizer(contentWriter, func(_ content.Writer) {
release()
})
return &writer{ return &writer{
Writer: contentWriter, Writer: contentWriter,
contentWriterRef: wOpts.Ref, contentWriterRef: wOpts.Ref,
release: release,
}, nil }, nil
} }
type writer struct { type writer struct {
content.Writer // returned from pusher.Push content.Writer // returned from pusher.Push
contentWriterRef string // ref passed for Writer() contentWriterRef string // ref passed for Writer()
release func()
} }
func (w *writer) Status() (content.Status, error) { func (w *writer) Status() (content.Status, error) {
@ -56,3 +104,19 @@ func (w *writer) Status() (content.Status, error) {
} }
return st, nil return st, nil
} }
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
err := w.Writer.Commit(ctx, size, expected, opts...)
if w.release != nil {
w.release()
}
return err
}
func (w *writer) Close() error {
err := w.Writer.Close()
if w.release != nil {
w.release()
}
return err
}

View File

@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference" "github.com/docker/distribution/reference"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/imageutil" "github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/resolver" "github.com/moby/buildkit/util/resolver"
@ -73,7 +74,7 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst diges
handlers := append([]images.Handler{}, handlers := append([]images.Handler{},
images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))), images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))),
filterHandler, filterHandler,
pushUpdateSourceHandler, dedupeHandler(pushUpdateSourceHandler),
) )
ra, err := cs.ReaderAt(ctx, desc) ra, err := cs.ReaderAt(ctx, desc)
@ -248,3 +249,37 @@ func updateDistributionSourceHandler(cs content.Store, pushF images.HandlerFunc,
return children, nil return children, nil
}), nil }), nil
} }
func dedupeHandler(h images.HandlerFunc) images.HandlerFunc {
var g flightcontrol.Group
res := map[digest.Digest][]ocispec.Descriptor{}
var mu sync.Mutex
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
res, err := g.Do(ctx, desc.Digest.String(), func(ctx context.Context) (interface{}, error) {
mu.Lock()
if r, ok := res[desc.Digest]; ok {
mu.Unlock()
return r, nil
}
mu.Unlock()
children, err := h(ctx, desc)
if err != nil {
return nil, err
}
mu.Lock()
res[desc.Digest] = children
mu.Unlock()
return children, nil
})
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return res.([]ocispec.Descriptor), nil
})
}