Loading...
Loading...
Expert Django and Celery guidance for asynchronous task processing. Use when designing background tasks, configuring workers, handling retries and errors, optimizing task performance, implementing periodic tasks, or setting up production monitoring. Follows Celery best practices with Django integration patterns.
npx skill4agent add vintasoftware/django-ai-plugins django-celery-expert

References:
- references/django-integration.md
- references/task-design-patterns.md
- references/configuration-guide.md
- references/error-handling.md
- references/periodic-tasks.md
- references/monitoring-observability.md
- references/production-deployment.md

"Send welcome emails in the background after user registration"
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
    """Email a welcome message to the user with the given primary key.

    If no such user exists the task ends quietly without sending anything.
    Any other failure schedules a retry (at most ``max_retries=3``) with a
    delay that doubles each attempt: 60s, 120s, 240s.
    """
    from users.models import User

    try:
        recipient = User.objects.get(id=user_id)
        greeting = f"Hi {recipient.name}, welcome to our platform!"
        send_mail(
            subject="Welcome!",
            message=greeting,
            from_email="noreply@example.com",
            recipient_list=[recipient.email],
        )
    except User.DoesNotExist:
        # User deleted, skip
        return None
    except Exception as exc:
        # Exponential backoff keyed on how many retries have already run.
        backoff_seconds = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=backoff_seconds)
# views.py
def register(request):
    """Create a user, queue the welcome email, and redirect to the dashboard.

    The task is enqueued through ``transaction.on_commit`` so a worker can
    never pick it up before the new user row is visible in the database.
    Without this, a request wrapped in a transaction (for example with
    ``ATOMIC_REQUESTS``) could race the worker, which would then hit
    ``User.DoesNotExist`` and skip the email. When no transaction is open,
    Django runs the callback immediately, so behavior is otherwise unchanged.
    """
    from django.db import transaction

    user = User.objects.create(...)
    # Fire and forget, but only once the user row is committed.
    transaction.on_commit(lambda: send_welcome_email.delay(user.id))
    return redirect('dashboard')


# "Process a large CSV import with progress updates"
@shared_task(bind=True)
def import_csv(self, file_path, total_rows):
    """Import each row of a CSV file as a ``Record``, publishing progress.

    Args:
        file_path: Path of the CSV file. Each data row is passed to
            ``Record.objects.create`` as keyword arguments keyed by the
            header row.
        total_rows: Expected number of rows; echoed as ``total`` in the
            progress metadata.

    Returns:
        ``{'status': 'complete', 'processed': n}`` where ``n`` is the number
        of rows actually imported (not the caller-supplied estimate).
    """
    import csv

    from myapp.models import Record

    processed = 0
    # newline='' hands line-ending handling to the csv module, which is
    # required for quoted fields that contain embedded newlines.
    with open(file_path, newline='') as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            Record.objects.create(**row)
            processed = i + 1
            # Publish state every 100 rows to keep result-backend writes cheap.
            if i % 100 == 0:
                self.update_state(
                    state='PROGRESS',
                    meta={'current': i, 'total': total_rows}
                )
    return {'status': 'complete', 'processed': processed}
# Check progress
result = import_csv.AsyncResult(task_id)
if result.state == 'PROGRESS':
    # `or 1` covers both a missing total and a reported total of 0, so the
    # ratio can never raise ZeroDivisionError.
    progress = result.info.get('current', 0) / (result.info.get('total') or 1)


# "Process an order: validate inventory, charge payment, then send confirmation"
from celery import chain
@shared_task
def validate_inventory(order_id):
    """Confirm that the order's items are in stock.

    Returns ``order_id`` unchanged when the order is valid.

    Raises:
        ValueError: if ``items_in_stock()`` reports the order cannot be filled.
    """
    candidate = Order.objects.get(id=order_id)
    in_stock = candidate.items_in_stock()
    if in_stock:
        return order_id
    raise ValueError("Items out of stock")
@shared_task
def charge_payment(order_id):
    """Charge the order with the given id and return that same id."""
    Order.objects.get(id=order_id).charge()
    return order_id
@shared_task
def send_confirmation(order_id):
    """Send the confirmation email for the order with the given id."""
    Order.objects.get(id=order_id).send_confirmation_email()
def process_order(order_id):
    """Queue the order pipeline: validate inventory, charge, then confirm.

    Only the first step is given ``order_id`` explicitly; the later
    signatures are left without arguments and linked through ``chain``.
    The workflow is dispatched asynchronously and nothing is returned.
    """
    steps = (
        validate_inventory.s(order_id),
        charge_payment.s(),
        send_confirmation.s(),
    )
    chain(*steps).delay()


# django-celery-beat, django-celery-results, CELERY_, @shared_task