ray-train

Ray Train - Distributed Training Orchestration


Quick start


Ray Train scales machine learning training from a single GPU to multi-node clusters with minimal code changes.
Installation:
```bash
pip install -U "ray[train]"
```
Basic PyTorch training (single node):
```python
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn
```

Define training function


```python
def train_func(config):
    # Your normal PyTorch code
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # Prepare for distributed (Ray handles device placement)
    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # Your training loop
        output = model(torch.randn(32, 10))
        loss = output.sum()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Report metrics (logged automatically)
        train.report({"loss": loss.item(), "epoch": epoch})
```

Run distributed training


```python
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,  # 4 GPUs/workers
        use_gpu=True
    )
)

result = trainer.fit()
print(f"Final loss: {result.metrics['loss']}")
```

**That's it!** Ray handles:
- Distributed coordination
- GPU allocation
- Fault tolerance
- Checkpointing
- Metric aggregation
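
Each worker can also inspect the distributed context Ray sets up for it; a minimal sketch using `train.get_context()` (the logging is purely illustrative):

```python
from ray import train

def train_func(config):
    ctx = train.get_context()
    # Every worker process runs this with its own rank
    print(f"worker {ctx.get_world_rank()}/{ctx.get_world_size()} "
          f"(local rank {ctx.get_local_rank()})")
```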

Common workflows


Workflow 1: Scale existing PyTorch code


Original single-GPU code:
```python
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())

for epoch in range(epochs):
    for batch in dataloader:
        loss = model(batch)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
```
Ray Train version (scales to multi-GPU/multi-node):
```python
from ray.train.torch import TorchTrainer
from ray import train

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # Prepare for distributed (automatic device placement)
    model = train.torch.prepare_model(model)
    dataloader = train.torch.prepare_data_loader(dataloader)

    for epoch in range(epochs):
        for batch in dataloader:
            loss = model(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            # Report metrics
            train.report({"loss": loss.item()})
```

Scale to 8 GPUs


```python
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)
trainer.fit()
```

**Benefits**: Same code runs on 1 GPU or 1000 GPUs
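
Concretely, only the `ScalingConfig` changes between a local smoke test and a cluster run; a minimal sketch (the worker counts are just examples):

```python
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Local smoke test on CPUs
debug_scaling = ScalingConfig(num_workers=2, use_gpu=False)

# Full run on 8 GPUs -- train_func itself is untouched
cluster_scaling = ScalingConfig(num_workers=8, use_gpu=True)

trainer = TorchTrainer(train_func, scaling_config=debug_scaling)
trainer.fit()
```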

Workflow 2: HuggingFace Transformers integration


```python
from ray.train.huggingface import TransformersTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments

def train_func(config):
    # Load model and tokenizer
    model = AutoModelForCausalLM.from_pretrained("gpt2")
    tokenizer = AutoTokenizer.from_pretrained("gpt2")

    # Training arguments (HuggingFace API)
    training_args = TrainingArguments(
        output_dir="./output",
        num_train_epochs=3,
        per_device_train_batch_size=8,
        learning_rate=2e-5,
    )

    # Ray automatically handles distributed training
    # (train_dataset is assumed to be built elsewhere, e.g. passed via config)
    from transformers import Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
    )

    trainer.train()
```

Scale to multi-node (2 nodes × 8 GPUs = 16 workers)


```python
trainer = TransformersTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=16,
        use_gpu=True,
        resources_per_worker={"GPU": 1}
    )
)

result = trainer.fit()
```
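
Recent Ray releases (roughly 2.7 and later) steer this integration toward `TorchTrainer` with the helpers in `ray.train.huggingface.transformers`; a hedged sketch of that variant, where `build_hf_trainer` is a hypothetical stand-in for whatever code constructs the HuggingFace `Trainer` above:

```python
import ray.train.huggingface.transformers as ray_transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    hf_trainer = build_hf_trainer(config)  # hypothetical helper returning a transformers.Trainer
    hf_trainer.add_callback(ray_transformers.RayTrainReportCallback())  # report metrics to Ray
    hf_trainer = ray_transformers.prepare_trainer(hf_trainer)           # apply distributed setup
    hf_trainer.train()

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=16, use_gpu=True),
)
result = trainer.fit()
```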

Workflow 3: Hyperparameter tuning with Ray Tune


```python
from ray import train, tune
from ray.train.torch import TorchTrainer
from ray.tune.schedulers import ASHAScheduler

def train_func(config):
    # Use hyperparameters from config
    lr = config["lr"]
    batch_size = config["batch_size"]

    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # Training loop
        loss = train_epoch(model, optimizer, batch_size)
        train.report({"loss": loss, "epoch": epoch})
```

Define search space


```python
param_space = {
    # Hyperparameters passed to train_func go under "train_loop_config"
    "train_loop_config": {
        "lr": tune.loguniform(1e-5, 1e-2),
        "batch_size": tune.choice([16, 32, 64, 128]),
    }
}
```

Run 20 trials with early stopping


```python
tuner = tune.Tuner(
    TorchTrainer(
        train_func,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
    ),
    param_space=param_space,
    tune_config=tune.TuneConfig(
        num_samples=20,
        scheduler=ASHAScheduler(metric="loss", mode="min")
    )
)

results = tuner.fit()
best = results.get_best_result(metric="loss", mode="min")
print(f"Best hyperparameters: {best.config}")
```

**Result**: Distributed hyperparameter search across the cluster
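
To inspect individual trials after the search, the `ResultGrid` returned by `tuner.fit()` can be turned into a DataFrame or iterated; a short sketch continuing from the code above:

```python
# One row per trial: reported metrics plus flattened config columns
df = results.get_dataframe()
print(df.head())

# Or walk the trial results directly
for r in results:
    print(r.config, r.metrics.get("loss"))
```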

Workflow 4: Checkpointing and fault tolerance


```python
import os
import tempfile

from ray import train
from ray.train import Checkpoint

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # Try to resume from checkpoint
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            state = torch.load(os.path.join(checkpoint_dir, "model.pt"))
            model.load_state_dict(state["model"])
            optimizer.load_state_dict(state["optimizer"])
            start_epoch = state["epoch"]
    else:
        start_epoch = 0

    model = train.torch.prepare_model(model)

    for epoch in range(start_epoch, 100):
        loss = train_epoch(model, optimizer)

        # Save a checkpoint every 10 epochs
        if epoch % 10 == 0:
            with tempfile.TemporaryDirectory() as checkpoint_dir:
                torch.save({
                    "model": model.state_dict(),
                    "optimizer": optimizer.state_dict(),
                    "epoch": epoch,
                }, os.path.join(checkpoint_dir, "model.pt"))
                train.report(
                    {"loss": loss},
                    checkpoint=Checkpoint.from_directory(checkpoint_dir),
                )
        else:
            train.report({"loss": loss})

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)
```

Automatically resumes from checkpoint if training fails


```python
result = trainer.fit()
```
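
How aggressively Ray retries after a failure is configurable; a minimal sketch using `RunConfig`/`FailureConfig` (the retry count of 3 is just an example):

```python
from ray.train import FailureConfig, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
    run_config=RunConfig(
        # Restart the run from the latest checkpoint up to 3 times
        failure_config=FailureConfig(max_failures=3),
    ),
)
result = trainer.fit()
```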

Workflow 5: Multi-node training


```python
import ray
from ray.train import ScalingConfig
```

Connect to Ray cluster


```python
ray.init(address="auto")  # Or ray.init("ray://head-node:10001")
```

Train across 4 nodes × 8 GPUs = 32 workers


```python
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=32,
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 4},
        placement_strategy="SPREAD"  # Spread across nodes
    )
)
result = trainer.fit()
```

**Launch Ray cluster**:

```bash
# On the head node
ray start --head --port=6379

# On each worker node
ray start --address=<head-node-ip>:6379
```
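
For multi-node runs, checkpoints and results should live on storage every node can reach (NFS or an object store); a minimal sketch using `RunConfig`, with a placeholder bucket URI:

```python
from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=32, use_gpu=True),
    run_config=RunConfig(
        storage_path="s3://my-bucket/ray-train-results",  # placeholder shared storage path
        name="multi_node_run",
    ),
)
```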

When to use vs alternatives


Use Ray Train when:
  • Training across multiple machines (multi-node)
  • Need hyperparameter tuning at scale
  • Want fault tolerance (auto-restart failed workers)
  • Elastic scaling (add/remove nodes during training)
  • Unified framework (same code for PyTorch/TF/HF)
Key advantages:
  • Multi-node orchestration: Easiest multi-node setup
  • Ray Tune integration: Best-in-class hyperparameter tuning
  • Fault tolerance: Automatic recovery from failures
  • Elastic: Add/remove nodes without restarting
  • Framework agnostic: PyTorch, TensorFlow, HuggingFace, XGBoost
Use alternatives instead:
  • Accelerate: Single-node multi-GPU, simpler
  • PyTorch Lightning: High-level abstractions, callbacks
  • DeepSpeed: Maximum performance, complex setup
  • Raw DDP: Maximum control, minimal overhead

Common issues


**Issue: Ray cluster not connecting**

Check `ray status`:
```bash
ray status
```

Should show:

- Nodes: 4
- GPUs: 32
- Workers: Ready


If not connected:
```bash
# Restart the head node
ray stop
ray start --head --port=6379 --dashboard-host=0.0.0.0

# Restart the worker nodes
ray stop
ray start --address=<head-ip>:6379
```

**Issue: Out of memory**

Reduce workers or use gradient accumulation:
```python
scaling_config=ScalingConfig(
    num_workers=4,  # Reduce from 8
    use_gpu=True
)
```

In train_func, accumulate gradients


```python
for i, batch in enumerate(dataloader):
    loss = model(batch) / accumulation_steps
    loss.backward()

    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
```

**Issue: Slow training**

Check whether data loading is the bottleneck:
```python
import time

def train_func(config):
    for epoch in range(epochs):
        start = time.time()
        for batch in dataloader:
            data_time = time.time() - start
            # Train...
            start = time.time()
            print(f"Data loading: {data_time:.3f}s")
```

If data loading is slow, increase the number of DataLoader workers:
```python
dataloader = DataLoader(dataset, num_workers=8)
```
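
If the GPU still idles between steps, the standard PyTorch `DataLoader` knobs below are also worth trying (the values are illustrative):

```python
from torch.utils.data import DataLoader

dataloader = DataLoader(
    dataset,
    batch_size=64,
    num_workers=8,            # parallel data-loading processes
    pin_memory=True,          # faster host-to-GPU copies
    prefetch_factor=2,        # batches pre-fetched per worker
    persistent_workers=True,  # keep workers alive across epochs
)
```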

Advanced topics


Multi-node setup: See references/multi-node.md for Ray cluster deployment on AWS, GCP, Kubernetes, and SLURM.
Hyperparameter tuning: See references/hyperparameter-tuning.md for Ray Tune integration, search algorithms (Optuna, HyperOpt), and population-based training.
Custom training loops: See references/custom-loops.md for advanced Ray Train usage, custom backends, and integration with other frameworks.

Hardware requirements


  • Single node: 1+ GPUs (or CPUs)
  • Multi-node: 2+ machines with network connectivity
  • Cloud: AWS, GCP, Azure (Ray autoscaling)
  • On-prem: Kubernetes, SLURM clusters
Supported accelerators:
  • NVIDIA GPUs (CUDA)
  • AMD GPUs (ROCm)
  • TPUs (Google Cloud)
  • CPUs

Resources
