262 lines
5.1 KiB
Go
262 lines
5.1 KiB
Go
package progress
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// Progress package provides utility functions for using the context to capture
|
|
// progress of a running function. All progress items written contain an ID
|
|
// that is used to collapse unread messages.
|
|
|
|
type contextKeyT string
|
|
|
|
var contextKey = contextKeyT("buildkit/util/progress")
|
|
|
|
// FromContext returns a progress writer from a context.
|
|
func FromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) {
|
|
v := ctx.Value(contextKey)
|
|
pw, ok := v.(*progressWriter)
|
|
if !ok {
|
|
if pw, ok := v.(*MultiWriter); ok {
|
|
return pw, true, ctx
|
|
}
|
|
return &noOpWriter{}, false, ctx
|
|
}
|
|
pw = newWriter(pw)
|
|
for _, o := range opts {
|
|
o(pw)
|
|
}
|
|
ctx = context.WithValue(ctx, contextKey, pw)
|
|
return pw, true, ctx
|
|
}
|
|
|
|
type WriterOption func(Writer)
|
|
|
|
// NewContext returns a new context and a progress reader that captures all
|
|
// progress items writtern to this context. Last returned parameter is a closer
|
|
// function to signal that no new writes will happen to this context.
|
|
func NewContext(ctx context.Context) (Reader, context.Context, func()) {
|
|
pr, pw, cancel := pipe()
|
|
ctx = WithProgress(ctx, pw)
|
|
return pr, ctx, cancel
|
|
}
|
|
|
|
func WithProgress(ctx context.Context, pw Writer) context.Context {
|
|
return context.WithValue(ctx, contextKey, pw)
|
|
}
|
|
|
|
func WithMetadata(key string, val interface{}) WriterOption {
|
|
return func(w Writer) {
|
|
if pw, ok := w.(*progressWriter); ok {
|
|
pw.meta[key] = val
|
|
}
|
|
if pw, ok := w.(*MultiWriter); ok {
|
|
pw.meta[key] = val
|
|
}
|
|
}
|
|
}
|
|
|
|
type Controller interface {
|
|
Start(context.Context) (context.Context, func(error))
|
|
Status(id string, action string) func()
|
|
}
|
|
|
|
type Writer interface {
|
|
Write(id string, value interface{}) error
|
|
Close() error
|
|
}
|
|
|
|
type Reader interface {
|
|
Read(context.Context) ([]*Progress, error)
|
|
}
|
|
|
|
type Progress struct {
|
|
ID string
|
|
Timestamp time.Time
|
|
Sys interface{}
|
|
meta map[string]interface{}
|
|
}
|
|
|
|
type Status struct {
|
|
Action string
|
|
Current int
|
|
Total int
|
|
Started *time.Time
|
|
Completed *time.Time
|
|
}
|
|
|
|
type progressReader struct {
|
|
ctx context.Context
|
|
cond *sync.Cond
|
|
mu sync.Mutex
|
|
writers map[*progressWriter]struct{}
|
|
dirty map[string]*Progress
|
|
}
|
|
|
|
func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) {
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
go func() {
|
|
select {
|
|
case <-done:
|
|
case <-ctx.Done():
|
|
pr.mu.Lock()
|
|
pr.cond.Broadcast()
|
|
pr.mu.Unlock()
|
|
}
|
|
}()
|
|
pr.mu.Lock()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
pr.mu.Unlock()
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
dmap := pr.dirty
|
|
if len(dmap) == 0 {
|
|
select {
|
|
case <-pr.ctx.Done():
|
|
if len(pr.writers) == 0 {
|
|
pr.mu.Unlock()
|
|
return nil, io.EOF
|
|
}
|
|
default:
|
|
}
|
|
pr.cond.Wait()
|
|
continue
|
|
}
|
|
pr.dirty = make(map[string]*Progress)
|
|
pr.mu.Unlock()
|
|
|
|
out := make([]*Progress, 0, len(dmap))
|
|
for _, p := range dmap {
|
|
out = append(out, p)
|
|
}
|
|
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].Timestamp.Before(out[j].Timestamp)
|
|
})
|
|
|
|
return out, nil
|
|
}
|
|
}
|
|
|
|
func (pr *progressReader) append(pw *progressWriter) {
|
|
pr.mu.Lock()
|
|
defer pr.mu.Unlock()
|
|
|
|
select {
|
|
case <-pr.ctx.Done():
|
|
return
|
|
default:
|
|
pr.writers[pw] = struct{}{}
|
|
}
|
|
}
|
|
|
|
func pipe() (*progressReader, *progressWriter, func()) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
pr := &progressReader{
|
|
ctx: ctx,
|
|
writers: make(map[*progressWriter]struct{}),
|
|
dirty: make(map[string]*Progress),
|
|
}
|
|
pr.cond = sync.NewCond(&pr.mu)
|
|
go func() {
|
|
<-ctx.Done()
|
|
pr.mu.Lock()
|
|
pr.cond.Broadcast()
|
|
pr.mu.Unlock()
|
|
}()
|
|
pw := &progressWriter{
|
|
reader: pr,
|
|
}
|
|
return pr, pw, cancel
|
|
}
|
|
|
|
func newWriter(pw *progressWriter) *progressWriter {
|
|
meta := make(map[string]interface{})
|
|
for k, v := range pw.meta {
|
|
meta[k] = v
|
|
}
|
|
pw = &progressWriter{
|
|
reader: pw.reader,
|
|
meta: meta,
|
|
}
|
|
pw.reader.append(pw)
|
|
return pw
|
|
}
|
|
|
|
type progressWriter struct {
|
|
done bool
|
|
reader *progressReader
|
|
meta map[string]interface{}
|
|
}
|
|
|
|
func (pw *progressWriter) Write(id string, v interface{}) error {
|
|
if pw.done {
|
|
return errors.Errorf("writing %s to closed progress writer", id)
|
|
}
|
|
return pw.writeRawProgress(&Progress{
|
|
ID: id,
|
|
Timestamp: time.Now(),
|
|
Sys: v,
|
|
meta: pw.meta,
|
|
})
|
|
}
|
|
|
|
func (pw *progressWriter) WriteRawProgress(p *Progress) error {
|
|
meta := p.meta
|
|
if len(pw.meta) > 0 {
|
|
meta = map[string]interface{}{}
|
|
for k, v := range p.meta {
|
|
meta[k] = v
|
|
}
|
|
for k, v := range pw.meta {
|
|
if _, ok := meta[k]; !ok {
|
|
meta[k] = v
|
|
}
|
|
}
|
|
}
|
|
p.meta = meta
|
|
return pw.writeRawProgress(p)
|
|
}
|
|
|
|
func (pw *progressWriter) writeRawProgress(p *Progress) error {
|
|
pw.reader.mu.Lock()
|
|
pw.reader.dirty[p.ID] = p
|
|
pw.reader.cond.Broadcast()
|
|
pw.reader.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (pw *progressWriter) Close() error {
|
|
pw.reader.mu.Lock()
|
|
delete(pw.reader.writers, pw)
|
|
pw.reader.mu.Unlock()
|
|
pw.reader.cond.Broadcast()
|
|
pw.done = true
|
|
return nil
|
|
}
|
|
|
|
func (p *Progress) Meta(key string) (interface{}, bool) {
|
|
v, ok := p.meta[key]
|
|
return v, ok
|
|
}
|
|
|
|
type noOpWriter struct{}
|
|
|
|
func (pw *noOpWriter) Write(_ string, _ interface{}) error {
|
|
return nil
|
|
}
|
|
|
|
func (pw *noOpWriter) Close() error {
|
|
return nil
|
|
}
|