commit
5e3884ca7d
|
@ -2,7 +2,9 @@ package blobs
|
|||
|
||||
import (
|
||||
gocontext "context"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/diff"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/moby/buildkit/cache"
|
||||
|
@ -17,6 +19,8 @@ import (
|
|||
|
||||
var g flightcontrol.Group
|
||||
|
||||
const containerdUncompressed = "containerd.io/uncompressed"
|
||||
|
||||
type DiffPair struct {
|
||||
DiffID digest.Digest
|
||||
Blobsum digest.Digest
|
||||
|
@ -27,7 +31,7 @@ type blobmapper interface {
|
|||
SetBlob(ctx gocontext.Context, key string, diffID, blob digest.Digest) error
|
||||
}
|
||||
|
||||
func GetDiffPairs(ctx context.Context, snapshotter snapshot.Snapshotter, differ diff.Differ, ref cache.ImmutableRef) ([]DiffPair, error) {
|
||||
func GetDiffPairs(ctx context.Context, contentStore content.Store, snapshotter snapshot.Snapshotter, differ diff.Differ, ref cache.ImmutableRef) ([]DiffPair, error) {
|
||||
blobmap, ok := snapshotter.(blobmapper)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("image exporter requires snapshotter with blobs mapping support")
|
||||
|
@ -40,7 +44,7 @@ func GetDiffPairs(ctx context.Context, snapshotter snapshot.Snapshotter, differ
|
|||
if parent != nil {
|
||||
defer parent.Release(context.TODO())
|
||||
eg.Go(func() error {
|
||||
dp, err := GetDiffPairs(ctx, snapshotter, differ, parent)
|
||||
dp, err := GetDiffPairs(ctx, contentStore, snapshotter, differ, parent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -72,15 +76,31 @@ func GetDiffPairs(ctx context.Context, snapshotter snapshot.Snapshotter, differ
|
|||
return nil, err
|
||||
}
|
||||
descr, err := differ.DiffMounts(ctx, lower, upper,
|
||||
diff.WithMediaType(ocispec.MediaTypeImageLayer),
|
||||
diff.WithReference(ref.ID()))
|
||||
diff.WithMediaType(ocispec.MediaTypeImageLayerGzip),
|
||||
diff.WithReference(ref.ID()),
|
||||
diff.WithLabels(map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := blobmap.SetBlob(ctx, ref.ID(), descr.Digest, descr.Digest); err != nil {
|
||||
info, err := contentStore.Info(ctx, descr.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return DiffPair{DiffID: descr.Digest, Blobsum: descr.Digest}, nil
|
||||
diffIDStr, ok := info.Labels[containerdUncompressed]
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid differ response with no diffID")
|
||||
}
|
||||
diffIDDigest, err := digest.Parse(diffIDStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := blobmap.SetBlob(ctx, ref.ID(), diffIDDigest, descr.Digest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return DiffPair{DiffID: diffIDDigest, Blobsum: descr.Digest}, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
gocontext "context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/diff"
|
||||
|
@ -67,7 +68,7 @@ func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target s
|
|||
continue
|
||||
}
|
||||
|
||||
dpairs, err := blobs.GetDiffPairs(ctx, ce.opt.Snapshotter, ce.opt.Differ, ref)
|
||||
dpairs, err := blobs.GetDiffPairs(ctx, ce.opt.ContentStore, ce.opt.Snapshotter, ce.opt.Differ, ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -140,7 +141,11 @@ func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target s
|
|||
|
||||
dgst := digest.FromBytes(dt)
|
||||
|
||||
if err := content.WriteBlob(ctx, ce.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst); err != nil {
|
||||
addAsRoot := content.WithLabels(map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
})
|
||||
|
||||
if err := content.WriteBlob(ctx, ce.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst, addAsRoot); err != nil {
|
||||
return errors.Wrap(err, "error writing config blob")
|
||||
}
|
||||
|
||||
|
@ -157,7 +162,7 @@ func (ce *CacheExporter) Export(ctx context.Context, rec []CacheRecord, target s
|
|||
|
||||
dgst = digest.FromBytes(dt)
|
||||
|
||||
if err := content.WriteBlob(ctx, ce.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst); err != nil {
|
||||
if err := content.WriteBlob(ctx, ce.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst, addAsRoot); err != nil {
|
||||
return errors.Wrap(err, "error writing manifest blob")
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -10,7 +11,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/moby/buildkit/worker/containerdworker"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
|
@ -41,7 +44,7 @@ func newContainerdPullDeps(client *containerd.Client) *pullDeps {
|
|||
diff := client.DiffService()
|
||||
return &pullDeps{
|
||||
Snapshotter: client.SnapshotService(containerd.DefaultSnapshotter),
|
||||
ContentStore: client.ContentStore(),
|
||||
ContentStore: &noGCContentStore{client.ContentStore()},
|
||||
Applier: diff,
|
||||
Differ: diff,
|
||||
Images: client.ImageService(),
|
||||
|
@ -56,3 +59,28 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) {
|
|||
func dialAddress(address string) string {
|
||||
return fmt.Sprintf("unix://%s", address)
|
||||
}
|
||||
|
||||
// TODO: Replace this with leases
|
||||
|
||||
type noGCContentStore struct {
|
||||
content.Store
|
||||
}
|
||||
type noGCWriter struct {
|
||||
content.Writer
|
||||
}
|
||||
|
||||
func (cs *noGCContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
|
||||
w, err := cs.Store.Writer(ctx, ref, size, expected)
|
||||
return &noGCWriter{w}, err
|
||||
}
|
||||
|
||||
func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
opts = append(opts, func(info *content.Info) error {
|
||||
if info.Labels == nil {
|
||||
info.Labels = map[string]string{}
|
||||
}
|
||||
info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano)
|
||||
return nil
|
||||
})
|
||||
return w.Writer.Commit(ctx, size, expected, opts...)
|
||||
}
|
||||
|
|
|
@ -7,13 +7,17 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/content/local"
|
||||
"github.com/containerd/containerd/diff/walking"
|
||||
"github.com/containerd/containerd/metadata"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
ctdsnapshot "github.com/containerd/containerd/snapshot"
|
||||
"github.com/containerd/containerd/snapshot/overlay"
|
||||
"github.com/moby/buildkit/worker/runcworker"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
|
@ -55,13 +59,29 @@ func newStandalonePullDeps(root string) (*pullDeps, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
db, err := bolt.Open(filepath.Join(root, "containerdmeta.db"), 0644, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mdb := metadata.NewDB(db, c, map[string]ctdsnapshot.Snapshotter{
|
||||
"overlay": s,
|
||||
})
|
||||
if err := mdb.Init(context.TODO()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c = &nsContent{mdb.ContentStore()}
|
||||
|
||||
df, err := walking.NewWalkingDiff(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: call mdb.GarbageCollect . maybe just inject it into nsSnapshotter.Remove and csContent.Delete
|
||||
|
||||
return &pullDeps{
|
||||
Snapshotter: &nsSnapshotter{s},
|
||||
Snapshotter: &nsSnapshotter{mdb.Snapshotter("overlay")},
|
||||
ContentStore: c,
|
||||
Applier: df,
|
||||
Differ: df,
|
||||
|
@ -71,6 +91,55 @@ func newStandalonePullDeps(root string) (*pullDeps, error) {
|
|||
// this should be supported by containerd. currently packages are unusable without wrapping
|
||||
const dummyNs = "buildkit"
|
||||
|
||||
type nsContent struct {
|
||||
content.Store
|
||||
}
|
||||
|
||||
func (c *nsContent) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.Info(ctx, dgst)
|
||||
}
|
||||
|
||||
func (c *nsContent) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.Update(ctx, info, fieldpaths...)
|
||||
}
|
||||
|
||||
func (c *nsContent) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.Walk(ctx, fn, filters...)
|
||||
}
|
||||
|
||||
func (c *nsContent) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.Delete(ctx, dgst)
|
||||
}
|
||||
|
||||
func (c *nsContent) Status(ctx context.Context, ref string) (content.Status, error) {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.Status(ctx, ref)
|
||||
}
|
||||
|
||||
func (c *nsContent) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.ListStatuses(ctx, filters...)
|
||||
}
|
||||
|
||||
func (c *nsContent) Abort(ctx context.Context, ref string) error {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.Abort(ctx, ref)
|
||||
}
|
||||
|
||||
func (c *nsContent) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.ReaderAt(ctx, dgst)
|
||||
}
|
||||
|
||||
func (c *nsContent) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
|
||||
ctx = namespaces.WithNamespace(ctx, dummyNs)
|
||||
return c.Store.Writer(ctx, ref, size, expected)
|
||||
}
|
||||
|
||||
type nsSnapshotter struct {
|
||||
ctdsnapshot.Snapshotter
|
||||
}
|
||||
|
|
|
@ -81,7 +81,7 @@ func (e *imageExporterInstance) Name() string {
|
|||
|
||||
func (e *imageExporterInstance) Export(ctx context.Context, ref cache.ImmutableRef, opt map[string][]byte) error {
|
||||
layersDone := oneOffProgress(ctx, "exporting layers")
|
||||
diffPairs, err := blobs.GetDiffPairs(ctx, e.opt.Snapshotter, e.opt.Differ, ref)
|
||||
diffPairs, err := blobs.GetDiffPairs(ctx, e.opt.ContentStore, e.opt.Snapshotter, e.opt.Differ, ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -105,10 +105,14 @@ func (e *imageExporterInstance) Export(ctx context.Context, ref cache.ImmutableR
|
|||
}
|
||||
}
|
||||
|
||||
addAsRoot := content.WithLabels(map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
})
|
||||
|
||||
dgst := digest.FromBytes(dt)
|
||||
configDone := oneOffProgress(ctx, "exporting config "+dgst.String())
|
||||
|
||||
if err := content.WriteBlob(ctx, e.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst); err != nil {
|
||||
if err := content.WriteBlob(ctx, e.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst, addAsRoot); err != nil {
|
||||
return configDone(errors.Wrap(err, "error writing config blob"))
|
||||
}
|
||||
configDone(nil)
|
||||
|
@ -143,7 +147,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, ref cache.ImmutableR
|
|||
dgst = digest.FromBytes(dt)
|
||||
mfstDone := oneOffProgress(ctx, "exporting manifest "+dgst.String())
|
||||
|
||||
if err := content.WriteBlob(ctx, e.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst); err != nil {
|
||||
if err := content.WriteBlob(ctx, e.opt.ContentStore, dgst.String(), bytes.NewReader(dt), int64(len(dt)), dgst, addAsRoot); err != nil {
|
||||
return mfstDone(errors.Wrap(err, "error writing manifest blob"))
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue