mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-15 22:06:50 +01:00
717 lines
36 KiB
Plaintext
717 lines
36 KiB
Plaintext
import json
|
|
import os
|
|
from .base_agent import BaseAgent
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Tuple, Optional
|
|
import logging
|
|
import traceback
|
|
import re
|
|
|
|
logger = logging.getLogger("AgentReportGenerator")
|
|
|
|
class AgentReportGenerator(BaseAgent):
|
|
"""
|
|
Agent pour générer un rapport complet à partir des analyses de ticket et d'images.
|
|
|
|
Cet agent prend en entrée :
|
|
- L'analyse du ticket
|
|
- Les analyses des images pertinentes
|
|
- Les métadonnées associées
|
|
|
|
Format de données attendu:
|
|
- JSON est le format principal de données en entrée et en sortie
|
|
- Le rapport Markdown est généré à partir du JSON uniquement pour la présentation
|
|
|
|
Structure des données d'analyse d'images:
|
|
- Deux structures possibles sont supportées:
|
|
1. Liste d'objets: rapport_data["images_analyses"] = [{image_name, analyse}, ...]
|
|
2. Dictionnaire: rapport_data["analyse_images"] = {chemin_image: {sorting: {...}, analysis: {...}}, ...}
|
|
|
|
Flux de traitement:
|
|
1. Préparation des données d'entrée
|
|
2. Génération du rapport avec le LLM
|
|
3. Sauvegarde au format JSON (format principal)
|
|
4. Conversion et sauvegarde au format Markdown (pour présentation)
|
|
"""
|
|
def __init__(self, llm):
|
|
super().__init__("AgentReportGenerator", llm)
|
|
|
|
# Configuration locale de l'agent (remplace AgentConfig)
|
|
self.temperature = 0.4 # Génération de rapport factuelle mais bien structurée
|
|
self.top_p = 0.9
|
|
self.max_tokens = 2500
|
|
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab pour la société CBAO.
|
|
Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré et exploitable.
|
|
|
|
EXIGENCE ABSOLUE - GÉNÉRATION DE DONNÉES EN FORMAT JSON:
|
|
- Tu DOIS IMPÉRATIVEMENT inclure dans ta réponse un objet JSON structuré pour les échanges client/support
|
|
- Le format de chaque échange dans le JSON DOIT être:
|
|
{
|
|
"chronologie_echanges": [
|
|
{
|
|
"date": "date de l'échange",
|
|
"emetteur": "CLIENT ou SUPPORT",
|
|
"type": "Question ou Réponse ou Information technique",
|
|
"contenu": "contenu synthétisé de l'échange"
|
|
},
|
|
... autres échanges ...
|
|
]
|
|
}
|
|
- Chaque message du ticket doit apparaître comme un objet dans la liste
|
|
- Indique clairement qui est CLIENT et qui est SUPPORT dans le champ "emetteur"
|
|
- Si une question n'a pas de réponse, assure-toi de le noter clairement
|
|
- Toute mention de "CBAD" doit être remplacée par "CBAO" qui est le nom correct de la société
|
|
- Tu dois synthétiser au mieux les échanges (le plus court et clair possible)
|
|
|
|
Structure ton rapport:
|
|
1. Résumé exécutif: Synthèse du problème initial (nom de la demande + description)
|
|
2. Chronologie des échanges: Objet JSON avec la structure imposée ci-dessus
|
|
3. Analyse des images: Ce que montrent les captures d'écran et leur pertinence
|
|
4. Diagnostic technique: Interprétation des informations techniques pertinentes
|
|
|
|
Reste factuel et précis dans ton analyse.
|
|
Les données d'échanges client/support sont l'élément le plus important du rapport.
|
|
Tu DOIS inclure le JSON des échanges dans ta réponse au format:
|
|
```json
|
|
{
|
|
"chronologie_echanges": [
|
|
{"date": "...", "emetteur": "CLIENT", "type": "Question", "contenu": "..."},
|
|
{"date": "...", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "..."}
|
|
]
|
|
}
|
|
```"""
|
|
|
|
# Appliquer la configuration au LLM
|
|
self._appliquer_config_locale()
|
|
|
|
logger.info("AgentReportGenerator initialisé")
|
|
|
|
def _appliquer_config_locale(self) -> None:
|
|
"""
|
|
Applique la configuration locale au modèle LLM.
|
|
"""
|
|
# Appliquer le prompt système
|
|
if hasattr(self.llm, "prompt_system"):
|
|
self.llm.prompt_system = self.system_prompt
|
|
|
|
# Appliquer les paramètres
|
|
if hasattr(self.llm, "configurer"):
|
|
params = {
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens
|
|
}
|
|
|
|
# Ajustements selon le type de modèle
|
|
if "mistral_medium" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] += 0.05
|
|
params["max_tokens"] = 1000
|
|
elif "pixtral" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] -= 0.05
|
|
elif "ollama" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] += 0.1
|
|
params.update({
|
|
"num_ctx": 2048,
|
|
"repeat_penalty": 1.1,
|
|
})
|
|
|
|
self.llm.configurer(**params)
|
|
|
|
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
Génère un rapport à partir des analyses effectuées
|
|
|
|
Args:
|
|
rapport_data: Dictionnaire contenant toutes les données analysées
|
|
Doit contenir au moins une des clés:
|
|
- "ticket_analyse" ou "analyse_json": Analyse du ticket
|
|
- "analyse_images": Analyses des images (facultatif)
|
|
rapport_dir: Répertoire où sauvegarder le rapport
|
|
|
|
Returns:
|
|
Tuple (chemin vers le rapport JSON, chemin vers le rapport Markdown)
|
|
"""
|
|
# Récupérer l'ID du ticket depuis les données
|
|
ticket_id = rapport_data.get("ticket_id", "")
|
|
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
|
|
ticket_id = rapport_data["ticket_data"].get("code", "")
|
|
|
|
if not ticket_id:
|
|
ticket_id = os.path.basename(os.path.dirname(rapport_dir))
|
|
if not ticket_id.startswith("T"):
|
|
# Dernier recours, utiliser le dernier segment du chemin
|
|
ticket_id = os.path.basename(rapport_dir)
|
|
|
|
logger.info(f"Génération du rapport pour le ticket: {ticket_id}")
|
|
print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}")
|
|
|
|
# Validation des données d'entrée
|
|
logger.info("Vérification de la complétude des données d'entrée:")
|
|
if "ticket_data" in rapport_data:
|
|
logger.info(f" - Données de ticket présentes: {len(str(rapport_data['ticket_data']))} caractères")
|
|
else:
|
|
logger.warning(" - Données de ticket manquantes")
|
|
|
|
# Vérification des analyses
|
|
ticket_analyse_exists = False
|
|
if "ticket_analyse" in rapport_data and rapport_data["ticket_analyse"]:
|
|
ticket_analyse_exists = True
|
|
logger.info(f" - Analyse du ticket présente: {len(rapport_data['ticket_analyse'])} caractères")
|
|
elif "analyse_json" in rapport_data and rapport_data["analyse_json"]:
|
|
ticket_analyse_exists = True
|
|
logger.info(f" - Analyse JSON présente: {len(rapport_data['analyse_json'])} caractères")
|
|
else:
|
|
logger.warning(" - Analyse du ticket manquante")
|
|
|
|
# Vérification des analyses d'images
|
|
if "analyse_images" in rapport_data and rapport_data["analyse_images"]:
|
|
n_images = len(rapport_data["analyse_images"])
|
|
n_relevant = sum(1 for _, data in rapport_data["analyse_images"].items()
|
|
if "sorting" in data and isinstance(data["sorting"], dict) and data["sorting"].get("is_relevant", False))
|
|
n_analyzed = sum(1 for _, data in rapport_data["analyse_images"].items()
|
|
if "analysis" in data and data["analysis"])
|
|
|
|
logger.info(f" - Analyses d'images présentes: {n_images} images, {n_relevant} pertinentes, {n_analyzed} analysées")
|
|
else:
|
|
logger.warning(" - Analyses d'images manquantes")
|
|
|
|
# S'assurer que le répertoire existe
|
|
if not os.path.exists(rapport_dir):
|
|
os.makedirs(rapport_dir)
|
|
logger.info(f"Répertoire de rapport créé: {rapport_dir}")
|
|
|
|
try:
|
|
# Préparer les données formatées pour l'analyse
|
|
ticket_analyse = None
|
|
|
|
# Vérifier que l'analyse du ticket est disponible sous l'une des clés possibles
|
|
if "ticket_analyse" in rapport_data and rapport_data["ticket_analyse"]:
|
|
ticket_analyse = rapport_data["ticket_analyse"]
|
|
logger.info("Utilisation de ticket_analyse")
|
|
elif "analyse_json" in rapport_data and rapport_data["analyse_json"]:
|
|
ticket_analyse = rapport_data["analyse_json"]
|
|
logger.info("Utilisation de analyse_json en fallback")
|
|
else:
|
|
# Créer une analyse par défaut si aucune n'est disponible
|
|
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
|
|
ticket_data = rapport_data.get("ticket_data", {})
|
|
ticket_name = ticket_data.get("name", "Sans titre")
|
|
ticket_desc = ticket_data.get("description", "Pas de description disponible")
|
|
ticket_analyse = f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie par l'agent d'analyse de ticket)"
|
|
|
|
# Préparer les données d'analyse d'images
|
|
images_analyses = []
|
|
analyse_images_data = rapport_data.get("analyse_images", {})
|
|
|
|
# Statistiques pour les métadonnées
|
|
total_images = len(analyse_images_data) if analyse_images_data else 0
|
|
images_pertinentes = 0
|
|
|
|
# Collecter des informations sur les agents et LLM utilisés
|
|
agents_info = self._collecter_info_agents(rapport_data)
|
|
|
|
# Transformer les analyses d'images en liste structurée pour le prompt
|
|
for image_path, analyse_data in analyse_images_data.items():
|
|
image_name = os.path.basename(image_path)
|
|
|
|
# Vérifier si l'image est pertinente
|
|
is_relevant = False
|
|
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
|
|
is_relevant = analyse_data["sorting"].get("is_relevant", False)
|
|
if is_relevant:
|
|
images_pertinentes += 1
|
|
|
|
# Récupérer l'analyse détaillée si elle existe et que l'image est pertinente
|
|
analyse_detail = None
|
|
if is_relevant:
|
|
if "analysis" in analyse_data and analyse_data["analysis"]:
|
|
# Vérifier différentes structures possibles de l'analyse
|
|
if isinstance(analyse_data["analysis"], dict):
|
|
if "analyse" in analyse_data["analysis"]:
|
|
analyse_detail = analyse_data["analysis"]["analyse"]
|
|
elif "error" in analyse_data["analysis"] and not analyse_data["analysis"].get("error", True):
|
|
# Si pas d'erreur et que l'analyse est directement dans le dictionnaire
|
|
analyse_detail = str(analyse_data["analysis"])
|
|
else:
|
|
# Essayer de récupérer directement le contenu du dictionnaire
|
|
analyse_detail = json.dumps(analyse_data["analysis"], ensure_ascii=False, indent=2)
|
|
elif isinstance(analyse_data["analysis"], str):
|
|
# Si l'analyse est directement une chaîne
|
|
analyse_detail = analyse_data["analysis"]
|
|
|
|
# Si l'analyse n'a pas été trouvée mais que l'image est pertinente
|
|
if not analyse_detail:
|
|
analyse_detail = f"Image marquée comme pertinente. Raison: {analyse_data['sorting'].get('reason', 'Non spécifiée')}"
|
|
|
|
# Analyse détaillée
|
|
if analyse_detail:
|
|
images_analyses.append({
|
|
"image_name": image_name,
|
|
"analyse": analyse_detail
|
|
})
|
|
logger.info(f"Analyse de l'image {image_name} ajoutée au rapport (longueur: {len(str(analyse_detail))} caractères)")
|
|
else:
|
|
logger.warning(f"Analyse non trouvée pour l'image pertinente {image_name}")
|
|
else:
|
|
logger.info(f"Image {image_name} ignorée car non pertinente")
|
|
|
|
# Créer le chemin du fichier de rapport JSON (sortie principale)
|
|
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
|
|
md_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.md")
|
|
|
|
# Formater les données pour le LLM
|
|
prompt = self._formater_prompt_pour_rapport(ticket_analyse, images_analyses, ticket_id)
|
|
|
|
# Générer le rapport avec le LLM
|
|
logger.info("Génération du rapport avec le LLM")
|
|
print(f" Génération du rapport avec le LLM...")
|
|
|
|
# Interroger le LLM
|
|
rapport_genere = self.llm.interroger(prompt)
|
|
logger.info(f"Rapport généré: {len(rapport_genere)} caractères")
|
|
print(f" Rapport généré: {len(rapport_genere)} caractères")
|
|
|
|
# Traiter le JSON pour extraire la chronologie des échanges et le convertir en tableau Markdown
|
|
rapport_traite, echanges_json, echanges_markdown = self._extraire_et_traiter_json(rapport_genere)
|
|
if echanges_json and echanges_markdown:
|
|
logger.info(f"Échanges JSON convertis en tableau Markdown: {len(str(echanges_markdown))} caractères")
|
|
print(f" Échanges client/support convertis du JSON vers le format Markdown")
|
|
# Utiliser le rapport traité avec le tableau Markdown à la place du JSON
|
|
rapport_genere = rapport_traite
|
|
else:
|
|
logger.warning("Aucun JSON d'échanges trouvé dans le rapport, conservation du format original")
|
|
|
|
# Tracer l'historique avec le prompt pour la transparence
|
|
self.ajouter_historique("generation_rapport",
|
|
{
|
|
"ticket_id": ticket_id,
|
|
"prompt_taille": len(prompt),
|
|
"timestamp": self._get_timestamp()
|
|
},
|
|
rapport_genere)
|
|
|
|
# Préparer les métadonnées complètes pour le rapport
|
|
timestamp = self._get_timestamp()
|
|
|
|
# Préparer le JSON complet du rapport (format principal)
|
|
rapport_data_complet = {
|
|
"ticket_id": ticket_id,
|
|
"timestamp": timestamp,
|
|
"rapport_genere": rapport_genere,
|
|
"ticket_analyse": ticket_analyse,
|
|
"images_analyses": images_analyses,
|
|
"statistiques": {
|
|
"total_images": total_images,
|
|
"images_pertinentes": images_pertinentes,
|
|
"analyses_generees": len(images_analyses)
|
|
}
|
|
}
|
|
|
|
# Ajouter les métadonnées pour la traçabilité
|
|
metadata = {
|
|
"timestamp": timestamp,
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens,
|
|
"agents": agents_info
|
|
}
|
|
|
|
rapport_data_complet["metadata"] = metadata
|
|
|
|
# S'assurer que les clés nécessaires pour le markdown sont présentes
|
|
if "ticket_analyse" not in rapport_data_complet:
|
|
rapport_data_complet["ticket_analyse"] = ticket_analyse
|
|
|
|
# ÉTAPE 1: Sauvegarder le rapport au format JSON (FORMAT PRINCIPAL)
|
|
with open(json_path, "w", encoding="utf-8") as f:
|
|
json.dump(rapport_data_complet, f, ensure_ascii=False, indent=2)
|
|
|
|
logger.info(f"Rapport JSON (format principal) sauvegardé: {json_path}")
|
|
print(f" Rapport JSON sauvegardé: {json_path}")
|
|
|
|
# ÉTAPE 2: Générer et sauvegarder le rapport au format Markdown (pour présentation)
|
|
markdown_content = self._generer_markdown_depuis_json(rapport_data_complet)
|
|
|
|
with open(md_path, "w", encoding="utf-8") as f:
|
|
f.write(markdown_content)
|
|
|
|
logger.info(f"Rapport Markdown (pour présentation) sauvegardé: {md_path}")
|
|
print(f" Rapport Markdown sauvegardé: {md_path}")
|
|
|
|
logger.info(f"Taille du rapport Markdown: {len(markdown_content)} caractères")
|
|
|
|
# Retourner les chemins des deux fichiers (JSON en premier, Markdown en second)
|
|
return json_path, md_path
|
|
|
|
except Exception as e:
|
|
error_message = f"Erreur lors de la génération du rapport: {str(e)}"
|
|
logger.error(error_message)
|
|
print(f" ERREUR: {error_message}")
|
|
return None, None
|
|
|
|
def _collecter_info_agents(self, rapport_data: Dict) -> Dict:
|
|
"""
|
|
Collecte des informations sur les agents utilisés dans l'analyse
|
|
|
|
Args:
|
|
rapport_data: Données du rapport
|
|
|
|
Returns:
|
|
Dictionnaire contenant les informations sur les agents
|
|
"""
|
|
agents_info = {}
|
|
|
|
# Informations sur l'agent JSON Analyser
|
|
if "analyse_json" in rapport_data:
|
|
json_analysis = rapport_data["analyse_json"]
|
|
# Vérifier si l'analyse JSON contient des métadonnées
|
|
if isinstance(json_analysis, dict) and "metadata" in json_analysis:
|
|
agents_info["json_analyser"] = json_analysis["metadata"]
|
|
|
|
# Informations sur les agents d'image
|
|
if "analyse_images" in rapport_data and rapport_data["analyse_images"]:
|
|
# Image Sorter
|
|
sorter_info = {}
|
|
analyser_info = {}
|
|
|
|
for img_path, img_data in rapport_data["analyse_images"].items():
|
|
# Collecter info du sorter
|
|
if "sorting" in img_data and isinstance(img_data["sorting"], dict) and "metadata" in img_data["sorting"]:
|
|
if "model_info" in img_data["sorting"]["metadata"]:
|
|
sorter_info = img_data["sorting"]["metadata"]["model_info"]
|
|
|
|
# Collecter info de l'analyser
|
|
if "analysis" in img_data and img_data["analysis"] and isinstance(img_data["analysis"], dict) and "metadata" in img_data["analysis"]:
|
|
if "model_info" in img_data["analysis"]["metadata"]:
|
|
analyser_info = img_data["analysis"]["metadata"]["model_info"]
|
|
|
|
# Une fois qu'on a trouvé les deux, on peut sortir
|
|
if sorter_info and analyser_info:
|
|
break
|
|
|
|
if sorter_info:
|
|
agents_info["image_sorter"] = sorter_info
|
|
if analyser_info:
|
|
agents_info["image_analyser"] = analyser_info
|
|
|
|
# Ajouter les informations de l'agent report generator
|
|
agents_info["report_generator"] = {
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens
|
|
}
|
|
|
|
return agents_info
|
|
|
|
def _generer_markdown_depuis_json(self, rapport_data: Dict) -> str:
|
|
"""
|
|
Génère un rapport Markdown directement à partir des données JSON
|
|
|
|
Args:
|
|
rapport_data: Données JSON complètes du rapport
|
|
Format attendu:
|
|
- ticket_id: ID du ticket
|
|
- metadata: Métadonnées (timestamp, modèle, etc.)
|
|
- rapport_genere: Texte du rapport généré par le LLM
|
|
- ticket_analyse: Analyse du ticket
|
|
- images_analyses: Liste des analyses d'images (format privilégié)
|
|
OU
|
|
- analyse_images: Dictionnaire des analyses d'images (format alternatif)
|
|
|
|
Returns:
|
|
Contenu Markdown du rapport
|
|
"""
|
|
ticket_id = rapport_data.get("ticket_id", "")
|
|
timestamp = rapport_data.get("metadata", {}).get("timestamp", self._get_timestamp())
|
|
|
|
logger.info(f"Génération du rapport Markdown pour le ticket {ticket_id}")
|
|
|
|
# Contenu de base du rapport (partie générée par le LLM)
|
|
rapport_contenu = rapport_data.get("rapport_genere", "")
|
|
|
|
# Entête du document
|
|
markdown = f"# Rapport d'analyse du ticket #{ticket_id}\n\n"
|
|
markdown += f"*Généré le: {timestamp}*\n\n"
|
|
|
|
# Ajouter le rapport principal généré par le LLM
|
|
markdown += rapport_contenu + "\n\n"
|
|
|
|
# Section séparatrice pour les détails d'analyse
|
|
markdown += "---\n\n"
|
|
markdown += "# Détails des analyses effectuées\n\n"
|
|
|
|
# Ajouter un résumé du processus d'analyse complet
|
|
markdown += "## Processus d'analyse\n\n"
|
|
|
|
# 1. Analyse de ticket
|
|
ticket_analyse = rapport_data.get("ticket_analyse", "")
|
|
if not ticket_analyse and "analyse_json" in rapport_data:
|
|
ticket_analyse = rapport_data.get("analyse_json", "")
|
|
|
|
if ticket_analyse:
|
|
markdown += "### Étape 1: Analyse du ticket\n\n"
|
|
markdown += "L'agent d'analyse de ticket a extrait les informations suivantes du ticket d'origine:\n\n"
|
|
markdown += "<details>\n<summary>Cliquez pour voir l'analyse complète du ticket</summary>\n\n"
|
|
markdown += "```\n" + ticket_analyse + "\n```\n\n"
|
|
markdown += "</details>\n\n"
|
|
logger.info(f"Analyse du ticket ajoutée au rapport Markdown ({len(str(ticket_analyse))} caractères)")
|
|
else:
|
|
logger.warning("Aucune analyse de ticket disponible pour le rapport Markdown")
|
|
|
|
# 2. Tri des images
|
|
markdown += "### Étape 2: Tri des images\n\n"
|
|
markdown += "L'agent de tri d'images a évalué chaque image pour déterminer sa pertinence par rapport au problème client:\n\n"
|
|
|
|
# Vérifier quelle structure de données est disponible pour les images
|
|
has_analyse_images = "analyse_images" in rapport_data and rapport_data["analyse_images"]
|
|
has_images_analyses = "images_analyses" in rapport_data and isinstance(rapport_data["images_analyses"], list) and rapport_data["images_analyses"]
|
|
|
|
logger.info(f"Structure des données d'images: analyse_images={has_analyse_images}, images_analyses={has_images_analyses}")
|
|
|
|
analyse_images_data = {}
|
|
if has_analyse_images:
|
|
analyse_images_data = rapport_data["analyse_images"]
|
|
|
|
if analyse_images_data:
|
|
# Créer un tableau pour le tri des images
|
|
markdown += "| Image | Pertinence | Raison |\n"
|
|
markdown += "|-------|------------|--------|\n"
|
|
|
|
for image_path, analyse_data in analyse_images_data.items():
|
|
image_name = os.path.basename(image_path)
|
|
|
|
# Information de tri
|
|
is_relevant = "Non"
|
|
reason = "Non spécifiée"
|
|
|
|
if "sorting" in analyse_data:
|
|
sorting_data = analyse_data["sorting"]
|
|
if isinstance(sorting_data, dict):
|
|
is_relevant = "Oui" if sorting_data.get("is_relevant", False) else "Non"
|
|
reason = sorting_data.get("reason", "Non spécifiée")
|
|
|
|
markdown += f"| {image_name} | {is_relevant} | {reason} |\n"
|
|
|
|
markdown += "\n"
|
|
logger.info(f"Tableau de tri des images ajouté au rapport Markdown ({len(analyse_images_data)} images)")
|
|
elif has_images_analyses and rapport_data["images_analyses"]:
|
|
# Si nous avons les analyses d'images mais pas les données de tri, créer un tableau simplifié
|
|
markdown += "| Image | Pertinence |\n"
|
|
markdown += "|-------|------------|\n"
|
|
|
|
for img_data in rapport_data["images_analyses"]:
|
|
image_name = img_data.get("image_name", "Image inconnue")
|
|
markdown += f"| {image_name} | Oui |\n"
|
|
|
|
markdown += "\n"
|
|
logger.info(f"Tableau de tri simplifié ajouté au rapport Markdown ({len(rapport_data['images_analyses'])} images)")
|
|
else:
|
|
markdown += "*Aucune image n'a été trouvée ou analysée.*\n\n"
|
|
logger.warning("Aucune analyse d'images disponible pour le tableau de tri")
|
|
|
|
# 3. Analyse des images pertinentes
|
|
markdown += "### Étape 3: Analyse détaillée des images pertinentes\n\n"
|
|
|
|
# Traiter directement les images_analyses du rapport_data si disponible
|
|
images_pertinentes = 0
|
|
|
|
# D'abord essayer d'utiliser la liste images_analyses qui est déjà traitée
|
|
if has_images_analyses:
|
|
images_list = rapport_data["images_analyses"]
|
|
for i, img_data in enumerate(images_list, 1):
|
|
images_pertinentes += 1
|
|
image_name = img_data.get("image_name", f"Image {i}")
|
|
analyse_detail = img_data.get("analyse", "Analyse non disponible")
|
|
|
|
markdown += f"#### Image pertinente {images_pertinentes}: {image_name}\n\n"
|
|
markdown += "<details>\n<summary>Cliquez pour voir l'analyse complète de l'image</summary>\n\n"
|
|
markdown += "```\n" + analyse_detail + "\n```\n\n"
|
|
markdown += "</details>\n\n"
|
|
|
|
logger.info(f"Analyse de l'image {image_name} ajoutée au rapport Markdown (from images_analyses)")
|
|
# Sinon, traiter les données brutes d'analyse_images
|
|
elif has_analyse_images:
|
|
analyse_images_data = rapport_data["analyse_images"]
|
|
for image_path, analyse_data in analyse_images_data.items():
|
|
# Vérifier si l'image est pertinente
|
|
is_relevant = False
|
|
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
|
|
is_relevant = analyse_data["sorting"].get("is_relevant", False)
|
|
|
|
if is_relevant:
|
|
images_pertinentes += 1
|
|
image_name = os.path.basename(image_path)
|
|
|
|
# Récupérer l'analyse détaillée avec gestion des différents formats possibles
|
|
analyse_detail = "Analyse non disponible"
|
|
if "analysis" in analyse_data and analyse_data["analysis"]:
|
|
if isinstance(analyse_data["analysis"], dict):
|
|
if "analyse" in analyse_data["analysis"]:
|
|
analyse_detail = analyse_data["analysis"]["analyse"]
|
|
logger.info(f"Analyse de l'image {image_name} récupérée via analyse_data['analysis']['analyse']")
|
|
elif "error" in analyse_data["analysis"] and not analyse_data["analysis"].get("error", True):
|
|
analyse_detail = str(analyse_data["analysis"])
|
|
logger.info(f"Analyse de l'image {image_name} récupérée via str(analyse_data['analysis'])")
|
|
else:
|
|
analyse_detail = json.dumps(analyse_data["analysis"], ensure_ascii=False, indent=2)
|
|
logger.info(f"Analyse de l'image {image_name} récupérée via json.dumps")
|
|
elif isinstance(analyse_data["analysis"], str):
|
|
analyse_detail = analyse_data["analysis"]
|
|
logger.info(f"Analyse de l'image {image_name} récupérée directement (string)")
|
|
else:
|
|
logger.warning(f"Aucune analyse disponible pour l'image pertinente {image_name}")
|
|
|
|
markdown += f"#### Image pertinente {images_pertinentes}: {image_name}\n\n"
|
|
markdown += "<details>\n<summary>Cliquez pour voir l'analyse complète de l'image</summary>\n\n"
|
|
markdown += "```\n" + analyse_detail + "\n```\n\n"
|
|
markdown += "</details>\n\n"
|
|
|
|
if images_pertinentes == 0:
|
|
markdown += "*Aucune image pertinente n'a été identifiée pour ce ticket.*\n\n"
|
|
logger.warning("Aucune image pertinente identifiée pour l'analyse détaillée")
|
|
else:
|
|
logger.info(f"{images_pertinentes} images pertinentes ajoutées au rapport Markdown")
|
|
|
|
# 4. Synthèse (rapport final)
|
|
markdown += "### Étape 4: Génération du rapport de synthèse\n\n"
|
|
markdown += "L'agent de génération de rapport a synthétisé toutes les analyses précédentes pour produire le rapport ci-dessus.\n\n"
|
|
|
|
# Informations techniques
|
|
markdown += "## Informations techniques\n\n"
|
|
|
|
# Statistiques d'analyse
|
|
markdown += "### Statistiques\n\n"
|
|
|
|
total_images = 0
|
|
if has_analyse_images:
|
|
total_images = len(analyse_images_data)
|
|
elif "statistiques" in rapport_data and "total_images" in rapport_data["statistiques"]:
|
|
total_images = rapport_data["statistiques"]["total_images"]
|
|
|
|
markdown += f"- **Images analysées**: {total_images}\n"
|
|
markdown += f"- **Images pertinentes**: {images_pertinentes}\n\n"
|
|
|
|
logger.info(f"Rapport Markdown généré ({len(markdown)} caractères)")
|
|
return markdown
|
|
|
|
def _formater_prompt_pour_rapport(self, ticket_analyse, images_analyses, ticket_id):
|
|
"""
|
|
Formate le prompt pour la génération du rapport
|
|
|
|
Args:
|
|
ticket_analyse: Analyse du ticket
|
|
images_analyses: Liste des analyses d'images, format [{image_name, analyse}, ...]
|
|
ticket_id: ID du ticket
|
|
|
|
Returns:
|
|
Prompt formaté pour le LLM
|
|
"""
|
|
num_images = len(images_analyses)
|
|
logger.info(f"Formatage du prompt avec {num_images} analyses d'images")
|
|
|
|
# Inclure une vérification des données reçues
|
|
prompt = f"""Génère un rapport technique complet pour le ticket #{ticket_id}, en te basant sur les analyses suivantes.
|
|
|
|
## VÉRIFICATION DES DONNÉES REÇUES
|
|
Je vais d'abord vérifier que j'ai bien reçu les données d'analyses:
|
|
- Analyse du ticket : {"PRÉSENTE" if ticket_analyse else "MANQUANTE"}
|
|
- Analyses d'images : {"PRÉSENTES (" + str(num_images) + " images)" if num_images > 0 else "MANQUANTES"}
|
|
|
|
## ANALYSE DU TICKET
|
|
{ticket_analyse}
|
|
|
|
## ANALYSES DES IMAGES ({num_images} images analysées)
|
|
"""
|
|
|
|
# Ajouter l'analyse de chaque image
|
|
for i, img_analyse in enumerate(images_analyses, 1):
|
|
image_name = img_analyse.get("image_name", f"Image {i}")
|
|
analyse = img_analyse.get("analyse", "Analyse non disponible")
|
|
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
|
|
logger.info(f"Ajout de l'analyse de l'image {image_name} au prompt ({len(str(analyse))} caractères)")
|
|
|
|
# Ne pas répéter les instructions déjà présentes dans le system_prompt
|
|
prompt += f"""
|
|
N'oublie pas d'inclure un objet JSON structuré des échanges client/support selon le format demandé.
|
|
Assure-toi que le JSON est valide et clairement balisé avec ```json et ``` dans ta réponse.
|
|
"""
|
|
|
|
logger.info(f"Prompt formaté: {len(prompt)} caractères au total")
|
|
return prompt
|
|
|
|
def _extraire_et_traiter_json(self, texte_rapport):
|
|
"""
|
|
Extrait l'objet JSON des échanges du texte du rapport et le convertit en Markdown
|
|
|
|
Args:
|
|
texte_rapport: Texte complet du rapport généré par le LLM
|
|
|
|
Returns:
|
|
Tuple (rapport_traité, echanges_json, echanges_markdown)
|
|
"""
|
|
# Remplacer CBAD par CBAO dans tout le rapport
|
|
texte_rapport = texte_rapport.replace("CBAD", "CBAO")
|
|
|
|
# Rechercher un objet JSON dans le texte
|
|
json_match = re.search(r'```json\s*({.*?})\s*```', texte_rapport, re.DOTALL)
|
|
|
|
if not json_match:
|
|
logger.warning("Aucun JSON trouvé dans le rapport")
|
|
return texte_rapport, None, None
|
|
|
|
# Extraire le JSON et le parser
|
|
json_text = json_match.group(1)
|
|
try:
|
|
echanges_json = json.loads(json_text)
|
|
logger.info(f"JSON extrait avec succès: {len(json_text)} caractères")
|
|
|
|
# Convertir en tableau Markdown
|
|
echanges_markdown = "| Date | Émetteur | Type | Contenu | Statut |\n"
|
|
echanges_markdown += "|------|---------|------|---------|--------|\n"
|
|
|
|
if "chronologie_echanges" in echanges_json and isinstance(echanges_json["chronologie_echanges"], list):
|
|
# Pré-traitement pour vérifier les questions sans réponse
|
|
questions_sans_reponse = {}
|
|
for i, echange in enumerate(echanges_json["chronologie_echanges"]):
|
|
if echange.get("type", "").lower() == "question" and echange.get("emetteur", "").lower() == "client":
|
|
has_response = False
|
|
# Vérifier si la question a une réponse
|
|
for j in range(i+1, len(echanges_json["chronologie_echanges"])):
|
|
next_echange = echanges_json["chronologie_echanges"][j]
|
|
if next_echange.get("type", "").lower() == "réponse" and next_echange.get("emetteur", "").lower() == "support":
|
|
has_response = True
|
|
break
|
|
questions_sans_reponse[i] = not has_response
|
|
|
|
# Générer le tableau
|
|
for i, echange in enumerate(echanges_json["chronologie_echanges"]):
|
|
date = echange.get("date", "-")
|
|
emetteur = echange.get("emetteur", "-")
|
|
type_msg = echange.get("type", "-")
|
|
contenu = echange.get("contenu", "-")
|
|
|
|
# Ajouter un statut pour les questions sans réponse
|
|
statut = ""
|
|
if emetteur.lower() == "client" and type_msg.lower() == "question" and questions_sans_reponse.get(i, False):
|
|
statut = "**Sans réponse**"
|
|
|
|
echanges_markdown += f"| {date} | {emetteur} | {type_msg} | {contenu} | {statut} |\n"
|
|
|
|
# Ajouter une note si aucune réponse du support n'a été trouvée
|
|
if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges_json["chronologie_echanges"]):
|
|
echanges_markdown += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n\n"
|
|
|
|
# Remplacer le JSON dans le texte par le tableau Markdown
|
|
rapport_traite = texte_rapport.replace(json_match.group(0), echanges_markdown)
|
|
|
|
return rapport_traite, echanges_json, echanges_markdown
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Erreur lors du décodage JSON: {e}")
|
|
return texte_rapport, None, None
|
|
|
|
def _get_timestamp(self) -> str:
|
|
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
|
|
return datetime.now().strftime("%Y%m%d_%H%M%S") |