pg-boss
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePG Boss Job Queue
PG Boss 任务队列
PG Boss is a PostgreSQL-based job queue for Node.js. It stores jobs in the database, providing persistence and reliability.
PG Boss 是一款基于PostgreSQL的Node.js任务队列工具。它将任务存储在数据库中,具备持久性与可靠性。
Quick Setup
快速设置
typescript
import { PgBoss } from 'pg-boss';
const boss = new PgBoss({
connectionString: process.env.DATABASE_URL,
schema: 'pgboss',
monitorIntervalSeconds: 30,
});
await boss.start();typescript
import { PgBoss } from 'pg-boss';
const boss = new PgBoss({
connectionString: process.env.DATABASE_URL,
schema: 'pgboss',
monitorIntervalSeconds: 30,
});
await boss.start();Common Patterns
常见使用模式
1. Scheduled Jobs (Cron-like)
1. 定时任务(类Cron风格)
typescript
// v12: queue must exist before schedule/work
await boss.createQueue('my-job', {
retryLimit: 3,
retryDelay: 60,
expireInSeconds: 600,
deleteAfterSeconds: 7 * 24 * 60 * 60,
});
// Schedule job to run every minute
await boss.schedule('my-job', '* * * * *', {}, {
tz: 'UTC',
singletonKey: 'my-job-schedule', // Prevents duplicates on restart
});
// Register handler
await boss.work('my-job', { pollingIntervalSeconds: 10 }, async (jobs) => {
for (const job of jobs) {
// Process job
}
});typescript
// v12: queue must exist before schedule/work
await boss.createQueue('my-job', {
retryLimit: 3,
retryDelay: 60,
expireInSeconds: 600,
deleteAfterSeconds: 7 * 24 * 60 * 60,
});
// Schedule job to run every minute
await boss.schedule('my-job', '* * * * *', {}, {
tz: 'UTC',
singletonKey: 'my-job-schedule', // Prevents duplicates on restart
});
// Register handler
await boss.work('my-job', { pollingIntervalSeconds: 10 }, async (jobs) => {
for (const job of jobs) {
// Process job
}
});2. Queuing Jobs
2. 任务入队
typescript
// Queue a single job
await boss.send('process-user', { userId: '123' });
// Queue with options
await boss.send('send-email', { to: 'user@example.com' }, {
retryLimit: 5,
expireInSeconds: 300,
});typescript
// Queue a single job
await boss.send('process-user', { userId: '123' });
// Queue with options
await boss.send('send-email', { to: 'user@example.com' }, {
retryLimit: 5,
expireInSeconds: 300,
});3. Batch Processing with Concurrency
3. 并发批量处理
typescript
await boss.work(
'batch-job',
{ batchSize: 10 }, // Process 10 jobs at once
async (jobs) => {
await Promise.all(jobs.map(job => processJob(job)));
}
);typescript
await boss.work(
'batch-job',
{ batchSize: 10 }, // Process 10 jobs at once
async (jobs) => {
await Promise.all(jobs.map(job => processJob(job)));
}
);Reliability Patterns
可靠性模式
Singleton Pattern (Prevent Duplicates)
单例模式(防止重复实例)
typescript
// Use getPgBoss() singleton instead of creating new instances
let boss: PgBoss | null = null;
export async function getPgBoss(): Promise<PgBoss> {
if (boss) return boss;
boss = new PgBoss({ connectionString: process.env.DATABASE_URL });
await boss.start();
return boss;
}typescript
// Use getPgBoss() singleton instead of creating new instances
let boss: PgBoss | null = null;
export async function getPgBoss(): Promise<PgBoss> {
if (boss) return boss;
boss = new PgBoss({ connectionString: process.env.DATABASE_URL });
await boss.start();
return boss;
}Watchdog for Auto-Recovery
自动恢复监控器
PG Boss can stop unexpectedly (connection drops, crashes). Add a watchdog:
typescript
let boss: PgBoss | null = null;
let watchdogInterval: NodeJS.Timeout | null = null;
// Listen for unexpected stops
boss.on('stopped', () => {
console.error('[PG Boss] Stopped unexpectedly');
boss = null;
// Watchdog will attempt recovery
});
// Watchdog checks every 60s
watchdogInterval = setInterval(async () => {
if (!boss && workerRegistrationFn) {
console.log('[PG Boss] Attempting recovery...');
await workerRegistrationFn();
}
}, 60000);PG Boss 可能会意外停止(连接断开、崩溃)。添加监控器:
typescript
let boss: PgBoss | null = null;
let watchdogInterval: NodeJS.Timeout | null = null;
// Listen for unexpected stops
boss.on('stopped', () => {
console.error('[PG Boss] Stopped unexpectedly');
boss = null;
// Watchdog will attempt recovery
});
// Watchdog checks every 60s
watchdogInterval = setInterval(async () => {
if (!boss && workerRegistrationFn) {
console.log('[PG Boss] Attempting recovery...');
await workerRegistrationFn();
}
}, 60000);Health Check Integration
健康检查集成
Make health check fail when PG Boss is dead (triggers container restart):
typescript
app.get('/health', (c) => {
const pgBossRunning = boss !== null;
if (!pgBossRunning) {
return c.json({ status: 'degraded', pgBossRunning: false }, 503);
}
return c.json({ status: 'ok', pgBossRunning: true });
});当PG Boss停止时,让健康检查失败(触发容器重启):
typescript
app.get('/health', (c) => {
const pgBossRunning = boss !== null;
if (!pgBossRunning) {
return c.json({ status: 'degraded', pgBossRunning: false }, 503);
}
return c.json({ status: 'ok', pgBossRunning: true });
});Debugging
调试方法
Check PG Boss State in Database
在数据库中查看PG Boss状态
sql
-- Check scheduled jobs
SELECT name, cron FROM pgboss.schedule;
-- Check recent jobs
SELECT name, state, created_on, completed_on
FROM pgboss.job
ORDER BY created_on DESC
LIMIT 20;
-- Check job queue size
SELECT name, COUNT(*)
FROM pgboss.job
WHERE state = 'created'
GROUP BY name;sql
-- Check scheduled jobs
SELECT name, cron FROM pgboss.schedule;
-- Check recent jobs
SELECT name, state, created_on, completed_on
FROM pgboss.job
ORDER BY created_on DESC
LIMIT 20;
-- Check job queue size
SELECT name, COUNT(*)
FROM pgboss.job
WHERE state = 'created'
GROUP BY name;Via Node.js
通过Node.js调试
javascript
const boss = await getPgBoss();
// Get queue size
const pending = await boss.getQueueSize('my-job');
// Get job by ID
const job = await boss.getJobById(jobId);javascript
const boss = await getPgBoss();
// Get queue size
const pending = await boss.getQueueSize('my-job');
// Get job by ID
const job = await boss.getJobById(jobId);Railway-Specific Debugging
Railway 专属调试方法
bash
undefinedbash
undefinedCheck environment variables
Check environment variables
railway variables --service api
railway variables --service api
Get public DATABASE_URL for direct access
Get public DATABASE_URL for direct access
railway variables --service Postgres | grep DATABASE_PUBLIC_URL
railway variables --service Postgres | grep DATABASE_PUBLIC_URL
Query database directly
Query database directly
DATABASE_URL="postgresql://..." node -e "
const { Pool } = require('pg');
const pool = new Pool({ connectionString: process.env.DATABASE_URL, ssl: { rejectUnauthorized: false } });
// Run queries
"
undefinedDATABASE_URL="postgresql://..." node -e "
const { Pool } = require('pg');
const pool = new Pool({ connectionString: process.env.DATABASE_URL, ssl: { rejectUnauthorized: false } });
// Run queries
"
undefinedCommon Issues
常见问题
| Issue | Cause | Fix |
|---|---|---|
| Jobs not running | PG Boss not started | Check health endpoint, verify DATABASE_URL |
| Duplicate jobs | Missing | Add singletonKey to |
| Queue not found | | Call |
| Jobs stuck in 'created' | No worker registered | Call |
| Connection errors | Database URL wrong/expired | Check DATABASE_URL, SSL settings |
| Jobs not persisting | Wrong schema | Verify |
| Worker over-polling | Wrong option name | Use |
Next.js build fails ( | instrumentation bundles server-only deps | Use |
| 问题 | 原因 | 解决方法 |
|---|---|---|
| 任务未执行 | PG Boss未启动 | 检查健康检查端点,验证DATABASE_URL配置 |
| 重复任务 | 缺少 | 在 |
| 队列未找到 | | 先调用 |
| 任务卡在'created'状态 | 未注册对应的工作进程 | 为该队列调用 |
| 连接错误 | 数据库URL错误/过期 | 检查DATABASE_URL及SSL设置 |
| 任务未持久化 | 架构配置错误 | 验证 |
| 工作进程轮询过于频繁 | 配置项名称错误 | 使用 |
Next.js构建失败( | 工具打包了仅服务器端依赖 | 在工具动态导入时添加 |
Best Practices
最佳实践
- Always use singletonKey for scheduled jobs to prevent duplicates on restart
- Create queues explicitly in PG Boss v10+:
await boss.createQueue('my-job') - Handle the 'stopped' event to detect unexpected shutdowns
- Use a watchdog to auto-recover from crashes (clear old interval before starting a new one)
- Return 503 in health check when PG Boss is dead (triggers container restart)
- Log job errors but don't swallow them - let PG Boss retry
- Use separate workers for different job types (rollover, email, etc.)
- Graceful shutdown: call on SIGTERM/SIGINT
boss.stop({ graceful: true, timeout: 30000 }) - Next.js instrumentation: load server-only job modules with
import(/* webpackIgnore: true */ ...)
- 定时任务务必使用singletonKey,避免重启后出现重复任务
- 在PG Boss v10+中显式创建队列:
await boss.createQueue('my-job') - 监听'stopped'事件,以检测意外关闭
- 使用监控器,实现崩溃后自动恢复(启动新的监控器前请清除旧的定时器)
- 当PG Boss停止时,健康检查返回503,触发容器重启
- 记录任务错误但不要吞掉错误,让PG Boss自动重试
- 为不同类型的任务使用独立的工作进程(如轮转任务、邮件任务等)
- 优雅关闭:在收到SIGTERM/SIGINT信号时调用
boss.stop({ graceful: true, timeout: 30000 }) - Next.js工具集成:使用加载仅服务器端的任务模块
import(/* webpackIgnore: true */ ...)
File Structure Example
文件结构示例
apps/api/src/
├── lib/
│ └── pgboss.ts # Singleton, getPgBoss(), stopPgBoss()
├── workers/
│ ├── index.ts # registerAllWorkers()
│ ├── rollover/
│ │ ├── index.ts # registerRolloverWorkers()
│ │ ├── timezone-check.ts
│ │ └── batch-processor.ts
│ └── email/
│ ├── index.ts # registerEmailWorkers()
│ └── send-email.ts
└── index.ts # Calls registerAllWorkers() on startupapps/api/src/
├── lib/
│ └── pgboss.ts # 单例、getPgBoss()、stopPgBoss()
├── workers/
│ ├── index.ts # registerAllWorkers()
│ ├── rollover/
│ │ ├── index.ts # registerRolloverWorkers()
│ │ ├── timezone-check.ts
│ │ └── batch-processor.ts
│ └── email/
│ ├── index.ts # registerEmailWorkers()
│ └── send-email.ts
└── index.ts # 启动时调用registerAllWorkers()Timezone-Aware Scheduling
时区感知型调度
For jobs that need to run at specific local times (like midnight rollover):
typescript
import { toZonedTime } from 'date-fns-tz';
// Check if it's midnight in a timezone
const zonedNow = toZonedTime(new Date(), userTimezone);
const hour = zonedNow.getHours();
const minute = zonedNow.getMinutes();
// 10-minute window for reliability
const isMidnightWindow = hour === 0 && minute <= 10;对于需要在特定本地时间执行的任务(如午夜轮转):
typescript
import { toZonedTime } from 'date-fns-tz';
// Check if it's midnight in a timezone
const zonedNow = toZonedTime(new Date(), userTimezone);
const hour = zonedNow.getHours();
const minute = zonedNow.getMinutes();
// 10-minute window for reliability
const isMidnightWindow = hour === 0 && minute <= 10;Idempotency
幂等性处理
Use a log table to prevent duplicate processing:
typescript
// Check if already processed
const existing = await db.query.logs.findFirst({
where: and(
eq(logs.timezone, timezone),
eq(logs.date, targetDate)
),
});
if (existing) return; // Already processed
// Process and log
await processJobs();
await db.insert(logs).values({ timezone, date: targetDate, status: 'completed' });使用日志表防止重复处理:
typescript
// Check if already processed
const existing = await db.query.logs.findFirst({
where: and(
eq(logs.timezone, timezone),
eq(logs.date, targetDate)
),
});
if (existing) return; // Already processed
// Process and log
await processJobs();
await db.insert(logs).values({ timezone, date: targetDate, status: 'completed' });