Enable to use remote snapshots for refs
Signed-off-by: ktock <ktokunaga.mail@gmail.com>v0.8
parent
4b66930270
commit
c975424deb
|
@ -385,12 +385,17 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
|
|||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// parent refs are possibly lazy so keep it hold the description handlers.
|
||||
var dhs DescHandlers
|
||||
if mutable.parent != nil {
|
||||
dhs = mutable.parent.descHandlers
|
||||
}
|
||||
rec := &cacheRecord{
|
||||
mu: &sync.Mutex{},
|
||||
cm: cm,
|
||||
refs: make(map[ref]struct{}),
|
||||
// mutable refs are always non-lazy, so we can set parent desc handlers to nil
|
||||
parent: mutable.parentRef(false, nil),
|
||||
parent: mutable.parentRef(false, dhs),
|
||||
md: md,
|
||||
equalMutable: &mutableRef{cacheRecord: mutable},
|
||||
}
|
||||
|
@ -535,7 +540,12 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti
|
|||
|
||||
cm.records[id] = rec // TODO: save to db
|
||||
|
||||
return rec.mref(true, nil), nil
|
||||
// parent refs are possibly lazy so keep it hold the description handlers.
|
||||
var dhs DescHandlers
|
||||
if parent != nil {
|
||||
dhs = parent.descHandlers
|
||||
}
|
||||
return rec.mref(true, dhs), nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) {
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
type DescHandler struct {
|
||||
Provider content.Provider
|
||||
Progress progress.Controller
|
||||
SnapshotLabels map[string]string
|
||||
}
|
||||
|
||||
type DescHandlers map[digest.Digest]*DescHandler
|
||||
|
|
|
@ -391,15 +391,69 @@ func (sr *immutableRef) Extract(ctx context.Context) (rerr error) {
|
|||
ctx = winlayers.UseWindowsLayerMode(ctx)
|
||||
}
|
||||
|
||||
if _, err := sr.prepareRemoteSnapshots(ctx, sr.descHandlers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return sr.extract(ctx, sr.descHandlers)
|
||||
}
|
||||
|
||||
func (sr *immutableRef) prepareRemoteSnapshots(ctx context.Context, dhs DescHandlers) (bool, error) {
|
||||
ok, err := sr.sizeG.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ interface{}, rerr error) {
|
||||
snapshotID := getSnapshotID(sr.md)
|
||||
if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil {
|
||||
return true, nil
|
||||
}
|
||||
desc, err := sr.ociDesc()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
dh := dhs[desc.Digest]
|
||||
if dh == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
parentID := ""
|
||||
if sr.parent != nil {
|
||||
if ok, err := sr.parent.prepareRemoteSnapshots(ctx, dhs); !ok {
|
||||
return false, err
|
||||
}
|
||||
parentID = getSnapshotID(sr.parent.md)
|
||||
}
|
||||
|
||||
// Hint labels to the snapshotter
|
||||
labels := dh.SnapshotLabels
|
||||
if labels == nil {
|
||||
labels = make(map[string]string)
|
||||
}
|
||||
labels["containerd.io/snapshot.ref"] = snapshotID
|
||||
opt := snapshots.WithLabels(labels)
|
||||
|
||||
// Try to preapre the remote snapshot
|
||||
key := fmt.Sprintf("tmp-%s %s", identity.NewID(), sr.Info().ChainID)
|
||||
if err = sr.cm.Snapshotter.Prepare(ctx, key, parentID, opt); err != nil {
|
||||
if errdefs.IsAlreadyExists(err) {
|
||||
// Check if the targeting snapshot ID has been prepared as a remote
|
||||
// snapshot in the snapshotter.
|
||||
if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil {
|
||||
// We can use this remote snapshot without unlazying.
|
||||
// Try the next layer as well.
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This layer cannot be prepared without unlazying.
|
||||
return false, nil
|
||||
})
|
||||
return ok.(bool), err
|
||||
}
|
||||
|
||||
func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers) error {
|
||||
_, err := sr.sizeG.Do(ctx, sr.ID()+"-extract", func(ctx context.Context) (_ interface{}, rerr error) {
|
||||
snapshotID := getSnapshotID(sr.md)
|
||||
if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil {
|
||||
queueBlobOnly(sr.md, false)
|
||||
return nil, sr.md.Commit()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
eg, egctx := errgroup.WithContext(ctx)
|
||||
|
|
|
@ -35,7 +35,7 @@ func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, comp
|
|||
return nil, err
|
||||
}
|
||||
|
||||
mprovider := lazyMultiProvider{mprovider: contentutil.NewMultiProvider(nil)}
|
||||
mprovider := &lazyMultiProvider{mprovider: contentutil.NewMultiProvider(nil)}
|
||||
remote := &solver.Remote{
|
||||
Provider: mprovider,
|
||||
}
|
||||
|
@ -115,18 +115,19 @@ type lazyMultiProvider struct {
|
|||
plist []lazyRefProvider
|
||||
}
|
||||
|
||||
func (mp lazyMultiProvider) Add(p lazyRefProvider) {
|
||||
func (mp *lazyMultiProvider) Add(p lazyRefProvider) {
|
||||
mp.mprovider.Add(p.desc.Digest, p)
|
||||
mp.plist = append(mp.plist, p)
|
||||
}
|
||||
|
||||
func (mp lazyMultiProvider) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
|
||||
func (mp *lazyMultiProvider) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
|
||||
return mp.mprovider.ReaderAt(ctx, desc)
|
||||
}
|
||||
|
||||
func (mp lazyMultiProvider) Unlazy(ctx context.Context) error {
|
||||
func (mp *lazyMultiProvider) Unlazy(ctx context.Context) error {
|
||||
eg, egctx := errgroup.WithContext(ctx)
|
||||
for _, p := range mp.plist {
|
||||
p := p
|
||||
eg.Go(func() error {
|
||||
return p.Unlazy(egctx)
|
||||
})
|
||||
|
|
|
@ -79,6 +79,7 @@ type OCIConfig struct {
|
|||
UserRemapUnsupported string `toml:"userRemapUnsupported"`
|
||||
// For use in storing the OCI worker binary name that will replace buildkit-runc
|
||||
Binary string `toml:"binary"`
|
||||
ProxySnapshotterPath string `toml:"proxySnapshotterPath"`
|
||||
}
|
||||
|
||||
type ContainerdConfig struct {
|
||||
|
@ -89,6 +90,7 @@ type ContainerdConfig struct {
|
|||
Namespace string `toml:"namespace"`
|
||||
GCConfig
|
||||
NetworkConfig
|
||||
Snapshotter string `toml:"snapshotter"`
|
||||
}
|
||||
|
||||
type GCPolicy struct {
|
||||
|
|
|
@ -86,6 +86,11 @@ func init() {
|
|||
Usage: "path of cni binary files",
|
||||
Value: defaultConf.Workers.Containerd.NetworkConfig.CNIBinaryPath,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "containerd-worker-snapshotter",
|
||||
Usage: "snapshotter name to use",
|
||||
Value: ctd.DefaultSnapshotter,
|
||||
},
|
||||
}
|
||||
|
||||
if defaultConf.Workers.Containerd.GC == nil || *defaultConf.Workers.Containerd.GC {
|
||||
|
@ -184,6 +189,9 @@ func applyContainerdFlags(c *cli.Context, cfg *config.Config) error {
|
|||
if c.GlobalIsSet("containerd-cni-binary-dir") {
|
||||
cfg.Workers.Containerd.NetworkConfig.CNIBinaryPath = c.GlobalString("containerd-cni-binary-dir")
|
||||
}
|
||||
if c.GlobalIsSet("containerd-worker-snapshotter") {
|
||||
cfg.Workers.Containerd.Snapshotter = c.GlobalString("containerd-worker-snapshotter")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -210,7 +218,11 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
|
|||
},
|
||||
}
|
||||
|
||||
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, ctd.DefaultSnapshotter, cfg.Namespace, cfg.Labels, dns, nc, ctd.WithTimeout(60*time.Second))
|
||||
snapshotter := ctd.DefaultSnapshotter
|
||||
if cfg.Snapshotter != "" {
|
||||
snapshotter = cfg.Snapshotter
|
||||
}
|
||||
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, ctd.WithTimeout(60*time.Second))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -6,11 +6,16 @@ import (
|
|||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
fuseoverlayfs "github.com/AkihiroSuda/containerd-fuse-overlayfs"
|
||||
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
|
||||
"github.com/containerd/containerd/defaults"
|
||||
"github.com/containerd/containerd/pkg/dialer"
|
||||
ctdsnapshot "github.com/containerd/containerd/snapshots"
|
||||
"github.com/containerd/containerd/snapshots/native"
|
||||
"github.com/containerd/containerd/snapshots/overlay"
|
||||
snproxy "github.com/containerd/containerd/snapshots/proxy"
|
||||
"github.com/containerd/containerd/sys"
|
||||
"github.com/moby/buildkit/cmd/buildkitd/config"
|
||||
"github.com/moby/buildkit/executor/oci"
|
||||
|
@ -22,6 +27,8 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -50,9 +57,13 @@ func init() {
|
|||
},
|
||||
cli.StringFlag{
|
||||
Name: "oci-worker-snapshotter",
|
||||
Usage: "name of snapshotter (overlayfs or native)",
|
||||
Usage: "name of snapshotter (overlayfs, native, etc.)",
|
||||
Value: defaultConf.Workers.OCI.Snapshotter,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "oci-worker-proxy-snapshotter-path",
|
||||
Usage: "address of proxy snapshotter socket (do not include 'unix://' prefix)",
|
||||
},
|
||||
cli.StringSliceFlag{
|
||||
Name: "oci-worker-platform",
|
||||
Usage: "override supported platforms for worker",
|
||||
|
@ -193,6 +204,9 @@ func applyOCIFlags(c *cli.Context, cfg *config.Config) error {
|
|||
if c.GlobalIsSet("oci-worker-binary") {
|
||||
cfg.Workers.OCI.Binary = c.GlobalString("oci-worker-binary")
|
||||
}
|
||||
if c.GlobalIsSet("oci-worker-proxy-snapshotter-path") {
|
||||
cfg.Workers.OCI.ProxySnapshotterPath = c.GlobalString("oci-worker-proxy-snapshotter-path")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -213,7 +227,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
|
|||
return nil, err
|
||||
}
|
||||
|
||||
snFactory, err := snapshotterFactory(common.config.Root, cfg.Snapshotter)
|
||||
snFactory, err := snapshotterFactory(common.config.Root, cfg.Snapshotter, cfg.ProxySnapshotterPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -266,7 +280,36 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
|
|||
return []worker.Worker{w}, nil
|
||||
}
|
||||
|
||||
func snapshotterFactory(commonRoot, name string) (runc.SnapshotterFactory, error) {
|
||||
func snapshotterFactory(commonRoot, name, address string) (runc.SnapshotterFactory, error) {
|
||||
if address != "" {
|
||||
snFactory := runc.SnapshotterFactory{
|
||||
Name: name,
|
||||
}
|
||||
if _, err := os.Stat(address); os.IsNotExist(err) {
|
||||
return snFactory, errors.Wrapf(err, "snapshotter doesn't exist on %q (Do not include 'unix://' prefix)", address)
|
||||
}
|
||||
snFactory.New = func(root string) (ctdsnapshot.Snapshotter, error) {
|
||||
backoffConfig := backoff.DefaultConfig
|
||||
backoffConfig.MaxDelay = 3 * time.Second
|
||||
connParams := grpc.ConnectParams{
|
||||
Backoff: backoffConfig,
|
||||
}
|
||||
gopts := []grpc.DialOption{
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithConnectParams(connParams),
|
||||
grpc.WithContextDialer(dialer.ContextDialer),
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
|
||||
}
|
||||
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
||||
}
|
||||
return snproxy.NewSnapshotter(snapshotsapi.NewSnapshotsClient(conn), name), nil
|
||||
}
|
||||
return snFactory, nil
|
||||
}
|
||||
|
||||
if name == "auto" {
|
||||
if err := overlay.Supported(commonRoot); err == nil {
|
||||
name = "overlayfs"
|
||||
|
|
|
@ -3,7 +3,9 @@ package containerimage
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -13,6 +15,7 @@ import (
|
|||
"github.com/containerd/containerd/leases"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/containerd/containerd/remotes/docker"
|
||||
"github.com/containerd/containerd/snapshots"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/client/llb"
|
||||
|
@ -203,14 +206,29 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach
|
|||
progressController.Name = p.vtx.Name()
|
||||
}
|
||||
|
||||
descHandler := &cache.DescHandler{
|
||||
Provider: p.manifest.Remote.Provider,
|
||||
Progress: progressController,
|
||||
var layers string
|
||||
for _, desc := range p.manifest.Remote.Descriptors {
|
||||
layers += fmt.Sprintf("%s,", desc.Digest.String())
|
||||
}
|
||||
layers = strings.TrimSuffix(layers, ",")
|
||||
|
||||
p.descHandlers = cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
|
||||
for _, desc := range p.manifest.Remote.Descriptors {
|
||||
p.descHandlers[desc.Digest] = descHandler
|
||||
|
||||
// Hints for remote/stargz snapshotter for searching for remote snapshots
|
||||
labels := snapshots.FilterInheritedLabels(desc.Annotations)
|
||||
if labels == nil {
|
||||
labels = make(map[string]string)
|
||||
}
|
||||
labels["containerd.io/snapshot/remote/stargz.reference"] = p.manifest.Ref
|
||||
labels["containerd.io/snapshot/remote/stargz.digest"] = desc.Digest.String()
|
||||
labels["containerd.io/snapshot/remote/stargz.layers"] = layers
|
||||
|
||||
p.descHandlers[desc.Digest] = &cache.DescHandler{
|
||||
Provider: p.manifest.Remote.Provider,
|
||||
Progress: progressController,
|
||||
SnapshotLabels: labels,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue