diff --git a/client/client.go b/client/client.go
index dc09846d..07403beb 100644
--- a/client/client.go
+++ b/client/client.go
@@ -17,6 +17,7 @@ import (
 	"github.com/moby/buildkit/session/grpchijack"
 	"github.com/moby/buildkit/util/appdefaults"
 	"github.com/moby/buildkit/util/grpcerrors"
+	"github.com/moby/buildkit/util/tracing/detect"
 	"github.com/moby/buildkit/util/tracing/otlptracegrpc"
 	"github.com/pkg/errors"
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@@ -136,17 +137,11 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
 }
 
 func (c *Client) setupDelegatedTracing(ctx context.Context) error {
-	span := trace.SpanFromContext(ctx)
-	if !span.SpanContext().IsValid() {
-		return nil
+	exp, err := detect.Exporter()
+	if err != nil {
+		return err
 	}
 
-	exp, ok := span.TracerProvider().(interface {
-		SpanExporter() sdktrace.SpanExporter
-	})
-	if !ok || exp == nil {
-		return nil
-	}
 	del, ok := exp.(interface {
 		SetDelegate(context.Context, sdktrace.SpanExporter) error
 	})
diff --git a/cmd/buildctl/main.go b/cmd/buildctl/main.go
index f324e78c..97e89e7a 100644
--- a/cmd/buildctl/main.go
+++ b/cmd/buildctl/main.go
@@ -15,6 +15,7 @@ import (
 	"github.com/moby/buildkit/util/stack"
 	_ "github.com/moby/buildkit/util/tracing/detect/delegated"
 	_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
+	_ "github.com/moby/buildkit/util/tracing/env"
 	"github.com/moby/buildkit/version"
 	"github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go
index 6f522551..16e85230 100644
--- a/cmd/buildkitd/main.go
+++ b/cmd/buildkitd/main.go
@@ -50,6 +50,8 @@ import (
 	"github.com/moby/buildkit/util/stack"
 	"github.com/moby/buildkit/util/tracing/detect"
 	_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
+	_
"github.com/moby/buildkit/util/tracing/env"
+	"github.com/moby/buildkit/util/tracing/transform"
 	"github.com/moby/buildkit/version"
 	"github.com/moby/buildkit/worker"
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
@@ -60,6 +62,7 @@ import (
 	"go.opentelemetry.io/otel/propagation"
 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
 	"go.opentelemetry.io/otel/trace"
+	tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 )
@@ -78,6 +81,7 @@ type workerInitializerOpt struct {
 	config         *config.Config
 	configMetaData *toml.MetaData
 	sessionManager *session.Manager
+	traceSocket    string
 }
 
 type workerInitializer struct {
@@ -305,7 +309,7 @@ func main() {
 	profiler.Attach(app)
 
 	if err := app.Run(os.Args); err != nil {
-		fmt.Fprintf(os.Stderr, "buildkitd: %s\n", err)
+		fmt.Fprintf(os.Stderr, "buildkitd: %+v\n", err)
 		os.Exit(1)
 	}
 }
@@ -553,6 +557,10 @@ func unaryInterceptor(globalCtx context.Context, tp trace.TracerProvider) grpc.U
 			}
 		}()
 
+		if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
+			return handler(ctx, req)
+		}
+
 		resp, err = withTrace(ctx, req, info, handler)
 		if err != nil {
 			logrus.Errorf("%s returned error: %+v", info.FullMethod, stack.Formatter(err))
@@ -603,10 +611,25 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
 	if err != nil {
 		return nil, err
 	}
+
+	tc, err := detect.Exporter()
+	if err != nil {
+		return nil, err
+	}
+
+	var traceSocket string
+	if tc != nil {
+		traceSocket = filepath.Join(cfg.Root, "otel-grpc.sock")
+		if err := runTraceController(traceSocket, tc); err != nil {
+			return nil, err
+		}
+	}
+
 	wc, err := newWorkerController(c, workerInitializerOpt{
 		config:         cfg,
 		configMetaData: md,
 		sessionManager: sessionManager,
+		traceSocket:    traceSocket,
 	})
 	if err != nil {
 		return nil, err
@@ -637,16 +660,6 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
 		"local":    localremotecache.ResolveCacheImporterFunc(sessionManager),
 	}
 
-	tp, err := detect.TracerProvider()
-	if err != nil {
-		return nil, err
-	}
-
-	var tc sdktrace.SpanExporter
-	if tp, ok := tp.(detect.TraceCollector); ok {
-		tc = tp.SpanExporter()
-	}
-
 	return control.NewController(control.Opt{
 		SessionManager:            sessionManager,
 		WorkerController:          wc,
@@ -755,3 +768,46 @@ func getDNSConfig(cfg *config.DNSConfig) *oci.DNSConfig {
 	}
 	return dns
 }
+
+// runTraceController starts a gRPC server on the unix socket at p that
+// forwards every received OTLP trace export request to exp.
+func runTraceController(p string, exp sdktrace.SpanExporter) error {
+	server := grpc.NewServer()
+	tracev1.RegisterTraceServiceServer(server, &traceCollector{exporter: exp})
+	uid := os.Getuid()
+	l, err := sys.GetLocalListener(p, uid, uid)
+	if err != nil {
+		return err
+	}
+	// NOTE(review): world-writable, presumably so that build containers running
+	// as arbitrary users can connect to the bind-mounted socket -- confirm.
+	if err := os.Chmod(p, 0666); err != nil {
+		l.Close()
+		return err
+	}
+	go func() {
+		// Serve blocks until the listener fails or the server is stopped; log
+		// the error instead of dropping it so a dead collector is visible.
+		if err := server.Serve(l); err != nil {
+			logrus.Errorf("trace collector on %s stopped: %+v", p, err)
+		}
+	}()
+	return nil
+}
+
+// traceCollector implements the OTLP TraceService by handing received spans
+// to the wrapped exporter.
+type traceCollector struct {
+	*tracev1.UnimplementedTraceServiceServer
+	exporter sdktrace.SpanExporter
+}
+
+// Export converts the OTLP resource spans in req and passes them to the
+// wrapped exporter, returning an empty response on success.
+func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
+	err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
+	if err != nil {
+		return nil, err
+	}
+	return &tracev1.ExportTraceServiceResponse{}, nil
+}
diff --git a/cmd/buildkitd/main_containerd_worker.go b/cmd/buildkitd/main_containerd_worker.go
index 4e096334..570335fd 100644
--- a/cmd/buildkitd/main_containerd_worker.go
+++ b/cmd/buildkitd/main_containerd_worker.go
@@ -235,7 +235,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
 	if cfg.Snapshotter != "" {
 		snapshotter = cfg.Snapshotter
 	}
-	opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, ctd.WithTimeout(60*time.Second))
+	opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns,
nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, common.traceSocket, ctd.WithTimeout(60*time.Second)) if err != nil { return nil, err } diff --git a/cmd/buildkitd/main_oci_worker.go b/cmd/buildkitd/main_oci_worker.go index dd822be5..dad6b23b 100644 --- a/cmd/buildkitd/main_oci_worker.go +++ b/cmd/buildkitd/main_oci_worker.go @@ -287,7 +287,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism)) } - opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem) + opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem, common.traceSocket) if err != nil { return nil, err } diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 305ac008..5e41712e 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -38,10 +38,11 @@ type containerdExecutor struct { running map[string]chan error mu sync.Mutex apparmorProfile string + traceSocket string } // New creates a new executor backed by connection to containerd API -func New(client *containerd.Client, root, cgroup string, networkProviders map[pb.NetMode]network.Provider, dnsConfig *oci.DNSConfig, apparmorProfile string) executor.Executor { +func New(client *containerd.Client, root, cgroup string, networkProviders map[pb.NetMode]network.Provider, dnsConfig *oci.DNSConfig, apparmorProfile string, traceSocket string) executor.Executor { // clean up old hosts/resolv.conf file. 
ignore errors os.RemoveAll(filepath.Join(root, "hosts")) os.RemoveAll(filepath.Join(root, "resolv.conf")) @@ -54,6 +55,7 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb dnsConfig: dnsConfig, running: make(map[string]chan error), apparmorProfile: apparmorProfile, + traceSocket: traceSocket, } } @@ -170,7 +172,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M opts = append(opts, containerdoci.WithCgroup(cgroupsPath)) } processMode := oci.ProcessSandbox // FIXME(AkihiroSuda) - spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, w.apparmorProfile, opts...) + spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, w.apparmorProfile, w.traceSocket, opts...) if err != nil { return err } diff --git a/executor/oci/spec.go b/executor/oci/spec.go index a01ae6c8..340d6203 100644 --- a/executor/oci/spec.go +++ b/executor/oci/spec.go @@ -37,7 +37,7 @@ const ( // GenerateSpec generates spec using containerd functionality. // opts are ignored for s.Process, s.Hostname, and s.Mounts . -func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mount, id, resolvConf, hostsFile string, namespace network.Namespace, processMode ProcessMode, idmap *idtools.IdentityMapping, apparmorProfile string, opts ...oci.SpecOpts) (*specs.Spec, func(), error) { +func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mount, id, resolvConf, hostsFile string, namespace network.Namespace, processMode ProcessMode, idmap *idtools.IdentityMapping, apparmorProfile string, tracingSocket string, opts ...oci.SpecOpts) (*specs.Spec, func(), error) { c := &containers.Container{ ID: id, } @@ -77,7 +77,11 @@ func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mou hostname = meta.Hostname } - meta.Env = append(meta.Env, traceexec.Environ(ctx)...) 
+ if tracingSocket != "" { + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md + meta.Env = append(meta.Env, "OTEL_TRACES_EXPORTER=otlp", "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=unix:///dev/otel-grpc.sock", "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc") + meta.Env = append(meta.Env, traceexec.Environ(ctx)...) + } opts = append(opts, oci.WithProcessArgs(meta.Args...), @@ -142,6 +146,15 @@ func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mou } } + if tracingSocket != "" { + s.Mounts = append(s.Mounts, specs.Mount{ + Destination: "/dev/otel-grpc.sock", + Type: "bind", + Source: tracingSocket, + Options: []string{"ro", "rbind"}, + }) + } + return s, releaseAll, nil } diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 79906325..7a858021 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -46,6 +46,7 @@ type Opt struct { DNS *oci.DNSConfig OOMScoreAdj *int ApparmorProfile string + TracingSocket string } var defaultCommandCandidates = []string{"buildkit-runc", "runc"} @@ -64,6 +65,7 @@ type runcExecutor struct { running map[string]chan error mu sync.Mutex apparmorProfile string + tracingSocket string } func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Executor, error) { @@ -127,6 +129,7 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex oomScoreAdj: opt.OOMScoreAdj, running: make(map[string]chan error), apparmorProfile: opt.ApparmorProfile, + tracingSocket: opt.TracingSocket, } return w, nil } @@ -256,7 +259,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, } opts = append(opts, containerdoci.WithCgroup(cgroupsPath)) } - spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.processMode, w.idmap, w.apparmorProfile, opts...) 
+ spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.processMode, w.idmap, w.apparmorProfile, w.tracingSocket, opts...) if err != nil { return err } diff --git a/go.mod b/go.mod index 864556ba..60e416c3 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.0.0-RC1 go.opentelemetry.io/otel/sdk v1.0.0-RC1 go.opentelemetry.io/otel/trace v1.0.0-RC1 + go.opentelemetry.io/proto/otlp v0.9.0 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c diff --git a/util/tracing/detect/detect.go b/util/tracing/detect/detect.go index fb00c8cd..22b1ea2e 100644 --- a/util/tracing/detect/detect.go +++ b/util/tracing/detect/detect.go @@ -27,6 +27,7 @@ var ServiceName string var detectors map[string]detector var once sync.Once var tp trace.TracerProvider +var exporter sdktrace.SpanExporter var closers []func(context.Context) error var err error @@ -95,10 +96,8 @@ func detect() error { sdktp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sp), sdktrace.WithResource(res)) closers = append(closers, sdktp.Shutdown) - tp = &tracerProviderWithExporter{ - TracerProvider: sdktp, - exp: exp, - } + exporter = exp + tp = sdktp return nil } @@ -115,6 +114,14 @@ func TracerProvider() (trace.TracerProvider, error) { return tp, nil } +func Exporter() (sdktrace.SpanExporter, error) { + _, err := TracerProvider() + if err != nil { + return nil, err + } + return exporter, nil +} + func Shutdown(ctx context.Context) error { for _, c := range closers { if err := c(ctx); err != nil { @@ -141,18 +148,3 @@ func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, erro }, ).Detect(ctx) } - -type TraceCollector interface { - SpanExporter() sdktrace.SpanExporter -} - -type tracerProviderWithExporter struct { - trace.TracerProvider - exp sdktrace.SpanExporter -} - 
-func (t *tracerProviderWithExporter) SpanExporter() sdktrace.SpanExporter { - return t.exp -} - -var _ TraceCollector = &tracerProviderWithExporter{} diff --git a/worker/containerd/containerd.go b/worker/containerd/containerd.go index 5b5ef079..2ac5ebf0 100644 --- a/worker/containerd/containerd.go +++ b/worker/containerd/containerd.go @@ -26,16 +26,16 @@ import ( ) // NewWorkerOpt creates a WorkerOpt. -func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, opts ...containerd.ClientOpt) (base.WorkerOpt, error) { +func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) { opts = append(opts, containerd.WithDefaultNamespace(ns)) client, err := containerd.New(address, opts...) if err != nil { return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . 
make sure containerd is running", address) } - return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile, parallelismSem) + return newContainerd(root, client, snapshotterName, ns, labels, dns, nopt, apparmorProfile, parallelismSem, traceSocket) } -func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) { +func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string) (base.WorkerOpt, error) { if strings.Contains(snapshotterName, "/") { return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName) } @@ -115,7 +115,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s ID: id, Labels: xlabels, MetadataStore: md, - Executor: containerdexecutor.New(client, root, "", np, dns, apparmorProfile), + Executor: containerdexecutor.New(client, root, "", np, dns, apparmorProfile, traceSocket), Snapshotter: snap, ContentStore: cs, Applier: winlayers.NewFileSystemApplierWithWindows(cs, df), diff --git a/worker/containerd/containerd_test.go b/worker/containerd/containerd_test.go index f2eb7198..c91575c4 100644 --- a/worker/containerd/containerd_test.go +++ b/worker/containerd/containerd_test.go @@ -30,7 +30,7 @@ func newWorkerOpt(t *testing.T, addr string) (base.WorkerOpt, func()) { tmpdir, err := ioutil.TempDir("", "workertest") require.NoError(t, err) cleanup := func() { os.RemoveAll(tmpdir) } - workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "", nil) + workerOpt, err := NewWorkerOpt(tmpdir, addr, "overlayfs", "buildkit-test", nil, nil, netproviders.Opt{Mode: "host"}, "", nil, 
"") require.NoError(t, err) return workerOpt, cleanup } diff --git a/worker/runc/runc.go b/worker/runc/runc.go index 6d81274c..0dc89609 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -33,7 +33,7 @@ type SnapshotterFactory struct { } // NewWorkerOpt creates a WorkerOpt. -func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted) (base.WorkerOpt, error) { +func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string) (base.WorkerOpt, error) { var opt base.WorkerOpt name := "runc-" + snFactory.Name root = filepath.Join(root, name) @@ -64,6 +64,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc IdentityMapping: idmap, DNS: dns, ApparmorProfile: apparmorProfile, + TracingSocket: traceSocket, }, np) if err != nil { return opt, err diff --git a/worker/runc/runc_test.go b/worker/runc/runc_test.go index 3bd249db..65cde4eb 100644 --- a/worker/runc/runc_test.go +++ b/worker/runc/runc_test.go @@ -40,7 +40,7 @@ func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, fu }, } rootless := false - workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "", nil) + workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil, nil, netproviders.Opt{Mode: "host"}, nil, "", "", nil, "") require.NoError(t, err) return workerOpt, cleanup