Исходный код cerebro.db

# -*- coding: utf-8 -*-
"""
Модуль cerebro.db предоставляет доступ к :ref:`удаленной базе данных для выполнения SQL-запросов <sapi-sql>`.

.. rubric:: Классы

* :py:class:`Db <cerebro.db.Db>`
"""


import psycopg2
import threading
import py_cerebro_db
import collections
import time
from psycopg2.extras import DictCursor

def executeFormat(val_str):
	ret_str = ''
	ints = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
	i = 0
	while i < len(val_str):
		if val_str[i] != '$':
			ret_str += val_str[i]
			i += 1
		else:
			ret_str += '%s'
			i += 1
			while val_str[i] in ints:
				i += 1
	return ret_str

def parseUrl():
	# Url example pq://sa_web:web@ss.cerebrohq.com:5432/memoria
	retVal = []
	connect_url = py_cerebro_db.url()
	retVal.append(connect_url.split('//')[1].split('@')[0].split(':')[0]) # User
	retVal.append(connect_url.split('//')[1].split('@')[0].split(':')[1]) # Password
	retVal.append(connect_url.split('//')[1].split('@')[1].split('/')[0].split(':')[0]) # Url
	retVal.append(connect_url.split('//')[1].split('@')[1].split('/')[0].split(':')[1]) # Port
	retVal.append(connect_url.split('//')[1].split('@')[1].split('/')[1]) # Db
	
	return retVal

class Set_to_sql_arr:
	def __init__(self, obj_set):
		self.obj = obj_set

	def getquoted(self):
		sql_str = "'{"
		for i in self.obj:
			if sql_str != "'{":
				sql_str += ','

			sql_str += '%s' % i

		sql_str += "}'"
		return sql_str

[документация]class Db(): """ Класс доступа к удаленной базе данных Cerebro. .. rubric:: Методы * :py:meth:`execute() <cerebro.db.Db.execute>` * :py:meth:`is_connected() <cerebro.db.Db.is_connected>` * :py:meth:`url() <cerebro.db.Db.url>` Вы можете напрямую работать с базой данных Cerebro, выполняя запросы на чтение и изменение данных на языке SQL. Доступ к данным осуществляется с помощью хранимых процедур на стороне базы данных. Подробнее о процедурах смотрите раздел :ref:`sapi-sql`. :: db = cerebro.db.Db() res = db.execute('select "listProjects_01"(%s,%s)', false, true) # выполняем запрос на список проектов print('Список проектов', res) # печатаем результат """ def __init__(self, db_timeout = 5, db_reconn_count = 3): self.sid = py_cerebro_db.session_id() self.db_timeout = db_timeout self.db_reconn_count = db_reconn_count self.disconnected_by_timer = False self.dont_auto_disconnect = False if self.sid == 0: raise Exception('Erorr connect to database. Session identificator is null\n Use to check cerebro.core.is_logon() function') psycopg2.extensions.register_adapter(set, Set_to_sql_arr) connect_url = parseUrl() self.dbcon = psycopg2.connect(host=connect_url[2], port=connect_url[3], database=connect_url[4], user=connect_url[0], password=connect_url[1], cursor_factory=DictCursor) self.dbcon.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) # Automatic transaction commit self.db = self.dbcon.cursor() self.disconnectTask = threading.Timer(self.db_timeout, self.__disconnectDB) self.disconnectTask.start() def __disconnectDB(self): if self.dont_auto_disconnect: return if self.db != None and self.db.closed == False: self.db.close() if self.dbcon != None and self.dbcon.closed == False: self.dbcon.close() self.disconnected_by_timer = True
[документация] def execute(self, query, *parameters): """ :param string query: строка SQL-запроса. :param parameters: параметры запроса. :returns: результат запроса. :rtype: list(tuple,) Выполняет запрос и возвращает результат в виде списка кортежей. Аргументы запроса задаются в формате %s. :: db = cerebro.db.Db() projects = db.execute('select "listProjects_01"(%s,%s)', false, true) # выполняем запрос на список проектов print('Список проектов', projects) # печатаем результат :: db = cerebro.db.Db() # выполняем запрос на установку прогресса задачи в 50% projects = db.execute('select "taskSetProgress_a"(%s,%s)', {task_id,}, 50) """ if self.disconnected_by_timer: self.__init__(self.db_timeout) try: pars = (self.sid,) + parameters self.db.execute('select "webResume2"(%s);' + executeFormat(query), pars) except psycopg2.Error as err: if err.pgcode in {'08000', '08003', '08006', '08001', '08004', '08007', '08P01'} or \ err.args[0] == 'server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n': self.dont_auto_disconnect = True showError = True for x in range(0, self.db_reconn_count): try: self.__init__(self.db_timeout, self.db_reconn_count) pars = (self.sid,) + parameters self.db.execute('select "webResume2"(%s);' + executeFormat(query), pars) showError = False break except Exception as err: print('Reconnection attempt: ' + str(x + 1)) time.sleep(5) self.dont_auto_disconnect = False if showError: raise Exception('Reconnection Error!') else: raise Exception(err.args) self.disconnectTask.cancel() self.disconnectTask = threading.Timer(self.db_timeout, self.__disconnectDB) self.disconnectTask.start() return self.db.fetchall()
[документация] def is_connected(self): """ :returns: True, если соединение c базой данных установлено. :rtype: bool """ return py_cerebro_db.is_connected() != 0
[документация] def url(self): """ :returns: строковый локатор удаленной базы данных. :rtype: string """ return py_cerebro_db.url()