odoo_toolkit/data_filter.py
2025-03-20 15:38:12 +01:00

593 lines
24 KiB
Python

import os
import json
import re
from bs4 import BeautifulSoup
import html
import shutil
from typing import Dict, List, Any, Optional, Tuple, Union, Set
def is_odoobot_message(message: Dict[str, Any]) -> bool:
"""
Détecte si un message provient d'OdooBot ou d'un bot système.
Args:
message: Dictionnaire du message à analyser
Returns:
True si le message est d'OdooBot, False sinon
"""
if not message:
return False
# Vérifier par le nom de l'auteur
author_name = ""
if message.get('author_id') and isinstance(message.get('author_id'), list) and len(message.get('author_id')) > 1:
author_name = message.get('author_id')[1].lower()
elif message.get('author_details', {}).get('name'):
author_name = message.get('author_details', {}).get('name', '').lower()
if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name or 'system' in author_name:
return True
# Vérifier par le contenu du message (messages système typiques)
body = message.get('body', '').lower()
if body and isinstance(body, str):
system_patterns = [
r'assigné à',
r'assigned to',
r'étape changée',
r'stage changed',
r'créé automatiquement',
r'automatically created',
r'a modifié la date limite',
r'changed the deadline',
r'a ajouté une pièce jointe',
r'added an attachment'
]
for pattern in system_patterns:
if re.search(pattern, body, re.IGNORECASE):
return True
# Vérifier par le type de message/sous-type
if message.get('message_type') == 'notification':
return True
subtype_name = ""
if message.get('subtype_id') and isinstance(message.get('subtype_id'), list) and len(message.get('subtype_id')) > 1:
subtype_name = message.get('subtype_id')[1].lower()
elif message.get('subtype_details') and isinstance(message.get('subtype_details'), list) and len(message.get('subtype_details')) > 0:
subtype_name = message.get('subtype_details')[0].get('name', '').lower()
if subtype_name and ('notification' in subtype_name or 'system' in subtype_name):
return True
return False
def is_important_image(img_tag: Any, message_text: str) -> bool:
"""
Détermine si une image est importante ou s'il s'agit d'une image inutile (logo, signature, etc.).
Args:
img_tag: Balise d'image BeautifulSoup
message_text: Texte du message complet pour contexte
Returns:
True si l'image semble importante, False sinon
"""
# Vérifier les attributs de l'image
img_src = img_tag.get('src', '')
img_alt = img_tag.get('alt', '')
img_class = img_tag.get('class', '')
img_style = img_tag.get('style', '')
# Mots-clés indiquant des images inutiles
useless_patterns = [
'logo', 'signature', 'footer', 'header', 'separator', 'separateur',
'outlook', 'mail_signature', 'icon', 'emoticon', 'emoji', 'cid:',
'pixel', 'spacer', 'vignette', 'footer', 'banner', 'banniere'
]
# Vérifier le src/alt/class pour les motifs inutiles
for pattern in useless_patterns:
if (pattern in img_src.lower() or
pattern in img_alt.lower() or
(isinstance(img_class, list) and any(pattern in c.lower() for c in img_class)) or
(isinstance(img_class, str) and pattern in img_class.lower()) or
pattern in img_style.lower()):
return False
# Vérifier les dimensions (logos et icônes sont souvent petits)
width = img_tag.get('width', '')
height = img_tag.get('height', '')
# Convertir en entiers si possible
try:
width = int(width) if width and width.isdigit() else None
height = int(height) if height and height.isdigit() else None
except (ValueError, TypeError):
# Extraire les dimensions des attributs style si disponibles
if img_style:
width_match = re.search(r'width:[ ]*(\d+)', img_style)
height_match = re.search(r'height:[ ]*(\d+)', img_style)
width = int(width_match.group(1)) if width_match else None
height = int(height_match.group(1)) if height_match else None
# Images très petites sont souvent des éléments décoratifs
if width is not None and height is not None:
if width <= 50 and height <= 50: # Taille arbitraire pour les petites images
return False
# Rechercher des termes qui indiquent l'importance de l'image dans le texte du message
importance_indicators = [
'capture', 'screenshot', 'image', 'photo', 'illustration',
'pièce jointe', 'attachment', 'voir', 'regarder', 'ci-joint',
'écran', 'erreur', 'problème', 'bug', 'issue'
]
for indicator in importance_indicators:
if indicator in message_text.lower():
return True
# Par défaut, considérer l'image comme importante si aucun des filtres ci-dessus ne s'applique
return True
def find_relevant_attachments(message_text: str, attachments_info: List[Dict[str, Any]]) -> List[int]:
"""
Trouve les pièces jointes pertinentes mentionnées dans le message.
Args:
message_text: Texte du message
attachments_info: Liste des informations sur les pièces jointes
Returns:
Liste des IDs des pièces jointes pertinentes
"""
relevant_ids = []
if not message_text or not attachments_info:
return relevant_ids
# Rechercher les mentions de pièces jointes dans le texte
attachment_indicators = [
r'pi(è|e)ce(s)? jointe(s)?', r'attachment(s)?', r'fichier(s)?', r'file(s)?',
r'voir (le|la|les) document(s)?', r'voir (le|la|les) fichier(s)?',
r'voir (le|la|les) image(s)?', r'voir (le|la|les) screenshot(s)?',
r'voir (le|la|les) capture(s)?', r'voir (le|la|les) photo(s)?',
r'voir ci-joint', r'voir ci-dessous', r'voir ci-après',
r'veuillez trouver', r'please find', r'in attachment',
r'joint(e)?(s)?', r'attached', r'screenshot(s)?', r'capture(s)? d(\'|e) (é|e)cran',
r'image(s)?', r'photo(s)?'
]
has_attachment_mention = False
for indicator in attachment_indicators:
if re.search(indicator, message_text, re.IGNORECASE):
has_attachment_mention = True
break
# Si le message mentionne des pièces jointes
if has_attachment_mention:
for attachment in attachments_info:
# Exclure les pièces jointes qui semblent être des signatures ou des logos
name = attachment.get('name', '').lower()
useless_patterns = ['logo', 'signature', 'outlook', 'footer', 'header', 'icon', 'emoticon', 'emoji']
is_useless = False
for pattern in useless_patterns:
if pattern in name:
is_useless = True
break
if not is_useless:
relevant_ids.append(attachment.get('id'))
return relevant_ids
def clean_html(html_content: str) -> str:
"""
Nettoie le contenu HTML en supprimant toutes les balises mais en préservant le texte.
Améliore le traitement des images, supprime les signatures et les éléments inutiles.
Args:
html_content: Contenu HTML à nettoyer
Returns:
Texte nettoyé sans balises HTML
"""
if not html_content:
return ""
# Utiliser BeautifulSoup pour manipuler le HTML
soup = BeautifulSoup(html_content, 'html.parser')
# Supprimer les signatures et pieds de courriels typiques
signature_selectors = [
'div.signature', '.gmail_signature', '.signature',
'hr + div', 'hr + p', '.footer', '.mail-signature',
'.ms-signature', '[data-smartmail="gmail_signature"]'
]
for selector in signature_selectors:
for element in soup.select(selector):
element.decompose()
# Supprimer les lignes horizontales qui séparent souvent les signatures
for hr in soup.find_all('hr'):
hr.decompose()
# Traiter les images
message_text = soup.get_text()
for img in soup.find_all('img'):
if is_important_image(img, message_text):
# Remplacer les images importantes par une description
alt_text = img.get('alt', '') or img.get('title', '') or '[Image importante]'
img.replace_with(f" [Image: {alt_text}] ")
else:
# Supprimer les images inutiles
img.decompose()
# Traiter les références aux pièces jointes
attachment_refs = soup.find_all('a', href=re.compile(r'attachment|piece|fichier|file', re.IGNORECASE))
for ref in attachment_refs:
ref.replace_with(f" [Pièce jointe: {ref.get_text()}] ")
# Filtrer les éléments vides ou non significatifs
for tag in soup.find_all(['span', 'div', 'p']):
if not tag.get_text(strip=True):
tag.decompose()
# Récupérer le texte sans balises HTML
text = soup.get_text(separator=' ', strip=True)
# Décodage des entités HTML spéciales
text = html.unescape(text)
# Nettoyer les espaces multiples
text = re.sub(r'\s+', ' ', text)
# Nettoyer les lignes vides multiples
text = re.sub(r'\n\s*\n', '\n\n', text)
# Supprimer les footers typiques des emails
footer_patterns = [
r'Sent from my .*',
r'Envoyé depuis mon .*',
r'Ce message .*confidentiel.*',
r'This email .*confidential.*',
r'DISCLAIMER.*',
r'CONFIDENTIAL.*',
r'CONFIDENTIEL.*',
r'Le contenu de ce courriel est confidentiel.*',
r'This message and any attachments.*',
r'Ce message et ses pièces jointes.*',
r'AVIS DE CONFIDENTIALITÉ.*',
r'PRIVACY NOTICE.*',
r'Droit à la déconnexion.*',
r'L\'objectif du Support Technique.*',
r'\\*\\*\\*\\*\\*\\* ATTENTION \\*\\*\\*\\*\\*\\*.*',
r'Please consider the environment.*',
r'Pensez à l\'environnement.*'
]
for pattern in footer_patterns:
text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
return text.strip()
def process_message_file(message_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
"""
Traite un fichier de message en nettoyant le contenu HTML des messages.
Args:
message_file_path: Chemin du fichier de message à traiter
output_dir: Répertoire de sortie pour le fichier traité
attachments_info: Informations sur les pièces jointes (optionnel)
"""
try:
with open(message_file_path, 'r', encoding='utf-8') as f:
message_data = json.load(f)
# Ignorer les messages d'OdooBot
if is_odoobot_message(message_data):
print(f"Message ignoré (OdooBot): {os.path.basename(message_file_path)}")
return
# Vérifier si le message contient un corps HTML
if 'body' in message_data and message_data['body']:
# Remplacer le contenu HTML par le texte filtré
message_data['body'] = clean_html(message_data['body'])
# Identifier les pièces jointes pertinentes si disponibles
if attachments_info and message_data['body']:
relevant_attachments = find_relevant_attachments(message_data['body'], attachments_info)
if relevant_attachments:
message_data['relevant_attachment_ids'] = relevant_attachments
# Écrire le message filtré
output_file_path = os.path.join(output_dir, os.path.basename(message_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(message_data, f, indent=4, ensure_ascii=False)
print(f"Message traité: {os.path.basename(message_file_path)}")
except Exception as e:
print(f"Erreur lors du traitement du fichier {message_file_path}: {e}")
def process_messages_threads(threads_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
"""
Traite un fichier de threads de messages en nettoyant le contenu HTML.
Args:
threads_file_path: Chemin du fichier de threads de messages
output_dir: Répertoire de sortie pour le fichier traité
attachments_info: Informations sur les pièces jointes (optionnel)
"""
try:
with open(threads_file_path, 'r', encoding='utf-8') as f:
threads_data = json.load(f)
# Stocker les IDs des threads à supprimer (qui ne contiennent que des messages d'OdooBot)
threads_to_remove = []
# Parcourir tous les threads
for thread_id, thread in threads_data.items():
# Vérifier si le message principal existe et n'est pas d'OdooBot
main_message_is_bot = False
if thread.get('main_message'):
if is_odoobot_message(thread['main_message']):
main_message_is_bot = True
# Si c'est un message d'OdooBot, on le supprime
thread['main_message'] = None
elif 'body' in thread['main_message']:
# Sinon, on nettoie le corps du message
thread['main_message']['body'] = clean_html(thread['main_message']['body'])
# Identifier les pièces jointes pertinentes
if attachments_info and thread['main_message']['body']:
relevant_attachments = find_relevant_attachments(
thread['main_message']['body'], attachments_info
)
if relevant_attachments:
thread['main_message']['relevant_attachment_ids'] = relevant_attachments
# Filtrer les réponses pour supprimer celles d'OdooBot
filtered_replies = []
for reply in thread.get('replies', []):
if not is_odoobot_message(reply):
# Nettoyer le corps du message
if 'body' in reply:
reply['body'] = clean_html(reply['body'])
# Identifier les pièces jointes pertinentes
if attachments_info and reply['body']:
relevant_attachments = find_relevant_attachments(reply['body'], attachments_info)
if relevant_attachments:
reply['relevant_attachment_ids'] = relevant_attachments
filtered_replies.append(reply)
# Mettre à jour les réponses
thread['replies'] = filtered_replies
# Si le thread ne contient plus de messages (tous étaient des messages d'OdooBot),
# marquer pour suppression
if main_message_is_bot and not filtered_replies:
threads_to_remove.append(thread_id)
# Supprimer les threads qui ne contiennent que des messages d'OdooBot
for thread_id in threads_to_remove:
del threads_data[thread_id]
# Écrire le fichier de threads filtré
output_file_path = os.path.join(output_dir, os.path.basename(threads_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(threads_data, f, indent=4, ensure_ascii=False)
print(f"Fichier de threads traité: {os.path.basename(threads_file_path)}")
if threads_to_remove:
print(f" {len(threads_to_remove)} threads supprimés (messages d'OdooBot uniquement)")
except Exception as e:
print(f"Erreur lors du traitement du fichier {threads_file_path}: {e}")
def process_messages_collection(messages_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
"""
Traite un fichier de collection de messages en nettoyant le contenu HTML.
Args:
messages_file_path: Chemin du fichier de collection de messages
output_dir: Répertoire de sortie pour le fichier traité
attachments_info: Informations sur les pièces jointes (optionnel)
"""
try:
with open(messages_file_path, 'r', encoding='utf-8') as f:
messages_data = json.load(f)
# Filtrer les messages pour supprimer ceux d'OdooBot
filtered_messages = []
for message in messages_data:
if not is_odoobot_message(message):
# Nettoyer le corps du message
if 'body' in message:
message['body'] = clean_html(message['body'])
# Identifier les pièces jointes pertinentes
if attachments_info and message['body']:
relevant_attachments = find_relevant_attachments(message['body'], attachments_info)
if relevant_attachments:
message['relevant_attachment_ids'] = relevant_attachments
filtered_messages.append(message)
# Écrire le fichier de messages filtré
output_file_path = os.path.join(output_dir, os.path.basename(messages_file_path))
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(filtered_messages, f, indent=4, ensure_ascii=False)
print(f"Collection de messages traitée: {os.path.basename(messages_file_path)}")
print(f" {len(messages_data) - len(filtered_messages)} messages supprimés (OdooBot)")
except Exception as e:
print(f"Erreur lors du traitement du fichier {messages_file_path}: {e}")
def process_ticket_folder(ticket_folder: str, output_base_dir: str) -> None:
"""
Traite un dossier de ticket en filtrant les messages HTML.
Args:
ticket_folder: Chemin du dossier du ticket à traiter
output_base_dir: Répertoire de base pour la sortie des fichiers filtrés
"""
ticket_name = os.path.basename(ticket_folder)
output_ticket_dir = os.path.join(output_base_dir, ticket_name)
# Créer le répertoire de sortie s'il n'existe pas
if not os.path.exists(output_ticket_dir):
os.makedirs(output_ticket_dir, exist_ok=True)
# Copier les fichiers d'information du ticket
for file_name in ['ticket_info.json', 'contact_info.json', 'activities.json', 'followers.json', 'timesheets.json']:
src_file = os.path.join(ticket_folder, file_name)
if os.path.exists(src_file):
dst_file = os.path.join(output_ticket_dir, file_name)
shutil.copy2(src_file, dst_file)
print(f"Fichier copié: {file_name}")
# Charger les informations sur les pièces jointes si disponibles
attachments_info = []
attachments_info_file = os.path.join(ticket_folder, 'attachments_info.json')
if os.path.exists(attachments_info_file):
try:
with open(attachments_info_file, 'r', encoding='utf-8') as f:
attachments_info = json.load(f)
except Exception as e:
print(f"Erreur lors du chargement des informations sur les pièces jointes: {e}")
# Copier le fichier d'informations sur les pièces jointes
dst_file = os.path.join(output_ticket_dir, 'attachments_info.json')
shutil.copy2(attachments_info_file, dst_file)
# Traitement des fichiers de messages
src_messages_dir = os.path.join(ticket_folder, 'messages')
if os.path.exists(src_messages_dir):
# Créer le répertoire de messages filtré
filtered_messages_dir = os.path.join(output_ticket_dir, 'messages')
if not os.path.exists(filtered_messages_dir):
os.makedirs(filtered_messages_dir, exist_ok=True)
# Traiter chaque fichier de message individuel
for file_name in os.listdir(src_messages_dir):
if file_name.endswith('.json'):
message_file_path = os.path.join(src_messages_dir, file_name)
process_message_file(message_file_path, filtered_messages_dir, attachments_info)
# Traitement des fichiers de messages regroupés
messages_file = os.path.join(ticket_folder, 'messages.json')
message_threads_file = os.path.join(ticket_folder, 'message_threads.json')
if os.path.exists(messages_file):
process_messages_collection(messages_file, output_ticket_dir, attachments_info)
if os.path.exists(message_threads_file):
process_messages_threads(message_threads_file, output_ticket_dir, attachments_info)
# Copier le répertoire des pièces jointes (on conserve toutes les pièces jointes)
src_attachments_dir = os.path.join(ticket_folder, 'attachments')
if os.path.exists(src_attachments_dir):
dst_attachments_dir = os.path.join(output_ticket_dir, 'attachments')
if os.path.exists(dst_attachments_dir):
shutil.rmtree(dst_attachments_dir)
shutil.copytree(src_attachments_dir, dst_attachments_dir)
print(f"Répertoire des pièces jointes copié")
print(f"Traitement du ticket {ticket_name} terminé")
def filter_exported_tickets(source_dir: str = 'exported_tickets', output_dir: str = 'filtered_tickets') -> None:
"""
Filtre les tickets exportés en nettoyant les messages HTML.
Args:
source_dir: Répertoire source contenant les tickets exportés
output_dir: Répertoire de sortie pour les tickets filtrés
"""
if not os.path.exists(source_dir):
print(f"Le répertoire source {source_dir} n'existe pas.")
return
# Créer le répertoire de sortie s'il n'existe pas
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
# Parcourir tous les éléments du répertoire source
for item in os.listdir(source_dir):
item_path = os.path.join(source_dir, item)
# Vérifier si c'est un dossier qui contient un ticket
if os.path.isdir(item_path) and (
item.startswith('ticket_') or
os.path.exists(os.path.join(item_path, 'ticket_info.json'))
):
process_ticket_folder(item_path, output_dir)
# Si c'est un fichier JSON brut de ticket, le copier simplement
elif os.path.isfile(item_path) and item.endswith('_raw.json'):
shutil.copy2(item_path, os.path.join(output_dir, item))
print(f"Fichier copié: {item}")
print(f"Filtrage des tickets terminé. Les tickets filtrés sont disponibles dans {output_dir}")
def run_filter_wizard() -> None:
"""
Interface utilisateur en ligne de commande pour filtrer les tickets exportés.
"""
print("\n==== FILTRAGE DES MESSAGES DES TICKETS ====")
print("Cette fonction va:")
print("1. Supprimer les messages provenant d'OdooBot")
print("2. Supprimer les logos, signatures et images non pertinentes")
print("3. Conserver uniquement le texte utile des messages")
print("4. Identifier les pièces jointes mentionnées dans les messages\n")
# Demander le répertoire source
default_source = 'exported_tickets'
source_dir = input(f"Répertoire source (par défaut: {default_source}): ").strip()
if not source_dir:
source_dir = default_source
# Vérifier si le répertoire source existe
if not os.path.exists(source_dir):
print(f"Le répertoire source {source_dir} n'existe pas.")
return
# Demander le répertoire de sortie
default_output = 'filtered_tickets'
output_dir = input(f"Répertoire de sortie (par défaut: {default_output}): ").strip()
if not output_dir:
output_dir = default_output
# Confirmation
print(f"\nLes tickets du répertoire {source_dir} seront filtrés et placés dans {output_dir}.")
confirm = input("Continuer ? (o/n): ").strip().lower()
if confirm == 'o':
filter_exported_tickets(source_dir, output_dir)
print("\nOpération terminée avec succès!")
else:
print("Opération annulée.")
if __name__ == "__main__":
run_filter_wizard()