
Page monitoring methodology


Patterns for tracking web page changes, detecting content removal, and preserving important pages before they disappear.

Monitoring service comparison


| Service | Free Tier | Best For | Storage | Alert Speed |
|---|---|---|---|---|
| Visualping | 5 pages | Visual changes | Standard | Minutes |
| ChangeTower | Yes | Compliance, archiving | 12 years | Minutes |
| Distill.io | 25 pages | Element-level tracking | 12 months | Seconds |
| Wachete | Limited | Login-protected pages | 12 months | Minutes |
| UptimeRobot | 50 monitors | Uptime only | 2 months | Minutes |

Quick-start: Monitor a page


Distill.io element monitoring


```javascript
// Distill.io allows CSS/XPath selectors for precise monitoring
// Example selectors for common use cases:

// Monitor news article headlines
const newsSelector = '.article-headline, h1.title, .story-title';

// Monitor price changes
const priceSelector = '.price, .product-price, [data-price]';

// Monitor stock/availability
const availabilitySelector = '.in-stock, .availability, .stock-status';

// Monitor specific paragraph or section
const sectionSelector = '#main-content p:first-child';

// Monitor table data
const tableSelector = 'table.data-table tbody tr';
```
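Before entering a selector into a monitoring service, it is worth checking that it matches what you expect. A quick way to do that locally is BeautifulSoup, which supports the same CSS selector syntax; the HTML snippet below is invented for illustration:

```python
# Sanity-check candidate selectors against a sample of the target
# page's HTML before configuring them in a monitoring service.
from bs4 import BeautifulSoup

html = """
<div id="main-content">
  <h1 class="title">Server maintenance tonight</h1>
  <span class="price" data-price="19.99">$19.99</span>
</div>
"""
soup = BeautifulSoup(html, 'html.parser')

# Same selector lists as above
news = soup.select_one('.article-headline, h1.title, .story-title')
price = soup.select_one('.price, .product-price, [data-price]')

print(news.get_text(strip=True))   # → Server maintenance tonight
print(price.get_text(strip=True))  # → $19.99
```

If `select_one` returns `None`, the selector will silently match nothing in the monitor as well, so this check catches typos early.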

Python monitoring script


```python
import requests
import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from bs4 import BeautifulSoup

class PageMonitor:
    """Simple page change monitor with local storage."""

    def __init__(self, storage_dir: Path):
        self.storage_dir = storage_dir
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.state_file = storage_dir / 'monitor_state.json'
        self.state = self._load_state()

    def _load_state(self) -> dict:
        if self.state_file.exists():
            return json.loads(self.state_file.read_text())
        return {'pages': {}}

    def _save_state(self):
        self.state_file.write_text(json.dumps(self.state, indent=2))

    def _get_page_hash(self, url: str, selector: Optional[str] = None) -> tuple[str, str]:
        """Get content hash and content for a page or element."""

        response = requests.get(url, timeout=30, headers={
            'User-Agent': 'Mozilla/5.0 (PageMonitor/1.0)'
        })
        response.raise_for_status()

        if selector:
            soup = BeautifulSoup(response.text, 'html.parser')
            element = soup.select_one(selector)
            content = element.get_text(strip=True) if element else ''
        else:
            content = response.text

        content_hash = hashlib.sha256(content.encode()).hexdigest()
        return content_hash, content

    def add_page(self, url: str, name: str, selector: Optional[str] = None):
        """Add a page to monitor."""

        content_hash, content = self._get_page_hash(url, selector)

        self.state['pages'][url] = {
            'name': name,
            'selector': selector,
            'last_hash': content_hash,
            'last_check': datetime.now().isoformat(),
            'last_content': content[:1000],  # Store preview
            'change_count': 0
        }

        self._save_state()
        print(f"Added: {name} ({url})")

    def check_page(self, url: str) -> Optional[dict]:
        """Check a single page for changes."""

        if url not in self.state['pages']:
            return None

        page = self.state['pages'][url]
        selector = page.get('selector')

        try:
            new_hash, new_content = self._get_page_hash(url, selector)
        except Exception as e:
            return {
                'url': url,
                'name': page['name'],
                'status': 'error',
                'error': str(e)
            }

        changed = new_hash != page['last_hash']

        result = {
            'url': url,
            'name': page['name'],
            'status': 'changed' if changed else 'unchanged',
            'previous_content': page['last_content'],
            'new_content': new_content[:1000] if changed else None
        }

        if changed:
            page['last_hash'] = new_hash
            page['last_content'] = new_content[:1000]
            page['change_count'] += 1

            # Archive the change
            archive_file = self.storage_dir / f"{hashlib.md5(url.encode()).hexdigest()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
            archive_file.write_text(new_content)

        page['last_check'] = datetime.now().isoformat()
        self._save_state()

        return result

    def check_all(self) -> list[dict]:
        """Check all monitored pages."""
        results = []
        for url in self.state['pages']:
            result = self.check_page(url)
            if result:
                results.append(result)
        return results
```

Usage

```python
monitor = PageMonitor(Path('./page_monitor_data'))

# Add pages to monitor
monitor.add_page(
    'https://example.com/important-page',
    'Important Page',
    selector='.main-content'  # Optional: monitor specific element
)

# Check for changes
results = monitor.check_all()
for result in results:
    if result['status'] == 'changed':
        print(f"CHANGED: {result['name']}")
        print(f"  Previous: {result['previous_content'][:100]}...")
        print(f"  New: {result['new_content'][:100]}...")
```
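One practical caveat with hashing the full page (the no-selector path above): dynamic fragments such as timestamps, session tokens, or rotating ad markup will flag a "change" on every fetch. Normalizing the text before hashing is one way to cut these false positives; the helper below is an illustrative sketch, not part of `PageMonitor`, and only collapses whitespace — real pages may need more aggressive stripping:

```python
# Collapse whitespace before hashing so cosmetic reflows of the same
# text do not register as a content change.
import hashlib
import re

def normalized_hash(text: str) -> str:
    collapsed = re.sub(r'\s+', ' ', text).strip()
    return hashlib.sha256(collapsed.encode()).hexdigest()

a = "Breaking:  server   down\n"
b = "Breaking: server down"
print(normalized_hash(a) == normalized_hash(b))  # → True
```

Selector-based monitoring (`selector='.main-content'`) achieves much of the same effect by ignoring everything outside the element you care about.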

Uptime monitoring


UptimeRobot API integration


```python
import requests
from typing import List, Optional

class UptimeRobotClient:
    """UptimeRobot API client for monitoring page availability."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.uptimerobot.com/v2"

    def _request(self, endpoint: str, params: Optional[dict] = None) -> dict:
        data = {'api_key': self.api_key}
        if params:
            data.update(params)

        response = requests.post(f"{self.base_url}/{endpoint}", data=data)
        response.raise_for_status()
        return response.json()

    def get_monitors(self) -> List[dict]:
        """Get all monitors."""
        result = self._request('getMonitors')
        return result.get('monitors', [])

    def create_monitor(self, friendly_name: str, url: str,
                       monitor_type: int = 1) -> dict:
        """Create a new monitor.

        Types: 1=HTTP(s), 2=Keyword, 3=Ping, 4=Port
        """
        return self._request('newMonitor', {
            'friendly_name': friendly_name,
            'url': url,
            'type': monitor_type
        })

    def get_monitor_uptime(self, monitor_id: int,
                           custom_uptime_ratios: str = "7-30-90") -> dict:
        """Get uptime statistics for a monitor."""
        return self._request('getMonitors', {
            'monitors': monitor_id,
            'custom_uptime_ratios': custom_uptime_ratios
        })

    def pause_monitor(self, monitor_id: int) -> dict:
        """Pause a monitor."""
        return self._request('editMonitor', {
            'id': monitor_id,
            'status': 0
        })

    def resume_monitor(self, monitor_id: int) -> dict:
        """Resume a monitor."""
        return self._request('editMonitor', {
            'id': monitor_id,
            'status': 1
        })
```

Usage

```python
client = UptimeRobotClient('your-api-key')

# Create monitors for important pages
client.create_monitor('News Homepage', 'https://example-news.com')
client.create_monitor('API Status', 'https://api.example.com/health')

# Check all monitors
for monitor in client.get_monitors():
    status = 'UP' if monitor['status'] == 2 else 'DOWN'
    print(f"{monitor['friendly_name']}: {status}")
```
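The `monitor['status'] == 2` check above only distinguishes "up" from everything else. The v2 API actually reports several states; the mapping below reflects the commonly documented codes, but verify it against the UptimeRobot API reference you are targeting:

```python
# UptimeRobot v2 monitor status codes (check current API docs):
STATUS_LABELS = {
    0: 'paused',
    1: 'not checked yet',
    2: 'up',
    8: 'seems down',
    9: 'down',
}

def status_label(code: int) -> str:
    """Human-readable label for a monitor status integer."""
    return STATUS_LABELS.get(code, f'unknown ({code})')

print(status_label(2))  # → up
print(status_label(9))  # → down
```

Treating `8` ("seems down") separately avoids alerting on transient network blips before UptimeRobot confirms the outage.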

RSS feed generation


Generate RSS from pages without feeds


```python
import requests
from bs4 import BeautifulSoup
from feedgen.feed import FeedGenerator
from datetime import datetime, timezone
from urllib.parse import urljoin
from typing import Optional
import hashlib

class RSSGenerator:
    """Generate RSS feeds from web pages."""

    def __init__(self, feed_id: str, title: str, link: str):
        self.fg = FeedGenerator()
        self.fg.id(feed_id)
        self.fg.title(title)
        self.fg.link(href=link)
        self.fg.description(f'Auto-generated feed for {title}')

    def add_from_page(self, url: str, item_selector: str,
                      title_selector: str, link_selector: str,
                      description_selector: Optional[str] = None):
        """Parse a page and add items to the feed.

        Args:
            url: Page URL to parse
            item_selector: CSS selector for each item container
            title_selector: CSS selector for title (relative to item)
            link_selector: CSS selector for link (relative to item)
            description_selector: Optional CSS selector for description
        """

        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        items = soup.select(item_selector)

        for item in items[:20]:  # Limit to 20 items
            title_elem = item.select_one(title_selector)
            link_elem = item.select_one(link_selector)

            if not title_elem or not link_elem:
                continue

            title = title_elem.get_text(strip=True)
            link = link_elem.get('href', '')

            # Resolve relative links against the page URL
            link = urljoin(url, link)

            fe = self.fg.add_entry()
            fe.id(hashlib.md5(link.encode()).hexdigest())
            fe.title(title)
            fe.link(href=link)

            if description_selector:
                desc_elem = item.select_one(description_selector)
                if desc_elem:
                    fe.description(desc_elem.get_text(strip=True))

            # feedgen requires timezone-aware datetimes
            fe.published(datetime.now(timezone.utc))

    def generate_rss(self) -> str:
        """Generate RSS XML string."""
        return self.fg.rss_str(pretty=True).decode()

    def save_rss(self, filepath: str):
        """Save RSS feed to file."""
        self.fg.rss_file(filepath)
```

Example: Generate feed for a news site without RSS

```python
rss = RSSGenerator(
    'https://example.com/news',
    'Example News Feed',
    'https://example.com/news'
)
rss.add_from_page(
    'https://example.com/news',
    item_selector='.news-item',
    title_selector='h2 a',
    link_selector='h2 a',
    description_selector='.summary'
)

# Save the feed
rss.save_rss('example_feed.xml')
```
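If the feed is regenerated on a schedule, every run stamps all entries with the current time, so readers may re-surface old items. Persisting the entry ids already seen keeps regenerated feeds stable; the sketch below uses the same md5-of-link id scheme as `RSSGenerator`, with a hypothetical JSON sidecar file as storage:

```python
# Track entry ids across runs so only genuinely new items are added
# to the regenerated feed (seen ids persisted in a JSON sidecar).
import hashlib
import json
import tempfile
from pathlib import Path

def entry_id(link: str) -> str:
    # Matches the id scheme RSSGenerator uses
    return hashlib.md5(link.encode()).hexdigest()

def is_new(link: str, seen_file: Path) -> bool:
    seen = set(json.loads(seen_file.read_text())) if seen_file.exists() else set()
    eid = entry_id(link)
    if eid in seen:
        return False
    seen.add(eid)
    seen_file.write_text(json.dumps(sorted(seen)))
    return True

seen_file = Path(tempfile.mkdtemp()) / 'seen.json'
print(is_new('https://example.com/a', seen_file))  # → True
print(is_new('https://example.com/a', seen_file))  # → False
```

A call to `is_new()` before `add_entry()` is enough to skip items the feed already carries.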

Using RSS-Bridge (self-hosted)


```bash
# RSS-Bridge generates feeds for sites without them
# Supports Twitter, Instagram, YouTube, and many others

# Docker installation
docker pull rssbridge/rss-bridge
docker run -d -p 3000:80 rssbridge/rss-bridge

# Select a bridge, enter parameters, get the RSS feed URL
```

Social media monitoring


Twitter/X archiving with Twarc


```bash
# Twarc requires Twitter API credentials

# Installation
pip install twarc

# Configure
twarc2 configure
```

```python
import subprocess
import json
from pathlib import Path

class TwitterArchiver:
    """Archive Twitter searches and timelines."""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def search(self, query: str, max_results: int = 100) -> Path:
        """Search tweets and save to file."""

        output_file = self.output_dir / f"search_{query.replace(' ', '_')}.jsonl"

        subprocess.run([
            'twarc2', 'search',
            '--max-results', str(max_results),
            query,
            str(output_file)
        ], check=True)

        return output_file

    def get_timeline(self, username: str, max_results: int = 100) -> Path:
        """Get user timeline."""

        output_file = self.output_dir / f"timeline_{username}.jsonl"

        subprocess.run([
            'twarc2', 'timeline',
            '--max-results', str(max_results),
            username,
            str(output_file)
        ], check=True)

        return output_file

    def parse_archive(self, filepath: Path) -> list[dict]:
        """Parse archived tweets."""

        tweets = []
        with open(filepath) as f:
            for line in f:
                data = json.loads(line)
                if 'data' in data:
                    tweets.extend(data['data'])

        return tweets
```
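`parse_archive()` returns the tweets as plain dicts, so normal Python tooling works for analysis. As a small illustration (the sample tweets below are invented, mirroring the `author_id`/`text` fields of v2 payloads), tallying tweets per author:

```python
# Count tweets per author from a parsed archive using Counter.
from collections import Counter

tweets = [
    {'id': '1', 'author_id': '42', 'text': 'hello'},
    {'id': '2', 'author_id': '42', 'text': 'again'},
    {'id': '3', 'author_id': '7',  'text': 'hi'},
]

by_author = Counter(t['author_id'] for t in tweets)
print(by_author.most_common(1))  # → [('42', 2)]
```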

Webhook notifications


Send alerts on changes


```python
import requests
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
from typing import Optional

class AlertManager:
    """Send alerts when monitored pages change."""

    def __init__(self, slack_webhook: Optional[str] = None,
                 discord_webhook: Optional[str] = None,
                 email_config: Optional[dict] = None):
        self.slack_webhook = slack_webhook
        self.discord_webhook = discord_webhook
        self.email_config = email_config

    def send_slack(self, message: str, channel: Optional[str] = None):
        """Send Slack notification."""
        if not self.slack_webhook:
            return

        payload = {'text': message}
        if channel:
            payload['channel'] = channel

        requests.post(self.slack_webhook, json=payload)

    def send_discord(self, message: str):
        """Send Discord notification."""
        if not self.discord_webhook:
            return

        requests.post(self.discord_webhook, json={'content': message})

    def send_email(self, subject: str, body: str, to: str):
        """Send email notification."""
        if not self.email_config:
            return

        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.email_config['from']
        msg['To'] = to

        with smtplib.SMTP(self.email_config['smtp_host'],
                          self.email_config['smtp_port']) as server:
            server.starttls()
            server.login(self.email_config['username'],
                         self.email_config['password'])
            server.send_message(msg)

    def alert_change(self, page_name: str, url: str,
                     old_content: str, new_content: str):
        """Send change alert to all configured channels."""

        message = f"""
Page Changed: {page_name}
URL: {url}
Time: {datetime.now().isoformat()}

Previous content (preview):
{old_content[:200]}...

New content (preview):
{new_content[:200]}...
"""

        if self.slack_webhook:
            self.send_slack(message)

        if self.discord_webhook:
            self.send_discord(message)
```
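The webhook posts above fire and forget; Slack and Discord both rate-limit, and a burst of change alerts can get some dropped. A common remedy is retrying with exponential backoff. The helper below only computes the delay schedule (names and defaults are illustrative), leaving the actual retry loop to the caller:

```python
# Exponential backoff schedule for retrying failed webhook posts:
# delays double each attempt, capped so retries stay bounded.
def backoff_schedule(retries: int, base: float = 1.0, cap: float = 60.0) -> list[float]:
    """Delays (seconds) before each retry attempt."""
    return [min(cap, base * (2 ** i)) for i in range(retries)]

print(backoff_schedule(5))  # → [1.0, 2.0, 4.0, 8.0, 16.0]
```

Wrapping `requests.post` in a loop that sleeps through this schedule on non-2xx responses is usually enough for webhook traffic.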

Scheduled monitoring with cron


Cron setup for continuous monitoring


```bash
# Edit crontab
crontab -e

# Check pages every 15 minutes
*/15 * * * * /usr/bin/python3 /path/to/monitor_script.py >> /var/log/monitor.log 2>&1

# Check critical pages every 5 minutes
*/5 * * * * /usr/bin/python3 /path/to/critical_monitor.py >> /var/log/critical.log 2>&1

# Daily summary report at 8 AM
0 8 * * * /usr/bin/python3 /path/to/daily_report.py
```
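One hazard with short cron intervals: if a check runs longer than the interval (a slow site, a hung request), cron starts a second copy and the two fight over the state file. A non-blocking file lock at the top of the script prevents overlap. This is a POSIX-only sketch using `fcntl`; the lock path is illustrative:

```python
# Take a non-blocking exclusive lock at startup; exit quietly if a
# previous run still holds it (POSIX only).
import fcntl
import os
import tempfile

def acquire_lock(path: str):
    """Return the open lock file on success, None if already held."""
    f = open(path, 'w')
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return f
    except BlockingIOError:
        f.close()
        return None

lock_path = os.path.join(tempfile.mkdtemp(), 'monitor.lock')
first = acquire_lock(lock_path)   # acquired
second = acquire_lock(lock_path)  # refused while `first` is open
print(first is not None, second is None)  # → True True
```

In a cron script, `sys.exit(0)` when the lock is refused; the lock releases automatically when the process exits.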

Monitoring script template


```python
#!/usr/bin/env python3
"""Page monitoring script for cron execution."""

import sys
from pathlib import Path
from datetime import datetime

# Add project to path
sys.path.insert(0, str(Path(__file__).parent))

from monitor import PageMonitor
from alerts import AlertManager

def main():
    # Initialize
    monitor = PageMonitor(Path('./data'))
    alerts = AlertManager(
        slack_webhook='https://hooks.slack.com/services/...',
        discord_webhook='https://discord.com/api/webhooks/...'
    )

    # Check all pages
    results = monitor.check_all()

    # Process results
    changes = [r for r in results if r['status'] == 'changed']
    errors = [r for r in results if r['status'] == 'error']

    # Alert on changes
    for change in changes:
        alerts.alert_change(
            change['name'],
            change['url'],
            change['previous_content'],
            change['new_content']
        )
        print(f"[{datetime.now()}] CHANGE: {change['name']}")

    # Alert on errors
    for error in errors:
        alerts.send_slack(f"Monitor error for {error['name']}: {error['error']}")
        print(f"[{datetime.now()}] ERROR: {error['name']} - {error['error']}")

    # Summary
    print(f"[{datetime.now()}] Checked {len(results)} pages, "
          f"{len(changes)} changes, {len(errors)} errors")

if __name__ == '__main__':
    main()
```

Archive on change


Automatic archiving when changes detected


```python
from pathlib import Path
from typing import Optional

from monitor import PageMonitor          # the PageMonitor class defined above
from multiarchiver import MultiArchiver  # external multi-service archiving helper

class ArchivingMonitor(PageMonitor):
    """Page monitor that archives content when changes are detected."""

    def __init__(self, storage_dir: Path):
        super().__init__(storage_dir)
        self.archiver = MultiArchiver()

    def check_page(self, url: str) -> Optional[dict]:
        """Check page and archive if changed."""

        result = super().check_page(url)

        if result and result['status'] == 'changed':
            # Archive to multiple services
            archive_results = self.archiver.archive_url(url)

            successful_archives = [
                r.archived_url for r in archive_results
                if r.success
            ]

            result['archives'] = successful_archives

            # Log archive URLs
            print(f"Archived {url} to:")
            for archive_url in successful_archives:
                print(f"  - {archive_url}")

        return result
```
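`MultiArchiver` is an external helper; all `ArchivingMonitor` assumes about it is the shape of the objects `archive_url()` returns: a `success` flag and an `archived_url`. A minimal stand-in with that shape, useful for testing the monitor without hitting real archive services (the dataclass and sample URLs below are hypothetical):

```python
# Stand-in for MultiArchiver results: the monitor above only reads
# the `success` and `archived_url` attributes.
from dataclasses import dataclass

@dataclass
class ArchiveResult:
    service: str
    success: bool
    archived_url: str = ''

results = [
    ArchiveResult('wayback', True, 'https://web.archive.org/web/2024/https://example.com'),
    ArchiveResult('archive_today', False),
]

saved = [r.archived_url for r in results if r.success]
print(saved)  # only the successful archive's URL survives
```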

Monitoring strategy by use case

按场景划分的监控策略

News monitoring

新闻监控


News/Current Events Monitoring

新闻/时事监控

Pages to monitor:

监控页面范围:

  • Breaking news sections
  • Press release pages
  • Government announcement pages
  • Company newsrooms
  • 突发新闻板块
  • 新闻发布页面
  • 政府公告页面
  • 企业新闻中心

Monitoring frequency:

监控频率:

  • Breaking news: Every 5 minutes
  • Press releases: Every 15-30 minutes
  • General news: Every hour
  • 突发新闻:每5分钟一次
  • 新闻发布:每15-30分钟一次
  • 普通新闻:每小时一次

Archive strategy:

归档策略:

  • Archive immediately on detection
  • Use both Wayback Machine and Archive.today
  • Save local copy with timestamp
  • 检测到变更后立即归档
  • 同时使用Wayback Machine和Archive.today
  • 保存带时间戳的本地副本
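The tiered frequencies above can be sketched as a small scheduler that only re-checks a page once its tier's interval has elapsed. The tier names and the `MonitoredPage` structure below are illustrative assumptions, not part of any monitoring service's API:

```python
import time
from dataclasses import dataclass

# Interval per tier, in seconds -- values mirror the frequencies listed
# above; the tier names themselves are chosen for illustration.
TIERS = {
    "breaking": 5 * 60,
    "press": 15 * 60,
    "general": 60 * 60,
}

@dataclass
class MonitoredPage:
    url: str
    tier: str
    last_checked: float = 0.0  # epoch seconds of the last check

def pages_due(pages, now=None):
    """Return the pages whose tier interval has elapsed since the last check."""
    now = time.time() if now is None else now
    return [p for p in pages if now - p.last_checked >= TIERS[p.tier]]
```

A cron job or loop would call `pages_due` each minute and fetch only the returned pages, so breaking-news pages are hit every 5 minutes while general pages wait their full hour.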

Research monitoring

研究内容监控


Academic/Research Monitoring

学术/研究内容监控

Pages to monitor:

监控页面范围:

  • Preprint servers (arXiv, SSRN)
  • Journal table of contents
  • Conference proceedings
  • Researcher profiles
  • 预印本服务器(arXiv、SSRN)
  • 期刊目录
  • 会议论文集
  • 研究人员个人主页

Monitoring frequency:

监控频率:

  • Daily for active topics
  • Weekly for general monitoring
  • 热门主题:每日监控
  • 常规主题:每周监控

Tools recommended:

推荐工具:

  • Google Scholar alerts (free, built-in)
  • Semantic Scholar alerts
  • RSS feeds where available
  • Custom monitors for specific pages
  • Google Scholar告警(免费内置功能)
  • Semantic Scholar告警
  • RSS订阅源(如有提供)
  • 针对特定页面的自定义监控脚本
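Where a source offers RSS (arXiv publishes per-category feeds, for example), new items can be pulled without any HTML diffing. A minimal stdlib-only sketch of extracting entries from an RSS 2.0 document; real-world feeds are usually consumed with a dedicated parser such as feedparser, which also tolerates malformed XML:

```python
import xml.etree.ElementTree as ET

def rss_items(rss_xml: str):
    """Extract (title, link) pairs from an RSS 2.0 feed document."""
    root = ET.fromstring(rss_xml)
    return [
        (item.findtext("title"), item.findtext("link"))
        for item in root.iter("item")
    ]
```

Comparing the returned links against a stored set of already-seen URLs turns this into a simple new-publication alert.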

Competitive intelligence

竞争情报监控


Competitor Monitoring

竞品监控

Pages to monitor:

监控页面范围:

  • Pricing pages
  • Product pages
  • Job postings
  • Press releases
  • Executive bios
  • 定价页面
  • 产品页面
  • 招聘信息
  • 新闻发布
  • 高管介绍

Monitoring frequency:

监控频率:

  • Pricing: Daily
  • Products: Daily
  • Jobs: Weekly
  • Press: Daily
  • 定价:每日监控
  • 产品:每日监控
  • 招聘:每周监控
  • 新闻:每日监控

Legal considerations:

法律注意事项:

  • Don't violate terms of service
  • Don't circumvent access controls
  • Public pages only
  • Don't scrape at high frequency
  • 不得违反服务条款
  • 不得规避访问控制
  • 仅监控公开页面
  • 不得高频抓取
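The robots.txt point above can be enforced mechanically before a monitor is ever created. A small sketch using the standard library's `urllib.robotparser`; the helper name is ours, and in practice the rules text would be fetched once from the site's `/robots.txt`:

```python
from urllib.robotparser import RobotFileParser

def allowed_by_robots(robots_txt: str, url: str, agent: str = "*") -> bool:
    """Check a URL against robots.txt rules parsed from text (no network)."""
    rp = RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp.can_fetch(agent, url)
```

Refusing to add any URL for which this returns False keeps the monitor on the right side of the "public pages only" rule with no manual review.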

Best practices

最佳实践

Monitoring checklist

监控检查清单


Before monitoring a page:

监控页面之前:

  • Is the page publicly accessible?
  • Are you respecting robots.txt?
  • Is monitoring frequency reasonable?
  • Do you have a legitimate purpose?
  • Are you storing data securely?
  • Do you have alerts configured?
  • Is archiving set up for important pages?
  • 页面是否可公开访问?
  • 是否遵守robots.txt规则?
  • 监控频率是否合理?
  • 是否有合法的监控目的?
  • 数据存储是否安全?
  • 是否已配置告警通知?
  • 是否已为重要页面设置归档?

Maintenance:

日常维护:

  • Review monitors monthly
  • Remove stale monitors
  • Update selectors if pages change
  • Check alert delivery
  • Verify archives are working
  • 每月复查监控项
  • 移除失效的监控项
  • 页面更新时同步修改选择器
  • 检查告警通知是否正常送达
  • 验证归档功能是否正常
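The "update selectors if pages change" item can be partly automated: if a monitored CSS class no longer matches anything on the page, the monitor is probably stale. A stdlib-only sketch (the `selector_alive` helper is illustrative and handles only class selectors, not full CSS):

```python
from html.parser import HTMLParser

class ClassProbe(HTMLParser):
    """Count elements carrying a given CSS class, to detect dead selectors."""
    def __init__(self, class_name: str):
        super().__init__()
        self.class_name = class_name
        self.matches = 0

    def handle_starttag(self, tag, attrs):
        # attrs is a list of (name, value) pairs; class values are
        # whitespace-separated lists of class names.
        classes = (dict(attrs).get("class") or "").split()
        if self.class_name in classes:
            self.matches += 1

def selector_alive(html: str, class_name: str) -> bool:
    """True if at least one element on the page still carries the class."""
    probe = ClassProbe(class_name)
    probe.feed(html)
    return probe.matches > 0
```

Running this check during the monthly review flags monitors whose selectors silently stopped matching after a site redesign.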

Rate limiting

请求频率限制

python
import time
from functools import wraps

def rate_limit(min_interval: float = 1.0):
    """Decorator to rate limit function calls."""
    last_call = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_call[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            last_call[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator
python
import time
from functools import wraps

def rate_limit(min_interval: float = 1.0):
    """用于限制函数调用频率的装饰器。"""
    last_call = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_call[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            last_call[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator

Usage

使用示例

python
@rate_limit(min_interval=2.0)  # Max once per 2 seconds
def check_page(url: str):
    return requests.get(url)
python
@rate_limit(min_interval=2.0)  # 最多每2秒调用一次
def check_page(url: str):
    return requests.get(url)
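One caveat with the decorator above: the check-and-sleep on `last_call` is not atomic, so two threads can pass the interval check at the same time. A sketch of a lock-guarded variant (the name `rate_limit_threadsafe` is ours), using `time.monotonic()` so wall-clock adjustments cannot skew the interval:

```python
import threading
import time
from functools import wraps

def rate_limit_threadsafe(min_interval: float = 1.0):
    """Like rate_limit above, but safe when called from multiple threads."""
    lock = threading.Lock()
    last_call = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Hold the lock across the check-and-sleep so concurrent
            # callers serialize on the interval instead of racing past it.
            with lock:
                elapsed = time.monotonic() - last_call[0]
                if elapsed < min_interval:
                    time.sleep(min_interval - elapsed)
                last_call[0] = time.monotonic()
            return func(*args, **kwargs)
        return wrapper
    return decorator
```

For a single-threaded cron-style monitor the simpler version is fine; the lock matters once pages are checked from a thread pool.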