# Subversion Repositories SE.SVN
#
# Rev
#
# Blame | Last modification | View Log | RSS feed

"""
Functions and classes to handle databases 
"""

from psycopg2 import connect,threadsafety,paramstyle

import psycopg2 as pgdb

#from pyPgSQL.PgSQL import concnect,threadsafety,paramstyle

import threading
import random
from . import config

#def TimestampFromTicks(ticks):
#    return pgdb.TimestampFromTicks(ticks)


class ConnectionPool:
    """
    Implements a db connection pool

    Connections are kept in self.pool and handed out by get_conn() /
    get_cursor(); they are given back with free_conn() / free_cursor().
    All access to self.pool is guarded by the self.notifier condition,
    which is also used to wake up threads waiting for a connection.
    """

    def __init__(self,**args):
        """
        Store the keyword arguments used for every new connection.

        No connection is opened here; call create() to fill the pool.
        """
        self.pool = []
        # glock and rlock are no longer used by the pool itself (the
        # condition below is the only lock guarding self.pool); the
        # attributes are kept so code referring to them keeps working.
        self.glock = threading.Lock()
        self.rlock = threading.Lock()
        self.notifier = threading.Condition()
        self.cursormap = {}
        self.args=args

    def connect(self):
        """
        Open and return a new connection using the stored arguments
        """
        return  pgdb.connect(**self.args)


    def get_conn(self):
        """
        get a database connection

        Blocks until a connection is available in the pool.
        """
        # The emptiness test and the wait must happen under the same lock
        # that free_conn() holds when it appends and notifies, otherwise a
        # notification can slip in between the test and the wait and be lost.
        with self.notifier:
            while not self.pool:
                self.notifier.wait()
            r = self.pool.pop()

        # Hack for pgdb: start from a clean transaction state.  This is a
        # best-effort reset, so a failure here is deliberately ignored.
        try:
            r.rollback()
        except Exception:
            pass

        return r


    def free_conn(self,c):
        """
        free a connection acquired by get_conn()

        Any open transaction is rolled back (best effort) before the
        connection is put back into the pool.
        """
        try:
            c.rollback()
        except Exception:
            pass

        with self.notifier:
            self.pool.append(c)
            # wake up one thread blocked in get_conn()
            self.notifier.notify()


    def get_cursor(self):
        """
        Take a connection from the pool and return a new cursor on it

        The cursor must be given back with free_cursor(), which also
        returns the underlying connection to the pool.
        """
        db = self.get_conn()

        try:
            cursor = db.cursor()
        except Exception:
            # do not lose the connection if no cursor could be created
            self.free_conn(db)
            raise

        self.cursormap[cursor]=db
        return cursor

    def free_cursor(self,cursor,commit=True):
        """
        Close a cursor obtained from get_cursor() and release its connection

        Commits the transaction if commit is true, otherwise rolls it back.
        Raises KeyError if the cursor was not handed out by get_cursor().
        Errors from close/commit/rollback are propagated, but the
        connection is returned to the pool in any case.
        """
        db = self.cursormap.pop(cursor)
        try:
            cursor.close()
            if commit:
                db.commit()
            else:
                db.rollback()
        finally:
            self.free_conn(db)

    def create(self,poolsize):
        """
        Open poolsize new connections and add them to the pool
        """
        for i in range(0,poolsize):
            c = self.connect ()
            with self.notifier:
                self.pool.append(c)
                self.notifier.notify()


    def check_conn(self,conn):
        """
        Check if a connection is alive
        Inputs the connection object
        Returns true if the connection is alive otherwise False
        """
        # Any failure while creating a cursor, running the probe query or
        # closing the cursor means the connection is not usable.
        try:
            cursor=conn.cursor()
            try:
                cursor.execute("SELECT 1")
            finally:
                cursor.close()
        except Exception:
            return False

        return True
            
        

# Maps each cursor handed out by the module-level get_cursor() to the
# connection it was created on, so free_cursor() can look it up again.
cursormap = {}

def get_cursor():
    """
    Return a new cursor on a connection obtained from get_conn()

    The cursor is registered in the module-level cursormap together with
    its connection.
    NOTE(review): get_conn is not defined in the visible part of this
    module -- confirm where it comes from.
    """
    conn = get_conn()
    cur = conn.cursor()
    cursormap[cur] = conn
    return cur

def free_cursor(cursor,commit=True):
    """
    Close a cursor obtained from get_cursor() and release its connection

    Commits the transaction if commit is true, otherwise rolls it back.
    Raises KeyError if the cursor is not registered in cursormap.
    Errors from close/commit/rollback are propagated, but free_conn() is
    called for the connection in any case so that it is not lost.
    NOTE(review): free_conn is not defined in the visible part of this
    module -- confirm where it comes from.
    """
    db = cursormap.pop(cursor)
    try:
        cursor.close()
        if commit:
            db.commit()
        else:
            db.rollback()
    finally:
        free_conn(db)




def row_to_dict(cursor,row):
    """
    Converts a row, fetched from a database, to a dictionary

    The keys are the column names, taken from the first field of each
    entry in cursor.description; the values are the items at the same
    position in row.

    Returns None if row is None.
    The row[i] lookup fails (IndexError for tuples and lists) if row has
    fewer items than cursor.description.

    Example:
        result = cursor.fetchone()
        dictresult = row_to_dict(cursor,result)
    """
    # Identity test: a row object with a custom __eq__ must not be
    # mistaken for "no row".
    if row is None:
        return None

    return {column[0]: row[i] for i, column in enumerate(cursor.description)}


def make_random_id( len=32 ):
    """
    Return a string made of len random decimal digits

    Each digit is drawn with random.randint(0,9), so the result depends
    on the state of the global random generator.
    NOTE(review): the random module is not intended for security
    purposes -- confirm that these ids are never used as secrets.
    """
    digits = (str(random.randint(0, 9)) for _ in range(len))
    return ''.join(digits)


def create_connection_pool(conffile,conf_section):
    """
    Create a connection pool from a configuration file

    Reads the options dbuser, dbpassword, dbhost, dbdatabase and dbport
    from section conf_section of conffile and uses them as connection
    arguments of a new ConnectionPool. The pool is then filled with
    as many connections as the poolsize option says.

    Returns the filled ConnectionPool.
    """
    settings = config.Cfg(conffile)

    def option(name):
        # look up one option in the requested section
        return settings.get(conf_section, name)

    pool = ConnectionPool(
        user=option('dbuser'),
        password=option('dbpassword'),
        host=option('dbhost'),
        database=option('dbdatabase'),
        port=option('dbport'),
    )

    size = int(option('poolsize'))
    pool.create(size)
    return pool