datatalks-data-engineering-zoomcamp

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

DataTalks Data Engineering Zoomcamp

DataTalks 数据工程训练营

Skill by ara.so — Data Skills collection.
技能来自 ara.so —— 数据技能合集。

Overview

概述

The Data Engineering Zoomcamp is a comprehensive 9-week free course covering production-ready data pipeline development. It includes hands-on modules on containerization (Docker), infrastructure as code (Terraform), workflow orchestration (Kestra), data warehousing (BigQuery), analytics engineering (dbt), data platforms (Bruin), batch processing (Spark), and streaming (Kafka).
The course operates in cohorts (next starts January 2026) but all materials are available for self-paced learning.
数据工程训练营是一门为期9周的综合性免费课程,涵盖可用于生产环境的数据管道开发。课程包含容器化(Docker)、基础设施即代码(Terraform)、工作流编排(Kestra)、数据仓库(BigQuery)、分析工程(dbt)、数据平台(Bruin)、批处理(Spark)和流处理(Kafka)的实操模块。
课程以 cohort 形式开展(下一期将于2026年1月启动),但所有学习材料均支持自主进度学习。

Prerequisites

前置要求

  • Basic coding experience
  • SQL familiarity
  • Python knowledge (helpful but not required)
  • Git installed
  • Docker Desktop or Docker Engine
  • Google Cloud Platform (GCP) account (free tier)
  • 基础编码经验
  • 熟悉SQL
  • 掌握Python(有帮助但非必需)
  • 已安装Git
  • 已安装Docker Desktop或Docker Engine
  • Google Cloud Platform(GCP)账号(免费层级即可)

Course Structure

课程结构

Module 1: Docker & Terraform

模块1:Docker & Terraform

Set up containerized PostgreSQL database:
bash
undefined
搭建容器化PostgreSQL数据库:
bash
undefined

Create network

Create network

docker network create pg-network
docker network create pg-network

Run PostgreSQL

Run PostgreSQL

docker run -d
--name pg-database
--network pg-network
-e POSTGRES_USER=root
-e POSTGRES_PASSWORD=root
-e POSTGRES_DB=ny_taxi
-v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data
-p 5432:5432
postgres:13
docker run -d
--name pg-database
--network pg-network
-e POSTGRES_USER=root
-e POSTGRES_PASSWORD=root
-e POSTGRES_DB=ny_taxi
-v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data
-p 5432:5432
postgres:13

Run pgAdmin

Run pgAdmin

docker run -d
--name pgadmin
--network pg-network
-e PGADMIN_DEFAULT_EMAIL=admin@admin.com
-e PGADMIN_DEFAULT_PASSWORD=root
-p 8080:80
dpage/pgadmin4

**Docker Compose for entire stack:**

```yaml
docker run -d
--name pgadmin
--network pg-network
-e PGADMIN_DEFAULT_EMAIL=admin@admin.com
-e PGADMIN_DEFAULT_PASSWORD=root
-p 8080:80
dpage/pgadmin4

**使用Docker Compose管理整个技术栈:**

```yaml

docker-compose.yaml

docker-compose.yaml

services: pgdatabase: image: postgres:13 environment: - POSTGRES_USER=root - POSTGRES_PASSWORD=root - POSTGRES_DB=ny_taxi volumes: - ./ny_taxi_postgres_data:/var/lib/postgresql/data ports: - "5432:5432"
pgadmin: image: dpage/pgadmin4 environment: - PGADMIN_DEFAULT_EMAIL=admin@admin.com - PGADMIN_DEFAULT_PASSWORD=root ports: - "8080:80"

```bash
services: pgdatabase: image: postgres:13 environment: - POSTGRES_USER=root - POSTGRES_PASSWORD=root - POSTGRES_DB=ny_taxi volumes: - ./ny_taxi_postgres_data:/var/lib/postgresql/data ports: - "5432:5432"
pgadmin: image: dpage/pgadmin4 environment: - PGADMIN_DEFAULT_EMAIL=admin@admin.com - PGADMIN_DEFAULT_PASSWORD=root ports: - "8080:80"

```bash

Start services

Start services

docker-compose up -d
docker-compose up -d

Stop services

Stop services

docker-compose down

**Terraform GCP setup:**

```hcl
docker-compose down

**Terraform GCP配置:**

```hcl

main.tf

main.tf

terraform { required_version = ">= 1.0" backend "local" {} required_providers { google = { source = "hashicorp/google" } } }
provider "google" { project = var.project region = var.region }
terraform { required_version = ">= 1.0" backend "local" {} required_providers { google = { source = "hashicorp/google" } } }
provider "google" { project = var.project region = var.region }

Data Lake Bucket

Data Lake Bucket

resource "google_storage_bucket" "data-lake-bucket" { name = "${local.data_lake_bucket}_${var.project}" location = var.region storage_class = var.storage_class uniform_bucket_level_access = true
versioning { enabled = true }
lifecycle_rule { action { type = "Delete" } condition { age = 30 } }
force_destroy = true }
resource "google_storage_bucket" "data-lake-bucket" { name = "${local.data_lake_bucket}_${var.project}" location = var.region storage_class = var.storage_class uniform_bucket_level_access = true
versioning { enabled = true }
lifecycle_rule { action { type = "Delete" } condition { age = 30 } }
force_destroy = true }

BigQuery Dataset

BigQuery Dataset

resource "google_bigquery_dataset" "dataset" { dataset_id = var.BQ_DATASET project = var.project location = var.region }

```hcl
resource "google_bigquery_dataset" "dataset" { dataset_id = var.BQ_DATASET project = var.project location = var.region }

```hcl

variables.tf

variables.tf

locals { data_lake_bucket = "dtc_data_lake" }
variable "project" { description = "Your GCP Project ID" }
variable "region" { description = "Region for GCP resources" default = "europe-west6" type = string }
variable "storage_class" { description = "Storage class type for your bucket" default = "STANDARD" }
variable "BQ_DATASET" { description = "BigQuery Dataset" type = string default = "trips_data_all" }

```bash
locals { data_lake_bucket = "dtc_data_lake" }
variable "project" { description = "Your GCP Project ID" }
variable "region" { description = "Region for GCP resources" default = "europe-west6" type = string }
variable "storage_class" { description = "Storage class type for your bucket" default = "STANDARD" }
variable "BQ_DATASET" { description = "BigQuery Dataset" type = string default = "trips_data_all" }

```bash

Initialize Terraform

Initialize Terraform

terraform init
terraform init

Plan infrastructure

Plan infrastructure

terraform plan
terraform plan

Apply infrastructure

Apply infrastructure

terraform apply
terraform apply

Destroy infrastructure

Destroy infrastructure

terraform destroy
undefined
terraform destroy
undefined

Module 2: Workflow Orchestration (Kestra)

模块2:工作流编排(Kestra)

Example Kestra workflow for data ingestion:
yaml
undefined
数据 ingestion 示例Kestra工作流:
yaml
undefined

flows/ingest_data.yaml

flows/ingest_data.yaml

id: ingest_ny_taxi_data namespace: zoomcamp
tasks:
  • id: download_data type: io.kestra.core.tasks.scripts.Bash commands:
  • id: python_ingest type: io.kestra.plugin.scripts.python.Script docker: image: python:3.9 script: | import pandas as pd from sqlalchemy import create_engine import os
    df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100000)
    engine = create_engine(os.getenv('POSTGRES_CONNECTION')) df.to_sql('yellow_taxi_data', engine, if_exists='replace', chunksize=10000)
    print(f"Inserted {len(df)} rows")
  • id: log_completion type: io.kestra.core.tasks.log.Log message: "Data ingestion completed successfully"

**Python data ingestion script:**

```python
id: ingest_ny_taxi_data namespace: zoomcamp
tasks:
  • id: download_data type: io.kestra.core.tasks.scripts.Bash commands:
  • id: python_ingest type: io.kestra.plugin.scripts.python.Script docker: image: python:3.9 script: | import pandas as pd from sqlalchemy import create_engine import os
    df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100000)
    engine = create_engine(os.getenv('POSTGRES_CONNECTION')) df.to_sql('yellow_taxi_data', engine, if_exists='replace', chunksize=10000)
    print(f"Inserted {len(df)} rows")
  • id: log_completion type: io.kestra.core.tasks.log.Log message: "Data ingestion completed successfully"

**Python数据 ingestion 脚本:**

```python

ingest_data.py

ingest_data.py

import pandas as pd from sqlalchemy import create_engine import argparse from time import time
def main(params): user = params.user password = params.password host = params.host port = params.port db = params.db table_name = params.table_name url = params.url
# Download CSV
csv_name = 'output.csv'
os.system(f"wget {url} -O {csv_name}")

# Create engine
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

# Read CSV in chunks
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)

df = next(df_iter)

# Convert datetime columns
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

# Create table
df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')

# Insert first chunk
df.to_sql(name=table_name, con=engine, if_exists='append')

# Insert remaining chunks
while True:
    try:
        t_start = time()
        df = next(df_iter)
        
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        
        df.to_sql(name=table_name, con=engine, if_exists='append')
        
        t_end = time()
        print(f'Inserted another chunk, took %.3f seconds' % (t_end - t_start))
    except StopIteration:
        print("Finished ingesting data")
        break
if name == 'main': parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')
parser.add_argument('--user', required=True, help='user name for postgres')
parser.add_argument('--password', required=True, help='password for postgres')
parser.add_argument('--host', required=True, help='host for postgres')
parser.add_argument('--port', required=True, help='port for postgres')
parser.add_argument('--db', required=True, help='database name for postgres')
parser.add_argument('--table_name', required=True, help='name of the table')
parser.add_argument('--url', required=True, help='url of the csv file')

args = parser.parse_args()

main(args)

```bash
import pandas as pd from sqlalchemy import create_engine import argparse from time import time
def main(params): user = params.user password = params.password host = params.host port = params.port db = params.db table_name = params.table_name url = params.url
# Download CSV
csv_name = 'output.csv'
os.system(f"wget {url} -O {csv_name}")

# Create engine
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

# Read CSV in chunks
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)

df = next(df_iter)

# Convert datetime columns
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

# Create table
df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')

# Insert first chunk
df.to_sql(name=table_name, con=engine, if_exists='append')

# Insert remaining chunks
while True:
    try:
        t_start = time()
        df = next(df_iter)
        
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        
        df.to_sql(name=table_name, con=engine, if_exists='append')
        
        t_end = time()
        print(f'Inserted another chunk, took %.3f seconds' % (t_end - t_start))
    except StopIteration:
        print("Finished ingesting data")
        break
if name == 'main': parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')
parser.add_argument('--user', required=True, help='user name for postgres')
parser.add_argument('--password', required=True, help='password for postgres')
parser.add_argument('--host', required=True, help='host for postgres')
parser.add_argument('--port', required=True, help='port for postgres')
parser.add_argument('--db', required=True, help='database name for postgres')
parser.add_argument('--table_name', required=True, help='name of the table')
parser.add_argument('--url', required=True, help='url of the csv file')

args = parser.parse_args()

main(args)

```bash

Run ingestion script

Run ingestion script

python ingest_data.py
--user=root
--password=root
--host=localhost
--port=5432
--db=ny_taxi
--table_name=yellow_taxi_trips
--url=https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
undefined
python ingest_data.py
--user=root
--password=root
--host=localhost
--port=5432
--db=ny_taxi
--table_name=yellow_taxi_trips
--url=https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
undefined

Module 3: Data Warehouse (BigQuery)

模块3:数据仓库(BigQuery)

Create partitioned and clustered table:
sql
-- Create external table
CREATE OR REPLACE EXTERNAL TABLE `trips_data_all.external_yellow_tripdata`
OPTIONS (
  format = 'CSV',
  uris = ['gs://nyc-tl-data/trip data/yellow_tripdata_2019-*.csv', 
          'gs://nyc-tl-data/trip data/yellow_tripdata_2020-*.csv']
);

-- Create partitioned table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned`
PARTITION BY
  DATE(tpep_pickup_datetime) AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;

-- Create partitioned and clustered table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned_clustered`
PARTITION BY DATE(tpep_pickup_datetime)
CLUSTER BY VendorID AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;

-- Query comparison
SELECT DISTINCT(VendorID)
FROM `trips_data_all.yellow_tripdata_partitioned`
WHERE DATE(tpep_pickup_datetime) BETWEEN '2020-06-01' AND '2020-06-30';
-- This query will process less data vs non-partitioned
Load data from GCS to BigQuery:
python
undefined
创建分区和聚类表:
sql
-- Create external table
CREATE OR REPLACE EXTERNAL TABLE `trips_data_all.external_yellow_tripdata`
OPTIONS (
  format = 'CSV',
  uris = ['gs://nyc-tl-data/trip data/yellow_tripdata_2019-*.csv', 
          'gs://nyc-tl-data/trip data/yellow_tripdata_2020-*.csv']
);

-- Create partitioned table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned`
PARTITION BY
  DATE(tpep_pickup_datetime) AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;

-- Create partitioned and clustered table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned_clustered`
PARTITION BY DATE(tpep_pickup_datetime)
CLUSTER BY VendorID AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;

-- Query comparison
SELECT DISTINCT(VendorID)
FROM `trips_data_all.yellow_tripdata_partitioned`
WHERE DATE(tpep_pickup_datetime) BETWEEN '2020-06-01' AND '2020-06-30';
-- This query will process less data vs non-partitioned
从GCS加载数据到BigQuery:
python
undefined

load_to_bq.py

load_to_bq.py

from google.cloud import bigquery import os
from google.cloud import bigquery import os

Set credentials

Set credentials

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'path/to/credentials.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'path/to/credentials.json'

Initialize client

Initialize client

client = bigquery.Client()
client = bigquery.Client()

Define table

Define table

table_id = 'your-project.trips_data_all.yellow_tripdata'
table_id = 'your-project.trips_data_all.yellow_tripdata'

Configure load job

Configure load job

job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.PARQUET, write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, )
uri = 'gs://your-bucket/yellow_tripdata_2021-01.parquet'
job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.PARQUET, write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, )
uri = 'gs://your-bucket/yellow_tripdata_2021-01.parquet'

Load data

Load data

load_job = client.load_table_from_uri( uri, table_id, job_config=job_config )
load_job.result() # Wait for job to complete
print(f"Loaded {load_job.output_rows} rows to {table_id}")
undefined
load_job = client.load_table_from_uri( uri, table_id, job_config=job_config )
load_job.result() # Wait for job to complete
print(f"Loaded {load_job.output_rows} rows to {table_id}")
undefined

Module 4: Analytics Engineering (dbt)

模块4:分析工程(dbt)

Project structure:
dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/
│   │   ├── stg_yellow_tripdata.sql
│   │   └── schema.yml
│   └── core/
│       ├── fact_trips.sql
│       └── dim_zones.sql
└── macros/
    └── get_payment_type_description.sql
dbt_project.yml:
yaml
name: 'taxi_rides_ny'
version: '1.0.0'
config-version: 2

profile: 'default'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  taxi_rides_ny:
    staging:
      +materialized: view
    core:
      +materialized: table
profiles.yml:
yaml
default:
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: "{{ env_var('GCP_PROJECT_ID') }}"
      dataset: dbt_dev
      threads: 4
      keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
      location: EU
      
    prod:
      type: bigquery
      method: service-account
      project: "{{ env_var('GCP_PROJECT_ID') }}"
      dataset: production
      threads: 4
      keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
      location: EU
      
  target: dev
Staging model (models/staging/stg_yellow_tripdata.sql):
sql
{{ config(materialized='view') }}

with tripdata as 
(
  select *,
    row_number() over(partition by vendorid, tpep_pickup_datetime) as rn
  from {{ source('staging','yellow_tripdata') }}
  where vendorid is not null 
)
select
    -- identifiers
    {{ dbt_utils.generate_surrogate_key(['vendorid', 'tpep_pickup_datetime']) }} as tripid,
    cast(vendorid as integer) as vendorid,
    cast(ratecodeid as integer) as ratecodeid,
    cast(pulocationid as integer) as pickup_locationid,
    cast(dolocationid as integer) as dropoff_locationid,
    
    -- timestamps
    cast(tpep_pickup_datetime as timestamp) as pickup_datetime,
    cast(tpep_dropoff_datetime as timestamp) as dropoff_datetime,
    
    -- trip info
    store_and_fwd_flag,
    cast(passenger_count as integer) as passenger_count,
    cast(trip_distance as numeric) as trip_distance,
    
    -- payment info
    cast(fare_amount as numeric) as fare_amount,
    cast(extra as numeric) as extra,
    cast(mta_tax as numeric) as mta_tax,
    cast(tip_amount as numeric) as tip_amount,
    cast(tolls_amount as numeric) as tolls_amount,
    cast(improvement_surcharge as numeric) as improvement_surcharge,
    cast(total_amount as numeric) as total_amount,
    cast(payment_type as integer) as payment_type,
    {{ get_payment_type_description('payment_type') }} as payment_type_description
from tripdata
where rn = 1
Core model (models/core/fact_trips.sql):
sql
{{ config(materialized='table') }}

with green_data as (
    select *, 
        'Green' as service_type 
    from {{ ref('stg_green_tripdata') }}
), 

yellow_data as (
    select *, 
        'Yellow' as service_type
    from {{ ref('stg_yellow_tripdata') }}
), 

trips_unioned as (
    select * from green_data
    union all
    select * from yellow_data
), 

dim_zones as (
    select * from {{ ref('dim_zones') }}
    where borough != 'Unknown'
)
select 
    trips_unioned.tripid, 
    trips_unioned.vendorid, 
    trips_unioned.service_type,
    trips_unioned.ratecodeid, 
    trips_unioned.pickup_locationid, 
    pickup_zone.borough as pickup_borough, 
    pickup_zone.zone as pickup_zone, 
    trips_unioned.dropoff_locationid,
    dropoff_zone.borough as dropoff_borough, 
    dropoff_zone.zone as dropoff_zone,  
    trips_unioned.pickup_datetime, 
    trips_unioned.dropoff_datetime, 
    trips_unioned.store_and_fwd_flag, 
    trips_unioned.passenger_count, 
    trips_unioned.trip_distance, 
    trips_unioned.fare_amount, 
    trips_unioned.extra, 
    trips_unioned.mta_tax, 
    trips_unioned.tip_amount, 
    trips_unioned.tolls_amount, 
    trips_unioned.total_amount, 
    trips_unioned.payment_type, 
    trips_unioned.payment_type_description
from trips_unioned
inner join dim_zones as pickup_zone
on trips_unioned.pickup_locationid = pickup_zone.locationid
inner join dim_zones as dropoff_zone
on trips_unioned.dropoff_locationid = dropoff_zone.locationid
Macro (macros/get_payment_type_description.sql):
sql
{#
    This macro returns the description of the payment_type 
#}

{% macro get_payment_type_description(payment_type) -%}

    case {{ payment_type }}
        when 1 then 'Credit card'
        when 2 then 'Cash'
        when 3 then 'No charge'
        when 4 then 'Dispute'
        when 5 then 'Unknown'
        when 6 then 'Voided trip'
    end

{%- endmacro %}
Schema and tests (models/staging/schema.yml):
yaml
version: 2

sources:
  - name: staging
    database: "{{ env_var('GCP_PROJECT_ID') }}"
    schema: trips_data_all

    tables:
      - name: yellow_tripdata
      - name: green_tripdata

models:
  - name: stg_yellow_tripdata
    description: >
      Trip made by yellow taxis.
    columns:
      - name: tripid
        description: Primary key for this table, generated with a concatenation of vendorid+pickup_datetime
        tests:
          - unique:
              severity: warn
          - not_null:
              severity: warn
      - name: vendorid
        description: >
          A code indicating the TPEP provider that provided the record.
        tests:
          - accepted_values:
              values: [1, 2]
      - name: pickup_datetime
        description: The date and time when the meter was engaged.
        tests:
          - not_null:
              severity: warn
      - name: passenger_count
        description: The number of passengers in the vehicle.
        tests:
          - accepted_values:
              values: [1, 2, 3, 4, 5, 6]
              severity: warn
dbt commands:
bash
undefined
项目结构:
dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/
│   │   ├── stg_yellow_tripdata.sql
│   │   └── schema.yml
│   └── core/
│       ├── fact_trips.sql
│       └── dim_zones.sql
└── macros/
    └── get_payment_type_description.sql
dbt_project.yml:
yaml
name: 'taxi_rides_ny'
version: '1.0.0'
config-version: 2

profile: 'default'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  taxi_rides_ny:
    staging:
      +materialized: view
    core:
      +materialized: table
profiles.yml:
yaml
default:
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: "{{ env_var('GCP_PROJECT_ID') }}"
      dataset: dbt_dev
      threads: 4
      keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
      location: EU
      
    prod:
      type: bigquery
      method: service-account
      project: "{{ env_var('GCP_PROJECT_ID') }}"
      dataset: production
      threads: 4
      keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
      location: EU
      
  target: dev
Staging模型(models/staging/stg_yellow_tripdata.sql):
sql
{{ config(materialized='view') }}

with tripdata as 
(
  select *,
    row_number() over(partition by vendorid, tpep_pickup_datetime) as rn
  from {{ source('staging','yellow_tripdata') }}
  where vendorid is not null 
)
select
    -- identifiers
    {{ dbt_utils.generate_surrogate_key(['vendorid', 'tpep_pickup_datetime']) }} as tripid,
    cast(vendorid as integer) as vendorid,
    cast(ratecodeid as integer) as ratecodeid,
    cast(pulocationid as integer) as pickup_locationid,
    cast(dolocationid as integer) as dropoff_locationid,
    
    -- timestamps
    cast(tpep_pickup_datetime as timestamp) as pickup_datetime,
    cast(tpep_dropoff_datetime as timestamp) as dropoff_datetime,
    
    -- trip info
    store_and_fwd_flag,
    cast(passenger_count as integer) as passenger_count,
    cast(trip_distance as numeric) as trip_distance,
    
    -- payment info
    cast(fare_amount as numeric) as fare_amount,
    cast(extra as numeric) as extra,
    cast(mta_tax as numeric) as mta_tax,
    cast(tip_amount as numeric) as tip_amount,
    cast(tolls_amount as numeric) as tolls_amount,
    cast(improvement_surcharge as numeric) as improvement_surcharge,
    cast(total_amount as numeric) as total_amount,
    cast(payment_type as integer) as payment_type,
    {{ get_payment_type_description('payment_type') }} as payment_type_description
from tripdata
where rn = 1
Core模型(models/core/fact_trips.sql):
sql
{{ config(materialized='table') }}

with green_data as (
    select *, 
        'Green' as service_type 
    from {{ ref('stg_green_tripdata') }}
), 

yellow_data as (
    select *, 
        'Yellow' as service_type
    from {{ ref('stg_yellow_tripdata') }}
), 

trips_unioned as (
    select * from green_data
    union all
    select * from yellow_data
), 

dim_zones as (
    select * from {{ ref('dim_zones') }}
    where borough != 'Unknown'
)
select 
    trips_unioned.tripid, 
    trips_unioned.vendorid, 
    trips_unioned.service_type,
    trips_unioned.ratecodeid, 
    trips_unioned.pickup_locationid, 
    pickup_zone.borough as pickup_borough, 
    pickup_zone.zone as pickup_zone, 
    trips_unioned.dropoff_locationid,
    dropoff_zone.borough as dropoff_borough, 
    dropoff_zone.zone as dropoff_zone,  
    trips_unioned.pickup_datetime, 
    trips_unioned.dropoff_datetime, 
    trips_unioned.store_and_fwd_flag, 
    trips_unioned.passenger_count, 
    trips_unioned.trip_distance, 
    trips_unioned.fare_amount, 
    trips_unioned.extra, 
    trips_unioned.mta_tax, 
    trips_unioned.tip_amount, 
    trips_unioned.tolls_amount, 
    trips_unioned.total_amount, 
    trips_unioned.payment_type, 
    trips_unioned.payment_type_description
from trips_unioned
inner join dim_zones as pickup_zone
on trips_unioned.pickup_locationid = pickup_zone.locationid
inner join dim_zones as dropoff_zone
on trips_unioned.dropoff_locationid = dropoff_zone.locationid
宏(macros/get_payment_type_description.sql):
sql
{#
    This macro returns the description of the payment_type 
#}

{% macro get_payment_type_description(payment_type) -%}

    case {{ payment_type }}
        when 1 then 'Credit card'
        when 2 then 'Cash'
        when 3 then 'No charge'
        when 4 then 'Dispute'
        when 5 then 'Unknown'
        when 6 then 'Voided trip'
    end

{%- endmacro %}
Schema和测试(models/staging/schema.yml):
yaml
version: 2

sources:
  - name: staging
    database: "{{ env_var('GCP_PROJECT_ID') }}"
    schema: trips_data_all

    tables:
      - name: yellow_tripdata
      - name: green_tripdata

models:
  - name: stg_yellow_tripdata
    description: >
      Trip made by yellow taxis.
    columns:
      - name: tripid
        description: Primary key for this table, generated with a concatenation of vendorid+pickup_datetime
        tests:
          - unique:
              severity: warn
          - not_null:
              severity: warn
      - name: vendorid
        description: >
          A code indicating the TPEP provider that provided the record.
        tests:
          - accepted_values:
              values: [1, 2]
      - name: pickup_datetime
        description: The date and time when the meter was engaged.
        tests:
          - not_null:
              severity: warn
      - name: passenger_count
        description: The number of passengers in the vehicle.
        tests:
          - accepted_values:
              values: [1, 2, 3, 4, 5, 6]
              severity: warn
dbt命令:
bash
undefined

Install dependencies

Install dependencies

dbt deps
dbt deps

Run models

Run models

dbt run
dbt run

Run specific model

Run specific model

dbt run --select stg_yellow_tripdata
dbt run --select stg_yellow_tripdata

Test models

Test models

dbt test
dbt test

Generate documentation

Generate documentation

dbt docs generate
dbt docs generate

Serve documentation

Serve documentation

dbt docs serve
dbt docs serve

Run models and tests

Run models and tests

dbt build
dbt build

Run with specific target

Run with specific target

dbt run --target prod
undefined
dbt run --target prod
undefined

Module 6: Batch Processing (Spark)

模块6:批处理(Spark)

PySpark setup:
python
undefined
PySpark配置:
python
undefined

Download Spark

Download Spark

wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz tar xzfv spark-3.3.2-bin-hadoop3.tgz rm spark-3.3.2-bin-hadoop3.tgz
wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz tar xzfv spark-3.3.2-bin-hadoop3.tgz rm spark-3.3.2-bin-hadoop3.tgz

Set environment variables

Set environment variables

export SPARK_HOME="${HOME}/spark-3.3.2-bin-hadoop3" export PATH="${SPARK_HOME}/bin:${PATH}"

**PySpark script for data processing:**

```python
export SPARK_HOME="${HOME}/spark-3.3.2-bin-hadoop3" export PATH="${SPARK_HOME}/bin:${PATH}"

**PySpark数据处理脚本:**

```python

spark_processing.py

spark_processing.py

import pyspark from pyspark.sql import SparkSession from pyspark.sql import types from pyspark.sql import functions as F
import pyspark from pyspark.sql import SparkSession from pyspark.sql import types from pyspark.sql import functions as F

Create Spark session

Create Spark session

spark = SparkSession.builder
.master("local[*]")
.appName('test')
.getOrCreate()
spark = SparkSession.builder
.master("local[*]")
.appName('test')
.getOrCreate()

Define schema

Define schema

schema = types.StructType([ types.StructField('hvfhs_license_num', types.StringType(), True), types.StructField('dispatching_base_num', types.StringType(), True), types.StructField('pickup_datetime', types.TimestampType(), True), types.StructField('dropoff_datetime', types.TimestampType(), True), types.StructField('PULocationID', types.IntegerType(), True), types.StructField('DOLocationID', types.IntegerType(), True), types.StructField('SR_Flag', types.StringType(), True) ])
schema = types.StructType([ types.StructField('hvfhs_license_num', types.StringType(), True), types.StructField('dispatching_base_num', types.StringType(), True), types.StructField('pickup_datetime', types.TimestampType(), True), types.StructField('dropoff_datetime', types.TimestampType(), True), types.StructField('PULocationID', types.IntegerType(), True), types.StructField('DOLocationID', types.IntegerType(), True), types.StructField('SR_Flag', types.StringType(), True) ])

Read CSV

Read CSV

df = spark.read
.option("header", "true")
.schema(schema)
.csv('fhvhv_tripdata_2021-01.csv')
df = spark.read
.option("header", "true")
.schema(schema)
.csv('fhvhv_tripdata_2021-01.csv')

Show schema

Show schema

df.printSchema()
df.printSchema()

Repartition and save as parquet

Repartition and save as parquet

df.repartition(24)
.write.parquet('fhvhv/2021/01/', mode='overwrite')
df.repartition(24)
.write.parquet('fhvhv/2021/01/', mode='overwrite')

Read parquet

Read parquet

df = spark.read.parquet('fhvhv/2021/01/')
df = spark.read.parquet('fhvhv/2021/01/')

SQL transformations

SQL transformations

df.registerTempTable('fhvhv_2021_01')
spark.sql(""" SELECT PULocationID AS revenue_zone, date_trunc('month', pickup_datetime) AS revenue_month, COUNT(1) AS number_of_trips FROM fhvhv_2021_01 WHERE hvfhs_license_num = 'HV0003' GROUP BY 1, 2 ORDER BY 1, 2 """).show()
df.registerTempTable('fhvhv_2021_01')
spark.sql(""" SELECT PULocationID AS revenue_zone, date_trunc('month', pickup_datetime) AS revenue_month, COUNT(1) AS number_of_trips FROM fhvhv_2021_01 WHERE hvfhs_license_num = 'HV0003' GROUP BY 1, 2 ORDER BY 1, 2 """).show()

DataFrame API

DataFrame API

df_result = df
.withColumn('revenue_month', F.date_trunc('month', 'pickup_datetime'))
.filter(F.col('hvfhs_license_num') == 'HV0003')
.groupBy('PULocationID', 'revenue_month')
.agg(F.count('*').alias('number_of_trips'))
.orderBy('PULocationID', 'revenue_month')
df_result.show()
df_result = df
.withColumn('revenue_month', F.date_trunc('month', 'pickup_datetime'))
.filter(F.col('hvfhs_license_num') == 'HV0003')
.groupBy('PULocationID', 'revenue_month')
.agg(F.count('*').alias('number_of_trips'))
.orderBy('PULocationID', 'revenue_month')
df_result.show()

Write results

Write results

df_result.coalesce(1).write.parquet('tmp/revenue-zones', mode='overwrite')

**Spark with Google Cloud Storage:**

```python
import pyspark
from pyspark.sql import SparkSession
df_result.coalesce(1).write.parquet('tmp/revenue-zones', mode='overwrite')

**Spark对接Google Cloud Storage:**

```python
import pyspark
from pyspark.sql import SparkSession

Configure Spark for GCS

Configure Spark for GCS

spark = SparkSession.builder
.master("local[*]")
.appName('test')
.config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar")
.getOrCreate()
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", "path/to/credentials.json")
spark = SparkSession.builder
.master("local[*]")
.appName('test')
.config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar")
.getOrCreate()
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", "path/to/credentials.json")

Read from GCS

Read from GCS

df_green = spark.read.parquet('gs://your-bucket/pq/green//')
df_green = spark.read.parquet('gs://your-bucket/pq/green//')

Write to BigQuery

Write to BigQuery

df_green.write.format('bigquery')
.option('table', 'trips_data_all.green_tripdata')
.option('temporaryGcsBucket', 'your-temp-bucket')
.mode('overwrite')
.save()
undefined
df_green.write.format('bigquery')
.option('table', 'trips_data_all.green_tripdata')
.option('temporaryGcsBucket', 'your-temp-bucket')
.mode('overwrite')
.save()
undefined

Module 7: Streaming (Kafka)

模块7:流处理(Kafka)

Docker Compose for Kafka:
yaml
undefined
Kafka的Docker Compose配置:
yaml
undefined

docker-compose-kafka.yml

docker-compose-kafka.yml

version: '3.6' services: zookeeper: image: confluentinc/cp-zookeeper:7.2.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000
broker: image: confluentinc/cp-kafka:7.2.0 hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
schema-registry: image: confluentinc/cp-schema-registry:7.2.0 hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
control-center: image: confluentinc/cp-enterprise-control-center:7.2.0 hostname: control-center container_name: control-center depends_on: - broker - schema-registry ports: - "9021:9021" environment: CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONTROL_CENTER_REPLICATION_FACTOR: 1 PORT: 9021

```bash
version: '3.6' services: zookeeper: image: confluentinc/cp-zookeeper:7.2.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000
broker: image: confluentinc/cp-kafka:7.2.0 hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
schema-registry: image: confluentinc/cp-schema-registry:7.2.0 hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
control-center: image: confluentinc/cp-enterprise-control-center:7.2.0 hostname: control-center container_name: control-center depends_on: - broker - schema-registry ports: - "9021:9021" environment: CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONTROL_CENTER_REPLICATION_FACTOR: 1 PORT: 9021

```bash

Start Kafka

Start Kafka

docker-compose -f docker-compose-kafka.yml up -d

**Python Kafka producer:**

```python
docker-compose -f docker-compose-kafka.yml up -d

**Python Kafka生产者:**

```python

producer.py

producer.py

from kafka import KafkaProducer import json import time from datetime import datetime
from kafka import KafkaProducer import json import time from datetime import datetime

Create producer

Create producer

producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') )
producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') )

Send messages

Send messages

for i in range(100): message = { 'trip_id': i, 'vendor_id': 1, 'pickup_datetime': datetime.now().isoformat(), 'passenger_count': 1, 'trip_distance': 5.2 }
producer.send('rides', value=message)
print(f"Sent message {i}")
time.sleep(1)
producer.flush()

**Python Kafka consumer:**

```python
for i in range(100): message = { 'trip_id': i, 'vendor_id': 1, 'pickup_datetime': datetime.now().isoformat(), 'passenger_count': 1, 'trip_distance': 5.2 }
producer.send('rides', value=message)
print(f"Sent message {i}")
time.sleep(1)
producer.flush()

**Python Kafka消费者:**

```python

consumer.py

consumer.py

from kafka import KafkaConsumer import json
from kafka import KafkaConsumer import json

Create consumer

Create consumer

consumer = KafkaConsumer( 'rides', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) )
consumer = KafkaConsumer( 'rides', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) )

Consume messages

Consume messages

for message in consumer: print(f"Received: {message.value}")
# Process message
trip_id = message.value['trip_id']
trip_distance = message.value['trip_distance']
print(f"Trip {trip_id}: {trip_distance} miles")

**Kafka Streams example:**

```python
for message in consumer: print(f"Received: {message.value}")
# Process message
trip_id = message.value['trip_id']
trip_distance = message.value['trip_distance']
print(f"Trip {trip_id}: {trip_distance} miles")

**Kafka Streams示例:**

```python

streams_processing.py

streams_processing.py

from kafka import KafkaProducer, KafkaConsumer from kafka.admin import KafkaAdminClient, NewTopic import json
from kafka import KafkaProducer, KafkaConsumer from kafka.admin import KafkaAdminClient, NewTopic import json

Create topics

Create topics

admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
topic_list = [ NewTopic(name="rides", num_partitions=2, replication_factor=1), NewTopic(name="rides-pulocationid", num_partitions=2, replication_factor=1) ]
try: admin_client.create_topics(new_topics=topic_list, validate_only=False) except Exception as e: print(f"Topics might already exist: {e}")
admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
topic_list = [ NewTopic(name="rides", num_partitions=2, replication_factor=1), NewTopic(name="rides-pulocationid", num_partitions=2, replication_factor=1) ]
try: admin_client.create_topics(new_topics=topic_list, validate_only=False) except Exception as e: print(f"Topics might already exist: {e}")

Stream processing (aggregation)

Stream processing (aggregation)

from collections import defaultdict
consumer = KafkaConsumer( 'rides', bootstrap_servers=['localhost:9092'], value_deserializer=lambda x: json.loads(x.decode('utf-8')) )
producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') )
from collections import defaultdict
consumer = KafkaConsumer( 'rides', bootstrap_servers=['localhost:9092'], value_deserializer=lambda x: json.loads(x.decode('utf-8')) )
producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') )

Count rides by location

Count rides by location

location_counts = defaultdict(int)
for message in consumer: ride = message.value location = ride.get('PULocationID', 'unknown') location_counts[location] += 1
# Send aggregated results
result = {
    'location': location,
    'count': location_counts[location]
}

producer.send('rides-pulocationid', value=result)
print(f"Location {location}: {location_counts[location]} rides")
undefined
location_counts = defaultdict(int)
for message in consumer: ride = message.value location = ride.get('PULocationID', 'unknown') location_counts[location] += 1
# Send aggregated results
result = {
    'location': location,
    'count': location_counts[location]
}

producer.send('rides-pulocationid', value=result)
print(f"Location {location}: {location_counts[location]} rides")
undefined

Common Workflows

通用工作流

Setting Up Development Environment

搭建开发环境

bash
undefined
bash
undefined

Clone repository

Clone repository

Set up Python environment

Set up Python environment

python -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate pip install -r requirements.txt
python -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate pip install -r requirements.txt

Set up GCP credentials

Set up GCP credentials

export GOOGLE_APPLICATION_CREDENTIALS="path/to/credentials.json" export GCP_PROJECT_ID="your-project-id"
undefined
export GOOGLE_APPLICATION_CREDENTIALS="path/to/credentials.json" export GCP_PROJECT_ID="your-project-id"
undefined

Complete Module Workflow

完整模块工作流

bash
undefined
bash
undefined

1. Start Docker infrastructure

1. Start Docker infrastructure

docker-compose up -d
docker-compose up -d

2. Run Terraform

2. Run Terraform

cd 01-docker-terraform/terraform terraform init terraform apply
cd 01-docker-terraform/terraform terraform init terraform apply

3. Ingest data

3. Ingest data

python ingest_data.py --params...
python ingest_data.py --params...

4. Run dbt models

4. Run dbt models

cd 04-analytics-engineering dbt run dbt test
cd 04-analytics-engineering dbt run dbt test

5. Run Spark job

5. Run Spark job

spark-submit
--master local[*]
spark_processing.py
spark-submit
--master local[*]
spark_processing.py

6. Clean up

6. Clean up

terraform destroy docker-compose down
undefined
terraform destroy docker-compose down
undefined

Homework Submission Pattern

作业提交流程

bash
undefined
bash
undefined

Navigate to cohort folder

Navigate to cohort folder

cd cohorts/2026/01-docker-terraform
cd cohorts/2026/01-docker-terraform

Complete homework in Jupyter notebook

Complete homework in Jupyter notebook

jupyter notebook homework.ipynb
jupyter notebook homework.ipynb

Export answers

Export answers

Submit through course platform

Submit through course platform

undefined
undefined

Troubleshooting

故障排查

Docker Issues

Docker问题

Port already in use:
bash
undefined
端口已被占用:
bash
undefined

Find process using

Find process using