Merge pull request #130 from tonistiigi/progress-fixes

solver: fix shared request progress and cancellation
docker-18.09
Akihiro Suda 2017-09-30 16:24:03 +09:00 committed by GitHub
commit 2d9bd79d61
14 changed files with 281 additions and 266 deletions


@ -68,7 +68,6 @@ message Vertex {
google.protobuf.Timestamp started = 5 [(gogoproto.stdtime) = true ];
google.protobuf.Timestamp completed = 6 [(gogoproto.stdtime) = true ];
string error = 7; // typed errors?
string parent = 8 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
}
message VertexStatus {

cache/manager.go vendored (2 changes)

@ -104,7 +104,7 @@ func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (
defer rec.mu.Unlock()
if rec.mutable {
if len(rec.refs) != 0 {
if rec.dead || len(rec.refs) != 0 {
return nil, errors.Wrapf(errLocked, "%s is locked", id)
}
if rec.equalImmutable != nil {

cache/refs.go vendored (17 changes)

@ -8,6 +8,7 @@ import (
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
@ -44,6 +45,7 @@ type cacheRecord struct {
md *metadata.StorageItem
view string
viewMount []mount.Mount
dead bool
sizeG flightcontrol.Group
// size int64
@ -215,13 +217,18 @@ func (sr *cacheRecord) finalize(ctx context.Context) error {
if mutable == nil {
return nil
}
err := sr.cm.Snapshotter.Commit(ctx, sr.ID(), sr.equalMutable.ID())
err := sr.cm.Snapshotter.Commit(ctx, sr.ID(), mutable.ID())
if err != nil {
return errors.Wrapf(err, "failed to commit %s", sr.equalMutable.ID())
}
if err := sr.equalMutable.remove(ctx, false); err != nil {
return err
return errors.Wrapf(err, "failed to commit %s", mutable.ID())
}
mutable.dead = true
go func() {
sr.cm.mu.Lock()
defer sr.cm.mu.Unlock()
if err := mutable.remove(context.TODO(), false); err != nil {
logrus.Error(err)
}
}()
sr.equalMutable = nil
clearEqualMutable(sr.md)
return sr.md.Commit()

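The finalize hunk above works together with the new dead check in cache/manager.go: the mutable record is flagged dead immediately, and its actual removal is pushed to a goroutine that takes the cache manager lock, so a concurrent get can no longer hand the record out while cleanup is still pending. A standalone sketch of that pattern, with illustrative manager/record types rather than the real buildkit structs:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// record mimics a cache record that can be marked dead before its
// asynchronous cleanup has finished.
type record struct {
	mu   sync.Mutex
	dead bool
	refs int
}

type manager struct {
	mu      sync.Mutex
	records map[string]*record
}

var errLocked = errors.New("locked")

// get refuses records that are referenced or already marked dead,
// mirroring the `rec.dead || len(rec.refs) != 0` guard in the diff.
func (m *manager) get(id string) (*record, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	rec, ok := m.records[id]
	if !ok {
		return nil, fmt.Errorf("%s: not found", id)
	}
	rec.mu.Lock()
	defer rec.mu.Unlock()
	if rec.dead || rec.refs != 0 {
		return nil, fmt.Errorf("%s is locked: %w", id, errLocked)
	}
	rec.refs++
	return rec, nil
}

// finalize marks the record dead right away and defers the removal to a
// goroutine that takes the manager lock, so the caller never blocks on
// manager-wide cleanup.
func (m *manager) finalize(id string) {
	m.mu.Lock()
	rec := m.records[id]
	m.mu.Unlock()

	rec.mu.Lock()
	rec.dead = true
	rec.mu.Unlock()

	go func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		delete(m.records, id)
	}()
}

func main() {
	m := &manager{records: map[string]*record{"a": {}}}
	m.finalize("a")
	if _, err := m.get("a"); err != nil {
		fmt.Println("get after finalize:", err) // rejected whether or not the async cleanup has run yet
	}
}
```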

@ -14,7 +14,6 @@ type Vertex struct {
Completed *time.Time
Cached bool
Error string
Parent digest.Digest
}
type VertexStatus struct {
@ -40,13 +39,3 @@ type SolveStatus struct {
Statuses []*VertexStatus
Logs []*VertexLog
}
//
// type VertexEvent struct {
// ID digest.Digest
// Vertex digest.Digest
// Name string
// Total int
// Current int
// Timestamp int64
// }


@ -132,7 +132,6 @@ func (c *Client) Solve(ctx context.Context, r io.Reader, opt SolveOpt, statusCha
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
Parent: v.Parent,
})
}
for _, v := range resp.Statuses {


@ -90,15 +90,6 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
}
}
var vertex solver.Vertex
if req.Frontend == "" {
v, err := solver.LoadLLB(req.Definition)
if err != nil {
return nil, errors.Wrap(err, "failed to load llb definition")
}
vertex = v
}
ctx = session.NewContext(ctx, req.Session)
var expi exporter.ExporterInstance
@ -114,7 +105,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
}
}
if err := c.solver.Solve(ctx, req.Ref, frontend, vertex, expi, req.FrontendAttrs); err != nil {
if err := c.solver.Solve(ctx, req.Ref, frontend, req.Definition, expi, req.FrontendAttrs); err != nil {
return nil, err
}
return &controlapi.SolveResponse{}, nil
@ -130,49 +121,44 @@ func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Con
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case ss, ok := <-ch:
if !ok {
return nil
}
sr := controlapi.StatusResponse{}
for _, v := range ss.Vertexes {
sr.Vertexes = append(sr.Vertexes, &controlapi.Vertex{
Digest: v.Digest,
Inputs: v.Inputs,
Name: v.Name,
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
Parent: v.Parent,
})
}
for _, v := range ss.Statuses {
sr.Statuses = append(sr.Statuses, &controlapi.VertexStatus{
ID: v.ID,
Vertex: v.Vertex,
Name: v.Name,
Current: v.Current,
Total: v.Total,
Timestamp: v.Timestamp,
Started: v.Started,
Completed: v.Completed,
})
}
for _, v := range ss.Logs {
sr.Logs = append(sr.Logs, &controlapi.VertexLog{
Vertex: v.Vertex,
Stream: int64(v.Stream),
Msg: v.Data,
Timestamp: v.Timestamp,
})
}
if err := stream.SendMsg(&sr); err != nil {
return err
}
ss, ok := <-ch
if !ok {
return nil
}
sr := controlapi.StatusResponse{}
for _, v := range ss.Vertexes {
sr.Vertexes = append(sr.Vertexes, &controlapi.Vertex{
Digest: v.Digest,
Inputs: v.Inputs,
Name: v.Name,
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
})
}
for _, v := range ss.Statuses {
sr.Statuses = append(sr.Statuses, &controlapi.VertexStatus{
ID: v.ID,
Vertex: v.Vertex,
Name: v.Name,
Current: v.Current,
Total: v.Total,
Timestamp: v.Timestamp,
Started: v.Started,
Completed: v.Completed,
})
}
for _, v := range ss.Logs {
sr.Logs = append(sr.Logs, &controlapi.VertexLog{
Vertex: v.Vertex,
Stream: int64(v.Stream),
Msg: v.Data,
Timestamp: v.Timestamp,
})
}
if err := stream.SendMsg(&sr); err != nil {
return err
}
}
})

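The Status loop above now blocks directly on the channel and returns when it is closed, instead of also selecting on ctx.Done(): the producer owns cancellation and signals it by closing the channel. A minimal sketch of that forwarding shape, with an assumed statusSender interface standing in for the real gRPC stream:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// statusSender stands in for the gRPC stream; only SendMsg is needed.
// (Illustrative interface, not the real controlapi stream.)
type statusSender interface {
	SendMsg(m interface{}) error
}

type printSender struct{}

func (printSender) SendMsg(m interface{}) error {
	fmt.Println("send:", m)
	return nil
}

// forward drains ch until it is closed. Cancellation is not handled
// here: whoever owns ch closes it when the request ends, which is the
// shape the Status handler switched to in this change.
func forward(stream statusSender, ch <-chan string) error {
	var eg errgroup.Group
	eg.Go(func() error {
		for {
			msg, ok := <-ch
			if !ok {
				return nil // producer closed the channel; we are done
			}
			if err := stream.SendMsg(msg); err != nil {
				return err
			}
		}
	})
	return eg.Wait()
}

func main() {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for i := 0; i < 3; i++ {
			ch <- fmt.Sprintf("status %d", i)
			time.Sleep(10 * time.Millisecond)
		}
	}()
	if err := forward(printSender{}, ch); err != nil {
		fmt.Println("error:", err)
	}
}
```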

@ -109,21 +109,7 @@ func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Refere
lm.Unmount()
lm = nil
v, err := LoadLLB(def)
if err != nil {
return nil, err
}
if len(v.Inputs()) == 0 {
return nil, errors.New("required vertex needs to have inputs")
}
index := v.Inputs()[0].Index
v = v.Inputs()[0].Vertex
vv := toInternalVertex(v)
newref, err := b.s.loadAndSolveChildVertex(ctx, b.v.Digest(), vv, index)
newref, err := b.s.loadAndSolve(ctx, b.v.Digest(), def)
if err != nil {
return nil, err
}


@ -7,6 +7,7 @@ import (
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
@ -26,7 +27,7 @@ type jobList struct {
}
type state struct {
jobs map[*job]struct{}
jobs map[*job]*vertex
solver VertexSolver
mpw *progress.MultiWriter
}
@ -90,7 +91,7 @@ func (jl *jobList) get(id string) (*job, error) {
}
}
func (jl *jobList) loadAndSolveChildVertex(ctx context.Context, dgst digest.Digest, vv *vertex, index Index, f ResolveOpFunc, cache InstructionCache) (Reference, error) {
func (jl *jobList) loadAndSolve(ctx context.Context, dgst digest.Digest, ops [][]byte, f ResolveOpFunc, cache InstructionCache) (Reference, error) {
jl.mu.Lock()
st, ok := jl.actives[dgst]
@ -99,18 +100,19 @@ func (jl *jobList) loadAndSolveChildVertex(ctx context.Context, dgst digest.Dige
return nil, errors.Errorf("no such parent vertex: %v", dgst)
}
var newst *state
var inp *Input
for j := range st.jobs {
var err error
newst, err = j.loadInternal(vv, f)
inp, err = j.loadInternal(ops, f)
if err != nil {
jl.mu.Unlock()
return nil, err
}
}
st = jl.actives[inp.Vertex.Digest()]
jl.mu.Unlock()
return getRef(newst.solver, ctx, vv, index, cache)
return getRef(st.solver, ctx, inp.Vertex.(*vertex), inp.Index, cache) // TODO: combine to pass single input
}
type job struct {
@ -121,49 +123,57 @@ type job struct {
cache InstructionCache
}
func (j *job) load(v *vertex, f ResolveOpFunc) error {
func (j *job) load(ops [][]byte, resolveOp ResolveOpFunc) (*Input, error) {
j.l.mu.Lock()
defer j.l.mu.Unlock()
_, err := j.loadInternal(v, f)
return err
return j.loadInternal(ops, resolveOp)
}
func (j *job) loadInternal(v *vertex, f ResolveOpFunc) (*state, error) {
for _, inp := range v.inputs {
if _, err := j.loadInternal(inp.vertex, f); err != nil {
return nil, err
func (j *job) loadInternal(ops [][]byte, resolveOp ResolveOpFunc) (*Input, error) {
vtx, idx, err := loadLLB(ops, func(dgst digest.Digest, op *pb.Op, load func(digest.Digest) (interface{}, error)) (interface{}, error) {
if st, ok := j.l.actives[dgst]; ok {
if vtx, ok := st.jobs[j]; ok {
return vtx, nil
}
}
}
dgst := v.Digest()
st, ok := j.l.actives[dgst]
if !ok {
st = &state{
jobs: map[*job]struct{}{},
mpw: progress.NewMultiWriter(progress.WithMetadata("vertex", dgst)),
}
op, err := f(v)
vtx, err := newVertex(dgst, op, load)
if err != nil {
return nil, err
}
ctx := progress.WithProgress(context.Background(), st.mpw)
ctx = session.NewContext(ctx, j.session) // TODO: support multiple
s, err := newVertexSolver(ctx, v, op, j.cache, j.getSolver)
if err != nil {
return nil, err
st, ok := j.l.actives[dgst]
if !ok {
st = &state{
jobs: map[*job]*vertex{},
mpw: progress.NewMultiWriter(progress.WithMetadata("vertex", dgst)),
}
op, err := resolveOp(vtx)
if err != nil {
return nil, err
}
ctx := progress.WithProgress(context.Background(), st.mpw)
ctx = session.NewContext(ctx, j.session) // TODO: support multiple
s, err := newVertexSolver(ctx, vtx, op, j.cache, j.getSolver)
if err != nil {
return nil, err
}
st.solver = s
j.l.actives[dgst] = st
}
st.solver = s
j.l.actives[dgst] = st
if _, ok := st.jobs[j]; !ok {
j.pw.Write(vtx.Digest().String(), vtx.clientVertex)
st.mpw.Add(j.pw)
st.jobs[j] = vtx
}
return vtx, nil
})
if err != nil {
return nil, err
}
if _, ok := st.jobs[j]; !ok {
j.pw.Write(v.Digest().String(), v.clientVertex)
st.mpw.Add(j.pw)
st.jobs[j] = struct{}{}
}
return st, nil
return &Input{Vertex: vtx.(*vertex), Index: idx}, nil
}
func (j *job) discard() {
@ -209,7 +219,7 @@ func getRef(s VertexSolver, ctx context.Context, v *vertex, index Index, cache I
return nil, err
}
if ref != nil {
v.notifyCompleted(ctx, true, nil)
markCached(ctx, v.clientVertex)
return ref.(Reference), nil
}
@ -230,7 +240,7 @@ func getRef(s VertexSolver, ctx context.Context, v *vertex, index Index, cache I
return nil, err
}
if ref != nil {
v.notifyCompleted(ctx, true, nil)
markCached(ctx, v.clientVertex)
return ref.(Reference), nil
}
continue
@ -240,7 +250,13 @@ func getRef(s VertexSolver, ctx context.Context, v *vertex, index Index, cache I
}
func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}}
pr := j.pr.Reader(ctx)
defer func() {
if enc := vs.encore(); len(enc) > 0 {
ch <- &client.SolveStatus{Vertexes: enc}
}
}()
for {
p, err := pr.Read(ctx)
if err != nil {
@ -253,7 +269,7 @@ func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
for _, p := range p {
switch v := p.Sys.(type) {
case client.Vertex:
ss.Vertexes = append(ss.Vertexes, &v)
ss.Vertexes = append(ss.Vertexes, vs.append(v)...)
case progress.Status:
vtx, ok := p.Meta("vertex")
@ -290,3 +306,39 @@ func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
}
}
}
type vertexStream struct {
cache map[digest.Digest]*client.Vertex
}
func (vs *vertexStream) append(v client.Vertex) []*client.Vertex {
var out []*client.Vertex
vs.cache[v.Digest] = &v
if v.Cached {
for _, inp := range v.Inputs {
if inpv, ok := vs.cache[inp]; ok {
if !inpv.Cached && inpv.Completed == nil {
inpv.Cached = true
inpv.Started = v.Completed
inpv.Completed = v.Completed
out = append(vs.append(*inpv), inpv)
}
}
}
}
vcopy := v
return append(out, &vcopy)
}
func (vs *vertexStream) encore() []*client.Vertex {
var out []*client.Vertex
for _, v := range vs.cache {
if v.Started != nil && v.Completed == nil {
now := time.Now()
v.Completed = &now
v.Error = context.Canceled.Error()
out = append(out, v)
}
}
return out
}

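vertexStream caches the last client.Vertex seen per digest: when a vertex arrives as cached, append back-fills any of its inputs that were reported but never completed, and encore closes out anything still in flight with a canceled error when the pipe shuts down. A standalone sketch of the back-fill behaviour, using a trimmed-down vtx struct in place of the real client.Vertex:

```go
package main

import (
	"fmt"
	"time"
)

// vtx mimics the handful of client.Vertex fields the stream cares about.
type vtx struct {
	Digest    string
	Inputs    []string
	Name      string
	Cached    bool
	Started   *time.Time
	Completed *time.Time
}

type vertexStream struct {
	cache map[string]*vtx
}

// append records the latest state of a vertex and, when it arrives as
// cached, back-fills any of its inputs that were seen but never
// completed, marking them cached with the same completion time.
func (vs *vertexStream) append(v vtx) []*vtx {
	var out []*vtx
	vs.cache[v.Digest] = &v
	if v.Cached {
		for _, inp := range v.Inputs {
			if inpv, ok := vs.cache[inp]; ok {
				if !inpv.Cached && inpv.Completed == nil {
					inpv.Cached = true
					inpv.Started = v.Completed
					inpv.Completed = v.Completed
					out = append(vs.append(*inpv), inpv)
				}
			}
		}
	}
	vcopy := v
	return append(out, &vcopy)
}

func main() {
	vs := &vertexStream{cache: map[string]*vtx{}}
	vs.append(vtx{Digest: "base", Name: "FROM alpine"}) // reported, never completed
	now := time.Now()
	// "base" is back-filled as cached even though it never reported completion.
	for _, v := range vs.append(vtx{
		Digest: "run", Name: "RUN make", Inputs: []string{"base"},
		Cached: true, Started: &now, Completed: &now,
	}) {
		fmt.Printf("%s cached=%v\n", v.Name, v.Cached)
	}
}
```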

@ -8,32 +8,17 @@ import (
"github.com/pkg/errors"
)
func LoadLLB(ops [][]byte) (Vertex, error) {
if len(ops) == 0 {
return nil, errors.New("invalid empty definition")
}
allOps := make(map[digest.Digest]*pb.Op)
var lastOp *pb.Op
var lastDigest digest.Digest
for _, dt := range ops {
var op pb.Op
if err := (&op).Unmarshal(dt); err != nil {
return nil, errors.Wrap(err, "failed to parse llb proto op")
func newVertex(dgst digest.Digest, op *pb.Op, load func(digest.Digest) (interface{}, error)) (*vertex, error) {
vtx := &vertex{sys: op.Op, digest: dgst, name: llbOpName(op)}
for _, in := range op.Inputs {
sub, err := load(in.Digest)
if err != nil {
return nil, err
}
lastOp = &op
lastDigest = digest.FromBytes(dt)
allOps[lastDigest] = &op
vtx.inputs = append(vtx.inputs, &input{index: Index(in.Index), vertex: sub.(*vertex)})
}
delete(allOps, lastDigest) // avoid loops
cache := make(map[digest.Digest]*vertex)
// TODO: validate the connections
return loadLLBVertexRecursive(lastDigest, lastOp, allOps, cache)
vtx.initClientVertex()
return vtx, nil
}
func toInternalVertex(v Vertex) *vertex {
@ -55,26 +40,45 @@ func loadInternalVertexHelper(v Vertex, cache map[digest.Digest]*vertex) *vertex
return vtx
}
func loadLLBVertexRecursive(dgst digest.Digest, op *pb.Op, all map[digest.Digest]*pb.Op, cache map[digest.Digest]*vertex) (*vertex, error) {
if v, ok := cache[dgst]; ok {
return v, nil
func loadLLB(ops [][]byte, fn func(digest.Digest, *pb.Op, func(digest.Digest) (interface{}, error)) (interface{}, error)) (interface{}, Index, error) {
if len(ops) == 0 {
return nil, 0, errors.New("invalid empty definition")
}
vtx := &vertex{sys: op.Op, digest: dgst, name: llbOpName(op)}
for _, in := range op.Inputs {
dgst := digest.Digest(in.Digest)
op, ok := all[dgst]
if !ok {
return nil, errors.Errorf("failed to find %s", in)
allOps := make(map[digest.Digest]*pb.Op)
var dgst digest.Digest
for _, dt := range ops {
var op pb.Op
if err := (&op).Unmarshal(dt); err != nil {
return nil, 0, errors.Wrap(err, "failed to parse llb proto op")
}
sub, err := loadLLBVertexRecursive(dgst, op, all, cache)
dgst = digest.FromBytes(dt)
allOps[dgst] = &op
}
lastOp := allOps[dgst]
delete(allOps, dgst)
dgst = lastOp.Inputs[0].Digest
cache := make(map[digest.Digest]interface{})
var rec func(dgst digest.Digest) (interface{}, error)
rec = func(dgst digest.Digest) (interface{}, error) {
if v, ok := cache[dgst]; ok {
return v, nil
}
v, err := fn(dgst, allOps[dgst], rec)
if err != nil {
return nil, err
}
vtx.inputs = append(vtx.inputs, &input{index: Index(in.Index), vertex: sub})
cache[dgst] = v
return v, nil
}
vtx.initClientVertex()
cache[dgst] = vtx
return vtx, nil
v, err := rec(dgst)
return v, Index(lastOp.Inputs[0].Index), err
}
func llbOpName(op *pb.Op) string {

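loadLLB now only unmarshals the ops and drives the traversal; constructing the actual vertex is delegated to the fn callback, which receives a memoized load function for resolving inputs (this is how jobs.go builds per-job vertices while reusing shared per-digest state). A hedged standalone sketch of that shape, with a toy op type instead of pb.Op:

```go
package main

import "fmt"

// op is a toy stand-in for pb.Op: a name plus the digests of its inputs.
type op struct {
	name   string
	inputs []string
}

// loadGraph walks ops from the given root, calling fn once per digest.
// fn gets a load function it can call to resolve inputs; results are
// memoized so shared inputs are only built once, mirroring the cache
// map inside loadLLB.
func loadGraph(all map[string]*op, root string,
	fn func(dgst string, o *op, load func(string) (interface{}, error)) (interface{}, error),
) (interface{}, error) {
	cache := make(map[string]interface{})
	var rec func(dgst string) (interface{}, error)
	rec = func(dgst string) (interface{}, error) {
		if v, ok := cache[dgst]; ok {
			return v, nil
		}
		v, err := fn(dgst, all[dgst], rec)
		if err != nil {
			return nil, err
		}
		cache[dgst] = v
		return v, nil
	}
	return rec(root)
}

func main() {
	ops := map[string]*op{
		"a": {name: "source"},
		"b": {name: "build", inputs: []string{"a"}},
		"c": {name: "test", inputs: []string{"a", "b"}},
	}
	built := 0
	v, err := loadGraph(ops, "c", func(dgst string, o *op, load func(string) (interface{}, error)) (interface{}, error) {
		built++ // count how many vertices are actually constructed
		for _, in := range o.inputs {
			if _, err := load(in); err != nil {
				return nil, err
			}
		}
		return o.name, nil
	})
	fmt.Println(v, err, "built:", built) // "a" is built once despite having two consumers
}
```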

@ -4,11 +4,13 @@ import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/bgfunc"
@ -80,7 +82,7 @@ func New(resolve ResolveOpFunc, cache InstructionCache, imageSource source.Sourc
return &Solver{resolve: resolve, jobs: newJobList(), cache: cache, imageSource: imageSource}
}
func (s *Solver) Solve(ctx context.Context, id string, f frontend.Frontend, v Vertex, exp exporter.ExporterInstance, frontendOpt map[string]string) error {
func (s *Solver) Solve(ctx context.Context, id string, f frontend.Frontend, dt [][]byte, exp exporter.ExporterInstance, frontendOpt map[string]string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -88,19 +90,6 @@ func (s *Solver) Solve(ctx context.Context, id string, f frontend.Frontend, v Ve
defer closeProgressWriter()
var vv *vertex
var index Index
if v != nil {
if len(v.Inputs()) == 0 {
return errors.New("required vertex needs to have inputs")
}
index = v.Inputs()[0].Index
v = v.Inputs()[0].Vertex
vv = toInternalVertex(v)
}
ctx, j, err := s.jobs.new(ctx, id, pr, s.cache)
if err != nil {
return err
@ -108,13 +97,14 @@ func (s *Solver) Solve(ctx context.Context, id string, f frontend.Frontend, v Ve
var ref Reference
var exporterOpt map[string]interface{}
// solver: s.getRef,
if vv != nil {
if err := j.load(vv, s.resolve); err != nil {
if dt != nil {
var inp *Input
inp, err = j.load(dt, s.resolve)
if err != nil {
j.discard()
return err
}
ref, err = j.getRef(ctx, vv, index)
ref, err = j.getRef(ctx, inp.Vertex.(*vertex), inp.Index)
} else {
ref, exporterOpt, err = f.Solve(ctx, &llbBridge{
job: j,
@ -140,11 +130,15 @@ func (s *Solver) Solve(ctx context.Context, id string, f frontend.Frontend, v Ve
}
if exp != nil {
vv.notifyStarted(ctx)
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", vv.Digest()))
v := client.Vertex{
Digest: digest.FromBytes([]byte(identity.NewID())),
Name: exp.Name(),
}
notifyStarted(ctx, &v)
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
defer pw.Close()
err := exp.Export(ctx, immutable, exporterOpt)
vv.notifyCompleted(ctx, false, err)
notifyCompleted(ctx, &v, err)
if err != nil {
return err
}
@ -161,8 +155,8 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
return j.pipe(ctx, statusChan)
}
func (s *Solver) loadAndSolveChildVertex(ctx context.Context, dgst digest.Digest, vv *vertex, index Index) (Reference, error) {
return s.jobs.loadAndSolveChildVertex(ctx, dgst, vv, index, s.resolve, s.cache)
func (s *Solver) loadAndSolve(ctx context.Context, dgst digest.Digest, def [][]byte) (Reference, error) {
return s.jobs.loadAndSolve(ctx, dgst, def, s.resolve, s.cache)
}
type VertexSolver interface {
@ -181,22 +175,24 @@ type vertexInput struct {
type vertexSolver struct {
inputs []*vertexInput
v *vertex
cv client.Vertex
op Op
cache InstructionCache
refs []*sharedRef
f *bgfunc.F
ctx context.Context
baseKey digest.Digest
mu sync.Mutex
results []digest.Digest
baseKey digest.Digest
mu sync.Mutex
results []digest.Digest
markCachedOnce sync.Once
signal *signal // used to notify that there are callers who need more data
}
type resolveF func(digest.Digest) (VertexSolver, error)
func newVertexSolver(ctx context.Context, v *vertex, op Op, c InstructionCache, resolve resolveF) (VertexSolver, error) {
func newVertexSolver(ctx context.Context, v *vertex, op Op, c InstructionCache, resolve resolveF) (*vertexSolver, error) {
inputs := make([]*vertexInput, len(v.inputs))
for i, in := range v.inputs {
s, err := resolve(in.vertex.digest)
@ -207,6 +203,7 @@ func newVertexSolver(ctx context.Context, v *vertex, op Op, c InstructionCache,
if err != nil {
return nil, err
}
ev.Cancel()
inputs[i] = &vertexInput{
solver: s,
ev: ev,
@ -216,12 +213,26 @@ func newVertexSolver(ctx context.Context, v *vertex, op Op, c InstructionCache,
ctx: ctx,
inputs: inputs,
v: v,
cv: v.clientVertex,
op: op,
cache: c,
signal: newSignaller(),
}, nil
}
func markCached(ctx context.Context, cv client.Vertex) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
if cv.Started == nil {
now := time.Now()
cv.Started = &now
cv.Completed = &now
cv.Cached = true
}
pw.Write(cv.Digest.String(), cv)
}
func (vs *vertexSolver) CacheKey(ctx context.Context, index Index) (digest.Digest, error) {
vs.mu.Lock()
defer vs.mu.Unlock()
@ -334,12 +345,13 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
}
vs.mu.Unlock()
wait := vs.signal.Wait()
waitFirst := vs.signal.Wait()
waitRun := waitFirst
select {
case <-ctx.Done():
return ctx.Err()
case <-wait:
case <-waitFirst:
}
// this is where you lookup the cache keys that were successfully probed
@ -353,12 +365,13 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
eg.Go(func() error {
inp := vs.inputs[i]
defer inp.ev.Cancel()
for {
waitNext := waitFirst
for {
select {
case <-ctx2.Done():
return ctx2.Err()
case <-wait:
case <-waitNext:
}
// check if current cache key is in cache
@ -369,6 +382,9 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
}
if ref != nil {
inp.ref = ref.(Reference)
inp.solver.(*vertexSolver).markCachedOnce.Do(func() {
markCached(ctx, inp.solver.(*vertexSolver).cv)
})
return nil
}
}
@ -408,8 +424,9 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
return err
}
vs.results = append(vs.results, dgst)
signal() // wake up callers
wait = vs.signal.Reset() // make sure we don't continue unless there are callers
signal() // wake up callers
waitNext = vs.signal.Reset() // make sure we don't continue unless there are callers
waitRun = waitNext
vs.mu.Unlock()
}
}
@ -448,20 +465,20 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
vs.mu.Lock()
vs.results = append(vs.results, extraKeys...)
signal()
wait = vs.signal.Reset()
waitRun = vs.signal.Reset()
vs.mu.Unlock()
}
select {
case <-ctx.Done():
return
case <-wait:
case <-waitRun:
}
// no cache hit. start evaluating the node
vs.v.notifyStarted(ctx)
notifyStarted(ctx, &vs.cv)
defer func() {
vs.v.notifyCompleted(ctx, false, retErr)
notifyCompleted(ctx, &vs.cv, retErr)
}()
refs, err := vs.op.Run(ctx, inputRefs)
@ -472,11 +489,13 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
for i, r := range refs {
sr[i] = newSharedRef(r)
}
vs.mu.Lock()
vs.refs = sr
vs.mu.Unlock()
// store the cacheKeys for current refs
if vs.cache != nil {
cacheKey, err := vs.mainCacheKey()
cacheKey, err := vs.lastCacheKey()
if err != nil {
return err
}
@ -556,23 +575,11 @@ type resolveImageConfig interface {
}
func (s *llbBridge) Solve(ctx context.Context, dt [][]byte) (cache.ImmutableRef, error) {
v, err := LoadLLB(dt)
inp, err := s.job.load(dt, s.resolveOp)
if err != nil {
return nil, err
}
if len(v.Inputs()) == 0 {
return nil, errors.New("required vertex needs to have inputs")
}
index := v.Inputs()[0].Index
v = v.Inputs()[0].Vertex
vv := toInternalVertex(v)
if err := s.job.load(vv, s.resolveOp); err != nil {
return nil, err
}
ref, err := s.job.getRef(ctx, vv, index)
ref, err := s.job.getRef(ctx, inp.Vertex.(*vertex), inp.Index)
if err != nil {
return nil, err
}

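Since the exporter is not part of the LLB graph, Solve now reports it as a synthetic client.Vertex with a freshly generated digest, bracketed by notifyStarted and notifyCompleted. A minimal sketch of bracketing a non-graph step that way, using a local step struct rather than the real client and progress packages:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// step is a stand-in for client.Vertex: just enough fields to show the
// started/completed/error bookkeeping the solver now does on a value it
// owns, instead of mutating shared graph state.
type step struct {
	Name      string
	Started   *time.Time
	Completed *time.Time
	Error     string
}

func notifyStarted(s *step) {
	now := time.Now()
	s.Started = &now
	s.Completed = nil
	fmt.Printf("started  %s\n", s.Name)
}

func notifyCompleted(s *step, err error) {
	now := time.Now()
	if s.Started == nil {
		s.Started = &now
	}
	s.Completed = &now
	if err != nil {
		s.Error = err.Error()
	}
	fmt.Printf("finished %s error=%q\n", s.Name, s.Error)
}

// runExport brackets an arbitrary, non-graph step (such as an exporter)
// with progress notifications on a synthetic step record.
func runExport(name string, fn func() error) error {
	s := step{Name: name}
	notifyStarted(&s)
	err := fn()
	notifyCompleted(&s, err)
	return err
}

func main() {
	_ = runExport("exporting to image", func() error {
		time.Sleep(10 * time.Millisecond)
		return nil
	})
	_ = runExport("exporting to registry", func() error {
		return errors.New("push failed")
	})
}
```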

@ -1,13 +1,13 @@
package solver
import (
"context"
"sync"
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"golang.org/x/net/context"
)
// Vertex is one node in the build graph
@ -78,44 +78,26 @@ func (v *vertex) Name() string {
return v.name
}
func (v *vertex) inputRequiresExport(i int) bool {
return true // TODO
}
func (v *vertex) notifyStarted(ctx context.Context) {
v.recursiveMarkCached(ctx)
func notifyStarted(ctx context.Context, v *client.Vertex) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
v.clientVertex.Started = &now
v.clientVertex.Completed = nil
pw.Write(v.Digest().String(), v.clientVertex)
v.Started = &now
v.Completed = nil
pw.Write(v.Digest.String(), *v)
}
func (v *vertex) notifyCompleted(ctx context.Context, cached bool, err error) {
func notifyCompleted(ctx context.Context, v *client.Vertex, err error) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
v.recursiveMarkCached(ctx)
if v.clientVertex.Started == nil {
v.clientVertex.Started = &now
if v.Started == nil {
v.Started = &now
}
v.clientVertex.Completed = &now
v.clientVertex.Cached = cached
v.Completed = &now
v.Cached = false
if err != nil {
v.clientVertex.Error = err.Error()
v.Error = err.Error()
}
pw.Write(v.Digest().String(), v.clientVertex)
}
func (v *vertex) recursiveMarkCached(ctx context.Context) {
for _, inp := range v.inputs {
inp.vertex.notifyMu.Lock()
if inp.vertex.clientVertex.Started == nil {
inp.vertex.recursiveMarkCached(ctx)
inp.vertex.notifyCompleted(ctx, true, nil)
}
inp.vertex.notifyMu.Unlock()
}
pw.Write(v.Digest.String(), *v)
}


@ -295,11 +295,12 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
for _, j := range ongoing.jobs() {
refKey := remotes.MakeRefKey(ctx, j.Descriptor)
if a, ok := actives[refKey]; ok {
started := j.started
pw.Write(j.Digest.String(), progress.Status{
Action: a.Status,
Total: int(a.Total),
Current: int(a.Offset),
Started: &j.started,
Started: &started,
})
continue
}
@ -318,12 +319,14 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
}
if done || j.done {
started := j.started
createdAt := info.CreatedAt
pw.Write(j.Digest.String(), progress.Status{
Action: "done",
Current: int(info.Size),
Total: int(info.Size),
Completed: &info.CreatedAt,
Started: &j.started,
Completed: &createdAt,
Started: &started,
})
}
}

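The showProgress fix copies j.started and info.CreatedAt into locals before taking their addresses: a pointer straight into the job keeps reflecting later changes to that job after the status has been written, while a copy pins the value the status was created with. A small illustration of the difference, with toy job and status types:

```go
package main

import (
	"fmt"
	"time"
)

type job struct {
	started time.Time
}

type status struct {
	Action  string
	Started *time.Time
}

func main() {
	j := &job{started: time.Unix(100, 0)}

	// Pointing straight at the job's field: any later change to the job
	// shows through every status that was already handed out.
	aliased := status{Action: "pull", Started: &j.started}

	// Copying into a local first pins the timestamp this status was
	// created with, which is what the showProgress change does for
	// j.started and info.CreatedAt.
	started := j.started
	pinned := status{Action: "pull", Started: &started}

	j.started = time.Unix(999, 0) // job state moves on

	fmt.Println("aliased:", aliased.Started.Unix()) // 999
	fmt.Println("pinned: ", pinned.Started.Unix())  // 100
}
```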

@ -41,29 +41,28 @@ func (f *F) run() {
f.runMu.Lock()
if !f.running && !f.done {
f.running = true
ctx, cancel := context.WithCancel(f.mainCtx)
ctxErr := make(chan error, 1)
f.ctxErr = ctxErr
f.cancelCtx = cancel
go func() {
var err error
var nodone bool
ctxErr := make(chan error, 1)
defer func() {
// release all cancellations
ctxErr <- err
close(ctxErr)
f.mu.Lock()
f.runMu.Lock()
f.running = false
f.runMu.Unlock()
f.mu.Lock()
if !nodone {
f.done = true
f.err = err
}
f.cond.Broadcast()
ctxErr <- err
f.mu.Unlock()
}()
ctx, cancel := context.WithCancel(f.mainCtx)
f.runMu.Lock()
f.cancelCtx = cancel
f.ctxErr = ctxErr
f.runMu.Unlock()
err = f.f(ctx, func() {
f.cond.Broadcast()
})
@ -84,11 +83,13 @@ func (f *F) addSem() {
func (f *F) clearSem() error {
f.sem--
var err error
f.runMu.Lock()
if cctx := f.cancelCtx; f.sem == 0 && cctx != nil {
cctx()
err = <-f.ctxErr
f.cancelCtx = nil
}
f.runMu.Unlock()
return err
}
@ -114,10 +115,11 @@ func (c *Caller) Call(ctx context.Context, f func() (interface{}, error)) (inter
for {
select {
case <-ctx.Done():
c.F.mu.RUnlock()
if err := c.Cancel(); err != nil {
if err := c.cancel(); err != nil {
c.F.mu.RUnlock()
return nil, err
}
c.F.mu.RUnlock()
return nil, ctx.Err()
default:
}
@ -150,6 +152,12 @@ func (c *Caller) Call(ctx context.Context, f func() (interface{}, error)) (inter
}
func (c *Caller) Cancel() error {
c.F.mu.Lock()
defer c.F.mu.Unlock()
return c.cancel()
}
func (c *Caller) cancel() error {
c.F.semMu.Lock()
defer c.F.semMu.Unlock()
if c.active {

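Cancel is now split into an exported method that acquires F.mu itself and an unexported cancel for callers that already hold the lock, which is how Call can cancel from inside its locked region without unlocking first. A minimal sketch of that exported-locks / unexported-assumes-held split, using an illustrative pool type rather than the real bgfunc.F:

```go
package main

import (
	"fmt"
	"sync"
)

// pool demonstrates the split: Release acquires the mutex itself, while
// release expects the caller to already hold it, so methods that are
// inside the lock can reuse the logic without deadlocking.
type pool struct {
	mu     sync.Mutex
	active int
}

func (p *pool) Acquire() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.active++
}

// Release is the public entry point: it takes the lock and delegates.
func (p *pool) Release() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.release()
}

// release must only be called with p.mu held.
func (p *pool) release() {
	if p.active > 0 {
		p.active--
	}
}

// Drain already holds the lock when it needs to release everything,
// so it calls the unexported variant directly.
func (p *pool) Drain() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	n := 0
	for p.active > 0 {
		p.release()
		n++
	}
	return n
}

func main() {
	p := &pool{}
	p.Acquire()
	p.Acquire()
	p.Release()
	fmt.Println("drained:", p.Drain()) // 1
}
```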

@ -100,10 +100,6 @@ func (t *trace) update(s *client.SolveStatus) {
t.byDigest[v.Digest] = &vertex{
byID: make(map[string]*status),
}
} else {
if prev.Parent != v.Parent { // skip vertexes already in list for other parents
continue
}
}
if v.Started != nil && (prev == nil || prev.Started == nil) {
if t.localTimeDiff == 0 {
@ -112,9 +108,6 @@ func (t *trace) update(s *client.SolveStatus) {
t.vertexes = append(t.vertexes, t.byDigest[v.Digest])
}
t.byDigest[v.Digest].Vertex = v
if v.Parent != "" {
t.byDigest[v.Digest].indent = t.byDigest[v.Parent].indent + "=> "
}
}
for _, s := range s.Statuses {
v, ok := t.byDigest[s.Vertex]