cloud-platforms
Compare original and translation side by side
🇺🇸
Original
English
🇨🇳
Translation
Chinese
Cloud Platforms for Data Engineering
面向数据工程的云平台
Production-grade cloud infrastructure for data pipelines, storage, and analytics on AWS, GCP, and Azure.
基于AWS、GCP和Azure的生产级云基础设施,用于数据管道、存储与分析。
Quick Start
快速开始
python
python
AWS S3 + Lambda Data Pipeline
AWS S3 + Lambda Data Pipeline
import boto3
import json
s3_client = boto3.client('s3')
glue_client = boto3.client('glue')
def lambda_handler(event, context):
    """Process an S3 object-created event and start the Glue ETL job.

    NOTE(review): only Records[0] is read — S3 can batch several records
    into one event; confirm single-record delivery before relying on this.

    Args:
        event: S3 event notification payload (dict).
        context: Lambda context object (unused).

    Returns:
        dict: HTTP-style statusCode 200 plus the JobRunId of the started run.
    """
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    # Trigger Glue ETL job for the uploaded object
    response = glue_client.start_job_run(
        JobName='etl-process-raw-data',
        Arguments={
            '--source_path': f's3://{bucket}/{key}',
            '--output_path': 's3://processed-bucket/output/'
        }
    )
    return {'statusCode': 200, 'jobRunId': response['JobRunId']}

import boto3
import json
s3_client = boto3.client('s3')
glue_client = boto3.client('glue')
def lambda_handler(event, context):
"""Process S3 event and trigger Glue job."""
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
# Trigger Glue ETL job
response = glue_client.start_job_run(
JobName='etl-process-raw-data',
Arguments={
'--source_path': f's3://{bucket}/{key}',
'--output_path': 's3://processed-bucket/output/'
}
)
return {'statusCode': 200, 'jobRunId': response['JobRunId']}undefinedCore Concepts
核心概念
1. AWS Data Stack
1. AWS Data Stack
python
python
AWS Glue ETL Job (PySpark)
AWS Glue ETL Job (PySpark)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Resolve job parameters supplied as --JOB_NAME / --source_path / --output_path
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'source_path', 'output_path'])
# Standard Glue boilerplate: wrap the Spark context in a GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Resolve job parameters supplied as --JOB_NAME / --source_path / --output_path
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'source_path', 'output_path'])
# Standard Glue boilerplate: wrap the Spark context in a GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
Read from S3
Read from S3
# Read the raw parquet files under the source S3 path into a DynamicFrame.
df = glueContext.create_dynamic_frame.from_options(
connection_type="s3",
connection_options={"paths": [args['source_path']]},
format="parquet"
)
# Read the raw parquet files under the source S3 path into a DynamicFrame.
df = glueContext.create_dynamic_frame.from_options(
connection_type="s3",
connection_options={"paths": [args['source_path']]},
format="parquet"
)
Transform
Transform
# Rename/cast columns; each tuple is
# (source name, source type, target name, target type).
df_transformed = ApplyMapping.apply(
frame=df,
mappings=[
("id", "long", "id", "long"),
("name", "string", "customer_name", "string"),
("created_at", "string", "created_at", "timestamp")
]
)
# Rename/cast columns; each tuple is
# (source name, source type, target name, target type).
df_transformed = ApplyMapping.apply(
frame=df,
mappings=[
("id", "long", "id", "long"),
("name", "string", "customer_name", "string"),
("created_at", "string", "created_at", "timestamp")
]
)
Write to S3 with partitioning
Write to S3 with partitioning
# Write the transformed frame to the output path as parquet, partitioned
# by year/month.
# NOTE(review): year/month are not produced by the ApplyMapping above —
# confirm these columns exist in the frame before relying on partitioning.
glueContext.write_dynamic_frame.from_options(
frame=df_transformed,
connection_type="s3",
connection_options={
"path": args['output_path'],
"partitionKeys": ["year", "month"]
},
format="parquet"
)
job.commit()
# Write the transformed frame to the output path as parquet, partitioned
# by year/month.
# NOTE(review): year/month are not produced by the ApplyMapping above —
# confirm these columns exist in the frame before relying on partitioning.
glueContext.write_dynamic_frame.from_options(
    frame=df_transformed,
    connection_type="s3",
    connection_options={
        "path": args['output_path'],
        "partitionKeys": ["year", "month"]
    },
    format="parquet"
)
job.commit()
2. GCP Data Stack
2. GCP Data Stack
python
python
BigQuery + Cloud Functions
BigQuery + Cloud Functions
from google.cloud import bigquery
from google.cloud import storage
def process_gcs_file(event, context):
    """Cloud Function triggered by a GCS upload: append the file to BigQuery.

    Args:
        event: GCS event payload; reads the 'bucket' and 'name' keys.
        context: Cloud Functions event context (unused).

    Returns:
        str: summary of how many rows were loaded.
    """
    bucket = event['bucket']
    name = event['name']
    client = bigquery.Client()
    # Load data from GCS to BigQuery
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    uri = f"gs://{bucket}/{name}"
    # NOTE(review): placeholder table id — parameterize per environment.
    table_id = "project.dataset.events"
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Wait for completion
    return f"Loaded {load_job.output_rows} rows"

from google.cloud import bigquery
from google.cloud import storage
def process_gcs_file(event, context):
"""Cloud Function triggered by GCS upload."""
bucket = event['bucket']
name = event['name']
client = bigquery.Client()
# Load data from GCS to BigQuery
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
uri = f"gs://{bucket}/{name}"
table_id = "project.dataset.events"
load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result() # Wait for completion
return f"Loaded {load_job.output_rows} rows"undefined3. Terraform Infrastructure
3. Terraform Infrastructure
hcl
hcl
AWS Data Lake Infrastructure
AWS Data Lake Infrastructure
# S3 bucket serving as the data-lake root, named per environment.
resource "aws_s3_bucket" "data_lake" {
bucket = "company-data-lake-${var.environment}"
tags = {
Environment = var.environment
Purpose = "data-lake"
}
}
# Lifecycle: transition objects to Glacier after 90 days, expire after 365.
resource "aws_s3_bucket_lifecycle_configuration" "data_lake_lifecycle" {
bucket = aws_s3_bucket.data_lake.id
rule {
id = "archive-old-data"
status = "Enabled"
transition {
days = 90
storage_class = "GLACIER"
}
expiration {
days = 365
}}
}
# Glue Data Catalog database that holds the crawled table definitions.
resource "aws_glue_catalog_database" "analytics" {
name = "analytics_${var.environment}"
}
# Crawler scans the raw/ prefix daily at 06:00 (AWS cron is UTC).
resource "aws_glue_crawler" "data_crawler" {
database_name = aws_glue_catalog_database.analytics.name
name = "data-crawler"
role = aws_iam_role.glue_role.arn
s3_target {
path = "s3://${aws_s3_bucket.data_lake.bucket}/raw/"
}
schedule = "cron(0 6 * * ? *)"
}
# S3 bucket serving as the data-lake root, named per environment.
resource "aws_s3_bucket" "data_lake" {
  bucket = "company-data-lake-${var.environment}"
  tags = {
    Environment = var.environment
    Purpose     = "data-lake"
  }
}

# Lifecycle: transition objects to Glacier after 90 days, expire after 365.
resource "aws_s3_bucket_lifecycle_configuration" "data_lake_lifecycle" {
  bucket = aws_s3_bucket.data_lake.id
  rule {
    id     = "archive-old-data"
    status = "Enabled"
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
    expiration {
      days = 365
    }
  }
}

# Glue Data Catalog database that holds the crawled table definitions.
resource "aws_glue_catalog_database" "analytics" {
  name = "analytics_${var.environment}"
}

# Crawler scans the raw/ prefix daily at 06:00 (AWS cron is UTC).
resource "aws_glue_crawler" "data_crawler" {
  database_name = aws_glue_catalog_database.analytics.name
  name          = "data-crawler"
  role          = aws_iam_role.glue_role.arn
  s3_target {
    path = "s3://${aws_s3_bucket.data_lake.bucket}/raw/"
  }
  schedule = "cron(0 6 * * ? *)"
}
4. Cost Optimization
4. 成本优化
python
python
AWS Cost monitoring
AWS Cost monitoring
import boto3
from datetime import datetime, timedelta
def get_service_costs(days=30):
    """Print the per-service AWS cost breakdown for the trailing period.

    Args:
        days: size of the lookback window in days (default 30).

    Returns:
        None; prints "<service>: $<cost>" lines to stdout.
    """
    ce = boto3.client('ce')
    # Cost Explorer expects ISO dates; End is exclusive.
    end = datetime.now().strftime('%Y-%m-%d')
    start = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
    response = ce.get_cost_and_usage(
        TimePeriod={'Start': start, 'End': end},
        Granularity='MONTHLY',
        Metrics=['UnblendedCost'],
        GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
    )
    for result in response['ResultsByTime']:
        for group in result['Groups']:
            service = group['Keys'][0]
            # Amounts come back as strings; convert before formatting.
            cost = float(group['Metrics']['UnblendedCost']['Amount'])
            print(f"{service}: ${cost:.2f}")

import boto3
from datetime import datetime, timedelta
def get_service_costs(days=30):
"""Get cost breakdown by service."""
ce = boto3.client('ce')
end = datetime.now().strftime('%Y-%m-%d')
start = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
response = ce.get_cost_and_usage(
TimePeriod={'Start': start, 'End': end},
Granularity='MONTHLY',
Metrics=['UnblendedCost'],
GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
)
for result in response['ResultsByTime']:
for group in result['Groups']:
service = group['Keys'][0]
cost = float(group['Metrics']['UnblendedCost']['Amount'])
print(f"{service}: ${cost:.2f}")S3 Intelligent Tiering
S3 Intelligent Tiering
# Enable S3 Intelligent-Tiering archive tiers on the data-lake bucket:
# objects move to ARCHIVE_ACCESS after 90 days without access and to
# DEEP_ARCHIVE_ACCESS after 180.
s3 = boto3.client('s3')
s3.put_bucket_intelligent_tiering_configuration(
Bucket='data-lake',
Id='AutoTiering',
IntelligentTieringConfiguration={
'Id': 'AutoTiering',
'Status': 'Enabled',
'Tierings': [
{'Days': 90, 'AccessTier': 'ARCHIVE_ACCESS'},
{'Days': 180, 'AccessTier': 'DEEP_ARCHIVE_ACCESS'}
]
}
)
# Enable S3 Intelligent-Tiering archive tiers on the data-lake bucket:
# objects move to ARCHIVE_ACCESS after 90 days without access and to
# DEEP_ARCHIVE_ACCESS after 180.
s3 = boto3.client('s3')
s3.put_bucket_intelligent_tiering_configuration(
    Bucket='data-lake',
    Id='AutoTiering',
    IntelligentTieringConfiguration={
        'Id': 'AutoTiering',
        'Status': 'Enabled',
        'Tierings': [
            {'Days': 90, 'AccessTier': 'ARCHIVE_ACCESS'},
            {'Days': 180, 'AccessTier': 'DEEP_ARCHIVE_ACCESS'}
        ]
    }
)
Tools & Technologies
工具与技术
| Tool | Purpose | Version (2025) |
|---|---|---|
| AWS S3 | Object storage | Latest |
| AWS Glue | ETL service | 4.0 |
| AWS EMR | Managed Spark | 7.0+ |
| BigQuery | Analytics DW | Latest |
| Cloud Dataflow | Stream/batch | Latest |
| Azure Data Factory | ETL/ELT | Latest |
| Terraform | IaC | 1.6+ |
| Pulumi | IaC (Python) | 3.0+ |
| 工具 | 用途 | 版本(2025) |
|---|---|---|
| AWS S3 | 对象存储 | 最新版本 |
| AWS Glue | ETL服务 | 4.0 |
| AWS EMR | 托管Spark服务 | 7.0+ |
| BigQuery | 分析型数据仓库 | 最新版本 |
| Cloud Dataflow | 流/批处理 | 最新版本 |
| Azure Data Factory | ETL/ELT服务 | 最新版本 |
| Terraform | 基础设施即代码 | 1.6+ |
| Pulumi | 基础设施即代码(Python) | 3.0+ |
Troubleshooting Guide
故障排查指南
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Permission Denied | AccessDenied error | IAM policy missing | Check IAM roles |
| Timeout | Lambda/Function timeout | Long-running process | Increase timeout, use Step Functions |
| Cost Spike | Unexpected charges | Unoptimized queries/storage | Enable cost alerts, lifecycle policies |
| Cold Start | Slow first invocation | Lambda cold start | Provisioned concurrency |
| 问题 | 症状 | 根本原因 | 解决方法 |
|---|---|---|---|
| 权限拒绝 | AccessDenied错误 | IAM策略缺失 | 检查IAM角色 |
| 超时 | Lambda/函数超时 | 运行时间过长的进程 | 增加超时时间,使用Step Functions |
| 成本激增 | 意外收费 | 查询/存储未优化 | 启用成本告警,配置生命周期策略 |
| 冷启动 | 首次调用缓慢 | Lambda冷启动 | 配置预配置并发 |
Best Practices
最佳实践
python
python
✅ DO: Use IAM roles, not access keys
✅ DO: Use IAM roles, not access keys
session = boto3.Session() # Uses instance role
session = boto3.Session() # Uses instance role
✅ DO: Enable encryption at rest
✅ DO: Enable encryption at rest
s3.put_bucket_encryption(
Bucket='my-bucket',
ServerSideEncryptionConfiguration={...}
)
s3.put_bucket_encryption(
Bucket='my-bucket',
ServerSideEncryptionConfiguration={...}
)
✅ DO: Use VPC endpoints for private access
✅ DO: Use VPC endpoints for private access
✅ DO: Enable CloudWatch alarms for monitoring
✅ DO: Enable CloudWatch alarms for monitoring
✅ DO: Use tags for cost allocation
✅ DO: Use tags for cost allocation
❌ DON'T: Hard-code credentials
❌ DON'T: Hard-code credentials
❌ DON'T: Use root account for operations
❌ DON'T: Use root account for operations
❌ DON'T: Leave buckets public
❌ DON'T: Leave buckets public
Resources
资源
Skill Certification Checklist:
- Can design cloud data lake architecture
- Can implement ETL with Glue/Dataflow
- Can manage infrastructure with Terraform
- Can optimize cloud costs
- Can implement security best practices
技能认证检查清单:
- 能够设计云数据湖架构
- 能够使用Glue/Dataflow实现ETL
- 能够使用Terraform管理基础设施
- 能够优化云成本
- 能够实施安全最佳实践