ray-data


Ray Data - Scalable ML Data Processing

Distributed data processing library for ML and AI workloads.

When to use Ray Data

Use Ray Data when:
  • Processing large datasets (>100GB) for ML training
  • Distributing data preprocessing across a cluster
  • Building batch inference pipelines
  • Loading multi-modal data (images, audio, video)
  • Scaling data processing from a laptop to a cluster
Key features:
  • Streaming execution: process data larger than memory
  • GPU support: accelerate transforms with GPUs
  • Framework integration: PyTorch, TensorFlow, HuggingFace
  • Multi-modal: images, Parquet, CSV, JSON, audio, video
Use alternatives instead:
  • Pandas: small data (<1GB) on a single machine
  • Dask: tabular data, SQL-like operations
  • Spark: enterprise ETL, SQL queries
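Under the hood, Ray Data is column-oriented: a `map_batches` UDF receives each batch as a dict mapping column names to arrays. A minimal pure-Python sketch of that contract (plain lists stand in for the NumPy arrays Ray Data actually passes; ray itself is not needed to follow along):

```python
# A Ray Data batch is columnar: {"column_name": array_of_values}.
# Plain lists stand in here for the NumPy arrays Ray Data passes.
batch = {"id": [1, 2, 3], "value": [10, 20, 30]}

def double_values(batch):
    # A map_batches-style UDF: takes a columnar batch, returns one
    return {"id": batch["id"], "value": [v * 2 for v in batch["value"]]}

print(double_values(batch))  # {'id': [1, 2, 3], 'value': [20, 40, 60]}
```

The same dict-of-columns shape is what the `map_batches` examples below operate on.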

Quick start

Installation

```bash
pip install -U 'ray[data]'
```

Load and transform data

```python
import ray

# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# Transform data (lazy execution); batch_format="pandas" so the
# batch supports the pandas .str accessor used below
ds = ds.map_batches(
    lambda batch: {"processed": batch["text"].str.lower()},
    batch_format="pandas",
)

# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)
```

Integration with Ray Train

```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # Access the dataset shard inside training
    shard = ray.train.get_dataset_shard("train")
    for epoch in range(10):
        for batch in shard.iter_batches(batch_size=32):
            # Train on batch
            pass

# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()
```

Reading data

From cloud storage

```python
import ray

# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")

# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")

# Images
ds = ray.data.read_images("s3://bucket/images/")
```

From Python objects

```python
import pandas as pd
import ray

# From a list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])

# From a range (synthetic data)
ds = ray.data.range(1000000)

# From pandas
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
```

Transformations

Map batches (vectorized)

```python
# Batch transformation (fast)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)
```

Row transformations

```python
# Row-by-row (slower than map_batches)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)
```

Filter

```python
# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)
```

Group by and aggregate

```python
# Group by column
ds = ds.groupby("category").count()

# Custom aggregation; map_groups receives one group per call and
# should return array-like column values
ds = ds.groupby("category").map_groups(
    lambda group: {"sum": [group["value"].sum()]}
)
```
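Conceptually, `groupby(...).map_groups(...)` partitions rows by key and applies the function once per group. A pure-Python sketch of those semantics (no Ray required; `group_sum` is an illustrative stand-in, not a Ray API):

```python
from collections import defaultdict

def group_sum(rows, key, value):
    # What groupby(key) + a sum-of-value aggregation computes, in plain Python
    sums = defaultdict(int)
    for row in rows:
        sums[row[key]] += row[value]
    return dict(sums)

rows = [
    {"category": "a", "value": 1},
    {"category": "b", "value": 2},
    {"category": "a", "value": 3},
]
print(group_sum(rows, "category", "value"))  # {'a': 4, 'b': 2}
```

In Ray Data the same shuffle-then-aggregate happens in parallel across blocks, with rows for each key co-located before the aggregation runs.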

GPU-accelerated transforms

```python
# Use GPU for preprocessing
def preprocess_images_gpu(batch):
    import torch

    images = torch.tensor(batch["image"]).cuda()
    # GPU preprocessing
    processed = images * 255
    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1,  # Request GPU
)
```

Writing data

```python
# Write to Parquet
ds.write_parquet("s3://bucket/output/")

# Write to CSV
ds.write_csv("output/")

# Write to JSON
ds.write_json("output/")
```

Performance optimization

Repartition

```python
# Control parallelism
ds = ds.repartition(100)  # 100 blocks for 100-core cluster
```

Batch size tuning

```python
# Larger batches = faster vectorized ops
ds.map_batches(process_fn, batch_size=10000)  # vs batch_size=100
```
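The main effect of `batch_size` is amortization: per-batch overhead (a Python call into the UDF, plus scheduling and serialization) is paid once per batch, so fewer, larger batches spend more time in vectorized work. A toy calculation, independent of Ray (`num_udf_calls` is an illustrative helper, not a Ray API):

```python
def num_udf_calls(n_rows, batch_size):
    # Each batch triggers one UDF invocation (plus its fixed overhead)
    return -(-n_rows // batch_size)  # ceiling division

# For 1,000,000 rows:
print(num_udf_calls(1_000_000, 100))     # 10000 UDF calls
print(num_udf_calls(1_000_000, 10_000))  # 100 UDF calls
```

The trade-off is memory: each in-flight batch must fit comfortably in worker memory, so very large batches can cause spilling or OOM.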

Streaming execution

```python
# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # Streamed, not loaded into memory all at once
```
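Streaming execution can be pictured as a generator pipeline: blocks flow through the transforms one at a time, so peak memory is bounded by block size rather than dataset size. A pure-Python sketch of that shape (`read_blocks` and `process` are hypothetical stand-ins, not Ray APIs):

```python
def read_blocks(n_blocks, rows_per_block):
    # Stand-in for a datasource: yields one block at a time
    for b in range(n_blocks):
        yield [b * rows_per_block + i for i in range(rows_per_block)]

def process(block):
    # Stand-in for a transform applied per block
    return [x * 2 for x in block]

# Only one block is materialized at a time, however many blocks exist
total_rows = 0
for block in read_blocks(n_blocks=1000, rows_per_block=10):
    total_rows += len(process(block))
print(total_rows)  # 10000
```

Ray Data adds pipelining on top of this: downstream operators start consuming blocks while upstream operators are still producing them.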

Common patterns

Batch inference

```python
import ray

# Load model
def load_model():
    # Load once per worker
    return MyModel()

# Inference function
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(
    BatchInference,
    batch_size=32,
    num_gpus=1,
    concurrency=4,  # number of inference actors
)
predictions.write_parquet("s3://output/")
```
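The point of the class-based UDF is actor reuse: `__init__` runs once per worker, `__call__` once per batch, so the model is not reloaded for every batch. A self-contained sketch with a load counter and a dummy model (no Ray involved; the names here are illustrative):

```python
load_count = 0

def load_model():
    # Stand-in for an expensive model load
    global load_count
    load_count += 1
    return lambda xs: [x + 1 for x in xs]

class BatchPredictor:
    def __init__(self):
        # Runs once per worker/actor
        self.model = load_model()

    def __call__(self, batch):
        # Runs once per batch
        return {"prediction": self.model(batch["input"])}

worker = BatchPredictor()
for batch in ({"input": [1, 2]}, {"input": [3]}, {"input": [4, 5]}):
    worker(batch)
print(load_count)  # 1: three batches, one model load
```

With a plain function UDF that called `load_model()` inside, the counter would instead equal the number of batches.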

Data preprocessing pipeline

```python
# Multi-step pipeline; write_parquet triggers execution of the lazy steps
(
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)
```

Integration with ML frameworks

PyTorch

```python
# Iterate batches as PyTorch tensors
for batch in ds.iter_torch_batches(batch_size=32):
    # batch is a dict of tensors
    inputs, labels = batch["features"], batch["label"]
```

TensorFlow

```python
# Convert to a tf.data.Dataset
tf_ds = ds.to_tf(feature_columns=["image"], label_columns="label", batch_size=32)
for features, labels in tf_ds:
    # Train model on the batch
    pass
```

Supported data formats

| Format  | Read | Write | Use case              |
|---------|------|-------|-----------------------|
| Parquet | ✓    | ✓     | ML data (recommended) |
| CSV     | ✓    | ✓     | Tabular data          |
| JSON    | ✓    | ✓     | Semi-structured data  |
| Images  | ✓    | ✓     | Computer vision       |
| NumPy   | ✓    | ✓     | Arrays                |
| Pandas  | ✓    | ✓     | DataFrames            |

Performance benchmarks

Scaling (processing 100GB data):
  • 1 node (16 cores): ~30 minutes
  • 4 nodes (64 cores): ~8 minutes
  • 16 nodes (256 cores): ~2 minutes
GPU acceleration (image preprocessing):
  • CPU only: 1,000 images/sec
  • 1 GPU: 5,000 images/sec
  • 4 GPUs: 18,000 images/sec
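From the scaling numbers above, one can back out parallel efficiency (speedup divided by the resource multiple). A quick worked check:

```python
def speedup(t_base, t_scaled):
    return t_base / t_scaled

def efficiency(t_base, t_scaled, scale):
    return speedup(t_base, t_scaled) / scale

# From the benchmarks above: 1 node takes ~30 min, 16 nodes take ~2 min
print(speedup(30, 2))         # 15.0
print(efficiency(30, 2, 16))  # 0.9375, i.e. ~94% efficiency at 16x nodes
```

Near-linear scaling like this assumes the workload stays I/O- or compute-bound; shuffle-heavy pipelines typically see lower efficiency at high node counts.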

Use cases

Production deployments:
  • Pinterest: Last-mile data processing for model training
  • ByteDance: Scaling offline inference with multi-modal LLMs
  • Spotify: ML platform for batch inference

References

  • Transformations Guide - Map, filter, groupby operations
  • Integration Guide - Ray Train, PyTorch, TensorFlow

Resources
