mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-13 10:46:51 +01:00
314 lines
12 KiB
Python
314 lines
12 KiB
Python
# utils/translate_utils.py
|
|
|
|
from deep_translator import GoogleTranslator
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
import hashlib
|
|
from functools import lru_cache
|
|
|
|
logger = logging.getLogger("Translate")
|
|
|
|
# Cache global pour les traductions (conservé entre les appels)
|
|
TRANSLATION_CACHE: Dict[str, str] = {}
|
|
MAX_CACHE_SIZE = 1000 # Nombre maximal d'entrées dans le cache
|
|
|
|
def _get_cache_key(text: str, source: str, target: str) -> str:
|
|
"""
|
|
Génère une clé de cache unique pour une traduction donnée.
|
|
|
|
Args:
|
|
text: Texte à traduire
|
|
source: Langue source
|
|
target: Langue cible
|
|
|
|
Returns:
|
|
Clé de cache
|
|
"""
|
|
# Limiter la taille du texte pour la clé de cache
|
|
text_snippet = text[:500] if text else ""
|
|
hash_key = hashlib.md5(f"{text_snippet}|{source}|{target}".encode('utf-8')).hexdigest()
|
|
return hash_key
|
|
|
|
def _clean_cache_if_needed() -> None:
|
|
"""
|
|
Nettoie le cache si sa taille dépasse la limite maximale.
|
|
"""
|
|
global TRANSLATION_CACHE
|
|
if len(TRANSLATION_CACHE) > MAX_CACHE_SIZE:
|
|
# Garder seulement 75% des entrées les plus récentes (approximativement)
|
|
items = list(TRANSLATION_CACHE.items())
|
|
keep_count = int(MAX_CACHE_SIZE * 0.75)
|
|
TRANSLATION_CACHE = dict(items[-keep_count:])
|
|
logger.info(f"Cache de traduction nettoyé : {len(TRANSLATION_CACHE)} entrées conservées")
|
|
|
|
def translate_text(text: str, source: str, target: str, use_cache: bool = True) -> str:
|
|
"""
|
|
Fonction générique de traduction avec gestion de cache.
|
|
|
|
Args:
|
|
text: Texte à traduire
|
|
source: Langue source ('fr', 'en', etc.)
|
|
target: Langue cible ('fr', 'en', etc.)
|
|
use_cache: Si True, utilise le cache de traduction
|
|
|
|
Returns:
|
|
Texte traduit
|
|
"""
|
|
if not text or not text.strip():
|
|
logger.debug(f"[TRADUCTION] Texte vide, aucune traduction nécessaire {source}->{target}")
|
|
return ""
|
|
|
|
# Log de début de traduction
|
|
text_preview = text[:50] + "..." if len(text) > 50 else text
|
|
logger.debug(f"[TRADUCTION] Demande de traduction {source}->{target}: '{text_preview}'")
|
|
|
|
# Vérifier le cache
|
|
if use_cache:
|
|
cache_key = _get_cache_key(text, source, target)
|
|
if cache_key in TRANSLATION_CACHE:
|
|
logger.debug(f"[TRADUCTION] Récupération depuis le cache pour {source}->{target}")
|
|
return TRANSLATION_CACHE[cache_key]
|
|
|
|
# Limiter la taille du texte pour éviter les problèmes avec l'API
|
|
# Les longs textes sont découpés et traduits par morceaux
|
|
MAX_TEXT_LENGTH = 5000
|
|
if len(text) > MAX_TEXT_LENGTH:
|
|
logger.info(f"[TRADUCTION] Texte trop long ({len(text)} caractères), découpage en {len(text) // MAX_TEXT_LENGTH + 1} morceaux")
|
|
chunks = _split_text_into_chunks(text, MAX_TEXT_LENGTH)
|
|
translated_chunks = []
|
|
for i, chunk in enumerate(chunks):
|
|
logger.debug(f"[TRADUCTION] Traduction du morceau {i+1}/{len(chunks)}")
|
|
translated_chunk = translate_text(chunk, source, target, use_cache)
|
|
translated_chunks.append(translated_chunk)
|
|
result = ' '.join(translated_chunks)
|
|
logger.debug(f"[TRADUCTION] Tous les morceaux traduits et réassemblés : {len(result)} caractères")
|
|
else:
|
|
try:
|
|
logger.debug(f"[TRADUCTION] Appel de l'API de traduction {source}->{target} pour {len(text)} caractères")
|
|
translator = GoogleTranslator(source=source, target=target)
|
|
result = translator.translate(text)
|
|
logger.debug(f"[TRADUCTION] Traduction terminée {source}->{target}, résultat: {len(result)} caractères")
|
|
except Exception as e:
|
|
logger.error(f"[TRADUCTION] Échec de la traduction {source}->{target}: {e}")
|
|
return text # Retourner le texte original en cas d'erreur
|
|
|
|
# Mettre en cache
|
|
if use_cache:
|
|
cache_key = _get_cache_key(text, source, target)
|
|
TRANSLATION_CACHE[cache_key] = result
|
|
logger.debug(f"[TRADUCTION] Résultat mis en cache pour {source}->{target}")
|
|
_clean_cache_if_needed()
|
|
|
|
return result
|
|
|
|
def _split_text_into_chunks(text: str, max_length: int) -> list:
|
|
"""
|
|
Découpe un texte en morceaux plus petits en respectant les phrases.
|
|
|
|
Args:
|
|
text: Texte à découper
|
|
max_length: Longueur maximale de chaque morceau
|
|
|
|
Returns:
|
|
Liste des morceaux de texte
|
|
"""
|
|
chunks = []
|
|
current_chunk = ""
|
|
|
|
# Split by paragraphs
|
|
paragraphs = text.split('\n')
|
|
|
|
for paragraph in paragraphs:
|
|
# If paragraph is too long, split by sentences
|
|
if len(paragraph) > max_length:
|
|
sentences = paragraph.replace('. ', '.\n').replace('! ', '!\n').replace('? ', '?\n').split('\n')
|
|
for sentence in sentences:
|
|
if len(current_chunk) + len(sentence) + 1 <= max_length:
|
|
if current_chunk:
|
|
current_chunk += ' ' + sentence
|
|
else:
|
|
current_chunk = sentence
|
|
else:
|
|
chunks.append(current_chunk)
|
|
current_chunk = sentence
|
|
else:
|
|
if len(current_chunk) + len(paragraph) + 1 <= max_length:
|
|
if current_chunk:
|
|
current_chunk += '\n' + paragraph
|
|
else:
|
|
current_chunk = paragraph
|
|
else:
|
|
chunks.append(current_chunk)
|
|
current_chunk = paragraph
|
|
|
|
if current_chunk:
|
|
chunks.append(current_chunk)
|
|
|
|
return chunks
|
|
|
|
def fr_to_en(text: str) -> str:
|
|
"""
|
|
Traduit du français vers l'anglais.
|
|
|
|
Args:
|
|
text: Texte en français
|
|
|
|
Returns:
|
|
Texte traduit en anglais
|
|
"""
|
|
if not text:
|
|
return ""
|
|
|
|
logger.info(f"[TRADUCTION] FR → EN: Traduction de {len(text)} caractères")
|
|
result = translate_text(text, "fr", "en")
|
|
logger.info(f"[TRADUCTION] FR → EN: Résultat obtenu de {len(result)} caractères")
|
|
return result
|
|
|
|
def en_to_fr(text: str) -> str:
|
|
"""
|
|
Traduit de l'anglais vers le français.
|
|
|
|
Args:
|
|
text: Texte en anglais
|
|
|
|
Returns:
|
|
Texte traduit en français
|
|
"""
|
|
if not text:
|
|
return ""
|
|
|
|
logger.info(f"[TRADUCTION] EN → FR: Traduction de {len(text)} caractères")
|
|
result = translate_text(text, "en", "fr")
|
|
logger.info(f"[TRADUCTION] EN → FR: Résultat obtenu de {len(result)} caractères")
|
|
return result
|
|
|
|
def determiner_repertoire_ticket(ticket_id: str):
|
|
"""
|
|
Détermine dynamiquement le répertoire du ticket.
|
|
|
|
Args:
|
|
ticket_id: str, le code du ticket
|
|
|
|
Returns:
|
|
str, le chemin du répertoire pour ce ticket ou None si non trouvé
|
|
"""
|
|
# Base de recherche des tickets
|
|
output_dir = "output"
|
|
|
|
# Format attendu du répertoire de ticket
|
|
ticket_dir = f"ticket_{ticket_id}"
|
|
ticket_path = os.path.join(output_dir, ticket_dir)
|
|
|
|
if not os.path.exists(ticket_path):
|
|
print(f"Répertoire de ticket non trouvé: {ticket_path}")
|
|
return None
|
|
|
|
# Trouver la dernière extraction (par date)
|
|
extractions = []
|
|
for extraction in os.listdir(ticket_path):
|
|
extraction_path = os.path.join(ticket_path, extraction)
|
|
if os.path.isdir(extraction_path) and extraction.startswith(ticket_id):
|
|
extractions.append(extraction_path)
|
|
|
|
if not extractions:
|
|
print(f"Aucune extraction trouvée pour le ticket {ticket_id}")
|
|
return None
|
|
|
|
# Trier par date de modification (plus récente en premier)
|
|
extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True)
|
|
|
|
# Retourner le chemin de la dernière extraction
|
|
return extractions[0]
|
|
|
|
def sauvegarder_ocr_traduction(
|
|
image_path: str,
|
|
ticket_id: str,
|
|
ocr_fr: str,
|
|
ocr_en: str,
|
|
ocr_en_back_fr: str = "",
|
|
base_dir: Optional[str] = None # Utiliser Optional[str]
|
|
) -> None:
|
|
"""
|
|
Sauvegarde les résultats OCR + TRAD en JSON (par image) et
|
|
ajoute une ligne dans un fichier texte global (append sécurisé).
|
|
Utilise le répertoire de sortie output/ticket_X/X_YYYYMMDD_HHMMSS/X_rapports/pipeline
|
|
pour la sauvegarde des données.
|
|
"""
|
|
try:
|
|
image_name = os.path.basename(image_path)
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
logger.info(f"[SAUVEGARDE] Sauvegarde des résultats OCR et traduction pour {image_name}")
|
|
logger.info(f"[SAUVEGARDE] Contenus: OCR FR={len(ocr_fr)} caractères, OCR EN={len(ocr_en)} caractères, OCR EN→FR={len(ocr_en_back_fr)} caractères")
|
|
|
|
# Déterminer le répertoire de sortie basé sur le ticket_id
|
|
if not base_dir:
|
|
# Utiliser le répertoire de sortie approprié dans output
|
|
extraction_dir = determiner_repertoire_ticket(ticket_id)
|
|
if not extraction_dir:
|
|
# Fallback vers reports si impossible de trouver le répertoire
|
|
base_dir = "reports"
|
|
rapport_dir = os.path.join(base_dir, ticket_id, "pipeline", "ocr_traduction")
|
|
logger.info(f"[SAUVEGARDE] Répertoire de ticket non trouvé, utilisation de {rapport_dir}")
|
|
else:
|
|
# Utiliser le répertoire rapports du ticket
|
|
rapports_dir = os.path.join(extraction_dir, f"{ticket_id}_rapports")
|
|
rapport_dir = os.path.join(rapports_dir, "pipeline", "ocr_traduction")
|
|
logger.info(f"[SAUVEGARDE] Utilisation du répertoire de rapports: {rapport_dir}")
|
|
else:
|
|
# Utiliser directement le répertoire pipeline existant et y ajouter ocr_traduction
|
|
if os.path.basename(base_dir) == f"{ticket_id}_rapports":
|
|
# Si base_dir est déjà le répertoire des rapports
|
|
rapport_dir = os.path.join(base_dir, "pipeline", "ocr_traduction")
|
|
else:
|
|
# Sinon, utiliser tel quel et ajouter ocr_traduction
|
|
rapport_dir = os.path.join(base_dir, "pipeline", "ocr_traduction")
|
|
logger.info(f"[SAUVEGARDE] Utilisation du répertoire fourni: {rapport_dir}")
|
|
|
|
os.makedirs(rapport_dir, exist_ok=True)
|
|
|
|
# Sauvegarde JSON (1 par image, réécrit à chaque passage)
|
|
json_path = os.path.join(rapport_dir, f"{image_name}.json")
|
|
result = {
|
|
"image_name": image_name,
|
|
"ocr_fr": ocr_fr,
|
|
"translation_en": ocr_en,
|
|
"translation_en_back_fr": ocr_en_back_fr,
|
|
"metadata": {
|
|
"ticket_id": ticket_id,
|
|
"timestamp": timestamp,
|
|
"source_module": "ocr_utils + translate_utils",
|
|
"lang_detected": "fr"
|
|
}
|
|
}
|
|
|
|
with open(json_path, "w", encoding="utf-8") as f:
|
|
json.dump(result, f, ensure_ascii=False, indent=2)
|
|
logger.info(f"[SAUVEGARDE] Fichier JSON enregistré: {json_path}")
|
|
|
|
# Append TXT global sécurisé (évite l'écrasement)
|
|
txt_path = os.path.join(rapport_dir, "ocr_traduction.txt")
|
|
ligne = (
|
|
f"{image_name}\n"
|
|
f"[FR] {ocr_fr or '_'}\n"
|
|
f"[EN] {ocr_en or '_'}\n"
|
|
f"[EN→FR] {ocr_en_back_fr or '_'}\n\n"
|
|
)
|
|
with open(txt_path, "a", encoding="utf-8") as f:
|
|
f.write(ligne)
|
|
|
|
logger.info(f"[SAUVEGARDE] Ligne ajoutée dans le fichier global: {txt_path}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"[SAUVEGARDE] Erreur lors de la sauvegarde OCR+TRAD pour {image_path}: {e}")
|
|
|
|
# Fonction pour effacer le cache de traduction (utile pour les tests)
|
|
def clear_translation_cache():
|
|
"""Vide le cache de traduction."""
|
|
global TRANSLATION_CACHE
|
|
TRANSLATION_CACHE = {}
|
|
logger.info("Cache de traduction vidé")
|