Derniere mise a jour le 14 janvier 2026
Le streaming des reponses LLM transforme l’experience utilisateur en passant de l’attente de plusieurs secondes pour une reponse complete a l’affichage du texte token par token en temps reel. Cette technique elimine la latence percue en affichant la sortie au fur et a mesure que le modele la genere, rendant meme une inference lente reactive et interactive.
La generation token par token cree des delais naturels que le streaming peut masquer
Les grands modeles de langage generent leur sortie un token a la fois via le decodage autoregressif. Chaque prediction de token depend de tous les tokens precedents, creant un processus sequentiel qui ne peut pas etre parallelise. Pour une reponse de 500 tokens aux vitesses d’inference typiques, les utilisateurs pourraient attendre plusieurs secondes avant de voir un quelconque output.
La documentation OpenAI note que le streaming “permet au client de commencer a traiter la reponse avant que la reponse complete soit disponible” (OpenAI API Reference, 2025). Ce choix architectural reflete une intuition fondamentale : les utilisateurs percoivent une interface comme plus rapide quand ils voient des resultats progressifs, meme si le temps total de completion reste inchange.
L’API Claude d’Anthropic et l’API Gemini de Google supportent toutes deux le streaming via des mecanismes similaires. Le parametre stream=true active les server-sent events qui delivrent chaque token des qu’il devient disponible. Cette coherence entre fournisseurs suggere que le streaming est devenu une attente standard pour les interfaces LLM.
Sans streaming, un chatbot qui prend 8 secondes pour generer une reponse affiche un spinner de chargement pendant toute la duree. Avec streaming, le premier token apparait apres 200-400 millisecondes, et les utilisateurs lisent au fur et a mesure que le reste arrive. La difference psychologique est substantielle.
Les Server-Sent Events fournissent l’implementation de streaming la plus simple
SSE offre un protocole leger pour le streaming serveur vers client sur HTTP. Contrairement aux WebSockets, SSE utilise des connexions HTTP standards qui fonctionnent a travers les proxies et load balancers sans configuration speciale. Cette simplicite fait de SSE le choix par defaut pour la plupart des scenarios de streaming LLM.
L’API EventSource dans les navigateurs gere les connexions SSE automatiquement, incluant la reconnexion lors d’interruptions reseau. Les implementations serveur envoient des evenements avec un format specifique : des lignes prefixees par data: suivies du payload, terminees par des doubles retours a la ligne.
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json
app = FastAPI()
client = OpenAI()
async def streamer_reponse_llm(prompt: str):
"""
Generateur qui yield des chunks formates SSE depuis l'API streaming OpenAI.
Chaque chunk contient un seul token ou token partiel.
"""
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
# Formater comme evenement SSE
data = json.dumps({"token": token})
yield f"data: {data}\n\n"
# Signaler la completion
yield "data: [DONE]\n\n"
@app.post("/chat/stream")
async def chat_stream(request: Request):
"""
Endpoint qui retourne une reponse streaming au format SSE.
Le header Content-Type indique au client d'attendre un event stream.
"""
body = await request.json()
prompt = body.get("prompt", "")
return StreamingResponse(
streamer_reponse_llm(prompt),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Desactiver le buffering nginx
}
)
Le header X-Accel-Buffering: no merite attention. Nginx et autres reverse proxies bufferisent les reponses par defaut, ce qui annule le streaming en accumulant les chunks avant de les transmettre. Desactiver ce buffering assure que les tokens atteignent le client immediatement.
La consommation cote client du flux SSE utilise l’EventSource natif du navigateur ou l’API Fetch avec readable streams pour plus de controle :
// Avec EventSource (plus simple, reconnexion automatique)
const eventSource = new EventSource('/chat/stream', {
method: 'POST',
// Note: EventSource ne supporte pas nativement le body POST
// Utiliser fetch avec ReadableStream pour les requetes POST
});
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const data = JSON.parse(event.data);
ajouterToken(data.token);
};
// Avec l'API Fetch et ReadableStream (plus flexible)
async function streamerChat(prompt) {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parser les evenements SSE depuis le buffer
const lignes = buffer.split('\n\n');
buffer = lignes.pop(); // Garder l'evenement incomplet dans le buffer
for (const ligne of lignes) {
if (ligne.startsWith('data: ')) {
const data = ligne.slice(6);
if (data === '[DONE]') continue;
const parsed = JSON.parse(data);
ajouterToken(parsed.token);
}
}
}
}
function ajouterToken(token) {
const output = document.getElementById('chat-output');
output.textContent += token;
}
Les WebSockets permettent le streaming bidirectionnel pour les interactions complexes
Alors que SSE gere la plupart des besoins de streaming LLM, les WebSockets offrent des avantages pour les applications necessitant une communication bidirectionnelle. Les interfaces vocales qui streament l’audio dans les deux sens, l’edition collaborative avec mises a jour en temps reel, ou les applications qui doivent annuler la generation en cours beneficient toutes des connexions WebSocket.
Les WebSockets maintiennent une connexion persistante apres un handshake HTTP initial. Cela elimine l’overhead d’etablir de nouvelles connexions pour chaque interaction, reduisant la latence pour les messages de suivi dans les applications conversationnelles.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import OpenAI
import json
import asyncio
app = FastAPI()
client = OpenAI()
class GestionnaireConnexions:
"""
Gere les connexions WebSocket actives.
Gere le broadcast et le nettoyage a la deconnexion.
"""
def __init__(self):
self.connexions_actives: list[WebSocket] = []
async def connecter(self, websocket: WebSocket):
await websocket.accept()
self.connexions_actives.append(websocket)
def deconnecter(self, websocket: WebSocket):
self.connexions_actives.remove(websocket)
gestionnaire = GestionnaireConnexions()
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
"""
Endpoint WebSocket pour le streaming chat bidirectionnel.
Supporte l'annulation de message et les interactions temps reel.
"""
await gestionnaire.connecter(websocket)
try:
while True:
# Recevoir message du client
data = await websocket.receive_json()
if data.get("type") == "annuler":
# Le client veut annuler la generation en cours
continue
prompt = data.get("prompt", "")
# Streamer la reponse
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
await websocket.send_json({
"type": "token",
"contenu": token
})
await websocket.send_json({"type": "termine"})
except WebSocketDisconnect:
gestionnaire.deconnecter(websocket)
L’approche WebSocket ajoute de la complexite mais permet des fonctionnalites impossibles avec SSE. L’annulation requiert que le client envoie un message pendant que le serveur streame, chose que SSE ne peut pas supporter puisqu’il n’autorise que la communication serveur vers client.
class ChatWebSocket {
constructor(url) {
this.ws = new WebSocket(url);
this.onToken = null;
this.onComplete = null;
this.buffer = '';
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'token':
this.buffer += data.contenu;
if (this.onToken) this.onToken(data.contenu, this.buffer);
break;
case 'termine':
if (this.onComplete) this.onComplete(this.buffer);
this.buffer = '';
break;
}
};
}
envoyer(prompt) {
this.buffer = '';
this.ws.send(JSON.stringify({ type: 'message', prompt }));
}
annuler() {
this.ws.send(JSON.stringify({ type: 'annuler' }));
}
}
// Utilisation
const chat = new ChatWebSocket('ws://localhost:8000/ws/chat');
chat.onToken = (token, texteComplet) => {
document.getElementById('output').textContent = texteComplet;
};
chat.envoyer('Explique l\'informatique quantique');
// L'utilisateur clique sur le bouton annuler
document.getElementById('btn-annuler').onclick = () => chat.annuler();
Le chunked transfer encoding fonctionne au niveau du protocole HTTP
Pour les environnements ou le support SSE ou WebSocket est limite, le chunked transfer encoding HTTP fournit un mecanisme de streaming alternatif. Le serveur envoie le corps de la reponse en chunks sans specifier la longueur totale du contenu a l’avance, permettant aux clients de traiter les donnees au fur et a mesure qu’elles arrivent.
Cette approche fonctionne avec tout client HTTP supportant les reponses streaming. Le compromis est moins de structure que les evenements SSE, necessitant une logique de parsing personnalisee pour gerer les donnees partielles et les limites de messages.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
app = FastAPI()
client = OpenAI()
async def generer_chunks(prompt: str):
"""
Generateur yielding des chunks de texte brut sans formatage SSE.
Plus simple mais necessite une logique de buffering cote client.
"""
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in stream:
if content := chunk.choices[0].delta.content:
yield content
@app.post("/chat/chunked")
async def chat_chunked(prompt: str):
"""
Retourne une reponse chunked avec Transfer-Encoding: chunked.
Chaque chunk contient un ou plusieurs tokens en texte brut.
"""
return StreamingResponse(
generer_chunks(prompt),
media_type="text/plain"
)
Les composants React necessitent une gestion de state soignee pour les mises a jour streaming
Les frameworks frontend requierent des patterns specifiques pour rendre le contenu streaming efficacement. Les approches naives qui mettent a jour le state a chaque token causent des re-renders excessifs, degradant la performance et creant des saccades visuelles. Batcher les mises a jour et utiliser des refs pour le buffer de texte sous-jacent resout ces problemes.
import { useState, useRef, useEffect, useCallback } from 'react';
function useReponseStreaming(endpoint) {
const [texteAffiche, setTexteAffiche] = useState('');
const [enStreaming, setEnStreaming] = useState(false);
const bufferRef = useRef('');
const rafRef = useRef(null);
const mettreAJourAffichage = useCallback(() => {
setTexteAffiche(bufferRef.current);
rafRef.current = null;
}, []);
const ajouterToken = useCallback((token) => {
bufferRef.current += token;
// Batcher les mises a jour avec requestAnimationFrame
if (!rafRef.current) {
rafRef.current = requestAnimationFrame(mettreAJourAffichage);
}
}, [mettreAJourAffichage]);
const streamerMessage = useCallback(async (prompt) => {
bufferRef.current = '';
setTexteAffiche('');
setEnStreaming(true);
try {
const response = await fetch(endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let sseBuffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
sseBuffer += decoder.decode(value, { stream: true });
const events = sseBuffer.split('\n\n');
sseBuffer = events.pop();
for (const event of events) {
if (event.startsWith('data: ')) {
const data = event.slice(6);
if (data === '[DONE]') continue;
const parsed = JSON.parse(data);
ajouterToken(parsed.token);
}
}
}
} finally {
setEnStreaming(false);
// Mise a jour finale pour s'assurer que tout le contenu buffere est affiche
setTexteAffiche(bufferRef.current);
}
}, [endpoint, ajouterToken]);
useEffect(() => {
return () => {
if (rafRef.current) {
cancelAnimationFrame(rafRef.current);
}
};
}, []);
return { texteAffiche, enStreaming, streamerMessage };
}
function ComposantChat() {
const { texteAffiche, enStreaming, streamerMessage } = useReponseStreaming('/chat/stream');
const [input, setInput] = useState('');
const gererSoumission = (e) => {
e.preventDefault();
streamerMessage(input);
setInput('');
};
return (
<div className="chat-container">
<div className="messages">
<div className="message-assistant">
{texteAffiche}
{enStreaming && <span className="curseur">|</span>}
</div>
</div>
<form onSubmit={gererSoumission}>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
disabled={enStreaming}
placeholder="Tapez votre message..."
/>
<button type="submit" disabled={enStreaming}>
Envoyer
</button>
</form>
</div>
);
}
Le batching requestAnimationFrame limite les mises a jour DOM au taux de rafraichissement de l’ecran, typiquement 60 fois par seconde. Cela empeche la degradation de performance qui survient quand on met a jour le state pour chaque token individuel, ce qui peut arriver des centaines de fois par seconde avec une inference rapide.
Les strategies de buffering de tokens affectent la lisibilite et la performance
Differentes strategies de buffering creent differentes experiences utilisateur. Pas de buffering affiche chaque token immediatement, creant un effet machine a ecrire mais potentiellement un rendu saccade. Le buffering par mot accumule les tokens jusqu’a une limite de mot, puis flush le mot complet. Le buffering par phrase attend la ponctuation, produisant une sortie plus fluide mais plus retardee.
import re
from typing import AsyncIterator
class BufferTokens:
"""
Bufferise les tokens et les yield selon differentes strategies.
Equilibre la latence percue contre la fluidite du rendu.
"""
def __init__(self, strategie: str = "mot"):
self.buffer = ""
self.strategie = strategie
def ajouter(self, token: str) -> str | None:
"""
Ajoute un token au buffer.
Retourne le contenu a flush si une limite est atteinte.
"""
self.buffer += token
if self.strategie == "aucun":
# Pas de buffering - flush immediat
resultat = self.buffer
self.buffer = ""
return resultat
elif self.strategie == "mot":
# Flush sur espace blanc
if re.search(r'\s$', self.buffer):
resultat = self.buffer
self.buffer = ""
return resultat
elif self.strategie == "phrase":
# Flush sur ponctuation de fin de phrase
if re.search(r'[.!?]\s*$', self.buffer):
resultat = self.buffer
self.buffer = ""
return resultat
return None
def flush(self) -> str:
"""Retourne tout contenu buffere restant."""
resultat = self.buffer
self.buffer = ""
return resultat
async def stream_bufferise(
stream_tokens: AsyncIterator[str],
strategie: str = "mot"
) -> AsyncIterator[str]:
"""
Applique une strategie de buffering a un stream de tokens.
Yield des chunks de contenu selon la strategie choisie.
"""
buffer = BufferTokens(strategie)
async for token in stream_tokens:
if chunk := buffer.ajouter(token):
yield chunk
# Flush le contenu restant
if final := buffer.flush():
yield final
Le choix de strategie de buffering depend du cas d’usage. Les interfaces chat utilisent typiquement pas de buffering ou le buffering par mot pour l’immediatet. La generation de documents pourrait utiliser le buffering par phrase pour une lecture plus fluide. La generation de code beneficie souvent du buffering par ligne qui flush sur les retours a la ligne.
La gestion d’erreurs requiert une degradation gracieuse pendant le streaming
Les interruptions reseau, les limites de taux et les erreurs serveur peuvent survenir en plein stream. Les implementations robustes detectent ces echecs et fournissent un feedback significatif sans perdre le contenu deja recu.
from fastapi import HTTPException
from openai import OpenAI, APIError, RateLimitError
import asyncio
async def stream_resilient(prompt: str, max_retries: int = 3):
"""
Stream avec logique de retry et gestion d'erreurs.
Preserve la sortie partielle en cas d'echec et tente la recuperation.
"""
client = OpenAI()
accumule = ""
compteur_retry = 0
while compteur_retry < max_retries:
try:
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in stream:
if content := chunk.choices[0].delta.content:
accumule += content
yield {"type": "token", "contenu": content}
yield {"type": "termine", "total": accumule}
return
except RateLimitError as e:
compteur_retry += 1
temps_attente = 2 ** compteur_retry # Backoff exponentiel
yield {
"type": "erreur",
"recuperable": True,
"message": f"Limite de taux atteinte. Nouvel essai dans {temps_attente}s...",
"partiel": accumule
}
await asyncio.sleep(temps_attente)
except APIError as e:
yield {
"type": "erreur",
"recuperable": False,
"message": str(e),
"partiel": accumule
}
return
yield {
"type": "erreur",
"recuperable": False,
"message": "Nombre maximum de retries depasse",
"partiel": accumule
}
La gestion d’erreurs cote client devrait preserver la reponse partielle et offrir des options de retry :
async function streamerAvecRecuperation(prompt, elementOutput) {
let contenuPartiel = '';
try {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value, { stream: true });
const events = parserSSE(text);
for (const event of events) {
switch (event.type) {
case 'token':
contenuPartiel += event.contenu;
elementOutput.textContent = contenuPartiel;
break;
case 'erreur':
if (event.recuperable) {
afficherMessageRetry(event.message);
} else {
afficherErreur(event.message, contenuPartiel);
}
break;
case 'termine':
afficherComplete();
break;
}
}
}
} catch (erreurReseau) {
// Connexion perdue en plein stream
afficherErreur(
'Connexion perdue. Votre reponse partielle a ete preservee.',
contenuPartiel
);
proposerRetry(prompt, contenuPartiel);
}
}
La gestion du backpressure empeche l’epuisement memoire sur les clients lents
Quand les clients ne peuvent pas consommer les tokens aussi vite que le serveur les produit, un backpressure non gere mene a une croissance memoire. Le serveur bufferise les donnees non envoyees, epuisant potentiellement la memoire sous charge. Un controle de flux approprie respecte le taux de consommation du client.
from fastapi import FastAPI
from starlette.responses import StreamingResponse
from asyncio import Queue, wait_for, TimeoutError
import asyncio
class StreamAvecBackpressure:
"""
Reponse streaming qui respecte le backpressure client.
Implemente une queue bornee avec timeout pour les consommateurs lents.
"""
def __init__(self, max_buffer: int = 100, timeout: float = 30.0):
self.queue: Queue = Queue(maxsize=max_buffer)
self.timeout = timeout
self.termine = False
async def mettre(self, item: str):
"""
Ajoute un item a la queue, bloquant si le buffer est plein.
Leve TimeoutError si le client est trop lent.
"""
try:
await wait_for(
self.queue.put(item),
timeout=self.timeout
)
except TimeoutError:
self.termine = True
raise
async def completer(self):
"""Signale la completion du stream."""
await self.queue.put(None)
self.termine = True
async def __aiter__(self):
"""Itere sur les items en queue."""
while True:
item = await self.queue.get()
if item is None:
break
yield item
async def produire_tokens(stream: StreamAvecBackpressure, prompt: str):
"""
Coroutine producteur qui genere des tokens et les met en queue.
Respecte le backpressure depuis le buffer du stream.
"""
client = OpenAI()
try:
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in response:
if content := chunk.choices[0].delta.content:
await stream.mettre(f"data: {content}\n\n")
await stream.completer()
except TimeoutError:
# Client trop lent, abandonner la generation
pass
@app.post("/chat/backpressure")
async def chat_avec_backpressure(prompt: str):
"""
Endpoint avec streaming conscient du backpressure.
Protege la memoire serveur des clients lents.
"""
stream = StreamAvecBackpressure(max_buffer=50, timeout=30.0)
# Demarrer le producteur en arriere-plan
asyncio.create_task(produire_tokens(stream, prompt))
return StreamingResponse(
stream,
media_type="text/event-stream"
)
Le load balancing des connexions streaming necessite des sessions sticky
Les load balancers HTTP distribuent les requetes entre serveurs backend. Pour le streaming, cela cree un defi : la requete initiale et les chunks subsequents doivent atteindre le meme serveur. Sans affinite de session, les chunks de differents serveurs s’entrelacent, corrompant la reponse.
Les connexions WebSocket maintiennent naturellement l’affinite serveur puisque la connexion persiste. SSE sur HTTP necessite une configuration explicite. La plupart des load balancers supportent l’affinite de session basee sur cookie ou IP qui maintient les requetes streaming sur leur serveur d’origine.
Pour les deploiements Kubernetes, l’annotation d’affinite de session assure que les pods recoivent tous les chunks des connexions qu’ils initient :
apiVersion: v1
kind: Service
metadata:
name: llm-streaming-service
spec:
selector:
app: llm-server
ports:
- port: 80
targetPort: 8000
sessionAffinity: ClientIP
sessionAffinityConfig:
clientIP:
timeoutSeconds: 3600 # 1 heure de stickiness de session
Le monitoring des endpoints streaming differe des metriques de requetes standard
Les metriques traditionnelles de duree de requete perdent leur sens pour les endpoints streaming. Un stream de 30 secondes n’est pas une requete lente mais une reponse longue reussie. Le monitoring devrait tracker le temps jusqu’au premier token, le throughput en tokens par seconde, et le taux de completion plutot que la duree totale.
import time
from prometheus_client import Histogram, Counter, Gauge
# Definitions des metriques
temps_premier_token = Histogram(
'llm_temps_premier_token_secondes',
'Temps depuis la requete jusqu\'au premier token',
buckets=[0.1, 0.2, 0.5, 1.0, 2.0, 5.0]
)
tokens_par_seconde = Histogram(
'llm_tokens_par_seconde',
'Throughput de generation de tokens',
buckets=[10, 25, 50, 100, 200]
)
streams_actifs = Gauge(
'llm_streams_actifs',
'Connexions streaming actuellement actives'
)
completion_stream = Counter(
'llm_completions_stream_total',
'Total des completions de stream',
['statut'] # succes, erreur, annule
)
class StreamMetre:
"""
Wrapper qui instrumente un stream de tokens avec les metriques Prometheus.
"""
def __init__(self, generateur_tokens):
self.generateur = generateur_tokens
self.temps_debut = None
self.temps_premier_token = None
self.compte_tokens = 0
async def __aiter__(self):
self.temps_debut = time.time()
streams_actifs.inc()
try:
async for token in self.generateur:
if self.temps_premier_token is None:
self.temps_premier_token = time.time()
ttft = self.temps_premier_token - self.temps_debut
temps_premier_token.observe(ttft)
self.compte_tokens += 1
yield token
# Calculer le throughput
duree = time.time() - self.temps_debut
if duree > 0:
tps = self.compte_tokens / duree
tokens_par_seconde.observe(tps)
completion_stream.labels(statut='succes').inc()
except Exception:
completion_stream.labels(statut='erreur').inc()
raise
finally:
streams_actifs.dec()
Progresser avec les implementations de streaming
Commencez simple avec SSE pour le streaming unidirectionnel avant d’ajouter la complexite WebSocket. La plupart des applications LLM n’ont besoin que du streaming serveur vers client, et SSE gere ce cas avec un code minimal. Reservez les WebSockets pour les applications qui necessitent vraiment une communication bidirectionnelle ou l’annulation en cours de stream.
Testez le comportement streaming dans des conditions realistes. Les clients lents, les interruptions reseau et la haute concurrence exposent des problemes que les tests unitaires manquent. Les tests de charge avec des outils comme k6 ou Locust peuvent simuler des connexions streaming et mesurer le temps jusqu’au premier token sous charge.
Considerez le chemin complet du fournisseur LLM a l’utilisateur final. Chaque saut introduit un buffering potentiel : l’API LLM, votre serveur backend, les reverse proxies, les CDN et le navigateur client. Auditez chaque couche pour vous assurer que le streaming streame vraiment de bout en bout. Un seul composant qui bufferise annule les benefices de la livraison token par token.