Fix progress in schema1 pull.

Signed-off-by: Erik Sipsma <erik@sipsma.dev>
v0.8
Erik Sipsma 2020-08-03 05:58:15 -07:00
parent 55cbd19dec
commit cdcf49fd18
4 changed files with 155 additions and 109 deletions

101
cache/progress.go vendored
View File

@ -1,101 +0,0 @@
package cache
import (
"context"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/moby/buildkit/util/progress"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type providerWithProgress struct {
provider content.Provider
manager interface {
content.IngestManager
content.Manager
}
}
func (p *providerWithProgress) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
doneCh := make(chan struct{})
go func() {
ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop()
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
ingestRef := remotes.MakeRefKey(ctx, desc)
started := time.Now()
onFinalStatus := false
for {
if onFinalStatus {
return
}
select {
case <-doneCh:
onFinalStatus = true
case <-ctx.Done():
onFinalStatus = true
case <-ticker.C:
}
status, err := p.manager.Status(ctx, ingestRef)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Action: "downloading",
Current: int(status.Offset),
Total: int(status.Total),
Started: &started,
})
continue
} else if !errors.Is(err, errdefs.ErrNotFound) {
logrus.Errorf("unexpected error getting ingest status of %q: %v", ingestRef, err)
return
}
info, err := p.manager.Info(ctx, desc.Digest)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Action: "done",
Current: int(info.Size),
Total: int(info.Size),
Started: &started,
Completed: &info.CreatedAt,
})
return
}
if errors.Is(err, errdefs.ErrNotFound) {
pw.Write(desc.Digest.String(), progress.Status{
Action: "waiting",
})
} else {
logrus.Errorf("unexpected error getting content status of %q: %v", desc.Digest.String(), err)
return
}
}
}()
ra, err := p.provider.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
return readerAtWithCloseCh{ReaderAt: ra, closeCh: doneCh}, nil
}
type readerAtWithCloseCh struct {
content.ReaderAt
closeCh chan struct{}
}
func (ra readerAtWithCloseCh) Close() error {
close(ra.closeCh)
return ra.ReaderAt.Close()
}

7
cache/remote.go vendored
View File

@ -13,6 +13,7 @@ import (
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/pull/pullprogress"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
@ -152,9 +153,9 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
// For now, just pull down the whole content and then return a ReaderAt from the local content
// store. If efficient partial reads are desired in the future, something more like a "tee"
// that caches remote partial reads to a local store may need to replace this.
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &providerWithProgress{
provider: p.dh.Provider,
manager: p.ref.cm.ContentStore,
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{
Provider: p.dh.Provider,
Manager: p.ref.cm.ContentStore,
}, p.desc)
if err != nil {
return nil, err

View File

@ -14,6 +14,7 @@ import (
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/pull/pullprogress"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@ -120,7 +121,12 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
var schema1Converter *schema1.Converter
if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
schema1Converter = schema1.NewConverter(p.ContentStore, fetcher)
// schema1 images are not lazy at this time, the converter will pull the whole image
// including layer blobs
schema1Converter = schema1.NewConverter(p.ContentStore, &pullprogress.FetcherWithProgress{
Fetcher: fetcher,
Manager: p.ContentStore,
})
handlers = append(handlers, schema1Converter)
} else {
// Get all the children for a descriptor
@ -152,12 +158,12 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
return nil, err
}
handlers := []images.Handler{
// this just gathers metadata about the converted descriptors making up the image, does
// not fetch anything
if err := images.Dispatch(ctx, images.Handlers(
filterLayerBlobs(metadata, &mu),
images.FilterPlatforms(images.ChildrenHandler(p.ContentStore), platform),
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, p.desc); err != nil {
), nil, p.desc); err != nil {
return nil, err
}
}
@ -202,6 +208,8 @@ func (p *Puller) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content
return contentutil.FromFetcher(fetcher).ReaderAt(ctx, desc)
}
// filterLayerBlobs causes layer blobs to be skipped for fetch, which is required to support lazy blobs.
// It also stores the non-layer blobs (metadata) it encounters in the provided map.
func filterLayerBlobs(metadata map[digest.Digest]ocispec.Descriptor, mu sync.Locker) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
switch desc.MediaType {

View File

@ -0,0 +1,138 @@
package pullprogress
import (
"context"
"io"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/moby/buildkit/util/progress"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type PullManager interface {
content.IngestManager
content.Manager
}
type ProviderWithProgress struct {
Provider content.Provider
Manager PullManager
}
func (p *ProviderWithProgress) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
ra, err := p.Provider.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, p.Manager, doneCh)
return readerAtWithCancel{ReaderAt: ra, cancel: cancel, doneCh: doneCh}, nil
}
type readerAtWithCancel struct {
content.ReaderAt
cancel func()
doneCh <-chan struct{}
}
func (ra readerAtWithCancel) Close() error {
ra.cancel()
select {
case <-ra.doneCh:
case <-time.After(time.Second):
logrus.Warn("timeout waiting for pull progress to complete")
}
return ra.ReaderAt.Close()
}
type FetcherWithProgress struct {
Fetcher remotes.Fetcher
Manager PullManager
}
func (f *FetcherWithProgress) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
rc, err := f.Fetcher.Fetch(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, f.Manager, doneCh)
return readerWithCancel{ReadCloser: rc, cancel: cancel, doneCh: doneCh}, nil
}
type readerWithCancel struct {
io.ReadCloser
cancel func()
doneCh <-chan struct{}
}
func (r readerWithCancel) Close() error {
r.cancel()
select {
case <-r.doneCh:
case <-time.After(time.Second):
logrus.Warn("timeout waiting for pull progress to complete")
}
return r.ReadCloser.Close()
}
func trackProgress(ctx context.Context, desc ocispec.Descriptor, manager PullManager, doneCh chan<- struct{}) {
defer close(doneCh)
ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop()
go func() {
<-ctx.Done()
ticker.Stop()
}()
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
ingestRef := remotes.MakeRefKey(ctx, desc)
started := time.Now()
onFinalStatus := false
for !onFinalStatus {
select {
case <-ctx.Done():
onFinalStatus = true
case <-ticker.C:
}
status, err := manager.Status(ctx, ingestRef)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Current: int(status.Offset),
Total: int(status.Total),
Started: &started,
})
continue
} else if !errors.Is(err, errdefs.ErrNotFound) {
logrus.Errorf("unexpected error getting ingest status of %q: %v", ingestRef, err)
return
}
info, err := manager.Info(ctx, desc.Digest)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Current: int(info.Size),
Total: int(info.Size),
Started: &started,
Completed: &info.CreatedAt,
})
return
}
}
}