This commit is contained in:
Ladebeze66 2025-04-03 14:19:02 +02:00
parent ef39245360
commit 49a32554cb
16 changed files with 257 additions and 253 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,25 +1,21 @@
#!/usr/bin/env python3
"""
Script pour récupérer un ticket Odoo en utilisant TicketManager.
Exemple d'usage: python retrieve_ticket.py T0123
"""
import os
import sys
from utils.ticket_manager2 import TicketManager
import json
import argparse
from utils.auth_manager import AuthManager
from utils.ticket_manager import TicketManager
from utils.attachment_manager import AttachmentManager
from utils.message_manager import MessageManager
from utils.utils import save_json
def main():
"""Fonction principale du script"""
parser = argparse.ArgumentParser(description="Extraction de tickets Odoo")
parser.add_argument("ticket_code", help="Code du ticket à extraire (ex: T0167)")
parser.add_argument("--config", default="config.json", help="Chemin vers le fichier de configuration")
parser.add_argument("--output-dir", help="Répertoire de sortie (par défaut: output/ticket_CODE)")
parser.add_argument("--verbose", "-v", action="store_true", help="Afficher plus d'informations")
parser.add_argument("--keep-html", action="store_true", help="Conserver le contenu HTML dans les messages")
parser.add_argument("--no-original", action="store_true", help="Ne pas conserver le corps de message original")
parser.add_argument("--keep-all", action="store_true", help="Conserver tous les messages (y compris ceux d'OdooBot et les messages vides)")
args = parser.parse_args()
# Charger la configuration
@ -46,94 +42,63 @@ def main():
# Définir le répertoire de sortie
output_dir = args.output_dir or os.path.join(config.get("output_dir", "output"), f"ticket_{args.ticket_code}")
# Créer et connecter le gestionnaire de tickets
ticket_manager = TicketManager(url, db, username, api_key)
# Personnaliser le nettoyage des messages HTML si demandé
if args.keep_html:
# Remplacer la méthode de nettoyage par une qui ne fait rien
ticket_manager._clean_messages = lambda messages: [
{**msg, "body_original": msg["body"] if isinstance(msg.get("body"), str) else msg.get("body")}
for msg in messages
]
elif args.keep_all:
# Modifier la méthode pour garder tous les messages mais nettoyer leur contenu
original_clean_method = ticket_manager._clean_messages
def keep_all_messages(messages):
result = []
for message in messages:
cleaned = message.copy()
if "body" in cleaned and cleaned["body"] and isinstance(cleaned["body"], str):
# Nettoyer le contenu mais garder tous les messages
cleaned["body_original"] = cleaned["body"]
# Appliquer le nettoyage de base
import re
from html import unescape
body = cleaned["body"]
body = re.sub(r'<img[^>]*?>', '', body)
body = re.sub(r'<style[^>]*?>.*?</style>', '', body, flags=re.DOTALL)
body = re.sub(r'<script[^>]*?>.*?</script>', '', body, flags=re.DOTALL)
body = re.sub(r' style="[^"]*"', '', body)
body = re.sub(r'<(?:p|div)[^>]*?>', '\n', body)
body = re.sub(r'</(?:p|div)>', '\n', body)
body = re.sub(r'<br[^>]*?>', '\n', body)
body = re.sub(r'<[^>]*?>', '', body)
body = unescape(body)
body = re.sub(r'\n\s*\n', '\n\n', body)
body = re.sub(r' +', ' ', body)
body = body.strip()
cleaned["body"] = body
result.append(cleaned)
return result
ticket_manager._clean_messages = keep_all_messages
elif args.no_original:
# Modifier la méthode pour ne pas conserver le corps original
original_clean_method = ticket_manager._clean_messages
ticket_manager._clean_messages = lambda messages: [
{k: v for k, v in msg.items() if k != "body_original"}
for msg in original_clean_method(messages)
]
if not ticket_manager.login():
print("Échec de la connexion à Odoo")
os.makedirs(output_dir, exist_ok=True)
# Authentification Odoo
auth = AuthManager(url, db, username, api_key)
if not auth.login():
print("Échec de connexion à Odoo")
sys.exit(1)
# Initialiser les gestionnaires
ticket_manager = TicketManager(auth)
attachment_manager = AttachmentManager(auth)
message_manager = MessageManager(auth)
# Récupérer le ticket
if args.verbose:
print(f"Recherche du ticket {args.ticket_code}...")
ticket = ticket_manager.get_ticket_by_code(args.ticket_code)
if not ticket:
print(f"Ticket {args.ticket_code} non trouvé")
sys.exit(1)
ticket_id = ticket.get('id')
# Sauvegarder ticket_info.json
ticket_info_path = os.path.join(output_dir, "ticket_info.json")
save_json(ticket, ticket_info_path)
if args.verbose:
print(f"Ticket {args.ticket_code} trouvé (ID: {ticket.get('id')})")
print(f"Ticket {args.ticket_code} trouvé (ID: {ticket_id})")
print(f"Extraction des données vers {output_dir}...")
# Récupérer et sauvegarder les messages
messages = message_manager.get_ticket_messages(ticket_id)
all_messages_path = os.path.join(output_dir, "all_messages.json")
save_json(messages, all_messages_path)
# Extraire et sauvegarder toutes les données
result = ticket_manager.extract_ticket_data(ticket["id"], output_dir)
# Récupérer et sauvegarder les pièces jointes
attachments = attachment_manager.get_ticket_attachments(ticket_id)
attachments_path = os.path.join(output_dir, "attachments_info.json")
save_json(attachments, attachments_path)
if "error" in result:
print(f"Erreur: {result['error']}")
sys.exit(1)
print(f"Extraction terminée avec succès")
print(f"- Informations du ticket: {result['ticket_info']}")
print(f"- Messages: {result['messages_file']}")
print(f"- Données complètes: {result['ticket_data_file']}")
print(f"- Pièces jointes: {len(result['attachments'])} fichiers")
# Afficher un résumé
print(f"\nRésumé du ticket {args.ticket_code}:")
print(f"- Nom: {ticket.get('name', 'N/A')}")
print(f"- Description: {ticket.get('description', 'N/A')[:100]}...")
print(f"- Étape: {ticket.get('stage_id', ['N/A'])[1] if isinstance(ticket.get('stage_id'), (list, tuple)) else 'N/A'}")
# Génération de structure.json
structure = {
"date_extraction": datetime.now().isoformat(),
"ticket_dir": output_dir,
"fichiers_json": [
"ticket_info.json",
"all_messages.json",
"attachments_info.json"
]
}
structure_path = os.path.join(output_dir, "structure.json")
save_json(structure, structure_path)
print("Extraction terminée avec succès")
print(f"- Informations du ticket: {ticket_info_path}")
print(f"- Messages: {all_messages_path}")
print(f"- Pièces jointes: {attachments_path}")
print(f"- Structure: {structure_path}")
if __name__ == "__main__":
main()
main()

View File

@ -2,4 +2,4 @@
Package utils pour les outils du gestionnaire de tickets
"""
from .ticket_manager2 import TicketManager
from .ticket_manager import TicketManager

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,15 @@
from typing import List, Dict, Any
from .auth_manager import AuthManager
class AttachmentManager:
def __init__(self, auth: AuthManager):
self.auth = auth
def get_ticket_attachments(self, ticket_id: int) -> List[Dict[str, Any]]:
params = {
"model": "ir.attachment",
"method": "search_read",
"args": [[[ "res_id", "=", ticket_id], ["res_model", "=", "project.task"]]],
"kwargs": {"fields": ["id", "name", "datas"]}
}
return self.auth._rpc_call("/web/dataset/call_kw", params)

40
utils/auth_manager.py Normal file
View File

@ -0,0 +1,40 @@
import requests
from typing import Dict, Any
class AuthManager:
def __init__(self, url: str, db: str, username: str, api_key: str):
self.url = url
self.db = db
self.username = username
self.api_key = api_key
self.uid = None
self.session_id = None
def login(self) -> bool:
login_url = f"{self.url}/web/session/authenticate"
login_data = {
"jsonrpc": "2.0",
"params": {
"db": self.db,
"login": self.username,
"password": self.api_key
}
}
response = requests.post(login_url, json=login_data)
result = response.json()
if result.get("error"):
print(f"Erreur de connexion: {result['error']['message']}")
return False
self.uid = result.get("result", {}).get("uid")
self.session_id = response.cookies.get("session_id")
return bool(self.uid)
def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
full_url = f"{self.url}{endpoint}"
headers = {"Content-Type": "application/json"}
data = {"jsonrpc": "2.0", "method": "call", "params": params}
response = requests.post(full_url, json=data, headers=headers, cookies={"session_id": self.session_id})
return response.json().get("result", {})

16
utils/message_manager.py Normal file
View File

@ -0,0 +1,16 @@
from typing import List, Dict, Any
from .auth_manager import AuthManager
from .utils import clean_html
class MessageManager:
def __init__(self, auth: AuthManager):
self.auth = auth
def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]:
params = {
"model": "mail.message",
"method": "search_read",
"args": [[[ "res_id", "=", ticket_id], ["model", "=", "project.task"]]],
"kwargs": {"fields": ["id", "body", "author_id", "date"]}
}
return self.auth._rpc_call("/web/dataset/call_kw", params)

26
utils/ticket_manager.py Normal file
View File

@ -0,0 +1,26 @@
from typing import Dict, Any, List
from .auth_manager import AuthManager
class TicketManager:
def __init__(self, auth: AuthManager):
self.auth = auth
self.model_name = "project.task"
def get_ticket(self, ticket_id: int) -> Dict[str, Any]:
params = {
"model": self.model_name,
"method": "read",
"args": [[ticket_id]],
"kwargs": {"fields": ["id", "name", "description", "stage_id", "user_id", "create_date"]}
}
return self.auth._rpc_call("/web/dataset/call_kw", params)
def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
params = {
"model": self.model_name,
"method": "search_read",
"args": [[["code", "=", ticket_code]], ["id", "name", "description", "stage_id", "user_id", "create_date"]],
"kwargs": {"limit": 1}
}
result = self.auth._rpc_call("/web/dataset/call_kw", params)
return result[0] if isinstance(result, list) and result else {}

View File

@ -1,160 +0,0 @@
import os
import json
import base64
import requests
from typing import Dict, List, Any, Optional
from datetime import datetime
import re
from html import unescape
try:
from bs4 import BeautifulSoup
except ImportError:
BeautifulSoup = None
class TicketManager:
def __init__(self, url: str, db: str, username: str, api_key: str):
self.url = url
self.db = db
self.username = username
self.api_key = api_key
self.uid = None
self.session_id = None
self.model_name = "project.task"
def login(self) -> bool:
login_url = f"{self.url}/web/session/authenticate"
login_data = {
"jsonrpc": "2.0",
"params": {
"db": self.db,
"login": self.username,
"password": self.api_key
}
}
response = requests.post(login_url, json=login_data)
result = response.json()
if result.get("error"):
print(f"Erreur de connexion: {result['error']['message']}")
return False
self.uid = result.get("result", {}).get("uid")
self.session_id = response.cookies.get("session_id")
return True if self.uid else False
def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
full_url = f"{self.url}{endpoint}"
headers = {"Content-Type": "application/json"}
data = {"jsonrpc": "2.0", "method": "call", "params": params}
response = requests.post(full_url, json=data, headers=headers, cookies={"session_id": self.session_id})
return response.json().get("result", {})
def get_ticket(self, ticket_id: int) -> Dict[str, Any]:
params = {
"model": self.model_name,
"method": "read",
"args": [[ticket_id]],
"kwargs": {"fields": ["id", "name", "description", "stage_id", "user_id", "create_date"]}
}
result = self._rpc_call("/web/dataset/call_kw", params)
if isinstance(result, list) and len(result) > 0:
return result[0]
else:
print(f"Aucun ticket trouvé avec l'ID {ticket_id} ou une erreur est survenue.")
return {}
def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
params = {
"model": self.model_name,
"method": "search_read",
"args": [[["code", "=", ticket_code]], ["id", "name", "description", "stage_id", "user_id", "create_date"]],
"kwargs": {"limit": 1}
}
result = self._rpc_call("/web/dataset/call_kw", params)
if isinstance(result, list) and len(result) > 0:
return result[0]
else:
print(f"Aucun ticket trouvé avec le code {ticket_code} ou une erreur est survenue.")
return {}
def get_ticket_attachments(self, ticket_id: int) -> List[Dict[str, Any]]:
attachments = self._rpc_call("/web/dataset/call_kw", {
"model": "ir.attachment",
"method": "search_read",
"args": [[["res_id", "=", ticket_id], ["res_model", "=", self.model_name]]],
"kwargs": {"fields": ["id", "name", "datas", "mimetype"]}
})
return attachments if isinstance(attachments, list) else []
def clean_html(self, html_content: str) -> str:
if BeautifulSoup:
soup = BeautifulSoup(html_content, "html.parser")
for element in soup(['style', 'script', 'footer', 'header']):
element.extract()
text = soup.get_text(separator=' ', strip=True)
else:
text = re.sub(r'<.*?>', '', html_content)
text = unescape(text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def save_json(self, data: Any, path: str):
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def extract_ticket_data(self, ticket_id: int, output_dir: str):
os.makedirs(output_dir, exist_ok=True)
ticket_data = self.get_ticket(ticket_id)
if not ticket_data:
print("Erreur: Ticket non trouvé.")
return None
ticket_info_path = os.path.join(output_dir, "ticket_info.json")
self.save_json(ticket_data, ticket_info_path)
attachments = self.get_ticket_attachments(ticket_id)
attachment_list = []
attachments_dir = os.path.join(output_dir, "attachments")
os.makedirs(attachments_dir, exist_ok=True)
for attachment in attachments:
if "datas" in attachment and attachment["datas"]:
file_name = f"{attachment['id']}_{attachment['name']}"
file_path = os.path.join(attachments_dir, file_name)
with open(file_path, "wb") as f:
f.write(base64.b64decode(attachment["datas"]))
attachment_list.append({
"id": attachment["id"],
"name": attachment["name"],
"file_path": file_path,
"mimetype": attachment.get("mimetype")
})
attachments_info_path = os.path.join(output_dir, "attachments_info.json")
self.save_json(attachment_list, attachments_info_path)
structure = {
"date_extraction": datetime.now().isoformat(),
"ticket_dir": output_dir,
"fichiers_json": [
"ticket_info.json",
"attachments_info.json"
]
}
structure_path = os.path.join(output_dir, "structure.json")
self.save_json(structure, structure_path)
return {
"ticket_info": ticket_info_path,
"attachments_info": attachments_info_path,
"ticket_data_file": structure_path,
}

22
utils/utils.py Normal file
View File

@ -0,0 +1,22 @@
import json
from typing import Any
def save_json(data: Any, path: str):
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def clean_html(html_content: str) -> str:
import re
from html import unescape
from bs4 import BeautifulSoup
if BeautifulSoup:
soup = BeautifulSoup(html_content, "html.parser")
text = soup.get_text()
else:
text = re.sub(r'<.*?>', '', html_content)
text = unescape(text)
text = re.sub(r'\s+', ' ', text).strip()
return text