Loading...
Loading...
Process substitution, named pipes (FIFOs), and advanced IPC patterns for efficient bash data streaming (2025)
npx skill4agent add josiahsiegel/claude-plugin-marketplace process-substitution-fifos
# <(command)
#!/usr/bin/env bash
set -euo pipefail
# Input process substitution: each <(cmd) runs cmd and is replaced by a file
# name that the outer command opens and reads like an ordinary file.
# NOTE(review): diff exits non-zero when its inputs differ, which stops the
# script under `set -e`; run the examples one at a time or guard them.
# Compare two command outputs
diff <(sort file1.txt) <(sort file2.txt)
# Compare remote and local files
diff <(ssh server 'cat /etc/config') /etc/config
# Merge sorted files
sort -m <(sort file1.txt) <(sort file2.txt) <(sort file3.txt)
# Read from multiple sources simultaneously
paste <(cut -f1 data.tsv) <(cut -f3 data.tsv)
# Feed command output to programs expecting files
# Many programs require filename arguments, not stdin
wc -l <(grep "error" *.log)
# Process API response with tool expecting file
jq '.items[]' <(curl -s "https://api.example.com/data")
# Source environment from command output
# (whatever the command prints is executed in the current shell)
source <(aws configure export-credentials --format env)
# Feed to while loop without subshell issues.
# Input arrives through `< <(...)` instead of a pipe, so the loop body runs in
# the current shell and `count` is still set when the loop ends.
count=0 # must be initialised: `set -u` rejects an unset name
while IFS= read -r line; do
  # Plain assignment rather than ((count++)): the post-increment evaluates to
  # 0 on the first pass, returns status 1 and would abort under `set -e`.
  count=$((count + 1))
  process "$line"
done < <(find . -name "*.txt")
echo "Processed $count files" # Variable survives!

# >(command)
#!/usr/bin/env bash
set -euo pipefail
# Output process substitution: each >(cmd) is replaced by a file name; whatever
# is written to it becomes cmd's standard input.
# Write to multiple destinations simultaneously (tee alternative)
echo "Log message" | tee >(logger -t myapp) >(mail -s "Alert" admin@example.com)
# Compress and checksum in one pass.
# tee's own stdout is discarded so the raw tar stream is not dumped to the
# terminal. The checksum covers the uncompressed tar stream, not backup.tar.gz.
tar cf - /data | tee >(gzip > backup.tar.gz) >(sha256sum > backup.sha256) > /dev/null
# Send output to multiple processors
generate_data | tee >(processor1 > result1.txt) >(processor2 > result2.txt) > /dev/null
# Log and process simultaneously (build output still reaches stdout)
./build.sh 2>&1 | tee >(grep -i error > errors.log) >(grep -i warning > warnings.log)
# Real-time filtering with multiple outputs.
# NOTE(review): mail usually sends only once its input reaches EOF, which
# `tail -f` never produces - confirm the alert branch behaves as intended.
tail -f /var/log/syslog | tee \
  >(grep --line-buffered "ERROR" >> errors.log) \
  >(grep --line-buffered "WARNING" >> warnings.log) \
  >(grep --line-buffered "CRITICAL" | mail -s "Critical Alert" admin@example.com)

#!/usr/bin/env bash
set -euo pipefail
# Transform and compare: both inputs are sorted and de-duplicated first.
diff \
  <(sort input.txt | uniq) \
  <(sort reference.txt | uniq)
# Pipeline with multiple branches: tee copies the CSV to two awk extractors
# and to stdout, where wc counts the lines.
tee \
  >(awk -F, '{print $1}' > column1.txt) \
  >(awk -F, '{print $2}' > column2.txt) \
  < data.csv \
  | wc -l
# Complex data flow
#######################################
# Filter a file and fan every surviving line out to three destinations.
# Arguments:
#   $1 - input file, passed through filter_input
#   $2 - optional log file; defaults to the caller's $log_file, then
#        process.log
# Outputs:
#   each line on stdout, "LOG: <line>" appended to the log file, and the
#   output of `process_line <line>` appended to results.txt
#######################################
process_data() {
  local input="$1"
  # The right-hand side is expanded before `local` takes effect, so an
  # existing global log_file is still honoured.
  local log_file="${2:-${log_file:-process.log}}"
  local line
  # Read from process substitution, write to multiple outputs.
  while IFS= read -r line; do
    # Both sinks take the line as an argument and never read stdin, so
    # routing them through `tee >(...)` only risked tee being killed by
    # SIGPIPE. Writing sequentially is deterministic and complete on return.
    printf '%s\n' "$line"
    printf 'LOG: %s\n' "$line" >> "$log_file"
    process_line "$line" >> results.txt
  done < <(filter_input < "$input")
}

#!/usr/bin/env bash
set -euo pipefail
# Create FIFO
mkfifo my_pipe
# Clean up on exit
trap 'rm -f my_pipe' EXIT
# Writer (in background or separate terminal)
echo "Hello from writer" > my_pipe &
# Reader (blocks until data available)
cat < my_pipe
# With timeout (using read).
# `<>` opens the FIFO read-write so the open itself cannot block. With a plain
# `<` the shell waits in open() for a writer before `read -t` starts its timer,
# so the timeout would never fire.
if read -r -t 5 line <> my_pipe; then
  echo "Received: $line"
else
  echo "Timeout waiting for data"
fi

#!/usr/bin/env bash
set -euo pipefail
# Create two FIFOs for bidirectional communication.
# They live in a private mktemp directory instead of predictable /tmp names.
PIPE_DIR=$(mktemp -d)
# Register cleanup before creating the FIFOs so a failed mkfifo still cleans up.
trap 'rm -rf -- "$PIPE_DIR"' EXIT
REQUEST_PIPE="$PIPE_DIR/request"
RESPONSE_PIPE="$PIPE_DIR/response"
mkfifo -m 600 "$REQUEST_PIPE" "$RESPONSE_PIPE"
# Server process
#######################################
# Answer one-line requests until QUIT arrives.
# Globals:
#   REQUEST_PIPE  (read)    - FIFO the requests are read from
#   RESPONSE_PIPE (written) - FIFO each reply is written to
# Protocol: TIME -> `date`, UPTIME -> `uptime`, QUIT -> "BYE" and stop,
#   anything else -> "UNKNOWN: <request>".
#######################################
server() {
  while :; do
    # The redirection reopens the request FIFO for every request; a failed
    # read simply retries.
    read -r request < "$REQUEST_PIPE" || continue
    case "$request" in
      "QUIT")
        echo "BYE" > "$RESPONSE_PIPE"
        return 0
        ;;
      "TIME") date > "$RESPONSE_PIPE" ;;
      "UPTIME") uptime > "$RESPONSE_PIPE" ;;
      *) echo "UNKNOWN: $request" > "$RESPONSE_PIPE" ;;
    esac
  done
}
# Client function
#######################################
# Send one request line and print the reply.
# Globals:   REQUEST_PIPE (written), RESPONSE_PIPE (read)
# Arguments: $1 - request text
# Outputs:   everything read from RESPONSE_PIPE, on stdout
#######################################
send_request() {
  local request
  request="$1"
  echo "$request" >"$REQUEST_PIPE"
  cat <"$RESPONSE_PIPE"
}
# Start server in background
server &
SERVER_PID=$!
# Send requests
send_request "TIME"
send_request "UPTIME"
send_request "QUIT"
# Reap the server; QUIT makes it leave its loop, so this returns promptly.
wait "$SERVER_PID"

#!/usr/bin/env bash
set -euo pipefail
# The queue FIFO lives in a private mktemp directory rather than under a
# predictable /tmp name; cleanup is registered before the FIFO is created.
WORK_DIR=$(mktemp -d)
trap 'rm -rf -- "$WORK_DIR"' EXIT
WORK_QUEUE="$WORK_DIR/work_queue"
mkfifo -m 600 "$WORK_QUEUE"
# Producer
#######################################
# Emit the task list: TASK:1 .. TASK:100, one per line, then a DONE marker.
# Outputs: 101 lines on stdout
#######################################
producer() {
  local item
  for ((item = 1; item <= 100; item++)); do
    printf 'TASK:%d\n' "$item"
  done
  printf '%s\n' "DONE"
}
# Consumer (can have multiple)
#######################################
# Process work items from stdin until a DONE line or end of input.
# Arguments: $1 - consumer id, used only in the progress message
# Outputs:   "Consumer <id> processing: <item>" per item, on stdout
#######################################
consumer() {
  local id="$1"
  while read -r item; do
    if [[ "$item" == "DONE" ]]; then
      break
    fi
    printf 'Consumer %s processing: %s\n' "$id" "$item"
    sleep 0.1 # Simulate work
  done
}
# Start consumers (they'll block waiting for data)
# Only one consumer can receive the single DONE line; the others stop when the
# producer closes the FIFO and their read hits end of input.
# NOTE(review): several shells reading one FIFO may split a line between
# readers, and a consumer that opens the FIFO after the producer has already
# closed it would block - verify before relying on this pattern.
consumer 1 < "$WORK_QUEUE" &
consumer 2 < "$WORK_QUEUE" &
consumer 3 < "$WORK_QUEUE" &
# Start producer
producer > "$WORK_QUEUE"
wait
echo "All work complete"

#!/usr/bin/env bash
set -euo pipefail
# The FIFO lives in a private mktemp directory rather than under a predictable
# /tmp name; cleanup is registered before the FIFO is created.
FIFO_DIR=$(mktemp -d)
trap 'rm -rf -- "$FIFO_DIR"' EXIT
FIFO="$FIFO_DIR/fd_fifo"
mkfifo -m 600 "$FIFO"
# Open FIFO for read/write on FD 3
# Opening for both prevents blocking on open
exec 3<>"$FIFO"
# Write to FIFO via FD (small writes are held in the pipe buffer)
echo "Message 1" >&3
echo "Message 2" >&3
# Read from FIFO via FD
read -r msg1 <&3
read -r msg2 <&3
echo "Got: $msg1, $msg2"
# Close FD
exec 3>&-

#!/usr/bin/env bash
set -euo pipefail
# Start coprocess (bidirectional pipe)
coproc BC { bc -l; }
# Remember the PID now: bash removes BC_PID once the coprocess has finished,
# and expanding the missing variable later would fail under `set -u`.
bc_pid=$BC_PID
# Send data to coprocess
echo "scale=10; 355/113" >&"${BC[1]}"
# Read result
read -r result <&"${BC[0]}"
echo "Pi approximation: $result"
# More calculations
echo "sqrt(2)" >&"${BC[1]}"
read -r sqrt2 <&"${BC[0]}"
echo "Square root of 2: $sqrt2"
# Close write end to signal EOF
exec {BC[1]}>&-
# Wait for coprocess to finish
wait "$bc_pid"

#!/usr/bin/env bash
set -euo pipefail
# Named coprocess for Python interpreter.
# -u keeps Python's output unbuffered so each reply is readable immediately.
# The helper exec()s every input line, so only feed it trusted text.
coproc PYTHON { python3 -u -c "
import sys
for line in sys.stdin:
    exec(line.strip())
"; }
# Remember the PID now: bash removes PYTHON_PID once the coprocess has
# finished, and expanding it later would fail under `set -u`.
python_pid=$PYTHON_PID
# Send Python commands
echo "print('Hello from Python')" >&"${PYTHON[1]}"
read -r output <&"${PYTHON[0]}"
echo "Python said: $output"
echo "print(2**100)" >&"${PYTHON[1]}"
read -r big_num <&"${PYTHON[0]}"
echo "2^100 = $big_num"
# Cleanup: closing the write end gives Python EOF on stdin.
exec {PYTHON[1]}>&-
# Best effort: the coprocess may already have been reaped.
wait "$python_pid" 2>/dev/null || true

#!/usr/bin/env bash
set -euo pipefail
# Create pool of worker coprocesses
declare -A WORKERS
declare -A WORKER_PIDS
# Start $1 worker coprocesses and record their descriptors and PIDs.
# Globals written: WORKERS["<i>,in"]  - descriptor used to send tasks to worker i
#                  WORKERS["<i>,out"] - descriptor used to read its replies
#                  WORKER_PIDS["<i>"] - PID of worker i
# NOTE(review): the coprocess name is built at run time, which needs a bash
# that expands the coproc NAME word, and bash documents only limited support
# for several simultaneous coprocesses - confirm on the target version.
start_workers() {
local count="$1"
local i
for ((i=0; i<count; i++)); do
# Each worker runs a processing loop: read a task line, stop on QUIT,
# otherwise answer DONE:<task>.
coproc "WORKER_$i" {
while IFS= read -r task; do
[[ "$task" == "QUIT" ]] && exit 0
# Simulate work
sleep 0.1
echo "DONE:$task"
done
}
# Store FDs dynamically: namerefs resolve the per-worker variables
# WORKER_<i>[0], WORKER_<i>[1] and WORKER_<i>_PID that coproc just created.
local -n write_fd="WORKER_${i}[1]"
local -n read_fd="WORKER_${i}[0]"
local -n pid="WORKER_${i}_PID"
# Copy the values out so they can be looked up later by worker index.
WORKERS["$i,in"]="$write_fd"
WORKERS["$i,out"]="$read_fd"
WORKER_PIDS["$i"]="$pid"
done
}
# Note: Coprocess pool management is complex
# Consider GNU Parallel for production workloads

#!/usr/bin/env bash
set -euo pipefail
# The progress FIFO lives in a private mktemp directory rather than under a
# predictable /tmp name; cleanup is registered before the FIFO is created.
PROGRESS_DIR=$(mktemp -d)
trap 'rm -rf -- "$PROGRESS_DIR"' EXIT
PROGRESS_PIPE="$PROGRESS_DIR/progress"
mkfifo -m 600 "$PROGRESS_PIPE"
# Progress monitor
#######################################
# Draw a 50-column progress bar, advancing one step per line received.
# Globals:   PROGRESS_PIPE (read) - FIFO carrying one line per finished item
# Arguments: $1 - total number of items expected
# Outputs:   "\rProgress: [####...] NN%" per update, then a final newline
#######################################
monitor_progress() {
  local total="$1"
  local current=0
  local pct filled bar update
  while IFS= read -r update; do
    # Not ((current++)): that returns status 1 while current is 0 and would
    # end the monitor on its first update under `set -e`.
    current=$((current + 1))
    if ((total > 0)); then
      pct=$((current * 100 / total))
    else
      pct=100 # nothing was expected; avoid dividing by zero
    fi
    if ((pct > 100)); then
      pct=100 # extra updates must not overflow the bar
    fi
    # Build the bar without seq: `printf '#%.0s'` with no arguments still
    # prints one '#', which showed a filled cell at 0%.
    filled=$((pct / 2))
    printf -v bar '%*s' "$filled" ''
    bar=${bar// /#}
    printf '\rProgress: [%-50s] %d%%' "$bar" "$pct"
  done < "$PROGRESS_PIPE"
  echo
}
# Worker that reports progress
#######################################
# Process every argument and report each completion.
# Globals:   PROGRESS_PIPE (written) - receives one "done" line per item
# Arguments: the items to hand to process_item, one call each
#######################################
do_work() {
  local item
  # `for item` with no list walks the positional parameters directly.
  for item; do
    process_item "$item"
    echo "done" >"$PROGRESS_PIPE"
  done
}
# Usage
items=(item1 item2 item3 ... item100)
monitor_progress "${#items[@]}" &
MONITOR_PID=$!
# Keep a write end open for the whole run. do_work opens and closes the FIFO
# for every item; without a long-lived writer the monitor would see end of
# input after the first update and stop.
exec 3>"$PROGRESS_PIPE"
do_work "${items[@]}"
exec 3>&- # Close to signal completion
wait "$MONITOR_PID"

#!/usr/bin/env bash
set -euo pipefail
# Private directory from mktemp instead of a predictable /tmp/logs_<pid> path.
LOG_DIR=$(mktemp -d)
# Register cleanup first so a failing mkfifo does not leave the directory behind.
trap 'rm -rf -- "$LOG_DIR"' EXIT
# Create FIFOs for each log level
for level in DEBUG INFO WARN ERROR; do
  mkfifo -m 600 "$LOG_DIR/$level"
done
# Aggregator process
#######################################
# Merge the per-level FIFOs into one timestamped log file. Runs until killed.
# Globals:   LOG_DIR (read) - directory holding the DEBUG/INFO/WARN/ERROR FIFOs
# Arguments: $1 - file that "[LEVEL] HH:MM:SS message" lines are appended to
#######################################
aggregate_logs() {
  local output_file="$1"
  local -a levels=(DEBUG INFO WARN ERROR)
  local -A fds=()
  local level fd msg
  # Open every FIFO read-write. A read-only open blocks until that FIFO has a
  # writer, so opening them in a fixed order deadlocked whenever the first
  # message went to a later level. Holding a write end also means the reads
  # below time out instead of spinning on end-of-input.
  for level in "${levels[@]}"; do
    exec {fd}<>"$LOG_DIR/$level"
    fds[$level]=$fd
  done
  while true; do
    # Poll each level in turn with a short timeout (select-like behaviour).
    for level in "${levels[@]}"; do
      if IFS= read -t 0.1 -r msg <&"${fds[$level]}"; then
        printf '[%s] %s %s\n' "$level" "$(date '+%H:%M:%S')" "$msg" >> "$output_file"
      fi
    done
  done
}
# Logging functions
# Write one message line to the FIFO of the given level.
# Arguments: $1 - level name (file under LOG_DIR); remaining args - message
_log_to_level() {
  local level="$1"
  shift
  echo "$*" > "$LOG_DIR/$level"
}
# Per-level entry points; all arguments are joined into a single message line.
log_debug() { _log_to_level DEBUG "$@"; }
log_info() { _log_to_level INFO "$@"; }
log_warn() { _log_to_level WARN "$@"; }
log_error() { _log_to_level ERROR "$@"; }
# Start aggregator
aggregate_logs "/var/log/app.log" &
AGGREGATOR_PID=$!
# Application code uses logging functions
log_info "Application started"
log_debug "Processing item"
log_warn "Resource running low"
log_error "Critical failure"
# Cleanup
# The aggregator polls its four FIFOs with a 0.1s timeout each; give it one
# full pass to drain the messages before stopping it.
sleep 0.5
# `|| true`: the aggregator may already be gone, and a failing kill must not
# abort the script under `set -e`.
kill "$AGGREGATOR_PID" 2>/dev/null || true
wait "$AGGREGATOR_PID" 2>/dev/null || true

#!/usr/bin/env bash
set -euo pipefail
# Buffered pipeline stage
#######################################
# Buffered pipeline stage: group stdin lines into batches and pipe each batch
# to process_batch.
# Arguments:
#   $1 - stage name (accepted for the caller's benefit; not used here)
#   $2 - lines per batch, default 100
# Blank lines are ordinary data, and a final line without a trailing newline
# is kept.
#######################################
buffered_stage() {
  local name="$1"
  local buffer_size="${2:-100}"
  local -a buffer=()
  local line
  # `|| [[ -n "$line" ]]` picks up a last line that lacks a newline.
  while IFS= read -r line || [[ -n "$line" ]]; do
    buffer+=("$line")
    # Flush when buffer full
    if ((${#buffer[@]} >= buffer_size)); then
      printf '%s\n' "${buffer[@]}" | process_batch
      buffer=()
    fi
  done
  # Flush the remainder at EOF
  if ((${#buffer[@]} > 0)); then
    printf '%s\n' "${buffer[@]}" | process_batch
  fi
}
# Parallel pipeline with process substitution
#######################################
# Parallel pipeline with process substitution: feed one input file to three
# independent filter/transform branches.
# Arguments: $1 - input file
# Outputs:   output_a.txt, output_b.txt and output_c.txt in the current directory
#######################################
run_parallel_pipeline() {
  local input="$1"
  # tee runs as a simple command with the input redirected, not as the tail
  # of `cat | tee`: in a pipeline the process substitutions belong to a
  # subshell, where the `wait` below cannot see them.
  tee \
    >(filter_a | transform_a > output_a.txt) \
    >(filter_b | transform_b > output_b.txt) \
    >(filter_c | transform_c > output_c.txt) \
    < "$input" > /dev/null
  # Wait for all background processes.
  # NOTE(review): whether a bare `wait` also waits for process substitutions
  # depends on the bash release - verify on the oldest version you support.
  wait
}

#!/usr/bin/env bash
set -euo pipefail
# Stream JSON array elements
#######################################
# Stream JSON array elements: fetch a document and hand each element of its
# `.items` array to process_json_item as one compact JSON line.
# Arguments: $1 - URL to fetch
#######################################
stream_json_array() {
  local url="$1"
  # jq -c prints one array element per line, which the loop reads verbatim.
  curl -s "$url" \
    | jq -c '.items[]' \
    | while IFS= read -r element; do
      process_json_item "$element"
    done
}
# Parallel JSON processing with process substitution
#######################################
# Parallel JSON processing: split the elements of a JSON array file across
# GNU parallel jobs, mark each element processed, and re-assemble one array.
# Arguments: $1 - file containing a JSON array
# Outputs:   a single JSON array on stdout
#######################################
parallel_json_process() {
  local input="$1"
  local workers=4
  # Body executed by every parallel job for its chunk of up to 100 lines.
  local job_script='
while IFS= read -r item; do
echo "$item" | jq ".processed = true"
done
'
  # Split input across workers
  jq -c '.[]' "$input" \
    | parallel --pipe -N100 --jobs "$workers" "$job_script" \
    | jq -s '.'
}
# Transform JSON stream
#######################################
# Transform JSON stream: add a `timestamp` field (ISO-8601, seconds
# precision) to every JSON value read from stdin.
# Outputs: the enriched objects on stdout, formatted by jq
#######################################
transform_json_stream() {
  local obj
  # jq -c normalises the input to one JSON value per line for the loop.
  jq -c '.' | while IFS= read -r obj; do
    # Enrich and output. The earlier per-object `.id` lookup was never used
    # and cost an extra jq process for every object, so it is gone.
    printf '%s\n' "$obj" | jq --arg ts "$(date -Iseconds)" '. + {timestamp: $ts}'
  done
}

#!/usr/bin/env bash
# Requires Bash 5.3+
set -euo pipefail
# Traditional: forks subshell
result=$(echo "hello")
# Bash 5.3: No fork, runs in current shell
result=${ echo "hello"; }
# Significant for variable modifications
counter=0
# Traditional - counter stays 0 (subshell)
result=$(counter=$((counter + 1)); echo "$counter")
echo "Counter: $counter" # Still 0
# Bash 5.3 - counter is modified (same shell)
result=${ counter=$((counter + 1)); echo "$counter"; }
echo "Counter: $counter" # Now 1
# REPLY form: ${| command; } expands to whatever `command` leaves in REPLY;
# stdout is not captured. REPLY is local to the construct and its previous
# value is restored afterwards, so the expansion has to be assigned. Used as
# a bare statement it would be executed as a command.
value=${| REPLY="computed value"; }
echo "$value"
# Same form wrapping another command's output
result=${| REPLY=$(expensive_computation); }
echo "Result: $result"

#!/usr/bin/env bash
# Requires Bash 5.3+
set -euo pipefail
# Build result without forks
#######################################
# Build result without forks: join the arguments with "/" using Bash 5.3
# in-shell command substitution.
# Arguments: path components, in order
# Outputs:   the joined path on stdout, without a leading slash
#######################################
build_path() {
  local parts=("$@")
  local result=""
  local part # keep the loop variable out of the caller's scope
  for part in "${parts[@]}"; do
    # No fork for each concatenation
    result=${ printf '%s/%s' "$result" "$part"; }
  done
  # Every step prepends "/", so drop the one in front of the first part.
  echo "${result#/}"
}
# Accumulate values efficiently
#######################################
# Accumulate values efficiently: sum the integers of an array passed by name.
# Arguments: $1 - name of the caller's array variable
# Outputs:   the sum on stdout
#######################################
accumulate() {
  # The nameref gets an unlikely name: calling it `arr` made
  # `accumulate arr` a circular reference to itself.
  local -n _accumulate_values="$1"
  local sum=0
  local val
  for val in "${_accumulate_values[@]}"; do
    # In-shell arithmetic capture
    sum=${ echo $((sum + val)); }
  done
  echo "$sum"
}

#!/usr/bin/env bash
set -euo pipefail
# Check all pipeline stages
#######################################
# Check all pipeline stages: run stage1 | stage2 | stage3 and print the
# captured output only when the whole pipeline succeeded.
# Returns: 1 (with "Pipeline failed" on stderr) if the pipeline status is
#          non-zero; with pipefail that covers a failure in any stage.
#######################################
run_pipeline() {
  local result
  result=$(stage1 | stage2 | stage3) || {
    echo "Pipeline failed" >&2
    return 1
  }
  echo "$result"
}
# PIPESTATUS for detailed error info
#######################################
# PIPESTATUS for detailed error info: run cmd1 | cmd2 | cmd3 and report
# every stage that failed.
# Outputs: "Stage <n> failed with status <code>" on stderr per failed stage
# Returns: the highest exit status among the stages (0 if all succeeded)
#######################################
run_with_status() {
  local -a status
  local i s
  local max=0
  # Run the pipeline as an `if` condition so that `set -e` with pipefail
  # cannot end the script before PIPESTATUS is copied. Both branches copy it
  # immediately, before any other command overwrites it.
  if cmd1 | cmd2 | cmd3; then
    status=("${PIPESTATUS[@]}")
  else
    status=("${PIPESTATUS[@]}")
  fi
  for i in "${!status[@]}"; do
    if ((status[i] != 0)); then
      echo "Stage $i failed with status ${status[$i]}" >&2
    fi
  done
  # Return highest exit status
  for s in "${status[@]}"; do
    if ((s > max)); then
      max=$s
    fi
  done
  return "$max"
}

#!/usr/bin/env bash
set -euo pipefail
# Track resources for cleanup: PIDs to signal and files to delete on exit.
declare -a CLEANUP_PIDS=() CLEANUP_FILES=()

# Register cleanup: append one PID / one path to the lists above.
register_pid() {
  CLEANUP_PIDS+=("$1")
}
register_file() {
  CLEANUP_FILES+=("$1")
}

# Best-effort teardown: signal every registered PID, then delete every
# registered file. Individual failures are ignored on purpose so that one
# stale entry cannot stop the rest of the cleanup.
cleanup() {
  local idx
  for idx in "${!CLEANUP_PIDS[@]}"; do
    kill "${CLEANUP_PIDS[idx]}" 2>/dev/null || :
  done
  for idx in "${!CLEANUP_FILES[@]}"; do
    rm -f "${CLEANUP_FILES[idx]}" 2>/dev/null || :
  done
}
trap cleanup EXIT
# Example usage
#######################################
# Example usage: connect producer and consumer through a temporary FIFO and
# register both processes and the FIFO for cleanup.
# Returns after both background jobs have finished.
#######################################
run_safe_pipeline() {
  local fifo
  # Unpredictable name instead of /tmp/pipeline_<pid>. `mktemp -u` only
  # generates the name; mkfifo fails if the path already exists, so creating
  # the FIFO is still atomic.
  fifo=$(mktemp -u "${TMPDIR:-/tmp}/pipeline.XXXXXX")
  mkfifo -m 600 "$fifo"
  register_file "$fifo"
  producer > "$fifo" &
  register_pid "$!"
  consumer < "$fifo" &
  register_pid "$!"
  wait
}

#!/usr/bin/env bash
set -euo pipefail
# Include PID and descriptive name
#######################################
# Create a FIFO under /tmp named <name>_<shell pid>_<epoch seconds>.
# Arguments: $1 - descriptive name used as the file-name prefix
# Outputs:   the FIFO path on stdout
#######################################
create_fifo() {
  local name="$1"
  local stamp fifo
  stamp=$(date +%s)
  fifo="/tmp/${name}_$$_${stamp}"
  mkfifo -m 600 "$fifo" # Restrictive permissions
  echo "$fifo"
}
# Use tmpdir for security
#######################################
# Use tmpdir for security: create a FIFO inside a fresh mktemp directory.
# Arguments: $1 - file name of the FIFO inside that directory
# Outputs:   the FIFO path on stdout; the caller removes its parent directory
# Returns:   non-zero if the directory or the FIFO cannot be created
#######################################
create_secure_fifo() {
  local name="$1"
  local tmpdir fifo
  tmpdir=$(mktemp -d) || return
  fifo="$tmpdir/$name"
  if ! mkfifo -m 600 "$fifo"; then
    # Do not leave an orphaned temp directory behind on failure.
    rm -rf -- "$tmpdir"
    return 1
  fi
  echo "$fifo"
}

#!/usr/bin/env bash
set -euo pipefail
# ✗ DEADLOCK - writer blocks, reader never starts
# mkfifo pipe
# echo "data" > pipe # Blocks forever
# ✓ SAFE - open both ends or use background
mkfifo pipe
trap 'rm -f pipe' EXIT
# Option 1: Background writer
echo "data" > pipe &
cat < pipe
# Option 2: Open for read/write
exec 3<>pipe
echo "data" >&3
read -r data <&3
exec 3>&-
# Option 3: Separate read and write descriptors (requires careful handling).
# `exec 3<pipe &` would open the descriptor in a throwaway subshell, leaving
# fd 3 closed here. Instead, bootstrap with a read-write open that never
# blocks, add the writer, then swap fd 3 for a read-only end.
exec 3<>pipe # temporary read-write end
exec 4>pipe  # returns at once: fd 3 already counts as a reader
exec 3<pipe  # returns at once: fd 4 is a writer; replaces the read-write end
echo "data" >&4
read -r data <&3
exec 3<&- 4>&-

#!/usr/bin/env bash
set -euo pipefail
# Read with timeout
#######################################
# Read with timeout: read one line from a FIFO, giving up after a deadline.
# Arguments: $1 - FIFO path; $2 - timeout in seconds (fractions allowed)
# Outputs:   the line on stdout; a timeout notice on stderr on failure
# Returns:   0 if a line was read, 1 otherwise
#######################################
read_with_timeout() {
  local fifo="$1"
  local timeout="$2"
  local result
  # `<>` opens the FIFO read-write so the open cannot block. With `<` the
  # shell waits for a writer before `read` starts, so the timeout never
  # applied when no writer showed up.
  if read -t "$timeout" -r result <> "$fifo"; then
    echo "$result"
    return 0
  else
    echo "Timeout after ${timeout}s" >&2
    return 1
  fi
}
# Write with timeout (using timeout command)
#######################################
# Write with timeout (using timeout command): write one line to a FIFO,
# giving up if no reader opens it in time.
# Arguments: $1 - FIFO path; $2 - timeout in seconds; $3 - data to write
# Outputs:   a timeout notice on stderr on failure
# Returns:   0 if the line was written, 1 otherwise
#######################################
write_with_timeout() {
  local fifo="$1"
  local timeout="$2"
  local data="$3"
  # Data and path are passed as arguments to the child shell, never pasted
  # into its source text, so quotes or shell syntax in either are written
  # literally instead of being executed.
  if timeout "$timeout" bash -c 'printf "%s\n" "$1" > "$2"' _ "$data" "$fifo"; then
    return 0
  else
    echo "Write timeout after ${timeout}s" >&2
    return 1
  fi
}