mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-13 10:46:51 +01:00
525 lines
24 KiB
Python
525 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Script pour convertir les fichiers JSON de tickets en Markdown formaté.
|
|
Ce script prend les données JSON des tickets extraits et crée un fichier Markdown structuré.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import html
|
|
import subprocess
|
|
import re
|
|
from datetime import datetime
|
|
|
|
from .clean_html import clean_html, format_date
|
|
|
|
def clean_newlines(text):
|
|
"""
|
|
Nettoie les sauts de ligne excessifs dans le texte.
|
|
|
|
Args:
|
|
text: Texte à nettoyer
|
|
|
|
Returns:
|
|
Texte avec sauts de ligne normalisés
|
|
"""
|
|
if not text:
|
|
return text
|
|
|
|
# Étape 1: Normaliser tous les sauts de ligne
|
|
text = text.replace("\r\n", "\n").replace("\r", "\n")
|
|
|
|
# Étape 2: Supprimer les lignes vides consécutives (plus de 2 sauts de ligne)
|
|
text = re.sub(r'\n{3,}', '\n\n', text)
|
|
|
|
# Étape 3: Supprimer les espaces en début et fin de chaque ligne
|
|
lines = text.split('\n')
|
|
cleaned_lines = [line.strip() for line in lines]
|
|
|
|
# Étape 4: Supprimer les lignes qui ne contiennent que des espaces ou des caractères de mise en forme
|
|
meaningful_lines = []
|
|
for line in cleaned_lines:
|
|
# Ignorer les lignes qui ne contiennent que des caractères spéciaux de mise en forme
|
|
if line and not re.match(r'^[\s_\-=\.]+$', line):
|
|
meaningful_lines.append(line)
|
|
elif line: # Si c'est une ligne de séparation, la garder mais la normaliser
|
|
if re.match(r'^_{3,}$', line): # Ligne de tirets bas
|
|
meaningful_lines.append("___")
|
|
elif re.match(r'^-{3,}$', line): # Ligne de tirets
|
|
meaningful_lines.append("---")
|
|
elif re.match(r'^={3,}$', line): # Ligne d'égal
|
|
meaningful_lines.append("===")
|
|
else:
|
|
meaningful_lines.append(line)
|
|
|
|
# Recombiner les lignes
|
|
return '\n'.join(meaningful_lines)
|
|
|
|
def create_markdown_from_json(json_file, output_file):
|
|
"""
|
|
Crée un fichier Markdown à partir d'un fichier JSON de ticket
|
|
|
|
Args:
|
|
json_file: Le chemin vers le fichier JSON
|
|
output_file: Le chemin vers le fichier Markdown à créer
|
|
|
|
Returns:
|
|
bool: True si la conversion a réussi, False sinon
|
|
"""
|
|
# Déterminer l'emplacement du fichier de sortie JSON
|
|
json_output_file = output_file.replace('.md', '.json')
|
|
|
|
# Charger les données du fichier JSON
|
|
try:
|
|
with open(json_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
except Exception as e:
|
|
print(f"Erreur lors de la lecture du fichier JSON: {e}")
|
|
return False
|
|
|
|
# Déterminer l'emplacement du fichier messages_raw.json
|
|
ticket_dir = os.path.dirname(json_file)
|
|
raw_messages_path = os.path.join(ticket_dir, "messages_raw.json")
|
|
|
|
# Charger les données brutes des messages
|
|
raw_messages_data = {}
|
|
if os.path.exists(raw_messages_path):
|
|
try:
|
|
with open(raw_messages_path, 'r', encoding='utf-8') as f:
|
|
raw_messages_data = json.load(f)
|
|
print(f"Fichier messages_raw.json chargé avec succès")
|
|
except Exception as e:
|
|
print(f"Erreur lors de la lecture de messages_raw.json: {e}")
|
|
else:
|
|
print(f"Fichier messages_raw.json introuvable: {raw_messages_path}")
|
|
|
|
ticket_summary = {}
|
|
try:
|
|
with open(json_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
ticket_summary = data.get("ticket_summary", {})
|
|
except Exception as e:
|
|
print(f"Erreur : {e}")
|
|
return False
|
|
|
|
ticket_code = ticket_summary.get("code", "inconnu")
|
|
|
|
# Créer le dossier rapports si il n'existe pas
|
|
reports_dir = os.path.join(ticket_dir, f"{ticket_code}_rapports")
|
|
os.makedirs(reports_dir, exist_ok=True)
|
|
|
|
output_file = os.path.join(reports_dir, f"{ticket_code}_rapport.md")
|
|
json_output_file = os.path.join(reports_dir, f"{ticket_code}_rapport.json")
|
|
|
|
# Essayer de lire le fichier ticket_info.json si disponible
|
|
ticket_info = {}
|
|
ticket_info_path = os.path.join(ticket_dir, "ticket_info.json")
|
|
if os.path.exists(ticket_info_path):
|
|
try:
|
|
with open(ticket_info_path, 'r', encoding='utf-8') as f:
|
|
ticket_info = json.load(f)
|
|
except Exception as e:
|
|
print(f"Avertissement: Impossible de lire ticket_info.json: {e}")
|
|
|
|
# Récupérer les informations du sommaire du ticket
|
|
ticket_summary = {}
|
|
if "ticket_summary" in data:
|
|
ticket_summary = data.get("ticket_summary", {})
|
|
else:
|
|
summary_path = os.path.join(ticket_dir, "ticket_summary.json")
|
|
if os.path.exists(summary_path):
|
|
try:
|
|
with open(summary_path, 'r', encoding='utf-8') as f:
|
|
ticket_summary = json.load(f)
|
|
except Exception as e:
|
|
print(f"Avertissement: Impossible de lire ticket_summary.json: {e}")
|
|
|
|
# Tenter de lire le fichier structure.json
|
|
structure = {}
|
|
structure_path = os.path.join(ticket_dir, "structure.json")
|
|
if os.path.exists(structure_path):
|
|
try:
|
|
with open(structure_path, 'r', encoding='utf-8') as f:
|
|
structure = json.load(f)
|
|
except Exception as e:
|
|
print(f"Avertissement: Impossible de lire structure.json: {e}")
|
|
|
|
# Commencer à construire le contenu Markdown
|
|
md_content = []
|
|
|
|
# Ajouter l'en-tête du document avec les informations du ticket
|
|
ticket_code = ticket_summary.get("code", os.path.basename(ticket_dir).split('_')[0])
|
|
ticket_name = ticket_summary.get("name", "")
|
|
|
|
md_content.append(f"# Ticket {ticket_code}: {ticket_name}")
|
|
md_content.append("")
|
|
|
|
# Ajouter des métadonnées du ticket
|
|
md_content.append("## Informations du ticket")
|
|
md_content.append("")
|
|
# Ajouter l'ID du ticket
|
|
ticket_id = ticket_summary.get("id", ticket_info.get("id", ""))
|
|
md_content.append(f"- **id**: {ticket_id}")
|
|
md_content.append(f"- **code**: {ticket_code}")
|
|
md_content.append(f"- **name**: {ticket_name}")
|
|
md_content.append(f"- **project_name**: {ticket_summary.get('project_name', '')}")
|
|
md_content.append(f"- **stage_name**: {ticket_summary.get('stage_name', '')}")
|
|
|
|
# Chercher l'utilisateur assigné dans les métadonnées
|
|
assigned_to = ""
|
|
if "user_id" in structure and structure["user_id"]:
|
|
user_id = structure["user_id"]
|
|
if isinstance(user_id, list) and len(user_id) > 1:
|
|
assigned_to = user_id[1]
|
|
|
|
md_content.append(f"- **user_id**: {assigned_to}")
|
|
|
|
# Ajouter le client si disponible
|
|
partner = ""
|
|
if "partner_id" in ticket_info:
|
|
partner_id = ticket_info.get("partner_id", [])
|
|
if isinstance(partner_id, list) and len(partner_id) > 1:
|
|
partner = partner_id[1]
|
|
|
|
# Ajouter l'email du client si disponible
|
|
partner_email = ""
|
|
if "email_from" in ticket_info and ticket_info["email_from"]:
|
|
partner_email = ticket_info["email_from"]
|
|
if partner:
|
|
partner += f", {partner_email}"
|
|
else:
|
|
partner = partner_email
|
|
|
|
md_content.append(f"- **partner_id/email_from**: {partner}")
|
|
|
|
# Ajouter les tags s'ils sont disponibles
|
|
tags = []
|
|
if "tag_ids" in ticket_info:
|
|
tag_ids = ticket_info.get("tag_ids", []) or []
|
|
for tag in tag_ids:
|
|
if isinstance(tag, list) and len(tag) > 1:
|
|
tags.append(tag[1])
|
|
|
|
if tags:
|
|
md_content.append(f"- **tag_ids**: {', '.join(tags)}")
|
|
|
|
# Ajouter les dates
|
|
md_content.append(f"- **create_date**: {format_date(ticket_info.get('create_date', ''))}")
|
|
md_content.append(f"- **write_date/last modification**: {format_date(ticket_info.get('write_date', ''))}")
|
|
if "date_deadline" in ticket_info and ticket_info.get("date_deadline"):
|
|
md_content.append(f"- **date_deadline**: {format_date(ticket_info.get('date_deadline', ''))}")
|
|
|
|
md_content.append("")
|
|
|
|
# Ajouter la description du ticket
|
|
description = ticket_info.get("description", "")
|
|
md_content.append(f"- **description**:")
|
|
md_content.append("") # saut de ligne
|
|
|
|
if description:
|
|
cleaned_description = clean_html(description)
|
|
if cleaned_description and cleaned_description != "*Contenu vide*":
|
|
cleaned_description = html.unescape(cleaned_description)
|
|
md_content.append(cleaned_description)
|
|
else:
|
|
md_content.append("*Aucune description fournie*")
|
|
else:
|
|
md_content.append("*Aucune description fournie*")
|
|
md_content.append("") # saut de ligne
|
|
|
|
# Ajouter les messages
|
|
messages = []
|
|
if "messages" in data:
|
|
messages = data.get("messages", [])
|
|
|
|
if not messages:
|
|
md_content.append("## Messages")
|
|
md_content.append("")
|
|
md_content.append("*Aucun message disponible*")
|
|
else:
|
|
# Filtrer les messages système non pertinents
|
|
filtered_messages = []
|
|
for msg in messages:
|
|
# Ignorer les messages système vides
|
|
if msg.get("is_system", False) and not msg.get("body", "").strip():
|
|
continue
|
|
|
|
# Ignorer les changements d'état sans contenu
|
|
if msg.get("is_stage_change", False) and not msg.get("body", "").strip():
|
|
# Sauf si on veut les garder pour la traçabilité
|
|
filtered_messages.append(msg)
|
|
continue
|
|
|
|
filtered_messages.append(msg)
|
|
|
|
# Si nous avons au moins un message significatif
|
|
if filtered_messages:
|
|
md_content.append("## Messages")
|
|
md_content.append("")
|
|
|
|
# Trier les messages par date
|
|
filtered_messages.sort(key=lambda x: x.get("date", ""))
|
|
|
|
for i, message in enumerate(filtered_messages):
|
|
if not isinstance(message, dict):
|
|
continue
|
|
|
|
# Déterminer l'auteur du message
|
|
author = "Système"
|
|
author_details = message.get("author_details", {})
|
|
if author_details and author_details.get("name"):
|
|
author = author_details.get("name")
|
|
else:
|
|
author_id = message.get("author_id", [])
|
|
if isinstance(author_id, list) and len(author_id) > 1:
|
|
author = author_id[1]
|
|
|
|
# Formater la date
|
|
date = format_date(message.get("date", ""))
|
|
|
|
# Récupérer le corps du message, en privilégiant body_original (HTML) si disponible
|
|
if "body_original" in message and message["body_original"]:
|
|
body = message["body_original"]
|
|
# Nettoyer le corps HTML avec clean_html
|
|
cleaned_body = clean_html(body, is_forwarded=message.get("is_forwarded", False))
|
|
|
|
# Vérifier s'il y a des liens utiles dans le message original mais pas dans le corps nettoyé
|
|
message_id = message.get("id", "")
|
|
if message_id and "Je ne parviens pas à accéder" not in body and "Pour vous accompagner" in body:
|
|
# Rechercher le message correspondant dans messages_raw.json
|
|
raw_message = None
|
|
|
|
# Si raw_messages_data est disponible, chercher le message brut
|
|
if raw_messages_data and "messages" in raw_messages_data:
|
|
for msg in raw_messages_data["messages"]:
|
|
if msg.get("id") == message_id:
|
|
raw_message = msg
|
|
break
|
|
|
|
# Si le message brut a été trouvé, extraire les liens
|
|
if raw_message and "body" in raw_message:
|
|
raw_body = raw_message["body"]
|
|
|
|
# Rechercher des liens vers la documentation ou des manuels
|
|
doc_links = []
|
|
link_pattern = re.compile(r'<a[^>]+href=["\']([^"\']+)["\'][^>]*>(.*?)</a>', re.DOTALL)
|
|
|
|
for match in link_pattern.finditer(raw_body):
|
|
href = match.group(1)
|
|
text = re.sub(r'<[^>]+>', '', match.group(2)).strip()
|
|
|
|
# Vérifier si c'est un lien de documentation ou manuel
|
|
doc_keywords = ['manuel', 'manual', 'documentation', 'doc.', 'faq', 'aide', 'help']
|
|
if any(keyword in href.lower() for keyword in doc_keywords) or any(keyword in text.lower() for keyword in doc_keywords):
|
|
doc_links.append((text, href))
|
|
|
|
# Si des liens ont été trouvés et qu'ils ne sont pas dans le corps nettoyé,
|
|
# les ajouter au corps nettoyé
|
|
if doc_links and not any(link[1] in cleaned_body for link in doc_links):
|
|
if "Pour vous accompagner" not in cleaned_body:
|
|
cleaned_body += "\n\nPour vous accompagner au mieux, voici des liens utiles :\n"
|
|
else:
|
|
cleaned_body += "\n"
|
|
|
|
for text, href in doc_links:
|
|
cleaned_body += f"[{text}]({href})\n"
|
|
else:
|
|
# Utiliser body directement (déjà en texte/markdown) sans passer par clean_html
|
|
body = message.get("body", "")
|
|
cleaned_body = body # Pas besoin de nettoyer car déjà en texte brut
|
|
|
|
# Déterminer le type de message
|
|
message_type = ""
|
|
if message.get("is_stage_change", False):
|
|
message_type = "Changement d'état"
|
|
elif message.get("is_system", False):
|
|
message_type = "Système"
|
|
elif message.get("is_note", False):
|
|
message_type = "Commentaire"
|
|
elif message.get("email_from", False):
|
|
message_type = "E-mail"
|
|
|
|
# Récupérer le sujet du message
|
|
subject = message.get("subject", "")
|
|
|
|
# Créer l'en-tête du message
|
|
md_content.append(f"### Message {i+1}")
|
|
md_content.append(f"**author_id**: {author}")
|
|
md_content.append(f"**date**: {date}")
|
|
md_content.append(f"**message_type**: {message_type}")
|
|
if subject:
|
|
md_content.append(f"**subject**: {subject}")
|
|
|
|
# Ajouter l'ID du message si disponible
|
|
message_id = message.get("id", "")
|
|
if message_id:
|
|
md_content.append(f"**id**: {message_id}")
|
|
|
|
# Ajouter le corps nettoyé du message
|
|
if cleaned_body:
|
|
cleaned_body = clean_newlines(cleaned_body)
|
|
md_content.append(cleaned_body)
|
|
else:
|
|
md_content.append("*Contenu vide*")
|
|
|
|
# Ajouter les pièces jointes si elles existent
|
|
attachment_ids = message.get("attachment_ids", [])
|
|
has_attachments = False
|
|
|
|
# Vérifier si les pièces jointes existent et ne sont pas vides
|
|
if attachment_ids:
|
|
# Récupérer les informations des pièces jointes
|
|
valid_attachments = []
|
|
if isinstance(attachment_ids, list) and all(isinstance(id, int) for id in attachment_ids):
|
|
# Chercher les informations des pièces jointes dans attachments_info.json
|
|
attachments_info_path = os.path.join(ticket_dir, "attachments_info.json")
|
|
if os.path.exists(attachments_info_path):
|
|
try:
|
|
with open(attachments_info_path, 'r', encoding='utf-8') as f:
|
|
attachments_info = json.load(f)
|
|
for attachment_id in attachment_ids:
|
|
for attachment_info in attachments_info:
|
|
if attachment_info.get("id") == attachment_id:
|
|
valid_attachments.append(attachment_info)
|
|
except Exception as e:
|
|
print(f"Avertissement: Impossible de lire attachments_info.json: {e}")
|
|
elif isinstance(attachment_ids, list):
|
|
for att in attachment_ids:
|
|
if isinstance(att, list) and len(att) > 1:
|
|
valid_attachments.append(att)
|
|
|
|
if valid_attachments:
|
|
has_attachments = True
|
|
md_content.append("")
|
|
md_content.append("**attachment_ids**:")
|
|
for att in valid_attachments:
|
|
if isinstance(att, list) and len(att) > 1:
|
|
md_content.append(f"- {att[1]}")
|
|
elif isinstance(att, dict):
|
|
att_id = att.get("id", "")
|
|
name = att.get("name", "Pièce jointe sans nom")
|
|
mimetype = att.get("mimetype", "Type inconnu")
|
|
md_content.append(f"- {name} ({mimetype}) [ID: {att_id}]")
|
|
|
|
md_content.append("")
|
|
md_content.append("---")
|
|
md_content.append("")
|
|
|
|
# Ajouter une section pour les pièces jointes du ticket si elles existent
|
|
attachment_data = {}
|
|
attachment_path = os.path.join(ticket_dir, "attachments.json")
|
|
if os.path.exists(attachment_path):
|
|
try:
|
|
with open(attachment_path, 'r', encoding='utf-8') as f:
|
|
attachment_data = json.load(f)
|
|
except Exception as e:
|
|
print(f"Avertissement: Impossible de lire attachments.json: {e}")
|
|
|
|
if attachment_data and "attachments" in attachment_data:
|
|
attachments = attachment_data.get("attachments", [])
|
|
if attachments:
|
|
md_content.append("## Pièces jointes")
|
|
md_content.append("")
|
|
md_content.append("| Nom | Type | Taille | Date |")
|
|
md_content.append("|-----|------|--------|------|")
|
|
|
|
for att in attachments:
|
|
name = att.get("name", "")
|
|
mimetype = att.get("mimetype", "")
|
|
file_size = att.get("file_size", 0)
|
|
size_str = f"{file_size / 1024:.1f} KB" if file_size else ""
|
|
create_date = format_date(att.get("create_date", ""))
|
|
|
|
md_content.append(f"| {name} | {mimetype} | {size_str} | {create_date} |")
|
|
|
|
md_content.append("")
|
|
|
|
# Ajouter des informations sur l'extraction
|
|
extract_time = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
|
|
md_content.append("## Informations sur l'extraction")
|
|
md_content.append("")
|
|
md_content.append(f"- **Date d'extraction**: {extract_time}")
|
|
md_content.append(f"- **Répertoire**: {ticket_dir}")
|
|
|
|
# Écrire le contenu dans le fichier de sortie
|
|
try:
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
f.write("\n".join(md_content))
|
|
print(f"Rapport Markdown créé : {output_file}")
|
|
# Appeler le script markdown_to_json.py
|
|
# Mise à jour du chemin pour utiliser le module formatters
|
|
from .markdown_to_json import markdown_to_json
|
|
result = markdown_to_json(output_file, json_output_file)
|
|
|
|
# Enrichir le fichier JSON avec les données de messages_raw.json
|
|
if result and raw_messages_data:
|
|
try:
|
|
# Lire le fichier JSON généré
|
|
with open(json_output_file, 'r', encoding='utf-8') as f:
|
|
rapport_json = json.load(f)
|
|
|
|
# Ajouter une référence vers les données brutes
|
|
rapport_json["messages_raw_reference"] = raw_messages_path
|
|
|
|
# Créer un lien explicite entre les ID des messages du rapport et ceux de messages_raw
|
|
if "messages" in rapport_json and "messages" in raw_messages_data:
|
|
for message in rapport_json["messages"]:
|
|
message_id = message.get("id")
|
|
if message_id:
|
|
# Chercher le message correspondant dans raw_messages
|
|
for raw_message in raw_messages_data["messages"]:
|
|
if raw_message.get("id") == message_id:
|
|
# Ajouter l'index du message brut correspondant
|
|
message["raw_message_index"] = raw_messages_data["messages"].index(raw_message)
|
|
break
|
|
|
|
# Ajouter une date d'extraction
|
|
rapport_json["date_d'extraction"] = extract_time
|
|
rapport_json["répertoire"] = ticket_dir
|
|
|
|
# Sauvegarder le fichier JSON enrichi
|
|
with open(json_output_file, 'w', encoding='utf-8') as f:
|
|
json.dump(rapport_json, f, indent=4, ensure_ascii=False)
|
|
print(f"Fichier JSON enrichi créé : {json_output_file}")
|
|
except Exception as e:
|
|
print(f"Erreur lors de l'enrichissement du fichier JSON: {e}")
|
|
else:
|
|
print(f"Fichier JSON créé : {json_output_file}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Erreur lors de l'écriture du fichier Markdown: {e}")
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Convertir les fichiers JSON de tickets en Markdown")
|
|
parser.add_argument("--ticket_code", "-t", help="Code du ticket à convertir (ex: T11067)")
|
|
parser.add_argument("--date_dir", "-d", help="Dossier spécifique par date, optionnel (ex: 20250403_155134)")
|
|
parser.add_argument("--input_dir", "-i", default="output", help="Dossier racine contenant les tickets")
|
|
parser.add_argument("--output_name", "-o", default="rapport.md", help="Nom du fichier Markdown à générer")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.ticket_code:
|
|
ticket_dir = os.path.join(args.input_dir, args.ticket_code)
|
|
|
|
# Si un dossier de date spécifique est fourni, l'ajouter au chemin
|
|
if args.date_dir:
|
|
ticket_dir = os.path.join(ticket_dir, args.date_dir)
|
|
|
|
# Chercher le fichier all_messages.json
|
|
json_file = os.path.join(ticket_dir, "all_messages.json")
|
|
|
|
if os.path.exists(json_file):
|
|
output_file = os.path.join(ticket_dir, args.output_name)
|
|
success = create_markdown_from_json(json_file, output_file)
|
|
|
|
if success:
|
|
print(f"Conversion réussie : {output_file}")
|
|
else:
|
|
print("Échec de la conversion")
|
|
else:
|
|
print(f"Fichier {json_file} introuvable")
|
|
else:
|
|
print("Veuillez spécifier un code de ticket avec l'option -t") |