package solver

import (
	"context"
	_ "crypto/sha256"
	"fmt"
	"math"
	"math/rand"
	"os"
	"sync/atomic"
	"testing"
	"time"

	"github.com/moby/buildkit/identity"
	digest "github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
)

func init() {
	if debugScheduler {
		logrus.SetOutput(os.Stdout)
		logrus.SetLevel(logrus.DebugLevel)
	}
}

func TestSingleLevelActiveGraph(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:  "v0",
			value: "result0",
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.NotNil(t, res)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(1))

	// calling again with the same digest just uses the active queue
	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:  "v0",
			value: "result1",
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(1))
	require.Equal(t, *g1.Vertex.(*vertex).cacheCallCount, int64(0))
	require.Equal(t, *g1.Vertex.(*vertex).execCallCount, int64(0))

	require.NoError(t, j0.Discard())
	j0 = nil

	// after discarding j0, j1 still holds the state
	j2, err := s.NewJob("job2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtx(vtxOpt{
			name:  "v0",
			value: "result2",
		}),
	}
	g2.Vertex.(*vertex).setupCallCounters()

	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(1))
	require.Equal(t, *g1.Vertex.(*vertex).cacheCallCount, int64(0))
	require.Equal(t, *g1.Vertex.(*vertex).execCallCount, int64(0))
	require.Equal(t, *g2.Vertex.(*vertex).cacheCallCount, int64(0))
	require.Equal(t, *g2.Vertex.(*vertex).execCallCount, int64(0))

	require.NoError(t, j1.Discard())
	j1 = nil
	require.NoError(t, j2.Discard())
	j2 = nil

	// everything should be released now
	j3, err := s.NewJob("job3")
	require.NoError(t, err)

	defer func() {
		if j3 != nil {
			j3.Discard()
		}
	}()

	g3 := Edge{
		Vertex: vtx(vtxOpt{
			name:  "v0",
			value: "result3",
		}),
	}
	g3.Vertex.(*vertex).setupCallCounters()

	res, err = j3.Build(ctx, g3)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result3")
	require.Equal(t, *g3.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g3.Vertex.(*vertex).execCallCount, int64(1))

	require.NoError(t, j3.Discard())
	j3 = nil

	// repeat the same test but make sure the builds now run in parallel
	j4, err := s.NewJob("job4")
	require.NoError(t, err)

	defer func() {
		if j4 != nil {
			j4.Discard()
		}
	}()

	j5, err := s.NewJob("job5")
	require.NoError(t, err)

	defer func() {
		if j5 != nil {
			j5.Discard()
		}
	}()

	g4 := Edge{
		Vertex: vtx(vtxOpt{
			name:       "v0",
			cacheDelay: 100 * time.Millisecond,
			value:      "result4",
		}),
	}
	g4.Vertex.(*vertex).setupCallCounters()

	eg, _ := errgroup.WithContext(ctx)
	eg.Go(func() error {
		res, err := j4.Build(ctx, g4)
		require.NoError(t, err)
		require.Equal(t, unwrap(res), "result4")
		return err
	})
	eg.Go(func() error {
		res, err := j5.Build(ctx, g4)
		require.NoError(t, err)
		require.Equal(t, unwrap(res), "result4")
		return err
	})

	require.NoError(t, eg.Wait())

	require.Equal(t, *g4.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g4.Vertex.(*vertex).execCallCount, int64(1))
}
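// TestSingleLevelCache verifies that a result produced under one cache key
// seed is reused by a later job with the same seed without re-executing,
// while a different seed triggers a fresh execution.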
func TestSingleLevelCache(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")

	require.NoError(t, j0.Discard())
	j0 = nil

	// first check that there is no match for a different cache key
	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v1",
			cacheKeySeed: "seed1",
			value:        "result1",
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result1")
	require.Equal(t, *g1.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g1.Vertex.(*vertex).execCallCount, int64(1))

	require.NoError(t, j1.Discard())
	j1 = nil

	// expect a cache match with the first build
	j2, err := s.NewJob("job2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v2",
			cacheKeySeed: "seed0", // same as first build
			value:        "result2",
		}),
	}
	g2.Vertex.(*vertex).setupCallCounters()

	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(1))
	require.Equal(t, *g2.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g2.Vertex.(*vertex).execCallCount, int64(0))

	require.NoError(t, j2.Discard())
	j2 = nil
}
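// TestSingleLevelCacheParallel verifies that two jobs racing on the same
// cache key share a single execution between them.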
func TestSingleLevelCacheParallel(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	// rebuild in parallel. only executed once.
	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	wait2Ready := blockingFunction(2)
	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			cachePreFunc: wait2Ready,
			value:        "result0",
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v1",
			cacheKeySeed: "seed0", // same as g0
			cachePreFunc: wait2Ready,
			value:        "result0",
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	eg, _ := errgroup.WithContext(ctx)
	eg.Go(func() error {
		res, err := j0.Build(ctx, g0)
		require.NoError(t, err)
		require.Equal(t, unwrap(res), "result0")
		return err
	})
	eg.Go(func() error {
		res, err := j1.Build(ctx, g1)
		require.NoError(t, err)
		require.Equal(t, unwrap(res), "result0")
		return err
	})

	require.NoError(t, eg.Wait())

	require.Equal(t, int64(1), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g1.Vertex.(*vertex).cacheCallCount)
	// only one execution ran
	require.Equal(t, int64(1), *g0.Vertex.(*vertex).execCallCount+*g1.Vertex.(*vertex).execCallCount)
}

// TestMultiLevelCacheParallel extends the parallel cache test to a two-level
// graph: both the parent and child vertices share cache keys across jobs.
func TestMultiLevelCacheParallel(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	// rebuild in parallel. only executed once.
	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	wait2Ready := blockingFunction(2)
	wait2Ready2 := blockingFunction(2)
	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			cachePreFunc: wait2Ready,
			value:        "result0",
			inputs: []Edge{{
				Vertex: vtx(vtxOpt{
					name:         "v0-c0",
					cacheKeySeed: "seed0-c0",
					cachePreFunc: wait2Ready2,
					value:        "result0-c0",
				}),
			}},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v1",
			cacheKeySeed: "seed0", // same as g0
			cachePreFunc: wait2Ready,
			value:        "result0",
			inputs: []Edge{{
				Vertex: vtx(vtxOpt{
					name:         "v1-c0",
					cacheKeySeed: "seed0-c0", // same as g0
					cachePreFunc: wait2Ready2,
					value:        "result0-c",
				}),
			}},
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	eg, _ := errgroup.WithContext(ctx)
	eg.Go(func() error {
		res, err := j0.Build(ctx, g0)
		require.NoError(t, err)
		require.Equal(t, unwrap(res), "result0")
		return err
	})
	eg.Go(func() error {
		res, err := j1.Build(ctx, g1)
		require.NoError(t, err)
		require.Equal(t, unwrap(res), "result0")
		return err
	})

	require.NoError(t, eg.Wait())

	require.Equal(t, int64(2), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(2), *g1.Vertex.(*vertex).cacheCallCount)
	// one execution per level across both jobs
	require.Equal(t, int64(2), *g0.Vertex.(*vertex).execCallCount+*g1.Vertex.(*vertex).execCallCount)
}

func TestSingleCancelCache(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	ctx, cancel := context.WithCancel(ctx)

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name: "v0",
			cachePreFunc: func(ctx context.Context) error {
				cancel()
				<-ctx.Done()
				return nil // error should still come from context
			},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	_, err = j0.Build(ctx, g0)
	require.Error(t, err)
	require.Equal(t, errors.Cause(err), context.Canceled)

	require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(0))

	require.NoError(t, j0.Discard())
	j0 = nil
}
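// TestSingleCancelExec verifies that cancelling the build context during the
// exec phase surfaces context.Canceled to the caller even though the op's
// pre-exec hook itself returns nil.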
func TestSingleCancelExec(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	ctx, cancel := context.WithCancel(ctx)

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name: "v2",
			execPreFunc: func(ctx context.Context) error {
				cancel()
				<-ctx.Done()
				return nil // error should still come from context
			},
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	_, err = j1.Build(ctx, g1)
	require.Error(t, err)
	require.Equal(t, errors.Cause(err), context.Canceled)

	require.Equal(t, *g1.Vertex.(*vertex).cacheCallCount, int64(1))
	require.Equal(t, *g1.Vertex.(*vertex).execCallCount, int64(1))

	require.NoError(t, j1.Discard())
	j1 = nil
}

func TestSingleCancelParallel(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	// run 2 in parallel, cancel the first; the second one continues without errors
	eg, ctx := errgroup.WithContext(ctx)

	firstReady := make(chan struct{})
	firstErrored := make(chan struct{})

	eg.Go(func() error {
		j, err := s.NewJob("job2")
		require.NoError(t, err)

		defer func() {
			if j != nil {
				j.Discard()
			}
		}()

		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		g := Edge{
			Vertex: vtx(vtxOpt{
				name:  "v2",
				value: "result2",
				cachePreFunc: func(ctx context.Context) error {
					close(firstReady)
					time.Sleep(200 * time.Millisecond)
					cancel()
					<-firstErrored
					return nil
				},
			}),
		}

		_, err = j.Build(ctx, g)
		close(firstErrored)
		require.Error(t, err)
		require.Equal(t, errors.Cause(err), context.Canceled)
		return nil
	})
	eg.Go(func() error {
		j, err := s.NewJob("job3")
		require.NoError(t, err)

		defer func() {
			if j != nil {
				j.Discard()
			}
		}()

		g := Edge{
			Vertex: vtx(vtxOpt{
				name: "v2",
			}),
		}
		<-firstReady

		res, err := j.Build(ctx, g)
		require.NoError(t, err)
		require.Equal(t, unwrap(res), "result2")
		return err
	})

	require.NoError(t, eg.Wait())
}

func TestMultiLevelCalculation(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g := Edge{
		Vertex: vtxSum(1, vtxOpt{
			inputs: []Edge{
				{Vertex: vtxSum(0, vtxOpt{
					inputs: []Edge{
						{Vertex: vtxConst(7, vtxOpt{})},
						{Vertex: vtxConst(2, vtxOpt{})},
					},
				})},
				{Vertex: vtxSum(0, vtxOpt{
					inputs: []Edge{
						{Vertex: vtxConst(7, vtxOpt{})},
						{Vertex: vtxConst(2, vtxOpt{})},
					},
				})},
				{Vertex: vtxConst(2, vtxOpt{})},
				{Vertex: vtxConst(2, vtxOpt{})},
				{Vertex: vtxConst(19, vtxOpt{})},
			},
		}),
	}

	res, err := j0.Build(ctx, g)
	require.NoError(t, err)
	require.Equal(t, unwrapInt(res), 42) // 1 + 2*(7 + 2) + 2 + 2 + 19

	require.NoError(t, j0.Discard())
	j0 = nil

	// repeating the same build with a warm cache should behave the same
	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtxSum(1, vtxOpt{
			inputs: []Edge{
				{Vertex: vtxSum(0, vtxOpt{
					inputs: []Edge{
						{Vertex: vtxConst(7, vtxOpt{})},
						{Vertex: vtxConst(2, vtxOpt{})},
					},
				})},
				{Vertex: vtxSum(0, vtxOpt{
					inputs: []Edge{
						{Vertex: vtxConst(7, vtxOpt{})},
						{Vertex: vtxConst(2, vtxOpt{})},
					},
				})},
				{Vertex: vtxConst(2, vtxOpt{})},
				{Vertex: vtxConst(2, vtxOpt{})},
				{Vertex: vtxConst(19, vtxOpt{})},
			},
		}),
	}
	res, err = j1.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, unwrapInt(res), 42)
}
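// TestHugeGraph builds a randomly generated graph of about 1000 sum/const
// vertices, checks the computed total, then rebuilds it to verify that the
// second pass runs no executions and is served by a single cache load.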
func TestHugeGraph(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	rand.Seed(time.Now().UnixNano())

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	nodes := 1000

	g, v := generateSubGraph(nodes)
	// printGraph(g, "")
	g.Vertex.(*vertexSum).setupCallCounters()

	res, err := j0.Build(ctx, g)
	require.NoError(t, err)
	require.Equal(t, unwrapInt(res), v)
	require.Equal(t, int64(nodes), *g.Vertex.(*vertexSum).cacheCallCount)
	// execCount := *g.Vertex.(*vertexSum).execCallCount
	// require.True(t, execCount < 1000)
	// require.True(t, execCount > 600)
	require.Equal(t, int64(0), cacheManager.loadCounter)

	require.NoError(t, j0.Discard())
	j0 = nil

	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g.Vertex.(*vertexSum).setupCallCounters()

	res, err = j1.Build(ctx, g)
	require.NoError(t, err)
	require.Equal(t, unwrapInt(res), v)
	require.Equal(t, int64(nodes), *g.Vertex.(*vertexSum).cacheCallCount)
	require.Equal(t, int64(0), *g.Vertex.(*vertexSum).execCallCount)
	require.Equal(t, int64(1), cacheManager.loadCounter)
}

// TestOptimizedCacheAccess tests that inputs are not loaded from cache unless
// they are really needed
func TestOptimizedCacheAccess(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2",
					cacheKeySeed: "seed2",
					value:        "result2",
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				1: digestFromResult,
			},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(0), cacheManager.loadCounter)

	require.NoError(t, j0.Discard())
	j0 = nil

	// changing the cache seed for the input with the slow cache key should not pull in result1
	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0-nocache",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1-nocache",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2-changed",
					cacheKeySeed: "seed2-changed",
					value:        "result2", // produces same slow key as g0
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				1: digestFromResult,
			},
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, int64(3), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(1), cacheManager.loadCounter)

	require.NoError(t, j1.Discard())
	j1 = nil
}
// TestOptimizedCacheAccess2 is a more narrow case that tests that inputs are
// not loaded from cache unless they are really needed. Inputs that match by
// definition should be less prioritized for slow cache calculation than the
// inputs that didn't.
func TestOptimizedCacheAccess2(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2",
					cacheKeySeed: "seed2",
					value:        "result2",
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
				1: digestFromResult,
			},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(0), cacheManager.loadCounter)

	require.NoError(t, j0.Discard())
	j0 = nil

	// changing the cache seed for the input with the slow cache key should not pull in result1
	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0-nocache",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2-changed",
					cacheKeySeed: "seed2-changed",
					value:        "result2", // produces same slow key as g0
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
				1: digestFromResult,
			},
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, int64(3), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(1), cacheManager.loadCounter) // v1 is never loaded nor executed

	require.NoError(t, j1.Discard())
	j1 = nil

	// make sure that both inputs are still used for a slow cache hit
	j2, err := l.NewJob("j2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0-nocache",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1-changed2",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2-changed",
					cacheKeySeed: "seed2-changed2",
					value:        "result2", // produces same slow key as g0
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
				1: digestFromResult,
			},
		}),
	}
	g2.Vertex.(*vertex).setupCallCounters()

	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")
	require.Equal(t, int64(3), *g2.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(2), *g2.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(2), cacheManager.loadCounter)

	require.NoError(t, j2.Discard())
	j2 = nil
}
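// TestSlowCache verifies that a result-based (slow) cache key computed from
// an input result can produce a cache hit even when the input's own cache
// key has changed.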
func TestSlowCache(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	rand.Seed(time.Now().UnixNano())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
			},
		}),
	}

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")

	require.NoError(t, j0.Discard())
	j0 = nil

	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v2",
			cacheKeySeed: "seed0",
			value:        "not-cached",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v3",
					cacheKeySeed: "seed3",
					value:        "result1", // used for the slow cache key
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
			},
		}),
	}

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")

	require.NoError(t, j1.Discard())
	j1 = nil
}

// generateSubGraph builds a random graph of vtxSum/vtxConst vertices with the
// given number of nodes and returns its root edge together with the expected
// total value of the build.
func generateSubGraph(nodes int) (Edge, int) {
	if nodes == 1 {
		value := rand.Int() % 500
		return Edge{Vertex: vtxConst(value, vtxOpt{})}, value
	}
	spread := rand.Int()%5 + 2
	inc := int(math.Ceil(float64(nodes) / float64(spread)))
	if inc > nodes {
		inc = nodes
	}
	added := 1
	value := 0
	inputs := []Edge{}
	for added < nodes {
		if added+inc > nodes {
			inc = nodes - added
		}
		e, v := generateSubGraph(inc)
		inputs = append(inputs, e)
		value += v
		added += inc
	}
	extra := rand.Int() % 500
	value += extra

	return Edge{Vertex: vtxSum(extra, vtxOpt{inputs: inputs})}, value
}

type vtxOpt struct {
	name             string
	cacheKeySeed     string
	execDelay        time.Duration
	cacheDelay       time.Duration
	cachePreFunc     func(context.Context) error
	execPreFunc      func(context.Context) error
	inputs           []Edge
	value            string
	slowCacheCompute map[int]ResultBasedCacheFunc
}

func vtx(opt vtxOpt) *vertex {
	if opt.name == "" {
		opt.name = identity.NewID()
	}
	if opt.cacheKeySeed == "" {
		opt.cacheKeySeed = identity.NewID()
	}
	return &vertex{opt: opt}
}

type vertex struct {
	opt vtxOpt

	cacheCallCount *int64
	execCallCount  *int64
}

func (v *vertex) Digest() digest.Digest {
	return digest.FromBytes([]byte(v.opt.name))
}
func (v *vertex) Sys() interface{} {
	return v
}
func (v *vertex) Inputs() []Edge {
	return v.opt.inputs
}
func (v *vertex) Name() string {
	return v.opt.name
}

func (v *vertex) setupCallCounters() {
	var cacheCount int64
	var execCount int64

	v.setCallCounters(&cacheCount, &execCount)
}

// setCallCounters shares one pair of counters across the whole input graph so
// that per-graph cache and exec invocation totals can be asserted on.
func (v *vertex) setCallCounters(cacheCount, execCount *int64) {
	v.cacheCallCount = cacheCount
	v.execCallCount = execCount

	for _, inp := range v.opt.inputs {
		var v *vertex
		switch vv := inp.Vertex.(type) {
		case *vertex:
			v = vv
		case *vertexSum:
			v = vv.vertex
		case *vertexConst:
			v = vv.vertex
		}
		v.setCallCounters(cacheCount, execCount)
	}
}

func (v *vertex) cacheMap(ctx context.Context) error {
	if f := v.opt.cachePreFunc; f != nil {
		if err := f(ctx); err != nil {
			return err
		}
	}
	if v.cacheCallCount != nil {
		atomic.AddInt64(v.cacheCallCount, 1)
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	select {
	case <-time.After(v.opt.cacheDelay):
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}

func (v *vertex) CacheMap(ctx context.Context) (*CacheMap, error) {
	if err := v.cacheMap(ctx); err != nil {
		return nil, err
	}
	return v.makeCacheMap(), nil
}

func (v *vertex) exec(ctx context.Context, inputs []Result) error {
	if len(inputs) != len(v.Inputs()) {
		return errors.Errorf("invalid number of inputs")
	}
	if f := v.opt.execPreFunc; f != nil {
		if err := f(ctx); err != nil {
			return err
		}
	}
	if v.execCallCount != nil {
		atomic.AddInt64(v.execCallCount, 1)
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	select {
	case <-time.After(v.opt.execDelay):
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}
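// Exec implements the op execution for the basic test vertex, returning a
// dummyResult that carries the configured string value.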
func (v *vertex) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
	if err := v.exec(ctx, inputs); err != nil {
		return nil, err
	}
	return []Result{&dummyResult{id: identity.NewID(), value: v.opt.value}}, nil
}

func (v *vertex) makeCacheMap() *CacheMap {
	m := &CacheMap{
		Digest: digest.FromBytes([]byte(fmt.Sprintf("seed:%s", v.opt.cacheKeySeed))),
		Deps: make([]struct {
			Selector          digest.Digest
			ComputeDigestFunc ResultBasedCacheFunc
		}, len(v.Inputs())),
	}
	for i, f := range v.opt.slowCacheCompute {
		m.Deps[i].ComputeDigestFunc = f
	}
	return m
}

// vtxConst returns a vertex that outputs a constant integer
func vtxConst(v int, opt vtxOpt) *vertexConst {
	if opt.cacheKeySeed == "" {
		opt.cacheKeySeed = fmt.Sprintf("const-%d", v)
	}
	if opt.name == "" {
		opt.name = opt.cacheKeySeed + "-" + identity.NewID()
	}
	return &vertexConst{vertex: vtx(opt), value: v}
}

type vertexConst struct {
	*vertex
	value int
}

func (v *vertexConst) Sys() interface{} {
	return v
}

func (v *vertexConst) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
	if err := v.exec(ctx, inputs); err != nil {
		return nil, err
	}
	return []Result{&dummyResult{id: identity.NewID(), intValue: v.value}}, nil
}

// vtxSum returns a vertex that outputs the sum of its inputs plus a constant
func vtxSum(v int, opt vtxOpt) *vertexSum {
	if opt.cacheKeySeed == "" {
		opt.cacheKeySeed = fmt.Sprintf("sum-%d", v)
	}
	if opt.name == "" {
		opt.name = opt.cacheKeySeed + "-" + identity.NewID()
	}
	return &vertexSum{vertex: vtx(opt), value: v}
}

type vertexSum struct {
	*vertex
	value int
}

func (v *vertexSum) Sys() interface{} {
	return v
}

func (v *vertexSum) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
	if err := v.exec(ctx, inputs); err != nil {
		return nil, err
	}
	s := v.value
	for _, inp := range inputs {
		r, ok := inp.Sys().(*dummyResult)
		if !ok {
			return nil, errors.Errorf("invalid input type: %T", inp.Sys())
		}
		s += r.intValue
	}
	return []Result{&dummyResult{id: identity.NewID(), intValue: s}}, nil
}

// printGraph prints a debug view of the graph rooted at e
func printGraph(e Edge, pfx string) {
	name := e.Vertex.Name()
	fmt.Printf("%s %d %s\n", pfx, e.Index, name)
	for _, inp := range e.Vertex.Inputs() {
		printGraph(inp, pfx+"-->")
	}
}

type dummyResult struct {
	id       string
	value    string
	intValue int
}

func (r *dummyResult) ID() string                    { return r.id }
func (r *dummyResult) Release(context.Context) error { return nil }
func (r *dummyResult) Sys() interface{}              { return r }

func testOpResolver(v Vertex) (Op, error) {
	if op, ok := v.Sys().(Op); ok {
		return op, nil
	}
	return nil, errors.Errorf("invalid vertex")
}

func unwrap(res Result) string {
	r, ok := res.Sys().(*dummyResult)
	if !ok {
		return "unwrap-error"
	}
	return r.value
}

func unwrapInt(res Result) int {
	r, ok := res.Sys().(*dummyResult)
	if !ok {
		return -1e6
	}
	return r.intValue
}

// blockingFunction returns a function that blocks every caller until it has
// been entered i times, then releases them all at once. It is used to force
// cache key computation to overlap in the parallel tests above.
func blockingFunction(i int) func(context.Context) error {
	limit := int64(i)
	block := make(chan struct{})
	return func(context.Context) error {
		if atomic.AddInt64(&limit, -1) == 0 {
			close(block)
		}
		<-block
		return nil
	}
}

func newTrackingCacheManager(cm CacheManager) *trackingCacheManager {
	return &trackingCacheManager{CacheManager: cm}
}

// trackingCacheManager wraps a CacheManager and counts how many times results
// are loaded from cache, so tests can assert on cache access patterns.
type trackingCacheManager struct {
	CacheManager
	loadCounter int64
}

func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
	atomic.AddInt64(&cm.loadCounter, 1)
	return cm.CacheManager.Load(ctx, rec)
}

func digestFromResult(ctx context.Context, res Result) (digest.Digest, error) {
	return digest.FromBytes([]byte(unwrap(res))), nil
}
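// exampleBuild is a minimal usage sketch, not exercised by the suite itself,
// showing the wiring pattern the tests above repeat: create a job list with a
// ResolveOpFunc, open a job, build an edge, and unwrap the dummy result. The
// job name "example" and the vertex values are arbitrary placeholders.
func exampleBuild(ctx context.Context, t *testing.T) {
	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j, err := s.NewJob("example")
	require.NoError(t, err)
	defer j.Discard()

	res, err := j.Build(ctx, Edge{
		Vertex: vtx(vtxOpt{name: "v-example", value: "result-example"}),
	})
	require.NoError(t, err)
	require.Equal(t, "result-example", unwrap(res))
}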