harvard-artifacts-data-engineering-app

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Harvard Artifacts Collection Data Engineering Analytics App

哈佛文物收藏数据工程分析应用

Skill by ara.so — Data Skills collection.
ara.so提供的技能——数据技能合集。

Overview

概述

This project is a complete data engineering solution that extracts artifact data from the Harvard Art Museums API, transforms it into relational tables, loads it into SQL databases, and provides interactive analytics through a Streamlit dashboard. It demonstrates production-grade ETL pipelines, SQL analytics, and data visualization patterns.
Architecture Flow: API → ETL → SQL → Analytics → Visualization
本项目是一套完整的数据工程解决方案,从Harvard Art Museums API提取文物数据,将其转换为关系表,加载到SQL数据库中,并通过Streamlit仪表板提供交互式分析。它展示了生产级别的ETL管道、SQL分析和数据可视化模式。
架构流程: API → ETL → SQL → 分析 → 可视化

Installation

安装

bash
undefined
bash
undefined

Clone the repository

克隆仓库

git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

Install dependencies

安装依赖

pip install -r requirements.txt
undefined
pip install -r requirements.txt
undefined

Required Dependencies

所需依赖

txt
streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenv
txt
streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenv

Configuration

配置

Environment Variables

环境变量

Create a
.env
file in the project root:
bash
undefined
在项目根目录创建
.env
文件:
bash
undefined

Harvard Art Museums API

Harvard Art Museums API

HARVARD_API_KEY=your_api_key_here
HARVARD_API_KEY=your_api_key_here

Database Configuration

数据库配置

DB_HOST=your_database_host DB_PORT=3306 DB_USER=your_database_user DB_PASSWORD=your_database_password DB_NAME=harvard_artifacts
undefined
DB_HOST=your_database_host DB_PORT=3306 DB_USER=your_database_user DB_PASSWORD=your_database_password DB_NAME=harvard_artifacts
undefined

Get Harvard API Key

获取Harvard API密钥

  1. Visit https://harvardartmuseums.org/collections/api
  2. Register for a free API key
  3. Add to your
    .env
    file
  1. 访问https://harvardartmuseums.org/collections/api
  2. 注册获取免费API密钥
  3. 将密钥添加到
    .env
    文件中

Database Setup

数据库设置

The app supports MySQL and TiDB Cloud. Create the database schema:
sql
CREATE DATABASE IF NOT EXISTS harvard_artifacts;
USE harvard_artifacts;

CREATE TABLE IF NOT EXISTS artifactmetadata (
    id INT PRIMARY KEY,
    title VARCHAR(500),
    culture VARCHAR(200),
    classification VARCHAR(200),
    century VARCHAR(100),
    dated VARCHAR(200),
    department VARCHAR(200),
    division VARCHAR(200),
    period VARCHAR(200),
    provenance TEXT,
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS artifactmedia (
    media_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    baseimageurl VARCHAR(500),
    primaryimageurl VARCHAR(500),
    imagepermissionlevel INT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

CREATE TABLE IF NOT EXISTS artifactcolors (
    color_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    color VARCHAR(50),
    spectrum VARCHAR(50),
    hue VARCHAR(50),
    percent FLOAT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
该应用支持MySQL和TiDB Cloud。创建数据库模式:
sql
CREATE DATABASE IF NOT EXISTS harvard_artifacts;
USE harvard_artifacts;

CREATE TABLE IF NOT EXISTS artifactmetadata (
    id INT PRIMARY KEY,
    title VARCHAR(500),
    culture VARCHAR(200),
    classification VARCHAR(200),
    century VARCHAR(100),
    dated VARCHAR(200),
    department VARCHAR(200),
    division VARCHAR(200),
    period VARCHAR(200),
    provenance TEXT,
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS artifactmedia (
    media_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    baseimageurl VARCHAR(500),
    primaryimageurl VARCHAR(500),
    imagepermissionlevel INT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

CREATE TABLE IF NOT EXISTS artifactcolors (
    color_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    color VARCHAR(50),
    spectrum VARCHAR(50),
    hue VARCHAR(50),
    percent FLOAT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

Running the Application

运行应用

bash
streamlit run app.py
The app will be available at
http://localhost:8501
bash
streamlit run app.py
应用将在
http://localhost:8501
地址可用

Core Components

核心组件

1. API Data Collection

1. API数据采集

python
import requests
import os
from dotenv import load_dotenv

load_dotenv()

def fetch_artifacts(api_key, page=1, size=100):
    """
    Fetch artifacts from Harvard Art Museums API
    
    Args:
        api_key: Harvard API key
        page: Page number for pagination
        size: Number of records per page (max 100)
    
    Returns:
        dict: JSON response containing artifact records
    """
    base_url = "https://api.harvardartmuseums.org/object"
    params = {
        "apikey": api_key,
        "page": page,
        "size": size,
        "hasimage": 1  # Only artifacts with images
    }
    
    response = requests.get(base_url, params=params)
    response.raise_for_status()
    return response.json()

def collect_multiple_pages(api_key, num_pages=10):
    """Collect artifacts from multiple pages"""
    all_artifacts = []
    
    for page in range(1, num_pages + 1):
        data = fetch_artifacts(api_key, page=page)
        all_artifacts.extend(data.get('records', []))
        print(f"Collected page {page}, total artifacts: {len(all_artifacts)}")
    
    return all_artifacts
python
import requests
import os
from dotenv import load_dotenv

load_dotenv()

def fetch_artifacts(api_key, page=1, size=100):
    """
    从Harvard Art Museums API获取文物数据
    
    参数:
        api_key: Harvard API密钥
        page: 分页页码
        size: 每页记录数(最大100)
    
    返回:
        dict: 包含文物记录的JSON响应
    """
    base_url = "https://api.harvardartmuseums.org/object"
    params = {
        "apikey": api_key,
        "page": page,
        "size": size,
        "hasimage": 1  # 仅获取带图片的文物
    }
    
    response = requests.get(base_url, params=params)
    response.raise_for_status()
    return response.json()

def collect_multiple_pages(api_key, num_pages=10):
    """从多页采集文物数据"""
    all_artifacts = []
    
    for page in range(1, num_pages + 1):
        data = fetch_artifacts(api_key, page=page)
        all_artifacts.extend(data.get('records', []))
        print(f"已采集第{page}页,累计文物数:{len(all_artifacts)}")
    
    return all_artifacts

2. ETL Pipeline

2. ETL管道

python
import pandas as pd
import mysql.connector
from typing import List, Dict

class ArtifactETL:
    """ETL pipeline for Harvard artifact data"""
    
    def __init__(self, db_config):
        self.db_config = db_config
        self.connection = None
    
    def connect_db(self):
        """Establish database connection"""
        self.connection = mysql.connector.connect(**self.db_config)
        return self.connection
    
    def extract_metadata(self, artifacts: List[Dict]) -> pd.DataFrame:
        """Extract artifact metadata"""
        metadata = []
        
        for artifact in artifacts:
            metadata.append({
                'id': artifact.get('id'),
                'title': artifact.get('title', '')[:500],
                'culture': artifact.get('culture', '')[:200],
                'classification': artifact.get('classification', '')[:200],
                'century': artifact.get('century', '')[:100],
                'dated': artifact.get('dated', '')[:200],
                'department': artifact.get('department', '')[:200],
                'division': artifact.get('division', '')[:200],
                'period': artifact.get('period', '')[:200],
                'provenance': artifact.get('provenance', ''),
                'description': artifact.get('description', '')
            })
        
        return pd.DataFrame(metadata)
    
    def extract_media(self, artifacts: List[Dict]) -> pd.DataFrame:
        """Extract artifact media/image data"""
        media = []
        
        for artifact in artifacts:
            artifact_id = artifact.get('id')
            media.append({
                'artifact_id': artifact_id,
                'baseimageurl': artifact.get('baseimageurl', ''),
                'primaryimageurl': artifact.get('primaryimageurl', ''),
                'imagepermissionlevel': artifact.get('imagepermissionlevel', 0)
            })
        
        return pd.DataFrame(media)
    
    def extract_colors(self, artifacts: List[Dict]) -> pd.DataFrame:
        """Extract color data from artifacts"""
        colors = []
        
        for artifact in artifacts:
            artifact_id = artifact.get('id')
            color_list = artifact.get('colors', [])
            
            for color in color_list:
                colors.append({
                    'artifact_id': artifact_id,
                    'color': color.get('color', ''),
                    'spectrum': color.get('spectrum', ''),
                    'hue': color.get('hue', ''),
                    'percent': color.get('percent', 0.0)
                })
        
        return pd.DataFrame(colors)
    
    def load_to_db(self, df: pd.DataFrame, table_name: str):
        """Load dataframe to SQL table"""
        cursor = self.connection.cursor()
        
        # Generate INSERT statement
        cols = ', '.join(df.columns)
        placeholders = ', '.join(['%s'] * len(df.columns))
        insert_sql = f"INSERT IGNORE INTO {table_name} ({cols}) VALUES ({placeholders})"
        
        # Batch insert
        data_tuples = [tuple(row) for row in df.values]
        cursor.executemany(insert_sql, data_tuples)
        self.connection.commit()
        
        print(f"Loaded {cursor.rowcount} rows into {table_name}")
        cursor.close()
    
    def run_pipeline(self, artifacts: List[Dict]):
        """Execute full ETL pipeline"""
        self.connect_db()
        
        # Extract
        metadata_df = self.extract_metadata(artifacts)
        media_df = self.extract_media(artifacts)
        colors_df = self.extract_colors(artifacts)
        
        # Load
        self.load_to_db(metadata_df, 'artifactmetadata')
        self.load_to_db(media_df, 'artifactmedia')
        self.load_to_db(colors_df, 'artifactcolors')
        
        self.connection.close()
        print("ETL pipeline completed successfully")
python
import pandas as pd
import mysql.connector
from typing import List, Dict

class ArtifactETL:
    """用于哈佛文物数据的ETL管道"""
    
    def __init__(self, db_config):
        self.db_config = db_config
        self.connection = None
    
    def connect_db(self):
        """建立数据库连接"""
        self.connection = mysql.connector.connect(**self.db_config)
        return self.connection
    
    def extract_metadata(self, artifacts: List[Dict]) -> pd.DataFrame:
        """提取文物元数据"""
        metadata = []
        
        for artifact in artifacts:
            metadata.append({
                'id': artifact.get('id'),
                'title': artifact.get('title', '')[:500],
                'culture': artifact.get('culture', '')[:200],
                'classification': artifact.get('classification', '')[:200],
                'century': artifact.get('century', '')[:100],
                'dated': artifact.get('dated', '')[:200],
                'department': artifact.get('department', '')[:200],
                'division': artifact.get('division', '')[:200],
                'period': artifact.get('period', '')[:200],
                'provenance': artifact.get('provenance', ''),
                'description': artifact.get('description', '')
            })
        
        return pd.DataFrame(metadata)
    
    def extract_media(self, artifacts: List[Dict]) -> pd.DataFrame:
        """提取文物媒体/图片数据"""
        media = []
        
        for artifact in artifacts:
            artifact_id = artifact.get('id')
            media.append({
                'artifact_id': artifact_id,
                'baseimageurl': artifact.get('baseimageurl', ''),
                'primaryimageurl': artifact.get('primaryimageurl', ''),
                'imagepermissionlevel': artifact.get('imagepermissionlevel', 0)
            })
        
        return pd.DataFrame(media)
    
    def extract_colors(self, artifacts: List[Dict]) -> pd.DataFrame:
        """提取文物中的颜色数据"""
        colors = []
        
        for artifact in artifacts:
            artifact_id = artifact.get('id')
            color_list = artifact.get('colors', [])
            
            for color in color_list:
                colors.append({
                    'artifact_id': artifact_id,
                    'color': color.get('color', ''),
                    'spectrum': color.get('spectrum', ''),
                    'hue': color.get('hue', ''),
                    'percent': color.get('percent', 0.0)
                })
        
        return pd.DataFrame(colors)
    
    def load_to_db(self, df: pd.DataFrame, table_name: str):
        """将数据帧加载到SQL表中"""
        cursor = self.connection.cursor()
        
        # 生成INSERT语句
        cols = ', '.join(df.columns)
        placeholders = ', '.join(['%s'] * len(df.columns))
        insert_sql = f"INSERT IGNORE INTO {table_name} ({cols}) VALUES ({placeholders})"
        
        # 批量插入
        data_tuples = [tuple(row) for row in df.values]
        cursor.executemany(insert_sql, data_tuples)
        self.connection.commit()
        
        print(f"已将{cursor.rowcount}行数据加载到{table_name}")
        cursor.close()
    
    def run_pipeline(self, artifacts: List[Dict]):
        """执行完整ETL管道"""
        self.connect_db()
        
        # 提取
        metadata_df = self.extract_metadata(artifacts)
        media_df = self.extract_media(artifacts)
        colors_df = self.extract_colors(artifacts)
        
        # 加载
        self.load_to_db(metadata_df, 'artifactmetadata')
        self.load_to_db(media_df, 'artifactmedia')
        self.load_to_db(colors_df, 'artifactcolors')
        
        self.connection.close()
        print("ETL管道执行成功")

3. SQL Analytics Queries

3. SQL分析查询

python
undefined
python
undefined

Example analytical queries

示例分析查询

ANALYTICS_QUERIES = { "artifacts_by_culture": """ SELECT culture, COUNT(*) as count FROM artifactmetadata WHERE culture IS NOT NULL AND culture != '' GROUP BY culture ORDER BY count DESC LIMIT 20 """,
"artifacts_by_century": """
    SELECT century, COUNT(*) as count
    FROM artifactmetadata
    WHERE century IS NOT NULL AND century != ''
    GROUP BY century
    ORDER BY count DESC
""",

"department_distribution": """
    SELECT department, COUNT(*) as artifact_count
    FROM artifactmetadata
    GROUP BY department
    ORDER BY artifact_count DESC
""",

"top_colors": """
    SELECT color, COUNT(*) as frequency
    FROM artifactcolors
    WHERE color IS NOT NULL
    GROUP BY color
    ORDER BY frequency DESC
    LIMIT 15
""",

"artifacts_with_images": """
    SELECT 
        CASE 
            WHEN primaryimageurl IS NOT NULL THEN 'Has Image'
            ELSE 'No Image'
        END as image_status,
        COUNT(*) as count
    FROM artifactmedia
    GROUP BY image_status
""",

"color_diversity": """
    SELECT artifact_id, COUNT(DISTINCT color) as color_count
    FROM artifactcolors
    GROUP BY artifact_id
    ORDER BY color_count DESC
    LIMIT 20
"""
}
def execute_query(connection, query_name): """Execute predefined analytics query""" cursor = connection.cursor(dictionary=True) cursor.execute(ANALYTICS_QUERIES[query_name]) results = cursor.fetchall() cursor.close() return pd.DataFrame(results)
undefined
ANALYTICS_QUERIES = { "artifacts_by_culture": """ SELECT culture, COUNT(*) as count FROM artifactmetadata WHERE culture IS NOT NULL AND culture != '' GROUP BY culture ORDER BY count DESC LIMIT 20 """,
"artifacts_by_century": """
    SELECT century, COUNT(*) as count
    FROM artifactmetadata
    WHERE century IS NOT NULL AND century != ''
    GROUP BY century
    ORDER BY count DESC
""",

"department_distribution": """
    SELECT department, COUNT(*) as artifact_count
    FROM artifactmetadata
    GROUP BY department
    ORDER BY artifact_count DESC
""",

"top_colors": """
    SELECT color, COUNT(*) as frequency
    FROM artifactcolors
    WHERE color IS NOT NULL
    GROUP BY color
    ORDER BY frequency DESC
    LIMIT 15
""",

"artifacts_with_images": """
    SELECT 
        CASE 
            WHEN primaryimageurl IS NOT NULL THEN 'Has Image'
            ELSE 'No Image'
        END as image_status,
        COUNT(*) as count
    FROM artifactmedia
    GROUP BY image_status
""",

"color_diversity": """
    SELECT artifact_id, COUNT(DISTINCT color) as color_count
    FROM artifactcolors
    GROUP BY artifact_id
    ORDER BY color_count DESC
    LIMIT 20
"""
}
def execute_query(connection, query_name): """执行预定义的分析查询""" cursor = connection.cursor(dictionary=True) cursor.execute(ANALYTICS_QUERIES[query_name]) results = cursor.fetchall() cursor.close() return pd.DataFrame(results)
undefined

4. Streamlit Dashboard

4. Streamlit仪表板

python
import streamlit as st
import plotly.express as px

def main():
    st.set_page_config(page_title="Harvard Artifacts Analytics", layout="wide")
    
    st.title("🎨 Harvard Art Museums Analytics Dashboard")
    st.markdown("End-to-end data engineering and analytics application")
    
    # Sidebar configuration
    st.sidebar.header("Configuration")
    api_key = st.sidebar.text_input("Harvard API Key", type="password", 
                                     value=os.getenv("HARVARD_API_KEY", ""))
    
    # Data collection section
    st.header("📥 Data Collection")
    col1, col2 = st.columns(2)
    
    with col1:
        num_pages = st.number_input("Number of pages to collect", min_value=1, max_value=50, value=5)
    
    with col2:
        if st.button("🚀 Start Data Collection"):
            with st.spinner("Collecting artifacts..."):
                artifacts = collect_multiple_pages(api_key, num_pages)
                st.success(f"Collected {len(artifacts)} artifacts!")
                
                # Run ETL
                db_config = {
                    'host': os.getenv('DB_HOST'),
                    'port': int(os.getenv('DB_PORT', 3306)),
                    'user': os.getenv('DB_USER'),
                    'password': os.getenv('DB_PASSWORD'),
                    'database': os.getenv('DB_NAME')
                }
                
                etl = ArtifactETL(db_config)
                etl.run_pipeline(artifacts)
                st.success("ETL pipeline completed!")
    
    # Analytics section
    st.header("📊 Analytics Dashboard")
    
    query_options = list(ANALYTICS_QUERIES.keys())
    selected_query = st.selectbox("Select Analysis", query_options)
    
    if st.button("Run Query"):
        connection = mysql.connector.connect(**db_config)
        df = execute_query(connection, selected_query)
        connection.close()
        
        st.dataframe(df)
        
        # Auto-generate visualization
        if len(df.columns) == 2:
            fig = px.bar(df, x=df.columns[0], y=df.columns[1], 
                        title=selected_query.replace('_', ' ').title())
            st.plotly_chart(fig, use_container_width=True)

if __name__ == "__main__":
    main()
python
import streamlit as st
import plotly.express as px

def main():
    st.set_page_config(page_title="Harvard Artifacts Analytics", layout="wide")
    
    st.title("🎨 哈佛艺术博物馆分析仪表板")
    st.markdown("端到端数据工程与分析应用")
    
    # 侧边栏配置
    st.sidebar.header("配置")
    api_key = st.sidebar.text_input("Harvard API密钥", type="password", 
                                     value=os.getenv("HARVARD_API_KEY", ""))
    
    # 数据采集区域
    st.header("📥 数据采集")
    col1, col2 = st.columns(2)
    
    with col1:
        num_pages = st.number_input("要采集的页数", min_value=1, max_value=50, value=5)
    
    with col2:
        if st.button("🚀 开始数据采集"):
            with st.spinner("正在采集文物数据..."):
                artifacts = collect_multiple_pages(api_key, num_pages)
                st.success(f"已采集{len(artifacts)}件文物!")
                
                # 运行ETL
                db_config = {
                    'host': os.getenv('DB_HOST'),
                    'port': int(os.getenv('DB_PORT', 3306)),
                    'user': os.getenv('DB_USER'),
                    'password': os.getenv('DB_PASSWORD'),
                    'database': os.getenv('DB_NAME')
                }
                
                etl = ArtifactETL(db_config)
                etl.run_pipeline(artifacts)
                st.success("ETL管道执行完成!")
    
    # 分析区域
    st.header("📊 分析仪表板")
    
    query_options = list(ANALYTICS_QUERIES.keys())
    selected_query = st.selectbox("选择分析项", query_options)
    
    if st.button("运行查询"):
        connection = mysql.connector.connect(**db_config)
        df = execute_query(connection, selected_query)
        connection.close()
        
        st.dataframe(df)
        
        # 自动生成可视化图表
        if len(df.columns) == 2:
            fig = px.bar(df, x=df.columns[0], y=df.columns[1], 
                        title=selected_query.replace('_', ' ').title())
            st.plotly_chart(fig, use_container_width=True)

if __name__ == "__main__":
    main()

Common Patterns

通用模式

Rate Limiting and Error Handling

速率限制与错误处理

python
import time
from requests.exceptions import RequestException

def fetch_with_retry(api_key, page, max_retries=3):
    """Fetch with exponential backoff"""
    for attempt in range(max_retries):
        try:
            data = fetch_artifacts(api_key, page)
            return data
        except RequestException as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Retry {attempt + 1} after {wait_time}s")
                time.sleep(wait_time)
            else:
                raise e
python
import time
from requests.exceptions import RequestException

def fetch_with_retry(api_key, page, max_retries=3):
    """带指数退避的重试获取"""
    for attempt in range(max_retries):
        try:
            data = fetch_artifacts(api_key, page)
            return data
        except RequestException as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"第{attempt + 1}次重试,等待{wait_time}秒")
                time.sleep(wait_time)
            else:
                raise e

Incremental Data Loading

增量数据加载

python
def get_last_artifact_id(connection):
    """Get the highest artifact ID in database"""
    cursor = connection.cursor()
    cursor.execute("SELECT MAX(id) FROM artifactmetadata")
    result = cursor.fetchone()
    cursor.close()
    return result[0] if result[0] else 0

def incremental_load(api_key, db_config):
    """Load only new artifacts"""
    connection = mysql.connector.connect(**db_config)
    last_id = get_last_artifact_id(connection)
    connection.close()
    
    # Fetch artifacts with ID > last_id
    # Process only new records
python
def get_last_artifact_id(connection):
    """获取数据库中最大的文物ID"""
    cursor = connection.cursor()
    cursor.execute("SELECT MAX(id) FROM artifactmetadata")
    result = cursor.fetchone()
    cursor.close()
    return result[0] if result[0] else 0

def incremental_load(api_key, db_config):
    """仅加载新文物数据"""
    connection = mysql.connector.connect(**db_config)
    last_id = get_last_artifact_id(connection)
    connection.close()
    
    # 获取ID大于last_id的文物
    # 仅处理新记录

Troubleshooting

故障排除

API Key Issues

API密钥问题

Database Connection Errors

数据库连接错误

python
undefined
python
undefined

Test database connection

测试数据库连接

try: connection = mysql.connector.connect(**db_config) print("Database connected successfully") connection.close() except mysql.connector.Error as err: print(f"Database error: {err}")
undefined
try: connection = mysql.connector.connect(**db_config) print("数据库连接成功") connection.close() except mysql.connector.Error as err: print(f"数据库错误:{err}")
undefined

Memory Issues with Large Datasets

大数据集内存问题

python
undefined
python
undefined

Process in chunks

分块处理

def chunked_etl(artifacts, chunk_size=1000): for i in range(0, len(artifacts), chunk_size): chunk = artifacts[i:i+chunk_size] etl.run_pipeline(chunk)
undefined
def chunked_etl(artifacts, chunk_size=1000): for i in range(0, len(artifacts), chunk_size): chunk = artifacts[i:i+chunk_size] etl.run_pipeline(chunk)
undefined

Duplicate Records

重复记录

The ETL uses
INSERT IGNORE
to prevent duplicates. For updates, use:
sql
INSERT INTO table (columns) VALUES (values)
ON DUPLICATE KEY UPDATE column = VALUES(column)
ETL使用
INSERT IGNORE
来避免重复。如需更新,使用:
sql
INSERT INTO table (columns) VALUES (values)
ON DUPLICATE KEY UPDATE column = VALUES(column)