buildkit/cmd/buildkitd/main.go

417 lines
9.9 KiB
Go

package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"os"
"os/user"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/containerd/containerd/sys"
"github.com/docker/go-connections/sockets"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/moby/buildkit/cache/cacheimport"
"github.com/moby/buildkit/control"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/dockerfile"
"github.com/moby/buildkit/frontend/gateway"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/appcontext"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/base"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type workerInitializerOpt struct {
sessionManager *session.Manager
root string
}
type workerInitializer struct {
fn func(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error)
// less priority number, more preferred
priority int
}
var (
appFlags []cli.Flag
workerInitializers []workerInitializer
)
func registerWorkerInitializer(wi workerInitializer, flags ...cli.Flag) {
workerInitializers = append(workerInitializers, wi)
sort.Slice(workerInitializers,
func(i, j int) bool {
return workerInitializers[i].priority < workerInitializers[j].priority
})
appFlags = append(appFlags, flags...)
}
func main() {
app := cli.NewApp()
app.Name = "buildkitd"
app.Usage = "build daemon"
app.Flags = []cli.Flag{
cli.BoolFlag{
Name: "debug",
Usage: "enable debug output in logs",
},
cli.StringFlag{
Name: "root",
Usage: "path to state directory",
Value: appdefaults.Root,
},
cli.StringSliceFlag{
Name: "addr",
Usage: "listening address (socket or tcp)",
Value: &cli.StringSlice{appdefaults.Address},
},
cli.StringFlag{
Name: "group",
Usage: "group (name or gid) which will own all Unix socket listening addresses",
Value: "",
},
cli.StringFlag{
Name: "debugaddr",
Usage: "debugging address (eg. 0.0.0.0:6060)",
Value: "",
},
cli.StringFlag{
Name: "tlscert",
Usage: "certificate file to use",
},
cli.StringFlag{
Name: "tlskey",
Usage: "key file to use",
},
cli.StringFlag{
Name: "tlscacert",
Usage: "ca certificate to verify clients",
},
}
app.Flags = append(app.Flags, appFlags...)
app.Action = func(c *cli.Context) error {
ctx, cancel := context.WithCancel(appcontext.Context())
defer cancel()
if debugAddr := c.GlobalString("debugaddr"); debugAddr != "" {
if err := setupDebugHandlers(debugAddr); err != nil {
return err
}
}
opts := []grpc.ServerOption{unaryInterceptor(ctx), grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer))}
creds, err := serverCredentials(c)
if err != nil {
return err
}
if creds != nil {
opts = append(opts, creds)
}
server := grpc.NewServer(opts...)
// relative path does not work with nightlyone/lockfile
root, err := filepath.Abs(c.GlobalString("root"))
if err != nil {
return err
}
if err := os.MkdirAll(root, 0700); err != nil {
return errors.Wrapf(err, "failed to create %s", root)
}
controller, err := newController(c, root)
if err != nil {
return err
}
controller.Register(server)
errCh := make(chan error, 1)
addrs := c.GlobalStringSlice("addr")
if len(addrs) > 1 {
addrs = addrs[1:] // https://github.com/urfave/cli/issues/160
}
if err := serveGRPC(c, server, addrs, errCh); err != nil {
return err
}
select {
case serverErr := <-errCh:
err = serverErr
cancel()
case <-ctx.Done():
err = ctx.Err()
}
logrus.Infof("stopping server")
server.GracefulStop()
return err
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
logrus.SetLevel(logrus.DebugLevel)
}
return nil
}
app.After = func(context *cli.Context) error {
if closeTracer != nil {
return closeTracer.Close()
}
return nil
}
profiler.Attach(app)
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "buildkitd: %s\n", err)
os.Exit(1)
}
}
func serveGRPC(c *cli.Context, server *grpc.Server, addrs []string, errCh chan error) error {
if len(addrs) == 0 {
return errors.New("--addr cannot be empty")
}
eg, _ := errgroup.WithContext(context.Background())
listeners := make([]net.Listener, 0, len(addrs))
for _, addr := range addrs {
l, err := getListener(c, addr)
if err != nil {
for _, l := range listeners {
l.Close()
}
return err
}
listeners = append(listeners, l)
}
for _, l := range listeners {
func(l net.Listener) {
eg.Go(func() error {
defer l.Close()
logrus.Infof("running server on %s", l.Addr())
return server.Serve(l)
})
}(l)
}
go func() {
errCh <- eg.Wait()
}()
return nil
}
// Convert a string containing either a group name or a stringified gid into a numeric id)
func groupToGid(group string) (int, error) {
if group == "" {
return os.Getgid(), nil
}
var (
err error
id int
)
// Try and parse as a number, if the error is ErrSyntax
// (i.e. its not a number) then we carry on and try it as a
// name.
if id, err = strconv.Atoi(group); err == nil {
return id, nil
} else if err.(*strconv.NumError).Err != strconv.ErrSyntax {
return 0, err
}
ginfo, err := user.LookupGroup(group)
if err != nil {
return 0, err
}
group = ginfo.Gid
if id, err = strconv.Atoi(group); err != nil {
return 0, err
}
return id, nil
}
func getListener(c *cli.Context, addr string) (net.Listener, error) {
addrSlice := strings.SplitN(addr, "://", 2)
if len(addrSlice) < 2 {
return nil, errors.Errorf("address %s does not contain proto, you meant unix://%s ?",
addr, addr)
}
proto := addrSlice[0]
listenAddr := addrSlice[1]
switch proto {
case "unix", "npipe":
uid := os.Getuid()
gid, err := groupToGid(c.GlobalString("group"))
if err != nil {
return nil, err
}
return sys.GetLocalListener(listenAddr, uid, gid)
case "tcp":
return sockets.NewTCPSocket(listenAddr, nil)
default:
return nil, errors.Errorf("addr %s not supported", addr)
}
}
func unaryInterceptor(globalCtx context.Context) grpc.ServerOption {
withTrace := otgrpc.OpenTracingServerInterceptor(tracer, otgrpc.LogPayloads())
return grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-ctx.Done():
case <-globalCtx.Done():
cancel()
}
}()
resp, err = withTrace(ctx, req, info, handler)
if err != nil {
logrus.Errorf("%s returned error: %+v", info.FullMethod, err)
}
return
})
}
func serverCredentials(c *cli.Context) (grpc.ServerOption, error) {
certFile := c.GlobalString("tlscert")
keyFile := c.GlobalString("tlskey")
caFile := c.GlobalString("tlscacert")
if certFile == "" && keyFile == "" {
return nil, nil
}
err := errors.New("you must specify key and cert file if one is specified")
if certFile == "" {
return nil, err
}
if keyFile == "" {
return nil, err
}
certificate, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, errors.Wrap(err, "could not load server key pair")
}
tlsConf := &tls.Config{
Certificates: []tls.Certificate{certificate},
}
if caFile != "" {
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, errors.Wrap(err, "could not read ca certificate")
}
// Append the client certificates from the CA
if ok := certPool.AppendCertsFromPEM(ca); !ok {
return nil, errors.New("failed to append ca cert")
}
tlsConf.ClientAuth = tls.RequireAndVerifyClientCert
tlsConf.ClientCAs = certPool
}
creds := grpc.Creds(credentials.NewTLS(tlsConf))
return creds, nil
}
func newController(c *cli.Context, root string) (*control.Controller, error) {
sessionManager, err := session.NewManager()
if err != nil {
return nil, err
}
wc, err := newWorkerController(c, workerInitializerOpt{
sessionManager: sessionManager,
root: root,
})
if err != nil {
return nil, err
}
frontends := map[string]frontend.Frontend{}
frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend()
frontends["gateway.v0"] = gateway.NewGatewayFrontend()
// cache exporter and importer are manager concepts but as there is no
// way to pull data into specific worker yet we currently set them up
// as part of default worker
var ce *cacheimport.CacheExporter
var ci *cacheimport.CacheImporter
w, err := wc.GetDefault()
if err != nil {
return nil, err
}
wt := w.(*base.Worker)
ce = wt.CacheExporter
ci = wt.CacheImporter
return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
Frontends: frontends,
CacheExporter: ce,
CacheImporter: ci,
})
}
func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Controller, error) {
wc := &worker.Controller{}
nWorkers := 0
for _, wi := range workerInitializers {
ws, err := wi.fn(c, wiOpt)
if err != nil {
return nil, err
}
for _, w := range ws {
logrus.Infof("found worker %q, labels=%v", w.ID(), w.Labels())
if err = wc.Add(w); err != nil {
return nil, err
}
nWorkers++
}
}
if nWorkers == 0 {
return nil, errors.New("no worker found, rebuild the buildkit daemon?")
}
defaultWorker, err := wc.GetDefault()
if err != nil {
return nil, err
}
logrus.Infof("found %d workers, default=%q", nWorkers, defaultWorker.ID())
logrus.Warn("currently, only the default worker can be used.")
return wc, nil
}
func attrMap(sl []string) (map[string]string, error) {
m := map[string]string{}
for _, v := range sl {
parts := strings.SplitN(v, "=", 2)
if len(parts) != 2 {
return nil, errors.Errorf("invalid value %s", v)
}
m[parts[0]] = parts[1]
}
return m, nil
}