harvard-art-museums-data-pipeline
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseHarvard Art Museums Data Pipeline
Harvard Art Museums 数据流水线
Skill by ara.so — Data Skills collection.
This project demonstrates a complete data engineering workflow: extracting artifact data from the Harvard Art Museums API, transforming it into relational tables, loading into SQL databases, and visualizing analytics through a Streamlit dashboard.
技能来自 ara.so —— 数据技能合集。
本项目展示了完整的数据工程工作流:从Harvard Art Museums API提取文物数据,将其转换为关系型表,加载到SQL数据库中,并通过Streamlit仪表板实现分析可视化。
What It Does
功能介绍
- API Integration: Fetches artifact metadata, media, and color information from Harvard Art Museums API
- ETL Pipeline: Transforms nested JSON into normalized relational data
- Database Design: Creates three related tables (metadata, media, colors) with proper foreign keys
- SQL Analytics: Executes 20+ analytical queries for insights
- Interactive Visualization: Streamlit dashboard with Plotly charts
Architecture:
API → ETL → SQL → Analytics → Visualization- API集成:从Harvard Art Museums API获取文物元数据、媒体资源和颜色信息
- ETL流水线:将嵌套JSON转换为标准化关系型数据
- 数据库设计:创建三个带正确外键的关联表(元数据表、媒体表、颜色表)
- SQL分析:执行20+条分析查询以获取洞察
- 交互式可视化:基于Plotly图表的Streamlit仪表板
架构:
API → ETL → SQL → 分析 → 可视化Installation
安装步骤
bash
undefinedbash
undefinedClone the repository
Clone the repository
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
Install dependencies
Install dependencies
pip install -r requirements.txt
**Required packages:**streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenv
undefinedpip install -r requirements.txt
**所需依赖包:**streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenv
undefinedConfiguration
配置说明
1. Harvard Art Museums API Key
1. Harvard Art Museums API密钥
Sign up at Harvard Art Museums API to get your API key.
Store it as an environment variable:
bash
export HARVARD_API_KEY="your_api_key_here"Or use a file:
.envHARVARD_API_KEY=your_api_key_here前往Harvard Art Museums API注册以获取API密钥。
将其存储为环境变量:
bash
export HARVARD_API_KEY="your_api_key_here"或使用文件:
.envHARVARD_API_KEY=your_api_key_here2. Database Connection
2. 数据库连接
Configure your MySQL/TiDB Cloud connection:
python
import os
import mysql.connector
db_config = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_NAME', 'harvard_art'),
'port': int(os.getenv('DB_PORT', 3306))
}
conn = mysql.connector.connect(**db_config)Environment variables needed:
DB_HOST=your_database_host
DB_USER=your_database_user
DB_PASSWORD=your_database_password
DB_NAME=harvard_art
DB_PORT=3306配置你的MySQL/TiDB Cloud连接:
python
import os
import mysql.connector
db_config = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_NAME', 'harvard_art'),
'port': int(os.getenv('DB_PORT', 3306))
}
conn = mysql.connector.connect(**db_config)所需环境变量:
DB_HOST=your_database_host
DB_USER=your_database_user
DB_PASSWORD=your_database_password
DB_NAME=harvard_art
DB_PORT=3306Database Schema
数据库架构
Create Tables
创建表
python
import mysql.connector
def create_tables(connection):
cursor = connection.cursor()
# Artifact Metadata Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS artifactmetadata (
id INT PRIMARY KEY,
title VARCHAR(500),
culture VARCHAR(200),
century VARCHAR(100),
classification VARCHAR(200),
department VARCHAR(200),
technique VARCHAR(300),
medium VARCHAR(300),
dimensions VARCHAR(300),
dated VARCHAR(200),
url VARCHAR(500),
INDEX idx_culture (culture),
INDEX idx_century (century),
INDEX idx_department (department)
)
""")
# Artifact Media Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS artifactmedia (
id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
media_type VARCHAR(50),
baseimageurl VARCHAR(500),
iiifbaseuri VARCHAR(500),
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id) ON DELETE CASCADE,
INDEX idx_artifact_id (artifact_id)
)
""")
# Artifact Colors Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS artifactcolors (
id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
color VARCHAR(50),
spectrum VARCHAR(50),
percent FLOAT,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id) ON DELETE CASCADE,
INDEX idx_artifact_id (artifact_id),
INDEX idx_color (color)
)
""")
connection.commit()
cursor.close()python
import mysql.connector
def create_tables(connection):
cursor = connection.cursor()
# Artifact Metadata Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS artifactmetadata (
id INT PRIMARY KEY,
title VARCHAR(500),
culture VARCHAR(200),
century VARCHAR(100),
classification VARCHAR(200),
department VARCHAR(200),
technique VARCHAR(300),
medium VARCHAR(300),
dimensions VARCHAR(300),
dated VARCHAR(200),
url VARCHAR(500),
INDEX idx_culture (culture),
INDEX idx_century (century),
INDEX idx_department (department)
)
""")
# Artifact Media Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS artifactmedia (
id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
media_type VARCHAR(50),
baseimageurl VARCHAR(500),
iiifbaseuri VARCHAR(500),
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id) ON DELETE CASCADE,
INDEX idx_artifact_id (artifact_id)
)
""")
# Artifact Colors Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS artifactcolors (
id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
color VARCHAR(50),
spectrum VARCHAR(50),
percent FLOAT,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id) ON DELETE CASCADE,
INDEX idx_artifact_id (artifact_id),
INDEX idx_color (color)
)
""")
connection.commit()
cursor.close()ETL Pipeline Implementation
ETL流水线实现
Extract: Fetch Data from API
提取:从API获取数据
python
import requests
import os
def fetch_artifacts(api_key, size=100, page=1):
"""
Fetch artifacts from Harvard Art Museums API with pagination
"""
base_url = "https://api.harvardartmuseums.org/object"
params = {
'apikey': api_key,
'size': size,
'page': page,
'hasimage': 1 # Only artifacts with images
}
try:
response = requests.get(base_url, params=params)
response.raise_for_status()
data = response.json()
return {
'records': data.get('records', []),
'total_pages': data.get('info', {}).get('pages', 1),
'total_records': data.get('info', {}).get('totalrecords', 0)
}
except requests.exceptions.RequestException as e:
print(f"API Error: {e}")
return Nonepython
import requests
import os
def fetch_artifacts(api_key, size=100, page=1):
"""
Fetch artifacts from Harvard Art Museums API with pagination
"""
base_url = "https://api.harvardartmuseums.org/object"
params = {
'apikey': api_key,
'size': size,
'page': page,
'hasimage': 1 # Only artifacts with images
}
try:
response = requests.get(base_url, params=params)
response.raise_for_status()
data = response.json()
return {
'records': data.get('records', []),
'total_pages': data.get('info', {}).get('pages', 1),
'total_records': data.get('info', {}).get('totalrecords', 0)
}
except requests.exceptions.RequestException as e:
print(f"API Error: {e}")
return NoneTransform: Process JSON to DataFrames
转换:将JSON处理为DataFrame
python
import pandas as pd
def transform_artifacts(records):
"""
Transform API records into normalized dataframes
"""
metadata_list = []
media_list = []
colors_list = []
for record in records:
artifact_id = record.get('id')
# Extract metadata
metadata_list.append({
'id': artifact_id,
'title': record.get('title', '')[:500],
'culture': record.get('culture', '')[:200],
'century': record.get('century', '')[:100],
'classification': record.get('classification', '')[:200],
'department': record.get('department', '')[:200],
'technique': record.get('technique', '')[:300],
'medium': record.get('medium', '')[:300],
'dimensions': record.get('dimensions', '')[:300],
'dated': record.get('dated', '')[:200],
'url': record.get('url', '')[:500]
})
# Extract media
images = record.get('images', [])
for img in images:
media_list.append({
'artifact_id': artifact_id,
'media_type': 'image',
'baseimageurl': img.get('baseimageurl', '')[:500],
'iiifbaseuri': img.get('iiifbaseuri', '')[:500]
})
# Extract colors
colors = record.get('colors', [])
for color in colors:
colors_list.append({
'artifact_id': artifact_id,
'color': color.get('color', '')[:50],
'spectrum': color.get('spectrum', '')[:50],
'percent': color.get('percent', 0.0)
})
df_metadata = pd.DataFrame(metadata_list)
df_media = pd.DataFrame(media_list)
df_colors = pd.DataFrame(colors_list)
return df_metadata, df_media, df_colorspython
import pandas as pd
def transform_artifacts(records):
"""
Transform API records into normalized dataframes
"""
metadata_list = []
media_list = []
colors_list = []
for record in records:
artifact_id = record.get('id')
# Extract metadata
metadata_list.append({
'id': artifact_id,
'title': record.get('title', '')[:500],
'culture': record.get('culture', '')[:200],
'century': record.get('century', '')[:100],
'classification': record.get('classification', '')[:200],
'department': record.get('department', '')[:200],
'technique': record.get('technique', '')[:300],
'medium': record.get('medium', '')[:300],
'dimensions': record.get('dimensions', '')[:300],
'dated': record.get('dated', '')[:200],
'url': record.get('url', '')[:500]
})
# Extract media
images = record.get('images', [])
for img in images:
media_list.append({
'artifact_id': artifact_id,
'media_type': 'image',
'baseimageurl': img.get('baseimageurl', '')[:500],
'iiifbaseuri': img.get('iiifbaseuri', '')[:500]
})
# Extract colors
colors = record.get('colors', [])
for color in colors:
colors_list.append({
'artifact_id': artifact_id,
'color': color.get('color', '')[:50],
'spectrum': color.get('spectrum', '')[:50],
'percent': color.get('percent', 0.0)
})
df_metadata = pd.DataFrame(metadata_list)
df_media = pd.DataFrame(media_list)
df_colors = pd.DataFrame(colors_list)
return df_metadata, df_media, df_colorsLoad: Insert into Database
加载:插入数据库
python
def load_to_database(df_metadata, df_media, df_colors, connection):
"""
Load dataframes into database tables using batch inserts
"""
cursor = connection.cursor()
try:
# Insert metadata (use REPLACE to handle duplicates)
if not df_metadata.empty:
metadata_query = """
REPLACE INTO artifactmetadata
(id, title, culture, century, classification, department,
technique, medium, dimensions, dated, url)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.executemany(metadata_query, df_metadata.values.tolist())
# Insert media
if not df_media.empty:
media_query = """
INSERT INTO artifactmedia
(artifact_id, media_type, baseimageurl, iiifbaseuri)
VALUES (%s, %s, %s, %s)
"""
cursor.executemany(media_query, df_media.values.tolist())
# Insert colors
if not df_colors.empty:
colors_query = """
INSERT INTO artifactcolors
(artifact_id, color, spectrum, percent)
VALUES (%s, %s, %s, %s)
"""
cursor.executemany(colors_query, df_colors.values.tolist())
connection.commit()
return True
except Exception as e:
connection.rollback()
print(f"Database Error: {e}")
return False
finally:
cursor.close()python
def load_to_database(df_metadata, df_media, df_colors, connection):
"""
Load dataframes into database tables using batch inserts
"""
cursor = connection.cursor()
try:
# Insert metadata (use REPLACE to handle duplicates)
if not df_metadata.empty:
metadata_query = """
REPLACE INTO artifactmetadata
(id, title, culture, century, classification, department,
technique, medium, dimensions, dated, url)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.executemany(metadata_query, df_metadata.values.tolist())
# Insert media
if not df_media.empty:
media_query = """
INSERT INTO artifactmedia
(artifact_id, media_type, baseimageurl, iiifbaseuri)
VALUES (%s, %s, %s, %s)
"""
cursor.executemany(media_query, df_media.values.tolist())
# Insert colors
if not df_colors.empty:
colors_query = """
INSERT INTO artifactcolors
(artifact_id, color, spectrum, percent)
VALUES (%s, %s, %s, %s)
"""
cursor.executemany(colors_query, df_colors.values.tolist())
connection.commit()
return True
except Exception as e:
connection.rollback()
print(f"Database Error: {e}")
return False
finally:
cursor.close()Complete ETL Workflow
完整ETL工作流
python
def run_etl_pipeline(api_key, db_config, num_pages=5):
"""
Execute complete ETL pipeline
"""
# Connect to database
conn = mysql.connector.connect(**db_config)
create_tables(conn)
total_loaded = 0
for page in range(1, num_pages + 1):
print(f"Processing page {page}/{num_pages}...")
# Extract
result = fetch_artifacts(api_key, size=100, page=page)
if not result or not result['records']:
break
# Transform
df_meta, df_media, df_colors = transform_artifacts(result['records'])
# Load
success = load_to_database(df_meta, df_media, df_colors, conn)
if success:
total_loaded += len(df_meta)
print(f"Loaded {len(df_meta)} artifacts")
conn.close()
print(f"ETL Complete: {total_loaded} total artifacts loaded")python
def run_etl_pipeline(api_key, db_config, num_pages=5):
"""
Execute complete ETL pipeline
"""
# Connect to database
conn = mysql.connector.connect(**db_config)
create_tables(conn)
total_loaded = 0
for page in range(1, num_pages + 1):
print(f"Processing page {page}/{num_pages}...")
# Extract
result = fetch_artifacts(api_key, size=100, page=page)
if not result or not result['records']:
break
# Transform
df_meta, df_media, df_colors = transform_artifacts(result['records'])
# Load
success = load_to_database(df_meta, df_media, df_colors, conn)
if success:
total_loaded += len(df_meta)
print(f"Loaded {len(df_meta)} artifacts")
conn.close()
print(f"ETL Complete: {total_loaded} total artifacts loaded")SQL Analytics Queries
SQL分析查询
Example Analytical Queries
示例分析查询
python
undefinedpython
undefinedTop 10 cultures by artifact count
Top 10 cultures by artifact count
QUERY_TOP_CULTURES = """
SELECT culture, COUNT(*) as artifact_count
FROM artifactmetadata
WHERE culture IS NOT NULL AND culture != ''
GROUP BY culture
ORDER BY artifact_count DESC
LIMIT 10
"""
QUERY_TOP_CULTURES = """
SELECT culture, COUNT(*) as artifact_count
FROM artifactmetadata
WHERE culture IS NOT NULL AND culture != ''
GROUP BY culture
ORDER BY artifact_count DESC
LIMIT 10
"""
Artifacts by century distribution
Artifacts by century distribution
QUERY_CENTURY_DISTRIBUTION = """
SELECT century, COUNT(*) as count
FROM artifactmetadata
WHERE century IS NOT NULL AND century != ''
GROUP BY century
ORDER BY count DESC
"""
QUERY_CENTURY_DISTRIBUTION = """
SELECT century, COUNT(*) as count
FROM artifactmetadata
WHERE century IS NOT NULL AND century != ''
GROUP BY century
ORDER BY count DESC
"""
Most common colors across artifacts
Most common colors across artifacts
QUERY_TOP_COLORS = """
SELECT color, COUNT(*) as frequency, AVG(percent) as avg_percent
FROM artifactcolors
GROUP BY color
ORDER BY frequency DESC
LIMIT 10
"""
QUERY_TOP_COLORS = """
SELECT color, COUNT(*) as frequency, AVG(percent) as avg_percent
FROM artifactcolors
GROUP BY color
ORDER BY frequency DESC
LIMIT 10
"""
Artifacts with media availability
Artifacts with media availability
QUERY_MEDIA_STATS = """
SELECT
COUNT(DISTINCT m.id) as total_artifacts,
COUNT(DISTINCT med.artifact_id) as artifacts_with_media,
ROUND(COUNT(DISTINCT med.artifact_id) * 100.0 / COUNT(DISTINCT m.id), 2) as media_percentage
FROM artifactmetadata m
LEFT JOIN artifactmedia med ON m.id = med.artifact_id
"""
QUERY_MEDIA_STATS = """
SELECT
COUNT(DISTINCT m.id) as total_artifacts,
COUNT(DISTINCT med.artifact_id) as artifacts_with_media,
ROUND(COUNT(DISTINCT med.artifact_id) * 100.0 / COUNT(DISTINCT m.id), 2) as media_percentage
FROM artifactmetadata m
LEFT JOIN artifactmedia med ON m.id = med.artifact_id
"""
Department-wise classification
Department-wise classification
QUERY_DEPT_CLASSIFICATION = """
SELECT department, classification, COUNT(*) as count
FROM artifactmetadata
WHERE department IS NOT NULL AND classification IS NOT NULL
GROUP BY department, classification
ORDER BY department, count DESC
"""
undefinedQUERY_DEPT_CLASSIFICATION = """
SELECT department, classification, COUNT(*) as count
FROM artifactmetadata
WHERE department IS NOT NULL AND classification IS NOT NULL
GROUP BY department, classification
ORDER BY department, count DESC
"""
undefinedExecute Queries
执行查询
python
def execute_analytics_query(query, connection):
"""
Execute analytical query and return results as DataFrame
"""
try:
df = pd.read_sql(query, connection)
return df
except Exception as e:
print(f"Query Error: {e}")
return pd.DataFrame()python
def execute_analytics_query(query, connection):
"""
Execute analytical query and return results as DataFrame
"""
try:
df = pd.read_sql(query, connection)
return df
except Exception as e:
print(f"Query Error: {e}")
return pd.DataFrame()Streamlit Dashboard
Streamlit仪表板
Basic App Structure
基础应用结构
python
import streamlit as st
import plotly.express as px
import mysql.connector
import os
st.set_page_config(page_title="Harvard Art Analytics", layout="wide")python
import streamlit as st
import plotly.express as px
import mysql.connector
import os
st.set_page_config(page_title="Harvard Art Analytics", layout="wide")Database connection
Database connection
@st.cache_resource
def get_connection():
return mysql.connector.connect(
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
st.title("🎨 Harvard Art Museums Analytics Dashboard")
@st.cache_resource
def get_connection():
return mysql.connector.connect(
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
st.title("🎨 Harvard Art Museums Analytics Dashboard")
Sidebar for navigation
Sidebar for navigation
page = st.sidebar.selectbox(
"Select Analysis",
["Overview", "Culture Analysis", "Color Patterns", "Department Stats"]
)
conn = get_connection()
if page == "Overview":
col1, col2, col3 = st.columns(3)
# Total artifacts
total = pd.read_sql("SELECT COUNT(*) as total FROM artifactmetadata", conn)
col1.metric("Total Artifacts", total['total'].iloc[0])
# Unique cultures
cultures = pd.read_sql("SELECT COUNT(DISTINCT culture) as count FROM artifactmetadata", conn)
col2.metric("Unique Cultures", cultures['count'].iloc[0])
# Artifacts with images
with_images = pd.read_sql("SELECT COUNT(DISTINCT artifact_id) as count FROM artifactmedia", conn)
col3.metric("With Images", with_images['count'].iloc[0])elif page == "Culture Analysis":
st.subheader("Top Cultures by Artifact Count")
df = pd.read_sql(QUERY_TOP_CULTURES, conn)
fig = px.bar(df, x='culture', y='artifact_count',
title='Top 10 Cultures',
labels={'artifact_count': 'Number of Artifacts'})
st.plotly_chart(fig, use_container_width=True)
st.dataframe(df, use_container_width=True)undefinedpage = st.sidebar.selectbox(
"Select Analysis",
["Overview", "Culture Analysis", "Color Patterns", "Department Stats"]
)
conn = get_connection()
if page == "Overview":
col1, col2, col3 = st.columns(3)
# Total artifacts
total = pd.read_sql("SELECT COUNT(*) as total FROM artifactmetadata", conn)
col1.metric("Total Artifacts", total['total'].iloc[0])
# Unique cultures
cultures = pd.read_sql("SELECT COUNT(DISTINCT culture) as count FROM artifactmetadata", conn)
col2.metric("Unique Cultures", cultures['count'].iloc[0])
# Artifacts with images
with_images = pd.read_sql("SELECT COUNT(DISTINCT artifact_id) as count FROM artifactmedia", conn)
col3.metric("With Images", with_images['count'].iloc[0])elif page == "Culture Analysis":
st.subheader("Top Cultures by Artifact Count")
df = pd.read_sql(QUERY_TOP_CULTURES, conn)
fig = px.bar(df, x='culture', y='artifact_count',
title='Top 10 Cultures',
labels={'artifact_count': 'Number of Artifacts'})
st.plotly_chart(fig, use_container_width=True)
st.dataframe(df, use_container_width=True)undefinedVisualization Patterns
可视化模式
python
undefinedpython
undefinedBar chart for categorical data
Bar chart for categorical data
def create_bar_chart(df, x_col, y_col, title):
fig = px.bar(df, x=x_col, y=y_col, title=title)
fig.update_layout(xaxis_tickangle=-45)
return fig
def create_bar_chart(df, x_col, y_col, title):
fig = px.bar(df, x=x_col, y=y_col, title=title)
fig.update_layout(xaxis_tickangle=-45)
return fig
Pie chart for proportions
Pie chart for proportions
def create_pie_chart(df, names_col, values_col, title):
fig = px.pie(df, names=names_col, values=values_col, title=title)
return fig
def create_pie_chart(df, names_col, values_col, title):
fig = px.pie(df, names=names_col, values=values_col, title=title)
return fig
Histogram for distributions
Histogram for distributions
def create_histogram(df, column, title):
fig = px.histogram(df, x=column, title=title)
return fig
undefineddef create_histogram(df, column, title):
fig = px.histogram(df, x=column, title=title)
return fig
undefinedRunning the Application
运行应用
bash
undefinedbash
undefinedSet environment variables
Set environment variables
export HARVARD_API_KEY="your_api_key"
export DB_HOST="localhost"
export DB_USER="root"
export DB_PASSWORD="your_password"
export DB_NAME="harvard_art"
export HARVARD_API_KEY="your_api_key"
export DB_HOST="localhost"
export DB_USER="root"
export DB_PASSWORD="your_password"
export DB_NAME="harvard_art"
Run Streamlit app
Run Streamlit app
streamlit run app.py
undefinedstreamlit run app.py
undefinedCommon Patterns
通用模式
Rate Limiting for API Calls
API调用速率限制
python
import time
def fetch_with_rate_limit(api_key, pages, delay=1):
"""
Fetch multiple pages with rate limiting
"""
all_records = []
for page in range(1, pages + 1):
result = fetch_artifacts(api_key, page=page)
if result:
all_records.extend(result['records'])
time.sleep(delay) # Respect API rate limits
return all_recordspython
import time
def fetch_with_rate_limit(api_key, pages, delay=1):
"""
Fetch multiple pages with rate limiting
"""
all_records = []
for page in range(1, pages + 1):
result = fetch_artifacts(api_key, page=page)
if result:
all_records.extend(result['records'])
time.sleep(delay) # Respect API rate limits
return all_recordsError Handling in ETL
ETL中的错误处理
python
def safe_etl_run(api_key, db_config):
"""
ETL with comprehensive error handling
"""
conn = None
try:
conn = mysql.connector.connect(**db_config)
create_tables(conn)
result = fetch_artifacts(api_key)
if not result:
raise Exception("Failed to fetch data from API")
df_meta, df_media, df_colors = transform_artifacts(result['records'])
if df_meta.empty:
raise Exception("No data to load")
load_to_database(df_meta, df_media, df_colors, conn)
return {"status": "success", "records": len(df_meta)}
except Exception as e:
return {"status": "error", "message": str(e)}
finally:
if conn:
conn.close()python
def safe_etl_run(api_key, db_config):
"""
ETL with comprehensive error handling
"""
conn = None
try:
conn = mysql.connector.connect(**db_config)
create_tables(conn)
result = fetch_artifacts(api_key)
if not result:
raise Exception("Failed to fetch data from API")
df_meta, df_media, df_colors = transform_artifacts(result['records'])
if df_meta.empty:
raise Exception("No data to load")
load_to_database(df_meta, df_media, df_colors, conn)
return {"status": "success", "records": len(df_meta)}
except Exception as e:
return {"status": "error", "message": str(e)}
finally:
if conn:
conn.close()Troubleshooting
故障排查
API Connection Issues
API连接问题
python
undefinedpython
undefinedTest API connection
Test API connection
def test_api_connection(api_key):
try:
response = requests.get(
"https://api.harvardartmuseums.org/object",
params={'apikey': api_key, 'size': 1},
timeout=10
)
if response.status_code == 200:
print("✓ API connection successful")
return True
elif response.status_code == 401:
print("✗ Invalid API key")
else:
print(f"✗ API returned status code: {response.status_code}")
return False
except Exception as e:
print(f"✗ Connection error: {e}")
return False
undefineddef test_api_connection(api_key):
try:
response = requests.get(
"https://api.harvardartmuseums.org/object",
params={'apikey': api_key, 'size': 1},
timeout=10
)
if response.status_code == 200:
print("✓ API connection successful")
return True
elif response.status_code == 401:
print("✗ Invalid API key")
else:
print(f"✗ API returned status code: {response.status_code}")
return False
except Exception as e:
print(f"✗ Connection error: {e}")
return False
undefinedDatabase Connection Issues
数据库连接问题
python
undefinedpython
undefinedTest database connection
Test database connection
def test_db_connection(db_config):
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
cursor.close()
conn.close()
print("✓ Database connection successful")
return True
except mysql.connector.Error as e:
print(f"✗ Database error: {e}")
return False
undefineddef test_db_connection(db_config):
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
cursor.close()
conn.close()
print("✓ Database connection successful")
return True
except mysql.connector.Error as e:
print(f"✗ Database error: {e}")
return False
undefinedData Quality Checks
数据质量检查
python
def validate_data_quality(df_metadata):
"""
Check data quality before loading
"""
issues = []
if df_metadata['id'].duplicated().any():
issues.append("Duplicate artifact IDs found")
if df_metadata['id'].isnull().any():
issues.append("Null artifact IDs found")
if len(df_metadata) == 0:
issues.append("Empty dataset")
return issues if issues else NoneThis skill provides a complete reference for building and operating the Harvard Art Museums data pipeline with ETL, SQL analytics, and interactive dashboards.
python
def validate_data_quality(df_metadata):
"""
Check data quality before loading
"""
issues = []
if df_metadata['id'].duplicated().any():
issues.append("Duplicate artifact IDs found")
if df_metadata['id'].isnull().any():
issues.append("Null artifact IDs found")
if len(df_metadata) == 0:
issues.append("Empty dataset")
return issues if issues else None本技能为构建和运行包含ETL、SQL分析及交互式仪表板的Harvard Art Museums数据流水线提供了完整参考。