How do you use the OpenTelemetry SDK for distributed tracing and monitoring to support long-tail keyword analysis in a microservice architecture?

2026-04-18 06:51

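Basic usage of the OpenTelemetry SDK

Overview

In the basic OpenTelemetry tracing architecture, both the client and the server start a TracerProvider, whose main job is to send trace data to a backend (called a "registry" in this article) such as Jaeger or OpenCensus. The client and the server link the whole trace together by passing a context.

The TracerProvider pushes data to the backend periodically; the default interval is 5 s: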

    func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
        ...
        o := BatchSpanProcessorOptions{
            BatchTimeout:       time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
            ExportTimeout:      time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
            MaxQueueSize:       maxQueueSize,
            MaxExportBatchSize: maxExportBatchSize,
        }
        ...
    }
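The periodic push can be pictured with a toy batcher. This is only a sketch using the standard library, not the SDK's implementation: the `batcher` type, its fields, and `runDemo` are hypothetical names, and the explicit `Flush` call stands in for the timer that would fire every `BatchTimeout` in a real processor.

```go
package main

import "fmt"

// batcher is a toy stand-in for a batch span processor (hypothetical type).
// It buffers finished spans and exports them when the buffer is full or
// when Flush is called, which a timer would do periodically.
type batcher struct {
	maxBatch int
	buf      []string
	export   func(batch []string)
}

// Add buffers one finished span name and exports early if the batch is full.
func (b *batcher) Add(span string) {
	b.buf = append(b.buf, span)
	if len(b.buf) >= b.maxBatch {
		b.Flush()
	}
}

// Flush exports whatever is buffered; it does nothing on an empty buffer.
func (b *batcher) Flush() {
	if len(b.buf) == 0 {
		return
	}
	b.export(b.buf)
	b.buf = nil // start a fresh buffer so exported batches are not overwritten
}

// runDemo feeds three spans into a batcher of size two and returns the
// batches that were exported.
func runDemo() [][]string {
	var exported [][]string
	b := &batcher{maxBatch: 2, export: func(batch []string) {
		exported = append(exported, batch)
	}}
	for _, s := range []string{"a", "b", "c"} {
		b.Add(s)
	}
	b.Flush() // simulates the periodic timer firing
	return exported
}

func main() {
	fmt.Println(runDemo())
}
```

The first two spans leave as a full batch, and the third waits until the simulated timer flushes it, which is why a short-lived program has to flush before exiting.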

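The official SDK below implements the OpenTelemetry API and is the basic library for working with OpenTelemetry:

    tracesdk "go.opentelemetry.io/otel/sdk/trace"

Creating a TracerProvider

To use tracing, first create a TracerProvider and define its exporter and related attributes.

Using the global TracerProvider

The argument is the name of the application or library:

    var tracer = otel.Tracer("app_or_package_name")

Creating a TracerProvider

The following TracerProvider uses Jaeger as its exporter. It involves two concepts: the exporter and the resource. The exporter is the destination that telemetry data is sent to, such as Jaeger, Zipkin, or OpenCensus; the resource usually carries non-ephemeral metadata about the underlying environment, such as the host name or instance ID.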

    // tracerProvider returns an OpenTelemetry TracerProvider configured to use
    // the Jaeger exporter that will send spans to the provided url. The returned
    // TracerProvider will also use a Resource configured with all the information
    // about the application.
    // Note: service, environment, and id are assumed to be defined elsewhere
    // in the package; the snippet does not show them.
    func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
        // Create the Jaeger exporter
        exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
        if err != nil {
            return nil, err
        }
        tp := tracesdk.NewTracerProvider(
            // Always be sure to batch in production.
            tracesdk.WithBatcher(exp),
            // Record information about this application in a Resource.
            tracesdk.WithResource(resource.NewWithAttributes(
                semconv.SchemaURL,
                semconv.ServiceNameKey.String(service),
                attribute.String("environment", environment),
                attribute.Int64("ID", id),
            )),
        )
        return tp, nil
    }

A resource can be created as follows; the semconv package provides standardized names for resource attributes.

    // newResource returns a resource describing this application.
    func newResource() *resource.Resource {
        r, _ := resource.Merge(
            resource.Default(),
            resource.NewWithAttributes(
                semconv.SchemaURL,
                semconv.ServiceNameKey.String("fib"),
                semconv.ServiceVersionKey.String("v0.1.0"),
                attribute.String("environment", "demo"),
            ),
        )
        return r
    }

Registering the TracerProvider

If you use a custom TracerProvider, register it as the global TracerProvider:

    tp, err := tracerProvider("localhost:14268/api/traces")
    if err != nil {
        log.Fatal(err)
    }

    // Register our TracerProvider as the global so any imported
    // instrumentation in the future will default to using it.
    otel.SetTracerProvider(tp)

Starting the TracerProvider

    tr := tp.Tracer("component-main")

    ctx, span := tr.Start(ctx, "foo")
    defer span.End()

Shutting down the TracerProvider

Before the program exits, shut down the TracerProvider so that it can flush and clean up its data:

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Cleanly shutdown and flush telemetry when the application exits.
    defer func(ctx context.Context) {
        // Do not make the application hang when it is shutdown.
        ctx, cancel = context.WithTimeout(ctx, time.Second*5)
        defer cancel()
        if err := tp.Shutdown(ctx); err != nil {
            log.Fatal(err)
        }
    }(ctx)

Basic span usage

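A tracer creates spans. Creating a span requires a context.Context instance, which usually comes from the request object or from an existing parent span. Go's context holds the active span: once a span is started, you get back the new span together with a modified context that contains it. After a span ends, it becomes immutable.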
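To make the role of the context concrete, here is a minimal sketch built only on the standard library. It is not how the SDK stores spans internally; `toySpan`, `spanKey`, `startSpan`, and `demoParents` are hypothetical names used for illustration. It shows the idea that starting a span returns a derived context, and that a span started from that context becomes a child.

```go
package main

import (
	"context"
	"fmt"
)

// toySpan is a hypothetical, simplified span: a name plus its parent's name.
type toySpan struct {
	name   string
	parent string
}

// spanKey is the private context key under which the active span is stored.
type spanKey struct{}

// startSpan creates a span whose parent is whatever span ctx currently holds,
// and returns a derived context in which the new span is the active one.
func startSpan(ctx context.Context, name string) (context.Context, *toySpan) {
	s := &toySpan{name: name}
	if p, ok := ctx.Value(spanKey{}).(*toySpan); ok {
		s.parent = p.name
	}
	return context.WithValue(ctx, spanKey{}, s), s
}

// demoParents starts "foo" as a root span and "bar" from foo's context,
// then returns the parent name recorded on each span.
func demoParents() (fooParent, barParent string) {
	ctx, foo := startSpan(context.Background(), "foo")
	_, bar := startSpan(ctx, "bar")
	return foo.parent, bar.parent
}

func main() {
	fooParent, barParent := demoParents()
	fmt.Printf("foo parent=%q, bar parent=%q\n", fooParent, barParent)
}
```

Passing the returned context down the call chain is what links spans; starting a span from an unrelated context would create a new root instead.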

The following example creates a span and passes its context on to a child:

    func main() {
        tp, err := tracerProvider("localhost:14268/api/traces")
        if err != nil {
            log.Fatal(err)
        }

        // Register our TracerProvider as the global so any imported
        // instrumentation in the future will default to using it.
        otel.SetTracerProvider(tp)

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Cleanly shutdown and flush telemetry when the application exits.
        defer func(ctx context.Context) {
            // Do not make the application hang when it is shutdown.
            ctx, cancel = context.WithTimeout(ctx, time.Second*5)
            defer cancel()
            if err := tp.Shutdown(ctx); err != nil {
                log.Fatal(err)
            }
        }(ctx)

        tr := tp.Tracer("component-main")

        ctx, span := tr.Start(ctx, "foo")
        defer span.End()

        bar(ctx)
    }

    func bar(ctx context.Context) {
        // Use the global TracerProvider.
        tr := otel.Tracer("component-bar")
        _, span := tr.Start(ctx, "bar")
        span.SetAttributes(attribute.Key("testset").String("value"))
        defer span.End()

        // Do bar...
    }

Propagating trace context across services

To propagate trace context across services, register a propagator, usually right after creating and registering the TracerProvider.

    func initTracer() (*sdktrace.TracerProvider, error) {
        // Create stdout exporter to be able to retrieve
        // the collected spans.
        exporter, err := stdout.New(stdout.WithPrettyPrint())
        if err != nil {
            return nil, err
        }

        // For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
        // In a production application, use sdktrace.TraceIDRatioBased with a desired ratio.
        tp := sdktrace.NewTracerProvider(
            sdktrace.WithSampler(sdktrace.AlwaysSample()),
            sdktrace.WithBatcher(exporter),
        )
        otel.SetTracerProvider(tp)
        otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
        return tp, err
    }

The code above registers two propagators, TraceContext and Baggage, so context can be propagated in either of these two formats.
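On the wire, the TraceContext propagator carries the trace in a `traceparent` HTTP header defined by the W3C Trace Context specification: a version, a 32-hex-character trace ID, a 16-hex-character parent span ID, and a flags byte whose lowest bit means "sampled". The sketch below formats and parses a version-00 header with the standard library only. The `traceparent` struct and function names are hypothetical, and the validation is simplified (it does not reject uppercase hex or all-zero IDs as the specification requires).

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// traceparent holds the fields of a version-00 W3C traceparent header.
type traceparent struct {
	traceID string // 32 hex characters
	spanID  string // 16 hex characters
	sampled bool   // lowest bit of the trace-flags byte
}

// format renders the header value that a client would inject into a request.
func (tp traceparent) format() string {
	flags := "00"
	if tp.sampled {
		flags = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s", tp.traceID, tp.spanID, flags)
}

// parseTraceparent extracts the fields that a server would read from an
// incoming request. Validation is deliberately simplified.
func parseTraceparent(h string) (traceparent, error) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return traceparent{}, fmt.Errorf("unsupported traceparent %q", h)
	}
	if len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return traceparent{}, fmt.Errorf("wrong field length in %q", h)
	}
	raw, err := hex.DecodeString(parts[1] + parts[2] + parts[3])
	if err != nil {
		return traceparent{}, fmt.Errorf("traceparent is not hex: %w", err)
	}
	flags := raw[len(raw)-1] // the last decoded byte is the trace-flags byte
	return traceparent{traceID: parts[1], spanID: parts[2], sampled: flags&1 == 1}, nil
}

func main() {
	tp, err := parseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(tp.traceID, tp.spanID, tp.sampled)
	fmt.Println(tp.format())
}
```

Because the server continues with the same trace ID and uses the received span ID as the parent, spans from both processes end up in a single trace.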

TraceContext

Below is server-side code for gorilla/mux. It obtains the span from the request's context with trace.SpanFromContext(r.Context()); alternatively, a new span can be started with tracer.Start(c.Context(), "getUser", oteltrace.WithAttributes(attribute.String("id", id))):

    func TestPropagationWithCustomPropagators(t *testing.T) {
        prop := propagation.TraceContext{}
        r := ...

    ... localhost:7777/hello", "server url")
        flag.Parse()

        client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}

        bag, _ := baggage.Parse("username=donuts")
        ctx := baggage.ContextWithBaggage(context.Background(), bag)

        var body []byte

        tr := otel.Tracer("example/client")
        err = func(ctx context.Context) error {
            ctx, span := tr.Start(ctx, "say hello", trace.WithAttributes(semconv.PeerServiceKey.String("ExampleService")))
            defer span.End()
            req, _ := http.NewRequestWithContext(ctx, "GET", *url, nil)

            fmt.Printf("Sending request...\n")
            res, err := client.Do(req)
            if err != nil {
                panic(err)
            }
            body, err = ioutil.ReadAll(res.Body)
            _ = res.Body.Close()

            return err
        }(ctx)

        if err != nil {
            log.Fatal(err)
        }

        fmt.Printf("Response Received: %s\n\n\n", body)
        fmt.Printf("Waiting for few seconds to export spans ...\n\n")
        time.Sleep(10 * time.Second)
        fmt.Printf("Inspect traces on stdout\n")
    }

Server-side code:

    package main

    import (
        "context"
        "io"
        "log"
        "net/http"

        "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/baggage"
        stdout "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
        "go.opentelemetry.io/otel/propagation"
        "go.opentelemetry.io/otel/sdk/resource"
        sdktrace "go.opentelemetry.io/otel/sdk/trace"
        semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
        "go.opentelemetry.io/otel/trace"
    )

    func initTracer() (*sdktrace.TracerProvider, error) {
        // Create stdout exporter to be able to retrieve
        // the collected spans.
        exporter, err := stdout.New(stdout.WithPrettyPrint())
        if err != nil {
            return nil, err
        }

        // For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
        // In a production application, use sdktrace.TraceIDRatioBased with a desired ratio.
        tp := sdktrace.NewTracerProvider(
            sdktrace.WithSampler(sdktrace.AlwaysSample()),
            sdktrace.WithBatcher(exporter),
            sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceNameKey.String("ExampleService"))),
        )
        otel.SetTracerProvider(tp)
        otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
        return tp, err
    }

    func main() {
        tp, err := initTracer()
        if err != nil {
            log.Fatal(err)
        }
        defer func() {
            if err := tp.Shutdown(context.Background()); err != nil {
                log.Printf("Error shutting down tracer provider: %v", err)
            }
        }()

        uk := attribute.Key("username")

        helloHandler := func(w http.ResponseWriter, req *http.Request) {
            ctx := req.Context()
            span := trace.SpanFromContext(ctx) // this is the "Hello" span
            defer span.End()
            bag := baggage.FromContext(ctx)
            span.AddEvent("handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))

            _, _ = io.WriteString(w, "Hello, world!\n")
        }

        // otelhttp.NewHandler creates a span named "Hello" while it handles the request.
        otelHandler := otelhttp.NewHandler(http.HandlerFunc(helloHandler), "Hello")

        http.Handle("/hello", otelHandler)
        err = http.ListenAndServe(":7777", nil)
        if err != nil {
            log.Fatal(err)
        }
    }

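The code above produces a trace in which the client's HTTP GET calls Hello on the server. The server's Hello span is created while the request is being handled. This example uses otelhttp; other instrumentation libraries work in a similar way.

The following code starts two independent spans, which can represent two parallel tasks: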

    helloHandler := func(w http.ResponseWriter, req *http.Request) {
        ctx := req.Context()
        // Both spans are started from the request context, so they are siblings.
        _, span1 := tracer.Start(ctx, "span1 process", trace.WithLinks())
        defer span1.End()
        bag := baggage.FromContext(req.Context())
        span1.SetAttributes(attribute.String("span1", "test1"))
        span1.AddEvent("span1 handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))

        _, span2 := tracer.Start(req.Context(), "span2 process", trace.WithLinks())
        defer span2.End()
        span2.SetAttributes(attribute.String("span2", "test2"))
        span2.AddEvent("span2 handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))

        _, _ = io.WriteString(w, "Hello, world!\n")
    }

Baggage can also be created in other ways, for example with baggage.NewKeyValueProperty("key", "value").

Note: baggage must follow the W3C Baggage specification.
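In the W3C Baggage format, the `baggage` header is a comma-separated list of `key=value` members; each member may carry `;`-separated properties, and values are percent-encoded. The following sketch parses such a header with the standard library only. `parseBaggage` is a hypothetical helper, not the `baggage` package's API, and it simplifies the specification by discarding properties and skipping key validation.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseBaggage splits a baggage header value into key/value members.
// Properties after ';' are dropped and keys are not validated in this sketch.
func parseBaggage(header string) (map[string]string, error) {
	members := make(map[string]string)
	for _, item := range strings.Split(header, ",") {
		item = strings.TrimSpace(item)
		if item == "" {
			continue
		}
		// Drop optional properties such as ";ttl=30".
		kv, _, _ := strings.Cut(item, ";")
		key, rawValue, ok := strings.Cut(kv, "=")
		key = strings.TrimSpace(key)
		if !ok || key == "" {
			return nil, fmt.Errorf("malformed baggage member %q", item)
		}
		// Values are percent-encoded on the wire.
		value, err := url.PathUnescape(strings.TrimSpace(rawValue))
		if err != nil {
			return nil, fmt.Errorf("bad encoding in member %q: %w", item, err)
		}
		members[key] = value
	}
	return members, nil
}

func main() {
	m, err := parseBaggage("username=donuts, note=hello%20world;ttl=30")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(m["username"], "/", m["note"])
}
```

The `username=donuts` member is the same one the client example sets with baggage.Parse, and it is what the server handler reads back through bag.Member("username").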

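Tools that support otel

The official OpenTelemetry Registry lists instrumentation for many libraries, such as Gorilla Mux, GORM, Gin-gonic, and gRPC. See the official code repository for more.

Sampling

    provider := sdktrace.NewTracerProvider(
        sdktrace.WithSampler(sdktrace.AlwaysSample()),
    )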

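  • AlwaysSample: samples every trace.
  • NeverSample: samples no traces.
  • TraceIDRatioBased: samples a fraction of traces; for example, a value of .5 samples half of all traces.
  • ParentBased: behaves differently depending on the incoming sampling decision. In general, it samples spans whose parent span was sampled and does not sample spans whose parent span was not sampled.

In production, consider using the ParentBased sampler together with the TraceIDRatioBased sampler.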
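How the two samplers combine can be sketched with plain functions. This is an illustration of the idea under stated assumptions, not the SDK's code: `ratioSampled` and `parentBased` are hypothetical names, and deriving the decision from the low 8 bytes of the trace ID is an assumption chosen so that every service makes the same choice for the same trace.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// ratioSampled makes a deterministic decision from the trace ID: the low
// 8 bytes are read as a number and compared with a bound proportional to
// ratio, so the same trace ID always yields the same answer.
func ratioSampled(traceID [16]byte, ratio float64) bool {
	if ratio >= 1 {
		return true
	}
	if ratio <= 0 {
		return false
	}
	upperBound := uint64(ratio * (1 << 63))
	x := binary.BigEndian.Uint64(traceID[8:16]) >> 1 // x lies in [0, 2^63)
	return x < upperBound
}

// parentBased follows the parent's decision when a parent exists and falls
// back to the ratio-based decision for root spans.
func parentBased(hasParent, parentSampled bool, traceID [16]byte, ratio float64) bool {
	if hasParent {
		return parentSampled
	}
	return ratioSampled(traceID, ratio)
}

func main() {
	var low, high [16]byte // low keeps all-zero bytes
	for i := 8; i < 16; i++ {
		high[i] = 0xff
	}
	fmt.Println("root, low ID:    ", parentBased(false, false, low, 0.5))
	fmt.Println("root, high ID:   ", parentBased(false, false, high, 0.5))
	fmt.Println("child of sampled:", parentBased(true, true, high, 0.5))
}
```

With this combination the ratio only decides at the root of a trace, and every downstream span inherits that decision, so a trace is either recorded completely or not at all.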

References
  • OpenTelemetry official guide
