Allow scratch mounts with gateway.RunContainer

- Plumb default worker by adding GetDefault() to frontend.WorkerInfos
- To avoid cyclic dependency, refactor frontend.WorkerInfos to worker.Infos
- Refactor gateway.NewContainer to share code with llbsolver/ops/exec.go

Signed-off-by: Edgar Lee <edgarl@netflix.com>
v0.8
Edgar Lee 2020-11-13 21:58:20 -08:00
parent a459eb4927
commit bcff7baf60
9 changed files with 386 additions and 392 deletions

View File

@ -1084,79 +1084,139 @@ func testClientGatewayExecError(t *testing.T, sb integration.Sandbox) {
require.NoError(t, err)
defer c.Close()
id := identity.NewID()
st := llb.Image("busybox:latest").Run(
llb.Dir("/src"),
llb.AddMount("/src", llb.Scratch()),
llb.Shlexf("sh -c \"echo %s > output && fail\"", id),
).Root()
def, err := st.Marshal(ctx)
require.NoError(t, err)
b := func(ctx context.Context, c client.Client) (*client.Result, error) {
_, solveErr := c.Solve(ctx, client.SolveRequest{
Evaluate: true,
Definition: def.ToPB(),
})
require.Error(t, solveErr)
id := identity.NewID()
tests := []struct {
Name string
State llb.State
NumMounts int
Paths []string
}{{
"only rootfs",
llb.Image("busybox:latest").Run(
llb.Shlexf(`sh -c "echo %s > /data && fail"`, id),
).Root(),
1, []string{"/data"},
}, {
"rootfs and readwrite scratch mount",
llb.Image("busybox:latest").Run(
llb.Shlexf(`sh -c "echo %s > /data && echo %s > /rw/data && fail"`, id, id),
llb.AddMount("/rw", llb.Scratch()),
).Root(),
2, []string{"/data", "/rw/data"},
}, {
"rootfs and readwrite mount",
llb.Image("busybox:latest").Run(
llb.Shlexf(`sh -c "echo %s > /data && echo %s > /rw/data && fail"`, id, id),
llb.AddMount("/rw", llb.Scratch().File(llb.Mkfile("foo", 0700, []byte(id)))),
).Root(),
2, []string{"/data", "/rw/data", "/rw/foo"},
}, {
"rootfs and readonly scratch mount",
llb.Image("busybox:latest").Run(
llb.Shlexf(`sh -c "echo %s > /data && echo %s > /readonly/foo"`, id, id),
llb.AddMount("/readonly", llb.Scratch(), llb.Readonly),
).Root(),
2, []string{"/data"},
}}
var se *errdefs.SolveError
require.True(t, errors.As(solveErr, &se))
for _, tt := range tests {
tt := tt
t.Run(tt.Name, func(t *testing.T) {
def, err := tt.State.Marshal(ctx)
require.NoError(t, err)
op := se.Solve.Op
opExec, ok := se.Solve.Op.Op.(*pb.Op_Exec)
require.True(t, ok)
_, solveErr := c.Solve(ctx, client.SolveRequest{
Evaluate: true,
Definition: def.ToPB(),
})
require.Error(t, solveErr)
exec := opExec.Exec
var se *errdefs.SolveError
require.True(t, errors.As(solveErr, &se))
require.Len(t, se.InputIDs, tt.NumMounts)
require.Len(t, se.OutputIDs, tt.NumMounts)
var mounts []client.Mount
for _, mnt := range exec.Mounts {
mounts = append(mounts, client.Mount{
Selector: mnt.Selector,
Dest: mnt.Dest,
ResultID: se.Solve.OutputIDs[mnt.Output],
Readonly: mnt.Readonly,
MountType: mnt.MountType,
CacheOpt: mnt.CacheOpt,
SecretOpt: mnt.SecretOpt,
SSHOpt: mnt.SSHOpt,
op := se.Solve.Op
require.NotNil(t, op)
require.NotNil(t, op.Op)
opExec, ok := se.Solve.Op.Op.(*pb.Op_Exec)
require.True(t, ok)
exec := opExec.Exec
var mounts []client.Mount
for i, mnt := range exec.Mounts {
mounts = append(mounts, client.Mount{
Selector: mnt.Selector,
Dest: mnt.Dest,
ResultID: se.Solve.OutputIDs[i],
Readonly: mnt.Readonly,
MountType: mnt.MountType,
CacheOpt: mnt.CacheOpt,
SecretOpt: mnt.SecretOpt,
SSHOpt: mnt.SSHOpt,
})
}
ctr, err := c.NewContainer(ctx, client.NewContainerRequest{
Mounts: mounts,
NetMode: exec.Network,
Platform: op.Platform,
Constraints: op.Constraints,
})
require.NoError(t, err)
defer ctr.Release(ctx)
inputR, inputW := io.Pipe()
defer inputW.Close()
defer inputR.Close()
pid1Output := bytes.NewBuffer(nil)
prompt := newTestPrompt(ctx, t, inputW, pid1Output)
pid1, err := ctr.Start(ctx, client.StartRequest{
Args: []string{"sh"},
Tty: true,
Stdin: inputR,
Stdout: &nopCloser{pid1Output},
Stderr: &nopCloser{pid1Output},
Env: []string{fmt.Sprintf("PS1=%s", prompt.String())},
})
require.NoError(t, err)
meta := exec.Meta
for _, p := range tt.Paths {
output := bytes.NewBuffer(nil)
proc, err := ctr.Start(ctx, client.StartRequest{
Args: []string{"cat", p},
Env: meta.Env,
User: meta.User,
Cwd: meta.Cwd,
Stdout: &nopCloser{output},
SecurityMode: exec.Security,
})
require.NoError(t, err)
err = proc.Wait()
require.NoError(t, err)
require.Equal(t, id, strings.TrimSpace(output.String()))
}
prompt.SendExit(0)
err = pid1.Wait()
require.NoError(t, err)
})
}
ctr, err := c.NewContainer(ctx, client.NewContainerRequest{
Mounts: mounts,
NetMode: exec.Network,
Platform: op.Platform,
Constraints: op.Constraints,
})
require.NoError(t, err)
defer ctr.Release(ctx)
meta := exec.Meta
output := bytes.NewBuffer(nil)
proc, err := ctr.Start(ctx, client.StartRequest{
Args: []string{"cat", "output"},
Env: meta.Env,
User: meta.User,
Cwd: meta.Cwd,
Stdout: &nopCloser{output},
SecurityMode: exec.Security,
})
require.NoError(t, err)
err = proc.Wait()
require.NoError(t, err)
require.Equal(t, id, strings.TrimSpace(output.String()))
return nil, solveErr
return client.NewResult(), nil
}
_, err = c.Build(ctx, SolveOpt{}, "buildkit_test", b, nil)
require.Error(t, err)
require.NoError(t, err)
checkAllReleasable(t, c, sb, true)
// checkAllReleasable(t, c, sb, true)
}
// testClientGatewaySlowCacheExecError is testing gateway exec into the ref

View File

@ -3,9 +3,7 @@ package frontend
import (
"context"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
gw "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/pb"
@ -17,7 +15,6 @@ type Frontend interface {
}
type FrontendLLBBridge interface {
executor.Executor
Solve(ctx context.Context, req SolveRequest, sid string) (*Result, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (digest.Digest, []byte, error)
}
@ -25,7 +22,3 @@ type FrontendLLBBridge interface {
type SolveRequest = gw.SolveRequest
type CacheOptionsEntry = gw.CacheOptionsEntry
type WorkerInfos interface {
WorkerInfos() []client.WorkerInfo
}

View File

@ -35,29 +35,11 @@ type NewContainerRequest struct {
// except is has a RefProxy instead of Ref to allow for a common abstraction
// between gateway clients.
type Mount struct {
Dest string
Selector string
Readonly bool
MountType opspb.MountType
*opspb.Mount
WorkerRef *worker.WorkerRef
CacheOpt *opspb.CacheOpt
SecretOpt *opspb.SecretOpt
SSHOpt *opspb.SSHOpt
}
func toProtoMount(m Mount) *opspb.Mount {
return &opspb.Mount{
Selector: m.Selector,
Dest: m.Dest,
Readonly: m.Readonly,
MountType: m.MountType,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
}
}
func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) {
func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) {
ctx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
platform := opspb.Platform{
@ -71,122 +53,202 @@ func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager,
id: req.ContainerID,
netMode: req.NetMode,
platform: platform,
executor: e,
executor: w.Executor(),
errGroup: eg,
ctx: ctx,
cancel: cancel,
}
makeMutable := func(worker worker.Worker, ref cache.ImmutableRef) (cache.MutableRef, error) {
mRef, err := worker.CacheManager().New(ctx, ref, g)
if err != nil {
return nil, stack.Enable(err)
}
ctr.cleanup = append(ctr.cleanup, func() error {
return stack.Enable(mRef.Release(context.TODO()))
})
return mRef, nil
}
var mm *mounts.MountManager
mnts := req.Mounts
for i, m := range mnts {
if m.Dest == opspb.RootMount && m.WorkerRef != nil {
name := fmt.Sprintf("container %s", req.ContainerID)
mm = mounts.NewMountManager(name, m.WorkerRef.Worker.CacheManager(), sm, m.WorkerRef.Worker.MetadataStore())
ctr.rootFS = mountWithSession(m.WorkerRef.ImmutableRef, g)
if !m.Readonly {
ref, err := makeMutable(m.WorkerRef.Worker, m.WorkerRef.ImmutableRef)
if err != nil {
return nil, stack.Enable(err)
}
ctr.rootFS = mountWithSession(ref, g)
}
// delete root mount from list, handled here
mnts = append(mnts[:i], mnts[i+1:]...)
break
}
}
if ctr.rootFS.Src == nil {
return nil, errors.Errorf("root mount required")
}
for _, m := range mnts {
var ref cache.ImmutableRef
var mountable cache.Mountable
var (
mnts []*opspb.Mount
refs []*worker.WorkerRef
)
for _, m := range req.Mounts {
mnts = append(mnts, m.Mount)
if m.WorkerRef != nil {
ref = m.WorkerRef.ImmutableRef
mountable = ref
if !m.Readonly {
var err error
mountable, err = makeMutable(m.WorkerRef.Worker, ref)
if err != nil {
return nil, stack.Enable(err)
}
}
refs = append(refs, m.WorkerRef)
m.Mount.Input = opspb.InputIndex(len(refs) - 1)
} else {
m.Mount.Input = opspb.Empty
}
}
name := fmt.Sprintf("container %s", req.ContainerID)
mm := mounts.NewMountManager(name, w.CacheManager(), sm, w.MetadataStore())
p, err := PrepareMounts(ctx, mm, w.CacheManager(), g, mnts, refs, func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) {
cm := w.CacheManager()
if m.Input != opspb.Empty {
cm = refs[m.Input].Worker.CacheManager()
}
return cm.New(ctx, ref, g)
})
if err != nil {
for i := len(p.Actives) - 1; i >= 0; i-- { // call in LIFO order
p.Actives[i].Release(context.TODO())
}
for _, o := range p.OutputRefs {
o.Ref.Release(context.TODO())
}
return nil, err
}
ctr.rootFS = p.Root
ctr.mounts = p.Mounts
for _, o := range p.OutputRefs {
ctr.cleanup = append(ctr.cleanup, func() error {
return o.Ref.Release(context.TODO())
})
}
for _, active := range p.Actives {
ctr.cleanup = append(ctr.cleanup, func() error {
return active.Release(context.TODO())
})
}
return ctr, nil
}
type PreparedMounts struct {
Root executor.Mount
ReadonlyRootFS bool
Mounts []executor.Mount
OutputRefs []OutputRef
Actives []cache.MutableRef
}
type OutputRef struct {
Ref cache.Ref
MountIndex int
}
type MakeMutable func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error)
func PrepareMounts(ctx context.Context, mm *mounts.MountManager, cm cache.Manager, g session.Group, mnts []*opspb.Mount, refs []*worker.WorkerRef, makeMutable MakeMutable) (p PreparedMounts, err error) {
// loop over all mounts, fill in mounts, root and outputs
for i, m := range mnts {
var (
mountable cache.Mountable
ref cache.ImmutableRef
)
if m.Dest == opspb.RootMount && m.MountType != opspb.MountType_BIND {
return p, errors.Errorf("invalid mount type %s for %s", m.MountType.String(), m.Dest)
}
// if mount is based on input validate and load it
if m.Input != opspb.Empty {
if int(m.Input) > len(refs) {
return p, errors.Errorf("missing input %d", m.Input)
}
ref = refs[int(m.Input)].ImmutableRef
mountable = ref
}
switch m.MountType {
case opspb.MountType_BIND:
// nothing to do here
case opspb.MountType_CACHE:
mRef, err := mm.MountableCache(ctx, toProtoMount(m), ref, g)
if err != nil {
return nil, err
// if mount creates an output
if m.Output != opspb.SkipOutput {
// if it is readonly and not root then output is the input
if m.Readonly && ref != nil && m.Dest != opspb.RootMount {
p.OutputRefs = append(p.OutputRefs, OutputRef{
MountIndex: i,
Ref: ref.Clone(),
})
} else {
// otherwise output and mount is the mutable child
active, err := makeMutable(m, ref)
if err != nil {
return p, err
}
mountable = active
p.OutputRefs = append(p.OutputRefs, OutputRef{
MountIndex: i,
Ref: active,
})
}
} else if (!m.Readonly || ref == nil) && m.Dest != opspb.RootMount {
// this case is empty readonly scratch without output that is not really useful for anything but don't error
active, err := makeMutable(m, ref)
if err != nil {
return p, err
}
p.Actives = append(p.Actives, active)
mountable = active
}
mountable = mRef
ctr.cleanup = append(ctr.cleanup, func() error {
return stack.Enable(mRef.Release(context.TODO()))
})
case opspb.MountType_CACHE:
active, err := mm.MountableCache(ctx, m, ref, g)
if err != nil {
return p, err
}
mountable = active
p.Actives = append(p.Actives, active)
if m.Output != opspb.SkipOutput && ref != nil {
p.OutputRefs = append(p.OutputRefs, OutputRef{
MountIndex: i,
Ref: ref.Clone(),
})
}
case opspb.MountType_TMPFS:
mountable = mm.MountableTmpFS()
case opspb.MountType_SECRET:
var err error
mountable, err = mm.MountableSecret(ctx, toProtoMount(m), g)
mountable, err = mm.MountableSecret(ctx, m, g)
if err != nil {
return nil, err
return p, err
}
if mountable == nil {
continue
}
case opspb.MountType_SSH:
var err error
mountable, err = mm.MountableSSH(ctx, toProtoMount(m), g)
mountable, err = mm.MountableSSH(ctx, m, g)
if err != nil {
return nil, err
return p, err
}
if mountable == nil {
continue
}
default:
return nil, errors.Errorf("mount type %s not implemented", m.MountType)
return p, errors.Errorf("mount type %s not implemented", m.MountType)
}
// validate that there is a mount
if mountable == nil {
return nil, errors.Errorf("mount %s has no input", m.Dest)
return p, errors.Errorf("mount %s has no input", m.Dest)
}
execMount := executor.Mount{
Src: mountableWithSession(mountable, g),
Selector: m.Selector,
Dest: m.Dest,
Readonly: m.Readonly,
// if dest is root we need mutable ref even if there is no output
if m.Dest == opspb.RootMount {
root := mountable
p.ReadonlyRootFS = m.Readonly
if m.Output == opspb.SkipOutput && p.ReadonlyRootFS {
active, err := makeMutable(m, ref)
if err != nil {
return p, err
}
p.Actives = append(p.Actives, active)
root = active
}
p.Root = mountWithSession(root, g)
} else {
mws := mountWithSession(mountable, g)
mws.Dest = m.Dest
mws.Readonly = m.Readonly
mws.Selector = m.Selector
p.Mounts = append(p.Mounts, mws)
}
ctr.mounts = append(ctr.mounts, execMount)
}
// sort mounts so parents are mounted first
sort.Slice(ctr.mounts, func(i, j int) bool {
return ctr.mounts[i].Dest < ctr.mounts[j].Dest
sort.Slice(p.Mounts, func(i, j int) bool {
return p.Mounts[i].Dest < p.Mounts[j].Dest
})
return ctr, nil
return p, nil
}
type gatewayContainer struct {
@ -337,15 +399,11 @@ func addDefaultEnvvar(env []string, k, v string) []string {
func mountWithSession(m cache.Mountable, g session.Group) executor.Mount {
_, readonly := m.(cache.ImmutableRef)
return executor.Mount{
Src: mountableWithSession(m, g),
Src: &mountable{m: m, g: g},
Readonly: readonly,
}
}
func mountableWithSession(m cache.Mountable, g session.Group) executor.Mountable {
return &mountable{m: m, g: g}
}
type mountable struct {
m cache.Mountable
g session.Group

View File

@ -5,7 +5,6 @@ import (
"sync"
cacheutil "github.com/moby/buildkit/cache/util"
clienttypes "github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/gateway"
@ -24,14 +23,14 @@ import (
fstypes "github.com/tonistiigi/fsutil/types"
)
func llbBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, workerInfos []clienttypes.WorkerInfo, sid string, sm *session.Manager) (*bridgeClient, error) {
func llbBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, w worker.Infos, sid string, sm *session.Manager) (*bridgeClient, error) {
return &bridgeClient{
opts: opts,
inputs: inputs,
FrontendLLBBridge: llbBridge,
sid: sid,
sm: sm,
workerInfos: workerInfos,
workers: w,
final: map[*ref]struct{}{},
workerRefByID: make(map[string]*worker.WorkerRef),
}, nil
@ -46,8 +45,8 @@ type bridgeClient struct {
sid string
sm *session.Manager
refs []*ref
workers worker.Infos
workerRefByID map[string]*worker.WorkerRef
workerInfos []clienttypes.WorkerInfo
}
func (c *bridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*client.Result, error) {
@ -87,8 +86,8 @@ func (c *bridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*cli
return cRes, nil
}
func (c *bridgeClient) BuildOpts() client.BuildOpts {
workers := make([]client.WorkerInfo, 0, len(c.workerInfos))
for _, w := range c.workerInfos {
workers := make([]client.WorkerInfo, 0, len(c.workers.WorkerInfos()))
for _, w := range c.workers.WorkerInfos() {
workers = append(workers, client.WorkerInfo{
ID: w.ID,
Labels: w.Labels,
@ -247,19 +246,26 @@ func (c *bridgeClient) NewContainer(ctx context.Context, req client.NewContainer
}
}
ctrReq.Mounts = append(ctrReq.Mounts, gateway.Mount{
Dest: m.Dest,
Selector: m.Selector,
Readonly: m.Readonly,
MountType: m.MountType,
WorkerRef: workerRef,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
Mount: &opspb.Mount{
Dest: m.Dest,
Selector: m.Selector,
Readonly: m.Readonly,
MountType: m.MountType,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
},
})
}
w, err := c.workers.GetDefault()
if err != nil {
return nil, err
}
group := session.NewGroup(c.sid)
ctr, err := gateway.NewContainer(ctx, c, c.sm, group, ctrReq)
ctr, err := gateway.NewContainer(ctx, w, c.sm, group, ctrReq)
if err != nil {
return nil, err
}

View File

@ -7,9 +7,10 @@ import (
"github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/worker"
)
func NewGatewayForwarder(w frontend.WorkerInfos, f client.BuildFunc) frontend.Frontend {
func NewGatewayForwarder(w worker.Infos, f client.BuildFunc) frontend.Frontend {
return &GatewayForwarder{
workers: w,
f: f,
@ -17,12 +18,12 @@ func NewGatewayForwarder(w frontend.WorkerInfos, f client.BuildFunc) frontend.Fr
}
type GatewayForwarder struct {
workers frontend.WorkerInfos
workers worker.Infos
f client.BuildFunc
}
func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (retRes *frontend.Result, retErr error) {
c, err := llbBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers.WorkerInfos(), sid, sm)
c, err := llbBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers, sid, sm)
if err != nil {
return nil, err
}

View File

@ -57,14 +57,14 @@ const (
keyDevel = "gateway-devel"
)
func NewGatewayFrontend(w frontend.WorkerInfos) frontend.Frontend {
func NewGatewayFrontend(w worker.Infos) frontend.Frontend {
return &gatewayFrontend{
workers: w,
}
}
type gatewayFrontend struct {
workers frontend.WorkerInfos
workers worker.Infos
}
func filterPrefix(opts map[string]string, pfx string) map[string]string {
@ -248,7 +248,12 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
}
defer lbf.Discard()
err = llbBridge.Run(ctx, "", mountWithSession(rootFS, session.NewGroup(sid)), nil, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil)
w, err := gf.workers.GetDefault()
if err != nil {
return nil, err
}
err = w.Executor().Run(ctx, "", mountWithSession(rootFS, session.NewGroup(sid)), nil, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil)
if err != nil {
if errors.Is(err, context.Canceled) && lbf.isErrServerClosed {
@ -330,11 +335,11 @@ func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) {
return lbf.result, nil
}
func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder {
func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder {
return newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
}
func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder {
func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder {
lbf := &llbBridgeForwarder{
callCtx: ctx,
llbBridge: llbBridge,
@ -351,7 +356,7 @@ func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridg
return lbf
}
func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) {
func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) {
ctx, cancel := context.WithCancel(ctx)
lbf := newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
server := grpc.NewServer(grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor))
@ -443,7 +448,7 @@ type llbBridgeForwarder struct {
doneCh chan struct{} // closed when result or err become valid through a call to a Return
result *frontend.Result
err error
workers frontend.WorkerInfos
workers worker.Infos
inputs map[string]*opspb.Definition
isErrServerClosed bool
sid string
@ -861,21 +866,29 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta
}
ctrReq.Mounts = append(ctrReq.Mounts, Mount{
Dest: m.Dest,
Selector: m.Selector,
Readonly: m.Readonly,
MountType: m.MountType,
WorkerRef: workerRef,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
Mount: &opspb.Mount{
Dest: m.Dest,
Selector: m.Selector,
Readonly: m.Readonly,
MountType: m.MountType,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
},
})
}
// Not using `ctx` here because it will get cancelled as soon as NewContainer returns
// and we want the context to live for the duration of the container.
group := session.NewGroup(lbf.sid)
ctr, err := NewContainer(context.Background(), lbf.llbBridge, lbf.sm, group, ctrReq)
w, err := lbf.workers.GetDefault()
if err != nil {
return nil, stack.Enable(err)
}
ctr, err := NewContainer(context.Background(), w, lbf.sm, group, ctrReq)
if err != nil {
return nil, stack.Enable(err)
}

View File

@ -11,7 +11,6 @@ import (
"github.com/mitchellh/hashstructure"
"github.com/moby/buildkit/cache/remotecache"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/frontend"
gw "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/session"
@ -19,7 +18,6 @@ import (
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/tracing"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
@ -263,28 +261,6 @@ func (rp *resultProxy) Result(ctx context.Context) (res solver.CachedResult, err
return nil, err
}
func (b *llbBridge) Run(ctx context.Context, id string, root executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
w, err := b.resolveWorker()
if err != nil {
return err
}
span, ctx := tracing.StartSpan(ctx, strings.Join(process.Meta.Args, " "))
err = w.Executor().Run(ctx, id, root, mounts, process, started)
tracing.FinishWithError(span, err)
return err
}
func (b *llbBridge) Exec(ctx context.Context, id string, process executor.ProcessInfo) (err error) {
w, err := b.resolveWorker()
if err != nil {
return err
}
span, ctx := tracing.StartSpan(ctx, strings.Join(process.Meta.Args, " "))
err = w.Executor().Exec(ctx, id, process)
tracing.FinishWithError(span, err)
return err
}
func (b *llbBridge) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (dgst digest.Digest, config []byte, err error) {
w, err := b.resolveWorker()
if err != nil {

View File

@ -15,8 +15,8 @@ import (
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/frontend/gateway"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
"github.com/moby/buildkit/solver/llbsolver/errdefs"
@ -214,148 +214,47 @@ func addDefaultEnvvar(env []string, k, v string) []string {
return append(env, k+"="+v)
}
func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) (refs []solver.Result, err error) {
var mounts []executor.Mount
var root cache.Mountable
var readonlyRootFS bool
var outputs []cache.Ref
defer func() {
for _, o := range outputs {
if o != nil {
go o.Release(context.TODO())
}
}
if err != nil {
err = errdefs.WithExecError(errors.Wrapf(err, "executor failed running %v", e.op.Meta.Args), inputs, refs)
}
}()
// loop over all mounts, fill in mounts, root and outputs
for _, m := range e.op.Mounts {
var mountable cache.Mountable
var ref cache.ImmutableRef
if m.Dest == pb.RootMount && m.MountType != pb.MountType_BIND {
return nil, errors.Errorf("invalid mount type %s for %s", m.MountType.String(), m.Dest)
}
// if mount is based on input validate and load it
if m.Input != pb.Empty {
if int(m.Input) > len(inputs) {
return nil, errors.Errorf("missing input %d", m.Input)
}
inp := inputs[int(m.Input)]
workerRef, ok := inp.Sys().(*worker.WorkerRef)
if !ok {
return nil, errors.Errorf("invalid reference for exec %T", inp.Sys())
}
ref = workerRef.ImmutableRef
mountable = ref
}
makeMutable := func(ref cache.ImmutableRef) (cache.MutableRef, error) {
desc := fmt.Sprintf("mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " "))
return e.cm.New(ctx, ref, g, cache.WithDescription(desc))
}
switch m.MountType {
case pb.MountType_BIND:
// if mount creates an output
if m.Output != pb.SkipOutput {
// if it is readonly and not root then output is the input
if m.Readonly && ref != nil && m.Dest != pb.RootMount {
outputs = append(outputs, ref.Clone())
} else {
// otherwise output and mount is the mutable child
active, err := makeMutable(ref)
if err != nil {
return nil, err
}
outputs = append(outputs, active)
mountable = active
}
} else if (!m.Readonly || ref == nil) && m.Dest != pb.RootMount {
// this case is empty readonly scratch without output that is not really useful for anything but don't error
active, err := makeMutable(ref)
if err != nil {
return nil, err
}
defer active.Release(context.TODO())
mountable = active
}
case pb.MountType_CACHE:
mRef, err := e.mm.MountableCache(ctx, m, ref, g)
if err != nil {
return nil, err
}
mountable = mRef
defer func() {
go mRef.Release(context.TODO())
}()
if m.Output != pb.SkipOutput && ref != nil {
outputs = append(outputs, ref.Clone())
}
case pb.MountType_TMPFS:
mountable = e.mm.MountableTmpFS()
case pb.MountType_SECRET:
var err error
mountable, err = e.mm.MountableSecret(ctx, m, g)
if err != nil {
return nil, err
}
if mountable == nil {
continue
}
case pb.MountType_SSH:
var err error
mountable, err = e.mm.MountableSSH(ctx, m, g)
if err != nil {
return nil, err
}
if mountable == nil {
continue
}
default:
return nil, errors.Errorf("mount type %s not implemented", m.MountType)
}
// validate that there is a mount
if mountable == nil {
return nil, errors.Errorf("mount %s has no input", m.Dest)
}
// if dest is root we need mutable ref even if there is no output
if m.Dest == pb.RootMount {
root = mountable
readonlyRootFS = m.Readonly
if m.Output == pb.SkipOutput && readonlyRootFS {
active, err := makeMutable(ref)
if err != nil {
return nil, err
}
defer func() {
go active.Release(context.TODO())
}()
root = active
}
} else {
mws := mountWithSession(mountable, g)
mws.Dest = m.Dest
mws.Readonly = m.Readonly
mws.Selector = m.Selector
mounts = append(mounts, mws)
func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) (results []solver.Result, err error) {
refs := make([]*worker.WorkerRef, len(inputs))
for i, inp := range inputs {
var ok bool
refs[i], ok = inp.Sys().(*worker.WorkerRef)
if !ok {
return nil, errors.Errorf("invalid reference for exec %T", inp.Sys())
}
}
// sort mounts so parents are mounted first
sort.Slice(mounts, func(i, j int) bool {
return mounts[i].Dest < mounts[j].Dest
p, err := gateway.PrepareMounts(ctx, e.mm, e.cm, g, e.op.Mounts, refs, func(m *pb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) {
desc := fmt.Sprintf("mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " "))
return e.cm.New(ctx, ref, g, cache.WithDescription(desc))
})
defer func() {
for i := len(p.Actives) - 1; i >= 0; i-- { // call in LIFO order
p.Actives[i].Release(context.TODO())
}
for _, o := range p.OutputRefs {
if o.Ref != nil {
o.Ref.Release(context.TODO())
}
}
if err != nil {
execInputs := make([]solver.Result, len(e.op.Mounts))
for i, m := range e.op.Mounts {
if m.Input == -1 {
continue
}
execInputs[i] = inputs[m.Input]
}
execOutputs := make([]solver.Result, len(e.op.Mounts))
for i, res := range results {
execOutputs[p.OutputRefs[i].MountIndex] = res
}
err = errdefs.WithExecError(err, execInputs, execOutputs)
}
}()
if err != nil {
return nil, err
}
extraHosts, err := parseExtraHosts(e.op.Meta.ExtraHosts)
if err != nil {
@ -366,7 +265,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
if err == nil && emu != nil {
e.op.Meta.Args = append([]string{qemuMountName}, e.op.Meta.Args...)
mounts = append(mounts, executor.Mount{
p.Mounts = append(p.Mounts, executor.Mount{
Readonly: true,
Src: emu,
Dest: qemuMountName,
@ -382,7 +281,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
Cwd: e.op.Meta.Cwd,
User: e.op.Meta.User,
Hostname: e.op.Meta.Hostname,
ReadonlyRootFS: readonlyRootFS,
ReadonlyRootFS: p.ReadonlyRootFS,
ExtraHosts: extraHosts,
NetMode: e.op.Network,
SecurityMode: e.op.Security,
@ -401,27 +300,27 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
defer stdout.Close()
defer stderr.Close()
execErr := e.exec.Run(ctx, "", mountWithSession(root, g), mounts, executor.ProcessInfo{
execErr := e.exec.Run(ctx, "", p.Root, p.Mounts, executor.ProcessInfo{
Meta: meta,
Stdin: nil,
Stdout: stdout,
Stderr: stderr,
}, nil)
for i, out := range outputs {
if mutable, ok := out.(cache.MutableRef); ok {
for i, out := range p.OutputRefs {
if mutable, ok := out.Ref.(cache.MutableRef); ok {
ref, err := mutable.Commit(ctx)
if err != nil {
return nil, errors.Wrapf(err, "error committing %s", mutable.ID())
}
refs = append(refs, worker.NewWorkerRefResult(ref, e.w))
results = append(results, worker.NewWorkerRefResult(ref, e.w))
} else {
refs = append(refs, worker.NewWorkerRefResult(out.(cache.ImmutableRef), e.w))
results = append(results, worker.NewWorkerRefResult(out.Ref.(cache.ImmutableRef), e.w))
}
outputs[i] = nil
// Prevent the result from being released.
p.OutputRefs[i].Ref = nil
}
return refs, execErr
return results, errors.Wrapf(execErr, "executor failed running %v", e.op.Meta.Args)
}
func proxyEnvList(p *pb.ProxyEnv) []string {
@ -455,20 +354,3 @@ func parseExtraHosts(ips []*pb.HostIP) ([]executor.HostIP, error) {
}
return out, nil
}
func mountWithSession(m cache.Mountable, g session.Group) executor.Mount {
_, readonly := m.(cache.ImmutableRef)
return executor.Mount{
Src: &mountable{m: m, g: g},
Readonly: readonly,
}
}
type mountable struct {
m cache.Mountable
g session.Group
}
func (m *mountable) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) {
return m.m.Mount(ctx, readonly, m.g)
}

View File

@ -39,6 +39,11 @@ type Worker interface {
MetadataStore() *metadata.Store
}
type Infos interface {
GetDefault() (Worker, error)
WorkerInfos() []client.WorkerInfo
}
// Pre-defined label keys
const (
labelPrefix = "org.mobyproject.buildkit.worker."