diff --git a/cache/refs.go b/cache/refs.go index 2ddac6d1..2b25235e 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -375,10 +375,8 @@ func (sr *immutableRef) parentRefChain() []*immutableRef { } func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (snapshot.Mountable, error) { - if getBlobOnly(sr.md) { - if err := sr.Extract(ctx, s); err != nil { - return nil, err - } + if err := sr.Extract(ctx, s); err != nil { + return nil, err } sr.mu.Lock() @@ -387,6 +385,10 @@ func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Grou } func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr error) { + if !getBlobOnly(sr.md) { + return + } + ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary) if err != nil { return err diff --git a/solver/edge.go b/solver/edge.go index f6adca98..6ebf3a74 100644 --- a/solver/edge.go +++ b/solver/edge.go @@ -223,6 +223,14 @@ func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc { return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc } +// preprocessFunc returns result based cache func +func (e *edge) preprocessFunc(dep *dep) PreprocessFunc { + if e.cacheMap == nil { + return nil + } + return e.cacheMap.Deps[int(dep.index)].PreprocessFunc +} + // allDepsHaveKeys checks if all dependencies have at least one key. used for // determining if there is enough data for combining cache key for edge func (e *edge) allDepsHaveKeys(matching bool) bool { @@ -487,17 +495,20 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) { e.err = upt.Status().Err } } else if !dep.slowCacheComplete { - k := NewCacheKey(upt.Status().Value.(digest.Digest), -1) - dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}} - slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey} - defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys())) - for _, dk := range dep.result.CacheKeys() { - defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector}) - } - dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp}) + dgst := upt.Status().Value.(digest.Digest) + if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" { + k := NewCacheKey(dgst, -1) + dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}} + slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey} + defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys())) + for _, dk := range dep.result.CacheKeys() { + defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector}) + } + dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp}) - // connect def key to slow key - e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index) + // connect def key to slow key + e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index) + } dep.slowCacheComplete = true e.keysDidChange = true @@ -583,7 +594,7 @@ func (e *edge) recalcCurrentState() { stHigh := edgeStatusCacheSlow // maximum possible state if e.cacheMap != nil { for _, dep := range e.deps { - isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete)) + isSlowIncomplete := (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete)) if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete { stLow = dep.state @@ -804,15 +815,16 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, } } // initialize function to compute cache key based on dependency result - if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && e.slowCacheFunc(dep) != nil && e.cacheMap != nil { + if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && e.cacheMap != nil { + pfn := e.preprocessFunc(dep) fn := e.slowCacheFunc(dep) res := dep.result - func(fn ResultBasedCacheFunc, res Result, index Index) { + func(pfn PreprocessFunc, fn ResultBasedCacheFunc, res Result, index Index) { dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { - v, err := e.op.CalcSlowCache(ctx, index, fn, res) + v, err := e.op.CalcSlowCache(ctx, index, pfn, fn, res) return v, errors.Wrap(err, "failed to compute cache key") }) - }(fn, res, dep.index) + }(pfn, fn, res, dep.index) addedNew = true } } diff --git a/solver/jobs.go b/solver/jobs.go index 99e31b23..d34db5e7 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -268,6 +268,17 @@ func (jl *Solver) setEdge(e Edge, newEdge *edge) { st.setEdge(e.Index, newEdge) } +func (jl *Solver) getState(e Edge) *state { + jl.mu.RLock() + defer jl.mu.RUnlock() + + st, ok := jl.actives[e.Vertex.Digest()] + if !ok { + return nil + } + return st +} + func (jl *Solver) getEdge(e Edge) *edge { jl.mu.RLock() defer jl.mu.RUnlock() @@ -547,7 +558,7 @@ type activeOp interface { Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error) IgnoreCache() bool Cache() CacheManager - CalcSlowCache(context.Context, Index, ResultBasedCacheFunc, Result) (digest.Digest, error) + CalcSlowCache(context.Context, Index, PreprocessFunc, ResultBasedCacheFunc, Result) (digest.Digest, error) } func newSharedOp(resolver ResolveOpFunc, cacheManager CacheManager, st *state) *sharedOp { @@ -606,14 +617,14 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err return res, err } -func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBasedCacheFunc, res Result) (dgst digest.Digest, err error) { +func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessFunc, f ResultBasedCacheFunc, res Result) (dgst digest.Digest, err error) { defer func() { err = errdefs.WrapVertex(err, s.st.origDigest) }() key, err := s.g.Do(ctx, fmt.Sprintf("slow-compute-%d", index), func(ctx context.Context) (interface{}, error) { s.slowMu.Lock() // TODO: add helpers for these stored values - if res := s.slowCacheRes[index]; res != "" { + if res, ok := s.slowCacheRes[index]; ok { s.slowMu.Unlock() return res, nil } @@ -622,9 +633,23 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBased return err, nil } s.slowMu.Unlock() - ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan) - key, err := f(withAncestorCacheOpts(ctx, s.st), res, s.st) + complete := true + if p != nil { + st := s.st.solver.getState(s.st.vtx.Inputs()[index]) + ctx2 := opentracing.ContextWithSpan(progress.WithProgress(ctx, st.mpw), st.mspan) + err = p(ctx2, res, st) + if err != nil { + f = nil + ctx = ctx2 + } + } + + var key digest.Digest + if f != nil { + ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan) + key, err = f(withAncestorCacheOpts(ctx, s.st), res, s.st) + } if err != nil { select { case <-ctx.Done(): diff --git a/solver/llbsolver/ops/build.go b/solver/llbsolver/ops/build.go index 45f0d41a..5e7fbc7a 100644 --- a/solver/llbsolver/ops/build.go +++ b/solver/llbsolver/ops/build.go @@ -54,6 +54,7 @@ func (b *buildOp) CacheMap(ctx context.Context, g session.Group, index int) (*so Deps: make([]struct { Selector digest.Digest ComputeDigestFunc solver.ResultBasedCacheFunc + PreprocessFunc solver.PreprocessFunc }, len(b.v.Inputs())), }, true, nil } diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index 5938df7a..1ad18903 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -118,6 +118,7 @@ func (e *execOp) CacheMap(ctx context.Context, g session.Group, index int) (*sol Deps: make([]struct { Selector digest.Digest ComputeDigestFunc solver.ResultBasedCacheFunc + PreprocessFunc solver.PreprocessFunc }, e.numInputs), } @@ -137,6 +138,7 @@ func (e *execOp) CacheMap(ctx context.Context, g session.Group, index int) (*sol if !dep.NoContentBasedHash { cm.Deps[i].ComputeDigestFunc = llbsolver.NewContentHashFunc(toSelectors(dedupePaths(dep.Selectors))) } + cm.Deps[i].PreprocessFunc = llbsolver.UnlazyResultFunc } return cm, true, nil diff --git a/solver/llbsolver/ops/file.go b/solver/llbsolver/ops/file.go index f2f0f387..78b0c031 100644 --- a/solver/llbsolver/ops/file.go +++ b/solver/llbsolver/ops/file.go @@ -120,6 +120,7 @@ func (f *fileOp) CacheMap(ctx context.Context, g session.Group, index int) (*sol Deps: make([]struct { Selector digest.Digest ComputeDigestFunc solver.ResultBasedCacheFunc + PreprocessFunc solver.PreprocessFunc }, f.numInputs), } @@ -138,6 +139,9 @@ func (f *fileOp) CacheMap(ctx context.Context, g session.Group, index int) (*sol cm.Deps[idx].ComputeDigestFunc = llbsolver.NewContentHashFunc(dedupeSelectors(m)) } + for idx := range cm.Deps { + cm.Deps[idx].PreprocessFunc = llbsolver.UnlazyResultFunc + } return cm, true, nil } diff --git a/solver/llbsolver/result.go b/solver/llbsolver/result.go index 069b3025..a36891c7 100644 --- a/solver/llbsolver/result.go +++ b/solver/llbsolver/result.go @@ -21,6 +21,17 @@ type Selector struct { FollowLinks bool } +func UnlazyResultFunc(ctx context.Context, res solver.Result, g session.Group) error { + ref, ok := res.Sys().(*worker.WorkerRef) + if !ok { + return errors.Errorf("invalid reference: %T", res) + } + if ref.ImmutableRef == nil { + return nil + } + return ref.ImmutableRef.Extract(ctx, g) +} + func NewContentHashFunc(selectors []Selector) solver.ResultBasedCacheFunc { return func(ctx context.Context, res solver.Result, s session.Group) (digest.Digest, error) { ref, ok := res.Sys().(*worker.WorkerRef) diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 84067e8c..96a6dc39 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -3517,6 +3517,7 @@ func (v *vertex) makeCacheMap() *CacheMap { Deps: make([]struct { Selector digest.Digest ComputeDigestFunc ResultBasedCacheFunc + PreprocessFunc PreprocessFunc }, len(v.Inputs())), } for i, f := range v.opt.slowCacheCompute { diff --git a/solver/types.go b/solver/types.go index 892beb35..76e4c911 100644 --- a/solver/types.go +++ b/solver/types.go @@ -148,6 +148,7 @@ type Op interface { } type ResultBasedCacheFunc func(context.Context, Result, session.Group) (digest.Digest, error) +type PreprocessFunc func(context.Context, Result, session.Group) error // CacheMap is a description for calculating the cache key of an operation. type CacheMap struct { @@ -173,6 +174,9 @@ type CacheMap struct { // For example, in LLB this is invoked to calculate the cache key based on // the checksum of file contents from input snapshots. ComputeDigestFunc ResultBasedCacheFunc + + // PreprocessFunc is a function that runs on an input before it is passed to op + PreprocessFunc PreprocessFunc } // Opts specifies generic options that will be passed to cache load calls if/when