From f269d00f281b31ec28c5ef9ed75db8be1675409b Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 14 Jul 2021 19:21:08 -0700 Subject: [PATCH] new implementation for limiting tcp connections The previous implementation had many issues. E.g., on fetch, even if the data already existed and no remote connections were needed, the request would still be waiting in the queue. Or, if two fetches of the same blob happened together, they would take up two places in the queue although there was only one remote request. Signed-off-by: Tonis Tiigi --- cache/remotecache/registry/registry.go | 3 +- util/contentutil/copy.go | 6 +- util/imageutil/config.go | 3 +- util/pull/pull.go | 3 +- util/push/push.go | 4 +- util/resolver/limited/group.go | 181 +++++++++++++++++++++++++ util/resolver/retryhandler/retry.go | 28 +--- 7 files changed, 193 insertions(+), 35 deletions(-) create mode 100644 util/resolver/limited/group.go diff --git a/cache/remotecache/registry/registry.go b/cache/remotecache/registry/registry.go index a704693e..e4e4177b 100644 --- a/cache/remotecache/registry/registry.go +++ b/cache/remotecache/registry/registry.go @@ -11,6 +11,7 @@ import ( "github.com/moby/buildkit/session" "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/resolver" + "github.com/moby/buildkit/util/resolver/limited" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/image-spec/specs-go/v1" @@ -72,7 +73,7 @@ func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docke return nil, specs.Descriptor{}, err } src := &withDistributionSourceLabel{ - Provider: contentutil.FromFetcher(fetcher), + Provider: contentutil.FromFetcher(limited.Default.WrapFetcher(fetcher, ref)), ref: ref, source: cs, } diff --git a/util/contentutil/copy.go b/util/contentutil/copy.go index b4d68d73..12e6f8be 100644 --- a/util/contentutil/copy.go +++ b/util/contentutil/copy.go @@ -7,14 +7,14 @@ import ( 
"github.com/containerd/containerd/content" "github.com/containerd/containerd/images" - "github.com/containerd/containerd/remotes" + "github.com/moby/buildkit/util/resolver/limited" "github.com/moby/buildkit/util/resolver/retryhandler" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, ref string, logger func([]byte)) error { - if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), ref, logger)(ctx, desc); err != nil { + if _, err := retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ref), logger)(ctx, desc); err != nil { return err } return nil @@ -65,7 +65,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content. handlers := []images.Handler{ images.ChildrenHandler(provider), filterHandler, - retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), "", func(_ []byte) {}), + retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ""), func(_ []byte) {}), } if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { diff --git a/util/imageutil/config.go b/util/imageutil/config.go index f514c9de..fb60147b 100644 --- a/util/imageutil/config.go +++ b/util/imageutil/config.go @@ -14,6 +14,7 @@ import ( "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/moby/buildkit/util/leaseutil" + "github.com/moby/buildkit/util/resolver/limited" "github.com/moby/buildkit/util/resolver/retryhandler" digest "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" @@ -101,7 +102,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co children := childrenConfigHandler(cache, platform) handlers := []images.Handler{ - retryhandler.New(remotes.FetchHandler(cache, fetcher), str, func(_ 
[]byte) {}), + retryhandler.New(limited.FetchHandler(cache, fetcher, str), func(_ []byte) {}), children, } if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { diff --git a/util/pull/pull.go b/util/pull/pull.go index befdb8c0..c9dae4e5 100644 --- a/util/pull/pull.go +++ b/util/pull/pull.go @@ -18,6 +18,7 @@ import ( "github.com/moby/buildkit/util/progress/logs" "github.com/moby/buildkit/util/pull/pullprogress" "github.com/moby/buildkit/util/resolver" + "github.com/moby/buildkit/util/resolver/limited" "github.com/moby/buildkit/util/resolver/retryhandler" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -148,7 +149,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) { } handlers = append(handlers, filterLayerBlobs(metadata, &mu), - retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), p.ref, logs.LoggerFromContext(ctx)), + retryhandler.New(limited.FetchHandler(p.ContentStore, fetcher, p.ref), logs.LoggerFromContext(ctx)), childrenHandler, dslHandler, ) diff --git a/util/push/push.go b/util/push/push.go index 9cd1aaca..41bc6146 100644 --- a/util/push/push.go +++ b/util/push/push.go @@ -11,7 +11,6 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" - "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/docker/distribution/reference" "github.com/moby/buildkit/session" @@ -20,6 +19,7 @@ import ( "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/progress/logs" "github.com/moby/buildkit/util/resolver" + "github.com/moby/buildkit/util/resolver/limited" "github.com/moby/buildkit/util/resolver/retryhandler" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -86,7 +86,7 @@ func Push(ctx context.Context, sm *session.Manager, sid 
string, provider content } }) - pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), ref, logs.LoggerFromContext(ctx)) + pushHandler := retryhandler.New(limited.PushHandler(pusher, provider, ref), logs.LoggerFromContext(ctx)) pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref) if err != nil { return err diff --git a/util/resolver/limited/group.go b/util/resolver/limited/group.go new file mode 100644 index 00000000..c25de21a --- /dev/null +++ b/util/resolver/limited/group.go @@ -0,0 +1,181 @@ +package limited + +import ( + "context" + "io" + "runtime" + "sync" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/remotes" + "github.com/docker/distribution/reference" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" +) + +var Default = New(4) + +type Group struct { + mu sync.Mutex + size int + sem map[string]*semaphore.Weighted +} + +type req struct { + g *Group + ref string +} + +func (r *req) acquire(ctx context.Context) (func(), error) { + r.g.mu.Lock() + s, ok := r.g.sem[r.ref] + if !ok { + s = semaphore.NewWeighted(int64(r.g.size)) + r.g.sem[r.ref] = s + } + r.g.mu.Unlock() + if err := s.Acquire(ctx, 1); err != nil { + return nil, err + } + return func() { + s.Release(1) + }, nil +} + +func New(size int) *Group { + return &Group{ + size: size, + sem: make(map[string]*semaphore.Weighted), + } +} + +func (g *Group) req(ref string) *req { + return &req{g: g, ref: domain(ref)} +} + +func (g *Group) WrapFetcher(f remotes.Fetcher, ref string) remotes.Fetcher { + return &fetcher{Fetcher: f, req: g.req(ref)} +} + +func (g *Group) WrapPusher(p remotes.Pusher, ref string) remotes.Pusher { + return &pusher{Pusher: p, req: g.req(ref)} +} + +type pusher struct { + remotes.Pusher + req *req +} + +func (p *pusher) Push(ctx 
context.Context, desc ocispec.Descriptor) (content.Writer, error) { + release, err := p.req.acquire(ctx) + if err != nil { + return nil, err + } + w, err := p.Pusher.Push(ctx, desc) + if err != nil { + release() + return nil, err + } + ww := &writer{Writer: w} + closer := func() { + if !ww.closed { + logrus.Warnf("writer not closed cleanly: %s", desc.Digest) + } + release() + } + ww.release = closer + runtime.SetFinalizer(ww, func(rc *writer) { + rc.close() + }) + return ww, nil +} + +type writer struct { + content.Writer + once sync.Once + release func() + closed bool +} + +func (w *writer) Close() error { + w.closed = true + w.close() + return w.Writer.Close() +} + +func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + w.closed = true + w.close() + return w.Writer.Commit(ctx, size, expected, opts...) +} + +func (w *writer) close() { + w.once.Do(w.release) +} + +type fetcher struct { + remotes.Fetcher + req *req +} + +func (f *fetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) { + release, err := f.req.acquire(ctx) + if err != nil { + return nil, err + } + rc, err := f.Fetcher.Fetch(ctx, desc) + if err != nil { + release() + return nil, err + } + rcw := &readCloser{ReadCloser: rc} + closer := func() { + if !rcw.closed { + logrus.Warnf("fetcher not closed cleanly: %s", desc.Digest) + } + release() + } + rcw.release = closer + runtime.SetFinalizer(rcw, func(rc *readCloser) { + rc.close() + }) + + return rcw, nil +} + +type readCloser struct { + io.ReadCloser + once sync.Once + closed bool + release func() +} + +func (r *readCloser) Close() error { + r.closed = true + r.close() + return r.ReadCloser.Close() +} + +func (r *readCloser) close() { + r.once.Do(r.release) +} + +func FetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, ref string) images.HandlerFunc { + return remotes.FetchHandler(ingester, Default.WrapFetcher(fetcher, ref)) +} + +func PushHandler(pusher 
remotes.Pusher, provider content.Provider, ref string) images.HandlerFunc { + return remotes.PushHandler(Default.WrapPusher(pusher, ref), provider) +} + +func domain(ref string) string { + if ref != "" { + if named, err := reference.ParseNormalizedNamed(ref); err == nil { + return reference.Domain(named) + } + } + return ref +} diff --git a/util/resolver/retryhandler/retry.go b/util/resolver/retryhandler/retry.go index 5b461259..36cf6af9 100644 --- a/util/resolver/retryhandler/retry.go +++ b/util/resolver/retryhandler/retry.go @@ -5,43 +5,17 @@ import ( "fmt" "io" "net" - "sync" "syscall" "time" "github.com/containerd/containerd/images" remoteserrors "github.com/containerd/containerd/remotes/errors" - "github.com/docker/distribution/reference" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" - "golang.org/x/sync/semaphore" ) -var mu sync.Mutex -var sem = map[string]*semaphore.Weighted{} - -const connsPerHost = 4 - -func New(f images.HandlerFunc, ref string, logger func([]byte)) images.HandlerFunc { - if ref != "" { - if named, err := reference.ParseNormalizedNamed(ref); err == nil { - ref = reference.Domain(named) - } - } - +func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc { return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - mu.Lock() - s, ok := sem[ref] - if !ok { - s = semaphore.NewWeighted(connsPerHost) - sem[ref] = s - } - mu.Unlock() - if err := s.Acquire(ctx, 1); err != nil { - return nil, err - } - defer s.Release(1) - backoff := time.Second for { descs, err := f(ctx, desc)