buildkit/solver-next/scheduler_test.go

1740 lines
35 KiB
Go
Raw Normal View History

package solver
import (
"context"
_ "crypto/sha256"
"fmt"
"math"
"math/rand"
"os"
"sync/atomic"
"testing"
"time"
"github.com/moby/buildkit/identity"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
// init sends logrus output to stdout at debug level when debugScheduler is
// enabled; otherwise the logger configuration is left untouched.
func init() {
	if !debugScheduler {
		return
	}
	logrus.SetOutput(os.Stdout)
	logrus.SetLevel(logrus.DebugLevel)
}
// TestSingleLevelActiveGraph builds single-vertex graphs that all use the name
// "v0" (and therefore share a digest) through several jobs. The call counters
// verify that the vertex is evaluated only once while at least one job still
// references it, that it is evaluated again after every job was discarded, and
// that two jobs building the same edge concurrently trigger a single
// evaluation.
func TestSingleLevelActiveGraph(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	// Jobs are discarded explicitly below; the deferred calls only clean up
	// when an assertion aborts the test early.
	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:  "v0",
			value: "result0",
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.NotNil(t, res)

	require.Equal(t, "result0", unwrap(res))
	require.Equal(t, int64(1), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g0.Vertex.(*vertex).execCallCount)

	// calling again with same digest just uses the active queue
	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:  "v0",
			value: "result1",
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))

	require.Equal(t, int64(1), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g0.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(0), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(0), *g1.Vertex.(*vertex).execCallCount)

	require.NoError(t, j0.Discard())
	j0 = nil

	// after discarding j0, j1 still holds the state
	j2, err := s.NewJob("job2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtx(vtxOpt{
			name:  "v0",
			value: "result2",
		}),
	}
	g2.Vertex.(*vertex).setupCallCounters()

	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))

	require.Equal(t, int64(1), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g0.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(0), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(0), *g1.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(0), *g2.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(0), *g2.Vertex.(*vertex).execCallCount)

	require.NoError(t, j1.Discard())
	j1 = nil
	require.NoError(t, j2.Discard())
	j2 = nil

	// everything should be released now
	j3, err := s.NewJob("job3")
	require.NoError(t, err)

	defer func() {
		if j3 != nil {
			j3.Discard()
		}
	}()

	g3 := Edge{
		Vertex: vtx(vtxOpt{
			name:  "v0",
			value: "result3",
		}),
	}
	g3.Vertex.(*vertex).setupCallCounters()

	res, err = j3.Build(ctx, g3)
	require.NoError(t, err)
	require.Equal(t, "result3", unwrap(res))

	require.Equal(t, int64(1), *g3.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g3.Vertex.(*vertex).execCallCount)

	require.NoError(t, j3.Discard())
	j3 = nil

	// repeat the same test but make sure the builds run in parallel now
	j4, err := s.NewJob("job4")
	require.NoError(t, err)

	defer func() {
		if j4 != nil {
			j4.Discard()
		}
	}()

	j5, err := s.NewJob("job5")
	require.NoError(t, err)

	defer func() {
		if j5 != nil {
			j5.Discard()
		}
	}()

	g4 := Edge{
		Vertex: vtx(vtxOpt{
			name:       "v0",
			cacheDelay: 100 * time.Millisecond,
			value:      "result4",
		}),
	}
	g4.Vertex.(*vertex).setupCallCounters()

	eg, _ := errgroup.WithContext(ctx)

	// Failures inside the goroutines are reported through the returned error:
	// require.* ends in t.FailNow, which must only be called from the
	// goroutine running the test function.
	eg.Go(func() error {
		res, err := j4.Build(ctx, g4)
		if err != nil {
			return err
		}
		if got := unwrap(res); got != "result4" {
			return errors.Errorf("job4: unexpected result %q", got)
		}
		return nil
	})

	eg.Go(func() error {
		res, err := j5.Build(ctx, g4)
		if err != nil {
			return err
		}
		if got := unwrap(res); got != "result4" {
			return errors.Errorf("job5: unexpected result %q", got)
		}
		return nil
	})

	require.NoError(t, eg.Wait())

	require.Equal(t, int64(1), *g4.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g4.Vertex.(*vertex).execCallCount)
}
// TestSingleLevelCache runs three sequential single-vertex builds and checks
// that a vertex with a different cache key seed is executed, while a vertex
// reusing the seed of the first build returns the first build's result without
// being executed.
func TestSingleLevelCache(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))

	require.NoError(t, j0.Discard())
	j0 = nil

	// first check that there is no match for a different cache key
	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v1",
			cacheKeySeed: "seed1",
			value:        "result1",
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, "result1", unwrap(res))

	require.Equal(t, int64(1), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount)

	require.NoError(t, j1.Discard())
	j1 = nil

	// expect cache match for first build
	j2, err := s.NewJob("job2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v2",
			cacheKeySeed: "seed0", // same as first build
			value:        "result2",
		}),
	}
	g2.Vertex.(*vertex).setupCallCounters()

	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))

	require.Equal(t, int64(1), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g0.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(1), *g2.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(0), *g2.Vertex.(*vertex).execCallCount)

	require.NoError(t, j2.Discard())
	j2 = nil
}
// TestSingleLevelCacheParallel builds two differently named vertices that share
// a cache key seed from two jobs at the same time. A two-party barrier in the
// cache pre-function forces both cache map calls to overlap; the counters then
// require one cache map call per vertex but only a single execution overall.
func TestSingleLevelCacheParallel(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	// rebuild in parallel. only executed once.

	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	wait2Ready := blockingFuncion(2)

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			cachePreFunc: wait2Ready,
			value:        "result0",
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v1",
			cacheKeySeed: "seed0", // same as g0
			cachePreFunc: wait2Ready,
			value:        "result0",
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	eg, _ := errgroup.WithContext(ctx)

	// Failures inside the goroutines are reported through the returned error:
	// require.* ends in t.FailNow, which must only be called from the
	// goroutine running the test function.
	eg.Go(func() error {
		res, err := j0.Build(ctx, g0)
		if err != nil {
			return err
		}
		if got := unwrap(res); got != "result0" {
			return errors.Errorf("job0: unexpected result %q", got)
		}
		return nil
	})

	eg.Go(func() error {
		res, err := j1.Build(ctx, g1)
		if err != nil {
			return err
		}
		if got := unwrap(res); got != "result0" {
			return errors.Errorf("job1: unexpected result %q", got)
		}
		return nil
	})

	require.NoError(t, eg.Wait())

	require.Equal(t, int64(1), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g1.Vertex.(*vertex).cacheCallCount)
	// only one execution ran
	require.Equal(t, int64(1), *g0.Vertex.(*vertex).execCallCount+*g1.Vertex.(*vertex).execCallCount)
}
// TestMultiLevelCacheParallel is the two-level variant of the parallel cache
// test: each job builds a vertex with one input, and both levels share cache
// key seeds across the two graphs. Separate two-party barriers make the cache
// map calls of each level overlap. Every graph must see two cache map calls,
// while only two executions may happen across both graphs.
func TestMultiLevelCacheParallel(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	// rebuild in parallel. only executed once.

	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	wait2Ready := blockingFuncion(2)
	wait2Ready2 := blockingFuncion(2)

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			cachePreFunc: wait2Ready,
			value:        "result0",
			inputs: []Edge{{
				Vertex: vtx(vtxOpt{
					name:         "v0-c0",
					cacheKeySeed: "seed0-c0",
					cachePreFunc: wait2Ready2,
					value:        "result0-c0",
				})},
			},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v1",
			cacheKeySeed: "seed0", // same as g0
			cachePreFunc: wait2Ready,
			value:        "result0",
			inputs: []Edge{{
				Vertex: vtx(vtxOpt{
					name:         "v1-c0",
					cacheKeySeed: "seed0-c0", // same as g0
					cachePreFunc: wait2Ready2,
					value:        "result0-c",
				})},
			},
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	eg, _ := errgroup.WithContext(ctx)

	// Failures inside the goroutines are reported through the returned error:
	// require.* ends in t.FailNow, which must only be called from the
	// goroutine running the test function.
	eg.Go(func() error {
		res, err := j0.Build(ctx, g0)
		if err != nil {
			return err
		}
		if got := unwrap(res); got != "result0" {
			return errors.Errorf("job0: unexpected result %q", got)
		}
		return nil
	})

	eg.Go(func() error {
		res, err := j1.Build(ctx, g1)
		if err != nil {
			return err
		}
		if got := unwrap(res); got != "result0" {
			return errors.Errorf("job1: unexpected result %q", got)
		}
		return nil
	})

	require.NoError(t, eg.Wait())

	require.Equal(t, int64(2), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(2), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(2), *g0.Vertex.(*vertex).execCallCount+*g1.Vertex.(*vertex).execCallCount)
}
// TestSingleCancelCache cancels the build context from inside the vertex's
// cache pre-function and expects Build to fail with context.Canceled even
// though the pre-function itself returns nil. The vertex must not be executed.
func TestSingleCancelCache(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j0, err := s.NewJob("job0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	ctx, cancel := context.WithCancel(ctx)
	// Release the context even if the pre-function below is never reached.
	defer cancel()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name: "v0",
			cachePreFunc: func(ctx context.Context) error {
				cancel()
				<-ctx.Done()
				return nil // error should still come from context
			},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	_, err = j0.Build(ctx, g0)
	require.Error(t, err)
	require.Equal(t, context.Canceled, errors.Cause(err))

	require.Equal(t, int64(1), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(0), *g0.Vertex.(*vertex).execCallCount)

	require.NoError(t, j0.Discard())
	j0 = nil
}
// TestSingleCancelExec cancels the build context from inside the vertex's exec
// pre-function and expects Build to fail with context.Canceled even though the
// pre-function itself returns nil. Both the cache map and the exec step are
// counted once.
func TestSingleCancelExec(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	j1, err := s.NewJob("job1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	ctx, cancel := context.WithCancel(ctx)
	// Release the context even if the pre-function below is never reached.
	defer cancel()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name: "v2",
			execPreFunc: func(ctx context.Context) error {
				cancel()
				<-ctx.Done()
				return nil // error should still come from context
			},
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	_, err = j1.Build(ctx, g1)
	require.Error(t, err)
	require.Equal(t, context.Canceled, errors.Cause(err))

	require.Equal(t, int64(1), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount)

	require.NoError(t, j1.Discard())
	j1 = nil
}
// TestSingleCancelParallel runs two jobs against the same vertex name "v2".
// The first job cancels its own context while its cache pre-function is still
// running and must fail with context.Canceled; the second job starts once the
// first one is inside the pre-function and must still finish successfully with
// "result2".
func TestSingleCancelParallel(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	s := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	// run 2 in parallel cancel first, second one continues without errors
	eg, ctx := errgroup.WithContext(ctx)

	firstReady := make(chan struct{})
	firstErrored := make(chan struct{})

	// Failures inside the goroutines are reported through the returned error:
	// require.* ends in t.FailNow, which must only be called from the
	// goroutine running the test function.
	eg.Go(func() error {
		j, err := s.NewJob("job2")
		if err != nil {
			return err
		}

		defer func() {
			if j != nil {
				j.Discard()
			}
		}()

		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		g := Edge{
			Vertex: vtx(vtxOpt{
				name:  "v2",
				value: "result2",
				cachePreFunc: func(ctx context.Context) error {
					close(firstReady)
					time.Sleep(200 * time.Millisecond)
					cancel()
					// Hold the cache map call open until the canceled
					// build has returned to its caller.
					<-firstErrored
					return nil
				},
			}),
		}

		_, err = j.Build(ctx, g)
		close(firstErrored)
		if err == nil {
			return errors.New("job2: expected build to be canceled")
		}
		if cause := errors.Cause(err); cause != context.Canceled {
			return errors.Errorf("job2: expected context.Canceled, got %v", cause)
		}
		return nil
	})

	eg.Go(func() error {
		j, err := s.NewJob("job3")
		if err != nil {
			return err
		}

		defer func() {
			if j != nil {
				j.Discard()
			}
		}()

		g := Edge{
			Vertex: vtx(vtxOpt{
				name: "v2",
			}),
		}

		// Wait for the first job to enter its cache pre-function, but do not
		// hang if that job failed before getting there.
		select {
		case <-firstReady:
		case <-ctx.Done():
			return ctx.Err()
		}

		res, err := j.Build(ctx, g)
		if err != nil {
			return err
		}
		if got := unwrap(res); got != "result2" {
			return errors.Errorf("job3: unexpected result %q", got)
		}
		return nil
	})

	require.NoError(t, eg.Wait())
}
// TestMultiLevelCalculation builds a two-level graph of sum and constant
// vertices and checks the computed total, then rebuilds a structurally
// identical graph in a second job and expects the same total.
func TestMultiLevelCalculation(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer l.Close()

	// newGraph returns a fresh copy of the graph under test. vtxSum and
	// vtxConst derive the cache key seed from the constant and append a random
	// ID to the name, so every copy has new vertex names but the same seeds.
	newGraph := func() Edge {
		return Edge{
			Vertex: vtxSum(1, vtxOpt{
				inputs: []Edge{
					{Vertex: vtxSum(0, vtxOpt{
						inputs: []Edge{
							{Vertex: vtxConst(7, vtxOpt{})},
							{Vertex: vtxConst(2, vtxOpt{})},
						},
					})},
					{Vertex: vtxSum(0, vtxOpt{
						inputs: []Edge{
							{Vertex: vtxConst(7, vtxOpt{})},
							{Vertex: vtxConst(2, vtxOpt{})},
						},
					})},
					{Vertex: vtxConst(2, vtxOpt{})},
					{Vertex: vtxConst(2, vtxOpt{})},
					{Vertex: vtxConst(19, vtxOpt{})},
				},
			}),
		}
	}

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g := newGraph()

	res, err := j0.Build(ctx, g)
	require.NoError(t, err)
	require.Equal(t, 42, unwrapInt(res)) // 1 + 2*(7 + 2) + 2 + 2 + 19

	require.NoError(t, j0.Discard())
	j0 = nil

	// repeating same build with cache should behave the same
	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g2 := newGraph()

	res, err = j1.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, 42, unwrapInt(res))
}
// TestHugeGraph builds a randomly generated 1000-node sum graph twice on the
// same solver. The first build must call the cache map of every node without
// loading anything from the cache manager; the second build must again call
// every cache map but execute nothing and load exactly one result.
func TestHugeGraph(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	rand.Seed(time.Now().UnixNano())

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	nodes := 1000

	g, v := generateSubGraph(nodes)
	// printGraph(g, "")
	g.Vertex.(*vertexSum).setupCallCounters()

	res, err := j0.Build(ctx, g)
	require.NoError(t, err)
	require.Equal(t, v, unwrapInt(res))
	require.Equal(t, int64(nodes), *g.Vertex.(*vertexSum).cacheCallCount)
	// execCount := *g.Vertex.(*vertexSum).execCallCount
	// require.True(t, execCount < 1000)
	// require.True(t, execCount > 600)
	// loadCounter is incremented with atomic.AddInt64, so read it atomically.
	require.Equal(t, int64(0), atomic.LoadInt64(&cacheManager.loadCounter))

	require.NoError(t, j0.Discard())
	j0 = nil

	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	// Reset the counters before the cached rebuild.
	g.Vertex.(*vertexSum).setupCallCounters()

	res, err = j1.Build(ctx, g)
	require.NoError(t, err)
	require.Equal(t, v, unwrapInt(res))

	require.Equal(t, int64(nodes), *g.Vertex.(*vertexSum).cacheCallCount)
	require.Equal(t, int64(0), *g.Vertex.(*vertexSum).execCallCount)
	require.Equal(t, int64(1), atomic.LoadInt64(&cacheManager.loadCounter))
}
// TestOptimizedCacheAccess tests that inputs are not loaded from cache unless
// they are really needed. The second build changes the seed of the input that
// has a result-based (slow) cache function but keeps its value; the root must
// still be served from cache with a single execution and a single cache load.
func TestOptimizedCacheAccess(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2",
					cacheKeySeed: "seed2",
					value:        "result2",
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				1: digestFromResult,
			},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).execCallCount)
	// loadCounter is incremented with atomic.AddInt64, so read it atomically.
	require.Equal(t, int64(0), atomic.LoadInt64(&cacheManager.loadCounter))

	require.NoError(t, j0.Discard())
	j0 = nil

	// changing cache seed for the input with slow cache should not pull result1
	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0-nocache",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1-nocache",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2-changed",
					cacheKeySeed: "seed2-changed",
					value:        "result2", // produces same slow key as g0
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				1: digestFromResult,
			},
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))
	require.Equal(t, int64(3), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(1), atomic.LoadInt64(&cacheManager.loadCounter))

	require.NoError(t, j1.Discard())
	j1 = nil
}
// TestOptimizedCacheAccess2 is a more narrow case that tests that inputs are
// not loaded from cache unless they are really needed. Inputs that match by
// definition should be less prioritized for slow cache calculation than the
// inputs that didn't.
func TestOptimizedCacheAccess2(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2",
					cacheKeySeed: "seed2",
					value:        "result2",
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
				1: digestFromResult,
			},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).execCallCount)
	// loadCounter is incremented with atomic.AddInt64, so read it atomically.
	require.Equal(t, int64(0), atomic.LoadInt64(&cacheManager.loadCounter))

	require.NoError(t, j0.Discard())
	j0 = nil

	// changing cache seed for the input with slow cache should not pull result1
	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0-nocache",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2-changed",
					cacheKeySeed: "seed2-changed",
					value:        "result2", // produces same slow key as g0
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
				1: digestFromResult,
			},
		}),
	}
	g1.Vertex.(*vertex).setupCallCounters()

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))
	require.Equal(t, int64(3), *g1.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(1), atomic.LoadInt64(&cacheManager.loadCounter)) // v1 is never loaded nor executed

	require.NoError(t, j1.Discard())
	j1 = nil

	// make sure that both inputs are still used for slow cache hit
	j2, err := l.NewJob("j2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0-nocache",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1-changed2",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2-changed",
					cacheKeySeed: "seed2-changed2",
					value:        "result2", // produces same slow key as g0
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
				1: digestFromResult,
			},
		}),
	}
	g2.Vertex.(*vertex).setupCallCounters()

	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))
	require.Equal(t, int64(3), *g2.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(2), *g2.Vertex.(*vertex).execCallCount)
	require.Equal(t, int64(2), atomic.LoadInt64(&cacheManager.loadCounter))

	require.NoError(t, j2.Discard())
	// Clear j2 (not j1) so the deferred cleanup does not discard it again.
	j2 = nil
}
// TestSlowCache checks a cache hit that is only reachable through the
// result-based (slow) cache key: the second graph uses different vertex names
// and a different input seed, but its input produces the same value as in the
// first build, and the root is expected to return the first build's result.
func TestSlowCache(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	rand.Seed(time.Now().UnixNano())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
			},
		}),
	}

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))

	require.NoError(t, j0.Discard())
	j0 = nil

	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v2",
			cacheKeySeed: "seed0",
			value:        "not-cached",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v3",
					cacheKeySeed: "seed3",
					value:        "result1", // used for slow key
				})},
			},
			slowCacheCompute: map[int]ResultBasedCacheFunc{
				0: digestFromResult,
			},
		}),
	}

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))

	require.NoError(t, j1.Discard())
	j1 = nil
}
// TestParallelInputs validates that inputs are processed in parallel. Both
// inputs share two-party barriers in their cache and exec pre-functions, so
// the build can only complete if the two inputs reach each step at the same
// time.
func TestParallelInputs(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	rand.Seed(time.Now().UnixNano())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	wait2Ready := blockingFuncion(2)
	wait2Ready2 := blockingFuncion(2)

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
					cachePreFunc: wait2Ready,
					execPreFunc:  wait2Ready2,
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2",
					cacheKeySeed: "seed2",
					value:        "result2",
					cachePreFunc: wait2Ready,
					execPreFunc:  wait2Ready2,
				})},
			},
		}),
	}
	g0.Vertex.(*vertex).setupCallCounters()

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))

	require.NoError(t, j0.Discard())
	j0 = nil

	require.Equal(t, int64(3), *g0.Vertex.(*vertex).cacheCallCount)
	require.Equal(t, int64(3), *g0.Vertex.(*vertex).execCallCount)
}
// TestErrorReturns checks that errors raised by an input vertex surface from
// Build with their cause intact. It covers three cases: a plain error from a
// cache pre-function, context.Canceled returned from a cache pre-function, and
// a plain error from an exec pre-function.
func TestErrorReturns(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	rand.Seed(time.Now().UnixNano())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
					cachePreFunc: func(ctx context.Context) error {
						return errors.Errorf("error-from-test")
					},
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2",
					cacheKeySeed: "seed2",
					value:        "result2",
				})},
			},
		}),
	}

	_, err = j0.Build(ctx, g0)
	require.Error(t, err)
	require.Contains(t, errors.Cause(err).Error(), "error-from-test")

	require.NoError(t, j0.Discard())
	j0 = nil

	// error with cancel error. to check that this isn't mixed up with regular build cancel.
	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
					cachePreFunc: func(ctx context.Context) error {
						return context.Canceled
					},
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2",
					cacheKeySeed: "seed2",
					value:        "result2",
				})},
			},
		}),
	}

	_, err = j1.Build(ctx, g1)
	require.Error(t, err)
	require.Equal(t, context.Canceled, errors.Cause(err))

	require.NoError(t, j1.Discard())
	j1 = nil

	// error from exec
	j2, err := l.NewJob("j2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
				{Vertex: vtx(vtxOpt{
					name:         "v2",
					cacheKeySeed: "seed3",
					value:        "result2",
					execPreFunc: func(ctx context.Context) error {
						return errors.Errorf("exec-error-from-test")
					},
				})},
			},
		}),
	}

	_, err = j2.Build(ctx, g2)
	require.Error(t, err)
	require.Contains(t, errors.Cause(err).Error(), "exec-error-from-test")

	require.NoError(t, j2.Discard())
	// Clear j2 (not j1) so the deferred cleanup does not discard it again.
	j2 = nil
}
// TestMultipleCacheSources builds a graph once to populate cacheManager, then
// rebuilds it with vertices that name cacheManager as their cache source and
// finally builds a new vertex on top of that graph. The load counters verify
// that results are loaded from cacheManager and never from cacheManager2.
func TestMultipleCacheSources(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1",
				})},
			},
		}),
	}

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))
	// loadCounter is incremented with atomic.AddInt64, so read it atomically.
	require.Equal(t, int64(0), atomic.LoadInt64(&cacheManager.loadCounter))

	require.NoError(t, j0.Discard())
	j0 = nil

	cacheManager2 := newTrackingCacheManager(NewInMemoryCacheManager())

	// NOTE(review): l2 (backed by cacheManager2) is created but the jobs below
	// are still opened on l. Presumably j1/j2 were meant to come from l2 so
	// that the per-vertex cacheSource is what makes cacheManager visible;
	// confirm against the solver before switching, since that changes which
	// default cache is consulted.
	l2 := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager2,
	})
	defer l2.Close()

	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0-no-cache",
			cacheSource:  cacheManager,
			inputs: []Edge{
				{Vertex: vtx(vtxOpt{
					name:         "v1",
					cacheKeySeed: "seed1",
					value:        "result1-no-cache",
					cacheSource:  cacheManager,
				})},
			},
		}),
	}

	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, "result0", unwrap(res))
	require.Equal(t, int64(1), atomic.LoadInt64(&cacheManager.loadCounter))
	require.Equal(t, int64(0), atomic.LoadInt64(&cacheManager2.loadCounter))

	require.NoError(t, j1.Discard())
	j1 = nil

	// build on top of old cache
	j2, err := l.NewJob("j2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v2",
			cacheKeySeed: "seed2",
			value:        "result2",
			inputs:       []Edge{g1},
		}),
	}

	// Build through the fresh job j2 rather than the already discarded j1.
	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, "result2", unwrap(res))
	require.Equal(t, int64(2), atomic.LoadInt64(&cacheManager.loadCounter))
	require.Equal(t, int64(0), atomic.LoadInt64(&cacheManager2.loadCounter))

	require.NoError(t, j2.Discard())
	j2 = nil
}
// generateSubGraph returns a random graph of sum and constant vertices together
// with the value the graph is expected to evaluate to. A request for a single
// node yields a constant vertex; otherwise the root is a sum vertex whose
// inputs are generated recursively in chunks until the requested node count
// (root included) is used up.
func generateSubGraph(nodes int) (Edge, int) {
	if nodes == 1 {
		leaf := rand.Int() % 500
		return Edge{Vertex: vtxConst(leaf, vtxOpt{})}, leaf
	}

	// Split the remaining nodes over 2-6 children of roughly equal size.
	fanout := rand.Int()%5 + 2
	chunk := int(math.Ceil(float64(nodes) / float64(fanout)))
	if chunk > nodes {
		chunk = nodes
	}

	total := 0
	children := []Edge{}
	// used starts at 1 to account for the root vertex itself.
	for used := 1; used < nodes; used += chunk {
		if used+chunk > nodes {
			chunk = nodes - used
		}
		child, subtotal := generateSubGraph(chunk)
		children = append(children, child)
		total += subtotal
	}

	own := rand.Int() % 500
	total += own
	return Edge{Vertex: vtxSum(own, vtxOpt{inputs: children})}, total
}
// vtxOpt configures a test vertex created through vtx, vtxConst or vtxSum.
type vtxOpt struct {
	// name is reported by Name and hashed into the vertex digest.
	name string
	// cacheKeySeed is hashed into the digest of the vertex's cache map.
	cacheKeySeed string
	// execDelay is how long exec waits before succeeding.
	execDelay time.Duration
	// cacheDelay is how long cacheMap waits before succeeding.
	cacheDelay time.Duration
	// cachePreFunc, when set, runs at the start of cacheMap; an error aborts it.
	cachePreFunc func(context.Context) error
	// execPreFunc, when set, runs in exec after the input count check; an
	// error aborts it.
	execPreFunc func(context.Context) error
	// inputs are the edges returned by Inputs.
	inputs []Edge
	// value is the string payload of the result produced by Exec.
	value string
	// slowCacheCompute maps an input index to the result-based digest
	// function installed for that dependency in the cache map.
	slowCacheCompute map[int]ResultBasedCacheFunc
	// cacheSource is exposed through Options as the vertex's CacheSource.
	cacheSource CacheManager
}
// vtx creates a test vertex from opt. An empty name or cache key seed is
// replaced with a freshly generated random ID.
func vtx(opt vtxOpt) *vertex {
	for _, field := range []*string{&opt.name, &opt.cacheKeySeed} {
		if *field == "" {
			*field = identity.NewID()
		}
	}
	return &vertex{opt: opt}
}
// vertex is the basic test vertex. It carries its configuration and two
// optional counters that record how often its cache map and exec steps ran.
type vertex struct {
	opt vtxOpt
	// cacheCallCount is incremented by cacheMap when non-nil.
	cacheCallCount *int64
	// execCallCount is incremented by exec when non-nil.
	execCallCount *int64
}
// Digest returns the digest of the vertex name, so vertices with the same
// name share a digest.
func (v *vertex) Digest() digest.Digest {
	name := []byte(v.opt.name)
	return digest.FromBytes(name)
}
// Sys returns the vertex itself as the underlying system object.
func (v *vertex) Sys() interface{} {
	var self interface{} = v
	return self
}
// Inputs returns the input edges configured for the vertex.
func (v *vertex) Inputs() []Edge {
	edges := v.opt.inputs
	return edges
}
// Name returns the configured vertex name.
func (v *vertex) Name() string {
	name := v.opt.name
	return name
}
// Options returns vertex options that carry only the configured cache source.
func (v *vertex) Options() VertexOptions {
	var opts VertexOptions
	opts.CacheSource = v.opt.cacheSource
	return opts
}
// setupCallCounters installs a fresh pair of zeroed counters on the vertex and,
// through setCallCounters, on every vertex reachable via its inputs.
func (v *vertex) setupCallCounters() {
	v.setCallCounters(new(int64), new(int64))
}
// setCallCounters points the vertex at the given counters and recurses into
// its inputs, so the whole graph below v shares the same two counters. Inputs
// must be one of the test vertex types handled in the switch.
func (v *vertex) setCallCounters(cacheCount, execCount *int64) {
	v.cacheCallCount, v.execCallCount = cacheCount, execCount

	for _, edge := range v.opt.inputs {
		var child *vertex
		switch in := edge.Vertex.(type) {
		case *vertex:
			child = in
		case *vertexSum:
			child = in.vertex
		case *vertexConst:
			child = in.vertex
		}
		child.setCallCounters(cacheCount, execCount)
	}
}
// cacheMap simulates the cache map step: it runs the optional pre-function,
// bumps the cache call counter when one is installed, and then waits for the
// configured cache delay. It returns the pre-function's error, or the context
// error if ctx is done before or during the wait.
func (v *vertex) cacheMap(ctx context.Context) error {
	if pre := v.opt.cachePreFunc; pre != nil {
		if err := pre(ctx); err != nil {
			return err
		}
	}

	if counter := v.cacheCallCount; counter != nil {
		atomic.AddInt64(counter, 1)
	}

	// Bail out right away when the context is already done.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	delay := time.After(v.opt.cacheDelay)
	select {
	case <-delay:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// CacheMap runs the simulated cache map step and, on success, returns the
// cache map built from the vertex configuration.
func (v *vertex) CacheMap(ctx context.Context) (*CacheMap, error) {
	err := v.cacheMap(ctx)
	if err != nil {
		return nil, err
	}
	return v.makeCacheMap(), nil
}
// exec simulates the execution step shared by all test vertex types. It
// rejects an input slice whose length differs from the vertex's inputs, runs
// the optional pre-function, bumps the exec call counter when one is
// installed, and then waits for the configured exec delay. It returns the
// pre-function's error, or the context error if ctx is done before or during
// the wait.
func (v *vertex) exec(ctx context.Context, inputs []Result) error {
	if want := len(v.Inputs()); len(inputs) != want {
		return errors.Errorf("invalid number of inputs")
	}

	if pre := v.opt.execPreFunc; pre != nil {
		if err := pre(ctx); err != nil {
			return err
		}
	}

	if counter := v.execCallCount; counter != nil {
		atomic.AddInt64(counter, 1)
	}

	// Bail out right away when the context is already done.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	delay := time.After(v.opt.execDelay)
	select {
	case <-delay:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Exec runs the simulated execution step and returns a single result with a
// new random ID that carries the configured string value.
func (v *vertex) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
	if execErr := v.exec(ctx, inputs); execErr != nil {
		return nil, execErr
	}
	out := &dummyResult{
		id:    identity.NewID(),
		value: v.opt.value,
	}
	return []Result{out}, nil
}
// makeCacheMap builds the cache map for the vertex: the digest is derived from
// the cache key seed, there is one dependency slot per input, and the slots
// listed in slowCacheCompute receive their result-based digest function.
func (v *vertex) makeCacheMap() *CacheMap {
	seed := fmt.Sprintf("seed:%s", v.opt.cacheKeySeed)

	cm := &CacheMap{Digest: digest.FromBytes([]byte(seed))}
	cm.Deps = make([]struct {
		Selector          digest.Digest
		ComputeDigestFunc ResultBasedCacheFunc
	}, len(v.Inputs()))

	for idx, fn := range v.opt.slowCacheCompute {
		cm.Deps[idx].ComputeDigestFunc = fn
	}
	return cm
}
// vtxConst returns a vertex that outputs a constant integer. Unless set by the
// caller, the cache key seed is derived from the constant and the name is the
// seed followed by a random ID.
func vtxConst(v int, opt vtxOpt) *vertexConst {
	seed := opt.cacheKeySeed
	if seed == "" {
		seed = fmt.Sprintf("const-%d", v)
		opt.cacheKeySeed = seed
	}
	if opt.name == "" {
		opt.name = seed + "-" + identity.NewID()
	}
	return &vertexConst{vertex: vtx(opt), value: v}
}
// vertexConst is a test vertex whose result is a fixed integer.
type vertexConst struct {
	*vertex
	// value is the integer returned by Exec.
	value int
}
// Sys returns the vertexConst itself, overriding the embedded vertex's Sys.
func (v *vertexConst) Sys() interface{} {
	var self interface{} = v
	return self
}
// Exec runs the simulated execution step and returns a single result with a
// new random ID that carries the constant as its integer value.
func (v *vertexConst) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
	if execErr := v.exec(ctx, inputs); execErr != nil {
		return nil, execErr
	}
	out := &dummyResult{
		id:       identity.NewID(),
		intValue: v.value,
	}
	return []Result{out}, nil
}
// vtxSum returns a vertex that outputs the sum of its inputs plus a constant.
// Unless set by the caller, the cache key seed is derived from the constant
// and the name is the seed followed by a random ID.
func vtxSum(v int, opt vtxOpt) *vertexSum {
	seed := opt.cacheKeySeed
	if seed == "" {
		seed = fmt.Sprintf("sum-%d", v)
		opt.cacheKeySeed = seed
	}
	if opt.name == "" {
		opt.name = seed + "-" + identity.NewID()
	}
	return &vertexSum{vertex: vtx(opt), value: v}
}
// vertexSum is a test vertex whose result is the sum of its inputs' integer
// values plus a fixed constant.
type vertexSum struct {
	*vertex
	// value is the constant added on top of the inputs in Exec.
	value int
}
// Sys returns the vertexSum itself, overriding the embedded vertex's Sys.
func (v *vertexSum) Sys() interface{} {
	var self interface{} = v
	return self
}
// Exec runs the simulated execution step and returns a single result holding
// the vertex constant plus the integer values of all inputs. Every input must
// unwrap to a *dummyResult; otherwise an error is returned.
func (v *vertexSum) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
	if execErr := v.exec(ctx, inputs); execErr != nil {
		return nil, execErr
	}

	total := v.value
	for _, in := range inputs {
		dr, isDummy := in.Sys().(*dummyResult)
		if !isDummy {
			return nil, errors.Errorf("invalid input type: %T", in.Sys())
		}
		total += dr.intValue
	}

	out := &dummyResult{
		id:       identity.NewID(),
		intValue: total,
	}
	return []Result{out}, nil
}
// printGraph prints the edge's index and vertex name behind pfx, then prints
// every input recursively with the prefix extended by "-->".
func printGraph(e Edge, pfx string) {
	fmt.Printf("%s %d %s\n", pfx, e.Index, e.Vertex.Name())

	childPfx := pfx + "-->"
	for _, child := range e.Vertex.Inputs() {
		printGraph(child, childPfx)
	}
}
// dummyResult is the result type produced by the test vertices. String
// vertices fill value, integer vertices fill intValue.
type dummyResult struct {
	// id identifies the result.
	id string
	// value is the string payload.
	value string
	// intValue is the integer payload.
	intValue int
}
// ID returns the identifier of the result.
func (r *dummyResult) ID() string {
	return r.id
}
// Release does nothing and always returns nil.
func (r *dummyResult) Release(context.Context) error {
	return nil
}
// Sys returns the result itself as the underlying system object.
func (r *dummyResult) Sys() interface{} {
	return r
}
// testOpResolver resolves a vertex to an Op by asserting that the vertex's Sys
// value implements Op. It returns an error when it does not.
func testOpResolver(v Vertex) (Op, error) {
	op, isOp := v.Sys().(Op)
	if !isOp {
		return nil, errors.Errorf("invalid vertex")
	}
	return op, nil
}
// unwrap returns the string payload of a result produced by the test vertices,
// or "unwrap-error" when the result is not backed by a *dummyResult.
func unwrap(res Result) string {
	if dr, isDummy := res.Sys().(*dummyResult); isDummy {
		return dr.value
	}
	return "unwrap-error"
}
// unwrapInt returns the integer payload of a result produced by the test
// vertices, or -1000000 when the result is not backed by a *dummyResult.
func unwrapInt(res Result) int {
	if dr, isDummy := res.Sys().(*dummyResult); isDummy {
		return dr.intValue
	}
	return -1000000
}
// blockingFuncion returns a function that acts as a barrier for i callers:
// every call blocks until the returned function has been called i times in
// total, at which point all waiting calls return nil. Calls made after the
// barrier has opened return nil immediately.
//
// A waiting call also returns early with the context's error when its context
// is done before the barrier opens, so a barrier that never fills up cannot
// block a canceled caller forever. Such a call still counts towards i.
func blockingFuncion(i int) func(context.Context) error {
	remaining := int64(i)
	released := make(chan struct{})
	return func(ctx context.Context) error {
		if atomic.AddInt64(&remaining, -1) == 0 {
			close(released)
		}

		// Prefer the open barrier over a context that is done at the same time.
		select {
		case <-released:
			return nil
		default:
		}

		select {
		case <-released:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// newTrackingCacheManager wraps cm in a cache manager that counts Load calls.
func newTrackingCacheManager(cm CacheManager) *trackingCacheManager {
	tracker := new(trackingCacheManager)
	tracker.CacheManager = cm
	return tracker
}
// trackingCacheManager decorates a CacheManager with a counter of Load calls.
type trackingCacheManager struct {
	CacheManager
	// loadCounter is incremented atomically on every Load call.
	loadCounter int64
}
// Load counts the call and then delegates to the wrapped cache manager.
func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
	atomic.AddInt64(&cm.loadCounter, 1)
	res, err := cm.CacheManager.Load(ctx, rec)
	return res, err
}
// digestFromResult is a result-based cache function that hashes the string
// payload of the result as returned by unwrap. It never returns an error.
func digestFromResult(ctx context.Context, res Result) (digest.Digest, error) {
	payload := []byte(unwrap(res))
	return digest.FromBytes(payload), nil
}