"""SteadyDB - hardened DB-API 2 connections. Implements steady connections to a database based on an arbitrary DB-API 2 compliant database interface module. The connections are transparently reopened when they are closed or the database connection has been lost or when they are used more often than an optional usage limit. Database cursors are transparently reopened as well when the execution of a database operation cannot be performed due to a lost connection. Only if the connection is lost after the execution, when rows are already fetched from the database, this will give an error and the cursor will not be reopened automatically, because there is no reliable way to recover the state of the cursor in such a situation. Connections which have been marked as being in a transaction with a begin() call will not be silently replaced either. A typical situation where database connections are lost is when the database server or an intervening firewall is shutdown and restarted for maintenance reasons. In such a case, all database connections would become unusable, even though the database service may be already available again. The "hardened" connections provided by this module will make the database connections immediately available again. This approach results in a steady database connection that can be used by PooledDB or PersistentDB to create pooled or persistent connections to a database in a threaded environment such as the application server of "Webware for Python." Note, however, that the connections themselves may not be thread-safe (depending on the used DB-API module). For the Python DB-API 2 specification, see: https://www.python.org/dev/peps/pep-0249/ For information on Webware for Python, see: https://webwareforpython.github.io/w4py/ Usage: You can use the connection constructor connect() in the same way as you would use the connection constructor of a DB-API 2 module if you specify the DB-API 2 module to be used as the first parameter, or alternatively you can specify an arbitrary constructor function returning new DB-API 2 compliant connection objects as the first parameter. Passing just a function allows implementing failover mechanisms and load balancing strategies. You may also specify a usage limit as the second parameter (set it to None if you prefer unlimited usage), an optional list of commands that may serve to prepare the session as a third parameter, the exception classes for which the failover mechanism shall be applied, and you can specify whether is is allowed to close the connection (by default this is true). When the connection to the database is lost or has been used too often, it will be transparently reset in most situations, without further notice. import pgdb # import used DB-API 2 module from dbutils.steady_db import connect db = connect(pgdb, 10000, ["set datestyle to german"], host=..., database=..., user=..., ...) ... cursor = db.cursor() ... cursor.execute('select ...') result = cursor.fetchall() ... cursor.close() ... db.close() Ideas for improvement: * Alternatively to the maximum number of uses, implement a maximum time to live for connections. * Optionally log usage and loss of connection. Copyright, credits and license: * Contributed as supplement for Webware for Python and PyGreSQL by Christoph Zwerschke in September 2005 * Allowing creator functions as first parameter as in SQLAlchemy suggested by Ezio Vernacotola in December 2006 Licensed under the MIT license. """ import sys from . import __version__ try: baseint = (int, long) except NameError: # Python 3 baseint = int class SteadyDBError(Exception): """General SteadyDB error.""" class InvalidCursor(SteadyDBError): """Database cursor is invalid.""" def connect( creator, maxusage=None, setsession=None, failures=None, ping=1, closeable=True, *args, **kwargs): """A tough version of the connection constructor of a DB-API 2 module. creator: either an arbitrary function returning new DB-API 2 compliant connection objects or a DB-API 2 compliant database module maxusage: maximum usage limit for the underlying DB-API 2 connection (number of database operations, 0 or None means unlimited usage) callproc(), execute() and executemany() count as one operation. When the limit is reached, the connection is automatically reset. setsession: an optional list of SQL commands that may serve to prepare the session, e.g. ["set datestyle to german", "set time zone mez"] failures: an optional exception class or a tuple of exception classes for which the failover mechanism shall be applied, if the default (OperationalError, InternalError) is not adequate ping: determines when the connection should be checked with ping() (0 = None = never, 1 = default = when _ping_check() is called, 2 = whenever a cursor is created, 4 = when a query is executed, 7 = always, and all other bit combinations of these values) closeable: if this is set to false, then closing the connection will be silently ignored, but by default the connection can be closed args, kwargs: the parameters that shall be passed to the creator function or the connection constructor of the DB-API 2 module """ return SteadyDBConnection( creator, maxusage, setsession, failures, ping, closeable, *args, **kwargs) class SteadyDBConnection: """A "tough" version of DB-API 2 connections.""" version = __version__ def __init__( self, creator, maxusage=None, setsession=None, failures=None, ping=1, closeable=True, *args, **kwargs): """Create a "tough" DB-API 2 connection.""" # basic initialization to make finalizer work self._con = None self._closed = True # proper initialization of the connection try: self._creator = creator.connect self._dbapi = creator except AttributeError: # try finding the DB-API 2 module via the connection creator self._creator = creator try: self._dbapi = creator.dbapi except AttributeError: try: self._dbapi = sys.modules[creator.__module__] if self._dbapi.connect != creator: raise AttributeError except (AttributeError, KeyError): self._dbapi = None try: self._threadsafety = creator.threadsafety except AttributeError: try: self._threadsafety = self._dbapi.threadsafety except AttributeError: self._threadsafety = None if not callable(self._creator): raise TypeError("%r is not a connection provider." % (creator,)) if maxusage is None: maxusage = 0 if not isinstance(maxusage, baseint): raise TypeError("'maxusage' must be an integer value.") self._maxusage = maxusage self._setsession_sql = setsession if failures is not None and not isinstance( failures, tuple) and not issubclass(failures, Exception): raise TypeError("'failures' must be a tuple of exceptions.") self._failures = failures self._ping = ping if isinstance(ping, int) else 0 self._closeable = closeable self._args, self._kwargs = args, kwargs self._store(self._create()) def __enter__(self): """Enter the runtime context for the connection object.""" return self def __exit__(self, *exc): """Exit the runtime context for the connection object. This does not close the connection, but it ends a transaction. """ if exc[0] is None and exc[1] is None and exc[2] is None: self.commit() else: self.rollback() def _create(self): """Create a new connection using the creator function.""" con = self._creator(*self._args, **self._kwargs) try: try: if self._dbapi.connect != self._creator: raise AttributeError except AttributeError: # try finding the DB-API 2 module via the connection itself try: mod = con.__module__ except AttributeError: mod = None while mod: try: self._dbapi = sys.modules[mod] if not callable(self._dbapi.connect): raise AttributeError except (AttributeError, KeyError): pass else: break i = mod.rfind('.') if i < 0: mod = None else: mod = mod[:i] else: try: mod = con.OperationalError.__module__ except AttributeError: mod = None while mod: try: self._dbapi = sys.modules[mod] if not callable(self._dbapi.connect): raise AttributeError except (AttributeError, KeyError): pass else: break i = mod.rfind('.') if i < 0: mod = None else: mod = mod[:i] else: self._dbapi = None if self._threadsafety is None: try: self._threadsafety = self._dbapi.threadsafety except AttributeError: try: self._threadsafety = con.threadsafety except AttributeError: pass if self._failures is None: try: self._failures = ( self._dbapi.OperationalError, self._dbapi.InternalError) except AttributeError: try: self._failures = ( self._creator.OperationalError, self._creator.InternalError) except AttributeError: try: self._failures = ( con.OperationalError, con.InternalError) except AttributeError: raise AttributeError( "Could not determine failure exceptions" " (please set failures or creator.dbapi).") if isinstance(self._failures, tuple): self._failure = self._failures[0] else: self._failure = self._failures self._setsession(con) except Exception as error: # the database module could not be determined # or the session could not be prepared try: # close the connection first con.close() except Exception: pass raise error # re-raise the original error again return con def _setsession(self, con=None): """Execute the SQL commands for session preparation.""" if con is None: con = self._con if self._setsession_sql: cursor = con.cursor() for sql in self._setsession_sql: cursor.execute(sql) cursor.close() def _store(self, con): """Store a database connection for subsequent use.""" self._con = con self._transaction = False self._closed = False self._usage = 0 def _close(self): """Close the tough connection. You can always close a tough connection with this method and it will not complain if you close it more than once. """ if not self._closed: try: self._con.close() except Exception: pass self._transaction = False self._closed = True def _reset(self, force=False): """Reset a tough connection. Rollback if forced or the connection was in a transaction. """ if not self._closed and (force or self._transaction): try: self.rollback() except Exception: pass def _ping_check(self, ping=1, reconnect=True): """Check whether the connection is still alive using ping(). If the the underlying connection is not active and the ping parameter is set accordingly, the connection will be recreated unless the connection is currently inside a transaction. """ if ping & self._ping: try: # if possible, ping the connection try: # pass a reconnect=False flag if this is supported alive = self._con.ping(False) except TypeError: # the reconnect flag is not supported alive = self._con.ping() except (AttributeError, IndexError, TypeError, ValueError): self._ping = 0 # ping() is not available alive = None reconnect = False except Exception: alive = False else: if alive is None: alive = True if alive: reconnect = False if reconnect and not self._transaction: try: # try to reopen the connection con = self._create() except Exception: pass else: self._close() self._store(con) alive = True return alive def dbapi(self): """Return the underlying DB-API 2 module of the connection.""" if self._dbapi is None: raise AttributeError( "Could not determine DB-API 2 module" " (please set creator.dbapi).") return self._dbapi def threadsafety(self): """Return the thread safety level of the connection.""" if self._threadsafety is None: if self._dbapi is None: raise AttributeError( "Could not determine threadsafety" " (please set creator.dbapi or creator.threadsafety).") return 0 return self._threadsafety def close(self): """Close the tough connection. You are allowed to close a tough connection by default and it will not complain if you close it more than once. You can disallow closing connections by setting the closeable parameter to something false. In this case, closing tough connections will be silently ignored. """ if self._closeable: self._close() elif self._transaction: self._reset() def begin(self, *args, **kwargs): """Indicate the beginning of a transaction. During a transaction, connections won't be transparently replaced, and all errors will be raised to the application. If the underlying driver supports this method, it will be called with the given parameters (e.g. for distributed transactions). """ self._transaction = True try: begin = self._con.begin except AttributeError: pass else: begin(*args, **kwargs) def commit(self): """Commit any pending transaction.""" self._transaction = False try: self._con.commit() except self._failures as error: # cannot commit try: # try to reopen the connection con = self._create() except Exception: pass else: self._close() self._store(con) raise error # re-raise the original error def rollback(self): """Rollback pending transaction.""" self._transaction = False try: self._con.rollback() except self._failures as error: # cannot rollback try: # try to reopen the connection con = self._create() except Exception: pass else: self._close() self._store(con) raise error # re-raise the original error def cancel(self): """Cancel a long-running transaction. If the underlying driver supports this method, it will be called. """ self._transaction = False try: cancel = self._con.cancel except AttributeError: pass else: cancel() def ping(self, *args, **kwargs): """Ping connection.""" return self._con.ping(*args, **kwargs) def _cursor(self, *args, **kwargs): """A "tough" version of the method cursor().""" # The args and kwargs are not part of the standard, # but some database modules seem to use these. transaction = self._transaction if not transaction: self._ping_check(2) try: # check whether the connection has been used too often if (self._maxusage and self._usage >= self._maxusage and not transaction): raise self._failure cursor = self._con.cursor(*args, **kwargs) # try to get a cursor except self._failures as error: # error in getting cursor try: # try to reopen the connection con = self._create() except Exception: pass else: try: # and try one more time to get a cursor cursor = con.cursor(*args, **kwargs) except Exception: pass else: self._close() self._store(con) if transaction: raise error # re-raise the original error again return cursor try: con.close() except Exception: pass if transaction: self._transaction = False raise error # re-raise the original error again return cursor def cursor(self, *args, **kwargs): """Return a new Cursor Object using the connection.""" return SteadyDBCursor(self, *args, **kwargs) def __del__(self): """Delete the steady connection.""" try: self._close() # make sure the connection is closed except Exception: pass class SteadyDBCursor: """A "tough" version of DB-API 2 cursors.""" def __init__(self, con, *args, **kwargs): """Create a "tough" DB-API 2 cursor.""" # basic initialization to make finalizer work self._cursor = None self._closed = True # proper initialization of the cursor self._con = con self._args, self._kwargs = args, kwargs self._clearsizes() try: self._cursor = con._cursor(*args, **kwargs) except AttributeError: raise TypeError("%r is not a SteadyDBConnection." % (con,)) self._closed = False def __enter__(self): """Enter the runtime context for the cursor object.""" return self def __exit__(self, *exc): """Exit the runtime context for the cursor object.""" self.close() def setinputsizes(self, sizes): """Store input sizes in case cursor needs to be reopened.""" self._inputsizes = sizes def setoutputsize(self, size, column=None): """Store output sizes in case cursor needs to be reopened.""" self._outputsizes[column] = size def _clearsizes(self): """Clear stored input and output sizes.""" self._inputsizes = [] self._outputsizes = {} def _setsizes(self, cursor=None): """Set stored input and output sizes for cursor execution.""" if cursor is None: cursor = self._cursor if self._inputsizes: cursor.setinputsizes(self._inputsizes) for column, size in self._outputsizes.items(): if column is None: cursor.setoutputsize(size) else: cursor.setoutputsize(size, column) def close(self): """Close the tough cursor. It will not complain if you close it more than once. """ if not self._closed: try: self._cursor.close() except Exception: pass self._closed = True def _get_tough_method(self, name): """Return a "tough" version of the given cursor method.""" def tough_method(*args, **kwargs): execute = name.startswith('execute') con = self._con transaction = con._transaction if not transaction: con._ping_check(4) try: # check whether the connection has been used too often if (con._maxusage and con._usage >= con._maxusage and not transaction): raise con._failure if execute: self._setsizes() method = getattr(self._cursor, name) result = method(*args, **kwargs) # try to execute if execute: self._clearsizes() except con._failures as error: # execution error if not transaction: try: cursor2 = con._cursor( *self._args, **self._kwargs) # open new cursor except Exception: pass else: try: # and try one more time to execute if execute: self._setsizes(cursor2) method = getattr(cursor2, name) result = method(*args, **kwargs) if execute: self._clearsizes() except Exception: pass else: self.close() self._cursor = cursor2 con._usage += 1 return result try: cursor2.close() except Exception: pass try: # try to reopen the connection con2 = con._create() except Exception: pass else: try: cursor2 = con2.cursor( *self._args, **self._kwargs) # open new cursor except Exception: pass else: if transaction: self.close() con._close() con._store(con2) self._cursor = cursor2 raise error # raise the original error again error2 = None try: # try one more time to execute if execute: self._setsizes(cursor2) method2 = getattr(cursor2, name) result = method2(*args, **kwargs) if execute: self._clearsizes() except error.__class__: # same execution error use2 = False error2 = error except Exception as error: # other execution errors use2 = True error2 = error else: use2 = True if use2: self.close() con._close() con._store(con2) self._cursor = cursor2 con._usage += 1 if error2: raise error2 # raise the other error return result try: cursor2.close() except Exception: pass try: con2.close() except Exception: pass if transaction: self._transaction = False raise error # re-raise the original error again else: con._usage += 1 return result return tough_method def __getattr__(self, name): """Inherit methods and attributes of underlying cursor.""" if self._cursor: if name.startswith(('execute', 'call')): # make execution methods "tough" return self._get_tough_method(name) else: return getattr(self._cursor, name) else: raise InvalidCursor def __del__(self): """Delete the steady cursor.""" try: self.close() # make sure the cursor is closed except Exception: pass