ml-data-pipeline-architecture

ML Data Pipeline Architecture

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.

ADR: 2026-01-22-polars-preference-hook (efficiency preferences framework)

Note: A PreToolUse hook enforces the Polars preference. To use Pandas anyway, add a
# polars-exception: <reason>
comment at the top of the file.
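For example, a file that legitimately needs Pandas (say, a scikit-learn interop layer) would start with the exception comment; the reason string below is illustrative:

python
# polars-exception: sklearn interop layer requires pandas DataFrames
import pandas as pd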

When to Use This Skill

Use this skill when:
  • Deciding between Polars and Pandas for a data pipeline
  • Optimizing memory usage with zero-copy Arrow patterns
  • Loading data from ClickHouse into PyTorch DataLoaders
  • Implementing lazy evaluation for large datasets
  • Migrating existing Pandas code to Polars


1. Decision Tree: Polars vs Pandas

Dataset size?
├─ < 1M rows → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows → Use Polars (required for memory efficiency)

Operations?
├─ Simple transforms → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins → Polars with lazy evaluation
└─ Streaming/chunked → Polars scan_* functions

Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source → Arrow stream → Polars (optimal)

2. Zero-Copy Pipeline Architecture

The Problem with Pandas

python
# BAD: 3 memory copies
df = pd.read_sql(query, conn)   # Copy 1: DB → pandas
X = df[features].values        # Copy 2: pandas → numpy
tensor = torch.from_numpy(X)   # Copy 3: numpy → tensor

Peak memory: 3x data size

The Solution with Arrow

python
# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch

client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars")  # Arrow in DB memory
df = pl.from_arrow(arrow_table)                         # Zero-copy view
X = df.select(features).to_numpy()                      # Single allocation
tensor = torch.from_numpy(X)                            # View (no copy)

Peak memory: 1.2x data size
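A quick way to confirm the final hop really is a view rather than a copy: compare buffer addresses. This is a minimal sanity check, not a full memory profile:

python
import numpy as np
import torch

X = np.random.rand(1000, 8).astype(np.float32)
tensor = torch.from_numpy(X)

# Same buffer address => from_numpy returned a view, not a copy
assert tensor.data_ptr() == X.__array_interface__["data"][0]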


---

3. ClickHouse Integration Patterns

Pattern A: Arrow Stream (Recommended)

python
def query_arrow(client, query: str) -> pl.DataFrame:
    """ClickHouse → Arrow → Polars (zero-copy chain)."""
    # clickhouse_connect selects the Arrow output format itself;
    # don't append a FORMAT clause to the query
    arrow_table = client.query_arrow(query)
    return pl.from_arrow(arrow_table)

Usage

df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")

Pattern B: Polars Native (Simpler)

Polars has native ClickHouse support (see pola.rs for version requirements):

python
df = pl.read_database_uri(
    query="SELECT * FROM bars",
    uri="clickhouse://user:pass@host/db",
)

Pattern C: Parquet Export (Batch Jobs)

For reproducible batch processing:

python
client.query("SELECT * FROM bars INTO OUTFILE 'data.parquet' FORMAT Parquet")
df = pl.scan_parquet("data.parquet")  # Lazy, memory-mapped

---

4. PyTorch DataLoader Integration

Minimal Change Pattern

python
import pandas as pd
import polars as pl
import torch
from torch.utils.data import TensorDataset, DataLoader

# Accept both pandas and polars
def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]:
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    X = df.select(features).to_numpy()
    y = df.select(target).to_numpy()
    return (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float(),
    )

X, y = prepare_data(df)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)

Custom PolarsDataset (Large Data)

python
class PolarsDataset(torch.utils.data.Dataset):
    """Memory-efficient dataset from Polars DataFrame."""

    def __init__(self, df: pl.DataFrame, features: list[str], target: str):
        self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
        self.features = features
        self.target = target

    def __len__(self) -> int:
        return self.arrow.num_rows

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        row = self.arrow.slice(idx, 1)
        x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
        y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
        return x, y

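Wiring the dataset into a standard DataLoader is unchanged; the feature names come from the required set in section 6, while the target name and batch size below are illustrative:

python
df = query_arrow(client, "SELECT * FROM bars")
dataset = PolarsDataset(df, features=["returns_vs", "momentum_z", "atr_pct"], target="target")
loader = DataLoader(dataset, batch_size=256, shuffle=True)

for x_batch, y_batch in loader:
    ...  # forward/backward pass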

5. Lazy Evaluation Patterns

Pipeline Composition

python
# Define transformations lazily (no computation yet)
pipeline = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("timestamp") >= start_date)
    .with_columns([
        (pl.col("close").pct_change()).alias("returns"),
        (pl.col("volume").log()).alias("log_volume"),
    ])
    .select(features + [target])
)

# Execute only when needed
train_df = pipeline.filter(pl.col("timestamp") < split_date).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()
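Because the pipeline is lazy, Polars can optimize and display the full query plan before anything executes; explain() is a quick way to confirm the date filters are pushed down into the Parquet scan:

python
# Inspect the optimized plan; the timestamp filter should appear at the scan level
print(pipeline.filter(pl.col("timestamp") < split_date).explain())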

Streaming for Large Files

python
# Process file in chunks (never loads full file).
# Note: Polars' LazyFrame has no batch iterator, so chunked
# reads go through pyarrow's ParquetFile here.
import pyarrow.parquet as pq

def process_large_file(path: str, chunk_size: int = 100_000):
    reader = pq.ParquetFile(path)
    for batch in reader.iter_batches(batch_size=chunk_size):
        # Process each chunk as a Polars DataFrame (zero-copy from Arrow)
        features = compute_features(pl.from_arrow(batch))
        yield features.to_numpy()
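Usage is a plain generator loop; only one chunk is resident at a time (file name and consumer are illustrative):

python
for chunk in process_large_file("raw_data.parquet"):
    train_step(chunk)  # hypothetical consumer; one chunk array at a time

---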

6. Schema Validation

Pydantic for Config

python
from pydantic import BaseModel, field_validator

class FeatureConfig(BaseModel):
    features: list[str]
    target: str
    seq_len: int = 15

    @field_validator("features")
    @classmethod
    def validate_features(cls, v):
        required = {"returns_vs", "momentum_z", "atr_pct"}
        missing = required - set(v)
        if missing:
            raise ValueError(f"Missing required features: {missing}")
        return v

DataFrame Schema Validation

python
def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
    """Fail-fast schema validation."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"[{stage}] Missing columns: {missing}\n"
            f"Available: {sorted(df.columns)}"
        )

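A minimal sketch tying the two together at a pipeline entry point; the target column name is illustrative:

python
config = FeatureConfig(
    features=["returns_vs", "momentum_z", "atr_pct"],
    target="fwd_return",  # hypothetical target column
)

df = query_arrow(client, "SELECT * FROM bars")
validate_schema(df, config.features + [config.target], stage="load")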

7. Performance Benchmarks

| Operation      | Pandas | Polars | Speedup |
|----------------|--------|--------|---------|
| Read CSV (1GB) | 45s    | 4s     | 11x     |
| Filter rows    | 2.1s   | 0.4s   | 5x      |
| Group-by agg   | 3.8s   | 0.3s   | 13x     |
| Sort           | 5.2s   | 0.4s   | 13x     |
| Memory peak    | 10GB   | 2.5GB  | 4x      |

Benchmark: 50M rows, 20 columns, MacBook M2

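Numbers like these are easy to sanity-check locally. A rough group-by comparison, as a hedged sketch (smaller than the 50M-row setup above; results vary by hardware and data shape):

python
import time
import numpy as np
import pandas as pd
import polars as pl

n = 10_000_000  # scale up toward 50M to approximate the table above
data = {"key": np.random.randint(0, 1000, n), "val": np.random.rand(n)}
pdf, pldf = pd.DataFrame(data), pl.DataFrame(data)

t0 = time.perf_counter()
pdf.groupby("key")["val"].mean()
t1 = time.perf_counter()
pldf.group_by("key").agg(pl.col("val").mean())
t2 = time.perf_counter()
print(f"pandas: {t1 - t0:.2f}s  polars: {t2 - t1:.2f}s")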

8. Migration Checklist

Phase 1: Add Arrow Support

  • Add polars = "<version>" to dependencies (see PyPI)
  • Implement query_arrow() in the data client
  • Verify zero-copy with a memory profiler (see the sketch below)
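For the zero-copy check in the last step, a coarse before/after RSS comparison is often enough. A minimal sketch, assuming psutil is installed (thresholds and table name illustrative):

python
import psutil

proc = psutil.Process()
rss_before = proc.memory_info().rss

df = query_arrow(client, "SELECT * FROM bars")  # Arrow → Polars, no pandas hop

rss_after = proc.memory_info().rss
print(f"RSS delta: {(rss_after - rss_before) / 1e9:.2f} GB")  # expect ~1x data size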

Phase 2: Polars at Entry Points

  • Add a pl.from_pandas() wrapper at the trainer entry point
  • Update prepare_sequences() to accept both types
  • Add schema validation after conversion

Phase 3: Full Lazy Evaluation

  • Convert file reads to pl.scan_*
  • Compose transformations lazily
  • Call .collect() only before .to_numpy()
9. Anti-Patterns to Avoid

DON'T: Mix APIs Unnecessarily

python
# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas()  # Why?

DON'T: Collect Too Early

python
# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect()  # Full load
filtered = df.filter(...)                       # After the fact

# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()

DON'T: Ignore Memory Pressure

python
# BAD: Loads entire file
df = pl.read_parquet("huge_file.parquet")

# GOOD: Stream in chunks (LazyFrame has no batch iterator,
# so chunked reads go through pyarrow)
import pyarrow.parquet as pq

for batch in pq.ParquetFile("huge_file.parquet").iter_batches():
    process(batch)

---


Troubleshooting

| Issue | Cause | Solution |
|-------|-------|----------|
| Memory spike during load | Collecting too early | Use lazy evaluation; call collect() only when needed |
| Arrow conversion fails | Unsupported data type | Check for object columns; convert to native types |
| ClickHouse connection error | Wrong port or credentials | Verify host:8123 (HTTP) or host:9000 (native) |
| Zero-copy not working | Intermediate pandas conversion | Remove to_pandas() calls; stay in Arrow/Polars |
| Polars hook blocking code | Pandas used without an exception | Add a # polars-exception: <reason> comment at file top |
| Slow group-by operations | Using pandas on large datasets | Migrate to Polars for a 5-10x speedup |
| Schema validation failure | Column names are case-sensitive | Verify exact column names from the source |
| PyTorch DataLoader OOM | Loading the full dataset into memory | Use PolarsDataset with Arrow backing for lazy access |
| Slow Parquet scans | Not using predicate pushdown | Add filters before collect() so pushdown applies |
| Type mismatch in tensor | Float64 vs Float32 mismatch | Cast explicitly with .cast(pl.Float32) before numpy |
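For the last row, the cast is a single expression before leaving Polars:

python
# Cast feature columns to Float32 before handing off to numpy/torch
X = df.select(pl.col(features).cast(pl.Float32)).to_numpy()
tensor = torch.from_numpy(X)  # already float32; no extra .float() copy needed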