""" Utilitaires généraux pour l'ensemble de l'application. """ import os import json import logging import re from typing import Dict, Any, List, Optional, Union from datetime import datetime import unicodedata from bs4 import Tag def setup_logging(level: Union[str, int] = logging.INFO, log_file: Optional[str] = None) -> None: """ Configure la journalisation avec un format spécifique et éventuellement un fichier de logs. Args: level: Niveau de journalisation en tant que chaîne (ex: "INFO", "DEBUG") ou valeur entière (default: logging.INFO) log_file: Chemin du fichier de log (default: None) """ # Convertir le niveau de log si c'est une chaîne if isinstance(level, str): numeric_level = getattr(logging, level.upper(), None) if not isinstance(numeric_level, int): raise ValueError(f"Niveau de journalisation invalide: {level}") else: numeric_level = level logging.basicConfig( level=numeric_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Ajout d'un gestionnaire de fichier si log_file est spécifié if log_file: # S'assurer que le répertoire existe log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir, exist_ok=True) file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setLevel(numeric_level) file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S') file_handler.setFormatter(file_formatter) logging.getLogger().addHandler(file_handler) def log_separator(length: int = 60) -> None: """ Ajoute une ligne de séparation dans les logs. Args: length: Longueur de la ligne (default: 60) """ logging.info("-" * length) def save_json(data: Any, file_path: str) -> bool: """ Sauvegarde des données au format JSON dans un fichier. Args: data: Données à sauvegarder file_path: Chemin du fichier Returns: True si la sauvegarde a réussi, False sinon """ try: with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) return True except Exception as e: logging.error(f"Erreur lors de la sauvegarde du fichier JSON {file_path}: {e}") return False def save_text(text: str, file_path: str) -> bool: """ Sauvegarde du texte dans un fichier. Args: text: Texte à sauvegarder file_path: Chemin du fichier Returns: True si la sauvegarde a réussi, False sinon """ try: # S'assurer que le répertoire existe directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): os.makedirs(directory, exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: f.write(text) return True except Exception as e: logging.error(f"Erreur lors de la sauvegarde du fichier texte {file_path}: {e}") return False def normalize_filename(name: str) -> str: """ Normalise un nom de fichier en remplaçant les caractères non autorisés. Args: name: Nom à normaliser Returns: Nom normalisé """ # Enlever les accents name = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII') # Remplacer les caractères non alphanumériques par des underscores name = re.sub(r'[^\w\.-]', '_', name) # Remplacer les caractères non autorisés par des underscores sanitized = re.sub(r'[\\/*?:"<>|]', '_', name) # Limiter la longueur du nom à 100 caractères if len(sanitized) > 100: sanitized = sanitized[:97] + "..." return sanitized.strip() def detect_duplicate_content(messages: List[Dict[str, Any]]) -> List[int]: """ Détecte les messages avec un contenu dupliqué et retourne leurs indices. Args: messages: Liste de messages à analyser Returns: Liste des indices des messages dupliqués """ try: # Import here to avoid circular imports from formatters.clean_html import clean_html except ImportError: # Fallback to a simplified version if the import fails def clean_html(html_content, is_forwarded=False, is_description=False, strategy="standard", preserve_links=False, preserve_images=False, preserve_doc_links=True): return html_content.strip() if html_content else "" content_map = {} duplicate_indices = [] for idx, message in enumerate(messages): body = message.get("body", "") if not body: continue # Nettoyer le contenu HTML pour la comparaison cleaned_content = clean_html(body, is_description=False) # Considérer uniquement les messages avec du contenu significatif if len(cleaned_content.strip()) < 10: continue # Vérifier si le contenu existe déjà if cleaned_content in content_map: duplicate_indices.append(idx) else: content_map[cleaned_content] = idx return duplicate_indices def get_timestamp() -> str: """ Retourne un timestamp au format YYYYMMDD_HHMMSS Returns: Chaîne formatée avec le timestamp """ return datetime.now().strftime("%Y%m%d_%H%M%S") def ensure_dir(path: str) -> bool: """ S'assure qu'un répertoire existe, le crée si nécessaire. Args: path: Chemin du répertoire à créer Returns: True si le répertoire existe ou a été créé avec succès, False sinon """ try: if not os.path.exists(path): os.makedirs(path, exist_ok=True) return True except Exception as e: logging.error(f"Erreur lors de la création du répertoire {path}: {e}") return False def is_important_image(tag: Tag, message_text: str) -> bool: """ Détermine si une image est importante ou s'il s'agit d'un logo/signature. Args: tag: La balise d'image à analyser message_text: Le texte complet du message pour contexte Returns: True si l'image semble importante, False sinon """ # Vérifier les attributs de l'image src = str(tag.get('src', '')) alt = str(tag.get('alt', '')) title = str(tag.get('title', '')) # Patterns pour les images inutiles useless_img_patterns = [ 'logo', 'signature', 'outlook', 'footer', 'header', 'icon', 'emoticon', 'emoji', 'cid:', 'pixel', 'spacer', 'vignette', 'banner', 'separator', 'decoration', 'mail_signature' ] # Vérifier si c'est une image inutile for pattern in useless_img_patterns: if (pattern in src.lower() or pattern in alt.lower() or pattern in title.lower()): return False # Vérifier la taille width_str = str(tag.get('width', '')) height_str = str(tag.get('height', '')) try: if width_str.isdigit() and height_str.isdigit(): width = int(width_str) height = int(height_str) if width <= 50 and height <= 50: return False except (ValueError, TypeError): pass # Vérifier si l'image est mentionnée dans le texte image_indicators = [ 'capture', 'screenshot', 'image', 'photo', 'illustration', 'voir', 'regarder', 'ci-joint', 'écran', 'erreur', 'problème', 'bug', 'pièce jointe', 'attachment', 'veuillez trouver' ] for indicator in image_indicators: if indicator in message_text.lower(): return True return True