diff --git a/solver-next/edge.go b/solver-next/edge.go index ae271601..0ced0c72 100644 --- a/solver-next/edge.go +++ b/solver-next/edge.go @@ -162,6 +162,9 @@ func (e *edge) probeCache(d *dep, keys []CacheKey) { if len(keys) == 0 { return } + if e.op.IgnoreCache() { + return + } records, err := e.op.Cache().Query(keys, d.index, e.cacheMap.Digest, e.edge.Index) if err != nil { e.err = errors.Wrap(err, "error on cache query") @@ -286,21 +289,23 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) { } else { e.cacheMap = upt.Status().Value.(*CacheMap) if len(e.deps) == 0 { - k := NewCacheKey(e.cacheMap.Digest, e.edge.Index, nil) - records, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index) - if err != nil { - logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error - } else { - for _, r := range records { - e.cacheRecords[r.ID] = r - } - if len(records) > 0 { - e.keys = append(e.keys, k) - } - if e.allDepsHaveKeys() { - e.keysDidChange = true + if !e.op.IgnoreCache() { + k := NewCacheKey(e.cacheMap.Digest, e.edge.Index, nil) + records, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index) + if err != nil { + logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error + } else { + for _, r := range records { + e.cacheRecords[r.ID] = r + } + if len(records) > 0 { + e.keys = append(e.keys, k) + } } } + if e.allDepsHaveKeys() { + e.keysDidChange = true + } e.state = edgeStatusCacheSlow } // probe keys that were loaded before cache map diff --git a/solver-next/index.go b/solver-next/index.go index fe6f7da7..400952f1 100644 --- a/solver-next/index.go +++ b/solver-next/index.go @@ -26,8 +26,8 @@ func (ei *EdgeIndex) LoadOrStore(e *edge, dgst digest.Digest, index Index, deps ei.mu.Lock() defer ei.mu.Unlock() - if e := ei.load(e, dgst, index, deps); e != nil { - return e + if old := ei.load(e, dgst, index, deps); old != nil && !(!old.edge.Vertex.Options().IgnoreCache && e.edge.Vertex.Options().IgnoreCache) { + return old } ei.store(e, dgst, index, deps) @@ -64,6 +64,12 @@ func (ei *EdgeIndex) load(ignore *edge, dgst digest.Digest, index Index, deps [] if !ok { return nil } + // prioritize edges with ignoreCache + for e := range m2 { + if e.edge.Vertex.Options().IgnoreCache && e != ignore { + return e + } + } for e := range m2 { if e != ignore { return e @@ -102,6 +108,13 @@ func (ei *EdgeIndex) load(ignore *edge, dgst digest.Digest, index Index, deps [] } } + // prioritize edges with ignoreCache + for m := range matches { + if m.edge.Vertex.Options().IgnoreCache { + return m + } + } + for m := range matches { return m } diff --git a/solver-next/jobs.go b/solver-next/jobs.go index 0d3ea6f5..165eae50 100644 --- a/solver-next/jobs.go +++ b/solver-next/jobs.go @@ -158,10 +158,11 @@ func (jl *JobList) GetEdge(e Edge) *edge { } func (jl *JobList) SubBuild(ctx context.Context, e Edge, parent Vertex) (CachedResult, error) { - if err := jl.load(e.Vertex, parent, nil); err != nil { + v, err := jl.load(e.Vertex, parent, nil) + if err != nil { return nil, err } - + e.Vertex = v return jl.s.build(ctx, e) } @@ -169,22 +170,45 @@ func (jl *JobList) Close() { jl.s.Stop() } -func (jl *JobList) load(v, parent Vertex, j *Job) error { +func (jl *JobList) load(v, parent Vertex, j *Job) (Vertex, error) { jl.mu.Lock() defer jl.mu.Unlock() return jl.loadUnlocked(v, parent, j) } -func (jl *JobList) loadUnlocked(v, parent Vertex, j *Job) error { - for _, e := range v.Inputs() { - if err := jl.loadUnlocked(e.Vertex, parent, j); err != nil { - return err +func (jl *JobList) loadUnlocked(v, parent Vertex, j *Job) (Vertex, error) { + inputs := make([]Edge, len(v.Inputs())) + for i, e := range v.Inputs() { + v, err := jl.loadUnlocked(e.Vertex, parent, j) + if err != nil { + return nil, err } + inputs[i] = Edge{Index: e.Index, Vertex: v} } dgst := v.Digest() - st, ok := jl.actives[dgst] + dgstWithoutCache := digest.FromBytes([]byte(fmt.Sprintf("%s-ignorecache", dgst))) + + st, ok := jl.actives[dgstWithoutCache] + + if !ok { + st, ok = jl.actives[dgst] + + // !ignorecache merges with ignorecache but ignorecache doesn't merge with !ignorecache + if ok && !st.vtx.Options().IgnoreCache && v.Options().IgnoreCache { + dgst = dgstWithoutCache + } + + v = &vertexWithCacheOptions{ + Vertex: v, + dgst: dgst, + inputs: inputs, + } + + st, ok = jl.actives[dgst] + } + if !ok { st = &state{ opts: jl.opts, @@ -220,14 +244,14 @@ func (jl *JobList) loadUnlocked(v, parent Vertex, j *Job) error { st.parents[parent.Digest()] = struct{}{} parentState, ok := jl.actives[parent.Digest()] if !ok { - return errors.Errorf("inactive parent %s", parent.Digest()) + return nil, errors.Errorf("inactive parent %s", parent.Digest()) } parentState.childVtx[dgst] = struct{}{} } } jl.connectProgressFromState(st, st) - return nil + return v, nil } func (jl *JobList) connectProgressFromState(target, src *state) { @@ -307,9 +331,11 @@ func (jl *JobList) deleteIfUnreferenced(k digest.Digest, st *state) { } func (j *Job) Build(ctx context.Context, e Edge) (CachedResult, error) { - if err := j.list.load(e.Vertex, nil, j); err != nil { + v, err := j.list.load(e.Vertex, nil, j) + if err != nil { return nil, err } + e.Vertex = v return j.list.s.build(ctx, e) } @@ -335,6 +361,7 @@ func (j *Job) Discard() error { type activeOp interface { Op + IgnoreCache() bool Cache() CacheManager CalcSlowCache(context.Context, Index, ResultBasedCacheFunc, Result) (digest.Digest, error) } @@ -369,6 +396,10 @@ type sharedOp struct { slowCacheErr map[Index]error } +func (s *sharedOp) IgnoreCache() bool { + return s.st.vtx.Options().IgnoreCache +} + func (s *sharedOp) Cache() CacheManager { return s.st.combinedCacheManager() } @@ -538,3 +569,17 @@ func unwrapShared(inp []*SharedResult) []Result { } return out } + +type vertexWithCacheOptions struct { + Vertex + inputs []Edge + dgst digest.Digest +} + +func (v *vertexWithCacheOptions) Digest() digest.Digest { + return v.dgst +} + +func (v *vertexWithCacheOptions) Inputs() []Edge { + return v.inputs +} diff --git a/solver-next/scheduler.go b/solver-next/scheduler.go index 8fd27b26..a29cd573 100644 --- a/solver-next/scheduler.go +++ b/solver-next/scheduler.go @@ -145,8 +145,9 @@ func (s *Scheduler) dispatch(e *edge) { origEdge := e.index.LoadOrStore(e, e.cacheMap.Digest, e.edge.Index, e.depKeys()) if origEdge != nil { logrus.Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name()) - s.mergeTo(origEdge, e) - s.ef.SetEdge(e.edge, origEdge) + if s.mergeTo(origEdge, e) { + s.ef.SetEdge(e.edge, origEdge) + } } e.keysDidChange = false } @@ -254,7 +255,10 @@ func (s *Scheduler) newRequestWithFunc(e *edge, f func(context.Context) (interfa } // mergeTo merges the state from one edge to another. source edge is discarded. -func (s *Scheduler) mergeTo(target, src *edge) { +func (s *Scheduler) mergeTo(target, src *edge) bool { + if !target.edge.Vertex.Options().IgnoreCache && src.edge.Vertex.Options().IgnoreCache { + return false + } for _, inc := range s.incoming[src] { inc.mu.Lock() inc.Target = target @@ -275,7 +279,8 @@ func (s *Scheduler) mergeTo(target, src *edge) { s.signal(target) // TODO(tonistiigi): merge cache providers - // TODO(tonistiigi): check ignore-cache compat before merge + + return true } // EdgeFactory allows access to the edges from a shared graph diff --git a/solver-next/scheduler_test.go b/solver-next/scheduler_test.go index d44bcd74..4b48ff27 100644 --- a/solver-next/scheduler_test.go +++ b/solver-next/scheduler_test.go @@ -1422,7 +1422,378 @@ func TestMultipleCacheSources(t *testing.T) { require.Equal(t, int64(0), cacheManager2.loadCounter) require.NoError(t, j1.Discard()) + j1 = nil +} + +func TestRepeatBuildWithIgnoreCache(t *testing.T) { + t.Parallel() + ctx := context.TODO() + + l := NewJobList(SolverOpt{ + ResolveOpFunc: testOpResolver, + }) + defer l.Close() + + j0, err := l.NewJob("j0") + require.NoError(t, err) + + defer func() { + if j0 != nil { + j0.Discard() + } + }() + + g0 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v0", + cacheKeySeed: "seed0", + value: "result0", + inputs: []Edge{ + {Vertex: vtx(vtxOpt{ + name: "v1", + cacheKeySeed: "seed1", + value: "result1", + })}, + }, + }), + } + g0.Vertex.(*vertex).setupCallCounters() + + res, err := j0.Build(ctx, g0) + require.NoError(t, err) + require.Equal(t, unwrap(res), "result0") + require.Equal(t, int64(2), *g0.Vertex.(*vertex).cacheCallCount) + require.Equal(t, int64(2), *g0.Vertex.(*vertex).execCallCount) + + require.NoError(t, j0.Discard()) j0 = nil + + // rebuild with ignore-cache reevaluates everything + + j1, err := l.NewJob("j1") + require.NoError(t, err) + + defer func() { + if j1 != nil { + j1.Discard() + } + }() + + g1 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v0", + cacheKeySeed: "seed0", + value: "result0-1", + ignoreCache: true, + inputs: []Edge{ + {Vertex: vtx(vtxOpt{ + name: "v1", + cacheKeySeed: "seed1", + value: "result1-1", + ignoreCache: true, + })}, + }, + }), + } + g1.Vertex.(*vertex).setupCallCounters() + + res, err = j1.Build(ctx, g1) + require.NoError(t, err) + require.Equal(t, unwrap(res), "result0-1") + require.Equal(t, int64(2), *g1.Vertex.(*vertex).cacheCallCount) + require.Equal(t, int64(2), *g1.Vertex.(*vertex).execCallCount) + + require.NoError(t, j1.Discard()) + j1 = nil + + // ignore-cache in child reevaluates parent + + j2, err := l.NewJob("j2") + require.NoError(t, err) + + defer func() { + if j2 != nil { + j2.Discard() + } + }() + + g2 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v0", + cacheKeySeed: "seed0", + value: "result0-2", + inputs: []Edge{ + {Vertex: vtx(vtxOpt{ + name: "v1", + cacheKeySeed: "seed1", + value: "result1-2", + ignoreCache: true, + })}, + }, + }), + } + g2.Vertex.(*vertex).setupCallCounters() + + res, err = j2.Build(ctx, g2) + require.NoError(t, err) + require.Equal(t, unwrap(res), "result0-2") + require.Equal(t, int64(2), *g2.Vertex.(*vertex).cacheCallCount) + require.Equal(t, int64(2), *g2.Vertex.(*vertex).execCallCount) + + require.NoError(t, j2.Discard()) + j2 = nil +} + +// TestIgnoreCacheResumeFromSlowCache tests that parent cache resumes if child +// with ignore-cache generates same slow cache key +func TestIgnoreCacheResumeFromSlowCache(t *testing.T) { + t.Parallel() + ctx := context.TODO() + + l := NewJobList(SolverOpt{ + ResolveOpFunc: testOpResolver, + }) + defer l.Close() + + j0, err := l.NewJob("j0") + require.NoError(t, err) + + defer func() { + if j0 != nil { + j0.Discard() + } + }() + + g0 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v0", + cacheKeySeed: "seed0", + value: "result0", + slowCacheCompute: map[int]ResultBasedCacheFunc{ + 0: digestFromResult, + }, + inputs: []Edge{ + {Vertex: vtx(vtxOpt{ + name: "v1", + cacheKeySeed: "seed1", + value: "result1", + })}, + }, + }), + } + g0.Vertex.(*vertex).setupCallCounters() + + res, err := j0.Build(ctx, g0) + require.NoError(t, err) + require.Equal(t, unwrap(res), "result0") + require.Equal(t, int64(2), *g0.Vertex.(*vertex).cacheCallCount) + require.Equal(t, int64(2), *g0.Vertex.(*vertex).execCallCount) + + require.NoError(t, j0.Discard()) + j0 = nil + + // rebuild reevaluates child, but not parent + + j1, err := l.NewJob("j1") + require.NoError(t, err) + + defer func() { + if j1 != nil { + j1.Discard() + } + }() + + g1 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v0-1", // doesn't matter but avoid match because another bug + cacheKeySeed: "seed0", + value: "result0-no-cache", + slowCacheCompute: map[int]ResultBasedCacheFunc{ + 0: digestFromResult, + }, + inputs: []Edge{ + {Vertex: vtx(vtxOpt{ + name: "v1-1", + cacheKeySeed: "seed1-1", // doesn't matter but avoid match because another bug + value: "result1", // same as g0 + ignoreCache: true, + })}, + }, + }), + } + g1.Vertex.(*vertex).setupCallCounters() + + res, err = j1.Build(ctx, g1) + require.NoError(t, err) + require.Equal(t, unwrap(res), "result0") + require.Equal(t, int64(2), *g1.Vertex.(*vertex).cacheCallCount) + require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount) + + require.NoError(t, j1.Discard()) + j1 = nil +} + +func TestParallelBuildsIgnoreCache(t *testing.T) { + t.Parallel() + ctx := context.TODO() + + l := NewJobList(SolverOpt{ + ResolveOpFunc: testOpResolver, + }) + defer l.Close() + + j0, err := l.NewJob("j0") + require.NoError(t, err) + + defer func() { + if j0 != nil { + j0.Discard() + } + }() + + g0 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v0", + cacheKeySeed: "seed0", + value: "result0", + }), + } + g0.Vertex.(*vertex).setupCallCounters() + + res, err := j0.Build(ctx, g0) + require.NoError(t, err) + + require.Equal(t, unwrap(res), "result0") + + // match by vertex digest + j1, err := l.NewJob("j1") + require.NoError(t, err) + + defer func() { + if j1 != nil { + j1.Discard() + } + }() + + g1 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v0", + cacheKeySeed: "seed1", + value: "result1", + ignoreCache: true, + }), + } + g1.Vertex.(*vertex).setupCallCounters() + + res, err = j1.Build(ctx, g1) + require.NoError(t, err) + + require.Equal(t, unwrap(res), "result1") + + require.NoError(t, j0.Discard()) + j0 = nil + require.NoError(t, j1.Discard()) + j1 = nil + + // new base + j2, err := l.NewJob("j2") + require.NoError(t, err) + + defer func() { + if j2 != nil { + j2.Discard() + } + }() + + g2 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v2", + cacheKeySeed: "seed2", + value: "result2", + }), + } + g2.Vertex.(*vertex).setupCallCounters() + + res, err = j2.Build(ctx, g2) + require.NoError(t, err) + + require.Equal(t, unwrap(res), "result2") + + // match by cache key + j3, err := l.NewJob("j3") + require.NoError(t, err) + + defer func() { + if j3 != nil { + j3.Discard() + } + }() + + g3 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v3", + cacheKeySeed: "seed2", + value: "result3", + ignoreCache: true, + }), + } + g3.Vertex.(*vertex).setupCallCounters() + + res, err = j3.Build(ctx, g3) + require.NoError(t, err) + + require.Equal(t, unwrap(res), "result3") + + // add another ignorecache merges now + + j4, err := l.NewJob("j4") + require.NoError(t, err) + + defer func() { + if j4 != nil { + j4.Discard() + } + }() + + g4 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v4", + cacheKeySeed: "seed2", // same as g2/g3 + value: "result4", + ignoreCache: true, + }), + } + g4.Vertex.(*vertex).setupCallCounters() + + res, err = j4.Build(ctx, g4) + require.NoError(t, err) + + require.Equal(t, unwrap(res), "result3") + + // add another !ignorecache merges now + + j5, err := l.NewJob("j5") + require.NoError(t, err) + + defer func() { + if j5 != nil { + j5.Discard() + } + }() + + g5 := Edge{ + Vertex: vtx(vtxOpt{ + name: "v5", + cacheKeySeed: "seed2", // same as g2/g3/g4 + value: "result5", + }), + } + g5.Vertex.(*vertex).setupCallCounters() + + res, err = j5.Build(ctx, g5) + require.NoError(t, err) + + require.Equal(t, unwrap(res), "result3") } func generateSubGraph(nodes int) (Edge, int) { @@ -1468,6 +1839,7 @@ type vtxOpt struct { value string slowCacheCompute map[int]ResultBasedCacheFunc cacheSource CacheManager + ignoreCache bool } func vtx(opt vtxOpt) *vertex { @@ -1502,6 +1874,7 @@ func (v *vertex) Name() string { func (v *vertex) Options() VertexOptions { return VertexOptions{ CacheSource: v.opt.cacheSource, + IgnoreCache: v.opt.ignoreCache, } }