##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import absolute_import, print_function
from zope.interface import implementer
from .._util import metricmethod
from .interfaces import IConnectionManager
from .interfaces import ReplicaClosedException
from .replica import ReplicaSelector
logger = __import__('logging').getLogger(__name__)
[docs]
def connection_callback(isolation_level=None,
read_only=None,
deferrable=None,
application_name=None,
inherit=None):
"""
Decorator for functions used in :meth:`IConnectionManager.open_and_call`.
When the function is called, it will be given a connection that has
the given traits.
Use ``inherit=BaseClass.func`` when you're overriding a callback
function ``func``.
"""
if inherit is not None:
isolation_level = isolation_level or inherit.transaction_isolation_level
application_name = application_name or inherit.transaction_application_name
read_only = read_only or inherit.transaction_read_only
deferrable = deferrable or inherit.transaction_deferrable
def f(func):
func.transaction_isolation_level = isolation_level
func.transaction_read_only = read_only
func.transaction_deferrable = deferrable
func.transaction_application_name = application_name
return func
return f
def _connection_callback_open_args(connmanager, callback):
# Read the settings from `connection_callback` or defaults.
isolation = getattr(callback, 'transaction_isolation_level', None)
read_only = getattr(callback, 'transaction_read_only', None)
deferrable = getattr(callback, 'transaction_deferrable', None)
application_name = getattr(callback, 'transaction_application_name', None)
return dict( # pylint:disable=use-dict-literal
isolation=isolation or connmanager.isolation_store,
read_only=read_only or False,
deferrable=deferrable or False,
application_name=application_name or ('RS: ' + callback.__name__)
)
[docs]
@implementer(IConnectionManager)
class AbstractConnectionManager(object):
"""
Abstract base class for connection management.
Responsible for opening and closing database connections.
"""
# pylint:disable=too-many-instance-attributes
# a series of callables (cursor, restart=bool)
# for when a store connection is opened.
_on_store_opened = ()
# a series of callables (cursor,) that will be called
# when a load connection is opened
_on_load_opened = ()
# The list of exceptions to ignore on a rollback *or* close. We
# take this as the union of the driver's close exceptions and disconnected
# exceptions (drivers aren't required to organize them to overlap, but
# in practice they should.)
# TODO: Promote to public, connections.py is using this.
_ignored_exceptions = ()
# Subclasses should set these to get semantics as close
# as possible to these standard levels.
isolation_serializable = None
isolation_read_committed = None
# If these are not set by a subclass, they will be copied
# from isolation_serializable and read_committed, respectively.
isolation_load = None
isolation_store = None
replica_selector = None
ro_replica_selector = None
def __init__(self, options, driver):
"""
:param options: A :class:`relstorage.options.Options`.
:param driver: A :class:`relstorage.adapters.interfaces.IDBDriver`,
which we use for its exceptions.
"""
self.keep_history = options.keep_history
self.driver = driver
self.options = options
self._ignored_exceptions = tuple(set(
driver.close_exceptions
+ driver.disconnected_exceptions
+ (ReplicaClosedException,)
))
if options.replica_conf:
self.replica_selector = ReplicaSelector(
options.replica_conf, options.replica_timeout)
if options.ro_replica_conf:
self.ro_replica_selector = ReplicaSelector(
options.ro_replica_conf, options.replica_timeout)
else:
self.ro_replica_selector = self.replica_selector
if not self.isolation_load:
self.isolation_load = self.isolation_serializable
if not self.isolation_store:
self.isolation_store = self.isolation_read_committed
self._may_need_rollback = driver.connection_may_need_rollback
self._may_need_commit = driver.connection_may_need_commit
self._synchronize_cursor_for_rollback = driver.synchronize_cursor_for_rollback
self._do_commit = driver.commit
self._do_rollback = driver.rollback
[docs]
def add_on_store_opened(self, f):
"""
Add a callable(cursor, restart=bool) for when a store connection
is opened.
Hooks are called in the order added.
.. versionadded:: 2.1a1
"""
self._on_store_opened += (f,)
set_on_store_opened = add_on_store_opened # BWC
[docs]
def add_on_load_opened(self, f):
"""
Add a callable (cursor, restart=bool) for when a load connection is opened.
Hooks are called in the order added.
.. versionadded:: 2.1a1
"""
self._on_load_opened += (f,)
[docs]
def open(self,
isolation=None,
read_only=False,
deferrable=False,
replica_selector=None,
application_name=None,
**kwargs):
"""Open a database connection and return (conn, cursor)."""
raise NotImplementedError()
[docs]
@metricmethod
def close(self, conn=None, cursor=None):
"""
Close a connection and cursor, ignoring certain errors.
Return a True value if the connection was closed cleanly. Return
a False value if the processes ignored an error.
"""
clean = True
for obj in (cursor, conn): # cursor first; some drivers want that done
if obj is not None:
try:
obj.close()
except self._ignored_exceptions: # pylint:disable=catching-non-exception
logger.debug("Exception closing %r", obj, exc_info=1)
clean = False
return clean
def __rollback_connection(self, conn, ignored_exceptions, restarting):
"""Return True if we successfully rolled back."""
clean = True
if conn is not None:
if self._may_need_rollback(conn):
try:
self._do_rollback(conn)
except ignored_exceptions:
clean = False
elif restarting:
self._begin_for_restart(conn)
return clean
def _begin_for_restart(self, conn):
pass
def _may_need_rollback(self, conn): # pylint:disable=unused-argument,method-hidden
"""
Answer if this connection might need to be rolled back.
If a subclass can definitively say that it does *not* need to
be rolled back, because it is not in a transaction,
it can override to return false.
"""
return True
def _may_need_commit(self, conn): # pylint:disable=unused-argument,method-hidden
"""
Answer if this connection might need to be committed.
If a subclass can definitively say that it does *not* need to
be committed, because it is not in a transaction,
it can override to return false.
"""
return True
def __rollback(self, conn, cursor, quietly, restarting):
# If an error occurs, close the connection and cursor.
#
# Some drivers require the cursor to be closed before closing
# the connection.
#
# Some drivers also don't allow you to close the cursor
# without fetching all rows.
self._synchronize_cursor_for_rollback(cursor)
clean = False
try:
clean = self.__rollback_connection(
conn,
# Let it raise if we're not meant to be quiet.
self._ignored_exceptions if quietly else (),
restarting
)
except:
clean = False
raise
finally:
if not clean:
self.close(conn, cursor)
return clean
def rollback_and_close(self, conn, cursor):
clean = self.__rollback(conn, cursor, True, False)
if clean:
# if an error already occurred, we closed things.
clean = self.close(conn, cursor)
return clean
def rollback(self, conn, cursor):
return self.__rollback(conn, cursor, False, None)
def rollback_for_restart(self, conn, cursor):
return self.__rollback(conn, cursor, False, True)
def rollback_quietly(self, conn, cursor):
return self.__rollback(conn, cursor, True, None)
rollback_store_quietly = rollback_quietly
def commit(self, conn, cursor=None, force=False):
if self._may_need_commit(conn) or force:
self._do_commit(conn, cursor)
def begin(self, conn, cursor):
pass
def cursor_for_connection(self, conn):
return self.driver.cursor(conn)
[docs]
def open_and_call(self, callback):
"""
Call ``callback(connection, cursor)`` with a newly open connection and cursor.
If the function returns, commits the transaction and returns the
result returned by the function.
If the function raises an exception, aborts the transaction
then propagates the exception.
"""
conn, cursor = self._do_open_for_call(callback)
try:
try:
res = callback(conn, cursor)
except:
self.rollback_and_close(conn, cursor)
conn, cursor = None, None
raise
self.close(None, cursor)
cursor = None
self.commit(conn)
return res
finally:
self.close(conn, cursor)
def _call_hooks(self, hooks, conn, cursor,
*args, **kwargs):
try:
for hook in hooks:
hook(*args, **kwargs)
except:
self.close(conn, cursor)
raise
def _do_open_for_load(self):
raise NotImplementedError()
def open_for_load(self):
conn, cursor = self._do_open_for_load()
self._call_hooks(self._on_load_opened, conn, cursor,
cursor, restart=False)
return conn, cursor
[docs]
def restart_load(self, conn, cursor, needs_rollback=True):
"""Reinitialize a connection for loading objects."""
self.check_replica(conn, cursor,
replica_selector=self.ro_replica_selector)
if needs_rollback:
self.rollback(conn, cursor)
self._call_hooks(self._on_load_opened, conn, cursor,
cursor, restart=True)
[docs]
def check_replica(self, conn, cursor, replica_selector=None):
"""Raise an exception if the connection belongs to an old replica"""
if replica_selector is None:
replica_selector = self.replica_selector
if replica_selector is not None:
current = replica_selector.current()
if conn.replica != current:
# Prompt the change to a new replica by raising an exception.
self.close(conn, cursor)
raise ReplicaClosedException(
"Switched replica from %s to %s" % (conn.replica, current))
[docs]
def open_for_pre_pack(self):
"""Open a connection to be used for the pre-pack phase.
Returns (conn, cursor).
"""
return self.open_for_store(application_name='RS: Prepack')
def open_for_pack_lock(self):
return self.open()
def _do_open_for_store(self, **open_args):
open_args['isolation'] = self.isolation_store
open_args['read_only'] = False
open_args['deferrable'] = False
if 'application_name' not in open_args:
open_args['application_name'] = 'RS: Store'
return self.open(**open_args)
def _do_open_for_call(self, callback):
return self.open(**_connection_callback_open_args(self, callback))
def _after_opened_for_store(self, conn, cursor, restart=False):
"""
Called after a store is opened or restarted but
before hooks are called.
Subclasses may override.
"""
# pylint:disable=unused-argument
return
[docs]
def open_for_store(self, **open_args):
"""Open and initialize a connection for storing objects.
Returns (conn, cursor).
"""
conn, cursor = self._do_open_for_store(**open_args)
try:
self._after_opened_for_store(conn, cursor)
except:
self.close(conn, cursor)
raise
self._call_hooks(self._on_store_opened, conn, cursor,
cursor, restart=False)
return conn, cursor
[docs]
def restart_store(self, conn, cursor, needs_rollback=True):
"""Reuse a store connection."""
self.check_replica(conn, cursor)
if needs_rollback:
self.rollback_for_restart(conn, cursor)
self._after_opened_for_store(conn, cursor)
self._call_hooks(self._on_store_opened, conn, cursor,
cursor, restart=True)
def describe_connection(self, conn, cursor): # pylint:disable=unused-argument
return "<unknown>"