RAG Pipeline Observability: Tracing Retrieval, Chunking, and Embedding Quality

When a RAG application returns a bad answer, the instinct is to blame the LLM. In practice, the LLM is usually doing exactly what it was asked — generating a response based on whatever context it received. The real question is whether that context was actually useful.

A RAG pipeline has at least five distinct failure points before the LLM generates a single token: the query embedding may not capture the user’s intent, the vector search may return the wrong documents, the ranking may surface noise over signal, the context assembly may truncate the relevant parts, and chunking decisions made weeks earlier during indexing may have broken semantic units that the retriever can never recover. None of these failures produce an error. They all produce a response that looks fine until you read it carefully.

This post instruments every stage of a RAG pipeline with OpenTelemetry spans and quality signals, giving you the observability to answer one question when a response goes wrong: which layer failed?

The RAG Failure Taxonomy

Before building the instrumentation, it helps to name the failure modes you are instrumenting against. RAG failures fall into three categories:

Retrieval failures occur when the vector search returns documents that do not contain the answer. The LLM then either hallucinates a plausible answer or correctly refuses to answer — but either way the response is wrong because the retrieval was wrong. Context precision (what fraction of retrieved documents were actually relevant) is the key metric here.
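Both context precision and context recall are easy to compute offline once you have labeled relevance judgments for a set of evaluation queries. A minimal sketch, assuming you maintain the set of relevant chunk IDs per test query (the function names here are illustrative, not part of the pipeline code later in this post):

```python
# Sketch: offline computation of context precision and recall against a
# labeled evaluation set. The labeling workflow itself is up to you.

def context_precision(retrieved_ids: list[str], relevant_ids: set[str]) -> float:
    """Fraction of retrieved chunks that are actually relevant."""
    if not retrieved_ids:
        return 0.0
    hits = sum(1 for cid in retrieved_ids if cid in relevant_ids)
    return hits / len(retrieved_ids)


def context_recall(retrieved_ids: list[str], relevant_ids: set[str]) -> float:
    """Fraction of relevant chunks that made it into the retrieved set."""
    if not relevant_ids:
        return 1.0  # nothing was required, so nothing was missed
    retrieved = set(retrieved_ids)
    hits = sum(1 for cid in relevant_ids if cid in retrieved)
    return hits / len(relevant_ids)
```

Run these against a fixed evaluation set on every index rebuild; a drop in precision without a code change usually points at the new index rather than the application.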

Chunking failures are sneakier. They happen at indexing time when documents are split in ways that destroy semantic units — a table split across two chunks, a list item separated from its heading, a paragraph referencing a figure that got routed to a different chunk. Research published in 2025 found that naive fixed-size chunking produces faithfulness scores of 0.47 to 0.51, while optimized semantic chunking reaches 0.79 to 0.82. Roughly 80% of RAG failures in that study traced back to chunking decisions, not retrieval or generation quality.
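To make the failure mode concrete, here is a deliberately simplified sketch of naive fixed-size chunking next to a structure-aware variant that packs whole paragraphs. Neither is the indexing code from this series, and real semantic chunkers are considerably more sophisticated:

```python
# Sketch: why fixed-size chunking breaks semantic units. Both functions are
# illustrative simplifications.

def fixed_size_chunks(text: str, size: int = 200) -> list[str]:
    """Naive chunking: split every `size` characters, ignoring structure."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def paragraph_chunks(text: str, max_size: int = 200) -> list[str]:
    """Structure-aware chunking: keep paragraphs whole, pack them greedily."""
    chunks, current = [], ""
    for para in text.split("\n\n"):
        if current and len(current) + len(para) + 2 > max_size:
            chunks.append(current)
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks


doc = ("Pricing tiers:\n\nFree tier includes 1,000 requests per day.\n\n"
       "Pro tier includes 100,000 requests per day and priority support.")
# fixed_size_chunks(doc, 60) slices mid-sentence; paragraph_chunks(doc, 60)
# keeps each pricing statement whole, so the retriever can match it intact.
```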

Embedding drift happens when the semantic distribution of your incoming queries shifts away from the distribution your embedding model was trained on, or when your knowledge base was embedded with an older model version than the one now handling queries. Embeddings from different model versions are not comparable — a cosine similarity score between a v2 query embedding and a v1 document embedding is meaningless.
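Drift of this kind rarely announces itself in logs, but it does leave a statistical fingerprint: the distribution of retrieval top scores slides downward over time. A minimal rolling check, with the window size and tolerance as illustrative values you would tune for your own traffic:

```python
# Sketch: detect embedding drift by comparing a rolling window of retrieval
# top scores against a baseline captured when quality was known-good.
from collections import deque


class TopScoreDriftMonitor:
    def __init__(self, baseline_mean: float, window: int = 500, tolerance: float = 0.05):
        self.baseline_mean = baseline_mean
        self.tolerance = tolerance
        self.scores = deque(maxlen=window)

    def record(self, top_score: float) -> bool:
        """Record one retrieval's top score; return True when drift is detected."""
        self.scores.append(top_score)
        if len(self.scores) < self.scores.maxlen:
            return False  # not enough samples yet
        current_mean = sum(self.scores) / len(self.scores)
        return (self.baseline_mean - current_mean) > self.tolerance
```

Seed baseline_mean from a period when retrieval quality was known to be good, and reset it whenever you intentionally change the embedding model or re-index.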

RAG Pipeline Observability Architecture

flowchart TD
    U[User Query] --> A

    subgraph TRACE["Single RAG Trace - OpenTelemetry"]
        A[rag.query span\nroot span] --> B
        B[rag.embed_query span\nEmbedding model + latency] --> C
        C[rag.vector_search span\nDB query + result count] --> D
        D[rag.rank_results span\nReranker scores + filter] --> E
        E[rag.assemble_context span\nToken count + chunk IDs] --> F
        F[rag.llm_generate span\nModel + tokens + cost]
    end

    subgraph QUALITY["Quality Signals per Span"]
        B --> QB[embedding_model_version\nquery_embedding_ms]
        C --> QC[docs_retrieved_count\ntop_score\nbottom_score\nscore_spread]
        D --> QD[docs_after_filter\nreranker_model\ntop_rerank_score]
        E --> QE[context_tokens\nchunks_included\ntruncated_flag]
        F --> QF[faithfulness_score\nrelevance_score\nfinish_reason]
    end

    TRACE --> OT[OpenTelemetry Collector\nOTLP]
    OT --> L[Langfuse\nTrace storage]
    OT --> P[Prometheus\nMetrics]
    P --> G[Grafana\nRAG quality dashboard]

    style TRACE fill:#0d1b2e,color:#ffffff
    style QUALITY fill:#1a2e1a,color:#ffffff

Key RAG Metrics to Track

| Metric | What it tells you | Failure threshold |
| --- | --- | --- |
| Context precision | Fraction of retrieved docs that were relevant to the query | Below 0.6 consistently |
| Context recall | Fraction of relevant docs that were actually retrieved | Below 0.7 on known queries |
| Faithfulness | Whether the response is grounded in the retrieved context | Below 0.75 (covered in Part 4) |
| Answer relevance | Whether the response addresses what the user asked | Below 0.80 |
| Score spread | Difference between top and bottom retrieval similarity scores | Spread below 0.1 — all docs similar quality, no signal |
| Docs after filter | How many docs survive the reranking threshold | Zero docs after filter — no usable context |
| Context token ratio | Retrieved tokens vs. context window used | Above 0.85 — context window pressure |
| Embedding model version mismatch | Query and document embeddings from different model versions | Any mismatch is a hard failure |

Node.js Implementation

// rag-pipeline.js
const { trace, SpanStatusCode } = require('@opentelemetry/api');
const { OpenAI } = require('openai');
const { QdrantClient } = require('@qdrant/js-client-rest');

const tracer = trace.getTracer('rag-service');
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const qdrant = new QdrantClient({ url: process.env.QDRANT_URL });

const EMBEDDING_MODEL = 'text-embedding-3-small';
const EMBEDDING_MODEL_VERSION = 'text-embedding-3-small-v1'; // track explicitly
const COLLECTION = 'knowledge-base';

async function ragQuery(userQuery, options = {}) {
  return tracer.startActiveSpan('rag.query', async (rootSpan) => {
    rootSpan.setAttributes({
      'rag.query': userQuery.slice(0, 200),
      'rag.feature': options.feature || 'default',
    });

    try {
      // Step 1: Embed the query
      const { embedding, embeddingMs } = await embedQuery(userQuery);

      // Step 2: Vector search
      const { docs, searchMs } = await vectorSearch(embedding, options.topK || 10);

      // Step 3: Rerank
      const rankedDocs = await rerankResults(userQuery, docs);

      // Step 4: Assemble context
      const { context, chunkIds, truncated } = assembleContext(rankedDocs, options.maxTokens || 3000);

      // Step 5: Generate
      const response = await generateResponse(userQuery, context, {
        embeddingMs,
        searchMs,
        chunkIds,
        truncated,
      });

      rootSpan.setAttributes({
        'rag.total_docs_retrieved': docs.length,
        'rag.docs_after_rerank': rankedDocs.length,
        'rag.context_truncated': truncated,
      });

      rootSpan.setStatus({ code: SpanStatusCode.OK });
      rootSpan.end();
      return response;

    } catch (err) {
      rootSpan.recordException(err);
      rootSpan.setStatus({ code: SpanStatusCode.ERROR, message: err.message });
      rootSpan.end();
      throw err;
    }
  });
}

async function embedQuery(query) {
  return tracer.startActiveSpan('rag.embed_query', async (span) => {
    const start = Date.now();

    const result = await openai.embeddings.create({
      model: EMBEDDING_MODEL,
      input: query,
    });

    const embeddingMs = Date.now() - start;

    span.setAttributes({
      'embedding.model': EMBEDDING_MODEL,
      'embedding.model_version': EMBEDDING_MODEL_VERSION,
      'embedding.dimensions': result.data[0].embedding.length,
      'embedding.latency_ms': embeddingMs,
    });

    span.end();
    return { embedding: result.data[0].embedding, embeddingMs };
  });
}

async function vectorSearch(embedding, topK) {
  return tracer.startActiveSpan('rag.vector_search', async (span) => {
    const start = Date.now();

    const results = await qdrant.search(COLLECTION, {
      vector: embedding,
      limit: topK,
      with_payload: true,
    });

    const searchMs = Date.now() - start;
    const scores = results.map(r => r.score);
    // Guard the empty-result case: Math.max() over no scores yields -Infinity
    const topScore = scores.length ? Math.max(...scores) : 0;
    const bottomScore = scores.length ? Math.min(...scores) : 0;

    span.setAttributes({
      'retrieval.docs_returned': results.length,
      'retrieval.top_score': topScore,
      'retrieval.bottom_score': bottomScore,
      'retrieval.score_spread': topScore - bottomScore,
      'retrieval.search_latency_ms': searchMs,
      'retrieval.collection': COLLECTION,
    });

    // Alert signal: if score spread is tiny, all docs are equally (un)relevant
    if (topScore - bottomScore < 0.05) {
      span.addEvent('retrieval.low_score_spread', {
        'warning': 'All retrieved docs have similar scores -- possible retrieval quality issue',
      });
    }

    span.end();
    return { docs: results, searchMs };
  });
}

async function rerankResults(query, docs, threshold = 0.5) {
  return tracer.startActiveSpan('rag.rank_results', async (span) => {
    // Filter by relevance threshold -- in production use a cross-encoder reranker
    const ranked = docs
      .filter(doc => doc.score >= threshold)
      .sort((a, b) => b.score - a.score)
      .slice(0, 5); // top 5 after filtering

    span.setAttributes({
      'rerank.docs_input': docs.length,
      'rerank.docs_output': ranked.length,
      'rerank.threshold': threshold,
      'rerank.top_score': ranked[0]?.score || 0,
    });

    if (ranked.length === 0) {
      span.addEvent('rerank.no_docs_passed_threshold', {
        'warning': 'All retrieved documents fell below relevance threshold',
      });
    }

    span.end();
    return ranked;
  });
}

function assembleContext(docs, maxTokens) {
  return tracer.startActiveSpan('rag.assemble_context', (span) => {
    let context = '';
    const chunkIds = [];
    let truncated = false;
    let estimatedTokens = 0;

    for (const doc of docs) {
      const text = doc.payload?.text || '';
      // Rough token estimate: 1 token per ~4 chars
      const docTokens = Math.ceil(text.length / 4);

      if (estimatedTokens + docTokens > maxTokens) {
        truncated = true;
        break;
      }

      context += `\n---\n${text}`;
      chunkIds.push(doc.id);
      estimatedTokens += docTokens;
    }

    span.setAttributes({
      'context.chunks_included': chunkIds.length,
      'context.estimated_tokens': estimatedTokens,
      'context.max_tokens': maxTokens,
      'context.truncated': truncated,
      'context.token_ratio': estimatedTokens / maxTokens,
    });

    span.end();
    return { context, chunkIds, truncated };
  });
}

module.exports = { ragQuery };

Python Implementation

# rag_pipeline.py
import os
import time
from opentelemetry import trace
from opentelemetry.trace import StatusCode
from openai import AsyncOpenAI
from qdrant_client import AsyncQdrantClient

tracer = trace.get_tracer("rag-service")
openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
qdrant = AsyncQdrantClient(url=os.getenv("QDRANT_URL"))

EMBEDDING_MODEL = "text-embedding-3-small"
EMBEDDING_MODEL_VERSION = "text-embedding-3-small-v1"
COLLECTION = "knowledge-base"


async def rag_query(user_query: str, feature: str = "default", top_k: int = 10, max_tokens: int = 3000) -> str:
    with tracer.start_as_current_span("rag.query") as root_span:
        root_span.set_attribute("rag.query", user_query[:200])
        root_span.set_attribute("rag.feature", feature)

        try:
            embedding, embedding_ms = await embed_query(user_query)
            docs, search_ms = await vector_search(embedding, top_k)
            ranked_docs = await rerank_results(user_query, docs)
            context, chunk_ids, truncated = assemble_context(ranked_docs, max_tokens)
            response = await generate_response(user_query, context)

            root_span.set_attribute("rag.total_docs_retrieved", len(docs))
            root_span.set_attribute("rag.docs_after_rerank", len(ranked_docs))
            root_span.set_attribute("rag.context_truncated", truncated)
            root_span.set_status(StatusCode.OK)
            return response

        except Exception as e:
            root_span.record_exception(e)
            root_span.set_status(StatusCode.ERROR, str(e))
            raise


async def embed_query(query: str) -> tuple[list[float], int]:
    with tracer.start_as_current_span("rag.embed_query") as span:
        start = time.perf_counter()
        result = await openai.embeddings.create(model=EMBEDDING_MODEL, input=query)
        embedding_ms = int((time.perf_counter() - start) * 1000)
        embedding = result.data[0].embedding

        span.set_attribute("embedding.model", EMBEDDING_MODEL)
        span.set_attribute("embedding.model_version", EMBEDDING_MODEL_VERSION)
        span.set_attribute("embedding.dimensions", len(embedding))
        span.set_attribute("embedding.latency_ms", embedding_ms)

        return embedding, embedding_ms


async def vector_search(embedding: list[float], top_k: int) -> tuple[list, int]:
    with tracer.start_as_current_span("rag.vector_search") as span:
        start = time.perf_counter()
        results = await qdrant.search(
            collection_name=COLLECTION,
            query_vector=embedding,
            limit=top_k,
            with_payload=True,
        )
        search_ms = int((time.perf_counter() - start) * 1000)

        scores = [r.score for r in results]
        top_score = max(scores) if scores else 0.0
        bottom_score = min(scores) if scores else 0.0
        score_spread = top_score - bottom_score

        span.set_attribute("retrieval.docs_returned", len(results))
        span.set_attribute("retrieval.top_score", round(top_score, 4))
        span.set_attribute("retrieval.bottom_score", round(bottom_score, 4))
        span.set_attribute("retrieval.score_spread", round(score_spread, 4))
        span.set_attribute("retrieval.search_latency_ms", search_ms)

        if score_spread < 0.05:
            span.add_event("retrieval.low_score_spread",
                           {"warning": "All retrieved docs have similar scores"})

        return results, search_ms


async def rerank_results(query: str, docs: list, threshold: float = 0.5) -> list:
    with tracer.start_as_current_span("rag.rank_results") as span:
        ranked = sorted(
            [d for d in docs if d.score >= threshold],
            key=lambda d: d.score,
            reverse=True,
        )[:5]

        span.set_attribute("rerank.docs_input", len(docs))
        span.set_attribute("rerank.docs_output", len(ranked))
        span.set_attribute("rerank.threshold", threshold)
        span.set_attribute("rerank.top_score", ranked[0].score if ranked else 0.0)

        if not ranked:
            span.add_event("rerank.no_docs_passed_threshold",
                           {"warning": "All retrieved documents below relevance threshold"})

        return ranked


def assemble_context(docs: list, max_tokens: int) -> tuple[str, list, bool]:
    with tracer.start_as_current_span("rag.assemble_context") as span:
        context_parts = []
        chunk_ids = []
        estimated_tokens = 0
        truncated = False

        for doc in docs:
            text = doc.payload.get("text", "")
            doc_tokens = len(text) // 4  # rough estimate

            if estimated_tokens + doc_tokens > max_tokens:
                truncated = True
                break

            context_parts.append(text)
            chunk_ids.append(str(doc.id))
            estimated_tokens += doc_tokens

        context = "\n---\n".join(context_parts)

        span.set_attribute("context.chunks_included", len(chunk_ids))
        span.set_attribute("context.estimated_tokens", estimated_tokens)
        span.set_attribute("context.max_tokens", max_tokens)
        span.set_attribute("context.truncated", truncated)
        span.set_attribute("context.token_ratio", round(estimated_tokens / max_tokens, 3))

        return context, chunk_ids, truncated

C# Implementation

// RagPipeline.cs
using System.Diagnostics;
using Azure.AI.OpenAI;
using Microsoft.Extensions.Configuration;
using OpenTelemetry.Trace;
using Qdrant.Client;
using Qdrant.Client.Grpc;

public class RagPipeline
{
    private static readonly ActivitySource Source = new("rag-service");
    private readonly AzureOpenAIClient _openai;
    private readonly QdrantClient _qdrant;
    private const string EmbeddingModel = "text-embedding-3-small";
    private const string EmbeddingModelVersion = "text-embedding-3-small-v1";
    private const string Collection = "knowledge-base";

    public RagPipeline(IConfiguration config)
    {
        _openai = new AzureOpenAIClient(
            new Uri(config["AzureOpenAI:Endpoint"]!),
            new System.ClientModel.ApiKeyCredential(config["AzureOpenAI:ApiKey"]!)
        );
        _qdrant = new QdrantClient(config["Qdrant:Host"]!, config.GetValue<int>("Qdrant:Port", 6334));
    }

    public async Task<string> RagQueryAsync(string userQuery, string feature = "default", int topK = 10)
    {
        using var rootActivity = Source.StartActivity("rag.query");
        rootActivity?.SetTag("rag.query", userQuery[..Math.Min(200, userQuery.Length)]);
        rootActivity?.SetTag("rag.feature", feature);

        try
        {
            var (embedding, embeddingMs) = await EmbedQueryAsync(userQuery);
            var (docs, searchMs) = await VectorSearchAsync(embedding, topK);
            var rankedDocs = await RerankResultsAsync(userQuery, docs);
            var (context, chunkIds, truncated) = AssembleContext(rankedDocs, 3000);
            var response = await GenerateResponseAsync(userQuery, context);

            rootActivity?.SetTag("rag.total_docs_retrieved", docs.Count);
            rootActivity?.SetTag("rag.docs_after_rerank", rankedDocs.Count);
            rootActivity?.SetTag("rag.context_truncated", truncated);
            rootActivity?.SetStatus(ActivityStatusCode.Ok);
            return response;
        }
        catch (Exception ex)
        {
            rootActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            rootActivity?.RecordException(ex);
            throw;
        }
    }

    private async Task<(float[] Embedding, long EmbeddingMs)> EmbedQueryAsync(string query)
    {
        using var span = Source.StartActivity("rag.embed_query");
        var sw = Stopwatch.StartNew();

        var embeddingClient = _openai.GetEmbeddingClient(EmbeddingModel);
        var result = await embeddingClient.GenerateEmbeddingAsync(query);
        var embedding = result.Value.ToFloats().ToArray();

        sw.Stop();
        span?.SetTag("embedding.model", EmbeddingModel);
        span?.SetTag("embedding.model_version", EmbeddingModelVersion);
        span?.SetTag("embedding.dimensions", embedding.Length);
        span?.SetTag("embedding.latency_ms", sw.ElapsedMilliseconds);

        return (embedding, sw.ElapsedMilliseconds);
    }

    private async Task<(List<ScoredPoint> Docs, long SearchMs)> VectorSearchAsync(float[] embedding, int topK)
    {
        using var span = Source.StartActivity("rag.vector_search");
        var sw = Stopwatch.StartNew();

        var results = await _qdrant.SearchAsync(Collection,
            embedding, limit: (ulong)topK, payloadSelector: true);

        sw.Stop();
        var docs = results.ToList();
        var scores = docs.Select(d => d.Score).ToList();
        var topScore = scores.Any() ? scores.Max() : 0f;
        var bottomScore = scores.Any() ? scores.Min() : 0f;
        var spread = topScore - bottomScore;

        span?.SetTag("retrieval.docs_returned", docs.Count);
        span?.SetTag("retrieval.top_score", Math.Round(topScore, 4));
        span?.SetTag("retrieval.bottom_score", Math.Round(bottomScore, 4));
        span?.SetTag("retrieval.score_spread", Math.Round(spread, 4));
        span?.SetTag("retrieval.search_latency_ms", sw.ElapsedMilliseconds);

        if (spread < 0.05f)
            span?.AddEvent(new ActivityEvent("retrieval.low_score_spread"));

        return (docs, sw.ElapsedMilliseconds);
    }

    private Task<List<ScoredPoint>> RerankResultsAsync(string query, List<ScoredPoint> docs,
        float threshold = 0.5f)
    {
        using var span = Source.StartActivity("rag.rank_results");
        var ranked = docs
            .Where(d => d.Score >= threshold)
            .OrderByDescending(d => d.Score)
            .Take(5)
            .ToList();

        span?.SetTag("rerank.docs_input", docs.Count);
        span?.SetTag("rerank.docs_output", ranked.Count);
        span?.SetTag("rerank.threshold", threshold);
        span?.SetTag("rerank.top_score", ranked.FirstOrDefault()?.Score ?? 0f);

        if (!ranked.Any())
            span?.AddEvent(new ActivityEvent("rerank.no_docs_passed_threshold"));

        return Task.FromResult(ranked);
    }

    private (string Context, List<string> ChunkIds, bool Truncated) AssembleContext(
        List<ScoredPoint> docs, int maxTokens)
    {
        using var span = Source.StartActivity("rag.assemble_context");
        var contextParts = new List<string>();
        var chunkIds = new List<string>();
        int estimatedTokens = 0;
        bool truncated = false;

        foreach (var doc in docs)
        {
            var text = doc.Payload.TryGetValue("text", out var t) ? t.StringValue : "";
            int docTokens = text.Length / 4;

            if (estimatedTokens + docTokens > maxTokens) { truncated = true; break; }

            contextParts.Add(text);
            chunkIds.Add(doc.Id.ToString());
            estimatedTokens += docTokens;
        }

        span?.SetTag("context.chunks_included", chunkIds.Count);
        span?.SetTag("context.estimated_tokens", estimatedTokens);
        span?.SetTag("context.max_tokens", maxTokens);
        span?.SetTag("context.truncated", truncated);
        span?.SetTag("context.token_ratio", Math.Round((double)estimatedTokens / maxTokens, 3));

        return (string.Join("\n---\n", contextParts), chunkIds, truncated);
    }
}

Detecting Embedding Model Version Mismatches

One of the most damaging and hardest to detect RAG failures is an embedding model version mismatch — your documents were indexed with text-embedding-ada-002 and your queries are now using text-embedding-3-small. The vector dimensions may be different, or the semantic space may have shifted enough that similarity scores are unreliable. Track the embedding model version explicitly in both your index metadata and your query spans, and alert when they diverge.

# embedding_version_guard.py
from opentelemetry import trace
import os

tracer = trace.get_tracer("rag-service")

async def get_collection_embedding_version(qdrant_client, collection_name: str) -> str:
    """Fetch the embedding model version recorded in document payloads.

    Qdrant has no first-class collection metadata, so this assumes the
    convention of stamping embedding_model_version into every point's
    payload at indexing time, then sampling one point to read it back.
    """
    points, _ = await qdrant_client.scroll(collection_name, limit=1, with_payload=True)
    if not points:
        return "unknown"
    return points[0].payload.get("embedding_model_version", "unknown")


async def guard_embedding_version(qdrant_client, collection_name: str, query_model_version: str):
    """Raise an alert span event if index and query embedding versions do not match."""
    with tracer.start_as_current_span("rag.embedding_version_check") as span:
        index_version = await get_collection_embedding_version(qdrant_client, collection_name)

        span.set_attribute("embedding.query_version", query_model_version)
        span.set_attribute("embedding.index_version", index_version)
        span.set_attribute("embedding.version_match", index_version == query_model_version)

        if index_version != query_model_version:
            span.add_event("embedding.version_mismatch", {
                "severity": "critical",
                "index_version": index_version,
                "query_version": query_model_version,
                "action": "Re-embed the knowledge base with the current query model version",
            })
            # In production: emit a metric counter here and alert your on-call team

Grafana Dashboard Panels for RAG Observability

With the span attributes above flowing into Prometheus via the OpenTelemetry Collector, you can build a RAG-specific dashboard with the following panels. Combine these with the general LLM metrics panels from Part 3 for a complete picture.

| Panel | PromQL | Alert threshold |
| --- | --- | --- |
| Avg context precision (% relevant docs) | avg(rag_context_precision_ratio) | Below 0.6 for 30 min |
| Zero-doc retrievals (no context rate) | rate(rag_no_docs_after_rerank_total[5m]) | Above 1% of requests |
| Context truncation rate | rate(rag_context_truncated_total[5m]) / rate(rag_requests_total[5m]) | Above 20% |
| Low score spread rate | rate(rag_low_score_spread_total[5m]) | Above 5% sustained |
| Embedding latency p95 | histogram_quantile(0.95, rag_embed_latency_ms_bucket) | Above 300ms |
| Vector search latency p95 | histogram_quantile(0.95, rag_search_latency_ms_bucket) | Above 200ms |

What Comes Next

Your RAG pipeline is now fully instrumented — every stage traced, every quality signal captured, embedding version mismatches surfaced before they silently corrupt scores. In Part 7, we step back from quality and look at the bill: cost governance and FinOps for LLM workloads, including token budget enforcement, cost allocation by feature and tenant, and anomaly detection on spend.

Key Takeaways

  • Most RAG failures happen before the LLM — retrieval, reranking, chunking, and context assembly all fail silently
  • Research shows 80% of RAG quality issues trace back to chunking decisions at index time, not retrieval or generation
  • Trace every RAG stage as a child span with quality attributes — when a response is wrong you need to know which layer failed
  • A low score spread (top and bottom similarity scores within 0.05) is a strong signal that your retrieval returned irrelevant documents
  • Track embedding model versions explicitly in both index metadata and query spans — a version mismatch makes cosine similarity scores meaningless
  • Context truncation above 20% indicates your context window budget is too tight for the documents you are retrieving
  • Evaluate retrieval quality independently from generation quality — they fail for different reasons and require different fixes
