# Source code for relstorage.adapters.mysql.adapter

##############################################################################
#
# Copyright (c) 2008 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""MySQL adapter for RelStorage.

Connection parameters supported by MySQLdb:

host
    string, host to connect
user
    string, user to connect as
passwd
    string, password to use
db
    string, database to use
port
    integer, TCP/IP port to connect to
unix_socket
    string, location of unix_socket (UNIX-ish only)
conv
    mapping, maps MySQL FIELD_TYPE.* to Python functions which convert a
    string to the appropriate Python type
connect_timeout
    number of seconds to wait before the connection attempt fails.
compress
    if set, gzip compression is enabled
named_pipe
    if set, connect to server via named pipe (Windows only)
init_command
    command which is run once the connection is created
read_default_file
    see the MySQL documentation for mysql_options()
read_default_group
    see the MySQL documentation for mysql_options()
client_flag
    client flags from MySQLdb.constants.CLIENT
local_infile
    int, non-zero enables LOAD DATA LOCAL INFILE, zero disables
"""
from __future__ import absolute_import
from __future__ import print_function

import logging
import json

from zope.interface import implementer

from relstorage._compat import iteritems
from relstorage._util import metricmethod_sampled
from relstorage._util import TRACE

from ..adapter import AbstractAdapter

from ..dbiter import HistoryFreeDatabaseIterator
from ..dbiter import HistoryPreservingDatabaseIterator
from ..interfaces import IRelStorageAdapter
from ..interfaces import UnableToLockRowsToReadCurrentError
from ..interfaces import UnableToLockRowsToModifyError
from ..interfaces import UnableToAcquireCommitLockError
from ..poller import Poller
from ..scriptrunner import ScriptRunner
from ..batch import RowBatcher

from . import drivers
from .connmanager import MySQLdbConnectionManager
from .locker import MySQLLocker
from .mover import MySQLObjectMover

from .oidallocator import MySQLOIDAllocator
from .packundo import MySQLHistoryFreePackUndo
from .packundo import MySQLHistoryPreservingPackUndo
from .schema import MySQLSchemaInstaller
from .schema import MySQLVersionDetector
from .stats import MySQLStats
from .txncontrol import MySQLTransactionControl

logger = logging.getLogger(__name__)


@implementer(IRelStorageAdapter)
class MySQLAdapter(AbstractAdapter):
    """MySQL adapter for RelStorage.

    Wires together the MySQL-specific collaborators (connection manager,
    locker, mover, OID allocator, poller, pack/undo, ...) and implements
    the commit-time locking operations by calling stored procedures
    through the driver.
    """
    # pylint:disable=too-many-instance-attributes

    driver_options = drivers

    # Connection parameter names that carry the password. ``passwd`` is
    # the name documented in the module docstring; ``password`` is the
    # spelling also accepted by the mysqlclient and PyMySQL drivers.
    # Neither may ever show up in ``str(adapter)``.
    _SECRET_PARAMS = ('passwd', 'password')

    def __init__(self, options=None, oidallocator=None,
                 version_detector=None,
                 connmanager=None, locker=None,
                 mover=None,
                 **params):
        """Remember the connection parameters and any pre-built collaborators.

        :param options: Passed unchanged to ``AbstractAdapter.__init__``.
        :param oidallocator: Optional existing OID allocator.
        :param version_detector: Optional existing version detector.
        :param connmanager: Optional existing connection manager.
        :param locker: Optional existing locker.
        :param mover: Optional existing object mover.
        :param params: Driver connection parameters (see the module
            docstring); stored as ``self._params``.

        Any collaborator left as ``None`` is created in :meth:`_create`.
        """
        # These must be assigned before calling the base initializer;
        # presumably it is what invokes ``_create`` -- confirm against
        # ``AbstractAdapter``.
        self._params = params
        self.oidallocator = oidallocator
        self.version_detector = version_detector
        self.connmanager = connmanager
        self.locker = locker
        self.mover = mover
        super().__init__(options)

    def _create(self):
        """Create every collaborator that was not supplied to ``__init__``.

        The runner, schema installer, poller, transaction control,
        pack/undo, database iterator and stats objects are always created
        fresh; the version detector, connection manager, locker, mover and
        OID allocator are only created when they are still ``None``.
        """
        options = self.options
        driver = self.driver
        params = self._params

        if self.version_detector is None:
            self.version_detector = MySQLVersionDetector()

        if self.connmanager is None:
            self.connmanager = MySQLdbConnectionManager(
                driver,
                params=params,
                options=options,
            )

        if self.locker is None:
            self.locker = MySQLLocker(
                options=options,
                driver=driver,
                batcher_factory=RowBatcher,
                version_detector=self.version_detector,
            )

        self.runner = ScriptRunner()
        self.schema = MySQLSchemaInstaller(
            driver=driver,
            connmanager=self.connmanager,
            runner=self.runner,
            keep_history=self.keep_history,
            version_detector=self.version_detector,
        )

        if self.mover is None:
            self.mover = MySQLObjectMover(
                driver,
                options=options,
                version_detector=self.version_detector,
            )

        if self.oidallocator is None:
            self.oidallocator = MySQLOIDAllocator(driver)

        self.poller = Poller(
            self.driver,
            keep_history=self.keep_history,
            runner=self.runner,
            revert_when_stale=options.revert_when_stale,
            # Flagged whenever any replica selector is configured.
            transactions_may_go_backwards=(
                self.connmanager.replica_selector is not None
                or self.connmanager.ro_replica_selector is not None
            )
        )

        self.txncontrol = MySQLTransactionControl(
            connmanager=self.connmanager,
            poller=self.poller,
            keep_history=self.keep_history,
            Binary=driver.Binary,
        )

        if self.keep_history:
            self.packundo = MySQLHistoryPreservingPackUndo(
                driver,
                connmanager=self.connmanager,
                runner=self.runner,
                locker=self.locker,
                options=options,
            )
            self.dbiter = HistoryPreservingDatabaseIterator(
                driver,
            )
        else:
            self.packundo = MySQLHistoryFreePackUndo(
                driver,
                connmanager=self.connmanager,
                runner=self.runner,
                locker=self.locker,
                options=options,
            )
            self.dbiter = HistoryFreeDatabaseIterator(
                driver,
            )

        self.stats = MySQLStats(
            connmanager=self.connmanager,
            keep_history=self.keep_history
        )

    def new_instance(self):
        """Return a new adapter of the same type and configuration.

        The new adapter receives the same options, connection parameters,
        version detector, connection manager, locker and mover as this
        one, and a new OID allocator obtained from
        ``self.oidallocator.new_instance()``.
        """
        return type(self)(
            options=self.options,
            oidallocator=self.oidallocator.new_instance(),
            version_detector=self.version_detector,
            connmanager=self.connmanager,
            locker=self.locker,
            mover=self.mover,
            **self._params
        )

    def __str__(self):
        """Describe the adapter: class name, history mode and parameters.

        Connection parameters are listed sorted by name as ``name=repr``.
        Password parameters (``passwd`` and its alias ``password``) are
        omitted so that the description is safe to log.
        """
        parts = [self.__class__.__name__]
        if self.keep_history:
            parts.append('history preserving')
        else:
            parts.append('history free')
        # Work on a copy so the real connection parameters are untouched.
        p = self._params.copy()
        for secret in self._SECRET_PARAMS:
            p.pop(secret, None)
        p = sorted(iteritems(p))
        parts.extend('%s=%r' % item for item in p)
        return ", ".join(parts)

    # A temporary magic variable as we move TID allocation into some
    # databases; with an external clock, we *do* need to sleep waiting for
    # TIDs to change in a manner we can exploit; that or we need to be very
    # careful about choosing pack times.
    RS_TEST_TXN_PACK_NEEDS_SLEEP = 1

    @metricmethod_sampled
    def lock_database_and_choose_next_tid(self, cursor, username, description, extension):
        """Call the ``lock_and_choose_tid`` stored procedure and return the TID.

        In history-preserving mode the procedure is passed
        ``(False, username, description, extension)``; otherwise it is
        called without arguments. The TID is the single column of the
        first row of the first result set.
        """
        proc = 'lock_and_choose_tid'
        args = ()
        if self.keep_history:
            # (packed, username, descr, extension)
            proc += '(%s, %s, %s, %s)'
            args = (False, username, description, extension)

        multi_results = self.driver.callproc_multi_result(cursor, proc, args)
        tid, = multi_results[0][0]
        logger.log(TRACE, "Locked database only to choose tid %s", tid)
        return tid

    @metricmethod_sampled
    def lock_database_and_move(self,
                               store_connection, load_connection,
                               transaction_has_blobs,
                               ude,
                               commit=True,
                               committing_tid_int=None,
                               after_selecting_tid=lambda tid: None):
        """Lock the database, choose a TID and move rows via a stored procedure.

        Falls back to the inherited implementation when the version
        detector reports that good stored procedures are not supported.
        Otherwise calls ``lock_and_choose_tid_and_move``; in
        history-preserving mode the elements of *ude* are appended to the
        procedure arguments.

        :param after_selecting_tid: Callable invoked with the chosen
            integer TID once the procedure has returned.
        :return: The tuple ``(tid_int, "-")``.
        :raises: Driver lock exceptions are handed to
            ``self.locker.reraise_commit_lock_error`` together with
            ``UnableToAcquireCommitLockError``.
        """
        if not self.version_detector.supports_good_stored_procs(store_connection.cursor):
            # XXX: When can we drop this? Probably not until AppVeyor upgrades
            # MySQL past 5.7.12.
            return super().lock_database_and_move(
                store_connection, load_connection,
                transaction_has_blobs,
                ude,
                commit=commit,
                committing_tid_int=committing_tid_int,
                after_selecting_tid=after_selecting_tid)

        params = (committing_tid_int, commit)
        # (p_committing_tid, p_commit)
        proc = 'lock_and_choose_tid_and_move(%s, %s)'
        if self.keep_history:
            params += ude
            # (p_committing_tid, p_commit, p_user, p_desc, p_ext)
            proc = 'lock_and_choose_tid_and_move(%s, %s, %s, %s, %s)'

        try:
            multi_results = self.driver.callproc_multi_result(
                store_connection.cursor,
                proc,
                params,
                exit_critical_phase=commit
            )
        except self.driver.lock_exceptions:
            self.locker.reraise_commit_lock_error(
                store_connection.cursor,
                proc,
                UnableToAcquireCommitLockError,
            )
            # The helper is expected to raise. If it ever returns, re-raise
            # the original lock error instead of falling through to an
            # UnboundLocalError on ``multi_results`` below.
            raise

        tid_int, = multi_results[0][0]
        after_selecting_tid(tid_int)
        logger.log(TRACE, "Locked database and moved rows for tid %s", tid_int)
        return tid_int, "-"

    DEFAULT_LOCK_OBJECTS_AND_DETECT_CONFLICTS_INTERLEAVABLE = False

    def _best_lock_objects_and_detect_conflicts(self, cursor, read_current_oids):
        """Call ``lock_objects_and_detect_conflicts`` and return the conflicts.

        :param read_current_oids: A mapping (``.items()`` is called on
            it); when non-empty its items are sent to the procedure as a
            JSON array, otherwise ``None`` is sent.
        :return: The single result set produced by the procedure.
        :raises: A lock exception whose message mentions ``shared locks``
            or ``exclusive locks`` is handed to
            ``self.locker.reraise_commit_lock_error`` with
            ``UnableToLockRowsToReadCurrentError`` or
            ``UnableToLockRowsToModifyError`` respectively; any other
            lock exception propagates unchanged.
        """
        read_current_param = None
        if read_current_oids:
            # In MySQL 8, we could pass in a JSON array and use JSON_TABLE
            # to join directly against the data.
            # In earlier versions, we could do some tricks with strings and
            # preparing dynamic SQL.
            # In all versions, we could write a query like
            #   select tid from object_state
            #   where json_contains('[1, 8]', cast(zoid as json), '$')
            # but that is very slow (entire table is scanned).
            #
            # We pass the string array, parse it as json, loop over it to
            # put in a temp table and join that.
            #
            # I also benchmarked sending the data in a format suitable
            # for concatenating to the end of 'INSERT ... VALUES' and
            # using dynamic SQL in the stored proc to execute that
            # ('PREPARE stmt FROM @str; EXECUTE stmt') and it
            # benchmarked essentially the same. Usually there are a
            # small number of things being readCurrent so it probably
            # doesn't matter.
            read_current_param = json.dumps(list(read_current_oids.items()))

        proc = 'lock_objects_and_detect_conflicts(%s, %s)'
        try:
            multi_results = self.driver.callproc_multi_result(
                cursor,
                proc,
                (read_current_param, 0)
            )
        except self.locker.lock_exceptions as e:
            # On MySQL 5.7, the time-based mechanism to determine that
            # we failed to take NOWAIT shared locks is not reliable
            # when the commit lock timeout is very small (close to 1),
            # because it can take several seconds for the procedure to
            # error out. Thus we provide a specific error message that
            # we key off. (We don't change errno or state or anything
            # like that in case anybody introspects that stuff.)
            message = str(e)
            if 'shared locks' in message:
                self.locker.reraise_commit_lock_error(
                    cursor,
                    self._describe_best_lock_objects_and_detect_conflicts(),
                    UnableToLockRowsToReadCurrentError
                )
            elif 'exclusive locks' in message:
                self.locker.reraise_commit_lock_error(
                    cursor,
                    self._describe_best_lock_objects_and_detect_conflicts(),
                    UnableToLockRowsToModifyError
                )
            raise

        # There's always a useless last result, the result of the stored
        # procedure itself.
        proc_result = multi_results.pop()
        assert not proc_result, proc_result
        assert len(multi_results) == 1, multi_results
        conflicts = multi_results[0]
        return conflicts

    def _describe_best_lock_objects_and_detect_conflicts(self):
        """Return the statement text passed to the locker when re-raising lock errors."""
        return 'lock_objects_and_detect_conflicts(%s)'