# Source code for relstorage.adapters.locker

##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Locker implementations.
"""

from __future__ import absolute_import
from __future__ import print_function

import abc
import sys
import six


from relstorage._compat import ABC
from relstorage._util import metricmethod
from relstorage._util import consume
from relstorage._util import Lazy

from ._util import query_property as _query_property
from ._util import DatabaseHelpersMixin
from .schema import Schema

from .interfaces import UnableToAcquireCommitLockError
from .interfaces import UnableToLockRowsToModifyError
from .interfaces import UnableToLockRowsToReadCurrentError
from .interfaces import UnableToLockRowsDeadlockError

logger = __import__('logging').getLogger(__name__)

class AbstractLocker(DatabaseHelpersMixin, ABC):
    """
    Base class for database-specific lock support.

    Subclasses must implement the abstract commit-lock and pack-lock
    methods, and may override the row-locking hooks to match their
    database's syntax (e.g. ``NOWAIT`` support).
    """

    def __init__(self, options, driver, batcher_factory):
        """
        :param options: Storage options; supplies ``keep_history``,
            ``commit_lock_timeout`` and ``commit_lock_id``.
        :param driver: The database driver; supplies the exception
            classes that distinguish lock failures from bugs in our
            own SQL.
        :param batcher_factory: Callable taking a cursor and returning
            a row batcher; used for the shared readCurrent locks.
        """
        self.keep_history = options.keep_history
        self.commit_lock_timeout = options.commit_lock_timeout
        self.commit_lock_id = options.commit_lock_id
        self.driver = driver
        self.lock_exceptions = driver.lock_exceptions
        self.illegal_operation_exceptions = driver.illegal_operation_exceptions
        self.make_batcher = batcher_factory

    def on_store_opened(self, cursor, restart=False):
        """
        A callback that must be called when a store connection is
        opened or restarted.

        This implementation calls
        :meth:`_on_store_opened_set_row_lock_timeout` when the store
        connection is initially opened.
        """
        self._on_store_opened_set_row_lock_timeout(cursor, restart)

    def _on_store_opened_set_row_lock_timeout(self, cursor, restart=False):
        """
        Install a per-session row lock timeout.

        This should be set to the :attr:`commit_lock_timeout`.

        This applies to the locks taken by
        :meth:`lock_current_objects` during the ``tpc_vote`` phase of
        a transaction.
        """
        # Intentionally a no-op here; database-specific subclasses
        # override this to issue the appropriate SET statement.

    #: Set this to a false value if you don't support ``NOWAIT`` and we'll
    #: instead set the lock timeout to a low value.
    supports_row_lock_nowait = True

    def _set_row_lock_timeout(self, cursor, timeout):
        "Implement this to change the row lock timeout."

    def _set_row_lock_nowait(self, cursor):  # pragma: no cover
        """
        If :attr:`supports_row_lock_timeout` is not true, then this
        class will call this method when :meth:`hold_commit_lock` is
        called with a false value for *nowait*.

        This method in turn is implemented to call
        :meth:`_set_row_lock_timeout` with a *timeout* argument of 0.

        This method is deprecated, as the *nowait* parameter is
        deprecated.

        ..note:: This class does not attempt to revert this change,
                 leaving the connection in a 0 timeout. Previously, we
                 only set *nowait* to false during packing, and only
                 for that one connection. So it didn't matter that it
                 wasn't rolled back. Now, we don't use *nowait* at
                 all. We could automatically make this change local to
                 the transaction on PostgreSQL with ``SET LOCAL``, and
                 on MySQL with the optimizer hint ``SET_VAR()`` in the
                 lock statement --- but only in 8+.
        """
        self._set_row_lock_timeout(cursor, 0)

    # SQL clauses appended to row-selecting queries to take the
    # corresponding row locks.
    _lock_current_clause = 'FOR UPDATE'
    _lock_share_clause_nowait = 'FOR SHARE NOWAIT'
    _lock_share_clause = 'FOR SHARE'

    #: These double as the query to get OIDs we'd like to lock, but
    #: do not actually lock them.
    _get_current_objects_queries = (
        # If we also include the objects being added,
        # mysql takes out gap locks, and we can deadlock?
        # TODO: Confirm.
        ("""
        SELECT zoid
        FROM current_object
        INNER JOIN temp_store USING (zoid)
        WHERE temp_store.prev_tid <> 0
        """, 'current_object'),
        ("""
        SELECT zoid
        FROM object_state
        INNER JOIN temp_store USING (zoid)
        WHERE temp_store.prev_tid <> 0
        """, 'object_state'),
    )

    # Picks the history-preserving or history-free variant based on
    # ``self.keep_history``; yields a (query, table-name) pair.
    _get_current_objects_query = _query_property('_get_current_objects')

    @Lazy
    def _lock_current_objects_query(self):
        # The rows-being-modified query, ordered deterministically by
        # OID and suffixed with the exclusive lock clause.
        return '{select} ORDER BY ZOID {lock}'.format(
            select=self._get_current_objects_query[0],
            lock=self._lock_current_clause
        )

    @metricmethod
    def lock_current_objects(self, cursor, read_current_oid_ints,
                             shared_locks_block,
                             after_lock_share=lambda: None):
        """
        Take shared locks on the rows named by *read_current_oid_ints*
        (the ``readCurrent`` set), invoke *after_lock_share*, and then
        take exclusive locks on the rows being modified.

        :raises UnableToLockRowsToReadCurrentError: If the shared
            locks cannot be acquired.
        :raises UnableToLockRowsToModifyError: If the exclusive locks
            cannot be acquired.
        """
        # We need to be sure to take the locks in a deterministic
        # order; the easiest way to do that is to order them by OID.
        # But we have two separate sets of OIDs we need to lock: the
        # ones we're finding the current data for, and the ones that
        # we're going to check for conflicts. The ones we're checking
        # for conflicts are already in the database in ``temp_store``
        # (and partly in the storage cache's temporary storage and/or
        # the row batcher); the current oids are only in memory.
        #
        # So we have a few choices: either put the current oids into
        # a database table and do a UNION query with temp_store, or
        # pull the temp_store data into memory, union it with
        # current_oids and issue a single big query.
        #
        # Our strategy could even vary depending on the size of
        # *current_oids*; in the usual case, it will be small or
        # empty, and an in-database big UNION query is probably
        # workable (in the empty case, we can and do elide this part
        # altogether)

        # In history free mode, *table* will be `object_state`, which
        # has ZOID as its primary key. In history preserving mode,
        # *table* will be `current_object`, where ZOID is also the
        # primary key (and `object_state` is immutable; that's why we
        # don't need to take any locks there --- conflict resolution
        # will always be able to find the desired state without fear
        # of it changing).

        # MySQL 8 allows ``NOWAIT`` and ``SKIP LOCKED`` on ``FOR
        # UPDATE`` or ``FOR SHARE`` clauses; earlier versions do not
        # have that. PostgreSQL allows both.

        # In all databases, the locks we get depend on the indexing.
        # We must be searching on the primary key to get the smallest,
        # most specific row locks. In some databases, rows are only
        # locked when they are returned by the cursor, so we must
        # consume all the rows.

        # Lock rows we need shared access to, typically without blocking.
        if read_current_oid_ints:
            self._lock_readCurrent_oids_for_share(
                cursor, read_current_oid_ints, shared_locks_block)
            after_lock_share()

        # Lock the rows we need exclusive access to.
        # This will block for up to ``commit_lock_timeout``,
        # possibly * N
        self._lock_rows_being_modified(cursor)

    def _lock_suffix_for_readCurrent(self, shared_locks_block):
        # The shared-lock clause, blocking or NOWAIT as requested.
        return ' %s ' % (
            self._lock_share_clause
            if shared_locks_block
            else self._lock_share_clause_nowait
        )

    def _lock_column_name_for_readCurrent(self, shared_locks_block):
        # subclasses use the argument
        # pylint:disable=unused-argument
        return 'zoid'

    def _lock_consume_rows_for_readCurrent(self, rows, shared_locks_block):
        # subclasses use the argument
        # pylint:disable=unused-argument
        # Some databases only take the lock as rows are fetched, so
        # exhaust the iterator.
        consume(rows)

    def _lock_readCurrent_oids_for_share(self, cursor, current_oids,
                                         shared_locks_block):
        """
        Take shared row locks, in sorted OID order, on the rows in
        *current_oids*, re-raising lock failures as
        :class:`UnableToLockRowsToReadCurrentError`.
        """
        _, table = self._get_current_objects_query
        # Sorted, de-duplicated OIDs give a deterministic lock order.
        oids_to_lock = sorted(set(current_oids))
        batcher = self.make_batcher(cursor)
        locking_suffix = self._lock_suffix_for_readCurrent(shared_locks_block)
        lock_column = self._lock_column_name_for_readCurrent(shared_locks_block)
        try:
            rows = batcher.select_from(
                (lock_column,), table,
                suffix=locking_suffix,
                **{'zoid': oids_to_lock}
            )
            self._lock_consume_rows_for_readCurrent(rows, shared_locks_block)
        except self.illegal_operation_exceptions:  # pragma: no cover
            # Bug in our code
            raise
        except self.lock_exceptions:
            self.reraise_commit_lock_error(
                cursor,
                'SELECT zoid FROM {table} WHERE zoid IN () {lock}'.format(
                    table=table,
                    lock=locking_suffix
                ),
                UnableToLockRowsToReadCurrentError
            )

    def _lock_rows_being_modified(self, cursor):
        """
        Take exclusive locks on the rows being modified, re-raising
        lock failures as :class:`UnableToLockRowsToModifyError`.
        """
        stmt = self._lock_current_objects_query
        try:
            cursor.execute(stmt)
            rows = cursor
            consume(rows)
        except self.illegal_operation_exceptions:  # pragma: no cover
            # Bug in our code
            raise
        except self.lock_exceptions:
            self.reraise_commit_lock_error(
                cursor,
                stmt,
                UnableToLockRowsToModifyError
            )

    def _modify_commit_lock_kind(self, kind, exc):
        # Hook for subclasses to refine the exception class based on
        # the driver exception being handled.
        # pylint:disable=unused-argument
        return kind

    def reraise_commit_lock_error(self, cursor, lock_stmt, kind):
        """
        Convert the driver exception currently being handled into an
        exception of class *kind* (or its deadlock variant), attach
        any available debug info to the message, and re-raise it with
        the original traceback.

        Must be called from inside an ``except`` block.
        """
        v = sys.exc_info()[1]
        kind = self._modify_commit_lock_kind(kind, v)
        if self.driver.exception_is_deadlock(v):
            kind = getattr(kind, 'DEADLOCK_VARIANT', UnableToLockRowsDeadlockError)
        try:
            debug_info = self._get_commit_lock_debug_info(cursor, True)
        except Exception as nested:  # pylint:disable=broad-except
            # Collecting debug info is best-effort; never let it mask
            # the original lock failure.
            logger.exception("Failed to get lock debug info")
            debug_info = "%r(%r)" % (type(nested), nested)

        __traceback_info__ = lock_stmt, debug_info
        if debug_info:
            logger.debug("Failed to acquire commit lock:\n%s", debug_info)
        message = "Acquiring a lock during commit failed: %s%s" % (
            sys.exc_info()[1],
            '\n' + debug_info if debug_info else '(No debug info.)'
        )
        val = kind(message)
        # Preserve the driver exception for callers that inspect it.
        val.__relstorage_cause__ = v
        del v
        six.reraise(
            kind,
            val,
            sys.exc_info()[2])

    # MySQL allows aggregates in the top level to use FOR UPDATE,
    # but PostgreSQL does not, so we have to use the second form.
    #
    # 'SELECT MAX(tid) FROM transaction FOR UPDATE',
    # 'SELECT tid FROM transaction WHERE tid = (SELECT MAX(tid) FROM transaction) FOR UPDATE',

    # Note that using transaction in history-preserving databases
    # can still lead to deadlock in older versions of MySQL (test
    # checkPackWhileWriting), and the above lock statement can
    # lead to duplicate transaction ids being inserted on older
    # versions (5.7.12, PyMySQL:
    # https://ci.appveyor.com/project/jamadden/relstorage/builds/25748619/job/cyio3w54uqi026lr#L923).
    # So both HF and HP use an artificial lock row.
    #
    # TODO: Figure out exactly the best way to lock just the rows
    # in the transaction table we care about that works
    # everywhere, or a better way to choose the next TID.
    # gap/intention locks might be a clue.
    _commit_lock_query = Schema.commit_row_lock.select(
        Schema.commit_row_lock.c.tid
    ).for_update(
    ).prepared(
    )

    _commit_lock_nowait_query = _commit_lock_query.nowait()

    @metricmethod
    def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
        """
        Acquire the global commit lock by locking the artificial
        ``commit_row_lock`` row.

        :return: True on success; False only when *nowait* (deprecated)
            is true and the lock could not be taken immediately.
        :raises UnableToAcquireCommitLockError: If the lock cannot be
            acquired (and *nowait* is false).
        """
        # pylint:disable=unused-argument
        #
        # This method is not used for PostgreSQL or MySQL.
        lock_stmt = self._commit_lock_query
        if nowait:  # pragma: no cover
            if self.supports_row_lock_nowait:
                lock_stmt = self._commit_lock_nowait_query
            else:
                self._set_row_lock_nowait(cursor)

        try:
            lock_stmt.execute(cursor)
            rows = cursor.fetchall()
            if not rows or not rows[0]:
                raise UnableToAcquireCommitLockError("No row returned from commit_row_lock")
        except self.illegal_operation_exceptions:  # pragma: no cover
            # Bug in our code.
            raise
        except self.lock_exceptions:
            if nowait:
                return False
            self.reraise_commit_lock_error(
                cursor,
                lock_stmt,
                UnableToAcquireCommitLockError
            )
        return True

    def _get_commit_lock_debug_info(self, cursor, was_failure=False):
        # pylint:disable=unused-argument
        """
        Subclasses can implement this to return a string that will be
        added to the exception message when a commit lock cannot be
        acquired. For example, it might list other connections that
        have conflicting locks.
        """
        return ''

    @abc.abstractmethod
    def release_commit_lock(self, cursor):
        raise NotImplementedError()

    @abc.abstractmethod
    def hold_pack_lock(self, cursor):
        raise NotImplementedError()

    @abc.abstractmethod
    def release_pack_lock(self, cursor):
        raise NotImplementedError()