#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Gestionnaire de flux de travail pour coordonner les agents LLM """ import os import time from typing import Dict, Any, Optional, List, Tuple, Callable, Type, Union from agents.ocr import OCRAgent from agents.vision import VisionAgent from agents.translation import TranslationAgent from agents.summary import SummaryAgent from agents.rewriter import RewriterAgent from agents.base import LLMBaseAgent from config.agent_config import ( is_agent_enabled, get_default_model, DEFAULT_ENDPOINT, VERBOSE_LOGGING, ACTIVE_AGENTS ) from utils.workflow_logger import WorkflowLogger class WorkflowManager: """ Gestionnaire de flux de travail pour coordonner les agents """ def __init__(self, vision_model: Optional[str] = None, summary_model: Optional[str] = None, translation_model: Optional[str] = None, active_agents: Optional[Dict[str, bool]] = None, config: Optional[Dict[str, Any]] = None): """ Initialise le gestionnaire de flux de travail Args: vision_model (str, optional): Modèle à utiliser pour l'agent vision summary_model (str, optional): Modèle à utiliser pour l'agent résumé translation_model (str, optional): Modèle à utiliser pour l'agent traduction active_agents (Dict[str, bool], optional): Dictionnaire des agents actifs config (Dict[str, Any], optional): Configuration supplémentaire """ # Créer un ID unique pour cette session self.session_id = f"session_{int(time.time())}" # Initialiser le logger de flux self.logger = WorkflowLogger(self.session_id) # Stocker les modèles spécifiés self.model_overrides = { "vision": vision_model, "summary": summary_model, "translation": translation_model } # Stocker la configuration self.config = config or {} # Utiliser les agents actifs spécifiés ou la configuration par défaut self.active_agents = active_agents if active_agents is not None else ACTIVE_AGENTS.copy() # Initialiser les agents actifs selon la configuration self.agents = {} self._init_agents() print(f"Gestionnaire de flux initialisé avec session ID: {self.session_id}") print(f"Agents actifs: {', '.join(self.agents.keys())}") def _init_agents(self) -> None: """ Initialise les agents selon la configuration """ # OCR Agent if self.active_agents.get("ocr", True): self.agents["ocr"] = OCRAgent(model_name="ocr", endpoint=DEFAULT_ENDPOINT) # Vision Agent (analyse d'images) if self.active_agents.get("vision", True): model = self.model_overrides["vision"] or get_default_model("vision") self.agents["vision"] = VisionAgent(model, DEFAULT_ENDPOINT) # Translation Agent if self.active_agents.get("translation", True): model = self.model_overrides["translation"] or get_default_model("translation") self.agents["translation"] = TranslationAgent(model, DEFAULT_ENDPOINT) # Summary Agent (désactivé selon la configuration) if self.active_agents.get("summary", False): model = self.model_overrides["summary"] or get_default_model("summary") self.agents["summary"] = SummaryAgent(model, DEFAULT_ENDPOINT) # Rewriter Agent if self.active_agents.get("rewriter", True): model = get_default_model("rewriter") self.agents["rewriter"] = RewriterAgent(model, DEFAULT_ENDPOINT) def process_image(self, image_data: bytes, content_type: str, context: str = "", target_lang: str = "fr") -> Dict[str, Any]: """ Traite une image avec contexte en utilisant les agents configurés Args: image_data (bytes): Données de l'image content_type (str): Type de contenu (schéma, tableau, etc.) context (str): Contexte textuel associé à l'image target_lang (str): Langue cible pour les résultats (fr par défaut) Returns: Dict: Résultats du traitement """ results = {} # Vérifier si l'option d'enregistrement des images est désactivée save_images = self.config.get("save_images", True) if not save_images and "vision" in self.agents: # Appliquer la configuration au modèle de vision self.agents["vision"].config["save_images"] = False try: # Étape 1: Analyser l'image avec le contexte if "vision" in self.agents: print(f"Étape 1/3: Analyse de l'image avec le modèle {self.agents['vision'].model_name}...") # Si le contexte est en français et qu'on utilise un modèle en anglais, # on pourrait le traduire ici, mais pour simplifier on l'utilise tel quel vision_result = self.agents["vision"].generate( images=[image_data], selection_type=content_type, context=context ) results["vision"] = vision_result # Journalisation self.logger.log_step( agent_name="vision", input_data={"content_type": content_type, "context": context}, output_data=vision_result, metadata={"model": self.agents["vision"].model_name} ) # Si nous avons une erreur ou un message vide, ne pas continuer if "Error" in vision_result or "Erreur" in vision_result or not vision_result.strip(): print(f"Avertissement: L'analyse d'image a échoué ou retourné un résultat vide.") if not "translation" in self.agents: return results # Attendre un peu avant le prochain appel pour éviter de surcharger le serveur print("Délai de sécurité entre les appels d'agents (2s)...") time.sleep(2) # Étape 2: Traduire l'analyse en français si nécessaire if "translation" in self.agents and target_lang == "fr": text_to_translate = results.get("vision", "") # Si nous n'avons pas de résultat de vision mais avons un contexte, utiliser le contexte if (not text_to_translate or "Error" in text_to_translate or "Erreur" in text_to_translate) and context: text_to_translate = f"Voici le contexte de l'image: {context}" elif not text_to_translate: text_to_translate = "Aucune analyse d'image disponible." print(f"Étape 2/3: Traduction du texte avec le modèle {self.agents['translation'].model_name}...") translation_result = self.agents["translation"].generate( prompt=text_to_translate, source_language="en", target_language="fr" ) results["translation"] = translation_result # Journalisation self.logger.log_step( agent_name="translation", input_data={"text": text_to_translate}, output_data=translation_result, metadata={"model": self.agents["translation"].model_name} ) # Attendre un peu avant le prochain appel print("Délai de sécurité entre les appels d'agents (2s)...") time.sleep(2) # Étape 3: Générer un résumé si l'agent est activé if "summary" in self.agents: # Déterminer le texte à résumer text_to_summarize = "" if "translation" in results: text_to_summarize = results["translation"] elif "vision" in results: text_to_summarize = results["vision"] else: text_to_summarize = context or "Aucun texte disponible pour le résumé." if text_to_summarize.strip(): print(f"Étape 3/3: Génération du résumé avec le modèle {self.agents['summary'].model_name}...") # Utiliser le texte traduit pour le résumé summary_result = self.agents["summary"].generate( prompt=text_to_summarize, language="fr" if "translation" in results else "en" ) results["summary"] = summary_result # Journalisation self.logger.log_step( agent_name="summary", input_data={"text": text_to_summarize}, output_data=summary_result, metadata={"model": self.agents["summary"].model_name} ) # Générer un résumé du flux de travail models_used = { agent_name: agent.model_name for agent_name, agent in self.agents.items() if agent_name in ["vision", "translation", "summary"] } summary = f"Traitement terminé avec succès.\n" summary += f"Modèles utilisés: {', '.join([f'{k}={v}' for k, v in models_used.items()])}\n" summary += f"Agents exécutés: {', '.join(results.keys())}" self.logger.log_summary(summary) print(f"Traitement terminé avec succès.") return results except Exception as e: error_msg = f"Erreur lors du traitement de l'image: {str(e)}" print(error_msg) self.logger.log_error("workflow_manager", error_msg) results["error"] = error_msg return results def _translate_text(self, text: str, source_lang: str, target_lang: str) -> str: """ Traduit un texte d'une langue à une autre Args: text (str): Texte à traduire source_lang (str): Langue source target_lang (str): Langue cible Returns: str: Texte traduit """ if not text.strip(): return "" # Préparer les entrées pour le logger input_data = { "prompt": text, "source_language": source_lang, "target_language": target_lang } # Traduire le texte translation_agent = self.agents["translation"] translated_text = translation_agent.generate( prompt=text, source_language=source_lang, target_language=target_lang ) # Enregistrer l'étape self.logger.log_step( agent_name="translation", input_data=input_data, output_data=translated_text, metadata={ "source_language": source_lang, "target_language": target_lang, "model": translation_agent.model_name } ) return translated_text def _analyze_image(self, image_data: bytes, selection_type: str, context: str) -> str: """ Analyse une image avec contexte à l'aide de l'agent de vision Args: image_data (bytes): Données de l'image selection_type (str): Type de sélection context (str): Contexte en anglais Returns: str: Analyse de l'image """ # Préparer les entrées pour le logger input_data = { "images": [image_data], "selection_type": selection_type, "context": context } # Analyser l'image vision_agent = self.agents["vision"] analysis = vision_agent.generate( images=[image_data], selection_type=selection_type, context=context ) # Enregistrer l'étape self.logger.log_step( agent_name="vision", input_data=input_data, output_data=analysis, metadata={ "selection_type": selection_type, "model": vision_agent.model_name } ) return analysis