DataTalks Data Engineering Zoomcamp
Overview
The Data Engineering Zoomcamp is a comprehensive 9-week free course covering production-ready data pipeline development. It includes hands-on modules on containerization (Docker), infrastructure as code (Terraform), workflow orchestration (Kestra), data warehousing (BigQuery), analytics engineering (dbt), data platforms (Bruin), batch processing (Spark), and streaming (Kafka).
The course operates in cohorts (next starts January 2026) but all materials are available for self-paced learning.
Prerequisites
- Basic coding experience
- SQL familiarity
- Python knowledge (helpful but not required)
- Git installed
- Docker Desktop or Docker Engine
- Google Cloud Platform (GCP) account (free tier)
Course Structure
Module 1: Docker & Terraform
Set up a containerized PostgreSQL database:
```bash
# Create network
docker network create pg-network

# Run PostgreSQL
docker run -d \
  --name pg-database \
  --network pg-network \
  -e POSTGRES_USER=root \
  -e POSTGRES_PASSWORD=root \
  -e POSTGRES_DB=ny_taxi \
  -v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data \
  -p 5432:5432 \
  postgres:13

# Run pgAdmin
docker run -d \
  --name pgadmin \
  --network pg-network \
  -e PGADMIN_DEFAULT_EMAIL=admin@admin.com \
  -e PGADMIN_DEFAULT_PASSWORD=root \
  -p 8080:80 \
  dpage/pgadmin4
```
**Docker Compose for entire stack:**
```yaml
# docker-compose.yaml
services:
  pgdatabase:
    image: postgres:13
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=ny_taxi
    volumes:
      - ./ny_taxi_postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
  pgadmin:
    image: dpage/pgadmin4
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@admin.com
      - PGADMIN_DEFAULT_PASSWORD=root
    ports:
      - "8080:80"
```
```bash
# Start services
docker-compose up -d

# Stop services
docker-compose down
```
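After `docker-compose up -d` returns, the database container may still need a moment before it accepts connections. The sketch below polls the published port until a TCP connection succeeds. `wait_for_port` is a hypothetical helper that is not part of the course materials, and an open port only suggests, rather than proves, that Postgres is ready for queries.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

With the compose file above, a call such as `wait_for_port("localhost", 5432)` could run before an ingestion script starts; the host and port are assumed from the `5432:5432` mapping.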
**Terraform GCP setup:**
```hcl
# main.tf
terraform {
  required_version = ">= 1.0"
  backend "local" {}
  required_providers {
    google = {
      source = "hashicorp/google"
    }
  }
}

provider "google" {
  project = var.project
  region  = var.region
}

# Data Lake Bucket
resource "google_storage_bucket" "data-lake-bucket" {
  name     = "${local.data_lake_bucket}_${var.project}"
  location = var.region

  storage_class               = var.storage_class
  uniform_bucket_level_access = true

  versioning {
    enabled = true
  }

  lifecycle_rule {
    action {
      type = "Delete"
    }
    condition {
      age = 30
    }
  }

  force_destroy = true
}

# BigQuery Dataset
resource "google_bigquery_dataset" "dataset" {
  dataset_id = var.BQ_DATASET
  project    = var.project
  location   = var.region
}
```
```hcl
# variables.tf
locals {
  data_lake_bucket = "dtc_data_lake"
}

variable "project" {
  description = "Your GCP Project ID"
}

variable "region" {
  description = "Region for GCP resources"
  default     = "europe-west6"
  type        = string
}

variable "storage_class" {
  description = "Storage class type for your bucket"
  default     = "STANDARD"
}

variable "BQ_DATASET" {
  description = "BigQuery Dataset"
  type        = string
  default     = "trips_data_all"
}
```
```bash
# Initialize Terraform
terraform init

# Plan infrastructure
terraform plan

# Apply infrastructure
terraform apply

# Destroy infrastructure
terraform destroy
```
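The bucket name in `main.tf` is assembled from the `data_lake_bucket` local and the project ID, so an unusual project ID can yield a name that GCS rejects at `terraform apply`. The sketch below mirrors that expression and checks the result against a conservative subset of the bucket naming rules. Both function names are hypothetical, and the regular expression is an assumption that covers only length and character set, not every rule GCS enforces.

```python
import re

# Assumed subset of the GCS bucket naming rules: 3-63 characters, lowercase
# letters, digits, dashes, underscores and dots, starting and ending with a
# letter or digit.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$")


def data_lake_bucket_name(project: str, prefix: str = "dtc_data_lake") -> str:
    """Mirror the Terraform expression "${local.data_lake_bucket}_${var.project}"."""
    return f"{prefix}_{project}"


def is_valid_bucket_name(name: str) -> bool:
    """Check the name against the assumed subset of rules above."""
    return bool(_BUCKET_RE.match(name))


name = data_lake_bucket_name("my-project-123")  # hypothetical project ID
print(name, is_valid_bucket_name(name))
```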
Module 2: Workflow Orchestration (Kestra)
Example Kestra workflow for data ingestion:
```yaml
# flows/ingest_data.yaml
id: ingest_ny_taxi_data
namespace: zoomcamp

tasks:
  - id: download_data
    type: io.kestra.core.tasks.scripts.Bash
    commands:
      - wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
      - gunzip yellow_tripdata_2021-01.csv.gz

  - id: python_ingest
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:3.9
    script: |
      import pandas as pd
      from sqlalchemy import create_engine
      import os

      df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100000)

      engine = create_engine(os.getenv('POSTGRES_CONNECTION'))
      df.to_sql('yellow_taxi_data', engine, if_exists='replace', chunksize=10000)

      print(f"Inserted {len(df)} rows")

  - id: log_completion
    type: io.kestra.core.tasks.log.Log
    message: "Data ingestion completed successfully"
```
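The flow above reads its connection string from the `POSTGRES_CONNECTION` environment variable. A minimal sketch of how such a URL might be assembled follows. `build_postgres_url` is a hypothetical helper, and the user name and password are percent-encoded so that characters such as `@` or `/` do not break URL parsing.

```python
from urllib.parse import quote_plus


def build_postgres_url(user: str, password: str, host: str, port: int, db: str) -> str:
    """Build a SQLAlchemy-style PostgreSQL URL, percent-encoding user and password."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"


# Values assumed from the Docker setup in Module 1
print(build_postgres_url("root", "root", "localhost", 5432, "ny_taxi"))
```

From inside a container on the same Docker network, the host would be the service or container name rather than `localhost`.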
**Python data ingestion script:**
```python
# ingest_data.py
import argparse
import os
from time import time

import pandas as pd
from sqlalchemy import create_engine


def main(params):
    user = params.user
    password = params.password
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    url = params.url

    # Download the file; keep the .gz suffix so pandas can infer the compression
    csv_name = 'output.csv.gz' if url.endswith('.csv.gz') else 'output.csv'
    os.system(f"wget {url} -O {csv_name}")

    # Create engine
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

    # Read CSV in chunks
    df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)
    df = next(df_iter)

    # Convert datetime columns
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    # Create table (header only, replacing any existing table)
    df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')

    # Insert first chunk
    df.to_sql(name=table_name, con=engine, if_exists='append')

    # Insert remaining chunks
    while True:
        try:
            t_start = time()
            df = next(df_iter)
            df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
            df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
            df.to_sql(name=table_name, con=engine, if_exists='append')
            t_end = time()
            print('Inserted another chunk, took %.3f seconds' % (t_end - t_start))
        except StopIteration:
            print("Finished ingesting data")
            break


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')
    parser.add_argument('--user', required=True, help='user name for postgres')
    parser.add_argument('--password', required=True, help='password for postgres')
    parser.add_argument('--host', required=True, help='host for postgres')
    parser.add_argument('--port', required=True, help='port for postgres')
    parser.add_argument('--db', required=True, help='database name for postgres')
    parser.add_argument('--table_name', required=True, help='name of the table')
    parser.add_argument('--url', required=True, help='url of the csv file')
    args = parser.parse_args()
    main(args)
```
```bash
# Run ingestion script
python ingest_data.py \
  --user=root \
  --password=root \
  --host=localhost \
  --port=5432 \
  --db=ny_taxi \
  --table_name=yellow_taxi_trips \
  --url=https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
```
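The chunked loop in `ingest_data.py` keeps memory use bounded by inserting a fixed number of rows at a time. The same pattern can be sketched with the standard library only, using an in-memory SQLite database in place of Postgres and a three-row sample in place of the taxi file. `iter_chunks` and `ingest_csv` are hypothetical names, and the sample data is made up for illustration.

```python
import csv
import io
import sqlite3
from itertools import islice


def iter_chunks(rows, size):
    """Yield lists of at most `size` rows until the iterator is exhausted."""
    rows = iter(rows)
    while True:
        chunk = list(islice(rows, size))
        if not chunk:
            return
        yield chunk


def ingest_csv(text, conn, table, chunk_size=2):
    """Create `table` from the CSV header, then insert the data chunk by chunk."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    columns = ", ".join(f'"{name}"' for name in header)
    placeholders = ", ".join("?" for _ in header)
    conn.execute(f'DROP TABLE IF EXISTS "{table}"')
    conn.execute(f'CREATE TABLE "{table}" ({columns})')
    total = 0
    for chunk in iter_chunks(reader, chunk_size):
        conn.executemany(f'INSERT INTO "{table}" VALUES ({placeholders})', chunk)
        conn.commit()
        total += len(chunk)
    return total


# Made-up sample standing in for the taxi CSV
sample = "VendorID,trip_distance\n1,2.5\n2,0.7\n1,10.1\n"
conn = sqlite3.connect(":memory:")
inserted = ingest_csv(sample, conn, "yellow_taxi_trips")
print(f"Inserted {inserted} rows")
```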
Module 3: Data Warehouse (BigQuery)
Create partitioned and clustered table:
```sql
-- Create external table
CREATE OR REPLACE EXTERNAL TABLE `trips_data_all.external_yellow_tripdata`
OPTIONS (
  format = 'CSV',
  uris = ['gs://nyc-tl-data/trip data/yellow_tripdata_2019-*.csv',
          'gs://nyc-tl-data/trip data/yellow_tripdata_2020-*.csv']
);

-- Create partitioned table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned`
PARTITION BY
  DATE(tpep_pickup_datetime) AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;

-- Create partitioned and clustered table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned_clustered`
PARTITION BY DATE(tpep_pickup_datetime)
CLUSTER BY VendorID AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;

-- Query comparison
SELECT DISTINCT(VendorID)
FROM `trips_data_all.yellow_tripdata_partitioned`
WHERE DATE(tpep_pickup_datetime) BETWEEN '2020-06-01' AND '2020-06-30';
-- This query processes less data than the same query on a non-partitioned table
```
Load data from GCS to BigQuery:
```python
# load_to_bq.py
from google.cloud import bigquery
import os

# Set credentials
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'path/to/credentials.json'

# Initialize client
client = bigquery.Client()

# Define table
table_id = 'your-project.trips_data_all.yellow_tripdata'

# Configure load job
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

uri = 'gs://your-bucket/yellow_tripdata_2021-01.parquet'

# Load data
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)

load_job.result()  # Wait for job to complete

print(f"Loaded {load_job.output_rows} rows to {table_id}")
```
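To see why the date filter on the partitioned table reads less data, it can help to model partitions as per-day buckets. The toy sketch below is not how BigQuery is implemented. It only illustrates the idea that a date-range filter lets the engine skip whole partitions, and all names and rows in it are made up.

```python
from collections import defaultdict
from datetime import date


def partition_by_day(rows):
    """Group rows into per-day partitions keyed by pickup date."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["pickup_date"]].append(row)
    return partitions


def scan(partitions, start, end):
    """Return (matching rows, rows read), touching only partitions in [start, end]."""
    matched, rows_read = [], 0
    for day, day_rows in partitions.items():
        if start <= day <= end:
            rows_read += len(day_rows)
            matched.extend(day_rows)
    return matched, rows_read


rows = [
    {"pickup_date": date(2020, 5, 31), "VendorID": 1},
    {"pickup_date": date(2020, 6, 1), "VendorID": 2},
    {"pickup_date": date(2020, 6, 15), "VendorID": 1},
    {"pickup_date": date(2020, 7, 1), "VendorID": 2},
]
partitions = partition_by_day(rows)
matched, rows_read = scan(partitions, date(2020, 6, 1), date(2020, 6, 30))
vendors = sorted({row["VendorID"] for row in matched})
print(f"Read {rows_read} of {len(rows)} rows, vendors: {vendors}")
```

A non-partitioned table would have to read all four rows to answer the same question.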
Module 4: Analytics Engineering (dbt)
Project structure:
```
dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/
│   │   ├── stg_yellow_tripdata.sql
│   │   └── schema.yml
│   └── core/
│       ├── fact_trips.sql
│       └── dim_zones.sql
└── macros/
    └── get_payment_type_description.sql
```
dbt_project.yml:
```yaml
name: 'taxi_rides_ny'
version: '1.0.0'
config-version: 2

profile: 'default'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  taxi_rides_ny:
    staging:
      +materialized: view
    core:
      +materialized: table
```
profiles.yml:
```yaml
default:
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: "{{ env_var('GCP_PROJECT_ID') }}"
      dataset: dbt_dev
      threads: 4
      keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
      location: EU
    prod:
      type: bigquery
      method: service-account
      project: "{{ env_var('GCP_PROJECT_ID') }}"
      dataset: production
      threads: 4
      keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
      location: EU
  target: dev
```
Staging model (models/staging/stg_yellow_tripdata.sql):
```sql
{{ config(materialized='view') }}

with tripdata as
(
  select *,
    row_number() over(partition by vendorid, tpep_pickup_datetime) as rn
  from {{ source('staging','yellow_tripdata') }}
  where vendorid is not null
)
select
    -- identifiers
    {{ dbt_utils.generate_surrogate_key(['vendorid', 'tpep_pickup_datetime']) }} as tripid,
    cast(vendorid as integer) as vendorid,
    cast(ratecodeid as integer) as ratecodeid,
    cast(pulocationid as integer) as pickup_locationid,
    cast(dolocationid as integer) as dropoff_locationid,

    -- timestamps
    cast(tpep_pickup_datetime as timestamp) as pickup_datetime,
    cast(tpep_dropoff_datetime as timestamp) as dropoff_datetime,

    -- trip info
    store_and_fwd_flag,
    cast(passenger_count as integer) as passenger_count,
    cast(trip_distance as numeric) as trip_distance,

    -- payment info
    cast(fare_amount as numeric) as fare_amount,
    cast(extra as numeric) as extra,
    cast(mta_tax as numeric) as mta_tax,
    cast(tip_amount as numeric) as tip_amount,
    cast(tolls_amount as numeric) as tolls_amount,
    cast(improvement_surcharge as numeric) as improvement_surcharge,
    cast(total_amount as numeric) as total_amount,
    cast(payment_type as integer) as payment_type,
    {{ get_payment_type_description('payment_type') }} as payment_type_description
from tripdata
where rn = 1
```
Core model (models/core/fact_trips.sql):
```sql
{{ config(materialized='table') }}

with green_data as (
    select *,
        'Green' as service_type
    from {{ ref('stg_green_tripdata') }}
),
yellow_data as (
    select *,
        'Yellow' as service_type
    from {{ ref('stg_yellow_tripdata') }}
),
trips_unioned as (
    select * from green_data
    union all
    select * from yellow_data
),
dim_zones as (
    select * from {{ ref('dim_zones') }}
    where borough != 'Unknown'
)
select
    trips_unioned.tripid,
    trips_unioned.vendorid,
    trips_unioned.service_type,
    trips_unioned.ratecodeid,
    trips_unioned.pickup_locationid,
    pickup_zone.borough as pickup_borough,
    pickup_zone.zone as pickup_zone,
    trips_unioned.dropoff_locationid,
    dropoff_zone.borough as dropoff_borough,
    dropoff_zone.zone as dropoff_zone,
    trips_unioned.pickup_datetime,
    trips_unioned.dropoff_datetime,
    trips_unioned.store_and_fwd_flag,
    trips_unioned.passenger_count,
    trips_unioned.trip_distance,
    trips_unioned.fare_amount,
    trips_unioned.extra,
    trips_unioned.mta_tax,
    trips_unioned.tip_amount,
    trips_unioned.tolls_amount,
    trips_unioned.total_amount,
    trips_unioned.payment_type,
    trips_unioned.payment_type_description
from trips_unioned
inner join dim_zones as pickup_zone
    on trips_unioned.pickup_locationid = pickup_zone.locationid
inner join dim_zones as dropoff_zone
    on trips_unioned.dropoff_locationid = dropoff_zone.locationid
```
Macro (macros/get_payment_type_description.sql):
```sql
{#
    This macro returns the description of the payment_type
#}

{% macro get_payment_type_description(payment_type) -%}

    case {{ payment_type }}
        when 1 then 'Credit card'
        when 2 then 'Cash'
        when 3 then 'No charge'
        when 4 then 'Dispute'
        when 5 then 'Unknown'
        when 6 then 'Voided trip'
    end

{%- endmacro %}
```
Schema and tests (models/staging/schema.yml):
```yaml
version: 2

sources:
  - name: staging
    database: "{{ env_var('GCP_PROJECT_ID') }}"
    schema: trips_data_all
    tables:
      - name: yellow_tripdata
      - name: green_tripdata

models:
  - name: stg_yellow_tripdata
    description: >
      Trips made by yellow taxis.
    columns:
      - name: tripid
        description: Primary key for this table, generated with a concatenation of vendorid+pickup_datetime
        tests:
          - unique:
              severity: warn
          - not_null:
              severity: warn
      - name: vendorid
        description: >
          A code indicating the TPEP provider that provided the record.
        tests:
          - accepted_values:
              values: [1, 2]
      - name: pickup_datetime
        description: The date and time when the meter was engaged.
        tests:
          - not_null:
              severity: warn
      - name: passenger_count
        description: The number of passengers in the vehicle.
        tests:
          - accepted_values:
              values: [1, 2, 3, 4, 5, 6]
              severity: warn
```
dbt commands:
```bash
# Install dependencies
dbt deps

# Run models
dbt run

# Run specific model
dbt run --select stg_yellow_tripdata

# Test models
dbt test

# Generate documentation
dbt docs generate

# Serve documentation
dbt docs serve

# Run models and tests
dbt build

# Run with specific target
dbt run --target prod
```
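The staging model does two things that are easy to check on a handful of rows: it keeps one record per `(vendorid, tpep_pickup_datetime)` pair, and it maps payment codes to descriptions through the macro. The sketch below mirrors both steps in plain Python. `deduplicate` and `payment_type_description` are hypothetical names, the sample rows are made up, and the sketch keeps the first row per key, whereas `row_number()` without an `ORDER BY` makes no promise about which duplicate survives.

```python
# Same code-to-label mapping as the CASE expression in the macro
PAYMENT_TYPES = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip",
}


def payment_type_description(payment_type):
    """Return the label for a code; unmatched codes give None, like SQL NULL."""
    return PAYMENT_TYPES.get(payment_type)


def deduplicate(rows):
    """Drop rows without a vendorid and keep one row per (vendorid, pickup time)."""
    seen = set()
    result = []
    for row in rows:
        if row["vendorid"] is None:
            continue
        key = (row["vendorid"], row["tpep_pickup_datetime"])
        if key in seen:
            continue
        seen.add(key)
        result.append(row)
    return result


# Made-up sample rows
rows = [
    {"vendorid": 1, "tpep_pickup_datetime": "2021-01-01 00:10:00", "payment_type": 1},
    {"vendorid": 1, "tpep_pickup_datetime": "2021-01-01 00:10:00", "payment_type": 2},
    {"vendorid": None, "tpep_pickup_datetime": "2021-01-01 00:11:00", "payment_type": 1},
    {"vendorid": 2, "tpep_pickup_datetime": "2021-01-01 00:12:00", "payment_type": 9},
]
staged = [
    {**row, "payment_type_description": payment_type_description(row["payment_type"])}
    for row in deduplicate(rows)
]
print(staged)
```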
Module 6: Batch Processing (Spark)
PySpark setup:
```bash
# Download Spark
wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
tar xzfv spark-3.3.2-bin-hadoop3.tgz
rm spark-3.3.2-bin-hadoop3.tgz

# Set environment variables
export SPARK_HOME="${HOME}/spark-3.3.2-bin-hadoop3"
export PATH="${SPARK_HOME}/bin:${PATH}"
```
**PySpark script for data processing:**
```python
# spark_processing.py
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

# Create Spark session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

# Define schema
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

# Read CSV
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')

# Show schema
df.printSchema()

# Repartition and save as parquet
df.repartition(24) \
    .write.parquet('fhvhv/2021/01/', mode='overwrite')

# Read parquet
df = spark.read.parquet('fhvhv/2021/01/')

# SQL transformations
# (createOrReplaceTempView replaces the deprecated registerTempTable)
df.createOrReplaceTempView('fhvhv_2021_01')

spark.sql("""
SELECT
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month,
    COUNT(1) AS number_of_trips
FROM
    fhvhv_2021_01
WHERE
    hvfhs_license_num = 'HV0003'
GROUP BY
    1, 2
ORDER BY
    1, 2
""").show()

# DataFrame API
df_result = df \
    .withColumn('revenue_month', F.date_trunc('month', 'pickup_datetime')) \
    .filter(F.col('hvfhs_license_num') == 'HV0003') \
    .groupBy('PULocationID', 'revenue_month') \
    .agg(F.count('*').alias('number_of_trips')) \
    .orderBy('PULocationID', 'revenue_month')

df_result.show()

# Write results
df_result.coalesce(1).write.parquet('tmp/revenue-zones', mode='overwrite')
```
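The SQL query and the DataFrame chain above compute the same thing: trips per pickup zone and month for one license number. A plain-Python sketch of that aggregation on a few made-up rows can serve as a reference when checking Spark output on a small sample. `trips_per_zone_and_month` is a hypothetical name, and this is not a substitute for Spark on the full dataset.

```python
from collections import Counter
from datetime import datetime


def trips_per_zone_and_month(rows, license_num="HV0003"):
    """Count trips per (PULocationID, month) for one license, sorted like ORDER BY 1, 2."""
    counts = Counter()
    for row in rows:
        if row["hvfhs_license_num"] != license_num:
            continue
        # Truncate the timestamp to the first instant of its month
        month = row["pickup_datetime"].replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        counts[(row["PULocationID"], month)] += 1
    return sorted((zone, month, n) for (zone, month), n in counts.items())


# Made-up sample rows using the column names from the schema above
rows = [
    {"hvfhs_license_num": "HV0003", "pickup_datetime": datetime(2021, 1, 3, 8, 15), "PULocationID": 7},
    {"hvfhs_license_num": "HV0003", "pickup_datetime": datetime(2021, 1, 20, 23, 59), "PULocationID": 7},
    {"hvfhs_license_num": "HV0005", "pickup_datetime": datetime(2021, 1, 5, 12, 0), "PULocationID": 7},
    {"hvfhs_license_num": "HV0003", "pickup_datetime": datetime(2021, 1, 9, 9, 30), "PULocationID": 4},
]
result = trips_per_zone_and_month(rows)
for zone, month, n in result:
    print(zone, month.date(), n)
```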
**Spark with Google Cloud Storage:**
```python
import pyspark
from pyspark.sql import SparkSession

# Configure Spark for GCS
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar") \
    .getOrCreate()

spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile",
                                     "path/to/credentials.json")

# Read from GCS
df_green = spark.read.parquet('gs://your-bucket/pq/green/*/*')

# Write to BigQuery
df_green.write.format('bigquery') \
    .option('table', 'trips_data_all.green_tripdata') \
    .option('temporaryGcsBucket', 'your-temp-bucket') \
    .mode('overwrite') \
    .save()
```

Module 7: Streaming (Kafka)
Docker Compose for Kafka:
```yaml
# docker-compose-kafka.yml
version: '3.6'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
  schema-registry:
    image: confluentinc/cp-schema-registry:7.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.2.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      PORT: 9021
```
```bash
# Start Kafka
docker-compose -f docker-compose-kafka.yml up -d
```
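
The broker advertises two listeners, and each serves a different kind of client. As a rough illustration (the `parse_listeners` helper is hypothetical and exists only for this explanation), the `KAFKA_ADVERTISED_LISTENERS` value can be split to show which address applies where:

```python
def parse_listeners(advertised: str) -> dict:
    """Split 'NAME://host:port,...' into {NAME: (host, port)}."""
    listeners = {}
    for entry in advertised.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the broker service in the compose file
advertised = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
listeners = parse_listeners(advertised)

# Containers on the compose network reach the broker by service name
print("inside Docker:", listeners["PLAINTEXT"])
# Clients running on the host machine use the published port
print("from the host:", listeners["PLAINTEXT_HOST"])
```

This is why the schema registry and control center services point at `broker:29092`, while the Python scripts in this module connect to `localhost:9092`.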

**Python Kafka producer:**
```python
# producer.py
from kafka import KafkaProducer
import json
import time
from datetime import datetime

# Create producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send messages
for i in range(100):
    message = {
        'trip_id': i,
        'vendor_id': 1,
        'pickup_datetime': datetime.now().isoformat(),
        'passenger_count': 1,
        'trip_distance': 5.2
    }
    producer.send('rides', value=message)
    print(f"Sent message {i}")
    time.sleep(1)

# Block until all buffered messages have been delivered
producer.flush()
```

**Python Kafka consumer:**
```python
# consumer.py
from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'rides',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f"Received: {message.value}")
    # Process message
    trip_id = message.value['trip_id']
    trip_distance = message.value['trip_distance']
    print(f"Trip {trip_id}: {trip_distance} miles")
```
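
The producer's `value_serializer` and the consumer's `value_deserializer` are inverses of each other. The sketch below runs that round trip without a broker; the `serialize` and `deserialize` names and the sample message are assumptions made for illustration.

```python
import json


def serialize(value: dict) -> bytes:
    """Same logic as the producer's value_serializer lambda."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes) -> dict:
    """Same logic as the consumer's value_deserializer lambda."""
    return json.loads(raw.decode("utf-8"))


# Hypothetical message shaped like the producer's payload
message = {"trip_id": 0, "vendor_id": 1, "passenger_count": 1, "trip_distance": 5.2}

payload = serialize(message)     # bytes that would be written to the topic
restored = deserialize(payload)  # dict the consumer would see in message.value

print(type(payload).__name__, restored == message)
```

Kafka itself only stores bytes, so any value that is not JSON-serializable (a raw `datetime`, for instance) has to be converted first, which is why the producer calls `.isoformat()` on the pickup time.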

**Kafka Streams example:**
```python
# streams_processing.py
# Note: Kafka Streams itself is a Java library; this script approximates
# a streaming aggregation with kafka-python.
from collections import defaultdict
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json

# Create topics
admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
topic_list = [
    NewTopic(name="rides", num_partitions=2, replication_factor=1),
    NewTopic(name="rides-pulocationid", num_partitions=2, replication_factor=1)
]
try:
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
except Exception as e:
    print(f"Topics might already exist: {e}")

# Stream processing (aggregation)
consumer = KafkaConsumer(
    'rides',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Count rides by location
location_counts = defaultdict(int)
for message in consumer:
    ride = message.value
    location = ride.get('PULocationID', 'unknown')
    location_counts[location] += 1
    # Send aggregated results
    result = {
        'location': location,
        'count': location_counts[location]
    }
    producer.send('rides-pulocationid', value=result)
    print(f"Location {location}: {location_counts[location]} rides")
```
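
The aggregation logic can be checked without a running broker. The following sketch isolates the counting step as a generator; `running_location_counts` and the sample rides are hypothetical names and data used only here.

```python
from collections import defaultdict


def running_location_counts(rides):
    """Yield one result dict per ride, carrying the running count for its location."""
    location_counts = defaultdict(int)
    for ride in rides:
        location = ride.get("PULocationID", "unknown")
        location_counts[location] += 1
        yield {"location": location, "count": location_counts[location]}


# Hypothetical rides; the last one has no PULocationID field
rides = [{"PULocationID": 132}, {"PULocationID": 48}, {"PULocationID": 132}, {"trip_id": 9}]
results = list(running_location_counts(rides))
for result in results:
    print(result)
```

Two consequences follow from this design. The counts live in process memory, so they reset whenever the script restarts. And because the messages sent by `producer.py` carry no `PULocationID` field, feeding them through this loop would count every ride under `'unknown'`.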

Common Workflows
Setting Up Development Environment
```bash
# Clone repository
git clone https://github.com/DataTalksClub/data-engineering-zoomcamp.git
cd data-engineering-zoomcamp

# Set up Python environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt

# Set up GCP credentials
export GOOGLE_APPLICATION_CREDENTIALS="path/to/credentials.json"
export GCP_PROJECT_ID="your-project-id"
```
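
Before running later steps, it can help to confirm that both exports took effect in the current shell. This is a small sketch under the assumption that only these two variables matter; the `missing_settings` helper is hypothetical.

```python
import os


def missing_settings(required, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


REQUIRED = ["GOOGLE_APPLICATION_CREDENTIALS", "GCP_PROJECT_ID"]

missing = missing_settings(REQUIRED)
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("GCP environment looks configured")
```

The check only verifies that the variables are set; it does not validate the key file or the project ID.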

Complete Module Workflow
```bash
# Paths below are relative to the repository root; return there between steps

# 1. Start Docker infrastructure
docker-compose up -d

# 2. Run Terraform
cd 01-docker-terraform/terraform
terraform init
terraform apply

# 3. Ingest data
python ingest_data.py --params...

# 4. Run dbt models
cd 04-analytics-engineering
dbt run
dbt test

# 5. Run Spark job (quote local[*] so the shell does not expand it)
spark-submit \
    --master "local[*]" \
    spark_processing.py

# 6. Clean up
terraform destroy
docker-compose down
```

Homework Submission Pattern
```bash
# Navigate to cohort folder
cd cohorts/2026/01-docker-terraform

# Complete homework in Jupyter notebook
jupyter notebook homework.ipynb

# Export answers
# Submit through course platform
```

Troubleshooting
Docker Issues
Port already in use:
```bash