# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2019 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import contextlib
import os.path
import sqlite3
import functools

from zope.interface import implementer

from relstorage._util import log_timed

from ..drivers import implement_db_driver_options
from ..drivers import AbstractModuleDriver
from ..drivers import GeventDriverMixin
from ..drivers import MemoryViewBlobDriverMixin
from ..interfaces import IDBDriver
from ..interfaces import IDBDriverSupportsCritical
from .._util import DatabaseHelpersMixin

from .dialect import Sqlite3Dialect

__all__ = [
    'Sqlite3Driver',
    'Sqlite3GeventDriver',
]

database_type = 'sqlite3'
logger = __import__('logging').getLogger(__name__)



_MB = 1024 * 1024
DEFAULT_MAX_WAL = 10 * _MB

# Benchmarking on at least one system doesn't show an improvement to
# either reading or writing by forcing a large mmap_size (maybe that's
# not enabled on macOS?). By default, just report. A setting of 0
# means do not use.
DEFAULT_MMAP_SIZE = None
# 4096 is the page size in current releases of sqlite; older versions
# used 1024. A larger page makes sense as we have biggish values.
# Going larger doesn't make much difference in benchmarks.
DEFAULT_PAGE_SIZE = 4096
# The default is -2000, which is interpreted as abs(N) * 1024 bytes,
# i.e., ~2MB; at a 4K page size, that's about 500 pages. A negative
# number specifies bytes, a positive number specifies pages. In
# general, since we have two connections for each ZODB connection,
# we're probably best letting the cache stay relatively small and
# letting the operating system page cache arbitrate between threads
# and processes.
DEFAULT_CACHE_SIZE = None
# Control where temporary data is:
#
# FILE = a deleted disk file (that sqlite never flushes so
# theoretically just exists in the operating system's filesystem
# cache)
#
# MEMORY = explicitly in memory only
#
# DEFAULT = compile time default. Benchmarking for large writes
# doesn't show much difference between FILE and MEMORY *usually*:
# Sometimes changing the setting back and forth causes massive
# changes in performance, at least on a macOS desktop system.
DEFAULT_TEMP_STORE = None

# How long, in seconds, before we get 'OperationalError: database is locked'.
DEFAULT_TIMEOUT = 15
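
# Taken together, these defaults are applied at connect time via
# ``Connection.execute_pragmas`` below. A rough sketch of the
# equivalent raw SQL (values are the defaults above; settings left at
# ``None`` are only queried and reported, never changed):
#
#   PRAGMA journal_size_limit = 10485760;  -- DEFAULT_MAX_WAL (10MB)
#   PRAGMA page_size = 4096;               -- DEFAULT_PAGE_SIZE
#   PRAGMA journal_mode = wal;             -- always required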


class UnableToConnect(sqlite3.OperationalError):
    filename = None
    def with_filename(self, f):
        self.filename = f
        return self

    def __str__(self):
        s = super().__str__()
        if self.filename:
            s += " (At: %r)" % self.filename
        return s
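
# For example (the message and path are illustrative):
#
#   >>> str(UnableToConnect('unable to open database file').with_filename('/tmp/db'))
#   "unable to open database file (At: '/tmp/db')"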


class Cursor(sqlite3.Cursor):
    has_analyzed_temp = False

    def execute(self, stmt, params=None):
        # While we transition away from hardcoded SQL to the query
        # objects, we still have some %s params out there. This papers
        # over that.
        if params is not None:
            stmt = stmt.replace('%s', '?')
            return sqlite3.Cursor.execute(self, stmt, params)

        return sqlite3.Cursor.execute(self, stmt)

    def executemany(self, stmt, params):
        stmt = stmt.replace('%s', '?')
        return sqlite3.Cursor.executemany(self, stmt, params)
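
    # For example, a legacy statement written with ``format``-style
    # markers (the table and value are illustrative):
    #
    #   cur.execute('SELECT state FROM object_state WHERE zoid = %s', (0,))
    #
    # is rewritten and executed as:
    #
    #   cur.execute('SELECT state FROM object_state WHERE zoid = ?', (0,))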

    # PyPy likes to throw `sqlite3.ProgrammingError: Cannot operate on a closed cursor`
    # under various circumstances; which ones depend on the version of PyPy.
    # For example, upgrading from PyPy3.6 7.1 to PyPy3.6 7.3.1 started doing
    # this in ``self.connection`` when printing ``__traceback_info__``
    # on failures:
    #
    #  File "/opt/python/pypy3.6-7.3.1/lib_pypy/_sqlite3.py", line 884, in __execute
    #     if self.__connection._begin_statement and self.__statement._is_dml:
    #  AttributeError: 'Connection' object has no attribute '_begin_statement'
    #

    @property
    def __rs_connection(self):
        try:
            return self.connection
        except sqlite3.ProgrammingError: # pragma: no cover
            return "<closed cursor>"

    def __repr__(self):
        return '<%s at 0x%x from %r>' % (
            type(self).__name__,
            id(self),
            self.__rs_connection
        )

    def close(self):
        try:
            sqlite3.Cursor.close(self)
        except sqlite3.ProgrammingError: # pragma: no cover
            # XXX: Where was this happening? Why are we doing this?
            # (I haven't seen it locally)
            pass


class _ExplainCursor(DatabaseHelpersMixin): # pragma: no cover (A debugging aid)
    def __init__(self, cur):
        self.cur = cur

    def __getattr__(self, name):
        return getattr(self.cur, name)

    def __iter__(self):
        return iter(self.cur)

    def execute(self, sql, *args):
        if sql.strip().startswith(('INSERT', 'SELECT', 'DELETE', 'WITH')):
            exp = 'EXPLAIN QUERY PLAN ' + sql.lstrip()
            print(sql)
            self.cur.execute(exp, *args)
            print(self._rows_as_pretty_string(self.cur))
        return self.cur.execute(sql, *args)
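
    # Wrapping a cursor prints each query followed by its plan. The
    # plan rows look something like the following (the exact format
    # varies by SQLite version; this is illustrative):
    #
    #   SELECT zoid FROM current_object
    #   0 | 0 | 0 | SCAN TABLE current_object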


class Connection(sqlite3.Connection):
    CURSOR_FACTORY = Cursor
    has_analyzed_temp = False
    before_commit_functions = ()
    _rs_has_closed = False
    _rs_progress_handler = None
    replica = None
    standard_progress_handler = (None, 0)
    # For PyPy3.6-7.3.1. We connect with isolation_level=None, leaving
    # sqlite in its own transaction management mode. But PyPy
    # 3.6-7.3.1 has that broken prior to
    # https://foss.heptapod.net/pypy/pypy/-/commit/fbbe06715eb48df1a03640672d99335695d3e47c.
    # See also https://foss.heptapod.net/pypy/pypy/-/issues/3226
    _begin_statement = None

    def __init__(self, rs_db_filename, *args, **kwargs):
        __traceback_info__ = args, kwargs
        # PyPy3 calls self.commit() during __init__.
        self.rs_db_filename = rs_db_filename
        self.before_commit_functions = []

        try:
            super().__init__(*args, **kwargs)
        except sqlite3.OperationalError as e:
            raise UnableToConnect(e).with_filename(rs_db_filename)

    def __repr__(self):
        if not self.rs_db_filename:
            return super().__repr__()
        try:
            in_tx = self.in_transaction
        except sqlite3.ProgrammingError:
            in_tx = 'closed'

        return '<%s at 0x%x to %r in_transaction=%s>' % (
            type(self).__name__,
            id(self), self.rs_db_filename,
            in_tx
        )

    def _at_transaction_end(self):
        pass

    def register_before_commit_cleanup(self, func):
        self.before_commit_functions.append(func)

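    # Debugging aid: change ``if 0`` to ``if 1`` below to wrap every
    # cursor in _ExplainCursor and print query plans.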
    if 0: # pylint:disable=using-constant-test
        def cursor(self):
            return _ExplainCursor(sqlite3.Connection.cursor(self, self.CURSOR_FACTORY))
    else:
        def cursor(self):
            return sqlite3.Connection.cursor(self, self.CURSOR_FACTORY)

    def return_to_repeatable_read(self):
        # See mover.py. This deliberately doesn't use self.commit():
        # we don't want to run the cleanup hooks, because this is a
        # mid-logical-ZODB-transaction operation.
        sqlite3.Connection.commit(self)

    def commit(self):
        try:
            for func in self.before_commit_functions:
                func(self)
            sqlite3.Connection.commit(self)
        finally:
            self._at_transaction_end()


    def rollback(self):
        try:
            sqlite3.Connection.rollback(self)
            for func in self.before_commit_functions:
                func(self, True)
        finally:
            self._at_transaction_end()
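
    # A sketch of how the cleanup hooks registered above are invoked
    # (the function name is hypothetical): a hook is called as
    # ``func(conn)`` on commit and ``func(conn, True)`` on rollback,
    # so it should accept an optional second argument:
    #
    #   def discard_temp_state(conn, rolling_back=False):
    #       ...
    #
    #   conn.register_before_commit_cleanup(discard_temp_state)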

    def __check_and_log(self, reported_values, changed_values):
        logger.debug(
            "Connection: %(conn)s.\n\t"
            "Using sqlite3 version: %(ver)s.\n\t"
            "Default    connection settings: %(defs)s.\n\t"
            "Changing   connection settings: %(changing)s.\n\t"
            "Desired    connection settings: %(desired)s.\n\t"
            "Unapplied  connection settings: %(unapplied)s.",
            dict( # pylint:disable=use-dict-literal
                conn=self,
                ver=sqlite3.sqlite_version,
                defs={
                    k: v for k, v in reported_values.items()
                    if k not in changed_values
                },
                changing={
                    k: v for k, v in reported_values.items()
                    if k in changed_values
                },
                desired={
                    k: v[0] for k, v in changed_values.items()
                },
                unapplied={
                    k: v[1] for k, v in changed_values.items()
                    if v[0] != v[1]
                },
            )
        )

    # PRAGMA statements don't allow ? placeholders when executed:
    # pragmas are processed as the statement is prepared, so their
    # values can't be supplied as bound parameters. We must
    # interpolate the value into the statement text.
    def __execute_pragma(self, name, value):
        stmt = 'PRAGMA %s = %s' % (name, value)
        cur = self.execute(stmt)
        # On PyPy, it's important to traverse the cursor, even if
        # you don't expect any results, because it still counts as
        # a statement that's open and can cause 'OperationalError:
        # can't commit with SQL operations active'.
        return cur.fetchall()
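
    # For example, ``PRAGMA cache_size = -2000`` works when written
    # out literally, while the parameterized form
    # ``self.execute('PRAGMA cache_size = ?', (-2000,))`` fails,
    # hence the interpolation above.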

    def execute_pragmas(self, **kwargs):
        report = {}
        changed = {} # {setting: (desired, updated)}
        if 'page_size' in kwargs:
            # This has to happen before changing into WAL.
            ps = kwargs.pop('page_size')
            items = [('page_size', ps)]
            items.extend(kwargs.items())
        else:
            items = sorted(kwargs.items())

        for pragma, desired in items:
            # Query, report, then change
            stmt = 'PRAGMA %s' % (pragma,)
            row, = self.execute(stmt).fetchall() or ((),)
            orig_value = row[0] if row else None
            assert pragma not in report # Only one
            report[pragma] = orig_value
            if desired is not None and desired != orig_value:
                self.__execute_pragma(pragma, desired)
                row = self.execute(stmt).fetchone()
                new_value = row[0] if row else None
                changed[pragma] = (desired, new_value)

        self.__check_and_log(report, changed)
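
    # A typical call, as made from the driver below (the values are
    # illustrative):
    #
    #   conn.execute_pragmas(page_size=4096, journal_mode='wal',
    #                        cache_size=None)
    #
    # 'page_size' is deliberately ordered first so it takes effect
    # before the switch to WAL; ``None`` values are queried and
    # logged but never changed.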

    @log_timed
    def close(self):
        # If we're the only connection open to this database,
        # and SQLITE_FCNTL_PERSIST_WAL is true (the default in
        # *most* places, but apparently not in the SQLite 3.24
        # that Apple shipped with macOS 10.14.5), then when we
        # close the database, the wal file that was built up by
        # any writes that have been done will automatically be
        # combined with the database file, as if with
        # "PRAGMA wal_checkpoint(RESTART)".
        #
        # This can be slow. It releases the GIL, so it could be done in another thread;
        # but most often (aside from tests) we're closing connections because we're
        # shutting down the process so spawning threads isn't really a good thing.
        if self._rs_has_closed: # pragma: no cover
            return

        self._rs_has_closed = True
        self.before_commit_functions = ()
        # Recommended best practice is to OPTIMIZE the database before
        # closing each connection. OPTIMIZE needs to run in each
        # connection so it can see what tables and indexes were used.
        # It's usually fast, but it has the potential to be slow and
        # to lock the database, and it cannot be executed on a
        # read-only connection. We don't want to block too long just
        # closing a connection, so reset the time we wait to get a
        # lock (if needed) to a lower value (in milliseconds).
        try:
            self.executescript("""
            PRAGMA busy_timeout = 3000;
            PRAGMA query_only = 0;
            PRAGMA optimize;
            """)
        except sqlite3.OperationalError:
            logger.debug("Failed to optimize database, probably in use", exc_info=True)
        except sqlite3.DatabaseError:
            # It's possible the file was removed.
            logger.exception("Failed to optimize database; was it removed?")

        super().close()


@implementer(IDBDriver)
class Sqlite3Driver(MemoryViewBlobDriverMixin,
                    AbstractModuleDriver):
    dialect = Sqlite3Dialect()
    __name__ = 'sqlite3'
    MODULE_NAME = __name__

    STATIC_AVAILABLE = (
        # Tested to work on Python 2.7 and Python 3.6+;
        # seen some strange ``ProgrammingError: closed``
        # on Python 3.5.
        # SQLite 3.11 is the oldest version tested on CI.
        sqlite3.sqlite_version_info[:2] >= (3, 11)
    )

    CONNECTION_FACTORY = Connection

    DEFAULT_CONNECT_ARGS = {}

    supports_64bit_unsigned_id = False

    def __init__(self):
        super().__init__()
        # Sadly, if a connection is closed out from under it,
        # sqlite3 throws ProgrammingError, which is not very helpful.
        # That should really only happen in tests, though, since
        # we're directly connected to the file on disk.
        self.disconnected_exceptions += (self.driver_module.ProgrammingError,) # pylint:disable=no-member

        # Make our usual connect() method call our connect_to_file()
        # method instead of the module's connect() method so we get
        # our preferred goodies.
        self._connect = self.connect_to_file

    # in_transaction doesn't work on Py2, so assume the worst
    # by inheriting the functions.
    def connection_may_need_rollback(self, conn):
        try:
            return conn.in_transaction
        except sqlite3.ProgrammingError:
            # We're closed. We do need to attempt the rollback so
            # we catch the error and know to drop the connection.
            return True

    connection_may_need_commit = connection_may_need_rollback

    # Py2 returns buffer for blobs, hence the Mixin.
    # But Py3 returns plain bytes.
    def binary_column_as_state_type(self, data):
        return data

    def _connect_to_file_or_uri(self, fname,
                                timeout=DEFAULT_TIMEOUT,
                                pragmas=None,
                                quick_check=True,
                                isolation_level=None,
                                factory_args=(),
                                **connect_args):
        factory_args = (fname,) + factory_args
        factory = lambda *args, **kwargs: self.CONNECTION_FACTORY(
            *(factory_args + args), **kwargs)

        default_connect_args = self.DEFAULT_CONNECT_ARGS.copy()
        default_connect_args.update(connect_args)
        connect_args = default_connect_args

        connection = sqlite3.connect(
            fname,
            # See Sqlite3ConnectionManager for the meaning of this.
            # We default to None, which puts us in the underlying sqlite
            # autocommit mode. This is unlike the standard library, so
            # you must either set this or manage your own transactions
            # explicitly.
            isolation_level=isolation_level,
            factory=factory,
            # We explicitly push closing off to a new thread.
            check_same_thread=False,
            timeout=timeout,
            **connect_args
        )

        if str is bytes:
            # We don't use the TEXT type, but even so
            # sqlite complains:
            #
            # ProgrammingError: You must not use 8-bit bytestrings unless
            # you use a text_factory that can interpret 8-bit bytestrings
            # (like text_factory = str). It is highly recommended that you
            # instead just switch your application to Unicode strings.
            connection.text_factory = str

        # Make sure we have at least one pragma that touches
        # the database so that we can verify that it's not corrupt.
        pragmas = pragmas or {}
        pragmas.setdefault('journal_mode', 'wal')
        cur = connection.cursor()
        __traceback_info__ = fname, cur, pragmas
        try:
            connection.execute_pragmas(**pragmas)
            # Quick integrity check before we read.
            if quick_check:
                cur.execute('PRAGMA quick_check')
                rows = cur.fetchall()
                if len(rows) != 1 or rows[0][0] != 'ok':
                    msg = '\n'.join(row[0] for row in rows)
                    raise sqlite3.DatabaseError('Quick integrity check failed %s' % msg)
        except:
            logger.exception("Failed to execute pragmas")
            cur.close()
            connection.close()
            raise

        cur.close()

        return connection

    def connect_to_file(self, fname,
                        query_only=False,
                        max_wal_size=DEFAULT_MAX_WAL,
                        mmap_size=DEFAULT_MMAP_SIZE,
                        page_size=DEFAULT_PAGE_SIZE,
                        cache_size=DEFAULT_CACHE_SIZE,
                        temp_store=DEFAULT_TEMP_STORE,
                        timeout=DEFAULT_TIMEOUT,
                        quick_check=True,
                        isolation_level=None,
                        extra_pragmas=None,
                        override_pragmas=None):
        """
        Return a DB-API Connection object.

        .. caution::
            Using the connection as a context manager does **not**
            result in the connection being closed, only committed or
            rolled back.
        """
        # There are some things we might like to do, but we can't
        # unless we're operating on at least Python 3.4, where the URI
        # syntax is supported. It also turns out they don't really
        # work either:
        #
        # - Enable shared cache. (https://www.sqlite.org/sharedcache.html)
        #   This doesn't work because of locking: all connections
        #   sharing the cache lock at the same time.
        # - Use ?mode=ro for a true query_only mode. But if the file
        #   doesn't exist, that fails, and we can't execute certain
        #   pragmas that way either.
        fname = os.path.abspath(fname) if fname and fname != ':memory:' else fname

        # Things we like but don't require.
        # Note that we use the primitive values for these things so
        # that we can detect when they get set, for reporting purposes.
        pragmas = {
            'journal_size_limit': max_wal_size,
            'mmap_size': mmap_size,
            'page_size': page_size,
            'temp_store': temp_store,
            # WAL mode is always consistent even after an operating
            # system crash in NORMAL mode. It might lose a transaction,
            # though. The default is often FULL/2, which is higher
            # than NORMAL/1.
            'synchronous': 1,
            # If cache_spill is allowed, at some point a transaction
            # can begin writing dirty pages to the database file,
            # taking an exclusive lock. That could happen at arbitrary
            # times, so we don't want that.
            'cache_spill': 0,
            # In the past, we considered disabling auto-checkpoint so
            # that commits have reliable duration; after commit, if
            # it's a good time, we can run 'PRAGMA wal_checkpoint'.
            # (In most cases, the last database connection that's open
            # will essentially do that automatically.) This is a
            # balancing act, though, and we expose this setting to the
            # end user to allow them to tune it. By default, use the
            # compiled-in default and report what that is.
            'wal_autocheckpoint': None,

            # Things to query and report.
            'soft_heap_limit': None, # 0 means no limit
            'cache_size': cache_size,
            # How big is the database?
            'page_count': None,
            'busy_timeout': None,
            'query_only': None,
        }

        # User-specified extra pragmas go here.
        pragmas.update(extra_pragmas or {})

        # Things that *must* be set.
        required_pragmas = {
            # WAL mode can actually be a bit slower at commit time,
            # but buys us far better concurrency.
            # Note: In-memory databases always use 'memory' as the
            # journal mode; temporary databases always use 'delete'.
            'journal_mode': 'wal',
        }
        if query_only:
            required_pragmas['query_only'] = 1
        pragmas.update(required_pragmas)
        pragmas.update(override_pragmas or {})

        return self._connect_to_file_or_uri(
            fname,
            pragmas=pragmas,
            timeout=timeout,
            quick_check=quick_check,
            isolation_level=isolation_level
        )
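
    # A minimal usage sketch (the path and override values here are
    # hypothetical):
    #
    #   driver = Sqlite3Driver()
    #   conn = driver.connect_to_file(
    #       '/var/data/zodb/main.sqlite3',
    #       max_wal_size=20 * _MB,
    #       extra_pragmas={'wal_autocheckpoint': 1000},
    #   )
    #   cur = conn.cursor()
    #   cur.execute('SELECT 1')
    #   conn.close()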

    def exception_is_deadlock(self, exc):
        # Not possible in sqlite; there is only one writer.
        return False


###
# Gevent.
#
# Database drivers that use networking and a gevent cooperative driver
# will essentially always switch while any given query is running.
# This means that acquiring database locks doesn't block the Python
# process and other greenlets can run. We need to achieve the same
# thing here, otherwise we can deadlock the process: despite the
# 'critical_phase', it's still possible that, while already holding
# sqlite's exclusive locks, we could switch to another greenlet that
# wants to take those same locks. If we block, we deadlock until a
# timeout occurs. The ZODB check7*Threads tests do this explicitly by
# invoking tpc_vote() and then time.sleep().
#
# So, since there are only a limited number of statements we execute
# to take locks, we push those onto gevent's threadpool (unless we're
# already in the critical phase).
#
# The rest of the time, we use a progress handler as a non-blocking
# way to periodically and automatically switch. This is a bit
# non-deterministic, but low overhead.
###

def run_blocking_in_threadpool(func):
    @functools.wraps(func)
    def in_threadpool(self, stmt, params=None):
        if self.could_block(stmt) and not self.connection.is_in_critical_phase():
            # Be sure not to yield in the threadpool thread;
            # there's nothing to yield to.
            with self.connection.temp_critical_phase():
                return self.connection.gevent.get_hub().threadpool.apply(
                    func,
                    (self, stmt, params)
                )
        return func(self, stmt, params)
    return in_threadpool


class GeventCursor(Cursor):
    BLOCKING_PREFIXES = (
        'BEGIN EXCLUSIVE',
        'BEGIN IMMEDIATE',
        'UPDATE',
        'INSERT',
    )

    def could_block(self, stmt):
        # Strip leading whitespace before the prefix check.
        stmt = stmt.upper().lstrip()
        return stmt.startswith(self.BLOCKING_PREFIXES)

    execute = run_blocking_in_threadpool(Cursor.execute)
    executemany = run_blocking_in_threadpool(Cursor.executemany)


class GeventConnection(Connection):
    CURSOR_FACTORY = GeventCursor
    _in_critical_phase = None

    def __init__(self, rs_db_filename, gevent, yield_interval, *args, **kwargs):
        # PyPy3 calls commit() from early in __init__, and certain
        # things aren't set up yet. Notably, set_progress_handler
        # doesn't work.
        self._at_transaction_end = lambda: None
        self.yield_interval = yield_interval
        self.gevent = gevent
        Connection.__init__(self, rs_db_filename, *args, **kwargs)
        del self._at_transaction_end
        self.set_progress_handler(gevent.sleep, yield_interval)

    def is_in_critical_phase(self):
        return self._in_critical_phase

    def _at_transaction_end(self): # pylint:disable=method-hidden
        self.exit_critical_phase()

    def enter_critical_phase_until_transaction_end(self):
        self._in_critical_phase = True
        self.set_progress_handler(None, 0)

    def exit_critical_phase(self):
        self._in_critical_phase = None
        self.set_progress_handler(self.gevent.sleep, self.yield_interval)

    @contextlib.contextmanager
    def temp_critical_phase(self):
        if self._in_critical_phase:
            yield
        else:
            self.enter_critical_phase_until_transaction_end()
            try:
                yield
            finally:
                self.exit_critical_phase()

    # Note that we don't commit or rollback on the threadpool. If the
    # threadpool is full, we don't want to put an operation (releasing
    # locks) that is needed for threadpool tasks (taking locks) on the
    # back of the threadpool.
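
# A note on the yielding mechanism, based on the standard library API:
# ``Connection.set_progress_handler(handler, n)`` invokes *handler*
# roughly every *n* SQLite virtual-machine instructions, and a handler
# that returns a true value aborts the query. ``gevent.sleep()``
# returns ``None``, so queries run to completion, but each invocation
# is a cooperative yield point. A minimal sketch of the idea in
# isolation:
#
#   import sqlite3
#   import gevent
#
#   conn = sqlite3.connect(':memory:', isolation_level=None)
#   conn.set_progress_handler(gevent.sleep, 100)  # yield every ~100 ops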


@implementer(IDBDriverSupportsCritical)
class Sqlite3GeventDriver(GeventDriverMixin,
                          Sqlite3Driver):
    __name__ = 'gevent ' + Sqlite3Driver.MODULE_NAME

    _GEVENT_CAPABLE = True
    _GEVENT_NEEDS_SOCKET_PATCH = False
    CONNECTION_FACTORY = GeventConnection

    DEFAULT_CONNECT_ARGS = {
        # Keep a large number of cached statements. Certain
        # data is tracked at the cached-statement level; in our
        # case, that's notably the number of virtual machine
        # instructions a statement has executed.
        'cached_statements': 1024,
    }

    # Call ``gevent.sleep()`` after this many virtual instructions
    # have been executed by the SQLite virtual machine for any one
    # statement (because instruction execution is tracked at the
    # statement level, not the database level).
    yield_to_gevent_instruction_interval = 100

    def configure_from_options(self, options):
        if options.adapter and options.adapter.config.gevent_yield_interval:
            conf = options.adapter.config
            self.yield_to_gevent_instruction_interval = conf.gevent_yield_interval

    def _connect_to_file_or_uri(self, *args, **kwargs):
        # pylint:disable=arguments-differ,signature-differs
        assert 'factory_args' not in kwargs
        kwargs['factory_args'] = (
            self.gevent,
            self.yield_to_gevent_instruction_interval,
        )
        return super()._connect_to_file_or_uri(*args, **kwargs)

    def enter_critical_phase_until_transaction_end(self, connection, cursor):
        # pylint:disable=unused-argument
        connection.enter_critical_phase_until_transaction_end()

    def is_in_critical_phase(self, connection, cursor):
        # pylint:disable=unused-argument
        return connection.is_in_critical_phase()

    def exit_critical_phase(self, connection, cursor):
        # pylint:disable=unused-argument
        connection.exit_critical_phase()


implement_db_driver_options(
    __name__,
    '.drivers'
)