harvard-artifacts-collection-etl-analytics
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseHarvard Artifacts Collection ETL Analytics
哈佛文物收藏ETL分析
Skill by ara.so — Data Skills collection
This skill enables you to build end-to-end data engineering and analytics applications using the Harvard Art Museums API. It demonstrates real-world ETL pipelines, SQL database design, analytical queries, and interactive data visualization using Streamlit.
由ara.so提供的技能——数据技能合集
本技能可让你基于哈佛艺术博物馆API构建端到端的数据工程与分析应用,展示了真实场景下的ETL管道、SQL数据库设计、分析查询以及使用Streamlit实现的交互式数据可视化。
What It Does
功能介绍
The Harvard Artifacts Collection ETL Analytics project provides:
- API Integration: Fetch artifact data from Harvard Art Museums API with pagination and rate limiting
- ETL Pipeline: Extract, transform, and load nested JSON data into relational SQL tables
- Database Design: Structured schema with ,
artifactmetadata, andartifactmediatablesartifactcolors - SQL Analytics: 20+ predefined analytical queries for insights
- Interactive Dashboards: Streamlit-based UI with Plotly visualizations
哈佛文物收藏ETL分析项目提供以下功能:
- API集成:通过分页和速率限制从哈佛艺术博物馆API获取文物数据
- ETL管道:将嵌套JSON数据提取、转换并加载到关系型SQL表中
- 数据库设计:包含、
artifactmetadata和artifactmedia表的结构化schemaartifactcolors - SQL分析:20+预定义分析查询以获取洞察
- 交互式仪表盘:基于Streamlit的UI搭配Plotly可视化
Installation
安装
bash
undefinedbash
undefinedClone the repository
Clone the repository
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
Install dependencies
Install dependencies
pip install -r requirements.txt
pip install -r requirements.txt
Required packages
Required packages
pip install streamlit pandas requests mysql-connector-python plotly python-dotenv
undefinedpip install streamlit pandas requests mysql-connector-python plotly python-dotenv
undefinedConfiguration
配置
Environment Variables
环境变量
Create a file in the project root:
.envenv
HARVARD_API_KEY=your_api_key_here
DB_HOST=your_database_host
DB_PORT=3306
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts在项目根目录创建文件:
.envenv
HARVARD_API_KEY=your_api_key_here
DB_HOST=your_database_host
DB_PORT=3306
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifactsGet Harvard API Key
获取哈佛API密钥
- Visit Harvard Art Museums API
- Register for a free API key
- Add the key to your file
.env
- 访问Harvard Art Museums API
- 注册获取免费API密钥
- 将密钥添加到文件中
.env
Database Setup
数据库设置
Create the required database schema:
sql
CREATE DATABASE IF NOT EXISTS harvard_artifacts;
USE harvard_artifacts;
CREATE TABLE artifactmetadata (
id INT PRIMARY KEY,
title VARCHAR(500),
culture VARCHAR(200),
period VARCHAR(200),
century VARCHAR(100),
classification VARCHAR(200),
department VARCHAR(200),
technique VARCHAR(500),
dated VARCHAR(200),
url VARCHAR(500),
description TEXT
);
CREATE TABLE artifactmedia (
id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
media_type VARCHAR(100),
base_image_url VARCHAR(500),
has_image BOOLEAN,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
CREATE TABLE artifactcolors (
id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
color_hex VARCHAR(10),
color_name VARCHAR(100),
percentage FLOAT,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);创建所需的数据库schema:
sql
CREATE DATABASE IF NOT EXISTS harvard_artifacts;
USE harvard_artifacts;
CREATE TABLE artifactmetadata (
id INT PRIMARY KEY,
title VARCHAR(500),
culture VARCHAR(200),
period VARCHAR(200),
century VARCHAR(100),
classification VARCHAR(200),
department VARCHAR(200),
technique VARCHAR(500),
dated VARCHAR(200),
url VARCHAR(500),
description TEXT
);
CREATE TABLE artifactmedia (
id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
media_type VARCHAR(100),
base_image_url VARCHAR(500),
has_image BOOLEAN,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
CREATE TABLE artifactcolors (
id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
color_hex VARCHAR(10),
color_name VARCHAR(100),
percentage FLOAT,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);Core API Integration
核心API集成
Fetching Artifact Data
获取文物数据
python
import requests
import os
from dotenv import load_dotenv
load_dotenv()
def fetch_artifacts(page=1, size=100):
"""Fetch artifacts from Harvard Art Museums API"""
api_key = os.getenv('HARVARD_API_KEY')
base_url = "https://api.harvardartmuseums.org/object"
params = {
'apikey': api_key,
'page': page,
'size': size,
'hasimage': 1 # Only fetch artifacts with images
}
response = requests.get(base_url, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"API request failed: {response.status_code}")python
import requests
import os
from dotenv import load_dotenv
load_dotenv()
def fetch_artifacts(page=1, size=100):
"""Fetch artifacts from Harvard Art Museums API"""
api_key = os.getenv('HARVARD_API_KEY')
base_url = "https://api.harvardartmuseums.org/object"
params = {
'apikey': api_key,
'page': page,
'size': size,
'hasimage': 1 # Only fetch artifacts with images
}
response = requests.get(base_url, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"API request failed: {response.status_code}")Fetch first page of artifacts
Fetch first page of artifacts
data = fetch_artifacts(page=1, size=50)
print(f"Total records: {data['info']['totalrecords']}")
print(f"Fetched: {len(data['records'])} artifacts")
undefineddata = fetch_artifacts(page=1, size=50)
print(f"Total records: {data['info']['totalrecords']}")
print(f"Fetched: {len(data['records'])} artifacts")
undefinedHandling Pagination
处理分页
python
def fetch_all_artifacts(max_pages=10):
"""Fetch multiple pages of artifacts with pagination"""
all_artifacts = []
for page in range(1, max_pages + 1):
try:
data = fetch_artifacts(page=page, size=100)
artifacts = data.get('records', [])
all_artifacts.extend(artifacts)
print(f"Fetched page {page}: {len(artifacts)} artifacts")
# Check if there are more pages
if len(artifacts) == 0:
break
except Exception as e:
print(f"Error on page {page}: {e}")
break
return all_artifactspython
def fetch_all_artifacts(max_pages=10):
"""Fetch multiple pages of artifacts with pagination"""
all_artifacts = []
for page in range(1, max_pages + 1):
try:
data = fetch_artifacts(page=page, size=100)
artifacts = data.get('records', [])
all_artifacts.extend(artifacts)
print(f"Fetched page {page}: {len(artifacts)} artifacts")
# Check if there are more pages
if len(artifacts) == 0:
break
except Exception as e:
print(f"Error on page {page}: {e}")
break
return all_artifactsETL Pipeline Implementation
ETL管道实现
Extract and Transform
提取与转换
python
import pandas as pd
def transform_artifact_metadata(raw_data):
"""Transform raw API data to metadata DataFrame"""
metadata_list = []
for artifact in raw_data:
metadata = {
'id': artifact.get('id'),
'title': artifact.get('title'),
'culture': artifact.get('culture'),
'period': artifact.get('period'),
'century': artifact.get('century'),
'classification': artifact.get('classification'),
'department': artifact.get('department'),
'technique': artifact.get('technique'),
'dated': artifact.get('dated'),
'url': artifact.get('url'),
'description': artifact.get('description')
}
metadata_list.append(metadata)
return pd.DataFrame(metadata_list)
def transform_artifact_media(raw_data):
"""Transform media information to DataFrame"""
media_list = []
for artifact in raw_data:
artifact_id = artifact.get('id')
primary_image = artifact.get('primaryimageurl')
has_image = 1 if primary_image else 0
media = {
'artifact_id': artifact_id,
'media_type': 'image',
'base_image_url': primary_image,
'has_image': has_image
}
media_list.append(media)
return pd.DataFrame(media_list)
def transform_artifact_colors(raw_data):
"""Transform color information to DataFrame"""
colors_list = []
for artifact in raw_data:
artifact_id = artifact.get('id')
colors = artifact.get('colors', [])
for color in colors:
color_data = {
'artifact_id': artifact_id,
'color_hex': color.get('hex'),
'color_name': color.get('color'),
'percentage': color.get('percent')
}
colors_list.append(color_data)
return pd.DataFrame(colors_list)python
import pandas as pd
def transform_artifact_metadata(raw_data):
"""Transform raw API data to metadata DataFrame"""
metadata_list = []
for artifact in raw_data:
metadata = {
'id': artifact.get('id'),
'title': artifact.get('title'),
'culture': artifact.get('culture'),
'period': artifact.get('period'),
'century': artifact.get('century'),
'classification': artifact.get('classification'),
'department': artifact.get('department'),
'technique': artifact.get('technique'),
'dated': artifact.get('dated'),
'url': artifact.get('url'),
'description': artifact.get('description')
}
metadata_list.append(metadata)
return pd.DataFrame(metadata_list)
def transform_artifact_media(raw_data):
"""Transform media information to DataFrame"""
media_list = []
for artifact in raw_data:
artifact_id = artifact.get('id')
primary_image = artifact.get('primaryimageurl')
has_image = 1 if primary_image else 0
media = {
'artifact_id': artifact_id,
'media_type': 'image',
'base_image_url': primary_image,
'has_image': has_image
}
media_list.append(media)
return pd.DataFrame(media_list)
def transform_artifact_colors(raw_data):
"""Transform color information to DataFrame"""
colors_list = []
for artifact in raw_data:
artifact_id = artifact.get('id')
colors = artifact.get('colors', [])
for color in colors:
color_data = {
'artifact_id': artifact_id,
'color_hex': color.get('hex'),
'color_name': color.get('color'),
'percentage': color.get('percent')
}
colors_list.append(color_data)
return pd.DataFrame(colors_list)Load to Database
加载到数据库
python
import mysql.connector
from mysql.connector import Error
def get_db_connection():
"""Create database connection"""
return mysql.connector.connect(
host=os.getenv('DB_HOST'),
port=os.getenv('DB_PORT', 3306),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
def load_metadata(df_metadata):
"""Load metadata DataFrame to database"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactmetadata
(id, title, culture, period, century, classification, department, technique, dated, url, description)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title=VALUES(title), culture=VALUES(culture), period=VALUES(period)
"""
data_tuples = [tuple(x) for x in df_metadata.to_numpy()]
cursor.executemany(insert_query, data_tuples)
conn.commit()
cursor.close()
conn.close()
print(f"Loaded {len(df_metadata)} metadata records")
def load_media(df_media):
"""Load media DataFrame to database"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactmedia
(artifact_id, media_type, base_image_url, has_image)
VALUES (%s, %s, %s, %s)
"""
data_tuples = [tuple(x) for x in df_media.to_numpy()]
cursor.executemany(insert_query, data_tuples)
conn.commit()
cursor.close()
conn.close()
def load_colors(df_colors):
"""Load colors DataFrame to database"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactcolors
(artifact_id, color_hex, color_name, percentage)
VALUES (%s, %s, %s, %s)
"""
data_tuples = [tuple(x) for x in df_colors.to_numpy()]
cursor.executemany(insert_query, data_tuples)
conn.commit()
cursor.close()
conn.close()python
import mysql.connector
from mysql.connector import Error
def get_db_connection():
"""Create database connection"""
return mysql.connector.connect(
host=os.getenv('DB_HOST'),
port=os.getenv('DB_PORT', 3306),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
def load_metadata(df_metadata):
"""Load metadata DataFrame to database"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactmetadata
(id, title, culture, period, century, classification, department, technique, dated, url, description)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title=VALUES(title), culture=VALUES(culture), period=VALUES(period)
"""
data_tuples = [tuple(x) for x in df_metadata.to_numpy()]
cursor.executemany(insert_query, data_tuples)
conn.commit()
cursor.close()
conn.close()
print(f"Loaded {len(df_metadata)} metadata records")
def load_media(df_media):
"""Load media DataFrame to database"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactmedia
(artifact_id, media_type, base_image_url, has_image)
VALUES (%s, %s, %s, %s)
"""
data_tuples = [tuple(x) for x in df_media.to_numpy()]
cursor.executemany(insert_query, data_tuples)
conn.commit()
cursor.close()
conn.close()
def load_colors(df_colors):
"""Load colors DataFrame to database"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactcolors
(artifact_id, color_hex, color_name, percentage)
VALUES (%s, %s, %s, %s)
"""
data_tuples = [tuple(x) for x in df_colors.to_numpy()]
cursor.executemany(insert_query, data_tuples)
conn.commit()
cursor.close()
conn.close()Complete ETL Pipeline
完整ETL管道
python
def run_etl_pipeline(num_pages=5):
"""Execute complete ETL pipeline"""
print("Starting ETL Pipeline...")
# Extract
print("1. Extracting data from API...")
raw_artifacts = fetch_all_artifacts(max_pages=num_pages)
print(f"Extracted {len(raw_artifacts)} artifacts")
# Transform
print("2. Transforming data...")
df_metadata = transform_artifact_metadata(raw_artifacts)
df_media = transform_artifact_media(raw_artifacts)
df_colors = transform_artifact_colors(raw_artifacts)
# Load
print("3. Loading data to database...")
load_metadata(df_metadata)
load_media(df_media)
load_colors(df_colors)
print("ETL Pipeline completed successfully!")
return df_metadata, df_media, df_colorspython
def run_etl_pipeline(num_pages=5):
"""Execute complete ETL pipeline"""
print("Starting ETL Pipeline...")
# Extract
print("1. Extracting data from API...")
raw_artifacts = fetch_all_artifacts(max_pages=num_pages)
print(f"Extracted {len(raw_artifacts)} artifacts")
# Transform
print("2. Transforming data...")
df_metadata = transform_artifact_metadata(raw_artifacts)
df_media = transform_artifact_media(raw_artifacts)
df_colors = transform_artifact_colors(raw_artifacts)
# Load
print("3. Loading data to database...")
load_metadata(df_metadata)
load_media(df_media)
load_colors(df_colors)
print("ETL Pipeline completed successfully!")
return df_metadata, df_media, df_colorsSQL Analytics Queries
SQL分析查询
Common Analytics Patterns
常见分析模式
python
def execute_query(query):
"""Execute SQL query and return results as DataFrame"""
conn = get_db_connection()
df = pd.read_sql(query, conn)
conn.close()
return dfpython
def execute_query(query):
"""Execute SQL query and return results as DataFrame"""
conn = get_db_connection()
df = pd.read_sql(query, conn)
conn.close()
return dfArtifacts by Culture
Artifacts by Culture
query_culture = """
SELECT culture, COUNT(*) as count
FROM artifactmetadata
WHERE culture IS NOT NULL
GROUP BY culture
ORDER BY count DESC
LIMIT 10
"""
query_culture = """
SELECT culture, COUNT(*) as count
FROM artifactmetadata
WHERE culture IS NOT NULL
GROUP BY culture
ORDER BY count DESC
LIMIT 10
"""
Artifacts by Century
Artifacts by Century
query_century = """
SELECT century, COUNT(*) as count
FROM artifactmetadata
WHERE century IS NOT NULL
GROUP BY century
ORDER BY count DESC
"""
query_century = """
SELECT century, COUNT(*) as count
FROM artifactmetadata
WHERE century IS NOT NULL
GROUP BY century
ORDER BY count DESC
"""
Top Classifications
Top Classifications
query_classification = """
SELECT classification, COUNT(*) as count
FROM artifactmetadata
WHERE classification IS NOT NULL
GROUP BY classification
ORDER BY count DESC
LIMIT 15
"""
query_classification = """
SELECT classification, COUNT(*) as count
FROM artifactmetadata
WHERE classification IS NOT NULL
GROUP BY classification
ORDER BY count DESC
LIMIT 15
"""
Media Availability
Media Availability
query_media = """
SELECT
has_image,
COUNT() as count,
ROUND(COUNT() * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
FROM artifactmedia
GROUP BY has_image
"""
query_media = """
SELECT
has_image,
COUNT() as count,
ROUND(COUNT() * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
FROM artifactmedia
GROUP BY has_image
"""
Top Colors Used
Top Colors Used
query_colors = """
SELECT
color_name,
COUNT(*) as occurrences,
ROUND(AVG(percentage), 2) as avg_percentage
FROM artifactcolors
WHERE color_name IS NOT NULL
GROUP BY color_name
ORDER BY occurrences DESC
LIMIT 10
"""
query_colors = """
SELECT
color_name,
COUNT(*) as occurrences,
ROUND(AVG(percentage), 2) as avg_percentage
FROM artifactcolors
WHERE color_name IS NOT NULL
GROUP BY color_name
ORDER BY occurrences DESC
LIMIT 10
"""
Department-wise Distribution
Department-wise Distribution
query_department = """
SELECT
department,
COUNT(*) as artifact_count,
SUM(CASE WHEN m.has_image = 1 THEN 1 ELSE 0 END) as with_images
FROM artifactmetadata a
LEFT JOIN artifactmedia m ON a.id = m.artifact_id
WHERE department IS NOT NULL
GROUP BY department
ORDER BY artifact_count DESC
"""
undefinedquery_department = """
SELECT
department,
COUNT(*) as artifact_count,
SUM(CASE WHEN m.has_image = 1 THEN 1 ELSE 0 END) as with_images
FROM artifactmetadata a
LEFT JOIN artifactmedia m ON a.id = m.artifact_id
WHERE department IS NOT NULL
GROUP BY department
ORDER BY artifact_count DESC
"""
undefinedStreamlit Dashboard
Streamlit仪表盘
Basic App Structure
基础应用结构
python
import streamlit as st
import plotly.express as px
def main():
st.set_page_config(page_title="Harvard Artifacts Analytics", layout="wide")
st.title("🏛️ Harvard Art Museums Analytics Dashboard")
st.markdown("---")
# Sidebar for navigation
page = st.sidebar.selectbox(
"Select Page",
["ETL Pipeline", "Analytics Dashboard", "Data Explorer"]
)
if page == "ETL Pipeline":
show_etl_page()
elif page == "Analytics Dashboard":
show_analytics_page()
else:
show_explorer_page()
def show_etl_page():
st.header("📥 ETL Pipeline")
col1, col2 = st.columns(2)
with col1:
num_pages = st.number_input("Number of pages to fetch", min_value=1, max_value=50, value=5)
with col2:
if st.button("Run ETL Pipeline"):
with st.spinner("Running ETL pipeline..."):
df_meta, df_media, df_colors = run_etl_pipeline(num_pages)
st.success(f"✅ Loaded {len(df_meta)} artifacts successfully!")
# Show summary
st.metric("Total Artifacts", len(df_meta))
st.metric("Total Media Records", len(df_media))
st.metric("Total Color Records", len(df_colors))
def show_analytics_page():
st.header("📊 Analytics Dashboard")
# Query selector
queries = {
"Artifacts by Culture": query_culture,
"Artifacts by Century": query_century,
"Top Classifications": query_classification,
"Media Availability": query_media,
"Top Colors": query_colors,
"Department Distribution": query_department
}
selected_query = st.selectbox("Select Analysis", list(queries.keys()))
if st.button("Run Analysis"):
with st.spinner("Executing query..."):
df_result = execute_query(queries[selected_query])
# Display results
st.dataframe(df_result, use_container_width=True)
# Auto-generate visualization
if len(df_result.columns) >= 2:
fig = px.bar(
df_result,
x=df_result.columns[0],
y=df_result.columns[1],
title=selected_query
)
st.plotly_chart(fig, use_container_width=True)
if __name__ == "__main__":
main()python
import streamlit as st
import plotly.express as px
def main():
st.set_page_config(page_title="Harvard Artifacts Analytics", layout="wide")
st.title("🏛️ Harvard Art Museums Analytics Dashboard")
st.markdown("---")
# Sidebar for navigation
page = st.sidebar.selectbox(
"Select Page",
["ETL Pipeline", "Analytics Dashboard", "Data Explorer"]
)
if page == "ETL Pipeline":
show_etl_page()
elif page == "Analytics Dashboard":
show_analytics_page()
else:
show_explorer_page()
def show_etl_page():
st.header("📥 ETL Pipeline")
col1, col2 = st.columns(2)
with col1:
num_pages = st.number_input("Number of pages to fetch", min_value=1, max_value=50, value=5)
with col2:
if st.button("Run ETL Pipeline"):
with st.spinner("Running ETL pipeline..."):
df_meta, df_media, df_colors = run_etl_pipeline(num_pages)
st.success(f"✅ Loaded {len(df_meta)} artifacts successfully!")
# Show summary
st.metric("Total Artifacts", len(df_meta))
st.metric("Total Media Records", len(df_media))
st.metric("Total Color Records", len(df_colors))
def show_analytics_page():
st.header("📊 Analytics Dashboard")
# Query selector
queries = {
"Artifacts by Culture": query_culture,
"Artifacts by Century": query_century,
"Top Classifications": query_classification,
"Media Availability": query_media,
"Top Colors": query_colors,
"Department Distribution": query_department
}
selected_query = st.selectbox("Select Analysis", list(queries.keys()))
if st.button("Run Analysis"):
with st.spinner("Executing query..."):
df_result = execute_query(queries[selected_query])
# Display results
st.dataframe(df_result, use_container_width=True)
# Auto-generate visualization
if len(df_result.columns) >= 2:
fig = px.bar(
df_result,
x=df_result.columns[0],
y=df_result.columns[1],
title=selected_query
)
st.plotly_chart(fig, use_container_width=True)
if __name__ == "__main__":
main()Advanced Visualization
高级可视化
python
def create_color_distribution_chart(df_colors):
"""Create interactive color distribution visualization"""
fig = px.treemap(
df_colors,
path=['color_name'],
values='occurrences',
title='Color Distribution in Artifacts',
color='avg_percentage',
color_continuous_scale='Viridis'
)
return fig
def create_timeline_chart(df_century):
"""Create timeline chart for artifacts by century"""
fig = px.line(
df_century,
x='century',
y='count',
title='Artifact Collection Timeline',
markers=True
)
fig.update_layout(
xaxis_title="Century",
yaxis_title="Number of Artifacts"
)
return fig
def create_department_comparison(df_dept):
"""Create department comparison with images"""
fig = px.bar(
df_dept,
x='department',
y=['artifact_count', 'with_images'],
title='Artifacts by Department (Total vs With Images)',
barmode='group'
)
return figpython
def create_color_distribution_chart(df_colors):
"""Create interactive color distribution visualization"""
fig = px.treemap(
df_colors,
path=['color_name'],
values='occurrences',
title='Color Distribution in Artifacts',
color='avg_percentage',
color_continuous_scale='Viridis'
)
return fig
def create_timeline_chart(df_century):
"""Create timeline chart for artifacts by century"""
fig = px.line(
df_century,
x='century',
y='count',
title='Artifact Collection Timeline',
markers=True
)
fig.update_layout(
xaxis_title="Century",
yaxis_title="Number of Artifacts"
)
return fig
def create_department_comparison(df_dept):
"""Create department comparison with images"""
fig = px.bar(
df_dept,
x='department',
y=['artifact_count', 'with_images'],
title='Artifacts by Department (Total vs With Images)',
barmode='group'
)
return figRunning the Application
运行应用
bash
undefinedbash
undefinedStart the Streamlit app
Start the Streamlit app
streamlit run app.py
streamlit run app.py
The app will be available at http://localhost:8501
The app will be available at http://localhost:8501
undefinedundefinedCommon Patterns
常见模式
Batch Processing with Rate Limiting
带速率限制的批量处理
python
import time
def fetch_with_rate_limit(max_pages, delay=1):
"""Fetch artifacts with rate limiting"""
artifacts = []
for page in range(1, max_pages + 1):
data = fetch_artifacts(page=page)
artifacts.extend(data['records'])
# Rate limit: wait between requests
time.sleep(delay)
if page % 10 == 0:
print(f"Progress: {page}/{max_pages} pages")
return artifactspython
import time
def fetch_with_rate_limit(max_pages, delay=1):
"""Fetch artifacts with rate limiting"""
artifacts = []
for page in range(1, max_pages + 1):
data = fetch_artifacts(page=page)
artifacts.extend(data['records'])
# Rate limit: wait between requests
time.sleep(delay)
if page % 10 == 0:
print(f"Progress: {page}/{max_pages} pages")
return artifactsIncremental Data Loading
增量数据加载
python
def get_latest_artifact_id():
"""Get the latest artifact ID in database"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT MAX(id) FROM artifactmetadata")
result = cursor.fetchone()
conn.close()
return result[0] if result[0] else 0
def incremental_etl():
"""Load only new artifacts"""
latest_id = get_latest_artifact_id()
print(f"Latest artifact ID: {latest_id}")
# Fetch new artifacts with filter
# Implementation depends on API support for ID filtering
passpython
def get_latest_artifact_id():
"""Get the latest artifact ID in database"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT MAX(id) FROM artifactmetadata")
result = cursor.fetchone()
conn.close()
return result[0] if result[0] else 0
def incremental_etl():
"""Load only new artifacts"""
latest_id = get_latest_artifact_id()
print(f"Latest artifact ID: {latest_id}")
# Fetch new artifacts with filter
# Implementation depends on API support for ID filtering
passData Quality Checks
数据质量检查
python
def validate_data_quality(df):
"""Check data quality before loading"""
issues = []
# Check for nulls in critical fields
if df['id'].isnull().any():
issues.append("ID field contains null values")
# Check for duplicates
if df['id'].duplicated().any():
issues.append(f"Found {df['id'].duplicated().sum()} duplicate IDs")
# Check data types
if not pd.api.types.is_integer_dtype(df['id']):
issues.append("ID field is not integer type")
return issuespython
def validate_data_quality(df):
"""Check data quality before loading"""
issues = []
# Check for nulls in critical fields
if df['id'].isnull().any():
issues.append("ID field contains null values")
# Check for duplicates
if df['id'].duplicated().any():
issues.append(f"Found {df['id'].duplicated().sum()} duplicate IDs")
# Check data types
if not pd.api.types.is_integer_dtype(df['id']):
issues.append("ID field is not integer type")
return issuesTroubleshooting
故障排除
API Rate Limiting
API速率限制
If you encounter rate limit errors:
python
import time
from functools import wraps
def retry_with_backoff(max_retries=3, backoff_factor=2):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = backoff_factor ** attempt
print(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
time.sleep(wait_time)
return wrapper
return decorator
@retry_with_backoff(max_retries=3)
def fetch_artifacts_with_retry(page, size):
return fetch_artifacts(page, size)如果遇到速率限制错误:
python
import time
from functools import wraps
def retry_with_backoff(max_retries=3, backoff_factor=2):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = backoff_factor ** attempt
print(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
time.sleep(wait_time)
return wrapper
return decorator
@retry_with_backoff(max_retries=3)
def fetch_artifacts_with_retry(page, size):
return fetch_artifacts(page, size)Database Connection Issues
数据库连接问题
python
def test_db_connection():
"""Test database connection"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
conn.close()
print("✅ Database connection successful")
return True
except Error as e:
print(f"❌ Database connection failed: {e}")
return Falsepython
def test_db_connection():
"""Test database connection"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
conn.close()
print("✅ Database connection successful")
return True
except Error as e:
print(f"❌ Database connection failed: {e}")
return FalseHandling Missing Data
处理缺失数据
python
def safe_get(dictionary, key, default=''):
"""Safely get value from dictionary"""
value = dictionary.get(key, default)
return value if value is not None else default
def transform_with_defaults(artifact):
"""Transform artifact with default values for missing fields"""
return {
'id': artifact.get('id', 0),
'title': safe_get(artifact, 'title', 'Untitled'),
'culture': safe_get(artifact, 'culture', 'Unknown'),
'century': safe_get(artifact, 'century', 'Unknown'),
# ... other fields
}python
def safe_get(dictionary, key, default=''):
"""Safely get value from dictionary"""
value = dictionary.get(key, default)
return value if value is not None else default
def transform_with_defaults(artifact):
"""Transform artifact with default values for missing fields"""
return {
'id': artifact.get('id', 0),
'title': safe_get(artifact, 'title', 'Untitled'),
'culture': safe_get(artifact, 'culture', 'Unknown'),
'century': safe_get(artifact, 'century', 'Unknown'),
# ... other fields
}Memory Management for Large Datasets
大型数据集的内存管理
python
def chunked_load(df, chunk_size=1000):
"""Load data in chunks to manage memory"""
total_rows = len(df)
for i in range(0, total_rows, chunk_size):
chunk = df.iloc[i:i + chunk_size]
load_metadata(chunk)
print(f"Loaded chunk {i//chunk_size + 1}: {len(chunk)} rows")This skill provides everything needed to build, deploy, and extend the Harvard Artifacts Collection ETL Analytics application for real-world data engineering scenarios.
python
def chunked_load(df, chunk_size=1000):
"""Load data in chunks to manage memory"""
total_rows = len(df)
for i in range(0, total_rows, chunk_size):
chunk = df.iloc[i:i + chunk_size]
load_metadata(chunk)
print(f"Loaded chunk {i//chunk_size + 1}: {len(chunk)} rows")本技能提供了构建、部署和扩展哈佛文物收藏ETL分析应用所需的全部内容,适用于真实场景下的数据工程需求。