
Production RAG Pipelines: Eight Stages from Ingestion to Evaluation

Most RAG tutorials stop at “put documents in a vector database and query them.” That gets you a demo. It does not get you a system that handles messy real-world documents, scales under load, and returns answers your users trust.
Production RAG has eight distinct stages, each with its own failure modes. A weakness at any stage cascades downstream: poor chunking poisons retrieval, weak retrieval starves the generator, and without evaluation you never know which stage is broken.
This guide walks through every stage of a production RAG pipeline, with practical code and the pitfalls that derail teams. If you have read our earlier posts on chunking strategies and re-ranking techniques, this is the architecture-level view that ties those pieces together.
Ingestion is the unsexy foundation. Your RAG system is only as good as what it can read.
The challenge: Real document corpora include PDFs with embedded tables, HTML with boilerplate navigation, Markdown with code blocks, scanned images, and spreadsheets. Each format requires dedicated parsing.
from pathlib import Path
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    CSVLoader,
)

LOADER_MAP = {
    ".pdf": PyPDFLoader,
    ".html": UnstructuredHTMLLoader,
    ".md": UnstructuredMarkdownLoader,
    ".csv": CSVLoader,
}

def load_documents(directory: str):
    """Load documents with format-specific parsers."""
    documents = []
    for path in Path(directory).rglob("*"):
        loader_cls = LOADER_MAP.get(path.suffix.lower())
        if loader_cls:
            try:
                docs = loader_cls(str(path)).load()
                for doc in docs:
                    doc.metadata["source"] = str(path)
                    doc.metadata["format"] = path.suffix.lower()
                documents.extend(docs)
            except Exception as e:
                print(f"Failed to load {path}: {e}")
    return documents
Common pitfalls:
- Scanned PDFs yield no text from standard parsers; route them through OCR before indexing.
- Tables and multi-column layouts lose their structure when flattened to plain text.
- Swallowing loader exceptions (as the print above does) hides corpus gaps; log failures and track a load-success rate.
Chunking determines what your retrieval system can find. Split documents wrong and the right answer becomes unretrievable.
We covered this in depth in Advanced Chunking Strategies for RAG. Here is the production-level summary.
Strategy selection depends on document type:
| Document Type | Recommended Strategy | Typical Chunk Size |
|---|---|---|
| Technical docs / Markdown | Structure-aware (heading hierarchy) | 512-1024 tokens |
| Legal contracts | Clause-boundary splitting | 256-512 tokens |
| Code repositories | Function/class boundary | Varies |
| Research papers | Section-based | 1024-2048 tokens |
| Mixed corpora | Hybrid with type detection | 512 tokens default |
from langchain.text_splitter import RecursiveCharacterTextSplitter

def create_splitter(doc_format: str):
    """Select chunking strategy based on document format."""
    if doc_format in (".md", ".html"):
        return RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=200,
            separators=["\n## ", "\n### ", "\n\n", "\n", " "],
        )
    elif doc_format == ".pdf":
        return RecursiveCharacterTextSplitter(
            chunk_size=600,
            chunk_overlap=150,
            separators=["\n\n", "\n", ". ", " "],
        )
    else:
        return RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=128,
        )

def chunk_documents(documents):
    """Chunk documents using format-appropriate strategies."""
    chunks = []
    for doc in documents:
        splitter = create_splitter(doc.metadata.get("format", ""))
        doc_chunks = splitter.split_documents([doc])
        for i, chunk in enumerate(doc_chunks):
            chunk.metadata["chunk_index"] = i
            chunk.metadata["total_chunks"] = len(doc_chunks)
        chunks.extend(doc_chunks)
    return chunks
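Before indexing, sanity-check the output. A minimal distribution check (plain Python, using the documents loaded above) catches pathological splits early:

def chunk_stats(chunks):
    """Report chunk count and a rough size distribution in characters."""
    sizes = sorted(len(c.page_content) for c in chunks)
    n = len(sizes)
    print(f"{n} chunks | min={sizes[0]} "
          f"median={sizes[n // 2]} max={sizes[-1]}")

chunk_stats(chunk_documents(documents))

A median far below your target chunk size usually means a separator is firing too aggressively; a huge max usually means a document type is falling through to the wrong splitter.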
Common pitfalls:
- Chunks that split a sentence, table, or code block midway leave no single chunk containing the full answer.
- Too little overlap loses cross-boundary context; too much inflates index size and retrieval noise.
- Applying one fixed size to every format throws away the structure the table above exploits.
Embedding converts chunks into vectors that capture semantic meaning. Model selection and processing decisions here directly affect retrieval quality.
import time

import numpy as np
from openai import OpenAI

client = OpenAI()

def embed_chunks(chunks, model="text-embedding-3-small", batch_size=100):
    """Embed chunks in batches, retrying transient API errors with backoff."""
    embeddings = []
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        texts = [chunk.page_content for chunk in batch]
        for attempt in range(3):
            try:
                response = client.embeddings.create(input=texts, model=model)
                break
            except Exception:
                if attempt == 2:
                    raise
                time.sleep(2 ** attempt)  # simple exponential backoff
        embeddings.extend(item.embedding for item in response.data)
    return np.array(embeddings)
Model selection considerations:
| Model | Dimensions | Strengths | Trade-offs |
|---|---|---|---|
| text-embedding-3-small (OpenAI) | 1536 | Strong general performance, easy API | Vendor lock-in, per-token cost |
| text-embedding-3-large (OpenAI) | 3072 | Best-in-class accuracy | Higher cost, larger storage |
| BGE-large-en-v1.5 | 1024 | Open-source, self-hosted | Requires GPU infrastructure |
| E5-mistral-7b-instruct | 4096 | Instruction-following, high quality | Heavy compute requirement |
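Whichever model you pick, verify that indexed and query-time embeddings live in the same space. A quick sanity check (numpy only; reuses the client defined above, and the three strings are illustrative):

def cosine_similarity(a, b):
    """Cosine similarity between two embedding vectors."""
    a, b = np.asarray(a), np.asarray(b)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

resp = client.embeddings.create(
    model="text-embedding-3-small",
    input=["reset a user password",
           "how do I change my login credentials?",
           "quarterly revenue by region"],
)
vecs = [d.embedding for d in resp.data]
print(cosine_similarity(vecs[0], vecs[1]))  # paraphrase pair: should score high
print(cosine_similarity(vecs[0], vecs[2]))  # unrelated pair: should score lower

If the paraphrase pair does not clearly outrank the unrelated pair on your own domain text, the model is a poor fit regardless of benchmark scores.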
Common pitfalls:
- Embedding documents with one model and queries with another produces vectors in incompatible spaces.
- Changing embedding models means re-embedding the entire corpus; budget for that before upgrading.
- Dot-product scoring over unnormalized vectors conflates magnitude with relevance; normalize or use cosine.
Your vector database is the retrieval engine. The choice affects query latency, scalability, and what filtering operations are possible.
import chromadb

def create_vector_store(chunks, embeddings, collection_name="production_rag"):
    """Store chunks and embeddings with metadata."""
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_or_create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"},
    )
    ids = [f"{chunk.metadata['source']}_{chunk.metadata['chunk_index']}"
           for chunk in chunks]
    documents = [chunk.page_content for chunk in chunks]
    metadatas = [chunk.metadata for chunk in chunks]
    collection.add(
        ids=ids,
        embeddings=embeddings.tolist(),
        documents=documents,
        metadatas=metadatas,
    )
    return collection
Indexing strategies matter at scale: Chroma's HNSW graph index (selected above via the hnsw:space metadata) gives fast approximate nearest-neighbor search; exact brute-force search is simpler and adequate below roughly a hundred thousand vectors; partition-based indexes such as IVF trade a little recall for lower memory at very large corpus sizes. A tuning sketch follows.
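As a rough sketch, HNSW behavior can be tuned through Chroma's hnsw:* collection metadata. Parameter names and defaults vary across Chroma versions, so treat these keys as illustrative and check your client's documentation:

collection = client.get_or_create_collection(
    name="production_rag_tuned",
    metadata={
        "hnsw:space": "cosine",       # distance metric
        "hnsw:construction_ef": 200,  # build-time candidates: better graph, slower build
        "hnsw:search_ef": 100,        # query-time candidates: better recall, slower queries
        "hnsw:M": 32,                 # edges per node: better recall, more memory
    },
)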
Common pitfalls:
Retrieval is where your pipeline either finds the right information or fails. Semantic search alone is not enough for production systems.
Hybrid search pairs BM25 keyword matching, which nails exact terms like error codes and product names that embeddings blur, with semantic search, which catches paraphrases:
from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    """Combine BM25 keyword search with semantic vector search."""

    def __init__(self, collection, chunks):
        self.collection = collection
        self.chunks = chunks
        # Naive whitespace tokenization; the same scheme is used at query time.
        tokenized = [chunk.page_content.split() for chunk in chunks]
        self.bm25 = BM25Okapi(tokenized)

    def retrieve(self, query: str, top_k: int = 10, alpha: float = 0.5):
        """Hybrid retrieval with tunable keyword/semantic balance."""
        # Semantic search. Embed the query with the same model used at
        # indexing time; the collection holds precomputed OpenAI vectors,
        # so Chroma's default embedding function would not match them.
        query_embedding = client.embeddings.create(
            input=[query], model="text-embedding-3-small"
        ).data[0].embedding
        semantic_results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k * 2,
        )
        semantic_scores = {
            doc_id: 1 - dist  # Convert cosine distance to similarity
            for doc_id, dist in zip(
                semantic_results["ids"][0],
                semantic_results["distances"][0],
            )
        }
        # BM25 keyword search
        bm25_scores_raw = self.bm25.get_scores(query.split())
        top_bm25_idx = np.argsort(bm25_scores_raw)[-top_k * 2:]
        max_bm25 = bm25_scores_raw.max() or 1.0  # avoid dividing by zero
        bm25_scores = {}
        for i in top_bm25_idx:
            meta = self.chunks[i].metadata
            doc_id = f"{meta['source']}_{meta['chunk_index']}"
            bm25_scores[doc_id] = bm25_scores_raw[i] / max_bm25
        # Combine normalized scores with a linear interpolation
        all_ids = set(semantic_scores) | set(bm25_scores)
        combined = {
            doc_id: alpha * semantic_scores.get(doc_id, 0)
            + (1 - alpha) * bm25_scores.get(doc_id, 0)
            for doc_id in all_ids
        }
        ranked = sorted(combined.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]
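Wiring it up (collection and chunks come from the earlier stages; the alpha value and the sample query are illustrative, and alpha is something to tune against your own evaluation set rather than a universal default):

retriever = HybridRetriever(collection, chunks)
results = retriever.retrieve("rate limit error 429", top_k=10, alpha=0.4)
for doc_id, score in results[:3]:
    print(f"{score:.3f}  {doc_id}")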
Query expansion improves recall for ambiguous queries:
def expand_query(query: str, llm_client) -> list[str]:
    """Generate query variations to improve retrieval coverage."""
    response = llm_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"Generate 3 alternative phrasings of this search query. "
                       f"Return only the queries, one per line.\n\nQuery: {query}",
        }],
    )
    variations = response.choices[0].message.content.strip().split("\n")
    return [query] + [v.strip() for v in variations if v.strip()]
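One way to use the variations, sketched here as max-score fusion (retrieve for each phrasing, keep each document's best score; retrieve_expanded is a hypothetical helper built on the HybridRetriever above):

def retrieve_expanded(query: str, retriever, llm_client, top_k: int = 10):
    """Retrieve for each query variation and fuse by max score per document."""
    fused: dict[str, float] = {}
    for variant in expand_query(query, llm_client):
        for doc_id, score in retriever.retrieve(variant, top_k=top_k):
            fused[doc_id] = max(fused.get(doc_id, 0.0), score)
    ranked = sorted(fused.items(), key=lambda x: x[1], reverse=True)
    return ranked[:top_k]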
Common pitfalls:
- A single global alpha rarely fits every query type; tune it against your evaluation set.
- BM25 must tokenize queries exactly as it tokenized the corpus; the whitespace split above is case-sensitive and punctuation-blind.
- Query expansion adds an LLM round trip per query; cache expansions for repeated queries.
Initial retrieval casts a wide net. Re-ranking applies a more expensive, more accurate model to sort the candidates.
We covered this thoroughly in Re-ranking Strategies for RAG. The key production patterns:
from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")

def rerank(query: str, candidates: list[dict], top_k: int = 5) -> list[dict]:
    """Re-rank retrieved candidates using a cross-encoder."""
    pairs = [[query, candidate["content"]] for candidate in candidates]
    scores = reranker.predict(pairs)
    for candidate, score in zip(candidates, scores):
        candidate["rerank_score"] = float(score)
    ranked = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)
    return ranked[:top_k]
When to add re-ranking: once first-stage retrieval returns the right document somewhere in the top 20 but not reliably in the top 5. A cross-encoder reads the query and candidate together, so it is far more accurate than vector similarity and far more expensive; apply it only to the shortlist.
Common pitfalls:
- Re-ranking cannot recover documents the first stage never retrieved; fix recall before precision.
- Latency grows linearly with candidate count; keep the candidate set small enough for interactive use.
- Cross-encoders truncate long inputs silently; chunks beyond the model's max sequence length are scored on their prefix only.
Generation is where retrieved context meets the language model. Prompt construction and context management determine whether your system produces accurate, grounded answers or hallucinated noise.
def build_prompt(query: str, contexts: list[dict]) -> str:
    """Construct a prompt with numbered source attribution."""
    context_block = ""
    for i, ctx in enumerate(contexts, 1):
        source = ctx.get("source", "Unknown")
        context_block += f"[Source {i}: {source}]\n{ctx['content']}\n\n"
    return f"""Answer the user's question based on the provided sources.

Rules:
- Only use information from the provided sources.
- Cite sources using [Source N] notation.
- If the sources do not contain enough information, say so explicitly.

Sources:
{context_block}
Question: {query}

Answer:"""

def generate_answer(query: str, contexts: list[dict], llm_client):
    """Generate a grounded answer with citations."""
    prompt = build_prompt(query, contexts)
    response = llm_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a precise research assistant."},
            {"role": "user", "content": prompt},
        ],
        temperature=0.1,
    )
    return response.choices[0].message.content
Context window management: even large context windows are a budget. Reserve tokens for the system prompt, the question, and the answer; fill the remainder with the highest-ranked contexts and drop the rest, as in the sketch below. Models also attend less reliably to the middle of long contexts, so put the strongest sources first.
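A minimal budgeting sketch with tiktoken (o200k_base is the encoding used by the gpt-4o family; the 6000-token budget is an illustrative assumption to adjust for your model and prompt overhead):

import tiktoken

ENC = tiktoken.get_encoding("o200k_base")

def fit_to_budget(contexts: list[dict], max_context_tokens: int = 6000) -> list[dict]:
    """Keep the highest-ranked contexts that fit within the token budget."""
    kept, used = [], 0
    for ctx in contexts:  # assumed already sorted by rerank score
        cost = len(ENC.encode(ctx["content"]))
        if used + cost > max_context_tokens:
            break
        kept.append(ctx)
        used += cost
    return kept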
Common pitfalls:
- Stuffing every retrieved chunk into the prompt buries the relevant one and raises cost.
- Without explicit citation rules, the model blends source content with its own pretraining knowledge.
- High temperature invites paraphrase drift; keep it low for grounded answering.
Without evaluation, you are flying blind. RAG evaluation spans three levels: retrieval quality, generation quality, and end-to-end performance.
import math

def recall_at_k(retrieved_ids: list[str], relevant_ids: set[str], k: int) -> float:
    """Measure what fraction of relevant documents were retrieved."""
    if not relevant_ids:
        return 0.0
    retrieved_set = set(retrieved_ids[:k])
    return len(retrieved_set & relevant_ids) / len(relevant_ids)

def mean_reciprocal_rank(retrieved_ids: list[str], relevant_ids: set[str]) -> float:
    """MRR: How high does the first relevant document rank?"""
    for i, doc_id in enumerate(retrieved_ids, 1):
        if doc_id in relevant_ids:
            return 1.0 / i
    return 0.0

def ndcg_at_k(retrieved_ids: list[str], relevance_scores: dict, k: int) -> float:
    """Normalized Discounted Cumulative Gain."""
    dcg = sum(
        relevance_scores.get(doc_id, 0) / math.log2(i + 2)
        for i, doc_id in enumerate(retrieved_ids[:k])
    )
    ideal_scores = sorted(relevance_scores.values(), reverse=True)[:k]
    idcg = sum(
        score / math.log2(i + 2)
        for i, score in enumerate(ideal_scores)
    )
    return dcg / idcg if idcg > 0 else 0.0
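A toy check makes the behavior concrete (the document ids are hypothetical):

retrieved = ["d3", "d1", "d7", "d2"]
relevant = {"d1", "d2"}
print(recall_at_k(retrieved, relevant, k=3))          # 0.5: only d1 is in the top 3
print(mean_reciprocal_rank(retrieved, relevant))      # 0.5: first hit at rank 2
print(ndcg_at_k(retrieved, {"d1": 3, "d2": 1}, k=4))  # graded-relevance variant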
What to measure at each level:
Retrieval metrics: recall@k, MRR, and NDCG@k (implemented above), computed against queries with labeled relevant documents.
Generation metrics: faithfulness to the retrieved sources, answer relevance, and citation accuracy, typically scored with an LLM-as-judge.
End-to-end evaluation: run the full pipeline on a held-out test set and track retrieval and generation metrics together, since a change at one stage shifts the others.
def evaluate_rag_pipeline(test_cases: list[dict], pipeline) -> dict:
    """Run end-to-end evaluation on a test set."""
    results = {"recall": [], "mrr": [], "faithfulness": []}
    for case in test_cases:
        query = case["query"]
        relevant_ids = set(case["relevant_doc_ids"])
        # Run pipeline
        retrieved = pipeline.retrieve(query, top_k=10)
        retrieved_ids = [r["id"] for r in retrieved]
        answer = pipeline.generate(query, retrieved[:5])
        # Retrieval metrics
        results["recall"].append(recall_at_k(retrieved_ids, relevant_ids, k=10))
        results["mrr"].append(mean_reciprocal_rank(retrieved_ids, relevant_ids))
        # Faithfulness (LLM-as-judge)
        faithfulness_score = judge_faithfulness(answer, retrieved[:5])
        results["faithfulness"].append(faithfulness_score)
    return {
        metric: sum(scores) / len(scores)
        for metric, scores in results.items()
    }
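judge_faithfulness is left abstract above. A minimal LLM-as-judge sketch, where the prompt wording and 0-5 scale are illustrative assumptions rather than a fixed standard (judge models have their own biases, so calibrate against human labels):

import re

def judge_faithfulness(answer: str, contexts: list[dict]) -> float:
    """Score how well an answer is supported by its sources, normalized to 0-1."""
    sources = "\n\n".join(ctx["content"] for ctx in contexts)
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"Rate 0-5 how fully this answer is supported by the "
                       f"sources. Reply with the number only.\n\n"
                       f"Sources:\n{sources}\n\nAnswer:\n{answer}",
        }],
        temperature=0,
    )
    match = re.search(r"\d+", response.choices[0].message.content)
    return min(int(match.group()), 5) / 5 if match else 0.0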
Common pitfalls:
- A test set of a dozen queries produces noise, not signal; build toward at least a hundred labeled cases drawn from real usage.
- Measuring only end-to-end scores tells you something broke, not which stage; track retrieval and generation metrics separately.
- LLM judges drift with model updates and favor verbose answers; spot-check against human labels.
Here is the complete pipeline, wired end to end:
class ProductionRAGPipeline:
    """End-to-end RAG pipeline with all eight stages."""

    def __init__(self, doc_directory: str):
        # Stage 1: Ingest
        documents = load_documents(doc_directory)
        # Stage 2: Chunk
        self.chunks = chunk_documents(documents)
        # Index chunks by the same id scheme used in the vector store.
        self._chunk_by_id = {
            f"{c.metadata['source']}_{c.metadata['chunk_index']}": c
            for c in self.chunks
        }
        # Stage 3: Embed
        embeddings = embed_chunks(self.chunks)
        # Stage 4: Store
        self.collection = create_vector_store(self.chunks, embeddings)
        # Stage 5: Retrieval engine
        self.retriever = HybridRetriever(self.collection, self.chunks)
        # Stage 6: Re-ranker (reuses the module-level cross-encoder defined earlier)
        self.reranker = reranker
        # Stage 7: LLM client
        self.llm_client = OpenAI()

    def _get_content(self, chunk_id: str) -> str:
        return self._chunk_by_id[chunk_id].page_content

    def query(self, question: str, top_k: int = 5) -> str:
        # Retrieve a wide candidate set
        candidates = self.retriever.retrieve(question, top_k=top_k * 4)
        # Re-rank
        candidate_docs = [
            {
                "id": cid,
                "content": self._get_content(cid),
                "source": self._chunk_by_id[cid].metadata["source"],
            }
            for cid, score in candidates
        ]
        reranked = rerank(question, candidate_docs, top_k=top_k)
        # Generate
        return generate_answer(question, reranked, self.llm_client)
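Usage (the directory path and question are placeholders):

pipeline = ProductionRAGPipeline("./corpus")
print(pipeline.query("How do I rotate API keys without downtime?"))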
Each stage is a module you can test, swap, and optimize independently. Start with the simplest implementation at each stage, measure with your evaluation suite, and improve the weakest link.
After working with teams building production RAG, the pattern is consistent. The prototype works in a week. The next three months go to the unglamorous stages: ingestion edge cases, chunking and retrieval tuning, and the evaluation harness that tells you which stage is actually failing.
The teams that ship reliable RAG systems are the ones that invest in evaluation early and treat each pipeline stage as independently testable.
Calliope’s AI Lab provides the notebook and tooling environment where teams build, test, and iterate on each pipeline stage, from document parsing experiments to evaluation harness development. For teams that need RAG-powered chat out of the box, Chat Studio offers configurable retrieval with built-in chunking, hybrid search, and metadata filtering, letting you move from uploaded documents to a working RAG interface without managing infrastructure.
The code patterns in this guide run directly in both environments. Start in the AI Lab to prototype, validate with evaluation metrics, and promote to Chat Studio for user-facing deployment.
Start building RAG pipelines in Calliope
