From 355126e080f6f299311f8939d6fa18c8e19f57da Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Fri, 13 Apr 2018 14:10:59 -0700 Subject: [PATCH] cacheimport: implementation using v1 package Signed-off-by: Tonis Tiigi --- cache/cacheimport/export.go | 165 ++++++++------------ cache/cacheimport/import.go | 290 ++++-------------------------------- 2 files changed, 89 insertions(+), 366 deletions(-) diff --git a/cache/cacheimport/export.go b/cache/cacheimport/export.go index 19ab1179..303b6fe5 100644 --- a/cache/cacheimport/export.go +++ b/cache/cacheimport/export.go @@ -4,36 +4,24 @@ import ( "bytes" "context" "encoding/json" + "fmt" "time" "github.com/containerd/containerd/content" - "github.com/containerd/containerd/diff" "github.com/containerd/containerd/images" "github.com/docker/distribution/manifest" - "github.com/docker/distribution/manifest/schema2" - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/blobs" + v1 "github.com/moby/buildkit/cache/cacheimport/v1" "github.com/moby/buildkit/session" - "github.com/moby/buildkit/snapshot" + solver "github.com/moby/buildkit/solver-next" + "github.com/moby/buildkit/util/contentutil" + "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/push" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) -const mediaTypeConfig = "application/vnd.buildkit.cacheconfig.v0" - -type CacheRecord struct { - CacheKey digest.Digest - Reference cache.ImmutableRef - ContentKey digest.Digest -} - type ExporterOpt struct { - Snapshotter snapshot.Snapshotter - ContentStore content.Store - Differ diff.Comparer SessionManager *session.Manager } @@ -45,41 +33,15 @@ type CacheExporter struct { opt ExporterOpt } -func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target string) error { - allBlobs := map[digest.Digest][]blobs.DiffPair{} - currentBlobs := map[digest.Digest]struct{}{} - type cr struct { - CacheRecord - dgst digest.Digest - } +func (ce *CacheExporter) ExporterForTarget(target string) *RegistryCacheExporter { + cc := v1.NewCacheChains() + return &RegistryCacheExporter{target: target, ExporterTarget: cc, chains: cc, exporter: ce} +} - list := make([]cr, 0, len(rec)) - - for _, r := range rec { - ref := r.Reference - if ref == nil { - list = append(list, cr{CacheRecord: r}) - continue - } - - dpairs, err := blobs.GetDiffPairs(ctx, ce.opt.ContentStore, ce.opt.Snapshotter, ce.opt.Differ, ref) - if err != nil { - return err - } - - for i, dp := range dpairs { - allBlobs[dp.Blobsum] = dpairs[:i+1] - } - - dgst := dpairs[len(dpairs)-1].Blobsum - list = append(list, cr{CacheRecord: r, dgst: dgst}) - currentBlobs[dgst] = struct{}{} - } - - for b := range allBlobs { - if _, ok := currentBlobs[b]; !ok { - list = append(list, cr{dgst: b}) - } +func (ce *CacheExporter) Finalize(ctx context.Context, cc *v1.CacheChains, target string) error { + config, descs, err := cc.Marshal() + if err != nil { + return err } // own type because oci type can't be pushed and docker type doesn't have annotations @@ -90,61 +52,42 @@ func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target s Manifests []ocispec.Descriptor `json:"manifests"` } - var config cacheConfig - var mfst manifestList mfst.SchemaVersion = 2 mfst.MediaType = images.MediaTypeDockerSchema2ManifestList - for _, l := range list { - var size int64 - var parent digest.Digest - var diffID digest.Digest - if l.dgst != "" { - info, err := ce.opt.ContentStore.Info(ctx, l.dgst) - if err != nil { - return err - } - size = info.Size - chain := allBlobs[l.dgst] - if len(chain) > 1 { - parent = chain[len(chain)-2].Blobsum - } - diffID = chain[len(chain)-1].DiffID - - mfst.Manifests = append(mfst.Manifests, ocispec.Descriptor{ - MediaType: schema2.MediaTypeLayer, - Size: size, - Digest: l.dgst, - }) + allBlobs := map[digest.Digest]struct{}{} + mp := contentutil.NewMultiProvider(nil) + for _, l := range config.Layers { + if _, ok := allBlobs[l.Blob]; ok { + continue } + dgstPair, ok := descs[l.Blob] + if !ok { + return errors.Errorf("missing blob %s", l.Blob) + } + allBlobs[l.Blob] = struct{}{} + mp.Add(l.Blob, dgstPair.Provider) - config.Items = append(config.Items, configItem{ - Blobsum: l.dgst, - CacheKey: l.CacheKey, - ContentKey: l.ContentKey, - Parent: parent, - DiffID: diffID, - }) + mfst.Manifests = append(mfst.Manifests, dgstPair.Descriptor) } dt, err := json.Marshal(config) if err != nil { return err } - dgst := digest.FromBytes(dt) - addAsRoot := content.WithLabels(map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - }) - - if err := content.WriteBlob(ctx, ce.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst, addAsRoot); err != nil { - return errors.Wrap(err, "error writing config blob") + configDone := oneOffProgress(ctx, fmt.Sprintf("writing config %s", dgst)) + buf := contentutil.NewBuffer() + if err := content.WriteBlob(ctx, buf, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst); err != nil { + return configDone(errors.Wrap(err, "error writing config blob")) } + configDone(nil) + mp.Add(dgst, buf) mfst.Manifests = append(mfst.Manifests, ocispec.Descriptor{ - MediaType: mediaTypeConfig, + MediaType: v1.CacheConfigMediaTypeV0, Size: int64(len(dt)), Digest: dgst, }) @@ -153,26 +96,42 @@ func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target s if err != nil { return errors.Wrap(err, "failed to marshal manifest") } - dgst = digest.FromBytes(dt) - if err := content.WriteBlob(ctx, ce.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst, addAsRoot); err != nil { - return errors.Wrap(err, "error writing manifest blob") + buf = contentutil.NewBuffer() + mfstDone := oneOffProgress(ctx, fmt.Sprintf("writing manifest %s", dgst)) + if err := content.WriteBlob(ctx, buf, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst); err != nil { + return mfstDone(errors.Wrap(err, "error writing manifest blob")) } + mfstDone(nil) + mp.Add(dgst, buf) - logrus.Debugf("cache-manifest: %s", dgst) - - return push.Push(ctx, ce.opt.SessionManager, ce.opt.ContentStore, dgst, target, false) + return push.Push(ctx, ce.opt.SessionManager, mp, dgst, target, false) } -type configItem struct { - Blobsum digest.Digest - CacheKey digest.Digest - ContentKey digest.Digest - Parent digest.Digest - DiffID digest.Digest +type RegistryCacheExporter struct { + solver.ExporterTarget + chains *v1.CacheChains + target string + exporter *CacheExporter } -type cacheConfig struct { - Items []configItem +func (ce *RegistryCacheExporter) Finalize(ctx context.Context) error { + return ce.exporter.Finalize(ctx, ce.chains, ce.target) +} + +func oneOffProgress(ctx context.Context, id string) func(err error) error { + pw, _, _ := progress.FromContext(ctx) + now := time.Now() + st := progress.Status{ + Started: &now, + } + pw.Write(id, st) + return func(err error) error { + now := time.Now() + st.Completed = &now + pw.Write(id, st) + pw.Close() + return err + } } diff --git a/cache/cacheimport/import.go b/cache/cacheimport/import.go index d7c84f98..5689855f 100644 --- a/cache/cacheimport/import.go +++ b/cache/cacheimport/import.go @@ -3,38 +3,25 @@ package cacheimport import ( "context" "encoding/json" - "fmt" "net/http" "time" "github.com/containerd/containerd/content" - "github.com/containerd/containerd/diff" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" - "github.com/containerd/containerd/rootfs" - cdsnapshot "github.com/containerd/containerd/snapshots" - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/blobs" - "github.com/moby/buildkit/cache/instructioncache" - "github.com/moby/buildkit/client" - buildkitidentity "github.com/moby/buildkit/identity" + v1 "github.com/moby/buildkit/cache/cacheimport/v1" "github.com/moby/buildkit/session" "github.com/moby/buildkit/session/auth" - "github.com/moby/buildkit/snapshot" - "github.com/moby/buildkit/util/progress" - digest "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/identity" + solver "github.com/moby/buildkit/solver-next" + "github.com/moby/buildkit/util/contentutil" + "github.com/moby/buildkit/worker" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" ) type ImportOpt struct { SessionManager *session.Manager - ContentStore content.Store - Snapshotter snapshot.Snapshotter - Applier diff.Applier - CacheAccessor cache.Accessor + Worker worker.Worker // TODO: remove. This sets the worker where the cache is imported to. Should be passed on load instead. } func NewCacheImporter(opt ImportOpt) *CacheImporter { @@ -64,7 +51,7 @@ func (ci *CacheImporter) getCredentialsFromSession(ctx context.Context) func(str } } -func (ci *CacheImporter) pull(ctx context.Context, ref string) (*ocispec.Descriptor, remotes.Fetcher, error) { +func (ci *CacheImporter) Resolve(ctx context.Context, ref string) (solver.CacheManager, error) { resolver := docker.NewResolver(docker.ResolverOptions{ Client: http.DefaultClient, Credentials: ci.getCredentialsFromSession(ctx), @@ -72,28 +59,21 @@ func (ci *CacheImporter) pull(ctx context.Context, ref string) (*ocispec.Descrip ref, desc, err := resolver.Resolve(ctx, ref) if err != nil { - return nil, nil, err + return nil, err } fetcher, err := resolver.Fetcher(ctx, ref) - if err != nil { - return nil, nil, err - } - - if _, err := remotes.FetchHandler(ci.opt.ContentStore, fetcher)(ctx, desc); err != nil { - return nil, nil, err - } - - return &desc, fetcher, err -} - -func (ci *CacheImporter) Import(ctx context.Context, ref string) (instructioncache.InstructionCache, error) { - desc, fetcher, err := ci.pull(ctx, ref) if err != nil { return nil, err } - dt, err := content.ReadBlob(ctx, ci.opt.ContentStore, desc.Digest) + b := contentutil.NewBuffer() + + if _, err := remotes.FetchHandler(b, fetcher)(ctx, desc); err != nil { + return nil, err + } + + dt, err := content.ReadBlob(ctx, b, desc.Digest) if err != nil { return nil, err } @@ -103,258 +83,42 @@ func (ci *CacheImporter) Import(ctx context.Context, ref string) (instructioncac return nil, err } - allDesc := map[digest.Digest]ocispec.Descriptor{} - allBlobs := map[digest.Digest]configItem{} - byCacheKey := map[digest.Digest]configItem{} - byContentKey := map[digest.Digest][]digest.Digest{} + allLayers := v1.DescriptorProvider{} var configDesc ocispec.Descriptor for _, m := range mfst.Manifests { - if m.MediaType == mediaTypeConfig { + if m.MediaType == v1.CacheConfigMediaTypeV0 { configDesc = m continue } - allDesc[m.Digest] = m + allLayers[m.Digest] = v1.DescriptorProviderPair{ + Descriptor: m, + Provider: contentutil.FromFetcher(fetcher, m), + } } if configDesc.Digest == "" { - return nil, errors.Errorf("invalid build cache: %s", ref) + return nil, errors.Errorf("invalid build cache from %s", ref) } - if _, err := remotes.FetchHandler(ci.opt.ContentStore, fetcher)(ctx, configDesc); err != nil { + if _, err := remotes.FetchHandler(b, fetcher)(ctx, configDesc); err != nil { return nil, err } - dt, err = content.ReadBlob(ctx, ci.opt.ContentStore, configDesc.Digest) + dt, err = content.ReadBlob(ctx, b, configDesc.Digest) if err != nil { return nil, err } - var cfg cacheConfig - if err := json.Unmarshal(dt, &cfg); err != nil { + cc := v1.NewCacheChains() + if err := v1.Parse(dt, allLayers, cc); err != nil { return nil, err } - for _, ci := range cfg.Items { - if ci.Blobsum != "" { - allBlobs[ci.Blobsum] = ci - } - if ci.CacheKey != "" { - byCacheKey[ci.CacheKey] = ci - if ci.ContentKey != "" { - byContentKey[ci.ContentKey] = append(byContentKey[ci.ContentKey], ci.CacheKey) - } - } - } - - return &importInfo{ - CacheImporter: ci, - byCacheKey: byCacheKey, - byContentKey: byContentKey, - allBlobs: allBlobs, - allDesc: allDesc, - fetcher: fetcher, - ref: ref, - }, nil -} - -type importInfo struct { - *CacheImporter - fetcher remotes.Fetcher - byCacheKey map[digest.Digest]configItem - byContentKey map[digest.Digest][]digest.Digest - allDesc map[digest.Digest]ocispec.Descriptor - allBlobs map[digest.Digest]configItem - ref string -} - -func (ii *importInfo) Probe(ctx context.Context, key digest.Digest) (bool, error) { - _, ok := ii.byCacheKey[key] - return ok, nil -} - -func (ii *importInfo) getChain(dgst digest.Digest) ([]blobs.DiffPair, error) { - cfg, ok := ii.allBlobs[dgst] - if !ok { - return nil, errors.Errorf("blob %s not found in cache", dgst) - } - parent := cfg.Parent - - var out []blobs.DiffPair - if parent != "" { - parentChain, err := ii.getChain(parent) - if err != nil { - return nil, err - } - out = parentChain - } - return append(out, blobs.DiffPair{Blobsum: dgst, DiffID: cfg.DiffID}), nil -} - -func (ii *importInfo) Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) { - desc, ok := ii.byCacheKey[key] - if !ok || desc.Blobsum == "" { - return nil, nil - } - var out interface{} - if err := inVertexContext(ctx, fmt.Sprintf("cache from %s for %s", ii.ref, msg), func(ctx context.Context) error { - - ch, err := ii.getChain(desc.Blobsum) - if err != nil { - return err - } - res, err := ii.fetch(ctx, ch) - if err != nil { - return err - } - out = res - return nil - }); err != nil { - return nil, err - } - return out, nil -} - -func (ii *importInfo) Set(key digest.Digest, ref interface{}) error { - return nil -} - -func (ii *importInfo) SetContentMapping(contentKey, key digest.Digest) error { - return nil -} - -func (ii *importInfo) GetContentMapping(dgst digest.Digest) ([]digest.Digest, error) { - dgsts, ok := ii.byContentKey[dgst] - if !ok { - return nil, nil - } - return dgsts, nil -} - -func (ii *importInfo) fetch(ctx context.Context, chain []blobs.DiffPair) (cache.ImmutableRef, error) { - eg, ctx := errgroup.WithContext(ctx) - for _, dp := range chain { - func(dp blobs.DiffPair) { - eg.Go(func() error { - desc, ok := ii.allDesc[dp.Blobsum] - if !ok { - return errors.Errorf("failed to find %s for fetch", dp.Blobsum) - } - if _, err := remotes.FetchHandler(ii.opt.ContentStore, ii.fetcher)(ctx, desc); err != nil { - return err - } - return nil - }) - }(dp) - } - if err := eg.Wait(); err != nil { - return nil, err - } - - cs, release := snapshot.NewContainerdSnapshotter(ii.opt.Snapshotter) - defer release() - - chainid, err := ii.unpack(ctx, chain, cs) + keysStorage, resultStorage, err := v1.NewCacheKeyStorage(cc, ci.opt.Worker) if err != nil { return nil, err } - - return ii.opt.CacheAccessor.Get(ctx, chainid, cache.WithDescription("imported cache")) // TODO: more descriptive name -} - -func (ii *importInfo) unpack(ctx context.Context, dpairs []blobs.DiffPair, s cdsnapshot.Snapshotter) (string, error) { - layers, err := ii.getLayers(ctx, dpairs) - if err != nil { - return "", err - } - - var chain []digest.Digest - for _, layer := range layers { - labels := map[string]string{ - "containerd.io/uncompressed": layer.Diff.Digest.String(), - } - if _, err := rootfs.ApplyLayer(ctx, layer, chain, s, ii.opt.Applier, cdsnapshot.WithLabels(labels)); err != nil { - return "", err - } - chain = append(chain, layer.Diff.Digest) - } - chainID := identity.ChainID(chain) - - if err := ii.fillBlobMapping(ctx, layers); err != nil { - return "", err - } - - return string(chainID), nil -} - -func (ii *importInfo) fillBlobMapping(ctx context.Context, layers []rootfs.Layer) error { - var chain []digest.Digest - for _, l := range layers { - chain = append(chain, l.Diff.Digest) - chainID := identity.ChainID(chain) - if err := ii.opt.Snapshotter.SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil { - return err - } - } - return nil -} - -func (ii *importInfo) getLayers(ctx context.Context, dpairs []blobs.DiffPair) ([]rootfs.Layer, error) { - layers := make([]rootfs.Layer, len(dpairs)) - for i := range dpairs { - layers[i].Diff = ocispec.Descriptor{ - // TODO: derive media type from compressed type - MediaType: ocispec.MediaTypeImageLayer, - Digest: dpairs[i].DiffID, - } - info, err := ii.opt.ContentStore.Info(ctx, dpairs[i].Blobsum) - if err != nil { - return nil, err - } - layers[i].Blob = ocispec.Descriptor{ - // TODO: derive media type from compressed type - MediaType: ocispec.MediaTypeImageLayerGzip, - Digest: dpairs[i].Blobsum, - Size: info.Size, - } - } - return layers, nil -} - -func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error { - v := client.Vertex{ - Digest: digest.FromBytes([]byte(buildkitidentity.NewID())), - Name: name, - } - pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest)) - notifyStarted(ctx, &v) - defer pw.Close() - err := f(ctx) - notifyCompleted(ctx, &v, err) - return err -} - -func notifyStarted(ctx context.Context, v *client.Vertex) { - pw, _, _ := progress.FromContext(ctx) - defer pw.Close() - now := time.Now() - v.Started = &now - v.Completed = nil - pw.Write(v.Digest.String(), *v) -} - -func notifyCompleted(ctx context.Context, v *client.Vertex, err error) { - pw, _, _ := progress.FromContext(ctx) - defer pw.Close() - now := time.Now() - if v.Started == nil { - v.Started = &now - } - v.Completed = &now - v.Cached = false - if err != nil { - v.Error = err.Error() - } - pw.Write(v.Digest.String(), *v) + return solver.NewCacheManager(ref, keysStorage, resultStorage), nil }