commit
a846452fa1
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/moby/buildkit/util/compression"
|
"github.com/moby/buildkit/util/compression"
|
||||||
"github.com/moby/buildkit/util/contentutil"
|
"github.com/moby/buildkit/util/contentutil"
|
||||||
"github.com/moby/buildkit/util/leaseutil"
|
"github.com/moby/buildkit/util/leaseutil"
|
||||||
|
"github.com/moby/buildkit/util/progress/logs"
|
||||||
"github.com/moby/buildkit/util/pull/pullprogress"
|
"github.com/moby/buildkit/util/pull/pullprogress"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -182,7 +183,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
|
||||||
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{
|
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{
|
||||||
Provider: p.dh.Provider(p.session),
|
Provider: p.dh.Provider(p.session),
|
||||||
Manager: p.ref.cm.ContentStore,
|
Manager: p.ref.cm.ContentStore,
|
||||||
}, p.desc)
|
}, p.desc, logs.LoggerFromContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/moby/buildkit/util/compression"
|
"github.com/moby/buildkit/util/compression"
|
||||||
"github.com/moby/buildkit/util/contentutil"
|
"github.com/moby/buildkit/util/contentutil"
|
||||||
"github.com/moby/buildkit/util/progress"
|
"github.com/moby/buildkit/util/progress"
|
||||||
|
"github.com/moby/buildkit/util/progress/logs"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
specs "github.com/opencontainers/image-spec/specs-go"
|
specs "github.com/opencontainers/image-spec/specs-go"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
@ -94,7 +95,7 @@ func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string
|
||||||
return nil, errors.Errorf("missing blob %s", l.Blob)
|
return nil, errors.Errorf("missing blob %s", l.Blob)
|
||||||
}
|
}
|
||||||
layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob))
|
layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob))
|
||||||
if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor); err != nil {
|
if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, logs.LoggerFromContext(ctx)); err != nil {
|
||||||
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
|
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
|
||||||
}
|
}
|
||||||
layerDone(nil)
|
layerDone(nil)
|
||||||
|
|
|
@ -8,12 +8,13 @@ import (
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
"github.com/containerd/containerd/remotes"
|
"github.com/containerd/containerd/remotes"
|
||||||
|
"github.com/moby/buildkit/util/resolver/retryhandler"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor) error {
|
func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, logger func([]byte)) error {
|
||||||
if _, err := remotes.FetchHandler(ingester, &localFetcher{provider})(ctx, desc); err != nil {
|
if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), logger)(ctx, desc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -64,7 +65,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content.
|
||||||
handlers := []images.Handler{
|
handlers := []images.Handler{
|
||||||
images.ChildrenHandler(provider),
|
images.ChildrenHandler(provider),
|
||||||
filterHandler,
|
filterHandler,
|
||||||
remotes.FetchHandler(ingester, &localFetcher{provider}),
|
retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), nil),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
||||||
|
@ -72,7 +73,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content.
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := len(manifestStack) - 1; i >= 0; i-- {
|
for i := len(manifestStack) - 1; i >= 0; i-- {
|
||||||
if err := Copy(ctx, ingester, provider, manifestStack[i]); err != nil {
|
if err := Copy(ctx, ingester, provider, manifestStack[i], nil); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ func TestCopy(t *testing.T) {
|
||||||
err := content.WriteBlob(ctx, b0, "foo", bytes.NewBuffer([]byte("foobar")), ocispec.Descriptor{Size: -1})
|
err := content.WriteBlob(ctx, b0, "foo", bytes.NewBuffer([]byte("foobar")), ocispec.Descriptor{Size: -1})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1})
|
err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
|
dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
|
||||||
|
|
|
@ -26,7 +26,7 @@ func TestFetcher(t *testing.T) {
|
||||||
p := FromFetcher(f)
|
p := FromFetcher(f)
|
||||||
|
|
||||||
b1 := NewBuffer()
|
b1 := NewBuffer()
|
||||||
err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1})
|
err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
|
dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/containerd/containerd/remotes"
|
"github.com/containerd/containerd/remotes"
|
||||||
"github.com/containerd/containerd/remotes/docker"
|
"github.com/containerd/containerd/remotes/docker"
|
||||||
"github.com/moby/buildkit/util/leaseutil"
|
"github.com/moby/buildkit/util/leaseutil"
|
||||||
|
"github.com/moby/buildkit/util/resolver/retryhandler"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
specs "github.com/opencontainers/image-spec/specs-go/v1"
|
specs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -100,7 +101,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
|
||||||
children := childrenConfigHandler(cache, platform)
|
children := childrenConfigHandler(cache, platform)
|
||||||
|
|
||||||
handlers := []images.Handler{
|
handlers := []images.Handler{
|
||||||
remotes.FetchHandler(cache, fetcher),
|
retryhandler.New(remotes.FetchHandler(cache, fetcher), nil),
|
||||||
children,
|
children,
|
||||||
}
|
}
|
||||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
||||||
|
|
|
@ -20,10 +20,15 @@ import (
|
||||||
var defaultMaxLogSize = 1024 * 1024
|
var defaultMaxLogSize = 1024 * 1024
|
||||||
var defaultMaxLogSpeed = 100 * 1024 // per second
|
var defaultMaxLogSpeed = 100 * 1024 // per second
|
||||||
|
|
||||||
|
const (
|
||||||
|
stdout = 1
|
||||||
|
stderr = 2
|
||||||
|
)
|
||||||
|
|
||||||
var configCheckOnce sync.Once
|
var configCheckOnce sync.Once
|
||||||
|
|
||||||
func NewLogStreams(ctx context.Context, printOutput bool) (io.WriteCloser, io.WriteCloser) {
|
func NewLogStreams(ctx context.Context, printOutput bool) (io.WriteCloser, io.WriteCloser) {
|
||||||
return newStreamWriter(ctx, 1, printOutput), newStreamWriter(ctx, 2, printOutput)
|
return newStreamWriter(ctx, stdout, printOutput), newStreamWriter(ctx, stderr, printOutput)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStreamWriter(ctx context.Context, stream int, printOutput bool) io.WriteCloser {
|
func newStreamWriter(ctx context.Context, stream int, printOutput bool) io.WriteCloser {
|
||||||
|
@ -123,3 +128,13 @@ func (sw *streamWriter) Write(dt []byte) (int, error) {
|
||||||
func (sw *streamWriter) Close() error {
|
func (sw *streamWriter) Close() error {
|
||||||
return sw.pw.Close()
|
return sw.pw.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func LoggerFromContext(ctx context.Context) func([]byte) {
|
||||||
|
return func(dt []byte) {
|
||||||
|
pw, _, _ := progress.FromContext(ctx)
|
||||||
|
pw.Write(identity.NewID(), client.VertexLog{
|
||||||
|
Stream: stderr,
|
||||||
|
Data: []byte(dt),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -15,8 +15,10 @@ import (
|
||||||
"github.com/moby/buildkit/util/contentutil"
|
"github.com/moby/buildkit/util/contentutil"
|
||||||
"github.com/moby/buildkit/util/flightcontrol"
|
"github.com/moby/buildkit/util/flightcontrol"
|
||||||
"github.com/moby/buildkit/util/imageutil"
|
"github.com/moby/buildkit/util/imageutil"
|
||||||
|
"github.com/moby/buildkit/util/progress/logs"
|
||||||
"github.com/moby/buildkit/util/pull/pullprogress"
|
"github.com/moby/buildkit/util/pull/pullprogress"
|
||||||
"github.com/moby/buildkit/util/resolver"
|
"github.com/moby/buildkit/util/resolver"
|
||||||
|
"github.com/moby/buildkit/util/resolver/retryhandler"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -146,7 +148,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
|
||||||
}
|
}
|
||||||
handlers = append(handlers,
|
handlers = append(handlers,
|
||||||
filterLayerBlobs(metadata, &mu),
|
filterLayerBlobs(metadata, &mu),
|
||||||
remotes.FetchHandler(p.ContentStore, fetcher),
|
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), logs.LoggerFromContext(ctx)),
|
||||||
childrenHandler,
|
childrenHandler,
|
||||||
dslHandler,
|
dslHandler,
|
||||||
)
|
)
|
||||||
|
|
|
@ -19,7 +19,9 @@ import (
|
||||||
"github.com/moby/buildkit/util/flightcontrol"
|
"github.com/moby/buildkit/util/flightcontrol"
|
||||||
"github.com/moby/buildkit/util/imageutil"
|
"github.com/moby/buildkit/util/imageutil"
|
||||||
"github.com/moby/buildkit/util/progress"
|
"github.com/moby/buildkit/util/progress"
|
||||||
|
"github.com/moby/buildkit/util/progress/logs"
|
||||||
"github.com/moby/buildkit/util/resolver"
|
"github.com/moby/buildkit/util/resolver"
|
||||||
|
"github.com/moby/buildkit/util/resolver/retryhandler"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -80,7 +82,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
pushHandler := remotes.PushHandler(pusher, provider)
|
pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), logs.LoggerFromContext(ctx))
|
||||||
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
|
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
package retryhandler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/images"
|
||||||
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc {
|
||||||
|
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||||
|
backoff := time.Second
|
||||||
|
for {
|
||||||
|
descs, err := f(ctx, desc)
|
||||||
|
if err != nil {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, err
|
||||||
|
default:
|
||||||
|
if !retryError(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if logger != nil {
|
||||||
|
logger([]byte(fmt.Sprintf("error: %v\n", err.Error())))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return descs, nil
|
||||||
|
}
|
||||||
|
// backoff logic
|
||||||
|
if backoff >= 8*time.Second {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if logger != nil {
|
||||||
|
logger([]byte(fmt.Sprintf("retrying in %v\n", backoff)))
|
||||||
|
}
|
||||||
|
time.Sleep(backoff)
|
||||||
|
backoff *= 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func retryError(err error) bool {
|
||||||
|
if errors.Is(err, io.EOF) || errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// https://github.com/containerd/containerd/pull/4724
|
||||||
|
if errors.Cause(err).Error() == "no response" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// net.ErrClosed exposed in go1.16
|
||||||
|
if strings.Contains(err.Error(), "use of closed network connection") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
Loading…
Reference in New Issue