Source code for relstorage.adapters.mover

##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""IObjectMover implementation.
"""
from __future__ import absolute_import
from __future__ import print_function

import os

from zope.interface import implementer

from .._compat import OID_TID_MAP_TYPE
from .._compat import md5
from .._util import metricmethod_sampled
from .._util import metricmethod
from ._util import noop_when_history_free
from ._util import query_property as _query_property
from ._util import DatabaseHelpersMixin
from .._compat import ABC
from .batch import RowBatcher
from .interfaces import IObjectMover
from .interfaces import AggregateOperationTimeoutError
from .schema import Schema
from .sql import it
from .sql.schema import ColumnExpression

objects = Schema.all_current_object_state
object_state = Schema.object_state


def _compute_md5sum(_self, data):
    if data is None:
        return None
    return md5(data).hexdigest()



@implementer(IObjectMover)
class AbstractObjectMover(DatabaseHelpersMixin, ABC):

    def __init__(self, database_driver, options, runner=None,
                 version_detector=None, batcher_factory=RowBatcher):
        """
        :param database_driver: The `IDBDriver` in use.
        """
        self.driver = database_driver
        self.keep_history = options.keep_history
        self.blob_chunk_size = options.blob_chunk_size
        self.runner = runner
        self.version_detector = version_detector
        self.make_batcher = batcher_factory

    _compute_md5sum = noop_when_history_free(_compute_md5sum)

    _load_current_query = objects.select(
        objects.c.state, objects.c.tid
    ).where(
        objects.c.zoid == objects.orderedbindparam()
    ).prepared()

    @metricmethod_sampled
    def load_current(self, cursor, oid):
        """Returns the current pickle and integer tid for an object.

        oid is an integer.  Returns (None, None) if object does not exist.
        """
        stmt = self._load_current_query
        stmt.execute(cursor, (oid,))
        # Note that we cannot rely on cursor.rowcount being
        # a valid indicator. The DB-API doesn't require it, and
        # some implementations, like MySQL Connector/Python are
        # unbuffered by default and can't provide it.
        row = cursor.fetchone()
        if row:
            state, tid = row
            state = self.driver.binary_column_as_state_type(state)
            # If it's None, the object's creation has been
            # undone.
            return state, tid
        return None, None
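
    # A minimal usage sketch for ``load_current`` (hedged; ``mover`` and
    # ``cursor`` are hypothetical stand-ins for a configured mover and an
    # open load cursor):
    #
    #   state, tid = mover.load_current(cursor, oid_int)
    #   if tid is None:
    #       pass  # the object does not exist
    #   elif state is None:
    #       pass  # the object's creation has been undone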

    _load_currents_queries = (
        (('zoid', 'state', 'tid'),
         'current_object JOIN object_state USING(zoid, tid)',
         'zoid'),
        (('zoid', 'state', 'tid'),
         'object_state',
         'zoid'),
    )

    _load_currents_query = _query_property('_load_currents')

    @metricmethod_sampled
    def load_currents(self, cursor, oids):
        """Returns the current (oid, state, tid) for specified object ids."""
        columns, table, filter_column = self._load_currents_query
        binary_column_as_state_type = self.driver.binary_column_as_state_type
        batcher = self.make_batcher(cursor, row_limit=1000)
        rows = batcher.select_from(columns, table, **{filter_column: oids})
        for row in rows:
            oid, state, tid = row
            yield oid, binary_column_as_state_type(state), tid

    _load_revision_query = object_state.select(
        object_state.c.state
    ).where(
        object_state.c.zoid == object_state.orderedbindparam()
    ).and_(
        object_state.c.tid == object_state.orderedbindparam()
    ).prepared()

    @metricmethod_sampled
    def load_revision(self, cursor, oid, tid):
        """Returns the pickle for an object on a particular transaction.

        Returns None if no such state exists.
        """
        stmt = self._load_revision_query
        stmt.execute(cursor, (oid, tid))
        row = cursor.fetchone()
        if row:
            (state,) = row
            return self.driver.binary_column_as_state_type(state)
        return None

    _exists_query = Schema.all_current_object.select(
        Schema.all_current_object.c.zoid
    ).where(
        Schema.all_current_object.c.zoid == Schema.all_current_object.orderedbindparam()
    )

    @metricmethod_sampled
    def exists(self, cursor, oid):
        """Returns a true value if the given object exists."""
        stmt = self._exists_query
        stmt.execute(cursor, (oid,))
        row = cursor.fetchone()
        return row

    _load_before_query = object_state.select(
        object_state.c.state, object_state.c.tid
    ).where(
        object_state.c.zoid == object_state.orderedbindparam()
    ).and_(
        object_state.c.tid < object_state.orderedbindparam()
    ).order_by(
        object_state.c.tid, "DESC"
    ).limit(
        1
    ).prepared()

    @metricmethod_sampled
    def load_before(self, cursor, oid, tid):
        """Returns the pickle and tid of an object before transaction tid.

        Returns (None, None) if no earlier state exists.
        """
        self._load_before_query.execute(cursor, (oid, tid))
        row = cursor.fetchone()
        if row:
            state, tid = row
            state = self.driver.binary_column_as_state_type(state)
            # None in state means the object's creation has been undone.
            return state, tid
        return None, None

    _get_tid_after_query = object_state.select(
        object_state.c.tid
    ).where(
        object_state.c.zoid == object_state.orderedbindparam()
    ).and_(
        object_state.c.tid > object_state.orderedbindparam()
    ).order_by(
        object_state.c.tid, "ASC"
    ).limit(
        1
    ).prepared()

    @metricmethod_sampled
    def get_object_tid_after(self, cursor, oid, tid):
        """Returns the tid of the next change after an object revision.

        Returns None if no later state exists.
        """
        self._get_tid_after_query.execute(cursor, (oid, tid))
        row = cursor.fetchone()
        return row[0] if row else None

    _current_object_tids_queries = (
        (('zoid', 'tid'), 'current_object', 'zoid'),
        (('zoid', 'tid'), 'object_state', 'zoid'),
    )

    _current_object_tids_query = _query_property('_current_object_tids')

    _current_object_tids_map_type = OID_TID_MAP_TYPE

    @metricmethod
    def current_object_tids(self, cursor, oids, timeout=None):
        """Returns the current {oid: tid} for specified object ids."""
        # This is a metricmethod, not a metricmethod_sampled, because only
        # databases that use composed lock_objects_and_detect_conflicts call
        # it for every transaction, and even then they only call it once, so
        # it's like the tpc_* methods. Other databases use it only when
        # restoring the cache at startup (just once), so it's unlikely to
        # get sampled (see relstorage.cache.mvcc).
        res = self._current_object_tids_map_type()
        columns, table, filter_column = self._current_object_tids_query
        batcher = self.make_batcher(cursor)
        rows = batcher.select_from(columns, table, timeout=timeout,
                                   **{filter_column: oids})
        if timeout:
            # Do the collecting and iterating in Python so we can handle
            # partial results.
            res = self._current_object_tids_map_type()
            try:
                for (oid, tid) in rows:
                    res[oid] = tid
            except AggregateOperationTimeoutError as ex:
                ex.partial_result = res
                raise
        else:
            # Do the collecting and iterating in C
            res = self._current_object_tids_map_type(list(rows))
        return res
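
    # A hedged sketch of the ``timeout`` behavior above (``mover``, ``cursor``
    # and ``oids`` are hypothetical): when the aggregate timeout fires, the
    # partial results collected so far are attached to the exception.
    #
    #   try:
    #       oid_to_tid = mover.current_object_tids(cursor, oids, timeout=5)
    #   except AggregateOperationTimeoutError as ex:
    #       oid_to_tid = ex.partial_result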

    def on_store_opened(self, cursor, restart=False):
        """
        Hook for subclasses.
        """

    def on_load_opened(self, cursor, restart=False):
        """
        Hook for subclasses.
        """

    _store_temp_query = Schema.temp_store.upsert(
        it.c.zoid,
        it.c.prev_tid,
        it.c.md5,
        it.c.state
    ).on_conflict(
        it.c.zoid
    ).do_update(
        it.c.prev_tid,
        it.c.md5,
        it.c.state
    )

    @metricmethod_sampled
    def store_temps(self, cursor, state_oid_tid_iter):
        """
        Uses the cursor's ``executemany`` method to store temporary
        objects.

        :param state_oid_tid_iter: An iterable over tuples
            ``(state, oid_int, tid_int)``. Data may be None to indicate
            we should store a NULL.

        If there is a more optimal way to implement putting objects in
        the database, please do so.

        - On SQLite, ``executemany`` is implemented in C, looping over
          the provided iterator. It turns out that is exactly what the
          normal ``execute`` method also does (it just uses a one-row
          iterator), so ``executemany`` saves substantial setup overhead
          dealing with sqlite's prepared statements.

        - On PostgreSQL, we use COPY for this (unless we're using the
          'gevent psycopg2' driver; it's the only thing that doesn't
          support COPY). None of the supported PostgreSQL drivers have
          a good ``executemany`` method, so they should fall back to
          using our own RowBatcher.

        - On Oracle, we use the RowBatcher with a combination of bulk
          array operations and direct inserts.

        - On MySQL, the preferred driver (mysqlclient) has a decent
          implementation of executemany for INSERT or REPLACE (basically
          an optimized form of what our RowBatcher does). That
          implementation is shared with PyMySQL as well, but it must be
          a simple INSERT statement matching a regular expression. Note
          that it has a bug, though: it can't handle an iterator that's
          empty.
        """
        query = self._store_temp_query
        do_md5 = self._compute_md5sum
        Binary = self.driver.Binary
        query.executemany(
            cursor,
            (
                (oid_int, tid_int, do_md5(data), Binary(data))
                for (data, oid_int, tid_int) in state_oid_tid_iter
            )
        )
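
    # An illustrative call shape for ``store_temps`` (hypothetical values;
    # as documented above, the iterable yields ``(state, oid_int, tid_int)``
    # tuples and ``state`` may be None to store a NULL):
    #
    #   mover.store_temps(cursor, [
    #       (b'...pickle...', 42, 5000),
    #       (None, 43, 5000),
    #   ])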

    @metricmethod_sampled
    def replace_temps(self, cursor, state_oid_tid_iter):
        """
        Assumes that ``store_temps`` is using an upsert query and simply
        calls that method. The same comments apply.

        In particular, mysqlclient won't optimize an UPDATE in the same
        way it does an INSERT.
        """
        self.store_temps(cursor, state_oid_tid_iter)

    @metricmethod_sampled
    def _generic_restore(self, batcher, oid, tid, data, command, suffix):
        """
        Store an object directly, without conflict detection.
        Used for copying transactions into this database.

        Either the *command* or the *suffix* must be capable of handling
        conflicts in a single query. For example,
        ``command='INSERT OR REPLACE'`` or
        ``command='INSERT', suffix='ON CONFLICT (zoid) DO...'``
        """
        if data is not None:
            encoded = self.driver.Binary(data)
            size = len(data)
        else:
            encoded = None
            size = 0

        if self.keep_history:
            # We can record deletion/un-creation via a null state.
            md5sum = self._compute_md5sum(data)
            row_schema = """
                %s, %s,
                COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
                %s, %s, %s
            """
            batcher.insert_into(
                "object_state (zoid, tid, prev_tid, md5, state_size, state)",
                row_schema,
                (oid, tid, oid, md5sum, size, encoded),
                rowkey=(oid, tid),
                size=size,
                command=command,
                suffix=suffix
            )
        elif data:
            # History free can only delete the entire record.
            batcher.insert_into(
                "object_state (zoid, tid, state_size, state)",
                "%s, %s, %s, %s",
                (oid, tid, size, encoded),
                rowkey=oid,
                size=size,
                command=command,
                suffix=suffix
            )
        else:
            batcher.delete_from('object_state', zoid=oid)

    def restore(self, cursor, batcher, oid, tid, data):
        raise NotImplementedError()

    _detect_conflict_query = Schema.temp_store.inner_join(
        Schema.all_current_object_state
    ).using(
        Schema.all_current_object_state.c.zoid
    ).select(
        Schema.temp_store.c.zoid,
        Schema.all_current_object_state.c.tid,
        Schema.temp_store.c.prev_tid,
        Schema.all_current_object_state.c.state,
    ).where(
        # Some databases may be able to benefit from prev_tid <> 0, but it
        # depends on their ability to make use of indexes.
        Schema.all_current_object_state.c.tid != Schema.temp_store.c.prev_tid
    ).prepared()

    @metricmethod_sampled
    def detect_conflict(self, cursor):
        self._detect_conflict_query.execute(cursor)
        # Note that we're not transforming the state into
        # bytes; it doesn't seem to be needed here, even with sqlite3
        # on Python 2 (where it is a buffer).
        rows = cursor.fetchall()
        return rows

    # Subclasses may override any of these queries if there is a
    # more optimal form.
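
    # For reference, each row returned by ``detect_conflict`` above follows
    # the column order of ``_detect_conflict_query``: roughly
    # ``(zoid, committed_tid, prev_tid, committed_state)``, pairing the tid
    # the transaction was based on with the tid and state now committed.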

    _move_from_temp_hp_insert_query = Schema.object_state.insert(
    ).from_select(
        (Schema.object_state.c.zoid,
         Schema.object_state.c.tid,
         Schema.object_state.c.prev_tid,
         Schema.object_state.c.md5,
         Schema.object_state.c.state_size,
         Schema.object_state.c.state),
        Schema.temp_store.select(
            Schema.temp_store.c.zoid,
            Schema.temp_store.orderedbindparam(),
            Schema.temp_store.c.prev_tid,
            Schema.temp_store.c.md5,
            'COALESCE(LENGTH(state), 0)',
            Schema.temp_store.c.state
        ).order_by(
            Schema.temp_store.c.zoid
        )
    ).prepared()

    _move_from_temp_hf_upsert_query = Schema.object_state.upsert(
        it.c.zoid,
        it.c.tid,
        it.c.state_size,
        it.c.state
    ).from_select(
        (Schema.temp_store.c.zoid,
         Schema.object_state.c.tid, # correct column for typing
         Schema.object_state.c.state_size,
         Schema.temp_store.c.state),
        Schema.temp_store.select(
            it.c.zoid,
            ColumnExpression(it.orderedbindparam()).aliased('tid'),
            ColumnExpression('COALESCE(LENGTH(state), 0)').aliased('state_size'),
            it.c.state
        ).order_by(
            it.c.zoid
        )
    ).on_conflict(
        it.c.zoid
    ).do_update(
        it.c.state,
        it.c.tid,
        it.c.state_size
    ).prepared()

    _move_from_temp_copy_blob_query = Schema.blob_chunk.insert(
    ).from_select(
        (Schema.blob_chunk.c.zoid,
         Schema.blob_chunk.c.tid,
         Schema.blob_chunk.c.chunk_num,
         Schema.blob_chunk.c.chunk),
        Schema.temp_blob_chunk.select(
            Schema.temp_blob_chunk.c.zoid,
            Schema.temp_blob_chunk.orderedbindparam(),
            Schema.temp_blob_chunk.c.chunk_num,
            Schema.temp_blob_chunk.c.chunk
        )
    ).prepared()

    _move_from_temp_hf_delete_blob_chunk_query = """
    DELETE FROM blob_chunk
    WHERE zoid IN (SELECT zoid FROM temp_store)
    """

    def _move_from_temp_object_state(self, cursor, tid):
        """
        Called for history-free databases.

        Should replace all entries in object_state with the same zoid
        from temp_store.

        This implementation is in two steps, first deleting from
        ``object_state`` with :attr:`_move_from_temp_hf_delete_query`,
        and then copying from ``temp_store`` using
        :attr:`_move_from_temp_hf_insert_query`. If a subclass can do
        this in a single step with an ``UPSERT``, it should set
        :attr:`_move_from_temp_hf_delete_query` to a false value.

        Recall that the queries that touch ``current_object`` and
        ``object_state`` need to be careful about the order they use
        (by ``zoid``) to avoid deadlocks.

        Blobs are handled separately.
        """
        stmt = self._move_from_temp_hf_upsert_query
        __traceback_info__ = stmt
        stmt.execute(cursor, (tid,))

    @metricmethod_sampled
    def move_from_temp(self, cursor, tid, txn_has_blobs):
        """
        Move the temporarily stored objects to permanent storage.
        """
        if self.keep_history:
            stmt = self._move_from_temp_hp_insert_query
            __traceback_info__ = stmt
            stmt.execute(cursor, (tid,))
        else:
            self._move_from_temp_object_state(cursor, tid)

            if txn_has_blobs:
                # If we can require storages to have an UPSERT (mysql and
                # postgres do), then can we remove the DELETE?
                # Answer: probably not. What if the blob shrunk and we
                # have fewer chunks than we used to?
                stmt = self._move_from_temp_hf_delete_blob_chunk_query
                cursor.execute(stmt)

        if txn_has_blobs:
            stmt = self._move_from_temp_copy_blob_query
            __traceback_info__ = stmt
            stmt.execute(cursor, (tid,))

    # Insert and update current objects.
    # Note that to avoid deadlocks, it is incredibly important
    # to order the updates in OID order.
    _update_current_upsert_query = Schema.current_object.upsert(
        it.c.zoid,
        it.c.tid
    ).from_select(
        (it.c.zoid, it.c.tid),
        Schema.object_state.select(
            it.c.zoid,
            Schema.object_state.c.tid # correct column for typing
        ).where(
            it.c.tid == it.orderedbindparam()
        ).order_by(it.c.zoid)
    ).on_conflict(
        it.c.zoid
    ).do_update(
        it.c.tid
    ).prepared()

    @noop_when_history_free
    @metricmethod_sampled
    def update_current(self, cursor, tid):
        """
        Update the current object pointers.

        tid is the integer tid of the transaction being committed.
        """
        stmt = self._update_current_upsert_query
        stmt.execute(cursor, (tid,))

    @metricmethod_sampled
    def download_blob(self, cursor, oid, tid, filename):
        """Download a blob into a file."""
        stmt = """
        SELECT chunk
        FROM blob_chunk
        WHERE zoid = %s
            AND tid = %s
            AND chunk_num = %s
        """

        f = None
        bytecount = 0
        try:
            chunk_num = 0
            while True:
                cursor.execute(stmt, (oid, tid, chunk_num))
                rows = list(cursor)
                if rows:
                    assert len(rows) == 1
                    chunk = rows[0][0]
                else:
                    # No more chunks. Note: if there are no chunks at
                    # all, then this method should not write a file.
                    break

                if f is None:
                    f = open(filename, 'wb') # pylint:disable=consider-using-with
                f.write(chunk)
                bytecount += len(chunk)
                chunk_num += 1
        except:
            if f is not None:
                f.close()
                os.remove(filename)
            raise

        if f is not None:
            f.close()
        return bytecount
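
    # A hedged caller sketch for ``download_blob`` (hypothetical ``mover``,
    # ``cursor`` and path; as noted above, when no chunks exist at all no
    # file is written):
    #
    #   size = mover.download_blob(cursor, oid_int, tid_int, '/tmp/blob.bin')
    #   if size == 0 and not os.path.exists('/tmp/blob.bin'):
    #       pass  # no blob stored for this (oid, tid)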

    _upload_blob_uses_chunks = True

    def _upload_blob_clear_old_blob(self, cursor, oid, tid):
        if tid is not None:
            if self.keep_history:
                delete_stmt = """
                DELETE FROM blob_chunk
                WHERE zoid = %s AND tid = %s
                """
                cursor.execute(delete_stmt, (oid, tid))
            else:
                delete_stmt = "DELETE FROM blob_chunk WHERE zoid = %s"
                cursor.execute(delete_stmt, (oid,))

            use_tid = True
            insert_stmt = """
            INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
            VALUES (%s, %s, %s, %s)
            """
        else:
            use_tid = False
            delete_stmt = "DELETE FROM temp_blob_chunk WHERE zoid = %s"
            cursor.execute(delete_stmt, (oid,))

            insert_stmt = """
            INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)
            VALUES (%s, %s, %s)
            """
        return insert_stmt, use_tid

    def _upload_blob_read_chunks(self, cursor, oid, tid, filename, use_chunks,
                                 insert_stmt, use_tid):
        Binary = self.driver.Binary
        with open(filename, 'rb') as f:
            chunk_num = 0
            while True:
                chunk = f.read(self.blob_chunk_size) if use_chunks else f.read()
                if not chunk and chunk_num > 0:
                    # EOF. Note that we always write at least one
                    # chunk, even if the blob file is empty.
                    break
                if use_tid:
                    params = (oid, tid, chunk_num, Binary(chunk))
                else:
                    params = (oid, chunk_num, Binary(chunk))
                cursor.execute(insert_stmt, params)
                chunk_num += 1

    @metricmethod_sampled
    def upload_blob(self, cursor, oid, tid, filename):
        """Upload a blob from a file.

        If tid is None, upload to the temporary table.
        """
        insert_stmt, use_tid = self._upload_blob_clear_old_blob(cursor, oid, tid)
        self._upload_blob_read_chunks(
            cursor, oid, tid, filename,
            self._upload_blob_uses_chunks,
            insert_stmt, use_tid
        )


class RowBatcherStoreTemps(object):
    """
    A helper class to implement ``store_temps`` using a RowBatcher.

    You must provide an implementation of :meth:`store_temp_into_batcher`,
    and it must be an upsert. The :meth:`generic_store_temp_into_batcher`
    method can be used to help with this.
    """

    def __init__(self, keep_history, binary, batcher_factory=RowBatcher):
        self.make_batcher = batcher_factory
        self.keep_history = keep_history
        self.binary = binary

    _compute_md5sum = noop_when_history_free(_compute_md5sum)

    @metricmethod_sampled
    def store_temps(self, cursor, state_oid_tid_iter):
        store_temp = self.store_temp_into_batcher
        batcher = self.make_batcher(cursor) # Default row limit
        for data, oid_int, tid_int in state_oid_tid_iter:
            store_temp(batcher, oid_int, tid_int, data)
        batcher.flush()

    replace_temps = store_temps

    # The _generic methods allow for UPSERTs, at least on MySQL
    # and PostgreSQL. Previously, MySQL used `command='REPLACE'`
    # for an UPSERT; now it uses a suffix 'ON DUPLICATE KEY UPDATE ...'.
    # PostgreSQL uses a suffix 'ON CONFLICT (...) UPDATE ...'.
    generic_command = 'INSERT'
    generic_suffix = ''

    def generic_store_temp_into_batcher(self, batcher, oid, prev_tid, data):
        md5sum = self._compute_md5sum(data)

        command = self.generic_command
        suffix = self.generic_suffix
        # TODO: Now that we guarantee not to feed duplicates here, drop
        # the conflict handling.
        if command == 'INSERT' and not suffix:
            batcher.delete_from('temp_store', zoid=oid)
        batcher.insert_into(
            "temp_store (zoid, prev_tid, md5, state)",
            batcher.row_schema_of_length(4),
            (oid, prev_tid, md5sum, self.binary(data)),
            rowkey=oid,
            size=len(data) + 32,
            command=command,
            suffix=suffix
        )

    store_temp_into_batcher = generic_store_temp_into_batcher
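

# A hedged subclass sketch for RowBatcherStoreTemps (the class name and exact
# column list are hypothetical; per the comments above, PostgreSQL-style
# upserts are expressed with an 'ON CONFLICT (...) UPDATE' suffix):
#
#   class PostgreSQLRowBatcherStoreTemps(RowBatcherStoreTemps):
#       generic_suffix = """
#       ON CONFLICT (zoid) DO UPDATE
#       SET prev_tid = excluded.prev_tid,
#           md5 = excluded.md5,
#           state = excluded.state
#       """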