Merge pull request #2049 from earthly/vlad/parallelism-sem

Add a configuration item to limit parallelism
v0.9
Tõnis Tiigi 2021-05-21 23:47:40 -07:00 committed by GitHub
commit 8d5c5f1974
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 263 additions and 52 deletions

View File

@ -90,6 +90,9 @@ jobs:
-
pkg: ./cmd/buildctl ./worker/containerd
typ: integration
-
pkg: ./solver
typ: integration
-
pkg: ''
skip-integration-tests: 1

View File

@ -91,6 +91,8 @@ type OCIConfig struct {
// ApparmorProfile is the name of the apparmor profile that should be used to constrain build containers.
// The profile should already be loaded (by a higher level system) before creating a worker.
ApparmorProfile string `toml:"apparmor-profile"`
MaxParallelism int `toml:"max-parallelism"`
}
type ContainerdConfig struct {
@ -106,6 +108,8 @@ type ContainerdConfig struct {
// ApparmorProfile is the name of the apparmor profile that should be used to constrain build containers.
// The profile should already be loaded (by a higher level system) before creating a worker.
ApparmorProfile string `toml:"apparmor-profile"`
MaxParallelism int `toml:"max-parallelism"`
}
type GCPolicy struct {

View File

@ -18,6 +18,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"golang.org/x/sync/semaphore"
)
const (
@ -225,11 +226,16 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
},
}
var parallelismSem *semaphore.Weighted
if cfg.MaxParallelism > 0 {
parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism))
}
snapshotter := ctd.DefaultSnapshotter
if cfg.Snapshotter != "" {
snapshotter = cfg.Snapshotter
}
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, ctd.WithTimeout(60*time.Second))
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, ctd.WithTimeout(60*time.Second))
if err != nil {
return nil, err
}

View File

@ -36,6 +36,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)
@ -276,7 +277,12 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
},
}
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile)
var parallelismSem *semaphore.Weighted
if cfg.MaxParallelism > 0 {
parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism))
}
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem)
if err != nil {
return nil, err
}

View File

@ -766,6 +766,11 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
if s.execRes != nil || s.execErr != nil {
return s.execRes, s.execErr
}
release, err := op.Acquire(ctx)
if err != nil {
return nil, errors.Wrap(err, "acquire op resources")
}
defer release()
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
ctx = withAncestorCacheOpts(ctx, s.st)

107
solver/jobs_test.go Normal file
View File

@ -0,0 +1,107 @@
package solver
import (
"context"
"io/ioutil"
"os"
"testing"
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/util/testutil/integration"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func init() {
integration.InitOCIWorker()
integration.InitContainerdWorker()
}
func TestJobsIntegration(t *testing.T) {
mirrors := integration.WithMirroredImages(integration.OfficialImages("busybox:latest"))
integration.Run(t, []integration.Test{
testParallelism,
},
mirrors,
integration.WithMatrix("max-parallelism", map[string]interface{}{
"single": maxParallelismSingle,
"unlimited": maxParallelismUnlimited,
}),
)
}
func testParallelism(t *testing.T, sb integration.Sandbox) {
ctx := context.TODO()
c, err := client.New(ctx, sb.Address())
require.NoError(t, err)
defer c.Close()
cacheMount := llb.AddMount(
"/shared", llb.Scratch(),
llb.AsPersistentCacheDir("shared", llb.CacheMountShared))
run1 := llb.Image("busybox:latest").Run(
llb.Args([]string{
"/bin/sh", "-c",
"touch /shared/signal1 && i=0; while [ ! -f /shared/signal2 ] && [ $i -lt 10 ]; do i=$((i+1)); sleep 1; done",
}),
cacheMount,
).Root()
d1, err := run1.Marshal(ctx)
require.NoError(t, err)
run2 := llb.Image("busybox:latest").Run(
llb.Args([]string{
"/bin/sh", "-c",
"touch /shared/signal2 && i=0; while [ ! -f /shared/signal1 ] && [ $i -lt 10 ]; do i=$((i+1)); sleep 1; done",
}),
cacheMount,
).Root()
d2, err := run2.Marshal(ctx)
require.NoError(t, err)
timeStart := time.Now()
eg, egCtx := errgroup.WithContext(ctx)
tmpDir, err := ioutil.TempDir("", "solver-jobs-test-")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
solveOpt := client.SolveOpt{
LocalDirs: map[string]string{"cache": tmpDir},
}
eg.Go(func() error {
_, err := c.Solve(egCtx, d1, solveOpt, nil)
return err
})
eg.Go(func() error {
_, err := c.Solve(egCtx, d2, solveOpt, nil)
return err
})
err = eg.Wait()
require.NoError(t, err)
elapsed := time.Since(timeStart)
maxParallelism := sb.Value("max-parallelism")
if maxParallelism == maxParallelismSingle {
require.Greater(t, elapsed, 10*time.Second, "parallelism not restricted")
} else if maxParallelism == maxParallelismUnlimited {
require.Less(t, elapsed, 10*time.Second, "parallelism hindered")
}
}
type parallelismSetterSingle struct{}
func (*parallelismSetterSingle) UpdateConfigFile(in string) string {
return in + "\n\n[worker.oci]\n max-parallelism = 1\n\n[worker.containerd]\n max-parallelism = 1\n"
}
var maxParallelismSingle integration.ConfigUpdater = &parallelismSetterSingle{}
type parallelismSetterUnlimited struct{}
func (*parallelismSetterUnlimited) UpdateConfigFile(in string) string {
return in
}
var maxParallelismUnlimited integration.ConfigUpdater = &parallelismSetterUnlimited{}

View File

@ -141,3 +141,8 @@ func (b *buildOp) Exec(ctx context.Context, g session.Group, inputs []solver.Res
return []solver.Result{r}, err
}
func (b *buildOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
// buildOp itself does not count towards parallelism budget.
return func() {}, nil
}

View File

@ -29,33 +29,36 @@ import (
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
const execCacheType = "buildkit.exec.v0"
type execOp struct {
op *pb.ExecOp
cm cache.Manager
mm *mounts.MountManager
exec executor.Executor
w worker.Worker
platform *pb.Platform
numInputs int
op *pb.ExecOp
cm cache.Manager
mm *mounts.MountManager
exec executor.Executor
w worker.Worker
platform *pb.Platform
numInputs int
parallelism *semaphore.Weighted
}
func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) {
func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, parallelism *semaphore.Weighted, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) {
if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil {
return nil, err
}
name := fmt.Sprintf("exec %s", strings.Join(op.Exec.Meta.Args, " "))
return &execOp{
op: op.Exec,
mm: mounts.NewMountManager(name, cm, sm, md),
cm: cm,
exec: exec,
numInputs: len(v.Inputs()),
w: w,
platform: platform,
op: op.Exec,
mm: mounts.NewMountManager(name, cm, sm, md),
cm: cm,
exec: exec,
numInputs: len(v.Inputs()),
w: w,
platform: platform,
parallelism: parallelism,
}, nil
}
@ -388,3 +391,16 @@ func parseExtraHosts(ips []*pb.HostIP) ([]executor.HostIP, error) {
}
return out, nil
}
func (e *execOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
if e.parallelism == nil {
return func() {}, nil
}
err := e.parallelism.Acquire(ctx, 1)
if err != nil {
return nil, err
}
return func() {
e.parallelism.Release(1)
}, nil
}

View File

@ -24,28 +24,31 @@ import (
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
const fileCacheType = "buildkit.file.v0"
type fileOp struct {
op *pb.FileOp
md *metadata.Store
w worker.Worker
solver *FileOpSolver
numInputs int
op *pb.FileOp
md *metadata.Store
w worker.Worker
solver *FileOpSolver
numInputs int
parallelism *semaphore.Weighted
}
func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, md *metadata.Store, w worker.Worker) (solver.Op, error) {
func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, parallelism *semaphore.Weighted, md *metadata.Store, w worker.Worker) (solver.Op, error) {
if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil {
return nil, err
}
return &fileOp{
op: op.File,
md: md,
numInputs: len(v.Inputs()),
w: w,
solver: NewFileOpSolver(w, &file.Backend{}, file.NewRefManager(cm)),
op: op.File,
md: md,
numInputs: len(v.Inputs()),
w: w,
solver: NewFileOpSolver(w, &file.Backend{}, file.NewRefManager(cm)),
parallelism: parallelism,
}, nil
}
@ -179,6 +182,19 @@ func (f *fileOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
return outResults, nil
}
func (f *fileOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
if f.parallelism == nil {
return func() {}, nil
}
err := f.parallelism.Acquire(ctx, 1)
if err != nil {
return nil, err
}
return func() {
f.parallelism.Release(1)
}, nil
}
func addSelector(m map[int]map[llbsolver.Selector]struct{}, idx int, sel string, wildcard, followLinks bool) {
mm, ok := m[idx]
if !ok {

View File

@ -12,32 +12,35 @@ import (
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"golang.org/x/sync/semaphore"
)
const sourceCacheType = "buildkit.source.v0"
type sourceOp struct {
mu sync.Mutex
op *pb.Op_Source
platform *pb.Platform
sm *source.Manager
src source.SourceInstance
sessM *session.Manager
w worker.Worker
vtx solver.Vertex
mu sync.Mutex
op *pb.Op_Source
platform *pb.Platform
sm *source.Manager
src source.SourceInstance
sessM *session.Manager
w worker.Worker
vtx solver.Vertex
parallelism *semaphore.Weighted
}
func NewSourceOp(vtx solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, sessM *session.Manager, w worker.Worker) (solver.Op, error) {
func NewSourceOp(vtx solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, parallelism *semaphore.Weighted, sessM *session.Manager, w worker.Worker) (solver.Op, error) {
if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil {
return nil, err
}
return &sourceOp{
op: op,
sm: sm,
w: w,
sessM: sessM,
platform: platform,
vtx: vtx,
op: op,
sm: sm,
w: w,
sessM: sessM,
platform: platform,
vtx: vtx,
parallelism: parallelism,
}, nil
}
@ -93,3 +96,16 @@ func (s *sourceOp) Exec(ctx context.Context, g session.Group, _ []solver.Result)
}
return []solver.Result{worker.NewWorkerRefResult(ref, s.w)}, nil
}
func (s *sourceOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
if s.parallelism == nil {
return func() {}, nil
}
err := s.parallelism.Acquire(ctx, 1)
if err != nil {
return nil, err
}
return func() {
s.parallelism.Release(1)
}, nil
}

View File

@ -3511,6 +3511,10 @@ func (v *vertex) Exec(ctx context.Context, g session.Group, inputs []Result) (ou
return []Result{&dummyResult{id: identity.NewID(), value: v.opt.value}}, nil
}
func (v *vertex) Acquire(ctx context.Context) (ReleaseFunc, error) {
return func() {}, nil
}
func (v *vertex) makeCacheMap() *CacheMap {
m := &CacheMap{
Digest: digest.FromBytes([]byte(fmt.Sprintf("seed:%s", v.opt.cacheKeySeed))),
@ -3556,6 +3560,10 @@ func (v *vertexConst) Exec(ctx context.Context, g session.Group, inputs []Result
return []Result{&dummyResult{id: identity.NewID(), intValue: v.value}}, nil
}
func (v *vertexConst) Acquire(ctx context.Context) (ReleaseFunc, error) {
return func() {}, nil
}
// vtxSum returns a vertex that ourputs sum of its inputs plus a constant
func vtxSum(v int, opt vtxOpt) *vertexSum {
if opt.cacheKeySeed == "" {
@ -3591,6 +3599,10 @@ func (v *vertexSum) Exec(ctx context.Context, g session.Group, inputs []Result)
return []Result{&dummyResult{id: identity.NewID(), intValue: s}}, nil
}
func (v *vertexSum) Acquire(ctx context.Context) (ReleaseFunc, error) {
return func() {}, nil
}
func vtxSubBuild(g Edge, opt vtxOpt) *vertexSubBuild {
if opt.cacheKeySeed == "" {
opt.cacheKeySeed = fmt.Sprintf("sum-%s", identity.NewID())
@ -3622,6 +3634,10 @@ func (v *vertexSubBuild) Exec(ctx context.Context, g session.Group, inputs []Res
return []Result{res}, nil
}
func (v *vertexSubBuild) Acquire(ctx context.Context) (ReleaseFunc, error) {
return func() {}, nil
}
//nolint:unused
func printGraph(e Edge, pfx string) {
name := e.Vertex.Name()

View File

@ -135,6 +135,8 @@ type CacheLink struct {
Selector digest.Digest `json:",omitempty"`
}
type ReleaseFunc func()
// Op defines how the solver can evaluate the properties of a vertex operation.
// An op is executed in the worker, and is retrieved from the vertex by the
// value of `vertex.Sys()`. The solver is configured with a resolve function to
@ -146,6 +148,9 @@ type Op interface {
// Exec runs an operation given results from previous operations.
Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error)
// Acquire acquires the necessary resources to execute the `Op`.
Acquire(ctx context.Context) (release ReleaseFunc, err error)
}
type ResultBasedCacheFunc func(context.Context, Result, session.Group) (digest.Digest, error)

View File

@ -50,6 +50,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/semaphore"
)
const labelCreatedAt = "buildkit/createdat"
@ -74,6 +75,7 @@ type WorkerOpt struct {
IdentityMapping *idtools.IdentityMapping
LeaseManager leases.Manager
GarbageCollect func(context.Context) (gc.Stats, error)
ParallelismSem *semaphore.Weighted
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
@ -261,11 +263,11 @@ func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *se
if baseOp, ok := v.Sys().(*pb.Op); ok {
switch op := baseOp.Op.(type) {
case *pb.Op_Source:
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, sm, w)
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, w.ParallelismSem, sm, w)
case *pb.Op_Exec:
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, sm, w.WorkerOpt.MetadataStore, w.WorkerOpt.Executor, w)
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, w.ParallelismSem, sm, w.WorkerOpt.MetadataStore, w.WorkerOpt.Executor, w)
case *pb.Op_File:
return ops.NewFileOp(v, op, w.CacheMgr, w.WorkerOpt.MetadataStore, w)
return ops.NewFileOp(v, op, w.CacheMgr, w.ParallelismSem, w.WorkerOpt.MetadataStore, w)
case *pb.Op_Build:
return ops.NewBuildOp(v, op, s, w)
default:

View File

@ -22,19 +22,20 @@ import (
"github.com/moby/buildkit/worker/base"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)
// NewWorkerOpt creates a WorkerOpt.
func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
opts = append(opts, containerd.WithDefaultNamespace(ns))
client, err := containerd.New(address, opts...)
if err != nil {
return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address)
}
return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile)
return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile, parallelismSem)
}
func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string) (base.WorkerOpt, error) {
func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) {
if strings.Contains(snapshotterName, "/") {
return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName)
}
@ -123,6 +124,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
Platforms: platforms,
LeaseManager: lm,
GarbageCollect: gc,
ParallelismSem: parallelismSem,
}
return opt, nil
}

View File

@ -30,7 +30,7 @@ func newWorkerOpt(t *testing.T, addr string) (base.WorkerOpt, func()) {
tmpdir, err := ioutil.TempDir("", "workertest")
require.NoError(t, err)
cleanup := func() { os.RemoveAll(tmpdir) }
workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "")
workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "", nil)
require.NoError(t, err)
return workerOpt, cleanup
}

View File

@ -23,6 +23,7 @@ import (
"github.com/moby/buildkit/worker/base"
specs "github.com/opencontainers/image-spec/specs-go/v1"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/semaphore"
)
// SnapshotterFactory instantiates a snapshotter
@ -32,7 +33,7 @@ type SnapshotterFactory struct {
}
// NewWorkerOpt creates a WorkerOpt.
func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string) (base.WorkerOpt, error) {
func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) {
var opt base.WorkerOpt
name := "runc-" + snFactory.Name
root = filepath.Join(root, name)
@ -124,6 +125,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
IdentityMapping: idmap,
LeaseManager: lm,
GarbageCollect: mdb.GarbageCollect,
ParallelismSem: parallelismSem,
}
return opt, nil
}

View File

@ -40,7 +40,7 @@ func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, fu
},
}
rootless := false
workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "")
workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "", nil)
require.NoError(t, err)
return workerOpt, cleanup