diff --git a/cache/remote.go b/cache/remote.go index b74cb271..299a571f 100644 --- a/cache/remote.go +++ b/cache/remote.go @@ -14,6 +14,7 @@ import ( "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/leaseutil" + "github.com/moby/buildkit/util/progress/logs" "github.com/moby/buildkit/util/pull/pullprogress" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -182,7 +183,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{ Provider: p.dh.Provider(p.session), Manager: p.ref.cm.ContentStore, - }, p.desc) + }, p.desc, logs.LoggerFromContext(ctx)) if err != nil { return nil, err } diff --git a/cache/remotecache/export.go b/cache/remotecache/export.go index 542aa760..b39045c8 100644 --- a/cache/remotecache/export.go +++ b/cache/remotecache/export.go @@ -15,6 +15,7 @@ import ( "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/progress" + "github.com/moby/buildkit/util/progress/logs" digest "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -94,7 +95,7 @@ func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string return nil, errors.Errorf("missing blob %s", l.Blob) } layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob)) - if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor); err != nil { + if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, logs.LoggerFromContext(ctx)); err != nil { return nil, layerDone(errors.Wrap(err, "error writing layer blob")) } layerDone(nil) diff --git a/util/contentutil/copy.go b/util/contentutil/copy.go index 060d7a94..08c60473 100644 --- a/util/contentutil/copy.go +++ b/util/contentutil/copy.go @@ -8,12 +8,13 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" + "github.com/moby/buildkit/util/resolver/retryhandler" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) -func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor) error { - if _, err := remotes.FetchHandler(ingester, &localFetcher{provider})(ctx, desc); err != nil { +func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, logger func([]byte)) error { + if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), logger)(ctx, desc); err != nil { return err } return nil @@ -64,7 +65,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content. handlers := []images.Handler{ images.ChildrenHandler(provider), filterHandler, - remotes.FetchHandler(ingester, &localFetcher{provider}), + retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), nil), } if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { @@ -72,7 +73,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content. } for i := len(manifestStack) - 1; i >= 0; i-- { - if err := Copy(ctx, ingester, provider, manifestStack[i]); err != nil { + if err := Copy(ctx, ingester, provider, manifestStack[i], nil); err != nil { return errors.WithStack(err) } } diff --git a/util/contentutil/copy_test.go b/util/contentutil/copy_test.go index f3e8b000..90ba55d3 100644 --- a/util/contentutil/copy_test.go +++ b/util/contentutil/copy_test.go @@ -21,7 +21,7 @@ func TestCopy(t *testing.T) { err := content.WriteBlob(ctx, b0, "foo", bytes.NewBuffer([]byte("foobar")), ocispec.Descriptor{Size: -1}) require.NoError(t, err) - err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}) + err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil) require.NoError(t, err) dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))}) diff --git a/util/contentutil/fetcher_test.go b/util/contentutil/fetcher_test.go index e2521d79..57ba6cc9 100644 --- a/util/contentutil/fetcher_test.go +++ b/util/contentutil/fetcher_test.go @@ -26,7 +26,7 @@ func TestFetcher(t *testing.T) { p := FromFetcher(f) b1 := NewBuffer() - err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}) + err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil) require.NoError(t, err) dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))}) diff --git a/util/imageutil/config.go b/util/imageutil/config.go index 80ed268f..c1ea0214 100644 --- a/util/imageutil/config.go +++ b/util/imageutil/config.go @@ -14,6 +14,7 @@ import ( "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/moby/buildkit/util/leaseutil" + "github.com/moby/buildkit/util/resolver/retryhandler" digest "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -100,7 +101,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co children := childrenConfigHandler(cache, platform) handlers := []images.Handler{ - remotes.FetchHandler(cache, fetcher), + retryhandler.New(remotes.FetchHandler(cache, fetcher), nil), children, } if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { diff --git a/util/progress/logs/logs.go b/util/progress/logs/logs.go index d54dfd3e..da82c692 100644 --- a/util/progress/logs/logs.go +++ b/util/progress/logs/logs.go @@ -20,10 +20,15 @@ import ( var defaultMaxLogSize = 1024 * 1024 var defaultMaxLogSpeed = 100 * 1024 // per second +const ( + stdout = 1 + stderr = 2 +) + var configCheckOnce sync.Once func NewLogStreams(ctx context.Context, printOutput bool) (io.WriteCloser, io.WriteCloser) { - return newStreamWriter(ctx, 1, printOutput), newStreamWriter(ctx, 2, printOutput) + return newStreamWriter(ctx, stdout, printOutput), newStreamWriter(ctx, stderr, printOutput) } func newStreamWriter(ctx context.Context, stream int, printOutput bool) io.WriteCloser { @@ -123,3 +128,13 @@ func (sw *streamWriter) Write(dt []byte) (int, error) { func (sw *streamWriter) Close() error { return sw.pw.Close() } + +func LoggerFromContext(ctx context.Context) func([]byte) { + return func(dt []byte) { + pw, _, _ := progress.FromContext(ctx) + pw.Write(identity.NewID(), client.VertexLog{ + Stream: stderr, + Data: []byte(dt), + }) + } +} diff --git a/util/pull/pull.go b/util/pull/pull.go index c968ba0c..ee9d67ee 100644 --- a/util/pull/pull.go +++ b/util/pull/pull.go @@ -15,8 +15,10 @@ import ( "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/imageutil" + "github.com/moby/buildkit/util/progress/logs" "github.com/moby/buildkit/util/pull/pullprogress" "github.com/moby/buildkit/util/resolver" + "github.com/moby/buildkit/util/resolver/retryhandler" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -146,7 +148,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) { } handlers = append(handlers, filterLayerBlobs(metadata, &mu), - remotes.FetchHandler(p.ContentStore, fetcher), + retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), logs.LoggerFromContext(ctx)), childrenHandler, dslHandler, ) diff --git a/util/push/push.go b/util/push/push.go index e73caf8b..338ab02a 100644 --- a/util/push/push.go +++ b/util/push/push.go @@ -19,7 +19,9 @@ import ( "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/imageutil" "github.com/moby/buildkit/util/progress" + "github.com/moby/buildkit/util/progress/logs" "github.com/moby/buildkit/util/resolver" + "github.com/moby/buildkit/util/resolver/retryhandler" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -80,7 +82,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content } }) - pushHandler := remotes.PushHandler(pusher, provider) + pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), logs.LoggerFromContext(ctx)) pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref) if err != nil { return err diff --git a/util/resolver/retryhandler/retry.go b/util/resolver/retryhandler/retry.go new file mode 100644 index 00000000..bddcfb84 --- /dev/null +++ b/util/resolver/retryhandler/retry.go @@ -0,0 +1,64 @@ +package retryhandler + +import ( + "context" + "fmt" + "io" + "strings" + "syscall" + "time" + + "github.com/containerd/containerd/images" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + backoff := time.Second + for { + descs, err := f(ctx, desc) + if err != nil { + select { + case <-ctx.Done(): + return nil, err + default: + if !retryError(err) { + return nil, err + } + } + if logger != nil { + logger([]byte(fmt.Sprintf("error: %v\n", err.Error()))) + } + } else { + return descs, nil + } + // backoff logic + if backoff >= 8*time.Second { + return nil, err + } + if logger != nil { + logger([]byte(fmt.Sprintf("retrying in %v\n", backoff))) + } + time.Sleep(backoff) + backoff *= 2 + } + } +} + +func retryError(err error) bool { + if errors.Is(err, io.EOF) || errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) { + return true + } + // https://github.com/containerd/containerd/pull/4724 + if errors.Cause(err).Error() == "no response" { + return true + } + + // net.ErrClosed exposed in go1.16 + if strings.Contains(err.Error(), "use of closed network connection") { + return true + } + + return false +}