correlation-tracing

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Correlation & Distributed Tracing

关联ID与分布式追踪

Overview

概述

Implement correlation IDs and distributed tracing to track requests across multiple services and understand system behavior.
实现关联ID与分布式追踪,以跨多个服务追踪请求并了解系统行为。

When to Use

适用场景

  • Microservices architectures
  • Debugging distributed systems
  • Performance monitoring
  • Request flow visualization
  • Error tracking across services
  • Dependency analysis
  • Latency optimization
  • 微服务架构
  • 调试分布式系统
  • 性能监控
  • 请求流可视化
  • 跨服务错误追踪
  • 依赖分析
  • 延迟优化

Implementation Examples

实现示例

1. Correlation ID Middleware (Express)

1. 关联ID中间件(Express)

typescript
import express from 'express';
import { v4 as uuidv4 } from 'uuid';

// Async local storage for context
import { AsyncLocalStorage } from 'async_hooks';

const traceContext = new AsyncLocalStorage<Map<string, any>>();

interface TraceContext {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  serviceName: string;
}

function correlationMiddleware(serviceName: string) {
  return (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ) => {
    // Extract or generate trace ID
    const traceId = req.headers['x-trace-id'] as string || uuidv4();
    const parentSpanId = req.headers['x-span-id'] as string;
    const spanId = uuidv4();

    // Set context
    const context = new Map<string, any>();
    context.set('traceId', traceId);
    context.set('spanId', spanId);
    context.set('parentSpanId', parentSpanId);
    context.set('serviceName', serviceName);

    // Inject trace headers
    res.setHeader('X-Trace-Id', traceId);
    res.setHeader('X-Span-Id', spanId);

    // Run in context
    traceContext.run(context, () => {
      next();
    });
  };
}

// Helper to get current context
function getTraceContext(): TraceContext | null {
  const context = traceContext.getStore();
  if (!context) return null;

  return {
    traceId: context.get('traceId'),
    spanId: context.get('spanId'),
    parentSpanId: context.get('parentSpanId'),
    serviceName: context.get('serviceName')
  };
}

// Enhanced logger with trace context
class TracedLogger {
  log(level: string, message: string, data?: any): void {
    const context = getTraceContext();

    const logEntry = {
      level,
      message,
      ...data,
      ...context,
      timestamp: new Date().toISOString()
    };

    console.log(JSON.stringify(logEntry));
  }

  info(message: string, data?: any): void {
    this.log('info', message, data);
  }

  error(message: string, data?: any): void {
    this.log('error', message, data);
  }

  warn(message: string, data?: any): void {
    this.log('warn', message, data);
  }
}

const logger = new TracedLogger();

// HTTP client with trace propagation
async function tracedFetch(
  url: string,
  options: RequestInit = {}
): Promise<Response> {
  const context = getTraceContext();

  const headers = new Headers(options.headers);

  if (context) {
    headers.set('X-Trace-Id', context.traceId);
    headers.set('X-Span-Id', context.spanId);
    headers.set('X-Parent-Span-Id', context.spanId);
  }

  const startTime = Date.now();

  try {
    const response = await fetch(url, {
      ...options,
      headers
    });

    const duration = Date.now() - startTime;

    logger.info('HTTP request completed', {
      method: options.method || 'GET',
      url,
      statusCode: response.status,
      duration
    });

    return response;
  } catch (error) {
    const duration = Date.now() - startTime;

    logger.error('HTTP request failed', {
      method: options.method || 'GET',
      url,
      error: (error as Error).message,
      duration
    });

    throw error;
  }
}

// Usage
const app = express();

app.use(correlationMiddleware('api-service'));

app.get('/api/users/:id', async (req, res) => {
  logger.info('Fetching user', { userId: req.params.id });

  // Call another service with trace propagation
  const response = await tracedFetch(
    `http://user-service/users/${req.params.id}`
  );

  const data = await response.json();

  logger.info('User fetched successfully');

  res.json(data);
});
typescript
import express from 'express';
import { v4 as uuidv4 } from 'uuid';

// Async local storage for context
import { AsyncLocalStorage } from 'async_hooks';

const traceContext = new AsyncLocalStorage<Map<string, any>>();

interface TraceContext {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  serviceName: string;
}

function correlationMiddleware(serviceName: string) {
  return (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ) => {
    // Extract or generate trace ID
    const traceId = req.headers['x-trace-id'] as string || uuidv4();
    const parentSpanId = req.headers['x-span-id'] as string;
    const spanId = uuidv4();

    // Set context
    const context = new Map<string, any>();
    context.set('traceId', traceId);
    context.set('spanId', spanId);
    context.set('parentSpanId', parentSpanId);
    context.set('serviceName', serviceName);

    // Inject trace headers
    res.setHeader('X-Trace-Id', traceId);
    res.setHeader('X-Span-Id', spanId);

    // Run in context
    traceContext.run(context, () => {
      next();
    });
  };
}

// Helper to get current context
function getTraceContext(): TraceContext | null {
  const context = traceContext.getStore();
  if (!context) return null;

  return {
    traceId: context.get('traceId'),
    spanId: context.get('spanId'),
    parentSpanId: context.get('parentSpanId'),
    serviceName: context.get('serviceName')
  };
}

// Enhanced logger with trace context
class TracedLogger {
  log(level: string, message: string, data?: any): void {
    const context = getTraceContext();

    const logEntry = {
      level,
      message,
      ...data,
      ...context,
      timestamp: new Date().toISOString()
    };

    console.log(JSON.stringify(logEntry));
  }

  info(message: string, data?: any): void {
    this.log('info', message, data);
  }

  error(message: string, data?: any): void {
    this.log('error', message, data);
  }

  warn(message: string, data?: any): void {
    this.log('warn', message, data);
  }
}

const logger = new TracedLogger();

// HTTP client with trace propagation
async function tracedFetch(
  url: string,
  options: RequestInit = {}
): Promise<Response> {
  const context = getTraceContext();

  const headers = new Headers(options.headers);

  if (context) {
    headers.set('X-Trace-Id', context.traceId);
    headers.set('X-Span-Id', context.spanId);
    headers.set('X-Parent-Span-Id', context.spanId);
  }

  const startTime = Date.now();

  try {
    const response = await fetch(url, {
      ...options,
      headers
    });

    const duration = Date.now() - startTime;

    logger.info('HTTP request completed', {
      method: options.method || 'GET',
      url,
      statusCode: response.status,
      duration
    });

    return response;
  } catch (error) {
    const duration = Date.now() - startTime;

    logger.error('HTTP request failed', {
      method: options.method || 'GET',
      url,
      error: (error as Error).message,
      duration
    });

    throw error;
  }
}

// Usage
const app = express();

app.use(correlationMiddleware('api-service'));

app.get('/api/users/:id', async (req, res) => {
  logger.info('Fetching user', { userId: req.params.id });

  // Call another service with trace propagation
  const response = await tracedFetch(
    `http://user-service/users/${req.params.id}`
  );

  const data = await response.json();

  logger.info('User fetched successfully');

  res.json(data);
});

2. OpenTelemetry Integration

2. OpenTelemetry集成

typescript
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

// Configure OpenTelemetry
const sdk = new NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'my-service',
    [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
  }),
  traceExporter: new JaegerExporter({
    endpoint: 'http://localhost:14268/api/traces',
  }),
  instrumentations: [
    getNodeAutoInstrumentations({
      '@opentelemetry/instrumentation-http': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-express': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-pg': {
        enabled: true,
      },
    }),
  ],
});

sdk.start();

// Custom spans
import { trace, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('my-service');

async function processOrder(orderId: string) {
  const span = tracer.startSpan('process_order');

  span.setAttribute('order.id', orderId);

  try {
    // Validate order
    const validateSpan = tracer.startSpan('validate_order', {
      parent: span,
    });

    await validateOrder(orderId);
    validateSpan.setStatus({ code: SpanStatusCode.OK });
    validateSpan.end();

    // Process payment
    const paymentSpan = tracer.startSpan('process_payment', {
      parent: span,
    });

    await processPayment(orderId);
    paymentSpan.setStatus({ code: SpanStatusCode.OK });
    paymentSpan.end();

    span.setStatus({ code: SpanStatusCode.OK });
  } catch (error) {
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: (error as Error).message,
    });
    span.recordException(error as Error);
    throw error;
  } finally {
    span.end();
  }
}

async function validateOrder(orderId: string) {
  // Validation logic
}

async function processPayment(orderId: string) {
  // Payment logic
}
typescript
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

// Configure OpenTelemetry
const sdk = new NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'my-service',
    [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
  }),
  traceExporter: new JaegerExporter({
    endpoint: 'http://localhost:14268/api/traces',
  }),
  instrumentations: [
    getNodeAutoInstrumentations({
      '@opentelemetry/instrumentation-http': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-express': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-pg': {
        enabled: true,
      },
    }),
  ],
});

sdk.start();

// Custom spans
import { trace, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('my-service');

async function processOrder(orderId: string) {
  const span = tracer.startSpan('process_order');

  span.setAttribute('order.id', orderId);

  try {
    // Validate order
    const validateSpan = tracer.startSpan('validate_order', {
      parent: span,
    });

    await validateOrder(orderId);
    validateSpan.setStatus({ code: SpanStatusCode.OK });
    validateSpan.end();

    // Process payment
    const paymentSpan = tracer.startSpan('process_payment', {
      parent: span,
    });

    await processPayment(orderId);
    paymentSpan.setStatus({ code: SpanStatusCode.OK });
    paymentSpan.end();

    span.setStatus({ code: SpanStatusCode.OK });
  } catch (error) {
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: (error as Error).message,
    });
    span.recordException(error as Error);
    throw error;
  } finally {
    span.end();
  }
}

async function validateOrder(orderId: string) {
  // Validation logic
}

async function processPayment(orderId: string) {
  // Payment logic
}

3. Python Distributed Tracing

3. Python分布式追踪

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from flask import Flask, request
import requests
import uuid
python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from flask import Flask, request
import requests
import uuid

Setup tracing

Setup tracing

resource = Resource.create({"service.name": "python-service"}) trace.set_tracer_provider(TracerProvider(resource=resource))
jaeger_exporter = JaegerExporter( agent_host_name="localhost", agent_port=6831, )
trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(jaeger_exporter) )
resource = Resource.create({"service.name": "python-service"}) trace.set_tracer_provider(TracerProvider(resource=resource))
jaeger_exporter = JaegerExporter( agent_host_name="localhost", agent_port=6831, )
trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(jaeger_exporter) )

Auto-instrument Flask and requests

Auto-instrument Flask and requests

app = Flask(name) FlaskInstrumentor().instrument_app(app) RequestsInstrumentor().instrument()
tracer = trace.get_tracer(name)
@app.route('/api/orders/<order_id>') def get_order(order_id): # Current span is automatically created by FlaskInstrumentor
with tracer.start_as_current_span("fetch_order_details") as span:
    span.set_attribute("order.id", order_id)

    # Fetch from database
    with tracer.start_as_current_span("database_query"):
        order = fetch_order_from_db(order_id)

    # Call another service (automatically traced)
    with tracer.start_as_current_span("fetch_user_details"):
        user = requests.get(
            f"http://user-service/users/{order['user_id']}"
        ).json()

    return {
        "order": order,
        "user": user
    }
def fetch_order_from_db(order_id): # Database logic return {"id": order_id, "user_id": "user123"}
if name == 'main': app.run(port=5000)
undefined
app = Flask(name) FlaskInstrumentor().instrument_app(app) RequestsInstrumentor().instrument()
tracer = trace.get_tracer(name)
@app.route('/api/orders/<order_id>') def get_order(order_id): # Current span is automatically created by FlaskInstrumentor
with tracer.start_as_current_span("fetch_order_details") as span:
    span.set_attribute("order.id", order_id)

    # Fetch from database
    with tracer.start_as_current_span("database_query"):
        order = fetch_order_from_db(order_id)

    # Call another service (automatically traced)
    with tracer.start_as_current_span("fetch_user_details"):
        user = requests.get(
            f"http://user-service/users/{order['user_id']}"
        ).json()

    return {
        "order": order,
        "user": user
    }
def fetch_order_from_db(order_id): # Database logic return {"id": order_id, "user_id": "user123"}
if name == 'main': app.run(port=5000)
undefined

4. Manual Trace Propagation

4. 手动追踪传播

typescript
interface Span {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  name: string;
  serviceName: string;
  startTime: number;
  endTime?: number;
  duration?: number;
  tags: Record<string, any>;
  logs: Array<{ timestamp: number; message: string; fields?: any }>;
  status: 'ok' | 'error';
}

class DistributedTracer {
  private spans: Span[] = [];

  startSpan(
    name: string,
    parentSpanId?: string
  ): Span {
    const context = getTraceContext();

    const span: Span = {
      traceId: context?.traceId || uuidv4(),
      spanId: uuidv4(),
      parentSpanId: parentSpanId || context?.parentSpanId,
      name,
      serviceName: context?.serviceName || 'unknown',
      startTime: Date.now(),
      tags: {},
      logs: [],
      status: 'ok'
    };

    this.spans.push(span);
    return span;
  }

  endSpan(span: Span): void {
    span.endTime = Date.now();
    span.duration = span.endTime - span.startTime;

    // Send to tracing backend
    this.reportSpan(span);
  }

  setTag(span: Span, key: string, value: any): void {
    span.tags[key] = value;
  }

  logEvent(span: Span, message: string, fields?: any): void {
    span.logs.push({
      timestamp: Date.now(),
      message,
      fields
    });
  }

  setError(span: Span, error: Error): void {
    span.status = 'error';
    span.tags['error'] = true;
    span.tags['error.message'] = error.message;
    span.tags['error.stack'] = error.stack;
  }

  private async reportSpan(span: Span): Promise<void> {
    // Send to Jaeger, Zipkin, or other backend
    console.log('Reporting span:', JSON.stringify(span, null, 2));

    // In production:
    // await fetch('http://tracing-collector/api/spans', {
    //   method: 'POST',
    //   headers: { 'Content-Type': 'application/json' },
    //   body: JSON.stringify(span)
    // });
  }

  getAllSpans(): Span[] {
    return this.spans;
  }

  getTrace(traceId: string): Span[] {
    return this.spans.filter(s => s.traceId === traceId);
  }
}

const tracer = new DistributedTracer();

// Usage
async function handleRequest() {
  const span = tracer.startSpan('handle_request');

  tracer.setTag(span, 'http.method', 'GET');
  tracer.setTag(span, 'http.url', '/api/users/123');

  try {
    // Database operation
    const dbSpan = tracer.startSpan('database_query', span.spanId);
    tracer.setTag(dbSpan, 'db.type', 'postgresql');
    tracer.setTag(dbSpan, 'db.statement', 'SELECT * FROM users WHERE id = $1');

    await queryDatabase();

    tracer.endSpan(dbSpan);

    // External API call
    const apiSpan = tracer.startSpan('external_api_call', span.spanId);
    tracer.setTag(apiSpan, 'http.url', 'https://api.example.com/data');

    await callExternalAPI();

    tracer.endSpan(apiSpan);

    tracer.logEvent(span, 'Request completed successfully');
    tracer.endSpan(span);
  } catch (error) {
    tracer.setError(span, error as Error);
    tracer.logEvent(span, 'Request failed', { error: (error as Error).message });
    tracer.endSpan(span);
    throw error;
  }
}

async function queryDatabase() {
  await new Promise(resolve => setTimeout(resolve, 100));
}

async function callExternalAPI() {
  await new Promise(resolve => setTimeout(resolve, 200));
}
typescript
interface Span {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  name: string;
  serviceName: string;
  startTime: number;
  endTime?: number;
  duration?: number;
  tags: Record<string, any>;
  logs: Array<{ timestamp: number; message: string; fields?: any }>;
  status: 'ok' | 'error';
}

class DistributedTracer {
  private spans: Span[] = [];

  startSpan(
    name: string,
    parentSpanId?: string
  ): Span {
    const context = getTraceContext();

    const span: Span = {
      traceId: context?.traceId || uuidv4(),
      spanId: uuidv4(),
      parentSpanId: parentSpanId || context?.parentSpanId,
      name,
      serviceName: context?.serviceName || 'unknown',
      startTime: Date.now(),
      tags: {},
      logs: [],
      status: 'ok'
    };

    this.spans.push(span);
    return span;
  }

  endSpan(span: Span): void {
    span.endTime = Date.now();
    span.duration = span.endTime - span.startTime;

    // Send to tracing backend
    this.reportSpan(span);
  }

  setTag(span: Span, key: string, value: any): void {
    span.tags[key] = value;
  }

  logEvent(span: Span, message: string, fields?: any): void {
    span.logs.push({
      timestamp: Date.now(),
      message,
      fields
    });
  }

  setError(span: Span, error: Error): void {
    span.status = 'error';
    span.tags['error'] = true;
    span.tags['error.message'] = error.message;
    span.tags['error.stack'] = error.stack;
  }

  private async reportSpan(span: Span): Promise<void> {
    // Send to Jaeger, Zipkin, or other backend
    console.log('Reporting span:', JSON.stringify(span, null, 2));

    // In production:
    // await fetch('http://tracing-collector/api/spans', {
    //   method: 'POST',
    //   headers: { 'Content-Type': 'application/json' },
    //   body: JSON.stringify(span)
    // });
  }

  getAllSpans(): Span[] {
    return this.spans;
  }

  getTrace(traceId: string): Span[] {
    return this.spans.filter(s => s.traceId === traceId);
  }
}

const tracer = new DistributedTracer();

// Usage
async function handleRequest() {
  const span = tracer.startSpan('handle_request');

  tracer.setTag(span, 'http.method', 'GET');
  tracer.setTag(span, 'http.url', '/api/users/123');

  try {
    // Database operation
    const dbSpan = tracer.startSpan('database_query', span.spanId);
    tracer.setTag(dbSpan, 'db.type', 'postgresql');
    tracer.setTag(dbSpan, 'db.statement', 'SELECT * FROM users WHERE id = $1');

    await queryDatabase();

    tracer.endSpan(dbSpan);

    // External API call
    const apiSpan = tracer.startSpan('external_api_call', span.spanId);
    tracer.setTag(apiSpan, 'http.url', 'https://api.example.com/data');

    await callExternalAPI();

    tracer.endSpan(apiSpan);

    tracer.logEvent(span, 'Request completed successfully');
    tracer.endSpan(span);
  } catch (error) {
    tracer.setError(span, error as Error);
    tracer.logEvent(span, 'Request failed', { error: (error as Error).message });
    tracer.endSpan(span);
    throw error;
  }
}

async function queryDatabase() {
  await new Promise(resolve => setTimeout(resolve, 100));
}

async function callExternalAPI() {
  await new Promise(resolve => setTimeout(resolve, 200));
}

Best Practices

最佳实践

✅ DO

✅ 建议

  • Generate trace IDs at entry points
  • Propagate trace context across services
  • Include correlation IDs in logs
  • Use structured logging
  • Set appropriate span attributes
  • Sample traces in high-traffic systems
  • Monitor trace collection overhead
  • Implement context propagation
  • 在入口点生成追踪ID
  • 在服务间传播追踪上下文
  • 在日志中包含关联ID
  • 使用结构化日志
  • 设置合适的Span属性
  • 在高流量系统中对追踪进行采样
  • 监控追踪采集的开销
  • 实现上下文传播

❌ DON'T

❌ 不建议

  • Skip trace propagation
  • Log without correlation context
  • Create too many spans
  • Store sensitive data in spans
  • Block on trace reporting
  • Forget error tracking
  • 跳过追踪传播
  • 日志中不包含关联上下文
  • 创建过多Span
  • 在Span中存储敏感数据
  • 阻塞追踪上报
  • 忽略错误追踪

Trace Headers

追踪头部

X-Trace-Id: trace identifier
X-Span-Id: current span
X-Parent-Span-Id: parent span
X-Sampled: sampling decision
X-Trace-Id: trace identifier
X-Span-Id: current span
X-Parent-Span-Id: parent span
X-Sampled: sampling decision

Resources

参考资源