add per domain semaphore to limit concurrent connections
This is a safer alternative until we figure out why http.Transport based limiting fails. Some connections like cache export/import do not have a domain key atm and these connections use global pool. Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>v0.9
parent
d3cd28f59f
commit
a558ac4cce
|
@ -13,6 +13,7 @@ type DescHandler struct {
|
||||||
Provider func(session.Group) content.Provider
|
Provider func(session.Group) content.Provider
|
||||||
Progress progress.Controller
|
Progress progress.Controller
|
||||||
SnapshotLabels map[string]string
|
SnapshotLabels map[string]string
|
||||||
|
Ref string // string representation of desc origin, can be used as a sync key
|
||||||
}
|
}
|
||||||
|
|
||||||
type DescHandlers map[digest.Digest]*DescHandler
|
type DescHandlers map[digest.Digest]*DescHandler
|
||||||
|
|
|
@ -208,7 +208,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
|
||||||
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{
|
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{
|
||||||
Provider: p.dh.Provider(p.session),
|
Provider: p.dh.Provider(p.session),
|
||||||
Manager: p.ref.cm.ContentStore,
|
Manager: p.ref.cm.ContentStore,
|
||||||
}, p.desc, logs.LoggerFromContext(ctx))
|
}, p.desc, p.dh.Ref, logs.LoggerFromContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,11 +58,12 @@ type contentCacheExporter struct {
|
||||||
chains *v1.CacheChains
|
chains *v1.CacheChains
|
||||||
ingester content.Ingester
|
ingester content.Ingester
|
||||||
oci bool
|
oci bool
|
||||||
|
ref string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewExporter(ingester content.Ingester, oci bool) Exporter {
|
func NewExporter(ingester content.Ingester, ref string, oci bool) Exporter {
|
||||||
cc := v1.NewCacheChains()
|
cc := v1.NewCacheChains()
|
||||||
return &contentCacheExporter{CacheExporterTarget: cc, chains: cc, ingester: ingester, oci: oci}
|
return &contentCacheExporter{CacheExporterTarget: cc, chains: cc, ingester: ingester, oci: oci, ref: ref}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string, error) {
|
func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string, error) {
|
||||||
|
@ -95,7 +96,7 @@ func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string
|
||||||
return nil, errors.Errorf("missing blob %s", l.Blob)
|
return nil, errors.Errorf("missing blob %s", l.Blob)
|
||||||
}
|
}
|
||||||
layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob))
|
layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob))
|
||||||
if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, logs.LoggerFromContext(ctx)); err != nil {
|
if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, ce.ref, logs.LoggerFromContext(ctx)); err != nil {
|
||||||
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
|
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
|
||||||
}
|
}
|
||||||
layerDone(nil)
|
layerDone(nil)
|
||||||
|
|
|
@ -42,7 +42,7 @@ func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExpor
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return remotecache.NewExporter(cs, ociMediatypes), nil
|
return remotecache.NewExporter(cs, "", ociMediatypes), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,7 @@ func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) r
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return remotecache.NewExporter(contentutil.FromPusher(pusher), ociMediatypes), nil
|
return remotecache.NewExporter(contentutil.FromPusher(pusher), ref, ociMediatypes), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -237,6 +237,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach
|
||||||
Provider: p.manifest.Provider,
|
Provider: p.manifest.Provider,
|
||||||
Progress: progressController,
|
Progress: progressController,
|
||||||
SnapshotLabels: labels,
|
SnapshotLabels: labels,
|
||||||
|
Ref: p.manifest.Ref,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,8 +13,8 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, logger func([]byte)) error {
|
func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, ref string, logger func([]byte)) error {
|
||||||
if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), logger)(ctx, desc); err != nil {
|
if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), ref, logger)(ctx, desc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -65,7 +65,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content.
|
||||||
handlers := []images.Handler{
|
handlers := []images.Handler{
|
||||||
images.ChildrenHandler(provider),
|
images.ChildrenHandler(provider),
|
||||||
filterHandler,
|
filterHandler,
|
||||||
retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), func(_ []byte) {}),
|
retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), "", func(_ []byte) {}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
||||||
|
@ -73,7 +73,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content.
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := len(manifestStack) - 1; i >= 0; i-- {
|
for i := len(manifestStack) - 1; i >= 0; i-- {
|
||||||
if err := Copy(ctx, ingester, provider, manifestStack[i], nil); err != nil {
|
if err := Copy(ctx, ingester, provider, manifestStack[i], "", nil); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ func TestCopy(t *testing.T) {
|
||||||
err := content.WriteBlob(ctx, b0, "foo", bytes.NewBuffer([]byte("foobar")), ocispec.Descriptor{Size: -1})
|
err := content.WriteBlob(ctx, b0, "foo", bytes.NewBuffer([]byte("foobar")), ocispec.Descriptor{Size: -1})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil)
|
err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, "", nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
|
dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
|
||||||
|
|
|
@ -26,7 +26,7 @@ func TestFetcher(t *testing.T) {
|
||||||
p := FromFetcher(f)
|
p := FromFetcher(f)
|
||||||
|
|
||||||
b1 := NewBuffer()
|
b1 := NewBuffer()
|
||||||
err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil)
|
err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, "", nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
|
dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
|
||||||
|
|
|
@ -101,7 +101,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
|
||||||
children := childrenConfigHandler(cache, platform)
|
children := childrenConfigHandler(cache, platform)
|
||||||
|
|
||||||
handlers := []images.Handler{
|
handlers := []images.Handler{
|
||||||
retryhandler.New(remotes.FetchHandler(cache, fetcher), func(_ []byte) {}),
|
retryhandler.New(remotes.FetchHandler(cache, fetcher), str, func(_ []byte) {}),
|
||||||
children,
|
children,
|
||||||
}
|
}
|
||||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
||||||
|
|
|
@ -148,7 +148,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
|
||||||
}
|
}
|
||||||
handlers = append(handlers,
|
handlers = append(handlers,
|
||||||
filterLayerBlobs(metadata, &mu),
|
filterLayerBlobs(metadata, &mu),
|
||||||
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), logs.LoggerFromContext(ctx)),
|
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), p.ref, logs.LoggerFromContext(ctx)),
|
||||||
childrenHandler,
|
childrenHandler,
|
||||||
dslHandler,
|
dslHandler,
|
||||||
)
|
)
|
||||||
|
|
|
@ -86,7 +86,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), logs.LoggerFromContext(ctx))
|
pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), ref, logs.LoggerFromContext(ctx))
|
||||||
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
|
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -5,17 +5,43 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
remoteserrors "github.com/containerd/containerd/remotes/errors"
|
remoteserrors "github.com/containerd/containerd/remotes/errors"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
)
|
)
|
||||||
|
|
||||||
func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc {
|
var mu sync.Mutex
|
||||||
|
var sem = map[string]*semaphore.Weighted{}
|
||||||
|
|
||||||
|
const connsPerHost = 4
|
||||||
|
|
||||||
|
func New(f images.HandlerFunc, ref string, logger func([]byte)) images.HandlerFunc {
|
||||||
|
if ref != "" {
|
||||||
|
if named, err := reference.ParseNormalizedNamed(ref); err == nil {
|
||||||
|
ref = reference.Domain(named)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||||
|
mu.Lock()
|
||||||
|
s, ok := sem[ref]
|
||||||
|
if !ok {
|
||||||
|
s = semaphore.NewWeighted(connsPerHost)
|
||||||
|
sem[ref] = s
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
if err := s.Acquire(ctx, 1); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer s.Release(1)
|
||||||
|
|
||||||
backoff := time.Second
|
backoff := time.Second
|
||||||
for {
|
for {
|
||||||
descs, err := f(ctx, desc)
|
descs, err := f(ctx, desc)
|
||||||
|
|
Loading…
Reference in New Issue