LangChain & LangGraph Integration
==================================

Build LLM-powered processes and agent workflows in your Django modular monolith.

.. index:: langchain, langgraph, llm, agents, ai, workflows, rag

Overview
--------

LangChain is a framework for building applications powered by large language
models (LLMs). LangGraph extends LangChain with graph-based orchestration for
complex, stateful agent workflows.

These tools fit naturally into the modular monolith architecture:

- **LangChain chains** become services that encapsulate LLM interactions
- **LangGraph agents** handle multi-step workflows with state management
- **RAG pipelines** provide domain-specific knowledge retrieval

Common use cases:

- Chatbots and conversational interfaces
- Document Q&A with retrieval-augmented generation (RAG)
- Autonomous agents that use tools and APIs
- Multi-step workflows (research, analysis, content generation)

Architecture Patterns
---------------------

LLM components integrate with the existing services/selectors pattern. There
are three approaches, depending on scope:

Dedicated AI Module
^^^^^^^^^^^^^^^^^^^

For shared LLM infrastructure, create a central ``ai`` module:

.. code-block:: text

    {project_slug}/
    ├── users/
    ├── orders/
    └── ai/                    # Shared LLM infrastructure
        ├── __init__.py
        ├── models.py          # Conversation, Message, Embedding models
        ├── services.py        # Chain and agent services
        ├── selectors.py       # Conversation history queries
        └── clients.py         # LLM client configuration

This module owns:

- LLM client initialization and configuration
- Shared prompt templates
- Conversation/message persistence
- Common chains (summarization, classification)

Per-Module Agents
^^^^^^^^^^^^^^^^^

Domain-specific agents live within their respective modules:

.. code-block:: text

    {project_slug}/orders/
    ├── models.py
    ├── services.py
    ├── selectors.py
    └── agents/                # Order-specific agents
        ├── __init__.py
        ├── support_agent.py   # Customer support agent
        └── analysis_agent.py  # Order analytics agent

This keeps domain knowledge close to the data it operates on.

Hybrid Approach
^^^^^^^^^^^^^^^

Most projects use both:

- Shared ``ai`` module for infrastructure and common chains
- Per-module agents for domain-specific workflows

Installation & Setup
--------------------

Add dependencies to ``pyproject.toml``:

.. code-block:: toml

    [project]
    dependencies = [
        # ... existing deps
        "langchain>=0.3",
        "langchain-openai>=0.2",
        "langgraph>=0.2",
    ]

Add environment variables to ``.env``:

.. code-block:: bash

    # OpenAI
    OPENAI_API_KEY=sk-...

    # Optional: LangSmith for tracing
    LANGCHAIN_TRACING_V2=true
    LANGCHAIN_API_KEY=lsv2_...

Create settings in ``config/settings/base.py``:

.. code-block:: python

    # LLM Configuration
    OPENAI_API_KEY = env("OPENAI_API_KEY", default="")
    OPENAI_MODEL = env("OPENAI_MODEL", default="gpt-4o-mini")

    # LangSmith (optional). LangSmith itself reads LANGCHAIN_* variables from
    # the process environment; this setting only exposes the flag to Django code.
    LANGCHAIN_TRACING_V2 = env.bool("LANGCHAIN_TRACING_V2", default=False)

Initialize the LLM client in your AI module:

.. code-block:: python

    # {project_slug}/ai/clients.py
    from django.conf import settings
    from langchain_openai import ChatOpenAI


    def get_llm(
        model: str | None = None,
        temperature: float = 0.7,
    ) -> ChatOpenAI:
        """Get configured LLM client."""
        return ChatOpenAI(
            model=model or settings.OPENAI_MODEL,
            api_key=settings.OPENAI_API_KEY,
            temperature=temperature,
        )

Building Your First Chain
-------------------------

A chain pipes a prompt template into an LLM call and an output parser.
Implement chains as services:

.. code-block:: python

    # {project_slug}/ai/services.py
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate

    from .clients import get_llm


    def summarize_text(*, text: str, max_words: int = 100) -> str:
        """Summarize text using LLM.

        Args:
            text: The text to summarize.
            max_words: Maximum words in summary.

        Returns:
            Summarized text.

        Raises:
            ValueError: If text is empty.
        """
        if not text.strip():
            raise ValueError("Text cannot be empty")

        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant that summarizes text concisely."),
            ("user", "Summarize the following in {max_words} words or less:\n\n{text}"),
        ])

        # prompt -> model -> plain string
        chain = prompt | get_llm(temperature=0.3) | StrOutputParser()
        return chain.invoke({"text": text, "max_words": max_words})


    def classify_intent(*, message: str, categories: list[str]) -> str:
        """Classify user message into one of the given categories."""
        prompt = ChatPromptTemplate.from_messages([
            ("system", "Classify the user message into exactly one category. "
                       "Respond with only the category name."),
            ("user", "Categories: {categories}\n\nMessage: {message}"),
        ])

        # temperature=0 keeps the classification as repeatable as possible
        chain = prompt | get_llm(temperature=0) | StrOutputParser()
        return chain.invoke({
            "message": message,
            "categories": ", ".join(categories),
        })

Use these services from views or other services:

.. code-block:: python

    # In a DRF view
    from rest_framework.response import Response
    from rest_framework.views import APIView

    from {project_slug}.ai.services import summarize_text


    # SummarizeInputSerializer is assumed to be defined alongside the view,
    # with a required "text" field and an optional "max_words" field.
    class SummarizeView(APIView):
        def post(self, request):
            serializer = SummarizeInputSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)

            summary = summarize_text(
                text=serializer.validated_data["text"],
                max_words=serializer.validated_data.get("max_words", 100),
            )
            return Response({"summary": summary})

LangGraph for Agent Workflows
-----------------------------

LangGraph enables complex, stateful workflows using a graph-based approach.
Agents can make decisions, use tools, and maintain state across steps.

State Definition
^^^^^^^^^^^^^^^^

Define the state your agent will track:

.. code-block:: python

    # {project_slug}/orders/agents/support_agent.py
    from typing import Annotated, TypedDict

    from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
    from langgraph.graph import END, START, StateGraph
    from langgraph.graph.message import add_messages


    class SupportState(TypedDict):
        """State for customer support agent."""

        # add_messages appends node output to the list instead of replacing it
        messages: Annotated[list[BaseMessage], add_messages]
        order_id: int | None
        intent: str | None
        resolved: bool

Node Functions
^^^^^^^^^^^^^^

Each node is a function that takes the state and returns a partial update:

.. code-block:: python

    from {project_slug}.ai.clients import get_llm
    from {project_slug}.orders.selectors import order_get


    def classify_intent_node(state: SupportState) -> dict:
        """Classify the customer's intent."""
        last_message = state["messages"][-1].content

        llm = get_llm(temperature=0)
        response = llm.invoke([
            {"role": "system", "content": "Classify intent as: order_status, refund, general"},
            {"role": "user", "content": last_message},
        ])
        return {"intent": response.content.strip().lower()}


    def lookup_order_node(state: SupportState) -> dict:
        """Look up order details."""
        order_id = state.get("order_id")
        if order_id is None:
            return {"messages": [AIMessage(content="I need your order ID to help you.")]}

        order = order_get(order_id=order_id)
        if not order:
            return {"messages": [AIMessage(content="I couldn't find that order.")]}

        return {
            "messages": [AIMessage(
                content=f"Order #{order.id}: Status is {order.status}. "
                        f"Placed on {order.created_at.date()}."
            )]
        }


    def generate_response_node(state: SupportState) -> dict:
        """Generate a helpful response based on context."""
        llm = get_llm()
        system_prompt = """You are a helpful customer support agent.
        Be concise and friendly. If you can't help, offer to escalate."""

        response = llm.invoke([
            {"role": "system", "content": system_prompt},
            # m.type is "human" or "ai", both of which LangChain accepts as roles
            *[{"role": m.type, "content": m.content} for m in state["messages"]],
        ])
        return {"messages": [response], "resolved": True}

Graph Construction
^^^^^^^^^^^^^^^^^^

Wire nodes together with conditional edges:

.. code-block:: python

    def route_by_intent(state: SupportState) -> str:
        """Route to appropriate node based on intent."""
        # intent starts as None, so fall back to "general" explicitly
        intent = state.get("intent") or "general"
        if intent == "order_status":
            return "lookup_order"
        return "generate_response"


    def build_support_agent():
        """Build and compile the support agent graph."""
        graph = StateGraph(SupportState)

        # Add nodes
        graph.add_node("classify_intent", classify_intent_node)
        graph.add_node("lookup_order", lookup_order_node)
        graph.add_node("generate_response", generate_response_node)

        # Add edges
        graph.add_edge(START, "classify_intent")
        graph.add_conditional_edges(
            "classify_intent",
            route_by_intent,
            {"lookup_order": "lookup_order", "generate_response": "generate_response"},
        )
        graph.add_edge("lookup_order", "generate_response")
        graph.add_edge("generate_response", END)

        return graph.compile()


    # Service function to run the agent
    def run_support_agent(*, message: str, order_id: int | None = None) -> str:
        """Run the support agent and return response."""
        agent = build_support_agent()

        initial_state = {
            "messages": [HumanMessage(content=message)],
            "order_id": order_id,
            "intent": None,
            "resolved": False,
        }

        result = agent.invoke(initial_state)
        return result["messages"][-1].content

RAG (Retrieval Augmented Generation)
------------------------------------

RAG enhances LLM responses with relevant documents from your knowledge base.

Core Concepts
^^^^^^^^^^^^^

1. **Document Loading**: Parse documents (PDFs, web pages, databases)
2. **Chunking**: Split documents into smaller pieces
3. **Embedding**: Convert chunks to vectors
4. **Retrieval**: Find relevant chunks for a query
5. **Generation**: Use retrieved context to generate answers

Building a RAG Service
^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    # {project_slug}/ai/services.py
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.runnables import RunnablePassthrough
    from langchain_openai import OpenAIEmbeddings
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    from .clients import get_llm


    def create_embeddings():
        """Create embeddings model."""
        return OpenAIEmbeddings(model="text-embedding-3-small")


    def chunk_documents(documents: list[str], chunk_size: int = 1000) -> list[str]:
        """Split documents into chunks for embedding."""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=200,
        )
        chunks = []
        for doc in documents:
            chunks.extend(splitter.split_text(doc))
        return chunks


    class RAGService:
        """Service for retrieval-augmented generation."""

        def __init__(self, vector_store):
            """Initialize with a vector store.

            Args:
                vector_store: Any LangChain-compatible vector store
                    (Chroma, Pinecone, pgvector, FAISS, etc.)
            """
            self.vector_store = vector_store
            # Retrieve the 4 most similar chunks per query
            self.retriever = vector_store.as_retriever(search_kwargs={"k": 4})

        def query(self, question: str) -> str:
            """Answer a question using RAG."""
            prompt = ChatPromptTemplate.from_messages([
                ("system", "Answer based on the context. If unsure, say so.\n\n"
                           "Context:\n{context}"),
                ("user", "{question}"),
            ])

            def format_docs(docs):
                return "\n\n".join(doc.page_content for doc in docs)

            # The question feeds both the retriever and the prompt
            chain = (
                {"context": self.retriever | format_docs, "question": RunnablePassthrough()}
                | prompt
                | get_llm(temperature=0.3)
                | StrOutputParser()
            )
            return chain.invoke(question)

        def add_documents(self, documents: list[str]) -> None:
            """Add documents to the knowledge base."""
            chunks = chunk_documents(documents)
            self.vector_store.add_texts(chunks)

Vector Store Options
^^^^^^^^^^^^^^^^^^^^

LangChain supports many vector stores.
Choose based on your needs:

- **FAISS**: In-memory, good for development and small datasets
- **Chroma**: Lightweight, persistent, good for prototyping
- **pgvector**: PostgreSQL extension, keeps data in your existing database
- **Pinecone/Weaviate**: Managed services for production scale

Example with FAISS (development). This needs the ``langchain-community`` and
``faiss-cpu`` packages in addition to the dependencies listed under
Installation & Setup:

.. code-block:: python

    from langchain_community.vectorstores import FAISS

    # Create vector store
    embeddings = create_embeddings()
    vector_store = FAISS.from_texts(
        ["Your documents here..."],
        embeddings,
    )

    # Use RAG service
    rag = RAGService(vector_store)
    answer = rag.query("What is the return policy?")

Integration Patterns
--------------------

With DRF
^^^^^^^^

Create API endpoints for LLM interactions:

.. code-block:: python

    # {project_slug}/ai/apis.py
    from rest_framework import serializers
    from rest_framework.response import Response
    from rest_framework.views import APIView


    class ChatInputSerializer(serializers.Serializer):
        message = serializers.CharField(max_length=4000)
        order_id = serializers.IntegerField(required=False)


    class ChatView(APIView):
        def post(self, request):
            serializer = ChatInputSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)

            # Imported here to avoid a module-level dependency on orders
            from {project_slug}.orders.agents.support_agent import run_support_agent

            response = run_support_agent(
                message=serializer.validated_data["message"],
                order_id=serializer.validated_data.get("order_id"),
            )
            return Response({"response": response})

With Celery
^^^^^^^^^^^

LLM calls can take seconds, so run them in a background task when the caller
does not need the result immediately:

.. code-block:: python

    # {project_slug}/ai/tasks.py
    from celery import shared_task

    from .services import summarize_text


    @shared_task
    def summarize_document_task(document_id: int) -> str:
        """Summarize a document asynchronously."""
        from {project_slug}.documents.selectors import document_get

        document = document_get(document_id=document_id)
        summary = summarize_text(text=document.content)

        # Store result
        from {project_slug}.documents.services import document_update

        document_update(document_id=document_id, summary=summary)
        return summary


    # Usage
    summarize_document_task.delay(document_id=123)

With Event Bus
^^^^^^^^^^^^^^

Publish events from agent actions for cross-module communication. First,
define the event class (see :doc:`event-driven-architecture` for the full
pattern):

.. code-block:: python

    # {project_slug}/domain_events/events.py
    from {project_slug}.domain_events.base import DomainEvent


    class FeedbackAnalyzedEvent(DomainEvent):
        """Emitted when AI analyzes customer feedback."""

        def __init__(self, feedback_id: int, sentiment: str):
            self.feedback_id = feedback_id
            self.sentiment = sentiment

Then publish the event from your service:

.. code-block:: python

    # {project_slug}/ai/services.py
    from {project_slug}.domain_events.bus import event_bus
    from {project_slug}.domain_events.events import FeedbackAnalyzedEvent


    def analyze_feedback(*, feedback_id: int) -> dict:
        """Analyze customer feedback using LLM."""
        from {project_slug}.feedback.selectors import feedback_get

        feedback = feedback_get(feedback_id=feedback_id)

        # classify_intent is defined earlier in this same module
        sentiment = classify_intent(
            message=feedback.content,
            categories=["positive", "neutral", "negative"],
        )

        # Publish event for other modules
        event_bus.publish(FeedbackAnalyzedEvent(
            feedback_id=feedback_id,
            sentiment=sentiment,
        ))

        return {"sentiment": sentiment}

With Django ORM
^^^^^^^^^^^^^^^

Persist conversations and agent state:

.. code-block:: python

    # {project_slug}/ai/models.py
    from django.conf import settings
    from django.db import models


    class Conversation(models.Model):
        user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
        created_at = models.DateTimeField(auto_now_add=True)
        metadata = models.JSONField(default=dict)


    class Message(models.Model):
        conversation = models.ForeignKey(
            Conversation, on_delete=models.CASCADE, related_name="messages"
        )
        role = models.CharField(max_length=20)  # user, assistant, system
        content = models.TextField()
        created_at = models.DateTimeField(auto_now_add=True)

        class Meta:
            ordering = ["created_at"]


    # Service to persist conversation (belongs in {project_slug}/ai/services.py)
    def conversation_add_message(
        *,
        conversation_id: int,
        role: str,
        content: str,
    ) -> Message:
        """Add a message to a conversation."""
        return Message.objects.create(
            conversation_id=conversation_id,
            role=role,
            content=content,
        )

Streaming Responses
-------------------

For chat interfaces, stream responses token-by-token:

Django StreamingHttpResponse
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    # {project_slug}/ai/apis.py
    import json

    from django.http import StreamingHttpResponse

    from .clients import get_llm


    def stream_chat(request):
        """Stream LLM response."""
        message = request.POST.get("message", "")

        def generate():
            llm = get_llm()
            for chunk in llm.stream(message):
                # JSON-encode each token so embedded newlines cannot break
                # the "data: ...\n\n" event framing
                yield f"data: {json.dumps(chunk.content)}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingHttpResponse(
            generate(),
            content_type="text/event-stream",
        )

Server-Sent Events with Async Views
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For async Django (``use_async=y``), served over ASGI:

.. code-block:: python

    # {project_slug}/ai/apis.py
    import json

    from django.http import StreamingHttpResponse

    from .clients import get_llm


    async def stream_chat_async(request):
        """Stream LLM response asynchronously."""
        message = request.POST.get("message", "")

        async def generate():
            llm = get_llm()
            async for chunk in llm.astream(message):
                # Same JSON framing as the synchronous view
                yield f"data: {json.dumps(chunk.content)}\n\n"
            yield "data: [DONE]\n\n"

        # Async iterators in StreamingHttpResponse need Django 4.2 or later
        return StreamingHttpResponse(
            generate(),
            content_type="text/event-stream",
        )

Frontend Integration
^^^^^^^^^^^^^^^^^^^^

Consume the stream in your React frontend:

.. code-block:: typescript

    // apps/{project_slug}/src/hooks/useChat.ts
    import { useState } from "react";

    export function useChat() {
      const [response, setResponse] = useState("");

      const sendMessage = async (message: string) => {
        setResponse("");

        // FormData.append() returns undefined, so build the body first
        const body = new FormData();
        body.append("message", message);

        const res = await fetch("/api/chat/stream/", { method: "POST", body });
        if (!res.body) return;

        const reader = res.body.getReader();
        const decoder = new TextDecoder();
        let buffer = "";

        while (true) {
          const { done, value } = await reader.read();
          if (done) break;

          // A network chunk can end mid-event, so buffer until "\n\n"
          buffer += decoder.decode(value, { stream: true });
          const events = buffer.split("\n\n");
          buffer = events.pop() ?? "";

          for (const event of events) {
            if (!event.startsWith("data: ") || event === "data: [DONE]") continue;
            // Each payload is a JSON-encoded string (see the Django views)
            setResponse((prev) => prev + JSON.parse(event.slice(6)));
          }
        }
      };

      return { response, sendMessage };
    }

Testing Agents
--------------

Test LLM services by replacing the LLM client with a fake model:

Mocking LLM Responses
^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    # tests/test_ai_services.py
    from unittest.mock import patch

    from langchain_core.language_models.fake_chat_models import FakeListChatModel

    from {project_slug}.ai.services import classify_intent, summarize_text


    def fake_llm(*responses: str) -> FakeListChatModel:
        """Build a fake chat model that returns the given replies in order."""
        return FakeListChatModel(responses=list(responses))


    def test_summarize_text():
        # Arrange: services.py imports get_llm by name, so patch it in the
        # services namespace rather than in clients
        llm = fake_llm("This is a summary.")

        # Act
        with patch("{project_slug}.ai.services.get_llm", return_value=llm):
            result = summarize_text(text="Long text here...")

        # Assert
        assert result == "This is a summary."


    def test_classify_intent():
        llm = fake_llm("order_status")

        with patch("{project_slug}.ai.services.get_llm", return_value=llm):
            result = classify_intent(
                message="Where is my order?",
                categories=["order_status", "refund", "general"],
            )

        assert result == "order_status"

Testing LangGraph Agents
^^^^^^^^^^^^^^^^^^^^^^^^

Test agent workflows by faking the LLM and patching the selectors the nodes
call. This example reuses ``fake_llm`` from the previous block:

.. code-block:: python

    from unittest.mock import MagicMock, patch

    from langchain_core.messages import HumanMessage


    def test_support_agent_routes_to_order_lookup():
        """Test that order queries route to lookup node."""
        from {project_slug}.orders.agents.support_agent import build_support_agent

        # First reply classifies the intent, second is the final answer
        llm = fake_llm("order_status", "Your order has shipped.")
        order = MagicMock(id=123, status="shipped")

        with patch(
            "{project_slug}.orders.agents.support_agent.get_llm", return_value=llm
        ), patch(
            "{project_slug}.orders.agents.support_agent.order_get", return_value=order
        ) as mock_order_get:
            result = build_support_agent().invoke({
                "messages": [HumanMessage(content="Where is order 123?")],
                "order_id": 123,
                "intent": None,
                "resolved": False,
            })

        # Verify order was looked up
        mock_order_get.assert_called_once_with(order_id=123)
        assert any("Order #123" in m.content for m in result["messages"])
        assert result["messages"][-1].content == "Your order has shipped."

Using LangSmith
^^^^^^^^^^^^^^^

Enable LangSmith for debugging in tests:

.. code-block:: python

    # conftest.py
    import pytest


    @pytest.fixture(autouse=True)
    def enable_langsmith_tracing(monkeypatch):
        """Enable LangSmith tracing for debugging."""
        # monkeypatch restores the previous value after each test
        monkeypatch.setenv("LANGCHAIN_TRACING_V2", "true")

Production Considerations
-------------------------

Rate Limiting
^^^^^^^^^^^^^

Protect against abuse and control costs:

.. code-block:: python

    from django.core.cache import cache
    from rest_framework.exceptions import Throttled


    def check_rate_limit(user_id: int, limit: int = 100) -> None:
        """Check if user has exceeded rate limit."""
        key = f"llm_rate_limit:{user_id}"

        # add() only creates the key if it is missing, so the one-hour window
        # starts with the first call instead of restarting on every call
        cache.add(key, 0, timeout=3600)
        try:
            count = cache.incr(key)
        except ValueError:
            # The key expired between add() and incr(); start a new window
            cache.set(key, 1, timeout=3600)
            count = 1

        if count > limit:
            raise Throttled(detail="LLM rate limit exceeded")

Cost Tracking
^^^^^^^^^^^^^

Track token usage for billing and optimization:

.. code-block:: python

    import structlog
    from langchain_core.callbacks import BaseCallbackHandler

    from {project_slug}.ai.clients import get_llm

    # The keyword-argument logging style below assumes structlog
    logger = structlog.get_logger(__name__)


    class CostTracker(BaseCallbackHandler):
        """Track LLM token usage and costs."""

        def on_llm_end(self, response, **kwargs):
            # llm_output can be None for some providers and streaming calls
            usage = (response.llm_output or {}).get("token_usage", {})
            # Log or store usage
            logger.info(
                "LLM usage",
                input_tokens=usage.get("prompt_tokens"),
                output_tokens=usage.get("completion_tokens"),
            )


    # Use with LLM: get_llm() takes no callbacks argument, so pass the
    # handler through the per-call config
    llm = get_llm()
    llm.invoke("Hello", config={"callbacks": [CostTracker()]})

Caching
^^^^^^^

Cache deterministic responses:

.. code-block:: python

    import hashlib

    from django.core.cache import cache

    from {project_slug}.ai.services import classify_intent


    def cached_classify(message: str, categories: list[str]) -> str:
        """Classify with caching for repeated queries."""
        digest = hashlib.md5(
            f"{message}:{sorted(categories)}".encode()
        ).hexdigest()
        # Prefix the key so it cannot collide with other cache entries
        cache_key = f"llm_classify:{digest}"

        result = cache.get(cache_key)
        if result is not None:
            return result

        result = classify_intent(message=message, categories=categories)
        cache.set(cache_key, result, timeout=3600)
        return result

Error Handling
^^^^^^^^^^^^^^

Handle LLM failures gracefully:

.. code-block:: python

    import time

    import structlog
    from langchain_core.exceptions import OutputParserException
    from openai import APIError, RateLimitError

    from {project_slug}.ai.services import summarize_text

    # The keyword-argument logging style below assumes structlog
    logger = structlog.get_logger(__name__)


    def safe_summarize(text: str) -> str | None:
        """Summarize with graceful error handling."""
        try:
            return summarize_text(text=text)
        except RateLimitError:
            # RateLimitError is a subclass of APIError, so it must come first.
            # The sleep blocks the current worker and the single retry can
            # still raise; prefer a task-queue retry in request paths.
            logger.warning("OpenAI rate limit hit, retrying...")
            time.sleep(60)
            return summarize_text(text=text)
        except APIError as e:
            logger.error("OpenAI API error", error=str(e))
            return None
        except OutputParserException:
            logger.warning("Failed to parse LLM output")
            return None

Common Patterns
---------------

Quick reference for common LLM patterns:

.. list-table::
   :header-rows: 1
   :widths: 20 40 20 20

   * - Pattern
     - Use Case
     - Complexity
     - Example
   * - Simple Chain
     - Text generation, summarization, classification
     - Low
     - ``summarize_text()``
   * - RAG Chain
     - Knowledge-base Q&A, document search
     - Medium
     - ``RAGService.query()``
   * - ReAct Agent
     - Tool-using assistant, API integration
     - Medium
     - Support agent with order lookup
   * - Multi-Agent
     - Complex workflows, research, analysis
     - High
     - Orchestrated specialist agents

See Also
--------

- LangChain Documentation - Official LangChain docs
- LangGraph Documentation - Official LangGraph docs
- LangSmith - Tracing and debugging platform
- :doc:`service-layer-patterns` - Where LLM services fit in the architecture
- :doc:`event-driven-architecture` - Publishing events from agent actions
- :doc:`testing` - Testing patterns for Django
- :doc:`adding-modules` - Creating a dedicated AI module
- :doc:`../5-ai-development/claude-code` - AI-assisted development workflow