# utils/translate_utils.py from deep_translator import GoogleTranslator import json import os from datetime import datetime import logging from typing import Optional, Dict, Any import hashlib from functools import lru_cache logger = logging.getLogger("Translate") # Cache global pour les traductions (conservé entre les appels) TRANSLATION_CACHE: Dict[str, str] = {} MAX_CACHE_SIZE = 1000 # Nombre maximal d'entrées dans le cache def _get_cache_key(text: str, source: str, target: str) -> str: """ Génère une clé de cache unique pour une traduction donnée. Args: text: Texte à traduire source: Langue source target: Langue cible Returns: Clé de cache """ # Limiter la taille du texte pour la clé de cache text_snippet = text[:500] if text else "" hash_key = hashlib.md5(f"{text_snippet}|{source}|{target}".encode('utf-8')).hexdigest() return hash_key def _clean_cache_if_needed() -> None: """ Nettoie le cache si sa taille dépasse la limite maximale. """ global TRANSLATION_CACHE if len(TRANSLATION_CACHE) > MAX_CACHE_SIZE: # Garder seulement 75% des entrées les plus récentes (approximativement) items = list(TRANSLATION_CACHE.items()) keep_count = int(MAX_CACHE_SIZE * 0.75) TRANSLATION_CACHE = dict(items[-keep_count:]) logger.info(f"Cache de traduction nettoyé : {len(TRANSLATION_CACHE)} entrées conservées") def translate_text(text: str, source: str, target: str, use_cache: bool = True) -> str: """ Fonction générique de traduction avec gestion de cache. Args: text: Texte à traduire source: Langue source ('fr', 'en', etc.) target: Langue cible ('fr', 'en', etc.) use_cache: Si True, utilise le cache de traduction Returns: Texte traduit """ if not text or not text.strip(): logger.debug(f"[TRADUCTION] Texte vide, aucune traduction nécessaire {source}->{target}") return "" # Log de début de traduction text_preview = text[:50] + "..." if len(text) > 50 else text logger.debug(f"[TRADUCTION] Demande de traduction {source}->{target}: '{text_preview}'") # Vérifier le cache if use_cache: cache_key = _get_cache_key(text, source, target) if cache_key in TRANSLATION_CACHE: logger.debug(f"[TRADUCTION] Récupération depuis le cache pour {source}->{target}") return TRANSLATION_CACHE[cache_key] # Limiter la taille du texte pour éviter les problèmes avec l'API # Les longs textes sont découpés et traduits par morceaux MAX_TEXT_LENGTH = 5000 if len(text) > MAX_TEXT_LENGTH: logger.info(f"[TRADUCTION] Texte trop long ({len(text)} caractères), découpage en {len(text) // MAX_TEXT_LENGTH + 1} morceaux") chunks = _split_text_into_chunks(text, MAX_TEXT_LENGTH) translated_chunks = [] for i, chunk in enumerate(chunks): logger.debug(f"[TRADUCTION] Traduction du morceau {i+1}/{len(chunks)}") translated_chunk = translate_text(chunk, source, target, use_cache) translated_chunks.append(translated_chunk) result = ' '.join(translated_chunks) logger.debug(f"[TRADUCTION] Tous les morceaux traduits et réassemblés : {len(result)} caractères") else: try: logger.debug(f"[TRADUCTION] Appel de l'API de traduction {source}->{target} pour {len(text)} caractères") translator = GoogleTranslator(source=source, target=target) result = translator.translate(text) logger.debug(f"[TRADUCTION] Traduction terminée {source}->{target}, résultat: {len(result)} caractères") except Exception as e: logger.error(f"[TRADUCTION] Échec de la traduction {source}->{target}: {e}") return text # Retourner le texte original en cas d'erreur # Mettre en cache if use_cache: cache_key = _get_cache_key(text, source, target) TRANSLATION_CACHE[cache_key] = result logger.debug(f"[TRADUCTION] Résultat mis en cache pour {source}->{target}") _clean_cache_if_needed() return result def _split_text_into_chunks(text: str, max_length: int) -> list: """ Découpe un texte en morceaux plus petits en respectant les phrases. Args: text: Texte à découper max_length: Longueur maximale de chaque morceau Returns: Liste des morceaux de texte """ chunks = [] current_chunk = "" # Split by paragraphs paragraphs = text.split('\n') for paragraph in paragraphs: # If paragraph is too long, split by sentences if len(paragraph) > max_length: sentences = paragraph.replace('. ', '.\n').replace('! ', '!\n').replace('? ', '?\n').split('\n') for sentence in sentences: if len(current_chunk) + len(sentence) + 1 <= max_length: if current_chunk: current_chunk += ' ' + sentence else: current_chunk = sentence else: chunks.append(current_chunk) current_chunk = sentence else: if len(current_chunk) + len(paragraph) + 1 <= max_length: if current_chunk: current_chunk += '\n' + paragraph else: current_chunk = paragraph else: chunks.append(current_chunk) current_chunk = paragraph if current_chunk: chunks.append(current_chunk) return chunks def fr_to_en(text: str) -> str: """ Traduit du français vers l'anglais. Args: text: Texte en français Returns: Texte traduit en anglais """ if not text: return "" logger.info(f"[TRADUCTION] FR → EN: Traduction de {len(text)} caractères") result = translate_text(text, "fr", "en") logger.info(f"[TRADUCTION] FR → EN: Résultat obtenu de {len(result)} caractères") return result def en_to_fr(text: str) -> str: """ Traduit de l'anglais vers le français. Args: text: Texte en anglais Returns: Texte traduit en français """ if not text: return "" logger.info(f"[TRADUCTION] EN → FR: Traduction de {len(text)} caractères") result = translate_text(text, "en", "fr") logger.info(f"[TRADUCTION] EN → FR: Résultat obtenu de {len(result)} caractères") return result def determiner_repertoire_ticket(ticket_id: str): """ Détermine dynamiquement le répertoire du ticket. Args: ticket_id: str, le code du ticket Returns: str, le chemin du répertoire pour ce ticket ou None si non trouvé """ # Base de recherche des tickets output_dir = "output" # Format attendu du répertoire de ticket ticket_dir = f"ticket_{ticket_id}" ticket_path = os.path.join(output_dir, ticket_dir) if not os.path.exists(ticket_path): print(f"Répertoire de ticket non trouvé: {ticket_path}") return None # Trouver la dernière extraction (par date) extractions = [] for extraction in os.listdir(ticket_path): extraction_path = os.path.join(ticket_path, extraction) if os.path.isdir(extraction_path) and extraction.startswith(ticket_id): extractions.append(extraction_path) if not extractions: print(f"Aucune extraction trouvée pour le ticket {ticket_id}") return None # Trier par date de modification (plus récente en premier) extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True) # Retourner le chemin de la dernière extraction return extractions[0] def sauvegarder_ocr_traduction( image_path: str, ticket_id: str, ocr_fr: str, ocr_en: str, ocr_en_back_fr: str = "", base_dir: Optional[str] = None # Utiliser Optional[str] ) -> None: """ Sauvegarde les résultats OCR + TRAD en JSON (par image) et ajoute une ligne dans un fichier texte global (append sécurisé). Utilise le répertoire de sortie output/ticket_X/X_YYYYMMDD_HHMMSS/X_rapports/pipeline pour la sauvegarde des données. """ try: image_name = os.path.basename(image_path) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") logger.info(f"[SAUVEGARDE] Sauvegarde des résultats OCR et traduction pour {image_name}") logger.info(f"[SAUVEGARDE] Contenus: OCR FR={len(ocr_fr)} caractères, OCR EN={len(ocr_en)} caractères, OCR EN→FR={len(ocr_en_back_fr)} caractères") # Déterminer le répertoire de sortie basé sur le ticket_id if not base_dir: # Utiliser le répertoire de sortie approprié dans output extraction_dir = determiner_repertoire_ticket(ticket_id) if not extraction_dir: # Fallback vers reports si impossible de trouver le répertoire base_dir = "reports" rapport_dir = os.path.join(base_dir, ticket_id, "pipeline", "ocr_traduction") logger.info(f"[SAUVEGARDE] Répertoire de ticket non trouvé, utilisation de {rapport_dir}") else: # Utiliser le répertoire rapports du ticket rapports_dir = os.path.join(extraction_dir, f"{ticket_id}_rapports") rapport_dir = os.path.join(rapports_dir, "pipeline", "ocr_traduction") logger.info(f"[SAUVEGARDE] Utilisation du répertoire de rapports: {rapport_dir}") else: # Utiliser directement le répertoire pipeline existant et y ajouter ocr_traduction if os.path.basename(base_dir) == f"{ticket_id}_rapports": # Si base_dir est déjà le répertoire des rapports rapport_dir = os.path.join(base_dir, "pipeline", "ocr_traduction") else: # Sinon, utiliser tel quel et ajouter ocr_traduction rapport_dir = os.path.join(base_dir, "pipeline", "ocr_traduction") logger.info(f"[SAUVEGARDE] Utilisation du répertoire fourni: {rapport_dir}") os.makedirs(rapport_dir, exist_ok=True) # Sauvegarde JSON (1 par image, réécrit à chaque passage) json_path = os.path.join(rapport_dir, f"{image_name}.json") result = { "image_name": image_name, "ocr_fr": ocr_fr, "translation_en": ocr_en, "translation_en_back_fr": ocr_en_back_fr, "metadata": { "ticket_id": ticket_id, "timestamp": timestamp, "source_module": "ocr_utils + translate_utils", "lang_detected": "fr" } } with open(json_path, "w", encoding="utf-8") as f: json.dump(result, f, ensure_ascii=False, indent=2) logger.info(f"[SAUVEGARDE] Fichier JSON enregistré: {json_path}") # Append TXT global sécurisé (évite l'écrasement) txt_path = os.path.join(rapport_dir, "ocr_traduction.txt") ligne = ( f"{image_name}\n" f"[FR] {ocr_fr or '_'}\n" f"[EN] {ocr_en or '_'}\n" f"[EN→FR] {ocr_en_back_fr or '_'}\n\n" ) with open(txt_path, "a", encoding="utf-8") as f: f.write(ligne) logger.info(f"[SAUVEGARDE] Ligne ajoutée dans le fichier global: {txt_path}") except Exception as e: logger.error(f"[SAUVEGARDE] Erreur lors de la sauvegarde OCR+TRAD pour {image_path}: {e}") # Fonction pour effacer le cache de traduction (utile pour les tests) def clear_translation_cache(): """Vide le cache de traduction.""" global TRANSLATION_CACHE TRANSLATION_CACHE = {} logger.info("Cache de traduction vidé")