cupynumeric-parallel-data-load
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseParallel sharded data -> cupynumeric load
并行分片数据加载至cupynumeric
Why this skill exists. cupynumeric mirrors NumPy's array API,
including for a single file. Beyond that,
file loading lives in Legate, not cupynumeric:
cupynumeric.load.npy| Format | Built-in loader |
|---|---|
Single | |
| HDF5 (single file) | |
| Sharded multi-file (any format), Parquet/Arrow, raw binary, custom layouts | No built-in loader — this skill. |
This skill shows the canonical way to fill the gap in the last row:
write a Legate Python task that calls the third-party reader the
format needs (, , , ...) inside the
task body, and let Legate distribute the reads across GPUs / nodes.
For the formats with a built-in loader, prefer it unless you need a
custom in-task body (mmap-based loader, format-specific decoder,
sidecar metadata, partial / sharded reads).
h5pypyarrownp.memmapCanonical pattern: manual partition + manual task launch, sized to
the machine, not the files. Only axis 0 is sharded; trailing axes
ride along inside each tile. Per-shard row counts may differ across
files (only and trailing axes must match); the launch fills
every available processor regardless of how many files there are.
dtype.npyassets/examples/parallel_npy_load.py该技能的存在意义。cupynumeric镜像了NumPy的数组API,包括用于单个文件的。除此之外,文件加载功能属于Legate而非cupynumeric:
.npycupynumeric.load| 格式 | 内置加载器 |
|---|---|
单个 | |
| HDF5(单个文件) | |
| 分片多文件(任意格式)、Parquet/Arrow、原始二进制、自定义布局 | 无内置加载器 — 需使用本技能 |
本技能展示了填补最后一项空白的标准方法:编写一个Legate Python任务,在任务体内调用对应格式所需的第三方读取器(如、、等),让Legate在GPU/节点间分布式执行读取操作。对于有内置加载器的格式,除非你需要自定义任务体(如基于内存映射的加载器、格式特定解码器、辅助元数据、部分/分片读取),否则优先使用内置加载器。
h5pypyarrownp.memmap标准模式:手动分区 + 手动任务启动,根据机器规模而非文件数量调整大小。仅对轴0进行分片;后续轴保持在每个分片内。各分片文件的行数可以不同(仅和后续轴必须匹配);无论文件数量多少,启动操作都会利用所有可用处理器。
dtype.npyassets/examples/parallel_npy_load.pyData layout assumption
数据布局假设
This skill is purely about loading — it assumes the data is already
laid out on a shared filesystem in some predictable, indexable way.
Producing those files is out of scope (the example ships a
subcommand for convenience, but real users bring their own).
writeThe worked example assumes one specific layout:
- A directory containing files named ,
shard_0000.npy, ... in a contiguous integer sequence (zero-padded width 4).shard_0001.npy - All shards share the same and the same trailing axes (
dtype); axis 0 (rows per shard) may differ across files — the recipe builds a cumulative row-offset table and reads each file's overlapping slice from inside the leaf task.shape[1:] - The directory is visible to every rank (shared filesystem for multi-node runs).
The example's prints what it found and hard-fails
with a descriptive error when the layout is wrong (missing directory,
no shards, mismatched / trailing axes, or a hole in the
contiguous sequence).
discover_layout()dtypeshard_NNNN.npyIf your data lives in a different layout — fixed-stride raw binary, an
HDF5 file with one dataset per shard, a directory tree, ... — only the
glob pattern, the per-file reader (step 4 below), and the metadata
discovery (step 1 below) change. The partitioning and launch machinery
is layout-agnostic.
本技能仅关注加载环节 — 假设数据已按可预测、可索引的方式存储在共享文件系统中。生成这些文件的过程不在本技能范围内(示例中提供了子命令以方便使用,但实际用户需自行生成)。
write示例采用以下特定布局:
- 目录包含命名为、
shard_0000.npy……的文件,文件名采用连续整数序列(补零至4位宽度)。shard_0001.npy - 所有分片共享相同的和后续轴(
dtype);轴0(每个分片的行数)可在不同文件间不同 — 本方案会构建累积行偏移表,并在叶子任务中读取每个文件的重叠切片。shape[1:] - 该目录对所有rank可见(多节点运行时需使用共享文件系统)。
示例中的会打印检测到的布局信息,当布局不符合要求时(如目录缺失、无分片文件、/后续轴不匹配、序列不连续),会抛出明确的错误信息。
discover_layout()dtypeshard_NNNN.npy如果你的数据采用其他布局 — 固定步长的原始二进制、每个分片对应一个数据集的HDF5文件、目录树结构等 — 仅需修改全局匹配模式、每个文件的读取器(下文步骤4)和元数据检测(下文步骤1)。分区和启动机制与布局无关。
When to use
使用场景
See the format table above for the routing decision (built-in loader
vs. this skill). Beyond that, two additional cues that this skill is
the right fit:
- Replacing sequential with parallel per-GPU reads.
np.concatenate([read(f) for f in files]) - Demonstrating how a user-defined Legate Python task writes into a cupynumeric output array via a manual launch.
参考上方的格式表格来决定使用内置加载器还是本技能。除此之外,以下两种情况也适合使用本技能:
- 用并行的GPU读取替代串行的。
np.concatenate([read(f) for f in files]) - 演示如何通过手动启动,让用户自定义的Legate Python任务写入cupynumeric输出数组。
Examples
示例
Paths below are written relative to this skill's directory (the script
ships at ). Adjust the prefix to
match wherever your skill is installed (e.g.
if the skill lives
under a top-level directory).
assets/examples/parallel_npy_load.pyskills/cupynumeric-parallel-data-load/assets/...skills/bash
undefined以下路径均相对于本技能的目录(脚本位于)。请根据技能安装位置调整前缀(例如,如果技能位于顶级目录下,则路径为)。
assets/examples/parallel_npy_load.pyskills/skills/cupynumeric-parallel-data-load/assets/...bash
undefinedSingle-node, 4 GPUs.
单节点,4块GPU
legate --gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo
```bashlegate --gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo
```bashMulti-node, 2 nodes x 4 GPUs (slurm), shared filesystem at --shard-dir.
多节点,2节点×4块GPU(使用slurm),共享文件系统路径为--shard-dir
Generate the shards once on rank 0, then re-run read
at any scale.
read先在rank 0上生成分片,之后可在任意规模下重新运行read
命令
readlegate --launcher srun --nodes 2 --cpus 1
assets/examples/parallel_npy_load.py
write --shard-dir /shared/scratch/demo
assets/examples/parallel_npy_load.py
write --shard-dir /shared/scratch/demo
legate --launcher srun --nodes 2 --ranks-per-node 4
--gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo
--gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo
No layout flags — the read driver walks every `.npy` header to recover
per-file row counts, the trailing shape, and the dtype, then derives
`tile_rows` from the available processor count.
`--min-gpu-chunk 1` is only needed when the per-tile element count is
below Legate's default minimum chunk size for GPU launches (e.g. the
worked example's defaults — total rows split across 4 GPUs at
`~1M` per tile — fall below the threshold and would otherwise be
folded onto a single GPU). For production-sized datasets (tens of
millions of elements per tile or larger) you can drop the flag and
let Legate use its default. Bumping it to a moderate value (e.g.
`--min-gpu-chunk 1024`) is fine when each tile is large enough that
per-task overhead matters more than getting *every* GPU a tile.legate --launcher srun --nodes 2 --cpus 1
assets/examples/parallel_npy_load.py
write --shard-dir /shared/scratch/demo
assets/examples/parallel_npy_load.py
write --shard-dir /shared/scratch/demo
legate --launcher srun --nodes 2 --ranks-per-node 4
--gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo
--gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo
无需布局标志 — 读取驱动会遍历每个`.npy`文件的头部,恢复每个文件的行数、后续形状和dtype,然后根据可用处理器数量计算`tile_rows`。
`--min-gpu-chunk 1`仅在每个分片的元素数量低于Legate默认的GPU启动最小块大小时需要(例如,示例默认设置 — 总行数拆分到4块GPU,每个分片约1M元素 — 低于阈值,否则会被合并到单块GPU上)。对于生产级数据集(每个分片包含数千万或更多元素),可以省略该标志,让Legate使用默认值。当每个分片足够大,任务开销比利用每一块GPU更重要时,可将该值调整为适中的数值(如`--min-gpu-chunk 1024`)。Instructions
操作步骤
Five steps from a worked example; only step 1 (parsing the
format header) and step 4 (the per-file reader inside the task body)
are format-specific. The other three (allocate destination, partition,
fence) are reused unchanged across formats — see "Other formats" below
for the swap-points.
.npy以下是基于文件的五个步骤;仅步骤1(解析格式头部)和步骤4(任务体内的单文件读取器)与格式相关。其他三个步骤(分配目标存储、分区、栅栏)可在所有格式中复用 — 见下文“其他格式”中的替换点。
.npy1. Read the metadata from every shard
1. 读取所有分片的元数据
Scan the directory and peek at every header (
reads only the header). The header carries the per-shard shape and
dtype, so the driver can recover total rows, trailing shape, and a
cumulative row-offset table without ever loading the data:
.npymmap_mode="r"python
paths = sorted(SHARD_DIR.glob("shard_*.npy"))
per_file_rows = [] # rows along axis 0 per file
trailing_shape = None # shape[1:], must match across files
dtype = None
for p in paths:
hdr = np.load(p, mmap_mode="r")
if trailing_shape is None:
trailing_shape = tuple(hdr.shape[1:])
dtype = hdr.dtype
elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
raise RuntimeError(
f"{p.name}: trailing shape / dtype mismatch "
f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
)
per_file_rows.append(int(hdr.shape[0]))
cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64) # length N+1
total_rows = int(cum_rows[-1])The snippet above enforces matching and (i.e.
) across files. Per-shard row counts may differ — the
cum-rows table handles that. Production code should also verify that
names form a contiguous sequence
(omitted from the snippet for brevity; see in the
worked example). Discovery relies only on what the
on-disk format itself exposes (the header here, /
for HDF5, etc.); any sidecar (manifest, content hashes) is a
separate verification step on top.
dtypetrailing_shapeshape[1:]shard_0000.npy ... shard_NNNN.npydiscover_layout().npy.shape.dtype扫描目录并查看每个文件的头部(仅读取头部)。头部包含每个分片的形状和dtype,因此驱动无需加载数据即可恢复总行数、后续形状和累积行偏移表:
.npymmap_mode="r"python
paths = sorted(SHARD_DIR.glob("shard_*.npy"))
per_file_rows = [] # 每个文件沿轴0的行数
trailing_shape = None # shape[1:],所有文件必须匹配
dtype = None
for p in paths:
hdr = np.load(p, mmap_mode="r")
if trailing_shape is None:
trailing_shape = tuple(hdr.shape[1:])
dtype = hdr.dtype
elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
raise RuntimeError(
f"{p.name}: trailing shape / dtype mismatch "
f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
)
per_file_rows.append(int(hdr.shape[0]))
cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64) # 长度为N+1
total_rows = int(cum_rows[-1])上述代码片段强制要求所有文件的和(即)匹配。每个分片的行数可以不同 — 累积行表会处理这种情况。生产环境代码还应验证文件名是否形成连续的序列(为简洁起见,该部分从代码片段中省略;可参考示例中的)。元数据检测仅依赖磁盘格式本身暴露的信息(此处为头部,HDF5为/等);任何辅助文件(清单、内容哈希)都是额外的验证步骤。
dtypetrailing_shapeshape[1:]shard_0000.npy ... shard_NNNN.npydiscover_layout().npy.shape.dtype2. Create the cupynumeric output store from the metadata
2. 根据元数据创建cupynumeric输出存储
The total array spans along axis 0; trailing axes come
from unchanged. Use — the task overwrites
every cell, zero-init would be wasted.
total_rowstrailing_shapecn.emptypython
import cupynumeric as cn
total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)总数组沿轴0的长度为;后续轴与保持一致。使用 — 任务会覆盖所有单元格,零初始化会造成浪费。
total_rowstrailing_shapecn.emptypython
import cupynumeric as cn
total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)3. Tile the store by processor count
3. 根据处理器数量分片存储
The launch shape is sized to the available processors, not to the
file count. Pick and
partition axis 0 by that tile size. Trailing axes are not partitioned
(tile spans the full extent there). The last tile is allowed to be
short — that's exactly what supports — so the
recipe needs no divisibility constraint.
tile_rows = ceil(total_rows / num_processors)partition_by_tilingpython
from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array
runtime = get_legate_runtime()
machine = runtime.get_machine()
num_processors = max(
machine.count(TaskTarget.GPU),
machine.count(TaskTarget.OMP),
machine.count(TaskTarget.CPU),
1,
)
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
tile_shape = (tile_rows,) + trailing_shape
partition = as_logical_array(out).data.partition_by_tiling(tile_shape)
num_tasks = (total_rows + tile_rows - 1) // tile_rows # match partition tile count启动规模根据可用处理器数量而非文件数量确定。设置,并按该分片大小对轴0进行分区。后续轴不分区(分片覆盖整个范围)。允许最后一个分片长度较短 — 这正是支持的功能 — 因此本方案无需整除约束。
tile_rows = ceil(total_rows / num_processors)partition_by_tilingpython
from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array
runtime = get_legate_runtime()
machine = runtime.get_machine()
num_processors = max(
machine.count(TaskTarget.GPU),
machine.count(TaskTarget.OMP),
machine.count(TaskTarget.CPU),
1,
)
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
tile_shape = (tile_rows,) + trailing_shape
partition = as_logical_array(out).data.partition_by_tiling(tile_shape)
num_tasks = (total_rows + tile_rows - 1) // tile_rows # 与分区分片数量匹配4. Define the leaf task and launch it manually
4. 定义叶子任务并手动启动
PATHSCUM_ROWSTILE_ROWSEach task builds its consumer view first (cupy on GPU, numpy on
CPU/OMP) and reads the tile's actual row count from
— itself has no attribute, so going through
the view is required. It then computes its global row range from its
launch coordinate and that row count, bisects for the
overlapping file(s), and copies each overlapping file slice into the
matching destination slice. Register CPU, OMP, and GPU variants so
the same launch runs unchanged anywhere; dispatch on
picks the consumer matching where the
is resident ( for FBMEM,
for SYSMEM). cupy is imported inside the GPU
branch only, so the task body loads on machines without cupy.
view.shape[0]PhysicalStore.shapecum_rowsctx.get_variant_kind()OutputStorecp.from_dlpack(dst)np.asarray(dst)python
import bisect
from legate.core import TaskContext, VariantCode
from legate.core.task import OutputStore, task
@task(variants=(VariantCode.CPU, VariantCode.OMP, VariantCode.GPU))
def load_tile(ctx: TaskContext, dst: OutputStore) -> None:
t = ctx.task_index[0] # tile index 0..num_tasks-1
variant = ctx.get_variant_kind()
if variant == VariantCode.GPU:
import cupy as cp # lazy: only on GPU
view = cp.from_dlpack(dst)
else:
view = np.asarray(dst) # zero-copy numpy view
tile_rows_actual = view.shape[0] # short on the last tile
row_start = t * TILE_ROWS # global axis-0 start
row_end = row_start + tile_rows_actual
# Find the half-open range of file indices that overlap [row_start, row_end).
first_file = bisect.bisect_right(CUM_ROWS, row_start) - 1
last_file = bisect.bisect_right(CUM_ROWS, row_end - 1) - 1
for f in range(first_file, last_file + 1):
# Intersection of tile [row_start, row_end) with file [cum[f], cum[f+1]).
lo = max(row_start, int(CUM_ROWS[f]))
hi = min(row_end, int(CUM_ROWS[f + 1]))
file_lo = lo - int(CUM_ROWS[f])
file_hi = hi - int(CUM_ROWS[f])
dst_lo = lo - row_start
dst_hi = hi - row_start
chunk = np.ascontiguousarray(
np.load(PATHS[f], mmap_mode="r")[file_lo:file_hi]
)
if variant == VariantCode.GPU:
view[dst_lo:dst_hi].set(chunk) # cudaMemcpyAsync H2D
else:
view[dst_lo:dst_hi] = chunk # zero-copy numpy write
manual_task = runtime.create_manual_task(
load_tile.library,
load_tile.task_id,
(num_tasks,), # launch domain == tile count
)
manual_task.add_output(partition)
manual_task.execute()Both consumers go through 's native producers
( for cupy, for ) —
zero-copy views of the local tile. Bisect cost is
and the inner loop typically iterates 1–2 times (tiles overlap at
most a couple of files).
PhysicalStore__dlpack____array_interface__np.asarrayO(log num_shards)PATHSCUM_ROWSTILE_ROWS每个任务首先构建其消费者视图(GPU上使用cupy,CPU/OMP上使用numpy),并从读取分片的实际行数 — 本身没有属性,因此必须通过视图获取。然后根据启动坐标和行数计算全局行范围,在中二分查找重叠的文件,并将每个重叠的文件切片复制到对应的目标切片中。注册CPU、OMP和GPU变体,以便同一启动操作可在任意环境中运行;通过选择与所在位置匹配的消费者(FBMEM使用,SYSMEM使用)。仅在GPU分支中导入cupy,因此任务体可在没有cupy的机器上运行。
view.shape[0]PhysicalStore.shapecum_rowsctx.get_variant_kind()OutputStorecp.from_dlpack(dst)np.asarray(dst)python
import bisect
from legate.core import TaskContext, VariantCode
from legate.core.task import OutputStore, task
@task(variants=(VariantCode.CPU, VariantCode.OMP, VariantCode.GPU))
def load_tile(ctx: TaskContext, dst: OutputStore) -> None:
t = ctx.task_index[0] # 分片索引0..num_tasks-1
variant = ctx.get_variant_kind()
if variant == VariantCode.GPU:
import cupy as cp # 延迟导入:仅在GPU环境
view = cp.from_dlpack(dst)
else:
view = np.asarray(dst) # 零拷贝numpy视图
tile_rows_actual = view.shape[0] # 最后一个分片可能较短
row_start = t * TILE_ROWS # 全局轴0起始位置
row_end = row_start + tile_rows_actual
# 查找与[row_start, row_end)重叠的文件索引的半开范围
first_file = bisect.bisect_right(CUM_ROWS, row_start) - 1
last_file = bisect.bisect_right(CUM_ROWS, row_end - 1) - 1
for f in range(first_file, last_file + 1):
# 分片[row_start, row_end)与文件[cum[f], cum[f+1])的交集
lo = max(row_start, int(CUM_ROWS[f]))
hi = min(row_end, int(CUM_ROWS[f + 1]))
file_lo = lo - int(CUM_ROWS[f])
file_hi = hi - int(CUM_ROWS[f])
dst_lo = lo - row_start
dst_hi = hi - row_start
chunk = np.ascontiguousarray(
np.load(PATHS[f], mmap_mode="r")[file_lo:file_hi]
)
if variant == VariantCode.GPU:
view[dst_lo:dst_hi].set(chunk) # cudaMemcpyAsync H2D
else:
view[dst_lo:dst_hi] = chunk # 零拷贝numpy写入
manual_task = runtime.create_manual_task(
load_tile.library,
load_tile.task_id,
(num_tasks,), # 启动域 == 分片数量
)
manual_task.add_output(partition)
manual_task.execute()两种消费者均通过的原生生产者(cupy使用,使用)获取分片的零拷贝视图。二分查找的时间复杂度为,内部循环通常迭代1-2次(分片最多与几个文件重叠)。
PhysicalStore__dlpack__np.asarray__array_interface__O(log num_shards)5. Fence and verify
5. 栅栏与验证
python
get_legate_runtime().issue_execution_fence(block=True)python
get_legate_runtime().issue_execution_fence(block=True)Hard constraints
硬性约束
-
All shards must shareand trailing axes (
dtype). The recipe stacks shards along axis 0; the destination's trailing axes come fromshape[1:], which the discovery step locks to the value of the first file. Per-shard row counts (trailing_shape) may freely differ — the cumulative-offset table handles them. The example rejects any shard whoseshape[0]or trailing shape differs from the first one with a descriptive error.dtype -
Pick the consumer that matches the variant.rejects SYSMEM-resident stores;
cp.from_dlpacksilently returns a host view of an FBMEM-resident store you can't actually write through. Dispatch onnp.asarrayso each variant uses its own consumer — see step 4.ctx.get_variant_kind() -
mmap views aren't always C-contiguous — wrap each per-file slice withbefore
np.ascontiguousarray(arr[file_lo:file_hi])or the numpy in-place write..set() -
Multi-node:must be on a shared filesystem. Every worker (on every rank) opens shards by path; node-local
SHARD_DIRpaths only work for single-node demos./tmp
-
所有分片必须共享和后续轴(
dtype)。本方案沿轴0堆叠分片;目标的后续轴来自shape[1:],检测步骤会将其锁定为第一个文件的值。每个分片的行数(trailing_shape)可自由不同 — 累积偏移表会处理这种情况。示例会拒绝任何shape[0]或后续形状与第一个文件不同的分片,并抛出明确的错误信息。dtype -
选择与变体匹配的消费者。会拒绝驻留在SYSMEM的存储;
cp.from_dlpack会静默返回驻留在FBMEM的存储的主机视图,无法实际写入。通过np.asarray进行分发,让每个变体使用对应的消费者 — 见步骤4。ctx.get_variant_kind() -
内存映射视图并非始终是C连续的 — 在或numpy原地写入之前,用
.set()包装每个文件切片。np.ascontiguousarray(arr[file_lo:file_hi]) -
多节点场景:必须位于共享文件系统。每个工作节点(所有rank)通过路径打开分片;节点本地的
SHARD_DIR路径仅适用于单节点演示。/tmp
Variants
变体
Uniform-shard fast path (one task per file)
统一分片快速路径(每个文件对应一个任务)
When every shard already has the same and you happen
to have processors available, the cum-rows / bisect
machinery is overhead. Set and
; the partition then has one tile per file
and each task reads exactly one file end-to-end (no bisect, no inner
loop). The driver-side switch is a one-liner:
(shape, dtype)num_shardstile_rows = shard_shape[0]num_tasks = num_shardspython
if all(r == per_file_rows[0] for r in per_file_rows) and num_shards == num_processors:
tile_rows = per_file_rows[0]
else:
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)The same task body still works in either mode — the inner
loop just happens to iterate exactly once per task. There's no need
for a separate task body for the fast path.
load_tile当所有分片的都相同,且恰好有个可用处理器时,累积行/二分查找机制会产生额外开销。设置和;此时分区每个文件对应一个分片,每个任务读取整个文件(无需二分查找,无内部循环)。驱动端只需一行代码即可切换:
(shape, dtype)num_shardstile_rows = shard_shape[0]num_tasks = num_shardspython
if all(r == per_file_rows[0] for r in per_file_rows) and num_shards == num_processors:
tile_rows = per_file_rows[0]
else:
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)同一个任务体在两种模式下均可工作 — 内部循环恰好每个任务迭代一次。无需为快速路径编写单独的任务体。
load_tileOver-decompose for better load balancing
过度分解以实现更好的负载均衡
The default gives one
tile per processor. To over-decompose by a factor (smaller tiles,
more point tasks, finer-grained queueing), divide by
instead:
tile_rows = ceil(total_rows / num_processors)KK * num_processorspython
tile_rows = max(1, (total_rows + K * num_processors - 1) // (K * num_processors))num_tasks = ceil(total_rows / tile_rows)K * num_processors默认的为每个处理器分配一个分片。要按因子过度分解(更小的分片、更多的点任务、更细粒度的队列),改为除以:
tile_rows = ceil(total_rows / num_processors)KK * num_processorspython
tile_rows = max(1, (total_rows + K * num_processors - 1) // (K * num_processors))此时约等于。同一个任务体仍可工作 — 二分查找会为每个文件匹配更多任务。
num_tasks = ceil(total_rows / tile_rows)K * num_processorsOther formats
其他格式
Only the per-file reader inside changes. The reader's
contract: given a file path and a half-open row range
along axis 0, return a numpy array of shape
that can be made C-contiguous.
Cheap range/slice reads are required — formats that only support
"read the whole file" defeat the partial-overlap case (a tile that
covers only part of one file).
load_tile[file_lo, file_hi)(file_hi - file_lo,) + trailing_shape| Format | Reader inside the leaf task |
|---|---|
| |
| Raw binary (fixed-shape) | |
| HDF5 | |
| Parquet / Arrow | |
(For built-in single-call loaders per format, see the "Why this skill
exists" table at the top of this file.)
The discovery step (step 1) parses each format's metadata: /
HDF5 / Parquet all carry per-file row count + dtype on disk.
Raw binary doesn't — sidecar or derive from file size.
.npy仅需修改内的单文件读取器。读取器的约定:给定文件路径和沿轴0的半开行范围,返回形状为的numpy数组,且该数组可转换为C连续数组。需要支持低成本的范围/切片读取 — 仅支持“读取整个文件”的格式无法处理部分重叠的情况(分片仅覆盖文件的一部分)。
load_tile[file_lo, file_hi)(file_hi - file_lo,) + trailing_shape| 格式 | 叶子任务内的读取器 |
|---|---|
| |
| 原始二进制(固定形状) | |
| HDF5 | |
| Parquet / Arrow | |
(各格式的内置单次调用加载器见本文开头“该技能的存在意义”中的表格。)
检测步骤(步骤1)解析每种格式的元数据:/HDF5/Parquet均在磁盘上存储了每个文件的行数和dtype。原始二进制没有 — 需通过辅助文件或文件大小推导。
.npyCommon pitfalls
常见陷阱
cn.asarray(dst)
is illegal in a leaf task
cn.asarray(dst)叶子任务中使用cn.asarray(dst)
非法
cn.asarray(dst)Inside a body, any cupynumeric op that touches the top-level
runtime — , slice assignment —
triggers from the wrong context and Legion aborts:
@taskcn.asarray(store)cn_dst[s] = host_npcreate_index_spaceLEGION API USAGE EXCEPTION: Invalid task context passed to runtime call
create_index_spaceFix: consume the DLPack capsule with a third-party library (cupy /
torch / numpy) inside leaf tasks. is fine in the driver,
just not in leaf tasks. See for
the torch-flavoured workaround.
cn.asarrayexamples/dlpack/leaf_task_interop.py在体内,任何触及顶层runtime的cupynumeric操作 — 如、切片赋值 — 都会在错误的上下文中触发,导致Legion终止:
@taskcn.asarray(store)cn_dst[s] = host_npcreate_index_spaceLEGION API USAGE EXCEPTION: Invalid task context passed to runtime call
create_index_space修复方法:在叶子任务中使用第三方库(cupy/torch/numpy)消费DLPack胶囊。在驱动中使用是安全的,但不能在叶子任务中使用。可参考中的torch版本解决方案。
cn.asarrayexamples/dlpack/leaf_task_interop.pyIn-task assert
aborts the runtime
assert任务内的assert
会终止runtime
assertLegate treats unraised exceptions in a as a contract violation
and aborts unless the task was registered with .
Sanity-check on the host before launching.
@taskthrows_exception()Legate将中未捕获的异常视为违反约定,除非任务注册了,否则会终止runtime。请在启动前在主机端进行完整性检查。
@taskthrows_exception()Launch domain must match the partition tile count
启动域必须与分区分片数量匹配
create_manual_task(launch_shape=...)partition_by_tiling(...)(total_rows, tile_rows)ceilnum_processorsnum_processors > total_rowspython
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
num_tasks = (total_rows + tile_rows - 1) // tile_rows
partition = ...partition_by_tiling((tile_rows,) + trailing_shape)
runtime.create_manual_task(load_tile.library, load_tile.task_id, (num_tasks,))create_manual_task(launch_shape=...)partition_by_tiling(...)(total_rows, tile_rows)num_processorsnum_processors > total_rowspython
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
num_tasks = (total_rows + tile_rows - 1) // tile_rows
partition = ...partition_by_tiling((tile_rows,) + trailing_shape)
runtime.create_manual_task(load_tile.library, load_tile.task_id, (num_tasks,))