diff --git a/util/resolver/limited/group.go b/util/resolver/limited/group.go index 6c0ba3a9..7fdd947a 100644 --- a/util/resolver/limited/group.go +++ b/util/resolver/limited/group.go @@ -11,12 +11,15 @@ import ( "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/docker/distribution/reference" - digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" "golang.org/x/sync/semaphore" ) +type contextKeyT string + +var contextKey = contextKeyT("buildkit/util/resolver/limited") + var Default = New(4) type Group struct { @@ -30,7 +33,13 @@ type req struct { ref string } -func (r *req) acquire(ctx context.Context, desc ocispecs.Descriptor) (func(), error) { +func (r *req) acquire(ctx context.Context, desc ocispecs.Descriptor) (context.Context, func(), error) { + if v := ctx.Value(contextKey); v != nil { + return ctx, func() {}, nil + } + + ctx = context.WithValue(ctx, contextKey, struct{}{}) + // json request get one additional connection highPriority := strings.HasSuffix(desc.MediaType, "+json") @@ -46,16 +55,16 @@ func (r *req) acquire(ctx context.Context, desc ocispecs.Descriptor) (func(), er r.g.mu.Unlock() if !highPriority { if err := s[0].Acquire(ctx, 1); err != nil { - return nil, err + return ctx, nil, err } } if err := s[1].Acquire(ctx, 1); err != nil { if !highPriority { s[0].Release(1) } - return nil, err + return ctx, nil, err } - return func() { + return ctx, func() { s[1].Release(1) if !highPriority { s[0].Release(1) @@ -78,60 +87,17 @@ func (g *Group) WrapFetcher(f remotes.Fetcher, ref string) remotes.Fetcher { return &fetcher{Fetcher: f, req: g.req(ref)} } -func (g *Group) WrapPusher(p remotes.Pusher, ref string) remotes.Pusher { - return &pusher{Pusher: p, req: g.req(ref)} -} - -type pusher struct { - remotes.Pusher - req *req -} - -func (p *pusher) Push(ctx context.Context, desc ocispecs.Descriptor) (content.Writer, error) { - release, err := p.req.acquire(ctx, desc) - if err != nil { - return nil, err - } - w, err := p.Pusher.Push(ctx, desc) - if err != nil { - release() - return nil, err - } - ww := &writer{Writer: w} - closer := func() { - if !ww.closed { - logrus.Warnf("writer not closed cleanly: %s", desc.Digest) +func (g *Group) PushHandler(pusher remotes.Pusher, provider content.Provider, ref string) images.HandlerFunc { + ph := remotes.PushHandler(pusher, provider) + req := g.req(ref) + return func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) { + ctx, release, err := req.acquire(ctx, desc) + if err != nil { + return nil, err } - release() + defer release() + return ph(ctx, desc) } - ww.release = closer - runtime.SetFinalizer(ww, func(rc *writer) { - rc.close() - }) - return ww, nil -} - -type writer struct { - content.Writer - once sync.Once - release func() - closed bool -} - -func (w *writer) Close() error { - w.closed = true - w.close() - return w.Writer.Close() -} - -func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { - w.closed = true - w.close() - return w.Writer.Commit(ctx, size, expected, opts...) -} - -func (w *writer) close() { - w.once.Do(w.release) } type fetcher struct { @@ -140,7 +106,7 @@ type fetcher struct { } func (f *fetcher) Fetch(ctx context.Context, desc ocispecs.Descriptor) (io.ReadCloser, error) { - release, err := f.req.acquire(ctx, desc) + ctx, release, err := f.req.acquire(ctx, desc) if err != nil { return nil, err } @@ -196,7 +162,7 @@ func FetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, ref string } func PushHandler(pusher remotes.Pusher, provider content.Provider, ref string) images.HandlerFunc { - return remotes.PushHandler(Default.WrapPusher(pusher, ref), provider) + return Default.PushHandler(pusher, provider, ref) } func domain(ref string) string {