mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-16 03:37:47 +01:00
161 lines
5.9 KiB
Python
161 lines
5.9 KiB
Python
import os
|
|
import json
|
|
import base64
|
|
import requests
|
|
from typing import Dict, List, Any, Optional
|
|
from datetime import datetime
|
|
import re
|
|
from html import unescape
|
|
|
|
try:
|
|
from bs4 import BeautifulSoup
|
|
except ImportError:
|
|
BeautifulSoup = None
|
|
|
|
class TicketManager:
|
|
def __init__(self, url: str, db: str, username: str, api_key: str):
|
|
self.url = url
|
|
self.db = db
|
|
self.username = username
|
|
self.api_key = api_key
|
|
self.uid = None
|
|
self.session_id = None
|
|
self.model_name = "project.task"
|
|
|
|
def login(self) -> bool:
|
|
login_url = f"{self.url}/web/session/authenticate"
|
|
login_data = {
|
|
"jsonrpc": "2.0",
|
|
"params": {
|
|
"db": self.db,
|
|
"login": self.username,
|
|
"password": self.api_key
|
|
}
|
|
}
|
|
response = requests.post(login_url, json=login_data)
|
|
result = response.json()
|
|
|
|
if result.get("error"):
|
|
print(f"Erreur de connexion: {result['error']['message']}")
|
|
return False
|
|
|
|
self.uid = result.get("result", {}).get("uid")
|
|
self.session_id = response.cookies.get("session_id")
|
|
return True if self.uid else False
|
|
|
|
def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
|
full_url = f"{self.url}{endpoint}"
|
|
headers = {"Content-Type": "application/json"}
|
|
data = {"jsonrpc": "2.0", "method": "call", "params": params}
|
|
|
|
response = requests.post(full_url, json=data, headers=headers, cookies={"session_id": self.session_id})
|
|
return response.json().get("result", {})
|
|
|
|
def get_ticket(self, ticket_id: int) -> Dict[str, Any]:
|
|
params = {
|
|
"model": self.model_name,
|
|
"method": "read",
|
|
"args": [[ticket_id]],
|
|
"kwargs": {"fields": ["id", "name", "description", "stage_id", "user_id", "create_date"]}
|
|
}
|
|
result = self._rpc_call("/web/dataset/call_kw", params)
|
|
|
|
if isinstance(result, list) and len(result) > 0:
|
|
return result[0]
|
|
else:
|
|
print(f"Aucun ticket trouvé avec l'ID {ticket_id} ou une erreur est survenue.")
|
|
return {}
|
|
|
|
def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
|
|
params = {
|
|
"model": self.model_name,
|
|
"method": "search_read",
|
|
"args": [[["code", "=", ticket_code]], ["id", "name", "description", "stage_id", "user_id", "create_date"]],
|
|
"kwargs": {"limit": 1}
|
|
}
|
|
result = self._rpc_call("/web/dataset/call_kw", params)
|
|
|
|
if isinstance(result, list) and len(result) > 0:
|
|
return result[0]
|
|
else:
|
|
print(f"Aucun ticket trouvé avec le code {ticket_code} ou une erreur est survenue.")
|
|
return {}
|
|
|
|
def get_ticket_attachments(self, ticket_id: int) -> List[Dict[str, Any]]:
|
|
attachments = self._rpc_call("/web/dataset/call_kw", {
|
|
"model": "ir.attachment",
|
|
"method": "search_read",
|
|
"args": [[["res_id", "=", ticket_id], ["res_model", "=", self.model_name]]],
|
|
"kwargs": {"fields": ["id", "name", "datas", "mimetype"]}
|
|
})
|
|
return attachments if isinstance(attachments, list) else []
|
|
|
|
def clean_html(self, html_content: str) -> str:
|
|
if BeautifulSoup:
|
|
soup = BeautifulSoup(html_content, "html.parser")
|
|
for element in soup(['style', 'script', 'footer', 'header']):
|
|
element.extract()
|
|
text = soup.get_text(separator=' ', strip=True)
|
|
else:
|
|
text = re.sub(r'<.*?>', '', html_content)
|
|
|
|
text = unescape(text)
|
|
text = re.sub(r'\s+', ' ', text).strip()
|
|
|
|
return text
|
|
|
|
def save_json(self, data: Any, path: str):
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2, ensure_ascii=False)
|
|
|
|
def extract_ticket_data(self, ticket_id: int, output_dir: str):
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
ticket_data = self.get_ticket(ticket_id)
|
|
if not ticket_data:
|
|
print("Erreur: Ticket non trouvé.")
|
|
return None
|
|
|
|
ticket_info_path = os.path.join(output_dir, "ticket_info.json")
|
|
self.save_json(ticket_data, ticket_info_path)
|
|
|
|
attachments = self.get_ticket_attachments(ticket_id)
|
|
attachment_list = []
|
|
attachments_dir = os.path.join(output_dir, "attachments")
|
|
os.makedirs(attachments_dir, exist_ok=True)
|
|
|
|
for attachment in attachments:
|
|
if "datas" in attachment and attachment["datas"]:
|
|
file_name = f"{attachment['id']}_{attachment['name']}"
|
|
file_path = os.path.join(attachments_dir, file_name)
|
|
|
|
with open(file_path, "wb") as f:
|
|
f.write(base64.b64decode(attachment["datas"]))
|
|
|
|
attachment_list.append({
|
|
"id": attachment["id"],
|
|
"name": attachment["name"],
|
|
"file_path": file_path,
|
|
"mimetype": attachment.get("mimetype")
|
|
})
|
|
|
|
attachments_info_path = os.path.join(output_dir, "attachments_info.json")
|
|
self.save_json(attachment_list, attachments_info_path)
|
|
|
|
structure = {
|
|
"date_extraction": datetime.now().isoformat(),
|
|
"ticket_dir": output_dir,
|
|
"fichiers_json": [
|
|
"ticket_info.json",
|
|
"attachments_info.json"
|
|
]
|
|
}
|
|
structure_path = os.path.join(output_dir, "structure.json")
|
|
self.save_json(structure, structure_path)
|
|
|
|
return {
|
|
"ticket_info": ticket_info_path,
|
|
"attachments_info": attachments_info_path,
|
|
"ticket_data_file": structure_path,
|
|
}
|