Update Executor interface for Run and Exec

Signed-off-by: Cory Bennett <cbennett@netflix.com>
v0.8
Cory Bennett 2020-07-09 23:07:28 +00:00
parent 488130002a
commit 6d58121c11
7 changed files with 358 additions and 16 deletions
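In short, this commit splits the executor's single Exec entrypoint in two: Run creates a container from a rootfs and mounts and runs its main process, while the new Exec starts an additional process inside a container that Run already started, addressed by id. Meta and the stdio handles now travel together in a ProcessInfo struct. A hedged sketch of the call-site migration, mirroring the solver and worker changes below (exec, meta, rootfs, mounts, stdout and stderr stand for values the caller already has):

// before: one method prepared the container and ran the process
err := exec.Exec(ctx, meta, rootfs, mounts, nil, stdout, stderr)

// after: the same behaviour via Run; an empty id lets the executor generate one
err := exec.Run(ctx, "", rootfs, mounts, executor.ProcessInfo{
	Meta:   meta,
	Stdout: stdout, // io.WriteCloser
	Stderr: stderr, // io.WriteCloser
})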

View File

@@ -2,7 +2,6 @@ package containerdexecutor
import (
"context"
"io"
"os"
"path/filepath"
"strings"
@@ -21,6 +20,7 @@ import (
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/network"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -48,8 +48,11 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb
}
}
func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (err error) {
id := identity.NewID()
func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo) (err error) {
if id == "" {
id = identity.NewID()
}
meta := process.Meta
resolvConf, err := oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig)
if err != nil {
@@ -160,7 +163,12 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c
}
}()
task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStreams(stdin, stdout, stderr)), containerd.WithRootFS(rootMounts))
cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)}
if meta.Tty {
cioOpts = append(cioOpts, cio.WithTerminal)
}
task, err := container.NewTask(ctx, cio.NewCreator(cioOpts...), containerd.WithRootFS(rootMounts))
if err != nil {
return err
}
@@ -209,5 +217,77 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c
return nil
}
}
}
func (w containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
meta := process.Meta
// First verify the container is running; if we get an error, assume the container
// is still being created and check again every 100ms until the context is
// canceled.
var container containerd.Container
var task containerd.Task
for {
if container == nil {
container, _ = w.client.LoadContainer(ctx, id)
}
if container != nil && task == nil {
task, _ = container.Task(ctx, nil)
}
if task != nil {
status, _ := task.Status(ctx)
if status.Status == containerd.Running {
break
}
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
continue
}
}
spec, err := container.Spec(ctx)
if err != nil {
return errors.WithStack(err)
}
proc := spec.Process
// TODO: how do we get rootfsPath for oci.GetUser in case the user passed a username rather than uid:gid?
// For now only uid:gid is supported.
if meta.User != "" {
uid, gid, err := oci.ParseUIDGID(meta.User)
if err != nil {
return errors.WithStack(err)
}
proc.User = specs.User{
UID: uid,
GID: gid,
AdditionalGids: []uint32{},
}
}
proc.Terminal = meta.Tty
proc.Args = meta.Args
if meta.Cwd != "" {
spec.Process.Cwd = meta.Cwd
}
if len(meta.Env) > 0 {
// merge exec env with pid1 env
spec.Process.Env = append(spec.Process.Env, meta.Env...)
}
cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)}
if meta.Tty {
cioOpts = append(cioOpts, cio.WithTerminal)
}
taskProcess, err := task.Exec(ctx, identity.NewID(), proc, cio.NewCreator(cioOpts...))
if err != nil {
return errors.WithStack(err)
}
return taskProcess.Start(ctx)
}

View File

@@ -28,9 +28,15 @@ type Mount struct {
Readonly bool
}
type ProcessInfo struct {
Meta Meta
Stdin io.ReadCloser
Stdout, Stderr io.WriteCloser
}
type Executor interface {
// TODO: add stdout/err
Exec(ctx context.Context, meta Meta, rootfs cache.Mountable, mounts []Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount, process ProcessInfo) error
Exec(ctx context.Context, id string, process ProcessInfo) error
}
type HostIP struct {

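Run and Exec are meant to be used as a pair: Run blocks for the lifetime of the container's main process, and Exec attaches further processes to the same container by id, waiting until it is running. A minimal sketch along the lines of the new worker tests at the end of this commit (exec, rootfs, stdout and stderr are assumed to exist; error handling trimmed):

id := identity.NewID()
eg := errgroup.Group{}
eg.Go(func() error {
	// pid1: returns when the main process exits or the context is canceled
	return exec.Run(ctx, id, rootfs, nil, executor.ProcessInfo{
		Meta: executor.Meta{Args: []string{"sleep", "10"}, Cwd: "/"},
	})
})

// second process in the already-running container
err := exec.Exec(ctx, id, executor.ProcessInfo{
	Meta:   executor.Meta{Args: []string{"ps"}},
	Stdout: stdout,
	Stderr: stderr,
})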
View File

@@ -24,6 +24,7 @@ import (
"github.com/moby/buildkit/util/network"
rootlessspecconv "github.com/moby/buildkit/util/rootless/specconv"
"github.com/moby/buildkit/util/stack"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -123,7 +124,8 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex
return w, nil
}
func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo) error {
meta := process.Meta
provider, ok := w.networkProviders[meta.NetMode]
if !ok {
return errors.Errorf("unknown network mode %s", meta.NetMode)
@@ -164,7 +166,9 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
defer release()
}
id := identity.NewID()
if id == "" {
id = identity.NewID()
}
bundle := filepath.Join(w.root, id)
if err := os.Mkdir(bundle, 0711); err != nil {
@@ -245,6 +249,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
}
}
spec.Process.Terminal = meta.Tty
spec.Process.OOMScoreAdj = w.oomScoreAdj
if w.rootless {
if err := rootlessspecconv.ToRootless(spec); err != nil {
@@ -290,7 +295,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
logrus.Debugf("> creating %s %v", id, meta.Args)
status, err := w.runc.Run(runCtx, id, bundle, &runc.CreateOpts{
IO: &forwardIO{stdin: stdin, stdout: stdout, stderr: stderr},
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
NoPivot: w.noPivot,
})
close(done)
@@ -310,6 +315,62 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
return nil
}
func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
// First verify the container is running; if we get an error, assume the container
// is still being created and check again every 100ms until the context is
// canceled.
state, _ := w.runc.State(ctx, id)
for {
if state != nil && state.Status == "running" {
break
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
state, _ = w.runc.State(ctx, id)
}
}
// load default process spec (for Env, Cwd etc) from bundle
f, err := os.Open(filepath.Join(state.Bundle, "config.json"))
if err != nil {
return errors.WithStack(err)
}
defer f.Close()
spec := &specs.Spec{}
if err := json.NewDecoder(f).Decode(spec); err != nil {
return err
}
if process.Meta.User != "" {
uid, gid, sgids, err := oci.GetUser(ctx, state.Rootfs, process.Meta.User)
if err != nil {
return err
}
spec.Process.User = specs.User{
UID: uid,
GID: gid,
AdditionalGids: sgids,
}
}
spec.Process.Terminal = process.Meta.Tty
spec.Process.Args = process.Meta.Args
if process.Meta.Cwd != "" {
spec.Process.Cwd = process.Meta.Cwd
}
if len(process.Meta.Env) > 0 {
// merge exec env with pid1 env
spec.Process.Env = append(spec.Process.Env, process.Meta.Env...)
}
return w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
})
}
type forwardIO struct {
stdin io.ReadCloser
stdout, stderr io.WriteCloser

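One detail both new Exec implementations share: the request's Env is appended to the environment the container's spec was created with, not substituted for it, so duplicate keys can end up in the resulting process spec. A small illustration with made-up values:

// environment from the pid1 spec (config.json / container spec)
env := []string{"PATH=/usr/bin", "FOO=from-pid1"}
// meta.Env from the exec request is appended
env = append(env, "FOO=from-exec")
// env now carries both FOO entries; which one the process sees is up to the
// runtime, this commit does not de-duplicate.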
View File

@@ -381,6 +381,7 @@ func (sm *sshMountInstance) Mount() ([]mount.Mount, func() error, error) {
GID: gid,
})
if err != nil {
cancel()
return nil, nil, err
}
uid = identity.UID
@@ -731,7 +732,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
defer stdout.Close()
defer stderr.Close()
if err := e.exec.Exec(ctx, meta, root, mounts, nil, stdout, stderr); err != nil {
if err := e.exec.Run(ctx, "", root, mounts, executor.ProcessInfo{Meta: meta, Stdin: nil, Stdout: stdout, Stderr: stderr}); err != nil {
return nil, errors.Wrapf(err, "executor failed running %v", meta.Args)
}

View File

@@ -297,7 +297,7 @@ func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.Immu
return err
}
defer active.Release(context.TODO())
return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
return w.Executor.Run(ctx, "", active, nil, executor.ProcessInfo{Meta: meta, Stdin: stdin, Stdout: stdout, Stderr: stderr})
}
func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {

View File

@@ -0,0 +1,131 @@
// +build linux,!no_containerd_worker
package containerd
import (
"bytes"
"context"
"io"
"io/ioutil"
"os"
"testing"
"github.com/containerd/containerd/namespaces"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/network/netproviders"
"github.com/moby/buildkit/worker/base"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
const sockFile = "/run/containerd/containerd.sock"
const ns = "buildkit-test"
func newWorkerOpt(t *testing.T) (base.WorkerOpt, func()) {
tmpdir, err := ioutil.TempDir("", "workertest")
require.NoError(t, err)
cleanup := func() { os.RemoveAll(tmpdir) }
workerOpt, err := NewWorkerOpt(tmpdir, sockFile, "overlayfs", ns, nil, nil, netproviders.Opt{Mode: "host"})
require.NoError(t, err)
return workerOpt, cleanup
}
func checkRequirement(t *testing.T) {
if os.Getuid() != 0 {
t.Skip("requires root")
}
fi, err := os.Stat(sockFile)
if err != nil {
t.Skipf("Failed to stat %s: %s", sockFile, err.Error())
}
if fi.Mode()&os.ModeSocket == 0 {
t.Skipf("%s is not a unix domain socket", sockFile)
}
}
func newCtx(s string) context.Context {
return namespaces.WithNamespace(context.Background(), s)
}
func newBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef {
img, err := source.NewImageIdentifier("docker.io/library/busybox:latest")
require.NoError(t, err)
src, err := w.SourceManager.Resolve(ctx, img, sm)
require.NoError(t, err)
snap, err := src.Snapshot(ctx, nil)
require.NoError(t, err)
return snap
}
func TestContainerdWorkerExec(t *testing.T) {
t.Parallel()
checkRequirement(t)
workerOpt, cleanupWorkerOpt := newWorkerOpt(t)
defer cleanupWorkerOpt()
w, err := base.NewWorker(workerOpt)
require.NoError(t, err)
ctx := newCtx(ns)
ctx, cancel := context.WithCancel(ctx)
sm, err := session.NewManager()
require.NoError(t, err)
snap := newBusyboxSourceSnapshot(ctx, t, w, sm)
root, err := w.CacheManager.New(ctx, snap)
require.NoError(t, err)
id := identity.NewID()
// first start pid1 in the background
eg := errgroup.Group{}
eg.Go(func() error {
return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"sleep", "10"},
Cwd: "/",
Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"},
},
})
})
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"ps", "-o", "pid,comm"},
},
Stdout: &nopCloser{stdout},
Stderr: &nopCloser{stderr},
})
t.Logf("Stdout: %s", stdout.String())
t.Logf("Stderr: %s", stderr.String())
require.NoError(t, err)
// verify pid1 is sleep
require.Contains(t, stdout.String(), "1 sleep")
require.Empty(t, stderr.String())
// stop pid1
cancel()
err = eg.Wait()
// we expect this to get canceled after we test the exec
require.EqualError(t, errors.Cause(err), "context canceled")
err = snap.Release(ctx)
require.NoError(t, err)
}
type nopCloser struct {
io.Writer
}
func (n *nopCloser) Close() error {
return nil
}

View File

@@ -20,12 +20,15 @@ import (
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/executor/oci"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/network/netproviders"
"github.com/moby/buildkit/worker/base"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, func()) {
@@ -124,7 +127,7 @@ func TestRuncWorker(t *testing.T) {
}
stderr := bytes.NewBuffer(nil)
err = w.Executor.Exec(ctx, meta, snap, nil, nil, nil, &nopCloser{stderr})
err = w.Executor.Run(ctx, "", snap, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}})
require.Error(t, err) // Read-only root
// typical error is like `mkdir /.../rootfs/proc: read-only file system`.
// make sure the error is caused before running `echo foo > /bar`.
@@ -133,7 +136,7 @@ func TestRuncWorker(t *testing.T) {
root, err := w.CacheManager.New(ctx, snap)
require.NoError(t, err)
err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, &nopCloser{stderr})
err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}})
require.NoError(t, err)
meta = executor.Meta{
@@ -141,7 +144,7 @@ func TestRuncWorker(t *testing.T) {
Cwd: "/",
}
err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, &nopCloser{stderr})
err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}})
require.NoError(t, err)
rf, err := root.Commit(ctx)
@@ -202,11 +205,71 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) {
}
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
err = w.Executor.Exec(ctx, meta, root, nil, nil, &nopCloser{stdout}, &nopCloser{stderr})
err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}})
require.NoError(t, err, fmt.Sprintf("stdout=%q, stderr=%q", stdout.String(), stderr.String()))
require.Equal(t, string(selfCmdline), stdout.String())
}
func TestRuncWorkerExec(t *testing.T) {
t.Parallel()
checkRequirement(t)
workerOpt, cleanupWorkerOpt := newWorkerOpt(t, oci.ProcessSandbox)
defer cleanupWorkerOpt()
w, err := base.NewWorker(workerOpt)
require.NoError(t, err)
ctx := newCtx("buildkit-test")
ctx, cancel := context.WithCancel(ctx)
sm, err := session.NewManager()
require.NoError(t, err)
snap := newBusyboxSourceSnapshot(ctx, t, w, sm)
root, err := w.CacheManager.New(ctx, snap)
require.NoError(t, err)
id := identity.NewID()
// first start pid1 in the background
eg := errgroup.Group{}
eg.Go(func() error {
return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"sleep", "10"},
Cwd: "/",
Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"},
},
})
})
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"ps", "-o", "pid,comm"},
},
Stdout: &nopCloser{stdout},
Stderr: &nopCloser{stderr},
})
t.Logf("Stdout: %s", stdout.String())
t.Logf("Stderr: %s", stderr.String())
require.NoError(t, err)
// verify pid1 is sleep
require.Contains(t, stdout.String(), "1 sleep")
require.Empty(t, stderr.String())
// stop pid1
cancel()
err = eg.Wait()
// we expect pid1 to get canceled after we test the exec
require.EqualError(t, errors.Cause(err), "context canceled")
err = snap.Release(ctx)
require.NoError(t, err)
}
type nopCloser struct {
io.Writer
}