Merge pull request #42 from tonistiigi/vertex-sharing

solver: add vertex sharing on concurrent builds
docker-18.09
Tõnis Tiigi 2017-06-29 11:04:45 -07:00 committed by GitHub
commit 8a5b6c8118
5 changed files with 249 additions and 107 deletions

solver/refcache.go

@@ -0,0 +1,129 @@
package solver
import (
"sync"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/util/flightcontrol"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// refCache holds references to snapshots that are currently active
// and allows sharing them between jobs
type refCache struct {
mu sync.Mutex
cache map[digest.Digest]*cachedReq
flightcontrol.Group
}
type cachedReq struct {
jobs map[*job]struct{}
value []*sharedRef
}
func (c *refCache) probe(j *job, key digest.Digest) bool {
c.mu.Lock()
if c.cache == nil {
c.cache = make(map[digest.Digest]*cachedReq)
}
cr, ok := c.cache[key]
if !ok {
cr = &cachedReq{jobs: make(map[*job]struct{})}
c.cache[key] = cr
}
cr.jobs[j] = struct{}{}
if ok && cr.value != nil {
c.mu.Unlock()
return true
}
c.mu.Unlock()
return false
}
func (c *refCache) get(key digest.Digest) ([]cache.ImmutableRef, error) {
c.mu.Lock()
defer c.mu.Unlock()
v, ok := c.cache[key]
// these cases should be unreachable
if !ok {
return nil, errors.Errorf("no ref cache found")
}
if v.value == nil {
return nil, errors.Errorf("no ref cache value set")
}
refs := make([]cache.ImmutableRef, 0, len(v.value))
for _, r := range v.value {
refs = append(refs, r.Clone())
}
return refs, nil
}
func (c *refCache) set(key digest.Digest, refs []cache.ImmutableRef) {
c.mu.Lock()
sharedRefs := make([]*sharedRef, 0, len(refs))
for _, r := range refs {
sharedRefs = append(sharedRefs, newSharedRef(r))
}
c.cache[key].value = sharedRefs
c.mu.Unlock()
}
func (c *refCache) cancel(j *job) {
c.mu.Lock()
for k, r := range c.cache {
if _, ok := r.jobs[j]; ok {
delete(r.jobs, j)
}
if len(r.jobs) == 0 {
for _, r := range r.value {
go r.Release(context.TODO())
}
delete(c.cache, k)
}
}
c.mu.Unlock()
}
// sharedRef is a wrapper around a releasable reference that allows creating
// new releasable child references
type sharedRef struct {
mu sync.Mutex
refs map[*sharedRefInstance]struct{}
main cache.ImmutableRef
cache.ImmutableRef
}
func newSharedRef(main cache.ImmutableRef) *sharedRef {
mr := &sharedRef{
refs: make(map[*sharedRefInstance]struct{}),
ImmutableRef: main,
}
mr.main = mr.Clone()
return mr
}
func (mr *sharedRef) Clone() cache.ImmutableRef {
mr.mu.Lock()
r := &sharedRefInstance{sharedRef: mr}
mr.refs[r] = struct{}{}
mr.mu.Unlock()
return r
}
func (mr *sharedRef) Release(ctx context.Context) error {
return mr.main.Release(ctx)
}
type sharedRefInstance struct {
*sharedRef
}
func (r *sharedRefInstance) Release(ctx context.Context) error {
r.sharedRef.mu.Lock()
defer r.sharedRef.mu.Unlock()
delete(r.sharedRef.refs, r)
if len(r.sharedRef.refs) == 0 {
return r.sharedRef.ImmutableRef.Release(ctx)
}
return nil
}
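
A minimal, runnable sketch of the ref-counting pattern above. It substitutes a simplified Releasable interface for cache.ImmutableRef, so the names below are illustrative rather than BuildKit API; the point is that the wrapped resource is released exactly once, when the last outstanding child handle goes away.

package main

import (
	"fmt"
	"sync"
)

// Releasable stands in for cache.ImmutableRef in this sketch.
type Releasable interface {
	Release() error
}

type resource struct{ name string }

func (r *resource) Release() error {
	fmt.Println("released", r.name)
	return nil
}

// shared wraps a Releasable and hands out child handles; the wrapped
// resource is released only when the last handle is released.
type shared struct {
	mu   sync.Mutex
	refs map[*instance]struct{}
	Releasable
}

type instance struct{ *shared }

func newShared(r Releasable) *shared {
	return &shared{refs: map[*instance]struct{}{}, Releasable: r}
}

func (s *shared) Clone() Releasable {
	s.mu.Lock()
	defer s.mu.Unlock()
	i := &instance{s}
	s.refs[i] = struct{}{}
	return i
}

func (i *instance) Release() error {
	i.shared.mu.Lock()
	defer i.shared.mu.Unlock()
	delete(i.shared.refs, i)
	if len(i.shared.refs) == 0 {
		return i.shared.Releasable.Release()
	}
	return nil
}

func main() {
	s := newShared(&resource{name: "snapshot"})
	a, b := s.Clone(), s.Clone()
	a.Release() // no-op: b still holds the resource
	b.Release() // last handle gone: prints "released snapshot"
}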

solver/solver.go

@@ -29,6 +29,7 @@ type Opt struct {
type Solver struct {
opt Opt
jobs *jobList
active refCache
}
func New(opt Opt) *Solver {
@@ -45,18 +46,21 @@ func (s *Solver) Solve(ctx context.Context, id string, g *opVertex) error {
g = g.inputs[0]
}
_, err := s.jobs.new(ctx, id, g, pr)
j, err := s.jobs.new(ctx, id, g, pr)
if err != nil {
return err
}
err = g.solve(ctx, s.opt) // TODO: separate exporting
refs, err := s.getRefs(ctx, j, j.g)
closeProgressWriter()
s.active.cancel(j)
if err != nil {
return err
}
g.release(ctx)
for _, r := range refs {
r.Release(context.TODO())
}
// TODO: export final vertex state
return err
}
@@ -70,11 +74,94 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
return j.pipe(ctx, statusChan)
}
func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) ([]cache.ImmutableRef, error) {
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.dgst))
defer pw.Close()
s.active.probe(j, g.dgst) // this registers the key with the job
// refs contains all outputs for all input vertexes
refs := make([][]*sharedRef, len(g.inputs))
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
for i, in := range g.inputs {
func(i int, in *opVertex) {
eg.Go(func() error {
r, err := s.getRefs(ctx, j, in)
if err != nil {
return err
}
for _, r := range r {
refs[i] = append(refs[i], newSharedRef(r))
}
return nil
})
}(i, in)
}
err := eg.Wait()
if err != nil {
for _, r := range refs {
for _, r := range r {
go r.Release(context.TODO())
}
}
return nil, err
}
}
// determine the inputs that were needed
inputs := make([]cache.ImmutableRef, 0, len(g.op.Inputs))
for _, inp := range g.op.Inputs {
for i, v := range g.inputs {
if v.dgst == digest.Digest(inp.Digest) {
inputs = append(inputs, refs[i][int(inp.Index)].Clone())
}
}
}
// release anything else
for _, r := range refs {
for _, r := range r {
go r.Release(context.TODO())
}
}
g.notifyStarted(ctx)
defer g.notifyCompleted(ctx)
_, err := s.active.Do(ctx, g.dgst.String(), func(ctx context.Context) (interface{}, error) {
if hit := s.active.probe(j, g.dgst); hit {
return nil, nil
}
refs, err := s.runVertex(ctx, g, inputs)
if err != nil {
return nil, err
}
s.active.set(g.dgst, refs)
return nil, nil
})
if err != nil {
return nil, err
}
return s.active.get(g.dgst)
}
func (s *Solver) runVertex(ctx context.Context, g *opVertex, inputs []cache.ImmutableRef) ([]cache.ImmutableRef, error) {
switch op := g.op.Op.(type) {
case *pb.Op_Source:
return g.runSourceOp(ctx, s.opt.SourceManager, op)
case *pb.Op_Exec:
return g.runExecOp(ctx, s.opt.CacheManager, s.opt.Worker, op, inputs)
default:
return nil, errors.Errorf("invalid op type %T", g.op.Op)
}
}
type opVertex struct {
mu sync.Mutex
op *pb.Op
inputs []*opVertex
refs []cache.ImmutableRef
err error
dgst digest.Digest
vtx client.Vertex
@@ -84,103 +171,19 @@ func (g *opVertex) inputRequiresExport(i int) bool {
return true // TODO
}
func (g *opVertex) release(ctx context.Context) (retErr error) {
for _, i := range g.inputs {
if err := i.release(ctx); err != nil {
retErr = err
}
}
for _, ref := range g.refs {
if ref != nil {
if err := ref.Release(ctx); err != nil {
retErr = err
}
}
}
return retErr
}
func (g *opVertex) getInputRefForIndex(i int) cache.ImmutableRef {
input := g.op.Inputs[i]
for _, v := range g.inputs {
if v.dgst == digest.Digest(input.Digest) {
return v.refs[input.Index]
}
}
return nil
}
func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) {
g.mu.Lock()
defer g.mu.Unlock()
if g.err != nil {
return g.err
}
if len(g.refs) > 0 {
return nil
}
defer func() {
if retErr != nil {
g.err = retErr
}
}()
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.dgst))
defer pw.Close()
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
for _, in := range g.inputs {
func(in *opVertex) {
eg.Go(func() error {
if err := in.solve(ctx, opt); err != nil {
return err
}
return nil
})
}(in)
}
err := eg.Wait()
if err != nil {
return err
}
}
g.notifyStarted(ctx)
defer g.notifyCompleted(ctx)
switch op := g.op.Op.(type) {
case *pb.Op_Source:
if err := g.runSourceOp(ctx, opt.SourceManager, op); err != nil {
return err
}
case *pb.Op_Exec:
if err := g.runExecOp(ctx, opt.CacheManager, opt.Worker, op); err != nil {
return err
}
default:
return errors.Errorf("invalid op type %T", g.op.Op)
}
return nil
}
func (g *opVertex) runSourceOp(ctx context.Context, sm *source.Manager, op *pb.Op_Source) error {
func (g *opVertex) runSourceOp(ctx context.Context, sm *source.Manager, op *pb.Op_Source) ([]cache.ImmutableRef, error) {
id, err := source.FromString(op.Source.Identifier)
if err != nil {
return err
return nil, err
}
ref, err := sm.Pull(ctx, id)
if err != nil {
return err
return nil, err
}
g.refs = []cache.ImmutableRef{ref}
return nil
return []cache.ImmutableRef{ref}, nil
}
func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op_Exec) error {
func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op_Exec, inputs []cache.ImmutableRef) ([]cache.ImmutableRef, error) {
mounts := make(map[string]cache.Mountable)
var outputs []cache.MutableRef
@@ -190,7 +193,7 @@ func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Wor
if o != nil {
s, err := o.Freeze() // TODO: log error
if err == nil {
s.Release(ctx)
go s.Release(ctx)
}
}
}
@@ -198,12 +201,15 @@ func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Wor
for _, m := range op.Exec.Mounts {
var mountable cache.Mountable
ref := g.getInputRefForIndex(int(m.Input))
if int(m.Input) >= len(inputs) {
return nil, errors.Errorf("missing input %d", m.Input)
}
ref := inputs[int(m.Input)]
mountable = ref
if m.Output != -1 {
active, err := cm.New(ctx, ref) // TODO: should be method
if err != nil {
return err
return nil, err
}
outputs = append(outputs, active)
mountable = active
@@ -223,19 +229,19 @@ func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Wor
defer stderr.Close()
if err := w.Exec(ctx, meta, mounts, stdout, stderr); err != nil {
return errors.Wrapf(err, "worker failed running %v", meta.Args)
return nil, errors.Wrapf(err, "worker failed running %v", meta.Args)
}
g.refs = []cache.ImmutableRef{}
refs := []cache.ImmutableRef{}
for i, o := range outputs {
ref, err := o.ReleaseAndCommit(ctx)
if err != nil {
return errors.Wrapf(err, "error committing %s", o.ID())
return nil, errors.Wrapf(err, "error committing %s", o.ID())
}
g.refs = append(g.refs, ref)
refs = append(refs, ref)
outputs[i] = nil
}
return nil
return refs, nil
}
func (g *opVertex) notifyStarted(ctx context.Context) {
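
The heart of the change is getRefs: a job first registers interest in a vertex digest via probe, then the actual solve is funneled through flightcontrol.Group.Do keyed by that digest, so concurrent builds that share a vertex run it only once and later jobs clone the cached refs. A rough, runnable sketch of that dedup-plus-cache pattern, using golang.org/x/sync/singleflight as a stand-in for BuildKit's internal flightcontrol package (same intent, different API):

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

var (
	group singleflight.Group
	mu    sync.Mutex
	cache = map[string]string{} // digest -> result, like refCache
)

// solveVertex runs the (expensive) vertex at most once per key across
// concurrent jobs; later callers get the cached result.
func solveVertex(key string) (string, error) {
	v, err, _ := group.Do(key, func() (interface{}, error) {
		mu.Lock()
		if r, ok := cache[key]; ok { // like refCache.probe: cache hit
			mu.Unlock()
			return r, nil
		}
		mu.Unlock()
		time.Sleep(100 * time.Millisecond) // the actual build step
		r := "ref-for-" + key
		mu.Lock()
		cache[key] = r // like refCache.set
		mu.Unlock()
		return r, nil
	})
	if err != nil {
		return "", err
	}
	return v.(string), nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three concurrent jobs, one shared vertex
		wg.Add(1)
		go func() {
			defer wg.Done()
			r, _ := solveVertex("sha256:abc")
			fmt.Println(r) // build step runs once; all three print the same ref
		}()
	}
	wg.Wait()
}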

util/flightcontrol/flightcontrol.go

@@ -64,6 +64,7 @@ type call struct {
closeProgressWriter func()
progressState *progressState
progressCtx context.Context
}
func newCall(fn func(ctx context.Context) (interface{}, error)) *call {
@@ -73,8 +74,9 @@ func newCall(fn func(ctx context.Context) (interface{}, error)) *call {
progressState: newProgressState(),
}
ctx := newContext(c) // newSharedContext
pr, _, closeProgressWriter := progress.NewContext(ctx)
pr, pctx, closeProgressWriter := progress.NewContext(context.Background())
c.progressCtx = pctx
c.ctx = ctx
c.closeProgressWriter = closeProgressWriter
@@ -175,7 +177,7 @@ func (c *call) Err() error {
func (c *call) Value(key interface{}) interface{} {
c.mu.Lock()
defer c.mu.Unlock()
for _, ctx := range append([]context.Context{}, c.ctxs...) {
for _, ctx := range append([]context.Context{c.progressCtx}, c.ctxs...) {
select {
case <-ctx.Done():
default:
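
The flightcontrol change detaches progress from the callers: the writer is now created from context.Background(), and the resulting pctx is stored on the call so that call.Value can still resolve progress keys after every caller's context is gone. A small, self-contained sketch of why a detached value context survives caller cancellation (the key name here is made up):

package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

func main() {
	// progress state lives on a context that no caller can cancel
	pctx := context.WithValue(context.Background(), ctxKey{}, "progress-writer")

	caller, cancel := context.WithCancel(context.Background())
	cancel() // the caller went away mid-build

	// mimic call.Value: skip cancelled caller contexts, but the
	// detached progress context still answers
	for _, ctx := range []context.Context{caller, pctx} {
		select {
		case <-ctx.Done():
			continue
		default:
		}
		if v := ctx.Value(ctxKey{}); v != nil {
			fmt.Println("found:", v) // found: progress-writer
			return
		}
	}
}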

util/progress/multireader.go

@@ -69,7 +69,7 @@ func (mr *MultiReader) handle() error {
mr.mu.Lock()
for _, p := range p {
for w := range mr.writers {
w.WriteRawProgress(p)
w.writeRawProgress(p)
}
}
mr.mu.Unlock()

util/progress/progress.go

@@ -183,7 +183,7 @@ func (pw *progressWriter) Write(id string, v interface{}) error {
if pw.done {
return errors.Errorf("writing %s to closed progress writer", id)
}
return pw.WriteRawProgress(&Progress{
return pw.writeRawProgress(&Progress{
ID: id,
Timestamp: time.Now(),
Sys: v,
@@ -192,6 +192,11 @@ }
}
func (pw *progressWriter) WriteRawProgress(p *Progress) error {
p.meta = pw.meta
return pw.writeRawProgress(p)
}
func (pw *progressWriter) writeRawProgress(p *Progress) error {
pw.reader.mu.Lock()
pw.reader.dirty[p.ID] = p
pw.reader.mu.Unlock()
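
The progress change splits the raw write path in two: the exported WriteRawProgress stamps the writer's own metadata onto the event, while the new unexported writeRawProgress forwards it untouched, which is what MultiReader.handle needs so re-broadcast progress keeps the metadata of the writer that originally produced it. A simplified sketch of that split (types here are stand-ins for the real progress package):

package main

import "fmt"

type Progress struct {
	ID   string
	meta string
}

type writer struct{ meta string }

// WriteRawProgress is the external entry point: it claims the event
// for this writer by stamping the writer's metadata.
func (w *writer) WriteRawProgress(p *Progress) error {
	p.meta = w.meta
	return w.writeRawProgress(p)
}

// writeRawProgress forwards the event as-is, preserving whatever
// metadata was already attached.
func (w *writer) writeRawProgress(p *Progress) error {
	fmt.Printf("id=%s meta=%s\n", p.ID, p.meta)
	return nil
}

func main() {
	p := &Progress{ID: "step1", meta: "vertex=sha256:abc"}
	fanout := &writer{meta: "fanout"}
	fanout.writeRawProgress(p) // id=step1 meta=vertex=sha256:abc (preserved)
	fanout.WriteRawProgress(p) // id=step1 meta=fanout (stamped over)
}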