# -*- coding: utf-8 -*-
"""
The cerebro.db module provides access to :ref:`remote database for SQL queries execution <sapi-sql>`.
.. rubric:: Classes
* :py:class:`Db <cerebro.db.Db>`
"""
import psycopg2
import threading
import py_cerebro_db
import collections
import time
from psycopg2.extras import DictCursor
import requests, json, iso8601, datetime
# PostgreSQL types
TYPES_INT = {'integer', 'serial', 'smallint', 'int4', 'int2', 'int8'}
TYPES_FLOAT = {'numeric', 'decimal', 'real', 'double precision', 'float4', 'float8'}
TYPES_DATETIME = {'timestamptz'}
# PostgreSQL error codes that require another try
RECONNECT_ERROR_CODES = {'08000', '08003', '08006', '08001', '08004', '08007', '08P01'}
[docs]def json_serial(obj):
"""
json serializer for unsupported by default types.
"""
if isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, set):
return list(obj)
raise TypeError("Type %s is not JSON serializable" % type(obj))
def executeFormat(val_str):
ret_str = ''
ints = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
i = 0
while i < len(val_str):
if val_str[i] != '$':
ret_str += val_str[i]
i += 1
else:
ret_str += '%s'
i += 1
while val_str[i] in ints:
i += 1
return ret_str
class Set_to_sql_arr:
def __init__(self, obj_set):
self.obj = obj_set
def getquoted(self):
sql_str = "'{"
for i in self.obj:
if sql_str != "'{":
sql_str += ','
sql_str += '%s' % i
sql_str += "}'"
return sql_str
[docs]class Db():
"""
The class of access to remote Cerebro database.
.. rubric:: Methods
* :py:meth:`execute() <cerebro.db.Db.execute>`
* :py:meth:`is_connected() <cerebro.db.Db.is_connected>`
* :py:meth:`url() <cerebro.db.Db.url>`
You can work with Cerebro database directly,
executing SQL read/write queries.
The access to the data is provided by means of procedures stored on the database side.
See more detailed info on the procedures in the :ref:`sapi-sql` chapter.
::
db = cerebro.db.Db()
res = db.execute('select "listProjects_01"(%s,%s)', false, true) # executing a query to get a list of projects
print('The list of projects', res) # printing the result
"""
def __init__(self, db_timeout = 5, db_reconn_count = 3):
self.is_rpc = False
self.sid = py_cerebro_db.session_id()
self.__setup_urls()
self.db_timeout = db_timeout
self.db_reconn_count = db_reconn_count
self.db = None
self.dbcon = None
self.query_id = 0
if self.sid == 0:
raise Exception('Erorr connect to database. Session identificator is null\n Use to check cerebro.core.is_logon() function')
psycopg2.extensions.register_adapter(set, Set_to_sql_arr)
def __del__(self):
self.__disconnectDB()
def __setup_urls(self):
self.is_rpc = False
db_url = py_cerebro_db.url()
urls = {}
try:
urls = json.loads(db_url)
self.is_rpc = urls.get("primary", None) is not None and urls.get("secondary", None) is not None
except:
pass
if self.is_rpc:
self.connect_url = [ urls["primary"], urls["secondary"] ]
else:
self.connect_url = []
self.connect_url.append(db_url.split('//')[1].split('@')[0].split(':')[0]) # User
self.connect_url.append(db_url.split('//')[1].split('@')[0].split(':')[1]) # Password
self.connect_url.append(db_url.split('//')[1].split('@')[1].split('/')[0].split(':')[0]) # Url
self.connect_url.append(db_url.split('//')[1].split('@')[1].split('/')[0].split(':')[1]) # Port
self.connect_url.append(db_url.split('//')[1].split('@')[1].split('/')[1]) # Db
def __disconnectDB(self):
if self.db != None and self.db.closed == False:
self.db.close()
if self.dbcon != None and self.dbcon.closed == False:
self.dbcon.close()
def __connectDB(self):
self.__disconnectDB()
if self.is_rpc:
pass
else:
self.dbcon = psycopg2.connect(host=self.connect_url[2], port=self.connect_url[3], database=self.connect_url[4], user=self.connect_url[0], password=self.connect_url[1], cursor_factory=DictCursor)
self.dbcon.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) # Automatic transaction commit
self.db = self.dbcon.cursor()
[docs] def execute(self, query, *parameters):
"""
:param string query: SQL query string.
:param parameters: query parameters.
:returns: query result.
:rtype: list(tuple,)
Executes the query and returns the result as a list of tuples.
The query arguments have the format %s.
::
db = cerebro.db.Db()
projects = db.execute('select "listProjects_01"(%s,%s)', false, true) # executing a query to get a list of projects
print('Список проектов', projects) # printing the result
::
db = cerebro.db.Db()
# executing a query to set task progress to 50%
projects = db.execute('select "taskSetProgress_a"(%s,%s)', {task_id,}, 50)
"""
return self.z_execute(False, query, *parameters)
def z_execute(self, read_only, query, *parameters):
if self.is_rpc:
return self.execute_rpc(read_only, query, *parameters)
else:
return self.execute_sql(query, *parameters)
def execute_sql(self, query, *parameters):
self.__connectDB()
try:
pars = (self.sid,) + parameters
self.db.execute('select "webResume2"(%s);' + executeFormat(query), pars)
except psycopg2.Error as err:
if err.pgcode in {'08000', '08003', '08006', '08001', '08004', '08007', '08P01'} or \
err.pgerror == 'server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n':
showError = True
for x in range(0, self.db_reconn_count):
try:
self.__connectDB()
pars = (self.sid,) + parameters
self.db.execute('select "webResume2"(%s);' + executeFormat(query), pars)
showError = False
break
except Exception as err:
if err.pgcode not in {'08000', '08003', '08006', '08001', '08004', '08007', '08P01'} and \
err.pgerror != 'server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n':
raise
time.sleep(5)
if showError:
raise Exception('Connection Error')
else:
raise
table = None
try:
table = self.db.fetchall()
except psycopg2.Error as err:
if str(err) == 'cursor already closed':
self.__connectDB()
self.db.execute('select "webResume2"(%s);' + executeFormat(query), pars)
table = self.db.fetchall()
else:
raise
self.__disconnectDB()
return table
[docs] def execute_rpc(self, read_only, query, *parameters):
"""
:param bool read_only: check if query only reads data from DB.
:param string query: query text.
:param parameters: query parameters list.
Executes the query and returns the result. The result has a form of a table (list pf tuples).
"""
ret = []
if not self.is_connected():
raise Exception('You are not connected to Cerebro DB!')
args = [query.replace('%s', '?')]
args.extend(parameters)
header = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Accept': 'application/json-rpc',
'Cache-Control': 'no-store'
}
payload = {
'method': 'queryMulti',
'jsonrpc': '2.0',
'params': args,
'id': self.query_id
}
cookies = {
'token': str(self.sid)
}
res = None
error_msg = ""
for i in range(self.db_reconn_count):
try:
response = requests.post(self.connect_url[1] if read_only else self.connect_url[0], headers=header, cookies=cookies,
data=json.dumps(payload, default=json_serial), timeout=self.db_timeout)
self.query_id += 1
if response.status_code == 200:
res = json.loads(response.text)
err = res.get("error", None)
if err is None: break
# Internal server error occured
res = None
error_msg = "{0} : {1}".format(err["code"], err["message"])
if err["code"] in RECONNECT_ERROR_CODES: continue
elif err["message"].startswith("sqlmsg--") and err["message"].endswith("#0--"):
raise Exception(err["message"][8:-4])
elif err["message"].startswith("sqlmsg--") and err["message"].endswith("#1--"):
raise Exception(err["message"][8:-4])
break
else:
# Http request error occured
error_msg = "Connection error {0} : {1}".format(response.status_code, response.reason)
except Exception as e:
error_msg = str(e)
if res is not None:
if len(res.get("result", [])) > 0:
for i, row in enumerate(res["result"][0]["rows"]):
ret.append([])
for j, col in enumerate(row):
type = res["result"][0]["columns"][j]["type"]
if col is None:
ret[i].append(col)
elif type in TYPES_INT:
ret[i].append(int(col))
elif type in TYPES_FLOAT:
ret[i].append(float(col))
elif type in TYPES_DATETIME:
ret[i].append(iso8601.parse_date(col))
else:
ret[i].append(col)
else:
raise Exception(error_msg)
return ret
[docs] def is_connected(self):
"""
:returns: True, if connection to database is established.
:rtype: bool
"""
return py_cerebro_db.is_connected() != 0
[docs] def url(self):
"""
:returns: string locator of a remote database.
:rtype: string
"""
return py_cerebro_db.url()