buildkit/frontend/gateway/gateway.go

1343 lines
34 KiB
Go

package gateway
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/docker/distribution/reference"
"github.com/gogo/googleapis/google/rpc"
gogotypes "github.com/gogo/protobuf/types"
"github.com/golang/protobuf/ptypes/any"
apitypes "github.com/moby/buildkit/api/types"
"github.com/moby/buildkit/cache"
cacheutil "github.com/moby/buildkit/cache/util"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
"github.com/moby/buildkit/frontend"
gwclient "github.com/moby/buildkit/frontend/gateway/client"
pb "github.com/moby/buildkit/frontend/gateway/pb"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/errdefs"
llberrdefs "github.com/moby/buildkit/solver/llbsolver/errdefs"
opspb "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/apicaps"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/stack"
"github.com/moby/buildkit/util/tracing"
"github.com/moby/buildkit/worker"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/http2"
"golang.org/x/sync/errgroup"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)
const (
	// keySource names the frontend to run: an image reference, or a frontend
	// id when combined with keyDevel.
	keySource = "source"
	// keyDevel, when present, builds the frontend itself via another
	// frontend before running it (development mode).
	keyDevel = "gateway-devel"
)
// NewGatewayFrontend returns a frontend that executes an external
// (containerized) frontend image against the given set of workers.
func NewGatewayFrontend(w worker.Infos) frontend.Frontend {
	return &gatewayFrontend{
		workers: w,
	}
}
// gatewayFrontend implements frontend.Frontend by running a frontend image in
// a container whose stdio carries an in-process LLB-bridge gRPC connection.
type gatewayFrontend struct {
	workers worker.Infos
}
// filterPrefix returns the subset of opts whose keys start with pfx, with the
// prefix stripped from each returned key.
func filterPrefix(opts map[string]string, pfx string) map[string]string {
	filtered := map[string]string{}
	for key, val := range opts {
		if !strings.HasPrefix(key, pfx) {
			continue
		}
		filtered[key[len(pfx):]] = val
	}
	return filtered
}
// Solve runs a gateway frontend: it obtains the frontend image's rootfs
// (building it in-place in devel mode, or resolving and pulling the image
// named by the "source" option otherwise), then executes the image's
// entrypoint with an in-process LLB-bridge gRPC server wired to the process's
// stdin/stdout so the frontend can issue solve requests back to buildkit.
func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*frontend.Result, error) {
	source, ok := opts[keySource]
	if !ok {
		return nil, errors.Errorf("no source specified for gateway")
	}
	_, isDevel := opts[keyDevel]
	var img specs.Image
	var mfstDigest digest.Digest
	var rootFS cache.MutableRef
	var readonly bool // TODO: try to switch to read-only by default.
	if isDevel {
		// Devel mode: build the frontend itself with another frontend,
		// forwarding any "gateway-"-prefixed options to it.
		devRes, err := llbBridge.Solve(ctx,
			frontend.SolveRequest{
				Frontend:       source,
				FrontendOpt:    filterPrefix(opts, "gateway-"),
				FrontendInputs: inputs,
			}, "gateway:"+sid)
		if err != nil {
			return nil, err
		}
		defer func() {
			devRes.EachRef(func(ref solver.ResultProxy) error {
				return ref.Release(context.TODO())
			})
		}()
		if devRes.Ref == nil {
			return nil, errors.Errorf("development gateway didn't return default result")
		}
		res, err := devRes.Ref.Result(ctx)
		if err != nil {
			return nil, err
		}
		workerRef, ok := res.Sys().(*worker.WorkerRef)
		if !ok {
			return nil, errors.Errorf("invalid ref: %T", res.Sys())
		}
		// Use the built result as a mutable rootfs for the frontend process.
		rootFS, err = workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef, session.NewGroup(sid))
		if err != nil {
			return nil, err
		}
		defer rootFS.Release(context.TODO())
		config, ok := devRes.Metadata[exptypes.ExporterImageConfigKey]
		if ok {
			if err := json.Unmarshal(config, &img); err != nil {
				return nil, err
			}
		}
	} else {
		// Normal mode: resolve, pin and pull the frontend image.
		sourceRef, err := reference.ParseNormalizedNamed(source)
		if err != nil {
			return nil, err
		}
		dgst, config, err := llbBridge.ResolveImageConfig(ctx, reference.TagNameOnly(sourceRef).String(), llb.ResolveImageConfigOpt{})
		if err != nil {
			return nil, err
		}
		mfstDigest = dgst
		if err := json.Unmarshal(config, &img); err != nil {
			return nil, err
		}
		if dgst != "" {
			// Pin the image by digest so the pull below matches the config
			// we just resolved.
			sourceRef, err = reference.WithDigest(sourceRef, dgst)
			if err != nil {
				return nil, err
			}
		}
		src := llb.Image(sourceRef.String(), &markTypeFrontend{})
		def, err := src.Marshal(ctx)
		if err != nil {
			return nil, err
		}
		res, err := llbBridge.Solve(ctx, frontend.SolveRequest{
			Definition: def.ToPB(),
		}, sid)
		if err != nil {
			return nil, err
		}
		defer func() {
			res.EachRef(func(ref solver.ResultProxy) error {
				return ref.Release(context.TODO())
			})
		}()
		if res.Ref == nil {
			return nil, errors.Errorf("gateway source didn't return default result")
		}
		r, err := res.Ref.Result(ctx)
		if err != nil {
			return nil, err
		}
		workerRef, ok := r.Sys().(*worker.WorkerRef)
		if !ok {
			return nil, errors.Errorf("invalid ref: %T", r.Sys())
		}
		rootFS, err = workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef, session.NewGroup(sid))
		if err != nil {
			return nil, err
		}
		defer rootFS.Release(context.TODO())
	}
	// Build the exec environment for the frontend process from the image
	// config plus gateway-specific BUILDKIT_* variables.
	args := []string{"/run"}
	env := []string{}
	cwd := "/"
	if img.Config.Env != nil {
		env = img.Config.Env
	}
	if img.Config.Entrypoint != nil {
		args = img.Config.Entrypoint
	}
	if img.Config.WorkingDir != "" {
		cwd = img.Config.WorkingDir
	}
	i := 0
	for k, v := range opts {
		// Options are passed as numbered env vars: BUILDKIT_FRONTEND_OPT_<n>=<key>=<value>.
		env = append(env, fmt.Sprintf("BUILDKIT_FRONTEND_OPT_%d", i)+"="+k+"="+v)
		i++
	}
	env = append(env, "BUILDKIT_SESSION_ID="+sid)
	dt, err := json.Marshal(gf.workers.WorkerInfos())
	if err != nil {
		return nil, errors.Wrap(err, "failed to marshal workers array")
	}
	env = append(env, "BUILDKIT_WORKERS="+string(dt))
	env = append(env, "BUILDKIT_EXPORTEDPRODUCT="+apicaps.ExportedProduct)
	meta := executor.Meta{
		Env:            env,
		Args:           args,
		Cwd:            cwd,
		ReadonlyRootFS: readonly,
	}
	// A frontend image may opt out of networking via a label.
	if v, ok := img.Config.Labels["moby.buildkit.frontend.network.none"]; ok {
		if ok, _ := strconv.ParseBool(v); ok {
			meta.NetMode = opspb.NetMode_NONE
		}
	}
	// Verify the frontend image advertises every capability the client needs.
	curCaps := getCaps(img.Config.Labels["moby.buildkit.frontend.caps"])
	addCapsForKnownFrontends(curCaps, mfstDigest)
	reqCaps := getCaps(opts["frontend.caps"])
	if len(inputs) > 0 {
		reqCaps["moby.buildkit.frontend.inputs"] = struct{}{}
	}
	for c := range reqCaps {
		if _, ok := curCaps[c]; !ok {
			return nil, stack.Enable(grpcerrors.WrapCode(errdefs.NewUnsupportedFrontendCapError(c), codes.Unimplemented))
		}
	}
	lbf, ctx, err := serveLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs, sid, sm)
	// NOTE(review): lbf is dereferenced before the error check below; this is
	// only safe because serveLLBBridgeForwarder currently always returns a nil
	// error — confirm before changing that function.
	defer lbf.conn.Close() //nolint
	if err != nil {
		return nil, err
	}
	defer lbf.Discard()
	w, err := gf.workers.GetDefault()
	if err != nil {
		return nil, err
	}
	// Run the frontend container; its stdin/stdout carry the gRPC bridge.
	err = w.Executor().Run(ctx, "", mountWithSession(rootFS, session.NewGroup(sid)), nil, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil)
	if err != nil {
		if errors.Is(err, context.Canceled) && lbf.isErrServerClosed {
			err = errors.Errorf("frontend grpc server closed unexpectedly")
		}
		// An existing error (set via Return rpc) takes
		// precedence over this error, which in turn takes
		// precedence over a success reported via Return.
		lbf.mu.Lock()
		if lbf.err == nil {
			lbf.result = nil
			lbf.err = err
		}
		lbf.mu.Unlock()
	}
	return lbf.Result()
}
// Discard releases all refs held by the forwarder. If a successful result was
// recorded, refs that are reachable from that result are kept alive so the
// caller can still export them; everything else is released.
func (lbf *llbBridgeForwarder) Discard() {
	lbf.mu.Lock()
	defer lbf.mu.Unlock()
	for id, workerRef := range lbf.workerRefByID {
		workerRef.ImmutableRef.Release(context.TODO())
		delete(lbf.workerRefByID, id)
	}
	for id, r := range lbf.refs {
		if lbf.err == nil && lbf.result != nil {
			// Keep refs that are part of the returned result.
			keep := false
			lbf.result.EachRef(func(r2 solver.ResultProxy) error {
				if r == r2 {
					keep = true
				}
				return nil
			})
			if keep {
				continue
			}
		}
		r.Release(context.TODO())
		delete(lbf.refs, id)
	}
}
// Done returns a channel that is closed once Return has recorded a result or
// an error (see setResult).
func (lbf *llbBridgeForwarder) Done() <-chan struct{} {
	return lbf.doneCh
}
// setResult records the frontend's final result or error (exactly one of the
// two must be non-nil) and signals completion by closing doneCh. A second
// call is rejected.
func (lbf *llbBridgeForwarder) setResult(r *frontend.Result, err error) (*pb.ReturnResponse, error) {
	lbf.mu.Lock()
	defer lbf.mu.Unlock()
	hasResult := r != nil
	hasError := err != nil
	if hasResult == hasError {
		return nil, errors.New("gateway return must be either result or err")
	}
	if lbf.result != nil || lbf.err != nil {
		return nil, errors.New("gateway result is already set")
	}
	lbf.result, lbf.err = r, err
	close(lbf.doneCh)
	return &pb.ReturnResponse{}, nil
}
// Result returns the recorded result or error; it fails if the build never
// completed (neither was set).
func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) {
	lbf.mu.Lock()
	defer lbf.mu.Unlock()
	switch {
	case lbf.err != nil:
		return nil, lbf.err
	case lbf.result == nil:
		return nil, errors.New("no result for incomplete build")
	default:
		return lbf.result, nil
	}
}
// NewBridgeForwarder exposes the bridge-forwarder constructor to callers
// outside this package.
func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder {
	return newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
}
// newBridgeForwarder builds a forwarder with empty ref/container registries
// and a fresh stdio pipe for the gRPC connection.
func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder {
	lbf := &llbBridgeForwarder{
		callCtx:       ctx,
		llbBridge:     llbBridge,
		refs:          map[string]solver.ResultProxy{},
		workerRefByID: map[string]*worker.WorkerRef{},
		doneCh:        make(chan struct{}),
		pipe:          newPipe(),
		workers:       workers,
		inputs:        inputs,
		sid:           sid,
		sm:            sm,
		ctrs:          map[string]gwclient.Container{},
	}
	return lbf
}
// serveLLBBridgeForwarder starts an in-process gRPC server speaking the LLB
// bridge protocol over the forwarder's pipe. The returned context is
// cancelled once the server stops serving. The returned error is currently
// always nil.
func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) {
	ctx, cancel := context.WithCancel(ctx)
	lbf := newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
	server := grpc.NewServer(grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor))
	grpc_health_v1.RegisterHealthServer(server, health.NewServer())
	pb.RegisterLLBBridgeServer(server, lbf)
	go func() {
		serve(ctx, server, lbf.conn)
		select {
		case <-ctx.Done():
		default:
			// serve returned before ctx was cancelled: the frontend side
			// closed the connection unexpectedly.
			lbf.isErrServerClosed = true
		}
		cancel()
	}()
	return lbf, ctx, nil
}
// pipe is the stdio plumbing between the frontend process and the in-process
// gRPC server: Stdin/Stdout are the process-facing ends, conn is the
// server-facing end (see newPipe for the wiring).
type pipe struct {
	Stdin  io.ReadCloser
	Stdout io.WriteCloser
	conn   net.Conn
}
// newPipe creates two OS pipes and crosses them so that whatever the frontend
// process writes to its stdout is read by the server connection, and whatever
// the server writes arrives on the process's stdin.
func newPipe() *pipe {
	// NOTE(review): os.Pipe errors are discarded; a failure here would yield
	// nil files and a broken bridge — confirm this is acceptable.
	pr1, pw1, _ := os.Pipe()
	pr2, pw2, _ := os.Pipe()
	return &pipe{
		Stdin:  pr1,
		Stdout: pw2,
		conn: &conn{
			Reader: pr2,
			Writer: pw1,
			Closer: pw2,
		},
	}
}
// conn adapts a reader/writer/closer triple into a net.Conn so the pipe can
// back an in-process HTTP/2 gRPC server. Addresses are placeholders and
// deadlines are silently ignored.
type conn struct {
	io.Reader
	io.Writer
	io.Closer
}

// LocalAddr returns a placeholder address (the pipe has no real endpoint).
func (s *conn) LocalAddr() net.Addr {
	return dummyAddr{}
}

// RemoteAddr returns a placeholder address (the pipe has no real endpoint).
func (s *conn) RemoteAddr() net.Addr {
	return dummyAddr{}
}

// SetDeadline is a no-op; the pipe does not support deadlines.
func (s *conn) SetDeadline(t time.Time) error {
	return nil
}

// SetReadDeadline is a no-op; the pipe does not support deadlines.
func (s *conn) SetReadDeadline(t time.Time) error {
	return nil
}

// SetWriteDeadline is a no-op; the pipe does not support deadlines.
func (s *conn) SetWriteDeadline(t time.Time) error {
	return nil
}
// dummyAddr is a stand-in net.Addr for the in-process pipe connection.
type dummyAddr struct {
}

// Network identifies the fake transport.
func (d dummyAddr) Network() string {
	return "pipe"
}

// String returns a fixed placeholder host name.
func (d dummyAddr) String() string {
	return "localhost"
}
// LLBBridgeForwarder is the gRPC LLB-bridge service plus the lifecycle hooks
// the gateway needs around it.
type LLBBridgeForwarder interface {
	pb.LLBBridgeServer
	// Done is closed once a result or error has been recorded via Return.
	Done() <-chan struct{}
	// Result returns the recorded result or error.
	Result() (*frontend.Result, error)
	// Discard releases refs that are not part of the final result.
	Discard()
}
// llbBridgeForwarder is the server side of the LLB-bridge gRPC API exposed to
// the frontend container over the stdio pipe.
type llbBridgeForwarder struct {
	mu        sync.Mutex // guards refs, workerRefByID, result and err
	callCtx   context.Context
	llbBridge frontend.FrontendLLBBridge
	refs      map[string]solver.ResultProxy
	// workerRefByID holds results registered for failed states (see
	// registerResultIDs) so debug containers can mount them.
	workerRefByID map[string]*worker.WorkerRef
	// lastRef   solver.CachedResult
	// lastRefs map[string]solver.CachedResult
	// err error
	doneCh            chan struct{} // closed when result or err become valid through a call to a Return
	result            *frontend.Result
	err               error
	workers           worker.Infos
	inputs            map[string]*opspb.Definition
	isErrServerClosed bool // set when the grpc server stopped before its ctx was done
	sid               string
	sm                *session.Manager
	*pipe
	ctrs   map[string]gwclient.Container
	ctrsMu sync.Mutex // guards ctrs
}
// ResolveImageConfig forwards an image-config resolution request from the
// frontend container to the underlying LLB bridge.
func (lbf *llbBridgeForwarder) ResolveImageConfig(ctx context.Context, req *pb.ResolveImageConfigRequest) (*pb.ResolveImageConfigResponse, error) {
	ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
	var platform *specs.Platform
	if req.Platform != nil {
		platform = &specs.Platform{
			OS:           req.Platform.OS,
			Architecture: req.Platform.Architecture,
			Variant:      req.Platform.Variant,
			OSVersion:    req.Platform.OSVersion,
			OSFeatures:   req.Platform.OSFeatures,
		}
	}
	opt := llb.ResolveImageConfigOpt{
		Platform:    platform,
		ResolveMode: req.ResolveMode,
		LogName:     req.LogName,
	}
	dgst, cfg, err := lbf.llbBridge.ResolveImageConfig(ctx, req.Ref, opt)
	if err != nil {
		return nil, err
	}
	return &pb.ResolveImageConfigResponse{Digest: dgst, Config: cfg}, nil
}
// translateLegacySolveRequest converts the deprecated ImportCacheRefs field
// (pre-v0.4.0 clients) into equivalent registry-type CacheImports entries and
// clears the deprecated field.
func translateLegacySolveRequest(req *pb.SolveRequest) error {
	for _, ref := range req.ImportCacheRefsDeprecated {
		// FIXME(AkihiroSuda): skip append if already exists
		entry := &pb.CacheOptionsEntry{
			Type:  "registry",
			Attrs: map[string]string{"ref": ref},
		}
		req.CacheImports = append(req.CacheImports, entry)
	}
	req.ImportCacheRefsDeprecated = nil
	return nil
}
// wrapSolveError decorates a solver error with typed metadata (exec /
// file-action / slow-cache subjects plus registered result IDs) so the
// frontend client can inspect the failed state, e.g. by mounting its refs in
// a debug container.
func (lbf *llbBridgeForwarder) wrapSolveError(solveErr error) error {
	var (
		ee       *llberrdefs.ExecError
		fae      *llberrdefs.FileActionError
		sce      *solver.SlowCacheError
		inputIDs []string
		mountIDs []string
		subject  errdefs.IsSolve_Subject
	)
	if errors.As(solveErr, &ee) {
		var err error
		// Register the failed exec's inputs/mounts so the client can later
		// reference them by ID.
		inputIDs, err = lbf.registerResultIDs(ee.Inputs...)
		if err != nil {
			return err
		}
		mountIDs, err = lbf.registerResultIDs(ee.Mounts...)
		if err != nil {
			return err
		}
	}
	if errors.As(solveErr, &fae) {
		subject = fae.ToSubject()
	}
	if errors.As(solveErr, &sce) {
		var err error
		// NOTE(review): this overwrites any inputIDs collected from an
		// ExecError above — presumably intentional; confirm.
		inputIDs, err = lbf.registerResultIDs(sce.Result)
		if err != nil {
			return err
		}
		subject = sce.ToSubject()
	}
	return errdefs.WithSolveError(solveErr, subject, inputIDs, mountIDs)
}
// registerResultIDs stores each non-nil result's worker ref under its ID so
// the frontend client can mount it later; nil results produce an empty ID at
// the corresponding position.
func (lbf *llbBridgeForwarder) registerResultIDs(results ...solver.Result) (ids []string, err error) {
	lbf.mu.Lock()
	defer lbf.mu.Unlock()
	ids = make([]string, len(results))
	for idx, result := range results {
		if result == nil {
			continue
		}
		ref, ok := result.Sys().(*worker.WorkerRef)
		if !ok {
			return ids, errors.Errorf("unexpected type for result, got %T", result.Sys())
		}
		ids[idx] = ref.ID()
		lbf.workerRefByID[ref.ID()] = ref
	}
	return ids, nil
}
// Solve forwards a solve request from the frontend container to the LLB
// bridge and registers the resulting ref(s) under fresh IDs so later gateway
// calls (ReadFile, Return, NewContainer, ...) can reference them.
func (lbf *llbBridgeForwarder) Solve(ctx context.Context, req *pb.SolveRequest) (*pb.SolveResponse, error) {
	if err := translateLegacySolveRequest(req); err != nil {
		return nil, err
	}
	var cacheImports []frontend.CacheOptionsEntry
	for _, e := range req.CacheImports {
		cacheImports = append(cacheImports, frontend.CacheOptionsEntry{
			Type:  e.Type,
			Attrs: e.Attrs,
		})
	}
	ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
	res, err := lbf.llbBridge.Solve(ctx, frontend.SolveRequest{
		Evaluate:       req.Evaluate,
		Definition:     req.Definition,
		Frontend:       req.Frontend,
		FrontendOpt:    req.FrontendOpt,
		FrontendInputs: req.FrontendInputs,
		CacheImports:   cacheImports,
	}, lbf.sid)
	if err != nil {
		return nil, lbf.wrapSolveError(err)
	}
	if len(res.Refs) > 0 && !req.AllowResultReturn {
		// this should never happen because old client shouldn't make a map request
		return nil, errors.Errorf("solve did not return default result")
	}
	pbRes := &pb.Result{
		Metadata: res.Metadata,
	}
	var defaultID string
	lbf.mu.Lock()
	if res.Refs != nil {
		// Map result: register each named ref under a fresh ID.
		ids := make(map[string]string, len(res.Refs))
		defs := make(map[string]*opspb.Definition, len(res.Refs))
		for k, ref := range res.Refs {
			id := identity.NewID()
			var def *opspb.Definition
			if ref == nil {
				// A nil ref is allowed (the branch above anticipates it);
				// previously ref.Definition() was still called here
				// unconditionally, which panics on a nil interface. Map a
				// nil ref to an empty ID and nil definition instead.
				id = ""
			} else {
				def = ref.Definition()
				lbf.refs[id] = ref
			}
			ids[k] = id
			defs[k] = def
		}
		if req.AllowResultArrayRef {
			refMap := make(map[string]*pb.Ref, len(res.Refs))
			for k, id := range ids {
				refMap[k] = &pb.Ref{Id: id, Def: defs[k]}
			}
			pbRes.Result = &pb.Result_Refs{Refs: &pb.RefMap{Refs: refMap}}
		} else {
			pbRes.Result = &pb.Result_RefsDeprecated{RefsDeprecated: &pb.RefMapDeprecated{Refs: ids}}
		}
	} else {
		// Single (default) result.
		ref := res.Ref
		id := identity.NewID()
		var def *opspb.Definition
		if ref == nil {
			id = ""
		} else {
			def = ref.Definition()
			lbf.refs[id] = ref
		}
		defaultID = id
		if req.AllowResultArrayRef {
			pbRes.Result = &pb.Result_Ref{Ref: &pb.Ref{Id: id, Def: def}}
		} else {
			pbRes.Result = &pb.Result_RefDeprecated{RefDeprecated: id}
		}
	}
	lbf.mu.Unlock()
	// compatibility mode for older clients
	if req.Final {
		exp := map[string][]byte{}
		if err := json.Unmarshal(req.ExporterAttr, &exp); err != nil {
			return nil, err
		}
		for k, v := range res.Metadata {
			exp[k] = v
		}
		lbf.mu.Lock()
		lbf.result = &frontend.Result{
			Ref:      lbf.refs[defaultID],
			Metadata: exp,
		}
		lbf.mu.Unlock()
	}
	resp := &pb.SolveResponse{
		Result: pbRes,
	}
	if !req.AllowResultReturn {
		resp.Ref = defaultID
	}
	return resp, nil
}
// getImmutableRef resolves a registered ref ID to its worker's immutable
// cache ref. A registered-but-nil ref maps to os.ErrNotExist for path (used
// by the file APIs to report a missing file).
func (lbf *llbBridgeForwarder) getImmutableRef(ctx context.Context, id, path string) (cache.ImmutableRef, error) {
	lbf.mu.Lock()
	ref, found := lbf.refs[id]
	lbf.mu.Unlock()
	switch {
	case !found:
		return nil, errors.Errorf("no such ref: %v", id)
	case ref == nil:
		return nil, errors.Wrapf(os.ErrNotExist, "%s not found", path)
	}
	res, err := ref.Result(ctx)
	if err != nil {
		return nil, lbf.wrapSolveError(err)
	}
	wr, ok := res.Sys().(*worker.WorkerRef)
	if !ok {
		return nil, errors.Errorf("invalid ref: %T", res.Sys())
	}
	return wr.ImmutableRef, nil
}
// ReadFile reads (optionally a byte range of) a file from the result ref
// req.Ref by mounting it read-only.
func (lbf *llbBridgeForwarder) ReadFile(ctx context.Context, req *pb.ReadFileRequest) (*pb.ReadFileResponse, error) {
	ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
	ref, err := lbf.getImmutableRef(ctx, req.Ref, req.FilePath)
	if err != nil {
		return nil, err
	}
	readReq := cacheutil.ReadRequest{Filename: req.FilePath}
	if rng := req.Range; rng != nil {
		readReq.Range = &cacheutil.FileRange{
			Offset: int(rng.Offset),
			Length: int(rng.Length),
		}
	}
	mnt, err := ref.Mount(ctx, true, session.NewGroup(lbf.sid))
	if err != nil {
		return nil, err
	}
	data, err := cacheutil.ReadFile(ctx, mnt, readReq)
	if err != nil {
		return nil, lbf.wrapSolveError(err)
	}
	return &pb.ReadFileResponse{Data: data}, nil
}
// ReadDir lists a directory (optionally filtered by an include pattern) in
// the result ref req.Ref by mounting it read-only.
func (lbf *llbBridgeForwarder) ReadDir(ctx context.Context, req *pb.ReadDirRequest) (*pb.ReadDirResponse, error) {
	ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
	ref, err := lbf.getImmutableRef(ctx, req.Ref, req.DirPath)
	if err != nil {
		return nil, err
	}
	dirReq := cacheutil.ReadDirRequest{
		Path:           req.DirPath,
		IncludePattern: req.IncludePattern,
	}
	mnt, err := ref.Mount(ctx, true, session.NewGroup(lbf.sid))
	if err != nil {
		return nil, err
	}
	entries, err := cacheutil.ReadDir(ctx, mnt, dirReq)
	if err != nil {
		return nil, lbf.wrapSolveError(err)
	}
	return &pb.ReadDirResponse{Entries: entries}, nil
}
// StatFile stats a path inside the result ref req.Ref by mounting it
// read-only.
func (lbf *llbBridgeForwarder) StatFile(ctx context.Context, req *pb.StatFileRequest) (*pb.StatFileResponse, error) {
	ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
	ref, err := lbf.getImmutableRef(ctx, req.Ref, req.Path)
	if err != nil {
		return nil, err
	}
	m, err := ref.Mount(ctx, true, session.NewGroup(lbf.sid))
	if err != nil {
		return nil, err
	}
	st, err := cacheutil.StatFile(ctx, m, req.Path)
	if err != nil {
		// NOTE(review): unlike ReadFile/ReadDir this does not pass the error
		// through lbf.wrapSolveError — confirm whether that is intentional.
		return nil, err
	}
	return &pb.StatFileResponse{Stat: st}, nil
}
// Ping reports the gateway's capability sets and the available workers so the
// frontend client can negotiate features.
func (lbf *llbBridgeForwarder) Ping(context.Context, *pb.PingRequest) (*pb.PongResponse, error) {
	infos := lbf.workers.WorkerInfos()
	records := make([]*apitypes.WorkerRecord, 0, len(infos))
	for _, info := range infos {
		records = append(records, &apitypes.WorkerRecord{
			ID:        info.ID,
			Labels:    info.Labels,
			Platforms: opspb.PlatformsFromSpec(info.Platforms),
		})
	}
	return &pb.PongResponse{
		FrontendAPICaps: pb.Caps.All(),
		LLBCaps:         opspb.Caps.All(),
		Workers:         records,
	}, nil
}
// Return records the frontend's final result (or error) for the build,
// translating ref IDs back into the result proxies registered by Solve.
func (lbf *llbBridgeForwarder) Return(ctx context.Context, in *pb.ReturnRequest) (*pb.ReturnResponse, error) {
	if in.Error != nil {
		st := &spb.Status{
			Code:    in.Error.Code,
			Message: in.Error.Message,
			Details: convertGogoAny(in.Error.Details),
		}
		return lbf.setResult(nil, grpcerrors.FromGRPC(status.ErrorProto(st)))
	}
	res := &frontend.Result{
		Metadata: in.Result.Metadata,
	}
	switch payload := in.Result.Result.(type) {
	case *pb.Result_RefDeprecated:
		ref, err := lbf.convertRef(payload.RefDeprecated)
		if err != nil {
			return nil, err
		}
		res.Ref = ref
	case *pb.Result_RefsDeprecated:
		refs := map[string]solver.ResultProxy{}
		for k, id := range payload.RefsDeprecated.Refs {
			ref, err := lbf.convertRef(id)
			if err != nil {
				return nil, err
			}
			refs[k] = ref
		}
		res.Refs = refs
	case *pb.Result_Ref:
		ref, err := lbf.convertRef(payload.Ref.Id)
		if err != nil {
			return nil, err
		}
		res.Ref = ref
	case *pb.Result_Refs:
		refs := map[string]solver.ResultProxy{}
		for k, r := range payload.Refs.Refs {
			ref, err := lbf.convertRef(r.Id)
			if err != nil {
				return nil, err
			}
			refs[k] = ref
		}
		res.Refs = refs
	}
	return lbf.setResult(res, nil)
}
// Inputs returns the named frontend inputs that were passed by the build
// caller.
func (lbf *llbBridgeForwarder) Inputs(ctx context.Context, in *pb.InputsRequest) (*pb.InputsResponse, error) {
	return &pb.InputsResponse{
		Definitions: lbf.inputs,
	}, nil
}
// NewContainer creates (but does not start a process in) an interactive
// container for the gateway client, e.g. to debug a failed build step. Mount
// result IDs are resolved first against results registered for failed states
// (registerResultIDs), then against refs returned by Solve.
func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewContainerRequest) (_ *pb.NewContainerResponse, err error) {
	logrus.Debugf("|<--- NewContainer %s", in.ContainerID)
	ctrReq := NewContainerRequest{
		ContainerID: in.ContainerID,
		NetMode:     in.Network,
		Platform:    in.Platform,
		Constraints: in.Constraints,
	}
	for _, m := range in.Mounts {
		var workerRef *worker.WorkerRef
		if m.ResultID != "" {
			// lbf.workerRefByID is written under lbf.mu elsewhere
			// (registerResultIDs, Discard); take the lock for this read too
			// instead of the previous unsynchronized access.
			lbf.mu.Lock()
			ref, ok := lbf.workerRefByID[m.ResultID]
			lbf.mu.Unlock()
			workerRef = ref
			if !ok {
				// Fall back to refs registered by Solve.
				refProxy, err := lbf.convertRef(m.ResultID)
				if err != nil {
					return nil, errors.Wrapf(err, "failed to find ref %s for %q mount", m.ResultID, m.Dest)
				}
				res, err := refProxy.Result(ctx)
				if err != nil {
					return nil, stack.Enable(err)
				}
				workerRef, ok = res.Sys().(*worker.WorkerRef)
				if !ok {
					return nil, errors.Errorf("invalid reference %T", res.Sys())
				}
			}
		}
		ctrReq.Mounts = append(ctrReq.Mounts, Mount{
			WorkerRef: workerRef,
			Mount: &opspb.Mount{
				Dest:      m.Dest,
				Selector:  m.Selector,
				Readonly:  m.Readonly,
				MountType: m.MountType,
				CacheOpt:  m.CacheOpt,
				SecretOpt: m.SecretOpt,
				SSHOpt:    m.SSHOpt,
			},
		})
	}
	// Not using `ctx` here because it will get cancelled as soon as NewContainer returns
	// and we want the context to live for the duration of the container.
	group := session.NewGroup(lbf.sid)
	w, err := lbf.workers.GetDefault()
	if err != nil {
		return nil, stack.Enable(err)
	}
	ctr, err := NewContainer(context.Background(), w, lbf.sm, group, ctrReq)
	if err != nil {
		return nil, stack.Enable(err)
	}
	defer func() {
		if err != nil {
			ctr.Release(ctx) // ensure release on error
		}
	}()
	lbf.ctrsMu.Lock()
	defer lbf.ctrsMu.Unlock()
	// ensure we are not clobbering a dup container id request
	if _, ok := lbf.ctrs[in.ContainerID]; ok {
		return nil, stack.Enable(status.Errorf(codes.AlreadyExists, "Container %s already exists", in.ContainerID))
	}
	lbf.ctrs[in.ContainerID] = ctr
	return &pb.NewContainerResponse{}, nil
}
// ReleaseContainer removes the container from the registry and releases its
// resources; it errors if the container id is unknown (or already released).
func (lbf *llbBridgeForwarder) ReleaseContainer(ctx context.Context, in *pb.ReleaseContainerRequest) (*pb.ReleaseContainerResponse, error) {
	logrus.Debugf("|<--- ReleaseContainer %s", in.ContainerID)
	lbf.ctrsMu.Lock()
	ctr, found := lbf.ctrs[in.ContainerID]
	delete(lbf.ctrs, in.ContainerID)
	lbf.ctrsMu.Unlock()
	if !found {
		return nil, errors.Errorf("container details for %s not found", in.ContainerID)
	}
	return &pb.ReleaseContainerResponse{}, stack.Enable(ctr.Release(ctx))
}
// processIO tracks the stdio pipes for one process started via ExecProcess.
// done is closed (via Done) once every tracked pipe end has been closed and
// removed; mu guards the four maps.
type processIO struct {
	id       string
	mu       sync.Mutex
	resize   func(context.Context, gwclient.WinSize) error
	done     chan struct{}
	doneOnce sync.Once
	// these track the process side of the io pipe for
	// read (fd=0) and write (fd=1, fd=2)
	processReaders map[uint32]io.ReadCloser
	processWriters map[uint32]io.WriteCloser
	// these track the server side of the io pipe, so
	// when we receive an EOF over grpc, we will close
	// this end
	serverWriters map[uint32]io.WriteCloser
	serverReaders map[uint32]io.ReadCloser
}
// newProcessIO creates an io.Pipe per requested fd and registers both ends:
// fd 0 pipes from server to process (stdin), all other fds pipe from process
// to server (stdout/stderr-style).
func newProcessIO(id string, openFds []uint32) *processIO {
	pio := &processIO{
		id:             id,
		processReaders: make(map[uint32]io.ReadCloser),
		processWriters: make(map[uint32]io.WriteCloser),
		serverReaders:  make(map[uint32]io.ReadCloser),
		serverWriters:  make(map[uint32]io.WriteCloser),
		done:           make(chan struct{}),
	}
	for _, fd := range openFds {
		// TODO do we know which way to pipe each fd? For now assume fd0 is for
		// reading, and the rest are for writing
		r, w := io.Pipe()
		switch fd {
		case 0:
			pio.processReaders[fd] = r
			pio.serverWriters[fd] = w
		default:
			pio.processWriters[fd] = w
			pio.serverReaders[fd] = r
		}
	}
	return pio
}
// Close shuts down and removes the process-reader and server-reader ends of
// all pipes, returning the first close error encountered, and triggers Done
// bookkeeping.
func (pio *processIO) Close() (err error) {
	pio.mu.Lock()
	defer pio.mu.Unlock()
	// record keeps the first failure seen in the named return.
	record := func(cerr error) {
		if cerr != nil && err == nil {
			err = stack.Enable(cerr)
		}
	}
	for fd, r := range pio.processReaders {
		delete(pio.processReaders, fd)
		record(r.Close())
	}
	for fd, r := range pio.serverReaders {
		delete(pio.serverReaders, fd)
		record(r.Close())
	}
	pio.Done()
	return err
}
// Done closes the done channel once no pipe ends remain tracked. All visible
// call sites invoke it while holding pio.mu, which is what makes the counting
// below safe.
func (pio *processIO) Done() {
	stillOpen := len(pio.processReaders) + len(pio.processWriters) + len(pio.serverReaders) + len(pio.serverWriters)
	if stillOpen == 0 {
		pio.doneOnce.Do(func() {
			close(pio.done)
		})
	}
}
// Write delivers data received from the client to the server-side writer for
// fd f.Fd. On EOF or a write error, that writer is closed and removed so that
// Done can eventually fire once all pipe ends are gone.
func (pio *processIO) Write(f *pb.FdMessage) (err error) {
	pio.mu.Lock()
	writer := pio.serverWriters[f.Fd]
	pio.mu.Unlock()
	if writer == nil {
		return status.Errorf(codes.OutOfRange, "fd %d unavailable to write", f.Fd)
	}
	defer func() {
		if err != nil || f.EOF {
			// Close our end and drop the writer; Done (called with the mutex
			// held) may then signal the process IO as complete.
			writer.Close()
			pio.mu.Lock()
			defer pio.mu.Unlock()
			delete(pio.serverWriters, f.Fd)
			pio.Done()
		}
	}()
	if len(f.Data) > 0 {
		_, err = writer.Write(f.Data)
		return stack.Enable(err)
	}
	return nil
}
// outputWriter streams one fd's output back to the client as FdMessages on
// the ExecProcess stream.
type outputWriter struct {
	stream    pb.LLBBridge_ExecProcessServer
	fd        uint32
	processID string
}
// Write sends msg as a single FdMessage on the stream. It always reports the
// full length as consumed, even when Send returns an error.
func (w *outputWriter) Write(msg []byte) (int, error) {
	logrus.Debugf("|---> File Message %s, fd=%d, %d bytes", w.processID, w.fd, len(msg))
	err := w.stream.Send(&pb.ExecMessage{
		ProcessID: w.processID,
		Input: &pb.ExecMessage_File{
			File: &pb.FdMessage{
				Fd:   w.fd,
				Data: msg,
			},
		},
	})
	return len(msg), stack.Enable(err)
}
// ExecProcess handles the bidirectional process-control stream for containers
// created via NewContainer: Init starts a process, File carries stdio data,
// Resize adjusts the TTY, and the server replies with Started, File, Exit and
// Done messages. One goroutine receives from the stream, a second dispatches
// per process, and per-process goroutines stream output and wait for exit.
func (lbf *llbBridgeForwarder) ExecProcess(srv pb.LLBBridge_ExecProcessServer) error {
	eg, ctx := errgroup.WithContext(srv.Context())
	msgs := make(chan *pb.ExecMessage)
	// Receiver: pull messages off the stream and forward them to msgs.
	eg.Go(func() error {
		defer close(msgs)
		for {
			execMsg, err := srv.Recv()
			if err != nil {
				if errors.Is(err, io.EOF) {
					return nil
				}
				return stack.Enable(err)
			}
			switch m := execMsg.GetInput().(type) {
			case *pb.ExecMessage_Init:
				logrus.Debugf("|<--- Init Message %s", execMsg.ProcessID)
			case *pb.ExecMessage_File:
				if m.File.EOF {
					logrus.Debugf("|<--- File Message %s, fd=%d, EOF", execMsg.ProcessID, m.File.Fd)
				} else {
					logrus.Debugf("|<--- File Message %s, fd=%d, %d bytes", execMsg.ProcessID, m.File.Fd, len(m.File.Data))
				}
			case *pb.ExecMessage_Resize:
				logrus.Debugf("|<--- Resize Message %s", execMsg.ProcessID)
			}
			select {
			case <-ctx.Done():
			case msgs <- execMsg:
			}
		}
	})
	// Dispatcher: route each message to the matching process's IO/lifecycle.
	eg.Go(func() error {
		pios := make(map[string]*processIO)
		// close any stray pios on exit to make sure
		// all the associated resources get cleaned up
		defer func() {
			for _, pio := range pios {
				pio.Close()
			}
		}()
		for {
			var execMsg *pb.ExecMessage
			select {
			case <-ctx.Done():
				return nil
			case execMsg = <-msgs:
			}
			if execMsg == nil {
				// msgs was closed by the receiver (clean EOF from the client).
				return nil
			}
			pid := execMsg.ProcessID
			if pid == "" {
				return stack.Enable(status.Errorf(codes.InvalidArgument, "ProcessID required"))
			}
			pio, pioFound := pios[pid]
			if data := execMsg.GetFile(); data != nil {
				if !pioFound {
					return stack.Enable(status.Errorf(codes.NotFound, "IO for process %q not found", pid))
				}
				err := pio.Write(data)
				if err != nil {
					return stack.Enable(err)
				}
			} else if resize := execMsg.GetResize(); resize != nil {
				if !pioFound {
					return stack.Enable(status.Errorf(codes.NotFound, "IO for process %q not found", pid))
				}
				pio.resize(ctx, gwclient.WinSize{
					Cols: resize.Cols,
					Rows: resize.Rows,
				})
			} else if init := execMsg.GetInit(); init != nil {
				if pioFound {
					return stack.Enable(status.Errorf(codes.AlreadyExists, "Process %s already exists", pid))
				}
				id := init.ContainerID
				lbf.ctrsMu.Lock()
				ctr, ok := lbf.ctrs[id]
				lbf.ctrsMu.Unlock()
				if !ok {
					return stack.Enable(status.Errorf(codes.NotFound, "container %q previously released or not created", id))
				}
				// NOTE(review): this defer runs when ExecProcess returns, not
				// per loop iteration, so every started process keeps its
				// context until the stream ends — presumably intentional.
				initCtx, initCancel := context.WithCancel(context.Background())
				defer initCancel()
				pio := newProcessIO(pid, init.Fds)
				pios[pid] = pio
				proc, err := ctr.Start(initCtx, gwclient.StartRequest{
					Args:   init.Meta.Args,
					Env:    init.Meta.Env,
					User:   init.Meta.User,
					Cwd:    init.Meta.Cwd,
					Tty:    init.Tty,
					Stdin:  pio.processReaders[0],
					Stdout: pio.processWriters[1],
					Stderr: pio.processWriters[2],
				})
				if err != nil {
					return stack.Enable(err)
				}
				pio.resize = proc.Resize
				// Notify the client once all of this process's fds are done.
				eg.Go(func() error {
					<-pio.done
					logrus.Debugf("|---> Done Message %s", pid)
					err := srv.Send(&pb.ExecMessage{
						ProcessID: pid,
						Input: &pb.ExecMessage_Done{
							Done: &pb.DoneMessage{},
						},
					})
					return stack.Enable(err)
				})
				// Wait for the process and report its exit status.
				eg.Go(func() error {
					defer func() {
						pio.Close()
					}()
					err := proc.Wait()
					var statusCode uint32
					var exitError *errdefs.ExitError
					var statusError *rpc.Status
					if err != nil {
						statusCode = errdefs.ContainerdUnknownExitStatus
						st, _ := status.FromError(grpcerrors.ToGRPC(err))
						stp := st.Proto()
						statusError = &rpc.Status{
							Code:    stp.Code,
							Message: stp.Message,
							Details: convertToGogoAny(stp.Details),
						}
					}
					if errors.As(err, &exitError) {
						statusCode = exitError.ExitCode
					}
					logrus.Debugf("|---> Exit Message %s, code=%d, error=%s", pid, statusCode, err)
					sendErr := srv.Send(&pb.ExecMessage{
						ProcessID: pid,
						Input: &pb.ExecMessage_Exit{
							Exit: &pb.ExitMessage{
								Code:  statusCode,
								Error: statusError,
							},
						},
					})
					if sendErr != nil && err != nil {
						return errors.Wrap(sendErr, err.Error())
					} else if sendErr != nil {
						return stack.Enable(sendErr)
					}
					if err != nil && statusCode != 0 {
						// this was a container exit error which is "normal" so
						// don't return this error from the errgroup
						return nil
					}
					return stack.Enable(err)
				})
				logrus.Debugf("|---> Started Message %s", pid)
				err = srv.Send(&pb.ExecMessage{
					ProcessID: pid,
					Input: &pb.ExecMessage_Started{
						Started: &pb.StartedMessage{},
					},
				})
				if err != nil {
					return stack.Enable(err)
				}
				// start sending Fd output back to client, this is done after
				// StartedMessage so that Fd output will not potentially arrive
				// to the client before "Started" as the container starts up.
				for fd, file := range pio.serverReaders {
					// Capture per-iteration copies for the goroutine below.
					fd, file := fd, file
					eg.Go(func() error {
						defer func() {
							file.Close()
							pio.mu.Lock()
							defer pio.mu.Unlock()
							w := pio.processWriters[fd]
							if w != nil {
								w.Close()
							}
							delete(pio.processWriters, fd)
							pio.Done()
						}()
						dest := &outputWriter{
							stream:    srv,
							fd:        uint32(fd),
							processID: pid,
						}
						_, err := io.Copy(dest, file)
						// ignore ErrClosedPipe, it is EOF for our usage.
						if err != nil && !errors.Is(err, io.ErrClosedPipe) {
							return stack.Enable(err)
						}
						// no error so must be EOF
						logrus.Debugf("|---> File Message %s, fd=%d, EOF", pid, fd)
						err = srv.Send(&pb.ExecMessage{
							ProcessID: pid,
							Input: &pb.ExecMessage_File{
								File: &pb.FdMessage{
									Fd:  uint32(fd),
									EOF: true,
								},
							},
						})
						return stack.Enable(err)
					})
				}
			}
		}
	})
	err := eg.Wait()
	return stack.Enable(err)
}
// convertRef looks up the result proxy registered under id by Solve. An empty
// id denotes a nil ref (e.g. an absent platform result) and yields (nil, nil).
func (lbf *llbBridgeForwarder) convertRef(id string) (solver.ResultProxy, error) {
	if id == "" {
		return nil, nil
	}
	lbf.mu.Lock()
	defer lbf.mu.Unlock()
	if ref, ok := lbf.refs[id]; ok {
		return ref, nil
	}
	return nil, errors.Errorf("return reference %s not found", id)
}
// serve runs the gRPC server over a single pre-established connection using a
// raw HTTP/2 server; the connection is closed when ctx is cancelled, which
// unblocks ServeConn.
func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) {
	go func() {
		<-ctx.Done()
		conn.Close()
	}()
	logrus.Debugf("serving grpc connection")
	(&http2.Server{}).ServeConn(conn, &http2.ServeConnOpts{Handler: grpcServer})
}
// markTypeFrontend is an llb image option that tags the pulled frontend
// image's usage record type so it can be distinguished from regular build
// records.
type markTypeFrontend struct{}

// SetImageOption sets the usage record type on the image info.
func (*markTypeFrontend) SetImageOption(ii *llb.ImageInfo) {
	ii.RecordType = string(client.UsageRecordTypeFrontend)
}
// convertGogoAny converts gogo protobuf Any values into golang/protobuf Any
// values, preserving order.
func convertGogoAny(in []*gogotypes.Any) []*any.Any {
	out := make([]*any.Any, len(in))
	for i, a := range in {
		out[i] = &any.Any{TypeUrl: a.TypeUrl, Value: a.Value}
	}
	return out
}
func convertToGogoAny(in []*any.Any) []*gogotypes.Any {
out := make([]*gogotypes.Any, len(in))
for i := range in {
out[i] = &gogotypes.Any{TypeUrl: in[i].TypeUrl, Value: in[i].Value}
}
return out
}
// getCaps parses a comma-separated capability label ("cap1,cap2+detail") into
// a set keyed by capability name; anything after a '+' is ignored and empty
// names are skipped.
func getCaps(label string) map[string]struct{} {
	out := make(map[string]struct{})
	if label == "" {
		return out
	}
	for _, c := range strings.Split(label, ",") {
		parts := strings.SplitN(c, "+", 2)
		if parts[0] == "" {
			continue
		}
		out[parts[0]] = struct{}{}
	}
	return out
}
// addCapsForKnownFrontends grants the inputs capability to frontend images
// identified by manifest digest.
func addCapsForKnownFrontends(caps map[string]struct{}, dgst digest.Digest) {
	// these frontends were built without caps detection but do support inputs
	known := []digest.Digest{
		"sha256:9ac1c43a60e31dca741a6fe8314130a9cd4c4db0311fbbc636ff992ef60ae76d", // docker/dockerfile:1.1.6
		"sha256:080bd74d8778f83e7b670de193362d8c593c8b14f5c8fb919d28ee8feda0d069", // docker/dockerfile:1.1.7
		"sha256:60543a9d92b92af5088fb2938fb09b2072684af8384399e153e137fe081f8ab4", // docker/dockerfile:1.1.6-experimental
		"sha256:de85b2f3a3e8a2f7fe48e8e84a65f6fdd5cd5183afa6412fff9caa6871649c44", // docker/dockerfile:1.1.7-experimental
	}
	for _, d := range known {
		if d == dgst {
			caps["moby.buildkit.frontend.inputs"] = struct{}{}
			return
		}
	}
}