progressui: add better streaming text build status

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2018-05-29 19:49:51 -07:00
parent 376093d234
commit 39e19516fc
4 changed files with 360 additions and 41 deletions

View File

@ -199,25 +199,14 @@ func build(clicontext *cli.Context) error {
})
eg.Go(func() error {
var c console.Console
if !clicontext.Bool("no-progress") {
if c, err := console.ConsoleFromFile(os.Stderr); err == nil {
// not using shared context to not disrupt display but let is finish reporting errors
return progressui.DisplaySolveStatus(context.TODO(), c, displayCh)
if cf, err := console.ConsoleFromFile(os.Stderr); err == nil {
c = cf
}
}
for s := range displayCh {
for _, v := range s.Vertexes {
logrus.Debugf("vertex: %s %s %v %v", v.Digest, v.Name, v.Started, v.Completed)
}
for _, s := range s.Statuses {
logrus.Debugf("status: %s %s %d", s.Vertex, s.ID, s.Current)
}
for _, l := range s.Logs {
logrus.Debugf("log: %s\n%s", l.Vertex, l.Data)
}
}
return nil
// not using shared context to not disrupt display but let is finish reporting errors
return progressui.DisplaySolveStatus(context.TODO(), c, os.Stdout, displayCh)
})
return eg.Wait()

View File

@ -85,11 +85,12 @@ func action(clicontext *cli.Context) error {
return err
})
eg.Go(func() error {
if c, err := console.ConsoleFromFile(os.Stderr); err == nil {
// not using shared context to not disrupt display but let is finish reporting errors
return progressui.DisplaySolveStatus(context.TODO(), c, ch)
var c console.Console
if cn, err := console.ConsoleFromFile(os.Stderr); err == nil {
c = cn
}
return nil
// not using shared context to not disrupt display but let is finish reporting errors
return progressui.DisplaySolveStatus(context.TODO(), c, os.Stdout, ch)
})
eg.Go(func() error {
if err := loadDockerTar(pipeR); err != nil {

View File

@ -1,6 +1,7 @@
package progressui
import (
"bytes"
"context"
"fmt"
"io"
@ -15,17 +16,21 @@ import (
"golang.org/x/time/rate"
)
func DisplaySolveStatus(ctx context.Context, c console.Console, ch chan *client.SolveStatus) error {
disp := &display{c: c}
func DisplaySolveStatus(ctx context.Context, c console.Console, w io.Writer, ch chan *client.SolveStatus) error {
t := newTrace()
modeConsole := c != nil
disp := &display{c: c}
printer := &textMux{w: w}
t := newTrace(w)
var done bool
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
displayLimiter := rate.NewLimiter(rate.Every(70*time.Millisecond), 1)
var done bool
for {
select {
case <-ctx.Done():
@ -39,12 +44,21 @@ func DisplaySolveStatus(ctx context.Context, c console.Console, ch chan *client.
}
}
if done {
disp.print(t.displayInfo(), true)
t.printErrorLogs(c)
return nil
} else if displayLimiter.Allow() {
disp.print(t.displayInfo(), false)
if modeConsole {
if done {
disp.print(t.displayInfo(), true)
t.printErrorLogs(c)
return nil
} else if displayLimiter.Allow() {
disp.print(t.displayInfo(), false)
}
} else {
if done || displayLimiter.Allow() {
printer.print(t)
if done {
return nil
}
}
}
}
}
@ -66,37 +80,108 @@ type job struct {
}
type trace struct {
w io.Writer
localTimeDiff time.Duration
vertexes []*vertex
byDigest map[digest.Digest]*vertex
nextIndex int
updates map[digest.Digest]struct{}
}
type vertex struct {
*client.Vertex
statuses []*status
byID map[string]*status
logs []*client.VertexLog
indent string
index int
logs [][]byte
logsPartial bool
logsOffset int
prev *client.Vertex
events []string
lastBlockTime *time.Time
count int
statusUpdates map[string]struct{}
}
func (v *vertex) update(c int) {
if v.count == 0 {
now := time.Now()
v.lastBlockTime = &now
}
v.count += c
}
type status struct {
*client.VertexStatus
}
func newTrace() *trace {
func newTrace(w io.Writer) *trace {
return &trace{
byDigest: make(map[digest.Digest]*vertex),
updates: make(map[digest.Digest]struct{}),
w: w,
}
}
func (t *trace) triggerVertexEvent(v *client.Vertex) {
if v.Started == nil {
return
}
var old client.Vertex
vtx := t.byDigest[v.Digest]
if v := vtx.prev; v != nil {
old = *v
}
var ev []string
if v.Digest != old.Digest {
ev = append(ev, fmt.Sprintf("%13s %s", "digest:", v.Digest))
}
if v.Name != old.Name {
ev = append(ev, fmt.Sprintf("%13s %q", "name:", v.Name))
}
if v.Started != old.Started {
if v.Started != nil && old.Started == nil || !v.Started.Equal(*old.Started) {
ev = append(ev, fmt.Sprintf("%13s %v", "started:", v.Started))
}
}
if v.Completed != old.Completed && v.Completed != nil {
ev = append(ev, fmt.Sprintf("%13s %v", "completed:", v.Completed))
if v.Started != nil {
ev = append(ev, fmt.Sprintf("%13s %v", "duration:", v.Completed.Sub(*v.Started)))
}
}
if v.Cached != old.Cached {
ev = append(ev, fmt.Sprintf("%13s %v", "cached:", v.Cached))
}
if v.Error != old.Error {
ev = append(ev, fmt.Sprintf("%13s %q", "error:", v.Error))
}
if len(ev) > 0 {
vtx.events = append(vtx.events, ev...)
vtx.update(len(ev))
t.updates[v.Digest] = struct{}{}
}
t.byDigest[v.Digest].prev = v
}
func (t *trace) update(s *client.SolveStatus) {
for _, v := range s.Vertexes {
prev, ok := t.byDigest[v.Digest]
if !ok {
t.nextIndex++
t.byDigest[v.Digest] = &vertex{
byID: make(map[string]*status),
byID: make(map[string]*status),
statusUpdates: make(map[string]struct{}),
index: t.nextIndex,
}
}
t.triggerVertexEvent(v)
if v.Started != nil && (prev == nil || prev.Started == nil) {
if t.localTimeDiff == 0 {
t.localTimeDiff = time.Since(*v.Started)
@ -118,13 +203,29 @@ func (t *trace) update(s *client.SolveStatus) {
v.statuses = append(v.statuses, v.byID[s.ID])
}
v.byID[s.ID].VertexStatus = s
v.statusUpdates[s.ID] = struct{}{}
t.updates[v.Digest] = struct{}{}
v.update(1)
}
for _, l := range s.Logs {
v, ok := t.byDigest[l.Vertex]
if !ok {
continue // shouldn't happen
}
v.logs = append(v.logs, l)
complete := split(l.Data, byte('\n'), func(dt []byte) {
if v.logsPartial && len(v.logs) != 0 {
v.logs[len(v.logs)-1] = append(v.logs[len(v.logs)-1], dt...)
} else {
ts := time.Duration(0)
if v.Started != nil {
ts = l.Timestamp.Sub(*v.Started)
}
v.logs = append(v.logs, []byte(fmt.Sprintf("%s %s", fmt.Sprintf("%#.4g", ts.Seconds())[:5], dt)))
}
})
v.logsPartial = !complete
t.updates[v.Digest] = struct{}{}
v.update(1)
}
}
@ -134,12 +235,8 @@ func (t *trace) printErrorLogs(f io.Writer) {
fmt.Fprintln(f, "------")
fmt.Fprintf(f, " > %s:\n", v.Name)
for _, l := range v.logs {
switch l.Stream {
case 1:
f.Write(l.Data)
case 2:
f.Write(l.Data)
}
f.Write(l)
fmt.Fprintln(f)
}
fmt.Fprintln(f, "------")
}
@ -196,6 +293,24 @@ func (t *trace) displayInfo() (d displayInfo) {
return d
}
func split(dt []byte, sep byte, fn func([]byte)) bool {
if len(dt) == 0 {
return false
}
for {
if len(dt) == 0 {
return true
}
idx := bytes.IndexByte(dt, sep)
if idx == -1 {
fn(dt)
return false
}
fn(dt[:idx])
dt = dt[idx+1:]
}
}
func addTime(tm *time.Time, d time.Duration) *time.Time {
if tm == nil {
return nil

View File

@ -0,0 +1,214 @@
package progressui
import (
"fmt"
"io"
"time"
digest "github.com/opencontainers/go-digest"
"github.com/tonistiigi/units"
)
const antiFlicker = 5 * time.Second
const maxDelay = 10 * time.Second
type textMux struct {
w io.Writer
current digest.Digest
}
func (p *textMux) printVtx(t *trace, dgst digest.Digest) {
v, ok := t.byDigest[dgst]
if !ok {
return
}
if dgst != p.current {
if p.current != "" {
old := t.byDigest[p.current]
if old.logsPartial {
fmt.Fprintln(p.w, "")
}
old.logsOffset = 0
old.count = 0
fmt.Fprintf(p.w, "# ...\n")
}
fmt.Fprintf(p.w, "\n# %d %s\n", v.index, limitString(v.Name, 72))
}
if len(v.events) != 0 {
v.logsOffset = 0
}
for _, ev := range v.events {
fmt.Fprintf(p.w, "# %s\n", ev)
}
v.events = v.events[:0]
for _, s := range v.statuses {
if _, ok := v.statusUpdates[s.ID]; ok {
var bytes string
if s.Total != 0 {
bytes = fmt.Sprintf(" %.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total))
} else if s.Current != 0 {
bytes = fmt.Sprintf(" %.2f", units.Bytes(s.Current))
}
var tm string
endTime := s.Timestamp
if s.Completed != nil {
endTime = *s.Completed
}
if s.Started != nil {
diff := endTime.Sub(*s.Started).Seconds()
if diff > 0.01 {
tm = fmt.Sprintf(" %.1fs", diff)
}
}
if s.Completed != nil {
tm += " done"
}
fmt.Fprintf(p.w, "# %s%s%s\n", s.ID, bytes, tm)
}
}
v.statusUpdates = map[string]struct{}{}
for i, l := range v.logs {
if i == 0 {
l = l[v.logsOffset:]
}
fmt.Fprintf(p.w, "%s", []byte(l))
if i != len(v.logs)-1 || !v.logsPartial {
fmt.Fprintln(p.w, "")
}
}
if len(v.logs) > 0 {
if v.logsPartial {
v.logs = v.logs[len(v.logs)-1:]
v.logsOffset = len(v.logs[0])
} else {
v.logs = nil
v.logsOffset = 0
}
}
p.current = dgst
if v.Completed != nil {
p.current = ""
v.count = 0
fmt.Fprintf(p.w, "\n")
}
delete(t.updates, dgst)
}
func (p *textMux) print(t *trace) {
completed := map[digest.Digest]struct{}{}
rest := map[digest.Digest]struct{}{}
for dgst := range t.updates {
v, ok := t.byDigest[dgst]
if !ok {
continue
}
if v.Vertex.Completed != nil {
completed[dgst] = struct{}{}
} else {
rest[dgst] = struct{}{}
}
}
current := p.current
// items that have completed need to be printed first
if _, ok := completed[current]; ok {
p.printVtx(t, current)
}
for dgst := range completed {
if dgst != current {
p.printVtx(t, dgst)
}
}
if len(rest) == 0 {
if current != "" {
if v := t.byDigest[current]; v.Started != nil && v.Completed == nil {
return
}
}
// make any open vertex active
for dgst, v := range t.byDigest {
if v.Started != nil && v.Completed == nil {
p.printVtx(t, dgst)
return
}
}
return
}
// now print the active one
if _, ok := rest[current]; ok {
p.printVtx(t, current)
}
stats := map[digest.Digest]*vtxStat{}
now := time.Now()
sum := 0.0
var max digest.Digest
if current != "" {
rest[current] = struct{}{}
}
for dgst := range rest {
v, ok := t.byDigest[dgst]
if !ok {
continue
}
tm := now.Sub(*v.lastBlockTime)
speed := float64(v.count) / tm.Seconds()
overLimit := tm > maxDelay && dgst != current
stats[dgst] = &vtxStat{blockTime: tm, speed: speed, overLimit: overLimit}
sum += speed
if overLimit || max == "" || stats[max].speed < speed {
max = dgst
}
}
for dgst := range stats {
stats[dgst].share = stats[dgst].speed / sum
}
if _, ok := completed[current]; ok || current == "" {
p.printVtx(t, max)
return
}
// show items that were hidden
for dgst := range rest {
if stats[dgst].overLimit {
p.printVtx(t, dgst)
return
}
}
// fair split between vertexes
if 1.0/(1.0-stats[current].share)*antiFlicker.Seconds() < stats[current].blockTime.Seconds() {
p.printVtx(t, max)
return
}
}
type vtxStat struct {
blockTime time.Duration
speed float64
share float64
overLimit bool
}
func limitString(s string, l int) string {
if len(s) > l {
return s[:l] + "..."
}
return s
}