From a96f9a9f1fc0094c96bcd735dbc5f003bb372fdd Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Thu, 24 Feb 2022 12:30:41 -0800 Subject: [PATCH] progress: use unique id for vertex start/stop This commit fixes more issues where vertex progress updates were using the same ID and incorrectly deduplicating with each other. A previous commit addressed this in solver/jobs.go but the same problem existed elsewhere. The update results in a random ID being used for vertex start/stop events as it is the simplest approach that works everywhere in a consistent pattern. Signed-off-by: Erik Sipsma --- solver/jobs.go | 56 ++++++++++++-------------- solver/llbsolver/solver.go | 38 ++++++++--------- util/progress/controller/controller.go | 8 +++- 3 files changed, 48 insertions(+), 54 deletions(-) diff --git a/solver/jobs.go b/solver/jobs.go index 37df6fa9..0d59fb36 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -8,6 +8,7 @@ import ( "time" "github.com/moby/buildkit/client" + "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver/errdefs" "github.com/moby/buildkit/util/flightcontrol" @@ -417,7 +418,7 @@ func (jl *Solver) connectProgressFromState(target, src *state) { if _, ok := target.allPw[j.pw]; !ok { target.mpw.Add(j.pw) target.allPw[j.pw] = struct{}{} - j.pw.Write(target.clientVertex.Digest.String(), target.clientVertex) + j.pw.Write(identity.NewID(), target.clientVertex) if j.span != nil && j.span.SpanContext().IsValid() { target.mspan.Add(j.span) } @@ -647,10 +648,10 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err } // no cache hit. start evaluating the node span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name()) - notifyStarted(ctx, &s.st.clientVertex, true, "load-cache") + notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, true) res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec) tracing.FinishWithError(span, err) - notifyCompleted(ctx, &s.st.clientVertex, err, true, "load-cache") + notifyCompleted(err, true) return res, err } @@ -727,8 +728,8 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF if s.st.mspan.Span != nil { ctx = trace.ContextWithSpan(ctx, s.st.mspan) } - notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey) - notifyCompleted(ctx, &s.st.clientVertex, err, false, flightControlKey) + notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false) + notifyCompleted(err, false) return "", err } return key.(digest.Digest), nil @@ -759,10 +760,10 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp, if len(s.st.vtx.Inputs()) == 0 { // no cache hit. start evaluating the node span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name()) - notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey) + notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false) defer func() { tracing.FinishWithError(span, retErr) - notifyCompleted(ctx, &s.st.clientVertex, retErr, false, flightControlKey) + notifyCompleted(retErr, false) }() } res, done, err := op.CacheMap(ctx, s.st, len(s.cacheRes)) @@ -826,10 +827,10 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, // no cache hit. start evaluating the node span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name()) - notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey) + notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false) defer func() { tracing.FinishWithError(span, retErr) - notifyCompleted(ctx, &s.st.clientVertex, retErr, false, flightControlKey) + notifyCompleted(retErr, false) }() res, err := op.Exec(ctx, s.st, inputs) @@ -929,33 +930,26 @@ func (v *vertexWithCacheOptions) Inputs() []Edge { return v.inputs } -func notifyStarted(ctx context.Context, v *client.Vertex, cached bool, idSuffixes ...string) { +func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) func(err error, cached bool) { pw, _, _ := progress.NewFromContext(ctx) - defer pw.Close() - now := time.Now() - v.Started = &now + start := time.Now() + v.Started = &start v.Completed = nil v.Cached = cached - id := v.Digest.String() + strings.Join(idSuffixes, "-") + id := identity.NewID() pw.Write(id, *v) -} - -func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool, idSuffixes ...string) { - pw, _, _ := progress.NewFromContext(ctx) - defer pw.Close() - now := time.Now() - if v.Started == nil { - v.Started = &now + return func(err error, cached bool) { + defer pw.Close() + stop := time.Now() + v.Completed = &stop + v.Cached = cached + if err != nil { + v.Error = err.Error() + } else { + v.Error = "" + } + pw.Write(id, *v) } - v.Completed = &now - v.Cached = cached - if err != nil { - v.Error = err.Error() - } else { - v.Error = "" - } - id := v.Digest.String() + strings.Join(idSuffixes, "-") - pw.Write(id, *v) } type SlowCacheError struct { diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 0a203f1f..7e2fb174 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -16,6 +16,7 @@ import ( "github.com/moby/buildkit/exporter/containerimage/exptypes" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/frontend/gateway" + "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/buildinfo" @@ -431,37 +432,32 @@ func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f } return b.InContext(ctx, func(ctx context.Context, g session.Group) error { pw, _, ctx := progress.NewFromContext(ctx, progress.WithMetadata("vertex", v.Digest)) - notifyStarted(ctx, &v, false) + notifyCompleted := notifyStarted(ctx, &v, false) defer pw.Close() err := f(ctx, g) - notifyCompleted(ctx, &v, err, false) + notifyCompleted(err, false) return err }) } -func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) { +func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) func(err error, cached bool) { pw, _, _ := progress.NewFromContext(ctx) - defer pw.Close() - now := time.Now() - v.Started = &now + start := time.Now() + v.Started = &start v.Completed = nil v.Cached = cached - pw.Write(v.Digest.String(), *v) -} - -func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool) { - pw, _, _ := progress.NewFromContext(ctx) - defer pw.Close() - now := time.Now() - if v.Started == nil { - v.Started = &now + id := identity.NewID() + pw.Write(id, *v) + return func(err error, cached bool) { + defer pw.Close() + stop := time.Now() + v.Completed = &stop + v.Cached = cached + if err != nil { + v.Error = err.Error() + } + pw.Write(id, *v) } - v.Completed = &now - v.Cached = cached - if err != nil { - v.Error = err.Error() - } - pw.Write(v.Digest.String(), *v) } func supportedEntitlements(ents []string) []entitlements.Entitlement { diff --git a/util/progress/controller/controller.go b/util/progress/controller/controller.go index df9e4281..3d9693d4 100644 --- a/util/progress/controller/controller.go +++ b/util/progress/controller/controller.go @@ -6,6 +6,7 @@ import ( "time" "github.com/moby/buildkit/client" + "github.com/moby/buildkit/identity" "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/progress" digest "github.com/opencontainers/go-digest" @@ -16,6 +17,7 @@ type Controller struct { started *time.Time writer progress.Writer mu sync.Mutex + id string Digest digest.Digest Name string @@ -34,10 +36,11 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) { now := time.Now() c.started = &now c.writer, _, _ = c.WriterFactory(ctx) + c.id = identity.NewID() } if c.Digest != "" { - c.writer.Write(c.Digest.String(), client.Vertex{ + c.writer.Write(c.id, client.Vertex{ Digest: c.Digest, Name: c.Name, Started: c.started, @@ -56,7 +59,7 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) { errString = err.Error() } if c.Digest != "" { - c.writer.Write(c.Digest.String(), client.Vertex{ + c.writer.Write(c.id, client.Vertex{ Digest: c.Digest, Name: c.Name, Started: c.started, @@ -67,6 +70,7 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) { } c.writer.Close() c.started = nil + c.id = "" } } }