Blame | Last modification | View Log | RSS feed
"""
Functions and classes to handle databases
"""
from psycopg2 import connect,threadsafety,paramstyle
import psycopg2 as pgdb
#from pyPgSQL.PgSQL import concnect,threadsafety,paramstyle
import threading
import random
from . import config
#def TimestampFromTicks(ticks):
# return pgdb.TimestampFromTicks(ticks)
class ConnectionPool:
    """
    Implements a db connection pool

    Connections are opened up front with create() and then handed out with
    get_conn() / get_cursor() and given back with free_conn() /
    free_cursor().  get_conn() blocks while the pool is empty.
    """
    def __init__(self,**args):
        """
        Remember the connection parameters; no connection is opened here.

        args is passed unchanged as keyword arguments to pgdb.connect()
        every time connect() is called.
        """
        self.pool = []
        # glock/rlock are not needed by the pool any more (self.notifier
        # guards self.pool); they are kept as attributes so that code
        # referring to them keeps working.
        self.glock = threading.Lock()
        self.rlock = threading.Lock()
        # Guards self.pool and wakes up getters waiting for a connection.
        self.notifier = threading.Condition()
        # Maps each cursor handed out by get_cursor() to its connection.
        self.cursormap = {}
        self.args=args

    def connect(self):
        """
        Open and return a new connection using the stored parameters
        """
        return pgdb.connect(**self.args)

    def get_conn(self):
        """
        get a database connection

        Blocks until a connection is available in the pool.
        """
        # The emptiness check and the wait must happen under the same lock
        # that free_conn() holds while notifying, otherwise a notification
        # sent between the check and the wait is lost.
        with self.notifier:
            while not self.pool:
                self.notifier.wait()
            conn = self.pool.pop()
        # Hack for pgdb: best-effort rollback before handing the connection
        # out; a failure here is deliberately ignored.
        try:
            conn.rollback()
        except Exception:
            pass
        return conn

    def free_conn(self,c):
        """
        free a connection acquired by get_conn()

        The connection is rolled back (best effort) and put back into the
        pool; one waiting get_conn() call, if any, is woken up.
        """
        try:
            c.rollback()
        except Exception:
            pass
        with self.notifier:
            self.pool.append(c)
            self.notifier.notify()

    def get_cursor(self):
        """
        Take a connection from the pool and return a new cursor on it

        The cursor must be given back with free_cursor().
        """
        db = self.get_conn()
        try:
            cursor = db.cursor()
        except Exception:
            # Do not lose the connection if no cursor could be created.
            self.free_conn(db)
            raise
        self.cursormap[cursor]=db
        return cursor

    def free_cursor(self,cursor,commit=True):
        """
        Close a cursor obtained from get_cursor() and release its connection

        The transaction is committed if commit is true, otherwise rolled
        back.  Raises KeyError if the cursor is not known to this pool.
        Errors from close/commit/rollback are propagated, but the
        connection is returned to the pool in every case.
        """
        db = self.cursormap.pop(cursor)
        try:
            cursor.close()
            if commit:
                db.commit()
            else:
                db.rollback()
        finally:
            self.free_conn(db)

    def create(self,poolsize):
        """
        Open poolsize new connections and add them to the pool
        """
        for _ in range(poolsize):
            conn = self.connect()
            with self.notifier:
                self.pool.append(conn)
                self.notifier.notify()

    def check_conn(self,conn):
        """
        Check if a connection is alive
        Inputs the connection object
        Returns true if the connection is alive otherwise False

        The check opens a cursor and runs "SELECT 1"; any exception raised
        while doing so is reported as False.
        """
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT 1")
            finally:
                cursor.close()
        except Exception:
            return False
        return True
# Module-level map from an open cursor to the connection it was created on;
# filled by the module-level get_cursor() and emptied by free_cursor().
cursormap = {}
def get_cursor():
    """
    Obtain a connection via get_conn() and return a new cursor on it

    The cursor/connection pair is recorded in the module-level cursormap
    so that free_cursor() can find the connection again.
    """
    # NOTE(review): get_conn is not defined at module level in the part of
    # the file shown here -- confirm it is provided elsewhere.
    connection = get_conn()
    new_cursor = connection.cursor()
    cursormap[new_cursor] = connection
    return new_cursor
def free_cursor(cursor,commit=True):
    """
    Close a cursor obtained from get_cursor() and release its connection

    The connection registered for the cursor in cursormap is committed when
    commit is true and rolled back otherwise, then passed to free_conn().
    Raises KeyError if the cursor is not registered.
    """
    # NOTE(review): free_conn is not defined at module level in the part of
    # the file shown here -- confirm it is provided elsewhere.
    connection = cursormap.pop(cursor)
    cursor.close()
    finish = connection.commit if commit else connection.rollback
    finish()
    free_conn(connection)
def row_to_dict(cursor,row):
    """
    Converts a row, fetched from a database, to a dictionary

    The keys are the column names taken from the first field of each
    entry in cursor.description; the values are the items of row at the
    same positions.

    Returns None if row is None (e.g. fetchone() found no more rows).
    Raises IndexError if row has fewer items than cursor.description.

    Example:
    result = cursor.fetchone()
    dictresult = row_to_dict(cursor,result)
    """
    # Identity test: a row type with its own __eq__ must not be mistaken
    # for "no row".
    if row is None:
        return None
    return dict(
        (column[0], row[index])
        for index, column in enumerate(cursor.description)
    )
def make_random_id( len=32 ):
    """
    Return a string of len random decimal digits

    len is the number of digits to generate (default 32); a value of zero
    or less yields an empty string.

    NOTE(review): the digits come from the random module, which is not a
    cryptographically secure source -- do not rely on these ids being
    unguessable.
    """
    # The parameter name "len" shadows the builtin but is part of the
    # public signature, so it is kept as is.
    return ''.join(repr(random.randint(0,9)) for _ in range(len))
def create_connection_pool(conffile,conf_section):
    """
    Create a connection pool

    Reads the options dbuser, dbpassword, dbhost, dbdatabase, dbport and
    poolsize from section conf_section of the configuration loaded from
    conffile, builds a ConnectionPool with them and opens poolsize
    connections.  Returns the filled pool.
    """
    settings = config.Cfg(conffile)

    def option(name):
        # Look up one option in the requested configuration section.
        return settings.get(conf_section, name)

    pool = ConnectionPool(
        user=option('dbuser'),
        password=option('dbpassword'),
        host=option('dbhost'),
        database=option('dbdatabase'),
        port=option('dbport'),
    )
    pool_size = int(option('poolsize'))
    pool.create(pool_size)
    return pool