Merge pull request #2675 from sipsma/fix-vertex-startstop

progress: use unique id for vertex start/stop
master
Tõnis Tiigi 2022-02-24 20:41:23 -08:00 committed by GitHub
commit 27cc24a648
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 54 deletions

View File

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/errdefs" "github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/flightcontrol"
@ -417,7 +418,7 @@ func (jl *Solver) connectProgressFromState(target, src *state) {
if _, ok := target.allPw[j.pw]; !ok { if _, ok := target.allPw[j.pw]; !ok {
target.mpw.Add(j.pw) target.mpw.Add(j.pw)
target.allPw[j.pw] = struct{}{} target.allPw[j.pw] = struct{}{}
j.pw.Write(target.clientVertex.Digest.String(), target.clientVertex) j.pw.Write(identity.NewID(), target.clientVertex)
if j.span != nil && j.span.SpanContext().IsValid() { if j.span != nil && j.span.SpanContext().IsValid() {
target.mspan.Add(j.span) target.mspan.Add(j.span)
} }
@ -647,10 +648,10 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err
} }
// no cache hit. start evaluating the node // no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name()) span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name())
notifyStarted(ctx, &s.st.clientVertex, true, "load-cache") notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, true)
res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec) res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec)
tracing.FinishWithError(span, err) tracing.FinishWithError(span, err)
notifyCompleted(ctx, &s.st.clientVertex, err, true, "load-cache") notifyCompleted(err, true)
return res, err return res, err
} }
@ -727,8 +728,8 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF
if s.st.mspan.Span != nil { if s.st.mspan.Span != nil {
ctx = trace.ContextWithSpan(ctx, s.st.mspan) ctx = trace.ContextWithSpan(ctx, s.st.mspan)
} }
notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey) notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false)
notifyCompleted(ctx, &s.st.clientVertex, err, false, flightControlKey) notifyCompleted(err, false)
return "", err return "", err
} }
return key.(digest.Digest), nil return key.(digest.Digest), nil
@ -759,10 +760,10 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
if len(s.st.vtx.Inputs()) == 0 { if len(s.st.vtx.Inputs()) == 0 {
// no cache hit. start evaluating the node // no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name()) span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name())
notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey) notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false)
defer func() { defer func() {
tracing.FinishWithError(span, retErr) tracing.FinishWithError(span, retErr)
notifyCompleted(ctx, &s.st.clientVertex, retErr, false, flightControlKey) notifyCompleted(retErr, false)
}() }()
} }
res, done, err := op.CacheMap(ctx, s.st, len(s.cacheRes)) res, done, err := op.CacheMap(ctx, s.st, len(s.cacheRes))
@ -826,10 +827,10 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
// no cache hit. start evaluating the node // no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name()) span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name())
notifyStarted(ctx, &s.st.clientVertex, false, flightControlKey) notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, false)
defer func() { defer func() {
tracing.FinishWithError(span, retErr) tracing.FinishWithError(span, retErr)
notifyCompleted(ctx, &s.st.clientVertex, retErr, false, flightControlKey) notifyCompleted(retErr, false)
}() }()
res, err := op.Exec(ctx, s.st, inputs) res, err := op.Exec(ctx, s.st, inputs)
@ -929,33 +930,26 @@ func (v *vertexWithCacheOptions) Inputs() []Edge {
return v.inputs return v.inputs
} }
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool, idSuffixes ...string) { func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) func(err error, cached bool) {
pw, _, _ := progress.NewFromContext(ctx) pw, _, _ := progress.NewFromContext(ctx)
defer pw.Close() start := time.Now()
now := time.Now() v.Started = &start
v.Started = &now
v.Completed = nil v.Completed = nil
v.Cached = cached v.Cached = cached
id := v.Digest.String() + strings.Join(idSuffixes, "-") id := identity.NewID()
pw.Write(id, *v) pw.Write(id, *v)
} return func(err error, cached bool) {
func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool, idSuffixes ...string) {
pw, _, _ := progress.NewFromContext(ctx)
defer pw.Close() defer pw.Close()
now := time.Now() stop := time.Now()
if v.Started == nil { v.Completed = &stop
v.Started = &now
}
v.Completed = &now
v.Cached = cached v.Cached = cached
if err != nil { if err != nil {
v.Error = err.Error() v.Error = err.Error()
} else { } else {
v.Error = "" v.Error = ""
} }
id := v.Digest.String() + strings.Join(idSuffixes, "-")
pw.Write(id, *v) pw.Write(id, *v)
}
} }
type SlowCacheError struct { type SlowCacheError struct {

View File

@ -16,6 +16,7 @@ import (
"github.com/moby/buildkit/exporter/containerimage/exptypes" "github.com/moby/buildkit/exporter/containerimage/exptypes"
"github.com/moby/buildkit/frontend" "github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/gateway" "github.com/moby/buildkit/frontend/gateway"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/buildinfo" "github.com/moby/buildkit/util/buildinfo"
@ -431,37 +432,32 @@ func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f
} }
return b.InContext(ctx, func(ctx context.Context, g session.Group) error { return b.InContext(ctx, func(ctx context.Context, g session.Group) error {
pw, _, ctx := progress.NewFromContext(ctx, progress.WithMetadata("vertex", v.Digest)) pw, _, ctx := progress.NewFromContext(ctx, progress.WithMetadata("vertex", v.Digest))
notifyStarted(ctx, &v, false) notifyCompleted := notifyStarted(ctx, &v, false)
defer pw.Close() defer pw.Close()
err := f(ctx, g) err := f(ctx, g)
notifyCompleted(ctx, &v, err, false) notifyCompleted(err, false)
return err return err
}) })
} }
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) { func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) func(err error, cached bool) {
pw, _, _ := progress.NewFromContext(ctx) pw, _, _ := progress.NewFromContext(ctx)
defer pw.Close() start := time.Now()
now := time.Now() v.Started = &start
v.Started = &now
v.Completed = nil v.Completed = nil
v.Cached = cached v.Cached = cached
pw.Write(v.Digest.String(), *v) id := identity.NewID()
} pw.Write(id, *v)
return func(err error, cached bool) {
func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool) {
pw, _, _ := progress.NewFromContext(ctx)
defer pw.Close() defer pw.Close()
now := time.Now() stop := time.Now()
if v.Started == nil { v.Completed = &stop
v.Started = &now
}
v.Completed = &now
v.Cached = cached v.Cached = cached
if err != nil { if err != nil {
v.Error = err.Error() v.Error = err.Error()
} }
pw.Write(v.Digest.String(), *v) pw.Write(id, *v)
}
} }
func supportedEntitlements(ents []string) []entitlements.Entitlement { func supportedEntitlements(ents []string) []entitlements.Entitlement {

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
@ -16,6 +17,7 @@ type Controller struct {
started *time.Time started *time.Time
writer progress.Writer writer progress.Writer
mu sync.Mutex mu sync.Mutex
id string
Digest digest.Digest Digest digest.Digest
Name string Name string
@ -34,10 +36,11 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) {
now := time.Now() now := time.Now()
c.started = &now c.started = &now
c.writer, _, _ = c.WriterFactory(ctx) c.writer, _, _ = c.WriterFactory(ctx)
c.id = identity.NewID()
} }
if c.Digest != "" { if c.Digest != "" {
c.writer.Write(c.Digest.String(), client.Vertex{ c.writer.Write(c.id, client.Vertex{
Digest: c.Digest, Digest: c.Digest,
Name: c.Name, Name: c.Name,
Started: c.started, Started: c.started,
@ -56,7 +59,7 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) {
errString = err.Error() errString = err.Error()
} }
if c.Digest != "" { if c.Digest != "" {
c.writer.Write(c.Digest.String(), client.Vertex{ c.writer.Write(c.id, client.Vertex{
Digest: c.Digest, Digest: c.Digest,
Name: c.Name, Name: c.Name,
Started: c.started, Started: c.started,
@ -67,6 +70,7 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) {
} }
c.writer.Close() c.writer.Close()
c.started = nil c.started = nil
c.id = ""
} }
} }
} }