aws-serverless-eda

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

AWS Serverless & Event-Driven Architecture

AWS 无服务器与事件驱动架构

This skill provides comprehensive guidance for building serverless applications and event-driven architectures on AWS based on Well-Architected Framework principles.
本技能基于Well-Architected Framework原则,为在AWS上构建无服务器应用和事件驱动架构提供全面指导。

AWS Documentation Requirement

AWS 文档要求

CRITICAL: This skill requires AWS MCP tools for accurate, up-to-date AWS information.
重要提示:本技能需要AWS MCP工具来获取准确、最新的AWS相关信息。

Before Answering AWS Questions

回答AWS问题前的准备

  1. Always verify using AWS MCP tools (if available):
    • mcp__aws-mcp__aws___search_documentation
      or
      mcp__*awsdocs*__aws___search_documentation
      - Search AWS docs
    • mcp__aws-mcp__aws___read_documentation
      or
      mcp__*awsdocs*__aws___read_documentation
      - Read specific pages
    • mcp__aws-mcp__aws___get_regional_availability
      - Check service availability
  2. If AWS MCP tools are unavailable:
    • Guide user to configure AWS MCP using the
      aws-mcp-setup
      skill (auto-loaded as dependency)
    • Help determine which option fits their environment:
      • Has uvx + AWS credentials → Full AWS MCP Server
      • No Python/credentials → AWS Documentation MCP (no auth)
    • If cannot determine → Ask user which option to use
  1. 始终通过AWS MCP工具验证(若可用):
    • mcp__aws-mcp__aws___search_documentation
      mcp__*awsdocs*__aws___search_documentation
      - 搜索AWS文档
    • mcp__aws-mcp__aws___read_documentation
      mcp__*awsdocs*__aws___read_documentation
      - 阅读特定页面
    • mcp__aws-mcp__aws___get_regional_availability
      - 检查服务区域可用性
  2. 若AWS MCP工具不可用
    • 引导用户使用
      aws-mcp-setup
      技能配置AWS MCP(作为依赖项自动加载)
    • 帮助用户确定适合其环境的选项:
      • 已安装uvx且拥有AWS凭证 → 完整AWS MCP Server
      • 无Python/凭证 → AWS Documentation MCP(无需认证)
    • 若无法确定 → 询问用户选择哪种选项

Serverless MCP Servers

无服务器MCP服务器

This skill can leverage serverless-specific MCP servers for enhanced development workflows:
本技能可利用特定于无服务器的MCP服务器来增强开发工作流:

AWS Serverless MCP Server

AWS Serverless MCP Server

Purpose: Complete serverless application lifecycle with SAM CLI
  • Initialize new serverless applications
  • Deploy serverless applications
  • Test Lambda functions locally
  • Generate SAM templates
  • Manage serverless application lifecycle
用途:通过SAM CLI完成无服务器应用生命周期管理
  • 初始化新的无服务器应用
  • 部署无服务器应用
  • 本地测试Lambda函数
  • 生成SAM模板
  • 管理无服务器应用生命周期

AWS Lambda Tool MCP Server

AWS Lambda Tool MCP Server

Purpose: Execute Lambda functions as tools
  • Invoke Lambda functions directly
  • Test Lambda integrations
  • Execute workflows requiring private resource access
  • Run Lambda-based automation
用途:将Lambda函数作为工具执行
  • 直接调用Lambda函数
  • 测试Lambda集成
  • 执行需要访问私有资源的工作流
  • 运行基于Lambda的自动化任务

AWS Step Functions MCP Server

AWS Step Functions MCP Server

Purpose: Execute complex workflows and orchestration
  • Create and manage state machines
  • Execute workflow orchestrations
  • Handle distributed transactions
  • Implement saga patterns
  • Coordinate microservices
用途:执行复杂工作流与编排
  • 创建和管理状态机
  • 执行工作流编排
  • 处理分布式事务
  • 实现saga模式
  • 协调微服务

Amazon SNS/SQS MCP Server

Amazon SNS/SQS MCP Server

Purpose: Event-driven messaging and queue management
  • Publish messages to SNS topics
  • Send/receive messages from SQS queues
  • Manage event-driven communication
  • Implement pub/sub patterns
  • Handle asynchronous processing
用途:事件驱动消息传递与队列管理
  • 向SNS主题发布消息
  • 从SQS队列发送/接收消息
  • 管理事件驱动通信
  • 实现发布/订阅模式
  • 处理异步处理任务

When to Use This Skill

何时使用本技能

Use this skill when:
  • Building serverless applications with Lambda
  • Designing event-driven architectures
  • Implementing microservices patterns
  • Creating asynchronous processing workflows
  • Orchestrating multi-service transactions
  • Building real-time data processing pipelines
  • Implementing saga patterns for distributed transactions
  • Designing for scale and resilience
在以下场景使用本技能:
  • 使用Lambda构建无服务器应用
  • 设计事件驱动架构
  • 实现微服务模式
  • 创建异步处理工作流
  • 编排多服务事务
  • 构建实时数据处理管道
  • 为分布式事务实现saga模式
  • 为可扩展性和弹性进行设计

AWS Well-Architected Serverless Design Principles

AWS Well-Architected无服务器设计原则

1. Speedy, Simple, Singular

1. 快速、简洁、单一职责

Functions should be concise and single-purpose
typescript
// ✅ GOOD - Single purpose, focused function
export const processOrder = async (event: OrderEvent) => {
  // Only handles order processing
  const order = await validateOrder(event);
  await saveOrder(order);
  await publishOrderCreatedEvent(order);
  return { statusCode: 200, body: JSON.stringify({ orderId: order.id }) };
};

// ❌ BAD - Function does too much
export const handleEverything = async (event: any) => {
  // Handles orders, inventory, payments, shipping...
  // Too many responsibilities
};
Keep functions environmentally efficient and cost-aware:
  • Minimize cold start times
  • Optimize memory allocation
  • Use provisioned concurrency only when needed
  • Leverage connection reuse
函数应简洁且单一职责
typescript
// ✅ 良好示例 - 单一职责、聚焦的函数
export const processOrder = async (event: OrderEvent) => {
  // 仅处理订单相关逻辑
  const order = await validateOrder(event);
  await saveOrder(order);
  await publishOrderCreatedEvent(order);
  return { statusCode: 200, body: JSON.stringify({ orderId: order.id }) };
};

// ❌ 不良示例 - 函数职责过多
export const handleEverything = async (event: any) => {
  // 处理订单、库存、支付、物流等...
  // 职责过于繁杂
};
保持函数环境高效且成本可控
  • 最小化冷启动时间
  • 优化内存分配
  • 仅在需要时使用预置并发
  • 利用连接复用

2. Think Concurrent Requests, Not Total Requests

2. 考虑并发请求,而非总请求量

Design for concurrency, not volume
Lambda scales horizontally - design considerations should focus on:
  • Concurrent execution limits
  • Downstream service throttling
  • Shared resource contention
  • Connection pool sizing
typescript
// Consider concurrent Lambda executions accessing DynamoDB
const table = new dynamodb.Table(this, 'Table', {
  billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, // Auto-scales with load
});

// Or with provisioned capacity + auto-scaling
const table = new dynamodb.Table(this, 'Table', {
  billingMode: dynamodb.BillingMode.PROVISIONED,
  readCapacity: 5,
  writeCapacity: 5,
});

// Enable auto-scaling for concurrent load
table.autoScaleReadCapacity({ minCapacity: 5, maxCapacity: 100 });
table.autoScaleWriteCapacity({ minCapacity: 5, maxCapacity: 100 });
为并发而非总量设计
Lambda水平扩展 - 设计时应关注:
  • 并发执行限制
  • 下游服务限流
  • 共享资源竞争
  • 连接池大小
typescript
// 考虑访问DynamoDB的Lambda并发执行情况
const table = new dynamodb.Table(this, 'Table', {
  billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, // 随负载自动扩展
});

// 或使用预置容量+自动扩展
const table = new dynamodb.Table(this, 'Table', {
  billingMode: dynamodb.BillingMode.PROVISIONED,
  readCapacity: 5,
  writeCapacity: 5,
});

// 为并发负载启用自动扩展
table.autoScaleReadCapacity({ minCapacity: 5, maxCapacity: 100 });
table.autoScaleWriteCapacity({ minCapacity: 5, maxCapacity: 100 });

3. Share Nothing

3. 无共享设计

Function runtime environments are short-lived
typescript
// ❌ BAD - Relying on local file system
export const handler = async (event: any) => {
  fs.writeFileSync('/tmp/data.json', JSON.stringify(data)); // Lost after execution
};

// ✅ GOOD - Use persistent storage
export const handler = async (event: any) => {
  await s3.putObject({
    Bucket: process.env.BUCKET_NAME,
    Key: 'data.json',
    Body: JSON.stringify(data),
  });
};
State management:
  • Use DynamoDB for persistent state
  • Use Step Functions for workflow state
  • Use ElastiCache for session state
  • Use S3 for file storage
函数运行时环境是短暂的
typescript
// ❌ 不良示例 - 依赖本地文件系统
export const handler = async (event: any) => {
  fs.writeFileSync('/tmp/data.json', JSON.stringify(data)); // 执行后数据丢失
};

// ✅ 良好示例 - 使用持久化存储
export const handler = async (event: any) => {
  await s3.putObject({
    Bucket: process.env.BUCKET_NAME,
    Key: 'data.json',
    Body: JSON.stringify(data),
  });
};
状态管理
  • 使用DynamoDB存储持久化状态
  • 使用Step Functions管理工作流状态
  • 使用ElastiCache存储会话状态
  • 使用S3存储文件

4. Assume No Hardware Affinity

4. 不依赖硬件特性

Applications must be hardware-agnostic
Infrastructure can change without notice:
  • Lambda functions can run on different hardware
  • Container instances can be replaced
  • No assumption about underlying infrastructure
Design for portability:
  • Use environment variables for configuration
  • Avoid hardware-specific optimizations
  • Test across different environments
应用必须与硬件无关
基础设施可能会无预警地变更:
  • Lambda函数可能在不同硬件上运行
  • 容器实例可能被替换
  • 不要假设底层基础设施的特性
为可移植性设计
  • 使用环境变量进行配置
  • 避免硬件特定优化
  • 在不同环境中进行测试

5. Orchestrate with State Machines, Not Function Chaining

5. 使用状态机编排,而非函数链式调用

Use Step Functions for orchestration
typescript
// ❌ BAD - Lambda function chaining
export const handler1 = async (event: any) => {
  const result = await processStep1(event);
  await lambda.invoke({
    FunctionName: 'handler2',
    Payload: JSON.stringify(result),
  });
};

// ✅ GOOD - Step Functions orchestration
const stateMachine = new stepfunctions.StateMachine(this, 'OrderWorkflow', {
  definition: stepfunctions.Chain
    .start(validateOrder)
    .next(processPayment)
    .next(shipOrder)
    .next(sendConfirmation),
});
Benefits of Step Functions:
  • Visual workflow representation
  • Built-in error handling and retries
  • Execution history and debugging
  • Parallel and sequential execution
  • Service integrations without code
使用Step Functions进行编排
typescript
// ❌ 不良示例 - Lambda函数链式调用
export const handler1 = async (event: any) => {
  const result = await processStep1(event);
  await lambda.invoke({
    FunctionName: 'handler2',
    Payload: JSON.stringify(result),
  });
};

// ✅ 良好示例 - Step Functions编排
const stateMachine = new stepfunctions.StateMachine(this, 'OrderWorkflow', {
  definition: stepfunctions.Chain
    .start(validateOrder)
    .next(processPayment)
    .next(shipOrder)
    .next(sendConfirmation),
});
Step Functions的优势
  • 可视化工作流展示
  • 内置错误处理与重试机制
  • 执行历史与调试功能
  • 并行与顺序执行
  • 无需代码即可集成服务

6. Use Events to Trigger Transactions

6. 使用事件触发事务

Event-driven over synchronous request/response
typescript
// Pattern: Event-driven processing
const bucket = new s3.Bucket(this, 'DataBucket');

bucket.addEventNotification(
  s3.EventType.OBJECT_CREATED,
  new s3n.LambdaDestination(processFunction),
  { prefix: 'uploads/' }
);

// Pattern: EventBridge integration
const rule = new events.Rule(this, 'OrderRule', {
  eventPattern: {
    source: ['orders'],
    detailType: ['OrderPlaced'],
  },
});

rule.addTarget(new targets.LambdaFunction(processOrderFunction));
Benefits:
  • Loose coupling between services
  • Asynchronous processing
  • Better fault tolerance
  • Independent scaling
事件驱动优于同步请求/响应
typescript
// 模式:事件驱动处理
const bucket = new s3.Bucket(this, 'DataBucket');

bucket.addEventNotification(
  s3.EventType.OBJECT_CREATED,
  new s3n.LambdaDestination(processFunction),
  { prefix: 'uploads/' }
);

// 模式:EventBridge集成
const rule = new events.Rule(this, 'OrderRule', {
  eventPattern: {
    source: ['orders'],
    detailType: ['OrderPlaced'],
  },
});

rule.addTarget(new targets.LambdaFunction(processOrderFunction));
优势
  • 服务间松耦合
  • 异步处理
  • 更好的容错性
  • 独立扩展

7. Design for Failures and Duplicates

7. 为故障与重复情况设计

Operations must be idempotent
typescript
// ✅ GOOD - Idempotent operation
export const handler = async (event: SQSEvent) => {
  for (const record of event.Records) {
    const orderId = JSON.parse(record.body).orderId;

    // Check if already processed (idempotency)
    const existing = await dynamodb.getItem({
      TableName: process.env.TABLE_NAME,
      Key: { orderId },
    });

    if (existing.Item) {
      console.log('Order already processed:', orderId);
      continue; // Skip duplicate
    }

    // Process order
    await processOrder(orderId);

    // Mark as processed
    await dynamodb.putItem({
      TableName: process.env.TABLE_NAME,
      Item: { orderId, processedAt: Date.now() },
    });
  }
};
Implement retry logic with exponential backoff:
typescript
async function withRetry<T>(fn: () => Promise<T>, maxRetries = 3): Promise<T> {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn();
    } catch (error) {
      if (i === maxRetries - 1) throw error;
      await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 1000));
    }
  }
  throw new Error('Max retries exceeded');
}
操作必须具备幂等性
typescript
// ✅ 良好示例 - 幂等操作
export const handler = async (event: SQSEvent) => {
  for (const record of event.Records) {
    const orderId = JSON.parse(record.body).orderId;

    // 检查是否已处理(幂等性)
    const existing = await dynamodb.getItem({
      TableName: process.env.TABLE_NAME,
      Key: { orderId },
    });

    if (existing.Item) {
      console.log('订单已处理:', orderId);
      continue; // 跳过重复项
    }

    // 处理订单
    await processOrder(orderId);

    // 标记为已处理
    await dynamodb.putItem({
      TableName: process.env.TABLE_NAME,
      Item: { orderId, processedAt: Date.now() },
    });
  }
};
实现带指数退避的重试逻辑
typescript
async function withRetry<T>(fn: () => Promise<T>, maxRetries = 3): Promise<T> {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn();
    } catch (error) {
      if (i === maxRetries - 1) throw error;
      await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 1000));
    }
  }
  throw new Error('超出最大重试次数');
}

Event-Driven Architecture Patterns

事件驱动架构模式

Pattern 1: Event Router (EventBridge)

模式1:事件路由器(EventBridge)

Use EventBridge for event routing and filtering:
typescript
// Create custom event bus
const eventBus = new events.EventBus(this, 'AppEventBus', {
  eventBusName: 'application-events',
});

// Define event schema
const schema = new events.Schema(this, 'OrderSchema', {
  schemaName: 'OrderPlaced',
  definition: events.SchemaDefinition.fromInline({
    openapi: '3.0.0',
    info: { version: '1.0.0', title: 'Order Events' },
    paths: {},
    components: {
      schemas: {
        OrderPlaced: {
          type: 'object',
          properties: {
            orderId: { type: 'string' },
            customerId: { type: 'string' },
            amount: { type: 'number' },
          },
        },
      },
    },
  }),
});

// Create rules for different consumers
new events.Rule(this, 'ProcessOrderRule', {
  eventBus,
  eventPattern: {
    source: ['orders'],
    detailType: ['OrderPlaced'],
  },
  targets: [new targets.LambdaFunction(processOrderFunction)],
});

new events.Rule(this, 'NotifyCustomerRule', {
  eventBus,
  eventPattern: {
    source: ['orders'],
    detailType: ['OrderPlaced'],
  },
  targets: [new targets.LambdaFunction(notifyCustomerFunction)],
});
使用EventBridge进行事件路由与过滤:
typescript
// 创建自定义事件总线
const eventBus = new events.EventBus(this, 'AppEventBus', {
  eventBusName: 'application-events',
});

// 定义事件 schema
const schema = new events.Schema(this, 'OrderSchema', {
  schemaName: 'OrderPlaced',
  definition: events.SchemaDefinition.fromInline({
    openapi: '3.0.0',
    info: { version: '1.0.0', title: 'Order Events' },
    paths: {},
    components: {
      schemas: {
        OrderPlaced: {
          type: 'object',
          properties: {
            orderId: { type: 'string' },
            customerId: { type: 'string' },
            amount: { type: 'number' },
          },
        },
      },
    },
  }),
});

// 为不同消费者创建规则
new events.Rule(this, 'ProcessOrderRule', {
  eventBus,
  eventPattern: {
    source: ['orders'],
    detailType: ['OrderPlaced'],
  },
  targets: [new targets.LambdaFunction(processOrderFunction)],
});

new events.Rule(this, 'NotifyCustomerRule', {
  eventBus,
  eventPattern: {
    source: ['orders'],
    detailType: ['OrderPlaced'],
  },
  targets: [new targets.LambdaFunction(notifyCustomerFunction)],
});

Pattern 2: Queue-Based Processing (SQS)

模式2:基于队列的处理(SQS)

Use SQS for reliable asynchronous processing:
typescript
// Standard queue for at-least-once delivery
const queue = new sqs.Queue(this, 'ProcessingQueue', {
  visibilityTimeout: Duration.seconds(300),
  retentionPeriod: Duration.days(14),
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 3,
  },
});

// FIFO queue for ordered processing
const fifoQueue = new sqs.Queue(this, 'OrderedQueue', {
  fifo: true,
  contentBasedDeduplication: true,
  deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
});

// Lambda consumer
new lambda.EventSourceMapping(this, 'QueueConsumer', {
  target: processingFunction,
  eventSourceArn: queue.queueArn,
  batchSize: 10,
  maxBatchingWindow: Duration.seconds(5),
});
使用SQS进行可靠的异步处理:
typescript
// 标准队列(至少一次交付)
const queue = new sqs.Queue(this, 'ProcessingQueue', {
  visibilityTimeout: Duration.seconds(300),
  retentionPeriod: Duration.days(14),
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 3,
  },
});

// FIFO队列(有序处理)
const fifoQueue = new sqs.Queue(this, 'OrderedQueue', {
  fifo: true,
  contentBasedDeduplication: true,
  deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
});

// Lambda消费者
new lambda.EventSourceMapping(this, 'QueueConsumer', {
  target: processingFunction,
  eventSourceArn: queue.queueArn,
  batchSize: 10,
  maxBatchingWindow: Duration.seconds(5),
});

Pattern 3: Pub/Sub (SNS + SQS Fan-Out)

模式3:发布/订阅(SNS + SQS扇出)

Implement fan-out pattern for multiple consumers:
typescript
// Create SNS topic
const topic = new sns.Topic(this, 'OrderTopic', {
  displayName: 'Order Events',
});

// Multiple SQS queues subscribe to topic
const inventoryQueue = new sqs.Queue(this, 'InventoryQueue');
const shippingQueue = new sqs.Queue(this, 'ShippingQueue');
const analyticsQueue = new sqs.Queue(this, 'AnalyticsQueue');

topic.addSubscription(new subscriptions.SqsSubscription(inventoryQueue));
topic.addSubscription(new subscriptions.SqsSubscription(shippingQueue));
topic.addSubscription(new subscriptions.SqsSubscription(analyticsQueue));

// Each queue has its own Lambda consumer
new lambda.EventSourceMapping(this, 'InventoryConsumer', {
  target: inventoryFunction,
  eventSourceArn: inventoryQueue.queueArn,
});
为多个消费者实现扇出模式:
typescript
// 创建SNS主题
const topic = new sns.Topic(this, 'OrderTopic', {
  displayName: 'Order Events',
});

// 多个SQS队列订阅该主题
const inventoryQueue = new sqs.Queue(this, 'InventoryQueue');
const shippingQueue = new sqs.Queue(this, 'ShippingQueue');
const analyticsQueue = new sqs.Queue(this, 'AnalyticsQueue');

topic.addSubscription(new subscriptions.SqsSubscription(inventoryQueue));
topic.addSubscription(new subscriptions.SqsSubscription(shippingQueue));
topic.addSubscription(new subscriptions.SqsSubscription(analyticsQueue));

// 每个队列有各自的Lambda消费者
new lambda.EventSourceMapping(this, 'InventoryConsumer', {
  target: inventoryFunction,
  eventSourceArn: inventoryQueue.queueArn,
});

Pattern 4: Saga Pattern with Step Functions

模式4:使用Step Functions的Saga模式

Implement distributed transactions:
typescript
const reserveFlight = new tasks.LambdaInvoke(this, 'ReserveFlight', {
  lambdaFunction: reserveFlightFunction,
  outputPath: '$.Payload',
});

const reserveHotel = new tasks.LambdaInvoke(this, 'ReserveHotel', {
  lambdaFunction: reserveHotelFunction,
  outputPath: '$.Payload',
});

const processPayment = new tasks.LambdaInvoke(this, 'ProcessPayment', {
  lambdaFunction: processPaymentFunction,
  outputPath: '$.Payload',
});

// Compensating transactions
const cancelFlight = new tasks.LambdaInvoke(this, 'CancelFlight', {
  lambdaFunction: cancelFlightFunction,
});

const cancelHotel = new tasks.LambdaInvoke(this, 'CancelHotel', {
  lambdaFunction: cancelHotelFunction,
});

// Define saga with compensation
const definition = reserveFlight
  .next(reserveHotel)
  .next(processPayment)
  .addCatch(cancelHotel.next(cancelFlight), {
    resultPath: '$.error',
  });

new stepfunctions.StateMachine(this, 'BookingStateMachine', {
  definition,
  timeout: Duration.minutes(5),
});
实现分布式事务:
typescript
const reserveFlight = new tasks.LambdaInvoke(this, 'ReserveFlight', {
  lambdaFunction: reserveFlightFunction,
  outputPath: '$.Payload',
});

const reserveHotel = new tasks.LambdaInvoke(this, 'ReserveHotel', {
  lambdaFunction: reserveHotelFunction,
  outputPath: '$.Payload',
});

const processPayment = new tasks.LambdaInvoke(this, 'ProcessPayment', {
  lambdaFunction: processPaymentFunction,
  outputPath: '$.Payload',
});

// 补偿事务
const cancelFlight = new tasks.LambdaInvoke(this, 'CancelFlight', {
  lambdaFunction: cancelFlightFunction,
});

const cancelHotel = new tasks.LambdaInvoke(this, 'CancelHotel', {
  lambdaFunction: cancelHotelFunction,
});

// 定义带补偿的saga
const definition = reserveFlight
  .next(reserveHotel)
  .next(processPayment)
  .addCatch(cancelHotel.next(cancelFlight), {
    resultPath: '$.error',
  });

new stepfunctions.StateMachine(this, 'BookingStateMachine', {
  definition,
  timeout: Duration.minutes(5),
});

Pattern 5: Event Sourcing

模式5:事件溯源

Store events as source of truth:
typescript
// Event store with DynamoDB
const eventStore = new dynamodb.Table(this, 'EventStore', {
  partitionKey: { name: 'aggregateId', type: dynamodb.AttributeType.STRING },
  sortKey: { name: 'version', type: dynamodb.AttributeType.NUMBER },
  stream: dynamodb.StreamViewType.NEW_IMAGE,
});

// Lambda function stores events
export const handleCommand = async (event: any) => {
  const { aggregateId, eventType, eventData } = event;

  // Get current version
  const items = await dynamodb.query({
    TableName: process.env.EVENT_STORE,
    KeyConditionExpression: 'aggregateId = :id',
    ExpressionAttributeValues: { ':id': aggregateId },
    ScanIndexForward: false,
    Limit: 1,
  });

  const nextVersion = items.Items?.[0]?.version + 1 || 1;

  // Append new event
  await dynamodb.putItem({
    TableName: process.env.EVENT_STORE,
    Item: {
      aggregateId,
      version: nextVersion,
      eventType,
      eventData,
      timestamp: Date.now(),
    },
  });
};

// Projections read from event stream
eventStore.grantStreamRead(projectionFunction);
将事件作为事实来源存储:
typescript
// 使用DynamoDB的事件存储
const eventStore = new dynamodb.Table(this, 'EventStore', {
  partitionKey: { name: 'aggregateId', type: dynamodb.AttributeType.STRING },
  sortKey: { name: 'version', type: dynamodb.AttributeType.NUMBER },
  stream: dynamodb.StreamViewType.NEW_IMAGE,
});

// Lambda函数存储事件
export const handleCommand = async (event: any) => {
  const { aggregateId, eventType, eventData } = event;

  // 获取当前版本
  const items = await dynamodb.query({
    TableName: process.env.EVENT_STORE,
    KeyConditionExpression: 'aggregateId = :id',
    ExpressionAttributeValues: { ':id': aggregateId },
    ScanIndexForward: false,
    Limit: 1,
  });

  const nextVersion = items.Items?.[0]?.version + 1 || 1;

  // 追加新事件
  await dynamodb.putItem({
    TableName: process.env.EVENT_STORE,
    Item: {
      aggregateId,
      version: nextVersion,
      eventType,
      eventData,
      timestamp: Date.now(),
    },
  });
};

// 投影从事件流读取数据
eventStore.grantStreamRead(projectionFunction);

Serverless Architecture Patterns

无服务器架构模式

Pattern 1: API-Driven Microservices

模式1:API驱动的微服务

REST APIs with Lambda backend:
typescript
const api = new apigateway.RestApi(this, 'Api', {
  restApiName: 'microservices-api',
  deployOptions: {
    throttlingRateLimit: 1000,
    throttlingBurstLimit: 2000,
    tracingEnabled: true,
  },
});

// User service
const users = api.root.addResource('users');
users.addMethod('GET', new apigateway.LambdaIntegration(getUsersFunction));
users.addMethod('POST', new apigateway.LambdaIntegration(createUserFunction));

// Order service
const orders = api.root.addResource('orders');
orders.addMethod('GET', new apigateway.LambdaIntegration(getOrdersFunction));
orders.addMethod('POST', new apigateway.LambdaIntegration(createOrderFunction));
使用Lambda后端的REST API:
typescript
const api = new apigateway.RestApi(this, 'Api', {
  restApiName: 'microservices-api',
  deployOptions: {
    throttlingRateLimit: 1000,
    throttlingBurstLimit: 2000,
    tracingEnabled: true,
  },
});

// 用户服务
const users = api.root.addResource('users');
users.addMethod('GET', new apigateway.LambdaIntegration(getUsersFunction));
users.addMethod('POST', new apigateway.LambdaIntegration(createUserFunction));

// 订单服务
const orders = api.root.addResource('orders');
orders.addMethod('GET', new apigateway.LambdaIntegration(getOrdersFunction));
orders.addMethod('POST', new apigateway.LambdaIntegration(createOrderFunction));

Pattern 2: Stream Processing

模式2:流处理

Real-time data processing with Kinesis:
typescript
const stream = new kinesis.Stream(this, 'DataStream', {
  shardCount: 2,
  retentionPeriod: Duration.days(7),
});

// Lambda processes stream records
new lambda.EventSourceMapping(this, 'StreamProcessor', {
  target: processFunction,
  eventSourceArn: stream.streamArn,
  batchSize: 100,
  maxBatchingWindow: Duration.seconds(5),
  parallelizationFactor: 10,
  startingPosition: lambda.StartingPosition.LATEST,
  retryAttempts: 3,
  bisectBatchOnError: true,
  onFailure: new lambdaDestinations.SqsDestination(dlq),
});
使用Kinesis进行实时数据处理:
typescript
const stream = new kinesis.Stream(this, 'DataStream', {
  shardCount: 2,
  retentionPeriod: Duration.days(7),
});

// Lambda处理流记录
new lambda.EventSourceMapping(this, 'StreamProcessor', {
  target: processFunction,
  eventSourceArn: stream.streamArn,
  batchSize: 100,
  maxBatchingWindow: Duration.seconds(5),
  parallelizationFactor: 10,
  startingPosition: lambda.StartingPosition.LATEST,
  retryAttempts: 3,
  bisectBatchOnError: true,
  onFailure: new lambdaDestinations.SqsDestination(dlq),
});

Pattern 3: Async Task Processing

模式3:异步任务处理

Background job processing:
typescript
// SQS queue for tasks
const taskQueue = new sqs.Queue(this, 'TaskQueue', {
  visibilityTimeout: Duration.minutes(5),
  receiveMessageWaitTime: Duration.seconds(20), // Long polling
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 3,
  },
});

// Lambda worker processes tasks
const worker = new lambda.Function(this, 'TaskWorker', {
  // ... configuration
  reservedConcurrentExecutions: 10, // Control concurrency
});

new lambda.EventSourceMapping(this, 'TaskConsumer', {
  target: worker,
  eventSourceArn: taskQueue.queueArn,
  batchSize: 10,
  reportBatchItemFailures: true, // Partial batch failure handling
});
后台作业处理:
typescript
// 用于任务的SQS队列
const taskQueue = new sqs.Queue(this, 'TaskQueue', {
  visibilityTimeout: Duration.minutes(5),
  receiveMessageWaitTime: Duration.seconds(20), // 长轮询
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 3,
  },
});

// Lambda工作进程处理任务
const worker = new lambda.Function(this, 'TaskWorker', {
  // ... 配置信息
  reservedConcurrentExecutions: 10, // 控制并发数
});

new lambda.EventSourceMapping(this, 'TaskConsumer', {
  target: worker,
  eventSourceArn: taskQueue.queueArn,
  batchSize: 10,
  reportBatchItemFailures: true, // 部分批量失败处理
});

Pattern 4: Scheduled Jobs

模式4:定时任务

Periodic processing with EventBridge:
typescript
// Daily cleanup job
new events.Rule(this, 'DailyCleanup', {
  schedule: events.Schedule.cron({ hour: '2', minute: '0' }),
  targets: [new targets.LambdaFunction(cleanupFunction)],
});

// Process every 5 minutes
new events.Rule(this, 'FrequentProcessing', {
  schedule: events.Schedule.rate(Duration.minutes(5)),
  targets: [new targets.LambdaFunction(processFunction)],
});
使用EventBridge进行周期性处理:
typescript
// 每日清理作业
new events.Rule(this, 'DailyCleanup', {
  schedule: events.Schedule.cron({ hour: '2', minute: '0' }),
  targets: [new targets.LambdaFunction(cleanupFunction)],
});

// 每5分钟处理一次
new events.Rule(this, 'FrequentProcessing', {
  schedule: events.Schedule.rate(Duration.minutes(5)),
  targets: [new targets.LambdaFunction(processFunction)],
});

Pattern 5: Webhook Processing

模式5:Webhook处理

Handle external webhooks:
typescript
// API Gateway endpoint for webhooks
const webhookApi = new apigateway.RestApi(this, 'WebhookApi', {
  restApiName: 'webhooks',
});

const webhook = webhookApi.root.addResource('webhook');
webhook.addMethod('POST', new apigateway.LambdaIntegration(webhookFunction, {
  proxy: true,
  timeout: Duration.seconds(29), // API Gateway max
}));

// Lambda handler validates and queues webhook
export const handler = async (event: APIGatewayProxyEvent) => {
  // Validate webhook signature
  const isValid = validateSignature(event.headers, event.body);
  if (!isValid) {
    return { statusCode: 401, body: 'Invalid signature' };
  }

  // Queue for async processing
  await sqs.sendMessage({
    QueueUrl: process.env.QUEUE_URL,
    MessageBody: event.body,
  });

  // Return immediately
  return { statusCode: 202, body: 'Accepted' };
};
处理外部Webhook:
typescript
// 用于Webhook的API Gateway端点
const webhookApi = new apigateway.RestApi(this, 'WebhookApi', {
  restApiName: 'webhooks',
});

const webhook = webhookApi.root.addResource('webhook');
webhook.addMethod('POST', new apigateway.LambdaIntegration(webhookFunction, {
  proxy: true,
  timeout: Duration.seconds(29), // API Gateway最大值
}));

// Lambda处理程序验证并将Webhook加入队列
export const handler = async (event: APIGatewayProxyEvent) => {
  // 验证Webhook签名
  const isValid = validateSignature(event.headers, event.body);
  if (!isValid) {
    return { statusCode: 401, body: '无效签名' };
  }

  // 加入队列进行异步处理
  await sqs.sendMessage({
    QueueUrl: process.env.QUEUE_URL,
    MessageBody: event.body,
  });

  // 立即返回
  return { statusCode: 202, body: '已接受' };
};

Best Practices

最佳实践

Error Handling

错误处理

Implement comprehensive error handling:
typescript
export const handler = async (event: SQSEvent) => {
  const failures: SQSBatchItemFailure[] = [];

  for (const record of event.Records) {
    try {
      await processRecord(record);
    } catch (error) {
      console.error('Failed to process record:', record.messageId, error);
      failures.push({ itemIdentifier: record.messageId });
    }
  }

  // Return partial batch failures for retry
  return { batchItemFailures: failures };
};
实现全面的错误处理
typescript
export const handler = async (event: SQSEvent) => {
  const failures: SQSBatchItemFailure[] = [];

  for (const record of event.Records) {
    try {
      await processRecord(record);
    } catch (error) {
      console.error('处理记录失败:', record.messageId, error);
      failures.push({ itemIdentifier: record.messageId });
    }
  }

  // 返回部分批量失败以进行重试
  return { batchItemFailures: failures };
};

Dead Letter Queues

死信队列

Always configure DLQs for error handling:
typescript
const dlq = new sqs.Queue(this, 'DLQ', {
  retentionPeriod: Duration.days(14),
});

const queue = new sqs.Queue(this, 'Queue', {
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 3,
  },
});

// Monitor DLQ depth
new cloudwatch.Alarm(this, 'DLQAlarm', {
  metric: dlq.metricApproximateNumberOfMessagesVisible(),
  threshold: 1,
  evaluationPeriods: 1,
  alarmDescription: 'Messages in DLQ require attention',
});
始终配置DLQ以处理错误
typescript
const dlq = new sqs.Queue(this, 'DLQ', {
  retentionPeriod: Duration.days(14),
});

const queue = new sqs.Queue(this, 'Queue', {
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 3,
  },
});

// 监控DLQ深度
new cloudwatch.Alarm(this, 'DLQAlarm', {
  metric: dlq.metricApproximateNumberOfMessagesVisible(),
  threshold: 1,
  evaluationPeriods: 1,
  alarmDescription: 'DLQ中的消息需要关注',
});

Observability

可观测性

Enable tracing and monitoring:
typescript
new NodejsFunction(this, 'Function', {
  entry: 'src/handler.ts',
  tracing: lambda.Tracing.ACTIVE, // X-Ray tracing
  environment: {
    POWERTOOLS_SERVICE_NAME: 'order-service',
    POWERTOOLS_METRICS_NAMESPACE: 'MyApp',
    LOG_LEVEL: 'INFO',
  },
});
启用追踪与监控
typescript
new NodejsFunction(this, 'Function', {
  entry: 'src/handler.ts',
  tracing: lambda.Tracing.ACTIVE, // X-Ray追踪
  environment: {
    POWERTOOLS_SERVICE_NAME: 'order-service',
    POWERTOOLS_METRICS_NAMESPACE: 'MyApp',
    LOG_LEVEL: 'INFO',
  },
});

Using MCP Servers Effectively

有效使用MCP服务器

AWS Serverless MCP Usage

AWS Serverless MCP使用方法

Lifecycle management:
  • Initialize new serverless projects
  • Generate SAM templates
  • Deploy applications
  • Test locally before deployment
生命周期管理
  • 初始化新的无服务器项目
  • 生成SAM模板
  • 部署应用
  • 部署前在本地测试

Lambda Tool MCP Usage

Lambda Tool MCP使用方法

Function execution:
  • Test Lambda functions directly
  • Execute automation workflows
  • Access private resources
  • Validate integrations
函数执行
  • 直接测试Lambda函数
  • 执行自动化工作流
  • 访问私有资源
  • 验证集成

Step Functions MCP Usage

Step Functions MCP使用方法

Workflow orchestration:
  • Create state machines for complex workflows
  • Execute distributed transactions
  • Implement saga patterns
  • Coordinate microservices
工作流编排
  • 为复杂工作流创建状态机
  • 执行分布式事务
  • 实现saga模式
  • 协调微服务

SNS/SQS MCP Usage

SNS/SQS MCP使用方法

Messaging operations:
  • Test pub/sub patterns
  • Send test messages to queues
  • Validate event routing
  • Debug message processing
消息传递操作
  • 测试发布/订阅模式
  • 向队列发送测试消息
  • 验证事件路由
  • 调试消息处理

Additional Resources

额外资源

This skill includes comprehensive reference documentation based on AWS best practices:
  • Serverless Patterns:
    references/serverless-patterns.md
    • Core serverless architectures and API patterns
    • Data processing and integration patterns
    • Orchestration with Step Functions
    • Anti-patterns to avoid
  • Event-Driven Architecture Patterns:
    references/eda-patterns.md
    • Event routing and processing patterns
    • Event sourcing and saga patterns
    • Idempotency and error handling
    • Message ordering and deduplication
  • Security Best Practices:
    references/security-best-practices.md
    • Shared responsibility model
    • IAM least privilege patterns
    • Data protection and encryption
    • Network security with VPC
  • Observability Best Practices:
    references/observability-best-practices.md
    • Three pillars: metrics, logs, traces
    • Structured logging with Lambda Powertools
    • X-Ray distributed tracing
    • CloudWatch alarms and dashboards
  • Performance Optimization:
    references/performance-optimization.md
    • Cold start optimization techniques
    • Memory and CPU optimization
    • Package size reduction
    • Provisioned concurrency patterns
  • Deployment Best Practices:
    references/deployment-best-practices.md
    • CI/CD pipeline design
    • Testing strategies (unit, integration, load)
    • Deployment strategies (canary, blue/green)
    • Rollback and safety mechanisms
External Resources:
For detailed implementation patterns, anti-patterns, and code examples, refer to the comprehensive references in the skill directory.
本技能包含基于AWS最佳实践的全面参考文档:
  • 无服务器模式
    references/serverless-patterns.md
    • 核心无服务器架构与API模式
    • 数据处理与集成模式
    • 使用Step Functions进行编排
    • 需要避免的反模式
  • 事件驱动架构模式
    references/eda-patterns.md
    • 事件路由与处理模式
    • 事件溯源与saga模式
    • 幂等性与错误处理
    • 消息排序与去重
  • 安全最佳实践
    references/security-best-practices.md
    • 共享责任模型
    • IAM最小权限模式
    • 数据保护与加密
    • 使用VPC的网络安全
  • 可观测性最佳实践
    references/observability-best-practices.md
    • 三大支柱:指标、日志、追踪
    • 使用Lambda Powertools进行结构化日志
    • X-Ray分布式追踪
    • CloudWatch告警与仪表板
  • 性能优化
    references/performance-optimization.md
    • 冷启动优化技巧
    • 内存与CPU优化
    • 包大小缩减
    • 预置并发模式
  • 部署最佳实践
    references/deployment-best-practices.md
    • CI/CD管道设计
    • 测试策略(单元测试、集成测试、负载测试)
    • 部署策略(金丝雀、蓝绿部署)
    • 回滚与安全机制
外部资源
如需详细的实现模式、反模式与代码示例,请参考技能目录中的全面参考文档。