move ExecOps mounting logic into new package so logic can be reused from gateway exec

Signed-off-by: Cory Bennett <cbennett@netflix.com>
v0.8
Cory Bennett 2020-09-15 06:40:05 +00:00
parent dc3d45de87
commit 3cc8aa0649
16 changed files with 1159 additions and 919 deletions
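In short: the cache, secret, SSH and tmpfs mountable logic that previously lived inside execOp (solver/llbsolver/ops) moves, nearly unchanged, into a MountManager in the new solver/llbsolver/mounts package, and gateway.NewContainer gains the same mount-type handling so gateway exec containers support the full set of mount types. A minimal sketch (hypothetical helper, not from the commit) of how a caller drives the extracted surface; cm, sm and md stand for the caller's existing cache.Manager, *session.Manager and *metadata.Store, and MountableCache is the fourth entry point, which additionally takes a base ref and returns a cache.MutableRef the caller must release:

	import (
		"context"

		"github.com/moby/buildkit/cache"
		"github.com/moby/buildkit/cache/metadata"
		"github.com/moby/buildkit/session"
		"github.com/moby/buildkit/solver/llbsolver/mounts"
		"github.com/moby/buildkit/solver/pb"
		"github.com/pkg/errors"
	)

	// mountableFor resolves one non-bind mount the way execOp.Exec and
	// gateway.NewContainer now both do: by delegating to the shared manager.
	func mountableFor(ctx context.Context, m *pb.Mount, g session.Group,
		cm cache.Manager, sm *session.Manager, md *metadata.Store) (cache.Mountable, error) {
		mm := mounts.NewMountManager("gateway exec", cm, sm, md)
		switch m.MountType {
		case pb.MountType_TMPFS:
			return mm.MountableTmpFS(), nil
		case pb.MountType_SECRET:
			return mm.MountableSecret(ctx, m, g) // nil result means an optional secret was absent
		case pb.MountType_SSH:
			return mm.MountableSSH(ctx, m, g)
		}
		return nil, errors.Errorf("unhandled mount type %s", m.MountType)
	}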

View File

@@ -571,7 +571,8 @@ func testClientGatewayContainerMounts(t *testing.T, sb integration.Sandbox) {
b := func(ctx context.Context, c client.Client) (*client.Result, error) {
mounts := map[string]llb.State{
"/": llb.Image("busybox:latest").Run(
llb.Shlex("touch /root-file"),
llb.Shlex("touch /root-file /cached/cache-file"),
llb.AddMount("/cached", llb.Scratch(), llb.AsPersistentCacheDir(t.Name(), llb.CacheMountShared)),
).Root(),
"/foo": llb.Image("busybox:latest").Run(
llb.Shlex("touch foo-file"),
@@ -582,7 +583,17 @@ func testClientGatewayContainerMounts(t *testing.T, sb integration.Sandbox) {
// TODO How do we get a results.Ref for a cache mount, tmpfs mount
}
containerMounts := []client.Mount{}
containerMounts := []client.Mount{{
Dest: "/cached",
MountType: pb.MountType_CACHE,
CacheOpt: &pb.CacheOpt{
ID: t.Name(),
Sharing: pb.CacheSharingOpt_SHARED,
},
}, {
Dest: "/tmpfs",
MountType: pb.MountType_TMPFS,
}}
for mountpoint, st := range mounts {
def, err := st.Marshal(ctx)
@@ -639,6 +650,22 @@ func testClientGatewayContainerMounts(t *testing.T, sb integration.Sandbox) {
err = pid.Wait()
require.NoError(t, err)
pid, err = ctr.Start(ctx, client.StartRequest{
Args: []string{"test", "-f", "/cached/cache-file"},
Cwd: "/",
})
require.NoError(t, err)
err = pid.Wait()
require.NoError(t, err)
pid, err = ctr.Start(ctx, client.StartRequest{
Args: []string{"test", "-w", "/tmpfs"},
Cwd: "/",
})
require.NoError(t, err)
err = pid.Wait()
require.NoError(t, err)
return &client.Result{}, ctr.Release(ctx)
}
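The new assertions line up with the build step above: the build writes /cached/cache-file into a persistent cache dir declared with llb.AsPersistentCacheDir(t.Name(), llb.CacheMountShared), and the container requests the same cache with pb.CacheOpt{ID: t.Name(), Sharing: pb.CacheSharingOpt_SHARED}. Matching ID plus SHARED sharing means both resolve to the same cache ref, so the file written during the build must be visible inside the gateway container, while the tmpfs mount only needs to be writable (test -w /tmpfs).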

View File

@@ -7,6 +7,7 @@ import (
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
gw "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/pb"
digest "github.com/opencontainers/go-digest"
)
@@ -19,6 +20,7 @@ type FrontendLLBBridge interface {
executor.Executor
Solve(ctx context.Context, req SolveRequest, sid string) (*Result, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (digest.Digest, []byte, error)
SessionManager() *session.Manager
}
type SolveRequest = gw.SolveRequest

View File

@@ -36,6 +36,9 @@ type Mount struct {
Ref Reference
Readonly bool
MountType pb.MountType
CacheOpt *pb.CacheOpt
SecretOpt *pb.SecretOpt
SSHOpt *pb.SSHOpt
}
// Container is used to start new processes inside a container and release the

View File

@@ -2,13 +2,17 @@ package gateway
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver/mounts"
opspb "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/stack"
utilsystem "github.com/moby/buildkit/util/system"
@@ -34,9 +38,24 @@ type Mount struct {
Readonly bool
MountType opspb.MountType
RefProxy solver.ResultProxy
CacheOpt *opspb.CacheOpt
SecretOpt *opspb.SecretOpt
SSHOpt *opspb.SSHOpt
}
func NewContainer(ctx context.Context, e executor.Executor, req NewContainerRequest) (client.Container, error) {
func toProtoMount(m Mount) *opspb.Mount {
return &opspb.Mount{
Selector: m.Selector,
Dest: m.Dest,
Readonly: m.Readonly,
MountType: m.MountType,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
}
}
func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) {
ctx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
ctr := &gatewayContainer{
@@ -49,41 +68,130 @@ func NewContainer(ctx context.Context, e executor.Executor, req NewContainerRequ
cancel: cancel,
}
for _, m := range req.Mounts {
res, err := m.RefProxy.Result(ctx)
makeMutable := func(worker worker.Worker, ref cache.ImmutableRef) (cache.MutableRef, error) {
mRef, err := worker.CacheManager().New(ctx, ref)
if err != nil {
return nil, stack.Enable(err)
}
workerRef, ok := res.Sys().(*worker.WorkerRef)
if !ok {
return nil, stack.Enable(errors.Errorf("invalid reference for exec %T", res.Sys()))
ctr.cleanup = append(ctr.cleanup, func() error {
return stack.Enable(mRef.Release(context.TODO()))
})
return mRef, nil
}
var mm mounts.MountManager
mnts := req.Mounts
for i, m := range mnts {
if m.Dest == opspb.RootMount && m.RefProxy != nil {
res, err := m.RefProxy.Result(ctx)
if err != nil {
return nil, stack.Enable(err)
}
workerRef, ok := res.Sys().(*worker.WorkerRef)
if !ok {
return nil, errors.Errorf("invalid reference for exec %T", res.Sys())
}
name := fmt.Sprintf("container %s", req.ContainerID)
mm = mounts.NewMountManager(name, workerRef.Worker.CacheManager(), sm, workerRef.Worker.MetadataStore())
ctr.rootFS = workerRef.ImmutableRef
if !m.Readonly {
ctr.rootFS, err = makeMutable(workerRef.Worker, workerRef.ImmutableRef)
if err != nil {
return nil, stack.Enable(err)
}
}
// delete root mount from list, handled here
mnts = append(mnts[:i], mnts[i+1:]...)
break
}
}
if ctr.rootFS == nil {
return nil, errors.Errorf("root mount required")
}
for _, m := range mnts {
var ref cache.ImmutableRef
var mountable cache.Mountable
if m.RefProxy != nil {
res, err := m.RefProxy.Result(ctx)
if err != nil {
return nil, stack.Enable(err)
}
workerRef, ok := res.Sys().(*worker.WorkerRef)
if !ok {
return nil, errors.Errorf("invalid reference for exec %T", res.Sys())
}
ref = workerRef.ImmutableRef
mountable = ref
if !m.Readonly {
mountable, err = makeMutable(workerRef.Worker, ref)
if err != nil {
return nil, stack.Enable(err)
}
}
}
switch m.MountType {
case opspb.MountType_BIND:
// nothing to do here
case opspb.MountType_CACHE:
mRef, err := mm.MountableCache(ctx, toProtoMount(m), ref)
if err != nil {
return nil, err
}
mountable = mRef
ctr.cleanup = append(ctr.cleanup, func() error {
return stack.Enable(mRef.Release(context.TODO()))
})
case opspb.MountType_TMPFS:
mountable = mm.MountableTmpFS()
case opspb.MountType_SECRET:
var err error
mountable, err = mm.MountableSecret(ctx, toProtoMount(m), g)
if err != nil {
return nil, err
}
if mountable == nil {
continue
}
case opspb.MountType_SSH:
var err error
mountable, err = mm.MountableSSH(ctx, toProtoMount(m), g)
if err != nil {
return nil, err
}
if mountable == nil {
continue
}
default:
return nil, errors.Errorf("mount type %s not implemented", m.MountType)
}
// validate that there is a mount
if mountable == nil {
return nil, errors.Errorf("mount %s has no input", m.Dest)
}
execMount := executor.Mount{
Src: workerRef.ImmutableRef,
Src: mountable,
Selector: m.Selector,
Dest: m.Dest,
Readonly: m.Readonly,
}
if !m.Readonly {
ref, err := workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef)
if err != nil {
return nil, stack.Enable(err)
}
ctr.cleanup = append(ctr.cleanup, func() error {
return stack.Enable(ref.Release(context.TODO()))
})
execMount.Src = ref
}
if m.Dest == "/" {
ctr.rootFS = execMount.Src
} else {
ctr.mounts = append(ctr.mounts, execMount)
}
ctr.mounts = append(ctr.mounts, execMount)
}
// sort mounts so parents are mounted first
sort.Slice(ctr.mounts, func(i, j int) bool {
return ctr.mounts[i].Dest < ctr.mounts[j].Dest
})
return ctr, nil
}
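Two details worth calling out in the rewritten NewContainer: the root mount is resolved in a first pass (made mutable unless readonly, then deleted from the slice) because it also seeds the MountManager with the worker's cache manager and metadata store, and the remaining mounts are sorted by destination so a parent directory is always mounted before anything nested under it. A tiny worked example (sketch, not from the commit, using the sort and executor packages) of why plain lexicographic ordering on Dest is enough:

	mnts := []executor.Mount{{Dest: "/foo/bar"}, {Dest: "/cached"}, {Dest: "/foo"}}
	sort.Slice(mnts, func(i, j int) bool { return mnts[i].Dest < mnts[j].Dest })
	// => /cached, /foo, /foo/bar: a prefix sorts before its extensions,
	// so parents always come first.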

View File

@@ -13,6 +13,7 @@ import (
"github.com/moby/buildkit/frontend/gateway/client"
gwpb "github.com/moby/buildkit/frontend/gateway/pb"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
opspb "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/apicaps"
@@ -159,9 +160,13 @@ func (c *bridgeClient) NewContainer(ctx context.Context, req client.NewContainer
}
for _, m := range req.Mounts {
refProxy, ok := m.Ref.(*ref)
if !ok {
return nil, errors.Errorf("unexpected Ref type: %T", m.Ref)
var refProxy solver.ResultProxy
if m.Ref != nil {
var ok bool
refProxy, ok = m.Ref.(*ref)
if !ok {
return nil, errors.Errorf("unexpected Ref type: %T", m.Ref)
}
}
ctrReq.Mounts = append(ctrReq.Mounts, gateway.Mount{
Dest: m.Dest,
@@ -169,10 +174,14 @@ func (c *bridgeClient) NewContainer(ctx context.Context, req client.NewContainer
Readonly: m.Readonly,
MountType: m.MountType,
RefProxy: refProxy,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
})
}
ctr, err := gateway.NewContainer(ctx, c, ctrReq)
group := session.NewGroup(c.sid)
ctr, err := gateway.NewContainer(ctx, c, c.SessionManager(), group, ctrReq)
if err != nil {
return nil, err
}

View File

@@ -27,6 +27,7 @@ import (
gwclient "github.com/moby/buildkit/frontend/gateway/client"
pb "github.com/moby/buildkit/frontend/gateway/pb"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/errdefs"
opspb "github.com/moby/buildkit/solver/pb"
@@ -756,9 +757,12 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta
}
for _, m := range in.Mounts {
refProxy, err := lbf.convertRef(m.ResultID)
if err != nil {
return nil, errors.Wrapf(err, "failed to find ref %s for %q mount", m.ResultID, m.Dest)
var refProxy solver.ResultProxy
if m.ResultID != "" {
refProxy, err = lbf.convertRef(m.ResultID)
if err != nil {
return nil, errors.Wrapf(err, "failed to find ref %s for %q mount", m.ResultID, m.Dest)
}
}
ctrReq.Mounts = append(ctrReq.Mounts, Mount{
Dest: m.Dest,
@@ -766,12 +770,16 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta
Readonly: m.Readonly,
MountType: m.MountType,
RefProxy: refProxy,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
})
}
// Not using `ctx` here because it will get cancelled as soon as NewContainer returns
// and we want the context to live for the duration of the container.
ctr, err := NewContainer(context.Background(), lbf.llbBridge, ctrReq)
group := session.NewGroup(lbf.sid)
ctr, err := NewContainer(context.Background(), lbf.llbBridge, lbf.llbBridge.SessionManager(), group, ctrReq)
if err != nil {
return nil, stack.Enable(err)
}

View File

@@ -645,16 +645,30 @@ func (c *grpcClient) NewContainer(ctx context.Context, req client.NewContainerRe
id := identity.NewID()
var mounts []*opspb.Mount
for _, m := range req.Mounts {
ref, ok := m.Ref.(*reference)
if !ok {
return nil, errors.Errorf("unexpected type for reference, got %T", m.Ref)
if m.CacheOpt != nil {
mounts = append(mounts, &opspb.Mount{
Dest: m.Dest,
Selector: m.Selector,
Readonly: m.Readonly,
MountType: opspb.MountType_CACHE,
CacheOpt: m.CacheOpt,
})
continue
}
var resultID string
if m.Ref != nil {
ref, ok := m.Ref.(*reference)
if !ok {
return nil, errors.Errorf("unexpected type for reference, got %T", m.Ref)
}
resultID = ref.id
}
mounts = append(mounts, &opspb.Mount{
Dest: m.Dest,
Selector: m.Selector,
Readonly: m.Readonly,
MountType: m.MountType,
ResultID: ref.id,
ResultID: resultID,
})
}

View File

@@ -142,6 +142,10 @@ func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest, sid st
return
}
func (b *llbBridge) SessionManager() *session.Manager {
return b.sm
}
type resultProxy struct {
cb func(context.Context) (solver.CachedResult, error)
def *pb.Definition

View File

@@ -0,0 +1,522 @@
package mounts
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/sys"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/locker"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/secrets"
"github.com/moby/buildkit/session/sshforward"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"google.golang.org/grpc/codes"
)
type MountManager interface {
MountableCache(ctx context.Context, m *pb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error)
MountableTmpFS() cache.Mountable
MountableSecret(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error)
MountableSSH(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error)
}
// NewMountManager(fmt.Sprintf("exec %s",strings.Join(e.op.Meta.Args," ")))
func NewMountManager(name string, cm cache.Manager, sm *session.Manager, md *metadata.Store) MountManager {
return &mountManager{
cm: cm,
sm: sm,
cacheMounts: map[string]*cacheRefShare{},
md: md,
managerName: name,
}
}
type mountManager struct {
cm cache.Manager
sm *session.Manager
cacheMountsMu sync.Mutex
cacheMounts map[string]*cacheRefShare
md *metadata.Store
managerName string
}
func (mm *mountManager) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, m *pb.Mount, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) {
g := &cacheRefGetter{
locker: &mm.cacheMountsMu,
cacheMounts: mm.cacheMounts,
cm: mm.cm,
md: mm.md,
globalCacheRefs: sharedCacheRefs,
name: fmt.Sprintf("cached mount %s from %s", m.Dest, mm.managerName),
}
return g.getRefCacheDir(ctx, ref, id, sharing)
}
type cacheRefGetter struct {
locker sync.Locker
cacheMounts map[string]*cacheRefShare
cm cache.Manager
md *metadata.Store
globalCacheRefs *cacheRefs
name string
}
func (g *cacheRefGetter) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) {
key := "cache-dir:" + id
if ref != nil {
key += ":" + ref.ID()
}
mu := g.locker
mu.Lock()
defer mu.Unlock()
if ref, ok := g.cacheMounts[key]; ok {
return ref.clone(), nil
}
defer func() {
if err == nil {
share := &cacheRefShare{MutableRef: mref, refs: map[*cacheRef]struct{}{}}
g.cacheMounts[key] = share
mref = share.clone()
}
}()
switch sharing {
case pb.CacheSharingOpt_SHARED:
return g.globalCacheRefs.get(key, func() (cache.MutableRef, error) {
return g.getRefCacheDirNoCache(ctx, key, ref, id, false)
})
case pb.CacheSharingOpt_PRIVATE:
return g.getRefCacheDirNoCache(ctx, key, ref, id, false)
case pb.CacheSharingOpt_LOCKED:
return g.getRefCacheDirNoCache(ctx, key, ref, id, true)
default:
return nil, errors.Errorf("invalid cache sharing option: %s", sharing.String())
}
}
func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string, ref cache.ImmutableRef, id string, block bool) (cache.MutableRef, error) {
makeMutable := func(ref cache.ImmutableRef) (cache.MutableRef, error) {
return g.cm.New(ctx, ref, cache.WithRecordType(client.UsageRecordTypeCacheMount), cache.WithDescription(g.name), cache.CachePolicyRetain)
}
cacheRefsLocker.Lock(key)
defer cacheRefsLocker.Unlock(key)
for {
sis, err := g.md.Search(key)
if err != nil {
return nil, err
}
locked := false
for _, si := range sis {
if mRef, err := g.cm.GetMutable(ctx, si.ID()); err == nil {
logrus.Debugf("reusing ref for cache dir: %s", mRef.ID())
return mRef, nil
} else if errors.Is(err, cache.ErrLocked) {
locked = true
}
}
if block && locked {
cacheRefsLocker.Unlock(key)
select {
case <-ctx.Done():
cacheRefsLocker.Lock(key)
return nil, ctx.Err()
case <-time.After(100 * time.Millisecond):
cacheRefsLocker.Lock(key)
}
} else {
break
}
}
mRef, err := makeMutable(ref)
if err != nil {
return nil, err
}
si, _ := g.md.Get(mRef.ID())
v, err := metadata.NewValue(key)
if err != nil {
mRef.Release(context.TODO())
return nil, err
}
v.Index = key
if err := si.Update(func(b *bolt.Bucket) error {
return si.SetValue(b, key, v)
}); err != nil {
mRef.Release(context.TODO())
return nil, err
}
return mRef, nil
}
func (mm *mountManager) getSSHMountable(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
var caller session.Caller
err := mm.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error {
if err := sshforward.CheckSSHID(ctx, c, m.SSHOpt.ID); err != nil {
if m.SSHOpt.Optional {
return nil
}
if grpcerrors.Code(err) == codes.Unimplemented {
return errors.Errorf("no SSH key %q forwarded from the client", m.SSHOpt.ID)
}
return err
}
caller = c
return nil
})
if err != nil {
return nil, err
}
// because ssh socket remains active, to actually handle session disconnecting ssh error
// should restart the whole exec with new session
return &sshMount{mount: m, caller: caller, idmap: mm.cm.IdentityMapping()}, nil
}
type sshMount struct {
mount *pb.Mount
caller session.Caller
idmap *idtools.IdentityMapping
}
func (sm *sshMount) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
return &sshMountInstance{sm: sm, idmap: sm.idmap}, nil
}
type sshMountInstance struct {
sm *sshMount
idmap *idtools.IdentityMapping
}
func (sm *sshMountInstance) Mount() ([]mount.Mount, func() error, error) {
ctx, cancel := context.WithCancel(context.TODO())
uid := int(sm.sm.mount.SSHOpt.Uid)
gid := int(sm.sm.mount.SSHOpt.Gid)
if sm.idmap != nil {
identity, err := sm.idmap.ToHost(idtools.Identity{
UID: uid,
GID: gid,
})
if err != nil {
cancel()
return nil, nil, err
}
uid = identity.UID
gid = identity.GID
}
sock, cleanup, err := sshforward.MountSSHSocket(ctx, sm.sm.caller, sshforward.SocketOpt{
ID: sm.sm.mount.SSHOpt.ID,
UID: uid,
GID: gid,
Mode: int(sm.sm.mount.SSHOpt.Mode & 0777),
})
if err != nil {
cancel()
return nil, nil, err
}
release := func() error {
var err error
if cleanup != nil {
err = cleanup()
}
cancel()
return err
}
return []mount.Mount{{
Type: "bind",
Source: sock,
Options: []string{"rbind"},
}}, release, nil
}
func (sm *sshMountInstance) IdentityMapping() *idtools.IdentityMapping {
return sm.idmap
}
func (mm *mountManager) getSecretMountable(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
if m.SecretOpt == nil {
return nil, errors.Errorf("invalid secret mount options")
}
sopt := *m.SecretOpt
id := sopt.ID
if id == "" {
return nil, errors.Errorf("secret ID missing from mount options")
}
var dt []byte
var err error
err = mm.sm.Any(ctx, g, func(ctx context.Context, _ string, caller session.Caller) error {
dt, err = secrets.GetSecret(ctx, caller, id)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) && m.SecretOpt.Optional {
return nil
}
return err
}
return nil
})
if err != nil || dt == nil {
return nil, err
}
return &secretMount{mount: m, data: dt, idmap: mm.cm.IdentityMapping()}, nil
}
type secretMount struct {
mount *pb.Mount
data []byte
idmap *idtools.IdentityMapping
}
func (sm *secretMount) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
return &secretMountInstance{sm: sm, idmap: sm.idmap}, nil
}
type secretMountInstance struct {
sm *secretMount
root string
idmap *idtools.IdentityMapping
}
func (sm *secretMountInstance) Mount() ([]mount.Mount, func() error, error) {
dir, err := ioutil.TempDir("", "buildkit-secrets")
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create temp dir")
}
cleanupDir := func() error {
return os.RemoveAll(dir)
}
if err := os.Chmod(dir, 0711); err != nil {
cleanupDir()
return nil, nil, err
}
tmpMount := mount.Mount{
Type: "tmpfs",
Source: "tmpfs",
Options: []string{"nodev", "nosuid", "noexec", fmt.Sprintf("uid=%d,gid=%d", os.Geteuid(), os.Getegid())},
}
if sys.RunningInUserNS() {
tmpMount.Options = nil
}
if err := mount.All([]mount.Mount{tmpMount}, dir); err != nil {
cleanupDir()
return nil, nil, errors.Wrap(err, "unable to setup secret mount")
}
sm.root = dir
cleanup := func() error {
if err := mount.Unmount(dir, 0); err != nil {
return err
}
return cleanupDir()
}
randID := identity.NewID()
fp := filepath.Join(dir, randID)
if err := ioutil.WriteFile(fp, sm.sm.data, 0600); err != nil {
cleanup()
return nil, nil, err
}
uid := int(sm.sm.mount.SecretOpt.Uid)
gid := int(sm.sm.mount.SecretOpt.Gid)
if sm.idmap != nil {
identity, err := sm.idmap.ToHost(idtools.Identity{
UID: uid,
GID: gid,
})
if err != nil {
cleanup()
return nil, nil, err
}
uid = identity.UID
gid = identity.GID
}
if err := os.Chown(fp, uid, gid); err != nil {
cleanup()
return nil, nil, err
}
if err := os.Chmod(fp, os.FileMode(sm.sm.mount.SecretOpt.Mode&0777)); err != nil {
cleanup()
return nil, nil, err
}
return []mount.Mount{{
Type: "bind",
Source: fp,
Options: []string{"ro", "rbind", "nodev", "nosuid", "noexec"},
}}, cleanup, nil
}
func (sm *secretMountInstance) IdentityMapping() *idtools.IdentityMapping {
return sm.idmap
}
func (mm *mountManager) MountableCache(ctx context.Context, m *pb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) {
if m.CacheOpt == nil {
return nil, errors.Errorf("missing cache mount options")
}
return mm.getRefCacheDir(ctx, ref, m.CacheOpt.ID, m, m.CacheOpt.Sharing)
}
func (mm *mountManager) MountableTmpFS() cache.Mountable {
return newTmpfs(mm.cm.IdentityMapping())
}
func (mm *mountManager) MountableSecret(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
return mm.getSecretMountable(ctx, m, g)
}
func (mm *mountManager) MountableSSH(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
return mm.getSSHMountable(ctx, m, g)
}
func newTmpfs(idmap *idtools.IdentityMapping) cache.Mountable {
return &tmpfs{idmap: idmap}
}
type tmpfs struct {
idmap *idtools.IdentityMapping
}
func (f *tmpfs) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
return &tmpfsMount{readonly: readonly, idmap: f.idmap}, nil
}
type tmpfsMount struct {
readonly bool
idmap *idtools.IdentityMapping
}
func (m *tmpfsMount) Mount() ([]mount.Mount, func() error, error) {
opt := []string{"nosuid"}
if m.readonly {
opt = append(opt, "ro")
}
return []mount.Mount{{
Type: "tmpfs",
Source: "tmpfs",
Options: opt,
}}, func() error { return nil }, nil
}
func (m *tmpfsMount) IdentityMapping() *idtools.IdentityMapping {
return m.idmap
}
var cacheRefsLocker = locker.New()
var sharedCacheRefs = &cacheRefs{}
type cacheRefs struct {
mu sync.Mutex
shares map[string]*cacheRefShare
}
// ClearActiveCacheMounts clears shared cache mounts currently in use.
// Caller needs to hold CacheMountsLocker before calling
func ClearActiveCacheMounts() {
sharedCacheRefs.shares = nil
}
func CacheMountsLocker() sync.Locker {
return &sharedCacheRefs.mu
}
func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.MutableRef, error) {
r.mu.Lock()
defer r.mu.Unlock()
if r.shares == nil {
r.shares = map[string]*cacheRefShare{}
}
share, ok := r.shares[key]
if ok {
return share.clone(), nil
}
mref, err := fn()
if err != nil {
return nil, err
}
share = &cacheRefShare{MutableRef: mref, main: r, key: key, refs: map[*cacheRef]struct{}{}}
r.shares[key] = share
return share.clone(), nil
}
type cacheRefShare struct {
cache.MutableRef
mu sync.Mutex
refs map[*cacheRef]struct{}
main *cacheRefs
key string
}
func (r *cacheRefShare) clone() cache.MutableRef {
cacheRef := &cacheRef{cacheRefShare: r}
if cacheRefCloneHijack != nil {
cacheRefCloneHijack()
}
r.mu.Lock()
r.refs[cacheRef] = struct{}{}
r.mu.Unlock()
return cacheRef
}
func (r *cacheRefShare) release(ctx context.Context) error {
if r.main != nil {
delete(r.main.shares, r.key)
}
return r.MutableRef.Release(ctx)
}
var cacheRefReleaseHijack func()
var cacheRefCloneHijack func()
type cacheRef struct {
*cacheRefShare
}
func (r *cacheRef) Release(ctx context.Context) error {
if r.main != nil {
r.main.mu.Lock()
defer r.main.mu.Unlock()
}
r.mu.Lock()
defer r.mu.Unlock()
delete(r.refs, r)
if len(r.refs) == 0 {
if cacheRefReleaseHijack != nil {
cacheRefReleaseHijack()
}
return r.release(ctx)
}
return nil
}
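A hypothetical caller-side sketch of the new package, showing the two-step unwrapping from MountManager to concrete containerd mounts; cm, sm and md are again the caller's cache.Manager, *session.Manager and *metadata.Store, and the cache.Mountable / snapshot.Mountable signatures are the ones the tmpfs types above implement:

	import (
		"context"

		"github.com/containerd/containerd/mount"
		"github.com/moby/buildkit/cache"
		"github.com/moby/buildkit/cache/metadata"
		"github.com/moby/buildkit/session"
		"github.com/moby/buildkit/solver/llbsolver/mounts"
	)

	// tmpfsMounts returns ready-to-use containerd mounts plus a cleanup func.
	func tmpfsMounts(ctx context.Context, cm cache.Manager, sm *session.Manager,
		md *metadata.Store) ([]mount.Mount, func() error, error) {
		mm := mounts.NewMountManager("sketch", cm, sm, md)
		mountable := mm.MountableTmpFS()            // cache.Mountable
		snapMnt, err := mountable.Mount(ctx, false) // snapshot.Mountable
		if err != nil {
			return nil, nil, err
		}
		return snapMnt.Mount() // []mount.Mount plus release func for the caller to defer
	}

Cache mounts follow the same shape via MountableCache, except the caller passes the pb.Mount (with CacheOpt set) plus an optional base ref, and must Release the returned cache.MutableRef when done, as the gateway container's cleanup list does.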

View File

@@ -0,0 +1,387 @@
package mounts
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/diff/apply"
"github.com/containerd/containerd/leases"
ctdmetadata "github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/containerd/snapshots/native"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/snapshot"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/leaseutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/errgroup"
)
type cmOpt struct {
snapshotterName string
snapshotter snapshots.Snapshotter
tmpdir string
}
type cmOut struct {
manager cache.Manager
lm leases.Manager
cs content.Store
md *metadata.Store
}
func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() error, err error) {
ns, ok := namespaces.Namespace(ctx)
if !ok {
return nil, nil, errors.Errorf("namespace required for test")
}
if opt.snapshotterName == "" {
opt.snapshotterName = "native"
}
tmpdir, err := ioutil.TempDir("", "cachemanager")
if err != nil {
return nil, nil, err
}
defers := make([]func() error, 0)
cleanup = func() error {
var err error
for i := range defers {
if err1 := defers[len(defers)-1-i](); err1 != nil && err == nil {
err = err1
}
}
return err
}
defer func() {
if err != nil {
cleanup()
}
}()
if opt.tmpdir == "" {
defers = append(defers, func() error {
return os.RemoveAll(tmpdir)
})
} else {
os.RemoveAll(tmpdir)
tmpdir = opt.tmpdir
}
if opt.snapshotter == nil {
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
if err != nil {
return nil, nil, err
}
opt.snapshotter = snapshotter
}
md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db"))
if err != nil {
return nil, nil, err
}
store, err := local.NewStore(tmpdir)
if err != nil {
return nil, nil, err
}
db, err := bolt.Open(filepath.Join(tmpdir, "containerdmeta.db"), 0644, nil)
if err != nil {
return nil, nil, err
}
defers = append(defers, func() error {
return db.Close()
})
mdb := ctdmetadata.NewDB(db, store, map[string]snapshots.Snapshotter{
opt.snapshotterName: opt.snapshotter,
})
if err := mdb.Init(context.TODO()); err != nil {
return nil, nil, err
}
lm := ctdmetadata.NewLeaseManager(mdb)
cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil),
MetadataStore: md,
ContentStore: mdb.ContentStore(),
LeaseManager: leaseutil.WithNamespace(lm, ns),
GarbageCollect: mdb.GarbageCollect,
Applier: apply.NewFileSystemApplier(mdb.ContentStore()),
})
if err != nil {
return nil, nil, err
}
return &cmOut{
manager: cm,
lm: lm,
cs: mdb.ContentStore(),
md: md,
}, cleanup, nil
}
func newRefGetter(m cache.Manager, md *metadata.Store, shared *cacheRefs) *cacheRefGetter {
return &cacheRefGetter{
locker: &sync.Mutex{},
cacheMounts: map[string]*cacheRefShare{},
cm: m,
md: md,
globalCacheRefs: shared,
}
}
func TestCacheMountPrivateRefs(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g3 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g4 := newRefGetter(co.manager, co.md, sharedCacheRefs)
ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
ref2, err := g1.getRefCacheDir(ctx, nil, "bar", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
// different ID returns different ref
require.NotEqual(t, ref.ID(), ref2.ID())
// same ID on same mount still shares the reference
ref3, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref3.ID())
// same ID on different mount gets a new ID
ref4, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.NotEqual(t, ref.ID(), ref4.ID())
// releasing one of two refs still keeps first ID private
ref.Release(context.TODO())
ref5, err := g3.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.NotEqual(t, ref.ID(), ref5.ID())
require.NotEqual(t, ref4.ID(), ref5.ID())
// releasing all refs releases ID to be reused
ref3.Release(context.TODO())
ref5, err = g4.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref5.ID())
// other mounts still keep their IDs
ref6, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.Equal(t, ref4.ID(), ref6.ID())
}
func TestCacheMountSharedRefs(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g3 := newRefGetter(co.manager, co.md, sharedCacheRefs)
ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
ref2, err := g1.getRefCacheDir(ctx, nil, "bar", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
// different ID returns different ref
require.NotEqual(t, ref.ID(), ref2.ID())
// same ID on same mount still shares the reference
ref3, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref3.ID())
// same ID on different mount gets same ID
ref4, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref4.ID())
// private gets a new ID
ref5, err := g3.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.NotEqual(t, ref.ID(), ref5.ID())
}
func TestCacheMountLockedRefs(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_LOCKED)
require.NoError(t, err)
ref2, err := g1.getRefCacheDir(ctx, nil, "bar", pb.CacheSharingOpt_LOCKED)
require.NoError(t, err)
// different ID returns different ref
require.NotEqual(t, ref.ID(), ref2.ID())
// same ID on same mount still shares the reference
ref3, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_LOCKED)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref3.ID())
// same ID on different mount blocks
gotRef4 := make(chan struct{})
go func() {
ref4, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_LOCKED)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref4.ID())
close(gotRef4)
}()
select {
case <-gotRef4:
require.FailNow(t, "mount did not lock")
case <-time.After(500 * time.Millisecond):
}
ref.Release(ctx)
ref3.Release(ctx)
select {
case <-gotRef4:
case <-time.After(500 * time.Millisecond):
require.FailNow(t, "mount did not unlock")
}
}
// moby/buildkit#1322
func TestCacheMountSharedRefsDeadlock(t *testing.T) {
// not parallel
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
var sharedCacheRefs = &cacheRefs{}
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
cacheRefReleaseHijack = func() {
time.Sleep(200 * time.Millisecond)
}
cacheRefCloneHijack = func() {
time.Sleep(400 * time.Millisecond)
}
defer func() {
cacheRefReleaseHijack = nil
cacheRefCloneHijack = nil
}()
eg, _ := errgroup.WithContext(context.TODO())
eg.Go(func() error {
return ref.Release(context.TODO())
})
eg.Go(func() error {
_, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
return err
})
done := make(chan struct{})
go func() {
err = eg.Wait()
require.NoError(t, err)
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
require.FailNow(t, "deadlock on releasing while getting new ref")
}
}
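Taken together, the ported tests pin down the three sharing semantics: SHARED hands every getter a clone of one underlying ref for the same ID; PRIVATE gives each concurrent user its own ref and only recycles an ID once all holders release it; LOCKED blocks the second user until the first releases. On the client side these correspond to the llb mount options (hypothetical snippet; the constants come from the client/llb package used in the integration test above):

	var (
		shared  = llb.AsPersistentCacheDir("go-mod", llb.CacheMountShared)  // one ref for everyone
		private = llb.AsPersistentCacheDir("go-mod", llb.CacheMountPrivate) // fresh ref per concurrent user
		locked  = llb.AsPersistentCacheDir("go-mod", llb.CacheMountLocked)  // second user waits for release
	)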

View File

@@ -5,34 +5,21 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"path"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/sys"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/locker"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/secrets"
"github.com/moby/buildkit/session/sshforward"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
"github.com/moby/buildkit/solver/llbsolver/mounts"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/progress/logs"
utilsystem "github.com/moby/buildkit/util/system"
"github.com/moby/buildkit/worker"
@@ -40,8 +27,6 @@ import (
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"google.golang.org/grpc/codes"
)
const execCacheType = "buildkit.exec.v0"
@@ -49,31 +34,26 @@
type execOp struct {
op *pb.ExecOp
cm cache.Manager
sm *session.Manager
md *metadata.Store
mm mounts.MountManager
exec executor.Executor
w worker.Worker
platform *pb.Platform
numInputs int
cacheMounts map[string]*cacheRefShare
cacheMountsMu sync.Mutex
}
func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) {
if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil {
return nil, err
}
name := fmt.Sprintf("exec %s", strings.Join(op.Exec.Meta.Args, " "))
return &execOp{
op: op.Exec,
cm: cm,
sm: sm,
md: md,
exec: exec,
numInputs: len(v.Inputs()),
w: w,
platform: platform,
cacheMounts: map[string]*cacheRefShare{},
op: op.Exec,
mm: mounts.NewMountManager(name, cm, sm, md),
cm: cm,
exec: exec,
numInputs: len(v.Inputs()),
w: w,
platform: platform,
}, nil
}
@@ -221,328 +201,6 @@ func (e *execOp) getMountDeps() ([]dep, error) {
return deps, nil
}
func (e *execOp) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, m *pb.Mount, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) {
g := &cacheRefGetter{
locker: &e.cacheMountsMu,
cacheMounts: e.cacheMounts,
cm: e.cm,
md: e.md,
globalCacheRefs: sharedCacheRefs,
name: fmt.Sprintf("cached mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")),
}
return g.getRefCacheDir(ctx, ref, id, sharing)
}
type cacheRefGetter struct {
locker sync.Locker
cacheMounts map[string]*cacheRefShare
cm cache.Manager
md *metadata.Store
globalCacheRefs *cacheRefs
name string
}
func (g *cacheRefGetter) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) {
key := "cache-dir:" + id
if ref != nil {
key += ":" + ref.ID()
}
mu := g.locker
mu.Lock()
defer mu.Unlock()
if ref, ok := g.cacheMounts[key]; ok {
return ref.clone(), nil
}
defer func() {
if err == nil {
share := &cacheRefShare{MutableRef: mref, refs: map[*cacheRef]struct{}{}}
g.cacheMounts[key] = share
mref = share.clone()
}
}()
switch sharing {
case pb.CacheSharingOpt_SHARED:
return g.globalCacheRefs.get(key, func() (cache.MutableRef, error) {
return g.getRefCacheDirNoCache(ctx, key, ref, id, false)
})
case pb.CacheSharingOpt_PRIVATE:
return g.getRefCacheDirNoCache(ctx, key, ref, id, false)
case pb.CacheSharingOpt_LOCKED:
return g.getRefCacheDirNoCache(ctx, key, ref, id, true)
default:
return nil, errors.Errorf("invalid cache sharing option: %s", sharing.String())
}
}
func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string, ref cache.ImmutableRef, id string, block bool) (cache.MutableRef, error) {
makeMutable := func(ref cache.ImmutableRef) (cache.MutableRef, error) {
return g.cm.New(ctx, ref, cache.WithRecordType(client.UsageRecordTypeCacheMount), cache.WithDescription(g.name), cache.CachePolicyRetain)
}
cacheRefsLocker.Lock(key)
defer cacheRefsLocker.Unlock(key)
for {
sis, err := g.md.Search(key)
if err != nil {
return nil, err
}
locked := false
for _, si := range sis {
if mRef, err := g.cm.GetMutable(ctx, si.ID()); err == nil {
logrus.Debugf("reusing ref for cache dir: %s", mRef.ID())
return mRef, nil
} else if errors.Is(err, cache.ErrLocked) {
locked = true
}
}
if block && locked {
cacheRefsLocker.Unlock(key)
select {
case <-ctx.Done():
cacheRefsLocker.Lock(key)
return nil, ctx.Err()
case <-time.After(100 * time.Millisecond):
cacheRefsLocker.Lock(key)
}
} else {
break
}
}
mRef, err := makeMutable(ref)
if err != nil {
return nil, err
}
si, _ := g.md.Get(mRef.ID())
v, err := metadata.NewValue(key)
if err != nil {
mRef.Release(context.TODO())
return nil, err
}
v.Index = key
if err := si.Update(func(b *bolt.Bucket) error {
return si.SetValue(b, key, v)
}); err != nil {
mRef.Release(context.TODO())
return nil, err
}
return mRef, nil
}
func (e *execOp) getSSHMountable(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
var caller session.Caller
err := e.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error {
if err := sshforward.CheckSSHID(ctx, c, m.SSHOpt.ID); err != nil {
if m.SSHOpt.Optional {
return nil
}
if grpcerrors.Code(err) == codes.Unimplemented {
return errors.Errorf("no SSH key %q forwarded from the client", m.SSHOpt.ID)
}
return err
}
caller = c
return nil
})
if err != nil {
return nil, err
}
// because ssh socket remains active, to actually handle session disconnecting ssh error
// should restart the whole exec with new session
return &sshMount{mount: m, caller: caller, idmap: e.cm.IdentityMapping()}, nil
}
type sshMount struct {
mount *pb.Mount
caller session.Caller
idmap *idtools.IdentityMapping
}
func (sm *sshMount) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
return &sshMountInstance{sm: sm, idmap: sm.idmap}, nil
}
type sshMountInstance struct {
sm *sshMount
idmap *idtools.IdentityMapping
}
func (sm *sshMountInstance) Mount() ([]mount.Mount, func() error, error) {
ctx, cancel := context.WithCancel(context.TODO())
uid := int(sm.sm.mount.SSHOpt.Uid)
gid := int(sm.sm.mount.SSHOpt.Gid)
if sm.idmap != nil {
identity, err := sm.idmap.ToHost(idtools.Identity{
UID: uid,
GID: gid,
})
if err != nil {
cancel()
return nil, nil, err
}
uid = identity.UID
gid = identity.GID
}
sock, cleanup, err := sshforward.MountSSHSocket(ctx, sm.sm.caller, sshforward.SocketOpt{
ID: sm.sm.mount.SSHOpt.ID,
UID: uid,
GID: gid,
Mode: int(sm.sm.mount.SSHOpt.Mode & 0777),
})
if err != nil {
cancel()
return nil, nil, err
}
release := func() error {
var err error
if cleanup != nil {
err = cleanup()
}
cancel()
return err
}
return []mount.Mount{{
Type: "bind",
Source: sock,
Options: []string{"rbind"},
}}, release, nil
}
func (sm *sshMountInstance) IdentityMapping() *idtools.IdentityMapping {
return sm.idmap
}
func (e *execOp) getSecretMountable(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
if m.SecretOpt == nil {
return nil, errors.Errorf("invalid sercet mount options")
}
sopt := *m.SecretOpt
id := sopt.ID
if id == "" {
return nil, errors.Errorf("secret ID missing from mount options")
}
var dt []byte
var err error
err = e.sm.Any(ctx, g, func(ctx context.Context, _ string, caller session.Caller) error {
dt, err = secrets.GetSecret(ctx, caller, id)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) && m.SecretOpt.Optional {
return nil
}
return err
}
return nil
})
if err != nil || dt == nil {
return nil, err
}
return &secretMount{mount: m, data: dt, idmap: e.cm.IdentityMapping()}, nil
}
type secretMount struct {
mount *pb.Mount
data []byte
idmap *idtools.IdentityMapping
}
func (sm *secretMount) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
return &secretMountInstance{sm: sm, idmap: sm.idmap}, nil
}
type secretMountInstance struct {
sm *secretMount
root string
idmap *idtools.IdentityMapping
}
func (sm *secretMountInstance) Mount() ([]mount.Mount, func() error, error) {
dir, err := ioutil.TempDir("", "buildkit-secrets")
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create temp dir")
}
cleanupDir := func() error {
return os.RemoveAll(dir)
}
if err := os.Chmod(dir, 0711); err != nil {
cleanupDir()
return nil, nil, err
}
tmpMount := mount.Mount{
Type: "tmpfs",
Source: "tmpfs",
Options: []string{"nodev", "nosuid", "noexec", fmt.Sprintf("uid=%d,gid=%d", os.Geteuid(), os.Getegid())},
}
if sys.RunningInUserNS() {
tmpMount.Options = nil
}
if err := mount.All([]mount.Mount{tmpMount}, dir); err != nil {
cleanupDir()
return nil, nil, errors.Wrap(err, "unable to setup secret mount")
}
sm.root = dir
cleanup := func() error {
if err := mount.Unmount(dir, 0); err != nil {
return err
}
return cleanupDir()
}
randID := identity.NewID()
fp := filepath.Join(dir, randID)
if err := ioutil.WriteFile(fp, sm.sm.data, 0600); err != nil {
cleanup()
return nil, nil, err
}
uid := int(sm.sm.mount.SecretOpt.Uid)
gid := int(sm.sm.mount.SecretOpt.Gid)
if sm.idmap != nil {
identity, err := sm.idmap.ToHost(idtools.Identity{
UID: uid,
GID: gid,
})
if err != nil {
cleanup()
return nil, nil, err
}
uid = identity.UID
gid = identity.GID
}
if err := os.Chown(fp, uid, gid); err != nil {
cleanup()
return nil, nil, err
}
if err := os.Chmod(fp, os.FileMode(sm.sm.mount.SecretOpt.Mode&0777)); err != nil {
cleanup()
return nil, nil, err
}
return []mount.Mount{{
Type: "bind",
Source: fp,
Options: []string{"ro", "rbind", "nodev", "nosuid", "noexec"},
}}, cleanup, nil
}
func (sm *secretMountInstance) IdentityMapping() *idtools.IdentityMapping {
return sm.idmap
}
func addDefaultEnvvar(env []string, k, v string) []string {
for _, e := range env {
if strings.HasPrefix(e, k+"=") {
@@ -599,7 +257,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
case pb.MountType_BIND:
// if mount creates an output
if m.Output != pb.SkipOutput {
// it it is readonly and not root then output is the input
// if it is readonly and not root then output is the input
if m.Readonly && ref != nil && m.Dest != pb.RootMount {
outputs = append(outputs, ref.Clone())
} else {
@@ -622,10 +280,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
}
case pb.MountType_CACHE:
if m.CacheOpt == nil {
return nil, errors.Errorf("missing cache mount options")
}
mRef, err := e.getRefCacheDir(ctx, ref, m.CacheOpt.ID, m, m.CacheOpt.Sharing)
mRef, err := e.mm.MountableCache(ctx, m, ref)
if err != nil {
return nil, err
}
@@ -638,27 +293,25 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
}
case pb.MountType_TMPFS:
mountable = newTmpfs(e.cm.IdentityMapping())
mountable = e.mm.MountableTmpFS()
case pb.MountType_SECRET:
secretMount, err := e.getSecretMountable(ctx, m, g)
var err error
mountable, err = e.mm.MountableSecret(ctx, m, g)
if err != nil {
return nil, err
}
if secretMount == nil {
if mountable == nil {
continue
}
mountable = secretMount
case pb.MountType_SSH:
sshMount, err := e.getSSHMountable(ctx, m, g)
var err error
mountable, err = e.mm.MountableSSH(ctx, m, g)
if err != nil {
return nil, err
}
if sshMount == nil {
if mountable == nil {
continue
}
mountable = sshMount
default:
return nil, errors.Errorf("mount type %s not implemented", m.MountType)
@@ -769,130 +422,6 @@ func proxyEnvList(p *pb.ProxyEnv) []string {
return out
}
func newTmpfs(idmap *idtools.IdentityMapping) cache.Mountable {
return &tmpfs{idmap: idmap}
}
type tmpfs struct {
idmap *idtools.IdentityMapping
}
func (f *tmpfs) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
return &tmpfsMount{readonly: readonly, idmap: f.idmap}, nil
}
type tmpfsMount struct {
readonly bool
idmap *idtools.IdentityMapping
}
func (m *tmpfsMount) Mount() ([]mount.Mount, func() error, error) {
opt := []string{"nosuid"}
if m.readonly {
opt = append(opt, "ro")
}
return []mount.Mount{{
Type: "tmpfs",
Source: "tmpfs",
Options: opt,
}}, func() error { return nil }, nil
}
func (m *tmpfsMount) IdentityMapping() *idtools.IdentityMapping {
return m.idmap
}
var cacheRefsLocker = locker.New()
var sharedCacheRefs = &cacheRefs{}
type cacheRefs struct {
mu sync.Mutex
shares map[string]*cacheRefShare
}
// ClearActiveCacheMounts clears shared cache mounts currently in use.
// Caller needs to hold CacheMountsLocker before calling
func ClearActiveCacheMounts() {
sharedCacheRefs.shares = nil
}
func CacheMountsLocker() sync.Locker {
return &sharedCacheRefs.mu
}
func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.MutableRef, error) {
r.mu.Lock()
defer r.mu.Unlock()
if r.shares == nil {
r.shares = map[string]*cacheRefShare{}
}
share, ok := r.shares[key]
if ok {
return share.clone(), nil
}
mref, err := fn()
if err != nil {
return nil, err
}
share = &cacheRefShare{MutableRef: mref, main: r, key: key, refs: map[*cacheRef]struct{}{}}
r.shares[key] = share
return share.clone(), nil
}
type cacheRefShare struct {
cache.MutableRef
mu sync.Mutex
refs map[*cacheRef]struct{}
main *cacheRefs
key string
}
func (r *cacheRefShare) clone() cache.MutableRef {
cacheRef := &cacheRef{cacheRefShare: r}
if cacheRefCloneHijack != nil {
cacheRefCloneHijack()
}
r.mu.Lock()
r.refs[cacheRef] = struct{}{}
r.mu.Unlock()
return cacheRef
}
func (r *cacheRefShare) release(ctx context.Context) error {
if r.main != nil {
delete(r.main.shares, r.key)
}
return r.MutableRef.Release(ctx)
}
var cacheRefReleaseHijack func()
var cacheRefCloneHijack func()
type cacheRef struct {
*cacheRefShare
}
func (r *cacheRef) Release(ctx context.Context) error {
if r.main != nil {
r.main.mu.Lock()
defer r.main.mu.Unlock()
}
r.mu.Lock()
defer r.mu.Unlock()
delete(r.refs, r)
if len(r.refs) == 0 {
if cacheRefReleaseHijack != nil {
cacheRefReleaseHijack()
}
return r.release(ctx)
}
return nil
}
func parseExtraHosts(ips []*pb.HostIP) ([]executor.HostIP, error) {
out := make([]executor.HostIP, len(ips))
for i, hip := range ips {

View File

@@ -1,140 +1,11 @@
package ops
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/diff/apply"
"github.com/containerd/containerd/leases"
ctdmetadata "github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/containerd/snapshots/native"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/snapshot"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/leaseutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/errgroup"
)
type cmOpt struct {
snapshotterName string
snapshotter snapshots.Snapshotter
tmpdir string
}
type cmOut struct {
manager cache.Manager
lm leases.Manager
cs content.Store
md *metadata.Store
}
func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() error, err error) {
ns, ok := namespaces.Namespace(ctx)
if !ok {
return nil, nil, errors.Errorf("namespace required for test")
}
if opt.snapshotterName == "" {
opt.snapshotterName = "native"
}
tmpdir, err := ioutil.TempDir("", "cachemanager")
if err != nil {
return nil, nil, err
}
defers := make([]func() error, 0)
cleanup = func() error {
var err error
for i := range defers {
if err1 := defers[len(defers)-1-i](); err1 != nil && err == nil {
err = err1
}
}
return err
}
defer func() {
if err != nil {
cleanup()
}
}()
if opt.tmpdir == "" {
defers = append(defers, func() error {
return os.RemoveAll(tmpdir)
})
} else {
os.RemoveAll(tmpdir)
tmpdir = opt.tmpdir
}
if opt.snapshotter == nil {
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
if err != nil {
return nil, nil, err
}
opt.snapshotter = snapshotter
}
md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db"))
if err != nil {
return nil, nil, err
}
store, err := local.NewStore(tmpdir)
if err != nil {
return nil, nil, err
}
db, err := bolt.Open(filepath.Join(tmpdir, "containerdmeta.db"), 0644, nil)
if err != nil {
return nil, nil, err
}
defers = append(defers, func() error {
return db.Close()
})
mdb := ctdmetadata.NewDB(db, store, map[string]snapshots.Snapshotter{
opt.snapshotterName: opt.snapshotter,
})
if err := mdb.Init(context.TODO()); err != nil {
return nil, nil, err
}
lm := ctdmetadata.NewLeaseManager(mdb)
cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil),
MetadataStore: md,
ContentStore: mdb.ContentStore(),
LeaseManager: leaseutil.WithNamespace(lm, ns),
GarbageCollect: mdb.GarbageCollect,
Applier: apply.NewFileSystemApplier(mdb.ContentStore()),
})
if err != nil {
return nil, nil, err
}
return &cmOut{
manager: cm,
lm: lm,
cs: mdb.ContentStore(),
md: md,
}, cleanup, nil
}
func TestDedupPaths(t *testing.T) {
res := dedupePaths([]string{"Gemfile", "Gemfile/foo"})
require.Equal(t, []string{"Gemfile"}, res)
@@ -154,254 +25,3 @@ func TestDedupPaths(t *testing.T) {
res = dedupePaths([]string{"foo/bar/baz", "foo/bara", "foo/bar/bax", "foo/bar"})
require.Equal(t, []string{"foo/bar", "foo/bara"}, res)
}
func newRefGetter(m cache.Manager, md *metadata.Store, shared *cacheRefs) *cacheRefGetter {
return &cacheRefGetter{
locker: &sync.Mutex{},
cacheMounts: map[string]*cacheRefShare{},
cm: m,
md: md,
globalCacheRefs: shared,
}
}
func TestCacheMountPrivateRefs(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g3 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g4 := newRefGetter(co.manager, co.md, sharedCacheRefs)
ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
ref2, err := g1.getRefCacheDir(ctx, nil, "bar", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
// different ID returns different ref
require.NotEqual(t, ref.ID(), ref2.ID())
// same ID on same mount still shares the reference
ref3, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref3.ID())
// same ID on different mount gets a new ID
ref4, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.NotEqual(t, ref.ID(), ref4.ID())
// releasing one of two refs still keeps first ID private
ref.Release(context.TODO())
ref5, err := g3.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.NotEqual(t, ref.ID(), ref5.ID())
require.NotEqual(t, ref4.ID(), ref5.ID())
// releasing all refs releases ID to be reused
ref3.Release(context.TODO())
ref5, err = g4.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref5.ID())
// other mounts still keep their IDs
ref6, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.Equal(t, ref4.ID(), ref6.ID())
}
func TestCacheMountSharedRefs(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g3 := newRefGetter(co.manager, co.md, sharedCacheRefs)
ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
ref2, err := g1.getRefCacheDir(ctx, nil, "bar", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
// different ID returns different ref
require.NotEqual(t, ref.ID(), ref2.ID())
// same ID on same mount still shares the reference
ref3, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref3.ID())
// same ID on different mount gets same ID
ref4, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref4.ID())
// private gets a new ID
ref5, err := g3.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
require.NotEqual(t, ref.ID(), ref5.ID())
}
func TestCacheMountLockedRefs(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_LOCKED)
require.NoError(t, err)
ref2, err := g1.getRefCacheDir(ctx, nil, "bar", pb.CacheSharingOpt_LOCKED)
require.NoError(t, err)
// different ID returns different ref
require.NotEqual(t, ref.ID(), ref2.ID())
// same ID on same mount still shares the reference
ref3, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_LOCKED)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref3.ID())
// same ID on different mount blocks
gotRef4 := make(chan struct{})
go func() {
ref4, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_LOCKED)
require.NoError(t, err)
require.Equal(t, ref.ID(), ref4.ID())
close(gotRef4)
}()
select {
case <-gotRef4:
require.FailNow(t, "mount did not lock")
case <-time.After(500 * time.Millisecond):
}
ref.Release(ctx)
ref3.Release(ctx)
select {
case <-gotRef4:
case <-time.After(500 * time.Millisecond):
require.FailNow(t, "mount did not unlock")
}
}
// moby/buildkit#1322
func TestCacheMountSharedRefsDeadlock(t *testing.T) {
// not parallel
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)
co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()
var sharedCacheRefs = &cacheRefs{}
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)
cacheRefReleaseHijack = func() {
time.Sleep(200 * time.Millisecond)
}
cacheRefCloneHijack = func() {
time.Sleep(400 * time.Millisecond)
}
defer func() {
cacheRefReleaseHijack = nil
cacheRefCloneHijack = nil
}()
eg, _ := errgroup.WithContext(context.TODO())
eg.Go(func() error {
return ref.Release(context.TODO())
})
eg.Go(func() error {
_, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
return err
})
done := make(chan struct{})
go func() {
err = eg.Wait()
require.NoError(t, err)
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
require.FailNow(t, "deadlock on releasing while getting new ref")
}
}

View File

@@ -33,6 +33,7 @@ import (
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/snapshot/imagerefchecker"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver/mounts"
"github.com/moby/buildkit/solver/llbsolver/ops"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
@@ -62,7 +63,7 @@ type WorkerOpt struct {
Labels map[string]string
Platforms []specs.Platform
GCPolicy []client.PruneInfo
MetadataStore *metadata.Store
MdStore *metadata.Store
Executor executor.Executor
Snapshotter snapshot.Snapshotter
ContentStore content.Store
@@ -94,7 +95,7 @@ func NewWorker(opt WorkerOpt) (*Worker, error) {
cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: opt.Snapshotter,
MetadataStore: opt.MetadataStore,
MetadataStore: opt.MdStore,
PruneRefChecker: imageRefChecker,
Applier: opt.Applier,
GarbageCollect: opt.GarbageCollect,
@@ -129,7 +130,7 @@
if err := git.Supported(); err == nil {
gs, err := git.NewSource(git.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
MetadataStore: opt.MdStore,
})
if err != nil {
return nil, err
@@ -141,7 +142,7 @@
hs, err := http.NewSource(http.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
MetadataStore: opt.MdStore,
})
if err != nil {
return nil, err
@@ -151,7 +152,7 @@
ss, err := local.NewSource(local.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
MetadataStore: opt.MdStore,
})
if err != nil {
return nil, err
@@ -252,15 +253,19 @@ func (w *Worker) CacheManager() cache.Manager {
return w.CacheMgr
}
func (w *Worker) MetadataStore() *metadata.Store {
return w.MdStore
}
func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error) {
if baseOp, ok := v.Sys().(*pb.Op); ok {
switch op := baseOp.Op.(type) {
case *pb.Op_Source:
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, sm, w)
case *pb.Op_Exec:
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, sm, w.MetadataStore, w.WorkerOpt.Executor, w)
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, sm, w.MdStore, w.WorkerOpt.Executor, w)
case *pb.Op_File:
return ops.NewFileOp(v, op, w.CacheMgr, w.MetadataStore, w)
return ops.NewFileOp(v, op, w.CacheMgr, w.MdStore, w)
case *pb.Op_Build:
return ops.NewBuildOp(v, op, s, w)
default:
@@ -271,13 +276,13 @@
}
func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
mu := ops.CacheMountsLocker()
mu := mounts.CacheMountsLocker()
mu.Lock()
defer mu.Unlock()
for _, id := range ids {
id = "cache-dir:" + id
sis, err := w.MetadataStore.Search(id)
sis, err := w.MdStore.Search(id)
if err != nil {
return err
}
@@ -306,7 +311,7 @@ func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
}
}
ops.ClearActiveCacheMounts()
mounts.ClearActiveCacheMounts()
return nil
}
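The WorkerOpt field rename (MetadataStore to MdStore) appears to make room for the new MetadataStore() accessor: Worker embeds WorkerOpt, so keeping the old name would have had the new method shadow the promoted field under the same w.MetadataStore selector. The accessor is what lets gateway code build a MountManager from any worker, exactly as gateway.NewContainer does above:

	mm = mounts.NewMountManager(name, workerRef.Worker.CacheManager(), sm,
		workerRef.Worker.MetadataStore())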

View File

@@ -103,7 +103,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
opt := base.WorkerOpt{
ID: id,
Labels: xlabels,
MetadataStore: md,
MdStore: md,
Executor: containerdexecutor.New(client, root, "", np, dns),
Snapshotter: snap,
ContentStore: cs,

View File

@@ -112,7 +112,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
opt = base.WorkerOpt{
ID: id,
Labels: xlabels,
MetadataStore: md,
MdStore: md,
Executor: exe,
Snapshotter: snap,
ContentStore: c,

View File

@@ -5,6 +5,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
@@ -35,6 +36,7 @@ type Worker interface {
ContentStore() content.Store
Executor() executor.Executor
CacheManager() cache.Manager
MetadataStore() *metadata.Store
}
// Pre-defined label keys