aws-serverless-eda
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAWS Serverless & Event-Driven Architecture
AWS 无服务器与事件驱动架构
This skill provides comprehensive guidance for building serverless applications and event-driven architectures on AWS based on Well-Architected Framework principles.
本技能基于Well-Architected Framework原则,为在AWS上构建无服务器应用和事件驱动架构提供全面指导。
AWS Documentation Requirement
AWS 文档要求
CRITICAL: This skill requires AWS MCP tools for accurate, up-to-date AWS information.
重要提示:本技能需要AWS MCP工具来获取准确、最新的AWS相关信息。
Before Answering AWS Questions
回答AWS问题前的准备
-
Always verify using AWS MCP tools (if available):
- or
mcp__aws-mcp__aws___search_documentation- Search AWS docsmcp__*awsdocs*__aws___search_documentation - or
mcp__aws-mcp__aws___read_documentation- Read specific pagesmcp__*awsdocs*__aws___read_documentation - - Check service availability
mcp__aws-mcp__aws___get_regional_availability
-
If AWS MCP tools are unavailable:
- Guide user to configure AWS MCP using the skill (auto-loaded as dependency)
aws-mcp-setup - Help determine which option fits their environment:
- Has uvx + AWS credentials → Full AWS MCP Server
- No Python/credentials → AWS Documentation MCP (no auth)
- If cannot determine → Ask user which option to use
- Guide user to configure AWS MCP using the
-
始终通过AWS MCP工具验证(若可用):
- 或
mcp__aws-mcp__aws___search_documentation- 搜索AWS文档mcp__*awsdocs*__aws___search_documentation - 或
mcp__aws-mcp__aws___read_documentation- 阅读特定页面mcp__*awsdocs*__aws___read_documentation - - 检查服务区域可用性
mcp__aws-mcp__aws___get_regional_availability
-
若AWS MCP工具不可用:
- 引导用户使用技能配置AWS MCP(作为依赖项自动加载)
aws-mcp-setup - 帮助用户确定适合其环境的选项:
- 已安装uvx且拥有AWS凭证 → 完整AWS MCP Server
- 无Python/凭证 → AWS Documentation MCP(无需认证)
- 若无法确定 → 询问用户选择哪种选项
- 引导用户使用
Serverless MCP Servers
无服务器MCP服务器
This skill can leverage serverless-specific MCP servers for enhanced development workflows:
本技能可利用特定于无服务器的MCP服务器来增强开发工作流:
AWS Serverless MCP Server
AWS Serverless MCP Server
Purpose: Complete serverless application lifecycle with SAM CLI
- Initialize new serverless applications
- Deploy serverless applications
- Test Lambda functions locally
- Generate SAM templates
- Manage serverless application lifecycle
用途:通过SAM CLI完成无服务器应用生命周期管理
- 初始化新的无服务器应用
- 部署无服务器应用
- 本地测试Lambda函数
- 生成SAM模板
- 管理无服务器应用生命周期
AWS Lambda Tool MCP Server
AWS Lambda Tool MCP Server
Purpose: Execute Lambda functions as tools
- Invoke Lambda functions directly
- Test Lambda integrations
- Execute workflows requiring private resource access
- Run Lambda-based automation
用途:将Lambda函数作为工具执行
- 直接调用Lambda函数
- 测试Lambda集成
- 执行需要访问私有资源的工作流
- 运行基于Lambda的自动化任务
AWS Step Functions MCP Server
AWS Step Functions MCP Server
Purpose: Execute complex workflows and orchestration
- Create and manage state machines
- Execute workflow orchestrations
- Handle distributed transactions
- Implement saga patterns
- Coordinate microservices
用途:执行复杂工作流与编排
- 创建和管理状态机
- 执行工作流编排
- 处理分布式事务
- 实现saga模式
- 协调微服务
Amazon SNS/SQS MCP Server
Amazon SNS/SQS MCP Server
Purpose: Event-driven messaging and queue management
- Publish messages to SNS topics
- Send/receive messages from SQS queues
- Manage event-driven communication
- Implement pub/sub patterns
- Handle asynchronous processing
用途:事件驱动消息传递与队列管理
- 向SNS主题发布消息
- 从SQS队列发送/接收消息
- 管理事件驱动通信
- 实现发布/订阅模式
- 处理异步处理任务
When to Use This Skill
何时使用本技能
Use this skill when:
- Building serverless applications with Lambda
- Designing event-driven architectures
- Implementing microservices patterns
- Creating asynchronous processing workflows
- Orchestrating multi-service transactions
- Building real-time data processing pipelines
- Implementing saga patterns for distributed transactions
- Designing for scale and resilience
在以下场景使用本技能:
- 使用Lambda构建无服务器应用
- 设计事件驱动架构
- 实现微服务模式
- 创建异步处理工作流
- 编排多服务事务
- 构建实时数据处理管道
- 为分布式事务实现saga模式
- 为可扩展性和弹性进行设计
AWS Well-Architected Serverless Design Principles
AWS Well-Architected无服务器设计原则
1. Speedy, Simple, Singular
1. 快速、简洁、单一职责
Functions should be concise and single-purpose
typescript
// ✅ GOOD - Single purpose, focused function
export const processOrder = async (event: OrderEvent) => {
// Only handles order processing
const order = await validateOrder(event);
await saveOrder(order);
await publishOrderCreatedEvent(order);
return { statusCode: 200, body: JSON.stringify({ orderId: order.id }) };
};
// ❌ BAD - Function does too much
export const handleEverything = async (event: any) => {
// Handles orders, inventory, payments, shipping...
// Too many responsibilities
};Keep functions environmentally efficient and cost-aware:
- Minimize cold start times
- Optimize memory allocation
- Use provisioned concurrency only when needed
- Leverage connection reuse
函数应简洁且单一职责
typescript
// ✅ 良好示例 - 单一职责、聚焦的函数
export const processOrder = async (event: OrderEvent) => {
// 仅处理订单相关逻辑
const order = await validateOrder(event);
await saveOrder(order);
await publishOrderCreatedEvent(order);
return { statusCode: 200, body: JSON.stringify({ orderId: order.id }) };
};
// ❌ 不良示例 - 函数职责过多
export const handleEverything = async (event: any) => {
// 处理订单、库存、支付、物流等...
// 职责过于繁杂
};保持函数环境高效且成本可控:
- 最小化冷启动时间
- 优化内存分配
- 仅在需要时使用预置并发
- 利用连接复用
2. Think Concurrent Requests, Not Total Requests
2. 考虑并发请求,而非总请求量
Design for concurrency, not volume
Lambda scales horizontally - design considerations should focus on:
- Concurrent execution limits
- Downstream service throttling
- Shared resource contention
- Connection pool sizing
typescript
// Consider concurrent Lambda executions accessing DynamoDB
const table = new dynamodb.Table(this, 'Table', {
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, // Auto-scales with load
});
// Or with provisioned capacity + auto-scaling
const table = new dynamodb.Table(this, 'Table', {
billingMode: dynamodb.BillingMode.PROVISIONED,
readCapacity: 5,
writeCapacity: 5,
});
// Enable auto-scaling for concurrent load
table.autoScaleReadCapacity({ minCapacity: 5, maxCapacity: 100 });
table.autoScaleWriteCapacity({ minCapacity: 5, maxCapacity: 100 });为并发而非总量设计
Lambda水平扩展 - 设计时应关注:
- 并发执行限制
- 下游服务限流
- 共享资源竞争
- 连接池大小
typescript
// 考虑访问DynamoDB的Lambda并发执行情况
const table = new dynamodb.Table(this, 'Table', {
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, // 随负载自动扩展
});
// 或使用预置容量+自动扩展
const table = new dynamodb.Table(this, 'Table', {
billingMode: dynamodb.BillingMode.PROVISIONED,
readCapacity: 5,
writeCapacity: 5,
});
// 为并发负载启用自动扩展
table.autoScaleReadCapacity({ minCapacity: 5, maxCapacity: 100 });
table.autoScaleWriteCapacity({ minCapacity: 5, maxCapacity: 100 });3. Share Nothing
3. 无共享设计
Function runtime environments are short-lived
typescript
// ❌ BAD - Relying on local file system
export const handler = async (event: any) => {
fs.writeFileSync('/tmp/data.json', JSON.stringify(data)); // Lost after execution
};
// ✅ GOOD - Use persistent storage
export const handler = async (event: any) => {
await s3.putObject({
Bucket: process.env.BUCKET_NAME,
Key: 'data.json',
Body: JSON.stringify(data),
});
};State management:
- Use DynamoDB for persistent state
- Use Step Functions for workflow state
- Use ElastiCache for session state
- Use S3 for file storage
函数运行时环境是短暂的
typescript
// ❌ 不良示例 - 依赖本地文件系统
export const handler = async (event: any) => {
fs.writeFileSync('/tmp/data.json', JSON.stringify(data)); // 执行后数据丢失
};
// ✅ 良好示例 - 使用持久化存储
export const handler = async (event: any) => {
await s3.putObject({
Bucket: process.env.BUCKET_NAME,
Key: 'data.json',
Body: JSON.stringify(data),
});
};状态管理:
- 使用DynamoDB存储持久化状态
- 使用Step Functions管理工作流状态
- 使用ElastiCache存储会话状态
- 使用S3存储文件
4. Assume No Hardware Affinity
4. 不依赖硬件特性
Applications must be hardware-agnostic
Infrastructure can change without notice:
- Lambda functions can run on different hardware
- Container instances can be replaced
- No assumption about underlying infrastructure
Design for portability:
- Use environment variables for configuration
- Avoid hardware-specific optimizations
- Test across different environments
应用必须与硬件无关
基础设施可能会无预警地变更:
- Lambda函数可能在不同硬件上运行
- 容器实例可能被替换
- 不要假设底层基础设施的特性
为可移植性设计:
- 使用环境变量进行配置
- 避免硬件特定优化
- 在不同环境中进行测试
5. Orchestrate with State Machines, Not Function Chaining
5. 使用状态机编排,而非函数链式调用
Use Step Functions for orchestration
typescript
// ❌ BAD - Lambda function chaining
export const handler1 = async (event: any) => {
const result = await processStep1(event);
await lambda.invoke({
FunctionName: 'handler2',
Payload: JSON.stringify(result),
});
};
// ✅ GOOD - Step Functions orchestration
const stateMachine = new stepfunctions.StateMachine(this, 'OrderWorkflow', {
definition: stepfunctions.Chain
.start(validateOrder)
.next(processPayment)
.next(shipOrder)
.next(sendConfirmation),
});Benefits of Step Functions:
- Visual workflow representation
- Built-in error handling and retries
- Execution history and debugging
- Parallel and sequential execution
- Service integrations without code
使用Step Functions进行编排
typescript
// ❌ 不良示例 - Lambda函数链式调用
export const handler1 = async (event: any) => {
const result = await processStep1(event);
await lambda.invoke({
FunctionName: 'handler2',
Payload: JSON.stringify(result),
});
};
// ✅ 良好示例 - Step Functions编排
const stateMachine = new stepfunctions.StateMachine(this, 'OrderWorkflow', {
definition: stepfunctions.Chain
.start(validateOrder)
.next(processPayment)
.next(shipOrder)
.next(sendConfirmation),
});Step Functions的优势:
- 可视化工作流展示
- 内置错误处理与重试机制
- 执行历史与调试功能
- 并行与顺序执行
- 无需代码即可集成服务
6. Use Events to Trigger Transactions
6. 使用事件触发事务
Event-driven over synchronous request/response
typescript
// Pattern: Event-driven processing
const bucket = new s3.Bucket(this, 'DataBucket');
bucket.addEventNotification(
s3.EventType.OBJECT_CREATED,
new s3n.LambdaDestination(processFunction),
{ prefix: 'uploads/' }
);
// Pattern: EventBridge integration
const rule = new events.Rule(this, 'OrderRule', {
eventPattern: {
source: ['orders'],
detailType: ['OrderPlaced'],
},
});
rule.addTarget(new targets.LambdaFunction(processOrderFunction));Benefits:
- Loose coupling between services
- Asynchronous processing
- Better fault tolerance
- Independent scaling
事件驱动优于同步请求/响应
typescript
// 模式:事件驱动处理
const bucket = new s3.Bucket(this, 'DataBucket');
bucket.addEventNotification(
s3.EventType.OBJECT_CREATED,
new s3n.LambdaDestination(processFunction),
{ prefix: 'uploads/' }
);
// 模式:EventBridge集成
const rule = new events.Rule(this, 'OrderRule', {
eventPattern: {
source: ['orders'],
detailType: ['OrderPlaced'],
},
});
rule.addTarget(new targets.LambdaFunction(processOrderFunction));优势:
- 服务间松耦合
- 异步处理
- 更好的容错性
- 独立扩展
7. Design for Failures and Duplicates
7. 为故障与重复情况设计
Operations must be idempotent
typescript
// ✅ GOOD - Idempotent operation
export const handler = async (event: SQSEvent) => {
for (const record of event.Records) {
const orderId = JSON.parse(record.body).orderId;
// Check if already processed (idempotency)
const existing = await dynamodb.getItem({
TableName: process.env.TABLE_NAME,
Key: { orderId },
});
if (existing.Item) {
console.log('Order already processed:', orderId);
continue; // Skip duplicate
}
// Process order
await processOrder(orderId);
// Mark as processed
await dynamodb.putItem({
TableName: process.env.TABLE_NAME,
Item: { orderId, processedAt: Date.now() },
});
}
};Implement retry logic with exponential backoff:
typescript
async function withRetry<T>(fn: () => Promise<T>, maxRetries = 3): Promise<T> {
for (let i = 0; i < maxRetries; i++) {
try {
return await fn();
} catch (error) {
if (i === maxRetries - 1) throw error;
await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 1000));
}
}
throw new Error('Max retries exceeded');
}操作必须具备幂等性
typescript
// ✅ 良好示例 - 幂等操作
export const handler = async (event: SQSEvent) => {
for (const record of event.Records) {
const orderId = JSON.parse(record.body).orderId;
// 检查是否已处理(幂等性)
const existing = await dynamodb.getItem({
TableName: process.env.TABLE_NAME,
Key: { orderId },
});
if (existing.Item) {
console.log('订单已处理:', orderId);
continue; // 跳过重复项
}
// 处理订单
await processOrder(orderId);
// 标记为已处理
await dynamodb.putItem({
TableName: process.env.TABLE_NAME,
Item: { orderId, processedAt: Date.now() },
});
}
};实现带指数退避的重试逻辑:
typescript
async function withRetry<T>(fn: () => Promise<T>, maxRetries = 3): Promise<T> {
for (let i = 0; i < maxRetries; i++) {
try {
return await fn();
} catch (error) {
if (i === maxRetries - 1) throw error;
await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 1000));
}
}
throw new Error('超出最大重试次数');
}Event-Driven Architecture Patterns
事件驱动架构模式
Pattern 1: Event Router (EventBridge)
模式1:事件路由器(EventBridge)
Use EventBridge for event routing and filtering:
typescript
// Create custom event bus
const eventBus = new events.EventBus(this, 'AppEventBus', {
eventBusName: 'application-events',
});
// Define event schema
const schema = new events.Schema(this, 'OrderSchema', {
schemaName: 'OrderPlaced',
definition: events.SchemaDefinition.fromInline({
openapi: '3.0.0',
info: { version: '1.0.0', title: 'Order Events' },
paths: {},
components: {
schemas: {
OrderPlaced: {
type: 'object',
properties: {
orderId: { type: 'string' },
customerId: { type: 'string' },
amount: { type: 'number' },
},
},
},
},
}),
});
// Create rules for different consumers
new events.Rule(this, 'ProcessOrderRule', {
eventBus,
eventPattern: {
source: ['orders'],
detailType: ['OrderPlaced'],
},
targets: [new targets.LambdaFunction(processOrderFunction)],
});
new events.Rule(this, 'NotifyCustomerRule', {
eventBus,
eventPattern: {
source: ['orders'],
detailType: ['OrderPlaced'],
},
targets: [new targets.LambdaFunction(notifyCustomerFunction)],
});使用EventBridge进行事件路由与过滤:
typescript
// 创建自定义事件总线
const eventBus = new events.EventBus(this, 'AppEventBus', {
eventBusName: 'application-events',
});
// 定义事件 schema
const schema = new events.Schema(this, 'OrderSchema', {
schemaName: 'OrderPlaced',
definition: events.SchemaDefinition.fromInline({
openapi: '3.0.0',
info: { version: '1.0.0', title: 'Order Events' },
paths: {},
components: {
schemas: {
OrderPlaced: {
type: 'object',
properties: {
orderId: { type: 'string' },
customerId: { type: 'string' },
amount: { type: 'number' },
},
},
},
},
}),
});
// 为不同消费者创建规则
new events.Rule(this, 'ProcessOrderRule', {
eventBus,
eventPattern: {
source: ['orders'],
detailType: ['OrderPlaced'],
},
targets: [new targets.LambdaFunction(processOrderFunction)],
});
new events.Rule(this, 'NotifyCustomerRule', {
eventBus,
eventPattern: {
source: ['orders'],
detailType: ['OrderPlaced'],
},
targets: [new targets.LambdaFunction(notifyCustomerFunction)],
});Pattern 2: Queue-Based Processing (SQS)
模式2:基于队列的处理(SQS)
Use SQS for reliable asynchronous processing:
typescript
// Standard queue for at-least-once delivery
const queue = new sqs.Queue(this, 'ProcessingQueue', {
visibilityTimeout: Duration.seconds(300),
retentionPeriod: Duration.days(14),
deadLetterQueue: {
queue: dlq,
maxReceiveCount: 3,
},
});
// FIFO queue for ordered processing
const fifoQueue = new sqs.Queue(this, 'OrderedQueue', {
fifo: true,
contentBasedDeduplication: true,
deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
});
// Lambda consumer
new lambda.EventSourceMapping(this, 'QueueConsumer', {
target: processingFunction,
eventSourceArn: queue.queueArn,
batchSize: 10,
maxBatchingWindow: Duration.seconds(5),
});使用SQS进行可靠的异步处理:
typescript
// 标准队列(至少一次交付)
const queue = new sqs.Queue(this, 'ProcessingQueue', {
visibilityTimeout: Duration.seconds(300),
retentionPeriod: Duration.days(14),
deadLetterQueue: {
queue: dlq,
maxReceiveCount: 3,
},
});
// FIFO队列(有序处理)
const fifoQueue = new sqs.Queue(this, 'OrderedQueue', {
fifo: true,
contentBasedDeduplication: true,
deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
});
// Lambda消费者
new lambda.EventSourceMapping(this, 'QueueConsumer', {
target: processingFunction,
eventSourceArn: queue.queueArn,
batchSize: 10,
maxBatchingWindow: Duration.seconds(5),
});Pattern 3: Pub/Sub (SNS + SQS Fan-Out)
模式3:发布/订阅(SNS + SQS扇出)
Implement fan-out pattern for multiple consumers:
typescript
// Create SNS topic
const topic = new sns.Topic(this, 'OrderTopic', {
displayName: 'Order Events',
});
// Multiple SQS queues subscribe to topic
const inventoryQueue = new sqs.Queue(this, 'InventoryQueue');
const shippingQueue = new sqs.Queue(this, 'ShippingQueue');
const analyticsQueue = new sqs.Queue(this, 'AnalyticsQueue');
topic.addSubscription(new subscriptions.SqsSubscription(inventoryQueue));
topic.addSubscription(new subscriptions.SqsSubscription(shippingQueue));
topic.addSubscription(new subscriptions.SqsSubscription(analyticsQueue));
// Each queue has its own Lambda consumer
new lambda.EventSourceMapping(this, 'InventoryConsumer', {
target: inventoryFunction,
eventSourceArn: inventoryQueue.queueArn,
});为多个消费者实现扇出模式:
typescript
// 创建SNS主题
const topic = new sns.Topic(this, 'OrderTopic', {
displayName: 'Order Events',
});
// 多个SQS队列订阅该主题
const inventoryQueue = new sqs.Queue(this, 'InventoryQueue');
const shippingQueue = new sqs.Queue(this, 'ShippingQueue');
const analyticsQueue = new sqs.Queue(this, 'AnalyticsQueue');
topic.addSubscription(new subscriptions.SqsSubscription(inventoryQueue));
topic.addSubscription(new subscriptions.SqsSubscription(shippingQueue));
topic.addSubscription(new subscriptions.SqsSubscription(analyticsQueue));
// 每个队列有各自的Lambda消费者
new lambda.EventSourceMapping(this, 'InventoryConsumer', {
target: inventoryFunction,
eventSourceArn: inventoryQueue.queueArn,
});Pattern 4: Saga Pattern with Step Functions
模式4:使用Step Functions的Saga模式
Implement distributed transactions:
typescript
const reserveFlight = new tasks.LambdaInvoke(this, 'ReserveFlight', {
lambdaFunction: reserveFlightFunction,
outputPath: '$.Payload',
});
const reserveHotel = new tasks.LambdaInvoke(this, 'ReserveHotel', {
lambdaFunction: reserveHotelFunction,
outputPath: '$.Payload',
});
const processPayment = new tasks.LambdaInvoke(this, 'ProcessPayment', {
lambdaFunction: processPaymentFunction,
outputPath: '$.Payload',
});
// Compensating transactions
const cancelFlight = new tasks.LambdaInvoke(this, 'CancelFlight', {
lambdaFunction: cancelFlightFunction,
});
const cancelHotel = new tasks.LambdaInvoke(this, 'CancelHotel', {
lambdaFunction: cancelHotelFunction,
});
// Define saga with compensation
const definition = reserveFlight
.next(reserveHotel)
.next(processPayment)
.addCatch(cancelHotel.next(cancelFlight), {
resultPath: '$.error',
});
new stepfunctions.StateMachine(this, 'BookingStateMachine', {
definition,
timeout: Duration.minutes(5),
});实现分布式事务:
typescript
const reserveFlight = new tasks.LambdaInvoke(this, 'ReserveFlight', {
lambdaFunction: reserveFlightFunction,
outputPath: '$.Payload',
});
const reserveHotel = new tasks.LambdaInvoke(this, 'ReserveHotel', {
lambdaFunction: reserveHotelFunction,
outputPath: '$.Payload',
});
const processPayment = new tasks.LambdaInvoke(this, 'ProcessPayment', {
lambdaFunction: processPaymentFunction,
outputPath: '$.Payload',
});
// 补偿事务
const cancelFlight = new tasks.LambdaInvoke(this, 'CancelFlight', {
lambdaFunction: cancelFlightFunction,
});
const cancelHotel = new tasks.LambdaInvoke(this, 'CancelHotel', {
lambdaFunction: cancelHotelFunction,
});
// 定义带补偿的saga
const definition = reserveFlight
.next(reserveHotel)
.next(processPayment)
.addCatch(cancelHotel.next(cancelFlight), {
resultPath: '$.error',
});
new stepfunctions.StateMachine(this, 'BookingStateMachine', {
definition,
timeout: Duration.minutes(5),
});Pattern 5: Event Sourcing
模式5:事件溯源
Store events as source of truth:
typescript
// Event store with DynamoDB
const eventStore = new dynamodb.Table(this, 'EventStore', {
partitionKey: { name: 'aggregateId', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'version', type: dynamodb.AttributeType.NUMBER },
stream: dynamodb.StreamViewType.NEW_IMAGE,
});
// Lambda function stores events
export const handleCommand = async (event: any) => {
const { aggregateId, eventType, eventData } = event;
// Get current version
const items = await dynamodb.query({
TableName: process.env.EVENT_STORE,
KeyConditionExpression: 'aggregateId = :id',
ExpressionAttributeValues: { ':id': aggregateId },
ScanIndexForward: false,
Limit: 1,
});
const nextVersion = items.Items?.[0]?.version + 1 || 1;
// Append new event
await dynamodb.putItem({
TableName: process.env.EVENT_STORE,
Item: {
aggregateId,
version: nextVersion,
eventType,
eventData,
timestamp: Date.now(),
},
});
};
// Projections read from event stream
eventStore.grantStreamRead(projectionFunction);将事件作为事实来源存储:
typescript
// 使用DynamoDB的事件存储
const eventStore = new dynamodb.Table(this, 'EventStore', {
partitionKey: { name: 'aggregateId', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'version', type: dynamodb.AttributeType.NUMBER },
stream: dynamodb.StreamViewType.NEW_IMAGE,
});
// Lambda函数存储事件
export const handleCommand = async (event: any) => {
const { aggregateId, eventType, eventData } = event;
// 获取当前版本
const items = await dynamodb.query({
TableName: process.env.EVENT_STORE,
KeyConditionExpression: 'aggregateId = :id',
ExpressionAttributeValues: { ':id': aggregateId },
ScanIndexForward: false,
Limit: 1,
});
const nextVersion = items.Items?.[0]?.version + 1 || 1;
// 追加新事件
await dynamodb.putItem({
TableName: process.env.EVENT_STORE,
Item: {
aggregateId,
version: nextVersion,
eventType,
eventData,
timestamp: Date.now(),
},
});
};
// 投影从事件流读取数据
eventStore.grantStreamRead(projectionFunction);Serverless Architecture Patterns
无服务器架构模式
Pattern 1: API-Driven Microservices
模式1:API驱动的微服务
REST APIs with Lambda backend:
typescript
const api = new apigateway.RestApi(this, 'Api', {
restApiName: 'microservices-api',
deployOptions: {
throttlingRateLimit: 1000,
throttlingBurstLimit: 2000,
tracingEnabled: true,
},
});
// User service
const users = api.root.addResource('users');
users.addMethod('GET', new apigateway.LambdaIntegration(getUsersFunction));
users.addMethod('POST', new apigateway.LambdaIntegration(createUserFunction));
// Order service
const orders = api.root.addResource('orders');
orders.addMethod('GET', new apigateway.LambdaIntegration(getOrdersFunction));
orders.addMethod('POST', new apigateway.LambdaIntegration(createOrderFunction));使用Lambda后端的REST API:
typescript
const api = new apigateway.RestApi(this, 'Api', {
restApiName: 'microservices-api',
deployOptions: {
throttlingRateLimit: 1000,
throttlingBurstLimit: 2000,
tracingEnabled: true,
},
});
// 用户服务
const users = api.root.addResource('users');
users.addMethod('GET', new apigateway.LambdaIntegration(getUsersFunction));
users.addMethod('POST', new apigateway.LambdaIntegration(createUserFunction));
// 订单服务
const orders = api.root.addResource('orders');
orders.addMethod('GET', new apigateway.LambdaIntegration(getOrdersFunction));
orders.addMethod('POST', new apigateway.LambdaIntegration(createOrderFunction));Pattern 2: Stream Processing
模式2:流处理
Real-time data processing with Kinesis:
typescript
const stream = new kinesis.Stream(this, 'DataStream', {
shardCount: 2,
retentionPeriod: Duration.days(7),
});
// Lambda processes stream records
new lambda.EventSourceMapping(this, 'StreamProcessor', {
target: processFunction,
eventSourceArn: stream.streamArn,
batchSize: 100,
maxBatchingWindow: Duration.seconds(5),
parallelizationFactor: 10,
startingPosition: lambda.StartingPosition.LATEST,
retryAttempts: 3,
bisectBatchOnError: true,
onFailure: new lambdaDestinations.SqsDestination(dlq),
});使用Kinesis进行实时数据处理:
typescript
const stream = new kinesis.Stream(this, 'DataStream', {
shardCount: 2,
retentionPeriod: Duration.days(7),
});
// Lambda处理流记录
new lambda.EventSourceMapping(this, 'StreamProcessor', {
target: processFunction,
eventSourceArn: stream.streamArn,
batchSize: 100,
maxBatchingWindow: Duration.seconds(5),
parallelizationFactor: 10,
startingPosition: lambda.StartingPosition.LATEST,
retryAttempts: 3,
bisectBatchOnError: true,
onFailure: new lambdaDestinations.SqsDestination(dlq),
});Pattern 3: Async Task Processing
模式3:异步任务处理
Background job processing:
typescript
// SQS queue for tasks
const taskQueue = new sqs.Queue(this, 'TaskQueue', {
visibilityTimeout: Duration.minutes(5),
receiveMessageWaitTime: Duration.seconds(20), // Long polling
deadLetterQueue: {
queue: dlq,
maxReceiveCount: 3,
},
});
// Lambda worker processes tasks
const worker = new lambda.Function(this, 'TaskWorker', {
// ... configuration
reservedConcurrentExecutions: 10, // Control concurrency
});
new lambda.EventSourceMapping(this, 'TaskConsumer', {
target: worker,
eventSourceArn: taskQueue.queueArn,
batchSize: 10,
reportBatchItemFailures: true, // Partial batch failure handling
});后台作业处理:
typescript
// 用于任务的SQS队列
const taskQueue = new sqs.Queue(this, 'TaskQueue', {
visibilityTimeout: Duration.minutes(5),
receiveMessageWaitTime: Duration.seconds(20), // 长轮询
deadLetterQueue: {
queue: dlq,
maxReceiveCount: 3,
},
});
// Lambda工作进程处理任务
const worker = new lambda.Function(this, 'TaskWorker', {
// ... 配置信息
reservedConcurrentExecutions: 10, // 控制并发数
});
new lambda.EventSourceMapping(this, 'TaskConsumer', {
target: worker,
eventSourceArn: taskQueue.queueArn,
batchSize: 10,
reportBatchItemFailures: true, // 部分批量失败处理
});Pattern 4: Scheduled Jobs
模式4:定时任务
Periodic processing with EventBridge:
typescript
// Daily cleanup job
new events.Rule(this, 'DailyCleanup', {
schedule: events.Schedule.cron({ hour: '2', minute: '0' }),
targets: [new targets.LambdaFunction(cleanupFunction)],
});
// Process every 5 minutes
new events.Rule(this, 'FrequentProcessing', {
schedule: events.Schedule.rate(Duration.minutes(5)),
targets: [new targets.LambdaFunction(processFunction)],
});使用EventBridge进行周期性处理:
typescript
// 每日清理作业
new events.Rule(this, 'DailyCleanup', {
schedule: events.Schedule.cron({ hour: '2', minute: '0' }),
targets: [new targets.LambdaFunction(cleanupFunction)],
});
// 每5分钟处理一次
new events.Rule(this, 'FrequentProcessing', {
schedule: events.Schedule.rate(Duration.minutes(5)),
targets: [new targets.LambdaFunction(processFunction)],
});Pattern 5: Webhook Processing
模式5:Webhook处理
Handle external webhooks:
typescript
// API Gateway endpoint for webhooks
const webhookApi = new apigateway.RestApi(this, 'WebhookApi', {
restApiName: 'webhooks',
});
const webhook = webhookApi.root.addResource('webhook');
webhook.addMethod('POST', new apigateway.LambdaIntegration(webhookFunction, {
proxy: true,
timeout: Duration.seconds(29), // API Gateway max
}));
// Lambda handler validates and queues webhook
export const handler = async (event: APIGatewayProxyEvent) => {
// Validate webhook signature
const isValid = validateSignature(event.headers, event.body);
if (!isValid) {
return { statusCode: 401, body: 'Invalid signature' };
}
// Queue for async processing
await sqs.sendMessage({
QueueUrl: process.env.QUEUE_URL,
MessageBody: event.body,
});
// Return immediately
return { statusCode: 202, body: 'Accepted' };
};处理外部Webhook:
typescript
// 用于Webhook的API Gateway端点
const webhookApi = new apigateway.RestApi(this, 'WebhookApi', {
restApiName: 'webhooks',
});
const webhook = webhookApi.root.addResource('webhook');
webhook.addMethod('POST', new apigateway.LambdaIntegration(webhookFunction, {
proxy: true,
timeout: Duration.seconds(29), // API Gateway最大值
}));
// Lambda处理程序验证并将Webhook加入队列
export const handler = async (event: APIGatewayProxyEvent) => {
// 验证Webhook签名
const isValid = validateSignature(event.headers, event.body);
if (!isValid) {
return { statusCode: 401, body: '无效签名' };
}
// 加入队列进行异步处理
await sqs.sendMessage({
QueueUrl: process.env.QUEUE_URL,
MessageBody: event.body,
});
// 立即返回
return { statusCode: 202, body: '已接受' };
};Best Practices
最佳实践
Error Handling
错误处理
Implement comprehensive error handling:
typescript
export const handler = async (event: SQSEvent) => {
const failures: SQSBatchItemFailure[] = [];
for (const record of event.Records) {
try {
await processRecord(record);
} catch (error) {
console.error('Failed to process record:', record.messageId, error);
failures.push({ itemIdentifier: record.messageId });
}
}
// Return partial batch failures for retry
return { batchItemFailures: failures };
};实现全面的错误处理:
typescript
export const handler = async (event: SQSEvent) => {
const failures: SQSBatchItemFailure[] = [];
for (const record of event.Records) {
try {
await processRecord(record);
} catch (error) {
console.error('处理记录失败:', record.messageId, error);
failures.push({ itemIdentifier: record.messageId });
}
}
// 返回部分批量失败以进行重试
return { batchItemFailures: failures };
};Dead Letter Queues
死信队列
Always configure DLQs for error handling:
typescript
const dlq = new sqs.Queue(this, 'DLQ', {
retentionPeriod: Duration.days(14),
});
const queue = new sqs.Queue(this, 'Queue', {
deadLetterQueue: {
queue: dlq,
maxReceiveCount: 3,
},
});
// Monitor DLQ depth
new cloudwatch.Alarm(this, 'DLQAlarm', {
metric: dlq.metricApproximateNumberOfMessagesVisible(),
threshold: 1,
evaluationPeriods: 1,
alarmDescription: 'Messages in DLQ require attention',
});始终配置DLQ以处理错误:
typescript
const dlq = new sqs.Queue(this, 'DLQ', {
retentionPeriod: Duration.days(14),
});
const queue = new sqs.Queue(this, 'Queue', {
deadLetterQueue: {
queue: dlq,
maxReceiveCount: 3,
},
});
// 监控DLQ深度
new cloudwatch.Alarm(this, 'DLQAlarm', {
metric: dlq.metricApproximateNumberOfMessagesVisible(),
threshold: 1,
evaluationPeriods: 1,
alarmDescription: 'DLQ中的消息需要关注',
});Observability
可观测性
Enable tracing and monitoring:
typescript
new NodejsFunction(this, 'Function', {
entry: 'src/handler.ts',
tracing: lambda.Tracing.ACTIVE, // X-Ray tracing
environment: {
POWERTOOLS_SERVICE_NAME: 'order-service',
POWERTOOLS_METRICS_NAMESPACE: 'MyApp',
LOG_LEVEL: 'INFO',
},
});启用追踪与监控:
typescript
new NodejsFunction(this, 'Function', {
entry: 'src/handler.ts',
tracing: lambda.Tracing.ACTIVE, // X-Ray追踪
environment: {
POWERTOOLS_SERVICE_NAME: 'order-service',
POWERTOOLS_METRICS_NAMESPACE: 'MyApp',
LOG_LEVEL: 'INFO',
},
});Using MCP Servers Effectively
有效使用MCP服务器
AWS Serverless MCP Usage
AWS Serverless MCP使用方法
Lifecycle management:
- Initialize new serverless projects
- Generate SAM templates
- Deploy applications
- Test locally before deployment
生命周期管理:
- 初始化新的无服务器项目
- 生成SAM模板
- 部署应用
- 部署前在本地测试
Lambda Tool MCP Usage
Lambda Tool MCP使用方法
Function execution:
- Test Lambda functions directly
- Execute automation workflows
- Access private resources
- Validate integrations
函数执行:
- 直接测试Lambda函数
- 执行自动化工作流
- 访问私有资源
- 验证集成
Step Functions MCP Usage
Step Functions MCP使用方法
Workflow orchestration:
- Create state machines for complex workflows
- Execute distributed transactions
- Implement saga patterns
- Coordinate microservices
工作流编排:
- 为复杂工作流创建状态机
- 执行分布式事务
- 实现saga模式
- 协调微服务
SNS/SQS MCP Usage
SNS/SQS MCP使用方法
Messaging operations:
- Test pub/sub patterns
- Send test messages to queues
- Validate event routing
- Debug message processing
消息传递操作:
- 测试发布/订阅模式
- 向队列发送测试消息
- 验证事件路由
- 调试消息处理
Additional Resources
额外资源
This skill includes comprehensive reference documentation based on AWS best practices:
-
Serverless Patterns:
references/serverless-patterns.md- Core serverless architectures and API patterns
- Data processing and integration patterns
- Orchestration with Step Functions
- Anti-patterns to avoid
-
Event-Driven Architecture Patterns:
references/eda-patterns.md- Event routing and processing patterns
- Event sourcing and saga patterns
- Idempotency and error handling
- Message ordering and deduplication
-
Security Best Practices:
references/security-best-practices.md- Shared responsibility model
- IAM least privilege patterns
- Data protection and encryption
- Network security with VPC
-
Observability Best Practices:
references/observability-best-practices.md- Three pillars: metrics, logs, traces
- Structured logging with Lambda Powertools
- X-Ray distributed tracing
- CloudWatch alarms and dashboards
-
Performance Optimization:
references/performance-optimization.md- Cold start optimization techniques
- Memory and CPU optimization
- Package size reduction
- Provisioned concurrency patterns
-
Deployment Best Practices:
references/deployment-best-practices.md- CI/CD pipeline design
- Testing strategies (unit, integration, load)
- Deployment strategies (canary, blue/green)
- Rollback and safety mechanisms
External Resources:
- AWS Well-Architected Serverless Lens: https://docs.aws.amazon.com/wellarchitected/latest/serverless-applications-lens/
- ServerlessLand.com: Pre-built serverless patterns
- AWS Serverless Workshops: https://serverlessland.com/learn?type=Workshops
For detailed implementation patterns, anti-patterns, and code examples, refer to the comprehensive references in the skill directory.
本技能包含基于AWS最佳实践的全面参考文档:
-
无服务器模式:
references/serverless-patterns.md- 核心无服务器架构与API模式
- 数据处理与集成模式
- 使用Step Functions进行编排
- 需要避免的反模式
-
事件驱动架构模式:
references/eda-patterns.md- 事件路由与处理模式
- 事件溯源与saga模式
- 幂等性与错误处理
- 消息排序与去重
-
安全最佳实践:
references/security-best-practices.md- 共享责任模型
- IAM最小权限模式
- 数据保护与加密
- 使用VPC的网络安全
-
可观测性最佳实践:
references/observability-best-practices.md- 三大支柱:指标、日志、追踪
- 使用Lambda Powertools进行结构化日志
- X-Ray分布式追踪
- CloudWatch告警与仪表板
-
性能优化:
references/performance-optimization.md- 冷启动优化技巧
- 内存与CPU优化
- 包大小缩减
- 预置并发模式
-
部署最佳实践:
references/deployment-best-practices.md- CI/CD管道设计
- 测试策略(单元测试、集成测试、负载测试)
- 部署策略(金丝雀、蓝绿部署)
- 回滚与安全机制
外部资源:
- AWS Well-Architected无服务器视角:https://docs.aws.amazon.com/wellarchitected/latest/serverless-applications-lens/
- ServerlessLand.com:预构建的无服务器模式
- AWS无服务器工作坊:https://serverlessland.com/learn?type=Workshops
如需详细的实现模式、反模式与代码示例,请参考技能目录中的全面参考文档。