mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-13 10:46:51 +01:00
246 lines
7.9 KiB
Python
246 lines
7.9 KiB
Python
"""
|
|
Utilitaires généraux pour l'ensemble de l'application.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import re
|
|
from typing import Dict, Any, List, Optional, Union
|
|
from datetime import datetime
|
|
import unicodedata
|
|
from bs4 import Tag
|
|
|
|
def setup_logging(level: Union[str, int] = logging.INFO, log_file: Optional[str] = None) -> None:
|
|
"""
|
|
Configure la journalisation avec un format spécifique et éventuellement un fichier de logs.
|
|
|
|
Args:
|
|
level: Niveau de journalisation en tant que chaîne (ex: "INFO", "DEBUG") ou valeur entière (default: logging.INFO)
|
|
log_file: Chemin du fichier de log (default: None)
|
|
"""
|
|
# Convertir le niveau de log si c'est une chaîne
|
|
if isinstance(level, str):
|
|
numeric_level = getattr(logging, level.upper(), None)
|
|
if not isinstance(numeric_level, int):
|
|
raise ValueError(f"Niveau de journalisation invalide: {level}")
|
|
else:
|
|
numeric_level = level
|
|
|
|
logging.basicConfig(
|
|
level=numeric_level,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
|
|
# Ajout d'un gestionnaire de fichier si log_file est spécifié
|
|
if log_file:
|
|
# S'assurer que le répertoire existe
|
|
log_dir = os.path.dirname(log_file)
|
|
if log_dir and not os.path.exists(log_dir):
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
file_handler = logging.FileHandler(log_file, encoding='utf-8')
|
|
file_handler.setLevel(numeric_level)
|
|
file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S')
|
|
file_handler.setFormatter(file_formatter)
|
|
logging.getLogger().addHandler(file_handler)
|
|
|
|
def log_separator(length: int = 60) -> None:
|
|
"""
|
|
Ajoute une ligne de séparation dans les logs.
|
|
|
|
Args:
|
|
length: Longueur de la ligne (default: 60)
|
|
"""
|
|
logging.info("-" * length)
|
|
|
|
def save_json(data: Any, file_path: str) -> bool:
|
|
"""
|
|
Sauvegarde des données au format JSON dans un fichier.
|
|
|
|
Args:
|
|
data: Données à sauvegarder
|
|
file_path: Chemin du fichier
|
|
|
|
Returns:
|
|
True si la sauvegarde a réussi, False sinon
|
|
"""
|
|
try:
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Erreur lors de la sauvegarde du fichier JSON {file_path}: {e}")
|
|
return False
|
|
|
|
def save_text(text: str, file_path: str) -> bool:
|
|
"""
|
|
Sauvegarde du texte dans un fichier.
|
|
|
|
Args:
|
|
text: Texte à sauvegarder
|
|
file_path: Chemin du fichier
|
|
|
|
Returns:
|
|
True si la sauvegarde a réussi, False sinon
|
|
"""
|
|
try:
|
|
# S'assurer que le répertoire existe
|
|
directory = os.path.dirname(file_path)
|
|
if directory and not os.path.exists(directory):
|
|
os.makedirs(directory, exist_ok=True)
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
f.write(text)
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Erreur lors de la sauvegarde du fichier texte {file_path}: {e}")
|
|
return False
|
|
|
|
def normalize_filename(name: str) -> str:
|
|
"""
|
|
Normalise un nom de fichier en remplaçant les caractères non autorisés.
|
|
|
|
Args:
|
|
name: Nom à normaliser
|
|
|
|
Returns:
|
|
Nom normalisé
|
|
"""
|
|
# Enlever les accents
|
|
name = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')
|
|
|
|
# Remplacer les caractères non alphanumériques par des underscores
|
|
name = re.sub(r'[^\w\.-]', '_', name)
|
|
|
|
# Remplacer les caractères non autorisés par des underscores
|
|
sanitized = re.sub(r'[\\/*?:"<>|]', '_', name)
|
|
# Limiter la longueur du nom à 100 caractères
|
|
if len(sanitized) > 100:
|
|
sanitized = sanitized[:97] + "..."
|
|
return sanitized.strip()
|
|
|
|
def detect_duplicate_content(messages: List[Dict[str, Any]]) -> List[int]:
|
|
"""
|
|
Détecte les messages avec un contenu dupliqué et retourne leurs indices.
|
|
|
|
Args:
|
|
messages: Liste de messages à analyser
|
|
|
|
Returns:
|
|
Liste des indices des messages dupliqués
|
|
"""
|
|
try:
|
|
# Import here to avoid circular imports
|
|
from formatters.clean_html import clean_html
|
|
except ImportError:
|
|
# Fallback to a simplified version if the import fails
|
|
def clean_html(html_content, is_forwarded=False, is_description=False, strategy="standard",
|
|
preserve_links=False, preserve_images=False, preserve_doc_links=True):
|
|
return html_content.strip() if html_content else ""
|
|
|
|
content_map = {}
|
|
duplicate_indices = []
|
|
|
|
for idx, message in enumerate(messages):
|
|
body = message.get("body", "")
|
|
if not body:
|
|
continue
|
|
|
|
# Nettoyer le contenu HTML pour la comparaison
|
|
cleaned_content = clean_html(body, is_description=False)
|
|
# Considérer uniquement les messages avec du contenu significatif
|
|
if len(cleaned_content.strip()) < 10:
|
|
continue
|
|
|
|
# Vérifier si le contenu existe déjà
|
|
if cleaned_content in content_map:
|
|
duplicate_indices.append(idx)
|
|
else:
|
|
content_map[cleaned_content] = idx
|
|
|
|
return duplicate_indices
|
|
|
|
def get_timestamp() -> str:
|
|
"""
|
|
Retourne un timestamp au format YYYYMMDD_HHMMSS
|
|
|
|
Returns:
|
|
Chaîne formatée avec le timestamp
|
|
"""
|
|
return datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
def ensure_dir(path: str) -> bool:
|
|
"""
|
|
S'assure qu'un répertoire existe, le crée si nécessaire.
|
|
|
|
Args:
|
|
path: Chemin du répertoire à créer
|
|
|
|
Returns:
|
|
True si le répertoire existe ou a été créé avec succès, False sinon
|
|
"""
|
|
try:
|
|
if not os.path.exists(path):
|
|
os.makedirs(path, exist_ok=True)
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Erreur lors de la création du répertoire {path}: {e}")
|
|
return False
|
|
|
|
def is_important_image(tag: Tag, message_text: str) -> bool:
|
|
"""
|
|
Détermine si une image est importante ou s'il s'agit d'un logo/signature.
|
|
|
|
Args:
|
|
tag: La balise d'image à analyser
|
|
message_text: Le texte complet du message pour contexte
|
|
|
|
Returns:
|
|
True si l'image semble importante, False sinon
|
|
"""
|
|
# Vérifier les attributs de l'image
|
|
src = str(tag.get('src', ''))
|
|
alt = str(tag.get('alt', ''))
|
|
title = str(tag.get('title', ''))
|
|
|
|
# Patterns pour les images inutiles
|
|
useless_img_patterns = [
|
|
'logo', 'signature', 'outlook', 'footer', 'header', 'icon',
|
|
'emoticon', 'emoji', 'cid:', 'pixel', 'spacer', 'vignette',
|
|
'banner', 'separator', 'decoration', 'mail_signature'
|
|
]
|
|
|
|
# Vérifier si c'est une image inutile
|
|
for pattern in useless_img_patterns:
|
|
if (pattern in src.lower() or
|
|
pattern in alt.lower() or
|
|
pattern in title.lower()):
|
|
return False
|
|
|
|
# Vérifier la taille
|
|
width_str = str(tag.get('width', ''))
|
|
height_str = str(tag.get('height', ''))
|
|
|
|
try:
|
|
if width_str.isdigit() and height_str.isdigit():
|
|
width = int(width_str)
|
|
height = int(height_str)
|
|
if width <= 50 and height <= 50:
|
|
return False
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
# Vérifier si l'image est mentionnée dans le texte
|
|
image_indicators = [
|
|
'capture', 'screenshot', 'image', 'photo', 'illustration',
|
|
'voir', 'regarder', 'ci-joint', 'écran', 'erreur', 'problème',
|
|
'bug', 'pièce jointe', 'attachment', 'veuillez trouver'
|
|
]
|
|
|
|
for indicator in image_indicators:
|
|
if indicator in message_text.lower():
|
|
return True
|
|
|
|
return True |