solver: gracefully handle cache loading errors

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
(cherry picked from commit dd765674fb)
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
v0.7
Tonis Tiigi 2020-05-18 15:48:52 -07:00
parent 4b35ebce5b
commit b5f0df198d
2 changed files with 132 additions and 16 deletions

View File

@ -26,12 +26,13 @@ func (t edgeStatusType) String() string {
func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
e := &edge{
edge: ed,
op: op,
depRequests: map[pipe.Receiver]*dep{},
keyMap: map[string]struct{}{},
cacheRecords: map[string]*CacheRecord{},
index: index,
edge: ed,
op: op,
depRequests: map[pipe.Receiver]*dep{},
keyMap: map[string]struct{}{},
cacheRecords: map[string]*CacheRecord{},
cacheRecordsLoaded: map[string]struct{}{},
index: index,
}
return e
}
@ -44,14 +45,16 @@ type edge struct {
depRequests map[pipe.Receiver]*dep
deps []*dep
cacheMapReq pipe.Receiver
cacheMapDone bool
cacheMapIndex int
cacheMapDigests []digest.Digest
execReq pipe.Receiver
err error
cacheRecords map[string]*CacheRecord
keyMap map[string]struct{}
cacheMapReq pipe.Receiver
cacheMapDone bool
cacheMapIndex int
cacheMapDigests []digest.Digest
execReq pipe.Receiver
execCacheLoad bool
err error
cacheRecords map[string]*CacheRecord
cacheRecordsLoaded map[string]struct{}
keyMap map[string]struct{}
noCacheMatchPossible bool
allDepsCompletedCacheFast bool
@ -425,7 +428,11 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
if upt == e.execReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
e.execReq = nil
if !upt.Status().Canceled && e.err == nil {
if e.execCacheLoad {
for k := range e.cacheRecordsLoaded {
delete(e.cacheRecords, k)
}
} else if !upt.Status().Canceled && e.err == nil {
e.err = err
}
} else {
@ -561,7 +568,9 @@ func (e *edge) recalcCurrentState() {
}
for _, r := range records {
e.cacheRecords[r.ID] = r
if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
e.cacheRecords[r.ID] = r
}
}
e.keys = append(e.keys, e.makeExportable(mergedKey, records))
@ -821,6 +830,7 @@ func (e *edge) execIfPossible(f *pipeFactory) bool {
return true
}
e.execReq = f.NewFuncRequest(e.loadCache)
e.execCacheLoad = true
for req := range e.depRequests {
req.Cancel()
}
@ -831,6 +841,7 @@ func (e *edge) execIfPossible(f *pipeFactory) bool {
return true
}
e.execReq = f.NewFuncRequest(e.execOp)
e.execCacheLoad = false
return true
}
return false
@ -851,6 +862,7 @@ func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
}
rec := getBestResult(recs)
e.cacheRecordsLoaded[rec.ID] = struct{}{}
logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
res, err := e.op.LoadCache(ctx, rec)

View File

@ -3102,6 +3102,106 @@ func TestMergedEdgesLookup(t *testing.T) {
}
}
func TestCacheLoadError(t *testing.T) {
t.Parallel()
rand.Seed(time.Now().UnixNano())
ctx := context.TODO()
cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())
l := NewSolver(SolverOpt{
ResolveOpFunc: testOpResolver,
DefaultCache: cacheManager,
})
defer l.Close()
j0, err := l.NewJob("j0")
require.NoError(t, err)
defer func() {
if j0 != nil {
j0.Discard()
}
}()
g := Edge{
Vertex: vtxSum(3, vtxOpt{inputs: []Edge{
{Vertex: vtxSum(0, vtxOpt{inputs: []Edge{
{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
{Vertex: vtxConst(2, vtxOpt{})},
}})},
{Vertex: vtxConst(0, vtxOpt{})},
}})},
{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
{Vertex: vtxConst(2, vtxOpt{})},
}})},
}}),
}
g.Vertex.(*vertexSum).setupCallCounters()
res, err := j0.Build(ctx, g)
require.NoError(t, err)
require.Equal(t, unwrapInt(res), 11)
require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
require.Equal(t, int64(0), cacheManager.loadCounter)
require.NoError(t, j0.Discard())
j0 = nil
// repeat with cache
j1, err := l.NewJob("j1")
require.NoError(t, err)
defer func() {
if j1 != nil {
j1.Discard()
}
}()
g1 := g
g1.Vertex.(*vertexSum).setupCallCounters()
res, err = j1.Build(ctx, g1)
require.NoError(t, err)
require.Equal(t, unwrapInt(res), 11)
require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
require.Equal(t, int64(0), *g.Vertex.(*vertexSum).execCallCount)
require.Equal(t, int64(1), cacheManager.loadCounter)
require.NoError(t, j1.Discard())
j1 = nil
// repeat with cache but loading will now fail
j2, err := l.NewJob("j2")
require.NoError(t, err)
defer func() {
if j2 != nil {
j2.Discard()
}
}()
g2 := g
g2.Vertex.(*vertexSum).setupCallCounters()
cacheManager.forceFail = true
res, err = j2.Build(ctx, g2)
require.NoError(t, err)
require.Equal(t, unwrapInt(res), 11)
require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
require.Equal(t, int64(6), cacheManager.loadCounter)
require.NoError(t, j2.Discard())
j2 = nil
}
func TestInputRequestDeadlock(t *testing.T) {
t.Parallel()
ctx := context.TODO()
@ -3584,10 +3684,14 @@ func newTrackingCacheManager(cm CacheManager) *trackingCacheManager {
type trackingCacheManager struct {
CacheManager
loadCounter int64
forceFail bool
}
func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
atomic.AddInt64(&cm.loadCounter, 1)
if cm.forceFail {
return nil, errors.Errorf("force fail")
}
return cm.CacheManager.Load(ctx, rec)
}