mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-15 21:16:52 +01:00
336 lines
15 KiB
Python
336 lines
15 KiB
Python
"""
|
|
Utilitaires pour la génération de rapports d'analyse, extraits de AgentReportGenerator.
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
import logging
|
|
from typing import Dict, List, Any, Tuple, Optional
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger("report_utils")
|
|
|
|
def extraire_question_initiale(echanges: List[Dict]) -> Optional[Dict]:
|
|
"""
|
|
Extrait la question initiale à partir du premier message de support qui cite souvent la demande du client.
|
|
|
|
Args:
|
|
echanges: Liste des échanges client/support
|
|
|
|
Returns:
|
|
Dict contenant la question initiale ou None si aucune question n'est trouvée
|
|
"""
|
|
if not echanges:
|
|
return None
|
|
|
|
# Chercher le premier message (qui est souvent une réponse du support)
|
|
premier_message = echanges[0]
|
|
|
|
# Si c'est déjà une question du client, pas besoin d'extraire
|
|
if premier_message.get("emetteur", "").upper() == "CLIENT" and premier_message.get("type", "").upper() == "QUESTION":
|
|
return None
|
|
|
|
# Si c'est une réponse du support, chercher une citation de la demande cliente
|
|
if premier_message.get("emetteur", "").upper() == "SUPPORT":
|
|
contenu = premier_message.get("contenu", "")
|
|
|
|
# Patterns communs de citation d'une demande initiale
|
|
patterns = [
|
|
r"(?:concernant|au sujet de)(?:\s*:|\s*\n)(.+?)(?:\n\n|Je viens)",
|
|
r"(?:demande|ticket)(?:\s*:|\s*\n)(.+?)(?:\n\n|Je viens)",
|
|
r"suite à ta demande\s*:(.+?)(?:\n\n|Je viens)",
|
|
r"(?:Bonjour|Salut).*?\n(.+?)(?:\n\n|Je viens)",
|
|
]
|
|
|
|
for pattern in patterns:
|
|
match = re.search(pattern, contenu, re.DOTALL)
|
|
if match:
|
|
question_text = match.group(1).strip()
|
|
|
|
# Créer un nouvel échange pour la question initiale
|
|
# Utiliser la date du premier message ou une estimation antérieure
|
|
date_support = premier_message.get("date", "")
|
|
if date_support:
|
|
# Si on a une date, estimer une date légèrement antérieure
|
|
if ":" in date_support: # Format avec heure
|
|
parts = date_support.split(" ")
|
|
if len(parts) >= 2 and ":" in parts[1]:
|
|
time_parts = parts[1].split(":")
|
|
if len(time_parts) >= 2:
|
|
# Réduire l'heure de quelques minutes
|
|
hour = int(time_parts[0])
|
|
minute = int(time_parts[1])
|
|
minute = max(0, minute - 30) # 30 minutes avant
|
|
if minute < 0:
|
|
hour = max(0, hour - 1)
|
|
minute += 60
|
|
date_question = f"{parts[0]} {hour:02d}:{minute:02d}"
|
|
else:
|
|
date_question = date_support
|
|
else:
|
|
date_question = date_support
|
|
else:
|
|
date_question = date_support
|
|
else:
|
|
date_question = "Date inconnue"
|
|
|
|
return {
|
|
"date": date_question,
|
|
"emetteur": "CLIENT",
|
|
"type": "Question",
|
|
"contenu": question_text
|
|
}
|
|
|
|
return None
|
|
|
|
def get_timestamp() -> str:
|
|
"""
|
|
Retourne un timestamp au format YYYYMMDD_HHMMSS pour identifier les fichiers et données.
|
|
|
|
Returns:
|
|
Chaîne formatée avec le timestamp actuel
|
|
"""
|
|
return datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
def generer_tableau_questions_reponses(echanges: List[Dict]) -> str:
|
|
"""
|
|
Génère un tableau question/réponse simplifié à partir des échanges
|
|
|
|
Args:
|
|
echanges: Liste des échanges client/support
|
|
|
|
Returns:
|
|
Tableau au format markdown
|
|
"""
|
|
if not echanges:
|
|
return "Aucun échange trouvé dans ce ticket."
|
|
|
|
# Initialiser le tableau
|
|
tableau = "\n## Tableau récapitulatif des échanges\n\n"
|
|
tableau += "| Question (Client) | Réponse (Support) |\n"
|
|
tableau += "|------------------|-------------------|\n"
|
|
|
|
# Variables pour suivre les questions et réponses
|
|
question_courante = None
|
|
questions_sans_reponse = []
|
|
|
|
# Parcourir tous les échanges pour identifier les questions et réponses
|
|
for echange in echanges:
|
|
emetteur = echange.get("emetteur", "").lower()
|
|
type_msg = echange.get("type", "").lower()
|
|
contenu = echange.get("contenu", "")
|
|
date = echange.get("date", "")
|
|
|
|
# Formater le contenu (synthétiser si trop long)
|
|
contenu_formate = synthétiser_contenu(contenu, 150)
|
|
|
|
# Si c'est une question du client
|
|
if emetteur == "client" and (type_msg == "question" or "?" in contenu):
|
|
# Si une question précédente n'a pas de réponse, l'ajouter à la liste
|
|
if question_courante:
|
|
questions_sans_reponse.append(question_courante)
|
|
|
|
# Enregistrer la nouvelle question courante
|
|
question_courante = f"{contenu_formate} _(date: {date})_"
|
|
|
|
# Si c'est une réponse du support et qu'il y a une question en attente
|
|
elif emetteur == "support" and question_courante:
|
|
# Ajouter la paire question/réponse au tableau
|
|
tableau += f"| {question_courante} | {contenu_formate} _(date: {date})_ |\n"
|
|
question_courante = None # Réinitialiser la question courante
|
|
|
|
# Traiter toute question restante sans réponse
|
|
if question_courante:
|
|
questions_sans_reponse.append(question_courante)
|
|
|
|
# Ajouter les questions sans réponse au tableau
|
|
for q in questions_sans_reponse:
|
|
tableau += f"| {q} | **Aucune réponse du support** |\n"
|
|
|
|
# Ajouter une note si aucun échange support n'a été trouvé
|
|
if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges):
|
|
tableau += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n"
|
|
|
|
return tableau
|
|
|
|
def synthétiser_contenu(contenu: str, longueur_max: int) -> str:
|
|
"""
|
|
Synthétise le contenu s'il est trop long
|
|
|
|
Args:
|
|
contenu: Contenu à synthétiser
|
|
longueur_max: Longueur maximale souhaitée
|
|
|
|
Returns:
|
|
Contenu synthétisé
|
|
"""
|
|
if len(contenu) <= longueur_max:
|
|
return contenu
|
|
|
|
# Extraire les premiers caractères
|
|
debut = contenu[:longueur_max//2].strip()
|
|
# Extraire les derniers caractères
|
|
fin = contenu[-(longueur_max//2):].strip()
|
|
|
|
return f"{debut}... {fin}"
|
|
|
|
def extraire_et_traiter_json(texte_rapport: str) -> Tuple[str, Optional[Dict], Optional[str]]:
|
|
"""
|
|
Extrait l'objet JSON des échanges du texte du rapport et le convertit en Markdown
|
|
|
|
Args:
|
|
texte_rapport: Texte complet du rapport généré par le LLM
|
|
|
|
Returns:
|
|
Tuple (rapport_traité, echanges_json, echanges_markdown)
|
|
"""
|
|
# Remplacer CBAD par CBAO dans tout le rapport
|
|
texte_rapport = texte_rapport.replace("CBAD", "CBAO")
|
|
|
|
# Patterns de recherche plus variés pour s'adapter aux différents modèles
|
|
patterns = [
|
|
r'```json\s*({.*?})\s*```', # Pattern standard avec backticks triples
|
|
r'```\s*({.*?"chronologie_echanges".*?})\s*```', # Pattern sans spécifier json mais avec le contenu attendu
|
|
r'{[\s\n]*"chronologie_echanges"[\s\n]*:[\s\n]*\[.*?\][\s\n]*}', # Pattern sans backticks
|
|
r'<json>(.*?)</json>' # Pattern alternatif avec balises xml
|
|
]
|
|
|
|
# Essayer chaque pattern
|
|
json_text = None
|
|
json_match = None
|
|
for pattern in patterns:
|
|
json_match = re.search(pattern, texte_rapport, re.DOTALL)
|
|
if json_match:
|
|
json_text = json_match.group(1).strip()
|
|
logger.info(f"JSON trouvé avec le pattern: {pattern[:20]}...")
|
|
break
|
|
|
|
# Si aucun pattern n'a fonctionné, tenter une approche plus agressive pour extraire le JSON
|
|
if not json_text:
|
|
# Chercher des indices de début de JSON dans le texte
|
|
potential_starts = [
|
|
texte_rapport.find('{"chronologie_echanges"'),
|
|
texte_rapport.find('{\n "chronologie_echanges"'),
|
|
texte_rapport.find('{ "chronologie_echanges"')
|
|
]
|
|
|
|
# Filtrer les indices valides (non -1)
|
|
valid_starts = [idx for idx in potential_starts if idx != -1]
|
|
|
|
if valid_starts:
|
|
# Prendre l'indice le plus petit (premier dans le texte)
|
|
start_idx = min(valid_starts)
|
|
# Chercher la fin du JSON (accolade fermante suivie d'une nouvelle ligne ou de la fin du texte)
|
|
json_extract = texte_rapport[start_idx:]
|
|
# Compter les accolades pour trouver la fermeture du JSON
|
|
open_braces = 0
|
|
close_idx = -1
|
|
|
|
for i, char in enumerate(json_extract):
|
|
if char == '{':
|
|
open_braces += 1
|
|
elif char == '}':
|
|
open_braces -= 1
|
|
if open_braces == 0:
|
|
close_idx = i
|
|
break
|
|
|
|
if close_idx != -1:
|
|
json_text = json_extract[:close_idx + 1]
|
|
logger.info(f"JSON extrait par analyse d'accolades: {len(json_text)} caractères")
|
|
|
|
if not json_text:
|
|
logger.warning("Aucun JSON trouvé dans le rapport")
|
|
return texte_rapport, None, None
|
|
|
|
# Nettoyage supplémentaire du JSON
|
|
# Enlever caractères non imprimables ou indésirables qui pourraient être ajoutés par certains modèles
|
|
json_text = re.sub(r'[\x00-\x1F\x7F]', '', json_text)
|
|
|
|
try:
|
|
# Vérifier que le texte commence par { et se termine par }
|
|
if not (json_text.startswith('{') and json_text.endswith('}')):
|
|
logger.warning(f"Format JSON incorrect, tentative de correction. Texte: {json_text[:50]}...")
|
|
# Chercher les délimiteurs du JSON
|
|
start = json_text.find('{')
|
|
end = json_text.rfind('}')
|
|
if start != -1 and end != -1 and start < end:
|
|
json_text = json_text[start:end+1]
|
|
|
|
echanges_json = json.loads(json_text)
|
|
logger.info(f"JSON extrait avec succès: {len(json_text)} caractères")
|
|
|
|
# Vérifier si le JSON a la structure attendue
|
|
if not isinstance(echanges_json, dict) or "chronologie_echanges" not in echanges_json:
|
|
# Tenter de corriger la structure si possible
|
|
if len(echanges_json) > 0 and isinstance(list(echanges_json.values())[0], list):
|
|
# Prendre la première liste comme chronologie
|
|
key = list(echanges_json.keys())[0]
|
|
echanges_json = {"chronologie_echanges": echanges_json[key]}
|
|
logger.info(f"Structure JSON corrigée en utilisant la clé: {key}")
|
|
else:
|
|
logger.warning("Structure JSON incorrecte et non réparable")
|
|
return texte_rapport, None, None
|
|
|
|
# Vérifier s'il faut ajouter une question initiale
|
|
if "chronologie_echanges" in echanges_json and len(echanges_json["chronologie_echanges"]) > 0:
|
|
question_initiale = extraire_question_initiale(echanges_json["chronologie_echanges"])
|
|
if question_initiale:
|
|
# Insérer la question initiale au début de la chronologie
|
|
echanges_json["chronologie_echanges"].insert(0, question_initiale)
|
|
logger.info("Question initiale extraite et ajoutée à la chronologie")
|
|
|
|
# Convertir en tableau Markdown
|
|
echanges_markdown = "| Date | Émetteur | Type | Contenu | Statut |\n"
|
|
echanges_markdown += "|------|---------|------|---------|--------|\n"
|
|
|
|
if "chronologie_echanges" in echanges_json and isinstance(echanges_json["chronologie_echanges"], list):
|
|
# Pré-traitement pour vérifier les questions sans réponse
|
|
questions_sans_reponse = {}
|
|
for i, echange in enumerate(echanges_json["chronologie_echanges"]):
|
|
if echange.get("type", "").lower() == "question" and echange.get("emetteur", "").lower() == "client":
|
|
has_response = False
|
|
# Vérifier si la question a une réponse
|
|
for j in range(i+1, len(echanges_json["chronologie_echanges"])):
|
|
next_echange = echanges_json["chronologie_echanges"][j]
|
|
if next_echange.get("type", "").lower() == "réponse" and next_echange.get("emetteur", "").lower() == "support":
|
|
has_response = True
|
|
break
|
|
questions_sans_reponse[i] = not has_response
|
|
|
|
# Générer le tableau
|
|
for i, echange in enumerate(echanges_json["chronologie_echanges"]):
|
|
date = echange.get("date", "-")
|
|
emetteur = echange.get("emetteur", "-")
|
|
type_msg = echange.get("type", "-")
|
|
contenu = echange.get("contenu", "-")
|
|
|
|
# Ajouter un statut pour les questions sans réponse
|
|
statut = ""
|
|
if emetteur.lower() == "client" and type_msg.lower() == "question" and questions_sans_reponse.get(i, False):
|
|
statut = "**Sans réponse**"
|
|
|
|
echanges_markdown += f"| {date} | {emetteur} | {type_msg} | {contenu} | {statut} |\n"
|
|
|
|
# Ajouter une note si aucune réponse du support n'a été trouvée
|
|
if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges_json["chronologie_echanges"]):
|
|
echanges_markdown += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n\n"
|
|
|
|
# Ajouter un tableau questions/réponses simplifié
|
|
tableau_qr = generer_tableau_questions_reponses(echanges_json["chronologie_echanges"])
|
|
echanges_markdown += f"\n{tableau_qr}\n"
|
|
|
|
# Remplacer le JSON dans le texte par le tableau Markdown
|
|
# Si le JSON était entouré de backticks, remplacer tout le bloc
|
|
if json_match:
|
|
rapport_traite = texte_rapport.replace(json_match.group(0), echanges_markdown)
|
|
else:
|
|
# Sinon, remplacer juste le texte JSON
|
|
rapport_traite = texte_rapport.replace(json_text, echanges_markdown)
|
|
|
|
return rapport_traite, echanges_json, echanges_markdown
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Erreur lors du décodage JSON: {e}")
|
|
logger.debug(f"Contenu JSON problématique: {json_text[:100]}...")
|
|
return texte_rapport, None, None |