# Background Tasks

## Overview
SpeedPy uses Celery with Redis as the message broker for background task processing; RedBeat provides the scheduler for periodic tasks.
## Architecture
The Docker Compose setup includes three Celery-related services:
- `redis` — Message broker and result backend
- `celery` — Worker that processes tasks
- `celery-beat` — Scheduler that triggers periodic tasks
```yaml
celery:
  command: celery -A project.celeryapp:app worker -Q default --loglevel=DEBUG --concurrency=1

celery-beat:
  command: celery -A project.celeryapp:app beat -S redbeat.RedBeatScheduler --loglevel=DEBUG
```
## Configuration

Celery is configured in `project/celeryapp.py`:

```python
app = Celery("project")
app.autodiscover_tasks()

app.conf.broker_url = env("REDIS_URL", default=None)
app.conf.result_backend = env("REDIS_URL", default=None)
app.conf.accept_content = ["application/json"]
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.task_default_queue = "default"
app.conf.worker_prefetch_multiplier = 1
app.conf.broker_pool_limit = 1
```
Key settings:

- JSON serialization for tasks and results
- Single queue (`default`) — add more as needed
- Prefetch multiplier of 1 — appropriate for longer-running tasks
- Structlog integration via `DjangoStructLogInitStep`
## Writing Tasks

Place tasks in `mainapp/tasks/` (one file per domain). Tasks are auto-discovered by Celery.
```python
# mainapp/tasks/my_feature.py
from celery import shared_task
import structlog

logger = structlog.get_logger(__name__)


@shared_task(name="process_something")
def process_something(item_id):
    logger.info("processing_item", item_id=item_id)
    # your logic here
    return f"Processed {item_id}"
```
Make sure to import your tasks module in `mainapp/tasks/__init__.py`:

```python
from .my_feature import *
```
## Calling Tasks

```python
from mainapp.tasks import process_something

# Async (queued)
process_something.delay(item_id=123)

# With options
process_something.apply_async(
    args=[123],
    queue="default",
    countdown=60,  # delay 60 seconds
)
```
## Periodic Tasks (Beat Schedule)

Define periodic tasks in `project/celeryapp.py`:

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "expire-team-memberships": {
        "task": "expire_team_memberships",
        "schedule": crontab(hour=2, minute=0),  # Daily at 2:00 AM
        "options": {
            "ignore_result": True,
            "queue": "default",
        },
    },
}
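Because the scheduler is RedBeat, schedule entries can also be created at runtime and stored in Redis instead of being hard-coded in `beat_schedule`. A sketch based on RedBeat's documented entry API — it needs a running Redis, and the `nightly-report` entry name is illustrative:

```python
from celery.schedules import crontab
from redbeat import RedBeatSchedulerEntry

from project.celeryapp import app

entry = RedBeatSchedulerEntry(
    "nightly-report",      # illustrative entry name
    "process_something",   # name of a task registered with Celery
    crontab(hour=3, minute=0),
    app=app,
)
entry.save()  # persists the entry in Redis; celery-beat picks it up
```

`entry.delete()` removes the entry again, so schedules can be managed entirely from application code.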
Schedule examples:

- `crontab(minute=0, hour=0)` — midnight daily
- `crontab(minute=0, hour='*/2')` — every 2 hours
- `crontab(hour=0, minute=0, day_of_month=1)` — first of every month
- `60 * 10` — every 10 minutes (in seconds)
## Included Tasks

SpeedPy ships with these periodic tasks:
| Task | Schedule | Purpose |
|---|---|---|
| `expire_team_memberships` | Daily 2:00 AM | Deletes memberships past `access_expires_at` |
| `expire_team_memberships_invitations` | Daily 2:30 AM | Deletes expired pending invitations |