parent
a9209caa93
commit
6be136db6e
|
@ -273,13 +273,6 @@ func (cm *cacheManager) Get(ctx context.Context, id string, opts ...RefOption) (
|
|||
return cm.get(ctx, id, opts...)
|
||||
}
|
||||
|
||||
// Get returns an immutable snapshot reference for ID
|
||||
// func (cm *cacheManager) GetFromSnapshotter(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) {
|
||||
// cm.mu.Lock()
|
||||
// defer cm.mu.Unlock()
|
||||
// return cm.get(ctx, id, opts...)
|
||||
// }
|
||||
|
||||
func (cm *cacheManager) Metadata(id string) *metadata.StorageItem {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
|
|
@ -542,6 +542,8 @@ func TestSetBlob(t *testing.T) {
|
|||
require.Equal(t, info.Extracted, true)
|
||||
|
||||
ctx, clean, err := leaseutil.WithLease(ctx, co.lm)
|
||||
require.NoError(t, err)
|
||||
defer clean(context.TODO())
|
||||
|
||||
b, desc, err := mapToBlob(map[string]string{"foo": "bar"})
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -1,157 +0,0 @@
|
|||
package blobmapping
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/snapshots"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const blobKey = "blobmapping.blob"
|
||||
|
||||
type Opt struct {
|
||||
Content content.Store
|
||||
Snapshotter snapshot.Snapshotter
|
||||
MetadataStore *metadata.Store
|
||||
}
|
||||
|
||||
type Info struct {
|
||||
snapshots.Info
|
||||
Blob string
|
||||
}
|
||||
|
||||
type DiffPair struct {
|
||||
Blobsum digest.Digest
|
||||
DiffID digest.Digest
|
||||
}
|
||||
|
||||
// this snapshotter keeps an internal mapping between a snapshot and a blob
|
||||
|
||||
type Snapshotter struct {
|
||||
snapshot.Snapshotter
|
||||
opt Opt
|
||||
}
|
||||
|
||||
func NewSnapshotter(opt Opt) snapshot.Snapshotter {
|
||||
s := &Snapshotter{
|
||||
Snapshotter: opt.Snapshotter,
|
||||
opt: opt,
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Remove also removes a reference to a blob. If it is a last reference then it deletes it the blob as well
|
||||
// Remove is not safe to be called concurrently
|
||||
func (s *Snapshotter) Remove(ctx context.Context, key string) error {
|
||||
_, blob, err := s.GetBlob(ctx, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blobs, err := s.opt.MetadataStore.Search(index(blob))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.Snapshotter.Remove(ctx, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(blobs) == 1 && blobs[0].ID() == key { // last snapshot
|
||||
if err := s.opt.Content.Delete(ctx, blob); err != nil {
|
||||
logrus.Errorf("failed to delete blob %v: %+v", blob, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
|
||||
return snapshots.Usage{}, errors.Errorf("to-be-removed")
|
||||
|
||||
u, err := s.Snapshotter.Usage(ctx, key)
|
||||
if err != nil {
|
||||
return snapshots.Usage{}, err
|
||||
}
|
||||
_, blob, err := s.GetBlob(ctx, key)
|
||||
if err != nil {
|
||||
return u, err
|
||||
}
|
||||
if blob != "" {
|
||||
info, err := s.opt.Content.Info(ctx, blob)
|
||||
if err != nil {
|
||||
return u, err
|
||||
}
|
||||
(&u).Add(snapshots.Usage{Size: info.Size, Inodes: 1})
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func (s *Snapshotter) GetBlob(ctx context.Context, key string) (digest.Digest, digest.Digest, error) {
|
||||
return "", "", errors.Errorf("to-be-removed")
|
||||
|
||||
md, _ := s.opt.MetadataStore.Get(key)
|
||||
v := md.Get(blobKey)
|
||||
if v == nil {
|
||||
return "", "", nil
|
||||
}
|
||||
var blob DiffPair
|
||||
if err := v.Unmarshal(&blob); err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
return blob.DiffID, blob.Blobsum, nil
|
||||
}
|
||||
|
||||
// Validates that there is no blob associated with the snapshot.
|
||||
// Checks that there is a blob in the content store.
|
||||
// If same blob has already been set then this is a noop.
|
||||
func (s *Snapshotter) SetBlob(ctx context.Context, key string, diffID, blobsum digest.Digest) error {
|
||||
return errors.Errorf("SetBlob should not be called")
|
||||
info, err := s.opt.Content.Info(ctx, blobsum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, ok := info.Labels["containerd.io/uncompressed"]; !ok {
|
||||
labels := map[string]string{
|
||||
"containerd.io/uncompressed": diffID.String(),
|
||||
}
|
||||
if _, err := s.opt.Content.Update(ctx, content.Info{
|
||||
Digest: blobsum,
|
||||
Labels: labels,
|
||||
}, "labels.containerd.io/uncompressed"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// update gc.root cause blob might be held by lease only
|
||||
if _, err := s.opt.Content.Update(ctx, content.Info{
|
||||
Digest: blobsum,
|
||||
Labels: map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
},
|
||||
}, "labels.containerd.io/gc.root"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
md, _ := s.opt.MetadataStore.Get(key)
|
||||
|
||||
v, err := metadata.NewValue(DiffPair{DiffID: diffID, Blobsum: blobsum})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v.Index = index(blobsum)
|
||||
|
||||
return md.Update(func(b *bolt.Bucket) error {
|
||||
return md.SetValue(b, blobKey, v)
|
||||
})
|
||||
}
|
||||
|
||||
func index(blob digest.Digest) string {
|
||||
return "blobmap::" + blob.String()
|
||||
}
|
|
@ -10,8 +10,6 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type garbageCollectFn func(context.Context) error
|
||||
|
||||
func NewContentStore(store content.Store, ns string) content.Store {
|
||||
return &nsContent{ns, store}
|
||||
}
|
||||
|
@ -82,26 +80,3 @@ func (w *nsWriter) Commit(ctx context.Context, size int64, expected digest.Diges
|
|||
ctx = namespaces.WithNamespace(ctx, w.ns)
|
||||
return w.Writer.Commit(ctx, size, expected, opts...)
|
||||
}
|
||||
|
||||
// type noGCContentStore struct {
|
||||
// content.Store
|
||||
// }
|
||||
// type noGCWriter struct {
|
||||
// content.Writer
|
||||
// }
|
||||
|
||||
// func (cs *noGCContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
|
||||
// w, err := cs.Store.Writer(ctx, opts...)
|
||||
// return &noGCWriter{w}, err
|
||||
// }
|
||||
|
||||
// func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
// opts = append(opts, func(info *content.Info) error {
|
||||
// if info.Labels == nil {
|
||||
// info.Labels = map[string]string{}
|
||||
// }
|
||||
// info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano)
|
||||
// return nil
|
||||
// })
|
||||
// return w.Writer.Commit(ctx, size, expected, opts...)
|
||||
// }
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/diff"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
"github.com/moby/buildkit/util/flightcontrol"
|
||||
"github.com/moby/buildkit/util/imageutil"
|
||||
"github.com/moby/buildkit/util/leaseutil"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
"github.com/moby/buildkit/util/pull"
|
||||
"github.com/moby/buildkit/util/resolver"
|
||||
"github.com/moby/buildkit/util/winlayers"
|
||||
|
@ -210,11 +212,13 @@ func (p *puller) Snapshot(ctx context.Context) (ir cache.ImmutableRef, err error
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
extractDone := oneOffProgress(ctx, "unpacking "+pulled.Ref)
|
||||
var current cache.ImmutableRef
|
||||
defer func() {
|
||||
if err != nil && current != nil {
|
||||
current.Release(context.TODO())
|
||||
}
|
||||
extractDone(err)
|
||||
}()
|
||||
for _, l := range pulled.Layers {
|
||||
ref, err := p.CacheAccessor.GetByBlob(ctx, l, current, cache.WithDescription("pulled from "+pulled.Ref))
|
||||
|
@ -278,3 +282,20 @@ func cacheKeyFromConfig(dt []byte) digest.Digest {
|
|||
}
|
||||
return identity.ChainID(img.RootFS.DiffIDs)
|
||||
}
|
||||
|
||||
func oneOffProgress(ctx context.Context, id string) func(err error) error {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
now := time.Now()
|
||||
st := progress.Status{
|
||||
Started: &now,
|
||||
}
|
||||
pw.Write(id, st)
|
||||
return func(err error) error {
|
||||
// TODO: set error on status
|
||||
now := time.Now()
|
||||
st.Completed = &now
|
||||
pw.Write(id, st)
|
||||
pw.Close()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -196,43 +196,11 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// for _, l := range layerBlobs {
|
||||
// labels := map[string]string{}
|
||||
// var fields []string
|
||||
// for _, nl := range notLayerBlobs {
|
||||
// k := "containerd.io/gc.ref.content." + nl.Digest.Hex()[:12]
|
||||
// labels[k] = nl.Digest.String()
|
||||
// fields = append(fields, "labels."+k)
|
||||
// }
|
||||
// if _, err := p.ContentStore.Update(ctx, content.Info{
|
||||
// Digest: l.Digest,
|
||||
// Labels: labels,
|
||||
// }, fields...); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// }
|
||||
|
||||
// for _, nl := range append(notLayerBlobs, unusedBlobs...) {
|
||||
// if err := p.ContentStore.Delete(ctx, nl.Digest); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// }
|
||||
|
||||
layers, err := getLayers(ctx, p.ContentStore, p.desc, platform)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// csh, release := snapshot.NewContainerdSnapshotter(p.Snapshotter)
|
||||
// defer release()
|
||||
|
||||
// unpackProgressDone := oneOffProgress(ctx, "unpacking "+p.Src.String())
|
||||
// chainid, err := unpack(ctx, p.desc, p.ContentStore, csh, p.Snapshotter, p.Applier, platform)
|
||||
// if err != nil {
|
||||
// return nil, unpackProgressDone(err)
|
||||
// }
|
||||
// unpackProgressDone(nil)
|
||||
|
||||
return &Pulled{
|
||||
Ref: p.ref,
|
||||
Descriptor: p.desc,
|
||||
|
@ -241,46 +209,6 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// func unpack(ctx context.Context, desc ocispec.Descriptor, cs content.Store, csh ctdsnapshot.Snapshotter, s snapshot.Snapshotter, applier diff.Applier, platform platforms.MatchComparer) (digest.Digest, error) {
|
||||
// layers, err := getLayers(ctx, cs, desc, platform)
|
||||
// if err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// var chain []digest.Digest
|
||||
// for _, layer := range layers {
|
||||
// labels := map[string]string{
|
||||
// "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
// }
|
||||
// if _, err := rootfs.ApplyLayer(ctx, layer, chain, csh, applier, ctdsnapshot.WithLabels(labels)); err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
// chain = append(chain, layer.Diff.Digest)
|
||||
// }
|
||||
// chainID := identity.ChainID(chain)
|
||||
// if err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// if err := fillBlobMapping(ctx, s, layers); err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// return chainID, nil
|
||||
// }
|
||||
|
||||
// func fillBlobMapping(ctx context.Context, s snapshot.Snapshotter, layers []rootfs.Layer) error {
|
||||
// var chain []digest.Digest
|
||||
// for _, l := range layers {
|
||||
// chain = append(chain, l.Diff.Digest)
|
||||
// chainID := identity.ChainID(chain)
|
||||
// if err := s.SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
// }
|
||||
|
||||
func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform platforms.MatchComparer) ([]ocispec.Descriptor, error) {
|
||||
manifest, err := images.Manifest(ctx, provider, desc, platform)
|
||||
if err != nil {
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/containerd/containerd/remotes/docker"
|
||||
"github.com/containerd/containerd/rootfs"
|
||||
"github.com/docker/docker/pkg/idtools"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/blobs"
|
||||
|
@ -468,48 +467,6 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cac
|
|||
return current, nil
|
||||
}
|
||||
|
||||
// func (w *Worker) unpack(ctx context.Context, cm cache.Manager, descs []ocispec.Descriptor, s cdsnapshot.Snapshotter) (ids []string, refs []cache.ImmutableRef, err error) {
|
||||
// defer func() {
|
||||
// if err != nil {
|
||||
// for _, r := range refs {
|
||||
// r.Release(context.TODO())
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
|
||||
// layers, err := getLayers(ctx, descs)
|
||||
// if err != nil {
|
||||
// return nil, nil, err
|
||||
// }
|
||||
|
||||
// var chain []digest.Digest
|
||||
// for _, layer := range layers {
|
||||
// newChain := append(chain, layer.Diff.Digest)
|
||||
|
||||
// chainID := ociidentity.ChainID(newChain)
|
||||
// ref, err := cm.Get(ctx, string(chainID))
|
||||
// if err == nil {
|
||||
// refs = append(refs, ref)
|
||||
// } else {
|
||||
// if _, err := rootfs.ApplyLayer(ctx, layer, chain, s, w.Applier); err != nil {
|
||||
// return nil, nil, err
|
||||
// }
|
||||
// }
|
||||
// chain = newChain
|
||||
|
||||
// if err := w.Snapshotter.SetBlob(ctx, string(chainID), layer.Diff.Digest, layer.Blob.Digest); err != nil {
|
||||
// return nil, nil, err
|
||||
// }
|
||||
// }
|
||||
|
||||
// ids = make([]string, len(chain))
|
||||
// for i := range chain {
|
||||
// ids[i] = string(ociidentity.ChainID(chain[:i+1]))
|
||||
// }
|
||||
|
||||
// return ids, refs, nil
|
||||
// }
|
||||
|
||||
// Labels returns default labels
|
||||
// utility function. could be moved to the constructor logic?
|
||||
func Labels(executor, snapshotter string) map[string]string {
|
||||
|
@ -542,30 +499,6 @@ func ID(root string) (string, error) {
|
|||
return string(b), nil
|
||||
}
|
||||
|
||||
func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) {
|
||||
layers := make([]rootfs.Layer, len(descs))
|
||||
for i, desc := range descs {
|
||||
diffIDStr := desc.Annotations["containerd.io/uncompressed"]
|
||||
if diffIDStr == "" {
|
||||
return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest)
|
||||
}
|
||||
diffID, err := digest.Parse(diffIDStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
layers[i].Diff = ocispec.Descriptor{
|
||||
MediaType: ocispec.MediaTypeImageLayer,
|
||||
Digest: diffID,
|
||||
}
|
||||
layers[i].Blob = ocispec.Descriptor{
|
||||
MediaType: desc.MediaType,
|
||||
Digest: desc.Digest,
|
||||
Size: desc.Size,
|
||||
}
|
||||
}
|
||||
return layers, nil
|
||||
}
|
||||
|
||||
func oneOffProgress(ctx context.Context, id string) func(err error) error {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
now := time.Now()
|
||||
|
|
Loading…
Reference in New Issue