From a2d9a6ea0b47c1dac438c1375a4fd303660966db Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 31 Jan 2018 18:01:51 -0800 Subject: [PATCH] Revert "solver: new implementation" This reverts commit 5939939666833bad4270a82dd52257b302b9cf73. Signed-off-by: Tonis Tiigi --- solver2/solver/index.go | 186 ---- solver2/solver/index_test.go | 1 - solver2/solver/jobs.go | 515 ---------- solver2/solver/llb/vertex.go | 128 --- solver2/solver/memorycache.go | 222 ----- solver2/solver/memorycache_test.go | 127 --- solver2/solver/pipe.go | 195 ---- solver2/solver/pipe_test.go | 88 -- solver2/solver/progress.go | 107 --- solver2/solver/result.go | 72 -- solver2/solver/scheduler.go | 948 ------------------- solver2/solver/scheduler_test.go | 1422 ---------------------------- solver2/solver/types.go | 144 --- 13 files changed, 4155 deletions(-) delete mode 100644 solver2/solver/index.go delete mode 100644 solver2/solver/index_test.go delete mode 100644 solver2/solver/jobs.go delete mode 100644 solver2/solver/llb/vertex.go delete mode 100644 solver2/solver/memorycache.go delete mode 100644 solver2/solver/memorycache_test.go delete mode 100644 solver2/solver/pipe.go delete mode 100644 solver2/solver/pipe_test.go delete mode 100644 solver2/solver/progress.go delete mode 100644 solver2/solver/result.go delete mode 100644 solver2/solver/scheduler.go delete mode 100644 solver2/solver/scheduler_test.go delete mode 100644 solver2/solver/types.go diff --git a/solver2/solver/index.go b/solver2/solver/index.go deleted file mode 100644 index fe6f7da7..00000000 --- a/solver2/solver/index.go +++ /dev/null @@ -1,186 +0,0 @@ -package solver - -import ( - "fmt" - "sync" - - digest "github.com/opencontainers/go-digest" -) - -// EdgeIndex is a synchronous map for detecting edge collisions. -type EdgeIndex struct { - mu sync.Mutex - - items map[indexedDigest]map[indexedDigest]map[*edge]struct{} - backRefs map[*edge]map[indexedDigest]map[indexedDigest]struct{} -} - -func NewEdgeIndex() *EdgeIndex { - return &EdgeIndex{ - items: map[indexedDigest]map[indexedDigest]map[*edge]struct{}{}, - backRefs: map[*edge]map[indexedDigest]map[indexedDigest]struct{}{}, - } -} - -func (ei *EdgeIndex) LoadOrStore(e *edge, dgst digest.Digest, index Index, deps [][]CacheKey) *edge { - ei.mu.Lock() - defer ei.mu.Unlock() - - if e := ei.load(e, dgst, index, deps); e != nil { - return e - } - - ei.store(e, dgst, index, deps) - - return nil -} - -func (ei *EdgeIndex) Release(e *edge) { - ei.mu.Lock() - defer ei.mu.Unlock() - - for id, backRefs := range ei.backRefs[e] { - for id2 := range backRefs { - delete(ei.items[id][id2], e) - if len(ei.items[id][id2]) == 0 { - delete(ei.items[id], id2) - } - } - if len(ei.items[id]) == 0 { - delete(ei.items, id) - } - } - delete(ei.backRefs, e) -} - -func (ei *EdgeIndex) load(ignore *edge, dgst digest.Digest, index Index, deps [][]CacheKey) *edge { - id := indexedDigest{dgst: dgst, index: index, depsCount: len(deps)} - m, ok := ei.items[id] - if !ok { - return nil - } - if len(deps) == 0 { - m2, ok := m[indexedDigest{}] - if !ok { - return nil - } - for e := range m2 { - if e != ignore { - return e - } - } - return nil - } - - matches := map[*edge]struct{}{} - for i, keys := range deps { - if i == 0 { - for _, key := range keys { - id := indexedDigest{dgst: getUniqueID(key), index: Index(i)} - for e := range m[id] { - if e != ignore { - matches[e] = struct{}{} - } - } - } - } else { - loop0: - for match := range matches { - for _, key := range keys { - id := indexedDigest{dgst: getUniqueID(key), index: Index(i)} - if m[id] != nil { - if _, ok := m[id][match]; ok { - continue loop0 - } - } - } - delete(matches, match) - } - } - if len(matches) == 0 { - break - } - } - - for m := range matches { - return m - } - return nil -} - -func (ei *EdgeIndex) store(e *edge, dgst digest.Digest, index Index, deps [][]CacheKey) { - id := indexedDigest{dgst: dgst, index: index, depsCount: len(deps)} - m, ok := ei.items[id] - if !ok { - m = map[indexedDigest]map[*edge]struct{}{} - ei.items[id] = m - } - - backRefsMain, ok := ei.backRefs[e] - if !ok { - backRefsMain = map[indexedDigest]map[indexedDigest]struct{}{} - ei.backRefs[e] = backRefsMain - } - - backRefs, ok := backRefsMain[id] - if !ok { - backRefs = map[indexedDigest]struct{}{} - backRefsMain[id] = backRefs - } - - if len(deps) == 0 { - m2, ok := m[indexedDigest{}] - if !ok { - m2 = map[*edge]struct{}{} - m[indexedDigest{}] = m2 - } - m2[e] = struct{}{} - - backRefs[indexedDigest{}] = struct{}{} - - return - } - - for i, keys := range deps { - for _, key := range keys { - id := indexedDigest{dgst: getUniqueID(key), index: Index(i)} - m2, ok := m[id] - if !ok { - m2 = map[*edge]struct{}{} - m[id] = m2 - } - m2[e] = struct{}{} - backRefs[id] = struct{}{} - } - } -} - -type indexedDigest struct { - dgst digest.Digest - index Index - depsCount int -} - -type internalKeyT string - -var internalKey = internalKeyT("buildkit/unique-cache-id") - -func getUniqueID(k CacheKey) digest.Digest { - internalV := k.GetValue(internalKey) - if internalV != nil { - return internalV.(digest.Digest) - } - - dgstr := digest.SHA256.Digester() - for _, inp := range k.Deps() { - dgstr.Hash().Write([]byte(getUniqueID(inp))) - } - - dgstr.Hash().Write([]byte(k.Digest())) - dgstr.Hash().Write([]byte(fmt.Sprintf("%d", k.Output()))) - - dgst := dgstr.Digest() - k.SetValue(internalKey, dgst) - - return dgst -} diff --git a/solver2/solver/index_test.go b/solver2/solver/index_test.go deleted file mode 100644 index 321561cd..00000000 --- a/solver2/solver/index_test.go +++ /dev/null @@ -1 +0,0 @@ -package solver diff --git a/solver2/solver/jobs.go b/solver2/solver/jobs.go deleted file mode 100644 index 5c65d830..00000000 --- a/solver2/solver/jobs.go +++ /dev/null @@ -1,515 +0,0 @@ -package solver - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/moby/buildkit/client" - "github.com/moby/buildkit/util/flightcontrol" - "github.com/moby/buildkit/util/progress" - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" -) - -// ResolveOpFunc finds an Op implementation for a Vertex -type ResolveOpFunc func(Vertex) (Op, error) - -// JobList provides a shared graph of all the vertexes currently being -// processed. Every vertex that is being solved needs to be loaded into job -// first. Vertex operations are invoked and progress tracking happends through -// jobs. -// TODO: s/JobList/Solver -type JobList struct { - mu sync.RWMutex - jobs map[string]*Job - actives map[digest.Digest]*state - opts SolverOpt - - updateCond *sync.Cond - s *Scheduler - index *EdgeIndex -} - -type state struct { - jobs map[*Job]struct{} - parents map[digest.Digest]struct{} - childVtx map[digest.Digest]struct{} - - mpw *progress.MultiWriter - allPw map[progress.Writer]struct{} - - vtx Vertex - clientVertex client.Vertex - - mu sync.Mutex - op *sharedOp - edges map[Index]*edge - opts SolverOpt - index *EdgeIndex -} - -func (s *state) getEdge(index Index) *edge { - s.mu.Lock() - defer s.mu.Unlock() - if e, ok := s.edges[index]; ok { - return e - } - - if s.op == nil { - s.op = newSharedOp(s.opts.ResolveOpFunc, s.opts.DefaultCache, s) - } - - e := newEdge(Edge{Index: index, Vertex: s.vtx}, s.op, s.index) - s.edges[index] = e - return e -} - -func (s *state) setEdge(index Index, newEdge *edge) { - s.mu.Lock() - defer s.mu.Unlock() - e, ok := s.edges[index] - if ok { - if e == newEdge { - return - } - e.release() - } - - newEdge.duplicateReleaser() - s.edges[index] = newEdge -} - -func (s *state) Release() { - for _, e := range s.edges { - e.release() - } -} - -type Job struct { - list *JobList - pr *progress.MultiReader - pw progress.Writer - - progressCloser func() -} - -type SolverOpt struct { - ResolveOpFunc ResolveOpFunc - DefaultCache CacheManager -} - -func NewJobList(opts SolverOpt) *JobList { - if opts.DefaultCache == nil { - opts.DefaultCache = NewInMemoryCacheManager() - } - jl := &JobList{ - jobs: make(map[string]*Job), - actives: make(map[digest.Digest]*state), - opts: opts, - index: NewEdgeIndex(), - } - jl.s = NewScheduler(jl) - jl.updateCond = sync.NewCond(jl.mu.RLocker()) - return jl -} - -func (jl *JobList) SetEdge(e Edge, newEdge *edge) { - jl.mu.RLock() - defer jl.mu.RUnlock() - - st, ok := jl.actives[e.Vertex.Digest()] - if !ok { - return - } - - st.setEdge(e.Index, newEdge) -} - -func (jl *JobList) GetEdge(e Edge) *edge { - jl.mu.RLock() - defer jl.mu.RUnlock() - - st, ok := jl.actives[e.Vertex.Digest()] - if !ok { - return nil - } - return st.getEdge(e.Index) -} - -func (jl *JobList) SubBuild(ctx context.Context, e Edge, parent Vertex) (CachedResult, error) { - if err := jl.load(e.Vertex, parent, nil); err != nil { - return nil, err - } - - return jl.s.build(ctx, e) -} - -func (jl *JobList) Close() { - jl.s.Stop() -} - -func (jl *JobList) load(v, parent Vertex, j *Job) error { - jl.mu.Lock() - defer jl.mu.Unlock() - return jl.loadUnlocked(v, parent, j) -} - -func (jl *JobList) loadUnlocked(v, parent Vertex, j *Job) error { - for _, e := range v.Inputs() { - if err := jl.loadUnlocked(e.Vertex, parent, j); err != nil { - return err - } - } - - dgst := v.Digest() - - st, ok := jl.actives[dgst] - if !ok { - st = &state{ - opts: jl.opts, - jobs: map[*Job]struct{}{}, - parents: map[digest.Digest]struct{}{}, - childVtx: map[digest.Digest]struct{}{}, - allPw: map[progress.Writer]struct{}{}, - mpw: progress.NewMultiWriter(progress.WithMetadata("vertex", dgst)), - vtx: v, - clientVertex: initClientVertex(v), - edges: map[Index]*edge{}, - index: jl.index, - } - jl.actives[dgst] = st - } - - if j != nil { - if _, ok := st.jobs[j]; !ok { - st.jobs[j] = struct{}{} - } - } - - if parent != nil { - if _, ok := st.parents[parent.Digest()]; !ok { - st.parents[parent.Digest()] = struct{}{} - parentState, ok := jl.actives[parent.Digest()] - if !ok { - return errors.Errorf("inactive parent %s", parent.Digest()) - } - parentState.childVtx[dgst] = struct{}{} - } - } - - jl.connectProgressFromState(st, st) - return nil -} - -func (jl *JobList) connectProgressFromState(target, src *state) { - for j := range src.jobs { - if _, ok := target.allPw[j.pw]; !ok { - target.mpw.Add(j.pw) - target.allPw[j.pw] = struct{}{} - j.pw.Write(target.clientVertex.Digest.String(), target.clientVertex) - } - } - for p := range src.parents { - jl.connectProgressFromState(target, jl.actives[p]) - } -} - -func (jl *JobList) NewJob(id string) (*Job, error) { - jl.mu.Lock() - defer jl.mu.Unlock() - - if _, ok := jl.jobs[id]; ok { - return nil, errors.Errorf("job ID %s exists", id) - } - - pr, ctx, progressCloser := progress.NewContext(context.Background()) - pw, _, _ := progress.FromContext(ctx) // TODO: expose progress.Pipe() - - j := &Job{ - list: jl, - pr: progress.NewMultiReader(pr), - pw: pw, - progressCloser: progressCloser, - } - jl.jobs[id] = j - - jl.updateCond.Broadcast() - - return j, nil -} - -func (jl *JobList) Get(id string) (*Job, error) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - go func() { - <-ctx.Done() - jl.updateCond.Broadcast() - }() - - jl.mu.RLock() - defer jl.mu.RUnlock() - for { - select { - case <-ctx.Done(): - return nil, errors.Errorf("no such job %s", id) - default: - } - j, ok := jl.jobs[id] - if !ok { - jl.updateCond.Wait() - continue - } - return j, nil - } -} - -// called with joblist lock -func (jl *JobList) deleteIfUnreferenced(k digest.Digest, st *state) { - if len(st.jobs) == 0 && len(st.parents) == 0 { - for chKey := range st.childVtx { - chState := jl.actives[chKey] - delete(chState.parents, k) - jl.deleteIfUnreferenced(chKey, chState) - } - st.Release() - delete(jl.actives, k) - } -} - -func (j *Job) Build(ctx context.Context, e Edge) (CachedResult, error) { - if err := j.list.load(e.Vertex, nil, j); err != nil { - return nil, err - } - return j.list.s.build(ctx, e) -} - -func (j *Job) Discard() error { - defer j.progressCloser() - - j.list.mu.Lock() - defer j.list.mu.Unlock() - - j.pw.Close() - - for k, st := range j.list.actives { - if _, ok := st.jobs[j]; ok { - delete(st.jobs, j) - j.list.deleteIfUnreferenced(k, st) - } - if _, ok := st.allPw[j.pw]; ok { - delete(st.allPw, j.pw) - } - } - return nil -} - -type activeOp interface { - Op - Cache() CacheManager - CalcSlowCache(context.Context, Index, ResultBasedCacheFunc, Result) (digest.Digest, error) -} - -func newSharedOp(resolver ResolveOpFunc, cacheManager CacheManager, st *state) *sharedOp { - so := &sharedOp{ - resolver: resolver, - st: st, - cacheManager: cacheManager, - slowCacheRes: map[Index]digest.Digest{}, - slowCacheErr: map[Index]error{}, - } - return so -} - -type sharedOp struct { - resolver ResolveOpFunc - cacheManager CacheManager - st *state - g flightcontrol.Group - - opOnce sync.Once - op Op - err error - - execRes []*SharedResult - execErr error - - cacheRes *CacheMap - cacheErr error - - slowMu sync.Mutex - slowCacheRes map[Index]digest.Digest - slowCacheErr map[Index]error -} - -func (s *sharedOp) Cache() CacheManager { - return s.cacheManager // TODO: add on load -} - -func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBasedCacheFunc, res Result) (digest.Digest, error) { - key, err := s.g.Do(ctx, fmt.Sprintf("slow-compute-%d", index), func(ctx context.Context) (interface{}, error) { - s.slowMu.Lock() - // TODO: add helpers for these stored values - if res := s.slowCacheRes[index]; res != "" { - s.slowMu.Unlock() - return res, nil - } - if err := s.slowCacheErr[index]; err != nil { - s.slowMu.Unlock() - return err, nil - } - s.slowMu.Unlock() - ctx = progress.WithProgress(ctx, s.st.mpw) - key, err := f(ctx, res) - complete := true - if err != nil { - canceled := false - select { - case <-ctx.Done(): - canceled = true - default: - } - if canceled && errors.Cause(err) == context.Canceled { - complete = false - } - } - s.slowMu.Lock() - defer s.slowMu.Unlock() - if complete { - if err == nil { - s.slowCacheRes[index] = key - } - s.slowCacheErr[index] = err - } - return key, err - }) - if err != nil { - return "", err - } - return key.(digest.Digest), nil -} - -func (s *sharedOp) CacheMap(ctx context.Context) (*CacheMap, error) { - op, err := s.getOp() - if err != nil { - return nil, err - } - res, err := s.g.Do(ctx, "cachemap", func(ctx context.Context) (interface{}, error) { - if s.cacheRes != nil { - return s.cacheRes, nil - } - if s.cacheErr != nil { - return nil, s.cacheErr - } - ctx = progress.WithProgress(ctx, s.st.mpw) - res, err := op.CacheMap(ctx) - complete := true - if err != nil { - canceled := false - select { - case <-ctx.Done(): - canceled = true - default: - } - if canceled && errors.Cause(err) == context.Canceled { - complete = false - } - } - if complete { - if err == nil { - s.cacheRes = res - } - s.cacheErr = err - } - return res, err - }) - if err != nil { - return nil, err - } - return res.(*CacheMap), nil -} - -func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) { - op, err := s.getOp() - if err != nil { - return nil, err - } - res, err := s.g.Do(ctx, "exec", func(ctx context.Context) (interface{}, error) { - if s.execRes != nil || s.execErr != nil { - return s.execRes, s.execErr - } - ctx = progress.WithProgress(ctx, s.st.mpw) - res, err := op.Exec(ctx, inputs) - complete := true - if err != nil { - canceled := false - select { - case <-ctx.Done(): - canceled = true - default: - } - if canceled && errors.Cause(err) == context.Canceled { - complete = false - } - } - if complete { - if res != nil { - s.execRes = wrapShared(res) - } - s.execErr = err - } - return s.execRes, err - }) - if err != nil { - return nil, err - } - return unwrapShared(res.([]*SharedResult)), nil -} - -func (s *sharedOp) getOp() (Op, error) { - s.opOnce.Do(func() { - s.op, s.err = s.resolver(s.st.vtx) - }) - if s.err != nil { - return nil, s.err - } - return s.op, nil -} - -func (s *sharedOp) release() { - if s.execRes != nil { - for _, r := range s.execRes { - r.Release(context.TODO()) - } - } -} - -func initClientVertex(v Vertex) client.Vertex { - inputDigests := make([]digest.Digest, 0, len(v.Inputs())) - for _, inp := range v.Inputs() { - inputDigests = append(inputDigests, inp.Vertex.Digest()) - } - return client.Vertex{ - Inputs: inputDigests, - Name: v.Name(), - Digest: v.Digest(), - } -} - -func wrapShared(inp []Result) []*SharedResult { - out := make([]*SharedResult, len(inp)) - for i, r := range inp { - out[i] = NewSharedResult(r) - } - return out -} - -func unwrapShared(inp []*SharedResult) []Result { - out := make([]Result, len(inp)) - for i, r := range inp { - out[i] = r.Clone() - } - return out -} diff --git a/solver2/solver/llb/vertex.go b/solver2/solver/llb/vertex.go deleted file mode 100644 index df1e75b8..00000000 --- a/solver2/solver/llb/vertex.go +++ /dev/null @@ -1,128 +0,0 @@ -package llb - -import ( - "strings" - - "github.com/moby/buildkit/solver/pb" - "github.com/moby/buildkit/solver2/solver" - "github.com/moby/buildkit/source" - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" -) - -type vertex struct { - sys interface{} - metadata *pb.OpMetadata - inputs []solver.Edge - digest digest.Digest - name string -} - -func (v *vertex) Digest() digest.Digest { - return v.digest -} - -func (v *vertex) Sys() interface{} { - return v.sys -} - -func (v *vertex) Metadata() *pb.OpMetadata { - return v.metadata -} - -func (v *vertex) Inputs() []solver.Edge { - return v.inputs -} - -func (v *vertex) Name() string { - return v.name -} - -func Load(def *pb.Definition) (solver.Edge, error) { - return loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (solver.Vertex, error)) (solver.Vertex, error) { - opMetadata := def.Metadata[dgst] - vtx, err := newVertex(dgst, pbOp, &opMetadata, load) - if err != nil { - return nil, err - } - return vtx, nil - }) -} - -func newVertex(dgst digest.Digest, op *pb.Op, opMeta *pb.OpMetadata, load func(digest.Digest) (solver.Vertex, error)) (*vertex, error) { - vtx := &vertex{sys: op.Op, metadata: opMeta, digest: dgst, name: llbOpName(op)} - for _, in := range op.Inputs { - sub, err := load(in.Digest) - if err != nil { - return nil, err - } - vtx.inputs = append(vtx.inputs, solver.Edge{Index: solver.Index(in.Index), Vertex: sub}) - } - return vtx, nil -} - -// loadLLB loads LLB. -// fn is executed sequentially. -func loadLLB(def *pb.Definition, fn func(digest.Digest, *pb.Op, func(digest.Digest) (solver.Vertex, error)) (solver.Vertex, error)) (solver.Edge, error) { - if len(def.Def) == 0 { - return solver.Edge{}, errors.New("invalid empty definition") - } - - allOps := make(map[digest.Digest]*pb.Op) - - var dgst digest.Digest - - for _, dt := range def.Def { - var op pb.Op - if err := (&op).Unmarshal(dt); err != nil { - return solver.Edge{}, errors.Wrap(err, "failed to parse llb proto op") - } - dgst = digest.FromBytes(dt) - allOps[dgst] = &op - } - - lastOp := allOps[dgst] - delete(allOps, dgst) - dgst = lastOp.Inputs[0].Digest - - cache := make(map[digest.Digest]solver.Vertex) - - var rec func(dgst digest.Digest) (solver.Vertex, error) - rec = func(dgst digest.Digest) (solver.Vertex, error) { - if v, ok := cache[dgst]; ok { - return v, nil - } - v, err := fn(dgst, allOps[dgst], rec) - if err != nil { - return nil, err - } - cache[dgst] = v - return v, nil - } - - v, err := rec(dgst) - if err != nil { - return solver.Edge{}, err - } - return solver.Edge{Vertex: v, Index: solver.Index(lastOp.Inputs[0].Index)}, nil -} - -func llbOpName(op *pb.Op) string { - switch op := op.Op.(type) { - case *pb.Op_Source: - if id, err := source.FromLLB(op); err == nil { - if id, ok := id.(*source.LocalIdentifier); ok { - if len(id.IncludePatterns) == 1 { - return op.Source.Identifier + " (" + id.IncludePatterns[0] + ")" - } - } - } - return op.Source.Identifier - case *pb.Op_Exec: - return strings.Join(op.Exec.Meta.Args, " ") - case *pb.Op_Build: - return "build" - default: - return "unknown" - } -} diff --git a/solver2/solver/memorycache.go b/solver2/solver/memorycache.go deleted file mode 100644 index 2037f9b1..00000000 --- a/solver2/solver/memorycache.go +++ /dev/null @@ -1,222 +0,0 @@ -package solver - -import ( - "context" - "fmt" - "strings" - "sync" - - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" -) - -type internalMemoryKeyT string - -var internalMemoryKey = internalMemoryKeyT("buildkit/memory-cache-id") - -func NewInMemoryCacheManager() CacheManager { - return &inMemoryCacheManager{ - byID: map[string]*inMemoryCacheKey{}, - } -} - -type inMemoryCacheKey struct { - CacheKey - id string - dgst digest.Digest - output Index - deps []CacheKey // only []*inMemoryCacheManager - - results map[Index]map[string]Result - links map[link]map[string]struct{} -} - -func (ck *inMemoryCacheKey) Deps() []CacheKey { - return ck.deps -} -func (ck *inMemoryCacheKey) Digest() digest.Digest { - return ck.dgst -} -func (ck *inMemoryCacheKey) Index() Index { - return ck.output -} - -type link struct { - input, output Index - digest digest.Digest -} - -type inMemoryCacheManager struct { - mu sync.RWMutex - byID map[string]*inMemoryCacheKey -} - -func (c *inMemoryCacheManager) Query(deps []CacheKey, input Index, dgst digest.Digest, output Index) ([]*CacheRecord, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - refs := map[string]struct{}{} - sublinks := map[string]struct{}{} - - for _, dep := range deps { - ck, err := c.getInternalKey(dep, false) - if err == nil { - for key := range ck.links[link{input, output, dgst}] { - refs[key] = struct{}{} - } - for key := range ck.links[link{Index(-1), Index(0), ""}] { - sublinks[key] = struct{}{} - } - } - } - - for id := range sublinks { - if ck, ok := c.byID[id]; ok { - for key := range ck.links[link{input, output, dgst}] { - refs[key] = struct{}{} - } - } - } - - if len(deps) == 0 { - ck, err := c.getInternalKey(NewCacheKey(dgst, 0, nil), false) - if err != nil { - return nil, nil - } - refs[ck.id] = struct{}{} - } - - outs := make([]*CacheRecord, 0, len(refs)) - for id := range refs { - if ck, ok := c.byID[id]; ok { - for _, res := range ck.results[output] { - outs = append(outs, &CacheRecord{ - ID: id + "@" + res.ID(), - CacheKey: ck, - }) - } - } - } - - return outs, nil -} - -func (c *inMemoryCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - keyParts := strings.Split(rec.ID, "@") - if len(keyParts) != 2 { - return nil, errors.Errorf("invalid cache record ID") - } - ck, err := c.getInternalKey(rec.CacheKey, false) - if err != nil { - return nil, err - } - - for output := range ck.results { - res, ok := ck.results[output][keyParts[1]] - if ok { - return res, nil - } - } - return nil, errors.Errorf("failed to load cache record") // TODO: typed error -} - -func (c *inMemoryCacheManager) Save(k CacheKey, r Result) (CacheKey, error) { - c.mu.Lock() - defer c.mu.Unlock() - - ck, err := c.getInternalKey(k, true) - if err != nil { - return nil, err - } - if err := c.addResult(ck, k.Output(), r); err != nil { - return nil, err - } - return ck, nil -} - -func (c *inMemoryCacheManager) getInternalKey(k CacheKey, createIfNotExist bool) (*inMemoryCacheKey, error) { - if ck, ok := k.(*inMemoryCacheKey); ok { - return ck, nil - } - internalV := k.GetValue(internalMemoryKey) - if internalV != nil { - ck, ok := c.byID[internalV.(string)] - if !ok { - return nil, errors.Errorf("failed lookup by internal ID %s", internalV.(string)) - } - return ck, nil - } - inputs := make([]CacheKey, len(k.Deps())) - dgstr := digest.SHA256.Digester() - for i, inp := range k.Deps() { - ck, err := c.getInternalKey(inp, createIfNotExist) - if err != nil { - return nil, err - } - inputs[i] = ck - if _, err := dgstr.Hash().Write([]byte(ck.id)); err != nil { - return nil, err - } - } - - if _, err := dgstr.Hash().Write([]byte(k.Digest())); err != nil { - return nil, err - } - - if _, err := dgstr.Hash().Write([]byte(fmt.Sprintf("%d", k.Output()))); err != nil { - return nil, err - } - - internalKey := string(dgstr.Digest()) - ck, ok := c.byID[internalKey] - if !ok { - if !createIfNotExist { - return nil, errors.Errorf("not-found") - } - ck = &inMemoryCacheKey{ - CacheKey: k, - id: internalKey, - dgst: k.Digest(), - output: k.Output(), - deps: inputs, - results: map[Index]map[string]Result{}, - links: map[link]map[string]struct{}{}, - } - ck.SetValue(internalMemoryKey, internalKey) - c.byID[internalKey] = ck - } - - for i, inp := range inputs { - if ck.dgst == "" { - i = -1 - } - if err := c.addLink(link{Index(i), ck.output, ck.dgst}, inp.(*inMemoryCacheKey), ck); err != nil { - return nil, err - } - } - - return ck, nil -} - -func (c *inMemoryCacheManager) addResult(ck *inMemoryCacheKey, output Index, r Result) error { - m, ok := ck.results[output] - if !ok { - m = map[string]Result{} - ck.results[output] = m - } - m[r.ID()] = r - return nil -} - -func (c *inMemoryCacheManager) addLink(l link, from, to *inMemoryCacheKey) error { - m, ok := from.links[l] - if !ok { - m = map[string]struct{}{} - from.links[l] = m - } - m[to.id] = struct{}{} - return nil -} diff --git a/solver2/solver/memorycache_test.go b/solver2/solver/memorycache_test.go deleted file mode 100644 index bf83592f..00000000 --- a/solver2/solver/memorycache_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package solver - -import ( - "context" - "testing" - - "github.com/moby/buildkit/identity" - digest "github.com/opencontainers/go-digest" - "github.com/stretchr/testify/require" -) - -func TestInMemoryCache(t *testing.T) { - ctx := context.TODO() - - m := NewInMemoryCacheManager() - - cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("result0")) - require.NoError(t, err) - - matches, err := m.Query(nil, 0, dgst("foo"), 0) - require.NoError(t, err) - require.Equal(t, len(matches), 1) - - res, err := m.Load(ctx, matches[0]) - require.NoError(t, err) - require.Equal(t, "result0", unwrap(res)) - - // another record - cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0, nil), testResult("result1")) - require.NoError(t, err) - - matches, err = m.Query(nil, 0, dgst("bar"), 0) - require.NoError(t, err) - require.Equal(t, len(matches), 1) - - res, err = m.Load(ctx, matches[0]) - require.NoError(t, err) - require.Equal(t, "result1", unwrap(res)) - - // invalid request - matches, err = m.Query(nil, 0, dgst("baz"), 0) - require.NoError(t, err) - require.Equal(t, len(matches), 0) - - // second level - k := NewCacheKey(dgst("baz"), Index(1), []CacheKey{ - cacheFoo, cacheBar, - }) - cacheBaz, err := m.Save(k, testResult("result2")) - require.NoError(t, err) - - matches, err = m.Query(nil, 0, dgst("baz"), 0) - require.NoError(t, err) - require.Equal(t, len(matches), 0) - - matches, err = m.Query([]CacheKey{cacheFoo}, 0, dgst("baz"), 0) - require.NoError(t, err) - require.Equal(t, len(matches), 0) - - matches, err = m.Query([]CacheKey{cacheFoo}, 1, dgst("baz"), Index(1)) - require.NoError(t, err) - require.Equal(t, len(matches), 0) - - matches, err = m.Query([]CacheKey{cacheFoo}, 0, dgst("baz"), Index(1)) - require.NoError(t, err) - require.Equal(t, len(matches), 1) - - res, err = m.Load(ctx, matches[0]) - require.NoError(t, err) - require.Equal(t, "result2", unwrap(res)) - - matches2, err := m.Query([]CacheKey{cacheBar}, 1, dgst("baz"), Index(1)) - require.NoError(t, err) - require.Equal(t, len(matches2), 1) - - require.Equal(t, matches[0].ID, matches2[0].ID) - - k = NewCacheKey(dgst("baz"), Index(1), []CacheKey{ - cacheFoo, - }) - _, err = m.Save(k, testResult("result3")) - require.NoError(t, err) - - matches, err = m.Query([]CacheKey{cacheFoo}, 0, dgst("baz"), Index(1)) - require.NoError(t, err) - require.Equal(t, len(matches), 2) - - // combination save - k2 := NewCacheKey("", 0, []CacheKey{ - cacheFoo, cacheBaz, - }) - - k = NewCacheKey(dgst("bax"), 0, []CacheKey{ - k2, cacheBar, - }) - _, err = m.Save(k, testResult("result4")) - require.NoError(t, err) - - // foo, bar, baz should all point to result4 - matches, err = m.Query([]CacheKey{cacheFoo}, 0, dgst("bax"), 0) - require.NoError(t, err) - require.Equal(t, len(matches), 1) - - id := matches[0].ID - - matches, err = m.Query([]CacheKey{cacheBar}, 1, dgst("bax"), 0) - require.NoError(t, err) - require.Equal(t, len(matches), 1) - require.Equal(t, matches[0].ID, id) - - matches, err = m.Query([]CacheKey{cacheBaz}, 0, dgst("bax"), 0) - require.NoError(t, err) - require.Equal(t, len(matches), 1) - require.Equal(t, matches[0].ID, id) - -} - -func dgst(s string) digest.Digest { - return digest.FromBytes([]byte(s)) -} - -func testResult(v string) Result { - return &dummyResult{ - id: identity.NewID(), - value: v, - } -} diff --git a/solver2/solver/pipe.go b/solver2/solver/pipe.go deleted file mode 100644 index 3689924d..00000000 --- a/solver2/solver/pipe.go +++ /dev/null @@ -1,195 +0,0 @@ -package solver - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/pkg/errors" -) - -type Channel struct { - Signal func() - mu sync.Mutex - value atomic.Value - lastValue interface{} -} - -func (c *Channel) Send(v interface{}) { - c.value.Store(v) - if c.Signal != nil { - c.Signal() - } -} - -func (c *Channel) Receive() (interface{}, bool) { - v := c.value.Load() - if c.lastValue == v { - return nil, false - } - c.lastValue = v - return v, true -} - -type Pipe struct { - Writer PipeWriter - Reader PipeReader - SignalReader func() - SignalWriter func() -} - -type PipeRequest struct { - Request interface{} // Payload - Canceled bool -} - -type PipeWriter interface { - Request() PipeRequest - Update(v interface{}) - Finalize(v interface{}, err error) - Status() PipeStatus -} - -type PipeReader interface { - Reload() bool - Cancel() - Status() PipeStatus - Request() interface{} -} - -type PipeStatus struct { - Canceled bool - Completed bool - Err error - Value interface{} -} - -func newFuncionPipe(f func(context.Context) (interface{}, error)) (*Pipe, func()) { - p := NewPipe(PipeRequest{}) - - ctx, cancel := context.WithCancel(context.TODO()) - - p.SignalReader = func() { - if req := p.Writer.Request(); req.Canceled { - cancel() - } - } - - return p, func() { - res, err := f(ctx) - if err != nil { - p.Writer.Finalize(nil, err) - return - } - p.Writer.Finalize(res, nil) - } -} - -func NewPipe(req PipeRequest) *Pipe { - cancelCh := &Channel{} - roundTripCh := &Channel{} - pw := &pipeWriter{ - req: req, - recvChannel: cancelCh, - sendChannel: roundTripCh, - } - pr := &pipeReader{ - req: req, - recvChannel: roundTripCh, - sendChannel: cancelCh, - } - - p := &Pipe{ - Writer: pw, - Reader: pr, - } - - cancelCh.Signal = func() { - v, ok := cancelCh.Receive() - if ok { - pw.setRequest(v.(PipeRequest)) - } - if p.SignalReader != nil { - p.SignalReader() - } - } - - roundTripCh.Signal = func() { - if p.SignalWriter != nil { - p.SignalWriter() - } - } - - return p -} - -type pipeWriter struct { - status PipeStatus - req PipeRequest - recvChannel *Channel - sendChannel *Channel - mu sync.Mutex -} - -func (pw *pipeWriter) Status() PipeStatus { - return pw.status -} - -func (pw *pipeWriter) Request() PipeRequest { - pw.mu.Lock() - defer pw.mu.Unlock() - return pw.req -} - -func (pw *pipeWriter) setRequest(req PipeRequest) { - pw.mu.Lock() - defer pw.mu.Unlock() - pw.req = req -} - -func (pw *pipeWriter) Update(v interface{}) { - pw.status.Value = v - pw.sendChannel.Send(pw.status) -} - -func (pw *pipeWriter) Finalize(v interface{}, err error) { - if v != nil { - pw.status.Value = v - } - pw.status.Err = err - pw.status.Completed = true - if errors.Cause(err) == context.Canceled && pw.req.Canceled { - pw.status.Canceled = true - } - pw.sendChannel.Send(pw.status) -} - -type pipeReader struct { - status PipeStatus - req PipeRequest - recvChannel *Channel - sendChannel *Channel -} - -func (pr *pipeReader) Request() interface{} { - return pr.req.Request -} - -func (pr *pipeReader) Reload() bool { - v, ok := pr.recvChannel.Receive() - if !ok { - return false - } - pr.status = v.(PipeStatus) - return true -} - -func (pr *pipeReader) Cancel() { - req := pr.req - req.Canceled = true - pr.sendChannel.Send(req) -} - -func (pr *pipeReader) Status() PipeStatus { - return pr.status -} diff --git a/solver2/solver/pipe_test.go b/solver2/solver/pipe_test.go deleted file mode 100644 index 21e371f2..00000000 --- a/solver2/solver/pipe_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package solver - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestPipe(t *testing.T) { - runCh := make(chan struct{}) - f := func(ctx context.Context) (interface{}, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-runCh: - return "res0", nil - } - } - - waitSignal := make(chan struct{}, 10) - signalled := 0 - signal := func() { - signalled++ - waitSignal <- struct{}{} - } - - p, start := newFuncionPipe(f) - p.SignalWriter = signal - go start() - require.Equal(t, false, p.Reader.Reload()) - - st := p.Reader.Status() - require.Equal(t, st.Completed, false) - require.Equal(t, st.Canceled, false) - require.Nil(t, st.Value) - require.Equal(t, signalled, 0) - - close(runCh) - <-waitSignal - - p.Reader.Reload() - st = p.Reader.Status() - require.Equal(t, st.Completed, true) - require.Equal(t, st.Canceled, false) - require.NoError(t, st.Err) - require.Equal(t, st.Value.(string), "res0") -} - -func TestPipeCancel(t *testing.T) { - runCh := make(chan struct{}) - f := func(ctx context.Context) (interface{}, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-runCh: - return "res0", nil - } - } - - waitSignal := make(chan struct{}, 10) - signalled := 0 - signal := func() { - signalled++ - waitSignal <- struct{}{} - } - - p, start := newFuncionPipe(f) - p.SignalWriter = signal - go start() - p.Reader.Reload() - - st := p.Reader.Status() - require.Equal(t, st.Completed, false) - require.Equal(t, st.Canceled, false) - require.Nil(t, st.Value) - require.Equal(t, signalled, 0) - - p.Reader.Cancel() - <-waitSignal - - p.Reader.Reload() - st = p.Reader.Status() - require.Equal(t, st.Completed, true) - require.Equal(t, st.Canceled, true) - require.Error(t, st.Err) - require.Equal(t, st.Err, context.Canceled) -} diff --git a/solver2/solver/progress.go b/solver2/solver/progress.go deleted file mode 100644 index 4e0bcb5f..00000000 --- a/solver2/solver/progress.go +++ /dev/null @@ -1,107 +0,0 @@ -package solver - -import ( - "context" - "io" - "time" - - "github.com/moby/buildkit/client" - "github.com/moby/buildkit/util/progress" - digest "github.com/opencontainers/go-digest" - "github.com/sirupsen/logrus" -) - -func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error { - vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}} - pr := j.pr.Reader(ctx) - defer func() { - if enc := vs.encore(); len(enc) > 0 { - ch <- &client.SolveStatus{Vertexes: enc} - } - }() - for { - p, err := pr.Read(ctx) - if err != nil { - if err == io.EOF { - return nil - } - return err - } - ss := &client.SolveStatus{} - for _, p := range p { - switch v := p.Sys.(type) { - case client.Vertex: - ss.Vertexes = append(ss.Vertexes, vs.append(v)...) - - case progress.Status: - vtx, ok := p.Meta("vertex") - if !ok { - logrus.Warnf("progress %s status without vertex info", p.ID) - continue - } - vs := &client.VertexStatus{ - ID: p.ID, - Vertex: vtx.(digest.Digest), - Name: v.Action, - Total: int64(v.Total), - Current: int64(v.Current), - Timestamp: p.Timestamp, - Started: v.Started, - Completed: v.Completed, - } - ss.Statuses = append(ss.Statuses, vs) - case client.VertexLog: - vtx, ok := p.Meta("vertex") - if !ok { - logrus.Warnf("progress %s log without vertex info", p.ID) - continue - } - v.Vertex = vtx.(digest.Digest) - v.Timestamp = p.Timestamp - ss.Logs = append(ss.Logs, &v) - } - } - select { - case <-ctx.Done(): - return ctx.Err() - case ch <- ss: - } - } -} - -type vertexStream struct { - cache map[digest.Digest]*client.Vertex -} - -func (vs *vertexStream) append(v client.Vertex) []*client.Vertex { - var out []*client.Vertex - vs.cache[v.Digest] = &v - if v.Cached { - for _, inp := range v.Inputs { - if inpv, ok := vs.cache[inp]; ok { - if !inpv.Cached && inpv.Completed == nil { - inpv.Cached = true - inpv.Started = v.Completed - inpv.Completed = v.Completed - } - delete(vs.cache, inp) - out = append(out, vs.append(*inpv)...) - } - } - } - vcopy := v - return append(out, &vcopy) -} - -func (vs *vertexStream) encore() []*client.Vertex { - var out []*client.Vertex - for _, v := range vs.cache { - if v.Started != nil && v.Completed == nil { - now := time.Now() - v.Completed = &now - v.Error = context.Canceled.Error() - out = append(out, v) - } - } - return out -} diff --git a/solver2/solver/result.go b/solver2/solver/result.go deleted file mode 100644 index 9ec3e88c..00000000 --- a/solver2/solver/result.go +++ /dev/null @@ -1,72 +0,0 @@ -package solver - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -// SharedResult is a result that can be cloned -type SharedResult struct { - mu sync.Mutex - main Result -} - -func NewSharedResult(main Result) *SharedResult { - return &SharedResult{main: main} -} - -func (r *SharedResult) Clone() Result { - r.mu.Lock() - defer r.mu.Unlock() - - r1, r2 := dup(r.main) - r.main = r1 - return r2 -} - -func (r *SharedResult) Release(ctx context.Context) error { - r.mu.Lock() - defer r.mu.Unlock() - return r.main.Release(ctx) -} - -func dup(res Result) (Result, Result) { - sem := int64(0) - return &splitResult{Result: res, sem: &sem}, &splitResult{Result: res, sem: &sem} -} - -type splitResult struct { - Result - released int64 - sem *int64 -} - -func (r *splitResult) Release(ctx context.Context) error { - if atomic.AddInt64(&r.released, 1) > 1 { - err := errors.Errorf("releasing already released reference") - logrus.Error(err) - return err - } - if atomic.AddInt64(r.sem, 1) == 2 { - return r.Result.Release(ctx) - } - return nil -} - -// NewCachedResult combines a result and cache key into cached result -func NewCachedResult(res Result, k CacheKey) CachedResult { - return &cachedResult{res, k} -} - -type cachedResult struct { - Result - k CacheKey -} - -func (cr *cachedResult) CacheKey() CacheKey { - return cr.k -} diff --git a/solver2/solver/scheduler.go b/solver2/solver/scheduler.go deleted file mode 100644 index 1ceb205f..00000000 --- a/solver2/solver/scheduler.go +++ /dev/null @@ -1,948 +0,0 @@ -package solver - -import ( - "context" - "sync" - - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -const debugScheduler = false // TODO: replace with logs in build trace - -type edgeStatusType int - -const ( - edgeStatusInitial edgeStatusType = iota - edgeStatusCacheFast - edgeStatusCacheSlow - edgeStatusComplete -) - -type EdgeFactory interface { - GetEdge(Edge) *edge - SetEdge(Edge, *edge) -} - -type edgePipe struct { - *Pipe - From, Target *edge - mu sync.Mutex -} - -func NewScheduler(ef EdgeFactory) *Scheduler { - s := &Scheduler{ - waitq: map[*edge]struct{}{}, - incoming: map[*edge][]*edgePipe{}, - outgoing: map[*edge][]*edgePipe{}, - - stopped: make(chan struct{}), - closed: make(chan struct{}), - - ef: ef, - } - s.cond = sync.NewCond(&s.mu) - - go s.loop() - - return s -} - -type Scheduler struct { - cond *sync.Cond - mu sync.Mutex - muQ sync.Mutex - - ef EdgeFactory - - waitq map[*edge]struct{} - stopped chan struct{} - stoppedOnce sync.Once - closed chan struct{} - - incoming map[*edge][]*edgePipe - outgoing map[*edge][]*edgePipe -} - -func (s *Scheduler) Stop() { - s.stoppedOnce.Do(func() { - close(s.stopped) - }) - <-s.closed -} - -func (s *Scheduler) loop() { - defer func() { - close(s.closed) - }() - - go func() { - <-s.stopped - s.mu.Lock() - s.cond.Signal() - s.mu.Unlock() - }() - - s.mu.Lock() - for { - select { - case <-s.stopped: - s.mu.Unlock() - return - default: - } - s.muQ.Lock() - q := s.waitq - s.waitq = map[*edge]struct{}{} - s.muQ.Unlock() - if len(q) == 0 { - s.cond.Wait() - continue - } - - for e := range q { - inc := make([]PipeWriter, len(s.incoming[e])) - for i, p := range s.incoming[e] { - inc[i] = p.Writer - } - out := make([]PipeReader, len(s.outgoing[e])) - for i, p := range s.outgoing[e] { - out[i] = p.Reader - } - - e.hasActiveOutgoing = false - updates := []PipeReader{} - for _, p := range out { - if ok := p.Reload(); ok { - updates = append(updates, p) - } - if !p.Status().Completed { - e.hasActiveOutgoing = true - } - } - - if debugScheduler { - logrus.Debugf(">> unpark %s req=%d upt=%d out=%d state=%d", e.edge.Vertex.Name(), len(inc), len(updates), len(out), e.state) - - for i, dep := range e.deps { - des := edgeStatusInitial - if dep.req != nil { - des = dep.req.Request().(*edgeRequest).desiredState - } - logrus.Debugf(":: dep%d %s state=%d des=%d", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des) - } - - for i, in := range inc { - req := in.Request() - logrus.Debugf("> incoming-%d: %p dstate=%d canceled=%v", i, in, req.Request.(*edgeRequest).desiredState, req.Canceled) - } - - for i, up := range updates { - if up == e.cacheMapReq { - logrus.Debugf("> update-%d: %p cacheMapReq complete=%v", i, up, up.Status().Completed) - } else if up == e.execReq { - logrus.Debugf("> update-%d: %p execReq complete=%v", i, up, up.Status().Completed) - } else { - st, ok := up.Status().Value.(*edgeState) - if ok { - index := -1 - if dep, ok := e.depRequests[up]; ok { - index = int(dep.index) - } - logrus.Debugf("> update-%d: %p input-%d keys=%d state=%d", i, up, index, len(st.keys), st.state) - } - } - } - } - - e.unpark(inc, updates, out, &pipeFactory{s: s, e: e}) - - if debugScheduler { - for i, in := range inc { - logrus.Debugf("< incoming-%d: %p completed=%v", i, in, in.Status().Completed) - } - logrus.Debugf("<< unpark %s\n", e.edge.Vertex.Name()) - } - - inc2 := make([]*edgePipe, 0, len(inc)) - for _, r := range s.incoming[e] { - if !r.Writer.Status().Completed { - inc2 = append(inc2, r) - } - } - if len(inc2) > 0 { - s.incoming[e] = inc2 - } else { - delete(s.incoming, e) - } - - out2 := make([]*edgePipe, 0, len(out)) - for _, r := range s.outgoing[e] { - if !r.Reader.Status().Completed { - out2 = append(out2, r) - } - } - if len(out2) > 0 { - s.outgoing[e] = out2 - } else { - delete(s.outgoing, e) - } - - if e.keysDidChange { - origEdge := e.index.LoadOrStore(e, e.cacheMap.Digest, e.edge.Index, e.depKeys()) - if origEdge != nil { - logrus.Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name()) - s.mergeTo(origEdge, e) - s.ef.SetEdge(e.edge, origEdge) - } - e.keysDidChange = false - } - - // avoid deadlocks. - // TODO: if these start showing up in error reports they can be changed - // to error the edge instead. They can only appear of algorithm bugs in - // unpark(), not for any external input. - if len(inc2) > 0 && len(out2) == 0 { - panic("invalid dispatch: return leaving incoming open") - } - if len(inc2) == 0 && len(out2) > 0 { - panic("invalid dispatch: return leaving outgoing open") - } - } - } -} - -func (s *Scheduler) signal(e *edge) { - s.muQ.Lock() - if _, ok := s.waitq[e]; !ok { - s.waitq[e] = struct{}{} - go func() { - s.mu.Lock() - s.muQ.Lock() - _, ok := s.waitq[e] - s.muQ.Unlock() - if !ok { - s.mu.Unlock() - return - } - s.cond.Signal() - s.mu.Unlock() - }() - } - s.muQ.Unlock() -} - -func (s *Scheduler) build(ctx context.Context, edge Edge) (CachedResult, error) { - s.mu.Lock() - e := s.ef.GetEdge(edge) - if e == nil { - s.mu.Unlock() - return nil, errors.Errorf("invalid request %v for build", edge) - } - - wait := make(chan struct{}) - - var p *Pipe - p = s.newPipe(e, nil, PipeRequest{Request: &edgeRequest{desiredState: edgeStatusComplete}}) - p.SignalWriter = func() { - p.Reader.Reload() - if p.Reader.Status().Completed { - close(wait) - } - } - s.mu.Unlock() - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - go func() { - <-ctx.Done() - p.Reader.Cancel() - }() - - <-wait - - if err := p.Reader.Status().Err; err != nil { - return nil, err - } - return p.Reader.Status().Value.(*edgeState).result, nil -} - -func (s *Scheduler) newPipe(target, from *edge, req PipeRequest) *Pipe { - p := &edgePipe{ - Pipe: NewPipe(req), - Target: target, - From: from, - } - - s.signal(target) - if from != nil { - p.SignalWriter = func() { - p.mu.Lock() - defer p.mu.Unlock() - s.signal(p.From) - } - s.outgoing[from] = append(s.outgoing[from], p) - } - s.incoming[target] = append(s.incoming[target], p) - p.SignalReader = func() { - p.mu.Lock() - defer p.mu.Unlock() - s.signal(p.Target) - } - return p.Pipe -} - -func (s *Scheduler) newRequestWithFunc(e *edge, f func(context.Context) (interface{}, error)) PipeReader { - pp, start := newFuncionPipe(f) - p := &edgePipe{ - Pipe: pp, - From: e, - } - p.SignalWriter = func() { - p.mu.Lock() - defer p.mu.Unlock() - s.signal(p.From) - } - s.outgoing[e] = append(s.outgoing[e], p) - go start() - return p.Reader -} - -func (s *Scheduler) mergeTo(target, src *edge) { - for _, inc := range s.incoming[src] { - inc.mu.Lock() - inc.Target = target - s.incoming[target] = append(s.incoming[target], inc) - inc.mu.Unlock() - } - - for _, out := range s.outgoing[src] { - out.mu.Lock() - out.From = target - s.outgoing[target] = append(s.outgoing[target], out) - out.mu.Unlock() - out.Reader.Cancel() - } - - delete(s.incoming, src) - delete(s.outgoing, src) - s.signal(target) - - // TODO(tonistiigi): merge cache providers - // TODO(tonistiigi): check ignore-cache compat before merge -} - -func newEdge(ed Edge, op activeOp, index *EdgeIndex) *edge { - e := &edge{ - edge: ed, - op: op, - depRequests: map[PipeReader]*dep{}, - cacheRecords: map[string]*CacheRecord{}, - index: index, - } - return e -} - -type edge struct { - edge Edge - op activeOp - - edgeState - depRequests map[PipeReader]*dep - deps []*dep - - cacheMapReq PipeReader - execReq PipeReader - err error - cacheRecords map[string]*CacheRecord - - noCacheMatchPossible bool - allDepsCompletedCacheFast bool - allDepsCompletedCacheSlow bool - allDepsCompleted bool - hasActiveOutgoing bool - - releaserCount int - keysDidChange bool - index *EdgeIndex -} - -// dep holds state for a dependant edge -type dep struct { - req PipeReader - edgeState - index Index - cacheRecords map[string]*CacheRecord - desiredState edgeStatusType - e *edge - slowCacheReq PipeReader // TODO: reuse req - slowCacheComplete bool - slowCacheKey CacheKey - err error -} - -func newDep(i Index) *dep { - return &dep{index: i, cacheRecords: map[string]*CacheRecord{}} -} - -type edgeState struct { - state edgeStatusType - result CachedResult - cacheMap *CacheMap - keys []CacheKey -} - -func isEqualState(s1, s2 edgeState) bool { - if s1.state != s2.state || s1.result != s2.result || s1.cacheMap != s2.cacheMap || len(s1.keys) != len(s2.keys) { - return false - } - return true -} - -type edgeRequest struct { - desiredState edgeStatusType - currentState edgeState -} - -func (e *edge) duplicateReleaser() { - e.releaserCount += 1 -} - -func (e *edge) release() { - if e.releaserCount > 0 { - e.releaserCount-- - return - } - e.index.Release(e) -} - -// commitOptions returns parameters for the op execution -func (e *edge) commitOptions() (CacheKey, []Result) { - if e.deps == nil { - return NewCacheKey(e.cacheMap.Digest, e.edge.Index, nil), nil - } - - inputs := make([]CacheKey, len(e.deps)) - results := make([]Result, len(e.deps)) - for i, dep := range e.deps { - inputs[i] = dep.result.CacheKey() - if dep.slowCacheKey != nil { - inputs[i] = NewCacheKey("", 0, []CacheKey{inputs[i], dep.slowCacheKey}) - } - results[i] = dep.result - } - return NewCacheKey(e.cacheMap.Digest, e.edge.Index, inputs), results -} - -func (e *edge) isComplete() bool { - return e.err != nil || e.result != nil -} - -func (e *edge) cancelPipes(pipes []PipeReader) { - for _, p := range pipes { - p.Cancel() - } -} - -func (e *edge) finishIncoming(req PipeWriter) { - err := e.err - if req.Request().Canceled && err == nil { - err = context.Canceled - } - if debugScheduler { - logrus.Debugf("finishIncoming %s %v %#v %v", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Request.(*edgeRequest).desiredState) - } - req.Finalize(&e.edgeState, err) -} - -func (e *edge) updateIncoming(req PipeWriter) { - req.Update(&e.edgeState) -} - -// probeCache is called with unprocessed cache keys for dependency -// if the key could match the edge the cacheRecords for dependency are filled -func (e *edge) probeCache(d *dep, keys []CacheKey) { - if len(keys) == 0 { - return - } - records, err := e.op.Cache().Query(keys, d.index, e.cacheMap.Digest, e.edge.Index) - if err != nil { - e.err = errors.Wrap(err, "error on cache query") - } - for _, r := range records { - if _, ok := d.cacheRecords[r.ID]; !ok { - d.cacheRecords[r.ID] = r - } - } -} - -// checkDepMatchPossible checks if any cache matches are possible pass this point -func (e *edge) checkDepMatchPossible(dep *dep) { - depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil - if !e.noCacheMatchPossible && ((dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state == edgeStatusCacheFast) && len(dep.cacheRecords) == 0) { - e.noCacheMatchPossible = true - } -} - -// slowCacheFunc returns the result based cache func for dependency if it exists -func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc { - if e.cacheMap == nil { - return nil - } - return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc -} - -// allDepsHaveKeys checks if all dependencies have at least one key. used for -// determining if there is enough data for combining cache key for edge -func (e *edge) allDepsHaveKeys() bool { - for _, d := range e.deps { - if len(d.keys) == 0 { - return false - } - } - return true -} - -// depKeys returns all current dependency cache keys -func (e *edge) depKeys() [][]CacheKey { - keys := make([][]CacheKey, len(e.deps)) - for i, d := range e.deps { - keys[i] = d.keys - if d.result != nil { - keys[i] = append(keys[i], d.result.CacheKey()) - } - if d.slowCacheKey != nil { - keys[i] = append(keys[i], d.slowCacheKey) - } - } - return keys -} - -// slow cache keys can be computed in 2 phases if there are multiple deps. -// first evaluate ones that didn't match any definition based keys -func (e *edge) skipPhase2SlowCache(dep *dep) bool { - isPhase1 := false - for _, dep := range e.deps { - if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.cacheRecords) == 0 { - isPhase1 = true - break - } - } - - if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.cacheRecords) > 0 { - return true - } - return false -} - -// unpark is called by the scheduler with incoming requests and updates for -// previous calls. -// To avoid deadlocks and resource leaks this function needs to follow -// following rules: -// 1) this function needs to return unclosed outgoing requests if some incoming -// requests were not completed -// 2) this function may not return outgoing requests if it has completed all -// incoming requests -func (e *edge) unpark(incoming []PipeWriter, updates, allPipes []PipeReader, f *pipeFactory) { - // TODO: split into helper functions - - // process all latest changes - depChanged := false - for _, upt := range updates { - // response for cachemap request - if upt == e.cacheMapReq && upt.Status().Completed { - if err := upt.Status().Err; err != nil { - if e.err == nil { - e.err = err - } - } else { - e.cacheMap = upt.Status().Value.(*CacheMap) - if len(e.deps) == 0 { - k := NewCacheKey(e.cacheMap.Digest, e.edge.Index, nil) - records, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index) - if err != nil { - logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error - } else { - for _, r := range records { - e.cacheRecords[r.ID] = r - } - if len(records) > 0 { - e.keys = append(e.keys, k) - } - if e.allDepsHaveKeys() { - e.keysDidChange = true - } - } - e.state = edgeStatusCacheSlow - } - // probe keys that were loaded before cache map - for _, dep := range e.deps { - e.probeCache(dep, dep.keys) - e.checkDepMatchPossible(dep) - } - depChanged = true - } - // response for exec request - } else if upt == e.execReq && upt.Status().Completed { - if err := upt.Status().Err; err != nil { - if e.err == nil { - e.err = err - } - } else { - e.result = upt.Status().Value.(CachedResult) - e.state = edgeStatusComplete - } - // response for requests to dependencies - } else if dep, ok := e.depRequests[upt]; ok { // TODO: ignore canceled - if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil { - if e.err == nil { - e.err = err - } - dep.err = err - } - - state := upt.Status().Value.(*edgeState) - - if len(dep.keys) < len(state.keys) { - newKeys := state.keys[len(dep.keys):] - - if e.cacheMap != nil { - e.probeCache(dep, newKeys) - if e.allDepsHaveKeys() { - e.keysDidChange = true - } - } - depChanged = true - } - if dep.state != edgeStatusComplete && state.state == edgeStatusComplete { - e.keysDidChange = true - } - - recheck := state.state != dep.state - - dep.edgeState = *state - - if recheck && e.cacheMap != nil { - e.checkDepMatchPossible(dep) - depChanged = true - } - - // set current state - // add to probedKeys - } else { - for _, dep := range e.deps { - if upt == dep.slowCacheReq && upt.Status().Completed { - if err := upt.Status().Err; err != nil && e.err == nil { - e.err = upt.Status().Err - } else if !dep.slowCacheComplete { - k := NewCacheKey(upt.Status().Value.(digest.Digest), -1, nil) - dep.slowCacheKey = k - e.probeCache(dep, []CacheKey{k}) - dep.slowCacheComplete = true - e.keysDidChange = true - } - // dep.slowCacheReq = nil - depChanged = true - } - } - } - } - - // the dep responses had changes. need to reevaluate edge state - if depChanged { - // TODO: fast pass to detect incomplete results - newRecords := map[string]*CacheRecord{} - - for i, dep := range e.deps { - if i == 0 { - for key, r := range dep.cacheRecords { - if _, ok := e.cacheRecords[key]; ok { - continue - } - newRecords[key] = r - } - } else { - for key := range newRecords { - if _, ok := dep.cacheRecords[key]; !ok { - delete(newRecords, key) - } - } - } - if len(newRecords) == 0 { - break - } - } - - for k, r := range newRecords { - e.keys = append(e.keys, r.CacheKey) - e.cacheRecords[k] = r - } - - // detect lower/upper bound for current state - allDepsCompletedCacheFast := true - allDepsCompletedCacheSlow := true - allDepsCompleted := true - stLow := edgeStatusInitial - stHigh := edgeStatusCacheSlow - if e.cacheMap != nil { - for _, dep := range e.deps { - isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete)) - if dep.state > stLow && len(dep.cacheRecords) == 0 && !isSlowIncomplete { - stLow = dep.state - if stLow > edgeStatusCacheSlow { - stLow = edgeStatusCacheSlow - } - } - if dep.state < stHigh { - stHigh = dep.state - } - if isSlowIncomplete || dep.state < edgeStatusComplete { - allDepsCompleted = false - } - if dep.state < edgeStatusCacheFast { - allDepsCompletedCacheFast = false - } - if isSlowIncomplete || dep.state < edgeStatusCacheSlow { - allDepsCompletedCacheSlow = false - } - } - if stHigh > e.state { - e.state = stHigh - } - if stLow > e.state { - e.state = stLow - } - e.allDepsCompletedCacheFast = allDepsCompletedCacheFast - e.allDepsCompletedCacheSlow = allDepsCompletedCacheSlow - e.allDepsCompleted = allDepsCompleted - } - } - - // detect the result state for the requests - allIncomingCanComplete := true - desiredState := e.state - - // check incoming requests - // check if all requests can be either answered - if !e.isComplete() { - for _, req := range incoming { - if !req.Request().Canceled { - if r := req.Request().Request.(*edgeRequest); desiredState < r.desiredState { - desiredState = r.desiredState - allIncomingCanComplete = false - } - } - } - } - - // do not set allIncomingCanComplete if some e.state != edgeStateComplete dep.state < e.state && len(e.keys) == 0 - hasIncompleteDeps := false - if e.state < edgeStatusComplete && len(e.keys) == 0 { - for _, dep := range e.deps { - if dep.err == nil && dep.state < e.state { - hasIncompleteDeps = true - break - } - } - } - if hasIncompleteDeps { - allIncomingCanComplete = false - } - - if debugScheduler { - logrus.Debugf("status state=%d cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast) - } - - if allIncomingCanComplete && e.hasActiveOutgoing { - // cancel all current requests - e.cancelPipes(allPipes) - - // can close all but one requests - var leaveOpen PipeWriter - for _, req := range incoming { - if !req.Request().Canceled { - leaveOpen = req - break - } - } - for _, req := range incoming { - if leaveOpen == nil || leaveOpen == req { - leaveOpen = req - continue - } - e.finishIncoming(req) - } - return - } - - // can complete, finish and return - if allIncomingCanComplete && !e.hasActiveOutgoing { - for _, req := range incoming { - e.finishIncoming(req) - } - return - } - - // update incoming based on current state - for _, req := range incoming { - r := req.Request().Request.(*edgeRequest) - if !hasIncompleteDeps && (e.state >= r.desiredState || req.Request().Canceled) { - e.finishIncoming(req) - } else if !isEqualState(r.currentState, e.edgeState) { - e.updateIncoming(req) - } - } - - // set up new outgoing requests if needed - - if e.cacheMapReq == nil { - e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { - return e.op.CacheMap(ctx) - }) - } - - // initialize deps state - if e.deps == nil { - e.depRequests = make(map[PipeReader]*dep) - e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs())) - for i := range e.edge.Vertex.Inputs() { - e.deps = append(e.deps, newDep(Index(i))) - } - } - - // cycle all dependencies. set up outgoing requests if needed - for _, dep := range e.deps { - desiredStateDep := dep.state - - if e.noCacheMatchPossible { - desiredStateDep = desiredState - } else if dep.state == edgeStatusInitial && desiredState > dep.state { - desiredStateDep = edgeStatusCacheFast - } else if dep.state == edgeStatusCacheFast && desiredState > dep.state { - if e.allDepsCompletedCacheFast && len(e.keys) == 0 { - desiredStateDep = edgeStatusCacheSlow - } - } else if dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete { - if (e.allDepsCompletedCacheSlow || e.slowCacheFunc(dep) != nil) && len(e.keys) == 0 { - if !e.skipPhase2SlowCache(dep) { - desiredStateDep = edgeStatusComplete - } - } - } else if dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow { - if !e.skipPhase2SlowCache(dep) { - desiredStateDep = edgeStatusComplete - } - } - - // outgoing request is needed - if dep.state < desiredStateDep { - addNew := true - if dep.req != nil && !dep.req.Status().Completed { - if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep { - dep.req.Cancel() - } else { - addNew = false - } - } - if addNew { - req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{ - currentState: dep.edgeState, - desiredState: desiredStateDep, - }) - e.depRequests[req] = dep - dep.req = req - } - } else if dep.req != nil && !dep.req.Status().Completed { - dep.req.Cancel() - } - - // initialize function to compute cache key based on dependency result - if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && e.slowCacheFunc(dep) != nil && e.cacheMap != nil { - fn := e.slowCacheFunc(dep) - res := dep.result - func(fn ResultBasedCacheFunc, res Result, index Index) { - dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { - return e.op.CalcSlowCache(ctx, index, fn, res) - }) - }(fn, res, dep.index) - } - } - - // execute op - if e.execReq == nil && desiredState == edgeStatusComplete { - if e.keysDidChange { - // postpone executing to next invocation if we have unprocessed keys - f.NewFuncRequest(func(context.Context) (interface{}, error) { - return nil, nil - }) - return - } - if len(e.keys) > 0 { - e.execReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { - var rec *CacheRecord - for _, r := range e.cacheRecords { // TODO: time/priority order - rec = r - break - } - logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID) - res, err := e.op.Cache().Load(ctx, rec) - if err != nil { - return nil, err - } - return NewCachedResult(res, rec.CacheKey), nil - }) - for req := range e.depRequests { - req.Cancel() - } - } else if e.allDepsCompleted { - e.execReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { - cacheKey, inputs := e.commitOptions() - results, err := e.op.Exec(ctx, inputs) - if err != nil { - return nil, err - } - - index := e.edge.Index - if len(results) <= int(index) { - return nil, errors.Errorf("invalid response from exec need %d index but %d results received", index, len(results)) - } - - res := results[int(index)] - - ck, err := e.op.Cache().Save(cacheKey, res) - if err != nil { - return nil, err - } - return NewCachedResult(res, ck), nil - }) - } - } - -} - -type pipeFactory struct { - e *edge - s *Scheduler -} - -func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) PipeReader { - target := pf.s.ef.GetEdge(ee) - if target == nil { - panic("failed to get edge") // TODO: return errored pipe - } - p := pf.s.newPipe(target, pf.e, PipeRequest{Request: req}) - if debugScheduler { - logrus.Debugf("> newPipe %s %p desiredState=%d", ee.Vertex.Name(), p, req.desiredState) - } - return p.Reader -} - -func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) PipeReader { - p := pf.s.newRequestWithFunc(pf.e, f) - if debugScheduler { - logrus.Debugf("> newFunc %p", p) - } - return p -} diff --git a/solver2/solver/scheduler_test.go b/solver2/solver/scheduler_test.go deleted file mode 100644 index caf2afc8..00000000 --- a/solver2/solver/scheduler_test.go +++ /dev/null @@ -1,1422 +0,0 @@ -package solver - -import ( - "context" - _ "crypto/sha256" - "fmt" - "math" - "math/rand" - "os" - "sync/atomic" - "testing" - "time" - - "github.com/moby/buildkit/identity" - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" -) - -func init() { - if debugScheduler { - logrus.SetOutput(os.Stdout) - logrus.SetLevel(logrus.DebugLevel) - } -} - -func TestSingleLevelActiveGraph(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - s := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer s.Close() - - j0, err := s.NewJob("job0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - g0 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - value: "result0", - }), - } - g0.Vertex.(*vertex).setupCallCounters() - - res, err := j0.Build(ctx, g0) - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(1)) - - // calling again with same digest just uses the active queue - j1, err := s.NewJob("job1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g1 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - value: "result1", - }), - } - g1.Vertex.(*vertex).setupCallCounters() - - res, err = j1.Build(ctx, g1) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(1)) - require.Equal(t, *g1.Vertex.(*vertex).cacheCallCount, int64(0)) - require.Equal(t, *g1.Vertex.(*vertex).execCallCount, int64(0)) - - require.NoError(t, j0.Discard()) - j0 = nil - - // after discarding j0, j1 still holds the state - - j2, err := s.NewJob("job2") - require.NoError(t, err) - - defer func() { - if j2 != nil { - j2.Discard() - } - }() - - g2 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - value: "result2", - }), - } - g2.Vertex.(*vertex).setupCallCounters() - - res, err = j2.Build(ctx, g2) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(1)) - require.Equal(t, *g1.Vertex.(*vertex).cacheCallCount, int64(0)) - require.Equal(t, *g1.Vertex.(*vertex).execCallCount, int64(0)) - require.Equal(t, *g2.Vertex.(*vertex).cacheCallCount, int64(0)) - require.Equal(t, *g2.Vertex.(*vertex).execCallCount, int64(0)) - - require.NoError(t, j1.Discard()) - j1 = nil - require.NoError(t, j2.Discard()) - j2 = nil - - // everything should be released now - - j3, err := s.NewJob("job3") - require.NoError(t, err) - - defer func() { - if j3 != nil { - j3.Discard() - } - }() - - g3 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - value: "result3", - }), - } - g3.Vertex.(*vertex).setupCallCounters() - - res, err = j3.Build(ctx, g3) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result3") - - require.Equal(t, *g3.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g3.Vertex.(*vertex).execCallCount, int64(1)) - - require.NoError(t, j3.Discard()) - j3 = nil - - // repeat the same test but make sure the build run in parallel now - - j4, err := s.NewJob("job4") - require.NoError(t, err) - - defer func() { - if j4 != nil { - j4.Discard() - } - }() - - j5, err := s.NewJob("job5") - require.NoError(t, err) - - defer func() { - if j5 != nil { - j5.Discard() - } - }() - - g4 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheDelay: 100 * time.Millisecond, - value: "result4", - }), - } - g4.Vertex.(*vertex).setupCallCounters() - - eg, _ := errgroup.WithContext(ctx) - - eg.Go(func() error { - res, err := j4.Build(ctx, g4) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result4") - return err - }) - - eg.Go(func() error { - res, err := j5.Build(ctx, g4) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result4") - return err - }) - - require.NoError(t, eg.Wait()) - - require.Equal(t, *g4.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g4.Vertex.(*vertex).execCallCount, int64(1)) -} - -func TestSingleLevelCache(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - s := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer s.Close() - - j0, err := s.NewJob("job0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - g0 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - value: "result0", - }), - } - g0.Vertex.(*vertex).setupCallCounters() - - res, err := j0.Build(ctx, g0) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.NoError(t, j0.Discard()) - j0 = nil - - // first try that there is no match for different cache - j1, err := s.NewJob("job1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g1 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed1", - value: "result1", - }), - } - g1.Vertex.(*vertex).setupCallCounters() - - res, err = j1.Build(ctx, g1) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result1") - - require.Equal(t, *g1.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g1.Vertex.(*vertex).execCallCount, int64(1)) - - require.NoError(t, j1.Discard()) - j1 = nil - - // expect cache match for first build - - j2, err := s.NewJob("job2") - require.NoError(t, err) - - defer func() { - if j2 != nil { - j2.Discard() - } - }() - - g2 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v2", - cacheKeySeed: "seed0", // same as first build - value: "result2", - }), - } - g2.Vertex.(*vertex).setupCallCounters() - - res, err = j2.Build(ctx, g2) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(1)) - require.Equal(t, *g2.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g2.Vertex.(*vertex).execCallCount, int64(0)) - - require.NoError(t, j2.Discard()) - j2 = nil - -} - -func TestSingleLevelCacheParallel(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - s := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer s.Close() - - // rebuild in parallel. only executed once. - - j0, err := s.NewJob("job0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - wait2Ready := blockingFuncion(2) - - g0 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - cachePreFunc: wait2Ready, - value: "result0", - }), - } - g0.Vertex.(*vertex).setupCallCounters() - - j1, err := s.NewJob("job1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g1 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed0", // same as g0 - cachePreFunc: wait2Ready, - value: "result0", - }), - } - g1.Vertex.(*vertex).setupCallCounters() - - eg, _ := errgroup.WithContext(ctx) - - eg.Go(func() error { - res, err := j0.Build(ctx, g0) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - return err - }) - - eg.Go(func() error { - res, err := j1.Build(ctx, g1) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - return err - }) - - require.NoError(t, eg.Wait()) - - require.Equal(t, int64(1), *g0.Vertex.(*vertex).cacheCallCount) - require.Equal(t, int64(1), *g1.Vertex.(*vertex).cacheCallCount) - // only one execution ran - require.Equal(t, int64(1), *g0.Vertex.(*vertex).execCallCount+*g1.Vertex.(*vertex).execCallCount) - -} - -func TestMultiLevelCacheParallel(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - s := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer s.Close() - - // rebuild in parallel. only executed once. - - j0, err := s.NewJob("job0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - wait2Ready := blockingFuncion(2) - wait2Ready2 := blockingFuncion(2) - - g0 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - cachePreFunc: wait2Ready, - value: "result0", - inputs: []Edge{{ - Vertex: vtx(vtxOpt{ - name: "v0-c0", - cacheKeySeed: "seed0-c0", - cachePreFunc: wait2Ready2, - value: "result0-c0", - })}, - }, - }), - } - g0.Vertex.(*vertex).setupCallCounters() - - j1, err := s.NewJob("job1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g1 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed0", // same as g0 - cachePreFunc: wait2Ready, - value: "result0", - inputs: []Edge{{ - Vertex: vtx(vtxOpt{ - name: "v1-c0", - cacheKeySeed: "seed0-c0", // same as g0 - cachePreFunc: wait2Ready2, - value: "result0-c", - })}, - }, - }), - } - g1.Vertex.(*vertex).setupCallCounters() - - eg, _ := errgroup.WithContext(ctx) - - eg.Go(func() error { - res, err := j0.Build(ctx, g0) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - return err - }) - - eg.Go(func() error { - res, err := j1.Build(ctx, g1) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - return err - }) - - require.NoError(t, eg.Wait()) - - require.Equal(t, int64(2), *g0.Vertex.(*vertex).cacheCallCount) - require.Equal(t, int64(2), *g1.Vertex.(*vertex).cacheCallCount) - require.Equal(t, int64(2), *g0.Vertex.(*vertex).execCallCount+*g1.Vertex.(*vertex).execCallCount) -} - -func TestSingleCancelCache(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - s := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer s.Close() - - j0, err := s.NewJob("job0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - ctx, cancel := context.WithCancel(ctx) - - g0 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cachePreFunc: func(ctx context.Context) error { - cancel() - <-ctx.Done() - return nil // error should still come from context - }, - }), - } - g0.Vertex.(*vertex).setupCallCounters() - - _, err = j0.Build(ctx, g0) - require.Error(t, err) - require.Equal(t, errors.Cause(err), context.Canceled) - - require.Equal(t, *g0.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g0.Vertex.(*vertex).execCallCount, int64(0)) - - require.NoError(t, j0.Discard()) - j0 = nil - -} -func TestSingleCancelExec(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - s := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer s.Close() - - j1, err := s.NewJob("job1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - ctx, cancel := context.WithCancel(ctx) - - g1 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v2", - execPreFunc: func(ctx context.Context) error { - cancel() - <-ctx.Done() - return nil // error should still come from context - }, - }), - } - g1.Vertex.(*vertex).setupCallCounters() - - _, err = j1.Build(ctx, g1) - require.Error(t, err) - require.Equal(t, errors.Cause(err), context.Canceled) - - require.Equal(t, *g1.Vertex.(*vertex).cacheCallCount, int64(1)) - require.Equal(t, *g1.Vertex.(*vertex).execCallCount, int64(1)) - - require.NoError(t, j1.Discard()) - j1 = nil -} - -func TestSingleCancelParallel(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - s := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer s.Close() - - // run 2 in parallel cancel first, second one continues without errors - eg, ctx := errgroup.WithContext(ctx) - - firstReady := make(chan struct{}) - firstErrored := make(chan struct{}) - - eg.Go(func() error { - j, err := s.NewJob("job2") - require.NoError(t, err) - - defer func() { - if j != nil { - j.Discard() - } - }() - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - g := Edge{ - Vertex: vtx(vtxOpt{ - name: "v2", - value: "result2", - cachePreFunc: func(ctx context.Context) error { - close(firstReady) - time.Sleep(200 * time.Millisecond) - cancel() - <-firstErrored - return nil - }, - }), - } - - _, err = j.Build(ctx, g) - close(firstErrored) - require.Error(t, err) - require.Equal(t, errors.Cause(err), context.Canceled) - return nil - }) - - eg.Go(func() error { - j, err := s.NewJob("job3") - require.NoError(t, err) - - defer func() { - if j != nil { - j.Discard() - } - }() - - g := Edge{ - Vertex: vtx(vtxOpt{ - name: "v2", - }), - } - <-firstReady - - res, err := j.Build(ctx, g) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result2") - return err - }) - - require.NoError(t, eg.Wait()) -} - -func TestMultiLevelCalculation(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - l := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer l.Close() - - j0, err := l.NewJob("j0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - g := Edge{ - Vertex: vtxSum(1, vtxOpt{ - inputs: []Edge{ - {Vertex: vtxSum(0, vtxOpt{ - inputs: []Edge{ - {Vertex: vtxConst(7, vtxOpt{})}, - {Vertex: vtxConst(2, vtxOpt{})}, - }, - })}, - {Vertex: vtxSum(0, vtxOpt{ - inputs: []Edge{ - {Vertex: vtxConst(7, vtxOpt{})}, - {Vertex: vtxConst(2, vtxOpt{})}, - }, - })}, - {Vertex: vtxConst(2, vtxOpt{})}, - {Vertex: vtxConst(2, vtxOpt{})}, - {Vertex: vtxConst(19, vtxOpt{})}, - }, - }), - } - - res, err := j0.Build(ctx, g) - require.NoError(t, err) - require.Equal(t, unwrapInt(res), 42) // 1 + 2*(7 + 2) + 2 + 2 + 19 - - require.NoError(t, j0.Discard()) - j0 = nil - - j1, err := l.NewJob("j1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g2 := Edge{ - Vertex: vtxSum(1, vtxOpt{ - inputs: []Edge{ - {Vertex: vtxSum(0, vtxOpt{ - inputs: []Edge{ - {Vertex: vtxConst(7, vtxOpt{})}, - {Vertex: vtxConst(2, vtxOpt{})}, - }, - })}, - {Vertex: vtxSum(0, vtxOpt{ - inputs: []Edge{ - {Vertex: vtxConst(7, vtxOpt{})}, - {Vertex: vtxConst(2, vtxOpt{})}, - }, - })}, - {Vertex: vtxConst(2, vtxOpt{})}, - {Vertex: vtxConst(2, vtxOpt{})}, - {Vertex: vtxConst(19, vtxOpt{})}, - }, - }), - } - res, err = j1.Build(ctx, g2) - require.NoError(t, err) - require.Equal(t, unwrapInt(res), 42) - -} - -func TestHugeGraph(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - rand.Seed(time.Now().UnixNano()) - - cacheManager := newTrackingCacheManager(NewInMemoryCacheManager()) - - l := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - DefaultCache: cacheManager, - }) - defer l.Close() - - j0, err := l.NewJob("j0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - nodes := 1000 - - g, v := generateSubGraph(nodes) - // printGraph(g, "") - g.Vertex.(*vertexSum).setupCallCounters() - - res, err := j0.Build(ctx, g) - require.NoError(t, err) - require.Equal(t, unwrapInt(res), v) - require.Equal(t, int64(nodes), *g.Vertex.(*vertexSum).cacheCallCount) - // execCount := *g.Vertex.(*vertexSum).execCallCount - // require.True(t, execCount < 1000) - // require.True(t, execCount > 600) - require.Equal(t, int64(0), cacheManager.loadCounter) - - require.NoError(t, j0.Discard()) - j0 = nil - - j1, err := l.NewJob("j1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g.Vertex.(*vertexSum).setupCallCounters() - res, err = j1.Build(ctx, g) - require.NoError(t, err) - require.Equal(t, unwrapInt(res), v) - - require.Equal(t, int64(nodes), *g.Vertex.(*vertexSum).cacheCallCount) - require.Equal(t, int64(0), *g.Vertex.(*vertexSum).execCallCount) - require.Equal(t, int64(1), cacheManager.loadCounter) -} - -// TestOptimizedCacheAccess tests that inputs are not loaded from cache unless -// they are really needed -func TestOptimizedCacheAccess(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - cacheManager := newTrackingCacheManager(NewInMemoryCacheManager()) - - l := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - DefaultCache: cacheManager, - }) - defer l.Close() - - j0, err := l.NewJob("j0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - g0 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - value: "result0", - inputs: []Edge{ - {Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed1", - value: "result1", - })}, - {Vertex: vtx(vtxOpt{ - name: "v2", - cacheKeySeed: "seed2", - value: "result2", - })}, - }, - slowCacheCompute: map[int]ResultBasedCacheFunc{ - 1: digestFromResult, - }, - }), - } - g0.Vertex.(*vertex).setupCallCounters() - - res, err := j0.Build(ctx, g0) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, int64(3), *g0.Vertex.(*vertex).cacheCallCount) - require.Equal(t, int64(3), *g0.Vertex.(*vertex).execCallCount) - require.Equal(t, int64(0), cacheManager.loadCounter) - - require.NoError(t, j0.Discard()) - j0 = nil - - // changing cache seed for the input with slow cache should not pull result1 - - j1, err := l.NewJob("j1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g1 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - value: "result0-nocache", - inputs: []Edge{ - {Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed1", - value: "result1-nocache", - })}, - {Vertex: vtx(vtxOpt{ - name: "v2-changed", - cacheKeySeed: "seed2-changed", - value: "result2", // produces same slow key as g0 - })}, - }, - slowCacheCompute: map[int]ResultBasedCacheFunc{ - 1: digestFromResult, - }, - }), - } - g1.Vertex.(*vertex).setupCallCounters() - - res, err = j1.Build(ctx, g1) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, int64(3), *g1.Vertex.(*vertex).cacheCallCount) - require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount) - require.Equal(t, int64(1), cacheManager.loadCounter) - - require.NoError(t, j1.Discard()) - j1 = nil -} - -// TestOptimizedCacheAccess2 is a more narrow case that tests that inputs are -// not loaded from cache unless they are really needed. Inputs that match by -// definition should be less prioritized for slow cache calculation than the -// inputs that didn't. -func TestOptimizedCacheAccess2(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - cacheManager := newTrackingCacheManager(NewInMemoryCacheManager()) - - l := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - DefaultCache: cacheManager, - }) - defer l.Close() - - j0, err := l.NewJob("j0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - g0 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - value: "result0", - inputs: []Edge{ - {Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed1", - value: "result1", - })}, - {Vertex: vtx(vtxOpt{ - name: "v2", - cacheKeySeed: "seed2", - value: "result2", - })}, - }, - slowCacheCompute: map[int]ResultBasedCacheFunc{ - 0: digestFromResult, - 1: digestFromResult, - }, - }), - } - g0.Vertex.(*vertex).setupCallCounters() - - res, err := j0.Build(ctx, g0) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, int64(3), *g0.Vertex.(*vertex).cacheCallCount) - require.Equal(t, int64(3), *g0.Vertex.(*vertex).execCallCount) - require.Equal(t, int64(0), cacheManager.loadCounter) - - require.NoError(t, j0.Discard()) - j0 = nil - - // changing cache seed for the input with slow cache should not pull result1 - - j1, err := l.NewJob("j1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g1 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - value: "result0-nocache", - inputs: []Edge{ - {Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed1", - value: "result1", - })}, - {Vertex: vtx(vtxOpt{ - name: "v2-changed", - cacheKeySeed: "seed2-changed", - value: "result2", // produces same slow key as g0 - })}, - }, - slowCacheCompute: map[int]ResultBasedCacheFunc{ - 0: digestFromResult, - 1: digestFromResult, - }, - }), - } - g1.Vertex.(*vertex).setupCallCounters() - - res, err = j1.Build(ctx, g1) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, int64(3), *g1.Vertex.(*vertex).cacheCallCount) - require.Equal(t, int64(1), *g1.Vertex.(*vertex).execCallCount) - require.Equal(t, int64(1), cacheManager.loadCounter) // v1 is never loaded nor executed - - require.NoError(t, j1.Discard()) - j1 = nil - - // make sure that both inputs are still used for slow cache hit - j2, err := l.NewJob("j2") - require.NoError(t, err) - - defer func() { - if j2 != nil { - j2.Discard() - } - }() - - g2 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - value: "result0-nocache", - inputs: []Edge{ - {Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed1-changed2", - value: "result1", - })}, - {Vertex: vtx(vtxOpt{ - name: "v2-changed", - cacheKeySeed: "seed2-changed2", - value: "result2", // produces same slow key as g0 - })}, - }, - slowCacheCompute: map[int]ResultBasedCacheFunc{ - 0: digestFromResult, - 1: digestFromResult, - }, - }), - } - g2.Vertex.(*vertex).setupCallCounters() - - res, err = j2.Build(ctx, g2) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.Equal(t, int64(3), *g2.Vertex.(*vertex).cacheCallCount) - require.Equal(t, int64(2), *g2.Vertex.(*vertex).execCallCount) - require.Equal(t, int64(2), cacheManager.loadCounter) - - require.NoError(t, j2.Discard()) - j1 = nil -} - -func TestSlowCache(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - rand.Seed(time.Now().UnixNano()) - - l := NewJobList(SolverOpt{ - ResolveOpFunc: testOpResolver, - }) - defer l.Close() - - j0, err := l.NewJob("j0") - require.NoError(t, err) - - defer func() { - if j0 != nil { - j0.Discard() - } - }() - - g0 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v0", - cacheKeySeed: "seed0", - value: "result0", - inputs: []Edge{ - {Vertex: vtx(vtxOpt{ - name: "v1", - cacheKeySeed: "seed1", - value: "result1", - })}, - }, - slowCacheCompute: map[int]ResultBasedCacheFunc{ - 0: digestFromResult, - }, - }), - } - - res, err := j0.Build(ctx, g0) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.NoError(t, j0.Discard()) - j0 = nil - - j1, err := l.NewJob("j1") - require.NoError(t, err) - - defer func() { - if j1 != nil { - j1.Discard() - } - }() - - g1 := Edge{ - Vertex: vtx(vtxOpt{ - name: "v2", - cacheKeySeed: "seed0", - value: "not-cached", - inputs: []Edge{ - {Vertex: vtx(vtxOpt{ - name: "v3", - cacheKeySeed: "seed3", - value: "result1", // used for slow key - })}, - }, - slowCacheCompute: map[int]ResultBasedCacheFunc{ - 0: digestFromResult, - }, - }), - } - - res, err = j1.Build(ctx, g1) - require.NoError(t, err) - require.Equal(t, unwrap(res), "result0") - - require.NoError(t, j1.Discard()) - j1 = nil - -} - -func generateSubGraph(nodes int) (Edge, int) { - if nodes == 1 { - value := rand.Int() % 500 - return Edge{Vertex: vtxConst(value, vtxOpt{})}, value - } - spread := rand.Int()%5 + 2 - inc := int(math.Ceil(float64(nodes) / float64(spread))) - if inc > nodes { - inc = nodes - } - added := 1 - value := 0 - inputs := []Edge{} - i := 0 - for { - i++ - if added >= nodes { - break - } - if added+inc > nodes { - inc = nodes - added - } - e, v := generateSubGraph(inc) - inputs = append(inputs, e) - value += v - added += inc - } - extra := rand.Int() % 500 - value += extra - return Edge{Vertex: vtxSum(extra, vtxOpt{inputs: inputs})}, value -} - -type vtxOpt struct { - name string - cacheKeySeed string - execDelay time.Duration - cacheDelay time.Duration - cachePreFunc func(context.Context) error - execPreFunc func(context.Context) error - inputs []Edge - value string - slowCacheCompute map[int]ResultBasedCacheFunc -} - -func vtx(opt vtxOpt) *vertex { - if opt.name == "" { - opt.name = identity.NewID() - } - if opt.cacheKeySeed == "" { - opt.cacheKeySeed = identity.NewID() - } - return &vertex{opt: opt} -} - -type vertex struct { - opt vtxOpt - - cacheCallCount *int64 - execCallCount *int64 -} - -func (v *vertex) Digest() digest.Digest { - return digest.FromBytes([]byte(v.opt.name)) -} -func (v *vertex) Sys() interface{} { - return v -} -func (v *vertex) Inputs() []Edge { - return v.opt.inputs -} -func (v *vertex) Name() string { - return v.opt.name -} - -func (v *vertex) setupCallCounters() { - var cacheCount int64 - var execCount int64 - - v.setCallCounters(&cacheCount, &execCount) -} - -func (v *vertex) setCallCounters(cacheCount, execCount *int64) { - v.cacheCallCount = cacheCount - v.execCallCount = execCount - - for _, inp := range v.opt.inputs { - var v *vertex - switch vv := inp.Vertex.(type) { - case *vertex: - v = vv - case *vertexSum: - v = vv.vertex - case *vertexConst: - v = vv.vertex - } - v.setCallCounters(cacheCount, execCount) - } -} - -func (v *vertex) cacheMap(ctx context.Context) error { - if f := v.opt.cachePreFunc; f != nil { - if err := f(ctx); err != nil { - return err - } - } - if v.cacheCallCount != nil { - atomic.AddInt64(v.cacheCallCount, 1) - } - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - select { - case <-time.After(v.opt.cacheDelay): - case <-ctx.Done(): - return ctx.Err() - } - return nil -} - -func (v *vertex) CacheMap(ctx context.Context) (*CacheMap, error) { - if err := v.cacheMap(ctx); err != nil { - return nil, err - } - return v.makeCacheMap(), nil -} - -func (v *vertex) exec(ctx context.Context, inputs []Result) error { - if len(inputs) != len(v.Inputs()) { - return errors.Errorf("invalid number of inputs") - } - if f := v.opt.execPreFunc; f != nil { - if err := f(ctx); err != nil { - return err - } - } - if v.execCallCount != nil { - atomic.AddInt64(v.execCallCount, 1) - } - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - select { - case <-time.After(v.opt.execDelay): - case <-ctx.Done(): - return ctx.Err() - } - return nil -} - -func (v *vertex) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) { - if err := v.exec(ctx, inputs); err != nil { - return nil, err - } - return []Result{&dummyResult{id: identity.NewID(), value: v.opt.value}}, nil -} - -func (v *vertex) makeCacheMap() *CacheMap { - m := &CacheMap{ - Digest: digest.FromBytes([]byte(fmt.Sprintf("seed:%s", v.opt.cacheKeySeed))), - Deps: make([]struct { - Selector digest.Digest - ComputeDigestFunc ResultBasedCacheFunc - }, len(v.Inputs())), - } - for i, f := range v.opt.slowCacheCompute { - m.Deps[i].ComputeDigestFunc = f - } - return m -} - -// vtxConst returns a vertex that outputs a constant integer -func vtxConst(v int, opt vtxOpt) *vertexConst { - if opt.cacheKeySeed == "" { - opt.cacheKeySeed = fmt.Sprintf("const-%d", v) - } - if opt.name == "" { - opt.name = opt.cacheKeySeed + "-" + identity.NewID() - } - return &vertexConst{vertex: vtx(opt), value: v} -} - -type vertexConst struct { - *vertex - value int -} - -func (v *vertexConst) Sys() interface{} { - return v -} - -func (v *vertexConst) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) { - if err := v.exec(ctx, inputs); err != nil { - return nil, err - } - return []Result{&dummyResult{id: identity.NewID(), intValue: v.value}}, nil -} - -// vtxSum returns a vertex that ourputs sum of its inputs plus a constant -func vtxSum(v int, opt vtxOpt) *vertexSum { - if opt.cacheKeySeed == "" { - opt.cacheKeySeed = fmt.Sprintf("sum-%d", v) - } - if opt.name == "" { - opt.name = opt.cacheKeySeed + "-" + identity.NewID() - } - return &vertexSum{vertex: vtx(opt), value: v} -} - -type vertexSum struct { - *vertex - value int -} - -func (v *vertexSum) Sys() interface{} { - return v -} - -func (v *vertexSum) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) { - if err := v.exec(ctx, inputs); err != nil { - return nil, err - } - s := v.value - for _, inp := range inputs { - r, ok := inp.Sys().(*dummyResult) - if !ok { - return nil, errors.Errorf("invalid input type: %T", inp.Sys()) - } - s += r.intValue - } - return []Result{&dummyResult{id: identity.NewID(), intValue: s}}, nil -} - -func printGraph(e Edge, pfx string) { - name := e.Vertex.Name() - fmt.Printf("%s %d %s\n", pfx, e.Index, name) - for _, inp := range e.Vertex.Inputs() { - printGraph(inp, pfx+"-->") - } -} - -type dummyResult struct { - id string - value string - intValue int -} - -func (r *dummyResult) ID() string { return r.id } -func (r *dummyResult) Release(context.Context) error { return nil } -func (r *dummyResult) Sys() interface{} { return r } - -func testOpResolver(v Vertex) (Op, error) { - if op, ok := v.Sys().(Op); ok { - return op, nil - } - return nil, errors.Errorf("invalid vertex") -} - -func unwrap(res Result) string { - r, ok := res.Sys().(*dummyResult) - if !ok { - return "unwrap-error" - } - return r.value -} - -func unwrapInt(res Result) int { - r, ok := res.Sys().(*dummyResult) - if !ok { - return -1e6 - } - return r.intValue -} - -func blockingFuncion(i int) func(context.Context) error { - limit := int64(i) - block := make(chan struct{}) - return func(context.Context) error { - if atomic.AddInt64(&limit, -1) == 0 { - close(block) - } - <-block - return nil - } -} - -func newTrackingCacheManager(cm CacheManager) *trackingCacheManager { - return &trackingCacheManager{CacheManager: cm} -} - -type trackingCacheManager struct { - CacheManager - loadCounter int64 -} - -func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) { - atomic.AddInt64(&cm.loadCounter, 1) - return cm.CacheManager.Load(ctx, rec) -} - -func digestFromResult(ctx context.Context, res Result) (digest.Digest, error) { - return digest.FromBytes([]byte(unwrap(res))), nil -} diff --git a/solver2/solver/types.go b/solver2/solver/types.go deleted file mode 100644 index fe00cf37..00000000 --- a/solver2/solver/types.go +++ /dev/null @@ -1,144 +0,0 @@ -package solver - -import ( - "context" - "sync" - - digest "github.com/opencontainers/go-digest" -) - -// Vertex is one node in the build graph -type Vertex interface { - // Digest is a content-addressable vertex identifier - Digest() digest.Digest - // Sys returns an internal value that is used to execute the vertex. Usually - // this is capured by the operation resolver method during solve. - Sys() interface{} - // FIXME(AkihiroSuda): we should not import pb pkg here. - // TODO(tonistiigi): reenable strict metadata CacheManager, cache_ignore - // Metadata() *pb.OpMetadata - // Array of edges current vertex depends on. - Inputs() []Edge - Name() string -} - -// Index is a index value for output edge -type Index int - -// Edge is a path to a specific output of the vertex -type Edge struct { - Index Index - Vertex Vertex -} - -// Result is an abstract return value for a solve -type Result interface { - ID() string - Release(context.Context) error - Sys() interface{} -} - -// CachedResult is a result connected with its cache key -type CachedResult interface { - Result - CacheKey() CacheKey - // ExportCache(context.Context, content.Store) (*ocispec.Descriptor, error) -} - -// Op is an implementation for running a vertex -type Op interface { - // CacheMap returns structure describing how the operation is cached - CacheMap(context.Context) (*CacheMap, error) - // Exec runs an operation given results from previous operations. - Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) -} - -type ResultBasedCacheFunc func(context.Context, Result) (digest.Digest, error) - -type CacheMap struct { - // Digest is a base digest for operation that needs to be combined with - // inputs cache or selectors for dependencies. - Digest digest.Digest - Deps []struct { - // Optional digest that is merged with the cache key of the input - // TODO(tonistiigi): not implemented - Selector digest.Digest - // Optional function that returns a digest for the input based on its - // return value - ComputeDigestFunc ResultBasedCacheFunc - } -} - -// CacheKey is an identifier for storing/loading build cache -type CacheKey interface { - // Deps are dependant cache keys - Deps() []CacheKey - // Base digest for operation. Usually CacheMap.Digest - Digest() digest.Digest - // Index for the output that is cached - Output() Index - // Helpers for implementations for adding internal metadata - SetValue(key, value interface{}) - GetValue(key interface{}) interface{} -} - -// CacheRecord is an identifier for loading in cache -type CacheRecord struct { - ID string - CacheKey CacheKey - // Loadable bool - // Size int - // CreatedAt time.Time -} - -// CacheManager implements build cache backend -type CacheManager interface { - // Query searches for cache paths from one cache key to the output of a possible match. - Query(inp []CacheKey, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheRecord, error) - // Load pulls and returns the cached result - Load(ctx context.Context, rec *CacheRecord) (Result, error) - // Save saves a result based on a cache key - Save(key CacheKey, s Result) (CacheKey, error) -} - -// NewCacheKey creates a new cache key for a specific output index -func NewCacheKey(dgst digest.Digest, index Index, deps []CacheKey) CacheKey { - return &cacheKey{ - dgst: dgst, - deps: deps, - index: index, - values: map[interface{}]interface{}{}, - } -} - -type cacheKey struct { - mu sync.RWMutex - dgst digest.Digest - index Index - deps []CacheKey - values map[interface{}]interface{} -} - -func (ck *cacheKey) SetValue(key, value interface{}) { - ck.mu.Lock() - defer ck.mu.Unlock() - ck.values[key] = value -} - -func (ck *cacheKey) GetValue(key interface{}) interface{} { - ck.mu.RLock() - defer ck.mu.RUnlock() - return ck.values[key] -} - -func (ck *cacheKey) Deps() []CacheKey { - return ck.deps -} - -func (ck *cacheKey) Digest() digest.Digest { - return ck.dgst -} - -func (ck *cacheKey) Output() Index { - return ck.index -}