##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Pack/Undo implementations.
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import logging
from contextlib import contextmanager
from ZODB.POSException import UndoError
from ZODB.utils import u64
from zope.interface import implementer
from .._util import metricmethod
from .._compat import perf_counter
from .._compat import OidList
from .._util import byte_display
from .._util import get_memory_usage
from .._util import get_duration_from_environ
from .._util import get_positive_integer_from_environ
from ..treemark import TreeMarker
from .schema import Schema
from .connections import LoadConnection
from .connections import StoreConnection
from .connections import PrePackConnection
from .interfaces import IPackUndo
from ._util import DatabaseHelpersMixin
from .sql import it
# pylint:disable=too-many-lines,unused-argument
logger = logging.getLogger(__name__)
class PackUndo(DatabaseHelpersMixin):
"""Abstract base class for pack/undo"""
_choose_pack_transaction_query = None
driver = None
connmanager = None
runner = None
locker = None
options = None
###
# Parameters. Some are used by only history-free or history-preserving
# implementations.
###
cursor_arraysize = get_positive_integer_from_environ('RS_PACK_CURSOR_ARRAYSIZE',
4096,
logger=logger)
# These are generally very small rows we're uploading (a few integers).
# Be generous.
store_batch_size = get_positive_integer_from_environ('RS_PACK_STORE_BATCH_SIZE', 4096,
logger=logger)
# How often, in seconds, to commit work in progress.
# This is a variable here for testing.
fill_object_refs_commit_frequency = get_duration_from_environ('RS_PACK_COMMIT_FREQUENCY', 120,
logger=logger)
# How many object states to find references in at any one time.
# This is a control on the amount of memory used by the Python
# process during packing, especially if the database driver
# doesn't use server-side cursors.
fill_object_refs_batch_size = get_positive_integer_from_environ('RS_PACK_DOWNLOAD_BATCH_SIZE',
1024,
logger=logger)
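# A hedged usage note: these knobs are read from the named environment
# variables when this module is imported, so they are typically exported
# before starting whatever process drives the pack. For example (values
# are illustrative, not recommendations):
#
#   RS_PACK_CURSOR_ARRAYSIZE=8192 RS_PACK_STORE_BATCH_SIZE=8192 \
#   RS_PACK_COMMIT_FREQUENCY=60 RS_PACK_DOWNLOAD_BATCH_SIZE=2048 \
#   zodbpack my-pack.conf
#
# ``zodbpack my-pack.conf`` stands in for any program that ends up calling
# the storage's pack() method.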
def __init__(self, database_driver, connmanager, runner, locker, options):
self.driver = database_driver
self.connmanager = connmanager
self.runner = runner
self.locker = locker
self.options = options
def with_options(self, options):
"""
Return a new instance that will use the given options, instead
of the options originally constructed.
"""
if options == self.options:
# If the options haven't changed, return ourself. This is
# for tests that make changes to the structure of this
# object not captured in the constructor or options.
# (checkPackWhileReferringObjectChanges)
return self
result = self.__class__(self.driver, self.connmanager, self.runner, self.locker, options)
# Setting the MAX_TID is important for SQLite.
# This should probably be handled directly in subclasses.
for k, v in vars(self).items():
if k != 'options' and getattr(result, k, None) is not v:
setattr(result, k, v)
return result
def choose_pack_transaction(self, pack_point):
"""Return the transaction before or at the specified pack time.
Returns None if there is nothing to pack.
"""
conn, cursor = self.connmanager.open()
try:
__traceback_info__ = self, pack_point
self._choose_pack_transaction_query.execute(cursor, {'tid': pack_point})
rows = cursor.fetchall()
if not rows:
# Nothing needs to be packed.
return None
return rows[0][0]
finally:
self.connmanager.close(conn, cursor)
@contextmanager
def _make_ss_load_cursor(self, load_connection):
# server_side_cursor() is a generator function. If we just call it and don't
# enter it, then when we try to set values on it, we're setting them on the
# generator, not the actual cursor, meaning it does no good.
with load_connection.server_side_cursor() as ss_load_cursor:
yield self._set_cursor_sizes(ss_load_cursor)
def _set_cursor_sizes(self, cursor):
cursor.arraysize = self.cursor_arraysize
try:
cursor.itersize = self.cursor_arraysize
except AttributeError:
pass
return cursor
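# Notes on the attributes set above: ``arraysize`` is the standard DB-API 2.0
# hint for how many rows a fetchmany() call returns by default; ``itersize``
# is a driver-specific attribute (psycopg2's named/server-side cursors use it
# to size each network round trip while iterating), which is why it is set
# inside a try/except AttributeError for drivers that don't provide it.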
def _make_load_batcher(self, load_connection):
load_batcher = self.locker.make_batcher(load_connection.cursor)
load_batcher.row_limit = self.fill_object_refs_batch_size
return load_batcher
def _make_store_batcher(self, store_connection):
store_batcher = self.locker.make_batcher(store_connection.cursor)
store_batcher.row_limit = max(store_batcher.row_limit, self.store_batch_size)
return store_batcher
# Subclasses (notably Oracle) can define this to provide hints
# that affect graph traversal.
#
# We cannot include the hints Oracle wants as standard; /*+ ... */
# is also the syntax for a MySQL 5.7 optimizer hint, but FULL(...)
# isn't valid syntax, so it produces a warning (and some
# frameworks/drivers want to treat warnings as errors, or print
# them).
#
# The alternate comment syntax for Oracle hints, --+ ..., isn't a
# valid MySQL comment (MySQL requires whitespace after --) and raises
# a syntax error.
#
# PostgreSQL doesn't have hints, so this is a no-op there.
_traverse_graph_optimizer_hint = ''
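# Purely as an illustration of the kind of value a subclass might supply
# (not a value shipped here): an Oracle-specific subclass could set
#   _traverse_graph_optimizer_hint = '/*+ FULL(object_ref) */'
# so that the traversal SELECT below is rendered as
#   SELECT /*+ FULL(object_ref) */ zoid, object_ref.to_zoid ...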
def _traverse_graph(self, load_connection, store_connection):
"""
Visit the entire object graph to find out what should be
kept.
Sets the pack_object.keep flags.
Must not read from the ``object_state`` table or any other table that
could be inconsistent with the original snapshot view of references
established by :meth:`pre_pack`.
*store_connection* is a writable store connection; *load_connection*
provides the snapshot view.
"""
logger.info("pre_pack: downloading pack_object and object_ref.")
# Ensure we're up-to-date and can view the data in pack_object.
# Note that we don't just use restart() here: if we haven't actually
# opened the cursor yet, restart() won't do anything. But on MySQL,
# (because we TRUNCATE'd the pack_object table?) if we don't actually
# rollback, we get
# OperationalError: 1412, 'Table definition has changed, please retry transaction'
load_connection.rollback_quietly()
marker = TreeMarker()
# Download the graph of object references into the TreeMarker.
# TODO: We can probably do much or most of this in SQL, at least
# in recent databases that support recursive WITH queries?
# XXX: In history-free mode, ``pack_object`` contains exactly
# the set of OIDs that are present in ``object_state`` and I
# *think* that ``pack_object.keep_tid`` is always going to be equal to
# ``object_ref.tid``. We may get better behaviour if we join
# against that table here
with self._make_ss_load_cursor(load_connection) as ss_load_cursor:
stmt = """
SELECT {}
zoid, object_ref.to_zoid
FROM object_ref
INNER JOIN pack_object USING (zoid)
WHERE object_ref.tid >= pack_object.keep_tid
ORDER BY zoid, object_ref.to_zoid
""".format(self._traverse_graph_optimizer_hint)
ss_load_cursor.execute(stmt)
while True:
# XXX: Why are we not just iterating the cursor?
# Does add_refs need an iterable (iterate many times?)
rows = ss_load_cursor.fetchmany(self.cursor_arraysize)
if not rows:
break
marker.add_refs(rows)
# Use the TreeMarker to find all reachable objects, starting
# with the ones that are known reachable. These are the roots:
#
# - ZOID 0 which is explicitly marked as such
#
# - In history preserving databases where we are not doing GC,
# this includes all objects (except those explicitly
# deleted) --- but we don't actually call this method for
# the HP-no-gc case.
#
# - In history preserving *with* gc, this is all objects that
# have been modified after the pack time or are referenced
# from objects that have been modified after the pack time.
#
# - In history free *with* gc, this is all objects that have
# been modified after the pack time.
# XXX: It seems like a lot of what TreeMarker does could actually
# be done in the database, especially if we have support for
# recursive common table expressions; if we don't, we can still do more of it
# in the DB, it will just take more queries and some temp tables.
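# A hedged sketch of the recursive-CTE idea mentioned above: an assumption
# about how it *could* look on a database with WITH RECURSIVE support, not
# something this method executes (it also ignores the keep_tid filtering
# that the real download query below applies):
#
#   WITH RECURSIVE reachable(zoid) AS (
#       SELECT zoid FROM pack_object WHERE keep = TRUE
#       UNION
#       SELECT object_ref.to_zoid
#       FROM object_ref
#       JOIN reachable ON (object_ref.zoid = reachable.zoid)
#   )
#   UPDATE pack_object
#   SET keep = TRUE, visited = TRUE
#   WHERE zoid IN (SELECT zoid FROM reachable);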
logger.info("pre_pack: traversing the object graph "
"to find reachable objects.")
with self._make_ss_load_cursor(load_connection) as ss_load_cursor:
stmt = """
SELECT zoid
FROM pack_object
WHERE keep = %(TRUE)s
"""
self.runner.run_script_stmt(ss_load_cursor, stmt)
# XXX: This used to use a loop around fetchmany(), but
# that shouldn't be needed with server-side cursor.
marker.mark(oid for (oid,) in ss_load_cursor)
marker.free_refs()
# Upload the TreeMarker results to the database.
# TODO: It probably makes more sense to mark *unreachable* objects?
# There should generally be fewer of them than reachable objects
# if the database is regularly GC'd.
logger.info(
"pre_pack: marking objects reachable: %d",
marker.reachable_count)
store_batcher = self._make_store_batcher(store_connection)
update_stmt = str(Schema.pack_object.update(
keep=True, visited=True
).bind(self))
assert update_stmt.startswith("UPDATE")
log_at = [perf_counter() + self.fill_object_refs_commit_frequency]
def batch_done_callback(total_count):
store_connection.commit()
if perf_counter() >= log_at[0]:
logger.debug(
"pre_pack: marked %d/%d objects reachable",
total_count, marker.reachable_count
)
log_at[0] = perf_counter() + self.fill_object_refs_commit_frequency
num_rows_sent_to_db = store_batcher.update_set_static(
update_stmt,
batch_done_callback=batch_done_callback,
zoid=marker.reachable
)
assert num_rows_sent_to_db == marker.reachable_count
__check_refs_script = """
SELECT zoid, to_zoid
FROM pack_object
INNER JOIN object_ref USING (zoid)
WHERE keep = %(TRUE)s
AND NOT EXISTS (
SELECT 1
FROM object_state
WHERE object_state.zoid = to_zoid
AND object_state.state IS NOT NULL
AND object_state.tid = (
SELECT MAX(tid)
FROM object_state
WHERE object_state.zoid = object_ref.to_zoid
)
)
"""
def check_refs(self, pack_tid):
"""
Are there any objects we're *not* going to garbage collect that
point to an object that doesn't exist?
Note that this goes only one level deep. A whole subtree of objects
may be removed by a subsequent pack if the only reference to them was from
a missing object.
Logs a warning for each discovered broken reference.
Returns a true object if there were broken references, a false
object otherwise.
"""
load_connection = LoadConnection(self.connmanager)
try:
with _Progress('execute') as progress:
with self._make_ss_load_cursor(load_connection) as ss_load_cursor:
stmt = self.__check_refs_script
self.runner.run_script_stmt(ss_load_cursor, stmt)
progress.mark('download')
missing = []
for zoid, to_zoid in ss_load_cursor:
logger.warning(
'check_refs: object %d references missing object %d',
zoid, to_zoid
)
missing.append((zoid, to_zoid))
logger.info(
'check_refs: Found %d broken references '
"(memory delta: %s, query time: %.2f; download time: %.2f)",
len(missing),
progress.total_memory_delta_display,
progress.phase_duration('execute'),
progress.phase_duration('download')
)
return missing
finally:
load_connection.drop()
# The only things to worry about are object_state and blob_chunk
# and, in history-preserving, transaction. blob chunks are deleted
# automatically by a foreign key; transaction we'll handle with a
# pack. (We don't do anything with current_object; a state of NULL
# represents a deleted object; it shouldn't be reachable anyway
# and will be packed away next time we pack (without GC))
# We shouldn't *have* to verify the oldserial in the delete statement,
# because our only consumer is zc.zodbdgc which only calls us for
# unreachable objects, so they shouldn't be modified and get a new
# TID. But it's safer to do so.
_script_delete_object = None
def deleteObject(self, cursor, oid, oldserial):
params = {'oid': u64(oid), 'tid': u64(oldserial)}
self.runner.run_script_stmt(
cursor,
self._script_delete_object,
params)
return cursor.rowcount
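# Hedged usage sketch (an assumption about the calling convention, following
# the zc.zodbdgc comment above; names are illustrative): an external GC hands
# the storage an unreachable oid plus the serial it last observed, and that
# eventually reaches this method as raw 8-byte strings:
#
#   rows_affected = adapter.packundo.deleteObject(cursor, oid, oldserial)
#   if not rows_affected:
#       pass  # object changed since oldserial (or is already gone); skip it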
def on_filling_object_refs_added(self, oids=None, tids=None):
"""Test injection point for packing."""
@implementer(IPackUndo)
class HistoryPreservingPackUndo(PackUndo):
"""
History-preserving pack/undo.
"""
keep_history = True
_choose_pack_transaction_query = Schema.transaction.select(
it.c.tid
).where(
it.c.tid > 0
).and_(
it.c.tid <= it.bindparam('tid')
).and_(
it.c.packed == False # pylint:disable=singleton-comparison
).order_by(
it.c.tid, 'DESC'
).limit(1)
_script_create_temp_pack_visit = """
CREATE TEMPORARY TABLE temp_pack_visit (
zoid BIGINT NOT NULL PRIMARY KEY,
keep_tid BIGINT NOT NULL
);
CREATE INDEX temp_pack_keep_tid ON temp_pack_visit (keep_tid)
"""
_script_create_temp_undo = """
CREATE TEMPORARY TABLE temp_undo (
zoid BIGINT NOT NULL,
prev_tid BIGINT NOT NULL
);
CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
"""
_script_reset_temp_undo = "DROP TABLE temp_undo"
_choose_pack_tid_query = Schema.pack_object.select(
it.c.keep_tid
).order_by(
it.c.keep_tid, 'DESC'
).limit(1)
_transaction_has_data_query = Schema.object_state.select(
1
).where(
it.c.tid == it.bindparam('tid')
).limit(1)
_script_pack_current_object = """
DELETE FROM current_object
WHERE tid = %(tid)s
AND zoid in (
SELECT pack_state.zoid
FROM pack_state
WHERE pack_state.tid = %(tid)s
%(INNER_ORDER_BY)s
)
"""
_script_pack_object_state = """
DELETE FROM object_state
WHERE tid = %(tid)s
AND zoid in (
SELECT pack_state.zoid
FROM pack_state
WHERE pack_state.tid = %(tid)s
%(INNER_ORDER_BY)s
)
"""
_script_pack_object_ref = """
DELETE FROM object_refs_added
WHERE tid IN (
SELECT tid
FROM "transaction"
WHERE is_empty = %(TRUE)s
);
DELETE FROM object_ref
WHERE tid IN (
SELECT tid
FROM "transaction"
WHERE is_empty = %(TRUE)s
);
"""
# Previously we used `= ANY(ARRAY(...))`, as was once recommended
# (see http://www.postgres.cz/index.php/PostgreSQL_SQL_Tricks#Fast_first_n_rows_removing),
# but that is no longer recommended or expected to be faster.
# Also, it was postgres specific. Now we use a more standard syntax,
# that lets us preserve order (in case that matters).
delete_empty_transactions_batch_size = get_positive_integer_from_environ(
'RS_PACK_HP_DELETE_BATCH_SIZE',
1000,
logger=logger
)
_delete_empty_transactions_batch_query = Schema.transaction.delete(
).where(
it.c.tid.in_(Schema.transaction.select(
it.c.tid
).where(
it.c.packed == True # pylint:disable=singleton-comparison
).and_(
it.c.is_empty == True # pylint:disable=singleton-comparison
).order_by(
it.c.tid
).limit(delete_empty_transactions_batch_size))
)
# (Deleting here means nulling out the state: as noted in the comment on
# the base class, a NULL state represents a deleted object in a
# history-preserving schema.)
_script_delete_object = """
UPDATE object_state
SET state = NULL,
    state_size = 0,
    md5 = ''
WHERE zoid = %(oid)s
AND tid = %(tid)s
"""
_is_packed_tx_query = Schema.transaction.select(
1
).where(
Schema.transaction.c.tid == Schema.transaction.bindparam('undo_tid')
).and_(
Schema.transaction.c.packed == False # pylint:disable=singleton-comparison
)
_is_root_creation_tx_query = Schema.object_state.select(
1
).where(
Schema.object_state.c.tid == Schema.object_state.bindparam('undo_tid')
).and_(
Schema.object_state.c.zoid == 0
).and_(
Schema.object_state.c.prev_tid == 0
)
@metricmethod
def verify_undoable(self, cursor, undo_tid):
"""Raise UndoError if it is not safe to undo the specified txn."""
self._is_packed_tx_query.execute(cursor, {'undo_tid': undo_tid})
if not cursor.fetchall():
raise UndoError("Transaction not found or packed")
# Rule: we can undo an object if the object's state in the
# transaction to undo matches the object's current state. If
# any object in the transaction does not fit that rule, refuse
# to undo. In theory this means arbitrary transactions can be
# undone (because we actually match the MD5 of the state); in practice it
# means that it must be the most recent transaction those
# objects were involved in.
# (Note that this prevents conflict-resolving undo as described
# by ZODB.tests.ConflictResolution.ConflictResolvingTransUndoStorage.
# Do people need that? If so, we can probably support it, but it
# will require additional code.)
stmt = """
SELECT prev_os.zoid, current_object.tid
FROM object_state prev_os
INNER JOIN object_state cur_os
ON (prev_os.zoid = cur_os.zoid)
INNER JOIN current_object
ON (cur_os.zoid = current_object.zoid
AND cur_os.tid = current_object.tid)
WHERE prev_os.tid = %s
AND cur_os.md5 != prev_os.md5
ORDER BY prev_os.zoid
"""
self.runner.run_script_stmt(cursor, stmt, (undo_tid,))
if cursor.fetchmany():
raise UndoError(
"Some data were modified by a later transaction")
# Rule: don't allow the creation of the root object to
# be undone. It's hard to get it back.
self._is_root_creation_tx_query.execute(cursor, {'undo_tid': undo_tid})
if cursor.fetchall():
raise UndoError("Can't undo the creation of the root object")
@metricmethod
def undo(self, cursor, undo_tid, self_tid):
"""Undo a transaction.
Parameters: "undo_tid", the integer tid of the transaction to undo,
and "self_tid", the integer tid of the current transaction.
Returns the states copied forward by the undo operation as a
list of (oid, old_tid).
"""
stmt = self._script_create_temp_undo
if stmt:
self.runner.run_script(cursor, stmt)
stmt = """
DELETE FROM temp_undo;
-- Put into temp_undo the list of objects to be undone and
-- the tid of the transaction that has the undone state.
INSERT INTO temp_undo (zoid, prev_tid)
SELECT zoid, prev_tid
FROM object_state
WHERE tid = %(undo_tid)s
ORDER BY zoid;
-- Override previous undo operations within this transaction
-- by resetting the current_object pointer and deleting
-- copied states from object_state.
UPDATE current_object
SET tid = (
SELECT prev_tid
FROM object_state
WHERE zoid = current_object.zoid
AND tid = %(self_tid)s
)
WHERE zoid IN (SELECT zoid FROM temp_undo %(INNER_ORDER_BY)s)
AND tid = %(self_tid)s;
DELETE FROM object_state
WHERE zoid IN (SELECT zoid FROM temp_undo %(INNER_ORDER_BY)s)
AND tid = %(self_tid)s;
-- Copy old states forward.
INSERT INTO object_state (zoid, tid, prev_tid, md5, state_size, state)
SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
md5, COALESCE(state_size, 0), state
FROM temp_undo
INNER JOIN current_object ON (temp_undo.zoid = current_object.zoid)
LEFT OUTER JOIN object_state
ON (object_state.zoid = temp_undo.zoid
AND object_state.tid = temp_undo.prev_tid)
ORDER BY current_object.zoid;
-- Copy old blob chunks forward.
INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
SELECT temp_undo.zoid, %(self_tid)s, chunk_num, chunk
FROM temp_undo
JOIN blob_chunk
ON (blob_chunk.zoid = temp_undo.zoid
AND blob_chunk.tid = temp_undo.prev_tid);
-- List the copied states.
SELECT zoid, prev_tid
FROM temp_undo;
"""
self.runner.run_script(cursor, stmt,
{'undo_tid': undo_tid, 'self_tid': self_tid})
res = list(cursor)
stmt = self._script_reset_temp_undo
if stmt:
self.runner.run_script(cursor, stmt)
return res
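# Hedged usage note (ZODB level, shown only for orientation): applications
# normally reach verify_undoable()/undo() through the database API, roughly:
#
#   log = db.undoLog(0, 20)      # recent undoable transactions
#   db.undo(log[0]['id'])        # queue an undo of the newest one
#   transaction.commit()         # the undo work runs during this commit
#
# where ``db`` is a ZODB.DB wrapping a RelStorage storage.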
def fill_object_refs(self, load_connection, store_connection, get_references):
"""Update the object_refs table by analyzing new transactions."""
with self._make_ss_load_cursor(load_connection) as ss_load_cursor:
stmt = """
SELECT DISTINCT tx.tid
FROM "transaction" tx
LEFT OUTER JOIN object_refs_added
ON (tx.tid = object_refs_added.tid)
WHERE object_refs_added.tid IS NULL
ORDER BY tx.tid
"""
self.runner.run_script_stmt(ss_load_cursor, stmt)
tids = OidList((tid for (tid,) in ss_load_cursor))
log_at = perf_counter() + self.fill_object_refs_commit_frequency
tid_count = len(tids)
txns_done = 0
self.on_filling_object_refs_added(tids=tids)
logger.info(
"pre_pack: analyzing references from objects in %d new "
"transaction(s)", tid_count)
store_batcher = self._make_store_batcher(store_connection)
for tid in tids:
self._add_refs_for_tid(load_connection, store_batcher, tid, get_references)
txns_done += 1
now = perf_counter()
if now >= log_at:
# save the work done so far
store_batcher.flush()
store_connection.commit()
log_at = now + self.fill_object_refs_commit_frequency
logger.info(
"pre_pack: transactions analyzed: %d/%d",
txns_done, tid_count)
store_batcher.flush()
store_connection.commit()
logger.info("pre_pack: transactions analyzed: %d/%d", txns_done, tid_count)
_get_objects_in_transaction_query = Schema.object_state.select(
it.c.zoid,
it.c.state
).where(
it.c.tid == it.bindparam('tid')
).order_by(it.c.zoid).prepared()
def _add_refs_for_tid(self, load_connection, store_batcher, tid, get_references):
"""Fill object_refs with all states for a transaction.
Returns the number of references added.
"""
logger.debug("pre_pack: transaction %d: computing references ", tid)
from_count = 0
# XXX: We're not using a server-side cursor here, so
# we could be fetching arbitrarily large amounts of data.
self._get_objects_in_transaction_query.execute(load_connection.cursor,
{'tid': tid})
# A previous pre-pack may have been interrupted. Delete rows
# from the interrupted attempt. Recall that the batcher always executes
# deletes before inserts.
store_batcher.delete_from('object_ref', tid=tid)
object_ref_schema = store_batcher.row_schema_of_length(3)
object_refs_added_schema = store_batcher.row_schema_of_length(1)
num_refs_found = 0
for from_oid, state in load_connection.cursor:
state = self.driver.binary_column_as_state_type(state)
if state:
assert isinstance(state, self.driver.state_types), type(state)
from_count += 1
try:
to_oids = get_references(state)
except:
logger.exception(
"pre_pack: can't unpickle "
"object %d in transaction %d; state length = %d",
from_oid, tid, len(state))
raise
for to_oid in to_oids:
row = (from_oid, tid, to_oid)
num_refs_found += 1
store_batcher.insert_into(
'object_ref (zoid, tid, to_zoid)',
object_ref_schema,
row,
row,
size=3
)
# The references have been computed for this transaction
store_batcher.insert_into(
'object_refs_added (tid)',
object_refs_added_schema,
(tid,),
(tid,),
size=1,
)
logger.debug("pre_pack: transaction %d: has %d reference(s) "
"from %d object(s)", tid, num_refs_found, from_count)
return num_refs_found
@metricmethod
def pre_pack(self, pack_tid, get_references):
"""Decide what to pack.
pack_tid specifies the most recent transaction to pack.
get_references is a function that accepts a pickled state and
returns a set of OIDs that state refers to.
The self.options.pack_gc flag indicates whether
to run garbage collection.
If pack_gc is false, at least one revision of every object is kept,
even if nothing refers to it. Packing with pack_gc disabled can be
much faster.
"""
load_connection = LoadConnection(self.connmanager)
store_connection = PrePackConnection(self.connmanager)
try:
# The pre-pack functions are responsible for managing
# their own commits; when they return, the transaction
# should be committed.
#
# ``pack_object`` should be populated,
# essentially with the distinct list of all objects and their
# maximum (newest) transaction ids.
if self.options.pack_gc:
logger.info("pre_pack: start with gc enabled")
self._pre_pack_with_gc(
load_connection, store_connection, pack_tid, get_references)
else:
logger.info("pre_pack: start without gc")
self._pre_pack_without_gc(
load_connection, store_connection, pack_tid)
logger.info("pre_pack: enumerating states to pack")
cursor = store_connection.cursor
stmt = "%(TRUNCATE)s pack_state"
self.runner.run_script_stmt(cursor, stmt)
to_remove = 0
if self.options.pack_gc:
# Mark all objects we said not to keep as something
# we should discard.
stmt = """
INSERT INTO pack_state (tid, zoid)
SELECT tid, zoid
FROM object_state
INNER JOIN pack_object USING (zoid)
WHERE keep = %(FALSE)s
AND tid > 0
AND tid <= %(pack_tid)s
ORDER BY zoid
"""
self.runner.run_script_stmt(
cursor, stmt, {'pack_tid': pack_tid})
to_remove += cursor.rowcount
else:
# Support for IExternalGC. Also remove deleted objects.
stmt = """
INSERT INTO pack_state (tid, zoid)
SELECT t.tid, t.zoid
FROM (
SELECT zoid, tid
FROM object_state
WHERE state IS NULL
AND tid = (
SELECT MAX(i.tid)
FROM object_state i
WHERE i.zoid = object_state.zoid
)
) t
"""
self.runner.run_script_stmt(cursor, stmt)
to_remove += cursor.rowcount
# Pack object states with the keep flag set to true,
# excluding their current TID.
stmt = """
INSERT INTO pack_state (tid, zoid)
SELECT tid, zoid
FROM object_state
INNER JOIN pack_object USING (zoid)
WHERE keep = %(TRUE)s
AND tid > 0
AND tid != keep_tid
AND tid <= %(pack_tid)s
ORDER BY zoid
"""
self.runner.run_script_stmt(
cursor, stmt, {'pack_tid': pack_tid})
to_remove += cursor.rowcount
# Make a simple summary of the transactions to examine.
logger.info("pre_pack: enumerating transactions to pack")
stmt = "%(TRUNCATE)s pack_state_tid"
self.runner.run_script_stmt(cursor, stmt)
stmt = """
INSERT INTO pack_state_tid (tid)
SELECT DISTINCT tid
FROM pack_state
"""
cursor.execute(stmt)
logger.info("pre_pack: will remove %d object state(s)",
to_remove)
logger.info("pre_pack: finished successfully")
store_connection.commit()
except:
store_connection.rollback_quietly()
raise
finally:
store_connection.drop()
load_connection.drop()
def __initial_populate_pack_object(self, load_connection, store_connection,
pack_tid, keep):
"""
Put all objects into ``pack_object`` that have revisions equal
to or below *pack_tid*, setting their initial ``keep`` status
to *keep*.
Commits the transaction to release locks.
"""
# Access the tables that are used by online transactions
# in a short transaction and immediately commit to release any
# locks.
# TRUNCATE may or may not cause implicit commits. (MySQL: Yes,
# PostgreSQL: No)
self.runner.run_script(store_connection.cursor, "%(TRUNCATE)s pack_object;")
affected_objects = """
SELECT zoid, tid
FROM object_state
WHERE tid > 0 AND tid <= %(pack_tid)s
ORDER BY zoid
"""
# Take the locks we need up front, in order, because
# locking in a subquery doing an INSERT isn't guaranteed to use that
# order (deadlocks seen with commits on MySQL 5.7 without this,
# when using REPEATABLE READ.)
#
# We must do this on its own, because some drivers (notably
# mysql-connector-python) get very upset
# ("mysql.connector.errors.InternalError: Unread result
# found") if you issue a SELECT that you don't then consume.
#
# Since we switched MySQL back to READ COMMITTED (what PostgreSQL uses)
# I haven't been able to produce the error anymore. So don't explicitly lock.
stmt = """
INSERT INTO pack_object (zoid, keep, keep_tid)
SELECT zoid, """ + ('%(TRUE)s' if keep else '%(FALSE)s') + """, MAX(tid)
FROM ( """ + affected_objects + """ ) t
GROUP BY zoid;
-- Keep the root object.
UPDATE pack_object
SET keep = %(TRUE)s
WHERE zoid = 0;
"""
self.runner.run_script(store_connection.cursor, stmt, {'pack_tid': pack_tid})
store_connection.commit()
def _pre_pack_without_gc(self, conn, cursor, pack_tid):
"""
Determine what to pack, without garbage collection.
With garbage collection disabled, there is no need to follow
object references.
"""
# Fill the pack_object table with OIDs, but configure them
# all to be kept by setting keep to true.
logger.debug("pre_pack: populating pack_object")
self.__initial_populate_pack_object(conn, cursor, pack_tid, keep=True)
def _pre_pack_with_gc(self, load_connection, store_connection,
pack_tid, get_references):
"""
Determine what to pack, with garbage collection.
"""
stmt = self._script_create_temp_pack_visit
if stmt:
self.runner.run_script(store_connection.cursor, stmt)
self.fill_object_refs(load_connection, store_connection, get_references)
logger.info("pre_pack: filling the pack_object table")
# Fill the pack_object table with OIDs that either will be
# removed (if nothing references the OID) or whose history will
# be cut.
self.__initial_populate_pack_object(load_connection, store_connection,
pack_tid, keep=False)
stmt = """
-- Keep objects that have been revised since pack_tid.
-- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
INSERT INTO temp_pack_visit (zoid, keep_tid)
SELECT zoid, 0
FROM current_object
WHERE tid > %(pack_tid)s
ORDER BY zoid;
UPDATE pack_object
SET keep = %(TRUE)s
WHERE zoid IN (
SELECT zoid
FROM temp_pack_visit
);
%(TRUNCATE)s temp_pack_visit;
-- Keep objects that are still referenced by object states in
-- transactions that will not be packed.
-- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
INSERT INTO temp_pack_visit (zoid, keep_tid)
SELECT DISTINCT to_zoid, 0
FROM object_ref
WHERE tid > %(pack_tid)s;
UPDATE pack_object
SET keep = %(TRUE)s
WHERE zoid IN (
SELECT zoid
FROM temp_pack_visit
);
%(TRUNCATE)s temp_pack_visit;
"""
self.runner.run_script(store_connection.cursor, stmt, {'pack_tid': pack_tid})
# Traverse the graph, setting the 'keep' flags in pack_object
self._traverse_graph(load_connection, store_connection)
store_connection.commit()
def _find_pack_tid(self):
"""If pack was not completed, find our pack tid again"""
conn, cursor = self.connmanager.open_for_pre_pack()
try:
stmt = self._choose_pack_tid_query
stmt.execute(cursor)
res = [tid for (tid,) in cursor]
finally:
self.connmanager.close(conn, cursor)
return res[0] if res else 0
@metricmethod
def pack(self, pack_tid, packed_func=None):
"""Pack. Requires the information provided by pre_pack."""
# pylint:disable=too-many-locals,too-complex
# Read committed mode is sufficient.
store_connection = StoreConnection(self.connmanager)
try: # pylint:disable=too-many-nested-blocks
try:
# If we have a transaction entry in ``pack_state_tid`` (that is,
# we found a transaction with an object in the range of transactions
# we can pack away) that matches an actual transaction entry (XXX:
# How could we be in the state where the transaction row is gone but we still
# have object_state with that transaction id?), then we need to pack that
# transaction. The presence of an entry in ``pack_state_tid`` means that all
# object states from that transaction should be removed.
stmt = """
SELECT tx.tid,
CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
FROM "transaction" tx
LEFT OUTER JOIN pack_state_tid ON (tx.tid = pack_state_tid.tid)
WHERE tx.tid > 0
AND tx.tid <= %(pack_tid)s
AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
ORDER BY tx.tid
"""
self.runner.run_script_stmt(
store_connection.cursor, stmt, {'pack_tid': pack_tid})
tid_rows = list(store_connection.cursor) # oldest first, sorted in SQL
total = len(tid_rows)
logger.info("pack: will pack %d transaction(s)", total)
stmt = self._script_create_temp_pack_visit
if stmt:
self.runner.run_script(store_connection.cursor, stmt)
# Lock and delete rows in the same order that
# new commits would in order to prevent deadlocks.
# Pack in small batches of transactions only after we are able
# to obtain a commit lock in order to minimize the
# interruption of concurrent write operations.
start = perf_counter()
packed_list = []
counter, lastreport, statecounter = 0, 0, 0
# We'll report on progress in at most .1% step increments
reportstep = max(total / 1000, 1)
for tid, packed, has_removable in tid_rows:
self._pack_transaction(
store_connection.cursor, pack_tid, tid, packed, has_removable,
packed_list)
counter += 1
if perf_counter() >= start + self.options.pack_batch_timeout:
store_connection.commit()
if packed_func is not None:
for poid, ptid in packed_list:
packed_func(poid, ptid)
statecounter += len(packed_list)
if counter >= lastreport + reportstep:
logger.info("pack: packed %d (%.1f%%) transaction(s), "
"affecting %d states",
counter, counter / float(total) * 100,
statecounter)
lastreport = counter / reportstep * reportstep
del packed_list[:]
start = perf_counter()
if packed_func is not None:
for oid, tid in packed_list:
packed_func(oid, tid)
packed_list = None
self._pack_cleanup(store_connection)
except:
logger.exception("pack: failed")
store_connection.rollback_quietly()
raise
logger.info("pack: finished successfully")
store_connection.commit()
finally:
store_connection.drop()
def _pack_transaction(self, cursor, pack_tid, tid, packed,
has_removable, packed_list):
"""
Pack one transaction. Requires populated pack tables.
If *has_removable* is true, then we have object states and current
object pointers to remove.
"""
logger.debug("pack: transaction %d: packing", tid)
removed_objects = 0
removed_states = 0
if has_removable:
stmt = self._script_pack_current_object
self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
removed_objects = cursor.rowcount
stmt = self._script_pack_object_state
self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
removed_states = cursor.rowcount
# Terminate prev_tid chains
stmt = """
UPDATE object_state SET prev_tid = 0
WHERE prev_tid = %(tid)s
AND tid <= %(pack_tid)s
"""
self.runner.run_script_stmt(cursor, stmt,
{'pack_tid': pack_tid, 'tid': tid})
stmt = """
SELECT pack_state.zoid
FROM pack_state
WHERE pack_state.tid = %(tid)s
"""
self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
for (oid,) in cursor:
packed_list.append((oid, tid))
# Find out whether the transaction is empty
stmt = self._transaction_has_data_query
stmt.execute(cursor, {'tid': tid})
empty = not list(cursor)
# mark the transaction packed and possibly empty
if empty:
clause = 'is_empty = %(TRUE)s'
state = 'empty'
else:
clause = 'is_empty = %(FALSE)s'
state = 'not empty'
stmt = 'UPDATE "transaction" SET packed = %(TRUE)s, ' + clause
stmt += " WHERE tid = %(tid)s"
self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
logger.debug(
"pack: transaction %d (%s): removed %d object(s) and %d state(s)",
tid, state, removed_objects, removed_states)
def _pack_cleanup(self, store_connection):
"""Remove unneeded table rows after packing"""
# commit the work done so far, releasing row-level locks.
store_connection.commit()
logger.info("pack: cleaning up")
# This section does not need to hold the commit lock, as it only
# touches pack-specific tables. We already hold a pack lock for that.
logger.debug("pack: removing unused object references")
stmt = self._script_pack_object_ref
self.runner.run_script(store_connection.cursor, stmt)
# In the past, we needed an exclusive commit lock to
# touch the transaction table and remove empty transactions.
# For that reason, we explicitly batched it in 1000 rows.
# That's no longer the case.
# XXX: Stop batching.
logger.debug("pack: removing empty packed transactions")
while True:
stmt = self._delete_empty_transactions_batch_query
stmt.execute(store_connection.cursor)
deleted = store_connection.cursor.rowcount
store_connection.commit()
if deleted < self.delete_empty_transactions_batch_size:
# Last set of deletions complete
break
logger.debug("pack: clearing temporary pack state")
for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
stmt = '%(TRUNCATE)s ' + _table
self.runner.run_script_stmt(store_connection.cursor, stmt)
store_connection.commit()
@implementer(IPackUndo)
class HistoryFreePackUndo(PackUndo):
"""
History-free pack/undo.
"""
keep_history = False
_choose_pack_transaction_query = Schema.object_state.select(
it.c.tid
).where(
it.c.tid > 0
).and_(
it.c.tid <= it.bindparam('tid')
).order_by(
it.c.tid, 'DESC'
).limit(1)
# history-free packing doesn't use temp_pack_visit, unlike
# history-preserving.
# _script_create_temp_pack_visit = """
# CREATE TEMPORARY TABLE temp_pack_visit (
# zoid BIGINT NOT NULL PRIMARY KEY,
# keep_tid BIGINT NOT NULL
# );
# CREATE INDEX temp_pack_keep_tid ON temp_pack_visit (keep_tid)
# """
# Used for generic deleteObject() calls (e.g., zc.zodbdgc).
# We include the TID here for extra safety, but we don't
# in our native pack.
_script_delete_object = """
DELETE FROM object_state
WHERE zoid = %(oid)s
and tid = %(tid)s
"""
def on_fill_object_ref_batch(self, oid_batch, num_refs_found):
"""Hook for testing."""
def verify_undoable(self, cursor, undo_tid):
"""Raise UndoError if it is not safe to undo the specified txn."""
raise UndoError("Undo is not supported by this storage")
def undo(self, cursor, undo_tid, self_tid):
"""Undo a transaction.
Parameters: "undo_tid", the integer tid of the transaction to undo,
and "self_tid", the integer tid of the current transaction.
Returns the list of OIDs undone.
"""
raise UndoError("Undo is not supported by this storage")
def fill_object_refs(self, load_connection, store_connection, get_references):
"""
Update the object_refs table by analyzing new object states.
See :meth:`pre_pack` for a description of the parameters.
Because *load_connection* is read-only and repeatable read,
we don't need to do any object-level locking.
"""
# pylint:disable=too-many-locals
# Begin by ensuring we have a snapshot reflecting anything
# committed up to this point, including the contents of
# ``pack_object``, which determines the visible objects
# we will examine.
load_connection.restart()
mem_begin = get_memory_usage()
logger.debug("pre_pack: Collecting objects to examine.")
# Recall pre_pack can be run many times, and by default
# ``object_refs_added`` is kept after a pack run to speed it
# up next time.
#
# Ordering should be immaterial as we are in a read-only snapshot view
# of the database; we shouldn't run into locking issues with other
# transactions. However, if objects are stored by OID on disk, which
# is likely, then processing them in OID order may reduce the total number
# of disk seeks; doing the ORDER BY on the server makes this initial query
# much slower, though, so we sort in Python instead.
with self._make_ss_load_cursor(load_connection) as ss_load_cursor:
stmt = """
SELECT zoid
FROM pack_object
INNER JOIN object_state USING (zoid)
LEFT OUTER JOIN object_refs_added
USING (zoid)
WHERE object_refs_added.tid IS NULL
OR object_refs_added.tid != object_state.tid
"""
with _Progress('execute') as progress:
ss_load_cursor.execute(stmt)
progress.mark('download')
oids = OidList((row[0] for row in ss_load_cursor))
progress.mark('sort')
try:
# If we're using a list.
oids.sort()
except AttributeError:
oids = OidList(sorted(oids))
# If we're storing in an array.array, going to/from the list
# and doing the sort of 30MM rows takes 16s on my machine; given that the query
# itself takes 4 minutes (but much longer than that if
# we do it on the server; are indexes missing?), that's fine.
logger.debug(
"pre_pack: Downloaded objects to examine "
"(memory delta: %s; download time: %.2f; sort time %.2f)",
progress.total_memory_delta_display,
progress.phase_duration('download'),
progress.phase_duration('sort'),
)
log_at = perf_counter() + self.fill_object_refs_commit_frequency
self.on_filling_object_refs_added(oids=oids)
load_batcher = self._make_load_batcher(load_connection)
store_batcher = self._make_store_batcher(store_connection)
oid_count = len(oids)
oids_done = 0
num_refs_found = 0
# Against 30 MM rows with MySQL on Python 2.7 with the
# server-side cursor on mysqlclient, the SELECT takes
# negligible time and memory; transforming into the
# array.array and pulling rows takes 5 minutes and a total of
# 231MB with a batch size of 1024; that's about the resident
# size of the process, too. Using a fetch size of 10000 didn't reduce
# the time substantially, though curiously it did report just 165.9MB
# memory delta.
#
# Previously, using a list and a buffered cursor for the same setup,
# the SELECT takes 3.5 minutes and a memory delta of 2.5GB; transforming into
# the list takes a few more seconds and a final delta of 3GB. When the cursor
# is closed, the resident size of the process shrinks to around 1.2GB.
logger.info(
"pre_pack: analyzing references from %d object(s) (memory delta: %s)",
oid_count, byte_display(get_memory_usage() - mem_begin))
while oids_done < oid_count:
# Previously, we iterated like this:
#
# while oids:
# batch = oids[:batch_size]
# oids = oids[batch_size:]
#
# However, that turns into O(n^2) operations with a large
# overhead, especially on CPython. Each slice operation
# allocates a new list, and copies into it (including
# INCREF). Even with the O(n^2) algorithm,
# array.array('Q') benchmarks ~5x faster for these two
# operations, probably because it's just memory movements,
# not loops that have to INCREF/DECREF.
#
# A simple profile while this is running with 30 MM rows
# shows at least 37% of the time spent in the C
# ``list_slice`` function and 31% in ``list_dealloc``,
# averaging about 15,000 objects per minute.
#
# Switching just to array.array and leaving the slicing, I
# was getting 44,000 objects per minute, but 99% time
# spent in memmove().
#
# Using manual indexing of arrays, CPU usage of less than
# 35%; for the first time, 35% of profile time is spent in
# talking to MySQL (over gigabit switch); clearly parallel
# pre-fetching would be useful.
batch = oids[oids_done:oids_done + self.fill_object_refs_batch_size]
oids_done += len(batch)
refs_found = self._add_refs_for_oids(load_batcher, store_batcher,
batch, get_references)
num_refs_found += refs_found
self.on_fill_object_ref_batch(oid_batch=batch, num_refs_found=refs_found)
now = perf_counter()
if now >= log_at:
# Save the work done so far.
store_batcher.flush()
store_connection.commit()
log_at = now + self.fill_object_refs_commit_frequency
logger.info(
"pre_pack: objects analyzed: %d/%d (%d total references)",
oids_done, oid_count, num_refs_found)
# Those 30MM objects wound up with about 48,976,835 references.
store_batcher.flush()
store_connection.commit()
logger.info(
"pre_pack: objects analyzed: %d/%d", oids_done, oid_count)
def _add_refs_for_oids(self, load_batcher, store_batcher,
oids, get_references):
"""
Fill object_refs with the states for some objects.
Returns the number of references added.
"""
# oids should be a slice of an ``OidList``, which may be an
# ``array.array``; those are relatively slow to iterate.
# The batcher always does deletes before inserts, which is
# exactly what we want.
# In the past, we performed all deletes and then all inserts;
# now, thanks to batching, they could be interleaved, but
# because we process OID-by-OID, that should be fine.
# In the past, we also DELETED from object_refs_added and object_ref
# everything found in the ``oids`` parameter; now we only do a delete if
# we get back a row from object_state; again, that shouldn't matter, rows
# should be found in object_state.
object_ref_schema = store_batcher.row_schema_of_length(3)
object_refs_added_schema = store_batcher.row_schema_of_length(2)
# Use the batcher to get efficient ``= ANY()``
# queries, but go ahead and collect into a list at once
rows = list(load_batcher.select_from(
('zoid', 'tid', 'state'),
'object_state',
suffix=' ORDER BY zoid ',
zoid=oids
))
num_refs_found = 0
for from_oid, tid, state in rows:
state = self.driver.binary_column_as_state_type(state)
row = (from_oid, tid)
store_batcher.insert_into(
'object_refs_added (zoid, tid)',
object_refs_added_schema,
row,
row,
size=2
)
store_batcher.delete_from(
'object_refs_added',
zoid=from_oid
)
store_batcher.delete_from(
'object_ref',
zoid=from_oid
)
if state:
try:
to_oids = get_references(state)
except:
logger.exception(
"pre_pack: can't unpickle "
"object %d in transaction %d; state length = %d",
from_oid, tid, len(state)
)
raise
for to_oid in to_oids:
row = (from_oid, tid, to_oid)
num_refs_found += 1
store_batcher.insert_into(
'object_ref (zoid, tid, to_zoid)',
object_ref_schema,
row,
row,
size=3
)
return num_refs_found
@metricmethod
def pre_pack(self, pack_tid, get_references):
"""
Decide what the garbage collector should delete.
Objects created or modified after pack_tid will not be garbage
collected.
get_references is a function that accepts a pickled state and
returns a set of OIDs that state refers to.
The self.options.pack_gc flag indicates whether to run garbage
collection. If pack_gc is false, this method does nothing.
"""
if not self.options.pack_gc:
logger.warning("pre_pack: garbage collection is disabled on a "
"history-free storage, so doing nothing")
return
load_connection = LoadConnection(self.connmanager)
store_connection = PrePackConnection(self.connmanager)
try:
try:
self._pre_pack_main(load_connection, store_connection,
pack_tid, get_references)
except:
logger.exception("pre_pack: failed")
store_connection.rollback_quietly()
raise
store_connection.commit()
logger.info("pre_pack: finished successfully")
finally:
load_connection.drop()
store_connection.drop()
def _pre_pack_main(self, load_connection, store_connection,
pack_tid, get_references):
"""
Determine what to garbage collect.
*load_connection* is a
:class:`relstorage.adapters.connections.LoadConnection`; this
connection is in "snapshot" mode and is used to read a
consistent view of the database. Although this connection is
never committed or rolled back while this method is running
(which may take a long time), because load connections are
declared to be read-only, the database engines can make certain
optimizations that reduce their overhead (e.g.,
https://dev.mysql.com/doc/refman/5.7/en/innodb-performance-ro-txn.html),
making long-running transactions less problematic. For
example, while packing a 60 million row single MySQL storage
with ``zc.zodbdgc``, a load transaction was open and actively
reading for over 8 hours while the database continued to be
heavily written to without causing any problems.
*store_connection* is a standard read-committed store connection;
it will be periodically committed.
"""
# First, fill the ``pack_object`` table with all known OIDs
# as they currently exist in the database, regardless of
# what the load_connection snapshot can see (which is no later,
# and possibly earlier, than what the store connection can see).
#
# Mark things that need to be kept:
# - the root object;
# - anything that has changed since ``pack_tid``;
# Note that we do NOT add items that have been newly added since
# ``pack_tid``; no need to traverse into them, they couldn't possibly
# have a reference to an older object that's not also referenced
# by an object in the snapshot (without the app doing something seriously
# wrong); plus, we didn't find references from that item anyway.
#
# TODO: Copying 30MM objects takes almost 10 minutes (600s)
# against mysql 8 running on an SSD, and heaven forgive you if
# you kill the transaction and roll back --- the undo info is
# insane. What if we CREATE AS SELECT a table? Doing 'CREATE
# TEMPORARY TABLE AS' takes 173s; doing 'CREATE TABLE AS'
# takes 277s.
#
# On PostgreSQL we could use unlogged tables; this is somewhat faster
# in some tests (15 minutes vs 12?)
logger.info("pre_pack: filling the pack_object table")
stmt = """
%(TRUNCATE)s pack_object;
INSERT INTO pack_object (zoid, keep, keep_tid)
SELECT zoid, CASE WHEN tid > %(pack_tid)s THEN %(TRUE)s ELSE %(FALSE)s END, tid
FROM object_state;
-- Also keep the root
UPDATE pack_object
SET keep = %(TRUE)s
WHERE zoid = 0;
"""
self.runner.run_script(store_connection.cursor, stmt, {'pack_tid': pack_tid})
store_connection.commit()
logger.info("pre_pack: Filled the pack_object table")
# Chase down all the references using a consistent snapshot, including
# only the objects that were visible in ``pack_object``.
self.fill_object_refs(load_connection, store_connection, get_references)
# Traverse the graph, setting the 'keep' flags in ``pack_object``
self._traverse_graph(load_connection, store_connection)
def _find_pack_tid(self):
"""If pack was not completed, find our pack tid again"""
# pack (below) ignores its pack_tid argument, so we can safely
# return None here
return None
__find_zoid_to_delete_query = Schema.pack_object.select(
it.c.zoid
).where(
it.c.keep == False # pylint:disable=singleton-comparison
).order_by(
it.c.zoid
)
# This query is used to feed ``packed_func``, which is used
# to normalize the local cache.
__find_zoid_tid_to_delete_query = Schema.pack_object.select(
it.c.zoid, it.c.keep_tid
).where(
it.c.keep == False # pylint:disable=singleton-comparison
).order_by(
it.c.zoid
)
@metricmethod
def pack(self, pack_tid, packed_func=None):
"""Run garbage collection.
Requires the information provided by pre_pack.
"""
# pylint:disable=too-many-locals
# Read committed mode is sufficient.
store_connection = StoreConnection(self.connmanager)
try: # pylint:disable=too-many-nested-blocks
try:
# On PostgreSQL, this uses the index
# ``pack_object_keep_false`` So if there's lots of
# garbage, this only touches a small part of the table
# and is surprisingly fast (it doesn't even need to
# sort); between 40s (cached) and 2 minutes (uncached)
# on a pack_object containing 60MM rows.
#
# Attempting to join these results against the
# object_state table and do the delete in one shot
# (60MM rows, half garbage) took more than 24 hours
# against a remote PostgreSQL database (before I killed it), the
# same one that ran the above query in 40s. It appears
# to spend a lot of time checkpointing (each
# checkpoint takes an hour!) Even on a faster
# database, that's unlikely to be suitable for production.
#
# Breaking it into chunks, as below, took about an hour.
logger.debug("pack: Fetching objects to remove.")
with self._make_ss_load_cursor(store_connection) as cursor:
with _Progress('execute') as progress:
self.__find_zoid_to_delete_query.execute(cursor)
progress.mark('download')
to_remove = OidList(row[0] for row in cursor)
# On postgres, with a regular cursor, fetching 32,502,545 objects to remove
# took 56.7s (execute: 50.8s; download 5.8s; memory delta 1474.82 MB);
# The second time took half of that.
# Switching to a server side cursor brought that to
# fetched 32,502,545 objects to remove in
# 41.74s (execute: 0.00s; download 41.74s; memory delta 257.21 MB)
total = len(to_remove)
logger.debug(
"pack: fetched %d objects to remove in %.2fs "
"(execute: %.2fs; download %.2fs; memory delta %s)",
total,
progress.duration,
progress.phase_duration('execute'),
progress.phase_duration('download'),
progress.total_memory_delta_display,
)
logger.info("pack: will remove %d object(s)", total)
# We used to hold the commit lock and do this in small batches,
# but that's not important any longer since RelStorage 3.0
start = perf_counter()
store_batcher = self.locker.make_batcher(store_connection.cursor)
store_batcher.row_limit = max(store_batcher.row_limit, 4096)
removed = 0
def maybe_commit_and_report(force=False):
"""Returns the time of the last log."""
now = perf_counter()
if force or now >= start + self.options.pack_batch_timeout:
store_connection.commit()
if removed and total:
# XXX: storage.copy has a progress logger that could
# easily be generalized to this, and do a better job.
logger.info("pack: removed %d (%.1f%%) state(s)",
removed, removed / total * 100)
else:
logger.info("pack: No objects to remove")
return now
return start
# In the 60mm/30mm garbage postgresql database, removing took about 2.8 hours.
# Most of that time seems to have been spent removing blobs.
for oid in to_remove:
# In the past, we used to include the TID in the delete statement,
# but that shouldn't be necessary in a history-free database. It might have been
# there to protect against object resurrection, but that's an application bug.
# This way is faster and lets us take advantage of fast ANY row batching.
removed += store_batcher.delete_from('object_state', zoid=oid)
start = maybe_commit_and_report()
removed += store_batcher.flush()
maybe_commit_and_report(True)
to_remove = None # Drop memory usage
if packed_func is not None:
logger.debug("pack: Passing removed objects and tids to %s.", packed_func)
with self._make_ss_load_cursor(store_connection) as cursor:
with _Progress('execute') as progress:
self.__find_zoid_tid_to_delete_query.execute(cursor)
progress.mark('iterate and call')
# When collecting 32MM objects, this took 18 minutes.
for oid, tid in cursor:
packed_func(oid, tid)
logger.debug(
"pack: Passed %d objects to to function in %.2fs "
"(execute: %.2fs; iterate/call %.2fs; memory delta %s)",
total,
progress.duration,
progress.phase_duration('execute'),
progress.phase_duration('iterate and call'),
progress.total_memory_delta_display,
)
# In a DB that previously had 60MM objects, and collected 32MM,
# on Postgres this phase took 15 minutes
self._pack_cleanup(store_connection)
except:
logger.exception("pack: failed")
store_connection.rollback_quietly()
raise
logger.info("pack: finished successfully")
store_connection.commit()
finally:
store_connection.drop()
def _pack_cleanup(self, store_connection):
# The work done so far must already be committed
logger.info("pack: cleaning up")
# This section does not need to hold the commit lock, as it only
# touches pack-specific tables. We already hold a pack lock for that.
# XXX: Shouldn't we keep this? Unless there's a huge amount of churn,
# older state info might be valid from previous packs. I guess we
# want to avoid backing these things up.
stmt = """
DELETE FROM object_refs_added
WHERE zoid IN (
SELECT zoid
FROM pack_object
WHERE keep = %(FALSE)s
);
DELETE FROM object_ref
WHERE zoid IN (
SELECT zoid
FROM pack_object
WHERE keep = %(FALSE)s
);
%(TRUNCATE)s pack_object
"""
self.runner.run_script(store_connection.cursor, stmt)
class _Progress(object):
def __init__(self, name):
# [name, entry time, entry memory bytes]
self.marks = [[
name,
-1, # Replaced in __enter__
get_memory_usage()
]]
def __enter__(self):
self.marks[0][1] = perf_counter()
return self
def __exit__(self, t, v, tb):
self.mark('COMPLETE')
def mark(self, name):
self.marks.append((
name,
perf_counter(),
get_memory_usage()
))
def phase_duration(self, name):
for i, mark in enumerate(self.marks):
phase_name, _, _ = mark
if name == phase_name:
_, begin, _ = mark
_, end, _ = self.marks[i + 1]
return end - begin
return None
@property
def duration(self):
return self.marks[-1][1] - self.marks[0][1]
@property
def total_memory_delta_display(self):
return byte_display(
self.marks[-1][2] - self.marks[0][2]
)
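# Usage sketch, mirroring the call sites above:
#
#   with _Progress('execute') as progress:
#       cursor.execute(stmt)
#       progress.mark('download')
#       rows = list(cursor)
#   logger.debug(
#       "took %.2fs (execute %.2fs; download %.2fs; memory delta %s)",
#       progress.duration,
#       progress.phase_duration('execute'),
#       progress.phase_duration('download'),
#       progress.total_memory_delta_display)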