diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index ead0c112..7857e2ea 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -33,7 +33,7 @@ type containerdExecutor struct { cgroupParent string dnsConfig *oci.DNSConfig running map[string]chan error - mu *sync.Mutex + mu sync.Mutex } // New creates a new executor backed by connection to containerd API @@ -42,18 +42,17 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb os.RemoveAll(filepath.Join(root, "hosts")) os.RemoveAll(filepath.Join(root, "resolv.conf")) - return containerdExecutor{ + return &containerdExecutor{ client: client, root: root, networkProviders: networkProviders, cgroupParent: cgroup, dnsConfig: dnsConfig, running: make(map[string]chan error), - mu: &sync.Mutex{}, } } -func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { if id == "" { id = identity.NewID() } @@ -67,6 +66,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount w.mu.Lock() delete(w.running, id) w.mu.Unlock() + done <- err close(done) if started != nil { startedOnce.Do(func() { @@ -79,12 +79,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount resolvConf, err := oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig) if err != nil { - return sendErr(done, err) + return err } hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, nil) if err != nil { - return sendErr(done, err) + return err } if clean != nil { defer clean() @@ -92,12 +92,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount mountable, 
err := root.Mount(ctx, false) if err != nil { - return sendErr(done, err) + return err } rootMounts, release, err := mountable.Mount() if err != nil { - return sendErr(done, err) + return err } if release != nil { defer release() @@ -109,12 +109,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount lm := snapshot.LocalMounterWithMounts(rootMounts) rootfsPath, err := lm.Mount() if err != nil { - return sendErr(done, err) + return err } uid, gid, sgids, err = oci.GetUser(ctx, rootfsPath, meta.User) if err != nil { lm.Unmount() - return sendErr(done, err) + return err } identity := idtools.Identity{ @@ -125,12 +125,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount newp, err := fs.RootPath(rootfsPath, meta.Cwd) if err != nil { lm.Unmount() - return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp)) + return errors.Wrapf(err, "working dir %s points to invalid target", newp) } if _, err := os.Stat(newp); err != nil { if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { lm.Unmount() - return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp)) + return errors.Wrapf(err, "failed to create working directory %s", newp) } } @@ -139,11 +139,11 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount provider, ok := w.networkProviders[meta.NetMode] if !ok { - return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode)) + return errors.Errorf("unknown network mode %s", meta.NetMode) } namespace, err := provider.New() if err != nil { - return sendErr(done, err) + return err } defer namespace.Close() @@ -169,7 +169,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount processMode := oci.ProcessSandbox // FIXME(AkihiroSuda) spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, opts...) 
if err != nil { - return sendErr(done, err) + return err } defer cleanup() @@ -177,13 +177,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount containerd.WithSpec(spec), ) if err != nil { - return sendErr(done, err) + return err } defer func() { if err1 := container.Delete(context.TODO()); err == nil && err1 != nil { err = errors.Wrapf(err1, "failed to delete container %s", id) - sendErr(done, err) } }() @@ -194,17 +193,16 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount task, err := container.NewTask(ctx, cio.NewCreator(cioOpts...), containerd.WithRootFS(rootMounts)) if err != nil { - return sendErr(done, err) + return err } defer func() { if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil { err = errors.Wrapf(err1, "failed to delete task %s", id) - sendErr(done, err) } }() if err := task.Start(ctx); err != nil { - return sendErr(done, err) + return err } if started != nil { @@ -214,7 +212,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount } statusCh, err := task.Wait(context.Background()) if err != nil { - return sendErr(done, err) + return err } var cancel func() @@ -242,30 +240,31 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount err = errors.Wrap(ctx.Err(), err.Error()) default: } - return sendErr(done, err) + return err } return nil } } } -func (w containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { +func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { meta := process.Meta // first verify the container is running, if we get an error assume the container // is in the process of being created and check again every 100ms or until // context is canceled. 
- w.mu.Lock() - done, ok := w.running[id] - w.mu.Unlock() - if !ok { - return errors.Errorf("container %s not found", id) - } - var container containerd.Container var task containerd.Task for { + w.mu.Lock() + done, ok := w.running[id] + w.mu.Unlock() + + if !ok { + return errors.Errorf("container %s not found", id) + } + if container == nil { container, _ = w.client.LoadContainer(ctx, id) } @@ -282,7 +281,7 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo case <-ctx.Done(): return ctx.Err() case err, ok := <-done: - if !ok { + if !ok || err == nil { return errors.Errorf("container %s has stopped", id) } return errors.Wrapf(err, "container %s has exited with error", id) @@ -332,8 +331,3 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo } return taskProcess.Start(ctx) } - -func sendErr(c chan error, err error) error { - c <- err - return err -} diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 7c8e6554..068929c6 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -124,12 +124,11 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex dns: opt.DNS, oomScoreAdj: opt.OOMScoreAdj, running: make(map[string]chan error), - mu: sync.Mutex{}, } return w, nil } -func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) error { +func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { meta := process.Meta startedOnce := sync.Once{} @@ -141,6 +140,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, w.mu.Lock() delete(w.running, id) w.mu.Unlock() + done <- err close(done) if started != nil { startedOnce.Do(func() { @@ -151,11 +151,11 @@ func (w 
*runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, provider, ok := w.networkProviders[meta.NetMode] if !ok { - return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode)) + return errors.Errorf("unknown network mode %s", meta.NetMode) } namespace, err := provider.New() if err != nil { - return sendErr(done, err) + return err } defer namespace.Close() @@ -165,12 +165,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, resolvConf, err := oci.GetResolvConf(ctx, w.root, w.idmap, w.dns) if err != nil { - return sendErr(done, err) + return err } hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, w.idmap) if err != nil { - return sendErr(done, err) + return err } if clean != nil { defer clean() @@ -178,12 +178,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mountable, err := root.Mount(ctx, false) if err != nil { - return sendErr(done, err) + return err } rootMount, release, err := mountable.Mount() if err != nil { - return sendErr(done, err) + return err } if release != nil { defer release() @@ -195,7 +195,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, bundle := filepath.Join(w.root, id) if err := os.Mkdir(bundle, 0711); err != nil { - return sendErr(done, err) + return err } defer os.RemoveAll(bundle) @@ -206,21 +206,21 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, rootFSPath := filepath.Join(bundle, "rootfs") if err := idtools.MkdirAllAndChown(rootFSPath, 0700, identity); err != nil { - return sendErr(done, err) + return err } if err := mount.All(rootMount, rootFSPath); err != nil { - return sendErr(done, err) + return err } defer mount.Unmount(rootFSPath, 0) uid, gid, sgids, err := oci.GetUser(ctx, rootFSPath, meta.User) if err != nil { - return sendErr(done, err) + return err } f, err := os.Create(filepath.Join(bundle, "config.json")) if err != nil { - return 
sendErr(done, err) + return err } defer f.Close() @@ -237,7 +237,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, if w.idmap != nil { identity, err = w.idmap.ToHost(identity) if err != nil { - return sendErr(done, err) + return err } } @@ -253,7 +253,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, } spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.processMode, w.idmap, opts...) if err != nil { - return sendErr(done, err) + return err } defer cleanup() @@ -264,11 +264,11 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, newp, err := fs.RootPath(rootFSPath, meta.Cwd) if err != nil { - return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp)) + return errors.Wrapf(err, "working dir %s points to invalid target", newp) } if _, err := os.Stat(newp); err != nil { if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { - return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp)) + return errors.Wrapf(err, "failed to create working directory %s", newp) } } @@ -276,12 +276,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, spec.Process.OOMScoreAdj = w.oomScoreAdj if w.rootless { if err := rootlessspecconv.ToRootless(spec); err != nil { - return sendErr(done, err) + return err } } if err := json.NewEncoder(f).Encode(spec); err != nil { - return sendErr(done, err) + return err } // runCtx/killCtx is used for extra check in case the kill command blocks @@ -335,9 +335,9 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, } select { case <-ctx.Done(): - return sendErr(done, errors.Wrapf(ctx.Err(), err.Error())) + return errors.Wrapf(ctx.Err(), "%s", err.Error()) default: - return sendErr(done, stack.Enable(err)) + return stack.Enable(err) } } @@ -348,15 +348,16 @@ func (w *runcExecutor)
Exec(ctx context.Context, id string, process executor.Pro // first verify the container is running, if we get an error assume the container // is in the process of being created and check again every 100ms or until // context is canceled. - w.mu.Lock() - done, ok := w.running[id] - w.mu.Unlock() - if !ok { - return errors.Errorf("container %s not found", id) - } - - state, _ := w.runc.State(ctx, id) + var state *runc.Container for { + w.mu.Lock() + done, ok := w.running[id] + w.mu.Unlock() + if !ok { + return errors.Errorf("container %s not found", id) + } + + state, _ = w.runc.State(ctx, id) if state != nil && state.Status == "running" { break } @@ -364,12 +365,11 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro case <-ctx.Done(): return ctx.Err() case err, ok := <-done: - if !ok { + if !ok || err == nil { return errors.Errorf("container %s has stopped", id) } return errors.Wrapf(err, "container %s has exited with error", id) case <-time.After(100 * time.Millisecond): - state, _ = w.runc.State(ctx, id) } } @@ -438,8 +438,3 @@ func (s *forwardIO) Stdout() io.ReadCloser { func (s *forwardIO) Stderr() io.ReadCloser { return nil } - -func sendErr(c chan error, err error) error { - c <- err - return err -} diff --git a/worker/tests/common.go b/worker/tests/common.go index 1e622e54..0fabd51a 100644 --- a/worker/tests/common.go +++ b/worker/tests/common.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "io" + "io/ioutil" "testing" + "time" "github.com/containerd/containerd/namespaces" "github.com/moby/buildkit/cache" @@ -57,7 +59,11 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { }, started) }) - <-started + select { + case <-started: + case <-time.After(10 * time.Second): + t.Error("Unexpected timeout waiting for pid1 to start") + } stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) @@ -85,7 +91,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { Meta: executor.Meta{ Args: []string{"sh", "-c", "cat > /tmp/msg"}, }, - 
Stdin: &nopReadCloser{stdin}, + Stdin: ioutil.NopCloser(stdin), Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}, }) @@ -143,7 +149,11 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { }, started) }) - <-started + select { + case <-started: + case <-time.After(10 * time.Second): + t.Error("Unexpected timeout waiting for pid1 to start") + } // this should fail since pid1 has already exited err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ @@ -167,7 +177,11 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { }, started) }) - <-started + select { + case <-started: + case <-time.After(10 * time.Second): + t.Error("Unexpected timeout waiting for pid1 to start") + } // this should fail since pid1 never started err = w.Executor.Exec(ctx, id, executor.ProcessInfo{ @@ -184,14 +198,6 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { require.NoError(t, err) } -type nopReadCloser struct { - io.Reader -} - -func (n *nopReadCloser) Close() error { - return nil -} - type nopCloser struct { io.Writer }