cacheimport: implementation using v1 package

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2018-04-13 14:10:59 -07:00
parent 7b5fc36f5f
commit 355126e080
2 changed files with 89 additions and 366 deletions

View File

@ -4,36 +4,24 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/images"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/manifest/schema2"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/blobs"
v1 "github.com/moby/buildkit/cache/cacheimport/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
solver "github.com/moby/buildkit/solver-next"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/push"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const mediaTypeConfig = "application/vnd.buildkit.cacheconfig.v0"
// CacheRecord is a single exportable cache entry: a cache key together with
// an optional snapshot reference holding its layer data and an optional
// content-based key.
type CacheRecord struct {
	// CacheKey identifies the cache entry.
	CacheKey digest.Digest
	// Reference is the snapshot backing this record; may be nil for records
	// that carry no layer data.
	Reference cache.ImmutableRef
	// ContentKey is an optional content-addressed key used to map content
	// digests back to cache keys on import.
	ContentKey digest.Digest
}
// ExporterOpt holds the dependencies needed to construct a CacheExporter.
type ExporterOpt struct {
	Snapshotter    snapshot.Snapshotter // source of snapshots to diff into blobs
	ContentStore   content.Store        // local store for blob data
	Differ         diff.Comparer        // computes layer diffs
	SessionManager *session.Manager     // used for registry credentials on push
}
@ -45,41 +33,15 @@ type CacheExporter struct {
opt ExporterOpt
}
func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target string) error {
allBlobs := map[digest.Digest][]blobs.DiffPair{}
currentBlobs := map[digest.Digest]struct{}{}
// cr pairs a CacheRecord with the digest of the topmost blob in its layer
// chain; dgst is empty when the record has no layer data.
type cr struct {
	CacheRecord
	dgst digest.Digest
}
// ExporterForTarget returns a RegistryCacheExporter that accumulates cache
// chains and, on Finalize, marshals and pushes them to the given registry
// target.
func (ce *CacheExporter) ExporterForTarget(target string) *RegistryCacheExporter {
	chains := v1.NewCacheChains()
	return &RegistryCacheExporter{
		ExporterTarget: chains,
		chains:         chains,
		target:         target,
		exporter:       ce,
	}
}
list := make([]cr, 0, len(rec))
for _, r := range rec {
ref := r.Reference
if ref == nil {
list = append(list, cr{CacheRecord: r})
continue
}
dpairs, err := blobs.GetDiffPairs(ctx, ce.opt.ContentStore, ce.opt.Snapshotter, ce.opt.Differ, ref)
if err != nil {
return err
}
for i, dp := range dpairs {
allBlobs[dp.Blobsum] = dpairs[:i+1]
}
dgst := dpairs[len(dpairs)-1].Blobsum
list = append(list, cr{CacheRecord: r, dgst: dgst})
currentBlobs[dgst] = struct{}{}
}
for b := range allBlobs {
if _, ok := currentBlobs[b]; !ok {
list = append(list, cr{dgst: b})
}
func (ce *CacheExporter) Finalize(ctx context.Context, cc *v1.CacheChains, target string) error {
config, descs, err := cc.Marshal()
if err != nil {
return err
}
// own type because oci type can't be pushed and docker type doesn't have annotations
@ -90,61 +52,42 @@ func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target s
Manifests []ocispec.Descriptor `json:"manifests"`
}
var config cacheConfig
var mfst manifestList
mfst.SchemaVersion = 2
mfst.MediaType = images.MediaTypeDockerSchema2ManifestList
for _, l := range list {
var size int64
var parent digest.Digest
var diffID digest.Digest
if l.dgst != "" {
info, err := ce.opt.ContentStore.Info(ctx, l.dgst)
if err != nil {
return err
}
size = info.Size
chain := allBlobs[l.dgst]
if len(chain) > 1 {
parent = chain[len(chain)-2].Blobsum
}
diffID = chain[len(chain)-1].DiffID
mfst.Manifests = append(mfst.Manifests, ocispec.Descriptor{
MediaType: schema2.MediaTypeLayer,
Size: size,
Digest: l.dgst,
})
allBlobs := map[digest.Digest]struct{}{}
mp := contentutil.NewMultiProvider(nil)
for _, l := range config.Layers {
if _, ok := allBlobs[l.Blob]; ok {
continue
}
dgstPair, ok := descs[l.Blob]
if !ok {
return errors.Errorf("missing blob %s", l.Blob)
}
allBlobs[l.Blob] = struct{}{}
mp.Add(l.Blob, dgstPair.Provider)
config.Items = append(config.Items, configItem{
Blobsum: l.dgst,
CacheKey: l.CacheKey,
ContentKey: l.ContentKey,
Parent: parent,
DiffID: diffID,
})
mfst.Manifests = append(mfst.Manifests, dgstPair.Descriptor)
}
dt, err := json.Marshal(config)
if err != nil {
return err
}
dgst := digest.FromBytes(dt)
addAsRoot := content.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
})
if err := content.WriteBlob(ctx, ce.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst, addAsRoot); err != nil {
return errors.Wrap(err, "error writing config blob")
configDone := oneOffProgress(ctx, fmt.Sprintf("writing config %s", dgst))
buf := contentutil.NewBuffer()
if err := content.WriteBlob(ctx, buf, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst); err != nil {
return configDone(errors.Wrap(err, "error writing config blob"))
}
configDone(nil)
mp.Add(dgst, buf)
mfst.Manifests = append(mfst.Manifests, ocispec.Descriptor{
MediaType: mediaTypeConfig,
MediaType: v1.CacheConfigMediaTypeV0,
Size: int64(len(dt)),
Digest: dgst,
})
@ -153,26 +96,42 @@ func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target s
if err != nil {
return errors.Wrap(err, "failed to marshal manifest")
}
dgst = digest.FromBytes(dt)
if err := content.WriteBlob(ctx, ce.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst, addAsRoot); err != nil {
return errors.Wrap(err, "error writing manifest blob")
buf = contentutil.NewBuffer()
mfstDone := oneOffProgress(ctx, fmt.Sprintf("writing manifest %s", dgst))
if err := content.WriteBlob(ctx, buf, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst); err != nil {
return mfstDone(errors.Wrap(err, "error writing manifest blob"))
}
mfstDone(nil)
mp.Add(dgst, buf)
logrus.Debugf("cache-manifest: %s", dgst)
return push.Push(ctx, ce.opt.SessionManager, ce.opt.ContentStore, dgst, target, false)
return push.Push(ctx, ce.opt.SessionManager, mp, dgst, target, false)
}
type configItem struct {
Blobsum digest.Digest
CacheKey digest.Digest
ContentKey digest.Digest
Parent digest.Digest
DiffID digest.Digest
// RegistryCacheExporter collects cache chains through the embedded
// solver.ExporterTarget and pushes the marshaled result to a registry
// reference when Finalize is called.
type RegistryCacheExporter struct {
	solver.ExporterTarget
	// chains is the same value as ExporterTarget, retained as its concrete
	// type so Finalize can marshal it.
	chains   *v1.CacheChains
	target   string
	exporter *CacheExporter
}
type cacheConfig struct {
Items []configItem
// Finalize marshals the collected cache chains and pushes them to the
// configured registry target.
func (ce *RegistryCacheExporter) Finalize(ctx context.Context) error {
	return ce.exporter.Finalize(ctx, ce.chains, ce.target)
}
// oneOffProgress writes a "started" progress status for id and returns a
// completion callback. Calling the callback records the completion time,
// closes the progress writer, and passes its error argument straight through
// so it can wrap a return statement.
func oneOffProgress(ctx context.Context, id string) func(err error) error {
	pw, _, _ := progress.FromContext(ctx)
	startedAt := time.Now()
	status := progress.Status{Started: &startedAt}
	pw.Write(id, status)
	return func(err error) error {
		finishedAt := time.Now()
		status.Completed = &finishedAt
		pw.Write(id, status)
		pw.Close()
		return err
	}
}

View File

@ -3,38 +3,25 @@ package cacheimport
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/rootfs"
cdsnapshot "github.com/containerd/containerd/snapshots"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/blobs"
"github.com/moby/buildkit/cache/instructioncache"
"github.com/moby/buildkit/client"
buildkitidentity "github.com/moby/buildkit/identity"
v1 "github.com/moby/buildkit/cache/cacheimport/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/auth"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
solver "github.com/moby/buildkit/solver-next"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/worker"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// ImportOpt holds the dependencies needed to construct a CacheImporter.
type ImportOpt struct {
	SessionManager *session.Manager // used for registry credentials on pull
	ContentStore   content.Store    // local store for fetched blobs
	Snapshotter    snapshot.Snapshotter
	Applier        diff.Applier  // applies layer diffs when unpacking
	CacheAccessor  cache.Accessor
	Worker         worker.Worker // TODO: remove. This sets the worker where the cache is imported to. Should be passed on load instead.
}
func NewCacheImporter(opt ImportOpt) *CacheImporter {
@ -64,7 +51,7 @@ func (ci *CacheImporter) getCredentialsFromSession(ctx context.Context) func(str
}
}
func (ci *CacheImporter) pull(ctx context.Context, ref string) (*ocispec.Descriptor, remotes.Fetcher, error) {
func (ci *CacheImporter) Resolve(ctx context.Context, ref string) (solver.CacheManager, error) {
resolver := docker.NewResolver(docker.ResolverOptions{
Client: http.DefaultClient,
Credentials: ci.getCredentialsFromSession(ctx),
@ -72,28 +59,21 @@ func (ci *CacheImporter) pull(ctx context.Context, ref string) (*ocispec.Descrip
ref, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
return nil, nil, err
return nil, err
}
fetcher, err := resolver.Fetcher(ctx, ref)
if err != nil {
return nil, nil, err
}
if _, err := remotes.FetchHandler(ci.opt.ContentStore, fetcher)(ctx, desc); err != nil {
return nil, nil, err
}
return &desc, fetcher, err
}
func (ci *CacheImporter) Import(ctx context.Context, ref string) (instructioncache.InstructionCache, error) {
desc, fetcher, err := ci.pull(ctx, ref)
if err != nil {
return nil, err
}
dt, err := content.ReadBlob(ctx, ci.opt.ContentStore, desc.Digest)
b := contentutil.NewBuffer()
if _, err := remotes.FetchHandler(b, fetcher)(ctx, desc); err != nil {
return nil, err
}
dt, err := content.ReadBlob(ctx, b, desc.Digest)
if err != nil {
return nil, err
}
@ -103,258 +83,42 @@ func (ci *CacheImporter) Import(ctx context.Context, ref string) (instructioncac
return nil, err
}
allDesc := map[digest.Digest]ocispec.Descriptor{}
allBlobs := map[digest.Digest]configItem{}
byCacheKey := map[digest.Digest]configItem{}
byContentKey := map[digest.Digest][]digest.Digest{}
allLayers := v1.DescriptorProvider{}
var configDesc ocispec.Descriptor
for _, m := range mfst.Manifests {
if m.MediaType == mediaTypeConfig {
if m.MediaType == v1.CacheConfigMediaTypeV0 {
configDesc = m
continue
}
allDesc[m.Digest] = m
allLayers[m.Digest] = v1.DescriptorProviderPair{
Descriptor: m,
Provider: contentutil.FromFetcher(fetcher, m),
}
}
if configDesc.Digest == "" {
return nil, errors.Errorf("invalid build cache: %s", ref)
return nil, errors.Errorf("invalid build cache from %s", ref)
}
if _, err := remotes.FetchHandler(ci.opt.ContentStore, fetcher)(ctx, configDesc); err != nil {
if _, err := remotes.FetchHandler(b, fetcher)(ctx, configDesc); err != nil {
return nil, err
}
dt, err = content.ReadBlob(ctx, ci.opt.ContentStore, configDesc.Digest)
dt, err = content.ReadBlob(ctx, b, configDesc.Digest)
if err != nil {
return nil, err
}
var cfg cacheConfig
if err := json.Unmarshal(dt, &cfg); err != nil {
cc := v1.NewCacheChains()
if err := v1.Parse(dt, allLayers, cc); err != nil {
return nil, err
}
for _, ci := range cfg.Items {
if ci.Blobsum != "" {
allBlobs[ci.Blobsum] = ci
}
if ci.CacheKey != "" {
byCacheKey[ci.CacheKey] = ci
if ci.ContentKey != "" {
byContentKey[ci.ContentKey] = append(byContentKey[ci.ContentKey], ci.CacheKey)
}
}
}
return &importInfo{
CacheImporter: ci,
byCacheKey: byCacheKey,
byContentKey: byContentKey,
allBlobs: allBlobs,
allDesc: allDesc,
fetcher: fetcher,
ref: ref,
}, nil
}
// importInfo is the instruction-cache view over one imported cache manifest,
// indexing its config items by cache key, content key, and blobsum.
type importInfo struct {
	*CacheImporter
	// fetcher retrieves blobs for this manifest from the remote registry.
	fetcher remotes.Fetcher
	// byCacheKey maps a cache key to its config item.
	byCacheKey map[digest.Digest]configItem
	// byContentKey maps a content key to the cache keys recorded for it.
	byContentKey map[digest.Digest][]digest.Digest
	// allDesc maps blob digests to their manifest descriptors.
	allDesc map[digest.Digest]ocispec.Descriptor
	// allBlobs maps blobsums to config items, used to walk parent chains.
	allBlobs map[digest.Digest]configItem
	// ref is the source reference, used in progress messages.
	ref string
}
// Probe reports whether the imported cache holds an entry for key.
func (ii *importInfo) Probe(ctx context.Context, key digest.Digest) (bool, error) {
	if _, found := ii.byCacheKey[key]; found {
		return true, nil
	}
	return false, nil
}
// getChain resolves the full blob chain for dgst by walking parent links
// recursively, returning pairs ordered root-first and ending with dgst
// itself. It errors if any blob along the chain is missing.
func (ii *importInfo) getChain(dgst digest.Digest) ([]blobs.DiffPair, error) {
	cfg, ok := ii.allBlobs[dgst]
	if !ok {
		return nil, errors.Errorf("blob %s not found in cache", dgst)
	}
	var chain []blobs.DiffPair
	if cfg.Parent != "" {
		var err error
		if chain, err = ii.getChain(cfg.Parent); err != nil {
			return nil, err
		}
	}
	return append(chain, blobs.DiffPair{Blobsum: dgst, DiffID: cfg.DiffID}), nil
}
// Lookup returns the cached result for key, fetching and unpacking its
// layer chain inside a progress vertex labelled with msg. It returns
// (nil, nil) when the key is unknown or has no layer data attached.
func (ii *importInfo) Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) {
	desc, ok := ii.byCacheKey[key]
	if !ok || desc.Blobsum == "" {
		return nil, nil
	}
	var out interface{}
	if err := inVertexContext(ctx, fmt.Sprintf("cache from %s for %s", ii.ref, msg), func(ctx context.Context) error {
		// Resolve the blob chain for this entry, then fetch and unpack it
		// into a local reference.
		ch, err := ii.getChain(desc.Blobsum)
		if err != nil {
			return err
		}
		res, err := ii.fetch(ctx, ch)
		if err != nil {
			return err
		}
		out = res
		return nil
	}); err != nil {
		return nil, err
	}
	return out, nil
}
// Set is a no-op; this importer does not record new cache entries.
func (ii *importInfo) Set(key digest.Digest, ref interface{}) error {
	return nil
}
// SetContentMapping is a no-op; this importer does not record new mappings.
func (ii *importInfo) SetContentMapping(contentKey, key digest.Digest) error {
	return nil
}
// GetContentMapping returns the cache keys recorded for the given content
// key, or nil when none are known.
func (ii *importInfo) GetContentMapping(dgst digest.Digest) ([]digest.Digest, error) {
	// A missing key yields the zero value for the slice type, i.e. nil.
	return ii.byContentKey[dgst], nil
}
func (ii *importInfo) fetch(ctx context.Context, chain []blobs.DiffPair) (cache.ImmutableRef, error) {
eg, ctx := errgroup.WithContext(ctx)
for _, dp := range chain {
func(dp blobs.DiffPair) {
eg.Go(func() error {
desc, ok := ii.allDesc[dp.Blobsum]
if !ok {
return errors.Errorf("failed to find %s for fetch", dp.Blobsum)
}
if _, err := remotes.FetchHandler(ii.opt.ContentStore, ii.fetcher)(ctx, desc); err != nil {
return err
}
return nil
})
}(dp)
}
if err := eg.Wait(); err != nil {
return nil, err
}
cs, release := snapshot.NewContainerdSnapshotter(ii.opt.Snapshotter)
defer release()
chainid, err := ii.unpack(ctx, chain, cs)
keysStorage, resultStorage, err := v1.NewCacheKeyStorage(cc, ci.opt.Worker)
if err != nil {
return nil, err
}
return ii.opt.CacheAccessor.Get(ctx, chainid, cache.WithDescription("imported cache")) // TODO: more descriptive name
}
// unpack applies the layer chain described by dpairs to the snapshotter and
// returns the resulting chain ID as a string. It also records the diff/blob
// digest mapping for each chain prefix via fillBlobMapping.
func (ii *importInfo) unpack(ctx context.Context, dpairs []blobs.DiffPair, s cdsnapshot.Snapshotter) (string, error) {
	layers, err := ii.getLayers(ctx, dpairs)
	if err != nil {
		return "", err
	}
	var chain []digest.Digest
	for _, layer := range layers {
		// Label the snapshot with the uncompressed digest so containerd
		// can relate the snapshot to its layer content.
		labels := map[string]string{
			"containerd.io/uncompressed": layer.Diff.Digest.String(),
		}
		if _, err := rootfs.ApplyLayer(ctx, layer, chain, s, ii.opt.Applier, cdsnapshot.WithLabels(labels)); err != nil {
			return "", err
		}
		chain = append(chain, layer.Diff.Digest)
	}
	chainID := identity.ChainID(chain)
	if err := ii.fillBlobMapping(ctx, layers); err != nil {
		return "", err
	}
	return string(chainID), nil
}
// fillBlobMapping registers, for every prefix of the layer chain, the
// diff/blob digest pair on the snapshotter under that prefix's chain ID.
func (ii *importInfo) fillBlobMapping(ctx context.Context, layers []rootfs.Layer) error {
	diffIDs := make([]digest.Digest, 0, len(layers))
	for i := range layers {
		diffIDs = append(diffIDs, layers[i].Diff.Digest)
		err := ii.opt.Snapshotter.SetBlob(ctx, string(identity.ChainID(diffIDs)), layers[i].Diff.Digest, layers[i].Blob.Digest)
		if err != nil {
			return err
		}
	}
	return nil
}
// getLayers converts the diff pairs into rootfs.Layer values, resolving each
// blob's size from the local content store.
func (ii *importInfo) getLayers(ctx context.Context, dpairs []blobs.DiffPair) ([]rootfs.Layer, error) {
	layers := make([]rootfs.Layer, 0, len(dpairs))
	for _, dp := range dpairs {
		info, err := ii.opt.ContentStore.Info(ctx, dp.Blobsum)
		if err != nil {
			return nil, err
		}
		layers = append(layers, rootfs.Layer{
			Diff: ocispec.Descriptor{
				// TODO: derive media type from compressed type
				MediaType: ocispec.MediaTypeImageLayer,
				Digest:    dp.DiffID,
			},
			Blob: ocispec.Descriptor{
				// TODO: derive media type from compressed type
				MediaType: ocispec.MediaTypeImageLayerGzip,
				Digest:    dp.Blobsum,
				Size:      info.Size,
			},
		})
	}
	return layers, nil
}
// inVertexContext runs f inside a fresh progress vertex with the given
// display name, reporting its start before and its completion (including any
// error) after f returns. The vertex digest is derived from a random ID so
// each invocation gets a distinct vertex.
func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error {
	v := client.Vertex{
		Digest: digest.FromBytes([]byte(buildkitidentity.NewID())),
		Name:   name,
	}
	pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
	notifyStarted(ctx, &v)
	defer pw.Close()
	err := f(ctx)
	notifyCompleted(ctx, &v, err)
	return err
}
// notifyStarted marks v as started now, clears any completion time, and
// writes the vertex to the progress stream attached to ctx.
func notifyStarted(ctx context.Context, v *client.Vertex) {
	startedAt := time.Now()
	v.Started = &startedAt
	v.Completed = nil
	pw, _, _ := progress.FromContext(ctx)
	defer pw.Close()
	pw.Write(v.Digest.String(), *v)
}
func notifyCompleted(ctx context.Context, v *client.Vertex, err error) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
if v.Started == nil {
v.Started = &now
}
v.Completed = &now
v.Cached = false
if err != nil {
v.Error = err.Error()
}
pw.Write(v.Digest.String(), *v)
return solver.NewCacheManager(ref, keysStorage, resultStorage), nil
}