Page monitoring methodology
Patterns for tracking web page changes, detecting content removal, and preserving important pages before they disappear.
Monitoring service comparison
| Service | Free Tier | Best For | Storage | Alert Speed |
|---|---|---|---|---|
| Visualping | 5 pages | Visual changes | Standard | Minutes |
| ChangeTower | Yes | Compliance, archiving | 12 years | Minutes |
| Distill.io | 25 pages | Element-level tracking | 12 months | Seconds |
| Wachete | Limited | Login-protected pages | 12 months | Minutes |
| UptimeRobot | 50 monitors | Uptime only | 2 months | Minutes |
Quick-start: Monitor a page
Distill.io element monitoring
```javascript
// Distill.io allows CSS/XPath selectors for precise monitoring
// Example selectors for common use cases:
// Monitor news article headlines
const newsSelector = '.article-headline, h1.title, .story-title';
// Monitor price changes
const priceSelector = '.price, .product-price, [data-price]';
// Monitor stock/availability
const availabilitySelector = '.in-stock, .availability, .stock-status';
// Monitor specific paragraph or section
const sectionSelector = '#main-content p:first-child';
// Monitor table data
const tableSelector = 'table.data-table tbody tr';
```
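Before registering a selector in Distill.io, it is worth confirming locally that it matches the element you care about. A minimal dry run using requests and BeautifulSoup (the same stack as the Python script below); the URL and selector are placeholders:

```python
import requests
from bs4 import BeautifulSoup

# Fetch the page and dry-run a candidate selector before configuring it
# in Distill.io. URL and selector are illustrative placeholders.
resp = requests.get('https://example.com/news', timeout=30)
soup = BeautifulSoup(resp.text, 'html.parser')
for el in soup.select('.article-headline, h1.title, .story-title')[:5]:
    print(el.get_text(strip=True))
```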
Python monitoring script
```python
import requests
import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from bs4 import BeautifulSoup
class PageMonitor:
"""Simple page change monitor with local storage."""
def __init__(self, storage_dir: Path):
self.storage_dir = storage_dir
self.storage_dir.mkdir(parents=True, exist_ok=True)
self.state_file = storage_dir / 'monitor_state.json'
self.state = self._load_state()
def _load_state(self) -> dict:
if self.state_file.exists():
return json.loads(self.state_file.read_text())
return {'pages': {}}
def _save_state(self):
self.state_file.write_text(json.dumps(self.state, indent=2))
    def _get_page_hash(self, url: str, selector: Optional[str] = None) -> tuple[str, str]:
"""Get content hash and content for a page or element."""
response = requests.get(url, timeout=30, headers={
'User-Agent': 'Mozilla/5.0 (PageMonitor/1.0)'
})
response.raise_for_status()
if selector:
soup = BeautifulSoup(response.text, 'html.parser')
element = soup.select_one(selector)
content = element.get_text(strip=True) if element else ''
else:
content = response.text
content_hash = hashlib.sha256(content.encode()).hexdigest()
return content_hash, content
    def add_page(self, url: str, name: str, selector: Optional[str] = None):
"""Add a page to monitor."""
content_hash, content = self._get_page_hash(url, selector)
self.state['pages'][url] = {
'name': name,
'selector': selector,
'last_hash': content_hash,
'last_check': datetime.now().isoformat(),
'last_content': content[:1000], # Store preview
'change_count': 0
}
self._save_state()
print(f"Added: {name} ({url})")
def check_page(self, url: str) -> Optional[dict]:
"""Check single page for changes."""
if url not in self.state['pages']:
return None
page = self.state['pages'][url]
selector = page.get('selector')
try:
new_hash, new_content = self._get_page_hash(url, selector)
except Exception as e:
return {
'url': url,
'name': page['name'],
'status': 'error',
'error': str(e)
}
changed = new_hash != page['last_hash']
result = {
'url': url,
'name': page['name'],
'status': 'changed' if changed else 'unchanged',
'previous_content': page['last_content'],
'new_content': new_content[:1000] if changed else None
}
if changed:
page['last_hash'] = new_hash
page['last_content'] = new_content[:1000]
page['change_count'] += 1
# Archive the change
archive_file = self.storage_dir / f"{hashlib.md5(url.encode()).hexdigest()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
archive_file.write_text(new_content)
page['last_check'] = datetime.now().isoformat()
self._save_state()
return result
def check_all(self) -> list[dict]:
"""Check all monitored pages."""
results = []
for url in self.state['pages']:
result = self.check_page(url)
if result:
results.append(result)
        return results
```
Usage
```python
monitor = PageMonitor(Path('./page_monitor_data'))
```
Add pages to monitor
```python
monitor.add_page(
    'https://example.com/important-page',
    'Important Page',
    selector='.main-content'  # Optional: monitor a specific element
)
```
Check for changes
```python
results = monitor.check_all()
for result in results:
    if result['status'] == 'changed':
        print(f"CHANGED: {result['name']}")
        print(f"  Previous: {result['previous_content'][:100]}...")
        print(f"  New: {result['new_content'][:100]}...")
```
Uptime monitoring
UptimeRobot API integration
```python
import requests
from typing import List, Optional
class UptimeRobotClient:
"""UptimeRobot API client for monitoring page availability."""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.uptimerobot.com/v2"
    def _request(self, endpoint: str, params: Optional[dict] = None) -> dict:
        data = {'api_key': self.api_key}
        if params:
            data.update(params)
        response = requests.post(f"{self.base_url}/{endpoint}", data=data)
        response.raise_for_status()
        return response.json()
def get_monitors(self) -> List[dict]:
"""Get all monitors."""
result = self._request('getMonitors')
return result.get('monitors', [])
def create_monitor(self, friendly_name: str, url: str,
monitor_type: int = 1) -> dict:
"""Create a new monitor.
Types: 1=HTTP(s), 2=Keyword, 3=Ping, 4=Port
"""
return self._request('newMonitor', {
'friendly_name': friendly_name,
'url': url,
'type': monitor_type
})
def get_monitor_uptime(self, monitor_id: int,
custom_uptime_ratios: str = "7-30-90") -> dict:
"""Get uptime statistics for a monitor."""
return self._request('getMonitors', {
'monitors': monitor_id,
'custom_uptime_ratios': custom_uptime_ratios
})
def pause_monitor(self, monitor_id: int) -> dict:
"""Pause a monitor."""
return self._request('editMonitor', {
'id': monitor_id,
'status': 0
})
def resume_monitor(self, monitor_id: int) -> dict:
"""Resume a monitor."""
return self._request('editMonitor', {
'id': monitor_id,
'status': 1
        })
```
Usage
```python
client = UptimeRobotClient('your-api-key')
```
Create monitors for important pages
```python
client.create_monitor('News Homepage', 'https://example-news.com')
client.create_monitor('API Status', 'https://api.example.com/health')
```
Check all monitors
```python
for monitor in client.get_monitors():
    status = 'UP' if monitor['status'] == 2 else 'DOWN'
    print(f"{monitor['friendly_name']}: {status}")
```
RSS feed generation
Generate RSS from pages without feeds
```python
import requests
import hashlib
from bs4 import BeautifulSoup
from feedgen.feed import FeedGenerator
from datetime import datetime, timezone
from urllib.parse import urljoin
from typing import Optional
class RSSGenerator:
"""Generate RSS feeds from web pages."""
def __init__(self, feed_id: str, title: str, link: str):
self.fg = FeedGenerator()
self.fg.id(feed_id)
self.fg.title(title)
self.fg.link(href=link)
self.fg.description(f'Auto-generated feed for {title}')
def add_from_page(self, url: str, item_selector: str,
title_selector: str, link_selector: str,
                      description_selector: Optional[str] = None):
"""Parse a page and add items to feed.
Args:
url: Page URL to parse
item_selector: CSS selector for each item container
title_selector: CSS selector for title (relative to item)
link_selector: CSS selector for link (relative to item)
description_selector: Optional CSS selector for description
"""
response = requests.get(url, timeout=30)
soup = BeautifulSoup(response.text, 'html.parser')
items = soup.select(item_selector)
for item in items[:20]: # Limit to 20 items
title_elem = item.select_one(title_selector)
link_elem = item.select_one(link_selector)
if not title_elem or not link_elem:
continue
title = title_elem.get_text(strip=True)
link = link_elem.get('href', '')
            # Resolve relative links against the page URL; urljoin
            # leaves absolute links unchanged
            link = urljoin(url, link)
fe = self.fg.add_entry()
fe.id(hashlib.md5(link.encode()).hexdigest())
fe.title(title)
fe.link(href=link)
if description_selector:
desc_elem = item.select_one(description_selector)
if desc_elem:
fe.description(desc_elem.get_text(strip=True))
            fe.published(datetime.now(timezone.utc))  # feedgen requires tz-aware datetimes
def generate_rss(self) -> str:
"""Generate RSS XML string."""
return self.fg.rss_str(pretty=True).decode()
def save_rss(self, filepath: str):
"""Save RSS feed to file."""
        self.fg.rss_file(filepath)
```
Example: Generate feed for a news site without RSS
```python
rss = RSSGenerator(
    'https://example.com/news',
    'Example News Feed',
    'https://example.com/news'
)
rss.add_from_page(
    'https://example.com/news',
    item_selector='.news-item',
    title_selector='h2 a',
    link_selector='h2 a',
    description_selector='.summary'
)
```
Save the feed
```python
rss.save_rss('example_feed.xml')
```
Using RSS-Bridge (self-hosted)
```bash
# RSS-Bridge generates feeds for sites without them
# Supports Twitter, Instagram, YouTube, and many others

# Docker installation
docker pull rssbridge/rss-bridge
docker run -d -p 3000:80 rssbridge/rss-bridge

# Access at http://localhost:3000
# Select a bridge, enter parameters, get RSS feed URL
```
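Once the container is running, a generated feed is just a URL you can poll like any other page. The query parameters below follow RSS-Bridge's display-action URL scheme, but treat the exact string as an assumption and copy the real URL from the web UI:

```python
import requests

# Hypothetical feed URL; copy the actual one from the RSS-Bridge UI.
feed_url = 'http://localhost:3000/?action=display&bridge=Reddit&format=Atom'
resp = requests.get(feed_url, timeout=30)
resp.raise_for_status()
print(resp.text[:500])  # preview the generated feed XML
```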
Social media monitoring
Twitter/X archiving with Twarc
```bash
# Twarc requires Twitter API credentials

# Installation
pip install twarc

# Configure
twarc2 configure
```

```python
import subprocess
import json
from pathlib import Path
class TwitterArchiver:
"""Archive Twitter searches and timelines."""
def __init__(self, output_dir: Path):
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
def search(self, query: str, max_results: int = 100) -> Path:
"""Search tweets and save to file."""
output_file = self.output_dir / f"search_{query.replace(' ', '_')}.jsonl"
subprocess.run([
'twarc2', 'search',
'--max-results', str(max_results),
query,
str(output_file)
], check=True)
return output_file
def get_timeline(self, username: str, max_results: int = 100) -> Path:
"""Get user timeline."""
output_file = self.output_dir / f"timeline_{username}.jsonl"
subprocess.run([
'twarc2', 'timeline',
'--max-results', str(max_results),
username,
str(output_file)
], check=True)
return output_file
def parse_archive(self, filepath: Path) -> list[dict]:
"""Parse archived tweets."""
tweets = []
with open(filepath) as f:
for line in f:
data = json.loads(line)
if 'data' in data:
tweets.extend(data['data'])
        return tweets
```
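A usage sketch, assuming twarc2 is installed and configured as above; the query and username are placeholders:

```python
archiver = TwitterArchiver(Path('./twitter_archive'))

# Archive a search and a timeline, then load the saved tweets.
search_file = archiver.search('example query', max_results=50)
timeline_file = archiver.get_timeline('example_user', max_results=50)

tweets = archiver.parse_archive(search_file)
print(f"Archived {len(tweets)} tweets to {search_file}")
```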
Webhook notifications
Send alerts on changes
```python
import requests
from datetime import datetime
from typing import Optional
class AlertManager:
"""Send alerts when monitored pages change."""
    def __init__(self, slack_webhook: Optional[str] = None,
                 discord_webhook: Optional[str] = None,
                 email_config: Optional[dict] = None):
self.slack_webhook = slack_webhook
self.discord_webhook = discord_webhook
self.email_config = email_config
    def send_slack(self, message: str, channel: Optional[str] = None):
"""Send Slack notification."""
if not self.slack_webhook:
return
payload = {'text': message}
if channel:
payload['channel'] = channel
requests.post(self.slack_webhook, json=payload)
def send_discord(self, message: str):
"""Send Discord notification."""
if not self.discord_webhook:
return
requests.post(self.discord_webhook, json={'content': message})
def send_email(self, subject: str, body: str, to: str):
"""Send email notification."""
if not self.email_config:
return
import smtplib
from email.mime.text import MIMEText
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.email_config['from']
msg['To'] = to
with smtplib.SMTP(self.email_config['smtp_host'],
self.email_config['smtp_port']) as server:
server.starttls()
server.login(self.email_config['username'],
self.email_config['password'])
server.send_message(msg)
def alert_change(self, page_name: str, url: str,
old_content: str, new_content: str):
"""Send change alert to all configured channels."""
message = f"""
Page Changed: {page_name}
URL: {url}
Time: {datetime.now().isoformat()}
Previous content (preview):
{old_content[:200]}...
New content (preview):
{new_content[:200]}...
"""
if self.slack_webhook:
self.send_slack(message)
if self.discord_webhook:
            self.send_discord(message)
```
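Minimal wiring; the webhook URLs are placeholders, and the email channel is omitted here:

```python
alerts = AlertManager(
    slack_webhook='https://hooks.slack.com/services/...',   # placeholder
    discord_webhook='https://discord.com/api/webhooks/...'  # placeholder
)
alerts.alert_change(
    'Important Page',
    'https://example.com/important-page',
    old_content='previous text preview',
    new_content='new text preview'
)
```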
Scheduled monitoring with cron
Cron setup for continuous monitoring
```bash
# Edit crontab
crontab -e

# Check pages every 15 minutes
*/15 * * * * /usr/bin/python3 /path/to/monitor_script.py >> /var/log/monitor.log 2>&1

# Check critical pages every 5 minutes
*/5 * * * * /usr/bin/python3 /path/to/critical_monitor.py >> /var/log/critical.log 2>&1

# Daily summary report at 8 AM
0 8 * * * /usr/bin/python3 /path/to/daily_report.py
```
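Where cron isn't available (Windows, some containers), a long-running loop gives a similar effect. A minimal sketch reusing the PageMonitor class from earlier; the 15-minute interval mirrors the first cron entry:

```python
import time
from pathlib import Path

def run_forever(interval_seconds: int = 900):
    """Poll all monitored pages on a fixed interval, like the cron entry above."""
    monitor = PageMonitor(Path('./page_monitor_data'))
    while True:
        for result in monitor.check_all():
            if result['status'] == 'changed':
                print(f"CHANGED: {result['name']}")
        time.sleep(interval_seconds)
```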
Monitoring script template
```python
#!/usr/bin/env python3
"""Page monitoring script for cron execution."""
import sys
from pathlib import Path
from datetime import datetime

# Add project to path
sys.path.insert(0, str(Path(__file__).parent))
from monitor import PageMonitor
from alerts import AlertManager
def main():
# Initialize
monitor = PageMonitor(Path('./data'))
alerts = AlertManager(
slack_webhook='https://hooks.slack.com/services/...',
discord_webhook='https://discord.com/api/webhooks/...'
)
# Check all pages
results = monitor.check_all()
# Process results
changes = [r for r in results if r['status'] == 'changed']
errors = [r for r in results if r['status'] == 'error']
# Alert on changes
for change in changes:
alerts.alert_change(
change['name'],
change['url'],
change['previous_content'],
change['new_content']
)
print(f"[{datetime.now()}] CHANGE: {change['name']}")
# Alert on errors
for error in errors:
alerts.send_slack(f"Monitor error for {error['name']}: {error['error']}")
print(f"[{datetime.now()}] ERROR: {error['name']} - {error['error']}")
# Summary
print(f"[{datetime.now()}] Checked {len(results)} pages, "
f"{len(changes)} changes, {len(errors)} errors")if name == 'main':
main()
Archive on change
Automatic archiving when changes detected
```python
from pathlib import Path
from typing import Optional
from multiarchiver import MultiArchiver  # external helper; not defined in this snippet
# PageMonitor is the class defined earlier on this page
class ArchivingMonitor(PageMonitor):
"""Page monitor that archives content when changes detected."""
def __init__(self, storage_dir: Path):
super().__init__(storage_dir)
self.archiver = MultiArchiver()
    def check_page(self, url: str) -> Optional[dict]:
"""Check page and archive if changed."""
result = super().check_page(url)
if result and result['status'] == 'changed':
# Archive to multiple services
archive_results = self.archiver.archive_url(url)
successful_archives = [
r.archived_url for r in archive_results
if r.success
]
result['archives'] = successful_archives
# Log archive URLs
print(f"Archived {url} to:")
for archive_url in successful_archives:
print(f" - {archive_url}")
        return result
```
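Usage mirrors the base class; this assumes MultiArchiver.archive_url returns result objects exposing success and archived_url attributes, as relied on above:

```python
monitor = ArchivingMonitor(Path('./data'))
monitor.add_page('https://example.com/fragile-page', 'Fragile Page')

for result in monitor.check_all():
    if result['status'] == 'changed':
        print(f"{result['name']} changed; archived copies: {result.get('archives', [])}")
```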
Monitoring strategy by use case
News monitoring
```markdown
News/Current Events Monitoring

Pages to monitor:
- Breaking news sections
- Press release pages
- Government announcement pages
- Company newsrooms

Monitoring frequency:
- Breaking news: Every 5 minutes
- Press releases: Every 15-30 minutes
- General news: Every hour

Archive strategy:
- Archive immediately on detection
- Use both Wayback Machine and Archive.today
- Save local copy with timestamp
```
Research monitoring
```markdown
Academic/Research Monitoring

Pages to monitor:
- Preprint servers (arXiv, SSRN)
- Journal table of contents
- Conference proceedings
- Researcher profiles

Monitoring frequency:
- Daily for active topics
- Weekly for general monitoring

Tools recommended:
- Google Scholar alerts (free, built-in)
- Semantic Scholar alerts
- RSS feeds where available
- Custom monitors for specific pages
```
Competitive intelligence
```markdown
Competitor Monitoring

Pages to monitor:
- Pricing pages
- Product pages
- Job postings
- Press releases
- Executive bios

Monitoring frequency:
- Pricing: Daily
- Products: Daily
- Jobs: Weekly
- Press: Daily

Legal considerations:
- Don't violate terms of service
- Don't circumvent access controls
- Public pages only
- Don't scrape at high frequency
```
Best practices
Monitoring checklist
```markdown
Before monitoring a page:
- Is the page publicly accessible?
- Are you respecting robots.txt?
- Is monitoring frequency reasonable?
- Do you have a legitimate purpose?
- Are you storing data securely?
- Do you have alerts configured?
- Is archiving set up for important pages?

Maintenance:
- Review monitors monthly
- Remove stale monitors
- Update selectors if pages change
- Check alert delivery
- Verify archives are working
```
Rate limiting
```python
import time
from functools import wraps
def rate_limit(min_interval: float = 1.0):
"""Decorator to rate limit function calls."""
last_call = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_call[0]
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
last_call[0] = time.time()
return func(*args, **kwargs)
return wrapper
    return decorator
```
Usage
```python
@rate_limit(min_interval=2.0)  # At most one call every 2 seconds
def check_page(url: str):
    return requests.get(url)
```
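The closure holds one timestamp per decorated function, so each function is throttled independently. Note the sketch is not thread-safe; that is acceptable for a single-threaded cron script, but concurrent callers would need a lock around the timestamp check.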