This commit is contained in:
Ladebeze66 2025-04-03 15:40:07 +02:00
parent 3bf32de480
commit 63144d70d2
1489 changed files with 14447 additions and 4559 deletions

109
README.md
View File

@ -1,89 +1,80 @@
# Gestionnaire de Tickets Odoo
# Système d'extraction de tickets Odoo
Cet outil permet d'extraire des tickets de support depuis Odoo et de les sauvegarder localement.
## Prérequis
- Python 3.6+
- Accès à une instance Odoo
- Fichier de configuration avec les identifiants
Ce projet permet d'extraire les informations des tickets Odoo (tâches, tickets de support) avec leurs messages et pièces jointes, et de les sauvegarder dans une structure organisée.
## Installation
Aucune installation particulière n'est requise. Clonez simplement ce dépôt et assurez-vous que les dépendances Python sont installées :
```bash
pip install requests
```
1. Clonez le dépôt
2. Créez un environnement virtuel :
```bash
python3 -m venv venv
source venv/bin/activate # Sur Linux/Mac
# ou
venv\Scripts\activate # Sur Windows
```
3. Installez les dépendances :
```bash
pip install -r requirements.txt
```
## Configuration
Le fichier `config.json` à la racine du projet doit contenir les informations de connexion Odoo :
Créez un fichier `config.json` basé sur le modèle `config.template.json` :
```json
{
"odoo": {
"url": "https://votre-instance-odoo.com",
"db": "nom_de_la_base",
"username": "utilisateur@example.com",
"api_key": "votre_clé_api_ou_mot_de_passe"
},
"output_dir": "output"
"odoo_url": "https://votre-instance.odoo.com",
"odoo_db": "nom_de_la_base",
"odoo_username": "votre_email@exemple.com",
"odoo_api_key": "votre_clé_api_odoo",
"output_dir": "ticket_structure"
}
```
## Utilisation
### Extraire un ticket
Pour extraire un ticket en utilisant son code, utilisez la commande :
Pour extraire un ticket, utilisez la commande :
```bash
./retrieve_ticket.py T0123
python -m utils.retrieve_ticket CODE_TICKET
```
`T0123` est le code du ticket à extraire.
Options disponibles :
- `--output`, `-o` : Répertoire de sortie (défaut: "ticket_structure")
- `--config`, `-c` : Chemin vers le fichier de configuration (défaut: "config.json")
- `--verbose`, `-v` : Activer le mode verbeux
- `--config` : Spécifier un fichier de configuration alternatif (par défaut: `config.json`)
- `--output-dir` : Spécifier un répertoire de sortie (par défaut: `output/ticket_CODE`)
- `--verbose` ou `-v` : Afficher plus d'informations pendant l'exécution
- `--keep-html` : Conserver le contenu HTML original dans les messages (désactivé par défaut)
- `--no-original` : Ne pas conserver le corps de message HTML original (désactivé par défaut)
- `--keep-all` : Conserver tous les messages, y compris ceux d'OdooBot et les messages vides (désactivé par défaut)
### Exemples
Extraire un ticket avec affichage détaillé :
Exemple :
```bash
./retrieve_ticket.py T0167 --verbose
python -m utils.retrieve_ticket T1234 --output mes_tickets --verbose
```
Extraire un ticket en conservant le contenu HTML :
```bash
./retrieve_ticket.py T0167 --keep-html
## Structure des fichiers générés
Pour chaque ticket extrait, un répertoire est créé avec la structure suivante :
```
CODE_TICKET_DATE/
├── all_messages.json # Messages traités au format JSON
├── all_messages.txt # Messages au format texte
├── attachments/ # Répertoire contenant les pièces jointes
├── attachments_info.json # Métadonnées des pièces jointes
├── extraction_summary.json # Résumé de l'extraction
├── messages_raw.json # Messages bruts
├── structure.json # Structure du répertoire
├── ticket_info.json # Données complètes du ticket
└── ticket_summary.json # Résumé du ticket
```
Extraire un ticket avec nettoyage des balises HTML sans conserver l'original :
```bash
./retrieve_ticket.py T0167 --no-original
```
## Gestionnaires disponibles
Extraire un ticket en conservant tous les messages (y compris OdooBot) :
```bash
./retrieve_ticket.py T0167 --keep-all
```
Le système est divisé en plusieurs gestionnaires :
## Structure des données extraites
- `AuthManager` : Gère l'authentification et les appels à l'API Odoo
- `TicketManager` : Gère la récupération des tickets et organise leur extraction
- `MessageManager` : Gère le traitement des messages (filtrage, nettoyage)
- `AttachmentManager` : Gère le téléchargement des pièces jointes
Pour chaque ticket extrait, le script crée un dossier contenant :
## Licence
- `ticket_info.json` : Informations générales sur le ticket
- `messages.json` : Messages associés au ticket
- Par défaut, le contenu HTML est nettoyé, les messages d'OdooBot sont supprimés et le texte original est conservé dans `body_original`
- Avec `--keep-html`, le contenu HTML est conservé tel quel
- Avec `--no-original`, seule la version nettoyée est conservée
- Avec `--keep-all`, tous les messages sont conservés (y compris OdooBot et messages vides)
- `attachments_info.json` : Métadonnées des pièces jointes
- `attachments/` : Dossier contenant les fichiers des pièces jointes
Ce projet est sous licence MIT.

View File

@ -9,4 +9,4 @@
"api_key": "your_mistral_api_key"
},
"output_dir": "output"
}
}

7
config.template.json Normal file
View File

@ -0,0 +1,7 @@
{
"odoo_url": "https://exemple.odoo.com",
"odoo_db": "exemple_db",
"odoo_username": "utilisateur@exemple.com",
"odoo_api_key": "votre_clé_api",
"output_dir": "ticket_structure"
}

View File

@ -1,3 +0,0 @@
Original Author
---------------
Sébastien Alix <sebastien.alix@osiell.com>, <seb@usr-src.org>

View File

@ -1,165 +0,0 @@
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.

View File

@ -1,163 +0,0 @@
Metadata-Version: 2.1
Name: OdooRPC
Version: 0.10.1
Summary: OdooRPC is a Python package providing an easy way to pilot your Odoo servers through RPC.
Home-page: https://github.com/OCA/odoorpc
Author: Sebastien Alix
Author-email: seb@usr-src.org
License: LGPL v3
Keywords: openerp odoo server rpc client xml-rpc xmlrpc jsonrpc json-rpc odoorpc oerplib communication lib library python service web webservice
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Framework :: Odoo
Description-Content-Type: text/x-rst
License-File: LICENSE
License-File: AUTHORS
=======
OdooRPC
=======
.. image:: https://img.shields.io/pypi/v/OdooRPC.svg
:target: https://pypi.python.org/pypi/OdooRPC/
:alt: Latest Version
.. image:: https://travis-ci.org/OCA/odoorpc.svg?branch=master
:target: https://travis-ci.org/OCA/odoorpc
:alt: Build Status
.. image:: https://img.shields.io/pypi/pyversions/OdooRPC.svg
:target: https://pypi.python.org/pypi/OdooRPC/
:alt: Supported Python versions
.. image:: https://img.shields.io/pypi/l/OdooRPC.svg
:target: https://pypi.python.org/pypi/OdooRPC/
:alt: License
**OdooRPC** is a Python package providing an easy way to
pilot your **Odoo** servers through `RPC`.
Features supported:
- access to all data model methods (even ``browse``) with an API similar
to the server-side API,
- use named parameters with model methods,
- user context automatically sent providing support for
internationalization,
- browse records,
- execute workflows,
- manage databases,
- reports downloading,
- JSON-RPC protocol (SSL supported),
How does it work? See below:
.. code-block:: python
import odoorpc
# Prepare the connection to the server
odoo = odoorpc.ODOO('localhost', port=8069)
# Check available databases
print(odoo.db.list())
# Login
odoo.login('db_name', 'user', 'passwd')
# Current user
user = odoo.env.user
print(user.name) # name of the user connected
print(user.company_id.name) # the name of its company
# Simple 'raw' query
user_data = odoo.execute('res.users', 'read', [user.id])
print(user_data)
# Use all methods of a model
if 'sale.order' in odoo.env:
Order = odoo.env['sale.order']
order_ids = Order.search([])
for order in Order.browse(order_ids):
print(order.name)
products = [line.product_id.name for line in order.order_line]
print(products)
# Update data through a record
user.name = "Brian Jones"
See the documentation for more details and features.
Supported Odoo server versions
==============================
`OdooRPC` is tested on all major releases of `Odoo` (starting from 8.0).
Supported Python versions
=========================
`OdooRPC` support Python 2.7, 3.7+.
License
=======
This software is made available under the `LGPL v3` license.
Generate the documentation
==========================
To generate the documentation, you have to install `Sphinx` documentation
generator::
pip install sphinx
Then, you can use the ``build_doc`` option of the ``setup.py``::
python setup.py build_doc
The generated documentation will be in the ``./doc/build/html`` directory.
Changes in this version
=======================
Consult the ``CHANGELOG`` file.
Bug Tracker
===========
Bugs are tracked on `GitHub Issues
<https://github.com/OCA/odoorpc/issues>`_. In case of trouble, please
check there if your issue has already been reported. If you spotted it first,
help us smash it by providing detailed and welcomed feedback.
Credits
=======
Contributors
------------
* Sébastien Alix <sebastien.alix@osiell.com>
Do not contact contributors directly about support or help with technical issues.
Maintainer
----------
.. image:: https://odoo-community.org/logo.png
:alt: Odoo Community Association
:target: https://odoo-community.org
This package is maintained by the OCA.
OCA, or the Odoo Community Association, is a nonprofit organization whose
mission is to support the collaborative development of Odoo features and
promote its widespread use.

View File

@ -1,34 +0,0 @@
OdooRPC-0.10.1.dist-info/AUTHORS,sha256=Kjdl6zj2iQulcwF4iADsfzyuusIPWLKsRK9rM2Bh4TY,95
OdooRPC-0.10.1.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
OdooRPC-0.10.1.dist-info/LICENSE,sha256=2n6rt7r999OuXp8iOqW9we7ORaxWncIbOwN1ILRGR2g,7651
OdooRPC-0.10.1.dist-info/METADATA,sha256=UuFVcRgJiOT8MOZ9sREZ4ebCik2JUuM8yckCO1HP9so,4803
OdooRPC-0.10.1.dist-info/RECORD,,
OdooRPC-0.10.1.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
OdooRPC-0.10.1.dist-info/WHEEL,sha256=m9WAupmBd2JGDsXWQGJgMGXIWbQY3F5c2xBJbBhq0nY,110
OdooRPC-0.10.1.dist-info/top_level.txt,sha256=qdAy2XwLvCFM_VdG79vIgP3UV43zLZmvNqbdk4L1b6E,8
odoorpc/__init__.py,sha256=Zk5CzvWtqvlGWNupLWb8OJQh69KdB9Mv8wfnmfTHVf8,2495
odoorpc/__pycache__/__init__.cpython-312.pyc,,
odoorpc/__pycache__/db.cpython-312.pyc,,
odoorpc/__pycache__/env.cpython-312.pyc,,
odoorpc/__pycache__/error.cpython-312.pyc,,
odoorpc/__pycache__/fields.cpython-312.pyc,,
odoorpc/__pycache__/models.cpython-312.pyc,,
odoorpc/__pycache__/odoo.cpython-312.pyc,,
odoorpc/__pycache__/report.cpython-312.pyc,,
odoorpc/__pycache__/session.cpython-312.pyc,,
odoorpc/__pycache__/tools.cpython-312.pyc,,
odoorpc/db.py,sha256=cBZzZvnNc5lBC-InKFfRGTBH4psG5mZJ8UOl0GDXt9k,10178
odoorpc/env.py,sha256=ncP9TnvCwtrD4aHcsv4rSeMXaXTUNajUgYAwQeAWXwQ,10119
odoorpc/error.py,sha256=QkGjqv5Y0aHxvtuV7oRiFbNhAXz8AK1srmMRLIc0gfU,3284
odoorpc/fields.py,sha256=Kf5af_m0TDz0k4lKFJLv75YUsu8ClwUOcsKWbTv8EHU,27004
odoorpc/models.py,sha256=4gsHOcqp8vhN4N9U66B5cnleSbf5gO93gqn7jEZN7Lc,15034
odoorpc/odoo.py,sha256=UQWQCJppn05XDOgpAdMRKXZEHH6Dv-LkFd6heJaAZ1w,22740
odoorpc/report.py,sha256=zF_XJDNyDmRDiMVjjQZtgnTBg4iFZZakrw6nUvE8U5k,7396
odoorpc/rpc/__init__.py,sha256=DFNJYDtwlCHo1d6xBAKV4bXziVoBJLJ8b-Bu85xIgvs,9465
odoorpc/rpc/__pycache__/__init__.cpython-312.pyc,,
odoorpc/rpc/__pycache__/error.cpython-312.pyc,,
odoorpc/rpc/__pycache__/jsonrpclib.cpython-312.pyc,,
odoorpc/rpc/error.py,sha256=LOb2kvZmXNGy5ZWw6W6UKWvF75YqmcVvL017budrnts,349
odoorpc/rpc/jsonrpclib.py,sha256=oY0eChMXUinC5YFjUcUO5ZWqt4ar9Dq2X0TJiFnpGb0,5342
odoorpc/session.py,sha256=YXGVVTKCZMzGCwxoGGeo_XDO04JK2rojrji7o9TuWC8,5567
odoorpc/tools.py,sha256=yYvMIreEDgZKSoQhZYD6W4xZpY2XppbTnttqHMR1i2w,3539

View File

@ -1,45 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""The `odoorpc` module defines the :class:`ODOO` class.
The :class:`ODOO` class is the entry point to manage `Odoo` servers.
You can use this one to write `Python` programs that performs a variety of
automated jobs that communicate with a `Odoo` server.
Here's a sample session using this module::
>>> import odoorpc
>>> odoo = odoorpc.ODOO('localhost', port=8069) # connect to localhost, default port
>>> odoo.login('dbname', 'admin', 'admin')
To catch debug logs of OdooRPC from your own code, you have to configure
a logger the way you want with a log level set to `DEBUG`::
>>> import logging
>>> logging.basicConfig()
>>> logger = logging.getLogger('odoorpc')
>>> logger.setLevel(logging.DEBUG)
Then all queries generated by OdooRPC will be logged::
>>> import odoorpc
>>> odoo = odoorpc.ODOO()
>>> odoo.login('dbname', 'admin', 'admin')
DEBUG:odoorpc.rpc.jsonrpclib:(JSON,send) http://localhost:8069/web/session/authenticate {'jsonrpc': '2.0', 'id': 499807971, 'method': 'call', 'params': {'db': 'dbname', 'login': 'admin', 'password': '**********'}}
DEBUG:odoorpc.rpc.jsonrpclib:(JSON,recv) http://localhost:8069/web/session/authenticate {'jsonrpc': '2.0', 'id': 499807971, 'method': 'call', 'params': {'db': 'dbname', 'login': 'admin', 'password': '**********'}} => {'result': {'is_admin': True, 'server_version': '12.0-20181008', 'currencies': {'2': {'digits': [69, 2], 'position': 'before', 'symbol': '$'}, '1': {'digits': [69, 2], 'position': 'after', 'symbol': ''}}, 'partner_display_name': 'YourCompany, Mitchell Admin', 'company_id': 1, 'username': 'admin', 'web_tours': [], 'user_companies': False, 'session_id': '61cb37d21771531f789bea631a03236aa21f06d4', 'is_system': True, 'server_version_info': [12, 0, 0, 'final', 0, ''], 'db': 'odoorpc_v12', 'name': 'Mitchell Admin', 'web.base.url': 'http://localhost:8069', 'user_context': {'lang': 'fr_FR', 'tz': 'Europe/Brussels', 'uid': 2}, 'odoobot_initialized': True, 'show_effect': 'True', 'partner_id': 3, 'uid': 2}, 'id': 499807971, 'jsonrpc': '2.0'}
"""
__author__ = 'ABF Osiell - Sebastien Alix'
__email__ = 'sebastien.alix@osiell.com'
__licence__ = 'LGPL v3'
__version__ = '0.10.1'
__all__ = ['ODOO', 'error']
import logging
from odoorpc import error
from odoorpc.odoo import ODOO
logging.getLogger(__name__).addHandler(logging.NullHandler())

View File

@ -1,316 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""Provide the :class:`DB` class to manage the server databases."""
import base64
import io
import sys
from odoorpc import error
from odoorpc.tools import v
# Python 2
if sys.version_info[0] < 3:
def encode2bytes(data):
return data
# Python >= 3
else:
def encode2bytes(data):
return bytes(data, 'ascii')
class DB(object):
"""The `DB` class represents the database management service.
It provides functionalities such as list, create, drop, dump
and restore databases.
.. note::
This service have to be used through the :attr:`odoorpc.ODOO.db`
property.
>>> import odoorpc
>>> odoo = odoorpc.ODOO('localhost') # doctest: +SKIP
>>> odoo.db
<odoorpc.db.DB object at 0x...>
"""
def __init__(self, odoo):
self._odoo = odoo
def dump(self, password, db, format_='zip'):
"""Backup the `db` database. Returns the dump as a binary ZIP file
containing the SQL dump file alongside the filestore directory (if any).
>>> dump = odoo.db.dump('super_admin_passwd', 'prod') # doctest: +SKIP
.. doctest::
:hide:
>>> dump = odoo.db.dump(SUPER_PWD, DB)
If you get a timeout error, increase this one before performing the
request:
>>> timeout_backup = odoo.config['timeout']
>>> odoo.config['timeout'] = 600 # Timeout set to 10 minutes
>>> dump = odoo.db.dump('super_admin_passwd', 'prod') # doctest: +SKIP
>>> odoo.config['timeout'] = timeout_backup
Write it on the file system:
.. doctest::
:options: +SKIP
>>> with open('dump.zip', 'wb') as dump_zip:
... dump_zip.write(dump.read())
...
.. doctest::
:hide:
>>> with open('dump.zip', 'wb') as dump_zip:
... fileno = dump_zip.write(dump.read()) # Python 3
...
You can manipulate the file with the `zipfile` module for instance:
.. doctest::
:options: +SKIP
>>> import zipfile
>>> zipfile.ZipFile('dump.zip').namelist()
['dump.sql',
'filestore/ef/ef2c882a36dbe90fc1e7e28d816ad1ac1464cfbb',
'filestore/dc/dcf00aacce882bbfd117c0277e514f829b4c5bf0',
...]
.. doctest::
:hide:
>>> import zipfile
>>> zipfile.ZipFile('dump.zip').namelist() # doctest: +NORMALIZE_WHITESPACE
['dump.sql'...'filestore/...'...]
The super administrator password is required to perform this method.
*Python 2:*
:return: `io.BytesIO`
:raise: :class:`odoorpc.error.RPCError` (access denied / wrong database)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: `io.BytesIO`
:raise: :class:`odoorpc.error.RPCError` (access denied / wrong database)
:raise: `urllib.error.URLError` (connection error)
"""
args = [password, db]
if v(self._odoo.version)[0] >= 9:
args.append(format_)
data = self._odoo.json(
'/jsonrpc', {'service': 'db', 'method': 'dump', 'args': args}
)
# Encode to bytes forced to be compatible with Python 3.2
# (its 'base64.standard_b64decode()' function only accepts bytes)
result = encode2bytes(data['result'])
content = base64.standard_b64decode(result)
return io.BytesIO(content)
def change_password(self, password, new_password):
"""Change the administrator password by `new_password`.
>>> odoo.db.change_password('super_admin_passwd', 'new_admin_passwd') # doctest: +SKIP
.. doctest:
:hide:
>>> odoo.db.change_password(SUPER_PWD, 'new_admin_passwd')
>>> odoo.db.change_password('new_admin_passwd', SUPER_PWD)
The super administrator password is required to perform this method.
*Python 2:*
:raise: :class:`odoorpc.error.RPCError` (access denied)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:raise: :class:`odoorpc.error.RPCError` (access denied)
:raise: `urllib.error.URLError` (connection error)
"""
self._odoo.json(
'/jsonrpc',
{
'service': 'db',
'method': 'change_admin_password',
'args': [password, new_password],
},
)
def create(
self, password, db, demo=False, lang='en_US', admin_password='admin'
):
"""Request the server to create a new database named `db`
which will have `admin_password` as administrator password and
localized with the `lang` parameter.
You have to set the flag `demo` to `True` in order to insert
demonstration data.
>>> odoo.db.create('super_admin_passwd', 'prod', False, 'fr_FR', 'my_admin_passwd') # doctest: +SKIP
If you get a timeout error, increase this one before performing the
request:
>>> timeout_backup = odoo.config['timeout']
>>> odoo.config['timeout'] = 600 # Timeout set to 10 minutes
>>> odoo.db.create('super_admin_passwd', 'prod', False, 'fr_FR', 'my_admin_passwd') # doctest: +SKIP
>>> odoo.config['timeout'] = timeout_backup
The super administrator password is required to perform this method.
*Python 2:*
:raise: :class:`odoorpc.error.RPCError` (access denied)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:raise: :class:`odoorpc.error.RPCError` (access denied)
:raise: `urllib.error.URLError` (connection error)
"""
self._odoo.json(
'/jsonrpc',
{
'service': 'db',
'method': 'create_database',
'args': [password, db, demo, lang, admin_password],
},
)
def drop(self, password, db):
"""Drop the `db` database. Returns `True` if the database was removed,
`False` otherwise (database did not exist):
>>> odoo.db.drop('super_admin_passwd', 'test') # doctest: +SKIP
True
The super administrator password is required to perform this method.
*Python 2:*
:return: `True` or `False`
:raise: :class:`odoorpc.error.RPCError` (access denied)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: `True` or `False`
:raise: :class:`odoorpc.error.RPCError` (access denied)
:raise: `urllib.error.URLError` (connection error)
"""
if self._odoo._env and self._odoo._env.db == db:
# Remove the existing session to avoid HTTP session error
self._odoo.logout()
data = self._odoo.json(
'/jsonrpc',
{'service': 'db', 'method': 'drop', 'args': [password, db]},
)
return data['result']
def duplicate(self, password, db, new_db):
"""Duplicate `db' as `new_db`.
>>> odoo.db.duplicate('super_admin_passwd', 'prod', 'test') # doctest: +SKIP
The super administrator password is required to perform this method.
*Python 2:*
:raise: :class:`odoorpc.error.RPCError` (access denied / wrong database)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:raise: :class:`odoorpc.error.RPCError` (access denied / wrong database)
:raise: `urllib.error.URLError` (connection error)
"""
self._odoo.json(
'/jsonrpc',
{
'service': 'db',
'method': 'duplicate_database',
'args': [password, db, new_db],
},
)
def list(self):
"""Return the list of the databases:
>>> odoo.db.list() # doctest: +SKIP
['prod', 'test']
*Python 2:*
:return: `list` of database names
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: `list` of database names
:raise: `urllib.error.URLError` (connection error)
"""
data = self._odoo.json(
'/jsonrpc', {'service': 'db', 'method': 'list', 'args': []}
)
return data.get('result', [])
def restore(self, password, db, dump, copy=False):
"""Restore the `dump` database into the new `db` database.
The `dump` file object can be obtained with the
:func:`dump <DB.dump>` method.
If `copy` is set to `True`, the restored database will have a new UUID.
>>> odoo.db.restore('super_admin_passwd', 'test', dump_file) # doctest: +SKIP
If you get a timeout error, increase this one before performing the
request:
>>> timeout_backup = odoo.config['timeout']
>>> odoo.config['timeout'] = 7200 # Timeout set to 2 hours
>>> odoo.db.restore('super_admin_passwd', 'test', dump_file) # doctest: +SKIP
>>> odoo.config['timeout'] = timeout_backup
The super administrator password is required to perform this method.
*Python 2:*
:raise: :class:`odoorpc.error.RPCError`
(access denied / database already exists)
:raise: :class:`odoorpc.error.InternalError` (dump file closed)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:raise: :class:`odoorpc.error.RPCError`
(access denied / database already exists)
:raise: :class:`odoorpc.error.InternalError` (dump file closed)
:raise: `urllib.error.URLError` (connection error)
"""
if dump.closed:
raise error.InternalError("Dump file closed")
b64_data = base64.standard_b64encode(dump.read()).decode()
self._odoo.json(
'/jsonrpc',
{
'service': 'db',
'method': 'restore',
'args': [password, db, b64_data, copy],
},
)

View File

@ -1,328 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""Supply the :class:`Environment` class to manage records more efficiently."""
import sys
import weakref
from odoorpc import fields
from odoorpc.models import Model
from odoorpc.tools import v
FIELDS_RESERVED = ['id', 'ids', '__odoo__', '__osv__', '__data__', 'env']
class Environment(object):
"""An environment wraps data like the user ID, context or current database
name, and provides an access to data model proxies.
.. doctest::
:options: +SKIP
>>> import odoorpc
>>> odoo = odoorpc.ODOO('localhost')
>>> odoo.login('db_name', 'admin', 'password')
>>> odoo.env
Environment(db='db_name', uid=1, context={'lang': 'fr_FR', 'tz': 'Europe/Brussels', 'uid': 1})
.. doctest::
:hide:
>>> odoo.env
Environment(db=..., uid=..., context=...)
"""
def __init__(self, odoo, db, uid, context):
self._odoo = odoo
self._db = db
self._uid = uid
self._context = context
self._registry = {}
self._dirty = weakref.WeakSet() # set of records updated locally
def __repr__(self):
return "Environment(db={}, uid={}, context={})".format(
repr(self._db), self._uid, self._context
)
@property
def dirty(self):
"""
.. warning::
This property is used internally and should not be used directly.
As such, it should not be referenced in the user documentation.
List records having local changes.
These changes can be committed to the server with the :func:`commit`
method, or invalidated with :func:`invalidate`.
"""
return self._dirty
@property
def context(self):
"""The context of the user connected.
.. doctest::
:options: +SKIP
>>> odoo.env.context
{'lang': 'en_US', 'tz': 'Europe/Brussels', 'uid': 2}
.. doctest::
:hide:
>>> from pprint import pprint as pp
>>> pp(odoo.env.context)
{'lang': 'en_US', 'tz': 'Europe/Brussels', 'uid': ...}
"""
return self._context
@property
def db(self):
"""The database currently used.
.. doctest::
:options: +SKIP
>>> odoo.env.db
'db_name'
.. doctest::
:hide:
>>> odoo.env.db == DB
True
"""
return self._db
def commit(self):
"""Commit dirty records to the server. This method is automatically
called when the `auto_commit` option is set to `True` (default).
It can be useful to set the former option to `False` to get better
performance by reducing the number of RPC requests generated.
With `auto_commit` set to `True` (default behaviour), each time a value
is set on a record field a RPC request is sent to the server to update
the record:
.. doctest::
>>> user = odoo.env.user
>>> user.name = "Joe" # write({'name': "Joe"})
>>> user.email = "joe@odoo.net" # write({'email': "joe@odoo.net"})
With `auto_commit` set to `False`, changes on a record are sent all at
once when calling the :func:`commit` method:
.. doctest::
>>> odoo.config['auto_commit'] = False
>>> user = odoo.env.user
>>> user.name = "Joe"
>>> user.email = "joe@odoo.net"
>>> user in odoo.env.dirty
True
>>> odoo.env.commit() # write({'name': "Joe", 'email': "joe@odoo.net"})
>>> user in odoo.env.dirty
False
Only one RPC request is generated in the last case.
"""
# Iterate on a new set, as we remove record during iteration from the
# original one
for record in set(self.dirty):
values = {}
for field in record._values_to_write:
if record.id in record._values_to_write[field]:
value = record._values_to_write[field].pop(record.id)
values[field] = value
# Store the value in the '_values' dictionary. This
# operation is delegated to each field descriptor as some
# values can not be stored "as is" (e.g. magic tuples of
# 2many fields need to be converted)
record.__class__.__dict__[field].store(record, value)
record.write(values)
self.dirty.remove(record)
def invalidate(self):
"""Invalidate the cache of records."""
self.dirty.clear()
@property
def lang(self):
"""Return the current language code.
.. doctest::
>>> odoo.env.lang
'en_US'
"""
return self.context.get('lang', False)
def ref(self, xml_id):
"""Return the record corresponding to the given `xml_id` (also called
external ID).
Raise an :class:`RPCError <odoorpc.error.RPCError>` if no record
is found.
.. doctest::
>>> odoo.env.ref('base.lang_en')
Recordset('res.lang', [1])
:return: a :class:`odoorpc.models.Model` instance (recordset)
:raise: :class:`odoorpc.error.RPCError`
"""
if v(self._odoo.version)[0] < 15:
model, id_ = self._odoo.execute(
"ir.model.data", "xmlid_to_res_model_res_id", xml_id, True
)
module, name = xml_id.split(".", 1)
model, id_ = self._odoo.execute(
"ir.model.data", "check_object_reference", module, name, True
)
return self[model].browse(id_)
@property
def uid(self):
"""The user ID currently logged.
.. doctest::
:options: +SKIP
>>> odoo.env.uid
1
.. doctest::
:hide:
>>> odoo.env.uid in [1, 2]
True
"""
return self._uid
@property
def user(self):
"""Return the current user (as a record).
.. doctest::
:options: +SKIP
>>> user = odoo.env.user
>>> user
Recordset('res.users', [2])
>>> user.name
'Mitchell Admin'
.. doctest::
:hide:
>>> user = odoo.env.user
>>> user.id in [1, 2]
True
>>> 'Admin' in user.name
True
:return: a :class:`odoorpc.models.Model` instance
:raise: :class:`odoorpc.error.RPCError`
"""
return self['res.users'].browse(self.uid)
@property
def registry(self):
"""The data model registry. It is a mapping between a model name and
its corresponding proxy used to generate records.
As soon as a model is needed the proxy is added to the registry. This
way the model proxy is ready for a further use (avoiding costly `RPC`
queries when browsing records through relations).
.. doctest::
:hide:
>>> odoo.env.registry.clear()
>>> odoo.env.registry
{}
>>> odoo.env.user.company_id.name # 'res.users' and 'res.company' Model proxies will be fetched
'YourCompany'
>>> from pprint import pprint
>>> pprint(odoo.env.registry)
{'res.company': Model('res.company'), 'res.users': Model('res.users')}
If you need to regenerate the model proxy, simply delete it from the
registry:
>>> del odoo.env.registry['res.company']
To delete all model proxies:
>>> odoo.env.registry.clear()
>>> odoo.env.registry
{}
"""
return self._registry
def __getitem__(self, model):
"""Return the model class corresponding to `model`.
>>> Partner = odoo.env['res.partner']
>>> Partner
Model('res.partner')
:return: a :class:`odoorpc.models.Model` class
"""
if model not in self.registry:
# self.registry[model] = Model(self._odoo, self, model)
self.registry[model] = self._create_model_class(model)
return self.registry[model]
def __call__(self, context=None):
"""Return an environment based on `self` with a different
user context.
"""
context = self.context if context is None else context
env = Environment(self._odoo, self._db, self._uid, context)
env._dirty = self._dirty
env._registry = self._registry
return env
def __contains__(self, model):
"""Check if the given `model` exists on the server.
>>> 'res.partner' in odoo.env
True
:return: `True` or `False`
"""
model_exists = self._odoo.execute(
'ir.model', 'search', [('model', '=', model)]
)
return bool(model_exists)
def _create_model_class(self, model):
"""Generate the model proxy class.
:return: a :class:`odoorpc.models.Model` class
"""
cls_name = model.replace('.', '_')
# Hack for Python 2 (no need to do this for Python 3)
if sys.version_info[0] < 3:
# noqa: F821
if isinstance(cls_name, unicode): # noqa: F821
cls_name = cls_name.encode('utf-8')
# Retrieve server fields info and generate corresponding local fields
attrs = {
'_env': self,
'_odoo': self._odoo,
'_name': model,
'_columns': {},
}
fields_get = self._odoo.execute(model, 'fields_get')
for field_name, field_data in fields_get.items():
if field_name not in FIELDS_RESERVED:
Field = fields.generate_field(field_name, field_data)
attrs['_columns'][field_name] = Field
attrs[field_name] = Field
return type(cls_name, (Model,), attrs)

View File

@ -1,96 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""This module contains all exceptions raised by `OdooRPC` when an error
occurred.
"""
import sys
class Error(Exception):
"""Base class for exception."""
pass
class RPCError(Error):
"""Exception raised for errors related to RPC queries.
Error details (like the `Odoo` server traceback) are available through the
`info` attribute:
.. doctest::
:options: +SKIP
>>> from pprint import pprint as pp
>>> try:
... odoo.execute('res.users', 'wrong_method')
... except odoorpc.error.RPCError as exc:
... pp(exc.info)
...
{'code': 200,
'data': {'arguments': ["type object 'res.users' has no attribute 'wrong_method'"],
'debug': 'Traceback (most recent call last):\\n File ...',
'exception_type': 'internal_error',
'message': "'res.users' object has no attribute 'wrong_method'",
'name': 'exceptions.AttributeError'}
'message': 'Odoo Server Error'}
.. doctest::
:hide:
>>> from pprint import pprint as pp
>>> try:
... odoo.execute('res.users', 'wrong_method')
... except odoorpc.error.RPCError as exc:
... exc.info['code'] == 200
... 'message' in exc.info
... exc.info['data']['arguments'] in [
... ["'res.users' object has no attribute 'wrong_method'"], # >= 8.0
... ["type object 'res.users' has no attribute 'wrong_method'"], # >= 10.0
... ]
... exc.info['data']['debug'].startswith('Traceback (most recent call last):\\n File')
... exc.info['data']['message'] in [
... "'res.users' object has no attribute 'wrong_method'", # >= 8.0
... "type object 'res.users' has no attribute 'wrong_method'", # >= 10.0
... ]
... exc.info['data']['name'] in [
... 'exceptions.AttributeError',
... 'builtins.AttributeError',
... ]
...
True
True
True
True
True
True
"""
def __init__(self, message, info=False):
# Ensure that the message is in unicode,
# to be compatible both with Python2 and 3
try:
message = message.decode('utf-8')
except (UnicodeEncodeError, AttributeError):
pass
super(Error, self).__init__(message, info)
self.info = info
def __str__(self):
# args[0] should always be a unicode object (see '__init__(...)')
if sys.version_info[0] < 3 and self.args and self.args[0]:
return self.args[0].encode('utf-8')
return self.args and self.args[0] or ''
def __unicode__(self):
# args[0] should always be a unicode object (see '__init__(...)')
return self.args and self.args[0] or u''
def __repr__(self):
return "{}({})".format(self.__class__.__name__, repr(self.args[0]))
class InternalError(Error):
"""Exception raised for errors occurring during an internal operation."""
pass

View File

@ -1,764 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""This module contains classes representing the fields supported by Odoo.
A field is a Python descriptor which defines getter/setter methods for
its related attribute.
"""
import datetime
import sys
# from odoorpc import error
from odoorpc.models import IncrementalRecords, Model
def is_int(value):
"""Return `True` if ``value`` is an integer."""
if isinstance(value, bool):
return False
try:
int(value)
return True
except (ValueError, TypeError):
return False
# Python 2
if sys.version_info[0] < 3:
def is_string(value):
"""Return `True` if ``value`` is a string."""
# noqa: F821
return isinstance(value, basestring) # noqa: F821
# Python >= 3
else:
def is_string(value):
"""Return `True` if ``value`` is a string."""
return isinstance(value, str)
def odoo_tuple_in(iterable):
"""Return `True` if `iterable` contains an expected tuple like
``(6, 0, IDS)`` (and so on).
>>> odoo_tuple_in([0, 1, 2]) # Simple list
False
>>> odoo_tuple_in([(6, 0, [42])]) # List of tuples
True
>>> odoo_tuple_in([[1, 42]]) # List of lists
True
"""
if not iterable:
return False
def is_odoo_tuple(elt):
"""Return `True` if `elt` is a Odoo special tuple."""
try:
return elt[:1][0] in [1, 2, 3, 4, 5] or elt[:2] in [
(6, 0),
[6, 0],
(0, 0),
[0, 0],
]
except (TypeError, IndexError):
return False
return any(is_odoo_tuple(elt) for elt in iterable)
def tuples2ids(tuples, ids):
"""Update `ids` according to `tuples`, e.g. (3, 0, X), (4, 0, X)..."""
for value in tuples:
if value[0] == 6 and value[2]:
ids = value[2]
elif value[0] == 5:
ids[:] = []
elif value[0] == 4 and value[1] and value[1] not in ids:
ids.append(value[1])
elif value[0] == 3 and value[1] and value[1] in ids:
ids.remove(value[1])
return ids
def records2ids(iterable):
"""Replace records contained in `iterable` with their corresponding IDs:
>>> groups = list(odoo.env.user.groups_id)
>>> records2ids(groups)
[1, 2, 3, 14, 17, 18, 19, 7, 8, 9, 5, 20, 21, 22, 23]
"""
def record2id(elt):
"""If `elt` is a record, return its ID."""
if isinstance(elt, Model):
return elt.id
return elt
return [record2id(elt) for elt in iterable]
class BaseField(object):
"""Field which all other fields inherit.
Manage common metadata.
"""
def __init__(self, name, data):
self.name = name
self.type = 'type' in data and data['type'] or False
self.string = 'string' in data and data['string'] or False
self.size = 'size' in data and data['size'] or False
self.required = 'required' in data and data['required'] or False
self.readonly = 'readonly' in data and data['readonly'] or False
self.help = 'help' in data and data['help'] or False
self.states = 'states' in data and data['states'] or False
def __get__(self, instance, owner):
pass
def __set__(self, instance, value):
"""Each time a record is modified, it is marked as dirty
in the environment.
"""
instance.env.dirty.add(instance)
if instance._odoo.config.get('auto_commit'):
instance.env.commit()
def __str__(self):
"""Return a human readable string representation of the field."""
attrs = [
'string',
'relation',
'required',
'readonly',
'size',
'domain',
]
attrs_rep = []
for attr in attrs:
if hasattr(self, attr):
value = getattr(self, attr)
if value:
if is_string(value):
attrs_rep.append("{}='{}'".format(attr, value))
else:
attrs_rep.append("{}={}".format(attr, value))
attrs_rep = ", ".join(attrs_rep)
return "{}({})".format(self.type, attrs_rep)
def check_required(self, value):
"""Check the value if the field is required.
Aim to be overridden by field classes.
:return: `True` if the value is accepted, `False` otherwise.
"""
return bool(value)
def check_value(self, value):
"""Check the validity of a value for the field."""
# if self.readonly:
# raise error.Error(
# "'{field_name}' field is readonly".format(
# field_name=self.name))
if value and self.size:
if not is_string(value):
raise ValueError("Value supplied has to be a string")
if len(value) > self.size:
raise ValueError(
"Lenght of the '{}' is limited to {}".format(
self.name, self.size
)
)
if self.required and not self.check_required(value):
raise ValueError("'{}' field is required".format(self.name))
return value
def store(self, record, value):
"""Store the value in the record."""
record._values[self.name][record.id] = value
class Binary(BaseField):
"""Equivalent of the `fields.Binary` class."""
def __init__(self, name, data):
super(Binary, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name][instance.id]
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
return value
def __set__(self, instance, value):
if value is None:
value = False
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Binary, self).__set__(instance, value)
class Boolean(BaseField):
"""Equivalent of the `fields.Boolean` class."""
def __init__(self, name, data):
super(Boolean, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name][instance.id]
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
return value
def __set__(self, instance, value):
value = bool(value)
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Boolean, self).__set__(instance, value)
class Char(BaseField):
"""Equivalent of the `fields.Char` class."""
def __init__(self, name, data):
super(Char, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name].get(instance.id)
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
return value
def __set__(self, instance, value):
if value is None:
value = False
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Char, self).__set__(instance, value)
class Date(BaseField):
"""Represent the OpenObject 'fields.data'"""
pattern = "%Y-%m-%d"
def __init__(self, name, data):
super(Date, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name].get(instance.id) or False
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
try:
res = datetime.datetime.strptime(value, self.pattern).date()
except (ValueError, TypeError):
res = value
return res
def __set__(self, instance, value):
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Date, self).__set__(instance, value)
def check_value(self, value):
super(Date, self).check_value(value)
if isinstance(value, datetime.date):
value = value.strftime("%Y-%m-%d")
elif is_string(value):
datetime.datetime.strptime(value, self.pattern)
elif isinstance(value, bool) or value is None:
return value
else:
raise ValueError("Expecting a datetime.date object or string")
return value
class Datetime(BaseField):
"""Represent the OpenObject 'fields.datetime'"""
pattern = "%Y-%m-%d %H:%M:%S"
def __init__(self, name, data):
super(Datetime, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name].get(instance.id)
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
try:
res = datetime.datetime.strptime(value, self.pattern)
except (ValueError, TypeError):
res = value
return res
def __set__(self, instance, value):
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Datetime, self).__set__(instance, value)
def check_value(self, value):
super(Datetime, self).check_value(value)
if isinstance(value, datetime.datetime):
value = value.strftime("%Y-%m-%d %H:%M:%S")
elif is_string(value):
datetime.datetime.strptime(value, self.pattern)
elif isinstance(value, bool):
return value
else:
raise ValueError("Expecting a datetime.datetime object or string")
return value
class Float(BaseField):
"""Equivalent of the `fields.Float` class."""
def __init__(self, name, data):
super(Float, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name].get(instance.id)
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
if value in [None, False]:
value = 0.0
return value
def __set__(self, instance, value):
if value is None:
value = False
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Float, self).__set__(instance, value)
def check_required(self, value):
# Accept 0 values
return super(Float, self).check_required() or value == 0
class Integer(BaseField):
"""Equivalent of the `fields.Integer` class."""
def __init__(self, name, data):
super(Integer, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name].get(instance.id)
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
if value in [None, False]:
value = 0
return value
def __set__(self, instance, value):
if value is None:
value = False
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Integer, self).__set__(instance, value)
def check_required(self, value):
# Accept 0 values
return super(Float, self).check_required() or value == 0
class Selection(BaseField):
"""Represent the OpenObject 'fields.selection'"""
def __init__(self, name, data):
super(Selection, self).__init__(name, data)
self.selection = 'selection' in data and data['selection'] or False
def __get__(self, instance, owner):
value = instance._values[self.name].get(instance.id, False)
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
return value
def __set__(self, instance, value):
if value is None:
value = False
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Selection, self).__set__(instance, value)
def check_value(self, value):
super(Selection, self).check_value(value)
selection = [val[0] for val in self.selection]
if value and value not in selection:
raise ValueError(
"The value '{}' supplied doesn't match with the possible "
"values '{}' for the '{}' field".format(
value, selection, self.name
)
)
return value
class Many2many(BaseField):
"""Represent the OpenObject 'fields.many2many'"""
def __init__(self, name, data):
super(Many2many, self).__init__(name, data)
self.relation = 'relation' in data and data['relation'] or False
self.context = 'context' in data and data['context'] or {}
self.domain = 'domain' in data and data['domain'] or False
def __get__(self, instance, owner):
"""Return a recordset."""
ids = None
if instance._values[self.name].get(instance.id):
ids = instance._values[self.name][instance.id][:]
# None value => get the value on the fly
if ids is None:
args = [[instance.id], [self.name]]
kwargs = {'context': self.context, 'load': '_classic_write'}
orig_ids = instance._odoo.execute_kw(
instance._name, 'read', args, kwargs
)[0][self.name]
instance._values[self.name][instance.id] = orig_ids
ids = orig_ids and orig_ids[:] or []
# Take updated values into account
if instance.id in instance._values_to_write[self.name]:
values = instance._values_to_write[self.name][instance.id]
# Handle ODOO tuples to update 'ids'
ids = tuples2ids(values, ids or [])
# Handle the field context
Relation = instance.env[self.relation]
env = instance.env
if self.context:
context = instance.env.context.copy()
context.update(self.context)
env = instance.env(context=context)
return Relation._browse(env, ids, from_record=(instance, self))
def __set__(self, instance, value):
value = self.check_value(value)
if isinstance(value, IncrementalRecords):
value = value.tuples
else:
if value and not odoo_tuple_in(value):
value = [(6, 0, records2ids(value))]
elif not value:
value = [(5,)]
instance._values_to_write[self.name][instance.id] = value
super(Many2many, self).__set__(instance, value)
def check_value(self, value):
if value:
if (
not isinstance(value, list)
and not isinstance(value, Model)
and not isinstance(value, IncrementalRecords)
):
raise ValueError(
"The value supplied has to be a list, a recordset "
"or 'False'"
)
return super(Many2many, self).check_value(value)
def store(self, record, value):
"""Store the value in the record."""
if record._values[self.name].get(record.id):
tuples2ids(value, record._values[self.name][record.id])
else:
record._values[self.name][record.id] = tuples2ids(value, [])
class Many2one(BaseField):
"""Represent the OpenObject 'fields.many2one'"""
def __init__(self, name, data):
super(Many2one, self).__init__(name, data)
self.relation = 'relation' in data and data['relation'] or False
self.context = 'context' in data and data['context'] or {}
self.domain = 'domain' in data and data['domain'] or False
def __get__(self, instance, owner):
id_ = instance._values[self.name].get(instance.id)
if instance.id in instance._values_to_write[self.name]:
id_ = instance._values_to_write[self.name][instance.id]
# None value => get the value on the fly
if id_ is None:
args = [[instance.id], [self.name]]
kwargs = {'context': self.context, 'load': '_classic_write'}
id_ = instance._odoo.execute_kw(
instance._name, 'read', args, kwargs
)[0][self.name]
instance._values[self.name][instance.id] = id_
Relation = instance.env[self.relation]
if id_:
env = instance.env
if self.context:
context = instance.env.context.copy()
context.update(self.context)
env = instance.env(context=context)
return Relation._browse(env, id_, from_record=(instance, self))
return Relation.browse(False)
def __set__(self, instance, value):
if isinstance(value, Model):
o_rel = value
elif is_int(value):
rel_obj = instance.env[self.relation]
o_rel = rel_obj.browse(value)
elif value in [None, False]:
o_rel = False
else:
raise ValueError(
"Value supplied has to be an integer, "
"a record object or 'None/False'."
)
o_rel = self.check_value(o_rel)
# instance.__data__['updated_values'][self.name] = \
# o_rel and [o_rel.id, False]
instance._values_to_write[self.name][instance.id] = (
o_rel and o_rel.id or False
)
super(Many2one, self).__set__(instance, value)
def check_value(self, value):
super(Many2one, self).check_value(value)
if value and value._name != self.relation:
raise ValueError(
(
"Instance of '{model}' supplied doesn't match with the "
+ "relation '{relation}' of the '{field_name}' field."
).format(
model=value._name,
relation=self.relation,
field_name=self.name,
)
)
return value
class One2many(BaseField):
"""Represent the OpenObject 'fields.one2many'"""
def __init__(self, name, data):
super(One2many, self).__init__(name, data)
self.relation = 'relation' in data and data['relation'] or False
self.context = 'context' in data and data['context'] or {}
self.domain = 'domain' in data and data['domain'] or False
def __get__(self, instance, owner):
"""Return a recordset."""
ids = None
if instance._values[self.name].get(instance.id):
ids = instance._values[self.name][instance.id][:]
# None value => get the value on the fly
if ids is None:
args = [[instance.id], [self.name]]
kwargs = {'context': self.context, 'load': '_classic_write'}
orig_ids = instance._odoo.execute_kw(
instance._name, 'read', args, kwargs
)[0][self.name]
instance._values[self.name][instance.id] = orig_ids
ids = orig_ids and orig_ids[:] or []
# Take updated values into account
if instance.id in instance._values_to_write[self.name]:
values = instance._values_to_write[self.name][instance.id]
# Handle ODOO tuples to update 'ids'
ids = tuples2ids(values, ids or [])
Relation = instance.env[self.relation]
env = instance.env
if self.context:
context = instance.env.context.copy()
context.update(self.context)
env = instance.env(context=context)
return Relation._browse(env, ids, from_record=(instance, self))
def __set__(self, instance, value):
value = self.check_value(value)
if isinstance(value, IncrementalRecords):
value = value.tuples
else:
if value and not odoo_tuple_in(value):
value = [(6, 0, records2ids(value))]
elif not value:
value = [(5,)]
instance._values_to_write[self.name][instance.id] = value
super(One2many, self).__set__(instance, value)
def check_value(self, value):
if value:
if (
not isinstance(value, list)
and not isinstance(value, Model)
and not isinstance(value, IncrementalRecords)
):
raise ValueError(
"The value supplied has to be a list, a recordset "
"or 'False'"
)
return super(One2many, self).check_value(value)
def store(self, record, value):
"""Store the value in the record."""
if record._values[self.name].get(record.id):
tuples2ids(value, record._values[self.name][record.id])
else:
record._values[self.name][record.id] = tuples2ids(value, [])
class Reference(BaseField):
"""Represent the OpenObject 'fields.reference'."""
def __init__(self, name, data):
super(Reference, self).__init__(name, data)
self.context = 'context' in data and data['context'] or {}
self.domain = 'domain' in data and data['domain'] or False
self.selection = 'selection' in data and data['selection'] or False
def __get__(self, instance, owner):
value = instance._values[self.name].get(instance.id) or False
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
# None value => get the value on the fly
if value is None:
args = [[instance.id], [self.name]]
kwargs = {'context': self.context, 'load': '_classic_write'}
value = instance._odoo.execute_kw(
instance._name, 'read', args, kwargs
)[0][self.name]
instance._values_to_write[self.name][instance.id] = value
if value:
parts = value.rpartition(',')
relation, o_id = parts[0], parts[2]
relation = relation.strip()
o_id = int(o_id.strip())
if relation and o_id:
Relation = instance.env[relation]
env = instance.env
if self.context:
context = instance.env.context.copy()
context.update(self.context)
env = instance.env(context=context)
return Relation._browse(
env, o_id, from_record=(instance, self)
)
return False
def __set__(self, instance, value):
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Reference, self).__set__(instance, value)
def _check_relation(self, relation):
"""Raise a `ValueError` if `relation` is not allowed among
the possible values.
"""
selection = [val[0] for val in self.selection]
if relation not in selection:
raise ValueError(
(
"The value '{value}' supplied doesn't match with the possible"
" values '{selection}' for the '{field_name}' field"
).format(
value=relation, selection=selection, field_name=self.name
)
)
return relation
def check_value(self, value):
if isinstance(value, Model):
relation = value.__class__.__osv__['name']
self._check_relation(relation)
value = "{},{}".format(relation, value.id)
super(Reference, self).check_value(value)
elif is_string(value):
super(Reference, self).check_value(value)
parts = value.rpartition(',')
relation, o_id = parts[0], parts[2]
relation = relation.strip()
o_id = o_id.strip()
# o_rel = instance.__class__.__odoo__.browse(relation, o_id)
if not relation or not is_int(o_id):
raise ValueError(
"String not well formatted, expecting "
"'{relation},{id}' format"
)
self._check_relation(relation)
else:
raise ValueError(
"Value supplied has to be a string or"
" a browse_record object."
)
return value
class Text(BaseField):
"""Equivalent of the `fields.Text` class."""
def __init__(self, name, data):
super(Text, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name].get(instance.id)
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
return value
def __set__(self, instance, value):
if value is None:
value = False
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Text, self).__set__(instance, value)
class Html(Text):
"""Equivalent of the `fields.Html` class."""
def __init__(self, name, data):
super(Html, self).__init__(name, data)
class Unknown(BaseField):
"""Represent an unknown field. This should not happen but this kind of
field only exists to avoid a blocking situation from a RPC point of view.
"""
def __init__(self, name, data):
super(Unknown, self).__init__(name, data)
def __get__(self, instance, owner):
value = instance._values[self.name][instance.id]
if instance.id in instance._values_to_write[self.name]:
value = instance._values_to_write[self.name][instance.id]
return value
def __set__(self, instance, value):
value = self.check_value(value)
instance._values_to_write[self.name][instance.id] = value
super(Unknown, self).__set__(instance, value)
TYPES_TO_FIELDS = {
'binary': Binary,
'boolean': Boolean,
'char': Char,
'date': Date,
'datetime': Datetime,
'float': Float,
'html': Html,
'integer': Integer,
'many2many': Many2many,
'many2one': Many2one,
'one2many': One2many,
'reference': Reference,
'selection': Selection,
'text': Text,
}
def generate_field(name, data):
"""Generate a well-typed field according to the data dictionary supplied
(obtained via the `fields_get' method of any models).
"""
assert 'type' in data
field = TYPES_TO_FIELDS.get(data['type'], Unknown)(name, data)
return field

View File

@ -1,479 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""Provide the :class:`Model` class which allow to access dynamically to all
methods proposed by a data model.
"""
__all__ = ['Model']
import sys
from odoorpc import error
# Python 2
if sys.version_info[0] < 3:
# noqa: F821
NORMALIZED_TYPES = (int, long, str, unicode) # noqa: F821
# Python >= 3
else:
NORMALIZED_TYPES = (int, str, bytes)
FIELDS_RESERVED = ['id', 'ids', '__odoo__', '__osv__', '__data__', 'env']
def _normalize_ids(ids):
"""Normalizes the ids argument for ``browse``."""
if not ids:
return []
if ids.__class__ in NORMALIZED_TYPES:
return [ids]
return list(ids)
class IncrementalRecords(object):
"""A helper class used internally by __iadd__ and __isub__ methods.
Afterwards, field descriptors can adapt their behaviour when an instance of
this class is set.
"""
def __init__(self, tuples):
self.tuples = tuples
class MetaModel(type):
"""Define class methods for the :class:`Model` class."""
_env = None
def __getattr__(cls, method):
"""Provide a dynamic access to a RPC method."""
if method.startswith('_'):
return super(MetaModel, cls).__getattr__(method)
def rpc_method(*args, **kwargs):
"""Return the result of the RPC request."""
if cls._odoo.config['auto_context'] and 'context' not in kwargs:
kwargs['context'] = cls.env.context
result = cls._odoo.execute_kw(cls._name, method, args, kwargs)
return result
return rpc_method
def __repr__(cls):
return "Model(%r)" % (cls._name)
@property
def env(cls):
"""The environment used for this model/recordset."""
return cls._env
# An intermediate class used to associate the 'MetaModel' metaclass to the
# 'Model' one with a Python 2 and Python 3 compatibility
BaseModel = MetaModel('BaseModel', (), {})
class Model(BaseModel):
"""Base class for all data model proxies.
.. note::
All model proxies (based on this class) are generated by an
:class:`environment <odoorpc.env.Environment>`
(see the :attr:`odoorpc.ODOO.env` property).
.. doctest::
:options: +SKIP
>>> import odoorpc
>>> odoo = odoorpc.ODOO('localhost', port=8069)
>>> odoo.login('db_name', 'admin', 'password')
>>> User = odoo.env['res.users']
>>> User
Model('res.users')
.. doctest::
:hide:
>>> import odoorpc
>>> odoo = odoorpc.ODOO(HOST, protocol=PROTOCOL, port=PORT)
>>> odoo.login(DB, USER, PWD)
>>> User = odoo.env['res.users']
>>> User
Model('res.users')
Use this data model proxy to call any method:
.. doctest::
:options: +SKIP
>>> User.name_get([2]) # Use any methods from the model class
[[1, 'Mitchell Admin']]
.. doctest::
:hide:
>>> from odoorpc.tools import v
>>> uid = 1
>>> if v(VERSION) >= v('12.0'):
... uid = 2
>>> data = User.name_get([uid])
>>> 'Admin' in data[0][1]
True
Get a recordset:
.. doctest::
:options: +SKIP
>>> user = User.browse(2)
>>> user.name
'Mitchell Admin'
.. doctest::
:hide:
>>> from odoorpc.tools import v
>>> uid = 1
>>> if v(VERSION) >= v('12.0'):
... uid = 2
>>> user = User.browse(uid)
>>> 'Admin' in user.name
True
And call any method from it, it will be automatically applied on the
current record:
.. doctest::
:options: +SKIP
>>> user.name_get() # No IDs in parameter, the method is applied on the current recordset
[[1, 'Mitchell Admin']]
.. doctest::
:hide:
>>> data = user.name_get()
>>> 'Admin' in data[0][1]
True
.. warning::
Excepted the :func:`browse <odoorpc.models.Model.browse>` method,
method calls are purely dynamic. As long as you know the signature of
the model method targeted, you will be able to use it
(see the :ref:`tutorial <tuto-execute-queries>`).
"""
__metaclass__ = MetaModel
_odoo = None
_name = None
_columns = {} # {field: field object}
def __init__(self):
super(Model, self).__init__()
self._env_local = None
self._from_record = None
self._ids = []
self._values = {} # {field: {ID: value}}
self._values_to_write = {} # {field: {ID: value}}
for field in self._columns:
self._values[field] = {}
self._values_to_write[field] = {}
self.with_context = self._with_context
self.with_env = self._with_env
@property
def env(self):
"""The environment used for this model/recordset."""
if self._env_local:
return self._env_local
return self.__class__._env
@property
def id(self):
"""ID of the record (or the first ID of a recordset)."""
return self._ids[0] if self._ids else None
@property
def ids(self):
"""IDs of the recorset."""
return self._ids
@classmethod
def _browse(cls, env, ids, from_record=None, iterated=None):
"""Create an instance (a recordset) corresponding to `ids` and
attached to `env`.
`from_record` parameter is used when the recordset is related to a
parent record, and as such can take the value of a tuple
(record, field). This is useful to update the parent record when the
current recordset is modified.
`iterated` can take the value of an iterated recordset, and no extra
RPC queries are made to generate the resulting record (recordset and
its record share the same values).
"""
records = cls()
records._env_local = env
records._ids = _normalize_ids(ids)
if iterated:
records._values = iterated._values
records._values_to_write = iterated._values_to_write
else:
records._from_record = from_record
records._values = {}
records._values_to_write = {}
for field in cls._columns:
records._values[field] = {}
records._values_to_write[field] = {}
records._init_values()
return records
@classmethod
def browse(cls, ids):
"""Browse one or several records (if `ids` is a list of IDs).
.. doctest::
>>> odoo.env['res.partner'].browse(1)
Recordset('res.partner', [1])
.. doctest::
:options: +SKIP
>>> [partner.name for partner in odoo.env['res.partner'].browse([1, 3])]
['YourCompany', 'Mitchell Admin']
.. doctest::
:hide:
>>> names = [partner.name for partner in odoo.env['res.partner'].browse([1, 3])]
>>> 'YourCompany' in names[0]
True
>>> 'Admin' in names[1]
True
A list of data types returned by such record fields are
available :ref:`here <fields>`.
:return: a :class:`Model <odoorpc.models.Model>`
instance (recordset)
:raise: :class:`odoorpc.error.RPCError`
"""
return cls._browse(cls.env, ids)
@classmethod
def with_context(cls, *args, **kwargs):
"""Return a model (or recordset) equivalent to the current model
(or recordset) attached to an environment with another context.
The context is taken from the current environment or from the
positional arguments `args` if given, and modified by `kwargs`.
Thus, the following two examples are equivalent:
.. doctest::
>>> Product = odoo.env['product.product']
>>> Product.with_context(lang='fr_FR')
Model('product.product')
.. doctest::
>>> context = Product.env.context
>>> Product.with_context(context, lang='fr_FR')
Model('product.product')
This method is very convenient for example to search records
whatever their active status are (active/inactive):
.. doctest::
>>> all_product_ids = Product.with_context(active_test=False).search([])
Or to update translations of a recordset:
.. doctest::
>>> product_en = Product.browse(1)
>>> product_en.env.lang
'en_US'
>>> product_en.name = "My product" # Update the english translation
>>> product_fr = product_en.with_context(lang='fr_FR')
>>> product_fr.env.lang
'fr_FR'
>>> product_fr.name = "Mon produit" # Update the french translation
"""
context = dict(args[0] if args else cls.env.context, **kwargs)
return cls.with_env(cls.env(context=context))
def _with_context(self, *args, **kwargs):
"""As the `with_context` class method but for recordset."""
context = dict(args[0] if args else self.env.context, **kwargs)
return self.with_env(self.env(context=context))
@classmethod
def with_env(cls, env):
"""Return a model (or recordset) equivalent to the current model
(or recordset) attached to `env`.
"""
new_cls = type(cls.__name__, cls.__bases__, dict(cls.__dict__))
new_cls._env = env
return new_cls
def _with_env(self, env):
"""As the `with_env` class method but for recordset."""
res = self._browse(env, self._ids)
return res
def _init_values(self, context=None):
"""Retrieve field values from the server.
May be used to restore the original values in the purpose to cancel
all changes made.
"""
if context is None:
context = self.env.context
# Get basic fields (no relational ones)
basic_fields = []
for field_name in self._columns:
field = self._columns[field_name]
if not getattr(field, 'relation', False):
basic_fields.append(field_name)
# Fetch values from the server
if self.ids:
rows = self.__class__.read(
self.ids, basic_fields, context=context, load='_classic_write'
)
ids_fetched = set()
for row in rows:
ids_fetched.add(row['id'])
for field_name in row:
if field_name == 'id':
continue
self._values[field_name][row['id']] = row[field_name]
ids_in_error = set(self.ids) - ids_fetched
if ids_in_error:
raise ValueError(
"There is no '{model}' record with IDs {ids}.".format(
model=self._name, ids=list(ids_in_error)
)
)
# No ID: fields filled with default values
else:
default_get = self.__class__.default_get(
list(self._columns), context=context
)
for field_name in self._columns:
self._values[field_name][None] = default_get.get(
field_name, False
)
def __getattr__(self, method):
"""Provide a dynamic access to a RPC *instance* method (which applies
on the current recordset).
.. doctest::
>>> Partner = odoo.env['res.partner']
>>> Partner.write([1], {'name': 'YourCompany'}) # Class method
True
>>> partner = Partner.browse(1)
>>> partner.write({'name': 'YourCompany'}) # Instance method
True
"""
if method.startswith('_'):
return super(Model, self).__getattr__(method)
def rpc_method(*args, **kwargs):
"""Return the result of the RPC request."""
args = tuple([self.ids]) + args
if self._odoo.config['auto_context'] and 'context' not in kwargs:
kwargs['context'] = self.env.context
result = self._odoo.execute_kw(self._name, method, args, kwargs)
return result
return rpc_method
def __getitem__(self, key):
"""If `key` is an integer or a slice, return the corresponding record
selection as a recordset.
"""
if isinstance(key, int) or isinstance(key, slice):
return self._browse(self.env, self._ids[key], iterated=self)
else:
return getattr(self, key)
def __int__(self):
return self.id
def __eq__(self, other):
return other.__class__ == self.__class__ and self.id == other.id
# Need to explicitly declare '__hash__' in Python 3
# (because '__eq__' is defined)
__hash__ = BaseModel.__hash__
def __ne__(self, other):
return other.__class__ != self.__class__ or self.id != other.id
def __repr__(self):
return "Recordset({!r}, {})".format(self._name, self.ids)
def __iter__(self):
"""Return an iterator over `self`."""
for id_ in self._ids:
yield self._browse(self.env, id_, iterated=self)
def __nonzero__(self):
return bool(getattr(self, '_ids', True))
def __len__(self):
return len(self.ids)
def __iadd__(self, records):
if not self._from_record:
raise error.InternalError("No parent record to update")
try:
list(records)
except TypeError:
records = [records]
parent = self._from_record[0]
field = self._from_record[1]
updated_values = parent._values_to_write[field.name]
values = []
if updated_values.get(parent.id):
values = updated_values[parent.id][:] # Copy
from odoorpc import fields
for id_ in fields.records2ids(records):
if (3, id_) in values:
values.remove((3, id_))
if (4, id_) not in values:
values.append((4, id_))
return IncrementalRecords(values)
def __isub__(self, records):
if not self._from_record:
raise error.InternalError("No parent record to update")
try:
list(records)
except TypeError:
records = [records]
parent = self._from_record[0]
field = self._from_record[1]
updated_values = parent._values_to_write[field.name]
values = []
if updated_values.get(parent.id):
values = updated_values[parent.id][:] # Copy
from odoorpc import fields
for id_ in fields.records2ids(records):
if (4, id_) in values:
values.remove((4, id_))
if (3, id_) not in values:
values.append((3, id_))
return values

View File

@ -1,714 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""This module contains the ``ODOO`` class which is the entry point to manage
an `Odoo` server.
"""
from odoorpc import error, rpc, session, tools
from odoorpc.db import DB
from odoorpc.env import Environment
from odoorpc.report import Report
class ODOO(object):
"""Return a new instance of the :class:`ODOO` class.
`JSON-RPC` protocol is used to make requests, and the respective values
for the `protocol` parameter are ``jsonrpc`` (default) and ``jsonrpc+ssl``.
.. doctest::
:options: +SKIP
>>> import odoorpc
>>> odoo = odoorpc.ODOO('localhost', protocol='jsonrpc', port=8069)
`OdooRPC` will try by default to detect the server version in order to
adapt its requests if necessary. However, it is possible to force the
version to use with the `version` parameter:
.. doctest::
:options: +SKIP
>>> odoo = odoorpc.ODOO('localhost', version='12.0')
You can also define a custom URL opener to handle HTTP requests. A use
case is to manage a basic HTTP authentication in front of `Odoo`:
.. doctest::
:options: +SKIP
>>> import urllib.request
>>> import odoorpc
>>> pwd_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
>>> pwd_mgr.add_password(None, "http://example.net", "userName", "passWord")
>>> auth_handler = urllib.request.HTTPBasicAuthHandler(pwd_mgr)
>>> opener = urllib.request.build_opener(auth_handler)
>>> odoo = odoorpc.ODOO('example.net', port=80, opener=opener)
*Python 2:*
:raise: :class:`odoorpc.error.InternalError`
:raise: `ValueError` (wrong protocol, port value, timeout value)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:raise: :class:`odoorpc.error.InternalError`
:raise: `ValueError` (wrong protocol, port value, timeout value)
:raise: `urllib.error.URLError` (connection error)
"""
def __init__(
self,
host='localhost',
protocol='jsonrpc',
port=8069,
timeout=120,
version=None,
opener=None,
):
if protocol not in ['jsonrpc', 'jsonrpc+ssl']:
txt = (
"The protocol '{0}' is not supported by the ODOO class. "
"Please choose a protocol among these ones: {1}"
)
txt = txt.format(protocol, ['jsonrpc', 'jsonrpc+ssl'])
raise ValueError(txt)
try:
port = int(port)
except (ValueError, TypeError):
raise ValueError("The port must be an integer")
try:
if timeout is not None:
timeout = float(timeout)
except (ValueError, TypeError):
raise ValueError("The timeout must be a float")
self._host = host
self._port = port
self._protocol = protocol
self._env = None
self._login = None
self._password = None
self._db = DB(self)
self._report = Report(self)
# Instanciate the server connector
try:
self._connector = rpc.PROTOCOLS[protocol](
self._host, self._port, timeout, version, opener=opener
)
except rpc.error.ConnectorError as exc:
raise error.InternalError(exc.message)
# Dictionary of configuration options
self._config = tools.Config(
self,
{'auto_commit': True, 'auto_context': True, 'timeout': timeout},
)
@property
def config(self):
"""Dictionary of available configuration options.
.. doctest::
:options: +SKIP
>>> odoo.config
{'auto_commit': True, 'auto_context': True, 'timeout': 120}
.. doctest::
:hide:
>>> 'auto_commit' in odoo.config
True
>>> 'auto_context' in odoo.config
True
>>> 'timeout' in odoo.config
True
- ``auto_commit``: if set to `True` (default), each time a value is set
on a record field a RPC request is sent to the server to update the
record (see :func:`odoorpc.env.Environment.commit`).
- ``auto_context``: if set to `True` (default), the user context will
be sent automatically to every call of a
:class:`model <odoorpc.models.Model>` method (default: `True`):
.. doctest::
:options: +SKIP
>>> odoo.env.context['lang'] = 'fr_FR'
>>> Product = odoo.env['product.product']
>>> Product.name_get([2]) # Context sent by default ('lang': 'fr_FR' here)
[[2, 'Surveillance sur site']]
>>> odoo.config['auto_context'] = False
>>> Product.name_get([2]) # No context sent, 'en_US' used
[[2, 'On Site Monitoring']]
- ``timeout``: set the maximum timeout in seconds for a RPC request
(default: `120`):
>>> odoo.config['timeout'] = 300
"""
return self._config
@property
def version(self):
"""The version of the server.
.. doctest::
:options: +SKIP
>>> odoo.version
'12.0'
"""
return self._connector.version
@property
def db(self):
"""The database management service.
See the :class:`odoorpc.db.DB` class.
"""
return self._db
@property
def report(self):
"""The report management service.
See the :class:`odoorpc.report.Report` class.
"""
return self._report
host = property(
lambda self: self._host,
doc="Hostname of IP address of the the server.",
)
port = property(lambda self: self._port, doc="The port used.")
protocol = property(lambda self: self._protocol, doc="The protocol used.")
@property
def env(self):
"""The environment which wraps data to manage records such as the
user context and the registry to access data model proxies.
>>> Partner = odoo.env['res.partner']
>>> Partner
Model('res.partner')
See the :class:`odoorpc.env.Environment` class.
"""
self._check_logged_user()
return self._env
def json(self, url, params):
"""Low level method to execute JSON queries.
It basically performs a request and raises an
:class:`odoorpc.error.RPCError` exception if the response contains
an error.
You have to know the names of each parameter required by the function
called, and set them in the `params` dictionary.
Here an authentication request:
.. doctest::
:options: +SKIP
>>> data = odoo.json(
... '/web/session/authenticate',
... {'db': 'db_name', 'login': 'admin', 'password': 'admin'})
>>> from pprint import pprint
>>> pprint(data)
{'id': 645674382,
'jsonrpc': '2.0',
'result': {'db': 'db_name',
'session_id': 'fa740abcb91784b8f4750c5c5b14da3fcc782d11',
'uid': 1,
'user_context': {'lang': 'en_US',
'tz': 'Europe/Brussels',
'uid': 1},
'username': 'admin'}}
.. doctest::
:hide:
>>> data = odoo.json(
... '/web/session/authenticate',
... {'db': DB, 'login': USER, 'password': PWD})
>>> data['result']['db'] == DB
True
>>> data['result']['uid'] in [1, 2]
True
>>> data['result']['username'] == USER
True
And a call to the ``read`` method of the ``res.users`` model:
.. doctest::
:options: +SKIP
>>> data = odoo.json(
... '/web/dataset/call',
... {'model': 'res.users', 'method': 'read',
... 'args': [[2], ['name']]})
>>> from pprint import pprint
>>> pprint(data)
{'id': ...,
'jsonrpc': '2.0',
'result': [{'id': 2, 'name': 'Mitchell Admin'}]}
*Python 2:*
:return: a dictionary (JSON response)
:raise: :class:`odoorpc.error.RPCError`
:raise: `urllib2.HTTPError` (if `params` is not a dictionary)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: a dictionary (JSON response)
:raise: :class:`odoorpc.error.RPCError`
:raise: `urllib.error.HTTPError` (if `params` is not a dictionary)
:raise: `urllib.error.URLError` (connection error)
"""
data = self._connector.proxy_json(url, params)
if data.get('error'):
raise error.RPCError(
data['error']['data']['message'], data['error']
)
return data
def http(self, url, data=None, headers=None):
"""Low level method to execute raw HTTP queries.
.. note::
For low level JSON-RPC queries, see the more convenient
:func:`odoorpc.ODOO.json` method instead.
You have to know the names of each POST parameter required by the
URL, and set them in the `data` string/buffer.
The `data` argument must be built by yourself, following the expected
URL parameters (with :func:`urllib.urlencode` function for simple
parameters, or multipart/form-data structure to handle file upload).
E.g., the HTTP raw query to get the company logo on `Odoo 12.0`:
.. doctest::
>>> response = odoo.http('web/binary/company_logo')
>>> binary_data = response.read()
*Python 2:*
:return: `urllib.addinfourl`
:raise: `urllib2.HTTPError`
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: `http.client.HTTPResponse`
:raise: `urllib.error.HTTPError`
:raise: `urllib.error.URLError` (connection error)
"""
return self._connector.proxy_http(url, data, headers)
# NOTE: in the past this function was implemented as a decorator for
# methods needing to be checked, but Sphinx documentation generator is not
# able to parse decorated methods.
def _check_logged_user(self):
"""Check if a user is logged. Otherwise, an error is raised."""
if not self._env or not self._password or not self._login:
raise error.InternalError("Login required")
def login(self, db, login='admin', password='admin'):
"""Log in as the given `user` with the password `passwd` on the
database `db`.
.. doctest::
:options: +SKIP
>>> odoo.login('db_name', 'admin', 'admin')
>>> odoo.env.user.name
'Administrator'
*Python 2:*
:raise: :class:`odoorpc.error.RPCError`
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:raise: :class:`odoorpc.error.RPCError`
:raise: `urllib.error.URLError` (connection error)
"""
# Get the user's ID and generate the corresponding user record
if tools.v(self.version)[0] >= 10:
data = self.json(
"/jsonrpc",
params={
"service": "common",
"method": "login",
"args": [db, login, password],
},
)
uid = data["result"]
else:
# Needed to get 'report' service working on Odoo < 10.0
data = self.json(
"/web/session/authenticate",
{"db": db, "login": login, "password": password},
)
uid = data["result"]["uid"]
if uid:
if tools.v(self.version)[0] >= 10:
args_to_send = [db, uid, password, "res.users", "context_get"]
context = self.json(
"/jsonrpc",
{
"service": "object",
"method": "execute",
"args": args_to_send,
},
)["result"]
context["uid"] = uid
else:
context = data["result"]["user_context"]
self._env = Environment(self, db, uid, context=context)
self._login = login
self._password = password
else:
raise error.RPCError("Wrong login ID or password")
def logout(self):
"""Log out the user.
>>> odoo.logout()
True
*Python 2:*
:return: `True` if the operation succeed, `False` if no user was logged
:raise: :class:`odoorpc.error.RPCError`
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: `True` if the operation succeed, `False` if no user was logged
:raise: :class:`odoorpc.error.RPCError`
:raise: `urllib.error.URLError` (connection error)
"""
if not self._env:
return False
if tools.v(self.version)[0] < 10:
self.json('/web/session/destroy', {})
self._env = None
self._login = None
self._password = None
return True
def close(self):
"""Same than :attr:`odoorpc.ODOO.logout` method.
Here for the compatibility with `contextlib.closing`:
.. doctest::
:options: +SKIP
>>> import contextlib
>>> odoo.login('db_name', 'admin', 'admin')
>>> with contextlib.closing(odoo):
... print(odoo.env.user.name)
...
Mitchell Admin
"""
return self.logout()
# ------------------------- #
# -- Raw XML-RPC methods -- #
# ------------------------- #
def execute(self, model, method, *args):
"""Execute the `method` of `model`.
`*args` parameters varies according to the `method` used.
.. doctest::
:options: +SKIP
>>> odoo.execute('res.partner', 'read', [1], ['name'])
[{'id': 1, 'name': 'YourCompany'}]
.. doctest::
:hide:
>>> data = odoo.execute('res.partner', 'read', [1], ['name'])
>>> data[0]['id'] == 1
True
>>> data[0]['name'] == 'YourCompany'
True
*Python 2:*
:return: the result returned by the `method` called
:raise: :class:`odoorpc.error.RPCError`
:raise: :class:`odoorpc.error.InternalError` (if not logged)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: the result returned by the `method` called
:raise: :class:`odoorpc.error.RPCError`
:raise: :class:`odoorpc.error.InternalError` (if not logged)
:raise: `urllib.error.URLError` (connection error)
"""
self._check_logged_user()
# Execute the query
args_to_send = [
self.env.db,
self.env.uid,
self._password,
model,
method,
]
args_to_send.extend(args)
data = self.json(
'/jsonrpc',
{'service': 'object', 'method': 'execute', 'args': args_to_send},
)
return data.get('result')
def execute_kw(self, model, method, args=None, kwargs=None):
"""Execute the `method` of `model`.
`args` is a list of parameters (in the right order),
and `kwargs` a dictionary (named parameters). Both varies according
to the `method` used.
.. doctest::
:options: +SKIP
>>> odoo.execute_kw('res.partner', 'read', [[1]], {'fields': ['name']})
[{'id': 1, 'name': 'YourCompany'}]
.. doctest::
:hide:
>>> data = odoo.execute_kw('res.partner', 'read', [[1]], {'fields': ['name']})
>>> data[0]['id'] == 1
True
>>> data[0]['name'] == 'YourCompany'
True
*Python 2:*
:return: the result returned by the `method` called
:raise: :class:`odoorpc.error.RPCError`
:raise: :class:`odoorpc.error.InternalError` (if not logged)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: the result returned by the `method` called
:raise: :class:`odoorpc.error.RPCError`
:raise: :class:`odoorpc.error.InternalError` (if not logged)
:raise: `urllib.error.URLError` (connection error)
"""
self._check_logged_user()
# Execute the query
args = args or []
kwargs = kwargs or {}
args_to_send = [
self.env.db,
self.env.uid,
self._password,
model,
method,
]
args_to_send.extend([args, kwargs])
data = self.json(
'/jsonrpc',
{
'service': 'object',
'method': 'execute_kw',
'args': args_to_send,
},
)
return data.get('result')
def exec_workflow(self, model, record_id, signal):
"""Execute the workflow `signal` on
the instance having the ID `record_id` of `model`.
*Python 2:*
:raise: :class:`odoorpc.error.RPCError`
:raise: :class:`odoorpc.error.InternalError` (if not logged)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:raise: :class:`odoorpc.error.RPCError`
:raise: :class:`odoorpc.error.InternalError` (if not logged)
:raise: `urllib.error.URLError` (connection error)
"""
if tools.v(self.version)[0] >= 11:
raise DeprecationWarning(
u"Workflows have been removed in Odoo >= 11.0"
)
self._check_logged_user()
# Execute the workflow query
args_to_send = [
self.env.db,
self.env.uid,
self._password,
model,
signal,
record_id,
]
data = self.json(
'/jsonrpc',
{
'service': 'object',
'method': 'exec_workflow',
'args': args_to_send,
},
)
return data.get('result')
# ---------------------- #
# -- Session methods -- #
# ---------------------- #
def save(self, name, rc_file='~/.odoorpcrc'):
"""Save the current :class:`ODOO <odoorpc.ODOO>` instance (a `session`)
inside `rc_file` (``~/.odoorpcrc`` by default). This session will be
identified by `name`::
>>> import odoorpc
>>> odoo = odoorpc.ODOO('localhost', port=8069)
>>> odoo.login('db_name', 'admin', 'admin')
>>> odoo.save('foo')
Use the :func:`list <odoorpc.ODOO.list>` class method to list all
stored sessions, and the :func:`load <odoorpc.ODOO.load>` class method
to retrieve an already-connected :class:`ODOO <odoorpc.ODOO>` instance.
*Python 2:*
:raise: :class:`odoorpc.error.InternalError` (if not logged)
:raise: `IOError`
*Python 3:*
:raise: :class:`odoorpc.error.InternalError` (if not logged)
:raise: `PermissionError`
:raise: `FileNotFoundError`
"""
self._check_logged_user()
data = {
'type': self.__class__.__name__,
'host': self.host,
'protocol': self.protocol,
'port': self.port,
'timeout': self.config['timeout'],
'user': self._login,
'passwd': self._password,
'database': self.env.db,
}
session.save(name, data, rc_file)
@classmethod
def load(cls, name, rc_file='~/.odoorpcrc'):
"""Return a connected :class:`ODOO` session identified by `name`:
.. doctest::
:options: +SKIP
>>> import odoorpc
>>> odoo = odoorpc.ODOO.load('foo')
Such sessions are stored with the
:func:`save <odoorpc.ODOO.save>` method.
*Python 2:*
:raise: :class:`odoorpc.error.RPCError`
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:raise: :class:`odoorpc.error.RPCError`
:raise: `urllib.error.URLError` (connection error)
"""
data = session.get(name, rc_file)
if data.get('type') != cls.__name__:
raise error.InternalError(
"'{}' session is not of type '{}'".format(name, cls.__name__)
)
odoo = cls(
host=data['host'],
protocol=data['protocol'],
port=data['port'],
timeout=data['timeout'],
)
odoo.login(
db=data['database'], login=data['user'], password=data['passwd']
)
return odoo
@classmethod
def list(cls, rc_file='~/.odoorpcrc'):
"""Return a list of all stored sessions available in the
`rc_file` file:
.. doctest::
:options: +SKIP
>>> import odoorpc
>>> odoorpc.ODOO.list()
['foo', 'bar']
Use the :func:`save <odoorpc.ODOO.save>` and
:func:`load <odoorpc.ODOO.load>` methods to manage such sessions.
*Python 2:*
:raise: `IOError`
*Python 3:*
:raise: `PermissionError`
:raise: `FileNotFoundError`
"""
sessions = session.get_all(rc_file)
return [
name
for name in sessions
if sessions[name].get('type') == cls.__name__
]
# return session.list(rc_file)
@classmethod
def remove(cls, name, rc_file='~/.odoorpcrc'):
"""Remove the session identified by `name` from the `rc_file` file:
.. doctest::
:options: +SKIP
>>> import odoorpc
>>> odoorpc.ODOO.remove('foo')
True
*Python 2:*
:raise: `ValueError` (if the session does not exist)
:raise: `IOError`
*Python 3:*
:raise: `ValueError` (if the session does not exist)
:raise: `PermissionError`
:raise: `FileNotFoundError`
"""
data = session.get(name, rc_file)
if data.get('type') != cls.__name__:
raise error.InternalError(
"'{}' session is not of type '{}'".format(name, cls.__name__)
)
return session.remove(name, rc_file)

View File

@ -1,223 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""This module provide the :class:`Report` class to list available reports and
to generate/download them.
"""
import base64
import io
from odoorpc.tools import get_encodings, v
def encode2bytes(data):
for encoding in get_encodings():
try:
return data.decode(encoding)
except Exception:
pass
return data
class Report(object):
"""The `Report` class represents the report management service.
It provides methods to list and download available reports from the server.
.. note::
This service have to be used through the :attr:`odoorpc.ODOO.report`
property.
.. doctest::
:options: +SKIP
>>> import odoorpc
>>> odoo = odoorpc.ODOO('localhost', port=8069)
>>> odoo.login('odoorpc_test', 'admin', 'password')
>>> odoo.report
<odoorpc.report.Report object at 0x7f82fe7a1d50>
.. doctest::
:hide:
>>> import odoorpc
>>> odoo = odoorpc.ODOO(HOST, protocol=PROTOCOL, port=PORT)
>>> odoo.login(DB, USER, PWD)
>>> odoo.report
<odoorpc.report.Report object at ...>
"""
def __init__(self, odoo):
self._odoo = odoo
def download(self, name, ids, datas=None, context=None):
"""Download a report from the server and return it as a remote file.
Warning: this feature is not supported for Odoo >= 14 (CSRF token required).
For instance, to download the "Quotation / Order" report of sale orders
identified by the IDs ``[2, 3]``:
.. doctest::
:options: +SKIP
>>> report = odoo.report.download('sale.report_saleorder', [2, 3])
.. doctest::
:hide:
>>> from odoorpc.tools import v
>>> if v(VERSION) < v('14.0'):
... report = odoo.report.download('sale.report_saleorder', [2])
Write it on the file system:
.. doctest::
:options: +SKIP
>>> with open('sale_orders.pdf', 'wb') as report_file:
... report_file.write(report.read())
...
.. doctest::
:hide:
>>> from odoorpc.tools import v
>>> if v(VERSION) < v('14.0'):
... with open('sale_orders.pdf', 'wb') as report_file:
... fileno = report_file.write(report.read()) # Python 3
...
*Python 2:*
:return: `io.BytesIO`
:raise: :class:`odoorpc.error.RPCError` (wrong parameters)
:raise: `ValueError` (received invalid data)
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: `io.BytesIO`
:raise: :class:`odoorpc.error.RPCError` (wrong parameters)
:raise: `ValueError` (received invalid data)
:raise: `urllib.error.URLError` (connection error)
"""
if context is None:
context = self._odoo.env.context
def check_report(name):
report_model = 'ir.actions.report'
if v(self._odoo.version)[0] < 11:
report_model = 'ir.actions.report.xml'
IrReport = self._odoo.env[report_model]
report_ids = IrReport.search([('report_name', '=', name)])
report_id = report_ids and report_ids[0] or False
if not report_id:
raise ValueError("The report '%s' does not exist." % name)
return report_id
report_id = check_report(name)
# Odoo >= 11.0
if v(self._odoo.version)[0] >= 11:
IrReport = self._odoo.env['ir.actions.report']
report = IrReport.browse(report_id)
if v(self._odoo.version)[0] >= 14:
# Need a CSRF token to print reports on Odoo >= 14
raise NotImplementedError
# response = report.with_context(context)._render(
# ids, data=datas
# )
else:
response = report.with_context(context).render(ids, data=datas)
content = response[0]
# On the server the result is a bytes string,
# but the RPC layer of Odoo returns it as a unicode string,
# so we encode it again as bytes
result = content.encode('latin1')
return io.BytesIO(result)
# Odoo < 11.0
else:
args_to_send = [
self._odoo.env.db,
self._odoo.env.uid,
self._odoo._password,
name,
ids,
datas,
context,
]
data = self._odoo.json(
'/jsonrpc',
{
'service': 'report',
'method': 'render_report',
'args': args_to_send,
},
)
if 'result' not in data and not data['result'].get('result'):
raise ValueError("Received invalid data.")
# Encode to bytes forced to be compatible with Python 3.2
# (its 'base64.standard_b64decode()' function only accepts bytes)
result = encode2bytes(data['result']['result'])
content = base64.standard_b64decode(result)
return io.BytesIO(content)
def list(self):
"""List available reports from the server.
It returns a dictionary with reports classified by data model:
.. doctest::
:options: +SKIP
>>> from odoorpc.tools import v
>>> inv_model = 'account.move'
>>> if v(VERSION) < v('13.0'):
... inv_model = 'account.invoice'
>>> odoo.report.list()[inv_model]
[{'name': u'Duplicates',
'report_name': u'account.account_invoice_report_duplicate_main',
'report_type': u'qweb-pdf'},
{'name': 'Invoices',
'report_type': 'qweb-pdf',
'report_name': 'account.report_invoice'}]
.. doctest::
:hide:
>>> from odoorpc.tools import v
>>> inv_model = 'account.move'
>>> if v(VERSION) < v('13.0'):
... inv_model = 'account.invoice'
>>> from pprint import pprint as pp
>>> any(data['report_name'] == 'account.report_invoice'
... for data in odoo.report.list()[inv_model])
True
*Python 2:*
:return: `list` of dictionaries
:raise: `urllib2.URLError` (connection error)
*Python 3:*
:return: `list` of dictionaries
:raise: `urllib.error.URLError` (connection error)
"""
report_model = 'ir.actions.report'
if v(self._odoo.version)[0] < 11:
report_model = 'ir.actions.report.xml'
IrReport = self._odoo.env[report_model]
report_ids = IrReport.search([])
reports = IrReport.read(
report_ids, ['name', 'model', 'report_name', 'report_type']
)
result = {}
for report in reports:
model = report.pop('model')
report.pop('id')
if model not in result:
result[model] = []
result[model].append(report)
return result

View File

@ -1,298 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""This module provides `Connector` classes to communicate with an `Odoo`
server with the `JSON-RPC` protocol or through simple HTTP requests.
Web controllers of `Odoo` expose two kinds of methods: `json` and `http`.
These methods can be accessed from the connectors of this module.
"""
import sys
from odoorpc.rpc import error, jsonrpclib
# Python 2
if sys.version_info[0] < 3:
from cookielib import CookieJar
from urllib2 import HTTPCookieProcessor, build_opener
# Python >= 3
else:
from http.cookiejar import CookieJar
from urllib.request import HTTPCookieProcessor, build_opener
class Connector(object):
"""Connector base class defining the interface used
to interact with a server.
"""
def __init__(self, host, port=8069, timeout=120, version=None):
self.host = host
try:
int(port)
except (ValueError, TypeError):
txt = "The port '{0}' is invalid. An integer is required."
txt = txt.format(port)
raise error.ConnectorError(txt)
else:
self.port = int(port)
self._timeout = timeout
self.version = version
@property
def ssl(self):
"""Return `True` if SSL is activated."""
return False
@property
def timeout(self):
"""Return the timeout."""
return self._timeout
@timeout.setter
def timeout(self, timeout):
"""Set the timeout."""
self._timeout = timeout
class ConnectorJSONRPC(Connector):
"""Connector class using the `JSON-RPC` protocol.
.. doctest::
:options: +SKIP
>>> from odoorpc import rpc
>>> cnt = rpc.ConnectorJSONRPC('localhost', port=8069)
.. doctest::
:hide:
>>> from odoorpc import rpc
>>> cnt = rpc.ConnectorJSONRPC(HOST, port=PORT)
Open a user session:
.. doctest::
:options: +SKIP
>>> cnt.proxy_json.web.session.authenticate(db='db_name', login='admin', password='password')
{'id': 51373612,
'jsonrpc': '2.0',
'result': {'company_id': 1,
'currencies': {'1': {'digits': [69, 2],
'position': 'after',
'symbol': '\u20ac'},
'3': {'digits': [69, 2],
'position': 'before',
'symbol': '$'}},
'db': 'db_name',
'is_admin': True,
'is_system': True,
'name': 'Mitchell Admin',
'partner_display_name': 'YourCompany, Mitchell Admin',
'partner_id': 3,
'server_version': '12.0',
'server_version_info': [12, 0, 0, 'final', 0, ''],
'session_id': '6dd7a34f16c1c67b38bfec413cca4962d5c01d53',
'show_effect': True,
'uid': 2,
'user_companies': False,
'user_context': {'lang': 'en_US',
'tz': 'Europe/Brussels',
'uid': 2},
'username': 'admin',
'web.base.url': 'http://localhost:8069',
'web_tours': []}}
.. doctest::
:hide:
:options: +NORMALIZE_WHITESPACE
>>> from odoorpc.tools import v
>>> data = cnt.proxy_json.web.session.authenticate(db=DB, login=USER, password=PWD)
>>> keys = ['company_id', 'db', 'session_id', 'uid', 'user_context', 'username']
>>> if v(VERSION) >= v('10.0'):
... keys.extend([
... 'currencies', 'is_admin', 'is_superuser', 'name',
... 'partner_id', 'server_version', 'server_version_info',
... 'user_companies', 'web.base.url', 'web_tours',
... ])
>>> if v(VERSION) >= v('11.0'):
... keys.extend([
... 'is_system',
... ])
... keys.remove('is_admin')
>>> if v(VERSION) >= v('12.0'):
... keys.extend([
... 'is_admin',
... 'partner_display_name',
... 'show_effect',
... ])
... keys.remove('is_superuser')
>>> if v(VERSION) >= v('13.0'):
... keys.extend([
... 'display_switch_company_menu',
... 'cache_hashes',
... ])
... keys.remove('session_id')
>>> if v(VERSION) >= v('14.0'):
... keys.extend([
... 'active_ids_limit',
... ])
>>> all([key in data['result'] for key in keys])
True
Read data of a partner:
.. doctest::
:options: +SKIP
>>> cnt.proxy_json.web.dataset.call(model='res.partner', method='read', args=[[1]])
{'jsonrpc': '2.0', 'id': 454236230,
'result': [{'id': 1, 'comment': False, 'ean13': False, 'property_account_position': False, ...}]}
.. doctest::
:hide:
>>> data = cnt.proxy_json.web.dataset.call(model='res.partner', method='read', args=[[1]])
>>> 'jsonrpc' in data and 'id' in data and 'result' in data
True
You can send requests this way too:
.. doctest::
:options: +SKIP
>>> cnt.proxy_json['/web/dataset/call'](model='res.partner', method='read', args=[[1]])
{'jsonrpc': '2.0', 'id': 328686288,
'result': [{'id': 1, 'comment': False, 'ean13': False, 'property_account_position': False, ...}]}
.. doctest::
:hide:
>>> data = cnt.proxy_json['/web/dataset/call'](model='res.partner', method='read', args=[[1]])
>>> 'jsonrpc' in data and 'id' in data and 'result' in data
True
Or like this:
.. doctest::
:options: +SKIP
>>> cnt.proxy_json['web']['dataset']['call'](model='res.partner', method='read', args=[[1]])
{'jsonrpc': '2.0', 'id': 102320639,
'result': [{'id': 1, 'comment': False, 'ean13': False, 'property_account_position': False, ...}]}
.. doctest::
:hide:
>>> data = cnt.proxy_json['web']['dataset']['call'](model='res.partner', method='read', args=[[1]])
>>> 'jsonrpc' in data and 'id' in data and 'result' in data
True
"""
def __init__(
self,
host,
port=8069,
timeout=120,
version=None,
deserialize=True,
opener=None,
):
super(ConnectorJSONRPC, self).__init__(host, port, timeout, version)
self.deserialize = deserialize
# One URL opener (with cookies handling) shared between
# JSON and HTTP requests
if opener is None:
cookie_jar = CookieJar()
opener = build_opener(HTTPCookieProcessor(cookie_jar))
self._opener = opener
self._proxy_json, self._proxy_http = self._get_proxies()
def _get_proxies(self):
"""Returns the :class:`ProxyJSON <odoorpc.rpc.jsonrpclib.ProxyJSON>`
and :class:`ProxyHTTP <odoorpc.rpc.jsonrpclib.ProxyHTTP>` instances
corresponding to the server version used.
"""
proxy_json = jsonrpclib.ProxyJSON(
self.host,
self.port,
self._timeout,
ssl=self.ssl,
deserialize=self.deserialize,
opener=self._opener,
)
proxy_http = jsonrpclib.ProxyHTTP(
self.host,
self.port,
self._timeout,
ssl=self.ssl,
opener=self._opener,
)
# Detect the server version
if self.version is None:
result = proxy_json('/web/webclient/version_info')['result']
if 'server_version' in result:
self.version = result['server_version']
return proxy_json, proxy_http
@property
def proxy_json(self):
"""Return the JSON proxy."""
return self._proxy_json
@property
def proxy_http(self):
"""Return the HTTP proxy."""
return self._proxy_http
@property
def timeout(self):
"""Return the timeout."""
return self._proxy_json._timeout
@timeout.setter
def timeout(self, timeout):
"""Set the timeout."""
self._proxy_json._timeout = timeout
self._proxy_http._timeout = timeout
class ConnectorJSONRPCSSL(ConnectorJSONRPC):
"""Connector class using the `JSON-RPC` protocol over `SSL`.
.. doctest::
:options: +SKIP
>>> from odoorpc import rpc
>>> cnt = rpc.ConnectorJSONRPCSSL('localhost', port=8069)
.. doctest::
:hide:
>>> if 'ssl' in PROTOCOL:
... from odoorpc import rpc
... cnt = rpc.ConnectorJSONRPCSSL(HOST, port=PORT)
"""
def __init__(
self,
host,
port=8069,
timeout=120,
version=None,
deserialize=True,
opener=None,
):
super(ConnectorJSONRPCSSL, self).__init__(
host, port, timeout, version, opener=opener
)
self._proxy_json, self._proxy_http = self._get_proxies()
@property
def ssl(self):
return True
PROTOCOLS = {'jsonrpc': ConnectorJSONRPC, 'jsonrpc+ssl': ConnectorJSONRPCSSL}

View File

@ -1,11 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
class ConnectorError(BaseException):
"""Exception raised by the ``odoorpc.rpc`` package."""
def __init__(self, message, odoo_traceback=None):
self.message = message
self.odoo_traceback = odoo_traceback

View File

@ -1,181 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""Provides the :class:`ProxyJSON` class for JSON-RPC requests."""
import copy
import json
import logging
import random
import sys
# Python 2
if sys.version_info[0] < 3:
from cookielib import CookieJar
from urllib2 import HTTPCookieProcessor, Request, build_opener
def encode_data(data):
return data
def decode_data(data):
return data
# Python >= 3
else:
import io
from http.cookiejar import CookieJar
from urllib.request import HTTPCookieProcessor, Request, build_opener
def encode_data(data):
try:
return bytes(data, 'utf-8')
except: # noqa: E722
return bytes(data)
def decode_data(data):
return io.StringIO(data.read().decode('utf-8'))
LOG_HIDDEN_JSON_PARAMS = ['password']
LOG_JSON_SEND_MSG = u"(JSON,send) %(url)s %(data)s"
LOG_JSON_RECV_MSG = u"(JSON,recv) %(url)s %(data)s => %(result)s"
LOG_HTTP_SEND_MSG = u"(HTTP,send) %(url)s%(data)s"
LOG_HTTP_RECV_MSG = u"(HTTP,recv) %(url)s%(data)s => %(result)s"
logger = logging.getLogger(__name__)
def get_json_log_data(data):
"""Returns a new `data` dictionary with hidden params
for log purpose.
"""
log_data = data
for param in LOG_HIDDEN_JSON_PARAMS:
if param in data['params']:
if log_data is data:
log_data = copy.deepcopy(data)
log_data['params'][param] = "**********"
return log_data
class Proxy(object):
"""Base class to implement a proxy to perform requests."""
def __init__(self, host, port, timeout=120, ssl=False, opener=None):
self._root_url = "{http}{host}:{port}".format(
http=(ssl and "https://" or "http://"), host=host, port=port
)
self._timeout = timeout
self._builder = URLBuilder(self)
self._opener = opener
if not opener:
cookie_jar = CookieJar()
self._opener = build_opener(HTTPCookieProcessor(cookie_jar))
def __getattr__(self, name):
return getattr(self._builder, name)
def __getitem__(self, url):
return self._builder[url]
def _get_full_url(self, url):
return '/'.join([self._root_url, url])
class ProxyJSON(Proxy):
"""The :class:`ProxyJSON` class provides a dynamic access
to all JSON methods.
"""
def __init__(
self, host, port, timeout=120, ssl=False, opener=None, deserialize=True
):
Proxy.__init__(self, host, port, timeout, ssl, opener)
self._deserialize = deserialize
def __call__(self, url, params=None):
if params is None:
params = {}
data = {
"jsonrpc": "2.0",
"method": "call",
"params": params,
"id": random.randint(0, 1000000000),
}
if url.startswith('/'):
url = url[1:]
full_url = self._get_full_url(url)
log_data = get_json_log_data(data)
logger.debug(LOG_JSON_SEND_MSG, {'url': full_url, 'data': log_data})
data_json = json.dumps(data)
request = Request(url=full_url, data=encode_data(data_json))
request.add_header('Content-Type', 'application/json')
response = self._opener.open(request, timeout=self._timeout)
if not self._deserialize:
return response
result = json.load(decode_data(response))
logger.debug(
LOG_JSON_RECV_MSG,
{'url': full_url, 'data': log_data, 'result': result},
)
return result
class ProxyHTTP(Proxy):
"""The :class:`ProxyHTTP` class provides a dynamic access
to all HTTP methods.
"""
def __call__(self, url, data=None, headers=None):
if url.startswith('/'):
url = url[1:]
full_url = self._get_full_url(url)
logger.debug(
LOG_HTTP_SEND_MSG,
{'url': full_url, 'data': data and u" (%s)" % data or u""},
)
kwargs = {'url': full_url}
if data:
kwargs['data'] = encode_data(data)
request = Request(**kwargs)
if headers:
for hkey in headers:
hvalue = headers[hkey]
request.add_header(hkey, hvalue)
response = self._opener.open(request, timeout=self._timeout)
logger.debug(
LOG_HTTP_RECV_MSG,
{
'url': full_url,
'data': data and u" (%s)" % data or u"",
'result': response,
},
)
return response
class URLBuilder(object):
"""Auto-builds an URL while getting its attributes.
Used by the :class:`ProxyJSON` and :class:`ProxyHTTP` classes.
"""
def __init__(self, rpc, url=None):
self._rpc = rpc
self._url = url
def __getattr__(self, path):
new_url = self._url and '/'.join([self._url, path]) or path
return URLBuilder(self._rpc, new_url)
def __getitem__(self, path):
if path and path[0] == '/':
path = path[1:]
if path and path[-1] == '/':
path = path[:-1]
return getattr(self, path)
def __call__(self, **kwargs):
return self._rpc(self._url, kwargs)
def __str__(self):
return self._url

View File

@ -1,190 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""This module contains some helper functions used to save and load sessions
in `OdooRPC`.
"""
import os
import stat
import sys
# Python 2
if sys.version_info[0] < 3:
from ConfigParser import SafeConfigParser as ConfigParser
# Python >= 3
else:
from configparser import ConfigParser
def get_all(rc_file='~/.odoorpcrc'):
"""Return all session configurations from the `rc_file` file.
>>> import odoorpc
>>> from pprint import pprint as pp
>>> pp(odoorpc.session.get_all()) # doctest: +SKIP
{'foo': {'database': 'db_name',
'host': 'localhost',
'passwd': 'password',
'port': 8069,
'protocol': 'jsonrpc',
'timeout': 120,
'type': 'ODOO',
'user': 'admin'},
...}
.. doctest::
:hide:
>>> import odoorpc
>>> session = '%s_session' % DB
>>> odoo.save(session)
>>> data = odoorpc.session.get_all()
>>> data[session]['host'] == HOST
True
>>> data[session]['protocol'] == PROTOCOL
True
>>> data[session]['port'] == int(PORT)
True
>>> data[session]['database'] == DB
True
>>> data[session]['user'] == USER
True
>>> data[session]['passwd'] == PWD
True
>>> data[session]['type'] == 'ODOO'
True
"""
conf = ConfigParser()
conf.read([os.path.expanduser(rc_file)])
sessions = {}
for name in conf.sections():
sessions[name] = {
'type': conf.get(name, 'type'),
'host': conf.get(name, 'host'),
'protocol': conf.get(name, 'protocol'),
'port': conf.getint(name, 'port'),
'timeout': conf.getfloat(name, 'timeout'),
'user': conf.get(name, 'user'),
'passwd': conf.get(name, 'passwd'),
'database': conf.get(name, 'database'),
}
return sessions
def get(name, rc_file='~/.odoorpcrc'):
"""Return the session configuration identified by `name`
from the `rc_file` file.
>>> import odoorpc
>>> from pprint import pprint as pp
>>> pp(odoorpc.session.get('foo')) # doctest: +SKIP
{'database': 'db_name',
'host': 'localhost',
'passwd': 'password',
'port': 8069,
'protocol': 'jsonrpc',
'timeout': 120,
'type': 'ODOO',
'user': 'admin'}
.. doctest::
:hide:
>>> import odoorpc
>>> session = '%s_session' % DB
>>> odoo.save(session)
>>> data = odoorpc.session.get(session)
>>> data['host'] == HOST
True
>>> data['protocol'] == PROTOCOL
True
>>> data['port'] == int(PORT)
True
>>> data['database'] == DB
True
>>> data['user'] == USER
True
>>> data['passwd'] == PWD
True
>>> data['type'] == 'ODOO'
True
:raise: `ValueError` (wrong session name)
"""
conf = ConfigParser()
conf.read([os.path.expanduser(rc_file)])
if not conf.has_section(name):
raise ValueError(
"'{}' session does not exist in {}".format(name, rc_file)
)
return {
'type': conf.get(name, 'type'),
'host': conf.get(name, 'host'),
'protocol': conf.get(name, 'protocol'),
'port': conf.getint(name, 'port'),
'timeout': conf.getfloat(name, 'timeout'),
'user': conf.get(name, 'user'),
'passwd': conf.get(name, 'passwd'),
'database': conf.get(name, 'database'),
}
def save(name, data, rc_file='~/.odoorpcrc'):
"""Save the `data` session configuration under the name `name`
in the `rc_file` file.
>>> import odoorpc
>>> odoorpc.session.save(
... 'foo',
... {'type': 'ODOO', 'host': 'localhost', 'protocol': 'jsonrpc',
... 'port': 8069, 'timeout': 120, 'database': 'db_name'
... 'user': 'admin', 'passwd': 'password'}) # doctest: +SKIP
.. doctest::
:hide:
>>> import odoorpc
>>> session = '%s_session' % DB
>>> odoorpc.session.save(
... session,
... {'type': 'ODOO', 'host': HOST, 'protocol': PROTOCOL,
... 'port': PORT, 'timeout': 120, 'database': DB,
... 'user': USER, 'passwd': PWD})
"""
conf = ConfigParser()
conf.read([os.path.expanduser(rc_file)])
if not conf.has_section(name):
conf.add_section(name)
for key in data:
value = data[key]
conf.set(name, key, str(value))
with open(os.path.expanduser(rc_file), 'w') as file_:
os.chmod(os.path.expanduser(rc_file), stat.S_IREAD | stat.S_IWRITE)
conf.write(file_)
def remove(name, rc_file='~/.odoorpcrc'):
"""Remove the session configuration identified by `name`
from the `rc_file` file.
>>> import odoorpc
>>> odoorpc.session.remove('foo') # doctest: +SKIP
.. doctest::
:hide:
>>> import odoorpc
>>> session = '%s_session' % DB
>>> odoorpc.session.remove(session)
:raise: `ValueError` (wrong session name)
"""
conf = ConfigParser()
conf.read([os.path.expanduser(rc_file)])
if not conf.has_section(name):
raise ValueError(
"'{}' session does not exist in {}".format(name, rc_file)
)
conf.remove_section(name)
with open(os.path.expanduser(rc_file), 'wb') as file_:
conf.write(file_)

View File

@ -1,125 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Sébastien Alix
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl)
"""This module contains the :class:`Config <odoorpc.config.Config>` class which
manage the configuration related to an instance of
:class:`ODOO <odoorpc.ODOO>`, and some useful helper functions used internally
in `OdooRPC`.
"""
try:
from collections.abc import MutableMapping
except ImportError: # Python 2.7 compatibility
from collections import MutableMapping
import re
from .error import InternalError
MATCH_VERSION = re.compile(r'[^\d.]')
class Config(MutableMapping):
"""Class which manage the configuration of an
:class:`ODOO <odoorpc.ODOO>` instance.
.. note::
This class have to be used through the :attr:`odoorpc.ODOO.config`
property.
>>> import odoorpc
>>> odoo = odoorpc.ODOO('localhost') # doctest: +SKIP
>>> type(odoo.config)
<class 'odoorpc.tools.Config'>
"""
def __init__(self, odoo, options):
super(Config, self).__init__()
self._odoo = odoo
self._options = options or {}
def __getitem__(self, key):
return self._options[key]
def __setitem__(self, key, value):
"""Handle ``timeout`` option to set the timeout on the connector."""
if key == 'timeout':
self._odoo._connector.timeout = value
self._options[key] = value
def __delitem__(self, key):
raise InternalError("Operation not allowed")
def __iter__(self):
return self._options.__iter__()
def __len__(self):
return len(self._options)
def __str__(self):
return self._options.__str__()
def __repr__(self):
return self._options.__repr__()
def clean_version(version):
"""Clean a version string.
>>> from odoorpc.tools import clean_version
>>> clean_version('7.0alpha-20121206-000102')
'7.0'
:return: a cleaner version string
"""
version = MATCH_VERSION.sub('', version.split('-')[0])
return version
def v(version):
"""Convert a version string to a tuple. The tuple can be use to compare
versions between them.
>>> from odoorpc.tools import v
>>> v('7.0')
[7, 0]
>>> v('6.1')
[6, 1]
>>> v('7.0') < v('6.1')
False
:return: the version as tuple
"""
return [int(x) for x in clean_version(version).split(".")]
def get_encodings(hint_encoding='utf-8'):
"""Used to try different encoding.
Function copied from Odoo 11.0 (odoo.loglevels.get_encodings).
This piece of code is licensed under the LGPL-v3 and so it is compatible
with the LGPL-v3 license of OdooRPC::
- https://github.com/odoo/odoo/blob/11.0/LICENSE
- https://github.com/odoo/odoo/blob/11.0/COPYRIGHT
"""
fallbacks = {
'latin1': 'latin9',
'iso-8859-1': 'iso8859-15',
'cp1252': '1252',
}
if hint_encoding:
yield hint_encoding
if hint_encoding.lower() in fallbacks:
yield fallbacks[hint_encoding.lower()]
# some defaults (also taking care of pure ASCII)
for charset in ['utf8', 'latin1', 'ascii']:
if not hint_encoding or (charset.lower() != hint_encoding.lower()):
yield charset
from locale import getpreferredencoding
prefenc = getpreferredencoding()
if prefenc and prefenc.lower() != 'utf-8':
yield prefenc
prefenc = fallbacks.get(prefenc.lower())
if prefenc:
yield prefenc

Some files were not shown because too many files have changed in this diff Show More