util: progress reporting facility

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2017-05-30 17:45:58 -07:00
parent b15a254867
commit 46339fb6fd
2 changed files with 231 additions and 10 deletions

View File

@ -2,17 +2,31 @@ package progress
import ( import (
"context" "context"
"sync"
"sync/atomic"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
func FromContext(ctx context.Context, name string) (*ProgressWriter, bool, context.Context) { type contextKeyT string
return nil, false, ctx
var contextKey = contextKeyT("buildkit/util/progress")
func FromContext(ctx context.Context, name string) (ProgressWriter, bool, context.Context) {
pw, ok := ctx.Value(contextKey).(*progressWriter)
if !ok {
return &noOpWriter{}, false, ctx
}
pw = newWriter(pw, name)
ctx = context.WithValue(ctx, contextKey, pw)
return pw, false, ctx
} }
func NewContext(ctx context.Context) (*ProgressReader, context.Context) { func NewContext(ctx context.Context) (ProgressReader, context.Context, func()) {
return nil, ctx pr, pw, cancel := pipe()
ctx = context.WithValue(ctx, contextKey, pw)
return pr, ctx, cancel
} }
type ProgressWriter interface { type ProgressWriter interface {
@ -32,26 +46,161 @@ type Progress struct {
// ...progress of an action // ...progress of an action
Action string Action string
Current int64 Current int
Total int64 Total int
Timestamp time.Time Timestamp time.Time
Done bool Done bool
} }
type progressReader struct{} type progressReader struct {
ctx context.Context
cond *sync.Cond
mu sync.Mutex
handles []*streamHandle
}
type streamHandle struct {
pw *progressWriter
lastP *Progress
}
func (sh *streamHandle) next() (*Progress, bool) {
last := sh.pw.lastP.Load().(*Progress)
if last != sh.lastP {
sh.lastP = last
return last, true
}
return nil, false
}
func (pr *progressReader) Read(ctx context.Context) (*Progress, error) { func (pr *progressReader) Read(ctx context.Context) (*Progress, error) {
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-done:
case <-ctx.Done():
pr.cond.Broadcast()
}
}()
pr.mu.Lock()
for {
select {
case <-ctx.Done():
pr.mu.Unlock()
return nil, ctx.Err()
default:
}
open := false
for _, sh := range pr.handles {
p, ok := sh.next()
if ok {
pr.mu.Unlock()
return p, nil
}
if !sh.lastP.Done {
open = true
}
}
select {
case <-pr.ctx.Done():
if !open {
pr.mu.Unlock()
return nil, nil
}
pr.cond.Wait()
default:
pr.cond.Wait()
}
}
return nil, errors.Errorf("Read not implemented") return nil, errors.Errorf("Read not implemented")
} }
type progressWriter struct{} func (pr *progressReader) append(pw *progressWriter) {
pr.mu.Lock()
defer pr.mu.Unlock()
select {
case <-pr.ctx.Done():
return
default:
pr.handles = append(pr.handles, &streamHandle{pw: pw})
}
}
func pipe() (*progressReader, *progressWriter, func()) {
ctx, cancel := context.WithCancel(context.Background())
pr := &progressReader{
ctx: ctx,
}
pr.cond = sync.NewCond(&pr.mu)
pw := &progressWriter{
reader: pr,
}
return pr, pw, cancel
}
func newWriter(pw *progressWriter, name string) *progressWriter {
if pw.id != "" {
name = pw.id + "." + name
}
pw = &progressWriter{
id: name,
reader: pw.reader,
}
pw.reader.append(pw)
return pw
}
type progressWriter struct {
id string
lastP atomic.Value
done bool
reader *progressReader
}
func (pw *progressWriter) Write(p Progress) error { func (pw *progressWriter) Write(p Progress) error {
return errors.Errorf("Write not implemented") // find progressstream, write to it
// if no progressstream then make one
// if done then close stream for writing
if pw.done {
return errors.Errorf("writing to closed progresswriter %s", pw.id)
}
p.ID = pw.id
if p.Timestamp.IsZero() {
p.Timestamp = time.Now()
}
pw.lastP.Store(&p)
if p.Done {
pw.done = true
}
pw.reader.cond.Broadcast()
return nil
} }
func (pw *progressWriter) Done() error { func (pw *progressWriter) Done() error {
return errors.Errorf("Done not implemented") var p Progress
lastP := pw.lastP.Load().(*Progress)
if lastP != nil {
p = *lastP
if p.Done {
return nil
}
} else {
p = Progress{}
}
return pw.Write(p)
}
type noOpWriter struct{}
func (pw *noOpWriter) Write(p Progress) error {
return nil
}
func (pw *noOpWriter) Done() error {
return nil
} }
// type ProgressRecord struct { // type ProgressRecord struct {

View File

@ -0,0 +1,72 @@
package progress
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
)
func TestProgress(t *testing.T) {
s, err := calc(context.TODO(), 4)
assert.NoError(t, err)
assert.Equal(t, 10, s)
eg, ctx := errgroup.WithContext(context.Background())
pr, ctx, cancelProgress := NewContext(ctx)
var trace trace
eg.Go(func() error {
return saveProgress(ctx, pr, &trace)
})
s, err = calc(ctx, 5)
assert.NoError(t, err)
assert.Equal(t, 15, s)
cancelProgress()
err = eg.Wait()
assert.NoError(t, err)
assert.Equal(t, 6, len(trace.items))
assert.Equal(t, trace.items[len(trace.items)-1].Done, true)
}
func calc(ctx context.Context, total int) (int, error) {
pw, _, ctx := FromContext(ctx, "calc")
defer pw.Done()
sum := 0
pw.Write(Progress{Action: "starting", Total: total})
for i := 1; i <= total; i++ {
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-time.After(10 * time.Millisecond):
}
pw.Write(Progress{Action: "calculating", Total: total, Current: i})
sum += i
}
pw.Write(Progress{Action: "done", Total: total, Current: total, Done: true})
return sum, nil
}
type trace struct {
items []Progress
}
func saveProgress(ctx context.Context, pr ProgressReader, t *trace) error {
for {
p, err := pr.Read(ctx)
if err != nil {
return err
}
if p == nil {
return nil
}
t.items = append(t.items, *p)
}
return nil
}