llm_ticket3/orchestrator_llama.py
2025-04-22 16:01:17 +02:00

190 lines
8.0 KiB
Python

import os
import json
import logging
import time
import traceback
from typing import List, Dict, Any, Optional
from agents.base_agent import BaseAgent
from loaders.ticket_data_loader import TicketDataLoader
from utils.image_dedup import filtrer_images_uniques
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='orchestrator_llama.log', filemode='w')
logger = logging.getLogger("OrchestratorLlamaVision")
class OrchestratorLlamaVision:
"""Orchestrateur pour l'analyse des tickets avec llama_vision."""
def __init__(self,
output_dir: str = "output/",
ticket_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None,
config: Optional[Dict[str, Any]] = None):
"""Initialisation de l'orchestrateur."""
self.output_dir = output_dir
self.ticket_agent = ticket_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
self.ticket_loader = TicketDataLoader()
self.config = {
"dedup_enabled": True,
"dedup_threshold": 5,
"save_results": True,
"debug_mode": False,
"reports_dir": "reports"
}
if config:
self.config.update(config)
logger.info(f"OrchestratorLlamaVision initialisé avec les paramètres: {self.config}")
def executer(self, ticket_id: Optional[str] = None):
ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_id}")
if not os.path.exists(ticket_path):
logger.error(f"Le ticket {ticket_id} est introuvable dans {ticket_path}")
return
try:
self.traiter_ticket(ticket_path)
except Exception as e:
logger.error(f"Erreur globale sur le ticket {ticket_id}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
def traiter_ticket(self, ticket_path: str):
ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
logger.info(f"Traitement du ticket {ticket_id}")
extractions = self._trouver_extractions(ticket_path, ticket_id)
if not extractions:
logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}")
return
extraction_path = extractions[0]
attachments_dir = os.path.join(extraction_path, "attachments")
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
os.makedirs(rapport_dir, exist_ok=True)
json_path = self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
ticket_data = self._charger_ticket(json_path)
if not ticket_data:
return
ticket_analysis = None
if self.ticket_agent:
try:
ticket_analysis = self.ticket_agent.executer(ticket_data)
except Exception as e:
logger.error(f"Erreur analyse ticket : {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
images_analyses, relevant_images = {}, []
if os.path.exists(attachments_dir):
images = self._lister_images(attachments_dir)
if self.config.get("dedup_enabled", True):
images = filtrer_images_uniques(images, seuil_hamming=self.config["dedup_threshold"], ticket_id=ticket_id)
for img in images:
result_sort = {}
is_relevant = True
if self.image_sorter:
try:
result_sort = self.image_sorter.executer(img)
is_relevant = result_sort.get("is_relevant", True)
except Exception as e:
logger.warning(f"Erreur tri image {os.path.basename(img)}: {e}")
if is_relevant:
relevant_images.append(img)
images_analyses[img] = {
"sorting": result_sort or {"is_relevant": True},
"analysis": None
}
if self.image_analyser and ticket_analysis:
for img in relevant_images:
try:
result = self.image_analyser.executer(img, contexte=ticket_analysis)
images_analyses[img]["analysis"] = result
except Exception as e:
logger.warning(f"Erreur analyse image {os.path.basename(img)}: {e}")
if self.report_generator and ticket_analysis:
try:
rapport_data = {
"ticket_id": ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": ticket_analysis,
"analyse_images": images_analyses
}
dest_dir = os.path.join(self.config["reports_dir"], ticket_id)
os.makedirs(dest_dir, exist_ok=True)
rapport_final = self.report_generator.executer(rapport_data)
if self.config.get("save_results", True):
with open(os.path.join(dest_dir, f"rapport_final_{ticket_id}.json"), "w", encoding="utf-8") as f:
json.dump({
"ticket_id": ticket_id,
"rapport": rapport_final,
"metadata": {
"images_total": len(images_analyses),
"images_pertinentes": len(relevant_images),
"images_analysees": sum(1 for x in images_analyses.values() if x.get("analysis")),
"timestamp": time.strftime("%Y%m%d_%H%M%S")
}
}, f, ensure_ascii=False, indent=2)
with open(os.path.join(dest_dir, f"rapport_final_{ticket_id}.txt"), "w", encoding="utf-8") as f:
f.write(f"RAPPORT TICKET {ticket_id}\n{'='*40}\n\n")
f.write(rapport_final)
except Exception as e:
logger.error(f"Erreur lors du rapport : {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
logger.info(f"Traitement terminé pour le ticket {ticket_id}")
def _charger_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
if not json_path:
logger.warning("Aucun chemin JSON fourni")
return None
try:
return self.ticket_loader.charger(json_path)
except Exception as e:
logger.error(f"Erreur chargemnt ticket JSON : {e}")
return None
def _trouver_extractions(self, ticket_path: str, ticket_id: str) -> List[str]:
return sorted(
[os.path.join(ticket_path, d) for d in os.listdir(ticket_path)
if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)],
key=lambda x: os.path.getmtime(x),
reverse=True
)
def _lister_images(self, dossier: str) -> List[str]:
extensions = ['.png', '.jpeg', 'jpg', '.gif', '.webp', '.bmp']
images = []
for racine, _, fichiers in os.walk(dossier):
for f in fichiers:
if any(f.lower().endswith(ext) for ext in extensions):
images.append(os.path.join(racine, f))
return images