Merge pull request #556 from tonistiigi/netproviders

executor: allow network providers
docker-18.09
Akihiro Suda 2018-08-09 08:46:04 +09:00 committed by GitHub
commit de1c0cc8de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 273 additions and 40 deletions

View File

@ -15,19 +15,23 @@ import (
"github.com/moby/buildkit/executor/oci"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/network"
"github.com/moby/buildkit/util/system"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type containerdExecutor struct {
client *containerd.Client
root string
networkProvider network.Provider
}
func New(client *containerd.Client, root string) executor.Executor {
func New(client *containerd.Client, root string, networkProvider network.Provider) executor.Executor {
return containerdExecutor{
client: client,
root: root,
networkProvider: networkProvider,
}
}
@ -74,6 +78,18 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c
lm.Unmount()
}
hostNetworkEnabled := true
var iface network.Interface
if w.networkProvider != nil {
iface, err = w.networkProvider.NewInterface()
if err == nil && iface != nil {
hostNetworkEnabled = false
}
}
if hostNetworkEnabled {
logrus.Info("enabling HostNetworking")
}
opts := []containerdoci.SpecOpts{oci.WithUIDGID(uid, gid, sgids)}
if meta.ReadonlyRootFS {
opts = append(opts, containerdoci.WithRootFSReadonly())
@ -81,7 +97,7 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c
if system.SeccompSupported() {
opts = append(opts, seccomp.WithDefaultProfile())
}
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, opts...)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, hostNetworkEnabled, opts...)
if err != nil {
return err
}
@ -108,14 +124,24 @@ func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root c
if err != nil {
return err
}
if iface != nil {
if err := iface.Set(int(task.Pid())); err != nil {
return errors.Wrap(err, "could not set the network")
}
}
defer func() {
if iface != nil {
iface.Remove(int(task.Pid()))
w.networkProvider.Release(iface)
}
if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil {
err = errors.Wrapf(err1, "failed to delete task %s", id)
}
}()
// TODO: Configure bridge networking
if err := task.Start(ctx); err != nil {
return err
}

View File

@ -21,7 +21,7 @@ import (
// Ideally we don't have to import whole containerd just for the default spec
// GenerateSpec generates spec using containerd functionality.
func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mount, id, resolvConf, hostsFile string, opts ...oci.SpecOpts) (*specs.Spec, func(), error) {
func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mount, id, resolvConf, hostsFile string, hostNetwork bool, opts ...oci.SpecOpts) (*specs.Spec, func(), error) {
c := &containers.Container{
ID: id,
}
@ -30,9 +30,9 @@ func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mou
ctx = namespaces.WithNamespace(ctx, "buildkit")
}
opts = append(opts,
oci.WithHostNamespace(specs.NetworkNamespace),
)
if hostNetwork {
opts = append(opts, oci.WithHostNamespace(specs.NetworkNamespace))
}
// Note that containerd.GenerateSpec is namespaced so as to make
// specs.Linux.CgroupsPath namespaced

View File

@ -10,6 +10,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"github.com/containerd/containerd/contrib/seccomp"
@ -21,8 +22,10 @@ import (
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/executor/oci"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/network"
rootlessspecconv "github.com/moby/buildkit/util/rootless/specconv"
"github.com/moby/buildkit/util/system"
runcsystem "github.com/opencontainers/runc/libcontainer/system"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -43,9 +46,10 @@ type runcExecutor struct {
root string
cmd string
rootless bool
networkProvider network.Provider
}
func New(opt Opt) (executor.Executor, error) {
func New(opt Opt, networkProvider network.Provider) (executor.Executor, error) {
cmds := opt.CommandCandidates
if cmds == nil {
cmds = defaultCommandCandidates
@ -65,6 +69,10 @@ func New(opt Opt) (executor.Executor, error) {
root := opt.Root
if err := setSubReaper(); err != nil {
return nil, err
}
if err := os.MkdirAll(root, 0700); err != nil {
return nil, errors.Wrapf(err, "failed to create %s", root)
}
@ -92,11 +100,29 @@ func New(opt Opt) (executor.Executor, error) {
runc: runtime,
root: root,
rootless: opt.Rootless,
networkProvider: networkProvider,
}
return w, nil
}
func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
hostNetworkEnabled := true
var iface network.Interface
if w.networkProvider != nil {
var err error
iface, err = w.networkProvider.NewInterface()
if err == nil && iface != nil {
hostNetworkEnabled = false
}
}
if hostNetworkEnabled {
logrus.Info("enabling HostNetworking")
}
defer func() {
if iface != nil {
w.networkProvider.Release(iface)
}
}()
resolvConf, err := oci.GetResolvConf(ctx, w.root)
if err != nil {
@ -148,6 +174,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
return err
}
defer f.Close()
opts := []containerdoci.SpecOpts{oci.WithUIDGID(uid, gid, sgids)}
if system.SeccompSupported() {
opts = append(opts, seccomp.WithDefaultProfile())
@ -155,7 +182,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
if meta.ReadonlyRootFS {
opts = append(opts, containerdoci.WithRootFSReadonly())
}
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, opts...)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, hostNetworkEnabled, opts...)
if err != nil {
return err
}
@ -187,32 +214,140 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
return err
}
logrus.Debugf("> running %s %v", id, meta.Args)
forwardIO, err := newForwardIO(stdin, stdout, stderr)
if err != nil {
return errors.Wrap(err, "creating new forwarding IO")
}
defer forwardIO.Close()
status, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
IO: &forwardIO{stdin: stdin, stdout: stdout, stderr: stderr},
pidFilePath := filepath.Join(w.root, "runc_pid_"+identity.NewID())
defer os.RemoveAll(pidFilePath)
logrus.Debugf("> creating %s %v", id, meta.Args)
err = w.runc.Create(ctx, id, bundle, &runc.CreateOpts{
PidFile: pidFilePath,
IO: forwardIO,
})
logrus.Debugf("< completed %s %v %v", id, status, err)
if status != 0 {
select {
case <-ctx.Done():
// runc can't report context.Cancelled directly
return errors.Wrapf(ctx.Err(), "exit code %d", status)
default:
if err != nil {
return err
}
return errors.Errorf("exit code %d", status)
forwardIO.release()
defer func() {
go func() {
if err := w.runc.Delete(context.TODO(), id, &runc.DeleteOpts{}); err != nil {
logrus.Errorf("failed to delete %s: %+v", id, err)
}
}()
}()
dt, err := ioutil.ReadFile(pidFilePath)
if err != nil {
return err
}
pid, err := strconv.Atoi(string(dt))
if err != nil {
return err
}
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-done:
case <-ctx.Done():
syscall.Kill(-pid, syscall.SIGKILL)
}
}()
if iface != nil {
if err := iface.Set(pid); err != nil {
return errors.Wrap(err, "could not set the network")
}
defer func() {
iface.Remove(pid)
}()
}
err = w.runc.Start(ctx, id)
if err != nil {
return err
}
p, err := os.FindProcess(pid)
if err != nil {
return err
}
status := 0
ps, err := p.Wait()
if err != nil {
status = 255
}
if ws, ok := ps.Sys().(syscall.WaitStatus); ok {
status = ws.ExitStatus()
}
if status != 0 {
return errors.Errorf("exit code: %d", status)
}
return nil
}
type forwardIO struct {
stdin io.ReadCloser
stdout, stderr io.WriteCloser
stdin, stdout, stderr *os.File
toRelease []io.Closer
toClose []io.Closer
}
func newForwardIO(stdin io.ReadCloser, stdout, stderr io.WriteCloser) (f *forwardIO, err error) {
fio := &forwardIO{}
defer func() {
if err != nil {
fio.Close()
}
}()
if stdin != nil {
fio.stdin, err = fio.readCloserToFile(stdin)
if err != nil {
return nil, err
}
}
if stdout != nil {
fio.stdout, err = fio.writeCloserToFile(stdout)
if err != nil {
return nil, err
}
}
if stderr != nil {
fio.stderr, err = fio.writeCloserToFile(stderr)
if err != nil {
return nil, err
}
}
return fio, nil
}
func (s *forwardIO) Close() error {
return nil
s.release()
var err error
for _, cl := range s.toClose {
if err1 := cl.Close(); err == nil {
err = err1
}
}
s.toClose = nil
return err
}
// release releases active FDs if the process doesn't need them any more
func (s *forwardIO) release() {
for _, cl := range s.toRelease {
cl.Close()
}
s.toRelease = nil
}
func (s *forwardIO) Set(cmd *exec.Cmd) {
@ -221,6 +356,56 @@ func (s *forwardIO) Set(cmd *exec.Cmd) {
cmd.Stderr = s.stderr
}
func (s *forwardIO) readCloserToFile(rc io.ReadCloser) (*os.File, error) {
if f, ok := rc.(*os.File); ok {
return f, nil
}
pr, pw, err := os.Pipe()
if err != nil {
return nil, err
}
s.toClose = append(s.toClose, pw)
s.toRelease = append(s.toRelease, pr)
go func() {
_, err := io.Copy(pw, rc)
if err1 := pw.Close(); err == nil {
err = err1
}
_ = err
}()
return pr, nil
}
func (s *forwardIO) writeCloserToFile(wc io.WriteCloser) (*os.File, error) {
if f, ok := wc.(*os.File); ok {
return f, nil
}
pr, pw, err := os.Pipe()
if err != nil {
return nil, err
}
s.toClose = append(s.toClose, pr)
s.toRelease = append(s.toRelease, pw)
go func() {
_, err := io.Copy(wc, pr)
if err1 := pw.Close(); err == nil {
err = err1
}
_ = err
}()
return pw, nil
}
var subReaperOnce sync.Once
var subReaperError error
func setSubReaper() error {
subReaperOnce.Do(func() {
subReaperError = runcsystem.SetSubreaper(1)
})
return subReaperError
}
func (s *forwardIO) Stdin() io.WriteCloser {
return nil
}

22
util/network/network.go Normal file
View File

@ -0,0 +1,22 @@
package network
// Provider interface for Network
type Provider interface {
NewInterface() (Interface, error)
Release(Interface) error
}
// Interface of network for workers
type Interface interface {
// Set the pid with network interace namespace
Set(int) error
// Removes the network interface
Remove(int) error
}
// NetworkOpts hold network options
type NetworkOpts struct {
Type string
CNIConfigPath string
CNIPluginPath string
}

View File

@ -107,7 +107,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName strin
ID: id,
Labels: xlabels,
MetadataStore: md,
Executor: containerdexecutor.New(client, root),
Executor: containerdexecutor.New(client, root, nil),
Snapshotter: containerdsnapshot.NewSnapshotter(client.SnapshotService(snapshotterName), cs, md, "buildkit", gc),
ContentStore: cs,
Applier: winlayers.NewFileSystemApplierWithWindows(cs, df),

View File

@ -48,7 +48,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, labe
Root: filepath.Join(root, "executor"),
// without root privileges
Rootless: rootless,
})
}, nil)
if err != nil {
return opt, err
}

View File

@ -112,7 +112,7 @@ func TestRuncWorker(t *testing.T) {
root, err := w.CacheManager.New(ctx, snap)
require.NoError(t, err)
err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, nil)
err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, &nopCloser{stderr})
require.NoError(t, err)
meta = executor.Meta{