mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-16 06:37:46 +01:00
469 lines
22 KiB
Python
469 lines
22 KiB
Python
import json
|
|
import os
|
|
from .base_agent import BaseAgent
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Tuple, Optional
|
|
import logging
|
|
|
|
logger = logging.getLogger("AgentReportGenerator")
|
|
|
|
class AgentReportGenerator(BaseAgent):
|
|
"""
|
|
Agent pour générer un rapport complet à partir des analyses de ticket et d'images
|
|
"""
|
|
def __init__(self, llm):
|
|
super().__init__("AgentReportGenerator", llm)
|
|
|
|
# Configuration locale de l'agent (remplace AgentConfig)
|
|
self.temperature = 0.4 # Génération de rapport factuelle mais bien structurée
|
|
self.top_p = 0.9
|
|
self.max_tokens = 2500
|
|
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab.
|
|
Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré et exploitable.
|
|
|
|
IMPORTANCE DES ÉCHANGES CLIENT/SUPPORT:
|
|
- Tu dois impérativement présenter les échanges client/support sous forme d'un TABLEAU MARKDOWN clair
|
|
- Chaque ligne du tableau doit contenir: Date | Émetteur | Type (Question/Réponse) | Contenu
|
|
- Identifie clairement qui est l'émetteur (CLIENT ou SUPPORT)
|
|
- Mets en évidence les questions posées et les réponses fournies
|
|
|
|
Structure ton rapport:
|
|
1. Résumé exécutif: Synthèse du problème initial (nom de la demande + description)
|
|
2. Chronologie des échanges: TABLEAU des interactions client/support
|
|
3. Analyse des images: Ce que montrent les captures d'écran et leur pertinence
|
|
4. Diagnostic technique: Interprétation des informations techniques pertinentes
|
|
|
|
Reste factuel et précis. Ne fais pas d'analyses inutiles ou de recommandations non fondées.
|
|
Ton rapport doit mettre en avant la chronologie des échanges et les informations techniques clés."""
|
|
|
|
# Appliquer la configuration au LLM
|
|
self._appliquer_config_locale()
|
|
|
|
logger.info("AgentReportGenerator initialisé")
|
|
|
|
def _appliquer_config_locale(self) -> None:
|
|
"""
|
|
Applique la configuration locale au modèle LLM.
|
|
"""
|
|
# Appliquer le prompt système
|
|
if hasattr(self.llm, "prompt_system"):
|
|
self.llm.prompt_system = self.system_prompt
|
|
|
|
# Appliquer les paramètres
|
|
if hasattr(self.llm, "configurer"):
|
|
params = {
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens
|
|
}
|
|
|
|
# Ajustements selon le type de modèle
|
|
if "mistral_medium" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] += 0.05
|
|
params["max_tokens"] = 1000
|
|
elif "pixtral" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] -= 0.05
|
|
elif "ollama" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] += 0.1
|
|
params.update({
|
|
"num_ctx": 2048,
|
|
"repeat_penalty": 1.1,
|
|
})
|
|
|
|
self.llm.configurer(**params)
|
|
|
|
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
Génère un rapport à partir des analyses effectuées
|
|
|
|
Args:
|
|
rapport_data: Dictionnaire contenant toutes les données analysées
|
|
rapport_dir: Répertoire où sauvegarder le rapport
|
|
|
|
Returns:
|
|
Tuple (chemin vers le rapport JSON, chemin vers le rapport Markdown)
|
|
"""
|
|
# Récupérer l'ID du ticket depuis les données
|
|
ticket_id = rapport_data.get("ticket_id", "")
|
|
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
|
|
ticket_id = rapport_data["ticket_data"].get("code", "")
|
|
|
|
if not ticket_id:
|
|
ticket_id = os.path.basename(os.path.dirname(rapport_dir))
|
|
if not ticket_id.startswith("T"):
|
|
# Dernier recours, utiliser le dernier segment du chemin
|
|
ticket_id = os.path.basename(rapport_dir)
|
|
|
|
logger.info(f"Génération du rapport pour le ticket: {ticket_id}")
|
|
print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}")
|
|
|
|
# S'assurer que le répertoire existe
|
|
if not os.path.exists(rapport_dir):
|
|
os.makedirs(rapport_dir)
|
|
|
|
try:
|
|
# Préparer les données formatées pour l'analyse
|
|
ticket_analyse = rapport_data.get("ticket_analyse", "")
|
|
if not ticket_analyse and "analyse_json" in rapport_data:
|
|
ticket_analyse = rapport_data.get("analyse_json", "Aucune analyse de ticket disponible")
|
|
|
|
# Préparer les données d'analyse d'images
|
|
images_analyses = []
|
|
analyse_images_data = rapport_data.get("analyse_images", {})
|
|
|
|
# Statistiques pour les métadonnées
|
|
total_images = len(analyse_images_data) if analyse_images_data else 0
|
|
images_pertinentes = 0
|
|
|
|
# Collecter des informations sur les agents et LLM utilisés
|
|
agents_info = self._collecter_info_agents(rapport_data)
|
|
|
|
# Transformer les analyses d'images en liste structurée pour le prompt
|
|
for image_path, analyse_data in analyse_images_data.items():
|
|
image_name = os.path.basename(image_path)
|
|
|
|
# Vérifier si l'image est pertinente
|
|
is_relevant = False
|
|
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
|
|
is_relevant = analyse_data["sorting"].get("is_relevant", False)
|
|
if is_relevant:
|
|
images_pertinentes += 1
|
|
|
|
# Récupérer l'analyse détaillée si elle existe et que l'image est pertinente
|
|
analyse_detail = None
|
|
if is_relevant:
|
|
if "analysis" in analyse_data and analyse_data["analysis"]:
|
|
if isinstance(analyse_data["analysis"], dict) and "analyse" in analyse_data["analysis"]:
|
|
analyse_detail = analyse_data["analysis"]["analyse"]
|
|
elif isinstance(analyse_data["analysis"], dict):
|
|
analyse_detail = str(analyse_data["analysis"])
|
|
|
|
# Si l'analyse n'a pas été trouvée mais que l'image est pertinente
|
|
if not analyse_detail:
|
|
analyse_detail = f"Image marquée comme pertinente. Raison: {analyse_data['sorting'].get('reason', 'Non spécifiée')}"
|
|
|
|
# Ajouter l'analyse à la liste si elle existe
|
|
if analyse_detail:
|
|
images_analyses.append({
|
|
"image_name": image_name,
|
|
"analyse": analyse_detail
|
|
})
|
|
|
|
num_images = len(images_analyses)
|
|
|
|
# Mettre à jour les métadonnées avec les statistiques
|
|
rapport_data.setdefault("metadata", {}).update({
|
|
"images_analysees": total_images,
|
|
"images_pertinentes": images_pertinentes
|
|
})
|
|
|
|
# Créer un prompt détaillé en s'assurant que toutes les analyses sont incluses
|
|
prompt = f"""Génère un rapport technique complet pour le ticket #{ticket_id}, en te basant sur les analyses suivantes.
|
|
|
|
## ANALYSE DU TICKET
|
|
{ticket_analyse}
|
|
|
|
## ANALYSES DES IMAGES ({num_images} images pertinentes sur {total_images} analysées)
|
|
"""
|
|
|
|
# Ajouter l'analyse de chaque image
|
|
for i, img_analyse in enumerate(images_analyses, 1):
|
|
image_name = img_analyse.get("image_name", f"Image {i}")
|
|
analyse = img_analyse.get("analyse", "Analyse non disponible")
|
|
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
|
|
|
|
prompt += f"""
|
|
Ton rapport doit être structuré avec les sections suivantes:
|
|
1. Résumé exécutif: Synthèse concise du problème initial (reprend le nom de la demande + description)
|
|
2. Chronologie des échanges: PRÉSENTE IMPÉRATIVEMENT SOUS FORME DE TABLEAU MARKDOWN, avec colonnes: Date | Émetteur (CLIENT/SUPPORT) | Type (Question/Réponse) | Contenu
|
|
3. Analyse des images pertinentes: Ce que montrent les captures d'écran
|
|
4. Diagnostic technique: Points clés et interprétation technique
|
|
|
|
IMPORTANT:
|
|
- Le tableau de chronologie doit être formaté correctement en Markdown (|---|---|---|---|)
|
|
- Tu dois clairement identifier qui est CLIENT et qui est SUPPORT
|
|
- Ne fais pas de recommandations génériques, reste factuel
|
|
- Utilise le format Markdown pour structurer ton rapport avec des titres clairs
|
|
|
|
Fournis un rapport concis, factuel et technique.
|
|
"""
|
|
|
|
# Appeler le LLM pour générer le rapport
|
|
logger.info("Interrogation du LLM pour la génération du rapport")
|
|
rapport_contenu = self.llm.interroger(prompt)
|
|
|
|
# Créer les noms de fichiers pour la sauvegarde
|
|
timestamp = self._get_timestamp()
|
|
base_filename = f"{ticket_id}_{timestamp}"
|
|
json_path = os.path.join(rapport_dir, f"{base_filename}.json")
|
|
md_path = os.path.join(rapport_dir, f"{base_filename}.md")
|
|
|
|
# Collecter les métadonnées du rapport avec détails sur les agents et LLM utilisés
|
|
metadata = rapport_data.get("metadata", {})
|
|
metadata.update({
|
|
"timestamp": timestamp,
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens,
|
|
"system_prompt": self.system_prompt,
|
|
"agents_info": agents_info,
|
|
"images_analysees": total_images,
|
|
"images_pertinentes": images_pertinentes
|
|
})
|
|
|
|
# Sauvegarder le rapport au format JSON (données brutes + rapport généré)
|
|
rapport_data_complet = rapport_data.copy()
|
|
rapport_data_complet["rapport_genere"] = rapport_contenu
|
|
rapport_data_complet["metadata"] = metadata
|
|
|
|
with open(json_path, "w", encoding="utf-8") as f:
|
|
json.dump(rapport_data_complet, f, ensure_ascii=False, indent=2)
|
|
|
|
# Générer et sauvegarder le rapport au format Markdown basé directement sur le JSON
|
|
markdown_content = self._generer_markdown_depuis_json(rapport_data_complet)
|
|
|
|
with open(md_path, "w", encoding="utf-8") as f:
|
|
f.write(markdown_content)
|
|
|
|
logger.info(f"Rapport sauvegardé: {json_path} et {md_path}")
|
|
|
|
except Exception as e:
|
|
error_message = f"Erreur lors de la génération du rapport: {str(e)}"
|
|
logger.error(error_message)
|
|
print(f" ERREUR: {error_message}")
|
|
return None, None
|
|
|
|
# Enregistrer l'historique
|
|
self.ajouter_historique("generation_rapport",
|
|
{
|
|
"rapport_dir": rapport_dir,
|
|
"rapport_data": rapport_data,
|
|
"prompt": prompt,
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens
|
|
},
|
|
{
|
|
"json_path": json_path,
|
|
"md_path": md_path,
|
|
"rapport_contenu": rapport_contenu[:300] + ("..." if len(rapport_contenu) > 300 else "")
|
|
})
|
|
|
|
message = f"Rapports générés dans: {rapport_dir}"
|
|
print(f" {message}")
|
|
|
|
return json_path, md_path
|
|
|
|
def _collecter_info_agents(self, rapport_data: Dict) -> Dict:
|
|
"""
|
|
Collecte des informations sur les agents utilisés dans l'analyse
|
|
|
|
Args:
|
|
rapport_data: Données du rapport
|
|
|
|
Returns:
|
|
Dictionnaire contenant les informations sur les agents
|
|
"""
|
|
agents_info = {}
|
|
|
|
# Informations sur l'agent JSON Analyser
|
|
if "analyse_json" in rapport_data:
|
|
json_analysis = rapport_data["analyse_json"]
|
|
# Vérifier si l'analyse JSON contient des métadonnées
|
|
if isinstance(json_analysis, dict) and "metadata" in json_analysis:
|
|
agents_info["json_analyser"] = json_analysis["metadata"]
|
|
|
|
# Informations sur les agents d'image
|
|
if "analyse_images" in rapport_data and rapport_data["analyse_images"]:
|
|
# Image Sorter
|
|
sorter_info = {}
|
|
analyser_info = {}
|
|
|
|
for img_path, img_data in rapport_data["analyse_images"].items():
|
|
# Collecter info du sorter
|
|
if "sorting" in img_data and isinstance(img_data["sorting"], dict) and "metadata" in img_data["sorting"]:
|
|
if "model_info" in img_data["sorting"]["metadata"]:
|
|
sorter_info = img_data["sorting"]["metadata"]["model_info"]
|
|
|
|
# Collecter info de l'analyser
|
|
if "analysis" in img_data and img_data["analysis"] and isinstance(img_data["analysis"], dict) and "metadata" in img_data["analysis"]:
|
|
if "model_info" in img_data["analysis"]["metadata"]:
|
|
analyser_info = img_data["analysis"]["metadata"]["model_info"]
|
|
|
|
# Une fois qu'on a trouvé les deux, on peut sortir
|
|
if sorter_info and analyser_info:
|
|
break
|
|
|
|
if sorter_info:
|
|
agents_info["image_sorter"] = sorter_info
|
|
if analyser_info:
|
|
agents_info["image_analyser"] = analyser_info
|
|
|
|
# Ajouter les informations de l'agent report generator
|
|
agents_info["report_generator"] = {
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens
|
|
}
|
|
|
|
return agents_info
|
|
|
|
def _generer_markdown_depuis_json(self, rapport_data: Dict) -> str:
|
|
"""
|
|
Génère un rapport Markdown directement à partir des données JSON
|
|
|
|
Args:
|
|
rapport_data: Données JSON complètes du rapport
|
|
|
|
Returns:
|
|
Contenu Markdown du rapport
|
|
"""
|
|
ticket_id = rapport_data.get("ticket_id", "")
|
|
timestamp = rapport_data.get("metadata", {}).get("timestamp", self._get_timestamp())
|
|
|
|
# Contenu de base du rapport (partie générée par le LLM)
|
|
rapport_contenu = rapport_data.get("rapport_genere", "")
|
|
|
|
# Entête du document
|
|
markdown = f"# Rapport d'analyse du ticket #{ticket_id}\n\n"
|
|
markdown += f"*Généré le: {timestamp}*\n\n"
|
|
|
|
# Ajouter le rapport principal
|
|
markdown += rapport_contenu + "\n\n"
|
|
|
|
# Ajouter les informations techniques (agents, LLM, paramètres)
|
|
markdown += "## Informations techniques\n\n"
|
|
|
|
# Ajouter un résumé du processus d'analyse complet
|
|
markdown += "### Processus d'analyse\n\n"
|
|
|
|
# 1. Analyse de ticket
|
|
ticket_analyse = rapport_data.get("ticket_analyse", "")
|
|
if not ticket_analyse and "analyse_json" in rapport_data:
|
|
ticket_analyse = rapport_data.get("analyse_json", "")
|
|
|
|
if ticket_analyse:
|
|
markdown += "#### Analyse du ticket\n\n"
|
|
markdown += "<details>\n<summary>Cliquez pour voir l'analyse complète du ticket</summary>\n\n"
|
|
markdown += "```\n" + ticket_analyse + "\n```\n\n"
|
|
markdown += "</details>\n\n"
|
|
|
|
# 2. Tri des images
|
|
markdown += "#### Tri des images\n\n"
|
|
analyse_images_data = rapport_data.get("analyse_images", {})
|
|
|
|
if analyse_images_data:
|
|
# Créer un tableau pour le tri des images
|
|
markdown += "| Image | Pertinence | Raison |\n"
|
|
markdown += "|-------|------------|--------|\n"
|
|
|
|
for image_path, analyse_data in analyse_images_data.items():
|
|
image_name = os.path.basename(image_path)
|
|
|
|
# Information de tri
|
|
is_relevant = "Non"
|
|
reason = "Non spécifiée"
|
|
|
|
if "sorting" in analyse_data:
|
|
sorting_data = analyse_data["sorting"]
|
|
if isinstance(sorting_data, dict):
|
|
is_relevant = "Oui" if sorting_data.get("is_relevant", False) else "Non"
|
|
reason = sorting_data.get("reason", "Non spécifiée")
|
|
|
|
markdown += f"| {image_name} | {is_relevant} | {reason} |\n"
|
|
|
|
markdown += "\n"
|
|
|
|
# 3. Analyse des images pertinentes
|
|
markdown += "#### Analyse des images pertinentes\n\n"
|
|
|
|
images_pertinentes = 0
|
|
for image_path, analyse_data in analyse_images_data.items():
|
|
# Vérifier si l'image est pertinente
|
|
is_relevant = False
|
|
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
|
|
is_relevant = analyse_data["sorting"].get("is_relevant", False)
|
|
|
|
if is_relevant:
|
|
images_pertinentes += 1
|
|
image_name = os.path.basename(image_path)
|
|
|
|
# Récupérer l'analyse détaillée
|
|
analyse_detail = "Analyse non disponible"
|
|
if "analysis" in analyse_data and analyse_data["analysis"]:
|
|
if isinstance(analyse_data["analysis"], dict) and "analyse" in analyse_data["analysis"]:
|
|
analyse_detail = analyse_data["analysis"]["analyse"]
|
|
elif isinstance(analyse_data["analysis"], dict):
|
|
analyse_detail = str(analyse_data["analysis"])
|
|
|
|
markdown += f"##### Image: {image_name}\n\n"
|
|
markdown += "<details>\n<summary>Cliquez pour voir l'analyse complète de l'image</summary>\n\n"
|
|
markdown += "```\n" + analyse_detail + "\n```\n\n"
|
|
markdown += "</details>\n\n"
|
|
|
|
if images_pertinentes == 0:
|
|
markdown += "Aucune image pertinente n'a été identifiée.\n\n"
|
|
|
|
# Ajouter les informations sur les agents utilisés
|
|
agents_info = rapport_data.get("metadata", {}).get("agents_info", {})
|
|
if agents_info:
|
|
markdown += "### Agents et modèles utilisés\n\n"
|
|
|
|
# Agent JSON Analyser
|
|
if "json_analyser" in agents_info:
|
|
info = agents_info["json_analyser"]
|
|
markdown += "#### Agent d'analyse de texte\n"
|
|
markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n"
|
|
markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n"
|
|
markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n"
|
|
markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n"
|
|
|
|
# Agent Image Sorter
|
|
if "image_sorter" in agents_info:
|
|
info = agents_info["image_sorter"]
|
|
markdown += "#### Agent de tri d'images\n"
|
|
markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n"
|
|
markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n"
|
|
markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n"
|
|
markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n"
|
|
|
|
# Agent Image Analyser
|
|
if "image_analyser" in agents_info:
|
|
info = agents_info["image_analyser"]
|
|
markdown += "#### Agent d'analyse d'images\n"
|
|
markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n"
|
|
markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n"
|
|
markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n"
|
|
markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n"
|
|
|
|
# Agent Report Generator
|
|
if "report_generator" in agents_info:
|
|
info = agents_info["report_generator"]
|
|
markdown += "#### Agent de génération de rapport\n"
|
|
markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n"
|
|
markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n"
|
|
markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n"
|
|
markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n"
|
|
|
|
# Statistiques d'analyse
|
|
markdown += "### Statistiques\n\n"
|
|
|
|
total_images = len(analyse_images_data) if analyse_images_data else 0
|
|
|
|
markdown += f"- **Images analysées**: {total_images}\n"
|
|
markdown += f"- **Images pertinentes**: {images_pertinentes}\n\n"
|
|
|
|
# Ajouter les métadonnées supplémentaires si présentes
|
|
if "metadata" in rapport_data:
|
|
metadata = rapport_data["metadata"]
|
|
for key, value in metadata.items():
|
|
if key not in ["agents_info", "timestamp"] and not isinstance(value, dict):
|
|
markdown += f"- **{key}**: {value}\n"
|
|
|
|
return markdown
|
|
|
|
def _get_timestamp(self) -> str:
|
|
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
|
|
return datetime.now().strftime("%Y%m%d_%H%M%S") |