import os import json import logging import time import traceback from typing import List, Dict, Any, Optional, Tuple from agents.base_agent import BaseAgent from loaders.ticket_data_loader import TicketDataLoader from utils.image_dedup import filtrer_images_uniques from utils.ocr_utils import extraire_texte from utils.translate_utils import fr_to_en, en_to_fr, sauvegarder_ocr_traduction # Configuration du logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename='orchestrator_llama.log', filemode='w') logger = logging.getLogger("OrchestratorLlamaVision") class OrchestratorLlamaVision: """Orchestrateur pour l'analyse des tickets avec llama_vision.""" def __init__(self, output_dir: str = "output/", ticket_agent: Optional[BaseAgent] = None, image_sorter: Optional[BaseAgent] = None, image_analyser: Optional[BaseAgent] = None, report_generator: Optional[BaseAgent] = None, config: Optional[Dict[str, Any]] = None): """Initialisation de l'orchestrateur.""" self.output_dir = output_dir self.ticket_agent = ticket_agent self.image_sorter = image_sorter self.image_analyser = image_analyser self.report_generator = report_generator self.ticket_loader = TicketDataLoader() self.config = { "dedup_enabled": True, "dedup_threshold": 5, "save_results": True, "debug_mode": False, "reports_dir": "reports", "ocr_enabled": True, "english_only": True, "model_name": "llama3-vision-90b-instruct" # Nom du modèle par défaut } if config: self.config.update(config) # Assurer la cohérence des noms de modèles if "model_name" in self.config: self.config["model_name"] = self.config["model_name"].replace(".", "-").replace(":", "-").replace("_", "-") logger.info(f"OrchestratorLlamaVision initialisé avec les paramètres: {self.config}") def executer(self, ticket_id: Optional[str] = None): ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_id}") if not os.path.exists(ticket_path): logger.error(f"Le ticket {ticket_id} est introuvable dans {ticket_path}") return try: self.traiter_ticket(ticket_path) except Exception as e: logger.error(f"Erreur globale sur le ticket {ticket_id}: {e}") if self.config.get("debug_mode"): logger.error(traceback.format_exc()) def traiter_ticket(self, ticket_path: str): ticket_id = os.path.basename(ticket_path).replace("ticket_", "") logger.info(f"Traitement du ticket {ticket_id}") extractions = self._trouver_extractions(ticket_path, ticket_id) if not extractions: logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}") return extraction_path = extractions[0] attachments_dir = os.path.join(extraction_path, "attachments") rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports") os.makedirs(rapport_dir, exist_ok=True) # Créer le répertoire pipeline une seule fois pipeline_dir = os.path.join(rapport_dir, "pipeline") os.makedirs(pipeline_dir, exist_ok=True) # Récupérer le nom du modèle pour le logging model_name = self.config.get("model_name", "llama3-vision-90b-instruct") logger.info(f"Utilisation du modèle: {model_name}") json_path = self.ticket_loader.trouver_ticket(extraction_path, ticket_id) ticket_data = self._charger_ticket(json_path) if not ticket_data: return # Ajouter le chemin du fichier JSON au ticket_data pour faciliter l'extraction du ticket_id if json_path: ticket_data["file_path"] = json_path # Traduire le contenu du ticket en anglais avant l'analyse if self.config.get("english_only") and ticket_data.get("content"): logger.info(f"Traduction du contenu du ticket {ticket_id} en anglais") ticket_data["content_original"] = ticket_data["content"] ticket_data["content"] = fr_to_en(ticket_data["content"]) logger.info(f"Traduction terminée: {len(ticket_data['content'])} caractères") ticket_analysis = None if self.ticket_agent: try: ticket_analysis = self.ticket_agent.executer(ticket_data) except Exception as e: logger.error(f"Erreur analyse ticket : {e}") if self.config.get("debug_mode"): logger.error(traceback.format_exc()) images_analyses, relevant_images = {}, [] ocr_results = {} if os.path.exists(attachments_dir): images = self._lister_images(attachments_dir) if self.config.get("dedup_enabled", True): images = filtrer_images_uniques(images, seuil_hamming=self.config["dedup_threshold"], ticket_id=ticket_id) # Réaliser l'OCR sur toutes les images avant le tri if self.config.get("ocr_enabled", True): logger.info(f"Traitement OCR de {len(images)} images") for img in images: try: ocr_fr, langue = extraire_texte(img, lang="auto") # Traduire le texte extrait en anglais pour une meilleure analyse ocr_en = fr_to_en(ocr_fr) if ocr_fr else "" # Traduire à nouveau en français pour vérification (optionnel) ocr_en_back_fr = en_to_fr(ocr_en) if ocr_en else "" # Sauvegarder les résultats OCR directement dans le répertoire pipeline # au lieu de créer un sous-répertoire T11143 sauvegarder_ocr_traduction(img, ticket_id, ocr_fr, ocr_en, ocr_en_back_fr, base_dir=rapport_dir) # Stocker le résultat de l'OCR pour utilisation ultérieure ocr_results[img] = { "texte_fr": ocr_fr, "texte_en": ocr_en, "langue_detectee": langue } logger.info(f"OCR terminé pour {os.path.basename(img)}: {len(ocr_fr)} caractères ({langue})") except Exception as e: logger.warning(f"Erreur OCR pour {os.path.basename(img)}: {e}") ocr_results[img] = {"texte_fr": "", "texte_en": "", "langue_detectee": "unknown"} # Traiter toutes les images avec l'agent de tri if self.image_sorter: logger.info(f"Traitement de {len(images)} images uniques avec l'agent de tri") # Analyser toutes les images for img in images: try: # Inclure l'OCR avec le chemin de l'image pour aider au tri ocr_context = ocr_results.get(img, {"texte_en": ""}).get("texte_en", "") result_sort = self.image_sorter.executer(img, ocr_context=ocr_context) is_relevant = result_sort.get("is_relevant", True) if is_relevant: relevant_images.append(img) images_analyses[img] = { "sorting": result_sort or {"is_relevant": True}, "analysis": None, "ocr": ocr_results.get(img, {}) } except Exception as e: logger.warning(f"Erreur tri image {os.path.basename(img)}: {e}") # Sauvegarder tous les résultats en une seule fois if self.image_sorter: # Utiliser une approche plus générique pour éviter les erreurs de linter try: # Essayer d'appeler la méthode si elle existe sauvegarder_func = getattr(self.image_sorter, "sauvegarder_resultats", None) if sauvegarder_func and callable(sauvegarder_func): sauvegarder_func() logger.info("Sauvegarde groupée des résultats de tri effectuée") else: logger.info("L'agent de tri ne dispose pas de la méthode sauvegarder_resultats") except Exception as e: logger.warning(f"Erreur lors de la sauvegarde des résultats: {e}") else: # Si pas d'agent de tri, considérer toutes les images comme pertinentes relevant_images = images.copy() for img in images: images_analyses[img] = { "sorting": {"is_relevant": True}, "analysis": None, "ocr": ocr_results.get(img, {}) } # Analyser les images pertinentes avec l'agent d'analyse d'images if self.image_analyser and ticket_analysis: for img in relevant_images: try: # Intégrer les résultats de l'OCR dans le contexte ocr_info = ocr_results.get(img, {}) contexte_enrichi = self._enrichir_contexte(ticket_analysis, ocr_info) result = self.image_analyser.executer(img, contexte=contexte_enrichi) images_analyses[img]["analysis"] = result except Exception as e: logger.warning(f"Erreur analyse image {os.path.basename(img)}: {e}") if self.report_generator and ticket_analysis: try: rapport_data = { "ticket_id": ticket_id, "ticket_data": ticket_data, "ticket_analyse": ticket_analysis, "analyse_images": images_analyses } # Utiliser directement le répertoire rapports_dir au lieu de recréer un chemin rapport_final = self.report_generator.executer(rapport_data) # Le rapport_generator est responsable de sauvegarder les fichiers # Nous n'avons pas besoin de dupliquer les sauvegardes ici # Les files sont sauvegardés via la fonction sauvegarder_donnees dans le pipeline except Exception as e: logger.error(f"Erreur lors du rapport : {e}") if self.config.get("debug_mode"): logger.error(traceback.format_exc()) logger.info(f"Traitement terminé pour le ticket {ticket_id}") def _enrichir_contexte(self, ticket_analysis: Dict[str, Any], ocr_info: Dict[str, Any]) -> Dict[str, Any]: """ Enrichit le contexte de l'analyse du ticket avec les informations OCR Args: ticket_analysis: Résultat de l'analyse du ticket ocr_info: Informations OCR d'une image Returns: Contexte enrichi """ if not isinstance(ticket_analysis, dict): return ticket_analysis # Créer une copie pour ne pas modifier l'original contexte_enrichi = ticket_analysis.copy() # Ajouter les informations OCR if ocr_info: contexte_enrichi["ocr_info"] = ocr_info # Utiliser la version anglaise du texte OCR pour l'analyse if self.config.get("english_only") and ocr_info.get("texte_en"): contexte_enrichi["ocr_text"] = ocr_info["texte_en"] else: contexte_enrichi["ocr_text"] = ocr_info.get("texte_fr", "") return contexte_enrichi def _charger_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]: if not json_path: logger.warning("Aucun chemin JSON fourni") return None try: return self.ticket_loader.charger(json_path) except Exception as e: logger.error(f"Erreur chargemnt ticket JSON : {e}") return None def _trouver_extractions(self, ticket_path: str, ticket_id: str) -> List[str]: return sorted( [os.path.join(ticket_path, d) for d in os.listdir(ticket_path) if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)], key=lambda x: os.path.getmtime(x), reverse=True ) def _lister_images(self, dossier: str) -> List[str]: """ Liste toutes les images dans un dossier avec une reconnaissance étendue des formats d'images. Args: dossier: Dossier contenant les images à analyser Returns: Liste des chemins d'images trouvées """ # Liste étendue des extensions d'images courantes extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.tiff', '.tif'] images = [] # Parcourir le dossier pour trouver toutes les images if os.path.exists(dossier): for racine, _, fichiers in os.walk(dossier): for f in fichiers: # Vérifier l'extension du fichier (non sensible à la casse) if any(f.lower().endswith(ext) for ext in extensions): chemin_complet = os.path.join(racine, f) # Vérifier que le fichier est bien une image valide et accessible try: from PIL import Image with Image.open(chemin_complet) as img: # S'assurer que c'est bien une image en vérifiant ses dimensions width, height = img.size if width > 0 and height > 0: images.append(chemin_complet) except Exception as e: logger.warning(f"Image ignorée {f}: {str(e)}") if not images: logger.warning(f"Aucune image trouvée dans {dossier}") else: logger.info(f"{len(images)} images trouvées dans {dossier}") return images