Loading...
Loading...
Scheduler and background jobs syntax for Frappe/ERPNext v14/v15/v16. Use for scheduler_events in hooks.py, frappe.enqueue() for async jobs, queue configuration, job deduplication, error handling, and monitoring. Triggers on questions about scheduled tasks, background processing, cron jobs, RQ workers, job queues, async tasks.
npx skill4agent add openaec-foundation/erpnext_anthropic_claude_development_skill_package erpnext-syntax-scheduler

# hooks.py
scheduler_events = {
"all": ["myapp.tasks.every_tick"],
"hourly": ["myapp.tasks.hourly_task"],
"daily": ["myapp.tasks.daily_task"],
"weekly": ["myapp.tasks.weekly_task"],
"monthly": ["myapp.tasks.monthly_task"],
"daily_long": ["myapp.tasks.heavy_daily"], # Long queue
"cron": {
"0 9 * * 1-5": ["myapp.tasks.weekday_9am"],
"*/15 * * * *": ["myapp.tasks.every_15_min"]
}
}

After changing scheduler_events, apply the change with:

bench migrate

# Simple
frappe.enqueue("myapp.tasks.process", customer="CUST-001")
# With queue and timeout
frappe.enqueue(
"myapp.tasks.heavy_task",
queue="long",
timeout=3600,
param="value"
)
# With deduplication (v15)
from frappe.utils.background_jobs import is_job_enqueued
job_id = f"import::{doc.name}"
if not is_job_enqueued(job_id):
frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)

| Event | Frequency | Queue |
|---|---|---|
| `all` | Every tick (v14: 4min, v15: 60s) | default |
| `hourly` | Per hour | default |
| `daily` | Per day | default |
| `weekly` | Per week | default |
| `monthly` | Per month | default |
| `hourly_long` | Per hour | long |
| `daily_long` | Per day | long |
| `weekly_long` | Per week | long |
| `monthly_long` | Per month | long |
| `cron` | Custom schedule | configurable |

| Queue | Timeout | Usage |
|---|---|---|
| `short` | 300s (5 min) | Quick tasks, UI responses |
| `default` | 300s (5 min) | Standard tasks |
| `long` | 1500s (25 min) | Heavy processing, imports |

frappe.enqueue(
method, # REQUIRED: function or module path
queue="default", # Queue name
timeout=None, # Override timeout (seconds)
is_async=True, # False = execute directly
now=False, # True = via frappe.call()
job_id=None, # v15: unique ID for deduplication
enqueue_after_commit=False, # Wait for DB commit
at_front=False, # Place at front of queue
on_success=None, # Success callback
on_failure=None, # Failure callback
**kwargs # Arguments for method
)

from frappe.utils.background_jobs import is_job_enqueued
job_id = f"process::{doc.name}"
if not is_job_enqueued(job_id):
frappe.enqueue(
"myapp.tasks.process",
job_id=job_id,
doc_name=doc.name
)

# DO NOT USE - only for legacy code
from frappe.core.page.background_jobs.background_jobs import get_info
enqueued = [d.get("job_name") for d in get_info()]
if name not in enqueued:
frappe.enqueue(..., job_name=name)

def process_records(records):
for record in records:
try:
process_single(record)
frappe.db.commit() # Commit per success
except Exception:
frappe.db.rollback() # Rollback on error
frappe.log_error(
frappe.get_traceback(),
f"Process Error: {record}"
)

def on_success_handler(job, connection, result, *args, **kwargs):
frappe.publish_realtime("show_alert", {"message": "Done!"})
def on_failure_handler(job, connection, type, value, traceback):
frappe.log_error(f"Job {job.id} failed: {value}")
frappe.enqueue(
"myapp.tasks.risky_task",
on_success=on_success_handler,
on_failure=on_failure_handler
)

def scheduled_task():
# Scheduled jobs run as Administrator (frappe.session.user == "Administrator"),
# so set an explicit owner when the document should belong to someone else:
doc = frappe.new_doc("ToDo")
doc.owner = "user@example.com"
doc.insert(ignore_permissions=True)

| Tool | Description |
|---|---|
| RQ Worker (DocType) | Worker status, busy/idle |
| RQ Job (DocType) | Job status, queue filter |
| `bench doctor` | Scheduler status overview |
| Scheduled Job Log | Execution history |
| Feature | v14 | v15 |
|---|---|---|
| Tick interval | 4 min | 60 sec |
| Config key | `scheduler_interval` | `scheduler_tick_interval` |
| Deduplication | `job_name` (legacy) | `job_id` + `is_job_enqueued()` |
Run `bench migrate` after every change to `scheduler_events`; on v15+ use `job_id` together with `is_job_enqueued()` for deduplication.