Source code for relstorage.adapters.interfaces

# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
"""Interfaces provided by RelStorage database adapters"""
from __future__ import absolute_import

from ZODB.POSException import StorageError
from ZODB.POSException import ReadConflictError
from ZODB.POSException import ConflictError
from zope.interface import Attribute
from zope.interface import Interface

# pylint:disable=inherit-non-class,no-method-argument,no-self-argument
# pylint:disable=too-many-ancestors,too-many-lines

from relstorage.interfaces import Tuple
from relstorage.interfaces import Object
from relstorage.interfaces import Bool
from relstorage.interfaces import Factory
from relstorage.interfaces import IException

# Abstractions to support multiple databases.

[docs] class IDBDialect(Interface): """ Handles converting from our internal "standard" SQL queries to something database specific. """ # TODO: Fill this in. def boolean_str(value): """ Given exactly a `bool` (`True` or `False`) return the string the database uses to represent that literal. By default, this will be "TRUE" or "FALSE", but older versions of SQLite need 1 or 0, while Oracle needs "'Y'" or "'N'". """
[docs] class IDBDriver(Interface): """ An abstraction over the information needed for RelStorage to work with an arbitrary DB-API driver. """ __name__ = Attribute("The name of this driver") disconnected_exceptions = Tuple( description=("A tuple of exceptions this driver can raise on any operation if it is " "disconnected from the database."), value_type=Factory(IException) ) close_exceptions = Tuple( description=("A tuple of exceptions that we can ignore when we try to " "close the connection to the database. Often this is the same " "or an extension of `disconnected_exceptions`." "These exceptions may also be ignored on rolling back the connection, " "if we are otherwise completely done with it and prepared to drop it. "), value_type=Factory(IException), ) lock_exceptions = Tuple( description="A tuple of exceptions", value_type=Factory(IException), ) # XXX: Document use_replica_exceptions = Tuple( description=("A tuple of exceptions raised by connecting " "that should cause us to try a replica."), value_type=Factory(IException) ) Binary = Attribute("A callable.") dialect = Object(IDBDialect, description="The IDBDialect for this driver.") cursor_arraysize = Attribute( "The value to assign to each new cursor's ``arraysize`` attribute.") supports_64bit_unsigned_id = Attribute("Can the driver handle 2**64 as a parameter?") connect = Attribute(""" A callable to create and return a new connection object. The signature is not specified here because the required parameters differ between databases and drivers. The interface should be agreed upon between the :class:`IConnectionManager` and the drivers for its database. This connection, and all objects created from it such as cursors, should be used within a single thread only. """) def cursor(connection, server_side=False): """ Create and return a new cursor sharing the state of the given *connection*. The cursor should be closed when it is no longer needed. The cursor should be considered forward-only (no backward scrolling) and ephemeral (results go away when the attached transaction is committed or rolled back). For compatibility, previous cursors should not have outstanding results pending when this is called and while the returned cursor is used (not all drivers permit multiple active cursors). If *server_side* is true (not the default), request that the driver creates a cursor that will **not** buffer the complete results of a query on the client. Instead, the results should be streamed from the server in batches. This can reduce the maximum amount of memory needed to handle results, if done carefully. For compatibility, server_side cursors can only be used to execute a single query. Most drivers (``psycopg2``, ``psycopg2cffi``, ``pg8000``, ``mysqlclient``) default to buffering the entire results client side before returning from the ``execute`` method. This can reduce latency and increase overall throughput, but at the cost of memory, especially if the results will be copied into different data structures. Not all drivers support server-side cursors; they will ignore that request. At this writing, this includes ``pg8000``. Some drivers (at this writing, only ``gevent MySQLdb``) always use server-side cursors. The ``cx_Oracle`` driver is unevaluated. ``psycopg2`` and ``psycopg2cffi`` both iterate in chunks of ``cur.itersize`` by default. PyMySQL seems to iterate one row at a time. ``mysqlclient`` defaults to also iterating one row at a time, but we patch that to operate in chunks of ``cur.arraysize``. """ def binary_column_as_state_type(db_column_data): """ Turn *db_column_data* into something that's a valid pickle state. Valid pickle states should be acceptable to `io.BytesIO` and `pickle.UnPickler`. *db_column_dat* came from a column of data declared to be of the type that we store state information in (e.g., a BLOB on MySQL or Oracle). """ def binary_column_as_bytes(db_column_data): """ Turn *db_column_data* into a `bytes` object. Use this when the specific type must be known, for example to prefix or suffix additional byte values like that produced by `p64`. """ def enter_critical_phase_until_transaction_end(connection, cursor): """ Given a connection and cursor opened by this driver, cause it to attempt to raise its priority and return results faster. This mostly has meaning for gevent drivers, which may limit the amount of time they spend in the hub and the number of context switches to other greenlets. This phase continues until *after* the ultimate call that commits or aborts is sent, but should revert to normal as quickly as possible after that. :class:`IRelStorageAdapter` may cooperate with the driver using implementation-specific methods to end the phase at an appropriate time if there is a hidden commit. This method must be idempotent (have the same effect if called more than once) within a given transaction. """ def is_in_critical_phase(connection, cursor): """ Answer whether :meth:`enter_critical_phase_until_transaction_end` is in effect. """ def exit_critical_phase(connection, cursor): "If currently in a critical phase, de-escalate." def exception_is_deadlock(exc): """ Given an exception object, return True if it represents a deadlock in the database. The exception need not be an exception produced by the driver. """
[docs] class IDBDriverSupportsCritical(IDBDriver): """ A marker for database drivers that support critical phases. They promise that :meth:`enter_critical_phase_until_transaction_end` will do something useful. """
[docs] class IDBDriverFactory(Interface): """ Information about, and a way to get, an `IDBDriver` implementation. """ driver_name = Attribute("The name of this driver produced by this factory.") def check_availability(): """ Return a boolean indicating whether a call to this factory will return a driver (True) or will raise an error (False). """ def __call__(): # pylint:disable=signature-differs """ Return a new `IDBDriver` as represented by this factory. If it is not possible to do this, for example because the module cannot be imported, raise an `DriverNotAvailableError`. """
[docs] class DriverNotAvailableError(Exception): """ Raised when a requested driver isn't available. """ #: The name of the requested driver driver_name = None #: The `IDBDriverOptions` that was asked for the driver. driver_options = None #: The underlying reason string, for example, from an import error #: if such is available. This can be an arbitrary object; if it #: is not None, its ``str()`` value is included in our own. reason = None def __init__(self, driver_name, driver_options=None, reason=None): super().__init__(driver_name) self.driver_name = driver_name self.driver_options = driver_options self.reason = reason def _format_drivers(self): driver_factories = getattr(self.driver_options, 'known_driver_factories', lambda: ())() driver_factories = list(driver_factories) if not driver_factories: return '' formatted = ' '.join( '%r (Module: %r; Available: %s)' % ( factory.driver_name, # This attribute isn't in the interface, # it's an extension from AbstractModuleDriver getattr(factory, 'MODULE_NAME', '<unknown>'), factory.check_availability() ) for factory in driver_factories ) return ' Options: %s.' % (formatted,) def __str__(self): if self.reason is not None: reason = ' (reason=%s)' % (self.reason,) else: reason = '' return '%s: Driver %r is not available%s.%s' % ( type(self).__name__, self.driver_name, reason, self._format_drivers() ) __repr__ = __str__
[docs] class UnknownDriverError(DriverNotAvailableError): """ Raised when a driver that isn't registered at all is requested. """
[docs] class NoDriversAvailableError(DriverNotAvailableError): """ Raised when there are no drivers available. """ def __init__(self, driver_name='auto', driver_options=None, reason=None): super().__init__(driver_name, driver_options, reason)
[docs] class IDBDriverOptions(Interface): """ Implemented by a module to provide alternative drivers. """ database_type = Attribute("A string naming the type of database. Informational only.") def select_driver(driver_name=None): """ Choose and return an `IDBDriver`. The *driver_name* of "auto" is equivalent to a *driver_name* of `None` and means to choose the highest priority available driver. """ def known_driver_factories(): """ Return an iterable of the potential `IDBDriverFactory` objects that can be used by `select_driver`. Each driver factory may or may not be available. The driver factories are returned in priority order, with the highest priority driver being first. """
### # Creating and managing DB-API 2.0 connections. # ( ###
[docs] class IConnectionManager(Interface): """ Open and close database connections. This is a low-level interface; most operations should instead use a pre-existing :class:`IManagedDBConnection`. """ isolation_load = Attribute("Default load isolation level.") isolation_store = Attribute("Default store isolation level.") isolation_read_committed = Attribute("Read committed.") isolation_serializable = Attribute("Serializable.") def open( isolation=None, deferrable=False, read_only=False, replica_selector=None, application_name=None, **kwargs): """Open a database connection and return (conn, cursor).""" def close(conn=None, cursor=None): """ Close a connection and cursor, ignoring certain errors. Return a True value if the connection was closed cleanly; return a false value if an error was ignored. """ def rollback_and_close(conn, cursor): """ Rollback the connection and close it, ignoring certain errors. Certain database drivers, such as MySQLdb using the SSCursor, require all cursors to be closed before rolling back (otherwise it generates a ProgrammingError: 2014 "Commands out of sync"). This method abstracts that. :return: A true value if the connection was closed without ignoring any exceptions; if an exception was ignored, returns a false value. """ def rollback(conn, cursor): """ Like `rollback_and_close`, but without the close, and letting errors pass. If an error does happen, then the connection and cursor are closed before this method returns. """ def rollback_quietly(conn, cursor): """ Like `rollback_and_close`, but without the close. :return: A true value if the connection was rolled back without ignoring any exceptions; if an exception was ignored, returns a false value (and the connection and cursor are closed before this method returns). """ def begin(conn, cursor): """ Call this on a store connection after restarting it. This lets the store connection know that it may need to begin a transaction, even if it was freshly opened. """ def open_and_call(callback): """Call a function with an open connection and cursor. If the function returns, commits the transaction and returns the result returned by the function. If the function raises an exception, aborts the transaction then propagates the exception. """ def open_for_load(): """ Open a connection for loading objects. This connection is read only, and presents a consistent view of the database as of the time the first statement is executed. It should be opened in ``REPEATABLE READ`` or higher isolation level. It must not be in autocommit. :return: ``(conn, cursor)`` """ def restart_load(conn, cursor, needs_rollback=True): """ Reinitialize a connection for loading objects. This gets called when polling the database, so it needs to be quick. Raise one of self.disconnected_exceptions if the database has disconnected. """ def open_for_store(**open_args): """ Open and initialize a connection for storing objects. This connection is read/write, and its view of the database needs to be consistent for each statement, but should read a fresh snapshot on each statement for purposes of conflict resolution and cooperation with other store connections. It should be opened in ``READ COMMITTED`` isolation level, without autocommit. (Opening in ``REPEATABLE READ`` or higher, with a single snapshot, could reduce the use of locks, but increases the risk of serialization errors and having transactions rollback; we could handle that by raising ``ConflictError`` and letting the application retry, but only if we did that before ``tpc_finish``, and not all test cases can handle that either.) This connection will take locks on rows in the state tables, and hold them during the commit process. A connection opened by this method is the only type of connection that can hold the commit lock. :return: ``(conn, cursor)`` """ def restart_store(conn, cursor, needs_rollback=True): """ Rollback and reuse a store connection. Raise one of self.disconnected_exceptions if the database has disconnected. You can set *needs_rollback* to false if you're certain the connection does not need rolled back. """ def open_for_pre_pack(): """ Open a connection to be used for the pre-pack phase. This connection will make many different queries; each one must be consistent unto itself, but they do not all have to be consistent with each other. This is because the *first* query this object makes establishes a base state, and we will manually discard later changes seen in future queries. It will read from the state tables and write to the pack tables; it will not write to the state tables, nor hold the commit lock. It may hold share locks on state rows temporarily. This connection may be open for a long period of time, and will be committed as appropriate between queries. It is acceptable for this connection to be in autocommit mode, if required, but it is preferred for it not to be. This should be opened in ``READ COMMITTED`` isolation level. :return: ``(conn, cursor)`` """ def open_for_pack_lock(): """ Open a connection to be used for the sole purpose of holding the pack lock. Use a private connection (lock_conn and lock_cursor) to hold the pack lock. Have the adapter open temporary connections to do the actual work, allowing the adapter to use special transaction modes for packing, and to commit at will without losing the lock. If the database doesn't actually use a pack lock, this may return ``(None, None)``. """ def cursor_for_connection(conn): """ If the cursor returned by an open method was discarded for state management purposes, use this to get a new cursor. """ def add_on_store_opened(f): """ Add a callable(cursor, restart=bool) for when a store connection is opened. .. versionadded:: 2.1a1 """ def add_on_load_opened(f): """ Add a callable (cursor, restart=bool) for when a load connection is opened. .. versionadded:: 2.1a1 """ def describe_connection(conn, cursor): """ Return an object that describes the connection. The object should have a useful `str` value. .. versionadded:: 3.4.3
[docs] class IManagedDBConnection(Interface): """ A managed DB connection consists of a DB-API ``connection`` object and a single DB-API ``cursor`` from that connection. This encapsulates proper use of ``IConnectionManager``, including handling disconnections and re-connecting at appropriate times. It is not allowed to use multiple cursors from a connection at the same time; not all drivers properly support that. If the DB-API connection is not open, presumed to be good, and previously accessed, this object has a false value. "Restarting" a connection means to bring it to a current view of the database. Typically this means a rollback so that a new transaction can begin with a new MVCC snapshot. """ cursor = Attribute("The DB-API cursor to use. Read-only.") connection = Attribute("The DB-API connection to use. Read-only.") def __bool__(): """ Return true if the database connection is believed to be ready to use. """ def drop(): """ Unconditionally drop (close) the database connection. """ def rollback_quietly(): """ Rollback the connection and return a true value on success. When this completes, the connection will be in a neutral state, not idle in a transaction. If an error occurs during rollback, the connection is dropped and a false value is returned. """ def isolated_connection(): """ Context manager that opens a new, distinct connection and returns its cursor. No matter what happens in the ``with`` block, the connection will be dropped afterwards. """ def restart_and_call(f, *args, **kw): """ Restart the connection (roll it back) and call a function after doing this. This may drop and re-connect the connection if necessary. :param callable f: The function to call: ``f(conn, cursor, *args, **kwargs)``. May be called up to twice if it raises a disconnected exception on the first try. :return: The return value of ``f``. """ def enter_critical_phase_until_transaction_end(): """ As for :meth:`IDBDriver.enter_critical_phase_until_transaction_end`.
[docs] class IManagedLoadConnection(IManagedDBConnection): """ A managed connection intended for loading.
[docs] class IManagedStoreConnection(IManagedDBConnection): """ A managed connection intended for storing data.
[docs] class IReplicaSelector(Interface): """Selects a database replica""" def current(): """Get the current replica. Return a string. For PostgreSQL and MySQL, the string is either a host:port specification or host name. For Oracle, the string is a DSN. """ def next(): """Return the next replica to try. Return None if there are no more replicas defined.
[docs] class IDatabaseIterator(Interface): """Iterate over the available data in the database""" def iter_objects(cursor, tid): """Iterate over object states in a transaction. Yields (oid, prev_tid, state) for each object state. """ def iter_transactions(cursor): """ Iterate over the transaction log, newest first. Skips packed transactions. Yields (tid, username, description, extension) for each transaction. """ def iter_transactions_range(cursor, start=None, stop=None): """ Return an indexable object over the transactions in the given range, oldest first. Includes packed transactions. Has an object with the properties ``tid_int``, ``username`` (bytes) ``description`` (bytes) ``extension`` (bytes) and ``packed`` (boolean) for each transaction. """ def iter_object_history(cursor, oid): """ Iterate over an object's history. Yields an object with the properties ``tid_int``, ``username`` (bytes) ``description`` (bytes) ``extension`` (bytes) and ``pickle_size`` (int) for each transaction. :raises KeyError: if the object does not exist """ def iter_current_records(cursor, start_oid_int=0): """ Cause the *cursor* (which should be a server-side cursor) to execute a query that will iterate over ``(oid_int, tid_int, state_bytes)`` values for all the current objects. Each current object is returned only once, at the transaction most recently committed for it. Returns a generator. For compatibility with FileStorage, this must iterate in ascending OID order; it must also accept an OID to begin with for compatibility with
[docs] """ class ILocker(Interface): """Acquire and release the commit and pack locks.""" def lock_current_objects(cursor, read_current_oid_ints, shared_locks_block): """ Lock the objects being modified in the current transaction exclusively, plus the relevant rows for the objects whose OIDs are contained in *read_current_oid_ints* with a read lock. The exclusive locks should always be taken in a blocking fashion; the shared read locks should be taken without blocking (raising an exception if blocking would occur) if possible, unless *shared_locks_block* is set to True. See :meth:`IRelStorageAdapter.lock_objects_and_detect_conflicts` for a description of the expected behaviour. This should be done as part of the voting phase of TPC, before taking out the final commit lock. Returns nothing. Typically this will be followed by a call to :meth:`detect_conflict`. """ def hold_commit_lock(cursor, ensure_current=True, nowait=False): """ Acquire the commit lock. If *ensure_current* is True (the default), other tables may be locked as well, to ensure the most current data is available. When using row level locks, *ensure_current* is always implicit. With *nowait* set to True, only try to obtain the lock without waiting and return a boolean indicating if the lock was successful. **Note:** this parameter is deprecated and will be removed in the future; it is not currently used. Should raise `UnableToAcquireCommitLockError` if the lock can not be acquired before a configured timeout. """ def release_commit_lock(cursor): """Release the commit lock""" def hold_pack_lock(cursor): """Try to acquire the pack lock. Raise UnableToAcquirePackUndoLockError if packing or undo is already in progress. """
def release_pack_lock(cursor): """Release the pack lock."""
[docs] class IObjectMover(Interface): """Move object states to/from the database and within the database.""" def load_current(cursor, oid): """ Returns the current state and integer tid for an object. *oid* is an integer. Returns (None, None) if object does not exist. """ def load_currents(cursor, oids): """ Returns the oid integer, state, and integer tid for all the specified objects. *oids* is an iterable of integers. If any objects do no exist, they are ignored. """ def load_revision(cursor, oid, tid): """Returns the state for an object on a particular transaction. Returns None if no such state exists. """ def exists(cursor, oid): """Returns a true value if the given object exists.""" def load_before(cursor, oid, tid): """Returns the state and tid of an object before transaction tid. Returns (None, None) if no earlier state exists. """ def get_object_tid_after(cursor, oid, tid): """Returns the tid of the next change after an object revision. Returns None if no later state exists. """ def current_object_tids(cursor, oids, timeout=None): """ Returns the current ``{oid_int: tid_int}`` for specified object ids. Note that this may be a BTree mapping, not a dictionary. :param oids: An iterable of OID integers. :keyword float timeout: If not None, this is an approximate upper bound (in seconds) on how long this function will run. :raises AggregateOperationTimeoutError: If the timeout was exceeded. This will have one extra attribute set, ``partial_result``, which will be a (partial) mapping of the results collected before the timeout. """ def on_store_opened(cursor, restart=False): """Create the temporary table for storing objects. This method may be None, meaning no store connection initialization is required. """ def make_batcher(cursor, row_limit): """Return an object to be used for batch store operations. *row_limit* is the maximum number of rows to queue before calling the database. """ def store_temps(cursor, state_oid_tid_iter): """ Store many objects in the temporary table. *batcher* is an object returned by :meth:`make_batcher`. *state_oid_tid_iter* is an iterable providing tuples ``(data, oid_int, prev_tid_int)``. It is guaranteed that the ``oid_int`` values will be distinct. It is further guaranteed that this method will not be called more than once in a given transaction; further updates to the temporary table will be made using ``replace_temps``, which is also only called once. """ def restore(cursor, batcher, oid, tid, data): """Store an object directly, without conflict detection. Used for copying transactions into this database. batcher is an object returned by self.make_batcher(). """ def detect_conflict(cursor): """ Find all conflicts in the data about to be committed (as stored by :meth:`store_temps`) Returns a sequence of ``(oid, committed_tid, attempted_committed_tid, committed_state)`` where each entry refers to a conflicting object. The *committed_state* **must** be returned. This method should be called during the ``tpc_vote`` phase of a transaction, with :meth:`ILocker.lock_current_objects` held. """ def replace_temps(cursor, state_oid_tid_iter): """ Replace all objects in the temporary table with new data from *state_oid_tid_iter*. This happens after conflict resolution. The param is as for ``store_temps``. Implementations should try to perform this in as few database operations as possible. """ def move_from_temp(cursor, tid, txn_has_blobs): """ Move the temporarily stored objects to permanent storage. *tid* is the integer tid of the transaction being committed. Returns nothing. The steps should be as follows: - If we are preserving history, then ``INSERT`` into ``object_state`` the values stored in ``temp_store``, remembering to coalesce the ``LENGTH(temp_store.state)``. - Otherwise, when we are not preserving history, ``INSERT`` missing rows from ``object_state`` into ``temp_store``, and ``UPDATE`` rows that were already there. (This is best done with an upsert). If blobs are involved, then ``DELETE`` from ``blob_chunk`` where the OID is in ``temp_store``. - For both types of storage, ``INSERT`` into ``blob_chunk`` the values from ``temp_blob_chunk``. In a history-free storage, this may be combined with the last step in an ``UPSERT``. """ def update_current(cursor, tid): """ Update the current object pointers. *tid* is the integer tid of the transaction being committed. Returns nothing. This does nothing when the storage is history free. When the storage preserves history, all the objects in ``object_state`` having the given *tid* should have their (oid, *tid*) stored into ``current_object``. This can be done with a single upsert. XXX: Why do we need to look at ``object_state``? Is there a reason we can't look at the smaller ``temp_store``? Conflict resolution maybe? """ def download_blob(cursor, oid, tid, filename): """Download a blob into a file. Returns the size of the blob file in bytes. """ def upload_blob(cursor, oid, tid, filename): """Upload a blob from a file.
If tid is None, upload to the temporary table. """
[docs] class IOIDAllocator(Interface): """ Allocate OIDs and control future allocation. The cursor passed here must be from a :meth:`store connection <IConnectionManager.open_for_store>`. """ def new_oids(cursor): """ Return a new :class:`list` of new, unused integer OIDs. The list should be contiguous and must be in sorted order from highest to lowest. It must never contain 0. """ def set_min_oid(cursor, oid_int): """ Ensure the next OID (the rightmost value from :meth:`new_oids`) is greater than the given *oid_int*. """ def reset_oid(cursor): """
Cause the sequence of OIDs to begin again from the beginning. """
[docs] class IPackUndo(Interface): """Perform pack and undo operations""" MAX_TID = Attribute("The maximum TID that can be stored.") def verify_undoable(cursor, undo_tid): """Raise UndoError if it is not safe to undo the specified txn. """ def undo(cursor, undo_tid, self_tid): """Undo a transaction. Parameters: "undo_tid", the integer tid of the transaction to undo, and "self_tid", the integer tid of the current transaction. Returns the states copied forward by the undo operation as a list of (oid, old_tid). May raise UndoError. """ def fill_object_refs(conn, cursor, get_references): """Update the object_refs table by analyzing new transactions. """ def choose_pack_transaction(pack_point): """Return the transaction before or at the specified pack time. Returns None if there is nothing to pack. """ def pre_pack(pack_tid, get_references): """Decide what to pack. pack_tid specifies the most recent transaction to pack. get_references is a function that accepts a stored object state and returns a set of OIDs that state refers to. """ def pack(pack_tid, packed_func=None): """Pack. Requires the information provided by pre_pack. packed_func, if provided, will be called for every object state packed, just after the object is removed. The function must accept two parameters, oid and tid (64 bit integers). """ def deleteObject(cursor, oid_int, tid_int): """ Delete the revision of *oid_int* in transaction *tid_int*. This method marks an object as deleted via a new object revision. Subsequent attempts to load current data for the object will fail with a POSKeyError, but loads for non-current data will suceed if there are previous non-delete records. The object will be removed from the storage when all not-delete records are removed. The serial argument must match the most recently committed serial for the object. This is a seat belt. --- Documentation for ``IExternalGC`` In history-free databases there is no such thing as a delete record, so this should remove the single revision of *oid_int* (which *should* be checked to verify it is at *tid_int*), leading all access to *oid_int* in the future to throw ``POSKeyError``. In history preserving databases, this means that a new revision of the object with a NULL state is created when the transaction is committed. The NULL state signifies that it's been deleted. A subsequent
pack operation is required to actually remove these deleted items.
[docs] """ class IPoller(Interface): """Poll for new data""" def get_current_tid(cursor): """ Returns the highest transaction ID visible to the cursor. If there are no transactions, returns 0. """ def poll_invalidations(conn, cursor, prev_polled_tid): """ Polls for new transactions. *conn* and *cursor* must have been created previously by ``open_for_load()`` (a snapshot connection). *prev_polled_tid* is the tid returned at the last poll, or None if this is the first poll. If the database has disconnected, this method should raise one of the exceptions listed in the disconnected_exceptions attribute of the associated IConnectionManager. Returns ``(changes, new_polled_tid)``, where changes is either an iterable of (oid, tid) that have changed, or None to indicate that the changes are too complex to list; this must cause local storage caches to be invalidated. ``new_polled_tid`` is never None. Important: You must consume the changes iterable, and you must not make any other queries until you do. This method may raise :class:`StaleConnectionError` (a ``ReadConflictError``) if the database has reverted to an earlier transaction, which can happen in an asynchronously replicated database. This exception is one that is transient and most transaction middleware will catch it and retry the
[docs] """ class ISchemaInstaller(Interface): """Install the schema in the database, clear it, or uninstall it""" def prepare(): """ Create the database schema if it does not already exist. Perform any migration steps needed, and call :meth:`verify` before returning. """ def verify(): """ Ensure that the schema that's installed can be used by this RelStorage. If it cannot, for example it's history-preserving and we were configured to be history-free, raise an exception. """ def zap_all(): """Clear all data out of the database."""
def drop_all(): """Drop all tables and sequences."""
[docs] class IScriptRunner(Interface): """Run database-agnostic SQL scripts. Using an IScriptRunner is appropriate for batch operations and uncommon operations that can be slow, but is not appropriate for performance-critical code. """ script_vars = Attribute( """A mapping providing replacements for parts of scripts. Used for making scripts compatible with databases using different parameter styles. """) def run_script_stmt(cursor, generic_stmt, generic_params=()): """Execute a statement from a script with the given parameters. generic_params should be either an empty tuple (no parameters) or a map. The input statement is generic and will be transformed into a database-specific statement. """ def run_script(cursor, script, params=()): """Execute a series of statements in the database. params should be either an empty tuple (no parameters) or a map. The statements are transformed by run_script_stmt before execution. """ def run_many(cursor, stmt, items): """Execute a statement repeatedly. Items should be a list of tuples.
stmt should use '%s' parameter format (not %(name)s). """ # Note: the Oracle implementation also provides run_lob_stmt, which # is useful for reading LOBs from the database quickly.
[docs] class ITransactionControl(Interface): """Begin, commit, and abort transactions.""" def get_tid(cursor): """Returns the most recent tid.""" def add_transaction(cursor, tid, username, description, extension, packed=False): """Add a transaction.""" def delete_transaction(cursor, tid): """Remove a transaction.""" def commit_phase1(store_connection, tid): """ Begin a commit. Returns the transaction name. The transaction name must not be None. This method should guarantee that :meth:`commit_phase2` will succeed, meaning that if commit_phase2() would raise any error, the error should be raised in :meth:`commit_phase1` instead. :param store_connection: An :class:`IManagedStoreConnection` """ def commit_phase2(store_connection, txn, load_connection): """ Final transaction commit. *txn* is the name returned by :meth:`commit_phase1`. :param store_connection: An :class:`IManagedStoreConnection` This is what must be committed. :param load_connection: An :class:`IManagedLoadConnection` corresponding to the store connection. If helpful to the database, (for example, for resource reasons) implementations may rollback the connection immediately before committing the store connection. """ def abort(store_connection, txn=None): """ Abort the commit, ignoring certain exceptions. If *txn* is not None, phase 1 is also aborted. :param store_connection: An :class:`IManagedStoreConnection` :return: A true value if the connection was rolled back without ignoring any exceptions; if an exception was ignored, returns a false value (and the connection
and cursor are closed before this method returns).
[docs] """ class IDBStats(Interface): """ Collecting, viewing and updating database information. """ def get_object_count(): """Returns the approximate number of objects in the database""" def get_db_size(): """Returns the approximate size of the database in bytes""" def large_database_change(): """ Call this when the database has changed substantially, and it would be a good time to perform any updates or
optimizations. """
[docs] class IRelStorageAdapter(Interface): """ A database adapter for RelStorage. Historically, this has just been a holding place for other components for the particular database. However, it is moving to holding algorithms involved in the storage; this facilitates moving chunks of functionality into database stored procedures as appropriate. The basic algorithms are implemented in :class:`.adapter.AbstractAdapter`. """ driver = Object(IDBDriver) connmanager = Object(IConnectionManager) dbiter = Object(IDatabaseIterator) keep_history = Bool(description="True if this adapter supports undo") locker = Object(ILocker) mover = Object(IObjectMover) oidallocator = Object(IOIDAllocator) packundo = Object(IPackUndo) poller = Object(IPoller) runner = Object(IScriptRunner) schema = Object(ISchemaInstaller) stats = Object(IDBStats) txncontrol = Object(ITransactionControl) def new_instance(): """ Return an instance for use by another RelStorage instance. Adapters that are stateless can simply return self. Adapters that have mutable state must make a clone and return it. """ def release(): """ Release the resources held uniquely by this instance. """ def close(): """ Release the resources held by this instance and all child instances. """ def __str__(): """Return a short description of the adapter""" def lock_database_and_choose_next_tid(cursor, username, description, extension): """ Lock the database with the commit lock and allocate the next tid. In a simple implementation, this will first obtain the commit lock with around :meth:`ILocker.hold_commit_lock`. Then it will query the current most recently committed TID with :meth:`ITransactionControl.get_tid`. It will choose the next TID based on that value and the current timestamp, and then it will write that value to the database with :meth:`ITransactionControl.add_transaction`. The *username*, *description* and *extension* paramaters are as for ``add_transaction. :return: The new TID integer. """ def lock_database_and_move( store_connection, load_connection, transaction_has_blobs, ude, commit=True, committing_tid_int=None, after_selecting_tid=None ): """ Lock the database, choose the next TID, and move temporary data into its final place. This is used in two modes. In the usual case, *commit* will be true, and this method is called to implement the final step of ``tpc_finish``. In that case, this method is responsible for committing the transaction (using the *store_connection* provided). When *commit* is false, this method is effectively part of ``tpc_vote`` and must **not** commit. The *blobhelper* is the :class:`IBlobHelper`. This method is responsible for moving blob data into place if the blob data is stored on the server and there are blobs in this transaction. (Implementations that use stored procedures will probably not need this argument; it is here to be able to provide ``txn_has_blobs`` to :meth:`IObjectMover.move_from_temp`.) *ude* is a tuple of ``(username, description, extension)``. If *committing_tid_int* is None, then this method must lock the database and choose the next TID as if by calling :meth:`lock_database_and_choose_next_tid` (passing in the expanded *ude*); if it is **not** None, the database has already been locked and the TID selected. *after_selecting_tid* is a function of one argument, the committing integer TID. If it is proveded, it must be called once the TID has been selected and temporary data moved into place. Implementations are encouraged to do all of this work in as few calls to the database as possible with a stored procedure (ideally one call). The default implementation will use :meth:`lock_database_and_choose_next_tid`, :meth:`IObjectMover.move_from_temp`, :meth:`IObjectMover.update_current` and :meth:`ITransactionControl.commit_phase1` and :meth:`ITransactionControl.commit_phase2`. When committing, implementations are encouraged to exit any :meth:`critical phase <IDBDriver.enter_critical_phase_until_transaction_end>` in the most timely manner possible after ensuring that the commit request has been sent, especially if only one communication with the database is required that may block for an arbitrary time to get the lock. :param load_connection: The load connection corresponding to the store connection. When *commit* is true, this **may** be rolled back immediately before actually committing the *store_connection*, if that assists the database (for example, with resource management). If *commit* is not true, the load connection must not be changed. Rolling back the load connection is optional. Be careful to use the *store_connection* for all operations requiring a current view of the database or any writes. The load connection has a historical view and is not writable. :return: A tuple ``(committing_tid_int, prepared_txn_id)``; the *prepared_txn_id* is irrelevant if *commit* was true. """ def lock_objects_and_detect_conflicts( cursor, read_current_oids, ): """ Without taking the commit lock, lock the objects this transaction wants to modify (for exclusive update) and the objects in *read_current_oids* (for shared read). Returns an iterable of ``(oid_int, committed_tid_int, tid_this_txn_saw_int, committed_state)`` for current objects that were locked, plus objects which had conflicts. Implementations are encouraged to do all this work in as few calls to the database as possible with a stored procedure. The default implementation will use :meth:`ILocker.lock_current_objects`, :meth:`IObjectMover.current_object_tids`, and :meth:`IObjectMover.detect_conflicts`. This method may raise the same lock exceptions and :meth:`ILocker.lock_current_objects`. In particular, it should take care to distinguish between a failure to acquire an update lock and a failure to acquire a read lock by raising the appropriate exceptions (:class:`UnableToLockRowsToModifyError` and :class:`UnableToLockRowsToReadCurrentError`, respectively). Because two separate classes of locks are to be obtained, implementations will typically need to make two separate locking queries. If the second of those queries fails, the implementation is encouraged to immediately release locks taken by the first operation before raising an exception. Ideally this happens in the database stored procedure. (PostgreSQL works this way automatically --- any error rolls the transaction back and releases locks --- but this takes work on MySQL except in the case of deadlock.) .. rubric:: Read Current Locks The *read_current_oids* maps OID integers to expected TID integers. Each such object must be read share locked for the duration of the transaction so we can ensure that the final commit occurs without those objects having been changed. From ReadVerifyingStorage's ``checkCurrentSerialInTransaction``: "If no [ReadConflictError] exception is raised, then the serial must remain current through the end of the transaction." Applications sometimes (often) perform readCurrent() on the *wrong* object (for example: the ``BTree`` object or the ``zope.container`` container object, when what is really required, what will actually be modified, is a ``BTree`` bucket---very hard to predict), so very often these objects will not ever be modified. (Indeed, ``BTree.__setitem__`` and ``__delitem__`` perform readCurrent on the BTree itself; but once the first bucket has been established, the BTree will not usually be modified.) A share lock is enough to prevent any modifications without causing unnecessary blocking if the object would never be modified. In the results, if the ``tid_this_txn_saw_int`` is ``None``, that was an object we only read, and share locked. If the ``committed_tid_int`` does not match the TID we expected to get, then the caller will raise a ``ReadConflictError`` and abort the transaction. The implementation is encouraged to return all such rows *first* so that these inexpensive checks can be accomplished before the more expensive conflict resolution process. Optionally, if this method can detect a read current violation based on the data in *read_current_oids* at the database level, it may raise a :class:`ReadConflictError`. This method is also allowed and encouraged to *only* return read current violations; further, it need only return the first such violation (because an exception will immediately be raised.) Implementations are encouraged to use the database ``NOWAIT`` feature (or equivalent) to take read current locks. If such an object is already locked exclusively, that means it is being modified and this transaction is racing the modification transaction. Taking the lock with ``NOWAIT`` and raising an error lets the write transaction proceed, while this one rolls back and retries. (Hopefully the write transaction finishes quickly, or several retries may be needed.) .. rubric:: Modified Objects and Conflicts All objects that this transaction intends to modify (which are in the ``temp_store`` table) must be exclusively write locked when this method returns. The remainder of the results (where ``tid_this_txn_saw_int`` is not ``None``) give objects that we have detected a conflict (a modification that has committed earlier to an object this transaction also wants to modify). As an optimization for conflict resolution, ``committed_state`` may give the current committed state of the object (corresponding to ``committed_tid_int``), but is allowed to be ``None`` if there isn't an efficient way to query that in bulk from the database. .. rubric:: Deadlocks, and Shared vs Exclusive Locks It might seem that, because no method of a transaction (*except* for ``restore()``) writes directly to the ``object_state`` or ``current_object`` table *before* acquiring the commit lock, a share lock is enough even for objects we're definitely going to modify. That way leads to deadlock, however. Consider this order of operations: 1. Tx a: LOCK OBJECT 1 FOR SHARE. (Tx a will modify this.) 2. Tx b: LOCK OBJECT 1 FOR SHARE. (Tx b is just readCurrent.) 3. Tx a: Obtain commit lock. 4. Tx b: attempt to obtain commit lock; block. 5. Tx a: UPDATE OBJECT 1; attempt to escalate shared lock to exclusive lock. --> DEADLOCK with the shared lock from step 2. Tx a needs to raise the lock of object 1, but Tx b's share lock is preventing it. Meanwhile, Tx b wants the commit lock, but Tx a is holding it. If Tx a took an exclusive lock, it would either block Tx b from getting a share lock, or be blocked by Tx b's share lock; either way, whichever one got to the commit lock would be able to complete. Further, it is trivial to show that when using two lock classes, two transactions that have overlapping sets of objects (e.g., a wants shared on ``(1, 3, 5, 7)`` and exclusive on ``(2, 4, 6, 8)`` and b wants shared on ``(2, 4, 6, 8)`` and exclusive on ``(3, 5, 7)``), can easily deadlock *before* taking the commit lock, no matter how we interleave those operations. This is true if they both take their exclusive locks first and then attempt share locks on the remainder, both take shared locks on everything and attempt to upgrade to exclusive on that subset, or both take just the shared locks and then attempt to take the exclusive locks. This extends to more than two processes. **That's perfectly fine.** As long as the database either supports ``NOWAIT`` (immediately error when you fail to get a requested lock) or rapid deadlock detection resulting in an error, we can catch that error and turn it into the ``ReadConflictError`` it actually is. PostgreSQL supports ``NOWAIT`` (and deadlock detection, after a small but configurable delay). MySQL's InnoDB supports rapid deadlock detection, and starting with MySQL 8, it supports ``NOWAIT``. Transactions that deadlock would have been doomed anyway; a deadlock is just another way of saying there will be a readCurrent conflict. .. rubric:: Lock Order The original strategy was to first take exclusive locks of things we will be modifying. Once that succeeds, then we attempt shared locks of readCurrent using ``NOWAIT``. If that fails because we can't get a lock, we know someone is in the process of modifying it and we have a conflict. If we get the locks, we still have to confirm the TIDs are the things we expect. (A possible optimization is to do those two steps at once, in the database. ``SELECT FOR SHARE WHERE oid = X and TID = x``. If we don't get the right number of rows, conflict.) This prioritizes writers over readers: readers fail at the expense of writers. However, it means that if we're going to fail a transaction because an object we'd like to read lock has been modified, we have to wait until we timeout, or acquire all of our exclusive locks. Depending on transaction ordering, this could mean unnecessarily long delays. Suppose Tx a wants to write to objects 1 and 2, and Tx b wants to write to 1 but only read 2. (Recall that a ZODB connection automatically upgrades readCurrent() into just modifications. 1 and 2 could be bank accounts; Tx a is doing a transfer, but Tx b is checking collateral (account 2) for a loan that was approved and transferring that into 1.) 1. Tx a: LOCK 1, 2 EXCLUSIVE. (No share locks.) 2. Tx b: LOCK 1 EXCLUSIVE. -> Block; queue for lock 1. 3. Tx a: Resolve a conflict in 1, wait for the commit lock, and finally commit and release the locks on 1 and 4. Tx c: LOCK 2 EXCLUSIVE. 5. Tx b: LOCK 2 SHARE -> wait exception. Here, Tx b had to wait while Tx a finished its entire business, only to have Tx c swoop in and get the lock first, leading to Tx b raising an exception. Some databases guarantee that locks are handed off in FIFO fashion, but not all do. Even if the database granted Tb b the share lock first, it would still discover that the TID had changed and raise an exception. Meanwhile, Tx b has been holding an exclusive lock on 1 this entire time, preventing anyone else from modifying it. If we take the share locks first, the scenario looks like this: 1. Tx a: LOCK 1, 2 EXCLUSIVE. (No share locks.) 2. Tx b: LOCK 2 SHARE -> wait exception; begin retry. Tx b gets much quicker notification that it won't be able to progress and begins a retry. Both orders have the problem that if Tx a takes longer to commit than Tx b does to begin its retry, Tx b may take the same action against the same state of the database several times in a row before giving up. What's different is that in the exclusive first version, that only happens if Tx a and Tx b have no exclusive locks in common: 1. Tx a: Lock 1 EXCLUSIVE. 2. Tx b: Lock 2 EXCLUSIVE. 3. Tx b: Lock 1 SHARE -> wait exception, begin retry. However, this can be mitigated by introducing small backoff delays in the transaction retry logic. Suppose a different transaction already modified object 2. With the original lock order (exclusive first), the scenario doesn't change. Tx b has to wait for Tx a to finish before it can determine that fact (Tx a has to resolve a conflict or fail). If Tx b got to go first, it would relatively quickly discover this fact, but at the cost of waiting for an exclusive lock for an arbitrary amount of time: 1. Tx b: Lock 1 EXCLUSIVE. (Potentially blocks.) 2. Tx a: Lock 1, 2 EXCLUSIVE. -> Block; queue for lock 1. 3. Tx b: Lock 2 SHARE. 4. Tx b: Determine 2 has been modified; raise ReadConflictError. Taking the share lock first solves this concern; Tx b is immediately able to determine that 2 has been modified and quickly raise an exception without holding any other locks. :param cursor: The store cursor. :param read_current_oids: A mapping from oid integer to tid
integer that the transaction expects. """ ### # Exceptions
[docs] ###
class ReplicaClosedException(Exception):
[docs] """The connection to the replica has been closed"""
class UnableToAcquireLockError(Exception):
[docs] "A lock cannot be acquired." class UnableToAcquireCommitLockError(StorageError, UnableToAcquireLockError): """ The commit lock cannot be acquired due to a timeout. This means some other transaction had the lock we needed. Retrying the transaction may succeed.
However, for historical reasons, this exception is not a ``TransientError``. """
[docs] class UnableToLockRowsDeadlockError(ConflictError, UnableToAcquireLockError): """
The root class for a lock error because there was a deadlock. """
[docs] # TransientError -> ConflictError -> ReadConflictError class UnableToLockRowsToModifyError(ConflictError, UnableToAcquireLockError): """ We were unable to lock one or more rows that we intend to modify due to a timeout. This means another transaction already had the rows locked, and thus we are in conflict with that transaction. Retrying the transaction may succeed.
This is a type of ``ConflictError``, which is a transient error.
[docs] """ class UnableToLockRowsToModifyDeadlockError(UnableToLockRowsToModifyError,
UnableToLockRowsDeadlockError): pass
[docs] UnableToLockRowsToModifyError.DEADLOCK_VARIANT = UnableToLockRowsToModifyDeadlockError class UnableToLockRowsToReadCurrentError(ReadConflictError, UnableToAcquireLockError): """ We were unable to lock one or more rows that belong to an object that ``Connection.readCurrent()`` was called on. This means another transaction already had the rows locked with intent to modify them, and thus we are in conflict with that transaction. Retrying the transaction may succeed.
This is a type of ``ReadConflictError``, which is a transient error.
[docs] """ class UnableToLockRowsToReadCurrentDeadlockError(UnableToLockRowsToReadCurrentError,
UnableToLockRowsDeadlockError): pass
[docs] UnableToLockRowsToReadCurrentError.DEADLOCK_VARIANT = UnableToLockRowsToReadCurrentDeadlockError
class UnableToAcquirePackUndoLockError(StorageError, UnableToAcquireLockError): """A pack or undo operation is in progress."""
[docs] class StaleConnectionError(ReadConflictError): """ Raised by `IPoller.poll_invalidations` when a stale connection is detected. """ @classmethod def from_prev_and_new_tid(cls, prev_polled_tid, new_polled_tid): return cls(
"The database connection is stale: new_polled_tid=%d, " "prev_polled_tid=%d." % (new_polled_tid, prev_polled_tid))
[docs] class AggregateOperationTimeoutError(Exception): """ Raised when an aggregate operation in RelStorage detects that it has exceeded a specified timeout. """ #: If the function knows a useful, partial result, it may set this
#: attribute. Check it against the class value to see if it's been set. partial_result = object()