solver: split llbop pkg

Signed-off-by: Akihiro Suda <suda.akihiro@lab.ntt.co.jp>

Branch: docker-18.09
Parent: b55bf20d7e
Commit: a4316d16c7
@@ -9,6 +9,7 @@ import (
 	"github.com/moby/buildkit/session"
 	"github.com/moby/buildkit/session/grpchijack"
 	"github.com/moby/buildkit/solver"
+	solverimpl "github.com/moby/buildkit/solver/solver"
 	"github.com/moby/buildkit/worker"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
@@ -25,13 +26,13 @@ type Opt struct {
 
 type Controller struct { // TODO: ControlService
 	opt    Opt
-	solver *solver.Solver
+	solver solver.Solver
 }
 
 func NewController(opt Opt) (*Controller, error) {
 	c := &Controller{
 		opt:    opt,
-		solver: solver.NewLLBSolver(opt.WorkerController, opt.Frontends),
+		solver: solverimpl.NewLLBOpSolver(opt.WorkerController, opt.Frontends),
 	}
 	return c, nil
 }
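The Controller now holds the small `solver.Solver` interface instead of the concrete `*solver.Solver`, so the LLB implementation in `solver/solver` can be swapped out. A minimal sketch of an alternative implementation, assuming only the `Solve`/`Status` signatures shown in this commit (the `nopSolver` name and the `controltest` package are hypothetical, not part of the change):

```go
package controltest

import (
	"github.com/moby/buildkit/client"
	"github.com/moby/buildkit/solver"
	"golang.org/x/net/context"
)

// nopSolver satisfies solver.Solver without doing any work, which is enough
// to exercise Controller wiring in tests without the llbop implementation.
type nopSolver struct{}

func (nopSolver) Solve(ctx context.Context, id string, req solver.SolveRequest) error {
	return nil
}

func (nopSolver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error {
	close(statusChan) // the real implementation also closes the channel when it is done
	return nil
}

// compile-time check against the new interface
var _ solver.Solver = nopSolver{}
```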
@@ -1,46 +1,47 @@
-package solver
+package reference
 
 import (
 	"sync"
 
 	"github.com/moby/buildkit/cache"
+	"github.com/moby/buildkit/solver"
 	"golang.org/x/net/context"
 )
 
-// sharedRef is a wrapper around releasable that allows you to make new
+// SharedRef is a wrapper around releasable that allows you to make new
 // releasable child objects
-type sharedRef struct {
+type SharedRef struct {
 	mu   sync.Mutex
 	refs map[*sharedRefInstance]struct{}
-	main Reference
-	Reference
+	main solver.Ref
+	solver.Ref
 }
 
-func newSharedRef(main Reference) *sharedRef {
-	mr := &sharedRef{
+func NewSharedRef(main solver.Ref) *SharedRef {
+	mr := &SharedRef{
 		refs: make(map[*sharedRefInstance]struct{}),
-		Reference: main,
+		Ref:  main,
 	}
 	mr.main = mr.Clone()
 	return mr
 }
 
-func (mr *sharedRef) Clone() Reference {
+func (mr *SharedRef) Clone() solver.Ref {
 	mr.mu.Lock()
-	r := &sharedRefInstance{sharedRef: mr}
+	r := &sharedRefInstance{SharedRef: mr}
 	mr.refs[r] = struct{}{}
 	mr.mu.Unlock()
 	return r
 }
 
-func (mr *sharedRef) Release(ctx context.Context) error {
+func (mr *SharedRef) Release(ctx context.Context) error {
 	return mr.main.Release(ctx)
 }
 
-func (mr *sharedRef) Sys() Reference {
-	sys := mr.Reference
+func (mr *SharedRef) Sys() solver.Ref {
+	sys := mr.Ref
 	if s, ok := sys.(interface {
-		Sys() Reference
+		Sys() solver.Ref
 	}); ok {
 		return s.Sys()
 	}
@@ -48,31 +49,31 @@ func (mr *sharedRef) Sys() Reference {
 }
 
 type sharedRefInstance struct {
-	*sharedRef
+	*SharedRef
 }
 
 func (r *sharedRefInstance) Release(ctx context.Context) error {
-	r.sharedRef.mu.Lock()
-	defer r.sharedRef.mu.Unlock()
-	delete(r.sharedRef.refs, r)
-	if len(r.sharedRef.refs) == 0 {
-		return r.sharedRef.Reference.Release(ctx)
+	r.SharedRef.mu.Lock()
+	defer r.SharedRef.mu.Unlock()
+	delete(r.SharedRef.refs, r)
+	if len(r.SharedRef.refs) == 0 {
+		return r.SharedRef.Ref.Release(ctx)
 	}
 	return nil
 }
 
-func originRef(ref Reference) Reference {
+func OriginRef(ref solver.Ref) solver.Ref {
 	sysRef := ref
 	if sys, ok := ref.(interface {
-		Sys() Reference
+		Sys() solver.Ref
 	}); ok {
 		sysRef = sys.Sys()
 	}
 	return sysRef
 }
 
-func toImmutableRef(ref Reference) (cache.ImmutableRef, bool) {
-	immutable, ok := originRef(ref).(cache.ImmutableRef)
+func ToImmutableRef(ref solver.Ref) (cache.ImmutableRef, bool) {
+	immutable, ok := OriginRef(ref).(cache.ImmutableRef)
 	if !ok {
 		return nil, false
 	}
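Since the shared-reference helper is now the exported `reference.SharedRef` operating on the new `solver.Ref` interface, it can be exercised in isolation. A minimal sketch, assuming only the `Release`-only `solver.Ref` contract shown above (`dummyRef` is a hypothetical stand-in, not part of the commit):

```go
package main

import (
	"fmt"

	"github.com/moby/buildkit/solver/reference"
	"golang.org/x/net/context"
)

// dummyRef is a hypothetical stand-in for a cache reference; solver.Ref
// only requires Release(context.Context) error.
type dummyRef struct{ released bool }

func (r *dummyRef) Release(ctx context.Context) error {
	r.released = true
	return nil
}

func main() {
	base := &dummyRef{}
	shared := reference.NewSharedRef(base) // holds one internal clone ("main")

	a := shared.Clone() // independently releasable view of base
	b := shared.Clone()

	_ = a.Release(context.TODO())      // base still held by b and the internal clone
	_ = b.Release(context.TODO())      // base still held by the internal clone
	_ = shared.Release(context.TODO()) // last holder gone: base.Release is called
	fmt.Println(base.released)         // true
}
```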
solver/solver.go (753 changes)
@ -1,65 +1,18 @@
|
||||||
package solver
|
package solver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/moby/buildkit/cache"
|
|
||||||
"github.com/moby/buildkit/cache/cacheimport"
|
|
||||||
"github.com/moby/buildkit/cache/contenthash"
|
|
||||||
"github.com/moby/buildkit/cache/instructioncache"
|
|
||||||
"github.com/moby/buildkit/client"
|
"github.com/moby/buildkit/client"
|
||||||
"github.com/moby/buildkit/exporter"
|
"github.com/moby/buildkit/exporter"
|
||||||
"github.com/moby/buildkit/frontend"
|
"github.com/moby/buildkit/frontend"
|
||||||
"github.com/moby/buildkit/solver/pb"
|
"github.com/moby/buildkit/solver/pb"
|
||||||
"github.com/moby/buildkit/util/bgfunc"
|
|
||||||
"github.com/moby/buildkit/util/progress"
|
|
||||||
"github.com/moby/buildkit/worker"
|
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
"github.com/pkg/errors"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// DetermineVertexWorker determines worker for a vertex.
|
// Ref is a reference to the object passed through the build steps.
|
||||||
// Currently, constraint is just ignored.
|
// This interface is a subset of the github.com/buildkit/buildkit/cache.Ref interface.
|
||||||
// Also we need to track the workers of the inputs.
|
|
||||||
func DetermineVertexWorker(wc *worker.Controller, v Vertex) (*worker.Worker, error) {
|
|
||||||
// TODO: multiworker
|
|
||||||
return wc.GetDefault()
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewLLBSolver(wc *worker.Controller, frontends map[string]frontend.Frontend) *Solver {
|
|
||||||
var s *Solver
|
|
||||||
s = New(func(v Vertex) (Op, error) {
|
|
||||||
w, err := DetermineVertexWorker(wc, v)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
switch op := v.Sys().(type) {
|
|
||||||
case *pb.Op_Source:
|
|
||||||
return newSourceOp(v, op, w.SourceManager)
|
|
||||||
case *pb.Op_Exec:
|
|
||||||
return newExecOp(v, op, w.CacheManager, w.Executor)
|
|
||||||
case *pb.Op_Build:
|
|
||||||
return newBuildOp(v, op, s)
|
|
||||||
default:
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
}, wc, frontends)
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResolveOpFunc finds an Op implementation for a vertex
|
|
||||||
type ResolveOpFunc func(Vertex) (Op, error)
|
|
||||||
|
|
||||||
// Reference is a reference to the object passed through the build steps.
|
|
||||||
// This interface is a subset of the cache.Ref interface.
|
|
||||||
// For ease of unit testing, this interface only has Release().
|
// For ease of unit testing, this interface only has Release().
|
||||||
type Reference interface {
|
type Ref interface {
|
||||||
Release(context.Context) error
|
Release(context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,18 +25,7 @@ type Op interface {
|
||||||
// content based cache key.
|
// content based cache key.
|
||||||
ContentMask(context.Context) (digest.Digest, [][]string, error)
|
ContentMask(context.Context) (digest.Digest, [][]string, error)
|
||||||
// Run runs an operation and returns the output references.
|
// Run runs an operation and returns the output references.
|
||||||
Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error)
|
Run(ctx context.Context, inputs []Ref) (outputs []Ref, err error)
|
||||||
}
|
|
||||||
|
|
||||||
type Solver struct {
|
|
||||||
resolve ResolveOpFunc
|
|
||||||
jobs *jobList
|
|
||||||
workerController *worker.Controller
|
|
||||||
frontends map[string]frontend.Frontend
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(resolve ResolveOpFunc, wc *worker.Controller, f map[string]frontend.Frontend) *Solver {
|
|
||||||
return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, frontends: f}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type SolveRequest struct {
|
type SolveRequest struct {
|
||||||
|
@ -95,688 +37,7 @@ type SolveRequest struct {
|
||||||
ImportCacheRef string
|
ImportCacheRef string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Solver) solve(ctx context.Context, j *job, req SolveRequest) (Reference, map[string][]byte, error) {
|
type Solver interface {
|
||||||
if req.Definition == nil {
|
Solve(ctx context.Context, id string, req SolveRequest) error
|
||||||
if req.Frontend == nil {
|
Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error
|
||||||
return nil, nil, errors.Errorf("invalid request: no definition nor frontend")
|
|
||||||
}
|
|
||||||
return req.Frontend.Solve(ctx, s.llbBridge(j), req.FrontendOpt)
|
|
||||||
}
|
|
||||||
|
|
||||||
inp, err := j.load(req.Definition, s.resolve)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
ref, err := j.getRef(ctx, inp.Vertex.(*vertex), inp.Index)
|
|
||||||
return ref, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Solver) llbBridge(j *job) *llbBridge {
|
|
||||||
// FIXME(AkihiroSuda): make sure worker implements interfaces required by llbBridge
|
|
||||||
worker, err := s.workerController.GetDefault()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return &llbBridge{job: j, Solver: s, worker: worker}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error {
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pr, ctx, closeProgressWriter := progress.NewContext(ctx)
|
|
||||||
defer closeProgressWriter()
|
|
||||||
|
|
||||||
// TODO: multiworker
|
|
||||||
defaultWorker, err := s.workerController.GetDefault()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if importRef := req.ImportCacheRef; importRef != "" {
|
|
||||||
cache, err := defaultWorker.CacheImporter.Import(ctx, importRef)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defaultWorker.InstructionCache = instructioncache.Union(defaultWorker.InstructionCache, cache)
|
|
||||||
}
|
|
||||||
|
|
||||||
// register a build job. vertex needs to be loaded to a job to run
|
|
||||||
ctx, j, err := s.jobs.new(ctx, id, pr, defaultWorker.InstructionCache)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ref, exporterOpt, err := s.solve(ctx, j, req)
|
|
||||||
defer j.discard()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if ref != nil {
|
|
||||||
go ref.Release(context.TODO())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var immutable cache.ImmutableRef
|
|
||||||
if ref != nil {
|
|
||||||
var ok bool
|
|
||||||
immutable, ok = toImmutableRef(ref)
|
|
||||||
if !ok {
|
|
||||||
return errors.Errorf("invalid reference for exporting: %T", ref)
|
|
||||||
}
|
|
||||||
if err := immutable.Finalize(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if exp := req.Exporter; exp != nil {
|
|
||||||
if err := inVertexContext(ctx, exp.Name(), func(ctx context.Context) error {
|
|
||||||
return exp.Export(ctx, immutable, exporterOpt)
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if exportName := req.ExportCacheRef; exportName != "" {
|
|
||||||
if err := inVertexContext(ctx, "exporting build cache", func(ctx context.Context) error {
|
|
||||||
cache, err := j.cacheExporter(ref)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
records, err := cache.Export(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: multiworker
|
|
||||||
return defaultWorker.CacheExporter.Export(ctx, records, exportName)
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error {
|
|
||||||
j, err := s.jobs.get(id)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer close(statusChan)
|
|
||||||
return j.pipe(ctx, statusChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Solver) subBuild(ctx context.Context, dgst digest.Digest, req SolveRequest) (Reference, error) {
|
|
||||||
jl := s.jobs
|
|
||||||
jl.mu.Lock()
|
|
||||||
st, ok := jl.actives[dgst]
|
|
||||||
if !ok {
|
|
||||||
jl.mu.Unlock()
|
|
||||||
return nil, errors.Errorf("no such parent vertex: %v", dgst)
|
|
||||||
}
|
|
||||||
|
|
||||||
var inp *Input
|
|
||||||
for j := range st.jobs {
|
|
||||||
var err error
|
|
||||||
inp, err = j.loadInternal(req.Definition, s.resolve)
|
|
||||||
if err != nil {
|
|
||||||
jl.mu.Unlock()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
st = jl.actives[inp.Vertex.Digest()]
|
|
||||||
jl.mu.Unlock()
|
|
||||||
|
|
||||||
w, err := DetermineVertexWorker(s.workerController, inp.Vertex)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return getRef(ctx, st.solver, inp.Vertex.(*vertex), inp.Index, w.InstructionCache) // TODO: combine to pass single input // TODO: export cache for subbuilds
|
|
||||||
}
|
|
||||||
|
|
||||||
type VertexSolver interface {
|
|
||||||
CacheKey(ctx context.Context, index Index) (digest.Digest, error)
|
|
||||||
OutputEvaluator(Index) (VertexEvaluator, error)
|
|
||||||
Release() error
|
|
||||||
Cache(Index, Reference) CacheExporter
|
|
||||||
}
|
|
||||||
|
|
||||||
type vertexInput struct {
|
|
||||||
solver VertexSolver
|
|
||||||
ev VertexEvaluator
|
|
||||||
cacheKeys []digest.Digest
|
|
||||||
ref Reference
|
|
||||||
}
|
|
||||||
|
|
||||||
type vertexSolver struct {
|
|
||||||
inputs []*vertexInput
|
|
||||||
v *vertex
|
|
||||||
cv client.Vertex
|
|
||||||
op Op
|
|
||||||
cache instructioncache.InstructionCache
|
|
||||||
refs []*sharedRef
|
|
||||||
f *bgfunc.F
|
|
||||||
ctx context.Context
|
|
||||||
|
|
||||||
baseKey digest.Digest
|
|
||||||
mu sync.Mutex
|
|
||||||
results []digest.Digest
|
|
||||||
markCachedOnce sync.Once
|
|
||||||
contentKey digest.Digest
|
|
||||||
|
|
||||||
signal *signal // used to notify that there are callers who need more data
|
|
||||||
}
|
|
||||||
|
|
||||||
type resolveF func(digest.Digest) (VertexSolver, error)
|
|
||||||
|
|
||||||
func newVertexSolver(ctx context.Context, v *vertex, op Op, c instructioncache.InstructionCache, resolve resolveF) (*vertexSolver, error) {
|
|
||||||
inputs := make([]*vertexInput, len(v.inputs))
|
|
||||||
for i, in := range v.inputs {
|
|
||||||
s, err := resolve(in.vertex.digest)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ev, err := s.OutputEvaluator(in.index)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ev.Cancel()
|
|
||||||
inputs[i] = &vertexInput{
|
|
||||||
solver: s,
|
|
||||||
ev: ev,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &vertexSolver{
|
|
||||||
ctx: ctx,
|
|
||||||
inputs: inputs,
|
|
||||||
v: v,
|
|
||||||
cv: v.clientVertex,
|
|
||||||
op: op,
|
|
||||||
cache: c,
|
|
||||||
signal: newSignaller(),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func markCached(ctx context.Context, cv client.Vertex) {
|
|
||||||
pw, _, _ := progress.FromContext(ctx)
|
|
||||||
defer pw.Close()
|
|
||||||
|
|
||||||
if cv.Started == nil {
|
|
||||||
now := time.Now()
|
|
||||||
cv.Started = &now
|
|
||||||
cv.Completed = &now
|
|
||||||
cv.Cached = true
|
|
||||||
}
|
|
||||||
pw.Write(cv.Digest.String(), cv)
|
|
||||||
}
|
|
||||||
|
|
||||||
type CacheExporter interface {
|
|
||||||
Export(context.Context) ([]cacheimport.CacheRecord, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) Cache(index Index, ref Reference) CacheExporter {
|
|
||||||
return &cacheExporter{vertexSolver: vs, index: index, ref: ref}
|
|
||||||
}
|
|
||||||
|
|
||||||
type cacheExporter struct {
|
|
||||||
*vertexSolver
|
|
||||||
index Index
|
|
||||||
ref Reference
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ce *cacheExporter) Export(ctx context.Context) ([]cacheimport.CacheRecord, error) {
|
|
||||||
return ce.vertexSolver.Export(ctx, ce.index, ce.ref)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) Export(ctx context.Context, index Index, ref Reference) ([]cacheimport.CacheRecord, error) {
|
|
||||||
mp := map[digest.Digest]cacheimport.CacheRecord{}
|
|
||||||
if err := vs.appendInputCache(ctx, mp); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dgst, err := vs.mainCacheKey()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
immutable, ok := toImmutableRef(ref)
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.Errorf("invalid reference")
|
|
||||||
}
|
|
||||||
dgst = cacheKeyForIndex(dgst, index)
|
|
||||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: immutable}
|
|
||||||
out := make([]cacheimport.CacheRecord, 0, len(mp))
|
|
||||||
for _, cr := range mp {
|
|
||||||
out = append(out, cr)
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) appendInputCache(ctx context.Context, mp map[digest.Digest]cacheimport.CacheRecord) error {
|
|
||||||
for i, inp := range vs.inputs {
|
|
||||||
mainDgst, err := inp.solver.(*vertexSolver).mainCacheKey()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
dgst := cacheKeyForIndex(mainDgst, vs.v.inputs[i].index)
|
|
||||||
if cr, ok := mp[dgst]; !ok || (cr.Reference == nil && inp.ref != nil) {
|
|
||||||
if err := inp.solver.(*vertexSolver).appendInputCache(ctx, mp); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if inp.ref != nil && len(inp.solver.(*vertexSolver).inputs) > 0 { // Ignore pushing the refs for sources
|
|
||||||
ref, ok := toImmutableRef(inp.ref)
|
|
||||||
if !ok {
|
|
||||||
return errors.Errorf("invalid reference")
|
|
||||||
}
|
|
||||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: ref}
|
|
||||||
} else {
|
|
||||||
mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ck := vs.contentKey; ck != "" {
|
|
||||||
mainDgst, err := vs.mainCacheKey()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
mp[ck] = cacheimport.CacheRecord{CacheKey: mainDgst, ContentKey: ck}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) CacheKey(ctx context.Context, index Index) (digest.Digest, error) {
|
|
||||||
vs.mu.Lock()
|
|
||||||
defer vs.mu.Unlock()
|
|
||||||
if vs.baseKey == "" {
|
|
||||||
eg, ctx := errgroup.WithContext(vs.ctx)
|
|
||||||
for i := range vs.inputs {
|
|
||||||
func(i int) {
|
|
||||||
eg.Go(func() error {
|
|
||||||
k, err := vs.inputs[i].solver.CacheKey(ctx, vs.v.inputs[i].index)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
vs.inputs[i].cacheKeys = append(vs.inputs[i].cacheKeys, k)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
var dgst digest.Digest
|
|
||||||
eg.Go(func() error {
|
|
||||||
var err error
|
|
||||||
dgst, err = vs.op.CacheKey(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := eg.Wait(); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
vs.baseKey = dgst
|
|
||||||
}
|
|
||||||
|
|
||||||
k, err := vs.lastCacheKey()
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return cacheKeyForIndex(k, index), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) lastCacheKey() (digest.Digest, error) {
|
|
||||||
return vs.currentCacheKey(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) mainCacheKey() (digest.Digest, error) {
|
|
||||||
return vs.currentCacheKey(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) currentCacheKey(last bool) (digest.Digest, error) {
|
|
||||||
inputKeys := make([]digest.Digest, len(vs.inputs))
|
|
||||||
for i, inp := range vs.inputs {
|
|
||||||
if len(inp.cacheKeys) == 0 {
|
|
||||||
return "", errors.Errorf("inputs not processed")
|
|
||||||
}
|
|
||||||
if last {
|
|
||||||
inputKeys[i] = inp.cacheKeys[len(inp.cacheKeys)-1]
|
|
||||||
} else {
|
|
||||||
inputKeys[i] = inp.cacheKeys[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
dt, err := json.Marshal(struct {
|
|
||||||
Inputs []digest.Digest
|
|
||||||
CacheKey digest.Digest
|
|
||||||
}{Inputs: inputKeys, CacheKey: vs.baseKey})
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return digest.FromBytes(dt), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) OutputEvaluator(index Index) (VertexEvaluator, error) {
|
|
||||||
if vs.f == nil {
|
|
||||||
f, err := bgfunc.New(vs.ctx, vs.run)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
vs.f = f
|
|
||||||
}
|
|
||||||
c := vs.f.NewCaller()
|
|
||||||
ve := &vertexEvaluator{vertexSolver: vs, c: c, index: index}
|
|
||||||
return ve, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *vertexSolver) Release() error {
|
|
||||||
for _, inp := range vs.inputs {
|
|
||||||
if inp.ref != nil {
|
|
||||||
inp.ref.Release(context.TODO())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if vs.refs != nil {
|
|
||||||
for _, r := range vs.refs {
|
|
||||||
r.Release(context.TODO())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// run is called by the bgfunc concurrency primitive. This function may be
|
|
||||||
// called multiple times but never in parallal. Repeated calls should make an
|
|
||||||
// effort to continue from previous state. Lock vs.mu to syncronize data to the
|
|
||||||
// callers. Signal parameter can be used to notify callers that new data is
|
|
||||||
// available without returning from the function.
|
|
||||||
func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
|
|
||||||
vs.mu.Lock()
|
|
||||||
if vs.refs != nil {
|
|
||||||
vs.mu.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
vs.mu.Unlock()
|
|
||||||
|
|
||||||
waitFirst := vs.signal.Wait()
|
|
||||||
waitRun := waitFirst
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-waitFirst:
|
|
||||||
}
|
|
||||||
|
|
||||||
// this is where you lookup the cache keys that were successfully probed
|
|
||||||
|
|
||||||
eg, ctx2 := errgroup.WithContext(ctx)
|
|
||||||
|
|
||||||
// process all the inputs
|
|
||||||
for i, inp := range vs.inputs {
|
|
||||||
if inp.ref == nil {
|
|
||||||
func(i int) {
|
|
||||||
eg.Go(func() error {
|
|
||||||
inp := vs.inputs[i]
|
|
||||||
defer inp.ev.Cancel()
|
|
||||||
|
|
||||||
waitNext := waitFirst
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx2.Done():
|
|
||||||
return ctx2.Err()
|
|
||||||
case <-waitNext:
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if current cache key is in cache
|
|
||||||
if len(inp.cacheKeys) > 0 {
|
|
||||||
ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1], inp.solver.(*vertexSolver).v.Name())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if ref != nil {
|
|
||||||
inp.ref = ref.(Reference)
|
|
||||||
inp.solver.(*vertexSolver).markCachedOnce.Do(func() {
|
|
||||||
markCached(ctx, inp.solver.(*vertexSolver).cv)
|
|
||||||
})
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// evaluate next cachekey/reference for input
|
|
||||||
res, err := inp.ev.Next(ctx2)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if res == nil { // there is no more data coming
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if ref := res.Reference; ref != nil {
|
|
||||||
if ref, ok := toImmutableRef(ref); ok {
|
|
||||||
if !cache.HasCachePolicyRetain(ref) {
|
|
||||||
if err := cache.CachePolicyRetain(ref); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
ref.Metadata().Commit()
|
|
||||||
}
|
|
||||||
inp.ref = ref
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only try matching cache if the cachekey for input is present
|
|
||||||
exists, err := vs.cache.Probe(ctx2, res.CacheKey)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if exists {
|
|
||||||
vs.mu.Lock()
|
|
||||||
inp.cacheKeys = append(inp.cacheKeys, res.CacheKey)
|
|
||||||
dgst, err := vs.lastCacheKey()
|
|
||||||
if err != nil {
|
|
||||||
vs.mu.Unlock()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
vs.results = append(vs.results, dgst)
|
|
||||||
signal() // wake up callers
|
|
||||||
waitNext = vs.signal.Reset() // make sure we don't continue unless there are callers
|
|
||||||
waitRun = waitNext
|
|
||||||
vs.mu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := eg.Wait(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find extra cache keys by content
|
|
||||||
inputRefs := make([]Reference, len(vs.inputs))
|
|
||||||
lastInputKeys := make([]digest.Digest, len(vs.inputs))
|
|
||||||
for i := range vs.inputs {
|
|
||||||
inputRefs[i] = vs.inputs[i].ref
|
|
||||||
lastInputKeys[i] = vs.inputs[i].cacheKeys[len(vs.inputs[i].cacheKeys)-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
dgst, inp, err := vs.op.ContentMask(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var contentKey digest.Digest
|
|
||||||
if dgst != "" {
|
|
||||||
contentKey, err = calculateContentHash(ctx, inputRefs, dgst, lastInputKeys, inp)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
vs.contentKey = contentKey
|
|
||||||
|
|
||||||
var extraKeys []digest.Digest
|
|
||||||
cks, err := vs.cache.GetContentMapping(contentKey)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
extraKeys = append(extraKeys, cks...)
|
|
||||||
if len(extraKeys) > 0 {
|
|
||||||
vs.mu.Lock()
|
|
||||||
vs.results = append(vs.results, extraKeys...)
|
|
||||||
signal()
|
|
||||||
waitRun = vs.signal.Reset()
|
|
||||||
vs.mu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-waitRun:
|
|
||||||
}
|
|
||||||
|
|
||||||
// no cache hit. start evaluating the node
|
|
||||||
notifyStarted(ctx, &vs.cv)
|
|
||||||
defer func() {
|
|
||||||
notifyCompleted(ctx, &vs.cv, retErr)
|
|
||||||
}()
|
|
||||||
|
|
||||||
refs, err := vs.op.Run(ctx, inputRefs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sr := make([]*sharedRef, len(refs))
|
|
||||||
for i, r := range refs {
|
|
||||||
sr[i] = newSharedRef(r)
|
|
||||||
}
|
|
||||||
vs.mu.Lock()
|
|
||||||
vs.refs = sr
|
|
||||||
vs.mu.Unlock()
|
|
||||||
|
|
||||||
// store the cacheKeys for current refs
|
|
||||||
if vs.cache != nil {
|
|
||||||
cacheKey, err := vs.lastCacheKey()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for i, ref := range refs {
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
r := originRef(ref)
|
|
||||||
if err := vs.cache.Set(cacheKeyForIndex(cacheKey, Index(i)), r); err != nil {
|
|
||||||
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if contentKey != "" {
|
|
||||||
if err := vs.cache.SetContentMapping(contentKey, cacheKey); err != nil {
|
|
||||||
logrus.Errorf("failed to save content mapping: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getInputContentHash(ctx context.Context, ref cache.ImmutableRef, selectors []string) (digest.Digest, error) {
|
|
||||||
out := make([]digest.Digest, 0, len(selectors))
|
|
||||||
for _, s := range selectors {
|
|
||||||
dgst, err := contenthash.Checksum(ctx, ref, s)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
out = append(out, dgst)
|
|
||||||
}
|
|
||||||
if len(out) == 1 {
|
|
||||||
return out[0], nil
|
|
||||||
}
|
|
||||||
dt, err := json.Marshal(out)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return digest.FromBytes(dt), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func calculateContentHash(ctx context.Context, refs []Reference, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) {
|
|
||||||
dgsts := make([]digest.Digest, len(contentMap))
|
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
|
||||||
for i, sel := range contentMap {
|
|
||||||
if sel == nil {
|
|
||||||
dgsts[i] = inputs[i]
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
func(i int) {
|
|
||||||
eg.Go(func() error {
|
|
||||||
ref, ok := toImmutableRef(refs[i])
|
|
||||||
if !ok {
|
|
||||||
return errors.Errorf("invalid reference for exporting: %T", ref)
|
|
||||||
}
|
|
||||||
dgst, err := getInputContentHash(ctx, ref, contentMap[i])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
dgsts[i] = dgst
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
if err := eg.Wait(); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
dt, err := json.Marshal(struct {
|
|
||||||
Main digest.Digest
|
|
||||||
Inputs []digest.Digest
|
|
||||||
}{
|
|
||||||
Main: mainDigest,
|
|
||||||
Inputs: dgsts,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return digest.FromBytes(dt), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type VertexEvaluator interface {
|
|
||||||
Next(context.Context) (*VertexResult, error)
|
|
||||||
Cancel() error
|
|
||||||
}
|
|
||||||
|
|
||||||
type vertexEvaluator struct {
|
|
||||||
*vertexSolver
|
|
||||||
c *bgfunc.Caller
|
|
||||||
cursor int
|
|
||||||
index Index
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ve *vertexEvaluator) Next(ctx context.Context) (*VertexResult, error) {
|
|
||||||
v, err := ve.c.Call(ctx, func() (interface{}, error) {
|
|
||||||
ve.mu.Lock()
|
|
||||||
defer ve.mu.Unlock()
|
|
||||||
if ve.refs != nil {
|
|
||||||
return &VertexResult{Reference: ve.refs[int(ve.index)].Clone()}, nil
|
|
||||||
}
|
|
||||||
if i := ve.cursor; i < len(ve.results) {
|
|
||||||
ve.cursor++
|
|
||||||
return &VertexResult{CacheKey: cacheKeyForIndex(ve.results[i], ve.index)}, nil
|
|
||||||
}
|
|
||||||
ve.signal.Signal()
|
|
||||||
return nil, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if v == nil {
|
|
||||||
return nil, nil // no more records are coming
|
|
||||||
}
|
|
||||||
return v.(*VertexResult), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ve *vertexEvaluator) Cancel() error {
|
|
||||||
return ve.c.Cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
type VertexResult struct {
|
|
||||||
CacheKey digest.Digest
|
|
||||||
Reference Reference
|
|
||||||
}
|
|
||||||
|
|
||||||
func cacheKeyForIndex(dgst digest.Digest, index Index) digest.Digest {
|
|
||||||
return digest.FromBytes([]byte(fmt.Sprintf("%s.%d", dgst, index)))
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,9 @@ import (
|
||||||
"github.com/moby/buildkit/cache/instructioncache"
|
"github.com/moby/buildkit/cache/instructioncache"
|
||||||
"github.com/moby/buildkit/client"
|
"github.com/moby/buildkit/client"
|
||||||
"github.com/moby/buildkit/session"
|
"github.com/moby/buildkit/session"
|
||||||
|
"github.com/moby/buildkit/solver"
|
||||||
"github.com/moby/buildkit/solver/pb"
|
"github.com/moby/buildkit/solver/pb"
|
||||||
|
"github.com/moby/buildkit/solver/reference"
|
||||||
"github.com/moby/buildkit/util/progress"
|
"github.com/moby/buildkit/util/progress"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -161,18 +163,18 @@ type job struct {
|
||||||
|
|
||||||
type cacheRecord struct {
|
type cacheRecord struct {
|
||||||
VertexSolver
|
VertexSolver
|
||||||
index Index
|
index solver.Index
|
||||||
ref Reference
|
ref solver.Ref
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *job) load(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error) {
|
func (j *job) load(def *pb.Definition, resolveOp ResolveOpFunc) (*solver.Input, error) {
|
||||||
j.l.mu.Lock()
|
j.l.mu.Lock()
|
||||||
defer j.l.mu.Unlock()
|
defer j.l.mu.Unlock()
|
||||||
|
|
||||||
return j.loadInternal(def, resolveOp)
|
return j.loadInternal(def, resolveOp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error) {
|
func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*solver.Input, error) {
|
||||||
vtx, idx, err := loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (interface{}, error)) (interface{}, error) {
|
vtx, idx, err := loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (interface{}, error)) (interface{}, error) {
|
||||||
if st, ok := j.l.actives[dgst]; ok {
|
if st, ok := j.l.actives[dgst]; ok {
|
||||||
if vtx, ok := st.jobs[j]; ok {
|
if vtx, ok := st.jobs[j]; ok {
|
||||||
|
@ -204,7 +206,7 @@ func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*Input,
|
||||||
}
|
}
|
||||||
for i, input := range pbOp.Inputs {
|
for i, input := range pbOp.Inputs {
|
||||||
if inputMetadata := def.Metadata[input.Digest]; inputMetadata.IgnoreCache {
|
if inputMetadata := def.Metadata[input.Digest]; inputMetadata.IgnoreCache {
|
||||||
k, err := s.CacheKey(ctx, Index(i))
|
k, err := s.CacheKey(ctx, solver.Index(i))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -225,7 +227,7 @@ func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*Input,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Input{Vertex: vtx.(*vertex), Index: idx}, nil
|
return &solver.Input{Vertex: vtx.(*vertex), Index: solver.Index(idx)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *job) discard() {
|
func (j *job) discard() {
|
||||||
|
@ -253,12 +255,12 @@ func (j *job) getSolver(dgst digest.Digest) (VertexSolver, error) {
|
||||||
return st.solver, nil
|
return st.solver, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *job) getRef(ctx context.Context, v *vertex, index Index) (Reference, error) {
|
func (j *job) getRef(ctx context.Context, cv client.Vertex, index solver.Index) (solver.Ref, error) {
|
||||||
s, err := j.getSolver(v.Digest())
|
s, err := j.getSolver(cv.Digest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ref, err := getRef(ctx, s, v, index, j.cache)
|
ref, err := getRef(ctx, s, cv, index, j.cache)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -266,15 +268,15 @@ func (j *job) getRef(ctx context.Context, v *vertex, index Index) (Reference, er
|
||||||
return ref, nil
|
return ref, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *job) keepCacheRef(s VertexSolver, index Index, ref Reference) {
|
func (j *job) keepCacheRef(s VertexSolver, index solver.Index, ref solver.Ref) {
|
||||||
immutable, ok := toImmutableRef(ref)
|
immutable, ok := reference.ToImmutableRef(ref)
|
||||||
if ok {
|
if ok {
|
||||||
j.cached[immutable.ID()] = &cacheRecord{s, index, ref}
|
j.cached[immutable.ID()] = &cacheRecord{s, index, ref}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *job) cacheExporter(ref Reference) (CacheExporter, error) {
|
func (j *job) cacheExporter(ref solver.Ref) (CacheExporter, error) {
|
||||||
immutable, ok := toImmutableRef(ref)
|
immutable, ok := reference.ToImmutableRef(ref)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.Errorf("invalid reference")
|
return nil, errors.Errorf("invalid reference")
|
||||||
}
|
}
|
||||||
|
@ -285,7 +287,7 @@ func (j *job) cacheExporter(ref Reference) (CacheExporter, error) {
|
||||||
return cr.Cache(cr.index, cr.ref), nil
|
return cr.Cache(cr.index, cr.ref), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRef(ctx context.Context, s VertexSolver, v *vertex, index Index, cache instructioncache.InstructionCache) (Reference, error) {
|
func getRef(ctx context.Context, s VertexSolver, cv client.Vertex, index solver.Index, cache instructioncache.InstructionCache) (solver.Ref, error) {
|
||||||
k, err := s.CacheKey(ctx, index)
|
k, err := s.CacheKey(ctx, index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -295,8 +297,8 @@ func getRef(ctx context.Context, s VertexSolver, v *vertex, index Index, cache i
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if ref != nil {
|
if ref != nil {
|
||||||
markCached(ctx, v.clientVertex)
|
markCached(ctx, cv)
|
||||||
return ref.(Reference), nil
|
return ref.(solver.Ref), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ev, err := s.OutputEvaluator(index)
|
ev, err := s.OutputEvaluator(index)
|
||||||
|
@ -316,8 +318,8 @@ func getRef(ctx context.Context, s VertexSolver, v *vertex, index Index, cache i
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if ref != nil {
|
if ref != nil {
|
||||||
markCached(ctx, v.clientVertex)
|
markCached(ctx, cv)
|
||||||
return ref.(Reference), nil
|
return ref.(solver.Ref), nil
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
|
@ -6,6 +6,8 @@ import (
|
||||||
"github.com/moby/buildkit/cache"
|
"github.com/moby/buildkit/cache"
|
||||||
"github.com/moby/buildkit/executor"
|
"github.com/moby/buildkit/executor"
|
||||||
"github.com/moby/buildkit/frontend"
|
"github.com/moby/buildkit/frontend"
|
||||||
|
solver "github.com/moby/buildkit/solver"
|
||||||
|
"github.com/moby/buildkit/solver/reference"
|
||||||
"github.com/moby/buildkit/worker"
|
"github.com/moby/buildkit/worker"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -37,7 +39,7 @@ func (s *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (cache
|
||||||
return nil, nil, nil
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ref, exp, err := s.solve(ctx, s.job, SolveRequest{
|
ref, exp, err := s.solve(ctx, s.job, solver.SolveRequest{
|
||||||
Definition: req.Definition,
|
Definition: req.Definition,
|
||||||
Frontend: f,
|
Frontend: f,
|
||||||
FrontendOpt: req.FrontendOpt,
|
FrontendOpt: req.FrontendOpt,
|
||||||
|
@ -45,7 +47,7 @@ func (s *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (cache
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
immutable, ok := toImmutableRef(ref)
|
immutable, ok := reference.ToImmutableRef(ref)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, errors.Errorf("invalid reference for exporting: %T", ref)
|
return nil, nil, errors.Errorf("invalid reference for exporting: %T", ref)
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package solver
|
package llbop
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
@ -7,21 +7,27 @@ import (
|
||||||
"github.com/containerd/containerd/fs"
|
"github.com/containerd/containerd/fs"
|
||||||
"github.com/moby/buildkit/client/llb"
|
"github.com/moby/buildkit/client/llb"
|
||||||
"github.com/moby/buildkit/snapshot"
|
"github.com/moby/buildkit/snapshot"
|
||||||
|
solver "github.com/moby/buildkit/solver"
|
||||||
"github.com/moby/buildkit/solver/pb"
|
"github.com/moby/buildkit/solver/pb"
|
||||||
|
"github.com/moby/buildkit/solver/reference"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type SubBuilder interface {
|
||||||
|
SubBuild(ctx context.Context, dgst digest.Digest, req solver.SolveRequest) (solver.Ref, error)
|
||||||
|
}
|
||||||
|
|
||||||
const buildCacheType = "buildkit.build.v0"
|
const buildCacheType = "buildkit.build.v0"
|
||||||
|
|
||||||
type buildOp struct {
|
type buildOp struct {
|
||||||
op *pb.BuildOp
|
op *pb.BuildOp
|
||||||
s *Solver
|
s SubBuilder
|
||||||
v Vertex
|
v solver.Vertex
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBuildOp(v Vertex, op *pb.Op_Build, s *Solver) (Op, error) {
|
func NewBuildOp(v solver.Vertex, op *pb.Op_Build, s SubBuilder) (solver.Op, error) {
|
||||||
return &buildOp{
|
return &buildOp{
|
||||||
op: op.Build,
|
op: op.Build,
|
||||||
s: s,
|
s: s,
|
||||||
|
@ -43,7 +49,7 @@ func (b *buildOp) CacheKey(ctx context.Context) (digest.Digest, error) {
|
||||||
return digest.FromBytes(dt), nil
|
return digest.FromBytes(dt), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Reference, retErr error) {
|
func (b *buildOp) Run(ctx context.Context, inputs []solver.Ref) (outputs []solver.Ref, retErr error) {
|
||||||
if b.op.Builder != pb.LLBBuilder {
|
if b.op.Builder != pb.LLBBuilder {
|
||||||
return nil, errors.Errorf("only llb builder is currently allowed")
|
return nil, errors.Errorf("only llb builder is currently allowed")
|
||||||
}
|
}
|
||||||
|
@ -60,7 +66,7 @@ func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Refere
|
||||||
}
|
}
|
||||||
inp := inputs[i]
|
inp := inputs[i]
|
||||||
|
|
||||||
ref, ok := toImmutableRef(inp)
|
ref, ok := reference.ToImmutableRef(inp)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.Errorf("invalid reference for build %T", inp)
|
return nil, errors.Errorf("invalid reference for build %T", inp)
|
||||||
}
|
}
|
||||||
|
@ -107,14 +113,14 @@ func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Refere
|
||||||
lm.Unmount()
|
lm.Unmount()
|
||||||
lm = nil
|
lm = nil
|
||||||
|
|
||||||
newref, err := b.s.subBuild(ctx, b.v.Digest(), SolveRequest{
|
newref, err := b.s.SubBuild(ctx, b.v.Digest(), solver.SolveRequest{
|
||||||
Definition: def.ToPB(),
|
Definition: def.ToPB(),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return []Reference{newref}, err
|
return []solver.Ref{newref}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *buildOp) ContentMask(context.Context) (digest.Digest, [][]string, error) {
|
func (b *buildOp) ContentMask(context.Context) (digest.Digest, [][]string, error) {
|
|
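With this change a buildOp reaches its parent solver only through the exported `SubBuilder` interface (`SubBuild(ctx, dgst, req)`), so sub-builds can be faked when testing `NewBuildOp`. A sketch under that assumption (`fakeSubBuilder` and the `llboptest` package name are hypothetical):

```go
package llboptest

import (
	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/solver/solver/llbop"
	digest "github.com/opencontainers/go-digest"
	"golang.org/x/net/context"
)

// fakeSubBuilder records the sub-build requests issued by a buildOp and
// returns a canned ref instead of running a nested solve.
type fakeSubBuilder struct {
	requests []solver.SolveRequest
	ref      solver.Ref
}

func (f *fakeSubBuilder) SubBuild(ctx context.Context, dgst digest.Digest, req solver.SolveRequest) (solver.Ref, error) {
	f.requests = append(f.requests, req)
	return f.ref, nil
}

// compile-time check against the interface introduced in llbop
var _ llbop.SubBuilder = (*fakeSubBuilder)(nil)
```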
@ -1,4 +1,4 @@
|
||||||
package solver
|
package llbop
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
@ -10,7 +10,9 @@ import (
|
||||||
|
|
||||||
"github.com/moby/buildkit/cache"
|
"github.com/moby/buildkit/cache"
|
||||||
"github.com/moby/buildkit/executor"
|
"github.com/moby/buildkit/executor"
|
||||||
|
"github.com/moby/buildkit/solver"
|
||||||
"github.com/moby/buildkit/solver/pb"
|
"github.com/moby/buildkit/solver/pb"
|
||||||
|
"github.com/moby/buildkit/solver/reference"
|
||||||
"github.com/moby/buildkit/util/progress/logs"
|
"github.com/moby/buildkit/util/progress/logs"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -26,7 +28,7 @@ type execOp struct {
|
||||||
numInputs int
|
numInputs int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newExecOp(v Vertex, op *pb.Op_Exec, cm cache.Manager, exec executor.Executor) (Op, error) {
|
func NewExecOp(v solver.Vertex, op *pb.Op_Exec, cm cache.Manager, exec executor.Executor) (solver.Op, error) {
|
||||||
return &execOp{
|
return &execOp{
|
||||||
op: op.Exec,
|
op: op.Exec,
|
||||||
cm: cm,
|
cm: cm,
|
||||||
|
@ -54,9 +56,9 @@ func (e *execOp) CacheKey(ctx context.Context) (digest.Digest, error) {
|
||||||
return digest.FromBytes(dt), nil
|
return digest.FromBytes(dt), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) {
|
func (e *execOp) Run(ctx context.Context, inputs []solver.Ref) ([]solver.Ref, error) {
|
||||||
var mounts []executor.Mount
|
var mounts []executor.Mount
|
||||||
var outputs []Reference
|
var outputs []solver.Ref
|
||||||
var root cache.Mountable
|
var root cache.Mountable
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -76,7 +78,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
|
||||||
}
|
}
|
||||||
inp := inputs[int(m.Input)]
|
inp := inputs[int(m.Input)]
|
||||||
var ok bool
|
var ok bool
|
||||||
ref, ok = toImmutableRef(inp)
|
ref, ok = reference.ToImmutableRef(inp)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.Errorf("invalid reference for exec %T", inputs[int(m.Input)])
|
return nil, errors.Errorf("invalid reference for exec %T", inputs[int(m.Input)])
|
||||||
}
|
}
|
||||||
|
@ -84,7 +86,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
|
||||||
}
|
}
|
||||||
if m.Output != pb.SkipOutput {
|
if m.Output != pb.SkipOutput {
|
||||||
if m.Readonly && ref != nil && m.Dest != pb.RootMount { // exclude read-only rootfs
|
if m.Readonly && ref != nil && m.Dest != pb.RootMount { // exclude read-only rootfs
|
||||||
outputs = append(outputs, newSharedRef(ref).Clone())
|
outputs = append(outputs, reference.NewSharedRef(ref).Clone())
|
||||||
} else {
|
} else {
|
||||||
active, err := e.cm.New(ctx, ref, cache.WithDescription(fmt.Sprintf("mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")))) // TODO: should be method
|
active, err := e.cm.New(ctx, ref, cache.WithDescription(fmt.Sprintf("mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")))) // TODO: should be method
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -120,7 +122,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
|
||||||
return nil, errors.Wrapf(err, "executor failed running %v", meta.Args)
|
return nil, errors.Wrapf(err, "executor failed running %v", meta.Args)
|
||||||
}
|
}
|
||||||
|
|
||||||
refs := []Reference{}
|
refs := []solver.Ref{}
|
||||||
for i, o := range outputs {
|
for i, o := range outputs {
|
||||||
if mutable, ok := o.(cache.MutableRef); ok {
|
if mutable, ok := o.(cache.MutableRef); ok {
|
||||||
ref, err := mutable.Commit(ctx)
|
ref, err := mutable.Commit(ctx)
|
|
@ -1,8 +1,9 @@
|
||||||
package solver
|
package llbop
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/moby/buildkit/solver"
|
||||||
"github.com/moby/buildkit/solver/pb"
|
"github.com/moby/buildkit/solver/pb"
|
||||||
"github.com/moby/buildkit/source"
|
"github.com/moby/buildkit/source"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
|
@ -18,7 +19,7 @@ type sourceOp struct {
|
||||||
src source.SourceInstance
|
src source.SourceInstance
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSourceOp(_ Vertex, op *pb.Op_Source, sm *source.Manager) (Op, error) {
|
func NewSourceOp(_ solver.Vertex, op *pb.Op_Source, sm *source.Manager) (solver.Op, error) {
|
||||||
return &sourceOp{
|
return &sourceOp{
|
||||||
op: op,
|
op: op,
|
||||||
sm: sm,
|
sm: sm,
|
||||||
|
@ -55,7 +56,7 @@ func (s *sourceOp) CacheKey(ctx context.Context) (digest.Digest, error) {
|
||||||
return digest.FromBytes([]byte(sourceCacheType + ":" + k)), nil
|
return digest.FromBytes([]byte(sourceCacheType + ":" + k)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) {
|
func (s *sourceOp) Run(ctx context.Context, _ []solver.Ref) ([]solver.Ref, error) {
|
||||||
src, err := s.instance(ctx)
|
src, err := s.instance(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -64,7 +65,7 @@ func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return []Reference{ref}, nil
|
return []solver.Ref{ref}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sourceOp) ContentMask(context.Context) (digest.Digest, [][]string, error) {
|
func (s *sourceOp) ContentMask(context.Context) (digest.Digest, [][]string, error) {
|
|
@@ -0,0 +1,38 @@
+package solver
+
+import (
+	"github.com/moby/buildkit/frontend"
+	solver "github.com/moby/buildkit/solver"
+	"github.com/moby/buildkit/solver/pb"
+	"github.com/moby/buildkit/solver/solver/llbop"
+	"github.com/moby/buildkit/worker"
+)
+
+// DetermineVertexWorker determines worker for a vertex.
+// Currently, constraint is just ignored.
+// Also we need to track the workers of the inputs.
+func DetermineVertexWorker(wc *worker.Controller, v solver.Vertex) (*worker.Worker, error) {
+	// TODO: multiworker
+	return wc.GetDefault()
+}
+
+func NewLLBOpSolver(wc *worker.Controller, frontends map[string]frontend.Frontend) solver.Solver {
+	var s *Solver
+	s = New(func(v solver.Vertex) (solver.Op, error) {
+		w, err := DetermineVertexWorker(wc, v)
+		if err != nil {
+			return nil, err
+		}
+		switch op := v.Sys().(type) {
+		case *pb.Op_Source:
+			return llbop.NewSourceOp(v, op, w.SourceManager)
+		case *pb.Op_Exec:
+			return llbop.NewExecOp(v, op, w.CacheManager, w.Executor)
+		case *pb.Op_Build:
+			return llbop.NewBuildOp(v, op, s)
+		default:
+			return nil, nil
+		}
+	}, wc, DetermineVertexWorker, frontends)
+	return s
+}
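The end state of the split: the interface-only `solver` package is what consumers such as `control` import, while `solver/solver` (imported as `solverimpl` in control.go above) carries the implementation returned by `NewLLBOpSolver`. A compile-time sketch of that relationship (the `example` package name is an assumption):

```go
package example

import (
	"github.com/moby/buildkit/solver"
	solverimpl "github.com/moby/buildkit/solver/solver"
)

// The concrete *Solver from solver/solver satisfies the small solver.Solver
// interface, which is all that control.Controller depends on after this split.
var _ solver.Solver = (*solverimpl.Solver)(nil)
```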
@ -3,6 +3,7 @@ package solver
|
||||||
import (
|
import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/moby/buildkit/solver"
|
||||||
"github.com/moby/buildkit/solver/pb"
|
"github.com/moby/buildkit/solver/pb"
|
||||||
"github.com/moby/buildkit/source"
|
"github.com/moby/buildkit/source"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
|
@ -16,18 +17,18 @@ func newVertex(dgst digest.Digest, op *pb.Op, opMeta *pb.OpMetadata, load func(d
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
vtx.inputs = append(vtx.inputs, &input{index: Index(in.Index), vertex: sub.(*vertex)})
|
vtx.inputs = append(vtx.inputs, &input{index: solver.Index(in.Index), vertex: sub.(*vertex)})
|
||||||
}
|
}
|
||||||
vtx.initClientVertex()
|
vtx.initClientVertex()
|
||||||
return vtx, nil
|
return vtx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func toInternalVertex(v Vertex) *vertex {
|
func toInternalVertex(v solver.Vertex) *vertex {
|
||||||
cache := make(map[digest.Digest]*vertex)
|
cache := make(map[digest.Digest]*vertex)
|
||||||
return loadInternalVertexHelper(v, cache)
|
return loadInternalVertexHelper(v, cache)
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadInternalVertexHelper(v Vertex, cache map[digest.Digest]*vertex) *vertex {
|
func loadInternalVertexHelper(v solver.Vertex, cache map[digest.Digest]*vertex) *vertex {
|
||||||
if v, ok := cache[v.Digest()]; ok {
|
if v, ok := cache[v.Digest()]; ok {
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
@ -43,7 +44,7 @@ func loadInternalVertexHelper(v Vertex, cache map[digest.Digest]*vertex) *vertex
|
||||||
|
|
||||||
// loadLLB loads LLB.
|
// loadLLB loads LLB.
|
||||||
// fn is executed sequentially.
|
// fn is executed sequentially.
|
||||||
func loadLLB(def *pb.Definition, fn func(digest.Digest, *pb.Op, func(digest.Digest) (interface{}, error)) (interface{}, error)) (interface{}, Index, error) {
|
func loadLLB(def *pb.Definition, fn func(digest.Digest, *pb.Op, func(digest.Digest) (interface{}, error)) (interface{}, error)) (interface{}, pb.OutputIndex, error) {
|
||||||
if len(def.Def) == 0 {
|
if len(def.Def) == 0 {
|
||||||
return nil, 0, errors.New("invalid empty definition")
|
return nil, 0, errors.New("invalid empty definition")
|
||||||
}
|
}
|
||||||
|
@ -81,7 +82,7 @@ func loadLLB(def *pb.Definition, fn func(digest.Digest, *pb.Op, func(digest.Dige
|
||||||
}
|
}
|
||||||
|
|
||||||
v, err := rec(dgst)
|
v, err := rec(dgst)
|
||||||
return v, Index(lastOp.Inputs[0].Index), err
|
return v, lastOp.Inputs[0].Index, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func llbOpName(op *pb.Op) string {
|
func llbOpName(op *pb.Op) string {
|
|
@ -0,0 +1,729 @@
|
||||||
|
package solver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/moby/buildkit/cache"
|
||||||
|
"github.com/moby/buildkit/cache/cacheimport"
|
||||||
|
"github.com/moby/buildkit/cache/contenthash"
|
||||||
|
"github.com/moby/buildkit/cache/instructioncache"
|
||||||
|
"github.com/moby/buildkit/client"
|
||||||
|
"github.com/moby/buildkit/frontend"
|
||||||
|
solver "github.com/moby/buildkit/solver"
|
||||||
|
"github.com/moby/buildkit/solver/reference"
|
||||||
|
"github.com/moby/buildkit/util/bgfunc"
|
||||||
|
"github.com/moby/buildkit/util/progress"
|
||||||
|
"github.com/moby/buildkit/worker"
|
||||||
|
digest "github.com/opencontainers/go-digest"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FIXME: Also we need to track the workers of the inputs.
|
||||||
|
type VertexWorkerDeterminer func(wc *worker.Controller, v solver.Vertex) (*worker.Worker, error)
|
||||||
|
|
||||||
|
// ResolveOpFunc finds an Op implementation for a vertex
|
||||||
|
type ResolveOpFunc func(solver.Vertex) (solver.Op, error)
|
||||||
|
|
||||||
|
type Solver struct {
|
||||||
|
resolve ResolveOpFunc
|
||||||
|
jobs *jobList
|
||||||
|
workerController *worker.Controller
|
||||||
|
determineVertexWorker VertexWorkerDeterminer
|
||||||
|
frontends map[string]frontend.Frontend
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(resolve ResolveOpFunc, wc *worker.Controller, vwd VertexWorkerDeterminer, f map[string]frontend.Frontend) *Solver {
|
||||||
|
return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, determineVertexWorker: vwd, frontends: f}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Solver) solve(ctx context.Context, j *job, req solver.SolveRequest) (solver.Ref, map[string][]byte, error) {
|
||||||
|
if req.Definition == nil {
|
||||||
|
if req.Frontend == nil {
|
||||||
|
return nil, nil, errors.Errorf("invalid request: no definition nor frontend")
|
||||||
|
}
|
||||||
|
return req.Frontend.Solve(ctx, s.llbBridge(j), req.FrontendOpt)
|
||||||
|
}
|
||||||
|
|
||||||
|
inp, err := j.load(req.Definition, s.resolve)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
ref, err := j.getRef(ctx, inp.Vertex.(*vertex).clientVertex, inp.Index)
|
||||||
|
return ref, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Solver) llbBridge(j *job) *llbBridge {
|
||||||
|
// FIXME(AkihiroSuda): make sure worker implements interfaces required by llbBridge
|
||||||
|
worker, err := s.workerController.GetDefault()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return &llbBridge{job: j, Solver: s, worker: worker}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Solver) Solve(ctx context.Context, id string, req solver.SolveRequest) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	pr, ctx, closeProgressWriter := progress.NewContext(ctx)
	defer closeProgressWriter()

	// TODO: multiworker
	defaultWorker, err := s.workerController.GetDefault()
	if err != nil {
		return err
	}
	if importRef := req.ImportCacheRef; importRef != "" {
		cache, err := defaultWorker.CacheImporter.Import(ctx, importRef)
		if err != nil {
			return err
		}
		defaultWorker.InstructionCache = instructioncache.Union(defaultWorker.InstructionCache, cache)
	}

	// register a build job. vertex needs to be loaded to a job to run
	ctx, j, err := s.jobs.new(ctx, id, pr, defaultWorker.InstructionCache)
	if err != nil {
		return err
	}

	ref, exporterOpt, err := s.solve(ctx, j, req)
	defer j.discard()
	if err != nil {
		return err
	}

	defer func() {
		if ref != nil {
			go ref.Release(context.TODO())
		}
	}()

	var immutable cache.ImmutableRef
	if ref != nil {
		var ok bool
		immutable, ok = reference.ToImmutableRef(ref)
		if !ok {
			return errors.Errorf("invalid reference for exporting: %T", ref)
		}
		if err := immutable.Finalize(ctx); err != nil {
			return err
		}
	}

	if exp := req.Exporter; exp != nil {
		if err := inVertexContext(ctx, exp.Name(), func(ctx context.Context) error {
			return exp.Export(ctx, immutable, exporterOpt)
		}); err != nil {
			return err
		}
	}

	if exportName := req.ExportCacheRef; exportName != "" {
		if err := inVertexContext(ctx, "exporting build cache", func(ctx context.Context) error {
			cache, err := j.cacheExporter(ref)
			if err != nil {
				return err
			}

			records, err := cache.Export(ctx)
			if err != nil {
				return err
			}

			// TODO: multiworker
			return defaultWorker.CacheExporter.Export(ctx, records, exportName)
		}); err != nil {
			return err
		}
	}

	return err
}

func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error {
	j, err := s.jobs.get(id)
	if err != nil {
		return err
	}
	defer close(statusChan)
	return j.pipe(ctx, statusChan)
}

func (s *Solver) SubBuild(ctx context.Context, dgst digest.Digest, req solver.SolveRequest) (solver.Ref, error) {
	jl := s.jobs
	jl.mu.Lock()
	st, ok := jl.actives[dgst]
	if !ok {
		jl.mu.Unlock()
		return nil, errors.Errorf("no such parent vertex: %v", dgst)
	}

	var inp *solver.Input
	for j := range st.jobs {
		var err error
		inp, err = j.loadInternal(req.Definition, s.resolve)
		if err != nil {
			jl.mu.Unlock()
			return nil, err
		}
	}
	st = jl.actives[inp.Vertex.Digest()]
	jl.mu.Unlock()

	w, err := s.determineVertexWorker(s.workerController, inp.Vertex)
	if err != nil {
		return nil, err
	}
	return getRef(ctx, st.solver, inp.Vertex.(*vertex).clientVertex, inp.Index, w.InstructionCache) // TODO: combine to pass single input // TODO: export cache for subbuilds
}

type VertexSolver interface {
	CacheKey(ctx context.Context, index solver.Index) (digest.Digest, error)
	OutputEvaluator(solver.Index) (VertexEvaluator, error)
	Release() error
	Cache(solver.Index, solver.Ref) CacheExporter
}

type vertexInput struct {
	solver    VertexSolver
	ev        VertexEvaluator
	cacheKeys []digest.Digest
	ref       solver.Ref
}

type vertexSolver struct {
	inputs []*vertexInput
	v      *vertex
	cv     client.Vertex
	op     solver.Op
	cache  instructioncache.InstructionCache
	refs   []*reference.SharedRef
	f      *bgfunc.F
	ctx    context.Context

	baseKey        digest.Digest
	mu             sync.Mutex
	results        []digest.Digest
	markCachedOnce sync.Once
	contentKey     digest.Digest

	signal *signal // used to notify that there are callers who need more data
}

type resolveF func(digest.Digest) (VertexSolver, error)

func newVertexSolver(ctx context.Context, v *vertex, op solver.Op, c instructioncache.InstructionCache, resolve resolveF) (*vertexSolver, error) {
	inputs := make([]*vertexInput, len(v.inputs))
	for i, in := range v.inputs {
		s, err := resolve(in.vertex.digest)
		if err != nil {
			return nil, err
		}
		ev, err := s.OutputEvaluator(in.index)
		if err != nil {
			return nil, err
		}
		ev.Cancel()
		inputs[i] = &vertexInput{
			solver: s,
			ev:     ev,
		}
	}
	return &vertexSolver{
		ctx:    ctx,
		inputs: inputs,
		v:      v,
		cv:     v.clientVertex,
		op:     op,
		cache:  c,
		signal: newSignaller(),
	}, nil
}

func markCached(ctx context.Context, cv client.Vertex) {
	pw, _, _ := progress.FromContext(ctx)
	defer pw.Close()

	if cv.Started == nil {
		now := time.Now()
		cv.Started = &now
		cv.Completed = &now
		cv.Cached = true
	}
	pw.Write(cv.Digest.String(), cv)
}

type CacheExporter interface {
	Export(context.Context) ([]cacheimport.CacheRecord, error)
}

func (vs *vertexSolver) Cache(index solver.Index, ref solver.Ref) CacheExporter {
	return &cacheExporter{vertexSolver: vs, index: index, ref: ref}
}

type cacheExporter struct {
	*vertexSolver
	index solver.Index
	ref   solver.Ref
}

func (ce *cacheExporter) Export(ctx context.Context) ([]cacheimport.CacheRecord, error) {
	return ce.vertexSolver.Export(ctx, ce.index, ce.ref)
}

func (vs *vertexSolver) Export(ctx context.Context, index solver.Index, ref solver.Ref) ([]cacheimport.CacheRecord, error) {
	mp := map[digest.Digest]cacheimport.CacheRecord{}
	if err := vs.appendInputCache(ctx, mp); err != nil {
		return nil, err
	}
	dgst, err := vs.mainCacheKey()
	if err != nil {
		return nil, err
	}
	immutable, ok := reference.ToImmutableRef(ref)
	if !ok {
		return nil, errors.Errorf("invalid reference")
	}
	dgst = cacheKeyForIndex(dgst, index)
	mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: immutable}
	out := make([]cacheimport.CacheRecord, 0, len(mp))
	for _, cr := range mp {
		out = append(out, cr)
	}
	return out, nil
}

func (vs *vertexSolver) appendInputCache(ctx context.Context, mp map[digest.Digest]cacheimport.CacheRecord) error {
	for i, inp := range vs.inputs {
		mainDgst, err := inp.solver.(*vertexSolver).mainCacheKey()
		if err != nil {
			return err
		}
		dgst := cacheKeyForIndex(mainDgst, vs.v.inputs[i].index)
		if cr, ok := mp[dgst]; !ok || (cr.Reference == nil && inp.ref != nil) {
			if err := inp.solver.(*vertexSolver).appendInputCache(ctx, mp); err != nil {
				return err
			}
			if inp.ref != nil && len(inp.solver.(*vertexSolver).inputs) > 0 { // Ignore pushing the refs for sources
				ref, ok := reference.ToImmutableRef(inp.ref)
				if !ok {
					return errors.Errorf("invalid reference")
				}
				mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst, Reference: ref}
			} else {
				mp[dgst] = cacheimport.CacheRecord{CacheKey: dgst}
			}
		}
	}
	if ck := vs.contentKey; ck != "" {
		mainDgst, err := vs.mainCacheKey()
		if err != nil {
			return err
		}
		mp[ck] = cacheimport.CacheRecord{CacheKey: mainDgst, ContentKey: ck}
	}
	return nil
}

func (vs *vertexSolver) CacheKey(ctx context.Context, index solver.Index) (digest.Digest, error) {
	vs.mu.Lock()
	defer vs.mu.Unlock()
	if vs.baseKey == "" {
		eg, ctx := errgroup.WithContext(vs.ctx)
		for i := range vs.inputs {
			func(i int) {
				eg.Go(func() error {
					k, err := vs.inputs[i].solver.CacheKey(ctx, vs.v.inputs[i].index)
					if err != nil {
						return err
					}
					vs.inputs[i].cacheKeys = append(vs.inputs[i].cacheKeys, k)
					return nil
				})
			}(i)
		}
		var dgst digest.Digest
		eg.Go(func() error {
			var err error
			dgst, err = vs.op.CacheKey(ctx)
			if err != nil {
				return err
			}
			return nil
		})

		if err := eg.Wait(); err != nil {
			return "", err
		}

		vs.baseKey = dgst
	}

	k, err := vs.lastCacheKey()
	if err != nil {
		return "", err
	}

	return cacheKeyForIndex(k, index), nil
}

func (vs *vertexSolver) lastCacheKey() (digest.Digest, error) {
	return vs.currentCacheKey(true)
}

func (vs *vertexSolver) mainCacheKey() (digest.Digest, error) {
	return vs.currentCacheKey(false)
}

func (vs *vertexSolver) currentCacheKey(last bool) (digest.Digest, error) {
	inputKeys := make([]digest.Digest, len(vs.inputs))
	for i, inp := range vs.inputs {
		if len(inp.cacheKeys) == 0 {
			return "", errors.Errorf("inputs not processed")
		}
		if last {
			inputKeys[i] = inp.cacheKeys[len(inp.cacheKeys)-1]
		} else {
			inputKeys[i] = inp.cacheKeys[0]
		}
	}
	dt, err := json.Marshal(struct {
		Inputs   []digest.Digest
		CacheKey digest.Digest
	}{Inputs: inputKeys, CacheKey: vs.baseKey})
	if err != nil {
		return "", err
	}
	return digest.FromBytes(dt), nil
}

func (vs *vertexSolver) OutputEvaluator(index solver.Index) (VertexEvaluator, error) {
	if vs.f == nil {
		f, err := bgfunc.New(vs.ctx, vs.run)
		if err != nil {
			return nil, err
		}
		vs.f = f
	}
	c := vs.f.NewCaller()
	ve := &vertexEvaluator{vertexSolver: vs, c: c, index: index}
	return ve, nil
}

func (vs *vertexSolver) Release() error {
	for _, inp := range vs.inputs {
		if inp.ref != nil {
			inp.ref.Release(context.TODO())
		}
	}
	if vs.refs != nil {
		for _, r := range vs.refs {
			r.Release(context.TODO())
		}
	}
	return nil
}

// run is called by the bgfunc concurrency primitive. This function may be
// called multiple times but never in parallel. Repeated calls should make an
// effort to continue from previous state. Lock vs.mu to synchronize data to the
// callers. Signal parameter can be used to notify callers that new data is
// available without returning from the function.
func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
	vs.mu.Lock()
	if vs.refs != nil {
		vs.mu.Unlock()
		return nil
	}
	vs.mu.Unlock()

	waitFirst := vs.signal.Wait()
	waitRun := waitFirst

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-waitFirst:
	}

	// this is where you lookup the cache keys that were successfully probed

	eg, ctx2 := errgroup.WithContext(ctx)

	// process all the inputs
	for i, inp := range vs.inputs {
		if inp.ref == nil {
			func(i int) {
				eg.Go(func() error {
					inp := vs.inputs[i]
					defer inp.ev.Cancel()

					waitNext := waitFirst
					for {
						select {
						case <-ctx2.Done():
							return ctx2.Err()
						case <-waitNext:
						}

						// check if current cache key is in cache
						if len(inp.cacheKeys) > 0 {
							ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1], inp.solver.(*vertexSolver).v.Name())
							if err != nil {
								return err
							}
							if ref != nil {
								inp.ref = ref.(solver.Ref)
								inp.solver.(*vertexSolver).markCachedOnce.Do(func() {
									markCached(ctx, inp.solver.(*vertexSolver).cv)
								})
								return nil
							}
						}

						// evaluate next cachekey/reference for input
						res, err := inp.ev.Next(ctx2)
						if err != nil {
							return err
						}
						if res == nil { // there is no more data coming
							return nil
						}
						if ref := res.Reference; ref != nil {
							if ref, ok := reference.ToImmutableRef(ref); ok {
								if !cache.HasCachePolicyRetain(ref) {
									if err := cache.CachePolicyRetain(ref); err != nil {
										return err
									}
									ref.Metadata().Commit()
								}
								inp.ref = ref
							}
							return nil
						}

						// Only try matching cache if the cachekey for input is present
						exists, err := vs.cache.Probe(ctx2, res.CacheKey)
						if err != nil {
							return err
						}
						if exists {
							vs.mu.Lock()
							inp.cacheKeys = append(inp.cacheKeys, res.CacheKey)
							dgst, err := vs.lastCacheKey()
							if err != nil {
								vs.mu.Unlock()
								return err
							}
							vs.results = append(vs.results, dgst)
							signal()                     // wake up callers
							waitNext = vs.signal.Reset() // make sure we don't continue unless there are callers
							waitRun = waitNext
							vs.mu.Unlock()
						}
					}
				})
			}(i)
		}
	}

	if err := eg.Wait(); err != nil {
		return err
	}

	// Find extra cache keys by content
	inputRefs := make([]solver.Ref, len(vs.inputs))
	lastInputKeys := make([]digest.Digest, len(vs.inputs))
	for i := range vs.inputs {
		inputRefs[i] = vs.inputs[i].ref
		lastInputKeys[i] = vs.inputs[i].cacheKeys[len(vs.inputs[i].cacheKeys)-1]
	}

	dgst, inp, err := vs.op.ContentMask(ctx)
	if err != nil {
		return err
	}

	var contentKey digest.Digest
	if dgst != "" {
		contentKey, err = calculateContentHash(ctx, inputRefs, dgst, lastInputKeys, inp)
		if err != nil {
			return err
		}
		vs.contentKey = contentKey

		var extraKeys []digest.Digest
		cks, err := vs.cache.GetContentMapping(contentKey)
		if err != nil {
			return err
		}
		extraKeys = append(extraKeys, cks...)
		if len(extraKeys) > 0 {
			vs.mu.Lock()
			vs.results = append(vs.results, extraKeys...)
			signal()
			waitRun = vs.signal.Reset()
			vs.mu.Unlock()
		}
	}

	select {
	case <-ctx.Done():
		return
	case <-waitRun:
	}

	// no cache hit. start evaluating the node
	notifyStarted(ctx, &vs.cv)
	defer func() {
		notifyCompleted(ctx, &vs.cv, retErr)
	}()

	refs, err := vs.op.Run(ctx, inputRefs)
	if err != nil {
		return err
	}
	sr := make([]*reference.SharedRef, len(refs))
	for i, r := range refs {
		sr[i] = reference.NewSharedRef(r)
	}
	vs.mu.Lock()
	vs.refs = sr
	vs.mu.Unlock()

	// store the cacheKeys for current refs
	if vs.cache != nil {
		cacheKey, err := vs.lastCacheKey()
		if err != nil {
			return err
		}
		for i, ref := range refs {
			if err != nil {
				return err
			}
			r := reference.OriginRef(ref)
			if err := vs.cache.Set(cacheKeyForIndex(cacheKey, solver.Index(i)), r); err != nil {
				logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
			}
		}
		if contentKey != "" {
			if err := vs.cache.SetContentMapping(contentKey, cacheKey); err != nil {
				logrus.Errorf("failed to save content mapping: %v", err)
			}
		}
	}
	return nil
}

func getInputContentHash(ctx context.Context, ref cache.ImmutableRef, selectors []string) (digest.Digest, error) {
	out := make([]digest.Digest, 0, len(selectors))
	for _, s := range selectors {
		dgst, err := contenthash.Checksum(ctx, ref, s)
		if err != nil {
			return "", err
		}
		out = append(out, dgst)
	}
	if len(out) == 1 {
		return out[0], nil
	}
	dt, err := json.Marshal(out)
	if err != nil {
		return "", err
	}
	return digest.FromBytes(dt), nil
}

func calculateContentHash(ctx context.Context, refs []solver.Ref, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) {
	dgsts := make([]digest.Digest, len(contentMap))
	eg, ctx := errgroup.WithContext(ctx)
	for i, sel := range contentMap {
		if sel == nil {
			dgsts[i] = inputs[i]
			continue
		}
		func(i int) {
			eg.Go(func() error {
				ref, ok := reference.ToImmutableRef(refs[i])
				if !ok {
					return errors.Errorf("invalid reference for exporting: %T", ref)
				}
				dgst, err := getInputContentHash(ctx, ref, contentMap[i])
				if err != nil {
					return err
				}
				dgsts[i] = dgst
				return nil
			})
		}(i)
	}
	if err := eg.Wait(); err != nil {
		return "", err
	}
	dt, err := json.Marshal(struct {
		Main   digest.Digest
		Inputs []digest.Digest
	}{
		Main:   mainDigest,
		Inputs: dgsts,
	})
	if err != nil {
		return "", err
	}
	return digest.FromBytes(dt), nil
}

type VertexEvaluator interface {
	Next(context.Context) (*VertexResult, error)
	Cancel() error
}

type vertexEvaluator struct {
	*vertexSolver
	c      *bgfunc.Caller
	cursor int
	index  solver.Index
}

func (ve *vertexEvaluator) Next(ctx context.Context) (*VertexResult, error) {
	v, err := ve.c.Call(ctx, func() (interface{}, error) {
		ve.mu.Lock()
		defer ve.mu.Unlock()
		if ve.refs != nil {
			return &VertexResult{Reference: ve.refs[int(ve.index)].Clone()}, nil
		}
		if i := ve.cursor; i < len(ve.results) {
			ve.cursor++
			return &VertexResult{CacheKey: cacheKeyForIndex(ve.results[i], ve.index)}, nil
		}
		ve.signal.Signal()
		return nil, nil
	})
	if err != nil {
		return nil, err
	}
	if v == nil {
		return nil, nil // no more records are coming
	}
	return v.(*VertexResult), nil
}

func (ve *vertexEvaluator) Cancel() error {
	return ve.c.Cancel()
}

type VertexResult struct {
	CacheKey  digest.Digest
	Reference solver.Ref
}

func cacheKeyForIndex(dgst digest.Digest, index solver.Index) digest.Digest {
	return digest.FromBytes([]byte(fmt.Sprintf("%s.%d", dgst, index)))
}

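The file above defines the VertexSolver/VertexEvaluator contract, but its callers (getRef and the job plumbing) live elsewhere in the package. The sketch below is illustrative only and not part of this commit: it shows one way a caller could drive a single output index, probing the instruction cache for every candidate cache key the evaluator yields, the same way run does for its own inputs. The helper name resolveOutput and the vertex-name argument passed to Lookup are assumptions made for the example.

package solver

import (
	"github.com/moby/buildkit/cache/instructioncache"
	solver "github.com/moby/buildkit/solver"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)

// resolveOutput is a hypothetical helper (for illustration only): it drives one
// output index of a VertexSolver, checking the instruction cache for every
// candidate cache key before letting the evaluator continue.
func resolveOutput(ctx context.Context, vs VertexSolver, idx solver.Index, ic instructioncache.InstructionCache, vtxName string) (solver.Ref, error) {
	ev, err := vs.OutputEvaluator(idx)
	if err != nil {
		return nil, err
	}
	defer ev.Cancel()
	for {
		res, err := ev.Next(ctx)
		if err != nil {
			return nil, err
		}
		if res == nil {
			// the stream ended without producing a reference
			return nil, errors.Errorf("no output produced for index %v", idx)
		}
		if res.Reference != nil {
			// the vertex was executed; this is the final result for idx
			return res.Reference, nil
		}
		// res.CacheKey is a candidate key: probe the instruction cache and
		// return early on a hit, otherwise ask the evaluator for more.
		cached, err := ic.Lookup(ctx, res.CacheKey, vtxName)
		if err != nil {
			return nil, err
		}
		if cached != nil {
			return cached.(solver.Ref), nil
		}
	}
}
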
@@ -0,0 +1,104 @@
package solver

import (
	"sync"
	"time"

	"github.com/moby/buildkit/client"
	"github.com/moby/buildkit/identity"
	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/solver/pb"
	"github.com/moby/buildkit/util/progress"
	digest "github.com/opencontainers/go-digest"
	"golang.org/x/net/context"
)

type input struct {
	index  solver.Index
	vertex *vertex
}

type vertex struct {
	mu           sync.Mutex
	sys          interface{}
	metadata     *pb.OpMetadata
	inputs       []*input
	err          error
	digest       digest.Digest
	clientVertex client.Vertex
	name         string
	notifyMu     sync.Mutex
}

func (v *vertex) initClientVertex() {
	inputDigests := make([]digest.Digest, 0, len(v.inputs))
	for _, inp := range v.inputs {
		inputDigests = append(inputDigests, inp.vertex.Digest())
	}
	v.clientVertex = client.Vertex{
		Inputs: inputDigests,
		Name:   v.Name(),
		Digest: v.digest,
	}
}

func (v *vertex) Digest() digest.Digest {
	return v.digest
}

func (v *vertex) Sys() interface{} {
	return v.sys
}

func (v *vertex) Metadata() *pb.OpMetadata {
	return v.metadata
}

func (v *vertex) Inputs() (inputs []solver.Input) {
	inputs = make([]solver.Input, 0, len(v.inputs))
	for _, i := range v.inputs {
		inputs = append(inputs, solver.Input{i.index, i.vertex})
	}
	return
}

func (v *vertex) Name() string {
	return v.name
}

func notifyStarted(ctx context.Context, v *client.Vertex) {
	pw, _, _ := progress.FromContext(ctx)
	defer pw.Close()
	now := time.Now()
	v.Started = &now
	v.Completed = nil
	pw.Write(v.Digest.String(), *v)
}

func notifyCompleted(ctx context.Context, v *client.Vertex, err error) {
	pw, _, _ := progress.FromContext(ctx)
	defer pw.Close()
	now := time.Now()
	if v.Started == nil {
		v.Started = &now
	}
	v.Completed = &now
	v.Cached = false
	if err != nil {
		v.Error = err.Error()
	}
	pw.Write(v.Digest.String(), *v)
}

func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error {
	v := client.Vertex{
		Digest: digest.FromBytes([]byte(identity.NewID())),
		Name:   name,
	}
	pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
	notifyStarted(ctx, &v)
	defer pw.Close()
	err := f(ctx)
	notifyCompleted(ctx, &v, err)
	return err
}

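This second new file carries the vertex bookkeeping and the progress helpers. As a quick illustration (not part of the commit) of how inVertexContext is meant to be used, the sketch below wraps a hypothetical step so it shows up as its own vertex in the progress stream, mirroring how Solve wraps the exporter and the cache export above; the step name is arbitrary.

package solver

import (
	"golang.org/x/net/context"
)

// exampleStep is a hypothetical wrapper for illustration: inVertexContext
// reports the step as its own vertex, emitting start/completion (and any
// error) through notifyStarted/notifyCompleted.
func exampleStep(ctx context.Context) error {
	return inVertexContext(ctx, "prune temporary state", func(ctx context.Context) error {
		// real work would go here; a returned error is recorded on the vertex
		return nil
	})
}
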
@@ -1,15 +1,8 @@
 package solver
 
 import (
-	"sync"
-	"time"
-
-	"github.com/moby/buildkit/client"
-	"github.com/moby/buildkit/identity"
 	"github.com/moby/buildkit/solver/pb"
-	"github.com/moby/buildkit/util/progress"
 	digest "github.com/opencontainers/go-digest"
-	"golang.org/x/net/context"
 )
 
 // Vertex is one node in the build graph
@@ -33,93 +26,3 @@ type Input struct {
 	Index  Index
 	Vertex Vertex
 }
-
-type input struct {
-	index  Index
-	vertex *vertex
-}
-
-type vertex struct {
-	mu           sync.Mutex
-	sys          interface{}
-	metadata     *pb.OpMetadata
-	inputs       []*input
-	err          error
-	digest       digest.Digest
-	clientVertex client.Vertex
-	name         string
-	notifyMu     sync.Mutex
-}
-
-func (v *vertex) initClientVertex() {
-	inputDigests := make([]digest.Digest, 0, len(v.inputs))
-	for _, inp := range v.inputs {
-		inputDigests = append(inputDigests, inp.vertex.Digest())
-	}
-	v.clientVertex = client.Vertex{
-		Inputs: inputDigests,
-		Name:   v.Name(),
-		Digest: v.digest,
-	}
-}
-
-func (v *vertex) Digest() digest.Digest {
-	return v.digest
-}
-
-func (v *vertex) Sys() interface{} {
-	return v.sys
-}
-
-func (v *vertex) Metadata() *pb.OpMetadata {
-	return v.metadata
-}
-
-func (v *vertex) Inputs() (inputs []Input) {
-	inputs = make([]Input, 0, len(v.inputs))
-	for _, i := range v.inputs {
-		inputs = append(inputs, Input{i.index, i.vertex})
-	}
-	return
-}
-
-func (v *vertex) Name() string {
-	return v.name
-}
-
-func notifyStarted(ctx context.Context, v *client.Vertex) {
-	pw, _, _ := progress.FromContext(ctx)
-	defer pw.Close()
-	now := time.Now()
-	v.Started = &now
-	v.Completed = nil
-	pw.Write(v.Digest.String(), *v)
-}
-
-func notifyCompleted(ctx context.Context, v *client.Vertex, err error) {
-	pw, _, _ := progress.FromContext(ctx)
-	defer pw.Close()
-	now := time.Now()
-	if v.Started == nil {
-		v.Started = &now
-	}
-	v.Completed = &now
-	v.Cached = false
-	if err != nil {
-		v.Error = err.Error()
-	}
-	pw.Write(v.Digest.String(), *v)
-}
-
-func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error {
-	v := client.Vertex{
-		Digest: digest.FromBytes([]byte(identity.NewID())),
-		Name:   name,
-	}
-	pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
-	notifyStarted(ctx, &v)
-	defer pw.Close()
-	err := f(ctx)
-	notifyCompleted(ctx, &v, err)
-	return err
-}