simplify done channel handling, fix other pr comments.

Signed-off-by: Cory Bennett <cbennett@netflix.com>
v0.8
Cory Bennett 2020-07-11 00:02:09 +00:00
parent 5e91dff4ed
commit 5909d1642e
3 changed files with 79 additions and 84 deletions
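The core of the change, in both executors: Run now names its error return, and a single deferred send publishes the final error on the per-container done channel before closing it, replacing the sendErr helper that previously had to wrap every return site. A minimal standalone sketch of the idiom (illustrative names only, not buildkit's API):

package main

import (
	"errors"
	"fmt"
)

// run mimics the executors' Run: the error return is named so the
// deferred function can publish the final value on done, whichever
// return path was taken, and then close the channel.
func run(done chan error) (err error) {
	defer func() {
		done <- err
		close(done)
	}()
	// any early "return err" above this point also reaches done
	return errors.New("boom")
}

func main() {
	done := make(chan error, 1) // buffered so the deferred send never blocks
	go run(done)

	// A waiter distinguishes the cases the way Exec does after this
	// commit: a closed channel or a nil error means the container simply
	// stopped; a non-nil error means it exited with that error.
	if err, ok := <-done; !ok || err == nil {
		fmt.Println("container has stopped")
	} else {
		fmt.Println("container has exited with error:", err)
	}
}

Because the send happens in the defer, every early return reaches any waiter automatically, and closing the channel afterwards lets waiters tell a clean stop from an error exit with one comma-ok receive.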


@@ -33,7 +33,7 @@ type containerdExecutor struct {
 	cgroupParent     string
 	dnsConfig        *oci.DNSConfig
 	running          map[string]chan error
-	mu               *sync.Mutex
+	mu               sync.Mutex
 }

 // New creates a new executor backed by connection to containerd API
@@ -42,18 +42,17 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb
 	os.RemoveAll(filepath.Join(root, "hosts"))
 	os.RemoveAll(filepath.Join(root, "resolv.conf"))

-	return containerdExecutor{
+	return &containerdExecutor{
 		client:           client,
 		root:             root,
 		networkProviders: networkProviders,
 		cgroupParent:     cgroup,
 		dnsConfig:        dnsConfig,
 		running:          make(map[string]chan error),
-		mu:               &sync.Mutex{},
 	}
 }

-func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
+func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
 	if id == "" {
 		id = identity.NewID()
 	}
@@ -67,6 +66,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 		w.mu.Lock()
 		delete(w.running, id)
 		w.mu.Unlock()
+		done <- err
 		close(done)
 		if started != nil {
 			startedOnce.Do(func() {
@@ -79,12 +79,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 	resolvConf, err := oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}

 	hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, nil)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	if clean != nil {
 		defer clean()
@@ -92,12 +92,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 	mountable, err := root.Mount(ctx, false)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}

 	rootMounts, release, err := mountable.Mount()
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	if release != nil {
 		defer release()
@@ -109,12 +109,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 	lm := snapshot.LocalMounterWithMounts(rootMounts)
 	rootfsPath, err := lm.Mount()
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	uid, gid, sgids, err = oci.GetUser(ctx, rootfsPath, meta.User)
 	if err != nil {
 		lm.Unmount()
-		return sendErr(done, err)
+		return err
 	}

 	identity := idtools.Identity{
@@ -125,12 +125,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 	newp, err := fs.RootPath(rootfsPath, meta.Cwd)
 	if err != nil {
 		lm.Unmount()
-		return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp))
+		return errors.Wrapf(err, "working dir %s points to invalid target", newp)
 	}
 	if _, err := os.Stat(newp); err != nil {
 		if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil {
 			lm.Unmount()
-			return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp))
+			return errors.Wrapf(err, "failed to create working directory %s", newp)
 		}
 	}
@@ -139,11 +139,11 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 	provider, ok := w.networkProviders[meta.NetMode]
 	if !ok {
-		return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode))
+		return errors.Errorf("unknown network mode %s", meta.NetMode)
 	}
 	namespace, err := provider.New()
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer namespace.Close()
@@ -169,7 +169,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 	processMode := oci.ProcessSandbox // FIXME(AkihiroSuda)
 	spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, opts...)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer cleanup()
@@ -177,13 +177,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 		containerd.WithSpec(spec),
 	)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer func() {
 		if err1 := container.Delete(context.TODO()); err == nil && err1 != nil {
 			err = errors.Wrapf(err1, "failed to delete container %s", id)
-			sendErr(done, err)
 		}
 	}()
@@ -194,17 +193,16 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 	task, err := container.NewTask(ctx, cio.NewCreator(cioOpts...), containerd.WithRootFS(rootMounts))
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer func() {
 		if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil {
 			err = errors.Wrapf(err1, "failed to delete task %s", id)
-			sendErr(done, err)
 		}
 	}()

 	if err := task.Start(ctx); err != nil {
-		return sendErr(done, err)
+		return err
 	}

 	if started != nil {
@@ -214,7 +212,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 	}

 	statusCh, err := task.Wait(context.Background())
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}

 	var cancel func()
@@ -242,30 +240,31 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
 					err = errors.Wrap(ctx.Err(), err.Error())
 				default:
 				}
-				return sendErr(done, err)
+				return err
 			}
 			return nil
 		}
 	}
 }

-func (w containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
+func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
 	meta := process.Meta

 	// first verify the container is running, if we get an error assume the container
 	// is in the process of being created and check again every 100ms or until
 	// context is canceled.
+	var container containerd.Container
+	var task containerd.Task
+	for {
 		w.mu.Lock()
 		done, ok := w.running[id]
 		w.mu.Unlock()
 		if !ok {
 			return errors.Errorf("container %s not found", id)
 		}
-	var container containerd.Container
-	var task containerd.Task
-	for {
 		if container == nil {
 			container, _ = w.client.LoadContainer(ctx, id)
 		}
@@ -282,7 +281,7 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo
 		case <-ctx.Done():
 			return ctx.Err()
 		case err, ok := <-done:
-			if !ok {
+			if !ok || err == nil {
 				return errors.Errorf("container %s has stopped", id)
 			}
 			return errors.Wrapf(err, "container %s has exited with error", id)
@@ -332,8 +331,3 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo
 	}
 	return taskProcess.Start(ctx)
 }
-
-func sendErr(c chan error, err error) error {
-	c <- err
-	return err
-}


@@ -124,12 +124,11 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex
 		dns:              opt.DNS,
 		oomScoreAdj:      opt.OOMScoreAdj,
 		running:          make(map[string]chan error),
-		mu:               sync.Mutex{},
 	}
 	return w, nil
 }

-func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) error {
+func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
 	meta := process.Meta

 	startedOnce := sync.Once{}
@@ -141,6 +140,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 		w.mu.Lock()
 		delete(w.running, id)
 		w.mu.Unlock()
+		done <- err
 		close(done)
 		if started != nil {
 			startedOnce.Do(func() {
@@ -151,11 +151,11 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	provider, ok := w.networkProviders[meta.NetMode]
 	if !ok {
-		return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode))
+		return errors.Errorf("unknown network mode %s", meta.NetMode)
 	}
 	namespace, err := provider.New()
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer namespace.Close()
@@ -165,12 +165,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	resolvConf, err := oci.GetResolvConf(ctx, w.root, w.idmap, w.dns)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}

 	hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, w.idmap)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	if clean != nil {
 		defer clean()
@@ -178,12 +178,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	mountable, err := root.Mount(ctx, false)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}

 	rootMount, release, err := mountable.Mount()
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	if release != nil {
 		defer release()
@@ -195,7 +195,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	bundle := filepath.Join(w.root, id)

 	if err := os.Mkdir(bundle, 0711); err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer os.RemoveAll(bundle)
@@ -206,21 +206,21 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	rootFSPath := filepath.Join(bundle, "rootfs")
 	if err := idtools.MkdirAllAndChown(rootFSPath, 0700, identity); err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	if err := mount.All(rootMount, rootFSPath); err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer mount.Unmount(rootFSPath, 0)

 	uid, gid, sgids, err := oci.GetUser(ctx, rootFSPath, meta.User)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}

 	f, err := os.Create(filepath.Join(bundle, "config.json"))
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer f.Close()
@@ -237,7 +237,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	if w.idmap != nil {
 		identity, err = w.idmap.ToHost(identity)
 		if err != nil {
-			return sendErr(done, err)
+			return err
 		}
 	}
@@ -253,7 +253,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	}

 	spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.processMode, w.idmap, opts...)
 	if err != nil {
-		return sendErr(done, err)
+		return err
 	}
 	defer cleanup()
@@ -264,11 +264,11 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	newp, err := fs.RootPath(rootFSPath, meta.Cwd)
 	if err != nil {
-		return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp))
+		return errors.Wrapf(err, "working dir %s points to invalid target", newp)
 	}
 	if _, err := os.Stat(newp); err != nil {
 		if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil {
-			return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp))
+			return errors.Wrapf(err, "failed to create working directory %s", newp)
 		}
 	}
@@ -276,12 +276,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 	spec.Process.OOMScoreAdj = w.oomScoreAdj
 	if w.rootless {
 		if err := rootlessspecconv.ToRootless(spec); err != nil {
-			return sendErr(done, err)
+			return err
 		}
 	}

 	if err := json.NewEncoder(f).Encode(spec); err != nil {
-		return sendErr(done, err)
+		return err
 	}

 	// runCtx/killCtx is used for extra check in case the kill command blocks
@@ -335,9 +335,9 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
 		}

 		select {
 		case <-ctx.Done():
-			return sendErr(done, errors.Wrapf(ctx.Err(), err.Error()))
+			return errors.Wrapf(ctx.Err(), err.Error())
 		default:
-			return sendErr(done, stack.Enable(err))
+			return stack.Enable(err)
 		}
 	}
@@ -348,6 +348,8 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
 	// first verify the container is running, if we get an error assume the container
 	// is in the process of being created and check again every 100ms or until
 	// context is canceled.
+	var state *runc.Container
+	for {
 		w.mu.Lock()
 		done, ok := w.running[id]
 		w.mu.Unlock()
@@ -355,8 +357,7 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
 			return errors.Errorf("container %s not found", id)
 		}
-	state, _ := w.runc.State(ctx, id)
-	for {
+		state, _ = w.runc.State(ctx, id)
 		if state != nil && state.Status == "running" {
 			break
 		}
@@ -364,12 +365,11 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
 		case <-ctx.Done():
 			return ctx.Err()
 		case err, ok := <-done:
-			if !ok {
+			if !ok || err == nil {
 				return errors.Errorf("container %s has stopped", id)
 			}
			return errors.Wrapf(err, "container %s has exited with error", id)
 		case <-time.After(100 * time.Millisecond):
-			state, _ = w.runc.State(ctx, id)
 		}
 	}
@@ -438,8 +438,3 @@ func (s *forwardIO) Stdout() io.ReadCloser {
 func (s *forwardIO) Stderr() io.ReadCloser {
 	return nil
 }
-
-func sendErr(c chan error, err error) error {
-	c <- err
-	return err
-}
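The widened `if !ok || err == nil` check in both Exec loops leans on Go's comma-ok receive semantics: a value sent before close is delivered with ok == true, and every receive after the channel is closed and drained yields the zero value with ok == false. Either way, a nil error means the container simply stopped rather than failed. A tiny demonstration of those semantics:

package main

import (
	"errors"
	"fmt"
)

func main() {
	done := make(chan error, 1)
	done <- errors.New("exit status 1") // what Run's deferred send publishes
	close(done)

	// first receive gets the buffered value: ok == true, err != nil
	err, ok := <-done
	fmt.Println(ok, err) // true exit status 1

	// channel closed and drained: ok == false, err == nil
	err, ok = <-done
	fmt.Println(ok, err) // false <nil>

	// Exec's branch treats "no error" and "channel closed" the same
	if !ok || err == nil {
		fmt.Println("container has stopped")
	}
}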


@@ -4,7 +4,9 @@ import (
 	"bytes"
 	"context"
 	"io"
+	"io/ioutil"
 	"testing"
+	"time"

 	"github.com/containerd/containerd/namespaces"
 	"github.com/moby/buildkit/cache"
@@ -57,7 +59,11 @@ func TestWorkerExec(t *testing.T, w *base.Worker) {
 		}, started)
 	})

-	<-started
+	select {
+	case <-started:
+	case <-time.After(10 * time.Second):
+		t.Error("Unexpected timeout waiting for pid1 to start")
+	}

 	stdout := bytes.NewBuffer(nil)
 	stderr := bytes.NewBuffer(nil)
@@ -85,7 +91,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) {
 		Meta: executor.Meta{
 			Args: []string{"sh", "-c", "cat > /tmp/msg"},
 		},
-		Stdin:  &nopReadCloser{stdin},
+		Stdin:  ioutil.NopCloser(stdin),
 		Stdout: &nopCloser{stdout},
 		Stderr: &nopCloser{stderr},
 	})
@@ -143,7 +149,11 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) {
 		}, started)
 	})

-	<-started
+	select {
+	case <-started:
+	case <-time.After(10 * time.Second):
+		t.Error("Unexpected timeout waiting for pid1 to start")
+	}

 	// this should fail since pid1 has already exited
 	err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
@@ -167,7 +177,11 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) {
 		}, started)
 	})

-	<-started
+	select {
+	case <-started:
+	case <-time.After(10 * time.Second):
+		t.Error("Unexpected timeout waiting for pid1 to start")
+	}

 	// this should fail since pid1 never started
 	err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
@@ -184,14 +198,6 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) {
 	require.NoError(t, err)
 }

-type nopReadCloser struct {
-	io.Reader
-}
-
-func (n *nopReadCloser) Close() error {
-	return nil
-}
-
 type nopCloser struct {
 	io.Writer
 }
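
In the test changes above, the unbounded `<-started` receives become selects bounded at 10 seconds, so a worker that never signals pid1 startup fails the test instead of hanging it, and the hand-rolled nopReadCloser is dropped in favor of the standard library's ioutil.NopCloser. The repeated select could also be factored into a small helper; a hypothetical sketch (waitStarted is not part of the actual tests):

package tests

import (
	"testing"
	"time"
)

// waitStarted bounds the wait for an executor's started signal so a
// hung pid1 fails the test after 10s instead of blocking it forever.
func waitStarted(t *testing.T, started <-chan struct{}) {
	t.Helper()
	select {
	case <-started:
	case <-time.After(10 * time.Second):
		t.Error("Unexpected timeout waiting for pid1 to start")
	}
}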