worker: fix containerd cancellation and cleanup
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
e15cbb0b3a
commit
61573b1868
|
@ -2,6 +2,8 @@ package containerdworker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd"
|
"github.com/containerd/containerd"
|
||||||
"github.com/containerd/containerd/cio"
|
"github.com/containerd/containerd/cio"
|
||||||
|
@ -23,7 +25,7 @@ func New(client *containerd.Client) worker.Worker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w containerdWorker) Exec(ctx context.Context, meta worker.Meta, root cache.Mountable, mounts []worker.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
|
func (w containerdWorker) Exec(ctx context.Context, meta worker.Meta, root cache.Mountable, mounts []worker.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (err error) {
|
||||||
id := identity.NewID()
|
id := identity.NewID()
|
||||||
|
|
||||||
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id)
|
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id)
|
||||||
|
@ -43,7 +45,12 @@ func (w containerdWorker) Exec(ctx context.Context, meta worker.Meta, root cache
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer container.Delete(ctx)
|
|
||||||
|
defer func() {
|
||||||
|
if err1 := container.Delete(context.TODO()); err == nil && err1 != nil {
|
||||||
|
err = errors.Wrapf(err1, "failed to delete container %s", id)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
if stdin == nil {
|
if stdin == nil {
|
||||||
stdin = &emptyReadCloser{}
|
stdin = &emptyReadCloser{}
|
||||||
|
@ -53,26 +60,40 @@ func (w containerdWorker) Exec(ctx context.Context, meta worker.Meta, root cache
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer task.Delete(ctx)
|
defer func() {
|
||||||
|
if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil {
|
||||||
|
err = errors.Wrapf(err1, "failed to delete task %s", id)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// TODO: Configure bridge networking
|
// TODO: Configure bridge networking
|
||||||
|
|
||||||
// TODO: support sending signals
|
|
||||||
|
|
||||||
if err := task.Start(ctx); err != nil {
|
if err := task.Start(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
statusCh, err := task.Wait(ctx)
|
statusCh, err := task.Wait(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
status := <-statusCh
|
|
||||||
|
killCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
|
||||||
|
ctxDone := ctx.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctxDone:
|
||||||
|
ctxDone = nil
|
||||||
|
task.Kill(killCtx, syscall.SIGKILL)
|
||||||
|
case status := <-statusCh:
|
||||||
|
cancel()
|
||||||
if status.ExitCode() != 0 {
|
if status.ExitCode() != 0 {
|
||||||
return errors.Errorf("process returned non-zero exit code: %d", status.ExitCode())
|
return errors.Errorf("process returned non-zero exit code: %d", status.ExitCode())
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type emptyReadCloser struct{}
|
type emptyReadCloser struct{}
|
||||||
|
|
Loading…
Reference in New Issue