Source code for relstorage.cache.local_client

##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import bz2
import time
import zlib


from contextlib import closing

from zope import interface

from relstorage._util import get_memory_usage
from relstorage._util import byte_display
from relstorage._util import timer as _timer
from relstorage._util import log_timed as _log_timed
from relstorage._util import consume
from relstorage._compat import OID_TID_MAP_TYPE as OidTMap
from relstorage.interfaces import Int

from relstorage.cache.interfaces import IStateCache
from relstorage.cache.interfaces import IPersistentCache
from relstorage.cache.interfaces import IGenerationalLRUCache
from relstorage.cache.interfaces import IGeneration
from relstorage.cache.interfaces import ILRUEntry


from relstorage.cache.persistence import sqlite_connect
from relstorage.cache.persistence import sqlite_files
from relstorage.cache.persistence import FAILURE_TO_OPEN_DB_EXCEPTIONS
from relstorage.cache.local_database import Database

from relstorage.cache import cache # pylint:disable=no-name-in-module

logger = __import__('logging').getLogger(__name__)

# pylint:disable=too-many-lines

[docs] class ICachedValue(ILRUEntry): """ Data stored in the cache for a single OID. This may be a single ``(state, tid)`` pair, or it may be multiple such pairs, representing evolution of the object. Memory and time efficiency both matter. These objects do not know their own OID, just the state and tid. .. rubric:: Freezing objects For any given OID, one TID may be frozen. It may then be looked up without knowing its actual TID (using a TID of ``None``). This is useful for objects that do not change. Invalidations of frozen states happen automatically during the MVCC vacuuming process: - Freezing happens after a poll, when we have determined that an object has not changed within the range of transactions visible to all database viewers. The index entry is removed, and we begin looking for it at None. At this time, any older states cached for the object are removed. By definition, there can be no newer states, so only one state is accessible. (Of course, if we've just completed a transaction and not yet polled, then that's not strictly true; there could be cached data from the future not yet visible to any viewers.) - If an object previously frozen is changed, we see that in our index and won't ask for frozen states anymore. If we then load the new state from the DB, we cache it, leaving it with two cached states. Older viewers unaware of the change and accessing the database prior to it, can still use the frozen revision. Eventually that index entry reaches the end of its lifetime. If the object has not changed again, we will freeze it. This will discard the older frozen value and replace it with a new one. If it has changed again, we will invalidate the cache for anything older than that TID, which includes the first frozen state. The implementation and usage of frozen objects is contained entirely within this module. Clients are then responsible for making sure that the returned tid, if any, is within their viewing range. """ # pylint:disable=no-self-argument,unexpected-special-method-signature,inherit-non-class # pylint:disable=arguments-differ weight = Int( description="""The cost (size) of this cached value.""") max_tid = Int( description="""The newest TID cached for the object.""" ) newest_value = interface.Attribute("The ``(state, tid)`` pair that is the newest.") def get_if_tid_matches(tid): """ Return the ``(state, tid)`` for the given TID. A special value of None matches any previously frozen TID. If no TID matches, returns None. """ def freeze_to_tid(tid): """ Mark the given TID, if it exists, as frozen. Returns a new value to store in the cache. If it returns None, this entry is removed from the cache (because the TID didn't match, and must have been older.) """ def with_later(value): """ Add the ``(state, tid)`` (another ICachedValue) to the list of cached entries. Return the new value to place in the cache. """ def discarding_tids_before(tid): """ Remove the tid, and anything older, from the list of cached entries. Return the new value to place in the cache. Return None if all values were removed and the cached entry should be removed. """
interface.classImplements(cache.CachedValue, ICachedValue) interface.classImplements(cache.PyCache, IGenerationalLRUCache) interface.classImplements(cache.PyGeneration, IGeneration) @interface.implementer(IStateCache, IPersistentCache) class LocalClient(object): # pylint:disable=too-many-public-methods,too-many-instance-attributes # Percentage of our byte limit that should be dedicated # to the main "protected" generation _gen_protected_pct = 0.8 # Percentage of our byte limit that should be dedicated # to the initial "eden" generation _gen_eden_pct = 0.1 # Percentage of our byte limit that should be dedicated # to the "probationary"generation _gen_probation_pct = 0.1 # By default these numbers add up to 1.0, but it would be possible to # overcommit by making them sum to more than 1.0. (For very small # limits, the rounding will also make them overcommit). # Use the same markers as zc.zlibstorage (well, one marker) # to automatically avoid double-compression _compression_markers = { 'zlib': (b'.z', zlib.compress), 'bz2': (b'.b', bz2.compress), 'none': (None, None) } _decompression_functions = { b'.z': zlib.decompress, b'.b': bz2.decompress } # What multiplier of the number of items in the cache do we apply # to determine when to age the frequencies? _age_factor = 10 # When did we last age? _aged_at = 0 _next_age_at = 1000 _cache = None # Things copied from self._cache _peek = None def __init__(self, options, prefix=None): self.options = options self.prefix = prefix or '' # XXX: The calc for limit is substantially smaller # The real MB value is 1024 * 1024 = 1048576 self.limit = int(1000000 * options.cache_local_mb) self._value_limit = options.cache_local_object_max # The underlying data storage. It maps ``{oid: value}``, # where ``value`` is an :class:`ICachedValue`. # # Keying off of OID directly lets the LRU be based on the # popularity of an object, not its particular transaction. It also lets # us use a LLBTree to store the data, which can be more memory efficient. self._cache = None self.flush_all() self.__initial_weight = self._cache.weight compression_module = options.cache_local_compression try: compression_markers = self._compression_markers[compression_module] except KeyError as exc: raise ValueError("Unknown compression module") from exc self.__compression_marker = compression_markers[0] self.__compress = compression_markers[1] if self.__compress is None: self._compress = None @property def size(self): return self._cache.weight def __len__(self): """ How many distinct OIDs are stored. """ return len(self._cache) def __iter__(self): return iter(self._cache) def keys(self): return self._cache.keys() def _decompress(self, data): pfx = data[:2] if pfx not in self._decompression_functions: return data return self._decompression_functions[pfx](data[2:]) def _compress(self, data): # pylint:disable=method-hidden # We override this if we're disabling compression # altogether. # Use the same basic rule as zc.zlibstorage, but bump the object size up from 20; # many smaller object (under 100 bytes) like you get with small btrees, # tend not to compress well, so don't bother. if data and (len(data) > 100) and data[:2] not in self._decompression_functions: compressed = self.__compression_marker + self.__compress(data) if len(compressed) < len(data): return compressed return data @_log_timed def save(self, object_index=None, checkpoints=None, **sqlite_args): options = self.options if not options.cache_local_dir or self.size <= self.__initial_weight: return None try: conn = sqlite_connect(options, self.prefix, **sqlite_args) except FAILURE_TO_OPEN_DB_EXCEPTIONS: logger.exception("Failed to open sqlite to write") return 0 with closing(conn): self.write_to_sqlite(conn, checkpoints, object_index) # Testing: Return a signal when we tried to write # something. return 1 def restore(self): """ Load the data from the persistent database. Returns the checkpoint data last saved, which may be None if there was no data. """ options = self.options if not options.cache_local_dir: return None try: conn = sqlite_connect(options, self.prefix) except FAILURE_TO_OPEN_DB_EXCEPTIONS: logger.exception("Failed to read data from sqlite") return None with closing(conn): return self.read_from_sqlite(conn) @_log_timed def remove_invalid_persistent_oids(self, bad_oids): """ Remove data from the persistent cache for the given oids. """ options = self.options if not options.cache_local_dir: return count_removed = 0 conn = '(no oids to remove)' if bad_oids: self.invalidate_all(bad_oids) conn = sqlite_connect(options, self.prefix) with closing(conn): db = Database.from_connection(conn) count_removed = db.remove_invalid_persistent_oids(bad_oids) logger.debug("Removed %d invalid OIDs from %s", count_removed, conn) def zap_all(self): _, destroy = sqlite_files(self.options, self.prefix) destroy() # zapping happens frequently during test runs, # and during zodbconvert when the process will exist # only for a short time. self.flush_all() def flush_all(self): if self._cache or self._cache is None: # Only actually abandon the cache object # if it has data. Otherwise let it keep the # preallocated CFFI objects it may have # (those are expensive to create and tests call # this a LOT) byte_limit = self.limit self._cache = cache.PyCache( byte_limit * self._gen_eden_pct, byte_limit * self._gen_protected_pct, byte_limit * self._gen_probation_pct ) self._peek = self._cache.peek self.reset_stats() def reset_stats(self): self._cache.reset_stats() self._aged_at = 0 self._next_age_at = 1000 def stats(self): total = self._cache.hits + self._cache.misses return { 'hits': self._cache.hits, 'misses': self._cache.misses, 'sets': self._cache.sets, 'ratio': self._cache.hits / total if total else 0, 'len': len(self), 'bytes': self.size, } def __contains__(self, oid_tid): oid, tid = oid_tid return self._cache.get_item_with_tid(oid, tid) is not None def contains_oid_with_newer_tid(self, oid, tid): return self._cache.contains_oid_with_newer_tid(oid, tid) def get(self, oid_tid, peek=False): oid, tid = oid_tid assert tid is None or tid >= 0 decompress = self._decompress value = None if peek: value = self._cache.peek_item_with_tid(oid, tid) else: value = self._cache.get_item_with_tid(oid, tid) # Finally, decompress if needed. # Recall that for deleted objects, `state` can be None. # TODO: This is making a copy of the string from C++ even if we # don't need to decompress. if value is None: return None state, tid = value return ((decompress(state) if state else state), tid) __getitem__ = get def _age(self): # Age only when we're full and would thus need to evict; this # makes initial population faster. It's cheaper to calculate this # AFTER the operations, though, because we read it from C. #if self.size < self.limit: # return # Age the whole thing periodically based on the number of # operations we've done that would have altered popularity. # Dynamically calculate how often we need to age. By default, this is # based on what Caffeine's PerfectFrequency does: 10 * max # cache entries. # # We don't take a lock to do this; it's fine if two threads # attempt it at the same time. age_period = self._age_factor * len(self._cache) operations = self._cache.hits + self._cache.sets if operations - self._aged_at < age_period: self._next_age_at = age_period return None if self.size < self.limit: return None self._aged_at = operations now = time.time() logger.debug("Beginning frequency aging for %d cache entries", len(self._cache)) self._cache.age_frequencies() done = time.time() logger.debug("Aged %d cache entries in %s", len(self._cache), done - now) self._next_age_at = int(self._aged_at * 1.5) # in case the dict shrinks return self._aged_at def __setitem__(self, oid_tid, state_bytes_tid): if not self.limit: # don't bother return # A state of 'None' happens for undone transactions. oid, key_tid = oid_tid state_bytes, actual_tid = state_bytes_tid # Really key_tid should be > 0; we allow >= for tests. assert key_tid == actual_tid and key_tid >= 0 self.set_all_for_tid(key_tid, [(state_bytes, oid, None)]) def set_all_for_tid(self, tid_int, state_oid_iter): if self.limit: self._cache.set_all_for_tid(tid_int, state_oid_iter, self._compress, self._value_limit) # Inline some of the logic about whether to age or not; avoiding the # call helps speed if self._cache.hits + self._cache.sets > self._next_age_at: self._age() def __delitem__(self, oid_tid): self.delitems({oid_tid[0]: oid_tid[1]}) def delitems(self, oids_tids): """ For each OID/TID pair in the items, remove all cached values for OID that are older than TID. """ self._cache.delitems(oids_tids) def invalidate_all(self, oids): self._cache.del_oids(oids) def freeze(self, oids_tids): self._cache.freeze(oids_tids) def close(self): pass release = close def new_instance(self): return self def updating_delta_map(self, deltas): return deltas def _bulk_update(self, keys_and_values, source='<unknown>', log_count=None, mem_usage_before=None): """ Insert all the ``(oid, (state, actual_tid, frozen, frequency)))`` pairs found in *keys_and_values*, rejecting entries once we get too full. *keys_and_values* should be ordered from least to most recently used. This can only be done in an empty cache. """ now = time.time() mem_usage_before = mem_usage_before if mem_usage_before is not None else get_memory_usage() mem_usage_before_this = get_memory_usage() log_count = log_count or len(keys_and_values) data = self._cache if data: raise ValueError("Only bulk load into an empty cache.") stored = data.add_MRUs( keys_and_values, return_count_only=True) then = time.time() del keys_and_values # For memory reporting. mem_usage_after = get_memory_usage() logger.info( "Bulk update of empty cache: Examined %d and stored %d items from %s in %s using %s. " "(%s local) (%s)", log_count, stored, getattr(source, 'name', source), then - now, byte_display(mem_usage_after - mem_usage_before), byte_display(mem_usage_after - mem_usage_before_this), self) return log_count, stored @_log_timed def read_from_sqlite(self, connection): import gc gc.collect() # Free memory, we're about to make big allocations. mem_before = get_memory_usage() db = Database.from_connection(connection) checkpoints = db.checkpoints @_log_timed def fetch_and_filter_rows(): # Do this here so that we don't have the result # in a local variable and it can be collected # before we measure the memory delta. # # In large benchmarks, this function accounts for 57% # of the total time to load data. 26% of the total is # fetching rows from sqlite, and 18% of the total is allocating # storage for the blob state. # # We make one call into sqlite and let it handle the iterating. # Items are (oid, key_tid, state, actual_tid). # key_tid may equal the actual tid, or be -1 when the row was previously # frozen; # That doesn't matter to us, we always freeze all rows. size = 0 limit = self.limit items = [] rows = db.fetch_rows_by_priority() for oid, frozen, state, actual_tid, frequency in rows: size += len(state) if size > limit: break items.append((oid, (state, actual_tid, frozen, frequency))) consume(rows) # Rows came to us MRU to LRU, but we need to feed them the other way. items.reverse() return items # In the large benchmark, this is 25% of the total time. # 18% of the total time is preallocating the entry nodes. self._bulk_update(fetch_and_filter_rows(), source=connection, mem_usage_before=mem_before) return checkpoints def _items_to_write(self, stored_oid_tid): # pylint:disable=too-many-locals all_entries_len = len(self._cache) # Only write the newest entry for each OID. # Newly added items have a frequency of 1. They *may* be # useless stuff we picked up from an old cache file and # haven't touched again, or they may be something that # really is new and we have no track history for (that # would be in eden generation). # # Ideally we want to do something here that's similar to what # the SegmentedLRU does: if we would reject an item from eden, # then only keep it if it's better than the least popular # thing in probation. # # It's not clear that we have a good algorithm for this, and # what we tried takes a lot of time, so we don't bother. # TODO: Finish tuning these. # But we have a problem: If there's an object in the existing # cache that we have updated but now exclude, we would fail to # write its new data. So we can't exclude anything that's # changed without also nuking it from the DB. # Also, if we had a value loaded from the DB, and then we # modified it, but later ejected all entries for that object # from the cache, we wouldn't see it here, and we could be # left with stale data in the database. We must track the # (oid, tid) we load from the database and record that we # should DELETE those rows if that happens. # This is done with the __min_allowed_writeback dictionary, # which we mutate here: When we find a row we can write that meets # the requirement, we take it out of the dictionary. # Later, if there's anything left in the dictionary, we *know* there # may be stale entries in the cache, and we remove them. # The *object_index* is our best polling data; anything it has it gospel, # so if it has an entry for an object, it superceeds our own. get_current_oid_tid = stored_oid_tid.get matching_tid_count = 0 # When we accumulate all the rows here before returning them, # this function shows as about 3% of the total time to save # in a very large database. with _timer() as t: for oid, lru_entry in self._cache.iteritems(): newest_value = lru_entry.newest_value # We must have something at least this fresh # to consider writing it out if newest_value is None: raise AssertionError("Value should not be none", oid, lru_entry) actual_tid = newest_value.tid # If we have something >= min_allowed, matching # what's in the database, or even older (somehow), # it's not worth writing to the database (states should be identical). current_tid = get_current_oid_tid(oid, -1) if current_tid >= actual_tid: matching_tid_count += 1 continue yield (oid, actual_tid, newest_value.frozen, bytes(newest_value.state), lru_entry.frequency) # We're able to satisfy this, so we don't need to consider # it in our min_allowed set anymore. # XXX: This isn't really the right place to do this. # Move this and write a test. removed_entry_count = matching_tid_count logger.info( "Storing persistent cache: Examined %d entries and rejected %d " "(already in db: %d) in %s", all_entries_len, removed_entry_count, matching_tid_count, t.duration) @_log_timed def write_to_sqlite(self, connection, checkpoints, object_index=None): # pylint:disable=too-many-locals mem_before = get_memory_usage() object_index = object_index or OidTMap() cur = connection.cursor() db = Database.from_connection(connection) begin = time.time() # In a large benchmark, store_temp() accounts for 32% # of the total time, while move_from_temp accounts for 49%. # The batch size depends on how many params a stored proc can # have; if we go too big we get OperationalError: too many SQL # variables. The default is 999. # Note that the multiple-value syntax was added in # 3.7.11, 2012-03-20. with _timer() as batch_timer: cur.execute('BEGIN') stored_oid_tid = db.oid_to_tid fetch_current = time.time() count_written, _ = db.store_temp(self._items_to_write(stored_oid_tid)) cur.execute("COMMIT") # Take out (reserved write) locks now; if we don't, we can get # 'OperationalError: database is locked' (the SQLITE_BUSY # error code; the SQLITE_LOCKED error code is about table # locks and has the string 'database table is locked') But # beginning immediate lets us stand in line. # # Note that the SQLITE_BUSY error on COMMIT happens "if an # another thread or process has a shared lock on the database # that prevented the database from being updated.", But "When # COMMIT fails in this way, the transaction remains active and # the COMMIT can be retried later after the reader has had a # chance to clear." So we could retry the commit a couple # times and give up if can't get there. It looks like one # production database takes about 2.5s to execute this entire function; # it seems like most instances that are shutting down get to this place # at roughly the same time and stack up waiting: # # Instance | Save Time | write_to_sqlite time # 1 | 2.36s | 2.35s # 2 | 5.31s | 5.30s # 3 | 6.51s | 6.30s # 4 | 7.91s | 7.90s # # That either suggests that all waiting happens just to acquire this lock and # commit this transaction (and that subsequent operations are inconsequential # in terms of time) OR that the exclusive lock isn't truly getting dropped, # OR that some function in subsequent processes is taking longer and longer. # And indeed, our logging showed that a gc.collect() at the end of this function was # taking more and more time: # # Instance | GC Time | Reported memory usage after GC # 1 | 2.00s | 34.4MB # 2 | 3.50s | 83.2MB # 3 | 4.94s | 125.9MB # 4 | 6.44 | 202.1MB # # The exclusive lock time did vary, but not as much; trim time # didn't really vary: # # Instance | Exclusive Write Time | Batch Insert Time | Fetch Current # 1 | 0.09s | 0.12s | 0.008s # 2 | 1.19s | 0.44s | 0.008s # 3 | 0.91s | 0.43s | 0.026s # 4 | 0.92s | 0.34s | 0.026s with _timer() as exclusive_timer: cur.execute('BEGIN IMMEDIATE') # During the time it took for us to commit, and the time that we # got the lock, it's possible that someone else committed and # changed the data in object_state # Things might have changed in the database since our # snapshot where we put temp objects in, so we can't rely on # just assuming that's the truth anymore. rows_inserted = db.move_from_temp() if checkpoints: db.update_checkpoints(*checkpoints) cur.execute('COMMIT') # TODO: Maybe use BTrees.family.intersection to get the common keys? # Or maybe use a SQLite Python function? # 'DELETE from object_state where is_stale(zoid, tid)' # What's fastest? The custom function version would require scanning the entire # table; unfortunately this sqlite interface doesn't allow defining virtual tables. with _timer() as trim_timer: # Delete anything we still know is stale, because we # saw a new TID for it, but we had nothing to replace it with. min_allowed_writeback = OidTMap() db.trim_to_size(self.limit, min_allowed_writeback) del min_allowed_writeback # Cleanups cur.close() del cur db.close() # closes the connection. del db del stored_oid_tid # We're probably shutting down, don't perform a GC; we see # that can get quite lengthy. end = time.time() stats = self.stats() mem_after = get_memory_usage() logger.info( "Wrote %d items to %s in %s " "(%s to fetch current, %s to insert batch (%d rows), %s to write, %s to trim). " "Used %s memory. (Storage: %s) " "Total hits %s; misses %s; ratio %s (stores: %s)", count_written, connection, end - begin, fetch_current - begin, batch_timer.duration, rows_inserted, exclusive_timer.duration, trim_timer.duration, byte_display(mem_after - mem_before), self._cache, stats['hits'], stats['misses'], stats['ratio'], stats['sets']) return count_written