solver: fix possible data races

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2017-09-27 22:38:54 -07:00
parent f41283caed
commit 4a9142223d
5 changed files with 48 additions and 24 deletions

2
cache/manager.go vendored
View File

@ -104,7 +104,7 @@ func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (
defer rec.mu.Unlock()
if rec.mutable {
if len(rec.refs) != 0 {
if rec.dead || len(rec.refs) != 0 {
return nil, errors.Wrapf(errLocked, "%s is locked", id)
}
if rec.equalImmutable != nil {

17
cache/refs.go vendored
View File

@ -8,6 +8,7 @@ import (
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
@ -44,6 +45,7 @@ type cacheRecord struct {
md *metadata.StorageItem
view string
viewMount []mount.Mount
dead bool
sizeG flightcontrol.Group
// size int64
@ -215,13 +217,18 @@ func (sr *cacheRecord) finalize(ctx context.Context) error {
if mutable == nil {
return nil
}
err := sr.cm.Snapshotter.Commit(ctx, sr.ID(), sr.equalMutable.ID())
err := sr.cm.Snapshotter.Commit(ctx, sr.ID(), mutable.ID())
if err != nil {
return errors.Wrapf(err, "failed to commit %s", sr.equalMutable.ID())
}
if err := sr.equalMutable.remove(ctx, false); err != nil {
return err
return errors.Wrapf(err, "failed to commit %s", mutable.ID())
}
mutable.dead = true
go func() {
sr.cm.mu.Lock()
defer sr.cm.mu.Unlock()
if err := mutable.remove(context.TODO(), false); err != nil {
logrus.Error(err)
}
}()
sr.equalMutable = nil
clearEqualMutable(sr.md)
return sr.md.Commit()

View File

@ -326,7 +326,8 @@ func (vs *vertexStream) append(v client.Vertex) []*client.Vertex {
}
}
}
return append(out, &v)
vcopy := v
return append(out, &vcopy)
}
func (vs *vertexStream) encore() []*client.Vertex {

View File

@ -182,9 +182,10 @@ type vertexSolver struct {
f *bgfunc.F
ctx context.Context
baseKey digest.Digest
mu sync.Mutex
results []digest.Digest
baseKey digest.Digest
mu sync.Mutex
results []digest.Digest
markCachedOnce sync.Once
signal *signal // used to notify that there are callers who need more data
}
@ -344,12 +345,13 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
}
vs.mu.Unlock()
wait := vs.signal.Wait()
waitFirst := vs.signal.Wait()
waitRun := waitFirst
select {
case <-ctx.Done():
return ctx.Err()
case <-wait:
case <-waitFirst:
}
// this is where you lookup the cache keys that were successfully probed
@ -363,12 +365,13 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
eg.Go(func() error {
inp := vs.inputs[i]
defer inp.ev.Cancel()
for {
waitNext := waitFirst
for {
select {
case <-ctx2.Done():
return ctx2.Err()
case <-wait:
case <-waitNext:
}
// check if current cache key is in cache
@ -379,7 +382,9 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
}
if ref != nil {
inp.ref = ref.(Reference)
markCached(ctx, inp.solver.(*vertexSolver).cv)
inp.solver.(*vertexSolver).markCachedOnce.Do(func() {
markCached(ctx, inp.solver.(*vertexSolver).cv)
})
return nil
}
}
@ -419,8 +424,9 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
return err
}
vs.results = append(vs.results, dgst)
signal() // wake up callers
wait = vs.signal.Reset() // make sure we don't continue unless there are callers
signal() // wake up callers
waitNext = vs.signal.Reset() // make sure we don't continue unless there are callers
waitRun = waitNext
vs.mu.Unlock()
}
}
@ -459,14 +465,14 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
vs.mu.Lock()
vs.results = append(vs.results, extraKeys...)
signal()
wait = vs.signal.Reset()
waitRun = vs.signal.Reset()
vs.mu.Unlock()
}
select {
case <-ctx.Done():
return
case <-wait:
case <-waitRun:
}
// no cache hit. start evaluating the node

View File

@ -43,23 +43,24 @@ func (f *F) run() {
f.running = true
ctx, cancel := context.WithCancel(f.mainCtx)
ctxErr := make(chan error, 1)
f.cancelCtx = cancel
f.ctxErr = ctxErr
f.cancelCtx = cancel
go func() {
var err error
var nodone bool
defer func() {
// release all cancellations
ctxErr <- err
close(ctxErr)
f.mu.Lock()
f.runMu.Lock()
f.running = false
f.runMu.Unlock()
f.mu.Lock()
if !nodone {
f.done = true
f.err = err
}
f.cond.Broadcast()
ctxErr <- err
f.mu.Unlock()
}()
err = f.f(ctx, func() {
@ -82,11 +83,13 @@ func (f *F) addSem() {
func (f *F) clearSem() error {
f.sem--
var err error
f.runMu.Lock()
if cctx := f.cancelCtx; f.sem == 0 && cctx != nil {
cctx()
err = <-f.ctxErr
f.cancelCtx = nil
}
f.runMu.Unlock()
return err
}
@ -112,10 +115,11 @@ func (c *Caller) Call(ctx context.Context, f func() (interface{}, error)) (inter
for {
select {
case <-ctx.Done():
c.F.mu.RUnlock()
if err := c.Cancel(); err != nil {
if err := c.cancel(); err != nil {
c.F.mu.RUnlock()
return nil, err
}
c.F.mu.RUnlock()
return nil, ctx.Err()
default:
}
@ -148,6 +152,12 @@ func (c *Caller) Call(ctx context.Context, f func() (interface{}, error)) (inter
}
func (c *Caller) Cancel() error {
c.F.mu.Lock()
defer c.F.mu.Unlock()
return c.cancel()
}
func (c *Caller) cancel() error {
c.F.semMu.Lock()
defer c.F.semMu.Unlock()
if c.active {