solver: add support for cache tracking through query

Fix result releasing bugs.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Branch: docker-18.09
Author: Tonis Tiigi, 2018-03-01 11:09:21 -08:00
Parent: 12198eea27
Commit: 513018806b
8 changed files with 393 additions and 119 deletions


@ -92,7 +92,8 @@ func (s *Store) Walk(fn func(id string) error) error {
}
func (s *Store) WalkResults(id string, fn func(solver.CacheResult) error) error {
return s.db.View(func(tx *bolt.Tx) error {
var list []solver.CacheResult
if err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(resultBucket))
if b == nil {
return nil
@ -101,17 +102,24 @@ func (s *Store) WalkResults(id string, fn func(solver.CacheResult) error) error
if b == nil {
return nil
}
if err := b.ForEach(func(k, v []byte) error {
return b.ForEach(func(k, v []byte) error {
var res solver.CacheResult
if err := json.Unmarshal(v, &res); err != nil {
return err
}
return fn(res)
}); err != nil {
list = append(list, res)
return nil
})
}); err != nil {
return err
}
for _, res := range list {
if err := fn(res); err != nil {
return err
}
return nil
})
}
return nil
}
func (s *Store) Load(id string, resultID string) (solver.CacheResult, error) {
@ -151,7 +159,6 @@ func (s *Store) AddResult(id string, res solver.CacheResult) error {
if err := b.Put([]byte(res.ID), dt); err != nil {
return err
}
b, err = tx.Bucket([]byte(byResultBucket)).CreateBucketIfNotExists([]byte(res.ID))
if err != nil {
return err
@ -298,6 +305,7 @@ func (s *Store) AddLink(id string, link solver.CacheInfoLink, target string) err
}
func (s *Store) WalkLinks(id string, link solver.CacheInfoLink, fn func(id string) error) error {
var links []string
if err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(linksBucket))
if b == nil {
@ -312,18 +320,13 @@ func (s *Store) WalkLinks(id string, link solver.CacheInfoLink, fn func(id strin
if err != nil {
return err
}
index := bytes.Join([][]byte{dt, {}}, []byte("@"))
c := b.Cursor()
k, _ := c.Seek([]byte(index))
for {
if k != nil && bytes.HasPrefix(k, index) {
target := bytes.TrimPrefix(k, index)
if err := fn(string(target)); err != nil {
return err
}
links = append(links, string(target))
k, _ = c.Next()
} else {
break
@ -334,6 +337,11 @@ func (s *Store) WalkLinks(id string, link solver.CacheInfoLink, fn func(id strin
}); err != nil {
return err
}
for _, l := range links {
if err := fn(l); err != nil {
return err
}
}
return nil
}
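
Both WalkResults and WalkLinks above now collect their matches into a slice while the bolt View transaction is open and call the caller's callback only after the transaction has returned. Below is a minimal sketch of that shape (the bucket name, value handling, and the boltdb import path are assumptions for illustration, not taken from this diff). Running the callback outside the read transaction matters because the Query path now re-enters the store from inside these callbacks (AddLink, Release), and those need a read-write transaction of their own.

package cachestorage

import "github.com/boltdb/bolt"

// walkValues gathers raw values inside the read transaction, closes it, and
// only then hands them to fn. Values are copied because bolt only guarantees
// them for the lifetime of the transaction.
func walkValues(db *bolt.DB, bucket string, fn func(v []byte) error) error {
	var values [][]byte
	if err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			return nil
		}
		return b.ForEach(func(k, v []byte) error {
			values = append(values, append([]byte{}, v...))
			return nil
		})
	}); err != nil {
		return err
	}
	for _, v := range values {
		// no transaction is held here, so fn may open its own
		// read-write transaction against the same store
		if err := fn(v); err != nil {
			return err
		}
	}
	return nil
}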


@ -60,7 +60,7 @@ func (ck *inMemoryCacheKey) Deps() []CacheKeyWithSelector {
logrus.Errorf("dependency %s not found", dep.ID)
} else {
deps[i] = CacheKeyWithSelector{
CacheKey: withExporter(ck.manager.toInMemoryCacheKey(k), nil),
CacheKey: withExporter(ck.manager.toInMemoryCacheKey(k), nil, nil),
Selector: dep.Selector,
}
}
@ -77,16 +77,18 @@ func (ck *inMemoryCacheKey) Output() Index {
return Index(ck.CacheKeyInfo.Output)
}
func withExporter(ck *inMemoryCacheKey, cacheResult *CacheResult) ExportableCacheKey {
func withExporter(ck *inMemoryCacheKey, cacheResult *CacheResult, dep *CacheKeyWithSelector) ExportableCacheKey {
return ExportableCacheKey{ck, &cacheExporter{
inMemoryCacheKey: ck,
cacheResult: cacheResult,
dep: dep,
}}
}
type cacheExporter struct {
*inMemoryCacheKey
cacheResult *CacheResult
dep *CacheKeyWithSelector
}
func (ce *cacheExporter) Export(ctx context.Context, m map[digest.Digest]*ExportRecord, converter func(context.Context, Result) (*Remote, error)) (*ExportRecord, error) {
@ -147,7 +149,13 @@ func (ce *cacheExporter) Export(ctx context.Context, m map[digest.Digest]*Export
}] = struct{}{}
}
for i, dep := range ce.Deps() {
allDeps := ce.Deps()
if ce.dep != nil {
allDeps = []CacheKeyWithSelector{*ce.dep}
}
for i, dep := range allDeps {
r, err := dep.CacheKey.Export(ctx, m, converter)
if err != nil {
return nil, err
@ -209,40 +217,68 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs
c.mu.RLock()
defer c.mu.RUnlock()
refs := map[string]struct{}{}
sublinks := map[string]struct{}{}
refs := map[string]*CacheKeyWithSelector{}
sublinks := map[string]*CacheKeyWithSelector{}
for _, dep := range deps {
ck, err := c.getInternalKey(dep, false)
if err == nil {
if err := c.backend.WalkLinks(ck.CacheKeyInfo.ID, CacheInfoLink{input, output, dgst, selector}, func(id string) error {
refs[id] = struct{}{}
return nil
}); err != nil {
return nil, err
for _, d := range deps {
var dd []CacheKeyWithSelector
if d.Digest() == "" && d.Output() == 0 {
for _, dep := range d.Deps() {
dd = append(dd, dep)
}
} else {
dd = append(dd, CacheKeyWithSelector{Selector: selector, CacheKey: d})
}
if err := c.backend.WalkLinks(ck.CacheKeyInfo.ID, CacheInfoLink{Index(-1), Index(0), "", selector}, func(id string) error {
sublinks[id] = struct{}{}
return nil
}); err != nil {
return nil, err
}
for _, dep := range dd {
ck, err := c.getInternalKey(dep.CacheKey, false)
if err == nil {
if err := c.backend.WalkLinks(ck.CacheKeyInfo.ID, CacheInfoLink{input, output, dgst, selector}, func(id string) error {
refs[id] = &CacheKeyWithSelector{Selector: selector, CacheKey: d}
return nil
}); err != nil {
return nil, err
}
if err := c.backend.WalkLinks(ck.CacheKeyInfo.ID, CacheInfoLink{Index(-1), Index(0), "", NoSelector}, func(id string) error {
sublinks[id] = struct{}{}
return nil
}); err != nil {
return nil, err
if err := c.backend.WalkLinks(ck.CacheKeyInfo.ID, CacheInfoLink{Index(-1), Index(0), "", selector}, func(id string) error {
sublinks[id] = &CacheKeyWithSelector{Selector: selector, CacheKey: d}
return nil
}); err != nil {
return nil, err
}
if err := c.backend.WalkLinks(ck.CacheKeyInfo.ID, CacheInfoLink{Index(-1), Index(0), "", NoSelector}, func(id string) error {
sublinks[id] = &CacheKeyWithSelector{Selector: selector, CacheKey: d}
return nil
}); err != nil {
return nil, err
}
}
}
}
for id := range sublinks {
for id, mainKey := range sublinks {
ck, err := c.backend.Get(id)
if err == nil {
mainCk, err := c.getInternalKey(mainKey.CacheKey, false)
addNewKey := mainKey.CacheKey.Digest() == "" && (err != nil || mainCk.CacheKeyInfo.ID != ck.ID)
if err := c.backend.WalkLinks(ck.ID, CacheInfoLink{input, output, dgst, ""}, func(id string) error {
refs[id] = struct{}{}
if addNewKey {
ck, err := c.getInternalKey(mainKey.CacheKey, true)
if err != nil {
return err
}
err = c.backend.AddLink(ck.CacheKeyInfo.ID, CacheInfoLink{
Input: input,
Output: output,
Digest: dgst,
Selector: "",
}, id)
if err != nil {
return err
}
}
refs[id] = mainKey
return nil
}); err != nil {
return nil, err
@ -255,24 +291,28 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs
if err != nil {
return nil, nil
}
refs[ck.CacheKeyInfo.ID] = struct{}{}
refs[ck.CacheKeyInfo.ID] = nil
}
keys := make([]*inMemoryCacheKey, 0)
keys := make(map[string]*CacheKeyWithSelector)
outs := make([]*CacheRecord, 0, len(refs))
for id := range refs {
for id, dep := range refs {
cki, err := c.backend.Get(id)
if err == nil {
k := c.toInMemoryCacheKey(cki)
keys = append(keys, k)
keys[cki.ID] = dep
if err := c.backend.WalkResults(id, func(r CacheResult) error {
outs = append(outs, &CacheRecord{
ID: id + "@" + r.ID,
CacheKey: withExporter(k, &r),
CacheManager: c,
Loadable: true,
CreatedAt: r.CreatedAt,
})
if c.results.Exists(r.ID) {
outs = append(outs, &CacheRecord{
ID: id + "@" + r.ID,
CacheKey: withExporter(k, &r, dep),
CacheManager: c,
Loadable: true,
CreatedAt: r.CreatedAt,
})
} else {
c.backend.Release(r.ID)
}
return nil
}); err != nil {
return nil, err
@ -281,13 +321,17 @@ func (c *inMemoryCacheManager) Query(deps []ExportableCacheKey, input Index, dgs
}
if len(outs) == 0 {
for _, k := range keys {
outs = append(outs, &CacheRecord{
ID: k.CacheKeyInfo.ID,
CacheKey: withExporter(k, nil),
CacheManager: c,
Loadable: false,
})
for id, dep := range keys {
cki, err := c.backend.Get(id)
if err == nil {
k := c.toInMemoryCacheKey(cki)
outs = append(outs, &CacheRecord{
ID: k.CacheKeyInfo.ID,
CacheKey: withExporter(k, nil, dep),
CacheManager: c,
Loadable: false,
})
}
}
}
@ -335,7 +379,7 @@ func (c *inMemoryCacheManager) Save(k CacheKey, r Result) (ExportableCacheKey, e
return empty, err
}
return withExporter(ck, &res), nil
return withExporter(ck, &res, nil), nil
}
func (c *inMemoryCacheManager) getInternalKey(k CacheKey, createIfNotExist bool) (*inMemoryCacheKey, error) {
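
Read together, the Query changes above do three things: the refs and sublinks maps now remember which dependency key (with its selector) produced each candidate, and that key is threaded into withExporter so the export chain follows the dependency that actually matched rather than every recorded dep; when a match is only reachable through a content-based sublink, the missing direct link is written back with AddLink so later queries hit it without the detour; and results whose data is no longer present in the result store are released instead of being returned as loadable records.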


@ -275,6 +275,39 @@ func TestInMemoryCacheRestoreOfflineDeletion(t *testing.T) {
require.True(t, matches[0].Loadable)
}
func TestCarryOverFromSublink(t *testing.T) {
storage := NewInMemoryCacheStorage()
results := NewInMemoryResultStorage()
m := NewCacheManager(identity.NewID(), storage, results)
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("resultFoo"))
require.NoError(t, err)
k := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo, Selector: dgst("sel0")},
{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey(dgst("content0"), 0, nil)}, Selector: NoSelector},
})
_, err = m.Save(NewCacheKey(dgst("res"), 0, []CacheKeyWithSelector{{CacheKey: ExportableCacheKey{CacheKey: k}}}), testResult("result0"))
require.NoError(t, err)
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0, nil), testResult("resultBar"))
require.NoError(t, err)
k3 := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheBar, Selector: dgst("sel0")},
{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey(dgst("content0"), 0, nil)}, Selector: NoSelector},
})
matches, err := m.Query([]ExportableCacheKey{{CacheKey: k3}}, 0, dgst("res"), 0, "")
require.NoError(t, err)
require.Equal(t, len(matches), 1)
matches, err = m.Query([]ExportableCacheKey{{CacheKey: cacheBar}}, 0, dgst("res"), 0, dgst("sel0"))
require.NoError(t, err)
require.Equal(t, len(matches), 1)
}
func dgst(s string) digest.Digest {
return digest.FromBytes([]byte(s))
}


@ -50,6 +50,7 @@ type edge struct {
noCacheMatchPossible bool
allDepsCompletedCacheFast bool
allDepsCompletedCacheSlow bool
allDepsStateCacheSlow bool
allDepsCompleted bool
hasActiveOutgoing bool
@ -68,6 +69,7 @@ type dep struct {
e *edge
slowCacheReq pipe.Receiver // TODO: reuse req
slowCacheComplete bool
slowCacheFoundKey bool
slowCacheKey *ExportableCacheKey
err error
}
@ -84,7 +86,7 @@ type edgePipe struct {
type edgeState struct {
state edgeStatusType
result CachedResult
result *SharedCachedResult
cacheMap *CacheMap
keys []ExportableCacheKey
}
@ -99,6 +101,7 @@ func isEqualState(s1, s2 edgeState) bool {
type edgeRequest struct {
desiredState edgeStatusType
currentState edgeState
currentKeys int
}
// incrementReferenceCount increases the number of times release needs to be
@ -114,6 +117,9 @@ func (e *edge) release() {
return
}
e.index.Release(e)
if e.result != nil {
e.result.Release(context.TODO())
}
}
// commitOptions returns parameters for the op execution
@ -127,12 +133,7 @@ func (e *edge) commitOptions() (CacheKey, []CachedResult) {
for i, dep := range e.deps {
inputs[i] = CacheKeyWithSelector{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector}
if dep.slowCacheKey != nil {
ck := NewCacheKey("", 0, []CacheKeyWithSelector{
inputs[i],
{CacheKey: *dep.slowCacheKey, Selector: NoSelector},
},
)
inputs[i] = CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: ck, Exporter: &emptyExporter{ck}}}
inputs[i] = CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
}
results[i] = dep.result
}
@ -163,12 +164,12 @@ func (e *edge) updateIncoming(req pipe.Sender) {
// probeCache is called with unprocessed cache keys for dependency
// if the key could match the edge, the cacheRecords for dependency are filled
func (e *edge) probeCache(d *dep, keys []ExportableCacheKey) {
func (e *edge) probeCache(d *dep, keys []ExportableCacheKey) bool {
if len(keys) == 0 {
return
return false
}
if e.op.IgnoreCache() {
return
return false
}
records, err := e.op.Cache().Query(keys, d.index, e.cacheMap.Digest, e.edge.Index, e.cacheMap.Deps[d.index].Selector)
if err != nil {
@ -179,12 +180,13 @@ func (e *edge) probeCache(d *dep, keys []ExportableCacheKey) {
d.cacheRecords[r.ID] = r
}
}
return len(records) > 0
}
// checkDepMatchPossible checks if any cache matches are possible past this point
func (e *edge) checkDepMatchPossible(dep *dep) {
depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil
if !e.noCacheMatchPossible && ((dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state == edgeStatusCacheFast) && len(dep.cacheRecords) == 0) {
if !e.noCacheMatchPossible && (((!dep.slowCacheFoundKey && dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state >= edgeStatusCacheSlow)) && len(dep.keys) == 0) {
e.noCacheMatchPossible = true
}
}
@ -200,8 +202,11 @@ func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc {
// allDepsHaveKeys checks if all dependencies have at least one key. used for
// determining if there is enough data for combining cache key for edge
func (e *edge) allDepsHaveKeys() bool {
if e.cacheMap == nil {
return false
}
for _, d := range e.deps {
if len(d.keys) == 0 {
if len(d.keys) == 0 && d.slowCacheKey == nil {
return false
}
}
@ -242,6 +247,21 @@ func (e *edge) skipPhase2SlowCache(dep *dep) bool {
return false
}
func (e *edge) skipPhase2FastCache(dep *dep) bool {
isPhase1 := false
for _, dep := range e.deps {
if e.cacheMap == nil || len(dep.cacheRecords) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
isPhase1 = true
break
}
}
if isPhase1 && len(dep.cacheRecords) > 0 {
return true
}
return false
}
// unpark is called by the scheduler with incoming requests and updates for
// previous calls.
// To avoid deadlocks and resource leaks this function needs to follow
@ -276,11 +296,15 @@ func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver,
})
}
e.createInputRequests(desiredState, f)
// execute op
if e.execReq == nil && desiredState == edgeStatusComplete {
e.execIfPossible(f)
if ok := e.execIfPossible(f); ok {
return
}
}
if e.execReq == nil {
e.createInputRequests(desiredState, f)
}
}
@ -290,7 +314,8 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
// response for cachemap request
if upt == e.cacheMapReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
if e.err == nil {
e.cacheMapReq = nil
if !upt.Status().Canceled && e.err == nil {
e.err = err
}
} else {
@ -328,11 +353,12 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
// response for exec request
if upt == e.execReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
if e.err == nil {
e.execReq = nil
if !upt.Status().Canceled && e.err == nil {
e.err = err
}
} else {
e.result = upt.Status().Value.(CachedResult)
e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult))
e.state = edgeStatusComplete
}
return true
@ -377,18 +403,24 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
}
// response for result based cache function
for _, dep := range e.deps {
for i, dep := range e.deps {
if upt == dep.slowCacheReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
if e.err == nil {
dep.slowCacheReq = nil
if !upt.Status().Canceled && e.err == nil {
e.err = upt.Status().Err
}
} else if !dep.slowCacheComplete {
k := NewCacheKey(upt.Status().Value.(digest.Digest), -1, nil)
dep.slowCacheKey = &ExportableCacheKey{k, &emptyExporter{k}}
e.probeCache(dep, []ExportableCacheKey{*dep.slowCacheKey})
ck := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector},
{CacheKey: ExportableCacheKey{CacheKey: k, Exporter: &emptyExporter{k}}, Selector: NoSelector},
})
dep.slowCacheKey = &ExportableCacheKey{ck, &emptyExporter{ck}}
dep.slowCacheFoundKey = e.probeCache(dep, []ExportableCacheKey{*dep.slowCacheKey})
dep.slowCacheComplete = true
e.keysDidChange = true
e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
}
return true
}
@ -431,22 +463,31 @@ func (e *edge) recalcCurrentState() {
}
// detect lower/upper bound for current state
allDepsCompletedCacheFast := true
allDepsCompletedCacheSlow := true
allDepsCompletedCacheFast := e.cacheMap != nil
allDepsCompletedCacheSlow := e.cacheMap != nil
allDepsStateCacheSlow := true
allDepsCompleted := true
stLow := edgeStatusInitial
stHigh := edgeStatusCacheSlow
stLow := edgeStatusInitial // minimal possible state
stHigh := edgeStatusCacheSlow // maximum possible state
if e.cacheMap != nil {
for _, dep := range e.deps {
isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
if dep.state > stLow && len(dep.cacheRecords) == 0 && !isSlowIncomplete {
stLow = dep.state
if stLow > edgeStatusCacheSlow {
stLow = edgeStatusCacheSlow
}
}
if dep.state < stHigh {
stHigh = dep.state
effectiveState := dep.state
if dep.state == edgeStatusCacheSlow && isSlowIncomplete {
effectiveState = edgeStatusCacheFast
}
if dep.state == edgeStatusComplete && isSlowIncomplete {
effectiveState = edgeStatusCacheFast
}
if effectiveState < stHigh {
stHigh = effectiveState
}
if isSlowIncomplete || dep.state < edgeStatusComplete {
allDepsCompleted = false
@ -457,15 +498,20 @@ func (e *edge) recalcCurrentState() {
if isSlowIncomplete || dep.state < edgeStatusCacheSlow {
allDepsCompletedCacheSlow = false
}
}
if stHigh > e.state {
e.state = stHigh
if dep.state < edgeStatusCacheSlow && len(dep.keys) == 0 {
allDepsStateCacheSlow = false
}
}
if stLow > e.state {
e.state = stLow
}
if stHigh > e.state {
e.state = stHigh
}
e.allDepsCompletedCacheFast = allDepsCompletedCacheFast
e.allDepsCompletedCacheSlow = allDepsCompletedCacheSlow
e.allDepsStateCacheSlow = allDepsStateCacheSlow
e.allDepsCompleted = allDepsCompleted
}
}
@ -476,36 +522,31 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
// detect the result state for the requests
allIncomingCanComplete := true
desiredState := e.state
allCanceled := true
// check incoming requests
// check if all requests can be either answered
// check if all requests can be either answered or canceled
if !e.isComplete() {
for _, req := range incoming {
if !req.Request().Canceled {
allCanceled = false
if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
desiredState = r.desiredState
allIncomingCanComplete = false
if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
allIncomingCanComplete = false
}
}
}
}
}
// do not set allIncomingCanComplete if some e.state != edgeStateComplete dep.state < e.state && len(e.keys) == 0
hasIncompleteDeps := false
if e.state < edgeStatusComplete && len(e.keys) == 0 {
for _, dep := range e.deps {
if dep.err == nil && dep.state < e.state {
hasIncompleteDeps = true
break
}
}
}
if hasIncompleteDeps {
// do not set allIncomingCanComplete if active ongoing can modify the state
if !allCanceled && e.state < edgeStatusComplete && len(e.keys) == 0 && e.hasActiveOutgoing {
allIncomingCanComplete = false
}
if debugScheduler {
logrus.Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast)
logrus.Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords))
}
if allIncomingCanComplete && e.hasActiveOutgoing {
@ -543,7 +584,9 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
// update incoming based on current state
for _, req := range incoming {
r := req.Request().Payload.(*edgeRequest)
if !hasIncompleteDeps && (e.state >= r.desiredState || req.Request().Canceled) {
if req.Request().Canceled {
e.finishIncoming(req)
} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
e.finishIncoming(req)
} else if !isEqualState(r.currentState, e.edgeState) && !req.Request().Canceled {
e.updateIncoming(req)
@ -569,21 +612,25 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory)
desiredStateDep := dep.state
if e.noCacheMatchPossible {
desiredStateDep = desiredState
desiredStateDep = edgeStatusComplete
} else if dep.state == edgeStatusInitial && desiredState > dep.state {
desiredStateDep = edgeStatusCacheFast
} else if dep.state == edgeStatusCacheFast && desiredState > dep.state {
if e.allDepsCompletedCacheFast && len(e.keys) == 0 {
desiredStateDep = edgeStatusCacheSlow
// wait all deps to complete cache fast before continuing with slow cache
if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keys) == 0 || e.allDepsHaveKeys() {
if !e.skipPhase2FastCache(dep) {
desiredStateDep = edgeStatusCacheSlow
}
}
} else if dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete {
if (e.allDepsCompletedCacheSlow || e.slowCacheFunc(dep) != nil) && len(e.keys) == 0 {
if !e.skipPhase2SlowCache(dep) {
// if all deps have completed cache-slow or content based cache for input is available
if (len(dep.keys) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) {
if len(dep.keys) == 0 || !e.skipPhase2SlowCache(dep) && e.allDepsStateCacheSlow {
desiredStateDep = edgeStatusComplete
}
}
} else if dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow {
if !e.skipPhase2SlowCache(dep) {
if len(dep.keys) == 0 || !e.skipPhase2SlowCache(dep) && e.allDepsStateCacheSlow {
desiredStateDep = edgeStatusComplete
}
}
@ -602,6 +649,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory)
req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
currentState: dep.edgeState,
desiredState: desiredStateDep,
currentKeys: len(dep.keys),
})
e.depRequests[req] = dep
dep.req = req
@ -625,23 +673,26 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory)
// execIfPossible creates a request for getting the edge result if there is
// enough state
func (e *edge) execIfPossible(f *pipeFactory) {
func (e *edge) execIfPossible(f *pipeFactory) bool {
if len(e.cacheRecords) > 0 {
if e.keysDidChange {
e.postpone(f)
return
return true
}
e.execReq = f.NewFuncRequest(e.loadCache)
for req := range e.depRequests {
req.Cancel()
}
return true
} else if e.allDepsCompleted {
if e.keysDidChange {
e.postpone(f)
return
return true
}
e.execReq = f.NewFuncRequest(e.execOp)
return true
}
return false
}
// postpone delays exec to next unpark invocation if we have unprocessed keys
@ -698,6 +749,11 @@ func (e *edge) execOp(ctx context.Context) (interface{}, error) {
res := results[int(index)]
for i := range results {
if i != int(index) {
go results[i].Release(context.TODO())
}
}
ck, err := e.op.Cache().Save(cacheKey, res)
if err != nil {
return nil, err
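
The edge changes follow the same two themes. For cache tracking: probeCache now reports whether any record matched, the result-based (slow) cache key is combined with the dependency's regular key and selector into a single key, and the new allDepsStateCacheSlow / skipPhase2FastCache / currentKeys bookkeeping lets createInputRequests ask dependencies for deeper states only when that can still produce a match. For result releasing: the edge result is wrapped in a SharedCachedResult, release() drops it when the edge itself is released, canceled cachemap/exec/slow-cache requests no longer record an error on the edge, and execOp releases every result it did not select as its output.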


@ -196,13 +196,19 @@ func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) er
func (s *inMemoryStore) WalkLinks(id string, link CacheInfoLink, fn func(id string) error) error {
s.mu.RLock()
defer s.mu.RUnlock()
k, ok := s.byID[id]
if !ok {
s.mu.RUnlock()
return errors.Wrapf(ErrNotFound, "no such key %s", id)
}
var links []string
for target := range k.links[link] {
if err := fn(target); err != nil {
links = append(links, target)
}
s.mu.RUnlock()
for _, t := range links {
if err := fn(t); err != nil {
return err
}
}
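
This mirrors the boltdb change: the in-memory WalkLinks copies the matching targets while holding the read lock, releases the lock explicitly, and only then invokes the callback, so a callback that needs the store's write lock (for example AddLink from Query) cannot deadlock against the walk.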


@ -84,3 +84,40 @@ func (cr *cachedResult) Export(ctx context.Context, converter func(context.Conte
}
return out, nil
}
func NewSharedCachedResult(res CachedResult) *SharedCachedResult {
return &SharedCachedResult{
SharedResult: NewSharedResult(res),
CachedResult: res,
}
}
func (r *SharedCachedResult) Clone() CachedResult {
return &clonedCachedResult{Result: r.SharedResult.Clone(), cr: r.CachedResult}
}
func (r *SharedCachedResult) Release(ctx context.Context) error {
return r.SharedResult.Release(ctx)
}
type clonedCachedResult struct {
Result
cr CachedResult
}
func (r *clonedCachedResult) ID() string {
return r.Result.ID()
}
func (cr *clonedCachedResult) CacheKey() ExportableCacheKey {
return cr.cr.CacheKey()
}
func (cr *clonedCachedResult) Export(ctx context.Context, converter func(context.Context, Result) (*Remote, error)) ([]ExportRecord, error) {
return cr.cr.Export(ctx, converter)
}
type SharedCachedResult struct {
*SharedResult
CachedResult
}
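
This wrapper is what the release fixes hang on: the edge keeps the SharedCachedResult, every consumer gets a Clone() (the scheduler's build() now returns result.Clone()), and the underlying result is only released once the edge and all clones have dropped their references. A minimal sketch of the reference-counting idea follows; the Result interface is trimmed down and NewSharedResult is reimplemented here only for illustration, the real one lives elsewhere in this package and may differ.

package sharedresult

import (
	"context"
	"sync"
)

// Result is a stand-in for the solver's Result interface.
type Result interface {
	ID() string
	Release(context.Context) error
}

// SharedResult wraps a single underlying Result behind a reference count.
type SharedResult struct {
	mu   sync.Mutex
	refs int
	main Result
}

func NewSharedResult(r Result) *SharedResult {
	return &SharedResult{refs: 1, main: r}
}

// Clone hands out one more reference to the same underlying result.
func (s *SharedResult) Clone() Result {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.refs++
	return &clonedResult{parent: s}
}

// Release drops one reference; the underlying result is released when the
// last reference (wrapper or clone) goes away.
func (s *SharedResult) Release(ctx context.Context) error {
	s.mu.Lock()
	s.refs--
	last := s.refs == 0
	s.mu.Unlock()
	if last {
		return s.main.Release(ctx)
	}
	return nil
}

type clonedResult struct {
	parent *SharedResult
	once   sync.Once
}

func (c *clonedResult) ID() string { return c.parent.main.ID() }

// Release on a clone is idempotent and forwards to the shared counter.
func (c *clonedResult) Release(ctx context.Context) error {
	var err error
	c.once.Do(func() { err = c.parent.Release(ctx) })
	return err
}

With this shape, edge.release() dropping e.result and each build() caller releasing its Clone() can happen in any order without the underlying result being released twice.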


@ -141,7 +141,7 @@ func (s *Scheduler) dispatch(e *edge) {
}
// if keys changed there might be a possibility for a merge with another edge
if e.keysDidChange {
if e.keysDidChange && e.cacheMap != nil {
origEdge := e.index.LoadOrStore(e, e.cacheMap.Digest, e.edge.Index, e.depKeys())
if origEdge != nil {
logrus.Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name())
@ -208,7 +208,7 @@ func (s *Scheduler) build(ctx context.Context, edge Edge) (CachedResult, error)
if err := p.Receiver.Status().Err; err != nil {
return nil, err
}
return p.Receiver.Status().Value.(*edgeState).result, nil
return p.Receiver.Status().Value.(*edgeState).result.Clone(), nil
}
// newPipe creates a new request pipe between two edges
@ -318,14 +318,14 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
if !debugScheduler {
return
}
logrus.Debugf(">> unpark %s req=%d upt=%d out=%d state=%s", e.edge.Vertex.Name(), len(inc), len(updates), len(allPipes), e.state)
logrus.Debugf(">> unpark %s req=%d upt=%d out=%d state=%s %s", e.edge.Vertex.Name(), len(inc), len(updates), len(allPipes), e.state, e.edge.Vertex.Digest())
for i, dep := range e.deps {
des := edgeStatusInitial
if dep.req != nil {
des = dep.req.Request().(*edgeRequest).desiredState
}
logrus.Debugf(":: dep%d %s state=%s des=%s", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des)
logrus.Debugf(":: dep%d %s state=%s des=%s keys=%d hasslowcache=%v", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des, len(dep.keys), e.slowCacheFunc(dep) != nil)
}
for i, in := range inc {
@ -346,6 +346,8 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
index = int(dep.index)
}
logrus.Debugf("> update-%d: %p input-%d keys=%d state=%s", i, up, index, len(st.keys), st.state)
} else {
logrus.Debugf("> update-%d: unknown", i)
}
}
}


@ -2156,6 +2156,94 @@ func TestCacheExporting(t *testing.T) {
require.Equal(t, 3, len(rec2))
}
func TestSlowCacheAvoidAccess(t *testing.T) {
t.Parallel()
ctx := context.TODO()
cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())
l := NewJobList(SolverOpt{
ResolveOpFunc: testOpResolver,
DefaultCache: cacheManager,
})
defer l.Close()
j0, err := l.NewJob("j0")
require.NoError(t, err)
defer func() {
if j0 != nil {
j0.Discard()
}
}()
g0 := Edge{
Vertex: vtx(vtxOpt{
name: "v0",
cacheKeySeed: "seed0",
cachePreFunc: func(context.Context) error {
select {
case <-time.After(50 * time.Millisecond):
case <-ctx.Done():
}
return nil
},
value: "result0",
inputs: []Edge{{
Vertex: vtx(vtxOpt{
name: "v1",
cacheKeySeed: "seed1",
value: "result1",
inputs: []Edge{
{Vertex: vtx(vtxOpt{
name: "v2",
cacheKeySeed: "seed2",
value: "result2",
})},
},
selectors: map[int]digest.Digest{
0: dgst("sel0"),
},
slowCacheCompute: map[int]ResultBasedCacheFunc{
0: digestFromResult,
},
}),
}},
}),
}
g0.Vertex.(*vertex).setupCallCounters()
res, err := j0.Build(ctx, g0)
require.NoError(t, err)
require.Equal(t, unwrap(res), "result0")
require.Equal(t, int64(0), cacheManager.loadCounter)
require.NoError(t, j0.Discard())
j0 = nil
j1, err := l.NewJob("j1")
require.NoError(t, err)
g0.Vertex.(*vertex).setupCallCounters()
defer func() {
if j1 != nil {
j1.Discard()
}
}()
res, err = j1.Build(ctx, g0)
require.NoError(t, err)
require.Equal(t, unwrap(res), "result0")
require.NoError(t, j1.Discard())
j1 = nil
require.Equal(t, int64(3), *g0.Vertex.(*vertex).cacheCallCount)
require.Equal(t, int64(0), *g0.Vertex.(*vertex).execCallCount)
require.Equal(t, int64(1), cacheManager.loadCounter)
}
func TestCacheExportingPartialSelector(t *testing.T) {
t.Parallel()
ctx := context.TODO()