worker, solver: update interfaces
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
a4316d16c7
commit
efde4f2340
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/containerd/containerd/sys"
|
||||
"github.com/docker/go-connections/sockets"
|
||||
"github.com/moby/buildkit/cache/cacheimport"
|
||||
"github.com/moby/buildkit/control"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
"github.com/moby/buildkit/frontend/dockerfile"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
"github.com/moby/buildkit/util/appdefaults"
|
||||
"github.com/moby/buildkit/util/profiler"
|
||||
"github.com/moby/buildkit/worker"
|
||||
"github.com/moby/buildkit/worker/base"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli"
|
||||
|
@ -33,7 +35,7 @@ type workerInitializerOpt struct {
|
|||
}
|
||||
|
||||
type workerInitializer struct {
|
||||
fn func(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error)
|
||||
fn func(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error)
|
||||
// less priority number, more preferred
|
||||
priority int
|
||||
}
|
||||
|
@ -227,10 +229,28 @@ func newController(c *cli.Context, root string) (*control.Controller, error) {
|
|||
frontends := map[string]frontend.Frontend{}
|
||||
frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend()
|
||||
frontends["gateway.v0"] = gateway.NewGatewayFrontend()
|
||||
|
||||
// cache exporter and importer are manager concepts but as there is no
|
||||
// way to pull data into specific worker yet we currently set them up
|
||||
// as part of default worker
|
||||
var ce *cacheimport.CacheExporter
|
||||
var ci *cacheimport.CacheImporter
|
||||
|
||||
w, err := wc.GetDefault()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wt := w.(*base.Worker)
|
||||
ce = wt.CacheExporter
|
||||
ci = wt.CacheImporter
|
||||
|
||||
return control.NewController(control.Opt{
|
||||
SessionManager: sessionManager,
|
||||
WorkerController: wc,
|
||||
Frontends: frontends,
|
||||
CacheExporter: ce,
|
||||
CacheImporter: ci,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -242,7 +262,7 @@ func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Co
|
|||
return nil, err
|
||||
}
|
||||
for _, w := range ws {
|
||||
logrus.Infof("Found worker %q", w.Name)
|
||||
logrus.Infof("found worker %q", w.Name())
|
||||
if err = wc.Add(w); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -256,7 +276,7 @@ func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Co
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logrus.Infof("Found %d workers, default=%q", nWorkers, defaultWorker.Name)
|
||||
logrus.Warn("Currently, only the default worker can be used.")
|
||||
logrus.Infof("found %d workers, default=%q", nWorkers, defaultWorker.Name)
|
||||
logrus.Warn("currently, only the default worker can be used.")
|
||||
return wc, nil
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
ctd "github.com/containerd/containerd"
|
||||
"github.com/moby/buildkit/worker"
|
||||
"github.com/moby/buildkit/worker/base"
|
||||
"github.com/moby/buildkit/worker/containerd"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli"
|
||||
|
@ -33,7 +34,7 @@ func init() {
|
|||
// TODO(AkihiroSuda): allow using multiple snapshotters. should be useful for some applications that does not work with the default overlay snapshotter. e.g. mysql (docker/for-linux#72)",
|
||||
}
|
||||
|
||||
func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) {
|
||||
func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) {
|
||||
socket := c.GlobalString("containerd-worker-addr")
|
||||
boolOrAuto, err := parseBoolOrAuto(c.GlobalString("containerd-worker"))
|
||||
if err != nil {
|
||||
|
@ -47,11 +48,11 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
|
|||
return nil, err
|
||||
}
|
||||
opt.SessionManager = common.sessionManager
|
||||
w, err := worker.NewWorker(opt)
|
||||
w, err := base.NewWorker(opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []*worker.Worker{w}, nil
|
||||
return []worker.Worker{w}, nil
|
||||
}
|
||||
|
||||
func validContainerdSocket(socket string) bool {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"os/exec"
|
||||
|
||||
"github.com/moby/buildkit/worker"
|
||||
"github.com/moby/buildkit/worker/base"
|
||||
"github.com/moby/buildkit/worker/runc"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli"
|
||||
|
@ -25,7 +26,7 @@ func init() {
|
|||
// TODO: allow multiple oci runtimes and snapshotters
|
||||
}
|
||||
|
||||
func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) {
|
||||
func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) {
|
||||
boolOrAuto, err := parseBoolOrAuto(c.GlobalString("oci-worker"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -38,11 +39,11 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worke
|
|||
return nil, err
|
||||
}
|
||||
opt.SessionManager = common.sessionManager
|
||||
w, err := worker.NewWorker(opt)
|
||||
w, err := base.NewWorker(opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []*worker.Worker{w}, nil
|
||||
return []worker.Worker{w}, nil
|
||||
}
|
||||
|
||||
func validOCIBinary() bool {
|
||||
|
|
|
@ -3,13 +3,13 @@ package control
|
|||
import (
|
||||
"github.com/docker/distribution/reference"
|
||||
controlapi "github.com/moby/buildkit/api/services/control"
|
||||
"github.com/moby/buildkit/cache/cacheimport"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/exporter"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/session/grpchijack"
|
||||
"github.com/moby/buildkit/solver"
|
||||
solverimpl "github.com/moby/buildkit/solver/solver"
|
||||
"github.com/moby/buildkit/worker"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -22,17 +22,24 @@ type Opt struct {
|
|||
SessionManager *session.Manager
|
||||
WorkerController *worker.Controller
|
||||
Frontends map[string]frontend.Frontend
|
||||
CacheExporter *cacheimport.CacheExporter
|
||||
CacheImporter *cacheimport.CacheImporter
|
||||
}
|
||||
|
||||
type Controller struct { // TODO: ControlService
|
||||
opt Opt
|
||||
solver solver.Solver
|
||||
solver *solver.Solver
|
||||
}
|
||||
|
||||
func NewController(opt Opt) (*Controller, error) {
|
||||
c := &Controller{
|
||||
opt: opt,
|
||||
solver: solverimpl.NewLLBOpSolver(opt.WorkerController, opt.Frontends),
|
||||
opt: opt,
|
||||
solver: solver.NewLLBOpSolver(solver.LLBOpt{
|
||||
WorkerController: opt.WorkerController,
|
||||
Frontends: opt.Frontends,
|
||||
CacheExporter: opt.CacheExporter,
|
||||
CacheImporter: opt.CacheImporter,
|
||||
}),
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
@ -45,7 +52,7 @@ func (c *Controller) Register(server *grpc.Server) error {
|
|||
func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) {
|
||||
resp := &controlapi.DiskUsageResponse{}
|
||||
for _, w := range c.opt.WorkerController.GetAll() {
|
||||
du, err := w.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{
|
||||
du, err := w.DiskUsage(ctx, client.DiskUsageInfo{
|
||||
Filter: r.Filter,
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -90,9 +97,9 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
|
|||
return nil, err
|
||||
}
|
||||
if req.Exporter != "" {
|
||||
exp, ok := w.Exporters[req.Exporter]
|
||||
if !ok {
|
||||
return nil, errors.Errorf("exporter %q could not be found", req.Exporter)
|
||||
exp, err := w.Exporter(req.Exporter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
expi, err = exp.Resolve(ctx, req.ExporterAttrs)
|
||||
if err != nil {
|
||||
|
|
|
@ -8,9 +8,7 @@ import (
|
|||
"github.com/moby/buildkit/cache/instructioncache"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/solver/reference"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -163,18 +161,18 @@ type job struct {
|
|||
|
||||
type cacheRecord struct {
|
||||
VertexSolver
|
||||
index solver.Index
|
||||
ref solver.Ref
|
||||
index Index
|
||||
ref Ref
|
||||
}
|
||||
|
||||
func (j *job) load(def *pb.Definition, resolveOp ResolveOpFunc) (*solver.Input, error) {
|
||||
func (j *job) load(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error) {
|
||||
j.l.mu.Lock()
|
||||
defer j.l.mu.Unlock()
|
||||
|
||||
return j.loadInternal(def, resolveOp)
|
||||
}
|
||||
|
||||
func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*solver.Input, error) {
|
||||
func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error) {
|
||||
vtx, idx, err := loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (interface{}, error)) (interface{}, error) {
|
||||
if st, ok := j.l.actives[dgst]; ok {
|
||||
if vtx, ok := st.jobs[j]; ok {
|
||||
|
@ -206,7 +204,7 @@ func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*solver
|
|||
}
|
||||
for i, input := range pbOp.Inputs {
|
||||
if inputMetadata := def.Metadata[input.Digest]; inputMetadata.IgnoreCache {
|
||||
k, err := s.CacheKey(ctx, solver.Index(i))
|
||||
k, err := s.CacheKey(ctx, Index(i))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -227,7 +225,7 @@ func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*solver
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &solver.Input{Vertex: vtx.(*vertex), Index: solver.Index(idx)}, nil
|
||||
return &Input{Vertex: vtx.(*vertex), Index: Index(idx)}, nil
|
||||
}
|
||||
|
||||
func (j *job) discard() {
|
||||
|
@ -255,7 +253,7 @@ func (j *job) getSolver(dgst digest.Digest) (VertexSolver, error) {
|
|||
return st.solver, nil
|
||||
}
|
||||
|
||||
func (j *job) getRef(ctx context.Context, cv client.Vertex, index solver.Index) (solver.Ref, error) {
|
||||
func (j *job) getRef(ctx context.Context, cv client.Vertex, index Index) (Ref, error) {
|
||||
s, err := j.getSolver(cv.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -268,15 +266,15 @@ func (j *job) getRef(ctx context.Context, cv client.Vertex, index solver.Index)
|
|||
return ref, nil
|
||||
}
|
||||
|
||||
func (j *job) keepCacheRef(s VertexSolver, index solver.Index, ref solver.Ref) {
|
||||
immutable, ok := reference.ToImmutableRef(ref)
|
||||
func (j *job) keepCacheRef(s VertexSolver, index Index, ref Ref) {
|
||||
immutable, ok := ToImmutableRef(ref)
|
||||
if ok {
|
||||
j.cached[immutable.ID()] = &cacheRecord{s, index, ref}
|
||||
}
|
||||
}
|
||||
|
||||
func (j *job) cacheExporter(ref solver.Ref) (CacheExporter, error) {
|
||||
immutable, ok := reference.ToImmutableRef(ref)
|
||||
func (j *job) cacheExporter(ref Ref) (CacheExporter, error) {
|
||||
immutable, ok := ToImmutableRef(ref)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid reference")
|
||||
}
|
||||
|
@ -287,7 +285,7 @@ func (j *job) cacheExporter(ref solver.Ref) (CacheExporter, error) {
|
|||
return cr.Cache(cr.index, cr.ref), nil
|
||||
}
|
||||
|
||||
func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index solver.Index, cache instructioncache.InstructionCache) (solver.Ref, error) {
|
||||
func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index Index, cache instructioncache.InstructionCache) (Ref, error) {
|
||||
k, err := s.CacheKey(ctx, index)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -298,7 +296,7 @@ func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index solver.
|
|||
}
|
||||
if ref != nil {
|
||||
markCached(ctx, cv)
|
||||
return ref.(solver.Ref), nil
|
||||
return ref.(Ref), nil
|
||||
}
|
||||
|
||||
ev, err := s.OutputEvaluator(index)
|
||||
|
@ -319,7 +317,7 @@ func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index solver.
|
|||
}
|
||||
if ref != nil {
|
||||
markCached(ctx, cv)
|
||||
return ref.(solver.Ref), nil
|
||||
return ref.(Ref), nil
|
||||
}
|
||||
continue
|
||||
}
|
|
@ -1,13 +1,8 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/executor"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
solver "github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/reference"
|
||||
"github.com/moby/buildkit/worker"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -19,7 +14,7 @@ type llbBridge struct {
|
|||
*Solver
|
||||
job *job
|
||||
// this worker is used for running containerized frontend, not vertices
|
||||
worker *worker.Worker
|
||||
worker.Worker
|
||||
}
|
||||
|
||||
type resolveImageConfig interface {
|
||||
|
@ -39,7 +34,7 @@ func (s *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (cache
|
|||
return nil, nil, nil
|
||||
}
|
||||
}
|
||||
ref, exp, err := s.solve(ctx, s.job, solver.SolveRequest{
|
||||
ref, exp, err := s.solve(ctx, s.job, SolveRequest{
|
||||
Definition: req.Definition,
|
||||
Frontend: f,
|
||||
FrontendOpt: req.FrontendOpt,
|
||||
|
@ -47,27 +42,9 @@ func (s *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (cache
|
|||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
immutable, ok := reference.ToImmutableRef(ref)
|
||||
immutable, ok := ToImmutableRef(ref)
|
||||
if !ok {
|
||||
return nil, nil, errors.Errorf("invalid reference for exporting: %T", ref)
|
||||
}
|
||||
return immutable, exp, nil
|
||||
}
|
||||
|
||||
func (s *llbBridge) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) {
|
||||
// ImageSource is typically source/containerimage
|
||||
resolveImageConfig, ok := s.worker.ImageSource.(resolveImageConfig)
|
||||
if !ok {
|
||||
return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", s.worker.Name)
|
||||
}
|
||||
return resolveImageConfig.ResolveImageConfig(ctx, ref)
|
||||
}
|
||||
|
||||
func (s *llbBridge) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
|
||||
active, err := s.worker.CacheManager.New(ctx, rootFS)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer active.Release(context.TODO())
|
||||
return s.worker.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
|
||||
}
|
|
@ -7,27 +7,23 @@ import (
|
|||
"github.com/containerd/containerd/fs"
|
||||
"github.com/moby/buildkit/client/llb"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
solver "github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/solver/reference"
|
||||
"github.com/moby/buildkit/worker"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type SubBuilder interface {
|
||||
SubBuild(ctx context.Context, dgst digest.Digest, req solver.SolveRequest) (solver.Ref, error)
|
||||
}
|
||||
|
||||
const buildCacheType = "buildkit.build.v0"
|
||||
|
||||
type buildOp struct {
|
||||
op *pb.BuildOp
|
||||
s SubBuilder
|
||||
s worker.SubBuilder
|
||||
v solver.Vertex
|
||||
}
|
||||
|
||||
func NewBuildOp(v solver.Vertex, op *pb.Op_Build, s SubBuilder) (solver.Op, error) {
|
||||
func NewBuildOp(v solver.Vertex, op *pb.Op_Build, s worker.SubBuilder) (solver.Op, error) {
|
||||
return &buildOp{
|
||||
op: op.Build,
|
||||
s: s,
|
||||
|
@ -66,7 +62,7 @@ func (b *buildOp) Run(ctx context.Context, inputs []solver.Ref) (outputs []solve
|
|||
}
|
||||
inp := inputs[i]
|
||||
|
||||
ref, ok := reference.ToImmutableRef(inp)
|
||||
ref, ok := solver.ToImmutableRef(inp)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid reference for build %T", inp)
|
||||
}
|
|
@ -12,7 +12,6 @@ import (
|
|||
"github.com/moby/buildkit/executor"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/solver/reference"
|
||||
"github.com/moby/buildkit/util/progress/logs"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -78,7 +77,7 @@ func (e *execOp) Run(ctx context.Context, inputs []solver.Ref) ([]solver.Ref, er
|
|||
}
|
||||
inp := inputs[int(m.Input)]
|
||||
var ok bool
|
||||
ref, ok = reference.ToImmutableRef(inp)
|
||||
ref, ok = solver.ToImmutableRef(inp)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid reference for exec %T", inputs[int(m.Input)])
|
||||
}
|
||||
|
@ -86,7 +85,7 @@ func (e *execOp) Run(ctx context.Context, inputs []solver.Ref) ([]solver.Ref, er
|
|||
}
|
||||
if m.Output != pb.SkipOutput {
|
||||
if m.Readonly && ref != nil && m.Dest != pb.RootMount { // exclude read-only rootfs
|
||||
outputs = append(outputs, reference.NewSharedRef(ref).Clone())
|
||||
outputs = append(outputs, solver.NewSharedRef(ref).Clone())
|
||||
} else {
|
||||
active, err := e.cm.New(ctx, ref, cache.WithDescription(fmt.Sprintf("mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")))) // TODO: should be method
|
||||
if err != nil {
|
|
@ -0,0 +1,35 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"github.com/moby/buildkit/cache/cacheimport"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
"github.com/moby/buildkit/worker"
|
||||
)
|
||||
|
||||
// DetermineVertexWorker determines worker for a vertex.
|
||||
// Currently, constraint is just ignored.
|
||||
// Also we need to track the workers of the inputs.
|
||||
func determineVertexWorker(wc *worker.Controller, v Vertex) (worker.Worker, error) {
|
||||
// TODO: multiworker
|
||||
return wc.GetDefault()
|
||||
}
|
||||
|
||||
type LLBOpt struct {
|
||||
WorkerController *worker.Controller
|
||||
Frontends map[string]frontend.Frontend // used by nested invocations
|
||||
CacheExporter *cacheimport.CacheExporter
|
||||
CacheImporter *cacheimport.CacheImporter
|
||||
}
|
||||
|
||||
func NewLLBOpSolver(opt LLBOpt) *Solver {
|
||||
var s *Solver
|
||||
s = New(func(v Vertex) (Op, error) {
|
||||
// TODO: in reality, worker should be determined already and passed into this function(or this function would be removed)
|
||||
w, err := determineVertexWorker(opt.WorkerController, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return w.Resolve(v, s)
|
||||
}, opt.WorkerController, determineVertexWorker, opt.Frontends, opt.CacheExporter, opt.CacheImporter)
|
||||
return s
|
||||
}
|
|
@ -3,7 +3,6 @@ package solver
|
|||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/source"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
|
@ -17,18 +16,18 @@ func newVertex(dgst digest.Digest, op *pb.Op, opMeta *pb.OpMetadata, load func(d
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vtx.inputs = append(vtx.inputs, &input{index: solver.Index(in.Index), vertex: sub.(*vertex)})
|
||||
vtx.inputs = append(vtx.inputs, &input{index: Index(in.Index), vertex: sub.(*vertex)})
|
||||
}
|
||||
vtx.initClientVertex()
|
||||
return vtx, nil
|
||||
}
|
||||
|
||||
func toInternalVertex(v solver.Vertex) *vertex {
|
||||
func toInternalVertex(v Vertex) *vertex {
|
||||
cache := make(map[digest.Digest]*vertex)
|
||||
return loadInternalVertexHelper(v, cache)
|
||||
}
|
||||
|
||||
func loadInternalVertexHelper(v solver.Vertex, cache map[digest.Digest]*vertex) *vertex {
|
||||
func loadInternalVertexHelper(v Vertex, cache map[digest.Digest]*vertex) *vertex {
|
||||
if v, ok := cache[v.Digest()]; ok {
|
||||
return v
|
||||
}
|
|
@ -1,10 +1,9 @@
|
|||
package reference
|
||||
package solver
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -13,11 +12,11 @@ import (
|
|||
type SharedRef struct {
|
||||
mu sync.Mutex
|
||||
refs map[*sharedRefInstance]struct{}
|
||||
main solver.Ref
|
||||
solver.Ref
|
||||
main Ref
|
||||
Ref
|
||||
}
|
||||
|
||||
func NewSharedRef(main solver.Ref) *SharedRef {
|
||||
func NewSharedRef(main Ref) *SharedRef {
|
||||
mr := &SharedRef{
|
||||
refs: make(map[*sharedRefInstance]struct{}),
|
||||
Ref: main,
|
||||
|
@ -26,7 +25,7 @@ func NewSharedRef(main solver.Ref) *SharedRef {
|
|||
return mr
|
||||
}
|
||||
|
||||
func (mr *SharedRef) Clone() solver.Ref {
|
||||
func (mr *SharedRef) Clone() Ref {
|
||||
mr.mu.Lock()
|
||||
r := &sharedRefInstance{SharedRef: mr}
|
||||
mr.refs[r] = struct{}{}
|
||||
|
@ -38,10 +37,10 @@ func (mr *SharedRef) Release(ctx context.Context) error {
|
|||
return mr.main.Release(ctx)
|
||||
}
|
||||
|
||||
func (mr *SharedRef) Sys() solver.Ref {
|
||||
func (mr *SharedRef) Sys() Ref {
|
||||
sys := mr.Ref
|
||||
if s, ok := sys.(interface {
|
||||
Sys() solver.Ref
|
||||
Sys() Ref
|
||||
}); ok {
|
||||
return s.Sys()
|
||||
}
|
||||
|
@ -62,17 +61,17 @@ func (r *sharedRefInstance) Release(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func OriginRef(ref solver.Ref) solver.Ref {
|
||||
func OriginRef(ref Ref) Ref {
|
||||
sysRef := ref
|
||||
if sys, ok := ref.(interface {
|
||||
Sys() solver.Ref
|
||||
Sys() Ref
|
||||
}); ok {
|
||||
sysRef = sys.Sys()
|
||||
}
|
||||
return sysRef
|
||||
}
|
||||
|
||||
func ToImmutableRef(ref solver.Ref) (cache.ImmutableRef, bool) {
|
||||
func ToImmutableRef(ref Ref) (cache.ImmutableRef, bool) {
|
||||
immutable, ok := OriginRef(ref).(cache.ImmutableRef)
|
||||
if !ok {
|
||||
return nil, false
|
740
solver/solver.go
740
solver/solver.go
|
@ -1,43 +1,729 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/cacheimport"
|
||||
"github.com/moby/buildkit/cache/contenthash"
|
||||
"github.com/moby/buildkit/cache/instructioncache"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/exporter"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/util/bgfunc"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
"github.com/moby/buildkit/worker"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// Ref is a reference to the object passed through the build steps.
|
||||
// This interface is a subset of the github.com/buildkit/buildkit/cache.Ref interface.
|
||||
// For ease of unit testing, this interface only has Release().
|
||||
type Ref interface {
|
||||
Release(context.Context) error
|
||||
// FIXME: Also we need to track the workers of the inputs.
|
||||
// TODO: REMOVE
|
||||
type VertexWorkerDeterminer func(wc *worker.Controller, v Vertex) (worker.Worker, error)
|
||||
|
||||
// ResolveOpFunc finds an Op implementation for a vertex
|
||||
type ResolveOpFunc func(Vertex) (Op, error)
|
||||
|
||||
type Solver struct {
|
||||
resolve ResolveOpFunc
|
||||
jobs *jobList
|
||||
workerController *worker.Controller
|
||||
determineVertexWorker VertexWorkerDeterminer
|
||||
frontends map[string]frontend.Frontend
|
||||
ce *cacheimport.CacheExporter
|
||||
ci *cacheimport.CacheImporter
|
||||
}
|
||||
|
||||
// Op is an implementation for running a vertex
|
||||
type Op interface {
|
||||
// CacheKey returns a persistent cache key for operation.
|
||||
CacheKey(context.Context) (digest.Digest, error)
|
||||
// ContentMask returns a partial cache checksum with content paths to the
|
||||
// inputs. User can combine the content checksum of these paths to get a valid
|
||||
// content based cache key.
|
||||
ContentMask(context.Context) (digest.Digest, [][]string, error)
|
||||
// Run runs an operation and returns the output references.
|
||||
Run(ctx context.Context, inputs []Ref) (outputs []Ref, err error)
|
||||
func New(resolve ResolveOpFunc, wc *worker.Controller, vwd VertexWorkerDeterminer, f map[string]frontend.Frontend, ce *cacheimport.CacheExporter, ci *cacheimport.CacheImporter) *Solver {
|
||||
return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, determineVertexWorker: vwd, frontends: f, ce: ce, ci: ci}
|
||||
}
|
||||
|
||||
type SolveRequest struct {
|
||||
Definition *pb.Definition
|
||||
Frontend frontend.Frontend
|
||||
Exporter exporter.ExporterInstance
|
||||
FrontendOpt map[string]string
|
||||
ExportCacheRef string
|
||||
ImportCacheRef string
|
||||
func (s *Solver) solve(ctx context.Context, j *job, req SolveRequest) (Ref, map[string][]byte, error) {
|
||||
if req.Definition == nil {
|
||||
if req.Frontend == nil {
|
||||
return nil, nil, errors.Errorf("invalid request: no definition nor frontend")
|
||||
}
|
||||
return req.Frontend.Solve(ctx, s.llbBridge(j), req.FrontendOpt)
|
||||
}
|
||||
|
||||
inp, err := j.load(req.Definition, s.resolve)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
ref, err := j.getRef(ctx, inp.Vertex.(*vertex).clientVertex, inp.Index)
|
||||
return ref, nil, err
|
||||
}
|
||||
|
||||
type Solver interface {
|
||||
Solve(ctx context.Context, id string, req SolveRequest) error
|
||||
Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error
|
||||
func (s *Solver) llbBridge(j *job) *llbBridge {
|
||||
// FIXME(AkihiroSuda): make sure worker implements interfaces required by llbBridge
|
||||
worker, err := s.workerController.GetDefault()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &llbBridge{job: j, Solver: s, Worker: worker}
|
||||
}
|
||||
|
||||
func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
pr, ctx, closeProgressWriter := progress.NewContext(ctx)
|
||||
defer closeProgressWriter()
|
||||
|
||||
// TODO: multiworker. This should take union cache of all workers
|
||||
defaultWorker, err := s.workerController.GetDefault()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mainCache := defaultWorker.InstructionCache()
|
||||
if importRef := req.ImportCacheRef; importRef != "" {
|
||||
cache, err := s.ci.Import(ctx, importRef)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mainCache = instructioncache.Union(mainCache, cache)
|
||||
}
|
||||
|
||||
// register a build job. vertex needs to be loaded to a job to run
|
||||
ctx, j, err := s.jobs.new(ctx, id, pr, mainCache)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ref, exporterOpt, err := s.solve(ctx, j, req)
|
||||
defer j.discard()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if ref != nil {
|
||||
go ref.Release(context.TODO())
|
||||
}
|
||||
}()
|
||||
|
||||
var immutable cache.ImmutableRef
|
||||
if ref != nil {
|
||||
var ok bool
|
||||
immutable, ok = ToImmutableRef(ref)
|
||||
if !ok {
|
||||
return errors.Errorf("invalid reference for exporting: %T", ref)
|
||||
}
|
||||
if err := immutable.Finalize(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if exp := req.Exporter; exp != nil {
|
||||
if err := inVertexContext(ctx, exp.Name(), func(ctx context.Context) error {
|
||||
return exp.Export(ctx, immutable, exporterOpt)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if exportName := req.ExportCacheRef; exportName != "" {
|
||||
if err := inVertexContext(ctx, "exporting build cache", func(ctx context.Context) error {
|
||||
cache, err := j.cacheExporter(ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
records, err := cache.Export(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: multiworker
|
||||
return s.ce.Export(ctx, records, exportName)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error {
|
||||
j, err := s.jobs.get(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer close(statusChan)
|
||||
return j.pipe(ctx, statusChan)
|
||||
}
|
||||
|
||||
func (s *Solver) SubBuild(ctx context.Context, dgst digest.Digest, req SolveRequest) (Ref, error) {
|
||||
jl := s.jobs
|
||||
jl.mu.Lock()
|
||||
st, ok := jl.actives[dgst]
|
||||
if !ok {
|
||||
jl.mu.Unlock()
|
||||
return nil, errors.Errorf("no such parent vertex: %v", dgst)
|
||||
}
|
||||
|
||||
var inp *Input
|
||||
var cache instructioncache.InstructionCache
|
||||
for j := range st.jobs {
|
||||
var err error
|
||||
inp, err = j.loadInternal(req.Definition, s.resolve)
|
||||
if err != nil {
|
||||
jl.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
cache = j.cache // TODO: combine?
|
||||
}
|
||||
st = jl.actives[inp.Vertex.Digest()]
|
||||
jl.mu.Unlock()
|
||||
|
||||
return getRef(ctx, st.solver, inp.Vertex.(*vertex).clientVertex, inp.Index, cache) // TODO: combine to pass single input // TODO: export cache for subbuilds
|
||||
}
|
||||
|
||||
type VertexSolver interface {
|
||||
CacheKey(ctx context.Context, index Index) (digest.Digest, error)
|
||||
OutputEvaluator(Index) (VertexEvaluator, error)
|
||||
Release() error
|
||||
Cache(Index, Ref) CacheExporter
|
||||
}
|
||||
|
||||
type vertexInput struct {
|
||||
solver VertexSolver
|
||||
ev VertexEvaluator
|
||||
cacheKeys []digest.Digest
|
||||
ref Ref
|
||||
}
|
||||
|
||||
type vertexSolver struct {
|
||||
inputs []*vertexInput
|
||||
v *vertex
|
||||
cv client.Vertex
|
||||
op Op
|
||||
cache instructioncache.InstructionCache
|
||||
refs []*SharedRef
|
||||
f *bgfunc.F
|
||||
ctx context.Context
|
||||
|
||||
baseKey digest.Digest
|
||||
mu sync.Mutex
|
||||
results []digest.Digest
|
||||
markCachedOnce sync.Once
|
||||
contentKey digest.Digest
|
||||
|
||||
signal *signal // used to notify that there are callers who need more data
|
||||
}
|
||||
|
||||
type resolveF func(digest.Digest) (VertexSolver, error)
|
||||
|
||||
func newVertexSolver(ctx context.Context, v *vertex, op Op, c instructioncache.InstructionCache, resolve resolveF) (*vertexSolver, error) {
|
||||
inputs := make([]*vertexInput, len(v.inputs))
|
||||
for i, in := range v.inputs {
|
||||
s, err := resolve(in.vertex.digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev, err := s.OutputEvaluator(in.index)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev.Cancel()
|
||||
inputs[i] = &vertexInput{
|
||||
solver: s,
|
||||
ev: ev,
|
||||
}
|
||||
}
|
||||
return &vertexSolver{
|
||||
ctx: ctx,
|
||||
inputs: inputs,
|
||||
v: v,
|
||||
cv: v.clientVertex,
|
||||
op: op,
|
||||
cache: c,
|
||||
signal: newSignaller(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func markCached(ctx context.Context, cv client.Vertex) {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
defer pw.Close()
|
||||
|
||||
if cv.Started == nil {
|
||||
now := time.Now()
|
||||
cv.Started = &now
|
||||
cv.Completed = &now
|
||||
cv.Cached = true
|
||||
}
|
||||
pw.Write(cv.Digest.String(), cv)
|
||||
}
|
||||
|
||||
type CacheExporter interface {
|
||||
Export(context.Context) ([]cacheimport.CacheRecord, error)
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) Cache(index Index, ref Ref) CacheExporter {
|
||||
return &cacheExporter{vertexSolver: vs, index: index, ref: ref}
|
||||
}
|
||||
|
||||
type cacheExporter struct {
|
||||
*vertexSolver
|
||||
index Index
|
||||
ref Ref
|
||||
}
|
||||
|
||||
func (ce *cacheExporter) Export(ctx context.Context) ([]cacheimport.CacheRecord, error) {
|
||||
return ce.vertexSolver.Export(ctx, ce.index, ce.ref)
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) Export(ctx context.Context, index Index, ref Ref) ([]cacheimport.CacheRecord, error) {
|
||||
mp := map[digest.Digest]cacheimport.CacheRecord{}
|
||||
if err := vs.appendInputCache(ctx, mp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dgst, err := vs.mainCacheKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
immutable, ok := ToImmutableRef(ref)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid reference")
|
||||
}
|
||||
dgst = cacheKeyForIndex(dgst, index)
|
||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: immutable}
|
||||
out := make([]cacheimport.CacheRecord, 0, len(mp))
|
||||
for _, cr := range mp {
|
||||
out = append(out, cr)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) appendInputCache(ctx context.Context, mp map[digest.Digest]cacheimport.CacheRecord) error {
|
||||
for i, inp := range vs.inputs {
|
||||
mainDgst, err := inp.solver.(*vertexSolver).mainCacheKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dgst := cacheKeyForIndex(mainDgst, vs.v.inputs[i].index)
|
||||
if cr, ok := mp[dgst]; !ok || (cr.Reference == nil && inp.ref != nil) {
|
||||
if err := inp.solver.(*vertexSolver).appendInputCache(ctx, mp); err != nil {
|
||||
return err
|
||||
}
|
||||
if inp.ref != nil && len(inp.solver.(*vertexSolver).inputs) > 0 { // Ignore pushing the refs for sources
|
||||
ref, ok := ToImmutableRef(inp.ref)
|
||||
if !ok {
|
||||
return errors.Errorf("invalid reference")
|
||||
}
|
||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: ref}
|
||||
} else {
|
||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst}
|
||||
}
|
||||
}
|
||||
}
|
||||
if ck := vs.contentKey; ck != "" {
|
||||
mainDgst, err := vs.mainCacheKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mp[ck] = cacheimport.CacheRecord{CacheKey: mainDgst, ContentKey: ck}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) CacheKey(ctx context.Context, index Index) (digest.Digest, error) {
|
||||
vs.mu.Lock()
|
||||
defer vs.mu.Unlock()
|
||||
if vs.baseKey == "" {
|
||||
eg, ctx := errgroup.WithContext(vs.ctx)
|
||||
for i := range vs.inputs {
|
||||
func(i int) {
|
||||
eg.Go(func() error {
|
||||
k, err := vs.inputs[i].solver.CacheKey(ctx, vs.v.inputs[i].index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vs.inputs[i].cacheKeys = append(vs.inputs[i].cacheKeys, k)
|
||||
return nil
|
||||
})
|
||||
}(i)
|
||||
}
|
||||
var dgst digest.Digest
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
dgst, err = vs.op.CacheKey(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
vs.baseKey = dgst
|
||||
}
|
||||
|
||||
k, err := vs.lastCacheKey()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return cacheKeyForIndex(k, index), nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) lastCacheKey() (digest.Digest, error) {
|
||||
return vs.currentCacheKey(true)
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) mainCacheKey() (digest.Digest, error) {
|
||||
return vs.currentCacheKey(false)
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) currentCacheKey(last bool) (digest.Digest, error) {
|
||||
inputKeys := make([]digest.Digest, len(vs.inputs))
|
||||
for i, inp := range vs.inputs {
|
||||
if len(inp.cacheKeys) == 0 {
|
||||
return "", errors.Errorf("inputs not processed")
|
||||
}
|
||||
if last {
|
||||
inputKeys[i] = inp.cacheKeys[len(inp.cacheKeys)-1]
|
||||
} else {
|
||||
inputKeys[i] = inp.cacheKeys[0]
|
||||
}
|
||||
}
|
||||
dt, err := json.Marshal(struct {
|
||||
Inputs []digest.Digest
|
||||
CacheKey digest.Digest
|
||||
}{Inputs: inputKeys, CacheKey: vs.baseKey})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return digest.FromBytes(dt), nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) OutputEvaluator(index Index) (VertexEvaluator, error) {
|
||||
if vs.f == nil {
|
||||
f, err := bgfunc.New(vs.ctx, vs.run)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vs.f = f
|
||||
}
|
||||
c := vs.f.NewCaller()
|
||||
ve := &vertexEvaluator{vertexSolver: vs, c: c, index: index}
|
||||
return ve, nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) Release() error {
|
||||
for _, inp := range vs.inputs {
|
||||
if inp.ref != nil {
|
||||
inp.ref.Release(context.TODO())
|
||||
}
|
||||
}
|
||||
if vs.refs != nil {
|
||||
for _, r := range vs.refs {
|
||||
r.Release(context.TODO())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// run is called by the bgfunc concurrency primitive. This function may be
|
||||
// called multiple times but never in parallal. Repeated calls should make an
|
||||
// effort to continue from previous state. Lock vs.mu to syncronize data to the
|
||||
// callers. Signal parameter can be used to notify callers that new data is
|
||||
// available without returning from the function.
|
||||
func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
|
||||
vs.mu.Lock()
|
||||
if vs.refs != nil {
|
||||
vs.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
vs.mu.Unlock()
|
||||
|
||||
waitFirst := vs.signal.Wait()
|
||||
waitRun := waitFirst
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-waitFirst:
|
||||
}
|
||||
|
||||
// this is where you lookup the cache keys that were successfully probed
|
||||
|
||||
eg, ctx2 := errgroup.WithContext(ctx)
|
||||
|
||||
// process all the inputs
|
||||
for i, inp := range vs.inputs {
|
||||
if inp.ref == nil {
|
||||
func(i int) {
|
||||
eg.Go(func() error {
|
||||
inp := vs.inputs[i]
|
||||
defer inp.ev.Cancel()
|
||||
|
||||
waitNext := waitFirst
|
||||
for {
|
||||
select {
|
||||
case <-ctx2.Done():
|
||||
return ctx2.Err()
|
||||
case <-waitNext:
|
||||
}
|
||||
|
||||
// check if current cache key is in cache
|
||||
if len(inp.cacheKeys) > 0 {
|
||||
ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1], inp.solver.(*vertexSolver).v.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ref != nil {
|
||||
inp.ref = ref.(Ref)
|
||||
inp.solver.(*vertexSolver).markCachedOnce.Do(func() {
|
||||
markCached(ctx, inp.solver.(*vertexSolver).cv)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// evaluate next cachekey/reference for input
|
||||
res, err := inp.ev.Next(ctx2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res == nil { // there is no more data coming
|
||||
return nil
|
||||
}
|
||||
if ref := res.Reference; ref != nil {
|
||||
if ref, ok := ToImmutableRef(ref); ok {
|
||||
if !cache.HasCachePolicyRetain(ref) {
|
||||
if err := cache.CachePolicyRetain(ref); err != nil {
|
||||
return err
|
||||
}
|
||||
ref.Metadata().Commit()
|
||||
}
|
||||
inp.ref = ref
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only try matching cache if the cachekey for input is present
|
||||
exists, err := vs.cache.Probe(ctx2, res.CacheKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
vs.mu.Lock()
|
||||
inp.cacheKeys = append(inp.cacheKeys, res.CacheKey)
|
||||
dgst, err := vs.lastCacheKey()
|
||||
if err != nil {
|
||||
vs.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
vs.results = append(vs.results, dgst)
|
||||
signal() // wake up callers
|
||||
waitNext = vs.signal.Reset() // make sure we don't continue unless there are callers
|
||||
waitRun = waitNext
|
||||
vs.mu.Unlock()
|
||||
}
|
||||
}
|
||||
})
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Find extra cache keys by content
|
||||
inputRefs := make([]Ref, len(vs.inputs))
|
||||
lastInputKeys := make([]digest.Digest, len(vs.inputs))
|
||||
for i := range vs.inputs {
|
||||
inputRefs[i] = vs.inputs[i].ref
|
||||
lastInputKeys[i] = vs.inputs[i].cacheKeys[len(vs.inputs[i].cacheKeys)-1]
|
||||
}
|
||||
|
||||
dgst, inp, err := vs.op.ContentMask(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var contentKey digest.Digest
|
||||
if dgst != "" {
|
||||
contentKey, err = calculateContentHash(ctx, inputRefs, dgst, lastInputKeys, inp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vs.contentKey = contentKey
|
||||
|
||||
var extraKeys []digest.Digest
|
||||
cks, err := vs.cache.GetContentMapping(contentKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
extraKeys = append(extraKeys, cks...)
|
||||
if len(extraKeys) > 0 {
|
||||
vs.mu.Lock()
|
||||
vs.results = append(vs.results, extraKeys...)
|
||||
signal()
|
||||
waitRun = vs.signal.Reset()
|
||||
vs.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-waitRun:
|
||||
}
|
||||
|
||||
// no cache hit. start evaluating the node
|
||||
notifyStarted(ctx, &vs.cv)
|
||||
defer func() {
|
||||
notifyCompleted(ctx, &vs.cv, retErr)
|
||||
}()
|
||||
|
||||
refs, err := vs.op.Run(ctx, inputRefs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sr := make([]*SharedRef, len(refs))
|
||||
for i, r := range refs {
|
||||
sr[i] = NewSharedRef(r)
|
||||
}
|
||||
vs.mu.Lock()
|
||||
vs.refs = sr
|
||||
vs.mu.Unlock()
|
||||
|
||||
// store the cacheKeys for current refs
|
||||
if vs.cache != nil {
|
||||
cacheKey, err := vs.lastCacheKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i, ref := range refs {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r := OriginRef(ref)
|
||||
if err := vs.cache.Set(cacheKeyForIndex(cacheKey, Index(i)), r); err != nil {
|
||||
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
|
||||
}
|
||||
}
|
||||
if contentKey != "" {
|
||||
if err := vs.cache.SetContentMapping(contentKey, cacheKey); err != nil {
|
||||
logrus.Errorf("failed to save content mapping: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getInputContentHash(ctx context.Context, ref cache.ImmutableRef, selectors []string) (digest.Digest, error) {
|
||||
out := make([]digest.Digest, 0, len(selectors))
|
||||
for _, s := range selectors {
|
||||
dgst, err := contenthash.Checksum(ctx, ref, s)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
out = append(out, dgst)
|
||||
}
|
||||
if len(out) == 1 {
|
||||
return out[0], nil
|
||||
}
|
||||
dt, err := json.Marshal(out)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return digest.FromBytes(dt), nil
|
||||
}
|
||||
|
||||
func calculateContentHash(ctx context.Context, refs []Ref, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) {
|
||||
dgsts := make([]digest.Digest, len(contentMap))
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for i, sel := range contentMap {
|
||||
if sel == nil {
|
||||
dgsts[i] = inputs[i]
|
||||
continue
|
||||
}
|
||||
func(i int) {
|
||||
eg.Go(func() error {
|
||||
ref, ok := ToImmutableRef(refs[i])
|
||||
if !ok {
|
||||
return errors.Errorf("invalid reference for exporting: %T", ref)
|
||||
}
|
||||
dgst, err := getInputContentHash(ctx, ref, contentMap[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dgsts[i] = dgst
|
||||
return nil
|
||||
})
|
||||
}(i)
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
dt, err := json.Marshal(struct {
|
||||
Main digest.Digest
|
||||
Inputs []digest.Digest
|
||||
}{
|
||||
Main: mainDigest,
|
||||
Inputs: dgsts,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return digest.FromBytes(dt), nil
|
||||
}
|
||||
|
||||
type VertexEvaluator interface {
|
||||
Next(context.Context) (*VertexResult, error)
|
||||
Cancel() error
|
||||
}
|
||||
|
||||
type vertexEvaluator struct {
|
||||
*vertexSolver
|
||||
c *bgfunc.Caller
|
||||
cursor int
|
||||
index Index
|
||||
}
|
||||
|
||||
func (ve *vertexEvaluator) Next(ctx context.Context) (*VertexResult, error) {
|
||||
v, err := ve.c.Call(ctx, func() (interface{}, error) {
|
||||
ve.mu.Lock()
|
||||
defer ve.mu.Unlock()
|
||||
if ve.refs != nil {
|
||||
return &VertexResult{Reference: ve.refs[int(ve.index)].Clone()}, nil
|
||||
}
|
||||
if i := ve.cursor; i < len(ve.results) {
|
||||
ve.cursor++
|
||||
return &VertexResult{CacheKey: cacheKeyForIndex(ve.results[i], ve.index)}, nil
|
||||
}
|
||||
ve.signal.Signal()
|
||||
return nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if v == nil {
|
||||
return nil, nil // no more records are coming
|
||||
}
|
||||
return v.(*VertexResult), nil
|
||||
}
|
||||
|
||||
func (ve *vertexEvaluator) Cancel() error {
|
||||
return ve.c.Cancel()
|
||||
}
|
||||
|
||||
type VertexResult struct {
|
||||
CacheKey digest.Digest
|
||||
Reference Ref
|
||||
}
|
||||
|
||||
func cacheKeyForIndex(dgst digest.Digest, index Index) digest.Digest {
|
||||
return digest.FromBytes([]byte(fmt.Sprintf("%s.%d", dgst, index)))
|
||||
}
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"github.com/moby/buildkit/frontend"
|
||||
solver "github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/solver/solver/llbop"
|
||||
"github.com/moby/buildkit/worker"
|
||||
)
|
||||
|
||||
// DetermineVertexWorker determines worker for a vertex.
|
||||
// Currently, constraint is just ignored.
|
||||
// Also we need to track the workers of the inputs.
|
||||
func DetermineVertexWorker(wc *worker.Controller, v solver.Vertex) (*worker.Worker, error) {
|
||||
// TODO: multiworker
|
||||
return wc.GetDefault()
|
||||
}
|
||||
|
||||
func NewLLBOpSolver(wc *worker.Controller, frontends map[string]frontend.Frontend) solver.Solver {
|
||||
var s *Solver
|
||||
s = New(func(v solver.Vertex) (solver.Op, error) {
|
||||
w, err := DetermineVertexWorker(wc, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch op := v.Sys().(type) {
|
||||
case *pb.Op_Source:
|
||||
return llbop.NewSourceOp(v, op, w.SourceManager)
|
||||
case *pb.Op_Exec:
|
||||
return llbop.NewExecOp(v, op, w.CacheManager, w.Executor)
|
||||
case *pb.Op_Build:
|
||||
return llbop.NewBuildOp(v, op, s)
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}, wc, DetermineVertexWorker, frontends)
|
||||
return s
|
||||
}
|
|
@ -1,729 +0,0 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/cacheimport"
|
||||
"github.com/moby/buildkit/cache/contenthash"
|
||||
"github.com/moby/buildkit/cache/instructioncache"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
solver "github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/reference"
|
||||
"github.com/moby/buildkit/util/bgfunc"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
"github.com/moby/buildkit/worker"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// FIXME: Also we need to track the workers of the inputs.
|
||||
type VertexWorkerDeterminer func(wc *worker.Controller, v solver.Vertex) (*worker.Worker, error)
|
||||
|
||||
// ResolveOpFunc finds an Op implementation for a vertex
|
||||
type ResolveOpFunc func(solver.Vertex) (solver.Op, error)
|
||||
|
||||
type Solver struct {
|
||||
resolve ResolveOpFunc
|
||||
jobs *jobList
|
||||
workerController *worker.Controller
|
||||
determineVertexWorker VertexWorkerDeterminer
|
||||
frontends map[string]frontend.Frontend
|
||||
}
|
||||
|
||||
func New(resolve ResolveOpFunc, wc *worker.Controller, vwd VertexWorkerDeterminer, f map[string]frontend.Frontend) *Solver {
|
||||
return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, determineVertexWorker: vwd, frontends: f}
|
||||
}
|
||||
|
||||
func (s *Solver) solve(ctx context.Context, j *job, req solver.SolveRequest) (solver.Ref, map[string][]byte, error) {
|
||||
if req.Definition == nil {
|
||||
if req.Frontend == nil {
|
||||
return nil, nil, errors.Errorf("invalid request: no definition nor frontend")
|
||||
}
|
||||
return req.Frontend.Solve(ctx, s.llbBridge(j), req.FrontendOpt)
|
||||
}
|
||||
|
||||
inp, err := j.load(req.Definition, s.resolve)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
ref, err := j.getRef(ctx, inp.Vertex.(*vertex).clientVertex, inp.Index)
|
||||
return ref, nil, err
|
||||
}
|
||||
|
||||
func (s *Solver) llbBridge(j *job) *llbBridge {
|
||||
// FIXME(AkihiroSuda): make sure worker implements interfaces required by llbBridge
|
||||
worker, err := s.workerController.GetDefault()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &llbBridge{job: j, Solver: s, worker: worker}
|
||||
}
|
||||
|
||||
func (s *Solver) Solve(ctx context.Context, id string, req solver.SolveRequest) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
pr, ctx, closeProgressWriter := progress.NewContext(ctx)
|
||||
defer closeProgressWriter()
|
||||
|
||||
// TODO: multiworker
|
||||
defaultWorker, err := s.workerController.GetDefault()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if importRef := req.ImportCacheRef; importRef != "" {
|
||||
cache, err := defaultWorker.CacheImporter.Import(ctx, importRef)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defaultWorker.InstructionCache = instructioncache.Union(defaultWorker.InstructionCache, cache)
|
||||
}
|
||||
|
||||
// register a build job. vertex needs to be loaded to a job to run
|
||||
ctx, j, err := s.jobs.new(ctx, id, pr, defaultWorker.InstructionCache)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ref, exporterOpt, err := s.solve(ctx, j, req)
|
||||
defer j.discard()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if ref != nil {
|
||||
go ref.Release(context.TODO())
|
||||
}
|
||||
}()
|
||||
|
||||
var immutable cache.ImmutableRef
|
||||
if ref != nil {
|
||||
var ok bool
|
||||
immutable, ok = reference.ToImmutableRef(ref)
|
||||
if !ok {
|
||||
return errors.Errorf("invalid reference for exporting: %T", ref)
|
||||
}
|
||||
if err := immutable.Finalize(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if exp := req.Exporter; exp != nil {
|
||||
if err := inVertexContext(ctx, exp.Name(), func(ctx context.Context) error {
|
||||
return exp.Export(ctx, immutable, exporterOpt)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if exportName := req.ExportCacheRef; exportName != "" {
|
||||
if err := inVertexContext(ctx, "exporting build cache", func(ctx context.Context) error {
|
||||
cache, err := j.cacheExporter(ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
records, err := cache.Export(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: multiworker
|
||||
return defaultWorker.CacheExporter.Export(ctx, records, exportName)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error {
|
||||
j, err := s.jobs.get(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer close(statusChan)
|
||||
return j.pipe(ctx, statusChan)
|
||||
}
|
||||
|
||||
func (s *Solver) SubBuild(ctx context.Context, dgst digest.Digest, req solver.SolveRequest) (solver.Ref, error) {
|
||||
jl := s.jobs
|
||||
jl.mu.Lock()
|
||||
st, ok := jl.actives[dgst]
|
||||
if !ok {
|
||||
jl.mu.Unlock()
|
||||
return nil, errors.Errorf("no such parent vertex: %v", dgst)
|
||||
}
|
||||
|
||||
var inp *solver.Input
|
||||
for j := range st.jobs {
|
||||
var err error
|
||||
inp, err = j.loadInternal(req.Definition, s.resolve)
|
||||
if err != nil {
|
||||
jl.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
st = jl.actives[inp.Vertex.Digest()]
|
||||
jl.mu.Unlock()
|
||||
|
||||
w, err := s.determineVertexWorker(s.workerController, inp.Vertex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return getRef(ctx, st.solver, inp.Vertex.(*vertex).clientVertex, inp.Index, w.InstructionCache) // TODO: combine to pass single input // TODO: export cache for subbuilds
|
||||
}
|
||||
|
||||
type VertexSolver interface {
|
||||
CacheKey(ctx context.Context, index solver.Index) (digest.Digest, error)
|
||||
OutputEvaluator(solver.Index) (VertexEvaluator, error)
|
||||
Release() error
|
||||
Cache(solver.Index, solver.Ref) CacheExporter
|
||||
}
|
||||
|
||||
type vertexInput struct {
|
||||
solver VertexSolver
|
||||
ev VertexEvaluator
|
||||
cacheKeys []digest.Digest
|
||||
ref solver.Ref
|
||||
}
|
||||
|
||||
type vertexSolver struct {
|
||||
inputs []*vertexInput
|
||||
v *vertex
|
||||
cv client.Vertex
|
||||
op solver.Op
|
||||
cache instructioncache.InstructionCache
|
||||
refs []*reference.SharedRef
|
||||
f *bgfunc.F
|
||||
ctx context.Context
|
||||
|
||||
baseKey digest.Digest
|
||||
mu sync.Mutex
|
||||
results []digest.Digest
|
||||
markCachedOnce sync.Once
|
||||
contentKey digest.Digest
|
||||
|
||||
signal *signal // used to notify that there are callers who need more data
|
||||
}
|
||||
|
||||
type resolveF func(digest.Digest) (VertexSolver, error)
|
||||
|
||||
func newVertexSolver(ctx context.Context, v *vertex, op solver.Op, c instructioncache.InstructionCache, resolve resolveF) (*vertexSolver, error) {
|
||||
inputs := make([]*vertexInput, len(v.inputs))
|
||||
for i, in := range v.inputs {
|
||||
s, err := resolve(in.vertex.digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev, err := s.OutputEvaluator(in.index)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev.Cancel()
|
||||
inputs[i] = &vertexInput{
|
||||
solver: s,
|
||||
ev: ev,
|
||||
}
|
||||
}
|
||||
return &vertexSolver{
|
||||
ctx: ctx,
|
||||
inputs: inputs,
|
||||
v: v,
|
||||
cv: v.clientVertex,
|
||||
op: op,
|
||||
cache: c,
|
||||
signal: newSignaller(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func markCached(ctx context.Context, cv client.Vertex) {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
defer pw.Close()
|
||||
|
||||
if cv.Started == nil {
|
||||
now := time.Now()
|
||||
cv.Started = &now
|
||||
cv.Completed = &now
|
||||
cv.Cached = true
|
||||
}
|
||||
pw.Write(cv.Digest.String(), cv)
|
||||
}
|
||||
|
||||
type CacheExporter interface {
|
||||
Export(context.Context) ([]cacheimport.CacheRecord, error)
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) Cache(index solver.Index, ref solver.Ref) CacheExporter {
|
||||
return &cacheExporter{vertexSolver: vs, index: index, ref: ref}
|
||||
}
|
||||
|
||||
type cacheExporter struct {
|
||||
*vertexSolver
|
||||
index solver.Index
|
||||
ref solver.Ref
|
||||
}
|
||||
|
||||
func (ce *cacheExporter) Export(ctx context.Context) ([]cacheimport.CacheRecord, error) {
|
||||
return ce.vertexSolver.Export(ctx, ce.index, ce.ref)
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) Export(ctx context.Context, index solver.Index, ref solver.Ref) ([]cacheimport.CacheRecord, error) {
|
||||
mp := map[digest.Digest]cacheimport.CacheRecord{}
|
||||
if err := vs.appendInputCache(ctx, mp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dgst, err := vs.mainCacheKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
immutable, ok := reference.ToImmutableRef(ref)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid reference")
|
||||
}
|
||||
dgst = cacheKeyForIndex(dgst, index)
|
||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: immutable}
|
||||
out := make([]cacheimport.CacheRecord, 0, len(mp))
|
||||
for _, cr := range mp {
|
||||
out = append(out, cr)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) appendInputCache(ctx context.Context, mp map[digest.Digest]cacheimport.CacheRecord) error {
|
||||
for i, inp := range vs.inputs {
|
||||
mainDgst, err := inp.solver.(*vertexSolver).mainCacheKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dgst := cacheKeyForIndex(mainDgst, vs.v.inputs[i].index)
|
||||
if cr, ok := mp[dgst]; !ok || (cr.Reference == nil && inp.ref != nil) {
|
||||
if err := inp.solver.(*vertexSolver).appendInputCache(ctx, mp); err != nil {
|
||||
return err
|
||||
}
|
||||
if inp.ref != nil && len(inp.solver.(*vertexSolver).inputs) > 0 { // Ignore pushing the refs for sources
|
||||
ref, ok := reference.ToImmutableRef(inp.ref)
|
||||
if !ok {
|
||||
return errors.Errorf("invalid reference")
|
||||
}
|
||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: ref}
|
||||
} else {
|
||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst}
|
||||
}
|
||||
}
|
||||
}
|
||||
if ck := vs.contentKey; ck != "" {
|
||||
mainDgst, err := vs.mainCacheKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mp[ck] = cacheimport.CacheRecord{CacheKey: mainDgst, ContentKey: ck}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) CacheKey(ctx context.Context, index solver.Index) (digest.Digest, error) {
|
||||
vs.mu.Lock()
|
||||
defer vs.mu.Unlock()
|
||||
if vs.baseKey == "" {
|
||||
eg, ctx := errgroup.WithContext(vs.ctx)
|
||||
for i := range vs.inputs {
|
||||
func(i int) {
|
||||
eg.Go(func() error {
|
||||
k, err := vs.inputs[i].solver.CacheKey(ctx, vs.v.inputs[i].index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vs.inputs[i].cacheKeys = append(vs.inputs[i].cacheKeys, k)
|
||||
return nil
|
||||
})
|
||||
}(i)
|
||||
}
|
||||
var dgst digest.Digest
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
dgst, err = vs.op.CacheKey(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
vs.baseKey = dgst
|
||||
}
|
||||
|
||||
k, err := vs.lastCacheKey()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return cacheKeyForIndex(k, index), nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) lastCacheKey() (digest.Digest, error) {
|
||||
return vs.currentCacheKey(true)
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) mainCacheKey() (digest.Digest, error) {
|
||||
return vs.currentCacheKey(false)
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) currentCacheKey(last bool) (digest.Digest, error) {
|
||||
inputKeys := make([]digest.Digest, len(vs.inputs))
|
||||
for i, inp := range vs.inputs {
|
||||
if len(inp.cacheKeys) == 0 {
|
||||
return "", errors.Errorf("inputs not processed")
|
||||
}
|
||||
if last {
|
||||
inputKeys[i] = inp.cacheKeys[len(inp.cacheKeys)-1]
|
||||
} else {
|
||||
inputKeys[i] = inp.cacheKeys[0]
|
||||
}
|
||||
}
|
||||
dt, err := json.Marshal(struct {
|
||||
Inputs []digest.Digest
|
||||
CacheKey digest.Digest
|
||||
}{Inputs: inputKeys, CacheKey: vs.baseKey})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return digest.FromBytes(dt), nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) OutputEvaluator(index solver.Index) (VertexEvaluator, error) {
|
||||
if vs.f == nil {
|
||||
f, err := bgfunc.New(vs.ctx, vs.run)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vs.f = f
|
||||
}
|
||||
c := vs.f.NewCaller()
|
||||
ve := &vertexEvaluator{vertexSolver: vs, c: c, index: index}
|
||||
return ve, nil
|
||||
}
|
||||
|
||||
func (vs *vertexSolver) Release() error {
|
||||
for _, inp := range vs.inputs {
|
||||
if inp.ref != nil {
|
||||
inp.ref.Release(context.TODO())
|
||||
}
|
||||
}
|
||||
if vs.refs != nil {
|
||||
for _, r := range vs.refs {
|
||||
r.Release(context.TODO())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// run is called by the bgfunc concurrency primitive. This function may be
|
||||
// called multiple times but never in parallal. Repeated calls should make an
|
||||
// effort to continue from previous state. Lock vs.mu to syncronize data to the
|
||||
// callers. Signal parameter can be used to notify callers that new data is
|
||||
// available without returning from the function.
|
||||
func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
|
||||
vs.mu.Lock()
|
||||
if vs.refs != nil {
|
||||
vs.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
vs.mu.Unlock()
|
||||
|
||||
waitFirst := vs.signal.Wait()
|
||||
waitRun := waitFirst
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-waitFirst:
|
||||
}
|
||||
|
||||
// this is where you lookup the cache keys that were successfully probed
|
||||
|
||||
eg, ctx2 := errgroup.WithContext(ctx)
|
||||
|
||||
// process all the inputs
|
||||
for i, inp := range vs.inputs {
|
||||
if inp.ref == nil {
|
||||
func(i int) {
|
||||
eg.Go(func() error {
|
||||
inp := vs.inputs[i]
|
||||
defer inp.ev.Cancel()
|
||||
|
||||
waitNext := waitFirst
|
||||
for {
|
||||
select {
|
||||
case <-ctx2.Done():
|
||||
return ctx2.Err()
|
||||
case <-waitNext:
|
||||
}
|
||||
|
||||
// check if current cache key is in cache
|
||||
if len(inp.cacheKeys) > 0 {
|
||||
ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1], inp.solver.(*vertexSolver).v.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ref != nil {
|
||||
inp.ref = ref.(solver.Ref)
|
||||
inp.solver.(*vertexSolver).markCachedOnce.Do(func() {
|
||||
markCached(ctx, inp.solver.(*vertexSolver).cv)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// evaluate next cachekey/reference for input
|
||||
res, err := inp.ev.Next(ctx2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res == nil { // there is no more data coming
|
||||
return nil
|
||||
}
|
||||
if ref := res.Reference; ref != nil {
|
||||
if ref, ok := reference.ToImmutableRef(ref); ok {
|
||||
if !cache.HasCachePolicyRetain(ref) {
|
||||
if err := cache.CachePolicyRetain(ref); err != nil {
|
||||
return err
|
||||
}
|
||||
ref.Metadata().Commit()
|
||||
}
|
||||
inp.ref = ref
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only try matching cache if the cachekey for input is present
|
||||
exists, err := vs.cache.Probe(ctx2, res.CacheKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
vs.mu.Lock()
|
||||
inp.cacheKeys = append(inp.cacheKeys, res.CacheKey)
|
||||
dgst, err := vs.lastCacheKey()
|
||||
if err != nil {
|
||||
vs.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
vs.results = append(vs.results, dgst)
|
||||
signal() // wake up callers
|
||||
waitNext = vs.signal.Reset() // make sure we don't continue unless there are callers
|
||||
waitRun = waitNext
|
||||
vs.mu.Unlock()
|
||||
}
|
||||
}
|
||||
})
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Find extra cache keys by content
|
||||
inputRefs := make([]solver.Ref, len(vs.inputs))
|
||||
lastInputKeys := make([]digest.Digest, len(vs.inputs))
|
||||
for i := range vs.inputs {
|
||||
inputRefs[i] = vs.inputs[i].ref
|
||||
lastInputKeys[i] = vs.inputs[i].cacheKeys[len(vs.inputs[i].cacheKeys)-1]
|
||||
}
|
||||
|
||||
dgst, inp, err := vs.op.ContentMask(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var contentKey digest.Digest
|
||||
if dgst != "" {
|
||||
contentKey, err = calculateContentHash(ctx, inputRefs, dgst, lastInputKeys, inp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vs.contentKey = contentKey
|
||||
|
||||
var extraKeys []digest.Digest
|
||||
cks, err := vs.cache.GetContentMapping(contentKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
extraKeys = append(extraKeys, cks...)
|
||||
if len(extraKeys) > 0 {
|
||||
vs.mu.Lock()
|
||||
vs.results = append(vs.results, extraKeys...)
|
||||
signal()
|
||||
waitRun = vs.signal.Reset()
|
||||
vs.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-waitRun:
|
||||
}
|
||||
|
||||
// no cache hit. start evaluating the node
|
||||
notifyStarted(ctx, &vs.cv)
|
||||
defer func() {
|
||||
notifyCompleted(ctx, &vs.cv, retErr)
|
||||
}()
|
||||
|
||||
refs, err := vs.op.Run(ctx, inputRefs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sr := make([]*reference.SharedRef, len(refs))
|
||||
for i, r := range refs {
|
||||
sr[i] = reference.NewSharedRef(r)
|
||||
}
|
||||
vs.mu.Lock()
|
||||
vs.refs = sr
|
||||
vs.mu.Unlock()
|
||||
|
||||
// store the cacheKeys for current refs
|
||||
if vs.cache != nil {
|
||||
cacheKey, err := vs.lastCacheKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i, ref := range refs {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r := reference.OriginRef(ref)
|
||||
if err := vs.cache.Set(cacheKeyForIndex(cacheKey, solver.Index(i)), r); err != nil {
|
||||
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
|
||||
}
|
||||
}
|
||||
if contentKey != "" {
|
||||
if err := vs.cache.SetContentMapping(contentKey, cacheKey); err != nil {
|
||||
logrus.Errorf("failed to save content mapping: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getInputContentHash(ctx context.Context, ref cache.ImmutableRef, selectors []string) (digest.Digest, error) {
|
||||
out := make([]digest.Digest, 0, len(selectors))
|
||||
for _, s := range selectors {
|
||||
dgst, err := contenthash.Checksum(ctx, ref, s)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
out = append(out, dgst)
|
||||
}
|
||||
if len(out) == 1 {
|
||||
return out[0], nil
|
||||
}
|
||||
dt, err := json.Marshal(out)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return digest.FromBytes(dt), nil
|
||||
}
|
||||
|
||||
func calculateContentHash(ctx context.Context, refs []solver.Ref, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) {
|
||||
dgsts := make([]digest.Digest, len(contentMap))
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for i, sel := range contentMap {
|
||||
if sel == nil {
|
||||
dgsts[i] = inputs[i]
|
||||
continue
|
||||
}
|
||||
func(i int) {
|
||||
eg.Go(func() error {
|
||||
ref, ok := reference.ToImmutableRef(refs[i])
|
||||
if !ok {
|
||||
return errors.Errorf("invalid reference for exporting: %T", ref)
|
||||
}
|
||||
dgst, err := getInputContentHash(ctx, ref, contentMap[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dgsts[i] = dgst
|
||||
return nil
|
||||
})
|
||||
}(i)
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
dt, err := json.Marshal(struct {
|
||||
Main digest.Digest
|
||||
Inputs []digest.Digest
|
||||
}{
|
||||
Main: mainDigest,
|
||||
Inputs: dgsts,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return digest.FromBytes(dt), nil
|
||||
}
|
||||
|
||||
// VertexEvaluator iterates the possible results of a vertex: first the cache
// keys that may match previous runs, then, if the vertex had to be executed,
// the resulting reference.
type VertexEvaluator interface {
	// Next blocks until the next cache key or reference is available.
	// A nil result means no more results are coming.
	Next(context.Context) (*VertexResult, error)
	// Cancel stops the evaluation.
	Cancel() error
}
|
||||
|
||||
// vertexEvaluator is the VertexEvaluator implementation backed by a
// vertexSolver running as a background function.
type vertexEvaluator struct {
	*vertexSolver
	c *bgfunc.Caller
	cursor int // next index into vertexSolver.results to hand out
	index solver.Index // which output of the vertex this evaluator tracks
}
|
||||
|
||||
// Next returns the next result for the vertex. Already computed cache keys
// are drained first (one per call, tracked by ve.cursor); once the vertex has
// run and refs are set, a clone of the reference for this evaluator's output
// index is returned. A nil result means no more records are coming.
func (ve *vertexEvaluator) Next(ctx context.Context) (*VertexResult, error) {
	v, err := ve.c.Call(ctx, func() (interface{}, error) {
		ve.mu.Lock()
		defer ve.mu.Unlock()
		if ve.refs != nil {
			// vertex already ran: hand out a cloned reference for our index
			return &VertexResult{Reference: ve.refs[int(ve.index)].Clone()}, nil
		}
		if i := ve.cursor; i < len(ve.results) {
			ve.cursor++
			return &VertexResult{CacheKey: cacheKeyForIndex(ve.results[i], ve.index)}, nil
		}
		// nothing buffered: signal the solver so it produces more results
		ve.signal.Signal()
		return nil, nil
	})
	if err != nil {
		return nil, err
	}
	if v == nil {
		return nil, nil // no more records are coming
	}
	return v.(*VertexResult), nil
}
|
||||
|
||||
// Cancel stops the background evaluation associated with this caller.
func (ve *vertexEvaluator) Cancel() error {
	return ve.c.Cancel()
}
|
||||
|
||||
// VertexResult is a single item produced by a VertexEvaluator. Only one of
// CacheKey or Reference is set: cache keys are emitted while probing the
// instruction cache, the reference after the vertex has been executed.
type VertexResult struct {
	CacheKey digest.Digest
	Reference solver.Ref
}
|
||||
|
||||
func cacheKeyForIndex(dgst digest.Digest, index solver.Index) digest.Digest {
|
||||
return digest.FromBytes([]byte(fmt.Sprintf("%s.%d", dgst, index)))
|
||||
}
|
|
@ -1,104 +0,0 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/identity"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type input struct {
|
||||
index solver.Index
|
||||
vertex *vertex
|
||||
}
|
||||
|
||||
type vertex struct {
|
||||
mu sync.Mutex
|
||||
sys interface{}
|
||||
metadata *pb.OpMetadata
|
||||
inputs []*input
|
||||
err error
|
||||
digest digest.Digest
|
||||
clientVertex client.Vertex
|
||||
name string
|
||||
notifyMu sync.Mutex
|
||||
}
|
||||
|
||||
func (v *vertex) initClientVertex() {
|
||||
inputDigests := make([]digest.Digest, 0, len(v.inputs))
|
||||
for _, inp := range v.inputs {
|
||||
inputDigests = append(inputDigests, inp.vertex.Digest())
|
||||
}
|
||||
v.clientVertex = client.Vertex{
|
||||
Inputs: inputDigests,
|
||||
Name: v.Name(),
|
||||
Digest: v.digest,
|
||||
}
|
||||
}
|
||||
|
||||
func (v *vertex) Digest() digest.Digest {
|
||||
return v.digest
|
||||
}
|
||||
|
||||
func (v *vertex) Sys() interface{} {
|
||||
return v.sys
|
||||
}
|
||||
|
||||
func (v *vertex) Metadata() *pb.OpMetadata {
|
||||
return v.metadata
|
||||
}
|
||||
|
||||
func (v *vertex) Inputs() (inputs []solver.Input) {
|
||||
inputs = make([]solver.Input, 0, len(v.inputs))
|
||||
for _, i := range v.inputs {
|
||||
inputs = append(inputs, solver.Input{i.index, i.vertex})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (v *vertex) Name() string {
|
||||
return v.name
|
||||
}
|
||||
|
||||
func notifyStarted(ctx context.Context, v *client.Vertex) {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
defer pw.Close()
|
||||
now := time.Now()
|
||||
v.Started = &now
|
||||
v.Completed = nil
|
||||
pw.Write(v.Digest.String(), *v)
|
||||
}
|
||||
|
||||
func notifyCompleted(ctx context.Context, v *client.Vertex, err error) {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
defer pw.Close()
|
||||
now := time.Now()
|
||||
if v.Started == nil {
|
||||
v.Started = &now
|
||||
}
|
||||
v.Completed = &now
|
||||
v.Cached = false
|
||||
if err != nil {
|
||||
v.Error = err.Error()
|
||||
}
|
||||
pw.Write(v.Digest.String(), *v)
|
||||
}
|
||||
|
||||
func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error {
|
||||
v := client.Vertex{
|
||||
Digest: digest.FromBytes([]byte(identity.NewID())),
|
||||
Name: name,
|
||||
}
|
||||
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
|
||||
notifyStarted(ctx, &v)
|
||||
defer pw.Close()
|
||||
err := f(ctx)
|
||||
notifyCompleted(ctx, &v, err)
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"github.com/moby/buildkit/solver/types"
|
||||
)
|
||||
|
||||
// Ref is a reference to the object passed through the build steps.
// This interface is a subset of the github.com/buildkit/buildkit/cache.Ref interface.
// For ease of unit testing, this interface only has Release().
type Ref = types.Ref

// The aliases below re-export the solver types from solver/types so that
// existing importers of this package keep working after the move.
type Op = types.Op
type SolveRequest = types.SolveRequest
type Vertex = types.Vertex
type Input = types.Input
type Index = types.Index
|
|
@ -0,0 +1,58 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"github.com/moby/buildkit/exporter"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// These could be also defined in worker
|
||||
|
||||
// Ref is a reference to an object passed between build steps (typically a
// snapshot). Holders must call Release when they no longer need it.
type Ref interface {
	Release(context.Context) error
}
|
||||
|
||||
// Op is an implementation for running a vertex.
// Op instances are typically created per-vertex by a worker's resolver.
type Op interface {
	// CacheKey returns a persistent cache key for operation.
	CacheKey(context.Context) (digest.Digest, error)
	// ContentMask returns a partial cache checksum with content paths to the
	// inputs. User can combine the content checksum of these paths to get a valid
	// content based cache key.
	ContentMask(context.Context) (digest.Digest, [][]string, error)
	// Run runs an operation and returns the output references.
	Run(ctx context.Context, inputs []Ref) (outputs []Ref, err error)
}
|
||||
|
||||
// SolveRequest describes a single build: the LLB definition or frontend to
// run, the exporter for the result, and optional remote cache references for
// cache export/import.
type SolveRequest struct {
	Definition *pb.Definition
	Frontend frontend.Frontend
	Exporter exporter.ExporterInstance
	FrontendOpt map[string]string
	ExportCacheRef string
	ImportCacheRef string
}
|
||||
|
||||
// Vertex is one node in the build graph
type Vertex interface {
	// Digest is a content-addressable vertex identifier
	Digest() digest.Digest
	// Sys returns an internal value that is used to execute the vertex. Usually
	// this is captured by the operation resolver method during solve.
	Sys() interface{}
	// FIXME(AkihiroSuda): we should not import pb pkg here.
	Metadata() *pb.OpMetadata
	// Inputs returns the vertexes the current vertex depends on.
	Inputs() []Input
	// Name is a human-readable name for the vertex (shown in progress output).
	Name() string // change this to general metadata
}
|
||||
|
||||
// Index identifies one of the outputs of a vertex.
type Index int
|
||||
|
||||
// Input is a pointer to a single reference from a vertex by an index.
type Input struct {
	Index Index
	Vertex Vertex
}
|
111
solver/vertex.go
111
solver/vertex.go
|
@ -1,28 +1,103 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/identity"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Vertex is one node in the build graph
|
||||
type Vertex interface {
|
||||
// Digest is a content-addressable vertex identifier
|
||||
Digest() digest.Digest
|
||||
// Sys returns an internal value that is used to execute the vertex. Usually
|
||||
// this is capured by the operation resolver method during solve.
|
||||
Sys() interface{}
|
||||
// FIXME(AkihiroSuda): we should not import pb pkg here.
|
||||
Metadata() *pb.OpMetadata
|
||||
// Array of vertexes current vertex depends on.
|
||||
Inputs() []Input
|
||||
Name() string // change this to general metadata
|
||||
// input points to a single output (by index) of another vertex that this
// vertex consumes.
type input struct {
	index Index
	vertex *vertex
}
|
||||
|
||||
type Index int
|
||||
|
||||
// Input is an pointer to a single reference from a vertex by an index.
|
||||
type Input struct {
|
||||
Index Index
|
||||
Vertex Vertex
|
||||
// vertex is the concrete Vertex implementation used by the solver.
type vertex struct {
	mu sync.Mutex
	sys interface{} // operation payload, exposed via Sys()
	metadata *pb.OpMetadata
	inputs []*input
	err error
	digest digest.Digest // content-addressable identifier, exposed via Digest()
	clientVertex client.Vertex // progress-API view, filled by initClientVertex()
	name string
	notifyMu sync.Mutex
}
|
||||
|
||||
// initClientVertex populates clientVertex, the progress-API (client.Vertex)
// representation of this vertex, from its digest, name and input digests.
func (v *vertex) initClientVertex() {
	inputDigests := make([]digest.Digest, 0, len(v.inputs))
	for _, inp := range v.inputs {
		inputDigests = append(inputDigests, inp.vertex.Digest())
	}
	v.clientVertex = client.Vertex{
		Inputs: inputDigests,
		Name: v.Name(),
		Digest: v.digest,
	}
}
|
||||
|
||||
// Digest returns the content-addressable identifier of the vertex.
func (v *vertex) Digest() digest.Digest {
	return v.digest
}
|
||||
|
||||
// Sys returns the internal operation payload used to execute the vertex.
func (v *vertex) Sys() interface{} {
	return v.sys
}
|
||||
|
||||
// Metadata returns the protobuf operation metadata attached to the vertex.
func (v *vertex) Metadata() *pb.OpMetadata {
	return v.metadata
}
|
||||
|
||||
func (v *vertex) Inputs() (inputs []Input) {
|
||||
inputs = make([]Input, 0, len(v.inputs))
|
||||
for _, i := range v.inputs {
|
||||
inputs = append(inputs, Input{i.index, i.vertex})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Name returns the human-readable vertex name used in progress output.
func (v *vertex) Name() string {
	return v.name
}
|
||||
|
||||
// notifyStarted reports over the progress API that vertex v started executing
// now; any previous completion time is cleared (re-runs).
func notifyStarted(ctx context.Context, v *client.Vertex) {
	pw, _, _ := progress.FromContext(ctx)
	defer pw.Close()
	now := time.Now()
	v.Started = &now
	v.Completed = nil
	pw.Write(v.Digest.String(), *v)
}
|
||||
|
||||
// notifyCompleted reports over the progress API that vertex v has finished,
// recording err (if any) as the vertex error. If no start time was recorded,
// the completion time is used for both.
func notifyCompleted(ctx context.Context, v *client.Vertex, err error) {
	pw, _, _ := progress.FromContext(ctx)
	defer pw.Close()
	now := time.Now()
	if v.Started == nil {
		v.Started = &now
	}
	v.Completed = &now
	v.Cached = false
	if err != nil {
		v.Error = err.Error()
	}
	pw.Write(v.Digest.String(), *v)
}
|
||||
|
||||
// inVertexContext runs f inside a synthetic progress vertex with the given
// name so that its work shows up as a separate step in the client output.
// The vertex gets a random digest; start and completion (with f's error) are
// reported around the call, and f's error is returned unchanged.
func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error {
	v := client.Vertex{
		Digest: digest.FromBytes([]byte(identity.NewID())),
		Name: name,
	}
	pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
	notifyStarted(ctx, &v)
	defer pw.Close()
	err := f(ctx)
	notifyCompleted(ctx, &v, err)
	return err
}
|
||||
|
|
|
@ -0,0 +1,260 @@
|
|||
package base
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/diff"
|
||||
"github.com/containerd/containerd/images"
|
||||
ctdsnapshot "github.com/containerd/containerd/snapshots"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/cacheimport"
|
||||
"github.com/moby/buildkit/cache/instructioncache"
|
||||
localcache "github.com/moby/buildkit/cache/instructioncache/local"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/executor"
|
||||
"github.com/moby/buildkit/exporter"
|
||||
imageexporter "github.com/moby/buildkit/exporter/containerimage"
|
||||
localexporter "github.com/moby/buildkit/exporter/local"
|
||||
ociexporter "github.com/moby/buildkit/exporter/oci"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/snapshot/blobmapping"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/llbop"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/source"
|
||||
"github.com/moby/buildkit/source/containerimage"
|
||||
"github.com/moby/buildkit/source/git"
|
||||
"github.com/moby/buildkit/source/http"
|
||||
"github.com/moby/buildkit/source/local"
|
||||
"github.com/moby/buildkit/worker"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// TODO: this file should be removed. containerd defines ContainerdWorker, oci defines OCIWorker. There is no base worker.
|
||||
|
||||
// WorkerOpt is specific to a worker.
// See also CommonOpt.
type WorkerOpt struct {
	Name string // unique worker name, returned by Worker.Name()
	SessionManager *session.Manager
	MetadataStore *metadata.Store
	Executor executor.Executor
	BaseSnapshotter ctdsnapshot.Snapshotter // not blobmapping one (FIXME: just require blobmapping snapshotter?)
	ContentStore content.Store
	Applier diff.Differ
	Differ diff.Differ
	ImageStore images.Store
}
|
||||
|
||||
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
// TODO: s/Worker/OpWorker/g ?
// FIXME: Worker should be rather an interface
type Worker struct {
	WorkerOpt
	Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter
	CacheManager cache.Manager
	SourceManager *source.Manager
	cache instructioncache.InstructionCache // exposed via InstructionCache()
	Exporters map[string]exporter.Exporter
	ImageSource source.Source
	CacheExporter *cacheimport.CacheExporter // TODO: remove
	CacheImporter *cacheimport.CacheImporter // TODO: remove
	// no frontend here
}
|
||||
|
||||
// NewWorker instantiates a local worker: it wraps the base snapshotter with
// blob mapping, builds the cache manager, registers the image/git/http/local
// sources, the image/local/oci exporters, and the cache exporter/importer.
func NewWorker(opt WorkerOpt) (*Worker, error) {
	// blobmapping snapshotter layers blob tracking on top of the base snapshotter
	bmSnapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{
		Content: opt.ContentStore,
		Snapshotter: opt.BaseSnapshotter,
		MetadataStore: opt.MetadataStore,
	})
	if err != nil {
		return nil, err
	}

	cm, err := cache.NewManager(cache.ManagerOpt{
		Snapshotter: bmSnapshotter,
		MetadataStore: opt.MetadataStore,
	})
	if err != nil {
		return nil, err
	}

	// local instruction cache, backed by the shared metadata store
	ic := &localcache.LocalStore{
		MetadataStore: opt.MetadataStore,
		Cache: cm,
	}

	sm, err := source.NewManager()
	if err != nil {
		return nil, err
	}

	// register the four source types: container image, git, http, local dir
	is, err := containerimage.NewSource(containerimage.SourceOpt{
		Snapshotter: bmSnapshotter,
		ContentStore: opt.ContentStore,
		SessionManager: opt.SessionManager,
		Applier: opt.Applier,
		CacheAccessor: cm,
	})
	if err != nil {
		return nil, err
	}

	sm.Register(is)

	gs, err := git.NewSource(git.Opt{
		CacheAccessor: cm,
		MetadataStore: opt.MetadataStore,
	})
	if err != nil {
		return nil, err
	}

	sm.Register(gs)

	hs, err := http.NewSource(http.Opt{
		CacheAccessor: cm,
		MetadataStore: opt.MetadataStore,
	})
	if err != nil {
		return nil, err
	}

	sm.Register(hs)

	ss, err := local.NewSource(local.Opt{
		SessionManager: opt.SessionManager,
		CacheAccessor: cm,
		MetadataStore: opt.MetadataStore,
	})
	if err != nil {
		return nil, err
	}
	sm.Register(ss)

	// exporters share one image writer for the image and oci variants
	exporters := map[string]exporter.Exporter{}

	iw, err := imageexporter.NewImageWriter(imageexporter.WriterOpt{
		Snapshotter: bmSnapshotter,
		ContentStore: opt.ContentStore,
		Differ: opt.Differ,
	})
	if err != nil {
		return nil, err
	}

	imageExporter, err := imageexporter.New(imageexporter.Opt{
		Images: opt.ImageStore,
		SessionManager: opt.SessionManager,
		ImageWriter: iw,
	})
	if err != nil {
		return nil, err
	}
	exporters[client.ExporterImage] = imageExporter

	localExporter, err := localexporter.New(localexporter.Opt{
		SessionManager: opt.SessionManager,
	})
	if err != nil {
		return nil, err
	}
	exporters[client.ExporterLocal] = localExporter

	ociExporter, err := ociexporter.New(ociexporter.Opt{
		SessionManager: opt.SessionManager,
		ImageWriter: iw,
	})
	if err != nil {
		return nil, err
	}
	exporters[client.ExporterOCI] = ociExporter

	// build-cache exporter/importer (see the TODO on the Worker fields)
	ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{
		Snapshotter: bmSnapshotter,
		ContentStore: opt.ContentStore,
		SessionManager: opt.SessionManager,
		Differ: opt.Differ,
	})

	ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{
		Snapshotter: bmSnapshotter,
		ContentStore: opt.ContentStore,
		Applier: opt.Applier,
		CacheAccessor: cm,
		SessionManager: opt.SessionManager,
	})

	return &Worker{
		WorkerOpt: opt,
		Snapshotter: bmSnapshotter,
		CacheManager: cm,
		SourceManager: sm,
		cache: ic,
		Exporters: exporters,
		ImageSource: is,
		CacheExporter: ce,
		CacheImporter: ci,
	}, nil
}
|
||||
|
||||
// Resolve returns the Op implementation for vertex v based on its operation
// type (source, exec, or nested build). s is used as the sub-builder for
// build ops.
func (w *Worker) Resolve(v solver.Vertex, s worker.SubBuilder) (solver.Op, error) {
	switch op := v.Sys().(type) {
	case *pb.Op_Source:
		return llbop.NewSourceOp(v, op, w.SourceManager)
	case *pb.Op_Exec:
		return llbop.NewExecOp(v, op, w.CacheManager, w.Executor)
	case *pb.Op_Build:
		return llbop.NewBuildOp(v, op, s)
	default:
		return nil, errors.Errorf("could not resolve %v", v)
	}
}
|
||||
|
||||
// ResolveImageConfig resolves an image reference to its digest and config
// blob by delegating to the worker's image source, which must implement the
// resolveImageConfig capability.
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) {
	// ImageSource is typically source/containerimage
	resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
	if !ok {
		return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.Name())
	}
	return resolveImageConfig.ResolveImageConfig(ctx, ref)
}
|
||||
|
||||
// resolveImageConfig is the optional capability checked for on ImageSource by
// Worker.ResolveImageConfig.
type resolveImageConfig interface {
	ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error)
}
|
||||
|
||||
// Exec runs meta on top of a fresh mutable snapshot of rootFS, wiring the
// given stdio streams to the process. The temporary snapshot is released when
// the process finishes.
func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
	active, err := w.CacheManager.New(ctx, rootFS)
	if err != nil {
		return err
	}
	// release even if ctx was cancelled, hence context.TODO()
	defer active.Release(context.TODO())
	return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
}
|
||||
|
||||
// DiskUsage reports cache disk usage for this worker.
func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
	return w.CacheManager.DiskUsage(ctx, opt)
}
|
||||
|
||||
// Name returns the worker name given in WorkerOpt.
func (w *Worker) Name() string {
	return w.WorkerOpt.Name
}
|
||||
|
||||
func (w *Worker) Exporter(name string) (exporter.Exporter, error) {
|
||||
exp, ok := w.Exporters[name]
|
||||
if !ok {
|
||||
return nil, errors.Errorf("exporter %q could not be found", name)
|
||||
}
|
||||
return exp, nil
|
||||
}
|
||||
|
||||
// InstructionCache exposes the worker's local instruction cache.
func (w *Worker) InstructionCache() instructioncache.InstructionCache {
	return w.cache
}
|
|
@ -11,7 +11,7 @@ import (
|
|||
"github.com/containerd/containerd/content"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/executor/containerdexecutor"
|
||||
"github.com/moby/buildkit/worker"
|
||||
"github.com/moby/buildkit/worker/base"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
@ -19,32 +19,32 @@ import (
|
|||
// NewWorkerOpt creates a WorkerOpt.
|
||||
// But it does not set the following fields:
|
||||
// - SessionManager
|
||||
func NewWorkerOpt(root string, address, snapshotterName string, opts ...containerd.ClientOpt) (worker.WorkerOpt, error) {
|
||||
func NewWorkerOpt(root string, address, snapshotterName string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
|
||||
// TODO: take lock to make sure there are no duplicates
|
||||
opts = append([]containerd.ClientOpt{containerd.WithDefaultNamespace("buildkit")}, opts...)
|
||||
client, err := containerd.New(address, opts...)
|
||||
if err != nil {
|
||||
return worker.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address)
|
||||
return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address)
|
||||
}
|
||||
return newContainerd(root, client, snapshotterName)
|
||||
}
|
||||
|
||||
func newContainerd(root string, client *containerd.Client, snapshotterName string) (worker.WorkerOpt, error) {
|
||||
func newContainerd(root string, client *containerd.Client, snapshotterName string) (base.WorkerOpt, error) {
|
||||
if strings.Contains(snapshotterName, "/") {
|
||||
return worker.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName)
|
||||
return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName)
|
||||
}
|
||||
name := "containerd-" + snapshotterName
|
||||
root = filepath.Join(root, name)
|
||||
if err := os.MkdirAll(root, 0700); err != nil {
|
||||
return worker.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root)
|
||||
return base.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root)
|
||||
}
|
||||
|
||||
md, err := metadata.NewStore(filepath.Join(root, "metadata.db"))
|
||||
if err != nil {
|
||||
return worker.WorkerOpt{}, err
|
||||
return base.WorkerOpt{}, err
|
||||
}
|
||||
df := client.DiffService()
|
||||
opt := worker.WorkerOpt{
|
||||
opt := base.WorkerOpt{
|
||||
Name: name,
|
||||
MetadataStore: md,
|
||||
Executor: containerdexecutor.New(client, root),
|
||||
|
|
|
@ -16,15 +16,15 @@ import (
|
|||
"github.com/containerd/containerd/snapshots/overlay"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/executor/runcexecutor"
|
||||
"github.com/moby/buildkit/worker"
|
||||
"github.com/moby/buildkit/worker/base"
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
// NewWorkerOpt creates a WorkerOpt.
|
||||
// But it does not set the following fields:
|
||||
// - SessionManager
|
||||
func NewWorkerOpt(root string) (worker.WorkerOpt, error) {
|
||||
var opt worker.WorkerOpt
|
||||
func NewWorkerOpt(root string) (base.WorkerOpt, error) {
|
||||
var opt base.WorkerOpt
|
||||
name := "runc-overlay"
|
||||
root = filepath.Join(root, name)
|
||||
if err := os.MkdirAll(root, 0700); err != nil {
|
||||
|
@ -68,7 +68,7 @@ func NewWorkerOpt(root string) (worker.WorkerOpt, error) {
|
|||
|
||||
// TODO: call mdb.GarbageCollect . maybe just inject it into nsSnapshotter.Remove and csContent.Delete
|
||||
|
||||
opt = worker.WorkerOpt{
|
||||
opt = base.WorkerOpt{
|
||||
Name: name,
|
||||
MetadataStore: md,
|
||||
Executor: exe,
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
"github.com/moby/buildkit/source"
|
||||
"github.com/moby/buildkit/worker"
|
||||
"github.com/moby/buildkit/worker/base"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -44,7 +44,7 @@ func TestRuncWorker(t *testing.T) {
|
|||
workerOpt.SessionManager, err = session.NewManager()
|
||||
require.NoError(t, err)
|
||||
|
||||
w, err := worker.NewWorker(workerOpt)
|
||||
w, err := base.NewWorker(workerOpt)
|
||||
require.NoError(t, err)
|
||||
|
||||
img, err := source.NewImageIdentifier("docker.io/library/busybox:latest")
|
||||
|
|
206
worker/worker.go
206
worker/worker.go
|
@ -1,194 +1,44 @@
|
|||
package worker
|
||||
|
||||
import (
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/diff"
|
||||
"github.com/containerd/containerd/images"
|
||||
ctdsnapshot "github.com/containerd/containerd/snapshots"
|
||||
"io"
|
||||
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/cacheimport"
|
||||
"github.com/moby/buildkit/cache/instructioncache"
|
||||
localcache "github.com/moby/buildkit/cache/instructioncache/local"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/executor"
|
||||
"github.com/moby/buildkit/exporter"
|
||||
imageexporter "github.com/moby/buildkit/exporter/containerimage"
|
||||
localexporter "github.com/moby/buildkit/exporter/local"
|
||||
ociexporter "github.com/moby/buildkit/exporter/oci"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/snapshot/blobmapping"
|
||||
"github.com/moby/buildkit/source"
|
||||
"github.com/moby/buildkit/source/containerimage"
|
||||
"github.com/moby/buildkit/source/git"
|
||||
"github.com/moby/buildkit/source/http"
|
||||
"github.com/moby/buildkit/source/local"
|
||||
"github.com/moby/buildkit/solver/types"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// WorkerOpt is specific to a worker.
|
||||
// See also CommonOpt.
|
||||
type WorkerOpt struct {
|
||||
Name string
|
||||
SessionManager *session.Manager
|
||||
MetadataStore *metadata.Store
|
||||
Executor executor.Executor
|
||||
BaseSnapshotter ctdsnapshot.Snapshotter // not blobmapping one (FIXME: just require blobmapping snapshotter?)
|
||||
ContentStore content.Store
|
||||
Applier diff.Differ
|
||||
Differ diff.Differ
|
||||
ImageStore images.Store
|
||||
}
|
||||
|
||||
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
|
||||
// TODO: s/Worker/OpWorker/g ?
|
||||
// FIXME: Worker should be rather an interface
|
||||
type Worker struct {
|
||||
WorkerOpt
|
||||
Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter
|
||||
CacheManager cache.Manager
|
||||
SourceManager *source.Manager
|
||||
InstructionCache instructioncache.InstructionCache
|
||||
Exporters map[string]exporter.Exporter
|
||||
ImageSource source.Source
|
||||
CacheExporter *cacheimport.CacheExporter
|
||||
CacheImporter *cacheimport.CacheImporter
|
||||
// no frontend here
|
||||
// type Worker struct {
|
||||
// WorkerOpt
|
||||
// Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter
|
||||
// CacheManager cache.Manager
|
||||
// SourceManager *source.Manager
|
||||
// InstructionCache instructioncache.InstructionCache
|
||||
// Exporters map[string]exporter.Exporter
|
||||
// ImageSource source.Source
|
||||
// CacheExporter *cacheimport.CacheExporter
|
||||
// CacheImporter *cacheimport.CacheImporter
|
||||
// // no frontend here
|
||||
// }
|
||||
|
||||
// SubBuilder solves a nested build request (e.g. from a build op) identified
// by dgst and returns the resulting reference.
type SubBuilder interface {
	SubBuild(ctx context.Context, dgst digest.Digest, req types.SolveRequest) (types.Ref, error)
}
|
||||
|
||||
// NewWorker instantiates a local worker
|
||||
func NewWorker(opt WorkerOpt) (*Worker, error) {
|
||||
bmSnapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{
|
||||
Content: opt.ContentStore,
|
||||
Snapshotter: opt.BaseSnapshotter,
|
||||
MetadataStore: opt.MetadataStore,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cm, err := cache.NewManager(cache.ManagerOpt{
|
||||
Snapshotter: bmSnapshotter,
|
||||
MetadataStore: opt.MetadataStore,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ic := &localcache.LocalStore{
|
||||
MetadataStore: opt.MetadataStore,
|
||||
Cache: cm,
|
||||
}
|
||||
|
||||
sm, err := source.NewManager()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
is, err := containerimage.NewSource(containerimage.SourceOpt{
|
||||
Snapshotter: bmSnapshotter,
|
||||
ContentStore: opt.ContentStore,
|
||||
SessionManager: opt.SessionManager,
|
||||
Applier: opt.Applier,
|
||||
CacheAccessor: cm,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sm.Register(is)
|
||||
|
||||
gs, err := git.NewSource(git.Opt{
|
||||
CacheAccessor: cm,
|
||||
MetadataStore: opt.MetadataStore,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sm.Register(gs)
|
||||
|
||||
hs, err := http.NewSource(http.Opt{
|
||||
CacheAccessor: cm,
|
||||
MetadataStore: opt.MetadataStore,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sm.Register(hs)
|
||||
|
||||
ss, err := local.NewSource(local.Opt{
|
||||
SessionManager: opt.SessionManager,
|
||||
CacheAccessor: cm,
|
||||
MetadataStore: opt.MetadataStore,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sm.Register(ss)
|
||||
|
||||
exporters := map[string]exporter.Exporter{}
|
||||
|
||||
iw, err := imageexporter.NewImageWriter(imageexporter.WriterOpt{
|
||||
Snapshotter: bmSnapshotter,
|
||||
ContentStore: opt.ContentStore,
|
||||
Differ: opt.Differ,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
imageExporter, err := imageexporter.New(imageexporter.Opt{
|
||||
Images: opt.ImageStore,
|
||||
SessionManager: opt.SessionManager,
|
||||
ImageWriter: iw,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exporters[client.ExporterImage] = imageExporter
|
||||
|
||||
localExporter, err := localexporter.New(localexporter.Opt{
|
||||
SessionManager: opt.SessionManager,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exporters[client.ExporterLocal] = localExporter
|
||||
|
||||
ociExporter, err := ociexporter.New(ociexporter.Opt{
|
||||
SessionManager: opt.SessionManager,
|
||||
ImageWriter: iw,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exporters[client.ExporterOCI] = ociExporter
|
||||
|
||||
ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{
|
||||
Snapshotter: bmSnapshotter,
|
||||
ContentStore: opt.ContentStore,
|
||||
SessionManager: opt.SessionManager,
|
||||
Differ: opt.Differ,
|
||||
})
|
||||
|
||||
ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{
|
||||
Snapshotter: bmSnapshotter,
|
||||
ContentStore: opt.ContentStore,
|
||||
Applier: opt.Applier,
|
||||
CacheAccessor: cm,
|
||||
SessionManager: opt.SessionManager,
|
||||
})
|
||||
|
||||
return &Worker{
|
||||
WorkerOpt: opt,
|
||||
Snapshotter: bmSnapshotter,
|
||||
CacheManager: cm,
|
||||
SourceManager: sm,
|
||||
InstructionCache: ic,
|
||||
Exporters: exporters,
|
||||
ImageSource: is,
|
||||
CacheExporter: ce,
|
||||
CacheImporter: ci,
|
||||
}, nil
|
||||
// Worker is a build execution backend: it resolves vertexes to runnable ops,
// executes processes, and provides instruction cache, disk usage accounting
// and exporters.
type Worker interface {
	// InstructionCache returns the worker-local instruction cache.
	InstructionCache() instructioncache.InstructionCache
	// Resolve returns an Op implementation for vertex v; s handles nested builds.
	Resolve(v types.Vertex, s SubBuilder) (types.Op, error)
	// ResolveImageConfig resolves an image reference to its digest and config blob.
	ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error)
	// Exec runs a process on top of rootFS with the given stdio streams.
	Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
	// DiskUsage reports cache disk usage for this worker.
	DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error)
	Name() string
	// Exporter returns the exporter registered under name.
	Exporter(name string) (exporter.Exporter, error)
}
|
||||
|
|
|
@ -11,11 +11,11 @@ import (
|
|||
type Controller struct {
|
||||
mu sync.Mutex
|
||||
// TODO: define worker interface and support remote ones
|
||||
workers []*Worker
|
||||
workers []Worker
|
||||
}
|
||||
|
||||
// Add adds a local worker
|
||||
func (c *Controller) Add(w *Worker) error {
|
||||
func (c *Controller) Add(w Worker) error {
|
||||
c.mu.Lock()
|
||||
c.workers = append(c.workers, w)
|
||||
c.mu.Unlock()
|
||||
|
@ -23,7 +23,7 @@ func (c *Controller) Add(w *Worker) error {
|
|||
}
|
||||
|
||||
// GetAll returns all local workers
|
||||
func (c *Controller) GetAll() []*Worker {
|
||||
func (c *Controller) GetAll() []Worker {
|
||||
c.mu.Lock()
|
||||
workers := c.workers
|
||||
c.mu.Unlock()
|
||||
|
@ -31,8 +31,8 @@ func (c *Controller) GetAll() []*Worker {
|
|||
}
|
||||
|
||||
// GetDefault returns the default local worker
|
||||
func (c *Controller) GetDefault() (*Worker, error) {
|
||||
var w *Worker
|
||||
func (c *Controller) GetDefault() (Worker, error) {
|
||||
var w Worker
|
||||
c.mu.Lock()
|
||||
if len(c.workers) > 0 {
|
||||
w = c.workers[0]
|
||||
|
|
Loading…
Reference in New Issue