diff --git a/solver/refcache.go b/solver/refcache.go
new file mode 100644
index 00000000..4b8d3116
--- /dev/null
+++ b/solver/refcache.go
@@ -0,0 +1,155 @@
+package solver
+
+import (
+	"sync"
+
+	"github.com/moby/buildkit/cache"
+	"github.com/moby/buildkit/util/flightcontrol"
+	digest "github.com/opencontainers/go-digest"
+	"github.com/pkg/errors"
+	"golang.org/x/net/context"
+)
+
+// refCache holds the references to snapshots that are currently active
+// and allows sharing them between jobs
+type refCache struct {
+	mu    sync.Mutex
+	cache map[digest.Digest]*cachedReq
+	flightcontrol.Group
+}
+
+// cachedReq is one cache entry: the jobs registered for a key and the
+// refs produced for it (nil until set is called)
+type cachedReq struct {
+	jobs  map[*job]struct{}
+	value []*sharedRef
+}
+
+// probe registers j as a user of key, creating the entry if needed, and
+// reports whether a value has already been set for key
+func (c *refCache) probe(j *job, key digest.Digest) bool {
+	c.mu.Lock()
+	if c.cache == nil {
+		c.cache = make(map[digest.Digest]*cachedReq)
+	}
+	cr, ok := c.cache[key]
+	if !ok {
+		cr = &cachedReq{jobs: make(map[*job]struct{})}
+		c.cache[key] = cr
+	}
+	cr.jobs[j] = struct{}{}
+	if ok && cr.value != nil {
+		c.mu.Unlock()
+		return true
+	}
+	c.mu.Unlock()
+	return false
+}
+
+// get returns a new clone of every ref stored for key
+func (c *refCache) get(key digest.Digest) ([]cache.ImmutableRef, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	v, ok := c.cache[key]
+	// these errors should not be reached
+	if !ok {
+		return nil, errors.Errorf("no ref cache found")
+	}
+	if v.value == nil {
+		return nil, errors.Errorf("no ref cache value set")
+	}
+	refs := make([]cache.ImmutableRef, 0, len(v.value))
+	for _, r := range v.value {
+		refs = append(refs, r.Clone())
+	}
+	return refs, nil
+}
+
+// set wraps refs in sharedRefs and stores them as the value for key
+func (c *refCache) set(key digest.Digest, refs []cache.ImmutableRef) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	cr, ok := c.cache[key]
+	if !ok {
+		// no entry means no job is registered for key (cancel drops entries
+		// without jobs): release the result instead of dereferencing nil
+		for _, r := range refs {
+			go r.Release(context.TODO())
+		}
+		return
+	}
+	sharedRefs := make([]*sharedRef, 0, len(refs))
+	for _, r := range refs {
+		sharedRefs = append(sharedRefs, newSharedRef(r))
+	}
+	cr.value = sharedRefs
+}
+
+// cancel unregisters j from every entry and releases the refs of entries
+// that no job is registered for anymore
+func (c *refCache) cancel(j *job) {
+	c.mu.Lock()
+	for k, r := range c.cache {
+		if _, ok := r.jobs[j]; ok {
+			delete(r.jobs, j)
+		}
+		if len(r.jobs) == 0 {
+			for _, r := range r.value {
+				go r.Release(context.TODO())
+			}
+			delete(c.cache, k)
+		}
+	}
+	c.mu.Unlock()
+}
+
+// sharedRef is a wrapper around releasable that allows you to make new
+// releasable child objects
+type sharedRef struct {
+	mu   sync.Mutex
+	refs map[*sharedRefInstance]struct{}
+	main cache.ImmutableRef
+	cache.ImmutableRef
+}
+
+// newSharedRef wraps main; the wrapper holds one clone of its own so the
+// underlying ref is only released after sharedRef.Release has been called
+func newSharedRef(main cache.ImmutableRef) *sharedRef {
+	mr := &sharedRef{
+		refs:         make(map[*sharedRefInstance]struct{}),
+		ImmutableRef: main,
+	}
+	mr.main = mr.Clone()
+	return mr
+}
+
+// Clone returns a new instance that is tracked in mr.refs until released
+func (mr *sharedRef) Clone() cache.ImmutableRef {
+	mr.mu.Lock()
+	r := &sharedRefInstance{sharedRef: mr}
+	mr.refs[r] = struct{}{}
+	mr.mu.Unlock()
+	return r
+}
+
+// Release releases the clone held by the sharedRef itself
+func (mr *sharedRef) Release(ctx context.Context) error {
+	return mr.main.Release(ctx)
+}
+
+// sharedRefInstance is a single releasable handle onto a sharedRef
+type sharedRefInstance struct {
+	*sharedRef
+}
+
+// Release drops this instance and releases the underlying ref once no
+// instances remain
+func (r *sharedRefInstance) Release(ctx context.Context) error {
+	r.sharedRef.mu.Lock()
+	defer r.sharedRef.mu.Unlock()
+	delete(r.sharedRef.refs, r)
+	if len(r.sharedRef.refs) == 0 {
+		return r.sharedRef.ImmutableRef.Release(ctx)
+	}
+	return nil
+}
diff --git a/solver/solver.go b/solver/solver.go
index 5e9fa6ab..34d491d5 100644
--- a/solver/solver.go
+++ b/solver/solver.go
@@ -27,8 +27,9 @@ type Opt struct {
 }
 
 type Solver struct {
-	opt  Opt
-	jobs *jobList
+	opt    Opt
+	jobs   *jobList
+	active refCache
 }
 
 func New(opt Opt) *Solver {
@@ -45,18 +46,21 @@ func (s *Solver) Solve(ctx context.Context, id string, g *opVertex) error {
 		g = g.inputs[0]
 	}
 
-	_, err := s.jobs.new(ctx, id, g, pr)
+	j, err := s.jobs.new(ctx, id, g, pr)
 	if err != nil {
 		return err
 	}
 
-	err = g.solve(ctx, s.opt) // TODO: separate exporting
+	refs, err := s.getRefs(ctx, j, j.g)
 	closeProgressWriter()
+	s.active.cancel(j)
 	if err != nil {
 		return err
 	}
 
-	g.release(ctx)
+	for _, r := range refs {
+		r.Release(context.TODO())
+	}
 	// TODO: export final vertex state
 	return err
 }
@@ -70,11 +74,98 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
 	return j.pipe(ctx, statusChan)
 }
 
+// getRefs solves vertex g for job j and returns references to its outputs.
+// Inputs are resolved recursively first; the op is run through s.active.Do
+// keyed by the vertex digest and the result is read back from the ref cache.
+func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) ([]cache.ImmutableRef, error) {
+	pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.dgst))
+	defer pw.Close()
+
+	s.active.probe(j, g.dgst) // this registers the key with the job
+
+	// refs contains all outputs for all input vertexes
+	refs := make([][]*sharedRef, len(g.inputs))
+	if len(g.inputs) > 0 {
+		eg, ctx := errgroup.WithContext(ctx)
+		for i, in := range g.inputs {
+			func(i int, in *opVertex) {
+				eg.Go(func() error {
+					r, err := s.getRefs(ctx, j, in)
+					if err != nil {
+						return err
+					}
+					for _, r := range r {
+						refs[i] = append(refs[i], newSharedRef(r))
+					}
+					return nil
+				})
+			}(i, in)
+		}
+		err := eg.Wait()
+		if err != nil {
+			for _, r := range refs {
+				for _, r := range r {
+					go r.Release(context.TODO())
+				}
+			}
+			return nil, err
+		}
+	}
+
+	// determine the inputs that were needed
+	inputs := make([]cache.ImmutableRef, 0, len(g.op.Inputs))
+	for _, inp := range g.op.Inputs {
+		for i, v := range g.inputs {
+			if v.dgst == digest.Digest(inp.Digest) {
+				inputs = append(inputs, refs[i][int(inp.Index)].Clone())
+			}
+		}
+	}
+
+	// release anything else
+	for _, r := range refs {
+		for _, r := range r {
+			go r.Release(context.TODO())
+		}
+	}
+
+	g.notifyStarted(ctx)
+	defer g.notifyCompleted(ctx)
+
+	_, err := s.active.Do(ctx, g.dgst.String(), func(ctx context.Context) (interface{}, error) {
+		if hit := s.active.probe(j, g.dgst); hit {
+			return nil, nil
+		}
+		refs, err := s.runVertex(ctx, g, inputs)
+		if err != nil {
+			return nil, err
+		}
+
+		s.active.set(g.dgst, refs)
+		return nil, nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return s.active.get(g.dgst)
+}
+
+// runVertex executes the op of g with the given inputs and returns its output refs
+func (s *Solver) runVertex(ctx context.Context, g *opVertex, inputs []cache.ImmutableRef) ([]cache.ImmutableRef, error) {
+	switch op := g.op.Op.(type) {
+	case *pb.Op_Source:
+		return g.runSourceOp(ctx, s.opt.SourceManager, op)
+	case *pb.Op_Exec:
+		return g.runExecOp(ctx, s.opt.CacheManager, s.opt.Worker, op, inputs)
+	default:
+		return nil, errors.Errorf("invalid op type %T", g.op.Op)
+	}
+}
+
 type opVertex struct {
 	mu     sync.Mutex
 	op     *pb.Op
 	inputs []*opVertex
-	refs   []cache.ImmutableRef
 	err    error
 	dgst   digest.Digest
 	vtx    client.Vertex
@@ -84,103 +175,19 @@ func (g *opVertex) inputRequiresExport(i int) bool {
 	return true // TODO
 }
 
-func (g *opVertex) release(ctx context.Context) (retErr error) {
-	for _, i := range g.inputs {
-		if err := i.release(ctx); err != nil {
-			retErr = err
-		}
-	}
-	for _, ref := range g.refs {
-		if ref != nil {
-			if err := ref.Release(ctx); err != nil {
-				retErr = err
-			}
-		}
-	}
-	return retErr
-}
-
-func (g *opVertex) getInputRefForIndex(i int) cache.ImmutableRef {
-	input := g.op.Inputs[i]
-	for _, v := range g.inputs {
-		if v.dgst == digest.Digest(input.Digest) {
-			return v.refs[input.Index]
-		}
-	}
-	return nil
-}
-
-func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) {
-	g.mu.Lock()
-	defer g.mu.Unlock()
-
-	if g.err != nil {
-		return g.err
-	}
-	if len(g.refs) > 0 {
-		return nil
-	}
-
-	defer func() {
-		if retErr != nil {
-			g.err = retErr
-		}
-	}()
-
-	pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.dgst))
-	defer pw.Close()
-
-	if len(g.inputs) > 0 {
-		eg, ctx := errgroup.WithContext(ctx)
-
-		for _, in := range g.inputs {
-			func(in *opVertex) {
-				eg.Go(func() error {
-					if err := in.solve(ctx, opt); err != nil {
-						return err
-					}
-					return nil
-				})
-			}(in)
-		}
-		err := eg.Wait()
-		if err != nil {
-			return err
-		}
-	}
-
-	g.notifyStarted(ctx)
-	defer g.notifyCompleted(ctx)
-
-	switch op := g.op.Op.(type) {
-	case *pb.Op_Source:
-		if err := g.runSourceOp(ctx, opt.SourceManager, op); err != nil {
-			return err
-		}
-	case *pb.Op_Exec:
-		if err := g.runExecOp(ctx, opt.CacheManager, opt.Worker, op); err != nil {
-			return err
-		}
-	default:
-		return errors.Errorf("invalid op type %T", g.op.Op)
-	}
-	return nil
-}
-
-func (g *opVertex) runSourceOp(ctx context.Context, sm *source.Manager, op *pb.Op_Source) error {
+func (g *opVertex) runSourceOp(ctx context.Context, sm *source.Manager, op *pb.Op_Source) ([]cache.ImmutableRef, error) {
 	id, err := source.FromString(op.Source.Identifier)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	ref, err := sm.Pull(ctx, id)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	g.refs = []cache.ImmutableRef{ref}
-	return nil
+	return []cache.ImmutableRef{ref}, nil
 }
 
-func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op_Exec) error {
+func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op_Exec, inputs []cache.ImmutableRef) ([]cache.ImmutableRef, error) {
 	mounts := make(map[string]cache.Mountable)
 	var outputs []cache.MutableRef
 
@@ -190,7 +197,7 @@ func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Wor
 			if o != nil {
 				s, err := o.Freeze() // TODO: log error
 				if err == nil {
-					s.Release(ctx)
+					go s.Release(ctx)
 				}
 			}
 		}
@@ -198,12 +205,15 @@ func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Wor
 
 	for _, m := range op.Exec.Mounts {
 		var mountable cache.Mountable
-		ref := g.getInputRefForIndex(int(m.Input))
+		if int(m.Input) >= len(inputs) {
+			return nil, errors.Errorf("missing input %d", m.Input)
+		}
+		ref := inputs[int(m.Input)]
 		mountable = ref
 		if m.Output != -1 {
 			active, err := cm.New(ctx, ref) // TODO: should be method
 			if err != nil {
-				return err
+				return nil, err
 			}
 			outputs = append(outputs, active)
 			mountable = active
@@ -223,19 +233,19 @@ func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Wor
 	defer stderr.Close()
 
 	if err := w.Exec(ctx, meta, mounts, stdout, stderr); err != nil {
-		return errors.Wrapf(err, "worker failed running %v", meta.Args)
+		return nil, errors.Wrapf(err, "worker failed running %v", meta.Args)
 	}
 
-	g.refs = []cache.ImmutableRef{}
+	refs := []cache.ImmutableRef{}
 	for i, o := range outputs {
 		ref, err := o.ReleaseAndCommit(ctx)
 		if err != nil {
-			return errors.Wrapf(err, "error committing %s", o.ID())
+			return nil, errors.Wrapf(err, "error committing %s", o.ID())
 		}
-		g.refs = append(g.refs, ref)
+		refs = append(refs, ref)
 		outputs[i] = nil
 	}
-	return nil
+	return refs, nil
 }
 
 func (g *opVertex) notifyStarted(ctx context.Context) {
diff --git a/util/flightcontrol/flightcontrol.go b/util/flightcontrol/flightcontrol.go
index 322aa193..1ba30954 100644
--- a/util/flightcontrol/flightcontrol.go
+++ b/util/flightcontrol/flightcontrol.go
@@ -64,6 +64,7 @@ type call struct {
 
 	closeProgressWriter func()
 	progressState       *progressState
+	progressCtx         context.Context
 }
 
 func newCall(fn func(ctx context.Context) (interface{}, error)) *call {
@@ -73,8 +74,9 @@ func newCall(fn func(ctx context.Context) (interface{}, error)) *call {
 		progressState: newProgressState(),
 	}
 	ctx := newContext(c) // newSharedContext
-	pr, _, closeProgressWriter := progress.NewContext(ctx)
+	pr, pctx, closeProgressWriter := progress.NewContext(context.Background())
 
+	c.progressCtx = pctx
 	c.ctx = ctx
 	c.closeProgressWriter = closeProgressWriter
 
@@ -175,7 +177,7 @@ func (c *call) Err() error {
 func (c *call) Value(key interface{}) interface{} {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	for _, ctx := range append([]context.Context{}, c.ctxs...) {
+	for _, ctx := range append([]context.Context{c.progressCtx}, c.ctxs...) {
 		select {
 		case <-ctx.Done():
 		default:
diff --git a/util/progress/multireader.go b/util/progress/multireader.go
index 2583ed81..2bd3f2ca 100644
--- a/util/progress/multireader.go
+++ b/util/progress/multireader.go
@@ -69,7 +69,7 @@ func (mr *MultiReader) handle() error {
 		mr.mu.Lock()
 		for _, p := range p {
 			for w := range mr.writers {
-				w.WriteRawProgress(p)
+				w.writeRawProgress(p)
 			}
 		}
 		mr.mu.Unlock()
diff --git a/util/progress/progress.go b/util/progress/progress.go
index 4c947821..ab2d121b 100644
--- a/util/progress/progress.go
+++ b/util/progress/progress.go
@@ -183,7 +183,7 @@ func (pw *progressWriter) Write(id string, v interface{}) error {
 	if pw.done {
 		return errors.Errorf("writing %s to closed progress writer", id)
 	}
-	return pw.WriteRawProgress(&Progress{
+	return pw.writeRawProgress(&Progress{
 		ID:        id,
 		Timestamp: time.Now(),
 		Sys:       v,
@@ -192,6 +192,11 @@ func (pw *progressWriter) Write(id string, v interface{}) error {
 }
 
 func (pw *progressWriter) WriteRawProgress(p *Progress) error {
+	p.meta = pw.meta
+	return pw.writeRawProgress(p)
+}
+
+func (pw *progressWriter) writeRawProgress(p *Progress) error {
 	pw.reader.mu.Lock()
 	pw.reader.dirty[p.ID] = p
 	pw.reader.mu.Unlock()