erpnext-syntax-scheduler
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseERPNext Syntax: Scheduler & Background Jobs
ERPNext 语法参考:调度器与后台任务
Deterministic syntax reference for Frappe scheduler events and background job processing.
Frappe调度器事件与后台任务处理的确定性语法参考。
Quick Reference
快速参考
Scheduler Events (hooks.py)
调度器事件(hooks.py)
python
undefinedpython
undefinedhooks.py
hooks.py
scheduler_events = {
"all": ["myapp.tasks.every_tick"],
"hourly": ["myapp.tasks.hourly_task"],
"daily": ["myapp.tasks.daily_task"],
"weekly": ["myapp.tasks.weekly_task"],
"monthly": ["myapp.tasks.monthly_task"],
"daily_long": ["myapp.tasks.heavy_daily"], # Long queue
"cron": {
"0 9 * * 1-5": ["myapp.tasks.weekday_9am"],
"*/15 * * * *": ["myapp.tasks.every_15_min"]
}
}
**CRITICAL**: After EVERY change to scheduler_events: `bench migrate`scheduler_events = {
"all": ["myapp.tasks.every_tick"],
"hourly": ["myapp.tasks.hourly_task"],
"daily": ["myapp.tasks.daily_task"],
"weekly": ["myapp.tasks.weekly_task"],
"monthly": ["myapp.tasks.monthly_task"],
"daily_long": ["myapp.tasks.heavy_daily"], # 长队列
"cron": {
"0 9 * * 1-5": ["myapp.tasks.weekday_9am"],
"*/15 * * * *": ["myapp.tasks.every_15_min"]
}
}
**重要提示**:每次修改scheduler_events后,必须执行`bench migrate`frappe.enqueue Basics
frappe.enqueue 基础用法
python
undefinedpython
undefinedSimple
简单调用
frappe.enqueue("myapp.tasks.process", customer="CUST-001")
frappe.enqueue("myapp.tasks.process", customer="CUST-001")
With queue and timeout
指定队列与超时时间
frappe.enqueue(
"myapp.tasks.heavy_task",
queue="long",
timeout=3600,
param="value"
)
frappe.enqueue(
"myapp.tasks.heavy_task",
queue="long",
timeout=3600,
param="value"
)
With deduplication (v15)
任务去重(v15版本)
from frappe.utils.background_jobs import is_job_enqueued
job_id = f"import::{doc.name}"
if not is_job_enqueued(job_id):
frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)
undefinedfrom frappe.utils.background_jobs import is_job_enqueued
job_id = f"import::{doc.name}"
if not is_job_enqueued(job_id):
frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)
undefinedScheduler Event Types
调度器事件类型
| Event | Frequency | Queue |
|---|---|---|
| Every tick (v14: 4min, v15: 60s) | default |
| Per hour | default |
| Per day | default |
| Per week | default |
| Per month | default |
| Per hour | long |
| Per day | long |
| Per week | long |
| Per month | long |
| Custom schedule | configurable |
Version difference scheduler tick:
- v14: ~240 seconds (4 min)
- v15: ~60 seconds
| 事件类型 | 执行频率 | 队列 |
|---|---|---|
| 每次触发(v14:4分钟,v15:60秒) | default |
| 每小时 | default |
| 每天 | default |
| 每周 | default |
| 每月 | default |
| 每小时 | long |
| 每天 | long |
| 每周 | long |
| 每月 | long |
| 自定义调度 | 可配置 |
版本差异:调度触发间隔:
- v14: ~240秒(4分钟)
- v15: ~60秒
Queue Types
队列类型
| Queue | Timeout | Usage |
|---|---|---|
| 300s (5 min) | Quick tasks, UI responses |
| 300s (5 min) | Standard tasks |
| 1500s (25 min) | Heavy processing, imports |
| 队列 | 超时时间 | 使用场景 |
|---|---|---|
| 300秒(5分钟) | 快速任务、UI响应类任务 |
| 300秒(5分钟) | 标准任务 |
| 1500秒(25分钟) | 重处理任务、数据导入 |
frappe.enqueue Parameters
frappe.enqueue 参数说明
python
frappe.enqueue(
method, # REQUIRED: function or module path
queue="default", # Queue name
timeout=None, # Override timeout (seconds)
is_async=True, # False = execute directly
now=False, # True = via frappe.call()
job_id=None, # v15: unique ID for deduplication
enqueue_after_commit=False, # Wait for DB commit
at_front=False, # Place at front of queue
on_success=None, # Success callback
on_failure=None, # Failure callback
**kwargs # Arguments for method
)python
frappe.enqueue(
method, # 必填:函数或模块路径
queue="default", # 队列名称
timeout=None, # 覆盖默认超时时间(秒)
is_async=True, # False = 直接执行
now=False, # True = 通过frappe.call()执行
job_id=None, # v15:用于去重的唯一ID
enqueue_after_commit=False, # 等待数据库提交后再入队
at_front=False, # 插入队列头部
on_success=None, # 成功回调函数
on_failure=None, # 失败回调函数
**kwargs # 传递给目标方法的参数
)Job Deduplication
任务去重
v15+ (Recommended)
v15+(推荐用法)
python
from frappe.utils.background_jobs import is_job_enqueued
job_id = f"process::{doc.name}"
if not is_job_enqueued(job_id):
frappe.enqueue(
"myapp.tasks.process",
job_id=job_id,
doc_name=doc.name
)python
from frappe.utils.background_jobs import is_job_enqueued
job_id = f"process::{doc.name}"
if not is_job_enqueued(job_id):
frappe.enqueue(
"myapp.tasks.process",
job_id=job_id,
doc_name=doc.name
)v14 (Deprecated)
v14(已废弃)
python
undefinedpython
undefinedDO NOT USE - only for legacy code
请勿使用 - 仅适用于遗留代码
from frappe.core.page.background_jobs.background_jobs import get_info
enqueued = [d.get("job_name") for d in get_info()]
if name not in enqueued:
frappe.enqueue(..., job_name=name)
undefinedfrom frappe.core.page.background_jobs.background_jobs import get_info
enqueued = [d.get("job_name") for d in get_info()]
if name not in enqueued:
frappe.enqueue(..., job_name=name)
undefinedError Handling Pattern
错误处理模式
python
def process_records(records):
for record in records:
try:
process_single(record)
frappe.db.commit() # Commit per success
except Exception:
frappe.db.rollback() # Rollback on error
frappe.log_error(
frappe.get_traceback(),
f"Process Error: {record}"
)python
def process_records(records):
for record in records:
try:
process_single(record)
frappe.db.commit() # 每条记录处理成功后提交
except Exception:
frappe.db.rollback() # 出错时回滚
frappe.log_error(
frappe.get_traceback(),
f"Process Error: {record}"
)Callbacks
回调函数
python
def on_success_handler(job, connection, result, *args, **kwargs):
frappe.publish_realtime("show_alert", {"message": "Done!"})
def on_failure_handler(job, connection, type, value, traceback):
frappe.log_error(f"Job {job.id} failed: {value}")
frappe.enqueue(
"myapp.tasks.risky_task",
on_success=on_success_handler,
on_failure=on_failure_handler
)python
def on_success_handler(job, connection, result, *args, **kwargs):
frappe.publish_realtime("show_alert", {"message": "完成!"})
def on_failure_handler(job, connection, type, value, traceback):
frappe.log_error(f"Job {job.id} failed: {value}")
frappe.enqueue(
"myapp.tasks.risky_task",
on_success=on_success_handler,
on_failure=on_failure_handler
)User Context
用户上下文
IMPORTANT: Scheduler jobs run as Administrator!
python
def scheduled_task():
# frappe.session.user = "Administrator"
# Set explicit owner:
doc = frappe.new_doc("ToDo")
doc.owner = "user@example.com"
doc.insert(ignore_permissions=True)重要提示:调度器任务以Administrator身份运行!
python
def scheduled_task():
# frappe.session.user = "Administrator"
# 设置明确的所有者:
doc = frappe.new_doc("ToDo")
doc.owner = "user@example.com"
doc.insert(ignore_permissions=True)Monitoring
监控工具
| Tool | Description |
|---|---|
| RQ Worker (DocType) | Worker status, busy/idle |
| RQ Job (DocType) | Job status, queue filter |
| Scheduler status overview |
| Scheduled Job Log | Execution history |
| 工具 | 说明 |
|---|---|
| RQ Worker(文档类型) | Worker状态、忙碌/空闲状态 |
| RQ Job(文档类型) | 任务状态、队列筛选 |
| 调度器状态概览 |
| 定时任务日志 | 执行历史记录 |
Version Differences v14 vs v15
v14与v15版本差异
| Feature | v14 | v15 |
|---|---|---|
| Tick interval | 4 min | 60 sec |
| Config key | | |
| Deduplication | | |
| 特性 | v14 | v15 |
|---|---|---|
| 触发间隔 | 4分钟 | 60秒 |
| 配置键 | | |
| 去重方式 | | |
Reference Files
参考文档
- scheduler-events.md: All event types, cron syntax, configuration
- enqueue-api.md: Complete frappe.enqueue/enqueue_doc API
- queues.md: Queue types, timeouts, custom queues, workers
- examples.md: Complete working examples
- anti-patterns.md: Common mistakes and corrections
- scheduler-events.md: 所有事件类型、cron语法、配置说明
- enqueue-api.md: 完整的frappe.enqueue/enqueue_doc API文档
- queues.md: 队列类型、超时设置、自定义队列、Worker说明
- examples.md: 完整的工作示例
- anti-patterns.md: 常见错误及修正方案
Critical Rules
关键规则
- ALWAYS after hooks.py scheduler_events changes
bench migrate - USE +
job_idfor deduplication (v15)is_job_enqueued() - CHOOSE correct queue: short/default/long based on duration
- COMMIT per successful record, rollback on error
- REMEMBER that jobs run as Administrator
- ENQUEUE heavy tasks from scheduler events, don't execute directly
- 每次修改hooks.py中的scheduler_events后,必须执行
bench migrate - 推荐使用+
job_id实现任务去重(v15版本)is_job_enqueued() - 根据任务时长选择正确的队列:short/default/long
- 每条成功记录提交一次,出错时回滚
- 注意任务以Administrator身份运行
- 将重处理任务加入队列,不要在调度器事件中直接执行