From 6d58121c1147447700f08452e8f662e0f7301a0e Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Thu, 9 Jul 2020 23:07:28 +0000 Subject: [PATCH 1/4] Update Executor interface for Run and Exec Signed-off-by: Cory Bennett --- executor/containerdexecutor/executor.go | 90 +++++++++++++++- executor/executor.go | 10 +- executor/runcexecutor/executor.go | 67 +++++++++++- solver/llbsolver/ops/exec.go | 3 +- worker/base/worker.go | 2 +- worker/containerd/containerd_test.go | 131 ++++++++++++++++++++++++ worker/runc/runc_test.go | 71 ++++++++++++- 7 files changed, 358 insertions(+), 16 deletions(-) create mode 100644 worker/containerd/containerd_test.go diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 4919054a..0a0104db 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -2,7 +2,6 @@ package containerdexecutor import ( "context" - "io" "os" "path/filepath" "strings" @@ -21,6 +20,7 @@ import ( "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/network" + "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -48,8 +48,11 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb } } -func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (err error) { - id := identity.NewID() +func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo) (err error) { + if id == "" { + id = identity.NewID() + } + meta := process.Meta resolvConf, err := oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig) if err != nil { @@ -160,7 +163,12 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c } }() - task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStreams(stdin, stdout, stderr)), containerd.WithRootFS(rootMounts)) + cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)} + if meta.Tty { + cioOpts = append(cioOpts, cio.WithTerminal) + } + + task, err := container.NewTask(ctx, cio.NewCreator(cioOpts...), containerd.WithRootFS(rootMounts)) if err != nil { return err } @@ -209,5 +217,77 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c return nil } } - +} + +func (w containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { + meta := process.Meta + + // first verify the container is running, if we get an error assume the container + // is in the process of being created and check again every 100ms or until + // context is canceled. + + var container containerd.Container + var task containerd.Task + for { + if container == nil { + container, _ = w.client.LoadContainer(ctx, id) + } + if container != nil && task == nil { + task, _ = container.Task(ctx, nil) + } + if task != nil { + status, _ := task.Status(ctx) + if status.Status == containerd.Running { + break + } + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(100 * time.Millisecond): + continue + } + } + + spec, err := container.Spec(ctx) + if err != nil { + return errors.WithStack(err) + } + + proc := spec.Process + + // TODO how do we get rootfsPath for oci.GetUser in case user passed in username rather than uid:gid? 
+ // For now only support uid:gid + if meta.User != "" { + uid, gid, err := oci.ParseUIDGID(meta.User) + if err != nil { + return errors.WithStack(err) + } + proc.User = specs.User{ + UID: uid, + GID: gid, + AdditionalGids: []uint32{}, + } + } + + proc.Terminal = meta.Tty + proc.Args = meta.Args + if meta.Cwd != "" { + spec.Process.Cwd = meta.Cwd + } + if len(meta.Env) > 0 { + // merge exec env with pid1 env + spec.Process.Env = append(spec.Process.Env, meta.Env...) + } + + cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)} + if meta.Tty { + cioOpts = append(cioOpts, cio.WithTerminal) + } + + taskProcess, err := task.Exec(ctx, identity.NewID(), proc, cio.NewCreator(cioOpts...)) + if err != nil { + return errors.WithStack(err) + } + return taskProcess.Start(ctx) } diff --git a/executor/executor.go b/executor/executor.go index df6d7920..3cf60e7d 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -28,9 +28,15 @@ type Mount struct { Readonly bool } +type ProcessInfo struct { + Meta Meta + Stdin io.ReadCloser + Stdout, Stderr io.WriteCloser +} + type Executor interface { - // TODO: add stdout/err - Exec(ctx context.Context, meta Meta, rootfs cache.Mountable, mounts []Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error + Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount, process ProcessInfo) error + Exec(ctx context.Context, id string, process ProcessInfo) error } type HostIP struct { diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index dc16c8f0..ad80d257 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -24,6 +24,7 @@ import ( "github.com/moby/buildkit/util/network" rootlessspecconv "github.com/moby/buildkit/util/rootless/specconv" "github.com/moby/buildkit/util/stack" + "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -123,7 +124,8 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex return w, nil } -func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { +func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo) error { + meta := process.Meta provider, ok := w.networkProviders[meta.NetMode] if !ok { return errors.Errorf("unknown network mode %s", meta.NetMode) @@ -164,7 +166,9 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache. defer release() } - id := identity.NewID() + if id == "" { + id = identity.NewID() + } bundle := filepath.Join(w.root, id) if err := os.Mkdir(bundle, 0711); err != nil { @@ -245,6 +249,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache. } } + spec.Process.Terminal = meta.Tty spec.Process.OOMScoreAdj = w.oomScoreAdj if w.rootless { if err := rootlessspecconv.ToRootless(spec); err != nil { @@ -290,7 +295,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache. 
logrus.Debugf("> creating %s %v", id, meta.Args) status, err := w.runc.Run(runCtx, id, bundle, &runc.CreateOpts{ - IO: &forwardIO{stdin: stdin, stdout: stdout, stderr: stderr}, + IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, NoPivot: w.noPivot, }) close(done) @@ -310,6 +315,62 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache. return nil } +func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { + // first verify the container is running, if we get an error assume the container + // is in the process of being created and check again every 100ms or until + // context is canceled. + state, _ := w.runc.State(ctx, id) + for { + if state != nil && state.Status == "running" { + break + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(100 * time.Millisecond): + state, _ = w.runc.State(ctx, id) + } + } + + // load default process spec (for Env, Cwd etc) from bundle + f, err := os.Open(filepath.Join(state.Bundle, "config.json")) + if err != nil { + return errors.WithStack(err) + } + defer f.Close() + + spec := &specs.Spec{} + if err := json.NewDecoder(f).Decode(spec); err != nil { + return err + } + + if process.Meta.User != "" { + uid, gid, sgids, err := oci.GetUser(ctx, state.Rootfs, process.Meta.User) + if err != nil { + return err + } + spec.Process.User = specs.User{ + UID: uid, + GID: gid, + AdditionalGids: sgids, + } + } + + spec.Process.Terminal = process.Meta.Tty + spec.Process.Args = process.Meta.Args + if process.Meta.Cwd != "" { + spec.Process.Cwd = process.Meta.Cwd + } + if len(process.Meta.Env) > 0 { + // merge exec env with pid1 env + spec.Process.Env = append(spec.Process.Env, process.Meta.Env...) + } + + return w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{ + IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, + }) +} + type forwardIO struct { stdin io.ReadCloser stdout, stderr io.WriteCloser diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index 127b8259..b4306bd8 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -381,6 +381,7 @@ func (sm *sshMountInstance) Mount() ([]mount.Mount, func() error, error) { GID: gid, }) if err != nil { + cancel() return nil, nil, err } uid = identity.UID @@ -731,7 +732,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu defer stdout.Close() defer stderr.Close() - if err := e.exec.Exec(ctx, meta, root, mounts, nil, stdout, stderr); err != nil { + if err := e.exec.Run(ctx, "", root, mounts, executor.ProcessInfo{Meta: meta, Stdin: nil, Stdout: stdout, Stderr: stderr}); err != nil { return nil, errors.Wrapf(err, "executor failed running %v", meta.Args) } diff --git a/worker/base/worker.go b/worker/base/worker.go index 19a5a2bd..cd08b35b 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -297,7 +297,7 @@ func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.Immu return err } defer active.Release(context.TODO()) - return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr) + return w.Executor.Run(ctx, "", active, nil, executor.ProcessInfo{Meta: meta, Stdin: stdin, Stdout: stdout, Stderr: stderr}) } func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) { diff --git a/worker/containerd/containerd_test.go b/worker/containerd/containerd_test.go new file mode 100644 index 00000000..28f74df7 --- /dev/null 
+++ b/worker/containerd/containerd_test.go @@ -0,0 +1,131 @@ +// +build linux,!no_containerd_worker + +package containerd + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "os" + "testing" + + "github.com/containerd/containerd/namespaces" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/source" + "github.com/moby/buildkit/util/network/netproviders" + "github.com/moby/buildkit/worker/base" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +const sockFile = "/run/containerd/containerd.sock" +const ns = "buildkit-test" + +func newWorkerOpt(t *testing.T) (base.WorkerOpt, func()) { + tmpdir, err := ioutil.TempDir("", "workertest") + require.NoError(t, err) + cleanup := func() { os.RemoveAll(tmpdir) } + workerOpt, err := NewWorkerOpt(tmpdir, sockFile, "overlayfs", ns, nil, nil, netproviders.Opt{Mode: "host"}) + require.NoError(t, err) + return workerOpt, cleanup +} + +func checkRequirement(t *testing.T) { + if os.Getuid() != 0 { + t.Skip("requires root") + } + + fi, err := os.Stat(sockFile) + if err != nil { + t.Skipf("Failed to stat %s: %s", sockFile, err.Error()) + } + if fi.Mode()&os.ModeSocket == 0 { + t.Skipf("%s is not a unix domain socket", sockFile) + } +} + +func newCtx(s string) context.Context { + return namespaces.WithNamespace(context.Background(), s) +} + +func newBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef { + img, err := source.NewImageIdentifier("docker.io/library/busybox:latest") + require.NoError(t, err) + src, err := w.SourceManager.Resolve(ctx, img, sm) + require.NoError(t, err) + snap, err := src.Snapshot(ctx, nil) + require.NoError(t, err) + return snap +} + +func TestContainerdWorkerExec(t *testing.T) { + t.Parallel() + checkRequirement(t) + + workerOpt, cleanupWorkerOpt := newWorkerOpt(t) + defer cleanupWorkerOpt() + w, err := base.NewWorker(workerOpt) + require.NoError(t, err) + + ctx := newCtx(ns) + ctx, cancel := context.WithCancel(ctx) + sm, err := session.NewManager() + require.NoError(t, err) + + snap := newBusyboxSourceSnapshot(ctx, t, w, sm) + root, err := w.CacheManager.New(ctx, snap) + require.NoError(t, err) + + id := identity.NewID() + + // first start pid1 in the background + eg := errgroup.Group{} + eg.Go(func() error { + return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"sleep", "10"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"}, + }, + }) + }) + + stdout := bytes.NewBuffer(nil) + stderr := bytes.NewBuffer(nil) + err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"ps", "-o", "pid,comm"}, + }, + Stdout: &nopCloser{stdout}, + Stderr: &nopCloser{stderr}, + }) + t.Logf("Stdout: %s", stdout.String()) + t.Logf("Stderr: %s", stderr.String()) + require.NoError(t, err) + // verify pid1 is sleep + require.Contains(t, stdout.String(), "1 sleep") + require.Empty(t, stderr.String()) + + // stop pid1 + cancel() + + err = eg.Wait() + // we expect this to get canceled after we test the exec + require.EqualError(t, errors.Cause(err), "context canceled") + + err = snap.Release(ctx) + require.NoError(t, err) +} + +type nopCloser struct { + io.Writer +} + +func (n *nopCloser) Close() error { + return nil +} diff --git a/worker/runc/runc_test.go b/worker/runc/runc_test.go index 5b062f77..3645769f 100644 --- 
a/worker/runc/runc_test.go +++ b/worker/runc/runc_test.go @@ -20,12 +20,15 @@ import ( "github.com/moby/buildkit/client" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/executor/oci" + "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/source" "github.com/moby/buildkit/util/network/netproviders" "github.com/moby/buildkit/worker/base" + "github.com/pkg/errors" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, func()) { @@ -124,7 +127,7 @@ func TestRuncWorker(t *testing.T) { } stderr := bytes.NewBuffer(nil) - err = w.Executor.Exec(ctx, meta, snap, nil, nil, nil, &nopCloser{stderr}) + err = w.Executor.Run(ctx, "", snap, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}) require.Error(t, err) // Read-only root // typical error is like `mkdir /.../rootfs/proc: read-only file system`. // make sure the error is caused before running `echo foo > /bar`. @@ -133,7 +136,7 @@ func TestRuncWorker(t *testing.T) { root, err := w.CacheManager.New(ctx, snap) require.NoError(t, err) - err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, &nopCloser{stderr}) + err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}) require.NoError(t, err) meta = executor.Meta{ @@ -141,7 +144,7 @@ func TestRuncWorker(t *testing.T) { Cwd: "/", } - err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, &nopCloser{stderr}) + err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}) require.NoError(t, err) rf, err := root.Commit(ctx) @@ -202,11 +205,71 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) { } stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) - err = w.Executor.Exec(ctx, meta, root, nil, nil, &nopCloser{stdout}, &nopCloser{stderr}) + err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}) require.NoError(t, err, fmt.Sprintf("stdout=%q, stderr=%q", stdout.String(), stderr.String())) require.Equal(t, string(selfCmdline), stdout.String()) } +func TestRuncWorkerExec(t *testing.T) { + t.Parallel() + checkRequirement(t) + + workerOpt, cleanupWorkerOpt := newWorkerOpt(t, oci.ProcessSandbox) + defer cleanupWorkerOpt() + w, err := base.NewWorker(workerOpt) + require.NoError(t, err) + + ctx := newCtx("buildkit-test") + ctx, cancel := context.WithCancel(ctx) + sm, err := session.NewManager() + require.NoError(t, err) + + snap := newBusyboxSourceSnapshot(ctx, t, w, sm) + root, err := w.CacheManager.New(ctx, snap) + require.NoError(t, err) + + id := identity.NewID() + + // first start pid1 in the background + eg := errgroup.Group{} + eg.Go(func() error { + return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"sleep", "10"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"}, + }, + }) + }) + + stdout := bytes.NewBuffer(nil) + stderr := bytes.NewBuffer(nil) + err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"ps", "-o", "pid,comm"}, + }, + Stdout: &nopCloser{stdout}, + Stderr: &nopCloser{stderr}, + }) + t.Logf("Stdout: %s", stdout.String()) + t.Logf("Stderr: %s", stderr.String()) + require.NoError(t, err) + // verify pid1 is sleep + require.Contains(t, stdout.String(), "1 sleep") + require.Empty(t, stderr.String()) + + // stop pid1 + 
cancel() + + err = eg.Wait() + // we expect pid1 to get canceled after we test the exec + require.EqualError(t, errors.Cause(err), "context canceled") + + err = snap.Release(ctx) + require.NoError(t, err) + +} + type nopCloser struct { io.Writer } From 5e91dff4ed55b4ab22a4eea892187e66ce7fb21a Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Fri, 10 Jul 2020 22:05:30 +0000 Subject: [PATCH 2/4] fix error handling for exec when container fails to start update run/exec tests for stdin and expected failures move common tests for runc and container to shared tests package Signed-off-by: Cory Bennett --- executor/containerdexecutor/executor.go | 86 +++++++--- executor/executor.go | 2 +- executor/runcexecutor/executor.go | 98 +++++++++--- solver/llbsolver/ops/exec.go | 2 +- worker/base/worker.go | 2 +- worker/containerd/containerd_test.go | 91 ++--------- worker/runc/runc_test.go | 96 +++-------- worker/tests/common.go | 201 ++++++++++++++++++++++++ 8 files changed, 373 insertions(+), 205 deletions(-) create mode 100644 worker/tests/common.go diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 0a0104db..ead0c112 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "strings" + "sync" "syscall" "time" @@ -31,6 +32,8 @@ type containerdExecutor struct { networkProviders map[pb.NetMode]network.Provider cgroupParent string dnsConfig *oci.DNSConfig + running map[string]chan error + mu *sync.Mutex } // New creates a new executor backed by connection to containerd API @@ -45,23 +48,43 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb networkProviders: networkProviders, cgroupParent: cgroup, dnsConfig: dnsConfig, + running: make(map[string]chan error), + mu: &sync.Mutex{}, } } -func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo) (err error) { +func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { if id == "" { id = identity.NewID() } + + startedOnce := sync.Once{} + done := make(chan error, 1) + w.mu.Lock() + w.running[id] = done + w.mu.Unlock() + defer func() { + w.mu.Lock() + delete(w.running, id) + w.mu.Unlock() + close(done) + if started != nil { + startedOnce.Do(func() { + close(started) + }) + } + }() + meta := process.Meta resolvConf, err := oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig) if err != nil { - return err + return sendErr(done, err) } hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, nil) if err != nil { - return err + return sendErr(done, err) } if clean != nil { defer clean() @@ -69,12 +92,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount mountable, err := root.Mount(ctx, false) if err != nil { - return err + return sendErr(done, err) } rootMounts, release, err := mountable.Mount() if err != nil { - return err + return sendErr(done, err) } if release != nil { defer release() @@ -86,12 +109,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount lm := snapshot.LocalMounterWithMounts(rootMounts) rootfsPath, err := lm.Mount() if err != nil { - return err + return sendErr(done, err) } uid, gid, sgids, err = oci.GetUser(ctx, rootfsPath, meta.User) if err != nil { lm.Unmount() - return err + return sendErr(done, err) } 
identity := idtools.Identity{ @@ -102,12 +125,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount newp, err := fs.RootPath(rootfsPath, meta.Cwd) if err != nil { lm.Unmount() - return errors.Wrapf(err, "working dir %s points to invalid target", newp) + return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp)) } if _, err := os.Stat(newp); err != nil { if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { lm.Unmount() - return errors.Wrapf(err, "failed to create working directory %s", newp) + return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp)) } } @@ -116,11 +139,11 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount provider, ok := w.networkProviders[meta.NetMode] if !ok { - return errors.Errorf("unknown network mode %s", meta.NetMode) + return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode)) } namespace, err := provider.New() if err != nil { - return err + return sendErr(done, err) } defer namespace.Close() @@ -146,7 +169,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount processMode := oci.ProcessSandbox // FIXME(AkihiroSuda) spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, opts...) if err != nil { - return err + return sendErr(done, err) } defer cleanup() @@ -154,12 +177,13 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount containerd.WithSpec(spec), ) if err != nil { - return err + return sendErr(done, err) } defer func() { if err1 := container.Delete(context.TODO()); err == nil && err1 != nil { err = errors.Wrapf(err1, "failed to delete container %s", id) + sendErr(done, err) } }() @@ -170,21 +194,27 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount task, err := container.NewTask(ctx, cio.NewCreator(cioOpts...), containerd.WithRootFS(rootMounts)) if err != nil { - return err + return sendErr(done, err) } defer func() { if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil { err = errors.Wrapf(err1, "failed to delete task %s", id) + sendErr(done, err) } }() if err := task.Start(ctx); err != nil { - return err + return sendErr(done, err) } + if started != nil { + startedOnce.Do(func() { + close(started) + }) + } statusCh, err := task.Wait(context.Background()) if err != nil { - return err + return sendErr(done, err) } var cancel func() @@ -212,7 +242,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount err = errors.Wrap(ctx.Err(), err.Error()) default: } - return err + return sendErr(done, err) } return nil } @@ -226,6 +256,13 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo // is in the process of being created and check again every 100ms or until // context is canceled. 
+ w.mu.Lock() + done, ok := w.running[id] + w.mu.Unlock() + if !ok { + return errors.Errorf("container %s not found", id) + } + var container containerd.Container var task containerd.Task for { @@ -244,6 +281,11 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo select { case <-ctx.Done(): return ctx.Err() + case err, ok := <-done: + if !ok { + return errors.Errorf("container %s has stopped", id) + } + return errors.Wrapf(err, "container %s has exited with error", id) case <-time.After(100 * time.Millisecond): continue } @@ -275,9 +317,8 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo if meta.Cwd != "" { spec.Process.Cwd = meta.Cwd } - if len(meta.Env) > 0 { - // merge exec env with pid1 env - spec.Process.Env = append(spec.Process.Env, meta.Env...) + if len(process.Meta.Env) > 0 { + spec.Process.Env = process.Meta.Env } cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)} @@ -291,3 +332,8 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo } return taskProcess.Start(ctx) } + +func sendErr(c chan error, err error) error { + c <- err + return err +} diff --git a/executor/executor.go b/executor/executor.go index 3cf60e7d..ad7279cb 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -35,7 +35,7 @@ type ProcessInfo struct { } type Executor interface { - Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount, process ProcessInfo) error + Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount, process ProcessInfo, started chan<- struct{}) error Exec(ctx context.Context, id string, process ProcessInfo) error } diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index ad80d257..7c8e6554 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -8,6 +8,7 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "syscall" "time" @@ -60,6 +61,8 @@ type runcExecutor struct { noPivot bool dns *oci.DNSConfig oomScoreAdj *int + running map[string]chan error + mu sync.Mutex } func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Executor, error) { @@ -120,19 +123,39 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex noPivot: opt.NoPivot, dns: opt.DNS, oomScoreAdj: opt.OOMScoreAdj, + running: make(map[string]chan error), + mu: sync.Mutex{}, } return w, nil } -func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo) error { +func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) error { meta := process.Meta + + startedOnce := sync.Once{} + done := make(chan error, 1) + w.mu.Lock() + w.running[id] = done + w.mu.Unlock() + defer func() { + w.mu.Lock() + delete(w.running, id) + w.mu.Unlock() + close(done) + if started != nil { + startedOnce.Do(func() { + close(started) + }) + } + }() + provider, ok := w.networkProviders[meta.NetMode] if !ok { - return errors.Errorf("unknown network mode %s", meta.NetMode) + return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode)) } namespace, err := provider.New() if err != nil { - return err + return sendErr(done, err) } defer namespace.Close() @@ -142,12 +165,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, resolvConf, err := 
oci.GetResolvConf(ctx, w.root, w.idmap, w.dns) if err != nil { - return err + return sendErr(done, err) } hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, w.idmap) if err != nil { - return err + return sendErr(done, err) } if clean != nil { defer clean() @@ -155,12 +178,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mountable, err := root.Mount(ctx, false) if err != nil { - return err + return sendErr(done, err) } rootMount, release, err := mountable.Mount() if err != nil { - return err + return sendErr(done, err) } if release != nil { defer release() @@ -172,7 +195,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, bundle := filepath.Join(w.root, id) if err := os.Mkdir(bundle, 0711); err != nil { - return err + return sendErr(done, err) } defer os.RemoveAll(bundle) @@ -183,21 +206,21 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, rootFSPath := filepath.Join(bundle, "rootfs") if err := idtools.MkdirAllAndChown(rootFSPath, 0700, identity); err != nil { - return err + return sendErr(done, err) } if err := mount.All(rootMount, rootFSPath); err != nil { - return err + return sendErr(done, err) } defer mount.Unmount(rootFSPath, 0) uid, gid, sgids, err := oci.GetUser(ctx, rootFSPath, meta.User) if err != nil { - return err + return sendErr(done, err) } f, err := os.Create(filepath.Join(bundle, "config.json")) if err != nil { - return err + return sendErr(done, err) } defer f.Close() @@ -214,7 +237,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, if w.idmap != nil { identity, err = w.idmap.ToHost(identity) if err != nil { - return err + return sendErr(done, err) } } @@ -230,7 +253,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, } spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.processMode, w.idmap, opts...) 
if err != nil { - return err + return sendErr(done, err) } defer cleanup() @@ -241,11 +264,11 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, newp, err := fs.RootPath(rootFSPath, meta.Cwd) if err != nil { - return errors.Wrapf(err, "working dir %s points to invalid target", newp) + return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp)) } if _, err := os.Stat(newp); err != nil { if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { - return errors.Wrapf(err, "failed to create working directory %s", newp) + return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp)) } } @@ -253,19 +276,19 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, spec.Process.OOMScoreAdj = w.oomScoreAdj if w.rootless { if err := rootlessspecconv.ToRootless(spec); err != nil { - return err + return sendErr(done, err) } } if err := json.NewEncoder(f).Encode(spec); err != nil { - return err + return sendErr(done, err) } // runCtx/killCtx is used for extra check in case the kill command blocks runCtx, cancelRun := context.WithCancel(context.Background()) defer cancelRun() - done := make(chan struct{}) + ended := make(chan struct{}) go func() { for { select { @@ -284,21 +307,27 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, timeout() select { case <-time.After(50 * time.Millisecond): - case <-done: + case <-ended: return } - case <-done: + case <-ended: return } } }() logrus.Debugf("> creating %s %v", id, meta.Args) + // this is a cheat, we have not actually started, but as close as we can get with runc for now + if started != nil { + startedOnce.Do(func() { + close(started) + }) + } status, err := w.runc.Run(runCtx, id, bundle, &runc.CreateOpts{ IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, NoPivot: w.noPivot, }) - close(done) + close(ended) if status != 0 || err != nil { if err == nil { @@ -306,9 +335,9 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, } select { case <-ctx.Done(): - return errors.Wrapf(ctx.Err(), err.Error()) + return sendErr(done, errors.Wrapf(ctx.Err(), err.Error())) default: - return stack.Enable(err) + return sendErr(done, stack.Enable(err)) } } @@ -319,6 +348,13 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro // first verify the container is running, if we get an error assume the container // is in the process of being created and check again every 100ms or until // context is canceled. + w.mu.Lock() + done, ok := w.running[id] + w.mu.Unlock() + if !ok { + return errors.Errorf("container %s not found", id) + } + state, _ := w.runc.State(ctx, id) for { if state != nil && state.Status == "running" { @@ -327,6 +363,11 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro select { case <-ctx.Done(): return ctx.Err() + case err, ok := <-done: + if !ok { + return errors.Errorf("container %s has stopped", id) + } + return errors.Wrapf(err, "container %s has exited with error", id) case <-time.After(100 * time.Millisecond): state, _ = w.runc.State(ctx, id) } @@ -361,9 +402,9 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro if process.Meta.Cwd != "" { spec.Process.Cwd = process.Meta.Cwd } + if len(process.Meta.Env) > 0 { - // merge exec env with pid1 env - spec.Process.Env = append(spec.Process.Env, process.Meta.Env...) 
+ spec.Process.Env = process.Meta.Env } return w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{ @@ -397,3 +438,8 @@ func (s *forwardIO) Stdout() io.ReadCloser { func (s *forwardIO) Stderr() io.ReadCloser { return nil } + +func sendErr(c chan error, err error) error { + c <- err + return err +} diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index b4306bd8..6258ff53 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -732,7 +732,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu defer stdout.Close() defer stderr.Close() - if err := e.exec.Run(ctx, "", root, mounts, executor.ProcessInfo{Meta: meta, Stdin: nil, Stdout: stdout, Stderr: stderr}); err != nil { + if err := e.exec.Run(ctx, "", root, mounts, executor.ProcessInfo{Meta: meta, Stdin: nil, Stdout: stdout, Stderr: stderr}, nil); err != nil { return nil, errors.Wrapf(err, "executor failed running %v", meta.Args) } diff --git a/worker/base/worker.go b/worker/base/worker.go index cd08b35b..43024394 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -297,7 +297,7 @@ func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.Immu return err } defer active.Release(context.TODO()) - return w.Executor.Run(ctx, "", active, nil, executor.ProcessInfo{Meta: meta, Stdin: stdin, Stdout: stdout, Stderr: stderr}) + return w.Executor.Run(ctx, "", active, nil, executor.ProcessInfo{Meta: meta, Stdin: stdin, Stdout: stdout, Stderr: stderr}, nil) } func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) { diff --git a/worker/containerd/containerd_test.go b/worker/containerd/containerd_test.go index 28f74df7..2f76f05d 100644 --- a/worker/containerd/containerd_test.go +++ b/worker/containerd/containerd_test.go @@ -3,34 +3,23 @@ package containerd import ( - "bytes" - "context" - "io" "io/ioutil" "os" "testing" - "github.com/containerd/containerd/namespaces" - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/executor" - "github.com/moby/buildkit/identity" - "github.com/moby/buildkit/session" - "github.com/moby/buildkit/source" "github.com/moby/buildkit/util/network/netproviders" "github.com/moby/buildkit/worker/base" - "github.com/pkg/errors" + "github.com/moby/buildkit/worker/tests" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" ) const sockFile = "/run/containerd/containerd.sock" -const ns = "buildkit-test" func newWorkerOpt(t *testing.T) (base.WorkerOpt, func()) { tmpdir, err := ioutil.TempDir("", "workertest") require.NoError(t, err) cleanup := func() { os.RemoveAll(tmpdir) } - workerOpt, err := NewWorkerOpt(tmpdir, sockFile, "overlayfs", ns, nil, nil, netproviders.Opt{Mode: "host"}) + workerOpt, err := NewWorkerOpt(tmpdir, sockFile, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}) require.NoError(t, err) return workerOpt, cleanup } @@ -49,20 +38,6 @@ func checkRequirement(t *testing.T) { } } -func newCtx(s string) context.Context { - return namespaces.WithNamespace(context.Background(), s) -} - -func newBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef { - img, err := source.NewImageIdentifier("docker.io/library/busybox:latest") - require.NoError(t, err) - src, err := w.SourceManager.Resolve(ctx, img, sm) - require.NoError(t, err) - snap, err := src.Snapshot(ctx, nil) - require.NoError(t, err) - return snap -} - func TestContainerdWorkerExec(t *testing.T) { t.Parallel() 
checkRequirement(t) @@ -72,60 +47,16 @@ func TestContainerdWorkerExec(t *testing.T) { w, err := base.NewWorker(workerOpt) require.NoError(t, err) - ctx := newCtx(ns) - ctx, cancel := context.WithCancel(ctx) - sm, err := session.NewManager() - require.NoError(t, err) - - snap := newBusyboxSourceSnapshot(ctx, t, w, sm) - root, err := w.CacheManager.New(ctx, snap) - require.NoError(t, err) - - id := identity.NewID() - - // first start pid1 in the background - eg := errgroup.Group{} - eg.Go(func() error { - return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ - Meta: executor.Meta{ - Args: []string{"sleep", "10"}, - Cwd: "/", - Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"}, - }, - }) - }) - - stdout := bytes.NewBuffer(nil) - stderr := bytes.NewBuffer(nil) - err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ - Meta: executor.Meta{ - Args: []string{"ps", "-o", "pid,comm"}, - }, - Stdout: &nopCloser{stdout}, - Stderr: &nopCloser{stderr}, - }) - t.Logf("Stdout: %s", stdout.String()) - t.Logf("Stderr: %s", stderr.String()) - require.NoError(t, err) - // verify pid1 is sleep - require.Contains(t, stdout.String(), "1 sleep") - require.Empty(t, stderr.String()) - - // stop pid1 - cancel() - - err = eg.Wait() - // we expect this to get canceled after we test the exec - require.EqualError(t, errors.Cause(err), "context canceled") - - err = snap.Release(ctx) - require.NoError(t, err) + tests.TestWorkerExec(t, w) } +func TestContainerdWorkerExecFailures(t *testing.T) { + t.Parallel() + checkRequirement(t) -type nopCloser struct { - io.Writer -} + workerOpt, cleanupWorkerOpt := newWorkerOpt(t) + defer cleanupWorkerOpt() + w, err := base.NewWorker(workerOpt) + require.NoError(t, err) -func (n *nopCloser) Close() error { - return nil + tests.TestWorkerExecFailures(t, w) } diff --git a/worker/runc/runc_test.go b/worker/runc/runc_test.go index 3645769f..b18dceb6 100644 --- a/worker/runc/runc_test.go +++ b/worker/runc/runc_test.go @@ -4,7 +4,6 @@ package runc import ( "bytes" - "context" "fmt" "io" "io/ioutil" @@ -13,22 +12,17 @@ import ( "path/filepath" "testing" - "github.com/containerd/containerd/namespaces" ctdsnapshot "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/overlay" - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/client" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/executor/oci" - "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" - "github.com/moby/buildkit/source" "github.com/moby/buildkit/util/network/netproviders" "github.com/moby/buildkit/worker/base" - "github.com/pkg/errors" + "github.com/moby/buildkit/worker/tests" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" ) func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, func()) { @@ -61,20 +55,6 @@ func checkRequirement(t *testing.T) { } } -func newCtx(s string) context.Context { - return namespaces.WithNamespace(context.Background(), s) -} - -func newBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef { - img, err := source.NewImageIdentifier("docker.io/library/busybox:latest") - require.NoError(t, err) - src, err := w.SourceManager.Resolve(ctx, img, sm) - require.NoError(t, err) - snap, err := src.Snapshot(ctx, nil) - require.NoError(t, err) - return snap -} - func TestRuncWorker(t *testing.T) { t.Parallel() checkRequirement(t) @@ -84,10 +64,10 @@ func TestRuncWorker(t *testing.T) { w, err := 
base.NewWorker(workerOpt) require.NoError(t, err) - ctx := newCtx("buildkit-test") + ctx := tests.NewCtx("buildkit-test") sm, err := session.NewManager() require.NoError(t, err) - snap := newBusyboxSourceSnapshot(ctx, t, w, sm) + snap := tests.NewBusyboxSourceSnapshot(ctx, t, w, sm) mounts, err := snap.Mount(ctx, false) require.NoError(t, err) @@ -127,7 +107,7 @@ func TestRuncWorker(t *testing.T) { } stderr := bytes.NewBuffer(nil) - err = w.Executor.Run(ctx, "", snap, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}) + err = w.Executor.Run(ctx, "", snap, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.Error(t, err) // Read-only root // typical error is like `mkdir /.../rootfs/proc: read-only file system`. // make sure the error is caused before running `echo foo > /bar`. @@ -136,7 +116,7 @@ func TestRuncWorker(t *testing.T) { root, err := w.CacheManager.New(ctx, snap) require.NoError(t, err) - err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}) + err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err) meta = executor.Meta{ @@ -144,7 +124,7 @@ func TestRuncWorker(t *testing.T) { Cwd: "/", } - err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}) + err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err) rf, err := root.Commit(ctx) @@ -188,10 +168,10 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) { w, err := base.NewWorker(workerOpt) require.NoError(t, err) - ctx := newCtx("buildkit-test") + ctx := tests.NewCtx("buildkit-test") sm, err := session.NewManager() require.NoError(t, err) - snap := newBusyboxSourceSnapshot(ctx, t, w, sm) + snap := tests.NewBusyboxSourceSnapshot(ctx, t, w, sm) root, err := w.CacheManager.New(ctx, snap) require.NoError(t, err) @@ -205,7 +185,7 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) { } stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) - err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}) + err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err, fmt.Sprintf("stdout=%q, stderr=%q", stdout.String(), stderr.String())) require.Equal(t, string(selfCmdline), stdout.String()) } @@ -219,55 +199,19 @@ func TestRuncWorkerExec(t *testing.T) { w, err := base.NewWorker(workerOpt) require.NoError(t, err) - ctx := newCtx("buildkit-test") - ctx, cancel := context.WithCancel(ctx) - sm, err := session.NewManager() - require.NoError(t, err) - - snap := newBusyboxSourceSnapshot(ctx, t, w, sm) - root, err := w.CacheManager.New(ctx, snap) - require.NoError(t, err) - - id := identity.NewID() - - // first start pid1 in the background - eg := errgroup.Group{} - eg.Go(func() error { - return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ - Meta: executor.Meta{ - Args: []string{"sleep", "10"}, - Cwd: "/", - Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"}, - }, - }) - }) - - stdout := bytes.NewBuffer(nil) - stderr := bytes.NewBuffer(nil) - err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ - Meta: executor.Meta{ - Args: []string{"ps", "-o", "pid,comm"}, - }, - Stdout: &nopCloser{stdout}, - Stderr: &nopCloser{stderr}, - }) - t.Logf("Stdout: %s", stdout.String()) - 
t.Logf("Stderr: %s", stderr.String()) - require.NoError(t, err) - // verify pid1 is sleep - require.Contains(t, stdout.String(), "1 sleep") - require.Empty(t, stderr.String()) - - // stop pid1 - cancel() - - err = eg.Wait() - // we expect pid1 to get canceled after we test the exec - require.EqualError(t, errors.Cause(err), "context canceled") - - err = snap.Release(ctx) + tests.TestWorkerExec(t, w) +} + +func TestRuncWorkerExecFailures(t *testing.T) { + t.Parallel() + checkRequirement(t) + + workerOpt, cleanupWorkerOpt := newWorkerOpt(t, oci.ProcessSandbox) + defer cleanupWorkerOpt() + w, err := base.NewWorker(workerOpt) require.NoError(t, err) + tests.TestWorkerExecFailures(t, w) } type nopCloser struct { diff --git a/worker/tests/common.go b/worker/tests/common.go new file mode 100644 index 00000000..1e622e54 --- /dev/null +++ b/worker/tests/common.go @@ -0,0 +1,201 @@ +package tests + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/containerd/containerd/namespaces" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/source" + "github.com/moby/buildkit/worker/base" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func NewBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef { + img, err := source.NewImageIdentifier("docker.io/library/busybox:latest") + require.NoError(t, err) + src, err := w.SourceManager.Resolve(ctx, img, sm) + require.NoError(t, err) + snap, err := src.Snapshot(ctx, nil) + require.NoError(t, err) + return snap +} + +func NewCtx(s string) context.Context { + return namespaces.WithNamespace(context.Background(), s) +} + +func TestWorkerExec(t *testing.T, w *base.Worker) { + ctx := NewCtx("buildkit-test") + ctx, cancel := context.WithCancel(ctx) + sm, err := session.NewManager() + require.NoError(t, err) + + snap := NewBusyboxSourceSnapshot(ctx, t, w, sm) + root, err := w.CacheManager.New(ctx, snap) + require.NoError(t, err) + + id := identity.NewID() + + // first start pid1 in the background + eg := errgroup.Group{} + started := make(chan struct{}) + eg.Go(func() error { + return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"sleep", "10"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"}, + }, + }, started) + }) + + <-started + + stdout := bytes.NewBuffer(nil) + stderr := bytes.NewBuffer(nil) + + // verify pid1 is the sleep command via Exec + err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"ps", "-o", "pid,comm"}, + }, + Stdout: &nopCloser{stdout}, + Stderr: &nopCloser{stderr}, + }) + t.Logf("Stdout: %s", stdout.String()) + t.Logf("Stderr: %s", stderr.String()) + require.NoError(t, err) + // verify pid1 is sleep + require.Contains(t, stdout.String(), "1 sleep") + require.Empty(t, stderr.String()) + + // simulate: echo -n "hello" | cat > /tmp/msg + stdin := bytes.NewReader([]byte("hello")) + stdout.Reset() + stderr.Reset() + err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"sh", "-c", "cat > /tmp/msg"}, + }, + Stdin: &nopReadCloser{stdin}, + Stdout: &nopCloser{stdout}, + Stderr: &nopCloser{stderr}, + }) + require.NoError(t, err) + require.Empty(t, stdout.String()) + require.Empty(t, stderr.String()) + + // verify contents of /tmp/msg + stdout.Reset() + 
stderr.Reset() + err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"cat", "/tmp/msg"}, + }, + Stdout: &nopCloser{stdout}, + Stderr: &nopCloser{stderr}, + }) + t.Logf("Stdout: %s", stdout.String()) + t.Logf("Stderr: %s", stderr.String()) + require.NoError(t, err) + require.Equal(t, "hello", stdout.String()) + require.Empty(t, stderr.String()) + + // stop pid1 + cancel() + + err = eg.Wait() + // we expect pid1 to get canceled after we test the exec + require.EqualError(t, errors.Cause(err), context.Canceled.Error()) + + err = snap.Release(ctx) + require.NoError(t, err) +} + +func TestWorkerExecFailures(t *testing.T, w *base.Worker) { + ctx := NewCtx("buildkit-test") + sm, err := session.NewManager() + require.NoError(t, err) + + snap := NewBusyboxSourceSnapshot(ctx, t, w, sm) + root, err := w.CacheManager.New(ctx, snap) + require.NoError(t, err) + + id := identity.NewID() + + // pid1 will start but only long enough for /bin/false to run + eg := errgroup.Group{} + started := make(chan struct{}) + eg.Go(func() error { + return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"/bin/false"}, + Cwd: "/", + }, + }, started) + }) + + <-started + + // this should fail since pid1 has already exited + err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"/bin/true"}, + }, + }) + require.Error(t, err) // pid1 no longer running + + err = eg.Wait() + require.Error(t, err) // process returned non-zero exit code: 1 + + // pid1 will not start, bogus pid1 command + eg = errgroup.Group{} + started = make(chan struct{}) + eg.Go(func() error { + return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"bogus"}, + }, + }, started) + }) + + <-started + + // this should fail since pid1 never started + err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"/bin/true"}, + }, + }) + require.Error(t, err) // container has exited with error + + err = eg.Wait() + require.Error(t, err) // pid1 did not terminate successfully + + err = snap.Release(ctx) + require.NoError(t, err) +} + +type nopReadCloser struct { + io.Reader +} + +func (n *nopReadCloser) Close() error { + return nil +} + +type nopCloser struct { + io.Writer +} + +func (n *nopCloser) Close() error { + return nil +} From 5909d1642e8ece08e7aa64c95103c8fcffd2feb9 Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Sat, 11 Jul 2020 00:02:09 +0000 Subject: [PATCH 3/4] simplify done channel handling, fix other pr comments. 
Signed-off-by: Cory Bennett --- executor/containerdexecutor/executor.go | 66 +++++++++++------------- executor/runcexecutor/executor.go | 67 ++++++++++++------------- worker/tests/common.go | 30 ++++++----- 3 files changed, 79 insertions(+), 84 deletions(-) diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index ead0c112..7857e2ea 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -33,7 +33,7 @@ type containerdExecutor struct { cgroupParent string dnsConfig *oci.DNSConfig running map[string]chan error - mu *sync.Mutex + mu sync.Mutex } // New creates a new executor backed by connection to containerd API @@ -42,18 +42,17 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb os.RemoveAll(filepath.Join(root, "hosts")) os.RemoveAll(filepath.Join(root, "resolv.conf")) - return containerdExecutor{ + return &containerdExecutor{ client: client, root: root, networkProviders: networkProviders, cgroupParent: cgroup, dnsConfig: dnsConfig, running: make(map[string]chan error), - mu: &sync.Mutex{}, } } -func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { if id == "" { id = identity.NewID() } @@ -67,6 +66,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount w.mu.Lock() delete(w.running, id) w.mu.Unlock() + done <- err close(done) if started != nil { startedOnce.Do(func() { @@ -79,12 +79,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount resolvConf, err := oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig) if err != nil { - return sendErr(done, err) + return err } hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, nil) if err != nil { - return sendErr(done, err) + return err } if clean != nil { defer clean() @@ -92,12 +92,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount mountable, err := root.Mount(ctx, false) if err != nil { - return sendErr(done, err) + return err } rootMounts, release, err := mountable.Mount() if err != nil { - return sendErr(done, err) + return err } if release != nil { defer release() @@ -109,12 +109,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount lm := snapshot.LocalMounterWithMounts(rootMounts) rootfsPath, err := lm.Mount() if err != nil { - return sendErr(done, err) + return err } uid, gid, sgids, err = oci.GetUser(ctx, rootfsPath, meta.User) if err != nil { lm.Unmount() - return sendErr(done, err) + return err } identity := idtools.Identity{ @@ -125,12 +125,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount newp, err := fs.RootPath(rootfsPath, meta.Cwd) if err != nil { lm.Unmount() - return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp)) + return errors.Wrapf(err, "working dir %s points to invalid target", newp) } if _, err := os.Stat(newp); err != nil { if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { lm.Unmount() - return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp)) + return errors.Wrapf(err, "failed to create working directory %s", newp) } } @@ -139,11 +139,11 @@ 
func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount provider, ok := w.networkProviders[meta.NetMode] if !ok { - return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode)) + return errors.Errorf("unknown network mode %s", meta.NetMode) } namespace, err := provider.New() if err != nil { - return sendErr(done, err) + return err } defer namespace.Close() @@ -169,7 +169,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount processMode := oci.ProcessSandbox // FIXME(AkihiroSuda) spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, opts...) if err != nil { - return sendErr(done, err) + return err } defer cleanup() @@ -177,13 +177,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount containerd.WithSpec(spec), ) if err != nil { - return sendErr(done, err) + return err } defer func() { if err1 := container.Delete(context.TODO()); err == nil && err1 != nil { err = errors.Wrapf(err1, "failed to delete container %s", id) - sendErr(done, err) } }() @@ -194,17 +193,16 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount task, err := container.NewTask(ctx, cio.NewCreator(cioOpts...), containerd.WithRootFS(rootMounts)) if err != nil { - return sendErr(done, err) + return err } defer func() { if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil { err = errors.Wrapf(err1, "failed to delete task %s", id) - sendErr(done, err) } }() if err := task.Start(ctx); err != nil { - return sendErr(done, err) + return err } if started != nil { @@ -214,7 +212,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount } statusCh, err := task.Wait(context.Background()) if err != nil { - return sendErr(done, err) + return err } var cancel func() @@ -242,30 +240,31 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount err = errors.Wrap(ctx.Err(), err.Error()) default: } - return sendErr(done, err) + return err } return nil } } } -func (w containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { +func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { meta := process.Meta // first verify the container is running, if we get an error assume the container // is in the process of being created and check again every 100ms or until // context is canceled. 
- w.mu.Lock() - done, ok := w.running[id] - w.mu.Unlock() - if !ok { - return errors.Errorf("container %s not found", id) - } - var container containerd.Container var task containerd.Task for { + w.mu.Lock() + done, ok := w.running[id] + w.mu.Unlock() + + if !ok { + return errors.Errorf("container %s not found", id) + } + if container == nil { container, _ = w.client.LoadContainer(ctx, id) } @@ -282,7 +281,7 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo case <-ctx.Done(): return ctx.Err() case err, ok := <-done: - if !ok { + if !ok || err == nil { return errors.Errorf("container %s has stopped", id) } return errors.Wrapf(err, "container %s has exited with error", id) @@ -332,8 +331,3 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo } return taskProcess.Start(ctx) } - -func sendErr(c chan error, err error) error { - c <- err - return err -} diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 7c8e6554..068929c6 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -124,12 +124,11 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex dns: opt.DNS, oomScoreAdj: opt.OOMScoreAdj, running: make(map[string]chan error), - mu: sync.Mutex{}, } return w, nil } -func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) error { +func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { meta := process.Meta startedOnce := sync.Once{} @@ -141,6 +140,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, w.mu.Lock() delete(w.running, id) w.mu.Unlock() + done <- err close(done) if started != nil { startedOnce.Do(func() { @@ -151,11 +151,11 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, provider, ok := w.networkProviders[meta.NetMode] if !ok { - return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode)) + return errors.Errorf("unknown network mode %s", meta.NetMode) } namespace, err := provider.New() if err != nil { - return sendErr(done, err) + return err } defer namespace.Close() @@ -165,12 +165,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, resolvConf, err := oci.GetResolvConf(ctx, w.root, w.idmap, w.dns) if err != nil { - return sendErr(done, err) + return err } hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, w.idmap) if err != nil { - return sendErr(done, err) + return err } if clean != nil { defer clean() @@ -178,12 +178,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mountable, err := root.Mount(ctx, false) if err != nil { - return sendErr(done, err) + return err } rootMount, release, err := mountable.Mount() if err != nil { - return sendErr(done, err) + return err } if release != nil { defer release() @@ -195,7 +195,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, bundle := filepath.Join(w.root, id) if err := os.Mkdir(bundle, 0711); err != nil { - return sendErr(done, err) + return err } defer os.RemoveAll(bundle) @@ -206,21 +206,21 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, rootFSPath := filepath.Join(bundle, "rootfs") if err := 
idtools.MkdirAllAndChown(rootFSPath, 0700, identity); err != nil { - return sendErr(done, err) + return err } if err := mount.All(rootMount, rootFSPath); err != nil { - return sendErr(done, err) + return err } defer mount.Unmount(rootFSPath, 0) uid, gid, sgids, err := oci.GetUser(ctx, rootFSPath, meta.User) if err != nil { - return sendErr(done, err) + return err } f, err := os.Create(filepath.Join(bundle, "config.json")) if err != nil { - return sendErr(done, err) + return err } defer f.Close() @@ -237,7 +237,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, if w.idmap != nil { identity, err = w.idmap.ToHost(identity) if err != nil { - return sendErr(done, err) + return err } } @@ -253,7 +253,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, } spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.processMode, w.idmap, opts...) if err != nil { - return sendErr(done, err) + return err } defer cleanup() @@ -264,11 +264,11 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, newp, err := fs.RootPath(rootFSPath, meta.Cwd) if err != nil { - return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp)) + return errors.Wrapf(err, "working dir %s points to invalid target", newp) } if _, err := os.Stat(newp); err != nil { if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { - return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp)) + return errors.Wrapf(err, "failed to create working directory %s", newp) } } @@ -276,12 +276,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, spec.Process.OOMScoreAdj = w.oomScoreAdj if w.rootless { if err := rootlessspecconv.ToRootless(spec); err != nil { - return sendErr(done, err) + return err } } if err := json.NewEncoder(f).Encode(spec); err != nil { - return sendErr(done, err) + return err } // runCtx/killCtx is used for extra check in case the kill command blocks @@ -335,9 +335,9 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, } select { case <-ctx.Done(): - return sendErr(done, errors.Wrapf(ctx.Err(), err.Error())) + return errors.Wrapf(ctx.Err(), err.Error()) default: - return sendErr(done, stack.Enable(err)) + return stack.Enable(err) } } @@ -348,15 +348,16 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro // first verify the container is running, if we get an error assume the container // is in the process of being created and check again every 100ms or until // context is canceled. 
- w.mu.Lock() - done, ok := w.running[id] - w.mu.Unlock() - if !ok { - return errors.Errorf("container %s not found", id) - } - - state, _ := w.runc.State(ctx, id) + var state *runc.Container for { + w.mu.Lock() + done, ok := w.running[id] + w.mu.Unlock() + if !ok { + return errors.Errorf("container %s not found", id) + } + + state, _ = w.runc.State(ctx, id) if state != nil && state.Status == "running" { break } @@ -364,12 +365,11 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro case <-ctx.Done(): return ctx.Err() case err, ok := <-done: - if !ok { + if !ok || err == nil { return errors.Errorf("container %s has stopped", id) } return errors.Wrapf(err, "container %s has exited with error", id) case <-time.After(100 * time.Millisecond): - state, _ = w.runc.State(ctx, id) } } @@ -438,8 +438,3 @@ func (s *forwardIO) Stdout() io.ReadCloser { func (s *forwardIO) Stderr() io.ReadCloser { return nil } - -func sendErr(c chan error, err error) error { - c <- err - return err -} diff --git a/worker/tests/common.go b/worker/tests/common.go index 1e622e54..0fabd51a 100644 --- a/worker/tests/common.go +++ b/worker/tests/common.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "io" + "io/ioutil" "testing" + "time" "github.com/containerd/containerd/namespaces" "github.com/moby/buildkit/cache" @@ -57,7 +59,11 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { }, started) }) - <-started + select { + case <-started: + case <-time.After(10 * time.Second): + t.Error("Unexpected timeout waiting for pid1 to start") + } stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) @@ -85,7 +91,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { Meta: executor.Meta{ Args: []string{"sh", "-c", "cat > /tmp/msg"}, }, - Stdin: &nopReadCloser{stdin}, + Stdin: ioutil.NopCloser(stdin), Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}, }) @@ -143,7 +149,11 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { }, started) }) - <-started + select { + case <-started: + case <-time.After(10 * time.Second): + t.Error("Unexpected timeout waiting for pid1 to start") + } // this should fail since pid1 has already exited err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ @@ -167,7 +177,11 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { }, started) }) - <-started + select { + case <-started: + case <-time.After(10 * time.Second): + t.Error("Unexpected timeout waiting for pid1 to start") + } // this should fail since pid1 never started err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ @@ -184,14 +198,6 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { require.NoError(t, err) } -type nopReadCloser struct { - io.Reader -} - -func (n *nopReadCloser) Close() error { - return nil -} - type nopCloser struct { io.Writer } From caac9457888435ee0cda97b89a142196cd4acd5b Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Mon, 13 Jul 2020 20:16:05 +0000 Subject: [PATCH 4/4] update godoc for Executor interface Signed-off-by: Cory Bennett --- executor/executor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/executor/executor.go b/executor/executor.go index ad7279cb..5ab42525 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -35,7 +35,12 @@ type ProcessInfo struct { } type Executor interface { + // Run will start a container for the given process with rootfs, mounts. + // `id` is an optional name for the container so it can be referenced later via Exec. 
+	// `started` is an optional channel that will be closed once container setup completes and the process has started running.
 	Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount, process ProcessInfo, started chan<- struct{}) error
+	// Exec will start a process in the container matching `id`. An error will be returned
+	// if the container failed to start (via Run) or has exited before Exec is called.
 	Exec(ctx context.Context, id string, process ProcessInfo) error
 }
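
For reviewers, here is a minimal caller-side sketch of the new two-method contract, modeled on the pattern used in worker/tests/common.go: Run is launched in a goroutine with a `started` channel, the caller waits for setup to finish (with a timeout, as the tests do), and then Exec attaches a second process to the same container. The function name runThenExec, the sleep/ps arguments, and the discarded process output are illustrative assumptions only; error handling and I/O wiring are abbreviated and this is not part of the patch itself.

package example

import (
	"context"
	"time"

	"github.com/moby/buildkit/cache"
	"github.com/moby/buildkit/executor"
	"github.com/moby/buildkit/identity"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"
)

// runThenExec starts pid1 via Run, waits for the container to come up, then
// attaches a second process via Exec. `exec` can be any executor.Executor
// implementation (runc or containerd); rootfs and mounts come from the caller.
func runThenExec(ctx context.Context, exec executor.Executor, rootfs cache.Mountable, mounts []executor.Mount) error {
	id := identity.NewID()
	started := make(chan struct{})

	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		// Run blocks until pid1 exits; the executor closes `started` once
		// container setup completes and pid1 is running.
		return exec.Run(ctx, id, rootfs, mounts, executor.ProcessInfo{
			Meta: executor.Meta{Args: []string{"sleep", "10"}, Cwd: "/"},
		}, started)
	})

	// Guard against a Run that fails before setup, mirroring the 10-second
	// timeout added to the worker tests in this series.
	select {
	case <-started:
	case <-time.After(10 * time.Second):
		return errors.New("timed out waiting for pid1 to start")
	}

	// Exec joins the already-running container identified by `id`; it returns
	// an error if pid1 never started or has already exited.
	if err := exec.Exec(ctx, id, executor.ProcessInfo{
		Meta: executor.Meta{Args: []string{"ps", "aux"}, Cwd: "/"},
	}); err != nil {
		return err
	}

	// Wait for pid1 to exit and propagate Run's error, if any.
	return eg.Wait()
}

Note the division of responsibility this implies: Run owns the container lifecycle and reports the final error (both through its return value and the internal done channel the executors keep in their running maps), while Exec only ever joins a container that Run has already registered, which is why it polls and bails out as soon as the done channel reports that pid1 stopped.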