analytics-pipeline
Compare original and translation side by side
🇺🇸
Original
English
🇨🇳
Translation
Chinese

Analytics Pipeline
分析流水线
High-performance analytics with Redis counters and periodic database flush.
基于Redis计数器与定期数据库刷新的高性能分析方案。
When to Use This Skill
何时使用该方案
- Need high-throughput event tracking (thousands/second)
- Want real-time counters without database bottlenecks
- Building dashboards with time-series data
- Tracking user activity, feature usage, or page views
- 需要高吞吐量事件追踪(每秒数千次)
- 希望实现无数据库瓶颈的实时计数器
- 构建基于时间序列数据的仪表盘
- 追踪用户活动、功能使用情况或页面浏览量
Core Concepts
核心概念
Write to Redis for speed, flush to PostgreSQL for persistence. Redis handles high write throughput, periodic workers batch-flush to the database.
写入Redis以保证速度,刷新到PostgreSQL以实现持久化。Redis处理高写入吞吐量,定期运行的Worker将数据批量刷新到数据库。
Events → Redis Counters → Periodic Flush Worker → PostgreSQL → Dashboard Queries
Events → Redis Counters → Periodic Flush Worker → PostgreSQL → Dashboard Queries
Implementation
实现方案
Python
Python
python
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional

import redis.asyncio as redis
class AnalyticsEventType(str, Enum):
    """Event categories tracked by the analytics pipeline.

    Subclasses ``str`` so that member values can be embedded directly in
    Redis key strings and database rows without calling ``.value``.
    """

    GENERATION_COMPLETED = "generation_completed"
    USER_SIGNUP = "user_signup"
    FEATURE_USED = "feature_used"
    PAGE_VIEW = "page_view"
@dataclass
class AnalyticsEvent:
    """A single analytics event to be recorded by AnalyticsService.

    If no timestamp is supplied, the event is stamped with the current
    UTC time in ``__post_init__``.
    """

    event_type: AnalyticsEventType
    # Optional id of the triggering user; enables the per-user daily counter.
    user_id: Optional[str] = None
    # Arbitrary event metadata. NOTE(review): track_event does not read this
    # field — it is not persisted by the Redis counters shown in this file.
    properties: Optional[Dict[str, Any]] = None
    # Event time; defaults to "now" (aware, UTC) when omitted.
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        # Use an aware UTC datetime so Redis key date formatting is consistent.
        if self.timestamp is None:
            self.timestamp = datetime.now(timezone.utc)
class AnalyticsKeys:
    """Redis key patterns for analytics counters.

    All keys live under the ``analytics:`` prefix. Counter keys embed the
    event type and a UTC date (and hour, for hourly counters).
    """

    PREFIX = "analytics"

    @staticmethod
    def _resolve(date: Optional[datetime]) -> datetime:
        """Return *date*, or the current UTC time when it is None."""
        return date if date is not None else datetime.now(timezone.utc)

    @staticmethod
    def daily_counter(event_type: str, date: Optional[datetime] = None) -> str:
        """Key of the global per-day counter for *event_type*."""
        d = AnalyticsKeys._resolve(date)
        return f"analytics:counter:{event_type}:{d.strftime('%Y-%m-%d')}"

    @staticmethod
    def hourly_counter(event_type: str, date: Optional[datetime] = None) -> str:
        """Key of the global per-hour counter for *event_type*."""
        d = AnalyticsKeys._resolve(date)
        return f"analytics:counter:{event_type}:{d.strftime('%Y-%m-%d:%H')}"

    @staticmethod
    def user_daily_counter(user_id: str, event_type: str, date: Optional[datetime] = None) -> str:
        """Key of a single user's per-day counter for *event_type*."""
        d = AnalyticsKeys._resolve(date)
        return f"analytics:user:{user_id}:{event_type}:{d.strftime('%Y-%m-%d')}"

    @staticmethod
    def pending_flush_set() -> str:
        """Key of the set of '<event_type>:<YYYY-MM-DD>' entries awaiting flush."""
        return "analytics:pending_flush"
class AnalyticsService:
    """High-performance analytics using Redis counters.

    Writes are batched into a single Redis pipeline per event; a separate
    flush worker persists the daily counters to PostgreSQL.
    """

    COUNTER_TTL = 7 * 24 * 60 * 60  # counters expire after 7 days to bound Redis memory

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def track_event(self, event: AnalyticsEvent) -> None:
        """Record *event* by incrementing its daily, hourly and per-user counters.

        All commands run through one pipeline, so tracking costs a single
        network round trip. The TTL is refreshed on every increment.
        """
        pipe = self.redis.pipeline()
        # Daily counter
        daily_key = AnalyticsKeys.daily_counter(event.event_type.value, event.timestamp)
        pipe.incr(daily_key)
        pipe.expire(daily_key, self.COUNTER_TTL)
        # Hourly counter
        hourly_key = AnalyticsKeys.hourly_counter(event.event_type.value, event.timestamp)
        pipe.incr(hourly_key)
        pipe.expire(hourly_key, self.COUNTER_TTL)
        # Per-user counter (only when the event is attributable to a user)
        if event.user_id:
            user_key = AnalyticsKeys.user_daily_counter(
                event.user_id, event.event_type.value, event.timestamp
            )
            pipe.incr(user_key)
            pipe.expire(user_key, self.COUNTER_TTL)
        # Mark this (event_type, day) pair as dirty so the flush worker
        # knows which daily counter to persist.
        pipe.sadd(
            AnalyticsKeys.pending_flush_set(),
            f"{event.event_type.value}:{event.timestamp.strftime('%Y-%m-%d')}",
        )
        await pipe.execute()

    async def get_daily_count(
        self, event_type: AnalyticsEventType, date: Optional[datetime] = None
    ) -> int:
        """Return the count for *event_type* on *date* (default: today, UTC).

        Returns 0 when the counter is absent or expired.
        """
        key = AnalyticsKeys.daily_counter(event_type.value, date)
        count = await self.redis.get(key)
        return int(count) if count else 0

    async def get_hourly_counts(
        self, event_type: AnalyticsEventType, date: Optional[datetime] = None
    ) -> Dict[int, int]:
        """Return ``{hour: count}`` for all 24 hours of *date* (default: today, UTC).

        Missing hours map to 0. All 24 GETs go through one pipeline.
        """
        d = date or datetime.now(timezone.utc)
        pipe = self.redis.pipeline()
        for hour in range(24):
            hour_dt = d.replace(hour=hour, minute=0, second=0, microsecond=0)
            pipe.get(AnalyticsKeys.hourly_counter(event_type.value, hour_dt))
        results = await pipe.execute()
        return {hour: int(count) if count else 0 for hour, count in enumerate(results)}
class AnalyticsFlushWorker:
    """Periodically flushes Redis daily counters to PostgreSQL.

    Counter values are removed from Redis atomically (GETDEL) and upserted
    into ``analytics_daily``, so no increment is counted twice.
    """

    FLUSH_INTERVAL = 300  # seconds between flush passes (5 minutes)
    BATCH_SIZE = 100      # pending entries processed per PostgreSQL write

    def __init__(self, redis_client: redis.Redis, pg_pool):
        self.redis = redis_client
        self.pg = pg_pool
        self._running = False

    async def start(self) -> None:
        """Run the flush loop until stop() is called.

        A failing flush pass is logged and does not kill the loop; the
        worker retries on the next interval.
        """
        self._running = True
        while self._running:
            try:
                await self.flush()
            except Exception as e:
                # Keep the worker alive: a transient Redis/DB error should
                # not stop future flushes.
                logging.getLogger(__name__).error(f"Flush error: {e}")
            await asyncio.sleep(self.FLUSH_INTERVAL)

    def stop(self) -> None:
        """Signal the loop in start() to exit after the current iteration."""
        self._running = False

    async def flush(self) -> int:
        """Flush all pending (event_type, day) counters; return rows written."""
        pending = await self.redis.smembers(AnalyticsKeys.pending_flush_set())
        if not pending:
            return 0
        flushed = 0
        pending_list = list(pending)
        for i in range(0, len(pending_list), self.BATCH_SIZE):
            batch = pending_list[i:i + self.BATCH_SIZE]
            counters = await self._collect_counters(batch)
            if counters:
                await self._write_to_postgres(counters)
                flushed += len(counters)
            # Remove the batch from the pending set even when no counters
            # were collected (expired or malformed entries are stale).
            await self.redis.srem(AnalyticsKeys.pending_flush_set(), *batch)
        return flushed

    async def _collect_counters(self, pending_keys: List[str]) -> List[tuple]:
        """Atomically read-and-clear the daily counters named by *pending_keys*.

        Each pending entry has the form '<event_type>:<YYYY-MM-DD>'.
        Returns (event_type, date, count) tuples for non-empty counters.
        NOTE(review): assumes the Redis client decodes responses to str;
        bytes set members would break the split below — confirm client config.
        """
        # Track which entries actually went into the pipeline, so the zip
        # below stays aligned with the pipeline results even when malformed
        # entries are skipped.
        parsed: List[tuple] = []
        pipe = self.redis.pipeline()
        for pending in pending_keys:
            parts = pending.split(":", 1)
            if len(parts) != 2:
                continue  # malformed entry; flush() will still srem it
            event_type, date = parts
            key = AnalyticsKeys.daily_counter(event_type, datetime.fromisoformat(date))
            pipe.getdel(key)  # atomic get-and-delete prevents double counting
            parsed.append((event_type, date))
        results = await pipe.execute()
        return [
            (event_type, date, int(count))
            for (event_type, date), count in zip(parsed, results)
            if count
        ]

    async def _write_to_postgres(self, counters: List[tuple]) -> None:
        """Upsert (event_type, date, count) rows, adding to any existing count."""
        async with self.pg.acquire() as conn:
            await conn.executemany("""
                INSERT INTO analytics_daily (event_type, date, count, updated_at)
                VALUES ($1, $2, $3, NOW())
                ON CONFLICT (event_type, date)
                DO UPDATE SET count = analytics_daily.count + EXCLUDED.count, updated_at = NOW()
            """, counters)
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, List
import redis.asyncio as redis
class AnalyticsEventType(str, Enum):
    """Event categories tracked by the analytics pipeline.

    Subclasses ``str`` so that member values can be embedded directly in
    Redis key strings and database rows without calling ``.value``.
    """

    GENERATION_COMPLETED = "generation_completed"
    USER_SIGNUP = "user_signup"
    FEATURE_USED = "feature_used"
    PAGE_VIEW = "page_view"
@dataclass
class AnalyticsEvent:
    """A single analytics event to be recorded by AnalyticsService.

    If no timestamp is supplied, the event is stamped with the current
    UTC time in ``__post_init__``.
    """

    event_type: AnalyticsEventType
    # Optional id of the triggering user; enables the per-user daily counter.
    user_id: Optional[str] = None
    # Arbitrary event metadata. NOTE(review): track_event does not read this
    # field — it is not persisted by the Redis counters shown in this file.
    properties: Optional[Dict[str, Any]] = None
    # Event time; defaults to "now" (aware, UTC) when omitted.
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        # Use an aware UTC datetime so Redis key date formatting is consistent.
        if self.timestamp is None:
            self.timestamp = datetime.now(timezone.utc)
class AnalyticsKeys:
    """Redis key patterns for analytics counters.

    All keys live under the ``analytics:`` prefix. Counter keys embed the
    event type and a UTC date (and hour, for hourly counters).
    """

    PREFIX = "analytics"

    @staticmethod
    def _resolve(date: Optional[datetime]) -> datetime:
        """Return *date*, or the current UTC time when it is None."""
        return date if date is not None else datetime.now(timezone.utc)

    @staticmethod
    def daily_counter(event_type: str, date: Optional[datetime] = None) -> str:
        """Key of the global per-day counter for *event_type*."""
        d = AnalyticsKeys._resolve(date)
        return f"analytics:counter:{event_type}:{d.strftime('%Y-%m-%d')}"

    @staticmethod
    def hourly_counter(event_type: str, date: Optional[datetime] = None) -> str:
        """Key of the global per-hour counter for *event_type*."""
        d = AnalyticsKeys._resolve(date)
        return f"analytics:counter:{event_type}:{d.strftime('%Y-%m-%d:%H')}"

    @staticmethod
    def user_daily_counter(user_id: str, event_type: str, date: Optional[datetime] = None) -> str:
        """Key of a single user's per-day counter for *event_type*."""
        d = AnalyticsKeys._resolve(date)
        return f"analytics:user:{user_id}:{event_type}:{d.strftime('%Y-%m-%d')}"

    @staticmethod
    def pending_flush_set() -> str:
        """Key of the set of '<event_type>:<YYYY-MM-DD>' entries awaiting flush."""
        return "analytics:pending_flush"
class AnalyticsService:
    """High-performance analytics using Redis counters.

    Writes are batched into a single Redis pipeline per event; a separate
    flush worker persists the daily counters to PostgreSQL.
    """

    COUNTER_TTL = 7 * 24 * 60 * 60  # counters expire after 7 days to bound Redis memory

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def track_event(self, event: AnalyticsEvent) -> None:
        """Record *event* by incrementing its daily, hourly and per-user counters.

        All commands run through one pipeline, so tracking costs a single
        network round trip. The TTL is refreshed on every increment.
        """
        pipe = self.redis.pipeline()
        # Daily counter
        daily_key = AnalyticsKeys.daily_counter(event.event_type.value, event.timestamp)
        pipe.incr(daily_key)
        pipe.expire(daily_key, self.COUNTER_TTL)
        # Hourly counter
        hourly_key = AnalyticsKeys.hourly_counter(event.event_type.value, event.timestamp)
        pipe.incr(hourly_key)
        pipe.expire(hourly_key, self.COUNTER_TTL)
        # Per-user counter (only when the event is attributable to a user)
        if event.user_id:
            user_key = AnalyticsKeys.user_daily_counter(
                event.user_id, event.event_type.value, event.timestamp
            )
            pipe.incr(user_key)
            pipe.expire(user_key, self.COUNTER_TTL)
        # Mark this (event_type, day) pair as dirty so the flush worker
        # knows which daily counter to persist.
        pipe.sadd(
            AnalyticsKeys.pending_flush_set(),
            f"{event.event_type.value}:{event.timestamp.strftime('%Y-%m-%d')}",
        )
        await pipe.execute()

    async def get_daily_count(
        self, event_type: AnalyticsEventType, date: Optional[datetime] = None
    ) -> int:
        """Return the count for *event_type* on *date* (default: today, UTC).

        Returns 0 when the counter is absent or expired.
        """
        key = AnalyticsKeys.daily_counter(event_type.value, date)
        count = await self.redis.get(key)
        return int(count) if count else 0

    async def get_hourly_counts(
        self, event_type: AnalyticsEventType, date: Optional[datetime] = None
    ) -> Dict[int, int]:
        """Return ``{hour: count}`` for all 24 hours of *date* (default: today, UTC).

        Missing hours map to 0. All 24 GETs go through one pipeline.
        """
        d = date or datetime.now(timezone.utc)
        pipe = self.redis.pipeline()
        for hour in range(24):
            hour_dt = d.replace(hour=hour, minute=0, second=0, microsecond=0)
            pipe.get(AnalyticsKeys.hourly_counter(event_type.value, hour_dt))
        results = await pipe.execute()
        return {hour: int(count) if count else 0 for hour, count in enumerate(results)}
class AnalyticsFlushWorker:
    """Periodically flushes Redis daily counters to PostgreSQL.

    Counter values are removed from Redis atomically (GETDEL) and upserted
    into ``analytics_daily``, so no increment is counted twice.
    """

    FLUSH_INTERVAL = 300  # seconds between flush passes (5 minutes)
    BATCH_SIZE = 100      # pending entries processed per PostgreSQL write

    def __init__(self, redis_client: redis.Redis, pg_pool):
        self.redis = redis_client
        self.pg = pg_pool
        self._running = False

    async def start(self) -> None:
        """Run the flush loop until stop() is called.

        A failing flush pass is logged and does not kill the loop; the
        worker retries on the next interval.
        """
        self._running = True
        while self._running:
            try:
                await self.flush()
            except Exception as e:
                # Keep the worker alive: a transient Redis/DB error should
                # not stop future flushes.
                logging.getLogger(__name__).error(f"Flush error: {e}")
            await asyncio.sleep(self.FLUSH_INTERVAL)

    def stop(self) -> None:
        """Signal the loop in start() to exit after the current iteration."""
        self._running = False

    async def flush(self) -> int:
        """Flush all pending (event_type, day) counters; return rows written."""
        pending = await self.redis.smembers(AnalyticsKeys.pending_flush_set())
        if not pending:
            return 0
        flushed = 0
        pending_list = list(pending)
        for i in range(0, len(pending_list), self.BATCH_SIZE):
            batch = pending_list[i:i + self.BATCH_SIZE]
            counters = await self._collect_counters(batch)
            if counters:
                await self._write_to_postgres(counters)
                flushed += len(counters)
            # Remove the batch from the pending set even when no counters
            # were collected (expired or malformed entries are stale).
            await self.redis.srem(AnalyticsKeys.pending_flush_set(), *batch)
        return flushed

    async def _collect_counters(self, pending_keys: List[str]) -> List[tuple]:
        """Atomically read-and-clear the daily counters named by *pending_keys*.

        Each pending entry has the form '<event_type>:<YYYY-MM-DD>'.
        Returns (event_type, date, count) tuples for non-empty counters.
        NOTE(review): assumes the Redis client decodes responses to str;
        bytes set members would break the split below — confirm client config.
        """
        # Track which entries actually went into the pipeline, so the zip
        # below stays aligned with the pipeline results even when malformed
        # entries are skipped.
        parsed: List[tuple] = []
        pipe = self.redis.pipeline()
        for pending in pending_keys:
            parts = pending.split(":", 1)
            if len(parts) != 2:
                continue  # malformed entry; flush() will still srem it
            event_type, date = parts
            key = AnalyticsKeys.daily_counter(event_type, datetime.fromisoformat(date))
            pipe.getdel(key)  # atomic get-and-delete prevents double counting
            parsed.append((event_type, date))
        results = await pipe.execute()
        return [
            (event_type, date, int(count))
            for (event_type, date), count in zip(parsed, results)
            if count
        ]

    async def _write_to_postgres(self, counters: List[tuple]) -> None:
        """Upsert (event_type, date, count) rows, adding to any existing count."""
        async with self.pg.acquire() as conn:
            await conn.executemany("""
                INSERT INTO analytics_daily (event_type, date, count, updated_at)
                VALUES ($1, $2, $3, NOW())
                ON CONFLICT (event_type, date)
                DO UPDATE SET count = analytics_daily.count + EXCLUDED.count, updated_at = NOW()
            """, counters)
使用示例
python
python
Track events
Track events
analytics = AnalyticsService(redis_client)
await analytics.track_event(AnalyticsEvent(
event_type=AnalyticsEventType.GENERATION_COMPLETED,
user_id="user_123",
properties={"model": "gpt-4"},
))
analytics = AnalyticsService(redis_client)
await analytics.track_event(AnalyticsEvent(
event_type=AnalyticsEventType.GENERATION_COMPLETED,
user_id="user_123",
properties={"model": "gpt-4"},
))
Query real-time counts
Query real-time counts
today_count = await analytics.get_daily_count(AnalyticsEventType.GENERATION_COMPLETED)
hourly = await analytics.get_hourly_counts(AnalyticsEventType.GENERATION_COMPLETED)
today_count = await analytics.get_daily_count(AnalyticsEventType.GENERATION_COMPLETED)
hourly = await analytics.get_hourly_counts(AnalyticsEventType.GENERATION_COMPLETED)
Start flush worker
Start flush worker
worker = AnalyticsFlushWorker(redis_client, pg_pool)
asyncio.create_task(worker.start())
worker = AnalyticsFlushWorker(redis_client, pg_pool)
asyncio.create_task(worker.start())
Best Practices
最佳实践
- Use Redis pipelines for batched counter updates
- Set TTL on counters to prevent memory growth
- Use GETDEL for atomic flush to prevent double-counting
- Upsert on flush to handle duplicate dates gracefully
- Separate user vs global analytics tables for query efficiency
- 使用Redis流水线进行批量计数器更新
- 为计数器设置TTL以避免内存增长
- 使用GETDEL实现原子刷新,防止重复计数
- 刷新时使用Upsert优雅处理重复日期
- 分离用户与全局分析表以提升查询效率
Common Mistakes
常见错误
- Not setting TTL on Redis keys (memory leak)
- Using GET then DEL instead of GETDEL (race condition)
- Flushing too frequently (database load)
- Not batching flush operations
- 未为Redis键设置TTL(内存泄漏)
- 使用GET后再DEL而非GETDEL(竞态条件)
- 刷新过于频繁(增加数据库负载)
- 未对刷新操作进行批处理
Related Patterns
相关模式
- metrics-collection (system metrics)
- intelligent-cache (caching strategies)
- metrics-collection(系统指标)
- intelligent-cache(缓存策略)