Merge pull request #1731 from coryb/issue-1714

add tty support for runc executor
v0.8
Edgar Lee 2020-10-19 14:52:31 -07:00 committed by GitHub
commit 5eaecb905c
5 changed files with 514 additions and 30 deletions
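For orientation before the diffs: the change lets a gateway client request an interactive terminal for a container process and resize it while it runs; with Tty set, the runc executor now routes I/O through a pty instead of returning "tty with runc not implemented". A minimal client-side sketch, condensed from the tests below (runShell and its parameters are hypothetical names; the Container/ContainerProcess interfaces are assumed from the gateway client package as used in the tests; error paths shortened):

package example

import (
	"context"
	"io"

	client "github.com/moby/buildkit/frontend/gateway/client"
)

// runShell starts a shell attached to a pty inside an already-created gateway
// container and sets the initial terminal size, mirroring the tests below.
func runShell(ctx context.Context, ctr client.Container, stdin io.ReadCloser, stdout io.WriteCloser) error {
	proc, err := ctr.Start(ctx, client.StartRequest{
		Args:   []string{"sh"},
		Tty:    true,
		Stdin:  stdin,
		Stdout: stdout,
		Stderr: stdout,
	})
	if err != nil {
		return err
	}
	// Resize is forwarded to the executor, which now owns the pty.
	if err := proc.Resize(ctx, client.WinSize{Rows: 40, Cols: 80}); err != nil {
		return err
	}
	return proc.Wait()
}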

View File

@@ -3,10 +3,12 @@ package client
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
@@ -37,6 +39,8 @@ func TestClientGatewayIntegration(t *testing.T) {
testClientGatewayContainerPID1Fail,
testClientGatewayContainerPID1Exit,
testClientGatewayContainerMounts,
testClientGatewayContainerPID1Tty,
testClientGatewayContainerExecTty,
}, integration.WithMirroredImages(integration.OfficialImages("busybox:latest")))
}
@@ -718,6 +722,234 @@ func testClientGatewayContainerMounts(t *testing.T, sb integration.Sandbox) {
checkAllReleasable(t, c, sb, true)
}
// testClientGatewayContainerPID1Tty tests that we can get a tty via the
// container's pid1 process (executor.Run)
func testClientGatewayContainerPID1Tty(t *testing.T, sb integration.Sandbox) {
requiresLinux(t)
ctx := context.TODO()
c, err := New(ctx, sb.Address())
require.NoError(t, err)
defer c.Close()
product := "buildkit_test"
inputR, inputW := io.Pipe()
output := bytes.NewBuffer(nil)
b := func(ctx context.Context, c client.Client) (*client.Result, error) {
ctx, timeout := context.WithTimeout(ctx, 10*time.Second)
defer timeout()
st := llb.Image("busybox:latest")
def, err := st.Marshal(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal state")
}
r, err := c.Solve(ctx, client.SolveRequest{
Definition: def.ToPB(),
})
if err != nil {
return nil, errors.Wrap(err, "failed to solve")
}
ctr, err := c.NewContainer(ctx, client.NewContainerRequest{
Mounts: []client.Mount{{
Dest: "/",
MountType: pb.MountType_BIND,
Ref: r.Ref,
}},
})
require.NoError(t, err)
defer ctr.Release(ctx)
prompt := newTestPrompt(ctx, t, inputW, output)
pid1, err := ctr.Start(ctx, client.StartRequest{
Args: []string{"sh"},
Tty: true,
Stdin: inputR,
Stdout: &nopCloser{output},
Stderr: &nopCloser{output},
Env: []string{fmt.Sprintf("PS1=%s", prompt.String())},
})
require.NoError(t, err)
err = pid1.Resize(ctx, client.WinSize{Rows: 40, Cols: 80})
require.NoError(t, err)
prompt.SendExpect("ttysize", "80 40")
prompt.Send("cd /tmp")
prompt.SendExpect("pwd", "/tmp")
prompt.Send("echo foobar > newfile")
prompt.SendExpect("cat /tmp/newfile", "foobar")
err = pid1.Resize(ctx, client.WinSize{Rows: 60, Cols: 100})
require.NoError(t, err)
prompt.SendExpect("ttysize", "100 60")
prompt.SendExit(99)
err = pid1.Wait()
var exitError *errdefs.ExitError
require.True(t, errors.As(err, &exitError))
require.Equal(t, uint32(99), exitError.ExitCode)
return &client.Result{}, err
}
_, err = c.Build(ctx, SolveOpt{}, product, b, nil)
require.Error(t, err)
inputW.Close()
inputR.Close()
checkAllReleasable(t, c, sb, true)
}
type testPrompt struct {
ctx context.Context
t *testing.T
output *bytes.Buffer
input io.Writer
prompt string
pos int
}
func newTestPrompt(ctx context.Context, t *testing.T, input io.Writer, output *bytes.Buffer) *testPrompt {
return &testPrompt{
ctx: ctx,
t: t,
input: input,
output: output,
prompt: "% ",
}
}
func (p *testPrompt) String() string { return p.prompt }
func (p *testPrompt) SendExit(status int) {
p.input.Write([]byte(fmt.Sprintf("exit %d\n", status)))
}
func (p *testPrompt) Send(cmd string) {
p.input.Write([]byte(cmd + "\n"))
p.wait(p.prompt)
}
func (p *testPrompt) SendExpect(cmd, expected string) {
for {
p.input.Write([]byte(cmd + "\n"))
response := p.wait(p.prompt)
if strings.Contains(response, expected) {
return
}
}
}
func (p *testPrompt) wait(msg string) string {
for {
newOutput := p.output.String()[p.pos:]
if strings.Contains(newOutput, msg) {
p.pos += len(newOutput)
return newOutput
}
select {
case <-p.ctx.Done():
p.t.Logf("Output at timeout: %s", p.output.String())
p.t.Fatalf("Timeout waiting for %q", msg)
case <-time.After(100 * time.Millisecond):
}
}
}
// testClientGatewayContainerExecTty tests that we can get a tty via
// executor.Exec (a secondary process in the container)
func testClientGatewayContainerExecTty(t *testing.T, sb integration.Sandbox) {
if sb.Rootless() {
// TODO fix this
// We get `panic: cannot statfs cgroup root` when running this test
// with runc-rootless
t.Skip("Skipping runc-rootless for cgroup error")
}
requiresLinux(t)
ctx := context.TODO()
c, err := New(ctx, sb.Address())
require.NoError(t, err)
defer c.Close()
product := "buildkit_test"
inputR, inputW := io.Pipe()
output := bytes.NewBuffer(nil)
b := func(ctx context.Context, c client.Client) (*client.Result, error) {
ctx, timeout := context.WithTimeout(ctx, 10*time.Second)
defer timeout()
st := llb.Image("busybox:latest")
def, err := st.Marshal(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal state")
}
r, err := c.Solve(ctx, client.SolveRequest{
Definition: def.ToPB(),
})
if err != nil {
return nil, errors.Wrap(err, "failed to solve")
}
ctr, err := c.NewContainer(ctx, client.NewContainerRequest{
Mounts: []client.Mount{{
Dest: "/",
MountType: pb.MountType_BIND,
Ref: r.Ref,
}},
})
require.NoError(t, err)
pid1, err := ctr.Start(ctx, client.StartRequest{
Args: []string{"sleep", "10"},
})
require.NoError(t, err)
defer pid1.Wait()
defer ctr.Release(ctx)
prompt := newTestPrompt(ctx, t, inputW, output)
pid2, err := ctr.Start(ctx, client.StartRequest{
Args: []string{"sh"},
Tty: true,
Stdin: inputR,
Stdout: &nopCloser{output},
Stderr: &nopCloser{output},
Env: []string{fmt.Sprintf("PS1=%s", prompt.String())},
})
require.NoError(t, err)
err = pid2.Resize(ctx, client.WinSize{Rows: 40, Cols: 80})
require.NoError(t, err)
prompt.SendExpect("ttysize", "80 40")
prompt.Send("cd /tmp")
prompt.SendExpect("pwd", "/tmp")
prompt.Send("echo foobar > newfile")
prompt.SendExpect("cat /tmp/newfile", "foobar")
err = pid2.Resize(ctx, client.WinSize{Rows: 60, Cols: 100})
require.NoError(t, err)
prompt.SendExpect("ttysize", "100 60")
prompt.SendExit(99)
return &client.Result{}, pid2.Wait()
}
_, err = c.Build(ctx, SolveOpt{}, product, b, nil)
require.Error(t, err)
require.Regexp(t, "exit code: 99|runc did not terminate successfully", err.Error())
inputW.Close()
inputR.Close()
checkAllReleasable(t, c, sb, true)
}
type nopCloser struct {
	io.Writer
}

func (n *nopCloser) Close() error { return nil }

View File

@@ -272,10 +272,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
}
}
if meta.Tty {
return errors.New("tty with runc not implemented")
}
spec.Process.Terminal = meta.Tty
spec.Process.OOMScoreAdj = w.oomScoreAdj
if w.rootless {
if err := rootlessspecconv.ToRootless(spec); err != nil {
@@ -326,10 +323,8 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
close(started)
})
}
status, err := w.runc.Run(runCtx, id, bundle, &runc.CreateOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
NoPivot: w.noPivot,
})
status, err := w.run(runCtx, id, bundle, process)
close(ended)
if status != 0 || err != nil {
@@ -413,21 +408,14 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
spec.Process.Env = process.Meta.Env
}
err = w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
})
var exitError *exec.ExitError
if errors.As(err, &exitError) {
err = &errdefs.ExitError{
ExitCode: uint32(exitError.ExitCode()),
status, err := w.exec(ctx, id, state.Bundle, spec.Process, process)
if status == 0 && err == nil {
return nil
}
return &errdefs.ExitError{
ExitCode: uint32(status),
Err: err,
}
return err
} else if err != nil {
return err
}
return nil
}
type forwardIO struct {

View File

@@ -0,0 +1,44 @@
// +build !linux
package runcexecutor
import (
"context"
"os/exec"
"github.com/containerd/containerd"
runc "github.com/containerd/go-runc"
"github.com/moby/buildkit/executor"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
var unsupportedConsoleError = errors.New("tty for runc is only supported on linux")
func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo) (int, error) {
if process.Meta.Tty {
return 0, unsupportedConsoleError
}
return w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
NoPivot: w.noPivot,
})
}
func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo) (int, error) {
if process.Meta.Tty {
return 0, unsupportedConsoleError
}
err := w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
})
var exitError *exec.ExitError
if errors.As(err, &exitError) {
return exitError.ExitCode(), err
}
if err != nil {
return containerd.UnknownExitStatus, err
}
return 0, nil
}
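The exit-code mapping above (and in the Linux implementation that follows) converts an *exec.ExitError from go-runc into a numeric status, with any other failure reported as containerd.UnknownExitStatus. A self-contained sketch of the same pattern using os/exec directly (exitStatus is a hypothetical helper, and 255 stands in for containerd.UnknownExitStatus):

package main

import (
	"errors"
	"fmt"
	"os/exec"
)

// exitStatus mirrors the pattern above: an *exec.ExitError carries the child's
// exit code; any other error maps to an "unknown" status.
func exitStatus(err error) (int, error) {
	var exitError *exec.ExitError
	if errors.As(err, &exitError) {
		return exitError.ExitCode(), err
	}
	if err != nil {
		return 255, err // containerd.UnknownExitStatus in the executor
	}
	return 0, nil
}

func main() {
	err := exec.Command("sh", "-c", "exit 3").Run()
	status, _ := exitStatus(err)
	fmt.Println(status) // prints 3
}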

View File

@@ -0,0 +1,206 @@
package runcexecutor
import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"strconv"
"strings"
"syscall"
"time"
"github.com/containerd/console"
"github.com/containerd/containerd"
runc "github.com/containerd/go-runc"
"github.com/docker/docker/pkg/signal"
"github.com/moby/buildkit/executor"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo) (int, error) {
return w.callWithIO(ctx, id, bundle, process, func(ctx context.Context, pidfile string, io runc.IO) (int, error) {
return w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
NoPivot: w.noPivot,
PidFile: pidfile,
IO: io,
})
})
}
func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo) (int, error) {
return w.callWithIO(ctx, id, bundle, process, func(ctx context.Context, pidfile string, io runc.IO) (int, error) {
err := w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
PidFile: pidfile,
IO: io,
})
var exitError *exec.ExitError
if errors.As(err, &exitError) {
return exitError.ExitCode(), err
}
if err != nil {
return containerd.UnknownExitStatus, err
}
return 0, nil
})
}
type runcCall func(ctx context.Context, pidfile string, io runc.IO) (int, error)
func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, call runcCall) (int, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
pidfile, err := ioutil.TempFile(bundle, "*.pid")
if err != nil {
return containerd.UnknownExitStatus, errors.Wrap(err, "failed to create pidfile")
}
defer os.Remove(pidfile.Name())
pidfile.Close()
if !process.Meta.Tty {
return call(ctx, pidfile.Name(), &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr})
}
ptm, ptsName, err := console.NewPty()
if err != nil {
return containerd.UnknownExitStatus, err
}
pts, err := os.OpenFile(ptsName, os.O_RDWR|syscall.O_NOCTTY, 0)
if err != nil {
ptm.Close()
return containerd.UnknownExitStatus, err
}
eg, ctx := errgroup.WithContext(ctx)
defer func() {
if process.Stdin != nil {
process.Stdin.Close()
}
pts.Close()
ptm.Close()
cancel() // this will shutdown resize loop
err := eg.Wait()
if err != nil {
logrus.Warningf("error while shutting down tty io: %s", err)
}
}()
if process.Stdin != nil {
eg.Go(func() error {
_, err := io.Copy(ptm, process.Stdin)
// stdin might be a pipe, so this is like EOF
if errors.Is(err, io.ErrClosedPipe) {
return nil
}
return err
})
}
if process.Stdout != nil {
eg.Go(func() error {
_, err := io.Copy(process.Stdout, ptm)
// ignore `read /dev/ptmx: input/output error` when ptm is closed
var ptmClosedError *os.PathError
if errors.As(err, &ptmClosedError) {
if ptmClosedError.Op == "read" &&
ptmClosedError.Path == "/dev/ptmx" &&
ptmClosedError.Err == syscall.EIO {
return nil
}
}
return err
})
}
eg.Go(func() error {
// need to poll until the pidfile has the pid written to it
pidfileCtx, timeout := context.WithTimeout(ctx, 10*time.Second)
defer timeout()
var runcProcess *os.Process
for {
st, err := os.Stat(pidfile.Name())
if err == nil && st.Size() > 0 {
pid, err := runc.ReadPidFile(pidfile.Name())
if err != nil {
return errors.Wrapf(err, "unable to read pid file: %s", pidfile.Name())
}
// pid will be for the process in process.Meta, not the parent runc process.
// We need to send SIGWINCH to the runc process, not the process.Meta process.
ppid, err := getppid(pid)
if err != nil {
return errors.Wrapf(err, "unable to find runc process (parent of %d)", pid)
}
runcProcess, err = os.FindProcess(ppid)
if err != nil {
return errors.Wrapf(err, "unable to find process for pid %d", ppid)
}
break
}
select {
case <-pidfileCtx.Done():
return errors.New("pidfile never updated")
case <-time.After(100 * time.Microsecond):
}
}
for {
select {
case <-ctx.Done():
return nil
case resize := <-process.Resize:
err = ptm.Resize(console.WinSize{
Height: uint16(resize.Rows),
Width: uint16(resize.Cols),
})
if err != nil {
logrus.Errorf("failed to resize ptm: %s", err)
}
err = runcProcess.Signal(signal.SIGWINCH)
if err != nil {
logrus.Errorf("failed to send SIGWINCH to process: %s", err)
}
}
}
})
runcIO := &forwardIO{}
if process.Stdin != nil {
runcIO.stdin = pts
}
if process.Stdout != nil {
runcIO.stdout = pts
}
if process.Stderr != nil {
runcIO.stderr = pts
}
return call(ctx, pidfile.Name(), runcIO)
}
const PPidStatusPrefix = "PPid:\t"
func getppid(pid int) (int, error) {
fh, err := os.Open(fmt.Sprintf("/proc/%d/status", pid))
if err != nil {
return -1, err
}
defer fh.Close()
scanner := bufio.NewScanner(fh)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, PPidStatusPrefix) {
return strconv.Atoi(strings.TrimPrefix(line, PPidStatusPrefix))
}
}
return -1, errors.Errorf("PPid line not found in /proc/%d/status", pid)
}
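getppid walks /proc/<pid>/status to find the parent pid so the resize loop can signal the runc parent rather than the exec'd process itself. A hypothetical, Linux-only sanity check (not part of this change) could compare it against the standard library:

package runcexecutor

import (
	"os"
	"testing"
)

// TestGetppid is a hypothetical check: for the current process, parsing
// /proc/self/status must agree with os.Getppid. Linux-only, like getppid.
func TestGetppid(t *testing.T) {
	ppid, err := getppid(os.Getpid())
	if err != nil {
		t.Fatal(err)
	}
	if want := os.Getppid(); ppid != want {
		t.Fatalf("expected ppid %d, got %d", want, ppid)
	}
}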

View File

@@ -475,14 +475,15 @@ func (b *procMessageForwarder) Send(ctx context.Context, m *pb.ExecMessage) {
}
}
func (b *procMessageForwarder) Recv(ctx context.Context) *pb.ExecMessage {
func (b *procMessageForwarder) Recv(ctx context.Context) (m *pb.ExecMessage, ok bool) {
select {
case <-ctx.Done():
return nil, true
case <-b.done:
case m := <-b.msgs:
return m
return nil, false
case m = <-b.msgs:
return m, true
}
return nil
}
func (b *procMessageForwarder) Close() {
@@ -734,7 +735,7 @@ func (ctr *container) Start(ctx context.Context, req client.StartRequest) (clien
return nil, err
}
msg := msgs.Recv(ctx)
msg, _ := msgs.Recv(ctx)
if msg == nil {
return nil, errors.Errorf("failed to receive started message")
}
@@ -798,13 +799,24 @@ func (ctr *container) Start(ctx context.Context, req client.StartRequest) (clien
}
ctrProc.eg.Go(func() error {
var closeDoneOnce sync.Once
var exitError error
for {
msg := msgs.Recv(ctx)
if msg == nil {
msg, ok := msgs.Recv(ctx)
if !ok {
// no more messages, return
return exitError
}
if msg == nil {
// empty message from ctx cancel, so just start shutting down
// input, but continue processing more exit/done messages
closeDoneOnce.Do(func() {
close(done)
})
continue
}
if file := msg.GetFile(); file != nil {
var out io.WriteCloser
switch file.Fd {
@@ -826,7 +838,9 @@ func (ctr *container) Start(ctx context.Context, req client.StartRequest) (clien
} else if exit := msg.GetExit(); exit != nil {
// capture exit message to exitError so we can return it after
// the server sends the Done message
closeDoneOnce.Do(func() {
close(done)
})
if exit.Code == 0 {
continue
}