mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-16 11:57:54 +01:00
408 lines
14 KiB
Python
408 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Module pour gérer l'extraction de tickets depuis Odoo.
|
|
Cette version est simplifiée et indépendante de odoo_toolkit.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import base64
|
|
from typing import Dict, List, Any, Optional
|
|
import requests
|
|
from agents import AgentFiltreImages, AgentAnalyseImage, AgentQuestionReponse
|
|
|
|
class TicketManager:
|
|
"""
|
|
Gestionnaire de tickets pour extraire des données depuis Odoo.
|
|
"""
|
|
|
|
def __init__(self, url: str, db: str, username: str, api_key: str):
|
|
"""
|
|
Initialise le gestionnaire de tickets avec les paramètres de connexion.
|
|
|
|
Args:
|
|
url: URL du serveur Odoo
|
|
db: Nom de la base de données
|
|
username: Nom d'utilisateur
|
|
api_key: Clé API ou mot de passe
|
|
"""
|
|
self.url = url
|
|
self.db = db
|
|
self.username = username
|
|
self.api_key = api_key
|
|
self.uid = None
|
|
self.session_id = None
|
|
self.model_name = "project.task" # Modèle par défaut pour les tickets
|
|
|
|
def login(self) -> bool:
|
|
"""
|
|
Établit la connexion au serveur Odoo.
|
|
|
|
Returns:
|
|
True si la connexion réussit, False sinon
|
|
"""
|
|
try:
|
|
# Point d'entrée pour le login
|
|
login_url = f"{self.url}/web/session/authenticate"
|
|
|
|
# Données pour la requête de login
|
|
login_data = {
|
|
"jsonrpc": "2.0",
|
|
"params": {
|
|
"db": self.db,
|
|
"login": self.username,
|
|
"password": self.api_key
|
|
}
|
|
}
|
|
|
|
# Effectuer la requête
|
|
response = requests.post(login_url, json=login_data)
|
|
response.raise_for_status()
|
|
|
|
# Extraire les résultats
|
|
result = response.json()
|
|
if result.get("error"):
|
|
print(f"Erreur de connexion: {result['error']['message']}")
|
|
return False
|
|
|
|
# Récupérer l'ID utilisateur et la session
|
|
self.uid = result.get("result", {}).get("uid")
|
|
self.session_id = response.cookies.get("session_id")
|
|
|
|
if not self.uid:
|
|
print("Erreur: Impossible de récupérer l'ID utilisateur")
|
|
return False
|
|
|
|
print(f"Connecté avec succès à {self.url} (User ID: {self.uid})")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Erreur de connexion: {str(e)}")
|
|
return False
|
|
|
|
def _ensure_connection(self) -> bool:
|
|
"""
|
|
Vérifie que la connexion est établie, tente de se reconnecter si nécessaire.
|
|
|
|
Returns:
|
|
True si la connexion est disponible, False sinon
|
|
"""
|
|
if not self.uid or not self.session_id:
|
|
return self.login()
|
|
return True
|
|
|
|
def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Effectue un appel RPC vers le serveur Odoo.
|
|
|
|
Args:
|
|
endpoint: Point d'entrée de l'API (/web/dataset/call_kw, etc.)
|
|
params: Paramètres de la requête
|
|
|
|
Returns:
|
|
Résultat de la requête
|
|
"""
|
|
if not self._ensure_connection():
|
|
return {"error": "Non connecté"}
|
|
|
|
try:
|
|
# Préparer la requête
|
|
full_url = f"{self.url}{endpoint}"
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
# Données de la requête
|
|
data = {
|
|
"jsonrpc": "2.0",
|
|
"method": "call",
|
|
"params": params
|
|
}
|
|
|
|
# Effectuer la requête
|
|
response = requests.post(
|
|
full_url,
|
|
json=data,
|
|
headers=headers,
|
|
cookies={"session_id": self.session_id} if self.session_id else None
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Traiter la réponse
|
|
result = response.json()
|
|
if result.get("error"):
|
|
return {"error": result["error"]["message"]}
|
|
|
|
return result.get("result", {})
|
|
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
def search_read(self, model: str, domain: List, fields: List[str], limit: int = 0) -> List[Dict[str, Any]]:
|
|
"""
|
|
Recherche et lit des enregistrements selon un domaine.
|
|
|
|
Args:
|
|
model: Nom du modèle
|
|
domain: Domaine de recherche
|
|
fields: Champs à récupérer
|
|
limit: Nombre max de résultats (0 pour illimité)
|
|
|
|
Returns:
|
|
Liste des enregistrements trouvés
|
|
"""
|
|
params = {
|
|
"model": model,
|
|
"method": "search_read",
|
|
"args": [domain, fields],
|
|
"kwargs": {"limit": limit}
|
|
}
|
|
|
|
result = self._rpc_call("/web/dataset/call_kw", params)
|
|
if isinstance(result, dict) and "error" in result:
|
|
print(f"Erreur lors de la recherche: {result['error']}")
|
|
return []
|
|
|
|
return result if isinstance(result, list) else []
|
|
|
|
def read(self, model: str, ids: List[int], fields: List[str]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Lit des enregistrements par leurs IDs.
|
|
|
|
Args:
|
|
model: Nom du modèle
|
|
ids: Liste des IDs à lire
|
|
fields: Champs à récupérer
|
|
|
|
Returns:
|
|
Liste des enregistrements lus
|
|
"""
|
|
params = {
|
|
"model": model,
|
|
"method": "read",
|
|
"args": [ids, fields],
|
|
"kwargs": {}
|
|
}
|
|
|
|
result = self._rpc_call("/web/dataset/call_kw", params)
|
|
if isinstance(result, dict) and "error" in result:
|
|
print(f"Erreur lors de la lecture: {result['error']}")
|
|
return []
|
|
|
|
return result if isinstance(result, list) else []
|
|
|
|
def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
|
|
"""
|
|
Récupère un ticket par son code.
|
|
|
|
Args:
|
|
ticket_code: Code du ticket à récupérer
|
|
|
|
Returns:
|
|
Données du ticket ou dictionnaire vide si non trouvé
|
|
"""
|
|
# Rechercher l'ID du ticket par son code
|
|
tickets = self.search_read(
|
|
model=self.model_name,
|
|
domain=[("code", "=", ticket_code)],
|
|
fields=["id"],
|
|
limit=1
|
|
)
|
|
|
|
if not tickets:
|
|
print(f"Aucun ticket trouvé avec le code {ticket_code}")
|
|
return {}
|
|
|
|
# Récupérer toutes les données du ticket
|
|
ticket_id = tickets[0]["id"]
|
|
return self.get_ticket_by_id(ticket_id)
|
|
|
|
def get_ticket_by_id(self, ticket_id: int) -> Dict[str, Any]:
|
|
"""
|
|
Récupère un ticket par son ID.
|
|
|
|
Args:
|
|
ticket_id: ID du ticket à récupérer
|
|
|
|
Returns:
|
|
Données du ticket ou dictionnaire vide si non trouvé
|
|
"""
|
|
# Récupérer les champs disponibles pour le modèle
|
|
fields_info = self._get_model_fields(self.model_name)
|
|
|
|
# Lire les données du ticket
|
|
tickets = self.read(
|
|
model=self.model_name,
|
|
ids=[ticket_id],
|
|
fields=fields_info
|
|
)
|
|
|
|
if not tickets:
|
|
print(f"Aucun ticket trouvé avec l'ID {ticket_id}")
|
|
return {}
|
|
|
|
return tickets[0]
|
|
|
|
def _get_model_fields(self, model_name: str) -> List[str]:
|
|
"""
|
|
Récupère la liste des champs disponibles pour un modèle.
|
|
|
|
Args:
|
|
model_name: Nom du modèle
|
|
|
|
Returns:
|
|
Liste des noms de champs
|
|
"""
|
|
params = {
|
|
"model": model_name,
|
|
"method": "fields_get",
|
|
"args": [],
|
|
"kwargs": {"attributes": ["name", "type"]}
|
|
}
|
|
|
|
result = self._rpc_call("/web/dataset/call_kw", params)
|
|
if "error" in result:
|
|
print(f"Erreur lors de la récupération des champs: {result['error']}")
|
|
return []
|
|
|
|
# Filtrer les types de champs problématiques
|
|
invalid_types = ["many2many", "binary"]
|
|
valid_fields = [
|
|
field for field, info in result.items()
|
|
if info.get("type") not in invalid_types
|
|
]
|
|
|
|
return valid_fields
|
|
|
|
def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]:
|
|
"""
|
|
Récupère les messages d'un ticket.
|
|
|
|
Args:
|
|
ticket_id: ID du ticket
|
|
|
|
Returns:
|
|
Liste des messages du ticket
|
|
"""
|
|
# D'abord récupérer les IDs des messages
|
|
ticket = self.read(
|
|
model=self.model_name,
|
|
ids=[ticket_id],
|
|
fields=["message_ids"]
|
|
)
|
|
|
|
if not ticket or "message_ids" not in ticket[0]:
|
|
print(f"Impossible de récupérer les messages pour le ticket {ticket_id}")
|
|
return []
|
|
|
|
message_ids = ticket[0]["message_ids"]
|
|
|
|
# Récupérer les détails des messages
|
|
messages = self.read(
|
|
model="mail.message",
|
|
ids=message_ids,
|
|
fields=["id", "body", "date", "author_id", "email_from", "subject", "parent_id"]
|
|
)
|
|
|
|
return messages
|
|
|
|
def get_ticket_attachments(self, ticket_id: int, download_path: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""
|
|
Récupère les pièces jointes d'un ticket, avec option de téléchargement.
|
|
|
|
Args:
|
|
ticket_id: ID du ticket
|
|
download_path: Chemin où télécharger les pièces jointes (optionnel)
|
|
|
|
Returns:
|
|
Liste des informations sur les pièces jointes
|
|
"""
|
|
# Rechercher les pièces jointes liées au ticket
|
|
attachments = self.search_read(
|
|
model="ir.attachment",
|
|
domain=[("res_model", "=", self.model_name), ("res_id", "=", ticket_id)],
|
|
fields=["id", "name", "mimetype", "create_date", "datas"]
|
|
)
|
|
|
|
if not attachments:
|
|
print(f"Aucune pièce jointe trouvée pour le ticket {ticket_id}")
|
|
return []
|
|
|
|
if download_path:
|
|
# Créer le répertoire si nécessaire
|
|
os.makedirs(download_path, exist_ok=True)
|
|
|
|
# Télécharger chaque pièce jointe
|
|
for attachment in attachments:
|
|
if "datas" in attachment and attachment["datas"]:
|
|
# Déchiffrer les données base64
|
|
binary_data = base64.b64decode(attachment["datas"])
|
|
|
|
# Nettoyer le nom de fichier
|
|
safe_name = attachment["name"].replace("/", "_").replace("\\", "_")
|
|
file_path = os.path.join(download_path, f"{attachment['id']}_{safe_name}")
|
|
|
|
# Sauvegarder le fichier
|
|
with open(file_path, "wb") as f:
|
|
f.write(binary_data)
|
|
|
|
# Remplacer les données binaires par le chemin du fichier
|
|
attachment["file_path"] = file_path
|
|
del attachment["datas"]
|
|
|
|
return attachments
|
|
|
|
def extract_ticket_data(self, ticket_id: int, output_dir: str) -> Dict[str, Any]:
|
|
"""
|
|
Extrait toutes les données d'un ticket, y compris messages et pièces jointes.
|
|
|
|
Args:
|
|
ticket_id: ID du ticket
|
|
output_dir: Répertoire de sortie
|
|
|
|
Returns:
|
|
Dictionnaire avec toutes les données du ticket
|
|
"""
|
|
# Créer le répertoire de sortie
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
# Récupérer les données du ticket
|
|
ticket = self.get_ticket_by_id(ticket_id)
|
|
if not ticket:
|
|
return {"error": f"Ticket {ticket_id} non trouvé"}
|
|
|
|
# Sauvegarder les données du ticket
|
|
ticket_path = os.path.join(output_dir, "ticket_info.json")
|
|
with open(ticket_path, "w", encoding="utf-8") as f:
|
|
json.dump(ticket, f, indent=2, ensure_ascii=False)
|
|
|
|
# Récupérer et sauvegarder les messages
|
|
messages = self.get_ticket_messages(ticket_id)
|
|
messages_path = os.path.join(output_dir, "messages.json")
|
|
with open(messages_path, "w", encoding="utf-8") as f:
|
|
json.dump(messages, f, indent=2, ensure_ascii=False)
|
|
|
|
# Récupérer et sauvegarder les pièces jointes
|
|
attachments_dir = os.path.join(output_dir, "attachments")
|
|
attachments = self.get_ticket_attachments(ticket_id, attachments_dir)
|
|
attachments_path = os.path.join(output_dir, "attachments_info.json")
|
|
with open(attachments_path, "w", encoding="utf-8") as f:
|
|
json.dump(attachments, f, indent=2, ensure_ascii=False)
|
|
|
|
# Compiler toutes les informations
|
|
result = {
|
|
"ticket": ticket,
|
|
"messages": messages,
|
|
"attachments": [
|
|
{k: v for k, v in a.items() if k != "datas"}
|
|
for a in attachments
|
|
],
|
|
"files": {
|
|
"ticket_info": ticket_path,
|
|
"messages": messages_path,
|
|
"attachments_info": attachments_path,
|
|
"attachments_dir": attachments_dir
|
|
}
|
|
}
|
|
|
|
return result |