progress: refactor logger handler to better reuse

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
v0.8
Tonis Tiigi 2020-11-15 12:33:37 -08:00
parent aa29e77294
commit cf8babde54
5 changed files with 24 additions and 55 deletions

16
cache/remote.go vendored
View File

@ -9,14 +9,12 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/reference"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/pull/pullprogress"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@ -183,7 +181,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{
Provider: p.dh.Provider(p.session),
Manager: p.ref.cm.ContentStore,
}, p.desc, loggerFromContext(ctx))
}, p.desc, logs.LoggerFromContext(ctx))
if err != nil {
return nil, err
}
@ -203,13 +201,3 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
})
return err
}
func loggerFromContext(ctx context.Context) func([]byte) {
return func(dt []byte) {
pw, _, _ := progress.FromContext(ctx)
pw.Write(identity.NewID(), client.VertexLog{
Stream: 2,
Data: []byte(dt),
})
}
}

View File

@ -10,13 +10,12 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/logs"
digest "github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -96,7 +95,7 @@ func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string
return nil, errors.Errorf("missing blob %s", l.Blob)
}
layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob))
if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, loggerFromContext(ctx)); err != nil {
if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, logs.LoggerFromContext(ctx)); err != nil {
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
}
layerDone(nil)
@ -146,13 +145,3 @@ func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string
mfstDone(nil)
return res, nil
}
func loggerFromContext(ctx context.Context) func([]byte) {
return func(dt []byte) {
pw, _, _ := progress.FromContext(ctx)
pw.Write(identity.NewID(), client.VertexLog{
Stream: 2,
Data: []byte(dt),
})
}
}

View File

@ -20,10 +20,15 @@ import (
var defaultMaxLogSize = 1024 * 1024
var defaultMaxLogSpeed = 100 * 1024 // per second
const (
stdout = 1
stderr = 2
)
var configCheckOnce sync.Once
func NewLogStreams(ctx context.Context, printOutput bool) (io.WriteCloser, io.WriteCloser) {
return newStreamWriter(ctx, 1, printOutput), newStreamWriter(ctx, 2, printOutput)
return newStreamWriter(ctx, stdout, printOutput), newStreamWriter(ctx, stderr, printOutput)
}
func newStreamWriter(ctx context.Context, stream int, printOutput bool) io.WriteCloser {
@ -123,3 +128,13 @@ func (sw *streamWriter) Write(dt []byte) (int, error) {
func (sw *streamWriter) Close() error {
return sw.pw.Close()
}
func LoggerFromContext(ctx context.Context) func([]byte) {
return func(dt []byte) {
pw, _, _ := progress.FromContext(ctx)
pw.Write(identity.NewID(), client.VertexLog{
Stream: stderr,
Data: []byte(dt),
})
}
}

View File

@ -11,13 +11,11 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/pull/pullprogress"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/retryhandler"
@ -150,7 +148,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
}
handlers = append(handlers,
filterLayerBlobs(metadata, &mu),
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), loggerFromContext(ctx)),
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), logs.LoggerFromContext(ctx)),
childrenHandler,
dslHandler,
)
@ -263,13 +261,3 @@ func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Desc
}
return layers, nil
}
func loggerFromContext(ctx context.Context) func([]byte) {
return func(dt []byte) {
pw, _, _ := progress.FromContext(ctx)
pw.Write(identity.NewID(), client.VertexLog{
Stream: 2,
Data: []byte(dt),
})
}
}

View File

@ -14,13 +14,12 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/cmd/buildkitd/config"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
@ -83,7 +82,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
}
})
pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), loggerFromContext(ctx))
pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), logs.LoggerFromContext(ctx))
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
if err != nil {
return err
@ -312,13 +311,3 @@ func dedupeHandler(h images.HandlerFunc) images.HandlerFunc {
return res.([]ocispec.Descriptor), nil
})
}
func loggerFromContext(ctx context.Context) func([]byte) {
return func(dt []byte) {
pw, _, _ := progress.FromContext(ctx)
pw.Write(identity.NewID(), client.VertexLog{
Stream: 2,
Data: []byte(dt),
})
}
}