llm_ticket3/odoo/batch_ticket_manager.py
2025-04-09 16:13:20 +02:00

327 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Module de gestion des lots de tickets Odoo.
Permet l'extraction par lots de tickets selon différents critères.
"""
import os
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple, Set, Union
from .ticket_manager import TicketManager
from core.utils import save_json, ensure_dir
class BatchTicketManager:
"""
Gestionnaire pour l'extraction par lots de tickets Odoo.
Étend les fonctionnalités du TicketManager standard.
"""
def __init__(self, ticket_manager: TicketManager):
"""
Initialise le gestionnaire de lots avec un TicketManager existant.
Args:
ticket_manager: Gestionnaire de tickets déjà initialisé
"""
self.ticket_manager = ticket_manager
def search_tickets(self,
domain: Optional[List] = None,
limit: int = 50,
offset: int = 0,
order: str = "create_date desc") -> List[Dict[str, Any]]:
"""
Recherche des tickets selon des critères spécifiques.
Args:
domain: Domaine de recherche au format Odoo
limit: Nombre maximum de tickets à retourner
offset: Nombre de tickets à ignorer (pour la pagination)
order: Champ et direction de tri
Returns:
Liste de tickets correspondant aux critères
"""
domain = domain or []
params = {
"model": self.ticket_manager.model_name,
"method": "search_read",
"args": [domain,
["id", "name", "description", "stage_id", "project_id", "partner_id",
"user_id", "date_start", "date_end", "date_deadline", "create_date", "write_date",
"tag_ids", "priority", "email_from", "email_cc", "code"]],
"kwargs": {"limit": limit, "offset": offset, "order": order}
}
result = self.ticket_manager.auth_manager._rpc_call("/web/dataset/call_kw", params)
# Résoudre les champs relationnels pour chaque ticket
if isinstance(result, list):
return [self.ticket_manager.resolve_relation_fields(ticket) for ticket in result]
else:
logging.error(f"Erreur lors de la recherche des tickets: {result}")
return []
def get_projects(self) -> List[Dict[str, Any]]:
"""
Récupère la liste des projets disponibles.
Returns:
Liste des projets avec leur ID et nom
"""
params = {
"model": "project.project",
"method": "search_read",
"args": [[], ["id", "name", "description", "active"]],
"kwargs": {"order": "name"}
}
result = self.ticket_manager.auth_manager._rpc_call("/web/dataset/call_kw", params)
if isinstance(result, list):
return result
else:
logging.error(f"Erreur lors de la récupération des projets: {result}")
return []
def get_stages(self) -> List[Dict[str, Any]]:
"""
Récupère la liste des étapes/statuts disponibles.
Returns:
Liste des étapes avec leur ID et nom
"""
params = {
"model": "project.task.type",
"method": "search_read",
"args": [[], ["id", "name", "description", "sequence"]],
"kwargs": {"order": "sequence"}
}
result = self.ticket_manager.auth_manager._rpc_call("/web/dataset/call_kw", params)
if isinstance(result, list):
return result
else:
logging.error(f"Erreur lors de la récupération des étapes: {result}")
return []
def get_ticket_count(self, domain: Optional[List] = None) -> int:
"""
Compte le nombre de tickets correspondant aux critères.
Args:
domain: Domaine de recherche au format Odoo
Returns:
Nombre de tickets
"""
domain = domain or []
params = {
"model": self.ticket_manager.model_name,
"method": "search_count",
"args": [domain],
"kwargs": {}
}
result = self.ticket_manager.auth_manager._rpc_call("/web/dataset/call_kw", params)
if isinstance(result, int):
return result
else:
logging.error(f"Erreur lors du comptage des tickets: {result}")
return 0
def load_existing_tickets(self, base_dir: str) -> Set[str]:
"""
Charge la liste des tickets déjà extraits.
Args:
base_dir: Répertoire de base où chercher le fichier
Returns:
Ensemble des codes de tickets déjà extraits
"""
existing_tickets_file = os.path.join(base_dir, "extracted_tickets.json")
existing_tickets = set()
if os.path.exists(existing_tickets_file):
try:
with open(existing_tickets_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
existing_tickets = set(existing_data.get("ticket_codes", []))
logging.info(f"Chargé {len(existing_tickets)} tickets déjà extraits")
except Exception as e:
logging.error(f"Erreur lors de la lecture des tickets existants: {e}")
return existing_tickets
def save_existing_tickets(self, base_dir: str, tickets: Set[str]) -> bool:
"""
Sauvegarde la liste des tickets extraits.
Args:
base_dir: Répertoire de base où sauvegarder le fichier
tickets: Ensemble des codes de tickets extraits
Returns:
True si la sauvegarde a réussi, False sinon
"""
existing_tickets_file = os.path.join(base_dir, "extracted_tickets.json")
try:
ensure_dir(base_dir)
with open(existing_tickets_file, 'w', encoding='utf-8') as f:
json.dump({"ticket_codes": list(tickets)}, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
logging.error(f"Erreur lors de l'enregistrement des tickets extraits: {e}")
return False
def extract_tickets_batch(self,
domain: Optional[List] = None,
limit: int = 50,
offset: int = 0,
base_output_dir: str = "output",
skip_existing: bool = True) -> Dict[str, Any]:
"""
Extrait plusieurs tickets selon des critères spécifiques.
Args:
domain: Domaine de recherche au format Odoo
limit: Nombre maximum de tickets à traiter
offset: Nombre de tickets à ignorer
base_output_dir: Répertoire de sortie de base
skip_existing: Ignorer les tickets déjà extraits
Returns:
Dictionnaire contenant les résultats de l'extraction
"""
# Rechercher les tickets
domain = domain or []
tickets = self.search_tickets(domain, limit, offset)
if not tickets:
logging.warning("Aucun ticket ne correspond aux critères de recherche.")
return {"status": "error", "message": "Aucun ticket trouvé", "processed": 0, "skipped": 0}
# Créer le répertoire principal de sortie
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
batch_dir = os.path.join(base_output_dir, f"batch_{timestamp}")
ensure_dir(batch_dir)
# Charger les tickets déjà extraits
existing_tickets = set()
if skip_existing:
existing_tickets = self.load_existing_tickets(base_output_dir)
# Traiter chaque ticket
processed_tickets = []
skipped_tickets = []
failed_tickets = []
for ticket in tickets:
ticket_code = ticket.get("code")
if not ticket_code:
logging.warning(f"Ticket sans code ignoré: {ticket.get('id')}")
failed_tickets.append({"id": ticket.get("id"), "reason": "missing_code"})
continue
if skip_existing and ticket_code in existing_tickets:
logging.info(f"Ticket {ticket_code} déjà extrait, ignoré")
skipped_tickets.append({
"code": ticket_code,
"id": ticket.get("id"),
"name": ticket.get("name"),
"reason": "already_extracted"
})
continue
logging.info(f"Traitement du ticket {ticket_code}...")
# Créer un sous-répertoire pour ce ticket
ticket_dir = os.path.join(batch_dir, f"ticket_{ticket_code}")
try:
# Extraire les données
result = self.ticket_manager.extract_ticket_data(ticket_code, ticket_dir)
if result:
processed_tickets.append({
"code": ticket_code,
"id": ticket.get("id"),
"name": ticket.get("name"),
"dir": ticket_dir,
"messages_count": result.get("messages_count", 0),
"attachments_count": result.get("attachments_count", 0)
})
# Ajouter à la liste des tickets extraits
existing_tickets.add(ticket_code)
logging.info(f"Ticket {ticket_code} extrait avec succès")
else:
logging.error(f"Échec de l'extraction du ticket {ticket_code}")
failed_tickets.append({
"code": ticket_code,
"id": ticket.get("id"),
"name": ticket.get("name"),
"reason": "extraction_failed"
})
except Exception as e:
logging.exception(f"Erreur lors du traitement du ticket {ticket_code}: {e}")
failed_tickets.append({
"code": ticket_code,
"id": ticket.get("id"),
"name": ticket.get("name"),
"reason": f"exception: {str(e)}"
})
# Enregistrer la liste mise à jour des tickets extraits
if skip_existing:
self.save_existing_tickets(base_output_dir, existing_tickets)
# Créer un fichier de résumé du batch
batch_summary = {
"timestamp": timestamp,
"batch_dir": batch_dir,
"search_criteria": {
"domain": domain,
"limit": limit,
"offset": offset
},
"processed_tickets": processed_tickets,
"skipped_tickets": skipped_tickets,
"failed_tickets": failed_tickets,
"stats": {
"total_found": len(tickets),
"processed": len(processed_tickets),
"skipped": len(skipped_tickets),
"failed": len(failed_tickets)
}
}
batch_summary_path = os.path.join(batch_dir, "batch_summary.json")
save_json(batch_summary, batch_summary_path)
logging.info(f"Extraction par lot terminée avec succès.")
logging.info(f"Tickets traités: {len(processed_tickets)}")
logging.info(f"Tickets ignorés: {len(skipped_tickets)}")
logging.info(f"Tickets en échec: {len(failed_tickets)}")
logging.info(f"Répertoire de sortie: {batch_dir}")
return {
"status": "success",
"batch_dir": batch_dir,
"processed": len(processed_tickets),
"skipped": len(skipped_tickets),
"failed": len(failed_tickets),
"summary_file": batch_summary_path,
"processed_tickets": processed_tickets,
"skipped_tickets": skipped_tickets,
"failed_tickets": failed_tickets
}