llm_ticket3/formatters/json_to_markdown.py

525 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Script pour convertir les fichiers JSON de tickets en Markdown formaté.
Ce script prend les données JSON des tickets extraits et crée un fichier Markdown structuré.
"""
import os
import sys
import json
import argparse
import html
import subprocess
import re
from datetime import datetime
from .clean_html import clean_html, format_date
def clean_newlines(text):
"""
Nettoie les sauts de ligne excessifs dans le texte.
Args:
text: Texte à nettoyer
Returns:
Texte avec sauts de ligne normalisés
"""
if not text:
return text
# Étape 1: Normaliser tous les sauts de ligne
text = text.replace("\r\n", "\n").replace("\r", "\n")
# Étape 2: Supprimer les lignes vides consécutives (plus de 2 sauts de ligne)
text = re.sub(r'\n{3,}', '\n\n', text)
# Étape 3: Supprimer les espaces en début et fin de chaque ligne
lines = text.split('\n')
cleaned_lines = [line.strip() for line in lines]
# Étape 4: Supprimer les lignes qui ne contiennent que des espaces ou des caractères de mise en forme
meaningful_lines = []
for line in cleaned_lines:
# Ignorer les lignes qui ne contiennent que des caractères spéciaux de mise en forme
if line and not re.match(r'^[\s_\-=\.]+$', line):
meaningful_lines.append(line)
elif line: # Si c'est une ligne de séparation, la garder mais la normaliser
if re.match(r'^_{3,}$', line): # Ligne de tirets bas
meaningful_lines.append("___")
elif re.match(r'^-{3,}$', line): # Ligne de tirets
meaningful_lines.append("---")
elif re.match(r'^={3,}$', line): # Ligne d'égal
meaningful_lines.append("===")
else:
meaningful_lines.append(line)
# Recombiner les lignes
return '\n'.join(meaningful_lines)
def create_markdown_from_json(json_file, output_file):
"""
Crée un fichier Markdown à partir d'un fichier JSON de ticket
Args:
json_file: Le chemin vers le fichier JSON
output_file: Le chemin vers le fichier Markdown à créer
Returns:
bool: True si la conversion a réussi, False sinon
"""
# Déterminer l'emplacement du fichier de sortie JSON
json_output_file = output_file.replace('.md', '.json')
# Charger les données du fichier JSON
try:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
except Exception as e:
print(f"Erreur lors de la lecture du fichier JSON: {e}")
return False
# Déterminer l'emplacement du fichier messages_raw.json
ticket_dir = os.path.dirname(json_file)
raw_messages_path = os.path.join(ticket_dir, "messages_raw.json")
# Charger les données brutes des messages
raw_messages_data = {}
if os.path.exists(raw_messages_path):
try:
with open(raw_messages_path, 'r', encoding='utf-8') as f:
raw_messages_data = json.load(f)
print(f"Fichier messages_raw.json chargé avec succès")
except Exception as e:
print(f"Erreur lors de la lecture de messages_raw.json: {e}")
else:
print(f"Fichier messages_raw.json introuvable: {raw_messages_path}")
ticket_summary = {}
try:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
ticket_summary = data.get("ticket_summary", {})
except Exception as e:
print(f"Erreur : {e}")
return False
ticket_code = ticket_summary.get("code", "inconnu")
# Créer le dossier rapports si il n'existe pas
reports_dir = os.path.join(ticket_dir, f"{ticket_code}_rapports")
os.makedirs(reports_dir, exist_ok=True)
output_file = os.path.join(reports_dir, f"{ticket_code}_rapport.md")
json_output_file = os.path.join(reports_dir, f"{ticket_code}_rapport.json")
# Essayer de lire le fichier ticket_info.json si disponible
ticket_info = {}
ticket_info_path = os.path.join(ticket_dir, "ticket_info.json")
if os.path.exists(ticket_info_path):
try:
with open(ticket_info_path, 'r', encoding='utf-8') as f:
ticket_info = json.load(f)
except Exception as e:
print(f"Avertissement: Impossible de lire ticket_info.json: {e}")
# Récupérer les informations du sommaire du ticket
ticket_summary = {}
if "ticket_summary" in data:
ticket_summary = data.get("ticket_summary", {})
else:
summary_path = os.path.join(ticket_dir, "ticket_summary.json")
if os.path.exists(summary_path):
try:
with open(summary_path, 'r', encoding='utf-8') as f:
ticket_summary = json.load(f)
except Exception as e:
print(f"Avertissement: Impossible de lire ticket_summary.json: {e}")
# Tenter de lire le fichier structure.json
structure = {}
structure_path = os.path.join(ticket_dir, "structure.json")
if os.path.exists(structure_path):
try:
with open(structure_path, 'r', encoding='utf-8') as f:
structure = json.load(f)
except Exception as e:
print(f"Avertissement: Impossible de lire structure.json: {e}")
# Commencer à construire le contenu Markdown
md_content = []
# Ajouter l'en-tête du document avec les informations du ticket
ticket_code = ticket_summary.get("code", os.path.basename(ticket_dir).split('_')[0])
ticket_name = ticket_summary.get("name", "")
md_content.append(f"# Ticket {ticket_code}: {ticket_name}")
md_content.append("")
# Ajouter des métadonnées du ticket
md_content.append("## Informations du ticket")
md_content.append("")
# Ajouter l'ID du ticket
ticket_id = ticket_summary.get("id", ticket_info.get("id", ""))
md_content.append(f"- **id**: {ticket_id}")
md_content.append(f"- **code**: {ticket_code}")
md_content.append(f"- **name**: {ticket_name}")
md_content.append(f"- **project_name**: {ticket_summary.get('project_name', '')}")
md_content.append(f"- **stage_name**: {ticket_summary.get('stage_name', '')}")
# Chercher l'utilisateur assigné dans les métadonnées
assigned_to = ""
if "user_id" in structure and structure["user_id"]:
user_id = structure["user_id"]
if isinstance(user_id, list) and len(user_id) > 1:
assigned_to = user_id[1]
md_content.append(f"- **user_id**: {assigned_to}")
# Ajouter le client si disponible
partner = ""
if "partner_id" in ticket_info:
partner_id = ticket_info.get("partner_id", [])
if isinstance(partner_id, list) and len(partner_id) > 1:
partner = partner_id[1]
# Ajouter l'email du client si disponible
partner_email = ""
if "email_from" in ticket_info and ticket_info["email_from"]:
partner_email = ticket_info["email_from"]
if partner:
partner += f", {partner_email}"
else:
partner = partner_email
md_content.append(f"- **partner_id/email_from**: {partner}")
# Ajouter les tags s'ils sont disponibles
tags = []
if "tag_ids" in ticket_info:
tag_ids = ticket_info.get("tag_ids", []) or []
for tag in tag_ids:
if isinstance(tag, list) and len(tag) > 1:
tags.append(tag[1])
if tags:
md_content.append(f"- **tag_ids**: {', '.join(tags)}")
# Ajouter les dates
md_content.append(f"- **create_date**: {format_date(ticket_info.get('create_date', ''))}")
md_content.append(f"- **write_date/last modification**: {format_date(ticket_info.get('write_date', ''))}")
if "date_deadline" in ticket_info and ticket_info.get("date_deadline"):
md_content.append(f"- **date_deadline**: {format_date(ticket_info.get('date_deadline', ''))}")
md_content.append("")
# Ajouter la description du ticket
description = ticket_info.get("description", "")
md_content.append(f"- **description**:")
md_content.append("") # saut de ligne
if description:
cleaned_description = clean_html(description)
if cleaned_description and cleaned_description != "*Contenu vide*":
cleaned_description = html.unescape(cleaned_description)
md_content.append(cleaned_description)
else:
md_content.append("*Aucune description fournie*")
else:
md_content.append("*Aucune description fournie*")
md_content.append("") # saut de ligne
# Ajouter les messages
messages = []
if "messages" in data:
messages = data.get("messages", [])
if not messages:
md_content.append("## Messages")
md_content.append("")
md_content.append("*Aucun message disponible*")
else:
# Filtrer les messages système non pertinents
filtered_messages = []
for msg in messages:
# Ignorer les messages système vides
if msg.get("is_system", False) and not msg.get("body", "").strip():
continue
# Ignorer les changements d'état sans contenu
if msg.get("is_stage_change", False) and not msg.get("body", "").strip():
# Sauf si on veut les garder pour la traçabilité
filtered_messages.append(msg)
continue
filtered_messages.append(msg)
# Si nous avons au moins un message significatif
if filtered_messages:
md_content.append("## Messages")
md_content.append("")
# Trier les messages par date
filtered_messages.sort(key=lambda x: x.get("date", ""))
for i, message in enumerate(filtered_messages):
if not isinstance(message, dict):
continue
# Déterminer l'auteur du message
author = "Système"
author_details = message.get("author_details", {})
if author_details and author_details.get("name"):
author = author_details.get("name")
else:
author_id = message.get("author_id", [])
if isinstance(author_id, list) and len(author_id) > 1:
author = author_id[1]
# Formater la date
date = format_date(message.get("date", ""))
# Récupérer le corps du message, en privilégiant body_original (HTML) si disponible
if "body_original" in message and message["body_original"]:
body = message["body_original"]
# Nettoyer le corps HTML avec clean_html
cleaned_body = clean_html(body, is_forwarded=message.get("is_forwarded", False))
# Vérifier s'il y a des liens utiles dans le message original mais pas dans le corps nettoyé
message_id = message.get("id", "")
if message_id and "Je ne parviens pas à accéder" not in body and "Pour vous accompagner" in body:
# Rechercher le message correspondant dans messages_raw.json
raw_message = None
# Si raw_messages_data est disponible, chercher le message brut
if raw_messages_data and "messages" in raw_messages_data:
for msg in raw_messages_data["messages"]:
if msg.get("id") == message_id:
raw_message = msg
break
# Si le message brut a été trouvé, extraire les liens
if raw_message and "body" in raw_message:
raw_body = raw_message["body"]
# Rechercher des liens vers la documentation ou des manuels
doc_links = []
link_pattern = re.compile(r'<a[^>]+href=["\']([^"\']+)["\'][^>]*>(.*?)</a>', re.DOTALL)
for match in link_pattern.finditer(raw_body):
href = match.group(1)
text = re.sub(r'<[^>]+>', '', match.group(2)).strip()
# Vérifier si c'est un lien de documentation ou manuel
doc_keywords = ['manuel', 'manual', 'documentation', 'doc.', 'faq', 'aide', 'help']
if any(keyword in href.lower() for keyword in doc_keywords) or any(keyword in text.lower() for keyword in doc_keywords):
doc_links.append((text, href))
# Si des liens ont été trouvés et qu'ils ne sont pas dans le corps nettoyé,
# les ajouter au corps nettoyé
if doc_links and not any(link[1] in cleaned_body for link in doc_links):
if "Pour vous accompagner" not in cleaned_body:
cleaned_body += "\n\nPour vous accompagner au mieux, voici des liens utiles :\n"
else:
cleaned_body += "\n"
for text, href in doc_links:
cleaned_body += f"[{text}]({href})\n"
else:
# Utiliser body directement (déjà en texte/markdown) sans passer par clean_html
body = message.get("body", "")
cleaned_body = body # Pas besoin de nettoyer car déjà en texte brut
# Déterminer le type de message
message_type = ""
if message.get("is_stage_change", False):
message_type = "Changement d'état"
elif message.get("is_system", False):
message_type = "Système"
elif message.get("is_note", False):
message_type = "Commentaire"
elif message.get("email_from", False):
message_type = "E-mail"
# Récupérer le sujet du message
subject = message.get("subject", "")
# Créer l'en-tête du message
md_content.append(f"### Message {i+1}")
md_content.append(f"**author_id**: {author}")
md_content.append(f"**date**: {date}")
md_content.append(f"**message_type**: {message_type}")
if subject:
md_content.append(f"**subject**: {subject}")
# Ajouter l'ID du message si disponible
message_id = message.get("id", "")
if message_id:
md_content.append(f"**id**: {message_id}")
# Ajouter le corps nettoyé du message
if cleaned_body:
cleaned_body = clean_newlines(cleaned_body)
md_content.append(cleaned_body)
else:
md_content.append("*Contenu vide*")
# Ajouter les pièces jointes si elles existent
attachment_ids = message.get("attachment_ids", [])
has_attachments = False
# Vérifier si les pièces jointes existent et ne sont pas vides
if attachment_ids:
# Récupérer les informations des pièces jointes
valid_attachments = []
if isinstance(attachment_ids, list) and all(isinstance(id, int) for id in attachment_ids):
# Chercher les informations des pièces jointes dans attachments_info.json
attachments_info_path = os.path.join(ticket_dir, "attachments_info.json")
if os.path.exists(attachments_info_path):
try:
with open(attachments_info_path, 'r', encoding='utf-8') as f:
attachments_info = json.load(f)
for attachment_id in attachment_ids:
for attachment_info in attachments_info:
if attachment_info.get("id") == attachment_id:
valid_attachments.append(attachment_info)
except Exception as e:
print(f"Avertissement: Impossible de lire attachments_info.json: {e}")
elif isinstance(attachment_ids, list):
for att in attachment_ids:
if isinstance(att, list) and len(att) > 1:
valid_attachments.append(att)
if valid_attachments:
has_attachments = True
md_content.append("")
md_content.append("**attachment_ids**:")
for att in valid_attachments:
if isinstance(att, list) and len(att) > 1:
md_content.append(f"- {att[1]}")
elif isinstance(att, dict):
att_id = att.get("id", "")
name = att.get("name", "Pièce jointe sans nom")
mimetype = att.get("mimetype", "Type inconnu")
md_content.append(f"- {name} ({mimetype}) [ID: {att_id}]")
md_content.append("")
md_content.append("---")
md_content.append("")
# Ajouter une section pour les pièces jointes du ticket si elles existent
attachment_data = {}
attachment_path = os.path.join(ticket_dir, "attachments.json")
if os.path.exists(attachment_path):
try:
with open(attachment_path, 'r', encoding='utf-8') as f:
attachment_data = json.load(f)
except Exception as e:
print(f"Avertissement: Impossible de lire attachments.json: {e}")
if attachment_data and "attachments" in attachment_data:
attachments = attachment_data.get("attachments", [])
if attachments:
md_content.append("## Pièces jointes")
md_content.append("")
md_content.append("| Nom | Type | Taille | Date |")
md_content.append("|-----|------|--------|------|")
for att in attachments:
name = att.get("name", "")
mimetype = att.get("mimetype", "")
file_size = att.get("file_size", 0)
size_str = f"{file_size / 1024:.1f} KB" if file_size else ""
create_date = format_date(att.get("create_date", ""))
md_content.append(f"| {name} | {mimetype} | {size_str} | {create_date} |")
md_content.append("")
# Ajouter des informations sur l'extraction
extract_time = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
md_content.append("## Informations sur l'extraction")
md_content.append("")
md_content.append(f"- **Date d'extraction**: {extract_time}")
md_content.append(f"- **Répertoire**: {ticket_dir}")
# Écrire le contenu dans le fichier de sortie
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write("\n".join(md_content))
print(f"Rapport Markdown créé : {output_file}")
# Appeler le script markdown_to_json.py
# Mise à jour du chemin pour utiliser le module formatters
from .markdown_to_json import markdown_to_json
result = markdown_to_json(output_file, json_output_file)
# Enrichir le fichier JSON avec les données de messages_raw.json
if result and raw_messages_data:
try:
# Lire le fichier JSON généré
with open(json_output_file, 'r', encoding='utf-8') as f:
rapport_json = json.load(f)
# Ajouter une référence vers les données brutes
rapport_json["messages_raw_reference"] = raw_messages_path
# Créer un lien explicite entre les ID des messages du rapport et ceux de messages_raw
if "messages" in rapport_json and "messages" in raw_messages_data:
for message in rapport_json["messages"]:
message_id = message.get("id")
if message_id:
# Chercher le message correspondant dans raw_messages
for raw_message in raw_messages_data["messages"]:
if raw_message.get("id") == message_id:
# Ajouter l'index du message brut correspondant
message["raw_message_index"] = raw_messages_data["messages"].index(raw_message)
break
# Ajouter une date d'extraction
rapport_json["date_d'extraction"] = extract_time
rapport_json["répertoire"] = ticket_dir
# Sauvegarder le fichier JSON enrichi
with open(json_output_file, 'w', encoding='utf-8') as f:
json.dump(rapport_json, f, indent=4, ensure_ascii=False)
print(f"Fichier JSON enrichi créé : {json_output_file}")
except Exception as e:
print(f"Erreur lors de l'enrichissement du fichier JSON: {e}")
else:
print(f"Fichier JSON créé : {json_output_file}")
return True
except Exception as e:
print(f"Erreur lors de l'écriture du fichier Markdown: {e}")
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Convertir les fichiers JSON de tickets en Markdown")
parser.add_argument("--ticket_code", "-t", help="Code du ticket à convertir (ex: T11067)")
parser.add_argument("--date_dir", "-d", help="Dossier spécifique par date, optionnel (ex: 20250403_155134)")
parser.add_argument("--input_dir", "-i", default="output", help="Dossier racine contenant les tickets")
parser.add_argument("--output_name", "-o", default="rapport.md", help="Nom du fichier Markdown à générer")
args = parser.parse_args()
if args.ticket_code:
ticket_dir = os.path.join(args.input_dir, args.ticket_code)
# Si un dossier de date spécifique est fourni, l'ajouter au chemin
if args.date_dir:
ticket_dir = os.path.join(ticket_dir, args.date_dir)
# Chercher le fichier all_messages.json
json_file = os.path.join(ticket_dir, "all_messages.json")
if os.path.exists(json_file):
output_file = os.path.join(ticket_dir, args.output_name)
success = create_markdown_from_json(json_file, output_file)
if success:
print(f"Conversion réussie : {output_file}")
else:
print("Échec de la conversion")
else:
print(f"Fichier {json_file} introuvable")
else:
print("Veuillez spécifier un code de ticket avec l'option -t")