mirror of
https://github.com/Ladebeze66/odoo_toolkit.git
synced 2025-12-13 09:06:52 +01:00
544 lines
22 KiB
Python
544 lines
22 KiB
Python
import os
|
|
import json
|
|
import re
|
|
from bs4 import BeautifulSoup
|
|
import html
|
|
import shutil
|
|
from typing import Dict, List, Any, Optional, Tuple, Union, Set
|
|
|
|
|
|
def is_odoobot_author(message: Dict[str, Any]) -> bool:
|
|
"""
|
|
Vérifie si l'auteur du message est OdooBot ou un autre système.
|
|
|
|
Args:
|
|
message: Le message à vérifier
|
|
|
|
Returns:
|
|
True si le message provient d'OdooBot, False sinon
|
|
"""
|
|
# Vérifier le nom de l'auteur
|
|
if 'author_id' in message and isinstance(message['author_id'], list) and len(message['author_id']) > 1:
|
|
author_name = message['author_id'][1].lower()
|
|
if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name:
|
|
return True
|
|
|
|
# Vérifier le type de message (souvent les notifications système)
|
|
if message.get('message_type') == 'notification':
|
|
return True
|
|
|
|
# Vérifier le sous-type du message
|
|
if 'subtype_id' in message and isinstance(message['subtype_id'], list) and len(message['subtype_id']) > 1:
|
|
subtype = message['subtype_id'][1].lower()
|
|
if 'notification' in subtype or 'system' in subtype:
|
|
return True
|
|
|
|
# Vérifier le contenu du message
|
|
if 'body' in message and isinstance(message['body'], str):
|
|
body = message['body'].lower()
|
|
system_patterns = [
|
|
'assigné à', 'étape changée', 'créé automatiquement',
|
|
'assigned to', 'stage changed', 'automatically created',
|
|
'updated', 'mis à jour', 'a modifié', 'changed'
|
|
]
|
|
|
|
for pattern in system_patterns:
|
|
if pattern in body:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def is_important_image(tag, message_text: str) -> bool:
|
|
"""
|
|
Détermine si une image est importante ou s'il s'agit d'un logo/signature.
|
|
|
|
Args:
|
|
tag: La balise d'image à analyser
|
|
message_text: Le texte complet du message pour contexte
|
|
|
|
Returns:
|
|
True si l'image semble importante, False sinon
|
|
"""
|
|
# Vérifier les attributs de l'image
|
|
src = tag.get('src', '')
|
|
alt = tag.get('alt', '')
|
|
title = tag.get('title', '')
|
|
css_class = tag.get('class', '')
|
|
|
|
# Patterns pour les images inutiles
|
|
useless_img_patterns = [
|
|
'logo', 'signature', 'outlook', 'footer', 'header', 'icon',
|
|
'emoticon', 'emoji', 'cid:', 'pixel', 'spacer', 'vignette',
|
|
'banner', 'separator', 'decoration', 'mail_signature'
|
|
]
|
|
|
|
# Vérifier si c'est une image inutile
|
|
for pattern in useless_img_patterns:
|
|
if (pattern in src.lower() or
|
|
pattern in alt.lower() or
|
|
pattern in title.lower() or
|
|
(css_class and any(pattern in c.lower() for c in css_class if isinstance(c, str)))):
|
|
return False
|
|
|
|
# Vérifier la taille (les petites images sont souvent des icônes/logos)
|
|
width = tag.get('width', '')
|
|
height = tag.get('height', '')
|
|
try:
|
|
width = int(width) if width and str(width).isdigit() else None
|
|
height = int(height) if height and str(height).isdigit() else None
|
|
if width and height and width <= 50 and height <= 50:
|
|
return False
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
# Vérifier si l'image est mentionnée dans le texte
|
|
image_indicators = [
|
|
'capture', 'screenshot', 'image', 'photo', 'illustration',
|
|
'voir', 'regarder', 'ci-joint', 'écran', 'erreur', 'problème',
|
|
'bug', 'pièce jointe', 'attachment', 'veuillez trouver'
|
|
]
|
|
|
|
for indicator in image_indicators:
|
|
if indicator in message_text.lower():
|
|
return True
|
|
|
|
# Par défaut, considérer les images qui ne sont pas clairement inutiles comme potentiellement importantes
|
|
return True
|
|
|
|
|
|
def find_attachment_references(message_text: str, attachments_info: List[Dict[str, Any]]) -> List[int]:
|
|
"""
|
|
Identifie les pièces jointes mentionnées dans le message.
|
|
|
|
Args:
|
|
message_text: Texte du message
|
|
attachments_info: Informations sur les pièces jointes disponibles
|
|
|
|
Returns:
|
|
Liste des IDs des pièces jointes pertinentes
|
|
"""
|
|
if not message_text or not attachments_info:
|
|
return []
|
|
|
|
# Patterns indiquant des pièces jointes
|
|
attachment_indicators = [
|
|
r'pi[èe]ce[s]? jointe[s]?', r'attachment[s]?', r'fichier[s]?', r'file[s]?',
|
|
r'veuillez trouver', r'please find', r'voir ci-joint', r'voir ci-dessous',
|
|
r'ci-joint', r'joint[e]?[s]?', r'attached', r'screenshot[s]?',
|
|
r'capture[s]? d[\'e] ?[ée]cran', r'image[s]?', r'photo[s]?'
|
|
]
|
|
|
|
relevant_ids = []
|
|
|
|
# Vérifier si le message mentionne des pièces jointes
|
|
mention_found = False
|
|
for pattern in attachment_indicators:
|
|
if re.search(pattern, message_text, re.IGNORECASE):
|
|
mention_found = True
|
|
break
|
|
|
|
if mention_found:
|
|
# Identifier les pièces jointes pertinentes (non logos/images d'interface)
|
|
for attachment in attachments_info:
|
|
name = attachment.get('name', '').lower() if attachment.get('name') else ''
|
|
|
|
# Exclure les pièces jointes qui semblent être des logos ou images d'interface
|
|
useless_patterns = ['logo', 'signature', 'outlook', 'icon', 'emoticon', 'emoji']
|
|
is_useless = any(pattern in name for pattern in useless_patterns)
|
|
|
|
if not is_useless and 'id' in attachment:
|
|
relevant_ids.append(attachment['id'])
|
|
|
|
return relevant_ids
|
|
|
|
|
|
def clean_html(html_content: str) -> str:
|
|
"""
|
|
Nettoie le contenu HTML en supprimant toutes les balises mais en préservant le texte.
|
|
Traite spécifiquement les images pour garder uniquement celles pertinentes.
|
|
|
|
Args:
|
|
html_content: Contenu HTML à nettoyer
|
|
|
|
Returns:
|
|
Texte nettoyé sans balises HTML
|
|
"""
|
|
if not html_content:
|
|
return ""
|
|
|
|
# Utiliser BeautifulSoup pour manipuler le HTML
|
|
soup = BeautifulSoup(html_content, 'html.parser')
|
|
|
|
# Supprimer les éléments de signature
|
|
signature_elements = [
|
|
'div.signature', '.gmail_signature', '.signature',
|
|
'hr + div', 'hr + p', '.footer', '.mail-signature'
|
|
]
|
|
|
|
for selector in signature_elements:
|
|
for element in soup.select(selector):
|
|
element.decompose()
|
|
|
|
# Supprimer les lignes horizontales (souvent utilisées pour séparer les signatures)
|
|
for hr in soup.find_all('hr'):
|
|
hr.decompose()
|
|
|
|
# Récupérer le texte complet pour analyse
|
|
full_text = soup.get_text(' ', strip=True)
|
|
|
|
# Traiter les images
|
|
for img in soup.find_all('img'):
|
|
if is_important_image(img, full_text):
|
|
# Remplacer les images importantes par une description
|
|
alt_text = img.get('alt', '') or img.get('title', '') or '[Image importante]'
|
|
img.replace_with(f" [Image: {alt_text}] ")
|
|
else:
|
|
# Supprimer les images non pertinentes
|
|
img.decompose()
|
|
|
|
# Traiter les liens vers des pièces jointes
|
|
for a in soup.find_all('a', href=True):
|
|
href = a.get('href', '').lower()
|
|
if 'attachment' in href or 'download' in href or 'file' in href:
|
|
a.replace_with(f" [Pièce jointe: {a.get_text()}] ")
|
|
|
|
# Récupérer le texte sans balises HTML
|
|
text = soup.get_text(separator=' ', strip=True)
|
|
|
|
# Décodage des entités HTML spéciales
|
|
text = html.unescape(text)
|
|
|
|
# Nettoyer les espaces multiples
|
|
text = re.sub(r'\s+', ' ', text)
|
|
|
|
# Nettoyer les lignes vides multiples
|
|
text = re.sub(r'\n\s*\n', '\n\n', text)
|
|
|
|
# Supprimer les disclaimers et signatures standards
|
|
footer_patterns = [
|
|
r'Sent from my .*',
|
|
r'Envoyé depuis mon .*',
|
|
r'Ce message .*confidentiel.*',
|
|
r'This email .*confidential.*',
|
|
r'DISCLAIMER.*',
|
|
r'CONFIDENTIAL.*',
|
|
r'CONFIDENTIEL.*',
|
|
r'Le contenu de ce courriel est confidentiel.*',
|
|
r'This message and any attachments.*',
|
|
r'Ce message et ses pièces jointes.*',
|
|
r'AVIS DE CONFIDENTIALITÉ.*',
|
|
r'PRIVACY NOTICE.*'
|
|
]
|
|
|
|
for pattern in footer_patterns:
|
|
text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
|
|
|
|
return text.strip()
|
|
|
|
|
|
def process_message_file(message_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
|
|
"""
|
|
Traite un fichier de message en nettoyant le contenu HTML des messages.
|
|
|
|
Args:
|
|
message_file_path: Chemin du fichier de message à traiter
|
|
output_dir: Répertoire de sortie pour le fichier traité
|
|
attachments_info: Informations sur les pièces jointes disponibles
|
|
"""
|
|
try:
|
|
with open(message_file_path, 'r', encoding='utf-8') as f:
|
|
message_data = json.load(f)
|
|
|
|
# Ignorer les messages d'OdooBot
|
|
if is_odoobot_author(message_data):
|
|
print(f"Message ignoré (OdooBot): {os.path.basename(message_file_path)}")
|
|
return
|
|
|
|
# Vérifier si le message contient un corps HTML
|
|
if 'body' in message_data and message_data['body']:
|
|
# Remplacer le contenu HTML par le texte filtré
|
|
message_data['body'] = clean_html(message_data['body'])
|
|
|
|
# Identifier les pièces jointes pertinentes mentionnées dans le message
|
|
if attachments_info and message_data['body']:
|
|
relevant_attachments = find_attachment_references(message_data['body'], attachments_info)
|
|
if relevant_attachments:
|
|
message_data['relevant_attachment_ids'] = relevant_attachments
|
|
|
|
# Écrire le message filtré
|
|
output_file_path = os.path.join(output_dir, os.path.basename(message_file_path))
|
|
with open(output_file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(message_data, f, indent=4, ensure_ascii=False)
|
|
|
|
print(f"Message traité: {os.path.basename(message_file_path)}")
|
|
|
|
except Exception as e:
|
|
print(f"Erreur lors du traitement du fichier {message_file_path}: {e}")
|
|
|
|
|
|
def process_messages_threads(threads_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
|
|
"""
|
|
Traite un fichier de threads de messages en nettoyant le contenu HTML.
|
|
|
|
Args:
|
|
threads_file_path: Chemin du fichier de threads de messages
|
|
output_dir: Répertoire de sortie pour le fichier traité
|
|
attachments_info: Informations sur les pièces jointes disponibles
|
|
"""
|
|
try:
|
|
with open(threads_file_path, 'r', encoding='utf-8') as f:
|
|
threads_data = json.load(f)
|
|
|
|
# Liste des threads à supprimer (ceux qui ne contiennent que des messages d'OdooBot)
|
|
threads_to_remove = []
|
|
|
|
# Parcourir tous les threads
|
|
for thread_id, thread in threads_data.items():
|
|
# Traiter le message principal
|
|
main_message_is_bot = False
|
|
if thread.get('main_message'):
|
|
if is_odoobot_author(thread['main_message']):
|
|
main_message_is_bot = True
|
|
thread['main_message'] = None
|
|
elif 'body' in thread['main_message']:
|
|
thread['main_message']['body'] = clean_html(thread['main_message']['body'])
|
|
|
|
# Identifier les pièces jointes pertinentes
|
|
if attachments_info and thread['main_message']['body']:
|
|
relevant_attachments = find_attachment_references(
|
|
thread['main_message']['body'], attachments_info
|
|
)
|
|
if relevant_attachments:
|
|
thread['main_message']['relevant_attachment_ids'] = relevant_attachments
|
|
|
|
# Traiter les réponses (filtrer les messages d'OdooBot)
|
|
filtered_replies = []
|
|
for reply in thread.get('replies', []):
|
|
if not is_odoobot_author(reply):
|
|
if 'body' in reply:
|
|
reply['body'] = clean_html(reply['body'])
|
|
|
|
# Identifier les pièces jointes pertinentes
|
|
if attachments_info and reply['body']:
|
|
relevant_attachments = find_attachment_references(reply['body'], attachments_info)
|
|
if relevant_attachments:
|
|
reply['relevant_attachment_ids'] = relevant_attachments
|
|
|
|
filtered_replies.append(reply)
|
|
|
|
# Mettre à jour les réponses
|
|
thread['replies'] = filtered_replies
|
|
|
|
# Si le thread ne contient que des messages de bot, le marquer pour suppression
|
|
if main_message_is_bot and not filtered_replies:
|
|
threads_to_remove.append(thread_id)
|
|
|
|
# Supprimer les threads qui ne contiennent que des messages d'OdooBot
|
|
for thread_id in threads_to_remove:
|
|
del threads_data[thread_id]
|
|
|
|
# Écrire le fichier de threads filtré
|
|
output_file_path = os.path.join(output_dir, os.path.basename(threads_file_path))
|
|
with open(output_file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(threads_data, f, indent=4, ensure_ascii=False)
|
|
|
|
print(f"Fichier de threads traité: {os.path.basename(threads_file_path)}")
|
|
if threads_to_remove:
|
|
print(f" {len(threads_to_remove)} threads supprimés (OdooBot uniquement)")
|
|
|
|
except Exception as e:
|
|
print(f"Erreur lors du traitement du fichier {threads_file_path}: {e}")
|
|
|
|
|
|
def process_messages_collection(messages_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None:
|
|
"""
|
|
Traite un fichier de collection de messages en nettoyant le contenu HTML.
|
|
|
|
Args:
|
|
messages_file_path: Chemin du fichier de collection de messages
|
|
output_dir: Répertoire de sortie pour le fichier traité
|
|
attachments_info: Informations sur les pièces jointes disponibles
|
|
"""
|
|
try:
|
|
with open(messages_file_path, 'r', encoding='utf-8') as f:
|
|
messages_data = json.load(f)
|
|
|
|
# Filtrer les messages pour supprimer ceux d'OdooBot
|
|
filtered_messages = []
|
|
for message in messages_data:
|
|
if not is_odoobot_author(message):
|
|
# Nettoyer le contenu HTML
|
|
if 'body' in message:
|
|
message['body'] = clean_html(message['body'])
|
|
|
|
# Identifier les pièces jointes pertinentes
|
|
if attachments_info and message['body']:
|
|
relevant_attachments = find_attachment_references(message['body'], attachments_info)
|
|
if relevant_attachments:
|
|
message['relevant_attachment_ids'] = relevant_attachments
|
|
|
|
filtered_messages.append(message)
|
|
|
|
# Écrire le fichier de messages filtré
|
|
output_file_path = os.path.join(output_dir, os.path.basename(messages_file_path))
|
|
with open(output_file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(filtered_messages, f, indent=4, ensure_ascii=False)
|
|
|
|
print(f"Collection de messages traitée: {os.path.basename(messages_file_path)}")
|
|
print(f" {len(messages_data) - len(filtered_messages)} messages supprimés (OdooBot)")
|
|
|
|
except Exception as e:
|
|
print(f"Erreur lors du traitement du fichier {messages_file_path}: {e}")
|
|
|
|
|
|
def process_ticket_folder(ticket_folder: str, output_base_dir: str) -> None:
|
|
"""
|
|
Traite un dossier de ticket en filtrant les messages HTML.
|
|
|
|
Args:
|
|
ticket_folder: Chemin du dossier du ticket à traiter
|
|
output_base_dir: Répertoire de base pour la sortie des fichiers filtrés
|
|
"""
|
|
ticket_name = os.path.basename(ticket_folder)
|
|
output_ticket_dir = os.path.join(output_base_dir, ticket_name)
|
|
|
|
# Créer le répertoire de sortie s'il n'existe pas
|
|
if not os.path.exists(output_ticket_dir):
|
|
os.makedirs(output_ticket_dir, exist_ok=True)
|
|
|
|
# Copier les fichiers d'information du ticket
|
|
for file_name in ['ticket_info.json', 'contact_info.json', 'activities.json', 'followers.json', 'timesheets.json']:
|
|
src_file = os.path.join(ticket_folder, file_name)
|
|
if os.path.exists(src_file):
|
|
dst_file = os.path.join(output_ticket_dir, file_name)
|
|
shutil.copy2(src_file, dst_file)
|
|
print(f"Fichier copié: {file_name}")
|
|
|
|
# Charger les informations sur les pièces jointes
|
|
attachments_info = []
|
|
attachments_info_file = os.path.join(ticket_folder, 'attachments_info.json')
|
|
if os.path.exists(attachments_info_file):
|
|
try:
|
|
with open(attachments_info_file, 'r', encoding='utf-8') as f:
|
|
attachments_info = json.load(f)
|
|
except Exception as e:
|
|
print(f"Erreur lors du chargement des informations sur les pièces jointes: {e}")
|
|
|
|
# Copier le fichier d'informations sur les pièces jointes
|
|
dst_file = os.path.join(output_ticket_dir, 'attachments_info.json')
|
|
shutil.copy2(attachments_info_file, dst_file)
|
|
|
|
# Traitement des fichiers de messages
|
|
src_messages_dir = os.path.join(ticket_folder, 'messages')
|
|
if os.path.exists(src_messages_dir):
|
|
# Créer le répertoire de messages filtré
|
|
filtered_messages_dir = os.path.join(output_ticket_dir, 'messages')
|
|
if not os.path.exists(filtered_messages_dir):
|
|
os.makedirs(filtered_messages_dir, exist_ok=True)
|
|
|
|
# Traiter chaque fichier de message individuel
|
|
for file_name in os.listdir(src_messages_dir):
|
|
if file_name.endswith('.json'):
|
|
message_file_path = os.path.join(src_messages_dir, file_name)
|
|
process_message_file(message_file_path, filtered_messages_dir, attachments_info)
|
|
|
|
# Traitement des fichiers de messages regroupés
|
|
messages_file = os.path.join(ticket_folder, 'messages.json')
|
|
message_threads_file = os.path.join(ticket_folder, 'message_threads.json')
|
|
|
|
if os.path.exists(messages_file):
|
|
process_messages_collection(messages_file, output_ticket_dir, attachments_info)
|
|
|
|
if os.path.exists(message_threads_file):
|
|
process_messages_threads(message_threads_file, output_ticket_dir, attachments_info)
|
|
|
|
# Copier le répertoire des pièces jointes (on garde toutes les pièces jointes)
|
|
src_attachments_dir = os.path.join(ticket_folder, 'attachments')
|
|
if os.path.exists(src_attachments_dir):
|
|
dst_attachments_dir = os.path.join(output_ticket_dir, 'attachments')
|
|
if os.path.exists(dst_attachments_dir):
|
|
shutil.rmtree(dst_attachments_dir)
|
|
shutil.copytree(src_attachments_dir, dst_attachments_dir)
|
|
print(f"Répertoire des pièces jointes copié")
|
|
|
|
print(f"Traitement du ticket {ticket_name} terminé")
|
|
|
|
|
|
def filter_exported_tickets(source_dir: str = 'exported_tickets', output_dir: str = 'filtered_tickets') -> None:
|
|
"""
|
|
Filtre les tickets exportés en nettoyant les messages HTML.
|
|
|
|
Args:
|
|
source_dir: Répertoire source contenant les tickets exportés
|
|
output_dir: Répertoire de sortie pour les tickets filtrés
|
|
"""
|
|
if not os.path.exists(source_dir):
|
|
print(f"Le répertoire source {source_dir} n'existe pas.")
|
|
return
|
|
|
|
# Créer le répertoire de sortie s'il n'existe pas
|
|
if not os.path.exists(output_dir):
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
# Parcourir tous les éléments du répertoire source
|
|
for item in os.listdir(source_dir):
|
|
item_path = os.path.join(source_dir, item)
|
|
|
|
# Vérifier si c'est un dossier qui contient un ticket
|
|
if os.path.isdir(item_path) and (
|
|
item.startswith('ticket_') or
|
|
os.path.exists(os.path.join(item_path, 'ticket_info.json'))
|
|
):
|
|
process_ticket_folder(item_path, output_dir)
|
|
|
|
# Si c'est un fichier JSON brut de ticket, le copier simplement
|
|
elif os.path.isfile(item_path) and item.endswith('_raw.json'):
|
|
shutil.copy2(item_path, os.path.join(output_dir, item))
|
|
print(f"Fichier copié: {item}")
|
|
|
|
print(f"Filtrage des tickets terminé. Les tickets filtrés sont disponibles dans {output_dir}")
|
|
|
|
|
|
def run_filter_wizard() -> None:
|
|
"""
|
|
Interface utilisateur en ligne de commande pour filtrer les tickets exportés.
|
|
"""
|
|
print("\n==== FILTRAGE DES MESSAGES DES TICKETS ====")
|
|
print("Cette fonction va:")
|
|
print("1. Supprimer les messages provenant d'OdooBot")
|
|
print("2. Filtrer les images inutiles (logos, signatures, images Outlook)")
|
|
print("3. Conserver les images pertinentes pour la demande")
|
|
print("4. Identifier les pièces jointes importantes mentionnées dans les messages\n")
|
|
|
|
# Demander le répertoire source
|
|
default_source = 'exported_tickets'
|
|
source_dir = input(f"Répertoire source (par défaut: {default_source}): ").strip()
|
|
if not source_dir:
|
|
source_dir = default_source
|
|
|
|
# Vérifier si le répertoire source existe
|
|
if not os.path.exists(source_dir):
|
|
print(f"Le répertoire source {source_dir} n'existe pas.")
|
|
return
|
|
|
|
# Demander le répertoire de sortie
|
|
default_output = 'filtered_tickets'
|
|
output_dir = input(f"Répertoire de sortie (par défaut: {default_output}): ").strip()
|
|
if not output_dir:
|
|
output_dir = default_output
|
|
|
|
# Confirmation
|
|
print(f"\nLes tickets du répertoire {source_dir} seront filtrés et placés dans {output_dir}.")
|
|
confirm = input("Continuer ? (o/n): ").strip().lower()
|
|
|
|
if confirm == 'o':
|
|
filter_exported_tickets(source_dir, output_dir)
|
|
print("\nOpération terminée avec succès!")
|
|
else:
|
|
print("Opération annulée.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run_filter_wizard()
|