new implementation for limiting tcp connections

The previous implementation had many issues. Eg. on fetch, even if
the data already existed and no remote connections were needed
the request would still be waiting in the queue. Or if two fetches
of same blob happened together they would take up two places in queue
although there was only one remote request.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
v0.9
Tonis Tiigi 2021-07-14 19:21:08 -07:00
parent a1818323d4
commit f269d00f28
7 changed files with 193 additions and 35 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/image-spec/specs-go/v1"
@ -72,7 +73,7 @@ func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docke
return nil, specs.Descriptor{}, err
}
src := &withDistributionSourceLabel{
Provider: contentutil.FromFetcher(fetcher),
Provider: contentutil.FromFetcher(limited.Default.WrapFetcher(fetcher, ref)),
ref: ref,
source: cs,
}

View File

@ -7,14 +7,14 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, ref string, logger func([]byte)) error {
if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), ref, logger)(ctx, desc); err != nil {
if _, err := retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ref), logger)(ctx, desc); err != nil {
return err
}
return nil
@ -65,7 +65,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content.
handlers := []images.Handler{
images.ChildrenHandler(provider),
filterHandler,
retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), "", func(_ []byte) {}),
retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ""), func(_ []byte) {}),
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {

View File

@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
@ -101,7 +102,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
children := childrenConfigHandler(cache, platform)
handlers := []images.Handler{
retryhandler.New(remotes.FetchHandler(cache, fetcher), str, func(_ []byte) {}),
retryhandler.New(limited.FetchHandler(cache, fetcher, str), func(_ []byte) {}),
children,
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {

View File

@ -18,6 +18,7 @@ import (
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/pull/pullprogress"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -148,7 +149,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
}
handlers = append(handlers,
filterLayerBlobs(metadata, &mu),
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), p.ref, logs.LoggerFromContext(ctx)),
retryhandler.New(limited.FetchHandler(p.ContentStore, fetcher, p.ref), logs.LoggerFromContext(ctx)),
childrenHandler,
dslHandler,
)

View File

@ -11,7 +11,6 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/moby/buildkit/session"
@ -20,6 +19,7 @@ import (
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -86,7 +86,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
}
})
pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), ref, logs.LoggerFromContext(ctx))
pushHandler := retryhandler.New(limited.PushHandler(pusher, provider, ref), logs.LoggerFromContext(ctx))
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
if err != nil {
return err

View File

@ -0,0 +1,181 @@
package limited
import (
"context"
"io"
"runtime"
"sync"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/docker/distribution/reference"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
var Default = New(4)
type Group struct {
mu sync.Mutex
size int
sem map[string]*semaphore.Weighted
}
type req struct {
g *Group
ref string
}
func (r *req) acquire(ctx context.Context) (func(), error) {
r.g.mu.Lock()
s, ok := r.g.sem[r.ref]
if !ok {
s = semaphore.NewWeighted(int64(r.g.size))
r.g.sem[r.ref] = s
}
r.g.mu.Unlock()
if err := s.Acquire(ctx, 1); err != nil {
return nil, err
}
return func() {
s.Release(1)
}, nil
}
func New(size int) *Group {
return &Group{
size: size,
sem: make(map[string]*semaphore.Weighted),
}
}
func (g *Group) req(ref string) *req {
return &req{g: g, ref: domain(ref)}
}
func (g *Group) WrapFetcher(f remotes.Fetcher, ref string) remotes.Fetcher {
return &fetcher{Fetcher: f, req: g.req(ref)}
}
func (g *Group) WrapPusher(p remotes.Pusher, ref string) remotes.Pusher {
return &pusher{Pusher: p, req: g.req(ref)}
}
type pusher struct {
remotes.Pusher
req *req
}
func (p *pusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) {
release, err := p.req.acquire(ctx)
if err != nil {
return nil, err
}
w, err := p.Pusher.Push(ctx, desc)
if err != nil {
release()
return nil, err
}
ww := &writer{Writer: w}
closer := func() {
if !ww.closed {
logrus.Warnf("writer not closed cleanly: %s", desc.Digest)
}
release()
}
ww.release = closer
runtime.SetFinalizer(ww, func(rc *writer) {
rc.close()
})
return ww, nil
}
type writer struct {
content.Writer
once sync.Once
release func()
closed bool
}
func (w *writer) Close() error {
w.closed = true
w.close()
return w.Writer.Close()
}
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
w.closed = true
w.close()
return w.Writer.Commit(ctx, size, expected, opts...)
}
func (w *writer) close() {
w.once.Do(w.release)
}
type fetcher struct {
remotes.Fetcher
req *req
}
func (f *fetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
release, err := f.req.acquire(ctx)
if err != nil {
return nil, err
}
rc, err := f.Fetcher.Fetch(ctx, desc)
if err != nil {
release()
return nil, err
}
rcw := &readCloser{ReadCloser: rc}
closer := func() {
if !rcw.closed {
logrus.Warnf("fetcher not closed cleanly: %s", desc.Digest)
}
release()
}
rcw.release = closer
runtime.SetFinalizer(rcw, func(rc *readCloser) {
rc.close()
})
return rcw, nil
}
type readCloser struct {
io.ReadCloser
once sync.Once
closed bool
release func()
}
func (r *readCloser) Close() error {
r.closed = true
r.close()
return r.ReadCloser.Close()
}
func (r *readCloser) close() {
r.once.Do(r.release)
}
func FetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, ref string) images.HandlerFunc {
return remotes.FetchHandler(ingester, Default.WrapFetcher(fetcher, ref))
}
func PushHandler(pusher remotes.Pusher, provider content.Provider, ref string) images.HandlerFunc {
return remotes.PushHandler(Default.WrapPusher(pusher, ref), provider)
}
func domain(ref string) string {
if ref != "" {
if named, err := reference.ParseNormalizedNamed(ref); err == nil {
return reference.Domain(named)
}
}
return ref
}

View File

@ -5,43 +5,17 @@ import (
"fmt"
"io"
"net"
"sync"
"syscall"
"time"
"github.com/containerd/containerd/images"
remoteserrors "github.com/containerd/containerd/remotes/errors"
"github.com/docker/distribution/reference"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)
var mu sync.Mutex
var sem = map[string]*semaphore.Weighted{}
const connsPerHost = 4
func New(f images.HandlerFunc, ref string, logger func([]byte)) images.HandlerFunc {
if ref != "" {
if named, err := reference.ParseNormalizedNamed(ref); err == nil {
ref = reference.Domain(named)
}
}
func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
mu.Lock()
s, ok := sem[ref]
if !ok {
s = semaphore.NewWeighted(connsPerHost)
sem[ref] = s
}
mu.Unlock()
if err := s.Acquire(ctx, 1); err != nil {
return nil, err
}
defer s.Release(1)
backoff := time.Second
for {
descs, err := f(ctx, desc)