llm_ticket3/orchestrator.py
2025-04-24 09:45:01 +02:00

354 lines
16 KiB
Python

import os
import json
import logging
import time
import traceback
from typing import List, Dict, Any, Optional, Tuple, Union, Type
from agents.base_agent import BaseAgent
from loaders.ticket_data_loader import TicketDataLoader
from utils.image_dedup import filtrer_images_uniques
# Configuration globale du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='orchestrator.log', filemode='w')
logger = logging.getLogger("Orchestrator")
class Orchestrator:
def __init__(self,
output_dir: str = "output/",
ticket_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None,
config: Optional[Dict[str, Any]] = None):
"""
Initialise l'orchestrateur avec les agents nécessaires et la configuration.
Args:
output_dir: Répertoire de sortie pour les tickets
ticket_agent: Agent d'analyse de tickets
image_sorter: Agent de tri d'images
image_analyser: Agent d'analyse d'images
report_generator: Agent de génération de rapports
config: Configuration supplémentaire pour l'orchestrateur
"""
self.output_dir = output_dir
self.ticket_agent = ticket_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
self.ticket_loader = TicketDataLoader()
# Configuration par défaut
self.config = {
"dedup_enabled": True,
"dedup_threshold": 5,
"save_results": True,
"debug_mode": False,
"reports_dir": "reports"
}
# Mise à jour avec la configuration fournie
if config:
self.config.update(config)
logger.info(f"Orchestrator initialisé avec la configuration: {self.config}")
def trouver_rapport(self, extraction_path: str, ticket_id: str) -> Optional[str]:
"""
Trouve le rapport JSON associé à un ticket dans un chemin d'extraction.
Args:
extraction_path: Chemin de l'extraction
ticket_id: ID du ticket
Returns:
Chemin du rapport trouvé ou None
"""
return self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
def _preparer_donnees_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
"""
Charge et prépare les données du ticket à partir d'un fichier JSON.
Args:
json_path: Chemin du fichier JSON
Returns:
Données du ticket chargées ou None en cas d'erreur
"""
if not json_path:
return None
try:
ticket_data = self.ticket_loader.charger(json_path)
logger.info(f"Données du ticket chargées depuis {json_path}")
return ticket_data
except Exception as e:
logger.error(f"Erreur chargement JSON: {e}")
return None
def executer(self, ticket_specifique: Optional[str] = None) -> None:
"""
Exécute l'analyse sur un ticket spécifique ou sur tous les tickets disponibles.
Args:
ticket_specifique: ID du ticket spécifique à analyser
"""
tickets = ([f"ticket_{ticket_specifique}"] if ticket_specifique else self._detecter_tickets())
if not tickets:
logger.warning("Aucun ticket à traiter trouvé.")
return
logger.info(f"Tickets à traiter: {tickets}")
for ticket in tickets:
try:
self.traiter_ticket(os.path.join(self.output_dir, ticket))
except Exception as e:
logger.error(f"Erreur lors du traitement du ticket {ticket}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
def traiter_ticket(self, ticket_path: str) -> None:
"""
Traite un ticket en exécutant tous les agents configurés.
Args:
ticket_path: Chemin du répertoire du ticket
"""
ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
logger.info(f"Traitement du ticket {ticket_id}")
# Trouver l'extraction la plus récente
extractions = self._trouver_extractions(ticket_path, ticket_id)
if not extractions:
logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}")
return
extraction_path = extractions[0]
logger.info(f"Utilisation de l'extraction: {os.path.basename(extraction_path)}")
# Créer les répertoires nécessaires
attachments_dir = os.path.join(extraction_path, "attachments")
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
os.makedirs(rapports_dir, exist_ok=True)
# Charger les données du ticket
json_path = self.trouver_rapport(extraction_path, ticket_id)
ticket_data = self._preparer_donnees_ticket(json_path)
if not ticket_data:
logger.error(f"Impossible de charger les données du ticket {ticket_id}")
return
# Étape 1: Analyser le ticket (si l'agent est disponible)
ticket_analysis = None
if self.ticket_agent:
logger.info(f"Exécution de l'agent d'analyse de ticket pour {ticket_id}")
try:
ticket_analysis = self.ticket_agent.executer(ticket_data)
logger.info(f"Analyse du ticket terminée: {len(ticket_analysis) if ticket_analysis else 0} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'analyse du ticket {ticket_id}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Étape 2: Trier et analyser les images
relevant_images, images_analyses = [], {}
if os.path.exists(attachments_dir) and (self.image_sorter or self.image_analyser):
logger.info(f"Traitement des images pour le ticket {ticket_id}")
# Récupérer toutes les images
images = self._trouver_images_dans_dossier(attachments_dir)
logger.info(f"Images trouvées: {len(images)}")
# Dédupliquer les images si la fonctionnalité est activée
if self.config.get("dedup_enabled", True):
images_avant = len(images)
images = filtrer_images_uniques(
images,
seuil_hamming=self.config.get("dedup_threshold", 5),
ticket_id=ticket_id
)
logger.info(f"Dédoublonnage: {images_avant}{len(images)} images")
# Trier les images
if self.image_sorter:
for path in images:
try:
result_tri = self.image_sorter.executer(path)
is_relevant = result_tri.get("is_relevant", True)
images_analyses[path] = {"sorting": result_tri, "analysis": None}
if is_relevant:
relevant_images.append(path)
logger.info(f"Image pertinente: {os.path.basename(path)}")
except Exception as e:
logger.error(f"Erreur lors du tri de l'image {os.path.basename(path)}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Sauvegarder les résultats accumulés après le traitement de toutes les images
if hasattr(self.image_sorter, "sauvegarder_resultats"):
try:
# Ignorer le type pour éviter l'erreur de linter
sorter_agent = self.image_sorter # type: ignore
sorter_agent.sauvegarder_resultats()
logger.info(f"Résultats de tri d'images sauvegardés pour le ticket {ticket_id}")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des résultats de tri d'images: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
else:
# Sans agent de tri, toutes les images sont considérées comme pertinentes
relevant_images = images
for path in images:
images_analyses[path] = {
"sorting": {"is_relevant": True, "reason": "pas d'agent de tri configuré"},
"analysis": None
}
logger.info(f"Images pertinentes: {len(relevant_images)}/{len(images)}")
# Analyser les images pertinentes
if self.image_analyser and ticket_analysis:
for path in relevant_images:
try:
logger.info(f"Analyse de l'image: {os.path.basename(path)}")
result = self.image_analyser.executer(path, contexte=ticket_analysis)
images_analyses[path]["analysis"] = result
if result and "error" not in result:
logger.info(f"Analyse terminée pour {os.path.basename(path)}")
else:
logger.warning(f"Analyse incomplète pour {os.path.basename(path)}")
except Exception as e:
logger.error(f"Erreur lors de l'analyse de l'image {os.path.basename(path)}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Étape 3: Générer le rapport final
if self.report_generator and ticket_analysis:
try:
logger.info(f"Génération du rapport final pour le ticket {ticket_id}")
rapport_data = {
"ticket_id": ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": ticket_analysis,
"analyse_images": images_analyses
}
# Créer le répertoire pour les rapports si nécessaire
reports_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", self.config.get("reports_dir", "reports")))
dest_dir = os.path.join(reports_root, ticket_id)
os.makedirs(dest_dir, exist_ok=True)
# Générer le rapport
rapport_final = self.report_generator.executer(rapport_data)
# Sauvegarder le rapport
if self.config.get("save_results", True):
rapport_path = os.path.join(dest_dir, f"rapport_final_{ticket_id}.json")
with open(rapport_path, "w", encoding="utf-8") as f:
json.dump(
{
"ticket_id": ticket_id,
"rapport": rapport_final,
"metadata": {
"images_total": len(images_analyses),
"images_pertinentes": len(relevant_images),
"images_analysees": sum(1 for img in images_analyses.values() if img.get("analysis")),
"timestamp": time.strftime("%Y%m%d_%H%M%S")
}
},
f,
ensure_ascii=False,
indent=2
)
logger.info(f"Rapport final sauvegardé dans {rapport_path}")
# Sauvegarder également une version texte du rapport pour faciliter la lecture
rapport_txt_path = os.path.join(dest_dir, f"rapport_final_{ticket_id}.txt")
with open(rapport_txt_path, "w", encoding="utf-8") as f:
f.write(f"RAPPORT D'ANALYSE DU TICKET {ticket_id}\n")
f.write("="*50 + "\n\n")
f.write(rapport_final)
logger.info(f"Version texte du rapport sauvegardée dans {rapport_txt_path}")
except Exception as e:
logger.error(f"Erreur lors de la génération du rapport pour {ticket_id}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
logger.info(f"Traitement du ticket {ticket_id} terminé")
def _detecter_tickets(self) -> List[str]:
"""
Détecte tous les tickets disponibles dans le répertoire de sortie.
Returns:
Liste des noms de répertoires de tickets
"""
if not os.path.exists(self.output_dir):
logger.warning(f"Le répertoire de sortie {self.output_dir} n'existe pas")
return []
return [d for d in os.listdir(self.output_dir)
if os.path.isdir(os.path.join(self.output_dir, d))
and d.startswith("ticket_")]
def _trouver_extractions(self, ticket_path: str, ticket_id: str) -> List[str]:
"""
Trouve toutes les extractions pour un ticket et les trie par date (la plus récente en premier).
Args:
ticket_path: Chemin du répertoire du ticket
ticket_id: ID du ticket
Returns:
Liste des chemins d'extraction triés par date (la plus récente en premier)
"""
if not os.path.exists(ticket_path):
return []
extractions = [
os.path.join(ticket_path, d) for d in os.listdir(ticket_path)
if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)
]
# Trier par date de modification (plus récente en premier)
extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True)
return extractions
def _trouver_images_dans_dossier(self, dossier: str) -> List[str]:
"""
Trouve toutes les images dans un dossier et ses sous-dossiers.
Args:
dossier: Chemin du dossier à explorer
Returns:
Liste des chemins d'images trouvées
"""
extensions = ['.jpg', '.jpeg', '.png', '.gif', '.webp']
images = []
# Explorer le dossier principal
for fichier in os.listdir(dossier):
fichier_path = os.path.join(dossier, fichier)
# Si c'est un fichier avec une extension d'image
if os.path.isfile(fichier_path) and any(fichier.lower().endswith(ext) for ext in extensions):
images.append(fichier_path)
# Si c'est un dossier, l'explorer récursivement
elif os.path.isdir(fichier_path):
for sous_fichier in os.listdir(fichier_path):
sous_fichier_path = os.path.join(fichier_path, sous_fichier)
if os.path.isfile(sous_fichier_path) and any(sous_fichier.lower().endswith(ext) for ext in extensions):
images.append(sous_fichier_path)
return images