diff --git a/solver/refcache.go b/solver/refcache.go index 4b8d3116..134afa0c 100644 --- a/solver/refcache.go +++ b/solver/refcache.go @@ -5,6 +5,7 @@ import ( "github.com/moby/buildkit/cache" "github.com/moby/buildkit/util/flightcontrol" + "github.com/moby/buildkit/util/progress" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/net/context" @@ -20,8 +21,9 @@ type refCache struct { } type cachedReq struct { - jobs map[*job]struct{} - value []*sharedRef + jobs map[*job]struct{} + value []*sharedRef + progressCtx context.Context } func (c *refCache) probe(j *job, key digest.Digest) bool { @@ -59,13 +61,14 @@ func (c *refCache) get(key digest.Digest) ([]cache.ImmutableRef, error) { } return refs, nil } -func (c *refCache) set(key digest.Digest, refs []cache.ImmutableRef) { +func (c *refCache) set(ctx context.Context, key digest.Digest, refs []cache.ImmutableRef) { c.mu.Lock() sharedRefs := make([]*sharedRef, 0, len(refs)) for _, r := range refs { sharedRefs = append(sharedRefs, newSharedRef(r)) } c.cache[key].value = sharedRefs + c.cache[key].progressCtx = ctx c.mu.Unlock() } func (c *refCache) cancel(j *job) { @@ -84,6 +87,24 @@ func (c *refCache) cancel(j *job) { c.mu.Unlock() } +func (c *refCache) writeProgressSnapshot(ctx context.Context, key digest.Digest) error { + pw, ok, _ := progress.FromContext(ctx) + if ok { + c.mu.Lock() + v, ok := c.cache[key] + if !ok { + c.mu.Unlock() + return errors.Errorf("no ref cache found") + } + pctx := v.progressCtx + c.mu.Unlock() + if pctx != nil { + return flightcontrol.WriteProgress(pctx, pw) + } + } + return nil +} + // sharedRef is a wrapper around releasable that allows you to make new // releasable child objects type sharedRef struct { diff --git a/solver/solver.go b/solver/solver.go index d40f1b07..889fc92f 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -133,14 +133,16 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac _, err := s.active.Do(ctx, g.dgst.String(), func(ctx context.Context) (interface{}, error) { if hit := s.active.probe(j, g.dgst); hit { + if err := s.active.writeProgressSnapshot(ctx, g.dgst); err != nil { + return nil, err + } return nil, nil } refs, err := s.runVertex(ctx, g, inputs) if err != nil { return nil, err } - - s.active.set(g.dgst, refs) + s.active.set(ctx, g.dgst, refs) return nil, nil }) if err != nil { diff --git a/util/flightcontrol/flightcontrol.go b/util/flightcontrol/flightcontrol.go index 0fdd368e..97fd9a76 100644 --- a/util/flightcontrol/flightcontrol.go +++ b/util/flightcontrol/flightcontrol.go @@ -17,6 +17,10 @@ import ( var errRetry = errors.Errorf("retry") +type contextKeyT string + +var contextKey = contextKeyT("buildkit/util/flightcontrol.progress") + type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized @@ -167,6 +171,9 @@ func (c *call) Err() error { } func (c *call) Value(key interface{}) interface{} { + if key == contextKey { + return c.progressState + } c.mu.Lock() defer c.mu.Unlock() for _, ctx := range append([]context.Context{c.progressCtx}, c.ctxs...) { @@ -292,3 +299,13 @@ func (ps *progressState) close(pw progress.Writer) { } ps.mu.Unlock() } + +func WriteProgress(ctx context.Context, pw progress.Writer) error { + v := ctx.Value(contextKey) + p, ok := v.(*progressState) + if !ok { + return errors.Errorf("invalid context not from flightcontrol") + } + p.add(pw) + return nil +}