16:51atesté

This commit is contained in:
Ladebeze66 2025-04-06 16:51:34 +02:00
parent b07a6512ea
commit ffcadc6896
11 changed files with 740 additions and 17 deletions

View File

@ -6,7 +6,7 @@ class AgentImageAnalyser(BaseAgent):
Agent pour analyser les images et extraire les informations pertinentes.
"""
def __init__(self, llm):
super().__init__("AgentImageAnalyser", llm)
super().__init__("AgentImageAnalyser", llm, "image_analyser")
def executer(self, image_description: str, contexte: str) -> str:
prompt = f"Analyse cette image en tenant compte du contexte suivant : {contexte}. Description de l'image : {image_description}"

View File

@ -6,7 +6,7 @@ class AgentImageSorter(BaseAgent):
Agent pour trier les images en fonction de leur contenu.
"""
def __init__(self, llm):
super().__init__("AgentImageSorter", llm)
super().__init__("AgentImageSorter", llm, "image_sorter")
def executer(self, image_description: str) -> bool:
prompt = f"L'image suivante est-elle pertinente pour BRG_Lab ? Description : {image_description}"

View File

@ -6,7 +6,7 @@ class AgentJsonAnalyser(BaseAgent):
Agent pour analyser les fichiers JSON et extraire les informations pertinentes.
"""
def __init__(self, llm):
super().__init__("AgentJsonAnalyser", llm)
super().__init__("AgentJsonAnalyser", llm, "json_analyser")
def executer(self, ticket_json: Dict) -> str:
prompt = f"Analyse ce ticket JSON et identifie les éléments importants : {ticket_json}"

View File

@ -1,28 +1,174 @@
import json
import os
from .base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any
from typing import Dict, Any, Tuple, List
class AgentReportGenerator(BaseAgent):
"""
Agent pour générer un rapport à partir des informations collectées.
"""
def __init__(self, llm):
super().__init__("AgentReportGenerator", llm)
super().__init__("AgentReportGenerator", llm, "report_generator")
def executer(self, rapport_data: Dict, filename: str):
def executer(self, rapport_data: Dict, filename: str) -> Tuple[str, str]:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
#Sauvegarde json
json_path = f"../reports/json_reports/{filename}_{timestamp}.json"
# Ajouter les métadonnées des LLM utilisés
if "metadata" not in rapport_data:
rapport_data["metadata"] = {}
rapport_data["metadata"]["report_generator"] = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"configuration": self.config.to_dict()
}
# Créer les dossiers si nécessaire
reports_dir = "../reports"
json_dir = os.path.join(reports_dir, "json_reports")
md_dir = os.path.join(reports_dir, "markdown_reports")
os.makedirs(json_dir, exist_ok=True)
os.makedirs(md_dir, exist_ok=True)
# Sauvegarde JSON
json_path = f"{json_dir}/{filename}_{timestamp}.json"
with open(json_path, "w", encoding="utf-8") as f_json:
json.dump(rapport_data, f_json, ensure_ascii=False, indent=4)
#Sauvegarde Markdown
md_path =f"../repports/markdown_reports/{filename}_{timestamp}.md"
# Sauvegarde Markdown
md_path = f"{md_dir}/{filename}_{timestamp}.md"
with open(md_path, "w", encoding="utf-8") as f_md:
f_md.write(f"# Rapport {filename}\n\n")
for key, value in rapport_data.items():
f_md.write(f"## {key.capitalize()}\n{value}\n\n")
# En-tête du rapport
ticket_id = rapport_data.get("ticket_id", filename)
f_md.write(f"# Rapport d'Analyse du Ticket {ticket_id}\n\n")
f_md.write(f"*Généré le: {datetime.now().strftime('%d/%m/%Y à %H:%M:%S')}*\n\n")
# Résumé
metadata = rapport_data.get("metadata", {})
etapes = metadata.get("etapes", [])
f_md.write("## Résumé\n\n")
f_md.write(f"- **ID Ticket**: {ticket_id}\n")
f_md.write(f"- **Date d'analyse**: {metadata.get('timestamp_debut', timestamp)}\n")
f_md.write(f"- **Nombre d'étapes**: {len(etapes)}\n\n")
# Vue d'ensemble des agents et modèles utilisés
f_md.write("## Modèles et Paramètres Utilisés\n\n")
f_md.write("### Vue d'ensemble\n\n")
f_md.write("| Agent | Modèle | Température | Top-P | Max Tokens |\n")
f_md.write("|-------|--------|-------------|-------|------------|\n")
for agent_name in ["json_agent", "image_sorter", "image_analyser", "report_generator"]:
agent_info = metadata.get(agent_name, {})
if agent_info.get("status") == "non configuré":
continue
model = agent_info.get("model", "N/A")
config = agent_info.get("configuration", {}).get("config", {})
temp = config.get("temperature", "N/A")
top_p = config.get("top_p", "N/A")
max_tokens = config.get("max_tokens", "N/A")
self.ajouter_historique("generation_rapport", filename, "Rapport généré")
f_md.write(f"| {agent_name} | {model} | {temp} | {top_p} | {max_tokens} |\n")
f_md.write("\n")
# Détails des paramètres par agent
f_md.write("### Détails des Paramètres\n\n")
f_md.write("```json\n")
agents_config = {}
for agent_name in ["json_agent", "image_sorter", "image_analyser", "report_generator"]:
if agent_name in metadata and "configuration" in metadata[agent_name]:
agents_config[agent_name] = metadata[agent_name]["configuration"]
f_md.write(json.dumps(agents_config, indent=2))
f_md.write("\n```\n\n")
# Détails des prompts système
f_md.write("### Prompts Système\n\n")
for agent_name in ["json_agent", "image_sorter", "image_analyser", "report_generator"]:
agent_info = metadata.get(agent_name, {})
if agent_info.get("status") == "non configuré":
continue
config = agent_info.get("configuration", {}).get("config", {})
prompt = config.get("system_prompt", "")
if prompt:
f_md.write(f"**{agent_name}**:\n")
f_md.write("```\n")
f_md.write(prompt)
f_md.write("\n```\n\n")
# Étapes d'analyse
f_md.write("## Étapes d'Analyse\n\n")
for i, etape in enumerate(etapes, 1):
agent = etape.get("agent", "")
action = etape.get("action", "")
timestamp = etape.get("timestamp", "")
image = etape.get("image", "")
title = f"### {i}. {agent.replace('_', ' ').title()}"
if image:
title += f" - Image: {image}"
f_md.write(f"{title}\n\n")
f_md.write(f"- **Action**: {action}\n")
f_md.write(f"- **Timestamp**: {timestamp}\n")
# Métadonnées du modèle pour cette étape
etape_metadata = etape.get("metadata", {})
model = etape_metadata.get("model", "")
config = etape_metadata.get("configuration", {}).get("config", {})
duree = etape_metadata.get("duree_traitement", "")
f_md.write(f"- **Modèle**: {model}\n")
if duree:
f_md.write(f"- **Durée de traitement**: {duree}\n")
# Paramètres spécifiques pour cette exécution
if config:
f_md.write("- **Paramètres**:\n")
f_md.write("```json\n")
params = {k: v for k, v in config.items() if k != "system_prompt"}
f_md.write(json.dumps(params, indent=2))
f_md.write("\n```\n")
# Input/Output (limités en taille)
input_data = etape.get("input", "")
output_data = etape.get("output", "")
if input_data:
f_md.write("- **Entrée**:\n")
f_md.write("```\n")
f_md.write(str(input_data)[:300] + ("..." if len(str(input_data)) > 300 else ""))
f_md.write("\n```\n")
if output_data:
f_md.write("- **Sortie**:\n")
f_md.write("```\n")
f_md.write(str(output_data)[:300] + ("..." if len(str(output_data)) > 300 else ""))
f_md.write("\n```\n")
f_md.write("\n")
# Résultats des analyses
if "analyse_json" in rapport_data:
f_md.write("## Résultat de l'Analyse JSON\n\n")
f_md.write("```\n")
f_md.write(str(rapport_data["analyse_json"]))
f_md.write("\n```\n\n")
# Analyses des images
if "analyse_images" in rapport_data and rapport_data["analyse_images"]:
f_md.write("## Analyses des Images\n\n")
for image_path, analysis in rapport_data["analyse_images"].items():
image_name = os.path.basename(image_path)
f_md.write(f"### Image: {image_name}\n\n")
f_md.write("```\n")
f_md.write(str(analysis))
f_md.write("\n```\n\n")
# Informations supplémentaires
f_md.write("## Informations Supplémentaires\n\n")
f_md.write(f"Pour plus de détails, consultez le fichier JSON complet: `{os.path.basename(json_path)}`\n\n")
self.ajouter_historique("generation_rapport", filename, f"Rapport généré: {md_path}")
return json_path, md_path

View File

@ -0,0 +1,48 @@
import json
from .base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple
class AgentReportGenerator(BaseAgent):
"""
Agent pour générer un rapport à partir des informations collectées.
"""
def __init__(self, llm):
super().__init__("AgentReportGenerator", llm, "report_generator")
def executer(self, rapport_data: Dict, filename: str) -> Tuple[str, str]:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Ajouter les métadonnées des LLM utilisés
if "metadata" not in rapport_data:
rapport_data["metadata"] = {}
rapport_data["metadata"]["report_generator"] = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"configuration": self.config.to_dict()
}
# Sauvegarde JSON
json_path = f"../reports/json_reports/{filename}_{timestamp}.json"
with open(json_path, "w", encoding="utf-8") as f_json:
json.dump(rapport_data, f_json, ensure_ascii=False, indent=4)
# Sauvegarde Markdown
md_path = f"../reports/markdown_reports/{filename}_{timestamp}.md"
with open(md_path, "w", encoding="utf-8") as f_md:
f_md.write(f"# Rapport {filename}\n\n")
# Ajouter les métadonnées des modèles utilisés
if "metadata" in rapport_data:
f_md.write("## Modèles et paramètres utilisés\n\n")
f_md.write("```json\n")
f_md.write(json.dumps(rapport_data["metadata"], indent=2))
f_md.write("\n```\n\n")
# Ajouter le contenu du rapport
for key, value in rapport_data.items():
if key != "metadata": # Ignorer la section metadata déjà traitée
f_md.write(f"## {key.capitalize()}\n{value}\n\n")
self.ajouter_historique("generation_rapport", filename, "Rapport généré")
return json_path, md_path

View File

@ -1,20 +1,67 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
from .utils.agent_config import AgentConfig
class BaseAgent(ABC):
"""
Classe de base pour les agents.
"""
def __init__(self, nom: str, llm: Any):
def __init__(self, nom: str, llm: Any, role: Optional[str] = None):
self.nom = nom
self.llm = llm
self.historique: List[Dict[str, Any]] = []
# Détecter le type de modèle
model_type = self._detecter_model_type()
# Définir le rôle par défaut si non spécifié
agent_role = role if role is not None else nom.lower().replace("agent", "").strip()
# Créer la configuration d'agent
self.config = AgentConfig(agent_role, model_type)
# Appliquer les paramètres au LLM
self._appliquer_config()
def _detecter_model_type(self) -> str:
"""
Détecte le type de modèle LLM.
"""
llm_class_name = self.llm.__class__.__name__.lower()
if "mistral" in llm_class_name:
return "mistral"
elif "pixtral" in llm_class_name:
return "pixtral"
elif "ollama" in llm_class_name:
return "ollama"
else:
return "generic"
def _appliquer_config(self) -> None:
"""
Applique la configuration au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.config.get_system_prompt()
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
self.llm.configurer(**self.config.get_params())
def ajouter_historique(self, action: str, input_data: Any, output_data: Any):
# Ajouter les informations sur le modèle et les paramètres utilisés
metadata = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"configuration": self.config.to_dict(),
"duree_traitement": str(getattr(self.llm, "dureeTraitement", "N/A"))
}
self.historique.append({
"action": action,
"input": input_data,
"output": output_data
"output": output_data,
"metadata": metadata
})
@abstractmethod

3
agents/utils/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .agent_config import AgentConfig
__all__ = ['AgentConfig']

View File

@ -0,0 +1,114 @@
from typing import Dict, Any, Optional
class AgentConfig:
"""
Classe pour configurer les paramètres LLM par rôle d'agent.
Cette classe permet d'harmoniser les paramètres entre différents LLM.
"""
# Configurations par défaut pour chaque rôle d'agent
DEFAULT_CONFIGS = {
"json_analyser": {
"temperature": 0.2, # Besoin d'analyse précise
"top_p": 0.9,
"max_tokens": 1500,
"system_prompt": "Tu es un assistant spécialisé dans l'analyse de tickets JSON. Extrais les informations pertinentes et structure ta réponse de manière claire.",
},
"image_sorter": {
"temperature": 0.3, # Décision de classification binaire
"top_p": 0.9,
"max_tokens": 200,
"system_prompt": "Tu es un assistant spécialisé dans le tri d'images. Tu dois décider si une image est pertinente ou non pour BRG_Lab.",
},
"image_analyser": {
"temperature": 0.5, # Analyse créative mais factuelle
"top_p": 0.95,
"max_tokens": 1000,
"system_prompt": "Tu es un assistant spécialisé dans l'analyse d'images. Décris ce que tu vois en tenant compte du contexte fourni.",
},
"report_generator": {
"temperature": 0.7, # Génération de rapport plus créative
"top_p": 1.0,
"max_tokens": 2000,
"system_prompt": "Tu es un assistant spécialisé dans la génération de rapports. Synthétise les informations fournies de manière claire et professionnelle.",
}
}
# Paramètres spécifiques à chaque type de modèle pour harmoniser les performances
MODEL_ADJUSTMENTS = {
"mistral": {}, # Pas d'ajustement nécessaire, modèle de référence
"pixtral": {
"temperature": -0.1, # Légèrement plus conservateur
},
"ollama": {
"temperature": +0.1, # Légèrement plus créatif
"num_ctx": 2048,
"repeat_penalty": 1.1,
}
}
def __init__(self, role: str, model_type: str = "mistral"):
"""
Initialise la configuration avec les paramètres appropriés pour le rôle et le modèle.
Args:
role: Rôle de l'agent ('json_analyser', 'image_sorter', etc.)
model_type: Type de modèle ('mistral', 'pixtral', 'ollama', etc.)
"""
self.role = role
self.model_type = model_type.lower()
# Récupérer la configuration de base pour ce rôle
if role in self.DEFAULT_CONFIGS:
self.config = self.DEFAULT_CONFIGS[role].copy()
else:
# Configuration par défaut
self.config = {
"temperature": 0.5,
"top_p": 0.9,
"max_tokens": 1000,
"system_prompt": ""
}
# Appliquer les ajustements spécifiques au modèle
if model_type.lower() in self.MODEL_ADJUSTMENTS:
adjustments = self.MODEL_ADJUSTMENTS[model_type.lower()]
for key, adjustment in adjustments.items():
if key in self.config:
if isinstance(adjustment, (int, float)) and isinstance(self.config[key], (int, float)):
self.config[key] += adjustment # Ajustement relatif
else:
self.config[key] = adjustment # Remplacement direct
else:
self.config[key] = adjustment # Ajout d'un nouveau paramètre
def get_params(self) -> Dict[str, Any]:
"""
Retourne tous les paramètres sauf le prompt système.
"""
params = self.config.copy()
if "system_prompt" in params:
params.pop("system_prompt")
return params
def get_system_prompt(self) -> str:
"""
Retourne le prompt système.
"""
return self.config.get("system_prompt", "")
def to_dict(self) -> Dict[str, Any]:
"""
Retourne toute la configuration sous forme de dictionnaire.
"""
return {
"role": self.role,
"model_type": self.model_type,
"config": self.config
}
def update(self, **kwargs) -> None:
"""
Met à jour la configuration avec les nouveaux paramètres.
"""
self.config.update(kwargs)

75
orchestrator.py.backup Normal file
View File

@ -0,0 +1,75 @@
import os
import json
from typing import List, Dict, Any, Optional
from agents.base_agent import BaseAgent
class Orchestrator:
def __init__(self,
output_dir: str = "output/",
json_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None):
self.output_dir = output_dir
# Assignation directe des agents (qui peuvent être injectés lors des tests)
self.json_agent = json_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
def detecter_tickets(self) -> List[str]:
tickets = []
for ticket_dir in os.listdir(self.output_dir):
ticket_path = os.path.join(self.output_dir, ticket_dir)
if os.path.isdir(ticket_path) and ticket_dir.startswith("ticket_"):
tickets.append(ticket_path)
return tickets
def traiter_ticket(self, ticket_path: str):
for extraction in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extraction)
if os.path.isdir(extraction_path):
attachments_dir = os.path.join(extraction_path, "attachments")
rapport_json_path = os.path.join(extraction_path, f"{extraction.split('_')[0]}_rapport.json")
rapports_dir = os.path.join(extraction_path, f"{extraction.split('_')[0]}_rapports")
os.makedirs(rapports_dir, exist_ok=True)
if os.path.exists(rapport_json_path):
with open(rapport_json_path, 'r', encoding='utf-8') as file:
ticket_json = json.load(file)
json_analysis = self.json_agent.executer(ticket_json) if self.json_agent else None
relevant_images = []
if os.path.exists(attachments_dir):
for attachment in os.listdir(attachments_dir):
attachment_path = os.path.join(attachments_dir, attachment)
if attachment.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
is_relevant = self.image_sorter.executer(attachment_path) if self.image_sorter else False
if is_relevant:
relevant_images.append(attachment_path)
image_analysis_results = {}
for image_path in relevant_images:
if self.image_analyser and json_analysis:
image_analysis_results[image_path] = self.image_analyser.executer(
image_path,
contexte=json_analysis
)
rapport_data = {
"analyse_json": json_analysis,
"analyse_images": image_analysis_results
}
if self.report_generator:
self.report_generator.executer(rapport_data, os.path.join(rapports_dir, extraction.split('_')[0]))
print(f"Traitement du ticket {ticket_path} terminé.\n")
def executer(self):
tickets = self.detecter_tickets()
for ticket in tickets:
self.traiter_ticket(ticket)

206
orchestrator.py.new Normal file
View File

@ -0,0 +1,206 @@
import os
import json
from typing import List, Dict, Any, Optional
from agents.base_agent import BaseAgent
from datetime import datetime
class Orchestrator:
def __init__(self,
output_dir: str = "output/",
json_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None):
self.output_dir = output_dir
# Assignation directe des agents (qui peuvent être injectés lors des tests)
self.json_agent = json_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
# Métadonnées pour suivre l'utilisation des LLM
self.metadata = {
"ticket_id": None,
"timestamp_debut": datetime.now().strftime("%Y%m%d_%H%M%S"),
"json_agent": self._get_agent_info(json_agent),
"image_sorter": self._get_agent_info(image_sorter),
"image_analyser": self._get_agent_info(image_analyser),
"report_generator": self._get_agent_info(report_generator),
"etapes": []
}
def _get_agent_info(self, agent: Optional[BaseAgent]) -> Dict:
"""
Récupère les informations de base sur un agent.
"""
if not agent:
return {"status": "non configuré"}
info = {
"nom": agent.nom,
"model": getattr(agent.llm, "modele", str(type(agent.llm))),
}
if hasattr(agent, "config"):
info["configuration"] = agent.config.to_dict()
return info
def detecter_tickets(self) -> List[str]:
tickets = []
for ticket_dir in os.listdir(self.output_dir):
ticket_path = os.path.join(self.output_dir, ticket_dir)
if os.path.isdir(ticket_path) and ticket_dir.startswith("ticket_"):
tickets.append(ticket_path)
return tickets
def traiter_ticket(self, ticket_path: str):
# Extraire l'ID du ticket
ticket_id = os.path.basename(ticket_path)
self.metadata["ticket_id"] = ticket_id
self.metadata["timestamp_debut"] = datetime.now().strftime("%Y%m%d_%H%M%S")
for extraction in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extraction)
if os.path.isdir(extraction_path):
attachments_dir = os.path.join(extraction_path, "attachments")
rapport_json_path = os.path.join(extraction_path, f"{extraction.split('_')[0]}_rapport.json")
rapports_dir = os.path.join(extraction_path, f"{extraction.split('_')[0]}_rapports")
os.makedirs(rapports_dir, exist_ok=True)
if os.path.exists(rapport_json_path):
with open(rapport_json_path, 'r', encoding='utf-8') as file:
ticket_json = json.load(file)
# Analyse JSON
json_analysis = None
if self.json_agent:
print(f"Analyse du ticket JSON {ticket_id}...")
json_analysis = self.json_agent.executer(ticket_json)
# Capturer les métadonnées de l'exécution
if self.json_agent.historique:
latest_history = self.json_agent.historique[-1]
self.metadata["etapes"].append({
"agent": "json_agent",
"action": latest_history["action"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"input": str(latest_history["input"])[:100] + "...", # Limiter la taille
"output": str(latest_history["output"])[:100] + "...", # Limiter la taille
"metadata": latest_history["metadata"]
})
print(f" → Analyse JSON terminée")
# Tri et analyse des images
relevant_images = []
image_metadata = {}
if os.path.exists(attachments_dir):
print(f"Traitement des images dans {attachments_dir}...")
for attachment in os.listdir(attachments_dir):
attachment_path = os.path.join(attachments_dir, attachment)
if attachment.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
# Tri des images
is_relevant = False
if self.image_sorter:
print(f" Évaluation de la pertinence de l'image {attachment}...")
is_relevant = self.image_sorter.executer(attachment_path)
# Capturer les métadonnées
if self.image_sorter.historique:
latest_history = self.image_sorter.historique[-1]
image_metadata[attachment] = {
"tri": {
"result": is_relevant,
"metadata": latest_history["metadata"]
}
}
# Ajouter aux étapes
self.metadata["etapes"].append({
"agent": "image_sorter",
"image": attachment,
"action": latest_history["action"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"input": str(latest_history["input"])[:100] + "...",
"output": str(latest_history["output"]),
"metadata": latest_history["metadata"]
})
print(f" → Image {'pertinente' if is_relevant else 'non pertinente'}")
if is_relevant:
relevant_images.append(attachment_path)
# Analyse des images pertinentes
image_analysis_results = {}
if relevant_images:
print(f"Analyse des {len(relevant_images)} images pertinentes...")
for image_path in relevant_images:
image_name = os.path.basename(image_path)
if self.image_analyser and json_analysis:
print(f" Analyse de l'image {image_name}...")
analysis = self.image_analyser.executer(
image_path,
contexte=json_analysis
)
image_analysis_results[image_path] = analysis
# Capturer les métadonnées
if self.image_analyser.historique:
latest_history = self.image_analyser.historique[-1]
if image_name not in image_metadata:
image_metadata[image_name] = {}
image_metadata[image_name]["analyse"] = {
"metadata": latest_history["metadata"]
}
# Ajouter aux étapes
self.metadata["etapes"].append({
"agent": "image_analyser",
"image": image_name,
"action": latest_history["action"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"input": str(latest_history["input"])[:100] + "...",
"output": str(latest_history["output"])[:100] + "...",
"metadata": latest_history["metadata"]
})
print(f" → Analyse terminée")
# Préparation des données pour le rapport
rapport_data = {
"ticket_id": ticket_id,
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"analyse_json": json_analysis,
"analyse_images": image_analysis_results,
"metadata": self.metadata,
"images_metadata": image_metadata
}
# Génération du rapport
print(f"Génération du rapport pour le ticket {ticket_id}...")
if self.report_generator:
rapport_name = f"{ticket_id}_{extraction.split('_')[0]}"
json_path, md_path = self.report_generator.executer(
rapport_data,
os.path.join(rapports_dir, rapport_name)
)
# Capturer les métadonnées
if self.report_generator.historique:
latest_history = self.report_generator.historique[-1]
self.metadata["etapes"].append({
"agent": "report_generator",
"action": latest_history["action"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"input": rapport_name,
"output": f"JSON: {json_path}, MD: {md_path}",
"metadata": latest_history["metadata"]
})
print(f" → Rapports générés: JSON: {json_path}, Markdown: {md_path}")
print(f"Traitement du ticket {ticket_path} terminé.\n")
def executer(self):
tickets = self.detecter_tickets()
for ticket in tickets:
self.traiter_ticket(ticket)

View File

@ -0,0 +1,84 @@
import json
import os
from agents.agent_json_analyser import AgentJsonAnalyser
from agents.agent_image_sorter import AgentImageSorter
from agents.agent_image_analyser import AgentImageAnalyser
from agents.agent_report_generator import AgentReportGenerator
# Importer les classes LLM
from llm_classes.mistral_large import MistralLarge
from llm_classes.pixtral_12b import Pixtral12b
from llm_classes.ollama import Ollama
def test_different_models():
"""
Tester les performances de différents modèles LLM avec le même agent.
"""
# Données de test pour le JSON
test_json = {
"id": "123456",
"title": "Problème de connexion au serveur",
"description": "Depuis ce matin, impossible de se connecter au serveur principal. Erreur 500.",
"priority": "high",
"tags": ["connexion", "serveur", "erreur"],
"history": [
{"date": "2023-04-01", "action": "Création du ticket"},
{"date": "2023-04-02", "action": "Première tentative de résolution"}
]
}
# Répertoire pour les rapports
os.makedirs("reports/json_reports", exist_ok=True)
os.makedirs("reports/markdown_reports", exist_ok=True)
# Liste des modèles à tester
models = {
"mistral": MistralLarge(),
"pixtral": Pixtral12b(),
"ollama": Ollama("llama2") # Spécifier le modèle pour Ollama
}
# Test de chaque modèle pour l'analyse JSON
results = {}
for model_name, model in models.items():
print(f"Test avec le modèle {model_name}...")
# Créer l'agent avec ce modèle
json_agent = AgentJsonAnalyser(model)
# Tester les paramètres appliqués
print(f" Paramètres: {json_agent.config.get_params()}")
print(f" Prompt système: {json_agent.config.get_system_prompt()[:50]}...")
# Exécuter le test
try:
result = json_agent.executer(test_json)
success = True
except Exception as e:
result = str(e)
success = False
# Enregistrer le résultat
results[model_name] = {
"result": result,
"success": success,
"metadata": json_agent.historique[-1]["metadata"] if json_agent.historique else None
}
print(f" Succès: {success}")
print()
# Générer un rapport comparatif
print("Génération du rapport comparatif...")
report_generator = AgentReportGenerator(MistralLarge())
json_path, md_path = report_generator.executer(
{"resultats_comparatifs": results},
"comparaison_modeles"
)
print(f"Rapport généré avec succès!")
print(f"JSON: {json_path}")
print(f"Markdown: {md_path}")
if __name__ == "__main__":
test_different_models()