# Parallel Processing Patterns in Bash (2025)
## Overview

A comprehensive guide to parallel and concurrent execution in Bash, covering GNU Parallel, xargs parallelization, job control, worker pools, and modern async patterns for maximum performance.
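The mechanism every pattern below builds on can be shown in a few lines: fork independent tasks into the background with `&`, then block on `wait`. A minimal sketch:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Four tasks run concurrently; total wall time is roughly one task's
# duration (0.2s) rather than the 0.8s a sequential loop would take.
task() { sleep 0.2; echo "task $1 done"; }

for i in 1 2 3 4; do
  task "$i" &   # fork into the background
done
wait            # block until every background task has finished
echo "all tasks finished"
```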
## GNU Parallel (Recommended)

### Installation
```bash
# Debian/Ubuntu
sudo apt-get install parallel

# macOS
brew install parallel

# From source
wget https://ftp.gnu.org/gnu/parallel/parallel-latest.tar.bz2
tar -xjf parallel-latest.tar.bz2
cd parallel-*
./configure && make && sudo make install
```
### Basic Usage
```bash
#!/usr/bin/env bash
set -euo pipefail

# Process multiple files in parallel
parallel gzip ::: *.txt
# Equivalent to: for f in *.txt; do gzip "$f"; done
# But runs in parallel!

# Using find with parallel
find . -name "*.jpg" | parallel convert {} -resize 50% resized/{}

# Specify number of jobs
parallel -j 8 process_file ::: *.dat

# From stdin
cat urls.txt | parallel -j 10 wget -q

# Multiple inputs
parallel echo ::: A B C ::: 1 2 3
# Output: A 1, A 2, A 3, B 1, B 2, B 3, C 1, C 2, C 3

# Paired inputs with :::+
parallel echo ::: A B C :::+ 1 2 3
# Output: A 1, B 2, C 3
```
### Input Handling
```bash
#!/usr/bin/env bash
set -euo pipefail

# Input from file
parallel -a input.txt process_line

# Multiple input files
parallel -a file1.txt -a file2.txt 'echo {1} {2}'

# Column-based input
cat data.tsv | parallel --colsep '\t' 'echo Name: {1}, Value: {2}'

# Named columns
cat data.csv | parallel --header : --colsep ',' 'echo {name}: {value}'

# Null-delimited for safety with special characters
find . -name "*.txt" -print0 | parallel -0 wc -l

# Line-based chunking
cat huge_file.txt | parallel --pipe -N1000 'wc -l'
```
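Null-delimiting matters because filenames may contain spaces or even newlines. A small self-contained demo (using a throwaway temp directory) of the pure-bash counterpart to `parallel -0` / `xargs -0`, pairing `find -print0` with a null-delimited read loop:

```bash
#!/usr/bin/env bash
set -euo pipefail

# A filename containing a space breaks naive word-splitting loops like
# `for f in $(find ...)`, but survives null-delimited handling.
dir=$(mktemp -d)
trap 'rm -rf "$dir"' EXIT
printf 'hello\n' > "$dir/plain.txt"
printf 'world\n' > "$dir/with space.txt"

count=0
while IFS= read -r -d '' f; do
  count=$((count + 1))      # each file is seen exactly once, space and all
done < <(find "$dir" -name '*.txt' -print0)
echo "files seen: $count"
```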
### Replacement Strings
```bash
#!/usr/bin/env bash
set -euo pipefail

# {} - Full input
parallel echo 'Processing: {}' ::: file1.txt file2.txt

# {.} - Remove extension
parallel echo '{.}' ::: file.txt file.csv
# Output: file, file

# {/} - Basename
parallel echo '{/}' ::: /path/to/file.txt
# Output: file.txt

# {//} - Directory path
parallel echo '{//}' ::: /path/to/file.txt
# Output: /path/to

# {/.} - Basename without extension
parallel echo '{/.}' ::: /path/to/file.txt
# Output: file

# {#} - Job number (1-based)
parallel echo 'Job {#}: {}' ::: A B C

# {%} - Slot number (recycled job slot)
parallel -j 2 'echo "Slot {%}: {}"' ::: A B C D E

# Combined
parallel 'convert {} -resize 50% {//}/thumb_{/.}.jpg' ::: *.png
```
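When GNU Parallel is not available, the same path transformations exist as plain bash parameter expansions, which is a useful mapping to keep in mind:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Bash parameter expansions that mirror GNU Parallel's replacement strings:
p=/path/to/file.txt
echo "${p%.*}"              # {.}  -> /path/to/file   (strip extension)
echo "${p##*/}"             # {/}  -> file.txt        (basename)
echo "${p%/*}"              # {//} -> /path/to        (directory)
b=${p##*/}; echo "${b%.*}"  # {/.} -> file            (basename, no extension)
```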
### Progress and Logging
```bash
#!/usr/bin/env bash
set -euo pipefail

# Show progress bar
parallel --bar process_item ::: {1..100}

# Progress with ETA
parallel --progress process_item ::: {1..100}

# Verbose output
parallel --verbose gzip ::: *.txt

# Log to file
parallel --joblog jobs.log gzip ::: *.txt

# Resume from where it left off (skip completed jobs)
parallel --joblog jobs.log --resume gzip ::: *.txt

# Results logging
parallel --results results_dir 'echo {1} + {2}' ::: 1 2 3 ::: 4 5 6
# Creates: results_dir/1/4/stdout, results_dir/1/4/stderr, etc.
```
### Resource Management
```bash
#!/usr/bin/env bash
set -euo pipefail

# CPU-based parallelism (one job per core)
parallel -j "$(nproc)" process_item ::: {1..1000}

# Leave some cores free
parallel -j '-2' process_item ::: {1..1000}  # nproc - 2

# Percentage of cores
parallel -j '50%' process_item ::: {1..1000}

# Load-based throttling
parallel --load 80% process_item ::: {1..1000}

# Memory-based throttling
parallel --memfree 2G process_item ::: {1..1000}

# Rate limiting (minimum delay between job starts)
parallel -j 4 --delay 0.5 wget ::: url1 url2 url3 url4

# Timeout per job
parallel --timeout 60 long_process ::: {1..100}

# Retry failed jobs
parallel --retries 3 flaky_process ::: {1..100}
```
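`nproc` is a GNU coreutils tool and is absent on stock macOS/BSD, where the equivalent is `sysctl -n hw.ncpu`. A portable detection sketch for scripts that must choose a `-j` value on either platform:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Portable core-count detection: try nproc (GNU), fall back to sysctl (BSD/macOS).
detect_cores() {
  if command -v nproc >/dev/null 2>&1; then
    nproc
  elif command -v sysctl >/dev/null 2>&1; then
    sysctl -n hw.ncpu
  else
    echo 1   # conservative fallback
  fi
}

cores=$(detect_cores)
echo "detected $cores cores"
```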
### Distributed Execution
```bash
#!/usr/bin/env bash
set -euo pipefail

# Run on multiple servers
parallel --sshloginfile servers.txt process_item ::: {1..1000}
# servers.txt format:
#   4/server1.example.com    (4 jobs on server1)
#   8/server2.example.com    (8 jobs on server2)
#   :                        (local machine)

# Transfer files before execution
parallel --sshloginfile servers.txt --transferfile {} process {} ::: *.dat

# Return results
parallel --sshloginfile servers.txt --return {.}.result process {} ::: *.dat

# Cleanup after transfer
parallel --sshloginfile servers.txt --transfer --return {.}.out --cleanup \
  'process {} > {.}.out' ::: *.dat

# Environment variables
export MY_VAR="value"
parallel --env MY_VAR --sshloginfile servers.txt 'echo $MY_VAR' ::: A B C
```
### Complex Pipelines
```bash
#!/usr/bin/env bash
set -euo pipefail

# Pipe mode - distribute stdin across workers
cat huge_file.txt | parallel --pipe -N1000 'sort | uniq -c'

# Block size for pipe mode
cat data.bin | parallel --pipe --block 10M 'process_chunk'

# Keep order of output
parallel --keep-order 'sleep $((RANDOM % 3)); echo {}' ::: A B C D E

# Group output (don't mix output from different jobs)
parallel --group 'for i in 1 2 3; do echo "Job {}: line $i"; done' ::: A B C

# Tag output with job identifier
parallel --tag 'echo "output from {}"' ::: A B C

# Ungrouped output (lines appear as soon as they are produced, possibly interleaved)
parallel --ungroup 'echo "Starting {}"; sleep 1; echo "Done {}"' ::: A B C
```
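The `--keep-order` behavior can be emulated with plain job control: point each job's output at its own file, then concatenate the files in submission order once everything has finished. A sketch:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Emulating `parallel --keep-order`: jobs finish in arbitrary order,
# but output is read back in submission order.
dir=$(mktemp -d)
trap 'rm -rf "$dir"' EXIT

items=(A B C D E)
for i in "${!items[@]}"; do
  {
    sleep "0.$((RANDOM % 3))"        # finish in arbitrary order
    echo "processed ${items[$i]}"
  } > "$dir/out.$i" &                # one output file per job, indexed
done
wait

cat "$dir"/out.{0..4}                # print in original submission order
```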
## xargs Parallelization

### Basic Parallel xargs
```bash
#!/usr/bin/env bash
set -euo pipefail

# -P for parallel jobs
find . -name "*.txt" | xargs -P 4 -I {} gzip {}

# -n for items per command
echo {1..100} | xargs -n 10 -P 4 echo "Batch:"

# Null-delimited for safety
find . -name "*.txt" -print0 | xargs -0 -P 4 -I {} process {}

# Multiple arguments per process
cat urls.txt | xargs -P 10 -n 5 wget -q

# Limit arguments per invocation (GNU long form of -n)
echo {1..1000} | xargs -P 4 --max-args=50 echo
```
### xargs with Complex Commands
```bash
#!/usr/bin/env bash
set -euo pipefail

# Use sh -c for complex commands
find . -name "*.jpg" -print0 |
  xargs -0 -P 4 -I {} sh -c 'convert "$1" -resize 50% "thumb_$(basename "$1")"' _ {}

# Multiple arguments per invocation
paste file1.txt file2.txt |
  xargs -P 4 -n 2 sh -c 'diff "$1" "$2" > "diff_$(basename "$1" .txt).patch"' _

# Process in batches (note: every invocation here writes the same archive
# name, so in practice give each batch a unique output file)
find . -name "*.log" -print0 |
  xargs -0 -P 4 -n 100 tar -czvf logs_batch.tar.gz

# With failure handling
find . -name "*.dat" -print0 |
  xargs -0 -P 4 -I {} sh -c 'process "$1" || echo "Failed: $1" >> failures.log' _ {}
```
## Job Control Patterns

### Background Job Management
```bash
#!/usr/bin/env bash
set -euo pipefail

# Track background jobs
declare -a PIDS=()

# Start jobs
for item in {1..10}; do
  process_item "$item" &
  PIDS+=($!)
done

# Wait for all
for pid in "${PIDS[@]}"; do
  wait "$pid"
done
echo "All jobs complete"

# Or wait for any one job to complete (Bash 4.3+)
wait -n
echo "At least one job complete"
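Waiting on an explicit PID also returns that job's exit status, so failures can be counted per job rather than silently lost. A self-contained sketch (the `( exit N )` subshells stand in for real work):

```bash
#!/usr/bin/env bash
set -euo pipefail

# wait "$pid" reports that job's exit status, enabling per-job accounting.
pids=()
( exit 0 ) & pids+=($!)
( exit 1 ) & pids+=($!)   # one deliberately failing job
( exit 0 ) & pids+=($!)

failed=0
for pid in "${pids[@]}"; do
  if ! wait "$pid"; then
    failed=$((failed + 1))
  fi
done
echo "$failed job(s) failed"
```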
### Job Pool with Semaphore
```bash
#!/usr/bin/env bash
set -euo pipefail

# Maximum concurrent jobs
MAX_JOBS=4

# Simple semaphore using a counter
job_count=0

run_with_limit() {
  local cmd=("$@")
  # Wait if at the limit
  while ((job_count >= MAX_JOBS)); do
    wait -n 2>/dev/null || true
    job_count=$((job_count - 1))
  done
  # Start a new job
  "${cmd[@]}" &
  # Note: ((job_count++)) would return status 1 when the count is 0
  # and trip set -e, so use plain assignment instead
  job_count=$((job_count + 1))
}

# Usage
for item in {1..20}; do
  run_with_limit process_item "$item"
done

# Wait for remaining jobs
wait
```
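`wait -n` requires Bash 4.3+. On older shells the same throttling can be done by polling the running-job list with `jobs -rp`, a widely used fallback. A runnable sketch:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Throttling without `wait -n`: poll the list of running background jobs.
MAX_JOBS=4

throttle() {
  while (( $(jobs -rp | wc -l) >= MAX_JOBS )); do
    sleep 0.1   # poll until a slot frees up
  done
}

for i in {1..10}; do
  throttle
  { sleep 0.2; echo "job $i done"; } &
done
wait
echo "all done"
```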
### FIFO-Based Job Pool
```bash
#!/usr/bin/env bash
set -euo pipefail

MAX_JOBS=4
JOB_FIFO="/tmp/job_pool_$$"

# Create job slots
mkfifo "$JOB_FIFO"
trap 'rm -f "$JOB_FIFO"' EXIT

# Initialize slots
exec 3<>"$JOB_FIFO"
for ((i=0; i<MAX_JOBS; i++)); do
  echo >&3
done

# Run with a slot
run_with_slot() {
  local cmd=("$@")
  read -u 3   # Acquire slot (blocks if none available)
  {
    "${cmd[@]}"
    echo >&3  # Release slot
  } &
}

# Usage
for item in {1..20}; do
  run_with_slot process_item "$item"
done
wait
exec 3>&-
```
### Worker Pool Pattern
```bash
#!/usr/bin/env bash
set -euo pipefail

WORK_QUEUE="/tmp/work_queue_$$"
RESULT_QUEUE="/tmp/result_queue_$$"
NUM_WORKERS=4

mkfifo "$WORK_QUEUE" "$RESULT_QUEUE"
trap 'rm -f "$WORK_QUEUE" "$RESULT_QUEUE"' EXIT

# Worker function
worker() {
  local id="$1"
  while read -r task; do
    [[ "$task" == "STOP" ]] && break
    # Process task
    local result
    result=$(process_task "$task" 2>&1)
    echo "RESULT:$id:$task:$result"
  done
}

# Start workers (track PIDs so we can wait for workers specifically)
WORKER_PIDS=()
for ((i=0; i<NUM_WORKERS; i++)); do
  worker "$i" < "$WORK_QUEUE" > "$RESULT_QUEUE" &
  WORKER_PIDS+=($!)
done

# Result collector (background); it exits on end-of-file once the last
# worker closes its write end of the result queue
collect_results() {
  while read -r line; do
    echo "$line" >> results.txt
  done < "$RESULT_QUEUE"
}
collect_results &
COLLECTOR_PID=$!

# Producer - send work (TASKS is assumed to hold the work items)
{
  for task in "${TASKS[@]}"; do
    echo "$task"
  done
  # Stop signals for workers
  for ((i=0; i<NUM_WORKERS; i++)); do
    echo "STOP"
  done
} > "$WORK_QUEUE"

# Wait for workers first, then for the collector
for pid in "${WORKER_PIDS[@]}"; do
  wait "$pid"
done
wait "$COLLECTOR_PID"
```
## Modern Async Patterns

### Promise-Like Pattern
```bash
#!/usr/bin/env bash
set -euo pipefail

# Async function wrapper
async() {
  local result_var="$1"
  shift
  local cmd=("$@")
  # Create temp file for result
  local result_file
  result_file=$(mktemp)
  # Run in background, save result
  {
    if "${cmd[@]}" > "$result_file" 2>&1; then
      echo "0" >> "$result_file.status"
    else
      echo "$?" >> "$result_file.status"
    fi
  } &
  # Store PID and result file location
  eval "${result_var}_pid=$!"
  eval "${result_var}_file='$result_file'"
}

# Await result
await() {
  local result_var="$1"
  local pid_var="${result_var}_pid"
  local file_var="${result_var}_file"
  # Wait for completion
  wait "${!pid_var}"
  # Get result
  cat "${!file_var}"
  local status
  status=$(cat "${!file_var}.status")
  # Cleanup
  rm -f "${!file_var}" "${!file_var}.status"
  return "$status"
}

# Usage
async result1 curl -s "https://api1.example.com/data"
async result2 curl -s "https://api2.example.com/data"
async result3 process_local_data

# Do other work here...

# Get results (blocks until complete)
data1=$(await result1)
data2=$(await result2)
data3=$(await result3)
```
### Event Loop Pattern
```bash
#!/usr/bin/env bash
set -euo pipefail

declare -A TASKS
declare -A TASK_RESULTS
TASK_COUNTER=0

# Register async task; the new id is left in TASK_ID.
# (Capturing it via $(schedule ...) would run schedule in a subshell
# and discard the TASKS bookkeeping, so a global is used instead.)
schedule() {
  local cmd=("$@")
  TASK_ID=$((++TASK_COUNTER))
  local output_file="/tmp/task_${TASK_ID}_$$"
  "${cmd[@]}" > "$output_file" 2>&1 &
  TASKS[$TASK_ID]=$!
  TASK_RESULTS[$TASK_ID]="$output_file"
}

# Check if task complete
is_complete() {
  local task_id="$1"
  ! kill -0 "${TASKS[$task_id]}" 2>/dev/null
}

# Get task result
get_result() {
  local task_id="$1"
  wait "${TASKS[$task_id]}" 2>/dev/null || true
  cat "${TASK_RESULTS[$task_id]}"
  rm -f "${TASK_RESULTS[$task_id]}"
}

# Event loop
run_event_loop() {
  local pending=("${!TASKS[@]}")
  while ((${#pending[@]} > 0)); do
    local still_pending=()
    for task_id in "${pending[@]}"; do
      if is_complete "$task_id"; then
        local result
        result=$(get_result "$task_id")
        on_task_complete "$task_id" "$result"
      else
        still_pending+=("$task_id")
      fi
    done
    pending=("${still_pending[@]}")
    # Small sleep to prevent busy-waiting
    ((${#pending[@]} > 0)) && sleep 0.1
  done
}

# Callback for completed tasks
on_task_complete() {
  local task_id="$1"
  local result="$2"
  echo "Task $task_id complete: ${result:0:50}..."
}
```
### Fan-Out/Fan-In Pattern
```bash
#!/usr/bin/env bash
set -euo pipefail

# Fan-out: distribute work; worker PIDs are left in the global FAN_PIDS.
# (Returning them via echo/$(...) would start the workers in a subshell,
# and `wait` cannot wait for processes that are not children of this shell.)
fan_out() {
  local -n items="$1"
  local workers="$2"
  local worker_func="$3"
  local chunk_size=$(( (${#items[@]} + workers - 1) / workers ))
  FAN_PIDS=()
  local i
  for ((i=0; i<workers; i++)); do
    local start=$((i * chunk_size))
    local chunk=("${items[@]:start:chunk_size}")
    if ((${#chunk[@]} > 0)); then
      $worker_func "${chunk[@]}" &
      FAN_PIDS+=($!)
    fi
  done
}

# Fan-in: wait for every worker started by fan_out
fan_in() {
  local pid
  for pid in "${FAN_PIDS[@]}"; do
    wait "$pid"
  done
}

# Example worker
process_chunk() {
  local items=("$@")
  for item in "${items[@]}"; do
    echo "Processed: $item"
  done
}

# Usage
data=({1..100})
fan_out data 4 process_chunk
fan_in
```
### Map-Reduce Pattern
```bash
#!/usr/bin/env bash
set -euo pipefail

# Map function (parallel runs the mapped command in a child shell, so
# shell functions must be exported with `export -f` to be visible there)
parallel_map() {
  local -n input="$1"
  local map_func="$2"
  local workers="${3:-$(nproc)}"
  printf '%s\n' "${input[@]}" | \
    parallel -j "$workers" "$map_func"
}

# Reduce function
reduce() {
  local reduce_func="$1"
  local accumulator="$2"
  while IFS= read -r value; do
    accumulator=$($reduce_func "$accumulator" "$value")
  done
  echo "$accumulator"
}

# Example: sum of squares
square() { echo $(($1 * $1)); }
add() { echo $(($1 + $2)); }
export -f square

numbers=({1..100})
sum_of_squares=$(
  parallel_map numbers square 4 | reduce add 0
)
echo "Sum of squares: $sum_of_squares"

# Word count example
word_count_map() {
  tr ' ' '\n' | sort | uniq -c
}
word_count_reduce() {
  sort -k2 | awk '{
    if ($2 == prev) { count += $1 }
    else { if (prev) print count, prev; count = $1; prev = $2 }
  } END { if (prev) print count, prev }'
}
export -f word_count_map

cat large_text.txt |
  parallel --pipe -N1000 word_count_map |
  word_count_reduce
```
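The same sum-of-squares map-reduce can be done with plain job control when GNU Parallel is not installed: split the input into one chunk per worker, map each chunk in the background into its own file, then reduce over the concatenated results. A self-contained fallback sketch:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Map-reduce without GNU Parallel: chunked background workers + a fold.
dir=$(mktemp -d)
trap 'rm -rf "$dir"' EXIT

numbers=({1..100})
workers=4
chunk=$(( (${#numbers[@]} + workers - 1) / workers ))

# Map phase: each worker squares its chunk into a private file
for ((w=0; w<workers; w++)); do
  part=("${numbers[@]:w*chunk:chunk}")
  {
    for n in "${part[@]}"; do echo $((n * n)); done
  } > "$dir/sq.$w" &
done
wait

# Reduce phase: fold all partial outputs into one sum
total=0
while IFS= read -r sq; do
  total=$((total + sq))
done < <(cat "$dir"/sq.*)
echo "sum of squares: $total"
```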
## Performance Optimization

### Batch Processing
```bash
#!/usr/bin/env bash
set -euo pipefail

# Process in optimal batch sizes
# (process_item must be exported with `export -f` to be visible in the
# child shells that parallel starts)
optimal_batch_process() {
  local items=("$@")
  local batch_size=100
  local workers
  workers=$(nproc)
  printf '%s\n' "${items[@]}" | \
    parallel --pipe -N"$batch_size" -j"$workers" '
      while IFS= read -r item; do
        process_item "$item"
      done
    '
}

# Dynamic batch sizing based on memory
dynamic_batch() {
  local mem_available
  mem_available=$(free -m | awk '/^Mem:/ {print $7}')
  # Adjust batch size based on available memory (roughly 100MB per batch)
  local batch_size=$((mem_available / 100))
  ((batch_size < 10)) && batch_size=10
  ((batch_size > 1000)) && batch_size=1000
  parallel --pipe -N"$batch_size" process_batch
}
```
### I/O Optimization
```bash
#!/usr/bin/env bash
set -euo pipefail

# Use tmpfs for intermediate files
setup_fast_temp() {
  local tmpdir="/dev/shm/parallel_$$"
  mkdir -p "$tmpdir"
  echo "$tmpdir"
}

# Buffer I/O operations
buffered_parallel() {
  local input="$1"
  local tmpdir
  tmpdir=$(setup_fast_temp)
  # Set the cleanup trap here in the caller: a trap set inside the
  # $(...) substitution would fire as soon as that subshell exits,
  # deleting the directory before it is used
  trap 'rm -rf "$tmpdir"' EXIT
  # Split input into chunks
  split -l 1000 "$input" "$tmpdir/chunk_"
  # Process chunks in parallel
  parallel process_chunk {} ::: "$tmpdir"/chunk_*
  # Combine results
  cat "$tmpdir"/result_* > output.txt
}

# Avoid temp files by streaming through parallel's pipe mode
no_disk_parallel() {
  # Instead of:
  #   command > temp.txt
  #   parallel process ::: temp.txt
  #   rm temp.txt
  # Do this:
  command | parallel --pipe process
}
```
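A related streaming trick for result collection: `producer | while ...` runs the loop in a subshell, so any counters or arrays it updates vanish when the pipeline ends. Reading from process substitution keeps the loop in the current shell while the producer still runs concurrently:

```bash
#!/usr/bin/env bash
set -euo pipefail

# With `seq 1 5 | while ...`, count would still be 0 after the loop
# (the loop body runs in a subshell). Process substitution avoids that.
count=0
while IFS= read -r line; do
  count=$((count + 1))
done < <(seq 1 5)        # the producer runs in a background process
echo "collected $count results"
```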
### CPU Affinity
```bash
#!/usr/bin/env bash
set -euo pipefail

# Pin workers to specific CPUs
cpu_pinned_parallel() {
  local num_cpus
  num_cpus=$(nproc)
  for ((cpu=0; cpu<num_cpus; cpu++)); do
    taskset -c "$cpu" process_worker "$cpu" &
  done
  wait
}

# NUMA-aware processing
numa_parallel() {
  local num_nodes
  num_nodes=$(numactl --hardware | grep "available:" | awk '{print $2}')
  for ((node=0; node<num_nodes; node++)); do
    numactl --cpunodebind="$node" --membind="$node" \
      process_chunk "$node" &
  done
  wait
}
```
## Error Handling

### Graceful Failure Handling
```bash
#!/usr/bin/env bash
set -euo pipefail

# Track failures. Background jobs cannot write to the parent's array,
# so failed items are collected through a log file.
declare -A FAILURES
FAILURE_LOG="/tmp/failures_$$"

retry_item() {
  local item="$1"
  local max_retries=3
  local retries=0
  while ((retries < max_retries)); do
    if process_item "$item"; then
      return 0
    fi
    retries=$((retries + 1))
    echo "Retry $retries for $item" >&2
    sleep $((retries * 2))   # Back off a little more on each retry
  done
  echo "$item" >> "$FAILURE_LOG"
  return 1
}

parallel_with_retry() {
  local items=("$@")
  : > "$FAILURE_LOG"
  # Retry each item in its own background job
  for item in "${items[@]}"; do
    retry_item "$item" &
  done
  wait || true
  # Load failures recorded by the background jobs
  while IFS= read -r item; do
    FAILURES["$item"]="Failed after 3 retries"
  done < "$FAILURE_LOG"
}

# Report failures
report_failures() {
  if ((${#FAILURES[@]} > 0)); then
    echo "Failures:" >&2
    for item in "${!FAILURES[@]}"; do
      echo "  $item: ${FAILURES[$item]}" >&2
    done
    return 1
  fi
}
```
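A generic retry wrapper is easy to test in isolation. In this sketch `with_retries` and `flaky_twice` are illustrative names; the stand-in command fails twice before succeeding so the retry path actually executes:

```bash
#!/usr/bin/env bash
set -euo pipefail

# with_retries <max> <command...>: run the command up to <max> times.
with_retries() {
  local max="$1"; shift
  local attempt
  for ((attempt=1; attempt<=max; attempt++)); do
    if "$@"; then
      return 0
    fi
    echo "attempt $attempt failed" >&2
    sleep "$attempt"   # back off a little more each time
  done
  return 1
}

# Stand-in flaky command: fails on the first two calls, succeeds on the third.
state=$(mktemp)
flaky_twice() {
  local n
  n=$(cat "$state" 2>/dev/null || echo 0)
  echo $((n + 1)) > "$state"
  (( n >= 2 ))
}

with_retries 5 flaky_twice && echo "eventually succeeded"
rm -f "$state"
```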
### Cancellation Support
```bash
#!/usr/bin/env bash
set -euo pipefail

NUM_WORKERS=4

# Global cancellation flag. Each worker inherits its own copy of
# CANCELLED, so cancellation is actually delivered by kill; the flag
# covers the time before a worker has been forked.
CANCELLED=false
declare -a WORKER_PIDS=()

cancel_all() {
  CANCELLED=true
  for pid in "${WORKER_PIDS[@]}"; do
    kill "$pid" 2>/dev/null || true
  done
}
trap cancel_all SIGINT SIGTERM

cancellable_worker() {
  local id="$1"
  while ! $CANCELLED; do
    # Check for work
    if work=$(get_next_work); then
      process_work "$work"
    else
      sleep 0.1
    fi
  done
}

# Start workers
for ((i=0; i<NUM_WORKERS; i++)); do
  cancellable_worker "$i" &
  WORKER_PIDS+=($!)
done

# Wait with interrupt support
wait || true
```
## Resources

- GNU Parallel Tutorial
- GNU Parallel Manual
- Bash Job Control
- Advanced Bash-Scripting Guide - Process Substitution

Master parallel processing for efficient multi-core utilization and faster script execution.