Merge pull request #1838 from hinshun/release-err-result
Guarantee err results are released when result proxy is releasedv0.8
commit
de3e15b5dc
|
@ -659,6 +659,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF
|
|||
case <-ctx.Done():
|
||||
if strings.Contains(err.Error(), context.Canceled.Error()) {
|
||||
complete = false
|
||||
releaseError(err)
|
||||
err = errors.Wrap(ctx.Err(), err.Error())
|
||||
}
|
||||
default:
|
||||
|
@ -717,6 +718,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
|
|||
case <-ctx.Done():
|
||||
if strings.Contains(err.Error(), context.Canceled.Error()) {
|
||||
complete = false
|
||||
releaseError(err)
|
||||
err = errors.Wrap(ctx.Err(), err.Error())
|
||||
}
|
||||
default:
|
||||
|
@ -774,6 +776,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
|
|||
case <-ctx.Done():
|
||||
if strings.Contains(err.Error(), context.Canceled.Error()) {
|
||||
complete = false
|
||||
releaseError(err)
|
||||
err = errors.Wrap(ctx.Err(), err.Error())
|
||||
}
|
||||
default:
|
||||
|
@ -911,3 +914,15 @@ func WrapSlowCache(err error, index Index, res Result) error {
|
|||
}
|
||||
return &SlowCacheError{Index: index, Result: res, error: err}
|
||||
}
|
||||
|
||||
func releaseError(err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
if re, ok := err.(interface {
|
||||
Release() error
|
||||
}); ok {
|
||||
re.Release()
|
||||
}
|
||||
releaseError(errors.Unwrap(err))
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/errdefs"
|
||||
llberrdefs "github.com/moby/buildkit/solver/llbsolver/errdefs"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/util/flightcontrol"
|
||||
"github.com/moby/buildkit/worker"
|
||||
|
@ -144,41 +145,60 @@ func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest, sid st
|
|||
}
|
||||
|
||||
type resultProxy struct {
|
||||
cb func(context.Context) (solver.CachedResult, error)
|
||||
def *pb.Definition
|
||||
g flightcontrol.Group
|
||||
mu sync.Mutex
|
||||
released bool
|
||||
v solver.CachedResult
|
||||
err error
|
||||
cb func(context.Context) (solver.CachedResult, error)
|
||||
def *pb.Definition
|
||||
g flightcontrol.Group
|
||||
mu sync.Mutex
|
||||
released bool
|
||||
v solver.CachedResult
|
||||
err error
|
||||
errResults []solver.Result
|
||||
}
|
||||
|
||||
func newResultProxy(b *llbBridge, req frontend.SolveRequest) *resultProxy {
|
||||
return &resultProxy{
|
||||
rp := &resultProxy{
|
||||
def: req.Definition,
|
||||
cb: func(ctx context.Context) (solver.CachedResult, error) {
|
||||
return b.loadResult(ctx, req.Definition, req.CacheImports)
|
||||
},
|
||||
}
|
||||
rp.cb = func(ctx context.Context) (solver.CachedResult, error) {
|
||||
res, err := b.loadResult(ctx, req.Definition, req.CacheImports)
|
||||
var ee *llberrdefs.ExecError
|
||||
if errors.As(err, &ee) {
|
||||
ee.EachRef(func(res solver.Result) error {
|
||||
rp.errResults = append(rp.errResults, res)
|
||||
return nil
|
||||
})
|
||||
// acquire ownership so ExecError finalizer doesn't attempt to release as well
|
||||
ee.OwnerBorrowed = true
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
return rp
|
||||
}
|
||||
|
||||
func (rp *resultProxy) Definition() *pb.Definition {
|
||||
return rp.def
|
||||
}
|
||||
|
||||
func (rp *resultProxy) Release(ctx context.Context) error {
|
||||
func (rp *resultProxy) Release(ctx context.Context) (err error) {
|
||||
rp.mu.Lock()
|
||||
defer rp.mu.Unlock()
|
||||
for _, res := range rp.errResults {
|
||||
rerr := res.Release(ctx)
|
||||
if rerr != nil {
|
||||
err = rerr
|
||||
}
|
||||
}
|
||||
if rp.v != nil {
|
||||
if rp.released {
|
||||
logrus.Warnf("release of already released result")
|
||||
}
|
||||
if err := rp.v.Release(ctx); err != nil {
|
||||
return err
|
||||
rerr := rp.v.Release(ctx)
|
||||
if err != nil {
|
||||
return rerr
|
||||
}
|
||||
}
|
||||
rp.released = true
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
func (rp *resultProxy) wrapError(err error) error {
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
package errdefs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// ExecError will be returned when an error is encountered when evaluating an op.
|
||||
type ExecError struct {
|
||||
error
|
||||
Inputs []solver.Result
|
||||
Mounts []solver.Result
|
||||
Inputs []solver.Result
|
||||
Mounts []solver.Result
|
||||
OwnerBorrowed bool
|
||||
}
|
||||
|
||||
func (e *ExecError) Unwrap() error {
|
||||
|
@ -35,13 +40,35 @@ func (e *ExecError) EachRef(fn func(solver.Result) error) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
func (e *ExecError) Release() error {
|
||||
if e.OwnerBorrowed {
|
||||
return nil
|
||||
}
|
||||
err := e.EachRef(func(r solver.Result) error {
|
||||
r.Release(context.TODO())
|
||||
return nil
|
||||
})
|
||||
e.OwnerBorrowed = true
|
||||
return err
|
||||
}
|
||||
|
||||
func WithExecError(err error, inputs, mounts []solver.Result) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return &ExecError{
|
||||
ee := &ExecError{
|
||||
error: err,
|
||||
Inputs: inputs,
|
||||
Mounts: mounts,
|
||||
}
|
||||
runtime.SetFinalizer(ee, func(e *ExecError) {
|
||||
if !e.OwnerBorrowed {
|
||||
e.EachRef(func(r solver.Result) error {
|
||||
logrus.Warn("leaked execError detected and released")
|
||||
r.Release(context.TODO())
|
||||
return nil
|
||||
})
|
||||
}
|
||||
})
|
||||
return ee
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/moby/buildkit/frontend/gateway"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/llbsolver/errdefs"
|
||||
"github.com/moby/buildkit/util/compression"
|
||||
"github.com/moby/buildkit/util/entitlements"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
|
@ -143,20 +142,6 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro
|
|||
res.EachRef(func(ref solver.ResultProxy) error {
|
||||
eg.Go(func() error {
|
||||
_, err := ref.Result(ctx2)
|
||||
if err != nil {
|
||||
// Also release any results referenced by exec errors.
|
||||
var ee *errdefs.ExecError
|
||||
if errors.As(err, &ee) {
|
||||
ee.EachRef(func(res solver.Result) error {
|
||||
|
||||
workerRef, ok := res.Sys().(*worker.WorkerRef)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return workerRef.ImmutableRef.Release(ctx)
|
||||
})
|
||||
}
|
||||
}
|
||||
return err
|
||||
})
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue