Revert "solver: new implementation"

This reverts commit 5939939666.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2018-01-31 18:01:51 -08:00
parent f78b03608a
commit a2d9a6ea0b
13 changed files with 0 additions and 4155 deletions

View File

@ -1,186 +0,0 @@
package solver
import (
"fmt"
"sync"
digest "github.com/opencontainers/go-digest"
)
// EdgeIndex is a synchronous map for detecting edge collisions.
type EdgeIndex struct {
mu sync.Mutex
items map[indexedDigest]map[indexedDigest]map[*edge]struct{}
backRefs map[*edge]map[indexedDigest]map[indexedDigest]struct{}
}
func NewEdgeIndex() *EdgeIndex {
return &EdgeIndex{
items: map[indexedDigest]map[indexedDigest]map[*edge]struct{}{},
backRefs: map[*edge]map[indexedDigest]map[indexedDigest]struct{}{},
}
}
func (ei *EdgeIndex) LoadOrStore(e *edge, dgst digest.Digest, index Index, deps [][]CacheKey) *edge {
ei.mu.Lock()
defer ei.mu.Unlock()
if e := ei.load(e, dgst, index, deps); e != nil {
return e
}
ei.store(e, dgst, index, deps)
return nil
}
func (ei *EdgeIndex) Release(e *edge) {
ei.mu.Lock()
defer ei.mu.Unlock()
for id, backRefs := range ei.backRefs[e] {
for id2 := range backRefs {
delete(ei.items[id][id2], e)
if len(ei.items[id][id2]) == 0 {
delete(ei.items[id], id2)
}
}
if len(ei.items[id]) == 0 {
delete(ei.items, id)
}
}
delete(ei.backRefs, e)
}
func (ei *EdgeIndex) load(ignore *edge, dgst digest.Digest, index Index, deps [][]CacheKey) *edge {
id := indexedDigest{dgst: dgst, index: index, depsCount: len(deps)}
m, ok := ei.items[id]
if !ok {
return nil
}
if len(deps) == 0 {
m2, ok := m[indexedDigest{}]
if !ok {
return nil
}
for e := range m2 {
if e != ignore {
return e
}
}
return nil
}
matches := map[*edge]struct{}{}
for i, keys := range deps {
if i == 0 {
for _, key := range keys {
id := indexedDigest{dgst: getUniqueID(key), index: Index(i)}
for e := range m[id] {
if e != ignore {
matches[e] = struct{}{}
}
}
}
} else {
loop0:
for match := range matches {
for _, key := range keys {
id := indexedDigest{dgst: getUniqueID(key), index: Index(i)}
if m[id] != nil {
if _, ok := m[id][match]; ok {
continue loop0
}
}
}
delete(matches, match)
}
}
if len(matches) == 0 {
break
}
}
for m := range matches {
return m
}
return nil
}
func (ei *EdgeIndex) store(e *edge, dgst digest.Digest, index Index, deps [][]CacheKey) {
id := indexedDigest{dgst: dgst, index: index, depsCount: len(deps)}
m, ok := ei.items[id]
if !ok {
m = map[indexedDigest]map[*edge]struct{}{}
ei.items[id] = m
}
backRefsMain, ok := ei.backRefs[e]
if !ok {
backRefsMain = map[indexedDigest]map[indexedDigest]struct{}{}
ei.backRefs[e] = backRefsMain
}
backRefs, ok := backRefsMain[id]
if !ok {
backRefs = map[indexedDigest]struct{}{}
backRefsMain[id] = backRefs
}
if len(deps) == 0 {
m2, ok := m[indexedDigest{}]
if !ok {
m2 = map[*edge]struct{}{}
m[indexedDigest{}] = m2
}
m2[e] = struct{}{}
backRefs[indexedDigest{}] = struct{}{}
return
}
for i, keys := range deps {
for _, key := range keys {
id := indexedDigest{dgst: getUniqueID(key), index: Index(i)}
m2, ok := m[id]
if !ok {
m2 = map[*edge]struct{}{}
m[id] = m2
}
m2[e] = struct{}{}
backRefs[id] = struct{}{}
}
}
}
type indexedDigest struct {
dgst digest.Digest
index Index
depsCount int
}
type internalKeyT string
var internalKey = internalKeyT("buildkit/unique-cache-id")
func getUniqueID(k CacheKey) digest.Digest {
internalV := k.GetValue(internalKey)
if internalV != nil {
return internalV.(digest.Digest)
}
dgstr := digest.SHA256.Digester()
for _, inp := range k.Deps() {
dgstr.Hash().Write([]byte(getUniqueID(inp)))
}
dgstr.Hash().Write([]byte(k.Digest()))
dgstr.Hash().Write([]byte(fmt.Sprintf("%d", k.Output())))
dgst := dgstr.Digest()
k.SetValue(internalKey, dgst)
return dgst
}

View File

@ -1 +0,0 @@
package solver

View File

@ -1,515 +0,0 @@
package solver
import (
"context"
"fmt"
"sync"
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// ResolveOpFunc finds an Op implementation for a Vertex
type ResolveOpFunc func(Vertex) (Op, error)
// JobList provides a shared graph of all the vertexes currently being
// processed. Every vertex that is being solved needs to be loaded into job
// first. Vertex operations are invoked and progress tracking happends through
// jobs.
// TODO: s/JobList/Solver
type JobList struct {
mu sync.RWMutex
jobs map[string]*Job
actives map[digest.Digest]*state
opts SolverOpt
updateCond *sync.Cond
s *Scheduler
index *EdgeIndex
}
type state struct {
jobs map[*Job]struct{}
parents map[digest.Digest]struct{}
childVtx map[digest.Digest]struct{}
mpw *progress.MultiWriter
allPw map[progress.Writer]struct{}
vtx Vertex
clientVertex client.Vertex
mu sync.Mutex
op *sharedOp
edges map[Index]*edge
opts SolverOpt
index *EdgeIndex
}
func (s *state) getEdge(index Index) *edge {
s.mu.Lock()
defer s.mu.Unlock()
if e, ok := s.edges[index]; ok {
return e
}
if s.op == nil {
s.op = newSharedOp(s.opts.ResolveOpFunc, s.opts.DefaultCache, s)
}
e := newEdge(Edge{Index: index, Vertex: s.vtx}, s.op, s.index)
s.edges[index] = e
return e
}
func (s *state) setEdge(index Index, newEdge *edge) {
s.mu.Lock()
defer s.mu.Unlock()
e, ok := s.edges[index]
if ok {
if e == newEdge {
return
}
e.release()
}
newEdge.duplicateReleaser()
s.edges[index] = newEdge
}
func (s *state) Release() {
for _, e := range s.edges {
e.release()
}
}
type Job struct {
list *JobList
pr *progress.MultiReader
pw progress.Writer
progressCloser func()
}
type SolverOpt struct {
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
}
func NewJobList(opts SolverOpt) *JobList {
if opts.DefaultCache == nil {
opts.DefaultCache = NewInMemoryCacheManager()
}
jl := &JobList{
jobs: make(map[string]*Job),
actives: make(map[digest.Digest]*state),
opts: opts,
index: NewEdgeIndex(),
}
jl.s = NewScheduler(jl)
jl.updateCond = sync.NewCond(jl.mu.RLocker())
return jl
}
func (jl *JobList) SetEdge(e Edge, newEdge *edge) {
jl.mu.RLock()
defer jl.mu.RUnlock()
st, ok := jl.actives[e.Vertex.Digest()]
if !ok {
return
}
st.setEdge(e.Index, newEdge)
}
func (jl *JobList) GetEdge(e Edge) *edge {
jl.mu.RLock()
defer jl.mu.RUnlock()
st, ok := jl.actives[e.Vertex.Digest()]
if !ok {
return nil
}
return st.getEdge(e.Index)
}
func (jl *JobList) SubBuild(ctx context.Context, e Edge, parent Vertex) (CachedResult, error) {
if err := jl.load(e.Vertex, parent, nil); err != nil {
return nil, err
}
return jl.s.build(ctx, e)
}
func (jl *JobList) Close() {
jl.s.Stop()
}
func (jl *JobList) load(v, parent Vertex, j *Job) error {
jl.mu.Lock()
defer jl.mu.Unlock()
return jl.loadUnlocked(v, parent, j)
}
func (jl *JobList) loadUnlocked(v, parent Vertex, j *Job) error {
for _, e := range v.Inputs() {
if err := jl.loadUnlocked(e.Vertex, parent, j); err != nil {
return err
}
}
dgst := v.Digest()
st, ok := jl.actives[dgst]
if !ok {
st = &state{
opts: jl.opts,
jobs: map[*Job]struct{}{},
parents: map[digest.Digest]struct{}{},
childVtx: map[digest.Digest]struct{}{},
allPw: map[progress.Writer]struct{}{},
mpw: progress.NewMultiWriter(progress.WithMetadata("vertex", dgst)),
vtx: v,
clientVertex: initClientVertex(v),
edges: map[Index]*edge{},
index: jl.index,
}
jl.actives[dgst] = st
}
if j != nil {
if _, ok := st.jobs[j]; !ok {
st.jobs[j] = struct{}{}
}
}
if parent != nil {
if _, ok := st.parents[parent.Digest()]; !ok {
st.parents[parent.Digest()] = struct{}{}
parentState, ok := jl.actives[parent.Digest()]
if !ok {
return errors.Errorf("inactive parent %s", parent.Digest())
}
parentState.childVtx[dgst] = struct{}{}
}
}
jl.connectProgressFromState(st, st)
return nil
}
func (jl *JobList) connectProgressFromState(target, src *state) {
for j := range src.jobs {
if _, ok := target.allPw[j.pw]; !ok {
target.mpw.Add(j.pw)
target.allPw[j.pw] = struct{}{}
j.pw.Write(target.clientVertex.Digest.String(), target.clientVertex)
}
}
for p := range src.parents {
jl.connectProgressFromState(target, jl.actives[p])
}
}
func (jl *JobList) NewJob(id string) (*Job, error) {
jl.mu.Lock()
defer jl.mu.Unlock()
if _, ok := jl.jobs[id]; ok {
return nil, errors.Errorf("job ID %s exists", id)
}
pr, ctx, progressCloser := progress.NewContext(context.Background())
pw, _, _ := progress.FromContext(ctx) // TODO: expose progress.Pipe()
j := &Job{
list: jl,
pr: progress.NewMultiReader(pr),
pw: pw,
progressCloser: progressCloser,
}
jl.jobs[id] = j
jl.updateCond.Broadcast()
return j, nil
}
func (jl *JobList) Get(id string) (*Job, error) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
go func() {
<-ctx.Done()
jl.updateCond.Broadcast()
}()
jl.mu.RLock()
defer jl.mu.RUnlock()
for {
select {
case <-ctx.Done():
return nil, errors.Errorf("no such job %s", id)
default:
}
j, ok := jl.jobs[id]
if !ok {
jl.updateCond.Wait()
continue
}
return j, nil
}
}
// called with joblist lock
func (jl *JobList) deleteIfUnreferenced(k digest.Digest, st *state) {
if len(st.jobs) == 0 && len(st.parents) == 0 {
for chKey := range st.childVtx {
chState := jl.actives[chKey]
delete(chState.parents, k)
jl.deleteIfUnreferenced(chKey, chState)
}
st.Release()
delete(jl.actives, k)
}
}
func (j *Job) Build(ctx context.Context, e Edge) (CachedResult, error) {
if err := j.list.load(e.Vertex, nil, j); err != nil {
return nil, err
}
return j.list.s.build(ctx, e)
}
func (j *Job) Discard() error {
defer j.progressCloser()
j.list.mu.Lock()
defer j.list.mu.Unlock()
j.pw.Close()
for k, st := range j.list.actives {
if _, ok := st.jobs[j]; ok {
delete(st.jobs, j)
j.list.deleteIfUnreferenced(k, st)
}
if _, ok := st.allPw[j.pw]; ok {
delete(st.allPw, j.pw)
}
}
return nil
}
type activeOp interface {
Op
Cache() CacheManager
CalcSlowCache(context.Context, Index, ResultBasedCacheFunc, Result) (digest.Digest, error)
}
func newSharedOp(resolver ResolveOpFunc, cacheManager CacheManager, st *state) *sharedOp {
so := &sharedOp{
resolver: resolver,
st: st,
cacheManager: cacheManager,
slowCacheRes: map[Index]digest.Digest{},
slowCacheErr: map[Index]error{},
}
return so
}
type sharedOp struct {
resolver ResolveOpFunc
cacheManager CacheManager
st *state
g flightcontrol.Group
opOnce sync.Once
op Op
err error
execRes []*SharedResult
execErr error
cacheRes *CacheMap
cacheErr error
slowMu sync.Mutex
slowCacheRes map[Index]digest.Digest
slowCacheErr map[Index]error
}
func (s *sharedOp) Cache() CacheManager {
return s.cacheManager // TODO: add on load
}
func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBasedCacheFunc, res Result) (digest.Digest, error) {
key, err := s.g.Do(ctx, fmt.Sprintf("slow-compute-%d", index), func(ctx context.Context) (interface{}, error) {
s.slowMu.Lock()
// TODO: add helpers for these stored values
if res := s.slowCacheRes[index]; res != "" {
s.slowMu.Unlock()
return res, nil
}
if err := s.slowCacheErr[index]; err != nil {
s.slowMu.Unlock()
return err, nil
}
s.slowMu.Unlock()
ctx = progress.WithProgress(ctx, s.st.mpw)
key, err := f(ctx, res)
complete := true
if err != nil {
canceled := false
select {
case <-ctx.Done():
canceled = true
default:
}
if canceled && errors.Cause(err) == context.Canceled {
complete = false
}
}
s.slowMu.Lock()
defer s.slowMu.Unlock()
if complete {
if err == nil {
s.slowCacheRes[index] = key
}
s.slowCacheErr[index] = err
}
return key, err
})
if err != nil {
return "", err
}
return key.(digest.Digest), nil
}
func (s *sharedOp) CacheMap(ctx context.Context) (*CacheMap, error) {
op, err := s.getOp()
if err != nil {
return nil, err
}
res, err := s.g.Do(ctx, "cachemap", func(ctx context.Context) (interface{}, error) {
if s.cacheRes != nil {
return s.cacheRes, nil
}
if s.cacheErr != nil {
return nil, s.cacheErr
}
ctx = progress.WithProgress(ctx, s.st.mpw)
res, err := op.CacheMap(ctx)
complete := true
if err != nil {
canceled := false
select {
case <-ctx.Done():
canceled = true
default:
}
if canceled && errors.Cause(err) == context.Canceled {
complete = false
}
}
if complete {
if err == nil {
s.cacheRes = res
}
s.cacheErr = err
}
return res, err
})
if err != nil {
return nil, err
}
return res.(*CacheMap), nil
}
func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
op, err := s.getOp()
if err != nil {
return nil, err
}
res, err := s.g.Do(ctx, "exec", func(ctx context.Context) (interface{}, error) {
if s.execRes != nil || s.execErr != nil {
return s.execRes, s.execErr
}
ctx = progress.WithProgress(ctx, s.st.mpw)
res, err := op.Exec(ctx, inputs)
complete := true
if err != nil {
canceled := false
select {
case <-ctx.Done():
canceled = true
default:
}
if canceled && errors.Cause(err) == context.Canceled {
complete = false
}
}
if complete {
if res != nil {
s.execRes = wrapShared(res)
}
s.execErr = err
}
return s.execRes, err
})
if err != nil {
return nil, err
}
return unwrapShared(res.([]*SharedResult)), nil
}
func (s *sharedOp) getOp() (Op, error) {
s.opOnce.Do(func() {
s.op, s.err = s.resolver(s.st.vtx)
})
if s.err != nil {
return nil, s.err
}
return s.op, nil
}
func (s *sharedOp) release() {
if s.execRes != nil {
for _, r := range s.execRes {
r.Release(context.TODO())
}
}
}
func initClientVertex(v Vertex) client.Vertex {
inputDigests := make([]digest.Digest, 0, len(v.Inputs()))
for _, inp := range v.Inputs() {
inputDigests = append(inputDigests, inp.Vertex.Digest())
}
return client.Vertex{
Inputs: inputDigests,
Name: v.Name(),
Digest: v.Digest(),
}
}
func wrapShared(inp []Result) []*SharedResult {
out := make([]*SharedResult, len(inp))
for i, r := range inp {
out[i] = NewSharedResult(r)
}
return out
}
func unwrapShared(inp []*SharedResult) []Result {
out := make([]Result, len(inp))
for i, r := range inp {
out[i] = r.Clone()
}
return out
}

View File

@ -1,128 +0,0 @@
package llb
import (
"strings"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/solver2/solver"
"github.com/moby/buildkit/source"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
type vertex struct {
sys interface{}
metadata *pb.OpMetadata
inputs []solver.Edge
digest digest.Digest
name string
}
func (v *vertex) Digest() digest.Digest {
return v.digest
}
func (v *vertex) Sys() interface{} {
return v.sys
}
func (v *vertex) Metadata() *pb.OpMetadata {
return v.metadata
}
func (v *vertex) Inputs() []solver.Edge {
return v.inputs
}
func (v *vertex) Name() string {
return v.name
}
func Load(def *pb.Definition) (solver.Edge, error) {
return loadLLB(def, func(dgst digest.Digest, pbOp *pb.Op, load func(digest.Digest) (solver.Vertex, error)) (solver.Vertex, error) {
opMetadata := def.Metadata[dgst]
vtx, err := newVertex(dgst, pbOp, &opMetadata, load)
if err != nil {
return nil, err
}
return vtx, nil
})
}
func newVertex(dgst digest.Digest, op *pb.Op, opMeta *pb.OpMetadata, load func(digest.Digest) (solver.Vertex, error)) (*vertex, error) {
vtx := &vertex{sys: op.Op, metadata: opMeta, digest: dgst, name: llbOpName(op)}
for _, in := range op.Inputs {
sub, err := load(in.Digest)
if err != nil {
return nil, err
}
vtx.inputs = append(vtx.inputs, solver.Edge{Index: solver.Index(in.Index), Vertex: sub})
}
return vtx, nil
}
// loadLLB loads LLB.
// fn is executed sequentially.
func loadLLB(def *pb.Definition, fn func(digest.Digest, *pb.Op, func(digest.Digest) (solver.Vertex, error)) (solver.Vertex, error)) (solver.Edge, error) {
if len(def.Def) == 0 {
return solver.Edge{}, errors.New("invalid empty definition")
}
allOps := make(map[digest.Digest]*pb.Op)
var dgst digest.Digest
for _, dt := range def.Def {
var op pb.Op
if err := (&op).Unmarshal(dt); err != nil {
return solver.Edge{}, errors.Wrap(err, "failed to parse llb proto op")
}
dgst = digest.FromBytes(dt)
allOps[dgst] = &op
}
lastOp := allOps[dgst]
delete(allOps, dgst)
dgst = lastOp.Inputs[0].Digest
cache := make(map[digest.Digest]solver.Vertex)
var rec func(dgst digest.Digest) (solver.Vertex, error)
rec = func(dgst digest.Digest) (solver.Vertex, error) {
if v, ok := cache[dgst]; ok {
return v, nil
}
v, err := fn(dgst, allOps[dgst], rec)
if err != nil {
return nil, err
}
cache[dgst] = v
return v, nil
}
v, err := rec(dgst)
if err != nil {
return solver.Edge{}, err
}
return solver.Edge{Vertex: v, Index: solver.Index(lastOp.Inputs[0].Index)}, nil
}
func llbOpName(op *pb.Op) string {
switch op := op.Op.(type) {
case *pb.Op_Source:
if id, err := source.FromLLB(op); err == nil {
if id, ok := id.(*source.LocalIdentifier); ok {
if len(id.IncludePatterns) == 1 {
return op.Source.Identifier + " (" + id.IncludePatterns[0] + ")"
}
}
}
return op.Source.Identifier
case *pb.Op_Exec:
return strings.Join(op.Exec.Meta.Args, " ")
case *pb.Op_Build:
return "build"
default:
return "unknown"
}
}

View File

@ -1,222 +0,0 @@
package solver
import (
"context"
"fmt"
"strings"
"sync"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
type internalMemoryKeyT string
var internalMemoryKey = internalMemoryKeyT("buildkit/memory-cache-id")
func NewInMemoryCacheManager() CacheManager {
return &inMemoryCacheManager{
byID: map[string]*inMemoryCacheKey{},
}
}
type inMemoryCacheKey struct {
CacheKey
id string
dgst digest.Digest
output Index
deps []CacheKey // only []*inMemoryCacheManager
results map[Index]map[string]Result
links map[link]map[string]struct{}
}
func (ck *inMemoryCacheKey) Deps() []CacheKey {
return ck.deps
}
func (ck *inMemoryCacheKey) Digest() digest.Digest {
return ck.dgst
}
func (ck *inMemoryCacheKey) Index() Index {
return ck.output
}
type link struct {
input, output Index
digest digest.Digest
}
type inMemoryCacheManager struct {
mu sync.RWMutex
byID map[string]*inMemoryCacheKey
}
func (c *inMemoryCacheManager) Query(deps []CacheKey, input Index, dgst digest.Digest, output Index) ([]*CacheRecord, error) {
c.mu.RLock()
defer c.mu.RUnlock()
refs := map[string]struct{}{}
sublinks := map[string]struct{}{}
for _, dep := range deps {
ck, err := c.getInternalKey(dep, false)
if err == nil {
for key := range ck.links[link{input, output, dgst}] {
refs[key] = struct{}{}
}
for key := range ck.links[link{Index(-1), Index(0), ""}] {
sublinks[key] = struct{}{}
}
}
}
for id := range sublinks {
if ck, ok := c.byID[id]; ok {
for key := range ck.links[link{input, output, dgst}] {
refs[key] = struct{}{}
}
}
}
if len(deps) == 0 {
ck, err := c.getInternalKey(NewCacheKey(dgst, 0, nil), false)
if err != nil {
return nil, nil
}
refs[ck.id] = struct{}{}
}
outs := make([]*CacheRecord, 0, len(refs))
for id := range refs {
if ck, ok := c.byID[id]; ok {
for _, res := range ck.results[output] {
outs = append(outs, &CacheRecord{
ID: id + "@" + res.ID(),
CacheKey: ck,
})
}
}
}
return outs, nil
}
func (c *inMemoryCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
c.mu.RLock()
defer c.mu.RUnlock()
keyParts := strings.Split(rec.ID, "@")
if len(keyParts) != 2 {
return nil, errors.Errorf("invalid cache record ID")
}
ck, err := c.getInternalKey(rec.CacheKey, false)
if err != nil {
return nil, err
}
for output := range ck.results {
res, ok := ck.results[output][keyParts[1]]
if ok {
return res, nil
}
}
return nil, errors.Errorf("failed to load cache record") // TODO: typed error
}
func (c *inMemoryCacheManager) Save(k CacheKey, r Result) (CacheKey, error) {
c.mu.Lock()
defer c.mu.Unlock()
ck, err := c.getInternalKey(k, true)
if err != nil {
return nil, err
}
if err := c.addResult(ck, k.Output(), r); err != nil {
return nil, err
}
return ck, nil
}
func (c *inMemoryCacheManager) getInternalKey(k CacheKey, createIfNotExist bool) (*inMemoryCacheKey, error) {
if ck, ok := k.(*inMemoryCacheKey); ok {
return ck, nil
}
internalV := k.GetValue(internalMemoryKey)
if internalV != nil {
ck, ok := c.byID[internalV.(string)]
if !ok {
return nil, errors.Errorf("failed lookup by internal ID %s", internalV.(string))
}
return ck, nil
}
inputs := make([]CacheKey, len(k.Deps()))
dgstr := digest.SHA256.Digester()
for i, inp := range k.Deps() {
ck, err := c.getInternalKey(inp, createIfNotExist)
if err != nil {
return nil, err
}
inputs[i] = ck
if _, err := dgstr.Hash().Write([]byte(ck.id)); err != nil {
return nil, err
}
}
if _, err := dgstr.Hash().Write([]byte(k.Digest())); err != nil {
return nil, err
}
if _, err := dgstr.Hash().Write([]byte(fmt.Sprintf("%d", k.Output()))); err != nil {
return nil, err
}
internalKey := string(dgstr.Digest())
ck, ok := c.byID[internalKey]
if !ok {
if !createIfNotExist {
return nil, errors.Errorf("not-found")
}
ck = &inMemoryCacheKey{
CacheKey: k,
id: internalKey,
dgst: k.Digest(),
output: k.Output(),
deps: inputs,
results: map[Index]map[string]Result{},
links: map[link]map[string]struct{}{},
}
ck.SetValue(internalMemoryKey, internalKey)
c.byID[internalKey] = ck
}
for i, inp := range inputs {
if ck.dgst == "" {
i = -1
}
if err := c.addLink(link{Index(i), ck.output, ck.dgst}, inp.(*inMemoryCacheKey), ck); err != nil {
return nil, err
}
}
return ck, nil
}
func (c *inMemoryCacheManager) addResult(ck *inMemoryCacheKey, output Index, r Result) error {
m, ok := ck.results[output]
if !ok {
m = map[string]Result{}
ck.results[output] = m
}
m[r.ID()] = r
return nil
}
func (c *inMemoryCacheManager) addLink(l link, from, to *inMemoryCacheKey) error {
m, ok := from.links[l]
if !ok {
m = map[string]struct{}{}
from.links[l] = m
}
m[to.id] = struct{}{}
return nil
}

View File

@ -1,127 +0,0 @@
package solver
import (
"context"
"testing"
"github.com/moby/buildkit/identity"
digest "github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
)
func TestInMemoryCache(t *testing.T) {
ctx := context.TODO()
m := NewInMemoryCacheManager()
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("result0"))
require.NoError(t, err)
matches, err := m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
res, err := m.Load(ctx, matches[0])
require.NoError(t, err)
require.Equal(t, "result0", unwrap(res))
// another record
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0, nil), testResult("result1"))
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
res, err = m.Load(ctx, matches[0])
require.NoError(t, err)
require.Equal(t, "result1", unwrap(res))
// invalid request
matches, err = m.Query(nil, 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
// second level
k := NewCacheKey(dgst("baz"), Index(1), []CacheKey{
cacheFoo, cacheBar,
})
cacheBaz, err := m.Save(k, testResult("result2"))
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
matches, err = m.Query([]CacheKey{cacheFoo}, 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
matches, err = m.Query([]CacheKey{cacheFoo}, 1, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(matches), 0)
matches, err = m.Query([]CacheKey{cacheFoo}, 0, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(matches), 1)
res, err = m.Load(ctx, matches[0])
require.NoError(t, err)
require.Equal(t, "result2", unwrap(res))
matches2, err := m.Query([]CacheKey{cacheBar}, 1, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(matches2), 1)
require.Equal(t, matches[0].ID, matches2[0].ID)
k = NewCacheKey(dgst("baz"), Index(1), []CacheKey{
cacheFoo,
})
_, err = m.Save(k, testResult("result3"))
require.NoError(t, err)
matches, err = m.Query([]CacheKey{cacheFoo}, 0, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(matches), 2)
// combination save
k2 := NewCacheKey("", 0, []CacheKey{
cacheFoo, cacheBaz,
})
k = NewCacheKey(dgst("bax"), 0, []CacheKey{
k2, cacheBar,
})
_, err = m.Save(k, testResult("result4"))
require.NoError(t, err)
// foo, bar, baz should all point to result4
matches, err = m.Query([]CacheKey{cacheFoo}, 0, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
id := matches[0].ID
matches, err = m.Query([]CacheKey{cacheBar}, 1, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, matches[0].ID, id)
matches, err = m.Query([]CacheKey{cacheBaz}, 0, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, matches[0].ID, id)
}
func dgst(s string) digest.Digest {
return digest.FromBytes([]byte(s))
}
func testResult(v string) Result {
return &dummyResult{
id: identity.NewID(),
value: v,
}
}

View File

@ -1,195 +0,0 @@
package solver
import (
"context"
"sync"
"sync/atomic"
"github.com/pkg/errors"
)
type Channel struct {
Signal func()
mu sync.Mutex
value atomic.Value
lastValue interface{}
}
func (c *Channel) Send(v interface{}) {
c.value.Store(v)
if c.Signal != nil {
c.Signal()
}
}
func (c *Channel) Receive() (interface{}, bool) {
v := c.value.Load()
if c.lastValue == v {
return nil, false
}
c.lastValue = v
return v, true
}
type Pipe struct {
Writer PipeWriter
Reader PipeReader
SignalReader func()
SignalWriter func()
}
type PipeRequest struct {
Request interface{} // Payload
Canceled bool
}
type PipeWriter interface {
Request() PipeRequest
Update(v interface{})
Finalize(v interface{}, err error)
Status() PipeStatus
}
type PipeReader interface {
Reload() bool
Cancel()
Status() PipeStatus
Request() interface{}
}
type PipeStatus struct {
Canceled bool
Completed bool
Err error
Value interface{}
}
func newFuncionPipe(f func(context.Context) (interface{}, error)) (*Pipe, func()) {
p := NewPipe(PipeRequest{})
ctx, cancel := context.WithCancel(context.TODO())
p.SignalReader = func() {
if req := p.Writer.Request(); req.Canceled {
cancel()
}
}
return p, func() {
res, err := f(ctx)
if err != nil {
p.Writer.Finalize(nil, err)
return
}
p.Writer.Finalize(res, nil)
}
}
func NewPipe(req PipeRequest) *Pipe {
cancelCh := &Channel{}
roundTripCh := &Channel{}
pw := &pipeWriter{
req: req,
recvChannel: cancelCh,
sendChannel: roundTripCh,
}
pr := &pipeReader{
req: req,
recvChannel: roundTripCh,
sendChannel: cancelCh,
}
p := &Pipe{
Writer: pw,
Reader: pr,
}
cancelCh.Signal = func() {
v, ok := cancelCh.Receive()
if ok {
pw.setRequest(v.(PipeRequest))
}
if p.SignalReader != nil {
p.SignalReader()
}
}
roundTripCh.Signal = func() {
if p.SignalWriter != nil {
p.SignalWriter()
}
}
return p
}
type pipeWriter struct {
status PipeStatus
req PipeRequest
recvChannel *Channel
sendChannel *Channel
mu sync.Mutex
}
func (pw *pipeWriter) Status() PipeStatus {
return pw.status
}
func (pw *pipeWriter) Request() PipeRequest {
pw.mu.Lock()
defer pw.mu.Unlock()
return pw.req
}
func (pw *pipeWriter) setRequest(req PipeRequest) {
pw.mu.Lock()
defer pw.mu.Unlock()
pw.req = req
}
func (pw *pipeWriter) Update(v interface{}) {
pw.status.Value = v
pw.sendChannel.Send(pw.status)
}
func (pw *pipeWriter) Finalize(v interface{}, err error) {
if v != nil {
pw.status.Value = v
}
pw.status.Err = err
pw.status.Completed = true
if errors.Cause(err) == context.Canceled && pw.req.Canceled {
pw.status.Canceled = true
}
pw.sendChannel.Send(pw.status)
}
type pipeReader struct {
status PipeStatus
req PipeRequest
recvChannel *Channel
sendChannel *Channel
}
func (pr *pipeReader) Request() interface{} {
return pr.req.Request
}
func (pr *pipeReader) Reload() bool {
v, ok := pr.recvChannel.Receive()
if !ok {
return false
}
pr.status = v.(PipeStatus)
return true
}
func (pr *pipeReader) Cancel() {
req := pr.req
req.Canceled = true
pr.sendChannel.Send(req)
}
func (pr *pipeReader) Status() PipeStatus {
return pr.status
}

View File

@ -1,88 +0,0 @@
package solver
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestPipe(t *testing.T) {
runCh := make(chan struct{})
f := func(ctx context.Context) (interface{}, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-runCh:
return "res0", nil
}
}
waitSignal := make(chan struct{}, 10)
signalled := 0
signal := func() {
signalled++
waitSignal <- struct{}{}
}
p, start := newFuncionPipe(f)
p.SignalWriter = signal
go start()
require.Equal(t, false, p.Reader.Reload())
st := p.Reader.Status()
require.Equal(t, st.Completed, false)
require.Equal(t, st.Canceled, false)
require.Nil(t, st.Value)
require.Equal(t, signalled, 0)
close(runCh)
<-waitSignal
p.Reader.Reload()
st = p.Reader.Status()
require.Equal(t, st.Completed, true)
require.Equal(t, st.Canceled, false)
require.NoError(t, st.Err)
require.Equal(t, st.Value.(string), "res0")
}
func TestPipeCancel(t *testing.T) {
runCh := make(chan struct{})
f := func(ctx context.Context) (interface{}, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-runCh:
return "res0", nil
}
}
waitSignal := make(chan struct{}, 10)
signalled := 0
signal := func() {
signalled++
waitSignal <- struct{}{}
}
p, start := newFuncionPipe(f)
p.SignalWriter = signal
go start()
p.Reader.Reload()
st := p.Reader.Status()
require.Equal(t, st.Completed, false)
require.Equal(t, st.Canceled, false)
require.Nil(t, st.Value)
require.Equal(t, signalled, 0)
p.Reader.Cancel()
<-waitSignal
p.Reader.Reload()
st = p.Reader.Status()
require.Equal(t, st.Completed, true)
require.Equal(t, st.Canceled, true)
require.Error(t, st.Err)
require.Equal(t, st.Err, context.Canceled)
}

View File

@ -1,107 +0,0 @@
package solver
import (
"context"
"io"
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
)
func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error {
vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}}
pr := j.pr.Reader(ctx)
defer func() {
if enc := vs.encore(); len(enc) > 0 {
ch <- &client.SolveStatus{Vertexes: enc}
}
}()
for {
p, err := pr.Read(ctx)
if err != nil {
if err == io.EOF {
return nil
}
return err
}
ss := &client.SolveStatus{}
for _, p := range p {
switch v := p.Sys.(type) {
case client.Vertex:
ss.Vertexes = append(ss.Vertexes, vs.append(v)...)
case progress.Status:
vtx, ok := p.Meta("vertex")
if !ok {
logrus.Warnf("progress %s status without vertex info", p.ID)
continue
}
vs := &client.VertexStatus{
ID: p.ID,
Vertex: vtx.(digest.Digest),
Name: v.Action,
Total: int64(v.Total),
Current: int64(v.Current),
Timestamp: p.Timestamp,
Started: v.Started,
Completed: v.Completed,
}
ss.Statuses = append(ss.Statuses, vs)
case client.VertexLog:
vtx, ok := p.Meta("vertex")
if !ok {
logrus.Warnf("progress %s log without vertex info", p.ID)
continue
}
v.Vertex = vtx.(digest.Digest)
v.Timestamp = p.Timestamp
ss.Logs = append(ss.Logs, &v)
}
}
select {
case <-ctx.Done():
return ctx.Err()
case ch <- ss:
}
}
}
type vertexStream struct {
cache map[digest.Digest]*client.Vertex
}
func (vs *vertexStream) append(v client.Vertex) []*client.Vertex {
var out []*client.Vertex
vs.cache[v.Digest] = &v
if v.Cached {
for _, inp := range v.Inputs {
if inpv, ok := vs.cache[inp]; ok {
if !inpv.Cached && inpv.Completed == nil {
inpv.Cached = true
inpv.Started = v.Completed
inpv.Completed = v.Completed
}
delete(vs.cache, inp)
out = append(out, vs.append(*inpv)...)
}
}
}
vcopy := v
return append(out, &vcopy)
}
func (vs *vertexStream) encore() []*client.Vertex {
var out []*client.Vertex
for _, v := range vs.cache {
if v.Started != nil && v.Completed == nil {
now := time.Now()
v.Completed = &now
v.Error = context.Canceled.Error()
out = append(out, v)
}
}
return out
}

View File

@ -1,72 +0,0 @@
package solver
import (
"context"
"sync"
"sync/atomic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// SharedResult is a result that can be cloned
type SharedResult struct {
mu sync.Mutex
main Result
}
func NewSharedResult(main Result) *SharedResult {
return &SharedResult{main: main}
}
func (r *SharedResult) Clone() Result {
r.mu.Lock()
defer r.mu.Unlock()
r1, r2 := dup(r.main)
r.main = r1
return r2
}
func (r *SharedResult) Release(ctx context.Context) error {
r.mu.Lock()
defer r.mu.Unlock()
return r.main.Release(ctx)
}
func dup(res Result) (Result, Result) {
sem := int64(0)
return &splitResult{Result: res, sem: &sem}, &splitResult{Result: res, sem: &sem}
}
type splitResult struct {
Result
released int64
sem *int64
}
func (r *splitResult) Release(ctx context.Context) error {
if atomic.AddInt64(&r.released, 1) > 1 {
err := errors.Errorf("releasing already released reference")
logrus.Error(err)
return err
}
if atomic.AddInt64(r.sem, 1) == 2 {
return r.Result.Release(ctx)
}
return nil
}
// NewCachedResult combines a result and cache key into cached result
func NewCachedResult(res Result, k CacheKey) CachedResult {
return &cachedResult{res, k}
}
type cachedResult struct {
Result
k CacheKey
}
func (cr *cachedResult) CacheKey() CacheKey {
return cr.k
}

View File

@ -1,948 +0,0 @@
package solver
import (
"context"
"sync"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const debugScheduler = false // TODO: replace with logs in build trace
type edgeStatusType int
const (
edgeStatusInitial edgeStatusType = iota
edgeStatusCacheFast
edgeStatusCacheSlow
edgeStatusComplete
)
type EdgeFactory interface {
GetEdge(Edge) *edge
SetEdge(Edge, *edge)
}
type edgePipe struct {
*Pipe
From, Target *edge
mu sync.Mutex
}
func NewScheduler(ef EdgeFactory) *Scheduler {
s := &Scheduler{
waitq: map[*edge]struct{}{},
incoming: map[*edge][]*edgePipe{},
outgoing: map[*edge][]*edgePipe{},
stopped: make(chan struct{}),
closed: make(chan struct{}),
ef: ef,
}
s.cond = sync.NewCond(&s.mu)
go s.loop()
return s
}
type Scheduler struct {
cond *sync.Cond
mu sync.Mutex
muQ sync.Mutex
ef EdgeFactory
waitq map[*edge]struct{}
stopped chan struct{}
stoppedOnce sync.Once
closed chan struct{}
incoming map[*edge][]*edgePipe
outgoing map[*edge][]*edgePipe
}
func (s *Scheduler) Stop() {
s.stoppedOnce.Do(func() {
close(s.stopped)
})
<-s.closed
}
func (s *Scheduler) loop() {
defer func() {
close(s.closed)
}()
go func() {
<-s.stopped
s.mu.Lock()
s.cond.Signal()
s.mu.Unlock()
}()
s.mu.Lock()
for {
select {
case <-s.stopped:
s.mu.Unlock()
return
default:
}
s.muQ.Lock()
q := s.waitq
s.waitq = map[*edge]struct{}{}
s.muQ.Unlock()
if len(q) == 0 {
s.cond.Wait()
continue
}
for e := range q {
inc := make([]PipeWriter, len(s.incoming[e]))
for i, p := range s.incoming[e] {
inc[i] = p.Writer
}
out := make([]PipeReader, len(s.outgoing[e]))
for i, p := range s.outgoing[e] {
out[i] = p.Reader
}
e.hasActiveOutgoing = false
updates := []PipeReader{}
for _, p := range out {
if ok := p.Reload(); ok {
updates = append(updates, p)
}
if !p.Status().Completed {
e.hasActiveOutgoing = true
}
}
if debugScheduler {
logrus.Debugf(">> unpark %s req=%d upt=%d out=%d state=%d", e.edge.Vertex.Name(), len(inc), len(updates), len(out), e.state)
for i, dep := range e.deps {
des := edgeStatusInitial
if dep.req != nil {
des = dep.req.Request().(*edgeRequest).desiredState
}
logrus.Debugf(":: dep%d %s state=%d des=%d", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des)
}
for i, in := range inc {
req := in.Request()
logrus.Debugf("> incoming-%d: %p dstate=%d canceled=%v", i, in, req.Request.(*edgeRequest).desiredState, req.Canceled)
}
for i, up := range updates {
if up == e.cacheMapReq {
logrus.Debugf("> update-%d: %p cacheMapReq complete=%v", i, up, up.Status().Completed)
} else if up == e.execReq {
logrus.Debugf("> update-%d: %p execReq complete=%v", i, up, up.Status().Completed)
} else {
st, ok := up.Status().Value.(*edgeState)
if ok {
index := -1
if dep, ok := e.depRequests[up]; ok {
index = int(dep.index)
}
logrus.Debugf("> update-%d: %p input-%d keys=%d state=%d", i, up, index, len(st.keys), st.state)
}
}
}
}
e.unpark(inc, updates, out, &pipeFactory{s: s, e: e})
if debugScheduler {
for i, in := range inc {
logrus.Debugf("< incoming-%d: %p completed=%v", i, in, in.Status().Completed)
}
logrus.Debugf("<< unpark %s\n", e.edge.Vertex.Name())
}
inc2 := make([]*edgePipe, 0, len(inc))
for _, r := range s.incoming[e] {
if !r.Writer.Status().Completed {
inc2 = append(inc2, r)
}
}
if len(inc2) > 0 {
s.incoming[e] = inc2
} else {
delete(s.incoming, e)
}
out2 := make([]*edgePipe, 0, len(out))
for _, r := range s.outgoing[e] {
if !r.Reader.Status().Completed {
out2 = append(out2, r)
}
}
if len(out2) > 0 {
s.outgoing[e] = out2
} else {
delete(s.outgoing, e)
}
if e.keysDidChange {
origEdge := e.index.LoadOrStore(e, e.cacheMap.Digest, e.edge.Index, e.depKeys())
if origEdge != nil {
logrus.Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name())
s.mergeTo(origEdge, e)
s.ef.SetEdge(e.edge, origEdge)
}
e.keysDidChange = false
}
// avoid deadlocks.
// TODO: if these start showing up in error reports they can be changed
// to error the edge instead. They can only appear of algorithm bugs in
// unpark(), not for any external input.
if len(inc2) > 0 && len(out2) == 0 {
panic("invalid dispatch: return leaving incoming open")
}
if len(inc2) == 0 && len(out2) > 0 {
panic("invalid dispatch: return leaving outgoing open")
}
}
}
}
func (s *Scheduler) signal(e *edge) {
s.muQ.Lock()
if _, ok := s.waitq[e]; !ok {
s.waitq[e] = struct{}{}
go func() {
s.mu.Lock()
s.muQ.Lock()
_, ok := s.waitq[e]
s.muQ.Unlock()
if !ok {
s.mu.Unlock()
return
}
s.cond.Signal()
s.mu.Unlock()
}()
}
s.muQ.Unlock()
}
func (s *Scheduler) build(ctx context.Context, edge Edge) (CachedResult, error) {
s.mu.Lock()
e := s.ef.GetEdge(edge)
if e == nil {
s.mu.Unlock()
return nil, errors.Errorf("invalid request %v for build", edge)
}
wait := make(chan struct{})
var p *Pipe
p = s.newPipe(e, nil, PipeRequest{Request: &edgeRequest{desiredState: edgeStatusComplete}})
p.SignalWriter = func() {
p.Reader.Reload()
if p.Reader.Status().Completed {
close(wait)
}
}
s.mu.Unlock()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-ctx.Done()
p.Reader.Cancel()
}()
<-wait
if err := p.Reader.Status().Err; err != nil {
return nil, err
}
return p.Reader.Status().Value.(*edgeState).result, nil
}
func (s *Scheduler) newPipe(target, from *edge, req PipeRequest) *Pipe {
p := &edgePipe{
Pipe: NewPipe(req),
Target: target,
From: from,
}
s.signal(target)
if from != nil {
p.SignalWriter = func() {
p.mu.Lock()
defer p.mu.Unlock()
s.signal(p.From)
}
s.outgoing[from] = append(s.outgoing[from], p)
}
s.incoming[target] = append(s.incoming[target], p)
p.SignalReader = func() {
p.mu.Lock()
defer p.mu.Unlock()
s.signal(p.Target)
}
return p.Pipe
}
func (s *Scheduler) newRequestWithFunc(e *edge, f func(context.Context) (interface{}, error)) PipeReader {
pp, start := newFuncionPipe(f)
p := &edgePipe{
Pipe: pp,
From: e,
}
p.SignalWriter = func() {
p.mu.Lock()
defer p.mu.Unlock()
s.signal(p.From)
}
s.outgoing[e] = append(s.outgoing[e], p)
go start()
return p.Reader
}
func (s *Scheduler) mergeTo(target, src *edge) {
for _, inc := range s.incoming[src] {
inc.mu.Lock()
inc.Target = target
s.incoming[target] = append(s.incoming[target], inc)
inc.mu.Unlock()
}
for _, out := range s.outgoing[src] {
out.mu.Lock()
out.From = target
s.outgoing[target] = append(s.outgoing[target], out)
out.mu.Unlock()
out.Reader.Cancel()
}
delete(s.incoming, src)
delete(s.outgoing, src)
s.signal(target)
// TODO(tonistiigi): merge cache providers
// TODO(tonistiigi): check ignore-cache compat before merge
}
func newEdge(ed Edge, op activeOp, index *EdgeIndex) *edge {
e := &edge{
edge: ed,
op: op,
depRequests: map[PipeReader]*dep{},
cacheRecords: map[string]*CacheRecord{},
index: index,
}
return e
}
type edge struct {
edge Edge
op activeOp
edgeState
depRequests map[PipeReader]*dep
deps []*dep
cacheMapReq PipeReader
execReq PipeReader
err error
cacheRecords map[string]*CacheRecord
noCacheMatchPossible bool
allDepsCompletedCacheFast bool
allDepsCompletedCacheSlow bool
allDepsCompleted bool
hasActiveOutgoing bool
releaserCount int
keysDidChange bool
index *EdgeIndex
}
// dep holds state for a dependant edge
type dep struct {
req PipeReader
edgeState
index Index
cacheRecords map[string]*CacheRecord
desiredState edgeStatusType
e *edge
slowCacheReq PipeReader // TODO: reuse req
slowCacheComplete bool
slowCacheKey CacheKey
err error
}
func newDep(i Index) *dep {
return &dep{index: i, cacheRecords: map[string]*CacheRecord{}}
}
type edgeState struct {
state edgeStatusType
result CachedResult
cacheMap *CacheMap
keys []CacheKey
}
func isEqualState(s1, s2 edgeState) bool {
if s1.state != s2.state || s1.result != s2.result || s1.cacheMap != s2.cacheMap || len(s1.keys) != len(s2.keys) {
return false
}
return true
}
type edgeRequest struct {
desiredState edgeStatusType
currentState edgeState
}
func (e *edge) duplicateReleaser() {
e.releaserCount += 1
}
func (e *edge) release() {
if e.releaserCount > 0 {
e.releaserCount--
return
}
e.index.Release(e)
}
// commitOptions returns parameters for the op execution
func (e *edge) commitOptions() (CacheKey, []Result) {
if e.deps == nil {
return NewCacheKey(e.cacheMap.Digest, e.edge.Index, nil), nil
}
inputs := make([]CacheKey, len(e.deps))
results := make([]Result, len(e.deps))
for i, dep := range e.deps {
inputs[i] = dep.result.CacheKey()
if dep.slowCacheKey != nil {
inputs[i] = NewCacheKey("", 0, []CacheKey{inputs[i], dep.slowCacheKey})
}
results[i] = dep.result
}
return NewCacheKey(e.cacheMap.Digest, e.edge.Index, inputs), results
}
func (e *edge) isComplete() bool {
return e.err != nil || e.result != nil
}
func (e *edge) cancelPipes(pipes []PipeReader) {
for _, p := range pipes {
p.Cancel()
}
}
func (e *edge) finishIncoming(req PipeWriter) {
err := e.err
if req.Request().Canceled && err == nil {
err = context.Canceled
}
if debugScheduler {
logrus.Debugf("finishIncoming %s %v %#v %v", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Request.(*edgeRequest).desiredState)
}
req.Finalize(&e.edgeState, err)
}
func (e *edge) updateIncoming(req PipeWriter) {
req.Update(&e.edgeState)
}
// probeCache is called with unprocessed cache keys for dependency
// if the key could match the edge the cacheRecords for dependency are filled
func (e *edge) probeCache(d *dep, keys []CacheKey) {
if len(keys) == 0 {
return
}
records, err := e.op.Cache().Query(keys, d.index, e.cacheMap.Digest, e.edge.Index)
if err != nil {
e.err = errors.Wrap(err, "error on cache query")
}
for _, r := range records {
if _, ok := d.cacheRecords[r.ID]; !ok {
d.cacheRecords[r.ID] = r
}
}
}
// checkDepMatchPossible checks if any cache matches are possible pass this point
func (e *edge) checkDepMatchPossible(dep *dep) {
depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil
if !e.noCacheMatchPossible && ((dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state == edgeStatusCacheFast) && len(dep.cacheRecords) == 0) {
e.noCacheMatchPossible = true
}
}
// slowCacheFunc returns the result based cache func for dependency if it exists
func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc {
if e.cacheMap == nil {
return nil
}
return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc
}
// allDepsHaveKeys checks if all dependencies have at least one key. used for
// determining if there is enough data for combining cache key for edge
func (e *edge) allDepsHaveKeys() bool {
for _, d := range e.deps {
if len(d.keys) == 0 {
return false
}
}
return true
}
// depKeys returns all current dependency cache keys
func (e *edge) depKeys() [][]CacheKey {
keys := make([][]CacheKey, len(e.deps))
for i, d := range e.deps {
keys[i] = d.keys
if d.result != nil {
keys[i] = append(keys[i], d.result.CacheKey())
}
if d.slowCacheKey != nil {
keys[i] = append(keys[i], d.slowCacheKey)
}
}
return keys
}
// slow cache keys can be computed in 2 phases if there are multiple deps.
// first evaluate ones that didn't match any definition based keys
func (e *edge) skipPhase2SlowCache(dep *dep) bool {
isPhase1 := false
for _, dep := range e.deps {
if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.cacheRecords) == 0 {
isPhase1 = true
break
}
}
if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.cacheRecords) > 0 {
return true
}
return false
}
// unpark is called by the scheduler with incoming requests and updates for
// previous calls.
// To avoid deadlocks and resource leaks this function needs to follow
// following rules:
// 1) this function needs to return unclosed outgoing requests if some incoming
// requests were not completed
// 2) this function may not return outgoing requests if it has completed all
// incoming requests
func (e *edge) unpark(incoming []PipeWriter, updates, allPipes []PipeReader, f *pipeFactory) {
// TODO: split into helper functions
// process all latest changes
depChanged := false
for _, upt := range updates {
// response for cachemap request
if upt == e.cacheMapReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
if e.err == nil {
e.err = err
}
} else {
e.cacheMap = upt.Status().Value.(*CacheMap)
if len(e.deps) == 0 {
k := NewCacheKey(e.cacheMap.Digest, e.edge.Index, nil)
records, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
if err != nil {
logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
} else {
for _, r := range records {
e.cacheRecords[r.ID] = r
}
if len(records) > 0 {
e.keys = append(e.keys, k)
}
if e.allDepsHaveKeys() {
e.keysDidChange = true
}
}
e.state = edgeStatusCacheSlow
}
// probe keys that were loaded before cache map
for _, dep := range e.deps {
e.probeCache(dep, dep.keys)
e.checkDepMatchPossible(dep)
}
depChanged = true
}
// response for exec request
} else if upt == e.execReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
if e.err == nil {
e.err = err
}
} else {
e.result = upt.Status().Value.(CachedResult)
e.state = edgeStatusComplete
}
// response for requests to dependencies
} else if dep, ok := e.depRequests[upt]; ok { // TODO: ignore canceled
if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil {
if e.err == nil {
e.err = err
}
dep.err = err
}
state := upt.Status().Value.(*edgeState)
if len(dep.keys) < len(state.keys) {
newKeys := state.keys[len(dep.keys):]
if e.cacheMap != nil {
e.probeCache(dep, newKeys)
if e.allDepsHaveKeys() {
e.keysDidChange = true
}
}
depChanged = true
}
if dep.state != edgeStatusComplete && state.state == edgeStatusComplete {
e.keysDidChange = true
}
recheck := state.state != dep.state
dep.edgeState = *state
if recheck && e.cacheMap != nil {
e.checkDepMatchPossible(dep)
depChanged = true
}
// set current state
// add to probedKeys
} else {
for _, dep := range e.deps {
if upt == dep.slowCacheReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil && e.err == nil {
e.err = upt.Status().Err
} else if !dep.slowCacheComplete {
k := NewCacheKey(upt.Status().Value.(digest.Digest), -1, nil)
dep.slowCacheKey = k
e.probeCache(dep, []CacheKey{k})
dep.slowCacheComplete = true
e.keysDidChange = true
}
// dep.slowCacheReq = nil
depChanged = true
}
}
}
}
// the dep responses had changes. need to reevaluate edge state
if depChanged {
// TODO: fast pass to detect incomplete results
newRecords := map[string]*CacheRecord{}
for i, dep := range e.deps {
if i == 0 {
for key, r := range dep.cacheRecords {
if _, ok := e.cacheRecords[key]; ok {
continue
}
newRecords[key] = r
}
} else {
for key := range newRecords {
if _, ok := dep.cacheRecords[key]; !ok {
delete(newRecords, key)
}
}
}
if len(newRecords) == 0 {
break
}
}
for k, r := range newRecords {
e.keys = append(e.keys, r.CacheKey)
e.cacheRecords[k] = r
}
// detect lower/upper bound for current state
allDepsCompletedCacheFast := true
allDepsCompletedCacheSlow := true
allDepsCompleted := true
stLow := edgeStatusInitial
stHigh := edgeStatusCacheSlow
if e.cacheMap != nil {
for _, dep := range e.deps {
isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
if dep.state > stLow && len(dep.cacheRecords) == 0 && !isSlowIncomplete {
stLow = dep.state
if stLow > edgeStatusCacheSlow {
stLow = edgeStatusCacheSlow
}
}
if dep.state < stHigh {
stHigh = dep.state
}
if isSlowIncomplete || dep.state < edgeStatusComplete {
allDepsCompleted = false
}
if dep.state < edgeStatusCacheFast {
allDepsCompletedCacheFast = false
}
if isSlowIncomplete || dep.state < edgeStatusCacheSlow {
allDepsCompletedCacheSlow = false
}
}
if stHigh > e.state {
e.state = stHigh
}
if stLow > e.state {
e.state = stLow
}
e.allDepsCompletedCacheFast = allDepsCompletedCacheFast
e.allDepsCompletedCacheSlow = allDepsCompletedCacheSlow
e.allDepsCompleted = allDepsCompleted
}
}
// detect the result state for the requests
allIncomingCanComplete := true
desiredState := e.state
// check incoming requests
// check if all requests can be either answered
if !e.isComplete() {
for _, req := range incoming {
if !req.Request().Canceled {
if r := req.Request().Request.(*edgeRequest); desiredState < r.desiredState {
desiredState = r.desiredState
allIncomingCanComplete = false
}
}
}
}
// do not set allIncomingCanComplete if some e.state != edgeStateComplete dep.state < e.state && len(e.keys) == 0
hasIncompleteDeps := false
if e.state < edgeStatusComplete && len(e.keys) == 0 {
for _, dep := range e.deps {
if dep.err == nil && dep.state < e.state {
hasIncompleteDeps = true
break
}
}
}
if hasIncompleteDeps {
allIncomingCanComplete = false
}
if debugScheduler {
logrus.Debugf("status state=%d cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast)
}
if allIncomingCanComplete && e.hasActiveOutgoing {
// cancel all current requests
e.cancelPipes(allPipes)
// can close all but one requests
var leaveOpen PipeWriter
for _, req := range incoming {
if !req.Request().Canceled {
leaveOpen = req
break
}
}
for _, req := range incoming {
if leaveOpen == nil || leaveOpen == req {
leaveOpen = req
continue
}
e.finishIncoming(req)
}
return
}
// can complete, finish and return
if allIncomingCanComplete && !e.hasActiveOutgoing {
for _, req := range incoming {
e.finishIncoming(req)
}
return
}
// update incoming based on current state
for _, req := range incoming {
r := req.Request().Request.(*edgeRequest)
if !hasIncompleteDeps && (e.state >= r.desiredState || req.Request().Canceled) {
e.finishIncoming(req)
} else if !isEqualState(r.currentState, e.edgeState) {
e.updateIncoming(req)
}
}
// set up new outgoing requests if needed
if e.cacheMapReq == nil {
e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
return e.op.CacheMap(ctx)
})
}
// initialize deps state
if e.deps == nil {
e.depRequests = make(map[PipeReader]*dep)
e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
for i := range e.edge.Vertex.Inputs() {
e.deps = append(e.deps, newDep(Index(i)))
}
}
// cycle all dependencies. set up outgoing requests if needed
for _, dep := range e.deps {
desiredStateDep := dep.state
if e.noCacheMatchPossible {
desiredStateDep = desiredState
} else if dep.state == edgeStatusInitial && desiredState > dep.state {
desiredStateDep = edgeStatusCacheFast
} else if dep.state == edgeStatusCacheFast && desiredState > dep.state {
if e.allDepsCompletedCacheFast && len(e.keys) == 0 {
desiredStateDep = edgeStatusCacheSlow
}
} else if dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete {
if (e.allDepsCompletedCacheSlow || e.slowCacheFunc(dep) != nil) && len(e.keys) == 0 {
if !e.skipPhase2SlowCache(dep) {
desiredStateDep = edgeStatusComplete
}
}
} else if dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow {
if !e.skipPhase2SlowCache(dep) {
desiredStateDep = edgeStatusComplete
}
}
// outgoing request is needed
if dep.state < desiredStateDep {
addNew := true
if dep.req != nil && !dep.req.Status().Completed {
if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
dep.req.Cancel()
} else {
addNew = false
}
}
if addNew {
req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
currentState: dep.edgeState,
desiredState: desiredStateDep,
})
e.depRequests[req] = dep
dep.req = req
}
} else if dep.req != nil && !dep.req.Status().Completed {
dep.req.Cancel()
}
// initialize function to compute cache key based on dependency result
if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && e.slowCacheFunc(dep) != nil && e.cacheMap != nil {
fn := e.slowCacheFunc(dep)
res := dep.result
func(fn ResultBasedCacheFunc, res Result, index Index) {
dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
return e.op.CalcSlowCache(ctx, index, fn, res)
})
}(fn, res, dep.index)
}
}
// execute op
if e.execReq == nil && desiredState == edgeStatusComplete {
if e.keysDidChange {
// postpone executing to next invocation if we have unprocessed keys
f.NewFuncRequest(func(context.Context) (interface{}, error) {
return nil, nil
})
return
}
if len(e.keys) > 0 {
e.execReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
var rec *CacheRecord
for _, r := range e.cacheRecords { // TODO: time/priority order
rec = r
break
}
logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
res, err := e.op.Cache().Load(ctx, rec)
if err != nil {
return nil, err
}
return NewCachedResult(res, rec.CacheKey), nil
})
for req := range e.depRequests {
req.Cancel()
}
} else if e.allDepsCompleted {
e.execReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
cacheKey, inputs := e.commitOptions()
results, err := e.op.Exec(ctx, inputs)
if err != nil {
return nil, err
}
index := e.edge.Index
if len(results) <= int(index) {
return nil, errors.Errorf("invalid response from exec need %d index but %d results received", index, len(results))
}
res := results[int(index)]
ck, err := e.op.Cache().Save(cacheKey, res)
if err != nil {
return nil, err
}
return NewCachedResult(res, ck), nil
})
}
}
}
type pipeFactory struct {
e *edge
s *Scheduler
}
func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) PipeReader {
target := pf.s.ef.GetEdge(ee)
if target == nil {
panic("failed to get edge") // TODO: return errored pipe
}
p := pf.s.newPipe(target, pf.e, PipeRequest{Request: req})
if debugScheduler {
logrus.Debugf("> newPipe %s %p desiredState=%d", ee.Vertex.Name(), p, req.desiredState)
}
return p.Reader
}
func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) PipeReader {
p := pf.s.newRequestWithFunc(pf.e, f)
if debugScheduler {
logrus.Debugf("> newFunc %p", p)
}
return p
}

File diff suppressed because it is too large Load Diff

View File

@ -1,144 +0,0 @@
package solver
import (
"context"
"sync"
digest "github.com/opencontainers/go-digest"
)
// Vertex is one node in the build graph
type Vertex interface {
// Digest is a content-addressable vertex identifier
Digest() digest.Digest
// Sys returns an internal value that is used to execute the vertex. Usually
// this is capured by the operation resolver method during solve.
Sys() interface{}
// FIXME(AkihiroSuda): we should not import pb pkg here.
// TODO(tonistiigi): reenable strict metadata CacheManager, cache_ignore
// Metadata() *pb.OpMetadata
// Array of edges current vertex depends on.
Inputs() []Edge
Name() string
}
// Index is a index value for output edge
type Index int
// Edge is a path to a specific output of the vertex
type Edge struct {
Index Index
Vertex Vertex
}
// Result is an abstract return value for a solve
type Result interface {
ID() string
Release(context.Context) error
Sys() interface{}
}
// CachedResult is a result connected with its cache key
type CachedResult interface {
Result
CacheKey() CacheKey
// ExportCache(context.Context, content.Store) (*ocispec.Descriptor, error)
}
// Op is an implementation for running a vertex
type Op interface {
// CacheMap returns structure describing how the operation is cached
CacheMap(context.Context) (*CacheMap, error)
// Exec runs an operation given results from previous operations.
Exec(ctx context.Context, inputs []Result) (outputs []Result, err error)
}
type ResultBasedCacheFunc func(context.Context, Result) (digest.Digest, error)
type CacheMap struct {
// Digest is a base digest for operation that needs to be combined with
// inputs cache or selectors for dependencies.
Digest digest.Digest
Deps []struct {
// Optional digest that is merged with the cache key of the input
// TODO(tonistiigi): not implemented
Selector digest.Digest
// Optional function that returns a digest for the input based on its
// return value
ComputeDigestFunc ResultBasedCacheFunc
}
}
// CacheKey is an identifier for storing/loading build cache
type CacheKey interface {
// Deps are dependant cache keys
Deps() []CacheKey
// Base digest for operation. Usually CacheMap.Digest
Digest() digest.Digest
// Index for the output that is cached
Output() Index
// Helpers for implementations for adding internal metadata
SetValue(key, value interface{})
GetValue(key interface{}) interface{}
}
// CacheRecord is an identifier for loading in cache
type CacheRecord struct {
ID string
CacheKey CacheKey
// Loadable bool
// Size int
// CreatedAt time.Time
}
// CacheManager implements build cache backend
type CacheManager interface {
// Query searches for cache paths from one cache key to the output of a possible match.
Query(inp []CacheKey, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheRecord, error)
// Load pulls and returns the cached result
Load(ctx context.Context, rec *CacheRecord) (Result, error)
// Save saves a result based on a cache key
Save(key CacheKey, s Result) (CacheKey, error)
}
// NewCacheKey creates a new cache key for a specific output index
func NewCacheKey(dgst digest.Digest, index Index, deps []CacheKey) CacheKey {
return &cacheKey{
dgst: dgst,
deps: deps,
index: index,
values: map[interface{}]interface{}{},
}
}
type cacheKey struct {
mu sync.RWMutex
dgst digest.Digest
index Index
deps []CacheKey
values map[interface{}]interface{}
}
func (ck *cacheKey) SetValue(key, value interface{}) {
ck.mu.Lock()
defer ck.mu.Unlock()
ck.values[key] = value
}
func (ck *cacheKey) GetValue(key interface{}) interface{} {
ck.mu.RLock()
defer ck.mu.RUnlock()
return ck.values[key]
}
func (ck *cacheKey) Deps() []CacheKey {
return ck.deps
}
func (ck *cacheKey) Digest() digest.Digest {
return ck.dgst
}
func (ck *cacheKey) Output() Index {
return ck.index
}