harvard-artifacts-data-engineering-analytics
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseHarvard Artifacts Data Engineering & Analytics
哈佛艺术品数据工程与分析
What This Project Does
本项目功能
An end-to-end data engineering and analytics application that:
- Extracts artifact data from Harvard Art Museums API with pagination handling
- Transforms nested JSON into relational database tables
- Loads structured data into MySQL/TiDB Cloud
- Provides 20+ predefined SQL analytics queries
- Visualizes insights through interactive Streamlit dashboards
Architecture: API → ETL → SQL → Analytics → Visualization
一款端到端的数据工程与分析应用,具备以下功能:
- 处理分页逻辑,从哈佛艺术博物馆API提取艺术品数据
- 将嵌套JSON转换为关系型数据库表
- 将结构化数据加载至MySQL/TiDB Cloud
- 提供20+预定义SQL分析查询语句
- 通过交互式Streamlit仪表盘可视化数据洞察
架构:API → ETL → SQL → 分析 → 可视化
Installation
安装步骤
bash
undefinedbash
undefinedClone the repository
克隆仓库
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
Install dependencies
安装依赖
pip install -r requirements.txt
pip install -r requirements.txt
Required packages
必备包
pip install streamlit pandas requests mysql-connector-python plotly sqlalchemy
undefinedpip install streamlit pandas requests mysql-connector-python plotly sqlalchemy
undefinedConfiguration
配置说明
1. Harvard Art Museums API Key
1. 哈佛艺术博物馆API密钥
Get your API key from: https://harvardartmuseums.org/collections/api
python
undefined从以下地址获取API密钥:https://harvardartmuseums.org/collections/api
python
undefinedSet as environment variable
设置为环境变量
export HARVARD_API_KEY="your_api_key_here"
export HARVARD_API_KEY="your_api_key_here"
Or configure in app
或在应用中配置
api_key = os.getenv('HARVARD_API_KEY')
undefinedapi_key = os.getenv('HARVARD_API_KEY')
undefined2. Database Configuration
2. 数据库配置
python
undefinedpython
undefinedMySQL/TiDB Cloud connection
MySQL/TiDB Cloud连接配置
db_config = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_NAME', 'harvard_artifacts'),
'port': int(os.getenv('DB_PORT', 3306))
}
undefineddb_config = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_NAME', 'harvard_artifacts'),
'port': int(os.getenv('DB_PORT', 3306))
}
undefined3. Environment Variables Setup
3. 环境变量设置
bash
undefinedbash
undefined.env file
.env文件
HARVARD_API_KEY=your_api_key
DB_HOST=your_db_host
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts
DB_PORT=3306
undefinedHARVARD_API_KEY=your_api_key
DB_HOST=your_db_host
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts
DB_PORT=3306
undefinedDatabase Schema
数据库模式
Create Tables
创建数据表
sql
-- Artifact Metadata Table
CREATE TABLE artifactmetadata (
artifact_id INT PRIMARY KEY,
title VARCHAR(500),
culture VARCHAR(255),
period VARCHAR(255),
century VARCHAR(100),
classification VARCHAR(255),
department VARCHAR(255),
division VARCHAR(255),
dated VARCHAR(255),
medium VARCHAR(500),
dimensions VARCHAR(500),
credit_line TEXT,
accession_number VARCHAR(100),
provenance TEXT,
copyright TEXT,
url VARCHAR(500),
last_updated TIMESTAMP
);
-- Artifact Media Table
CREATE TABLE artifactmedia (
media_id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
image_url VARCHAR(500),
base_url VARCHAR(500),
alt_text TEXT,
caption TEXT,
technique VARCHAR(255),
width INT,
height INT,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(artifact_id)
);
-- Artifact Colors Table
CREATE TABLE artifactcolors (
color_id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
color_hex VARCHAR(10),
color_name VARCHAR(100),
color_percent FLOAT,
css_color VARCHAR(100),
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(artifact_id)
);sql
-- 艺术品元数据表
CREATE TABLE artifactmetadata (
artifact_id INT PRIMARY KEY,
title VARCHAR(500),
culture VARCHAR(255),
period VARCHAR(255),
century VARCHAR(100),
classification VARCHAR(255),
department VARCHAR(255),
division VARCHAR(255),
dated VARCHAR(255),
medium VARCHAR(500),
dimensions VARCHAR(500),
credit_line TEXT,
accession_number VARCHAR(100),
provenance TEXT,
copyright TEXT,
url VARCHAR(500),
last_updated TIMESTAMP
);
-- 艺术品媒体表
CREATE TABLE artifactmedia (
media_id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
image_url VARCHAR(500),
base_url VARCHAR(500),
alt_text TEXT,
caption TEXT,
technique VARCHAR(255),
width INT,
height INT,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(artifact_id)
);
-- 艺术品颜色表
CREATE TABLE artifactcolors (
color_id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
color_hex VARCHAR(10),
color_name VARCHAR(100),
color_percent FLOAT,
css_color VARCHAR(100),
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(artifact_id)
);API Integration
API集成
Extract Artifacts with Pagination
带分页的艺术品数据提取
python
import requests
import time
def fetch_artifacts(api_key, total_records=100, page_size=100):
"""Fetch artifacts from Harvard Art Museums API with pagination"""
base_url = "https://api.harvardartmuseums.org/object"
all_artifacts = []
page = 1
while len(all_artifacts) < total_records:
params = {
'apikey': api_key,
'size': page_size,
'page': page,
'hasimage': 1 # Only artifacts with images
}
try:
response = requests.get(base_url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
records = data.get('records', [])
if not records:
break
all_artifacts.extend(records)
page += 1
# Rate limiting - respect API limits
time.sleep(0.5)
except requests.exceptions.RequestException as e:
print(f"Error fetching page {page}: {e}")
break
return all_artifacts[:total_records]python
import requests
import time
def fetch_artifacts(api_key, total_records=100, page_size=100):
"""从哈佛艺术博物馆API提取艺术品数据并处理分页"""
base_url = "https://api.harvardartmuseums.org/object"
all_artifacts = []
page = 1
while len(all_artifacts) < total_records:
params = {
'apikey': api_key,
'size': page_size,
'page': page,
'hasimage': 1 # 仅提取带图片的艺术品
}
try:
response = requests.get(base_url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
records = data.get('records', [])
if not records:
break
all_artifacts.extend(records)
page += 1
# 请求频率限制 - 遵守API限制
time.sleep(0.5)
except requests.exceptions.RequestException as e:
print(f"提取第{page}页时出错: {e}")
break
return all_artifacts[:total_records]Handle Nested JSON
处理嵌套JSON
python
def extract_artifact_metadata(artifact):
"""Extract metadata from artifact JSON"""
return {
'artifact_id': artifact.get('id'),
'title': artifact.get('title'),
'culture': artifact.get('culture'),
'period': artifact.get('period'),
'century': artifact.get('century'),
'classification': artifact.get('classification'),
'department': artifact.get('department'),
'division': artifact.get('division'),
'dated': artifact.get('dated'),
'medium': artifact.get('medium'),
'dimensions': artifact.get('dimensions'),
'credit_line': artifact.get('creditline'),
'accession_number': artifact.get('accessionmethod'),
'provenance': artifact.get('provenance'),
'copyright': artifact.get('copyright'),
'url': artifact.get('url'),
'last_updated': artifact.get('lastupdate')
}
def extract_artifact_media(artifact):
"""Extract media/images from artifact JSON"""
media_list = []
artifact_id = artifact.get('id')
for image in artifact.get('images', []):
media_list.append({
'artifact_id': artifact_id,
'image_url': image.get('iiifbaseuri'),
'base_url': image.get('baseimageurl'),
'alt_text': image.get('alttext'),
'caption': image.get('caption'),
'technique': image.get('technique'),
'width': image.get('width'),
'height': image.get('height')
})
return media_list
def extract_artifact_colors(artifact):
"""Extract color data from artifact JSON"""
colors_list = []
artifact_id = artifact.get('id')
for color in artifact.get('colors', []):
colors_list.append({
'artifact_id': artifact_id,
'color_hex': color.get('hex'),
'color_name': color.get('color'),
'color_percent': color.get('percent'),
'css_color': color.get('css3')
})
return colors_listpython
def extract_artifact_metadata(artifact):
"""从艺术品JSON中提取元数据"""
return {
'artifact_id': artifact.get('id'),
'title': artifact.get('title'),
'culture': artifact.get('culture'),
'period': artifact.get('period'),
'century': artifact.get('century'),
'classification': artifact.get('classification'),
'department': artifact.get('department'),
'division': artifact.get('division'),
'dated': artifact.get('dated'),
'medium': artifact.get('medium'),
'dimensions': artifact.get('dimensions'),
'credit_line': artifact.get('creditline'),
'accession_number': artifact.get('accessionmethod'),
'provenance': artifact.get('provenance'),
'copyright': artifact.get('copyright'),
'url': artifact.get('url'),
'last_updated': artifact.get('lastupdate')
}
def extract_artifact_media(artifact):
"""从艺术品JSON中提取媒体/图片数据"""
media_list = []
artifact_id = artifact.get('id')
for image in artifact.get('images', []):
media_list.append({
'artifact_id': artifact_id,
'image_url': image.get('iiifbaseuri'),
'base_url': image.get('baseimageurl'),
'alt_text': image.get('alttext'),
'caption': image.get('caption'),
'technique': image.get('technique'),
'width': image.get('width'),
'height': image.get('height')
})
return media_list
def extract_artifact_colors(artifact):
"""从艺术品JSON中提取颜色数据"""
colors_list = []
artifact_id = artifact.get('id')
for color in artifact.get('colors', []):
colors_list.append({
'artifact_id': artifact_id,
'color_hex': color.get('hex'),
'color_name': color.get('color'),
'color_percent': color.get('percent'),
'css_color': color.get('css3')
})
return colors_listETL Pipeline
ETL管道
Complete ETL Workflow
完整ETL工作流
python
import pandas as pd
import mysql.connector
from sqlalchemy import create_engine
class HarvardArtifactsETL:
def __init__(self, api_key, db_config):
self.api_key = api_key
self.db_config = db_config
self.engine = create_engine(
f"mysql+mysqlconnector://{db_config['user']}:{db_config['password']}@"
f"{db_config['host']}:{db_config['port']}/{db_config['database']}"
)
def extract(self, num_records=100):
"""Extract data from API"""
print(f"Extracting {num_records} artifacts...")
artifacts = fetch_artifacts(self.api_key, num_records)
return artifacts
def transform(self, artifacts):
"""Transform nested JSON to relational format"""
print("Transforming data...")
metadata_list = []
media_list = []
colors_list = []
for artifact in artifacts:
metadata_list.append(extract_artifact_metadata(artifact))
media_list.extend(extract_artifact_media(artifact))
colors_list.extend(extract_artifact_colors(artifact))
df_metadata = pd.DataFrame(metadata_list)
df_media = pd.DataFrame(media_list)
df_colors = pd.DataFrame(colors_list)
# Clean data
df_metadata = df_metadata.fillna('')
df_media = df_media.fillna('')
df_colors = df_colors.fillna(0)
return df_metadata, df_media, df_colors
def load(self, df_metadata, df_media, df_colors):
"""Load data into SQL database"""
print("Loading data to database...")
try:
# Load with batch inserts
df_metadata.to_sql('artifactmetadata', self.engine,
if_exists='append', index=False)
df_media.to_sql('artifactmedia', self.engine,
if_exists='append', index=False)
df_colors.to_sql('artifactcolors', self.engine,
if_exists='append', index=False)
print(f"✓ Loaded {len(df_metadata)} artifacts")
print(f"✓ Loaded {len(df_media)} media records")
print(f"✓ Loaded {len(df_colors)} color records")
except Exception as e:
print(f"Error loading data: {e}")
raise
def run(self, num_records=100):
"""Execute full ETL pipeline"""
artifacts = self.extract(num_records)
df_metadata, df_media, df_colors = self.transform(artifacts)
self.load(df_metadata, df_media, df_colors)
return Truepython
import pandas as pd
import mysql.connector
from sqlalchemy import create_engine
class HarvardArtifactsETL:
def __init__(self, api_key, db_config):
self.api_key = api_key
self.db_config = db_config
self.engine = create_engine(
f"mysql+mysqlconnector://{db_config['user']}:{db_config['password']}@"
f"{db_config['host']}:{db_config['port']}/{db_config['database']}"
)
def extract(self, num_records=100):
"""从API提取数据"""
print(f"正在提取{num_records}件艺术品数据...")
artifacts = fetch_artifacts(self.api_key, num_records)
return artifacts
def transform(self, artifacts):
"""将嵌套JSON转换为关系型格式"""
print("正在转换数据...")
metadata_list = []
media_list = []
colors_list = []
for artifact in artifacts:
metadata_list.append(extract_artifact_metadata(artifact))
media_list.extend(extract_artifact_media(artifact))
colors_list.extend(extract_artifact_colors(artifact))
df_metadata = pd.DataFrame(metadata_list)
df_media = pd.DataFrame(media_list)
df_colors = pd.DataFrame(colors_list)
# 数据清洗
df_metadata = df_metadata.fillna('')
df_media = df_media.fillna('')
df_colors = df_colors.fillna(0)
return df_metadata, df_media, df_colors
def load(self, df_metadata, df_media, df_colors):
"""将数据加载至SQL数据库"""
print("正在将数据加载至数据库...")
try:
# 批量插入数据
df_metadata.to_sql('artifactmetadata', self.engine,
if_exists='append', index=False)
df_media.to_sql('artifactmedia', self.engine,
if_exists='append', index=False)
df_colors.to_sql('artifactcolors', self.engine,
if_exists='append', index=False)
print(f"✓ 已加载{len(df_metadata)}件艺术品数据")
print(f"✓ 已加载{len(df_media)}条媒体记录")
print(f"✓ 已加载{len(df_colors)}条颜色记录")
except Exception as e:
print(f"加载数据时出错: {e}")
raise
def run(self, num_records=100):
"""执行完整ETL管道"""
artifacts = self.extract(num_records)
df_metadata, df_media, df_colors = self.transform(artifacts)
self.load(df_metadata, df_media, df_colors)
return TrueUsage
使用示例
etl = HarvardArtifactsETL(
api_key=os.getenv('HARVARD_API_KEY'),
db_config=db_config
)
etl.run(num_records=500)
undefinedetl = HarvardArtifactsETL(
api_key=os.getenv('HARVARD_API_KEY'),
db_config=db_config
)
etl.run(num_records=500)
undefinedAnalytics SQL Queries
分析SQL查询
Common Analytical Queries
常见分析查询语句
python
undefinedpython
undefinedSample analytics queries
示例分析查询
ANALYTICS_QUERIES = {
"artifacts_by_culture": """
SELECT culture, COUNT(*) as count
FROM artifactmetadata
WHERE culture != ''
GROUP BY culture
ORDER BY count DESC
LIMIT 15
""",
"artifacts_by_century": """
SELECT century, COUNT(*) as count
FROM artifactmetadata
WHERE century != ''
GROUP BY century
ORDER BY count DESC
""",
"media_availability": """
SELECT
CASE WHEN m.artifact_id IS NULL THEN 'No Media' ELSE 'Has Media' END as media_status,
COUNT(*) as count
FROM artifactmetadata a
LEFT JOIN artifactmedia m ON a.artifact_id = m.artifact_id
GROUP BY media_status
""",
"top_colors": """
SELECT color_name, COUNT(*) as count, AVG(color_percent) as avg_percent
FROM artifactcolors
WHERE color_name != ''
GROUP BY color_name
ORDER BY count DESC
LIMIT 10
""",
"artifacts_by_department": """
SELECT department, COUNT(*) as count
FROM artifactmetadata
WHERE department != ''
GROUP BY department
ORDER BY count DESC
""",
"classification_distribution": """
SELECT classification, COUNT(*) as count
FROM artifactmetadata
WHERE classification != ''
GROUP BY classification
ORDER BY count DESC
LIMIT 20
"""}
def execute_query(query, db_config):
"""Execute SQL query and return DataFrame"""
conn = mysql.connector.connect(**db_config)
df = pd.read_sql(query, conn)
conn.close()
return df
undefinedANALYTICS_QUERIES = {
"artifacts_by_culture": """
SELECT culture, COUNT(*) as count
FROM artifactmetadata
WHERE culture != ''
GROUP BY culture
ORDER BY count DESC
LIMIT 15
""",
"artifacts_by_century": """
SELECT century, COUNT(*) as count
FROM artifactmetadata
WHERE century != ''
GROUP BY century
ORDER BY count DESC
""",
"media_availability": """
SELECT
CASE WHEN m.artifact_id IS NULL THEN '无媒体' ELSE '有媒体' END as media_status,
COUNT(*) as count
FROM artifactmetadata a
LEFT JOIN artifactmedia m ON a.artifact_id = m.artifact_id
GROUP BY media_status
""",
"top_colors": """
SELECT color_name, COUNT(*) as count, AVG(color_percent) as avg_percent
FROM artifactcolors
WHERE color_name != ''
GROUP BY color_name
ORDER BY count DESC
LIMIT 10
""",
"artifacts_by_department": """
SELECT department, COUNT(*) as count
FROM artifactmetadata
WHERE department != ''
GROUP BY department
ORDER BY count DESC
""",
"classification_distribution": """
SELECT classification, COUNT(*) as count
FROM artifactmetadata
WHERE classification != ''
GROUP BY classification
ORDER BY count DESC
LIMIT 20
"""}
def execute_query(query, db_config):
"""执行SQL查询并返回DataFrame"""
conn = mysql.connector.connect(**db_config)
df = pd.read_sql(query, conn)
conn.close()
return df
undefinedStreamlit Dashboard
Streamlit仪表盘
Main Application Structure
主应用结构
python
import streamlit as st
import plotly.express as px
st.set_page_config(page_title="Harvard Artifacts Analytics", layout="wide")
def main():
st.title("🎨 Harvard Art Museums - Data Analytics Dashboard")
# Sidebar configuration
st.sidebar.header("Configuration")
api_key = st.sidebar.text_input("Harvard API Key", type="password")
# Database connection
db_host = st.sidebar.text_input("Database Host", value="localhost")
db_user = st.sidebar.text_input("Database User", value="root")
db_password = st.sidebar.text_input("Database Password", type="password")
# ETL Section
st.header("📥 Data Collection")
num_records = st.number_input("Number of artifacts to fetch",
min_value=10, max_value=1000, value=100)
if st.button("Run ETL Pipeline"):
with st.spinner("Running ETL..."):
try:
db_config = {
'host': db_host,
'user': db_user,
'password': db_password,
'database': 'harvard_artifacts'
}
etl = HarvardArtifactsETL(api_key, db_config)
etl.run(num_records)
st.success(f"✓ Successfully processed {num_records} artifacts!")
except Exception as e:
st.error(f"Error: {e}")
# Analytics Section
st.header("📊 Analytics Dashboard")
query_options = list(ANALYTICS_QUERIES.keys())
selected_query = st.selectbox("Select Analysis", query_options)
if st.button("Run Analysis"):
with st.spinner("Executing query..."):
try:
df_result = execute_query(
ANALYTICS_QUERIES[selected_query],
db_config
)
# Display table
st.dataframe(df_result)
# Auto-generate visualization
if len(df_result.columns) >= 2:
fig = px.bar(df_result,
x=df_result.columns[0],
y=df_result.columns[1],
title=selected_query.replace('_', ' ').title())
st.plotly_chart(fig, use_container_width=True)
except Exception as e:
st.error(f"Query error: {e}")
if __name__ == "__main__":
main()python
import streamlit as st
import plotly.express as px
st.set_page_config(page_title="哈佛艺术品分析", layout="wide")
def main():
st.title("🎨 哈佛艺术博物馆 - 数据分析仪表盘")
# 侧边栏配置
st.sidebar.header("配置项")
api_key = st.sidebar.text_input("哈佛API密钥", type="password")
# 数据库连接
db_host = st.sidebar.text_input("数据库主机", value="localhost")
db_user = st.sidebar.text_input("数据库用户", value="root")
db_password = st.sidebar.text_input("数据库密码", type="password")
# ETL模块
st.header("📥 数据采集")
num_records = st.number_input("要获取的艺术品数量",
min_value=10, max_value=1000, value=100)
if st.button("运行ETL管道"):
with st.spinner("正在运行ETL..."):
try:
db_config = {
'host': db_host,
'user': db_user,
'password': db_password,
'database': 'harvard_artifacts'
}
etl = HarvardArtifactsETL(api_key, db_config)
etl.run(num_records)
st.success(f"✓ 成功处理{num_records}件艺术品数据!")
except Exception as e:
st.error(f"错误: {e}")
# 分析模块
st.header("📊 分析仪表盘")
query_options = list(ANALYTICS_QUERIES.keys())
selected_query = st.selectbox("选择分析项", query_options)
if st.button("运行分析"):
with st.spinner("正在执行查询..."):
try:
df_result = execute_query(
ANALYTICS_QUERIES[selected_query],
db_config
)
# 显示表格
st.dataframe(df_result)
# 自动生成可视化图表
if len(df_result.columns) >= 2:
fig = px.bar(df_result,
x=df_result.columns[0],
y=df_result.columns[1],
title=selected_query.replace('_', ' ').title())
st.plotly_chart(fig, use_container_width=True)
except Exception as e:
st.error(f"查询错误: {e}")
if __name__ == "__main__":
main()Run the App
运行应用
bash
streamlit run app.pybash
streamlit run app.pyCommon Patterns
通用模式
Batch Processing Large Datasets
大数据集批量处理
python
def batch_insert(df, table_name, engine, batch_size=1000):
"""Insert data in batches for better performance"""
total_rows = len(df)
for start_idx in range(0, total_rows, batch_size):
end_idx = min(start_idx + batch_size, total_rows)
batch = df.iloc[start_idx:end_idx]
batch.to_sql(table_name, engine, if_exists='append', index=False)
print(f"Inserted batch {start_idx}-{end_idx} of {total_rows}")python
def batch_insert(df, table_name, engine, batch_size=1000):
"""批量插入数据以提升性能"""
total_rows = len(df)
for start_idx in range(0, total_rows, batch_size):
end_idx = min(start_idx + batch_size, total_rows)
batch = df.iloc[start_idx:end_idx]
batch.to_sql(table_name, engine, if_exists='append', index=False)
print(f"已插入批次 {start_idx}-{end_idx},共{total_rows}条数据")Error Handling for API Calls
API调用错误处理
python
import time
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def create_session_with_retries():
"""Create requests session with retry logic"""
session = requests.Session()
retry = Retry(
total=5,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return sessionpython
import time
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def create_session_with_retries():
"""创建带重试逻辑的requests会话"""
session = requests.Session()
retry = Retry(
total=5,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return sessionData Quality Validation
数据质量验证
python
def validate_artifacts(df):
"""Validate artifact data quality"""
issues = []
# Check for required fields
if df['artifact_id'].isnull().any():
issues.append("Missing artifact IDs detected")
# Check for duplicates
duplicates = df['artifact_id'].duplicated().sum()
if duplicates > 0:
issues.append(f"{duplicates} duplicate artifact IDs found")
# Check data types
if not pd.api.types.is_numeric_dtype(df['artifact_id']):
issues.append("artifact_id should be numeric")
return issuespython
def validate_artifacts(df):
"""验证艺术品数据质量"""
issues = []
# 检查必填字段
if df['artifact_id'].isnull().any():
issues.append("检测到缺失的艺术品ID")
# 检查重复数据
duplicates = df['artifact_id'].duplicated().sum()
if duplicates > 0:
issues.append(f"发现{duplicates}个重复的艺术品ID")
# 检查数据类型
if not pd.api.types.is_numeric_dtype(df['artifact_id']):
issues.append("artifact_id应为数值类型")
return issuesTroubleshooting
故障排查
API Rate Limiting
API频率限制
python
undefinedpython
undefinedAdd delays between requests
在请求之间添加延迟
time.sleep(0.5) # 500ms delay
time.sleep(0.5) # 500毫秒延迟
Monitor rate limit headers
监控频率限制响应头
if 'X-RateLimit-Remaining' in response.headers:
remaining = int(response.headers['X-RateLimit-Remaining'])
if remaining < 10:
time.sleep(2)
undefinedif 'X-RateLimit-Remaining' in response.headers:
remaining = int(response.headers['X-RateLimit-Remaining'])
if remaining < 10:
time.sleep(2)
undefinedDatabase Connection Issues
数据库连接问题
python
undefinedpython
undefinedTest connection
测试连接
try:
conn = mysql.connector.connect(**db_config)
print("✓ Database connection successful")
conn.close()
except mysql.connector.Error as err:
print(f"✗ Connection failed: {err}")
undefinedtry:
conn = mysql.connector.connect(**db_config)
print("✓ 数据库连接成功")
conn.close()
except mysql.connector.Error as err:
print(f"✗ 连接失败: {err}")
undefinedMemory Management for Large Datasets
大数据集内存管理
python
undefinedpython
undefinedUse chunking for large API responses
对大型API响应使用分块处理
def fetch_artifacts_chunked(api_key, total_records, chunk_size=100):
for offset in range(0, total_records, chunk_size):
yield fetch_artifacts(api_key, chunk_size, page=offset//chunk_size)
undefineddef fetch_artifacts_chunked(api_key, total_records, chunk_size=100):
for offset in range(0, total_records, chunk_size):
yield fetch_artifacts(api_key, chunk_size, page=offset//chunk_size)
undefinedEmpty API Responses
空API响应处理
python
undefinedpython
undefinedHandle missing data gracefully
优雅处理缺失数据
records = data.get('records', [])
if not records:
print("No more records available")
break
records = data.get('records', [])
if not records:
print("无更多记录可用")
break
Validate response structure
验证响应结构
if 'info' not in data or 'records' not in data:
raise ValueError("Invalid API response structure")
undefinedif 'info' not in data or 'records' not in data:
raise ValueError("API响应结构无效")
undefined