# Mirror of https://github.com/Ladebeze66/llm_ticket3.git
# (synced 2025-12-16 00:06:53 +01:00; 94 lines, 4.3 KiB, Python)
import os
|
|
import json
|
|
import logging
|
|
import time
|
|
import traceback
|
|
from typing import List, Dict, Any, Optional
|
|
from agents.base_agent import BaseAgent
|
|
from loaders.ticket_data_loader import TicketDataLoader
|
|
from utils.image_dedup import filtrer_images_uniques
|
|
|
|
# Module-wide logging setup: INFO and above go to 'orchestrator.log'.
# filemode='w' opens the log file for writing rather than appending.
logging.basicConfig(
    filename='orchestrator.log',
    filemode='w',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Named logger used by the orchestrator code in this module.
logger = logging.getLogger("Orchestrator")
|
|
|
|
class Orchestrator:
    """Run the configured agents over the ticket directories of ``output_dir``.

    Every agent is optional; a step whose agent is ``None`` is skipped (the
    image-sorting step then treats every image as relevant).  Agents are only
    ever used through their ``executer`` method.
    """

    # Directory-name prefix identifying a ticket folder inside ``output_dir``.
    _TICKET_PREFIX = "ticket_"
    # File extensions (compared lower-cased) treated as images in "attachments".
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self,
                 output_dir: str = "output/",
                 ticket_agent: Optional["BaseAgent"] = None,
                 image_sorter: Optional["BaseAgent"] = None,
                 image_analyser: Optional["BaseAgent"] = None,
                 report_generator: Optional["BaseAgent"] = None):
        """Store the output directory and the agents, and create the loader.

        Args:
            output_dir: Directory containing the ``ticket_*`` folders.
            ticket_agent: Agent called with the loaded ticket data.
            image_sorter: Agent called with each image path to decide whether
                the image is relevant.
            image_analyser: Agent called with each relevant image path and the
                ticket analysis as ``contexte``.
            report_generator: Agent called with the aggregated data and the
                destination directory.
        """
        self.output_dir = output_dir
        self.ticket_agent = ticket_agent
        self.image_sorter = image_sorter
        self.image_analyser = image_analyser
        self.report_generator = report_generator
        self.ticket_loader = TicketDataLoader()

    def trouver_rapport(self, extraction_path: str, ticket_id: str) -> Optional[str]:
        """Return what the ticket loader finds for ``ticket_id`` in ``extraction_path``.

        Thin delegation to ``TicketDataLoader.trouver_ticket``.
        """
        return self.ticket_loader.trouver_ticket(extraction_path, ticket_id)

    def _preparer_donnees_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
        """Load the ticket data at ``json_path`` through the ticket loader.

        Returns:
            The loaded data, or ``None`` when ``json_path`` is empty or the
            loader raises (the error is logged, not propagated).
        """
        if not json_path:
            return None
        try:
            return self.ticket_loader.charger(json_path)
        except Exception as e:
            # Deliberate best-effort: a ticket that cannot be loaded is skipped
            # by the caller instead of aborting the whole run.
            logger.error("Erreur chargement JSON: %s", e)
            return None

    def executer(self, ticket_specifique: Optional[str] = None):
        """Process one ticket, or every detected ticket.

        Args:
            ticket_specifique: Ticket id without the ``ticket_`` prefix.  When
                falsy, every ``ticket_*`` directory of ``output_dir`` is
                processed.
        """
        if ticket_specifique:
            tickets = [f"{self._TICKET_PREFIX}{ticket_specifique}"]
        else:
            tickets = self._detecter_tickets()

        if not tickets:
            logger.warning("Aucun ticket à traiter dans %s", self.output_dir)

        for ticket in tickets:
            self.traiter_ticket(os.path.join(self.output_dir, ticket))

    def traiter_ticket(self, ticket_path: str) -> None:
        """Run the pipeline on every extraction sub-directory of a ticket.

        A missing ticket directory is logged and skipped instead of raising.

        Args:
            ticket_path: Path of a ``ticket_<id>`` directory.
        """
        if not os.path.isdir(ticket_path):
            logger.error("Répertoire de ticket introuvable: %s", ticket_path)
            return

        ticket_id = self._extraire_ticket_id(ticket_path)

        # Sorted so that the processing order does not depend on the
        # filesystem's listing order (all extractions of a ticket share the
        # same report destination directory).
        for extraction in sorted(os.listdir(ticket_path)):
            extraction_path = os.path.join(ticket_path, extraction)
            if not os.path.isdir(extraction_path):
                continue
            self._traiter_extraction(extraction_path, ticket_id)

    @classmethod
    def _extraire_ticket_id(cls, ticket_path: str) -> str:
        """Return the ticket id encoded in the last component of ``ticket_path``.

        The path is normalised first so that a trailing separator does not
        produce an empty id, and only the leading ``ticket_`` prefix is removed.
        """
        nom = os.path.basename(os.path.normpath(ticket_path))
        if nom.startswith(cls._TICKET_PREFIX):
            return nom[len(cls._TICKET_PREFIX):]
        return nom

    def _traiter_extraction(self, extraction_path: str, ticket_id: str) -> None:
        """Analyse one extraction directory and hand the result to the report agent."""
        attachments_dir = os.path.join(extraction_path, "attachments")
        rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
        os.makedirs(rapports_dir, exist_ok=True)

        json_path = self.trouver_rapport(extraction_path, ticket_id)
        ticket_data = self._preparer_donnees_ticket(json_path)
        if not ticket_data:
            return

        ticket_analysis = self.ticket_agent.executer(ticket_data) if self.ticket_agent else None
        images_analyses = self._analyser_images(attachments_dir, ticket_analysis)

        if not self.report_generator:
            return

        rapport_data = {
            "ticket_id": ticket_id,
            "ticket_data": ticket_data,
            "ticket_analyse": ticket_analysis,
            "analyse_images": images_analyses,
        }
        # Reports are written next to this module's parent: <module dir>/../reports/<ticket_id>.
        reports_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../reports"))
        dest_dir = os.path.join(reports_root, ticket_id)
        os.makedirs(dest_dir, exist_ok=True)
        self.report_generator.executer(rapport_data, dest_dir)

    def _analyser_images(self, attachments_dir: str, ticket_analysis: Any) -> Dict[str, Dict[str, Any]]:
        """Sort, then analyse, the images found in ``attachments_dir``.

        Returns:
            A dict mapping each de-duplicated image path to
            ``{"sorting": <sorter result>, "analysis": <analyser result or None>}``.
            Empty when ``attachments_dir`` is not a directory.
        """
        images_analyses: Dict[str, Dict[str, Any]] = {}
        # isdir (not exists): a regular file named "attachments" must not reach listdir.
        if not os.path.isdir(attachments_dir):
            return images_analyses

        images = sorted(
            f for f in os.listdir(attachments_dir)
            if f.lower().endswith(self._IMAGE_EXTENSIONS)
        )
        image_paths = filtrer_images_uniques([os.path.join(attachments_dir, img) for img in images])

        # Phase 1: sort every image before any analysis starts.
        relevant_images = []
        for path in image_paths:
            if self.image_sorter:
                result_tri = self.image_sorter.executer(path)
            else:
                result_tri = {"is_relevant": True, "reason": "pas de tri"}

            if isinstance(result_tri, dict):
                is_rel = result_tri.get("is_relevant", True)
            else:
                # Same default as a dict without "is_relevant": keep the image.
                logger.warning("Résultat de tri inattendu pour %s: %r", path, result_tri)
                is_rel = True

            images_analyses[path] = {"sorting": result_tri, "analysis": None}
            if is_rel:
                relevant_images.append(path)

        # Phase 2: analyse only the images kept by the sorter.
        for path in relevant_images:
            result = self.image_analyser.executer(path, contexte=ticket_analysis) if self.image_analyser else None
            if result:
                images_analyses[path]["analysis"] = result

        return images_analyses

    def _detecter_tickets(self) -> List[str]:
        """Return the sorted names of the ``ticket_*`` directories in ``output_dir``.

        Returns an empty list when ``output_dir`` is not an existing directory.
        """
        if not os.path.isdir(self.output_dir):
            return []
        return sorted(
            d for d in os.listdir(self.output_dir)
            if d.startswith(self._TICKET_PREFIX)
            and os.path.isdir(os.path.join(self.output_dir, d))
        )