# Ray Train - Distributed Training Orchestration
## Quick start
Ray Train scales machine learning training from a single GPU to multi-node clusters with minimal code changes.
Installation:

```bash
pip install -U "ray[train]"
```

Basic PyTorch training (single node):

```python
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn

# Define training function
def train_func(config):
    # Your normal PyTorch code
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # Prepare for distributed training (Ray handles device placement)
    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # Your training loop
        output = model(torch.randn(32, 10))
        loss = output.sum()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        # Report metrics (logged automatically)
        train.report({"loss": loss.item(), "epoch": epoch})

# Run distributed training
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,  # 4 GPUs/workers
        use_gpu=True
    )
)
result = trainer.fit()
print(f"Final loss: {result.metrics['loss']}")
```

**That's it!** Ray handles:
- Distributed coordination
- GPU allocation
- Fault tolerance
- Checkpointing
- Metric aggregation
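The `Result` object returned by `fit()` is worth inspecting; a small sketch using the standard accessors on `ray.train.Result` (the metric names match the `train.report` call above):

```python
# `result` is the ray.train.Result returned by trainer.fit() above
print(result.metrics)     # last metrics passed to train.report(), e.g. {"loss": ..., "epoch": 9}
print(result.checkpoint)  # latest reported checkpoint, or None if none was reported
print(result.path)        # directory holding logs and artifacts for this run
```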
## Common workflows
### Workflow 1: Scale existing PyTorch code
Original single-GPU code:

```python
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
for epoch in range(epochs):
    for batch in dataloader:
        loss = model(batch)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
```

Ray Train version (scales to multi-GPU/multi-node):
```python
from ray.train.torch import TorchTrainer
from ray import train

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # Prepare for distributed training (automatic device placement)
    model = train.torch.prepare_model(model)
    # Wrap your existing DataLoader (in practice, build it inside train_func)
    dataloader = train.torch.prepare_data_loader(dataloader)

    for epoch in range(epochs):
        for batch in dataloader:
            loss = model(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        # Report metrics
        train.report({"loss": loss.item()})

# Scale to 8 GPUs
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)
trainer.fit()
```

**Benefits**: Same code runs on 1 GPU or 1000 GPUs.
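For quick local iteration before requesting GPUs, the same trainer can run on CPU workers; a minimal sketch using only standard `ScalingConfig` arguments:

```python
# Smoke-test the training function locally on 2 CPU workers
debug_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
)
debug_trainer.fit()
```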
### Workflow 2: HuggingFace Transformers integration
```python
from ray.train.huggingface import TransformersTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments

def train_func(config):
    # Load model and tokenizer
    model = AutoModelForCausalLM.from_pretrained("gpt2")
    tokenizer = AutoTokenizer.from_pretrained("gpt2")

    # Training arguments (standard HuggingFace API)
    training_args = TrainingArguments(
        output_dir="./output",
        num_train_epochs=3,
        per_device_train_batch_size=8,
        learning_rate=2e-5,
    )

    # Ray automatically handles distributed training
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,  # your prepared dataset
    )
    trainer.train()

# Scale to multi-node (2 nodes × 8 GPUs = 16 workers)
trainer = TransformersTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=16,
        use_gpu=True,
        resources_per_worker={"GPU": 1}
    )
)
result = trainer.fit()
```
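Note that newer Ray releases steer Transformers users toward `TorchTrainer` plus helpers from `ray.train.huggingface.transformers`; a hedged sketch of that variant (`train_dataset` is assumed prepared, as in the example above):

```python
import ray.train.huggingface.transformers as ray_transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from transformers import AutoModelForCausalLM, Trainer, TrainingArguments

def train_func(config):
    model = AutoModelForCausalLM.from_pretrained("gpt2")
    training_args = TrainingArguments(output_dir="./output", num_train_epochs=3)
    trainer = Trainer(model=model, args=training_args, train_dataset=train_dataset)
    # Stream HF metrics and checkpoints back to Ray Train
    trainer.add_callback(ray_transformers.RayTrainReportCallback())
    # Patch the HF Trainer for Ray's distributed backend
    trainer = ray_transformers.prepare_trainer(trainer)
    trainer.train()

ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=16, use_gpu=True),
)
result = ray_trainer.fit()
```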
### Workflow 3: Hyperparameter tuning with Ray Tune
```python
from ray import tune
from ray.train.torch import TorchTrainer
from ray.tune.schedulers import ASHAScheduler

def train_func(config):
    # Use hyperparameters from config
    lr = config["lr"]
    batch_size = config["batch_size"]

    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # Training loop
        loss = train_epoch(model, optimizer, batch_size)
        train.report({"loss": loss, "epoch": epoch})

# Define search space
param_space = {
    "train_loop_config": {  # nested so values are forwarded to train_func's config
        "lr": tune.loguniform(1e-5, 1e-2),
        "batch_size": tune.choice([16, 32, 64, 128])
    }
}

# Run 20 trials with early stopping
tuner = tune.Tuner(
    TorchTrainer(
        train_func,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
    ),
    param_space=param_space,
    tune_config=tune.TuneConfig(
        num_samples=20,
        scheduler=ASHAScheduler(metric="loss", mode="min")
    )
)
results = tuner.fit()

best = results.get_best_result(metric="loss", mode="min")
print(f"Best hyperparameters: {best.config}")
```

**Result**: Distributed hyperparameter search across the cluster.
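For offline analysis, the `ResultGrid` returned by `tuner.fit()` converts to a pandas DataFrame via its standard `get_dataframe()` method:

```python
# One row per trial: config columns plus the last reported metrics
df = results.get_dataframe()
print(df.sort_values("loss").head())
```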
### Workflow 4: Checkpointing and fault tolerance
```python
import os
import tempfile

from ray import train
from ray.train import Checkpoint

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # Try to resume from a checkpoint
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            state = torch.load(os.path.join(checkpoint_dir, "model.pt"))
            model.load_state_dict(state["model"])
            optimizer.load_state_dict(state["optimizer"])
            start_epoch = state["epoch"]
    else:
        start_epoch = 0

    model = train.torch.prepare_model(model)

    for epoch in range(start_epoch, 100):
        loss = train_epoch(model, optimizer)

        # Save a checkpoint every 10 epochs
        if epoch % 10 == 0:
            with tempfile.TemporaryDirectory() as tmpdir:
                torch.save({
                    "model": model.module.state_dict(),  # unwrap the DDP module
                    "optimizer": optimizer.state_dict(),
                    "epoch": epoch
                }, os.path.join(tmpdir, "model.pt"))
                train.report({"loss": loss}, checkpoint=Checkpoint.from_directory(tmpdir))
        else:
            train.report({"loss": loss})

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)

# Automatically resumes from checkpoint if training fails
result = trainer.fit()
```
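Worker failures are retried automatically (see the `FailureConfig` sketch later in this doc). If the whole job is interrupted, the trainer can also be rebuilt from its experiment directory; a sketch using `TorchTrainer.restore`, with an illustrative placeholder path:

```python
experiment_path = "/tmp/ray_results/TorchTrainer_2024"  # illustrative placeholder

# Rebuild the trainer from a previous run's state and continue training
if TorchTrainer.can_restore(experiment_path):
    trainer = TorchTrainer.restore(experiment_path)
    result = trainer.fit()
```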
### Workflow 5: Multi-node training
```python
import ray
from ray.train import ScalingConfig

# Connect to Ray cluster
ray.init(address="auto")  # Or ray.init("ray://head-node:10001")

# Train across 4 nodes × 8 GPUs = 32 workers
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=32,
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 4},
        placement_strategy="SPREAD"  # Spread workers across nodes
    )
)
result = trainer.fit()
```

**Launch Ray cluster**:

```bash
# On head node
ray start --head --port=6379

# On worker nodes
ray start --address=<head-node-ip>:6379
```
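Before launching the trainer, it's worth confirming that every node registered; a quick check from Python using the standard `ray.cluster_resources()` call:

```python
import ray

ray.init(address="auto")

# Aggregate resources across the cluster; expect GPU == 32 for 4 × 8-GPU nodes
print(ray.cluster_resources())
```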
## When to use vs alternatives
**Use Ray Train when**:
- Training across multiple machines (multi-node)
- You need hyperparameter tuning at scale
- You want fault tolerance (auto-restart of failed workers; see the sketch after these lists)
- You need elastic scaling (add/remove nodes during training)
- You want a unified framework (same code for PyTorch/TF/HF)

**Key advantages**:
- Multi-node orchestration: the easiest multi-node setup
- Ray Tune integration: best-in-class hyperparameter tuning
- Fault tolerance: automatic recovery from failures
- Elastic: add/remove nodes without restarting
- Framework agnostic: PyTorch, TensorFlow, HuggingFace, XGBoost

**Use alternatives instead**:
- Accelerate: single-node multi-GPU, simpler
- PyTorch Lightning: high-level abstractions, callbacks
- DeepSpeed: maximum performance, complex setup
- Raw DDP: maximum control, minimal overhead
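Fault tolerance and retries are configured through the trainer's run config; a minimal sketch with `ray.train.RunConfig` and `FailureConfig`, assuming the `train_func` defined in the workflows above (the retry count is illustrative):

```python
from ray.train import FailureConfig, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
    run_config=RunConfig(
        # Restart training (from the latest checkpoint) up to 3 times on failure
        failure_config=FailureConfig(max_failures=3),
    ),
)
```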
## Common issues
**Issue: Ray cluster not connecting**

Check Ray status:

```bash
ray status
```

Should show:
- Nodes: 4
- GPUs: 32
- Workers: Ready
If not connected:

```bash
# Restart head node
ray stop
ray start --head --port=6379 --dashboard-host=0.0.0.0

# Restart worker nodes
ray stop
ray start --address=<head-ip>:6379
```

**Issue: Out of memory**
Reduce workers or use gradient accumulation:
```python
scaling_config = ScalingConfig(
    num_workers=4,  # Reduced from 8
    use_gpu=True
)

# In train_func, accumulate gradients
accumulation_steps = 4  # illustrative value
for i, batch in enumerate(dataloader):
    loss = model(batch) / accumulation_steps
    loss.backward()
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
```
**Issue: Slow training**
Check whether data loading is the bottleneck:
```python
import time

def train_func(config):
    for epoch in range(epochs):
        start = time.time()
        for batch in dataloader:
            data_time = time.time() - start
            print(f"Data loading: {data_time:.3f}s")
            # Train...
            start = time.time()
```

If data loading is slow, increase the number of DataLoader workers:

```python
dataloader = DataLoader(dataset, num_workers=8)
```
## Advanced topics
Multi-node setup: See references/multi-node.md for Ray cluster deployment on AWS, GCP, Kubernetes, and SLURM.
Hyperparameter tuning: See references/hyperparameter-tuning.md for Ray Tune integration, search algorithms (Optuna, HyperOpt), and population-based training.
Custom training loops: See references/custom-loops.md for advanced Ray Train usage, custom backends, and integration with other frameworks.
## Hardware requirements
- Single node: 1+ GPUs (or CPUs)
- Multi-node: 2+ machines with network connectivity
- Cloud: AWS, GCP, Azure (Ray autoscaling)
- On-prem: Kubernetes, SLURM clusters
Supported accelerators:
- NVIDIA GPUs (CUDA)
- AMD GPUs (ROCm)
- TPUs (Google Cloud)
- CPUs
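On clusters that mix accelerator types, recent Ray releases let `ScalingConfig` pin workers to one class via `accelerator_type`; a sketch (the `"A100"` label is illustrative and must match what Ray detects on the cluster):

```python
from ray.train import ScalingConfig

# Request 8 GPU workers, each placed on a node with the given accelerator type
scaling_config = ScalingConfig(
    num_workers=8,
    use_gpu=True,
    accelerator_type="A100",  # illustrative; must match a detected accelerator
)
```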
## Resources
- Docs: https://docs.ray.io/en/latest/train/train.html
- GitHub: https://github.com/ray-project/ray ⭐ 36,000+
- Version: 2.40.0+
- Examples: https://docs.ray.io/en/latest/train/examples.html
- Slack: https://forms.gle/9TSdDYUgxYs8SA9e8
- Used by: OpenAI, Uber, Spotify, Shopify, Instacart