workflow-management
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseWorkflow Management Skill
工作流管理Skill
This skill helps you work with QStash-based workflows in .
apps/api/src/lib/workflows/该Skill可帮助你在目录下处理基于QStash的工作流。
apps/api/src/lib/workflows/When to Use This Skill
何时使用该Skill
- Adding new scheduled workflows for data fetching
- Debugging workflow execution errors
- Modifying existing workflow schedules or logic
- Integrating new data sources into the update pipeline
- Adding new social media posting workflows
- 为数据获取添加新的定时工作流
- 调试工作流执行错误
- 修改现有工作流的调度或逻辑
- 将新数据源集成到更新流水线中
- 添加新的社交媒体发布工作流
Workflow Architecture
工作流架构
The project uses QStash workflows with the following structure:
apps/api/src/lib/workflows/
├── cars/ # Car registration data workflows
│ └── update.ts # Scheduled car data updates
├── coe/ # COE bidding data workflows
│ └── update.ts # Scheduled COE data updates
└── social/ # Social media posting workflows
├── discord.ts
├── linkedin.ts
├── telegram.ts
└── twitter.ts本项目使用QStash工作流,结构如下:
apps/api/src/lib/workflows/
├── cars/ # 车辆注册数据工作流
│ └── update.ts # 定时车辆数据更新
├── coe/ # COE竞拍数据工作流
│ └── update.ts # 定时COE数据更新
└── social/ # 社交媒体发布工作流
├── discord.ts
├── linkedin.ts
├── telegram.ts
└── twitter.tsKey Patterns
核心模式
1. Workflow Definition
1. 工作流定义
Workflows are defined using QStash SDK:
typescript
import { serve } from "@upstash/workflow";
export const POST = serve(async (context) => {
// Step 1: Fetch data
await context.run("fetch-data", async () => {
// Fetching logic
});
// Step 2: Process data
const processed = await context.run("process-data", async () => {
// Processing logic
});
// Step 3: Store results
await context.run("store-results", async () => {
// Storage logic
});
});工作流通过QStash SDK进行定义:
typescript
import { serve } from "@upstash/workflow";
export const POST = serve(async (context) => {
// 步骤1:获取数据
await context.run("fetch-data", async () => {
// 获取数据逻辑
});
// 步骤2:处理数据
const processed = await context.run("process-data", async () => {
// 处理数据逻辑
});
// 步骤3:存储结果
await context.run("store-results", async () => {
// 存储逻辑
});
});2. Scheduling Workflows
2. 工作流调度
Workflows are triggered via cron schedules configured in:
- SST infrastructure ()
infra/ - QStash console
- Manual API calls to workflow endpoints
工作流通过以下方式配置的cron调度触发:
- SST基础设施 ()
infra/ - QStash控制台
- 手动调用工作流端点的API
3. Error Handling
3. 错误处理
Always include comprehensive error handling:
typescript
await context.run("step-name", async () => {
try {
// Logic here
} catch (error) {
console.error("Step failed:", error);
// Log to monitoring service
throw error; // Re-throw for workflow retry
}
});始终要包含完善的错误处理:
typescript
await context.run("step-name", async () => {
try {
// 业务逻辑
} catch (error) {
console.error("步骤失败:", error);
// 记录到监控服务
throw error; // 抛出错误以触发工作流重试
}
});Common Tasks
常见任务
Adding a New Workflow
添加新工作流
- Create workflow file in appropriate directory
- Define workflow steps using
context.run() - Add route handler in
apps/api/src/routes/ - Configure scheduling (if needed)
- Add tests for workflow logic
- 在合适的目录下创建工作流文件
- 使用定义工作流步骤
context.run() - 在目录下添加路由处理器
apps/api/src/routes/ - 配置调度(如有需要)
- 为工作流逻辑添加测试
Debugging Workflow Failures
调试工作流失败问题
- Check QStash dashboard for execution logs
- Review CloudWatch logs for Lambda errors
- Verify environment variables are set correctly
- Test workflow locally using development server
- Check database connectivity and Redis availability
- 在QStash控制台查看执行日志
- 查看CloudWatch日志中的Lambda错误
- 验证环境变量是否配置正确
- 使用开发服务器在本地测试工作流
- 检查数据库连接和Redis可用性
Modifying Existing Workflows
修改现有工作流
- Read existing workflow implementation
- Identify which step needs modification
- Update step logic while maintaining error handling
- Test changes locally
- Deploy and monitor execution
- 阅读现有工作流的实现代码
- 确定需要修改的步骤
- 在保留错误处理的前提下更新步骤逻辑
- 在本地测试修改内容
- 部署并监控执行情况
Environment Variables
环境变量
Workflows typically need:
- - PostgreSQL connection
DATABASE_URL - /
UPSTASH_REDIS_REST_URL- RedisUPSTASH_REDIS_REST_TOKEN - - QStash authentication
QSTASH_TOKEN - Service-specific tokens (Discord webhook, Twitter API, etc.)
工作流通常需要以下环境变量:
- - PostgreSQL连接地址
DATABASE_URL - /
UPSTASH_REDIS_REST_URL- Redis相关凭证UPSTASH_REDIS_REST_TOKEN - - QStash认证令牌
QSTASH_TOKEN - 各服务专属令牌(Discord webhook、Twitter API等)
Testing Workflows
工作流测试
Run workflow tests:
bash
pnpm -F @sgcarstrends/api test -- src/lib/workflowsTest individual workflow locally:
bash
undefined运行工作流测试:
bash
pnpm -F @sgcarstrends/api test -- src/lib/workflows在本地测试单个工作流:
bash
undefinedStart dev server
启动开发服务器
pnpm dev
pnpm dev
Trigger workflow via HTTP
通过HTTP触发工作流
curl -X POST http://localhost:3000/api/workflows/cars/update
undefinedcurl -X POST http://localhost:3000/api/workflows/cars/update
undefinedReferences
参考资料
- QStash Workflows: Check Context7 for Upstash QStash documentation
- Related files:
- - Workflow route handlers
apps/api/src/routes/workflows.ts - - QStash configuration
apps/api/src/config/qstash.ts - - API service documentation
apps/api/CLAUDE.md
- QStash工作流:查看Context7获取Upstash QStash的文档
- 相关文件:
- - 工作流路由处理器
apps/api/src/routes/workflows.ts - - QStash配置文件
apps/api/src/config/qstash.ts - - API服务文档
apps/api/CLAUDE.md
Best Practices
最佳实践
- Idempotency: Ensure workflows can safely retry without duplicating data
- Step Granularity: Break workflows into small, focused steps
- Logging: Add comprehensive logging for debugging
- Timeouts: Configure appropriate timeouts for long-running operations
- Testing: Write unit tests for workflow logic
- Monitoring: Track workflow execution metrics
- 幂等性:确保工作流可以安全重试,不会重复数据
- 步骤粒度:将工作流拆分为小而专注的步骤
- 日志记录:添加完善的日志以便调试
- 超时设置:为长时间运行的操作配置合适的超时时间
- 测试:为工作流逻辑编写单元测试
- 监控:跟踪工作流执行指标