erpnext-syntax-scheduler

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

ERPNext Syntax: Scheduler & Background Jobs

ERPNext 语法参考:调度器与后台任务

Deterministic syntax reference for Frappe scheduler events and background job processing.
Frappe调度器事件与后台任务处理的确定性语法参考。

Quick Reference

快速参考

Scheduler Events (hooks.py)

调度器事件(hooks.py)

python
undefined
python
undefined

hooks.py

hooks.py

scheduler_events = { "all": ["myapp.tasks.every_tick"], "hourly": ["myapp.tasks.hourly_task"], "daily": ["myapp.tasks.daily_task"], "weekly": ["myapp.tasks.weekly_task"], "monthly": ["myapp.tasks.monthly_task"], "daily_long": ["myapp.tasks.heavy_daily"], # Long queue "cron": { "0 9 * * 1-5": ["myapp.tasks.weekday_9am"], "*/15 * * * *": ["myapp.tasks.every_15_min"] } }

**CRITICAL**: After EVERY change to scheduler_events: `bench migrate`
scheduler_events = { "all": ["myapp.tasks.every_tick"], "hourly": ["myapp.tasks.hourly_task"], "daily": ["myapp.tasks.daily_task"], "weekly": ["myapp.tasks.weekly_task"], "monthly": ["myapp.tasks.monthly_task"], "daily_long": ["myapp.tasks.heavy_daily"], # 长队列 "cron": { "0 9 * * 1-5": ["myapp.tasks.weekday_9am"], "*/15 * * * *": ["myapp.tasks.every_15_min"] } }

**重要提示**:每次修改scheduler_events后,必须执行`bench migrate`

frappe.enqueue Basics

frappe.enqueue 基础用法

python
undefined
python
undefined

Simple

简单调用

frappe.enqueue("myapp.tasks.process", customer="CUST-001")
frappe.enqueue("myapp.tasks.process", customer="CUST-001")

With queue and timeout

指定队列与超时时间

frappe.enqueue( "myapp.tasks.heavy_task", queue="long", timeout=3600, param="value" )
frappe.enqueue( "myapp.tasks.heavy_task", queue="long", timeout=3600, param="value" )

With deduplication (v15)

任务去重(v15版本)

from frappe.utils.background_jobs import is_job_enqueued
job_id = f"import::{doc.name}" if not is_job_enqueued(job_id): frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)
undefined
from frappe.utils.background_jobs import is_job_enqueued
job_id = f"import::{doc.name}" if not is_job_enqueued(job_id): frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)
undefined

Scheduler Event Types

调度器事件类型

EventFrequencyQueue
all
Every tick (v14: 4min, v15: 60s)default
hourly
Per hourdefault
daily
Per daydefault
weekly
Per weekdefault
monthly
Per monthdefault
hourly_long
Per hourlong
daily_long
Per daylong
weekly_long
Per weeklong
monthly_long
Per monthlong
cron
Custom scheduleconfigurable
Version difference scheduler tick:
  • v14: ~240 seconds (4 min)
  • v15: ~60 seconds
事件类型执行频率队列
all
每次触发(v14:4分钟,v15:60秒)default
hourly
每小时default
daily
每天default
weekly
每周default
monthly
每月default
hourly_long
每小时long
daily_long
每天long
weekly_long
每周long
monthly_long
每月long
cron
自定义调度可配置
版本差异:调度触发间隔:
  • v14: ~240秒(4分钟)
  • v15: ~60秒

Queue Types

队列类型

QueueTimeoutUsage
short
300s (5 min)Quick tasks, UI responses
default
300s (5 min)Standard tasks
long
1500s (25 min)Heavy processing, imports
队列超时时间使用场景
short
300秒(5分钟)快速任务、UI响应类任务
default
300秒(5分钟)标准任务
long
1500秒(25分钟)重处理任务、数据导入

frappe.enqueue Parameters

frappe.enqueue 参数说明

python
frappe.enqueue(
    method,                      # REQUIRED: function or module path
    queue="default",             # Queue name
    timeout=None,                # Override timeout (seconds)
    is_async=True,               # False = execute directly
    now=False,                   # True = via frappe.call()
    job_id=None,                 # v15: unique ID for deduplication
    enqueue_after_commit=False,  # Wait for DB commit
    at_front=False,              # Place at front of queue
    on_success=None,             # Success callback
    on_failure=None,             # Failure callback
    **kwargs                     # Arguments for method
)
python
frappe.enqueue(
    method,                      # 必填:函数或模块路径
    queue="default",             # 队列名称
    timeout=None,                # 覆盖默认超时时间(秒)
    is_async=True,               # False = 直接执行
    now=False,                   # True = 通过frappe.call()执行
    job_id=None,                 # v15:用于去重的唯一ID
    enqueue_after_commit=False,  # 等待数据库提交后再入队
    at_front=False,              # 插入队列头部
    on_success=None,             # 成功回调函数
    on_failure=None,             # 失败回调函数
    **kwargs                     # 传递给目标方法的参数
)

Job Deduplication

任务去重

v15+ (Recommended)

v15+(推荐用法)

python
from frappe.utils.background_jobs import is_job_enqueued

job_id = f"process::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue(
        "myapp.tasks.process",
        job_id=job_id,
        doc_name=doc.name
    )
python
from frappe.utils.background_jobs import is_job_enqueued

job_id = f"process::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue(
        "myapp.tasks.process",
        job_id=job_id,
        doc_name=doc.name
    )

v14 (Deprecated)

v14(已废弃)

python
undefined
python
undefined

DO NOT USE - only for legacy code

请勿使用 - 仅适用于遗留代码

from frappe.core.page.background_jobs.background_jobs import get_info enqueued = [d.get("job_name") for d in get_info()] if name not in enqueued: frappe.enqueue(..., job_name=name)
undefined
from frappe.core.page.background_jobs.background_jobs import get_info enqueued = [d.get("job_name") for d in get_info()] if name not in enqueued: frappe.enqueue(..., job_name=name)
undefined

Error Handling Pattern

错误处理模式

python
def process_records(records):
    for record in records:
        try:
            process_single(record)
            frappe.db.commit()  # Commit per success
        except Exception:
            frappe.db.rollback()  # Rollback on error
            frappe.log_error(
                frappe.get_traceback(),
                f"Process Error: {record}"
            )
python
def process_records(records):
    for record in records:
        try:
            process_single(record)
            frappe.db.commit()  # 每条记录处理成功后提交
        except Exception:
            frappe.db.rollback()  # 出错时回滚
            frappe.log_error(
                frappe.get_traceback(),
                f"Process Error: {record}"
            )

Callbacks

回调函数

python
def on_success_handler(job, connection, result, *args, **kwargs):
    frappe.publish_realtime("show_alert", {"message": "Done!"})

def on_failure_handler(job, connection, type, value, traceback):
    frappe.log_error(f"Job {job.id} failed: {value}")

frappe.enqueue(
    "myapp.tasks.risky_task",
    on_success=on_success_handler,
    on_failure=on_failure_handler
)
python
def on_success_handler(job, connection, result, *args, **kwargs):
    frappe.publish_realtime("show_alert", {"message": "完成!"})

def on_failure_handler(job, connection, type, value, traceback):
    frappe.log_error(f"Job {job.id} failed: {value}")

frappe.enqueue(
    "myapp.tasks.risky_task",
    on_success=on_success_handler,
    on_failure=on_failure_handler
)

User Context

用户上下文

IMPORTANT: Scheduler jobs run as Administrator!
python
def scheduled_task():
    # frappe.session.user = "Administrator"
    
    # Set explicit owner:
    doc = frappe.new_doc("ToDo")
    doc.owner = "user@example.com"
    doc.insert(ignore_permissions=True)
重要提示:调度器任务以Administrator身份运行!
python
def scheduled_task():
    # frappe.session.user = "Administrator"
    
    # 设置明确的所有者:
    doc = frappe.new_doc("ToDo")
    doc.owner = "user@example.com"
    doc.insert(ignore_permissions=True)

Monitoring

监控工具

ToolDescription
RQ Worker (DocType)Worker status, busy/idle
RQ Job (DocType)Job status, queue filter
bench doctor
Scheduler status overview
Scheduled Job LogExecution history
工具说明
RQ Worker(文档类型)Worker状态、忙碌/空闲状态
RQ Job(文档类型)任务状态、队列筛选
bench doctor
调度器状态概览
定时任务日志执行历史记录

Version Differences v14 vs v15

v14与v15版本差异

Featurev14v15
Tick interval4 min60 sec
Config key
scheduler_interval
scheduler_tick_interval
Deduplication
job_name
job_id
+
is_job_enqueued()
特性v14v15
触发间隔4分钟60秒
配置键
scheduler_interval
scheduler_tick_interval
去重方式
job_name
job_id
+
is_job_enqueued()

Reference Files

参考文档

  • scheduler-events.md: All event types, cron syntax, configuration
  • enqueue-api.md: Complete frappe.enqueue/enqueue_doc API
  • queues.md: Queue types, timeouts, custom queues, workers
  • examples.md: Complete working examples
  • anti-patterns.md: Common mistakes and corrections
  • scheduler-events.md: 所有事件类型、cron语法、配置说明
  • enqueue-api.md: 完整的frappe.enqueue/enqueue_doc API文档
  • queues.md: 队列类型、超时设置、自定义队列、Worker说明
  • examples.md: 完整的工作示例
  • anti-patterns.md: 常见错误及修正方案

Critical Rules

关键规则

  1. ALWAYS
    bench migrate
    after hooks.py scheduler_events changes
  2. USE
    job_id
    +
    is_job_enqueued()
    for deduplication (v15)
  3. CHOOSE correct queue: short/default/long based on duration
  4. COMMIT per successful record, rollback on error
  5. REMEMBER that jobs run as Administrator
  6. ENQUEUE heavy tasks from scheduler events, don't execute directly
  1. 每次修改hooks.py中的scheduler_events后,必须执行
    bench migrate
  2. 推荐使用
    job_id
    +
    is_job_enqueued()
    实现任务去重(v15版本)
  3. 根据任务时长选择正确的队列:short/default/long
  4. 每条成功记录提交一次,出错时回滚
  5. 注意任务以Administrator身份运行
  6. 将重处理任务加入队列,不要在调度器事件中直接执行