"""Pipeline de recherche GrasBot — graph + BM25, sans embeddings (2026-04-22).
|
|
|
|
Remplace l'ancien `rag.py` (ChromaDB + embeddings Ollama). Rationnel :
|
|
|
|
- Vault de taille modeste (~36 notes, ~50-100 Ko) : la recherche sémantique
|
|
vectorielle est sur-dimensionnée et imprévisible sur vocabulaire précis.
|
|
- Le vault est déjà **structuré** (frontmatter YAML, wikilinks, MOCs) : on
|
|
exploite directement cette structure comme un graphe de connaissance.
|
|
- Résultat : retrieval **déterministe**, **traçable**, instantané (~50 ms),
|
|
sans Chroma, sans compilation C++, sans `nomic-embed-text` chargé en VRAM.
|
|
|
|
Pipeline :
|
|
|
|
1. `load_vault()` : parcours récursif de `vault-grasbot/`, parsing YAML +
|
|
body, extraction des wikilinks. Mémoïsé (chargé une fois par process).
|
|
2. `search(query, top_k)` : score chaque note (alias/title/slug/answers/
|
|
domains/tags + BM25 sur le body), expansion par graphe (voisins via
|
|
`linked`, `related`, wikilinks du body), dédoublonnage, top_k.
|
|
3. `build_prompt(query, notes)` : assemble (system, user) avec notes entières.
|
|
4. `generate(system, user)` : appel Ollama `/api/chat` (Qwen3 par défaut).
|
|
5. `answer(query)` : façade haut-niveau consommée par `api.py`.
|
|
|
|
Variables d'environnement (toutes optionnelles) :
|
|
|
|
- `OLLAMA_URL` (default: http://localhost:11434)
|
|
- `LLM_MODEL` (default: qwen3:8b)
|
|
- `VAULT_DIR` (default: <repo_root>/vault-grasbot)
|
|
- `SEARCH_TOP_K` (default: 5)
|
|
- `SEARCH_MIN_SCORE` (default: 1.0) — seuil en-dessous duquel on considère
|
|
qu'aucune note pertinente n'a été trouvée.
|
|
|
|
Instrumentation Langfuse (2026-04-23) :
|
|
|
|
- `answer()` : trace racine. Metadata (session_id, user_id, tags grounded/model).
|
|
- `search()` : span `retrieval` avec scores, reasons, seeds, voisins du graphe.
|
|
- `build_prompt()` : span `prompt_build` avec system/user en output.
|
|
- `generate()` : span `generation` (type Langfuse spécial : tokens, latence, model).
|
|
Voir `docs-site-interne/langfuse-observability.md`.
|
|
"""
from __future__ import annotations

import math
import os
import re
import time
from dataclasses import dataclass, field
from functools import lru_cache
from pathlib import Path
from typing import Any

import requests
import yaml

from observability import langfuse

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
LLM_MODEL = os.environ.get("LLM_MODEL", "qwen3:8b")

_DEFAULT_VAULT = (Path(__file__).resolve().parent.parent / "vault-grasbot").as_posix()
VAULT_DIR = Path(os.environ.get("VAULT_DIR", _DEFAULT_VAULT))

TOP_K = int(os.environ.get("SEARCH_TOP_K", "5"))
MIN_SCORE = float(os.environ.get("SEARCH_MIN_SCORE", "1.0"))

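# All of the above is read once at import time; for example, launching the API
# with `SEARCH_TOP_K=8` in the environment caps retrieval at 8 notes
# (illustrative value).
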
# ---------------------------------------------------------------------------
# FR tokenization (minimal stop-word list, sufficient for ~36 notes)
# ---------------------------------------------------------------------------
_FR_STOPWORDS = {
    "le", "la", "les", "un", "une", "des", "du", "de", "d", "au", "aux",
    "l", "et", "ou", "ni", "mais", "donc", "or", "car", "que", "qui",
    "quoi", "dont", "où", "si", "en", "y", "à", "a", "sur", "sous",
    "dans", "par", "pour", "avec", "sans", "vers", "chez", "entre",
    "est", "sont", "été", "être", "avoir", "il", "elle", "ils", "elles",
    "on", "nous", "vous", "je", "tu", "me", "te", "se", "moi", "toi",
    "son", "sa", "ses", "ma", "mon", "mes", "ta", "ton", "tes",
    "notre", "nos", "votre", "vos", "leur", "leurs",
    "ce", "cet", "cette", "ces", "cela", "ça", "celui", "celle",
    "ceci", "celles", "ceux", "tout", "tous", "toute", "toutes",
    "pas", "ne", "n", "plus", "moins", "très", "bien", "mal",
    "peux", "peut", "peuvent", "pouvoir", "fait", "faire", "dit", "dire",
    "quel", "quelle", "quels", "quelles", "comment", "pourquoi", "quand",
    "parle", "parles", "parlez", "parler",
    "lui", "eux",
}

_TOKEN_RE = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿ0-9+#.]+", re.UNICODE)

# Common normalizations so that `c++`, `C#`, `push-swap`, `push_swap` all
# map onto the same tokens as the vault.
_NORMALIZE = {
    "c++": "cpp",
    "c#": "csharp",
}

def tokenize_fr(text: str) -> list[str]:
    """Minimal tokenization with normalizations:
    - lowercasing
    - `-` and `_` split into spaces (e.g. `push-swap` → `push swap`)
    - `c++` → `cpp`, `c#` → `csharp`
    - FR stop-words removed, single-character tokens dropped
    """
    if not text:
        return []
    # Split compound slugs/identifiers
    cleaned = text.lower().replace("-", " ").replace("_", " ")
    words = _TOKEN_RE.findall(cleaned)
    out: list[str] = []
    for w in words:
        w = _NORMALIZE.get(w, w)
        if w in _FR_STOPWORDS or len(w) <= 1:
            continue
        out.append(w)
    return out

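# Example (doctest-style; the query is illustrative): stop-words and short
# tokens drop out, compound identifiers split, `c++` normalizes to `cpp`.
#
#     >>> tokenize_fr("Peux-tu parler de push_swap en C++ ?")
#     ['push', 'swap', 'cpp']
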
# ---------------------------------------------------------------------------
# Note structure
# ---------------------------------------------------------------------------
@dataclass
class Note:
    """Obsidian note loaded into memory, ready to be scored."""

    slug: str
    title: str
    type: str  # projet | competence | moc | parcours | technique
    path: Path
    body: str
    body_tokens: list[str]
    # Structuring frontmatter
    source: str = ""
    visibility: str = "public"
    domains: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    aliases: list[str] = field(default_factory=list)
    answers: list[str] = field(default_factory=list)
    priority: int = 5  # 1 (rarely relevant) to 10 (always worth surfacing)
    # Graph
    linked: list[str] = field(default_factory=list)  # slugs
    related: list[str] = field(default_factory=list)  # slugs
    wikilinks: list[str] = field(default_factory=list)  # slugs mentioned in the body
    # Useful on the UI side
    extra: dict[str, Any] = field(default_factory=dict)

# ---------------------------------------------------------------------------
# Vault parsing
# ---------------------------------------------------------------------------
_WIKILINK_RE = re.compile(r"\[\[([^\[\]|]+?)(?:\|[^\]]*?)?\]\]")
_YAML_WIKILINK_RE = re.compile(r'"\[\[([^\[\]|"]+?)(?:\|[^\]"]*?)?\]\]"')

def _extract_slugs_from_list(value: Any) -> list[str]:
    """Extract slugs from a YAML list whose items may be '[[slug]]' wikilinks."""
    if not value:
        return []
    if isinstance(value, str):
        value = [value]
    if not isinstance(value, list):
        return []
    slugs: list[str] = []
    for item in value:
        if not isinstance(item, str):
            continue
        m = re.match(r"\[\[([^\[\]|]+?)(?:\|[^\]]*?)?\]\]", item.strip())
        if m:
            slugs.append(m.group(1).strip())
        elif item.strip():
            slugs.append(item.strip())
    return slugs

def _extract_wikilinks_from_body(body: str) -> list[str]:
    """Return the slugs referenced via [[slug]] or [[slug|alias]] in the body."""
    return sorted({m.group(1).strip() for m in _WIKILINK_RE.finditer(body)})


_FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n(.*)$", re.DOTALL)

def parse_note(path: Path) -> Note | None:
    """Parse a Markdown note with YAML frontmatter. Returns None if unreadable."""
    try:
        raw = path.read_text(encoding="utf-8")
    except OSError as exc:
        print(f"⚠ parse_note: {path} unreadable ({exc})")
        return None

    m = _FRONTMATTER_RE.match(raw)
    if not m:
        # Note without frontmatter (rare): accept it anyway, with defaults.
        fm: dict[str, Any] = {}
        body = raw.strip()
    else:
        fm_text, body = m.group(1), m.group(2).strip()
        try:
            fm = yaml.safe_load(fm_text) or {}
        except yaml.YAMLError as exc:
            print(f"⚠ parse_note: invalid frontmatter in {path.name} ({exc})")
            fm = {}

    slug = str(fm.get("slug") or path.stem).strip()
    title = str(fm.get("title") or slug).strip()
    type_ = str(fm.get("type") or "note").strip()

    domains = [str(d).strip() for d in (fm.get("domains") or []) if str(d).strip()]
    tags = [str(t).strip() for t in (fm.get("tags") or []) if str(t).strip()]
    aliases = [str(a).strip() for a in (fm.get("aliases") or []) if str(a).strip()]
    answers = [str(a).strip() for a in (fm.get("answers") or []) if str(a).strip()]

    linked = _extract_slugs_from_list(fm.get("linked"))
    related = _extract_slugs_from_list(fm.get("related"))
    wikilinks = _extract_wikilinks_from_body(body)

    try:
        priority = int(fm.get("priority", 5))
    except (TypeError, ValueError):
        priority = 5

    extra: dict[str, Any] = {}
    for key in ("link", "updated"):
        if key in fm and fm[key] is not None:
            extra[key] = fm[key]

    return Note(
        slug=slug,
        title=title,
        type=type_,
        path=path,
        body=body,
        body_tokens=tokenize_fr(body),
        source=str(fm.get("source") or ""),
        visibility=str(fm.get("visibility") or "public"),
        domains=domains,
        tags=tags,
        aliases=aliases,
        answers=answers,
        priority=priority,
        linked=linked,
        related=related,
        wikilinks=wikilinks,
        extra=extra,
    )

@lru_cache(maxsize=1)
def load_vault() -> dict[str, Note]:
    """Load every `.md` note of the vault into memory (memoized).

    Returns a dict {slug: Note}. Non-public notes (`visibility: private`)
    are **excluded** so the chatbot can never surface them.
    """
    if not VAULT_DIR.exists():
        print(f"⚠ load_vault: {VAULT_DIR} not found")
        return {}

    vault: dict[str, Note] = {}
    for md_path in sorted(VAULT_DIR.rglob("*.md")):
        if md_path.name in ("README.md", "TAXONOMIE.md"):
            # Vault meta-docs, not content to surface to the chatbot.
            continue
        note = parse_note(md_path)
        if note is None:
            continue
        if note.visibility != "public":
            continue
        if note.slug in vault:
            print(f"⚠ load_vault: duplicate slug '{note.slug}' ({md_path.name})")
        vault[note.slug] = note

    print(f"📚 Vault loaded: {len(vault)} notes ({VAULT_DIR})")
    return vault

def reload_vault() -> dict[str, Note]:
    """Force a re-read of the vault (useful after edits, without restarting the API).

    Also clears the BM25 corpus-stats cache, which is derived from the vault
    and would otherwise go stale.
    """
    load_vault.cache_clear()
    _corpus_stats.cache_clear()
    return load_vault()

# ---------------------------------------------------------------------------
# Scoring
# ---------------------------------------------------------------------------
def _contains_any(haystack: str, needles: list[str]) -> bool:
    """True if at least one `needle` appears in `haystack` (case-insensitive)."""
    if not needles:
        return False
    lower = haystack.lower()
    return any(n.lower() in lower for n in needles if n)

def _token_overlap(tokens_a: list[str], tokens_b: list[str]) -> int:
    """Number of shared tokens (plain set intersection)."""
    if not tokens_a or not tokens_b:
        return 0
    return len(set(tokens_a) & set(tokens_b))

def _bm25_score(query_tokens: list[str], note: Note, corpus_stats: dict[str, Any]) -> float:
    """Simplified BM25 score over the body. Normalized to [0, ~5]."""
    if not query_tokens or not note.body_tokens:
        return 0.0
    k1 = 1.5
    b = 0.75
    avgdl = corpus_stats["avgdl"]
    N = corpus_stats["N"]
    idf = corpus_stats["idf"]
    doc_len = len(note.body_tokens)
    tf_counts: dict[str, int] = {}
    for tok in note.body_tokens:
        tf_counts[tok] = tf_counts.get(tok, 0) + 1

    score = 0.0
    for q in query_tokens:
        if q not in tf_counts:
            continue
        f = tf_counts[q]
        denom = f + k1 * (1 - b + b * doc_len / avgdl) if avgdl else f
        if denom == 0:
            continue
        score += idf.get(q, 0.0) * (f * (k1 + 1)) / denom

    # Empirical normalization: BM25 on short bodies yields values in 0-15;
    # squash to [0, 5] to stay comparable with the exact-match boosts.
    return min(score / 3.0, 5.0)

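# For reference, the per-term contribution above (before the /3 squash) is the
# standard Okapi BM25 form:
#
#     idf(q) * f * (k1 + 1) / (f + k1 * (1 - b + b * doc_len / avgdl))
#
# with idf(q) = ln((N - df + 0.5) / (df + 0.5) + 1) precomputed once in
# `_corpus_stats()` below.
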
@lru_cache(maxsize=1)
def _corpus_stats() -> dict[str, Any]:
    """Precompute the global BM25 stats (IDF, avgdl) once."""
    vault = load_vault()
    N = len(vault) or 1
    total_len = 0
    df: dict[str, int] = {}
    for note in vault.values():
        total_len += len(note.body_tokens)
        for tok in set(note.body_tokens):
            df[tok] = df.get(tok, 0) + 1
    avgdl = total_len / N if N else 1
    idf = {
        tok: math.log((N - d + 0.5) / (d + 0.5) + 1.0)
        for tok, d in df.items()
    }
    return {"N": N, "avgdl": avgdl, "idf": idf}

@dataclass
class ScoredNote:
    note: Note
    score: float
    reasons: list[str] = field(default_factory=list)

def _alias_matches(aliases: list[str], query_tokens: set[str], query_lower: str) -> list[str]:
    """An alias matches if:
    - it has >= 2 tokens AND appears as a substring (e.g. "home assistant"),
    - OR it is a single token AND that token appears in query_tokens.
    """
    hits: list[str] = []
    for alias in aliases:
        if not alias:
            continue
        al = alias.strip().lower()
        al_tokens = tokenize_fr(al)
        if len(al_tokens) >= 2:
            if al in query_lower:
                hits.append(alias)
        elif al_tokens:
            if al_tokens[0] in query_tokens:
                hits.append(alias)
        elif al in query_lower:
            hits.append(alias)
    return hits

def _keyword_matches(keywords: list[str], query_tokens: set[str]) -> list[str]:
    """Strict per-token match: prevents 'c' from matching inside 'recette'."""
    if not keywords:
        return []
    kw_lower = {k.lower() for k in keywords if k}
    return sorted(kw_lower & query_tokens)

def score_note(note: Note, query: str, query_tokens: list[str],
               stats: dict[str, Any]) -> ScoredNote:
    """Score a note across several signals, returning (score, reasons)."""
    score = 0.0
    reasons: list[str] = []

    query_lower = query.lower()
    query_token_set = set(query_tokens)
    title_tokens = tokenize_fr(note.title)

    # 1. Alias match: very strong signal (explicit synonyms)
    alias_hits = _alias_matches(note.aliases, query_token_set, query_lower)
    if alias_hits:
        score += 10.0
        reasons.append(f"alias:{','.join(alias_hits[:2])}")

    # 2. Title / slug match (full title as substring, otherwise strict tokens)
    if note.title.lower() in query_lower and len(note.title) >= 4:
        score += 8.0
        reasons.append("title")
    elif _token_overlap(title_tokens, query_tokens) >= 2:
        score += 4.0
        reasons.append("title-tokens")
    slug_tokens = tokenize_fr(note.slug.replace("-", " ").replace("_", " "))
    if slug_tokens and all(t in query_token_set for t in slug_tokens):
        score += 8.0
        reasons.append("slug")

    # 3. Canned questions: the query resembles an anticipated question/answer
    for ans in note.answers:
        if ans:
            overlap = _token_overlap(tokenize_fr(ans), query_tokens)
            if overlap >= 3:
                score += 12.0
                reasons.append("answers")
                break
            elif overlap >= 2:
                score += 5.0
                reasons.append("answers-partial")
                break

    # 4. Domains and tags: STRICT per-token match to avoid false positives
    domain_hits = _keyword_matches(note.domains, query_token_set)
    if domain_hits:
        score += 5.0 * len(domain_hits)
        reasons.append(f"domains:{','.join(domain_hits)}")
    tag_hits = _keyword_matches(note.tags, query_token_set)
    if tag_hits:
        score += 3.0 * len(tag_hits)
        reasons.append(f"tags:{','.join(tag_hits)}")

    # 5. BM25 on the body
    bm25 = _bm25_score(query_tokens, note, stats)
    if bm25 > 0:
        score += bm25
        reasons.append(f"bm25:{bm25:.2f}")

    # At this point `score` only reflects **textual signals** (alias, title,
    # slug, answers, domains, tags, BM25). The boosts below (priority,
    # moc-hub) only apply when there is at least one real signal; otherwise
    # an idle MOC note would crowd the whole top of the ranking at 1.6.
    if score > 0:
        if note.priority > 5:
            score += (note.priority - 5) * 0.3
            reasons.append(f"priority:{note.priority}")
        if note.type == "moc":
            score += 1.0
            reasons.append("moc-hub")

    return ScoredNote(note=note, score=score, reasons=reasons)

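# Worked example of the additive scoring (hypothetical note): for query tokens
# {"push", "swap"} against a note with title "Push_swap" (2 shared tokens:
# +4.0), slug "push-swap" (fully covered: +8.0), bm25 0.9 (+0.9) and
# priority 7 (+0.6), the final score is 13.5 with reasons
# ["title-tokens", "slug", "bm25:0.90", "priority:7"].
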
# ---------------------------------------------------------------------------
# Graph expansion
# ---------------------------------------------------------------------------
def expand_by_graph(seed: list[ScoredNote], vault: dict[str, Note],
                    max_extra: int = 3) -> list[ScoredNote]:
    """Add the direct neighbors (linked + related + wikilinks) of the seeds.

    Each neighbor inherits a score derived from its parent (60%), further
    boosted if it is already reachable from several seeds.
    """
    if not seed:
        return []

    result: dict[str, ScoredNote] = {s.note.slug: s for s in seed}

    for parent in seed:
        neighbors = set(parent.note.linked + parent.note.related + parent.note.wikilinks)
        for slug in neighbors:
            neighbor = vault.get(slug)
            if neighbor is None:
                continue
            derived = parent.score * 0.6
            if slug in result:
                # Reinforcement: a neighbor cited by several seeds is more relevant
                result[slug].score += derived * 0.3
                if "graph-reinforce" not in result[slug].reasons:
                    result[slug].reasons.append("graph-reinforce")
            else:
                result[slug] = ScoredNote(
                    note=neighbor,
                    score=derived,
                    reasons=[f"graph-from:{parent.note.slug}"],
                )

    # Cap the number of added neighbors so the context is not drowned out.
    # Clamp to 0: a negative slice bound would silently drop extras from the end.
    extras = [s for s in result.values() if s.note.slug not in {x.note.slug for x in seed}]
    extras.sort(key=lambda x: -x.score)
    return seed + extras[:max(0, max_extra)]

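# Derived-score arithmetic (hypothetical slugs): a seed scored 10.0 gives each
# unseen neighbor 10.0 * 0.6 = 6.0; if a second seed scored 8.0 also links one
# of those neighbors, it gains a further 8.0 * 0.6 * 0.3 = 1.44 and is tagged
# "graph-reinforce".
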
# ---------------------------------------------------------------------------
# Serialization for Langfuse (avoids logging opaque Python objects)
# ---------------------------------------------------------------------------
def _scored_note_to_dict(s: ScoredNote) -> dict[str, Any]:
    """JSON-safe projection of a `ScoredNote` for the Langfuse UI."""
    return {
        "slug": s.note.slug,
        "title": s.note.title,
        "type": s.note.type,
        "score": round(s.score, 3),
        "reasons": s.reasons,
    }

# ---------------------------------------------------------------------------
# High-level API: search (Langfuse-instrumented)
# ---------------------------------------------------------------------------
def search(query: str, top_k: int | None = None) -> list[ScoredNote]:
    """Return the notes relevant to `query`, sorted by descending score.

    Traced in Langfuse as a `retrieval` span, logging the extracted tokens,
    the seeds before expansion, the neighbors added by the graph, and the
    final top-K.
    """
    top_k = top_k or TOP_K
    vault = load_vault()
    if not vault:
        return []

    with langfuse.start_as_current_span(
        name="retrieval",
        input={"query": query, "top_k": top_k},
    ) as span:
        t0 = time.perf_counter()
        stats = _corpus_stats()
        query_tokens = tokenize_fr(query)

        scored = [score_note(note, query, query_tokens, stats) for note in vault.values()]
        scored = [s for s in scored if s.score > 0]
        scored.sort(key=lambda x: -x.score)

        # Raw top-N before expansion (keep 3 seeds for graph expansion)
        seeds = scored[:3]
        expanded = expand_by_graph(seeds, vault, max_extra=top_k - len(seeds))
        expanded.sort(key=lambda x: -x.score)
        result = expanded[:top_k]

        elapsed_ms = (time.perf_counter() - t0) * 1000

        span.update(
            output=[_scored_note_to_dict(s) for s in result],
            metadata={
                "query_tokens": query_tokens,
                "vault_size": len(vault),
                "candidates_with_signal": len(scored),
                "seeds_before_graph": [_scored_note_to_dict(s) for s in seeds],
                "bm25_stats": {
                    "N": stats["N"],
                    "avgdl": round(stats["avgdl"], 2),
                    "idf_terms": len(stats["idf"]),
                },
                "elapsed_ms": round(elapsed_ms, 1),
            },
        )

    return result

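# Usage sketch (slug and score are illustrative, not claims about the vault):
#
#     >>> top = search("push swap", top_k=3)
#     >>> [(s.note.slug, round(s.score, 1)) for s in top][:1]
#     [('push-swap', 12.9)]
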
# ---------------------------------------------------------------------------
# Prompt building (instrumented)
# ---------------------------------------------------------------------------
SYSTEM_PROMPT = """Tu es GrasBot, l'assistant IA du portfolio de Fernand Gras-Calvet, étudiant à l'École 42 Perpignan.

Ton rôle :
- Répondre aux visiteurs du site sur le parcours, les projets, les compétences de Fernand.
- T'appuyer sur les notes du vault personnel fournies dans le contexte.

Règles :
- Réponds en français, ton sobre et précis, sans emojis.
- Cite tes sources entre crochets en utilisant le slug (ex. [push-swap], [ia]).
- Si l'information n'apparaît pas dans les notes fournies, dis-le honnêtement et oriente vers le site (/portfolio, /competences, /contact) sans inventer.
- Reste concis (3 à 6 phrases en général), sauf demande explicite de détail.
- Si la question est hors sujet (ex. question généraliste sans rapport avec Fernand), indique poliment ton rôle et invite à poser une question sur son parcours."""

def build_prompt(query: str, scored_notes: list[ScoredNote]) -> tuple[str, str]:
    """Assemble (system, user) for Qwen3. **Full** notes go into the context."""
    # Threshold: if every note falls below it, treat it as "no relevant context"
    relevant = [s for s in scored_notes if s.score >= MIN_SCORE]

    with langfuse.start_as_current_span(
        name="prompt_build",
        input={"query": query, "scored_count": len(scored_notes)},
    ) as span:
        if relevant:
            context_blocks = []
            for i, s in enumerate(relevant, 1):
                n = s.note
                header = f"[SOURCE {i} · slug={n.slug} · type={n.type} · score={s.score:.1f}] {n.title}"
                context_blocks.append(f"{header}\n{n.body}")
            context = "\n\n---\n\n".join(context_blocks)
            user = (
                "Voici les notes pertinentes du vault personnel de Fernand :\n\n"
                f"{context}\n\n"
                "---\n\n"
                f"Question du visiteur : {query}\n\n"
                "Réponds en t'appuyant sur ces notes. Si la question dépasse leur portée, dis-le."
            )
        else:
            user = (
                f"Question du visiteur : {query}\n\n"
                "Note : aucune fiche du vault ne correspond clairement à cette question. "
                "Réponds sobrement à partir de tes connaissances générales, "
                "sans inventer de faits spécifiques sur Fernand. "
                "Invite le visiteur à explorer /portfolio, /competences, /contact."
            )

        grounded = bool(relevant)
        span.update(
            output={"system": SYSTEM_PROMPT, "user": user},
            metadata={
                "grounded": grounded,
                "relevant_notes": [_scored_note_to_dict(s) for s in relevant],
                "system_chars": len(SYSTEM_PROMPT),
                "user_chars": len(user),
                "min_score_threshold": MIN_SCORE,
            },
        )

    return SYSTEM_PROMPT, user

# ---------------------------------------------------------------------------
# Generation via Ollama (instrumented as a Langfuse "generation")
# ---------------------------------------------------------------------------
def generate(system: str, user: str) -> str:
    """Call Ollama `/api/chat` and return the response text.

    Langfuse span of type `generation`: exposes latency, model, parameters,
    and tokens (when the Ollama API returns them in `prompt_eval_count` /
    `eval_count`), like a standard LLM call in the dashboard.
    """
    model_params = {
        "temperature": 0.4,
        "num_predict": 512,
    }
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]

    with langfuse.start_as_current_observation(
        as_type="generation",
        name="ollama-chat",
        model=LLM_MODEL,
        input=messages,
        model_parameters=model_params,
    ) as generation:
        response = requests.post(
            f"{OLLAMA_URL}/api/chat",
            json={
                "model": LLM_MODEL,
                "messages": messages,
                "stream": False,
                "options": model_params,
                "keep_alive": "30m",
            },
            timeout=180,
        )
        response.raise_for_status()
        data = response.json()
        message = data.get("message") or {}
        content = message.get("content", "").strip()
        if not content:
            generation.update(
                output=None,
                metadata={"ollama_raw": data},
                level="ERROR",
                status_message=f"Empty response from model '{LLM_MODEL}'",
            )
            raise RuntimeError(
                f"generate: empty response from model '{LLM_MODEL}'; check that it is pulled."
            )

        # Ollama sometimes returns token counts; propagate them when available
        # (compatible with the Langfuse "usage" format).
        usage: dict[str, int] = {}
        if "prompt_eval_count" in data:
            usage["input"] = int(data["prompt_eval_count"])
        if "eval_count" in data:
            usage["output"] = int(data["eval_count"])
        if usage:
            usage["total"] = usage.get("input", 0) + usage.get("output", 0)

        update_kwargs: dict[str, Any] = {"output": content}
        if usage:
            update_kwargs["usage_details"] = usage
        generation.update(**update_kwargs)

    return content

# ---------------------------------------------------------------------------
# High-level facade: Langfuse root trace
# ---------------------------------------------------------------------------
def answer(
    query: str,
    top_k: int | None = None,
    session_id: str | None = None,
    user_id: str | None = None,
) -> dict[str, Any]:
    """Main entry point, consumed by `api.py`.

    Returns:
        {
            "response": str,      # LLM text (consumed by askAI.js → ChatBot.js)
            "sources": list[{slug, title, type, score, reasons, url?}],
            "model": str,
            "grounded": bool,     # True if at least 1 note exceeded MIN_SCORE
            "vault_size": int,
        }

    On the Langfuse side, creates a root trace `ask` that wraps:
    - the `retrieval` span
    - the `prompt_build` span
    - the `generation` span (generation type: model, params, usage)
    with session_id/user_id propagated at trace level for grouping in Langfuse.
    """
    with langfuse.start_as_current_span(
        name="ask",
        input={"query": query},
    ) as root_span:
        # Metadata at TRACE level (not span level), to filter/group in the UI.
        trace_metadata: dict[str, Any] = {
            "top_k": top_k or TOP_K,
            "min_score": MIN_SCORE,
        }
        trace_update: dict[str, Any] = {
            "name": "ask",
            "input": {"query": query},
            "metadata": trace_metadata,
        }
        if session_id:
            trace_update["session_id"] = session_id
        if user_id:
            trace_update["user_id"] = user_id

        langfuse.update_current_trace(**trace_update)

        # --- Pipeline ---
        t0 = time.perf_counter()
        scored = search(query, top_k=top_k)
        system, user = build_prompt(query, scored)
        text = generate(system, user)
        elapsed_ms = (time.perf_counter() - t0) * 1000

        # --- Building the API response ---
        sources = []
        for s in scored:
            url = None
            if s.note.type == "projet":
                url = f"/portfolio/{s.note.slug}"
            elif s.note.type == "competence":
                url = f"/competences/{s.note.slug}"
            sources.append({
                "slug": s.note.slug,
                "title": s.note.title,
                "type": s.note.type,
                "score": round(s.score, 2),
                "reasons": s.reasons,
                **({"url": url} if url else {}),
            })

        grounded = any(s.score >= MIN_SCORE for s in scored)
        max_score = max((s.score for s in scored), default=0.0)
        # Normalized score for Langfuse: 0 without context, otherwise
        # min(max_score / 15, 1); 15 ≈ typical score of a strong match (title + alias).
        retrieval_relevance = min(max_score / 15.0, 1.0)

        # --- Finalization: output + scores + tags on the trace ---
        tags = [
            "grounded" if grounded else "ungrounded",
            f"model:{LLM_MODEL}",
        ]
        if not scored:
            tags.append("vault-miss")

        langfuse.update_current_trace(
            output={
                "response": text,
                "sources_count": len(sources),
                "grounded": grounded,
            },
            tags=tags,
        )

        # Langfuse scores: let us filter the dashboard (e.g. "all non-grounded
        # traces this month") and track regressions.
        try:
            langfuse.score_current_trace(
                name="grounded",
                value=1.0 if grounded else 0.0,
                data_type="BOOLEAN",
            )
            langfuse.score_current_trace(
                name="retrieval_relevance",
                value=round(retrieval_relevance, 3),
                data_type="NUMERIC",
            )
        except Exception as exc:  # pragma: no cover
            print(f"⚠ score_current_trace failed: {exc}")

        root_span.update(
            output={"response_chars": len(text)},
            metadata={
                "elapsed_ms": round(elapsed_ms, 1),
                "sources_count": len(sources),
                "max_score": round(max_score, 2),
                "grounded": grounded,
            },
        )

        return {
            "response": text,
            "sources": sources,
            "model": LLM_MODEL,
            "grounded": grounded,
            "vault_size": len(load_vault()),
        }
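

# Local smoke test (a sketch: assumes a running Ollama instance and a populated
# vault; the default query is illustrative, and the printed fields follow
# `answer()`'s return contract above).
if __name__ == "__main__":
    import json
    import sys

    q = " ".join(sys.argv[1:]) or "Parle-moi du parcours de Fernand"
    out = answer(q)
    print(json.dumps(out["sources"], ensure_ascii=False, indent=2))
    print(out["response"])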