cupynumeric-parallel-data-load

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Parallel sharded data -> cupynumeric load

并行分片数据加载至cupynumeric

Why this skill exists. cupynumeric mirrors NumPy's array API, including
cupynumeric.load
for a single
.npy
file. Beyond that, file loading lives in Legate, not cupynumeric:
FormatBuilt-in loader
Single
.npy
cupynumeric.load(path)
(NumPy-API parity)
HDF5 (single file)
legate.io.hdf5.from_file
/
from_file_batched
Sharded multi-file (any format), Parquet/Arrow, raw binary, custom layoutsNo built-in loader — this skill.
This skill shows the canonical way to fill the gap in the last row: write a Legate Python task that calls the third-party reader the format needs (
h5py
,
pyarrow
,
np.memmap
, ...) inside the task body, and let Legate distribute the reads across GPUs / nodes. For the formats with a built-in loader, prefer it unless you need a custom in-task body (mmap-based loader, format-specific decoder, sidecar metadata, partial / sharded reads).
Canonical pattern: manual partition + manual task launch, sized to the machine, not the files. Only axis 0 is sharded; trailing axes ride along inside each tile. Per-shard row counts may differ across files (only
dtype
and trailing axes must match); the launch fills every available processor regardless of how many files there are.
.npy
is the worked example because the header carries shape and dtype on disk, but the skeleton applies to any format with cheap range/slice reads (raw binary, HDF5, Parquet/Arrow — see "Other formats" below). Reference implementation:
assets/examples/parallel_npy_load.py
.
该技能的存在意义。cupynumeric镜像了NumPy的数组API,包括用于单个
.npy
文件的
cupynumeric.load
。除此之外,文件加载功能属于Legate而非cupynumeric:
格式内置加载器
单个
.npy
文件
cupynumeric.load(path)
(与NumPy API兼容)
HDF5(单个文件)
legate.io.hdf5.from_file
/
from_file_batched
分片多文件(任意格式)、Parquet/Arrow、原始二进制、自定义布局无内置加载器 — 需使用本技能
本技能展示了填补最后一项空白的标准方法:编写一个Legate Python任务,在任务体内调用对应格式所需的第三方读取器(如
h5py
pyarrow
np.memmap
等),让Legate在GPU/节点间分布式执行读取操作。对于有内置加载器的格式,除非你需要自定义任务体(如基于内存映射的加载器、格式特定解码器、辅助元数据、部分/分片读取),否则优先使用内置加载器。
标准模式:手动分区 + 手动任务启动,根据机器规模而非文件数量调整大小。仅对轴0进行分片;后续轴保持在每个分片内。各分片文件的行数可以不同(仅
dtype
和后续轴必须匹配);无论文件数量多少,启动操作都会利用所有可用处理器。
.npy
作为示例是因为其头部在磁盘上存储了形状和dtype信息,但该框架适用于任何支持低成本范围/切片读取的格式(原始二进制、HDF5、Parquet/Arrow — 见下文“其他格式”)。参考实现:
assets/examples/parallel_npy_load.py

Data layout assumption

数据布局假设

This skill is purely about loading — it assumes the data is already laid out on a shared filesystem in some predictable, indexable way. Producing those files is out of scope (the example ships a
write
subcommand for convenience, but real users bring their own).
The worked example assumes one specific layout:
  • A directory containing files named
    shard_0000.npy
    ,
    shard_0001.npy
    , ... in a contiguous integer sequence (zero-padded width 4).
  • All shards share the same
    dtype
    and the same trailing axes (
    shape[1:]
    ); axis 0 (rows per shard) may differ across files — the recipe builds a cumulative row-offset table and reads each file's overlapping slice from inside the leaf task.
  • The directory is visible to every rank (shared filesystem for multi-node runs).
The example's
discover_layout()
prints what it found and hard-fails with a descriptive error when the layout is wrong (missing directory, no shards, mismatched
dtype
/ trailing axes, or a hole in the contiguous
shard_NNNN.npy
sequence).
If your data lives in a different layout — fixed-stride raw binary, an HDF5 file with one dataset per shard, a directory tree, ... — only the glob pattern, the per-file reader (step 4 below), and the metadata discovery (step 1 below) change. The partitioning and launch machinery is layout-agnostic.
本技能仅关注加载环节 — 假设数据已按可预测、可索引的方式存储在共享文件系统中。生成这些文件的过程不在本技能范围内(示例中提供了
write
子命令以方便使用,但实际用户需自行生成)。
示例采用以下特定布局:
  • 目录包含命名为
    shard_0000.npy
    shard_0001.npy
    ……的文件,文件名采用连续整数序列(补零至4位宽度)。
  • 所有分片共享相同的
    dtype
    和后续轴(
    shape[1:]
    );轴0(每个分片的行数)可在不同文件间不同 — 本方案会构建累积行偏移表,并在叶子任务中读取每个文件的重叠切片。
  • 该目录对所有rank可见(多节点运行时需使用共享文件系统)。
示例中的
discover_layout()
会打印检测到的布局信息,当布局不符合要求时(如目录缺失、无分片文件、
dtype
/后续轴不匹配、
shard_NNNN.npy
序列不连续),会抛出明确的错误信息。
如果你的数据采用其他布局 — 固定步长的原始二进制、每个分片对应一个数据集的HDF5文件、目录树结构等 — 仅需修改全局匹配模式、每个文件的读取器(下文步骤4)和元数据检测(下文步骤1)。分区和启动机制与布局无关。

When to use

使用场景

See the format table above for the routing decision (built-in loader vs. this skill). Beyond that, two additional cues that this skill is the right fit:
  • Replacing sequential
    np.concatenate([read(f) for f in files])
    with parallel per-GPU reads.
  • Demonstrating how a user-defined Legate Python task writes into a cupynumeric output array via a manual launch.
参考上方的格式表格来决定使用内置加载器还是本技能。除此之外,以下两种情况也适合使用本技能:
  • 用并行的GPU读取替代串行的
    np.concatenate([read(f) for f in files])
  • 演示如何通过手动启动,让用户自定义的Legate Python任务写入cupynumeric输出数组。

Examples

示例

Paths below are written relative to this skill's directory (the script ships at
assets/examples/parallel_npy_load.py
). Adjust the prefix to match wherever your skill is installed (e.g.
skills/cupynumeric-parallel-data-load/assets/...
if the skill lives under a top-level
skills/
directory).
bash
undefined
以下路径均相对于本技能的目录(脚本位于
assets/examples/parallel_npy_load.py
)。请根据技能安装位置调整前缀(例如,如果技能位于顶级
skills/
目录下,则路径为
skills/cupynumeric-parallel-data-load/assets/...
)。
bash
undefined

Single-node, 4 GPUs.

单节点,4块GPU

legate --gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo

```bash
legate --gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo

```bash

Multi-node, 2 nodes x 4 GPUs (slurm), shared filesystem at --shard-dir.

多节点,2节点×4块GPU(使用slurm),共享文件系统路径为--shard-dir

Generate the shards once on rank 0, then re-run
read
at any scale.

先在rank 0上生成分片,之后可在任意规模下重新运行
read
命令

legate --launcher srun --nodes 2 --cpus 1
assets/examples/parallel_npy_load.py
write --shard-dir /shared/scratch/demo
legate --launcher srun --nodes 2 --ranks-per-node 4
--gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo

No layout flags — the read driver walks every `.npy` header to recover
per-file row counts, the trailing shape, and the dtype, then derives
`tile_rows` from the available processor count.

`--min-gpu-chunk 1` is only needed when the per-tile element count is
below Legate's default minimum chunk size for GPU launches (e.g. the
worked example's defaults — total rows split across 4 GPUs at
`~1M` per tile — fall below the threshold and would otherwise be
folded onto a single GPU). For production-sized datasets (tens of
millions of elements per tile or larger) you can drop the flag and
let Legate use its default. Bumping it to a moderate value (e.g.
`--min-gpu-chunk 1024`) is fine when each tile is large enough that
per-task overhead matters more than getting *every* GPU a tile.
legate --launcher srun --nodes 2 --cpus 1
assets/examples/parallel_npy_load.py
write --shard-dir /shared/scratch/demo
legate --launcher srun --nodes 2 --ranks-per-node 4
--gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo

无需布局标志 — 读取驱动会遍历每个`.npy`文件的头部,恢复每个文件的行数、后续形状和dtype,然后根据可用处理器数量计算`tile_rows`。

`--min-gpu-chunk 1`仅在每个分片的元素数量低于Legate默认的GPU启动最小块大小时需要(例如,示例默认设置 — 总行数拆分到4块GPU,每个分片约1M元素 — 低于阈值,否则会被合并到单块GPU上)。对于生产级数据集(每个分片包含数千万或更多元素),可以省略该标志,让Legate使用默认值。当每个分片足够大,任务开销比利用每一块GPU更重要时,可将该值调整为适中的数值(如`--min-gpu-chunk 1024`)。

Instructions

操作步骤

Five steps from a
.npy
worked example; only step 1 (parsing the format header) and step 4 (the per-file reader inside the task body) are format-specific. The other three (allocate destination, partition, fence) are reused unchanged across formats — see "Other formats" below for the swap-points.
以下是基于
.npy
文件的五个步骤;仅步骤1(解析格式头部)和步骤4(任务体内的单文件读取器)与格式相关。其他三个步骤(分配目标存储、分区、栅栏)可在所有格式中复用 — 见下文“其他格式”中的替换点。

1. Read the metadata from every shard

1. 读取所有分片的元数据

Scan the directory and peek at every
.npy
header (
mmap_mode="r"
reads only the header). The header carries the per-shard shape and dtype, so the driver can recover total rows, trailing shape, and a cumulative row-offset table without ever loading the data:
python
paths = sorted(SHARD_DIR.glob("shard_*.npy"))

per_file_rows = []                       # rows along axis 0 per file
trailing_shape = None                    # shape[1:], must match across files
dtype = None
for p in paths:
    hdr = np.load(p, mmap_mode="r")
    if trailing_shape is None:
        trailing_shape = tuple(hdr.shape[1:])
        dtype = hdr.dtype
    elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
        raise RuntimeError(
            f"{p.name}: trailing shape / dtype mismatch "
            f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
        )
    per_file_rows.append(int(hdr.shape[0]))

cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64)  # length N+1
total_rows = int(cum_rows[-1])
The snippet above enforces matching
dtype
and
trailing_shape
(i.e.
shape[1:]
) across files. Per-shard row counts may differ — the cum-rows table handles that. Production code should also verify that names form a contiguous
shard_0000.npy ... shard_NNNN.npy
sequence (omitted from the snippet for brevity; see
discover_layout()
in the worked example). Discovery relies only on what the on-disk format itself exposes (the
.npy
header here,
.shape
/
.dtype
for HDF5, etc.); any sidecar (manifest, content hashes) is a separate verification step on top.
扫描目录并查看每个
.npy
文件的头部(
mmap_mode="r"
仅读取头部)。头部包含每个分片的形状和dtype,因此驱动无需加载数据即可恢复总行数、后续形状和累积行偏移表:
python
paths = sorted(SHARD_DIR.glob("shard_*.npy"))

per_file_rows = []                       # 每个文件沿轴0的行数
trailing_shape = None                    # shape[1:],所有文件必须匹配
dtype = None
for p in paths:
    hdr = np.load(p, mmap_mode="r")
    if trailing_shape is None:
        trailing_shape = tuple(hdr.shape[1:])
        dtype = hdr.dtype
    elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
        raise RuntimeError(
            f"{p.name}: trailing shape / dtype mismatch "
            f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
        )
    per_file_rows.append(int(hdr.shape[0]))

cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64)  # 长度为N+1
total_rows = int(cum_rows[-1])
上述代码片段强制要求所有文件的
dtype
trailing_shape
(即
shape[1:]
)匹配。每个分片的行数可以不同 — 累积行表会处理这种情况。生产环境代码还应验证文件名是否形成连续的
shard_0000.npy ... shard_NNNN.npy
序列(为简洁起见,该部分从代码片段中省略;可参考示例中的
discover_layout()
)。元数据检测仅依赖磁盘格式本身暴露的信息(此处为
.npy
头部,HDF5为
.shape
/
.dtype
等);任何辅助文件(清单、内容哈希)都是额外的验证步骤。

2. Create the cupynumeric output store from the metadata

2. 根据元数据创建cupynumeric输出存储

The total array spans
total_rows
along axis 0; trailing axes come from
trailing_shape
unchanged. Use
cn.empty
— the task overwrites every cell, zero-init would be wasted.
python
import cupynumeric as cn

total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)
总数组沿轴0的长度为
total_rows
;后续轴与
trailing_shape
保持一致。使用
cn.empty
— 任务会覆盖所有单元格,零初始化会造成浪费。
python
import cupynumeric as cn

total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)

3. Tile the store by processor count

3. 根据处理器数量分片存储

The launch shape is sized to the available processors, not to the file count. Pick
tile_rows = ceil(total_rows / num_processors)
and partition axis 0 by that tile size. Trailing axes are not partitioned (tile spans the full extent there). The last tile is allowed to be short — that's exactly what
partition_by_tiling
supports — so the recipe needs no divisibility constraint.
python
from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array

runtime = get_legate_runtime()
machine = runtime.get_machine()
num_processors = max(
    machine.count(TaskTarget.GPU),
    machine.count(TaskTarget.OMP),
    machine.count(TaskTarget.CPU),
    1,
)

tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
tile_shape = (tile_rows,) + trailing_shape
partition = as_logical_array(out).data.partition_by_tiling(tile_shape)

num_tasks = (total_rows + tile_rows - 1) // tile_rows  # match partition tile count
启动规模根据可用处理器数量而非文件数量确定。设置
tile_rows = ceil(total_rows / num_processors)
,并按该分片大小对轴0进行分区。后续轴不分区(分片覆盖整个范围)。允许最后一个分片长度较短 — 这正是
partition_by_tiling
支持的功能 — 因此本方案无需整除约束。
python
from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array

runtime = get_legate_runtime()
machine = runtime.get_machine()
num_processors = max(
    machine.count(TaskTarget.GPU),
    machine.count(TaskTarget.OMP),
    machine.count(TaskTarget.CPU),
    1,
)

tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
tile_shape = (tile_rows,) + trailing_shape
partition = as_logical_array(out).data.partition_by_tiling(tile_shape)

num_tasks = (total_rows + tile_rows - 1) // tile_rows  # 与分区分片数量匹配

4. Define the leaf task and launch it manually

4. 定义叶子任务并手动启动

PATHS
and
CUM_ROWS
(the file paths and cumulative row-offset table from step 1) plus
TILE_ROWS
are populated as module globals by the driver before launching; control replication runs the driver on every rank, so every worker sees identical values.
Each task builds its consumer view first (cupy on GPU, numpy on CPU/OMP) and reads the tile's actual row count from
view.shape[0]
PhysicalStore
itself has no
.shape
attribute, so going through the view is required. It then computes its global row range from its launch coordinate and that row count, bisects
cum_rows
for the overlapping file(s), and copies each overlapping file slice into the matching destination slice. Register CPU, OMP, and GPU variants so the same launch runs unchanged anywhere; dispatch on
ctx.get_variant_kind()
picks the consumer matching where the
OutputStore
is resident (
cp.from_dlpack(dst)
for FBMEM,
np.asarray(dst)
for SYSMEM). cupy is imported inside the GPU branch only, so the task body loads on machines without cupy.
python
import bisect
from legate.core import TaskContext, VariantCode
from legate.core.task import OutputStore, task

@task(variants=(VariantCode.CPU, VariantCode.OMP, VariantCode.GPU))
def load_tile(ctx: TaskContext, dst: OutputStore) -> None:
    t = ctx.task_index[0]                              # tile index 0..num_tasks-1

    variant = ctx.get_variant_kind()
    if variant == VariantCode.GPU:
        import cupy as cp                              # lazy: only on GPU
        view = cp.from_dlpack(dst)
    else:
        view = np.asarray(dst)                         # zero-copy numpy view

    tile_rows_actual = view.shape[0]                   # short on the last tile
    row_start = t * TILE_ROWS                          # global axis-0 start
    row_end = row_start + tile_rows_actual

    # Find the half-open range of file indices that overlap [row_start, row_end).
    first_file = bisect.bisect_right(CUM_ROWS, row_start) - 1
    last_file = bisect.bisect_right(CUM_ROWS, row_end - 1) - 1

    for f in range(first_file, last_file + 1):
        # Intersection of tile [row_start, row_end) with file [cum[f], cum[f+1]).
        lo = max(row_start, int(CUM_ROWS[f]))
        hi = min(row_end, int(CUM_ROWS[f + 1]))
        file_lo = lo - int(CUM_ROWS[f])
        file_hi = hi - int(CUM_ROWS[f])
        dst_lo = lo - row_start
        dst_hi = hi - row_start
        chunk = np.ascontiguousarray(
            np.load(PATHS[f], mmap_mode="r")[file_lo:file_hi]
        )
        if variant == VariantCode.GPU:
            view[dst_lo:dst_hi].set(chunk)             # cudaMemcpyAsync H2D
        else:
            view[dst_lo:dst_hi] = chunk                # zero-copy numpy write

manual_task = runtime.create_manual_task(
    load_tile.library,
    load_tile.task_id,
    (num_tasks,),                                      # launch domain == tile count
)
manual_task.add_output(partition)
manual_task.execute()
Both consumers go through
PhysicalStore
's native producers (
__dlpack__
for cupy,
__array_interface__
for
np.asarray
) — zero-copy views of the local tile. Bisect cost is
O(log num_shards)
and the inner loop typically iterates 1–2 times (tiles overlap at most a couple of files).
PATHS
CUM_ROWS
(步骤1中的文件路径和累积行偏移表)以及
TILE_ROWS
在启动前由驱动填充为模块全局变量;控制复制机制会在所有rank上运行驱动,因此所有工作节点都会看到相同的值。
每个任务首先构建其消费者视图(GPU上使用cupy,CPU/OMP上使用numpy),并从
view.shape[0]
读取分片的实际行数 —
PhysicalStore
本身没有
.shape
属性,因此必须通过视图获取。然后根据启动坐标和行数计算全局行范围,在
cum_rows
中二分查找重叠的文件,并将每个重叠的文件切片复制到对应的目标切片中。注册CPU、OMP和GPU变体,以便同一启动操作可在任意环境中运行;通过
ctx.get_variant_kind()
选择与
OutputStore
所在位置匹配的消费者(FBMEM使用
cp.from_dlpack(dst)
,SYSMEM使用
np.asarray(dst)
)。仅在GPU分支中导入cupy,因此任务体可在没有cupy的机器上运行。
python
import bisect
from legate.core import TaskContext, VariantCode
from legate.core.task import OutputStore, task

@task(variants=(VariantCode.CPU, VariantCode.OMP, VariantCode.GPU))
def load_tile(ctx: TaskContext, dst: OutputStore) -> None:
    t = ctx.task_index[0]                              # 分片索引0..num_tasks-1

    variant = ctx.get_variant_kind()
    if variant == VariantCode.GPU:
        import cupy as cp                              # 延迟导入:仅在GPU环境
        view = cp.from_dlpack(dst)
    else:
        view = np.asarray(dst)                         # 零拷贝numpy视图

    tile_rows_actual = view.shape[0]                   # 最后一个分片可能较短
    row_start = t * TILE_ROWS                          # 全局轴0起始位置
    row_end = row_start + tile_rows_actual

    # 查找与[row_start, row_end)重叠的文件索引的半开范围
    first_file = bisect.bisect_right(CUM_ROWS, row_start) - 1
    last_file = bisect.bisect_right(CUM_ROWS, row_end - 1) - 1

    for f in range(first_file, last_file + 1):
        # 分片[row_start, row_end)与文件[cum[f], cum[f+1])的交集
        lo = max(row_start, int(CUM_ROWS[f]))
        hi = min(row_end, int(CUM_ROWS[f + 1]))
        file_lo = lo - int(CUM_ROWS[f])
        file_hi = hi - int(CUM_ROWS[f])
        dst_lo = lo - row_start
        dst_hi = hi - row_start
        chunk = np.ascontiguousarray(
            np.load(PATHS[f], mmap_mode="r")[file_lo:file_hi]
        )
        if variant == VariantCode.GPU:
            view[dst_lo:dst_hi].set(chunk)             # cudaMemcpyAsync H2D
        else:
            view[dst_lo:dst_hi] = chunk                # 零拷贝numpy写入

manual_task = runtime.create_manual_task(
    load_tile.library,
    load_tile.task_id,
    (num_tasks,),                                      # 启动域 == 分片数量
)
manual_task.add_output(partition)
manual_task.execute()
两种消费者均通过
PhysicalStore
的原生生产者(cupy使用
__dlpack__
np.asarray
使用
__array_interface__
)获取分片的零拷贝视图。二分查找的时间复杂度为
O(log num_shards)
,内部循环通常迭代1-2次(分片最多与几个文件重叠)。

5. Fence and verify

5. 栅栏与验证

python
get_legate_runtime().issue_execution_fence(block=True)
python
get_legate_runtime().issue_execution_fence(block=True)

Hard constraints

硬性约束

  1. All shards must share
    dtype
    and trailing axes (
    shape[1:]
    ).
    The recipe stacks shards along axis 0; the destination's trailing axes come from
    trailing_shape
    , which the discovery step locks to the value of the first file. Per-shard row counts (
    shape[0]
    ) may freely differ — the cumulative-offset table handles them. The example rejects any shard whose
    dtype
    or trailing shape differs from the first one with a descriptive error.
  2. Pick the consumer that matches the variant.
    cp.from_dlpack
    rejects SYSMEM-resident stores;
    np.asarray
    silently returns a host view of an FBMEM-resident store you can't actually write through. Dispatch on
    ctx.get_variant_kind()
    so each variant uses its own consumer — see step 4.
  3. mmap views aren't always C-contiguous — wrap each per-file slice with
    np.ascontiguousarray(arr[file_lo:file_hi])
    before
    .set()
    or the numpy in-place write.
  4. Multi-node:
    SHARD_DIR
    must be on a shared filesystem.
    Every worker (on every rank) opens shards by path; node-local
    /tmp
    paths only work for single-node demos.
  1. 所有分片必须共享
    dtype
    和后续轴(
    shape[1:]
    。本方案沿轴0堆叠分片;目标的后续轴来自
    trailing_shape
    ,检测步骤会将其锁定为第一个文件的值。每个分片的行数(
    shape[0]
    )可自由不同 — 累积偏移表会处理这种情况。示例会拒绝任何
    dtype
    或后续形状与第一个文件不同的分片,并抛出明确的错误信息。
  2. 选择与变体匹配的消费者
    cp.from_dlpack
    会拒绝驻留在SYSMEM的存储;
    np.asarray
    会静默返回驻留在FBMEM的存储的主机视图,无法实际写入。通过
    ctx.get_variant_kind()
    进行分发,让每个变体使用对应的消费者 — 见步骤4。
  3. 内存映射视图并非始终是C连续的 — 在
    .set()
    或numpy原地写入之前,用
    np.ascontiguousarray(arr[file_lo:file_hi])
    包装每个文件切片。
  4. 多节点场景:
    SHARD_DIR
    必须位于共享文件系统
    。每个工作节点(所有rank)通过路径打开分片;节点本地的
    /tmp
    路径仅适用于单节点演示。

Variants

变体

Uniform-shard fast path (one task per file)

统一分片快速路径(每个文件对应一个任务)

When every shard already has the same
(shape, dtype)
and you happen to have
num_shards
processors available, the cum-rows / bisect machinery is overhead. Set
tile_rows = shard_shape[0]
and
num_tasks = num_shards
; the partition then has one tile per file and each task reads exactly one file end-to-end (no bisect, no inner loop). The driver-side switch is a one-liner:
python
if all(r == per_file_rows[0] for r in per_file_rows) and num_shards == num_processors:
    tile_rows = per_file_rows[0]
else:
    tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
The same
load_tile
task body still works in either mode — the inner loop just happens to iterate exactly once per task. There's no need for a separate task body for the fast path.
当所有分片的
(shape, dtype)
都相同,且恰好有
num_shards
个可用处理器时,累积行/二分查找机制会产生额外开销。设置
tile_rows = shard_shape[0]
num_tasks = num_shards
;此时分区每个文件对应一个分片,每个任务读取整个文件(无需二分查找,无内部循环)。驱动端只需一行代码即可切换:
python
if all(r == per_file_rows[0] for r in per_file_rows) and num_shards == num_processors:
    tile_rows = per_file_rows[0]
else:
    tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
同一个
load_tile
任务体在两种模式下均可工作 — 内部循环恰好每个任务迭代一次。无需为快速路径编写单独的任务体。

Over-decompose for better load balancing

过度分解以实现更好的负载均衡

The default
tile_rows = ceil(total_rows / num_processors)
gives one tile per processor. To over-decompose by a factor
K
(smaller tiles, more point tasks, finer-grained queueing), divide by
K * num_processors
instead:
python
tile_rows = max(1, (total_rows + K * num_processors - 1) // (K * num_processors))
num_tasks = ceil(total_rows / tile_rows)
then expands to roughly
K * num_processors
. The same task body still works — bisect just lands on more tasks per file.
默认的
tile_rows = ceil(total_rows / num_processors)
为每个处理器分配一个分片。要按因子
K
过度分解(更小的分片、更多的点任务、更细粒度的队列),改为除以
K * num_processors
python
tile_rows = max(1, (total_rows + K * num_processors - 1) // (K * num_processors))
此时
num_tasks = ceil(total_rows / tile_rows)
约等于
K * num_processors
。同一个任务体仍可工作 — 二分查找会为每个文件匹配更多任务。

Other formats

其他格式

Only the per-file reader inside
load_tile
changes. The reader's contract: given a file path and a half-open row range
[file_lo, file_hi)
along axis 0, return a numpy array of shape
(file_hi - file_lo,) + trailing_shape
that can be made C-contiguous. Cheap range/slice reads are required — formats that only support "read the whole file" defeat the partial-overlap case (a tile that covers only part of one file).
FormatReader inside the leaf task
.npy
(worked example)
host = np.ascontiguousarray(np.load(p, mmap_mode="r")[file_lo:file_hi])
Raw binary (fixed-shape)
arr = np.memmap(p, dtype=DTYPE, mode="r", shape=(rows_in_file, *trailing_shape)); host = np.ascontiguousarray(arr[file_lo:file_hi])
HDF5
with h5py.File(p, "r") as f: host = np.ascontiguousarray(f["data"][file_lo:file_hi])
Parquet / Arrow
tbl = pq.read_table(p, columns=..., use_threads=False).slice(file_lo, file_hi - file_lo); host = tbl.to_pandas().values
(For built-in single-call loaders per format, see the "Why this skill exists" table at the top of this file.)
The discovery step (step 1) parses each format's metadata:
.npy
/ HDF5 / Parquet all carry per-file row count + dtype on disk. Raw binary doesn't — sidecar or derive from file size.
仅需修改
load_tile
内的单文件读取器。读取器的约定:给定文件路径和沿轴0的半开行范围
[file_lo, file_hi)
,返回形状为
(file_hi - file_lo,) + trailing_shape
的numpy数组,且该数组可转换为C连续数组。需要支持低成本的范围/切片读取 — 仅支持“读取整个文件”的格式无法处理部分重叠的情况(分片仅覆盖文件的一部分)。
格式叶子任务内的读取器
.npy
(示例)
host = np.ascontiguousarray(np.load(p, mmap_mode="r")[file_lo:file_hi])
原始二进制(固定形状)
arr = np.memmap(p, dtype=DTYPE, mode="r", shape=(rows_in_file, *trailing_shape)); host = np.ascontiguousarray(arr[file_lo:file_hi])
HDF5
with h5py.File(p, "r") as f: host = np.ascontiguousarray(f["data"][file_lo:file_hi])
Parquet / Arrow
tbl = pq.read_table(p, columns=..., use_threads=False).slice(file_lo, file_hi - file_lo); host = tbl.to_pandas().values
(各格式的内置单次调用加载器见本文开头“该技能的存在意义”中的表格。)
检测步骤(步骤1)解析每种格式的元数据:
.npy
/HDF5/Parquet均在磁盘上存储了每个文件的行数和dtype。原始二进制没有 — 需通过辅助文件或文件大小推导。

Common pitfalls

常见陷阱

cn.asarray(dst)
is illegal in a leaf task

叶子任务中使用
cn.asarray(dst)
非法

Inside a
@task
body, any cupynumeric op that touches the top-level runtime —
cn.asarray(store)
, slice assignment
cn_dst[s] = host_np
— triggers
create_index_space
from the wrong context and Legion aborts:
LEGION API USAGE EXCEPTION: Invalid task context passed to runtime call
create_index_space
Fix: consume the DLPack capsule with a third-party library (cupy / torch / numpy) inside leaf tasks.
cn.asarray
is fine in the driver, just not in leaf tasks. See
examples/dlpack/leaf_task_interop.py
for the torch-flavoured workaround.
@task
体内,任何触及顶层runtime的cupynumeric操作 — 如
cn.asarray(store)
、切片赋值
cn_dst[s] = host_np
— 都会在错误的上下文中触发
create_index_space
,导致Legion终止:
LEGION API USAGE EXCEPTION: Invalid task context passed to runtime call
create_index_space
修复方法:在叶子任务中使用第三方库(cupy/torch/numpy)消费DLPack胶囊。
cn.asarray
在驱动中使用是安全的,但不能在叶子任务中使用。可参考
examples/dlpack/leaf_task_interop.py
中的torch版本解决方案。

In-task
assert
aborts the runtime

任务内的
assert
会终止runtime

Legate treats unraised exceptions in a
@task
as a contract violation and aborts unless the task was registered with
throws_exception()
. Sanity-check on the host before launching.
Legate将
@task
中未捕获的异常视为违反约定,除非任务注册了
throws_exception()
,否则会终止runtime。请在启动前在主机端进行完整性检查。

Launch domain must match the partition tile count

启动域必须与分区分片数量匹配

create_manual_task(launch_shape=...)
and
partition_by_tiling(...)
are independent — the runtime doesn't catch a mismatch. Larger launch domain → out-of-range tiles; smaller → unwritten tiles. Always derive both from the same
(total_rows, tile_rows)
via two separate
ceil
divisions (sizing the launch domain to
num_processors
directly would over-launch when
num_processors > total_rows
):
python
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
num_tasks = (total_rows + tile_rows - 1) // tile_rows
partition = ...partition_by_tiling((tile_rows,) + trailing_shape)
runtime.create_manual_task(load_tile.library, load_tile.task_id, (num_tasks,))
create_manual_task(launch_shape=...)
partition_by_tiling(...)
是独立的 — runtime不会检测不匹配的情况。启动域过大 → 分片越界;启动域过小 → 部分分片未被写入。始终通过两次单独的向上取整运算,从相同的
(total_rows, tile_rows)
推导两者(直接根据
num_processors
设置启动域会在
num_processors > total_rows
时过度启动):
python
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
num_tasks = (total_rows + tile_rows - 1) // tile_rows
partition = ...partition_by_tiling((tile_rows,) + trailing_shape)
runtime.create_manual_task(load_tile.library, load_tile.task_id, (num_tasks,))