⚡ Supercharge your async applications with tasks so fast, you'll think you're bending time itself. ⚡
Documentation: https://asyncmq.dymmond.com 📚
Source Code: https://github.com/dymmond/asyncmq
The officially supported version is always the latest release.
AsyncMQ is an asynchronous Python job queue focused on asyncio/anyio workloads.
It gives you:
- task registration via `@task`
- queue and worker runtime APIs
- delayed jobs, retries/backoff, TTL expiration, and dead-letter routing
- multiple backends (Redis, Postgres, MongoDB, RabbitMQ, in-memory)
- a CLI (`asyncmq`) and a built-in dashboard app
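To make the retry, backoff, TTL, and dead-letter ideas concrete, here is a minimal sketch in plain asyncio. It does not use AsyncMQ's API: `Job`, `backoff_delay`, `run_with_policy`, and the list used as a dead-letter queue are hypothetical names for illustration, and sending expired jobs to the same dead-letter list is an assumption of the sketch, not a statement about how AsyncMQ behaves.

```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class Job:
    """Hypothetical job record; not AsyncMQ's own job class."""
    name: str
    max_retries: int = 2      # retries allowed after the first attempt
    ttl: float = 300.0        # seconds the job stays valid after creation
    created_at: float = field(default_factory=time.monotonic)
    attempts: int = 0


def backoff_delay(attempt: int, base: float = 0.01, cap: float = 1.0) -> float:
    """Exponential backoff: base, 2*base, 4*base, ... capped at `cap`."""
    return min(cap, base * (2 ** (attempt - 1)))


async def run_with_policy(job: Job, handler, dead_letter: list) -> str:
    """Run `handler` until it succeeds, the job expires, or retries run out."""
    while True:
        # TTL check: a job older than its TTL is never (re)executed.
        if time.monotonic() - job.created_at > job.ttl:
            dead_letter.append(job)
            return "expired"
        job.attempts += 1
        try:
            await handler(job)
            return "completed"
        except Exception:
            # Out of retries: park the job for inspection instead of dropping it.
            if job.attempts > job.max_retries:
                dead_letter.append(job)
                return "failed"
            # Otherwise wait a little longer before each new attempt.
            await asyncio.sleep(backoff_delay(job.attempts))
```

With `max_retries=2`, a handler that fails twice and then succeeds ends as `"completed"` on its third attempt, while a handler that always fails ends in the dead-letter list after three attempts.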
AsyncMQ is:
- a library-first queue/worker runtime you embed in Python apps
- backend-pluggable through a shared `BaseBackend` contract
- suitable for both local development and production deployments
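The idea of a shared backend contract can be sketched with an abstract base class. The names below (`ToyBackend`, `ToyMemoryBackend`, and the two-method interface) are hypothetical and much smaller than a real backend would need; they are not the `BaseBackend` interface itself, only an illustration of why code written against a contract does not care which storage sits behind it.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict, deque
from typing import Optional


class ToyBackend(ABC):
    """Hypothetical contract: every backend offers the same two coroutines."""

    @abstractmethod
    async def enqueue(self, queue: str, payload: dict) -> None:
        """Store a job payload on the named queue."""

    @abstractmethod
    async def dequeue(self, queue: str) -> Optional[dict]:
        """Return the oldest payload on the queue, or None if it is empty."""


class ToyMemoryBackend(ToyBackend):
    """In-process implementation; other implementations could use a database."""

    def __init__(self) -> None:
        self._queues = defaultdict(deque)

    async def enqueue(self, queue: str, payload: dict) -> None:
        self._queues[queue].append(payload)

    async def dequeue(self, queue: str) -> Optional[dict]:
        pending = self._queues[queue]
        return pending.popleft() if pending else None


async def produce_and_consume(backend: ToyBackend) -> list:
    """Caller code depends only on the contract, not on a concrete backend."""
    await backend.enqueue("emails", {"to": "alice@example.com"})
    await backend.enqueue("emails", {"to": "bob@example.com"})
    delivered = []
    while (job := await backend.dequeue("emails")) is not None:
        delivered.append(job["to"])
    return delivered
```

Swapping in a different `ToyBackend` subclass would leave `produce_and_consume` unchanged, which is the property a pluggable backend design is after.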
AsyncMQ is not:
- a hosted queue service
- a guaranteed exactly-once execution system
- a replacement for domain-level idempotency in your task code
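Because a job may run more than once (for example after a retry), task code should tolerate repeats. A minimal sketch of domain-level idempotency follows, in plain asyncio; `send_welcome_once`, the `idempotency_key` parameter, and the in-memory `processed_keys` set are hypothetical, and a real application would keep the keys in a durable store and make the check-and-record step atomic.

```python
import asyncio

# Stand-ins for durable state; both are assumptions of this sketch.
processed_keys: set = set()
sent: list = []


async def send_welcome_once(email: str, idempotency_key: str) -> bool:
    """Perform the side effect at most once per idempotency key.

    Returns True if the email was sent now, False if this key was already
    handled by an earlier execution of the same job.
    """
    if idempotency_key in processed_keys:
        return False          # duplicate delivery: skip the side effect
    sent.append(email)        # the side effect (stand-in for a real send)
    processed_keys.add(idempotency_key)
    return True
```

Running the handler twice with the same key sends one email, so a retried or redelivered job does not produce a duplicate.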
At runtime, AsyncMQ has four main layers:
- Task registration: `@task(queue=...)` stores handlers in `TASK_REGISTRY` and adds `.enqueue()` helpers.
- Queue API: `Queue` wraps backend operations (`enqueue`, `pause`, `list_jobs`, delayed/repeatable APIs).
- Worker runtime: `process_job`/`handle_job` run tasks, manage state transitions, retries, and acknowledgements.
- Backend and store: concrete backends persist job state and queue metadata.
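The layering above can be pictured with a toy model in plain asyncio. This is a simplified illustration, not AsyncMQ's implementation: `REGISTRY`, `toy_task`, `toy_worker`, the status strings, and the plain dict standing in for a backend are all hypothetical.

```python
import asyncio
from collections import deque

REGISTRY = {}  # layer 1: task name -> handler coroutine function


def toy_task(queue: str):
    """Register a handler and attach an .enqueue() helper to it."""
    def decorator(func):
        REGISTRY[func.__name__] = func

        async def enqueue(backend: dict, *args):
            # Layers 2 and 4: the queue operation writes a job record into
            # the backend (here just a dict of deques, one per queue).
            job = {"task": func.__name__, "args": args, "status": "waiting"}
            backend.setdefault(queue, deque()).append(job)
            return job

        func.enqueue = enqueue
        return func
    return decorator


async def toy_worker(backend: dict, queue: str) -> list:
    """Layer 3: drain a queue, run handlers, and record state transitions."""
    finished = []
    pending = backend.get(queue, deque())
    while pending:
        job = pending.popleft()
        job["status"] = "active"
        try:
            await REGISTRY[job["task"]](*job["args"])
            job["status"] = "completed"
        except Exception:
            job["status"] = "failed"
        finished.append(job)
    return finished


@toy_task(queue="emails")
async def send_welcome(email: str) -> str:
    return f"sent welcome email to {email}"
```

Enqueuing through `send_welcome.enqueue(backend, "alice@example.com")` and then awaiting `toy_worker(backend, "emails")` moves the job record from waiting through active to completed, which mirrors the division of responsibilities between the layers.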
For an end-to-end walkthrough, start with Core Concepts.
Use the in-memory backend first so you can run without Redis or Postgres.
Configure the backend in a settings class:

    # myapp/settings.py
    from asyncmq.backends.memory import InMemoryBackend
    from asyncmq.conf.global_settings import Settings


    class AppSettings(Settings):
        backend = InMemoryBackend()

Point AsyncMQ at that settings class:

    export ASYNCMQ_SETTINGS_MODULE=myapp.settings.AppSettings

Define a task:

    # myapp/tasks.py
    from asyncmq.tasks import task


    @task(queue="emails", retries=2, ttl=300)
    async def send_welcome(email: str) -> None:
        print(f"sent welcome email to {email}")

Enqueue a job from a producer script:

    # producer.py
    import anyio

    from asyncmq.queues import Queue
    from myapp.tasks import send_welcome


    async def main() -> None:
        queue = Queue("emails")
        job_id = await send_welcome.enqueue("alice@example.com", backend=queue.backend)
        print("enqueued", job_id)


    anyio.run(main)

Start a worker for the "emails" queue:

    asyncmq worker start emails --concurrency 1