This commit is contained in:
Ladebeze66 2025-04-18 14:47:02 +02:00
parent c826b23aca
commit 2fd6c8ac2e
4 changed files with 529 additions and 730 deletions

View File

@ -0,0 +1,368 @@
import json
import os
from ..base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple, Optional, List
import logging
import traceback
import re
import sys
from ..utils.report_utils import extraire_et_traiter_json
from ..utils.report_formatter import extraire_sections_texte, generer_rapport_markdown, construire_rapport_json
from ..utils.agent_info_collector import collecter_info_agents, collecter_prompts_agents
logger = logging.getLogger("AgentReportGenerator")
class AgentReportGenerator(BaseAgent):
"""
Agent pour générer un rapport synthétique à partir des analyses de ticket et d'images.
"""
def __init__(self, llm):
super().__init__("AgentReportGenerator", llm)
# Configuration locale de l'agent
self.temperature = 0.2
self.top_p = 0.9
self.max_tokens = 8000
# Prompt système principal
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab pour la société CBAO.
Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré.
EXIGENCE ABSOLUE - Ton rapport DOIT inclure dans l'ordre:
1. Un résumé du problème initial (nom de la demande + description)
2. Une analyse détaillée des images pertinentes en lien avec le problème
3. Une synthèse globale des analyses d'images
4. Une reconstitution du fil de discussion client/support
5. Un tableau JSON de chronologie des échanges avec cette structure:
```json
{
"chronologie_echanges": [
{"date": "date exacte", "emetteur": "CLIENT ou SUPPORT", "type": "Question ou Réponse", "contenu": "contenu synthétisé"}
]
}
```
6. Un diagnostic technique des causes probables
MÉTHODE D'ANALYSE (ÉTAPES OBLIGATOIRES):
1. ANALYSE TOUTES les images AVANT de créer le tableau des échanges
2. Concentre-toi sur les éléments mis en évidence (encadrés/surlignés) dans chaque image
3. Réalise une SYNTHÈSE TRANSVERSALE en expliquant comment les images se complètent
4. Remets les images en ordre chronologique selon le fil de discussion
5. CONSERVE TOUS les liens documentaires, FAQ et références techniques
6. Ajoute une entrée "Complément visuel" dans le tableau des échanges"""
# Version du prompt pour la traçabilité
self.prompt_version = "v3.2"
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentReportGenerator initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
self.llm.configurer(**params)
logger.info(f"Configuration appliquée au modèle: {str(params)}")
def _formater_prompt_pour_rapport(self, ticket_analyse: str, images_analyses: List[Dict]) -> str:
"""
Formate le prompt pour la génération du rapport
"""
num_images = len(images_analyses)
logger.info(f"Formatage du prompt avec {num_images} analyses d'images")
# Construire la section d'analyse du ticket
prompt = f"""Génère un rapport technique complet, en te basant sur les analyses suivantes.
## ANALYSE DU TICKET
{ticket_analyse}
"""
# Ajouter la section d'analyse des images si présente
if num_images > 0:
prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n"
for i, img_analyse in enumerate(images_analyses, 1):
image_name = img_analyse.get("image_name", f"Image {i}")
analyse = img_analyse.get("analyse", "Analyse non disponible")
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
else:
prompt += "\n## ANALYSES DES IMAGES\nAucune image n'a été fournie pour ce ticket.\n"
# Instructions pour le rapport
prompt += """
## INSTRUCTIONS POUR LE RAPPORT
STRUCTURE OBLIGATOIRE ET ORDRE À SUIVRE:
1. Titre principal (# Rapport d'analyse: Nom du ticket)
2. Résumé du problème (## Résumé du problème)
3. Analyse des images (## Analyse des images) - CRUCIAL: FAIRE CETTE SECTION AVANT LE TABLEAU
4. Synthèse globale des analyses d'images (## 3.1 Synthèse globale des analyses d'images)
5. Fil de discussion (## Fil de discussion)
6. Tableau questions/réponses (## Tableau questions/réponses)
7. Diagnostic technique (## Diagnostic technique)
MÉTHODE POUR ANALYSER LES IMAGES:
- Pour chaque image, concentre-toi prioritairement sur:
* Les éléments mis en évidence (zones encadrées, surlignées)
* La relation avec le problème décrit
* Le lien avec le fil de discussion
SYNTHÈSE GLOBALE DES IMAGES (SECTION CRUCIALE):
- Titre à utiliser OBLIGATOIREMENT: ## 3.1 Synthèse globale des analyses d'images
- Premier sous-titre à utiliser OBLIGATOIREMENT: _Analyse transversale des captures d'écran_
- Structure cette section avec les sous-parties:
* Points communs et complémentaires entre les images
* Corrélation entre les éléments et le problème global
* Confirmation visuelle des informations du support
- Montre comment les images se complètent pour illustrer le processus complet
- Cette synthèse transversale servira de base pour le "Complément visuel"
POUR LE TABLEAU QUESTIONS/RÉPONSES:
- Tu DOIS créer et inclure un tableau JSON structuré comme ceci:
```json
{
"chronologie_echanges": [
{"date": "date demande", "emetteur": "CLIENT", "type": "Question", "contenu": "Texte exact du problème initial extrait du ticket"},
{"date": "date exacte", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "réponse avec TOUS les liens documentaires"},
{"date": "date analyse", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "synthèse unifiée de TOUTES les images"}
]
}
```
DIRECTIVES ESSENTIELLES:
- COMMENCE ABSOLUMENT par une entrée CLIENT avec les questions du NOM et de la DESCRIPTION du ticket
- Si le premier message chronologique est une réponse du SUPPORT qui cite la question, extrais la question citée pour l'ajouter comme première entrée CLIENT
- CONSERVE ABSOLUMENT TOUS les liens vers la documentation, FAQ, manuels et références techniques
- Ajoute UNE SEULE entrée "Complément visuel" qui synthétise l'apport global des images
- Cette entrée doit montrer comment les images confirment/illustrent le processus complet
- Formulation recommandée: "L'analyse des captures d'écran confirme visuellement le processus: (1)..., (2)..., (3)... Ces interfaces complémentaires illustrent..."
- Évite de traiter les images séparément dans le tableau; présente une vision unifiée
- Identifie clairement chaque intervenant (CLIENT ou SUPPORT)
"""
return prompt
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
"""
Génère un rapport à partir des analyses effectuées
"""
try:
# 1. PRÉPARATION
ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir)
logger.info(f"Génération du rapport pour le ticket: {ticket_id}")
print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}")
# Créer le répertoire de sortie si nécessaire
os.makedirs(rapport_dir, exist_ok=True)
# 2. EXTRACTION DES DONNÉES
ticket_analyse = self._extraire_analyse_ticket(rapport_data)
images_analyses = self._extraire_analyses_images(rapport_data)
# 3. COLLECTE DES INFORMATIONS SUR LES AGENTS (via le nouveau module)
agent_info = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"prompt_version": self.prompt_version
}
agents_info = collecter_info_agents(rapport_data, agent_info)
prompts_utilises = collecter_prompts_agents(self.system_prompt)
# 4. GÉNÉRATION DU RAPPORT
prompt = self._formater_prompt_pour_rapport(ticket_analyse, images_analyses)
logger.info("Génération du rapport avec le LLM")
print(f" Génération du rapport avec le LLM...")
# Mesurer le temps d'exécution
start_time = datetime.now()
rapport_genere = self.llm.interroger(prompt)
generation_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Rapport généré: {len(rapport_genere)} caractères")
print(f" Rapport généré: {len(rapport_genere)} caractères")
# 5. EXTRACTION DES DONNÉES DU RAPPORT
# Utiliser l'utilitaire de report_utils.py pour extraire les données JSON
rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere)
# Vérifier que echanges_json n'est pas None pour éviter l'erreur de type
if echanges_json is None:
echanges_json = {"chronologie_echanges": []}
logger.warning("Aucun échange JSON extrait du rapport, création d'une structure vide")
# Extraire les sections textuelles (résumé, diagnostic)
resume, analyse_images, diagnostic = extraire_sections_texte(rapport_genere)
# 6. CRÉATION DU RAPPORT JSON
# Préparer les métadonnées de l'agent
agent_metadata = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"model_version": getattr(self.llm, "version", "non spécifiée"),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"generation_time": generation_time,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"agents": agents_info
}
# Construire le rapport JSON
rapport_json = construire_rapport_json(
rapport_genere=rapport_genere,
rapport_data=rapport_data,
ticket_id=ticket_id,
ticket_analyse=ticket_analyse,
images_analyses=images_analyses,
generation_time=generation_time,
resume=resume,
analyse_images=analyse_images,
diagnostic=diagnostic,
echanges_json=echanges_json,
agent_metadata=agent_metadata,
prompts_utilises=prompts_utilises
)
# 7. SAUVEGARDE DU RAPPORT JSON
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(rapport_json, f, ensure_ascii=False, indent=2)
logger.info(f"Rapport JSON sauvegardé: {json_path}")
print(f" Rapport JSON sauvegardé: {json_path}")
# 8. GÉNÉRATION DU RAPPORT MARKDOWN
md_path = generer_rapport_markdown(json_path)
if md_path:
logger.info(f"Rapport Markdown généré: {md_path}")
print(f" Rapport Markdown généré: {md_path}")
else:
logger.error("Échec de la génération du rapport Markdown")
print(f" ERREUR: Échec de la génération du rapport Markdown")
return json_path, md_path
except Exception as e:
error_message = f"Erreur lors de la génération du rapport: {str(e)}"
logger.error(error_message)
logger.error(traceback.format_exc())
print(f" ERREUR: {error_message}")
return None, None
def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str:
"""Extrait l'ID du ticket des données ou du chemin"""
# Essayer d'extraire depuis les données du rapport
ticket_id = rapport_data.get("ticket_id", "")
# Si pas d'ID direct, essayer depuis les données du ticket
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
ticket_id = rapport_data["ticket_data"].get("code", "")
# En dernier recours, extraire depuis le chemin
if not ticket_id:
# Essayer d'extraire un ID de ticket (format Txxxx) du chemin
match = re.search(r'T\d+', rapport_dir)
if match:
ticket_id = match.group(0)
else:
# Sinon, utiliser le dernier segment du chemin
ticket_id = os.path.basename(rapport_dir)
return ticket_id
def _extraire_analyse_ticket(self, rapport_data: Dict) -> str:
"""Extrait l'analyse du ticket des données"""
# Essayer les différentes clés possibles
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
if key in rapport_data and rapport_data[key]:
logger.info(f"Utilisation de {key}")
return rapport_data[key]
# Créer une analyse par défaut si aucune n'est disponible
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
ticket_data = rapport_data.get("ticket_data", {})
ticket_name = ticket_data.get("name", "Sans titre")
ticket_desc = ticket_data.get("description", "Pas de description disponible")
return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)"
def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]:
"""
Extrait et formate les analyses d'images pertinentes
"""
images_analyses = []
analyse_images_data = rapport_data.get("analyse_images", {})
# Parcourir toutes les images
for image_path, analyse_data in analyse_images_data.items():
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
# Si l'image est pertinente, extraire son analyse
if is_relevant:
image_name = os.path.basename(image_path)
analyse = self._extraire_analyse_image(analyse_data)
if analyse:
images_analyses.append({
"image_name": image_name,
"image_path": image_path,
"analyse": analyse,
"sorting_info": analyse_data.get("sorting", {}),
"metadata": analyse_data.get("analysis", {}).get("metadata", {})
})
logger.info(f"Analyse de l'image {image_name} ajoutée")
return images_analyses
def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]:
"""
Extrait l'analyse d'une image depuis les données
"""
# Si pas de données d'analyse, retourner None
if not "analysis" in analyse_data or not analyse_data["analysis"]:
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
reason = analyse_data["sorting"].get("reason", "Non spécifiée")
return f"Image marquée comme pertinente. Raison: {reason}"
return None
# Extraire l'analyse selon le format des données
analysis = analyse_data["analysis"]
# Structure type 1: {"analyse": "texte"}
if isinstance(analysis, dict) and "analyse" in analysis:
return analysis["analyse"]
# Structure type 2: {"error": false, ...} - contient d'autres données utiles
if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True):
return str(analysis)
# Structure type 3: texte d'analyse direct
if isinstance(analysis, str):
return analysis
# Structure type 4: autre format de dictionnaire - convertir en JSON
if isinstance(analysis, dict):
return json.dumps(analysis, ensure_ascii=False, indent=2)
# Aucun format reconnu
return None

View File

@ -1,368 +1,106 @@
import json
import os
from ..base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple, Optional, List
from typing import Dict, Any
import logging
import traceback
import re
import sys
from ..utils.report_utils import extraire_et_traiter_json
from ..utils.report_formatter import extraire_sections_texte, generer_rapport_markdown, construire_rapport_json
from ..utils.agent_info_collector import collecter_info_agents, collecter_prompts_agents
import os
from datetime import datetime
from ..utils.pipeline_logger import sauvegarder_donnees
logger = logging.getLogger("AgentReportGenerator")
class AgentReportGenerator(BaseAgent):
"""
Agent pour générer un rapport synthétique à partir des analyses de ticket et d'images.
"""
def __init__(self, llm):
super().__init__("AgentReportGenerator", llm)
# Configuration locale de l'agent
self.temperature = 0.2
self.top_p = 0.9
self.max_tokens = 8000
# Prompt système principal
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab pour la société CBAO.
Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré.
EXIGENCE ABSOLUE - Ton rapport DOIT inclure dans l'ordre:
1. Un résumé du problème initial (nom de la demande + description)
2. Une analyse détaillée des images pertinentes en lien avec le problème
3. Une synthèse globale des analyses d'images
4. Une reconstitution du fil de discussion client/support
5. Un tableau JSON de chronologie des échanges avec cette structure:
```json
{
"chronologie_echanges": [
{"date": "date exacte", "emetteur": "CLIENT ou SUPPORT", "type": "Question ou Réponse", "contenu": "contenu synthétisé"}
]
}
```
6. Un diagnostic technique des causes probables
self.params = {
"temperature": 0.2,
"top_p": 0.8,
"max_tokens": 8000
}
self.system_prompt = """Tu es un expert en support technique chargé de générer un rapport final à partir des analyses d'un ticket de support.
Ton rôle est de croiser les informations provenant :
- de l'analyse textuelle du ticket client
- des analyses détaillées de plusieurs captures d'écran
Tu dois structurer ta réponse en format question/réponse de manière claire, en gardant l'intégralité des points importants.
Ne propose jamais de solution. Ne reformule pas le contexte.
Ta seule mission est de croiser les données textuelles et visuelles et d'en tirer des observations claires, en listant les éléments factuels visibles dans les captures qui appuient ou complètent le texte du ticket.
Structure du rapport attendu :
1. Contexte général (résumé du ticket textuel en une phrase)
2. Problèmes ou questions identifiés (sous forme de questions claires)
3. Résumé croisé image/texte pour chaque question
4. Liste d'observations supplémentaires pertinentes (si applicable)
Reste strictement factuel. Ne fais aucune hypothèse. Ne suggère pas d'étapes ni d'interprétation."""
MÉTHODE D'ANALYSE (ÉTAPES OBLIGATOIRES):
1. ANALYSE TOUTES les images AVANT de créer le tableau des échanges
2. Concentre-toi sur les éléments mis en évidence (encadrés/surlignés) dans chaque image
3. Réalise une SYNTHÈSE TRANSVERSALE en expliquant comment les images se complètent
4. Remets les images en ordre chronologique selon le fil de discussion
5. CONSERVE TOUS les liens documentaires, FAQ et références techniques
6. Ajoute une entrée "Complément visuel" dans le tableau des échanges"""
# Version du prompt pour la traçabilité
self.prompt_version = "v3.2"
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentReportGenerator initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens
}
self.llm.configurer(**params)
logger.info(f"Configuration appliquée au modèle: {str(params)}")
def _formater_prompt_pour_rapport(self, ticket_analyse: str, images_analyses: List[Dict]) -> str:
"""
Formate le prompt pour la génération du rapport
"""
num_images = len(images_analyses)
logger.info(f"Formatage du prompt avec {num_images} analyses d'images")
# Construire la section d'analyse du ticket
prompt = f"""Génère un rapport technique complet, en te basant sur les analyses suivantes.
self.llm.configurer(**self.params)
## ANALYSE DU TICKET
{ticket_analyse}
"""
# Ajouter la section d'analyse des images si présente
if num_images > 0:
prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n"
for i, img_analyse in enumerate(images_analyses, 1):
image_name = img_analyse.get("image_name", f"Image {i}")
analyse = img_analyse.get("analyse", "Analyse non disponible")
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
else:
prompt += "\n## ANALYSES DES IMAGES\nAucune image n'a été fournie pour ce ticket.\n"
# Instructions pour le rapport
prompt += """
## INSTRUCTIONS POUR LE RAPPORT
def executer(self, rapport_data: Dict[str, Any], dossier_destination: str) -> str:
ticket_id = rapport_data.get("ticket_id", "Inconnu")
print(f"AgentReportGenerator : génération du rapport pour le ticket {ticket_id}")
STRUCTURE OBLIGATOIRE ET ORDRE À SUIVRE:
1. Titre principal (# Rapport d'analyse: Nom du ticket)
2. Résumé du problème (## Résumé du problème)
3. Analyse des images (## Analyse des images) - CRUCIAL: FAIRE CETTE SECTION AVANT LE TABLEAU
4. Synthèse globale des analyses d'images (## 3.1 Synthèse globale des analyses d'images)
5. Fil de discussion (## Fil de discussion)
6. Tableau questions/réponses (## Tableau questions/réponses)
7. Diagnostic technique (## Diagnostic technique)
MÉTHODE POUR ANALYSER LES IMAGES:
- Pour chaque image, concentre-toi prioritairement sur:
* Les éléments mis en évidence (zones encadrées, surlignées)
* La relation avec le problème décrit
* Le lien avec le fil de discussion
SYNTHÈSE GLOBALE DES IMAGES (SECTION CRUCIALE):
- Titre à utiliser OBLIGATOIREMENT: ## 3.1 Synthèse globale des analyses d'images
- Premier sous-titre à utiliser OBLIGATOIREMENT: _Analyse transversale des captures d'écran_
- Structure cette section avec les sous-parties:
* Points communs et complémentaires entre les images
* Corrélation entre les éléments et le problème global
* Confirmation visuelle des informations du support
- Montre comment les images se complètent pour illustrer le processus complet
- Cette synthèse transversale servira de base pour le "Complément visuel"
POUR LE TABLEAU QUESTIONS/RÉPONSES:
- Tu DOIS créer et inclure un tableau JSON structuré comme ceci:
```json
{
"chronologie_echanges": [
{"date": "date demande", "emetteur": "CLIENT", "type": "Question", "contenu": "Texte exact du problème initial extrait du ticket"},
{"date": "date exacte", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "réponse avec TOUS les liens documentaires"},
{"date": "date analyse", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "synthèse unifiée de TOUTES les images"}
]
}
```
DIRECTIVES ESSENTIELLES:
- COMMENCE ABSOLUMENT par une entrée CLIENT avec les questions du NOM et de la DESCRIPTION du ticket
- Si le premier message chronologique est une réponse du SUPPORT qui cite la question, extrais la question citée pour l'ajouter comme première entrée CLIENT
- CONSERVE ABSOLUMENT TOUS les liens vers la documentation, FAQ, manuels et références techniques
- Ajoute UNE SEULE entrée "Complément visuel" qui synthétise l'apport global des images
- Cette entrée doit montrer comment les images confirment/illustrent le processus complet
- Formulation recommandée: "L'analyse des captures d'écran confirme visuellement le processus: (1)..., (2)..., (3)... Ces interfaces complémentaires illustrent..."
- Évite de traiter les images séparément dans le tableau; présente une vision unifiée
- Identifie clairement chaque intervenant (CLIENT ou SUPPORT)
"""
return prompt
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
"""
Génère un rapport à partir des analyses effectuées
"""
try:
# 1. PRÉPARATION
ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir)
logger.info(f"Génération du rapport pour le ticket: {ticket_id}")
print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}")
# Créer le répertoire de sortie si nécessaire
os.makedirs(rapport_dir, exist_ok=True)
# 2. EXTRACTION DES DONNÉES
ticket_analyse = self._extraire_analyse_ticket(rapport_data)
images_analyses = self._extraire_analyses_images(rapport_data)
# 3. COLLECTE DES INFORMATIONS SUR LES AGENTS (via le nouveau module)
agent_info = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"prompt_version": self.prompt_version
prompt = self._generer_prompt(rapport_data)
response = self.llm.interroger(prompt)
logger.info(f"Rapport généré pour le ticket {ticket_id}, longueur: {len(response)} caractères")
result = {
"prompt": prompt,
"response": response,
"metadata": {
"ticket_id": ticket_id,
"timestamp": self._get_timestamp(),
"source_agent": self.nom,
"model_info": {
"model": getattr(self.llm, "modele", str(type(self.llm))),
**getattr(self.llm, "params", {})
}
}
}
agents_info = collecter_info_agents(rapport_data, agent_info)
prompts_utilises = collecter_prompts_agents(self.system_prompt)
# 4. GÉNÉRATION DU RAPPORT
prompt = self._formater_prompt_pour_rapport(ticket_analyse, images_analyses)
logger.info("Génération du rapport avec le LLM")
print(f" Génération du rapport avec le LLM...")
# Mesurer le temps d'exécution
start_time = datetime.now()
rapport_genere = self.llm.interroger(prompt)
generation_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Rapport généré: {len(rapport_genere)} caractères")
print(f" Rapport généré: {len(rapport_genere)} caractères")
# 5. EXTRACTION DES DONNÉES DU RAPPORT
# Utiliser l'utilitaire de report_utils.py pour extraire les données JSON
rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere)
# Vérifier que echanges_json n'est pas None pour éviter l'erreur de type
if echanges_json is None:
echanges_json = {"chronologie_echanges": []}
logger.warning("Aucun échange JSON extrait du rapport, création d'une structure vide")
# Extraire les sections textuelles (résumé, diagnostic)
resume, analyse_images, diagnostic = extraire_sections_texte(rapport_genere)
# 6. CRÉATION DU RAPPORT JSON
# Préparer les métadonnées de l'agent
agent_metadata = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"model_version": getattr(self.llm, "version", "non spécifiée"),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"generation_time": generation_time,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"agents": agents_info
}
# Construire le rapport JSON
rapport_json = construire_rapport_json(
rapport_genere=rapport_genere,
rapport_data=rapport_data,
ticket_id=ticket_id,
ticket_analyse=ticket_analyse,
images_analyses=images_analyses,
generation_time=generation_time,
resume=resume,
analyse_images=analyse_images,
diagnostic=diagnostic,
echanges_json=echanges_json,
agent_metadata=agent_metadata,
prompts_utilises=prompts_utilises
)
# 7. SAUVEGARDE DU RAPPORT JSON
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(rapport_json, f, ensure_ascii=False, indent=2)
logger.info(f"Rapport JSON sauvegardé: {json_path}")
print(f" Rapport JSON sauvegardé: {json_path}")
# 8. GÉNÉRATION DU RAPPORT MARKDOWN
md_path = generer_rapport_markdown(json_path)
if md_path:
logger.info(f"Rapport Markdown généré: {md_path}")
print(f" Rapport Markdown généré: {md_path}")
else:
logger.error("Échec de la génération du rapport Markdown")
print(f" ERREUR: Échec de la génération du rapport Markdown")
return json_path, md_path
sauvegarder_donnees(ticket_id, "rapport_final", result, base_dir=dossier_destination, is_resultat=True)
self.ajouter_historique("rapport_final", {
"ticket_id": ticket_id,
"prompt": prompt,
"timestamp": self._get_timestamp()
}, response)
return response
except Exception as e:
error_message = f"Erreur lors de la génération du rapport: {str(e)}"
logger.error(error_message)
logger.error(traceback.format_exc())
print(f" ERREUR: {error_message}")
return None, None
def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str:
"""Extrait l'ID du ticket des données ou du chemin"""
# Essayer d'extraire depuis les données du rapport
ticket_id = rapport_data.get("ticket_id", "")
# Si pas d'ID direct, essayer depuis les données du ticket
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
ticket_id = rapport_data["ticket_data"].get("code", "")
# En dernier recours, extraire depuis le chemin
if not ticket_id:
# Essayer d'extraire un ID de ticket (format Txxxx) du chemin
match = re.search(r'T\d+', rapport_dir)
if match:
ticket_id = match.group(0)
else:
# Sinon, utiliser le dernier segment du chemin
ticket_id = os.path.basename(rapport_dir)
return ticket_id
def _extraire_analyse_ticket(self, rapport_data: Dict) -> str:
"""Extrait l'analyse du ticket des données"""
# Essayer les différentes clés possibles
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
if key in rapport_data and rapport_data[key]:
logger.info(f"Utilisation de {key}")
return rapport_data[key]
# Créer une analyse par défaut si aucune n'est disponible
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
ticket_data = rapport_data.get("ticket_data", {})
ticket_name = ticket_data.get("name", "Sans titre")
ticket_desc = ticket_data.get("description", "Pas de description disponible")
return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)"
def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]:
"""
Extrait et formate les analyses d'images pertinentes
"""
images_analyses = []
analyse_images_data = rapport_data.get("analyse_images", {})
# Parcourir toutes les images
for image_path, analyse_data in analyse_images_data.items():
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
# Si l'image est pertinente, extraire son analyse
if is_relevant:
image_name = os.path.basename(image_path)
analyse = self._extraire_analyse_image(analyse_data)
if analyse:
images_analyses.append({
"image_name": image_name,
"image_path": image_path,
"analyse": analyse,
"sorting_info": analyse_data.get("sorting", {}),
"metadata": analyse_data.get("analysis", {}).get("metadata", {})
})
logger.info(f"Analyse de l'image {image_name} ajoutée")
return images_analyses
def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]:
"""
Extrait l'analyse d'une image depuis les données
"""
# Si pas de données d'analyse, retourner None
if not "analysis" in analyse_data or not analyse_data["analysis"]:
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
reason = analyse_data["sorting"].get("reason", "Non spécifiée")
return f"Image marquée comme pertinente. Raison: {reason}"
return None
# Extraire l'analyse selon le format des données
analysis = analyse_data["analysis"]
# Structure type 1: {"analyse": "texte"}
if isinstance(analysis, dict) and "analyse" in analysis:
return analysis["analyse"]
# Structure type 2: {"error": false, ...} - contient d'autres données utiles
if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True):
return str(analysis)
# Structure type 3: texte d'analyse direct
if isinstance(analysis, str):
return analysis
# Structure type 4: autre format de dictionnaire - convertir en JSON
if isinstance(analysis, dict):
return json.dumps(analysis, ensure_ascii=False, indent=2)
# Aucun format reconnu
return None
logger.error(f"Erreur lors de la génération du rapport : {str(e)}")
return f"ERREUR: {str(e)}"
def _generer_prompt(self, rapport_data: Dict[str, Any]) -> str:
ticket_text = rapport_data.get("ticket_analyse", "")
image_blocs = []
analyses_images = rapport_data.get("analyse_images", {})
for chemin_image, analyse_obj in analyses_images.items():
analyse = analyse_obj.get("analysis", {}).get("analyse", "")
if analyse:
image_blocs.append(f"--- IMAGE : {os.path.basename(chemin_image)} ---\n{analyse}\n")
bloc_images = "\n".join(image_blocs)
return (
f"Voici les données d'analyse pour un ticket de support :\n\n"
f"=== ANALYSE DU TICKET ===\n{ticket_text}\n\n"
f"=== ANALYSES D'IMAGES ===\n{bloc_images}\n\n"
f"Génère un rapport croisé en suivant les instructions précédentes."
)
def _get_timestamp(self) -> str:
from datetime import datetime
return datetime.now().strftime("%Y%m%d_%H%M%S")

View File

@ -3,418 +3,91 @@ import json
import logging
import time
import traceback
from typing import List, Dict, Any, Optional, Union, Mapping, cast
from typing import List, Dict, Any, Optional
from agents.base_agent import BaseAgent
from loaders.ticket_data_loader import TicketDataLoader
from agents.utils.report_formatter import generer_rapport_markdown
from utils.image_dedup import filtrer_images_uniques
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='orchestrator.log', filemode='w')
logger = logging.getLogger("Orchestrator")
class Orchestrator:
"""
Orchestrateur pour l'analyse de tickets et la génération de rapports.
"""
def __init__(self,
output_dir: str = "output/",
ticket_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None):
self.output_dir = output_dir
# Assignation directe des agents
self.ticket_agent = ticket_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
# Initialisation du loader de données de ticket
self.ticket_loader = TicketDataLoader()
# Collecter et enregistrer les informations détaillées sur les agents
agents_info = self._collecter_info_agents()
logger.info(f"Orchestrator initialisé avec output_dir: {output_dir}")
logger.info(f"Agents disponibles: TicketAgent={ticket_agent is not None}, ImageSorter={image_sorter is not None}, ImageAnalyser={image_analyser is not None}, ReportGenerator={report_generator is not None}")
logger.info(f"Configuration des agents: {json.dumps(agents_info, indent=2)}")
def _collecter_info_agents(self) -> Dict[str, Dict[str, Any]]:
"""
Collecte des informations détaillées sur les agents configurés
"""
agents_info = {}
# Information sur l'agent Ticket
if self.ticket_agent:
agents_info["ticket_agent"] = self._get_agent_info(self.ticket_agent)
# Information sur l'agent Image Sorter
if self.image_sorter:
agents_info["image_sorter"] = self._get_agent_info(self.image_sorter)
# Information sur l'agent Image Analyser
if self.image_analyser:
agents_info["image_analyser"] = self._get_agent_info(self.image_analyser)
# Information sur l'agent Report Generator
if self.report_generator:
agents_info["report_generator"] = self._get_agent_info(self.report_generator)
return agents_info
def trouver_rapport(self, extraction_path: str, ticket_id: str) -> Optional[str]:
return self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
def detecter_tickets(self) -> List[str]:
"""Détecte tous les tickets disponibles dans le répertoire de sortie"""
logger.info(f"Recherche de tickets dans: {self.output_dir}")
tickets = []
if not os.path.exists(self.output_dir):
logger.warning(f"Le répertoire de sortie {self.output_dir} n'existe pas")
print(f"ERREUR: Le répertoire {self.output_dir} n'existe pas")
return tickets
for ticket_dir in os.listdir(self.output_dir):
ticket_path = os.path.join(self.output_dir, ticket_dir)
if os.path.isdir(ticket_path) and ticket_dir.startswith("ticket_"):
tickets.append(ticket_dir)
return tickets
def _preparer_donnees_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
if not json_path:
return None
try:
return self.ticket_loader.charger(json_path)
except Exception as e:
logger.error(f"Erreur chargement JSON: {e}")
return None
def trouver_rapport(self, extraction_path: str, ticket_id: str) -> Dict[str, Optional[str]]:
"""
Cherche le rapport JSON.
"""
result = self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
if not result:
logger.warning(f"Aucun fichier JSON trouvé pour le ticket {ticket_id}")
return {"json": None}
return {"json": result}
def executer(self, ticket_specifique: Optional[str] = None):
"""
Exécute l'orchestrateur soit sur un ticket spécifique, soit sur tous les tickets
Args:
ticket_specifique: Code du ticket spécifique à traiter (optionnel)
"""
start_time = time.time()
# Obtenir la liste des tickets
if ticket_specifique:
# Chercher le ticket spécifique
ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_specifique}")
if os.path.exists(ticket_path):
ticket_dirs = [ticket_path]
logger.info(f"Ticket spécifique à traiter: {ticket_specifique}")
print(f"Ticket spécifique à traiter: {ticket_specifique}")
else:
logger.error(f"Le ticket {ticket_specifique} n'existe pas")
print(f"ERREUR: Le ticket {ticket_specifique} n'existe pas")
return
else:
# Lister tous les tickets
ticket_dirs = [os.path.join(self.output_dir, d) for d in self.detecter_tickets()]
logger.info(f"Tickets à traiter: {len(ticket_dirs)}")
if not ticket_dirs:
logger.warning("Aucun ticket trouvé dans le répertoire de sortie")
print("Aucun ticket trouvé dans le répertoire de sortie")
return
# Un seul log de début d'exécution
logger.info("Début de l'exécution de l'orchestrateur")
print("Début de l'exécution de l'orchestrateur")
# Traitement des tickets
for ticket_dir in ticket_dirs:
try:
self.traiter_ticket(ticket_dir)
except Exception as e:
logger.error(f"Erreur lors du traitement du ticket {ticket_dir}: {str(e)}")
print(f"Erreur lors du traitement du ticket {ticket_dir}: {str(e)}")
traceback.print_exc()
# Calcul de la durée d'exécution
duration = time.time() - start_time
logger.info(f"Fin de l'exécution de l'orchestrateur (durée: {duration:.2f} secondes)")
print(f"Fin de l'exécution de l'orchestrateur (durée: {duration:.2f} secondes)")
def traiter_ticket(self, ticket_path: str) -> bool:
"""Traite un ticket spécifique et retourne True si le traitement a réussi"""
logger.info(f"Début du traitement du ticket: {ticket_path}")
print(f"\nTraitement du ticket: {os.path.basename(ticket_path)}")
success = False
extractions_trouvees = False
if not os.path.exists(ticket_path):
logger.error(f"Le chemin du ticket n'existe pas: {ticket_path}")
print(f"ERREUR: Le chemin du ticket n'existe pas: {ticket_path}")
return False
tickets = ([f"ticket_{ticket_specifique}"] if ticket_specifique else self._detecter_tickets())
for ticket in tickets:
self.traiter_ticket(os.path.join(self.output_dir, ticket))
def traiter_ticket(self, ticket_path: str) -> None:
ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
for extraction in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extraction)
if os.path.isdir(extraction_path):
extractions_trouvees = True
logger.info(f"Traitement de l'extraction: {extraction}")
print(f" Traitement de l'extraction: {extraction}")
# Recherche des rapports (JSON et MD) dans différents emplacements
rapports = self.trouver_rapport(extraction_path, ticket_id)
# Dossier des pièces jointes
attachments_dir = os.path.join(extraction_path, "attachments")
# Dossier pour les rapports générés
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
os.makedirs(rapports_dir, exist_ok=True)
# Préparer les données du ticket à partir des rapports trouvés
ticket_data = self._preparer_donnees_ticket(rapports, ticket_id)
if ticket_data:
success = True
logger.info(f"Données du ticket chargées avec succès")
print(f" Données du ticket chargées")
# Traitement avec l'agent Ticket
if self.ticket_agent:
logger.info("Exécution de l'agent Ticket")
print(" Analyse du ticket en cours...")
# Log détaillé sur l'agent Ticket
agent_info = self._get_agent_info(self.ticket_agent)
logger.info(f"Agent Ticket: {json.dumps(agent_info, indent=2)}")
ticket_analysis = self.ticket_agent.executer(ticket_data)
logger.info("Analyse du ticket terminée")
print(f" Analyse du ticket terminée: {len(ticket_analysis) if ticket_analysis else 0} caractères")
else:
logger.warning("Agent Ticket non disponible")
ticket_analysis = None
print(" Agent Ticket non disponible, analyse ignorée")
if not os.path.isdir(extraction_path):
continue
attachments_dir = os.path.join(extraction_path, "attachments")
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
os.makedirs(rapports_dir, exist_ok=True)
# Traitement des images
relevant_images = []
images_analyses = {}
images_count = 0
if os.path.exists(attachments_dir):
logger.info(f"Vérification des pièces jointes dans: {attachments_dir}")
print(f" Vérification des pièces jointes...")
# Log détaillé sur l'agent Image Sorter
if self.image_sorter:
agent_info = self._get_agent_info(self.image_sorter)
logger.info(f"Agent Image Sorter: {json.dumps(agent_info, indent=2)}")
# Compter le nombre d'images
images = [f for f in os.listdir(attachments_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
image_paths = [os.path.join(attachments_dir, img) for img in images]
image_paths_uniques = filtrer_images_uniques(image_paths)
images = [os.path.basename(p) for p in image_paths_uniques]
images_count = len(images)
# Tri des images
for img in images:
img_path = os.path.join(attachments_dir, img)
if self.image_sorter:
logger.info(f"Évaluation de la pertinence de l'image: {img}")
print(f" Évaluation de l'image: {img}")
sorting_result = self.image_sorter.executer(img_path)
is_relevant = sorting_result.get("is_relevant", False)
reason = sorting_result.get("reason", "")
# Log détaillé du résultat
if is_relevant:
logger.info(f"Image {img} considérée comme pertinente")
else:
logger.info(f"Image {img} considérée comme non pertinente")
# Ajouter les métadonnées de tri à la liste des analyses
images_analyses[img_path] = {
"sorting": sorting_result,
"analysis": None # Sera rempli plus tard si pertinent
}
if is_relevant:
logger.info(f"Image pertinente identifiée: {img} ({reason})")
print(f" => Pertinente: {reason}")
relevant_images.append(img_path)
else:
logger.info(f"Image non pertinente: {img} ({reason})")
print(f" => Non pertinente: {reason}")
else:
logger.warning("Image Sorter non disponible")
# Si pas de tri, considérer toutes les images comme pertinentes
relevant_images.append(img_path)
images_analyses[img_path] = {
"sorting": {"is_relevant": True, "reason": "Auto-sélectionné (pas de tri)"},
"analysis": None
}
print(f" => Auto-sélectionné (pas de tri)")
logger.info(f"Images analysées: {images_count}, Images pertinentes: {len(relevant_images)}")
print(f" Images analysées: {images_count}, Images pertinentes: {len(relevant_images)}")
else:
logger.warning(f"Répertoire des pièces jointes non trouvé: {attachments_dir}")
print(f" Répertoire des pièces jointes non trouvé")
json_path = self.trouver_rapport(extraction_path, ticket_id)
ticket_data = self._preparer_donnees_ticket(json_path)
if not ticket_data:
continue
# Analyse approfondie des images pertinentes
if relevant_images and self.image_analyser:
agent_info = self._get_agent_info(self.image_analyser)
logger.info(f"Agent Image Analyser: {json.dumps(agent_info, indent=2)}")
# S'assurer que l'analyse du ticket est disponible comme contexte
contexte_ticket = ticket_analysis if ticket_analysis else "Aucune analyse de ticket disponible"
# Analyse de chaque image pertinente
for image_path in relevant_images:
image_name = os.path.basename(image_path)
logger.info(f"Analyse approfondie de l'image: {image_name}")
print(f" Analyse approfondie de l'image: {image_name}")
# Appeler l'analyseur d'images avec le contexte du ticket
analysis_result = self.image_analyser.executer(image_path, contexte=contexte_ticket)
if images_analyses[image_path]:
images_analyses[image_path]["analysis"] = analysis_result
logger.info(f"Analyse complétée pour {image_name}")
# Préparer les données pour le rapport final
rapport_data = {
"ticket_data": ticket_data,
"ticket_id": ticket_id,
"ticket_analyse": ticket_analysis,
"analyse_images": images_analyses,
"metadata": {
"timestamp_debut": self._get_timestamp(),
"ticket_id": ticket_id,
"images_analysees": images_count,
"images_pertinentes": len(relevant_images)
}
}
# Génération du rapport final
if self.report_generator:
logger.info("Génération du rapport final")
print(" Génération du rapport final")
# Log détaillé sur l'agent Report Generator
agent_info = self._get_agent_info(self.report_generator)
logger.info(f"Agent Report Generator: {json.dumps(agent_info, indent=2)}")
# Créer le répertoire pour le rapport dans reports/
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__)))
reports_root_dir = os.path.join(project_root, 'reports')
ticket_reports_dir = os.path.join(reports_root_dir, ticket_id)
# Créer le sous-répertoire pour le modèle spécifique
model_name = getattr(self.report_generator.llm, "modele", str(type(self.report_generator.llm)))
model_reports_dir = os.path.join(ticket_reports_dir, model_name)
os.makedirs(model_reports_dir, exist_ok=True)
# Générer le rapport
json_path, md_path = self.report_generator.executer(rapport_data, model_reports_dir)
if json_path:
logger.info(f"Rapport JSON généré à: {json_path}")
print(f" Rapport JSON généré avec succès: {os.path.basename(json_path)}")
# Utiliser directement le rapport Markdown généré par l'agent
if md_path:
logger.info(f"Rapport Markdown généré à: {md_path}")
print(f" Rapport Markdown généré avec succès: {os.path.basename(md_path)}")
else:
logger.warning("Report Generator non disponible")
print(" Report Generator non disponible, génération de rapport ignorée")
ticket_analysis = self.ticket_agent.executer(ticket_data) if self.ticket_agent else None
print(f"Traitement du ticket {os.path.basename(ticket_path)} terminé avec succès.\n")
logger.info(f"Traitement du ticket {ticket_path} terminé avec succès.")
else:
logger.warning(f"Aucune donnée de ticket trouvée pour: {ticket_id}")
print(f" ERREUR: Aucune donnée de ticket trouvée pour {ticket_id}")
if not extractions_trouvees:
logger.warning(f"Aucune extraction trouvée dans le ticket: {ticket_path}")
print(f" ERREUR: Aucune extraction trouvée dans le ticket")
return success
def _preparer_donnees_ticket(self, rapports: Dict[str, Optional[str]], ticket_id: str) -> Optional[Dict]:
"""
Prépare les données du ticket à partir du rapport trouvé (JSON)
"""
ticket_data = None
# Charger le fichier JSON si le chemin existe
json_path = rapports.get("json")
if json_path is not None:
try:
ticket_data = self.ticket_loader.charger(json_path)
logger.info(f"Données JSON chargées depuis: {json_path}")
print(f" Rapport JSON chargé: {os.path.basename(json_path)}")
# Ajouter une métadonnée sur le format source
if "metadata" not in ticket_data:
ticket_data["metadata"] = {}
ticket_data["metadata"]["format_source"] = "json"
except Exception as e:
logger.error(f"Erreur lors du chargement du JSON: {e}")
print(f" ERREUR: Impossible de charger le fichier JSON: {e}")
return None
else:
logger.warning(f"Aucun fichier JSON trouvé pour le ticket {ticket_id}")
print(f" ERREUR: Aucun fichier JSON trouvé pour le ticket {ticket_id}")
return None
def _get_timestamp(self) -> str:
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
from datetime import datetime
return datetime.now().strftime("%Y%m%d_%H%M%S")
relevant_images, images_analyses = [], {}
if os.path.exists(attachments_dir):
images = [f for f in os.listdir(attachments_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
image_paths = filtrer_images_uniques([os.path.join(attachments_dir, img) for img in images])
def _get_agent_info(self, agent: Optional[BaseAgent]) -> Dict:
"""
Récupère les informations détaillées sur un agent.
Args:
agent: L'agent dont on veut récupérer les informations
Returns:
Dictionnaire contenant les informations de l'agent
"""
if not agent:
return {"status": "non configuré"}
# Récupérer les informations du modèle
model_info = {
"nom": agent.nom,
"model": getattr(agent.llm, "modele", str(type(agent.llm))),
}
# Ajouter les paramètres de configuration s'ils sont disponibles directement dans l'agent
# Utiliser getattr avec une valeur par défaut pour éviter les erreurs
model_info["temperature"] = getattr(agent, "temperature", None)
model_info["top_p"] = getattr(agent, "top_p", None)
model_info["max_tokens"] = getattr(agent, "max_tokens", None)
# Ajouter le prompt système s'il est disponible
if hasattr(agent, "system_prompt"):
prompt_preview = getattr(agent, "system_prompt", "")
# Tronquer le prompt s'il est trop long
if prompt_preview and len(prompt_preview) > 200:
prompt_preview = prompt_preview[:200] + "..."
model_info["system_prompt_preview"] = prompt_preview
# Supprimer les valeurs None
model_info = {k: v for k, v in model_info.items() if v is not None}
return model_info
for path in image_paths:
result_tri = self.image_sorter.executer(path) if self.image_sorter else {"is_relevant": True, "reason": "pas de tri"}
is_rel = result_tri.get("is_relevant", True)
images_analyses[path] = {"sorting": result_tri, "analysis": None}
if is_rel:
relevant_images.append(path)
for path in relevant_images:
result = self.image_analyser.executer(path, contexte=ticket_analysis) if self.image_analyser else None
if result:
images_analyses[path]["analysis"] = result
if self.report_generator:
rapport_data = {
"ticket_id": ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": ticket_analysis,
"analyse_images": images_analyses
}
reports_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../reports"))
dest_dir = os.path.join(reports_root, ticket_id)
os.makedirs(dest_dir, exist_ok=True)
self.report_generator.executer(rapport_data, dest_dir)
def _detecter_tickets(self) -> List[str]:
return [d for d in os.listdir(self.output_dir) if os.path.isdir(os.path.join(self.output_dir, d)) and d.startswith("ticket_")]

View File

@ -13,5 +13,25 @@
"max_tokens": 7000
}
}
},
{
"prompt": "### TICKET T11143\n\n--- MESSAGE INITIAL DU CLIENT ---\nAuteur : GIRAUD TP (JCG), Victor BOLLÉE, v.bollee@labojcg.fr\nDate : 03/04/2025 08:34\nContenu :\nBRGLAB - Essai inaccessible\n*Contenu non extractible*\n\n--- MESSAGE 1 ---\nAuteur : Fabien LAFAY\nDate : 03/04/2025 08:35\nType : Système\nSujet : Re: [T11143] BRGLAB - Essai inaccessible\nContenu :\nGIRAUD TP (JCG), Victor BOLLÉE\n-\nil y a 9 minutes\n;\nFabien LAFAY\n;\nRomuald GRUSON\n;\nsupport\n;\nsupport\n-\nQuentin FAIVRE\n-\nFabien LAFAY\n-\nRomuald GRUSON\nBonjour,\nJe ne parviens pas à accéder au lessai au bleu :\nMerci par avance pour votre.\nCordialement\n![Image](https://odoo.cbao.fr/web/image/145435?access_token=608ac9e7-3627-4a13-a8ec-06ff5046ebf3)\n\n---\n\n--- MESSAGE 2 ---\nAuteur : Fabien LAFAY\nDate : 03/04/2025 12:17\nType : E-mail\nSujet : Re: [T11143] - BRGLAB - Essai inaccessible\nContenu :\nBonjour\n,\nPouvez-vous vérifier si vous avez bien accès à la page suivante en l'ouvrant dans votre navigateur :\nhttps://zk1.brg-lab.com/\nVoici ce que vous devriez voir affiché :\nSi ce n'est pas le cas, pouvez-vous me faire une capture d'écran de ce qui est affiché?\nJe reste à votre entière disposition pour toute information complémentaire.\nCordialement,\n---\nSupport technique\n\n- image.png (image/png) [ID: 145453]\n\n---\n\n--- MESSAGE 3 ---\nAuteur : Victor BOLLÉE\nDate : 03/04/2025 12:21\nType : E-mail\nSujet : TR: [T11143] - BRGLAB - Essai inaccessible\nContenu :\nBonjour,\nLe problème sest résolu seul par la suite.\nJe vous remercie pour votre retour.\nBonne journée\nPS : ladresse fonctionne\nDe :\n\n---\n\n",
"response": "### Résumé du contexte\n- **Client** : Victor BOLLÉE, v.bollee@labojcg.fr\n- **Sujet du ticket reformulé en une ou plusieurs questions** :\n - Pourquoi l'essai au bleu est-il inaccessible ?\n - Comment résoudre le problème d'accès à l'essai au bleu ?\n- **Description technique synthétique** :\n - Le client ne parvient pas à accéder à l'essai au bleu.\n - Le problème s'est résolu de lui-même par la suite.\n - L'adresse https://zk1.brg-lab.com/ fonctionne.\n\n### Informations techniques détectées\n- **Logiciels/modules mentionnés** :\n - Essai au bleu\n- **Paramètres évoqués** :\n - Aucun paramètre spécifique mentionné.\n- **Fonctionnalités impactées** :\n - Accès à l'essai au bleu.\n- **Conditions spécifiques** :\n - Aucune condition spécifique mentionnée.\n\n### Fil de discussion (filtrée, nettoyée, classée)\n- **Intervenant** : Victor BOLLÉE (Client)\n - **Date** : 03/04/2025 08:34\n - **Contenu** :\n - Bonjour, Je ne parviens pas à accéder au lessai au bleu. Merci par avance pour votre. Cordialement\n - **Résumés techniques** :\n - Problème d'accès à l'essai au bleu.\n\n- **Intervenant** : Fabien LAFAY (Support)\n - **Date** : 03/04/2025 12:17\n - **Contenu** :\n - Bonjour, Pouvez-vous vérifier si vous avez bien accès à la page suivante en l'ouvrant dans votre navigateur : https://zk1.brg-lab.com/ Voici ce que vous devriez voir affiché : Si ce n'est pas le cas, pouvez-vous me faire une capture d'écran de ce qui est affiché? Je reste à votre entière disposition pour toute information complémentaire. Cordialement,\n - **Résumés techniques** :\n - Demande de vérification de l'accès à l'URL https://zk1.brg-lab.com/.\n - Demande de capture d'écran si l'affichage n'est pas conforme.\n\n- **Intervenant** : Victor BOLLÉE (Client)\n - **Date** : 03/04/2025 12:21\n - **Contenu** :\n - Bonjour, Le problème sest résolu seul par la suite. Je vous remercie pour votre retour. Bonne journée PS : ladresse fonctionne\n - **Résumés techniques** :\n - Le problème d'accès s'est résolu de lui-même.\n - L'adresse https://zk1.brg-lab.com/ fonctionne.\n\n### Éléments liés à l'analyse visuelle\n- **Nombre d'images attachées** : 1\n- **Références aux interfaces ou options à visualiser** :\n - Capture d'écran de l'affichage de la page https://zk1.brg-lab.com/.\n- **Points à vérifier dans les captures** :\n - Vérifier si l'affichage de la page https://zk1.brg-lab.com/ est conforme à ce qui est attendu.",
"metadata": {
"timestamp": "20250418_144351",
"source_agent": "AgentTicketAnalyser",
"ticket_id": "T11143",
"model_info": {
"model": "mistral-large-latest",
"temperature": 0.1,
"top_p": 0.2,
"max_tokens": 7000,
"presence_penalty": 0,
"frequency_penalty": 0,
"stop": [],
"stream": false,
"n": 1
}
}
}
]