cache: add migration flow to new lease based format
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>v0.7
parent
d35d2c1c94
commit
31a9aeea88
|
@ -896,7 +896,7 @@ func setupCacheManager(t *testing.T, tmpdir string, snapshotterName string, snap
|
|||
cm, err := cache.NewManager(cache.ManagerOpt{
|
||||
Snapshotter: snapshot.FromContainerdSnapshotter(snapshotterName, containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter(snapshotterName)), nil),
|
||||
MetadataStore: md,
|
||||
LeaseManager: leaseutil.WithNamespace(leaseutil.NewManager(mdb), "buildkit"),
|
||||
LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"),
|
||||
ContentStore: mdb.ContentStore(),
|
||||
GarbageCollect: mdb.GarbageCollect,
|
||||
})
|
||||
|
|
|
@ -115,7 +115,7 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func()
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
lm := leaseutil.NewManager(mdb)
|
||||
lm := ctdmetadata.NewLeaseManager(mdb)
|
||||
|
||||
cm, err := NewManager(ManagerOpt{
|
||||
Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil),
|
||||
|
|
|
@ -0,0 +1,254 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/containerd/containerd/snapshots"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func migrateChainID(si *metadata.StorageItem, all map[string]*metadata.StorageItem) (digest.Digest, digest.Digest, error) {
|
||||
diffID := digest.Digest(getDiffID(si))
|
||||
if diffID == "" {
|
||||
return "", "", nil
|
||||
}
|
||||
blobID := digest.Digest(getBlob(si))
|
||||
if blobID == "" {
|
||||
return "", "", nil
|
||||
}
|
||||
chainID := digest.Digest(getChainID(si))
|
||||
blobChainID := digest.Digest(getBlobChainID(si))
|
||||
|
||||
if chainID != "" && blobChainID != "" {
|
||||
return chainID, blobChainID, nil
|
||||
}
|
||||
|
||||
chainID = diffID
|
||||
blobChainID = digest.FromBytes([]byte(blobID + " " + diffID))
|
||||
|
||||
parent := getParent(si)
|
||||
if parent != "" {
|
||||
pChainID, pBlobChainID, err := migrateChainID(all[parent], all)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
chainID = digest.FromBytes([]byte(pChainID + " " + chainID))
|
||||
blobChainID = digest.FromBytes([]byte(pBlobChainID + " " + blobChainID))
|
||||
}
|
||||
|
||||
queueChainID(si, chainID.String())
|
||||
queueBlobChainID(si, blobChainID.String())
|
||||
|
||||
return chainID, blobChainID, si.Commit()
|
||||
}
|
||||
|
||||
func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapshot.Snapshotter, lm leases.Manager) error {
|
||||
_, err := os.Stat(to)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(errors.Cause(err)) {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = os.Stat(from)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(errors.Cause(err)) {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
tmpPath := to + ".tmp"
|
||||
tmpFile, err := os.Create(tmpPath)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
src, err := os.Open(from)
|
||||
if err != nil {
|
||||
tmpFile.Close()
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
if _, err = io.Copy(tmpFile, src); err != nil {
|
||||
tmpFile.Close()
|
||||
src.Close()
|
||||
return errors.Wrapf(err, "failed to copy db for migration")
|
||||
}
|
||||
src.Close()
|
||||
tmpFile.Close()
|
||||
|
||||
md, err := metadata.NewStore(tmpPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
items, err := md.All()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
byID := map[string]*metadata.StorageItem{}
|
||||
for _, item := range items {
|
||||
byID[item.ID()] = item
|
||||
}
|
||||
|
||||
// add committed, parent, snapshot
|
||||
for id, item := range byID {
|
||||
em := getEqualMutable(item)
|
||||
var parent string
|
||||
if em == "" {
|
||||
info, err := s.Stat(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.Kind == snapshots.KindCommitted {
|
||||
queueCommitted(item)
|
||||
}
|
||||
parent = info.Parent
|
||||
queueParent(item, parent)
|
||||
} else {
|
||||
queueCommitted(item)
|
||||
}
|
||||
queueSnapshotID(item, id)
|
||||
item.Commit()
|
||||
}
|
||||
|
||||
for _, item := range byID {
|
||||
em := getEqualMutable(item)
|
||||
if em != "" {
|
||||
if getParent(item) == "" {
|
||||
queueParent(item, getParent(byID[em]))
|
||||
item.Commit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type diffPair struct {
|
||||
Blobsum string
|
||||
DiffID string
|
||||
}
|
||||
// move diffID, blobsum to new location
|
||||
for _, item := range byID {
|
||||
v := item.Get("blobmapping.blob")
|
||||
if v == nil {
|
||||
continue
|
||||
}
|
||||
var blob diffPair
|
||||
if err := v.Unmarshal(&blob); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
queueDiffID(item, blob.DiffID)
|
||||
queueBlob(item, blob.Blobsum)
|
||||
queueMediaType(item, images.MediaTypeDockerSchema2LayerGzip)
|
||||
if err := item.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// calculate new chainid/blobsumid
|
||||
for _, item := range byID {
|
||||
if _, _, err := migrateChainID(item, byID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ctx = context.TODO() // no cancellation allowed pass this point
|
||||
|
||||
// add new leases
|
||||
for _, item := range byID {
|
||||
l, err := lm.Create(ctx, func(l *leases.Lease) error {
|
||||
l.ID = item.ID()
|
||||
l.Labels = map[string]string{
|
||||
"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
// if we are running the migration twice
|
||||
if errdefs.IsAlreadyExists(err) {
|
||||
continue
|
||||
}
|
||||
return errors.Wrap(err, "failed to create lease")
|
||||
}
|
||||
|
||||
if err := lm.AddResource(ctx, l, leases.Resource{
|
||||
ID: getSnapshotID(item),
|
||||
Type: "snapshots/" + s.Name(),
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "failed to add snapshot %s to lease", item.ID())
|
||||
}
|
||||
|
||||
if blobID := getBlob(item); blobID != "" {
|
||||
if err := lm.AddResource(ctx, l, leases.Resource{
|
||||
ID: blobID,
|
||||
Type: "content",
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "failed to add blob %s to lease", item.ID())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove old root labels
|
||||
for _, item := range byID {
|
||||
if _, err := s.Update(ctx, snapshots.Info{
|
||||
Name: getSnapshotID(item),
|
||||
}, "labels.containerd.io/gc.root"); err != nil {
|
||||
if !errdefs.IsNotFound(errors.Cause(err)) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if blob := getBlob(item); blob != "" {
|
||||
if _, err := cs.Update(ctx, content.Info{
|
||||
Digest: digest.Digest(blob),
|
||||
}, "labels.containerd.io/gc.root"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// previous implementation can leak views, just clean up all views
|
||||
err = s.Walk(ctx, func(ctx context.Context, info snapshots.Info) error {
|
||||
if info.Kind == snapshots.KindView {
|
||||
if _, err := s.Update(ctx, snapshots.Info{
|
||||
Name: info.Name,
|
||||
}, "labels.containerd.io/gc.root"); err != nil {
|
||||
if !errdefs.IsNotFound(errors.Cause(err)) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// switch to new DB
|
||||
if err := md.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpPath, to); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, item := range byID {
|
||||
logrus.Infof("migrated %s parent:%q snapshot:%v committed:%v blob:%v diffid:%v chainID:%v blobChainID:%v",
|
||||
item.ID(), getParent(item), getSnapshotID(item), getCommitted(item), getBlob(item), getDiffID(item), getChainID(item), getBlobChainID(item))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -495,7 +495,7 @@ func (cr *cacheRecord) finalize(ctx context.Context, commit bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
l, err := cr.cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error {
|
||||
_, err := cr.cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error {
|
||||
l.ID = cr.ID()
|
||||
l.Labels = map[string]string{
|
||||
"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
|
@ -503,10 +503,12 @@ func (cr *cacheRecord) finalize(ctx context.Context, commit bool) error {
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create lease")
|
||||
if !errdefs.IsAlreadyExists(err) { // migrator adds leases for everything
|
||||
return errors.Wrap(err, "failed to create lease")
|
||||
}
|
||||
}
|
||||
|
||||
if err := cr.cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{
|
||||
if err := cr.cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.ID()}, leases.Resource{
|
||||
ID: cr.ID(),
|
||||
Type: "snapshots/" + cr.cm.ManagerOpt.Snapshotter.Name(),
|
||||
}); err != nil {
|
||||
|
|
|
@ -330,7 +330,7 @@ func setupGitSource(t *testing.T, tmpdir string) source.Source {
|
|||
cm, err := cache.NewManager(cache.ManagerOpt{
|
||||
Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil),
|
||||
MetadataStore: md,
|
||||
LeaseManager: leaseutil.WithNamespace(leaseutil.NewManager(mdb), "buildkit"),
|
||||
LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"),
|
||||
ContentStore: mdb.ContentStore(),
|
||||
GarbageCollect: mdb.GarbageCollect,
|
||||
})
|
||||
|
|
|
@ -337,7 +337,7 @@ func newHTTPSource(tmpdir string) (source.Source, error) {
|
|||
cm, err := cache.NewManager(cache.ManagerOpt{
|
||||
Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil),
|
||||
MetadataStore: md,
|
||||
LeaseManager: leaseutil.WithNamespace(leaseutil.NewManager(mdb), "buildkit"),
|
||||
LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"),
|
||||
ContentStore: mdb.ContentStore(),
|
||||
GarbageCollect: mdb.GarbageCollect,
|
||||
})
|
||||
|
|
|
@ -31,10 +31,10 @@ github.com/containerd/containerd/diff
|
|||
github.com/containerd/containerd/errdefs
|
||||
github.com/containerd/containerd/filters
|
||||
github.com/containerd/containerd/gc
|
||||
github.com/containerd/containerd/images
|
||||
github.com/containerd/containerd/leases
|
||||
github.com/containerd/containerd/mount
|
||||
github.com/containerd/containerd/snapshots
|
||||
github.com/containerd/containerd/images
|
||||
github.com/containerd/containerd/remotes
|
||||
github.com/containerd/containerd/remotes/docker
|
||||
github.com/containerd/containerd/content/local
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
introspection "github.com/containerd/containerd/api/services/introspection/v1"
|
||||
"github.com/containerd/containerd/gc"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/executor/containerdexecutor"
|
||||
"github.com/moby/buildkit/executor/oci"
|
||||
|
@ -45,10 +46,6 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
|
|||
return base.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root)
|
||||
}
|
||||
|
||||
md, err := metadata.NewStore(filepath.Join(root, "metadata.db"))
|
||||
if err != nil {
|
||||
return base.WorkerOpt{}, err
|
||||
}
|
||||
df := client.DiffService()
|
||||
// TODO: should use containerd daemon instance ID (containerd/containerd#1862)?
|
||||
id, err := base.ID(root)
|
||||
|
@ -96,12 +93,23 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
|
|||
return base.WorkerOpt{}, err
|
||||
}
|
||||
|
||||
snap := containerdsnapshot.NewSnapshotter(snapshotterName, client.SnapshotService(snapshotterName), ns, nil)
|
||||
|
||||
if err := cache.MigrateV2(context.TODO(), filepath.Join(root, "metadata.db"), filepath.Join(root, "metadata_v2.db"), cs, snap, lm); err != nil {
|
||||
return base.WorkerOpt{}, err
|
||||
}
|
||||
|
||||
md, err := metadata.NewStore(filepath.Join(root, "metadata_v2.db"))
|
||||
if err != nil {
|
||||
return base.WorkerOpt{}, err
|
||||
}
|
||||
|
||||
opt := base.WorkerOpt{
|
||||
ID: id,
|
||||
Labels: xlabels,
|
||||
MetadataStore: md,
|
||||
Executor: containerdexecutor.New(client, root, "", np, dns),
|
||||
Snapshotter: containerdsnapshot.NewSnapshotter(snapshotterName, client.SnapshotService(snapshotterName), ns, nil),
|
||||
Snapshotter: snap,
|
||||
ContentStore: cs,
|
||||
Applier: winlayers.NewFileSystemApplierWithWindows(cs, df),
|
||||
Differ: winlayers.NewWalkingDiffWithWindows(cs, df),
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/containerd/containerd/platforms"
|
||||
ctdsnapshot "github.com/containerd/containerd/snapshots"
|
||||
"github.com/docker/docker/pkg/idtools"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/executor/oci"
|
||||
"github.com/moby/buildkit/executor/runcexecutor"
|
||||
|
@ -38,10 +39,6 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
|
|||
if err := os.MkdirAll(root, 0700); err != nil {
|
||||
return opt, err
|
||||
}
|
||||
md, err := metadata.NewStore(filepath.Join(root, "metadata.db"))
|
||||
if err != nil {
|
||||
return opt, err
|
||||
}
|
||||
|
||||
np, err := netproviders.Providers(nopt)
|
||||
if err != nil {
|
||||
|
@ -92,19 +89,30 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
|
|||
for k, v := range labels {
|
||||
xlabels[k] = v
|
||||
}
|
||||
snap := containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), "buildkit", idmap)
|
||||
lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit")
|
||||
if err := cache.MigrateV2(context.TODO(), filepath.Join(root, "metadata.db"), filepath.Join(root, "metadata_v2.db"), c, snap, lm); err != nil {
|
||||
return opt, err
|
||||
}
|
||||
|
||||
md, err := metadata.NewStore(filepath.Join(root, "metadata_v2.db"))
|
||||
if err != nil {
|
||||
return opt, err
|
||||
}
|
||||
|
||||
opt = base.WorkerOpt{
|
||||
ID: id,
|
||||
Labels: xlabels,
|
||||
MetadataStore: md,
|
||||
Executor: exe,
|
||||
Snapshotter: containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), "buildkit", idmap),
|
||||
Snapshotter: snap,
|
||||
ContentStore: c,
|
||||
Applier: winlayers.NewFileSystemApplierWithWindows(c, apply.NewFileSystemApplier(c)),
|
||||
Differ: winlayers.NewWalkingDiffWithWindows(c, walking.NewWalkingDiff(c)),
|
||||
ImageStore: nil, // explicitly
|
||||
Platforms: []specs.Platform{platforms.Normalize(platforms.DefaultSpec())},
|
||||
IdentityMapping: idmap,
|
||||
LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"),
|
||||
LeaseManager: lm,
|
||||
GarbageCollect: mdb.GarbageCollect,
|
||||
}
|
||||
return opt, nil
|
||||
|
|
Loading…
Reference in New Issue