devsite/llm-api/search.py
2026-05-01 12:17:48 +02:00

929 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Pipeline de recherche GrasBot — graph + BM25, sans embeddings (2026-04-22).
Remplace l'ancien `rag.py` (ChromaDB + embeddings Ollama). Rationnel :
- Vault de taille modeste (~36 notes, ~50-100 Ko) : la recherche sémantique
vectorielle est sur-dimensionnée et imprévisible sur vocabulaire précis.
- Le vault est déjà **structuré** (frontmatter YAML, wikilinks, MOCs) : on
exploite directement cette structure comme un graphe de connaissance.
- Résultat : retrieval **déterministe**, **traçable**, instantané (~50 ms),
sans Chroma, sans compilation C++, sans `nomic-embed-text` chargé en VRAM.
Pipeline :
1. `load_vault()` : parcours récursif de `vault-grasbot/`, parsing YAML +
body, extraction des wikilinks. Mémoïsé (chargé une fois par process).
2. `search(query, top_k)` : score chaque note (alias/title/slug/answers/
domains/tags + BM25 sur le body), expansion par graphe (voisins via
`linked`, `related`, wikilinks du body), dédoublonnage, top_k.
3. `build_prompt(query, notes)` : assemble (system, user) avec notes entières.
4. `generate(system, user)` : appel Ollama `/api/chat` (Qwen3 par défaut).
5. `answer(query)` : façade haut-niveau consommée par `api.py`.
Variables d'environnement (toutes optionnelles) :
- `OLLAMA_URL` (default: http://localhost:11434)
- `LLM_MODEL` (default: qwen3:8b)
- `VAULT_DIR` (default: <repo_root>/vault-grasbot)
- `SEARCH_TOP_K` (default: 5)
- `SEARCH_MIN_SCORE` (default: 1.0) — seuil en-dessous duquel on
considère qu'aucune note pertinente n'a été trouvée.
- `SEARCH_SECONDARY_MAX_CHARS` (default: 1500) — taille max (en chars) du body
des sources rank 2+ dans le prompt. Les sources
dépassant cette limite sont tronquées à la
frontière de paragraphe la plus proche.
- `SEARCH_SECONDARY_KEEP_RATIO` (default: 0.8) — seuil relatif au score de la
source #1. Tant que score(rank>=2) est ≥
ratio × score(#1), la source est gardée
entière (considérée aussi pertinente).
Instrumentation Langfuse (2026-04-23) :
- `answer()` : trace racine. Metadata (session_id, user_id, tags grounded/model).
- `search()` : span `retrieval` avec scores, reasons, seeds, voisins du graphe.
- `build_prompt()` : span `prompt_build` avec system/user en output.
- `generate()` : span `generation` (type Langfuse spécial : tokens, latence, model).
Voir `docs-site-interne/langfuse-observability.md`.
"""
from __future__ import annotations
import math
import os
import re
import time
from dataclasses import dataclass, field
from functools import lru_cache
from pathlib import Path
from typing import Any
import requests
import yaml
from observability import langfuse
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
LLM_MODEL = os.environ.get("LLM_MODEL", "qwen3:8b")
_DEFAULT_VAULT = (Path(__file__).resolve().parent.parent / "vault-grasbot").as_posix()
VAULT_DIR = Path(os.environ.get("VAULT_DIR", _DEFAULT_VAULT))
TOP_K = int(os.environ.get("SEARCH_TOP_K", "5"))
MIN_SCORE = float(os.environ.get("SEARCH_MIN_SCORE", "1.0"))
# Troncature des sources secondaires dans le prompt (étape 2, 2026-04-23).
# Rationnel : BM25 peut remonter des projets entiers (ex. `inception`, `cpp-partie2`)
# avec un score respectable pour des questions biographiques — ils polluent le
# contexte sans apporter d'info pertinente. On garde la source #1 entière et on
# tronque uniquement les sources rank 2+ dont le score est < SECONDARY_KEEP_RATIO
# fois celui de la #1, ET dont le body dépasse SECONDARY_MAX_CHARS caractères.
# Aucune source n'est jamais supprimée : le modèle voit toujours le top-K complet.
SECONDARY_MAX_CHARS = int(os.environ.get("SEARCH_SECONDARY_MAX_CHARS", "1500"))
SECONDARY_KEEP_RATIO = float(os.environ.get("SEARCH_SECONDARY_KEEP_RATIO", "0.8"))
# ---------------------------------------------------------------------------
# Tokenisation FR (stop-words minimalistes, suffisants pour 36 notes)
# ---------------------------------------------------------------------------
_FR_STOPWORDS = {
"le", "la", "les", "un", "une", "des", "du", "de", "d", "au", "aux",
"l", "et", "ou", "ni", "mais", "donc", "or", "car", "que", "qui",
"quoi", "dont", "ou", "", "si", "en", "y", "à", "a", "sur", "sous",
"dans", "par", "pour", "avec", "sans", "vers", "chez", "entre",
"est", "sont", "été", "être", "avoir", "il", "elle", "ils", "elles",
"on", "nous", "vous", "je", "tu", "me", "te", "se", "moi", "toi",
"son", "sa", "ses", "ma", "mon", "mes", "ta", "ton", "tes",
"notre", "nos", "votre", "vos", "leur", "leurs",
"ce", "cet", "cette", "ces", "cela", "ça", "celui", "celle",
"ceci", "celles", "ceux", "tout", "tous", "toute", "toutes",
"pas", "ne", "n", "plus", "moins", "très", "bien", "mal",
"peux", "peut", "peuvent", "pouvoir", "fait", "faire", "dit", "dire",
"quel", "quelle", "quels", "quelles", "comment", "pourquoi", "quand",
"parle", "parles", "parlez", "parler",
"moi", "toi", "lui", "eux",
}
_TOKEN_RE = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿ0-9+#.]+", re.UNICODE)
# Normalisations courantes pour que `c++`, `C#`, `push-swap`, `push_swap`
# tombent tous sur les mêmes tokens que le vault.
_NORMALIZE = {
"c++": "cpp",
"c#": "csharp",
}
def tokenize_fr(text: str) -> list[str]:
"""Tokenisation minimale avec normalisations :
- minuscules
- `-` et `_` éclatés en espaces (ex. `push-swap` → `push swap`)
- `c++` → `cpp`, `c#` → `csharp`
- stop-words FR retirés, tokens de 1 caractère écartés
"""
if not text:
return []
# Éclate les slugs/identifiants composés
cleaned = text.lower().replace("-", " ").replace("_", " ")
words = _TOKEN_RE.findall(cleaned)
out: list[str] = []
for w in words:
w = _NORMALIZE.get(w, w)
if w in _FR_STOPWORDS or len(w) <= 1:
continue
out.append(w)
return out
# ---------------------------------------------------------------------------
# Structure d'une note
# ---------------------------------------------------------------------------
@dataclass
class Note:
"""Note Obsidian chargée en mémoire, prête à être scorée."""
slug: str
title: str
type: str # projet | competence | moc | parcours | technique
path: Path
body: str
body_tokens: list[str]
# Frontmatter structurant
source: str = ""
visibility: str = "public"
domains: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
aliases: list[str] = field(default_factory=list)
answers: list[str] = field(default_factory=list)
priority: int = 5 # 1 (rarement pertinent) à 10 (toujours à remonter)
# Graphe
linked: list[str] = field(default_factory=list) # slugs
related: list[str] = field(default_factory=list) # slugs
wikilinks: list[str] = field(default_factory=list) # slugs mentionnés dans le body
# Utile côté UI
extra: dict[str, Any] = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Parsing du vault
# ---------------------------------------------------------------------------
_WIKILINK_RE = re.compile(r"\[\[([^\[\]|]+?)(?:\|[^\]]*?)?\]\]")
_YAML_WIKILINK_RE = re.compile(r'"\[\[([^\[\]|"]+?)(?:\|[^\]"]*?)?\]\]"')
def _extract_slugs_from_list(value: Any) -> list[str]:
"""Extrait des slugs depuis une liste YAML pouvant contenir '[[slug]]'."""
if not value:
return []
if isinstance(value, str):
value = [value]
if not isinstance(value, list):
return []
slugs: list[str] = []
for item in value:
if not isinstance(item, str):
continue
m = re.match(r"\[\[([^\[\]|]+?)(?:\|[^\]]*?)?\]\]", item.strip())
if m:
slugs.append(m.group(1).strip())
elif item.strip():
slugs.append(item.strip())
return slugs
def _extract_wikilinks_from_body(body: str) -> list[str]:
"""Retourne les slugs mentionnés via [[slug]] ou [[slug|alias]] dans le body."""
return sorted({m.group(1).strip() for m in _WIKILINK_RE.finditer(body)})
_FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n(.*)$", re.DOTALL)
def parse_note(path: Path) -> Note | None:
"""Parse une note Markdown avec frontmatter YAML. Retourne None si illisible."""
try:
raw = path.read_text(encoding="utf-8")
except OSError as exc:
print(f"⚠ parse_note: {path} illisible ({exc})")
return None
m = _FRONTMATTER_RE.match(raw)
if not m:
# Note sans frontmatter (rare) : on l'accepte quand même avec défauts.
fm: dict[str, Any] = {}
body = raw.strip()
else:
fm_text, body = m.group(1), m.group(2).strip()
try:
fm = yaml.safe_load(fm_text) or {}
except yaml.YAMLError as exc:
print(f"⚠ parse_note: frontmatter invalide dans {path.name} ({exc})")
fm = {}
slug = str(fm.get("slug") or path.stem).strip()
title = str(fm.get("title") or slug).strip()
type_ = str(fm.get("type") or "note").strip()
domains = [str(d).strip() for d in (fm.get("domains") or []) if str(d).strip()]
tags = [str(t).strip() for t in (fm.get("tags") or []) if str(t).strip()]
aliases = [str(a).strip() for a in (fm.get("aliases") or []) if str(a).strip()]
answers = [str(a).strip() for a in (fm.get("answers") or []) if str(a).strip()]
linked = _extract_slugs_from_list(fm.get("linked"))
related = _extract_slugs_from_list(fm.get("related"))
wikilinks = _extract_wikilinks_from_body(body)
try:
priority = int(fm.get("priority", 5))
except (TypeError, ValueError):
priority = 5
extra: dict[str, Any] = {}
for key in ("link", "updated"):
if key in fm and fm[key] is not None:
extra[key] = fm[key]
return Note(
slug=slug,
title=title,
type=type_,
path=path,
body=body,
body_tokens=tokenize_fr(body),
source=str(fm.get("source") or ""),
visibility=str(fm.get("visibility") or "public"),
domains=domains,
tags=tags,
aliases=aliases,
answers=answers,
priority=priority,
linked=linked,
related=related,
wikilinks=wikilinks,
extra=extra,
)
@lru_cache(maxsize=1)
def load_vault() -> dict[str, Note]:
"""Charge toutes les notes `.md` du vault en mémoire (mémoïsé).
Retourne un dict {slug: Note}. Les notes invisibles (`visibility: private`)
sont **exclues** pour que le chatbot ne puisse jamais les surfaces.
"""
if not VAULT_DIR.exists():
print(f"⚠ load_vault: {VAULT_DIR} introuvable")
return {}
vault: dict[str, Note] = {}
for md_path in sorted(VAULT_DIR.rglob("*.md")):
if md_path.name == "README.md" or md_path.name == "TAXONOMIE.md":
# Méta-docs du vault, pas de contenu à surfacer au chatbot.
continue
note = parse_note(md_path)
if note is None:
continue
if note.visibility != "public":
continue
if note.slug in vault:
print(f"⚠ load_vault: slug dupliqué '{note.slug}' ({md_path.name})")
vault[note.slug] = note
print(f"📚 Vault chargé : {len(vault)} notes ({VAULT_DIR})")
return vault
def reload_vault() -> dict[str, Note]:
"""Force la relecture du vault (utile après édition sans redémarrer l'API)."""
load_vault.cache_clear()
return load_vault()
# ---------------------------------------------------------------------------
# Scoring
# ---------------------------------------------------------------------------
def _contains_any(haystack: str, needles: list[str]) -> bool:
"""True si au moins un `needle` apparaît dans `haystack` (insensible à la casse)."""
if not needles:
return False
lower = haystack.lower()
return any(n.lower() in lower for n in needles if n)
def _token_overlap(tokens_a: list[str], tokens_b: list[str]) -> int:
"""Nombre de tokens communs (intersection simple)."""
if not tokens_a or not tokens_b:
return 0
return len(set(tokens_a) & set(tokens_b))
def _bm25_score(query_tokens: list[str], note: Note, corpus_stats: dict[str, Any]) -> float:
"""Score BM25 simplifié sur le body. Normalisé [0, ~5]."""
if not query_tokens or not note.body_tokens:
return 0.0
k1 = 1.5
b = 0.75
avgdl = corpus_stats["avgdl"]
N = corpus_stats["N"]
idf = corpus_stats["idf"]
doc_len = len(note.body_tokens)
tf_counts: dict[str, int] = {}
for tok in note.body_tokens:
tf_counts[tok] = tf_counts.get(tok, 0) + 1
score = 0.0
for q in query_tokens:
if q not in tf_counts:
continue
f = tf_counts[q]
denom = f + k1 * (1 - b + b * doc_len / avgdl) if avgdl else f
if denom == 0:
continue
score += idf.get(q, 0.0) * (f * (k1 + 1)) / denom
# Normalisation empirique : BM25 sur body court donne des valeurs 0-15,
# on écrase à [0, 5] pour rester comparable aux boosts exacts.
return min(score / 3.0, 5.0)
@lru_cache(maxsize=1)
def _corpus_stats() -> dict[str, Any]:
"""Pré-calcule les stats BM25 globales (IDF, avgdl) une fois."""
vault = load_vault()
N = len(vault) or 1
total_len = 0
df: dict[str, int] = {}
for note in vault.values():
total_len += len(note.body_tokens)
for tok in set(note.body_tokens):
df[tok] = df.get(tok, 0) + 1
avgdl = total_len / N if N else 1
idf = {
tok: math.log((N - d + 0.5) / (d + 0.5) + 1.0)
for tok, d in df.items()
}
return {"N": N, "avgdl": avgdl, "idf": idf}
@dataclass
class ScoredNote:
note: Note
score: float
reasons: list[str] = field(default_factory=list)
def _alias_matches(aliases: list[str], query_tokens: set[str], query_lower: str) -> list[str]:
"""Un alias matche si :
- il est composé de >=2 tokens ET apparaît en substring (ex. "home assistant")
- OU il est un token unique ET ce token apparaît dans les query_tokens.
"""
hits: list[str] = []
for alias in aliases:
if not alias:
continue
al = alias.strip().lower()
al_tokens = tokenize_fr(al)
if len(al_tokens) >= 2:
if al in query_lower:
hits.append(alias)
elif al_tokens:
if al_tokens[0] in query_tokens:
hits.append(alias)
elif al in query_lower:
hits.append(alias)
return hits
def _keyword_matches(keywords: list[str], query_tokens: set[str]) -> list[str]:
"""Match strict par token : évite que 'c' matche 'recette'."""
if not keywords:
return []
kw_lower = {k.lower() for k in keywords if k}
return sorted(kw_lower & query_tokens)
def score_note(note: Note, query: str, query_tokens: list[str],
stats: dict[str, Any]) -> ScoredNote:
"""Score une note selon plusieurs signaux, retourne (score, reasons)."""
score = 0.0
reasons: list[str] = []
query_lower = query.lower()
query_token_set = set(query_tokens)
title_tokens = tokenize_fr(note.title)
# 1. Match d'alias : signal très fort (synonymes explicites)
alias_hits = _alias_matches(note.aliases, query_token_set, query_lower)
if alias_hits:
score += 10.0
reasons.append(f"alias:{','.join(alias_hits[:2])}")
# 2. Match titre / slug (stricts par tokens, pas substring)
if note.title.lower() in query_lower and len(note.title) >= 4:
score += 8.0
reasons.append("title")
elif _token_overlap(title_tokens, query_tokens) >= 2:
score += 4.0
reasons.append("title-tokens")
slug_tokens = tokenize_fr(note.slug.replace("-", " ").replace("_", " "))
if slug_tokens and all(t in query_token_set for t in slug_tokens):
score += 8.0
reasons.append("slug")
# 3. Questions-type : si la requête ressemble à une question-réponse prévue
for ans in note.answers:
if ans:
overlap = _token_overlap(tokenize_fr(ans), query_tokens)
if overlap >= 3:
score += 12.0
reasons.append("answers")
break
elif overlap >= 2:
score += 5.0
reasons.append("answers-partial")
break
# 4. Domaines et tags : match STRICT par token pour éviter les faux positifs
domain_hits = _keyword_matches(note.domains, query_token_set)
if domain_hits:
score += 5.0 * len(domain_hits)
reasons.append(f"domains:{','.join(domain_hits)}")
tag_hits = _keyword_matches(note.tags, query_token_set)
if tag_hits:
score += 3.0 * len(tag_hits)
reasons.append(f"tags:{','.join(tag_hits)}")
# 4. BM25 sur le body
bm25 = _bm25_score(query_tokens, note, stats)
if bm25 > 0:
score += bm25
reasons.append(f"bm25:{bm25:.2f}")
# À ce stade, `score` reflète uniquement des **signaux textuels** (alias,
# titre, slug, answers, domaines, tags, BM25). Les boosts ci-dessous
# (priorité, moc-hub) ne s'appliquent que si l'on a au moins un vrai
# signal — sinon une note MOC au repos bouffererait tout le top à 1.6.
if score > 0:
if note.priority > 5:
score += (note.priority - 5) * 0.3
reasons.append(f"priority:{note.priority}")
if note.type == "moc":
score += 1.0
reasons.append("moc-hub")
return ScoredNote(note=note, score=score, reasons=reasons)
# ---------------------------------------------------------------------------
# Expansion par graphe
# ---------------------------------------------------------------------------
def expand_by_graph(seed: list[ScoredNote], vault: dict[str, Note],
max_extra: int = 3) -> list[ScoredNote]:
"""Ajoute les voisins directs (linked + related + wikilinks) des seeds.
Chaque voisin récupère un score dérivé de son parent (70 %) + éventuellement
boosté s'il est déjà présent dans plusieurs seeds.
"""
if not seed:
return []
result: dict[str, ScoredNote] = {s.note.slug: s for s in seed}
for parent in seed:
neighbors = set(parent.note.linked + parent.note.related + parent.note.wikilinks)
for slug in neighbors:
neighbor = vault.get(slug)
if neighbor is None:
continue
derived = parent.score * 0.6
if slug in result:
# Renforcement : voisin mentionné par plusieurs seeds = plus pertinent
result[slug].score += derived * 0.3
if "graph-reinforce" not in result[slug].reasons:
result[slug].reasons.append("graph-reinforce")
else:
result[slug] = ScoredNote(
note=neighbor,
score=derived,
reasons=[f"graph-from:{parent.note.slug}"],
)
# Limite le total de voisins ajoutés pour ne pas noyer le contexte
extras = [s for s in result.values() if s.note.slug not in {x.note.slug for x in seed}]
extras.sort(key=lambda x: -x.score)
return seed + extras[:max_extra]
# ---------------------------------------------------------------------------
# Sérialisation pour Langfuse (évite de loguer des objets Python opaques)
# ---------------------------------------------------------------------------
def _scored_note_to_dict(s: ScoredNote) -> dict[str, Any]:
"""Projection JSON-safe d'une `ScoredNote` pour l'UI Langfuse."""
return {
"slug": s.note.slug,
"title": s.note.title,
"type": s.note.type,
"score": round(s.score, 3),
"reasons": s.reasons,
}
# ---------------------------------------------------------------------------
# API haut-niveau : search (instrumenté Langfuse)
# ---------------------------------------------------------------------------
def search(query: str, top_k: int | None = None) -> list[ScoredNote]:
"""Retourne la liste des notes pertinentes pour `query`, triée par score.
Tracé dans Langfuse comme un span `retrieval` — on y log les tokens extraits,
les seeds avant expansion, les voisins ajoutés par le graphe, et le top-K final.
"""
top_k = top_k or TOP_K
vault = load_vault()
if not vault:
return []
with langfuse.start_as_current_span(
name="retrieval",
input={"query": query, "top_k": top_k},
) as span:
t0 = time.perf_counter()
stats = _corpus_stats()
query_tokens = tokenize_fr(query)
scored = [score_note(note, query, query_tokens, stats) for note in vault.values()]
scored = [s for s in scored if s.score > 0]
scored.sort(key=lambda x: -x.score)
# Top-N brut avant expansion (garde 3 seeds pour expansion graphe)
seeds = scored[:3]
expanded = expand_by_graph(seeds, vault, max_extra=top_k - len(seeds))
expanded.sort(key=lambda x: -x.score)
result = expanded[:top_k]
elapsed_ms = (time.perf_counter() - t0) * 1000
span.update(
output=[_scored_note_to_dict(s) for s in result],
metadata={
"query_tokens": query_tokens,
"vault_size": len(vault),
"candidates_with_signal": len(scored),
"seeds_before_graph": [_scored_note_to_dict(s) for s in seeds],
"bm25_stats": {
"N": stats["N"],
"avgdl": round(stats["avgdl"], 2),
"idf_terms": len(stats["idf"]),
},
"elapsed_ms": round(elapsed_ms, 1),
},
)
return result
# ---------------------------------------------------------------------------
# Prompt building (instrumenté)
# ---------------------------------------------------------------------------
SYSTEM_PROMPT = """Tu es GrasBot, l'assistant IA du portfolio de Fernand Gras-Calvet, étudiant à l'École 42 Perpignan.
Ton rôle :
- Répondre aux visiteurs du site sur le parcours, les projets, les compétences de Fernand.
- T'appuyer sur les notes du vault personnel fournies dans le contexte.
Règles de ton :
- Réponds en français, ton sobre et précis, sans emojis.
- N'inclus **pas** de références entre crochets du type `[slug-de-la-note]` dans ta réponse : l'interface liste déjà les fiches du vault sous le message (liens cliquables). Tu peux nommer un projet ou une compétence dans la phrase si utile, sans notation `[slug]`.
- Pour les **liens externes** (site personnel, GitHub, LinkedIn, etc.), utilise le Markdown `[libellé court](url)`. Ne répète pas l'URL en texte brut après le lien. Les listes à puces conviennent pour les coordonnées (téléphone, email, adresse).
- **CV PDF** : le fichier officiel est servi sous `/cv/cv-fernand-grascalvet.pdf` (toujours ce chemin). Si le visiteur demande le CV, un PDF, ou « télécharger », inclus dans ta réponse un lien Markdown `[Télécharger le CV (PDF)](/cv/cv-fernand-grascalvet.pdf)`. Pour une question sur **alternance**, **embauche** ou **parcours** où un CV est naturellement utile, tu peux proposer ce lien une fois en fin de réponse (sans insister si hors contexte).
- Reste concis (3 à 6 phrases en général), sauf demande explicite de détail.
- Si la question est hors sujet (ex. question généraliste sans rapport avec Fernand), indique poliment ton rôle et invite à poser une question sur son parcours.
Règles de fidélité aux sources (important) :
- Chaque source fournie est annotée `type=parcours | projet | moc | competence | glossaire`.
- Pour toute question biographique (qui est Fernand, âge, situation, école, objectif, contact, localisation), appuie-toi **en priorité** sur les sources de `type=parcours` visibles dans le contexte (bio, CV). Ne **déduis jamais** d'informations biographiques depuis une source `type=projet` ou `type=moc`.
- Ne **jamais inventer** un fait factuel (âge, date, diplôme, école, entreprise, technologie utilisée) qui n'apparaît pas littéralement dans les sources. Si l'info n'est pas présente, écris « non précisé dans les notes » et oriente vers /portfolio, /competences ou /contact.
- En cas de contradiction entre deux sources, privilégie la source de plus haut score, mentionne brièvement la divergence, et ne choisis jamais une valeur absente des deux.
- Une note dont le body se termine par « note tronquée » a été résumée : signale-le si tu t'appuies dessus pour un point précis, ou invite à consulter la note complète."""
_TRUNCATION_MARKER = "\n\n… *(note tronquée — voir le vault pour le détail)*"
def _truncate_body(body: str, max_chars: int) -> str:
"""Coupe `body` à `max_chars` en essayant de finir sur une frontière propre.
Stratégie :
1. Si le body est déjà ≤ max_chars → inchangé.
2. Sinon on garde `body[:max_chars]` puis on cherche la dernière coupure
"naturelle" (double saut de ligne = fin de paragraphe, sinon fin de
phrase). On ne recule que si la coupure trouvée est dans la moitié
haute de la fenêtre, pour éviter de perdre trop de contenu.
3. On ajoute un marqueur explicite pour signaler au modèle que la note
a été résumée (évite qu'il conclue "il n'y a pas d'info sur ...").
"""
if len(body) <= max_chars:
return body
truncated = body[:max_chars]
cutoff = -1
for sep in ("\n\n", ". ", "\n", " "):
idx = truncated.rfind(sep)
if idx >= max_chars * 0.6:
cutoff = idx + (len(sep) if sep.endswith(" ") else 0)
break
if cutoff > 0:
truncated = truncated[:cutoff]
return truncated.rstrip() + _TRUNCATION_MARKER
def build_prompt(query: str, scored_notes: list[ScoredNote]) -> tuple[str, str]:
"""Assemble (system, user) pour Qwen3.
- Sources gardées : celles dont le score ≥ MIN_SCORE.
- Troncature : la source #1 (top score) reste **entière**. Les sources
rank 2+ dont le score est < SECONDARY_KEEP_RATIO × score(#1) et dont le
body dépasse SECONDARY_MAX_CHARS sont résumées par `_truncate_body`.
Aucune source n'est supprimée — le modèle voit toujours tout le top-K.
"""
relevant = [s for s in scored_notes if s.score >= MIN_SCORE]
with langfuse.start_as_current_span(
name="prompt_build",
input={"query": query, "scored_count": len(scored_notes)},
) as span:
truncated_log: list[dict[str, Any]] = []
if relevant:
top_score = relevant[0].score
keep_full_threshold = top_score * SECONDARY_KEEP_RATIO
context_blocks = []
for i, s in enumerate(relevant, 1):
n = s.note
body = n.body
original_chars = len(body)
should_truncate = (
i > 1
and s.score < keep_full_threshold
and original_chars > SECONDARY_MAX_CHARS
)
if should_truncate:
body = _truncate_body(body, SECONDARY_MAX_CHARS)
truncated_log.append(
{
"rank": i,
"slug": n.slug,
"score": round(s.score, 2),
"original_chars": original_chars,
"truncated_chars": len(body),
}
)
header = f"[SOURCE {i} · slug={n.slug} · type={n.type} · score={s.score:.1f}] {n.title}"
context_blocks.append(f"{header}\n{body}")
context = "\n\n---\n\n".join(context_blocks)
user = (
"Voici les notes pertinentes du vault personnel de Fernand :\n\n"
f"{context}\n\n"
"---\n\n"
f"Question du visiteur : {query}\n\n"
"Réponds en t'appuyant sur ces notes. Si la question dépasse leur portée, dis-le."
)
else:
user = (
f"Question du visiteur : {query}\n\n"
"Note : aucune fiche du vault ne correspond clairement à cette question. "
"Réponds sobrement à partir de tes connaissances générales, "
"sans inventer de faits spécifiques sur Fernand. "
"Invite le visiteur à explorer /portfolio, /competences, /contact."
)
grounded = bool(relevant)
span.update(
output={"system": SYSTEM_PROMPT, "user": user},
metadata={
"grounded": grounded,
"relevant_notes": [_scored_note_to_dict(s) for s in relevant],
"system_chars": len(SYSTEM_PROMPT),
"user_chars": len(user),
"min_score_threshold": MIN_SCORE,
"truncation": {
"secondary_max_chars": SECONDARY_MAX_CHARS,
"secondary_keep_ratio": SECONDARY_KEEP_RATIO,
"truncated_notes": truncated_log,
},
},
)
return SYSTEM_PROMPT, user
# ---------------------------------------------------------------------------
# Génération via Ollama (instrumenté comme "generation" Langfuse)
# ---------------------------------------------------------------------------
def generate(system: str, user: str) -> str:
"""Appelle Ollama `/api/chat` et renvoie le texte de réponse.
Span Langfuse de type `generation` → expose latence, modèle, paramètres,
et tokens (si l'API Ollama les retourne dans `prompt_eval_count` /
`eval_count`) comme un LLM-call standard dans le dashboard.
Paramètres clefs (tunés 2026-04-23 après audit des traces Langfuse) :
- `num_ctx=8192` : fenêtre de contexte explicite (le défaut Ollama
à 2048/4096 tronquait silencieusement le début du prompt quand les
sources du RAG étaient volumineuses, d'où hallucinations sur l'identité).
- `num_predict=1024` : budget de sortie doublé (512 coupait les réponses
détaillées — p. ex. description du site ou d'un projet — en plein milieu).
- `think=False` (top-level, hors `options`) : désactive le mode *thinking*
de qwen3. Sinon le modèle consomme du budget de sortie en raisonnement
interne avant de générer la réponse visible.
"""
model_params = {
"temperature": 0.4,
"num_ctx": 8192,
"num_predict": 1024,
}
messages = [
{"role": "system", "content": system},
{"role": "user", "content": user},
]
with langfuse.start_as_current_observation(
as_type="generation",
name="ollama-chat",
model=LLM_MODEL,
input=messages,
model_parameters={**model_params, "think": False},
) as generation:
response = requests.post(
f"{OLLAMA_URL}/api/chat",
json={
"model": LLM_MODEL,
"messages": messages,
"stream": False,
"think": False,
"options": model_params,
"keep_alive": "30m",
},
timeout=180,
)
response.raise_for_status()
data = response.json()
message = data.get("message") or {}
content = message.get("content", "").strip()
if not content:
generation.update(
output=None,
metadata={"ollama_raw": data},
level="ERROR",
status_message=f"Empty response from model '{LLM_MODEL}'",
)
raise RuntimeError(
f"generate: réponse vide du modèle '{LLM_MODEL}' — vérifier qu'il est pullé."
)
# Ollama renvoie parfois les comptes de tokens — on les propage si dispos
# (compatible avec le format Langfuse "usage").
usage: dict[str, int] = {}
if "prompt_eval_count" in data:
usage["input"] = int(data["prompt_eval_count"])
if "eval_count" in data:
usage["output"] = int(data["eval_count"])
if usage:
usage["total"] = usage.get("input", 0) + usage.get("output", 0)
update_kwargs: dict[str, Any] = {"output": content}
if usage:
update_kwargs["usage_details"] = usage
generation.update(**update_kwargs)
return content
# ---------------------------------------------------------------------------
# Façade haut-niveau — trace racine Langfuse
# ---------------------------------------------------------------------------
def answer(
query: str,
top_k: int | None = None,
session_id: str | None = None,
user_id: str | None = None,
) -> dict[str, Any]:
"""Entrée principale consommée par `api.py`.
Retourne :
{
"response": str, # texte LLM (consommé par askAI.js → ChatBot.js)
"sources": list[{slug, title, type, score, reasons, url?}],
"model": str,
"grounded": bool, # True si au moins 1 note a dépassé MIN_SCORE
"vault_size": int,
}
Côté Langfuse, crée une trace racine `ask` qui englobe :
- span `retrieval`
- span `prompt_build`
- span `generation` (type generation : model, params, usage)
Avec session_id/user_id propagés au trace-level pour regroupement dans Langfuse.
"""
with langfuse.start_as_current_span(
name="ask",
input={"query": query},
) as root_span:
# Méta au niveau de la TRACE (pas du span), pour filtrer/grouper dans l'UI.
trace_metadata: dict[str, Any] = {
"top_k": top_k or TOP_K,
"min_score": MIN_SCORE,
}
trace_update: dict[str, Any] = {
"name": "ask",
"input": {"query": query},
"metadata": trace_metadata,
}
if session_id:
trace_update["session_id"] = session_id
if user_id:
trace_update["user_id"] = user_id
langfuse.update_current_trace(**trace_update)
# --- Pipeline ---
t0 = time.perf_counter()
scored = search(query, top_k=top_k)
system, user = build_prompt(query, scored)
text = generate(system, user)
elapsed_ms = (time.perf_counter() - t0) * 1000
# --- Construction de la réponse API ---
sources = []
for s in scored:
url = None
if s.note.type == "projet":
url = f"/portfolio/{s.note.slug}"
elif s.note.type == "competence":
url = f"/competences/{s.note.slug}"
sources.append({
"slug": s.note.slug,
"title": s.note.title,
"type": s.note.type,
"score": round(s.score, 2),
"reasons": s.reasons,
**({"url": url} if url else {}),
})
grounded = any(s.score >= MIN_SCORE for s in scored)
max_score = max((s.score for s in scored), default=0.0)
# Score normalisé pour Langfuse : 0 si pas de contexte, sinon
# min(max_score / 15, 1) — 15 ≈ score typique d'un match fort (title + alias).
retrieval_relevance = min(max_score / 15.0, 1.0)
# --- Finalisation : output + scores + tags sur la trace ---
tags = [
"grounded" if grounded else "ungrounded",
f"model:{LLM_MODEL}",
]
if not scored:
tags.append("vault-miss")
langfuse.update_current_trace(
output={
"response": text,
"sources_count": len(sources),
"grounded": grounded,
},
tags=tags,
)
# Scores Langfuse : permettent de filtrer le dashboard (ex. "toutes les
# traces non-grounded du mois") et de tracer des régressions.
try:
langfuse.score_current_trace(
name="grounded",
value=1.0 if grounded else 0.0,
data_type="BOOLEAN",
)
langfuse.score_current_trace(
name="retrieval_relevance",
value=round(retrieval_relevance, 3),
data_type="NUMERIC",
)
except Exception as exc: # pragma: no cover
print(f"⚠ score_current_trace failed: {exc}")
root_span.update(
output={"response_chars": len(text)},
metadata={
"elapsed_ms": round(elapsed_ms, 1),
"sources_count": len(sources),
"max_score": round(max_score, 2),
"grounded": grounded,
},
)
return {
"response": text,
"sources": sources,
"model": LLM_MODEL,
"grounded": grounded,
"vault_size": len(load_vault()),
}