llm_ticket3/orchestrator.py
2025-04-14 11:19:55 +02:00

477 lines
24 KiB
Python

import os
import json
import logging
import time
import traceback
from typing import List, Dict, Any, Optional, Union, Mapping, cast
from agents.base_agent import BaseAgent
from loaders.ticket_data_loader import TicketDataLoader
from agents.utils.report_formatter import generer_rapport_markdown
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='orchestrator.log', filemode='w')
logger = logging.getLogger("Orchestrator")
class Orchestrator:
"""
Orchestrateur pour l'analyse de tickets et la génération de rapports.
Stratégie de gestion des formats:
- JSON est le format principal pour le traitement des données et l'analyse
- Markdown est utilisé uniquement comme format de présentation finale
- Les agents LLM travaillent principalement avec le format JSON
- La conversion JSON->Markdown se fait uniquement à la fin du processus pour la présentation
Cette approche permet de:
1. Simplifier le code des agents
2. Réduire les redondances et incohérences entre formats
3. Améliorer la performance des agents LLM avec un format plus structuré
4. Faciliter la maintenance et l'évolution du système
"""
def __init__(self,
output_dir: str = "output/",
ticket_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None):
self.output_dir = output_dir
# Assignation directe des agents
self.ticket_agent = ticket_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
# Initialisation du loader de données de ticket
self.ticket_loader = TicketDataLoader()
# Collecter et enregistrer les informations détaillées sur les agents
agents_info = self._collecter_info_agents()
logger.info(f"Orchestrator initialisé avec output_dir: {output_dir}")
logger.info(f"Agents disponibles: TicketAgent={ticket_agent is not None}, ImageSorter={image_sorter is not None}, ImageAnalyser={image_analyser is not None}, ReportGenerator={report_generator is not None}")
logger.info(f"Configuration des agents: {json.dumps(agents_info, indent=2)}")
def _collecter_info_agents(self) -> Dict[str, Dict[str, Any]]:
"""
Collecte des informations détaillées sur les agents configurés
"""
agents_info = {}
# Information sur l'agent Ticket
if self.ticket_agent:
agents_info["ticket_agent"] = self._get_agent_info(self.ticket_agent)
# Information sur l'agent Image Sorter
if self.image_sorter:
agents_info["image_sorter"] = self._get_agent_info(self.image_sorter)
# Information sur l'agent Image Analyser
if self.image_analyser:
agents_info["image_analyser"] = self._get_agent_info(self.image_analyser)
# Information sur l'agent Report Generator
if self.report_generator:
agents_info["report_generator"] = self._get_agent_info(self.report_generator)
return agents_info
def detecter_tickets(self) -> List[str]:
"""Détecte tous les tickets disponibles dans le répertoire de sortie"""
logger.info(f"Recherche de tickets dans: {self.output_dir}")
tickets = []
if not os.path.exists(self.output_dir):
logger.warning(f"Le répertoire de sortie {self.output_dir} n'existe pas")
print(f"ERREUR: Le répertoire {self.output_dir} n'existe pas")
return tickets
for ticket_dir in os.listdir(self.output_dir):
ticket_path = os.path.join(self.output_dir, ticket_dir)
if os.path.isdir(ticket_path) and ticket_dir.startswith("ticket_"):
tickets.append(ticket_dir)
return tickets
def trouver_rapport(self, extraction_path: str, ticket_id: str) -> Dict[str, Optional[str]]:
"""
Cherche les rapports disponibles (JSON et/ou MD) pour un ticket
Args:
extraction_path: Chemin vers l'extraction
ticket_id: ID du ticket
Returns:
Dictionnaire avec {"json": chemin_json, "markdown": chemin_md}
"""
# Utiliser la méthode du TicketDataLoader pour trouver les fichiers
result = self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
# S'assurer que nous avons un dictionnaire avec la structure correcte
rapports: Dict[str, Optional[str]] = {"json": None, "markdown": None} if result is None else result
# Si on a un JSON mais pas de Markdown, générer le Markdown à partir du JSON
json_path = rapports.get("json")
if json_path and not rapports.get("markdown"):
logger.info(f"Rapport JSON trouvé sans Markdown correspondant, génération du Markdown: {json_path}")
md_path = generer_rapport_markdown(json_path, True)
if md_path:
rapports["markdown"] = md_path
logger.info(f"Markdown généré avec succès: {md_path}")
else:
logger.warning(f"Erreur lors de la génération du Markdown")
return rapports
def executer(self, ticket_specifique: Optional[str] = None):
"""
Exécute l'orchestrateur soit sur un ticket spécifique, soit sur tous les tickets
Args:
ticket_specifique: Code du ticket spécifique à traiter (optionnel)
"""
start_time = time.time()
# Obtenir la liste des tickets
if ticket_specifique:
# Chercher le ticket spécifique
ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_specifique}")
if os.path.exists(ticket_path):
ticket_dirs = [ticket_path]
logger.info(f"Ticket spécifique à traiter: {ticket_specifique}")
print(f"Ticket spécifique à traiter: {ticket_specifique}")
else:
logger.error(f"Le ticket {ticket_specifique} n'existe pas")
print(f"ERREUR: Le ticket {ticket_specifique} n'existe pas")
return
else:
# Lister tous les tickets
ticket_dirs = [os.path.join(self.output_dir, d) for d in self.detecter_tickets()]
logger.info(f"Tickets à traiter: {len(ticket_dirs)}")
if not ticket_dirs:
logger.warning("Aucun ticket trouvé dans le répertoire de sortie")
print("Aucun ticket trouvé dans le répertoire de sortie")
return
# Un seul log de début d'exécution
logger.info("Début de l'exécution de l'orchestrateur")
print("Début de l'exécution de l'orchestrateur")
# Traitement des tickets
for ticket_dir in ticket_dirs:
try:
self.traiter_ticket(ticket_dir)
except Exception as e:
logger.error(f"Erreur lors du traitement du ticket {ticket_dir}: {str(e)}")
print(f"Erreur lors du traitement du ticket {ticket_dir}: {str(e)}")
traceback.print_exc()
# Calcul de la durée d'exécution
duration = time.time() - start_time
logger.info(f"Fin de l'exécution de l'orchestrateur (durée: {duration:.2f} secondes)")
print(f"Fin de l'exécution de l'orchestrateur (durée: {duration:.2f} secondes)")
def traiter_ticket(self, ticket_path: str) -> bool:
"""Traite un ticket spécifique et retourne True si le traitement a réussi"""
logger.info(f"Début du traitement du ticket: {ticket_path}")
print(f"\nTraitement du ticket: {os.path.basename(ticket_path)}")
success = False
extractions_trouvees = False
if not os.path.exists(ticket_path):
logger.error(f"Le chemin du ticket n'existe pas: {ticket_path}")
print(f"ERREUR: Le chemin du ticket n'existe pas: {ticket_path}")
return False
ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
for extraction in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extraction)
if os.path.isdir(extraction_path):
extractions_trouvees = True
logger.info(f"Traitement de l'extraction: {extraction}")
print(f" Traitement de l'extraction: {extraction}")
# Recherche des rapports (JSON et MD) dans différents emplacements
rapports = self.trouver_rapport(extraction_path, ticket_id)
# Dossier des pièces jointes
attachments_dir = os.path.join(extraction_path, "attachments")
# Dossier pour les rapports générés
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
os.makedirs(rapports_dir, exist_ok=True)
# Préparer les données du ticket à partir des rapports trouvés
ticket_data = self._preparer_donnees_ticket(rapports, ticket_id)
if ticket_data:
success = True
logger.info(f"Données du ticket chargées avec succès")
print(f" Données du ticket chargées")
# Traitement avec l'agent Ticket
if self.ticket_agent:
logger.info("Exécution de l'agent Ticket")
print(" Analyse du ticket en cours...")
# Log détaillé sur l'agent Ticket
agent_info = self._get_agent_info(self.ticket_agent)
logger.info(f"Agent Ticket: {json.dumps(agent_info, indent=2)}")
ticket_analysis = self.ticket_agent.executer(ticket_data)
logger.info("Analyse du ticket terminée")
print(f" Analyse du ticket terminée: {len(ticket_analysis) if ticket_analysis else 0} caractères")
else:
logger.warning("Agent Ticket non disponible")
ticket_analysis = None
print(" Agent Ticket non disponible, analyse ignorée")
# Traitement des images
relevant_images = []
images_analyses = {}
images_count = 0
if os.path.exists(attachments_dir):
logger.info(f"Vérification des pièces jointes dans: {attachments_dir}")
print(f" Vérification des pièces jointes...")
# Log détaillé sur l'agent Image Sorter
if self.image_sorter:
agent_info = self._get_agent_info(self.image_sorter)
logger.info(f"Agent Image Sorter: {json.dumps(agent_info, indent=2)}")
# Compter le nombre d'images
images = [f for f in os.listdir(attachments_dir)
if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
images_count = len(images)
# Tri des images
for img in images:
img_path = os.path.join(attachments_dir, img)
if self.image_sorter:
logger.info(f"Évaluation de la pertinence de l'image: {img}")
print(f" Évaluation de l'image: {img}")
sorting_result = self.image_sorter.executer(img_path)
is_relevant = sorting_result.get("is_relevant", False)
reason = sorting_result.get("reason", "")
# Log détaillé du résultat
if is_relevant:
logger.info(f"Image {img} considérée comme pertinente")
else:
logger.info(f"Image {img} considérée comme non pertinente")
# Ajouter les métadonnées de tri à la liste des analyses
images_analyses[img_path] = {
"sorting": sorting_result,
"analysis": None # Sera rempli plus tard si pertinent
}
if is_relevant:
logger.info(f"Image pertinente identifiée: {img} ({reason})")
print(f" => Pertinente: {reason}")
relevant_images.append(img_path)
else:
logger.info(f"Image non pertinente: {img} ({reason})")
print(f" => Non pertinente: {reason}")
else:
logger.warning("Image Sorter non disponible")
# Si pas de tri, considérer toutes les images comme pertinentes
relevant_images.append(img_path)
images_analyses[img_path] = {
"sorting": {"is_relevant": True, "reason": "Auto-sélectionné (pas de tri)"},
"analysis": None
}
print(f" => Auto-sélectionné (pas de tri)")
logger.info(f"Images analysées: {images_count}, Images pertinentes: {len(relevant_images)}")
print(f" Images analysées: {images_count}, Images pertinentes: {len(relevant_images)}")
else:
logger.warning(f"Répertoire des pièces jointes non trouvé: {attachments_dir}")
print(f" Répertoire des pièces jointes non trouvé")
# Analyse approfondie des images pertinentes
if relevant_images and self.image_analyser:
agent_info = self._get_agent_info(self.image_analyser)
logger.info(f"Agent Image Analyser: {json.dumps(agent_info, indent=2)}")
# S'assurer que l'analyse du ticket est disponible comme contexte
contexte_ticket = ticket_analysis if ticket_analysis else "Aucune analyse de ticket disponible"
# Analyse de chaque image pertinente
for image_path in relevant_images:
image_name = os.path.basename(image_path)
logger.info(f"Analyse approfondie de l'image: {image_name}")
print(f" Analyse approfondie de l'image: {image_name}")
# Appeler l'analyseur d'images avec le contexte du ticket
analysis_result = self.image_analyser.executer(image_path, contexte=contexte_ticket)
if images_analyses[image_path]:
images_analyses[image_path]["analysis"] = analysis_result
logger.info(f"Analyse complétée pour {image_name}")
# Préparer les données pour le rapport final
rapport_data = {
"ticket_data": ticket_data,
"ticket_id": ticket_id,
"ticket_analyse": ticket_analysis,
"analyse_images": images_analyses,
"metadata": {
"timestamp_debut": self._get_timestamp(),
"ticket_id": ticket_id,
"images_analysees": images_count,
"images_pertinentes": len(relevant_images)
}
}
# Génération du rapport final
if self.report_generator:
logger.info("Génération du rapport final")
print(" Génération du rapport final")
# Log détaillé sur l'agent Report Generator
agent_info = self._get_agent_info(self.report_generator)
logger.info(f"Agent Report Generator: {json.dumps(agent_info, indent=2)}")
# Créer le répertoire pour le rapport dans reports/
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__)))
reports_root_dir = os.path.join(project_root, 'reports')
ticket_reports_dir = os.path.join(reports_root_dir, ticket_id)
# Créer le sous-répertoire pour le modèle spécifique
model_name = getattr(self.report_generator.llm, "modele", str(type(self.report_generator.llm)))
model_reports_dir = os.path.join(ticket_reports_dir, model_name)
os.makedirs(model_reports_dir, exist_ok=True)
# Générer le rapport
json_path, md_path = self.report_generator.executer(rapport_data, model_reports_dir)
if json_path:
logger.info(f"Rapport JSON généré à: {json_path}")
print(f" Rapport JSON généré avec succès: {os.path.basename(json_path)}")
# Utiliser directement le rapport Markdown généré par l'agent
if md_path:
logger.info(f"Rapport Markdown généré à: {md_path}")
print(f" Rapport Markdown généré avec succès: {os.path.basename(md_path)}")
else:
logger.warning("Report Generator non disponible")
print(" Report Generator non disponible, génération de rapport ignorée")
print(f"Traitement du ticket {os.path.basename(ticket_path)} terminé avec succès.\n")
logger.info(f"Traitement du ticket {ticket_path} terminé avec succès.")
else:
logger.warning(f"Aucune donnée de ticket trouvée pour: {ticket_id}")
print(f" ERREUR: Aucune donnée de ticket trouvée pour {ticket_id}")
if not extractions_trouvees:
logger.warning(f"Aucune extraction trouvée dans le ticket: {ticket_path}")
print(f" ERREUR: Aucune extraction trouvée dans le ticket")
return success
def _preparer_donnees_ticket(self, rapports: Dict[str, Optional[str]], ticket_id: str) -> Optional[Dict]:
"""
Prépare les données du ticket à partir des rapports trouvés (JSON et/ou MD)
Args:
rapports: Dictionnaire avec les chemins des rapports JSON et MD
ticket_id: ID du ticket
Returns:
Dictionnaire avec les données du ticket, ou None si aucun rapport n'est trouvé
"""
ticket_data = None
# Si aucun rapport n'est trouvé
if not rapports or (not rapports.get("json") and not rapports.get("markdown")):
logger.warning(f"Aucun rapport trouvé pour le ticket {ticket_id}")
return None
# Privilégier le format JSON (format principal)
if rapports.get("json") and rapports["json"] is not None:
try:
ticket_data = self.ticket_loader.charger(rapports["json"])
logger.info(f"Données JSON chargées depuis: {rapports['json']}")
print(f" Rapport JSON chargé: {os.path.basename(rapports['json'])}")
# Ajouter une métadonnée sur le format source
if ticket_data and "metadata" not in ticket_data:
ticket_data["metadata"] = {}
if ticket_data:
ticket_data["metadata"]["format_source"] = "json"
except Exception as e:
logger.error(f"Erreur lors du chargement du JSON: {e}")
print(f" ERREUR: Impossible de charger le fichier JSON: {e}")
# Fallback sur le Markdown uniquement si JSON non disponible
if not ticket_data and rapports.get("markdown") and rapports["markdown"] is not None:
try:
# Utiliser le loader pour charger les données depuis le Markdown
ticket_data = self.ticket_loader.charger(rapports["markdown"])
logger.info(f"Données Markdown chargées depuis: {rapports['markdown']} (fallback)")
print(f" Rapport Markdown chargé (fallback): {os.path.basename(rapports['markdown'])}")
# Ajouter une métadonnée sur le format source
if ticket_data and "metadata" not in ticket_data:
ticket_data["metadata"] = {}
if ticket_data:
ticket_data["metadata"]["format_source"] = "markdown"
except Exception as e:
logger.error(f"Erreur lors du chargement du Markdown: {e}")
print(f" ERREUR: Impossible de charger le fichier Markdown: {e}")
# Assurer que l'ID du ticket est correct
if ticket_data:
ticket_data["code"] = ticket_id
return ticket_data
def _get_timestamp(self) -> str:
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
from datetime import datetime
return datetime.now().strftime("%Y%m%d_%H%M%S")
def _get_agent_info(self, agent: Optional[BaseAgent]) -> Dict:
"""
Récupère les informations détaillées sur un agent.
Args:
agent: L'agent dont on veut récupérer les informations
Returns:
Dictionnaire contenant les informations de l'agent
"""
if not agent:
return {"status": "non configuré"}
# Récupérer les informations du modèle
model_info = {
"nom": agent.nom,
"model": getattr(agent.llm, "modele", str(type(agent.llm))),
}
# Ajouter les paramètres de configuration s'ils sont disponibles directement dans l'agent
# Utiliser getattr avec une valeur par défaut pour éviter les erreurs
model_info["temperature"] = getattr(agent, "temperature", None)
model_info["top_p"] = getattr(agent, "top_p", None)
model_info["max_tokens"] = getattr(agent, "max_tokens", None)
# Ajouter le prompt système s'il est disponible
if hasattr(agent, "system_prompt"):
prompt_preview = getattr(agent, "system_prompt", "")
# Tronquer le prompt s'il est trop long
if prompt_preview and len(prompt_preview) > 200:
prompt_preview = prompt_preview[:200] + "..."
model_info["system_prompt_preview"] = prompt_preview
# Supprimer les valeurs None
model_info = {k: v for k, v in model_info.items() if v is not None}
return model_info