Source code for relstorage.adapters.postgresql.locker

##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Locker implementations.
"""

from __future__ import absolute_import

from zope.interface import implementer

from relstorage._util import Lazy
from ..interfaces import ILocker
from ..interfaces import UnableToLockRowsToModifyError
from ..interfaces import UnableToLockRowsToReadCurrentError
from ..interfaces import UnableToAcquirePackUndoLockError
from ..locker import AbstractLocker


[docs] @implementer(ILocker) class PostgreSQLLocker(AbstractLocker): """ On PostgreSQL, for locking individual objects, we use advisory locks, not row locks. This is because row locks perform disk I/O, which can "bloat" tables; this is especially an issue for shared (readCurrent) locks, which perform I/O to pages that might not otherwise need it. Using many row locks requires a good (auto)vacuuming setup to prevent this bloat from becoming a problem. Also, advisory locks are much faster than row locks. We take advisory locks based on OID. OIDs begin at 0 and increase. To avoid conflicting with those, any advisory locks used for "administrative" purposes, such as the pack lock, need to use negative numbers. Locks include: - -1: The commit lock - -2: The pack lock """ def _on_store_opened_set_row_lock_timeout(self, cursor, restart=False): # This only lasts beyond the current transaction if it # commits. If we have a rollback, then our effect is lost. # Luckily, right here is a fine time to commit. self._set_row_lock_timeout(cursor, self.commit_lock_timeout) cursor.connection.commit() def _set_row_lock_timeout(self, cursor, timeout): # This will rollback if the transaction rolls back, # which should restart our store. # Maybe for timeout == 0, we should do SET LOCAL, which # is guaranteed not to persist? # Recall postgresql uses milliseconds, but our configuration is given # in integer seconds. # For tests, though, we accept floating point numbers timeout_ms = int(timeout * 1000.0) self.driver.set_lock_timeout(cursor, timeout_ms) @Lazy def _lock_current_objects_query(self): # This is not used in production, only in tests that try to interleave # different types of locking. get = self._get_current_objects_query[0] stmt = get.replace('SELECT zoid', 'SELECT pg_advisory_xact_lock(zoid)') stmt += ' ORDER BY zoid' return stmt def _lock_suffix_for_readCurrent(self, shared_locks_block): # This is not used in production, only in tests that try to interleave # different types of locking. return '' def _lock_column_name_for_readCurrent(self, shared_locks_block): # This is not used in production, only in tests that try to interleave # different types of locking. return ( 'pg_advisory_xact_lock_shared(zoid)' if shared_locks_block else 'pg_try_advisory_xact_lock_shared(zoid)' ) def _lock_consume_rows_for_readCurrent(self, rows, shared_locks_block): # This is not used in production, only in tests that try to interleave # different types of locking. # If we used the blocking version, it returned void (NULL/None), # or raised an error. No need to examine the rows. if shared_locks_block: return super()._lock_consume_rows_for_readCurrent( rows, shared_locks_block) for got_lock, in rows: if not got_lock: raise UnableToLockRowsToReadCurrentError return None def release_commit_lock(self, cursor): # no action needed, locks released with transaction. pass def _modify_commit_lock_kind(self, kind, exc): # We can distinguish exclusive locks from shared locks # by the error message produced by postgresql. The exclusive lock # failures have a full query ('SELECT ... FOR UPDATE'), while # the shared locks either have 'RETURN QUERY' in their string # (when we couldn't get a lock specifically) or 'readCurrent' in their # hint (when we could get a lock, but the object changed) exc_str = str(exc).lower() if 'for update' in exc_str: kind = UnableToLockRowsToModifyError elif 'return query' in exc_str or 'readcurrent' in exc_str: kind = UnableToLockRowsToReadCurrentError return kind def _get_commit_lock_debug_info(self, cursor, was_failure=False): # When we're called, the transaction is guaranteed to be # aborted, so to be able to execute any queries we need to # rollback. Unfortunately, that would release the locks that # we're holding, which have probably(?) already have been released # anyway. The output can be excessive. if was_failure: cursor.connection.rollback() return 'Transaction failed; no lock info available' from .util import debug_locks debug_locks(cursor) return self._rows_as_pretty_string(cursor)
[docs] def hold_pack_lock(self, cursor): """Try to acquire the pack lock. Raise an exception if packing or undo is already in progress. """ cursor.execute("SELECT pg_try_advisory_lock(-2)") locked = cursor.fetchone()[0] if not locked: raise UnableToAcquirePackUndoLockError('A pack or undo operation is in progress')
[docs] def release_pack_lock(self, cursor): """Release the pack lock.""" cursor.execute("SELECT pg_advisory_unlock(-2)")