AI Movie Recommender: An Agentic RAG System with LangGraph

This project is a full-stack, production-grade movie and series recommendation system built around an agentic RAG (Retrieval-Augmented Generation) architecture. Instead of relying on a single prompt-and-retrieve cycle, the system is orchestrated as a LangGraph state machine that contextualizes the user turn, classifies intent, retrieves with a hybrid search pipeline, evaluates retrieval quality, and either generates a grounded response or asks for clarification.

The backend runs on FastAPI with LangGraph for agent orchestration, Qdrant as the vector database, LiteLLM as the unified LLM gateway, PostgreSQL for durable conversation history, and Redis as a stream bus. Each assistant run publishes fine-grained events to a Redis Stream keyed by message_id, while generation executes in a detached background task with its own DB session. As a result, a page refresh or a brief WebSocket drop does not cancel inference: the client can send resume_conversation or resume_stream and keep reading the same stream from the last Redis entry id it received.

LLM calls are routed through LiteLLM to Ollama. The frontend is a React + TypeScript chat interface with a live LangGraph execution panel that visualizes the agent's decision path as it runs.


1. Agent Architecture: The LangGraph State Graph

The core of the system is a 6-node LangGraph StateGraph that models the full recommendation flow as a directed graph with conditional edges. Every user turn enters at contextualize, then router, so routing always sees a single self-contained query (short follow-ups after a re-ask still carry merged intent). The graph exits through generate_retrieve, generate_general, or reask_user depending on intent and retrieval quality. The compiled graph is a process-wide singleton (app_graph) reused for every generation task.

[Figure: LangGraph workflow topology]
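The following dependency-free sketch illustrates the topology described above. It encodes the six nodes and the two conditional edges as plain Python and walks one turn from START to END. The helper names `next_node` and `trace_turn` are hypothetical. In the project itself these edges would be declared on the StateGraph (LangGraph's `add_edge` and `add_conditional_edges`), not hand-rolled like this.

```python
END = "END"

# Unconditional edges: every turn enters at contextualize, then router;
# the three answer nodes all terminate the turn.
STATIC_EDGES = {
    "START": "contextualize",
    "contextualize": "router",
    "generate_retrieve": END,
    "generate_general": END,
    "reask_user": END,
}


def next_node(node: str, decision: str, quality_ok: bool, reask_exhausted: bool) -> str:
    """Return the successor of `node` for a single turn."""
    if node == "router":
        # Conditional edge 1: intent classification.
        return "retrieve" if decision == "RETRIEVE" else "generate_general"
    if node == "retrieve":
        # Conditional edge 2: quality gate, bypassed once the re-ask budget is spent.
        if quality_ok or reask_exhausted:
            return "generate_retrieve"
        return "reask_user"
    return STATIC_EDGES[node]


def trace_turn(decision: str, quality_ok: bool = True, reask_exhausted: bool = False) -> list[str]:
    """Walk from START to END and return the nodes visited, in order."""
    path: list[str] = []
    node = "START"
    while True:
        node = next_node(node, decision, quality_ok, reask_exhausted)
        if node == END:
            return path
        path.append(node)
```

For example, `trace_turn("RETRIEVE", quality_ok=False)` yields `["contextualize", "router", "retrieve", "reask_user"]`, the path of a turn that ends with a clarifying question.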

1a. The Agent State

LangGraph threads a typed state dictionary through every node. This AgentState carries the conversation messages, the contextualized query produced before routing, the router's intent classification, retrieved documents, the final generation, and quality flags that drive conditional routing:

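import operator
from typing import Annotated, TypedDict

from langchain_core.messages import AnyMessage

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]  # reducer: updates are appended
    contextualized_question: str  # standalone rewrite of the latest user turn
    decision: str        # "RETRIEVE" | "GENERAL"
    media_type: str      # "movie" | "series" | "any"
    documents: list[str]
    generation: str
    needs_reask: bool    # set by retrieve when the best reranker score is too low
    reask_count: int     # clarifying questions already asked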

1b. Graph Nodes in Detail

Each node is a single async function that reads from the shared state, performs its work, and returns a partial state update, which LangGraph merges into the state automatically. The graph uses two LLMs, a larger primary model and a faster secondary model, chosen according to the nature of each task. The Model Stack table in Section 3 shows which model handles which job.
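LangGraph performs this merge internally. The standard-library sketch below only mimics the semantics, showing why the `operator.add` annotation on `messages` makes updates append while every other key is simply overwritten. `DemoState` and `merge_update` are illustrative names, not LangGraph APIs.

```python
import operator
from typing import Annotated, TypedDict, get_args, get_origin, get_type_hints


class DemoState(TypedDict, total=False):
    # Reducer-annotated channel: updates are combined with operator.add (list concatenation).
    messages: Annotated[list[str], operator.add]
    # Plain channels: the most recent write wins.
    decision: str
    documents: list[str]


def merge_update(schema: type, state: dict, update: dict) -> dict:
    """Return a new state with `update` merged in, mimicking reducer semantics."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        hint = hints.get(key)
        if get_origin(hint) is Annotated and key in merged:
            reducer = get_args(hint)[1]  # first metadata entry holds the reducer
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value
    return merged
```

In this model, a node that returns `{"messages": [reply], "decision": "RETRIEVE"}` extends the history but replaces the previous decision. This is the behavior the AgentState definition relies on.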

Node Responsibilities

  1. Contextualize: Runs immediately after START. Rewrites the user's raw last turn into a standalone question using chat history (resolving anaphora, previously mentioned titles, etc.) via the secondary model. Both the router and the retriever consume contextualized_question.
  2. Router: Classifies intent (RETRIEVE vs GENERAL) and extracts media_type (movie, series, or any) in a single structured LLM call on the already contextualized text, so routing stays stable even when the raw message is a short fragment.
  3. Retrieve: Runs hybrid search (dense + sparse + reranking) against Qdrant with optional media-type filters. Compares the best reranker score to RETRIEVAL_SCORE_THRESHOLD (default 0.5) and sets needs_reask=True when quality is poor.
  4. Generate Retrieve: Produces a grounded answer over the retrieved documents and history using the primary Gemma tier.
  5. Generate General: Handles non-retrieval turns (greetings, meta questions, chit-chat) using only history, with no vector search.
  6. Reask User: When retrieval is weak and the re-ask budget allows it, emits a clarifying question. It uses the primary LLM for natural phrasing. The turn ends here, and the next user message starts a fresh graph pass with richer context.

1c. Speculative RAG: Quality-Gated Retrieval

The system implements a Speculative RAG pattern: retrieval happens optimistically, but generation is gated on a quality check. After the retrieve node executes, a conditional edge inspects the best reranker score:

- If the score meets the threshold (>= 0.5), the graph routes to generate_retrieve and produces a grounded answer.

- If the score falls below the threshold, the graph routes to reask_user instead of hallucinating with poor context.

- A maximum re-ask counter prevents infinite clarification loops: after one re-ask, the system generates the best answer it can with whatever context is available.

This is a key architectural decision: the system prefers asking for clarification over generating a low-confidence response. The quality gate acts as a circuit breaker that protects against hallucination when the vector store doesn't have strong matches for the query.

For a deeper dive into retrieval evaluation and how to measure whether your RAG pipeline is returning the right chunks, see my blog post on RAG Evaluation Metrics and RAGAS.

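RETRIEVAL_SCORE_THRESHOLD = 0.5  # minimum sigmoid-normalized reranker score
MAX_REASK_COUNT = 1              # clarifying questions allowed before falling back

def _retrieval_quality_ok(results: list[dict]) -> bool:
    # results arrive sorted by descending reranker score, so results[0] is the best hit
    if not results:
        return False
    return results[0].get("score", 0.0) >= RETRIEVAL_SCORE_THRESHOLD

def route_after_retrieve(state: AgentState) -> str:
    if not state.get("needs_reask"):
        return "generate_retrieve"  # good context: answer from it
    if state.get("reask_count", 0) >= MAX_REASK_COUNT:
        return "generate_retrieve"  # fallback after max reasks: best-effort answer
    return "reask_user"             # weak context: ask for clarification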
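The snippet above reads needs_reask and reask_count, but it does not show where either value is written. One plausible wiring is for the retrieve node to derive needs_reask from the same quality check and for reask_user to increment reask_count. This wiring is an assumption, not something taken from the project. The self-contained sketch below uses hypothetical `retrieve_node`, `reask_node`, and `simulate` helpers on plain dicts. It replays weak-retrieval turns to show the budget taking effect. It also assumes reask_count survives between turns, for example through LangGraph checkpointing or by reloading it from PostgreSQL. The source does not say which mechanism is used.

```python
RETRIEVAL_SCORE_THRESHOLD = 0.5
MAX_REASK_COUNT = 1


def retrieval_quality_ok(results: list[dict]) -> bool:
    # Assumes results are sorted by descending reranker score.
    return bool(results) and results[0].get("score", 0.0) >= RETRIEVAL_SCORE_THRESHOLD


def retrieve_node(state: dict, results: list[dict]) -> dict:
    """Hypothetical retrieve node: partial update carrying documents and the quality flag."""
    return {
        "documents": [r["page_content"] for r in results],
        "needs_reask": not retrieval_quality_ok(results),
    }


def reask_node(state: dict) -> dict:
    """Hypothetical reask node: consumes one unit of the re-ask budget."""
    return {"reask_count": state.get("reask_count", 0) + 1}


def route_after_retrieve(state: dict) -> str:
    if not state.get("needs_reask"):
        return "generate_retrieve"
    if state.get("reask_count", 0) >= MAX_REASK_COUNT:
        return "generate_retrieve"  # budget spent: best-effort answer
    return "reask_user"


def simulate(turns: list[list[dict]]) -> list[str]:
    """Run retrieve -> route (-> reask) for several turns that share one state."""
    state: dict = {"reask_count": 0}
    routes = []
    for results in turns:
        state.update(retrieve_node(state, results))
        route = route_after_retrieve(state)
        if route == "reask_user":
            state.update(reask_node(state))
        routes.append(route)
    return routes
```

Two consecutive weak turns produce `["reask_user", "generate_retrieve"]`: the first asks for clarification, and the second answers with whatever context is available.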

1d. Background Summarization

Long conversations can bloat the context window and degrade LLM performance. After each turn completes, the system launches an asynchronous background summarization task using the secondary LLM. If either the user message or the assistant response exceeds a configurable token threshold, it gets compressed into a shorter summary.

This compressed version replaces the original in the DB (the raw content is preserved in a separate column), so the next turn loads a shorter, denser chat history. The summarization runs concurrently with the user's thinking time, so by the time the next message arrives, the compression is already done and flushed to the database.
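The fire-and-forget pattern behind this step can be sketched with asyncio alone. The token threshold value, the whitespace token count, and the `summarize_with_llm` stub are all assumptions. The stub stands in for the secondary-LLM call, and the PostgreSQL write is only marked by a comment. The strong reference kept in `background` follows the asyncio documentation's advice for tasks created with `create_task` that are not awaited.

```python
import asyncio

TOKEN_THRESHOLD = 50  # hypothetical value; the project makes this configurable


def approx_tokens(text: str) -> int:
    # Crude whitespace count; a real implementation would use the model's tokenizer.
    return len(text.split())


async def summarize_with_llm(text: str) -> str:
    # Stand-in for a call to the secondary LLM through LiteLLM.
    await asyncio.sleep(0)
    return " ".join(text.split()[:10]) + " ..."


async def compress_message(msg: dict) -> None:
    """Replace oversized content with a summary, keeping the original in raw_content."""
    if approx_tokens(msg["content"]) > TOKEN_THRESHOLD:
        msg["raw_content"] = msg["content"]
        msg["content"] = await summarize_with_llm(msg["content"])


async def summarize_turn(user_msg: dict, assistant_msg: dict) -> None:
    await asyncio.gather(compress_message(user_msg), compress_message(assistant_msg))
    # A real implementation would persist both rows to PostgreSQL here.


def schedule_summarization(user_msg: dict, assistant_msg: dict, background: set) -> asyncio.Task:
    """Fire-and-forget: run summarization without blocking the response path."""
    task = asyncio.create_task(summarize_turn(user_msg, assistant_msg))
    background.add(task)  # hold a strong reference so the task is not garbage-collected
    task.add_done_callback(background.discard)
    return task
```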


2. ETL Pipeline & Semantic Chunking

The data pipeline ingests two Kaggle datasets, the Netflix Shows dataset (~8,800 titles) and the IMDb dataset (~1,000 movies), and transforms them into a unified schema before indexing into Qdrant.

Pipeline Steps

  1. Download: Datasets are auto-downloaded via kagglehub on first run.
  2. Load & Parse: CSVs are loaded with Polars for fast, parallel DataFrame processing. Fields like duration (regex extraction), cast/genre (CSV parsing), and media type (normalization to movie/series) are cleaned.
  3. Unify: Both datasets are mapped into a common MediaItem dataclass with fields: title, media_type, director, genre, cast, duration_min, description.
  4. Semantic Document Building: Each MediaItem becomes a single Document with structured natural-language content concatenating all fields. Metadata is preserved as a separate payload for Qdrant filtering.
  5. Batch Indexing: Documents are uploaded to Qdrant in batches of 64 with retry logic and exponential backoff.
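Step 5 can be sketched with the standard library alone. The batch size of 64 comes from the text. The attempt count, the base delay, and the names `batched`, `upload_with_retry`, and `index_all` are assumptions. `upload` stands in for whatever Qdrant client call the pipeline actually makes (for example `upsert`). The `sleep` parameter is injectable so the backoff schedule can be observed without waiting.

```python
import time
from collections.abc import Callable, Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], size: int = 64) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def upload_with_retry(
    upload: Callable[[Sequence[T]], None],
    batch: Sequence[T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call upload(batch), retrying with exponential backoff on failure."""
    for attempt in range(max_attempts):
        try:
            upload(batch)
            return
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # 0.5 s, 1 s, 2 s, 4 s, ...


def index_all(items: Sequence[T], upload: Callable[[Sequence[T]], None], **kwargs) -> int:
    """Upload every batch and return how many batches were sent."""
    count = 0
    for batch in batched(items, 64):
        upload_with_retry(upload, batch, **kwargs)
        count += 1
    return count
```

Retrying per batch means a transient Qdrant hiccup costs only the failing batch, not the whole indexing run.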

2a. Why One Document Per Item (No Fragmentation)

Unlike traditional RAG systems that chunk long documents into overlapping windows, this pipeline uses entity-level semantic chunking: each movie or series is a single document. This is a deliberate choice. A movie's metadata (title, director, cast, genre, description) forms a coherent semantic unit that should never be split across chunks. With splitting, the retriever could surface a chunk containing an actor's name but not the genre context, degrading recommendation quality.

For a comprehensive comparison of chunking strategies, from fixed-size windows to semantic and agentic approaches, see my blog post on Chunking Strategies for RAG.

from langchain_core.documents import Document

# MediaItem is the unified dataclass from step 3 (defined elsewhere in the pipeline).

def build_semantic_documents_from_media_item(item: MediaItem) -> Document:
    # One "Field: value" line per populated field; the description is embedded only here.
    parts = [f"Title: {item.title or 'Unknown'}"]
    if item.media_type:
        parts.append(f"Type: {item.media_type}")
    if item.director:
        parts.append(f"Director: {item.director}")
    if item.genre:
        parts.append(f"Genres: {', '.join(item.genre)}")
    if item.cast:
        parts.append(f"Cast: {', '.join(item.cast)}")
    if item.duration_min:
        parts.append(f"Duration: {item.duration_min} minutes")
    if item.description:
        parts.append(f"Description: {item.description}")

    # Filterable fields for the Qdrant payload
    metadata = {
        "title": item.title, "media_type": item.media_type,
        "director": item.director, "genre": item.genre,
        "cast": item.cast, "duration_min": item.duration_min,
    }
    return Document(
        page_content="\n".join(parts),
        # Drop empty values so payloads stay compact
        metadata={k: v for k, v in metadata.items() if v not in (None, "", [])},
    )

3. Hybrid Retriever: Dense + Sparse + Reranking

The retriever is the heart of the RAG pipeline. It implements a three-stage hybrid search using Qdrant's native fusion capabilities, combining the strengths of dense semantic search, sparse lexical matching, and cross-encoder reranking.

For the theory behind dense retrieval, sparse retrieval (BM25/SPLADE), and reranking, including how cosine similarity, Reciprocal Rank Fusion, and cross-encoders work under the hood, see my blog post on Information Retrieval: Dense, Sparse & Reranking.

[Figure: Hybrid search pipeline]

Search Stages

  1. Dense Search: Encodes the query with paraphrase-multilingual-MiniLM-L12-v2 (SentenceTransformers) and performs cosine similarity search against the dense vector index. This captures semantic meaning: "suspenseful thriller" can match movies described as "edge-of-your-seat tension".
  2. Sparse Search: Encodes the query with Splade_PP_en_v1 to obtain learned sparse representations. This captures lexical overlap, such as exact actor names, specific titles, or niche genre terms that dense embeddings might miss.
  3. Reciprocal Rank Fusion (RRF): Qdrant's built-in FusionQuery merges both ranked lists using RRF, which rewards documents that rank highly in multiple lists without being biased by differences in score scale. The top 15 fused candidates are kept for reranking.
  4. Cross-Encoder Reranking: The top 15 fused results are re-scored with jinaai/jina-reranker-v2-base-multilingual, a cross-encoder that reads the query and document together (unlike bi-encoders, which encode them independently). Scores are sigmoid-normalized to [0, 1] for consistent thresholding, and the final top 5 are returned.
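Stage 3 can be illustrated with a few lines of pure Python. Each document's fused score is the sum of 1 / (k + rank) over the lists it appears in, so only ranks matter and raw score scales never enter the formula. The sketch uses the conventional k = 60 from the original RRF formulation. That is an assumption for illustration: Qdrant's internal constant may differ, and in the project this fusion runs server-side rather than in Python.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60, limit: int = 15) -> list[tuple[str, float]]:
    """Fuse ranked ID lists: score(d) = sum over lists of 1 / (k + rank_d), ranks start at 1."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    fused = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return fused[:limit]
```

For dense results `["a", "b", "c"]` and sparse results `["c", "a", "d"]`, the fused order is a, c, b, d. Documents found by both retrievers rise above those found by only one.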

Model Stack

| Component | Model | Purpose |
|---|---|---|
| Dense Embeddings | paraphrase-multilingual-MiniLM-L12-v2 | Semantic similarity search |
| Sparse Embeddings | Splade_PP_en_v1 | Learned lexical matching (like BM25, but trained) |
| Reranker | jinaai/jina-reranker-v2-base-multilingual | Cross-encoder re-scoring for precision |
| Primary LLM | Google Gemma 3, gemma4:26b (Ollama via LiteLLM) | Grounded answers, general chat, re-ask wording |
| Secondary LLM | Google Gemma 3, gemma4:e4b (Ollama via LiteLLM) | Router JSON, contextualize, background summarization |

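import math

from qdrant_client import models


def _sigmoid(x: float) -> float:
    # Numerically stable logistic function: maps raw reranker logits to [0, 1].
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


async def search(self, text: str, rerank: bool = True,
                 filter: models.Filter | None = None) -> list[dict]:
    # Dense + sparse prefetch, fused server-side with RRF
    result = await self.async_qdrant_client.query_points(
        collection_name=self.collection_name,
        query=models.FusionQuery(fusion=models.Fusion.RRF),
        prefetch=[
            models.Prefetch(
                query=models.Document(text=text, model=self.dense_model_name),
                using=self.dense_vector_name,
            ),
            models.Prefetch(
                query=models.Document(text=text, model=self.sparse_model_name),
                using=self.sparse_vector_name,
            ),
        ],
        query_filter=filter,
        limit=self.prefetch_limit,  # 15 candidates for reranking
    )
    points = result.points

    if not rerank:
        # Fused order as-is; RRF scores are rank-based and not comparable to the 0.5 threshold
        return [{"score": p.score, **p.payload} for p in points[:self.final_limit]]

    # Cross-encoder reranking over the fused candidates.
    # The payload key must match the one written at indexing time.
    texts = [p.payload["page-content"] for p in points]
    raw_scores = list(self.reranker.rerank(text, texts))
    norm_scores = [_sigmoid(s) for s in raw_scores]  # normalize to [0, 1]
    ranking = sorted(enumerate(norm_scores), key=lambda x: x[1], reverse=True)
    return [{"score": score, **points[i].payload}
            for i, score in ranking[:self.final_limit]]  # top 5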
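Reranker scores pass through a sigmoid before the 0.5 threshold is applied. Since σ(0) = 0.5 and σ is strictly increasing, the quality gate is equivalent to requiring a positive raw score from the cross-encoder. This assumes the reranker returns unnormalized logits, as the normalization step implies. The short sketch below makes that concrete and inverts the sigmoid to show which raw score a different threshold would demand. `passes_gate` and `threshold_to_logit` are illustrative helpers, not project code.

```python
import math


def sigmoid(x: float) -> float:
    # Numerically stable logistic function.
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def passes_gate(raw_logit: float, threshold: float = 0.5) -> bool:
    """Apply the retrieval quality gate to a single raw reranker score."""
    return sigmoid(raw_logit) >= threshold


def threshold_to_logit(p: float) -> float:
    """Inverse sigmoid (logit): the raw score that maps to probability p."""
    return math.log(p / (1.0 - p))
```

For example, raising RETRIEVAL_SCORE_THRESHOLD to 0.7 would require a raw logit of at least ln(0.7 / 0.3), about 0.85.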

4. Redis-Backed Streaming, WebSockets & Resumability

The client still talks over a persistent WebSocket, but the server decouples generation from the socket. When a user sends start_conversation or message, the handler spawns generate_to_redis: a background coroutine that opens its own async SQLAlchemy session, runs app_graph.astream_events, and appends every UI event to a Redis Stream (namespaced by message_id). A lightweight relay task on the WebSocket connection reads that stream and forwards entries to the browser.

If the tab reloads or the socket drops mid-stream, inference keeps running. On reconnect, the client can send resume_conversation (rebind to a conversation and attach to its active generation, if any) or resume_stream with the last known Redis stream id to replay the tail events without losing tokens. Sending interrupt sets a Redis flag that the generation loop checks between streamed events, so the user can still stop generation cooperatively.

WebSocket Event Protocol

| Event | Emitted When | Payload |
|---|---|---|
| generation_started | Background task claimed the message id | message_id string |
| thinking_start / thinking_end | Bracket the LangGraph execution phase | null |
| graph_start / graph_end | LangGraph astream_events loop begins / completes | null |
| node_start / node_end | A graph node begins or finishes executing | Node name (e.g. contextualize, router, retrieve) |
| node_output | A node completes with metadata | JSON: decision, media_type, documents_count, etc. |
| response_chunk | A token is streamed from the LLM | Token string |
| response_done | Generation is complete | Empty string |
| interrupt_ack | User interruption was processed | null |

4a. Interruption vs disconnect

When the user clicks stop, the client sends interrupt, which sets a Redis-backed interrupt flag keyed by message_id. The detached generate_to_redis loop consults that flag between events; on interrupt it publishes tail events (including an "[message interrupted by the user]" suffix), flushes partial content to PostgreSQL, and clears Redis bookkeeping.

A plain WebSocket disconnect is different: the relay task is cancelled, but generation is intentionally left running. Tokens keep landing in the Redis stream, so a later resume_stream can catch up. That is what makes refresh-tolerant streaming possible.
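The difference between the two cases comes down to which task is touched. The asyncio sketch below models it with in-process stand-ins. A list plays the role of the Redis stream, and an `asyncio.Event` plays the role of the Redis interrupt flag. The names `generate`, `relay`, `simulate_disconnect`, and `simulate_interrupt` are hypothetical. A disconnect cancels only the relay, and the producer finishes. An interrupt sets the flag, which the producer checks between events before appending the suffix and stopping.

```python
import asyncio

INTERRUPT_SUFFIX = "[message interrupted by the user]"


async def generate(tokens: list[str], stream: list[str], interrupted: asyncio.Event) -> str:
    """Detached producer: append tokens until finished or until the interrupt flag is set."""
    for token in tokens:
        if interrupted.is_set():  # cooperative check between events
            stream.append(INTERRUPT_SUFFIX)
            return "interrupted"
        stream.append(token)
        await asyncio.sleep(0.01)  # stands in for waiting on the next LLM token
    return "completed"


async def relay(stream: list[str], forwarded: list[str]) -> None:
    """Per-connection consumer: forward new entries to the client until cancelled."""
    sent = 0
    while True:
        while sent < len(stream):
            forwarded.append(stream[sent])
            sent += 1
        await asyncio.sleep(0.005)


async def simulate_disconnect(tokens: list[str]) -> tuple[str, list[str], list[str]]:
    stream: list[str] = []
    forwarded: list[str] = []
    flag = asyncio.Event()
    gen = asyncio.create_task(generate(tokens, stream, flag))
    rel = asyncio.create_task(relay(stream, forwarded))
    await asyncio.sleep(0.015)
    rel.cancel()  # socket dropped: only the relay is cancelled
    return await gen, stream, forwarded  # generation still runs to completion


async def simulate_interrupt(tokens: list[str]) -> tuple[str, list[str]]:
    stream: list[str] = []
    flag = asyncio.Event()
    gen = asyncio.create_task(generate(tokens, stream, flag))
    await asyncio.sleep(0.015)
    flag.set()  # user pressed stop
    return await gen, stream
```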


5. LiteLLM: Unified LLM Gateway

The system uses LiteLLM as a proxy layer between the application and the LLM providers. Instead of coupling directly to a specific provider's SDK, all LLM calls go through LiteLLM's OpenAI-compatible API. This means the backend can switch from Ollama (local) to OpenAI, Anthropic, vLLM, or any other supported provider by changing a single YAML configuration, with no code changes required.

The active stack targets Google Gemma 3-class open weights served locally: a primary route for large, fluent completions and a secondary route for fast, low-temperature structured tasks (router JSON, contextualize, summarization). In the checked-in Docker setup both routes hit Ollama with the tags below.

For a detailed walkthrough of LiteLLM's architecture, proxy modes, and configuration, see my blog post on LiteLLM: One Interface for Every LLM Provider.

model_list:
  - model_name: primary-llm
    litellm_params:
      model: ollama/gemma4:26b
      api_base: http://ollama:11434

  - model_name: secondary-llm
    litellm_params:
      model: ollama/gemma4:e4b
      api_base: http://ollama:11434
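Because the proxy speaks the OpenAI chat-completions format, any HTTP client can address a route by its model_name alias. The stdlib-only sketch below builds such a request against LiteLLM's `/v1/chat/completions` endpoint, using the service name and port from the Compose setup. The `build_chat_request` helper and the bearer key are hypothetical. An Authorization header is only needed if the proxy is configured with a master key, and the backend's actual client library is not specified here.

```python
import json
import urllib.request

LITELLM_BASE_URL = "http://litellm:4000"  # Compose service name and port


def build_chat_request(model_alias: str, messages: list[dict],
                       api_key: str = "sk-hypothetical",
                       temperature: float = 0.0) -> urllib.request.Request:
    """Build an OpenAI-style chat completion request for the LiteLLM proxy."""
    payload = {"model": model_alias, "messages": messages, "temperature": temperature}
    return urllib.request.Request(
        url=f"{LITELLM_BASE_URL}/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
        method="POST",
    )
```

Inside the Docker network, `urllib.request.urlopen(build_chat_request("secondary-llm", [...]))` would send the call. Swapping providers changes only the YAML, not this request.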

6. Infrastructure: Fully Containerized

The entire system is orchestrated with Docker Compose: a single docker compose up spins up seven core services (plus an optional qdrant-init profile) with proper networking and health checks:

Docker Compose Services

| Service | Technology | Port | Role |
|---|---|---|---|
| qdrant | Qdrant | 6333 | Vector database for hybrid search |
| postgres | PostgreSQL 15 | 5432 | Conversation and message persistence |
| redis | Redis 7 (AOF) | 6379 | Stream bus for resumable generation events + interrupt flags |
| ollama | Ollama | 11434 | Local Gemma inference |
| litellm | LiteLLM Proxy | 4000 | Unified LLM gateway |
| backend | FastAPI (Python 3.13) | 8000 | WebSocket API, Redis relay, LangGraph workers |
| frontend | React + Vite | 5173 | Chat interface + LangGraph visualization |

7. Frontend: Live Graph Visualization

The frontend is built with React, TypeScript, Vite, and Tailwind CSS. Beyond the standard chat interface with markdown rendering, message history, and conversation management, the standout feature is the LangGraph Panel, a real-time visualization that shows the agent's decision path as it executes.

As stream events arrive (graph_start, node_start, node_end, node_output, …), the panel updates a visual graph with color-coded nodes in the same conceptual order as production: contextualize → router → (retrieve | generate_general) → …. SVG edges show the RETRIEVE vs GENERAL split. Each node card surfaces router decisions, media-type badges, retriever stats, or the contextualized question.
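The panel is React/TypeScript, but its core logic is a fold over the event stream. Here it is sketched in Python to keep one language for the examples. The state shape and the function names `initial_panel`, `apply_event`, and `fold` are hypothetical. The sketch also assumes each node_output payload names its node, which the protocol table does not state.

```python
from enum import Enum


class NodeStatus(str, Enum):
    IDLE = "idle"
    RUNNING = "running"
    DONE = "done"


NODES = ["contextualize", "router", "retrieve",
         "generate_retrieve", "generate_general", "reask_user"]


def initial_panel() -> dict:
    return {"nodes": {n: NodeStatus.IDLE for n in NODES}, "details": {}, "path": []}


def apply_event(panel: dict, event: dict) -> dict:
    """Fold one stream event into the panel state (mutates and returns `panel`)."""
    kind, data = event["type"], event.get("data")
    if kind == "graph_start":
        return initial_panel()  # new turn: reset the visualization
    if kind == "node_start":
        panel["nodes"][data] = NodeStatus.RUNNING
        panel["path"].append(data)
    elif kind == "node_end":
        panel["nodes"][data] = NodeStatus.DONE
    elif kind == "node_output":
        # Assumption: the payload carries a "node" key alongside decision, media_type, etc.
        panel["details"][data["node"]] = data
    return panel


def fold(events: list[dict]) -> dict:
    panel = initial_panel()
    for event in events:
        panel = apply_event(panel, event)
    return panel
```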

The WebSocket client implements automatic reconnection with exponential backoff and, on success, issues resume_conversation / resume_stream so a dropped tab can reattach to Redis-backed progress instead of restarting the answer from scratch.