convex-helpers-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Convex Helpers Library Patterns

convex-helpers 库模式指南

Overview

概述

The
convex-helpers
library provides battle-tested patterns for common Convex development needs. This skill covers Triggers (automatic side effects), Row-Level Security, Relationship helpers, Custom Functions, Rate Limiting, and Workpool for concurrency control.
convex-helpers
库为常见的 Convex 开发需求提供了经过实战检验的模式。本指南涵盖 Triggers(自动副作用)、行级安全(RLS)、关系助手、自定义函数、速率限制以及用于并发控制的 Workpool。

Installation

安装

bash
npm install convex-helpers @convex-dev/workpool
bash
npm install convex-helpers @convex-dev/workpool

TypeScript: NEVER Use
any
Type

TypeScript:切勿使用
any
类型

CRITICAL RULE: This codebase has
@typescript-eslint/no-explicit-any
enabled. Using
any
will cause build failures.
重要规则: 此代码库启用了
@typescript-eslint/no-explicit-any
规则。使用
any
会导致构建失败。

When to Use This Skill

适用场景

Use this skill when:
  • Implementing automatic side effects on document changes (Triggers)
  • Adding declarative access control (Row-Level Security)
  • Traversing relationships between documents
  • Creating reusable authenticated function wrappers
  • Implementing rate limiting
  • Managing concurrent writes with Workpool
  • Building custom function builders
在以下场景中使用本指南:
  • 实现文档变更时的自动副作用(Triggers)
  • 添加声明式访问控制(行级安全)
  • 遍历文档间的关系
  • 创建可复用的认证函数包装器
  • 实现速率限制
  • 使用 Workpool 管理并发写入
  • 构建自定义函数构建器

Key Patterns Overview

核心模式概览

PatternUse Case
TriggersRun code automatically on document changes
Row-Level SecurityDeclarative access control at the database layer
Relationship HelpersSimplified traversal of document relations
Custom FunctionsWrap queries/mutations with auth, logging, etc.
Rate LimiterApplication-level rate limiting
WorkpoolFan-out parallel jobs, serialize conflicting mutations
MigrationsSchema migrations with state tracking
模式适用场景
Triggers文档变更时自动运行代码
行级安全(RLS)在数据库层实现声明式访问控制
关系助手简化文档关系的遍历操作
自定义函数为查询/突变操作添加认证、日志等包装
速率限制器应用层的请求速率限制
Workpool批量并行任务处理、冲突突变操作序列化
迁移带状态跟踪的 Schema 迁移

Triggers (Automatic Side Effects)

Triggers(自动副作用)

Triggers run code automatically when documents change. They execute atomically within the same transaction as the mutation.
Triggers 会在文档变更时自动运行代码,它们与突变操作在同一事务中原子执行。

Setting Up Triggers

设置 Triggers

typescript
// convex/functions.ts
import { mutation as rawMutation } from "./_generated/server";
import { Triggers } from "convex-helpers/server/triggers";
import {
  customCtx,
  customMutation,
} from "convex-helpers/server/customFunctions";
import { DataModel } from "./_generated/dataModel";

const triggers = new Triggers<DataModel>();

// 1. Compute fullName on every user change
triggers.register("users", async (ctx, change) => {
  if (change.newDoc) {
    const fullName = `${change.newDoc.firstName} ${change.newDoc.lastName}`;
    if (change.newDoc.fullName !== fullName) {
      await ctx.db.patch(change.id, { fullName });
    }
  }
});

// 2. Keep denormalized count (careful: single doc = write contention)
triggers.register("users", async (ctx, change) => {
  const countDoc = (await ctx.db.query("userCount").unique())!;
  if (change.operation === "insert") {
    await ctx.db.patch(countDoc._id, { count: countDoc.count + 1 });
  } else if (change.operation === "delete") {
    await ctx.db.patch(countDoc._id, { count: countDoc.count - 1 });
  }
});

// 3. Cascading deletes
triggers.register("users", async (ctx, change) => {
  if (change.operation === "delete") {
    const messages = await ctx.db
      .query("messages")
      .withIndex("by_author", (q) => q.eq("authorId", change.id))
      .collect();
    for (const msg of messages) {
      await ctx.db.delete(msg._id);
    }
  }
});

// Export wrapped mutation that runs triggers
export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
typescript
// convex/functions.ts
import { mutation as rawMutation } from "./_generated/server";
import { Triggers } from "convex-helpers/server/triggers";
import {
  customCtx,
  customMutation,
} from "convex-helpers/server/customFunctions";
import { DataModel } from "./_generated/dataModel";

const triggers = new Triggers<DataModel>();

// 1. 在用户信息变更时自动计算全名
triggers.register("users", async (ctx, change) => {
  if (change.newDoc) {
    const fullName = `${change.newDoc.firstName} ${change.newDoc.lastName}`;
    if (change.newDoc.fullName !== fullName) {
      await ctx.db.patch(change.id, { fullName });
    }
  }
});

// 2. 维护非规范化的计数(注意:单文档写入会引发竞争)
triggers.register("users", async (ctx, change) => {
  const countDoc = (await ctx.db.query("userCount").unique())!;
  if (change.operation === "insert") {
    await ctx.db.patch(countDoc._id, { count: countDoc.count + 1 });
  } else if (change.operation === "delete") {
    await ctx.db.patch(countDoc._id, { count: countDoc.count - 1 });
  }
});

// 3. 级联删除
triggers.register("users", async (ctx, change) => {
  if (change.operation === "delete") {
    const messages = await ctx.db
      .query("messages")
      .withIndex("by_author", (q) => q.eq("authorId", change.id))
      .collect();
    for (const msg of messages) {
      await ctx.db.delete(msg._id);
    }
  }
});

// 导出包含 Triggers 的包装后突变函数
export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));

Trigger Change Object

Trigger 变更对象

typescript
interface Change<Doc> {
  id: Id<TableName>;
  operation: "insert" | "update" | "delete";
  oldDoc: Doc | null; // null for inserts
  newDoc: Doc | null; // null for deletes
}
typescript
interface Change<Doc> {
  id: Id<TableName>;
  operation: "insert" | "update" | "delete";
  oldDoc: Doc | null; // 插入操作时为 null
  newDoc: Doc | null; // 删除操作时为 null
}

Trigger Warnings

Trigger 注意事项

Warning: Triggers run inside the same transaction as the mutation. Writing to hot-spot documents (e.g., global counters) inside triggers will cause OCC conflicts under load. Use sharding or Workpool for high-contention writes.
警告: Triggers 与突变操作在同一事务中运行。在 Trigger 中写入热点文档(如全局计数器)会在高负载下引发乐观并发控制(OCC)冲突。对于高竞争写入,请使用分片或 Workpool。

Row-Level Security (RLS)

行级安全(RLS)

Declarative access control at the database layer. RLS wraps the database context to enforce rules on every read and write.
在数据库层实现声明式访问控制。RLS 会包装数据库上下文,对每一次读写操作强制执行规则。

Setting Up RLS

设置 RLS

typescript
// convex/functions.ts
import {
  Rules,
  wrapDatabaseReader,
  wrapDatabaseWriter,
} from "convex-helpers/server/rowLevelSecurity";
import {
  customCtx,
  customQuery,
  customMutation,
} from "convex-helpers/server/customFunctions";
import { query, mutation } from "./_generated/server";
import { QueryCtx } from "./_generated/server";
import { DataModel } from "./_generated/dataModel";

async function rlsRules(ctx: QueryCtx) {
  const identity = await ctx.auth.getUserIdentity();

  return {
    users: {
      read: async (_, user) => {
        // Unauthenticated users can only read users over 18
        if (!identity && user.age < 18) return false;
        return true;
      },
      insert: async () => true,
      modify: async (_, user) => {
        if (!identity) throw new Error("Must be authenticated");
        // Users can only modify their own record
        return user.tokenIdentifier === identity.tokenIdentifier;
      },
    },

    messages: {
      read: async (_, message) => {
        // Only read messages in conversations you're a member of
        const conversation = await ctx.db.get(message.conversationId);
        return conversation?.members.includes(identity?.subject ?? "") ?? false;
      },
      modify: async (_, message) => {
        // Only modify your own messages
        return message.authorId === identity?.subject;
      },
    },

    // Table with no restrictions
    publicPosts: {
      read: async () => true,
      insert: async () => true,
      modify: async () => true,
    },
  } satisfies Rules<QueryCtx, DataModel>;
}

// Wrap query/mutation with RLS
export const queryWithRLS = customQuery(
  query,
  customCtx(async (ctx) => ({
    db: wrapDatabaseReader(ctx, ctx.db, await rlsRules(ctx)),
  }))
);

export const mutationWithRLS = customMutation(
  mutation,
  customCtx(async (ctx) => ({
    db: wrapDatabaseWriter(ctx, ctx.db, await rlsRules(ctx)),
  }))
);
typescript
// convex/functions.ts
import {
  Rules,
  wrapDatabaseReader,
  wrapDatabaseWriter,
} from "convex-helpers/server/rowLevelSecurity";
import {
  customCtx,
  customQuery,
  customMutation,
} from "convex-helpers/server/customFunctions";
import { query, mutation } from "./_generated/server";
import { QueryCtx } from "./_generated/server";
import { DataModel } from "./_generated/dataModel";

async function rlsRules(ctx: QueryCtx) {
  const identity = await ctx.auth.getUserIdentity();

  return {
    users: {
      read: async (_, user) => {
        // 未认证用户只能查看年满18岁的用户信息
        if (!identity && user.age < 18) return false;
        return true;
      },
      insert: async () => true,
      modify: async (_, user) => {
        if (!identity) throw new Error("必须先认证");
        // 用户只能修改自己的记录
        return user.tokenIdentifier === identity.tokenIdentifier;
      },
    },

    messages: {
      read: async (_, message) => {
        // 只能查看自己所在会话中的消息
        const conversation = await ctx.db.get(message.conversationId);
        return conversation?.members.includes(identity?.subject ?? "") ?? false;
      },
      modify: async (_, message) => {
        // 只能修改自己发送的消息
        return message.authorId === identity?.subject;
      },
    },

    // 无限制的公开表
    publicPosts: {
      read: async () => true,
      insert: async () => true,
      modify: async () => true,
    },
  } satisfies Rules<QueryCtx, DataModel>;
}

// 为查询/突变操作添加 RLS 包装
export const queryWithRLS = customQuery(
  query,
  customCtx(async (ctx) => ({
    db: wrapDatabaseReader(ctx, ctx.db, await rlsRules(ctx)),
  }))
);

export const mutationWithRLS = customMutation(
  mutation,
  customCtx(async (ctx) => ({
    db: wrapDatabaseWriter(ctx, ctx.db, await rlsRules(ctx)),
  }))
);

Using RLS-Wrapped Functions

使用带 RLS 的包装函数

typescript
// convex/messages.ts
import { queryWithRLS, mutationWithRLS } from "./functions";
import { v } from "convex/values";

// This query automatically enforces RLS rules
export const list = queryWithRLS({
  args: { conversationId: v.id("conversations") },
  returns: v.array(
    v.object({
      _id: v.id("messages"),
      _creationTime: v.number(),
      content: v.string(),
      authorId: v.string(),
    })
  ),
  handler: async (ctx, args) => {
    // RLS automatically filters out unauthorized messages
    return await ctx.db
      .query("messages")
      .withIndex("by_conversation", (q) =>
        q.eq("conversationId", args.conversationId)
      )
      .collect();
  },
});

// This mutation automatically enforces RLS rules
export const update = mutationWithRLS({
  args: { messageId: v.id("messages"), content: v.string() },
  returns: v.null(),
  handler: async (ctx, args) => {
    // RLS checks if user can modify this message
    await ctx.db.patch(args.messageId, { content: args.content });
    return null;
  },
});
typescript
// convex/messages.ts
import { queryWithRLS, mutationWithRLS } from "./functions";
import { v } from "convex/values";

// 此查询会自动强制执行 RLS 规则
export const list = queryWithRLS({
  args: { conversationId: v.id("conversations") },
  returns: v.array(
    v.object({
      _id: v.id("messages"),
      _creationTime: v.number(),
      content: v.string(),
      authorId: v.string(),
    })
  ),
  handler: async (ctx, args) => {
    // RLS 会自动过滤掉无权限的消息
    return await ctx.db
      .query("messages")
      .withIndex("by_conversation", (q) =>
        q.eq("conversationId", args.conversationId)
      )
      .collect();
  },
});

// 此突变操作会自动强制执行 RLS 规则
export const update = mutationWithRLS({
  args: { messageId: v.id("messages"), content: v.string() },
  returns: v.null(),
  handler: async (ctx, args) => {
    // RLS 会检查用户是否有权修改该消息
    await ctx.db.patch(args.messageId, { content: args.content });
    return null;
  },
});

Relationship Helpers

关系助手

Simplify traversing relationships without manual lookups.
无需手动查询即可简化文档关系的遍历操作。

Available Helpers

可用助手函数

typescript
import {
  getAll,
  getOneFrom,
  getManyFrom,
  getManyVia,
} from "convex-helpers/server/relationships";
typescript
import {
  getAll,
  getOneFrom,
  getManyFrom,
  getManyVia,
} from "convex-helpers/server/relationships";

One-to-One Relationship

一对一关系

typescript
// Get single related document via back reference
const profile = await getOneFrom(
  ctx.db,
  "profiles", // target table
  "userId", // index field
  user._id // value to match
);
typescript
// 通过反向引用获取单个关联文档
const profile = await getOneFrom(
  ctx.db,
  "profiles", // 目标表
  "userId", // 索引字段
  user._id // 匹配值
);

One-to-Many (by ID array)

一对多(通过 ID 数组)

typescript
// Load multiple documents by IDs
const users = await getAll(ctx.db, userIds);
// Returns array of documents in same order as IDs (null for missing)
typescript
// 通过 ID 列表加载多个文档
const users = await getAll(ctx.db, userIds);
// 返回与 ID 列表顺序一致的文档数组(不存在的 ID 对应 null)

One-to-Many (via index)

一对多(通过索引)

typescript
// Get all posts by author
const posts = await getManyFrom(
  ctx.db,
  "posts", // target table
  "by_authorId", // index name
  author._id // value to match
);
typescript
// 获取某作者的所有文章
const posts = await getManyFrom(
  ctx.db,
  "posts", // 目标表
  "by_authorId", // 索引名称
  author._id // 匹配值
);

Many-to-Many (via join table)

多对多(通过关联表)

typescript
// Schema:
// posts: { title: v.string() }
// categories: { name: v.string() }
// postCategories: { postId: v.id("posts"), categoryId: v.id("categories") }
//   .index("by_post", ["postId"])
//   .index("by_category", ["categoryId"])

// Get all categories for a post
const categories = await getManyVia(
  ctx.db,
  "postCategories", // join table
  "categoryId", // field pointing to target
  "by_post", // index to query join table
  post._id // source ID
);

// Get all posts in a category
const posts = await getManyVia(
  ctx.db,
  "postCategories",
  "postId",
  "by_category",
  category._id
);
typescript
// 数据结构:
// posts: { title: v.string() }
// categories: { name: v.string() }
// postCategories: { postId: v.id("posts"), categoryId: v.id("categories") }
//   .index("by_post", ["postId"])
//   .index("by_category", ["categoryId"])

// 获取某篇文章的所有分类
const categories = await getManyVia(
  ctx.db,
  "postCategories", // 关联表
  "categoryId", // 指向目标表的字段
  "by_post", // 查询关联表的索引
  post._id // 源文档 ID
);

// 获取某分类下的所有文章
const posts = await getManyVia(
  ctx.db,
  "postCategories",
  "postId",
  "by_category",
  category._id
);

Complete Example

完整示例

typescript
// convex/posts.ts
import { query } from "./_generated/server";
import { v } from "convex/values";
import {
  getOneFrom,
  getManyFrom,
  getManyVia,
} from "convex-helpers/server/relationships";

export const getPostWithDetails = query({
  args: { postId: v.id("posts") },
  returns: v.union(
    v.object({
      post: v.object({
        _id: v.id("posts"),
        title: v.string(),
        body: v.string(),
      }),
      author: v.union(
        v.object({
          _id: v.id("users"),
          name: v.string(),
        }),
        v.null()
      ),
      comments: v.array(
        v.object({
          _id: v.id("comments"),
          body: v.string(),
        })
      ),
      categories: v.array(
        v.object({
          _id: v.id("categories"),
          name: v.string(),
        })
      ),
    }),
    v.null()
  ),
  handler: async (ctx, args) => {
    const post = await ctx.db.get(args.postId);
    if (!post) return null;

    const [author, comments, categories] = await Promise.all([
      // One-to-one: post -> author
      ctx.db.get(post.authorId),

      // One-to-many: post -> comments
      getManyFrom(ctx.db, "comments", "by_post", post._id),

      // Many-to-many: post -> categories (via join table)
      getManyVia(ctx.db, "postCategories", "categoryId", "by_post", post._id),
    ]);

    return {
      post: { _id: post._id, title: post.title, body: post.body },
      author: author ? { _id: author._id, name: author.name } : null,
      comments: comments.map((c) => ({ _id: c._id, body: c.body })),
      categories: categories
        .filter((c): c is NonNullable<typeof c> => c !== null)
        .map((c) => ({ _id: c._id, name: c.name })),
    };
  },
});
typescript
// convex/posts.ts
import { query } from "./_generated/server";
import { v } from "convex/values";
import {
  getOneFrom,
  getManyFrom,
  getManyVia,
} from "convex-helpers/server/relationships";

export const getPostWithDetails = query({
  args: { postId: v.id("posts") },
  returns: v.union(
    v.object({
      post: v.object({
        _id: v.id("posts"),
        title: v.string(),
        body: v.string(),
      }),
      author: v.union(
        v.object({
          _id: v.id("users"),
          name: v.string(),
        }),
        v.null()
      ),
      comments: v.array(
        v.object({
          _id: v.id("comments"),
          body: v.string(),
        })
      ),
      categories: v.array(
        v.object({
          _id: v.id("categories"),
          name: v.string(),
        })
      ),
    }),
    v.null()
  ),
  handler: async (ctx, args) => {
    const post = await ctx.db.get(args.postId);
    if (!post) return null;

    const [author, comments, categories] = await Promise.all([
      // 一对一:文章 -> 作者
      ctx.db.get(post.authorId),

      // 一对多:文章 -> 评论
      getManyFrom(ctx.db, "comments", "by_post", post._id),

      // 多对多:文章 -> 分类(通过关联表)
      getManyVia(ctx.db, "postCategories", "categoryId", "by_post", post._id),
    ]);

    return {
      post: { _id: post._id, title: post.title, body: post.body },
      author: author ? { _id: author._id, name: author.name } : null,
      comments: comments.map((c) => ({ _id: c._id, body: c.body })),
      categories: categories
        .filter((c): c is NonNullable<typeof c> => c !== null)
        .map((c) => ({ _id: c._id, name: c.name })),
    };
  },
});

Custom Functions (Auth Wrappers)

自定义函数(认证包装器)

Create reusable function wrappers with built-in authentication.
创建内置认证功能的可复用函数包装器。

Basic Auth Wrapper

基础认证包装器

typescript
// convex/functions.ts
import {
  customQuery,
  customMutation,
} from "convex-helpers/server/customFunctions";
import { query, mutation } from "./_generated/server";
import { Doc } from "./_generated/dataModel";

// Query that requires authentication
export const authedQuery = customQuery(query, {
  args: {},
  input: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("Unauthorized");

    const user = await ctx.db
      .query("users")
      .withIndex("by_token", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("User not found");

    return { ctx: { ...ctx, user }, args };
  },
});

// Mutation that requires authentication
export const authedMutation = customMutation(mutation, {
  args: {},
  input: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("Unauthorized");

    const user = await ctx.db
      .query("users")
      .withIndex("by_token", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("User not found");

    return { ctx: { ...ctx, user }, args };
  },
});
typescript
// convex/functions.ts
import {
  customQuery,
  customMutation,
} from "convex-helpers/server/customFunctions";
import { query, mutation } from "./_generated/server";
import { Doc } from "./_generated/dataModel";

// 需要认证的查询函数
export const authedQuery = customQuery(query, {
  args: {},
  input: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("未认证");

    const user = await ctx.db
      .query("users")
      .withIndex("by_token", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("用户不存在");

    return { ctx: { ...ctx, user }, args };
  },
});

// 需要认证的突变函数
export const authedMutation = customMutation(mutation, {
  args: {},
  input: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("未认证");

    const user = await ctx.db
      .query("users")
      .withIndex("by_token", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("用户不存在");

    return { ctx: { ...ctx, user }, args };
  },
});

Using Authed Functions

使用带认证的函数

typescript
// convex/profile.ts
import { authedQuery, authedMutation } from "./functions";
import { v } from "convex/values";

// ctx.user is guaranteed to exist
export const getMyProfile = authedQuery({
  args: {},
  returns: v.object({
    _id: v.id("users"),
    name: v.string(),
    email: v.string(),
  }),
  handler: async (ctx) => {
    // ctx.user is typed and guaranteed to exist!
    return {
      _id: ctx.user._id,
      name: ctx.user.name,
      email: ctx.user.email,
    };
  },
});

export const updateMyName = authedMutation({
  args: { name: v.string() },
  returns: v.null(),
  handler: async (ctx, args) => {
    await ctx.db.patch(ctx.user._id, { name: args.name });
    return null;
  },
});
typescript
// convex/profile.ts
import { authedQuery, authedMutation } from "./functions";
import { v } from "convex/values";

// ctx.user 已保证存在
export const getMyProfile = authedQuery({
  args: {},
  returns: v.object({
    _id: v.id("users"),
    name: v.string(),
    email: v.string(),
  }),
  handler: async (ctx) => {
    // ctx.user 已被类型化且保证存在!
    return {
      _id: ctx.user._id,
      name: ctx.user.name,
      email: ctx.user.email,
    };
  },
});

export const updateMyName = authedMutation({
  args: { name: v.string() },
  returns: v.null(),
  handler: async (ctx, args) => {
    await ctx.db.patch(ctx.user._id, { name: args.name });
    return null;
  },
});

Role-Based Auth Wrapper

基于角色的认证包装器

typescript
// convex/functions.ts
export const adminQuery = customQuery(query, {
  args: {},
  input: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("Unauthorized");

    const user = await ctx.db
      .query("users")
      .withIndex("by_token", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("User not found");
    if (user.role !== "admin") throw new Error("Admin access required");

    return { ctx: { ...ctx, user }, args };
  },
});

// Usage
export const listAllUsers = adminQuery({
  args: {},
  returns: v.array(
    v.object({
      _id: v.id("users"),
      name: v.string(),
      role: v.string(),
    })
  ),
  handler: async (ctx) => {
    const users = await ctx.db.query("users").collect();
    return users.map((u) => ({ _id: u._id, name: u.name, role: u.role }));
  },
});
typescript
// convex/functions.ts
export const adminQuery = customQuery(query, {
  args: {},
  input: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("未认证");

    const user = await ctx.db
      .query("users")
      .withIndex("by_token", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("用户不存在");
    if (user.role !== "admin") throw new Error("需要管理员权限");

    return { ctx: { ...ctx, user }, args };
  },
});

// 使用示例
export const listAllUsers = adminQuery({
  args: {},
  returns: v.array(
    v.object({
      _id: v.id("users"),
      name: v.string(),
      role: v.string(),
    })
  ),
  handler: async (ctx) => {
    const users = await ctx.db.query("users").collect();
    return users.map((u) => ({ _id: u._id, name: u.name, role: u.role }));
  },
});

Workpool (Concurrency Control)

Workpool(并发控制)

Workpool manages concurrent execution with parallelism limits, useful for:
  • Serializing writes to avoid OCC conflicts
  • Fan-out parallel processing with limits
  • Rate-limited external API calls
Workpool 通过并行度限制管理并发执行,适用于:
  • 序列化写入操作以避免 OCC 冲突
  • 带限制的批量并行处理
  • 速率受限的外部 API 调用

Setting Up Workpool

设置 Workpool

First, install and configure the Workpool component:
typescript
// convex/convex.config.ts
import { defineApp } from "convex/server";
import workpool from "@convex-dev/workpool/convex.config";

const app = defineApp();
app.use(workpool, { name: "workpool" });

export default app;
首先,安装并配置 Workpool 组件:
typescript
// convex/convex.config.ts
import { defineApp } from "convex/server";
import workpool from "@convex-dev/workpool/convex.config";

const app = defineApp();
app.use(workpool, { name: "workpool" });

export default app;

Using Workpool

使用 Workpool

typescript
// convex/counters.ts
import { Workpool } from "@convex-dev/workpool";
import { components, internal } from "./_generated/api";
import { mutation, internalMutation } from "./_generated/server";
import { v } from "convex/values";

// Create workpool with parallelism limit
const counterPool = new Workpool(components.workpool, {
  maxParallelism: 1, // Serialize all counter updates
});

// Public mutation enqueues work
export const incrementCounter = mutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    await counterPool.enqueueMutation(ctx, internal.counters.doIncrement, {});
    return null;
  },
});

// Internal mutation does the actual work
export const doIncrement = internalMutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    const counter = await ctx.db.query("counters").unique();
    if (counter) {
      await ctx.db.patch(counter._id, { count: counter.count + 1 });
    }
    return null;
  },
});
typescript
// convex/counters.ts
import { Workpool } from "@convex-dev/workpool";
import { components, internal } from "./_generated/api";
import { mutation, internalMutation } from "./_generated/server";
import { v } from "convex/values";

// 创建并行度限制为1的 Workpool
const counterPool = new Workpool(components.workpool, {
  maxParallelism: 1, // 所有计数器更新操作序列化执行
});

// 公开的突变函数用于将任务加入队列
export const incrementCounter = mutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    await counterPool.enqueueMutation(ctx, internal.counters.doIncrement, {});
    return null;
  },
});

// 内部突变函数执行实际的更新操作
export const doIncrement = internalMutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    const counter = await ctx.db.query("counters").unique();
    if (counter) {
      await ctx.db.patch(counter._id, { count: counter.count + 1 });
    }
    return null;
  },
});

Parallel Processing with Limits

带限制的并行处理

typescript
// Process many items with limited concurrency
const processingPool = new Workpool(components.workpool, {
  maxParallelism: 5, // Process 5 items at a time
});

export const processAll = mutation({
  args: { itemIds: v.array(v.id("items")) },
  returns: v.null(),
  handler: async (ctx, args) => {
    for (const itemId of args.itemIds) {
      await processingPool.enqueueAction(ctx, internal.items.processOne, {
        itemId,
      });
    }
    return null;
  },
});
typescript
// 带并发限制的批量处理任务
const processingPool = new Workpool(components.workpool, {
  maxParallelism: 5, // 同时处理5个任务
});

export const processAll = mutation({
  args: { itemIds: v.array(v.id("items")) },
  returns: v.null(),
  handler: async (ctx, args) => {
    for (const itemId of args.itemIds) {
      await processingPool.enqueueAction(ctx, internal.items.processOne, {
        itemId,
      });
    }
    return null;
  },
});

Rate Limiting

速率限制

Application-level rate limiting using convex-helpers.
使用 convex-helpers 实现应用层的速率限制。

Setting Up Rate Limiter

设置速率限制器

typescript
// convex/rateLimit.ts
import { RateLimiter } from "convex-helpers/server/rateLimit";
import { components } from "./_generated/api";

export const rateLimiter = new RateLimiter(components.rateLimit, {
  // Global rate limit
  global: {
    kind: "token bucket",
    rate: 100, // 100 requests
    period: 60000, // per minute
  },

  // Per-user rate limit
  perUser: {
    kind: "token bucket",
    rate: 10,
    period: 60000,
  },
});
typescript
// convex/rateLimit.ts
import { RateLimiter } from "convex-helpers/server/rateLimit";
import { components } from "./_generated/api";

export const rateLimiter = new RateLimiter(components.rateLimit, {
  // 全局速率限制
  global: {
    kind: "token bucket",
    rate: 100, // 100次请求
    period: 60000, // 每分钟
  },

  // 按用户的速率限制
  perUser: {
    kind: "token bucket",
    rate: 10,
    period: 60000,
  },
});

Using Rate Limiter

使用速率限制器

typescript
// convex/api.ts
import { mutation } from "./_generated/server";
import { v } from "convex/values";
import { rateLimiter } from "./rateLimit";

export const createPost = mutation({
  args: { title: v.string(), body: v.string() },
  returns: v.union(
    v.id("posts"),
    v.object({
      error: v.string(),
      retryAfter: v.number(),
    })
  ),
  handler: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("Unauthorized");

    // Check rate limit
    const { ok, retryAfter } = await rateLimiter.limit(ctx, "perUser", {
      key: identity.subject,
    });

    if (!ok) {
      // Add jitter to prevent thundering herd
      const jitter = Math.random() * 1000;
      return {
        error: "Rate limit exceeded",
        retryAfter: retryAfter + jitter,
      };
    }

    const postId = await ctx.db.insert("posts", {
      title: args.title,
      body: args.body,
      authorId: identity.subject,
    });

    return postId;
  },
});
typescript
// convex/api.ts
import { mutation } from "./_generated/server";
import { v } from "convex/values";
import { rateLimiter } from "./rateLimit";

export const createPost = mutation({
  args: { title: v.string(), body: v.string() },
  returns: v.union(
    v.id("posts"),
    v.object({
      error: v.string(),
      retryAfter: v.number(),
    })
  ),
  handler: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("未认证");

    // 检查速率限制
    const { ok, retryAfter } = await rateLimiter.limit(ctx, "perUser", {
      key: identity.subject,
    });

    if (!ok) {
      // 添加随机抖动防止惊群效应
      const jitter = Math.random() * 1000;
      return {
        error: "请求速率超出限制",
        retryAfter: retryAfter + jitter,
      };
    }

    const postId = await ctx.db.insert("posts", {
      title: args.title,
      body: args.body,
      authorId: identity.subject,
    });

    return postId;
  },
});

Combining Patterns

模式组合

Triggers + RLS + Custom Functions

Triggers + RLS + 自定义函数

typescript
// convex/functions.ts
import {
  mutation as rawMutation,
  query as rawQuery,
} from "./_generated/server";
import { Triggers } from "convex-helpers/server/triggers";
import {
  wrapDatabaseReader,
  wrapDatabaseWriter,
} from "convex-helpers/server/rowLevelSecurity";
import {
  customCtx,
  customQuery,
  customMutation,
} from "convex-helpers/server/customFunctions";

// Set up triggers
const triggers = new Triggers<DataModel>();
triggers.register("posts", async (ctx, change) => {
  if (change.operation === "insert") {
    // Update author's post count
    const author = await ctx.db.get(change.newDoc!.authorId);
    if (author) {
      await ctx.db.patch(author._id, {
        postCount: (author.postCount ?? 0) + 1,
      });
    }
  }
});

// Set up RLS rules
async function rlsRules(ctx: QueryCtx) {
  const identity = await ctx.auth.getUserIdentity();
  return {
    posts: {
      read: async () => true,
      modify: async (_, post) => post.authorId === identity?.subject,
    },
  };
}

// Combine everything into authenticated, RLS-protected, trigger-enabled functions
export const authedMutation = customMutation(rawMutation, {
  args: {},
  input: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("Unauthorized");

    const user = await ctx.db
      .query("users")
      .withIndex("by_token", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("User not found");

    // Wrap DB with triggers and RLS
    const wrappedDb = wrapDatabaseWriter(
      ctx,
      triggers.wrapDB(ctx).db,
      await rlsRules(ctx)
    );

    return { ctx: { ...ctx, user, db: wrappedDb }, args };
  },
});
typescript
// convex/functions.ts
import {
  mutation as rawMutation,
  query as rawQuery,
} from "./_generated/server";
import { Triggers } from "convex-helpers/server/triggers";
import {
  wrapDatabaseReader,
  wrapDatabaseWriter,
} from "convex-helpers/server/rowLevelSecurity";
import {
  customCtx,
  customQuery,
  customMutation,
} from "convex-helpers/server/customFunctions";

// 设置 Triggers
const triggers = new Triggers<DataModel>();
triggers.register("posts", async (ctx, change) => {
  if (change.operation === "insert") {
    // 更新作者的文章计数
    const author = await ctx.db.get(change.newDoc!.authorId);
    if (author) {
      await ctx.db.patch(author._id, {
        postCount: (author.postCount ?? 0) + 1,
      });
    }
  }
});

// 设置 RLS 规则
async function rlsRules(ctx: QueryCtx) {
  const identity = await ctx.auth.getUserIdentity();
  return {
    posts: {
      read: async () => true,
      modify: async (_, post) => post.authorId === identity?.subject,
    },
  };
}

// 组合所有功能:带认证、RLS 保护、Triggers 的函数
export const authedMutation = customMutation(rawMutation, {
  args: {},
  input: async (ctx, args) => {
    const identity = await ctx.auth.getUserIdentity();
    if (!identity) throw new Error("未认证");

    const user = await ctx.db
      .query("users")
      .withIndex("by_token", (q) =>
        q.eq("tokenIdentifier", identity.tokenIdentifier)
      )
      .unique();

    if (!user) throw new Error("用户不存在");

    // 为数据库添加 Triggers 和 RLS 包装
    const wrappedDb = wrapDatabaseWriter(
      ctx,
      triggers.wrapDB(ctx).db,
      await rlsRules(ctx)
    );

    return { ctx: { ...ctx, user, db: wrappedDb }, args };
  },
});

Common Pitfalls

常见陷阱

Pitfall 1: Triggers Causing OCC Conflicts

陷阱1:Triggers 引发 OCC 冲突

❌ WRONG:
typescript
// This trigger updates a single global counter - will cause OCC under load
triggers.register("posts", async (ctx, change) => {
  if (change.operation === "insert") {
    const stats = await ctx.db.query("globalStats").unique();
    await ctx.db.patch(stats!._id, { postCount: stats!.postCount + 1 });
  }
});
✅ CORRECT:
typescript
// Use sharding or Workpool for high-contention updates
triggers.register("posts", async (ctx, change) => {
  if (change.operation === "insert") {
    const shardId = Math.floor(Math.random() * 10);
    await ctx.db.insert("postCountShards", { shardId, delta: 1 });
  }
});
❌ 错误示例:
typescript
// 此 Trigger 更新单个全局计数器 - 高负载下会引发 OCC 冲突
triggers.register("posts", async (ctx, change) => {
  if (change.operation === "insert") {
    const stats = await ctx.db.query("globalStats").unique();
    await ctx.db.patch(stats!._id, { postCount: stats!.postCount + 1 });
  }
});
✅ 正确示例:
typescript
// 对高竞争更新使用分片或 Workpool
triggers.register("posts", async (ctx, change) => {
  if (change.operation === "insert") {
    const shardId = Math.floor(Math.random() * 10);
    await ctx.db.insert("postCountShards", { shardId, delta: 1 });
  }
});

Pitfall 2: RLS Rules Missing Tables

陷阱2:RLS 规则遗漏表

❌ WRONG:
typescript
// Missing rules for some tables - they'll be unprotected!
async function rlsRules(ctx: QueryCtx) {
  return {
    users: { read: async () => true, modify: async () => false },
    // Missing posts, messages, etc.!
  };
}
✅ CORRECT:
typescript
// Define rules for ALL tables
async function rlsRules(ctx: QueryCtx) {
  return {
    users: { read: async () => true, modify: async () => false },
    posts: { read: async () => true, modify: async () => true },
    messages: { read: async () => true, modify: async () => true },
    // ... all other tables
  } satisfies Rules<QueryCtx, DataModel>;
}
❌ 错误示例:
typescript
// 遗漏了部分表的规则 - 这些表将不受保护!
async function rlsRules(ctx: QueryCtx) {
  return {
    users: { read: async () => true, modify: async () => false },
   </think_never_used_51bce0c785ca2f68081bfa7d91973934>
    },
  };
}
✅ 正确示例:
typescript
// 为所有表定义规则
async function rlsRules(ctx: QueryCtx) {
  return {
    users: { read: async () => true, modify: async () => false },
    posts: { read: async () => true, modify: async () => true },
    messages: { read: async () => true, modify: async () => true },
    // ... 所有其他表
  } satisfies Rules<QueryCtx, DataModel>;
}

Quick Reference

快速参考

Import Patterns

导入模式

typescript
// Triggers
import { Triggers } from "convex-helpers/server/triggers";

// RLS
import {
  Rules,
  wrapDatabaseReader,
  wrapDatabaseWriter,
} from "convex-helpers/server/rowLevelSecurity";

// Custom Functions
import {
  customCtx,
  customQuery,
  customMutation,
} from "convex-helpers/server/customFunctions";

// Relationships
import {
  getAll,
  getOneFrom,
  getManyFrom,
  getManyVia,
} from "convex-helpers/server/relationships";

// Workpool
import { Workpool } from "@convex-dev/workpool";

// Rate Limiter
import { RateLimiter } from "convex-helpers/server/rateLimit";
typescript
// Triggers
import { Triggers } from "convex-helpers/server/triggers";

// RLS
import {
  Rules,
  wrapDatabaseReader,
  wrapDatabaseWriter,
} from "convex-helpers/server/rowLevelSecurity";

// 自定义函数
import {
  customCtx,
  customQuery,
  customMutation,
} from "convex-helpers/server/customFunctions";

// 关系助手
import {
  getAll,
  getOneFrom,
  getManyFrom,
  getManyVia,
} from "convex-helpers/server/relationships";

// Workpool
import { Workpool } from "@convex-dev/workpool";

// 速率限制器
import { RateLimiter } from "convex-helpers/server/rateLimit";
",