dali-dynamic-mode

DALI Dynamic Mode

Dynamic mode is DALI's imperative Python API. Call DALI operators as regular Python functions with standard control flow -- no pipeline graph, no `pipe.build()`, no `pipe.run()`.

```python
import nvidia.dali.experimental.dynamic as ndd
```

Core Data Types

Tensor -- single sample

```python
t = ndd.tensor(data)           # copy
t = ndd.as_tensor(data)        # wrap, no copy if possible
t.cpu()                        # move to CPU
t.gpu()                        # move to GPU
t.torch(copy=False)            # conversion to PyTorch tensor with no copy (default)
t[1:3]                         # slicing supported
np.asarray(t)                  # NumPy via __array__ (CPU only)
```

Supports `__dlpack__`, `__cuda_array_interface__`, `__array__`, and arithmetic operators.

Batch -- collection of samples (variable shapes OK)

```python
b = ndd.batch([arr1, arr2])    # copy
b = ndd.as_batch(data)         # wrap, no copy if possible
```

Batch has no `__getitem__` -- `batch[i]` raises `TypeError` because indexing is ambiguous (sample selection vs. per-sample slicing). Use the explicit APIs instead:

| Intent | Method | Returns |
| --- | --- | --- |
| Get sample i | `batch.select(i)` | `Tensor` |
| Get subset of samples | `batch.select(slice_or_list)` | `Batch` |
| Slice within each sample | `batch.slice[...]` | `Batch` (same batch_size) |

`.select()` picks which samples. `.slice` indexes inside each sample.
```python
xy = ndd.random.uniform(batch_size=16, range=[0, 1], shape=2)
crop_x = xy.slice[0]       # Batch of 16 scalars, first element from each sample
crop_y = xy.slice[1]       # Batch of 16 scalars, second element from each sample
sample_0 = xy.select(0)    # Tensor, the entire first sample [x, y]
```
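
To make the `.select()` / `.slice` distinction concrete without a GPU or a DALI install, the sketch below models a batch as a plain list of per-sample lists. `ToyBatch`, `_SliceHelper`, and their methods are hypothetical stand-ins written for illustration; they mimic the semantics described above and are not DALI's implementation.

```python
class _SliceHelper:
    """Applies an index or slice inside every sample of the parent batch."""

    def __init__(self, samples):
        self._samples = samples

    def __getitem__(self, index):
        # Same number of samples out as in; only the sample contents change.
        return ToyBatch([sample[index] for sample in self._samples])


class ToyBatch:
    """Hypothetical stand-in for a batch: a list of per-sample values."""

    def __init__(self, samples):
        self.samples = list(samples)

    def __getitem__(self, index):
        # Mirrors the documented behaviour: plain indexing is ambiguous.
        raise TypeError("use .select() to pick samples or .slice[...] to index inside them")

    def select(self, which):
        # An int returns one sample; a slice or list returns a smaller batch.
        if isinstance(which, int):
            return self.samples[which]
        if isinstance(which, slice):
            return ToyBatch(self.samples[which])
        return ToyBatch([self.samples[i] for i in which])

    @property
    def slice(self):
        return _SliceHelper(self.samples)


xy = ToyBatch([[0.1, 0.9], [0.4, 0.2], [0.7, 0.5]])
crop_x = xy.slice[0]        # first element of every sample
sample_0 = xy.select(0)     # the whole first sample
subset = xy.select([0, 2])  # a smaller batch
print(crop_x.samples, sample_0, subset.samples)
```

Note that `crop_x` keeps three entries (one per sample), while `subset` has only two: slicing preserves the batch size, selection changes it.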
PyTorch conversion:
  • `batch.torch()` -- works for uniform shapes; raises for ragged batches
  • `batch.torch(pad=True)` -- zero-pads ragged batches to max shape (use for variable-length audio, detection boxes, etc.)
  • `batch.torch(copy=None)` is the default (avoids copy if possible)
  • Batch has no `__dlpack__` -- use `ndd.as_tensor(batch)` first for DLPack consumers. `ndd.as_tensor` supports `pad` as well.
  • `Tensor.torch(copy=False)` is default (no copy)

Iteration: `for sample in batch:` yields Tensors.
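
The effect of `pad=True` on a ragged batch can be pictured with one-dimensional samples. The helper below, `pad_to_max`, is a hypothetical name and not a DALI function; it is a minimal sketch that right-pads variable-length sequences with zeros to the longest length, assuming that is what "zero-pads to max shape" means for the 1-D case.

```python
def pad_to_max(samples, fill=0):
    """Right-pad each 1-D sample with `fill` so all share the longest length."""
    max_len = max((len(s) for s in samples), default=0)
    return [list(s) + [fill] * (max_len - len(s)) for s in samples]


ragged = [[1, 2, 3], [4], [5, 6]]   # three samples of different lengths
dense = pad_to_max(ragged)          # every row now has length 3
print(dense)
```

After padding, every sample has the same shape, so the result can be stacked into a single dense tensor.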

Readers

Readers are stateful objects -- create once, reuse across epochs. This matters because readers track internal state like shuffle order and shard position.

```python
reader = ndd.readers.File(file_root=image_dir, random_shuffle=True)

for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        # jpegs, labels are Batch objects
        ...
```

Key points:
  • Reader outputs (jpegs, labels, etc.) are CPU tensors/batches. Labels typically stay on CPU until you convert them for your framework (e.g. `labels.torch().to(device)`).
  • Reader classes are PascalCase: `ndd.readers.File(...)`, `ndd.readers.COCO(...)`, `ndd.readers.TFRecord(...)`
  • `batch_size` goes to `next_epoch()`, not to the reader constructor
  • `next_epoch(batch_size=N)` yields tuples of `Batch`; `next_epoch()` without batch_size yields tuples of `Tensor`
  • The iterator from `next_epoch()` must be fully consumed before calling `next_epoch()` again
  • Once a reader is used with a given batch_size, it cannot be changed. Similarly, a reader used in batch mode cannot switch to sample mode or vice versa.
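
The rules in this list are easier to remember with a small model. `ToyReader` below is a hypothetical pure-Python stand-in, not DALI code: it keeps shuffle state across epochs, locks the batch size on first use, and refuses to start a new epoch while the previous iterator is unfinished. The list above does not say how DALI reacts when these rules are broken, so the exceptions raised here are an assumption of this sketch.

```python
import random


class ToyReader:
    """Hypothetical stateful reader that enforces the rules listed above."""

    def __init__(self, items, seed=0):
        self._items = list(items)
        self._rng = random.Random(seed)   # shuffle state persists across epochs
        self._batch_size = "unset"        # locked on the first next_epoch() call
        self._active = False              # True while an epoch iterator is unfinished

    def next_epoch(self, batch_size=None):
        if self._active:
            raise RuntimeError("previous epoch iterator was not fully consumed")
        if self._batch_size == "unset":
            self._batch_size = batch_size
        elif self._batch_size != batch_size:
            raise ValueError("batch_size / mode cannot change after first use")
        self._active = True
        return self._iterate()

    def _iterate(self):
        order = self._items[:]
        self._rng.shuffle(order)
        step = self._batch_size or 1
        for start in range(0, len(order), step):
            chunk = order[start:start + step]
            # Batch mode yields lists; sample mode yields single items.
            yield chunk if self._batch_size else chunk[0]
        self._active = False              # only reached when fully consumed


reader = ToyReader(range(10))
epoch_1 = list(reader.next_epoch(batch_size=4))
epoch_2 = list(reader.next_epoch(batch_size=4))   # same reader, new shuffle order
print(epoch_1, epoch_2)
```

Breaking out of a `for` loop over `reader.next_epoch(...)` leaves `_active` set in this model, which is why the next call fails.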

Sharded reading for distributed training:

```python
reader = ndd.readers.File(
    file_root=image_dir,
    shard_id=rank, num_shards=world_size,
    stick_to_shard=True,
    pad_last_batch=True,
)
```
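
As a rough picture of what `shard_id` and `num_shards` do, the sketch below splits a dataset into contiguous, near-equal index ranges, one per rank. `shard_range` is a hypothetical helper, and the floor-based boundaries are an assumption made for illustration; the DALI sharding documentation is the authority on the exact rule.

```python
def shard_range(num_samples, shard_id, num_shards):
    """Return the [start, end) sample indices assigned to one shard."""
    if not 0 <= shard_id < num_shards:
        raise ValueError("shard_id must be in [0, num_shards)")
    start = num_samples * shard_id // num_shards
    end = num_samples * (shard_id + 1) // num_shards
    return start, end


# 10 samples split over 4 ranks.
ranges = [shard_range(10, rank, 4) for rank in range(4)]
print(ranges)
```

Under this scheme shard sizes can differ by one sample, which matters when every rank is expected to run the same number of iterations.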

Device Handling

  • Device is inferred from inputs -- GPU if any input is on GPU
  • For hybrid decode: use `device="gpu"` (NOT `"mixed"`). The `"mixed"` keyword is a pipeline-mode concept for implicit CPU-to-GPU transfer; in dynamic mode, passing `device="gpu"` triggers the same hardware-accelerated decode path.
  • Don't call `.cpu()` before passing to a GPU model -- `.torch()` gives you a GPU tensor directly. `.cpu()` is only needed for consumers requiring host memory (NumPy, `__array__`).
  • CUDA stream sync between DALI and PyTorch is automatic via DLPack -- no manual stream management needed.

Execution Model

Default mode is `eager` -- async execution in a background thread, returns immediately. No `.evaluate()` needed in most cases. Any data consumption (`.torch()`, `__dlpack__`, `__array__`, `.shape`, property access, iteration) triggers evaluation automatically.

For debugging, switch to synchronous mode so errors surface at the exact call site rather than later in the async queue:

```python
with ndd.EvalMode.sync_full:
    images = ndd.decoders.image(jpegs, device="gpu")
    images = ndd.resize(images, size=[224, 224])
    # Any error surfaces here, at the exact op that failed
```

Modes (increasing synchronicity): `deferred` < `eager` < `sync_cpu` < `sync_full`

Use `EvalMode.sync_full` for debugging instead of scattering `.evaluate()` calls -- it's cleaner and catches all issues at once. `sync_cpu` is often sufficient and lighter than `sync_full`.
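
Why a synchronous mode helps with debugging can be shown with a toy model built on Python's `concurrent.futures`. This is only an analogy: `run_op` and `bad_resize` are hypothetical names, and the sketch says nothing about how DALI actually schedules work. It shows the same failing operation surfacing its error at consumption time when run asynchronously, and at the call site when the caller waits.

```python
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=1)


def run_op(fn, *args, sync=False):
    """Run `fn` in the background; with sync=True, wait before returning."""
    future = _pool.submit(fn, *args)
    if sync:
        future.result()      # errors are raised here, at the call site
    return future


def bad_resize(size):
    raise ValueError(f"invalid size: {size}")


# Eager-style: the call returns immediately, the error appears on consumption.
pending = run_op(bad_resize, -1)
try:
    pending.result()         # stands in for .torch(), .shape, iteration, ...
except ValueError as exc:
    eager_error = f"raised at consumption: {exc}"

# Sync-style: the same error is raised by the call itself.
try:
    run_op(bad_resize, -1, sync=True)
except ValueError as exc:
    sync_error = f"raised at call site: {exc}"

_pool.shutdown()
print(eager_error)
print(sync_error)
```

In a longer chain of operations, the eager-style traceback points at whichever line happened to consume the data, which may be far from the operation that failed.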

Thread Configuration

```python
ndd.set_num_threads(4)  # Call once at startup, only if necessary to override the defaults
```

Controls DALI's internal worker threads for CPU operators. Defaults to CPU affinity count or `DALI_NUM_THREADS` env var. Unrelated to Python-level threading.

RNG

Two approaches (use one, not both):

```python
# Approach 1: set the thread-local default seed (simple, good enough for most cases)
ndd.random.set_seed(42)
angles = ndd.random.uniform(batch_size=64, range=(-30, 30))

# Approach 2: explicit RNG object (finer control, pass rng= to each op)
rng = ndd.random.RNG(seed=42)
values = ndd.random.uniform(batch_size=64, range=[0, 1], shape=2, rng=rng)
```

When `rng=` is passed to a random op, the explicit RNG overrides the default seed. Thread-local: each thread has independent random state.

Random ops need an explicit `batch_size` when working with batches -- there is no pipeline-level batch size to inherit.

Example: Image Classification Pipeline

```python
import nvidia.dali.experimental.dynamic as ndd

# num_epochs and train_step are placeholders supplied by your training script.
ndd.set_num_threads(4)
reader = ndd.readers.File(file_root="/data/imagenet/train", random_shuffle=True)

for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        # Decode on the GPU, then resize and normalize there as well.
        images = ndd.decoders.image(jpegs, device="gpu")
        images = ndd.resize(images, size=[224, 224])
        images = ndd.crop_mirror_normalize(
            images,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
        )
        train_step(images.torch(), labels.torch())
```
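
The `* 255` factors in `mean` and `std` are worth a quick check. Assuming the decoded images hold values in the 0..255 range, scaling the usual 0..1 statistics by 255 gives the same result as first dividing the pixels by 255 and then normalizing. The helper names below are hypothetical and the sketch works on a single RGB pixel in plain Python.

```python
import math

MEAN = [0.485, 0.456, 0.406]   # per-channel statistics on the 0..1 scale
STD = [0.229, 0.224, 0.225]


def normalize_uint8(pixel):
    """Normalize a 0..255 RGB pixel with statistics scaled by 255."""
    return [(p - m * 255) / (s * 255) for p, m, s in zip(pixel, MEAN, STD)]


def normalize_unit(pixel):
    """Rescale to 0..1 first, then normalize with the unscaled statistics."""
    return [(p / 255 - m) / s for p, m, s in zip(pixel, MEAN, STD)]


pixel = [128, 64, 255]
scaled_stats = normalize_uint8(pixel)
scaled_pixels = normalize_unit(pixel)
same = all(
    math.isclose(a, b, rel_tol=1e-9, abs_tol=1e-12)
    for a, b in zip(scaled_stats, scaled_pixels)
)
print(same)
```

Algebraically, (p - 255m) / (255s) = (p/255 - m) / s, so folding the rescale into the statistics saves a separate division pass over the image.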

Common Mistakes

| Wrong | Right | Why |
| --- | --- | --- |
| `device="mixed"` | `device="gpu"` | `"mixed"` is pipeline mode only |
| `batch[i]` | `batch.select(i)` | `Batch` has no `__getitem__` |
| `batch.select(0)` for per-sample slicing | `batch.slice[0]` | `.select()` picks samples; `.slice` slices within each sample |
| `.evaluate()` after every op | Let consumption trigger eval | `.torch()`, `.shape`, etc. trigger it automatically |
| `.cpu()` before GPU model | `.torch()` directly | Avoids wasteful D2H + H2D round-trip |
| Recreate reader each epoch | `reader.next_epoch()` | Readers are stateful -- create once, reuse |
| `ndd.readers.file(...)` | `ndd.readers.File(...)` | Reader classes are PascalCase |
| `break` from `next_epoch()` loop | Exhaust iterator or create new reader | Iterator must be fully consumed before next `next_epoch()` |
| No `batch_size` to random ops | `ndd.random.uniform(batch_size=N, ...)` | No pipeline-level batch size to inherit |

Pipeline Mode Migration

| Pipeline Mode | Dynamic Mode |
| --- | --- |
| `@pipeline_def` / `pipe.build()` / `pipe.run()` | Direct function calls in a loop |
| `fn.readers.file(...)` | `ndd.readers.File(...)` (PascalCase, stateful) |
| `fn.decoders.image(jpegs, device="mixed")` | `ndd.decoders.image(jpegs, device="gpu")` |
| `fn.op_name(...)` | `ndd.op_name(...)` |
| Pipeline-level `batch_size=64` | `reader.next_epoch(batch_size=64)` + random ops `batch_size=64` |
| Pipeline-level `seed=42` | `ndd.random.set_seed(42)` or `ndd.random.RNG(seed=42)` |
| Pipeline-level `num_threads=4` | `ndd.set_num_threads(4)` at startup |
| `output.at(i)` | `batch.select(i)` |
| `output.as_cpu()` | `batch.cpu()` |
| `pipe.run()` returns tuple of `TensorList` | `reader.next_epoch(batch_size=N)` yields tuples of `Batch` |