Merge pull request #1773 from tonistiigi/preprocess

solver: add input preprocess capability
v0.8
Tibor Vass 2020-11-12 21:07:00 -08:00 committed by GitHub
commit ce283f39b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 86 additions and 24 deletions

6
cache/refs.go vendored
View File

@ -375,11 +375,9 @@ func (sr *immutableRef) parentRefChain() []*immutableRef {
}
func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (snapshot.Mountable, error) {
if getBlobOnly(sr.md) {
if err := sr.Extract(ctx, s); err != nil {
return nil, err
}
}
sr.mu.Lock()
defer sr.mu.Unlock()
@ -387,6 +385,10 @@ func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Grou
}
func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr error) {
if !getBlobOnly(sr.md) {
return
}
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return err

View File

@ -223,6 +223,14 @@ func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc {
return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc
}
// preprocessFunc returns result based cache func
func (e *edge) preprocessFunc(dep *dep) PreprocessFunc {
if e.cacheMap == nil {
return nil
}
return e.cacheMap.Deps[int(dep.index)].PreprocessFunc
}
// allDepsHaveKeys checks if all dependencies have at least one key. used for
// determining if there is enough data for combining cache key for edge
func (e *edge) allDepsHaveKeys(matching bool) bool {
@ -487,7 +495,9 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
e.err = upt.Status().Err
}
} else if !dep.slowCacheComplete {
k := NewCacheKey(upt.Status().Value.(digest.Digest), -1)
dgst := upt.Status().Value.(digest.Digest)
if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" {
k := NewCacheKey(dgst, -1)
dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys()))
@ -498,6 +508,7 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
// connect def key to slow key
e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index)
}
dep.slowCacheComplete = true
e.keysDidChange = true
@ -583,7 +594,7 @@ func (e *edge) recalcCurrentState() {
stHigh := edgeStatusCacheSlow // maximum possible state
if e.cacheMap != nil {
for _, dep := range e.deps {
isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
isSlowIncomplete := (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete {
stLow = dep.state
@ -804,15 +815,16 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
}
}
// initialize function to compute cache key based on dependency result
if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && e.slowCacheFunc(dep) != nil && e.cacheMap != nil {
if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && e.cacheMap != nil {
pfn := e.preprocessFunc(dep)
fn := e.slowCacheFunc(dep)
res := dep.result
func(fn ResultBasedCacheFunc, res Result, index Index) {
func(pfn PreprocessFunc, fn ResultBasedCacheFunc, res Result, index Index) {
dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
v, err := e.op.CalcSlowCache(ctx, index, fn, res)
v, err := e.op.CalcSlowCache(ctx, index, pfn, fn, res)
return v, errors.Wrap(err, "failed to compute cache key")
})
}(fn, res, dep.index)
}(pfn, fn, res, dep.index)
addedNew = true
}
}

View File

@ -268,6 +268,17 @@ func (jl *Solver) setEdge(e Edge, newEdge *edge) {
st.setEdge(e.Index, newEdge)
}
func (jl *Solver) getState(e Edge) *state {
jl.mu.RLock()
defer jl.mu.RUnlock()
st, ok := jl.actives[e.Vertex.Digest()]
if !ok {
return nil
}
return st
}
func (jl *Solver) getEdge(e Edge) *edge {
jl.mu.RLock()
defer jl.mu.RUnlock()
@ -547,7 +558,7 @@ type activeOp interface {
Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error)
IgnoreCache() bool
Cache() CacheManager
CalcSlowCache(context.Context, Index, ResultBasedCacheFunc, Result) (digest.Digest, error)
CalcSlowCache(context.Context, Index, PreprocessFunc, ResultBasedCacheFunc, Result) (digest.Digest, error)
}
func newSharedOp(resolver ResolveOpFunc, cacheManager CacheManager, st *state) *sharedOp {
@ -606,14 +617,14 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err
return res, err
}
func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBasedCacheFunc, res Result) (dgst digest.Digest, err error) {
func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessFunc, f ResultBasedCacheFunc, res Result) (dgst digest.Digest, err error) {
defer func() {
err = errdefs.WrapVertex(err, s.st.origDigest)
}()
key, err := s.g.Do(ctx, fmt.Sprintf("slow-compute-%d", index), func(ctx context.Context) (interface{}, error) {
s.slowMu.Lock()
// TODO: add helpers for these stored values
if res := s.slowCacheRes[index]; res != "" {
if res, ok := s.slowCacheRes[index]; ok {
s.slowMu.Unlock()
return res, nil
}
@ -622,9 +633,23 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBased
return err, nil
}
s.slowMu.Unlock()
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
key, err := f(withAncestorCacheOpts(ctx, s.st), res, s.st)
complete := true
if p != nil {
st := s.st.solver.getState(s.st.vtx.Inputs()[index])
ctx2 := opentracing.ContextWithSpan(progress.WithProgress(ctx, st.mpw), st.mspan)
err = p(ctx2, res, st)
if err != nil {
f = nil
ctx = ctx2
}
}
var key digest.Digest
if f != nil {
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
key, err = f(withAncestorCacheOpts(ctx, s.st), res, s.st)
}
if err != nil {
select {
case <-ctx.Done():

View File

@ -54,6 +54,7 @@ func (b *buildOp) CacheMap(ctx context.Context, g session.Group, index int) (*so
Deps: make([]struct {
Selector digest.Digest
ComputeDigestFunc solver.ResultBasedCacheFunc
PreprocessFunc solver.PreprocessFunc
}, len(b.v.Inputs())),
}, true, nil
}

View File

@ -118,6 +118,7 @@ func (e *execOp) CacheMap(ctx context.Context, g session.Group, index int) (*sol
Deps: make([]struct {
Selector digest.Digest
ComputeDigestFunc solver.ResultBasedCacheFunc
PreprocessFunc solver.PreprocessFunc
}, e.numInputs),
}
@ -137,6 +138,7 @@ func (e *execOp) CacheMap(ctx context.Context, g session.Group, index int) (*sol
if !dep.NoContentBasedHash {
cm.Deps[i].ComputeDigestFunc = llbsolver.NewContentHashFunc(toSelectors(dedupePaths(dep.Selectors)))
}
cm.Deps[i].PreprocessFunc = llbsolver.UnlazyResultFunc
}
return cm, true, nil

View File

@ -120,6 +120,7 @@ func (f *fileOp) CacheMap(ctx context.Context, g session.Group, index int) (*sol
Deps: make([]struct {
Selector digest.Digest
ComputeDigestFunc solver.ResultBasedCacheFunc
PreprocessFunc solver.PreprocessFunc
}, f.numInputs),
}
@ -138,6 +139,9 @@ func (f *fileOp) CacheMap(ctx context.Context, g session.Group, index int) (*sol
cm.Deps[idx].ComputeDigestFunc = llbsolver.NewContentHashFunc(dedupeSelectors(m))
}
for idx := range cm.Deps {
cm.Deps[idx].PreprocessFunc = llbsolver.UnlazyResultFunc
}
return cm, true, nil
}

View File

@ -21,6 +21,17 @@ type Selector struct {
FollowLinks bool
}
func UnlazyResultFunc(ctx context.Context, res solver.Result, g session.Group) error {
ref, ok := res.Sys().(*worker.WorkerRef)
if !ok {
return errors.Errorf("invalid reference: %T", res)
}
if ref.ImmutableRef == nil {
return nil
}
return ref.ImmutableRef.Extract(ctx, g)
}
func NewContentHashFunc(selectors []Selector) solver.ResultBasedCacheFunc {
return func(ctx context.Context, res solver.Result, s session.Group) (digest.Digest, error) {
ref, ok := res.Sys().(*worker.WorkerRef)

View File

@ -3517,6 +3517,7 @@ func (v *vertex) makeCacheMap() *CacheMap {
Deps: make([]struct {
Selector digest.Digest
ComputeDigestFunc ResultBasedCacheFunc
PreprocessFunc PreprocessFunc
}, len(v.Inputs())),
}
for i, f := range v.opt.slowCacheCompute {

View File

@ -148,6 +148,7 @@ type Op interface {
}
type ResultBasedCacheFunc func(context.Context, Result, session.Group) (digest.Digest, error)
type PreprocessFunc func(context.Context, Result, session.Group) error
// CacheMap is a description for calculating the cache key of an operation.
type CacheMap struct {
@ -173,6 +174,9 @@ type CacheMap struct {
// For example, in LLB this is invoked to calculate the cache key based on
// the checksum of file contents from input snapshots.
ComputeDigestFunc ResultBasedCacheFunc
// PreprocessFunc is a function that runs on an input before it is passed to op
PreprocessFunc PreprocessFunc
}
// Opts specifies generic options that will be passed to cache load calls if/when