From f0c14879c71e2bddd2edb21810ce7b92e519b1fa Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 5 Jul 2017 21:25:51 -0700 Subject: [PATCH] solver: remove llb dependency Signed-off-by: Tonis Tiigi --- control/control.go | 4 +- solver/exec.go | 43 ++++++++++--- solver/jobs.go | 27 ++++---- solver/load.go | 79 +++++++++++++++--------- solver/refcache.go | 31 ++++++---- solver/solver.go | 150 +++++++++++++++++++-------------------------- solver/source.go | 36 +++++++---- solver/vertex.go | 98 +++++++++++++++++++++++++++++ source/manager.go | 10 +++ 9 files changed, 311 insertions(+), 167 deletions(-) create mode 100644 solver/vertex.go diff --git a/control/control.go b/control/control.go index e365fdeb..dcde822d 100644 --- a/control/control.go +++ b/control/control.go @@ -29,7 +29,7 @@ type Controller struct { // TODO: ControlService func NewController(opt Opt) (*Controller, error) { c := &Controller{ opt: opt, - solver: solver.New(solver.Opt{ + solver: solver.NewLLBSolver(solver.LLBOpt{ SourceManager: opt.SourceManager, CacheManager: opt.CacheManager, Worker: opt.Worker, @@ -62,7 +62,7 @@ func (c *Controller) DiskUsage(ctx context.Context, _ *controlapi.DiskUsageReque } func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*controlapi.SolveResponse, error) { - v, err := solver.Load(req.Definition) + v, err := solver.LoadLLB(req.Definition) if err != nil { return nil, errors.Wrap(err, "failed to load") } diff --git a/solver/exec.go b/solver/exec.go index 16e30821..d45bf43b 100644 --- a/solver/exec.go +++ b/solver/exec.go @@ -1,7 +1,6 @@ package solver import ( - "context" "io" "os" @@ -12,9 +11,24 @@ import ( "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/worker" "github.com/pkg/errors" + "golang.org/x/net/context" ) -func runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op_Exec, inputs []cache.ImmutableRef) ([]cache.ImmutableRef, error) { +type execOp struct { + op *pb.Op_Exec + cm cache.Manager + w worker.Worker +} + +func newExecOp(op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) { + return &execOp{ + op: op, + cm: cm, + w: w, + }, nil +} + +func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) { mounts := make(map[string]cache.Mountable) var outputs []cache.MutableRef @@ -30,15 +44,24 @@ func runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op } }() - for _, m := range op.Exec.Mounts { + for _, m := range e.op.Exec.Mounts { var mountable cache.Mountable if int(m.Input) > len(inputs) { return nil, errors.Errorf("missing input %d", m.Input) } - ref := inputs[int(m.Input)] + inp := inputs[int(m.Input)] + if sys, ok := inp.(interface { + Sys() Reference + }); ok { + inp = sys.Sys() + } + ref, ok := inp.(cache.ImmutableRef) + if !ok { + return nil, errors.Errorf("invalid reference for exec %T", inputs[int(m.Input)]) + } mountable = ref if m.Output != -1 { - active, err := cm.New(ctx, ref) // TODO: should be method + active, err := e.cm.New(ctx, ref) // TODO: should be method if err != nil { return nil, err } @@ -49,9 +72,9 @@ func runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op } meta := worker.Meta{ - Args: op.Exec.Meta.Args, - Env: op.Exec.Meta.Env, - Cwd: op.Exec.Meta.Cwd, + Args: e.op.Exec.Meta.Args, + Env: e.op.Exec.Meta.Env, + Cwd: e.op.Exec.Meta.Cwd, } stdout := newStreamWriter(ctx, 1) @@ -59,11 +82,11 @@ func runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op stderr := newStreamWriter(ctx, 2) defer stderr.Close() - if err := w.Exec(ctx, meta, mounts, stdout, stderr); err != nil { + if err := e.w.Exec(ctx, meta, mounts, stdout, stderr); err != nil { return nil, errors.Wrapf(err, "worker failed running %v", meta.Args) } - refs := []cache.ImmutableRef{} + refs := []Reference{} for i, o := range outputs { ref, err := o.ReleaseAndCommit(ctx) if err != nil { diff --git a/solver/jobs.go b/solver/jobs.go index ade143f0..c3fec844 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -27,7 +27,7 @@ func newJobList() *jobList { return jl } -func (jl *jobList) new(ctx context.Context, id string, g *opVertex, pr progress.Reader) (*job, error) { +func (jl *jobList) new(ctx context.Context, id string, g *vertex, pr progress.Reader) (*job, error) { jl.mu.Lock() defer jl.mu.Unlock() @@ -75,15 +75,16 @@ func (jl *jobList) get(id string) (*job, error) { type job struct { mu sync.Mutex - g *opVertex + g *vertex pr *progress.MultiReader } func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error { pr := j.pr.Reader(ctx) for v := range walk(j.g) { + vv := v.(*vertex) ss := &client.SolveStatus{ - Vertexes: []*client.Vertex{&v.vtx}, + Vertexes: []*client.Vertex{&vv.clientVertex}, } select { case <-ctx.Done(): @@ -141,23 +142,23 @@ func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error { } } -func walk(op *opVertex) chan *opVertex { +func walk(v Vertex) chan Vertex { cache := make(map[digest.Digest]struct{}) - ch := make(chan *opVertex, 32) + ch := make(chan Vertex, 32) - var send func(op *opVertex) - send = func(op *opVertex) { - for _, v := range op.inputs { - send(v) + var send func(v Vertex) + send = func(v Vertex) { + for _, v := range v.Inputs() { + send(v.Vertex) } - if _, ok := cache[op.dgst]; !ok { - ch <- op - cache[op.dgst] = struct{}{} + if _, ok := cache[v.Digest()]; !ok { + ch <- v + cache[v.Digest()] = struct{}{} } } go func() { - send(op) + send(v) close(ch) }() return ch diff --git a/solver/load.go b/solver/load.go index a1dd94af..ae6ae23a 100644 --- a/solver/load.go +++ b/solver/load.go @@ -1,70 +1,89 @@ package solver import ( - "github.com/moby/buildkit/client" + "strings" + "github.com/moby/buildkit/solver/pb" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) -func Load(ops [][]byte) (*opVertex, error) { +func LoadLLB(ops [][]byte) (Vertex, error) { if len(ops) == 0 { return nil, errors.New("invalid empty definition") } - m := make(map[digest.Digest]*pb.Op) + allOps := make(map[digest.Digest]*pb.Op) var lastOp *pb.Op - var dgst digest.Digest + var lastDigest digest.Digest - for i, dt := range ops { + for _, dt := range ops { var op pb.Op if err := (&op).Unmarshal(dt); err != nil { - return nil, errors.Wrap(err, "failed to parse op") + return nil, errors.Wrap(err, "failed to parse llb proto op") } lastOp = &op - dgst = digest.FromBytes(dt) - if i != len(ops)-1 { - m[dgst] = &op - } - // logrus.Debugf("op %d %s %#v", i, dgst, op) + lastDigest = digest.FromBytes(dt) + allOps[lastDigest] = &op } - cache := make(map[digest.Digest]*opVertex) + delete(allOps, lastDigest) // avoid loops + + cache := make(map[digest.Digest]*vertex) // TODO: validate the connections - vtx, err := loadReqursive(dgst, lastOp, m, cache) - if err != nil { - return nil, err - } - - return vtx, err + return loadLLBVertexRecursive(lastDigest, lastOp, allOps, cache) } -func loadReqursive(dgst digest.Digest, op *pb.Op, inputs map[digest.Digest]*pb.Op, cache map[digest.Digest]*opVertex) (*opVertex, error) { +func toInternalVertex(v Vertex) *vertex { + cache := make(map[digest.Digest]*vertex) + return loadInternalVertexHelper(v, cache) +} + +func loadInternalVertexHelper(v Vertex, cache map[digest.Digest]*vertex) *vertex { + if v, ok := cache[v.Digest()]; ok { + return v + } + vtx := &vertex{sys: v.Sys(), digest: v.Digest(), name: v.Name()} + for _, in := range v.Inputs() { + vv := loadInternalVertexHelper(in.Vertex, cache) + vtx.inputs = append(vtx.inputs, &input{index: in.Index, vertex: vv}) + } + vtx.initClientVertex() + cache[v.Digest()] = vtx + return vtx +} + +func loadLLBVertexRecursive(dgst digest.Digest, op *pb.Op, all map[digest.Digest]*pb.Op, cache map[digest.Digest]*vertex) (*vertex, error) { if v, ok := cache[dgst]; ok { return v, nil } - vtx := &opVertex{op: op, dgst: dgst} - inputDigests := make([]digest.Digest, 0, len(op.Inputs)) + vtx := &vertex{sys: op.Op, digest: dgst, name: llbOpName(op)} for _, in := range op.Inputs { dgst := digest.Digest(in.Digest) - inputDigests = append(inputDigests, dgst) - op, ok := inputs[dgst] + op, ok := all[dgst] if !ok { return nil, errors.Errorf("failed to find %s", in) } - sub, err := loadReqursive(dgst, op, inputs, cache) + sub, err := loadLLBVertexRecursive(dgst, op, all, cache) if err != nil { return nil, err } - vtx.inputs = append(vtx.inputs, sub) - } - vtx.vtx = client.Vertex{ - Inputs: inputDigests, - Name: vtx.name(), - Digest: dgst, + vtx.inputs = append(vtx.inputs, &input{index: int(in.Index), vertex: sub}) } + vtx.initClientVertex() cache[dgst] = vtx return vtx, nil } + +func llbOpName(op *pb.Op) string { + switch op := op.Op.(type) { + case *pb.Op_Source: + return op.Source.Identifier + case *pb.Op_Exec: + return strings.Join(op.Exec.Meta.Args, " ") + default: + return "unknown" + } +} diff --git a/solver/refcache.go b/solver/refcache.go index 134afa0c..50411f6c 100644 --- a/solver/refcache.go +++ b/solver/refcache.go @@ -3,7 +3,6 @@ package solver import ( "sync" - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/progress" digest "github.com/opencontainers/go-digest" @@ -44,7 +43,7 @@ func (c *refCache) probe(j *job, key digest.Digest) bool { c.mu.Unlock() return false } -func (c *refCache) get(key digest.Digest) ([]cache.ImmutableRef, error) { +func (c *refCache) get(key digest.Digest) ([]Reference, error) { c.mu.Lock() defer c.mu.Unlock() v, ok := c.cache[key] @@ -55,13 +54,13 @@ func (c *refCache) get(key digest.Digest) ([]cache.ImmutableRef, error) { if v.value == nil { return nil, errors.Errorf("no ref cache value set") } - refs := make([]cache.ImmutableRef, 0, len(v.value)) + refs := make([]Reference, 0, len(v.value)) for _, r := range v.value { refs = append(refs, r.Clone()) } return refs, nil } -func (c *refCache) set(ctx context.Context, key digest.Digest, refs []cache.ImmutableRef) { +func (c *refCache) set(ctx context.Context, key digest.Digest, refs []Reference) { c.mu.Lock() sharedRefs := make([]*sharedRef, 0, len(refs)) for _, r := range refs { @@ -110,20 +109,20 @@ func (c *refCache) writeProgressSnapshot(ctx context.Context, key digest.Digest) type sharedRef struct { mu sync.Mutex refs map[*sharedRefInstance]struct{} - main cache.ImmutableRef - cache.ImmutableRef + main Reference + Reference } -func newSharedRef(main cache.ImmutableRef) *sharedRef { +func newSharedRef(main Reference) *sharedRef { mr := &sharedRef{ - refs: make(map[*sharedRefInstance]struct{}), - ImmutableRef: main, + refs: make(map[*sharedRefInstance]struct{}), + Reference: main, } mr.main = mr.Clone() return mr } -func (mr *sharedRef) Clone() cache.ImmutableRef { +func (mr *sharedRef) Clone() Reference { mr.mu.Lock() r := &sharedRefInstance{sharedRef: mr} mr.refs[r] = struct{}{} @@ -135,6 +134,16 @@ func (mr *sharedRef) Release(ctx context.Context) error { return mr.main.Release(ctx) } +func (mr *sharedRef) Sys() Reference { + sys := mr.Reference + if s, ok := sys.(interface { + Sys() Reference + }); ok { + return s.Sys() + } + return sys +} + type sharedRefInstance struct { *sharedRef } @@ -144,7 +153,7 @@ func (r *sharedRefInstance) Release(ctx context.Context) error { defer r.sharedRef.mu.Unlock() delete(r.sharedRef.refs, r) if len(r.sharedRef.refs) == 0 { - return r.sharedRef.ImmutableRef.Release(ctx) + return r.sharedRef.Reference.Release(ctx) } return nil } diff --git a/solver/solver.go b/solver/solver.go index 124a3d54..ebfb774b 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -1,49 +1,77 @@ package solver import ( - "strings" - "sync" - "time" - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/client" "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/source" "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/worker" - digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/net/context" "golang.org/x/sync/errgroup" ) -type Opt struct { +type LLBOpt struct { SourceManager *source.Manager CacheManager cache.Manager // TODO: this shouldn't be needed before instruction cache Worker worker.Worker } +func NewLLBSolver(opt LLBOpt) *Solver { + return New(func(v Vertex) (Op, error) { + switch op := v.Sys().(type) { + case *pb.Op_Source: + return newSourceOp(op, opt.SourceManager) + case *pb.Op_Exec: + return newExecOp(op, opt.CacheManager, opt.Worker) + default: + return nil, errors.Errorf("invalid op type %T", op) + } + }) +} + +// ResolveOpFunc finds an Op implementation for a vertex +type ResolveOpFunc func(Vertex) (Op, error) + +// Reference is a reference to the object passed through the build steps. +type Reference interface { + Release(context.Context) error +} + +// Op is an implementation for running a vertex +type Op interface { + // CacheKeys(context.Context, [][]string) ([]string, error) + Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error) +} + +// type Cache interface { +// Lookup(context.Context, string) ([]Reference, error) +// } + type Solver struct { - opt Opt - jobs *jobList - active refCache + resolve ResolveOpFunc + jobs *jobList + active refCache } -func New(opt Opt) *Solver { - return &Solver{opt: opt, jobs: newJobList()} +func New(resolve ResolveOpFunc) *Solver { + return &Solver{resolve: resolve, jobs: newJobList()} } -func (s *Solver) Solve(ctx context.Context, id string, g *opVertex) error { +func (s *Solver) Solve(ctx context.Context, id string, v Vertex) error { ctx, cancel := context.WithCancel(ctx) defer cancel() pr, ctx, closeProgressWriter := progress.NewContext(ctx) - if len(g.inputs) > 0 { // TODO: detect op_return better - g = g.inputs[0] + if len(v.Inputs()) > 0 { // TODO: detect op_return better + v = v.Inputs()[0].Vertex } - j, err := s.jobs.new(ctx, id, g, pr) + vv := toInternalVertex(v) + + j, err := s.jobs.new(ctx, id, vv, pr) if err != nil { return err } @@ -71,16 +99,16 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client. return j.pipe(ctx, statusChan) } -func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cache.ImmutableRef, retErr error) { +func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Reference, retErr error) { - s.active.probe(j, g.dgst) // this registers the key with the job + s.active.probe(j, g.digest) // this registers the key with the job // refs contains all outputs for all input vertexes refs := make([][]*sharedRef, len(g.inputs)) if len(g.inputs) > 0 { eg, ctx := errgroup.WithContext(ctx) for i, in := range g.inputs { - func(i int, in *opVertex) { + func(i int, in *vertex) { eg.Go(func() error { r, err := s.getRefs(ctx, j, in) if err != nil { @@ -91,7 +119,7 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac } return nil }) - }(i, in) + }(i, in.vertex) } err := eg.Wait() if err != nil { @@ -105,17 +133,13 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac } // determine the inputs that were needed - inputs := make([]cache.ImmutableRef, 0, len(g.op.Inputs)) - for _, inp := range g.op.Inputs { - for i, v := range g.inputs { - if v.dgst == digest.Digest(inp.Digest) { - inputs = append(inputs, refs[i][int(inp.Index)].Clone()) - } - } + inputRefs := make([]Reference, 0, len(g.inputs)) + for i, inp := range g.inputs { + inputRefs = append(inputRefs, refs[i][inp.index].Clone()) } defer func() { - for _, r := range inputs { + for _, r := range inputRefs { go r.Release(context.TODO()) } }() @@ -127,7 +151,7 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac } } - pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.dgst)) + pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.Digest())) defer pw.Close() g.notifyStarted(ctx) @@ -135,76 +159,26 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac g.notifyCompleted(ctx, retErr) }() - _, err := s.active.Do(ctx, g.dgst.String(), func(doctx context.Context) (interface{}, error) { - if hit := s.active.probe(j, g.dgst); hit { - if err := s.active.writeProgressSnapshot(ctx, g.dgst); err != nil { + _, err := s.active.Do(ctx, g.digest.String(), func(doctx context.Context) (interface{}, error) { + if hit := s.active.probe(j, g.digest); hit { + if err := s.active.writeProgressSnapshot(ctx, g.digest); err != nil { return nil, err } return nil, nil } - refs, err := s.runVertex(doctx, g, inputs) + op, err := s.resolve(g) if err != nil { return nil, err } - s.active.set(doctx, g.dgst, refs) + refs, err := op.Run(doctx, inputRefs) + if err != nil { + return nil, err + } + s.active.set(doctx, g.digest, refs) return nil, nil }) if err != nil { return nil, err } - return s.active.get(g.dgst) -} - -func (s *Solver) runVertex(ctx context.Context, g *opVertex, inputs []cache.ImmutableRef) ([]cache.ImmutableRef, error) { - switch op := g.op.Op.(type) { - case *pb.Op_Source: - return runSourceOp(ctx, s.opt.SourceManager, op) - case *pb.Op_Exec: - return runExecOp(ctx, s.opt.CacheManager, s.opt.Worker, op, inputs) - default: - return nil, errors.Errorf("invalid op type %T", g.op.Op) - } -} - -type opVertex struct { - mu sync.Mutex - op *pb.Op - inputs []*opVertex - err error - dgst digest.Digest - vtx client.Vertex -} - -func (g *opVertex) inputRequiresExport(i int) bool { - return true // TODO -} - -func (g *opVertex) notifyStarted(ctx context.Context) { - pw, _, _ := progress.FromContext(ctx) - defer pw.Close() - now := time.Now() - g.vtx.Started = &now - pw.Write(g.dgst.String(), g.vtx) -} - -func (g *opVertex) notifyCompleted(ctx context.Context, err error) { - pw, _, _ := progress.FromContext(ctx) - defer pw.Close() - now := time.Now() - g.vtx.Completed = &now - if err != nil { - g.vtx.Error = err.Error() - } - pw.Write(g.dgst.String(), g.vtx) -} - -func (g *opVertex) name() string { - switch op := g.op.Op.(type) { - case *pb.Op_Source: - return op.Source.Identifier - case *pb.Op_Exec: - return strings.Join(op.Exec.Meta.Args, " ") - default: - return "unknown" - } + return s.active.get(g.digest) } diff --git a/solver/source.go b/solver/source.go index 7986ab7b..647deeb4 100644 --- a/solver/source.go +++ b/solver/source.go @@ -1,21 +1,31 @@ package solver import ( - "context" - - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/source" + "golang.org/x/net/context" ) -func runSourceOp(ctx context.Context, sm *source.Manager, op *pb.Op_Source) ([]cache.ImmutableRef, error) { - id, err := source.FromString(op.Source.Identifier) - if err != nil { - return nil, err - } - ref, err := sm.Pull(ctx, id) - if err != nil { - return nil, err - } - return []cache.ImmutableRef{ref}, nil +type sourceOp struct { + op *pb.Op_Source + sm *source.Manager +} + +func newSourceOp(op *pb.Op_Source, sm *source.Manager) (Op, error) { + return &sourceOp{ + op: op, + sm: sm, + }, nil +} + +func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) { + id, err := source.FromString(s.op.Source.Identifier) + if err != nil { + return nil, err + } + ref, err := s.sm.Pull(ctx, id) + if err != nil { + return nil, err + } + return []Reference{ref}, nil } diff --git a/solver/vertex.go b/solver/vertex.go new file mode 100644 index 00000000..1f0eed0c --- /dev/null +++ b/solver/vertex.go @@ -0,0 +1,98 @@ +package solver + +import ( + "sync" + "time" + + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/util/progress" + digest "github.com/opencontainers/go-digest" + "golang.org/x/net/context" +) + +// Vertex is one node in the build graph +type Vertex interface { + // Digest is a content-addressable vertex identifier + Digest() digest.Digest + // Sys returns an internal value that is used to execute the vertex. Usually + // this is capured by the operation resolver method during solve. + Sys() interface{} + // Array of vertexes current vertex depends on. + Inputs() []Input + Name() string // change this to general metadata +} + +// Input is an pointer to a single reference from a vertex by an index. +type Input struct { + Index int + Vertex Vertex +} + +type input struct { + index int + vertex *vertex +} + +type vertex struct { + mu sync.Mutex + sys interface{} + inputs []*input + err error + digest digest.Digest + clientVertex client.Vertex + name string +} + +func (v *vertex) initClientVertex() { + inputDigests := make([]digest.Digest, 0, len(v.inputs)) + for _, inp := range v.inputs { + inputDigests = append(inputDigests, inp.vertex.Digest()) + } + v.clientVertex = client.Vertex{ + Inputs: inputDigests, + Name: v.Name(), + Digest: v.digest, + } +} + +func (v *vertex) Digest() digest.Digest { + return v.digest +} + +func (v *vertex) Sys() interface{} { + return v.sys +} + +func (v *vertex) Inputs() (inputs []Input) { + for _, i := range v.inputs { + inputs = append(inputs, Input{i.index, i.vertex}) + } + return +} + +func (v *vertex) Name() string { + return v.name +} + +func (v *vertex) inputRequiresExport(i int) bool { + return true // TODO +} + +func (v *vertex) notifyStarted(ctx context.Context) { + pw, _, _ := progress.FromContext(ctx) + defer pw.Close() + now := time.Now() + v.clientVertex.Started = &now + pw.Write(v.Digest().String(), v.clientVertex) +} + +func (v *vertex) notifyCompleted(ctx context.Context, err error) { + pw, _, _ := progress.FromContext(ctx) + defer pw.Close() + now := time.Now() + v.clientVertex.Completed = &now + if err != nil { + v.clientVertex.Error = err.Error() + } + pw.Write(v.Digest().String(), v.clientVertex) +} diff --git a/source/manager.go b/source/manager.go index 7726c903..b7c9f67f 100644 --- a/source/manager.go +++ b/source/manager.go @@ -13,6 +13,16 @@ type Source interface { Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error) } +// type Source interface { +// ID() string +// Resolve(ctx context.Context, id Identifier) (SourceInstance, error) +// } +// +// type SourceInstance interface { +// GetCacheKey(ctx context.Context) ([]string, error) +// GetSnapshot(ctx context.Context) (cache.ImmutableRef, error) +// } + type Manager struct { mu sync.Mutex sources map[string]Source