workflow-management

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Workflow Management Skill

工作流管理Skill

This skill helps you work with QStash-based workflows in
apps/api/src/lib/workflows/
.
该Skill可帮助你在
apps/api/src/lib/workflows/
目录下处理基于QStash的工作流。

When to Use This Skill

何时使用该Skill

  • Adding new scheduled workflows for data fetching
  • Debugging workflow execution errors
  • Modifying existing workflow schedules or logic
  • Integrating new data sources into the update pipeline
  • Adding new social media posting workflows
  • 为数据获取添加新的定时工作流
  • 调试工作流执行错误
  • 修改现有工作流的调度或逻辑
  • 将新数据源集成到更新流水线中
  • 添加新的社交媒体发布工作流

Workflow Architecture

工作流架构

The project uses QStash workflows with the following structure:
apps/api/src/lib/workflows/
├── cars/                 # Car registration data workflows
│   └── update.ts        # Scheduled car data updates
├── coe/                 # COE bidding data workflows
│   └── update.ts        # Scheduled COE data updates
└── social/              # Social media posting workflows
    ├── discord.ts
    ├── linkedin.ts
    ├── telegram.ts
    └── twitter.ts
本项目使用QStash工作流,结构如下:
apps/api/src/lib/workflows/
├── cars/                 # 车辆注册数据工作流
│   └── update.ts        # 定时车辆数据更新
├── coe/                 # COE竞拍数据工作流
│   └── update.ts        # 定时COE数据更新
└── social/              # 社交媒体发布工作流
    ├── discord.ts
    ├── linkedin.ts
    ├── telegram.ts
    └── twitter.ts

Key Patterns

核心模式

1. Workflow Definition

1. 工作流定义

Workflows are defined using QStash SDK:
typescript
import { serve } from "@upstash/workflow";

export const POST = serve(async (context) => {
  // Step 1: Fetch data
  await context.run("fetch-data", async () => {
    // Fetching logic
  });

  // Step 2: Process data
  const processed = await context.run("process-data", async () => {
    // Processing logic
  });

  // Step 3: Store results
  await context.run("store-results", async () => {
    // Storage logic
  });
});
工作流通过QStash SDK进行定义:
typescript
import { serve } from "@upstash/workflow";

export const POST = serve(async (context) => {
  // 步骤1:获取数据
  await context.run("fetch-data", async () => {
    // 获取数据逻辑
  });

  // 步骤2:处理数据
  const processed = await context.run("process-data", async () => {
    // 处理数据逻辑
  });

  // 步骤3:存储结果
  await context.run("store-results", async () => {
    // 存储逻辑
  });
});

2. Scheduling Workflows

2. 工作流调度

Workflows are triggered via cron schedules configured in:
  • SST infrastructure (
    infra/
    )
  • QStash console
  • Manual API calls to workflow endpoints
工作流通过以下方式配置的cron调度触发:
  • SST基础设施 (
    infra/
    )
  • QStash控制台
  • 手动调用工作流端点的API

3. Error Handling

3. 错误处理

Always include comprehensive error handling:
typescript
await context.run("step-name", async () => {
  try {
    // Logic here
  } catch (error) {
    console.error("Step failed:", error);
    // Log to monitoring service
    throw error; // Re-throw for workflow retry
  }
});
始终要包含完善的错误处理:
typescript
await context.run("step-name", async () => {
  try {
    // 业务逻辑
  } catch (error) {
    console.error("步骤失败:", error);
    // 记录到监控服务
    throw error; // 抛出错误以触发工作流重试
  }
});

Common Tasks

常见任务

Adding a New Workflow

添加新工作流

  1. Create workflow file in appropriate directory
  2. Define workflow steps using
    context.run()
  3. Add route handler in
    apps/api/src/routes/
  4. Configure scheduling (if needed)
  5. Add tests for workflow logic
  1. 在合适的目录下创建工作流文件
  2. 使用
    context.run()
    定义工作流步骤
  3. apps/api/src/routes/
    目录下添加路由处理器
  4. 配置调度(如有需要)
  5. 为工作流逻辑添加测试

Debugging Workflow Failures

调试工作流失败问题

  1. Check QStash dashboard for execution logs
  2. Review CloudWatch logs for Lambda errors
  3. Verify environment variables are set correctly
  4. Test workflow locally using development server
  5. Check database connectivity and Redis availability
  1. 在QStash控制台查看执行日志
  2. 查看CloudWatch日志中的Lambda错误
  3. 验证环境变量是否配置正确
  4. 使用开发服务器在本地测试工作流
  5. 检查数据库连接和Redis可用性

Modifying Existing Workflows

修改现有工作流

  1. Read existing workflow implementation
  2. Identify which step needs modification
  3. Update step logic while maintaining error handling
  4. Test changes locally
  5. Deploy and monitor execution
  1. 阅读现有工作流的实现代码
  2. 确定需要修改的步骤
  3. 在保留错误处理的前提下更新步骤逻辑
  4. 在本地测试修改内容
  5. 部署并监控执行情况

Environment Variables

环境变量

Workflows typically need:
  • DATABASE_URL
    - PostgreSQL connection
  • UPSTASH_REDIS_REST_URL
    /
    UPSTASH_REDIS_REST_TOKEN
    - Redis
  • QSTASH_TOKEN
    - QStash authentication
  • Service-specific tokens (Discord webhook, Twitter API, etc.)
工作流通常需要以下环境变量:
  • DATABASE_URL
    - PostgreSQL连接地址
  • UPSTASH_REDIS_REST_URL
    /
    UPSTASH_REDIS_REST_TOKEN
    - Redis相关凭证
  • QSTASH_TOKEN
    - QStash认证令牌
  • 各服务专属令牌(Discord webhook、Twitter API等)

Testing Workflows

工作流测试

Run workflow tests:
bash
pnpm -F @sgcarstrends/api test -- src/lib/workflows
Test individual workflow locally:
bash
undefined
运行工作流测试:
bash
pnpm -F @sgcarstrends/api test -- src/lib/workflows
在本地测试单个工作流:
bash
undefined

Start dev server

启动开发服务器

pnpm dev
pnpm dev

Trigger workflow via HTTP

通过HTTP触发工作流

References

参考资料

  • QStash Workflows: Check Context7 for Upstash QStash documentation
  • Related files:
    • apps/api/src/routes/workflows.ts
      - Workflow route handlers
    • apps/api/src/config/qstash.ts
      - QStash configuration
    • apps/api/CLAUDE.md
      - API service documentation
  • QStash工作流:查看Context7获取Upstash QStash的文档
  • 相关文件:
    • apps/api/src/routes/workflows.ts
      - 工作流路由处理器
    • apps/api/src/config/qstash.ts
      - QStash配置文件
    • apps/api/CLAUDE.md
      - API服务文档

Best Practices

最佳实践

  1. Idempotency: Ensure workflows can safely retry without duplicating data
  2. Step Granularity: Break workflows into small, focused steps
  3. Logging: Add comprehensive logging for debugging
  4. Timeouts: Configure appropriate timeouts for long-running operations
  5. Testing: Write unit tests for workflow logic
  6. Monitoring: Track workflow execution metrics
  1. 幂等性:确保工作流可以安全重试,不会重复数据
  2. 步骤粒度:将工作流拆分为小而专注的步骤
  3. 日志记录:添加完善的日志以便调试
  4. 超时设置:为长时间运行的操作配置合适的超时时间
  5. 测试:为工作流逻辑编写单元测试
  6. 监控:跟踪工作流执行指标