llm_ticket3/orchestrator.py.new
2025-04-06 16:51:34 +02:00

207 lines
11 KiB
Plaintext

import os
import json
from typing import List, Dict, Any, Optional
from agents.base_agent import BaseAgent
from datetime import datetime
class Orchestrator:
def __init__(self,
output_dir: str = "output/",
json_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None):
self.output_dir = output_dir
# Assignation directe des agents (qui peuvent être injectés lors des tests)
self.json_agent = json_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
# Métadonnées pour suivre l'utilisation des LLM
self.metadata = {
"ticket_id": None,
"timestamp_debut": datetime.now().strftime("%Y%m%d_%H%M%S"),
"json_agent": self._get_agent_info(json_agent),
"image_sorter": self._get_agent_info(image_sorter),
"image_analyser": self._get_agent_info(image_analyser),
"report_generator": self._get_agent_info(report_generator),
"etapes": []
}
def _get_agent_info(self, agent: Optional[BaseAgent]) -> Dict:
"""
Récupère les informations de base sur un agent.
"""
if not agent:
return {"status": "non configuré"}
info = {
"nom": agent.nom,
"model": getattr(agent.llm, "modele", str(type(agent.llm))),
}
if hasattr(agent, "config"):
info["configuration"] = agent.config.to_dict()
return info
def detecter_tickets(self) -> List[str]:
tickets = []
for ticket_dir in os.listdir(self.output_dir):
ticket_path = os.path.join(self.output_dir, ticket_dir)
if os.path.isdir(ticket_path) and ticket_dir.startswith("ticket_"):
tickets.append(ticket_path)
return tickets
def traiter_ticket(self, ticket_path: str):
# Extraire l'ID du ticket
ticket_id = os.path.basename(ticket_path)
self.metadata["ticket_id"] = ticket_id
self.metadata["timestamp_debut"] = datetime.now().strftime("%Y%m%d_%H%M%S")
for extraction in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extraction)
if os.path.isdir(extraction_path):
attachments_dir = os.path.join(extraction_path, "attachments")
rapport_json_path = os.path.join(extraction_path, f"{extraction.split('_')[0]}_rapport.json")
rapports_dir = os.path.join(extraction_path, f"{extraction.split('_')[0]}_rapports")
os.makedirs(rapports_dir, exist_ok=True)
if os.path.exists(rapport_json_path):
with open(rapport_json_path, 'r', encoding='utf-8') as file:
ticket_json = json.load(file)
# Analyse JSON
json_analysis = None
if self.json_agent:
print(f"Analyse du ticket JSON {ticket_id}...")
json_analysis = self.json_agent.executer(ticket_json)
# Capturer les métadonnées de l'exécution
if self.json_agent.historique:
latest_history = self.json_agent.historique[-1]
self.metadata["etapes"].append({
"agent": "json_agent",
"action": latest_history["action"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"input": str(latest_history["input"])[:100] + "...", # Limiter la taille
"output": str(latest_history["output"])[:100] + "...", # Limiter la taille
"metadata": latest_history["metadata"]
})
print(f" → Analyse JSON terminée")
# Tri et analyse des images
relevant_images = []
image_metadata = {}
if os.path.exists(attachments_dir):
print(f"Traitement des images dans {attachments_dir}...")
for attachment in os.listdir(attachments_dir):
attachment_path = os.path.join(attachments_dir, attachment)
if attachment.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
# Tri des images
is_relevant = False
if self.image_sorter:
print(f" Évaluation de la pertinence de l'image {attachment}...")
is_relevant = self.image_sorter.executer(attachment_path)
# Capturer les métadonnées
if self.image_sorter.historique:
latest_history = self.image_sorter.historique[-1]
image_metadata[attachment] = {
"tri": {
"result": is_relevant,
"metadata": latest_history["metadata"]
}
}
# Ajouter aux étapes
self.metadata["etapes"].append({
"agent": "image_sorter",
"image": attachment,
"action": latest_history["action"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"input": str(latest_history["input"])[:100] + "...",
"output": str(latest_history["output"]),
"metadata": latest_history["metadata"]
})
print(f" → Image {'pertinente' if is_relevant else 'non pertinente'}")
if is_relevant:
relevant_images.append(attachment_path)
# Analyse des images pertinentes
image_analysis_results = {}
if relevant_images:
print(f"Analyse des {len(relevant_images)} images pertinentes...")
for image_path in relevant_images:
image_name = os.path.basename(image_path)
if self.image_analyser and json_analysis:
print(f" Analyse de l'image {image_name}...")
analysis = self.image_analyser.executer(
image_path,
contexte=json_analysis
)
image_analysis_results[image_path] = analysis
# Capturer les métadonnées
if self.image_analyser.historique:
latest_history = self.image_analyser.historique[-1]
if image_name not in image_metadata:
image_metadata[image_name] = {}
image_metadata[image_name]["analyse"] = {
"metadata": latest_history["metadata"]
}
# Ajouter aux étapes
self.metadata["etapes"].append({
"agent": "image_analyser",
"image": image_name,
"action": latest_history["action"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"input": str(latest_history["input"])[:100] + "...",
"output": str(latest_history["output"])[:100] + "...",
"metadata": latest_history["metadata"]
})
print(f" → Analyse terminée")
# Préparation des données pour le rapport
rapport_data = {
"ticket_id": ticket_id,
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"analyse_json": json_analysis,
"analyse_images": image_analysis_results,
"metadata": self.metadata,
"images_metadata": image_metadata
}
# Génération du rapport
print(f"Génération du rapport pour le ticket {ticket_id}...")
if self.report_generator:
rapport_name = f"{ticket_id}_{extraction.split('_')[0]}"
json_path, md_path = self.report_generator.executer(
rapport_data,
os.path.join(rapports_dir, rapport_name)
)
# Capturer les métadonnées
if self.report_generator.historique:
latest_history = self.report_generator.historique[-1]
self.metadata["etapes"].append({
"agent": "report_generator",
"action": latest_history["action"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"input": rapport_name,
"output": f"JSON: {json_path}, MD: {md_path}",
"metadata": latest_history["metadata"]
})
print(f" → Rapports générés: JSON: {json_path}, Markdown: {md_path}")
print(f"Traitement du ticket {ticket_path} terminé.\n")
def executer(self):
tickets = self.detecter_tickets()
for ticket in tickets:
self.traiter_ticket(ticket)