solver: gracefully handle cache loading errors

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
v0.8
Tonis Tiigi 2020-05-18 15:48:52 -07:00
parent 24f4838730
commit dd765674fb
2 changed files with 132 additions and 16 deletions

solver/edge.go

@@ -31,6 +31,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
 		depRequests:        map[pipe.Receiver]*dep{},
 		keyMap:             map[string]struct{}{},
 		cacheRecords:       map[string]*CacheRecord{},
+		cacheRecordsLoaded: map[string]struct{}{},
 		index:              index,
 	}
 	return e
@@ -49,8 +50,10 @@ type edge struct {
 	cacheMapIndex      int
 	cacheMapDigests    []digest.Digest
 	execReq            pipe.Receiver
+	execCacheLoad      bool
 	err                error
 	cacheRecords       map[string]*CacheRecord
+	cacheRecordsLoaded map[string]struct{}
 	keyMap             map[string]struct{}

 	noCacheMatchPossible bool
@@ -425,7 +428,11 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
 	if upt == e.execReq && upt.Status().Completed {
 		if err := upt.Status().Err; err != nil {
 			e.execReq = nil
-			if !upt.Status().Canceled && e.err == nil {
+			if e.execCacheLoad {
+				for k := range e.cacheRecordsLoaded {
+					delete(e.cacheRecords, k)
+				}
+			} else if !upt.Status().Canceled && e.err == nil {
 				e.err = err
 			}
 		} else {
@@ -561,8 +568,10 @@ func (e *edge) recalcCurrentState() {
 		}

 		for _, r := range records {
-			e.cacheRecords[r.ID] = r
+			if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
+				e.cacheRecords[r.ID] = r
+			}
 		}

 		e.keys = append(e.keys, e.makeExportable(mergedKey, records))
 	}
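
Without this guard, recalcCurrentState would re-add the very records that a failed load just evicted from e.cacheRecords, and the edge could keep selecting the same broken cache entry instead of falling back to executing the op.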
@@ -821,6 +830,7 @@ func (e *edge) execIfPossible(f *pipeFactory) bool {
 			return true
 		}
 		e.execReq = f.NewFuncRequest(e.loadCache)
+		e.execCacheLoad = true
 		for req := range e.depRequests {
 			req.Cancel()
 		}
@@ -831,6 +841,7 @@
 			return true
 		}
 		e.execReq = f.NewFuncRequest(e.execOp)
+		e.execCacheLoad = false
 		return true
 	}
 	return false
@@ -851,6 +862,7 @@ func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
 	}

 	rec := getBestResult(recs)
+	e.cacheRecordsLoaded[rec.ID] = struct{}{}

 	logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
 	res, err := e.op.LoadCache(ctx, rec)
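
Taken together, the edge.go changes form a record-then-evict pattern: loadCache remembers a record's ID before attempting to load it, a failed load request evicts exactly those records rather than failing the edge, and the recalculated state skips them. Below is a minimal, self-contained sketch of that pattern; the types and helpers (edge, CacheRecord, loadCache, onLoadFailed, addRecord) are simplified stand-ins for illustration, not the solver's real API.

package main

import (
	"errors"
	"fmt"
)

// CacheRecord and edge are simplified stand-ins for the solver types.
type CacheRecord struct{ ID string }

type edge struct {
	cacheRecords       map[string]*CacheRecord
	cacheRecordsLoaded map[string]struct{} // records we already tried to load
}

// loadCache marks the chosen record as attempted before loading it,
// mirroring e.cacheRecordsLoaded[rec.ID] = struct{}{} in the diff.
func (e *edge) loadCache(load func(*CacheRecord) error) error {
	for _, rec := range e.cacheRecords {
		e.cacheRecordsLoaded[rec.ID] = struct{}{}
		return load(rec)
	}
	return errors.New("no cache record")
}

// onLoadFailed mirrors the new processUpdate branch: forget the records
// whose load failed instead of failing the whole edge.
func (e *edge) onLoadFailed() {
	for k := range e.cacheRecordsLoaded {
		delete(e.cacheRecords, k)
	}
}

// addRecord mirrors the recalcCurrentState guard: a record that already
// failed to load is never re-added to the candidate set.
func (e *edge) addRecord(r *CacheRecord) {
	if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
		e.cacheRecords[r.ID] = r
	}
}

func main() {
	e := &edge{
		cacheRecords:       map[string]*CacheRecord{"a": {ID: "a"}},
		cacheRecordsLoaded: map[string]struct{}{},
	}
	if err := e.loadCache(func(*CacheRecord) error { return errors.New("blob gone") }); err != nil {
		e.onLoadFailed()
	}
	e.addRecord(&CacheRecord{ID: "a"}) // ignored: "a" already failed once
	fmt.Println(len(e.cacheRecords))   // 0: executing the op is now the only option
}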

solver/scheduler_test.go

@@ -3102,6 +3102,106 @@ func TestMergedEdgesLookup(t *testing.T) {
 	}
 }

+func TestCacheLoadError(t *testing.T) {
+	t.Parallel()
+	rand.Seed(time.Now().UnixNano())
+
+	ctx := context.TODO()
+
+	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())
+
+	l := NewSolver(SolverOpt{
+		ResolveOpFunc: testOpResolver,
+		DefaultCache:  cacheManager,
+	})
+	defer l.Close()
+
+	j0, err := l.NewJob("j0")
+	require.NoError(t, err)
+
+	defer func() {
+		if j0 != nil {
+			j0.Discard()
+		}
+	}()
+
+	g := Edge{
+		Vertex: vtxSum(3, vtxOpt{inputs: []Edge{
+			{Vertex: vtxSum(0, vtxOpt{inputs: []Edge{
+				{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
+					{Vertex: vtxConst(2, vtxOpt{})},
+				}})},
+				{Vertex: vtxConst(0, vtxOpt{})},
+			}})},
+			{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
+				{Vertex: vtxConst(2, vtxOpt{})},
+			}})},
+		}}),
+	}
+	g.Vertex.(*vertexSum).setupCallCounters()
+
+	res, err := j0.Build(ctx, g)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(0), cacheManager.loadCounter)
+
+	require.NoError(t, j0.Discard())
+	j0 = nil
+
+	// repeat with cache
+	j1, err := l.NewJob("j1")
+	require.NoError(t, err)
+
+	defer func() {
+		if j1 != nil {
+			j1.Discard()
+		}
+	}()
+
+	g1 := g
+	g1.Vertex.(*vertexSum).setupCallCounters()
+
+	res, err = j1.Build(ctx, g1)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(0), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(1), cacheManager.loadCounter)
+
+	require.NoError(t, j1.Discard())
+	j1 = nil
+
+	// repeat with cache but loading will now fail
+	j2, err := l.NewJob("j2")
+	require.NoError(t, err)
+
+	defer func() {
+		if j2 != nil {
+			j2.Discard()
+		}
+	}()
+
+	g2 := g
+	g2.Vertex.(*vertexSum).setupCallCounters()
+
+	cacheManager.forceFail = true
+
+	res, err = j2.Build(ctx, g2)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(6), cacheManager.loadCounter)
+
+	require.NoError(t, j2.Discard())
+	j2 = nil
+}
+
 func TestInputRequestDeadlock(t *testing.T) {
 	t.Parallel()
 	ctx := context.TODO()
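
Note the arithmetic in the j2 block of the test above: forceFail makes every Load attempt error out, so loadCounter ends at 6 (the one successful load from j1 plus five failed attempts in j2), execCallCount returns to 5 because each op whose cached result fails to load is re-executed, and the build still succeeds with the same result of 11.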
@@ -3584,10 +3684,14 @@ func newTrackingCacheManager(cm CacheManager) *trackingCacheManager {
 type trackingCacheManager struct {
 	CacheManager
 	loadCounter int64
+	forceFail   bool
 }

 func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
 	atomic.AddInt64(&cm.loadCounter, 1)
+	if cm.forceFail {
+		return nil, errors.Errorf("force fail")
+	}
 	return cm.CacheManager.Load(ctx, rec)
 }
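
For reference, the pass-through behavior of trackingCacheManager comes from Go interface embedding: the struct embeds the CacheManager interface, so only the overridden Load changes behavior and every other method forwards to the wrapped value. A standalone sketch of the same fault-injecting decorator, where Loader, memLoader, and trackingLoader are hypothetical names for illustration:

package main

import (
	"errors" // the test above uses errors.Errorf from github.com/pkg/errors; stdlib errors is enough here
	"fmt"
	"sync/atomic"
)

// Loader is a hypothetical stand-in for the solver's CacheManager interface.
type Loader interface {
	Load(id string) (string, error)
}

type memLoader struct{}

func (memLoader) Load(id string) (string, error) { return "result:" + id, nil }

// trackingLoader mirrors trackingCacheManager: the embedded interface
// forwards every method except the one we override.
type trackingLoader struct {
	Loader
	loadCounter int64
	forceFail   bool
}

func (l *trackingLoader) Load(id string) (string, error) {
	atomic.AddInt64(&l.loadCounter, 1) // count every attempt, successful or not
	if l.forceFail {
		return "", errors.New("force fail")
	}
	return l.Loader.Load(id) // delegate to the wrapped implementation
}

func main() {
	l := &trackingLoader{Loader: memLoader{}}
	fmt.Println(l.Load("a")) // result:a <nil>
	l.forceFail = true
	fmt.Println(l.Load("a"))                      // "" force fail
	fmt.Println(atomic.LoadInt64(&l.loadCounter)) // 2
}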