enable collecting traces via control api

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
v0.9
Tonis Tiigi 2021-06-09 16:34:14 -07:00
parent 750f9af97c
commit d8fc4e15f9
8 changed files with 109 additions and 13 deletions

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/url"
"strings"
"github.com/containerd/containerd/defaults"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@ -16,9 +17,12 @@ import (
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/tracing/otlptracegrpc"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -60,7 +64,6 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
if wt, ok := o.(*withTracer); ok {
customTracer = true
tracerProvider = wt.tp
}
if wd, ok := o.(*withDialer); ok {
gopts = append(gopts, grpc.WithContextDialer(wd.dialer))
@ -76,7 +79,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
if tracerProvider != nil {
var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
unary = append(unary, otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators)))
unary = append(unary, filterInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators))))
stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators)))
}
@ -122,12 +125,43 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q . make sure buildkitd is running", address)
}
c := &Client{
conn: conn,
}
_ = c.setupDelegatedTracing(ctx) // ignore error
return c, nil
}
func (c *Client) setupDelegatedTracing(ctx context.Context) error {
span := trace.SpanFromContext(ctx)
if !span.SpanContext().IsValid() {
return nil
}
exp, ok := span.TracerProvider().(interface {
SpanExporter() sdktrace.SpanExporter
})
if !ok || exp == nil {
return nil
}
del, ok := exp.(interface {
SetDelegate(context.Context, sdktrace.SpanExporter) error
})
if !ok {
return nil
}
pd := otlptracegrpc.NewClient(c.conn)
e, err := otlptrace.New(ctx, pd)
if err != nil {
return nil
}
return del.SetDelegate(ctx, e)
}
func (c *Client) controlClient() controlapi.ControlClient {
return controlapi.NewControlClient(c.conn)
}
@ -219,3 +253,12 @@ func resolveDialer(address string) (func(context.Context, string) (net.Conn, err
// basic dialer
return dialer, nil
}
func filterInterceptor(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if strings.HasSuffix(method, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return invoker(ctx, method, req, reply, cc, opts...)
}
return intercept(ctx, method, req, reply, cc, invoker, opts...)
}
}

View File

@ -13,6 +13,7 @@ import (
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/util/stack"
_ "github.com/moby/buildkit/util/tracing/detect/delegated"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
"github.com/moby/buildkit/version"
"github.com/sirupsen/logrus"

View File

@ -58,6 +58,7 @@ import (
"github.com/urfave/cli"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@ -635,6 +636,17 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
"registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn),
"local": localremotecache.ResolveCacheImporterFunc(sessionManager),
}
tp, err := detect.TracerProvider()
if err != nil {
return nil, err
}
var tc sdktrace.SpanExporter
if tp, ok := tp.(detect.TraceCollector); ok {
tc = tp.SpanExporter()
}
return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
@ -643,6 +655,7 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
ResolveCacheImporterFuncs: remoteCacheImporterFuncs,
CacheKeyStorage: cacheStorage,
Entitlements: cfg.Entitlements,
TraceCollector: tc,
})
}

View File

@ -20,9 +20,13 @@ import (
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/util/tracing/transform"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
@ -35,9 +39,12 @@ type Opt struct {
ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc
ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
Entitlements []string
TraceCollector sdktrace.SpanExporter
}
type Controller struct { // TODO: ControlService
*tracev1.UnimplementedTraceServiceServer
buildCount int64
opt Opt
solver *llbsolver.Solver
@ -75,6 +82,7 @@ func NewController(opt Opt) (*Controller, error) {
func (c *Controller) Register(server *grpc.Server) error {
controlapi.RegisterControlServer(server, c)
c.gatewayForwarder.Register(server)
tracev1.RegisterTraceServiceServer(server, c)
return nil
}
@ -184,6 +192,14 @@ func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Contr
return eg2.Wait()
}
func (c *Controller) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
err := c.opt.TraceCollector.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
return nil, err
}
return &v1.ExportTraceServiceResponse{}, nil
}
func translateLegacySolveRequest(req *controlapi.SolveRequest) error {
// translates ExportRef and ExportAttrs to new Exports (v0.4.0)
if legacyExportRef := req.Cache.ExportRefDeprecated; legacyExportRef != "" {

View File

@ -15,7 +15,7 @@ var exp = &Exporter{}
func init() {
detect.Register("delegated", func() (sdktrace.SpanExporter, error) {
return exp, nil
})
}, 100)
}
type Exporter struct {
@ -60,7 +60,9 @@ func (e *Exporter) SetDelegate(ctx context.Context, del sdktrace.SpanExporter) e
e.delegate = del
if len(e.buffer) > 0 {
return e.delegate.ExportSpans(ctx, e.buffer)
err := e.delegate.ExportSpans(ctx, e.buffer)
e.buffer = nil
return err
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
@ -16,19 +17,27 @@ import (
type ExporterDetector func() (sdktrace.SpanExporter, error)
type detector struct {
f ExporterDetector
priority int
}
var ServiceName string
var detectors map[string]ExporterDetector
var detectors map[string]detector
var once sync.Once
var tp trace.TracerProvider
var closers []func(context.Context) error
var err error
func Register(name string, exp ExporterDetector) {
func Register(name string, exp ExporterDetector, priority int) {
if detectors == nil {
detectors = map[string]ExporterDetector{}
detectors = map[string]detector{}
}
detectors[name] = detector{
f: exp,
priority: priority,
}
detectors[name] = exp
}
func detectExporter() (sdktrace.SpanExporter, error) {
@ -40,10 +49,17 @@ func detectExporter() (sdktrace.SpanExporter, error) {
}
return nil, errors.Errorf("unsupported opentelemetry tracer %v", n)
}
return d()
return d.f()
}
arr := make([]detector, 0, len(detectors))
for _, d := range detectors {
exp, err := d()
arr = append(arr, d)
}
sort.Slice(arr, func(i, j int) bool {
return arr[i].priority < arr[j].priority
})
for _, d := range arr {
exp, err := d.f()
if err != nil {
return nil, err
}
@ -83,7 +99,6 @@ func detect() error {
TracerProvider: sdktp,
exp: exp,
}
return nil
}
@ -127,6 +142,10 @@ func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, erro
).Detect(ctx)
}
type TraceCollector interface {
SpanExporter() sdktrace.SpanExporter
}
type tracerProviderWithExporter struct {
trace.TracerProvider
exp sdktrace.SpanExporter
@ -135,3 +154,5 @@ type tracerProviderWithExporter struct {
func (t *tracerProviderWithExporter) SpanExporter() sdktrace.SpanExporter {
return t.exp
}
var _ TraceCollector = &tracerProviderWithExporter{}

View File

@ -11,7 +11,7 @@ import (
)
func init() {
detect.Register("jaeger", jaegerExporter)
detect.Register("jaeger", jaegerExporter, 11)
}
func jaegerExporter() (sdktrace.SpanExporter, error) {

View File

@ -12,7 +12,7 @@ import (
)
func init() {
Register("otlp", otlpExporter)
Register("otlp", otlpExporter, 10)
}
func otlpExporter() (sdktrace.SpanExporter, error) {