Retour au blog
Guides

L'observabilite transforme le monitoring des systemes RAG en production

Racine AI

Derniere mise a jour le 14 janvier 2026

L’observabilite d’un systeme RAG en production permet de detecter les degradations de qualite avant que les utilisateurs ne les remarquent. Un pipeline RAG combine retrieval vectoriel et generation LLM, deux composants dont les defaillances silencieuses peuvent passer inaperques pendant des semaines si aucun monitoring n’est en place. Les metriques traditionnelles de latence et de disponibilite ne suffisent pas : il faut mesurer la pertinence des documents recuperes et la fidelite des reponses generees.

Un RAG en production degrade silencieusement sans monitoring adapte

Les systemes RAG souffrent de modes de defaillance specifiques que les alertes classiques ne detectent pas. Le retrieval peut retourner des documents obsoletes ou non pertinents sans lever d’erreur. Le LLM peut halluciner des informations absentes du contexte tout en produisant des reponses grammaticalement correctes. Ces problemes de qualite s’accumulent sans signal visible dans les logs applicatifs.

Le paper “Evaluating RAG Applications with RAGAs” (Shahul et al., 2023) identifie trois dimensions critiques pour l’evaluation des RAG : la pertinence du contexte recupere, la fidelite de la reponse au contexte, et la pertinence de la reponse a la question posee. Sans instrumentation specifique pour ces trois axes, un RAG peut fonctionner techniquement tout en delivrant des reponses de mauvaise qualite.

La derive des donnees constitue un autre risque silencieux. Quand de nouveaux documents sont indexes sans reembedding des anciens, les representations vectorielles deviennent incoherentes. Une requete qui retrouvait les bons documents il y a trois mois peut progressivement se degrader. Le monitoring doit detecter ces derives avant qu’elles n’impactent significativement les utilisateurs.

Les equipes qui deploient des RAG sans observabilite decouvrent generalement les problemes par les plaintes utilisateurs. A ce stade, la degradation dure souvent depuis plusieurs semaines. Une architecture d’observabilite bien concue detecte les problemes en quelques heures et permet une intervention proactive.

Quatre categories de metriques couvrent la sante d’un RAG en production

Les metriques de retrieval evaluent la qualite de la recherche vectorielle

La precision du retrieval mesure la proportion de documents pertinents parmi ceux recuperes. Un top-k de 5 documents dont 3 sont pertinents donne une precision de 0.6. Cette metrique requiert soit des annotations manuelles, soit une evaluation automatique par un LLM juge.

Le recall mesure la proportion de documents pertinents effectivement recuperes. Si le corpus contient 4 documents pertinents pour une requete et que le retrieval n’en retourne que 2, le recall est de 0.5. Le recall est plus difficile a calculer en production car il necessite de connaitre l’ensemble des documents pertinents.

Le Mean Reciprocal Rank (MRR) evalue la position du premier document pertinent. Si le document le plus pertinent apparait en 3eme position, le reciprocal rank est 1/3. Le MRR moyenne ces valeurs sur l’ensemble des requetes. Cette metrique capture l’importance de l’ordre dans les resultats.

from dataclasses import dataclass
from typing import List
import numpy as np

@dataclass
class RetrievalMetrics:
    """
    Metriques de qualite du retrieval calculees sur un batch de requetes.
    Necessitent des annotations de pertinence (automatiques ou manuelles).
    """
    precision_at_k: float
    recall_at_k: float
    mrr: float
    ndcg: float

def calculer_precision_at_k(
    documents_recuperes: List[str],
    documents_pertinents: set,
    k: int
) -> float:
    """
    Calcule la precision a k pour une requete donnee.

    Args:
        documents_recuperes: Liste ordonnee des IDs de documents retournes
        documents_pertinents: Ensemble des IDs de documents pertinents
        k: Nombre de documents a considerer

    Returns:
        Precision entre 0 et 1
    """
    top_k = documents_recuperes[:k]
    pertinents_dans_top_k = sum(1 for doc in top_k if doc in documents_pertinents)
    return pertinents_dans_top_k / k

def calculer_mrr(
    resultats_batch: List[tuple[List[str], set]]
) -> float:
    """
    Calcule le Mean Reciprocal Rank sur un batch de requetes.

    Args:
        resultats_batch: Liste de (documents_recuperes, documents_pertinents)

    Returns:
        MRR entre 0 et 1
    """
    reciprocal_ranks = []

    for documents_recuperes, documents_pertinents in resultats_batch:
        for rang, doc in enumerate(documents_recuperes, start=1):
            if doc in documents_pertinents:
                reciprocal_ranks.append(1 / rang)
                break
        else:
            reciprocal_ranks.append(0)

    return np.mean(reciprocal_ranks)

Les metriques de generation evaluent la qualite des reponses LLM

La fidelite (groundedness) mesure si la reponse reste ancree dans les documents fournis. Une reponse fidele n’affirme rien qui ne soit present dans le contexte. Cette metrique detecte les hallucinations, probleme majeur des RAG en production. L’evaluation automatique utilise un LLM juge qui compare la reponse aux documents sources.

La pertinence de la reponse (answer relevance) evalue si la reponse adresse effectivement la question posee. Une reponse peut etre fidele au contexte mais hors sujet si le retrieval a retourne des documents non pertinents. Cette metrique capture la qualite end-to-end du pipeline.

La completude evalue si la reponse couvre tous les aspects de la question. Une question multi-parties necessitant plusieurs informations peut recevoir une reponse partielle mais correcte. Le monitoring de la completude detecte les cas ou le contexte est insuffisant.

from openai import OpenAI

client = OpenAI()

def evaluer_groundedness(
    question: str,
    contexte: str,
    reponse: str
) -> dict:
    """
    Evalue la fidelite de la reponse au contexte fourni.
    Utilise un LLM juge pour detecter les affirmations non supportees.

    Returns:
        Dict avec score (0-1) et liste des affirmations non supportees
    """
    prompt_evaluation = f"""Analyse la reponse suivante et verifie si chaque affirmation
est supportee par le contexte fourni.

Question: {question}

Contexte:
{contexte}

Reponse a evaluer:
{reponse}

Pour chaque affirmation de la reponse, indique si elle est:
- SUPPORTEE: L'information est presente dans le contexte
- NON_SUPPORTEE: L'information n'est pas dans le contexte (hallucination potentielle)
- INFERENCE_RAISONNABLE: L'information peut etre deduite logiquement du contexte

Fournis un score de fidelite entre 0 et 1, et liste les affirmations non supportees.

Format de sortie JSON:
{{"score": 0.85, "affirmations_non_supportees": ["affirmation 1", "affirmation 2"]}}
"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt_evaluation}],
        response_format={"type": "json_object"}
    )

    import json
    return json.loads(response.choices[0].message.content)

def evaluer_answer_relevance(
    question: str,
    reponse: str
) -> float:
    """
    Evalue si la reponse adresse effectivement la question posee.
    Independant du contexte - se concentre sur l'adequation question/reponse.

    Returns:
        Score entre 0 et 1
    """
    prompt_evaluation = f"""Evalue si la reponse suivante repond a la question posee.

Question: {question}

Reponse: {reponse}

Criteres d'evaluation:
- La reponse adresse-t-elle le sujet de la question?
- Les informations fournies sont-elles utiles pour repondre?
- La reponse est-elle complete ou partielle?

Score entre 0 (hors sujet) et 1 (parfaitement pertinent).
Reponds uniquement avec le score numerique.
"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt_evaluation}],
        max_tokens=10
    )

    try:
        return float(response.choices[0].message.content.strip())
    except ValueError:
        return 0.5  # Valeur par defaut en cas d'erreur de parsing

Les metriques de latence revelent les goulots d’etranglement

Le Time to First Token (TTFT) mesure le delai entre la requete utilisateur et l’apparition du premier token de reponse. Pour les interfaces streaming, cette metrique est plus importante que la latence totale car elle determine la reactivite percue. Un TTFT eleve indique soit un retrieval lent, soit une queue d’inference LLM saturee.

La latence de retrieval isole le temps de la recherche vectorielle. Cette composante devrait rester sous 100-200ms pour des corpus de taille moyenne avec pgvector. Des latences superieures signalent soit un index mal configure, soit un probleme d’infrastructure.

La latence de generation mesure le temps de l’appel LLM hors streaming. Pour les APIs externes, cette metrique inclut le temps reseau. Pour les deploiements on-premise, elle reflete la charge GPU et la longueur du contexte.

import time
from dataclasses import dataclass
from typing import Optional
from contextlib import contextmanager

@dataclass
class LatencyMetrics:
    """
    Decomposition des latences d'une requete RAG.
    Permet d'identifier les composants lents.
    """
    retrieval_ms: float
    reranking_ms: Optional[float]
    generation_ttft_ms: float
    generation_total_ms: float
    total_ms: float

class LatencyTracker:
    """
    Context manager pour mesurer les latences de chaque etape du pipeline.
    """

    def __init__(self):
        self.timestamps = {}

    @contextmanager
    def track(self, step_name: str):
        """Mesure le temps d'execution d'une etape."""
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            self.timestamps[step_name] = elapsed_ms

    def get_metrics(self) -> LatencyMetrics:
        """Construit les metriques a partir des timestamps collectes."""
        return LatencyMetrics(
            retrieval_ms=self.timestamps.get('retrieval', 0),
            reranking_ms=self.timestamps.get('reranking'),
            generation_ttft_ms=self.timestamps.get('generation_ttft', 0),
            generation_total_ms=self.timestamps.get('generation_total', 0),
            total_ms=sum(self.timestamps.values())
        )

# Exemple d'utilisation dans un pipeline RAG
async def rag_pipeline_instrumente(question: str, tracker: LatencyTracker):
    """Pipeline RAG avec instrumentation des latences."""

    with tracker.track('retrieval'):
        documents = await rechercher_documents(question)

    with tracker.track('reranking'):
        documents_reranked = await reranker.rerank(question, documents)

    # Pour le TTFT, on utilise une approche differente avec streaming
    generation_start = time.perf_counter()
    first_token_time = None

    async for token in generer_reponse_stream(question, documents_reranked):
        if first_token_time is None:
            first_token_time = time.perf_counter()
            tracker.timestamps['generation_ttft'] = (first_token_time - generation_start) * 1000
        yield token

    tracker.timestamps['generation_total'] = (time.perf_counter() - generation_start) * 1000

Les metriques de cout controlent la viabilite economique

Le cout par requete agregge les appels d’API d’embedding, de reranking et de generation. Pour les APIs externes (OpenAI, Anthropic), chaque composant a une tarification distincte. Le monitoring du cout par requete detecte les derives et permet l’optimisation.

Le nombre de tokens consommes par requete varie selon la taille du contexte injecte et la longueur des reponses. Un monitoring fin distingue les tokens d’input (contexte + question) et d’output (reponse generee).

@dataclass
class CostMetrics:
    """
    Decomposition des couts d'une requete RAG.
    Les prix sont en dollars et correspondent aux tarifs OpenAI janvier 2026.
    """
    embedding_cost: float
    reranking_cost: float
    generation_input_tokens: int
    generation_output_tokens: int
    generation_cost: float
    total_cost: float

def calculer_cout_requete(
    query_tokens: int,
    documents_tokens: int,
    output_tokens: int,
    embedding_model: str = "text-embedding-3-small",
    generation_model: str = "gpt-4o"
) -> CostMetrics:
    """
    Calcule le cout d'une requete RAG selon les modeles utilises.
    Les tarifs doivent etre mis a jour selon la documentation officielle.
    """
    # Tarifs approximatifs - a verifier sur la documentation officielle
    EMBEDDING_COSTS = {
        "text-embedding-3-small": 0.00002 / 1000,  # par token
        "text-embedding-3-large": 0.00013 / 1000,
    }

    GENERATION_COSTS = {
        "gpt-4o": {"input": 0.0025 / 1000, "output": 0.01 / 1000},
        "gpt-4o-mini": {"input": 0.00015 / 1000, "output": 0.0006 / 1000},
    }

    embedding_cost = query_tokens * EMBEDDING_COSTS.get(embedding_model, 0)

    gen_costs = GENERATION_COSTS.get(generation_model, {"input": 0, "output": 0})
    input_tokens = query_tokens + documents_tokens
    generation_cost = (input_tokens * gen_costs["input"]) + (output_tokens * gen_costs["output"])

    return CostMetrics(
        embedding_cost=embedding_cost,
        reranking_cost=0,  # Varie selon le provider
        generation_input_tokens=input_tokens,
        generation_output_tokens=output_tokens,
        generation_cost=generation_cost,
        total_cost=embedding_cost + generation_cost
    )

Une architecture d’observabilite RAG s’organise en trois couches

La couche de collecte capture les donnees brutes de chaque requete

Chaque requete traversant le pipeline genere des traces contenant la question, les documents recuperes, la reponse generee et les metriques de performance. Ces traces constituent la matiere premiere de l’observabilite. Un identifiant unique (trace_id) relie tous les evenements d’une meme requete.

Le schema de trace doit etre suffisamment riche pour permettre le debugging sans etre si verbeux qu’il sature le stockage. Les documents complets ne sont pas toujours necessaires dans la trace si leur ID suffit pour les retrouver. Les embeddings vectoriels n’ont generalement pas besoin d’etre stockes.

from datetime import datetime
from typing import List, Optional, Any
from dataclasses import dataclass, asdict
import json
import uuid

@dataclass
class DocumentTrace:
    """Representation compacte d'un document dans une trace."""
    doc_id: str
    score: float
    chunk_preview: str  # Premiers 200 caracteres
    metadata: dict

@dataclass
class RAGTrace:
    """
    Trace complete d'une requete RAG.
    Contient les informations necessaires au debugging et a l'evaluation.
    """
    trace_id: str
    timestamp: datetime
    user_id: Optional[str]
    session_id: Optional[str]

    # Input
    question: str

    # Retrieval
    documents_retrieved: List[DocumentTrace]
    retrieval_latency_ms: float

    # Reranking (optionnel)
    documents_reranked: Optional[List[DocumentTrace]]
    reranking_latency_ms: Optional[float]

    # Generation
    response: str
    generation_model: str
    generation_latency_ms: float
    tokens_input: int
    tokens_output: int

    # Metriques qualite (evaluees en async si necessaire)
    groundedness_score: Optional[float] = None
    answer_relevance_score: Optional[float] = None

    # Metadata
    pipeline_version: str = "1.0"
    custom_metadata: dict = None

    def to_json(self) -> str:
        """Serialise la trace en JSON pour stockage."""
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        return json.dumps(data)

    @classmethod
    def from_json(cls, json_str: str) -> 'RAGTrace':
        """Deserialise une trace depuis JSON."""
        data = json.loads(json_str)
        data['timestamp'] = datetime.fromisoformat(data['timestamp'])
        data['documents_retrieved'] = [DocumentTrace(**d) for d in data['documents_retrieved']]
        if data['documents_reranked']:
            data['documents_reranked'] = [DocumentTrace(**d) for d in data['documents_reranked']]
        return cls(**data)

class TraceCollector:
    """
    Collecteur de traces avec buffer et envoi batch.
    Minimise l'impact sur la latence des requetes.
    """

    def __init__(self, backend: 'TraceBackend', buffer_size: int = 100):
        self.backend = backend
        self.buffer: List[RAGTrace] = []
        self.buffer_size = buffer_size

    def record(self, trace: RAGTrace):
        """Enregistre une trace dans le buffer."""
        self.buffer.append(trace)

        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        """Envoie le buffer au backend de stockage."""
        if self.buffer:
            self.backend.store_batch(self.buffer)
            self.buffer = []

La couche d’aggregation calcule les metriques a differentes granularites

Les metriques brutes des traces individuelles doivent etre agregees pour reveler les tendances. L’aggregation temporelle (par minute, heure, jour) detecte les variations de performance. L’aggregation par dimension (par utilisateur, par source documentaire, par type de question) identifie les segments problematiques.

Les percentiles (P50, P95, P99) sont plus informatifs que les moyennes pour les metriques de latence. Une moyenne de 200ms peut cacher un P99 a 5 secondes affectant une partie des utilisateurs. Le monitoring doit suivre plusieurs percentiles.

from collections import defaultdict
from datetime import datetime, timedelta
import numpy as np

class MetricsAggregator:
    """
    Agregateur de metriques avec support multi-dimensions.
    Calcule moyennes, percentiles et compteurs sur fenetres glissantes.
    """

    def __init__(self, window_minutes: int = 60):
        self.window = timedelta(minutes=window_minutes)
        self.traces: List[RAGTrace] = []

    def add_trace(self, trace: RAGTrace):
        """Ajoute une trace et purge les anciennes."""
        self.traces.append(trace)
        cutoff = datetime.utcnow() - self.window
        self.traces = [t for t in self.traces if t.timestamp > cutoff]

    def get_latency_percentiles(self) -> dict:
        """Calcule les percentiles de latence totale."""
        if not self.traces:
            return {}

        latencies = [
            t.retrieval_latency_ms + t.generation_latency_ms + (t.reranking_latency_ms or 0)
            for t in self.traces
        ]

        return {
            'p50': np.percentile(latencies, 50),
            'p95': np.percentile(latencies, 95),
            'p99': np.percentile(latencies, 99),
            'mean': np.mean(latencies),
            'count': len(latencies)
        }

    def get_quality_scores(self) -> dict:
        """Calcule les scores de qualite moyens."""
        traces_with_scores = [
            t for t in self.traces
            if t.groundedness_score is not None
        ]

        if not traces_with_scores:
            return {}

        return {
            'groundedness_mean': np.mean([t.groundedness_score for t in traces_with_scores]),
            'answer_relevance_mean': np.mean([
                t.answer_relevance_score for t in traces_with_scores
                if t.answer_relevance_score is not None
            ]),
            'evaluated_count': len(traces_with_scores)
        }

    def get_metrics_by_dimension(self, dimension: str) -> dict:
        """
        Agrege les metriques selon une dimension (user_id, session_id, etc.).
        Utile pour identifier les segments problematiques.
        """
        by_dimension = defaultdict(list)

        for trace in self.traces:
            key = getattr(trace, dimension, 'unknown')
            by_dimension[key].append(trace)

        results = {}
        for key, traces in by_dimension.items():
            latencies = [
                t.retrieval_latency_ms + t.generation_latency_ms
                for t in traces
            ]
            results[key] = {
                'count': len(traces),
                'latency_p50': np.percentile(latencies, 50),
                'latency_p95': np.percentile(latencies, 95),
            }

        return results

La couche d’alerting detecte les anomalies et notifie les equipes

Les alertes statiques (seuils fixes) detectent les violations evidentes : latence > 5s, taux d’erreur > 1%. Mais elles generent des faux positifs lors des pics de trafic legitimes. Les alertes basees sur les anomalies comparent les metriques actuelles aux valeurs historiques.

Le choix des seuils d’alerte depend du SLA et de la tolerance aux faux positifs. Des seuils trop bas generent de la fatigue d’alerte. Des seuils trop hauts manquent des problemes reels. L’approche iterative ajuste les seuils selon les retours terrain.

from enum import Enum
from typing import Callable, List
from dataclasses import dataclass

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AlertRule:
    """
    Definition d'une regle d'alerte pour le monitoring RAG.
    """
    name: str
    description: str
    severity: AlertSeverity
    condition: Callable[[dict], bool]
    cooldown_minutes: int = 15

@dataclass
class Alert:
    """Instance d'alerte declenchee."""
    rule_name: str
    severity: AlertSeverity
    message: str
    timestamp: datetime
    metrics_snapshot: dict

class AlertManager:
    """
    Gestionnaire d'alertes avec cooldown et notification.
    """

    def __init__(self, rules: List[AlertRule], notifier: 'AlertNotifier'):
        self.rules = rules
        self.notifier = notifier
        self.last_alerts: dict[str, datetime] = {}

    def evaluate(self, metrics: dict) -> List[Alert]:
        """Evalue toutes les regles et declenche les alertes necessaires."""
        triggered = []
        now = datetime.utcnow()

        for rule in self.rules:
            # Verifier le cooldown
            if rule.name in self.last_alerts:
                elapsed = now - self.last_alerts[rule.name]
                if elapsed < timedelta(minutes=rule.cooldown_minutes):
                    continue

            # Evaluer la condition
            if rule.condition(metrics):
                alert = Alert(
                    rule_name=rule.name,
                    severity=rule.severity,
                    message=f"{rule.description} - Metriques: {metrics}",
                    timestamp=now,
                    metrics_snapshot=metrics
                )
                triggered.append(alert)
                self.last_alerts[rule.name] = now
                self.notifier.send(alert)

        return triggered

# Exemple de regles d'alerte pour un RAG
REGLES_ALERTE_RAG = [
    AlertRule(
        name="latence_elevee",
        description="Latence P95 superieure a 3 secondes",
        severity=AlertSeverity.WARNING,
        condition=lambda m: m.get('latency_p95', 0) > 3000
    ),
    AlertRule(
        name="latence_critique",
        description="Latence P99 superieure a 10 secondes",
        severity=AlertSeverity.CRITICAL,
        condition=lambda m: m.get('latency_p99', 0) > 10000
    ),
    AlertRule(
        name="qualite_degradee",
        description="Score de groundedness moyen inferieur a 0.7",
        severity=AlertSeverity.WARNING,
        condition=lambda m: m.get('groundedness_mean', 1) < 0.7
    ),
    AlertRule(
        name="retrieval_lent",
        description="Latence retrieval P95 superieure a 500ms",
        severity=AlertSeverity.WARNING,
        condition=lambda m: m.get('retrieval_p95', 0) > 500
    ),
]

Les outils d’observabilite RAG s’adaptent aux differents contextes

LangSmith offre une integration native avec LangChain

LangSmith, developpe par LangChain, propose un tracing automatique des pipelines LangChain et LangGraph. L’integration se fait par variables d’environnement sans modification du code. Les traces capturent chaque etape du pipeline avec les inputs, outputs et latences.

L’interface permet de visualiser les chaines d’execution, d’identifier les etapes lentes et de comparer les runs. L’evaluation integree permet de scorer les reponses sur des datasets de test. Le pricing est base sur le nombre de traces, ce qui peut devenir couteux a haut volume.

# Configuration LangSmith via variables d'environnement
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls__..."
os.environ["LANGCHAIN_PROJECT"] = "rag-production"

# Le tracing est automatique pour les pipelines LangChain
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import PGVector
from langchain.chains import RetrievalQA

# Ce code est automatiquement trace par LangSmith
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = PGVector(
    embedding_function=embeddings,
    connection_string="postgresql://...",
    collection_name="documents"
)

llm = ChatOpenAI(model="gpt-4o", temperature=0)

qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    return_source_documents=True
)

# Chaque appel est trace automatiquement
result = qa_chain.invoke({"query": "Comment configurer pgvector?"})

Weights and Biases convient aux equipes ML existantes

W&B propose des fonctionnalites d’observabilite LLM via son module Weave. L’integration avec l’ecosysteme W&B existant (tracking d’experiences, registre de modeles) facilite l’adoption pour les equipes deja utilisatrices. Le tracing capture les appels LLM et permet l’evaluation sur datasets.

La force de W&B reside dans les capacites de visualisation et de comparaison. Les dashboards personnalisables permettent de suivre les metriques specifiques au cas d’usage. L’integration avec les pipelines de CI/CD automatise les tests de regression.

Phoenix fournit une solution open source auto-hebergee

Phoenix (Arize AI, open source) permet un deploiement on-premise complet de l’observabilite. Pas de donnees envoyees vers le cloud, ce qui convient aux contextes reglementaires stricts. L’interface web permet le tracing, l’evaluation et le debugging.

L’installation locale via pip ou Docker simplifie le demarrage. L’integration avec OpenTelemetry standardise la collecte de traces. La communaute active developpe des connecteurs pour les principaux frameworks.

# Installation: pip install arize-phoenix openinference-instrumentation-openai

import phoenix as px

# Lancement de l'interface Phoenix
session = px.launch_app()
print(f"Phoenix UI: {session.url}")

# Instrumentation automatique des appels OpenAI
from openinference.instrumentation.openai import OpenAIInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

# Configuration du tracing OpenTelemetry vers Phoenix
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(
    SimpleSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:6006/v1/traces"))
)
trace.set_tracer_provider(tracer_provider)

# Instrumenter OpenAI
OpenAIInstrumentor().instrument()

# Les appels OpenAI sont maintenant traces dans Phoenix
from openai import OpenAI
client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explique le RAG"}]
)

Une solution custom offre le controle total

Pour les besoins specifiques ou les gros volumes, une solution d’observabilite custom peut etre preferable. Le stockage des traces dans une base de donnees dediee (PostgreSQL, ClickHouse) evite les couts de SaaS. Les dashboards Grafana visualisent les metriques Prometheus.

La charge de maintenance est significative : schema de donnees, API d’ingestion, pipeline d’evaluation, dashboards. Cette approche convient aux equipes ayant des ressources d’infrastructure et des besoins tres specifiques.

# Exemple de backend custom avec PostgreSQL et Prometheus

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import psycopg2
from psycopg2.extras import execute_values

# Metriques Prometheus
rag_requests_total = Counter(
    'rag_requests_total',
    'Total des requetes RAG',
    ['status']
)

rag_latency_histogram = Histogram(
    'rag_latency_seconds',
    'Distribution des latences RAG',
    buckets=[0.1, 0.25, 0.5, 1, 2.5, 5, 10]
)

rag_groundedness_gauge = Gauge(
    'rag_groundedness_score',
    'Score de groundedness moyen sur la derniere heure'
)

rag_retrieval_latency = Histogram(
    'rag_retrieval_latency_seconds',
    'Latence du retrieval',
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1]
)

class CustomTraceBackend:
    """
    Backend de stockage des traces dans PostgreSQL.
    """

    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)
        self._create_tables()

    def _create_tables(self):
        """Cree les tables de stockage des traces."""
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS rag_traces (
                    trace_id UUID PRIMARY KEY,
                    timestamp TIMESTAMPTZ NOT NULL,
                    user_id TEXT,
                    question TEXT NOT NULL,
                    response TEXT NOT NULL,
                    retrieval_latency_ms FLOAT,
                    generation_latency_ms FLOAT,
                    groundedness_score FLOAT,
                    answer_relevance_score FLOAT,
                    metadata JSONB
                );

                CREATE INDEX IF NOT EXISTS idx_traces_timestamp
                ON rag_traces(timestamp DESC);

                CREATE INDEX IF NOT EXISTS idx_traces_user
                ON rag_traces(user_id);
            """)
            self.conn.commit()

    def store_batch(self, traces: List[RAGTrace]):
        """Insere un batch de traces."""
        data = [
            (
                trace.trace_id,
                trace.timestamp,
                trace.user_id,
                trace.question,
                trace.response,
                trace.retrieval_latency_ms,
                trace.generation_latency_ms,
                trace.groundedness_score,
                trace.answer_relevance_score,
                json.dumps(trace.custom_metadata or {})
            )
            for trace in traces
        ]

        with self.conn.cursor() as cur:
            execute_values(
                cur,
                """INSERT INTO rag_traces
                   (trace_id, timestamp, user_id, question, response,
                    retrieval_latency_ms, generation_latency_ms,
                    groundedness_score, answer_relevance_score, metadata)
                   VALUES %s""",
                data
            )
            self.conn.commit()

        # Mise a jour des metriques Prometheus
        for trace in traces:
            total_latency = (trace.retrieval_latency_ms + trace.generation_latency_ms) / 1000
            rag_latency_histogram.observe(total_latency)
            rag_retrieval_latency.observe(trace.retrieval_latency_ms / 1000)
            rag_requests_total.labels(status='success').inc()

# Demarrer le serveur Prometheus
start_http_server(8000)

Le debugging systematique accelere la resolution des incidents

La reproduction des cas problematiques necessite des traces completes

Quand un utilisateur signale une mauvaise reponse, la trace doit permettre de reproduire exactement le cas. L’identifiant de trace ou le timestamp permet de retrouver la question, les documents recuperes et la reponse generee. Sans cette traabilite, le debugging devient de la speculation.

Les traces doivent persister suffisamment longtemps pour couvrir les cycles de reporting. Un probleme signale une semaine apres l’incident necessite que les traces soient encore disponibles. La retention de 30 a 90 jours couvre la plupart des besoins.

L’analyse des patterns de defaillance revele les causes racines

Les requetes problematiques partagent souvent des caracteristiques communes : un type de question, une source documentaire, une plage horaire. L’agregation des traces par dimensions permet d’identifier ces patterns. Une degradation limitee aux questions sur un sujet particulier pointe vers un probleme de contenu.

L’analyse des scores de groundedness par document source detecte les contenus qui induisent des hallucinations. Certains documents mal structures ou ambigus peuvent systematiquement degrader la qualite des reponses. L’identification de ces documents permet de les corriger ou de les exclure.

def analyser_patterns_defaillance(
    traces: List[RAGTrace],
    seuil_groundedness: float = 0.7
) -> dict:
    """
    Analyse les traces problematiques pour identifier des patterns.

    Args:
        traces: Liste de traces a analyser
        seuil_groundedness: Score en dessous duquel la trace est problematique

    Returns:
        Dict avec les patterns identifies
    """
    problematiques = [
        t for t in traces
        if t.groundedness_score is not None and t.groundedness_score < seuil_groundedness
    ]

    if not problematiques:
        return {"problemes_detectes": 0}

    # Analyser les documents frequemment impliques
    doc_counts = defaultdict(int)
    for trace in problematiques:
        for doc in trace.documents_retrieved:
            doc_counts[doc.doc_id] += 1

    # Trier par frequence
    docs_problematiques = sorted(
        doc_counts.items(),
        key=lambda x: x[1],
        reverse=True
    )[:10]

    # Analyser les mots-cles des questions problematiques
    from collections import Counter
    mots = []
    for trace in problematiques:
        mots.extend(trace.question.lower().split())

    mots_frequents = Counter(mots).most_common(20)

    return {
        "problemes_detectes": len(problematiques),
        "taux_problemes": len(problematiques) / len(traces),
        "documents_impliques": docs_problematiques,
        "mots_cles_questions": mots_frequents,
        "groundedness_moyen": np.mean([t.groundedness_score for t in problematiques])
    }

Les limites de l’observabilite RAG meritent d’etre reconnues

L’evaluation automatique par LLM juge n’est pas parfaite. Le modele d’evaluation peut lui-meme faire des erreurs de jugement. La correlation entre les scores automatiques et les evaluations humaines varie selon les cas d’usage. Une calibration sur des cas annotes manuellement reste necessaire.

Le cout de l’evaluation peut devenir significatif. Evaluer chaque requete avec un LLM ajoute un appel supplementaire. L’echantillonnage (evaluer 10% des requetes) reduit le cout mais peut manquer des problemes sur les segments peu frequents.

La latence de l’evaluation asynchrone cree un delai entre le probleme et sa detection. Une degradation peut durer plusieurs heures avant que suffisamment de traces evaluees n’alertent. Les metriques de latence et d’erreur restent les indicateurs les plus reactifs.

La definition des seuils de qualite est subjective. Qu’est-ce qu’un score de groundedness acceptable ? La reponse depend du cas d’usage et de la tolerance au risque. Un RAG medical aura des exigences plus elevees qu’un chatbot de support general.

Racine AI integre l’observabilite dans ses pipelines documentaires

Les pipelines de traitement documentaire de Racine AI integrent une couche d’observabilite native. Chaque extraction est tracee avec les metriques de confiance du modele VLM. Le monitoring detecte les documents problematiques et les variations de qualite d’extraction.

Pour les deployments on-premise, l’observabilite reste locale. Aucune donnee de production ne quitte l’infrastructure du client. Les tableaux de bord Grafana fournissent une visibilite complete sur la sante du systeme sans compromis sur la confidentialite.

L’evaluation continue des extractions permet d’identifier les types de documents qui necessitent un ajustement du pipeline. Cette boucle de feedback ameliore progressivement la qualite sur les corpus specifiques de chaque client.

Newsletter technique

1 article par mois sur l'IA documentaire. Pas de spam.

13 - 8 =

On nous demande souvent

Quelles sont les metriques essentielles pour monitorer un RAG en production ?

Les quatre categories cles sont : metriques de retrieval (precision@k, recall@k, MRR), metriques de generation (groundedness, answer relevance, completude), metriques de latence (TTFT, latence retrieval, latence generation), et metriques de cout (cout par requete, tokens consommes). Le groundedness est particulierement critique pour detecter les hallucinations.

Comment detecter les hallucinations dans un pipeline RAG automatiquement ?

Le score de groundedness (fidelite) mesure si chaque affirmation de la reponse est supportee par le contexte fourni. Un LLM juge compare la reponse aux documents sources et identifie les affirmations non supportees. Un score en dessous de 0.7 devrait declencher une alerte.

LangSmith, Phoenix ou solution custom : que choisir ?

LangSmith convient aux equipes LangChain avec un budget SaaS. Phoenix est ideal pour le on-premise et les contextes reglementaires stricts (open source, aucune donnee dans le cloud). Une solution custom convient aux gros volumes ou besoins tres specifiques, mais demande des ressources de maintenance significatives.

Quel pourcentage de requetes faut-il evaluer en production ?

L'evaluation par LLM juge ajoute un cout et une latence. Un echantillonnage de 5 a 10% des requetes fournit une visibilite suffisante pour detecter les degradations. Pour les applications critiques, augmentez a 20-30%. Les metriques de latence et d'erreur doivent etre collectees sur 100% des requetes.

Comment distinguer un probleme de retrieval d'un probleme de generation ?

Decomposez les metriques. Un score de context precision bas avec un answer relevance bas indique un probleme de retrieval (mauvais documents recuperes). Un context precision eleve mais un groundedness bas indique un probleme de generation (le LLM hallucine malgre un bon contexte). Le monitoring par composant isole la source du probleme.

Discutons de

Votre Projet.

IA Documents, automatisation legacy, inspection terrain. Nous deployons des solutions qui passent en production.

Decrivez votre projet et recevez une reponse sous 48h.

Nous contacter