llm_ticket3/utils/ticket_data_loader.py
2025-04-07 15:40:17 +02:00

309 lines
12 KiB
Python

import os
import re
import json
import logging
from typing import Dict, Optional, Any, List, Union
from abc import ABC, abstractmethod
logger = logging.getLogger("TicketDataLoader")
class TicketDataSource(ABC):
"""Classe abstraite pour les sources de données de tickets"""
@abstractmethod
def charger(self, chemin_fichier: str) -> Dict[str, Any]:
"""Charge les données du ticket depuis un fichier source"""
pass
@abstractmethod
def get_format(self) -> str:
"""Retourne le format de la source de données"""
pass
def valider_donnees(self, donnees: Dict[str, Any]) -> bool:
"""Vérifie si les données chargées contiennent les champs obligatoires"""
champs_obligatoires = ["code", "name"]
return all(field in donnees for field in champs_obligatoires)
class JsonTicketSource(TicketDataSource):
"""Source de données pour les tickets au format JSON"""
def charger(self, chemin_fichier: str) -> Dict[str, Any]:
"""Charge les données du ticket depuis un fichier JSON"""
try:
with open(chemin_fichier, 'r', encoding='utf-8') as f:
donnees = json.load(f)
# Ajout de métadonnées sur la source
if "metadata" not in donnees:
donnees["metadata"] = {}
donnees["metadata"]["source_file"] = chemin_fichier
donnees["metadata"]["format"] = "json"
return donnees
except Exception as e:
logger.error(f"Erreur lors du chargement du fichier JSON {chemin_fichier}: {str(e)}")
raise ValueError(f"Impossible de charger le fichier JSON: {str(e)}")
def get_format(self) -> str:
return "json"
class MarkdownTicketSource(TicketDataSource):
"""Source de données pour les tickets au format Markdown"""
def charger(self, chemin_fichier: str) -> Dict[str, Any]:
"""Charge les données du ticket depuis un fichier Markdown"""
try:
with open(chemin_fichier, 'r', encoding='utf-8') as f:
contenu_md = f.read()
# Extraire les données du contenu Markdown
donnees = self._extraire_donnees_de_markdown(contenu_md)
# Ajout de métadonnées sur la source
if "metadata" not in donnees:
donnees["metadata"] = {}
donnees["metadata"]["source_file"] = chemin_fichier
donnees["metadata"]["format"] = "markdown"
return donnees
except Exception as e:
logger.error(f"Erreur lors du chargement du fichier Markdown {chemin_fichier}: {str(e)}")
raise ValueError(f"Impossible de charger le fichier Markdown: {str(e)}")
def get_format(self) -> str:
return "markdown"
def _extraire_donnees_de_markdown(self, contenu_md: str) -> Dict[str, Any]:
"""Extrait les données structurées d'un contenu Markdown"""
donnees = {}
# Diviser le contenu en sections
sections = re.split(r"\n## ", contenu_md)
# Traiter chaque section
for section in sections:
if section.startswith("Informations du ticket"):
ticket_info = self._analyser_infos_ticket(section)
donnees.update(ticket_info)
elif section.startswith("Messages"):
messages = self._analyser_messages(section)
donnees["messages"] = messages
elif section.startswith("Informations sur l'extraction"):
extraction_info = self._analyser_infos_extraction(section)
donnees.update(extraction_info)
# Réorganiser les champs pour que la description soit après "name"
ordered_fields = ["id", "code", "name", "description"]
ordered_data = {}
# D'abord ajouter les champs dans l'ordre spécifié
for field in ordered_fields:
if field in donnees:
ordered_data[field] = donnees[field]
# Ensuite ajouter les autres champs
for key, value in donnees.items():
if key not in ordered_data:
ordered_data[key] = value
# S'assurer que la description est présente
if "description" not in ordered_data:
ordered_data["description"] = ""
return ordered_data
def _analyser_infos_ticket(self, section: str) -> Dict[str, Any]:
"""Analyse la section d'informations du ticket"""
info = {}
description = []
capturing_description = False
lines = section.strip().split("\n")
i = 0
while i < len(lines):
line = lines[i]
# Si on est déjà en train de capturer la description
if capturing_description:
# Vérifie si on atteint une nouvelle section ou un nouveau champ
if i + 1 < len(lines) and (lines[i + 1].startswith("## ") or lines[i + 1].startswith("- **")):
capturing_description = False
info["description"] = "\n".join(description).strip()
else:
description.append(line)
i += 1
continue
# Détecte le début de la description
desc_match = re.match(r"- \*\*description\*\*:", line)
if desc_match:
capturing_description = True
i += 1 # Passe à la ligne suivante
continue
# Traite les autres champs normalement
match = re.match(r"- \*\*(.*?)\*\*: (.*)", line)
if match:
key, value = match.groups()
key = key.lower().replace("/", "_").replace(" ", "_")
info[key] = value.strip()
i += 1
# Si on finit en capturant la description, l'ajouter au dictionnaire
if capturing_description and description:
info["description"] = "\n".join(description).strip()
elif "description" not in info:
info["description"] = ""
return info
def _analyser_messages(self, section: str) -> List[Dict[str, Any]]:
"""Analyse la section des messages"""
messages = []
current_message = {}
in_message = False
lines = section.strip().split("\n")
for line in lines:
if line.startswith("### Message"):
if current_message:
messages.append(current_message)
current_message = {}
in_message = True
elif line.startswith("**") and in_message:
match = re.match(r"\*\*(.*?)\*\*: (.*)", line)
if match:
key, value = match.groups()
key = key.lower().replace("/", "_").replace(" ", "_")
current_message[key] = value.strip()
else:
if in_message:
current_message["content"] = current_message.get("content", "") + line + "\n"
if current_message:
messages.append(current_message)
# Nettoyer le contenu des messages
for message in messages:
if "content" in message:
message["content"] = message["content"].strip()
return messages
def _analyser_infos_extraction(self, section: str) -> Dict[str, Any]:
"""Analyse la section d'informations sur l'extraction"""
extraction_info = {}
lines = section.strip().split("\n")
for line in lines:
match = re.match(r"- \*\*(.*?)\*\*: (.*)", line)
if match:
key, value = match.groups()
key = key.lower().replace("/", "_").replace(" ", "_")
extraction_info[key] = value.strip()
return extraction_info
class TicketDataLoader:
"""Classe pour charger les données de tickets à partir de différentes sources"""
def __init__(self):
self.sources = {
"json": JsonTicketSource(),
"markdown": MarkdownTicketSource()
}
def detecter_format(self, chemin_fichier: str) -> str:
"""Détecte le format du fichier à partir de son extension"""
ext = os.path.splitext(chemin_fichier)[1].lower()
if ext == '.json':
return "json"
elif ext in ['.md', '.markdown']:
return "markdown"
else:
raise ValueError(f"Format de fichier non supporté: {ext}")
def charger(self, chemin_fichier: str, format_force: Optional[str] = None) -> Dict[str, Any]:
"""
Charge les données d'un ticket à partir d'un fichier
Args:
chemin_fichier: Chemin du fichier à charger
format_force: Format à utiliser (ignore la détection automatique)
Returns:
Dictionnaire contenant les données du ticket
"""
if not os.path.exists(chemin_fichier):
raise FileNotFoundError(f"Le fichier {chemin_fichier} n'existe pas")
format_fichier = format_force if format_force else self.detecter_format(chemin_fichier)
if format_fichier not in self.sources:
raise ValueError(f"Format non supporté: {format_fichier}")
logger.info(f"Chargement des données au format {format_fichier} depuis {chemin_fichier}")
donnees = self.sources[format_fichier].charger(chemin_fichier)
# Validation des données
if not self.sources[format_fichier].valider_donnees(donnees):
logger.warning(f"Les données chargées depuis {chemin_fichier} ne contiennent pas tous les champs obligatoires")
return donnees
def trouver_ticket(self, dossier: str, ticket_id: str) -> Optional[Dict[str, Optional[str]]]:
"""
Recherche les fichiers de ticket dans différents emplacements possibles
Args:
dossier: Dossier de base pour la recherche
ticket_id: ID du ticket (ex: T0101)
Returns:
Dictionnaire avec les chemins des fichiers trouvés par format ou None si aucun fichier trouvé
"""
resultats: Dict[str, Optional[str]] = {"json": None, "markdown": None}
# Liste des emplacements possibles pour les rapports
emplacements_possibles = [
# 1. Dans le répertoire d'extraction directement
dossier,
# 2. Dans un sous-répertoire "data"
os.path.join(dossier, "data"),
# 3. Dans un sous-répertoire spécifique au ticket pour les rapports
os.path.join(dossier, f"{ticket_id}_rapports"),
# 4. Dans un sous-répertoire "rapports"
os.path.join(dossier, "rapports")
]
# Vérifier chaque emplacement
for base_location in emplacements_possibles:
# Chercher le fichier JSON
json_path = os.path.join(base_location, f"{ticket_id}_rapport.json")
if os.path.exists(json_path):
logger.info(f"Rapport JSON trouvé à: {json_path}")
resultats["json"] = json_path
# Chercher le fichier Markdown
md_path = os.path.join(base_location, f"{ticket_id}_rapport.md")
if os.path.exists(md_path):
logger.info(f"Rapport Markdown trouvé à: {md_path}")
resultats["markdown"] = md_path
if not resultats["json"] and not resultats["markdown"]:
logger.warning(f"Aucun rapport trouvé pour {ticket_id} dans {dossier}")
return None
return resultats