support collecting traces from llb.Exec

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
v0.9
Tonis Tiigi 2021-06-13 23:29:08 -07:00
parent d8fc4e15f9
commit d512920c24
14 changed files with 106 additions and 53 deletions

View File

@ -5,6 +5,7 @@ import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"net"
"net/url"
"strings"
@ -17,6 +18,7 @@ import (
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/moby/buildkit/util/tracing/otlptracegrpc"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@ -136,17 +138,12 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
}
func (c *Client) setupDelegatedTracing(ctx context.Context) error {
span := trace.SpanFromContext(ctx)
if !span.SpanContext().IsValid() {
return nil
exp, err := detect.Exporter()
if err != nil {
return err
}
exp, ok := span.TracerProvider().(interface {
SpanExporter() sdktrace.SpanExporter
})
if !ok || exp == nil {
return nil
}
log.Printf("exporter %v %T", exp, exp)
del, ok := exp.(interface {
SetDelegate(context.Context, sdktrace.SpanExporter) error
})

View File

@ -15,6 +15,7 @@ import (
"github.com/moby/buildkit/util/stack"
_ "github.com/moby/buildkit/util/tracing/detect/delegated"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
_ "github.com/moby/buildkit/util/tracing/env"
"github.com/moby/buildkit/version"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"

View File

@ -50,6 +50,8 @@ import (
"github.com/moby/buildkit/util/stack"
"github.com/moby/buildkit/util/tracing/detect"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
_ "github.com/moby/buildkit/util/tracing/env"
"github.com/moby/buildkit/util/tracing/transform"
"github.com/moby/buildkit/version"
"github.com/moby/buildkit/worker"
specs "github.com/opencontainers/image-spec/specs-go/v1"
@ -60,6 +62,8 @@ import (
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
@ -78,6 +82,7 @@ type workerInitializerOpt struct {
config *config.Config
configMetaData *toml.MetaData
sessionManager *session.Manager
traceSocket string
}
type workerInitializer struct {
@ -305,7 +310,7 @@ func main() {
profiler.Attach(app)
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "buildkitd: %s\n", err)
fmt.Fprintf(os.Stderr, "buildkitd: %+v\n", err)
os.Exit(1)
}
}
@ -553,6 +558,10 @@ func unaryInterceptor(globalCtx context.Context, tp trace.TracerProvider) grpc.U
}
}()
if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return handler(ctx, req)
}
resp, err = withTrace(ctx, req, info, handler)
if err != nil {
logrus.Errorf("%s returned error: %+v", info.FullMethod, stack.Formatter(err))
@ -603,10 +612,25 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
if err != nil {
return nil, err
}
tc, err := detect.Exporter()
if err != nil {
return nil, err
}
var traceSocket string
if tc != nil {
traceSocket = filepath.Join(cfg.Root, "otel-grpc.sock")
if err := runTraceController(traceSocket, tc); err != nil {
return nil, err
}
}
wc, err := newWorkerController(c, workerInitializerOpt{
config: cfg,
configMetaData: md,
sessionManager: sessionManager,
traceSocket: traceSocket,
})
if err != nil {
return nil, err
@ -637,16 +661,6 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
"local": localremotecache.ResolveCacheImporterFunc(sessionManager),
}
tp, err := detect.TracerProvider()
if err != nil {
return nil, err
}
var tc sdktrace.SpanExporter
if tp, ok := tp.(detect.TraceCollector); ok {
tc = tp.SpanExporter()
}
return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
@ -755,3 +769,32 @@ func getDNSConfig(cfg *config.DNSConfig) *oci.DNSConfig {
}
return dns
}
func runTraceController(p string, exp sdktrace.SpanExporter) error {
server := grpc.NewServer()
tracev1.RegisterTraceServiceServer(server, &traceCollector{exporter: exp})
uid := os.Getuid()
l, err := sys.GetLocalListener(p, uid, uid)
if err != nil {
return err
}
if err := os.Chmod(p, 0666); err != nil {
l.Close()
return err
}
go server.Serve(l)
return nil
}
type traceCollector struct {
*tracev1.UnimplementedTraceServiceServer
exporter sdktrace.SpanExporter
}
func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
return nil, err
}
return &v1.ExportTraceServiceResponse{}, nil
}

View File

@ -235,7 +235,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
if cfg.Snapshotter != "" {
snapshotter = cfg.Snapshotter
}
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, ctd.WithTimeout(60*time.Second))
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, common.traceSocket, ctd.WithTimeout(60*time.Second))
if err != nil {
return nil, err
}

View File

@ -287,7 +287,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism))
}
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem)
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem, common.traceSocket)
if err != nil {
return nil, err
}

View File

@ -38,10 +38,11 @@ type containerdExecutor struct {
running map[string]chan error
mu sync.Mutex
apparmorProfile string
traceSocket string
}
// New creates a new executor backed by connection to containerd API
func New(client *containerd.Client, root, cgroup string, networkProviders map[pb.NetMode]network.Provider, dnsConfig *oci.DNSConfig, apparmorProfile string) executor.Executor {
func New(client *containerd.Client, root, cgroup string, networkProviders map[pb.NetMode]network.Provider, dnsConfig *oci.DNSConfig, apparmorProfile string, traceSocket string) executor.Executor {
// clean up old hosts/resolv.conf file. ignore errors
os.RemoveAll(filepath.Join(root, "hosts"))
os.RemoveAll(filepath.Join(root, "resolv.conf"))
@ -54,6 +55,7 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb
dnsConfig: dnsConfig,
running: make(map[string]chan error),
apparmorProfile: apparmorProfile,
traceSocket: traceSocket,
}
}
@ -170,7 +172,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M
opts = append(opts, containerdoci.WithCgroup(cgroupsPath))
}
processMode := oci.ProcessSandbox // FIXME(AkihiroSuda)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, w.apparmorProfile, opts...)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, w.apparmorProfile, w.traceSocket, opts...)
if err != nil {
return err
}

View File

@ -37,7 +37,7 @@ const (
// GenerateSpec generates spec using containerd functionality.
// opts are ignored for s.Process, s.Hostname, and s.Mounts .
func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mount, id, resolvConf, hostsFile string, namespace network.Namespace, processMode ProcessMode, idmap *idtools.IdentityMapping, apparmorProfile string, opts ...oci.SpecOpts) (*specs.Spec, func(), error) {
func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mount, id, resolvConf, hostsFile string, namespace network.Namespace, processMode ProcessMode, idmap *idtools.IdentityMapping, apparmorProfile string, tracingSocket string, opts ...oci.SpecOpts) (*specs.Spec, func(), error) {
c := &containers.Container{
ID: id,
}
@ -77,7 +77,11 @@ func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mou
hostname = meta.Hostname
}
meta.Env = append(meta.Env, traceexec.Environ(ctx)...)
if tracingSocket != "" {
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md
meta.Env = append(meta.Env, "OTEL_TRACES_EXPORTER=otlp", "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=unix:///dev/otel-grpc.sock", "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc")
meta.Env = append(meta.Env, traceexec.Environ(ctx)...)
}
opts = append(opts,
oci.WithProcessArgs(meta.Args...),
@ -142,6 +146,15 @@ func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mou
}
}
if tracingSocket != "" {
s.Mounts = append(s.Mounts, specs.Mount{
Destination: "/dev/otel-grpc.sock",
Type: "bind",
Source: tracingSocket,
Options: []string{"ro", "rbind"},
})
}
return s, releaseAll, nil
}

View File

@ -46,6 +46,7 @@ type Opt struct {
DNS *oci.DNSConfig
OOMScoreAdj *int
ApparmorProfile string
TracingSocket string
}
var defaultCommandCandidates = []string{"buildkit-runc", "runc"}
@ -64,6 +65,7 @@ type runcExecutor struct {
running map[string]chan error
mu sync.Mutex
apparmorProfile string
tracingSocket string
}
func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Executor, error) {
@ -127,6 +129,7 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex
oomScoreAdj: opt.OOMScoreAdj,
running: make(map[string]chan error),
apparmorProfile: opt.ApparmorProfile,
tracingSocket: opt.TracingSocket,
}
return w, nil
}
@ -256,7 +259,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}
opts = append(opts, containerdoci.WithCgroup(cgroupsPath))
}
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.processMode, w.idmap, w.apparmorProfile, opts...)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.processMode, w.idmap, w.apparmorProfile, w.tracingSocket, opts...)
if err != nil {
return err
}

1
go.mod
View File

@ -62,6 +62,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.0.0-RC1
go.opentelemetry.io/otel/sdk v1.0.0-RC1
go.opentelemetry.io/otel/trace v1.0.0-RC1
go.opentelemetry.io/proto/otlp v0.9.0
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

View File

@ -27,6 +27,7 @@ var ServiceName string
var detectors map[string]detector
var once sync.Once
var tp trace.TracerProvider
var exporter sdktrace.SpanExporter
var closers []func(context.Context) error
var err error
@ -95,10 +96,8 @@ func detect() error {
sdktp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sp), sdktrace.WithResource(res))
closers = append(closers, sdktp.Shutdown)
tp = &tracerProviderWithExporter{
TracerProvider: sdktp,
exp: exp,
}
exporter = exp
tp = sdktp
return nil
}
@ -115,6 +114,14 @@ func TracerProvider() (trace.TracerProvider, error) {
return tp, nil
}
func Exporter() (sdktrace.SpanExporter, error) {
_, err := TracerProvider()
if err != nil {
return nil, err
}
return exporter, nil
}
func Shutdown(ctx context.Context) error {
for _, c := range closers {
if err := c(ctx); err != nil {
@ -141,18 +148,3 @@ func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, erro
},
).Detect(ctx)
}
type TraceCollector interface {
SpanExporter() sdktrace.SpanExporter
}
type tracerProviderWithExporter struct {
trace.TracerProvider
exp sdktrace.SpanExporter
}
func (t *tracerProviderWithExporter) SpanExporter() sdktrace.SpanExporter {
return t.exp
}
var _ TraceCollector = &tracerProviderWithExporter{}

View File

@ -26,16 +26,16 @@ import (
)
// NewWorkerOpt creates a WorkerOpt.
func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
opts = append(opts, containerd.WithDefaultNamespace(ns))
client, err := containerd.New(address, opts...)
if err != nil {
return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address)
}
return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile, parallelismSem)
return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile, parallelismSem, traceSocket)
}
func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) {
func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string) (base.WorkerOpt, error) {
if strings.Contains(snapshotterName, "/") {
return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName)
}
@ -115,7 +115,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
ID: id,
Labels: xlabels,
MetadataStore: md,
Executor: containerdexecutor.New(client, root, "", np, dns, apparmorProfile),
Executor: containerdexecutor.New(client, root, "", np, dns, apparmorProfile, traceSocket),
Snapshotter: snap,
ContentStore: cs,
Applier: winlayers.NewFileSystemApplierWithWindows(cs, df),

View File

@ -30,7 +30,7 @@ func newWorkerOpt(t *testing.T, addr string) (base.WorkerOpt, func()) {
tmpdir, err := ioutil.TempDir("", "workertest")
require.NoError(t, err)
cleanup := func() { os.RemoveAll(tmpdir) }
workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "", nil)
workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "", nil, "")
require.NoError(t, err)
return workerOpt, cleanup
}

View File

@ -33,7 +33,7 @@ type SnapshotterFactory struct {
}
// NewWorkerOpt creates a WorkerOpt.
func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) {
func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string) (base.WorkerOpt, error) {
var opt base.WorkerOpt
name := "runc-" + snFactory.Name
root = filepath.Join(root, name)
@ -64,6 +64,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
IdentityMapping: idmap,
DNS: dns,
ApparmorProfile: apparmorProfile,
TracingSocket: traceSocket,
}, np)
if err != nil {
return opt, err

View File

@ -40,7 +40,7 @@ func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, fu
},
}
rootless := false
workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "", nil)
workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "", nil, "")
require.NoError(t, err)
return workerOpt, cleanup