odoo_toolkit/data_filter.py
2025-03-21 16:27:36 +01:00

544 lines
22 KiB
Python

import os
import json
import re
from bs4 import BeautifulSoup
import html
import shutil
from typing import Dict, List, Any, Optional, Tuple, Union, Set
def is_odoobot_author(message: Dict[str, Any]) -> bool:
"""
Vérifie si l'auteur du message est OdooBot ou un autre système.
Args:
message: Le message à vérifier
Returns:
True si le message provient d'OdooBot, False sinon
"""
# Vérifier le nom de l'auteur
if 'author_id' in message and isinstance(message['author_id'], list) and len(message['author_id']) > 1:
author_name = message['author_id'][1].lower()
if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name:
return True
# Vérifier le type de message (souvent les notifications système)
if message.get('message_type') == 'notification':
return True
# Vérifier le sous-type du message
if 'subtype_id' in message and isinstance(message['subtype_id'], list) and len(message['subtype_id']) > 1:
subtype = message['subtype_id'][1].lower()
if 'notification' in subtype or 'system' in subtype:
return True
# Vérifier le contenu du message
if 'body' in message and isinstance(message['body'], str):
body = message['body'].lower()
system_patterns = [
'assigné à', 'étape changée', 'créé automatiquement',
'assigned to', 'stage changed', 'automatically created',
'updated', 'mis à jour', 'a modifié', 'changed'
]
for pattern in system_patterns:
if pattern in body:
return True
return False
def is_important_image(tag, message_text: str) -> bool:
"""
Détermine si une image est importante ou s'il s'agit d'un logo/signature.
Args:
tag: La balise d'image à analyser
message_text: Le texte complet du message pour contexte
Returns:
True si l'image semble importante, False sinon
"""
# Vérifier les attributs de l'image
src = tag.get('src', '')
alt = tag.get('alt', '')
title = tag.get('title', '')
css_class = tag.get('class', '')
# Patterns pour les images inutiles
useless_img_patterns = [
'logo', 'signature', 'outlook', 'footer', 'header', 'icon',
'emoticon', 'emoji', 'cid:', 'pixel', 'spacer', 'vignette',
'banner', 'separator', 'decoration', 'mail_signature'
]
# Vérifier si c'est une image inutile
for pattern in useless_img_patterns:
if (pattern in src.lower() or
pattern in alt.lower() or
pattern in title.lower() or
(css_class and any(pattern in c.lower() for c in css_class if isinstance(c, str)))):
return False
# Vérifier la taille (les petites images sont souvent des icônes/logos)
width = tag.get('width', '')
height = tag.get('height', '')
try:
width = int(width) if width and str(width).isdigit() else None
height = int(height) if height and str(height).isdigit() else None
if width and height and width <= 50 and height <= 50:
return False
except (ValueError, TypeError):
pass
# Vérifier si l'image est mentionnée dans le texte
image_indicators = [
'capture', 'screenshot', 'image', 'photo', 'illustration',
'voir', 'regarder', 'ci-joint', 'écran', 'erreur', 'problème',
'bug', 'pièce jointe', 'attachment', 'veuillez trouver'
]
for indicator in image_indicators:
if indicator in message_text.lower():
return True
# Par défaut, considérer les images qui ne sont pas clairement inutiles comme potentiellement importantes
return True
def find_attachment_references(message_text: str, attachments_info: List[Dict[str, Any]]) -> List[int]:
"""
Identifie les pièces jointes mentionnées dans le message.
Args:
message_text: Texte du message
attachments_info: Informations sur les pièces jointes disponibles
Returns:
Liste des IDs des pièces jointes pertinentes
"""
if not message_text or not attachments_info:
return []
# Patterns indiquant des pièces jointes
attachment_indicators = [
r'pi[èe]ce[s]? jointe[s]?', r'attachment[s]?', r'fichier[s]?', r'file[s]?',
r'veuillez trouver', r'please find', r'voir ci-joint', r'voir ci-dessous',
r'ci-joint', r'joint[e]?[s]?', r'attached', r'screenshot[s]?',
r'capture[s]? d[\'e] ?[ée]cran', r'image[s]?', r'photo[s]?'
]
relevant_ids = []
# Vérifier si le message mentionne des pièces jointes
mention_found = False
for pattern in attachment_indicators:
if re.search(pattern, message_text, re.IGNORECASE):
mention_found = True
break
if mention_found:
# Identifier les pièces jointes pertinentes (non logos/images d'interface)
for attachment in attachments_info:
name = attachment.get('name', '').lower() if attachment.get('name') else ''
# Exclure les pièces jointes qui semblent être des logos ou images d'interface
useless_patterns = ['logo', 'signature', 'outlook', 'icon', 'emoticon', 'emoji']
is_useless = any(pattern in name for pattern in useless_patterns)
if not is_useless and 'id' in attachment:
relevant_ids.append(attachment['id'])
return relevant_ids
def clean_html(html_content: str) -> str:
"""
Nettoie le contenu HTML en supprimant toutes les balises mais en préservant le texte.
Traite spécifiquement les images pour garder uniquement celles pertinentes.
Args:
html_content: Contenu HTML à nettoyer
Returns:
Texte nettoyé sans balises HTML
"""
if not html_content:
return ""
# Utiliser BeautifulSoup pour manipuler le HTML
soup = BeautifulSoup(html_content, 'html.parser')
# Supprimer les éléments de signature
signature_elements = [
'div.signature', '.gmail_signature', '.signature',
'hr + div', 'hr + p', '.footer', '.mail-signature'
]
for selector in signature_elements:
for element in soup.select(selector):
element.decompose()
# Supprimer les lignes horizontales (souvent utilisées pour séparer les signatures)
for hr in soup.find_all('hr'):
hr.decompose()
# Récupérer le texte complet pour analyse
full_text = soup.get_text(' ', strip=True)
# Traiter les images
for img in soup.find_all('img'):
if is_important_image(img, full_text):
# Remplacer les images importantes par une description
alt_text = img.get('alt', '') or img.get('title', '') or '[Image importante]'
img.replace_with(f" [Image: {alt_text}] ")
else:
# Supprimer les images non pertinentes
img.decompose()
# Traiter les liens vers des pièces jointes
for a in soup.find_all('a', href=True):
href = a.get('href', '').lower()
if 'attachment' in href or 'download' in href or 'file' in href:
a.replace_with(f" [Pièce jointe: {a.get_text()}] ")
# Récupérer le texte sans balises HTML
text = soup.get_text(separator=' ', strip=True)
# Décodage des entités HTML spéciales
text = html.unescape(text)
# Nettoyer les espaces multiples
text = re.sub(r'\s+', ' ', text)
# Nettoyer les lignes vides multiples
text = re.sub(r'\n\s*\n', '\n\n', text)
# Supprimer les disclaimers et signatures standards
footer_patterns = [
r'Sent from my .*',
r'Envoyé depuis mon .*',
r'Ce message .*confidentiel.*',
r'This email .*confidential.*',
r'DISCLAIMER.*',
r'CONFIDENTIAL.*',
r'CONFIDENTIEL.*',
r'Le contenu de ce courriel est confidentiel.*',
r'This message and any attachments.*',
r'Ce message et ses pièces jointes.*',
r'AVIS DE CONFIDENTIALITÉ.*',
r'PRIVACY NOTICE.*'
]
for pattern in footer_patterns:
text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
return text.strip()
def process_message_file(message_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
"""
Traite un fichier de message en nettoyant le contenu HTML des messages.
Args:
message_file_path: Chemin du fichier de message à traiter
output_dir: Répertoire de sortie pour le fichier traité
attachments_info: Informations sur les pièces jointes disponibles
"""
try:
with open(message_file_path, 'r', encoding='utf-8') as f:
message_data = json.load(f)
# Ignorer les messages d'OdooBot
if is_odoobot_author(message_data):
print(f"Message ignoré (OdooBot): {os.path.basename(message_file_path)}")
return
# Vérifier si le message contient un corps HTML
if 'body' in message_data and message_data['body']:
# Remplacer le contenu HTML par le texte filtré
message_data['body'] = clean_html(message_data['body'])
# Identifier les pièces jointes pertinentes mentionnées dans le message
if attachments_info and message_data['body']:
relevant_attachments = find_attachment_references(message_data['body'], attachments_info)
if relevant_attachments:
message_data['relevant_attachment_ids'] = relevant_attachments
# Écrire le message filtré
output_file_path = os.path.join(output_dir, os.path.basename(message_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(message_data, f, indent=4, ensure_ascii=False)
print(f"Message traité: {os.path.basename(message_file_path)}")
except Exception as e:
print(f"Erreur lors du traitement du fichier {message_file_path}: {e}")
def process_messages_threads(threads_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
"""
Traite un fichier de threads de messages en nettoyant le contenu HTML.
Args:
threads_file_path: Chemin du fichier de threads de messages
output_dir: Répertoire de sortie pour le fichier traité
attachments_info: Informations sur les pièces jointes disponibles
"""
try:
with open(threads_file_path, 'r', encoding='utf-8') as f:
threads_data = json.load(f)
# Liste des threads à supprimer (ceux qui ne contiennent que des messages d'OdooBot)
threads_to_remove = []
# Parcourir tous les threads
for thread_id, thread in threads_data.items():
# Traiter le message principal
main_message_is_bot = False
if thread.get('main_message'):
if is_odoobot_author(thread['main_message']):
main_message_is_bot = True
thread['main_message'] = None
elif 'body' in thread['main_message']:
thread['main_message']['body'] = clean_html(thread['main_message']['body'])
# Identifier les pièces jointes pertinentes
if attachments_info and thread['main_message']['body']:
relevant_attachments = find_attachment_references(
thread['main_message']['body'], attachments_info
)
if relevant_attachments:
thread['main_message']['relevant_attachment_ids'] = relevant_attachments
# Traiter les réponses (filtrer les messages d'OdooBot)
filtered_replies = []
for reply in thread.get('replies', []):
if not is_odoobot_author(reply):
if 'body' in reply:
reply['body'] = clean_html(reply['body'])
# Identifier les pièces jointes pertinentes
if attachments_info and reply['body']:
relevant_attachments = find_attachment_references(reply['body'], attachments_info)
if relevant_attachments:
reply['relevant_attachment_ids'] = relevant_attachments
filtered_replies.append(reply)
# Mettre à jour les réponses
thread['replies'] = filtered_replies
# Si le thread ne contient que des messages de bot, le marquer pour suppression
if main_message_is_bot and not filtered_replies:
threads_to_remove.append(thread_id)
# Supprimer les threads qui ne contiennent que des messages d'OdooBot
for thread_id in threads_to_remove:
del threads_data[thread_id]
# Écrire le fichier de threads filtré
output_file_path = os.path.join(output_dir, os.path.basename(threads_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(threads_data, f, indent=4, ensure_ascii=False)
print(f"Fichier de threads traité: {os.path.basename(threads_file_path)}")
if threads_to_remove:
print(f" {len(threads_to_remove)} threads supprimés (OdooBot uniquement)")
except Exception as e:
print(f"Erreur lors du traitement du fichier {threads_file_path}: {e}")
def process_messages_collection(messages_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
"""
Traite un fichier de collection de messages en nettoyant le contenu HTML.
Args:
messages_file_path: Chemin du fichier de collection de messages
output_dir: Répertoire de sortie pour le fichier traité
attachments_info: Informations sur les pièces jointes disponibles
"""
try:
with open(messages_file_path, 'r', encoding='utf-8') as f:
messages_data = json.load(f)
# Filtrer les messages pour supprimer ceux d'OdooBot
filtered_messages = []
for message in messages_data:
if not is_odoobot_author(message):
# Nettoyer le contenu HTML
if 'body' in message:
message['body'] = clean_html(message['body'])
# Identifier les pièces jointes pertinentes
if attachments_info and message['body']:
relevant_attachments = find_attachment_references(message['body'], attachments_info)
if relevant_attachments:
message['relevant_attachment_ids'] = relevant_attachments
filtered_messages.append(message)
# Écrire le fichier de messages filtré
output_file_path = os.path.join(output_dir, os.path.basename(messages_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(filtered_messages, f, indent=4, ensure_ascii=False)
print(f"Collection de messages traitée: {os.path.basename(messages_file_path)}")
print(f" {len(messages_data) - len(filtered_messages)} messages supprimés (OdooBot)")
except Exception as e:
print(f"Erreur lors du traitement du fichier {messages_file_path}: {e}")
def process_ticket_folder(ticket_folder: str, output_base_dir: str) -> None:
"""
Traite un dossier de ticket en filtrant les messages HTML.
Args:
ticket_folder: Chemin du dossier du ticket à traiter
output_base_dir: Répertoire de base pour la sortie des fichiers filtrés
"""
ticket_name = os.path.basename(ticket_folder)
output_ticket_dir = os.path.join(output_base_dir, ticket_name)
# Créer le répertoire de sortie s'il n'existe pas
if not os.path.exists(output_ticket_dir):
os.makedirs(output_ticket_dir, exist_ok=True)
# Copier les fichiers d'information du ticket
for file_name in ['ticket_info.json', 'contact_info.json', 'activities.json', 'followers.json', 'timesheets.json']:
src_file = os.path.join(ticket_folder, file_name)
if os.path.exists(src_file):
dst_file = os.path.join(output_ticket_dir, file_name)
shutil.copy2(src_file, dst_file)
print(f"Fichier copié: {file_name}")
# Charger les informations sur les pièces jointes
attachments_info = []
attachments_info_file = os.path.join(ticket_folder, 'attachments_info.json')
if os.path.exists(attachments_info_file):
try:
with open(attachments_info_file, 'r', encoding='utf-8') as f:
attachments_info = json.load(f)
except Exception as e:
print(f"Erreur lors du chargement des informations sur les pièces jointes: {e}")
# Copier le fichier d'informations sur les pièces jointes
dst_file = os.path.join(output_ticket_dir, 'attachments_info.json')
shutil.copy2(attachments_info_file, dst_file)
# Traitement des fichiers de messages
src_messages_dir = os.path.join(ticket_folder, 'messages')
if os.path.exists(src_messages_dir):
# Créer le répertoire de messages filtré
filtered_messages_dir = os.path.join(output_ticket_dir, 'messages')
if not os.path.exists(filtered_messages_dir):
os.makedirs(filtered_messages_dir, exist_ok=True)
# Traiter chaque fichier de message individuel
for file_name in os.listdir(src_messages_dir):
if file_name.endswith('.json'):
message_file_path = os.path.join(src_messages_dir, file_name)
process_message_file(message_file_path, filtered_messages_dir, attachments_info)
# Traitement des fichiers de messages regroupés
messages_file = os.path.join(ticket_folder, 'messages.json')
message_threads_file = os.path.join(ticket_folder, 'message_threads.json')
if os.path.exists(messages_file):
process_messages_collection(messages_file, output_ticket_dir, attachments_info)
if os.path.exists(message_threads_file):
process_messages_threads(message_threads_file, output_ticket_dir, attachments_info)
# Copier le répertoire des pièces jointes (on garde toutes les pièces jointes)
src_attachments_dir = os.path.join(ticket_folder, 'attachments')
if os.path.exists(src_attachments_dir):
dst_attachments_dir = os.path.join(output_ticket_dir, 'attachments')
if os.path.exists(dst_attachments_dir):
shutil.rmtree(dst_attachments_dir)
shutil.copytree(src_attachments_dir, dst_attachments_dir)
print(f"Répertoire des pièces jointes copié")
print(f"Traitement du ticket {ticket_name} terminé")
def filter_exported_tickets(source_dir: str = 'exported_tickets', output_dir: str = 'filtered_tickets') -> None:
"""
Filtre les tickets exportés en nettoyant les messages HTML.
Args:
source_dir: Répertoire source contenant les tickets exportés
output_dir: Répertoire de sortie pour les tickets filtrés
"""
if not os.path.exists(source_dir):
print(f"Le répertoire source {source_dir} n'existe pas.")
return
# Créer le répertoire de sortie s'il n'existe pas
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
# Parcourir tous les éléments du répertoire source
for item in os.listdir(source_dir):
item_path = os.path.join(source_dir, item)
# Vérifier si c'est un dossier qui contient un ticket
if os.path.isdir(item_path) and (
item.startswith('ticket_') or
os.path.exists(os.path.join(item_path, 'ticket_info.json'))
):
process_ticket_folder(item_path, output_dir)
# Si c'est un fichier JSON brut de ticket, le copier simplement
elif os.path.isfile(item_path) and item.endswith('_raw.json'):
shutil.copy2(item_path, os.path.join(output_dir, item))
print(f"Fichier copié: {item}")
print(f"Filtrage des tickets terminé. Les tickets filtrés sont disponibles dans {output_dir}")
def run_filter_wizard() -> None:
"""
Interface utilisateur en ligne de commande pour filtrer les tickets exportés.
"""
print("\n==== FILTRAGE DES MESSAGES DES TICKETS ====")
print("Cette fonction va:")
print("1. Supprimer les messages provenant d'OdooBot")
print("2. Filtrer les images inutiles (logos, signatures, images Outlook)")
print("3. Conserver les images pertinentes pour la demande")
print("4. Identifier les pièces jointes importantes mentionnées dans les messages\n")
# Demander le répertoire source
default_source = 'exported_tickets'
source_dir = input(f"Répertoire source (par défaut: {default_source}): ").strip()
if not source_dir:
source_dir = default_source
# Vérifier si le répertoire source existe
if not os.path.exists(source_dir):
print(f"Le répertoire source {source_dir} n'existe pas.")
return
# Demander le répertoire de sortie
default_output = 'filtered_tickets'
output_dir = input(f"Répertoire de sortie (par défaut: {default_output}): ").strip()
if not output_dir:
output_dir = default_output
# Confirmation
print(f"\nLes tickets du répertoire {source_dir} seront filtrés et placés dans {output_dir}.")
confirm = input("Continuer ? (o/n): ").strip().lower()
if confirm == 'o':
filter_exported_tickets(source_dir, output_dir)
print("\nOpération terminée avec succès!")
else:
print("Opération annulée.")
if __name__ == "__main__":
run_filter_wizard()