mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-13 10:46:51 +01:00
150 lines
5.4 KiB
Python
150 lines
5.4 KiB
Python
from .base_llm import BaseLLM
|
|
import requests
|
|
import os
|
|
import base64
|
|
from PIL import Image
|
|
import io
|
|
from datetime import datetime
|
|
|
|
class PixtralLarge(BaseLLM):
    """Client for the Mistral ``pixtral-large-latest`` chat-completions endpoint.

    Supports plain-text prompts (through the hooks ``BaseLLM`` is presumably
    expected to call: ``urlBase``, ``cleAPI``, ``urlFonction``,
    ``_preparer_contenu``, ``_traiter_reponse`` -- confirm against
    ``base_llm``) and image + text prompts via ``interroger_avec_image``.

    ``self.modele``, ``self.prompt_system``, ``self.params`` and
    ``self.configurer`` are used here but defined outside this class
    (presumably by ``BaseLLM``).
    """

    # Environment variable consulted first for the API key.
    _ENV_CLE_API = "MISTRAL_API_KEY"

    # Images whose width or height exceeds this many pixels are downscaled
    # before upload.
    _TAILLE_MAX_IMAGE = 800

    # Extension -> MIME type, used only when the raw file bytes are sent
    # unchanged (i.e. when Pillow could not re-encode the image).
    _MIME_PAR_EXTENSION = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }

    def __init__(self):
        """Select the Pixtral Large model and apply the default sampling parameters."""
        super().__init__("pixtral-large-latest")
        self.configurer(temperature=0.2, top_p=1, n=1)

    def urlBase(self) -> str:
        """Return the base URL of the Mistral REST API (with trailing slash)."""
        return "https://api.mistral.ai/v1/"

    def cleAPI(self) -> str:
        """Return the API key sent as the bearer token.

        The ``MISTRAL_API_KEY`` environment variable takes precedence when it
        is set and non-empty; otherwise the legacy built-in key is returned so
        existing deployments keep working.
        """
        # NOTE(review): a credential committed to source control should be
        # treated as compromised. Rotate this key and remove the fallback once
        # every deployment provides MISTRAL_API_KEY.
        return os.environ.get(self._ENV_CLE_API) or "2iGzTzE9csRQ9IoASoUjplHwEjA200Vh"

    def urlFonction(self) -> str:
        """Return the endpoint path appended to ``urlBase()``."""
        return "chat/completions"

    def _preparer_contenu(self, question: str) -> dict:
        """Build the JSON payload for a text-only chat completion.

        Args:
            question: User message to send to the model.

        Returns:
            Payload containing the model name, the system and user messages,
            and every entry of ``self.params`` merged at top level.
        """
        return {
            "model": self.modele,
            "messages": [
                {"role": "system", "content": self.prompt_system},
                {"role": "user", "content": question},
            ],
            **self.params,
        }

    def _traiter_reponse(self, reponse: requests.Response) -> str:
        """Extract the text of the first choice from an API response.

        Args:
            reponse: HTTP response whose body is the chat-completions JSON.

        Returns:
            The ``content`` of the first choice's message.

        Raises:
            KeyError, IndexError: If the JSON lacks the expected structure.
                (``reponse.json()`` itself raises on a non-JSON body.)
        """
        data = reponse.json()
        return data["choices"][0]["message"]["content"]

    def _encoder_image_base64(self, image_path: str) -> str:
        """Encode an image file as a base64 ``data:`` URL for the API.

        The image is normally downscaled to fit within 800x800 pixels,
        converted to RGB and re-encoded as JPEG (quality 85). If that
        processing fails for any reason, the untouched file bytes are sent
        instead.

        Args:
            image_path: Path of the image to encode.

        Returns:
            ``data:<mime>;base64,<payload>`` where ``<mime>`` describes the
            bytes actually embedded: ``image/jpeg`` after re-encoding, or a
            type derived from the file extension (defaulting to
            ``image/jpeg``) when the raw file is used.

        Raises:
            FileNotFoundError: If ``image_path`` is not an existing file.
        """
        if not os.path.isfile(image_path):
            raise FileNotFoundError(f"L'image {image_path} n'a pas été trouvée")

        try:
            with Image.open(image_path) as img:
                max_size = self._TAILLE_MAX_IMAGE
                if img.width > max_size or img.height > max_size:
                    # thumbnail() resizes in place and keeps the aspect ratio.
                    img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

                # The JPEG encoder cannot store modes such as RGBA or P.
                if img.mode != "RGB":
                    img = img.convert("RGB")

                # Must happen while the source file is still open: a small
                # RGB image has not necessarily been loaded yet at this point.
                buffer = io.BytesIO()
                img.save(buffer, format="JPEG", quality=85)

            encoded_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
            # The payload is JPEG whatever the source extension was, so the
            # declared type must say so.
            mime_type = "image/jpeg"
        except Exception:
            # Deliberate best-effort: if Pillow cannot process the file, send
            # its bytes unchanged and describe them by extension.
            with open(image_path, "rb") as image_file:
                encoded_string = base64.b64encode(image_file.read()).decode("utf-8")
            file_extension = os.path.splitext(image_path)[1].lower()
            mime_type = self._MIME_PAR_EXTENSION.get(file_extension, "image/jpeg")

        return f"data:{mime_type};base64,{encoded_string}"

    def interroger_avec_image(self, image_path: str, question: str) -> str:
        """Ask the Pixtral model a question about an image.

        Never raises: every failure is reported through the return value and
        ``self.reponseErreur``. Also updates ``self.heureDepart``,
        ``self.heureFin`` and ``self.dureeTraitement``.

        Args:
            image_path: Path of the image to analyse.
            question: Question or instructions for the analysis.

        Returns:
            The model's answer on HTTP 200/201, otherwise an error message
            string describing the API error or the exception.
        """
        url = self.urlBase() + self.urlFonction()
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.cleAPI()}",
        }

        # Fresh baseline so that a failure before the request is sent (for
        # example a missing image) is not timed against a previous call.
        self.heureDepart = datetime.now()

        try:
            encoded_image = self._encoder_image_base64(image_path)

            contenu = {
                "model": self.modele,
                "messages": [
                    {"role": "system", "content": self.prompt_system},
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": question},
                            {"type": "image_url", "image_url": {"url": encoded_image}},
                        ],
                    },
                ],
                **self.params,
            }

            # Restart the clock so that, on the normal path, the measured
            # duration covers only the HTTP round-trip.
            self.heureDepart = datetime.now()

            # Longer timeout than usual: image requests are slow.
            response = requests.post(url=url, headers=headers, json=contenu, timeout=180)

            self.heureFin = datetime.now()
            self.dureeTraitement = self.heureFin - self.heureDepart

            if response.status_code in (200, 201):
                self.reponseErreur = False
                return self._traiter_reponse(response)

            self.reponseErreur = True
            return f"Erreur API ({response.status_code}): {response.text}"

        except Exception as e:
            self.heureFin = datetime.now()
            self.dureeTraitement = self.heureFin - self.heureDepart
            self.reponseErreur = True
            return f"Erreur lors de l'analyse de l'image: {str(e)}"
|