solver: cache vertex progress for replays
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
878a0a74e2
commit
65c2f8ef4c
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
"github.com/moby/buildkit/cache"
|
"github.com/moby/buildkit/cache"
|
||||||
"github.com/moby/buildkit/util/flightcontrol"
|
"github.com/moby/buildkit/util/flightcontrol"
|
||||||
|
"github.com/moby/buildkit/util/progress"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
@ -20,8 +21,9 @@ type refCache struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type cachedReq struct {
|
type cachedReq struct {
|
||||||
jobs map[*job]struct{}
|
jobs map[*job]struct{}
|
||||||
value []*sharedRef
|
value []*sharedRef
|
||||||
|
progressCtx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *refCache) probe(j *job, key digest.Digest) bool {
|
func (c *refCache) probe(j *job, key digest.Digest) bool {
|
||||||
|
@ -59,13 +61,14 @@ func (c *refCache) get(key digest.Digest) ([]cache.ImmutableRef, error) {
|
||||||
}
|
}
|
||||||
return refs, nil
|
return refs, nil
|
||||||
}
|
}
|
||||||
func (c *refCache) set(key digest.Digest, refs []cache.ImmutableRef) {
|
func (c *refCache) set(ctx context.Context, key digest.Digest, refs []cache.ImmutableRef) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
sharedRefs := make([]*sharedRef, 0, len(refs))
|
sharedRefs := make([]*sharedRef, 0, len(refs))
|
||||||
for _, r := range refs {
|
for _, r := range refs {
|
||||||
sharedRefs = append(sharedRefs, newSharedRef(r))
|
sharedRefs = append(sharedRefs, newSharedRef(r))
|
||||||
}
|
}
|
||||||
c.cache[key].value = sharedRefs
|
c.cache[key].value = sharedRefs
|
||||||
|
c.cache[key].progressCtx = ctx
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
func (c *refCache) cancel(j *job) {
|
func (c *refCache) cancel(j *job) {
|
||||||
|
@ -84,6 +87,24 @@ func (c *refCache) cancel(j *job) {
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *refCache) writeProgressSnapshot(ctx context.Context, key digest.Digest) error {
|
||||||
|
pw, ok, _ := progress.FromContext(ctx)
|
||||||
|
if ok {
|
||||||
|
c.mu.Lock()
|
||||||
|
v, ok := c.cache[key]
|
||||||
|
if !ok {
|
||||||
|
c.mu.Unlock()
|
||||||
|
return errors.Errorf("no ref cache found")
|
||||||
|
}
|
||||||
|
pctx := v.progressCtx
|
||||||
|
c.mu.Unlock()
|
||||||
|
if pctx != nil {
|
||||||
|
return flightcontrol.WriteProgress(pctx, pw)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// sharedRef is a wrapper around releasable that allows you to make new
|
// sharedRef is a wrapper around releasable that allows you to make new
|
||||||
// releasable child objects
|
// releasable child objects
|
||||||
type sharedRef struct {
|
type sharedRef struct {
|
||||||
|
|
|
@ -133,14 +133,16 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac
|
||||||
|
|
||||||
_, err := s.active.Do(ctx, g.dgst.String(), func(ctx context.Context) (interface{}, error) {
|
_, err := s.active.Do(ctx, g.dgst.String(), func(ctx context.Context) (interface{}, error) {
|
||||||
if hit := s.active.probe(j, g.dgst); hit {
|
if hit := s.active.probe(j, g.dgst); hit {
|
||||||
|
if err := s.active.writeProgressSnapshot(ctx, g.dgst); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
refs, err := s.runVertex(ctx, g, inputs)
|
refs, err := s.runVertex(ctx, g, inputs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
s.active.set(ctx, g.dgst, refs)
|
||||||
s.active.set(g.dgst, refs)
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -17,6 +17,10 @@ import (
|
||||||
|
|
||||||
var errRetry = errors.Errorf("retry")
|
var errRetry = errors.Errorf("retry")
|
||||||
|
|
||||||
|
type contextKeyT string
|
||||||
|
|
||||||
|
var contextKey = contextKeyT("buildkit/util/flightcontrol.progress")
|
||||||
|
|
||||||
type Group struct {
|
type Group struct {
|
||||||
mu sync.Mutex // protects m
|
mu sync.Mutex // protects m
|
||||||
m map[string]*call // lazily initialized
|
m map[string]*call // lazily initialized
|
||||||
|
@ -167,6 +171,9 @@ func (c *call) Err() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *call) Value(key interface{}) interface{} {
|
func (c *call) Value(key interface{}) interface{} {
|
||||||
|
if key == contextKey {
|
||||||
|
return c.progressState
|
||||||
|
}
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
for _, ctx := range append([]context.Context{c.progressCtx}, c.ctxs...) {
|
for _, ctx := range append([]context.Context{c.progressCtx}, c.ctxs...) {
|
||||||
|
@ -292,3 +299,13 @@ func (ps *progressState) close(pw progress.Writer) {
|
||||||
}
|
}
|
||||||
ps.mu.Unlock()
|
ps.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WriteProgress(ctx context.Context, pw progress.Writer) error {
|
||||||
|
v := ctx.Value(contextKey)
|
||||||
|
p, ok := v.(*progressState)
|
||||||
|
if !ok {
|
||||||
|
return errors.Errorf("invalid context not from flightcontrol")
|
||||||
|
}
|
||||||
|
p.add(pw)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue