llm_ticket3/utils/translate_utils.py
2025-04-25 16:02:24 +02:00

314 lines
12 KiB
Python

# utils/translate_utils.py
from deep_translator import GoogleTranslator
import json
import os
from datetime import datetime
import logging
from typing import Optional, Dict, Any
import hashlib
from functools import lru_cache
logger = logging.getLogger("Translate")
# Cache global pour les traductions (conservé entre les appels)
TRANSLATION_CACHE: Dict[str, str] = {}
MAX_CACHE_SIZE = 1000 # Nombre maximal d'entrées dans le cache
def _get_cache_key(text: str, source: str, target: str) -> str:
"""
Génère une clé de cache unique pour une traduction donnée.
Args:
text: Texte à traduire
source: Langue source
target: Langue cible
Returns:
Clé de cache
"""
# Limiter la taille du texte pour la clé de cache
text_snippet = text[:500] if text else ""
hash_key = hashlib.md5(f"{text_snippet}|{source}|{target}".encode('utf-8')).hexdigest()
return hash_key
def _clean_cache_if_needed() -> None:
"""
Nettoie le cache si sa taille dépasse la limite maximale.
"""
global TRANSLATION_CACHE
if len(TRANSLATION_CACHE) > MAX_CACHE_SIZE:
# Garder seulement 75% des entrées les plus récentes (approximativement)
items = list(TRANSLATION_CACHE.items())
keep_count = int(MAX_CACHE_SIZE * 0.75)
TRANSLATION_CACHE = dict(items[-keep_count:])
logger.info(f"Cache de traduction nettoyé : {len(TRANSLATION_CACHE)} entrées conservées")
def translate_text(text: str, source: str, target: str, use_cache: bool = True) -> str:
"""
Fonction générique de traduction avec gestion de cache.
Args:
text: Texte à traduire
source: Langue source ('fr', 'en', etc.)
target: Langue cible ('fr', 'en', etc.)
use_cache: Si True, utilise le cache de traduction
Returns:
Texte traduit
"""
if not text or not text.strip():
logger.debug(f"[TRADUCTION] Texte vide, aucune traduction nécessaire {source}->{target}")
return ""
# Log de début de traduction
text_preview = text[:50] + "..." if len(text) > 50 else text
logger.debug(f"[TRADUCTION] Demande de traduction {source}->{target}: '{text_preview}'")
# Vérifier le cache
if use_cache:
cache_key = _get_cache_key(text, source, target)
if cache_key in TRANSLATION_CACHE:
logger.debug(f"[TRADUCTION] Récupération depuis le cache pour {source}->{target}")
return TRANSLATION_CACHE[cache_key]
# Limiter la taille du texte pour éviter les problèmes avec l'API
# Les longs textes sont découpés et traduits par morceaux
MAX_TEXT_LENGTH = 5000
if len(text) > MAX_TEXT_LENGTH:
logger.info(f"[TRADUCTION] Texte trop long ({len(text)} caractères), découpage en {len(text) // MAX_TEXT_LENGTH + 1} morceaux")
chunks = _split_text_into_chunks(text, MAX_TEXT_LENGTH)
translated_chunks = []
for i, chunk in enumerate(chunks):
logger.debug(f"[TRADUCTION] Traduction du morceau {i+1}/{len(chunks)}")
translated_chunk = translate_text(chunk, source, target, use_cache)
translated_chunks.append(translated_chunk)
result = ' '.join(translated_chunks)
logger.debug(f"[TRADUCTION] Tous les morceaux traduits et réassemblés : {len(result)} caractères")
else:
try:
logger.debug(f"[TRADUCTION] Appel de l'API de traduction {source}->{target} pour {len(text)} caractères")
translator = GoogleTranslator(source=source, target=target)
result = translator.translate(text)
logger.debug(f"[TRADUCTION] Traduction terminée {source}->{target}, résultat: {len(result)} caractères")
except Exception as e:
logger.error(f"[TRADUCTION] Échec de la traduction {source}->{target}: {e}")
return text # Retourner le texte original en cas d'erreur
# Mettre en cache
if use_cache:
cache_key = _get_cache_key(text, source, target)
TRANSLATION_CACHE[cache_key] = result
logger.debug(f"[TRADUCTION] Résultat mis en cache pour {source}->{target}")
_clean_cache_if_needed()
return result
def _split_text_into_chunks(text: str, max_length: int) -> list:
"""
Découpe un texte en morceaux plus petits en respectant les phrases.
Args:
text: Texte à découper
max_length: Longueur maximale de chaque morceau
Returns:
Liste des morceaux de texte
"""
chunks = []
current_chunk = ""
# Split by paragraphs
paragraphs = text.split('\n')
for paragraph in paragraphs:
# If paragraph is too long, split by sentences
if len(paragraph) > max_length:
sentences = paragraph.replace('. ', '.\n').replace('! ', '!\n').replace('? ', '?\n').split('\n')
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 <= max_length:
if current_chunk:
current_chunk += ' ' + sentence
else:
current_chunk = sentence
else:
chunks.append(current_chunk)
current_chunk = sentence
else:
if len(current_chunk) + len(paragraph) + 1 <= max_length:
if current_chunk:
current_chunk += '\n' + paragraph
else:
current_chunk = paragraph
else:
chunks.append(current_chunk)
current_chunk = paragraph
if current_chunk:
chunks.append(current_chunk)
return chunks
def fr_to_en(text: str) -> str:
"""
Traduit du français vers l'anglais.
Args:
text: Texte en français
Returns:
Texte traduit en anglais
"""
if not text:
return ""
logger.info(f"[TRADUCTION] FR → EN: Traduction de {len(text)} caractères")
result = translate_text(text, "fr", "en")
logger.info(f"[TRADUCTION] FR → EN: Résultat obtenu de {len(result)} caractères")
return result
def en_to_fr(text: str) -> str:
"""
Traduit de l'anglais vers le français.
Args:
text: Texte en anglais
Returns:
Texte traduit en français
"""
if not text:
return ""
logger.info(f"[TRADUCTION] EN → FR: Traduction de {len(text)} caractères")
result = translate_text(text, "en", "fr")
logger.info(f"[TRADUCTION] EN → FR: Résultat obtenu de {len(result)} caractères")
return result
def determiner_repertoire_ticket(ticket_id: str):
"""
Détermine dynamiquement le répertoire du ticket.
Args:
ticket_id: str, le code du ticket
Returns:
str, le chemin du répertoire pour ce ticket ou None si non trouvé
"""
# Base de recherche des tickets
output_dir = "output"
# Format attendu du répertoire de ticket
ticket_dir = f"ticket_{ticket_id}"
ticket_path = os.path.join(output_dir, ticket_dir)
if not os.path.exists(ticket_path):
print(f"Répertoire de ticket non trouvé: {ticket_path}")
return None
# Trouver la dernière extraction (par date)
extractions = []
for extraction in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extraction)
if os.path.isdir(extraction_path) and extraction.startswith(ticket_id):
extractions.append(extraction_path)
if not extractions:
print(f"Aucune extraction trouvée pour le ticket {ticket_id}")
return None
# Trier par date de modification (plus récente en premier)
extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True)
# Retourner le chemin de la dernière extraction
return extractions[0]
def sauvegarder_ocr_traduction(
image_path: str,
ticket_id: str,
ocr_fr: str,
ocr_en: str,
ocr_en_back_fr: str = "",
base_dir: Optional[str] = None # Utiliser Optional[str]
) -> None:
"""
Sauvegarde les résultats OCR + TRAD en JSON (par image) et
ajoute une ligne dans un fichier texte global (append sécurisé).
Utilise le répertoire de sortie output/ticket_X/X_YYYYMMDD_HHMMSS/X_rapports/pipeline
pour la sauvegarde des données.
"""
try:
image_name = os.path.basename(image_path)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
logger.info(f"[SAUVEGARDE] Sauvegarde des résultats OCR et traduction pour {image_name}")
logger.info(f"[SAUVEGARDE] Contenus: OCR FR={len(ocr_fr)} caractères, OCR EN={len(ocr_en)} caractères, OCR EN→FR={len(ocr_en_back_fr)} caractères")
# Déterminer le répertoire de sortie basé sur le ticket_id
if not base_dir:
# Utiliser le répertoire de sortie approprié dans output
extraction_dir = determiner_repertoire_ticket(ticket_id)
if not extraction_dir:
# Fallback vers reports si impossible de trouver le répertoire
base_dir = "reports"
rapport_dir = os.path.join(base_dir, ticket_id, "pipeline", "ocr_traduction")
logger.info(f"[SAUVEGARDE] Répertoire de ticket non trouvé, utilisation de {rapport_dir}")
else:
# Utiliser le répertoire rapports du ticket
rapports_dir = os.path.join(extraction_dir, f"{ticket_id}_rapports")
rapport_dir = os.path.join(rapports_dir, "pipeline", "ocr_traduction")
logger.info(f"[SAUVEGARDE] Utilisation du répertoire de rapports: {rapport_dir}")
else:
# Utiliser directement le répertoire pipeline existant et y ajouter ocr_traduction
if os.path.basename(base_dir) == f"{ticket_id}_rapports":
# Si base_dir est déjà le répertoire des rapports
rapport_dir = os.path.join(base_dir, "pipeline", "ocr_traduction")
else:
# Sinon, utiliser tel quel et ajouter ocr_traduction
rapport_dir = os.path.join(base_dir, "pipeline", "ocr_traduction")
logger.info(f"[SAUVEGARDE] Utilisation du répertoire fourni: {rapport_dir}")
os.makedirs(rapport_dir, exist_ok=True)
# Sauvegarde JSON (1 par image, réécrit à chaque passage)
json_path = os.path.join(rapport_dir, f"{image_name}.json")
result = {
"image_name": image_name,
"ocr_fr": ocr_fr,
"translation_en": ocr_en,
"translation_en_back_fr": ocr_en_back_fr,
"metadata": {
"ticket_id": ticket_id,
"timestamp": timestamp,
"source_module": "ocr_utils + translate_utils",
"lang_detected": "fr"
}
}
with open(json_path, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info(f"[SAUVEGARDE] Fichier JSON enregistré: {json_path}")
# Append TXT global sécurisé (évite l'écrasement)
txt_path = os.path.join(rapport_dir, "ocr_traduction.txt")
ligne = (
f"{image_name}\n"
f"[FR] {ocr_fr or '_'}\n"
f"[EN] {ocr_en or '_'}\n"
f"[EN→FR] {ocr_en_back_fr or '_'}\n\n"
)
with open(txt_path, "a", encoding="utf-8") as f:
f.write(ligne)
logger.info(f"[SAUVEGARDE] Ligne ajoutée dans le fichier global: {txt_path}")
except Exception as e:
logger.error(f"[SAUVEGARDE] Erreur lors de la sauvegarde OCR+TRAD pour {image_path}: {e}")
# Fonction pour effacer le cache de traduction (utile pour les tests)
def clear_translation_cache():
"""Vide le cache de traduction."""
global TRANSLATION_CACHE
TRANSLATION_CACHE = {}
logger.info("Cache de traduction vidé")