A production-ready implementation of the Outbox Pattern using FastAPI, PostgreSQL, and pgqueuer - a powerful PostgreSQL-based job queue library.
This implementation leverages pgqueuer, an excellent library by janbjorge that provides:
- PostgreSQL-native job queuing with LISTEN/NOTIFY
- Automatic retry mechanisms with exponential backoff
- Horizontal worker scaling with `FOR UPDATE SKIP LOCKED`
- Built-in dead letter queue handling
- Crash recovery via heartbeat system
Our contribution is demonstrating how to use pgqueuer to implement the Outbox Pattern for reliable, transactional event processing.
When building distributed systems, you often need to:
- Update your database (e.g., create a user)
- Send external messages (e.g., audit events, notifications, webhooks)
# This is problematic!
async def create_user_naive(user_data):
    # Write 1: Save to database
    user = await db.save_user(user_data)

    # Write 2: Send external message
    await audit_api.send_event(user_created_event)

    return user
What can go wrong:
- Database succeeds, API fails → User created but no audit trail
- API succeeds, database fails → Audit event sent for non-existent user
- Partial failures during retries → Duplicate audit events
- Network timeouts → Uncertain state, manual cleanup needed
The Outbox Pattern eliminates dual-write problems by using a single transaction:
# This is reliable!
async def create_user_outbox(user_data):
    async with database.transaction():
        # Single transaction contains both writes
        user = await db.save_user(user_data)         # Write 1
        await outbox.save_event(user_created_event)  # Write 2
        # Both succeed together, or both fail together

    # Separate process handles event delivery
    # (Worker picks up events and sends them reliably)
Benefits:
- Atomic operations - Both writes succeed or both fail
- Guaranteed delivery - Events stored in database, can't be lost
- Effectively exactly-once processing - Retries don't produce duplicates when handlers are idempotent
- Reliable recovery - Events survive crashes and network issues
- Decoupled systems - Database and external APIs don't affect each other
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   FastAPI   │────▶│  PostgreSQL  │────▶│   Worker    │
│             │     │              │     │             │
│ 1. Save User│     │ 2. Save Job  │     │ 3. Process  │
│             │     │   (Outbox)   │     │    Audit    │
└─────────────┘     └──────────────┘     └─────────────┘
Flow:
- API Request: Create user
- Database Transaction: Save user + audit job atomically
- Worker Notification: PostgreSQL NOTIFY triggers worker immediately
- Job Processing: Worker processes audit job with retries
- Publishing: Worker publishes to external systems (APIs, message brokers, webhooks)
- Failure Handling: Failed jobs moved to Dead Letter Queue (DLQ)
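In code, the API side of this flow boils down to a single transaction. The following is a minimal sketch, assuming an asyncpg connection and pgqueuer's `Queries` helper bound to that same connection; the handler name and payload fields are illustrative, not the repo's exact code:

```python
# Sketch: save the user and enqueue the outbox job atomically.
# `conn` is an asyncpg connection; `queries` is pgqueuer's Queries bound to
# the same connection, so the enqueue joins the surrounding transaction.
import json
from pgqueuer.queries import Queries

async def create_user(name: str, conn, queries: Queries) -> int:
    async with conn.transaction():
        user_id = await conn.fetchval(
            "INSERT INTO users (name) VALUES ($1) RETURNING id",
            name,
        )
        payload = json.dumps({"event_type": "user_created", "user_id": user_id}).encode()
        # Enqueued in the SAME transaction as the user insert - this is the outbox write
        await queries.enqueue(["audit_events"], [payload])
    return user_id
```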
Our code demonstrates:
- Atomic Transactions - User creation and job queuing in a single transaction
- Practical Outbox Pattern - Real-world example with FastAPI and PostgreSQL
- Custom Retry Logic - Publisher-specific retry strategies
- Comprehensive Logging - Enhanced job tracking and failure reporting
- Docker Setup - Ready-to-run development environment
- Complete Documentation - How to extend for different publishers
Powered by pgqueuer's features:
- Immediate Processing - PostgreSQL LISTEN/NOTIFY for instant job pickup
- Smart Retries - Exponential backoff with configurable attempts
- Crash Recovery - Jobs survive worker restarts via heartbeat mechanism
- Dead Letter Queue - Failed jobs tracked for manual investigation
- Horizontal Scaling - Multiple workers process jobs concurrently
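For orientation, a worker that consumes these jobs can be wired up roughly as below. This is a minimal sketch, not the repo's worker.py: the `QueueManager`/`AsyncpgDriver` names follow pgqueuer's documented API, while the DSN and handler body are placeholders.

```python
# worker.py (skeleton) - register an entrypoint and let pgqueuer drive it
import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager

async def create_pgq() -> QueueManager:
    connection = await asyncpg.connect("postgresql://fastapi_user:password@db/fastapi_db")
    pgq = QueueManager(AsyncpgDriver(connection))

    @pgq.entrypoint("audit_events")
    async def process_audit_event(job: Job) -> None:
        # Deliver job.payload to the external audit system here;
        # raising an exception marks the job for retry.
        ...

    return pgq
```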
git clone <this-repo>
cd OutboxPython
docker-compose up --build
curl -X POST http://localhost:8000/users/ \
-H "Content-Type: application/json" \
-d '{"name": "Alice"}'
You'll see the complete flow in the Docker logs.
fastapi_app | INFO: "POST /users/ HTTP/1.1" 200 OK
pgqueuer_worker | INFO - Processing audit job: {'event_type': 'user_created', 'user_id': 27, 'user_name': 'test', 'timestamp': '27'} (job_id: 23)
pgqueuer_worker | INFO - Mock Audit API success for user_id: 27
pgqueuer_worker | INFO - Successfully processed audit for user_id: 27 (job_id: 23)
fastapi_app | INFO: "POST /users/ HTTP/1.1" 200 OK
pgqueuer_worker | INFO - Processing audit job: {'event_type': 'user_created', 'user_id': 29, 'user_name': 'test', 'timestamp': '29'} (job_id: 25)
pgqueuer_worker | ERROR - Mock Audit API failed for payload: {'event_type': 'user_created', 'user_id': 29, 'user_name': 'test', 'timestamp': '29'}
pgqueuer_worker | WARNING - Audit API failed for job_id: 25, will retry
pgqueuer_worker | ERROR - Failed to process audit job 25: Audit API call failed - Job will be retried
# 5 seconds later (exponential backoff)
pgqueuer_worker | INFO - Processing audit job: {'event_type': 'user_created', 'user_id': 29, 'user_name': 'test', 'timestamp': '29'} (job_id: 25)
pgqueuer_worker | ERROR - Mock Audit API failed for payload: {'event_type': 'user_created', 'user_id': 29, 'user_name': 'test', 'timestamp': '29'}
pgqueuer_worker | WARNING - Audit API failed for job_id: 25, will retry
# 10 seconds later
pgqueuer_worker | INFO - Processing audit job: {'event_type': 'user_created', 'user_id': 29, 'user_name': 'test', 'timestamp': '29'} (job_id: 25)
pgqueuer_worker | ERROR - Mock Audit API failed for payload: {'event_type': 'user_created', 'user_id': 29, 'user_name': 'test', 'timestamp': '29'}
pgqueuer_worker | WARNING - Audit API failed for job_id: 25, will retry
pgqueuer_worker | ERROR - FINAL FAILURE - Job 25 failed permanently after 3 attempts. Payload: {'event_type': 'user_created', 'user_id': 29, 'user_name': 'test', 'timestamp': '29'}. Job moved to DLQ/failed status.
pgqueuer_worker | ERROR - Exception while processing entrypoint/job-id: audit_events/25
pgqueuer_worker | pgqueuer.errors.MaxRetriesExceeded: 3
# worker.py
@pgq.entrypoint(
    "audit_events",
    retry_timer=timedelta(seconds=30),       # Wait 30s between retry cycles
    executor_factory=retry_executor_factory,
)

# Custom retry configuration:
max_attempts=3,                    # 3 attempts per cycle
initial_delay=5.0,                 # Start with 5s delay
max_delay=timedelta(seconds=60),   # Cap at 60s delay
max_time=timedelta(minutes=10),    # Give up after 10 minutes
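The `retry_executor_factory` referenced above has to be supplied by the worker. One plausible wiring is sketched below, assuming pgqueuer's retry-with-backoff executor; the class name and constructor signature are assumptions based on recent pgqueuer releases, so verify them against the version you have installed:

```python
# Sketch: build an executor factory that applies the retry settings above.
# RetryWithBackoffEntrypointExecutor and its keyword arguments are assumed
# from recent pgqueuer releases; adjust to your installed version.
from datetime import timedelta
from functools import partial

from pgqueuer.executors import RetryWithBackoffEntrypointExecutor

retry_executor_factory = partial(
    RetryWithBackoffEntrypointExecutor,
    max_attempts=3,                   # 3 attempts per cycle
    initial_delay=5.0,                # Start with a 5s delay
    max_delay=timedelta(seconds=60),  # Cap at 60s delay
    max_time=timedelta(minutes=10),   # Give up after 10 minutes
)
```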
# Scale to 3 workers
docker-compose up --scale worker=3
# worker.py - Fallback polling for missed notifications
await pgq.run(
    dequeue_timeout=timedelta(seconds=5),  # Poll every 5 seconds
    batch_size=5,                          # Process 5 jobs per batch
    max_concurrent_tasks=10,               # 10 concurrent jobs per worker
)
The system creates several tables:
- `users` - Your application data
- `pgqueuer` - Active job queue
- `pgqueuer_statistics` - Completed/failed jobs (DLQ)
- `pgqueuer_log` - Job processing history
# View active jobs
docker exec postgres_db psql -U fastapi_user -d fastapi_db -c "SELECT id, entrypoint, status, created FROM pgqueuer ORDER BY created DESC LIMIT 10;"
# View failed jobs (DLQ)
docker exec postgres_db psql -U fastapi_user -d fastapi_db -c "SELECT id, entrypoint, status, payload FROM pgqueuer_statistics WHERE status = 'failed';"
# Count jobs by status
docker exec postgres_db psql -U fastapi_user -d fastapi_db -c "SELECT status, COUNT(*) FROM pgqueuer GROUP BY status;"
This pattern is perfect for:
- User registration → Audit trail, compliance reporting
- Payment processing → Fraud detection, notifications
- Account updates → Risk assessment, customer alerts
- Order creation → Inventory updates, email confirmations
- User signup → Welcome emails, analytics events
- Product updates → Search index, recommendation engine
- User actions → Analytics, billing events
- Data changes → Webhooks, integrations
- System events → Monitoring, alerting
- Data access → Audit logs, security monitoring
- Configuration changes → Change tracking, approvals
- User permissions → Access reviews, compliance reports
- Track job processing rates and failures
- Monitor DLQ for persistent failures
- Set up alerts for worker health
- Scale workers based on queue depth (see the sketch after this list)
- Use different worker pools for different job types
- Consider regional deployment for global applications
- Encrypt sensitive job payloads
- Use proper database permissions
- Secure inter-service communication
- Tune batch sizes based on job complexity
- Monitor database connection pools
- Consider job payload size limits
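As a starting point for the queue-depth monitoring mentioned above, a small helper like the following can feed a metrics or health endpoint. It is a sketch that assumes the `pgqueuer` table and its `status` column from the schema section; status values may differ across pgqueuer versions:

```python
# Sketch: count jobs still waiting in the queue (for alerting/auto-scaling)
import asyncpg

async def queue_depth(dsn: str) -> int:
    conn = await asyncpg.connect(dsn)
    try:
        # 'queued' is the typical pgqueuer status for not-yet-picked jobs
        return await conn.fetchval(
            "SELECT COUNT(*) FROM pgqueuer WHERE status = 'queued'"
        )
    finally:
        await conn.close()
```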
- Simpler setup - No additional infrastructure (thanks to the PostgreSQL-native approach)
- ACID transactions - Built-in consistency guarantees with your existing database
- Easy debugging - Jobs visible in database using standard SQL
- Familiar tooling - Standard PostgreSQL operations and monitoring
- No vendor lock-in - Pure PostgreSQL solution
- Lower latency - No network hops to external services
- Cost effective - No per-message pricing
- Better observability - Direct database access to job status
- Transactional safety - Jobs and data in same PostgreSQL transaction
- Built-in persistence - No separate Redis/broker needed
- Automatic retries - pgqueuer's sophisticated retry mechanisms
- Worker recovery - Jobs survive crashes automatically via heartbeat system
The worker currently demonstrates API-based publishing but can be easily extended to support multiple publishing mechanisms:
# worker.py - Current approach
async def mock_audit_api(payload: dict) -> bool:
    """Simulate publishing to external audit service via HTTP API"""
    await asyncio.sleep(0.1)        # API call simulation
    return random.random() > 0.3    # 70% success rate
pgqueuer's `@pgq.entrypoint` pattern makes it easy to add different publishing methods:
@pgq.entrypoint("kafka_events")
async def publish_to_kafka(job: Job) -> None:
"""Publish events to Kafka"""
payload = json.loads(job.payload.decode())
producer = AIOKafkaProducer(bootstrap_servers='kafka:9092')
await producer.start()
try:
await producer.send_and_wait(
topic='user_events',
value=json.dumps(payload).encode()
)
logger.info(f"Published to Kafka: {payload}")
finally:
await producer.stop()
@pgq.entrypoint("rabbitmq_notifications")
async def publish_to_rabbitmq(job: Job) -> None:
"""Publish notifications to RabbitMQ"""
payload = json.loads(job.payload.decode())
connection = await aio_pika.connect_robust("amqp://rabbitmq:5672/")
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(json.dumps(payload).encode()),
routing_key="user.notifications"
)
await connection.close()
@pgq.entrypoint("sqs_events")
async def publish_to_sqs(job: Job) -> None:
"""Publish to AWS SQS"""
import boto3
payload = json.loads(job.payload.decode())
sqs = boto3.client('sqs')
sqs.send_message(
QueueUrl='https://sqs.region.amazonaws.com/account/queue',
MessageBody=json.dumps(payload)
)
@pgq.entrypoint("pubsub_events")
async def publish_to_pubsub(job: Job) -> None:
"""Publish to Google Pub/Sub"""
from google.cloud import pubsub_v1
payload = json.loads(job.payload.decode())
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('project-id', 'user-events')
future = publisher.publish(topic_path, json.dumps(payload).encode())
message_id = future.result()
@pgq.entrypoint("webhook_delivery")
async def deliver_webhook(job: Job) -> None:
"""Deliver webhook to external systems"""
import httpx
payload = json.loads(job.payload.decode())
webhook_config = payload['webhook_config']
async with httpx.AsyncClient() as client:
response = await client.post(
webhook_config['url'],
json=payload['data'],
headers=webhook_config.get('headers', {}),
timeout=30.0
)
response.raise_for_status()
@pgq.entrypoint("email_notifications")
async def send_email(job: Job) -> None:
"""Send email notifications"""
payload = json.loads(job.payload.decode())
# Using SendGrid, SES, or SMTP
await email_service.send(
to=payload['recipient'],
subject=payload['subject'],
body=payload['body']
)
You can route different events to different publishers:
# In your FastAPI app
async def create_user(user: UserCreate):
    async with conn.transaction():
        user_id = await conn.fetchval(
            "INSERT INTO users (name) VALUES ($1) RETURNING id",
            user.name
        )

        # Route to different publishers based on event type
        await queries.enqueue(["audit_events"], [audit_payload])         # → HTTP API
        await queries.enqueue(["kafka_events"], [analytics_payload])     # → Kafka
        await queries.enqueue(["webhook_delivery"], [webhook_payload])   # → Webhooks
        await queries.enqueue(["email_notifications"], [email_payload])  # → Email
Each publisher can have different retry behavior:
# HTTP APIs: Fast retries, short timeout
@pgq.entrypoint("api_calls",
retry_timer=timedelta(seconds=10),
executor_factory=fast_retry_executor
)
# Message brokers: Medium retries, connection handling
@pgq.entrypoint("kafka_events",
retry_timer=timedelta(seconds=30),
executor_factory=broker_retry_executor
)
# Email: Slow retries, higher tolerance
@pgq.entrypoint("email_notifications",
retry_timer=timedelta(minutes=5),
executor_factory=email_retry_executor
)
- Unified Retry Logic - Same retry/DLQ behavior for all publishers
- Publisher Isolation - API failures don't affect message broker delivery
- Consistent Monitoring - All publishing tracked in same database
- Easy Testing - Mock individual publishers independently
- Selective Processing - Scale different publisher types separately
- Add publisher libraries to `pyproject.toml`
- Create new entrypoint functions for each publisher type
- Update job creation to route to appropriate publishers
- Configure retry strategies per publisher needs
- Scale workers based on publisher requirements
The beauty of this approach is that your core business logic remains unchanged - you simply add new entrypoint handlers for different publishing destinations, and pgqueuer handles all the reliability, retry, and scaling concerns automatically.