llm_ticket3/agents/agent_report_generator.py
2025-04-09 13:36:42 +02:00

553 lines
23 KiB
Python

import json
import os
from .base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple, Optional, List
import logging
import traceback
import re
import sys
from .utils.report_utils import extraire_et_traiter_json
logger = logging.getLogger("AgentReportGenerator")
class AgentReportGenerator(BaseAgent):
"""
Agent pour générer un rapport synthétique à partir des analyses de ticket et d'images.
L'agent récupère:
1. L'analyse du ticket effectuée par AgentTicketAnalyser
2. Les analyses des images pertinentes effectuées par AgentImageAnalyser
Il génère:
- Un rapport JSON structuré (format principal)
- Un rapport Markdown pour la présentation
"""
def __init__(self, llm):
super().__init__("AgentReportGenerator", llm)
# Configuration locale de l'agent
self.temperature = 0.2
self.top_p = 0.9
self.max_tokens = 2500
# Prompt système pour la génération de rapport
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab pour la société CBAO.
Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré.
EXIGENCE ABSOLUE - Ton rapport DOIT inclure:
1. Un résumé du problème initial (nom de la demande + description)
2. Une chronologie des échanges client/support dans un objet JSON avec cette structure:
```json
{
"chronologie_echanges": [
{"date": "date", "emetteur": "CLIENT ou SUPPORT", "type": "Question ou Réponse ou Information technique", "contenu": "contenu synthétisé"}
]
}
```
3. Une analyse des images pertinentes en lien avec le problème
4. Un diagnostic technique des causes probables
IMPORTANT:
- La chronologie des échanges client/support est l'élément le plus important
- Le JSON doit être valide et inclure EXACTEMENT la structure demandée
- Cite précisément les questions du client et les réponses du support
- Tu dois respecter la logique chronologique des échanges pour éviter la confusion des questions/réponses
- Reste factuel et précis dans ton analyse"""
# Version du prompt pour la traçabilité
self.prompt_version = "v2.1"
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentReportGenerator initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
self.llm.configurer(**params)
logger.info(f"Configuration appliquée au modèle: {str(params)}")
def _formater_prompt_pour_rapport(self, ticket_analyse: str, images_analyses: List[Dict]) -> str:
"""
Formate le prompt pour la génération du rapport
Args:
ticket_analyse: Analyse du ticket
images_analyses: Liste des analyses d'images
Returns:
Prompt formaté pour le LLM
"""
num_images = len(images_analyses)
logger.info(f"Formatage du prompt avec {num_images} analyses d'images")
# Construire la section d'analyse du ticket
prompt = f"""Génère un rapport technique complet, en te basant sur les analyses suivantes.
## ANALYSE DU TICKET
{ticket_analyse}
"""
# Ajouter la section d'analyse des images si présente
if num_images > 0:
prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n"
for i, img_analyse in enumerate(images_analyses, 1):
image_name = img_analyse.get("image_name", f"Image {i}")
analyse = img_analyse.get("analyse", "Analyse non disponible")
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
# Instructions pour le rapport - utiliser les mêmes éléments que dans system_prompt
# pour éviter les redondances et les incohérences
prompt += """
## INSTRUCTIONS POUR LE RAPPORT
1. Commence par un résumé concis du problème principal.
2. GÉNÈRE LA CHRONOLOGIE DES ÉCHANGES CLIENT/SUPPORT au format exact:
```json
{
"chronologie_echanges": [
{"date": "date1", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu de la question"},
{"date": "date2", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "contenu de la réponse"}
]
}
```
3. Analyse les images et leur contribution à la compréhension du problème.
4. Propose un diagnostic technique succinct des causes probables.
5. Titre clairement chaque section (par exemple "## Résumé du problème", "## Diagnostic technique", etc.)
Le JSON des échanges client/support est CRUCIAL et doit suivre EXACTEMENT le format demandé.
"""
return prompt
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
"""
Génère un rapport à partir des analyses effectuées
Args:
rapport_data: Dictionnaire contenant toutes les données analysées
rapport_dir: Répertoire où sauvegarder le rapport
Returns:
Tuple (chemin JSON, chemin Markdown) - Peut contenir None si une génération échoue
"""
try:
# 1. PRÉPARATION
ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir)
logger.info(f"Génération du rapport pour le ticket: {ticket_id}")
print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}")
# Créer le répertoire de sortie si nécessaire
os.makedirs(rapport_dir, exist_ok=True)
# 2. EXTRACTION DES DONNÉES
ticket_analyse = self._extraire_analyse_ticket(rapport_data)
images_analyses = self._extraire_analyses_images(rapport_data)
# 3. COLLECTE DES INFORMATIONS SUR LES AGENTS
agents_info = self._collecter_info_agents(rapport_data)
prompts_utilises = self._collecter_prompts_agents()
# 4. GÉNÉRATION DU RAPPORT
prompt = self._formater_prompt_pour_rapport(ticket_analyse, images_analyses)
logger.info("Génération du rapport avec le LLM")
print(f" Génération du rapport avec le LLM...")
# Mesurer le temps d'exécution
start_time = datetime.now()
rapport_genere = self.llm.interroger(prompt)
generation_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Rapport généré: {len(rapport_genere)} caractères")
print(f" Rapport généré: {len(rapport_genere)} caractères")
# 5. EXTRACTION DES DONNÉES DU RAPPORT
# Utiliser l'utilitaire de report_utils.py pour extraire les données JSON
rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere)
# Extraire les sections textuelles (résumé, diagnostic)
resume, diagnostic = self._extraire_sections_texte(rapport_genere)
# 6. CRÉATION ET SAUVEGARDE DU RAPPORT JSON
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
rapport_json = {
"ticket_id": ticket_id,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"rapport_complet": rapport_genere,
"ticket_analyse": ticket_analyse,
"images_analyses": images_analyses,
"chronologie_echanges": echanges_json.get("chronologie_echanges", []) if echanges_json else [],
"resume": resume,
"diagnostic": diagnostic,
"statistiques": {
"total_images": len(rapport_data.get("analyse_images", {})),
"images_pertinentes": len(images_analyses),
"generation_time": generation_time
},
"metadata": {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"model_version": getattr(self.llm, "version", "non spécifiée"),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"generation_time": generation_time,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"agents": agents_info
},
"prompts_utilisés": prompts_utilises,
"workflow": {
"etapes": [
{
"numero": 1,
"nom": "Analyse du ticket",
"agent": "AgentTicketAnalyser",
"description": "Extraction et analyse des informations du ticket"
},
{
"numero": 2,
"nom": "Tri des images",
"agent": "AgentImageSorter",
"description": "Identification des images pertinentes pour l'analyse"
},
{
"numero": 3,
"nom": "Analyse des images",
"agent": "AgentImageAnalyser",
"description": "Analyse détaillée des images pertinentes identifiées"
},
{
"numero": 4,
"nom": "Génération du rapport",
"agent": "AgentReportGenerator",
"description": "Synthèse des analyses et génération du rapport final"
}
]
}
}
# Sauvegarder le JSON
with open(json_path, "w", encoding="utf-8") as f:
json.dump(rapport_json, f, ensure_ascii=False, indent=2)
logger.info(f"Rapport JSON sauvegardé: {json_path}")
print(f" Rapport JSON sauvegardé: {json_path}")
# 7. GÉNÉRATION DU RAPPORT MARKDOWN
md_path = self._generer_rapport_markdown(json_path)
return json_path, md_path
except Exception as e:
error_message = f"Erreur lors de la génération du rapport: {str(e)}"
logger.error(error_message)
logger.error(traceback.format_exc())
print(f" ERREUR: {error_message}")
return None, None
def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str:
"""Extrait l'ID du ticket des données ou du chemin"""
# Essayer d'extraire depuis les données du rapport
ticket_id = rapport_data.get("ticket_id", "")
# Si pas d'ID direct, essayer depuis les données du ticket
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
ticket_id = rapport_data["ticket_data"].get("code", "")
# En dernier recours, extraire depuis le chemin
if not ticket_id:
# Essayer d'extraire un ID de ticket (format Txxxx) du chemin
match = re.search(r'T\d+', rapport_dir)
if match:
ticket_id = match.group(0)
else:
# Sinon, utiliser le dernier segment du chemin
ticket_id = os.path.basename(rapport_dir)
return ticket_id
def _extraire_analyse_ticket(self, rapport_data: Dict) -> str:
"""Extrait l'analyse du ticket des données"""
# Essayer les différentes clés possibles
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
if key in rapport_data and rapport_data[key]:
logger.info(f"Utilisation de {key}")
return rapport_data[key]
# Créer une analyse par défaut si aucune n'est disponible
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
ticket_data = rapport_data.get("ticket_data", {})
ticket_name = ticket_data.get("name", "Sans titre")
ticket_desc = ticket_data.get("description", "Pas de description disponible")
return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)"
def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]:
"""
Extrait et formate les analyses d'images pertinentes
Args:
rapport_data: Données du rapport contenant les analyses d'images
Returns:
Liste des analyses d'images pertinentes formatées
"""
images_analyses = []
analyse_images_data = rapport_data.get("analyse_images", {})
# Parcourir toutes les images
for image_path, analyse_data in analyse_images_data.items():
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
# Si l'image est pertinente, extraire son analyse
if is_relevant:
image_name = os.path.basename(image_path)
analyse = self._extraire_analyse_image(analyse_data)
if analyse:
images_analyses.append({
"image_name": image_name,
"image_path": image_path,
"analyse": analyse,
"sorting_info": analyse_data.get("sorting", {}),
"metadata": analyse_data.get("analysis", {}).get("metadata", {})
})
logger.info(f"Analyse de l'image {image_name} ajoutée")
return images_analyses
def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]:
"""
Extrait l'analyse d'une image depuis les données
Args:
analyse_data: Données d'analyse de l'image
Returns:
Texte d'analyse de l'image ou None si aucune analyse n'est disponible
"""
# Si pas de données d'analyse, retourner None
if not "analysis" in analyse_data or not analyse_data["analysis"]:
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
reason = analyse_data["sorting"].get("reason", "Non spécifiée")
return f"Image marquée comme pertinente. Raison: {reason}"
return None
# Extraire l'analyse selon le format des données
analysis = analyse_data["analysis"]
# Structure type 1: {"analyse": "texte"}
if isinstance(analysis, dict) and "analyse" in analysis:
return analysis["analyse"]
# Structure type 2: {"error": false, ...} - contient d'autres données utiles
if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True):
return str(analysis)
# Structure type 3: texte d'analyse direct
if isinstance(analysis, str):
return analysis
# Structure type 4: autre format de dictionnaire - convertir en JSON
if isinstance(analysis, dict):
return json.dumps(analysis, ensure_ascii=False, indent=2)
# Aucun format reconnu
return None
def _extraire_sections_texte(self, rapport_genere: str) -> Tuple[str, str]:
"""
Extrait le résumé et le diagnostic du rapport généré
Args:
rapport_genere: Texte du rapport généré par le LLM
Returns:
Tuple (résumé, diagnostic)
"""
resume = ""
diagnostic = ""
# Supprimer le bloc JSON pour analyser le texte restant
rapport_sans_json = re.sub(r'```json.*?```', '', rapport_genere, re.DOTALL)
# Chercher les sections explicites
resume_match = re.search(r'(?:## Résumé du problème|## Résumé|# Résumé)(.*?)(?=##|\Z)', rapport_sans_json, re.DOTALL)
if resume_match:
resume = resume_match.group(1).strip()
diagnostic_match = re.search(r'(?:## Diagnostic technique|## Diagnostic|# Diagnostic)(.*?)(?=##|\Z)', rapport_sans_json, re.DOTALL)
if diagnostic_match:
diagnostic = diagnostic_match.group(1).strip()
# Si sections explicites non trouvées, utiliser l'extraction par paragraphes
if not resume:
# Diviser le texte en paragraphes non vides
paragraphes = [p.strip() for p in rapport_sans_json.split('\n\n') if p.strip()]
# Le premier paragraphe est généralement le résumé
if paragraphes:
resume = paragraphes[0]
# Si diagnostic non trouvé, chercher par mot-clé
if not diagnostic:
for i, p in enumerate(paragraphes):
if any(marker in p.lower() for marker in ["diagnostic", "analyse technique", "conclusion"]):
diagnostic = '\n\n'.join(paragraphes[i:])
break
return resume, diagnostic
def _collecter_info_agents(self, rapport_data: Dict) -> Dict:
"""
Collecte des informations sur les agents utilisés dans l'analyse
Args:
rapport_data: Données du rapport
Returns:
Dictionnaire contenant les informations sur les agents
"""
agents_info = {}
# Informations sur l'agent JSON Analyser (Ticket Analyser)
ticket_analyses = {}
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
if key in rapport_data and isinstance(rapport_data[key], dict) and "metadata" in rapport_data[key]:
ticket_analyses = rapport_data[key]["metadata"]
break
if ticket_analyses:
agents_info["ticket_analyser"] = ticket_analyses
# Informations sur les agents d'image
if "analyse_images" in rapport_data and rapport_data["analyse_images"]:
# Image Sorter
sorter_info = {}
analyser_info = {}
for img_path, img_data in rapport_data["analyse_images"].items():
# Collecter info du sorter
if "sorting" in img_data and isinstance(img_data["sorting"], dict) and "metadata" in img_data["sorting"]:
sorter_info = img_data["sorting"]["metadata"]
# Collecter info de l'analyser
if "analysis" in img_data and isinstance(img_data["analysis"], dict) and "metadata" in img_data["analysis"]:
analyser_info = img_data["analysis"]["metadata"]
# Une fois qu'on a trouvé les deux, on peut sortir
if sorter_info and analyser_info:
break
if sorter_info:
agents_info["image_sorter"] = sorter_info
if analyser_info:
agents_info["image_analyser"] = analyser_info
# Ajouter les informations de l'agent report generator
agents_info["report_generator"] = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"prompt_version": self.prompt_version
}
return agents_info
def _collecter_prompts_agents(self) -> Dict[str, str]:
"""
Collecte les prompts système de tous les agents impliqués dans l'analyse.
Returns:
Dictionnaire contenant les prompts des agents
"""
prompts = {
"rapport_generator": self.system_prompt
}
# Importer les classes d'agents pour accéder à leurs prompts
try:
# Importer les autres agents
from .agent_ticket_analyser import AgentTicketAnalyser
from .agent_image_analyser import AgentImageAnalyser
from .agent_image_sorter import AgentImageSorter
# Créer des instances temporaires pour récupérer les prompts
# En passant None comme LLM pour éviter d'initialiser complètement les agents
try:
ticket_analyser = AgentTicketAnalyser(None)
prompts["ticket_analyser"] = ticket_analyser.system_prompt
logger.info("Prompt récupéré pour ticket_analyser")
except Exception as e:
logger.warning(f"Erreur lors de la récupération du prompt ticket_analyser: {str(e)}")
try:
image_analyser = AgentImageAnalyser(None)
prompts["image_analyser"] = image_analyser.system_prompt
logger.info("Prompt récupéré pour image_analyser")
except Exception as e:
logger.warning(f"Erreur lors de la récupération du prompt image_analyser: {str(e)}")
try:
image_sorter = AgentImageSorter(None)
prompts["image_sorter"] = image_sorter.system_prompt
logger.info("Prompt récupéré pour image_sorter")
except Exception as e:
logger.warning(f"Erreur lors de la récupération du prompt image_sorter: {str(e)}")
except ImportError as e:
logger.warning(f"Erreur lors de l'importation des classes d'agents: {str(e)}")
return prompts
def _generer_rapport_markdown(self, json_path: str) -> Optional[str]:
"""
Génère le rapport Markdown à partir du JSON
Args:
json_path: Chemin vers le fichier JSON du rapport
Returns:
Chemin vers le fichier Markdown généré ou None en cas d'erreur
"""
try:
# Ajouter le répertoire parent au path pour l'importation
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from formatters.report_formatter import generate_markdown_report
# Générer le rapport Markdown
success, md_path = generate_markdown_report(json_path)
if success:
logger.info(f"Rapport Markdown généré: {md_path}")
print(f" Rapport Markdown généré: {md_path}")
return md_path
else:
logger.warning(f"Erreur lors de la génération du rapport Markdown: {md_path}")
return None
except Exception as e:
logger.error(f"Erreur lors de la génération du rapport Markdown: {str(e)}")
return None