# -*- coding: utf-8 -*-
"""
Модуль cerebro.db предоставляет доступ к :ref:`удаленной базе данных для выполнения SQL-запросов <sapi-sql>`.
.. rubric:: Классы
* :py:class:`Db <cerebro.db.Db>`
"""
import psycopg2
import threading
import py_cerebro_db
import collections
import time
from psycopg2.extras import DictCursor
def executeFormat(val_str):
ret_str = ''
ints = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
i = 0
while i < len(val_str):
if val_str[i] != '$':
ret_str += val_str[i]
i += 1
else:
ret_str += '%s'
i += 1
while val_str[i] in ints:
i += 1
return ret_str
def parseUrl():
# Url example pq://sa_web:web@ss.cerebrohq.com:5432/memoria
retVal = []
connect_url = py_cerebro_db.url()
retVal.append(connect_url.split('//')[1].split('@')[0].split(':')[0]) # User
retVal.append(connect_url.split('//')[1].split('@')[0].split(':')[1]) # Password
retVal.append(connect_url.split('//')[1].split('@')[1].split('/')[0].split(':')[0]) # Url
retVal.append(connect_url.split('//')[1].split('@')[1].split('/')[0].split(':')[1]) # Port
retVal.append(connect_url.split('//')[1].split('@')[1].split('/')[1]) # Db
return retVal
class Set_to_sql_arr:
def __init__(self, obj_set):
self.obj = obj_set
def getquoted(self):
sql_str = "'{"
for i in self.obj:
if sql_str != "'{":
sql_str += ','
sql_str += '%s' % i
sql_str += "}'"
return sql_str
[документация]class Db():
"""
Класс доступа к удаленной базе данных Cerebro.
.. rubric:: Методы
* :py:meth:`execute() <cerebro.db.Db.execute>`
* :py:meth:`is_connected() <cerebro.db.Db.is_connected>`
* :py:meth:`url() <cerebro.db.Db.url>`
Вы можете напрямую работать с базой данных Cerebro,
выполняя запросы на чтение и изменение данных на языке SQL.
Доступ к данным осуществляется с помощью хранимых процедур на стороне базы данных.
Подробнее о процедурах смотрите раздел :ref:`sapi-sql`.
::
db = cerebro.db.Db()
res = db.execute('select "listProjects_01"(%s,%s)', false, true) # выполняем запрос на список проектов
print('Список проектов', res) # печатаем результат
"""
def __init__(self, db_timeout = 5, db_reconn_count = 3):
self.sid = py_cerebro_db.session_id()
self.db_timeout = db_timeout
self.db_reconn_count = db_reconn_count
self.disconnected_by_timer = False
self.dont_auto_disconnect = False
if self.sid == 0:
raise Exception('Erorr connect to database. Session identificator is null\n Use to check cerebro.core.is_logon() function')
psycopg2.extensions.register_adapter(set, Set_to_sql_arr)
connect_url = parseUrl()
self.dbcon = psycopg2.connect(host=connect_url[2], port=connect_url[3], database=connect_url[4], user=connect_url[0], password=connect_url[1], cursor_factory=DictCursor)
self.dbcon.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) # Automatic transaction commit
self.db = self.dbcon.cursor()
self.disconnectTask = threading.Timer(self.db_timeout, self.__disconnectDB)
self.disconnectTask.start()
def __disconnectDB(self):
if self.dont_auto_disconnect:
return
if self.db != None and self.db.closed == False:
self.db.close()
if self.dbcon != None and self.dbcon.closed == False:
self.dbcon.close()
self.disconnected_by_timer = True
[документация] def execute(self, query, *parameters):
"""
:param string query: строка SQL-запроса.
:param parameters: параметры запроса.
:returns: результат запроса.
:rtype: list(tuple,)
Выполняет запрос и возвращает результат в виде списка кортежей.
Аргументы запроса задаются в формате %s.
::
db = cerebro.db.Db()
projects = db.execute('select "listProjects_01"(%s,%s)', false, true) # выполняем запрос на список проектов
print('Список проектов', projects) # печатаем результат
::
db = cerebro.db.Db()
# выполняем запрос на установку прогресса задачи в 50%
projects = db.execute('select "taskSetProgress_a"(%s,%s)', {task_id,}, 50)
"""
if self.disconnected_by_timer:
self.__init__(self.db_timeout)
try:
pars = (self.sid,) + parameters
self.db.execute('select "webResume2"(%s);' + executeFormat(query), pars)
except psycopg2.Error as err:
if err.pgcode in {'08000', '08003', '08006', '08001', '08004', '08007', '08P01'} or \
err.args[0] == 'server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n':
self.dont_auto_disconnect = True
showError = True
for x in range(0, self.db_reconn_count):
try:
self.__init__(self.db_timeout, self.db_reconn_count)
pars = (self.sid,) + parameters
self.db.execute('select "webResume2"(%s);' + executeFormat(query), pars)
showError = False
break
except Exception as err:
print('Reconnection attempt: ' + str(x + 1))
time.sleep(5)
self.dont_auto_disconnect = False
if showError:
raise Exception('Reconnection Error!')
else:
raise Exception(err.args)
self.disconnectTask.cancel()
self.disconnectTask = threading.Timer(self.db_timeout, self.__disconnectDB)
self.disconnectTask.start()
return self.db.fetchall()
[документация] def is_connected(self):
"""
:returns: True, если соединение c базой данных установлено.
:rtype: bool
"""
return py_cerebro_db.is_connected() != 0
[документация] def url(self):
"""
:returns: строковый локатор удаленной базы данных.
:rtype: string
"""
return py_cerebro_db.url()