Event-Driven Architecture
Use domain events to decouple modules in your modular monolith. Modules publish events when something significant happens, and other modules subscribe to react.
Overview
In a modular monolith, modules need to communicate without creating tight dependencies. Direct imports between modules lead to tangled dependency graphs that make the codebase hard to maintain.
The solution: modules publish events when something happens, and other modules subscribe to react. This provides:
Decoupling: Publishers don’t know about subscribers
Testability: Modules can be tested in isolation
Scalability path: The same event contracts work with in-memory or external brokers
The Event Bus
At the heart of the system is a simple in-memory event bus that routes events to registered handlers.
# {project_slug}/domain_events/bus.py
from collections import defaultdict


class EventBus:
    """A simple in-memory pub-sub mechanism for domain events."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        """Register a handler for a given event type."""
        self._subscribers[event_type].append(handler)

    def publish(self, event):
        """Publish an event to all registered handlers."""
        event_type = type(event)
        for handler in self._subscribers.get(event_type, []):
            handler(event)


# Module-level singleton
event_bus = EventBus()
The event bus is instantiated once at module load time. All modules import the same event_bus singleton, ensuring a single registry of subscribers across the application.
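A minimal usage sketch of the bus above follows. The `UserRegistered` event and the two lambda handlers are hypothetical names used only for illustration, and the `EventBus` class is repeated so the snippet runs on its own. It shows two properties of this implementation: handlers run synchronously inside `publish()`, and dispatch uses the exact class of the event, so a handler subscribed to a base class is not called for its subclasses.

```python
from collections import defaultdict


class EventBus:
    """Same in-memory bus as above, repeated so this sketch is self-contained."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    def publish(self, event):
        # Dispatch on the exact class of the event.
        for handler in self._subscribers.get(type(event), []):
            handler(event)


class DomainEvent:
    """Hypothetical marker base class for this sketch."""


class UserRegistered(DomainEvent):
    """Hypothetical event used only for illustration."""

    def __init__(self, user_id):
        self.user_id = user_id


bus = EventBus()
welcomed = []
seen_by_base = []

bus.subscribe(UserRegistered, lambda event: welcomed.append(event.user_id))
bus.subscribe(DomainEvent, lambda event: seen_by_base.append(event))

bus.publish(UserRegistered(user_id=7))

# The handler has already run by the time publish() returns.
print(welcomed)      # [7]
# The DomainEvent subscriber was not called: lookup is by type(event).
print(seen_by_base)  # []
```

If a handler should receive every event, it has to be subscribed to each concrete event class with this bus.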
Defining Events
Events are simple data-holding classes that represent something that happened in your domain. They inherit from a marker base class:
# {project_slug}/domain_events/base.py
from abc import ABC


class DomainEvent(ABC):
    """Base class for all domain events."""

    pass
Concrete events store the data needed by handlers:
# {project_slug}/domain_events/events.py
from {project_slug}.domain_events.base import DomainEvent


class OrderPlacedEvent(DomainEvent):
    """Emitted when a new order is placed."""

    def __init__(
        self,
        order_uuid: str,
        user_id: int,
        product_dict: list[dict],
        fulfillment_state: str,
    ):
        self.order_uuid = order_uuid
        self.user_id = user_id
        self.product_dict = product_dict
        self.fulfillment_state = fulfillment_state


class PrescriptionRequestApprovedEvent(DomainEvent):
    """Emitted when a provider approves a prescription request."""

    def __init__(
        self,
        prescription_request_id: int,
        prescription_id: int,
        encounter_id: int,
        order_uuid: str,
        order_item_uuid: str,
    ):
        self.prescription_request_id = prescription_request_id
        self.prescription_id = prescription_id
        self.encounter_id = encounter_id
        self.order_uuid = order_uuid
        self.order_item_uuid = order_item_uuid
Naming conventions:
Use past tense: OrderPlacedEvent, not PlaceOrderEvent
Include the domain context: PrescriptionRequestApprovedEvent
Be specific about what happened
What data to include:
IDs and UUIDs needed to look up related entities
Key state that handlers need without additional queries
Avoid including full model instances (they may be stale)
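One way to keep events limited to plain data is a frozen dataclass. This is a sketch of an alternative to the hand-written `__init__` methods above, not what the guide's code uses. The field names mirror part of `OrderPlacedEvent`, and the `DomainEvent` base is redefined here (as a plain class) so the snippet runs on its own.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass


class DomainEvent:
    """Stand-in for the marker base class, repeated for self-containment."""


@dataclass(frozen=True)
class OrderPlacedEvent(DomainEvent):
    """Emitted when a new order is placed (dataclass variant, illustrative)."""

    order_uuid: str
    user_id: int
    fulfillment_state: str


event = OrderPlacedEvent(
    order_uuid="0b6f0c1e-0000-4000-8000-000000000001",
    user_id=42,
    fulfillment_state="pending",
)

# Handlers receive IDs and key state, not live model instances.
payload = asdict(event)

# frozen=True makes the event immutable after construction.
try:
    event.user_id = 99
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Immutability is a design choice here: an event describes something that already happened, so handlers have no reason to modify it.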
Registering Handlers
Handlers are registered during Django’s app initialization using AppConfig.ready(). This ensures registration happens after all apps are loaded, avoiding circular import issues.
# {project_slug}/orders/apps.py
from django.apps import AppConfig


class OrdersConfig(AppConfig):
    name = "{project_slug}.orders"
    verbose_name = "Orders"

    def ready(self) -> None:
        """Register event handlers when the app is ready."""
        # Lazy imports to avoid circular dependencies
        from {project_slug}.domain_events.bus import event_bus
        from {project_slug}.domain_events.events import (
            PrescriptionRequestApprovedEvent,
            PrescriptionRequestRejectedEvent,
            EncounterCompletedEvent,
        )
        from {project_slug}.orders.handlers import (
            handle_prescription_request_approved,
            handle_prescription_request_rejected,
            handle_encounter_completed,
        )

        # Register handlers
        event_bus.subscribe(
            PrescriptionRequestApprovedEvent,
            handle_prescription_request_approved,
        )
        event_bus.subscribe(
            PrescriptionRequestRejectedEvent,
            handle_prescription_request_rejected,
        )
        event_bus.subscribe(
            EncounterCompletedEvent,
            handle_encounter_completed,
        )
Key points:
Use lazy imports inside ready() to avoid circular dependencies
Handlers can be functions or class methods
Multiple handlers can subscribe to the same event type
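The key points above can be exercised without Django. The sketch below builds a private `EventBus` (rather than the shared singleton) and subscribes both a plain function and a bound method to the same event type. `OrderShipped`, `AuditLog`, and `notify` are hypothetical names. Using a fresh bus per test is one way to keep subscriptions from leaking between tests.

```python
from collections import defaultdict


class EventBus:
    """Same in-memory bus as earlier, repeated so this sketch is self-contained."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    def publish(self, event):
        for handler in self._subscribers.get(type(event), []):
            handler(event)


class OrderShipped:
    """Hypothetical event used only for illustration."""

    def __init__(self, order_uuid):
        self.order_uuid = order_uuid


class AuditLog:
    """Hypothetical subscriber whose bound method acts as a handler."""

    def __init__(self):
        self.entries = []

    def record(self, event):
        self.entries.append(("shipped", event.order_uuid))


notifications = []


def notify(event):
    """Plain-function handler."""
    notifications.append(event.order_uuid)


bus = EventBus()  # private instance, not the module-level singleton
audit = AuditLog()
bus.subscribe(OrderShipped, notify)
bus.subscribe(OrderShipped, audit.record)

bus.publish(OrderShipped(order_uuid="abc-123"))
```

Both handlers receive the same event object, in the order they were subscribed.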
Handler Implementation
Handlers receive the event and perform their logic:
# {project_slug}/orders/handlers.py
import logging

from django.db import transaction

from {project_slug}.domain_events.events import PrescriptionRequestApprovedEvent
from {project_slug}.orders.models import Order, OrderItem, OrderStatus

logger = logging.getLogger(__name__)


def handle_prescription_request_approved(event: PrescriptionRequestApprovedEvent) -> None:
    """Update order item status when a prescription is approved."""
    with transaction.atomic():
        try:
            order_item = OrderItem.objects.select_related("order").get(
                uuid=event.order_item_uuid
            )
            order_item.status = "clinically_approved"
            order_item.save()

            # Check if all items are approved
            order = order_item.order
            if order.all_items_approved():
                order.status = OrderStatus.READY_FOR_FULFILLMENT
                order.save()

                # Emit next event in the chain
                def _publish():
                    emit_order_ready_for_fulfillment(order)

                transaction.on_commit(_publish)
        except OrderItem.DoesNotExist:
            logger.exception(
                "OrderItem not found for uuid=%s",
                event.order_item_uuid,
            )
Publishing Events Safely
Always publish events after the transaction commits.
If you publish before commit and the transaction rolls back, handlers will process an event for data that doesn’t exist.
The solution is Django’s transaction.on_commit():
# {project_slug}/ehr/services/prescriptions.py
from django.db import transaction

from {project_slug}.domain_events.bus import event_bus
from {project_slug}.domain_events.events import PrescriptionRequestApprovedEvent

# Imports of Prescription, PrescriptionRequest, and create_erx_prescription
# are omitted here.


class PrescriptionService:
    @classmethod
    @transaction.atomic
    def approve_request(
        cls,
        request_id: int,
        provider: "Provider",
        sig: str,
        refills: int = 0,
    ) -> Prescription:
        """Approve a prescription request and create a prescription."""
        request = PrescriptionRequest.objects.select_related(
            "encounter", "product"
        ).get(id=request_id)

        # Validate and create the prescription
        request.validate_can_modify(provider)
        prescription = Prescription.objects.create(
            patient=request.encounter.patient,
            provider=provider,
            # ... other fields
        )
        request.mark_as_approved(prescription)

        # Queue async task for external service. Deferring it to on_commit
        # keeps the worker from running before the new row is visible.
        transaction.on_commit(
            lambda: create_erx_prescription.delay(prescription.id)
        )

        # CRITICAL: Publish event ONLY after transaction commits
        def _publish_event():
            event = PrescriptionRequestApprovedEvent(
                prescription_request_id=request.id,
                prescription_id=prescription.id,
                encounter_id=request.encounter.id,
                order_uuid=str(request.encounter.order_uuid),
                order_item_uuid=str(request.order_item_uuid),
            )
            event_bus.publish(event)

        transaction.on_commit(_publish_event)
        return prescription
The inner function _publish_event() is a closure over the local variables request and prescription, so they are still available when Django calls it after the commit.
What happens:
Database changes are made within the @transaction.atomic block
transaction.on_commit(_publish_event) registers the callback
When the transaction commits successfully, Django calls _publish_event()
The event is published and handlers execute
If the transaction rolls back, the callback is never called
Without this pattern, you risk:
Handlers processing events for rolled-back data
Race conditions where handlers query before data is visible
Inconsistent state across modules
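The commit and rollback behaviour described above can be illustrated without a database. The `FakeTransaction` class below is a toy stand-in written for this sketch, not Django's implementation. It only mimics the rule that callbacks registered with `on_commit()` run after the block exits cleanly and are discarded when it raises. The `approve()` function and its arithmetic are likewise made up for the example.

```python
class FakeTransaction:
    """Toy stand-in for transaction.atomic() plus transaction.on_commit()."""

    def __init__(self):
        self._callbacks = []

    def on_commit(self, callback):
        self._callbacks.append(callback)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        callbacks, self._callbacks = self._callbacks, []
        if exc_type is None:
            # "Commit" succeeded: run callbacks in registration order.
            for callback in callbacks:
                callback()
        # On error the callbacks are dropped; returning False re-raises.
        return False


published = []


def approve(request_id, fail=False):
    with FakeTransaction() as tx:
        prescription_id = request_id + 1000  # pretend this row was created

        def _publish_event():
            # Closes over request_id and prescription_id; runs only after
            # the with-block exits without an exception.
            published.append(("approved", request_id, prescription_id))

        tx.on_commit(_publish_event)
        if fail:
            raise RuntimeError("validation failed, transaction rolls back")
    return prescription_id


approve(1)  # commits, so the event is published

try:
    approve(2, fail=True)  # rolls back, so nothing is published
except RuntimeError:
    pass
```

After both calls, `published` holds only the event for request 1, which is the guarantee the on_commit pattern is meant to provide.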
Integrating with Celery
The event-driven architecture integrates naturally with Celery for async work.
Queuing Tasks from Services
For external service calls that shouldn’t block the request:
@classmethod
@transaction.atomic
def approve_request(cls, request_id: int, provider: "Provider", ...) -> Prescription:
    # ... create prescription ...

    # Queue async task after commit (runs independently of event handlers)
    transaction.on_commit(lambda: create_erx_prescription.delay(prescription.id))

    # Publish event for other modules
    def _publish_event():
        event = PrescriptionRequestApprovedEvent(...)
        event_bus.publish(event)

    transaction.on_commit(_publish_event)
    return prescription
Queuing Tasks from Event Handlers
Handlers can also queue Celery tasks for work that should happen asynchronously:
def handle_order_ready_for_fulfillment(event: OrderReadyForFulfillmentEvent) -> None:
    """Queue async task to process fulfillment."""
    process_fulfillment.delay(event.order_uuid)
When to Use Sync Events vs Async Tasks
Use synchronous events when:
The work is fast and doesn’t call external services
You need the result before responding to the request
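Failure should surface to the caller (with transaction.on_commit(), the publisher’s transaction has already committed, so a handler error does not roll it back)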
Use Celery tasks when:
Calling external APIs (payment, email, third-party services)
Processing that takes more than a few hundred milliseconds
Work that can be retried independently
You need to handle rate limits or backoff
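With the bus as written, handlers run one after another inside `publish()`, so an exception in one handler propagates to the publisher and later handlers are skipped. The sketch below demonstrates that behaviour and, as one possible alternative, a hypothetical `publish_isolated()` method that logs the failure and continues. Neither that method nor the `PaymentCaptured` event and its handlers come from the guide's codebase.

```python
import logging
from collections import defaultdict

logger = logging.getLogger(__name__)


class EventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    def publish(self, event):
        # Original behaviour: the first exception stops the loop.
        for handler in self._subscribers.get(type(event), []):
            handler(event)

    def publish_isolated(self, event):
        """Hypothetical variant: one failing handler does not stop the rest."""
        for handler in self._subscribers.get(type(event), []):
            try:
                handler(event)
            except Exception:
                logger.exception("Handler %r failed for %r", handler, event)


class PaymentCaptured:
    """Hypothetical event used only for illustration."""


calls = []


def failing_handler(event):
    calls.append("failing")
    raise ValueError("boom")


def second_handler(event):
    calls.append("second")


bus = EventBus()
bus.subscribe(PaymentCaptured, failing_handler)
bus.subscribe(PaymentCaptured, second_handler)

try:
    bus.publish(PaymentCaptured())
    propagated = False
except ValueError:
    propagated = True

strict_calls = list(calls)  # only the failing handler ran
calls.clear()

bus.publish_isolated(PaymentCaptured())
isolated_calls = list(calls)  # both handlers ran; the error was logged
```

Which behaviour is right depends on the handler: isolation suits independent side effects, while propagation suits work the caller must know has failed.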
Django Signals vs Domain Events
Django provides built-in signals (pre_save, post_save, etc.). When should you use them vs domain events?
When to Use Django Signals
Django signals are appropriate for:
Model lifecycle hooks: Auditing changes, updating timestamps
Single-model concerns: Clearing caches when a model changes
Framework integrations: Third-party packages that need to react to saves
from django.db.models.signals import post_save
from django.dispatch import receiver


@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
    if created:
        Profile.objects.create(user=instance)
When to Use Domain Events
Domain events are preferred for:
Cross-module communication: When the Orders module needs to know about EHR events
Business domain concepts: Events that represent meaningful domain occurrences
Explicit contracts: When you want clear, documented interfaces between modules
Future extraction: When you might extract a module to a separate service
# Domain event - explicit, cross-module communication
event_bus.publish(OrderPlacedEvent(
    order_uuid=str(order.uuid),
    user_id=order.user.id,
    # ... explicit contract
))
Key differences:
| Aspect | Django Signals | Domain Events |
|---|---|---|
| Coupling | Model-level | Domain-level |
| Scope | Within Django | Cross-module |
| Contract | Implicit | Explicit event classes |
| Extractability | Hard | Easy (change transport) |
| When it fires | Every save | When you publish |
Guidance: Use Django signals for model-level concerns within a single module. Use domain events for anything that crosses module boundaries or represents a business domain concept.
Scaling the Event Bus
The in-memory event bus is sufficient for many applications, including ones with significant traffic, as long as publishers and subscribers run in the same process. When events must reach a different process or service, you can swap in an external broker.
When In-Memory Works
The in-memory bus is appropriate when:
Your application runs as a single process (or multiple identical processes)
Event handlers are fast and don’t need independent scaling
You don’t need event persistence or replay
When to Consider External Brokers
Consider moving to RabbitMQ, AWS SNS/SQS, or similar when:
You need to scale event consumers independently
Events should persist if the application restarts
You’re extracting a module to a separate service
You need guaranteed delivery with acknowledgment
Migration Path
Because your events are explicit classes with clear contracts, migration is straightforward:
Define an abstract interface for the event bus
Create a new implementation that publishes to RabbitMQ/SNS
Swap the implementation via configuration
Event classes remain unchanged - they’re just serialized differently
# Future: Abstract interface
from abc import ABC, abstractmethod


class BaseEventBus(ABC):
    @abstractmethod
    def subscribe(self, event_type, handler):
        """Register a handler for a given event type."""

    @abstractmethod
    def publish(self, event):
        """Deliver an event to its subscribers."""


# Future: Settings-based selection
# EVENT_BUS_BACKEND = "myproject.events.backends.RabbitMQEventBus"
Your event contracts and handler logic stay the same. Only the transport changes. Build with clear boundaries from day one, and extraction becomes straightforward.
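Once events cross a process boundary they have to be serialized. The sketch below shows one way this could look, assuming events are written as dataclasses and sent in a JSON envelope with a `type` field. The `EVENT_TYPES` registry, `serialize()`, and `deserialize()` are hypothetical helpers for illustration, not part of any broker's API.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class OrderPlacedEvent:
    """Illustrative dataclass event with a reduced set of fields."""

    order_uuid: str
    user_id: int


# Hypothetical registry mapping wire names to event classes.
EVENT_TYPES = {cls.__name__: cls for cls in (OrderPlacedEvent,)}


def serialize(event) -> str:
    """Wrap the event's fields in a JSON envelope tagged with its class name."""
    return json.dumps({"type": type(event).__name__, "data": asdict(event)})


def deserialize(message: str):
    """Look up the event class by name and rebuild the event."""
    envelope = json.loads(message)
    event_cls = EVENT_TYPES[envelope["type"]]
    return event_cls(**envelope["data"])


original = OrderPlacedEvent(order_uuid="abc-123", user_id=42)
wire = serialize(original)    # what a broker-backed bus would send
restored = deserialize(wire)  # what a consumer would hand to its handlers
```

Keeping event fields to JSON-friendly types such as strings, integers, and lists makes this kind of round trip straightforward.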
Event Sourcing vs Event-Driven Architecture
These terms are often confused, but they’re different approaches.
Event-driven architecture (what this guide covers) uses events for communication between modules. Your database tables remain the source of truth. Events are notifications that something happened.
Event sourcing stores events as the source of truth itself. Instead of an Orders table, you store OrderPlaced, ItemAdded, OrderShipped events and reconstruct current state by replaying them.
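To make the contrast concrete, here is a toy illustration of event sourcing, which this guide does not use: the order's current state is never stored, only derived by replaying its events. The event classes, `OrderState`, and the `replay()` function are hypothetical and heavily simplified.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class OrderPlaced:
    order_uuid: str


@dataclass(frozen=True)
class ItemAdded:
    sku: str


@dataclass(frozen=True)
class OrderShipped:
    pass


@dataclass
class OrderState:
    """Current state, derived from events rather than stored in a table."""

    order_uuid: str = ""
    items: list = field(default_factory=list)
    status: str = "new"


def replay(events):
    """Rebuild the current state by applying each event in order."""
    state = OrderState()
    for event in events:
        if isinstance(event, OrderPlaced):
            state.order_uuid = event.order_uuid
            state.status = "placed"
        elif isinstance(event, ItemAdded):
            state.items.append(event.sku)
        elif isinstance(event, OrderShipped):
            state.status = "shipped"
    return state


# The event log is the source of truth in this model.
event_log = [
    OrderPlaced("abc-123"),
    ItemAdded("sku-1"),
    ItemAdded("sku-2"),
    OrderShipped(),
]
state = replay(event_log)
```

In the event-driven approach of this guide, the equivalent of `state` is simply a row read from the database.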
| Aspect | Event-Driven (This Guide) | Event Sourcing |
|---|---|---|
| Source of truth | Database tables (Django models) | Event log |
| Events are | Notifications after state change | The state changes themselves |
| State reconstruction | Query the database | Replay event stream |
| Complexity | Low to moderate | High |
With our approach:
You update the database (prescription.status = "approved")
You publish an event to notify other modules (PrescriptionRequestApprovedEvent)
The database remains the source of truth
You get module decoupling without the complexity of event sourcing.
When Event Sourcing Makes Sense
Event sourcing is valuable for specific scenarios:
Audit-critical domains: Financial systems, healthcare records where you must prove exactly what happened and when
Complex temporal logic: When business rules depend heavily on the sequence of events
Event-native domains: Trading systems, IoT sensor streams where events are the natural model
The trade-off is complexity: event versioning, snapshot management, eventual consistency, and the inability to query current state directly (you need read models/CQRS).
Common Questions
“Do I need event sourcing for an audit trail?”
No. Add an audit log table alongside your regular data. Django packages like django-auditlog or django-simple-history handle this well.
“Do I need event sourcing to notify other systems?”
No. That’s event-driven architecture—exactly what this guide covers.
“What if I need event sourcing later?”
The patterns here position you well. Your domain events are already explicit classes with clear contracts, and your modules communicate through events. You can introduce event sourcing to specific aggregates without rewriting everything.
Summary
Event Bus Singleton: Single registry of subscribers, imported everywhere
Events as Data Classes: Simple classes representing domain occurrences
Register in AppConfig.ready(): Lazy imports, wired at startup
Publish with transaction.on_commit(): Never publish before data is committed
Integrate with Celery: Async tasks for external services and slow work
Domain Events > Django Signals: For cross-module communication
Scalability Path: Same contracts work with external brokers when needed