cloud-platforms


Cloud Platforms for Data Engineering

Production-grade cloud infrastructure for data pipelines, storage, and analytics on AWS, GCP, and Azure.

Quick Start


AWS S3 + Lambda Data Pipeline

```python
import boto3
import json

s3_client = boto3.client('s3')
glue_client = boto3.client('glue')

def lambda_handler(event, context):
    """Process S3 event and trigger Glue job."""
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # Trigger Glue ETL job
    response = glue_client.start_job_run(
        JobName='etl-process-raw-data',
        Arguments={
            '--source_path': f's3://{bucket}/{key}',
            '--output_path': 's3://processed-bucket/output/'
        }
    )

    return {'statusCode': 200, 'jobRunId': response['JobRunId']}
```
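The handler above can be exercised locally before deployment by feeding it a synthetic S3 event record. The sketch below mirrors the handler logic with the Glue client injected so it runs offline; `FakeGlueClient` and `handle` are illustrative stand-ins, not part of boto3:

```python
# Minimal stand-in for the Glue client so the handler logic can run offline.
class FakeGlueClient:
    def start_job_run(self, JobName, Arguments):
        # Record what the handler asked for and return a canned response.
        self.last_call = {'JobName': JobName, 'Arguments': Arguments}
        return {'JobRunId': 'jr_test_123'}

def handle(event, glue_client):
    """Same logic as lambda_handler, with the Glue client injected."""
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    response = glue_client.start_job_run(
        JobName='etl-process-raw-data',
        Arguments={
            '--source_path': f's3://{bucket}/{key}',
            '--output_path': 's3://processed-bucket/output/'
        }
    )
    return {'statusCode': 200, 'jobRunId': response['JobRunId']}

# Synthetic S3 put event, shaped like the records S3 delivers to Lambda.
sample_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'raw-bucket'},
            'object': {'key': 'incoming/data.parquet'}
        }
    }]
}

glue = FakeGlueClient()
result = handle(sample_event, glue)
print(result['statusCode'])  # 200
```

Injecting the client this way also makes the handler straightforward to unit-test in CI without AWS credentials.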

Core Concepts


1. AWS Data Stack

AWS Glue ETL Job (PySpark)

```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'source_path', 'output_path'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read from S3
df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [args['source_path']]},
    format="parquet"
)

# Transform
df_transformed = ApplyMapping.apply(
    frame=df,
    mappings=[
        ("id", "long", "id", "long"),
        ("name", "string", "customer_name", "string"),
        ("created_at", "string", "created_at", "timestamp")
    ]
)

# Write to S3 with partitioning
glueContext.write_dynamic_frame.from_options(
    frame=df_transformed,
    connection_type="s3",
    connection_options={
        "path": args['output_path'],
        "partitionKeys": ["year", "month"]
    },
    format="parquet"
)

job.commit()
```
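The `ApplyMapping` step above renames columns and casts types. Its effect on a single record can be sketched in plain Python; this is a simplification for intuition, not the DynamicFrame implementation:

```python
from datetime import datetime

# (source_name, source_type, target_name, target_type) — same shape as ApplyMapping mappings.
MAPPINGS = [
    ("id", "long", "id", "long"),
    ("name", "string", "customer_name", "string"),
    ("created_at", "string", "created_at", "timestamp"),
]

# How to cast a value into each target type (timestamp format is an assumption).
CASTS = {
    "long": int,
    "string": str,
    "timestamp": lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S"),
}

def apply_mapping(record):
    """Rename and cast one record per MAPPINGS; unlisted fields are dropped."""
    out = {}
    for src, _src_type, dst, dst_type in MAPPINGS:
        if src in record:
            out[dst] = CASTS[dst_type](record[src])
    return out

row = {"id": 42, "name": "Acme", "created_at": "2025-01-15 08:30:00", "extra": "dropped"}
print(apply_mapping(row))
```

Note that fields absent from the mapping list (`extra` here) do not survive the transform — the same pruning behavior you get from `ApplyMapping` in Glue.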

2. GCP Data Stack

BigQuery + Cloud Functions

```python
from google.cloud import bigquery
from google.cloud import storage

def process_gcs_file(event, context):
    """Cloud Function triggered by GCS upload."""
    bucket = event['bucket']
    name = event['name']

    client = bigquery.Client()

    # Load data from GCS to BigQuery
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    uri = f"gs://{bucket}/{name}"
    table_id = "project.dataset.events"

    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Wait for completion

    return f"Loaded {load_job.output_rows} rows"
```
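The function above loads every object into one hard-coded table. A common refinement is deriving the destination table from the object path; the helper below assumes a hypothetical `dataset/table/...` naming convention for object keys:

```python
def table_id_for_object(project, name):
    """Map an object key like 'events/clicks/2025/01/part-0.parquet'
    to a BigQuery table id 'project.events.clicks'.

    Assumes (by convention, not GCS requirement) that the first two
    path segments are the dataset and table names.
    """
    parts = name.split('/')
    if len(parts) < 3:
        raise ValueError(f"unexpected object path: {name!r}")
    dataset, table = parts[0], parts[1]
    return f"{project}.{dataset}.{table}"

print(table_id_for_object("my-project", "events/clicks/2025/01/part-0.parquet"))
# my-project.events.clicks
```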

3. Terraform Infrastructure

AWS Data Lake Infrastructure

```hcl
resource "aws_s3_bucket" "data_lake" {
  bucket = "company-data-lake-${var.environment}"

  tags = {
    Environment = var.environment
    Purpose     = "data-lake"
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "data_lake_lifecycle" {
  bucket = aws_s3_bucket.data_lake.id

  rule {
    id     = "archive-old-data"
    status = "Enabled"

    transition {
      days          = 90
      storage_class = "GLACIER"
    }

    expiration {
      days = 365
    }
  }
}

resource "aws_glue_catalog_database" "analytics" {
  name = "analytics_${var.environment}"
}

resource "aws_glue_crawler" "data_crawler" {
  database_name = aws_glue_catalog_database.analytics.name
  name          = "data-crawler"
  role          = aws_iam_role.glue_role.arn

  s3_target {
    path = "s3://${aws_s3_bucket.data_lake.bucket}/raw/"
  }

  schedule = "cron(0 6 * * ? *)"
}
```
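The crawler above references `var.environment` and `aws_iam_role.glue_role`, which must be declared elsewhere. A minimal sketch of those supporting definitions, attaching the AWS-managed `AWSGlueServiceRole` policy (role name and variable description are illustrative):

```hcl
variable "environment" {
  type        = string
  description = "Deployment environment (e.g. dev, prod)"
}

# Role the Glue crawler assumes; trust policy allows the Glue service principal.
resource "aws_iam_role" "glue_role" {
  name = "glue-crawler-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "glue.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "glue_service" {
  role       = aws_iam_role.glue_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}
```

In practice the role also needs read access to the `raw/` prefix of the data lake bucket, typically granted via an inline policy scoped to that path.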

4. Cost Optimization

AWS Cost Monitoring

```python
import boto3
from datetime import datetime, timedelta

def get_service_costs(days=30):
    """Get cost breakdown by service."""
    ce = boto3.client('ce')

    end = datetime.now().strftime('%Y-%m-%d')
    start = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    response = ce.get_cost_and_usage(
        TimePeriod={'Start': start, 'End': end},
        Granularity='MONTHLY',
        Metrics=['UnblendedCost'],
        GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
    )

    for result in response['ResultsByTime']:
        for group in result['Groups']:
            service = group['Keys'][0]
            cost = float(group['Metrics']['UnblendedCost']['Amount'])
            print(f"{service}: ${cost:.2f}")
```

S3 Intelligent Tiering

```python
s3 = boto3.client('s3')
s3.put_bucket_intelligent_tiering_configuration(
    Bucket='data-lake',
    Id='AutoTiering',
    IntelligentTieringConfiguration={
        'Id': 'AutoTiering',
        'Status': 'Enabled',
        'Tierings': [
            {'Days': 90, 'AccessTier': 'ARCHIVE_ACCESS'},
            {'Days': 180, 'AccessTier': 'DEEP_ARCHIVE_ACCESS'}
        ]
    }
)
```
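`get_cost_and_usage` returns one result set per time bucket, so ranking services across the whole window requires aggregating the buckets first. A sketch over response-shaped sample data rather than a live API call:

```python
from collections import defaultdict

def top_services(response, n=3):
    """Sum UnblendedCost per service across all time periods, highest first."""
    totals = defaultdict(float)
    for result in response['ResultsByTime']:
        for group in result['Groups']:
            service = group['Keys'][0]
            totals[service] += float(group['Metrics']['UnblendedCost']['Amount'])
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]

# Sample data with the same structure Cost Explorer returns (values invented).
sample = {'ResultsByTime': [
    {'Groups': [
        {'Keys': ['Amazon S3'], 'Metrics': {'UnblendedCost': {'Amount': '120.50'}}},
        {'Keys': ['AWS Glue'], 'Metrics': {'UnblendedCost': {'Amount': '310.00'}}},
    ]},
    {'Groups': [
        {'Keys': ['Amazon S3'], 'Metrics': {'UnblendedCost': {'Amount': '95.25'}}},
    ]},
]}

print(top_services(sample, n=2))
# [('AWS Glue', 310.0), ('Amazon S3', 215.75)]
```

Feeding the top offenders into a budget alert or a weekly report is usually the next step once this breakdown exists.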

Tools & Technologies


| Tool | Purpose | Version (2025) |
|------|---------|----------------|
| AWS S3 | Object storage | Latest |
| AWS Glue | ETL service | 4.0 |
| AWS EMR | Managed Spark | 7.0+ |
| BigQuery | Analytics DW | Latest |
| Cloud Dataflow | Stream/batch | Latest |
| Azure Data Factory | ETL/ELT | Latest |
| Terraform | IaC | 1.6+ |
| Pulumi | IaC (Python) | 3.0+ |

Troubleshooting Guide


| Issue | Symptoms | Root Cause | Fix |
|-------|----------|------------|-----|
| Permission Denied | AccessDenied error | IAM policy missing | Check IAM roles |
| Timeout | Lambda/Function timeout | Long-running process | Increase timeout, use Step Functions |
| Cost Spike | Unexpected charges | Unoptimized queries/storage | Enable cost alerts, lifecycle policies |
| Cold Start | Slow first invocation | Lambda cold start | Provisioned concurrency |
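For transient failures (throttling, or an AccessDenied that appears briefly while a just-changed IAM policy propagates), retrying with exponential backoff is the usual first mitigation. A generic sketch; in real boto3 code the `retryable` tuple would hold the relevant `botocore` exception classes:

```python
import time

def with_backoff(fn, retries=4, base_delay=0.5, retryable=(TimeoutError,)):
    """Call fn, retrying on retryable exceptions with exponential backoff."""
    for attempt in range(retries):
        try:
            return fn()
        except retryable:
            if attempt == retries - 1:
                raise  # out of retries; surface the error
            time.sleep(base_delay * (2 ** attempt))

# Demo: a flaky operation that fails twice, then succeeds.
calls = {'n': 0}
def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise TimeoutError("transient")
    return "ok"

print(with_backoff(flaky, base_delay=0.01))  # ok
```

Note that boto3 already retries many throttling errors internally; a wrapper like this belongs around whole pipeline steps, not individual SDK calls, and genuine IAM misconfigurations will never succeed on retry.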

Best Practices


```python
# ✅ DO: Use IAM roles, not access keys
session = boto3.Session()  # Uses instance role

# ✅ DO: Enable encryption at rest
s3.put_bucket_encryption(
    Bucket='my-bucket',
    ServerSideEncryptionConfiguration={...}
)

# ✅ DO: Use VPC endpoints for private access
# ✅ DO: Enable CloudWatch alarms for monitoring
# ✅ DO: Use tags for cost allocation

# ❌ DON'T: Hard-code credentials
# ❌ DON'T: Use root account for operations
# ❌ DON'T: Leave buckets public
```

Resources



Skill Certification Checklist:
  • Can design cloud data lake architecture
  • Can implement ETL with Glue/Dataflow
  • Can manage infrastructure with Terraform
  • Can optimize cloud costs
  • Can implement security best practices
