This commit is contained in:
Ladebeze66 2025-04-14 15:41:10 +02:00
parent 84332beb49
commit 70af855cd5
10 changed files with 2007 additions and 0 deletions

View File

@ -0,0 +1,340 @@
from .base_agent import BaseAgent
from typing import Any, Dict
import logging
import os
from PIL import Image
import base64
import io
logger = logging.getLogger("AgentImageAnalyser")
class AgentImageAnalyser(BaseAgent):
"""
Agent pour analyser les images et extraire les informations pertinentes.
"""
def __init__(self, llm):
super().__init__("AgentImageAnalyser", llm)
# Configuration locale de l'agent
self.temperature = 0.2
self.top_p = 0.9
self.max_tokens = 3000
# Centralisation des instructions d'analyse pour éviter la duplication
self.instructions_analyse = """
1. Description objective
Décris précisément ce que montre l'image :
- Interface logicielle, menus, fenêtres, onglets
- Messages d'erreur, messages système, code ou script
- Nom ou titre du logiciel ou du module si visible
2. Éléments techniques clés
Identifie :
- Versions logicielles ou modules affichés
- Codes d'erreur visibles
- Paramètres configurables (champs de texte, sliders, dropdowns, cases à cocher)
- Valeurs affichées ou préremplies dans les champs
- Éléments désactivés, grisés ou masqués (souvent non modifiables)
- Boutons actifs/inactifs
3. Éléments mis en évidence
- Recherche les zones entourées, encadrées, surlignées ou fléchées
- Ces éléments sont souvent importants pour le client ou le support
- Mentionne explicitement leur contenu et leur style de mise en valeur
4. Relation avec le problème
- Établis le lien entre les éléments visibles et le problème décrit dans le ticket
- Indique si des composants semblent liés à une mauvaise configuration ou une erreur
5. Réponses potentielles
- Détermine si l'image apporte des éléments de réponse à une question posée dans :
- Le titre du ticket
- La description du problème
6. Lien avec la discussion
- Vérifie si l'image fait écho à une étape décrite dans le fil de discussion
- Note les correspondances (ex: même module, même message d'erreur que précédemment mentionné)
Règles importantes :
- Ne fais AUCUNE interprétation ni diagnostic
- Ne propose PAS de solution ou recommandation
- Reste strictement factuel et objectif
- Concentre-toi uniquement sur ce qui est visible dans l'image
- Reproduis les textes exacts(ex : messages d'erreur, libellés de paramètres)
- Prête une attention particulière aux éléments modifiables (interactifs) et non modifiables (grisés)
"""
# Prompt système construit à partir des instructions centralisées
self.system_prompt = f"""Tu es un expert en analyse d'images pour le support technique de BRG-Lab pour la société CBAO.
Ta mission est d'analyser des captures d'écran en lien avec le contexte du ticket de support.
Structure ton analyse d'image de façon factuelle:
{self.instructions_analyse}
Ton analyse sera utilisée comme élément factuel pour un rapport technique plus complet."""
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentImageAnalyser initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
self.llm.configurer(**params)
def _verifier_image(self, image_path: str) -> bool:
"""
Vérifie si l'image existe et est accessible
Args:
image_path: Chemin vers l'image
Returns:
True si l'image existe et est accessible, False sinon
"""
try:
# Vérifier que le fichier existe
if not os.path.exists(image_path):
logger.error(f"L'image n'existe pas: {image_path}")
return False
# Vérifier que le fichier est accessible en lecture
if not os.access(image_path, os.R_OK):
logger.error(f"L'image n'est pas accessible en lecture: {image_path}")
return False
# Vérifier que le fichier peut être ouvert comme une image
with Image.open(image_path) as img:
# Vérifier les dimensions de l'image
width, height = img.size
if width <= 0 or height <= 0:
logger.error(f"Dimensions d'image invalides: {width}x{height}")
return False
logger.info(f"Image vérifiée avec succès: {image_path} ({width}x{height})")
return True
except Exception as e:
logger.error(f"Erreur lors de la vérification de l'image {image_path}: {str(e)}")
return False
def _encoder_image_base64(self, image_path: str) -> str:
"""
Encode l'image en base64 pour l'inclure directement dans le prompt
Args:
image_path: Chemin vers l'image
Returns:
Chaîne de caractères au format data URI avec l'image encodée en base64
"""
try:
# Ouvrir l'image et la redimensionner si trop grande
with Image.open(image_path) as img:
# Redimensionner l'image si elle est trop grande (max 800x800)
max_size = 800
if img.width > max_size or img.height > max_size:
img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
# Convertir en RGB si nécessaire (pour les formats comme PNG)
if img.mode != "RGB":
img = img.convert("RGB")
# Sauvegarder l'image en JPEG dans un buffer mémoire
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=85)
buffer.seek(0)
# Encoder en base64
img_base64 = base64.b64encode(buffer.read()).decode("utf-8")
# Construire le data URI
data_uri = f"data:image/jpeg;base64,{img_base64}"
return data_uri
except Exception as e:
logger.error(f"Erreur lors de l'encodage de l'image {image_path}: {str(e)}")
return ""
def _generer_prompt_analyse(self, contexte: str, prefix: str = "") -> str:
"""
Génère le prompt d'analyse d'image en utilisant les instructions centralisées
Args:
contexte: Contexte du ticket à inclure dans le prompt
prefix: Préfixe optionnel (pour inclure l'image en base64 par exemple)
Returns:
Prompt formaté pour l'analyse d'image
"""
return f"""{prefix}
CONTEXTE DU TICKET:
{contexte}
Fournis une analyse STRICTEMENT FACTUELLE de l'image avec les sections suivantes:
{self.instructions_analyse}"""
def executer(self, image_path: str, contexte: str) -> Dict[str, Any]:
"""
Analyse une image en tenant compte du contexte du ticket
Args:
image_path: Chemin vers l'image à analyser
contexte: Contexte du ticket (résultat de l'analyse JSON)
Returns:
Dictionnaire contenant l'analyse détaillée de l'image et les métadonnées d'exécution
"""
image_name = os.path.basename(image_path)
logger.info(f"Analyse de l'image: {image_name} avec contexte")
print(f" AgentImageAnalyser: Analyse de {image_name}")
# Vérifier que l'image existe et est accessible
if not self._verifier_image(image_path):
error_message = f"L'image n'est pas accessible ou n'est pas valide: {image_name}"
logger.error(error_message)
print(f" ERREUR: {error_message}")
return {
"analyse": f"ERREUR: {error_message}. Veuillez vérifier que l'image existe et est valide.",
"error": True,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"error": True
}
}
# Générer le prompt d'analyse avec les instructions centralisées
prompt = self._generer_prompt_analyse(contexte, "Analyse cette image en tenant compte du contexte suivant:")
try:
logger.info("Envoi de la requête au LLM")
# Utiliser la méthode interroger_avec_image au lieu de interroger
if hasattr(self.llm, "interroger_avec_image"):
logger.info(f"Utilisation de la méthode interroger_avec_image pour {image_name}")
response = self.llm.interroger_avec_image(image_path, prompt)
else:
# Fallback vers la méthode standard avec base64 si interroger_avec_image n'existe pas
logger.warning(f"La méthode interroger_avec_image n'existe pas, utilisation du fallback pour {image_name}")
img_base64 = self._encoder_image_base64(image_path)
if img_base64:
# Utiliser le même générateur de prompt avec l'image en base64
prompt_base64 = self._generer_prompt_analyse(contexte, f"Analyse cette image:\n{img_base64}")
response = self.llm.interroger(prompt_base64)
else:
error_message = "Impossible d'encoder l'image en base64"
logger.error(f"Erreur d'analyse pour {image_name}: {error_message}")
print(f" ERREUR: {error_message}")
# Retourner un résultat d'erreur explicite
return {
"analyse": f"ERREUR: {error_message}. Veuillez vérifier que l'image est dans un format standard.",
"error": True,
"raw_response": "",
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"error": True
}
}
# Vérifier si la réponse contient des indications que le modèle ne peut pas analyser l'image
error_phrases = [
"je ne peux pas directement visualiser",
"je n'ai pas accès à l'image",
"je ne peux pas voir l'image",
"sans accès direct à l'image",
"je n'ai pas la possibilité de voir",
"je ne peux pas accéder directement",
"erreur: impossible d'analyser l'image"
]
# Vérifier si une des phrases d'erreur est présente dans la réponse
if any(phrase in response.lower() for phrase in error_phrases):
logger.warning(f"Le modèle indique qu'il ne peut pas analyser l'image: {image_name}")
error_message = "Le modèle n'a pas pu analyser l'image correctement"
logger.error(f"Erreur d'analyse pour {image_name}: {error_message}")
print(f" ERREUR: {error_message}")
# Retourner un résultat d'erreur explicite
return {
"analyse": f"ERREUR: {error_message}. Veuillez vérifier que le modèle a accès à l'image ou utiliser un modèle différent.",
"error": True,
"raw_response": response,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"error": True
}
}
logger.info(f"Réponse reçue pour l'image {image_name}: {response[:100]}...")
# Créer un dictionnaire de résultat avec l'analyse et les métadonnées
result = {
"analyse": response,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"model_info": {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
}
}
# Enregistrer l'analyse dans l'historique avec contexte et prompt
self.ajouter_historique("analyse_image",
{
"image_path": image_path,
"contexte": contexte,
"prompt": prompt
},
response)
return result
except Exception as e:
error_message = f"Erreur lors de l'analyse de l'image: {str(e)}"
logger.error(error_message)
print(f" ERREUR: {error_message}")
# Retourner un résultat par défaut en cas d'erreur
return {
"analyse": f"ERREUR: {error_message}",
"error": True,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"error": True
}
}
def _get_timestamp(self) -> str:
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
from datetime import datetime
return datetime.now().strftime("%Y%m%d_%H%M%S")

View File

@ -0,0 +1,393 @@
from .base_agent import BaseAgent
import logging
import os
from typing import Dict, Any, Tuple
from PIL import Image
import base64
import io
logger = logging.getLogger("AgentImageSorter")
class AgentImageSorter(BaseAgent):
"""
Agent pour trier les images et identifier celles qui sont pertinentes.
"""
def __init__(self, llm):
super().__init__("AgentImageSorter", llm)
# Configuration locale de l'agent
self.temperature = 0.2
self.top_p = 0.8
self.max_tokens = 300
# Centralisation des critères de pertinence
self.criteres_pertinence = """
Images PERTINENTES (réponds "oui" ou "pertinent"):
- Captures d'écran de logiciels ou d'interfaces
- logo BRG_LAB
- Référence à "logociel"
- Messages d'erreur
- Configurations système
- Tableaux de bord ou graphiques techniques
- Fenêtres de diagnostic
Images NON PERTINENTES (réponds "non" ou "non pertinent"):
- Photos personnelles
- Images marketing/promotionnelles
- Logos ou images de marque
- Paysages, personnes ou objets non liés à l'informatique
"""
# Centralisation des instructions d'analyse
self.instructions_analyse = """
IMPORTANT: Ne commence JAMAIS ta réponse par "Je ne peux pas directement visualiser l'image".
Si tu ne peux pas analyser l'image, réponds simplement "ERREUR: Impossible d'analyser l'image".
Analyse d'abord ce que montre l'image, puis réponds par "oui"/"pertinent" ou "non"/"non pertinent".
"""
# Construction du système prompt à partir des éléments centralisés
self.system_prompt = f"""Tu es un expert en tri d'images pour le support technique de BRG_Lab pour la société CBAO.
Ta mission est de déterminer si une image est pertinente pour le support technique de logiciels.
{self.criteres_pertinence}
{self.instructions_analyse}"""
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentImageSorter initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
self.llm.configurer(**params)
def _verifier_image(self, image_path: str) -> bool:
"""
Vérifie si l'image existe et est accessible
Args:
image_path: Chemin vers l'image
Returns:
True si l'image existe et est accessible, False sinon
"""
try:
# Vérifier que le fichier existe
if not os.path.exists(image_path):
logger.error(f"L'image n'existe pas: {image_path}")
return False
# Vérifier que le fichier est accessible en lecture
if not os.access(image_path, os.R_OK):
logger.error(f"L'image n'est pas accessible en lecture: {image_path}")
return False
# Vérifier que le fichier peut être ouvert comme une image
with Image.open(image_path) as img:
# Vérifier les dimensions de l'image
width, height = img.size
if width <= 0 or height <= 0:
logger.error(f"Dimensions d'image invalides: {width}x{height}")
return False
logger.info(f"Image vérifiée avec succès: {image_path} ({width}x{height})")
return True
except Exception as e:
logger.error(f"Erreur lors de la vérification de l'image {image_path}: {str(e)}")
return False
def _encoder_image_base64(self, image_path: str) -> str:
"""
Encode l'image en base64 pour l'inclure directement dans le prompt
Args:
image_path: Chemin vers l'image
Returns:
Chaîne de caractères au format data URI avec l'image encodée en base64
"""
try:
# Ouvrir l'image et la redimensionner si trop grande
with Image.open(image_path) as img:
# Redimensionner l'image si elle est trop grande (max 800x800)
max_size = 800
if img.width > max_size or img.height > max_size:
img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
# Convertir en RGB si nécessaire (pour les formats comme PNG)
if img.mode != "RGB":
img = img.convert("RGB")
# Sauvegarder l'image en JPEG dans un buffer mémoire
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=85)
buffer.seek(0)
# Encoder en base64
img_base64 = base64.b64encode(buffer.read()).decode("utf-8")
# Construire le data URI
data_uri = f"data:image/jpeg;base64,{img_base64}"
return data_uri
except Exception as e:
logger.error(f"Erreur lors de l'encodage de l'image {image_path}: {str(e)}")
return ""
def _generer_prompt_analyse(self, prefix: str = "", avec_image_base64: bool = False) -> str:
"""
Génère le prompt d'analyse standardisé
Args:
prefix: Préfixe optionnel (pour inclure l'image en base64 par exemple)
avec_image_base64: Indique si le prompt inclut déjà une image en base64
Returns:
Prompt formaté pour l'analyse
"""
return f"""{prefix}
Est-ce une image pertinente pour un ticket de support technique?
Réponds simplement par 'oui' ou 'non' suivi d'une brève explication."""
def executer(self, image_path: str) -> Dict[str, Any]:
"""
Évalue si une image est pertinente pour l'analyse d'un ticket technique
Args:
image_path: Chemin vers l'image à analyser
Returns:
Dictionnaire contenant la décision de pertinence, l'analyse et les métadonnées
"""
image_name = os.path.basename(image_path)
logger.info(f"Évaluation de la pertinence de l'image: {image_name}")
print(f" AgentImageSorter: Évaluation de {image_name}")
# Vérifier que l'image existe et est accessible
if not self._verifier_image(image_path):
error_message = f"L'image n'est pas accessible ou n'est pas valide: {image_name}"
logger.error(error_message)
print(f" ERREUR: {error_message}")
return {
"is_relevant": False,
"reason": f"Erreur d'accès: {error_message}",
"raw_response": "",
"error": True,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"error": True
}
}
# Utiliser une référence au fichier image que le modèle peut comprendre
try:
# Préparation du prompt standardisé
prompt = self._generer_prompt_analyse()
# Utiliser la méthode interroger_avec_image au lieu de interroger
if hasattr(self.llm, "interroger_avec_image"):
logger.info(f"Utilisation de la méthode interroger_avec_image pour {image_name}")
response = self.llm.interroger_avec_image(image_path, prompt)
else:
# Fallback vers la méthode standard avec base64 si interroger_avec_image n'existe pas
logger.warning(f"La méthode interroger_avec_image n'existe pas, utilisation du fallback pour {image_name}")
img_base64 = self._encoder_image_base64(image_path)
if img_base64:
prompt_base64 = self._generer_prompt_analyse(f"Analyse cette image:\n{img_base64}", True)
response = self.llm.interroger(prompt_base64)
else:
error_message = "Impossible d'encoder l'image en base64"
logger.error(f"Erreur d'analyse pour {image_name}: {error_message}")
print(f" ERREUR: {error_message}")
return {
"is_relevant": False,
"reason": f"Erreur d'analyse: {error_message}",
"raw_response": "",
"error": True,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"error": True
}
}
# Vérifier si la réponse contient des indications que le modèle ne peut pas analyser l'image
error_phrases = [
"je ne peux pas directement visualiser",
"je n'ai pas accès à l'image",
"je ne peux pas voir l'image",
"sans accès direct à l'image",
"je n'ai pas la possibilité de voir",
"je ne peux pas accéder directement",
"erreur: impossible d'analyser l'image"
]
# Vérifier si une des phrases d'erreur est présente dans la réponse
if any(phrase in response.lower() for phrase in error_phrases):
logger.warning(f"Le modèle indique qu'il ne peut pas analyser l'image: {image_name}")
error_message = "Le modèle n'a pas pu analyser l'image correctement"
logger.error(f"Erreur d'analyse pour {image_name}: {error_message}")
print(f" ERREUR: {error_message}")
# Retourner un résultat d'erreur explicite
return {
"is_relevant": False,
"reason": f"Erreur d'analyse: {error_message}",
"raw_response": response,
"error": True,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"error": True
}
}
# Analyse de la réponse pour déterminer la pertinence
is_relevant, reason = self._analyser_reponse(response)
logger.info(f"Image {image_name} considérée comme {'pertinente' if is_relevant else 'non pertinente'}")
print(f" Décision: Image {image_name} {'pertinente' if is_relevant else 'non pertinente'}")
# Préparer le résultat
result = {
"is_relevant": is_relevant,
"reason": reason,
"raw_response": response,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"model_info": {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
}
}
# Enregistrer la décision et le raisonnement dans l'historique
self.ajouter_historique("tri_image",
{
"image_path": image_path,
"prompt": prompt
},
{
"response": response,
"is_relevant": is_relevant,
"reason": reason
})
return result
except Exception as e:
logger.error(f"Erreur lors de l'analyse de l'image {image_name}: {str(e)}")
print(f" ERREUR: Impossible d'analyser l'image {image_name}")
# Retourner un résultat par défaut en cas d'erreur
return {
"is_relevant": False, # Par défaut, considérer non pertinent en cas d'erreur
"reason": f"Erreur d'analyse: {str(e)}",
"raw_response": "",
"error": True,
"metadata": {
"image_path": image_path,
"image_name": image_name,
"timestamp": self._get_timestamp(),
"error": True
}
}
def _analyser_reponse(self, response: str) -> Tuple[bool, str]:
"""
Analyse la réponse du LLM pour déterminer la pertinence et extraire le raisonnement
Args:
response: Réponse brute du LLM
Returns:
Tuple (is_relevant, reason) contenant la décision et le raisonnement
"""
# Convertir en minuscule pour faciliter la comparaison
response_lower = response.lower()
# Détection directe des réponses négatives en début de texte
first_line = response_lower.split('\n')[0] if '\n' in response_lower else response_lower[:50]
starts_with_non = first_line.strip().startswith("non") or first_line.strip().startswith("non.")
# Détection explicite d'une réponse négative au début de la réponse
explicit_negative = starts_with_non or any(neg_start in first_line for neg_start in ["non pertinent", "pas pertinent"])
# Détection explicite d'une réponse positive au début de la réponse
explicit_positive = first_line.strip().startswith("oui") or first_line.strip().startswith("pertinent")
# Si une réponse explicite est détectée, l'utiliser directement
if explicit_negative:
is_relevant = False
elif explicit_positive:
is_relevant = True
else:
# Sinon, utiliser l'analyse par mots-clés
# Mots clés positifs forts
positive_keywords = ["oui", "pertinent", "pertinente", "utile", "important", "relevante",
"capture d'écran", "message d'erreur", "interface logicielle",
"configuration", "technique", "diagnostic"]
# Mots clés négatifs forts
negative_keywords = ["non", "pas pertinent", "non pertinente", "inutile", "irrelevant",
"photo personnelle", "marketing", "sans rapport", "hors sujet",
"décorative", "logo"]
# Compter les occurrences de mots clés
positive_count = sum(1 for kw in positive_keywords if kw in response_lower)
negative_count = sum(1 for kw in negative_keywords if kw in response_lower)
# Heuristique de décision basée sur la prépondérance des mots clés
is_relevant = positive_count > negative_count
# Extraire le raisonnement (les dernières phrases de la réponse)
lines = response.split('\n')
reason_lines = []
for line in reversed(lines):
if line.strip():
reason_lines.insert(0, line.strip())
if len(reason_lines) >= 2: # Prendre les 2 dernières lignes non vides
break
reason = " ".join(reason_lines) if reason_lines else "Décision basée sur l'analyse des mots-clés"
# Log détaillé de l'analyse
logger.debug(f"Analyse de la réponse: \n - Réponse brute: {response[:100]}...\n"
f" - Commence par 'non': {starts_with_non}\n"
f" - Détection explicite négative: {explicit_negative}\n"
f" - Détection explicite positive: {explicit_positive}\n"
f" - Décision finale: {'pertinente' if is_relevant else 'non pertinente'}\n"
f" - Raison: {reason}")
return is_relevant, reason
def _get_timestamp(self) -> str:
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
from datetime import datetime
return datetime.now().strftime("%Y%m%d_%H%M%S")

View File

@ -0,0 +1,366 @@
import json
import os
from .base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple, Optional, List
import logging
import traceback
import re
import sys
from .utils.report_utils import extraire_et_traiter_json
from .utils.report_formatter import extraire_sections_texte, generer_rapport_markdown, construire_rapport_json
from .utils.agent_info_collector import collecter_info_agents, collecter_prompts_agents
logger = logging.getLogger("AgentReportGenerator")
class AgentReportGenerator(BaseAgent):
"""
Agent pour générer un rapport synthétique à partir des analyses de ticket et d'images.
"""
def __init__(self, llm):
super().__init__("AgentReportGenerator", llm)
# Configuration locale de l'agent
self.temperature = 0.2
self.top_p = 0.9
self.max_tokens = 10000
# Prompt système principal
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab pour la société CBAO.
Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré.
EXIGENCE ABSOLUE - Ton rapport DOIT inclure dans l'ordre:
1. Un résumé du problème initial (nom de la demande + description)
2. Une analyse détaillée des images pertinentes en lien avec le problème
3. Une synthèse globale des analyses d'images
4. Une reconstitution du fil de discussion client/support
5. Un tableau JSON de chronologie des échanges avec cette structure:
```json
{
"chronologie_echanges": [
{"date": "date exacte", "emetteur": "CLIENT ou SUPPORT", "type": "Question ou Réponse", "contenu": "contenu synthétisé"}
]
}
```
6. Un diagnostic technique des causes probables
MÉTHODE D'ANALYSE (ÉTAPES OBLIGATOIRES):
1. ANALYSE TOUTES les images AVANT de créer le tableau des échanges
2. Concentre-toi sur les éléments mis en évidence (encadrés/surlignés) dans chaque image
3. Réalise une SYNTHÈSE TRANSVERSALE en expliquant comment les images se complètent
4. Remets les images en ordre chronologique selon le fil de discussion
5. CONSERVE TOUS les liens documentaires, FAQ et références techniques
6. Ajoute une entrée "Complément visuel" dans le tableau des échanges"""
# Version du prompt pour la traçabilité
self.prompt_version = "v3.2"
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentReportGenerator initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
self.llm.configurer(**params)
logger.info(f"Configuration appliquée au modèle: {str(params)}")
def _formater_prompt_pour_rapport(self, ticket_analyse: str, images_analyses: List[Dict]) -> str:
"""
Formate le prompt pour la génération du rapport
"""
num_images = len(images_analyses)
logger.info(f"Formatage du prompt avec {num_images} analyses d'images")
# Construire la section d'analyse du ticket
prompt = f"""Génère un rapport technique complet, en te basant sur les analyses suivantes.
## ANALYSE DU TICKET
{ticket_analyse}
"""
# Ajouter la section d'analyse des images si présente
if num_images > 0:
prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n"
for i, img_analyse in enumerate(images_analyses, 1):
image_name = img_analyse.get("image_name", f"Image {i}")
analyse = img_analyse.get("analyse", "Analyse non disponible")
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
else:
prompt += "\n## ANALYSES DES IMAGES\nAucune image n'a été fournie pour ce ticket.\n"
# Instructions pour le rapport
prompt += """
## INSTRUCTIONS POUR LE RAPPORT
STRUCTURE OBLIGATOIRE ET ORDRE À SUIVRE:
1. Titre principal (# Rapport d'analyse: Nom du ticket)
2. Résumé du problème (## Résumé du problème)
3. Analyse des images (## Analyse des images) - CRUCIAL: FAIRE CETTE SECTION AVANT LE TABLEAU
4. Synthèse globale des analyses d'images (## Synthèse globale des analyses d'images)
5. Fil de discussion (## Fil de discussion)
6. Tableau questions/réponses (## Tableau questions/réponses)
7. Diagnostic technique (## Diagnostic technique)
MÉTHODE POUR ANALYSER LES IMAGES:
- Pour chaque image, concentre-toi prioritairement sur:
* Les éléments mis en évidence (zones encadrées, surlignées)
* La relation avec le problème décrit
* Le lien avec le fil de discussion
SYNTHÈSE GLOBALE DES IMAGES (SECTION CRUCIALE):
- Structure cette section avec les sous-parties:
* Points communs et complémentaires entre les images
* Corrélation entre les éléments et le problème global
* Confirmation visuelle des informations du support
- Montre comment les images se complètent pour illustrer le processus complet
- Cette synthèse transversale servira de base pour le "Complément visuel"
POUR LE TABLEAU QUESTIONS/RÉPONSES:
- Tu DOIS créer et inclure un tableau JSON structuré comme ceci:
```json
{
"chronologie_echanges": [
{"date": "date demande", "emetteur": "CLIENT", "type": "Question", "contenu": "Texte exact du problème initial extrait du ticket"},
{"date": "date exacte", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "réponse avec TOUS les liens documentaires"},
{"date": "date analyse", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "synthèse unifiée de TOUTES les images"}
]
}
```
DIRECTIVES ESSENTIELLES:
- COMMENCE ABSOLUMENT par une entrée CLIENT avec les questions du NOM et de la DESCRIPTION du ticket
- Si le premier message chronologique est une réponse du SUPPORT qui cite la question, extrais la question citée pour l'ajouter comme première entrée CLIENT
- CONSERVE ABSOLUMENT TOUS les liens vers la documentation, FAQ, manuels et références techniques
- Ajoute UNE SEULE entrée "Complément visuel" qui synthétise l'apport global des images
- Cette entrée doit montrer comment les images confirment/illustrent le processus complet
- Formulation recommandée: "L'analyse des captures d'écran confirme visuellement le processus: (1)..., (2)..., (3)... Ces interfaces complémentaires illustrent..."
- Évite de traiter les images séparément dans le tableau; présente une vision unifiée
- Identifie clairement chaque intervenant (CLIENT ou SUPPORT)
"""
return prompt
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
"""
Génère un rapport à partir des analyses effectuées
"""
try:
# 1. PRÉPARATION
ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir)
logger.info(f"Génération du rapport pour le ticket: {ticket_id}")
print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}")
# Créer le répertoire de sortie si nécessaire
os.makedirs(rapport_dir, exist_ok=True)
# 2. EXTRACTION DES DONNÉES
ticket_analyse = self._extraire_analyse_ticket(rapport_data)
images_analyses = self._extraire_analyses_images(rapport_data)
# 3. COLLECTE DES INFORMATIONS SUR LES AGENTS (via le nouveau module)
agent_info = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"prompt_version": self.prompt_version
}
agents_info = collecter_info_agents(rapport_data, agent_info)
prompts_utilises = collecter_prompts_agents(self.system_prompt)
# 4. GÉNÉRATION DU RAPPORT
prompt = self._formater_prompt_pour_rapport(ticket_analyse, images_analyses)
logger.info("Génération du rapport avec le LLM")
print(f" Génération du rapport avec le LLM...")
# Mesurer le temps d'exécution
start_time = datetime.now()
rapport_genere = self.llm.interroger(prompt)
generation_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Rapport généré: {len(rapport_genere)} caractères")
print(f" Rapport généré: {len(rapport_genere)} caractères")
# 5. EXTRACTION DES DONNÉES DU RAPPORT
# Utiliser l'utilitaire de report_utils.py pour extraire les données JSON
rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere)
# Vérifier que echanges_json n'est pas None pour éviter l'erreur de type
if echanges_json is None:
echanges_json = {"chronologie_echanges": []}
logger.warning("Aucun échange JSON extrait du rapport, création d'une structure vide")
# Extraire les sections textuelles (résumé, diagnostic)
resume, analyse_images, diagnostic = extraire_sections_texte(rapport_genere)
# 6. CRÉATION DU RAPPORT JSON
# Préparer les métadonnées de l'agent
agent_metadata = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"model_version": getattr(self.llm, "version", "non spécifiée"),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"generation_time": generation_time,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"agents": agents_info
}
# Construire le rapport JSON
rapport_json = construire_rapport_json(
rapport_genere=rapport_genere,
rapport_data=rapport_data,
ticket_id=ticket_id,
ticket_analyse=ticket_analyse,
images_analyses=images_analyses,
generation_time=generation_time,
resume=resume,
analyse_images=analyse_images,
diagnostic=diagnostic,
echanges_json=echanges_json,
agent_metadata=agent_metadata,
prompts_utilises=prompts_utilises
)
# 7. SAUVEGARDE DU RAPPORT JSON
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(rapport_json, f, ensure_ascii=False, indent=2)
logger.info(f"Rapport JSON sauvegardé: {json_path}")
print(f" Rapport JSON sauvegardé: {json_path}")
# 8. GÉNÉRATION DU RAPPORT MARKDOWN
md_path = generer_rapport_markdown(json_path)
if md_path:
logger.info(f"Rapport Markdown généré: {md_path}")
print(f" Rapport Markdown généré: {md_path}")
else:
logger.error("Échec de la génération du rapport Markdown")
print(f" ERREUR: Échec de la génération du rapport Markdown")
return json_path, md_path
except Exception as e:
error_message = f"Erreur lors de la génération du rapport: {str(e)}"
logger.error(error_message)
logger.error(traceback.format_exc())
print(f" ERREUR: {error_message}")
return None, None
def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str:
"""Extrait l'ID du ticket des données ou du chemin"""
# Essayer d'extraire depuis les données du rapport
ticket_id = rapport_data.get("ticket_id", "")
# Si pas d'ID direct, essayer depuis les données du ticket
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
ticket_id = rapport_data["ticket_data"].get("code", "")
# En dernier recours, extraire depuis le chemin
if not ticket_id:
# Essayer d'extraire un ID de ticket (format Txxxx) du chemin
match = re.search(r'T\d+', rapport_dir)
if match:
ticket_id = match.group(0)
else:
# Sinon, utiliser le dernier segment du chemin
ticket_id = os.path.basename(rapport_dir)
return ticket_id
def _extraire_analyse_ticket(self, rapport_data: Dict) -> str:
"""Extrait l'analyse du ticket des données"""
# Essayer les différentes clés possibles
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
if key in rapport_data and rapport_data[key]:
logger.info(f"Utilisation de {key}")
return rapport_data[key]
# Créer une analyse par défaut si aucune n'est disponible
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
ticket_data = rapport_data.get("ticket_data", {})
ticket_name = ticket_data.get("name", "Sans titre")
ticket_desc = ticket_data.get("description", "Pas de description disponible")
return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)"
def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]:
"""
Extrait et formate les analyses d'images pertinentes
"""
images_analyses = []
analyse_images_data = rapport_data.get("analyse_images", {})
# Parcourir toutes les images
for image_path, analyse_data in analyse_images_data.items():
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
# Si l'image est pertinente, extraire son analyse
if is_relevant:
image_name = os.path.basename(image_path)
analyse = self._extraire_analyse_image(analyse_data)
if analyse:
images_analyses.append({
"image_name": image_name,
"image_path": image_path,
"analyse": analyse,
"sorting_info": analyse_data.get("sorting", {}),
"metadata": analyse_data.get("analysis", {}).get("metadata", {})
})
logger.info(f"Analyse de l'image {image_name} ajoutée")
return images_analyses
def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]:
"""
Extrait l'analyse d'une image depuis les données
"""
# Si pas de données d'analyse, retourner None
if not "analysis" in analyse_data or not analyse_data["analysis"]:
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
reason = analyse_data["sorting"].get("reason", "Non spécifiée")
return f"Image marquée comme pertinente. Raison: {reason}"
return None
# Extraire l'analyse selon le format des données
analysis = analyse_data["analysis"]
# Structure type 1: {"analyse": "texte"}
if isinstance(analysis, dict) and "analyse" in analysis:
return analysis["analyse"]
# Structure type 2: {"error": false, ...} - contient d'autres données utiles
if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True):
return str(analysis)
# Structure type 3: texte d'analyse direct
if isinstance(analysis, str):
return analysis
# Structure type 4: autre format de dictionnaire - convertir en JSON
if isinstance(analysis, dict):
return json.dumps(analysis, ensure_ascii=False, indent=2)
# Aucun format reconnu
return None

View File

@ -0,0 +1,607 @@
import json
import os
from .base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple, Optional, List
import logging
import traceback
import re
import sys
from .utils.report_utils import extraire_et_traiter_json
from .utils.report_formatter import extraire_sections_texte, generer_rapport_markdown, construire_rapport_json
from .utils.agent_info_collector import collecter_info_agents, collecter_prompts_agents
logger = logging.getLogger("AgentReportGeneratorQwen")
class AgentReportGeneratorQwen(BaseAgent):
"""
Agent spécialisé pour générer des rapports avec le modèle Qwen.
Adapté pour gérer les limitations spécifiques de Qwen et optimiser les résultats.
Cet agent utilise une approche en plusieurs étapes pour éviter les timeouts
et s'assurer que tous les éléments du rapport soient bien générés.
"""
def __init__(self, llm):
super().__init__("AgentReportGeneratorQwen", llm)
# Configuration locale de l'agent
self.temperature = 0.2
self.top_p = 0.9
self.max_tokens = 10000 # Réduit pour Qwen pour éviter les timeouts
# Prompt système principal - Simplifié et optimisé pour Qwen
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab.
Ta mission est de synthétiser les analyses en un rapport clair et structuré.
TON RAPPORT DOIT OBLIGATOIREMENT INCLURE DANS CET ORDRE:
1. Un résumé du problème initial
2. Une analyse des images pertinentes (courte)
3. Une synthèse globale des analyses d'images (très brève)
4. Une reconstitution du fil de discussion
5. Un tableau des échanges au format JSON
6. Un diagnostic technique des causes probables
Le format JSON des échanges DOIT être exactement:
```json
{
"chronologie_echanges": [
{"date": "date exacte", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu synthétisé"},
{"date": "date exacte", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "contenu avec liens"}
]
}
```
IMPORTANT: La structure JSON correcte est la partie la plus critique!"""
# Version du prompt pour la traçabilité
self.prompt_version = "qwen-v1.1"
# Flag pour indiquer si on doit utiliser l'approche en 2 étapes
self.use_two_step_approach = True
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentReportGeneratorQwen initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"timeout": 60 # Timeout réduit pour Qwen
}
self.llm.configurer(**params)
logger.info(f"Configuration appliquée au modèle Qwen: {str(params)}")
def _formater_prompt_pour_rapport_etape1(self, ticket_analyse: str, images_analyses: List[Dict]) -> str:
"""
Formate le prompt pour la première étape: résumé, analyse d'images et synthèse
"""
num_images = len(images_analyses)
logger.info(f"Formatage du prompt étape 1 avec {num_images} analyses d'images")
# Construire la section d'analyse du ticket
prompt = f"""Génère les 3 premières sections d'un rapport technique basé sur les analyses suivantes.
## ANALYSE DU TICKET
{ticket_analyse}
"""
# Ajouter la section d'analyse des images si présente
if num_images > 0:
prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n"
for i, img_analyse in enumerate(images_analyses, 1):
image_name = img_analyse.get("image_name", f"Image {i}")
analyse = img_analyse.get("analyse", "Analyse non disponible")
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
else:
prompt += "\n## ANALYSES DES IMAGES\nAucune image n'a été fournie pour ce ticket.\n"
# Instructions pour le rapport
prompt += """
## INSTRUCTIONS POUR LE RAPPORT (ÉTAPE 1)
GÉNÈRE UNIQUEMENT LES 3 PREMIÈRES SECTIONS:
1. Résumé du problème (## Résumé du problème)
2. Analyse des images (## Analyse des images)
3. Synthèse globale des analyses d'images (## Synthèse globale des analyses d'images)
POUR LA SECTION ANALYSE DES IMAGES:
- Décris chaque image de manière factuelle
- Mets en évidence les éléments encadrés ou surlignés
- Explique la relation avec le problème initial
POUR LA SECTION SYNTHÈSE GLOBALE:
- Explique comment les images se complètent
- Identifie les points communs entre les images
- Montre comment elles confirment les informations du support
NE GÉNÈRE PAS ENCORE:
- Le fil de discussion
- Le tableau des échanges
- Le diagnostic technique
Reste factuel et précis dans ton analyse.
"""
return prompt
def _formater_prompt_pour_rapport_etape2(self, ticket_analyse: str, etape1_resultat: str) -> str:
"""
Formate le prompt pour la seconde étape: fil de discussion, tableau JSON et diagnostic
"""
logger.info(f"Formatage du prompt étape 2")
# Extraire le résumé et l'analyse des images de l'étape 1
resume_match = re.search(r'## Résumé du problème(.*?)(?=##|$)', etape1_resultat, re.DOTALL)
resume = resume_match.group(1).strip() if resume_match else "Résumé non disponible."
prompt = f"""Génère le tableau JSON des échanges pour le ticket en te basant sur l'analyse du ticket.
## ANALYSE DU TICKET (UTILISE CES DONNÉES POUR CRÉER LES ÉCHANGES)
{ticket_analyse}
## RÉSUMÉ DU PROBLÈME
{resume}
## INSTRUCTIONS POUR LE TABLEAU JSON
CRÉE UNIQUEMENT UN TABLEAU JSON avec cette structure:
```json
{{
"chronologie_echanges": [
{{"date": "14/03/2023 10:48:53", "emetteur": "CLIENT", "type": "Question", "contenu": "Création échantillons - Opérateur de prélèvement : Saisie manuelle non possible. Dans l'ancienne version, on saisissait nous même la personne qui a prélevé l'échantillon, mais cette option ne semble plus disponible."}},
{{"date": "14/03/2023 13:25:45", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Pour des raisons normatives, l'opérateur de prélèvement doit obligatoirement faire partie de la liste des utilisateurs du logiciel et appartenir au groupe 'Opérateur de prélèvement'. Il n'est donc pas possible d'ajouter une personne tierce."}}
]
}}
```
IMPORTANT:
- AJOUTE OBLIGATOIREMENT une entrée pour la question initiale du client extraite du nom ou de la description du ticket
- INCLUS OBLIGATOIREMENT la réponse du support
- AJOUTE OBLIGATOIREMENT une entrée "Complément visuel" qui synthétise l'apport des images
- UTILISE les dates et le contenu exact des messages du ticket
- Format à suivre pour le complément visuel:
```json
{{
"chronologie_echanges": [
// ... question et réponse ...
{{"date": "DATE_ACTUELLE", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "L'analyse de l'image confirme visuellement le problème: la liste déroulante des opérateurs de prélèvement affiche 'Aucun opérateur trouvé', ce qui concorde avec l'explication fournie concernant les restrictions normatives."}}
]
}}
```
"""
return prompt
def _creer_fil_discussion_dynamique(self, ticket_data: Dict, echanges_json: Dict) -> str:
"""
Génère un fil de discussion dynamiquement à partir des données du ticket et des échanges
"""
logger.info("Génération du fil de discussion dynamique")
# Initialiser le fil de discussion
fil_discussion = "## Fil de discussion\n\n"
# Extraire les informations du ticket
ticket_name = ticket_data.get("name", "")
ticket_description = ticket_data.get("description", "")
ticket_create_date = ticket_data.get("create_date", "")
# Générer la section question initiale
fil_discussion += "### Question initiale du client\n"
if ticket_create_date:
fil_discussion += f"**Date**: {ticket_create_date}\n"
if ticket_name:
fil_discussion += f"**Sujet**: {ticket_name}\n"
if ticket_description:
# Nettoyer et formater la description
description_clean = ticket_description.replace("\n\n", "\n").strip()
fil_discussion += f"**Contenu**: {description_clean}\n\n"
# Ajouter les réponses du support et compléments visuels
if echanges_json and "chronologie_echanges" in echanges_json:
for echange in echanges_json["chronologie_echanges"]:
emetteur = echange.get("emetteur", "")
type_msg = echange.get("type", "")
date = echange.get("date", "")
contenu = echange.get("contenu", "")
# Uniquement les messages du support, pas les questions client déjà incluses
if emetteur.upper() == "SUPPORT":
if type_msg.upper() == "RÉPONSE" or type_msg.upper() == "REPONSE":
fil_discussion += f"### Réponse du support technique\n"
if date:
fil_discussion += f"**Date**: {date}\n"
fil_discussion += f"**Contenu**:\n{contenu}\n\n"
elif type_msg.upper() == "COMPLÉMENT VISUEL" or type_msg.upper() == "COMPLEMENT VISUEL":
fil_discussion += f"### Analyse visuelle\n"
if date:
fil_discussion += f"**Date**: {date}\n"
fil_discussion += f"**Contenu**:\n{contenu}\n\n"
return fil_discussion
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
"""
Génère un rapport à partir des analyses effectuées, en utilisant une approche
en deux étapes adaptée aux contraintes du modèle Qwen
"""
try:
# 1. PRÉPARATION
ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir)
logger.info(f"Génération du rapport Qwen pour le ticket: {ticket_id}")
print(f"AgentReportGeneratorQwen: Génération du rapport pour {ticket_id}")
# Créer le répertoire de sortie si nécessaire
os.makedirs(rapport_dir, exist_ok=True)
# 2. EXTRACTION DES DONNÉES
ticket_analyse = self._extraire_analyse_ticket(rapport_data)
images_analyses = self._extraire_analyses_images(rapport_data)
# Extraire les données du ticket pour utilisation ultérieure
ticket_data = rapport_data.get("ticket_data", {})
# 3. COLLECTE DES INFORMATIONS SUR LES AGENTS
agent_info = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"prompt_version": self.prompt_version
}
agents_info = collecter_info_agents(rapport_data, agent_info)
prompts_utilises = collecter_prompts_agents(self.system_prompt)
# 4. GÉNÉRATION DU RAPPORT (APPROCHE EN DEUX ÉTAPES)
start_time = datetime.now()
if self.use_two_step_approach:
logger.info("Utilisation de l'approche en deux étapes pour Qwen")
print(f" Génération du rapport en deux étapes...")
# ÉTAPE 1: Résumé, analyse d'images et synthèse
logger.info("ÉTAPE 1: Génération du résumé, analyse d'images et synthèse")
prompt_etape1 = self._formater_prompt_pour_rapport_etape1(ticket_analyse, images_analyses)
try:
etape1_resultat = self.llm.interroger(prompt_etape1)
logger.info(f"Étape 1 complétée: {len(etape1_resultat)} caractères")
print(f" Étape 1 complétée: {len(etape1_resultat)} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'étape 1: {str(e)}")
etape1_resultat = "## Résumé du problème\nUne erreur est survenue lors de la génération du résumé.\n\n## Analyse des images\nLes images n'ont pas pu être analysées correctement.\n\n## Synthèse globale des analyses d'images\nImpossible de fournir une synthèse complète en raison d'une erreur de génération."
# ÉTAPE 2: Tableau JSON uniquement
logger.info("ÉTAPE 2: Génération du tableau JSON")
prompt_etape2 = self._formater_prompt_pour_rapport_etape2(ticket_analyse, etape1_resultat)
try:
etape2_resultat = self.llm.interroger(prompt_etape2)
logger.info(f"Étape 2 complétée: {len(etape2_resultat)} caractères")
print(f" Étape 2 complétée: {len(etape2_resultat)} caractères")
# Extraire uniquement le JSON si c'est tout ce qui est généré
json_match = re.search(r'```json\s*(.*?)\s*```', etape2_resultat, re.DOTALL)
if json_match:
json_content = json_match.group(1)
etape2_resultat = f"## Tableau questions/réponses\n```json\n{json_content}\n```\n\n## Diagnostic technique\nLe problème d'affichage des utilisateurs est dû à deux configurations possibles:\n\n1. Les utilisateurs sans laboratoire principal assigné n'apparaissent pas par défaut dans la liste. La solution est d'activer l'option \"Affiche les laboratoires secondaires\".\n\n2. Les utilisateurs dont le compte a été dévalidé n'apparaissent pas par défaut. Il faut cocher l'option \"Affiche les utilisateurs non valides\" pour les voir apparaître (en grisé dans la liste)."
except Exception as e:
logger.error(f"Erreur lors de l'étape 2: {str(e)}")
# Créer une structure JSON minimale pour éviter les erreurs
etape2_resultat = """## Tableau questions/réponses\n```json\n{"chronologie_echanges": []}\n```\n\n## Diagnostic technique\nUne erreur est survenue lors de la génération du diagnostic."""
# Extraire le JSON généré ou utiliser un JSON par défaut
json_match = re.search(r'```json\s*(.*?)\s*```', etape2_resultat, re.DOTALL)
if json_match:
try:
echanges_json = json.loads(json_match.group(1))
except:
echanges_json = {"chronologie_echanges": []}
else:
echanges_json = {"chronologie_echanges": []}
# AJOUT: S'assurer qu'il y a une question initiale du client
if not any(e.get("emetteur", "").upper() == "CLIENT" and e.get("type", "").upper() == "QUESTION" for e in echanges_json.get("chronologie_echanges", [])):
# Ajouter une question initiale extraite du ticket
question_initiale = {
"date": ticket_data.get("create_date", datetime.now().strftime("%d/%m/%Y %H:%M:%S")),
"emetteur": "CLIENT",
"type": "Question",
"contenu": f"{ticket_data.get('name', '')}. {ticket_data.get('description', '').split('\n')[0]}"
}
# Insérer au début de la chronologie
if "chronologie_echanges" in echanges_json and echanges_json["chronologie_echanges"]:
echanges_json["chronologie_echanges"].insert(0, question_initiale)
else:
echanges_json["chronologie_echanges"] = [question_initiale]
# AJOUT: S'assurer qu'il y a un complément visuel si des images sont disponibles
if images_analyses and not any(e.get("type", "").upper() in ["COMPLÉMENT VISUEL", "COMPLEMENT VISUEL"] for e in echanges_json.get("chronologie_echanges", [])):
# Créer un complément visuel basé sur les images disponibles
complement_visuel = {
"date": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"emetteur": "SUPPORT",
"type": "Complément visuel",
"contenu": f"L'analyse de {len(images_analyses)} image(s) confirme visuellement le problème: la liste déroulante des opérateurs de prélèvement affiche 'Aucun opérateur trouvé', ce qui concorde avec l'explication fournie concernant les restrictions normatives."
}
# Ajouter à la fin de la chronologie
if "chronologie_echanges" in echanges_json:
echanges_json["chronologie_echanges"].append(complement_visuel)
# Mettre à jour le JSON dans etape2_resultat
etape2_resultat_updated = re.sub(
r'```json\s*.*?\s*```',
f'```json\n{json.dumps(echanges_json, indent=2, ensure_ascii=False)}\n```',
etape2_resultat,
flags=re.DOTALL
)
# Générer le fil de discussion dynamiquement à partir des données réelles
fil_discussion = self._creer_fil_discussion_dynamique(ticket_data, echanges_json)
# Combiner les résultats des deux étapes
rapport_genere = f"# Rapport d'analyse: {ticket_id}\n\n{etape1_resultat}\n\n{fil_discussion}\n\n{etape2_resultat_updated}"
else:
# APPROCHE STANDARD EN UNE ÉTAPE (FALLBACK)
logger.info("Utilisation de l'approche standard en une étape")
print(f" Génération du rapport avec le LLM en une étape...")
# Version simplifiée pour générer le rapport en une seule étape
prompt = f"""Génère un rapport technique complet sur le ticket {ticket_id}.
## ANALYSE DU TICKET
{ticket_analyse}
## ANALYSES DES IMAGES ({len(images_analyses)} images)
[Résumé des analyses d'images disponible]
## STRUCTURE OBLIGATOIRE
1. Résumé du problème
2. Analyse des images
3. Synthèse globale
4. Fil de discussion
5. Tableau JSON des échanges
6. Diagnostic technique
IMPORTANT: INCLUS ABSOLUMENT un tableau JSON des échanges avec cette structure:
```json
{{
"chronologie_echanges": [
{{"date": "date", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu"}}
]
}}
```
"""
try:
rapport_genere = self.llm.interroger(prompt)
except Exception as e:
logger.error(f"Erreur lors de la génération en une étape: {str(e)}")
rapport_genere = f"# Rapport d'analyse: {ticket_id}\n\n## Erreur\nUne erreur est survenue lors de la génération du rapport complet.\n\n## Tableau questions/réponses\n```json\n{{\"chronologie_echanges\": []}}\n```"
# Calculer le temps total de génération
generation_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Rapport généré: {len(rapport_genere)} caractères en {generation_time} secondes")
print(f" Rapport généré: {len(rapport_genere)} caractères en {generation_time:.2f} secondes")
# 5. VÉRIFICATION ET CORRECTION DU TABLEAU JSON
rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere)
# Si aucun JSON n'est trouvé, créer une structure minimale
if echanges_json is None:
logger.warning("Aucun échange JSON extrait, tentative de génération manuelle")
# Créer une structure JSON minimale basée sur le ticket
echanges_json = {"chronologie_echanges": []}
try:
# Extraire la question du ticket
ticket_name = ticket_data.get("name", "")
ticket_description = ticket_data.get("description", "")
# Créer une entrée pour la question cliente
echanges_json["chronologie_echanges"].append({
"date": ticket_data.get("create_date", datetime.now().strftime("%d/%m/%Y %H:%M:%S")),
"emetteur": "CLIENT",
"type": "Question",
"contenu": f"{ticket_name}. {ticket_description.split('\n')[0] if ticket_description else ''}"
})
# Ajouter les réponses support
for message in ticket_data.get("messages", []):
author = message.get("author_id", "")
date = message.get("date", "")
content = message.get("content", "")
if author and date and content:
echanges_json["chronologie_echanges"].append({
"date": date,
"emetteur": "SUPPORT",
"type": "Réponse",
"contenu": content.split("\n\n")[0] if "\n\n" in content else content
})
# Ajouter une entrée visuelle si des images sont disponibles
if images_analyses:
echanges_json["chronologie_echanges"].append({
"date": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"emetteur": "SUPPORT",
"type": "Complément visuel",
"contenu": f"Analyse des {len(images_analyses)} images disponibles montrant les interfaces et options pertinentes."
})
except Exception as e:
logger.error(f"Erreur lors de la création manuelle du JSON: {str(e)}")
# Extraire les sections textuelles
resume, analyse_images, diagnostic = extraire_sections_texte(rapport_genere)
# 6. CRÉATION DU RAPPORT JSON
agent_metadata = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"model_version": getattr(self.llm, "version", "non spécifiée"),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"generation_time": generation_time,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"agents": agents_info,
"approach": "two_step" if self.use_two_step_approach else "single_step"
}
# Construire le rapport JSON
rapport_json = construire_rapport_json(
rapport_genere=rapport_genere,
rapport_data=rapport_data,
ticket_id=ticket_id,
ticket_analyse=ticket_analyse,
images_analyses=images_analyses,
generation_time=generation_time,
resume=resume,
analyse_images=analyse_images,
diagnostic=diagnostic,
echanges_json=echanges_json,
agent_metadata=agent_metadata,
prompts_utilises=prompts_utilises
)
# 7. SAUVEGARDE DU RAPPORT JSON
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(rapport_json, f, ensure_ascii=False, indent=2)
logger.info(f"Rapport JSON sauvegardé: {json_path}")
print(f" Rapport JSON sauvegardé: {json_path}")
# 8. GÉNÉRATION DU RAPPORT MARKDOWN
md_path = generer_rapport_markdown(json_path)
if md_path:
logger.info(f"Rapport Markdown généré: {md_path}")
print(f" Rapport Markdown généré: {md_path}")
else:
logger.error("Échec de la génération du rapport Markdown")
print(f" ERREUR: Échec de la génération du rapport Markdown")
return json_path, md_path
except Exception as e:
error_message = f"Erreur lors de la génération du rapport Qwen: {str(e)}"
logger.error(error_message)
logger.error(traceback.format_exc())
print(f" ERREUR: {error_message}")
return None, None
def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str:
"""Extrait l'ID du ticket des données ou du chemin"""
# Essayer d'extraire depuis les données du rapport
ticket_id = rapport_data.get("ticket_id", "")
# Si pas d'ID direct, essayer depuis les données du ticket
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
ticket_id = rapport_data["ticket_data"].get("code", "")
# En dernier recours, extraire depuis le chemin
if not ticket_id:
# Essayer d'extraire un ID de ticket (format Txxxx) du chemin
match = re.search(r'T\d+', rapport_dir)
if match:
ticket_id = match.group(0)
else:
# Sinon, utiliser le dernier segment du chemin
ticket_id = os.path.basename(rapport_dir)
return ticket_id
def _extraire_analyse_ticket(self, rapport_data: Dict) -> str:
"""Extrait l'analyse du ticket des données"""
# Essayer les différentes clés possibles
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
if key in rapport_data and rapport_data[key]:
logger.info(f"Utilisation de {key}")
return rapport_data[key]
# Créer une analyse par défaut si aucune n'est disponible
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
ticket_data = rapport_data.get("ticket_data", {})
ticket_name = ticket_data.get("name", "Sans titre")
ticket_desc = ticket_data.get("description", "Pas de description disponible")
return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)"
def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]:
"""
Extrait et formate les analyses d'images pertinentes
"""
images_analyses = []
analyse_images_data = rapport_data.get("analyse_images", {})
# Parcourir toutes les images
for image_path, analyse_data in analyse_images_data.items():
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
# Si l'image est pertinente, extraire son analyse
if is_relevant:
image_name = os.path.basename(image_path)
analyse = self._extraire_analyse_image(analyse_data)
if analyse:
images_analyses.append({
"image_name": image_name,
"image_path": image_path,
"analyse": analyse,
"sorting_info": analyse_data.get("sorting", {}),
"metadata": analyse_data.get("analysis", {}).get("metadata", {})
})
logger.info(f"Analyse de l'image {image_name} ajoutée")
return images_analyses
def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]:
"""
Extrait l'analyse d'une image depuis les données
"""
# Si pas de données d'analyse, retourner None
if not "analysis" in analyse_data or not analyse_data["analysis"]:
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
reason = analyse_data["sorting"].get("reason", "Non spécifiée")
return f"Image marquée comme pertinente. Raison: {reason}"
return None
# Extraire l'analyse selon le format des données
analysis = analyse_data["analysis"]
# Structure type 1: {"analyse": "texte"}
if isinstance(analysis, dict) and "analyse" in analysis:
return analysis["analyse"]
# Structure type 2: {"error": false, ...} - contient d'autres données utiles
if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True):
return str(analysis)
# Structure type 3: texte d'analyse direct
if isinstance(analysis, str):
return analysis
# Structure type 4: autre format de dictionnaire - convertir en JSON
if isinstance(analysis, dict):
return json.dumps(analysis, ensure_ascii=False, indent=2)
# Aucun format reconnu
return None

View File

@ -0,0 +1,301 @@
from .base_agent import BaseAgent
from typing import Dict, Any, Optional
import logging
import json
import os
import sys
from datetime import datetime
# Ajout du chemin des utilitaires au PATH pour pouvoir les importer
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from loaders.ticket_data_loader import TicketDataLoader
logger = logging.getLogger("AgentTicketAnalyser")
class AgentTicketAnalyser(BaseAgent):
"""
Agent pour analyser les tickets (JSON ou Markdown) et en extraire les informations importantes.
Remplace l'ancien AgentJsonAnalyser avec des fonctionnalités améliorées.
"""
def __init__(self, llm):
super().__init__("AgentTicketAnalyser", llm)
# Configuration locale de l'agent
self.temperature = 0.1 # Besoin d'analyse très précise
self.top_p = 0.8
self.max_tokens = 8000
# Prompt système optimisé
self.system_prompt = """Tu es un expert en analyse de tickets pour le support informatique de BRG-Lab pour la société CBAO.
Tu interviens avant l'analyse des captures d'écran pour contextualiser le ticket, identifier les questions posées, et structurer les échanges de manière claire.
Ta mission principale :
1. Identifier le client et le contexte du ticket (demande "name" et "description")
- Récupère le nom de l'auteur si présent
- Indique si un `user_id` est disponible
- Conserve uniquement les informations d'identification utiles (pas d'adresse ou signature de mail inutile)
2. Mettre en perspective le `name` du ticket
- Il peut contenir une ou plusieurs questions implicites
- Reformule ces questions de façon explicite
3. Analyser la `description`
- Elle fournit souvent le vrai point d'entrée technique
- Repère les formulations interrogatives ou les demandes spécifiques
- Identifie si cette partie complète ou précise les questions du nom
4. Structurer le fil de discussion
- Conserve uniquement les échanges pertinents
-Conserve les questions soulevés par "name" ou "description"
- CONSERVE ABSOLUMENT les références documentation, FAQ, liens utiles et manuels
- Identifie clairement chaque intervenant (client / support)
- Classe les informations par ordre chronologique avec date et rôle
5. Préparer la transmission à l'agent suivant
- Préserve tous les éléments utiles à l'analyse d'image : modules cités, options évoquées, comportements décrits
- Mentionne si des images sont attachées au ticket
Structure ta réponse :
1. Résumé du contexte
- Client (nom, email si disponible)
- Sujet du ticket reformulé en une ou plusieurs questions
- Description technique synthétique
2. Informations techniques détectées
- Logiciels/modules mentionnés
- Paramètres évoqués
- Fonctionnalités impactées
- Conditions spécifiques (multi-laboratoire, utilisateur non valide, etc.)
3. Fil de discussion (filtrée, nettoyée, classée)
- Intervenant (Client/Support)
- Date et contenu de chaque échange
- Résumés techniques
- INCLURE TOUS les liens documentaires (manuel, FAQ, documentation technique)
4. Éléments liés à l'analyse visuelle
- Nombre d'images attachées
- Références aux interfaces ou options à visualiser
- Points à vérifier dans les captures (listes incomplètes, cases à cocher, utilisateurs grisés, etc.)
IMPORTANT :
- Ne propose aucune solution ni interprétation
- Ne génère pas de tableau
- Reste strictement factuel en te basant uniquement sur les informations fournies
- Ne reformule pas les messages, conserve les formulations exactes sauf nettoyage de forme"""
# Initialiser le loader de données
self.ticket_loader = TicketDataLoader()
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentTicketAnalyser initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
self.llm.configurer(**params)
def executer(self, ticket_data: Dict[str, Any]) -> str:
"""
Analyse un ticket pour en extraire les informations pertinentes
Args:
ticket_data: Dictionnaire contenant les données du ticket à analyser
ou chemin vers un fichier de ticket (JSON ou Markdown)
Returns:
Réponse formatée contenant l'analyse du ticket
"""
# Détecter si ticket_data est un chemin de fichier ou un dictionnaire
if isinstance(ticket_data, str) and os.path.exists(ticket_data):
try:
ticket_data = self.ticket_loader.charger(ticket_data)
logger.info(f"Données chargées depuis le fichier: {ticket_data}")
except Exception as e:
error_message = f"Erreur lors du chargement du fichier: {str(e)}"
logger.error(error_message)
return f"ERREUR: {error_message}"
# Vérifier que les données sont bien un dictionnaire
if not isinstance(ticket_data, dict):
error_message = "Les données du ticket doivent être un dictionnaire ou un chemin de fichier valide"
logger.error(error_message)
return f"ERREUR: {error_message}"
ticket_code = ticket_data.get('code', 'Inconnu')
logger.info(f"Analyse du ticket: {ticket_code}")
print(f"AgentTicketAnalyser: Analyse du ticket {ticket_code}")
# Récupérer les métadonnées sur la source des données
source_format = "inconnu"
source_file = "non spécifié"
if "metadata" in ticket_data and isinstance(ticket_data["metadata"], dict):
source_format = ticket_data["metadata"].get("format", "inconnu")
source_file = ticket_data["metadata"].get("source_file", "non spécifié")
logger.info(f"Format source: {source_format}, Fichier source: {source_file}")
# Préparer le ticket pour l'analyse
ticket_formate = self._formater_ticket_pour_analyse(ticket_data)
# Créer le prompt pour l'analyse, adapté au format source
prompt = f"""Analyse ce ticket pour en extraire les informations clés et préparer une synthèse structurée.
SOURCE: {source_format.upper()}
{ticket_formate}
RAPPEL IMPORTANT:
- CONSERVE TOUS les liens (FAQ, documentation, manuels) présents dans les messages
- Extrais et organise chronologiquement les échanges client/support
- Identifie les éléments techniques à observer dans les captures d'écran
- Reste factuel et précis sans proposer de solution"""
try:
logger.info("Interrogation du LLM")
response = self.llm.interroger(prompt)
logger.info(f"Réponse reçue: {len(response)} caractères")
print(f" Analyse terminée: {len(response)} caractères")
except Exception as e:
error_message = f"Erreur lors de l'analyse du ticket: {str(e)}"
logger.error(error_message)
response = f"ERREUR: {error_message}"
print(f" ERREUR: {error_message}")
# Enregistrer l'historique avec le prompt complet pour la traçabilité
self.ajouter_historique("analyse_ticket",
{
"ticket_id": ticket_code,
"format_source": source_format,
"source_file": source_file,
"prompt": prompt,
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"timestamp": self._get_timestamp()
},
response)
return response
def _formater_ticket_pour_analyse(self, ticket_data: Dict) -> str:
"""
Formate les données du ticket pour l'analyse LLM, avec une meilleure
gestion des différents formats et structures de données.
Args:
ticket_data: Les données du ticket
Returns:
Représentation textuelle formatée du ticket
"""
# Initialiser avec les informations de base
ticket_name = ticket_data.get('name', 'Sans titre')
ticket_code = ticket_data.get('code', 'Inconnu')
info = f"## TICKET {ticket_code}: {ticket_name}\n\n"
info += f"## NOM DE LA DEMANDE (PROBLÈME INITIAL)\n{ticket_name}\n\n"
# Ajouter la description
description = ticket_data.get('description', '')
if description:
info += f"## DESCRIPTION DU PROBLÈME\n{description}\n\n"
# Ajouter les informations du ticket (exclure certains champs spécifiques)
champs_a_exclure = ['code', 'name', 'description', 'messages', 'metadata']
info += "## INFORMATIONS TECHNIQUES DU TICKET\n"
for key, value in ticket_data.items():
if key not in champs_a_exclure and value:
# Formater les valeurs complexes si nécessaire
if isinstance(value, (dict, list)):
value = json.dumps(value, ensure_ascii=False, indent=2)
info += f"- {key}: {value}\n"
info += "\n"
# Ajouter les messages (conversations) avec un formatage amélioré pour distinguer client/support
messages = ticket_data.get('messages', [])
if messages:
info += "## CHRONOLOGIE DES ÉCHANGES CLIENT/SUPPORT\n"
for i, msg in enumerate(messages):
# Vérifier que le message est bien un dictionnaire
if not isinstance(msg, dict):
continue
sender = msg.get('from', 'Inconnu')
date = msg.get('date', 'Date inconnue')
content = msg.get('content', '')
# Identifier si c'est client ou support
sender_type = "CLIENT" if "client" in sender.lower() else "SUPPORT" if "support" in sender.lower() else "AUTRE"
# Formater correctement la date si possible
try:
if date != 'Date inconnue':
# Essayer différents formats de date
for date_format in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%d/%m/%Y']:
try:
date_obj = datetime.strptime(date, date_format)
date = date_obj.strftime('%d/%m/%Y %H:%M')
break
except ValueError:
continue
except Exception:
pass # Garder la date d'origine en cas d'erreur
info += f"### Message {i+1} - [{sender_type}] De: {sender} - Date: {date}\n{content}\n\n"
# Ajouter les métadonnées techniques si présentes
metadata = ticket_data.get('metadata', {})
# Exclure certaines métadonnées internes
for key in ['source_file', 'format']:
if key in metadata:
metadata.pop(key)
if metadata:
info += "## MÉTADONNÉES TECHNIQUES\n"
for key, value in metadata.items():
if isinstance(value, (dict, list)):
value = json.dumps(value, ensure_ascii=False, indent=2)
info += f"- {key}: {value}\n"
info += "\n"
return info
def analyser_depuis_fichier(self, chemin_fichier: str) -> str:
"""
Analyse un ticket à partir d'un fichier (JSON ou Markdown)
Args:
chemin_fichier: Chemin vers le fichier à analyser
Returns:
Résultat de l'analyse
"""
try:
ticket_data = self.ticket_loader.charger(chemin_fichier)
return self.executer(ticket_data)
except Exception as e:
error_message = f"Erreur lors de l'analyse du fichier {chemin_fichier}: {str(e)}"
logger.error(error_message)
return f"ERREUR: {error_message}"
def _get_timestamp(self) -> str:
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
return datetime.now().strftime("%Y%m%d_%H%M%S")