Merge pull request #1563 from coryb/executor-exec-interface

Update Executor interface for Run and Exec
v0.8
Edgar Lee 2020-07-13 15:10:26 -07:00 committed by GitHub
commit 721eb7e3c3
8 changed files with 553 additions and 43 deletions
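Before the per-file hunks, a summary of the interface change that drives the rest of the diff, reproduced from the executor interface hunk below:

```go
// Before this change: a single one-shot call, with streams passed positionally.
type Executor interface {
	Exec(ctx context.Context, meta Meta, rootfs cache.Mountable, mounts []Mount,
		stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
}

// After this change: Run starts an optionally named container for a process,
// and Exec starts additional processes inside a container started via Run.
// The process arguments and streams are grouped into ProcessInfo.
type ProcessInfo struct {
	Meta           Meta
	Stdin          io.ReadCloser
	Stdout, Stderr io.WriteCloser
}

type Executor interface {
	Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount,
		process ProcessInfo, started chan<- struct{}) error
	Exec(ctx context.Context, id string, process ProcessInfo) error
}
```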

View File

@ -2,10 +2,10 @@ package containerdexecutor
import (
"context"
-"io"
"os"
"path/filepath"
"strings"
+"sync"
"syscall"
"time"
@ -21,6 +21,7 @@ import (
"github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/network" "github.com/moby/buildkit/util/network"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -31,6 +32,8 @@ type containerdExecutor struct {
networkProviders map[pb.NetMode]network.Provider
cgroupParent string
dnsConfig *oci.DNSConfig
+running map[string]chan error
+mu sync.Mutex
}
// New creates a new executor backed by connection to containerd API
@ -39,17 +42,40 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb
os.RemoveAll(filepath.Join(root, "hosts"))
os.RemoveAll(filepath.Join(root, "resolv.conf"))
-return containerdExecutor{
+return &containerdExecutor{
client: client,
root: root,
networkProviders: networkProviders,
cgroupParent: cgroup,
dnsConfig: dnsConfig,
+running: make(map[string]chan error),
}
}
-func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (err error) {
+func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
-id := identity.NewID()
+if id == "" {
+id = identity.NewID()
+}
+startedOnce := sync.Once{}
+done := make(chan error, 1)
+w.mu.Lock()
+w.running[id] = done
+w.mu.Unlock()
+defer func() {
+w.mu.Lock()
+delete(w.running, id)
+w.mu.Unlock()
+done <- err
+close(done)
+if started != nil {
+startedOnce.Do(func() {
+close(started)
+})
+}
+}()
+meta := process.Meta
resolvConf, err := oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig)
if err != nil {
@ -160,7 +186,12 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c
}
}()
-task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStreams(stdin, stdout, stderr)), containerd.WithRootFS(rootMounts))
+cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)}
+if meta.Tty {
+cioOpts = append(cioOpts, cio.WithTerminal)
+}
+task, err := container.NewTask(ctx, cio.NewCreator(cioOpts...), containerd.WithRootFS(rootMounts))
if err != nil {
return err
}
@ -174,6 +205,11 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c
return err
}
+if started != nil {
+startedOnce.Do(func() {
+close(started)
+})
+}
statusCh, err := task.Wait(context.Background())
if err != nil {
return err
@ -209,5 +245,89 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c
return nil
}
}
+}
+func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
+meta := process.Meta
+// first verify the container is running, if we get an error assume the container
+// is in the process of being created and check again every 100ms or until
+// context is canceled.
+var container containerd.Container
+var task containerd.Task
+for {
+w.mu.Lock()
+done, ok := w.running[id]
+w.mu.Unlock()
+if !ok {
+return errors.Errorf("container %s not found", id)
+}
+if container == nil {
+container, _ = w.client.LoadContainer(ctx, id)
+}
+if container != nil && task == nil {
+task, _ = container.Task(ctx, nil)
+}
+if task != nil {
+status, _ := task.Status(ctx)
+if status.Status == containerd.Running {
+break
+}
+}
+select {
+case <-ctx.Done():
+return ctx.Err()
+case err, ok := <-done:
+if !ok || err == nil {
+return errors.Errorf("container %s has stopped", id)
+}
+return errors.Wrapf(err, "container %s has exited with error", id)
+case <-time.After(100 * time.Millisecond):
+continue
+}
+}
+spec, err := container.Spec(ctx)
+if err != nil {
+return errors.WithStack(err)
+}
+proc := spec.Process
+// TODO how do we get rootfsPath for oci.GetUser in case user passed in username rather than uid:gid?
+// For now only support uid:gid
+if meta.User != "" {
+uid, gid, err := oci.ParseUIDGID(meta.User)
+if err != nil {
+return errors.WithStack(err)
+}
+proc.User = specs.User{
+UID: uid,
+GID: gid,
+AdditionalGids: []uint32{},
+}
+}
+proc.Terminal = meta.Tty
+proc.Args = meta.Args
+if meta.Cwd != "" {
+spec.Process.Cwd = meta.Cwd
+}
+if len(process.Meta.Env) > 0 {
+spec.Process.Env = process.Meta.Env
+}
+cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)}
+if meta.Tty {
+cioOpts = append(cioOpts, cio.WithTerminal)
+}
+taskProcess, err := task.Exec(ctx, identity.NewID(), proc, cio.NewCreator(cioOpts...))
+if err != nil {
+return errors.WithStack(err)
+}
+return taskProcess.Start(ctx)
}

View File

@ -28,9 +28,20 @@ type Mount struct {
Readonly bool
}
+type ProcessInfo struct {
+Meta Meta
+Stdin io.ReadCloser
+Stdout, Stderr io.WriteCloser
+}
type Executor interface {
-// TODO: add stdout/err
-Exec(ctx context.Context, meta Meta, rootfs cache.Mountable, mounts []Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
+// Run will start a container for the given process with rootfs, mounts.
+// `id` is an optional name for the container so it can be referenced later via Exec.
+// `started` is an optional channel that will be closed when the container setup completes and has started running.
+Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount, process ProcessInfo, started chan<- struct{}) error
+// Exec will start a process in container matching `id`. An error will be returned
+// if the container failed to start (via Run) or has exited before Exec is called.
+Exec(ctx context.Context, id string, process ProcessInfo) error
}
type HostIP struct {
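A minimal caller-side sketch of the new contract, modeled on the worker/tests helpers added at the end of this diff. The function name runThenExec and the e, rootfs, stdout and stderr values are illustrative placeholders, not part of the change: Run is launched on its own goroutine, the started channel gates the first Exec, and cancelling the context stops the container's pid1.

```go
// Illustrative sketch only; error handling is trimmed and the inputs are
// assumed to come from the caller (e.g. a cache ref mounted as the rootfs).
package example

import (
	"context"
	"io"

	"github.com/moby/buildkit/cache"
	"github.com/moby/buildkit/executor"
	"github.com/moby/buildkit/identity"
	"golang.org/x/sync/errgroup"
)

func runThenExec(ctx context.Context, e executor.Executor, rootfs cache.Mountable, stdout, stderr io.WriteCloser) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	id := identity.NewID()
	started := make(chan struct{})

	eg := errgroup.Group{}
	eg.Go(func() error {
		// pid1 of the container; Run blocks until it exits or ctx is cancelled.
		return e.Run(ctx, id, rootfs, nil, executor.ProcessInfo{
			Meta: executor.Meta{
				Args: []string{"sleep", "10"},
				Cwd:  "/",
				Env:  []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"},
			},
		}, started)
	})

	<-started // closed once container setup is done and pid1 has started

	// A second process in the same container, with its own streams.
	err := e.Exec(ctx, id, executor.ProcessInfo{
		Meta:   executor.Meta{Args: []string{"ps", "-o", "pid,comm"}},
		Stdout: stdout,
		Stderr: stderr,
	})

	cancel()      // stop pid1
	_ = eg.Wait() // expected to return context.Canceled here
	return err
}
```

Both executor implementations close started from a defer even when Run fails early, so the receive above cannot hang; a failed or exited pid1 instead surfaces as an error from Exec ("container ... not found" / "has stopped") and from eg.Wait().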

View File

@ -8,6 +8,7 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
@ -24,6 +25,7 @@ import (
"github.com/moby/buildkit/util/network" "github.com/moby/buildkit/util/network"
rootlessspecconv "github.com/moby/buildkit/util/rootless/specconv" rootlessspecconv "github.com/moby/buildkit/util/rootless/specconv"
"github.com/moby/buildkit/util/stack" "github.com/moby/buildkit/util/stack"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -59,6 +61,8 @@ type runcExecutor struct {
noPivot bool
dns *oci.DNSConfig
oomScoreAdj *int
+running map[string]chan error
+mu sync.Mutex
}
func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Executor, error) {
@ -119,11 +123,32 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex
noPivot: opt.NoPivot,
dns: opt.DNS,
oomScoreAdj: opt.OOMScoreAdj,
+running: make(map[string]chan error),
}
return w, nil
}
-func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
+func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
+meta := process.Meta
+startedOnce := sync.Once{}
+done := make(chan error, 1)
+w.mu.Lock()
+w.running[id] = done
+w.mu.Unlock()
+defer func() {
+w.mu.Lock()
+delete(w.running, id)
+w.mu.Unlock()
+done <- err
+close(done)
+if started != nil {
+startedOnce.Do(func() {
+close(started)
+})
+}
+}()
provider, ok := w.networkProviders[meta.NetMode]
if !ok {
return errors.Errorf("unknown network mode %s", meta.NetMode)
@ -164,7 +189,9 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
defer release()
}
-id := identity.NewID()
+if id == "" {
+id = identity.NewID()
+}
bundle := filepath.Join(w.root, id)
if err := os.Mkdir(bundle, 0711); err != nil {
@ -245,6 +272,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
}
}
+spec.Process.Terminal = meta.Tty
spec.Process.OOMScoreAdj = w.oomScoreAdj
if w.rootless {
if err := rootlessspecconv.ToRootless(spec); err != nil {
@ -260,7 +288,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
runCtx, cancelRun := context.WithCancel(context.Background())
defer cancelRun()
-done := make(chan struct{})
+ended := make(chan struct{})
go func() {
for {
select {
@ -279,21 +307,27 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
timeout()
select {
case <-time.After(50 * time.Millisecond):
-case <-done:
+case <-ended:
return
}
-case <-done:
+case <-ended:
return
}
}
}()
logrus.Debugf("> creating %s %v", id, meta.Args)
+// this is a cheat, we have not actually started, but as close as we can get with runc for now
+if started != nil {
+startedOnce.Do(func() {
+close(started)
+})
+}
status, err := w.runc.Run(runCtx, id, bundle, &runc.CreateOpts{
-IO: &forwardIO{stdin: stdin, stdout: stdout, stderr: stderr},
+IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
NoPivot: w.noPivot,
})
-close(done)
+close(ended)
if status != 0 || err != nil {
if err == nil {
@ -310,6 +344,74 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
return nil
}
+func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
+// first verify the container is running, if we get an error assume the container
+// is in the process of being created and check again every 100ms or until
+// context is canceled.
+var state *runc.Container
+for {
+w.mu.Lock()
+done, ok := w.running[id]
+w.mu.Unlock()
+if !ok {
+return errors.Errorf("container %s not found", id)
+}
+state, _ = w.runc.State(ctx, id)
+if state != nil && state.Status == "running" {
+break
+}
+select {
+case <-ctx.Done():
+return ctx.Err()
+case err, ok := <-done:
+if !ok || err == nil {
+return errors.Errorf("container %s has stopped", id)
+}
+return errors.Wrapf(err, "container %s has exited with error", id)
+case <-time.After(100 * time.Millisecond):
+}
+}
+// load default process spec (for Env, Cwd etc) from bundle
+f, err := os.Open(filepath.Join(state.Bundle, "config.json"))
+if err != nil {
+return errors.WithStack(err)
+}
+defer f.Close()
+spec := &specs.Spec{}
+if err := json.NewDecoder(f).Decode(spec); err != nil {
+return err
+}
+if process.Meta.User != "" {
+uid, gid, sgids, err := oci.GetUser(ctx, state.Rootfs, process.Meta.User)
+if err != nil {
+return err
+}
+spec.Process.User = specs.User{
+UID: uid,
+GID: gid,
+AdditionalGids: sgids,
+}
+}
+spec.Process.Terminal = process.Meta.Tty
+spec.Process.Args = process.Meta.Args
+if process.Meta.Cwd != "" {
+spec.Process.Cwd = process.Meta.Cwd
+}
+if len(process.Meta.Env) > 0 {
+spec.Process.Env = process.Meta.Env
+}
+return w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{
+IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
+})
+}
type forwardIO struct {
stdin io.ReadCloser
stdout, stderr io.WriteCloser

View File

@ -381,6 +381,7 @@ func (sm *sshMountInstance) Mount() ([]mount.Mount, func() error, error) {
GID: gid,
})
if err != nil {
+cancel()
return nil, nil, err
}
uid = identity.UID
@ -731,7 +732,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
defer stdout.Close()
defer stderr.Close()
-if err := e.exec.Exec(ctx, meta, root, mounts, nil, stdout, stderr); err != nil {
+if err := e.exec.Run(ctx, "", root, mounts, executor.ProcessInfo{Meta: meta, Stdin: nil, Stdout: stdout, Stderr: stderr}, nil); err != nil {
return nil, errors.Wrapf(err, "executor failed running %v", meta.Args)
}

View File

@ -297,7 +297,7 @@ func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.Immu
return err
}
defer active.Release(context.TODO())
-return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
+return w.Executor.Run(ctx, "", active, nil, executor.ProcessInfo{Meta: meta, Stdin: stdin, Stdout: stdout, Stderr: stderr}, nil)
}
func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
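Note that the two pre-existing call sites converted above (execOp.Exec in the solver and Worker.Exec in the base worker) keep their previous behavior: they pass an empty id, which the executors replace with identity.NewID(), and a nil started channel, so for them the switch from Exec to Run is a drop-in rename.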

View File

@ -0,0 +1,62 @@
// +build linux,!no_containerd_worker
package containerd
import (
"io/ioutil"
"os"
"testing"
"github.com/moby/buildkit/util/network/netproviders"
"github.com/moby/buildkit/worker/base"
"github.com/moby/buildkit/worker/tests"
"github.com/stretchr/testify/require"
)
const sockFile = "/run/containerd/containerd.sock"
func newWorkerOpt(t *testing.T) (base.WorkerOpt, func()) {
tmpdir, err := ioutil.TempDir("", "workertest")
require.NoError(t, err)
cleanup := func() { os.RemoveAll(tmpdir) }
workerOpt, err := NewWorkerOpt(tmpdir, sockFile, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"})
require.NoError(t, err)
return workerOpt, cleanup
}
func checkRequirement(t *testing.T) {
if os.Getuid() != 0 {
t.Skip("requires root")
}
fi, err := os.Stat(sockFile)
if err != nil {
t.Skipf("Failed to stat %s: %s", sockFile, err.Error())
}
if fi.Mode()&os.ModeSocket == 0 {
t.Skipf("%s is not a unix domain socket", sockFile)
}
}
func TestContainerdWorkerExec(t *testing.T) {
t.Parallel()
checkRequirement(t)
workerOpt, cleanupWorkerOpt := newWorkerOpt(t)
defer cleanupWorkerOpt()
w, err := base.NewWorker(workerOpt)
require.NoError(t, err)
tests.TestWorkerExec(t, w)
}
func TestContainerdWorkerExecFailures(t *testing.T) {
t.Parallel()
checkRequirement(t)
workerOpt, cleanupWorkerOpt := newWorkerOpt(t)
defer cleanupWorkerOpt()
w, err := base.NewWorker(workerOpt)
require.NoError(t, err)
tests.TestWorkerExecFailures(t, w)
}

View File

@ -4,7 +4,6 @@ package runc
import (
"bytes"
-"context"
"fmt"
"io"
"io/ioutil"
@ -13,18 +12,16 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/containerd/containerd/namespaces"
ctdsnapshot "github.com/containerd/containerd/snapshots" ctdsnapshot "github.com/containerd/containerd/snapshots"
"github.com/containerd/containerd/snapshots/overlay" "github.com/containerd/containerd/snapshots/overlay"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"github.com/moby/buildkit/executor" "github.com/moby/buildkit/executor"
"github.com/moby/buildkit/executor/oci" "github.com/moby/buildkit/executor/oci"
"github.com/moby/buildkit/session" "github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/network/netproviders" "github.com/moby/buildkit/util/network/netproviders"
"github.com/moby/buildkit/worker/base" "github.com/moby/buildkit/worker/base"
"github.com/moby/buildkit/worker/tests"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -58,20 +55,6 @@ func checkRequirement(t *testing.T) {
}
}
-func newCtx(s string) context.Context {
-return namespaces.WithNamespace(context.Background(), s)
-}
-func newBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef {
-img, err := source.NewImageIdentifier("docker.io/library/busybox:latest")
-require.NoError(t, err)
-src, err := w.SourceManager.Resolve(ctx, img, sm)
-require.NoError(t, err)
-snap, err := src.Snapshot(ctx, nil)
-require.NoError(t, err)
-return snap
-}
func TestRuncWorker(t *testing.T) {
t.Parallel()
checkRequirement(t)
@ -81,10 +64,10 @@ func TestRuncWorker(t *testing.T) {
w, err := base.NewWorker(workerOpt)
require.NoError(t, err)
-ctx := newCtx("buildkit-test")
+ctx := tests.NewCtx("buildkit-test")
sm, err := session.NewManager()
require.NoError(t, err)
-snap := newBusyboxSourceSnapshot(ctx, t, w, sm)
+snap := tests.NewBusyboxSourceSnapshot(ctx, t, w, sm)
mounts, err := snap.Mount(ctx, false)
require.NoError(t, err)
@ -124,7 +107,7 @@ func TestRuncWorker(t *testing.T) {
}
stderr := bytes.NewBuffer(nil)
-err = w.Executor.Exec(ctx, meta, snap, nil, nil, nil, &nopCloser{stderr})
+err = w.Executor.Run(ctx, "", snap, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil)
require.Error(t, err) // Read-only root
// typical error is like `mkdir /.../rootfs/proc: read-only file system`.
// make sure the error is caused before running `echo foo > /bar`.
@ -133,7 +116,7 @@ func TestRuncWorker(t *testing.T) {
root, err := w.CacheManager.New(ctx, snap)
require.NoError(t, err)
-err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, &nopCloser{stderr})
+err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil)
require.NoError(t, err)
meta = executor.Meta{
@ -141,7 +124,7 @@ func TestRuncWorker(t *testing.T) {
Cwd: "/", Cwd: "/",
} }
err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, &nopCloser{stderr}) err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil)
require.NoError(t, err) require.NoError(t, err)
rf, err := root.Commit(ctx) rf, err := root.Commit(ctx)
@ -185,10 +168,10 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) {
w, err := base.NewWorker(workerOpt)
require.NoError(t, err)
-ctx := newCtx("buildkit-test")
+ctx := tests.NewCtx("buildkit-test")
sm, err := session.NewManager()
require.NoError(t, err)
-snap := newBusyboxSourceSnapshot(ctx, t, w, sm)
+snap := tests.NewBusyboxSourceSnapshot(ctx, t, w, sm)
root, err := w.CacheManager.New(ctx, snap)
require.NoError(t, err)
@ -202,11 +185,35 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) {
}
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
-err = w.Executor.Exec(ctx, meta, root, nil, nil, &nopCloser{stdout}, &nopCloser{stderr})
+err = w.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}, nil)
require.NoError(t, err, fmt.Sprintf("stdout=%q, stderr=%q", stdout.String(), stderr.String()))
require.Equal(t, string(selfCmdline), stdout.String())
}
+func TestRuncWorkerExec(t *testing.T) {
+t.Parallel()
+checkRequirement(t)
+workerOpt, cleanupWorkerOpt := newWorkerOpt(t, oci.ProcessSandbox)
+defer cleanupWorkerOpt()
+w, err := base.NewWorker(workerOpt)
+require.NoError(t, err)
+tests.TestWorkerExec(t, w)
+}
+func TestRuncWorkerExecFailures(t *testing.T) {
+t.Parallel()
+checkRequirement(t)
+workerOpt, cleanupWorkerOpt := newWorkerOpt(t, oci.ProcessSandbox)
+defer cleanupWorkerOpt()
+w, err := base.NewWorker(workerOpt)
+require.NoError(t, err)
+tests.TestWorkerExecFailures(t, w)
+}
type nopCloser struct {
io.Writer
}

worker/tests/common.go (new file, 207 lines added)
View File

@ -0,0 +1,207 @@
package tests
import (
"bytes"
"context"
"io"
"io/ioutil"
"testing"
"time"
"github.com/containerd/containerd/namespaces"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/worker/base"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func NewBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef {
img, err := source.NewImageIdentifier("docker.io/library/busybox:latest")
require.NoError(t, err)
src, err := w.SourceManager.Resolve(ctx, img, sm)
require.NoError(t, err)
snap, err := src.Snapshot(ctx, nil)
require.NoError(t, err)
return snap
}
func NewCtx(s string) context.Context {
return namespaces.WithNamespace(context.Background(), s)
}
func TestWorkerExec(t *testing.T, w *base.Worker) {
ctx := NewCtx("buildkit-test")
ctx, cancel := context.WithCancel(ctx)
sm, err := session.NewManager()
require.NoError(t, err)
snap := NewBusyboxSourceSnapshot(ctx, t, w, sm)
root, err := w.CacheManager.New(ctx, snap)
require.NoError(t, err)
id := identity.NewID()
// first start pid1 in the background
eg := errgroup.Group{}
started := make(chan struct{})
eg.Go(func() error {
return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"sleep", "10"},
Cwd: "/",
Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"},
},
}, started)
})
select {
case <-started:
case <-time.After(10 * time.Second):
t.Error("Unexpected timeout waiting for pid1 to start")
}
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
// verify pid1 is the sleep command via Exec
err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"ps", "-o", "pid,comm"},
},
Stdout: &nopCloser{stdout},
Stderr: &nopCloser{stderr},
})
t.Logf("Stdout: %s", stdout.String())
t.Logf("Stderr: %s", stderr.String())
require.NoError(t, err)
// verify pid1 is sleep
require.Contains(t, stdout.String(), "1 sleep")
require.Empty(t, stderr.String())
// simulate: echo -n "hello" | cat > /tmp/msg
stdin := bytes.NewReader([]byte("hello"))
stdout.Reset()
stderr.Reset()
err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"sh", "-c", "cat > /tmp/msg"},
},
Stdin: ioutil.NopCloser(stdin),
Stdout: &nopCloser{stdout},
Stderr: &nopCloser{stderr},
})
require.NoError(t, err)
require.Empty(t, stdout.String())
require.Empty(t, stderr.String())
// verify contents of /tmp/msg
stdout.Reset()
stderr.Reset()
err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"cat", "/tmp/msg"},
},
Stdout: &nopCloser{stdout},
Stderr: &nopCloser{stderr},
})
t.Logf("Stdout: %s", stdout.String())
t.Logf("Stderr: %s", stderr.String())
require.NoError(t, err)
require.Equal(t, "hello", stdout.String())
require.Empty(t, stderr.String())
// stop pid1
cancel()
err = eg.Wait()
// we expect pid1 to get canceled after we test the exec
require.EqualError(t, errors.Cause(err), context.Canceled.Error())
err = snap.Release(ctx)
require.NoError(t, err)
}
func TestWorkerExecFailures(t *testing.T, w *base.Worker) {
ctx := NewCtx("buildkit-test")
sm, err := session.NewManager()
require.NoError(t, err)
snap := NewBusyboxSourceSnapshot(ctx, t, w, sm)
root, err := w.CacheManager.New(ctx, snap)
require.NoError(t, err)
id := identity.NewID()
// pid1 will start but only long enough for /bin/false to run
eg := errgroup.Group{}
started := make(chan struct{})
eg.Go(func() error {
return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"/bin/false"},
Cwd: "/",
},
}, started)
})
select {
case <-started:
case <-time.After(10 * time.Second):
t.Error("Unexpected timeout waiting for pid1 to start")
}
// this should fail since pid1 has already exited
err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"/bin/true"},
},
})
require.Error(t, err) // pid1 no longer running
err = eg.Wait()
require.Error(t, err) // process returned non-zero exit code: 1
// pid1 will not start, bogus pid1 command
eg = errgroup.Group{}
started = make(chan struct{})
eg.Go(func() error {
return w.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"bogus"},
},
}, started)
})
select {
case <-started:
case <-time.After(10 * time.Second):
t.Error("Unexpected timeout waiting for pid1 to start")
}
// this should fail since pid1 never started
err = w.Executor.Exec(ctx, id, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"/bin/true"},
},
})
require.Error(t, err) // container has exited with error
err = eg.Wait()
require.Error(t, err) // pid1 did not terminate successfully
err = snap.Release(ctx)
require.NoError(t, err)
}
type nopCloser struct {
io.Writer
}
func (n *nopCloser) Close() error {
return nil
}