convex-actions-scheduling

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Convex Actions & Scheduling Guide

Convex 动作与调度指南

Overview

概述

Convex provides powerful tools for handling asynchronous work, external API calls, and scheduled tasks. This skill covers actions (non-deterministic operations), the scheduler for background jobs, cron jobs for recurring tasks, and orchestration patterns for complex workflows.
Convex 提供了强大的工具来处理异步工作、外部API调用和定时任务。本指南涵盖动作(非确定性操作)、用于后台任务的调度器、用于重复任务的Cron任务,以及复杂工作流的编排模式。

TypeScript: NEVER Use
any
Type

TypeScript:禁止使用
any
类型

CRITICAL RULE: This codebase has
@typescript-eslint/no-explicit-any
enabled. Using
any
will cause build failures.
关键规则: 此代码库启用了
@typescript-eslint/no-explicit-any
规则。使用
any
类型会导致构建失败。

When to Use This Skill

适用场景

Use this skill when:
  • Calling external APIs (fetch, third-party SDKs)
  • Implementing background job processing
  • Scheduling delayed or recurring tasks
  • Creating cron jobs for periodic work
  • Building multi-step workflows
  • Orchestrating complex operations across functions
在以下场景中使用本指南:
  • 调用外部API(fetch、第三方SDK)
  • 实现后台任务处理
  • 调度延迟或重复执行的任务
  • 创建周期性执行的Cron任务
  • 构建多步骤工作流
  • 跨函数编排复杂操作

Actions: Non-Deterministic Operations

动作:非确定性操作

What Actions Can Do

动作的适用场景

Actions are for work that:
  • Calls external APIs (
    fetch
    , third-party SDKs)
  • Uses Node.js modules (crypto, fs, etc.)
  • Performs non-deterministic operations
  • Needs to orchestrate multiple queries/mutations
动作用于处理以下工作:
  • 调用外部API(
    fetch
    、第三方SDK)
  • 使用Node.js模块(crypto、fs等)
  • 执行非确定性操作
  • 需要编排多个查询/变更操作

What Actions Cannot Do

动作的限制

CRITICAL: Actions have NO direct database access!
typescript
// ❌ WRONG: Actions cannot access ctx.db
export const processData = action({
  args: { id: v.id("items") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const item = await ctx.db.get(args.id); // ❌ ERROR! No ctx.db in actions
    return null;
  },
});

// ✅ CORRECT: Use ctx.runQuery and ctx.runMutation
export const processData = action({
  args: { id: v.id("items") },
  returns: v.null(),
  handler: async (ctx, args) => {
    // Read via query
    const item = await ctx.runQuery(internal.items.getById, { id: args.id });

    // Call external API
    const result = await fetch("https://api.example.com/process", {
      method: "POST",
      body: JSON.stringify(item),
    });

    // Write via mutation
    await ctx.runMutation(internal.items.updateResult, {
      id: args.id,
      result: await result.json(),
    });

    return null;
  },
});
重点: 动作无法直接访问数据库!
typescript
// ❌ 错误:动作无法访问ctx.db
export const processData = action({
  args: { id: v.id("items") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const item = await ctx.db.get(args.id); // ❌ 错误!动作中没有ctx.db
    return null;
  },
});

// ✅ 正确:使用ctx.runQuery和ctx.runMutation
export const processData = action({
  args: { id: v.id("items") },
  returns: v.null(),
  handler: async (ctx, args) => {
    // 通过查询读取数据
    const item = await ctx.runQuery(internal.items.getById, { id: args.id });

    // 调用外部API
    const result = await fetch("https://api.example.com/process", {
      method: "POST",
      body: JSON.stringify(item),
    });

    // 通过变更写入数据
    await ctx.runMutation(internal.items.updateResult, {
      id: args.id,
      result: await result.json(),
    });

    return null;
  },
});

Node.js Runtime for Actions

动作的Node.js运行时

Add
"use node";
at the top of files using Node.js modules:
typescript
// convex/pdf.ts
"use node";

import { internalAction } from "./_generated/server";
import { v } from "convex/values";
import pdf from "pdf-parse";

export const extractText = internalAction({
  args: { pdfData: v.bytes() },
  returns: v.string(),
  handler: async (ctx, args) => {
    const buffer = Buffer.from(args.pdfData);
    const data = await pdf(buffer);
    return data.text;
  },
});
在使用Node.js模块的文件顶部添加
"use node";
typescript
// convex/pdf.ts
"use node";

import { internalAction } from "./_generated/server";
import { v } from "convex/values";
import pdf from "pdf-parse";

export const extractText = internalAction({
  args: { pdfData: v.bytes() },
  returns: v.string(),
  handler: async (ctx, args) => {
    const buffer = Buffer.from(args.pdfData);
    const data = await pdf(buffer);
    return data.text;
  },
});

Action Patterns

动作模式

Pattern 1: External API Call
typescript
export const sendEmail = internalAction({
  args: {
    to: v.string(),
    subject: v.string(),
    body: v.string(),
  },
  returns: v.object({
    success: v.boolean(),
    messageId: v.optional(v.string()),
  }),
  handler: async (ctx, args) => {
    const response = await fetch("https://api.sendgrid.com/v3/mail/send", {
      method: "POST",
      headers: {
        Authorization: `Bearer ${process.env.SENDGRID_API_KEY}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        personalizations: [{ to: [{ email: args.to }] }],
        from: { email: "noreply@example.com" },
        subject: args.subject,
        content: [{ type: "text/plain", value: args.body }],
      }),
    });

    if (response.ok) {
      const data = await response.json();
      return { success: true, messageId: data.id };
    }

    return { success: false };
  },
});
Pattern 2: Multi-Step Workflow
typescript
export const processOrder = internalAction({
  args: { orderId: v.id("orders") },
  returns: v.null(),
  handler: async (ctx, args) => {
    // Step 1: Get order details
    const order = await ctx.runQuery(internal.orders.getById, {
      orderId: args.orderId,
    });
    if (!order) throw new Error("Order not found");

    try {
      // Step 2: Charge payment (external API)
      const paymentResult = await fetch("https://api.stripe.com/v1/charges", {
        method: "POST",
        headers: {
          Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
          "Content-Type": "application/x-www-form-urlencoded",
        },
        body: new URLSearchParams({
          amount: String(order.total),
          currency: "usd",
          source: order.paymentMethodId,
        }),
      });

      if (!paymentResult.ok) {
        throw new Error("Payment failed");
      }

      const paymentData = await paymentResult.json();

      // Step 3: Update order status
      await ctx.runMutation(internal.orders.markPaid, {
        orderId: args.orderId,
        chargeId: paymentData.id,
      });

      // Step 4: Schedule fulfillment
      await ctx.scheduler.runAfter(0, internal.fulfillment.processOrder, {
        orderId: args.orderId,
      });
    } catch (error) {
      await ctx.runMutation(internal.orders.markFailed, {
        orderId: args.orderId,
        error: String(error),
      });
    }

    return null;
  },
});
模式1:外部API调用
typescript
export const sendEmail = internalAction({
  args: {
    to: v.string(),
    subject: v.string(),
    body: v.string(),
  },
  returns: v.object({
    success: v.boolean(),
    messageId: v.optional(v.string()),
  }),
  handler: async (ctx, args) => {
    const response = await fetch("https://api.sendgrid.com/v3/mail/send", {
      method: "POST",
      headers: {
        Authorization: `Bearer ${process.env.SENDGRID_API_KEY}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        personalizations: [{ to: [{ email: args.to }] }],
        from: { email: "noreply@example.com" },
        subject: args.subject,
        content: [{ type: "text/plain", value: args.body }],
      }),
    });

    if (response.ok) {
      const data = await response.json();
      return { success: true, messageId: data.id };
    }

    return { success: false };
  },
});
模式2:多步骤工作流
typescript
export const processOrder = internalAction({
  args: { orderId: v.id("orders") },
  returns: v.null(),
  handler: async (ctx, args) => {
    // 步骤1:获取订单详情
    const order = await ctx.runQuery(internal.orders.getById, {
      orderId: args.orderId,
    });
    if (!order) throw new Error("订单未找到");

    try {
      // 步骤2:处理支付(外部API)
      const paymentResult = await fetch("https://api.stripe.com/v1/charges", {
        method: "POST",
        headers: {
          Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
          "Content-Type": "application/x-www-form-urlencoded",
        },
        body: new URLSearchParams({
          amount: String(order.total),
          currency: "usd",
          source: order.paymentMethodId,
        }),
      });

      if (!paymentResult.ok) {
        throw new Error("支付失败");
      }

      const paymentData = await paymentResult.json();

      // 步骤3:更新订单状态
      await ctx.runMutation(internal.orders.markPaid, {
        orderId: args.orderId,
        chargeId: paymentData.id,
      });

      // 步骤4:调度订单履约
      await ctx.scheduler.runAfter(0, internal.fulfillment.processOrder, {
        orderId: args.orderId,
      });
    } catch (error) {
      await ctx.runMutation(internal.orders.markFailed, {
        orderId: args.orderId,
        error: String(error),
      });
    }

    return null;
  },
});

Scheduling

任务调度

Fire-and-Forget (Immediate)

即发即弃(立即执行)

Schedule work to run immediately but asynchronously:
typescript
export const submitJob = mutation({
  args: { data: v.string() },
  returns: v.id("jobs"),
  handler: async (ctx, args) => {
    const jobId = await ctx.db.insert("jobs", {
      data: args.data,
      status: "pending",
    });

    // Schedule immediately (0ms delay)
    await ctx.scheduler.runAfter(0, internal.jobs.process, { jobId });

    return jobId;
  },
});
调度任务立即异步执行:
typescript
export const submitJob = mutation({
  args: { data: v.string() },
  returns: v.id("jobs"),
  handler: async (ctx, args) => {
    const jobId = await ctx.db.insert("jobs", {
      data: args.data,
      status: "pending",
    });

    // 立即调度(延迟0毫秒)
    await ctx.scheduler.runAfter(0, internal.jobs.process, { jobId });

    return jobId;
  },
});

Delayed Execution

延迟执行

Schedule work to run after a delay:
typescript
// Self-destructing message
export const sendExpiringMessage = mutation({
  args: { body: v.string(), expiresInMs: v.number() },
  returns: v.id("messages"),
  handler: async (ctx, args) => {
    const id = await ctx.db.insert("messages", { body: args.body });

    // Delete after specified time
    await ctx.scheduler.runAfter(args.expiresInMs, internal.messages.delete, {
      id,
    });

    return id;
  },
});

export const delete_ = internalMutation({
  args: { id: v.id("messages") },
  returns: v.null(),
  handler: async (ctx, args) => {
    await ctx.db.delete(args.id);
    return null;
  },
});
调度任务在指定延迟后执行:
typescript
// 自动销毁消息
export const sendExpiringMessage = mutation({
  args: { body: v.string(), expiresInMs: v.number() },
  returns: v.id("messages"),
  handler: async (ctx, args) => {
    const id = await ctx.db.insert("messages", { body: args.body });

    // 指定时间后删除
    await ctx.scheduler.runAfter(args.expiresInMs, internal.messages.delete, {
      id,
    });

    return id;
  },
});

export const delete_ = internalMutation({
  args: { id: v.id("messages") },
  returns: v.null(),
  handler: async (ctx, args) => {
    await ctx.db.delete(args.id);
    return null;
  },
});

Scheduled at Specific Time

指定时间执行

typescript
export const scheduleReminder = mutation({
  args: {
    userId: v.id("users"),
    message: v.string(),
    sendAt: v.number(), // Unix timestamp
  },
  returns: v.id("scheduledFunctions"),
  handler: async (ctx, args) => {
    // Schedule at specific timestamp
    return await ctx.scheduler.runAt(
      args.sendAt,
      internal.notifications.sendReminder,
      {
        userId: args.userId,
        message: args.message,
      }
    );
  },
});
typescript
export const scheduleReminder = mutation({
  args: {
    userId: v.id("users"),
    message: v.string(),
    sendAt: v.number(), // Unix时间戳
  },
  returns: v.id("scheduledFunctions"),
  handler: async (ctx, args) => {
    // 在指定时间戳调度
    return await ctx.scheduler.runAt(
      args.sendAt,
      internal.notifications.sendReminder,
      {
        userId: args.userId,
        message: args.message,
      }
    );
  },
});

Cancel Scheduled Functions

取消已调度任务

typescript
export const cancelReminder = mutation({
  args: { scheduledId: v.id("_scheduled_functions") },
  returns: v.null(),
  handler: async (ctx, args) => {
    await ctx.scheduler.cancel(args.scheduledId);
    return null;
  },
});
typescript
export const cancelReminder = mutation({
  args: { scheduledId: v.id("_scheduled_functions") },
  returns: v.null(),
  handler: async (ctx, args) => {
    await ctx.scheduler.cancel(args.scheduledId);
    return null;
  },
});

Scheduling from Actions

在动作中调度任务

Actions can also schedule work:
typescript
export const processWithRetry = internalAction({
  args: { jobId: v.id("jobs"), attempt: v.number() },
  returns: v.null(),
  handler: async (ctx, args) => {
    try {
      // Try to process
      const job = await ctx.runQuery(internal.jobs.getById, {
        jobId: args.jobId,
      });
      const result = await fetch("https://api.example.com/process", {
        method: "POST",
        body: JSON.stringify(job),
      });

      if (!result.ok) throw new Error("API error");

      await ctx.runMutation(internal.jobs.markComplete, {
        jobId: args.jobId,
        result: await result.json(),
      });
    } catch (error) {
      if (args.attempt < 3) {
        // Retry with exponential backoff
        const delay = Math.pow(2, args.attempt) * 1000;
        await ctx.scheduler.runAfter(delay, internal.jobs.processWithRetry, {
          jobId: args.jobId,
          attempt: args.attempt + 1,
        });
      } else {
        await ctx.runMutation(internal.jobs.markFailed, {
          jobId: args.jobId,
          error: String(error),
        });
      }
    }

    return null;
  },
});
动作同样可以调度任务:
typescript
export const processWithRetry = internalAction({
  args: { jobId: v.id("jobs"), attempt: v.number() },
  returns: v.null(),
  handler: async (ctx, args) => {
    try {
      // 尝试处理任务
      const job = await ctx.runQuery(internal.jobs.getById, {
        jobId: args.jobId,
      });
      const result = await fetch("https://api.example.com/process", {
        method: "POST",
        body: JSON.stringify(job),
      });

      if (!result.ok) throw new Error("API错误");

      await ctx.runMutation(internal.jobs.markComplete, {
        jobId: args.jobId,
        result: await result.json(),
      });
    } catch (error) {
      if (args.attempt < 3) {
        // 指数退避重试
        const delay = Math.pow(2, args.attempt) * 1000;
        await ctx.scheduler.runAfter(delay, internal.jobs.processWithRetry, {
          jobId: args.jobId,
          attempt: args.attempt + 1,
        });
      } else {
        await ctx.runMutation(internal.jobs.markFailed, {
          jobId: args.jobId,
          error: String(error),
        });
      }
    }

    return null;
  },
});

Cron Jobs

Cron任务

Creating Cron Jobs

创建Cron任务

Cron jobs must be defined in
convex/crons.ts
:
typescript
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

// Run every hour
crons.interval(
  "cleanup stale jobs",
  { hours: 1 },
  internal.jobs.cleanupStale,
  {}
);

// Run at specific cron schedule
crons.cron(
  "daily report",
  "0 9 * * *", // 9 AM every day
  internal.reports.generateDaily,
  {}
);

// Run every 5 minutes
crons.interval(
  "sync external data",
  { minutes: 5 },
  internal.sync.pullExternalData,
  {}
);

// Run weekly on Sundays at midnight
crons.cron(
  "weekly cleanup",
  "0 0 * * 0",
  internal.maintenance.weeklyCleanup,
  {}
);

export default crons;
Cron任务必须定义在
convex/crons.ts
文件中:
typescript
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

// 每小时执行一次
crons.interval(
  "cleanup stale jobs",
  { hours: 1 },
  internal.jobs.cleanupStale,
  {}
);

// 按指定Cron表达式执行
crons.cron(
  "daily report",
  "0 9 * * *", // 每天上午9点
  internal.reports.generateDaily,
  {}
);

// 每5分钟执行一次
crons.interval(
  "sync external data",
  { minutes: 5 },
  internal.sync.pullExternalData,
  {}
);

// 每周日午夜执行
crons.cron(
  "weekly cleanup",
  "0 0 * * 0",
  internal.maintenance.weeklyCleanup,
  {}
);

export default crons;

Interval Options

时间间隔配置

typescript
// Various interval configurations
crons.interval("every-minute", { minutes: 1 }, handler, {});
crons.interval("every-hour", { hours: 1 }, handler, {});
crons.interval("every-day", { hours: 24 }, handler, {});
crons.interval("every-30-seconds", { seconds: 30 }, handler, {});
typescript
// 各种时间间隔配置
crons.interval("every-minute", { minutes: 1 }, handler, {});
crons.interval("every-hour", { hours: 1 }, handler, {});
crons.interval("every-day", { hours: 24 }, handler, {});
crons.interval("every-30-seconds", { seconds: 30 }, handler, {});

Cron Schedule Syntax

Cron表达式语法

Standard cron format:
minute hour day month weekday
typescript
// Examples
"* * * * *"; // Every minute
"0 * * * *"; // Every hour
"0 0 * * *"; // Every day at midnight
"0 9 * * *"; // Every day at 9 AM
"0 0 * * 0"; // Every Sunday at midnight
"0 0 1 * *"; // First day of every month
"*/5 * * * *"; // Every 5 minutes
"0 */2 * * *"; // Every 2 hours
标准Cron格式:
分钟 小时 日期 月份 星期
typescript
// 示例
"* * * * *"; // 每分钟
"0 * * * *"; // 每小时
"0 0 * * *"; // 每天午夜
"0 9 * * *"; // 每天上午9点
"0 0 * * 0"; // 每周日午夜
"0 0 1 * *"; // 每月第一天
"*/5 * * * *"; // 每5分钟
"0 */2 * * *"; // 每2小时

Cron Job Implementation

Cron任务实现

typescript
// convex/jobs.ts
export const cleanupStale = internalMutation({
  args: {},
  returns: v.number(),
  handler: async (ctx) => {
    const oneHourAgo = Date.now() - 60 * 60 * 1000;

    const staleJobs = await ctx.db
      .query("jobs")
      .withIndex("by_status_and_createdAt", (q) =>
        q.eq("status", "pending").lt("createdAt", oneHourAgo)
      )
      .collect();

    for (const job of staleJobs) {
      await ctx.db.patch(job._id, { status: "stale" });
    }

    return staleJobs.length;
  },
});
typescript
// convex/jobs.ts
export const cleanupStale = internalMutation({
  args: {},
  returns: v.number(),
  handler: async (ctx) => {
    const oneHourAgo = Date.now() - 60 * 60 * 1000;

    const staleJobs = await ctx.db
      .query("jobs")
      .withIndex("by_status_and_createdAt", (q) =>
        q.eq("status", "pending").lt("createdAt", oneHourAgo)
      )
      .collect();

    for (const job of staleJobs) {
      await ctx.db.patch(job._id, { status: "stale" });
    }

    return staleJobs.length;
  },
});

Orchestration Patterns

编排模式

Pattern 1: Saga Pattern (Compensating Transactions)

模式1:Saga模式(补偿事务)

For operations that span multiple services with rollback:
typescript
export const createSubscription = internalAction({
  args: {
    userId: v.id("users"),
    planId: v.string(),
  },
  returns: v.union(v.id("subscriptions"), v.null()),
  handler: async (ctx, args) => {
    const user = await ctx.runQuery(internal.users.getById, {
      userId: args.userId,
    });
    if (!user) throw new Error("User not found");

    // Step 1: Create Stripe subscription
    let stripeSubscriptionId: string | null = null;
    try {
      const stripeResponse = await fetch(
        "https://api.stripe.com/v1/subscriptions",
        {
          method: "POST",
          headers: {
            Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
            "Content-Type": "application/x-www-form-urlencoded",
          },
          body: new URLSearchParams({
            customer: user.stripeCustomerId,
            items: [{ price: args.planId }]
              .map((i, idx) => `items[${idx}][price]=${i.price}`)
              .join("&"),
          }),
        }
      );

      if (!stripeResponse.ok) {
        throw new Error("Stripe subscription failed");
      }

      const stripeData = await stripeResponse.json();
      stripeSubscriptionId = stripeData.id;
    } catch (error) {
      // No cleanup needed - nothing created yet
      return null;
    }

    // Step 2: Create local subscription record
    try {
      const subscriptionId = await ctx.runMutation(
        internal.subscriptions.create,
        {
          userId: args.userId,
          planId: args.planId,
          stripeSubscriptionId,
        }
      );

      return subscriptionId;
    } catch (error) {
      // Rollback: Cancel Stripe subscription
      await fetch(
        `https://api.stripe.com/v1/subscriptions/${stripeSubscriptionId}`,
        {
          method: "DELETE",
          headers: {
            Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
          },
        }
      );

      return null;
    }
  },
});
适用于跨多个服务且需要回滚的操作:
typescript
export const createSubscription = internalAction({
  args: {
    userId: v.id("users"),
    planId: v.string(),
  },
  returns: v.union(v.id("subscriptions"), v.null()),
  handler: async (ctx, args) => {
    const user = await ctx.runQuery(internal.users.getById, {
      userId: args.userId,
    });
    if (!user) throw new Error("用户未找到");

    // 步骤1:创建Stripe订阅
    let stripeSubscriptionId: string | null = null;
    try {
      const stripeResponse = await fetch(
        "https://api.stripe.com/v1/subscriptions",
        {
          method: "POST",
          headers: {
            Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
            "Content-Type": "application/x-www-form-urlencoded",
          },
          body: new URLSearchParams({
            customer: user.stripeCustomerId,
            items: [{ price: args.planId }]
              .map((i, idx) => `items[${idx}][price]=${i.price}`)
              .join("&"),
          }),
        }
      );

      if (!stripeResponse.ok) {
        throw new Error("Stripe订阅创建失败");
      }

      const stripeData = await stripeResponse.json();
      stripeSubscriptionId = stripeData.id;
    } catch (error) {
      // 无需清理:尚未创建任何资源
      return null;
    }

    // 步骤2:创建本地订阅记录
    try {
      const subscriptionId = await ctx.runMutation(
        internal.subscriptions.create,
        {
          userId: args.userId,
          planId: args.planId,
          stripeSubscriptionId,
        }
      );

      return subscriptionId;
    } catch (error) {
      // 回滚:取消Stripe订阅
      await fetch(
        `https://api.stripe.com/v1/subscriptions/${stripeSubscriptionId}`,
        {
          method: "DELETE",
          headers: {
            Authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
          },
        }
      );

      return null;
    }
  },
});

Pattern 2: Fan-Out / Fan-In

模式2:扇出/扇入

Process multiple items in parallel, then aggregate:
typescript
// Fan-out: Schedule processing for each item
export const processAll = mutation({
  args: { itemIds: v.array(v.id("items")) },
  returns: v.id("batchJobs"),
  handler: async (ctx, args) => {
    const batchId = await ctx.db.insert("batchJobs", {
      totalItems: args.itemIds.length,
      completedItems: 0,
      status: "processing",
    });

    // Fan-out: Schedule each item
    for (const itemId of args.itemIds) {
      await ctx.scheduler.runAfter(0, internal.items.processOne, {
        itemId,
        batchId,
      });
    }

    return batchId;
  },
});

// Process single item
export const processOne = internalAction({
  args: { itemId: v.id("items"), batchId: v.id("batchJobs") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const item = await ctx.runQuery(internal.items.getById, {
      itemId: args.itemId,
    });

    // Process item (external API, etc.)
    const result = await fetch("https://api.example.com/process", {
      method: "POST",
      body: JSON.stringify(item),
    });

    // Update item with result
    await ctx.runMutation(internal.items.updateResult, {
      itemId: args.itemId,
      result: await result.json(),
    });

    // Fan-in: Update batch progress
    await ctx.runMutation(internal.batches.incrementCompleted, {
      batchId: args.batchId,
    });

    return null;
  },
});

// Fan-in: Track completion
export const incrementCompleted = internalMutation({
  args: { batchId: v.id("batchJobs") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const batch = await ctx.db.get(args.batchId);
    if (!batch) return null;

    const newCompleted = batch.completedItems + 1;
    const isComplete = newCompleted >= batch.totalItems;

    await ctx.db.patch(args.batchId, {
      completedItems: newCompleted,
      status: isComplete ? "completed" : "processing",
    });

    if (isComplete) {
      // Trigger completion handler
      await ctx.scheduler.runAfter(0, internal.batches.onComplete, {
        batchId: args.batchId,
      });
    }

    return null;
  },
});
并行处理多个项,然后聚合结果:
typescript
// 扇出:为每个项调度处理任务
export const processAll = mutation({
  args: { itemIds: v.array(v.id("items")) },
  returns: v.id("batchJobs"),
  handler: async (ctx, args) => {
    const batchId = await ctx.db.insert("batchJobs", {
      totalItems: args.itemIds.length,
      completedItems: 0,
      status: "processing",
    });

    // 扇出:为每个项调度任务
    for (const itemId of args.itemIds) {
      await ctx.scheduler.runAfter(0, internal.items.processOne, {
        itemId,
        batchId,
      });
    }

    return batchId;
  },
});

// 处理单个项
export const processOne = internalAction({
  args: { itemId: v.id("items"), batchId: v.id("batchJobs") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const item = await ctx.runQuery(internal.items.getById, {
      itemId: args.itemId,
    });

    // 处理项(外部API等)
    const result = await fetch("https://api.example.com/process", {
      method: "POST",
      body: JSON.stringify(item),
    });

    // 更新项的处理结果
    await ctx.runMutation(internal.items.updateResult, {
      itemId: args.itemId,
      result: await result.json(),
    });

    // 扇入:更新批处理进度
    await ctx.runMutation(internal.batches.incrementCompleted, {
      batchId: args.batchId,
    });

    return null;
  },
});

// 扇入:跟踪完成状态
export const incrementCompleted = internalMutation({
  args: { batchId: v.id("batchJobs") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const batch = await ctx.db.get(args.batchId);
    if (!batch) return null;

    const newCompleted = batch.completedItems + 1;
    const isComplete = newCompleted >= batch.totalItems;

    await ctx.db.patch(args.batchId, {
      completedItems: newCompleted,
      status: isComplete ? "completed" : "processing",
    });

    if (isComplete) {
      // 触发完成处理逻辑
      await ctx.scheduler.runAfter(0, internal.batches.onComplete, {
        batchId: args.batchId,
      });
    }

    return null;
  },
});

Pattern 3: Retry with Exponential Backoff and Jitter

模式3:指数退避与抖动重试

typescript
export const processWithRetry = internalAction({
  args: {
    jobId: v.id("jobs"),
    attempt: v.number(),
    maxAttempts: v.optional(v.number()),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const maxAttempts = args.maxAttempts ?? 5;

    try {
      const job = await ctx.runQuery(internal.jobs.getById, {
        jobId: args.jobId,
      });
      if (!job) return null;

      const response = await fetch("https://api.example.com/process", {
        method: "POST",
        body: JSON.stringify(job),
      });

      if (!response.ok) {
        throw new Error(`API error: ${response.status}`);
      }

      await ctx.runMutation(internal.jobs.markComplete, {
        jobId: args.jobId,
        result: await response.json(),
      });
    } catch (error) {
      if (args.attempt < maxAttempts) {
        // Exponential backoff with jitter
        const baseDelay = Math.pow(2, args.attempt) * 1000;
        const jitter = Math.random() * 1000;
        const delay = baseDelay + jitter;

        await ctx.scheduler.runAfter(delay, internal.jobs.processWithRetry, {
          jobId: args.jobId,
          attempt: args.attempt + 1,
          maxAttempts,
        });

        await ctx.runMutation(internal.jobs.updateAttempt, {
          jobId: args.jobId,
          attempt: args.attempt + 1,
          nextRetryAt: Date.now() + delay,
        });
      } else {
        await ctx.runMutation(internal.jobs.markFailed, {
          jobId: args.jobId,
          error: String(error),
          finalAttempt: args.attempt,
        });
      }
    }

    return null;
  },
});
typescript
export const processWithRetry = internalAction({
  args: {
    jobId: v.id("jobs"),
    attempt: v.number(),
    maxAttempts: v.optional(v.number()),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const maxAttempts = args.maxAttempts ?? 5;

    try {
      const job = await ctx.runQuery(internal.jobs.getById, {
        jobId: args.jobId,
      });
      if (!job) return null;

      const response = await fetch("https://api.example.com/process", {
        method: "POST",
        body: JSON.stringify(job),
      });

      if (!response.ok) {
        throw new Error(`API错误: ${response.status}`);
      }

      await ctx.runMutation(internal.jobs.markComplete, {
        jobId: args.jobId,
        result: await response.json(),
      });
    } catch (error) {
      if (args.attempt < maxAttempts) {
        // 指数退避加抖动
        const baseDelay = Math.pow(2, args.attempt) * 1000;
        const jitter = Math.random() * 1000;
        const delay = baseDelay + jitter;

        await ctx.scheduler.runAfter(delay, internal.jobs.processWithRetry, {
          jobId: args.jobId,
          attempt: args.attempt + 1,
          maxAttempts,
        });

        await ctx.runMutation(internal.jobs.updateAttempt, {
          jobId: args.jobId,
          attempt: args.attempt + 1,
          nextRetryAt: Date.now() + delay,
        });
      } else {
        await ctx.runMutation(internal.jobs.markFailed, {
          jobId: args.jobId,
          error: String(error),
          finalAttempt: args.attempt,
        });
      }
    }

    return null;
  },
});

Pattern 4: Idempotency Keys

模式4:幂等键

Prevent duplicate processing:
typescript
export const processPayment = mutation({
  args: {
    idempotencyKey: v.string(),
    amount: v.number(),
    customerId: v.string(),
  },
  returns: v.union(v.id("payments"), v.null()),
  handler: async (ctx, args) => {
    // Check if already processed
    const existing = await ctx.db
      .query("payments")
      .withIndex("by_idempotency_key", (q) =>
        q.eq("idempotencyKey", args.idempotencyKey)
      )
      .unique();

    if (existing) return existing._id; // Already done, return existing

    // Process and record
    const paymentId = await ctx.db.insert("payments", {
      idempotencyKey: args.idempotencyKey,
      amount: args.amount,
      customerId: args.customerId,
      status: "pending",
    });

    await ctx.scheduler.runAfter(0, internal.payments.charge, { paymentId });

    return paymentId;
  },
});
防止重复处理:
typescript
export const processPayment = mutation({
  args: {
    idempotencyKey: v.string(),
    amount: v.number(),
    customerId: v.string(),
  },
  returns: v.union(v.id("payments"), v.null()),
  handler: async (ctx, args) => {
    // 检查是否已处理过
    const existing = await ctx.db
      .query("payments")
      .withIndex("by_idempotency_key", (q) =>
        q.eq("idempotencyKey", args.idempotencyKey)
      )
      .unique();

    if (existing) return existing._id; // 已处理,返回现有记录

    // 处理并记录
    const paymentId = await ctx.db.insert("payments", {
      idempotencyKey: args.idempotencyKey,
      amount: args.amount,
      customerId: args.customerId,
      status: "pending",
    });

    await ctx.scheduler.runAfter(0, internal.payments.charge, { paymentId });

    return paymentId;
  },
});

Common Pitfalls

常见陷阱

Pitfall 1: Actions Without Error Handling

陷阱1:动作未处理错误

❌ WRONG:
typescript
export const sendNotification = internalAction({
  args: { userId: v.id("users") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const user = await ctx.runQuery(internal.users.getById, {
      userId: args.userId,
    });

    // If this fails, we lose track of the failure
    await fetch("https://api.pushover.net/send", {
      method: "POST",
      body: JSON.stringify({ user: user.pushToken, message: "Hello" }),
    });

    return null;
  },
});
✅ CORRECT:
typescript
export const sendNotification = internalAction({
  args: { userId: v.id("users"), notificationId: v.id("notifications") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const user = await ctx.runQuery(internal.users.getById, {
      userId: args.userId,
    });

    try {
      const response = await fetch("https://api.pushover.net/send", {
        method: "POST",
        body: JSON.stringify({ user: user.pushToken, message: "Hello" }),
      });

      if (!response.ok) {
        throw new Error(`Push API error: ${response.status}`);
      }

      await ctx.runMutation(internal.notifications.markSent, {
        notificationId: args.notificationId,
      });
    } catch (error) {
      await ctx.runMutation(internal.notifications.markFailed, {
        notificationId: args.notificationId,
        error: String(error),
      });
    }

    return null;
  },
});
❌ 错误示例:
typescript
export const sendNotification = internalAction({
  args: { userId: v.id("users") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const user = await ctx.runQuery(internal.users.getById, {
      userId: args.userId,
    });

    // 如果调用失败,无法追踪失败状态
    await fetch("https://api.pushover.net/send", {
      method: "POST",
      body: JSON.stringify({ user: user.pushToken, message: "Hello" }),
    });

    return null;
  },
});
✅ 正确示例:
typescript
export const sendNotification = internalAction({
  args: { userId: v.id("users"), notificationId: v.id("notifications") },
  returns: v.null(),
  handler: async (ctx, args) => {
    const user = await ctx.runQuery(internal.users.getById, {
      userId: args.userId,
    });

    try {
      const response = await fetch("https://api.pushover.net/send", {
        method: "POST",
        body: JSON.stringify({ user: user.pushToken, message: "Hello" }),
      });

      if (!response.ok) {
        throw new Error(`推送API错误: ${response.status}`);
      }

      await ctx.runMutation(internal.notifications.markSent, {
        notificationId: args.notificationId,
      });
    } catch (error) {
      await ctx.runMutation(internal.notifications.markFailed, {
        notificationId: args.notificationId,
        error: String(error),
      });
    }

    return null;
  },
});

Pitfall 2: Not Using Internal Functions

陷阱2:未使用内部函数

❌ WRONG:
typescript
// Public action callable by anyone!
export const deleteAllUserData = action({
  args: { userId: v.id("users") },
  returns: v.null(),
  handler: async (ctx, args) => {
    // Dangerous! No auth check, publicly accessible
    await ctx.runMutation(api.users.delete, { userId: args.userId });
    return null;
  },
});
✅ CORRECT:
typescript
// Internal action - only callable from other functions
export const deleteAllUserData = internalAction({
  args: { userId: v.id("users") },
  returns: v.null(),
  handler: async (ctx, args) => {
    // Safe - only called from authenticated internal code
    await ctx.runMutation(internal.users.delete, { userId: args.userId });
    return null;
  },
});

// Public mutation with auth check schedules the internal action
export const requestAccountDeletion = mutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("Unauthorized");

    const user = await ctx.db
      .query("users")
      .withIndex("by_tokenIdentifier", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("User not found");

    await ctx.scheduler.runAfter(0, internal.users.deleteAllUserData, {
      userId: user._id,
    });

    return null;
  },
});
❌ 错误示例:
typescript
// 公开可调用的动作!存在安全风险
export const deleteAllUserData = action({
  args: { userId: v.id("users") },
  returns: v.null(),
  handler: async (ctx, args) => {
    // 危险!无权限校验,公开可访问
    await ctx.runMutation(api.users.delete, { userId: args.userId });
    return null;
  },
});
✅ 正确示例:
typescript
// 内部动作 - 仅可被其他内部函数调用
export const deleteAllUserData = internalAction({
  args: { userId: v.id("users") },
  returns: v.null(),
  handler: async (ctx, args) => {
    // 安全 - 仅可通过已认证的内部代码调用
    await ctx.runMutation(internal.users.delete, { userId: args.userId });
    return null;
  },
});

// 带权限校验的公开变更操作,调度内部动作
export const requestAccountDeletion = mutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("未授权");

    const user = await ctx.db
      .query("users")
      .withIndex("by_tokenIdentifier", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("用户未找到");

    await ctx.scheduler.runAfter(0, internal.users.deleteAllUserData, {
      userId: user._id,
    });

    return null;
  },
});

Pitfall 3: Thundering Herd

陷阱3:惊群效应

❌ WRONG:
typescript
// All retries happen at the same time
const retryDelay = 5000;
await ctx.scheduler.runAfter(retryDelay, internal.jobs.retry, { jobId });
✅ CORRECT:
typescript
// Add jitter to spread out retries
const baseDelay = 5000;
const jitter = Math.random() * 1000;
await ctx.scheduler.runAfter(baseDelay + jitter, internal.jobs.retry, {
  jobId,
});
❌ 错误示例:
typescript
// 所有重试同时执行
const retryDelay = 5000;
await ctx.scheduler.runAfter(retryDelay, internal.jobs.retry, { jobId });
✅ 正确示例:
typescript
// 添加抖动分散重试时间
const baseDelay = 5000;
const jitter = Math.random() * 1000;
await ctx.scheduler.runAfter(baseDelay + jitter, internal.jobs.retry, {
  jobId,
});

Quick Reference

快速参考

Scheduling Methods

调度方法

typescript
// Immediate (0ms delay)
await ctx.scheduler.runAfter(0, internal.jobs.process, { jobId });

// Delayed (milliseconds)
await ctx.scheduler.runAfter(5000, internal.messages.delete, { id });

// At specific timestamp
await ctx.scheduler.runAt(timestamp, internal.reports.send, {});

// Cancel scheduled function
await ctx.scheduler.cancel(scheduledFunctionId);
typescript
// 立即执行(延迟0毫秒)
await ctx.scheduler.runAfter(0, internal.jobs.process, { jobId });

// 延迟执行(毫秒)
await ctx.scheduler.runAfter(5000, internal.messages.delete, { id });

// 指定时间戳执行
await ctx.scheduler.runAt(timestamp, internal.reports.send, {});

// 取消已调度任务
await ctx.scheduler.cancel(scheduledFunctionId);

Cron Syntax

Cron表达式示例

ExpressionDescription
* * * * *
Every minute
0 * * * *
Every hour
0 0 * * *
Every day at midnight
0 9 * * 1-5
9 AM weekdays
*/15 * * * *
Every 15 minutes
0 0 1 * *
First of month
表达式描述
* * * * *
每分钟执行一次
0 * * * *
每小时执行一次
0 0 * * *
每天午夜执行一次
0 9 * * 1-5
工作日上午9点执行
*/15 * * * *
每15分钟执行一次
0 0 1 * *
每月第一天执行一次

Action Context Methods

动作上下文方法

typescript
// Read data
await ctx.runQuery(internal.table.query, args);

// Write data
await ctx.runMutation(internal.table.mutation, args);

// Call another action
await ctx.runAction(internal.external.action, args);

// Schedule work
await ctx.scheduler.runAfter(delay, internal.jobs.process, args);
await ctx.scheduler.runAt(timestamp, internal.jobs.process, args);
typescript
// 读取数据
await ctx.runQuery(internal.table.query, args);

// 写入数据
await ctx.runMutation(internal.table.mutation, args);

// 调用其他动作
await ctx.runAction(internal.external.action, args);

// 调度任务
await ctx.scheduler.runAfter(delay, internal.jobs.process, args);
await ctx.scheduler.runAt(timestamp, internal.jobs.process, args);