pg-boss

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

PG Boss Job Queue

PG Boss 任务队列

PG Boss is a PostgreSQL-based job queue for Node.js. It stores jobs in the database, providing persistence and reliability.
PG Boss 是一款基于PostgreSQL的Node.js任务队列工具。它将任务存储在数据库中,具备持久性与可靠性。

Quick Setup

快速设置

typescript
import { PgBoss } from 'pg-boss';

const boss = new PgBoss({
  connectionString: process.env.DATABASE_URL,
  schema: 'pgboss',
  monitorIntervalSeconds: 30,
});

await boss.start();
typescript
import { PgBoss } from 'pg-boss';

const boss = new PgBoss({
  connectionString: process.env.DATABASE_URL,
  schema: 'pgboss',
  monitorIntervalSeconds: 30,
});

await boss.start();

Common Patterns

常见使用模式

1. Scheduled Jobs (Cron-like)

1. 定时任务(类Cron风格)

typescript
// v12: queue must exist before schedule/work
await boss.createQueue('my-job', {
  retryLimit: 3,
  retryDelay: 60,
  expireInSeconds: 600,
  deleteAfterSeconds: 7 * 24 * 60 * 60,
});

// Schedule job to run every minute
await boss.schedule('my-job', '* * * * *', {}, {
  tz: 'UTC',
  singletonKey: 'my-job-schedule',  // Prevents duplicates on restart
});

// Register handler
await boss.work('my-job', { pollingIntervalSeconds: 10 }, async (jobs) => {
  for (const job of jobs) {
    // Process job
  }
});
typescript
// v12: queue must exist before schedule/work
await boss.createQueue('my-job', {
  retryLimit: 3,
  retryDelay: 60,
  expireInSeconds: 600,
  deleteAfterSeconds: 7 * 24 * 60 * 60,
});

// Schedule job to run every minute
await boss.schedule('my-job', '* * * * *', {}, {
  tz: 'UTC',
  singletonKey: 'my-job-schedule',  // Prevents duplicates on restart
});

// Register handler
await boss.work('my-job', { pollingIntervalSeconds: 10 }, async (jobs) => {
  for (const job of jobs) {
    // Process job
  }
});

2. Queuing Jobs

2. 任务入队

typescript
// Queue a single job
await boss.send('process-user', { userId: '123' });

// Queue with options
await boss.send('send-email', { to: 'user@example.com' }, {
  retryLimit: 5,
  expireInSeconds: 300,
});
typescript
// Queue a single job
await boss.send('process-user', { userId: '123' });

// Queue with options
await boss.send('send-email', { to: 'user@example.com' }, {
  retryLimit: 5,
  expireInSeconds: 300,
});

3. Batch Processing with Concurrency

3. 并发批量处理

typescript
await boss.work(
  'batch-job',
  { batchSize: 10 },  // Process 10 jobs at once
  async (jobs) => {
    await Promise.all(jobs.map(job => processJob(job)));
  }
);
typescript
await boss.work(
  'batch-job',
  { batchSize: 10 },  // Process 10 jobs at once
  async (jobs) => {
    await Promise.all(jobs.map(job => processJob(job)));
  }
);

Reliability Patterns

可靠性模式

Singleton Pattern (Prevent Duplicates)

单例模式(防止重复实例)

typescript
// Use getPgBoss() singleton instead of creating new instances
let boss: PgBoss | null = null;

export async function getPgBoss(): Promise<PgBoss> {
  if (boss) return boss;
  
  boss = new PgBoss({ connectionString: process.env.DATABASE_URL });
  await boss.start();
  return boss;
}
typescript
// Use getPgBoss() singleton instead of creating new instances
let boss: PgBoss | null = null;

export async function getPgBoss(): Promise<PgBoss> {
  if (boss) return boss;
  
  boss = new PgBoss({ connectionString: process.env.DATABASE_URL });
  await boss.start();
  return boss;
}

Watchdog for Auto-Recovery

自动恢复监控器

PG Boss can stop unexpectedly (connection drops, crashes). Add a watchdog:
typescript
let boss: PgBoss | null = null;
let watchdogInterval: NodeJS.Timeout | null = null;

// Listen for unexpected stops
boss.on('stopped', () => {
  console.error('[PG Boss] Stopped unexpectedly');
  boss = null;
  // Watchdog will attempt recovery
});

// Watchdog checks every 60s
watchdogInterval = setInterval(async () => {
  if (!boss && workerRegistrationFn) {
    console.log('[PG Boss] Attempting recovery...');
    await workerRegistrationFn();
  }
}, 60000);
PG Boss 可能会意外停止(连接断开、崩溃)。添加监控器:
typescript
let boss: PgBoss | null = null;
let watchdogInterval: NodeJS.Timeout | null = null;

// Listen for unexpected stops
boss.on('stopped', () => {
  console.error('[PG Boss] Stopped unexpectedly');
  boss = null;
  // Watchdog will attempt recovery
});

// Watchdog checks every 60s
watchdogInterval = setInterval(async () => {
  if (!boss && workerRegistrationFn) {
    console.log('[PG Boss] Attempting recovery...');
    await workerRegistrationFn();
  }
}, 60000);

Health Check Integration

健康检查集成

Make health check fail when PG Boss is dead (triggers container restart):
typescript
app.get('/health', (c) => {
  const pgBossRunning = boss !== null;
  
  if (!pgBossRunning) {
    return c.json({ status: 'degraded', pgBossRunning: false }, 503);
  }
  
  return c.json({ status: 'ok', pgBossRunning: true });
});
当PG Boss停止时,让健康检查失败(触发容器重启):
typescript
app.get('/health', (c) => {
  const pgBossRunning = boss !== null;
  
  if (!pgBossRunning) {
    return c.json({ status: 'degraded', pgBossRunning: false }, 503);
  }
  
  return c.json({ status: 'ok', pgBossRunning: true });
});

Debugging

调试方法

Check PG Boss State in Database

在数据库中查看PG Boss状态

sql
-- Check scheduled jobs
SELECT name, cron FROM pgboss.schedule;

-- Check recent jobs
SELECT name, state, created_on, completed_on 
FROM pgboss.job 
ORDER BY created_on DESC 
LIMIT 20;

-- Check job queue size
SELECT name, COUNT(*) 
FROM pgboss.job 
WHERE state = 'created' 
GROUP BY name;
sql
-- Check scheduled jobs
SELECT name, cron FROM pgboss.schedule;

-- Check recent jobs
SELECT name, state, created_on, completed_on 
FROM pgboss.job 
ORDER BY created_on DESC 
LIMIT 20;

-- Check job queue size
SELECT name, COUNT(*) 
FROM pgboss.job 
WHERE state = 'created' 
GROUP BY name;

Via Node.js

通过Node.js调试

javascript
const boss = await getPgBoss();

// Get queue size
const pending = await boss.getQueueSize('my-job');

// Get job by ID
const job = await boss.getJobById(jobId);
javascript
const boss = await getPgBoss();

// Get queue size
const pending = await boss.getQueueSize('my-job');

// Get job by ID
const job = await boss.getJobById(jobId);

Railway-Specific Debugging

Railway 专属调试方法

bash
undefined
bash
undefined

Check environment variables

Check environment variables

railway variables --service api
railway variables --service api

Get public DATABASE_URL for direct access

Get public DATABASE_URL for direct access

railway variables --service Postgres | grep DATABASE_PUBLIC_URL
railway variables --service Postgres | grep DATABASE_PUBLIC_URL

Query database directly

Query database directly

DATABASE_URL="postgresql://..." node -e " const { Pool } = require('pg'); const pool = new Pool({ connectionString: process.env.DATABASE_URL, ssl: { rejectUnauthorized: false } }); // Run queries "
undefined
DATABASE_URL="postgresql://..." node -e " const { Pool } = require('pg'); const pool = new Pool({ connectionString: process.env.DATABASE_URL, ssl: { rejectUnauthorized: false } }); // Run queries "
undefined

Common Issues

常见问题

IssueCauseFix
Jobs not runningPG Boss not startedCheck health endpoint, verify DATABASE_URL
Duplicate jobsMissing
singletonKey
Add singletonKey to
schedule()
call
Queue not found
schedule()
called before
createQueue()
Call
boss.createQueue(name)
first
Jobs stuck in 'created'No worker registeredCall
boss.work()
for the queue
Connection errorsDatabase URL wrong/expiredCheck DATABASE_URL, SSL settings
Jobs not persistingWrong schemaVerify
schema: 'pgboss'
option
Worker over-pollingWrong option nameUse
pollingIntervalSeconds
(not
newJobCheckIntervalSeconds
)
Next.js build fails (
stream/net/tls
)
instrumentation bundles server-only depsUse
/* webpackIgnore: true */
on instrumentation dynamic imports
问题原因解决方法
任务未执行PG Boss未启动检查健康检查端点,验证DATABASE_URL配置
重复任务缺少
singletonKey
schedule()
调用中添加singletonKey
队列未找到
schedule()
createQueue()
之前调用
先调用
boss.createQueue(name)
任务卡在'created'状态未注册对应的工作进程为该队列调用
boss.work()
连接错误数据库URL错误/过期检查DATABASE_URL及SSL设置
任务未持久化架构配置错误验证
schema: 'pgboss'
配置项
工作进程轮询过于频繁配置项名称错误使用
pollingIntervalSeconds
(而非
newJobCheckIntervalSeconds
Next.js构建失败(
stream/net/tls
错误)
工具打包了仅服务器端依赖在工具动态导入时添加
/* webpackIgnore: true */

Best Practices

最佳实践

  1. Always use singletonKey for scheduled jobs to prevent duplicates on restart
  2. Create queues explicitly in PG Boss v10+:
    await boss.createQueue('my-job')
  3. Handle the 'stopped' event to detect unexpected shutdowns
  4. Use a watchdog to auto-recover from crashes (clear old interval before starting a new one)
  5. Return 503 in health check when PG Boss is dead (triggers container restart)
  6. Log job errors but don't swallow them - let PG Boss retry
  7. Use separate workers for different job types (rollover, email, etc.)
  8. Graceful shutdown: call
    boss.stop({ graceful: true, timeout: 30000 })
    on SIGTERM/SIGINT
  9. Next.js instrumentation: load server-only job modules with
    import(/* webpackIgnore: true */ ...)
  1. 定时任务务必使用singletonKey,避免重启后出现重复任务
  2. 在PG Boss v10+中显式创建队列
    await boss.createQueue('my-job')
  3. 监听'stopped'事件,以检测意外关闭
  4. 使用监控器,实现崩溃后自动恢复(启动新的监控器前请清除旧的定时器)
  5. 当PG Boss停止时,健康检查返回503,触发容器重启
  6. 记录任务错误但不要吞掉错误,让PG Boss自动重试
  7. 为不同类型的任务使用独立的工作进程(如轮转任务、邮件任务等)
  8. 优雅关闭:在收到SIGTERM/SIGINT信号时调用
    boss.stop({ graceful: true, timeout: 30000 })
  9. Next.js工具集成:使用
    import(/* webpackIgnore: true */ ...)
    加载仅服务器端的任务模块

File Structure Example

文件结构示例

apps/api/src/
├── lib/
│   └── pgboss.ts          # Singleton, getPgBoss(), stopPgBoss()
├── workers/
│   ├── index.ts           # registerAllWorkers()
│   ├── rollover/
│   │   ├── index.ts       # registerRolloverWorkers()
│   │   ├── timezone-check.ts
│   │   └── batch-processor.ts
│   └── email/
│       ├── index.ts       # registerEmailWorkers()
│       └── send-email.ts
└── index.ts               # Calls registerAllWorkers() on startup
apps/api/src/
├── lib/
│   └── pgboss.ts          # 单例、getPgBoss()、stopPgBoss()
├── workers/
│   ├── index.ts           # registerAllWorkers()
│   ├── rollover/
│   │   ├── index.ts       # registerRolloverWorkers()
│   │   ├── timezone-check.ts
│   │   └── batch-processor.ts
│   └── email/
│       ├── index.ts       # registerEmailWorkers()
│       └── send-email.ts
└── index.ts               # 启动时调用registerAllWorkers()

Timezone-Aware Scheduling

时区感知型调度

For jobs that need to run at specific local times (like midnight rollover):
typescript
import { toZonedTime } from 'date-fns-tz';

// Check if it's midnight in a timezone
const zonedNow = toZonedTime(new Date(), userTimezone);
const hour = zonedNow.getHours();
const minute = zonedNow.getMinutes();

// 10-minute window for reliability
const isMidnightWindow = hour === 0 && minute <= 10;
对于需要在特定本地时间执行的任务(如午夜轮转):
typescript
import { toZonedTime } from 'date-fns-tz';

// Check if it's midnight in a timezone
const zonedNow = toZonedTime(new Date(), userTimezone);
const hour = zonedNow.getHours();
const minute = zonedNow.getMinutes();

// 10-minute window for reliability
const isMidnightWindow = hour === 0 && minute <= 10;

Idempotency

幂等性处理

Use a log table to prevent duplicate processing:
typescript
// Check if already processed
const existing = await db.query.logs.findFirst({
  where: and(
    eq(logs.timezone, timezone),
    eq(logs.date, targetDate)
  ),
});

if (existing) return; // Already processed

// Process and log
await processJobs();
await db.insert(logs).values({ timezone, date: targetDate, status: 'completed' });
使用日志表防止重复处理:
typescript
// Check if already processed
const existing = await db.query.logs.findFirst({
  where: and(
    eq(logs.timezone, timezone),
    eq(logs.date, targetDate)
  ),
});

if (existing) return; // Already processed

// Process and log
await processJobs();
await db.insert(logs).values({ timezone, date: targetDate, status: 'completed' });