llm_ticket3/orchestrator.py
2025-04-07 09:46:27 +02:00

159 lines
8.3 KiB
Python

import os
import json
import logging
from typing import List, Dict, Any, Optional
from agents.base_agent import BaseAgent
# Logging configuration: INFO-and-above records are written to
# 'orchestrator.log', opened in 'w' mode (previous content is overwritten).
logging.basicConfig(
    filename='orchestrator.log',
    filemode='w',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Named logger shared by everything in this module.
logger = logging.getLogger("Orchestrator")
class Orchestrator:
    """Run the configured agents over every extracted ticket found on disk.

    Expected layout, as read by this class::

        <output_dir>/ticket_*/<extraction>/<prefix>_rapport.json
        <output_dir>/ticket_*/<extraction>/attachments/
        <output_dir>/ticket_*/<extraction>/<prefix>_rapports/   (created here)

    where ``<prefix>`` is the part of the extraction directory name that
    precedes the first underscore.

    Every agent is optional and is only used through its ``executer`` method;
    a missing agent is logged and the corresponding step is skipped.
    """

    # File-name suffixes (compared case-insensitively) treated as images.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')

    def __init__(self,
                 output_dir: str = "output/",
                 json_agent: Optional["BaseAgent"] = None,
                 image_sorter: Optional["BaseAgent"] = None,
                 image_analyser: Optional["BaseAgent"] = None,
                 report_generator: Optional["BaseAgent"] = None):
        """Store the output directory and the (optional) agents.

        Args:
            output_dir: Directory scanned for ``ticket_*`` sub-directories.
            json_agent: Called as ``executer(ticket_json)`` on the loaded report.
            image_sorter: Called as ``executer(image_path)``; a truthy result
                marks the image as relevant.
            image_analyser: Called as ``executer(image_path, contexte=...)``
                with the JSON analysis as context.
            report_generator: Called as ``executer(rapport_data, rapport_path)``.
        """
        self.output_dir = output_dir

        # Agents are assigned directly so that they can be injected in tests.
        self.json_agent = json_agent
        self.image_sorter = image_sorter
        self.image_analyser = image_analyser
        self.report_generator = report_generator

        logger.info(f"Orchestrator initialisé avec output_dir: {output_dir}")
        logger.info(f"JSON Agent: {json_agent}")
        logger.info(f"Image Sorter: {image_sorter}")
        logger.info(f"Image Analyser: {image_analyser}")
        logger.info(f"Report Generator: {report_generator}")

    @staticmethod
    def _apercu(resultat: Any) -> str:
        """Return at most the first 100 characters of ``str(resultat)``.

        Agent results are only previewed for logging; converting to ``str``
        first keeps the preview from failing on non-sliceable results.
        """
        return str(resultat)[:100]

    def detecter_tickets(self) -> List[str]:
        """Return the paths of the ``ticket_*`` directories in ``output_dir``.

        Paths are returned sorted by directory name so that the processing
        order is reproducible. A missing (or non-directory) ``output_dir``
        yields an empty list instead of raising.
        """
        logger.info(f"Recherche de tickets dans: {self.output_dir}")

        try:
            # os.listdir gives no ordering guarantee, hence the sort.
            entries = sorted(os.listdir(self.output_dir))
        except (FileNotFoundError, NotADirectoryError):
            logger.warning(f"Répertoire de sortie introuvable: {self.output_dir}")
            entries = []

        tickets = []
        for ticket_dir in entries:
            ticket_path = os.path.join(self.output_dir, ticket_dir)
            if ticket_dir.startswith("ticket_") and os.path.isdir(ticket_path):
                tickets.append(ticket_path)

        logger.info(f"Tickets trouvés: {tickets}")
        print(f"Tickets détectés: {len(tickets)}")
        return tickets

    def traiter_ticket(self, ticket_path: str) -> None:
        """Process every extraction sub-directory of one ticket directory.

        Args:
            ticket_path: Path of a ticket directory; each sub-directory in it
                is treated as one extraction. Non-directory entries are
                ignored.
        """
        logger.info(f"Début du traitement du ticket: {ticket_path}")
        print(f"Traitement du ticket: {ticket_path}")

        extractions_trouvees = False
        for extraction in sorted(os.listdir(ticket_path)):
            extraction_path = os.path.join(ticket_path, extraction)
            if not os.path.isdir(extraction_path):
                continue
            extractions_trouvees = True
            self._traiter_extraction(ticket_path, extraction, extraction_path)

        if not extractions_trouvees:
            logger.warning(f"Aucune extraction trouvée dans le ticket: {ticket_path}")
            print(f" ERREUR: Aucune extraction trouvée dans le ticket: {ticket_path}")

    def _traiter_extraction(self, ticket_path: str, extraction: str,
                            extraction_path: str) -> None:
        """Run the JSON, image and report steps for a single extraction."""
        logger.info(f"Traitement de l'extraction: {extraction_path}")
        print(f" Traitement de l'extraction: {extraction}")

        # Report file and report directory are both named after the part of
        # the extraction directory name that precedes the first underscore.
        prefixe = extraction.split('_')[0]
        attachments_dir = os.path.join(extraction_path, "attachments")
        rapport_json_path = os.path.join(extraction_path, f"{prefixe}_rapport.json")
        rapports_dir = os.path.join(extraction_path, f"{prefixe}_rapports")

        rapport_existe = os.path.exists(rapport_json_path)
        logger.info(f"Vérification des chemins: rapport_json_path={rapport_json_path}, existe={rapport_existe}")
        print(f" Vérification du rapport JSON: {rapport_existe}")

        # The report directory is created even when the JSON report is absent.
        os.makedirs(rapports_dir, exist_ok=True)

        if not rapport_existe:
            logger.warning(f"Fichier rapport JSON non trouvé: {rapport_json_path}")
            print(f" ERREUR: Fichier rapport JSON non trouvé: {rapport_json_path}")
            return

        try:
            with open(rapport_json_path, 'r', encoding='utf-8') as file:
                ticket_json = json.load(file)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            # One unreadable report must not abort the remaining extractions.
            logger.error(f"Rapport JSON illisible: {rapport_json_path} ({exc})")
            print(f" ERREUR: Rapport JSON illisible: {rapport_json_path}")
            return
        logger.info(f"Rapport JSON chargé: {rapport_json_path}")

        json_analysis = None
        if self.json_agent:
            logger.info("Exécution de l'agent JSON")
            print(" Analyse du JSON en cours...")
            json_analysis = self.json_agent.executer(ticket_json)
            logger.info(f"Résultat de l'analyse JSON: {self._apercu(json_analysis)}...")
        else:
            logger.warning("Agent JSON non disponible")

        relevant_images = self._trier_images(attachments_dir)
        image_analysis_results = self._analyser_images(relevant_images, json_analysis)

        rapport_data = {
            "analyse_json": json_analysis,
            "analyse_images": image_analysis_results,
        }

        if self.report_generator:
            logger.info("Génération du rapport final")
            print(" Génération du rapport final")
            rapport_path = os.path.join(rapports_dir, prefixe)
            self.report_generator.executer(rapport_data, rapport_path)
            logger.info(f"Rapport généré à: {rapport_path}")
            print(f" Rapport généré à: {rapport_path}")
        else:
            logger.warning("Report Generator non disponible")

        print(f"Traitement du ticket {ticket_path} terminé.\n")
        logger.info(f"Traitement du ticket {ticket_path} terminé.")

    def _trier_images(self, attachments_dir: str) -> List[str]:
        """Return the image attachments that the image sorter marks relevant.

        Only files whose name ends with one of ``_IMAGE_EXTENSIONS`` are
        considered. Without an image sorter no image is kept.
        """
        if not os.path.isdir(attachments_dir):
            logger.warning(f"Répertoire des pièces jointes non trouvé: {attachments_dir}")
            print(f" Répertoire des pièces jointes non trouvé: {attachments_dir}")
            return []

        logger.info(f"Vérification des pièces jointes dans: {attachments_dir}")
        print(f" Vérification des pièces jointes: {attachments_dir}")

        relevant_images: List[str] = []
        images_count = 0
        for attachment in sorted(os.listdir(attachments_dir)):
            if not attachment.lower().endswith(self._IMAGE_EXTENSIONS):
                continue
            attachment_path = os.path.join(attachments_dir, attachment)
            images_count += 1
            logger.info(f"Image trouvée: {attachment_path}")

            if not self.image_sorter:
                logger.warning("Image Sorter non disponible")
                continue

            logger.info(f"Analyse de la pertinence de l'image: {attachment_path}")
            print(f" Analyse de la pertinence de l'image: {attachment}")
            is_relevant = self.image_sorter.executer(attachment_path)
            logger.info(f"Image pertinente: {is_relevant}")
            if is_relevant:
                relevant_images.append(attachment_path)

        logger.info(f"Images analysées: {images_count}, Images pertinentes: {len(relevant_images)}")
        print(f" Images analysées: {images_count}, Images pertinentes: {len(relevant_images)}")
        return relevant_images

    def _analyser_images(self, relevant_images: List[str],
                         json_analysis: Any) -> Dict[str, Any]:
        """Analyse each relevant image, using the JSON analysis as context.

        Returns a mapping ``image path -> analyser result``. The mapping is
        empty when there is no image analyser or no (truthy) JSON analysis.
        """
        image_analysis_results: Dict[str, Any] = {}

        if not (self.image_analyser and json_analysis):
            if relevant_images:
                logger.warning("Analyse des images ignorée: Image Analyser ou analyse JSON non disponible")
            return image_analysis_results

        for image_path in relevant_images:
            logger.info(f"Analyse de l'image: {image_path}")
            print(f" Analyse de l'image: {os.path.basename(image_path)}")
            resultat = self.image_analyser.executer(image_path, contexte=json_analysis)
            image_analysis_results[image_path] = resultat
            logger.info(f"Résultat de l'analyse d'image: {self._apercu(resultat)}...")

        return image_analysis_results

    def executer(self) -> None:
        """Detect every ticket under ``output_dir`` and process each in turn."""
        logger.info("Début de l'exécution de l'orchestrateur")
        print("Début de l'exécution de l'orchestrateur")

        tickets = self.detecter_tickets()
        if not tickets:
            logger.warning("Aucun ticket détecté dans le répertoire de sortie")
            print("ATTENTION: Aucun ticket détecté dans le répertoire de sortie")

        for ticket in tickets:
            self.traiter_ticket(ticket)

        logger.info("Fin de l'exécution de l'orchestrateur")
        print("Fin de l'exécution de l'orchestrateur")