data-scraper-agent
Data Scraper Agent
Build a production-ready, AI-powered data collection agent for any public data source.
Runs on a schedule, enriches results with a free LLM, stores to a database, and improves over time.
Stack: Python · Gemini Flash (free) · GitHub Actions (free) · Notion / Sheets / Supabase
When to Activate
- User wants to scrape or monitor any public website or API
- User says "build a bot that checks...", "monitor X for me", "collect data from..."
- User wants to track jobs, prices, news, repos, sports scores, events, listings
- User asks how to automate data collection without paying for hosting
- User wants an agent that gets smarter over time based on their decisions
Core Concepts
The Three Layers
Every data scraper agent has three layers:
```
COLLECT   →   ENRICH    →    STORE
   │             │             │
Scraper       AI (LLM)      Database
runs on       scores/       Notion /
schedule      summarises    Sheets /
              & classifies  Supabase
```

Free Stack
| Layer | Tool | Why |
|---|---|---|
| Scraping | `requests` + BeautifulSoup | No cost, covers 80% of public sites |
| JS-rendered sites | Playwright | When HTML scraping fails |
| AI enrichment | Gemini Flash via REST API | 500 req/day, 1M tokens/day — free |
| Storage | Notion API | Free tier, great UI for review |
| Schedule | GitHub Actions cron | Free for public repos |
| Learning | JSON feedback file in repo | Zero infra, persists in git |
AI Model Fallback Chain
Build agents to auto-fallback across Gemini models on quota exhaustion:
gemini-2.0-flash-lite (30 RPM) →
gemini-2.0-flash (15 RPM) →
gemini-2.5-flash (10 RPM) →
gemini-flash-lite-latest (fallback)

Batch API Calls for Efficiency
Never call the LLM once per item. Always batch:
```python
# BAD: 33 API calls for 33 items
for item in items:
    result = call_ai(item)    # 33 calls → hits rate limit

# GOOD: 7 API calls for 33 items (batch size 5)
for batch in chunks(items, size=5):
    results = call_ai(batch)  # 7 calls → stays within free tier
```
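`chunks` here is a helper, not a stdlib function; a minimal generator version:

```python
def chunks(items: list, size: int = 5):
    """Yield successive fixed-size slices of a list."""
    for i in range(0, len(items), size):
        yield items[i:i + size]
```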
---
Workflow
Step 1: Understand the Goal
Ask the user:
- What to collect: "What data source? URL / API / RSS / public endpoint?"
- What to extract: "What fields matter? Title, price, URL, date, score?"
- How to store: "Where should results go? Notion, Google Sheets, Supabase, or local file?"
- How to enrich: "Do you want AI to score, summarise, classify, or match each item?"
- Frequency: "How often should it run? Every hour, daily, weekly?"
Common examples to prompt:
- Job boards → score relevance to resume
- Product prices → alert on drops
- GitHub repos → summarise new releases
- News feeds → classify by topic + sentiment
- Sports results → extract stats to tracker
- Events calendar → filter by interest
Step 2: Design the Agent Architecture
Generate this directory structure for the user:
my-agent/
├── config.yaml # User customises this (keywords, filters, preferences)
├── profile/
│ └── context.md # User context the AI uses (resume, interests, criteria)
├── scraper/
│ ├── __init__.py
│ ├── main.py # Orchestrator: scrape → enrich → store
│ ├── filters.py # Rule-based pre-filter (fast, before AI)
│ └── sources/
│ ├── __init__.py
│ └── source_name.py # One file per data source
├── ai/
│ ├── __init__.py
│ ├── client.py # Gemini REST client with model fallback
│ ├── pipeline.py # Batch AI analysis
│ ├── jd_fetcher.py # Fetch full content from URLs (optional)
│ └── memory.py # Learn from user feedback
├── storage/
│ ├── __init__.py
│ └── notion_sync.py # Or sheets_sync.py / supabase_sync.py
├── data/
│ └── feedback.json # User decision history (auto-updated)
├── .env.example
├── setup.py # One-time DB/schema creation
├── enrich_existing.py # Backfill AI scores on old rows
├── requirements.txt
└── .github/
└── workflows/
        └── scraper.yml # GitHub Actions schedule

Step 3: Build the Scraper Source
Template for any data source:
```python
# scraper/sources/my_source.py
"""
[Source Name] — scrapes [what] from [where].
Method: [REST API / HTML scraping / RSS feed]
"""
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone

from scraper.filters import is_relevant

HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; research-bot/1.0)",
}

def fetch() -> list[dict]:
    """
    Returns a list of items with a consistent schema.
    Each item must have at minimum: name, url, date_found.
    """
    results = []
    # ---- REST API source ----
    resp = requests.get("https://api.example.com/items", headers=HEADERS, timeout=15)
    if resp.status_code == 200:
        for item in resp.json().get("results", []):
            if not is_relevant(item.get("title", "")):
                continue
            results.append(_normalise(item))
    return results

def _normalise(raw: dict) -> dict:
    """Convert raw API/HTML data to the standard schema."""
    return {
        "name": raw.get("title", ""),
        "url": raw.get("link", ""),
        "source": "MySource",
        "date_found": datetime.now(timezone.utc).date().isoformat(),
        # add domain-specific fields here
    }
```
**HTML scraping pattern:**
```python
soup = BeautifulSoup(resp.text, "lxml")
for card in soup.select("[class*='listing']"):
    title = card.select_one("h2, h3").get_text(strip=True)
    link = card.select_one("a")["href"]
    if not link.startswith("http"):
        link = f"https://example.com{link}"
```

**RSS feed pattern:**
```python
import xml.etree.ElementTree as ET
root = ET.fromstring(resp.text)
for item in root.findall(".//item"):
    title = item.findtext("title", "")
    link = item.findtext("link", "")
```
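The `is_relevant` gate imported by the source template is a plain keyword filter. A minimal sketch of `scraper/filters.py`, with the module-level lists standing in for values loaded from `config.yaml`'s `filters:` section (an assumed implementation detail):

```python
# scraper/filters.py — minimal sketch; the real file would populate these
# lists from config.yaml's filters: section instead of hardcoding them.
REQUIRED_KEYWORDS: list[str] = []   # item must contain at least one (empty = allow all)
BLOCKED_KEYWORDS: list[str] = []    # item must not contain any

def is_relevant(text: str) -> bool:
    """Cheap rule-based gate run before any AI call."""
    t = text.lower()
    if any(b.lower() in t for b in BLOCKED_KEYWORDS):
        return False
    if REQUIRED_KEYWORDS and not any(r.lower() in t for r in REQUIRED_KEYWORDS):
        return False
    return True
```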
[Source Name] — scrapes [what] from [where].
Method: [REST API / HTML scraping / RSS feed]
"""
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone
from scraper.filters import is_relevant
HEADERS = {
"User-Agent": "Mozilla/5.0 (compatible; research-bot/1.0)",
}
def fetch() -> list[dict]:
"""
Returns a list of items with consistent schema.
Each item must have at minimum: name, url, date_found.
"""
results = []
# ---- REST API source ----
resp = requests.get("https://api.example.com/items", headers=HEADERS, timeout=15)
if resp.status_code == 200:
for item in resp.json().get("results", []):
if not is_relevant(item.get("title", "")):
continue
results.append(_normalise(item))
return resultsdef _normalise(raw: dict) -> dict:
"""Convert raw API/HTML data to the standard schema."""
return {
"name": raw.get("title", ""),
"url": raw.get("link", ""),
"source": "MySource",
"date_found": datetime.now(timezone.utc).date().isoformat(),
# 在此添加领域特定字段
}
**HTML采集模式:**
```python
soup = BeautifulSoup(resp.text, "lxml")
for card in soup.select("[class*='listing']"):
title = card.select_one("h2, h3").get_text(strip=True)
link = card.select_one("a")["href"]
if not link.startswith("http"):
link = f"https://example.com{link}"RSS订阅源采集模式:
python
import xml.etree.ElementTree as ET
root = ET.fromstring(resp.text)
for item in root.findall(".//item"):
title = item.findtext("title", "")
link = item.findtext("link", "")Step 4: Build the Gemini AI Client
```python
# ai/client.py
import os, json, time, requests

_last_call = 0.0

MODEL_FALLBACK = [
    "gemini-2.0-flash-lite",
    "gemini-2.0-flash",
    "gemini-2.5-flash",
    "gemini-flash-lite-latest",
]

def generate(prompt: str, model: str = "", rate_limit: float = 7.0) -> dict:
    """Call Gemini with auto-fallback on 429. Returns parsed JSON or {}."""
    global _last_call
    api_key = os.environ.get("GEMINI_API_KEY", "")
    if not api_key:
        return {}
    elapsed = time.time() - _last_call
    if elapsed < rate_limit:
        time.sleep(rate_limit - elapsed)
    models = ([model] + [m for m in MODEL_FALLBACK if m != model]) if model else MODEL_FALLBACK
    _last_call = time.time()
    for m in models:
        url = f"https://generativelanguage.googleapis.com/v1beta/models/{m}:generateContent?key={api_key}"
        payload = {
            "contents": [{"parts": [{"text": prompt}]}],
            "generationConfig": {
                "responseMimeType": "application/json",
                "temperature": 0.3,
                "maxOutputTokens": 2048,
            },
        }
        try:
            resp = requests.post(url, json=payload, timeout=30)
            if resp.status_code == 200:
                return _parse(resp)
            if resp.status_code in (429, 404):
                time.sleep(1)
                continue
            return {}
        except requests.RequestException:
            return {}
    return {}

def _parse(resp) -> dict:
    """Extract the JSON payload, stripping a markdown code fence if present."""
    try:
        text = (
            resp.json()
            .get("candidates", [{}])[0]
            .get("content", {})
            .get("parts", [{}])[0]
            .get("text", "")
            .strip()
        )
        if text.startswith("```"):
            text = text.split("\n", 1)[-1].rsplit("```", 1)[0]
        return json.loads(text)
    except (json.JSONDecodeError, KeyError, IndexError):
        return {}
```
---
Step 5: Build the AI Pipeline (Batch)
```python
# ai/pipeline.py
import json
from pathlib import Path

import yaml

from ai.client import generate

def analyse_batch(items: list[dict], context: str = "", preference_prompt: str = "") -> list[dict]:
    """Analyse items in batches. Returns items enriched with AI fields."""
    config = yaml.safe_load((Path(__file__).parent.parent / "config.yaml").read_text())
    model = config.get("ai", {}).get("model", "gemini-2.5-flash")
    rate_limit = config.get("ai", {}).get("rate_limit_seconds", 7.0)
    min_score = config.get("ai", {}).get("min_score", 0)
    batch_size = config.get("ai", {}).get("batch_size", 5)
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    print(f"  [AI] {len(items)} items → {len(batches)} API calls")
    enriched = []
    for i, batch in enumerate(batches):
        print(f"  [AI] Batch {i + 1}/{len(batches)}...")
        prompt = _build_prompt(batch, context, preference_prompt, config)
        result = generate(prompt, model=model, rate_limit=rate_limit)
        analyses = result.get("analyses", [])
        for j, item in enumerate(batch):
            ai = analyses[j] if j < len(analyses) else {}
            if ai:
                score = max(0, min(100, int(ai.get("score", 0))))
                if min_score and score < min_score:
                    continue
                enriched.append({**item, "ai_score": score, "ai_summary": ai.get("summary", ""), "ai_notes": ai.get("notes", "")})
            else:
                enriched.append(item)
    return enriched

def _build_prompt(batch, context, preference_prompt, config):
    priorities = config.get("priorities", [])
    items_text = "\n\n".join(
        f"Item {i+1}: {json.dumps({k: v for k, v in item.items() if not k.startswith('_')})}"
        for i, item in enumerate(batch)
    )
    return f"""Analyse these {len(batch)} items and return a JSON object.

## Items
{items_text}

## User Context
{context[:800] if context else "Not provided"}

## User Priorities
{chr(10).join(f"- {p}" for p in priorities)}
{preference_prompt}

## Instructions
Return: {{"analyses": [{{"score": <0-100>, "summary": "<2 sentences>", "notes": "<why this matches or doesn't>"}} for each item in order]}}
Be concise. Score 90+=excellent match, 70-89=good, 50-69=ok, <50=weak."""
```

---
Step 6: Build the Feedback Learning System
```python
# ai/memory.py
"""Learn from user decisions to improve future scoring."""
import json
from pathlib import Path

FEEDBACK_PATH = Path(__file__).parent.parent / "data" / "feedback.json"

def load_feedback() -> dict:
    if FEEDBACK_PATH.exists():
        try:
            return json.loads(FEEDBACK_PATH.read_text())
        except (json.JSONDecodeError, OSError):
            pass
    return {"positive": [], "negative": []}

def save_feedback(fb: dict):
    FEEDBACK_PATH.parent.mkdir(parents=True, exist_ok=True)
    FEEDBACK_PATH.write_text(json.dumps(fb, indent=2))

def build_preference_prompt(feedback: dict, max_examples: int = 15) -> str:
    """Convert feedback history into a prompt bias section."""
    lines = []
    if feedback.get("positive"):
        lines.append("# Items the user LIKED (positive signal):")
        for e in feedback["positive"][-max_examples:]:
            lines.append(f"- {e}")
    if feedback.get("negative"):
        lines.append("\n# Items the user SKIPPED/REJECTED (negative signal):")
        for e in feedback["negative"][-max_examples:]:
            lines.append(f"- {e}")
    if lines:
        lines.append("\nUse these patterns to bias scoring on new items.")
    return "\n".join(lines)
```
**Integration with your storage layer:** after each run, query your DB for items with positive/negative status and call `save_feedback()` with the extracted patterns.
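The extraction step can stay provider-agnostic. A hedged sketch of the update logic (the `update_feedback` name and `(name, status)` row shape are illustrative, not part of the modules above):

```python
# Hypothetical helper: `rows` is a list of (name, status) pairs you extracted
# from your storage provider; positive/negative are the status sets from
# config.yaml's feedback: section. Appends new names to the feedback history.
def update_feedback(fb: dict, rows: list[tuple[str, str]],
                    positive: set[str], negative: set[str]) -> dict:
    for name, status in rows:
        if status in positive and name not in fb["positive"]:
            fb["positive"].append(name)
        elif status in negative and name not in fb["negative"]:
            fb["negative"].append(name)
    return fb
```

Pass the result to `save_feedback()` so the next run's `build_preference_prompt()` picks it up.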
---
Step 7: Build Storage (Notion example)
```python
# storage/notion_sync.py
import os

from notion_client import Client
from notion_client.errors import APIResponseError

_client = None

def get_client():
    global _client
    if _client is None:
        _client = Client(auth=os.environ["NOTION_TOKEN"])
    return _client

def get_existing_urls(db_id: str) -> set[str]:
    """Fetch all URLs already stored — used for deduplication."""
    client, seen, cursor = get_client(), set(), None
    while True:
        resp = client.databases.query(database_id=db_id, page_size=100, **({"start_cursor": cursor} if cursor else {}))
        for page in resp["results"]:
            url = page["properties"].get("URL", {}).get("url", "")
            if url:
                seen.add(url)
        if not resp["has_more"]:
            break
        cursor = resp["next_cursor"]
    return seen

def push_item(db_id: str, item: dict) -> bool:
    """Push one item to Notion. Returns True on success."""
    props = {
        "Name": {"title": [{"text": {"content": item.get("name", "")[:100]}}]},
        "URL": {"url": item.get("url")},
        "Source": {"select": {"name": item.get("source", "Unknown")}},
        "Date Found": {"date": {"start": item.get("date_found")}},
        "Status": {"select": {"name": "New"}},
    }
    # AI fields
    if item.get("ai_score") is not None:
        props["AI Score"] = {"number": item["ai_score"]}
    if item.get("ai_summary"):
        props["Summary"] = {"rich_text": [{"text": {"content": item["ai_summary"][:2000]}}]}
    if item.get("ai_notes"):
        props["Notes"] = {"rich_text": [{"text": {"content": item["ai_notes"][:2000]}}]}
    try:
        get_client().pages.create(parent={"database_id": db_id}, properties=props)
        return True
    except APIResponseError as e:
        print(f"[notion] Push failed: {e}")
        return False

def sync(db_id: str, items: list[dict]) -> tuple[int, int]:
    existing = get_existing_urls(db_id)
    added = skipped = 0
    for item in items:
        if item.get("url") in existing:
            skipped += 1
            continue
        if push_item(db_id, item):
            added += 1
            existing.add(item["url"])
        else:
            skipped += 1
    return added, skipped
```
---
Step 8: Orchestrate in main.py
```python
# scraper/main.py
import os
import sys
from pathlib import Path

import yaml
from dotenv import load_dotenv

load_dotenv()

from scraper.sources import my_source  # add your sources

# NOTE: This example uses Notion. If storage.provider is "sheets" or "supabase",
# replace this import with storage.sheets_sync or storage.supabase_sync and update
# the env var and sync() call accordingly.
from storage.notion_sync import sync

SOURCES = [
    ("My Source", my_source.fetch),
]

def ai_enabled():
    return bool(os.environ.get("GEMINI_API_KEY"))

def main():
    config = yaml.safe_load((Path(__file__).parent.parent / "config.yaml").read_text())
    provider = config.get("storage", {}).get("provider", "notion")
    # Resolve the storage target identifier from env based on provider
    if provider == "notion":
        db_id = os.environ.get("NOTION_DATABASE_ID")
        if not db_id:
            print("ERROR: NOTION_DATABASE_ID not set")
            sys.exit(1)
    else:
        # Extend here for sheets (SHEET_ID) or supabase (SUPABASE_TABLE) etc.
        print(f"ERROR: provider '{provider}' not yet wired in main.py")
        sys.exit(1)

    all_items = []
    for name, fetch_fn in SOURCES:
        try:
            items = fetch_fn()
            print(f"[{name}] {len(items)} items")
            all_items.extend(items)
        except Exception as e:
            print(f"[{name}] FAILED: {e}")

    # Deduplicate by URL
    seen, deduped = set(), []
    for item in all_items:
        if (url := item.get("url", "")) and url not in seen:
            seen.add(url)
            deduped.append(item)
    print(f"Unique items: {len(deduped)}")

    if ai_enabled() and deduped:
        from ai.memory import load_feedback, build_preference_prompt
        from ai.pipeline import analyse_batch
        # load_feedback() reads data/feedback.json written by your feedback sync script.
        # To keep it current, implement a separate feedback_sync.py that queries your
        # storage provider for items with positive/negative statuses and calls save_feedback().
        feedback = load_feedback()
        preference = build_preference_prompt(feedback)
        context_path = Path(__file__).parent.parent / "profile" / "context.md"
        context = context_path.read_text() if context_path.exists() else ""
        deduped = analyse_batch(deduped, context=context, preference_prompt=preference)
    else:
        print("[AI] Skipped — GEMINI_API_KEY not set")

    added, skipped = sync(db_id, deduped)
    print(f"Done — {added} new, {skipped} existing")

if __name__ == "__main__":
    main()
```
---
Step 9: GitHub Actions Workflow
```yaml
# .github/workflows/scraper.yml
name: Data Scraper Agent

on:
  schedule:
    - cron: "0 */3 * * *"   # every 3 hours — adjust to your needs
  workflow_dispatch:        # allow manual trigger

permissions:
  contents: write           # required for the feedback-history commit step

jobs:
  scrape:
    runs-on: ubuntu-latest
    timeout-minutes: 20
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"
      - run: pip install -r requirements.txt
      # Uncomment if Playwright is enabled in requirements.txt
      # - name: Install Playwright browsers
      #   run: python -m playwright install chromium --with-deps
      - name: Run agent
        env:
          NOTION_TOKEN: ${{ secrets.NOTION_TOKEN }}
          NOTION_DATABASE_ID: ${{ secrets.NOTION_DATABASE_ID }}
          GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
        run: python -m scraper.main
      - name: Commit feedback history
        run: |
          git config user.name "github-actions[bot]"
          git config user.email "github-actions[bot]@users.noreply.github.com"
          git add data/feedback.json || true
          git diff --cached --quiet || git commit -m "chore: update feedback history"
          git push
```
---
Step 10: config.yaml Template
```yaml
# config.yaml — customise this file, no code changes needed

# What to collect (pre-filter before AI)
filters:
  required_keywords: []   # item must contain at least one
  blocked_keywords: []    # item must not contain any

# Your priorities — AI uses these for scoring
priorities:
  - "example priority 1"
  - "example priority 2"

# Storage
storage:
  provider: "notion"      # notion | sheets | supabase | sqlite

# Feedback learning
feedback:
  positive_statuses: ["Saved", "Applied", "Interested"]
  negative_statuses: ["Skip", "Rejected", "Not relevant"]

# AI settings
ai:
  enabled: true
  model: "gemini-2.5-flash"
  min_score: 0            # filter out items below this score
  rate_limit_seconds: 7   # seconds between API calls
  batch_size: 5           # items per API call
```

---
Common Scraping Patterns
Pattern 1: REST API (easiest)
```python
resp = requests.get(url, params={"q": query}, headers=HEADERS, timeout=15)
items = resp.json().get("results", [])
```

Pattern 2: HTML Scraping
```python
soup = BeautifulSoup(resp.text, "lxml")
for card in soup.select(".listing-card"):
    title = card.select_one("h2").get_text(strip=True)
    href = card.select_one("a")["href"]
```

Pattern 3: RSS Feed
```python
import xml.etree.ElementTree as ET
root = ET.fromstring(resp.text)
for item in root.findall(".//item"):
    title = item.findtext("title", "")
    link = item.findtext("link", "")
    pub_date = item.findtext("pubDate", "")
```

Pattern 4: Paginated API
```python
page = 1
while True:
    resp = requests.get(url, params={"page": page, "limit": 50}, timeout=15)
    data = resp.json()
    items = data.get("results", [])
    if not items:
        break
    for item in items:
        results.append(_normalise(item))
    if not data.get("has_more"):
        break
    page += 1
```

Pattern 5: JS-Rendered Pages (Playwright)
```python
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto(url)
    page.wait_for_selector(".listing")
    html = page.content()
    browser.close()

soup = BeautifulSoup(html, "lxml")
```

Anti-Patterns to Avoid
| Anti-pattern | Problem | Fix |
|---|---|---|
| One LLM call per item | Hits rate limits instantly | Batch 5 items per call |
| Hardcoded keywords in code | Not reusable | Move all config to `config.yaml` |
| Scraping without rate limit | IP ban | Add `time.sleep()` between requests |
| Storing secrets in code | Security risk | Always use `.env` + GitHub Secrets |
| No deduplication | Duplicate rows pile up | Always check URL before pushing |
| Ignoring `robots.txt` | Legal/ethical risk | Respect crawl rules; use public APIs when available |
| Scraping JS-rendered sites with `requests` | Empty response | Use Playwright or look for the underlying API |
| `maxOutputTokens` too low | Truncated JSON, parse error | Use 2048+ for batch responses |
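The deduplication and rate-limit rows above can be sketched together. A minimal sketch, assuming `store_push` stands in for your Notion/Sheets/Supabase writer and `existing_urls` comes from a query of already-stored rows:

```python
import time

def push_new(items, existing_urls, store_push, delay=2.0):
    """Push only items whose URL we haven't stored yet, pausing between writes."""
    seen = set(existing_urls)
    pushed = 0
    for item in items:
        url = item.get("url")
        if not url or url in seen:
            continue            # skip duplicates (and items with no URL)
        store_push(item)
        seen.add(url)
        pushed += 1
        time.sleep(delay)       # polite rate limit toward the storage API
    return pushed
```

Checking the URL before every push means re-runs of the same schedule never pile up duplicate rows.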
Free Tier Limits Reference
| Service | Free Limit | Typical Usage |
|---|---|---|
| Gemini Flash Lite | 30 RPM, 1500 RPD | ~56 req/day at 3-hr intervals |
| Gemini 2.0 Flash | 15 RPM, 1500 RPD | Good fallback |
| Gemini 2.5 Flash | 10 RPM, 500 RPD | Use sparingly |
| GitHub Actions | Unlimited (public repos) | ~20 min/day |
| Notion API | Unlimited | ~200 writes/day |
| Supabase | 500MB DB, 2GB transfer | Fine for most agents |
| Google Sheets API | 300 req/min | Works for small agents |
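One way to stay inside these budgets is a fallback chain: try the model with the most daily headroom first, and move down the chain when a tier is rate-limited. A sketch, assuming `call_model` wraps the actual REST call and raises on a 429; the exact API model identifiers below are assumptions and should be checked against the current Gemini model list:

```python
# Ordered by free-tier headroom, most generous first (model ids assumed).
MODEL_CHAIN = [
    "gemini-2.0-flash-lite",
    "gemini-2.0-flash",
    "gemini-2.5-flash",
]

class RateLimited(Exception):
    """Raised by call_model when the API returns a rate-limit (429) error."""

def generate(prompt, call_model, chain=MODEL_CHAIN):
    """Try each model in order; a rate-limit error moves us down the chain."""
    last_err = None
    for model in chain:
        try:
            return call_model(model, prompt)
        except RateLimited as err:
            last_err = err      # this tier is exhausted, try the next
    raise last_err
```

Extending the chain to four models, as the checklist below suggests, just means appending another entry.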
Requirements Template
```
requests==2.31.0
beautifulsoup4==4.12.3
lxml==5.1.0
python-dotenv==1.0.1
pyyaml==6.0.2
notion-client==2.2.1     # if using Notion
# playwright==1.40.0     # uncomment for JS-rendered sites
```
Quality Checklist
Before marking the agent complete:
- `config.yaml` controls all user-facing settings — no hardcoded values
- `profile/context.md` holds user-specific context for AI matching
- Deduplication by URL before every storage push
- Gemini client has model fallback chain (4 models)
- Batch size ≤ 5 items per API call
- `maxOutputTokens` ≥ 2048
- `.env` is in `.gitignore`
- `.env.example` provided for onboarding
- `setup.py` creates DB schema on first run
- `enrich_existing.py` backfills AI scores on old rows
- GitHub Actions workflow commits `feedback.json` after each run
- README covers: setup in < 5 minutes, required secrets, customisation
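The `feedback.json` item can be as simple as a word tally over rows the user has reviewed. A minimal sketch, assuming storage rows carry `title` and `status` fields and the status lists mirror the `feedback:` config block; a real agent might weight by recency or use the LLM itself:

```python
import json
from collections import Counter

def update_feedback(rows, path="feedback.json"):
    """Tally title words from positively/negatively statused rows."""
    positive = {"Saved", "Applied", "Interested"}
    negative = {"Skip", "Rejected", "Not relevant"}
    liked, disliked = Counter(), Counter()
    for row in rows:
        words = row.get("title", "").lower().split()
        if row.get("status") in positive:
            liked.update(words)
        elif row.get("status") in negative:
            disliked.update(words)
    data = {"liked": dict(liked), "disliked": dict(disliked)}
    with open(path, "w") as f:
        json.dump(data, f, indent=2)  # CI commits this file back to the repo
    return data
```

On the next run the tallies can be folded into the AI scoring prompt, which is how the agent improves from Applied/Rejected decisions without any extra infrastructure.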
Real-World Examples
"Build me an agent that monitors Hacker News for AI startup funding news"
"Scrape product prices from 3 e-commerce sites and alert when they drop"
"Track new GitHub repos tagged with 'llm' or 'agents' — summarise each one"
"Collect Chief of Staff job listings from LinkedIn and Cutshort into Notion"
"Monitor a subreddit for posts mentioning my company — classify sentiment"
"Scrape new academic papers from arXiv on a topic I care about daily"
"Track sports fixture results and keep a running table in Google Sheets"
"Build a real estate listing watcher — alert on new properties under ₹1 Cr"

Reference Implementation
A complete working agent built with this exact architecture would scrape 4+ sources,
batch Gemini calls, learn from Applied/Rejected decisions stored in Notion, and run
100% free on GitHub Actions. Follow Steps 1–9 above to build your own.