convex-actions-scheduling
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseConvex Actions & Scheduling Guide
Convex 动作与调度指南
Overview
概述
Convex provides powerful tools for handling asynchronous work, external API calls, and scheduled tasks. This skill covers actions (non-deterministic operations), the scheduler for background jobs, cron jobs for recurring tasks, and orchestration patterns for complex workflows.
Convex 提供了强大的工具来处理异步工作、外部API调用和定时任务。本指南涵盖动作(非确定性操作)、用于后台任务的调度器、用于重复任务的Cron任务,以及复杂工作流的编排模式。
TypeScript: NEVER Use any
Type
anyTypeScript:禁止使用any
类型
anyCRITICAL RULE: This codebase has enabled. Using will cause build failures.
@typescript-eslint/no-explicit-anyany关键规则: 此代码库启用了规则。使用类型会导致构建失败。
@typescript-eslint/no-explicit-anyanyWhen to Use This Skill
适用场景
Use this skill when:
- Calling external APIs (fetch, third-party SDKs)
- Implementing background job processing
- Scheduling delayed or recurring tasks
- Creating cron jobs for periodic work
- Building multi-step workflows
- Orchestrating complex operations across functions
在以下场景中使用本指南:
- 调用外部API(fetch、第三方SDK)
- 实现后台任务处理
- 调度延迟或重复执行的任务
- 创建周期性执行的Cron任务
- 构建多步骤工作流
- 跨函数编排复杂操作
Actions: Non-Deterministic Operations
动作:非确定性操作
What Actions Can Do
动作的适用场景
Actions are for work that:
- Calls external APIs (, third-party SDKs)
fetch - Uses Node.js modules (crypto, fs, etc.)
- Performs non-deterministic operations
- Needs to orchestrate multiple queries/mutations
动作用于处理以下工作:
- 调用外部API(、第三方SDK)
fetch - 使用Node.js模块(crypto、fs等)
- 执行非确定性操作
- 需要编排多个查询/变更操作
What Actions Cannot Do
动作的限制
CRITICAL: Actions have NO direct database access!
typescript
// ❌ WRONG: Actions cannot access ctx.db
export const processData = action({
args: { id: v.id("items") },
returns: v.null(),
handler: async (ctx, args) => {
const item = await ctx.db.get(args.id); // ❌ ERROR! No ctx.db in actions
return null;
},
});
// ✅ CORRECT: Use ctx.runQuery and ctx.runMutation
export const processData = action({
args: { id: v.id("items") },
returns: v.null(),
handler: async (ctx, args) => {
// Read via query
const item = await ctx.runQuery(internal.items.getById, { id: args.id });
// Call external API
const result = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(item),
});
// Write via mutation
await ctx.runMutation(internal.items.updateResult, {
id: args.id,
result: await result.json(),
});
return null;
},
});重点: 动作无法直接访问数据库!
typescript
// ❌ 错误:动作无法访问ctx.db
export const processData = action({
args: { id: v.id("items") },
returns: v.null(),
handler: async (ctx, args) => {
const item = await ctx.db.get(args.id); // ❌ 错误!动作中没有ctx.db
return null;
},
});
// ✅ 正确:使用ctx.runQuery和ctx.runMutation
export const processData = action({
args: { id: v.id("items") },
returns: v.null(),
handler: async (ctx, args) => {
// 通过查询读取数据
const item = await ctx.runQuery(internal.items.getById, { id: args.id });
// 调用外部API
const result = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(item),
});
// 通过变更写入数据
await ctx.runMutation(internal.items.updateResult, {
id: args.id,
result: await result.json(),
});
return null;
},
});Node.js Runtime for Actions
动作的Node.js运行时
Add at the top of files using Node.js modules:
"use node";typescript
// convex/pdf.ts
"use node";
import { internalAction } from "./_generated/server";
import { v } from "convex/values";
import pdf from "pdf-parse";
export const extractText = internalAction({
args: { pdfData: v.bytes() },
returns: v.string(),
handler: async (ctx, args) => {
const buffer = Buffer.from(args.pdfData);
const data = await pdf(buffer);
return data.text;
},
});在使用Node.js模块的文件顶部添加:
"use node";typescript
// convex/pdf.ts
"use node";
import { internalAction } from "./_generated/server";
import { v } from "convex/values";
import pdf from "pdf-parse";
export const extractText = internalAction({
args: { pdfData: v.bytes() },
returns: v.string(),
handler: async (ctx, args) => {
const buffer = Buffer.from(args.pdfData);
const data = await pdf(buffer);
return data.text;
},
});Action Patterns
动作模式
Pattern 1: External API Call
typescript
export const sendEmail = internalAction({
args: {
to: v.string(),
subject: v.string(),
body: v.string(),
},
returns: v.object({
success: v.boolean(),
messageId: v.optional(v.string()),
}),
handler: async (ctx, args) => {
const response = await fetch("https://api.sendgrid.com/v3/mail/send", {
method: "POST",
headers: {
Authorization: `Bearer ${process.env.SENDGRID_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
personalizations: [{ to: [{ email: args.to }] }],
from: { email: "noreply@example.com" },
subject: args.subject,
content: [{ type: "text/plain", value: args.body }],
}),
});
if (response.ok) {
const data = await response.json();
return { success: true, messageId: data.id };
}
return { success: false };
},
});Pattern 2: Multi-Step Workflow
typescript
export const processOrder = internalAction({
args: { orderId: v.id("orders") },
returns: v.null(),
handler: async (ctx, args) => {
// Step 1: Get order details
const order = await ctx.runQuery(internal.orders.getById, {
orderId: args.orderId,
});
if (!order) throw new Error("Order not found");
try {
// Step 2: Charge payment (external API)
const paymentResult = await fetch("https://api.stripe.com/v1/charges", {
method: "POST",
headers: {
Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
"Content-Type": "application/x-www-form-urlencoded",
},
body: new URLSearchParams({
amount: String(order.total),
currency: "usd",
source: order.paymentMethodId,
}),
});
if (!paymentResult.ok) {
throw new Error("Payment failed");
}
const paymentData = await paymentResult.json();
// Step 3: Update order status
await ctx.runMutation(internal.orders.markPaid, {
orderId: args.orderId,
chargeId: paymentData.id,
});
// Step 4: Schedule fulfillment
await ctx.scheduler.runAfter(0, internal.fulfillment.processOrder, {
orderId: args.orderId,
});
} catch (error) {
await ctx.runMutation(internal.orders.markFailed, {
orderId: args.orderId,
error: String(error),
});
}
return null;
},
});模式1:外部API调用
typescript
export const sendEmail = internalAction({
args: {
to: v.string(),
subject: v.string(),
body: v.string(),
},
returns: v.object({
success: v.boolean(),
messageId: v.optional(v.string()),
}),
handler: async (ctx, args) => {
const response = await fetch("https://api.sendgrid.com/v3/mail/send", {
method: "POST",
headers: {
Authorization: `Bearer ${process.env.SENDGRID_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
personalizations: [{ to: [{ email: args.to }] }],
from: { email: "noreply@example.com" },
subject: args.subject,
content: [{ type: "text/plain", value: args.body }],
}),
});
if (response.ok) {
const data = await response.json();
return { success: true, messageId: data.id };
}
return { success: false };
},
});模式2:多步骤工作流
typescript
export const processOrder = internalAction({
args: { orderId: v.id("orders") },
returns: v.null(),
handler: async (ctx, args) => {
// 步骤1:获取订单详情
const order = await ctx.runQuery(internal.orders.getById, {
orderId: args.orderId,
});
if (!order) throw new Error("订单未找到");
try {
// 步骤2:处理支付(外部API)
const paymentResult = await fetch("https://api.stripe.com/v1/charges", {
method: "POST",
headers: {
Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
"Content-Type": "application/x-www-form-urlencoded",
},
body: new URLSearchParams({
amount: String(order.total),
currency: "usd",
source: order.paymentMethodId,
}),
});
if (!paymentResult.ok) {
throw new Error("支付失败");
}
const paymentData = await paymentResult.json();
// 步骤3:更新订单状态
await ctx.runMutation(internal.orders.markPaid, {
orderId: args.orderId,
chargeId: paymentData.id,
});
// 步骤4:调度订单履约
await ctx.scheduler.runAfter(0, internal.fulfillment.processOrder, {
orderId: args.orderId,
});
} catch (error) {
await ctx.runMutation(internal.orders.markFailed, {
orderId: args.orderId,
error: String(error),
});
}
return null;
},
});Scheduling
任务调度
Fire-and-Forget (Immediate)
即发即弃(立即执行)
Schedule work to run immediately but asynchronously:
typescript
export const submitJob = mutation({
args: { data: v.string() },
returns: v.id("jobs"),
handler: async (ctx, args) => {
const jobId = await ctx.db.insert("jobs", {
data: args.data,
status: "pending",
});
// Schedule immediately (0ms delay)
await ctx.scheduler.runAfter(0, internal.jobs.process, { jobId });
return jobId;
},
});调度任务立即异步执行:
typescript
export const submitJob = mutation({
args: { data: v.string() },
returns: v.id("jobs"),
handler: async (ctx, args) => {
const jobId = await ctx.db.insert("jobs", {
data: args.data,
status: "pending",
});
// 立即调度(延迟0毫秒)
await ctx.scheduler.runAfter(0, internal.jobs.process, { jobId });
return jobId;
},
});Delayed Execution
延迟执行
Schedule work to run after a delay:
typescript
// Self-destructing message
export const sendExpiringMessage = mutation({
args: { body: v.string(), expiresInMs: v.number() },
returns: v.id("messages"),
handler: async (ctx, args) => {
const id = await ctx.db.insert("messages", { body: args.body });
// Delete after specified time
await ctx.scheduler.runAfter(args.expiresInMs, internal.messages.delete, {
id,
});
return id;
},
});
export const delete_ = internalMutation({
args: { id: v.id("messages") },
returns: v.null(),
handler: async (ctx, args) => {
await ctx.db.delete(args.id);
return null;
},
});调度任务在指定延迟后执行:
typescript
// 自动销毁消息
export const sendExpiringMessage = mutation({
args: { body: v.string(), expiresInMs: v.number() },
returns: v.id("messages"),
handler: async (ctx, args) => {
const id = await ctx.db.insert("messages", { body: args.body });
// 指定时间后删除
await ctx.scheduler.runAfter(args.expiresInMs, internal.messages.delete, {
id,
});
return id;
},
});
export const delete_ = internalMutation({
args: { id: v.id("messages") },
returns: v.null(),
handler: async (ctx, args) => {
await ctx.db.delete(args.id);
return null;
},
});Scheduled at Specific Time
指定时间执行
typescript
export const scheduleReminder = mutation({
args: {
userId: v.id("users"),
message: v.string(),
sendAt: v.number(), // Unix timestamp
},
returns: v.id("scheduledFunctions"),
handler: async (ctx, args) => {
// Schedule at specific timestamp
return await ctx.scheduler.runAt(
args.sendAt,
internal.notifications.sendReminder,
{
userId: args.userId,
message: args.message,
}
);
},
});typescript
export const scheduleReminder = mutation({
args: {
userId: v.id("users"),
message: v.string(),
sendAt: v.number(), // Unix时间戳
},
returns: v.id("scheduledFunctions"),
handler: async (ctx, args) => {
// 在指定时间戳调度
return await ctx.scheduler.runAt(
args.sendAt,
internal.notifications.sendReminder,
{
userId: args.userId,
message: args.message,
}
);
},
});Cancel Scheduled Functions
取消已调度任务
typescript
export const cancelReminder = mutation({
args: { scheduledId: v.id("_scheduled_functions") },
returns: v.null(),
handler: async (ctx, args) => {
await ctx.scheduler.cancel(args.scheduledId);
return null;
},
});typescript
export const cancelReminder = mutation({
args: { scheduledId: v.id("_scheduled_functions") },
returns: v.null(),
handler: async (ctx, args) => {
await ctx.scheduler.cancel(args.scheduledId);
return null;
},
});Scheduling from Actions
在动作中调度任务
Actions can also schedule work:
typescript
export const processWithRetry = internalAction({
args: { jobId: v.id("jobs"), attempt: v.number() },
returns: v.null(),
handler: async (ctx, args) => {
try {
// Try to process
const job = await ctx.runQuery(internal.jobs.getById, {
jobId: args.jobId,
});
const result = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(job),
});
if (!result.ok) throw new Error("API error");
await ctx.runMutation(internal.jobs.markComplete, {
jobId: args.jobId,
result: await result.json(),
});
} catch (error) {
if (args.attempt < 3) {
// Retry with exponential backoff
const delay = Math.pow(2, args.attempt) * 1000;
await ctx.scheduler.runAfter(delay, internal.jobs.processWithRetry, {
jobId: args.jobId,
attempt: args.attempt + 1,
});
} else {
await ctx.runMutation(internal.jobs.markFailed, {
jobId: args.jobId,
error: String(error),
});
}
}
return null;
},
});动作同样可以调度任务:
typescript
export const processWithRetry = internalAction({
args: { jobId: v.id("jobs"), attempt: v.number() },
returns: v.null(),
handler: async (ctx, args) => {
try {
// 尝试处理任务
const job = await ctx.runQuery(internal.jobs.getById, {
jobId: args.jobId,
});
const result = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(job),
});
if (!result.ok) throw new Error("API错误");
await ctx.runMutation(internal.jobs.markComplete, {
jobId: args.jobId,
result: await result.json(),
});
} catch (error) {
if (args.attempt < 3) {
// 指数退避重试
const delay = Math.pow(2, args.attempt) * 1000;
await ctx.scheduler.runAfter(delay, internal.jobs.processWithRetry, {
jobId: args.jobId,
attempt: args.attempt + 1,
});
} else {
await ctx.runMutation(internal.jobs.markFailed, {
jobId: args.jobId,
error: String(error),
});
}
}
return null;
},
});Cron Jobs
Cron任务
Creating Cron Jobs
创建Cron任务
Cron jobs must be defined in :
convex/crons.tstypescript
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// Run every hour
crons.interval(
"cleanup stale jobs",
{ hours: 1 },
internal.jobs.cleanupStale,
{}
);
// Run at specific cron schedule
crons.cron(
"daily report",
"0 9 * * *", // 9 AM every day
internal.reports.generateDaily,
{}
);
// Run every 5 minutes
crons.interval(
"sync external data",
{ minutes: 5 },
internal.sync.pullExternalData,
{}
);
// Run weekly on Sundays at midnight
crons.cron(
"weekly cleanup",
"0 0 * * 0",
internal.maintenance.weeklyCleanup,
{}
);
export default crons;Cron任务必须定义在文件中:
convex/crons.tstypescript
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 每小时执行一次
crons.interval(
"cleanup stale jobs",
{ hours: 1 },
internal.jobs.cleanupStale,
{}
);
// 按指定Cron表达式执行
crons.cron(
"daily report",
"0 9 * * *", // 每天上午9点
internal.reports.generateDaily,
{}
);
// 每5分钟执行一次
crons.interval(
"sync external data",
{ minutes: 5 },
internal.sync.pullExternalData,
{}
);
// 每周日午夜执行
crons.cron(
"weekly cleanup",
"0 0 * * 0",
internal.maintenance.weeklyCleanup,
{}
);
export default crons;Interval Options
时间间隔配置
typescript
// Various interval configurations
crons.interval("every-minute", { minutes: 1 }, handler, {});
crons.interval("every-hour", { hours: 1 }, handler, {});
crons.interval("every-day", { hours: 24 }, handler, {});
crons.interval("every-30-seconds", { seconds: 30 }, handler, {});typescript
// 各种时间间隔配置
crons.interval("every-minute", { minutes: 1 }, handler, {});
crons.interval("every-hour", { hours: 1 }, handler, {});
crons.interval("every-day", { hours: 24 }, handler, {});
crons.interval("every-30-seconds", { seconds: 30 }, handler, {});Cron Schedule Syntax
Cron表达式语法
Standard cron format:
minute hour day month weekdaytypescript
// Examples
"* * * * *"; // Every minute
"0 * * * *"; // Every hour
"0 0 * * *"; // Every day at midnight
"0 9 * * *"; // Every day at 9 AM
"0 0 * * 0"; // Every Sunday at midnight
"0 0 1 * *"; // First day of every month
"*/5 * * * *"; // Every 5 minutes
"0 */2 * * *"; // Every 2 hours标准Cron格式:
分钟 小时 日期 月份 星期typescript
// 示例
"* * * * *"; // 每分钟
"0 * * * *"; // 每小时
"0 0 * * *"; // 每天午夜
"0 9 * * *"; // 每天上午9点
"0 0 * * 0"; // 每周日午夜
"0 0 1 * *"; // 每月第一天
"*/5 * * * *"; // 每5分钟
"0 */2 * * *"; // 每2小时Cron Job Implementation
Cron任务实现
typescript
// convex/jobs.ts
export const cleanupStale = internalMutation({
args: {},
returns: v.number(),
handler: async (ctx) => {
const oneHourAgo = Date.now() - 60 * 60 * 1000;
const staleJobs = await ctx.db
.query("jobs")
.withIndex("by_status_and_createdAt", (q) =>
q.eq("status", "pending").lt("createdAt", oneHourAgo)
)
.collect();
for (const job of staleJobs) {
await ctx.db.patch(job._id, { status: "stale" });
}
return staleJobs.length;
},
});typescript
// convex/jobs.ts
export const cleanupStale = internalMutation({
args: {},
returns: v.number(),
handler: async (ctx) => {
const oneHourAgo = Date.now() - 60 * 60 * 1000;
const staleJobs = await ctx.db
.query("jobs")
.withIndex("by_status_and_createdAt", (q) =>
q.eq("status", "pending").lt("createdAt", oneHourAgo)
)
.collect();
for (const job of staleJobs) {
await ctx.db.patch(job._id, { status: "stale" });
}
return staleJobs.length;
},
});Orchestration Patterns
编排模式
Pattern 1: Saga Pattern (Compensating Transactions)
模式1:Saga模式(补偿事务)
For operations that span multiple services with rollback:
typescript
export const createSubscription = internalAction({
args: {
userId: v.id("users"),
planId: v.string(),
},
returns: v.union(v.id("subscriptions"), v.null()),
handler: async (ctx, args) => {
const user = await ctx.runQuery(internal.users.getById, {
userId: args.userId,
});
if (!user) throw new Error("User not found");
// Step 1: Create Stripe subscription
let stripeSubscriptionId: string | null = null;
try {
const stripeResponse = await fetch(
"https://api.stripe.com/v1/subscriptions",
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
"Content-Type": "application/x-www-form-urlencoded",
},
body: new URLSearchParams({
customer: user.stripeCustomerId,
items: [{ price: args.planId }]
.map((i, idx) => `items[${idx}][price]=${i.price}`)
.join("&"),
}),
}
);
if (!stripeResponse.ok) {
throw new Error("Stripe subscription failed");
}
const stripeData = await stripeResponse.json();
stripeSubscriptionId = stripeData.id;
} catch (error) {
// No cleanup needed - nothing created yet
return null;
}
// Step 2: Create local subscription record
try {
const subscriptionId = await ctx.runMutation(
internal.subscriptions.create,
{
userId: args.userId,
planId: args.planId,
stripeSubscriptionId,
}
);
return subscriptionId;
} catch (error) {
// Rollback: Cancel Stripe subscription
await fetch(
`https://api.stripe.com/v1/subscriptions/${stripeSubscriptionId}`,
{
method: "DELETE",
headers: {
Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
},
}
);
return null;
}
},
});适用于跨多个服务且需要回滚的操作:
typescript
export const createSubscription = internalAction({
args: {
userId: v.id("users"),
planId: v.string(),
},
returns: v.union(v.id("subscriptions"), v.null()),
handler: async (ctx, args) => {
const user = await ctx.runQuery(internal.users.getById, {
userId: args.userId,
});
if (!user) throw new Error("用户未找到");
// 步骤1:创建Stripe订阅
let stripeSubscriptionId: string | null = null;
try {
const stripeResponse = await fetch(
"https://api.stripe.com/v1/subscriptions",
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
"Content-Type": "application/x-www-form-urlencoded",
},
body: new URLSearchParams({
customer: user.stripeCustomerId,
items: [{ price: args.planId }]
.map((i, idx) => `items[${idx}][price]=${i.price}`)
.join("&"),
}),
}
);
if (!stripeResponse.ok) {
throw new Error("Stripe订阅创建失败");
}
const stripeData = await stripeResponse.json();
stripeSubscriptionId = stripeData.id;
} catch (error) {
// 无需清理:尚未创建任何资源
return null;
}
// 步骤2:创建本地订阅记录
try {
const subscriptionId = await ctx.runMutation(
internal.subscriptions.create,
{
userId: args.userId,
planId: args.planId,
stripeSubscriptionId,
}
);
return subscriptionId;
} catch (error) {
// 回滚:取消Stripe订阅
await fetch(
`https://api.stripe.com/v1/subscriptions/${stripeSubscriptionId}`,
{
method: "DELETE",
headers: {
Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
},
}
);
return null;
}
},
});Pattern 2: Fan-Out / Fan-In
模式2:扇出/扇入
Process multiple items in parallel, then aggregate:
typescript
// Fan-out: Schedule processing for each item
export const processAll = mutation({
args: { itemIds: v.array(v.id("items")) },
returns: v.id("batchJobs"),
handler: async (ctx, args) => {
const batchId = await ctx.db.insert("batchJobs", {
totalItems: args.itemIds.length,
completedItems: 0,
status: "processing",
});
// Fan-out: Schedule each item
for (const itemId of args.itemIds) {
await ctx.scheduler.runAfter(0, internal.items.processOne, {
itemId,
batchId,
});
}
return batchId;
},
});
// Process single item
export const processOne = internalAction({
args: { itemId: v.id("items"), batchId: v.id("batchJobs") },
returns: v.null(),
handler: async (ctx, args) => {
const item = await ctx.runQuery(internal.items.getById, {
itemId: args.itemId,
});
// Process item (external API, etc.)
const result = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(item),
});
// Update item with result
await ctx.runMutation(internal.items.updateResult, {
itemId: args.itemId,
result: await result.json(),
});
// Fan-in: Update batch progress
await ctx.runMutation(internal.batches.incrementCompleted, {
batchId: args.batchId,
});
return null;
},
});
// Fan-in: Track completion
export const incrementCompleted = internalMutation({
args: { batchId: v.id("batchJobs") },
returns: v.null(),
handler: async (ctx, args) => {
const batch = await ctx.db.get(args.batchId);
if (!batch) return null;
const newCompleted = batch.completedItems + 1;
const isComplete = newCompleted >= batch.totalItems;
await ctx.db.patch(args.batchId, {
completedItems: newCompleted,
status: isComplete ? "completed" : "processing",
});
if (isComplete) {
// Trigger completion handler
await ctx.scheduler.runAfter(0, internal.batches.onComplete, {
batchId: args.batchId,
});
}
return null;
},
});并行处理多个项,然后聚合结果:
typescript
// 扇出:为每个项调度处理任务
export const processAll = mutation({
args: { itemIds: v.array(v.id("items")) },
returns: v.id("batchJobs"),
handler: async (ctx, args) => {
const batchId = await ctx.db.insert("batchJobs", {
totalItems: args.itemIds.length,
completedItems: 0,
status: "processing",
});
// 扇出:为每个项调度任务
for (const itemId of args.itemIds) {
await ctx.scheduler.runAfter(0, internal.items.processOne, {
itemId,
batchId,
});
}
return batchId;
},
});
// 处理单个项
export const processOne = internalAction({
args: { itemId: v.id("items"), batchId: v.id("batchJobs") },
returns: v.null(),
handler: async (ctx, args) => {
const item = await ctx.runQuery(internal.items.getById, {
itemId: args.itemId,
});
// 处理项(外部API等)
const result = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(item),
});
// 更新项的处理结果
await ctx.runMutation(internal.items.updateResult, {
itemId: args.itemId,
result: await result.json(),
});
// 扇入:更新批处理进度
await ctx.runMutation(internal.batches.incrementCompleted, {
batchId: args.batchId,
});
return null;
},
});
// 扇入:跟踪完成状态
export const incrementCompleted = internalMutation({
args: { batchId: v.id("batchJobs") },
returns: v.null(),
handler: async (ctx, args) => {
const batch = await ctx.db.get(args.batchId);
if (!batch) return null;
const newCompleted = batch.completedItems + 1;
const isComplete = newCompleted >= batch.totalItems;
await ctx.db.patch(args.batchId, {
completedItems: newCompleted,
status: isComplete ? "completed" : "processing",
});
if (isComplete) {
// 触发完成处理逻辑
await ctx.scheduler.runAfter(0, internal.batches.onComplete, {
batchId: args.batchId,
});
}
return null;
},
});Pattern 3: Retry with Exponential Backoff and Jitter
模式3:指数退避与抖动重试
typescript
export const processWithRetry = internalAction({
args: {
jobId: v.id("jobs"),
attempt: v.number(),
maxAttempts: v.optional(v.number()),
},
returns: v.null(),
handler: async (ctx, args) => {
const maxAttempts = args.maxAttempts ?? 5;
try {
const job = await ctx.runQuery(internal.jobs.getById, {
jobId: args.jobId,
});
if (!job) return null;
const response = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(job),
});
if (!response.ok) {
throw new Error(`API error: ${response.status}`);
}
await ctx.runMutation(internal.jobs.markComplete, {
jobId: args.jobId,
result: await response.json(),
});
} catch (error) {
if (args.attempt < maxAttempts) {
// Exponential backoff with jitter
const baseDelay = Math.pow(2, args.attempt) * 1000;
const jitter = Math.random() * 1000;
const delay = baseDelay + jitter;
await ctx.scheduler.runAfter(delay, internal.jobs.processWithRetry, {
jobId: args.jobId,
attempt: args.attempt + 1,
maxAttempts,
});
await ctx.runMutation(internal.jobs.updateAttempt, {
jobId: args.jobId,
attempt: args.attempt + 1,
nextRetryAt: Date.now() + delay,
});
} else {
await ctx.runMutation(internal.jobs.markFailed, {
jobId: args.jobId,
error: String(error),
finalAttempt: args.attempt,
});
}
}
return null;
},
});typescript
export const processWithRetry = internalAction({
args: {
jobId: v.id("jobs"),
attempt: v.number(),
maxAttempts: v.optional(v.number()),
},
returns: v.null(),
handler: async (ctx, args) => {
const maxAttempts = args.maxAttempts ?? 5;
try {
const job = await ctx.runQuery(internal.jobs.getById, {
jobId: args.jobId,
});
if (!job) return null;
const response = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(job),
});
if (!response.ok) {
throw new Error(`API错误: ${response.status}`);
}
await ctx.runMutation(internal.jobs.markComplete, {
jobId: args.jobId,
result: await response.json(),
});
} catch (error) {
if (args.attempt < maxAttempts) {
// 指数退避加抖动
const baseDelay = Math.pow(2, args.attempt) * 1000;
const jitter = Math.random() * 1000;
const delay = baseDelay + jitter;
await ctx.scheduler.runAfter(delay, internal.jobs.processWithRetry, {
jobId: args.jobId,
attempt: args.attempt + 1,
maxAttempts,
});
await ctx.runMutation(internal.jobs.updateAttempt, {
jobId: args.jobId,
attempt: args.attempt + 1,
nextRetryAt: Date.now() + delay,
});
} else {
await ctx.runMutation(internal.jobs.markFailed, {
jobId: args.jobId,
error: String(error),
finalAttempt: args.attempt,
});
}
}
return null;
},
});Pattern 4: Idempotency Keys
模式4:幂等键
Prevent duplicate processing:
typescript
export const processPayment = mutation({
args: {
idempotencyKey: v.string(),
amount: v.number(),
customerId: v.string(),
},
returns: v.union(v.id("payments"), v.null()),
handler: async (ctx, args) => {
// Check if already processed
const existing = await ctx.db
.query("payments")
.withIndex("by_idempotency_key", (q) =>
q.eq("idempotencyKey", args.idempotencyKey)
)
.unique();
if (existing) return existing._id; // Already done, return existing
// Process and record
const paymentId = await ctx.db.insert("payments", {
idempotencyKey: args.idempotencyKey,
amount: args.amount,
customerId: args.customerId,
status: "pending",
});
await ctx.scheduler.runAfter(0, internal.payments.charge, { paymentId });
return paymentId;
},
});防止重复处理:
typescript
export const processPayment = mutation({
args: {
idempotencyKey: v.string(),
amount: v.number(),
customerId: v.string(),
},
returns: v.union(v.id("payments"), v.null()),
handler: async (ctx, args) => {
// 检查是否已处理过
const existing = await ctx.db
.query("payments")
.withIndex("by_idempotency_key", (q) =>
q.eq("idempotencyKey", args.idempotencyKey)
)
.unique();
if (existing) return existing._id; // 已处理,返回现有记录
// 处理并记录
const paymentId = await ctx.db.insert("payments", {
idempotencyKey: args.idempotencyKey,
amount: args.amount,
customerId: args.customerId,
status: "pending",
});
await ctx.scheduler.runAfter(0, internal.payments.charge, { paymentId });
return paymentId;
},
});Common Pitfalls
常见陷阱
Pitfall 1: Actions Without Error Handling
陷阱1:动作未处理错误
❌ WRONG:
typescript
export const sendNotification = internalAction({
args: { userId: v.id("users") },
returns: v.null(),
handler: async (ctx, args) => {
const user = await ctx.runQuery(internal.users.getById, {
userId: args.userId,
});
// If this fails, we lose track of the failure
await fetch("https://api.pushover.net/send", {
method: "POST",
body: JSON.stringify({ user: user.pushToken, message: "Hello" }),
});
return null;
},
});✅ CORRECT:
typescript
export const sendNotification = internalAction({
args: { userId: v.id("users"), notificationId: v.id("notifications") },
returns: v.null(),
handler: async (ctx, args) => {
const user = await ctx.runQuery(internal.users.getById, {
userId: args.userId,
});
try {
const response = await fetch("https://api.pushover.net/send", {
method: "POST",
body: JSON.stringify({ user: user.pushToken, message: "Hello" }),
});
if (!response.ok) {
throw new Error(`Push API error: ${response.status}`);
}
await ctx.runMutation(internal.notifications.markSent, {
notificationId: args.notificationId,
});
} catch (error) {
await ctx.runMutation(internal.notifications.markFailed, {
notificationId: args.notificationId,
error: String(error),
});
}
return null;
},
});❌ 错误示例:
typescript
export const sendNotification = internalAction({
args: { userId: v.id("users") },
returns: v.null(),
handler: async (ctx, args) => {
const user = await ctx.runQuery(internal.users.getById, {
userId: args.userId,
});
// 如果调用失败,无法追踪失败状态
await fetch("https://api.pushover.net/send", {
method: "POST",
body: JSON.stringify({ user: user.pushToken, message: "Hello" }),
});
return null;
},
});✅ 正确示例:
typescript
export const sendNotification = internalAction({
args: { userId: v.id("users"), notificationId: v.id("notifications") },
returns: v.null(),
handler: async (ctx, args) => {
const user = await ctx.runQuery(internal.users.getById, {
userId: args.userId,
});
try {
const response = await fetch("https://api.pushover.net/send", {
method: "POST",
body: JSON.stringify({ user: user.pushToken, message: "Hello" }),
});
if (!response.ok) {
throw new Error(`推送API错误: ${response.status}`);
}
await ctx.runMutation(internal.notifications.markSent, {
notificationId: args.notificationId,
});
} catch (error) {
await ctx.runMutation(internal.notifications.markFailed, {
notificationId: args.notificationId,
error: String(error),
});
}
return null;
},
});Pitfall 2: Not Using Internal Functions
陷阱2:未使用内部函数
❌ WRONG:
typescript
// Public action callable by anyone!
export const deleteAllUserData = action({
args: { userId: v.id("users") },
returns: v.null(),
handler: async (ctx, args) => {
// Dangerous! No auth check, publicly accessible
await ctx.runMutation(api.users.delete, { userId: args.userId });
return null;
},
});✅ CORRECT:
typescript
// Internal action - only callable from other functions
export const deleteAllUserData = internalAction({
args: { userId: v.id("users") },
returns: v.null(),
handler: async (ctx, args) => {
// Safe - only called from authenticated internal code
await ctx.runMutation(internal.users.delete, { userId: args.userId });
return null;
},
});
// Public mutation with auth check schedules the internal action
export const requestAccountDeletion = mutation({
args: {},
returns: v.null(),
handler: async (ctx) => {
const identity = await ctx.auth.getUserIdentity();
if (!identity) throw new Error("Unauthorized");
const user = await ctx.db
.query("users")
.withIndex("by_tokenIdentifier", (q) =>
q.eq("tokenIdentifier", identity.tokenIdentifier)
)
.unique();
if (!user) throw new Error("User not found");
await ctx.scheduler.runAfter(0, internal.users.deleteAllUserData, {
userId: user._id,
});
return null;
},
});❌ 错误示例:
typescript
// 公开可调用的动作!存在安全风险
export const deleteAllUserData = action({
args: { userId: v.id("users") },
returns: v.null(),
handler: async (ctx, args) => {
// 危险!无权限校验,公开可访问
await ctx.runMutation(api.users.delete, { userId: args.userId });
return null;
},
});✅ 正确示例:
typescript
// 内部动作 - 仅可被其他内部函数调用
export const deleteAllUserData = internalAction({
args: { userId: v.id("users") },
returns: v.null(),
handler: async (ctx, args) => {
// 安全 - 仅可通过已认证的内部代码调用
await ctx.runMutation(internal.users.delete, { userId: args.userId });
return null;
},
});
// 带权限校验的公开变更操作,调度内部动作
export const requestAccountDeletion = mutation({
args: {},
returns: v.null(),
handler: async (ctx) => {
const identity = await ctx.auth.getUserIdentity();
if (!identity) throw new Error("未授权");
const user = await ctx.db
.query("users")
.withIndex("by_tokenIdentifier", (q) =>
q.eq("tokenIdentifier", identity.tokenIdentifier)
)
.unique();
if (!user) throw new Error("用户未找到");
await ctx.scheduler.runAfter(0, internal.users.deleteAllUserData, {
userId: user._id,
});
return null;
},
});Pitfall 3: Thundering Herd
陷阱3:惊群效应
❌ WRONG:
typescript
// All retries happen at the same time
const retryDelay = 5000;
await ctx.scheduler.runAfter(retryDelay, internal.jobs.retry, { jobId });✅ CORRECT:
typescript
// Add jitter to spread out retries
const baseDelay = 5000;
const jitter = Math.random() * 1000;
await ctx.scheduler.runAfter(baseDelay + jitter, internal.jobs.retry, {
jobId,
});❌ 错误示例:
typescript
// 所有重试同时执行
const retryDelay = 5000;
await ctx.scheduler.runAfter(retryDelay, internal.jobs.retry, { jobId });✅ 正确示例:
typescript
// 添加抖动分散重试时间
const baseDelay = 5000;
const jitter = Math.random() * 1000;
await ctx.scheduler.runAfter(baseDelay + jitter, internal.jobs.retry, {
jobId,
});Quick Reference
快速参考
Scheduling Methods
调度方法
typescript
// Immediate (0ms delay)
await ctx.scheduler.runAfter(0, internal.jobs.process, { jobId });
// Delayed (milliseconds)
await ctx.scheduler.runAfter(5000, internal.messages.delete, { id });
// At specific timestamp
await ctx.scheduler.runAt(timestamp, internal.reports.send, {});
// Cancel scheduled function
await ctx.scheduler.cancel(scheduledFunctionId);typescript
// 立即执行(延迟0毫秒)
await ctx.scheduler.runAfter(0, internal.jobs.process, { jobId });
// 延迟执行(毫秒)
await ctx.scheduler.runAfter(5000, internal.messages.delete, { id });
// 指定时间戳执行
await ctx.scheduler.runAt(timestamp, internal.reports.send, {});
// 取消已调度任务
await ctx.scheduler.cancel(scheduledFunctionId);Cron Syntax
Cron表达式示例
| Expression | Description |
|---|---|
| Every minute |
| Every hour |
| Every day at midnight |
| 9 AM weekdays |
| Every 15 minutes |
| First of month |
| 表达式 | 描述 |
|---|---|
| 每分钟执行一次 |
| 每小时执行一次 |
| 每天午夜执行一次 |
| 工作日上午9点执行 |
| 每15分钟执行一次 |
| 每月第一天执行一次 |
Action Context Methods
动作上下文方法
typescript
// Read data
await ctx.runQuery(internal.table.query, args);
// Write data
await ctx.runMutation(internal.table.mutation, args);
// Call another action
await ctx.runAction(internal.external.action, args);
// Schedule work
await ctx.scheduler.runAfter(delay, internal.jobs.process, args);
await ctx.scheduler.runAt(timestamp, internal.jobs.process, args);typescript
// 读取数据
await ctx.runQuery(internal.table.query, args);
// 写入数据
await ctx.runMutation(internal.table.mutation, args);
// 调用其他动作
await ctx.runAction(internal.external.action, args);
// 调度任务
await ctx.scheduler.runAfter(delay, internal.jobs.process, args);
await ctx.scheduler.runAt(timestamp, internal.jobs.process, args);