harvard-art-museums-etl-pipeline

Harvard Art Museums ETL Pipeline Skill

Skill by ara.so — Data Skills collection.

Overview

The Harvard Artifacts Collection Data Engineering Analytics App is an end-to-end data pipeline that demonstrates:
  • ETL Operations: Extract data from Harvard Art Museums API, transform nested JSON, load into SQL
  • Database Design: Relational schema with artifact metadata, media, and color tables
  • Analytics: Pre-built SQL queries for insights
  • Visualization: Interactive Streamlit dashboards with Plotly charts
Architecture:
API → ETL → SQL → Analytics → Visualization

Installation

```bash
# Clone the repository
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

# Install dependencies
pip install -r requirements.txt

# Set environment variables
export HARVARD_API_KEY="your_api_key_here"
export DB_HOST="your_database_host"
export DB_USER="your_database_user"
export DB_PASSWORD="your_database_password"
export DB_NAME="your_database_name"
```

**Required packages**:
```
streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenv
```

Harvard Art Museums API Integration

API Configuration

```python
import os

import requests

# API endpoint and authentication
API_BASE_URL = "https://api.harvardartmuseums.org"
API_KEY = os.getenv("HARVARD_API_KEY")


def fetch_artifacts(page=1, size=100):
    """
    Fetch artifacts from Harvard Art Museums API with pagination
    """
    url = f"{API_BASE_URL}/object"
    params = {
        "apikey": API_KEY,
        "page": page,
        "size": size,
        "hasimage": 1  # Only artifacts with images
    }
    # A timeout keeps a stalled connection from hanging the pipeline
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()


# Fetch multiple pages
def collect_artifacts(num_pages=5):
    all_artifacts = []
    for page in range(1, num_pages + 1):
        data = fetch_artifacts(page=page)
        all_artifacts.extend(data.get("records", []))
        print(f"Fetched page {page}, total artifacts: {len(all_artifacts)}")
    return all_artifacts
```
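`collect_artifacts` requests a fixed number of pages. As a hedged sketch, the loop can instead stop once the API reports that no pages remain. This assumes the response's `info` object carries a `pages` count; `collect_until_exhausted` and the injected `fetch_page` callable are hypothetical names, and the stub response is invented for illustration.

```python
def collect_until_exhausted(fetch_page, max_pages=50):
    """Collect records page by page until the API reports no more pages.

    fetch_page: callable taking page=<int> and returning the decoded JSON dict
                (for example, a function like fetch_artifacts).
    max_pages:  hard cap so an unexpected response cannot loop forever.
    """
    all_records = []
    for page in range(1, max_pages + 1):
        data = fetch_page(page=page)
        records = data.get("records", [])
        if not records:
            break  # Empty page: nothing more to read
        all_records.extend(records)
        # Assumption: info["pages"] is the total page count for this query
        total_pages = data.get("info", {}).get("pages")
        if total_pages is not None and page >= total_pages:
            break
    return all_records


# Stub standing in for the real API: two pages of fake records
def fake_fetch(page):
    pages = {1: [{"objectid": 1}, {"objectid": 2}], 2: [{"objectid": 3}]}
    return {"info": {"pages": 2}, "records": pages.get(page, [])}


records = collect_until_exhausted(fake_fetch)
print(len(records))  # 3
```

Passing the fetch function in as an argument keeps the loop testable without network access.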

Rate Limiting and Error Handling

```python
import time
from requests.exceptions import RequestException

def fetch_with_retry(page, max_retries=3):
    """
    Fetch data with retry logic and rate limiting
    """
    for attempt in range(max_retries):
        try:
            time.sleep(0.5)  # Rate limiting
            return fetch_artifacts(page=page)
        except RequestException:
            if attempt == max_retries - 1:
                raise
            print(f"Retry {attempt + 1} for page {page}")
            time.sleep(2 ** attempt)  # Exponential backoff
```
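With `max_retries=3`, the `2 ** attempt` backoff waits 1 second after the first failure and 2 seconds after the second; the third failure is re-raised. The sketch below restates the same loop with the sleep function passed in, so the schedule can be checked without actually waiting. `call_with_retry`, `flaky`, and the `sleep` parameter are hypothetical names, and it catches any `Exception` rather than only `RequestException` to stay dependency-free.

```python
import time


def call_with_retry(func, max_retries=3, sleep=time.sleep):
    """Call func(), retrying with exponential backoff on any exception."""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of retries: surface the last error
            sleep(2 ** attempt)  # 1, 2, 4, ... seconds


# Demo: a function that fails twice, then succeeds
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


waits = []
result = call_with_retry(flaky, sleep=waits.append)  # Record waits instead of sleeping
print(result, waits)  # ok [1, 2]
```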

ETL Pipeline

Extract

```python
def extract_artifact_data(artifact):
    """
    Extract key fields from artifact JSON
    """
    return {
        "object_id": artifact.get("objectid"),
        "title": artifact.get("title", "Unknown"),
        "culture": artifact.get("culture"),
        "century": artifact.get("century"),
        "classification": artifact.get("classification"),
        "department": artifact.get("department"),
        "dated": artifact.get("dated"),
        "medium": artifact.get("medium"),
        "dimensions": artifact.get("dimensions"),
        "description": artifact.get("description"),
        "provenance": artifact.get("provenance")
    }
```
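The analytics queries filter on `IS NOT NULL`, so a blank string loaded from the API would still count as a value. A small, hedged sketch of normalizing such fields before loading follows. `clean_value` and `clean_row` are hypothetical helpers, and whether the API actually returns blank strings for these fields is an assumption.

```python
def clean_value(value):
    """Strip surrounding whitespace from strings and turn blanks into None."""
    if isinstance(value, str):
        value = value.strip()
        return value or None
    return value


def clean_row(row):
    """Apply clean_value to every field of an extracted artifact dict."""
    return {key: clean_value(value) for key, value in row.items()}


# Invented sample row
row = {"object_id": 42, "title": "  Vase ", "culture": "", "century": None}
print(clean_row(row))
# {'object_id': 42, 'title': 'Vase', 'culture': None, 'century': None}
```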

Transform

```python
import pandas as pd

def transform_artifacts(raw_artifacts):
    """
    Transform nested JSON into relational dataframes
    """
    metadata_list = []
    media_list = []
    colors_list = []

    for artifact in raw_artifacts:
        object_id = artifact.get("objectid")

        # Metadata table
        metadata_list.append(extract_artifact_data(artifact))

        # Media table (one-to-many); "or []" also covers a key that is present but null
        for image in artifact.get("images") or []:
            media_list.append({
                "object_id": object_id,
                "image_url": image.get("baseimageurl"),
                "image_width": image.get("width"),
                "image_height": image.get("height"),
                "alt_text": image.get("alttext")
            })

        # Colors table (one-to-many)
        for color in artifact.get("colors") or []:
            colors_list.append({
                "object_id": object_id,
                "color_hex": color.get("hex"),
                "color_name": color.get("color"),
                "percentage": color.get("percent")
            })

    return (
        pd.DataFrame(metadata_list),
        pd.DataFrame(media_list),
        pd.DataFrame(colors_list)
    )
```
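To make the one-to-many split concrete, here is a small worked example in plain Python (no pandas) on a made-up record. The record and its values are invented for illustration; only the field names (`objectid`, `images`, `colors`, `baseimageurl`, `hex`, `percent`) come from the transform code. `flatten_record` is a hypothetical name.

```python
def flatten_record(artifact):
    """Split one nested artifact into a parent row and two lists of child rows."""
    object_id = artifact.get("objectid")
    metadata_row = {"object_id": object_id, "title": artifact.get("title", "Unknown")}
    # "or []" treats a missing key and an explicit null the same way
    media_rows = [
        {"object_id": object_id, "image_url": image.get("baseimageurl")}
        for image in artifact.get("images") or []
    ]
    color_rows = [
        {"object_id": object_id, "color_hex": color.get("hex"),
         "percentage": color.get("percent")}
        for color in artifact.get("colors") or []
    ]
    return metadata_row, media_rows, color_rows


# Invented sample record: one artifact, two images, no color data
sample = {
    "objectid": 1001,
    "title": "Sample Bowl",
    "images": [{"baseimageurl": "https://example.org/a.jpg"},
               {"baseimageurl": "https://example.org/b.jpg"}],
    "colors": None,
}

metadata_row, media_rows, color_rows = flatten_record(sample)
print(metadata_row)     # {'object_id': 1001, 'title': 'Sample Bowl'}
print(len(media_rows))  # 2
print(len(color_rows))  # 0
```

Every child row repeats the parent's `object_id`, which is what later lets the media and color tables join back to the metadata table.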

Load

```python
import os

import mysql.connector
from mysql.connector import Error

def get_db_connection():
    """
    Create MySQL database connection
    """
    return mysql.connector.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        database=os.getenv("DB_NAME")
    )

def create_tables(conn):
    """
    Create database schema
    """
    cursor = conn.cursor()

    # Metadata table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactmetadata (
            object_id INT PRIMARY KEY,
            title VARCHAR(500),
            culture VARCHAR(200),
            century VARCHAR(100),
            classification VARCHAR(200),
            department VARCHAR(200),
            dated VARCHAR(200),
            medium TEXT,
            dimensions VARCHAR(500),
            description TEXT,
            provenance TEXT
        )
    """)

    # Media table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactmedia (
            id INT AUTO_INCREMENT PRIMARY KEY,
            object_id INT,
            image_url VARCHAR(1000),
            image_width INT,
            image_height INT,
            alt_text TEXT,
            FOREIGN KEY (object_id) REFERENCES artifactmetadata(object_id)
        )
    """)

    # Colors table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactcolors (
            id INT AUTO_INCREMENT PRIMARY KEY,
            object_id INT,
            color_hex VARCHAR(7),
            color_name VARCHAR(50),
            percentage FLOAT,
            FOREIGN KEY (object_id) REFERENCES artifactmetadata(object_id)
        )
    """)

    conn.commit()
    cursor.close()

def batch_insert_dataframe(conn, df, table_name):
    """
    Batch insert dataframe into SQL table
    """
    if df.empty:
        return

    cursor = conn.cursor()
    columns = ", ".join(df.columns)
    placeholders = ", ".join(["%s"] * len(df.columns))

    insert_query = f"INSERT IGNORE INTO {table_name} ({columns}) VALUES ({placeholders})"

    # Convert dataframe to list of tuples; NaN becomes None so MySQL stores NULL
    clean_df = df.astype(object).where(df.notna(), None)
    data = [tuple(row) for row in clean_df.values]

    cursor.executemany(insert_query, data)
    conn.commit()
    cursor.close()
    # INSERT IGNORE may skip duplicates, so this is the number of rows submitted
    print(f"Submitted {len(data)} rows to {table_name}")
```
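Load order matters because the child tables reference `artifactmetadata`. The sketch below illustrates this with Python's built-in `sqlite3` as a stand-in for MySQL, so it runs without a database server. This is a swapped-in engine, not the project's database: SQLite spells the statement `INSERT OR IGNORE`, uses `?` placeholders, and only enforces foreign keys after `PRAGMA foreign_keys = ON`. The trimmed schema and sample rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when asked
conn.execute("CREATE TABLE artifactmetadata (object_id INTEGER PRIMARY KEY, title TEXT)")
conn.execute("""
    CREATE TABLE artifactmedia (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        object_id INTEGER,
        image_url TEXT,
        FOREIGN KEY (object_id) REFERENCES artifactmetadata(object_id)
    )
""")

# Parent rows first; the duplicate object_id 1 is skipped rather than raising
conn.executemany(
    "INSERT OR IGNORE INTO artifactmetadata (object_id, title) VALUES (?, ?)",
    [(1, "Bowl"), (2, "Vase"), (1, "Bowl (duplicate)")],
)

# Child rows for existing parents load fine
conn.executemany(
    "INSERT INTO artifactmedia (object_id, image_url) VALUES (?, ?)",
    [(1, "https://example.org/1.jpg"), (2, "https://example.org/2.jpg")],
)

# A child row whose parent is missing violates the foreign key
try:
    conn.execute(
        "INSERT INTO artifactmedia (object_id, image_url) VALUES (?, ?)",
        (99, "https://example.org/orphan.jpg"),
    )
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

parent_count = conn.execute("SELECT COUNT(*) FROM artifactmetadata").fetchone()[0]
child_count = conn.execute("SELECT COUNT(*) FROM artifactmedia").fetchone()[0]
print(parent_count, child_count, orphan_rejected)  # 2 2 True
conn.close()
```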

Complete ETL Pipeline

```python
def run_etl_pipeline(num_pages=10):
    """
    Execute full ETL pipeline
    """
    print("Starting ETL pipeline...")

    # Extract
    print("Extracting data from API...")
    raw_artifacts = collect_artifacts(num_pages=num_pages)

    # Transform
    print("Transforming data...")
    df_metadata, df_media, df_colors = transform_artifacts(raw_artifacts)

    # Load
    print("Loading data into database...")
    conn = get_db_connection()
    try:
        create_tables(conn)

        # Metadata first: media and colors reference it by foreign key
        batch_insert_dataframe(conn, df_metadata, "artifactmetadata")
        batch_insert_dataframe(conn, df_media, "artifactmedia")
        batch_insert_dataframe(conn, df_colors, "artifactcolors")
    finally:
        conn.close()  # Release the connection even if a step fails

    print("ETL pipeline completed successfully!")

# Run pipeline
if __name__ == "__main__":
    run_etl_pipeline(num_pages=5)
```

SQL Analytics Queries

Common Analytics Patterns

```python
def execute_query(query):
    """
    Execute SQL query and return results as dataframe
    """
    conn = get_db_connection()
    try:
        return pd.read_sql(query, conn)
    finally:
        conn.close()

# Query 1: Artifacts by century
query_by_century = """
    SELECT century, COUNT(*) as artifact_count
    FROM artifactmetadata
    WHERE century IS NOT NULL
    GROUP BY century
    ORDER BY artifact_count DESC
    LIMIT 10
"""

# Query 2: Top cultures by artifact count
query_by_culture = """
    SELECT culture, COUNT(*) as count
    FROM artifactmetadata
    WHERE culture IS NOT NULL
    GROUP BY culture
    ORDER BY count DESC
    LIMIT 15
"""

# Query 3: Artifacts with most images
query_most_images = """
    SELECT m.object_id, m.title, COUNT(med.id) as image_count
    FROM artifactmetadata m
    JOIN artifactmedia med ON m.object_id = med.object_id
    GROUP BY m.object_id, m.title
    ORDER BY image_count DESC
    LIMIT 10
"""

# Query 4: Color distribution
query_color_distribution = """
    SELECT color_name, COUNT(*) as usage_count, AVG(percentage) as avg_percentage
    FROM artifactcolors
    GROUP BY color_name
    ORDER BY usage_count DESC
    LIMIT 10
"""

# Query 5: Department statistics
query_department_stats = """
    SELECT department, COUNT(*) as total_artifacts, COUNT(DISTINCT culture) as unique_cultures
    FROM artifactmetadata
    WHERE department IS NOT NULL
    GROUP BY department
    ORDER BY total_artifacts DESC
"""
```
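As a quick check of what the century query returns, the sketch below runs the same `GROUP BY` against a few invented rows in an in-memory SQLite database. SQLite is used here only as a stand-in for MySQL so the example runs anywhere. Passing the `LIMIT` as a bound parameter instead of hard-coding it is an illustrative variation, not something the original queries do.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE artifactmetadata (object_id INTEGER PRIMARY KEY, century TEXT)")
conn.executemany(
    "INSERT INTO artifactmetadata (object_id, century) VALUES (?, ?)",
    [(1, "19th century"), (2, "19th century"), (3, "18th century"), (4, None)],
)

century_query = """
    SELECT century, COUNT(*) AS artifact_count
    FROM artifactmetadata
    WHERE century IS NOT NULL
    GROUP BY century
    ORDER BY artifact_count DESC
    LIMIT ?
"""

rows = conn.execute(century_query, (10,)).fetchall()
print(rows)  # [('19th century', 2), ('18th century', 1)]
conn.close()
```

The row with a `NULL` century is excluded by the `WHERE` clause, which is why only three of the four sample artifacts are counted.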

Streamlit Dashboard

```python
import streamlit as st
import plotly.express as px

def main():
    st.title("Harvard Art Museums Analytics Dashboard")
    st.markdown("### ETL Pipeline & SQL Analytics")

    # Sidebar for ETL controls
    st.sidebar.header("ETL Pipeline")
    num_pages = st.sidebar.slider("Number of API pages", 1, 20, 5)

    if st.sidebar.button("Run ETL Pipeline"):
        with st.spinner("Running ETL..."):
            run_etl_pipeline(num_pages=num_pages)
            st.success("ETL completed!")

    # Analytics section
    st.header("SQL Analytics")

    query_options = {
        "Artifacts by Century": query_by_century,
        "Top Cultures": query_by_culture,
        "Most Documented Artifacts": query_most_images,
        "Color Distribution": query_color_distribution,
        "Department Statistics": query_department_stats
    }

    selected_query = st.selectbox("Select Analysis", list(query_options.keys()))

    if st.button("Execute Query"):
        df = execute_query(query_options[selected_query])

        # Display results
        st.dataframe(df)

        # Auto-generate visualization
        if len(df.columns) == 2:
            fig = px.bar(df, x=df.columns[0], y=df.columns[1],
                        title=selected_query)
            st.plotly_chart(fig)

if __name__ == "__main__":
    main()
```
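The dashboard only draws a chart when a result has exactly two columns, so the three-column queries (most images, color distribution, department statistics) show a table only. One possible extension, sketched under assumptions, is an explicit mapping from query name to the columns to plot. `CHART_AXES` and `pick_chart_axes` are hypothetical names; the column names are the aliases used in the SQL queries.

```python
# Which columns to plot for results with more than two columns
CHART_AXES = {
    "Most Documented Artifacts": ("title", "image_count"),
    "Color Distribution": ("color_name", "usage_count"),
    "Department Statistics": ("department", "total_artifacts"),
}


def pick_chart_axes(query_name, columns):
    """Return (x, y) column names for a bar chart, or None if unknown."""
    columns = list(columns)
    if len(columns) == 2:
        return columns[0], columns[1]  # Same rule as the dashboard code
    axes = CHART_AXES.get(query_name)
    if axes and all(name in columns for name in axes):
        return axes
    return None


print(pick_chart_axes("Top Cultures", ["culture", "count"]))
# ('culture', 'count')
print(pick_chart_axes("Color Distribution", ["color_name", "usage_count", "avg_percentage"]))
# ('color_name', 'usage_count')
```

Inside `main()`, a non-`None` result could be passed to `px.bar(df, x=x, y=y, title=selected_query)`.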

Configuration

Create a `.env` file:
```bash
HARVARD_API_KEY=your_api_key_from_harvard
DB_HOST=localhost
DB_USER=root
DB_PASSWORD=your_password
DB_NAME=harvard_artifacts
```
Get your Harvard API key from: https://docs.harvardartmuseums.org/
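The pipeline reads these settings through `os.getenv`, which silently returns `None` for anything missing. A hedged sketch of failing fast instead follows; `load_settings` and `REQUIRED_SETTINGS` are hypothetical names. If the values live in a `.env` file, python-dotenv's `load_dotenv()` would typically be called first to copy them into the environment; that call is left as a comment so the example has no third-party dependency.

```python
import os

REQUIRED_SETTINGS = ["HARVARD_API_KEY", "DB_HOST", "DB_USER", "DB_PASSWORD", "DB_NAME"]


def load_settings(environ=None):
    """Return the required settings as a dict, or raise listing what is missing."""
    # from dotenv import load_dotenv; load_dotenv()  # Optional: read .env first
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_SETTINGS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing settings: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_SETTINGS}


# Demo with an explicit mapping instead of the real environment
example_env = {
    "HARVARD_API_KEY": "your_api_key_from_harvard",
    "DB_HOST": "localhost",
    "DB_USER": "root",
    "DB_PASSWORD": "your_password",
    "DB_NAME": "harvard_artifacts",
}
settings = load_settings(example_env)
print(settings["DB_NAME"])  # harvard_artifacts

try:
    load_settings({"DB_HOST": "localhost"})
except RuntimeError as error:
    print(error)
    # Missing settings: HARVARD_API_KEY, DB_USER, DB_PASSWORD, DB_NAME
```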

Troubleshooting

**API Rate Limiting**:
```python
# Add sleep between requests
import time

time.sleep(0.5)  # 500ms delay
```

**Database Connection Issues**:
```python
# Test connection
from mysql.connector import Error

try:
    conn = get_db_connection()
    print("Connection successful")
    conn.close()
except Error as e:
    print(f"Connection failed: {e}")
```

**Empty API Response**:
```python
# Verify API key and check response
response = fetch_artifacts(page=1)
print(f"Total records: {response.get('info', {}).get('totalrecords')}")
```

**Foreign Key Constraint Errors**:
```python
# Ensure metadata is inserted before media/colors
# Use IGNORE to skip duplicates
cursor.execute("INSERT IGNORE INTO artifactmetadata ...")
```

**Memory Issues with Large Datasets**:
```python
# Process in smaller batches
def batch_process(artifacts, batch_size=100):
    for i in range(0, len(artifacts), batch_size):
        batch = artifacts[i:i+batch_size]
        # Process batch
```
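The `batch_process` stub leaves the per-batch work open. As a minimal sketch, a generator can hand out fixed-size slices so that each batch is transformed and loaded before the next one is touched. `iter_batches` is a hypothetical name, and the integer list stands in for artifact records.

```python
def iter_batches(items, batch_size=100):
    """Yield consecutive slices of items, each at most batch_size long."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


artifacts = list(range(250))  # Placeholder for a list of artifact dicts
sizes = [len(batch) for batch in iter_batches(artifacts, batch_size=100)]
print(sizes)  # [100, 100, 50]
```

This bounds the size of each transform and insert step; the raw list itself is still held in memory, so very large runs would also need to fetch and load page by page.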