ray-data

Ray Data - Scalable ML Data Processing

Distributed data processing library for ML and AI workloads.
When to use Ray Data
Use Ray Data when:
- Processing large datasets (>100GB) for ML training
- Need distributed data preprocessing across cluster
- Building batch inference pipelines
- Loading multi-modal data (images, audio, video)
- Scaling data processing from laptop to cluster
Key features:
- Streaming execution: Process data larger than memory
- GPU support: Accelerate transforms with GPUs
- Framework integration: PyTorch, TensorFlow, HuggingFace
- Multi-modal: Images, Parquet, CSV, JSON, audio, video
Use alternatives instead:
- Pandas: Small data (<1GB) on single machine
- Dask: Tabular data, SQL-like operations
- Spark: Enterprise ETL, SQL queries
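The unit of work throughout Ray Data is a batch: in the default format, a dict mapping column names to NumPy arrays. A minimal plain-NumPy sketch of that mental model (no Ray required; the column names here are made up for illustration):

```python
import numpy as np

# A "batch" in Ray Data's default format is a dict mapping
# column names to NumPy arrays.
batch = {"text_len": np.array([3, 11, 7]), "label": np.array([0, 1, 1])}

# Transforms operate on whole batches at once (vectorized),
# which is why map_batches is the preferred API.
def add_is_long(b):
    b["is_long"] = b["text_len"] > 5
    return b

out = add_is_long(batch)
print(out["is_long"])  # [False  True  True]
```

Every transform shown below follows this shape: take a batch dict in, return a batch dict out.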
Quick start
Installation
```bash
pip install -U 'ray[data]'
```

Load and transform data
```python
import ray

# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# Transform data (lazy execution); batch_format="pandas" so the
# batch columns support the pandas .str accessor
ds = ds.map_batches(
    lambda batch: {"processed": batch["text"].str.lower()},
    batch_format="pandas",
)

# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)
```
Integration with Ray Train
```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # Access this worker's shard of the dataset in training
    shard = ray.train.get_dataset_shard("train")
    for epoch in range(10):
        for batch in shard.iter_batches(batch_size=32):
            # Train on batch
            pass

# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()
```
Reading data
From cloud storage
```python
import ray

# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")

# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")

# Images
ds = ray.data.read_images("s3://bucket/images/")
```
From Python objects
```python
import pandas as pd
import ray

# From list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])

# From range
ds = ray.data.range(1000000)  # Synthetic data

# From pandas
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
```
Transformations
Map batches (vectorized)
```python
# Batch transformation (fast)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)
```
Row transformations
```python
# Row-by-row (slower)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)
```
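To see why the batch form is faster, here is a plain-NumPy sketch (no Ray needed) of the same transform done row-by-row versus on a whole column. Both give identical results, but the batch version makes one vectorized call instead of one Python-level call per row:

```python
import numpy as np

values = np.arange(5)

# Row-by-row: one Python-level call per element (what ds.map does per row)
squared_rows = np.array([v ** 2 for v in values])

# Batch: one vectorized call over the whole column (what ds.map_batches enables)
squared_batch = values ** 2

assert np.array_equal(squared_rows, squared_batch)
print(squared_batch)  # [ 0  1  4  9 16]
```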
Filter
```python
# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)
```
Group by and aggregate
```python
# Group by column
ds = ds.groupby("category").count()

# Custom aggregation; map_groups must return a batch, so wrap the scalar in a list
ds = ds.groupby("category").map_groups(lambda group: {"sum": [group["value"].sum()]})
```
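Conceptually, groupby bucket rows by key and then runs the aggregation once per group. A plain-Python illustration of that behavior (not Ray's actual distributed implementation; the sample rows are made up):

```python
from collections import defaultdict

rows = [
    {"category": "a", "value": 1},
    {"category": "b", "value": 10},
    {"category": "a", "value": 2},
]

# Bucket rows by the groupby key
groups = defaultdict(list)
for row in rows:
    groups[row["category"]].append(row["value"])

# Run one aggregation per group (what map_groups does with each group batch)
sums = {cat: sum(vals) for cat, vals in groups.items()}
print(sums)  # {'a': 3, 'b': 10}
```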
GPU-accelerated transforms
```python
# Use GPU for preprocessing
def preprocess_images_gpu(batch):
    import torch  # imported inside so the function serializes cleanly to workers
    images = torch.tensor(batch["image"]).cuda()
    # GPU preprocessing
    processed = images * 255
    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1,  # Request GPU
)
```
Writing data
```python
# Write to Parquet
ds.write_parquet("s3://bucket/output/")

# Write to CSV
ds.write_csv("output/")

# Write to JSON
ds.write_json("output/")
```
Performance optimization
Repartition
```python
# Control parallelism
ds = ds.repartition(100)  # 100 blocks for a 100-core cluster
```
undefinedBatch size tuning
```python
# Larger batches = faster vectorized ops
ds.map_batches(process_fn, batch_size=10000)  # vs batch_size=100
```
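Batch size changes throughput, not results: splitting the same column into larger or smaller chunks yields identical output, while larger chunks amortize per-batch Python overhead over more rows. A quick NumPy check of that equivalence (`process_fn` here is a hypothetical stand-in for your transform):

```python
import numpy as np

def process_fn(batch):
    # Hypothetical transform standing in for your map_batches function
    return batch * 2

data = np.arange(10_000)

def run(batch_size):
    # Split the column into chunks of the given size and process each one
    chunks = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    return np.concatenate([process_fn(c) for c in chunks])

small = run(100)     # 100 batches: more per-batch overhead
large = run(10_000)  # 1 batch: one vectorized call
assert np.array_equal(small, large)
```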
Streaming execution
```python
# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # Streamed, not loaded into memory all at once
```
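Streaming execution can be pictured as chained generators: each stage pulls one batch at a time, so peak memory is roughly one batch per stage rather than the whole dataset. A minimal pure-Python sketch of that model (Ray's real executor additionally pipelines stages in parallel across the cluster):

```python
def read_batches(n_rows, batch_size):
    # Stand-in for a reader: yields one batch at a time
    for start in range(0, n_rows, batch_size):
        yield list(range(start, min(start + batch_size, n_rows)))

def transform(batches):
    # A lazy stage: pulls one upstream batch, emits one transformed batch
    for batch in batches:
        yield [x * 2 for x in batch]

# Nothing is materialized until we iterate; only one batch is alive
# at a time, which is why "larger than memory" inputs work.
total = 0
for batch in transform(read_batches(n_rows=1_000, batch_size=100)):
    total += sum(batch)

print(total)  # 999000
```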
Common patterns
Batch inference
```python
import ray

# Load model
def load_model():
    # Load once per worker
    return MyModel()

# Inference function
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(BatchInference, batch_size=32, num_gpus=1)
predictions.write_parquet("s3://output/")
```
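The class-based pattern matters because map_batches constructs the callable once per worker, so expensive state (the model) is loaded once and reused across batches rather than reloaded per batch. A self-contained sketch of the pattern with a dummy model (`DummyModel` is hypothetical, standing in for the `MyModel` placeholder above):

```python
class DummyModel:
    # Hypothetical stand-in for an expensive-to-load model
    def __call__(self, inputs):
        return [x + 1 for x in inputs]

class BatchInference:
    def __init__(self):
        # Runs once per worker process, not once per batch
        self.model = DummyModel()

    def __call__(self, batch):
        # Runs once per batch, reusing the cached model
        return {"prediction": self.model(batch["input"])}

infer = BatchInference()
out = infer({"input": [1, 2, 3]})
print(out)  # {'prediction': [2, 3, 4]}
```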
Data preprocessing pipeline
```python
# Multi-step pipeline (write_parquet executes the pipeline and returns None,
# so keep it out of the assignment)
ds = (
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
)
ds.write_parquet("s3://processed/")
```
Integration with ML frameworks
PyTorch
```python
# Iterate as PyTorch tensor batches
for batch in ds.iter_torch_batches(batch_size=32):
    # batch is a dict of tensors
    inputs, labels = batch["features"], batch["label"]
```
TensorFlow
```python
# Convert to TensorFlow
tf_ds = ds.to_tf(feature_columns=["image"], label_columns=["label"], batch_size=32)
for features, labels in tf_ds:
    # Train model
    pass
```
Supported data formats
| Format | Read | Write | Use Case |
|---|---|---|---|
| Parquet | ✅ | ✅ | ML data (recommended) |
| CSV | ✅ | ✅ | Tabular data |
| JSON | ✅ | ✅ | Semi-structured |
| Images | ✅ | ❌ | Computer vision |
| NumPy | ✅ | ✅ | Arrays |
| Pandas | ✅ | ❌ | DataFrames |
Performance benchmarks
Scaling (processing 100GB data):
- 1 node (16 cores): ~30 minutes
- 4 nodes (64 cores): ~8 minutes
- 16 nodes (256 cores): ~2 minutes
GPU acceleration (image preprocessing):
- CPU only: 1,000 images/sec
- 1 GPU: 5,000 images/sec
- 4 GPUs: 18,000 images/sec
Use cases
Production deployments:
- Pinterest: Last-mile data processing for model training
- ByteDance: Scaling offline inference with multi-modal LLMs
- Spotify: ML platform for batch inference
References
- Transformations Guide - Map, filter, groupby operations
- Integration Guide - Ray Train, PyTorch, TensorFlow
Resources
- Docs: https://docs.ray.io/en/latest/data/data.html
- GitHub: https://github.com/ray-project/ray ⭐ 36,000+
- Version: Ray 2.40.0+
- Examples: https://docs.ray.io/en/latest/data/examples/overview.html