solver: first pass of instruction cache

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2017-07-06 13:15:54 -07:00
parent c9e2493f06
commit 25ba9d72de
15 changed files with 520 additions and 222 deletions

112
cache/instructioncache/cache.go vendored Normal file
View File

@ -0,0 +1,112 @@
package instructioncache
import (
"github.com/boltdb/bolt"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
const cacheKey = "buildkit.instructioncache"
type cacheGroup struct {
Snapshots []string `json:"snapshots"`
}
type LocalStore struct {
MetadataStore *metadata.Store
Cache cache.Accessor
}
func (ls *LocalStore) Set(key string, refsAny []interface{}) error {
refs, err := toReferenceArray(refsAny)
if err != nil {
return err
}
cg := cacheGroup{}
for _, r := range refs {
cg.Snapshots = append(cg.Snapshots, r.ID())
}
v, err := metadata.NewValue(cg)
if err != nil {
return err
}
v.Index = index(key)
for _, r := range refs {
si, _ := ls.MetadataStore.Get(r.ID())
if err := si.Update(func(b *bolt.Bucket) error { // TODO: should share transaction
return si.SetValue(b, index(key), *v)
}); err != nil {
return err
}
}
return nil
}
func (ls *LocalStore) Lookup(ctx context.Context, key string) ([]interface{}, error) {
snaps, err := ls.MetadataStore.Search(index(key))
if err != nil {
return nil, err
}
refs := make([]cache.ImmutableRef, 0)
var retErr error
loop0:
for _, s := range snaps {
retErr = nil
for _, r := range refs {
r.Release(context.TODO())
}
refs = nil
v := s.Get(index(key))
if v != nil {
var cg cacheGroup
if err = v.Unmarshal(&cg); err != nil {
retErr = err
continue
}
for _, id := range cg.Snapshots {
r, err := ls.Cache.Get(ctx, id)
if err != nil {
retErr = err
continue loop0
}
refs = append(refs, r)
}
retErr = nil
break
}
}
if retErr != nil {
for _, r := range refs {
r.Release(context.TODO())
}
refs = nil
}
return toAny(refs), retErr
}
func index(k string) string {
return cacheKey + "::" + k
}
func toReferenceArray(in []interface{}) ([]cache.ImmutableRef, error) {
out := make([]cache.ImmutableRef, 0, len(in))
for _, i := range in {
r, ok := i.(cache.ImmutableRef)
if !ok {
return nil, errors.Errorf("invalid reference")
}
out = append(out, r)
}
return out, nil
}
func toAny(in []cache.ImmutableRef) []interface{} {
out := make([]interface{}, 0, len(in))
for _, i := range in {
out = append(out, i)
}
return out
}

View File

@ -210,6 +210,14 @@ func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error {
return s.storage.Update(s.id, fn)
}
func (s *StorageItem) Keys() []string {
keys := make([]string, 0, len(s.values))
for k := range s.values {
keys = append(keys, k)
}
return keys
}
func (s *StorageItem) Get(k string) *Value {
return s.values[k]
}

View File

@ -69,6 +69,7 @@ func (c *Client) Solve(ctx context.Context, r io.Reader, statusChan chan *SolveS
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
})
}
for _, v := range resp.Statuses {

View File

@ -15,10 +15,11 @@ import (
)
type Opt struct {
Snapshotter snapshot.Snapshotter
CacheManager cache.Manager
Worker worker.Worker
SourceManager *source.Manager
Snapshotter snapshot.Snapshotter
CacheManager cache.Manager
Worker worker.Worker
SourceManager *source.Manager
InstructionCache solver.InstructionCache
}
type Controller struct { // TODO: ControlService
@ -30,9 +31,10 @@ func NewController(opt Opt) (*Controller, error) {
c := &Controller{
opt: opt,
solver: solver.NewLLBSolver(solver.LLBOpt{
SourceManager: opt.SourceManager,
CacheManager: opt.CacheManager,
Worker: opt.Worker,
SourceManager: opt.SourceManager,
CacheManager: opt.CacheManager,
Worker: opt.Worker,
InstructionCache: opt.InstructionCache,
}),
}
return c, nil
@ -98,6 +100,7 @@ func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Con
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
})
}
for _, v := range ss.Statuses {

View File

@ -9,6 +9,7 @@ import (
"github.com/containerd/containerd/rootfs"
ctdsnapshot "github.com/containerd/containerd/snapshot"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/instructioncache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/snapshot/blobmapping"
"github.com/moby/buildkit/source"
@ -44,6 +45,11 @@ func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) {
return nil, err
}
ic := &instructioncache.LocalStore{
MetadataStore: md,
Cache: cm,
}
sm, err := source.NewManager()
if err != nil {
return nil, err
@ -62,8 +68,9 @@ func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) {
sm.Register(is)
return &Opt{
Snapshotter: snapshotter,
CacheManager: cm,
SourceManager: sm,
Snapshotter: snapshotter,
CacheManager: cm,
SourceManager: sm,
InstructionCache: ic,
}, nil
}

View File

@ -65,7 +65,10 @@ func TestControl(t *testing.T) {
img, err := source.NewImageIdentifier("docker.io/library/busybox:latest")
assert.NoError(t, err)
snap, err := sm.Pull(ctx, img)
src, err := sm.Resolve(ctx, img)
assert.NoError(t, err)
snap, err := src.Snapshot(ctx)
assert.NoError(t, err)
mounts, err := snap.Mount(ctx)

View File

@ -1,6 +1,7 @@
package solver
import (
"encoding/json"
"io"
"os"
@ -10,24 +11,39 @@ import (
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
type execOp struct {
op *pb.Op_Exec
op *pb.ExecOp
cm cache.Manager
w worker.Worker
}
func newExecOp(op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) {
return &execOp{
op: op,
op: op.Exec,
cm: cm,
w: w,
}, nil
}
func (e *execOp) CacheKey(ctx context.Context, inputs []string) (string, error) {
dt, err := json.Marshal(struct {
Inputs []string
Exec *pb.ExecOp
}{
Inputs: inputs,
Exec: e.op,
})
if err != nil {
return "", err
}
return digest.FromBytes(dt).String(), nil
}
func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) {
mounts := make(map[string]cache.Mountable)
@ -44,7 +60,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
}
}()
for _, m := range e.op.Exec.Mounts {
for _, m := range e.op.Mounts {
var mountable cache.Mountable
if int(m.Input) > len(inputs) {
return nil, errors.Errorf("missing input %d", m.Input)
@ -72,9 +88,9 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro
}
meta := worker.Meta{
Args: e.op.Exec.Meta.Args,
Env: e.op.Exec.Meta.Env,
Cwd: e.op.Exec.Meta.Cwd,
Args: e.op.Meta.Args,
Env: e.op.Meta.Env,
Cwd: e.op.Meta.Cwd,
}
stdout := newStreamWriter(ctx, 1)

View File

@ -1,159 +0,0 @@
package solver
import (
"sync"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// refCache holds the references to snapshots what are currently activve
// and allows sharing them between jobs
type refCache struct {
mu sync.Mutex
cache map[digest.Digest]*cachedReq
flightcontrol.Group
}
type cachedReq struct {
jobs map[*job]struct{}
value []*sharedRef
progressCtx context.Context
}
func (c *refCache) probe(j *job, key digest.Digest) bool {
c.mu.Lock()
if c.cache == nil {
c.cache = make(map[digest.Digest]*cachedReq)
}
cr, ok := c.cache[key]
if !ok {
cr = &cachedReq{jobs: make(map[*job]struct{})}
c.cache[key] = cr
}
cr.jobs[j] = struct{}{}
if ok && cr.value != nil {
c.mu.Unlock()
return true
}
c.mu.Unlock()
return false
}
func (c *refCache) get(key digest.Digest) ([]Reference, error) {
c.mu.Lock()
defer c.mu.Unlock()
v, ok := c.cache[key]
// these errors should not be reached
if !ok {
return nil, errors.Errorf("no ref cache found")
}
if v.value == nil {
return nil, errors.Errorf("no ref cache value set")
}
refs := make([]Reference, 0, len(v.value))
for _, r := range v.value {
refs = append(refs, r.Clone())
}
return refs, nil
}
func (c *refCache) set(ctx context.Context, key digest.Digest, refs []Reference) {
c.mu.Lock()
sharedRefs := make([]*sharedRef, 0, len(refs))
for _, r := range refs {
sharedRefs = append(sharedRefs, newSharedRef(r))
}
c.cache[key].value = sharedRefs
c.cache[key].progressCtx = ctx
c.mu.Unlock()
}
func (c *refCache) cancel(j *job) {
c.mu.Lock()
for k, r := range c.cache {
if _, ok := r.jobs[j]; ok {
delete(r.jobs, j)
}
if len(r.jobs) == 0 {
for _, r := range r.value {
go r.Release(context.TODO())
}
delete(c.cache, k)
}
}
c.mu.Unlock()
}
func (c *refCache) writeProgressSnapshot(ctx context.Context, key digest.Digest) error {
pw, ok, _ := progress.FromContext(ctx)
if ok {
c.mu.Lock()
v, ok := c.cache[key]
if !ok {
c.mu.Unlock()
return errors.Errorf("no ref cache found")
}
pctx := v.progressCtx
c.mu.Unlock()
if pctx != nil {
return flightcontrol.WriteProgress(pctx, pw)
}
}
return nil
}
// sharedRef is a wrapper around releasable that allows you to make new
// releasable child objects
type sharedRef struct {
mu sync.Mutex
refs map[*sharedRefInstance]struct{}
main Reference
Reference
}
func newSharedRef(main Reference) *sharedRef {
mr := &sharedRef{
refs: make(map[*sharedRefInstance]struct{}),
Reference: main,
}
mr.main = mr.Clone()
return mr
}
func (mr *sharedRef) Clone() Reference {
mr.mu.Lock()
r := &sharedRefInstance{sharedRef: mr}
mr.refs[r] = struct{}{}
mr.mu.Unlock()
return r
}
func (mr *sharedRef) Release(ctx context.Context) error {
return mr.main.Release(ctx)
}
func (mr *sharedRef) Sys() Reference {
sys := mr.Reference
if s, ok := sys.(interface {
Sys() Reference
}); ok {
return s.Sys()
}
return sys
}
type sharedRefInstance struct {
*sharedRef
}
func (r *sharedRefInstance) Release(ctx context.Context) error {
r.sharedRef.mu.Lock()
defer r.sharedRef.mu.Unlock()
delete(r.sharedRef.refs, r)
if len(r.sharedRef.refs) == 0 {
return r.sharedRef.Reference.Release(ctx)
}
return nil
}

View File

@ -1,6 +1,7 @@
package solver
import (
"github.com/Sirupsen/logrus"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/solver/pb"
@ -13,9 +14,10 @@ import (
)
type LLBOpt struct {
SourceManager *source.Manager
CacheManager cache.Manager // TODO: this shouldn't be needed before instruction cache
Worker worker.Worker
SourceManager *source.Manager
CacheManager cache.Manager // TODO: this shouldn't be needed before instruction cache
Worker worker.Worker
InstructionCache InstructionCache
}
func NewLLBSolver(opt LLBOpt) *Solver {
@ -28,7 +30,7 @@ func NewLLBSolver(opt LLBOpt) *Solver {
default:
return nil, errors.Errorf("invalid op type %T", op)
}
})
}, opt.InstructionCache)
}
// ResolveOpFunc finds an Op implementation for a vertex
@ -41,22 +43,24 @@ type Reference interface {
// Op is an implementation for running a vertex
type Op interface {
// CacheKeys(context.Context, [][]string) ([]string, error)
CacheKey(context.Context, []string) (string, error)
Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error)
}
// type Cache interface {
// Lookup(context.Context, string) ([]Reference, error)
// }
type Solver struct {
resolve ResolveOpFunc
jobs *jobList
active refCache
type InstructionCache interface {
Lookup(ctx context.Context, key string) ([]interface{}, error) // TODO: regular ref
Set(key string, refs []interface{}) error
}
func New(resolve ResolveOpFunc) *Solver {
return &Solver{resolve: resolve, jobs: newJobList()}
type Solver struct {
resolve ResolveOpFunc
jobs *jobList
activeState activeState
cache InstructionCache
}
func New(resolve ResolveOpFunc, cache InstructionCache) *Solver {
return &Solver{resolve: resolve, jobs: newJobList(), cache: cache}
}
func (s *Solver) Solve(ctx context.Context, id string, v Vertex) error {
@ -78,7 +82,7 @@ func (s *Solver) Solve(ctx context.Context, id string, v Vertex) error {
refs, err := s.getRefs(ctx, j, j.g)
closeProgressWriter()
s.active.cancel(j)
s.activeState.cancel(j)
if err != nil {
return err
}
@ -99,9 +103,77 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
return j.pipe(ctx, statusChan)
}
func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Reference, retErr error) {
func (s *Solver) getCacheKey(ctx context.Context, j *job, g *vertex) (cacheKey string, retErr error) {
state, err := s.activeState.vertexState(j, g.digest, func() (Op, error) {
return s.resolve(g)
})
if err != nil {
return "", err
}
s.active.probe(j, g.digest) // this registers the key with the job
inputs := make([]string, len(g.inputs))
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
for i, in := range g.inputs {
func(i int, in *vertex) {
eg.Go(func() error {
k, err := s.getCacheKey(ctx, j, in)
if err != nil {
return err
}
inputs[i] = k
return nil
})
}(i, in.vertex)
}
if err := eg.Wait(); err != nil {
return "", err
}
}
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.Digest()))
defer pw.Close()
if len(g.inputs) == 0 {
g.notifyStarted(ctx)
defer func() {
g.notifyCompleted(ctx, false, retErr)
}()
}
return state.GetCacheKey(ctx, func(ctx context.Context, op Op) (string, error) {
return op.CacheKey(ctx, inputs)
})
}
func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Reference, retErr error) {
state, err := s.activeState.vertexState(j, g.digest, func() (Op, error) {
return s.resolve(g)
})
if err != nil {
return nil, err
}
var cacheKey string
if s.cache != nil {
var err error
cacheKey, err = s.getCacheKey(ctx, j, g)
if err != nil {
return nil, err
}
cacheRefsAny, err := s.cache.Lookup(ctx, cacheKey)
if err != nil {
return nil, err
}
if len(cacheRefsAny) > 0 {
cacheRefs, err := toReferenceArray(cacheRefsAny)
if err != nil {
return nil, err
}
g.recursiveMarkCached(ctx)
return cacheRefs, nil
}
}
// refs contains all outputs for all input vertexes
refs := make([][]*sharedRef, len(g.inputs))
@ -156,29 +228,39 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer
g.notifyStarted(ctx)
defer func() {
g.notifyCompleted(ctx, retErr)
g.notifyCompleted(ctx, false, retErr)
}()
_, err := s.active.Do(ctx, g.digest.String(), func(doctx context.Context) (interface{}, error) {
if hit := s.active.probe(j, g.digest); hit {
if err := s.active.writeProgressSnapshot(ctx, g.digest); err != nil {
return nil, err
return state.GetRefs(ctx, func(ctx context.Context, op Op) ([]Reference, error) {
refs, err := op.Run(ctx, inputRefs)
if err != nil {
return nil, err
}
if s.cache != nil {
if err := s.cache.Set(cacheKey, toAny(refs)); err != nil {
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
}
return nil, nil
}
op, err := s.resolve(g)
if err != nil {
return nil, err
}
refs, err := op.Run(doctx, inputRefs)
if err != nil {
return nil, err
}
s.active.set(doctx, g.digest, refs)
return nil, nil
return refs, nil
})
if err != nil {
return nil, err
}
return s.active.get(g.digest)
}
func toReferenceArray(in []interface{}) ([]Reference, error) {
out := make([]Reference, 0, len(in))
for _, i := range in {
r, ok := i.(Reference)
if !ok {
return nil, errors.Errorf("invalid reference")
}
out = append(out, r)
}
return out, nil
}
func toAny(in []Reference) []interface{} {
out := make([]interface{}, 0, len(in))
for _, i := range in {
out = append(out, i)
}
return out
}

View File

@ -1,14 +1,18 @@
package solver
import (
"sync"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"golang.org/x/net/context"
)
type sourceOp struct {
op *pb.Op_Source
sm *source.Manager
mu sync.Mutex
op *pb.Op_Source
sm *source.Manager
src source.SourceInstance
}
func newSourceOp(op *pb.Op_Source, sm *source.Manager) (Op, error) {
@ -18,7 +22,11 @@ func newSourceOp(op *pb.Op_Source, sm *source.Manager) (Op, error) {
}, nil
}
func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) {
func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error) {
s.mu.Lock()
if s.src != nil {
return s.src, nil
}
id, err := source.FromString(s.op.Source.Identifier)
if err != nil {
return nil, err
@ -27,6 +35,24 @@ func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error)
if err != nil {
return nil, err
}
s.src = src
s.mu.Unlock()
return s.src, nil
}
func (s *sourceOp) CacheKey(ctx context.Context, _ []string) (string, error) {
src, err := s.instance(ctx)
if err != nil {
return "", err
}
return src.CacheKey(ctx)
}
func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) {
src, err := s.instance(ctx)
if err != nil {
return nil, err
}
ref, err := src.Snapshot(ctx)
if err != nil {
return nil, err

183
solver/state.go Normal file
View File

@ -0,0 +1,183 @@
package solver
import (
"sync"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"golang.org/x/net/context"
)
// activeState holds the references to snapshots what are currently activve
// and allows sharing them between jobs
type activeState struct {
mu sync.Mutex
states map[digest.Digest]*state
flightcontrol.Group
}
type state struct {
*activeState
key digest.Digest
jobs map[*job]struct{}
refs []*sharedRef
cacheKey string
op Op
progressCtx context.Context
cacheCtx context.Context
}
func (s *activeState) vertexState(j *job, key digest.Digest, cb func() (Op, error)) (*state, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.states == nil {
s.states = map[digest.Digest]*state{}
}
st, ok := s.states[key]
if !ok {
op, err := cb()
if err != nil {
return nil, err
}
st = &state{key: key, jobs: map[*job]struct{}{}, op: op, activeState: s}
s.states[key] = st
}
st.jobs[j] = struct{}{}
return st, nil
}
func (s *activeState) cancel(j *job) {
s.mu.Lock()
defer s.mu.Unlock()
for k, st := range s.states {
if _, ok := st.jobs[j]; ok {
delete(st.jobs, j)
}
if len(st.jobs) == 0 {
for _, r := range st.refs {
go r.Release(context.TODO())
}
delete(s.states, k)
}
}
}
func (s *state) GetRefs(ctx context.Context, cb func(context.Context, Op) ([]Reference, error)) ([]Reference, error) {
_, err := s.Do(ctx, s.key.String(), func(doctx context.Context) (interface{}, error) {
if s.refs != nil {
if err := writeProgressSnapshot(s.progressCtx, ctx); err != nil {
return nil, err
}
return nil, nil
}
refs, err := cb(doctx, s.op)
if err != nil {
return nil, err
}
sharedRefs := make([]*sharedRef, 0, len(refs))
for _, r := range refs {
sharedRefs = append(sharedRefs, newSharedRef(r))
}
s.refs = sharedRefs
s.progressCtx = doctx
return nil, nil
})
if err != nil {
return nil, err
}
refs := make([]Reference, 0, len(s.refs))
for _, r := range s.refs {
refs = append(refs, r.Clone())
}
return refs, nil
}
func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (string, error)) (string, error) {
_, err := s.Do(ctx, "cache:"+s.key.String(), func(doctx context.Context) (interface{}, error) {
if s.cacheKey != "" {
if err := writeProgressSnapshot(s.cacheCtx, ctx); err != nil {
return nil, err
}
return nil, nil
}
cacheKey, err := cb(doctx, s.op)
if err != nil {
return nil, err
}
s.cacheKey = cacheKey
s.cacheCtx = doctx
return nil, nil
})
if err != nil {
return "", err
}
return s.cacheKey, nil
}
func writeProgressSnapshot(srcCtx, destCtx context.Context) error {
pw, ok, _ := progress.FromContext(destCtx)
if ok {
if srcCtx != nil {
return flightcontrol.WriteProgress(srcCtx, pw)
}
}
return nil
}
// sharedRef is a wrapper around releasable that allows you to make new
// releasable child objects
type sharedRef struct {
mu sync.Mutex
refs map[*sharedRefInstance]struct{}
main Reference
Reference
}
func newSharedRef(main Reference) *sharedRef {
mr := &sharedRef{
refs: make(map[*sharedRefInstance]struct{}),
Reference: main,
}
mr.main = mr.Clone()
return mr
}
func (mr *sharedRef) Clone() Reference {
mr.mu.Lock()
r := &sharedRefInstance{sharedRef: mr}
mr.refs[r] = struct{}{}
mr.mu.Unlock()
return r
}
func (mr *sharedRef) Release(ctx context.Context) error {
return mr.main.Release(ctx)
}
func (mr *sharedRef) Sys() Reference {
sys := mr.Reference
if s, ok := sys.(interface {
Sys() Reference
}); ok {
return s.Sys()
}
return sys
}
type sharedRefInstance struct {
*sharedRef
}
func (r *sharedRefInstance) Release(ctx context.Context) error {
r.sharedRef.mu.Lock()
defer r.sharedRef.mu.Unlock()
delete(r.sharedRef.refs, r)
if len(r.sharedRef.refs) == 0 {
return r.sharedRef.Reference.Release(ctx)
}
return nil
}

View File

@ -86,13 +86,26 @@ func (v *vertex) notifyStarted(ctx context.Context) {
pw.Write(v.Digest().String(), v.clientVertex)
}
func (v *vertex) notifyCompleted(ctx context.Context, err error) {
func (v *vertex) notifyCompleted(ctx context.Context, cached bool, err error) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
if v.clientVertex.Started == nil {
v.clientVertex.Started = &now
}
v.clientVertex.Completed = &now
v.clientVertex.Cached = cached
if err != nil {
v.clientVertex.Error = err.Error()
}
pw.Write(v.Digest().String(), v.clientVertex)
}
func (v *vertex) recursiveMarkCached(ctx context.Context) {
for _, inp := range v.inputs {
inp.vertex.recursiveMarkCached(ctx)
}
if v.clientVertex.Started == nil {
v.notifyCompleted(ctx, true, nil)
}
}

View File

@ -100,11 +100,11 @@ func (p *puller) resolve(ctx context.Context) error {
return p.resolveErr
}
func (p *puller) CacheKey(ctx context.Context) ([]string, error) {
func (p *puller) CacheKey(ctx context.Context) (string, error) {
if err := p.resolve(ctx); err != nil {
return nil, err
return "", err
}
return []string{p.desc.Digest.String()}, nil
return p.desc.Digest.String(), nil
}
func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {

View File

@ -14,7 +14,7 @@ type Source interface {
}
type SourceInstance interface {
CacheKey(ctx context.Context) ([]string, error)
CacheKey(ctx context.Context) (string, error)
Snapshot(ctx context.Context) (cache.ImmutableRef, error)
}

View File

@ -176,6 +176,9 @@ func (t *trace) displayInfo() (d displayInfo) {
j.name = "ERROR " + j.name
}
}
if v.Cached {
j.name = "CACHED " + j.name
}
d.jobs = append(d.jobs, j)
for _, s := range v.statuses {
j := job{