Source code for cerebro.db

# -*- coding: utf-8 -*-
"""
The cerebro.db module provides access to the :ref:`remote database for executing SQL queries <sapi-sql>`.

.. rubric:: Classes

* :py:class:`Db <cerebro.db.Db>`
"""


import psycopg2
import threading
import py_cerebro_db
import collections
import time
from psycopg2.extras import DictCursor
import requests, json, iso8601, datetime

# PostgreSQL column type names, grouped by the Python type a value is converted to.
# Db.execute_rpc compares the "type" reported for each result column against these
# sets: TYPES_INT -> int(), TYPES_FLOAT -> float(), TYPES_DATETIME -> iso8601.parse_date().
# Values of any other column type are returned unchanged.
TYPES_INT		= {'integer', 'serial', 'smallint', 'int4', 'int2', 'int8'}
TYPES_FLOAT		= {'numeric', 'decimal', 'real', 'double precision', 'float4', 'float8'}
TYPES_DATETIME	= {'timestamptz'}

# PostgreSQL error codes that require another try.
# All of them belong to SQLSTATE class 08 (connection exception). Db.execute_rpc
# re-sends the request when the server reports one of these codes.
RECONNECT_ERROR_CODES = {'08000', '08003', '08006', '08001', '08004', '08007', '08P01'}

def json_serial(obj):
    """
    JSON serializer for types that :py:func:`json.dumps` does not support by default.

    Intended to be passed as the ``default=`` argument of :py:func:`json.dumps`.

    :param obj: the object the JSON encoder could not serialize.
    :returns: ISO 8601 string for ``datetime.datetime`` / ``datetime.date`` objects,
        a list for ``set`` / ``frozenset`` objects.
    :raises TypeError: if the object is of any other type.
    """
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()

    # frozenset is serialized the same way as set: JSON has no set type, so both become arrays
    if isinstance(obj, (set, frozenset)):
        return list(obj)

    raise TypeError("Type %s is not JSON serializable" % type(obj))
def executeFormat(val_str):
    """
    Converts numbered query placeholders (``$1``, ``$2``, ...) to the ``%s`` placeholders
    expected by psycopg2.

    Every ``$`` character, together with the decimal digits that immediately follow it,
    is replaced by ``%s``. All other characters are copied unchanged.

    :param string val_str: query text.
    :returns: query text with ``$N`` placeholders replaced by ``%s``.
    :rtype: string
    """
    digits = frozenset('0123456789')
    length = len(val_str)
    pieces = []
    i = 0
    while i < length:
        char = val_str[i]
        i += 1
        if char != '$':
            pieces.append(char)
            continue
        pieces.append('%s')
        # The bounds check matters: a placeholder may be the very last thing in the query
        while i < length and val_str[i] in digits:
            i += 1
    return ''.join(pieces)


class Set_to_sql_arr:
    """
    psycopg2 adapter that renders an iterable (registered for ``set``) as a quoted
    PostgreSQL array literal, for example ``'{1,2,3}'``.
    """

    def __init__(self, obj_set):
        self.obj = obj_set

    def getquoted(self):
        """
        :returns: the adapted value as a single-quoted array literal.
        :rtype: string
        """
        # NOTE(review): elements are inserted as-is, without any escaping. This is only
        # safe for values such as integer identifiers; confirm that no caller passes
        # sets of arbitrary strings.
        return "'{" + ','.join('%s' % item for item in self.obj) + "}'"
class Db():
    """
    The class of access to remote Cerebro database.

    .. rubric:: Methods

    * :py:meth:`execute() <cerebro.db.Db.execute>`
    * :py:meth:`is_connected() <cerebro.db.Db.is_connected>`
    * :py:meth:`url() <cerebro.db.Db.url>`

    You can work with Cerebro database directly, executing SQL read/write queries.
    The access to the data is provided by means of procedures stored on the database side.
    See more detailed info on the procedures in the :ref:`sapi-sql` chapter.

    ::

        db = cerebro.db.Db()
        res = db.execute('select "listProjects_01"(%s,%s)', False, True) # executing a query to get a list of projects
        print('The list of projects', res) # printing the result
    """

    # Text psycopg2 reports in ``pgerror`` when the server drops the connection.
    # Such a failure carries no SQLSTATE code, so it is recognised by its message.
    _CONNECTION_LOST_MESSAGE = 'server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n'

    def __init__(self, db_timeout = 5, db_reconn_count = 3):
        """
        :param db_timeout: timeout passed to each HTTP request in RPC mode.
        :param int db_reconn_count: number of attempts (RPC mode) or reconnection attempts (direct SQL mode).
        :raises Exception: if there is no active session (session identifier is 0).
        """
        # Connection handles are initialised first so that __del__ is safe
        # even when one of the steps below raises.
        self.db = None
        self.dbcon = None
        self.is_rpc = False
        self.connect_url = []
        self.db_timeout = db_timeout
        self.db_reconn_count = db_reconn_count
        self.query_id = 0

        self.sid = py_cerebro_db.session_id()
        # Checked before the URL is parsed, so that a missing session is reported
        # with this message rather than with a URL parsing failure.
        if self.sid == 0:
            raise Exception('Erorr connect to database. Session identificator is null\n Use to check cerebro.core.is_logon() function')

        self.__setup_urls()
        psycopg2.extensions.register_adapter(set, Set_to_sql_arr)

    def __del__(self):
        self.__disconnectDB()

    def __setup_urls(self):
        """
        Reads the database locator and fills ``self.is_rpc`` and ``self.connect_url``.

        If the locator is a JSON object with non-null "primary" and "secondary" keys,
        RPC mode is used and ``connect_url`` is ``[primary, secondary]``. Otherwise the
        locator is parsed as ``scheme://user:password@host:port/database`` and
        ``connect_url`` is ``[user, password, host, port, database]``.
        """
        db_url = py_cerebro_db.url()

        urls = None
        try:
            urls = json.loads(db_url)
        except (TypeError, ValueError):
            pass # not JSON, so this is a direct connection string

        self.is_rpc = (isinstance(urls, dict)
                       and urls.get("primary", None) is not None
                       and urls.get("secondary", None) is not None)

        if self.is_rpc:
            self.connect_url = [ urls["primary"], urls["secondary"] ]
        else:
            self.connect_url = self._split_db_url(db_url)

    @staticmethod
    def _split_db_url(db_url):
        """Splits ``scheme://user:password@host:port/database`` into ``[user, password, host, port, database]`` (all strings)."""
        location = db_url.split('//')[1]
        around_at = location.split('@')
        credentials = around_at[0].split(':')
        address = around_at[1].split('/')
        host_port = address[0].split(':')
        return [credentials[0], credentials[1], host_port[0], host_port[1], address[1]]

    def __disconnectDB(self):
        """Closes the cursor and then the connection, if they exist and are still open."""
        # getattr: the attributes may be missing on a partially constructed object
        for attr_name in ('db', 'dbcon'):
            handle = getattr(self, attr_name, None)
            if handle is not None and not handle.closed:
                handle.close()

    def __connectDB(self):
        """Drops the current connection and, in direct SQL mode, opens a new autocommit connection and cursor."""
        self.__disconnectDB()
        if self.is_rpc:
            return # RPC mode sends HTTP requests, there is no persistent connection

        self.dbcon = psycopg2.connect(host=self.connect_url[2], port=self.connect_url[3],
                                      database=self.connect_url[4], user=self.connect_url[0],
                                      password=self.connect_url[1], cursor_factory=DictCursor)
        self.dbcon.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) # Automatic transaction commit
        self.db = self.dbcon.cursor()

    def _is_reconnect_error(self, err):
        """Returns True if ``err`` reports a lost connection, i.e. the query is worth retrying after a reconnect."""
        # getattr: exceptions that do not come from psycopg2 have neither attribute
        return (getattr(err, 'pgcode', None) in RECONNECT_ERROR_CODES
                or getattr(err, 'pgerror', None) == self._CONNECTION_LOST_MESSAGE)

    def execute(self, query, *parameters):
        """
        :param string query: SQL query string.
        :param parameters: query parameters.
        :returns: query result.
        :rtype: list(tuple,)

        Executes the query and returns the result as a list of rows.
        The query arguments have the format %s.

        ::

            db = cerebro.db.Db()
            projects = db.execute('select "listProjects_01"(%s,%s)', False, True) # executing a query to get a list of projects
            print('The list of projects', projects) # printing the result

        ::

            db = cerebro.db.Db()
            # executing a query to set task progress to 50%
            projects = db.execute('select "taskSetProgress_a"(%s,%s)', {task_id,}, 50)
        """
        return self.z_execute(False, query, *parameters)

    def z_execute(self, read_only, query, *parameters):
        """
        Dispatches the query to :py:meth:`execute_rpc` in RPC mode or to
        :py:meth:`execute_sql` otherwise. ``read_only`` is only used in RPC mode.
        """
        if self.is_rpc:
            return self.execute_rpc(read_only, query, *parameters)
        return self.execute_sql(query, *parameters)

    def execute_sql(self, query, *parameters):
        """
        :param string query: SQL query string.
        :param parameters: query parameters.
        :returns: all rows fetched from the cursor.
        :raises Exception: ``'Connection Error'`` if the connection was lost and
            could not be restored in ``db_reconn_count`` attempts.
        :raises psycopg2.Error: for any other database error.

        Executes the query over a direct database connection. A new connection is
        opened for every call and is closed before returning, including on errors.
        """
        # The session is resumed in the same round trip, its id is the first parameter
        statement = 'select "webResume2"(%s);' + executeFormat(query)
        pars = (self.sid,) + parameters

        try:
            self.__connectDB()
            try:
                self.db.execute(statement, pars)
            except psycopg2.Error as err:
                if not self._is_reconnect_error(err):
                    raise
                self._reconnect_and_execute(statement, pars)

            try:
                return self.db.fetchall()
            except psycopg2.Error as err:
                if str(err) != 'cursor already closed':
                    raise
                # The cursor went away between execute and fetch: run the query once more
                self.__connectDB()
                self.db.execute(statement, pars)
                return self.db.fetchall()
        finally:
            self.__disconnectDB()

    def _reconnect_and_execute(self, statement, pars):
        """Reconnects and re-runs the statement, up to ``db_reconn_count`` times, pausing 5 seconds between attempts."""
        for attempt in range(self.db_reconn_count):
            try:
                self.__connectDB()
                self.db.execute(statement, pars)
                return
            except Exception as err:
                if not self._is_reconnect_error(err):
                    raise
            if attempt + 1 < self.db_reconn_count:
                time.sleep(5) # no point in waiting after the last attempt
        raise Exception('Connection Error')

    def execute_rpc(self, read_only, query, *parameters):
        """
        :param bool read_only: check if query only reads data from DB.
        :param string query: query text.
        :param parameters: query parameters list.
        :raises Exception: if there is no connection, or if the query failed
            (the message holds the server or transport error text).

        Executes the query and returns the result.
        The result has a form of a table (list of rows, each row is a list of values).
        Read-only queries are sent to the secondary URL, all others to the primary one.
        """
        if not self.is_connected():
            raise Exception('You are not connected to Cerebro DB!')

        args = [query.replace('%s', '?')]
        args.extend(parameters)

        header = {
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Accept': 'application/json-rpc',
            'Cache-Control': 'no-store'
        }
        payload = {
            'method': 'queryMulti',
            'jsonrpc': '2.0',
            'params': args,
            'id': self.query_id
        }
        cookies = { 'token': str(self.sid) }
        target_url = self.connect_url[1] if read_only else self.connect_url[0]

        res = None
        error_msg = ""
        for _ in range(self.db_reconn_count):
            try:
                response = requests.post(target_url, headers=header, cookies=cookies,
                                         data=json.dumps(payload, default=json_serial),
                                         timeout=self.db_timeout)
                self.query_id += 1

                if response.status_code != 200:
                    # Http request error occured
                    error_msg = "Connection error {0} : {1}".format(response.status_code, response.reason)
                    continue

                reply = json.loads(response.text)
                err = reply.get("error", None)
                if err is None:
                    res = reply
                    break

                # Internal server error occured
                error_msg = "{0} : {1}".format(err["code"], err["message"])
                if err["code"] in RECONNECT_ERROR_CODES:
                    continue

                message = err["message"]
                if message.startswith("sqlmsg--") and message.endswith(("#0--", "#1--")):
                    error_msg = message[8:-4] # strip the "sqlmsg--" prefix and the "#N--" suffix
                # The server has processed the request and rejected it: retrying cannot help
                break
            except Exception as e:
                # Transport failures and malformed replies are treated as temporary
                error_msg = str(e)

        if res is None:
            raise Exception(error_msg)
        return self._rpc_result_to_table(res)

    @staticmethod
    def _rpc_result_to_table(res):
        """Converts the first result set of a JSON-RPC reply to a list of rows, casting values by column type."""
        result_sets = res.get("result") or []
        if len(result_sets) == 0:
            return []

        first_set = result_sets[0]
        rows = first_set["rows"]
        if not rows:
            return []

        columns = first_set["columns"]
        return [[Db._rpc_value(value, columns[j]["type"]) for j, value in enumerate(row)]
                for row in rows]

    @staticmethod
    def _rpc_value(value, type_name):
        """Casts a single value received over RPC according to its PostgreSQL column type name."""
        if value is None:
            return None
        if type_name in TYPES_INT:
            return int(value)
        if type_name in TYPES_FLOAT:
            return float(value)
        if type_name in TYPES_DATETIME:
            return iso8601.parse_date(value)
        return value

    def is_connected(self):
        """
        :returns: True, if connection to database is established.
        :rtype: bool
        """
        return py_cerebro_db.is_connected() != 0

    def url(self):
        """
        :returns: string locator of a remote database.
        :rtype: string
        """
        return py_cerebro_db.url()