Switch to Acquire API.

Signed-off-by: Vlad A. Ionescu <vladaionescu@users.noreply.github.com>
v0.9
Vlad A. Ionescu 2021-05-12 13:41:33 +03:00
parent 489e17aea9
commit b3cf7c43cf
18 changed files with 138 additions and 95 deletions

View File

@ -55,7 +55,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
)
@ -622,10 +621,6 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
"registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn),
"local": localremotecache.ResolveCacheImporterFunc(sessionManager),
}
var parallelismSem *semaphore.Weighted
if cfg.MaxParallelism > 0 {
parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism))
}
return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
@ -634,7 +629,6 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
ResolveCacheImporterFuncs: remoteCacheImporterFuncs,
CacheKeyStorage: cacheStorage,
Entitlements: cfg.Entitlements,
ParallelismSem: parallelismSem,
})
}

View File

@ -18,6 +18,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"golang.org/x/sync/semaphore"
)
const (
@ -225,11 +226,16 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
},
}
var parallelismSem *semaphore.Weighted
if common.config.MaxParallelism > 0 {
parallelismSem = semaphore.NewWeighted(int64(common.config.MaxParallelism))
}
snapshotter := ctd.DefaultSnapshotter
if cfg.Snapshotter != "" {
snapshotter = cfg.Snapshotter
}
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, ctd.WithTimeout(60*time.Second))
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, ctd.WithTimeout(60*time.Second))
if err != nil {
return nil, err
}

View File

@ -36,6 +36,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)
@ -276,7 +277,12 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
},
}
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile)
var parallelismSem *semaphore.Weighted
if common.config.MaxParallelism > 0 {
parallelismSem = semaphore.NewWeighted(int64(common.config.MaxParallelism))
}
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem)
if err != nil {
return nil, err
}

View File

@ -24,7 +24,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
)
@ -36,7 +35,6 @@ type Opt struct {
ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc
ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
Entitlements []string
ParallelismSem *semaphore.Weighted
}
type Controller struct { // TODO: ControlService
@ -54,7 +52,7 @@ func NewController(opt Opt) (*Controller, error) {
gatewayForwarder := controlgateway.NewGatewayForwarder()
solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder, opt.SessionManager, opt.Entitlements, opt.ParallelismSem)
solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder, opt.SessionManager, opt.Entitlements)
if err != nil {
return nil, errors.Wrap(err, "failed to create solver")
}

View File

@ -16,7 +16,6 @@ import (
digest "github.com/opencontainers/go-digest"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)
// ResolveOpFunc finds an Op implementation for a Vertex
@ -238,9 +237,8 @@ type Job struct {
}
type SolverOpt struct {
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
ParallelismSem *semaphore.Weighted
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
}
func NewSolver(opts SolverOpt) *Solver {
@ -768,13 +766,11 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
if s.execRes != nil || s.execErr != nil {
return s.execRes, s.execErr
}
if s.st.opts.ParallelismSem != nil && op.CountsAsParallelism() {
err := s.st.opts.ParallelismSem.Acquire(ctx, 1)
if err != nil {
return nil, errors.Wrap(err, "acquire parallelism sem")
}
defer s.st.opts.ParallelismSem.Release(1)
release, err := op.Acquire(ctx)
if err != nil {
return nil, errors.Wrap(err, "acquire op resources")
}
defer release()
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
ctx = withAncestorCacheOpts(ctx, s.st)

View File

@ -16,6 +16,7 @@ import (
func init() {
integration.InitOCIWorker()
integration.InitContainerdWorker()
}
func TestJobsIntegration(t *testing.T) {

View File

@ -142,6 +142,7 @@ func (b *buildOp) Exec(ctx context.Context, g session.Group, inputs []solver.Res
return []solver.Result{r}, err
}
func (b *buildOp) CountsAsParallelism() bool {
return false
func (b *buildOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
// buildOp itself does not count towards parallelism budget.
return func() {}, nil
}

View File

@ -29,33 +29,36 @@ import (
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
const execCacheType = "buildkit.exec.v0"
type execOp struct {
op *pb.ExecOp
cm cache.Manager
mm *mounts.MountManager
exec executor.Executor
w worker.Worker
platform *pb.Platform
numInputs int
op *pb.ExecOp
cm cache.Manager
mm *mounts.MountManager
exec executor.Executor
w worker.Worker
platform *pb.Platform
numInputs int
parallelism *semaphore.Weighted
}
func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) {
func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, parallelism *semaphore.Weighted, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) {
if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil {
return nil, err
}
name := fmt.Sprintf("exec %s", strings.Join(op.Exec.Meta.Args, " "))
return &execOp{
op: op.Exec,
mm: mounts.NewMountManager(name, cm, sm, md),
cm: cm,
exec: exec,
numInputs: len(v.Inputs()),
w: w,
platform: platform,
op: op.Exec,
mm: mounts.NewMountManager(name, cm, sm, md),
cm: cm,
exec: exec,
numInputs: len(v.Inputs()),
w: w,
platform: platform,
parallelism: parallelism,
}, nil
}
@ -389,6 +392,15 @@ func parseExtraHosts(ips []*pb.HostIP) ([]executor.HostIP, error) {
return out, nil
}
func (e *execOp) CountsAsParallelism() bool {
return true
func (e *execOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
if e.parallelism == nil {
return func() {}, nil
}
err := e.parallelism.Acquire(ctx, 1)
if err != nil {
return nil, err
}
return func() {
e.parallelism.Release(1)
}, nil
}

View File

@ -24,28 +24,31 @@ import (
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
const fileCacheType = "buildkit.file.v0"
type fileOp struct {
op *pb.FileOp
md *metadata.Store
w worker.Worker
solver *FileOpSolver
numInputs int
op *pb.FileOp
md *metadata.Store
w worker.Worker
solver *FileOpSolver
numInputs int
parallelism *semaphore.Weighted
}
func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, md *metadata.Store, w worker.Worker) (solver.Op, error) {
func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, parallelism *semaphore.Weighted, md *metadata.Store, w worker.Worker) (solver.Op, error) {
if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil {
return nil, err
}
return &fileOp{
op: op.File,
md: md,
numInputs: len(v.Inputs()),
w: w,
solver: NewFileOpSolver(w, &file.Backend{}, file.NewRefManager(cm)),
op: op.File,
md: md,
numInputs: len(v.Inputs()),
w: w,
solver: NewFileOpSolver(w, &file.Backend{}, file.NewRefManager(cm)),
parallelism: parallelism,
}, nil
}
@ -179,8 +182,17 @@ func (f *fileOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
return outResults, nil
}
func (f *fileOp) CountsAsParallelism() bool {
return true
func (f *fileOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
if f.parallelism == nil {
return func() {}, nil
}
err := f.parallelism.Acquire(ctx, 1)
if err != nil {
return nil, err
}
return func() {
f.parallelism.Release(1)
}, nil
}
func addSelector(m map[int]map[llbsolver.Selector]struct{}, idx int, sel string, wildcard, followLinks bool) {

View File

@ -12,32 +12,35 @@ import (
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"golang.org/x/sync/semaphore"
)
const sourceCacheType = "buildkit.source.v0"
type sourceOp struct {
mu sync.Mutex
op *pb.Op_Source
platform *pb.Platform
sm *source.Manager
src source.SourceInstance
sessM *session.Manager
w worker.Worker
vtx solver.Vertex
mu sync.Mutex
op *pb.Op_Source
platform *pb.Platform
sm *source.Manager
src source.SourceInstance
sessM *session.Manager
w worker.Worker
vtx solver.Vertex
parallelism *semaphore.Weighted
}
func NewSourceOp(vtx solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, sessM *session.Manager, w worker.Worker) (solver.Op, error) {
func NewSourceOp(vtx solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, parallelism *semaphore.Weighted, sessM *session.Manager, w worker.Worker) (solver.Op, error) {
if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil {
return nil, err
}
return &sourceOp{
op: op,
sm: sm,
w: w,
sessM: sessM,
platform: platform,
vtx: vtx,
op: op,
sm: sm,
w: w,
sessM: sessM,
platform: platform,
vtx: vtx,
parallelism: parallelism,
}, nil
}
@ -94,6 +97,15 @@ func (s *sourceOp) Exec(ctx context.Context, g session.Group, _ []solver.Result)
return []solver.Result{worker.NewWorkerRefResult(ref, s.w)}, nil
}
func (s *sourceOp) CountsAsParallelism() bool {
return true
func (s *sourceOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
if s.parallelism == nil {
return func() {}, nil
}
err := s.parallelism.Acquire(ctx, 1)
if err != nil {
return nil, err
}
return func() {
s.parallelism.Release(1)
}, nil
}

View File

@ -23,7 +23,6 @@ import (
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
const keyEntitlements = "llb.entitlements"
@ -49,7 +48,7 @@ type Solver struct {
entitlements []string
}
func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI map[string]remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder, sm *session.Manager, ents []string, parallelismSem *semaphore.Weighted) (*Solver, error) {
func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI map[string]remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder, sm *session.Manager, ents []string) (*Solver, error) {
s := &Solver{
workerController: wc,
resolveWorker: defaultResolver(wc),
@ -62,9 +61,8 @@ func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.Cac
}
s.solver = solver.NewSolver(solver.SolverOpt{
ResolveOpFunc: s.resolver(),
DefaultCache: cache,
ParallelismSem: parallelismSem,
ResolveOpFunc: s.resolver(),
DefaultCache: cache,
})
return s, nil
}

View File

@ -3511,8 +3511,8 @@ func (v *vertex) Exec(ctx context.Context, g session.Group, inputs []Result) (ou
return []Result{&dummyResult{id: identity.NewID(), value: v.opt.value}}, nil
}
func (v *vertex) CountsAsParallelism() bool {
return false
func (v *vertex) Acquire(ctx context.Context) (ReleaseFunc, error) {
return func() {}, nil
}
func (v *vertex) makeCacheMap() *CacheMap {
@ -3560,8 +3560,8 @@ func (v *vertexConst) Exec(ctx context.Context, g session.Group, inputs []Result
return []Result{&dummyResult{id: identity.NewID(), intValue: v.value}}, nil
}
func (v *vertexConst) CountsAsParallelism() bool {
return false
func (v *vertexConst) Acquire(ctx context.Context) (ReleaseFunc, error) {
return func() {}, nil
}
// vtxSum returns a vertex that ourputs sum of its inputs plus a constant
@ -3599,8 +3599,8 @@ func (v *vertexSum) Exec(ctx context.Context, g session.Group, inputs []Result)
return []Result{&dummyResult{id: identity.NewID(), intValue: s}}, nil
}
func (v *vertexSum) CountsAsParallelism() bool {
return false
func (v *vertexSum) Acquire(ctx context.Context) (ReleaseFunc, error) {
return func() {}, nil
}
func vtxSubBuild(g Edge, opt vtxOpt) *vertexSubBuild {
@ -3634,8 +3634,8 @@ func (v *vertexSubBuild) Exec(ctx context.Context, g session.Group, inputs []Res
return []Result{res}, nil
}
func (v *vertexSubBuild) CountsAsParallelism() bool {
return false
func (v *vertexSubBuild) Acquire(ctx context.Context) (ReleaseFunc, error) {
return func() {}, nil
}
//nolint:unused

View File

@ -135,6 +135,8 @@ type CacheLink struct {
Selector digest.Digest `json:",omitempty"`
}
type ReleaseFunc func()
// Op defines how the solver can evaluate the properties of a vertex operation.
// An op is executed in the worker, and is retrieved from the vertex by the
// value of `vertex.Sys()`. The solver is configured with a resolve function to
@ -147,9 +149,8 @@ type Op interface {
// Exec runs an operation given results from previous operations.
Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error)
// CountsAsParallelism specifies whether the `Op` should be subject to the parallelism
// semaphore.
CountsAsParallelism() bool
// Acquire acquires the necessary resources to execute the `Op`.
Acquire(ctx context.Context) (release ReleaseFunc, err error)
}
type ResultBasedCacheFunc func(context.Context, Result, session.Group) (digest.Digest, error)

View File

@ -50,6 +50,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/semaphore"
)
const labelCreatedAt = "buildkit/createdat"
@ -74,6 +75,7 @@ type WorkerOpt struct {
IdentityMapping *idtools.IdentityMapping
LeaseManager leases.Manager
GarbageCollect func(context.Context) (gc.Stats, error)
ParallelismSem *semaphore.Weighted
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
@ -261,11 +263,11 @@ func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *se
if baseOp, ok := v.Sys().(*pb.Op); ok {
switch op := baseOp.Op.(type) {
case *pb.Op_Source:
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, sm, w)
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, w.ParallelismSem, sm, w)
case *pb.Op_Exec:
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, sm, w.WorkerOpt.MetadataStore, w.WorkerOpt.Executor, w)
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, w.ParallelismSem, sm, w.WorkerOpt.MetadataStore, w.WorkerOpt.Executor, w)
case *pb.Op_File:
return ops.NewFileOp(v, op, w.CacheMgr, w.WorkerOpt.MetadataStore, w)
return ops.NewFileOp(v, op, w.CacheMgr, w.ParallelismSem, w.WorkerOpt.MetadataStore, w)
case *pb.Op_Build:
return ops.NewBuildOp(v, op, s, w)
default:

View File

@ -22,19 +22,20 @@ import (
"github.com/moby/buildkit/worker/base"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)
// NewWorkerOpt creates a WorkerOpt.
func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
opts = append(opts, containerd.WithDefaultNamespace(ns))
client, err := containerd.New(address, opts...)
if err != nil {
return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address)
}
return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile)
return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile, parallelismSem)
}
func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string) (base.WorkerOpt, error) {
func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) {
if strings.Contains(snapshotterName, "/") {
return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName)
}
@ -123,6 +124,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
Platforms: platforms,
LeaseManager: lm,
GarbageCollect: gc,
ParallelismSem: parallelismSem,
}
return opt, nil
}

View File

@ -30,7 +30,7 @@ func newWorkerOpt(t *testing.T, addr string) (base.WorkerOpt, func()) {
tmpdir, err := ioutil.TempDir("", "workertest")
require.NoError(t, err)
cleanup := func() { os.RemoveAll(tmpdir) }
workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "")
workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "", nil)
require.NoError(t, err)
return workerOpt, cleanup
}

View File

@ -23,6 +23,7 @@ import (
"github.com/moby/buildkit/worker/base"
specs "github.com/opencontainers/image-spec/specs-go/v1"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/semaphore"
)
// SnapshotterFactory instantiates a snapshotter
@ -32,7 +33,7 @@ type SnapshotterFactory struct {
}
// NewWorkerOpt creates a WorkerOpt.
func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string) (base.WorkerOpt, error) {
func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) {
var opt base.WorkerOpt
name := "runc-" + snFactory.Name
root = filepath.Join(root, name)
@ -124,6 +125,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
IdentityMapping: idmap,
LeaseManager: lm,
GarbageCollect: mdb.GarbageCollect,
ParallelismSem: parallelismSem,
}
return opt, nil
}

View File

@ -40,7 +40,7 @@ func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, fu
},
}
rootless := false
workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "")
workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "", nil)
require.NoError(t, err)
return workerOpt, cleanup