Skip to main content

Background Tasks

Overview

SpeedPy uses Celery with Redis as the message broker for background task processing. Redbeat is used as the scheduler for periodic tasks.

Architecture

The Docker Compose setup includes three Celery-related services:

  • redis — Message broker and result backend
  • celery — Worker that processes tasks
  • celery-beat — Scheduler that triggers periodic tasks
celery:
command: celery -A project.celeryapp:app worker -Q default --loglevel=DEBUG --concurrency=1

celery-beat:
command: celery -A project.celeryapp:app beat -S redbeat.RedBeatScheduler --loglevel=DEBUG

Configuration

Celery is configured in project/celeryapp.py:

app = Celery("project")
app.autodiscover_tasks()
app.conf.broker_url = env("REDIS_URL", default=None)
app.conf.result_backend = env("REDIS_URL", default=None)
app.conf.accept_content = ["application/json"]
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.task_default_queue = "default"
app.conf.worker_prefetch_multiplier = 1
app.conf.broker_pool_limit = 1

Key settings:

  • JSON serialization for tasks and results
  • Single queue (default) — add more as needed
  • Prefetch multiplier of 1 — appropriate for longer-running tasks
  • Structlog integration via DjangoStructLogInitStep

Writing Tasks

Place tasks in mainapp/tasks/ (one file per domain). Tasks are auto-discovered by Celery.

# mainapp/tasks/my_feature.py
from celery import shared_task
import structlog

logger = structlog.get_logger(__name__)

@shared_task(name="process_something")
def process_something(item_id):
logger.info("processing_item", item_id=item_id)
# your logic here
return f"Processed {item_id}"

Make sure to import your tasks module in mainapp/tasks/__init__.py:

from .my_feature import *

Calling Tasks

from mainapp.tasks import process_something

# Async (queued)
process_something.delay(item_id=123)

# With options
process_something.apply_async(
args=[123],
queue="default",
countdown=60, # delay 60 seconds
)

Periodic Tasks (Beat Schedule)

Define periodic tasks in project/celeryapp.py:

from celery.schedules import crontab

app.conf.beat_schedule = {
"expire-team-memberships": {
"task": "expire_team_memberships",
"schedule": crontab(hour=2, minute=0), # Daily at 2:00 AM
"options": {
"ignore_result": True,
"queue": "default",
},
},
}

Schedule examples:

  • crontab(minute=0, hour=0) — midnight daily
  • crontab(minute=0, hour='*/2') — every 2 hours
  • crontab(hour=0, minute=0, day_of_month=1) — first of every month
  • 60 * 10 — every 10 minutes (in seconds)

Included Tasks

SpeedPy ships with these periodic tasks:

TaskSchedulePurpose
expire_team_membershipsDaily 2:00 AMDeletes memberships past access_expires_at
expire_team_memberships_invitationsDaily 2:30 AMDeletes expired pending invitations