From b873f0b5f16c45a6480e6976573f7d71dcd422fa Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 9 Jun 2021 20:50:54 -0700 Subject: [PATCH] tracing: add delegated exporter Signed-off-by: Tonis Tiigi --- util/tracing/detect/delegated/delegated.go | 66 ++++++++++++++++++++++ util/tracing/detect/detect.go | 15 ++++- 2 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 util/tracing/detect/delegated/delegated.go diff --git a/util/tracing/detect/delegated/delegated.go b/util/tracing/detect/delegated/delegated.go new file mode 100644 index 00000000..8b41b54d --- /dev/null +++ b/util/tracing/detect/delegated/delegated.go @@ -0,0 +1,66 @@ +package jaeger + +import ( + "context" + "sync" + + "github.com/moby/buildkit/util/tracing/detect" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +const maxBuffer = 128 + +var exp = &Exporter{} + +func init() { + detect.Register("delegated", func() (sdktrace.SpanExporter, error) { + return exp, nil + }) +} + +type Exporter struct { + mu sync.Mutex + delegate sdktrace.SpanExporter + buffer []sdktrace.ReadOnlySpan +} + +var _ sdktrace.SpanExporter = &Exporter{} + +func (e *Exporter) ExportSpans(ctx context.Context, ss []sdktrace.ReadOnlySpan) error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.delegate != nil { + return e.delegate.ExportSpans(ctx, ss) + } + + if len(e.buffer) > maxBuffer { + return nil + } + + e.buffer = append(e.buffer, ss...) + return nil +} + +func (e *Exporter) Shutdown(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.delegate != nil { + return e.delegate.Shutdown(ctx) + } + + return nil +} + +func (e *Exporter) SetDelegate(ctx context.Context, del sdktrace.SpanExporter) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.delegate = del + + if len(e.buffer) > 0 { + return e.delegate.ExportSpans(ctx, e.buffer) + } + return nil +} diff --git a/util/tracing/detect/detect.go b/util/tracing/detect/detect.go index 0a6b7e0b..a1ed79f6 100644 --- a/util/tracing/detect/detect.go +++ b/util/tracing/detect/detect.go @@ -78,7 +78,11 @@ func detect() error { sdktp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sp), sdktrace.WithResource(res)) closers = append(closers, sdktp.Shutdown) - tp = sdktp + + tp = &tracerProviderWithExporter{ + TracerProvider: sdktp, + exp: exp, + } return nil } @@ -122,3 +126,12 @@ func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, erro }, ).Detect(ctx) } + +type tracerProviderWithExporter struct { + trace.TracerProvider + exp sdktrace.SpanExporter +} + +func (t *tracerProviderWithExporter) SpanExporter() sdktrace.SpanExporter { + return t.exp +}