Merge pull request #140 from AkihiroSuda/ignorecache
solver: implement IgnoreCache LLB Metadatadocker-18.09
commit
5ebab3d126
|
@ -55,7 +55,7 @@ var buildCommand = cli.Command{
|
|||
},
|
||||
cli.BoolFlag{
|
||||
Name: "no-cache",
|
||||
Usage: "Disable cache for all the vertices. (Not yet implemented.) Frontend is not supported.",
|
||||
Usage: "Disable cache for all the vertices. Frontend is not supported.",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "export-cache",
|
||||
|
|
|
@ -41,6 +41,63 @@ func newJobList() *jobList {
|
|||
return jl
|
||||
}
|
||||
|
||||
// jobInstructionCache implements InstructionCache.
|
||||
// jobInstructionCache is instantiated for each of job instances rather than jobList or solver instances.
|
||||
// Lookup for objects with IgnoreCache fail until Set is called.
|
||||
type jobInstructionCache struct {
|
||||
mu sync.RWMutex
|
||||
InstructionCache
|
||||
ignoreCache map[digest.Digest]struct{}
|
||||
setInThisJob map[digest.Digest]struct{}
|
||||
}
|
||||
|
||||
// Probe implements InstructionCache
|
||||
func (jic *jobInstructionCache) Probe(ctx context.Context, key digest.Digest) (bool, error) {
|
||||
jic.mu.RLock()
|
||||
defer jic.mu.RUnlock()
|
||||
_, ignoreCache := jic.ignoreCache[key]
|
||||
_, setInThisJob := jic.setInThisJob[key]
|
||||
if ignoreCache && !setInThisJob {
|
||||
return false, nil
|
||||
}
|
||||
return jic.InstructionCache.Probe(ctx, key)
|
||||
}
|
||||
|
||||
// Lookup implements InstructionCache
|
||||
func (jic *jobInstructionCache) Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) {
|
||||
jic.mu.RLock()
|
||||
defer jic.mu.RUnlock()
|
||||
_, ignoreCache := jic.ignoreCache[key]
|
||||
_, setInThisJob := jic.setInThisJob[key]
|
||||
if ignoreCache && !setInThisJob {
|
||||
return nil, nil
|
||||
}
|
||||
return jic.InstructionCache.Lookup(ctx, key, msg)
|
||||
}
|
||||
|
||||
// Set implements InstructionCache
|
||||
func (jic *jobInstructionCache) Set(key digest.Digest, ref interface{}) error {
|
||||
jic.mu.Lock()
|
||||
defer jic.mu.Unlock()
|
||||
jic.setInThisJob[key] = struct{}{}
|
||||
return jic.InstructionCache.Set(key, ref)
|
||||
}
|
||||
|
||||
// SetIgnoreCache is jobInstructionCache-specific extension
|
||||
func (jic *jobInstructionCache) SetIgnoreCache(key digest.Digest) {
|
||||
jic.mu.Lock()
|
||||
defer jic.mu.Unlock()
|
||||
jic.ignoreCache[key] = struct{}{}
|
||||
}
|
||||
|
||||
func newJobInstructionCache(base InstructionCache) *jobInstructionCache {
|
||||
return &jobInstructionCache{
|
||||
InstructionCache: base,
|
||||
ignoreCache: make(map[digest.Digest]struct{}),
|
||||
setInThisJob: make(map[digest.Digest]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (jl *jobList) new(ctx context.Context, id string, pr progress.Reader, cache InstructionCache) (context.Context, *job, error) {
|
||||
jl.mu.Lock()
|
||||
defer jl.mu.Unlock()
|
||||
|
@ -52,7 +109,8 @@ func (jl *jobList) new(ctx context.Context, id string, pr progress.Reader, cache
|
|||
pw, _, _ := progress.FromContext(ctx) // TODO: remove this
|
||||
sid := session.FromContext(ctx)
|
||||
|
||||
j := &job{l: jl, pr: progress.NewMultiReader(pr), pw: pw, session: sid, cache: cache, cached: map[string]*cacheRecord{}}
|
||||
// TODO(AkihiroSuda): find a way to integrate map[string]*cacheRecord to jobInstructionCache?
|
||||
j := &job{l: jl, pr: progress.NewMultiReader(pr), pw: pw, session: sid, cache: newJobInstructionCache(cache), cached: map[string]*cacheRecord{}}
|
||||
jl.refs[id] = j
|
||||
jl.updateCond.Broadcast()
|
||||
go func() {
|
||||
|
@ -96,7 +154,7 @@ type job struct {
|
|||
pr *progress.MultiReader
|
||||
pw progress.Writer
|
||||
session string
|
||||
cache InstructionCache
|
||||
cache *jobInstructionCache
|
||||
cached map[string]*cacheRecord
|
||||
}
|
||||
|
||||
|
@ -114,14 +172,14 @@ func (j *job) load(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error)
|
|||
}
|
||||
|
||||
func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*Input, error) {
|
||||
vtx, idx, err := loadLLB(def, func(dgst digest.Digest, op *pb.Op, load func(digest.Digest) (interface{}, error)) (interface{}, error) {
|
||||
vtx, idx, err := loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (interface{}, error)) (interface{}, error) {
|
||||
if st, ok := j.l.actives[dgst]; ok {
|
||||
if vtx, ok := st.jobs[j]; ok {
|
||||
return vtx, nil
|
||||
}
|
||||
}
|
||||
opMetadata := def.Metadata[dgst]
|
||||
vtx, err := newVertex(dgst, op, &opMetadata, load)
|
||||
vtx, err := newVertex(dgst, pbOp, &opMetadata, load)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -143,6 +201,15 @@ func (j *job) loadInternal(def *pb.Definition, resolveOp ResolveOpFunc) (*Input,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i, input := range pbOp.Inputs {
|
||||
if inputMetadata := def.Metadata[input.Digest]; inputMetadata.IgnoreCache {
|
||||
k, err := s.CacheKey(ctx, Index(i))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
j.cache.SetIgnoreCache(k)
|
||||
}
|
||||
}
|
||||
st.solver = s
|
||||
|
||||
j.l.actives[dgst] = st
|
||||
|
@ -218,9 +285,6 @@ func (j *job) cacheExporter(ref Reference) (CacheExporter, error) {
|
|||
}
|
||||
|
||||
func getRef(s VertexSolver, ctx context.Context, v *vertex, index Index, cache InstructionCache) (Reference, error) {
|
||||
if v.metadata != nil && v.metadata.GetIgnoreCache() {
|
||||
logrus.Warnf("Unimplemented vertex metadata: IgnoreCache (%s, %s)", v.digest, v.name)
|
||||
}
|
||||
k, err := s.CacheKey(ctx, index)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
Loading…
Reference in New Issue