parallel-processing-patterns


CRITICAL GUIDELINES

Windows File Path Requirements

MANDATORY: Always Use Backslashes on Windows for File Paths
When using Edit or Write tools on Windows, you MUST use backslashes (`\`) in file paths, NOT forward slashes (`/`).

Parallel Processing Patterns in Bash (2025)

Overview

Comprehensive guide to parallel and concurrent execution in bash, covering GNU Parallel, xargs parallelization, job control, worker pools, and modern async patterns for maximum performance.
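As a baseline intuition before the tooling: even plain `&` plus `wait` gives concurrency. Three one-second jobs finish in about one second rather than three. A minimal sketch, with `sleep` standing in for real work:

```bash
#!/usr/bin/env bash
# Three 1-second jobs run concurrently: wall time is ~1s, not ~3s.
start=$SECONDS
for i in 1 2 3; do
    sleep 1 &
done
wait
elapsed=$((SECONDS - start))
echo "elapsed: ${elapsed}s"
```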

GNU Parallel (Recommended)

Installation

```bash
# Debian/Ubuntu
sudo apt-get install parallel

# macOS
brew install parallel

# From source
wget https://ftp.gnu.org/gnu/parallel/parallel-latest.tar.bz2
tar -xjf parallel-latest.tar.bz2
cd parallel-*
./configure && make && sudo make install
```

Basic Usage

```bash
#!/usr/bin/env bash
set -euo pipefail

# Process multiple files in parallel
parallel gzip ::: *.txt
# Equivalent to: for f in *.txt; do gzip "$f"; done
# but runs in parallel!

# Using find with parallel
find . -name "*.jpg" | parallel convert {} -resize 50% resized/{}

# Specify number of jobs
parallel -j 8 process_file ::: *.dat

# From stdin
cat urls.txt | parallel -j 10 wget -q

# Multiple inputs (all combinations)
parallel echo ::: A B C ::: 1 2 3
# Output: A 1, A 2, A 3, B 1, B 2, B 3, C 1, C 2, C 3

# Paired inputs with :::+
parallel echo ::: A B C :::+ 1 2 3
# Output: A 1, B 2, C 3
```

Input Handling

```bash
#!/usr/bin/env bash
set -euo pipefail

# Input from a file
parallel -a input.txt process_line

# Multiple input files
parallel -a file1.txt -a file2.txt 'echo {1} {2}'

# Column-based input
cat data.tsv | parallel --colsep '\t' 'echo Name: {1}, Value: {2}'

# Named columns
cat data.csv | parallel --header : --colsep ',' 'echo {name}: {value}'

# Null-delimited input for safety with special characters
find . -name "*.txt" -print0 | parallel -0 wc -l

# Line-based chunking
cat huge_file.txt | parallel --pipe -N1000 'wc -l'
```

Replacement Strings

```bash
#!/usr/bin/env bash
set -euo pipefail

# {} - full input
parallel echo 'Processing: {}' ::: file1.txt file2.txt

# {.} - input with extension removed
parallel echo '{.}' ::: file.txt file.csv
# Output: file, file

# {/} - basename
parallel echo '{/}' ::: /path/to/file.txt
# Output: file.txt

# {//} - directory path
parallel echo '{//}' ::: /path/to/file.txt
# Output: /path/to

# {/.} - basename without extension
parallel echo '{/.}' ::: /path/to/file.txt
# Output: file

# {#} - job number (1-based)
parallel echo 'Job {#}: {}' ::: A B C

# {%} - slot number (recycled job slot)
parallel -j 2 'echo "Slot {%}: {}"' ::: A B C D E

# Combined
parallel 'convert {} -resize 50% {//}/thumb_{/.}.jpg' ::: *.png
```
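For one-off cases that don't need GNU Parallel, the same path manipulations are available as plain bash parameter expansions. A quick sketch on a hypothetical path:

```bash
#!/usr/bin/env bash
# Bash parameter-expansion equivalents of the replacement strings above.
f="/path/to/file.txt"

no_ext="${f%.*}"    # like {.}  -> /path/to/file
base="${f##*/}"     # like {/}  -> file.txt
dir="${f%/*}"       # like {//} -> /path/to
stem="${base%.*}"   # like {/.} -> file

printf '%s\n' "$no_ext" "$base" "$dir" "$stem"
```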

Progress and Logging

```bash
#!/usr/bin/env bash
set -euo pipefail

# Show a progress bar
parallel --bar process_item ::: {1..100}

# Progress with ETA
parallel --progress process_item ::: {1..100}

# Verbose output
parallel --verbose gzip ::: *.txt

# Log jobs to a file
parallel --joblog jobs.log gzip ::: *.txt

# Resume from where it left off (skip completed jobs)
parallel --joblog jobs.log --resume gzip ::: *.txt

# Results logging
parallel --results results_dir 'echo {1} + {2}' ::: 1 2 3 ::: 4 5 6
# Stores each job's stdout and stderr under results_dir/
# (one directory level per input source and argument value)
```

Resource Management

```bash
#!/usr/bin/env bash
set -euo pipefail

# CPU-based parallelism (one job per core)
parallel -j "$(nproc)" process_item ::: {1..1000}

# Leave some cores free (all cores minus 2)
parallel -j '-2' process_item ::: {1..1000}

# Percentage of cores
parallel -j '50%' process_item ::: {1..1000}

# Load-based throttling (start new jobs only while the load average is low enough)
parallel --load 80% process_item ::: {1..1000}

# Memory-based throttling (start new jobs only while 2 GB is free)
parallel --memfree 2G process_item ::: {1..1000}

# Rate limiting (wait at least 0.5s between job starts)
parallel -j 4 --delay 0.5 wget ::: url1 url2 url3 url4

# Timeout per job
parallel --timeout 60 long_process ::: {1..100}

# Retry failed jobs
parallel --retries 3 flaky_process ::: {1..100}
```
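Note that `nproc` is GNU coreutils; on macOS the equivalent is `sysctl -n hw.ncpu`. A hedged portable fallback:

```bash
#!/usr/bin/env bash
# Portable core count: try nproc (GNU), then sysctl (BSD/macOS), else assume 1.
cores=$(nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 1)
echo "cores: $cores"
```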

Distributed Execution

```bash
#!/usr/bin/env bash
set -euo pipefail

# Run on multiple servers
parallel --sshloginfile servers.txt process_item ::: {1..1000}

# servers.txt format:
#   4/server1.example.com   (4 jobs on server1)
#   8/server2.example.com   (8 jobs on server2)
#   :                       (local machine)

# Transfer files before execution
parallel --sshloginfile servers.txt --transferfile {} process {} ::: *.dat

# Return results
parallel --sshloginfile servers.txt --return {.}.result process {} ::: *.dat

# Transfer, return results, and clean up remote copies
parallel --sshloginfile servers.txt --transfer --return {.}.out --cleanup \
    'process {} > {.}.out' ::: *.dat

# Pass environment variables to the remote shells
export MY_VAR="value"
parallel --env MY_VAR --sshloginfile servers.txt 'echo $MY_VAR' ::: A B C
```

Complex Pipelines

```bash
#!/usr/bin/env bash
set -euo pipefail

# Pipe mode - distribute stdin across workers
cat huge_file.txt | parallel --pipe -N1000 'sort | uniq -c'

# Block size for pipe mode
cat data.bin | parallel --pipe --block 10M 'process_chunk'

# Keep output in input order
parallel --keep-order 'sleep $((RANDOM % 3)); echo {}' ::: A B C D E

# Group output (don't mix output from different jobs)
parallel --group 'for i in 1 2 3; do echo "Job {}: line $i"; done' ::: A B C

# Tag output with its job's input
parallel --tag 'echo "output from {}"' ::: A B C

# Ungrouped output (printed immediately as produced;
# lines from different jobs may interleave)
parallel --ungroup 'echo "Starting {}"; sleep 1; echo "Done {}"' ::: A B C
```

xargs Parallelization

Basic Parallel xargs

```bash
#!/usr/bin/env bash
set -euo pipefail

# -P sets the number of parallel jobs
find . -name "*.txt" | xargs -P 4 -I {} gzip {}

# -n sets the number of items per command invocation
echo {1..100} | xargs -n 10 -P 4 echo "Batch:"

# Null-delimited input for safety
find . -name "*.txt" -print0 | xargs -0 -P 4 -I {} process {}

# Multiple arguments per process
cat urls.txt | xargs -P 10 -n 5 wget -q

# --max-args is the long form of -n (max arguments per invocation)
echo {1..1000} | xargs -P 4 --max-args=50 echo
```
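One detail worth knowing when scripting around `xargs`: if any invocation exits with a status in 1-125, `xargs` itself exits with 123, which is how failures surface under `set -e`. A small sketch:

```bash
#!/usr/bin/env bash
# xargs exits 123 when any invocation fails (here, the middle one exits 1).
status=0
printf '0\n1\n0\n' | xargs -n 1 -P 2 sh -c 'exit "$1"' _ || status=$?
echo "xargs status: $status"
```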

xargs with Complex Commands

```bash
#!/usr/bin/env bash
set -euo pipefail

# Use sh -c for complex commands
find . -name "*.jpg" -print0 |
    xargs -0 -P 4 -I {} sh -c 'convert "$1" -resize 50% "thumb_$(basename "$1")"' _ {}

# Multiple arguments per invocation
paste file1.txt file2.txt |
    xargs -P 4 -n 2 sh -c 'diff "$1" "$2" > "diff_$(basename "$1" .txt).patch"' _

# Process in batches
# (note: every invocation writes the same archive name here;
# give each batch a distinct name in real use)
find . -name "*.log" -print0 |
    xargs -0 -P 4 -n 100 tar -czvf logs_batch.tar.gz

# With failure handling
find . -name "*.dat" -print0 |
    xargs -0 -P 4 -I {} sh -c 'process "$1" || echo "Failed: $1" >> failures.log' _ {}
```

Job Control Patterns

Background Job Management

```bash
#!/usr/bin/env bash
set -euo pipefail

# Track background jobs
declare -a PIDS=()

# Start jobs
for item in {1..10}; do
    process_item "$item" &
    PIDS+=($!)
done

# Wait for all of them
for pid in "${PIDS[@]}"; do
    wait "$pid"
done
echo "All jobs complete"

# Alternatively, wait for any single job to finish (Bash 4.3+)
wait -n
echo "At least one job complete"
```
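Waiting on each PID individually, as above, is what lets you collect per-job exit statuses. A minimal self-contained sketch, using trivial subshells in place of real work:

```bash
#!/usr/bin/env bash
# wait "$pid" returns that job's exit status, so failures can be counted.
pids=()
(exit 0) & pids+=($!)
(exit 3) & pids+=($!)
(exit 0) & pids+=($!)

failures=0
for pid in "${pids[@]}"; do
    wait "$pid" || failures=$((failures + 1))
done
echo "failed jobs: $failures"
```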

Job Pool with Semaphore

```bash
#!/usr/bin/env bash
set -euo pipefail

# Maximum concurrent jobs
MAX_JOBS=4

# Simple semaphore using a counter
job_count=0

run_with_limit() {
    local cmd=("$@")

    # Wait if at the limit
    while ((job_count >= MAX_JOBS)); do
        wait -n 2>/dev/null || true
        job_count=$((job_count - 1))
    done

    # Start a new job
    "${cmd[@]}" &
    # Plain assignment, not ((job_count++)): a post-increment from 0
    # evaluates to 0, returns status 1, and would trip set -e.
    job_count=$((job_count + 1))
}

# Usage
for item in {1..20}; do
    run_with_limit process_item "$item"
done

# Wait for the remaining jobs
wait
```
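An alternative that avoids counter bookkeeping is to poll the shell's own job table with `jobs -rp`. A hedged sketch (the 0.05 s poll interval and the `sleep` stand-in workload are arbitrary):

```bash
#!/usr/bin/env bash
# Throttle concurrency by polling the number of running background jobs.
MAX_JOBS=4
for i in $(seq 1 12); do
    while [ "$(jobs -rp | wc -l)" -ge "$MAX_JOBS" ]; do
        sleep 0.05
    done
    (sleep 0.1) &   # stand-in for real work
done
wait
remaining=$(jobs -rp | wc -l)
echo "running jobs after wait: $remaining"
```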

FIFO-Based Job Pool

```bash
#!/usr/bin/env bash
set -euo pipefail

MAX_JOBS=4
JOB_FIFO="/tmp/job_pool_$$"

# Create the job-slot FIFO
mkfifo "$JOB_FIFO"
trap 'rm -f "$JOB_FIFO"' EXIT

# Initialize slots: one token per allowed job
exec 3<>"$JOB_FIFO"
for ((i=0; i<MAX_JOBS; i++)); do
    echo >&3
done

# Run a command once a slot is free
run_with_slot() {
    local cmd=("$@")
    read -r -u 3  # Acquire a slot (blocks if none available)

    {
        "${cmd[@]}"
        echo >&3  # Release the slot
    } &
}

# Usage
for item in {1..20}; do
    run_with_slot process_item "$item"
done
wait
exec 3>&-
```

Worker Pool Pattern

```bash
#!/usr/bin/env bash
set -euo pipefail

WORK_QUEUE="/tmp/work_queue_$$"
RESULT_QUEUE="/tmp/result_queue_$$"
NUM_WORKERS=4

mkfifo "$WORK_QUEUE" "$RESULT_QUEUE"
trap 'rm -f "$WORK_QUEUE" "$RESULT_QUEUE"' EXIT

# Worker: read tasks until told to stop
worker() {
    local id="$1"
    while read -r task; do
        [[ "$task" == "STOP" ]] && break
        # Process the task
        local result
        result=$(process_task "$task" 2>&1)
        echo "RESULT:$id:$task:$result"
    done
}

# Start workers, tracking their PIDs so we can wait on just them later
WORKER_PIDS=()
for ((i=0; i<NUM_WORKERS; i++)); do
    worker "$i" < "$WORK_QUEUE" > "$RESULT_QUEUE" &
    WORKER_PIDS+=($!)
done

# Result collector (runs in the background)
collect_results() {
    while read -r line; do
        [[ "$line" == "DONE" ]] && break
        echo "$line" >> results.txt
    done < "$RESULT_QUEUE"
}
collect_results &
COLLECTOR_PID=$!

# Producer: send the work, then one STOP token per worker
# (TASKS is assumed to hold the work items)
{
    for task in "${TASKS[@]}"; do
        echo "$task"
    done
    for ((i=0; i<NUM_WORKERS; i++)); do
        echo "STOP"
    done
} > "$WORK_QUEUE"

# Wait for the workers only (a bare `wait` would also wait for the
# collector, which is still blocked on the FIFO), then signal it to finish
for pid in "${WORKER_PIDS[@]}"; do
    wait "$pid"
done
echo "DONE" > "$RESULT_QUEUE"
wait "$COLLECTOR_PID"
```

Modern Async Patterns

Promise-Like Pattern

```bash
#!/usr/bin/env bash
set -euo pipefail

# Async function wrapper
async() {
    local result_var="$1"
    shift
    local cmd=("$@")

    # Create a temp file for the result
    local result_file
    result_file=$(mktemp)

    # Run in the background, saving output and exit status
    {
        if "${cmd[@]}" > "$result_file" 2>&1; then
            echo "0" > "$result_file.status"
        else
            echo "$?" > "$result_file.status"
        fi
    } &

    # Store the PID and result file location
    eval "${result_var}_pid=$!"
    eval "${result_var}_file='$result_file'"
}

# Await the result (blocks until the command completes)
await() {
    local result_var="$1"
    local pid_var="${result_var}_pid"
    local file_var="${result_var}_file"

    # Poll for completion. (A plain `wait` would fail here when await is
    # called inside a command substitution, since that subshell has no
    # children of its own.)
    while kill -0 "${!pid_var}" 2>/dev/null; do
        sleep 0.05
    done

    # Print the captured output
    cat "${!file_var}"
    local status
    status=$(cat "${!file_var}.status")

    # Cleanup
    rm -f "${!file_var}" "${!file_var}.status"

    return "$status"
}

# Usage
async result1 curl -s "https://api1.example.com/data"
async result2 curl -s "https://api2.example.com/data"
async result3 process_local_data

# Do other work here...

# Get results (blocks until complete)
data1=$(await result1)
data2=$(await result2)
data3=$(await result3)
```

Event Loop Pattern

```bash
#!/usr/bin/env bash
set -euo pipefail

declare -A TASKS
declare -A TASK_RESULTS
TASK_COUNTER=0

# Register an async task. Call schedule directly, not via $( ):
# a command substitution would run it in a subshell and the TASKS
# updates would be lost. The new id is left in TASK_COUNTER.
schedule() {
    local cmd=("$@")
    local task_id=$((++TASK_COUNTER))
    local output_file="/tmp/task_${task_id}_$$"

    "${cmd[@]}" > "$output_file" 2>&1 &

    TASKS[$task_id]=$!
    TASK_RESULTS[$task_id]="$output_file"

    echo "$task_id"
}

# Check whether a task has finished
is_complete() {
    local task_id="$1"
    ! kill -0 "${TASKS[$task_id]}" 2>/dev/null
}

# Get a task's result
get_result() {
    local task_id="$1"
    wait "${TASKS[$task_id]}" 2>/dev/null || true
    cat "${TASK_RESULTS[$task_id]}"
    rm -f "${TASK_RESULTS[$task_id]}"
}

# Event loop
run_event_loop() {
    local pending=("${!TASKS[@]}")

    while ((${#pending[@]} > 0)); do
        local still_pending=()

        for task_id in "${pending[@]}"; do
            if is_complete "$task_id"; then
                local result
                result=$(get_result "$task_id")
                on_task_complete "$task_id" "$result"
            else
                still_pending+=("$task_id")
            fi
        done

        pending=("${still_pending[@]}")

        # Small sleep to prevent busy-waiting
        ((${#pending[@]} > 0)) && sleep 0.1
    done
}

# Callback for completed tasks
on_task_complete() {
    local task_id="$1"
    local result="$2"
    echo "Task $task_id complete: ${result:0:50}..."
}
```
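The `kill -0` check used by `is_complete` only tests whether the PID is still alive; signal 0 delivers nothing. A tiny standalone sketch of that polling idea:

```bash
#!/usr/bin/env bash
# Poll a background job with kill -0 until it exits.
sleep 0.3 &
pid=$!

polls=0
while kill -0 "$pid" 2>/dev/null; do
    polls=$((polls + 1))
    sleep 0.05
done
wait "$pid"
echo "job finished after $polls polls"
```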

Fan-Out/Fan-In Pattern

```bash
#!/usr/bin/env bash
set -euo pipefail

# Fan-out: distribute work across workers.
# PIDs go into a global array: returning them via $(fan_out ...) would run
# the workers inside a command-substitution subshell, and the parent shell
# could not wait on them.
FAN_PIDS=()

fan_out() {
    local -n items="$1"
    local workers="$2"
    local worker_func="$3"

    local chunk_size=$(( (${#items[@]} + workers - 1) / workers ))
    FAN_PIDS=()

    for ((i=0; i<workers; i++)); do
        local start=$((i * chunk_size))
        local chunk=("${items[@]:start:chunk_size}")

        if ((${#chunk[@]} > 0)); then
            "$worker_func" "${chunk[@]}" &
            FAN_PIDS+=($!)
        fi
    done
}

# Fan-in: wait for all workers
fan_in() {
    for pid in "${FAN_PIDS[@]}"; do
        wait "$pid"
    done
}

# Example worker
process_chunk() {
    local items=("$@")
    for item in "${items[@]}"; do
        echo "Processed: $item"
    done
}

# Usage
data=({1..100})
fan_out data 4 process_chunk
fan_in
```

Map-Reduce Pattern

```bash
#!/usr/bin/env bash
set -euo pipefail

# Map step: apply a function to every item in parallel
parallel_map() {
    local -n input="$1"
    local map_func="$2"
    local workers="${3:-$(nproc)}"

    printf '%s\n' "${input[@]}" | \
        parallel -j "$workers" "$map_func"
}

# Reduce step: fold the mapped values into an accumulator
reduce() {
    local reduce_func="$1"
    local accumulator="$2"

    while IFS= read -r value; do
        accumulator=$($reduce_func "$accumulator" "$value")
    done

    echo "$accumulator"
}

# Example: sum of squares
square() { echo $(($1 * $1)); }
add() { echo $(($1 + $2)); }
# GNU Parallel runs the map function in child shells, so export it
export -f square

numbers=({1..100})
sum_of_squares=$(parallel_map numbers square 4 | reduce add 0)
echo "Sum of squares: $sum_of_squares"

# Word count example
word_count_map() {
    tr ' ' '\n' | sort | uniq -c
}
word_count_reduce() {
    sort -k2 | awk '{
        if ($2 == prev) { count += $1 }
        else { if (prev) print count, prev; count = $1; prev = $2 }
    } END { if (prev) print count, prev }'
}
export -f word_count_map

cat large_text.txt |
    parallel --pipe -N1000 word_count_map |
    word_count_reduce
```
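The same shape works without GNU Parallel, using background subshells as the mappers and per-chunk partial sums as the intermediate results. A self-contained sketch:

```bash
#!/usr/bin/env bash
# Map: four background jobs each square a 25-number chunk and sum it.
# Reduce: add the partial sums. The total for 1..100 is 338350.
set -euo pipefail
tmpdir=$(mktemp -d)
trap 'rm -rf "$tmpdir"' EXIT

for chunk in 0 1 2 3; do
    (
        sum=0
        for ((n = chunk * 25 + 1; n <= (chunk + 1) * 25; n++)); do
            sum=$((sum + n * n))
        done
        echo "$sum" > "$tmpdir/$chunk"
    ) &
done
wait

total=0
for f in "$tmpdir"/*; do
    total=$((total + $(cat "$f")))
done
echo "sum of squares 1..100: $total"
```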

Performance Optimization

Batch Processing

```bash
#!/usr/bin/env bash
set -euo pipefail

# Process in fixed batch sizes
optimal_batch_process() {
    local items=("$@")
    local batch_size=100
    local workers
    workers=$(nproc)

    # process_item must be visible to the child shells: export -f process_item
    printf '%s\n' "${items[@]}" | \
        parallel --pipe -N"$batch_size" -j"$workers" '
            while IFS= read -r item; do
                process_item "$item"
            done
        '
}

# Dynamic batch sizing based on available memory
dynamic_batch() {
    local mem_available
    mem_available=$(free -m | awk '/^Mem:/ {print $7}')

    # Heuristic: scale records per batch with available memory,
    # clamped to a sane range
    local batch_size=$((mem_available / 100))
    ((batch_size < 10)) && batch_size=10
    ((batch_size > 1000)) && batch_size=1000

    parallel --pipe -N"$batch_size" process_batch
}
```

I/O Optimization

```bash
#!/usr/bin/env bash
set -euo pipefail

# Use tmpfs for intermediate files
setup_fast_temp() {
    local tmpdir="/dev/shm/parallel_$$"
    mkdir -p "$tmpdir"
    trap 'rm -rf "$tmpdir"' EXIT
    echo "$tmpdir"
}

# Buffer I/O by splitting input into chunks first
buffered_parallel() {
    local input="$1"
    local tmpdir
    tmpdir=$(setup_fast_temp)

    # Split input into chunks
    split -l 1000 "$input" "$tmpdir/chunk_"

    # Process chunks in parallel
    parallel process_chunk {} ::: "$tmpdir"/chunk_*

    # Combine results
    cat "$tmpdir"/result_* > output.txt
}

# Avoid disk I/O entirely with streaming
no_disk_parallel() {
    # Instead of:
    #   command > temp.txt
    #   parallel process ::: temp.txt
    #   rm temp.txt
    # Do this:
    command | parallel --pipe process
}
```

CPU Affinity

```bash
#!/usr/bin/env bash
set -euo pipefail

# Pin one worker per CPU
cpu_pinned_parallel() {
    local num_cpus
    num_cpus=$(nproc)

    for ((cpu=0; cpu<num_cpus; cpu++)); do
        taskset -c "$cpu" process_worker "$cpu" &
    done

    wait
}

# NUMA-aware processing
numa_parallel() {
    local num_nodes
    num_nodes=$(numactl --hardware | grep "available:" | awk '{print $2}')

    for ((node=0; node<num_nodes; node++)); do
        numactl --cpunodebind="$node" --membind="$node" \
            process_chunk "$node" &
    done

    wait
}
```

Error Handling

Graceful Failure Handling

```bash
#!/usr/bin/env bash
set -euo pipefail

# Track failures
declare -A FAILURES

parallel_with_retry() {
    local max_retries=3
    local items=("$@")

    # Note: FAILURES must be updated in the current shell; backgrounding
    # this loop would confine the updates to a subshell and lose them.
    for item in "${items[@]}"; do
        local retries=0
        local success=false

        while ((retries < max_retries)) && ! $success; do
            if process_item "$item"; then
                success=true
            else
                retries=$((retries + 1))
                echo "Retry $retries for $item" >&2
                sleep $((retries * 2))  # Back off before the next attempt
            fi
        done

        if ! $success; then
            FAILURES["$item"]="Failed after $max_retries retries"
        fi
    done
}

# Report failures
report_failures() {
    if ((${#FAILURES[@]} > 0)); then
        echo "Failures:" >&2
        for item in "${!FAILURES[@]}"; do
            echo "  $item: ${FAILURES[$item]}" >&2
        done
        return 1
    fi
}
```
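A deterministic standalone sketch of the retry loop, with a stub command that fails twice before succeeding (the stub and the short sleep are illustrative only):

```bash
#!/usr/bin/env bash
# Retry until success, counting attempts; the stub succeeds on its 3rd call.
attempt=0
flaky() {
    attempt=$((attempt + 1))
    [ "$attempt" -ge 3 ]
}

retries=0
until flaky; do
    retries=$((retries + 1))
    sleep 0.1  # a real script would back off more aggressively here
done
echo "succeeded after $retries retries"
```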

Cancellation Support

```bash
#!/usr/bin/env bash
set -euo pipefail

NUM_WORKERS=4

# Global cancellation flag
CANCELLED=false
declare -a WORKER_PIDS=()

cancel_all() {
    CANCELLED=true
    for pid in "${WORKER_PIDS[@]}"; do
        kill "$pid" 2>/dev/null || true
    done
}
trap cancel_all SIGINT SIGTERM

cancellable_worker() {
    local id="$1"
    while ! $CANCELLED; do
        # Check for work
        if work=$(get_next_work); then
            process_work "$work"
        else
            sleep 0.1
        fi
    done
}

# Start workers
for ((i=0; i<NUM_WORKERS; i++)); do
    cancellable_worker "$i" &
    WORKER_PIDS+=($!)
done

# Wait with interrupt support
wait || true
```



Master parallel processing for efficient multi-core utilization and faster script execution.
