##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# pylint:disable=too-many-lines
import logging
import os
import threading
from persistent.timestamp import TimeStamp
from ZODB.POSException import ReadConflictError
from ZODB.utils import p64
from zope import interface
from relstorage._compat import IN_TESTRUNNER
from relstorage._compat import OID_SET_TYPE as OIDSet
from relstorage._compat import OID_TID_MAP_TYPE as OidTMap
from relstorage._util import bytes8_to_int64
from relstorage._mvcc import DetachableMVCCDatabaseViewer
from relstorage.cache import persistence
from relstorage.cache.interfaces import IStorageCache
from relstorage.cache.interfaces import IPersistentCache
from relstorage.cache.interfaces import CacheConsistencyError
from relstorage.cache.local_client import LocalClient
from relstorage.cache.memcache_client import MemcacheStateCache
from relstorage.cache.trace import ZEOTracer
from relstorage.cache._statecache_wrappers import MultiStateCache
from relstorage.cache._statecache_wrappers import TracingStateCache
from relstorage.cache.mvcc import MVCCDatabaseCoordinator
logger = log = logging.getLogger(__name__)
class _UsedAfterRelease(object):
size = limit = 0
def __len__(self):
return 0
def __call__(self):
raise NotImplementedError
close = reset_stats = release = unregister = lambda self, *args: None
stats = lambda s: {}
new_instance = lambda s: s
_UsedAfterRelease = _UsedAfterRelease()
[docs]
@interface.implementer(IStorageCache, IPersistentCache)
class StorageCache(DetachableMVCCDatabaseViewer):
"""RelStorage integration with memcached or similar.
Holds a list of memcache clients in order from most local to
most global. The first is a LocalClient, which stores the cache
in the Python process, but shares the cache between threads.
"""
# pylint:disable=too-many-instance-attributes,too-many-public-methods
__slots__ = (
'adapter',
'options',
'keep_history',
'prefix',
'polling_state',
'local_client',
'cache',
'object_index',
)
if IN_TESTRUNNER:
class MVCCInternalConsistencyError(Exception):
"This can never be raised or caught."
else:
MVCCInternalConsistencyError = AssertionError
def __init__(self, adapter, options, prefix, _parent=None):
super().__init__()
self.adapter = adapter
self.options = options
self.keep_history = options.keep_history
self.prefix = prefix or ''
if _parent is None:
# I must be the master!
# This is shared between all instances of a cache in a tree,
# including the master, so that they can share information about
# polling.
self.polling_state = MVCCDatabaseCoordinator(self.options)
self.local_client = LocalClient(options, self.prefix)
shared_cache = MemcacheStateCache.from_options(options, self.prefix)
if shared_cache is not None:
self.cache = MultiStateCache(self.local_client, shared_cache)
else:
self.cache = self.local_client
tracefile = persistence.trace_file(options, self.prefix)
if tracefile:
tracer = ZEOTracer(tracefile)
tracer.trace(0x00)
self.cache = TracingStateCache(self.cache, tracer)
else:
self.polling_state = _parent.polling_state # type: MVCCDatabaseCoordinator
self.local_client = _parent.local_client.new_instance()
self.cache = _parent.cache.new_instance()
# Once we have registered with the MVCCDatabaseCoordinator,
# we cannot make any changes to our own mvcc state without
# letting it know about them. In particular, that means we must
# not just assign to this object (except under careful circumstances
# where we're sure to be single threaded.)
# This object can be None
self.object_index = None
# It is also important not to register with the coordinator until
# we are fully initialized; we could be constructing a new_instance
# in a separate thread while polling is going on in other threads.
# We can get strange AttributeError if a partially constructed instance
# is exposed.
self.polling_state.register(self)
if _parent is None:
self.restore()
@property
def current_tid(self):
# testing
return self.highest_visible_tid
# XXX: Note that our __bool__ and __len__ are NOT consistent
def __bool__(self):
return True
__nonzero__ = __bool__
def __len__(self):
return len(self.local_client)
@property
def size(self):
return self.local_client.size
@property
def limit(self):
return self.local_client.limit
[docs]
def stats(self):
"""
Return stats. This is a debugging aid only. The format is undefined and intended
for human inspection only.
"""
stats = self.local_client.stats()
stats['local_index_stats'] = self.object_index.stats() if self.object_index else None
stats['global_index_stats'] = self.polling_state.stats()
return stats
def __repr__(self):
return "<%s at 0x%x hvt=%s bytes=%d len=%d state=%r>" % (
self.__class__.__name__,
id(self),
self.highest_visible_tid,
self.size,
len(self),
self.polling_state,
)
def reset_stats(self):
self.local_client.reset_stats()
[docs]
def new_instance(self, before=None, adapter=None):
"""
Return a copy of this instance sharing the same local client
and having the most current view of the database as collected
by any instance.
If *before* is given, the new cache will use a distinct
:class:`MVCCDatabaseCoordinator` so that
its usage pattern does not interfere.
"""
klass = type(self) if before is None else _BeforeStorageCache
cache = klass(adapter or self.adapter,
self.options, self.prefix,
_parent=self)
return cache
[docs]
def release(self):
"""
Release resources held by this instance.
This does not corrupt shared state, and must be called
on each instance that's not the root.
This is usually memcache connections if they're in use.
"""
self.cache.release()
# Release our clients. If we had a non-shared local cache,
# this will also allow it to release any memory it's holding.
self.local_client = self.cache = _UsedAfterRelease
self.polling_state.unregister(self)
self.polling_state = _UsedAfterRelease
self.object_index = None
self.highest_visible_tid = None
[docs]
def close(self, **save_args):
"""
Release resources held by this instance, and
save any persistent data necessary.
This is only called on the root. If there are still instances
that haven't been released, they'll be broken.
"""
# grab things that will be reset in release()
cache = self.cache
polling_state = self.polling_state
# Go ahead and release our polling_state now, in case
# it helps to vacuum for save.
self.polling_state.unregister(self)
self.save(**save_args)
self.release()
cache.close()
polling_state.close()
[docs]
def save(self, **save_args):
"""
Store any persistent client data.
"""
if self.options.cache_local_dir and len(self) > 0: # pylint:disable=len-as-condition
# (our __bool__ is not consistent with our len)
stats = self.local_client.stats()
if stats['hits'] or stats['sets']:
# Only write this out if (1) it proved useful OR (2)
# we've made modifications. Otherwise, we're writing a consolidated
# file for no good reason.
# TODO: Consider the correctness here, now that we have a
# more accurate cache. Should that maybe be AND?
return self.polling_state.save(self, save_args)
logger.debug("Cannot justify writing cache file, no hits or misses")
return None
def restore(self):
# We must only restore into an empty cache.
state = self.polling_state
assert not self.local_client
state.restore(self.adapter, self.local_client)
def _reset(self, message=None):
"""
Reset the transaction state of only this instance.
If this is being done in a transactional way, it must be followed
by raising an exception. If the *message* parameter is provided,
then a ``CacheConsistencyError`` will be raised when this
method returns.
"""
# As if we've never polled
self.polling_state.reset_viewer(self)
self.polling_state.flush_all()
if message:
raise CacheConsistencyError(message)
[docs]
def clear(self, load_persistent=True):
"""
Remove all data from the cache, both locally (and shared among
other instances), and globally.
Called by speed tests.
Starting from the introduction of persistent cache files, this
also results in the local client being repopulated with the
current set of persistent data. The *load_persistent* keyword
can be used to control this.
.. versionchanged:: 2.0b6 Added the ``load_persistent``
keyword. This argument is provisional.
"""
self._reset()
self.polling_state.flush_all()
self.cache.flush_all()
if load_persistent:
self.restore()
[docs]
def zap_all(self):
"""
Remove all data from the cache, both locally (and shared among
other instances, and globally); in addition, remove any
persistent cache files on disk.
"""
self.local_client.zap_all()
self.clear(load_persistent=False)
def _check_tid_after_load(self, oid_int, actual_tid_int,
expect_tid_int=None,
cursor=None):
"""Verify the tid of an object loaded from the database is sane."""
if actual_tid_int is not None and actual_tid_int > self.highest_visible_tid:
# Strangely, the database just gave us data from a future
# transaction. We can't give the data to ZODB because that
# would be a consistency violation. However, the cause is
# hard to track down, so issue a ReadConflictError and
# hope that the application retries successfully.
msg = ("Got data for OID 0x%(oid_int)x from "
"future transaction %(actual_tid_int)d (%(got_ts)s). "
"Current transaction is %(hvt)s (%(current_ts)s)."
% {
'oid_int': oid_int,
'actual_tid_int': actual_tid_int,
'hvt': self.highest_visible_tid,
'got_ts': str(TimeStamp(p64(actual_tid_int))),
'current_ts': str(TimeStamp(p64(self.highest_visible_tid))),
})
raise ReadConflictError(msg)
if expect_tid_int is not None and actual_tid_int != expect_tid_int:
# Uh-oh, the cache is inconsistent with the database.
# We didn't get a TID from the future, but it's not what we
# had in our delta_after0 map, which means...we missed a change
# somewhere.
#
# Possible causes:
#
# - The database MUST provide a snapshot view for each
# session; this error can occur if that requirement is
# violated. For example, MySQL's MyISAM engine is not
# sufficient for the object_state table because MyISAM
# can not provide a snapshot view. (InnoDB is
# sufficient.)
#
# - (Similar to the last one.) Using too low of a
# isolation level for the database connection and
# viewing unrelated data.
#
# - Something could be writing to the database out
# of order, such as a version of RelStorage that
# acquires a different commit lock.
#
# - A software bug. In the past, there was a subtle bug
# in after_poll() that caused it to ignore the
# transaction order, leading it to sometimes put the
# wrong tid in delta_after*.
#
# - Restarting a load connection at a future point we hadn't
# actually polled to, such that our current_tid is out of sync
# with the connection's *actual* viewable tid?
from pprint import pformat
from relstorage._util import int64_to_8bytes
msg = ("Detected an inconsistency "
"between the RelStorage cache and the database "
"while loading an object using the MVCC index. "
"Please verify the database is configured for "
"ACID compliance and that all clients are using "
"the same commit lock. Info:\n%s"
% pformat({
'oid_int': oid_int,
'expect_tid_int': expect_tid_int,
'actual_tid_int': actual_tid_int,
# Typically if this happens we get something newer than we expect.
'actual_expect_delta': actual_tid_int - expect_tid_int,
'expect_tid': str(TimeStamp(int64_to_8bytes(expect_tid_int))),
'actual_tid': str(TimeStamp(int64_to_8bytes(actual_tid_int))),
'current_tid': self.highest_visible_tid,
'pid': os.getpid(),
'thread_ident': threading.current_thread(),
'cursor': cursor,
}))
# We reset ourself as if we hadn't polled, and hope the transient
# error gets retried in a working, consistent view.
self._reset(msg)
[docs]
def loadSerial(self, oid_int, tid_int):
"""
Return the locally cached state for the object *oid_int* as-of
exactly *tid_int*.
If that state is not available in the local cache, return
nothing.
This is independent of the current transaction and polling state, and
may return data from the future.
If the storage hasn't polled invalidations, or if there are other viewers
open at transactions in the past, it may also return data from the past
that has been overwritten (in history-free storages).
"""
# We use only the local client because, for history-free storages,
# it's the only one we can be reasonably sure has been
# invalidated by a local pack. Also, our point here is to avoid
# network traffic, so it's no good going to memcache for what may be
# a stale answer.
cache = self.local_client
# Don't take this as an MRU hit; if we succeed, we'll
# put new cached data in for this OID and do that anyway.
cache_data = cache.get((oid_int, tid_int), False)
if cache_data and cache_data[1] == tid_int:
return cache_data[0]
return None
[docs]
def load(self, cursor, oid_int):
"""
Load the given object from cache if possible.
Fall back to loading from the database.
Returns (state_bytes, tid_int).
"""
# pylint:disable=too-many-statements,too-many-branches,too-many-locals
if not self.object_index:
# No poll has occurred yet. For safety, don't use the cache.
# Note that without going through the cache, we can't
# go through tracing either.
return self.adapter.mover.load_current(cursor, oid_int)
# Get the object from the transaction specified
# by the following values, in order:
#
# 1. self.object_index[oid_int]
#
# An entry in object_index means we've polled for and know the exact
# TID for this object, either because we polled, or because someone
# loaded it and put it in the index. If we know a TID, we must *never*
# use the wildcard frozen value (it's possible to have an older frozen tid that's
# valid for older transactions, but out of date for this one.) That's handled
# internally in the clients.
cache = self.cache
index = self.object_index
indexed_tid_int = index[oid_int] # Could be None pylint:disable=unsubscriptable-object
key = (oid_int, indexed_tid_int)
cache_data = cache[key]
if cache_data and indexed_tid_int is None and cache_data[1] > self.highest_visible_tid:
# Cache hit on a wildcard, but we need to verify the wildcard
# and it didn't pass. This situation should be impossible.
cache_data = None
if cache_data:
# Cache hit, non-wildcard or wildcard matched.
return cache_data
# Cache miss.
state, actual_tid_int = self.adapter.mover.load_current(
cursor, oid_int)
if actual_tid_int:
# If either is None, the object was deleted.
self._check_tid_after_load(oid_int, actual_tid_int, indexed_tid_int, cursor)
# We may or may not have had an index entry, but make sure we do now.
# Eventually this will age to be frozen again if needed.
index[oid_int] = actual_tid_int # pylint:disable=unsupported-assignment-operation
cache[(oid_int, actual_tid_int)] = (state, actual_tid_int)
return state, actual_tid_int
# This is in the bytecode as a LOAD_CONST
return None, None
def prefetch(self, cursor, oid_ints):
# Just like load(), but we only fetch the OIDs
# we can't find in the cache.
if not self.object_index:
# No point even trying, we would just throw the results away
return
cache = self.cache
if cache is self.local_client and not cache.limit:
# No point.
return
index = self.object_index
# We don't actually need the cache data, so avoid asking
# for it. That would trigger stats updates (hits/misses)
# and move it to the front of the LRU list. But this is just
# in advance, we don't know if it will actually be used.
# `in` has a race condition (it could be evicted soon), but
# if it is, there was probably something else more important
# going on.
to_fetch = {oid_int
for oid_int in oid_ints
if (oid_int, index[oid_int]) not in cache} # pylint:disable=unsubscriptable-object
if not to_fetch:
return
for oid, state, tid_int in self.adapter.mover.load_currents(cursor, to_fetch):
key = (oid, tid_int)
self._check_tid_after_load(oid, tid_int, cursor=cursor)
cache[key] = (state, tid_int)
index[oid] = tid_int # pylint:disable=unsupported-assignment-operation
def prefetch_for_conflicts(self, cursor, oid_tid_pairs):
results = {}
to_fetch = OidTMap()
cache_get = self.cache.get
# if we've never polled, we can't actually use our cache, will
# just have to make a bulk query.
if not self.object_index:
to_fetch = OidTMap(oid_tid_pairs)
else:
for key in oid_tid_pairs:
# Don't update stats/MRU, just as with normal prefetch().
# It's also important here to avoid taking the lock.
# We don't store this prefetched data back into the cache because
# we're just about to overwrite it; we'd have to have multiple writers
# all with the same initial starting TID lined up to write to the object
# for that to have any benefit.
cache_data = cache_get(key, peek=True)
if not cache_data:
to_fetch[key[0]] = key[1]
else:
results[key[0]] = cache_data
assert cache_data[1] == key[1]
if to_fetch:
check = self._check_tid_after_load if self.object_index else lambda *_, **kw__: None
for oid, state, tid_int in self.adapter.mover.load_currents(cursor, to_fetch):
check(oid, tid_int, to_fetch[oid], cursor=cursor)
results[oid] = (state, tid_int)
return results
[docs]
def remove_cached_data(self, oid_int, tid_int):
"""
See notes in `invalidate_all`.
"""
del self.cache[(oid_int, tid_int)]
[docs]
def remove_all_cached_data_for_oids(self, oids):
"""
Invalidate all cached data for the given OIDs.
This isn't transactional or locked so it may still result in
this or others seeing invalid (ha!) states.
This is a specialized API. It allows violation of our internal
consistency constraints. It should only be used when the
database is being manipulated at a low level, such as during
pack or undo.
"""
# Erase our knowledge of where to look
# self._invalidate_all(oids)
# Remove the data too.
self.cache.invalidate_all(oids)
[docs]
def after_tpc_finish(self, tid, temp_storage):
"""
Flush queued changes.
This is called after the database commit lock is released,
but before control is returned to the Connection.
Now that this tid is known, send all queued objects to the
cache. The cache will have ``(oid, tid)`` entry for each object
we have been holding on to (well, in a big transaction, some of them
might actually not get stored in the cache. But we try!)
"""
tid_int = bytes8_to_int64(tid)
self.cache.set_all_for_tid(tid_int, temp_storage)
def poll(self, conn, cursor, ignore_tid):
try:
changes = self.polling_state.poll(self, conn, cursor)
except self.MVCCInternalConsistencyError: # pragma: no cover
logger.critical(
"Internal consistency violation in the MVCC coordinator. "
"Please report a bug to the RelStorage maintainers. "
"Flushing caches for safety. ",
exc_info=True
)
self._reset("Unknown internal violation")
if changes is None:
return None
return OIDSet(oid for oid, tid in changes if tid != ignore_tid)
class _BeforeStorageCache(StorageCache):
__slots__ = ()
def poll(self, conn, cursor, _):
# Grab whatever index we have the first time we poll,
# and then immediately drop away from the polling state.
# We don't want our increasingly-stale view of the database
# (which isn't even really quite accurate in terms of what we
# need: TODO: Should we do that?) to hold up vacuums (even
# though it will hold up vacuum in the real RDBMS).
self.polling_state.poll(self, conn, cursor)
# We'll never try to poll again, so this is ok.
self.polling_state.unregister(self)
return ()