Data Scraper Agent

Build a production-ready, AI-powered data collection agent for any public data source. Runs on a schedule, enriches results with a free LLM, stores to a database, and improves over time.
Stack: Python · Gemini Flash (free) · GitHub Actions (free) · Notion / Sheets / Supabase

When to Activate


  • User wants to scrape or monitor any public website or API
  • User says "build a bot that checks...", "monitor X for me", "collect data from..."
  • User wants to track jobs, prices, news, repos, sports scores, events, listings
  • User asks how to automate data collection without paying for hosting
  • User wants an agent that gets smarter over time based on their decisions

Core Concepts


The Three Layers


Every data scraper agent has three layers:
COLLECT   →    ENRICH    →    STORE
   │              │              │
Scraper        AI (LLM)       Database
runs on a      scores,        Notion /
schedule       summarises,    Sheets /
               classifies     Supabase
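The three layers can be read as a single pass through the pipeline. A minimal sketch (`run_agent` and the stub layers here are illustrative, not part of the generated project):

```python
def run_agent(scrape, enrich, store):
    """One pass through the pipeline: COLLECT → ENRICH → STORE."""
    items = scrape()        # COLLECT: pull raw items from the source
    items = enrich(items)   # ENRICH: score/summarise each item with an LLM
    return store(items)     # STORE: persist and report how many rows landed

# Wiring it up with stub layers:
stored = run_agent(
    scrape=lambda: [{"name": "item", "url": "https://example.com"}],
    enrich=lambda items: [{**i, "ai_score": 50} for i in items],
    store=lambda items: len(items),
)
# stored == 1
```

Each later step fills in one of these three slots with a real implementation.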

Free Stack


| Layer | Tool | Why |
| --- | --- | --- |
| Scraping | `requests` + `BeautifulSoup` | No cost, covers 80% of public sites |
| JS-rendered sites | `playwright` (free) | When HTML scraping fails |
| AI enrichment | Gemini Flash via REST API | 500 req/day, 1M tokens/day — free |
| Storage | Notion API | Free tier, great UI for review |
| Schedule | GitHub Actions cron | Free for public repos |
| Learning | JSON feedback file in repo | Zero infra, persists in git |

AI Model Fallback Chain


Build agents that fall back automatically across Gemini models when a quota is exhausted:
gemini-2.0-flash-lite (30 RPM) →
gemini-2.0-flash (15 RPM) →
gemini-2.5-flash (10 RPM) →
gemini-flash-lite-latest (fallback)
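In code, the chain is just an ordered list walked on failure. A sketch (the full client in Step 4 adds rate limiting and response parsing; `call` here stands in for the HTTP request):

```python
MODELS = [
    "gemini-2.0-flash-lite",
    "gemini-2.0-flash",
    "gemini-2.5-flash",
    "gemini-flash-lite-latest",
]

def call_with_fallback(call, models=MODELS):
    """Try each model in order; move to the next on quota exhaustion (429)."""
    for model in models:
        status, body = call(model)
        if status == 200:
            return body
        if status == 429:   # quota exhausted: fall back to the next model
            continue
        break               # any other error: give up
    return None

# Stub transport: the first model is over quota, the second succeeds.
responses = {"gemini-2.0-flash-lite": (429, None), "gemini-2.0-flash": (200, "ok")}
result = call_with_fallback(lambda m: responses.get(m, (500, None)))
# result == "ok"
```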

Batch API Calls for Efficiency


Never call the LLM once per item. Always batch:

```python
# BAD: 33 API calls for 33 items
for item in items:
    result = call_ai(item)    # 33 calls → hits the rate limit

# GOOD: 7 API calls for 33 items (batch size 5)
for batch in chunks(items, size=5):
    results = call_ai(batch)  # 7 calls → stays within the free tier
```
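The `chunks` helper used here is not in the standard library; a minimal version:

```python
def chunks(items, size):
    """Yield successive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

batches = list(chunks(list(range(33)), size=5))
# 33 items → 7 batches, the last holding the remaining 3
```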

---

Workflow


Step 1: Understand the Goal


Ask the user:
  1. What to collect: "What data source? URL / API / RSS / public endpoint?"
  2. What to extract: "What fields matter? Title, price, URL, date, score?"
  3. How to store: "Where should results go? Notion, Google Sheets, Supabase, or local file?"
  4. How to enrich: "Do you want AI to score, summarise, classify, or match each item?"
  5. Frequency: "How often should it run? Every hour, daily, weekly?"
Common examples to prompt:
  • Job boards → score relevance to resume
  • Product prices → alert on drops
  • GitHub repos → summarise new releases
  • News feeds → classify by topic + sentiment
  • Sports results → extract stats to tracker
  • Events calendar → filter by interest


Step 2: Design the Agent Architecture


Generate this directory structure for the user:
my-agent/
├── config.yaml              # User customises this (keywords, filters, preferences)
├── profile/
│   └── context.md           # User context the AI uses (resume, interests, criteria)
├── scraper/
│   ├── __init__.py
│   ├── main.py              # Orchestrator: scrape → enrich → store
│   ├── filters.py           # Rule-based pre-filter (fast, before AI)
│   └── sources/
│       ├── __init__.py
│       └── source_name.py   # One file per data source
├── ai/
│   ├── __init__.py
│   ├── client.py            # Gemini REST client with model fallback
│   ├── pipeline.py          # Batch AI analysis
│   ├── jd_fetcher.py        # Fetch full content from URLs (optional)
│   └── memory.py            # Learn from user feedback
├── storage/
│   ├── __init__.py
│   └── notion_sync.py       # Or sheets_sync.py / supabase_sync.py
├── data/
│   └── feedback.json        # User decision history (auto-updated)
├── .env.example
├── setup.py                 # One-time DB/schema creation
├── enrich_existing.py       # Backfill AI scores on old rows
├── requirements.txt
└── .github/
    └── workflows/
        └── scraper.yml      # GitHub Actions schedule

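The `filters.py` pre-filter referenced in the tree is not shown elsewhere; a minimal sketch. In the real project the keyword lists would be loaded from the `filters` block of `config.yaml` (Step 10); the values hard-coded here are illustrative:

```python
# scraper/filters.py: rule-based pre-filter, run before any AI call
REQUIRED_KEYWORDS: list[str] = ["python", "remote"]  # illustrative; [] disables the check
BLOCKED_KEYWORDS: list[str] = ["unpaid"]

def is_relevant(text: str) -> bool:
    """Keep an item only if it passes the cheap keyword rules."""
    lower = text.lower()
    if any(b.lower() in lower for b in BLOCKED_KEYWORDS):
        return False
    if REQUIRED_KEYWORDS and not any(r.lower() in lower for r in REQUIRED_KEYWORDS):
        return False
    return True

# is_relevant("Remote Python Developer")  → True
# is_relevant("Unpaid internship")        → False
```

Running this before the AI layer keeps obviously irrelevant items out of the token budget.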

Step 3: Build the Scraper Source


Template for any data source:

scraper/sources/my_source.py

```python
"""
[Source Name] — scrapes [what] from [where].
Method: [REST API / HTML scraping / RSS feed]
"""
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone

from scraper.filters import is_relevant

HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; research-bot/1.0)",
}


def fetch() -> list[dict]:
    """
    Returns a list of items with a consistent schema.
    Each item must have at minimum: name, url, date_found.
    """
    results = []

    # ---- REST API source ----
    resp = requests.get("https://api.example.com/items", headers=HEADERS, timeout=15)
    if resp.status_code == 200:
        for item in resp.json().get("results", []):
            if not is_relevant(item.get("title", "")):
                continue
            results.append(_normalise(item))

    return results


def _normalise(raw: dict) -> dict:
    """Convert raw API/HTML data to the standard schema."""
    return {
        "name": raw.get("title", ""),
        "url": raw.get("link", ""),
        "source": "MySource",
        "date_found": datetime.now(timezone.utc).date().isoformat(),
        # add domain-specific fields here
    }
```

**HTML scraping pattern:**

```python
soup = BeautifulSoup(resp.text, "lxml")
for card in soup.select("[class*='listing']"):
    title = card.select_one("h2, h3").get_text(strip=True)
    link = card.select_one("a")["href"]
    if not link.startswith("http"):
        link = f"https://example.com{link}"
```

**RSS feed pattern:**

```python
import xml.etree.ElementTree as ET

root = ET.fromstring(resp.text)
for item in root.findall(".//item"):
    title = item.findtext("title", "")
    link = item.findtext("link", "")
```

""" [数据源名称] — 从[来源地址]爬取[内容类型]。 方法:[REST API / HTML爬取 / RSS订阅] """ import requests from bs4 import BeautifulSoup from datetime import datetime, timezone from scraper.filters import is_relevant
HEADERS = { "User-Agent": "Mozilla/5.0 (compatible; research-bot/1.0)", }
def fetch() -> list[dict]: """ 返回符合统一格式的条目列表。 每个条目至少包含:name, url, date_found字段。 """ results = []
# ---- REST API数据源 ----
resp = requests.get("https://api.example.com/items", headers=HEADERS, timeout=15)
if resp.status_code == 200:
    for item in resp.json().get("results", []):
        if not is_relevant(item.get("title", "")):
            continue
        results.append(_normalise(item))

return results
def _normalise(raw: dict) -> dict: """将原始API/HTML数据转换为标准格式。""" return { "name": raw.get("title", ""), "url": raw.get("link", ""), "source": "MySource", "date_found": datetime.now(timezone.utc).date().isoformat(), # 在此添加领域特定字段 }

**HTML爬取模式:**
```python
soup = BeautifulSoup(resp.text, "lxml")
for card in soup.select("[class*='listing']"):
    title = card.select_one("h2, h3").get_text(strip=True)
    link = card.select_one("a")["href"]
    if not link.startswith("http"):
        link = f"https://example.com{link}"
RSS订阅模式:
python
import xml.etree.ElementTree as ET
root = ET.fromstring(resp.text)
for item in root.findall(".//item"):
    title = item.findtext("title", "")
    link = item.findtext("link", "")

Step 4: Build the Gemini AI Client


ai/client.py

```python
import os
import json
import time

import requests

_last_call = 0.0

MODEL_FALLBACK = [
    "gemini-2.0-flash-lite",
    "gemini-2.0-flash",
    "gemini-2.5-flash",
    "gemini-flash-lite-latest",
]


def generate(prompt: str, model: str = "", rate_limit: float = 7.0) -> dict:
    """Call Gemini with auto-fallback on 429. Returns parsed JSON or {}."""
    global _last_call

    api_key = os.environ.get("GEMINI_API_KEY", "")
    if not api_key:
        return {}

    # Simple client-side rate limiting between calls
    elapsed = time.time() - _last_call
    if elapsed < rate_limit:
        time.sleep(rate_limit - elapsed)

    models = [model] + [m for m in MODEL_FALLBACK if m != model] if model else MODEL_FALLBACK
    _last_call = time.time()

    for m in models:
        url = f"https://generativelanguage.googleapis.com/v1beta/models/{m}:generateContent?key={api_key}"
        payload = {
            "contents": [{"parts": [{"text": prompt}]}],
            "generationConfig": {
                "responseMimeType": "application/json",
                "temperature": 0.3,
                "maxOutputTokens": 2048,
            },
        }
        try:
            resp = requests.post(url, json=payload, timeout=30)
            if resp.status_code == 200:
                return _parse(resp)
            if resp.status_code in (429, 404):
                time.sleep(1)
                continue
            return {}
        except requests.RequestException:
            return {}

    return {}


def _parse(resp) -> dict:
    try:
        text = (
            resp.json()
            .get("candidates", [{}])[0]
            .get("content", {})
            .get("parts", [{}])[0]
            .get("text", "")
            .strip()
        )
        # Strip a markdown code fence if the model wrapped its JSON in one
        if text.startswith("```"):
            text = text.split("\n", 1)[-1].rsplit("```", 1)[0]
        return json.loads(text)
    except (json.JSONDecodeError, KeyError):
        return {}
```

---

Step 5: Build the AI Pipeline (Batch)


ai/pipeline.py

```python
import json
from pathlib import Path

import yaml

from ai.client import generate


def analyse_batch(items: list[dict], context: str = "", preference_prompt: str = "") -> list[dict]:
    """Analyse items in batches. Returns items enriched with AI fields."""
    config = yaml.safe_load((Path(__file__).parent.parent / "config.yaml").read_text())
    model = config.get("ai", {}).get("model", "gemini-2.5-flash")
    rate_limit = config.get("ai", {}).get("rate_limit_seconds", 7.0)
    min_score = config.get("ai", {}).get("min_score", 0)
    batch_size = config.get("ai", {}).get("batch_size", 5)

    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    print(f"  [AI] {len(items)} items → {len(batches)} API calls")

    enriched = []
    for i, batch in enumerate(batches):
        print(f"  [AI] Batch {i + 1}/{len(batches)}...")
        prompt = _build_prompt(batch, context, preference_prompt, config)
        result = generate(prompt, model=model, rate_limit=rate_limit)

        analyses = result.get("analyses", [])
        for j, item in enumerate(batch):
            ai = analyses[j] if j < len(analyses) else {}
            if ai:
                score = max(0, min(100, int(ai.get("score", 0))))
                if min_score and score < min_score:
                    continue
                enriched.append({
                    **item,
                    "ai_score": score,
                    "ai_summary": ai.get("summary", ""),
                    "ai_notes": ai.get("notes", ""),
                })
            else:
                enriched.append(item)

    return enriched


def _build_prompt(batch, context, preference_prompt, config):
    priorities = config.get("priorities", [])
    items_text = "\n\n".join(
        f"Item {i + 1}: {json.dumps({k: v for k, v in item.items() if not k.startswith('_')})}"
        for i, item in enumerate(batch)
    )
    return f"""Analyse these {len(batch)} items and return a JSON object.

## Items
{items_text}

## User Context
{context[:800] if context else "Not provided"}

## User Priorities
{chr(10).join(f"- {p}" for p in priorities)}
{preference_prompt}

## Instructions
Return: {{"analyses": [{{"score": <0-100>, "summary": "<2 sentences>", "notes": "<why this matches or doesn't>"}} for each item in order]}}
Be concise. Score 90+ = excellent match, 70-89 = good, 50-69 = ok, <50 = weak."""
```

---

Step 6: Build the Feedback Learning System


ai/memory.py

```python
"""Learn from user decisions to improve future scoring."""
import json
from pathlib import Path

FEEDBACK_PATH = Path(__file__).parent.parent / "data" / "feedback.json"


def load_feedback() -> dict:
    if FEEDBACK_PATH.exists():
        try:
            return json.loads(FEEDBACK_PATH.read_text())
        except (json.JSONDecodeError, OSError):
            pass
    return {"positive": [], "negative": []}


def save_feedback(fb: dict):
    FEEDBACK_PATH.parent.mkdir(parents=True, exist_ok=True)
    FEEDBACK_PATH.write_text(json.dumps(fb, indent=2))


def build_preference_prompt(feedback: dict, max_examples: int = 15) -> str:
    """Convert feedback history into a prompt bias section."""
    lines = []
    if feedback.get("positive"):
        lines.append("# Items the user LIKED (positive signal):")
        for e in feedback["positive"][-max_examples:]:
            lines.append(f"- {e}")
    if feedback.get("negative"):
        lines.append("\n# Items the user SKIPPED/REJECTED (negative signal):")
        for e in feedback["negative"][-max_examples:]:
            lines.append(f"- {e}")
    if lines:
        lines.append("\nUse these patterns to bias scoring on new items.")
    return "\n".join(lines)
```

**Integration with your storage layer:** after each run, query your DB for items with positive/negative status and call `save_feedback()` with the extracted patterns.
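That sync step can be sketched storage-agnostically. `update_feedback` is a hypothetical helper (the status names match the Step 10 defaults), and its result is what you would pass to `save_feedback()`:

```python
POSITIVE = {"Saved", "Applied", "Interested"}
NEGATIVE = {"Skip", "Rejected", "Not relevant"}

def update_feedback(fb: dict, rows: list[dict]) -> dict:
    """Fold storage rows (dicts with a name and review status) into feedback.json's shape."""
    for row in rows:
        status, name = row.get("status", ""), row.get("name", "")
        if status in POSITIVE and name not in fb["positive"]:
            fb["positive"].append(name)
        elif status in NEGATIVE and name not in fb["negative"]:
            fb["negative"].append(name)
    return fb

fb = update_feedback(
    {"positive": [], "negative": []},
    [{"name": "Job A", "status": "Saved"}, {"name": "Job B", "status": "Skip"}],
)
# fb == {"positive": ["Job A"], "negative": ["Job B"]}
```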

---
"""从用户决策中学习以优化未来的评分逻辑。""" import json from pathlib import Path
FEEDBACK_PATH = Path(file).parent.parent / "data" / "feedback.json"
def load_feedback() -> dict: if FEEDBACK_PATH.exists(): try: return json.loads(FEEDBACK_PATH.read_text()) except (json.JSONDecodeError, OSError): pass return {"positive": [], "negative": []}
def save_feedback(fb: dict): FEEDBACK_PATH.parent.mkdir(parents=True, exist_ok=True) FEEDBACK_PATH.write_text(json.dumps(fb, indent=2))
def build_preference_prompt(feedback: dict, max_examples: int = 15) -> str: """将反馈历史转换为用于调整评分的提示词片段。""" lines = [] if feedback.get("positive"): lines.append("# 用户偏好的条目(正向信号):") for e in feedback["positive"][-max_examples:]: lines.append(f"- {e}") if feedback.get("negative"): lines.append("\n# 用户跳过/拒绝的条目(负向信号):") for e in feedback["negative"][-max_examples:]: lines.append(f"- {e}") if lines: lines.append("\n请根据这些模式调整对新条目的评分。") return "\n".join(lines)

**与存储层集成:** 每次运行后,查询数据库中标记为正向/负向状态的条目,提取模式后调用`save_feedback()`保存。

---

Step 7: Build Storage (Notion example)


storage/notion_sync.py

```python
import os

from notion_client import Client
from notion_client.errors import APIResponseError

_client = None


def get_client():
    global _client
    if _client is None:
        _client = Client(auth=os.environ["NOTION_TOKEN"])
    return _client


def get_existing_urls(db_id: str) -> set[str]:
    """Fetch all URLs already stored — used for deduplication."""
    client, seen, cursor = get_client(), set(), None
    while True:
        resp = client.databases.query(
            database_id=db_id,
            page_size=100,
            **({"start_cursor": cursor} if cursor else {}),
        )
        for page in resp["results"]:
            url = page["properties"].get("URL", {}).get("url", "")
            if url:
                seen.add(url)
        if not resp["has_more"]:
            break
        cursor = resp["next_cursor"]
    return seen


def push_item(db_id: str, item: dict) -> bool:
    """Push one item to Notion. Returns True on success."""
    props = {
        "Name": {"title": [{"text": {"content": item.get("name", "")[:100]}}]},
        "URL": {"url": item.get("url")},
        "Source": {"select": {"name": item.get("source", "Unknown")}},
        "Date Found": {"date": {"start": item.get("date_found")}},
        "Status": {"select": {"name": "New"}},
    }
    # AI fields
    if item.get("ai_score") is not None:
        props["AI Score"] = {"number": item["ai_score"]}
    if item.get("ai_summary"):
        props["Summary"] = {"rich_text": [{"text": {"content": item["ai_summary"][:2000]}}]}
    if item.get("ai_notes"):
        props["Notes"] = {"rich_text": [{"text": {"content": item["ai_notes"][:2000]}}]}

    try:
        get_client().pages.create(parent={"database_id": db_id}, properties=props)
        return True
    except APIResponseError as e:
        print(f"[notion] Push failed: {e}")
        return False


def sync(db_id: str, items: list[dict]) -> tuple[int, int]:
    existing = get_existing_urls(db_id)
    added = skipped = 0
    for item in items:
        if item.get("url") in existing:
            skipped += 1
            continue
        if push_item(db_id, item):
            added += 1
            existing.add(item["url"])
        else:
            skipped += 1
    return added, skipped
```

---

Step 8: Orchestrate in main.py


scraper/main.py

```python
import os
import sys
from pathlib import Path

import yaml
from dotenv import load_dotenv

load_dotenv()

from scraper.sources import my_source  # add your sources

# NOTE: This example uses Notion. If storage.provider is "sheets" or "supabase",
# replace this import with storage.sheets_sync or storage.supabase_sync and
# update the env var and sync() call accordingly.
from storage.notion_sync import sync

SOURCES = [
    ("My Source", my_source.fetch),
]


def ai_enabled():
    return bool(os.environ.get("GEMINI_API_KEY"))


def main():
    config = yaml.safe_load((Path(__file__).parent.parent / "config.yaml").read_text())
    provider = config.get("storage", {}).get("provider", "notion")

    # Resolve the storage target identifier from env based on provider
    if provider == "notion":
        db_id = os.environ.get("NOTION_DATABASE_ID")
        if not db_id:
            print("ERROR: NOTION_DATABASE_ID not set")
            sys.exit(1)
    else:
        # Extend here for sheets (SHEET_ID) or supabase (SUPABASE_TABLE) etc.
        print(f"ERROR: provider '{provider}' not yet wired in main.py")
        sys.exit(1)

    all_items = []
    for name, fetch_fn in SOURCES:
        try:
            items = fetch_fn()
            print(f"[{name}] {len(items)} items")
            all_items.extend(items)
        except Exception as e:
            print(f"[{name}] FAILED: {e}")

    # Deduplicate by URL
    seen, deduped = set(), []
    for item in all_items:
        if (url := item.get("url", "")) and url not in seen:
            seen.add(url)
            deduped.append(item)

    print(f"Unique items: {len(deduped)}")

    if ai_enabled() and deduped:
        from ai.memory import load_feedback, build_preference_prompt
        from ai.pipeline import analyse_batch

        # load_feedback() reads data/feedback.json written by your feedback sync
        # script. To keep it current, implement a separate feedback_sync.py that
        # queries your storage provider for items with positive/negative statuses
        # and calls save_feedback().
        feedback = load_feedback()
        preference = build_preference_prompt(feedback)
        context_path = Path(__file__).parent.parent / "profile" / "context.md"
        context = context_path.read_text() if context_path.exists() else ""
        deduped = analyse_batch(deduped, context=context, preference_prompt=preference)
    else:
        print("[AI] Skipped — GEMINI_API_KEY not set")

    added, skipped = sync(db_id, deduped)
    print(f"Done — {added} new, {skipped} existing")


if __name__ == "__main__":
    main()
```

---

Step 9: GitHub Actions Workflow


.github/workflows/scraper.yml

```yaml
name: Data Scraper Agent

on:
  schedule:
    - cron: "0 */3 * * *"   # every 3 hours — adjust to your needs
  workflow_dispatch:        # allow manual trigger

permissions:
  contents: write           # required for the feedback-history commit step

jobs:
  scrape:
    runs-on: ubuntu-latest
    timeout-minutes: 20

    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - run: pip install -r requirements.txt

      # Uncomment if Playwright is enabled in requirements.txt
      # - name: Install Playwright browsers
      #   run: python -m playwright install chromium --with-deps

      - name: Run agent
        env:
          NOTION_TOKEN: ${{ secrets.NOTION_TOKEN }}
          NOTION_DATABASE_ID: ${{ secrets.NOTION_DATABASE_ID }}
          GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
        run: python -m scraper.main

      - name: Commit feedback history
        run: |
          git config user.name "github-actions[bot]"
          git config user.email "github-actions[bot]@users.noreply.github.com"
          git add data/feedback.json || true
          git diff --cached --quiet || git commit -m "chore: update feedback history"
          git push
```

---

Step 10: config.yaml Template


```yaml
# Customise this file — no code changes needed

# What to collect (pre-filter before AI)
filters:
  required_keywords: []   # item must contain at least one
  blocked_keywords: []    # item must not contain any

# Your priorities — AI uses these for scoring
priorities:
  - "example priority 1"
  - "example priority 2"

# Storage
storage:
  provider: "notion"      # notion | sheets | supabase | sqlite

# Feedback learning
feedback:
  positive_statuses: ["Saved", "Applied", "Interested"]
  negative_statuses: ["Skip", "Rejected", "Not relevant"]

# AI settings
ai:
  enabled: true
  model: "gemini-2.5-flash"
  min_score: 0            # filter out items below this score
  rate_limit_seconds: 7   # seconds between API calls
  batch_size: 5           # items per API call
```
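With these defaults, the free-tier arithmetic works out as follows (a quick sanity check against the 500 requests/day limit quoted in the stack table):

```python
batch_size = 5
requests_per_day = 500
rate_limit_seconds = 7

max_items_per_day = batch_size * requests_per_day  # 2500 enriched items/day
requests_per_minute = 60 / rate_limit_seconds      # about 8.6 RPM, under a 10 RPM cap
```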

---

Common Scraping Patterns


Pattern 1: REST API (easiest)


```python
resp = requests.get(url, params={"q": query}, headers=HEADERS, timeout=15)
items = resp.json().get("results", [])
```

Pattern 2: HTML Scraping

模式2:HTML爬取

python
soup = BeautifulSoup(resp.text, "lxml")
for card in soup.select(".listing-card"):
    title = card.select_one("h2").get_text(strip=True)
    href = card.select_one("a")["href"]
python
soup = BeautifulSoup(resp.text, "lxml")
for card in soup.select(".listing-card"):
    title = card.select_one("h2").get_text(strip=True)
    href = card.select_one("a")["href"]

Pattern 3: RSS Feed

模式3:RSS订阅

python
import xml.etree.ElementTree as ET
root = ET.fromstring(resp.text)
for item in root.findall(".//item"):
    title = item.findtext("title", "")
    link = item.findtext("link", "")
    pub_date = item.findtext("pubDate", "")
python
import xml.etree.ElementTree as ET
root = ET.fromstring(resp.text)
for item in root.findall(".//item"):
    title = item.findtext("title", "")
    link = item.findtext("link", "")
    pub_date = item.findtext("pubDate", "")

Pattern 4: Paginated API

模式4:分页API

python
page = 1
while True:
    resp = requests.get(url, params={"page": page, "limit": 50}, timeout=15)
    data = resp.json()
    items = data.get("results", [])
    if not items:
        break
    for item in items:
        results.append(_normalise(item))
    if not data.get("has_more"):
        break
    page += 1
python
page = 1
while True:
    resp = requests.get(url, params={"page": page, "limit": 50}, timeout=15)
    data = resp.json()
    items = data.get("results", [])
    if not items:
        break
    for item in items:
        results.append(_normalise(item))
    if not data.get("has_more"):
        break
    page += 1

Pattern 5: JS-Rendered Pages (Playwright)

模式5:JS渲染页面(Playwright)

python
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto(url)
    page.wait_for_selector(".listing")
    html = page.content()
    browser.close()

soup = BeautifulSoup(html, "lxml")

python
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto(url)
    page.wait_for_selector(".listing")
    html = page.content()
    browser.close()

soup = BeautifulSoup(html, "lxml")

Anti-Patterns to Avoid

需避免的反模式

| Anti-pattern | Problem | Fix |
| --- | --- | --- |
| One LLM call per item | Hits rate limits instantly | Batch 5 items per call |
| Hardcoded keywords in code | Not reusable | Move all config to `config.yaml` |
| Scraping without rate limit | IP ban | Add `time.sleep(1)` between requests |
| Storing secrets in code | Security risk | Always use `.env` + GitHub Secrets |
| No deduplication | Duplicate rows pile up | Always check URL before pushing |
| Ignoring `robots.txt` | Legal/ethical risk | Respect crawl rules; use public APIs when available |
| JS-rendered sites with `requests` | Empty response | Use Playwright or look for the underlying API |
| `maxOutputTokens` too low | Truncated JSON, parse error | Use 2048+ for batch responses |

| 反模式 | 问题 | 解决方案 |
| --- | --- | --- |
| 为每个条目单独调用LLM | 立即触发速率限制 | 每次调用批量处理5个条目 |
| 关键词硬编码在代码中 | 无法复用 | 将所有配置移至 `config.yaml` |
| 爬取时不设置速率限制 | IP被封禁 | 在请求之间添加 `time.sleep(1)` |
| 密钥存储在代码中 | 安全风险 | 始终使用 `.env` + GitHub Secrets |
| 不做去重处理 | 重复数据堆积 | 推送前始终检查URL是否已存在 |
| 忽略 `robots.txt` | 法律/伦理风险 | 遵守爬取规则;尽可能使用公开API |
| 用 `requests` 处理JS渲染网站 | 返回空响应 | 使用Playwright或寻找底层API |
| `maxOutputTokens` 设置过低 | JSON被截断,解析错误 | 批量响应设置为2048及以上 |
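The deduplication fix from the table can be as simple as a seen-URL set checked before every storage push. A sketch under assumptions: in the real agent `existing_urls` would be fetched from Notion/Sheets/Supabase at the start of each run, and the helper name is hypothetical.

```python
def dedupe_by_url(new_items, existing_urls):
    # Normalise trailing slashes on both sides so the same page
    # never gets stored twice under two spellings.
    seen = {u.rstrip("/") for u in existing_urls}
    fresh = []
    for item in new_items:
        url = item.get("url", "").rstrip("/")
        if not url or url in seen:
            continue
        seen.add(url)  # also catches duplicates inside the same batch
        fresh.append(item)
    return fresh

existing = {"https://example.com/job/1"}
batch = [
    {"url": "https://example.com/job/1"},   # already stored -> dropped
    {"url": "https://example.com/job/2/"},  # new (trailing slash) -> kept
    {"url": "https://example.com/job/2"},   # same page again -> dropped
]
print(dedupe_by_url(batch, existing))  # keeps only the job/2 item
```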

Free Tier Limits Reference

免费层级限制参考

| Service | Free Limit | Typical Usage |
| --- | --- | --- |
| Gemini Flash Lite | 30 RPM, 1500 RPD | ~56 req/day at 3-hr intervals |
| Gemini 2.0 Flash | 15 RPM, 1500 RPD | Good fallback |
| Gemini 2.5 Flash | 10 RPM, 500 RPD | Use sparingly |
| GitHub Actions | Unlimited (public repos) | ~20 min/day |
| Notion API | Unlimited | ~200 writes/day |
| Supabase | 500MB DB, 2GB transfer | Fine for most agents |
| Google Sheets API | 300 req/min | Works for small agents |

| 服务 | 免费额度 | 典型使用场景 |
| --- | --- | --- |
| Gemini Flash Lite | 30次/分钟,1500次/天 | 每3小时运行一次,每日约56次请求 |
| Gemini 2.0 Flash | 15次/分钟,1500次/天 | 作为降级备选 |
| Gemini 2.5 Flash | 10次/分钟,500次/天 | 谨慎使用 |
| GitHub Actions | 无限制(公开仓库) | 每日约20分钟运行时间 |
| Notion API | 无限制 | 每日约200次写入操作 |
| Supabase | 500MB数据库,2GB流量 | 适用于大多数Agent |
| Google Sheets API | 300次/分钟 | 适用于小型Agent |
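Staying inside these ceilings is mostly a matter of chunking items into batches and pacing the calls. A hedged sketch wired to the `batch_size` and `rate_limit_seconds` config values; `call_gemini` is a hypothetical stand-in for the real REST request, not an existing function.

```python
import time

def batched(items, size):
    # Yield successive chunks of `size` items (the last may be shorter).
    for i in range(0, len(items), size):
        yield items[i:i + size]

def enrich_all(items, call_gemini, batch_size=5, rate_limit_seconds=7):
    # One API call per batch of 5; sleeping between calls keeps a
    # 3-hourly run far below the RPM and daily-request ceilings above.
    scored = []
    for n, batch in enumerate(batched(items, batch_size)):
        if n:  # no need to sleep before the first call
            time.sleep(rate_limit_seconds)
        scored.extend(call_gemini(batch))
    return scored
```

With these defaults, 12 scraped items cost 3 API calls (5 + 5 + 2), not 12.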

Requirements Template

依赖包模板

requests==2.31.0
beautifulsoup4==4.12.3
lxml==5.1.0
python-dotenv==1.0.1
pyyaml==6.0.2
notion-client==2.2.1   # if using Notion
requests==2.31.0
beautifulsoup4==4.12.3
lxml==5.1.0
python-dotenv==1.0.1
pyyaml==6.0.2
notion-client==2.2.1   # 如果使用Notion

# playwright==1.40.0   # uncomment for JS-rendered sites

# playwright==1.40.0   # 处理JS渲染网站时取消注释


---

Quality Checklist

质量检查清单

Before marking the agent complete:
  • config.yaml controls all user-facing settings — no hardcoded values
  • profile/context.md holds user-specific context for AI matching
  • Deduplication by URL before every storage push
  • Gemini client has model fallback chain (4 models)
  • Batch size ≤ 5 items per API call
  • maxOutputTokens ≥ 2048
  • .env is in .gitignore
  • .env.example provided for onboarding
  • setup.py creates DB schema on first run
  • enrich_existing.py backfills AI scores on old rows
  • GitHub Actions workflow commits feedback.json after each run
  • README covers: setup in < 5 minutes, required secrets, customisation

在标记Agent完成前,请确认:
  • 所有用户可见的设置均由 config.yaml 控制 — 无硬编码值
  • profile/context.md 存储AI匹配所需的用户特定上下文
  • 每次存储推送前都会按URL去重
  • Gemini客户端具备4个模型的降级链
  • 每次API调用的批量大小≤5个条目
  • maxOutputTokens 设置≥2048
  • .env 已添加至 .gitignore
  • 提供 .env.example 用于快速上手
  • setup.py 可在首次运行时创建数据库/表结构
  • enrich_existing.py 可为旧数据批量补充AI评分
  • GitHub Actions工作流在每次运行后会提交 feedback.json
  • README文档包含:5分钟快速搭建、所需密钥、自定义方法
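The "model fallback chain" item on the checklist can be an ordered try loop. A sketch under assumptions: `call_model` stands in for the actual Gemini REST request, and the four model IDs below are assumptions mirroring the free-tier table; substitute whatever models your API key can reach, strongest first.

```python
# Assumed model IDs — swap in the four models your key has access to.
FALLBACK_MODELS = [
    "gemini-2.5-flash",
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "gemini-1.5-flash",
]

def generate_with_fallback(prompt, call_model, models=FALLBACK_MODELS):
    # Try each model in order; a rate-limit or quota error just moves
    # us down the chain. Only if every model fails is the last error
    # re-raised to the caller.
    last_err = None
    for model in models:
        try:
            return call_model(model, prompt)
        except Exception as err:  # real code: catch the specific HTTP error
            last_err = err
    raise last_err
```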

Real-World Examples

实际应用示例

"Build me an agent that monitors Hacker News for AI startup funding news"
"Scrape product prices from 3 e-commerce sites and alert when they drop"
"Track new GitHub repos tagged with 'llm' or 'agents' — summarise each one"
"Collect Chief of Staff job listings from LinkedIn and Cutshort into Notion"
"Monitor a subreddit for posts mentioning my company — classify sentiment"
"Scrape new academic papers from arXiv on a topic I care about daily"
"Track sports fixture results and keep a running table in Google Sheets"
"Build a real estate listing watcher — alert on new properties under ₹1 Cr"

"帮我做一个监控Hacker News上AI初创公司融资新闻的Agent"
"从3个电商网站爬取产品价格,价格下降时发出提醒"
"追踪GitHub上标记为'llm'或'agents'的新仓库 — 为每个仓库生成摘要"
"从LinkedIn和Cutshort收集幕僚长(Chief of Staff)职位列表并存储至Notion"
"监控某个Reddit版块中提及我公司的帖子 — 进行情感分类"
"每日从arXiv爬取我关注主题的新学术论文"
"追踪体育赛事结果并在Google Sheets中维护实时积分榜"
"构建一个房产列表监控器 — 当出现1000万卢比以下的新房产时发出提醒"

Reference Implementation

参考实现

A complete working agent built with this exact architecture would scrape 4+ sources, batch Gemini calls, learn from Applied/Rejected decisions stored in Notion, and run 100% free on GitHub Actions. Follow Steps 1–9 above to build your own.
一个基于此架构构建的完整可用Agent可爬取4个以上数据源,批量调用Gemini API,从Notion中标记为已申请/已拒绝的决策中学习,并100%免费在GitHub Actions上运行。按照上述步骤1-9即可构建你自己的Agent。
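The "learns from decisions" loop described above can be sketched as: read statuses back from storage, bucket them with the `feedback` section of config.yaml, and append the resulting word counts as a hint to the next run's scoring prompt. A minimal sketch; the row fields (`title`, `status`) and the helper name are illustrative.

```python
from collections import Counter

# These status buckets mirror the `feedback:` section of config.yaml.
POSITIVE = {"Saved", "Applied", "Interested"}
NEGATIVE = {"Skip", "Rejected", "Not relevant"}

def feedback_summary(rows):
    # Count title words by outcome; the top words become a short hint
    # appended to the next run's scoring prompt.
    pos, neg = Counter(), Counter()
    for row in rows:
        words = row["title"].lower().split()
        if row["status"] in POSITIVE:
            pos.update(words)
        elif row["status"] in NEGATIVE:
            neg.update(words)
    return {"liked": pos.most_common(5), "disliked": neg.most_common(5)}

rows = [
    {"title": "Remote Python Engineer", "status": "Applied"},
    {"title": "Python Intern", "status": "Rejected"},
]
print(feedback_summary(rows))
```

Because the summary is persisted as feedback.json and committed by the workflow, the hint survives between scheduled runs with zero extra infrastructure.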