llm_ticket3/agents/agent_report_generator.py
2025-04-07 15:40:17 +02:00

469 lines
22 KiB
Python

import json
import os
from .base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple, Optional
import logging
logger = logging.getLogger("AgentReportGenerator")
class AgentReportGenerator(BaseAgent):
"""
Agent pour générer un rapport complet à partir des analyses de ticket et d'images
"""
def __init__(self, llm):
super().__init__("AgentReportGenerator", llm)
# Configuration locale de l'agent (remplace AgentConfig)
self.temperature = 0.4 # Génération de rapport factuelle mais bien structurée
self.top_p = 0.9
self.max_tokens = 2500
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab.
Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré et exploitable.
IMPORTANCE DES ÉCHANGES CLIENT/SUPPORT:
- Tu dois impérativement présenter les échanges client/support sous forme d'un TABLEAU MARKDOWN clair
- Chaque ligne du tableau doit contenir: Date | Émetteur | Type (Question/Réponse) | Contenu
- Identifie clairement qui est l'émetteur (CLIENT ou SUPPORT)
- Mets en évidence les questions posées et les réponses fournies
Structure ton rapport:
1. Résumé exécutif: Synthèse du problème initial (nom de la demande + description)
2. Chronologie des échanges: TABLEAU des interactions client/support
3. Analyse des images: Ce que montrent les captures d'écran et leur pertinence
4. Diagnostic technique: Interprétation des informations techniques pertinentes
Reste factuel et précis. Ne fais pas d'analyses inutiles ou de recommandations non fondées.
Ton rapport doit mettre en avant la chronologie des échanges et les informations techniques clés."""
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentReportGenerator initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
# Ajustements selon le type de modèle
if "mistral_medium" in self.llm.__class__.__name__.lower():
params["temperature"] += 0.05
params["max_tokens"] = 1000
elif "pixtral" in self.llm.__class__.__name__.lower():
params["temperature"] -= 0.05
elif "ollama" in self.llm.__class__.__name__.lower():
params["temperature"] += 0.1
params.update({
"num_ctx": 2048,
"repeat_penalty": 1.1,
})
self.llm.configurer(**params)
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
"""
Génère un rapport à partir des analyses effectuées
Args:
rapport_data: Dictionnaire contenant toutes les données analysées
rapport_dir: Répertoire où sauvegarder le rapport
Returns:
Tuple (chemin vers le rapport JSON, chemin vers le rapport Markdown)
"""
# Récupérer l'ID du ticket depuis les données
ticket_id = rapport_data.get("ticket_id", "")
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
ticket_id = rapport_data["ticket_data"].get("code", "")
if not ticket_id:
ticket_id = os.path.basename(os.path.dirname(rapport_dir))
if not ticket_id.startswith("T"):
# Dernier recours, utiliser le dernier segment du chemin
ticket_id = os.path.basename(rapport_dir)
logger.info(f"Génération du rapport pour le ticket: {ticket_id}")
print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}")
# S'assurer que le répertoire existe
if not os.path.exists(rapport_dir):
os.makedirs(rapport_dir)
try:
# Préparer les données formatées pour l'analyse
ticket_analyse = rapport_data.get("ticket_analyse", "")
if not ticket_analyse and "analyse_json" in rapport_data:
ticket_analyse = rapport_data.get("analyse_json", "Aucune analyse de ticket disponible")
# Préparer les données d'analyse d'images
images_analyses = []
analyse_images_data = rapport_data.get("analyse_images", {})
# Statistiques pour les métadonnées
total_images = len(analyse_images_data) if analyse_images_data else 0
images_pertinentes = 0
# Collecter des informations sur les agents et LLM utilisés
agents_info = self._collecter_info_agents(rapport_data)
# Transformer les analyses d'images en liste structurée pour le prompt
for image_path, analyse_data in analyse_images_data.items():
image_name = os.path.basename(image_path)
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
if is_relevant:
images_pertinentes += 1
# Récupérer l'analyse détaillée si elle existe et que l'image est pertinente
analyse_detail = None
if is_relevant:
if "analysis" in analyse_data and analyse_data["analysis"]:
if isinstance(analyse_data["analysis"], dict) and "analyse" in analyse_data["analysis"]:
analyse_detail = analyse_data["analysis"]["analyse"]
elif isinstance(analyse_data["analysis"], dict):
analyse_detail = str(analyse_data["analysis"])
# Si l'analyse n'a pas été trouvée mais que l'image est pertinente
if not analyse_detail:
analyse_detail = f"Image marquée comme pertinente. Raison: {analyse_data['sorting'].get('reason', 'Non spécifiée')}"
# Ajouter l'analyse à la liste si elle existe
if analyse_detail:
images_analyses.append({
"image_name": image_name,
"analyse": analyse_detail
})
num_images = len(images_analyses)
# Mettre à jour les métadonnées avec les statistiques
rapport_data.setdefault("metadata", {}).update({
"images_analysees": total_images,
"images_pertinentes": images_pertinentes
})
# Créer un prompt détaillé en s'assurant que toutes les analyses sont incluses
prompt = f"""Génère un rapport technique complet pour le ticket #{ticket_id}, en te basant sur les analyses suivantes.
## ANALYSE DU TICKET
{ticket_analyse}
## ANALYSES DES IMAGES ({num_images} images pertinentes sur {total_images} analysées)
"""
# Ajouter l'analyse de chaque image
for i, img_analyse in enumerate(images_analyses, 1):
image_name = img_analyse.get("image_name", f"Image {i}")
analyse = img_analyse.get("analyse", "Analyse non disponible")
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
prompt += f"""
Ton rapport doit être structuré avec les sections suivantes:
1. Résumé exécutif: Synthèse concise du problème initial (reprend le nom de la demande + description)
2. Chronologie des échanges: PRÉSENTE IMPÉRATIVEMENT SOUS FORME DE TABLEAU MARKDOWN, avec colonnes: Date | Émetteur (CLIENT/SUPPORT) | Type (Question/Réponse) | Contenu
3. Analyse des images pertinentes: Ce que montrent les captures d'écran
4. Diagnostic technique: Points clés et interprétation technique
IMPORTANT:
- Le tableau de chronologie doit être formaté correctement en Markdown (|---|---|---|---|)
- Tu dois clairement identifier qui est CLIENT et qui est SUPPORT
- Ne fais pas de recommandations génériques, reste factuel
- Utilise le format Markdown pour structurer ton rapport avec des titres clairs
Fournis un rapport concis, factuel et technique.
"""
# Appeler le LLM pour générer le rapport
logger.info("Interrogation du LLM pour la génération du rapport")
rapport_contenu = self.llm.interroger(prompt)
# Créer les noms de fichiers pour la sauvegarde
timestamp = self._get_timestamp()
base_filename = f"{ticket_id}_{timestamp}"
json_path = os.path.join(rapport_dir, f"{base_filename}.json")
md_path = os.path.join(rapport_dir, f"{base_filename}.md")
# Collecter les métadonnées du rapport avec détails sur les agents et LLM utilisés
metadata = rapport_data.get("metadata", {})
metadata.update({
"timestamp": timestamp,
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"system_prompt": self.system_prompt,
"agents_info": agents_info,
"images_analysees": total_images,
"images_pertinentes": images_pertinentes
})
# Sauvegarder le rapport au format JSON (données brutes + rapport généré)
rapport_data_complet = rapport_data.copy()
rapport_data_complet["rapport_genere"] = rapport_contenu
rapport_data_complet["metadata"] = metadata
with open(json_path, "w", encoding="utf-8") as f:
json.dump(rapport_data_complet, f, ensure_ascii=False, indent=2)
# Générer et sauvegarder le rapport au format Markdown basé directement sur le JSON
markdown_content = self._generer_markdown_depuis_json(rapport_data_complet)
with open(md_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
logger.info(f"Rapport sauvegardé: {json_path} et {md_path}")
except Exception as e:
error_message = f"Erreur lors de la génération du rapport: {str(e)}"
logger.error(error_message)
print(f" ERREUR: {error_message}")
return None, None
# Enregistrer l'historique
self.ajouter_historique("generation_rapport",
{
"rapport_dir": rapport_dir,
"rapport_data": rapport_data,
"prompt": prompt,
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
},
{
"json_path": json_path,
"md_path": md_path,
"rapport_contenu": rapport_contenu[:300] + ("..." if len(rapport_contenu) > 300 else "")
})
message = f"Rapports générés dans: {rapport_dir}"
print(f" {message}")
return json_path, md_path
def _collecter_info_agents(self, rapport_data: Dict) -> Dict:
"""
Collecte des informations sur les agents utilisés dans l'analyse
Args:
rapport_data: Données du rapport
Returns:
Dictionnaire contenant les informations sur les agents
"""
agents_info = {}
# Informations sur l'agent JSON Analyser
if "analyse_json" in rapport_data:
json_analysis = rapport_data["analyse_json"]
# Vérifier si l'analyse JSON contient des métadonnées
if isinstance(json_analysis, dict) and "metadata" in json_analysis:
agents_info["json_analyser"] = json_analysis["metadata"]
# Informations sur les agents d'image
if "analyse_images" in rapport_data and rapport_data["analyse_images"]:
# Image Sorter
sorter_info = {}
analyser_info = {}
for img_path, img_data in rapport_data["analyse_images"].items():
# Collecter info du sorter
if "sorting" in img_data and isinstance(img_data["sorting"], dict) and "metadata" in img_data["sorting"]:
if "model_info" in img_data["sorting"]["metadata"]:
sorter_info = img_data["sorting"]["metadata"]["model_info"]
# Collecter info de l'analyser
if "analysis" in img_data and img_data["analysis"] and isinstance(img_data["analysis"], dict) and "metadata" in img_data["analysis"]:
if "model_info" in img_data["analysis"]["metadata"]:
analyser_info = img_data["analysis"]["metadata"]["model_info"]
# Une fois qu'on a trouvé les deux, on peut sortir
if sorter_info and analyser_info:
break
if sorter_info:
agents_info["image_sorter"] = sorter_info
if analyser_info:
agents_info["image_analyser"] = analyser_info
# Ajouter les informations de l'agent report generator
agents_info["report_generator"] = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
return agents_info
def _generer_markdown_depuis_json(self, rapport_data: Dict) -> str:
"""
Génère un rapport Markdown directement à partir des données JSON
Args:
rapport_data: Données JSON complètes du rapport
Returns:
Contenu Markdown du rapport
"""
ticket_id = rapport_data.get("ticket_id", "")
timestamp = rapport_data.get("metadata", {}).get("timestamp", self._get_timestamp())
# Contenu de base du rapport (partie générée par le LLM)
rapport_contenu = rapport_data.get("rapport_genere", "")
# Entête du document
markdown = f"# Rapport d'analyse du ticket #{ticket_id}\n\n"
markdown += f"*Généré le: {timestamp}*\n\n"
# Ajouter le rapport principal
markdown += rapport_contenu + "\n\n"
# Ajouter les informations techniques (agents, LLM, paramètres)
markdown += "## Informations techniques\n\n"
# Ajouter un résumé du processus d'analyse complet
markdown += "### Processus d'analyse\n\n"
# 1. Analyse de ticket
ticket_analyse = rapport_data.get("ticket_analyse", "")
if not ticket_analyse and "analyse_json" in rapport_data:
ticket_analyse = rapport_data.get("analyse_json", "")
if ticket_analyse:
markdown += "#### Analyse du ticket\n\n"
markdown += "<details>\n<summary>Cliquez pour voir l'analyse complète du ticket</summary>\n\n"
markdown += "```\n" + ticket_analyse + "\n```\n\n"
markdown += "</details>\n\n"
# 2. Tri des images
markdown += "#### Tri des images\n\n"
analyse_images_data = rapport_data.get("analyse_images", {})
if analyse_images_data:
# Créer un tableau pour le tri des images
markdown += "| Image | Pertinence | Raison |\n"
markdown += "|-------|------------|--------|\n"
for image_path, analyse_data in analyse_images_data.items():
image_name = os.path.basename(image_path)
# Information de tri
is_relevant = "Non"
reason = "Non spécifiée"
if "sorting" in analyse_data:
sorting_data = analyse_data["sorting"]
if isinstance(sorting_data, dict):
is_relevant = "Oui" if sorting_data.get("is_relevant", False) else "Non"
reason = sorting_data.get("reason", "Non spécifiée")
markdown += f"| {image_name} | {is_relevant} | {reason} |\n"
markdown += "\n"
# 3. Analyse des images pertinentes
markdown += "#### Analyse des images pertinentes\n\n"
images_pertinentes = 0
for image_path, analyse_data in analyse_images_data.items():
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
if is_relevant:
images_pertinentes += 1
image_name = os.path.basename(image_path)
# Récupérer l'analyse détaillée
analyse_detail = "Analyse non disponible"
if "analysis" in analyse_data and analyse_data["analysis"]:
if isinstance(analyse_data["analysis"], dict) and "analyse" in analyse_data["analysis"]:
analyse_detail = analyse_data["analysis"]["analyse"]
elif isinstance(analyse_data["analysis"], dict):
analyse_detail = str(analyse_data["analysis"])
markdown += f"##### Image: {image_name}\n\n"
markdown += "<details>\n<summary>Cliquez pour voir l'analyse complète de l'image</summary>\n\n"
markdown += "```\n" + analyse_detail + "\n```\n\n"
markdown += "</details>\n\n"
if images_pertinentes == 0:
markdown += "Aucune image pertinente n'a été identifiée.\n\n"
# Ajouter les informations sur les agents utilisés
agents_info = rapport_data.get("metadata", {}).get("agents_info", {})
if agents_info:
markdown += "### Agents et modèles utilisés\n\n"
# Agent JSON Analyser
if "json_analyser" in agents_info:
info = agents_info["json_analyser"]
markdown += "#### Agent d'analyse de texte\n"
markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n"
markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n"
markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n"
markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n"
# Agent Image Sorter
if "image_sorter" in agents_info:
info = agents_info["image_sorter"]
markdown += "#### Agent de tri d'images\n"
markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n"
markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n"
markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n"
markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n"
# Agent Image Analyser
if "image_analyser" in agents_info:
info = agents_info["image_analyser"]
markdown += "#### Agent d'analyse d'images\n"
markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n"
markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n"
markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n"
markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n"
# Agent Report Generator
if "report_generator" in agents_info:
info = agents_info["report_generator"]
markdown += "#### Agent de génération de rapport\n"
markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n"
markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n"
markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n"
markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n"
# Statistiques d'analyse
markdown += "### Statistiques\n\n"
total_images = len(analyse_images_data) if analyse_images_data else 0
markdown += f"- **Images analysées**: {total_images}\n"
markdown += f"- **Images pertinentes**: {images_pertinentes}\n\n"
# Ajouter les métadonnées supplémentaires si présentes
if "metadata" in rapport_data:
metadata = rapport_data["metadata"]
for key, value in metadata.items():
if key not in ["agents_info", "timestamp"] and not isinstance(value, dict):
markdown += f"- **{key}**: {value}\n"
return markdown
def _get_timestamp(self) -> str:
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
return datetime.now().strftime("%Y%m%d_%H%M%S")