odoo_toolkit/ticket_manager.py
2025-03-19 16:39:03 +01:00

258 lines
12 KiB
Python

from odoo_connection import OdooConnection
import os
import json
from utils import print_error
from config import EXPORT_DIR
class TicketManager:
"""Gestionnaire de tickets simplifié avec seulement les fonctionnalités essentielles"""
def __init__(self):
"""Initialise le gestionnaire de tickets"""
self.conn = OdooConnection()
self.odoo = self.conn.get_odoo_instance()
self.model_name = "project.task"
def _ensure_connection(self):
"""Vérifie et établit la onnexion si nécessaire"""
if not self.odoo:
self.odoo = self.conn.get_odoo_instance()
return self.odoo is not None
def _safe_execute(self, model, method, *args):
"""
Exécute une méthode Odoo en toute sécurité via odoorpc.
Vérifie la connexion avant d'exécuter.
"""
# Vérifier que la connexion est bien établie
if not self._ensure_connection():
print_error("Connexion Odoo indisponible.")
return None
try:
# Exécuter la méthode sur le modèle via OdooRPC
return self.odoo.execute(model, method, *args) # type: ignore
except odoorpc.error.RPCError as e: # type: ignore
print_error(f" Erreur RPC lors de '{method}' sur '{model}': {e}")
return None
except Exception as e:
print_error(f" Erreur inattendue lors de '{method}' sur '{model}': {e}")
return None
def get_model_fields(self, model_name):
"""Récupére tous les champs disponibles pour un modèle donné, en filtrant ceux qui ne sont pas exploitables"""
fields_info = self._safe_execute(model_name, 'fields_get', [], ['name', 'type'])
if not fields_info:
print_error(f"Impossible de récupérer les champs pour {model_name}")
return []
#On filtre les champs qui ne sont pas exploitables
invalid_types = ['many2many']
valid_fields = [field for field, info in fields_info.items() if info.get("type") not in invalid_types]
return valid_fields #Retourne la liste des champs exploitables
def save_raw_ticket_data(self, ticket_data, filename="raw_ticket_data.json"):
"""Sauvegarde les données brutes du ticket dans un fichier JSON"""
file_path = os.path.join(EXPORT_DIR, filename)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(ticket_data, f, indent=4, ensure_ascii=False)
print(f"Données brutes du ticket sauvegardées dans : {file_path}")
def get_all_models(self):
"""Récupérer et sauvegarder tous les modèles disponibles"""
models = self._safe_execute('ir.model', 'search_read', [], ['model', 'name'] )
if not models:
print_error("Impossible de récupérer la liste des modèles.")
return None
#Convertir en dictionnaire {nom_du_modèle: description}
models_dict = {model['model']: model['name'] for model in models}
#Sauvegarder dans un fichier JSON
self.save_raw_ticket_data(models_dict, "all_models.json")
print(f"Liste des modèles sauvegardée dans 'available_models.json")
return models_dict
def get_model_fields_with_types(self, model_name):
"""Récupère et sauvegarde les champs d'un modèle avec leurs types"""
fields_info = self._safe_execute(model_name, 'fields_get', [], ['name', 'type', 'relation'])
if not fields_info:
print_error(f"Impossible de récupérer les champs pour {model_name}")
return {}
# Construire un dictionnaire {champ: {type, relation (si relationnel)}}
fields_dict = {
field: {
"type": info["type"],
"relation": info.get("relation", None)
}
for field, info in fields_info.items()
}
# Sauvegarde en JSON
self.save_raw_ticket_data(fields_dict, f"fields_{model_name}.json")
print(f"Liste des champs du modèle '{model_name}' sauvegardée dans 'fields_{model_name}.json'")
return fields_dict
def get_ticket_by_id(self, ticket_id):
"""Récupère les détails d'un ticket par son ID et exclut dynamiquement les champs invalides"""
fields_to_read = self.get_model_fields(self.model_name) # Récupère tous les champs
ticket_data = self._safe_execute(self.model_name, 'read', [ticket_id], fields_to_read)
if ticket_data:
ticket = self.resolve_relational_fields(ticket_data[0])
self.save_raw_ticket_data(ticket, f"ticket_{ticket_id}_raw.json")
return ticket
print_error(f"Aucun ticket trouvé avec l'ID {ticket_id}")
return None
def resolve_relational_fields(self, ticket):
"""Ajoute les valeurs des champs relationnels"""
fields_info = self._safe_execute(self.model_name, 'fields_get', [], ['type'])
for field, info in fields_info.items(): # type: ignore
if info.get("type") == "many2one" and isinstance(ticket.get(field), list):
ticket[f"{field}_value"] = ticket[field][1] if len(ticket[field]) > 1 else None #Ajoute la valeur lisible en plus du ID
return ticket
def get_ticket_by_code(self, ticket_code):
"""Récupérer un ticket via son code"""
domain = [('code', '=', ticket_code)]
ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1)
if not ticket_ids:
return None
# Sauvegarde des données brutes pour analyse
self.save_raw_ticket_data(ticket_ids, f"ticket_{ticket_code}_raw.json")
return self.get_ticket_by_id(ticket_ids[0])
def get_available_projects(self):
"""Retourne la liste des projets disponibles"""
projects = self._safe_execute('project.project', 'search_read', [], ['id', 'name'])
if not projects:
print_error("Aucun projet trouvé.")
return {}
return {proj['id']: proj['name'] for proj in projects}
def export_tickets_by_project_and_stage(self, project_id=None, stage_id=None):
"""
Exporte les tickets selon des critères de projet et d'étape
Args:
project_id: ID du projet à filtrer (optionnel)
stage_id: ID de l'étape à filtrer (optionnel)
"""
# Vérifier la connexion Odoo
if not self._ensure_connection():
print_error("Connexion Odoo indisponible.")
return
# Construire le domaine de recherche en fonction des paramètres
domain = []
if project_id:
domain.append(('project_id', '=', project_id))
if stage_id:
domain.append(('stage_id', '=', stage_id))
# Récupérer les IDs des tickets correspondant aux critères
ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1000)
if not ticket_ids:
filters = []
if project_id:
project_name = self._safe_execute('project.project', 'read', [project_id], ['name'])
project_name = project_name[0]['name'] if project_name else str(project_id)
filters.append(f"projet '{project_name}'")
if stage_id:
stage_name = self._safe_execute('project.task.type', 'read', [stage_id], ['name'])
stage_name = stage_name[0]['name'] if stage_name else str(stage_id)
filters.append(f"étape '{stage_name}'")
filter_text = " et ".join(filters) if filters else "critères spécifiés"
print_error(f"Aucun ticket trouvé pour les {filter_text}.")
return
# Récupérer les détails des tickets sans appeler get_ticket_by_id
# pour éviter la création de fichiers individuels
fields_to_read = self.get_model_fields(self.model_name)
all_tickets_data = self._safe_execute(self.model_name, 'read', ticket_ids, fields_to_read)
if not all_tickets_data:
print_error("Impossible de récupérer les détails des tickets.")
return
# Organisation des tickets par projet et par étape
tickets_by_project_stage = {}
for ticket_data in all_tickets_data:
# Résoudre les champs relationnels
ticket = self.resolve_relational_fields(ticket_data)
# Extraire les informations du projet
project_data = ticket.get('project_id', [0, "Sans projet"])
project_id_value = project_data[0] if isinstance(project_data, list) and len(project_data) > 0 else 0
project_name = project_data[1] if isinstance(project_data, list) and len(project_data) > 1 else "Sans projet"
# Extraire les informations de l'étape
stage_data = ticket.get('stage_id', [0, "Non classé"])
stage_id_value = stage_data[0] if isinstance(stage_data, list) and len(stage_data) > 0 else 0
stage_name = stage_data[1] if isinstance(stage_data, list) and len(stage_data) > 1 else "Non classé"
# Clés pour l'organisation des dossiers
project_key = f"{project_id_value}_{project_name}"
stage_key = f"{stage_id_value}_{stage_name}"
# Organiser la structure de données
if project_key not in tickets_by_project_stage:
tickets_by_project_stage[project_key] = {}
if stage_key not in tickets_by_project_stage[project_key]:
tickets_by_project_stage[project_key][stage_key] = {}
# Utiliser l'ID du ticket comme clé pour éviter les doublons
ticket_id = ticket.get('id', 0)
tickets_by_project_stage[project_key][stage_key][str(ticket_id)] = ticket
# Création des répertoires d'exportation
export_base_dir = os.path.join(EXPORT_DIR, "tickets_export")
os.makedirs(export_base_dir, exist_ok=True)
# Sauvegarder les tickets un par un dans des fichiers individuels
# organisés par répertoires de projet et d'étape
for project_key, stages in tickets_by_project_stage.items():
project_dir = os.path.join(export_base_dir, project_key)
os.makedirs(project_dir, exist_ok=True)
for stage_key, tickets in stages.items():
stage_dir = os.path.join(project_dir, stage_key)
os.makedirs(stage_dir, exist_ok=True)
# Créer un index de tous les tickets dans l'étape
tickets_index = {}
for ticket_id, ticket in tickets.items():
ticket_name = ticket.get('name', 'Sans nom')
tickets_index[ticket_id] = {
"name": ticket_name,
"code": ticket.get('code', ''),
"filename": f"ticket_{ticket_id}.json"
}
# Sauvegarder l'index des tickets
index_path = os.path.join(stage_dir, "index.json")
with open(index_path, "w", encoding="utf-8") as f:
json.dump(tickets_index, f, indent=4, ensure_ascii=False)
# Sauvegarder chaque ticket dans un fichier séparé
for ticket_id, ticket in tickets.items():
ticket_path = os.path.join(stage_dir, f"ticket_{ticket_id}.json")
with open(ticket_path, "w", encoding="utf-8") as f:
json.dump(ticket, f, indent=4, ensure_ascii=False)
print(f"Sauvegarde de {len(tickets)} tickets pour le projet '{project_key}', étape '{stage_key}'")
print(f"Exportation terminée. Les fichiers sont organisés dans : {export_base_dir}/")