From 39e19516fcd067be3e3b85f4de76521f511a9af2 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Tue, 29 May 2018 19:49:51 -0700 Subject: [PATCH] progressui: add better streaming text build status Signed-off-by: Tonis Tiigi --- cmd/buildctl/build.go | 21 +-- examples/build-using-dockerfile/main.go | 9 +- util/progress/progressui/display.go | 157 ++++++++++++++--- util/progress/progressui/printer.go | 214 ++++++++++++++++++++++++ 4 files changed, 360 insertions(+), 41 deletions(-) create mode 100644 util/progress/progressui/printer.go diff --git a/cmd/buildctl/build.go b/cmd/buildctl/build.go index 9e167193..6462faef 100644 --- a/cmd/buildctl/build.go +++ b/cmd/buildctl/build.go @@ -199,25 +199,14 @@ func build(clicontext *cli.Context) error { }) eg.Go(func() error { + var c console.Console if !clicontext.Bool("no-progress") { - if c, err := console.ConsoleFromFile(os.Stderr); err == nil { - // not using shared context to not disrupt display but let is finish reporting errors - return progressui.DisplaySolveStatus(context.TODO(), c, displayCh) + if cf, err := console.ConsoleFromFile(os.Stderr); err == nil { + c = cf } } - - for s := range displayCh { - for _, v := range s.Vertexes { - logrus.Debugf("vertex: %s %s %v %v", v.Digest, v.Name, v.Started, v.Completed) - } - for _, s := range s.Statuses { - logrus.Debugf("status: %s %s %d", s.Vertex, s.ID, s.Current) - } - for _, l := range s.Logs { - logrus.Debugf("log: %s\n%s", l.Vertex, l.Data) - } - } - return nil + // not using shared context to not disrupt display but let is finish reporting errors + return progressui.DisplaySolveStatus(context.TODO(), c, os.Stdout, displayCh) }) return eg.Wait() diff --git a/examples/build-using-dockerfile/main.go b/examples/build-using-dockerfile/main.go index 1772e52c..08143a8e 100644 --- a/examples/build-using-dockerfile/main.go +++ b/examples/build-using-dockerfile/main.go @@ -85,11 +85,12 @@ func action(clicontext *cli.Context) error { return err }) eg.Go(func() error { - if c, err := console.ConsoleFromFile(os.Stderr); err == nil { - // not using shared context to not disrupt display but let is finish reporting errors - return progressui.DisplaySolveStatus(context.TODO(), c, ch) + var c console.Console + if cn, err := console.ConsoleFromFile(os.Stderr); err == nil { + c = cn } - return nil + // not using shared context to not disrupt display but let is finish reporting errors + return progressui.DisplaySolveStatus(context.TODO(), c, os.Stdout, ch) }) eg.Go(func() error { if err := loadDockerTar(pipeR); err != nil { diff --git a/util/progress/progressui/display.go b/util/progress/progressui/display.go index 3f27a6f3..0a22ddd4 100644 --- a/util/progress/progressui/display.go +++ b/util/progress/progressui/display.go @@ -1,6 +1,7 @@ package progressui import ( + "bytes" "context" "fmt" "io" @@ -15,17 +16,21 @@ import ( "golang.org/x/time/rate" ) -func DisplaySolveStatus(ctx context.Context, c console.Console, ch chan *client.SolveStatus) error { - disp := &display{c: c} +func DisplaySolveStatus(ctx context.Context, c console.Console, w io.Writer, ch chan *client.SolveStatus) error { - t := newTrace() + modeConsole := c != nil + + disp := &display{c: c} + printer := &textMux{w: w} + + t := newTrace(w) + + var done bool ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() displayLimiter := rate.NewLimiter(rate.Every(70*time.Millisecond), 1) - var done bool - for { select { case <-ctx.Done(): @@ -39,12 +44,21 @@ func DisplaySolveStatus(ctx context.Context, c console.Console, ch chan *client. } } - if done { - disp.print(t.displayInfo(), true) - t.printErrorLogs(c) - return nil - } else if displayLimiter.Allow() { - disp.print(t.displayInfo(), false) + if modeConsole { + if done { + disp.print(t.displayInfo(), true) + t.printErrorLogs(c) + return nil + } else if displayLimiter.Allow() { + disp.print(t.displayInfo(), false) + } + } else { + if done || displayLimiter.Allow() { + printer.print(t) + if done { + return nil + } + } } } } @@ -66,37 +80,108 @@ type job struct { } type trace struct { + w io.Writer localTimeDiff time.Duration vertexes []*vertex byDigest map[digest.Digest]*vertex + nextIndex int + updates map[digest.Digest]struct{} } type vertex struct { *client.Vertex statuses []*status byID map[string]*status - logs []*client.VertexLog indent string + index int + + logs [][]byte + logsPartial bool + logsOffset int + prev *client.Vertex + events []string + lastBlockTime *time.Time + count int + statusUpdates map[string]struct{} +} + +func (v *vertex) update(c int) { + if v.count == 0 { + now := time.Now() + v.lastBlockTime = &now + } + v.count += c } type status struct { *client.VertexStatus } -func newTrace() *trace { +func newTrace(w io.Writer) *trace { return &trace{ byDigest: make(map[digest.Digest]*vertex), + updates: make(map[digest.Digest]struct{}), + w: w, } } +func (t *trace) triggerVertexEvent(v *client.Vertex) { + if v.Started == nil { + return + } + + var old client.Vertex + vtx := t.byDigest[v.Digest] + if v := vtx.prev; v != nil { + old = *v + } + + var ev []string + if v.Digest != old.Digest { + ev = append(ev, fmt.Sprintf("%13s %s", "digest:", v.Digest)) + } + if v.Name != old.Name { + ev = append(ev, fmt.Sprintf("%13s %q", "name:", v.Name)) + } + if v.Started != old.Started { + if v.Started != nil && old.Started == nil || !v.Started.Equal(*old.Started) { + ev = append(ev, fmt.Sprintf("%13s %v", "started:", v.Started)) + } + } + if v.Completed != old.Completed && v.Completed != nil { + ev = append(ev, fmt.Sprintf("%13s %v", "completed:", v.Completed)) + if v.Started != nil { + ev = append(ev, fmt.Sprintf("%13s %v", "duration:", v.Completed.Sub(*v.Started))) + } + } + if v.Cached != old.Cached { + ev = append(ev, fmt.Sprintf("%13s %v", "cached:", v.Cached)) + } + if v.Error != old.Error { + ev = append(ev, fmt.Sprintf("%13s %q", "error:", v.Error)) + } + + if len(ev) > 0 { + vtx.events = append(vtx.events, ev...) + vtx.update(len(ev)) + t.updates[v.Digest] = struct{}{} + } + + t.byDigest[v.Digest].prev = v +} + func (t *trace) update(s *client.SolveStatus) { for _, v := range s.Vertexes { prev, ok := t.byDigest[v.Digest] if !ok { + t.nextIndex++ t.byDigest[v.Digest] = &vertex{ - byID: make(map[string]*status), + byID: make(map[string]*status), + statusUpdates: make(map[string]struct{}), + index: t.nextIndex, } } + t.triggerVertexEvent(v) if v.Started != nil && (prev == nil || prev.Started == nil) { if t.localTimeDiff == 0 { t.localTimeDiff = time.Since(*v.Started) @@ -118,13 +203,29 @@ func (t *trace) update(s *client.SolveStatus) { v.statuses = append(v.statuses, v.byID[s.ID]) } v.byID[s.ID].VertexStatus = s + v.statusUpdates[s.ID] = struct{}{} + t.updates[v.Digest] = struct{}{} + v.update(1) } for _, l := range s.Logs { v, ok := t.byDigest[l.Vertex] if !ok { continue // shouldn't happen } - v.logs = append(v.logs, l) + complete := split(l.Data, byte('\n'), func(dt []byte) { + if v.logsPartial && len(v.logs) != 0 { + v.logs[len(v.logs)-1] = append(v.logs[len(v.logs)-1], dt...) + } else { + ts := time.Duration(0) + if v.Started != nil { + ts = l.Timestamp.Sub(*v.Started) + } + v.logs = append(v.logs, []byte(fmt.Sprintf("%s %s", fmt.Sprintf("%#.4g", ts.Seconds())[:5], dt))) + } + }) + v.logsPartial = !complete + t.updates[v.Digest] = struct{}{} + v.update(1) } } @@ -134,12 +235,8 @@ func (t *trace) printErrorLogs(f io.Writer) { fmt.Fprintln(f, "------") fmt.Fprintf(f, " > %s:\n", v.Name) for _, l := range v.logs { - switch l.Stream { - case 1: - f.Write(l.Data) - case 2: - f.Write(l.Data) - } + f.Write(l) + fmt.Fprintln(f) } fmt.Fprintln(f, "------") } @@ -196,6 +293,24 @@ func (t *trace) displayInfo() (d displayInfo) { return d } +func split(dt []byte, sep byte, fn func([]byte)) bool { + if len(dt) == 0 { + return false + } + for { + if len(dt) == 0 { + return true + } + idx := bytes.IndexByte(dt, sep) + if idx == -1 { + fn(dt) + return false + } + fn(dt[:idx]) + dt = dt[idx+1:] + } +} + func addTime(tm *time.Time, d time.Duration) *time.Time { if tm == nil { return nil diff --git a/util/progress/progressui/printer.go b/util/progress/progressui/printer.go new file mode 100644 index 00000000..41f333b4 --- /dev/null +++ b/util/progress/progressui/printer.go @@ -0,0 +1,214 @@ +package progressui + +import ( + "fmt" + "io" + "time" + + digest "github.com/opencontainers/go-digest" + "github.com/tonistiigi/units" +) + +const antiFlicker = 5 * time.Second +const maxDelay = 10 * time.Second + +type textMux struct { + w io.Writer + current digest.Digest +} + +func (p *textMux) printVtx(t *trace, dgst digest.Digest) { + v, ok := t.byDigest[dgst] + if !ok { + return + } + + if dgst != p.current { + if p.current != "" { + old := t.byDigest[p.current] + if old.logsPartial { + fmt.Fprintln(p.w, "") + } + old.logsOffset = 0 + old.count = 0 + fmt.Fprintf(p.w, "# ...\n") + } + + fmt.Fprintf(p.w, "\n# %d %s\n", v.index, limitString(v.Name, 72)) + } + + if len(v.events) != 0 { + v.logsOffset = 0 + } + for _, ev := range v.events { + fmt.Fprintf(p.w, "# %s\n", ev) + } + v.events = v.events[:0] + + for _, s := range v.statuses { + if _, ok := v.statusUpdates[s.ID]; ok { + var bytes string + if s.Total != 0 { + bytes = fmt.Sprintf(" %.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total)) + } else if s.Current != 0 { + bytes = fmt.Sprintf(" %.2f", units.Bytes(s.Current)) + } + var tm string + endTime := s.Timestamp + if s.Completed != nil { + endTime = *s.Completed + } + if s.Started != nil { + diff := endTime.Sub(*s.Started).Seconds() + if diff > 0.01 { + tm = fmt.Sprintf(" %.1fs", diff) + } + } + if s.Completed != nil { + tm += " done" + } + fmt.Fprintf(p.w, "# %s%s%s\n", s.ID, bytes, tm) + } + } + v.statusUpdates = map[string]struct{}{} + + for i, l := range v.logs { + if i == 0 { + l = l[v.logsOffset:] + } + fmt.Fprintf(p.w, "%s", []byte(l)) + if i != len(v.logs)-1 || !v.logsPartial { + fmt.Fprintln(p.w, "") + } + } + + if len(v.logs) > 0 { + if v.logsPartial { + v.logs = v.logs[len(v.logs)-1:] + v.logsOffset = len(v.logs[0]) + } else { + v.logs = nil + v.logsOffset = 0 + } + } + + p.current = dgst + + if v.Completed != nil { + p.current = "" + v.count = 0 + fmt.Fprintf(p.w, "\n") + } + + delete(t.updates, dgst) +} + +func (p *textMux) print(t *trace) { + + completed := map[digest.Digest]struct{}{} + rest := map[digest.Digest]struct{}{} + + for dgst := range t.updates { + v, ok := t.byDigest[dgst] + if !ok { + continue + } + if v.Vertex.Completed != nil { + completed[dgst] = struct{}{} + } else { + rest[dgst] = struct{}{} + } + } + + current := p.current + + // items that have completed need to be printed first + if _, ok := completed[current]; ok { + p.printVtx(t, current) + } + + for dgst := range completed { + if dgst != current { + p.printVtx(t, dgst) + } + } + + if len(rest) == 0 { + if current != "" { + if v := t.byDigest[current]; v.Started != nil && v.Completed == nil { + return + } + } + // make any open vertex active + for dgst, v := range t.byDigest { + if v.Started != nil && v.Completed == nil { + p.printVtx(t, dgst) + return + } + } + return + } + + // now print the active one + if _, ok := rest[current]; ok { + p.printVtx(t, current) + } + + stats := map[digest.Digest]*vtxStat{} + now := time.Now() + sum := 0.0 + var max digest.Digest + if current != "" { + rest[current] = struct{}{} + } + for dgst := range rest { + v, ok := t.byDigest[dgst] + if !ok { + continue + } + tm := now.Sub(*v.lastBlockTime) + speed := float64(v.count) / tm.Seconds() + overLimit := tm > maxDelay && dgst != current + stats[dgst] = &vtxStat{blockTime: tm, speed: speed, overLimit: overLimit} + sum += speed + if overLimit || max == "" || stats[max].speed < speed { + max = dgst + } + } + for dgst := range stats { + stats[dgst].share = stats[dgst].speed / sum + } + + if _, ok := completed[current]; ok || current == "" { + p.printVtx(t, max) + return + } + + // show items that were hidden + for dgst := range rest { + if stats[dgst].overLimit { + p.printVtx(t, dgst) + return + } + } + + // fair split between vertexes + if 1.0/(1.0-stats[current].share)*antiFlicker.Seconds() < stats[current].blockTime.Seconds() { + p.printVtx(t, max) + return + } +} + +type vtxStat struct { + blockTime time.Duration + speed float64 + share float64 + overLimit bool +} + +func limitString(s string, l int) string { + if len(s) > l { + return s[:l] + "..." + } + return s +}