odoo_toolkit/data_filter.py
2025-03-20 10:31:00 +01:00

261 lines
9.9 KiB
Python

import os
import json
import re
from bs4 import BeautifulSoup
import html
import shutil
from typing import Dict, List, Any, Optional, Tuple, Union
def clean_html(html_content: str) -> str:
"""
Nettoie le contenu HTML en supprimant toutes les balises mais en préservant le texte.
Args:
html_content: Contenu HTML à nettoyer
Returns:
Texte nettoyé sans balises HTML
"""
if not html_content:
return ""
# Utiliser BeautifulSoup pour extraire le texte
soup = BeautifulSoup(html_content, 'html.parser')
# Récupérer le texte sans balises HTML
text = soup.get_text(separator=' ', strip=True)
# Décodage des entités HTML spéciales
text = html.unescape(text)
# Nettoyer les espaces multiples
text = re.sub(r'\s+', ' ', text)
# Nettoyer les lignes vides multiples
text = re.sub(r'\n\s*\n', '\n\n', text)
return text.strip()
def process_message_file(message_file_path: str, output_dir: str) -> None:
"""
Traite un fichier de message en nettoyant le contenu HTML des messages.
Args:
message_file_path: Chemin du fichier de message à traiter
output_dir: Répertoire de sortie pour le fichier traité
"""
try:
with open(message_file_path, 'r', encoding='utf-8') as f:
message_data = json.load(f)
# Vérifier si le message contient un corps HTML
if 'body' in message_data and message_data['body']:
# Remplacer le contenu HTML par le texte filtré
message_data['body'] = clean_html(message_data['body'])
# Écrire le message filtré
output_file_path = os.path.join(output_dir, os.path.basename(message_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(message_data, f, indent=4, ensure_ascii=False)
print(f"Message traité: {os.path.basename(message_file_path)}")
except Exception as e:
print(f"Erreur lors du traitement du fichier {message_file_path}: {e}")
def process_messages_threads(threads_file_path: str, output_dir: str) -> None:
"""
Traite un fichier de threads de messages en nettoyant le contenu HTML.
Args:
threads_file_path: Chemin du fichier de threads de messages
output_dir: Répertoire de sortie pour le fichier traité
"""
try:
with open(threads_file_path, 'r', encoding='utf-8') as f:
threads_data = json.load(f)
# Parcourir tous les threads
for thread_id, thread in threads_data.items():
# Traiter le message principal
if thread.get('main_message') and 'body' in thread['main_message']:
thread['main_message']['body'] = clean_html(thread['main_message']['body'])
# Traiter les réponses
for i, reply in enumerate(thread.get('replies', [])):
if 'body' in reply:
thread['replies'][i]['body'] = clean_html(reply['body'])
# Écrire le fichier de threads filtré
output_file_path = os.path.join(output_dir, os.path.basename(threads_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(threads_data, f, indent=4, ensure_ascii=False)
print(f"Fichier de threads traité: {os.path.basename(threads_file_path)}")
except Exception as e:
print(f"Erreur lors du traitement du fichier {threads_file_path}: {e}")
def process_messages_collection(messages_file_path: str, output_dir: str) -> None:
"""
Traite un fichier de collection de messages en nettoyant le contenu HTML.
Args:
messages_file_path: Chemin du fichier de collection de messages
output_dir: Répertoire de sortie pour le fichier traité
"""
try:
with open(messages_file_path, 'r', encoding='utf-8') as f:
messages_data = json.load(f)
# Parcourir tous les messages
for i, message in enumerate(messages_data):
if 'body' in message:
messages_data[i]['body'] = clean_html(message['body'])
# Écrire le fichier de messages filtré
output_file_path = os.path.join(output_dir, os.path.basename(messages_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(messages_data, f, indent=4, ensure_ascii=False)
print(f"Collection de messages traitée: {os.path.basename(messages_file_path)}")
except Exception as e:
print(f"Erreur lors du traitement du fichier {messages_file_path}: {e}")
def process_ticket_folder(ticket_folder: str, output_base_dir: str) -> None:
"""
Traite un dossier de ticket en filtrant les messages HTML.
Args:
ticket_folder: Chemin du dossier du ticket à traiter
output_base_dir: Répertoire de base pour la sortie des fichiers filtrés
"""
ticket_name = os.path.basename(ticket_folder)
output_ticket_dir = os.path.join(output_base_dir, ticket_name)
# Créer le répertoire de sortie s'il n'existe pas
if not os.path.exists(output_ticket_dir):
os.makedirs(output_ticket_dir, exist_ok=True)
# Copier les fichiers d'information du ticket
for file_name in ['ticket_info.json', 'contact_info.json', 'activities.json', 'followers.json', 'attachments_info.json', 'timesheets.json']:
src_file = os.path.join(ticket_folder, file_name)
if os.path.exists(src_file):
dst_file = os.path.join(output_ticket_dir, file_name)
shutil.copy2(src_file, dst_file)
print(f"Fichier copié: {file_name}")
# Traitement des fichiers de messages
src_messages_dir = os.path.join(ticket_folder, 'messages')
if os.path.exists(src_messages_dir):
# Créer le répertoire de messages filtré
filtered_messages_dir = os.path.join(output_ticket_dir, 'messages')
if not os.path.exists(filtered_messages_dir):
os.makedirs(filtered_messages_dir, exist_ok=True)
# Traiter chaque fichier de message individuel
for file_name in os.listdir(src_messages_dir):
if file_name.endswith('.json'):
message_file_path = os.path.join(src_messages_dir, file_name)
process_message_file(message_file_path, filtered_messages_dir)
# Traitement des fichiers de messages regroupés
messages_file = os.path.join(ticket_folder, 'messages.json')
message_threads_file = os.path.join(ticket_folder, 'message_threads.json')
if os.path.exists(messages_file):
process_messages_collection(messages_file, output_ticket_dir)
if os.path.exists(message_threads_file):
process_messages_threads(message_threads_file, output_ticket_dir)
# Copier le répertoire des pièces jointes
src_attachments_dir = os.path.join(ticket_folder, 'attachments')
if os.path.exists(src_attachments_dir):
dst_attachments_dir = os.path.join(output_ticket_dir, 'attachments')
if os.path.exists(dst_attachments_dir):
shutil.rmtree(dst_attachments_dir)
shutil.copytree(src_attachments_dir, dst_attachments_dir)
print(f"Répertoire des pièces jointes copié")
print(f"Traitement du ticket {ticket_name} terminé")
def filter_exported_tickets(source_dir: str = 'exported_tickets', output_dir: str = 'filtered_tickets') -> None:
"""
Filtre les tickets exportés en nettoyant les messages HTML.
Args:
source_dir: Répertoire source contenant les tickets exportés
output_dir: Répertoire de sortie pour les tickets filtrés
"""
if not os.path.exists(source_dir):
print(f"Le répertoire source {source_dir} n'existe pas.")
return
# Créer le répertoire de sortie s'il n'existe pas
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
# Parcourir tous les éléments du répertoire source
for item in os.listdir(source_dir):
item_path = os.path.join(source_dir, item)
# Vérifier si c'est un dossier qui contient un ticket
if os.path.isdir(item_path) and (
item.startswith('ticket_') or
os.path.exists(os.path.join(item_path, 'ticket_info.json'))
):
process_ticket_folder(item_path, output_dir)
# Si c'est un fichier JSON brut de ticket, le copier simplement
elif os.path.isfile(item_path) and item.endswith('_raw.json'):
shutil.copy2(item_path, os.path.join(output_dir, item))
print(f"Fichier copié: {item}")
print(f"Filtrage des tickets terminé. Les tickets filtrés sont disponibles dans {output_dir}")
def run_filter_wizard() -> None:
"""
Interface utilisateur en ligne de commande pour filtrer les tickets exportés.
"""
print("\n==== FILTRAGE DES MESSAGES DES TICKETS ====")
# Demander le répertoire source
default_source = 'exported_tickets'
source_dir = input(f"Répertoire source (par défaut: {default_source}): ").strip()
if not source_dir:
source_dir = default_source
# Vérifier si le répertoire source existe
if not os.path.exists(source_dir):
print(f"Le répertoire source {source_dir} n'existe pas.")
return
# Demander le répertoire de sortie
default_output = 'filtered_tickets'
output_dir = input(f"Répertoire de sortie (par défaut: {default_output}): ").strip()
if not output_dir:
output_dir = default_output
# Confirmation
print(f"\nLes tickets du répertoire {source_dir} seront filtrés et placés dans {output_dir}.")
confirm = input("Continuer ? (o/n): ").strip().lower()
if confirm == 'o':
filter_exported_tickets(source_dir, output_dir)
print("\nOpération terminée avec succès!")
else:
print("Opération annulée.")
if __name__ == "__main__":
run_filter_wizard()