import os import re import json import logging from typing import Dict, Optional, Any, List, Union from abc import ABC, abstractmethod logger = logging.getLogger("TicketDataLoader") class TicketDataSource(ABC): """Classe abstraite pour les sources de données de tickets""" @abstractmethod def charger(self, chemin_fichier: str) -> Dict[str, Any]: """Charge les données du ticket depuis un fichier source""" pass @abstractmethod def get_format(self) -> str: """Retourne le format de la source de données""" pass def valider_donnees(self, donnees: Dict[str, Any]) -> bool: """Vérifie si les données chargées contiennent les champs obligatoires""" champs_obligatoires = ["code", "name"] return all(field in donnees for field in champs_obligatoires) class JsonTicketSource(TicketDataSource): """Source de données pour les tickets au format JSON""" def charger(self, chemin_fichier: str) -> Dict[str, Any]: """Charge les données du ticket depuis un fichier JSON""" try: with open(chemin_fichier, 'r', encoding='utf-8') as f: donnees = json.load(f) # Ajout de métadonnées sur la source if "metadata" not in donnees: donnees["metadata"] = {} donnees["metadata"]["source_file"] = chemin_fichier donnees["metadata"]["format"] = "json" return donnees except Exception as e: logger.error(f"Erreur lors du chargement du fichier JSON {chemin_fichier}: {str(e)}") raise ValueError(f"Impossible de charger le fichier JSON: {str(e)}") def get_format(self) -> str: return "json" class MarkdownTicketSource(TicketDataSource): """Source de données pour les tickets au format Markdown""" def charger(self, chemin_fichier: str) -> Dict[str, Any]: """Charge les données du ticket depuis un fichier Markdown""" try: with open(chemin_fichier, 'r', encoding='utf-8') as f: contenu_md = f.read() # Extraire les données du contenu Markdown donnees = self._extraire_donnees_de_markdown(contenu_md) # Ajout de métadonnées sur la source if "metadata" not in donnees: donnees["metadata"] = {} donnees["metadata"]["source_file"] = chemin_fichier donnees["metadata"]["format"] = "markdown" return donnees except Exception as e: logger.error(f"Erreur lors du chargement du fichier Markdown {chemin_fichier}: {str(e)}") raise ValueError(f"Impossible de charger le fichier Markdown: {str(e)}") def get_format(self) -> str: return "markdown" def _extraire_donnees_de_markdown(self, contenu_md: str) -> Dict[str, Any]: """Extrait les données structurées d'un contenu Markdown""" donnees = {} # Diviser le contenu en sections sections = re.split(r"\n## ", contenu_md) # Traiter chaque section for section in sections: if section.startswith("Informations du ticket"): ticket_info = self._analyser_infos_ticket(section) donnees.update(ticket_info) elif section.startswith("Messages"): messages = self._analyser_messages(section) donnees["messages"] = messages elif section.startswith("Informations sur l'extraction"): extraction_info = self._analyser_infos_extraction(section) donnees.update(extraction_info) # Réorganiser les champs pour que la description soit après "name" ordered_fields = ["id", "code", "name", "description"] ordered_data = {} # D'abord ajouter les champs dans l'ordre spécifié for field in ordered_fields: if field in donnees: ordered_data[field] = donnees[field] # Ensuite ajouter les autres champs for key, value in donnees.items(): if key not in ordered_data: ordered_data[key] = value # S'assurer que la description est présente if "description" not in ordered_data: ordered_data["description"] = "" return ordered_data def _analyser_infos_ticket(self, section: str) -> Dict[str, Any]: """Analyse la section d'informations du ticket""" info = {} description = [] capturing_description = False lines = section.strip().split("\n") i = 0 while i < len(lines): line = lines[i] # Si on est déjà en train de capturer la description if capturing_description: # Vérifie si on atteint une nouvelle section ou un nouveau champ if i + 1 < len(lines) and (lines[i + 1].startswith("## ") or lines[i + 1].startswith("- **")): capturing_description = False info["description"] = "\n".join(description).strip() else: description.append(line) i += 1 continue # Détecte le début de la description desc_match = re.match(r"- \*\*description\*\*:", line) if desc_match: capturing_description = True i += 1 # Passe à la ligne suivante continue # Traite les autres champs normalement match = re.match(r"- \*\*(.*?)\*\*: (.*)", line) if match: key, value = match.groups() key = key.lower().replace("/", "_").replace(" ", "_") info[key] = value.strip() i += 1 # Si on finit en capturant la description, l'ajouter au dictionnaire if capturing_description and description: info["description"] = "\n".join(description).strip() elif "description" not in info: info["description"] = "" return info def _analyser_messages(self, section: str) -> List[Dict[str, Any]]: """Analyse la section des messages""" messages = [] current_message = {} in_message = False lines = section.strip().split("\n") for line in lines: if line.startswith("### Message"): if current_message: messages.append(current_message) current_message = {} in_message = True elif line.startswith("**") and in_message: match = re.match(r"\*\*(.*?)\*\*: (.*)", line) if match: key, value = match.groups() key = key.lower().replace("/", "_").replace(" ", "_") current_message[key] = value.strip() else: if in_message: current_message["content"] = current_message.get("content", "") + line + "\n" if current_message: messages.append(current_message) # Nettoyer le contenu des messages for message in messages: if "content" in message: message["content"] = message["content"].strip() return messages def _analyser_infos_extraction(self, section: str) -> Dict[str, Any]: """Analyse la section d'informations sur l'extraction""" extraction_info = {} lines = section.strip().split("\n") for line in lines: match = re.match(r"- \*\*(.*?)\*\*: (.*)", line) if match: key, value = match.groups() key = key.lower().replace("/", "_").replace(" ", "_") extraction_info[key] = value.strip() return extraction_info class TicketDataLoader: """Classe pour charger les données de tickets à partir de différentes sources""" def __init__(self): self.sources = { "json": JsonTicketSource(), "markdown": MarkdownTicketSource() } def detecter_format(self, chemin_fichier: str) -> str: """Détecte le format du fichier à partir de son extension""" ext = os.path.splitext(chemin_fichier)[1].lower() if ext == '.json': return "json" elif ext in ['.md', '.markdown']: return "markdown" else: raise ValueError(f"Format de fichier non supporté: {ext}") def charger(self, chemin_fichier: str, format_force: Optional[str] = None) -> Dict[str, Any]: """ Charge les données d'un ticket à partir d'un fichier Args: chemin_fichier: Chemin du fichier à charger format_force: Format à utiliser (ignore la détection automatique) Returns: Dictionnaire contenant les données du ticket """ if not os.path.exists(chemin_fichier): raise FileNotFoundError(f"Le fichier {chemin_fichier} n'existe pas") format_fichier = format_force if format_force else self.detecter_format(chemin_fichier) if format_fichier not in self.sources: raise ValueError(f"Format non supporté: {format_fichier}") logger.info(f"Chargement des données au format {format_fichier} depuis {chemin_fichier}") donnees = self.sources[format_fichier].charger(chemin_fichier) # Validation des données if not self.sources[format_fichier].valider_donnees(donnees): logger.warning(f"Les données chargées depuis {chemin_fichier} ne contiennent pas tous les champs obligatoires") return donnees def trouver_ticket(self, ticket_dir: str, ticket_id: str) -> Optional[Dict[str, Optional[str]]]: """ Recherche des fichiers de ticket dans un répertoire spécifique Args: ticket_dir: Répertoire contenant les données du ticket ticket_id: Code du ticket à rechercher Returns: Dictionnaire avec les chemins des fichiers de rapport trouvés (JSON est le format privilégié) ou None si aucun répertoire valide n'est trouvé { "json": chemin_du_fichier_json ou None si non trouvé, "markdown": chemin_du_fichier_markdown ou None si non trouvé } """ logger.info(f"Recherche du ticket {ticket_id} dans {ticket_dir}") if not os.path.exists(ticket_dir): logger.warning(f"Le répertoire {ticket_dir} n'existe pas") return None rapport_dir = None # Chercher d'abord dans le dossier spécifique aux rapports rapports_dir = os.path.join(ticket_dir, f"{ticket_id}_rapports") if os.path.exists(rapports_dir) and os.path.isdir(rapports_dir): rapport_dir = rapports_dir logger.info(f"Dossier de rapports trouvé: {rapports_dir}") # Initialiser les chemins à None json_path = None md_path = None # Si on a trouvé un dossier de rapports, chercher dedans if rapport_dir: # Privilégier d'abord le format JSON (format principal) for filename in os.listdir(rapport_dir): # Chercher le fichier JSON if filename.endswith(".json") and ticket_id in filename: json_path = os.path.join(rapport_dir, filename) logger.info(f"Fichier JSON trouvé: {json_path}") break # Priorité au premier fichier JSON trouvé # Chercher le fichier Markdown comme fallback for filename in os.listdir(rapport_dir): if filename.endswith(".md") and ticket_id in filename: md_path = os.path.join(rapport_dir, filename) logger.info(f"Fichier Markdown trouvé: {md_path}") break # Priorité au premier fichier Markdown trouvé else: # Si pas de dossier de rapports, chercher directement dans le répertoire du ticket logger.info(f"Pas de dossier _rapports, recherche dans {ticket_dir}") # Privilégier d'abord le format JSON (format principal) for filename in os.listdir(ticket_dir): # Chercher le JSON en priorité if filename.endswith(".json") and ticket_id in filename and not filename.startswith("ticket_"): json_path = os.path.join(ticket_dir, filename) logger.info(f"Fichier JSON trouvé: {json_path}") break # Priorité au premier fichier JSON trouvé # Chercher le Markdown comme fallback for filename in os.listdir(ticket_dir): if filename.endswith(".md") and ticket_id in filename: md_path = os.path.join(ticket_dir, filename) logger.info(f"Fichier Markdown trouvé: {md_path}") break # Priorité au premier fichier Markdown trouvé # Si on n'a pas trouvé de fichier, alors renvoyer un dictionnaire vide plutôt que None if not json_path and not md_path: logger.warning(f"Aucun fichier de rapport trouvé pour le ticket {ticket_id}") return {"json": None, "markdown": None} return { "json": json_path, # Format principal (prioritaire) "markdown": md_path # Format secondaire (fallback) }