From 489e17aea91829a4ada48cb09e1119606802dc5e Mon Sep 17 00:00:00 2001 From: "Vlad A. Ionescu" Date: Mon, 29 Mar 2021 14:23:14 -0700 Subject: [PATCH 1/3] Add a configuration item to limit parallelism. Signed-off-by: Vlad A. Ionescu --- cmd/buildkitd/config/config.go | 2 + cmd/buildkitd/main.go | 6 ++ control/control.go | 4 +- solver/jobs.go | 13 +++- solver/jobs_test.go | 106 +++++++++++++++++++++++++++++++++ solver/llbsolver/ops/build.go | 4 ++ solver/llbsolver/ops/exec.go | 4 ++ solver/llbsolver/ops/file.go | 4 ++ solver/llbsolver/ops/source.go | 4 ++ solver/llbsolver/solver.go | 8 ++- solver/scheduler_test.go | 16 +++++ solver/types.go | 4 ++ 12 files changed, 169 insertions(+), 6 deletions(-) create mode 100644 solver/jobs_test.go diff --git a/cmd/buildkitd/config/config.go b/cmd/buildkitd/config/config.go index 1841fefe..a3485c02 100644 --- a/cmd/buildkitd/config/config.go +++ b/cmd/buildkitd/config/config.go @@ -22,6 +22,8 @@ type Config struct { Registries map[string]RegistryConfig `toml:"registry"` DNS *DNSConfig `toml:"dns"` + + MaxParallelism int `toml:"max-parallelism"` } type GRPCConfig struct { diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index e1b62c3a..d9ccc556 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -55,6 +55,7 @@ import ( "github.com/sirupsen/logrus" "github.com/urfave/cli" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" ) @@ -621,6 +622,10 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont "registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn), "local": localremotecache.ResolveCacheImporterFunc(sessionManager), } + var parallelismSem *semaphore.Weighted + if cfg.MaxParallelism > 0 { + parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism)) + } return control.NewController(control.Opt{ SessionManager: sessionManager, WorkerController: wc, @@ -629,6 +634,7 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont ResolveCacheImporterFuncs: remoteCacheImporterFuncs, CacheKeyStorage: cacheStorage, Entitlements: cfg.Entitlements, + ParallelismSem: parallelismSem, }) } diff --git a/control/control.go b/control/control.go index 048e6966..44f5bd5f 100644 --- a/control/control.go +++ b/control/control.go @@ -24,6 +24,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" ) @@ -35,6 +36,7 @@ type Opt struct { ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc Entitlements []string + ParallelismSem *semaphore.Weighted } type Controller struct { // TODO: ControlService @@ -52,7 +54,7 @@ func NewController(opt Opt) (*Controller, error) { gatewayForwarder := controlgateway.NewGatewayForwarder() - solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder, opt.SessionManager, opt.Entitlements) + solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder, opt.SessionManager, opt.Entitlements, opt.ParallelismSem) if err != nil { return nil, errors.Wrap(err, "failed to create solver") } diff --git a/solver/jobs.go b/solver/jobs.go index 1942fdf1..97b59cfe 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -16,6 +16,7 @@ import ( digest "github.com/opencontainers/go-digest" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "golang.org/x/sync/semaphore" ) // ResolveOpFunc finds an Op implementation for a Vertex @@ -237,8 +238,9 @@ type Job struct { } type SolverOpt struct { - ResolveOpFunc ResolveOpFunc - DefaultCache CacheManager + ResolveOpFunc ResolveOpFunc + DefaultCache CacheManager + ParallelismSem *semaphore.Weighted } func NewSolver(opts SolverOpt) *Solver { @@ -766,6 +768,13 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, if s.execRes != nil || s.execErr != nil { return s.execRes, s.execErr } + if s.st.opts.ParallelismSem != nil && op.CountsAsParallelism() { + err := s.st.opts.ParallelismSem.Acquire(ctx, 1) + if err != nil { + return nil, errors.Wrap(err, "acquire parallelism sem") + } + defer s.st.opts.ParallelismSem.Release(1) + } ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan) ctx = withAncestorCacheOpts(ctx, s.st) diff --git a/solver/jobs_test.go b/solver/jobs_test.go new file mode 100644 index 00000000..d90b6c1a --- /dev/null +++ b/solver/jobs_test.go @@ -0,0 +1,106 @@ +package solver + +import ( + "context" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/client/llb" + "github.com/moby/buildkit/util/testutil/integration" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func init() { + integration.InitOCIWorker() +} + +func TestJobsIntegration(t *testing.T) { + mirrors := integration.WithMirroredImages(integration.OfficialImages("busybox:latest")) + integration.Run(t, []integration.Test{ + testParallelism, + }, + mirrors, + integration.WithMatrix("max-parallelism", map[string]interface{}{ + "single": maxParallelismSingle, + "unlimited": maxParallelismUnlimited, + }), + ) +} + +func testParallelism(t *testing.T, sb integration.Sandbox) { + ctx := context.TODO() + + c, err := client.New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + cacheMount := llb.AddMount( + "/shared", llb.Scratch(), + llb.AsPersistentCacheDir("shared", llb.CacheMountShared)) + run1 := llb.Image("busybox:latest").Run( + llb.Args([]string{ + "/bin/sh", "-c", + "touch /shared/signal1 && i=0; while [ ! -f /shared/signal2 ] && [ $i -lt 10 ]; do i=$((i+1)); sleep 1; done", + }), + cacheMount, + ).Root() + d1, err := run1.Marshal(ctx) + require.NoError(t, err) + run2 := llb.Image("busybox:latest").Run( + llb.Args([]string{ + "/bin/sh", "-c", + "touch /shared/signal2 && i=0; while [ ! -f /shared/signal1 ] && [ $i -lt 10 ]; do i=$((i+1)); sleep 1; done", + }), + cacheMount, + ).Root() + d2, err := run2.Marshal(ctx) + require.NoError(t, err) + + timeStart := time.Now() + eg, egCtx := errgroup.WithContext(ctx) + tmpDir, err := ioutil.TempDir("", "solver-jobs-test-") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + solveOpt := client.SolveOpt{ + LocalDirs: map[string]string{"cache": tmpDir}, + } + eg.Go(func() error { + _, err := c.Solve(egCtx, d1, solveOpt, nil) + return err + }) + eg.Go(func() error { + _, err := c.Solve(egCtx, d2, solveOpt, nil) + return err + }) + err = eg.Wait() + require.NoError(t, err) + + elapsed := time.Since(timeStart) + + maxParallelism := sb.Value("max-parallelism") + if maxParallelism == maxParallelismSingle { + require.Greater(t, elapsed, 10*time.Second, "parallelism not restricted") + } else if maxParallelism == maxParallelismUnlimited { + require.Less(t, elapsed, 10*time.Second, "parallelism hindered") + } +} + +type parallelismSetterSingle struct{} + +func (*parallelismSetterSingle) UpdateConfigFile(in string) string { + return in + "\n\nmax-parallelism = 1\n" +} + +var maxParallelismSingle integration.ConfigUpdater = ¶llelismSetterSingle{} + +type parallelismSetterUnlimited struct{} + +func (*parallelismSetterUnlimited) UpdateConfigFile(in string) string { + return in +} + +var maxParallelismUnlimited integration.ConfigUpdater = ¶llelismSetterUnlimited{} diff --git a/solver/llbsolver/ops/build.go b/solver/llbsolver/ops/build.go index 5e7fbc7a..fe1f6607 100644 --- a/solver/llbsolver/ops/build.go +++ b/solver/llbsolver/ops/build.go @@ -141,3 +141,7 @@ func (b *buildOp) Exec(ctx context.Context, g session.Group, inputs []solver.Res return []solver.Result{r}, err } + +func (b *buildOp) CountsAsParallelism() bool { + return false +} diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index d9f9b0ac..f13c5def 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -388,3 +388,7 @@ func parseExtraHosts(ips []*pb.HostIP) ([]executor.HostIP, error) { } return out, nil } + +func (e *execOp) CountsAsParallelism() bool { + return true +} diff --git a/solver/llbsolver/ops/file.go b/solver/llbsolver/ops/file.go index 554532c1..54fb3a79 100644 --- a/solver/llbsolver/ops/file.go +++ b/solver/llbsolver/ops/file.go @@ -179,6 +179,10 @@ func (f *fileOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu return outResults, nil } +func (f *fileOp) CountsAsParallelism() bool { + return true +} + func addSelector(m map[int]map[llbsolver.Selector]struct{}, idx int, sel string, wildcard, followLinks bool) { mm, ok := m[idx] if !ok { diff --git a/solver/llbsolver/ops/source.go b/solver/llbsolver/ops/source.go index 4f8b6694..0ec76146 100644 --- a/solver/llbsolver/ops/source.go +++ b/solver/llbsolver/ops/source.go @@ -93,3 +93,7 @@ func (s *sourceOp) Exec(ctx context.Context, g session.Group, _ []solver.Result) } return []solver.Result{worker.NewWorkerRefResult(ref, s.w)}, nil } + +func (s *sourceOp) CountsAsParallelism() bool { + return true +} diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 31e8afca..d6925b5c 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -23,6 +23,7 @@ import ( digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) const keyEntitlements = "llb.entitlements" @@ -48,7 +49,7 @@ type Solver struct { entitlements []string } -func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI map[string]remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder, sm *session.Manager, ents []string) (*Solver, error) { +func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI map[string]remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder, sm *session.Manager, ents []string, parallelismSem *semaphore.Weighted) (*Solver, error) { s := &Solver{ workerController: wc, resolveWorker: defaultResolver(wc), @@ -61,8 +62,9 @@ func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.Cac } s.solver = solver.NewSolver(solver.SolverOpt{ - ResolveOpFunc: s.resolver(), - DefaultCache: cache, + ResolveOpFunc: s.resolver(), + DefaultCache: cache, + ParallelismSem: parallelismSem, }) return s, nil } diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 4c58f3e1..77b33933 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -3511,6 +3511,10 @@ func (v *vertex) Exec(ctx context.Context, g session.Group, inputs []Result) (ou return []Result{&dummyResult{id: identity.NewID(), value: v.opt.value}}, nil } +func (v *vertex) CountsAsParallelism() bool { + return false +} + func (v *vertex) makeCacheMap() *CacheMap { m := &CacheMap{ Digest: digest.FromBytes([]byte(fmt.Sprintf("seed:%s", v.opt.cacheKeySeed))), @@ -3556,6 +3560,10 @@ func (v *vertexConst) Exec(ctx context.Context, g session.Group, inputs []Result return []Result{&dummyResult{id: identity.NewID(), intValue: v.value}}, nil } +func (v *vertexConst) CountsAsParallelism() bool { + return false +} + // vtxSum returns a vertex that ourputs sum of its inputs plus a constant func vtxSum(v int, opt vtxOpt) *vertexSum { if opt.cacheKeySeed == "" { @@ -3591,6 +3599,10 @@ func (v *vertexSum) Exec(ctx context.Context, g session.Group, inputs []Result) return []Result{&dummyResult{id: identity.NewID(), intValue: s}}, nil } +func (v *vertexSum) CountsAsParallelism() bool { + return false +} + func vtxSubBuild(g Edge, opt vtxOpt) *vertexSubBuild { if opt.cacheKeySeed == "" { opt.cacheKeySeed = fmt.Sprintf("sum-%s", identity.NewID()) @@ -3622,6 +3634,10 @@ func (v *vertexSubBuild) Exec(ctx context.Context, g session.Group, inputs []Res return []Result{res}, nil } +func (v *vertexSubBuild) CountsAsParallelism() bool { + return false +} + //nolint:unused func printGraph(e Edge, pfx string) { name := e.Vertex.Name() diff --git a/solver/types.go b/solver/types.go index f9de6a93..0bfcc44f 100644 --- a/solver/types.go +++ b/solver/types.go @@ -146,6 +146,10 @@ type Op interface { // Exec runs an operation given results from previous operations. Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error) + + // CountsAsParallelism specifies whether the `Op` should be subject to the parallelism + // semaphore. + CountsAsParallelism() bool } type ResultBasedCacheFunc func(context.Context, Result, session.Group) (digest.Digest, error) From b3cf7c43cfefdfd7a945002c0e76b54e346ab6cf Mon Sep 17 00:00:00 2001 From: "Vlad A. Ionescu" Date: Wed, 12 May 2021 13:41:33 +0300 Subject: [PATCH 2/3] Switch to Acquire API. Signed-off-by: Vlad A. Ionescu --- cmd/buildkitd/main.go | 6 ---- cmd/buildkitd/main_containerd_worker.go | 8 ++++- cmd/buildkitd/main_oci_worker.go | 8 ++++- control/control.go | 4 +-- solver/jobs.go | 16 ++++----- solver/jobs_test.go | 1 + solver/llbsolver/ops/build.go | 5 +-- solver/llbsolver/ops/exec.go | 46 ++++++++++++++++--------- solver/llbsolver/ops/file.go | 38 +++++++++++++------- solver/llbsolver/ops/source.go | 46 ++++++++++++++++--------- solver/llbsolver/solver.go | 8 ++--- solver/scheduler_test.go | 16 ++++----- solver/types.go | 7 ++-- worker/base/worker.go | 8 +++-- worker/containerd/containerd.go | 8 +++-- worker/containerd/containerd_test.go | 2 +- worker/runc/runc.go | 4 ++- worker/runc/runc_test.go | 2 +- 18 files changed, 138 insertions(+), 95 deletions(-) diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index d9ccc556..e1b62c3a 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -55,7 +55,6 @@ import ( "github.com/sirupsen/logrus" "github.com/urfave/cli" "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" "google.golang.org/grpc" ) @@ -622,10 +621,6 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont "registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn), "local": localremotecache.ResolveCacheImporterFunc(sessionManager), } - var parallelismSem *semaphore.Weighted - if cfg.MaxParallelism > 0 { - parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism)) - } return control.NewController(control.Opt{ SessionManager: sessionManager, WorkerController: wc, @@ -634,7 +629,6 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont ResolveCacheImporterFuncs: remoteCacheImporterFuncs, CacheKeyStorage: cacheStorage, Entitlements: cfg.Entitlements, - ParallelismSem: parallelismSem, }) } diff --git a/cmd/buildkitd/main_containerd_worker.go b/cmd/buildkitd/main_containerd_worker.go index cde0f8ae..b42b6c24 100644 --- a/cmd/buildkitd/main_containerd_worker.go +++ b/cmd/buildkitd/main_containerd_worker.go @@ -18,6 +18,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" + "golang.org/x/sync/semaphore" ) const ( @@ -225,11 +226,16 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([ }, } + var parallelismSem *semaphore.Weighted + if common.config.MaxParallelism > 0 { + parallelismSem = semaphore.NewWeighted(int64(common.config.MaxParallelism)) + } + snapshotter := ctd.DefaultSnapshotter if cfg.Snapshotter != "" { snapshotter = cfg.Snapshotter } - opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, ctd.WithTimeout(60*time.Second)) + opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, ctd.WithTimeout(60*time.Second)) if err != nil { return nil, err } diff --git a/cmd/buildkitd/main_oci_worker.go b/cmd/buildkitd/main_oci_worker.go index 450dc3b3..ccc9d3b0 100644 --- a/cmd/buildkitd/main_oci_worker.go +++ b/cmd/buildkitd/main_oci_worker.go @@ -36,6 +36,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/backoff" ) @@ -276,7 +277,12 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker }, } - opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile) + var parallelismSem *semaphore.Weighted + if common.config.MaxParallelism > 0 { + parallelismSem = semaphore.NewWeighted(int64(common.config.MaxParallelism)) + } + + opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem) if err != nil { return nil, err } diff --git a/control/control.go b/control/control.go index 44f5bd5f..048e6966 100644 --- a/control/control.go +++ b/control/control.go @@ -24,7 +24,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" "google.golang.org/grpc" ) @@ -36,7 +35,6 @@ type Opt struct { ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc Entitlements []string - ParallelismSem *semaphore.Weighted } type Controller struct { // TODO: ControlService @@ -54,7 +52,7 @@ func NewController(opt Opt) (*Controller, error) { gatewayForwarder := controlgateway.NewGatewayForwarder() - solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder, opt.SessionManager, opt.Entitlements, opt.ParallelismSem) + solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder, opt.SessionManager, opt.Entitlements) if err != nil { return nil, errors.Wrap(err, "failed to create solver") } diff --git a/solver/jobs.go b/solver/jobs.go index 97b59cfe..20479818 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -16,7 +16,6 @@ import ( digest "github.com/opencontainers/go-digest" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "golang.org/x/sync/semaphore" ) // ResolveOpFunc finds an Op implementation for a Vertex @@ -238,9 +237,8 @@ type Job struct { } type SolverOpt struct { - ResolveOpFunc ResolveOpFunc - DefaultCache CacheManager - ParallelismSem *semaphore.Weighted + ResolveOpFunc ResolveOpFunc + DefaultCache CacheManager } func NewSolver(opts SolverOpt) *Solver { @@ -768,13 +766,11 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, if s.execRes != nil || s.execErr != nil { return s.execRes, s.execErr } - if s.st.opts.ParallelismSem != nil && op.CountsAsParallelism() { - err := s.st.opts.ParallelismSem.Acquire(ctx, 1) - if err != nil { - return nil, errors.Wrap(err, "acquire parallelism sem") - } - defer s.st.opts.ParallelismSem.Release(1) + release, err := op.Acquire(ctx) + if err != nil { + return nil, errors.Wrap(err, "acquire op resources") } + defer release() ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan) ctx = withAncestorCacheOpts(ctx, s.st) diff --git a/solver/jobs_test.go b/solver/jobs_test.go index d90b6c1a..af0f7011 100644 --- a/solver/jobs_test.go +++ b/solver/jobs_test.go @@ -16,6 +16,7 @@ import ( func init() { integration.InitOCIWorker() + integration.InitContainerdWorker() } func TestJobsIntegration(t *testing.T) { diff --git a/solver/llbsolver/ops/build.go b/solver/llbsolver/ops/build.go index fe1f6607..39d2a770 100644 --- a/solver/llbsolver/ops/build.go +++ b/solver/llbsolver/ops/build.go @@ -142,6 +142,7 @@ func (b *buildOp) Exec(ctx context.Context, g session.Group, inputs []solver.Res return []solver.Result{r}, err } -func (b *buildOp) CountsAsParallelism() bool { - return false +func (b *buildOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) { + // buildOp itself does not count towards parallelism budget. + return func() {}, nil } diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index f13c5def..b5ef0f16 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -29,33 +29,36 @@ import ( specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" ) const execCacheType = "buildkit.exec.v0" type execOp struct { - op *pb.ExecOp - cm cache.Manager - mm *mounts.MountManager - exec executor.Executor - w worker.Worker - platform *pb.Platform - numInputs int + op *pb.ExecOp + cm cache.Manager + mm *mounts.MountManager + exec executor.Executor + w worker.Worker + platform *pb.Platform + numInputs int + parallelism *semaphore.Weighted } -func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) { +func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, parallelism *semaphore.Weighted, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) { if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil { return nil, err } name := fmt.Sprintf("exec %s", strings.Join(op.Exec.Meta.Args, " ")) return &execOp{ - op: op.Exec, - mm: mounts.NewMountManager(name, cm, sm, md), - cm: cm, - exec: exec, - numInputs: len(v.Inputs()), - w: w, - platform: platform, + op: op.Exec, + mm: mounts.NewMountManager(name, cm, sm, md), + cm: cm, + exec: exec, + numInputs: len(v.Inputs()), + w: w, + platform: platform, + parallelism: parallelism, }, nil } @@ -389,6 +392,15 @@ func parseExtraHosts(ips []*pb.HostIP) ([]executor.HostIP, error) { return out, nil } -func (e *execOp) CountsAsParallelism() bool { - return true +func (e *execOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) { + if e.parallelism == nil { + return func() {}, nil + } + err := e.parallelism.Acquire(ctx, 1) + if err != nil { + return nil, err + } + return func() { + e.parallelism.Release(1) + }, nil } diff --git a/solver/llbsolver/ops/file.go b/solver/llbsolver/ops/file.go index 54fb3a79..aeabda9b 100644 --- a/solver/llbsolver/ops/file.go +++ b/solver/llbsolver/ops/file.go @@ -24,28 +24,31 @@ import ( digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) const fileCacheType = "buildkit.file.v0" type fileOp struct { - op *pb.FileOp - md *metadata.Store - w worker.Worker - solver *FileOpSolver - numInputs int + op *pb.FileOp + md *metadata.Store + w worker.Worker + solver *FileOpSolver + numInputs int + parallelism *semaphore.Weighted } -func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, md *metadata.Store, w worker.Worker) (solver.Op, error) { +func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, parallelism *semaphore.Weighted, md *metadata.Store, w worker.Worker) (solver.Op, error) { if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil { return nil, err } return &fileOp{ - op: op.File, - md: md, - numInputs: len(v.Inputs()), - w: w, - solver: NewFileOpSolver(w, &file.Backend{}, file.NewRefManager(cm)), + op: op.File, + md: md, + numInputs: len(v.Inputs()), + w: w, + solver: NewFileOpSolver(w, &file.Backend{}, file.NewRefManager(cm)), + parallelism: parallelism, }, nil } @@ -179,8 +182,17 @@ func (f *fileOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu return outResults, nil } -func (f *fileOp) CountsAsParallelism() bool { - return true +func (f *fileOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) { + if f.parallelism == nil { + return func() {}, nil + } + err := f.parallelism.Acquire(ctx, 1) + if err != nil { + return nil, err + } + return func() { + f.parallelism.Release(1) + }, nil } func addSelector(m map[int]map[llbsolver.Selector]struct{}, idx int, sel string, wildcard, followLinks bool) { diff --git a/solver/llbsolver/ops/source.go b/solver/llbsolver/ops/source.go index 0ec76146..6cec6320 100644 --- a/solver/llbsolver/ops/source.go +++ b/solver/llbsolver/ops/source.go @@ -12,32 +12,35 @@ import ( "github.com/moby/buildkit/source" "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" + "golang.org/x/sync/semaphore" ) const sourceCacheType = "buildkit.source.v0" type sourceOp struct { - mu sync.Mutex - op *pb.Op_Source - platform *pb.Platform - sm *source.Manager - src source.SourceInstance - sessM *session.Manager - w worker.Worker - vtx solver.Vertex + mu sync.Mutex + op *pb.Op_Source + platform *pb.Platform + sm *source.Manager + src source.SourceInstance + sessM *session.Manager + w worker.Worker + vtx solver.Vertex + parallelism *semaphore.Weighted } -func NewSourceOp(vtx solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, sessM *session.Manager, w worker.Worker) (solver.Op, error) { +func NewSourceOp(vtx solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, parallelism *semaphore.Weighted, sessM *session.Manager, w worker.Worker) (solver.Op, error) { if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil { return nil, err } return &sourceOp{ - op: op, - sm: sm, - w: w, - sessM: sessM, - platform: platform, - vtx: vtx, + op: op, + sm: sm, + w: w, + sessM: sessM, + platform: platform, + vtx: vtx, + parallelism: parallelism, }, nil } @@ -94,6 +97,15 @@ func (s *sourceOp) Exec(ctx context.Context, g session.Group, _ []solver.Result) return []solver.Result{worker.NewWorkerRefResult(ref, s.w)}, nil } -func (s *sourceOp) CountsAsParallelism() bool { - return true +func (s *sourceOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) { + if s.parallelism == nil { + return func() {}, nil + } + err := s.parallelism.Acquire(ctx, 1) + if err != nil { + return nil, err + } + return func() { + s.parallelism.Release(1) + }, nil } diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index d6925b5c..31e8afca 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -23,7 +23,6 @@ import ( digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" ) const keyEntitlements = "llb.entitlements" @@ -49,7 +48,7 @@ type Solver struct { entitlements []string } -func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI map[string]remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder, sm *session.Manager, ents []string, parallelismSem *semaphore.Weighted) (*Solver, error) { +func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI map[string]remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder, sm *session.Manager, ents []string) (*Solver, error) { s := &Solver{ workerController: wc, resolveWorker: defaultResolver(wc), @@ -62,9 +61,8 @@ func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.Cac } s.solver = solver.NewSolver(solver.SolverOpt{ - ResolveOpFunc: s.resolver(), - DefaultCache: cache, - ParallelismSem: parallelismSem, + ResolveOpFunc: s.resolver(), + DefaultCache: cache, }) return s, nil } diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 77b33933..92d1d626 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -3511,8 +3511,8 @@ func (v *vertex) Exec(ctx context.Context, g session.Group, inputs []Result) (ou return []Result{&dummyResult{id: identity.NewID(), value: v.opt.value}}, nil } -func (v *vertex) CountsAsParallelism() bool { - return false +func (v *vertex) Acquire(ctx context.Context) (ReleaseFunc, error) { + return func() {}, nil } func (v *vertex) makeCacheMap() *CacheMap { @@ -3560,8 +3560,8 @@ func (v *vertexConst) Exec(ctx context.Context, g session.Group, inputs []Result return []Result{&dummyResult{id: identity.NewID(), intValue: v.value}}, nil } -func (v *vertexConst) CountsAsParallelism() bool { - return false +func (v *vertexConst) Acquire(ctx context.Context) (ReleaseFunc, error) { + return func() {}, nil } // vtxSum returns a vertex that ourputs sum of its inputs plus a constant @@ -3599,8 +3599,8 @@ func (v *vertexSum) Exec(ctx context.Context, g session.Group, inputs []Result) return []Result{&dummyResult{id: identity.NewID(), intValue: s}}, nil } -func (v *vertexSum) CountsAsParallelism() bool { - return false +func (v *vertexSum) Acquire(ctx context.Context) (ReleaseFunc, error) { + return func() {}, nil } func vtxSubBuild(g Edge, opt vtxOpt) *vertexSubBuild { @@ -3634,8 +3634,8 @@ func (v *vertexSubBuild) Exec(ctx context.Context, g session.Group, inputs []Res return []Result{res}, nil } -func (v *vertexSubBuild) CountsAsParallelism() bool { - return false +func (v *vertexSubBuild) Acquire(ctx context.Context) (ReleaseFunc, error) { + return func() {}, nil } //nolint:unused diff --git a/solver/types.go b/solver/types.go index 0bfcc44f..a8776069 100644 --- a/solver/types.go +++ b/solver/types.go @@ -135,6 +135,8 @@ type CacheLink struct { Selector digest.Digest `json:",omitempty"` } +type ReleaseFunc func() + // Op defines how the solver can evaluate the properties of a vertex operation. // An op is executed in the worker, and is retrieved from the vertex by the // value of `vertex.Sys()`. The solver is configured with a resolve function to @@ -147,9 +149,8 @@ type Op interface { // Exec runs an operation given results from previous operations. Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error) - // CountsAsParallelism specifies whether the `Op` should be subject to the parallelism - // semaphore. - CountsAsParallelism() bool + // Acquire acquires the necessary resources to execute the `Op`. + Acquire(ctx context.Context) (release ReleaseFunc, err error) } type ResultBasedCacheFunc func(context.Context, Result, session.Group) (digest.Digest, error) diff --git a/worker/base/worker.go b/worker/base/worker.go index 73465db3..e078d105 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -50,6 +50,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" + "golang.org/x/sync/semaphore" ) const labelCreatedAt = "buildkit/createdat" @@ -74,6 +75,7 @@ type WorkerOpt struct { IdentityMapping *idtools.IdentityMapping LeaseManager leases.Manager GarbageCollect func(context.Context) (gc.Stats, error) + ParallelismSem *semaphore.Weighted } // Worker is a local worker instance with dedicated snapshotter, cache, and so on. @@ -261,11 +263,11 @@ func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *se if baseOp, ok := v.Sys().(*pb.Op); ok { switch op := baseOp.Op.(type) { case *pb.Op_Source: - return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, sm, w) + return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, w.ParallelismSem, sm, w) case *pb.Op_Exec: - return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, sm, w.WorkerOpt.MetadataStore, w.WorkerOpt.Executor, w) + return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, w.ParallelismSem, sm, w.WorkerOpt.MetadataStore, w.WorkerOpt.Executor, w) case *pb.Op_File: - return ops.NewFileOp(v, op, w.CacheMgr, w.WorkerOpt.MetadataStore, w) + return ops.NewFileOp(v, op, w.CacheMgr, w.ParallelismSem, w.WorkerOpt.MetadataStore, w) case *pb.Op_Build: return ops.NewBuildOp(v, op, s, w) default: diff --git a/worker/containerd/containerd.go b/worker/containerd/containerd.go index a42894b5..5b5ef079 100644 --- a/worker/containerd/containerd.go +++ b/worker/containerd/containerd.go @@ -22,19 +22,20 @@ import ( "github.com/moby/buildkit/worker/base" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" + "golang.org/x/sync/semaphore" ) // NewWorkerOpt creates a WorkerOpt. -func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) { +func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, opts ...containerd.ClientOpt) (base.WorkerOpt, error) { opts = append(opts, containerd.WithDefaultNamespace(ns)) client, err := containerd.New(address, opts...) if err != nil { return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address) } - return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile) + return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile, parallelismSem) } -func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string) (base.WorkerOpt, error) { +func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) { if strings.Contains(snapshotterName, "/") { return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName) } @@ -123,6 +124,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s Platforms: platforms, LeaseManager: lm, GarbageCollect: gc, + ParallelismSem: parallelismSem, } return opt, nil } diff --git a/worker/containerd/containerd_test.go b/worker/containerd/containerd_test.go index d4c9f5bb..f2eb7198 100644 --- a/worker/containerd/containerd_test.go +++ b/worker/containerd/containerd_test.go @@ -30,7 +30,7 @@ func newWorkerOpt(t *testing.T, addr string) (base.WorkerOpt, func()) { tmpdir, err := ioutil.TempDir("", "workertest") require.NoError(t, err) cleanup := func() { os.RemoveAll(tmpdir) } - workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "") + workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "", nil) require.NoError(t, err) return workerOpt, cleanup } diff --git a/worker/runc/runc.go b/worker/runc/runc.go index 54b8c125..6d81274c 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -23,6 +23,7 @@ import ( "github.com/moby/buildkit/worker/base" specs "github.com/opencontainers/image-spec/specs-go/v1" bolt "go.etcd.io/bbolt" + "golang.org/x/sync/semaphore" ) // SnapshotterFactory instantiates a snapshotter @@ -32,7 +33,7 @@ type SnapshotterFactory struct { } // NewWorkerOpt creates a WorkerOpt. -func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string) (base.WorkerOpt, error) { +func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) { var opt base.WorkerOpt name := "runc-" + snFactory.Name root = filepath.Join(root, name) @@ -124,6 +125,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc IdentityMapping: idmap, LeaseManager: lm, GarbageCollect: mdb.GarbageCollect, + ParallelismSem: parallelismSem, } return opt, nil } diff --git a/worker/runc/runc_test.go b/worker/runc/runc_test.go index a0321cc7..3bd249db 100644 --- a/worker/runc/runc_test.go +++ b/worker/runc/runc_test.go @@ -40,7 +40,7 @@ func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, fu }, } rootless := false - workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "") + workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "", nil) require.NoError(t, err) return workerOpt, cleanup From 60d38f972cbbe9954243b3a238dad3a62c55dcda Mon Sep 17 00:00:00 2001 From: "Vlad A. Ionescu" Date: Thu, 13 May 2021 14:55:36 +0300 Subject: [PATCH 3/3] Move config under worker config. Add new integration test to GHA matrix. Signed-off-by: Vlad A. Ionescu --- .github/workflows/build.yml | 3 +++ cmd/buildkitd/config/config.go | 6 ++++-- cmd/buildkitd/main_containerd_worker.go | 4 ++-- cmd/buildkitd/main_oci_worker.go | 4 ++-- solver/jobs_test.go | 2 +- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0316f887..03fecacb 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -90,6 +90,9 @@ jobs: - pkg: ./cmd/buildctl ./worker/containerd typ: integration + - + pkg: ./solver + typ: integration - pkg: '' skip-integration-tests: 1 diff --git a/cmd/buildkitd/config/config.go b/cmd/buildkitd/config/config.go index a3485c02..f296e6cd 100644 --- a/cmd/buildkitd/config/config.go +++ b/cmd/buildkitd/config/config.go @@ -22,8 +22,6 @@ type Config struct { Registries map[string]RegistryConfig `toml:"registry"` DNS *DNSConfig `toml:"dns"` - - MaxParallelism int `toml:"max-parallelism"` } type GRPCConfig struct { @@ -93,6 +91,8 @@ type OCIConfig struct { // ApparmorProfile is the name of the apparmor profile that should be used to constrain build containers. // The profile should already be loaded (by a higher level system) before creating a worker. ApparmorProfile string `toml:"apparmor-profile"` + + MaxParallelism int `toml:"max-parallelism"` } type ContainerdConfig struct { @@ -108,6 +108,8 @@ type ContainerdConfig struct { // ApparmorProfile is the name of the apparmor profile that should be used to constrain build containers. // The profile should already be loaded (by a higher level system) before creating a worker. ApparmorProfile string `toml:"apparmor-profile"` + + MaxParallelism int `toml:"max-parallelism"` } type GCPolicy struct { diff --git a/cmd/buildkitd/main_containerd_worker.go b/cmd/buildkitd/main_containerd_worker.go index b42b6c24..4e096334 100644 --- a/cmd/buildkitd/main_containerd_worker.go +++ b/cmd/buildkitd/main_containerd_worker.go @@ -227,8 +227,8 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([ } var parallelismSem *semaphore.Weighted - if common.config.MaxParallelism > 0 { - parallelismSem = semaphore.NewWeighted(int64(common.config.MaxParallelism)) + if cfg.MaxParallelism > 0 { + parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism)) } snapshotter := ctd.DefaultSnapshotter diff --git a/cmd/buildkitd/main_oci_worker.go b/cmd/buildkitd/main_oci_worker.go index ccc9d3b0..9d7d0172 100644 --- a/cmd/buildkitd/main_oci_worker.go +++ b/cmd/buildkitd/main_oci_worker.go @@ -278,8 +278,8 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker } var parallelismSem *semaphore.Weighted - if common.config.MaxParallelism > 0 { - parallelismSem = semaphore.NewWeighted(int64(common.config.MaxParallelism)) + if cfg.MaxParallelism > 0 { + parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism)) } opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem) diff --git a/solver/jobs_test.go b/solver/jobs_test.go index af0f7011..198eba64 100644 --- a/solver/jobs_test.go +++ b/solver/jobs_test.go @@ -93,7 +93,7 @@ func testParallelism(t *testing.T, sb integration.Sandbox) { type parallelismSetterSingle struct{} func (*parallelismSetterSingle) UpdateConfigFile(in string) string { - return in + "\n\nmax-parallelism = 1\n" + return in + "\n\n[worker.oci]\n max-parallelism = 1\n\n[worker.containerd]\n max-parallelism = 1\n" } var maxParallelismSingle integration.ConfigUpdater = ¶llelismSetterSingle{}