From efde4f234030542acd8b6e3fefac9b626f9598ed Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Fri, 15 Dec 2017 00:06:54 -0800 Subject: [PATCH] worker, solver: update interfaces Signed-off-by: Tonis Tiigi --- cmd/buildd/main.go | 28 +- cmd/buildd/main_containerd_worker.go | 7 +- cmd/buildd/main_oci_worker.go | 7 +- control/control.go | 23 +- solver/{solver => }/jobs.go | 30 +- solver/{solver => }/llbbridge.go | 29 +- solver/{solver => }/llbop/build.go | 14 +- solver/{solver => }/llbop/exec.go | 5 +- solver/{solver => }/llbop/source.go | 0 solver/llbsolver.go | 35 ++ solver/{solver => }/load.go | 7 +- solver/{reference => }/sharedref.go | 21 +- solver/{solver => }/signal.go | 0 solver/solver.go | 740 ++++++++++++++++++++++++++- solver/solver/llbopsolver.go | 38 -- solver/solver/solver.go | 729 -------------------------- solver/solver/vertex.go | 104 ---- solver/types.go | 16 + solver/types/types.go | 58 +++ solver/vertex.go | 111 +++- worker/base/worker.go | 260 ++++++++++ worker/containerd/containerd.go | 16 +- worker/runc/runc.go | 8 +- worker/runc/runc_test.go | 4 +- worker/worker.go | 206 +------- worker/workercontroller.go | 10 +- 26 files changed, 1306 insertions(+), 1200 deletions(-) rename solver/{solver => }/jobs.go (92%) rename solver/{solver => }/llbbridge.go (53%) rename solver/{solver => }/llbop/build.go (86%) rename solver/{solver => }/llbop/exec.go (96%) rename solver/{solver => }/llbop/source.go (100%) create mode 100644 solver/llbsolver.go rename solver/{solver => }/load.go (90%) rename solver/{reference => }/sharedref.go (79%) rename solver/{solver => }/signal.go (100%) delete mode 100644 solver/solver/llbopsolver.go delete mode 100644 solver/solver/solver.go delete mode 100644 solver/solver/vertex.go create mode 100644 solver/types.go create mode 100644 solver/types/types.go create mode 100644 worker/base/worker.go diff --git a/cmd/buildd/main.go b/cmd/buildd/main.go index 99d2b9c3..a69a89d6 100644 --- a/cmd/buildd/main.go +++ b/cmd/buildd/main.go @@ -10,6 +10,7 @@ import ( "github.com/containerd/containerd/sys" "github.com/docker/go-connections/sockets" + "github.com/moby/buildkit/cache/cacheimport" "github.com/moby/buildkit/control" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/frontend/dockerfile" @@ -19,6 +20,7 @@ import ( "github.com/moby/buildkit/util/appdefaults" "github.com/moby/buildkit/util/profiler" "github.com/moby/buildkit/worker" + "github.com/moby/buildkit/worker/base" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -33,7 +35,7 @@ type workerInitializerOpt struct { } type workerInitializer struct { - fn func(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) + fn func(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) // less priority number, more preferred priority int } @@ -227,10 +229,28 @@ func newController(c *cli.Context, root string) (*control.Controller, error) { frontends := map[string]frontend.Frontend{} frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend() frontends["gateway.v0"] = gateway.NewGatewayFrontend() + + // cache exporter and importer are manager concepts but as there is no + // way to pull data into specific worker yet we currently set them up + // as part of default worker + var ce *cacheimport.CacheExporter + var ci *cacheimport.CacheImporter + + w, err := wc.GetDefault() + if err != nil { + return nil, err + } + + wt := w.(*base.Worker) + ce = wt.CacheExporter + ci = wt.CacheImporter + return control.NewController(control.Opt{ SessionManager: sessionManager, WorkerController: wc, Frontends: frontends, + CacheExporter: ce, + CacheImporter: ci, }) } @@ -242,7 +262,7 @@ func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Co return nil, err } for _, w := range ws { - logrus.Infof("Found worker %q", w.Name) + logrus.Infof("found worker %q", w.Name()) if err = wc.Add(w); err != nil { return nil, err } @@ -256,7 +276,7 @@ func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Co if err != nil { return nil, err } - logrus.Infof("Found %d workers, default=%q", nWorkers, defaultWorker.Name) - logrus.Warn("Currently, only the default worker can be used.") + logrus.Infof("found %d workers, default=%q", nWorkers, defaultWorker.Name) + logrus.Warn("currently, only the default worker can be used.") return wc, nil } diff --git a/cmd/buildd/main_containerd_worker.go b/cmd/buildd/main_containerd_worker.go index 5a4d1913..26b20dcb 100644 --- a/cmd/buildd/main_containerd_worker.go +++ b/cmd/buildd/main_containerd_worker.go @@ -8,6 +8,7 @@ import ( ctd "github.com/containerd/containerd" "github.com/moby/buildkit/worker" + "github.com/moby/buildkit/worker/base" "github.com/moby/buildkit/worker/containerd" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -33,7 +34,7 @@ func init() { // TODO(AkihiroSuda): allow using multiple snapshotters. should be useful for some applications that does not work with the default overlay snapshotter. e.g. mysql (docker/for-linux#72)", } -func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) { +func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) { socket := c.GlobalString("containerd-worker-addr") boolOrAuto, err := parseBoolOrAuto(c.GlobalString("containerd-worker")) if err != nil { @@ -47,11 +48,11 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([ return nil, err } opt.SessionManager = common.sessionManager - w, err := worker.NewWorker(opt) + w, err := base.NewWorker(opt) if err != nil { return nil, err } - return []*worker.Worker{w}, nil + return []worker.Worker{w}, nil } func validContainerdSocket(socket string) bool { diff --git a/cmd/buildd/main_oci_worker.go b/cmd/buildd/main_oci_worker.go index f5e7b371..a86a4b5f 100644 --- a/cmd/buildd/main_oci_worker.go +++ b/cmd/buildd/main_oci_worker.go @@ -6,6 +6,7 @@ import ( "os/exec" "github.com/moby/buildkit/worker" + "github.com/moby/buildkit/worker/base" "github.com/moby/buildkit/worker/runc" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -25,7 +26,7 @@ func init() { // TODO: allow multiple oci runtimes and snapshotters } -func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) { +func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) { boolOrAuto, err := parseBoolOrAuto(c.GlobalString("oci-worker")) if err != nil { return nil, err @@ -38,11 +39,11 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worke return nil, err } opt.SessionManager = common.sessionManager - w, err := worker.NewWorker(opt) + w, err := base.NewWorker(opt) if err != nil { return nil, err } - return []*worker.Worker{w}, nil + return []worker.Worker{w}, nil } func validOCIBinary() bool { diff --git a/control/control.go b/control/control.go index 2b9d62c0..a097d0dd 100644 --- a/control/control.go +++ b/control/control.go @@ -3,13 +3,13 @@ package control import ( "github.com/docker/distribution/reference" controlapi "github.com/moby/buildkit/api/services/control" + "github.com/moby/buildkit/cache/cacheimport" "github.com/moby/buildkit/client" "github.com/moby/buildkit/exporter" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/session" "github.com/moby/buildkit/session/grpchijack" "github.com/moby/buildkit/solver" - solverimpl "github.com/moby/buildkit/solver/solver" "github.com/moby/buildkit/worker" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -22,17 +22,24 @@ type Opt struct { SessionManager *session.Manager WorkerController *worker.Controller Frontends map[string]frontend.Frontend + CacheExporter *cacheimport.CacheExporter + CacheImporter *cacheimport.CacheImporter } type Controller struct { // TODO: ControlService opt Opt - solver solver.Solver + solver *solver.Solver } func NewController(opt Opt) (*Controller, error) { c := &Controller{ - opt: opt, - solver: solverimpl.NewLLBOpSolver(opt.WorkerController, opt.Frontends), + opt: opt, + solver: solver.NewLLBOpSolver(solver.LLBOpt{ + WorkerController: opt.WorkerController, + Frontends: opt.Frontends, + CacheExporter: opt.CacheExporter, + CacheImporter: opt.CacheImporter, + }), } return c, nil } @@ -45,7 +52,7 @@ func (c *Controller) Register(server *grpc.Server) error { func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) { resp := &controlapi.DiskUsageResponse{} for _, w := range c.opt.WorkerController.GetAll() { - du, err := w.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{ + du, err := w.DiskUsage(ctx, client.DiskUsageInfo{ Filter: r.Filter, }) if err != nil { @@ -90,9 +97,9 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (* return nil, err } if req.Exporter != "" { - exp, ok := w.Exporters[req.Exporter] - if !ok { - return nil, errors.Errorf("exporter %q could not be found", req.Exporter) + exp, err := w.Exporter(req.Exporter) + if err != nil { + return nil, err } expi, err = exp.Resolve(ctx, req.ExporterAttrs) if err != nil { diff --git a/solver/solver/jobs.go b/solver/jobs.go similarity index 92% rename from solver/solver/jobs.go rename to solver/jobs.go index 6ca17243..1a8adbdc 100644 --- a/solver/solver/jobs.go +++ b/solver/jobs.go @@ -8,9 +8,7 @@ import ( "github.com/moby/buildkit/cache/instructioncache" "github.com/moby/buildkit/client" "github.com/moby/buildkit/session" - "github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver/pb" - "github.com/moby/buildkit/solver/reference" "github.com/moby/buildkit/util/progress" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -163,18 +161,18 @@ type job struct { type cacheRecord struct { VertexSolver - index solver.Index - ref solver.Ref + index Index + ref Ref } -func (j *job) load(def *pb.Definition, resolveOp ResolveOpFunc) (*solver.Input, error) { +func (j *job) load(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error) { j.l.mu.Lock() defer j.l.mu.Unlock() return j.loadInternal(def, resolveOp) } -func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*solver.Input, error) { +func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error) { vtx, idx, err := loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (interface{}, error)) (interface{}, error) { if st, ok := j.l.actives[dgst]; ok { if vtx, ok := st.jobs[j]; ok { @@ -206,7 +204,7 @@ func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*solver } for i, input := range pbOp.Inputs { if inputMetadata := def.Metadata[input.Digest]; inputMetadata.IgnoreCache { - k, err := s.CacheKey(ctx, solver.Index(i)) + k, err := s.CacheKey(ctx, Index(i)) if err != nil { return nil, err } @@ -227,7 +225,7 @@ func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*solver if err != nil { return nil, err } - return &solver.Input{Vertex: vtx.(*vertex), Index: solver.Index(idx)}, nil + return &Input{Vertex: vtx.(*vertex), Index: Index(idx)}, nil } func (j *job) discard() { @@ -255,7 +253,7 @@ func (j *job) getSolver(dgst digest.Digest) (VertexSolver, error) { return st.solver, nil } -func (j *job) getRef(ctx context.Context, cv client.Vertex, index solver.Index) (solver.Ref, error) { +func (j *job) getRef(ctx context.Context, cv client.Vertex, index Index) (Ref, error) { s, err := j.getSolver(cv.Digest) if err != nil { return nil, err @@ -268,15 +266,15 @@ func (j *job) getRef(ctx context.Context, cv client.Vertex, index solver.Index) return ref, nil } -func (j *job) keepCacheRef(s VertexSolver, index solver.Index, ref solver.Ref) { - immutable, ok := reference.ToImmutableRef(ref) +func (j *job) keepCacheRef(s VertexSolver, index Index, ref Ref) { + immutable, ok := ToImmutableRef(ref) if ok { j.cached[immutable.ID()] = &cacheRecord{s, index, ref} } } -func (j *job) cacheExporter(ref solver.Ref) (CacheExporter, error) { - immutable, ok := reference.ToImmutableRef(ref) +func (j *job) cacheExporter(ref Ref) (CacheExporter, error) { + immutable, ok := ToImmutableRef(ref) if !ok { return nil, errors.Errorf("invalid reference") } @@ -287,7 +285,7 @@ func (j *job) cacheExporter(ref solver.Ref) (CacheExporter, error) { return cr.Cache(cr.index, cr.ref), nil } -func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index solver.Index, cache instructioncache.InstructionCache) (solver.Ref, error) { +func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index Index, cache instructioncache.InstructionCache) (Ref, error) { k, err := s.CacheKey(ctx, index) if err != nil { return nil, err @@ -298,7 +296,7 @@ func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index solver. } if ref != nil { markCached(ctx, cv) - return ref.(solver.Ref), nil + return ref.(Ref), nil } ev, err := s.OutputEvaluator(index) @@ -319,7 +317,7 @@ func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index solver. } if ref != nil { markCached(ctx, cv) - return ref.(solver.Ref), nil + return ref.(Ref), nil } continue } diff --git a/solver/solver/llbbridge.go b/solver/llbbridge.go similarity index 53% rename from solver/solver/llbbridge.go rename to solver/llbbridge.go index cf5030a7..d7f14245 100644 --- a/solver/solver/llbbridge.go +++ b/solver/llbbridge.go @@ -1,13 +1,8 @@ package solver import ( - "io" - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/executor" "github.com/moby/buildkit/frontend" - solver "github.com/moby/buildkit/solver" - "github.com/moby/buildkit/solver/reference" "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -19,7 +14,7 @@ type llbBridge struct { *Solver job *job // this worker is used for running containerized frontend, not vertices - worker *worker.Worker + worker.Worker } type resolveImageConfig interface { @@ -39,7 +34,7 @@ func (s *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (cache return nil, nil, nil } } - ref, exp, err := s.solve(ctx, s.job, solver.SolveRequest{ + ref, exp, err := s.solve(ctx, s.job, SolveRequest{ Definition: req.Definition, Frontend: f, FrontendOpt: req.FrontendOpt, @@ -47,27 +42,9 @@ func (s *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (cache if err != nil { return nil, nil, err } - immutable, ok := reference.ToImmutableRef(ref) + immutable, ok := ToImmutableRef(ref) if !ok { return nil, nil, errors.Errorf("invalid reference for exporting: %T", ref) } return immutable, exp, nil } - -func (s *llbBridge) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) { - // ImageSource is typically source/containerimage - resolveImageConfig, ok := s.worker.ImageSource.(resolveImageConfig) - if !ok { - return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", s.worker.Name) - } - return resolveImageConfig.ResolveImageConfig(ctx, ref) -} - -func (s *llbBridge) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { - active, err := s.worker.CacheManager.New(ctx, rootFS) - if err != nil { - return err - } - defer active.Release(context.TODO()) - return s.worker.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr) -} diff --git a/solver/solver/llbop/build.go b/solver/llbop/build.go similarity index 86% rename from solver/solver/llbop/build.go rename to solver/llbop/build.go index d2bd9649..229807db 100644 --- a/solver/solver/llbop/build.go +++ b/solver/llbop/build.go @@ -7,27 +7,23 @@ import ( "github.com/containerd/containerd/fs" "github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/snapshot" - solver "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver/pb" - "github.com/moby/buildkit/solver/reference" + "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/net/context" ) -type SubBuilder interface { - SubBuild(ctx context.Context, dgst digest.Digest, req solver.SolveRequest) (solver.Ref, error) -} - const buildCacheType = "buildkit.build.v0" type buildOp struct { op *pb.BuildOp - s SubBuilder + s worker.SubBuilder v solver.Vertex } -func NewBuildOp(v solver.Vertex, op *pb.Op_Build, s SubBuilder) (solver.Op, error) { +func NewBuildOp(v solver.Vertex, op *pb.Op_Build, s worker.SubBuilder) (solver.Op, error) { return &buildOp{ op: op.Build, s: s, @@ -66,7 +62,7 @@ func (b *buildOp) Run(ctx context.Context, inputs []solver.Ref) (outputs []solve } inp := inputs[i] - ref, ok := reference.ToImmutableRef(inp) + ref, ok := solver.ToImmutableRef(inp) if !ok { return nil, errors.Errorf("invalid reference for build %T", inp) } diff --git a/solver/solver/llbop/exec.go b/solver/llbop/exec.go similarity index 96% rename from solver/solver/llbop/exec.go rename to solver/llbop/exec.go index 6b4c4b2a..67a32de0 100644 --- a/solver/solver/llbop/exec.go +++ b/solver/llbop/exec.go @@ -12,7 +12,6 @@ import ( "github.com/moby/buildkit/executor" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver/pb" - "github.com/moby/buildkit/solver/reference" "github.com/moby/buildkit/util/progress/logs" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -78,7 +77,7 @@ func (e *execOp) Run(ctx context.Context, inputs []solver.Ref) ([]solver.Ref, er } inp := inputs[int(m.Input)] var ok bool - ref, ok = reference.ToImmutableRef(inp) + ref, ok = solver.ToImmutableRef(inp) if !ok { return nil, errors.Errorf("invalid reference for exec %T", inputs[int(m.Input)]) } @@ -86,7 +85,7 @@ func (e *execOp) Run(ctx context.Context, inputs []solver.Ref) ([]solver.Ref, er } if m.Output != pb.SkipOutput { if m.Readonly && ref != nil && m.Dest != pb.RootMount { // exclude read-only rootfs - outputs = append(outputs, reference.NewSharedRef(ref).Clone()) + outputs = append(outputs, solver.NewSharedRef(ref).Clone()) } else { active, err := e.cm.New(ctx, ref, cache.WithDescription(fmt.Sprintf("mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")))) // TODO: should be method if err != nil { diff --git a/solver/solver/llbop/source.go b/solver/llbop/source.go similarity index 100% rename from solver/solver/llbop/source.go rename to solver/llbop/source.go diff --git a/solver/llbsolver.go b/solver/llbsolver.go new file mode 100644 index 00000000..54398246 --- /dev/null +++ b/solver/llbsolver.go @@ -0,0 +1,35 @@ +package solver + +import ( + "github.com/moby/buildkit/cache/cacheimport" + "github.com/moby/buildkit/frontend" + "github.com/moby/buildkit/worker" +) + +// DetermineVertexWorker determines worker for a vertex. +// Currently, constraint is just ignored. +// Also we need to track the workers of the inputs. +func determineVertexWorker(wc *worker.Controller, v Vertex) (worker.Worker, error) { + // TODO: multiworker + return wc.GetDefault() +} + +type LLBOpt struct { + WorkerController *worker.Controller + Frontends map[string]frontend.Frontend // used by nested invocations + CacheExporter *cacheimport.CacheExporter + CacheImporter *cacheimport.CacheImporter +} + +func NewLLBOpSolver(opt LLBOpt) *Solver { + var s *Solver + s = New(func(v Vertex) (Op, error) { + // TODO: in reality, worker should be determined already and passed into this function(or this function would be removed) + w, err := determineVertexWorker(opt.WorkerController, v) + if err != nil { + return nil, err + } + return w.Resolve(v, s) + }, opt.WorkerController, determineVertexWorker, opt.Frontends, opt.CacheExporter, opt.CacheImporter) + return s +} diff --git a/solver/solver/load.go b/solver/load.go similarity index 90% rename from solver/solver/load.go rename to solver/load.go index 1b4b16da..1914d073 100644 --- a/solver/solver/load.go +++ b/solver/load.go @@ -3,7 +3,6 @@ package solver import ( "strings" - "github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/source" digest "github.com/opencontainers/go-digest" @@ -17,18 +16,18 @@ func newVertex(dgst digest.Digest, op *pb.Op, opMeta *pb.OpMetadata, load func(d if err != nil { return nil, err } - vtx.inputs = append(vtx.inputs, &input{index: solver.Index(in.Index), vertex: sub.(*vertex)}) + vtx.inputs = append(vtx.inputs, &input{index: Index(in.Index), vertex: sub.(*vertex)}) } vtx.initClientVertex() return vtx, nil } -func toInternalVertex(v solver.Vertex) *vertex { +func toInternalVertex(v Vertex) *vertex { cache := make(map[digest.Digest]*vertex) return loadInternalVertexHelper(v, cache) } -func loadInternalVertexHelper(v solver.Vertex, cache map[digest.Digest]*vertex) *vertex { +func loadInternalVertexHelper(v Vertex, cache map[digest.Digest]*vertex) *vertex { if v, ok := cache[v.Digest()]; ok { return v } diff --git a/solver/reference/sharedref.go b/solver/sharedref.go similarity index 79% rename from solver/reference/sharedref.go rename to solver/sharedref.go index 161fba96..90115b3e 100644 --- a/solver/reference/sharedref.go +++ b/solver/sharedref.go @@ -1,10 +1,9 @@ -package reference +package solver import ( "sync" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/solver" "golang.org/x/net/context" ) @@ -13,11 +12,11 @@ import ( type SharedRef struct { mu sync.Mutex refs map[*sharedRefInstance]struct{} - main solver.Ref - solver.Ref + main Ref + Ref } -func NewSharedRef(main solver.Ref) *SharedRef { +func NewSharedRef(main Ref) *SharedRef { mr := &SharedRef{ refs: make(map[*sharedRefInstance]struct{}), Ref: main, @@ -26,7 +25,7 @@ func NewSharedRef(main solver.Ref) *SharedRef { return mr } -func (mr *SharedRef) Clone() solver.Ref { +func (mr *SharedRef) Clone() Ref { mr.mu.Lock() r := &sharedRefInstance{SharedRef: mr} mr.refs[r] = struct{}{} @@ -38,10 +37,10 @@ func (mr *SharedRef) Release(ctx context.Context) error { return mr.main.Release(ctx) } -func (mr *SharedRef) Sys() solver.Ref { +func (mr *SharedRef) Sys() Ref { sys := mr.Ref if s, ok := sys.(interface { - Sys() solver.Ref + Sys() Ref }); ok { return s.Sys() } @@ -62,17 +61,17 @@ func (r *sharedRefInstance) Release(ctx context.Context) error { return nil } -func OriginRef(ref solver.Ref) solver.Ref { +func OriginRef(ref Ref) Ref { sysRef := ref if sys, ok := ref.(interface { - Sys() solver.Ref + Sys() Ref }); ok { sysRef = sys.Sys() } return sysRef } -func ToImmutableRef(ref solver.Ref) (cache.ImmutableRef, bool) { +func ToImmutableRef(ref Ref) (cache.ImmutableRef, bool) { immutable, ok := OriginRef(ref).(cache.ImmutableRef) if !ok { return nil, false diff --git a/solver/solver/signal.go b/solver/signal.go similarity index 100% rename from solver/solver/signal.go rename to solver/signal.go diff --git a/solver/solver.go b/solver/solver.go index dfcfaba4..87ffd50c 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -1,43 +1,729 @@ package solver import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/cache/cacheimport" + "github.com/moby/buildkit/cache/contenthash" + "github.com/moby/buildkit/cache/instructioncache" "github.com/moby/buildkit/client" - "github.com/moby/buildkit/exporter" "github.com/moby/buildkit/frontend" - "github.com/moby/buildkit/solver/pb" + "github.com/moby/buildkit/util/bgfunc" + "github.com/moby/buildkit/util/progress" + "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" "golang.org/x/net/context" + "golang.org/x/sync/errgroup" ) -// Ref is a reference to the object passed through the build steps. -// This interface is a subset of the github.com/buildkit/buildkit/cache.Ref interface. -// For ease of unit testing, this interface only has Release(). -type Ref interface { - Release(context.Context) error +// FIXME: Also we need to track the workers of the inputs. +// TODO: REMOVE +type VertexWorkerDeterminer func(wc *worker.Controller, v Vertex) (worker.Worker, error) + +// ResolveOpFunc finds an Op implementation for a vertex +type ResolveOpFunc func(Vertex) (Op, error) + +type Solver struct { + resolve ResolveOpFunc + jobs *jobList + workerController *worker.Controller + determineVertexWorker VertexWorkerDeterminer + frontends map[string]frontend.Frontend + ce *cacheimport.CacheExporter + ci *cacheimport.CacheImporter } -// Op is an implementation for running a vertex -type Op interface { - // CacheKey returns a persistent cache key for operation. - CacheKey(context.Context) (digest.Digest, error) - // ContentMask returns a partial cache checksum with content paths to the - // inputs. User can combine the content checksum of these paths to get a valid - // content based cache key. - ContentMask(context.Context) (digest.Digest, [][]string, error) - // Run runs an operation and returns the output references. - Run(ctx context.Context, inputs []Ref) (outputs []Ref, err error) +func New(resolve ResolveOpFunc, wc *worker.Controller, vwd VertexWorkerDeterminer, f map[string]frontend.Frontend, ce *cacheimport.CacheExporter, ci *cacheimport.CacheImporter) *Solver { + return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, determineVertexWorker: vwd, frontends: f, ce: ce, ci: ci} } -type SolveRequest struct { - Definition *pb.Definition - Frontend frontend.Frontend - Exporter exporter.ExporterInstance - FrontendOpt map[string]string - ExportCacheRef string - ImportCacheRef string +func (s *Solver) solve(ctx context.Context, j *job, req SolveRequest) (Ref, map[string][]byte, error) { + if req.Definition == nil { + if req.Frontend == nil { + return nil, nil, errors.Errorf("invalid request: no definition nor frontend") + } + return req.Frontend.Solve(ctx, s.llbBridge(j), req.FrontendOpt) + } + + inp, err := j.load(req.Definition, s.resolve) + if err != nil { + return nil, nil, err + } + ref, err := j.getRef(ctx, inp.Vertex.(*vertex).clientVertex, inp.Index) + return ref, nil, err } -type Solver interface { - Solve(ctx context.Context, id string, req SolveRequest) error - Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error +func (s *Solver) llbBridge(j *job) *llbBridge { + // FIXME(AkihiroSuda): make sure worker implements interfaces required by llbBridge + worker, err := s.workerController.GetDefault() + if err != nil { + panic(err) + } + return &llbBridge{job: j, Solver: s, Worker: worker} +} + +func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + pr, ctx, closeProgressWriter := progress.NewContext(ctx) + defer closeProgressWriter() + + // TODO: multiworker. This should take union cache of all workers + defaultWorker, err := s.workerController.GetDefault() + if err != nil { + return err + } + mainCache := defaultWorker.InstructionCache() + if importRef := req.ImportCacheRef; importRef != "" { + cache, err := s.ci.Import(ctx, importRef) + if err != nil { + return err + } + mainCache = instructioncache.Union(mainCache, cache) + } + + // register a build job. vertex needs to be loaded to a job to run + ctx, j, err := s.jobs.new(ctx, id, pr, mainCache) + if err != nil { + return err + } + + ref, exporterOpt, err := s.solve(ctx, j, req) + defer j.discard() + if err != nil { + return err + } + + defer func() { + if ref != nil { + go ref.Release(context.TODO()) + } + }() + + var immutable cache.ImmutableRef + if ref != nil { + var ok bool + immutable, ok = ToImmutableRef(ref) + if !ok { + return errors.Errorf("invalid reference for exporting: %T", ref) + } + if err := immutable.Finalize(ctx); err != nil { + return err + } + } + + if exp := req.Exporter; exp != nil { + if err := inVertexContext(ctx, exp.Name(), func(ctx context.Context) error { + return exp.Export(ctx, immutable, exporterOpt) + }); err != nil { + return err + } + } + + if exportName := req.ExportCacheRef; exportName != "" { + if err := inVertexContext(ctx, "exporting build cache", func(ctx context.Context) error { + cache, err := j.cacheExporter(ref) + if err != nil { + return err + } + + records, err := cache.Export(ctx) + if err != nil { + return err + } + + // TODO: multiworker + return s.ce.Export(ctx, records, exportName) + }); err != nil { + return err + } + } + + return err +} + +func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error { + j, err := s.jobs.get(id) + if err != nil { + return err + } + defer close(statusChan) + return j.pipe(ctx, statusChan) +} + +func (s *Solver) SubBuild(ctx context.Context, dgst digest.Digest, req SolveRequest) (Ref, error) { + jl := s.jobs + jl.mu.Lock() + st, ok := jl.actives[dgst] + if !ok { + jl.mu.Unlock() + return nil, errors.Errorf("no such parent vertex: %v", dgst) + } + + var inp *Input + var cache instructioncache.InstructionCache + for j := range st.jobs { + var err error + inp, err = j.loadInternal(req.Definition, s.resolve) + if err != nil { + jl.mu.Unlock() + return nil, err + } + cache = j.cache // TODO: combine? + } + st = jl.actives[inp.Vertex.Digest()] + jl.mu.Unlock() + + return getRef(ctx, st.solver, inp.Vertex.(*vertex).clientVertex, inp.Index, cache) // TODO: combine to pass single input // TODO: export cache for subbuilds +} + +type VertexSolver interface { + CacheKey(ctx context.Context, index Index) (digest.Digest, error) + OutputEvaluator(Index) (VertexEvaluator, error) + Release() error + Cache(Index, Ref) CacheExporter +} + +type vertexInput struct { + solver VertexSolver + ev VertexEvaluator + cacheKeys []digest.Digest + ref Ref +} + +type vertexSolver struct { + inputs []*vertexInput + v *vertex + cv client.Vertex + op Op + cache instructioncache.InstructionCache + refs []*SharedRef + f *bgfunc.F + ctx context.Context + + baseKey digest.Digest + mu sync.Mutex + results []digest.Digest + markCachedOnce sync.Once + contentKey digest.Digest + + signal *signal // used to notify that there are callers who need more data +} + +type resolveF func(digest.Digest) (VertexSolver, error) + +func newVertexSolver(ctx context.Context, v *vertex, op Op, c instructioncache.InstructionCache, resolve resolveF) (*vertexSolver, error) { + inputs := make([]*vertexInput, len(v.inputs)) + for i, in := range v.inputs { + s, err := resolve(in.vertex.digest) + if err != nil { + return nil, err + } + ev, err := s.OutputEvaluator(in.index) + if err != nil { + return nil, err + } + ev.Cancel() + inputs[i] = &vertexInput{ + solver: s, + ev: ev, + } + } + return &vertexSolver{ + ctx: ctx, + inputs: inputs, + v: v, + cv: v.clientVertex, + op: op, + cache: c, + signal: newSignaller(), + }, nil +} + +func markCached(ctx context.Context, cv client.Vertex) { + pw, _, _ := progress.FromContext(ctx) + defer pw.Close() + + if cv.Started == nil { + now := time.Now() + cv.Started = &now + cv.Completed = &now + cv.Cached = true + } + pw.Write(cv.Digest.String(), cv) +} + +type CacheExporter interface { + Export(context.Context) ([]cacheimport.CacheRecord, error) +} + +func (vs *vertexSolver) Cache(index Index, ref Ref) CacheExporter { + return &cacheExporter{vertexSolver: vs, index: index, ref: ref} +} + +type cacheExporter struct { + *vertexSolver + index Index + ref Ref +} + +func (ce *cacheExporter) Export(ctx context.Context) ([]cacheimport.CacheRecord, error) { + return ce.vertexSolver.Export(ctx, ce.index, ce.ref) +} + +func (vs *vertexSolver) Export(ctx context.Context, index Index, ref Ref) ([]cacheimport.CacheRecord, error) { + mp := map[digest.Digest]cacheimport.CacheRecord{} + if err := vs.appendInputCache(ctx, mp); err != nil { + return nil, err + } + dgst, err := vs.mainCacheKey() + if err != nil { + return nil, err + } + immutable, ok := ToImmutableRef(ref) + if !ok { + return nil, errors.Errorf("invalid reference") + } + dgst = cacheKeyForIndex(dgst, index) + mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: immutable} + out := make([]cacheimport.CacheRecord, 0, len(mp)) + for _, cr := range mp { + out = append(out, cr) + } + return out, nil +} + +func (vs *vertexSolver) appendInputCache(ctx context.Context, mp map[digest.Digest]cacheimport.CacheRecord) error { + for i, inp := range vs.inputs { + mainDgst, err := inp.solver.(*vertexSolver).mainCacheKey() + if err != nil { + return err + } + dgst := cacheKeyForIndex(mainDgst, vs.v.inputs[i].index) + if cr, ok := mp[dgst]; !ok || (cr.Reference == nil && inp.ref != nil) { + if err := inp.solver.(*vertexSolver).appendInputCache(ctx, mp); err != nil { + return err + } + if inp.ref != nil && len(inp.solver.(*vertexSolver).inputs) > 0 { // Ignore pushing the refs for sources + ref, ok := ToImmutableRef(inp.ref) + if !ok { + return errors.Errorf("invalid reference") + } + mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: ref} + } else { + mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst} + } + } + } + if ck := vs.contentKey; ck != "" { + mainDgst, err := vs.mainCacheKey() + if err != nil { + return err + } + mp[ck] = cacheimport.CacheRecord{CacheKey: mainDgst, ContentKey: ck} + } + return nil +} + +func (vs *vertexSolver) CacheKey(ctx context.Context, index Index) (digest.Digest, error) { + vs.mu.Lock() + defer vs.mu.Unlock() + if vs.baseKey == "" { + eg, ctx := errgroup.WithContext(vs.ctx) + for i := range vs.inputs { + func(i int) { + eg.Go(func() error { + k, err := vs.inputs[i].solver.CacheKey(ctx, vs.v.inputs[i].index) + if err != nil { + return err + } + vs.inputs[i].cacheKeys = append(vs.inputs[i].cacheKeys, k) + return nil + }) + }(i) + } + var dgst digest.Digest + eg.Go(func() error { + var err error + dgst, err = vs.op.CacheKey(ctx) + if err != nil { + return err + } + return nil + }) + + if err := eg.Wait(); err != nil { + return "", err + } + + vs.baseKey = dgst + } + + k, err := vs.lastCacheKey() + if err != nil { + return "", err + } + + return cacheKeyForIndex(k, index), nil +} + +func (vs *vertexSolver) lastCacheKey() (digest.Digest, error) { + return vs.currentCacheKey(true) +} + +func (vs *vertexSolver) mainCacheKey() (digest.Digest, error) { + return vs.currentCacheKey(false) +} + +func (vs *vertexSolver) currentCacheKey(last bool) (digest.Digest, error) { + inputKeys := make([]digest.Digest, len(vs.inputs)) + for i, inp := range vs.inputs { + if len(inp.cacheKeys) == 0 { + return "", errors.Errorf("inputs not processed") + } + if last { + inputKeys[i] = inp.cacheKeys[len(inp.cacheKeys)-1] + } else { + inputKeys[i] = inp.cacheKeys[0] + } + } + dt, err := json.Marshal(struct { + Inputs []digest.Digest + CacheKey digest.Digest + }{Inputs: inputKeys, CacheKey: vs.baseKey}) + if err != nil { + return "", err + } + return digest.FromBytes(dt), nil +} + +func (vs *vertexSolver) OutputEvaluator(index Index) (VertexEvaluator, error) { + if vs.f == nil { + f, err := bgfunc.New(vs.ctx, vs.run) + if err != nil { + return nil, err + } + vs.f = f + } + c := vs.f.NewCaller() + ve := &vertexEvaluator{vertexSolver: vs, c: c, index: index} + return ve, nil +} + +func (vs *vertexSolver) Release() error { + for _, inp := range vs.inputs { + if inp.ref != nil { + inp.ref.Release(context.TODO()) + } + } + if vs.refs != nil { + for _, r := range vs.refs { + r.Release(context.TODO()) + } + } + return nil +} + +// run is called by the bgfunc concurrency primitive. This function may be +// called multiple times but never in parallal. Repeated calls should make an +// effort to continue from previous state. Lock vs.mu to syncronize data to the +// callers. Signal parameter can be used to notify callers that new data is +// available without returning from the function. +func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) { + vs.mu.Lock() + if vs.refs != nil { + vs.mu.Unlock() + return nil + } + vs.mu.Unlock() + + waitFirst := vs.signal.Wait() + waitRun := waitFirst + + select { + case <-ctx.Done(): + return ctx.Err() + case <-waitFirst: + } + + // this is where you lookup the cache keys that were successfully probed + + eg, ctx2 := errgroup.WithContext(ctx) + + // process all the inputs + for i, inp := range vs.inputs { + if inp.ref == nil { + func(i int) { + eg.Go(func() error { + inp := vs.inputs[i] + defer inp.ev.Cancel() + + waitNext := waitFirst + for { + select { + case <-ctx2.Done(): + return ctx2.Err() + case <-waitNext: + } + + // check if current cache key is in cache + if len(inp.cacheKeys) > 0 { + ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1], inp.solver.(*vertexSolver).v.Name()) + if err != nil { + return err + } + if ref != nil { + inp.ref = ref.(Ref) + inp.solver.(*vertexSolver).markCachedOnce.Do(func() { + markCached(ctx, inp.solver.(*vertexSolver).cv) + }) + return nil + } + } + + // evaluate next cachekey/reference for input + res, err := inp.ev.Next(ctx2) + if err != nil { + return err + } + if res == nil { // there is no more data coming + return nil + } + if ref := res.Reference; ref != nil { + if ref, ok := ToImmutableRef(ref); ok { + if !cache.HasCachePolicyRetain(ref) { + if err := cache.CachePolicyRetain(ref); err != nil { + return err + } + ref.Metadata().Commit() + } + inp.ref = ref + } + return nil + } + + // Only try matching cache if the cachekey for input is present + exists, err := vs.cache.Probe(ctx2, res.CacheKey) + if err != nil { + return err + } + if exists { + vs.mu.Lock() + inp.cacheKeys = append(inp.cacheKeys, res.CacheKey) + dgst, err := vs.lastCacheKey() + if err != nil { + vs.mu.Unlock() + return err + } + vs.results = append(vs.results, dgst) + signal() // wake up callers + waitNext = vs.signal.Reset() // make sure we don't continue unless there are callers + waitRun = waitNext + vs.mu.Unlock() + } + } + }) + }(i) + } + } + + if err := eg.Wait(); err != nil { + return err + } + + // Find extra cache keys by content + inputRefs := make([]Ref, len(vs.inputs)) + lastInputKeys := make([]digest.Digest, len(vs.inputs)) + for i := range vs.inputs { + inputRefs[i] = vs.inputs[i].ref + lastInputKeys[i] = vs.inputs[i].cacheKeys[len(vs.inputs[i].cacheKeys)-1] + } + + dgst, inp, err := vs.op.ContentMask(ctx) + if err != nil { + return err + } + + var contentKey digest.Digest + if dgst != "" { + contentKey, err = calculateContentHash(ctx, inputRefs, dgst, lastInputKeys, inp) + if err != nil { + return err + } + vs.contentKey = contentKey + + var extraKeys []digest.Digest + cks, err := vs.cache.GetContentMapping(contentKey) + if err != nil { + return err + } + extraKeys = append(extraKeys, cks...) + if len(extraKeys) > 0 { + vs.mu.Lock() + vs.results = append(vs.results, extraKeys...) + signal() + waitRun = vs.signal.Reset() + vs.mu.Unlock() + } + } + + select { + case <-ctx.Done(): + return + case <-waitRun: + } + + // no cache hit. start evaluating the node + notifyStarted(ctx, &vs.cv) + defer func() { + notifyCompleted(ctx, &vs.cv, retErr) + }() + + refs, err := vs.op.Run(ctx, inputRefs) + if err != nil { + return err + } + sr := make([]*SharedRef, len(refs)) + for i, r := range refs { + sr[i] = NewSharedRef(r) + } + vs.mu.Lock() + vs.refs = sr + vs.mu.Unlock() + + // store the cacheKeys for current refs + if vs.cache != nil { + cacheKey, err := vs.lastCacheKey() + if err != nil { + return err + } + for i, ref := range refs { + if err != nil { + return err + } + r := OriginRef(ref) + if err := vs.cache.Set(cacheKeyForIndex(cacheKey, Index(i)), r); err != nil { + logrus.Errorf("failed to save cache for %s: %v", cacheKey, err) + } + } + if contentKey != "" { + if err := vs.cache.SetContentMapping(contentKey, cacheKey); err != nil { + logrus.Errorf("failed to save content mapping: %v", err) + } + } + } + return nil +} + +func getInputContentHash(ctx context.Context, ref cache.ImmutableRef, selectors []string) (digest.Digest, error) { + out := make([]digest.Digest, 0, len(selectors)) + for _, s := range selectors { + dgst, err := contenthash.Checksum(ctx, ref, s) + if err != nil { + return "", err + } + out = append(out, dgst) + } + if len(out) == 1 { + return out[0], nil + } + dt, err := json.Marshal(out) + if err != nil { + return "", err + } + return digest.FromBytes(dt), nil +} + +func calculateContentHash(ctx context.Context, refs []Ref, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) { + dgsts := make([]digest.Digest, len(contentMap)) + eg, ctx := errgroup.WithContext(ctx) + for i, sel := range contentMap { + if sel == nil { + dgsts[i] = inputs[i] + continue + } + func(i int) { + eg.Go(func() error { + ref, ok := ToImmutableRef(refs[i]) + if !ok { + return errors.Errorf("invalid reference for exporting: %T", ref) + } + dgst, err := getInputContentHash(ctx, ref, contentMap[i]) + if err != nil { + return err + } + dgsts[i] = dgst + return nil + }) + }(i) + } + if err := eg.Wait(); err != nil { + return "", err + } + dt, err := json.Marshal(struct { + Main digest.Digest + Inputs []digest.Digest + }{ + Main: mainDigest, + Inputs: dgsts, + }) + if err != nil { + return "", err + } + return digest.FromBytes(dt), nil +} + +type VertexEvaluator interface { + Next(context.Context) (*VertexResult, error) + Cancel() error +} + +type vertexEvaluator struct { + *vertexSolver + c *bgfunc.Caller + cursor int + index Index +} + +func (ve *vertexEvaluator) Next(ctx context.Context) (*VertexResult, error) { + v, err := ve.c.Call(ctx, func() (interface{}, error) { + ve.mu.Lock() + defer ve.mu.Unlock() + if ve.refs != nil { + return &VertexResult{Reference: ve.refs[int(ve.index)].Clone()}, nil + } + if i := ve.cursor; i < len(ve.results) { + ve.cursor++ + return &VertexResult{CacheKey: cacheKeyForIndex(ve.results[i], ve.index)}, nil + } + ve.signal.Signal() + return nil, nil + }) + if err != nil { + return nil, err + } + if v == nil { + return nil, nil // no more records are coming + } + return v.(*VertexResult), nil +} + +func (ve *vertexEvaluator) Cancel() error { + return ve.c.Cancel() +} + +type VertexResult struct { + CacheKey digest.Digest + Reference Ref +} + +func cacheKeyForIndex(dgst digest.Digest, index Index) digest.Digest { + return digest.FromBytes([]byte(fmt.Sprintf("%s.%d", dgst, index))) } diff --git a/solver/solver/llbopsolver.go b/solver/solver/llbopsolver.go deleted file mode 100644 index f2f5f042..00000000 --- a/solver/solver/llbopsolver.go +++ /dev/null @@ -1,38 +0,0 @@ -package solver - -import ( - "github.com/moby/buildkit/frontend" - solver "github.com/moby/buildkit/solver" - "github.com/moby/buildkit/solver/pb" - "github.com/moby/buildkit/solver/solver/llbop" - "github.com/moby/buildkit/worker" -) - -// DetermineVertexWorker determines worker for a vertex. -// Currently, constraint is just ignored. -// Also we need to track the workers of the inputs. -func DetermineVertexWorker(wc *worker.Controller, v solver.Vertex) (*worker.Worker, error) { - // TODO: multiworker - return wc.GetDefault() -} - -func NewLLBOpSolver(wc *worker.Controller, frontends map[string]frontend.Frontend) solver.Solver { - var s *Solver - s = New(func(v solver.Vertex) (solver.Op, error) { - w, err := DetermineVertexWorker(wc, v) - if err != nil { - return nil, err - } - switch op := v.Sys().(type) { - case *pb.Op_Source: - return llbop.NewSourceOp(v, op, w.SourceManager) - case *pb.Op_Exec: - return llbop.NewExecOp(v, op, w.CacheManager, w.Executor) - case *pb.Op_Build: - return llbop.NewBuildOp(v, op, s) - default: - return nil, nil - } - }, wc, DetermineVertexWorker, frontends) - return s -} diff --git a/solver/solver/solver.go b/solver/solver/solver.go deleted file mode 100644 index 9c71e191..00000000 --- a/solver/solver/solver.go +++ /dev/null @@ -1,729 +0,0 @@ -package solver - -import ( - "encoding/json" - "fmt" - "sync" - "time" - - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/cacheimport" - "github.com/moby/buildkit/cache/contenthash" - "github.com/moby/buildkit/cache/instructioncache" - "github.com/moby/buildkit/client" - "github.com/moby/buildkit/frontend" - solver "github.com/moby/buildkit/solver" - "github.com/moby/buildkit/solver/reference" - "github.com/moby/buildkit/util/bgfunc" - "github.com/moby/buildkit/util/progress" - "github.com/moby/buildkit/worker" - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "golang.org/x/net/context" - "golang.org/x/sync/errgroup" -) - -// FIXME: Also we need to track the workers of the inputs. -type VertexWorkerDeterminer func(wc *worker.Controller, v solver.Vertex) (*worker.Worker, error) - -// ResolveOpFunc finds an Op implementation for a vertex -type ResolveOpFunc func(solver.Vertex) (solver.Op, error) - -type Solver struct { - resolve ResolveOpFunc - jobs *jobList - workerController *worker.Controller - determineVertexWorker VertexWorkerDeterminer - frontends map[string]frontend.Frontend -} - -func New(resolve ResolveOpFunc, wc *worker.Controller, vwd VertexWorkerDeterminer, f map[string]frontend.Frontend) *Solver { - return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, determineVertexWorker: vwd, frontends: f} -} - -func (s *Solver) solve(ctx context.Context, j *job, req solver.SolveRequest) (solver.Ref, map[string][]byte, error) { - if req.Definition == nil { - if req.Frontend == nil { - return nil, nil, errors.Errorf("invalid request: no definition nor frontend") - } - return req.Frontend.Solve(ctx, s.llbBridge(j), req.FrontendOpt) - } - - inp, err := j.load(req.Definition, s.resolve) - if err != nil { - return nil, nil, err - } - ref, err := j.getRef(ctx, inp.Vertex.(*vertex).clientVertex, inp.Index) - return ref, nil, err -} - -func (s *Solver) llbBridge(j *job) *llbBridge { - // FIXME(AkihiroSuda): make sure worker implements interfaces required by llbBridge - worker, err := s.workerController.GetDefault() - if err != nil { - panic(err) - } - return &llbBridge{job: j, Solver: s, worker: worker} -} - -func (s *Solver) Solve(ctx context.Context, id string, req solver.SolveRequest) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - pr, ctx, closeProgressWriter := progress.NewContext(ctx) - defer closeProgressWriter() - - // TODO: multiworker - defaultWorker, err := s.workerController.GetDefault() - if err != nil { - return err - } - if importRef := req.ImportCacheRef; importRef != "" { - cache, err := defaultWorker.CacheImporter.Import(ctx, importRef) - if err != nil { - return err - } - defaultWorker.InstructionCache = instructioncache.Union(defaultWorker.InstructionCache, cache) - } - - // register a build job. vertex needs to be loaded to a job to run - ctx, j, err := s.jobs.new(ctx, id, pr, defaultWorker.InstructionCache) - if err != nil { - return err - } - - ref, exporterOpt, err := s.solve(ctx, j, req) - defer j.discard() - if err != nil { - return err - } - - defer func() { - if ref != nil { - go ref.Release(context.TODO()) - } - }() - - var immutable cache.ImmutableRef - if ref != nil { - var ok bool - immutable, ok = reference.ToImmutableRef(ref) - if !ok { - return errors.Errorf("invalid reference for exporting: %T", ref) - } - if err := immutable.Finalize(ctx); err != nil { - return err - } - } - - if exp := req.Exporter; exp != nil { - if err := inVertexContext(ctx, exp.Name(), func(ctx context.Context) error { - return exp.Export(ctx, immutable, exporterOpt) - }); err != nil { - return err - } - } - - if exportName := req.ExportCacheRef; exportName != "" { - if err := inVertexContext(ctx, "exporting build cache", func(ctx context.Context) error { - cache, err := j.cacheExporter(ref) - if err != nil { - return err - } - - records, err := cache.Export(ctx) - if err != nil { - return err - } - - // TODO: multiworker - return defaultWorker.CacheExporter.Export(ctx, records, exportName) - }); err != nil { - return err - } - } - - return err -} - -func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error { - j, err := s.jobs.get(id) - if err != nil { - return err - } - defer close(statusChan) - return j.pipe(ctx, statusChan) -} - -func (s *Solver) SubBuild(ctx context.Context, dgst digest.Digest, req solver.SolveRequest) (solver.Ref, error) { - jl := s.jobs - jl.mu.Lock() - st, ok := jl.actives[dgst] - if !ok { - jl.mu.Unlock() - return nil, errors.Errorf("no such parent vertex: %v", dgst) - } - - var inp *solver.Input - for j := range st.jobs { - var err error - inp, err = j.loadInternal(req.Definition, s.resolve) - if err != nil { - jl.mu.Unlock() - return nil, err - } - } - st = jl.actives[inp.Vertex.Digest()] - jl.mu.Unlock() - - w, err := s.determineVertexWorker(s.workerController, inp.Vertex) - if err != nil { - return nil, err - } - return getRef(ctx, st.solver, inp.Vertex.(*vertex).clientVertex, inp.Index, w.InstructionCache) // TODO: combine to pass single input // TODO: export cache for subbuilds -} - -type VertexSolver interface { - CacheKey(ctx context.Context, index solver.Index) (digest.Digest, error) - OutputEvaluator(solver.Index) (VertexEvaluator, error) - Release() error - Cache(solver.Index, solver.Ref) CacheExporter -} - -type vertexInput struct { - solver VertexSolver - ev VertexEvaluator - cacheKeys []digest.Digest - ref solver.Ref -} - -type vertexSolver struct { - inputs []*vertexInput - v *vertex - cv client.Vertex - op solver.Op - cache instructioncache.InstructionCache - refs []*reference.SharedRef - f *bgfunc.F - ctx context.Context - - baseKey digest.Digest - mu sync.Mutex - results []digest.Digest - markCachedOnce sync.Once - contentKey digest.Digest - - signal *signal // used to notify that there are callers who need more data -} - -type resolveF func(digest.Digest) (VertexSolver, error) - -func newVertexSolver(ctx context.Context, v *vertex, op solver.Op, c instructioncache.InstructionCache, resolve resolveF) (*vertexSolver, error) { - inputs := make([]*vertexInput, len(v.inputs)) - for i, in := range v.inputs { - s, err := resolve(in.vertex.digest) - if err != nil { - return nil, err - } - ev, err := s.OutputEvaluator(in.index) - if err != nil { - return nil, err - } - ev.Cancel() - inputs[i] = &vertexInput{ - solver: s, - ev: ev, - } - } - return &vertexSolver{ - ctx: ctx, - inputs: inputs, - v: v, - cv: v.clientVertex, - op: op, - cache: c, - signal: newSignaller(), - }, nil -} - -func markCached(ctx context.Context, cv client.Vertex) { - pw, _, _ := progress.FromContext(ctx) - defer pw.Close() - - if cv.Started == nil { - now := time.Now() - cv.Started = &now - cv.Completed = &now - cv.Cached = true - } - pw.Write(cv.Digest.String(), cv) -} - -type CacheExporter interface { - Export(context.Context) ([]cacheimport.CacheRecord, error) -} - -func (vs *vertexSolver) Cache(index solver.Index, ref solver.Ref) CacheExporter { - return &cacheExporter{vertexSolver: vs, index: index, ref: ref} -} - -type cacheExporter struct { - *vertexSolver - index solver.Index - ref solver.Ref -} - -func (ce *cacheExporter) Export(ctx context.Context) ([]cacheimport.CacheRecord, error) { - return ce.vertexSolver.Export(ctx, ce.index, ce.ref) -} - -func (vs *vertexSolver) Export(ctx context.Context, index solver.Index, ref solver.Ref) ([]cacheimport.CacheRecord, error) { - mp := map[digest.Digest]cacheimport.CacheRecord{} - if err := vs.appendInputCache(ctx, mp); err != nil { - return nil, err - } - dgst, err := vs.mainCacheKey() - if err != nil { - return nil, err - } - immutable, ok := reference.ToImmutableRef(ref) - if !ok { - return nil, errors.Errorf("invalid reference") - } - dgst = cacheKeyForIndex(dgst, index) - mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: immutable} - out := make([]cacheimport.CacheRecord, 0, len(mp)) - for _, cr := range mp { - out = append(out, cr) - } - return out, nil -} - -func (vs *vertexSolver) appendInputCache(ctx context.Context, mp map[digest.Digest]cacheimport.CacheRecord) error { - for i, inp := range vs.inputs { - mainDgst, err := inp.solver.(*vertexSolver).mainCacheKey() - if err != nil { - return err - } - dgst := cacheKeyForIndex(mainDgst, vs.v.inputs[i].index) - if cr, ok := mp[dgst]; !ok || (cr.Reference == nil && inp.ref != nil) { - if err := inp.solver.(*vertexSolver).appendInputCache(ctx, mp); err != nil { - return err - } - if inp.ref != nil && len(inp.solver.(*vertexSolver).inputs) > 0 { // Ignore pushing the refs for sources - ref, ok := reference.ToImmutableRef(inp.ref) - if !ok { - return errors.Errorf("invalid reference") - } - mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: ref} - } else { - mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst} - } - } - } - if ck := vs.contentKey; ck != "" { - mainDgst, err := vs.mainCacheKey() - if err != nil { - return err - } - mp[ck] = cacheimport.CacheRecord{CacheKey: mainDgst, ContentKey: ck} - } - return nil -} - -func (vs *vertexSolver) CacheKey(ctx context.Context, index solver.Index) (digest.Digest, error) { - vs.mu.Lock() - defer vs.mu.Unlock() - if vs.baseKey == "" { - eg, ctx := errgroup.WithContext(vs.ctx) - for i := range vs.inputs { - func(i int) { - eg.Go(func() error { - k, err := vs.inputs[i].solver.CacheKey(ctx, vs.v.inputs[i].index) - if err != nil { - return err - } - vs.inputs[i].cacheKeys = append(vs.inputs[i].cacheKeys, k) - return nil - }) - }(i) - } - var dgst digest.Digest - eg.Go(func() error { - var err error - dgst, err = vs.op.CacheKey(ctx) - if err != nil { - return err - } - return nil - }) - - if err := eg.Wait(); err != nil { - return "", err - } - - vs.baseKey = dgst - } - - k, err := vs.lastCacheKey() - if err != nil { - return "", err - } - - return cacheKeyForIndex(k, index), nil -} - -func (vs *vertexSolver) lastCacheKey() (digest.Digest, error) { - return vs.currentCacheKey(true) -} - -func (vs *vertexSolver) mainCacheKey() (digest.Digest, error) { - return vs.currentCacheKey(false) -} - -func (vs *vertexSolver) currentCacheKey(last bool) (digest.Digest, error) { - inputKeys := make([]digest.Digest, len(vs.inputs)) - for i, inp := range vs.inputs { - if len(inp.cacheKeys) == 0 { - return "", errors.Errorf("inputs not processed") - } - if last { - inputKeys[i] = inp.cacheKeys[len(inp.cacheKeys)-1] - } else { - inputKeys[i] = inp.cacheKeys[0] - } - } - dt, err := json.Marshal(struct { - Inputs []digest.Digest - CacheKey digest.Digest - }{Inputs: inputKeys, CacheKey: vs.baseKey}) - if err != nil { - return "", err - } - return digest.FromBytes(dt), nil -} - -func (vs *vertexSolver) OutputEvaluator(index solver.Index) (VertexEvaluator, error) { - if vs.f == nil { - f, err := bgfunc.New(vs.ctx, vs.run) - if err != nil { - return nil, err - } - vs.f = f - } - c := vs.f.NewCaller() - ve := &vertexEvaluator{vertexSolver: vs, c: c, index: index} - return ve, nil -} - -func (vs *vertexSolver) Release() error { - for _, inp := range vs.inputs { - if inp.ref != nil { - inp.ref.Release(context.TODO()) - } - } - if vs.refs != nil { - for _, r := range vs.refs { - r.Release(context.TODO()) - } - } - return nil -} - -// run is called by the bgfunc concurrency primitive. This function may be -// called multiple times but never in parallal. Repeated calls should make an -// effort to continue from previous state. Lock vs.mu to syncronize data to the -// callers. Signal parameter can be used to notify callers that new data is -// available without returning from the function. -func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) { - vs.mu.Lock() - if vs.refs != nil { - vs.mu.Unlock() - return nil - } - vs.mu.Unlock() - - waitFirst := vs.signal.Wait() - waitRun := waitFirst - - select { - case <-ctx.Done(): - return ctx.Err() - case <-waitFirst: - } - - // this is where you lookup the cache keys that were successfully probed - - eg, ctx2 := errgroup.WithContext(ctx) - - // process all the inputs - for i, inp := range vs.inputs { - if inp.ref == nil { - func(i int) { - eg.Go(func() error { - inp := vs.inputs[i] - defer inp.ev.Cancel() - - waitNext := waitFirst - for { - select { - case <-ctx2.Done(): - return ctx2.Err() - case <-waitNext: - } - - // check if current cache key is in cache - if len(inp.cacheKeys) > 0 { - ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1], inp.solver.(*vertexSolver).v.Name()) - if err != nil { - return err - } - if ref != nil { - inp.ref = ref.(solver.Ref) - inp.solver.(*vertexSolver).markCachedOnce.Do(func() { - markCached(ctx, inp.solver.(*vertexSolver).cv) - }) - return nil - } - } - - // evaluate next cachekey/reference for input - res, err := inp.ev.Next(ctx2) - if err != nil { - return err - } - if res == nil { // there is no more data coming - return nil - } - if ref := res.Reference; ref != nil { - if ref, ok := reference.ToImmutableRef(ref); ok { - if !cache.HasCachePolicyRetain(ref) { - if err := cache.CachePolicyRetain(ref); err != nil { - return err - } - ref.Metadata().Commit() - } - inp.ref = ref - } - return nil - } - - // Only try matching cache if the cachekey for input is present - exists, err := vs.cache.Probe(ctx2, res.CacheKey) - if err != nil { - return err - } - if exists { - vs.mu.Lock() - inp.cacheKeys = append(inp.cacheKeys, res.CacheKey) - dgst, err := vs.lastCacheKey() - if err != nil { - vs.mu.Unlock() - return err - } - vs.results = append(vs.results, dgst) - signal() // wake up callers - waitNext = vs.signal.Reset() // make sure we don't continue unless there are callers - waitRun = waitNext - vs.mu.Unlock() - } - } - }) - }(i) - } - } - - if err := eg.Wait(); err != nil { - return err - } - - // Find extra cache keys by content - inputRefs := make([]solver.Ref, len(vs.inputs)) - lastInputKeys := make([]digest.Digest, len(vs.inputs)) - for i := range vs.inputs { - inputRefs[i] = vs.inputs[i].ref - lastInputKeys[i] = vs.inputs[i].cacheKeys[len(vs.inputs[i].cacheKeys)-1] - } - - dgst, inp, err := vs.op.ContentMask(ctx) - if err != nil { - return err - } - - var contentKey digest.Digest - if dgst != "" { - contentKey, err = calculateContentHash(ctx, inputRefs, dgst, lastInputKeys, inp) - if err != nil { - return err - } - vs.contentKey = contentKey - - var extraKeys []digest.Digest - cks, err := vs.cache.GetContentMapping(contentKey) - if err != nil { - return err - } - extraKeys = append(extraKeys, cks...) - if len(extraKeys) > 0 { - vs.mu.Lock() - vs.results = append(vs.results, extraKeys...) - signal() - waitRun = vs.signal.Reset() - vs.mu.Unlock() - } - } - - select { - case <-ctx.Done(): - return - case <-waitRun: - } - - // no cache hit. start evaluating the node - notifyStarted(ctx, &vs.cv) - defer func() { - notifyCompleted(ctx, &vs.cv, retErr) - }() - - refs, err := vs.op.Run(ctx, inputRefs) - if err != nil { - return err - } - sr := make([]*reference.SharedRef, len(refs)) - for i, r := range refs { - sr[i] = reference.NewSharedRef(r) - } - vs.mu.Lock() - vs.refs = sr - vs.mu.Unlock() - - // store the cacheKeys for current refs - if vs.cache != nil { - cacheKey, err := vs.lastCacheKey() - if err != nil { - return err - } - for i, ref := range refs { - if err != nil { - return err - } - r := reference.OriginRef(ref) - if err := vs.cache.Set(cacheKeyForIndex(cacheKey, solver.Index(i)), r); err != nil { - logrus.Errorf("failed to save cache for %s: %v", cacheKey, err) - } - } - if contentKey != "" { - if err := vs.cache.SetContentMapping(contentKey, cacheKey); err != nil { - logrus.Errorf("failed to save content mapping: %v", err) - } - } - } - return nil -} - -func getInputContentHash(ctx context.Context, ref cache.ImmutableRef, selectors []string) (digest.Digest, error) { - out := make([]digest.Digest, 0, len(selectors)) - for _, s := range selectors { - dgst, err := contenthash.Checksum(ctx, ref, s) - if err != nil { - return "", err - } - out = append(out, dgst) - } - if len(out) == 1 { - return out[0], nil - } - dt, err := json.Marshal(out) - if err != nil { - return "", err - } - return digest.FromBytes(dt), nil -} - -func calculateContentHash(ctx context.Context, refs []solver.Ref, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) { - dgsts := make([]digest.Digest, len(contentMap)) - eg, ctx := errgroup.WithContext(ctx) - for i, sel := range contentMap { - if sel == nil { - dgsts[i] = inputs[i] - continue - } - func(i int) { - eg.Go(func() error { - ref, ok := reference.ToImmutableRef(refs[i]) - if !ok { - return errors.Errorf("invalid reference for exporting: %T", ref) - } - dgst, err := getInputContentHash(ctx, ref, contentMap[i]) - if err != nil { - return err - } - dgsts[i] = dgst - return nil - }) - }(i) - } - if err := eg.Wait(); err != nil { - return "", err - } - dt, err := json.Marshal(struct { - Main digest.Digest - Inputs []digest.Digest - }{ - Main: mainDigest, - Inputs: dgsts, - }) - if err != nil { - return "", err - } - return digest.FromBytes(dt), nil -} - -type VertexEvaluator interface { - Next(context.Context) (*VertexResult, error) - Cancel() error -} - -type vertexEvaluator struct { - *vertexSolver - c *bgfunc.Caller - cursor int - index solver.Index -} - -func (ve *vertexEvaluator) Next(ctx context.Context) (*VertexResult, error) { - v, err := ve.c.Call(ctx, func() (interface{}, error) { - ve.mu.Lock() - defer ve.mu.Unlock() - if ve.refs != nil { - return &VertexResult{Reference: ve.refs[int(ve.index)].Clone()}, nil - } - if i := ve.cursor; i < len(ve.results) { - ve.cursor++ - return &VertexResult{CacheKey: cacheKeyForIndex(ve.results[i], ve.index)}, nil - } - ve.signal.Signal() - return nil, nil - }) - if err != nil { - return nil, err - } - if v == nil { - return nil, nil // no more records are coming - } - return v.(*VertexResult), nil -} - -func (ve *vertexEvaluator) Cancel() error { - return ve.c.Cancel() -} - -type VertexResult struct { - CacheKey digest.Digest - Reference solver.Ref -} - -func cacheKeyForIndex(dgst digest.Digest, index solver.Index) digest.Digest { - return digest.FromBytes([]byte(fmt.Sprintf("%s.%d", dgst, index))) -} diff --git a/solver/solver/vertex.go b/solver/solver/vertex.go deleted file mode 100644 index ff4f90e8..00000000 --- a/solver/solver/vertex.go +++ /dev/null @@ -1,104 +0,0 @@ -package solver - -import ( - "sync" - "time" - - "github.com/moby/buildkit/client" - "github.com/moby/buildkit/identity" - "github.com/moby/buildkit/solver" - "github.com/moby/buildkit/solver/pb" - "github.com/moby/buildkit/util/progress" - digest "github.com/opencontainers/go-digest" - "golang.org/x/net/context" -) - -type input struct { - index solver.Index - vertex *vertex -} - -type vertex struct { - mu sync.Mutex - sys interface{} - metadata *pb.OpMetadata - inputs []*input - err error - digest digest.Digest - clientVertex client.Vertex - name string - notifyMu sync.Mutex -} - -func (v *vertex) initClientVertex() { - inputDigests := make([]digest.Digest, 0, len(v.inputs)) - for _, inp := range v.inputs { - inputDigests = append(inputDigests, inp.vertex.Digest()) - } - v.clientVertex = client.Vertex{ - Inputs: inputDigests, - Name: v.Name(), - Digest: v.digest, - } -} - -func (v *vertex) Digest() digest.Digest { - return v.digest -} - -func (v *vertex) Sys() interface{} { - return v.sys -} - -func (v *vertex) Metadata() *pb.OpMetadata { - return v.metadata -} - -func (v *vertex) Inputs() (inputs []solver.Input) { - inputs = make([]solver.Input, 0, len(v.inputs)) - for _, i := range v.inputs { - inputs = append(inputs, solver.Input{i.index, i.vertex}) - } - return -} - -func (v *vertex) Name() string { - return v.name -} - -func notifyStarted(ctx context.Context, v *client.Vertex) { - pw, _, _ := progress.FromContext(ctx) - defer pw.Close() - now := time.Now() - v.Started = &now - v.Completed = nil - pw.Write(v.Digest.String(), *v) -} - -func notifyCompleted(ctx context.Context, v *client.Vertex, err error) { - pw, _, _ := progress.FromContext(ctx) - defer pw.Close() - now := time.Now() - if v.Started == nil { - v.Started = &now - } - v.Completed = &now - v.Cached = false - if err != nil { - v.Error = err.Error() - } - pw.Write(v.Digest.String(), *v) -} - -func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error { - v := client.Vertex{ - Digest: digest.FromBytes([]byte(identity.NewID())), - Name: name, - } - pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest)) - notifyStarted(ctx, &v) - defer pw.Close() - err := f(ctx) - notifyCompleted(ctx, &v, err) - return err -} diff --git a/solver/types.go b/solver/types.go new file mode 100644 index 00000000..a2c81b81 --- /dev/null +++ b/solver/types.go @@ -0,0 +1,16 @@ +package solver + +import ( + "github.com/moby/buildkit/solver/types" +) + +// Ref is a reference to the object passed through the build steps. +// This interface is a subset of the github.com/buildkit/buildkit/cache.Ref interface. +// For ease of unit testing, this interface only has Release(). + +type Ref = types.Ref +type Op = types.Op +type SolveRequest = types.SolveRequest +type Vertex = types.Vertex +type Input = types.Input +type Index = types.Index diff --git a/solver/types/types.go b/solver/types/types.go new file mode 100644 index 00000000..7440744d --- /dev/null +++ b/solver/types/types.go @@ -0,0 +1,58 @@ +package types + +import ( + "github.com/moby/buildkit/exporter" + "github.com/moby/buildkit/frontend" + "github.com/moby/buildkit/solver/pb" + digest "github.com/opencontainers/go-digest" + "golang.org/x/net/context" +) + +// These could be also defined in worker + +type Ref interface { + Release(context.Context) error +} + +// Op is an implementation for running a vertex +type Op interface { + // CacheKey returns a persistent cache key for operation. + CacheKey(context.Context) (digest.Digest, error) + // ContentMask returns a partial cache checksum with content paths to the + // inputs. User can combine the content checksum of these paths to get a valid + // content based cache key. + ContentMask(context.Context) (digest.Digest, [][]string, error) + // Run runs an operation and returns the output references. + Run(ctx context.Context, inputs []Ref) (outputs []Ref, err error) +} + +type SolveRequest struct { + Definition *pb.Definition + Frontend frontend.Frontend + Exporter exporter.ExporterInstance + FrontendOpt map[string]string + ExportCacheRef string + ImportCacheRef string +} + +// Vertex is one node in the build graph +type Vertex interface { + // Digest is a content-addressable vertex identifier + Digest() digest.Digest + // Sys returns an internal value that is used to execute the vertex. Usually + // this is capured by the operation resolver method during solve. + Sys() interface{} + // FIXME(AkihiroSuda): we should not import pb pkg here. + Metadata() *pb.OpMetadata + // Array of vertexes current vertex depends on. + Inputs() []Input + Name() string // change this to general metadata +} + +type Index int + +// Input is an pointer to a single reference from a vertex by an index. +type Input struct { + Index Index + Vertex Vertex +} diff --git a/solver/vertex.go b/solver/vertex.go index 5b0b27e3..1e847e58 100644 --- a/solver/vertex.go +++ b/solver/vertex.go @@ -1,28 +1,103 @@ package solver import ( + "sync" + "time" + + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/identity" "github.com/moby/buildkit/solver/pb" + "github.com/moby/buildkit/util/progress" digest "github.com/opencontainers/go-digest" + "golang.org/x/net/context" ) -// Vertex is one node in the build graph -type Vertex interface { - // Digest is a content-addressable vertex identifier - Digest() digest.Digest - // Sys returns an internal value that is used to execute the vertex. Usually - // this is capured by the operation resolver method during solve. - Sys() interface{} - // FIXME(AkihiroSuda): we should not import pb pkg here. - Metadata() *pb.OpMetadata - // Array of vertexes current vertex depends on. - Inputs() []Input - Name() string // change this to general metadata +type input struct { + index Index + vertex *vertex } -type Index int - -// Input is an pointer to a single reference from a vertex by an index. -type Input struct { - Index Index - Vertex Vertex +type vertex struct { + mu sync.Mutex + sys interface{} + metadata *pb.OpMetadata + inputs []*input + err error + digest digest.Digest + clientVertex client.Vertex + name string + notifyMu sync.Mutex +} + +func (v *vertex) initClientVertex() { + inputDigests := make([]digest.Digest, 0, len(v.inputs)) + for _, inp := range v.inputs { + inputDigests = append(inputDigests, inp.vertex.Digest()) + } + v.clientVertex = client.Vertex{ + Inputs: inputDigests, + Name: v.Name(), + Digest: v.digest, + } +} + +func (v *vertex) Digest() digest.Digest { + return v.digest +} + +func (v *vertex) Sys() interface{} { + return v.sys +} + +func (v *vertex) Metadata() *pb.OpMetadata { + return v.metadata +} + +func (v *vertex) Inputs() (inputs []Input) { + inputs = make([]Input, 0, len(v.inputs)) + for _, i := range v.inputs { + inputs = append(inputs, Input{i.index, i.vertex}) + } + return +} + +func (v *vertex) Name() string { + return v.name +} + +func notifyStarted(ctx context.Context, v *client.Vertex) { + pw, _, _ := progress.FromContext(ctx) + defer pw.Close() + now := time.Now() + v.Started = &now + v.Completed = nil + pw.Write(v.Digest.String(), *v) +} + +func notifyCompleted(ctx context.Context, v *client.Vertex, err error) { + pw, _, _ := progress.FromContext(ctx) + defer pw.Close() + now := time.Now() + if v.Started == nil { + v.Started = &now + } + v.Completed = &now + v.Cached = false + if err != nil { + v.Error = err.Error() + } + pw.Write(v.Digest.String(), *v) +} + +func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error { + v := client.Vertex{ + Digest: digest.FromBytes([]byte(identity.NewID())), + Name: name, + } + pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest)) + notifyStarted(ctx, &v) + defer pw.Close() + err := f(ctx) + notifyCompleted(ctx, &v, err) + return err } diff --git a/worker/base/worker.go b/worker/base/worker.go new file mode 100644 index 00000000..7affebc1 --- /dev/null +++ b/worker/base/worker.go @@ -0,0 +1,260 @@ +package base + +import ( + "io" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/diff" + "github.com/containerd/containerd/images" + ctdsnapshot "github.com/containerd/containerd/snapshots" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/cache/cacheimport" + "github.com/moby/buildkit/cache/instructioncache" + localcache "github.com/moby/buildkit/cache/instructioncache/local" + "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/exporter" + imageexporter "github.com/moby/buildkit/exporter/containerimage" + localexporter "github.com/moby/buildkit/exporter/local" + ociexporter "github.com/moby/buildkit/exporter/oci" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot/blobmapping" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/solver/llbop" + "github.com/moby/buildkit/solver/pb" + "github.com/moby/buildkit/source" + "github.com/moby/buildkit/source/containerimage" + "github.com/moby/buildkit/source/git" + "github.com/moby/buildkit/source/http" + "github.com/moby/buildkit/source/local" + "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +// TODO: this file should be removed. containerd defines ContainerdWorker, oci defines OCIWorker. There is no base worker. + +// WorkerOpt is specific to a worker. +// See also CommonOpt. +type WorkerOpt struct { + Name string + SessionManager *session.Manager + MetadataStore *metadata.Store + Executor executor.Executor + BaseSnapshotter ctdsnapshot.Snapshotter // not blobmapping one (FIXME: just require blobmapping snapshotter?) + ContentStore content.Store + Applier diff.Differ + Differ diff.Differ + ImageStore images.Store +} + +// Worker is a local worker instance with dedicated snapshotter, cache, and so on. +// TODO: s/Worker/OpWorker/g ? +// FIXME: Worker should be rather an interface +type Worker struct { + WorkerOpt + Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter + CacheManager cache.Manager + SourceManager *source.Manager + cache instructioncache.InstructionCache + Exporters map[string]exporter.Exporter + ImageSource source.Source + CacheExporter *cacheimport.CacheExporter // TODO: remove + CacheImporter *cacheimport.CacheImporter // TODO: remove + // no frontend here +} + +// NewWorker instantiates a local worker +func NewWorker(opt WorkerOpt) (*Worker, error) { + bmSnapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{ + Content: opt.ContentStore, + Snapshotter: opt.BaseSnapshotter, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + cm, err := cache.NewManager(cache.ManagerOpt{ + Snapshotter: bmSnapshotter, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + ic := &localcache.LocalStore{ + MetadataStore: opt.MetadataStore, + Cache: cm, + } + + sm, err := source.NewManager() + if err != nil { + return nil, err + } + + is, err := containerimage.NewSource(containerimage.SourceOpt{ + Snapshotter: bmSnapshotter, + ContentStore: opt.ContentStore, + SessionManager: opt.SessionManager, + Applier: opt.Applier, + CacheAccessor: cm, + }) + if err != nil { + return nil, err + } + + sm.Register(is) + + gs, err := git.NewSource(git.Opt{ + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + sm.Register(gs) + + hs, err := http.NewSource(http.Opt{ + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + sm.Register(hs) + + ss, err := local.NewSource(local.Opt{ + SessionManager: opt.SessionManager, + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + sm.Register(ss) + + exporters := map[string]exporter.Exporter{} + + iw, err := imageexporter.NewImageWriter(imageexporter.WriterOpt{ + Snapshotter: bmSnapshotter, + ContentStore: opt.ContentStore, + Differ: opt.Differ, + }) + if err != nil { + return nil, err + } + + imageExporter, err := imageexporter.New(imageexporter.Opt{ + Images: opt.ImageStore, + SessionManager: opt.SessionManager, + ImageWriter: iw, + }) + if err != nil { + return nil, err + } + exporters[client.ExporterImage] = imageExporter + + localExporter, err := localexporter.New(localexporter.Opt{ + SessionManager: opt.SessionManager, + }) + if err != nil { + return nil, err + } + exporters[client.ExporterLocal] = localExporter + + ociExporter, err := ociexporter.New(ociexporter.Opt{ + SessionManager: opt.SessionManager, + ImageWriter: iw, + }) + if err != nil { + return nil, err + } + exporters[client.ExporterOCI] = ociExporter + + ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{ + Snapshotter: bmSnapshotter, + ContentStore: opt.ContentStore, + SessionManager: opt.SessionManager, + Differ: opt.Differ, + }) + + ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{ + Snapshotter: bmSnapshotter, + ContentStore: opt.ContentStore, + Applier: opt.Applier, + CacheAccessor: cm, + SessionManager: opt.SessionManager, + }) + + return &Worker{ + WorkerOpt: opt, + Snapshotter: bmSnapshotter, + CacheManager: cm, + SourceManager: sm, + cache: ic, + Exporters: exporters, + ImageSource: is, + CacheExporter: ce, + CacheImporter: ci, + }, nil +} + +func (w *Worker) Resolve(v solver.Vertex, s worker.SubBuilder) (solver.Op, error) { + switch op := v.Sys().(type) { + case *pb.Op_Source: + return llbop.NewSourceOp(v, op, w.SourceManager) + case *pb.Op_Exec: + return llbop.NewExecOp(v, op, w.CacheManager, w.Executor) + case *pb.Op_Build: + return llbop.NewBuildOp(v, op, s) + default: + return nil, errors.Errorf("could not resolve %v", v) + } +} + +func (w *Worker) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) { + // ImageSource is typically source/containerimage + resolveImageConfig, ok := w.ImageSource.(resolveImageConfig) + if !ok { + return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.Name()) + } + return resolveImageConfig.ResolveImageConfig(ctx, ref) +} + +type resolveImageConfig interface { + ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) +} + +func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { + active, err := w.CacheManager.New(ctx, rootFS) + if err != nil { + return err + } + defer active.Release(context.TODO()) + return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr) +} + +func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) { + return w.CacheManager.DiskUsage(ctx, opt) +} + +func (w *Worker) Name() string { + return w.WorkerOpt.Name +} + +func (w *Worker) Exporter(name string) (exporter.Exporter, error) { + exp, ok := w.Exporters[name] + if !ok { + return nil, errors.Errorf("exporter %q could not be found", name) + } + return exp, nil +} + +func (w *Worker) InstructionCache() instructioncache.InstructionCache { + return w.cache +} diff --git a/worker/containerd/containerd.go b/worker/containerd/containerd.go index e43bd13d..cba5a159 100644 --- a/worker/containerd/containerd.go +++ b/worker/containerd/containerd.go @@ -11,7 +11,7 @@ import ( "github.com/containerd/containerd/content" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/containerdexecutor" - "github.com/moby/buildkit/worker" + "github.com/moby/buildkit/worker/base" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -19,32 +19,32 @@ import ( // NewWorkerOpt creates a WorkerOpt. // But it does not set the following fields: // - SessionManager -func NewWorkerOpt(root string, address, snapshotterName string, opts ...containerd.ClientOpt) (worker.WorkerOpt, error) { +func NewWorkerOpt(root string, address, snapshotterName string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) { // TODO: take lock to make sure there are no duplicates opts = append([]containerd.ClientOpt{containerd.WithDefaultNamespace("buildkit")}, opts...) client, err := containerd.New(address, opts...) if err != nil { - return worker.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address) + return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address) } return newContainerd(root, client, snapshotterName) } -func newContainerd(root string, client *containerd.Client, snapshotterName string) (worker.WorkerOpt, error) { +func newContainerd(root string, client *containerd.Client, snapshotterName string) (base.WorkerOpt, error) { if strings.Contains(snapshotterName, "/") { - return worker.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName) + return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName) } name := "containerd-" + snapshotterName root = filepath.Join(root, name) if err := os.MkdirAll(root, 0700); err != nil { - return worker.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root) + return base.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root) } md, err := metadata.NewStore(filepath.Join(root, "metadata.db")) if err != nil { - return worker.WorkerOpt{}, err + return base.WorkerOpt{}, err } df := client.DiffService() - opt := worker.WorkerOpt{ + opt := base.WorkerOpt{ Name: name, MetadataStore: md, Executor: containerdexecutor.New(client, root), diff --git a/worker/runc/runc.go b/worker/runc/runc.go index 275dadf9..d0a38351 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -16,15 +16,15 @@ import ( "github.com/containerd/containerd/snapshots/overlay" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/runcexecutor" - "github.com/moby/buildkit/worker" + "github.com/moby/buildkit/worker/base" "github.com/opencontainers/go-digest" ) // NewWorkerOpt creates a WorkerOpt. // But it does not set the following fields: // - SessionManager -func NewWorkerOpt(root string) (worker.WorkerOpt, error) { - var opt worker.WorkerOpt +func NewWorkerOpt(root string) (base.WorkerOpt, error) { + var opt base.WorkerOpt name := "runc-overlay" root = filepath.Join(root, name) if err := os.MkdirAll(root, 0700); err != nil { @@ -68,7 +68,7 @@ func NewWorkerOpt(root string) (worker.WorkerOpt, error) { // TODO: call mdb.GarbageCollect . maybe just inject it into nsSnapshotter.Remove and csContent.Delete - opt = worker.WorkerOpt{ + opt = base.WorkerOpt{ Name: name, MetadataStore: md, Executor: exe, diff --git a/worker/runc/runc_test.go b/worker/runc/runc_test.go index 64fe979e..446a4aeb 100644 --- a/worker/runc/runc_test.go +++ b/worker/runc/runc_test.go @@ -17,7 +17,7 @@ import ( "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/source" - "github.com/moby/buildkit/worker" + "github.com/moby/buildkit/worker/base" "github.com/stretchr/testify/require" "golang.org/x/net/context" ) @@ -44,7 +44,7 @@ func TestRuncWorker(t *testing.T) { workerOpt.SessionManager, err = session.NewManager() require.NoError(t, err) - w, err := worker.NewWorker(workerOpt) + w, err := base.NewWorker(workerOpt) require.NoError(t, err) img, err := source.NewImageIdentifier("docker.io/library/busybox:latest") diff --git a/worker/worker.go b/worker/worker.go index 3b214e91..6f30c6d5 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,194 +1,44 @@ package worker import ( - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/diff" - "github.com/containerd/containerd/images" - ctdsnapshot "github.com/containerd/containerd/snapshots" + "io" + "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/cacheimport" "github.com/moby/buildkit/cache/instructioncache" - localcache "github.com/moby/buildkit/cache/instructioncache/local" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/exporter" - imageexporter "github.com/moby/buildkit/exporter/containerimage" - localexporter "github.com/moby/buildkit/exporter/local" - ociexporter "github.com/moby/buildkit/exporter/oci" - "github.com/moby/buildkit/session" - "github.com/moby/buildkit/snapshot/blobmapping" - "github.com/moby/buildkit/source" - "github.com/moby/buildkit/source/containerimage" - "github.com/moby/buildkit/source/git" - "github.com/moby/buildkit/source/http" - "github.com/moby/buildkit/source/local" + "github.com/moby/buildkit/solver/types" + digest "github.com/opencontainers/go-digest" + "golang.org/x/net/context" ) -// WorkerOpt is specific to a worker. -// See also CommonOpt. -type WorkerOpt struct { - Name string - SessionManager *session.Manager - MetadataStore *metadata.Store - Executor executor.Executor - BaseSnapshotter ctdsnapshot.Snapshotter // not blobmapping one (FIXME: just require blobmapping snapshotter?) - ContentStore content.Store - Applier diff.Differ - Differ diff.Differ - ImageStore images.Store -} - // Worker is a local worker instance with dedicated snapshotter, cache, and so on. // TODO: s/Worker/OpWorker/g ? // FIXME: Worker should be rather an interface -type Worker struct { - WorkerOpt - Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter - CacheManager cache.Manager - SourceManager *source.Manager - InstructionCache instructioncache.InstructionCache - Exporters map[string]exporter.Exporter - ImageSource source.Source - CacheExporter *cacheimport.CacheExporter - CacheImporter *cacheimport.CacheImporter - // no frontend here +// type Worker struct { +// WorkerOpt +// Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter +// CacheManager cache.Manager +// SourceManager *source.Manager +// InstructionCache instructioncache.InstructionCache +// Exporters map[string]exporter.Exporter +// ImageSource source.Source +// CacheExporter *cacheimport.CacheExporter +// CacheImporter *cacheimport.CacheImporter +// // no frontend here +// } + +type SubBuilder interface { + SubBuild(ctx context.Context, dgst digest.Digest, req types.SolveRequest) (types.Ref, error) } -// NewWorker instantiates a local worker -func NewWorker(opt WorkerOpt) (*Worker, error) { - bmSnapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{ - Content: opt.ContentStore, - Snapshotter: opt.BaseSnapshotter, - MetadataStore: opt.MetadataStore, - }) - if err != nil { - return nil, err - } - - cm, err := cache.NewManager(cache.ManagerOpt{ - Snapshotter: bmSnapshotter, - MetadataStore: opt.MetadataStore, - }) - if err != nil { - return nil, err - } - - ic := &localcache.LocalStore{ - MetadataStore: opt.MetadataStore, - Cache: cm, - } - - sm, err := source.NewManager() - if err != nil { - return nil, err - } - - is, err := containerimage.NewSource(containerimage.SourceOpt{ - Snapshotter: bmSnapshotter, - ContentStore: opt.ContentStore, - SessionManager: opt.SessionManager, - Applier: opt.Applier, - CacheAccessor: cm, - }) - if err != nil { - return nil, err - } - - sm.Register(is) - - gs, err := git.NewSource(git.Opt{ - CacheAccessor: cm, - MetadataStore: opt.MetadataStore, - }) - if err != nil { - return nil, err - } - - sm.Register(gs) - - hs, err := http.NewSource(http.Opt{ - CacheAccessor: cm, - MetadataStore: opt.MetadataStore, - }) - if err != nil { - return nil, err - } - - sm.Register(hs) - - ss, err := local.NewSource(local.Opt{ - SessionManager: opt.SessionManager, - CacheAccessor: cm, - MetadataStore: opt.MetadataStore, - }) - if err != nil { - return nil, err - } - sm.Register(ss) - - exporters := map[string]exporter.Exporter{} - - iw, err := imageexporter.NewImageWriter(imageexporter.WriterOpt{ - Snapshotter: bmSnapshotter, - ContentStore: opt.ContentStore, - Differ: opt.Differ, - }) - if err != nil { - return nil, err - } - - imageExporter, err := imageexporter.New(imageexporter.Opt{ - Images: opt.ImageStore, - SessionManager: opt.SessionManager, - ImageWriter: iw, - }) - if err != nil { - return nil, err - } - exporters[client.ExporterImage] = imageExporter - - localExporter, err := localexporter.New(localexporter.Opt{ - SessionManager: opt.SessionManager, - }) - if err != nil { - return nil, err - } - exporters[client.ExporterLocal] = localExporter - - ociExporter, err := ociexporter.New(ociexporter.Opt{ - SessionManager: opt.SessionManager, - ImageWriter: iw, - }) - if err != nil { - return nil, err - } - exporters[client.ExporterOCI] = ociExporter - - ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{ - Snapshotter: bmSnapshotter, - ContentStore: opt.ContentStore, - SessionManager: opt.SessionManager, - Differ: opt.Differ, - }) - - ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{ - Snapshotter: bmSnapshotter, - ContentStore: opt.ContentStore, - Applier: opt.Applier, - CacheAccessor: cm, - SessionManager: opt.SessionManager, - }) - - return &Worker{ - WorkerOpt: opt, - Snapshotter: bmSnapshotter, - CacheManager: cm, - SourceManager: sm, - InstructionCache: ic, - Exporters: exporters, - ImageSource: is, - CacheExporter: ce, - CacheImporter: ci, - }, nil +type Worker interface { + InstructionCache() instructioncache.InstructionCache + Resolve(v types.Vertex, s SubBuilder) (types.Op, error) + ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) + Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error + DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) + Name() string + Exporter(name string) (exporter.Exporter, error) } diff --git a/worker/workercontroller.go b/worker/workercontroller.go index b05c854d..a74c29db 100644 --- a/worker/workercontroller.go +++ b/worker/workercontroller.go @@ -11,11 +11,11 @@ import ( type Controller struct { mu sync.Mutex // TODO: define worker interface and support remote ones - workers []*Worker + workers []Worker } // Add adds a local worker -func (c *Controller) Add(w *Worker) error { +func (c *Controller) Add(w Worker) error { c.mu.Lock() c.workers = append(c.workers, w) c.mu.Unlock() @@ -23,7 +23,7 @@ func (c *Controller) Add(w *Worker) error { } // GetAll returns all local workers -func (c *Controller) GetAll() []*Worker { +func (c *Controller) GetAll() []Worker { c.mu.Lock() workers := c.workers c.mu.Unlock() @@ -31,8 +31,8 @@ func (c *Controller) GetAll() []*Worker { } // GetDefault returns the default local worker -func (c *Controller) GetDefault() (*Worker, error) { - var w *Worker +func (c *Controller) GetDefault() (Worker, error) { + var w Worker c.mu.Lock() if len(c.workers) > 0 { w = c.workers[0]