customerio-performance-tuning

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Customer.io Performance Tuning

Customer.io性能调优

Overview

概述

Optimize Customer.io API performance for high-volume and low-latency integrations.
优化Customer.io API性能,以适配高并发、低延迟的集成场景。

Prerequisites

前提条件

  • Customer.io integration working
  • Monitoring infrastructure
  • Understanding of your traffic patterns
  • 已完成Customer.io集成
  • 具备监控基础设施
  • 了解自身流量模式

Instructions

操作步骤

Step 1: Connection Pooling

步骤1:连接池配置

typescript
// lib/customerio-pooled.ts
import { TrackClient, RegionUS } from '@customerio/track';
import { Agent } from 'http';
import { Agent as HttpsAgent } from 'https';

// Create connection pool with keep-alive
const httpsAgent = new HttpsAgent({
  keepAlive: true,
  keepAliveMsecs: 30000,
  maxSockets: 100,
  maxFreeSockets: 20,
  timeout: 30000
});

// Create client with connection pooling
export function createPooledClient(): TrackClient {
  return new TrackClient(
    process.env.CUSTOMERIO_SITE_ID!,
    process.env.CUSTOMERIO_API_KEY!,
    {
      region: RegionUS,
      // Pass custom agent for connection pooling
      httpAgent: httpsAgent
    }
  );
}

// Singleton for connection reuse
let clientInstance: TrackClient | null = null;

export function getClient(): TrackClient {
  if (!clientInstance) {
    clientInstance = createPooledClient();
  }
  return clientInstance;
}
typescript
// lib/customerio-pooled.ts
import { TrackClient, RegionUS } from '@customerio/track';
import { Agent } from 'http';
import { Agent as HttpsAgent } from 'https';

// Create connection pool with keep-alive
const httpsAgent = new HttpsAgent({
  keepAlive: true,
  keepAliveMsecs: 30000,
  maxSockets: 100,
  maxFreeSockets: 20,
  timeout: 30000
});

// Create client with connection pooling
export function createPooledClient(): TrackClient {
  return new TrackClient(
    process.env.CUSTOMERIO_SITE_ID!,
    process.env.CUSTOMERIO_API_KEY!,
    {
      region: RegionUS,
      // Pass custom agent for connection pooling
      httpAgent: httpsAgent
    }
  );
}

// Singleton for connection reuse
let clientInstance: TrackClient | null = null;

export function getClient(): TrackClient {
  if (!clientInstance) {
    clientInstance = createPooledClient();
  }
  return clientInstance;
}

Step 2: Batch Processing

步骤2:批量处理

typescript
// lib/batch-processor.ts
import { TrackClient } from '@customerio/track';

interface BatchItem {
  type: 'identify' | 'track';
  userId: string;
  data: Record<string, any>;
}

export class BatchProcessor {
  private batch: BatchItem[] = [];
  private batchSize: number;
  private flushInterval: number;
  private timer: NodeJS.Timer | null = null;

  constructor(
    private client: TrackClient,
    options: { batchSize?: number; flushIntervalMs?: number } = {}
  ) {
    this.batchSize = options.batchSize || 100;
    this.flushInterval = options.flushIntervalMs || 1000;
    this.startFlushTimer();
  }

  add(item: BatchItem): void {
    this.batch.push(item);

    if (this.batch.length >= this.batchSize) {
      this.flush();
    }
  }

  async flush(): Promise<void> {
    if (this.batch.length === 0) return;

    const items = this.batch.splice(0, this.batchSize);

    // Process in parallel with concurrency limit
    const concurrency = 10;
    for (let i = 0; i < items.length; i += concurrency) {
      const chunk = items.slice(i, i + concurrency);
      await Promise.all(chunk.map(item => this.processItem(item)));
    }
  }

  private async processItem(item: BatchItem): Promise<void> {
    try {
      if (item.type === 'identify') {
        await this.client.identify(item.userId, item.data);
      } else {
        await this.client.track(item.userId, {
          name: item.data.event,
          data: item.data.properties
        });
      }
    } catch (error) {
      console.error(`Failed to process ${item.type} for ${item.userId}:`, error);
    }
  }

  private startFlushTimer(): void {
    this.timer = setInterval(() => this.flush(), this.flushInterval);
  }

  async shutdown(): Promise<void> {
    if (this.timer) {
      clearInterval(this.timer);
    }
    await this.flush();
  }
}
typescript
// lib/batch-processor.ts
import { TrackClient } from '@customerio/track';

interface BatchItem {
  type: 'identify' | 'track';
  userId: string;
  data: Record<string, any>;
}

export class BatchProcessor {
  private batch: BatchItem[] = [];
  private batchSize: number;
  private flushInterval: number;
  private timer: NodeJS.Timer | null = null;

  constructor(
    private client: TrackClient,
    options: { batchSize?: number; flushIntervalMs?: number } = {}
  ) {
    this.batchSize = options.batchSize || 100;
    this.flushInterval = options.flushIntervalMs || 1000;
    this.startFlushTimer();
  }

  add(item: BatchItem): void {
    this.batch.push(item);

    if (this.batch.length >= this.batchSize) {
      this.flush();
    }
  }

  async flush(): Promise<void> {
    if (this.batch.length === 0) return;

    const items = this.batch.splice(0, this.batchSize);

    // Process in parallel with concurrency limit
    const concurrency = 10;
    for (let i = 0; i < items.length; i += concurrency) {
      const chunk = items.slice(i, i + concurrency);
      await Promise.all(chunk.map(item => this.processItem(item)));
    }
  }

  private async processItem(item: BatchItem): Promise<void> {
    try {
      if (item.type === 'identify') {
        await this.client.identify(item.userId, item.data);
      } else {
        await this.client.track(item.userId, {
          name: item.data.event,
          data: item.data.properties
        });
      }
    } catch (error) {
      console.error(`Failed to process ${item.type} for ${item.userId}:`, error);
    }
  }

  private startFlushTimer(): void {
    this.timer = setInterval(() => this.flush(), this.flushInterval);
  }

  async shutdown(): Promise<void> {
    if (this.timer) {
      clearInterval(this.timer);
    }
    await this.flush();
  }
}

Step 3: Async Fire-and-Forget

步骤3:异步即发即弃

typescript
// lib/async-tracker.ts
import { TrackClient } from '@customerio/track';

class AsyncTracker {
  private queue: Array<() => Promise<void>> = [];
  private processing = false;
  private concurrency = 5;

  constructor(private client: TrackClient) {}

  // Non-blocking identify
  identifyAsync(userId: string, attributes: Record<string, any>): void {
    this.enqueue(() => this.client.identify(userId, attributes));
  }

  // Non-blocking track
  trackAsync(userId: string, event: string, data?: Record<string, any>): void {
    this.enqueue(() => this.client.track(userId, { name: event, data }));
  }

  private enqueue(operation: () => Promise<void>): void {
    this.queue.push(operation);
    this.processQueue();
  }

  private async processQueue(): Promise<void> {
    if (this.processing) return;
    this.processing = true;

    while (this.queue.length > 0) {
      const batch = this.queue.splice(0, this.concurrency);
      await Promise.allSettled(batch.map(op => op()));
    }

    this.processing = false;
  }
}

export const asyncTracker = new AsyncTracker(getClient());
typescript
// lib/async-tracker.ts
import { TrackClient } from '@customerio/track';

class AsyncTracker {
  private queue: Array<() => Promise<void>> = [];
  private processing = false;
  private concurrency = 5;

  constructor(private client: TrackClient) {}

  // Non-blocking identify
  identifyAsync(userId: string, attributes: Record<string, any>): void {
    this.enqueue(() => this.client.identify(userId, attributes));
  }

  // Non-blocking track
  trackAsync(userId: string, event: string, data?: Record<string, any>): void {
    this.enqueue(() => this.client.track(userId, { name: event, data }));
  }

  private enqueue(operation: () => Promise<void>): void {
    this.queue.push(operation);
    this.processQueue();
  }

  private async processQueue(): Promise<void> {
    if (this.processing) return;
    this.processing = true;

    while (this.queue.length > 0) {
      const batch = this.queue.splice(0, this.concurrency);
      await Promise.allSettled(batch.map(op => op()));
    }

    this.processing = false;
  }
}

export const asyncTracker = new AsyncTracker(getClient());

Step 4: Caching for Deduplication

步骤4:缓存去重

typescript
// lib/dedup-cache.ts
import { LRUCache } from 'lru-cache';

interface CacheEntry {
  userId: string;
  attributes: Record<string, any>;
  timestamp: number;
}

const identifyCache = new LRUCache<string, CacheEntry>({
  max: 10000,
  ttl: 60000 // 1 minute
});

export function shouldIdentify(
  userId: string,
  attributes: Record<string, any>
): boolean {
  const cacheKey = `${userId}:${JSON.stringify(attributes)}`;
  const cached = identifyCache.get(cacheKey);

  if (cached) {
    // Skip if identical identify within TTL
    return false;
  }

  identifyCache.set(cacheKey, {
    userId,
    attributes,
    timestamp: Date.now()
  });

  return true;
}

// Track event deduplication
const eventCache = new LRUCache<string, number>({
  max: 50000,
  ttl: 5000 // 5 seconds
});

export function shouldTrack(
  userId: string,
  eventName: string,
  eventId?: string
): boolean {
  const cacheKey = eventId || `${userId}:${eventName}:${Date.now()}`;

  if (eventCache.has(cacheKey)) {
    return false;
  }

  eventCache.set(cacheKey, Date.now());
  return true;
}
typescript
// lib/dedup-cache.ts
import { LRUCache } from 'lru-cache';

interface CacheEntry {
  userId: string;
  attributes: Record<string, any>;
  timestamp: number;
}

const identifyCache = new LRUCache<string, CacheEntry>({
  max: 10000,
  ttl: 60000 // 1 minute
});

export function shouldIdentify(
  userId: string,
  attributes: Record<string, any>
): boolean {
  const cacheKey = `${userId}:${JSON.stringify(attributes)}`;
  const cached = identifyCache.get(cacheKey);

  if (cached) {
    // Skip if identical identify within TTL
    return false;
  }

  identifyCache.set(cacheKey, {
    userId,
    attributes,
    timestamp: Date.now()
  });

  return true;
}

// Track event deduplication
const eventCache = new LRUCache<string, number>({
  max: 50000,
  ttl: 5000 // 5 seconds
});

export function shouldTrack(
  userId: string,
  eventName: string,
  eventId?: string
): boolean {
  const cacheKey = eventId || `${userId}:${eventName}:${Date.now()}`;

  if (eventCache.has(cacheKey)) {
    return false;
  }

  eventCache.set(cacheKey, Date.now());
  return true;
}

Step 5: Regional Optimization

步骤5:区域优化

typescript
// lib/regional-client.ts
import { TrackClient, RegionUS, RegionEU } from '@customerio/track';

interface RegionalConfig {
  us: { siteId: string; apiKey: string };
  eu: { siteId: string; apiKey: string };
}

class RegionalCustomerIO {
  private clients: Map<string, TrackClient> = new Map();

  constructor(config: RegionalConfig) {
    this.clients.set('us', new TrackClient(
      config.us.siteId,
      config.us.apiKey,
      { region: RegionUS }
    ));

    this.clients.set('eu', new TrackClient(
      config.eu.siteId,
      config.eu.apiKey,
      { region: RegionEU }
    ));
  }

  private getClientForUser(userId: string, userRegion?: string): TrackClient {
    // Route to nearest region
    const region = userRegion || this.inferRegion(userId);
    return this.clients.get(region) || this.clients.get('us')!;
  }

  private inferRegion(userId: string): string {
    // Implement region inference logic
    // Could be based on user preferences, IP geolocation, etc.
    return 'us';
  }

  async identify(
    userId: string,
    attributes: Record<string, any>,
    region?: string
  ): Promise<void> {
    const client = this.getClientForUser(userId, region);
    await client.identify(userId, attributes);
  }
}
typescript
// lib/regional-client.ts
import { TrackClient, RegionUS, RegionEU } from '@customerio/track';

interface RegionalConfig {
  us: { siteId: string; apiKey: string };
  eu: { siteId: string; apiKey: string };
}

class RegionalCustomerIO {
  private clients: Map<string, TrackClient> = new Map();

  constructor(config: RegionalConfig) {
    this.clients.set('us', new TrackClient(
      config.us.siteId,
      config.us.apiKey,
      { region: RegionUS }
    ));

    this.clients.set('eu', new TrackClient(
      config.eu.siteId,
      config.eu.apiKey,
      { region: RegionEU }
    ));
  }

  private getClientForUser(userId: string, userRegion?: string): TrackClient {
    // Route to nearest region
    const region = userRegion || this.inferRegion(userId);
    return this.clients.get(region) || this.clients.get('us')!;
  }

  private inferRegion(userId: string): string {
    // Implement region inference logic
    // Could be based on user preferences, IP geolocation, etc.
    return 'us';
  }

  async identify(
    userId: string,
    attributes: Record<string, any>,
    region?: string
  ): Promise<void> {
    const client = this.getClientForUser(userId, region);
    await client.identify(userId, attributes);
  }
}

Step 6: Performance Monitoring

步骤6:性能监控

typescript
// lib/performance-monitor.ts
import { metrics } from './metrics';

function wrapWithTiming<T>(
  name: string,
  operation: () => Promise<T>
): Promise<T> {
  const start = Date.now();

  return operation()
    .then(result => {
      metrics.histogram(`customerio.${name}.latency`, Date.now() - start);
      metrics.increment(`customerio.${name}.success`);
      return result;
    })
    .catch(error => {
      metrics.histogram(`customerio.${name}.latency`, Date.now() - start);
      metrics.increment(`customerio.${name}.error`);
      throw error;
    });
}

// Usage
await wrapWithTiming('identify', () =>
  client.identify(userId, attributes)
);
typescript
// lib/performance-monitor.ts
import { metrics } from './metrics';

function wrapWithTiming<T>(
  name: string,
  operation: () => Promise<T>
): Promise<T> {
  const start = Date.now();

  return operation()
    .then(result => {
      metrics.histogram(`customerio.${name}.latency`, Date.now() - start);
      metrics.increment(`customerio.${name}.success`);
      return result;
    })
    .catch(error => {
      metrics.histogram(`customerio.${name}.latency`, Date.now() - start);
      metrics.increment(`customerio.${name}.error`);
      throw error;
    });
}

// Usage
await wrapWithTiming('identify', () =>
  client.identify(userId, attributes)
);

Performance Benchmarks

性能基准

OperationTarget LatencyNotes
Identify< 100msWith connection pooling
Track Event< 100msWith connection pooling
Batch (100 items)< 500msParallel processing
Webhook Processing< 50msExcluding downstream ops
操作目标延迟说明
Identify< 100ms启用连接池后
跟踪事件< 100ms启用连接池后
批量处理(100条数据)< 500ms并行处理
Webhook处理< 50ms不包含下游操作

Optimization Checklist

优化检查清单

  • Connection pooling enabled
  • Batch processing for bulk operations
  • Async fire-and-forget for non-critical events
  • Deduplication cache implemented
  • Regional routing configured
  • Performance monitoring in place
  • 已启用连接池
  • 已为批量操作配置批量处理
  • 已为非关键事件配置异步即发即弃
  • 已实现去重缓存
  • 已配置区域路由
  • 已部署性能监控

Error Handling

错误处理

IssueSolution
High latencyEnable connection pooling
Timeout errorsReduce payload size, increase timeout
Memory pressureLimit cache and queue sizes
问题解决方案
高延迟启用连接池
超时错误减小负载大小,增加超时时间
内存压力限制缓存和队列大小

Resources

参考资源

Next Steps

后续步骤

After performance tuning, proceed to
customerio-cost-tuning
for cost optimization.
完成性能调优后,可继续执行
customerio-cost-tuning
进行成本优化。