import os
import json
import logging
import time
import traceback
from typing import List, Dict, Any, Optional, Tuple

from agents.base_agent import BaseAgent
from loaders.ticket_data_loader import TicketDataLoader
from utils.image_dedup import filtrer_images_uniques

# Logging configuration (the log file is truncated on every import: filemode='w')
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    filename='orchestrator_llama.log', filemode='w')
logger = logging.getLogger("OrchestratorLlamaVision")


class OrchestratorLlamaVision:
    """Orchestrator for ticket analysis with llama_vision.

    Chains up to four optional agents on one ticket: ticket analysis, image
    sorting, image analysis and report generation. Any agent left as ``None``
    is skipped.
    """

    # File suffixes (lower-case, with leading dot) treated as images.
    _EXTENSIONS_IMAGES: Tuple[str, ...] = (
        '.png', '.jpeg', '.jpg', '.gif', '.webp', '.bmp',
    )

    def __init__(self,
                 output_dir: str = "output/",
                 ticket_agent: Optional[BaseAgent] = None,
                 image_sorter: Optional[BaseAgent] = None,
                 image_analyser: Optional[BaseAgent] = None,
                 report_generator: Optional[BaseAgent] = None,
                 config: Optional[Dict[str, Any]] = None):
        """Initialise the orchestrator.

        Args:
            output_dir: Directory that contains the ``ticket_<id>`` folders.
            ticket_agent: Agent called with the loaded ticket data.
            image_sorter: Agent called with each image path; its result is
                read with ``.get("is_relevant", True)``.
            image_analyser: Agent called with each relevant image path and
                the ticket analysis as ``contexte``.
            report_generator: Agent called with the aggregated report data.
            config: Overrides merged on top of the default configuration.
        """
        self.output_dir = output_dir
        self.ticket_agent = ticket_agent
        self.image_sorter = image_sorter
        self.image_analyser = image_analyser
        self.report_generator = report_generator
        self.ticket_loader = TicketDataLoader()

        # Defaults; caller-supplied keys win.
        self.config = {
            "dedup_enabled": True,
            "dedup_threshold": 5,
            "save_results": True,
            "debug_mode": False,
            "reports_dir": "reports"
        }
        if config:
            self.config.update(config)

        logger.info(f"OrchestratorLlamaVision initialisé avec les paramètres: {self.config}")

    def executer(self, ticket_id: Optional[str] = None) -> None:
        """Process the ticket stored under ``<output_dir>/ticket_<ticket_id>``.

        Errors are logged, never raised: a missing id, a missing ticket
        directory, or any exception escaping ``traiter_ticket``.

        Args:
            ticket_id: Identifier of the ticket to process.
        """
        # Without this guard a missing id would silently look up "ticket_None".
        if not ticket_id:
            logger.error("Aucun identifiant de ticket fourni")
            return

        ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_id}")
        if not os.path.exists(ticket_path):
            logger.error(f"Le ticket {ticket_id} est introuvable dans {ticket_path}")
            return

        try:
            self.traiter_ticket(ticket_path)
        except Exception as e:
            logger.error(f"Erreur globale sur le ticket {ticket_id}: {e}")
            self._log_traceback()

    def traiter_ticket(self, ticket_path: str) -> None:
        """Run the full pipeline on one ticket directory.

        Steps: pick the most recently modified extraction folder, load the
        ticket JSON, analyse the ticket, sort and analyse the attached
        images, then generate the final report. Image analysis and report
        generation only run when the ticket analysis produced a truthy
        result.

        Args:
            ticket_path: Path of a ``ticket_<id>`` directory.
        """
        ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
        logger.info(f"Traitement du ticket {ticket_id}")

        extractions = self._trouver_extractions(ticket_path, ticket_id)
        if not extractions:
            logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}")
            return

        # Extractions are sorted newest first; only the latest one is used.
        extraction_path = extractions[0]
        attachments_dir = os.path.join(extraction_path, "attachments")
        # The per-extraction report folder is created here but nothing in
        # this class writes to it afterwards; kept for its side effect.
        rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
        os.makedirs(rapport_dir, exist_ok=True)

        json_path = self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
        ticket_data = self._charger_ticket(json_path)
        if not ticket_data:
            return

        ticket_analysis = self._analyser_ticket(ticket_data)

        images_analyses: Dict[str, Dict[str, Any]] = {}
        relevant_images: List[str] = []
        if os.path.exists(attachments_dir):
            images_analyses, relevant_images = self._trier_images(attachments_dir, ticket_id)

        if self.image_analyser and ticket_analysis:
            self._analyser_images(relevant_images, images_analyses, ticket_analysis)

        if self.report_generator and ticket_analysis:
            self._generer_rapport(ticket_id, ticket_data, ticket_analysis,
                                  images_analyses, relevant_images)

        logger.info(f"Traitement terminé pour le ticket {ticket_id}")

    def _log_traceback(self) -> None:
        """Log the current traceback when ``debug_mode`` is enabled.

        Must be called from inside an ``except`` block.
        """
        if self.config.get("debug_mode"):
            logger.error(traceback.format_exc())

    def _analyser_ticket(self, ticket_data: Dict[str, Any]) -> Any:
        """Run the ticket agent; return its result, or None if absent or failing."""
        if not self.ticket_agent:
            return None
        try:
            return self.ticket_agent.executer(ticket_data)
        except Exception as e:
            logger.error(f"Erreur analyse ticket : {e}")
            self._log_traceback()
            return None

    def _trier_images(self, attachments_dir: str,
                      ticket_id: str) -> Tuple[Dict[str, Dict[str, Any]], List[str]]:
        """List, optionally de-duplicate, and sort the images of a ticket.

        Returns:
            A pair ``(images_analyses, relevant_images)``. The first maps
            every kept image path to ``{"sorting": ..., "analysis": None}``;
            the second lists the paths judged relevant.
        """
        images = self._lister_images(attachments_dir)
        if self.config.get("dedup_enabled", True):
            # NOTE(review): presumably drops near-duplicate images using a
            # Hamming-distance threshold - confirm in utils.image_dedup.
            images = filtrer_images_uniques(images,
                                            seuil_hamming=self.config["dedup_threshold"],
                                            ticket_id=ticket_id)

        images_analyses: Dict[str, Dict[str, Any]] = {}
        relevant_images: List[str] = []
        for img in images:
            result_sort: Any = {}
            # An image stays relevant when there is no sorter or sorting fails.
            is_relevant = True
            if self.image_sorter:
                try:
                    result_sort = self.image_sorter.executer(img)
                    is_relevant = result_sort.get("is_relevant", True)
                except Exception as e:
                    logger.warning(f"Erreur tri image {os.path.basename(img)}: {e}")

            if is_relevant:
                relevant_images.append(img)

            # Every image is recorded, relevant or not.
            images_analyses[img] = {
                "sorting": result_sort or {"is_relevant": True},
                "analysis": None
            }
        return images_analyses, relevant_images

    def _analyser_images(self, relevant_images: List[str],
                         images_analyses: Dict[str, Dict[str, Any]],
                         ticket_analysis: Any) -> None:
        """Analyse each relevant image and store the result in ``images_analyses``.

        A failure on one image is logged as a warning and does not stop the
        remaining images.
        """
        for img in relevant_images:
            try:
                result = self.image_analyser.executer(img, contexte=ticket_analysis)
                images_analyses[img]["analysis"] = result
            except Exception as e:
                logger.warning(f"Erreur analyse image {os.path.basename(img)}: {e}")

    def _generer_rapport(self, ticket_id: str, ticket_data: Dict[str, Any],
                         ticket_analysis: Any,
                         images_analyses: Dict[str, Dict[str, Any]],
                         relevant_images: List[str]) -> None:
        """Generate the final report and, if ``save_results``, write it as JSON and text.

        Files go to ``<reports_dir>/<ticket_id>/``. Any exception is logged,
        not raised.
        """
        try:
            rapport_data = {
                "ticket_id": ticket_id,
                "ticket_data": ticket_data,
                "ticket_analyse": ticket_analysis,
                "analyse_images": images_analyses
            }

            dest_dir = os.path.join(self.config["reports_dir"], ticket_id)
            os.makedirs(dest_dir, exist_ok=True)

            rapport_final = self.report_generator.executer(rapport_data)

            if self.config.get("save_results", True):
                with open(os.path.join(dest_dir, f"rapport_final_{ticket_id}.json"),
                          "w", encoding="utf-8") as f:
                    json.dump({
                        "ticket_id": ticket_id,
                        "rapport": rapport_final,
                        "metadata": {
                            "images_total": len(images_analyses),
                            "images_pertinentes": len(relevant_images),
                            "images_analysees": sum(
                                1 for x in images_analyses.values() if x.get("analysis")),
                            "timestamp": time.strftime("%Y%m%d_%H%M%S")
                        }
                    }, f, ensure_ascii=False, indent=2)

                # Build the text before opening the file, so a non-string
                # report cannot leave a header-only .txt behind.
                if isinstance(rapport_final, str):
                    texte_rapport = rapport_final
                else:
                    texte_rapport = json.dumps(rapport_final, ensure_ascii=False, indent=2)

                with open(os.path.join(dest_dir, f"rapport_final_{ticket_id}.txt"),
                          "w", encoding="utf-8") as f:
                    f.write(f"RAPPORT TICKET {ticket_id}\n{'='*40}\n\n")
                    f.write(texte_rapport)

        except Exception as e:
            logger.error(f"Erreur lors du rapport : {e}")
            self._log_traceback()

    def _charger_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
        """Load the ticket JSON through the ticket loader.

        Returns:
            The loaded data, or None when no path is given or loading fails
            (both cases are logged).
        """
        if not json_path:
            logger.warning("Aucun chemin JSON fourni")
            return None
        try:
            return self.ticket_loader.charger(json_path)
        except Exception as e:
            logger.error(f"Erreur chargemnt ticket JSON : {e}")
            return None

    def _trouver_extractions(self, ticket_path: str, ticket_id: str) -> List[str]:
        """Return the extraction sub-directories of a ticket, newest first.

        A sub-directory qualifies when its name starts with ``ticket_id``;
        ordering uses the modification time.
        """
        candidats = [
            os.path.join(ticket_path, d)
            for d in os.listdir(ticket_path)
            if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)
        ]
        return sorted(candidats, key=os.path.getmtime, reverse=True)

    def _lister_images(self, dossier: str) -> List[str]:
        """Recursively list the image files found under ``dossier``.

        A file counts as an image when its lower-cased name ends with one of
        the suffixes in ``_EXTENSIONS_IMAGES``.
        """
        images = []
        for racine, _, fichiers in os.walk(dossier):
            for f in fichiers:
                # str.endswith accepts a tuple: one call covers all suffixes.
                if f.lower().endswith(self._EXTENSIONS_IMAGES):
                    images.append(os.path.join(racine, f))
        return images