mirror of
https://github.com/Ladebeze66/projetcbaollm.git
synced 2025-12-16 20:47:52 +01:00
429 lines
18 KiB
Python
429 lines
18 KiB
Python
from odoo_connection import OdooConnection
|
|
import os
|
|
import json
|
|
from utils import save_json, ensure_export_directory
|
|
from config import EXPORT_DIR
|
|
from data_filter import filter_ticket_data
|
|
|
|
class TicketManager:
|
|
"""Gestionnaire de tickets simplifié avec seulement les fonctionnalités essentielles"""
|
|
|
|
def __init__(self):
|
|
"""Initialise le gestionnaire de tickets"""
|
|
self.conn = OdooConnection()
|
|
self.odoo = self.conn.get_odoo_instance()
|
|
self.model_name = "project.task"
|
|
|
|
def _check_connection(self):
|
|
"""Vérifie que la connexion Odoo est active"""
|
|
if self.odoo is None:
|
|
print("Erreur: La connexion à Odoo n'est pas établie.")
|
|
print("Tentative de reconnexion...")
|
|
try:
|
|
self.conn = OdooConnection()
|
|
self.odoo = self.conn.get_odoo_instance()
|
|
except Exception as e:
|
|
print(f"Erreur lors de la tentative de reconnexion: {e}")
|
|
self.odoo = None
|
|
|
|
return self.odoo is not None
|
|
|
|
def _safe_execute(self, model, method, *args):
|
|
"""Exécute une méthode Odoo de manière sécurisée"""
|
|
if not self._check_connection():
|
|
return None
|
|
|
|
try:
|
|
if self.odoo is None:
|
|
print("Erreur: Impossible d'exécuter la commande, la connexion à Odoo est inactive.")
|
|
return None
|
|
|
|
return self.odoo.execute(model, method, *args)
|
|
except Exception as e:
|
|
print(f"Erreur lors de l'exécution de {method} sur {model}: {e}")
|
|
return None
|
|
|
|
def list_models(self):
|
|
"""Affiche la liste des modèles disponibles dans Odoo"""
|
|
models = self._safe_execute('ir.model', 'search_read', [], ['model', 'name'])
|
|
if not models:
|
|
print("Aucun modèle disponible.")
|
|
return []
|
|
|
|
print("\nListe des modèles disponibles:")
|
|
for model in models:
|
|
print(f"Modèle: {model['name']} (ID: {model['model']})")
|
|
return models
|
|
|
|
def list_model_fields(self, model_name):
|
|
"""Affiche les champs d'un modèle donné"""
|
|
fields_info = self._safe_execute(model_name, 'fields_get')
|
|
if not fields_info:
|
|
print(f"Aucun champ trouvé pour le modèle {model_name}.")
|
|
return []
|
|
|
|
print(f"\nChamps du modèle {model_name}:")
|
|
for field_name, field_data in fields_info.items():
|
|
print(f"Champ: {field_name} - Type: {field_data['type']}")
|
|
return fields_info
|
|
|
|
def export_model_fields_to_json(self, model_name, filename):
|
|
"""Exporte les informations des champs d'un modèle en JSON"""
|
|
fields_info = self.list_model_fields(model_name)
|
|
if not fields_info:
|
|
return False
|
|
|
|
file_path = os.path.join(EXPORT_DIR, f"{filename}.json")
|
|
try:
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(fields_info, f, indent=4, ensure_ascii=False)
|
|
print(f"Informations des champs exportées dans {file_path}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Erreur lors de l'exportation des champs: {str(e)}")
|
|
return False
|
|
|
|
def get_ticket_by_id(self, ticket_id):
|
|
"""Récupère un ticket par son ID"""
|
|
all_fields = self._safe_execute(self.model_name, 'fields_get')
|
|
if not all_fields:
|
|
return None
|
|
|
|
field_names = list(all_fields.keys())
|
|
|
|
ticket_data = self._safe_execute(self.model_name, 'read', [ticket_id], field_names)
|
|
if not ticket_data:
|
|
return None
|
|
|
|
ticket = ticket_data[0]
|
|
formatted_ticket = {
|
|
"ID du Ticket": ticket["id"],
|
|
"Nom": ticket["name"],
|
|
"Code": ticket.get("code", "N/A"),
|
|
"Date Limite": ticket.get("date_deadline", "Non défini"),
|
|
"Champs Simples": {},
|
|
"Champs Relationnels": {},
|
|
"Discussions": []
|
|
}
|
|
|
|
# Classer les champs par type
|
|
for field_name, field_info in all_fields.items():
|
|
field_type = field_info.get('type')
|
|
if field_type in ["many2one", "one2many", "many2many"]:
|
|
if field_name in ticket:
|
|
formatted_ticket["Champs Relationnels"][field_name] = ticket[field_name]
|
|
else:
|
|
if field_name in ticket:
|
|
formatted_ticket["Champs Simples"][field_name] = ticket[field_name]
|
|
|
|
# Récupérer les discussions
|
|
if "message_ids" in ticket and ticket["message_ids"]:
|
|
formatted_ticket["Discussions"] = self.get_ticket_discussions(ticket["message_ids"])
|
|
|
|
return formatted_ticket
|
|
|
|
def get_ticket_discussions(self, message_ids):
|
|
"""Récupère et nettoie les discussions associées à un ticket"""
|
|
model_name = "mail.message"
|
|
|
|
if not message_ids:
|
|
return []
|
|
|
|
messages = self._safe_execute(model_name, 'read', message_ids, ["id", "subject", "body", "author_id", "date"])
|
|
if messages is None:
|
|
return []
|
|
|
|
formatted_messages = []
|
|
for msg in messages:
|
|
# Utiliser la fonction clean_html de data_filter
|
|
from data_filter import clean_html
|
|
cleaned_content = clean_html(msg["body"])
|
|
|
|
if cleaned_content:
|
|
formatted_messages.append({
|
|
"ID Message": msg["id"],
|
|
"Sujet": msg["subject"] or "Sans sujet",
|
|
"Contenu": cleaned_content,
|
|
"Auteur": msg["author_id"][1] if msg["author_id"] else "Inconnu",
|
|
"Date": msg["date"]
|
|
})
|
|
|
|
return formatted_messages
|
|
|
|
def search_tickets_by_fields(self, field_criteria, limit=50):
|
|
"""Recherche des tickets selon des critères de champs"""
|
|
# Construire le domaine de recherche
|
|
domain = []
|
|
for field, value in field_criteria.items():
|
|
domain.append((field, '=', value))
|
|
|
|
print(f"\nRecherche de tickets avec les critères: {field_criteria}")
|
|
|
|
# Rechercher les tickets correspondants
|
|
ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, limit)
|
|
if not ticket_ids:
|
|
print("Aucun ticket ne correspond aux critères de recherche.")
|
|
return []
|
|
|
|
print(f"Nombre de tickets trouvés: {len(ticket_ids)}")
|
|
|
|
# Récupérer tous les tickets trouvés
|
|
tickets = []
|
|
for ticket_id in ticket_ids:
|
|
ticket = self.get_ticket_by_id(ticket_id)
|
|
if ticket:
|
|
tickets.append(ticket)
|
|
|
|
return tickets
|
|
|
|
def export_tickets_by_project_and_stage(self, project_id, selected_stage_ids=None):
|
|
"""Exporte les tickets d'un projet classés par étape
|
|
|
|
Args:
|
|
project_id (int): L'ID du projet
|
|
selected_stage_ids (list, optional): Liste des IDs d'étapes à inclure. Si None, toutes les étapes sont incluses.
|
|
"""
|
|
try:
|
|
# Convertir project_id en entier
|
|
project_id = int(project_id)
|
|
|
|
# Vérifier si le projet existe
|
|
projects = self._safe_execute('project.project', 'search_read', [('id', '=', project_id)], ['id', 'name'])
|
|
if not projects:
|
|
print(f"Aucun projet trouvé avec l'ID: {project_id}")
|
|
return
|
|
|
|
project_name = projects[0]['name']
|
|
print(f"\nRécupération des tickets du projet: {project_name} (ID: {project_id})")
|
|
|
|
# Construire le domaine de recherche
|
|
domain = [('project_id', '=', project_id)]
|
|
|
|
# Si des étapes spécifiques sont sélectionnées, ajouter à la condition
|
|
if selected_stage_ids and len(selected_stage_ids) > 0:
|
|
domain.append(('stage_id', 'in', selected_stage_ids))
|
|
print(f"Filtrage sur {len(selected_stage_ids)} étapes sélectionnées")
|
|
|
|
ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1000)
|
|
|
|
if not ticket_ids:
|
|
print(f"Aucun ticket trouvé avec les critères spécifiés.")
|
|
return
|
|
|
|
# Récupérer les tickets avec les champs essentiels
|
|
fields_to_read = ['id', 'name', 'code', 'stage_id', 'date_deadline',
|
|
'description', 'priority', 'sequence', 'message_ids']
|
|
|
|
tickets_data = self._safe_execute(self.model_name, 'read', ticket_ids, fields_to_read)
|
|
|
|
if not tickets_data:
|
|
print("Erreur lors de la récupération des données des tickets.")
|
|
return
|
|
|
|
print(f"Nombre de tickets trouvés: {len(tickets_data)}")
|
|
|
|
# Convertir les tickets au format approprié et les filtrer
|
|
formatted_tickets = []
|
|
for ticket in tickets_data:
|
|
formatted_ticket = {
|
|
"ID du Ticket": ticket["id"],
|
|
"Nom": ticket["name"],
|
|
"Code": ticket.get("code", "N/A"),
|
|
"Date Limite": ticket.get("date_deadline", "Non défini"),
|
|
"Champs Simples": {
|
|
"description": ticket.get("description", ""),
|
|
"priority": ticket.get("priority", ""),
|
|
"sequence": ticket.get("sequence", "")
|
|
},
|
|
"Champs Relationnels": {
|
|
"project_id": [project_id, project_name],
|
|
"stage_id": ticket.get("stage_id", [0, "Inconnu"])
|
|
},
|
|
"Discussions": []
|
|
}
|
|
|
|
# Récupérer les discussions si nécessaire
|
|
if ticket.get("message_ids"):
|
|
formatted_ticket["Discussions"] = self.get_ticket_discussions(ticket["message_ids"])
|
|
|
|
# Filtrer le ticket
|
|
filtered_ticket = filter_ticket_data(formatted_ticket)
|
|
formatted_tickets.append(filtered_ticket)
|
|
|
|
if not formatted_tickets:
|
|
print("Aucun ticket n'a pu être formaté correctement.")
|
|
return
|
|
|
|
# Créer un répertoire pour le projet avec son nom pour plus de clarté
|
|
safe_project_name = project_name.replace('/', '_').replace(' ', '_').replace('\\', '_')
|
|
project_dir = ensure_export_directory(f"project_{project_id}_{safe_project_name}")
|
|
|
|
print(f"\nExportation des tickets pour le projet {project_name} (ID: {project_id}):")
|
|
|
|
# Classer les tickets par stage_id
|
|
tickets_by_stage = {}
|
|
for ticket in formatted_tickets:
|
|
# Vérifier que stage_id existe dans les champs relationnels
|
|
if 'Champs Relationnels' in ticket and 'stage_id' in ticket['Champs Relationnels']:
|
|
stage_data = ticket['Champs Relationnels']['stage_id']
|
|
|
|
if isinstance(stage_data, list) and len(stage_data) > 0:
|
|
stage_id = stage_data[0]
|
|
stage_name = stage_data[1] if len(stage_data) > 1 else "Inconnu"
|
|
else:
|
|
# Si stage_id n'est pas une liste, utiliser une valeur par défaut
|
|
stage_id = 0
|
|
stage_name = "Inconnu"
|
|
|
|
key = f"{stage_id}_{stage_name}"
|
|
if key not in tickets_by_stage:
|
|
tickets_by_stage[key] = []
|
|
tickets_by_stage[key].append(ticket)
|
|
else:
|
|
# Si le ticket n'a pas de stage_id, le mettre dans "Non classé"
|
|
key = "0_Non_classé"
|
|
if key not in tickets_by_stage:
|
|
tickets_by_stage[key] = []
|
|
tickets_by_stage[key].append(ticket)
|
|
|
|
# Vérifier si des tickets ont été classés
|
|
if not tickets_by_stage:
|
|
print("Aucun ticket n'a pu être classé par stage_id.")
|
|
# Sauvegarde tous les tickets dans un seul fichier
|
|
all_tickets_file = os.path.join(project_dir, "all_tickets.json")
|
|
save_json(all_tickets_file, formatted_tickets)
|
|
print(f"Tous les tickets ont été sauvegardés dans {all_tickets_file}")
|
|
return
|
|
|
|
# Créer des répertoires pour chaque stage_id et sauvegarder les tickets
|
|
for stage_key, stage_tickets in tickets_by_stage.items():
|
|
stage_parts = stage_key.split('_', 1)
|
|
stage_id = stage_parts[0]
|
|
stage_name = stage_parts[1] if len(stage_parts) > 1 else "Inconnu"
|
|
safe_stage_name = stage_name.replace('/', '_').replace('\\', '_')
|
|
stage_dir = os.path.join(project_dir, f"{stage_id}_{safe_stage_name}")
|
|
os.makedirs(stage_dir, exist_ok=True)
|
|
|
|
print(f"- Stage '{stage_name}' (ID: {stage_id}): {len(stage_tickets)} tickets")
|
|
|
|
# Sauvegarder tous les tickets du stage dans un fichier unique
|
|
all_tickets_file = os.path.join(stage_dir, f"all_tickets.json")
|
|
save_json(all_tickets_file, stage_tickets)
|
|
|
|
# Sauvegarder chaque ticket individuellement
|
|
for ticket in stage_tickets:
|
|
ticket_code = ticket.get('Code', 'N/A')
|
|
ticket_id = ticket.get('ID du Ticket', 'N/A')
|
|
ticket_file = os.path.join(stage_dir, f"ticket_{ticket_code}_{ticket_id}.json")
|
|
save_json(ticket_file, ticket)
|
|
|
|
print(f" Tickets sauvegardés dans {stage_dir}/")
|
|
|
|
print(f"\nExportation terminée dans {project_dir}/")
|
|
|
|
except ValueError:
|
|
print(f"Erreur: Le project_id '{project_id}' n'est pas un nombre valide.")
|
|
except Exception as e:
|
|
print(f"Erreur lors de l'exportation des tickets: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
def get_available_projects(self):
|
|
"""Récupère la liste des projets disponibles"""
|
|
projects = self._safe_execute('project.project', 'search_read', [], ['id', 'name'])
|
|
if not projects:
|
|
print("Aucun projet disponible.")
|
|
return {}
|
|
|
|
projects_dict = {}
|
|
print("\nProjets disponibles:")
|
|
for project in projects:
|
|
project_id = project['id']
|
|
project_name = project['name']
|
|
projects_dict[project_id] = project_name
|
|
print(f"ID: {project_id} - {project_name}")
|
|
|
|
return projects_dict
|
|
|
|
def get_project_tickets_summary(self, project_id):
|
|
"""Récupère un résumé des tickets d'un projet pour permettre la sélection"""
|
|
domain = [('project_id', '=', project_id)]
|
|
ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 200)
|
|
|
|
if not ticket_ids:
|
|
print(f"Aucun ticket trouvé pour le projet ID: {project_id}")
|
|
return []
|
|
|
|
# Récupérer des informations résumées sur les tickets
|
|
fields_to_read = ['id', 'name', 'code', 'stage_id', 'date_deadline']
|
|
tickets_data = self._safe_execute(self.model_name, 'read', ticket_ids, fields_to_read)
|
|
|
|
if not tickets_data:
|
|
print("Erreur lors de la récupération des données des tickets.")
|
|
return []
|
|
|
|
# Formater les données pour l'affichage
|
|
summary_tickets = []
|
|
for ticket in tickets_data:
|
|
stage_name = "Non défini"
|
|
if ticket.get('stage_id'):
|
|
stage_name = ticket['stage_id'][1] if isinstance(ticket['stage_id'], list) and len(ticket['stage_id']) > 1 else str(ticket['stage_id'])
|
|
|
|
summary_tickets.append({
|
|
'id': ticket['id'],
|
|
'name': ticket['name'],
|
|
'code': ticket.get('code', 'N/A'),
|
|
'stage_id': ticket.get('stage_id', [0, "Non défini"]),
|
|
'stage_name': stage_name,
|
|
'date_deadline': ticket.get('date_deadline', 'Non défini')
|
|
})
|
|
|
|
return summary_tickets
|
|
|
|
def get_project_stages(self, project_id):
|
|
"""Récupère les étapes (stage_id) disponibles pour un projet donné
|
|
|
|
Args:
|
|
project_id (int): L'ID du projet
|
|
|
|
Returns:
|
|
dict: Un dictionnaire des étapes {stage_id: stage_name}
|
|
"""
|
|
# Récupérer les tickets du projet
|
|
domain = [('project_id', '=', project_id)]
|
|
ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1000)
|
|
|
|
if not ticket_ids:
|
|
print(f"Aucun ticket trouvé pour le projet ID: {project_id}")
|
|
return {}
|
|
|
|
# Récupérer uniquement les stage_id des tickets
|
|
tickets_data = self._safe_execute(self.model_name, 'read', ticket_ids, ['stage_id'])
|
|
|
|
if not tickets_data:
|
|
print("Erreur lors de la récupération des étapes.")
|
|
return {}
|
|
|
|
# Extraire les stage_id uniques
|
|
stage_ids = set()
|
|
for ticket in tickets_data:
|
|
if ticket.get('stage_id'):
|
|
stage_ids.add(ticket['stage_id'][0])
|
|
|
|
if not stage_ids:
|
|
print("Aucune étape trouvée pour ce projet.")
|
|
return {}
|
|
|
|
# Récupérer les noms des étapes
|
|
stages_data = self._safe_execute('project.task.type', 'read', list(stage_ids), ['id', 'name'])
|
|
|
|
if not stages_data:
|
|
print("Erreur lors de la récupération des noms des étapes.")
|
|
return {}
|
|
|
|
# Créer un dictionnaire des étapes
|
|
stages_dict = {}
|
|
for stage in stages_data:
|
|
stages_dict[stage['id']] = stage['name']
|
|
|
|
return stages_dict |