
Building Production RAG: An End-to-End Implementation Guide


From Prototype to Production: What It Actually Takes

Most RAG tutorials stop at “put documents in a vector database and query them.” That gets you a demo. It does not get you a system that handles messy real-world documents, scales under load, and returns answers your users trust.

Production RAG has eight distinct stages, each with its own failure modes. A weakness at any stage cascades downstream: poor chunking poisons retrieval, weak retrieval starves the generator, and without evaluation you never know which stage is broken.

This guide walks through every stage of a production RAG pipeline, with practical code and the pitfalls that derail teams. If you have read our earlier posts on chunking strategies and re-ranking techniques, this is the architecture-level view that ties those pieces together.

Stage 1: Data Ingestion

Ingestion is the unsexy foundation. Your RAG system is only as good as what it can read.

The challenge: Real document corpora include PDFs with embedded tables, HTML with boilerplate navigation, Markdown with code blocks, scanned images, and spreadsheets. Each format requires dedicated parsing.

from pathlib import Path
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    CSVLoader,
)

LOADER_MAP = {
    ".pdf": PyPDFLoader,
    ".html": UnstructuredHTMLLoader,
    ".md": UnstructuredMarkdownLoader,
    ".csv": CSVLoader,
}

def load_documents(directory: str):
    """Load documents with format-specific parsers."""
    documents = []
    for path in Path(directory).rglob("*"):
        loader_cls = LOADER_MAP.get(path.suffix.lower())
        if loader_cls:
            try:
                docs = loader_cls(str(path)).load()
                for doc in docs:
                    doc.metadata["source"] = str(path)
                    doc.metadata["format"] = path.suffix.lower()
                documents.extend(docs)
            except Exception as e:
                print(f"Failed to load {path}: {e}")
    return documents

Common pitfalls:

  • PDF table extraction. Standard PDF loaders flatten tables into text lines. Use a dedicated table extractor (Camelot, pdfplumber) and store tables as structured data.
  • Encoding issues. Always detect encoding before parsing. A single malformed document can crash a batch job. (A minimal detection sketch follows this list.)
  • Boilerplate contamination. HTML pages carry navigation, footers, and ads. Strip boilerplate before chunking or your embeddings will be polluted with repeated noise.
  • Ignoring metadata. Capture source path, format, modification date, and document title at load time. You will need these for filtering and debugging later.
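
Encoding detection, for instance, can live in a small wrapper. A minimal sketch, assuming the charset-normalizer package (read_text_safely is a hypothetical helper, not part of the loaders above):

from pathlib import Path
from charset_normalizer import from_path

def read_text_safely(path: Path) -> str:
    """Detect encoding before decoding; fall back to UTF-8 with replacement."""
    best = from_path(path).best()
    if best is not None:
        return str(best)
    return path.read_bytes().decode("utf-8", errors="replace")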

Stage 2: Chunking

Chunking determines what your retrieval system can find. Split documents wrong and the right answer becomes unretrievable.

We covered this in depth in Advanced Chunking Strategies for RAG. Here is the production-level summary.

Strategy selection depends on document type:

Document Type             | Recommended Strategy                | Typical Chunk Size
Technical docs / Markdown | Structure-aware (heading hierarchy) | 512-1024 tokens
Legal contracts           | Clause-boundary splitting           | 256-512 tokens
Code repositories         | Function/class boundary             | Varies
Research papers           | Section-based                       | 1024-2048 tokens
Mixed corpora             | Hybrid with type detection          | 512 tokens default

from langchain.text_splitter import RecursiveCharacterTextSplitter

def create_splitter(doc_format: str):
    """Select chunking strategy based on document format."""
    if doc_format in (".md", ".html"):
        return RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=200,
            separators=["\n## ", "\n### ", "\n\n", "\n", " "],
        )
    elif doc_format == ".pdf":
        return RecursiveCharacterTextSplitter(
            chunk_size=600,
            chunk_overlap=150,
            separators=["\n\n", "\n", ". ", " "],
        )
    else:
        return RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=128,
        )

def chunk_documents(documents):
    """Chunk documents using format-appropriate strategies."""
    chunks = []
    for doc in documents:
        splitter = create_splitter(doc.metadata.get("format", ""))
        doc_chunks = splitter.split_documents([doc])
        for i, chunk in enumerate(doc_chunks):
            chunk.metadata["chunk_index"] = i
            chunk.metadata["total_chunks"] = len(doc_chunks)
        chunks.extend(doc_chunks)
    return chunks

Common pitfalls:

  • One-size-fits-all chunking. A 512-token fixed split works acceptably for homogeneous text. It fails on tables, code, and structured documents.
  • No overlap. Zero overlap means information at chunk boundaries is effectively lost. Use 15-25% overlap as a baseline.
  • Ignoring chunk-to-document relationships. Always store the parent document ID and chunk position. You need these for context expansion at retrieval time, as in the sketch below.
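
Context expansion is short once the ID scheme exists. A sketch, assuming the Chroma collection and the source + chunk-index IDs from Stage 4 (expand_context is a hypothetical helper):

def expand_context(collection, chunk_metadata: dict, window: int = 1) -> str:
    """Fetch a hit's neighboring chunks to rebuild local document context."""
    source = chunk_metadata["source"]
    idx = chunk_metadata["chunk_index"]
    neighbor_ids = [
        f"{source}_{i}"
        for i in range(max(0, idx - window), idx + window + 1)
    ]
    # Chroma's get() returns only IDs that exist, so out-of-range
    # neighbors at document edges are skipped silently.
    result = collection.get(ids=neighbor_ids)
    return "\n".join(result["documents"])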

Stage 3: Embedding

Embedding converts chunks into vectors that capture semantic meaning. Model selection and processing decisions here directly affect retrieval quality.

from openai import OpenAI
import numpy as np
import time

client = OpenAI()

def embed_chunks(chunks, model="text-embedding-3-small", batch_size=100):
    """Embed chunks in batches, retrying transient API errors with backoff."""
    embeddings = []
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        texts = [chunk.page_content for chunk in batch]
        for attempt in range(3):
            try:
                response = client.embeddings.create(input=texts, model=model)
                break
            except Exception:
                if attempt == 2:
                    raise
                time.sleep(2 ** attempt)  # simple exponential backoff
        batch_embeddings = [item.embedding for item in response.data]
        embeddings.extend(batch_embeddings)
    return np.array(embeddings)

Model selection considerations:

Model                           | Dimensions | Strengths                            | Trade-offs
text-embedding-3-small (OpenAI) | 1536       | Strong general performance, easy API | Vendor lock-in, per-token cost
text-embedding-3-large (OpenAI) | 3072       | Best-in-class accuracy               | Higher cost, larger storage
BGE-large-en-v1.5               | 1024       | Open-source, self-hosted             | Requires GPU infrastructure
E5-mistral-7b-instruct          | 4096       | Instruction-following, high quality  | Heavy compute requirement

Common pitfalls:

  • Not batching. Embedding one chunk at a time is 10-50x slower than batching. Always batch.
  • Ignoring dimensionality trade-offs. Higher dimensions capture more nuance but increase storage and search cost. For most use cases, 1024-1536 dimensions are sufficient.
  • Inconsistent embedding models. If you embed documents with model A, you must query with model A. Mixing models produces meaningless similarity scores.
  • No normalization. Some models return unnormalized vectors. Normalize to unit length if your similarity metric is cosine similarity; a short sketch follows this list.
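
Normalization itself is a few lines of NumPy (a sketch; the epsilon guards against zero vectors):

def normalize(embeddings: np.ndarray) -> np.ndarray:
    """Scale each row to unit length so dot product equals cosine similarity."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings / np.clip(norms, 1e-12, None)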

Stage 4: Storage

Your vector database is the retrieval engine. The choice affects query latency, scalability, and what filtering operations are possible.

import chromadb

def create_vector_store(chunks, embeddings, collection_name="production_rag"):
    """Store chunks and embeddings with metadata."""
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_or_create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"},
    )

    ids = [f"{chunk.metadata['source']}_{chunk.metadata['chunk_index']}"
           for chunk in chunks]
    documents = [chunk.page_content for chunk in chunks]
    metadatas = [chunk.metadata for chunk in chunks]

    collection.add(
        ids=ids,
        embeddings=embeddings.tolist(),
        documents=documents,
        metadatas=metadatas,
    )
    return collection

Indexing strategies matter at scale:

  • HNSW (Hierarchical Navigable Small World). Best for most use cases. Fast approximate nearest neighbor search with tunable recall; a tuning sketch follows this list.
  • IVF (Inverted File Index). Better for very large datasets (100M+ vectors). Requires training on your data.
  • Flat/brute-force. Only viable under ~100K vectors. Exact results but does not scale.
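
In Chroma, the HNSW trade-offs are exposed through the same hnsw: metadata convention used in Stage 4. A sketch; the values are illustrative starting points rather than recommendations, and key support varies by Chroma version:

collection = client.get_or_create_collection(
    name="production_rag_tuned",
    metadata={
        "hnsw:space": "cosine",
        "hnsw:construction_ef": 200,  # build-time graph quality vs. indexing speed
        "hnsw:search_ef": 100,        # query-time recall vs. latency
        "hnsw:M": 16,                 # connectivity: higher recall, more memory
    },
)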

Common pitfalls:

  • No metadata indexing. If you need to filter by date, source, or document type, ensure your vector database supports metadata filtering and that you have indexed those fields.
  • Ignoring persistence. In-memory vector stores lose data on restart. Use persistent storage or a managed service for anything beyond prototyping.
  • Skipping ID strategy. Without deterministic IDs, re-indexing creates duplicates. Use source + chunk index as a compound key.

Stage 5: Retrieval

Retrieval is where your pipeline either finds the right information or fails. Semantic search alone is not enough for production systems.

Hybrid search combines dense semantic retrieval with sparse keyword matching (BM25), so you capture both meaning and exact terms:

from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    """Combine BM25 keyword search with semantic vector search."""

    def __init__(self, collection, chunks):
        self.collection = collection
        self.chunks = chunks
        tokenized = [chunk.page_content.split() for chunk in chunks]
        self.bm25 = BM25Okapi(tokenized)

    def retrieve(self, query: str, top_k: int = 10, alpha: float = 0.5):
        """Hybrid retrieval with tunable keyword/semantic balance."""
        # Semantic search: embed the query with the same model used for the
        # documents (`client` is the OpenAI client from Stage 3) -- mixing
        # embedding models produces meaningless similarity scores.
        query_embedding = client.embeddings.create(
            input=[query], model="text-embedding-3-small"
        ).data[0].embedding
        semantic_results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k * 2,
        )
        semantic_scores = {
            doc_id: 1 - dist  # Convert cosine distance to similarity
            for doc_id, dist in zip(
                semantic_results["ids"][0],
                semantic_results["distances"][0],
            )
        }

        # BM25 keyword search (scores normalized to [0, 1] for blending)
        bm25_scores_raw = self.bm25.get_scores(query.split())
        top_bm25_idx = np.argsort(bm25_scores_raw)[-top_k * 2:]
        max_bm25 = bm25_scores_raw.max() or 1.0  # guard against divide-by-zero
        bm25_scores = {}
        for i in top_bm25_idx:
            chunk_id = (f"{self.chunks[i].metadata['source']}"
                        f"_{self.chunks[i].metadata['chunk_index']}")
            bm25_scores[chunk_id] = bm25_scores_raw[i] / max_bm25

        # Combine scores
        all_ids = set(semantic_scores) | set(bm25_scores)
        combined = {
            doc_id: alpha * semantic_scores.get(doc_id, 0)
            + (1 - alpha) * bm25_scores.get(doc_id, 0)
            for doc_id in all_ids
        }

        ranked = sorted(combined.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]

Query expansion improves recall for ambiguous queries:

def expand_query(query: str, llm_client) -> list[str]:
    """Generate query variations to improve retrieval coverage."""
    response = llm_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"Generate 3 alternative phrasings of this search query. "
                       f"Return only the queries, one per line.\n\nQuery: {query}",
        }],
    )
    variations = response.choices[0].message.content.strip().split("\n")
    return [query] + [v.strip() for v in variations if v.strip()]
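
One way to use the variations: run retrieval once per variant and keep each document's best score. A sketch assuming the HybridRetriever above (retrieve_with_expansion is a hypothetical helper):

def retrieve_with_expansion(retriever, query: str, llm_client, top_k: int = 10):
    """Merge hybrid-retrieval results across query variations by max score."""
    merged = {}
    for variant in expand_query(query, llm_client):
        for doc_id, score in retriever.retrieve(variant, top_k=top_k):
            merged[doc_id] = max(score, merged.get(doc_id, 0.0))
    return sorted(merged.items(), key=lambda x: x[1], reverse=True)[:top_k]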

Common pitfalls:

  • Semantic search only. Pure vector search misses exact keyword matches. A query for “error code 5012” needs BM25 to find the exact code.
  • Fixed top-k. Retrieving exactly 5 chunks regardless of query complexity leaves information on the table. Retrieve more than you need and let re-ranking handle precision.
  • No metadata filtering. If your corpus spans multiple products or time periods, always filter before searching, as in the sketch below. Retrieving outdated documentation is worse than retrieving nothing.
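
In Chroma, filtering happens through the where argument. A sketch, reusing the client and collection from earlier stages (the query text and filter value are illustrative):

query_vec = client.embeddings.create(
    input=["how do I rotate API keys?"], model="text-embedding-3-small"
).data[0].embedding

results = collection.query(
    query_embeddings=[query_vec],
    n_results=10,
    where={"format": ".md"},  # restrict the search space before ranking
)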

Stage 6: Re-ranking

Initial retrieval casts a wide net. Re-ranking applies a more expensive, more accurate model to sort the candidates.

We covered this thoroughly in Re-ranking Strategies for RAG. The key production patterns:

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")

def rerank(query: str, candidates: list[dict], top_k: int = 5) -> list[dict]:
    """Re-rank retrieved candidates using a cross-encoder."""
    pairs = [[query, candidate["content"]] for candidate in candidates]
    scores = reranker.predict(pairs)

    for candidate, score in zip(candidates, scores):
        candidate["rerank_score"] = float(score)

    ranked = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)
    return ranked[:top_k]

When to add re-ranking:

  • Your initial retrieval returns 20+ candidates and you need to narrow to 3-5.
  • Precision matters more than latency (enterprise search, compliance, legal).
  • You observe that relevant documents appear in positions 5-15 but rarely in the top 3.

Common pitfalls:

  • Re-ranking too few candidates. If you retrieve 5 and re-rank 5, you gain nothing. Retrieve 20-50, re-rank to 3-5.
  • Ignoring latency. Cross-encoder re-ranking adds 50-200ms per query. For real-time applications, batch candidates and use smaller cross-encoder models, as in the sketch below.
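
A latency-conscious variant (a sketch; cross-encoder/ms-marco-MiniLM-L-6-v2 is a smaller, faster sibling of the L-12 model used above):

fast_reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank_fast(query: str, candidates: list[dict], top_k: int = 5) -> list[dict]:
    """Same re-ranking logic with a lighter model and explicit batching."""
    pairs = [[query, c["content"]] for c in candidates]
    scores = fast_reranker.predict(pairs, batch_size=32)
    for candidate, score in zip(candidates, scores):
        candidate["rerank_score"] = float(score)
    return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:top_k]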

Stage 7: Generation

Generation is where retrieved context meets the language model. Prompt construction and context management determine whether your system produces accurate, grounded answers or hallucinated noise.

def build_prompt(query: str, contexts: list[dict]) -> str:
    """Construct a prompt with numbered source attribution."""
    context_block = ""
    for i, ctx in enumerate(contexts, 1):
        source = ctx.get("source", "Unknown")
        context_block += f"[Source {i}: {source}]\n{ctx['content']}\n\n"

    return f"""Answer the user's question based on the provided sources.
Rules:
- Only use information from the provided sources.
- Cite sources using [Source N] notation.
- If the sources do not contain enough information, say so explicitly.

Sources:
{context_block}

Question: {query}

Answer:"""


def generate_answer(query: str, contexts: list[dict], llm_client):
    """Generate a grounded answer with citations."""
    prompt = build_prompt(query, contexts)

    response = llm_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a precise research assistant."},
            {"role": "user", "content": prompt},
        ],
        temperature=0.1,
    )
    return response.choices[0].message.content

Context window management:

  • Budget your context. If your model supports 128K tokens, do not fill it. More context means more noise and higher latency. 3-5 focused chunks typically outperform 20 loosely related ones.
  • Order matters. Place the most relevant chunks first. Models attend more strongly to early context.
  • Truncate gracefully. If total context exceeds your budget, drop the lowest-ranked chunks rather than truncating mid-chunk; a budgeting sketch follows this list.
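
Token budgeting with whole-chunk truncation, as a sketch assuming the tiktoken package (the 4,000-token budget is illustrative):

import tiktoken

def fit_to_budget(contexts: list[dict], budget_tokens: int = 4000,
                  model: str = "gpt-4o") -> list[dict]:
    """Keep the highest-ranked chunks that fit whole within the token budget."""
    enc = tiktoken.encoding_for_model(model)
    kept, used = [], 0
    for ctx in contexts:  # assumed sorted by rerank score, best first
        n_tokens = len(enc.encode(ctx["content"]))
        if used + n_tokens > budget_tokens:
            break  # drop remaining lower-ranked chunks whole
        kept.append(ctx)
        used += n_tokens
    return kept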

Common pitfalls:

  • No attribution instructions. Without explicit citation requirements, the model will blend and rephrase sources in ways that make verification impossible.
  • Temperature too high. For factual RAG responses, use temperature 0.0-0.2. Creative generation settings cause hallucination.
  • Stuffing the context window. More context is not better. Irrelevant context actively degrades answer quality.

Stage 8: Evaluation

Without evaluation, you are flying blind. RAG evaluation spans three levels: retrieval quality, generation quality, and end-to-end performance.

def recall_at_k(retrieved_ids: list[str], relevant_ids: set[str], k: int) -> float:
    """Measure what fraction of relevant documents were retrieved."""
    retrieved_set = set(retrieved_ids[:k])
    return len(retrieved_set & relevant_ids) / len(relevant_ids)


def mean_reciprocal_rank(retrieved_ids: list[str], relevant_ids: set[str]) -> float:
    """MRR: How high does the first relevant document rank?"""
    for i, doc_id in enumerate(retrieved_ids, 1):
        if doc_id in relevant_ids:
            return 1.0 / i
    return 0.0


def ndcg_at_k(retrieved_ids: list[str], relevance_scores: dict, k: int) -> float:
    """Normalized Discounted Cumulative Gain."""
    import math

    dcg = sum(
        relevance_scores.get(doc_id, 0) / math.log2(i + 2)
        for i, doc_id in enumerate(retrieved_ids[:k])
    )
    ideal_scores = sorted(relevance_scores.values(), reverse=True)[:k]
    idcg = sum(
        score / math.log2(i + 2)
        for i, score in enumerate(ideal_scores)
    )
    return dcg / idcg if idcg > 0 else 0.0

What to measure at each level:

Retrieval metrics:

  • Recall@k – Are the relevant documents in the retrieved set? Target: >0.8 at k=10.
  • MRR – Is the first relevant document ranked high? Target: >0.5.
  • NDCG@k – Are relevant documents ranked in the right order? Target: >0.6.

Generation metrics:

  • Faithfulness – Does the answer only contain information from the sources? Use an LLM-as-judge to verify claims against retrieved context; a minimal judge sketch follows this list.
  • Relevance – Does the answer address the question? Score with both automated metrics and human evaluation.
  • Completeness – Does the answer cover all aspects of the question?
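
The judge_faithfulness function used by the harness below can start as a single scoring call. A minimal sketch, assuming the OpenAI client from Stage 3; the numeric-reply prompt and fallback are illustrative:

def judge_faithfulness(answer: str, contexts: list[dict]) -> float:
    """LLM-as-judge: score 0.0-1.0 for how grounded the answer is in its sources."""
    sources = "\n\n".join(ctx["content"] for ctx in contexts)
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"Sources:\n{sources}\n\nAnswer:\n{answer}\n\n"
                       "On a scale from 0.0 to 1.0, how faithful is the answer "
                       "to the sources? Reply with only the number.",
        }],
        temperature=0.0,
    )
    try:
        return float(response.choices[0].message.content.strip())
    except ValueError:
        return 0.0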

End-to-end evaluation:

def evaluate_rag_pipeline(test_cases: list[dict], pipeline) -> dict:
    """Run end-to-end evaluation on a test set.

    Assumes pipeline.retrieve() returns dicts with "id" and "content" keys
    and pipeline.generate() returns an answer string.
    """
    results = {"recall": [], "mrr": [], "faithfulness": []}

    for case in test_cases:
        query = case["query"]
        relevant_ids = set(case["relevant_doc_ids"])

        # Run pipeline
        retrieved = pipeline.retrieve(query, top_k=10)
        retrieved_ids = [r["id"] for r in retrieved]
        answer = pipeline.generate(query, retrieved[:5])

        # Retrieval metrics
        results["recall"].append(recall_at_k(retrieved_ids, relevant_ids, k=10))
        results["mrr"].append(mean_reciprocal_rank(retrieved_ids, relevant_ids))

        # Faithfulness (LLM-as-judge)
        faithfulness_score = judge_faithfulness(answer, retrieved[:5])
        results["faithfulness"].append(faithfulness_score)

    return {
        metric: sum(scores) / len(scores)
        for metric, scores in results.items()
    }
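
The harness expects test cases shaped like this (an illustrative example; the IDs follow the source + chunk-index scheme from Stage 4):

test_cases = [
    {
        "query": "What is the retention period for audit logs?",
        "relevant_doc_ids": ["policies/retention.md_3", "policies/retention.md_4"],
    },
]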

Common pitfalls:

  • No test set. You cannot evaluate without labeled query-document pairs. Build a test set of 50-100 queries with known relevant documents before optimizing.
  • Only measuring retrieval. High retrieval recall with low faithfulness means your generator is hallucinating despite having good context.
  • Evaluating once. Evaluation must be continuous. As your corpus grows and user queries evolve, performance drifts.

Putting It All Together

Here is the complete pipeline, wired end to end:

class ProductionRAGPipeline:
    """End-to-end RAG pipeline with all eight stages."""

    def __init__(self, doc_directory: str):
        # Stage 1: Ingest
        documents = load_documents(doc_directory)

        # Stage 2: Chunk
        self.chunks = chunk_documents(documents)

        # Stage 3: Embed
        embeddings = embed_chunks(self.chunks)

        # Stage 4: Store
        self.collection = create_vector_store(self.chunks, embeddings)

        # Deterministic chunk-ID -> chunk lookup, matching the Stage 4 ID scheme
        self._chunk_by_id = {
            f"{c.metadata['source']}_{c.metadata['chunk_index']}": c
            for c in self.chunks
        }

        # Stage 5: Retrieval engine
        self.retriever = HybridRetriever(self.collection, self.chunks)

        # Stage 6: Re-ranking reuses the module-level rerank() helper from above

        # Stage 7: LLM client
        self.llm_client = OpenAI()

    def query(self, question: str, top_k: int = 5) -> str:
        # Retrieve a wide candidate set (4x) so re-ranking has room to work
        candidates = self.retriever.retrieve(question, top_k=top_k * 4)

        # Re-rank
        candidate_docs = [
            {"id": cid, "content": self._get_content(cid), "source": cid}
            for cid, score in candidates
        ]
        reranked = rerank(question, candidate_docs, top_k=top_k)

        # Generate
        return generate_answer(question, reranked, self.llm_client)

    def _get_content(self, chunk_id: str) -> str:
        """Resolve a chunk ID back to its text via the lookup built in __init__."""
        return self._chunk_by_id[chunk_id].page_content
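
Hypothetical usage, assuming a directory of documents at ./corpus:

pipeline = ProductionRAGPipeline("./corpus")
print(pipeline.query("What changed in the v2 authentication flow?"))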

Each stage is a module you can test, swap, and optimize independently. Start with the simplest implementation at each stage, measure with your evaluation suite, and improve the weakest link.

Where Teams Get Stuck

After working with teams building production RAG, the pattern is consistent. The prototype works in a week. The next three months are spent on:

  1. Document parsing edge cases. The 5% of documents that break your parser cause 50% of bad answers.
  2. Chunking for heterogeneous corpora. A single strategy rarely works when your corpus includes contracts, code, and conversation logs.
  3. Evaluation infrastructure. Teams that skip evaluation end up debugging by reading individual query results, which does not scale.
  4. Drift. The corpus grows, user behavior shifts, and what worked at 10K documents breaks at 500K.

The teams that ship reliable RAG systems are the ones that invest in evaluation early and treat each pipeline stage as independently testable.

Building RAG Pipelines in Calliope

Calliope’s AI Lab provides the notebook and tooling environment where teams build, test, and iterate on each pipeline stage, from document parsing experiments to evaluation harness development. For teams that need RAG-powered chat out of the box, Chat Studio offers configurable retrieval with built-in chunking, hybrid search, and metadata filtering, letting you move from uploaded documents to a working RAG interface without managing infrastructure.

The code patterns in this guide run directly in both environments. Start in the AI Lab to prototype, validate with evaluation metrics, and promote to Chat Studio for user-facing deployment.

Start building RAG pipelines in Calliope

Sources

  1. Lewis, P. et al. “Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks.” NeurIPS 2020.
  2. Robertson, S. & Zaragoza, H. “The Probabilistic Relevance Framework: BM25 and Beyond.” Foundations and Trends in Information Retrieval, 2009.
  3. Nogueira, R. & Cho, K. “Passage Re-ranking with BERT.” arXiv:1901.04085, 2019.
  4. Es, S. et al. “RAGAS: Automated Evaluation of Retrieval Augmented Generation.” arXiv:2309.15217, 2023.
  5. Gao, Y. et al. “Retrieval-Augmented Generation for Large Language Models: A Survey.” arXiv:2312.10997, 2023.
