progress: use unique id for vertex start/stop
This commit fixes more issues where vertex progress updates were using the same ID and incorrectly deduplicating with each other. A previous commit addressed this in solver/jobs.go but the same problem existed elsewhere. The update results in a random ID being used for vertex start/stop events as it is the simplest approach that works everywhere in a consistent pattern. Signed-off-by: Erik Sipsma <erik@sipsma.dev>master
parent
c393d5c66d
commit
a96f9a9f1f
|
@ -8,6 +8,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/identity"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver/errdefs"
|
||||
"github.com/moby/buildkit/util/flightcontrol"
|
||||
|
@ -417,7 +418,7 @@ func (jl *Solver) connectProgressFromState(target, src *state) {
|
|||
if _, ok := target.allPw[j.pw]; !ok {
|
||||
target.mpw.Add(j.pw)
|
||||
target.allPw[j.pw] = struct{}{}
|
||||
j.pw.Write(target.clientVertex.Digest.String(), target.clientVertex)
|
||||
j.pw.Write(identity.NewID(), target.clientVertex)
|
||||
if j.span != nil && j.span.SpanContext().IsValid() {
|
||||
target.mspan.Add(j.span)
|
||||
}
|
||||
|
@ -647,10 +648,10 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err
|
|||
}
|
||||
// no cache hit. start evaluating the node
|
||||
span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name())
|
||||
notifyStarted(ctx, &s.st.clientVertex, true, "load-cache")
|
||||
notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, true)
|
||||
res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec)
|
||||
tracing.FinishWithError(span, err)
|
||||
notifyCompleted(ctx, &s.st.clientVertex, err, true, "load-cache")
|
||||
notifyCompleted(err, true)
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -727,8 +728,8 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF
|
|||
if s.st.mspan.Span != nil {
|
||||
ctx = trace.ContextWithSpan(ctx, s.st.mspan)
|
||||
}
|
||||
notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey)
|
||||
notifyCompleted(ctx, &s.st.clientVertex, err, false, flightControlKey)
|
||||
notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false)
|
||||
notifyCompleted(err, false)
|
||||
return "", err
|
||||
}
|
||||
return key.(digest.Digest), nil
|
||||
|
@ -759,10 +760,10 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
|
|||
if len(s.st.vtx.Inputs()) == 0 {
|
||||
// no cache hit. start evaluating the node
|
||||
span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name())
|
||||
notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey)
|
||||
notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false)
|
||||
defer func() {
|
||||
tracing.FinishWithError(span, retErr)
|
||||
notifyCompleted(ctx, &s.st.clientVertex, retErr, false, flightControlKey)
|
||||
notifyCompleted(retErr, false)
|
||||
}()
|
||||
}
|
||||
res, done, err := op.CacheMap(ctx, s.st, len(s.cacheRes))
|
||||
|
@ -826,10 +827,10 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
|
|||
|
||||
// no cache hit. start evaluating the node
|
||||
span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name())
|
||||
notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey)
|
||||
notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false)
|
||||
defer func() {
|
||||
tracing.FinishWithError(span, retErr)
|
||||
notifyCompleted(ctx, &s.st.clientVertex, retErr, false, flightControlKey)
|
||||
notifyCompleted(retErr, false)
|
||||
}()
|
||||
|
||||
res, err := op.Exec(ctx, s.st, inputs)
|
||||
|
@ -929,33 +930,26 @@ func (v *vertexWithCacheOptions) Inputs() []Edge {
|
|||
return v.inputs
|
||||
}
|
||||
|
||||
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool, idSuffixes ...string) {
|
||||
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) func(err error, cached bool) {
|
||||
pw, _, _ := progress.NewFromContext(ctx)
|
||||
defer pw.Close()
|
||||
now := time.Now()
|
||||
v.Started = &now
|
||||
start := time.Now()
|
||||
v.Started = &start
|
||||
v.Completed = nil
|
||||
v.Cached = cached
|
||||
id := v.Digest.String() + strings.Join(idSuffixes, "-")
|
||||
id := identity.NewID()
|
||||
pw.Write(id, *v)
|
||||
}
|
||||
|
||||
func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool, idSuffixes ...string) {
|
||||
pw, _, _ := progress.NewFromContext(ctx)
|
||||
defer pw.Close()
|
||||
now := time.Now()
|
||||
if v.Started == nil {
|
||||
v.Started = &now
|
||||
return func(err error, cached bool) {
|
||||
defer pw.Close()
|
||||
stop := time.Now()
|
||||
v.Completed = &stop
|
||||
v.Cached = cached
|
||||
if err != nil {
|
||||
v.Error = err.Error()
|
||||
} else {
|
||||
v.Error = ""
|
||||
}
|
||||
pw.Write(id, *v)
|
||||
}
|
||||
v.Completed = &now
|
||||
v.Cached = cached
|
||||
if err != nil {
|
||||
v.Error = err.Error()
|
||||
} else {
|
||||
v.Error = ""
|
||||
}
|
||||
id := v.Digest.String() + strings.Join(idSuffixes, "-")
|
||||
pw.Write(id, *v)
|
||||
}
|
||||
|
||||
type SlowCacheError struct {
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/moby/buildkit/exporter/containerimage/exptypes"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
"github.com/moby/buildkit/frontend/gateway"
|
||||
"github.com/moby/buildkit/identity"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/util/buildinfo"
|
||||
|
@ -431,37 +432,32 @@ func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f
|
|||
}
|
||||
return b.InContext(ctx, func(ctx context.Context, g session.Group) error {
|
||||
pw, _, ctx := progress.NewFromContext(ctx, progress.WithMetadata("vertex", v.Digest))
|
||||
notifyStarted(ctx, &v, false)
|
||||
notifyCompleted := notifyStarted(ctx, &v, false)
|
||||
defer pw.Close()
|
||||
err := f(ctx, g)
|
||||
notifyCompleted(ctx, &v, err, false)
|
||||
notifyCompleted(err, false)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) {
|
||||
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) func(err error, cached bool) {
|
||||
pw, _, _ := progress.NewFromContext(ctx)
|
||||
defer pw.Close()
|
||||
now := time.Now()
|
||||
v.Started = &now
|
||||
start := time.Now()
|
||||
v.Started = &start
|
||||
v.Completed = nil
|
||||
v.Cached = cached
|
||||
pw.Write(v.Digest.String(), *v)
|
||||
}
|
||||
|
||||
func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool) {
|
||||
pw, _, _ := progress.NewFromContext(ctx)
|
||||
defer pw.Close()
|
||||
now := time.Now()
|
||||
if v.Started == nil {
|
||||
v.Started = &now
|
||||
id := identity.NewID()
|
||||
pw.Write(id, *v)
|
||||
return func(err error, cached bool) {
|
||||
defer pw.Close()
|
||||
stop := time.Now()
|
||||
v.Completed = &stop
|
||||
v.Cached = cached
|
||||
if err != nil {
|
||||
v.Error = err.Error()
|
||||
}
|
||||
pw.Write(id, *v)
|
||||
}
|
||||
v.Completed = &now
|
||||
v.Cached = cached
|
||||
if err != nil {
|
||||
v.Error = err.Error()
|
||||
}
|
||||
pw.Write(v.Digest.String(), *v)
|
||||
}
|
||||
|
||||
func supportedEntitlements(ents []string) []entitlements.Entitlement {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/identity"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
|
@ -16,6 +17,7 @@ type Controller struct {
|
|||
started *time.Time
|
||||
writer progress.Writer
|
||||
mu sync.Mutex
|
||||
id string
|
||||
|
||||
Digest digest.Digest
|
||||
Name string
|
||||
|
@ -34,10 +36,11 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) {
|
|||
now := time.Now()
|
||||
c.started = &now
|
||||
c.writer, _, _ = c.WriterFactory(ctx)
|
||||
c.id = identity.NewID()
|
||||
}
|
||||
|
||||
if c.Digest != "" {
|
||||
c.writer.Write(c.Digest.String(), client.Vertex{
|
||||
c.writer.Write(c.id, client.Vertex{
|
||||
Digest: c.Digest,
|
||||
Name: c.Name,
|
||||
Started: c.started,
|
||||
|
@ -56,7 +59,7 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) {
|
|||
errString = err.Error()
|
||||
}
|
||||
if c.Digest != "" {
|
||||
c.writer.Write(c.Digest.String(), client.Vertex{
|
||||
c.writer.Write(c.id, client.Vertex{
|
||||
Digest: c.Digest,
|
||||
Name: c.Name,
|
||||
Started: c.started,
|
||||
|
@ -67,6 +70,7 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) {
|
|||
}
|
||||
c.writer.Close()
|
||||
c.started = nil
|
||||
c.id = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue