trigger-realtime

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Trigger.dev Realtime

Trigger.dev 实时功能

Subscribe to task runs and stream data in real-time from frontend and backend.
从前端和后端实时订阅任务运行并流式传输数据。

When to Use

适用场景

  • Building progress indicators for long-running tasks
  • Creating live dashboards showing task status
  • Streaming AI/LLM responses to the UI
  • React components that trigger and monitor tasks
  • Waiting for user approval in tasks
  • 为长时间运行的任务构建进度指示器
  • 创建显示任务状态的实时仪表盘
  • 向UI流式传输AI/LLM响应
  • 触发并监控任务的React组件
  • 任务中等待用户审批

Authentication

身份验证

Create Public Access Token (Backend)

创建公共访问令牌(后端)

ts
import { auth } from "@trigger.dev/sdk";

// Read-only token for specific runs
const publicToken = await auth.createPublicToken({
  scopes: {
    read: {
      runs: ["run_123"],
      tasks: ["my-task"],
    },
  },
  expirationTime: "1h",
});

// Pass this token to your frontend
ts
import { auth } from "@trigger.dev/sdk";

// 针对特定运行的只读令牌
const publicToken = await auth.createPublicToken({
  scopes: {
    read: {
      runs: ["run_123"],
      tasks: ["my-task"],
    },
  },
  expirationTime: "1h",
});

// 将此令牌传递给前端

Create Trigger Token (for frontend triggering)

创建触发令牌(用于前端触发任务)

ts
const triggerToken = await auth.createTriggerPublicToken("my-task", {
  expirationTime: "30m",
});
ts
const triggerToken = await auth.createTriggerPublicToken("my-task", {
  expirationTime: "30m",
});

Backend Subscriptions

后端订阅

ts
import { runs, tasks } from "@trigger.dev/sdk";

// Trigger and subscribe
const handle = await tasks.trigger("my-task", { data: "value" });

for await (const run of runs.subscribeToRun(handle.id)) {
  console.log(`Status: ${run.status}`);
  console.log(`Progress: ${run.metadata?.progress}`);
  
  if (run.status === "COMPLETED") {
    console.log("Output:", run.output);
    break;
  }
}

// Subscribe to tagged runs
for await (const run of runs.subscribeToRunsWithTag("user-123")) {
  console.log(`Run ${run.id}: ${run.status}`);
}

// Subscribe to batch
for await (const run of runs.subscribeToBatch(batchId)) {
  console.log(`Batch run ${run.id}: ${run.status}`);
}
ts
import { runs, tasks } from "@trigger.dev/sdk";

// 触发并订阅
const handle = await tasks.trigger("my-task", { data: "value" });

for await (const run of runs.subscribeToRun(handle.id)) {
  console.log(`Status: ${run.status}`);
  console.log(`Progress: ${run.metadata?.progress}`);
  
  if (run.status === "COMPLETED") {
    console.log("Output:", run.output);
    break;
  }
}

// 订阅带标签的任务运行
for await (const run of runs.subscribeToRunsWithTag("user-123")) {
  console.log(`Run ${run.id}: ${run.status}`);
}

// 订阅批量任务运行
for await (const run of runs.subscribeToBatch(batchId)) {
  console.log(`Batch run ${run.id}: ${run.status}`);
}

React Hooks

React Hooks

Installation

安装

bash
npm add @trigger.dev/react-hooks
bash
npm add @trigger.dev/react-hooks

Trigger Task from React

从React中触发任务

tsx
"use client";
import { useRealtimeTaskTrigger } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";

function TaskTrigger({ accessToken }: { accessToken: string }) {
  const { submit, run, isLoading } = useRealtimeTaskTrigger<typeof myTask>(
    "my-task",
    { accessToken }
  );

  return (
    <div>
      <button 
        onClick={() => submit({ data: "value" })} 
        disabled={isLoading}
      >
        Start Task
      </button>
      
      {run && (
        <div>
          <p>Status: {run.status}</p>
          <p>Progress: {run.metadata?.progress}%</p>
          {run.output && <p>Result: {JSON.stringify(run.output)}</p>}
        </div>
      )}
    </div>
  );
}
tsx
"use client";
import { useRealtimeTaskTrigger } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";

function TaskTrigger({ accessToken }: { accessToken: string }) {
  const { submit, run, isLoading } = useRealtimeTaskTrigger<typeof myTask>(
    "my-task",
    { accessToken }
  );

  return (
    <div>
      <button 
        onClick={() => submit({ data: "value" })} 
        disabled={isLoading}
      >
        启动任务
      </button>
      
      {run && (
        <div>
          <p>状态: {run.status}</p>
          <p>进度: {run.metadata?.progress}%</p>
          {run.output && <p>结果: {JSON.stringify(run.output)}</p>}
        </div>
      )}
    </div>
  );
}

Subscribe to Existing Run

订阅已存在的任务运行

tsx
"use client";
import { useRealtimeRun } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";

function RunStatus({ runId, accessToken }: { runId: string; accessToken: string }) {
  const { run, error } = useRealtimeRun<typeof myTask>(runId, {
    accessToken,
    onComplete: (run) => {
      console.log("Completed:", run.output);
    },
  });

  if (error) return <div>Error: {error.message}</div>;
  if (!run) return <div>Loading...</div>;

  return (
    <div>
      <p>Status: {run.status}</p>
      <p>Progress: {run.metadata?.progress || 0}%</p>
    </div>
  );
}
tsx
"use client";
import { useRealtimeRun } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";

function RunStatus({ runId, accessToken }: { runId: string; accessToken: string }) {
  const { run, error } = useRealtimeRun<typeof myTask>(runId, {
    accessToken,
    onComplete: (run) => {
      console.log("已完成:", run.output);
    },
  });

  if (error) return <div>错误: {error.message}</div>;
  if (!run) return <div>加载中...</div>;

  return (
    <div>
      <p>状态: {run.status}</p>
      <p>进度: {run.metadata?.progress || 0}%</p>
    </div>
  );
}

Subscribe to Tagged Runs

订阅带标签的任务运行

tsx
"use client";
import { useRealtimeRunsWithTag } from "@trigger.dev/react-hooks";

function UserTasks({ userId, accessToken }: { userId: string; accessToken: string }) {
  const { runs } = useRealtimeRunsWithTag(`user-${userId}`, { accessToken });

  return (
    <ul>
      {runs.map((run) => (
        <li key={run.id}>{run.id}: {run.status}</li>
      ))}
    </ul>
  );
}
tsx
"use client";
import { useRealtimeRunsWithTag } from "@trigger.dev/react-hooks";

function UserTasks({ userId, accessToken }: { userId: string; accessToken: string }) {
  const { runs } = useRealtimeRunsWithTag(`user-${userId}`, { accessToken });

  return (
    <ul>
      {runs.map((run) => (
        <li key={run.id}>{run.id}: {run.status}</li>
      ))}
    </ul>
  );
}

Realtime Streams (AI/LLM)

实时流(AI/LLM)

Define Stream (shared location)

定义流(共享位置)

ts
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

export const aiStream = streams.define<string>({
  id: "ai-output",
});
ts
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

export const aiStream = streams.define<string>({
  id: "ai-output",
});

Pipe Stream in Task

在任务中传输流数据

ts
import { task } from "@trigger.dev/sdk";
import { aiStream } from "./streams";

export const streamingTask = task({
  id: "streaming-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      model: "gpt-4",
      messages: [{ role: "user", content: payload.prompt }],
      stream: true,
    });

    const { waitUntilComplete } = aiStream.pipe(completion);
    await waitUntilComplete();
  },
});
ts
import { task } from "@trigger.dev/sdk";
import { aiStream } from "./streams";

export const streamingTask = task({
  id: "streaming-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      model: "gpt-4",
      messages: [{ role: "user", content: payload.prompt }],
      stream: true,
    });

    const { waitUntilComplete } = aiStream.pipe(completion);
    await waitUntilComplete();
  },
});

Read Stream in React

在React中读取流

tsx
"use client";
import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "../trigger/streams";

function AIResponse({ runId, accessToken }: { runId: string; accessToken: string }) {
  const { parts, error } = useRealtimeStream(aiStream, runId, {
    accessToken,
    throttleInMs: 50,
  });

  if (error) return <div>Error: {error.message}</div>;
  if (!parts) return <div>Waiting for response...</div>;

  return <div>{parts.join("")}</div>;
}
tsx
"use client";
import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "../trigger/streams";

function AIResponse({ runId, accessToken }: { runId: string; accessToken: string }) {
  const { parts, error } = useRealtimeStream(aiStream, runId, {
    accessToken,
    throttleInMs: 50,
  });

  if (error) return <div>错误: {error.message}</div>;
  if (!parts) return <div>等待响应中...</div>;

  return <div>{parts.join("")}</div>;
}

Wait Tokens (Human-in-the-loop)

等待令牌(人机协作)

In Task

在任务中

ts
import { task, wait } from "@trigger.dev/sdk";

export const approvalTask = task({
  id: "approval-task",
  run: async (payload) => {
    // Process initial data
    const processed = await processData(payload);

    // Wait for human approval
    const approval = await wait.forToken<{ approved: boolean }>({
      token: `approval-${payload.id}`,
      timeoutInSeconds: 86400, // 24 hours
    });

    if (approval.approved) {
      return await finalizeData(processed);
    }
    
    throw new Error("Not approved");
  },
});
ts
import { task, wait } from "@trigger.dev/sdk";

export const approvalTask = task({
  id: "approval-task",
  run: async (payload) => {
    // 处理初始数据
    const processed = await processData(payload);

    // 等待人工审批
    const approval = await wait.forToken<{ approved: boolean }>({
      token: `approval-${payload.id}`,
      timeoutInSeconds: 86400, // 24小时
    });

    if (approval.approved) {
      return await finalizeData(processed);
    }
    
    throw new Error("未通过审批");
  },
});

Complete Token from React

从React中完成令牌验证

tsx
"use client";
import { useWaitToken } from "@trigger.dev/react-hooks";

function ApprovalButton({ tokenId, accessToken }: { tokenId: string; accessToken: string }) {
  const { complete } = useWaitToken(tokenId, { accessToken });

  return (
    <div>
      <button onClick={() => complete({ approved: true })}>
        Approve
      </button>
      <button onClick={() => complete({ approved: false })}>
        Reject
      </button>
    </div>
  );
}
tsx
"use client";
import { useWaitToken } from "@trigger.dev/react-hooks";

function ApprovalButton({ tokenId, accessToken }: { tokenId: string; accessToken: string }) {
  const { complete } = useWaitToken(tokenId, { accessToken });

  return (
    <div>
      <button onClick={() => complete({ approved: true })}>
        批准
      </button>
      <button onClick={() => complete({ approved: false })}>
        拒绝
      </button>
    </div>
  );
}

Run Object Properties

运行对象属性

PropertyDescription
id
Unique run identifier
status
QUEUED
,
EXECUTING
,
COMPLETED
,
FAILED
,
CANCELED
payload
Task input (typed)
output
Task result (typed, when completed)
metadata
Real-time updatable data
createdAt
Start timestamp
costInCents
Execution cost
属性描述
id
唯一运行标识符
status
QUEUED
(排队中)、
EXECUTING
(执行中)、
COMPLETED
(已完成)、
FAILED
(失败)、
CANCELED
(已取消)
payload
任务输入(带类型)
output
任务结果(带类型,完成后可用)
metadata
可实时更新的数据
createdAt
启动时间戳
costInCents
执行成本

Best Practices

最佳实践

  1. Scope tokens narrowly — only grant necessary permissions
  2. Set expiration times — don't use long-lived tokens
  3. Use typed hooks — pass task types for proper inference
  4. Handle errors — always check for errors in hooks
  5. Throttle streams — use
    throttleInMs
    to control re-renders
See
references/realtime.md
for complete documentation.
  1. 令牌权限最小化 — 仅授予必要的权限
  2. 设置过期时间 — 不要使用长期有效的令牌
  3. 使用带类型的Hooks — 传递任务类型以获得正确的类型推断
  4. 处理错误 — 始终检查Hooks中的错误
  5. 限流流式传输 — 使用
    throttleInMs
    控制重渲染次数
完整文档请查看
references/realtime.md