From 05c033a06e810a778a7aee0a5906891fee615896 Mon Sep 17 00:00:00 2001 From: Ladebeze66 Date: Fri, 18 Apr 2025 10:40:28 +0200 Subject: [PATCH] 1804-10:40optiorchestrator --- loaders/ticket_data_loader.bak | 340 ++++++++++++++++++++++++++++++ loaders/ticket_data_loader.py | 372 +++++---------------------------- orchestrator.py | 27 +-- 3 files changed, 397 insertions(+), 342 deletions(-) create mode 100644 loaders/ticket_data_loader.bak diff --git a/loaders/ticket_data_loader.bak b/loaders/ticket_data_loader.bak new file mode 100644 index 0000000..4e4e038 --- /dev/null +++ b/loaders/ticket_data_loader.bak @@ -0,0 +1,340 @@ +import os +import re +import json +import logging +from typing import Dict, Optional, Any, List, Union +from abc import ABC, abstractmethod + +logger = logging.getLogger("TicketDataLoader") + +class TicketDataSource(ABC): + """Classe abstraite pour les sources de données de tickets""" + + @abstractmethod + def charger(self, chemin_fichier: str) -> Dict[str, Any]: + """Charge les données du ticket depuis un fichier source""" + pass + + @abstractmethod + def get_format(self) -> str: + """Retourne le format de la source de données""" + pass + + def valider_donnees(self, donnees: Dict[str, Any]) -> bool: + """Vérifie si les données chargées contiennent les champs obligatoires""" + champs_obligatoires = ["code", "name"] + return all(field in donnees for field in champs_obligatoires) + + +class JsonTicketSource(TicketDataSource): + """Source de données pour les tickets au format JSON""" + + def charger(self, chemin_fichier: str) -> Dict[str, Any]: + """Charge les données du ticket depuis un fichier JSON""" + try: + with open(chemin_fichier, 'r', encoding='utf-8') as f: + donnees = json.load(f) + + # Ajout de métadonnées sur la source + if "metadata" not in donnees: + donnees["metadata"] = {} + + donnees["metadata"]["source_file"] = chemin_fichier + donnees["metadata"]["format"] = "json" + + return donnees + except Exception as e: + logger.error(f"Erreur lors du chargement du fichier JSON {chemin_fichier}: {str(e)}") + raise ValueError(f"Impossible de charger le fichier JSON: {str(e)}") + + def get_format(self) -> str: + return "json" + + +class MarkdownTicketSource(TicketDataSource): + """Source de données pour les tickets au format Markdown""" + + def charger(self, chemin_fichier: str) -> Dict[str, Any]: + """Charge les données du ticket depuis un fichier Markdown""" + try: + with open(chemin_fichier, 'r', encoding='utf-8') as f: + contenu_md = f.read() + + # Extraire les données du contenu Markdown + donnees = self._extraire_donnees_de_markdown(contenu_md) + + # Ajout de métadonnées sur la source + if "metadata" not in donnees: + donnees["metadata"] = {} + + donnees["metadata"]["source_file"] = chemin_fichier + donnees["metadata"]["format"] = "markdown" + + return donnees + except Exception as e: + logger.error(f"Erreur lors du chargement du fichier Markdown {chemin_fichier}: {str(e)}") + raise ValueError(f"Impossible de charger le fichier Markdown: {str(e)}") + + def get_format(self) -> str: + return "markdown" + + def _extraire_donnees_de_markdown(self, contenu_md: str) -> Dict[str, Any]: + """Extrait les données structurées d'un contenu Markdown""" + donnees = {} + + # Diviser le contenu en sections + sections = re.split(r"\n## ", contenu_md) + + # Traiter chaque section + for section in sections: + if section.startswith("Informations du ticket"): + ticket_info = self._analyser_infos_ticket(section) + donnees.update(ticket_info) + elif section.startswith("Messages"): + messages = self._analyser_messages(section) + donnees["messages"] = messages + elif section.startswith("Informations sur l'extraction"): + extraction_info = self._analyser_infos_extraction(section) + donnees.update(extraction_info) + + # Réorganiser les champs pour que la description soit après "name" + ordered_fields = ["id", "code", "name", "description"] + ordered_data = {} + + # D'abord ajouter les champs dans l'ordre spécifié + for field in ordered_fields: + if field in donnees: + ordered_data[field] = donnees[field] + + # Ensuite ajouter les autres champs + for key, value in donnees.items(): + if key not in ordered_data: + ordered_data[key] = value + + # S'assurer que la description est présente + if "description" not in ordered_data: + ordered_data["description"] = "" + + return ordered_data + + def _analyser_infos_ticket(self, section: str) -> Dict[str, Any]: + """Analyse la section d'informations du ticket""" + info = {} + description = [] + capturing_description = False + + lines = section.strip().split("\n") + i = 0 + while i < len(lines): + line = lines[i] + + # Si on est déjà en train de capturer la description + if capturing_description: + # Vérifie si on atteint une nouvelle section ou un nouveau champ + if i + 1 < len(lines) and (lines[i + 1].startswith("## ") or lines[i + 1].startswith("- **")): + capturing_description = False + info["description"] = "\n".join(description).strip() + else: + description.append(line) + i += 1 + continue + + # Détecte le début de la description + desc_match = re.match(r"- \*\*description\*\*:", line) + if desc_match: + capturing_description = True + i += 1 # Passe à la ligne suivante + continue + + # Traite les autres champs normalement + match = re.match(r"- \*\*(.*?)\*\*: (.*)", line) + if match: + key, value = match.groups() + key = key.lower().replace("/", "_").replace(" ", "_") + info[key] = value.strip() + + i += 1 + + # Si on finit en capturant la description, l'ajouter au dictionnaire + if capturing_description and description: + info["description"] = "\n".join(description).strip() + elif "description" not in info: + info["description"] = "" + + return info + + def _analyser_messages(self, section: str) -> List[Dict[str, Any]]: + """Analyse la section des messages""" + messages = [] + current_message = {} + in_message = False + + lines = section.strip().split("\n") + + for line in lines: + if line.startswith("### Message"): + if current_message: + messages.append(current_message) + current_message = {} + in_message = True + + elif line.startswith("**") and in_message: + match = re.match(r"\*\*(.*?)\*\*: (.*)", line) + if match: + key, value = match.groups() + key = key.lower().replace("/", "_").replace(" ", "_") + current_message[key] = value.strip() + else: + if in_message: + current_message["content"] = current_message.get("content", "") + line + "\n" + + if current_message: + messages.append(current_message) + + # Nettoyer le contenu des messages + for message in messages: + if "content" in message: + message["content"] = message["content"].strip() + + return messages + + def _analyser_infos_extraction(self, section: str) -> Dict[str, Any]: + """Analyse la section d'informations sur l'extraction""" + extraction_info = {} + + lines = section.strip().split("\n") + for line in lines: + match = re.match(r"- \*\*(.*?)\*\*: (.*)", line) + if match: + key, value = match.groups() + key = key.lower().replace("/", "_").replace(" ", "_") + extraction_info[key] = value.strip() + + return extraction_info + + +class TicketDataLoader: + """Classe pour charger les données de tickets à partir de différentes sources""" + + def __init__(self): + self.sources = { + "json": JsonTicketSource(), + "markdown": MarkdownTicketSource() + } + + def detecter_format(self, chemin_fichier: str) -> str: + """Détecte le format du fichier à partir de son extension""" + ext = os.path.splitext(chemin_fichier)[1].lower() + if ext == '.json': + return "json" + elif ext in ['.md', '.markdown']: + return "markdown" + else: + raise ValueError(f"Format de fichier non supporté: {ext}") + + def charger(self, chemin_fichier: str, format_force: Optional[str] = None) -> Dict[str, Any]: + """ + Charge les données d'un ticket à partir d'un fichier + + Args: + chemin_fichier: Chemin du fichier à charger + format_force: Format à utiliser (ignore la détection automatique) + + Returns: + Dictionnaire contenant les données du ticket + """ + if not os.path.exists(chemin_fichier): + raise FileNotFoundError(f"Le fichier {chemin_fichier} n'existe pas") + + format_fichier = format_force if format_force else self.detecter_format(chemin_fichier) + + if format_fichier not in self.sources: + raise ValueError(f"Format non supporté: {format_fichier}") + + logger.info(f"Chargement des données au format {format_fichier} depuis {chemin_fichier}") + donnees = self.sources[format_fichier].charger(chemin_fichier) + + # Validation des données + if not self.sources[format_fichier].valider_donnees(donnees): + logger.warning(f"Les données chargées depuis {chemin_fichier} ne contiennent pas tous les champs obligatoires") + + return donnees + + def trouver_ticket(self, ticket_dir: str, ticket_id: str) -> Optional[Dict[str, Optional[str]]]: + """ + Recherche des fichiers de ticket dans un répertoire spécifique + + Args: + ticket_dir: Répertoire contenant les données du ticket + ticket_id: Code du ticket à rechercher + + Returns: + Dictionnaire avec les chemins des fichiers de rapport trouvés (JSON est le format privilégié) + ou None si aucun répertoire valide n'est trouvé + { + "json": chemin_du_fichier_json ou None si non trouvé, + "markdown": chemin_du_fichier_markdown ou None si non trouvé + } + """ + logger.info(f"Recherche du ticket {ticket_id} dans {ticket_dir}") + + if not os.path.exists(ticket_dir): + logger.warning(f"Le répertoire {ticket_dir} n'existe pas") + return None + + rapport_dir = None + + # Chercher d'abord dans le dossier spécifique aux rapports + rapports_dir = os.path.join(ticket_dir, f"{ticket_id}_rapports") + if os.path.exists(rapports_dir) and os.path.isdir(rapports_dir): + rapport_dir = rapports_dir + logger.info(f"Dossier de rapports trouvé: {rapports_dir}") + + # Initialiser les chemins à None + json_path = None + md_path = None + + # Si on a trouvé un dossier de rapports, chercher dedans + if rapport_dir: + # Privilégier d'abord le format JSON (format principal) + for filename in os.listdir(rapport_dir): + # Chercher le fichier JSON + if filename.endswith(".json") and ticket_id in filename: + json_path = os.path.join(rapport_dir, filename) + logger.info(f"Fichier JSON trouvé: {json_path}") + break # Priorité au premier fichier JSON trouvé + + # Chercher le fichier Markdown comme fallback + for filename in os.listdir(rapport_dir): + if filename.endswith(".md") and ticket_id in filename: + md_path = os.path.join(rapport_dir, filename) + logger.info(f"Fichier Markdown trouvé: {md_path}") + break # Priorité au premier fichier Markdown trouvé + else: + # Si pas de dossier de rapports, chercher directement dans le répertoire du ticket + logger.info(f"Pas de dossier _rapports, recherche dans {ticket_dir}") + + # Privilégier d'abord le format JSON (format principal) + for filename in os.listdir(ticket_dir): + # Chercher le JSON en priorité + if filename.endswith(".json") and ticket_id in filename and not filename.startswith("ticket_"): + json_path = os.path.join(ticket_dir, filename) + logger.info(f"Fichier JSON trouvé: {json_path}") + break # Priorité au premier fichier JSON trouvé + + # Chercher le Markdown comme fallback + for filename in os.listdir(ticket_dir): + if filename.endswith(".md") and ticket_id in filename: + md_path = os.path.join(ticket_dir, filename) + logger.info(f"Fichier Markdown trouvé: {md_path}") + break # Priorité au premier fichier Markdown trouvé + + # Si on n'a pas trouvé de fichier, alors renvoyer un dictionnaire vide plutôt que None + if not json_path and not md_path: + logger.warning(f"Aucun fichier de rapport trouvé pour le ticket {ticket_id}") + return {"json": None, "markdown": None} + + return { + "json": json_path, # Format principal (prioritaire) + "markdown": md_path # Format secondaire (fallback) + } \ No newline at end of file diff --git a/loaders/ticket_data_loader.py b/loaders/ticket_data_loader.py index 4e4e038..6b71792 100644 --- a/loaders/ticket_data_loader.py +++ b/loaders/ticket_data_loader.py @@ -1,340 +1,66 @@ import os -import re import json import logging -from typing import Dict, Optional, Any, List, Union -from abc import ABC, abstractmethod +from typing import Dict, Any, Optional logger = logging.getLogger("TicketDataLoader") -class TicketDataSource(ABC): - """Classe abstraite pour les sources de données de tickets""" - - @abstractmethod - def charger(self, chemin_fichier: str) -> Dict[str, Any]: - """Charge les données du ticket depuis un fichier source""" - pass - - @abstractmethod - def get_format(self) -> str: - """Retourne le format de la source de données""" - pass - - def valider_donnees(self, donnees: Dict[str, Any]) -> bool: - """Vérifie si les données chargées contiennent les champs obligatoires""" - champs_obligatoires = ["code", "name"] - return all(field in donnees for field in champs_obligatoires) - - -class JsonTicketSource(TicketDataSource): - """Source de données pour les tickets au format JSON""" - - def charger(self, chemin_fichier: str) -> Dict[str, Any]: - """Charge les données du ticket depuis un fichier JSON""" - try: - with open(chemin_fichier, 'r', encoding='utf-8') as f: - donnees = json.load(f) - - # Ajout de métadonnées sur la source - if "metadata" not in donnees: - donnees["metadata"] = {} - - donnees["metadata"]["source_file"] = chemin_fichier - donnees["metadata"]["format"] = "json" - - return donnees - except Exception as e: - logger.error(f"Erreur lors du chargement du fichier JSON {chemin_fichier}: {str(e)}") - raise ValueError(f"Impossible de charger le fichier JSON: {str(e)}") - - def get_format(self) -> str: - return "json" - - -class MarkdownTicketSource(TicketDataSource): - """Source de données pour les tickets au format Markdown""" - - def charger(self, chemin_fichier: str) -> Dict[str, Any]: - """Charge les données du ticket depuis un fichier Markdown""" - try: - with open(chemin_fichier, 'r', encoding='utf-8') as f: - contenu_md = f.read() - - # Extraire les données du contenu Markdown - donnees = self._extraire_donnees_de_markdown(contenu_md) - - # Ajout de métadonnées sur la source - if "metadata" not in donnees: - donnees["metadata"] = {} - - donnees["metadata"]["source_file"] = chemin_fichier - donnees["metadata"]["format"] = "markdown" - - return donnees - except Exception as e: - logger.error(f"Erreur lors du chargement du fichier Markdown {chemin_fichier}: {str(e)}") - raise ValueError(f"Impossible de charger le fichier Markdown: {str(e)}") - - def get_format(self) -> str: - return "markdown" - - def _extraire_donnees_de_markdown(self, contenu_md: str) -> Dict[str, Any]: - """Extrait les données structurées d'un contenu Markdown""" - donnees = {} - - # Diviser le contenu en sections - sections = re.split(r"\n## ", contenu_md) - - # Traiter chaque section - for section in sections: - if section.startswith("Informations du ticket"): - ticket_info = self._analyser_infos_ticket(section) - donnees.update(ticket_info) - elif section.startswith("Messages"): - messages = self._analyser_messages(section) - donnees["messages"] = messages - elif section.startswith("Informations sur l'extraction"): - extraction_info = self._analyser_infos_extraction(section) - donnees.update(extraction_info) - - # Réorganiser les champs pour que la description soit après "name" - ordered_fields = ["id", "code", "name", "description"] - ordered_data = {} - - # D'abord ajouter les champs dans l'ordre spécifié - for field in ordered_fields: - if field in donnees: - ordered_data[field] = donnees[field] - - # Ensuite ajouter les autres champs - for key, value in donnees.items(): - if key not in ordered_data: - ordered_data[key] = value - - # S'assurer que la description est présente - if "description" not in ordered_data: - ordered_data["description"] = "" - - return ordered_data - - def _analyser_infos_ticket(self, section: str) -> Dict[str, Any]: - """Analyse la section d'informations du ticket""" - info = {} - description = [] - capturing_description = False - - lines = section.strip().split("\n") - i = 0 - while i < len(lines): - line = lines[i] - - # Si on est déjà en train de capturer la description - if capturing_description: - # Vérifie si on atteint une nouvelle section ou un nouveau champ - if i + 1 < len(lines) and (lines[i + 1].startswith("## ") or lines[i + 1].startswith("- **")): - capturing_description = False - info["description"] = "\n".join(description).strip() - else: - description.append(line) - i += 1 - continue - - # Détecte le début de la description - desc_match = re.match(r"- \*\*description\*\*:", line) - if desc_match: - capturing_description = True - i += 1 # Passe à la ligne suivante - continue - - # Traite les autres champs normalement - match = re.match(r"- \*\*(.*?)\*\*: (.*)", line) - if match: - key, value = match.groups() - key = key.lower().replace("/", "_").replace(" ", "_") - info[key] = value.strip() - - i += 1 - - # Si on finit en capturant la description, l'ajouter au dictionnaire - if capturing_description and description: - info["description"] = "\n".join(description).strip() - elif "description" not in info: - info["description"] = "" - - return info - - def _analyser_messages(self, section: str) -> List[Dict[str, Any]]: - """Analyse la section des messages""" - messages = [] - current_message = {} - in_message = False - - lines = section.strip().split("\n") - - for line in lines: - if line.startswith("### Message"): - if current_message: - messages.append(current_message) - current_message = {} - in_message = True - - elif line.startswith("**") and in_message: - match = re.match(r"\*\*(.*?)\*\*: (.*)", line) - if match: - key, value = match.groups() - key = key.lower().replace("/", "_").replace(" ", "_") - current_message[key] = value.strip() - else: - if in_message: - current_message["content"] = current_message.get("content", "") + line + "\n" - - if current_message: - messages.append(current_message) - - # Nettoyer le contenu des messages - for message in messages: - if "content" in message: - message["content"] = message["content"].strip() - - return messages - - def _analyser_infos_extraction(self, section: str) -> Dict[str, Any]: - """Analyse la section d'informations sur l'extraction""" - extraction_info = {} - - lines = section.strip().split("\n") - for line in lines: - match = re.match(r"- \*\*(.*?)\*\*: (.*)", line) - if match: - key, value = match.groups() - key = key.lower().replace("/", "_").replace(" ", "_") - extraction_info[key] = value.strip() - - return extraction_info - - class TicketDataLoader: - """Classe pour charger les données de tickets à partir de différentes sources""" - - def __init__(self): - self.sources = { - "json": JsonTicketSource(), - "markdown": MarkdownTicketSource() - } - - def detecter_format(self, chemin_fichier: str) -> str: - """Détecte le format du fichier à partir de son extension""" - ext = os.path.splitext(chemin_fichier)[1].lower() - if ext == '.json': - return "json" - elif ext in ['.md', '.markdown']: - return "markdown" - else: - raise ValueError(f"Format de fichier non supporté: {ext}") - - def charger(self, chemin_fichier: str, format_force: Optional[str] = None) -> Dict[str, Any]: + """ + Charge uniquement les tickets au format JSON pour le pipeline. + """ + + def charger(self, chemin_fichier: str) -> Dict[str, Any]: """ - Charge les données d'un ticket à partir d'un fichier - + Charge un ticket JSON depuis un chemin de fichier. + Args: - chemin_fichier: Chemin du fichier à charger - format_force: Format à utiliser (ignore la détection automatique) - + chemin_fichier: str - chemin complet vers le fichier JSON + Returns: - Dictionnaire contenant les données du ticket + dict - contenu JSON enrichi de métadonnées """ if not os.path.exists(chemin_fichier): - raise FileNotFoundError(f"Le fichier {chemin_fichier} n'existe pas") + raise FileNotFoundError(f"Fichier introuvable : {chemin_fichier}") - format_fichier = format_force if format_force else self.detecter_format(chemin_fichier) - - if format_fichier not in self.sources: - raise ValueError(f"Format non supporté: {format_fichier}") - - logger.info(f"Chargement des données au format {format_fichier} depuis {chemin_fichier}") - donnees = self.sources[format_fichier].charger(chemin_fichier) - - # Validation des données - if not self.sources[format_fichier].valider_donnees(donnees): - logger.warning(f"Les données chargées depuis {chemin_fichier} ne contiennent pas tous les champs obligatoires") - - return donnees - - def trouver_ticket(self, ticket_dir: str, ticket_id: str) -> Optional[Dict[str, Optional[str]]]: + try: + with open(chemin_fichier, "r", encoding="utf-8") as f: + donnees = json.load(f) + + # Ajout de métadonnées utiles + if "metadata" not in donnees: + donnees["metadata"] = {} + donnees["metadata"]["source_file"] = chemin_fichier + donnees["metadata"]["format"] = "json" + + return donnees + + except Exception as e: + logger.error(f"Erreur lors du chargement du ticket : {e}") + raise + + def trouver_ticket(self, ticket_dir: str, ticket_id: str) -> Optional[str]: """ - Recherche des fichiers de ticket dans un répertoire spécifique - + Tente de retrouver le chemin du rapport JSON d'analyse d'un ticket. + Args: - ticket_dir: Répertoire contenant les données du ticket - ticket_id: Code du ticket à rechercher - + ticket_dir: str - chemin du répertoire d'extraction du ticket + ticket_id: str - code du ticket (ex: T1234) + Returns: - Dictionnaire avec les chemins des fichiers de rapport trouvés (JSON est le format privilégié) - ou None si aucun répertoire valide n'est trouvé - { - "json": chemin_du_fichier_json ou None si non trouvé, - "markdown": chemin_du_fichier_markdown ou None si non trouvé - } + str | None - chemin du fichier JSON ou None si non trouvé """ - logger.info(f"Recherche du ticket {ticket_id} dans {ticket_dir}") - - if not os.path.exists(ticket_dir): - logger.warning(f"Le répertoire {ticket_dir} n'existe pas") - return None - - rapport_dir = None - - # Chercher d'abord dans le dossier spécifique aux rapports - rapports_dir = os.path.join(ticket_dir, f"{ticket_id}_rapports") - if os.path.exists(rapports_dir) and os.path.isdir(rapports_dir): - rapport_dir = rapports_dir - logger.info(f"Dossier de rapports trouvé: {rapports_dir}") - - # Initialiser les chemins à None - json_path = None - md_path = None - - # Si on a trouvé un dossier de rapports, chercher dedans - if rapport_dir: - # Privilégier d'abord le format JSON (format principal) - for filename in os.listdir(rapport_dir): - # Chercher le fichier JSON - if filename.endswith(".json") and ticket_id in filename: - json_path = os.path.join(rapport_dir, filename) - logger.info(f"Fichier JSON trouvé: {json_path}") - break # Priorité au premier fichier JSON trouvé - - # Chercher le fichier Markdown comme fallback - for filename in os.listdir(rapport_dir): - if filename.endswith(".md") and ticket_id in filename: - md_path = os.path.join(rapport_dir, filename) - logger.info(f"Fichier Markdown trouvé: {md_path}") - break # Priorité au premier fichier Markdown trouvé - else: - # Si pas de dossier de rapports, chercher directement dans le répertoire du ticket - logger.info(f"Pas de dossier _rapports, recherche dans {ticket_dir}") - - # Privilégier d'abord le format JSON (format principal) - for filename in os.listdir(ticket_dir): - # Chercher le JSON en priorité - if filename.endswith(".json") and ticket_id in filename and not filename.startswith("ticket_"): - json_path = os.path.join(ticket_dir, filename) - logger.info(f"Fichier JSON trouvé: {json_path}") - break # Priorité au premier fichier JSON trouvé - - # Chercher le Markdown comme fallback - for filename in os.listdir(ticket_dir): - if filename.endswith(".md") and ticket_id in filename: - md_path = os.path.join(ticket_dir, filename) - logger.info(f"Fichier Markdown trouvé: {md_path}") - break # Priorité au premier fichier Markdown trouvé - - # Si on n'a pas trouvé de fichier, alors renvoyer un dictionnaire vide plutôt que None - if not json_path and not md_path: - logger.warning(f"Aucun fichier de rapport trouvé pour le ticket {ticket_id}") - return {"json": None, "markdown": None} - - return { - "json": json_path, # Format principal (prioritaire) - "markdown": md_path # Format secondaire (fallback) - } \ No newline at end of file + rapport_dir = os.path.join(ticket_dir, f"{ticket_id}_rapports") + + if os.path.isdir(rapport_dir): + for fichier in os.listdir(rapport_dir): + if fichier.endswith(".json") and ticket_id in fichier: + return os.path.join(rapport_dir, fichier) + + # fallback : recherche directe dans l'extraction si besoin + for fichier in os.listdir(ticket_dir): + if fichier.endswith(".json") and ticket_id in fichier and not fichier.startswith("ticket_"): + return os.path.join(ticket_dir, fichier) + + logger.warning(f"Aucun rapport JSON trouvé pour {ticket_id} dans {ticket_dir}") + return None diff --git a/orchestrator.py b/orchestrator.py index 88cf852..58b00dd 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -17,18 +17,6 @@ logger = logging.getLogger("Orchestrator") class Orchestrator: """ Orchestrateur pour l'analyse de tickets et la génération de rapports. - - Stratégie de gestion des formats: - - JSON est le format principal pour le traitement des données et l'analyse - - Markdown est utilisé uniquement comme format de présentation finale - - Les agents LLM travaillent principalement avec le format JSON - - La conversion JSON->Markdown se fait uniquement à la fin du processus pour la présentation - - Cette approche permet de: - 1. Simplifier le code des agents - 2. Réduire les redondances et incohérences entre formats - 3. Améliorer la performance des agents LLM avec un format plus structuré - 4. Faciliter la maintenance et l'évolution du système """ def __init__(self, output_dir: str = "output/", @@ -101,10 +89,10 @@ class Orchestrator: Cherche le rapport JSON. """ result = self.ticket_loader.trouver_ticket(extraction_path, ticket_id) - if not result or not result.get("json"): + if not result: logger.warning(f"Aucun fichier JSON trouvé pour le ticket {ticket_id}") return {"json": None} - return {"json": result.get("json")} + return {"json": result} def executer(self, ticket_specifique: Optional[str] = None): """ @@ -366,12 +354,13 @@ class Orchestrator: """ ticket_data = None - # Charger le fichier JSON - if rapports.get("json") and rapports["json"] is not None: + # Charger le fichier JSON si le chemin existe + json_path = rapports.get("json") + if json_path is not None: try: - ticket_data = self.ticket_loader.charger(rapports["json"]) - logger.info(f"Données JSON chargées depuis: {rapports['json']}") - print(f" Rapport JSON chargé: {os.path.basename(rapports['json'])}") + ticket_data = self.ticket_loader.charger(json_path) + logger.info(f"Données JSON chargées depuis: {json_path}") + print(f" Rapport JSON chargé: {os.path.basename(json_path)}") # Ajouter une métadonnée sur le format source if "metadata" not in ticket_data: