AI Movie Recommender: An Agentic RAG System with LangGraph
This project is a full-stack, production-grade movie and series recommendation system built around an agentic RAG (Retrieval-Augmented Generation) architecture. Instead of relying on a single prompt-and-retrieve cycle, the system is orchestrated as a LangGraph state machine that contextualizes the user turn, classifies intent, retrieves with a hybrid search pipeline, evaluates retrieval quality, and either generates a grounded response or asks for clarification.
The backend runs on FastAPI with LangGraph for agent orchestration, Qdrant as the vector database, LiteLLM as the unified LLM gateway, PostgreSQL for durable conversation history, and Redis as a stream bus. Each assistant run publishes fine-grained events to a Redis Stream keyed by message_id, while generation executes in a detached background task with its own DB session. As a result, a page refresh or brief WebSocket drop does not cancel inference: the client can send resume_conversation or resume_stream and keep reading the same stream from the last Redis entry id it saw.
LLM calls are routed through LiteLLM to Ollama. The frontend is a React + TypeScript chat interface with a live LangGraph execution panel that visualizes the agent's decision path as it runs.
1. Agent Architecture: The LangGraph State Graph
The core of the system is a 6-node LangGraph StateGraph that models the full recommendation flow as a directed graph with conditional edges. Every user turn enters at contextualize, then router, so routing always sees a single self-contained query (short follow-ups after a re-ask still carry merged intent). The graph exits through generate_retrieve, generate_general, or reask_user depending on intent and retrieval quality. The compiled graph is a process-wide singleton (app_graph) reused for every generation task.
LangGraph Workflow Topology
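The topology can be pictured without LangGraph as a table of fixed edges plus two routing functions. The sketch below is a plain-Python stand-in, not the project's graph definition: EDGES, CONDITIONAL, walk, and route_after_router are hypothetical names, and route_after_retrieve repeats the quality gate described in section 1c.

```python
# Fixed edges: every turn goes START -> contextualize -> router,
# and each of the three terminal nodes ends the turn.
EDGES = {
    "START": "contextualize",
    "contextualize": "router",
    "generate_retrieve": "END",
    "generate_general": "END",
    "reask_user": "END",
}


def route_after_router(state: dict) -> str:
    # Hypothetical conditional edge: the router's decision picks the branch.
    return "retrieve" if state.get("decision") == "RETRIEVE" else "generate_general"


def route_after_retrieve(state: dict) -> str:
    # Same logic as the quality gate in section 1c (max one re-ask).
    if not state.get("needs_reask"):
        return "generate_retrieve"
    if state.get("reask_count", 0) >= 1:
        return "generate_retrieve"
    return "reask_user"


CONDITIONAL = {"router": route_after_router, "retrieve": route_after_retrieve}


def walk(state: dict) -> list[str]:
    """Return the sequence of nodes a turn with this state would visit."""
    path, node = [], "START"
    while node != "END":
        node = CONDITIONAL[node](state) if node in CONDITIONAL else EDGES[node]
        if node != "END":
            path.append(node)
    return path
```

Calling walk({"decision": "GENERAL"}) visits contextualize, router, and generate_general, while a RETRIEVE turn with needs_reask set and no prior re-ask ends at reask_user.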
1a. The Agent State
LangGraph threads a typed state dictionary through every node. This AgentState carries the conversation messages, the contextualized query produced before routing, the router's intent classification, retrieved documents, the final generation, and quality flags that drive conditional routing:
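    import operator
    from typing import Annotated, TypedDict

    from langchain_core.messages import AnyMessage


    class AgentState(TypedDict):
        # operator.add makes LangGraph append new messages instead of overwriting
        messages: Annotated[list[AnyMessage], operator.add]
        contextualized_question: str
        decision: str    # "RETRIEVE" | "GENERAL"
        media_type: str  # "movie" | "series" | "any"
        documents: list[str]
        generation: str
        needs_reask: bool
        reask_count: int

1b. Graph Nodes in Detail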
Each node is a single async function that reads from the shared state, performs its work, and returns a partial state update, which LangGraph merges into the state automatically. The nodes use two LLMs, chosen by the nature of the task; the Model Stack table in section 3 lists which model handles what.
Node Responsibilities
- Contextualize: Runs immediately after START. Rewrites the user's raw last turn into a standalone question using chat history (anaphora, prior titles, etc.), via the secondary model. The router and retriever both consume contextualized_question.
- Router: Classifies intent (RETRIEVE vs GENERAL) and extracts media_type (movie, series, or any) in one structured LLM call on the already contextualized text, so routing is stable even when the raw message is a short fragment.
- Retrieve: Runs hybrid search (dense + sparse + reranking) against Qdrant with optional media-type filters. Compares the best reranker score to RETRIEVAL_SCORE_THRESHOLD (default 0.5) and sets needs_reask=True when quality is poor.
- Generate Retrieve: Produces a grounded answer over the retrieved documents and history using the primary Gemma tier.
- Generate General: Handles non-retrieval turns (greetings, meta questions, chit-chat) using only history, with no vector search.
- Reask User: When retrieval is weak and the re-ask budget allows it, emits a clarifying question. Implemented with the primary LLM for natural phrasing; the turn ends here and the next user message starts a fresh graph pass with richer context.
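To make the "partial state update" idea concrete, here is a minimal sketch of two nodes and a simplified merge step. It is an illustration only: merge_update and REDUCERS imitate, in a much reduced form, how a reducer such as operator.add on messages differs from plain overwriting, the node bodies are keyword stand-ins for the real LLM calls, and messages are plain strings rather than AnyMessage objects.

```python
import asyncio
import operator

# Keys with a reducer are combined with the previous value; all others are overwritten.
REDUCERS = {"messages": operator.add}


def merge_update(state: dict, update: dict) -> dict:
    """Simplified stand-in for the merge LangGraph performs after each node."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        merged[key] = reducer(state.get(key, []), value) if reducer else value
    return merged


async def contextualize(state: dict) -> dict:
    # Stand-in for the secondary-LLM rewrite: a real node would call the model here.
    last_turn = state["messages"][-1]
    return {"contextualized_question": f"(standalone) {last_turn}"}


async def router(state: dict) -> dict:
    # Stand-in classifier: a keyword check instead of a structured LLM call.
    wants_titles = "recommend" in state["contextualized_question"].lower()
    return {"decision": "RETRIEVE" if wants_titles else "GENERAL", "media_type": "any"}


async def run_turn(user_text: str) -> dict:
    state = {"messages": [user_text]}
    for node in (contextualize, router):
        # Each node returns only the keys it changed.
        state = merge_update(state, await node(state))
    return state
```

Each node touches only its own keys, which is why the router can rely on contextualized_question being present without knowing how it was produced.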
1c. Speculative RAG: Quality-Gated Retrieval
The system implements a Speculative RAG pattern: retrieval happens optimistically, but generation is gated on a quality check. After the retrieve node executes, a conditional edge inspects the best reranker score:
- If the score meets the threshold (>= 0.5), the graph routes to generate_retrieve and produces a grounded answer.
- If the score falls below threshold, the graph routes to reask_user instead of hallucinating with poor context.
- A max reask counter prevents infinite clarification loops: after one reask, the system generates the best answer it can with whatever context is available.
This is a key architectural decision: the system prefers asking for clarification over generating a low-confidence response. The quality gate acts as a circuit breaker that protects against hallucination when the vector store doesn't have strong matches for the query.
For a deeper dive into retrieval evaluation and how to measure whether your RAG pipeline is returning the right chunks, see my blog post on RAG Evaluation Metrics and RAGAS.
    RETRIEVAL_SCORE_THRESHOLD = 0.5
    MAX_REASK_COUNT = 1


    def _retrieval_quality_ok(results: list[dict]) -> bool:
        # Results are sorted by reranker score, so the first one is the best.
        if not results:
            return False
        return results[0].get("score", 0.0) >= RETRIEVAL_SCORE_THRESHOLD


    def route_after_retrieve(state: AgentState) -> str:
        if not state.get("needs_reask"):
            return "generate_retrieve"
        if state.get("reask_count", 0) >= MAX_REASK_COUNT:
            return "generate_retrieve"  # fallback after max reasks
        return "reask_user"

1d. Background Summarization
Long conversations can bloat the context window and degrade LLM performance. After each turn completes, the system launches an asynchronous background summarization task using the secondary LLM. If either the user message or the assistant response exceeds a configurable token threshold, it gets compressed into a shorter summary.
This compressed version replaces the original in the DB (while the raw content is preserved in a separate column), so the next turn loads a shorter, denser chat history. The summarization runs concurrently with the user's thinking time, so by the time the next message arrives the compression has typically finished and been flushed to the database.
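The fire-and-forget shape of this step can be sketched with asyncio alone. Everything named here is an assumption for illustration: SUMMARY_TOKEN_THRESHOLD and its value, the whitespace-based estimate_tokens, the truncating summarize placeholder (the real system calls the secondary LLM), and the dict rows standing in for database records.

```python
import asyncio

SUMMARY_TOKEN_THRESHOLD = 200  # assumed value; the real threshold is configurable


def estimate_tokens(text: str) -> int:
    # Crude whitespace count standing in for a real tokenizer.
    return len(text.split())


async def summarize(text: str) -> str:
    # Placeholder for the secondary-LLM call; here it simply truncates.
    await asyncio.sleep(0)
    return " ".join(text.split()[:20]) + " ..."


async def compress_if_long(row: dict) -> dict:
    """Keep the raw text in raw_content and store a shorter content for future prompts."""
    if estimate_tokens(row["content"]) <= SUMMARY_TOKEN_THRESHOLD:
        return row
    summary = await summarize(row["content"])
    return {**row, "raw_content": row["content"], "content": summary}


async def compress_turn(rows: list[dict]) -> list[dict]:
    # The user message and assistant response are compressed concurrently.
    return await asyncio.gather(*(compress_if_long(r) for r in rows))


def schedule_compression(rows: list[dict]) -> asyncio.Task:
    # Must be called from a running event loop; the caller does not await the task,
    # so the turn can finish while compression continues in the background.
    return asyncio.create_task(compress_turn(rows))
```

In a long-lived server the returned task should be kept referenced (for example in a set) until it completes, since the event loop only holds weak references to tasks.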
2. ETL Pipeline & Semantic Chunking
The data pipeline ingests two Kaggle datasets, the Netflix Shows dataset (~8,800 titles) and the IMDb dataset (~1,000 movies), and transforms them into a unified schema before indexing into Qdrant.
Pipeline Steps
- Download: Datasets are auto-downloaded via kagglehub on first run.
- Load & Parse: CSVs are loaded with Polars for fast, parallel DataFrame processing. Fields like duration (regex extraction), cast/genre (CSV parsing), and media type (normalization to movie/series) are cleaned.
- Unify: Both datasets are mapped into a common MediaItem dataclass with fields: title, media_type, director, genre, cast, duration_min, description.
- Semantic Document Building: Each MediaItem becomes a single Document with structured natural-language content concatenating all fields. Metadata is preserved as a separate payload for Qdrant filtering.
- Batch Indexing: Documents are uploaded to Qdrant in batches of 64 with retry logic and exponential backoff.
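The batch indexing step can be sketched independently of Qdrant. The helper names (batched, upload_with_retry, index_all), the attempt count, and the base delay are assumptions for illustration; only the batch size of 64 comes from the pipeline description, and upload stands for whatever callable actually sends a batch to the vector store.

```python
import time
from typing import Callable, Iterator, Sequence

BATCH_SIZE = 64  # batch size stated in the pipeline description


def batched(items: Sequence, size: int = BATCH_SIZE) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def upload_with_retry(upload: Callable[[Sequence], None], batch: Sequence,
                      max_attempts: int = 4, base_delay: float = 0.5,
                      sleep: Callable[[float], None] = time.sleep) -> int:
    """Call upload(batch); after each failure wait base_delay * 2**attempt, then retry.

    Returns the number of attempts used and re-raises the last error if all fail.
    """
    for attempt in range(max_attempts):
        try:
            upload(batch)
            return attempt + 1
        except Exception:
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)
    raise RuntimeError("max_attempts must be at least 1")


def index_all(docs: Sequence, upload: Callable[[Sequence], None], **retry_kwargs) -> int:
    """Upload every batch with retries and return the number of batches sent."""
    count = 0
    for batch in batched(docs):
        upload_with_retry(upload, batch, **retry_kwargs)
        count += 1
    return count
```

Passing sleep as a parameter keeps the backoff testable: a test can record the delays instead of actually waiting.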
2a. Why One Document Per Item (No Fragmentation)
Unlike traditional RAG systems that chunk long documents into overlapping windows, this pipeline uses entity-level semantic chunking: each movie or series is a single document. This is a deliberate choice. A movie's metadata (title, director, cast, genre, description) forms a coherent semantic unit that should never be split across chunks. Splitting would mean the retriever might surface a chunk with an actor's name but lose the genre context, degrading recommendation quality.
For a comprehensive comparison of chunking strategies, from fixed-size windows to semantic and agentic approaches, see my blog post on Chunking Strategies for RAG.
    def build_semantic_documents_from_media_item(item: MediaItem) -> Document:
        # Text that gets embedded: one labelled line per non-empty field.
        parts = [f"Title: {item.title or 'Unknown'}"]
        if item.media_type:
            parts.append(f"Type: {item.media_type}")
        if item.director:
            parts.append(f"Director: {item.director}")
        if item.genre:
            parts.append(f"Genres: {', '.join(item.genre)}")
        if item.cast:
            parts.append(f"Cast: {', '.join(item.cast)}")
        if item.duration_min:
            parts.append(f"Duration: {item.duration_min} minutes")
        if item.description:
            parts.append(f"Description: {item.description}")
        # Structured payload used for Qdrant filtering; empty values are dropped below.
        metadata = {
            "title": item.title, "media_type": item.media_type,
            "director": item.director, "genre": item.genre,
            "cast": item.cast, "duration_min": item.duration_min,
        }
        return Document(
            page_content="\n".join(parts),
            metadata={k: v for k, v in metadata.items() if v not in (None, "", [])},
        )

3. Hybrid Retriever: Dense + Sparse + Reranking
The retriever is the heart of the RAG pipeline. It implements a three-stage hybrid search using Qdrant's native fusion capabilities, combining the strengths of dense semantic search, sparse lexical matching, and cross-encoder reranking.
For the theory behind dense retrieval, sparse retrieval (BM25/SPLADE), and reranking, including how cosine similarity, Reciprocal Rank Fusion, and cross-encoders work under the hood, see my blog post on Information Retrieval: Dense, Sparse & Reranking.
Hybrid Search Pipeline
Search Stages
- Dense Search: Encodes the query with paraphrase-multilingual-MiniLM-L12-v2 (SentenceTransformers) and performs cosine similarity search against the dense vector index. Captures semantic meaning: "suspenseful thriller" will match movies described as "edge-of-your-seat tension".
- Sparse Search: Encodes the query with Splade_PP_en_v1 for learned sparse representations. Captures lexical overlap: exact actor names, specific titles, or niche genre terms that dense embeddings might miss.
- Reciprocal Rank Fusion (RRF): Qdrant's built-in FusionQuery merges both ranked lists using RRF, which rewards documents that appear high in multiple rankings without being biased by score scale differences. The top 15 candidates are prefetched.
- Cross-Encoder Reranking: The top 15 fused results are re-scored using jinaai/jina-reranker-v2-base-multilingual, a cross-encoder that sees query and document together (not independently like bi-encoders). Scores are sigmoid-normalized to [0, 1] for consistent thresholding. The final top 5 are returned.
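The two numeric steps in this pipeline, rank fusion and score normalization, are small enough to show in plain Python. This is a sketch of the general techniques rather than of Qdrant's internals: rrf_fuse uses the commonly cited constant k = 60, which is an assumption here, and the document ids are invented for the example.

```python
import math


def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion: each list contributes 1 / (k + rank) per document."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Only ranks matter, so dense and sparse score scales never need to be comparable.
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)


def sigmoid(x: float) -> float:
    """Map a raw reranker logit to (0, 1) so a fixed threshold such as 0.5 is meaningful."""
    return 1.0 / (1.0 + math.exp(-x))


dense_ranking = ["heat", "se7en", "zodiac"]      # hypothetical dense results
sparse_ranking = ["zodiac", "heat", "memento"]   # hypothetical sparse results
fused = rrf_fuse([dense_ranking, sparse_ranking])
```

In this example "heat" (ranks 1 and 2) ends up ahead of "zodiac" (ranks 3 and 1), and both outrank the documents that appear in only one list. For the threshold, a raw logit of 0 maps to exactly 0.5, so with sigmoid normalization the 0.5 gate amounts to asking whether the reranker's raw score is non-negative.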
Model Stack
| Component | Model | Purpose |
|---|---|---|
| Dense Embeddings | paraphrase-multilingual-MiniLM-L12-v2 | Semantic similarity search |
| Sparse Embeddings | Splade_PP_en_v1 | Learned lexical matching (like BM25, but trained) |
| Reranker | jinaai/jina-reranker-v2-base-multilingual | Cross-encoder re-scoring for precision |
| Primary LLM | Google Gemma 3, gemma4:26b (Ollama via LiteLLM) | Grounded answers, general chat, re-ask wording |
| Secondary LLM | Google Gemma 3, gemma4:e4b (Ollama via LiteLLM) | Router JSON, contextualize, background summarization |
    async def search(self, text: str, rerank: bool = True,
                     filter: models.Filter | None = None) -> list[dict]:
        # Stages 1-2: dense and sparse prefetch, fused server-side with RRF.
        result = await self.async_qdrant_client.query_points(
            collection_name=self.collection_name,
            query=models.FusionQuery(fusion=models.Fusion.RRF),
            prefetch=[
                models.Prefetch(
                    query=models.Document(text=text, model=self.dense_model_name),
                    using=self.dense_vector_name,
                ),
                models.Prefetch(
                    query=models.Document(text=text, model=self.sparse_model_name),
                    using=self.sparse_vector_name,
                ),
            ],
            query_filter=filter,
            limit=self.prefetch_limit,  # 15 candidates for reranking
        )
        if not rerank:
            # Assumed fallback (not shown in the original snippet): fused order, Qdrant scores.
            return [{"score": p.score, **p.payload}
                    for p in result.points[:self.final_limit]]
        # Stage 3: cross-encoder rerank over the fused candidates.
        texts = [p.payload["page-content"] for p in result.points]
        raw_scores = list(self.reranker.rerank(text, texts))
        norm_scores = [_sigmoid(s) for s in raw_scores]  # normalize to [0, 1]
        ranking = sorted(enumerate(norm_scores), key=lambda x: x[1], reverse=True)
        return [{"score": score, **result.points[i].payload}
                for i, score in ranking[:self.final_limit]]  # top 5

4. Redis-Backed Streaming, WebSockets & Resumability
The client still talks over a persistent WebSocket, but the server decouples generation from the socket. When a user sends start_conversation or message, the handler spawns generate_to_redis: a background coroutine that opens its own async SQLAlchemy session, runs app_graph.astream_events, and appends every UI event to a Redis Stream (namespaced by message_id). A lightweight relay task on the WebSocket connection reads that stream and forwards entries to the browser.
If the tab reloads or the socket drops mid-stream, inference keeps running. On reconnect the client can send resume_conversation (rebind to a conversation and attach to an active generation if any) or resume_stream with the last known Redis stream id to replay tail events without losing tokens. interrupt sets a Redis flag checked on each graph iteration so the user can still stop generation cooperatively.
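The resume behaviour rests on one property of an append-only stream: a reader that remembers the last entry id it saw can ask for everything after it. The sketch below uses an in-memory stand-in rather than Redis, so InMemoryStream, its integer ids, and the event dict shape are all assumptions; in Redis the same roles are played by XADD (append) and XREAD (read entries after a given id).

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryStream:
    """Append-only log standing in for one Redis Stream (one per message_id)."""
    entries: list[tuple[int, dict]] = field(default_factory=list)

    def append(self, event: dict) -> int:
        # Redis assigns ids of the form "<ms>-<seq>"; a counter is enough here.
        entry_id = len(self.entries) + 1
        self.entries.append((entry_id, event))
        return entry_id

    def read_after(self, last_id: int = 0) -> list[tuple[int, dict]]:
        # Only entries newer than last_id are returned, so nothing is replayed twice.
        return [(i, e) for i, e in self.entries if i > last_id]


stream = InMemoryStream()

# The background task publishes two tokens while the client is connected.
for token in ["The ", "Matrix "]:
    stream.append({"type": "response_chunk", "data": token})
seen = stream.read_after(0)
last_id = seen[-1][0]  # the client remembers this id before the socket drops

# Generation keeps running while the client is away.
for token in ["is ", "a classic."]:
    stream.append({"type": "response_chunk", "data": token})

# On reconnect, the client resumes from the id it last saw.
tail = stream.read_after(last_id)
text = "".join(event["data"] for _, event in seen + tail)
```

Because the cursor lives with the client, the server side does not need to track per-connection progress; any new relay task can serve any reconnecting client.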
WebSocket Event Protocol
| Event | Emitted When | Payload |
|---|---|---|
| generation_started | Background task claimed the message id | message_id string |
| thinking_start / thinking_end | Bracket the LangGraph execution phase | null |
| graph_start / graph_end | LangGraph astream_events loop begins / completes | null |
| node_start / node_end | A graph node begins or finishes execution | Node name (e.g. contextualize, router, retrieve) |
| node_output | A node completes with metadata | JSON: decision, media_type, documents_count, etc. |
| response_chunk | A token is streamed from the LLM | Token string |
| response_done | Generation is complete | Empty string |
| interrupt_ack | User interruption was processed | null |
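A client can treat this protocol as a fold over events. The sketch below assumes each event arrives as a dict with "type" and "data" keys, which is an illustrative shape rather than the project's wire format, and fold_events is a hypothetical helper.

```python
def fold_events(events: list[dict]) -> dict:
    """Reduce a list of protocol events into the state a chat UI would render."""
    view = {"path": [], "text": "", "done": False, "interrupted": False}
    for event in events:
        kind, data = event["type"], event.get("data")
        if kind == "node_start":
            view["path"].append(data)      # node name, e.g. "router"
        elif kind == "response_chunk":
            view["text"] += data           # token string
        elif kind == "response_done":
            view["done"] = True
        elif kind == "interrupt_ack":
            view["interrupted"] = True
        # Other event types (graph_start, node_end, ...) do not change this view.
    return view


events = [
    {"type": "generation_started", "data": "msg-1"},
    {"type": "graph_start", "data": None},
    {"type": "node_start", "data": "contextualize"},
    {"type": "node_end", "data": "contextualize"},
    {"type": "node_start", "data": "router"},
    {"type": "node_start", "data": "generate_general"},
    {"type": "response_chunk", "data": "Hi "},
    {"type": "response_chunk", "data": "there!"},
    {"type": "response_done", "data": ""},
]
view = fold_events(events)
```

Since the fold is a pure function of the event list, replaying the same stream after a reconnect rebuilds the same view.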
4a. Interruption vs disconnect
When the user clicks stop, the client sends interrupt, which sets a Redis-backed interrupt flag keyed by message_id. The detached generate_to_redis loop consults that flag between events; on interrupt it publishes tail events (including an "[message interrupted by the user]" suffix), flushes partial content to PostgreSQL, and clears Redis bookkeeping.
A plain WebSocket disconnect is different: the relay task is cancelled, but generation is intentionally not cancelled. Tokens continue landing in the Redis stream so a later resume_stream can catch up. That is what makes refresh-tolerant streaming possible.
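The difference between the two cases can be reproduced with two asyncio tasks. This is a toy model under stated assumptions: a Python list stands in for the Redis stream, a dict stands in for the Redis interrupt flag, and generate, relay, disconnect_demo, and interrupt_demo are hypothetical names.

```python
import asyncio

INTERRUPT_SUFFIX = "[message interrupted by the user]"


async def generate(stream: list[str], flags: dict, tokens: list[str]) -> None:
    """Detached producer: keeps publishing unless the interrupt flag is set."""
    for token in tokens:
        if flags.get("interrupt"):
            stream.append(INTERRUPT_SUFFIX)  # tail event for the interrupted message
            return
        stream.append(token)
        await asyncio.sleep(0)  # yield control between events


async def relay(stream: list[str], sent: list[str]) -> None:
    """Per-connection consumer: forwards new entries until it is cancelled."""
    cursor = 0
    while True:
        while cursor < len(stream):
            sent.append(stream[cursor])
            cursor += 1
        await asyncio.sleep(0)


async def disconnect_demo() -> tuple[list[str], list[str]]:
    stream, sent, flags = [], [], {}
    producer = asyncio.create_task(generate(stream, flags, ["a", "b", "c", "d"]))
    consumer = asyncio.create_task(relay(stream, sent))
    await asyncio.sleep(0)   # let both tasks start
    consumer.cancel()        # socket drops: only the relay is cancelled
    await producer           # generation still runs to completion
    return stream, sent


async def interrupt_demo() -> list[str]:
    stream, flags = [], {}
    producer = asyncio.create_task(generate(stream, flags, ["a", "b", "c", "d"]))
    await asyncio.sleep(0)        # producer emits at least one token
    flags["interrupt"] = True     # user clicks stop
    await producer
    return stream
```

In disconnect_demo the stream ends up with all four tokens even though the relay forwarded only a prefix, whereas in interrupt_demo the producer stops early and the stream ends with the interruption suffix.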
5. LiteLLM: Unified LLM Gateway
The system uses LiteLLM as a proxy layer between the application and the LLM providers. Instead of coupling directly to a specific provider's SDK, all LLM calls go through LiteLLM's OpenAI-compatible API. This means the backend can switch from Ollama (local) to OpenAI, Anthropic, vLLM, or any other supported provider by editing a single YAML configuration file, with no application code changes.
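From the application's side, "OpenAI-compatible" means a request is just a chat-completions payload addressed to a model alias. The sketch below only builds such a request with the standard library and does not send it. The base URL is an assumption (port 4000 matches the compose table in section 6), build_chat_request is a hypothetical helper, and any API key header the proxy might require is left out.

```python
import json
import urllib.request

LITELLM_BASE_URL = "http://localhost:4000"  # assumed host; adjust to your deployment


def build_chat_request(model_alias: str, user_text: str,
                       base_url: str = LITELLM_BASE_URL) -> urllib.request.Request:
    """Build (but do not send) an OpenAI-style chat completion request."""
    payload = {
        "model": model_alias,  # an alias from the proxy config, not a provider model id
        "messages": [{"role": "user", "content": user_text}],
        "stream": False,
    }
    return urllib.request.Request(
        url=f"{base_url}/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_chat_request("primary-llm", "Recommend a heist movie.")
# To actually call the proxy: urllib.request.urlopen(request)
```

The application only ever names the alias (primary-llm or secondary-llm); which provider and model sit behind it is decided in the proxy configuration.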
The active stack targets Google Gemma 3-class open weights served locally: a primary route for large, fluent completions and a secondary route for fast, low-temperature structured tasks (router JSON, contextualize, summarization). In the checked-in Docker setup both routes hit Ollama with the tags below.
For a detailed walkthrough of LiteLLM's architecture, proxy modes, and configuration, see my blog post on LiteLLM: One Interface for Every LLM Provider.
    model_list:
      - model_name: primary-llm
        litellm_params:
          model: ollama/gemma4:26b
          api_base: http://ollama:11434
      - model_name: secondary-llm
        litellm_params:
          model: ollama/gemma4:e4b
          api_base: http://ollama:11434

6. Infrastructure: Fully Containerized
The entire system is orchestrated with Docker Compose: a single docker compose up spins up seven core services (plus an optional qdrant-init profile) with proper networking and health checks:
Docker Compose Services
| Service | Technology | Port | Role |
|---|---|---|---|
| qdrant | Qdrant | 6333 | Vector database for hybrid search |
| postgres | PostgreSQL 15 | 5432 | Conversation and message persistence |
| redis | Redis 7 (AOF) | 6379 | Stream bus for resumable generation events + interrupt flags |
| ollama | Ollama | 11434 | Local Gemma inference |
| litellm | LiteLLM Proxy | 4000 | Unified LLM gateway |
| backend | FastAPI (Python 3.13) | 8000 | WebSocket API, Redis relay, LangGraph workers |
| frontend | React + Vite | 5173 | Chat interface + LangGraph visualization |
7. Frontend: Live Graph Visualization
The frontend is built with React, TypeScript, Vite, and Tailwind CSS. Beyond the standard chat interface with markdown rendering, message history, and conversation management, the standout feature is the LangGraph Panel, a real-time visualization that shows the agent's decision path as it executes.
As stream events arrive (graph_start, node_start, node_end, node_output, …), the panel updates a visual graph with color-coded nodes in the same conceptual order as production: contextualize → router → (retrieve | generate_general) → …. SVG edges show the RETRIEVE vs GENERAL split. Each node card surfaces router decisions, media-type badges, retriever stats, or the contextualized question.
The WebSocket client implements automatic reconnection with exponential backoff and, on success, issues resume_conversation / resume_stream so a dropped tab can reattach to Redis-backed progress instead of restarting the answer from scratch.

