Merge pull request #220 from AkihiroSuda/split-solver-1

solver: split llbop pkg
docker-18.09
Tõnis Tiigi 2017-12-18 12:18:00 -08:00 committed by GitHub
commit 1c0b8acf6d
22 changed files with 559 additions and 416 deletions


@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/sys"
"github.com/docker/go-connections/sockets"
"github.com/moby/buildkit/cache/cacheimport"
"github.com/moby/buildkit/control"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/dockerfile"
@ -19,6 +20,7 @@ import (
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/base"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
@ -33,7 +35,7 @@ type workerInitializerOpt struct {
}
type workerInitializer struct {
fn func(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error)
fn func(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error)
// less priority number, more preferred
priority int
}
@ -227,10 +229,28 @@ func newController(c *cli.Context, root string) (*control.Controller, error) {
frontends := map[string]frontend.Frontend{}
frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend()
frontends["gateway.v0"] = gateway.NewGatewayFrontend()
// cache exporter and importer are manager concepts but as there is no
// way to pull data into specific worker yet we currently set them up
// as part of default worker
var ce *cacheimport.CacheExporter
var ci *cacheimport.CacheImporter
w, err := wc.GetDefault()
if err != nil {
return nil, err
}
wt := w.(*base.Worker)
ce = wt.CacheExporter
ci = wt.CacheImporter
return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
Frontends: frontends,
CacheExporter: ce,
CacheImporter: ci,
})
}
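A note on the type assertion above: w.(*base.Worker) panics if the default worker is ever not a *base.Worker. A defensive variant is sketched below; it is illustrative only, not part of this commit, and assumes the surrounding newController variables (w, ce, ci) and the pkg/errors import already present in this file.

	wt, ok := w.(*base.Worker)
	if !ok {
		// fail with a clear error instead of a panic if a non-base worker ever becomes the default
		return nil, errors.Errorf("default worker %q is not a *base.Worker; cannot wire cache exporter/importer", w.Name())
	}
	ce = wt.CacheExporter
	ci = wt.CacheImporter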
@ -242,7 +262,7 @@ func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Co
return nil, err
}
for _, w := range ws {
logrus.Infof("Found worker %q", w.Name)
logrus.Infof("found worker %q", w.Name())
if err = wc.Add(w); err != nil {
return nil, err
}
@ -256,7 +276,7 @@ func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Co
if err != nil {
return nil, err
}
logrus.Infof("Found %d workers, default=%q", nWorkers, defaultWorker.Name)
logrus.Warn("Currently, only the default worker can be used.")
logrus.Infof("found %d workers, default=%q", nWorkers, defaultWorker.Name)
logrus.Warn("currently, only the default worker can be used.")
return wc, nil
}


@ -8,6 +8,7 @@ import (
ctd "github.com/containerd/containerd"
"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/base"
"github.com/moby/buildkit/worker/containerd"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
@ -33,7 +34,7 @@ func init() {
// TODO(AkihiroSuda): allow using multiple snapshotters. should be useful for some applications that does not work with the default overlay snapshotter. e.g. mysql (docker/for-linux#72)",
}
func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) {
func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) {
socket := c.GlobalString("containerd-worker-addr")
boolOrAuto, err := parseBoolOrAuto(c.GlobalString("containerd-worker"))
if err != nil {
@ -47,11 +48,11 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
return nil, err
}
opt.SessionManager = common.sessionManager
w, err := worker.NewWorker(opt)
w, err := base.NewWorker(opt)
if err != nil {
return nil, err
}
return []*worker.Worker{w}, nil
return []worker.Worker{w}, nil
}
func validContainerdSocket(socket string) bool {


@ -6,6 +6,7 @@ import (
"os/exec"
"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/base"
"github.com/moby/buildkit/worker/runc"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
@ -25,7 +26,7 @@ func init() {
// TODO: allow multiple oci runtimes and snapshotters
}
func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) {
func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) {
boolOrAuto, err := parseBoolOrAuto(c.GlobalString("oci-worker"))
if err != nil {
return nil, err
@ -38,11 +39,11 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worke
return nil, err
}
opt.SessionManager = common.sessionManager
w, err := worker.NewWorker(opt)
w, err := base.NewWorker(opt)
if err != nil {
return nil, err
}
return []*worker.Worker{w}, nil
return []worker.Worker{w}, nil
}
func validOCIBinary() bool {


@ -3,6 +3,7 @@ package control
import (
"github.com/docker/distribution/reference"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/buildkit/cache/cacheimport"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/frontend"
@ -21,6 +22,8 @@ type Opt struct {
SessionManager *session.Manager
WorkerController *worker.Controller
Frontends map[string]frontend.Frontend
CacheExporter *cacheimport.CacheExporter
CacheImporter *cacheimport.CacheImporter
}
type Controller struct { // TODO: ControlService
@ -30,8 +33,13 @@ type Controller struct { // TODO: ControlService
func NewController(opt Opt) (*Controller, error) {
c := &Controller{
opt: opt,
solver: solver.NewLLBSolver(opt.WorkerController, opt.Frontends),
opt: opt,
solver: solver.NewLLBOpSolver(solver.LLBOpt{
WorkerController: opt.WorkerController,
Frontends: opt.Frontends,
CacheExporter: opt.CacheExporter,
CacheImporter: opt.CacheImporter,
}),
}
return c, nil
}
@ -44,7 +52,7 @@ func (c *Controller) Register(server *grpc.Server) error {
func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) {
resp := &controlapi.DiskUsageResponse{}
for _, w := range c.opt.WorkerController.GetAll() {
du, err := w.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{
du, err := w.DiskUsage(ctx, client.DiskUsageInfo{
Filter: r.Filter,
})
if err != nil {
@ -89,9 +97,9 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
return nil, err
}
if req.Exporter != "" {
exp, ok := w.Exporters[req.Exporter]
if !ok {
return nil, errors.Errorf("exporter %q could not be found", req.Exporter)
exp, err := w.Exporter(req.Exporter)
if err != nil {
return nil, err
}
expi, err = exp.Resolve(ctx, req.ExporterAttrs)
if err != nil {


@ -162,7 +162,7 @@ type job struct {
type cacheRecord struct {
VertexSolver
index Index
ref Reference
ref Ref
}
func (j *job) load(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error) {
@ -225,7 +225,7 @@ func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*Input,
if err != nil {
return nil, err
}
return &Input{Vertex: vtx.(*vertex), Index: idx}, nil
return &Input{Vertex: vtx.(*vertex), Index: Index(idx)}, nil
}
func (j *job) discard() {
@ -253,12 +253,12 @@ func (j *job) getSolver(dgst digest.Digest) (VertexSolver, error) {
return st.solver, nil
}
func (j *job) getRef(ctx context.Context, v *vertex, index Index) (Reference, error) {
s, err := j.getSolver(v.Digest())
func (j *job) getRef(ctx context.Context, cv client.Vertex, index Index) (Ref, error) {
s, err := j.getSolver(cv.Digest)
if err != nil {
return nil, err
}
ref, err := getRef(ctx, s, v, index, j.cache)
ref, err := getRef(ctx, s, cv, index, j.cache)
if err != nil {
return nil, err
}
@ -266,15 +266,15 @@ func (j *job) getRef(ctx context.Context, v *vertex, index Index) (Reference, er
return ref, nil
}
func (j *job) keepCacheRef(s VertexSolver, index Index, ref Reference) {
immutable, ok := toImmutableRef(ref)
func (j *job) keepCacheRef(s VertexSolver, index Index, ref Ref) {
immutable, ok := ToImmutableRef(ref)
if ok {
j.cached[immutable.ID()] = &cacheRecord{s, index, ref}
}
}
func (j *job) cacheExporter(ref Reference) (CacheExporter, error) {
immutable, ok := toImmutableRef(ref)
func (j *job) cacheExporter(ref Ref) (CacheExporter, error) {
immutable, ok := ToImmutableRef(ref)
if !ok {
return nil, errors.Errorf("invalid reference")
}
@ -285,7 +285,7 @@ func (j *job) cacheExporter(ref Reference) (CacheExporter, error) {
return cr.Cache(cr.index, cr.ref), nil
}
func getRef(ctx context.Context, s VertexSolver, v *vertex, index Index, cache instructioncache.InstructionCache) (Reference, error) {
func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index Index, cache instructioncache.InstructionCache) (Ref, error) {
k, err := s.CacheKey(ctx, index)
if err != nil {
return nil, err
@ -295,8 +295,8 @@ func getRef(ctx context.Context, s VertexSolver, v *vertex, index Index, cache i
return nil, err
}
if ref != nil {
markCached(ctx, v.clientVertex)
return ref.(Reference), nil
markCached(ctx, cv)
return ref.(Ref), nil
}
ev, err := s.OutputEvaluator(index)
@ -316,8 +316,8 @@ func getRef(ctx context.Context, s VertexSolver, v *vertex, index Index, cache i
return nil, err
}
if ref != nil {
markCached(ctx, v.clientVertex)
return ref.(Reference), nil
markCached(ctx, cv)
return ref.(Ref), nil
}
continue
}


@ -1,10 +1,7 @@
package solver
import (
"io"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
@ -17,7 +14,7 @@ type llbBridge struct {
*Solver
job *job
// this worker is used for running containerized frontend, not vertices
worker *worker.Worker
worker.Worker
}
type resolveImageConfig interface {
@ -45,27 +42,9 @@ func (s *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (cache
if err != nil {
return nil, nil, err
}
immutable, ok := toImmutableRef(ref)
immutable, ok := ToImmutableRef(ref)
if !ok {
return nil, nil, errors.Errorf("invalid reference for exporting: %T", ref)
}
return immutable, exp, nil
}
func (s *llbBridge) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) {
// ImageSource is typically source/containerimage
resolveImageConfig, ok := s.worker.ImageSource.(resolveImageConfig)
if !ok {
return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", s.worker.Name)
}
return resolveImageConfig.ResolveImageConfig(ctx, ref)
}
func (s *llbBridge) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
active, err := s.worker.CacheManager.New(ctx, rootFS)
if err != nil {
return err
}
defer active.Release(context.TODO())
return s.worker.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
}
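The two method bodies deleted above did not lose functionality: llbBridge now embeds worker.Worker, so ResolveImageConfig and Exec are promoted from the embedded worker (their implementations move to worker/base/worker.go later in this diff). A minimal sketch of that promotion; resolveViaBridge is a hypothetical helper name, not in the commit.

package solver

import (
	digest "github.com/opencontainers/go-digest"
	"golang.org/x/net/context"
)

// resolveViaBridge is illustrative only: it compiles even though llbBridge no
// longer defines ResolveImageConfig itself, because the method is promoted
// from the embedded worker.Worker.
func resolveViaBridge(ctx context.Context, b *llbBridge, ref string) (digest.Digest, []byte, error) {
	return b.ResolveImageConfig(ctx, ref)
}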


@ -1,4 +1,4 @@
package solver
package llbop
import (
"encoding/json"
@ -7,7 +7,9 @@ import (
"github.com/containerd/containerd/fs"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
@ -17,11 +19,11 @@ const buildCacheType = "buildkit.build.v0"
type buildOp struct {
op *pb.BuildOp
s *Solver
v Vertex
s worker.SubBuilder
v solver.Vertex
}
func newBuildOp(v Vertex, op *pb.Op_Build, s *Solver) (Op, error) {
func NewBuildOp(v solver.Vertex, op *pb.Op_Build, s worker.SubBuilder) (solver.Op, error) {
return &buildOp{
op: op.Build,
s: s,
@ -43,7 +45,7 @@ func (b *buildOp) CacheKey(ctx context.Context) (digest.Digest, error) {
return digest.FromBytes(dt), nil
}
func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Reference, retErr error) {
func (b *buildOp) Run(ctx context.Context, inputs []solver.Ref) (outputs []solver.Ref, retErr error) {
if b.op.Builder != pb.LLBBuilder {
return nil, errors.Errorf("only llb builder is currently allowed")
}
@ -60,7 +62,7 @@ func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Refere
}
inp := inputs[i]
ref, ok := toImmutableRef(inp)
ref, ok := solver.ToImmutableRef(inp)
if !ok {
return nil, errors.Errorf("invalid reference for build %T", inp)
}
@ -107,14 +109,14 @@ func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Refere
lm.Unmount()
lm = nil
newref, err := b.s.subBuild(ctx, b.v.Digest(), SolveRequest{
newref, err := b.s.SubBuild(ctx, b.v.Digest(), solver.SolveRequest{
Definition: def.ToPB(),
})
if err != nil {
return nil, err
}
return []Reference{newref}, err
return []solver.Ref{newref}, err
}
func (b *buildOp) ContentMask(context.Context) (digest.Digest, [][]string, error) {


@ -1,4 +1,4 @@
package solver
package llbop
import (
"encoding/json"
@ -10,6 +10,7 @@ import (
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/progress/logs"
digest "github.com/opencontainers/go-digest"
@ -26,7 +27,7 @@ type execOp struct {
numInputs int
}
func newExecOp(v Vertex, op *pb.Op_Exec, cm cache.Manager, exec executor.Executor) (Op, error) {
func NewExecOp(v solver.Vertex, op *pb.Op_Exec, cm cache.Manager, exec executor.Executor) (solver.Op, error) {
return &execOp{
op: op.Exec,
cm: cm,
@ -54,9 +55,9 @@ func (e *execOp) CacheKey(ctx context.Context) (digest.Digest, error) {
return digest.FromBytes(dt), nil
}
func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) {
func (e *execOp) Run(ctx context.Context, inputs []solver.Ref) ([]solver.Ref, error) {
var mounts []executor.Mount
var outputs []Reference
var outputs []solver.Ref
var root cache.Mountable
defer func() {
@ -76,7 +77,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
}
inp := inputs[int(m.Input)]
var ok bool
ref, ok = toImmutableRef(inp)
ref, ok = solver.ToImmutableRef(inp)
if !ok {
return nil, errors.Errorf("invalid reference for exec %T", inputs[int(m.Input)])
}
@ -84,7 +85,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
}
if m.Output != pb.SkipOutput {
if m.Readonly && ref != nil && m.Dest != pb.RootMount { // exclude read-only rootfs
outputs = append(outputs, newSharedRef(ref).Clone())
outputs = append(outputs, solver.NewSharedRef(ref).Clone())
} else {
active, err := e.cm.New(ctx, ref, cache.WithDescription(fmt.Sprintf("mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")))) // TODO: should be method
if err != nil {
@ -120,7 +121,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
return nil, errors.Wrapf(err, "executor failed running %v", meta.Args)
}
refs := []Reference{}
refs := []solver.Ref{}
for i, o := range outputs {
if mutable, ok := o.(cache.MutableRef); ok {
ref, err := mutable.Commit(ctx)


@ -1,8 +1,9 @@
package solver
package llbop
import (
"sync"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
digest "github.com/opencontainers/go-digest"
@ -18,7 +19,7 @@ type sourceOp struct {
src source.SourceInstance
}
func newSourceOp(_ Vertex, op *pb.Op_Source, sm *source.Manager) (Op, error) {
func NewSourceOp(_ solver.Vertex, op *pb.Op_Source, sm *source.Manager) (solver.Op, error) {
return &sourceOp{
op: op,
sm: sm,
@ -55,7 +56,7 @@ func (s *sourceOp) CacheKey(ctx context.Context) (digest.Digest, error) {
return digest.FromBytes([]byte(sourceCacheType + ":" + k)), nil
}
func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) {
func (s *sourceOp) Run(ctx context.Context, _ []solver.Ref) ([]solver.Ref, error) {
src, err := s.instance(ctx)
if err != nil {
return nil, err
@ -64,7 +65,7 @@ func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error)
if err != nil {
return nil, err
}
return []Reference{ref}, nil
return []solver.Ref{ref}, nil
}
func (s *sourceOp) ContentMask(context.Context) (digest.Digest, [][]string, error) {

solver/llbsolver.go (new file, 35 lines)

@ -0,0 +1,35 @@
package solver
import (
"github.com/moby/buildkit/cache/cacheimport"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/worker"
)
// determineVertexWorker determines the worker for a vertex.
// Currently, the constraint is just ignored.
// We also need to track the workers of the inputs.
func determineVertexWorker(wc *worker.Controller, v Vertex) (worker.Worker, error) {
// TODO: multiworker
return wc.GetDefault()
}
type LLBOpt struct {
WorkerController *worker.Controller
Frontends map[string]frontend.Frontend // used by nested invocations
CacheExporter *cacheimport.CacheExporter
CacheImporter *cacheimport.CacheImporter
}
func NewLLBOpSolver(opt LLBOpt) *Solver {
var s *Solver
s = New(func(v Vertex) (Op, error) {
// TODO: in reality, worker should be determined already and passed into this function(or this function would be removed)
w, err := determineVertexWorker(opt.WorkerController, v)
if err != nil {
return nil, err
}
return w.ResolveVertex(v, s)
}, opt.WorkerController, determineVertexWorker, opt.Frontends, opt.CacheExporter, opt.CacheImporter)
return s
}


@ -43,7 +43,7 @@ func loadInternalVertexHelper(v Vertex, cache map[digest.Digest]*vertex) *vertex
// loadLLB loads LLB.
// fn is executed sequentially.
func loadLLB(def *pb.Definition, fn func(digest.Digest, *pb.Op, func(digest.Digest) (interface{}, error)) (interface{}, error)) (interface{}, Index, error) {
func loadLLB(def *pb.Definition, fn func(digest.Digest, *pb.Op, func(digest.Digest) (interface{}, error)) (interface{}, error)) (interface{}, pb.OutputIndex, error) {
if len(def.Def) == 0 {
return nil, 0, errors.New("invalid empty definition")
}
@ -81,7 +81,7 @@ func loadLLB(def *pb.Definition, fn func(digest.Digest, *pb.Op, func(digest.Dige
}
v, err := rec(dgst)
return v, Index(lastOp.Inputs[0].Index), err
return v, lastOp.Inputs[0].Index, err
}
func llbOpName(op *pb.Op) string {


@ -7,40 +7,40 @@ import (
"golang.org/x/net/context"
)
// sharedRef is a wrapper around releasable that allows you to make new
// SharedRef is a wrapper around releasable that allows you to make new
// releasable child objects
type sharedRef struct {
type SharedRef struct {
mu sync.Mutex
refs map[*sharedRefInstance]struct{}
main Reference
Reference
main Ref
Ref
}
func newSharedRef(main Reference) *sharedRef {
mr := &sharedRef{
refs: make(map[*sharedRefInstance]struct{}),
Reference: main,
func NewSharedRef(main Ref) *SharedRef {
mr := &SharedRef{
refs: make(map[*sharedRefInstance]struct{}),
Ref: main,
}
mr.main = mr.Clone()
return mr
}
func (mr *sharedRef) Clone() Reference {
func (mr *SharedRef) Clone() Ref {
mr.mu.Lock()
r := &sharedRefInstance{sharedRef: mr}
r := &sharedRefInstance{SharedRef: mr}
mr.refs[r] = struct{}{}
mr.mu.Unlock()
return r
}
func (mr *sharedRef) Release(ctx context.Context) error {
func (mr *SharedRef) Release(ctx context.Context) error {
return mr.main.Release(ctx)
}
func (mr *sharedRef) Sys() Reference {
sys := mr.Reference
func (mr *SharedRef) Sys() Ref {
sys := mr.Ref
if s, ok := sys.(interface {
Sys() Reference
Sys() Ref
}); ok {
return s.Sys()
}
@ -48,31 +48,31 @@ func (mr *sharedRef) Sys() Reference {
}
type sharedRefInstance struct {
*sharedRef
*SharedRef
}
func (r *sharedRefInstance) Release(ctx context.Context) error {
r.sharedRef.mu.Lock()
defer r.sharedRef.mu.Unlock()
delete(r.sharedRef.refs, r)
if len(r.sharedRef.refs) == 0 {
return r.sharedRef.Reference.Release(ctx)
r.SharedRef.mu.Lock()
defer r.SharedRef.mu.Unlock()
delete(r.SharedRef.refs, r)
if len(r.SharedRef.refs) == 0 {
return r.SharedRef.Ref.Release(ctx)
}
return nil
}
func originRef(ref Reference) Reference {
func OriginRef(ref Ref) Ref {
sysRef := ref
if sys, ok := ref.(interface {
Sys() Reference
Sys() Ref
}); ok {
sysRef = sys.Sys()
}
return sysRef
}
func toImmutableRef(ref Reference) (cache.ImmutableRef, bool) {
immutable, ok := originRef(ref).(cache.ImmutableRef)
func ToImmutableRef(ref Ref) (cache.ImmutableRef, bool) {
immutable, ok := OriginRef(ref).(cache.ImmutableRef)
if !ok {
return nil, false
}
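Since sharedRef is now exported as SharedRef, here is a minimal usage sketch of the release semantics visible above: the underlying Ref is released only after every clone, including the internal main handle held by SharedRef itself, has been released. This is not from the commit; dummyRef is a hypothetical Ref implementation used only for illustration.

package main

import (
	"fmt"

	"github.com/moby/buildkit/solver"
	"golang.org/x/net/context"
)

// dummyRef is a hypothetical Ref used only for illustration.
type dummyRef struct{ released bool }

func (d *dummyRef) Release(ctx context.Context) error {
	d.released = true
	return nil
}

func main() {
	ctx := context.TODO()
	underlying := &dummyRef{}

	sr := solver.NewSharedRef(underlying)
	c1 := sr.Clone()
	c2 := sr.Clone()

	c1.Release(ctx)                  // underlying stays alive
	c2.Release(ctx)                  // still alive: sr holds its own internal clone
	fmt.Println(underlying.released) // false

	sr.Release(ctx)                  // releases the internal clone; the last one out releases the underlying Ref
	fmt.Println(underlying.released) // true
}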


@ -11,9 +11,7 @@ import (
"github.com/moby/buildkit/cache/contenthash"
"github.com/moby/buildkit/cache/instructioncache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/bgfunc"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/worker"
@ -24,78 +22,28 @@ import (
"golang.org/x/sync/errgroup"
)
// DetermineVertexWorker determines worker for a vertex.
// Currently, constraint is just ignored.
// Also we need to track the workers of the inputs.
func DetermineVertexWorker(wc *worker.Controller, v Vertex) (*worker.Worker, error) {
// TODO: multiworker
return wc.GetDefault()
}
func NewLLBSolver(wc *worker.Controller, frontends map[string]frontend.Frontend) *Solver {
var s *Solver
s = New(func(v Vertex) (Op, error) {
w, err := DetermineVertexWorker(wc, v)
if err != nil {
return nil, err
}
switch op := v.Sys().(type) {
case *pb.Op_Source:
return newSourceOp(v, op, w.SourceManager)
case *pb.Op_Exec:
return newExecOp(v, op, w.CacheManager, w.Executor)
case *pb.Op_Build:
return newBuildOp(v, op, s)
default:
return nil, nil
}
}, wc, frontends)
return s
}
// FIXME: Also we need to track the workers of the inputs.
// TODO: REMOVE
type VertexWorkerDeterminer func(wc *worker.Controller, v Vertex) (worker.Worker, error)
// ResolveOpFunc finds an Op implementation for a vertex
type ResolveOpFunc func(Vertex) (Op, error)
// Reference is a reference to the object passed through the build steps.
// This interface is a subset of the cache.Ref interface.
// For ease of unit testing, this interface only has Release().
type Reference interface {
Release(context.Context) error
}
// Op is an implementation for running a vertex
type Op interface {
// CacheKey returns a persistent cache key for operation.
CacheKey(context.Context) (digest.Digest, error)
// ContentMask returns a partial cache checksum with content paths to the
// inputs. User can combine the content checksum of these paths to get a valid
// content based cache key.
ContentMask(context.Context) (digest.Digest, [][]string, error)
// Run runs an operation and returns the output references.
Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error)
}
type Solver struct {
resolve ResolveOpFunc
jobs *jobList
workerController *worker.Controller
frontends map[string]frontend.Frontend
resolve ResolveOpFunc
jobs *jobList
workerController *worker.Controller
determineVertexWorker VertexWorkerDeterminer
frontends map[string]frontend.Frontend
ce *cacheimport.CacheExporter
ci *cacheimport.CacheImporter
}
func New(resolve ResolveOpFunc, wc *worker.Controller, f map[string]frontend.Frontend) *Solver {
return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, frontends: f}
func New(resolve ResolveOpFunc, wc *worker.Controller, vwd VertexWorkerDeterminer, f map[string]frontend.Frontend, ce *cacheimport.CacheExporter, ci *cacheimport.CacheImporter) *Solver {
return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, determineVertexWorker: vwd, frontends: f, ce: ce, ci: ci}
}
type SolveRequest struct {
Definition *pb.Definition
Frontend frontend.Frontend
Exporter exporter.ExporterInstance
FrontendOpt map[string]string
ExportCacheRef string
ImportCacheRef string
}
func (s *Solver) solve(ctx context.Context, j *job, req SolveRequest) (Reference, map[string][]byte, error) {
func (s *Solver) solve(ctx context.Context, j *job, req SolveRequest) (Ref, map[string][]byte, error) {
if req.Definition == nil {
if req.Frontend == nil {
return nil, nil, errors.Errorf("invalid request: no definition nor frontend")
@ -107,7 +55,7 @@ func (s *Solver) solve(ctx context.Context, j *job, req SolveRequest) (Reference
if err != nil {
return nil, nil, err
}
ref, err := j.getRef(ctx, inp.Vertex.(*vertex), inp.Index)
ref, err := j.getRef(ctx, inp.Vertex.(*vertex).clientVertex, inp.Index)
return ref, nil, err
}
@ -117,7 +65,7 @@ func (s *Solver) llbBridge(j *job) *llbBridge {
if err != nil {
panic(err)
}
return &llbBridge{job: j, Solver: s, worker: worker}
return &llbBridge{job: j, Solver: s, Worker: worker}
}
func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error {
@ -127,21 +75,22 @@ func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error {
pr, ctx, closeProgressWriter := progress.NewContext(ctx)
defer closeProgressWriter()
// TODO: multiworker
// TODO: multiworker. This should take union cache of all workers
defaultWorker, err := s.workerController.GetDefault()
if err != nil {
return err
}
mainCache := defaultWorker.InstructionCache()
if importRef := req.ImportCacheRef; importRef != "" {
cache, err := defaultWorker.CacheImporter.Import(ctx, importRef)
cache, err := s.ci.Import(ctx, importRef)
if err != nil {
return err
}
defaultWorker.InstructionCache = instructioncache.Union(defaultWorker.InstructionCache, cache)
mainCache = instructioncache.Union(mainCache, cache)
}
// register a build job. vertex needs to be loaded to a job to run
ctx, j, err := s.jobs.new(ctx, id, pr, defaultWorker.InstructionCache)
ctx, j, err := s.jobs.new(ctx, id, pr, mainCache)
if err != nil {
return err
}
@ -161,7 +110,7 @@ func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error {
var immutable cache.ImmutableRef
if ref != nil {
var ok bool
immutable, ok = toImmutableRef(ref)
immutable, ok = ToImmutableRef(ref)
if !ok {
return errors.Errorf("invalid reference for exporting: %T", ref)
}
@ -191,7 +140,7 @@ func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error {
}
// TODO: multiworker
return defaultWorker.CacheExporter.Export(ctx, records, exportName)
return s.ce.Export(ctx, records, exportName)
}); err != nil {
return err
}
@ -209,7 +158,7 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
return j.pipe(ctx, statusChan)
}
func (s *Solver) subBuild(ctx context.Context, dgst digest.Digest, req SolveRequest) (Reference, error) {
func (s *Solver) SubBuild(ctx context.Context, dgst digest.Digest, req SolveRequest) (Ref, error) {
jl := s.jobs
jl.mu.Lock()
st, ok := jl.actives[dgst]
@ -219,6 +168,7 @@ func (s *Solver) subBuild(ctx context.Context, dgst digest.Digest, req SolveRequ
}
var inp *Input
var cache instructioncache.InstructionCache
for j := range st.jobs {
var err error
inp, err = j.loadInternal(req.Definition, s.resolve)
@ -226,29 +176,26 @@ func (s *Solver) subBuild(ctx context.Context, dgst digest.Digest, req SolveRequ
jl.mu.Unlock()
return nil, err
}
cache = j.cache // TODO: combine?
}
st = jl.actives[inp.Vertex.Digest()]
jl.mu.Unlock()
w, err := DetermineVertexWorker(s.workerController, inp.Vertex)
if err != nil {
return nil, err
}
return getRef(ctx, st.solver, inp.Vertex.(*vertex), inp.Index, w.InstructionCache) // TODO: combine to pass single input // TODO: export cache for subbuilds
return getRef(ctx, st.solver, inp.Vertex.(*vertex).clientVertex, inp.Index, cache) // TODO: combine to pass single input // TODO: export cache for subbuilds
}
type VertexSolver interface {
CacheKey(ctx context.Context, index Index) (digest.Digest, error)
OutputEvaluator(Index) (VertexEvaluator, error)
Release() error
Cache(Index, Reference) CacheExporter
Cache(Index, Ref) CacheExporter
}
type vertexInput struct {
solver VertexSolver
ev VertexEvaluator
cacheKeys []digest.Digest
ref Reference
ref Ref
}
type vertexSolver struct {
@ -257,7 +204,7 @@ type vertexSolver struct {
cv client.Vertex
op Op
cache instructioncache.InstructionCache
refs []*sharedRef
refs []*SharedRef
f *bgfunc.F
ctx context.Context
@ -317,21 +264,21 @@ type CacheExporter interface {
Export(context.Context) ([]cacheimport.CacheRecord, error)
}
func (vs *vertexSolver) Cache(index Index, ref Reference) CacheExporter {
func (vs *vertexSolver) Cache(index Index, ref Ref) CacheExporter {
return &cacheExporter{vertexSolver: vs, index: index, ref: ref}
}
type cacheExporter struct {
*vertexSolver
index Index
ref Reference
ref Ref
}
func (ce *cacheExporter) Export(ctx context.Context) ([]cacheimport.CacheRecord, error) {
return ce.vertexSolver.Export(ctx, ce.index, ce.ref)
}
func (vs *vertexSolver) Export(ctx context.Context, index Index, ref Reference) ([]cacheimport.CacheRecord, error) {
func (vs *vertexSolver) Export(ctx context.Context, index Index, ref Ref) ([]cacheimport.CacheRecord, error) {
mp := map[digest.Digest]cacheimport.CacheRecord{}
if err := vs.appendInputCache(ctx, mp); err != nil {
return nil, err
@ -340,7 +287,7 @@ func (vs *vertexSolver) Export(ctx context.Context, index Index, ref Reference)
if err != nil {
return nil, err
}
immutable, ok := toImmutableRef(ref)
immutable, ok := ToImmutableRef(ref)
if !ok {
return nil, errors.Errorf("invalid reference")
}
@ -365,7 +312,7 @@ func (vs *vertexSolver) appendInputCache(ctx context.Context, mp map[digest.Dige
return err
}
if inp.ref != nil && len(inp.solver.(*vertexSolver).inputs) > 0 { // Ignore pushing the refs for sources
ref, ok := toImmutableRef(inp.ref)
ref, ok := ToImmutableRef(inp.ref)
if !ok {
return errors.Errorf("invalid reference")
}
@ -533,7 +480,7 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
return err
}
if ref != nil {
inp.ref = ref.(Reference)
inp.ref = ref.(Ref)
inp.solver.(*vertexSolver).markCachedOnce.Do(func() {
markCached(ctx, inp.solver.(*vertexSolver).cv)
})
@ -550,7 +497,7 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
return nil
}
if ref := res.Reference; ref != nil {
if ref, ok := toImmutableRef(ref); ok {
if ref, ok := ToImmutableRef(ref); ok {
if !cache.HasCachePolicyRetain(ref) {
if err := cache.CachePolicyRetain(ref); err != nil {
return err
@ -592,7 +539,7 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
}
// Find extra cache keys by content
inputRefs := make([]Reference, len(vs.inputs))
inputRefs := make([]Ref, len(vs.inputs))
lastInputKeys := make([]digest.Digest, len(vs.inputs))
for i := range vs.inputs {
inputRefs[i] = vs.inputs[i].ref
@ -643,9 +590,9 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
if err != nil {
return err
}
sr := make([]*sharedRef, len(refs))
sr := make([]*SharedRef, len(refs))
for i, r := range refs {
sr[i] = newSharedRef(r)
sr[i] = NewSharedRef(r)
}
vs.mu.Lock()
vs.refs = sr
@ -661,7 +608,7 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
if err != nil {
return err
}
r := originRef(ref)
r := OriginRef(ref)
if err := vs.cache.Set(cacheKeyForIndex(cacheKey, Index(i)), r); err != nil {
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
}
@ -694,7 +641,7 @@ func getInputContentHash(ctx context.Context, ref cache.ImmutableRef, selectors
return digest.FromBytes(dt), nil
}
func calculateContentHash(ctx context.Context, refs []Reference, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) {
func calculateContentHash(ctx context.Context, refs []Ref, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) {
dgsts := make([]digest.Digest, len(contentMap))
eg, ctx := errgroup.WithContext(ctx)
for i, sel := range contentMap {
@ -704,7 +651,7 @@ func calculateContentHash(ctx context.Context, refs []Reference, mainDigest dige
}
func(i int) {
eg.Go(func() error {
ref, ok := toImmutableRef(refs[i])
ref, ok := ToImmutableRef(refs[i])
if !ok {
return errors.Errorf("invalid reference for exporting: %T", ref)
}
@ -774,7 +721,7 @@ func (ve *vertexEvaluator) Cancel() error {
type VertexResult struct {
CacheKey digest.Digest
Reference Reference
Reference Ref
}
func cacheKeyForIndex(dgst digest.Digest, index Index) digest.Digest {

solver/types.go (new file, 16 lines)

@ -0,0 +1,16 @@
package solver
import (
"github.com/moby/buildkit/solver/types"
)
// Ref is a reference to the object passed through the build steps.
// This interface is a subset of the github.com/moby/buildkit/cache.Ref interface.
// For ease of unit testing, this interface only has Release().
type Ref = types.Ref
type Op = types.Op
type SolveRequest = types.SolveRequest
type Vertex = types.Vertex
type Input = types.Input
type Index = types.Index
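These are Go type aliases (note the "="), so solver.Ref and types.Ref are the same type: existing references to solver.Ref keep compiling while the definitions move to solver/types, which the worker package now imports instead of solver (solver itself imports worker, so this sidesteps an import cycle). A tiny illustration; the function names are hypothetical and not in the commit.

package main

import (
	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/solver/types"
)

// takesTypesRef and passSolverRef are hypothetical; they only show that the
// alias makes the two names interchangeable without any conversion.
func takesTypesRef(r types.Ref) {}

func passSolverRef(r solver.Ref) {
	takesTypesRef(r) // compiles: solver.Ref is an alias for types.Ref
}

func main() {}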

solver/types/types.go (new file, 58 lines)

@ -0,0 +1,58 @@
package types
import (
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/solver/pb"
digest "github.com/opencontainers/go-digest"
"golang.org/x/net/context"
)
// These could be also defined in worker
type Ref interface {
Release(context.Context) error
}
// Op is an implementation for running a vertex
type Op interface {
// CacheKey returns a persistent cache key for operation.
CacheKey(context.Context) (digest.Digest, error)
// ContentMask returns a partial cache checksum with content paths to the
// inputs. User can combine the content checksum of these paths to get a valid
// content based cache key.
ContentMask(context.Context) (digest.Digest, [][]string, error)
// Run runs an operation and returns the output references.
Run(ctx context.Context, inputs []Ref) (outputs []Ref, err error)
}
type SolveRequest struct {
Definition *pb.Definition
Frontend frontend.Frontend
Exporter exporter.ExporterInstance
FrontendOpt map[string]string
ExportCacheRef string
ImportCacheRef string
}
// Vertex is one node in the build graph
type Vertex interface {
// Digest is a content-addressable vertex identifier
Digest() digest.Digest
// Sys returns an internal value that is used to execute the vertex. Usually
// this is captured by the operation resolver method during solve.
Sys() interface{}
// FIXME(AkihiroSuda): we should not import pb pkg here.
Metadata() *pb.OpMetadata
// Array of vertexes current vertex depends on.
Inputs() []Input
Name() string // change this to general metadata
}
type Index int
// Input is a pointer to a single reference from a vertex by an index.
type Input struct {
Index Index
Vertex Vertex
}
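The Op interface above is intentionally small ("for ease of unit testing"). As a hedged illustration of the contract only, a trivial stub might look like the following; nopOp and its return values are assumptions, not part of the commit (in particular, returning an empty digest from ContentMask as "no content mask" is an assumption about how the solver interprets it).

package types_test

import (
	"github.com/moby/buildkit/solver/types"
	digest "github.com/opencontainers/go-digest"
	"golang.org/x/net/context"
)

// nopOp is a hypothetical Op: constant cache key, no content mask, and it
// passes its inputs straight through as outputs.
type nopOp struct{}

func (nopOp) CacheKey(ctx context.Context) (digest.Digest, error) {
	return digest.FromBytes([]byte("nop.v0")), nil
}

func (nopOp) ContentMask(ctx context.Context) (digest.Digest, [][]string, error) {
	return "", nil, nil // assumption: empty digest signals no content-based key
}

func (nopOp) Run(ctx context.Context, inputs []types.Ref) ([]types.Ref, error) {
	return inputs, nil
}

// compile-time check that nopOp satisfies types.Op
var _ types.Op = nopOp{}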


@ -12,28 +12,6 @@ import (
"golang.org/x/net/context"
)
// Vertex is one node in the build graph
type Vertex interface {
// Digest is a content-addressable vertex identifier
Digest() digest.Digest
// Sys returns an internal value that is used to execute the vertex. Usually
// this is capured by the operation resolver method during solve.
Sys() interface{}
// FIXME(AkihiroSuda): we should not import pb pkg here.
Metadata() *pb.OpMetadata
// Array of vertexes current vertex depends on.
Inputs() []Input
Name() string // change this to general metadata
}
type Index int
// Input is an pointer to a single reference from a vertex by an index.
type Input struct {
Index Index
Vertex Vertex
}
type input struct {
index Index
vertex *vertex

worker/base/worker.go (new file, 260 lines)

@ -0,0 +1,260 @@
package base
import (
"io"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/images"
ctdsnapshot "github.com/containerd/containerd/snapshots"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/cacheimport"
"github.com/moby/buildkit/cache/instructioncache"
localcache "github.com/moby/buildkit/cache/instructioncache/local"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/exporter"
imageexporter "github.com/moby/buildkit/exporter/containerimage"
localexporter "github.com/moby/buildkit/exporter/local"
ociexporter "github.com/moby/buildkit/exporter/oci"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot/blobmapping"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbop"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/source/containerimage"
"github.com/moby/buildkit/source/git"
"github.com/moby/buildkit/source/http"
"github.com/moby/buildkit/source/local"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// TODO: this file should be removed. containerd defines ContainerdWorker, oci defines OCIWorker. There is no base worker.
// WorkerOpt is specific to a worker.
// See also CommonOpt.
type WorkerOpt struct {
Name string
SessionManager *session.Manager
MetadataStore *metadata.Store
Executor executor.Executor
BaseSnapshotter ctdsnapshot.Snapshotter // not blobmapping one (FIXME: just require blobmapping snapshotter?)
ContentStore content.Store
Applier diff.Differ
Differ diff.Differ
ImageStore images.Store
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
// TODO: s/Worker/OpWorker/g ?
// FIXME: Worker should be rather an interface
type Worker struct {
WorkerOpt
Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter
CacheManager cache.Manager
SourceManager *source.Manager
cache instructioncache.InstructionCache
Exporters map[string]exporter.Exporter
ImageSource source.Source
CacheExporter *cacheimport.CacheExporter // TODO: remove
CacheImporter *cacheimport.CacheImporter // TODO: remove
// no frontend here
}
// NewWorker instantiates a local worker
func NewWorker(opt WorkerOpt) (*Worker, error) {
bmSnapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{
Content: opt.ContentStore,
Snapshotter: opt.BaseSnapshotter,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: bmSnapshotter,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
ic := &localcache.LocalStore{
MetadataStore: opt.MetadataStore,
Cache: cm,
}
sm, err := source.NewManager()
if err != nil {
return nil, err
}
is, err := containerimage.NewSource(containerimage.SourceOpt{
Snapshotter: bmSnapshotter,
ContentStore: opt.ContentStore,
SessionManager: opt.SessionManager,
Applier: opt.Applier,
CacheAccessor: cm,
})
if err != nil {
return nil, err
}
sm.Register(is)
gs, err := git.NewSource(git.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(gs)
hs, err := http.NewSource(http.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(hs)
ss, err := local.NewSource(local.Opt{
SessionManager: opt.SessionManager,
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(ss)
exporters := map[string]exporter.Exporter{}
iw, err := imageexporter.NewImageWriter(imageexporter.WriterOpt{
Snapshotter: bmSnapshotter,
ContentStore: opt.ContentStore,
Differ: opt.Differ,
})
if err != nil {
return nil, err
}
imageExporter, err := imageexporter.New(imageexporter.Opt{
Images: opt.ImageStore,
SessionManager: opt.SessionManager,
ImageWriter: iw,
})
if err != nil {
return nil, err
}
exporters[client.ExporterImage] = imageExporter
localExporter, err := localexporter.New(localexporter.Opt{
SessionManager: opt.SessionManager,
})
if err != nil {
return nil, err
}
exporters[client.ExporterLocal] = localExporter
ociExporter, err := ociexporter.New(ociexporter.Opt{
SessionManager: opt.SessionManager,
ImageWriter: iw,
})
if err != nil {
return nil, err
}
exporters[client.ExporterOCI] = ociExporter
ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{
Snapshotter: bmSnapshotter,
ContentStore: opt.ContentStore,
SessionManager: opt.SessionManager,
Differ: opt.Differ,
})
ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{
Snapshotter: bmSnapshotter,
ContentStore: opt.ContentStore,
Applier: opt.Applier,
CacheAccessor: cm,
SessionManager: opt.SessionManager,
})
return &Worker{
WorkerOpt: opt,
Snapshotter: bmSnapshotter,
CacheManager: cm,
SourceManager: sm,
cache: ic,
Exporters: exporters,
ImageSource: is,
CacheExporter: ce,
CacheImporter: ci,
}, nil
}
func (w *Worker) ResolveVertex(v solver.Vertex, s worker.SubBuilder) (solver.Op, error) {
switch op := v.Sys().(type) {
case *pb.Op_Source:
return llbop.NewSourceOp(v, op, w.SourceManager)
case *pb.Op_Exec:
return llbop.NewExecOp(v, op, w.CacheManager, w.Executor)
case *pb.Op_Build:
return llbop.NewBuildOp(v, op, s)
default:
return nil, errors.Errorf("could not resolve %v", v)
}
}
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) {
// ImageSource is typically source/containerimage
resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
if !ok {
return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.Name())
}
return resolveImageConfig.ResolveImageConfig(ctx, ref)
}
type resolveImageConfig interface {
ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error)
}
func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
active, err := w.CacheManager.New(ctx, rootFS)
if err != nil {
return err
}
defer active.Release(context.TODO())
return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
}
func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
return w.CacheManager.DiskUsage(ctx, opt)
}
func (w *Worker) Name() string {
return w.WorkerOpt.Name
}
func (w *Worker) Exporter(name string) (exporter.Exporter, error) {
exp, ok := w.Exporters[name]
if !ok {
return nil, errors.Errorf("exporter %q could not be found", name)
}
return exp, nil
}
func (w *Worker) InstructionCache() instructioncache.InstructionCache {
return w.cache
}
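Since base.Worker is expected to satisfy the worker.Worker interface introduced later in this diff, a compile-time assertion is a cheap way to keep the two in sync. This is a suggested sketch, not code from the commit.

package base

import "github.com/moby/buildkit/worker"

// Illustrative compile-time check: *Worker must implement worker.Worker.
var _ worker.Worker = (*Worker)(nil)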


@ -11,7 +11,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/executor/containerdexecutor"
"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/base"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
@ -19,32 +19,32 @@ import (
// NewWorkerOpt creates a WorkerOpt.
// But it does not set the following fields:
// - SessionManager
func NewWorkerOpt(root string, address, snapshotterName string, opts ...containerd.ClientOpt) (worker.WorkerOpt, error) {
func NewWorkerOpt(root string, address, snapshotterName string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
// TODO: take lock to make sure there are no duplicates
opts = append([]containerd.ClientOpt{containerd.WithDefaultNamespace("buildkit")}, opts...)
client, err := containerd.New(address, opts...)
if err != nil {
return worker.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address)
return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address)
}
return newContainerd(root, client, snapshotterName)
}
func newContainerd(root string, client *containerd.Client, snapshotterName string) (worker.WorkerOpt, error) {
func newContainerd(root string, client *containerd.Client, snapshotterName string) (base.WorkerOpt, error) {
if strings.Contains(snapshotterName, "/") {
return worker.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName)
return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName)
}
name := "containerd-" + snapshotterName
root = filepath.Join(root, name)
if err := os.MkdirAll(root, 0700); err != nil {
return worker.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root)
return base.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root)
}
md, err := metadata.NewStore(filepath.Join(root, "metadata.db"))
if err != nil {
return worker.WorkerOpt{}, err
return base.WorkerOpt{}, err
}
df := client.DiffService()
opt := worker.WorkerOpt{
opt := base.WorkerOpt{
Name: name,
MetadataStore: md,
Executor: containerdexecutor.New(client, root),


@ -16,15 +16,15 @@ import (
"github.com/containerd/containerd/snapshots/overlay"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/executor/runcexecutor"
"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/base"
"github.com/opencontainers/go-digest"
)
// NewWorkerOpt creates a WorkerOpt.
// But it does not set the following fields:
// - SessionManager
func NewWorkerOpt(root string) (worker.WorkerOpt, error) {
var opt worker.WorkerOpt
func NewWorkerOpt(root string) (base.WorkerOpt, error) {
var opt base.WorkerOpt
name := "runc-overlay"
root = filepath.Join(root, name)
if err := os.MkdirAll(root, 0700); err != nil {
@ -68,7 +68,7 @@ func NewWorkerOpt(root string) (worker.WorkerOpt, error) {
// TODO: call mdb.GarbageCollect . maybe just inject it into nsSnapshotter.Remove and csContent.Delete
opt = worker.WorkerOpt{
opt = base.WorkerOpt{
Name: name,
MetadataStore: md,
Executor: exe,


@ -17,7 +17,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/base"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
@ -44,7 +44,7 @@ func TestRuncWorker(t *testing.T) {
workerOpt.SessionManager, err = session.NewManager()
require.NoError(t, err)
w, err := worker.NewWorker(workerOpt)
w, err := base.NewWorker(workerOpt)
require.NoError(t, err)
img, err := source.NewImageIdentifier("docker.io/library/busybox:latest")


@ -1,194 +1,30 @@
package worker
import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/images"
ctdsnapshot "github.com/containerd/containerd/snapshots"
"io"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/cacheimport"
"github.com/moby/buildkit/cache/instructioncache"
localcache "github.com/moby/buildkit/cache/instructioncache/local"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/exporter"
imageexporter "github.com/moby/buildkit/exporter/containerimage"
localexporter "github.com/moby/buildkit/exporter/local"
ociexporter "github.com/moby/buildkit/exporter/oci"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot/blobmapping"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/source/containerimage"
"github.com/moby/buildkit/source/git"
"github.com/moby/buildkit/source/http"
"github.com/moby/buildkit/source/local"
"github.com/moby/buildkit/solver/types"
digest "github.com/opencontainers/go-digest"
"golang.org/x/net/context"
)
// WorkerOpt is specific to a worker.
// See also CommonOpt.
type WorkerOpt struct {
Name string
SessionManager *session.Manager
MetadataStore *metadata.Store
Executor executor.Executor
BaseSnapshotter ctdsnapshot.Snapshotter // not blobmapping one (FIXME: just require blobmapping snapshotter?)
ContentStore content.Store
Applier diff.Differ
Differ diff.Differ
ImageStore images.Store
type SubBuilder interface {
SubBuild(ctx context.Context, dgst digest.Digest, req types.SolveRequest) (types.Ref, error)
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
// TODO: s/Worker/OpWorker/g ?
// FIXME: Worker should be rather an interface
type Worker struct {
WorkerOpt
Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter
CacheManager cache.Manager
SourceManager *source.Manager
InstructionCache instructioncache.InstructionCache
Exporters map[string]exporter.Exporter
ImageSource source.Source
CacheExporter *cacheimport.CacheExporter
CacheImporter *cacheimport.CacheImporter
// no frontend here
}
// NewWorker instantiates a local worker
func NewWorker(opt WorkerOpt) (*Worker, error) {
bmSnapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{
Content: opt.ContentStore,
Snapshotter: opt.BaseSnapshotter,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: bmSnapshotter,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
ic := &localcache.LocalStore{
MetadataStore: opt.MetadataStore,
Cache: cm,
}
sm, err := source.NewManager()
if err != nil {
return nil, err
}
is, err := containerimage.NewSource(containerimage.SourceOpt{
Snapshotter: bmSnapshotter,
ContentStore: opt.ContentStore,
SessionManager: opt.SessionManager,
Applier: opt.Applier,
CacheAccessor: cm,
})
if err != nil {
return nil, err
}
sm.Register(is)
gs, err := git.NewSource(git.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(gs)
hs, err := http.NewSource(http.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(hs)
ss, err := local.NewSource(local.Opt{
SessionManager: opt.SessionManager,
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(ss)
exporters := map[string]exporter.Exporter{}
iw, err := imageexporter.NewImageWriter(imageexporter.WriterOpt{
Snapshotter: bmSnapshotter,
ContentStore: opt.ContentStore,
Differ: opt.Differ,
})
if err != nil {
return nil, err
}
imageExporter, err := imageexporter.New(imageexporter.Opt{
Images: opt.ImageStore,
SessionManager: opt.SessionManager,
ImageWriter: iw,
})
if err != nil {
return nil, err
}
exporters[client.ExporterImage] = imageExporter
localExporter, err := localexporter.New(localexporter.Opt{
SessionManager: opt.SessionManager,
})
if err != nil {
return nil, err
}
exporters[client.ExporterLocal] = localExporter
ociExporter, err := ociexporter.New(ociexporter.Opt{
SessionManager: opt.SessionManager,
ImageWriter: iw,
})
if err != nil {
return nil, err
}
exporters[client.ExporterOCI] = ociExporter
ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{
Snapshotter: bmSnapshotter,
ContentStore: opt.ContentStore,
SessionManager: opt.SessionManager,
Differ: opt.Differ,
})
ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{
Snapshotter: bmSnapshotter,
ContentStore: opt.ContentStore,
Applier: opt.Applier,
CacheAccessor: cm,
SessionManager: opt.SessionManager,
})
return &Worker{
WorkerOpt: opt,
Snapshotter: bmSnapshotter,
CacheManager: cm,
SourceManager: sm,
InstructionCache: ic,
Exporters: exporters,
ImageSource: is,
CacheExporter: ce,
CacheImporter: ci,
}, nil
type Worker interface {
InstructionCache() instructioncache.InstructionCache
// ResolveVertex resolves Vertex.Sys() to Op implementation. SubBuilder is needed for pb.Op_Build.
ResolveVertex(v types.Vertex, s SubBuilder) (types.Op, error)
ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error)
// Exec is similar to executor.Exec but without []mount.Mount
Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error)
Name() string
Exporter(name string) (exporter.Exporter, error)
}
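For tests or experiments against the new interface, a stub worker only has to provide these seven methods. The sketch below is illustrative only; fakeWorker is hypothetical and not part of the commit, and every method either no-ops or fails loudly so unexpected calls surface quickly.

package worker_test

import (
	"io"

	"github.com/moby/buildkit/cache"
	"github.com/moby/buildkit/cache/instructioncache"
	"github.com/moby/buildkit/client"
	"github.com/moby/buildkit/executor"
	"github.com/moby/buildkit/exporter"
	"github.com/moby/buildkit/solver/types"
	"github.com/moby/buildkit/worker"
	digest "github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)

// fakeWorker is a hypothetical stub showing the shape of the Worker interface.
type fakeWorker struct{ name string }

func (w *fakeWorker) InstructionCache() instructioncache.InstructionCache { return nil }

func (w *fakeWorker) ResolveVertex(v types.Vertex, s worker.SubBuilder) (types.Op, error) {
	return nil, errors.Errorf("fakeWorker: ResolveVertex not implemented")
}

func (w *fakeWorker) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) {
	return "", nil, errors.Errorf("fakeWorker: ResolveImageConfig not implemented")
}

func (w *fakeWorker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
	return errors.Errorf("fakeWorker: Exec not implemented")
}

func (w *fakeWorker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
	return nil, nil
}

func (w *fakeWorker) Name() string { return w.name }

func (w *fakeWorker) Exporter(name string) (exporter.Exporter, error) {
	return nil, errors.Errorf("exporter %q could not be found", name)
}

// compile-time check that fakeWorker satisfies worker.Worker
var _ worker.Worker = &fakeWorker{}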


@ -11,11 +11,11 @@ import (
type Controller struct {
mu sync.Mutex
// TODO: define worker interface and support remote ones
workers []*Worker
workers []Worker
}
// Add adds a local worker
func (c *Controller) Add(w *Worker) error {
func (c *Controller) Add(w Worker) error {
c.mu.Lock()
c.workers = append(c.workers, w)
c.mu.Unlock()
@ -23,7 +23,7 @@ func (c *Controller) Add(w *Worker) error {
}
// GetAll returns all local workers
func (c *Controller) GetAll() []*Worker {
func (c *Controller) GetAll() []Worker {
c.mu.Lock()
workers := c.workers
c.mu.Unlock()
@ -31,8 +31,8 @@ func (c *Controller) GetAll() []*Worker {
}
// GetDefault returns the default local worker
func (c *Controller) GetDefault() (*Worker, error) {
var w *Worker
func (c *Controller) GetDefault() (Worker, error) {
var w Worker
c.mu.Lock()
if len(c.workers) > 0 {
w = c.workers[0]