Merge pull request #1963 from tonistiigi/release-count-fixes

v0.9
Akihiro Suda 2021-02-19 08:22:54 +09:00 committed by GitHub
commit 19d31737fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 38 additions and 10 deletions

View File

@ -127,6 +127,7 @@ type MountRef struct {
type MountMutableRef struct { type MountMutableRef struct {
Ref cache.MutableRef Ref cache.MutableRef
MountIndex int MountIndex int
NoCommit bool
} }
type MakeMutable func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) type MakeMutable func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error)
@ -196,6 +197,7 @@ func PrepareMounts(ctx context.Context, mm *mounts.MountManager, cm cache.Manage
p.Actives = append(p.Actives, MountMutableRef{ p.Actives = append(p.Actives, MountMutableRef{
MountIndex: i, MountIndex: i,
Ref: active, Ref: active,
NoCommit: true,
}) })
if m.Output != opspb.SkipOutput && ref != nil { if m.Output != opspb.SkipOutput && ref != nil {
p.OutputRefs = append(p.OutputRefs, MountRef{ p.OutputRefs = append(p.OutputRefs, MountRef{

View File

@ -21,10 +21,15 @@ func (e *ExecError) Unwrap() error {
} }
func (e *ExecError) EachRef(fn func(solver.Result) error) (err error) { func (e *ExecError) EachRef(fn func(solver.Result) error) (err error) {
m := map[solver.Result]struct{}{}
for _, res := range e.Inputs { for _, res := range e.Inputs {
if res == nil { if res == nil {
continue continue
} }
if _, ok := m[res]; ok {
continue
}
m[res] = struct{}{}
if err1 := fn(res); err1 != nil && err == nil { if err1 := fn(res); err1 != nil && err == nil {
err = err1 err = err1
} }
@ -33,6 +38,10 @@ func (e *ExecError) EachRef(fn func(solver.Result) error) (err error) {
if res == nil { if res == nil {
continue continue
} }
if _, ok := m[res]; ok {
continue
}
m[res] = struct{}{}
if err1 := fn(res); err1 != nil && err == nil { if err1 := fn(res); err1 != nil && err == nil {
err = err1 err = err1
} }

View File

@ -235,7 +235,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
if m.Input == -1 { if m.Input == -1 {
continue continue
} }
execInputs[i] = inputs[m.Input] execInputs[i] = inputs[m.Input].Clone()
} }
execMounts := make([]solver.Result, len(e.op.Mounts)) execMounts := make([]solver.Result, len(e.op.Mounts))
copy(execMounts, execInputs) copy(execMounts, execInputs)
@ -243,12 +243,16 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
execMounts[p.OutputRefs[i].MountIndex] = res execMounts[p.OutputRefs[i].MountIndex] = res
} }
for _, active := range p.Actives { for _, active := range p.Actives {
ref, cerr := active.Ref.Commit(ctx) if active.NoCommit {
if cerr != nil { active.Ref.Release(context.TODO())
err = errors.Wrapf(err, "error committing %s: %s", active.Ref.ID(), cerr) } else {
continue ref, cerr := active.Ref.Commit(ctx)
if cerr != nil {
err = errors.Wrapf(err, "error committing %s: %s", active.Ref.ID(), cerr)
continue
}
execMounts[active.MountIndex] = worker.NewWorkerRefResult(ref, e.w)
} }
execMounts[active.MountIndex] = worker.NewWorkerRefResult(ref, e.w)
} }
err = errdefs.WithExecError(err, execInputs, execMounts) err = errdefs.WithExecError(err, execInputs, execMounts)
} else { } else {

View File

@ -430,7 +430,6 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp
if cerr == nil { if cerr == nil {
outputRes[idx-len(inputs)] = worker.NewWorkerRefResult(ref.(cache.ImmutableRef), s.w) outputRes[idx-len(inputs)] = worker.NewWorkerRefResult(ref.(cache.ImmutableRef), s.w)
} }
inpMount.Release(context.TODO())
} }
// If the action has a secondary input, commit it and set the ref on // If the action has a secondary input, commit it and set the ref on

View File

@ -47,7 +47,7 @@ type splitResult struct {
func (r *splitResult) Release(ctx context.Context) error { func (r *splitResult) Release(ctx context.Context) error {
if atomic.AddInt64(&r.released, 1) > 1 { if atomic.AddInt64(&r.released, 1) > 1 {
err := errors.Errorf("releasing already released reference") err := errors.Errorf("releasing already released reference %+v", r.Result.ID())
logrus.Error(err) logrus.Error(err)
return err return err
} }
@ -78,10 +78,14 @@ func NewSharedCachedResult(res CachedResult) *SharedCachedResult {
} }
} }
func (r *SharedCachedResult) Clone() CachedResult { func (r *SharedCachedResult) CloneCachedResult() CachedResult {
return &clonedCachedResult{Result: r.SharedResult.Clone(), cr: r.CachedResult} return &clonedCachedResult{Result: r.SharedResult.Clone(), cr: r.CachedResult}
} }
func (r *SharedCachedResult) Clone() Result {
return r.CloneCachedResult()
}
func (r *SharedCachedResult) Release(ctx context.Context) error { func (r *SharedCachedResult) Release(ctx context.Context) error {
return r.SharedResult.Release(ctx) return r.SharedResult.Release(ctx)
} }

View File

@ -244,7 +244,7 @@ func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error)
if err := p.Receiver.Status().Err; err != nil { if err := p.Receiver.Status().Err; err != nil {
return nil, err return nil, err
} }
return p.Receiver.Status().Value.(*edgeState).result.Clone(), nil return p.Receiver.Status().Value.(*edgeState).result.CloneCachedResult(), nil
} }
// newPipe creates a new request pipe between two edges // newPipe creates a new request pipe between two edges

View File

@ -3640,6 +3640,7 @@ type dummyResult struct {
func (r *dummyResult) ID() string { return r.id } func (r *dummyResult) ID() string { return r.id }
func (r *dummyResult) Release(context.Context) error { return nil } func (r *dummyResult) Release(context.Context) error { return nil }
func (r *dummyResult) Sys() interface{} { return r } func (r *dummyResult) Sys() interface{} { return r }
func (r *dummyResult) Clone() Result { return r }
func testOpResolver(v Vertex, b Builder) (Op, error) { func testOpResolver(v Vertex, b Builder) (Op, error) {
if op, ok := v.Sys().(Op); ok { if op, ok := v.Sys().(Op); ok {

View File

@ -61,6 +61,7 @@ type Result interface {
ID() string ID() string
Release(context.Context) error Release(context.Context) error
Sys() interface{} Sys() interface{}
Clone() Result
} }
// CachedResult is a result connected with its cache key // CachedResult is a result connected with its cache key

View File

@ -52,3 +52,11 @@ func (r *workerRefResult) Release(ctx context.Context) error {
func (r *workerRefResult) Sys() interface{} { func (r *workerRefResult) Sys() interface{} {
return r.WorkerRef return r.WorkerRef
} }
func (r *workerRefResult) Clone() solver.Result {
r2 := *r
if r.ImmutableRef != nil {
r.ImmutableRef = r.ImmutableRef.Clone()
}
return &r2
}