llm_ticket3/agents/agent_report_generator.py
2025-04-08 14:40:14 +02:00

815 lines
38 KiB
Python

import json
import os
from .base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple, Optional, List
import logging
import traceback
import re
logger = logging.getLogger("AgentReportGenerator")
class AgentReportGenerator(BaseAgent):
"""
Agent pour générer un rapport complet à partir des analyses de ticket et d'images.
Cet agent prend en entrée :
- L'analyse du ticket
- Les analyses des images pertinentes
- Les métadonnées associées
Format de données attendu:
- JSON est le format principal de données en entrée et en sortie
- Le rapport Markdown est généré à partir du JSON uniquement pour la présentation
Structure des données d'analyse d'images:
- Deux structures possibles sont supportées:
1. Liste d'objets: rapport_data["images_analyses"] = [{image_name, analyse}, ...]
2. Dictionnaire: rapport_data["analyse_images"] = {chemin_image: {sorting: {...}, analysis: {...}}, ...}
Flux de traitement:
1. Préparation des données d'entrée
2. Génération du rapport avec le LLM
3. Sauvegarde au format JSON (format principal)
4. Conversion et sauvegarde au format Markdown (pour présentation)
"""
def __init__(self, llm):
super().__init__("AgentReportGenerator", llm)
# Configuration locale de l'agent
self.temperature = 0.4 # Génération de rapport factuelle mais bien structurée
self.top_p = 0.9
self.max_tokens = 2500
# Centralisation des exigences de format JSON
self.exigences_json = """
EXIGENCE ABSOLUE - GÉNÉRATION DE DONNÉES EN FORMAT JSON:
- Tu DOIS IMPÉRATIVEMENT inclure dans ta réponse un objet JSON structuré pour les échanges client/support
- Le format de chaque échange dans le JSON DOIT être:
{
"chronologie_echanges": [
{
"date": "date de l'échange",
"emetteur": "CLIENT ou SUPPORT",
"type": "Question ou Réponse ou Information technique",
"contenu": "contenu synthétisé de l'échange"
},
... autres échanges ...
]
}
- La structure doit être EXACTEMENT comme indiquée, avec le nom de clé "chronologie_echanges" obligatoirement
- Chaque message du ticket doit apparaître comme un objet dans la liste
- Indique clairement qui est CLIENT et qui est SUPPORT dans le champ "emetteur"
- Si une question n'a pas de réponse, assure-toi de le noter clairement
- Toute mention de "CBAD" doit être remplacée par "CBAO" qui est le nom correct de la société
- Tu dois synthétiser au mieux les échanges (le plus court et clair possible)
"""
# Centralisation des instructions de formatage
self.instructions_format = """
IMPORTANT POUR LE FORMAT:
- Le JSON doit être valide et parsable
- Utilise ```json et ``` pour délimiter le bloc JSON
- Ne modifie pas la structure des clés ("chronologie_echanges", "date", "emetteur", "type", "contenu")
- Assure-toi que les accolades et crochets sont correctement équilibrés
"""
# Centralisation de la structure du rapport
self.structure_rapport = """
Structure ton rapport:
1. Résumé exécutif: Synthèse du problème initial (nom de la demande + description)
2. Chronologie des échanges: Objet JSON avec la structure imposée ci-dessus (partie CRUCIALE)
3. Analyse des images: Ce que montrent les captures d'écran et leur pertinence
4. Diagnostic technique: Interprétation des informations techniques pertinentes
"""
# Centralisation des exemples JSON
self.exemples_json = """
EXEMPLES D'ÉCHANGES POUR RÉFÉRENCE:
Exemple 1:
```json
{
"chronologie_echanges": [
{"date": "2023-01-15", "emetteur": "CLIENT", "type": "Question", "contenu": "Je n'arrive pas à me connecter à l'application"},
{"date": "2023-01-16", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Avez-vous essayé de réinitialiser votre mot de passe?"}
]
}
```
Exemple 2:
```json
{
"chronologie_echanges": [
{"date": "2023-02-10", "emetteur": "CLIENT", "type": "Information technique", "contenu": "Version de l'application: 2.3.1"},
{"date": "2023-02-11", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Cette version contient un bug connu, veuillez mettre à jour"}
]
}
```
N'oublie pas que le format EXACT est important. Utilise TOUJOURS la clé "chronologie_echanges" comme clé principale.
"""
# Construction du prompt système final avec des blocs de texte littéraux pour éviter les problèmes d'accolades
self.system_prompt = f"""Tu es un expert en génération de rapports techniques pour BRG-Lab pour la société CBAO.
Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré et exploitable.
{self.exigences_json}
{self.instructions_format}
{self.structure_rapport}
Reste factuel et précis dans ton analyse.
Les données d'échanges client/support sont l'élément le plus important du rapport.
Tu DOIS inclure le JSON des échanges dans ta réponse exactement au format:
```json
{{
"chronologie_echanges": [
{{"date": "...", "emetteur": "CLIENT", "type": "Question", "contenu": "..."}},
{{"date": "...", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "..."}}
]
}}
```"""
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentReportGenerator initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres - mêmes paramètres pour tous les modèles
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
# Ajout des exemples dans le prompt système pour tous les modèles
if not "EXEMPLES D'ÉCHANGES" in self.llm.prompt_system:
self.llm.prompt_system += self.exemples_json
logger.info("Exemples JSON ajoutés au prompt système")
self.llm.configurer(**params)
logger.info(f"Configuration appliquée au modèle: {str(params)}")
else:
logger.warning("Le modèle LLM ne supporte pas la méthode configurer()")
def _generer_prompt_instructions(self) -> str:
"""
Génère les instructions pour la génération du rapport
Returns:
Instructions formatées
"""
return f"""
## INSTRUCTIONS POUR LA GÉNÉRATION DU RAPPORT
1. Résume d'abord le problème principal du ticket en quelques phrases.
2. GÉNÉRER OBLIGATOIREMENT LE JSON DES ÉCHANGES CLIENT/SUPPORT:
- Les données d'échanges sont l'élément le plus important du rapport
- Utilise EXACTEMENT la structure suivante, sans la modifier:
```json
{{
"chronologie_echanges": [
{{"date": "date1", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu de la question"}},
{{"date": "date2", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "contenu de la réponse"}}
]
}}
```
- La clé principale DOIT être "chronologie_echanges"
- N'ajoute pas de commentaires ou de texte dans le JSON
- Assure-toi que le JSON est valide et correspond EXACTEMENT au format demandé
- Entoure le JSON avec ```json et ``` pour faciliter l'extraction
3. Après le JSON, analyse les images pertinentes et leur contribution à la compréhension du problème.
4. Termine par une analyse technique des causes probables du problème.
IMPORTANT: Le JSON des échanges client/support est OBLIGATOIRE et doit être parfaitement formaté.
"""
def _generer_exemple_json(self) -> str:
"""
Génère un exemple JSON pour le prompt
Returns:
Exemple JSON formaté
"""
return """
EXEMPLE EXACT DU FORMAT JSON ATTENDU:
```json
{
"chronologie_echanges": [
{"date": "2023-05-10", "emetteur": "CLIENT", "type": "Question", "contenu": "L'application affiche une erreur lors de la connexion"},
{"date": "2023-05-11", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Pouvez-vous préciser le message d'erreur?"},
{"date": "2023-05-12", "emetteur": "CLIENT", "type": "Information technique", "contenu": "Message: Erreur de connexion au serveur"}
]
}
```
"""
def _formater_prompt_pour_rapport(self, ticket_analyse, images_analyses, ticket_id):
"""
Formate le prompt pour la génération du rapport
Args:
ticket_analyse: Analyse du ticket
images_analyses: Liste des analyses d'images, format [{image_name, analyse}, ...]
ticket_id: ID du ticket
Returns:
Prompt formaté pour le LLM
"""
num_images = len(images_analyses)
logger.info(f"Formatage du prompt avec {num_images} analyses d'images")
# Inclure une vérification des données reçues
prompt = f"""Génère un rapport technique complet pour le ticket #{ticket_id}, en te basant sur les analyses suivantes.
## VÉRIFICATION DES DONNÉES REÇUES
Je vais d'abord vérifier que j'ai bien reçu les données d'analyses:
- Analyse du ticket : {"PRÉSENTE" if ticket_analyse else "MANQUANTE"}
- Analyses d'images : {"PRÉSENTES (" + str(num_images) + " images)" if num_images > 0 else "MANQUANTES"}
## ANALYSE DU TICKET
{ticket_analyse}
## ANALYSES DES IMAGES ({num_images} images analysées)
"""
# Ajouter l'analyse de chaque image
for i, img_analyse in enumerate(images_analyses, 1):
image_name = img_analyse.get("image_name", f"Image {i}")
analyse = img_analyse.get("analyse", "Analyse non disponible")
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
logger.info(f"Ajout de l'analyse de l'image {image_name} au prompt ({len(str(analyse))} caractères)")
# Instructions claires pour tous les modèles
prompt += self._generer_prompt_instructions()
# Ajouter l'exemple non formaté pour éviter les erreurs de formatage
prompt += self._generer_exemple_json()
logger.info(f"Prompt formaté: {len(prompt)} caractères au total")
return prompt
def _generer_tableau_questions_reponses(self, echanges: List[Dict]) -> str:
"""
Génère un tableau question/réponse simplifié à partir des échanges
Args:
echanges: Liste des échanges client/support
Returns:
Tableau au format markdown
"""
if not echanges:
return "Aucun échange trouvé dans ce ticket."
# Initialiser le tableau
tableau = "\n## Tableau récapitulatif des échanges\n\n"
tableau += "| Question (Client) | Réponse (Support) |\n"
tableau += "|------------------|-------------------|\n"
# Variables pour suivre les questions et réponses
question_courante = None
questions_sans_reponse = []
# Parcourir tous les échanges pour identifier les questions et réponses
for echange in echanges:
emetteur = echange.get("emetteur", "").lower()
type_msg = echange.get("type", "").lower()
contenu = echange.get("contenu", "")
date = echange.get("date", "")
# Formater le contenu (synthétiser si trop long)
contenu_formate = self._synthétiser_contenu(contenu, 150)
# Si c'est une question du client
if emetteur == "client" and (type_msg == "question" or "?" in contenu):
# Si une question précédente n'a pas de réponse, l'ajouter à la liste
if question_courante:
questions_sans_reponse.append(question_courante)
# Enregistrer la nouvelle question courante
question_courante = f"{contenu_formate} _(date: {date})_"
# Si c'est une réponse du support et qu'il y a une question en attente
elif emetteur == "support" and question_courante:
# Ajouter la paire question/réponse au tableau
tableau += f"| {question_courante} | {contenu_formate} _(date: {date})_ |\n"
question_courante = None # Réinitialiser la question courante
# Traiter toute question restante sans réponse
if question_courante:
questions_sans_reponse.append(question_courante)
# Ajouter les questions sans réponse au tableau
for q in questions_sans_reponse:
tableau += f"| {q} | **Aucune réponse du support** |\n"
# Ajouter une note si aucun échange support n'a été trouvé
if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges):
tableau += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n"
return tableau
def _synthétiser_contenu(self, contenu: str, longueur_max: int) -> str:
"""
Synthétise le contenu s'il est trop long
Args:
contenu: Contenu à synthétiser
longueur_max: Longueur maximale souhaitée
Returns:
Contenu synthétisé
"""
if len(contenu) <= longueur_max:
return contenu
# Extraire les premiers caractères
debut = contenu[:longueur_max//2].strip()
# Extraire les derniers caractères
fin = contenu[-(longueur_max//2):].strip()
return f"{debut}... {fin}"
def executer(self, rapport_data: Dict, rapport_dir: str) -> Optional[str]:
"""
Génère un rapport à partir des analyses effectuées
Args:
rapport_data: Dictionnaire contenant toutes les données analysées
Doit contenir au moins une des clés:
- "ticket_analyse" ou "analyse_json": Analyse du ticket
- "analyse_images": Analyses des images (facultatif)
rapport_dir: Répertoire où sauvegarder le rapport
Returns:
Chemin vers le rapport JSON
"""
# Récupérer l'ID du ticket depuis les données
ticket_id = rapport_data.get("ticket_id", "")
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
ticket_id = rapport_data["ticket_data"].get("code", "")
if not ticket_id:
ticket_id = os.path.basename(os.path.dirname(rapport_dir))
if not ticket_id.startswith("T"):
# Dernier recours, utiliser le dernier segment du chemin
ticket_id = os.path.basename(rapport_dir)
logger.info(f"Génération du rapport pour le ticket: {ticket_id}")
print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}")
# Validation des données d'entrée
logger.info("Vérification de la complétude des données d'entrée:")
if "ticket_data" in rapport_data:
logger.info(f" - Données de ticket présentes: {len(str(rapport_data['ticket_data']))} caractères")
else:
logger.warning(" - Données de ticket manquantes")
# Vérification des analyses
ticket_analyse_exists = False
if "ticket_analyse" in rapport_data and rapport_data["ticket_analyse"]:
ticket_analyse_exists = True
logger.info(f" - Analyse du ticket présente: {len(rapport_data['ticket_analyse'])} caractères")
elif "analyse_json" in rapport_data and rapport_data["analyse_json"]:
ticket_analyse_exists = True
logger.info(f" - Analyse JSON présente: {len(rapport_data['analyse_json'])} caractères")
else:
logger.warning(" - Analyse du ticket manquante")
# Vérification des analyses d'images
if "analyse_images" in rapport_data and rapport_data["analyse_images"]:
n_images = len(rapport_data["analyse_images"])
n_relevant = sum(1 for _, data in rapport_data["analyse_images"].items()
if "sorting" in data and isinstance(data["sorting"], dict) and data["sorting"].get("is_relevant", False))
n_analyzed = sum(1 for _, data in rapport_data["analyse_images"].items()
if "analysis" in data and data["analysis"])
logger.info(f" - Analyses d'images présentes: {n_images} images, {n_relevant} pertinentes, {n_analyzed} analysées")
else:
logger.warning(" - Analyses d'images manquantes")
# S'assurer que le répertoire existe
if not os.path.exists(rapport_dir):
os.makedirs(rapport_dir)
logger.info(f"Répertoire de rapport créé: {rapport_dir}")
try:
# Préparer les données formatées pour l'analyse
ticket_analyse = None
# Vérifier que l'analyse du ticket est disponible sous l'une des clés possibles
if "ticket_analyse" in rapport_data and rapport_data["ticket_analyse"]:
ticket_analyse = rapport_data["ticket_analyse"]
logger.info("Utilisation de ticket_analyse")
elif "analyse_json" in rapport_data and rapport_data["analyse_json"]:
ticket_analyse = rapport_data["analyse_json"]
logger.info("Utilisation de analyse_json en fallback")
else:
# Créer une analyse par défaut si aucune n'est disponible
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
ticket_data = rapport_data.get("ticket_data", {})
ticket_name = ticket_data.get("name", "Sans titre")
ticket_desc = ticket_data.get("description", "Pas de description disponible")
ticket_analyse = f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie par l'agent d'analyse de ticket)"
# Préparer les données d'analyse d'images
images_analyses = []
analyse_images_data = rapport_data.get("analyse_images", {})
# Statistiques pour les métadonnées
total_images = len(analyse_images_data) if analyse_images_data else 0
images_pertinentes = 0
# Collecter des informations sur les agents et LLM utilisés
agents_info = self._collecter_info_agents(rapport_data)
# Transformer les analyses d'images en liste structurée pour le prompt
for image_path, analyse_data in analyse_images_data.items():
image_name = os.path.basename(image_path)
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
if is_relevant:
images_pertinentes += 1
# Récupérer l'analyse détaillée si elle existe et que l'image est pertinente
analyse_detail = None
if is_relevant:
if "analysis" in analyse_data and analyse_data["analysis"]:
# Vérifier différentes structures possibles de l'analyse
if isinstance(analyse_data["analysis"], dict):
if "analyse" in analyse_data["analysis"]:
analyse_detail = analyse_data["analysis"]["analyse"]
elif "error" in analyse_data["analysis"] and not analyse_data["analysis"].get("error", True):
# Si pas d'erreur et que l'analyse est directement dans le dictionnaire
analyse_detail = str(analyse_data["analysis"])
else:
# Essayer de récupérer directement le contenu du dictionnaire
analyse_detail = json.dumps(analyse_data["analysis"], ensure_ascii=False, indent=2)
elif isinstance(analyse_data["analysis"], str):
# Si l'analyse est directement une chaîne
analyse_detail = analyse_data["analysis"]
# Si l'analyse n'a pas été trouvée mais que l'image est pertinente
if not analyse_detail:
analyse_detail = f"Image marquée comme pertinente. Raison: {analyse_data['sorting'].get('reason', 'Non spécifiée')}"
# Analyse détaillée
if analyse_detail:
images_analyses.append({
"image_name": image_name,
"image_path": image_path,
"analyse": analyse_detail,
"sorting_info": analyse_data.get("sorting", {})
})
logger.info(f"Analyse de l'image {image_name} ajoutée au rapport (longueur: {len(str(analyse_detail))} caractères)")
else:
logger.warning(f"Analyse non trouvée pour l'image pertinente {image_name}")
else:
logger.info(f"Image {image_name} ignorée car non pertinente")
# Créer le chemin du fichier de rapport JSON (sortie principale)
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
# Formater les données pour le LLM
prompt = self._formater_prompt_pour_rapport(ticket_analyse, images_analyses, ticket_id)
# Générer le rapport avec le LLM
logger.info("Génération du rapport avec le LLM")
print(f" Génération du rapport avec le LLM...")
# Debut du timing
start_time = datetime.now()
# Interroger le LLM
rapport_genere = self.llm.interroger(prompt)
# Fin du timing
end_time = datetime.now()
generation_time = (end_time - start_time).total_seconds()
logger.info(f"Rapport généré: {len(rapport_genere)} caractères")
print(f" Rapport généré: {len(rapport_genere)} caractères")
# Traiter le JSON pour extraire la chronologie des échanges
_, echanges_json, _ = self._extraire_et_traiter_json(rapport_genere)
# Tracer l'historique avec le prompt pour la transparence
self.ajouter_historique("generation_rapport",
{
"ticket_id": ticket_id,
"prompt_taille": len(prompt),
"timestamp": self._get_timestamp()
},
rapport_genere)
# Préparer les métadonnées complètes pour le rapport
timestamp = self._get_timestamp()
# Extraire le résumé et diagnostic du rapport généré (première partie et dernière partie)
resume = ""
diagnostic = ""
if rapport_genere:
# Supprimer le bloc JSON (pour isoler le texte d'analyse)
rapport_sans_json = re.sub(r'```json.*?```', '', rapport_genere, flags=re.DOTALL)
# Diviser le texte en paragraphes
paragraphes = [p.strip() for p in rapport_sans_json.split('\n\n') if p.strip()]
# Le premier paragraphe est généralement le résumé
if paragraphes:
resume = paragraphes[0]
# Les derniers paragraphes après "Diagnostic" ou "Analyse technique"
# contiennent généralement le diagnostic
for i, p in enumerate(paragraphes):
if any(marker in p.lower() for marker in ["diagnostic", "analyse technique", "conclusion"]):
diagnostic = '\n\n'.join(paragraphes[i:])
break
# Préparer le JSON complet du rapport (format principal)
rapport_data_complet = {
"ticket_id": ticket_id,
"timestamp": timestamp,
"rapport_complet": rapport_genere, # Texte complet généré par le LLM
"ticket_analyse": ticket_analyse, # Analyse du ticket d'origine
"images_analyses": images_analyses, # Analyses des images
"chronologie_echanges": echanges_json.get("chronologie_echanges", []) if echanges_json else [],
"resume": resume, # Résumé extrait du rapport généré
"diagnostic": diagnostic, # Diagnostic technique extrait du rapport
"statistiques": {
"total_images": total_images,
"images_pertinentes": images_pertinentes,
"analyses_generees": len(images_analyses),
"generation_time": generation_time
},
"prompt": {
"systeme": self.system_prompt,
"utilisateur": prompt
}
}
# Ajouter les métadonnées pour la traçabilité
metadata = {
"timestamp": timestamp,
"generation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"model": getattr(self.llm, "modele", str(type(self.llm))),
"model_version": getattr(self.llm, "version", "non spécifiée"),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"agents": agents_info,
"generation_time": generation_time,
"duree_traitement": str(getattr(self.llm, "dureeTraitement", "N/A"))
}
rapport_data_complet["metadata"] = metadata
# Ajouter le tableau questions/réponses dans les métadonnées
if echanges_json and "chronologie_echanges" in echanges_json:
tableau_qr = self._generer_tableau_questions_reponses(echanges_json["chronologie_echanges"])
rapport_data_complet["tableau_questions_reponses"] = tableau_qr
# ÉTAPE 1: Sauvegarder le rapport au format JSON (FORMAT PRINCIPAL)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(rapport_data_complet, f, ensure_ascii=False, indent=2)
logger.info(f"Rapport JSON (format principal) sauvegardé: {json_path}")
print(f" Rapport JSON sauvegardé: {json_path}")
# Retourner le chemin du fichier JSON
return json_path
except Exception as e:
error_message = f"Erreur lors de la génération du rapport: {str(e)}"
logger.error(error_message)
logger.error(traceback.format_exc())
print(f" ERREUR: {error_message}")
return None
def _collecter_info_agents(self, rapport_data: Dict) -> Dict:
"""
Collecte des informations sur les agents utilisés dans l'analyse
Args:
rapport_data: Données du rapport
Returns:
Dictionnaire contenant les informations sur les agents
"""
agents_info = {}
# Informations sur l'agent JSON Analyser
if "analyse_json" in rapport_data:
json_analysis = rapport_data["analyse_json"]
# Vérifier si l'analyse JSON contient des métadonnées
if isinstance(json_analysis, dict) and "metadata" in json_analysis:
agents_info["json_analyser"] = json_analysis["metadata"]
# Informations sur les agents d'image
if "analyse_images" in rapport_data and rapport_data["analyse_images"]:
# Image Sorter
sorter_info = {}
analyser_info = {}
for img_path, img_data in rapport_data["analyse_images"].items():
# Collecter info du sorter
if "sorting" in img_data and isinstance(img_data["sorting"], dict) and "metadata" in img_data["sorting"]:
if "model_info" in img_data["sorting"]["metadata"]:
sorter_info = img_data["sorting"]["metadata"]["model_info"]
# Collecter info de l'analyser
if "analysis" in img_data and img_data["analysis"] and isinstance(img_data["analysis"], dict) and "metadata" in img_data["analysis"]:
if "model_info" in img_data["analysis"]["metadata"]:
analyser_info = img_data["analysis"]["metadata"]["model_info"]
# Une fois qu'on a trouvé les deux, on peut sortir
if sorter_info and analyser_info:
break
if sorter_info:
agents_info["image_sorter"] = sorter_info
if analyser_info:
agents_info["image_analyser"] = analyser_info
# Ajouter les informations de l'agent report generator
agents_info["report_generator"] = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
return agents_info
def _extraire_et_traiter_json(self, texte_rapport):
"""
Extrait l'objet JSON des échanges du texte du rapport et le convertit en Markdown
Args:
texte_rapport: Texte complet du rapport généré par le LLM
Returns:
Tuple (rapport_traité, echanges_json, echanges_markdown)
"""
# Remplacer CBAD par CBAO dans tout le rapport
texte_rapport = texte_rapport.replace("CBAD", "CBAO")
# Patterns de recherche plus variés pour s'adapter aux différents modèles
patterns = [
r'```json\s*({.*?})\s*```', # Pattern standard avec backticks triples
r'```\s*({.*?"chronologie_echanges".*?})\s*```', # Pattern sans spécifier json mais avec le contenu attendu
r'{[\s\n]*"chronologie_echanges"[\s\n]*:[\s\n]*\[.*?\][\s\n]*}', # Pattern sans backticks
r'<json>(.*?)</json>' # Pattern alternatif avec balises xml
]
# Essayer chaque pattern
json_text = None
for pattern in patterns:
json_match = re.search(pattern, texte_rapport, re.DOTALL)
if json_match:
json_text = json_match.group(1).strip()
logger.info(f"JSON trouvé avec le pattern: {pattern[:20]}...")
break
# Si aucun pattern n'a fonctionné, tenter une approche plus agressive pour extraire le JSON
if not json_text:
# Chercher des indices de début de JSON dans le texte
potential_starts = [
texte_rapport.find('{"chronologie_echanges"'),
texte_rapport.find('{\n "chronologie_echanges"'),
texte_rapport.find('{ "chronologie_echanges"')
]
# Filtrer les indices valides (non -1)
valid_starts = [idx for idx in potential_starts if idx != -1]
if valid_starts:
# Prendre l'indice le plus petit (premier dans le texte)
start_idx = min(valid_starts)
# Chercher la fin du JSON (accolade fermante suivie d'une nouvelle ligne ou de la fin du texte)
json_extract = texte_rapport[start_idx:]
# Compter les accolades pour trouver la fermeture du JSON
open_braces = 0
close_idx = -1
for i, char in enumerate(json_extract):
if char == '{':
open_braces += 1
elif char == '}':
open_braces -= 1
if open_braces == 0:
close_idx = i
break
if close_idx != -1:
json_text = json_extract[:close_idx + 1]
logger.info(f"JSON extrait par analyse d'accolades: {len(json_text)} caractères")
if not json_text:
logger.warning("Aucun JSON trouvé dans le rapport")
return texte_rapport, None, None
# Nettoyage supplémentaire du JSON
# Enlever caractères non imprimables ou indésirables qui pourraient être ajoutés par certains modèles
json_text = re.sub(r'[\x00-\x1F\x7F]', '', json_text)
try:
# Vérifier que le texte commence par { et se termine par }
if not (json_text.startswith('{') and json_text.endswith('}')):
logger.warning(f"Format JSON incorrect, tentative de correction. Texte: {json_text[:50]}...")
# Chercher les délimiteurs du JSON
start = json_text.find('{')
end = json_text.rfind('}')
if start != -1 and end != -1 and start < end:
json_text = json_text[start:end+1]
echanges_json = json.loads(json_text)
logger.info(f"JSON extrait avec succès: {len(json_text)} caractères")
# Vérifier si le JSON a la structure attendue
if not isinstance(echanges_json, dict) or "chronologie_echanges" not in echanges_json:
# Tenter de corriger la structure si possible
if len(echanges_json) > 0 and isinstance(list(echanges_json.values())[0], list):
# Prendre la première liste comme chronologie
key = list(echanges_json.keys())[0]
echanges_json = {"chronologie_echanges": echanges_json[key]}
logger.info(f"Structure JSON corrigée en utilisant la clé: {key}")
else:
logger.warning("Structure JSON incorrecte et non réparable")
return texte_rapport, None, None
# Convertir en tableau Markdown
echanges_markdown = "| Date | Émetteur | Type | Contenu | Statut |\n"
echanges_markdown += "|------|---------|------|---------|--------|\n"
if "chronologie_echanges" in echanges_json and isinstance(echanges_json["chronologie_echanges"], list):
# Pré-traitement pour vérifier les questions sans réponse
questions_sans_reponse = {}
for i, echange in enumerate(echanges_json["chronologie_echanges"]):
if echange.get("type", "").lower() == "question" and echange.get("emetteur", "").lower() == "client":
has_response = False
# Vérifier si la question a une réponse
for j in range(i+1, len(echanges_json["chronologie_echanges"])):
next_echange = echanges_json["chronologie_echanges"][j]
if next_echange.get("type", "").lower() == "réponse" and next_echange.get("emetteur", "").lower() == "support":
has_response = True
break
questions_sans_reponse[i] = not has_response
# Générer le tableau
for i, echange in enumerate(echanges_json["chronologie_echanges"]):
date = echange.get("date", "-")
emetteur = echange.get("emetteur", "-")
type_msg = echange.get("type", "-")
contenu = echange.get("contenu", "-")
# Ajouter un statut pour les questions sans réponse
statut = ""
if emetteur.lower() == "client" and type_msg.lower() == "question" and questions_sans_reponse.get(i, False):
statut = "**Sans réponse**"
echanges_markdown += f"| {date} | {emetteur} | {type_msg} | {contenu} | {statut} |\n"
# Ajouter une note si aucune réponse du support n'a été trouvée
if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges_json["chronologie_echanges"]):
echanges_markdown += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n\n"
# Ajouter un tableau questions/réponses simplifié
tableau_qr = self._generer_tableau_questions_reponses(echanges_json["chronologie_echanges"])
echanges_markdown += f"\n{tableau_qr}\n"
# Remplacer le JSON dans le texte par le tableau Markdown
# Si le JSON était entouré de backticks, remplacer tout le bloc
if json_match:
rapport_traite = texte_rapport.replace(json_match.group(0), echanges_markdown)
else:
# Sinon, remplacer juste le texte JSON
rapport_traite = texte_rapport.replace(json_text, echanges_markdown)
return rapport_traite, echanges_json, echanges_markdown
except json.JSONDecodeError as e:
logger.error(f"Erreur lors du décodage JSON: {e}")
logger.debug(f"Contenu JSON problématique: {json_text[:100]}...")
return texte_rapport, None, None
def _get_timestamp(self) -> str:
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
return datetime.now().strftime("%Y%m%d_%H%M%S")