##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""IObjectMover implementation.
"""
from __future__ import absolute_import, print_function
import io
import os
import struct
from zope.interface import implementer
from ..interfaces import IObjectMover
from ..mover import AbstractObjectMover
from ..mover import RowBatcherStoreTemps
from ..mover import metricmethod_sampled
class PostgreSQLRowBatcherStoreTemps(RowBatcherStoreTemps):
generic_suffix = """
ON CONFLICT (zoid) DO UPDATE
SET state = excluded.state,
prev_tid = excluded.prev_tid,
md5 = excluded.md5
"""
@implementer(IObjectMover)
class PostgreSQLObjectMover(AbstractObjectMover):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.driver.supports_copy:
batcher = PostgreSQLRowBatcherStoreTemps(self.keep_history,
self.driver.Binary,
self.make_batcher)
self.replace_temps = batcher.replace_temps
self.store_temps = batcher.store_temps
@metricmethod_sampled
def on_store_opened(self, cursor, restart=False):
"""Create the temporary tables for storing objects"""
# Note that the md5 column is not used if self.keep_history == False.
# Ideally we wouldn't execute any of these on a restart, but
# I've seen an issue with temp_store apparently going missing on pg8000.
#
# In testing, using 'DELETE ROWS' is faster than using 'DROP TABLE':
# zodbshootout add/update takes 1230/840ms with DROP TABLE versus
# 983/681ms with DELETE ROWS. The theory was that a VACUUM or ANALYZE
# might be needed after DELETE ROWS but not after DROP TABLE; that
# didn't turn out to be true (an ANALYZE might still be helpful before
# using the temp table, but we haven't benchmarked that).
if not restart:
temp_store_table_tmpl = """
CREATE TEMPORARY TABLE IF NOT EXISTS {NAME} (
zoid BIGINT NOT NULL PRIMARY KEY,
prev_tid BIGINT NOT NULL,
md5 CHAR(32),
state BYTEA
) ON COMMIT DELETE ROWS;
"""
ddl_stmts = [
temp_store_table_tmpl.format(NAME='temp_store'),
temp_store_table_tmpl.format(NAME='temp_store_replacements'),
"""
CREATE TEMPORARY TABLE IF NOT EXISTS temp_blob_chunk (
zoid BIGINT NOT NULL,
chunk_num BIGINT NOT NULL,
chunk OID,
PRIMARY KEY (zoid, chunk_num)
) ON COMMIT DELETE ROWS;
""",
"""
-- This trigger removes blobs that get replaced before being
-- moved to blob_chunk. Note that it is never called when
-- the temp_blob_chunk table is being dropped or truncated.
CREATE TRIGGER temp_blob_chunk_delete
BEFORE DELETE ON temp_blob_chunk
FOR EACH ROW
EXECUTE PROCEDURE temp_blob_chunk_delete_trigger();
""",
]
for stmt in ddl_stmts:
cursor.execute(stmt)
cursor.connection.commit()
AbstractObjectMover.on_store_opened(self, cursor, restart)
@metricmethod_sampled
def restore(self, cursor, batcher, oid, tid, data):
"""Store an object directly, without conflict detection.
Used for copying transactions into this database.
"""
if self.keep_history:
suffix = """
ON CONFLICT (zoid, tid) DO UPDATE SET
tid = excluded.tid,
prev_tid = excluded.prev_tid,
md5 = excluded.md5,
state_size = excluded.state_size,
state = excluded.state
"""
else:
suffix = """
ON CONFLICT (zoid) DO UPDATE SET
tid = excluded.tid,
state_size = excluded.state_size,
state = excluded.state
"""
self._generic_restore(batcher, oid, tid, data,
command='INSERT', suffix=suffix)
@metricmethod_sampled
def download_blob(self, cursor, oid, tid, filename):
"""Download a blob into a file."""
stmt = """
SELECT chunk
FROM blob_chunk
WHERE zoid = %s
AND tid = %s
ORDER BY chunk_num
"""
# Beginning in RelStorage 3, we no longer chunk blobs.
# All chunks were collapsed into one as part of the migration.
bytecount = 0
cursor.execute(stmt, (oid, tid))
rows = cursor.fetchall()
assert len(rows) == 1
loid, = rows[0]
blob = cursor.connection.lobject(loid, 'rb')
# Use the native psycopg2 blob export functionality
blob.export(filename)
blob.close()
bytecount = os.path.getsize(filename)
return bytecount
@metricmethod_sampled
def upload_blob(self, cursor, oid, tid, filename):
"""Upload a blob from a file.
If tid is None, upload to the temporary table.
"""
# pylint:disable=too-many-branches,too-many-locals
if tid is not None:
if self.keep_history:
delete_stmt = """
DELETE FROM blob_chunk
WHERE zoid = %s AND tid = %s
"""
cursor.execute(delete_stmt, (oid, tid))
else:
delete_stmt = "DELETE FROM blob_chunk WHERE zoid = %s"
cursor.execute(delete_stmt, (oid,))
use_tid = True
insert_stmt = """
INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
VALUES (%(oid)s, %(tid)s, %(chunk_num)s, %(loid)s)
"""
else:
use_tid = False
delete_stmt = "DELETE FROM temp_blob_chunk WHERE zoid = %s"
cursor.execute(delete_stmt, (oid,))
insert_stmt = """
INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)
VALUES (%(oid)s, %(chunk_num)s, %(loid)s)
"""
# Since we only run on PostgreSQL 9.6 and above, large objects are
# allowed to exceed 2GB (the 32-bit signed limit). The server already
# chunks large objects internally into 4KB pages, so there's no
# advantage to adding our own layer of chunking on top.
#
# As long as we keep our usage simple, that's fine. Only
# blob.seek(), blob.truncate() and blob.tell() would need the
# 64-bit variants of the underlying libpq functions. `export()` and
# `import()` (called implicitly when the lobject is created with a
# local filename in psycopg2) work with small fixed buffers (8KB) and
# don't care about file size or offset; they only need the `open` and
# `read` syscalls to handle 64-bit files, which they must already for
# Python itself to handle files larger than 2GB.
#
# psycopg2 explicitly uses the 64-bit family of functions;
# psycopg2cffi does *not*, but that's fine as long as it's built on a
# 64-bit platform. pg8000 uses the SQL interfaces, not the libpq
# interfaces, which is also fine. Since we don't use any of the
# functions that need to be 64-bit aware, none of this should be an
# issue.
# Create and upload the blob, getting a large object identifier.
blob = cursor.connection.lobject(0, 'wb', 0, filename)
blob.close()
# Now put it into our blob_chunk table.
# pylint:disable=use-dict-literal
params = dict(oid=oid, chunk_num=0, loid=blob.oid)
if use_tid:
params['tid'] = tid
cursor.execute(insert_stmt, params)
def _do_store_temps(self, cursor, state_oid_tid_iter, table_name):
# History-preserving storages need the md5 to compare states.
# We could calculate that on the server using pgcrypto, if it's
# available. Or we could compare the states directly instead of
# their md5s; that's fast on PostgreSQL.
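# For illustration only, a server-side digest would look roughly like
# the following, assuming the pgcrypto extension is installed (nothing
# below relies on this):
#
#     UPDATE temp_store SET md5 = encode(digest(state, 'md5'), 'hex');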
if state_oid_tid_iter:
buf = TempStoreCopyBuffer(table_name,
state_oid_tid_iter,
self._compute_md5sum if self.keep_history else None)
cursor.copy_expert(buf.COPY_COMMAND, buf)
@metricmethod_sampled
def store_temps(self, cursor, state_oid_tid_iter):
self._do_store_temps(cursor, state_oid_tid_iter, 'temp_store')
@metricmethod_sampled
def replace_temps(self, cursor, state_oid_tid_iter):
# Upload and then replace. We *could* write directly into the table
# if we deleted the existing rows first, but that would require
# iterating twice and/or buffering all the state data in memory.
# That's fine if the data is small, but it could be large.
self._do_store_temps(cursor, state_oid_tid_iter, 'temp_store_replacements')
# TODO: Prepare this query.
cursor.execute(
"""
UPDATE temp_store
SET prev_tid = r.prev_tid,
md5 = r.md5,
state = r.state
FROM temp_store_replacements r
WHERE temp_store.zoid = r.zoid
"""
)
class TempStoreCopyBuffer(io.BufferedIOBase):
"""
A binary file-like object for putting data into
``temp_store``.
"""
# pg8000 uses readinto(); psycopg2 uses read().
COPY_COMMAND_TMPL = "COPY {NAME} (zoid, prev_tid, md5, state) FROM STDIN WITH (FORMAT binary)"
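# A minimal usage sketch (mirroring _do_store_temps above; `cur` and
# `rows`, an iterable of (state, oid_int, tid_int) triples, are
# hypothetical names):
#
#     buf = TempStoreCopyBuffer('temp_store', rows, None)
#     cur.copy_expert(buf.COPY_COMMAND, buf)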
def __init__(self, table, state_oid_tid_iterable, digester):
super().__init__()
self.COPY_COMMAND = self.COPY_COMMAND_TMPL.format(NAME=table)
self.state_oid_tid_iterable = state_oid_tid_iterable
self._iter = iter(state_oid_tid_iterable)
self._digester = digester
if digester:
# On Python 3, this outputs a str, but our protocol needs bytes
self._digester = lambda s: digester(s).encode("ascii")
if self._digester:
self._read_tuple = self._read_one_tuple_md5
else:
self._read_tuple = self._read_one_tuple_no_md5
self._done = False
self._header = self.HEADER
self._buffer = bytearray(8192)
SIGNATURE = b'PGCOPY\n\xff\r\n\0'
FLAGS = struct.pack("!i", 0)
EXTENSION_LEN = struct.pack("!i", 0)
HEADER = SIGNATURE + FLAGS + EXTENSION_LEN
# Each tuple begins with its field count as a signed 16-bit integer;
# that count is the same for every row we write:
# (zoid, prev_tid, md5, state)
_common = "!hiqiqi"
WITH_SUM = struct.Struct(_common + "32si")
NO_SUM = struct.Struct(_common + "i")
# Each column in a tuple is a signed 32-bit length (-1 for NULL),
# followed by exactly that many bytes of data, written in the binary
# format for the column type: for character fields (like md5) that's
# a direct dump of the ASCII; for BIGINT fields it's an 8-byte
# big-endian integer; for BYTEA fields it's just the raw data.
# Finally, the trailer is a 16-bit field count of -1.
TRAILER = struct.pack("!h", -1)
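# For illustration (a sketch, not anything the code below consumes):
# a history-free row with zoid=1, prev_tid=2 and state=b'abc' has its
# fixed part written by NO_SUM.pack_into(buf, offset, 4, 8, 1, 8, 2, -1, 3),
# i.e. 34 bytes laid out as
#
#     0004                          field count (4)
#     00000008 0000000000000001     zoid: length 8, value 1
#     00000008 0000000000000002     prev_tid: length 8, value 2
#     ffffffff                      md5: NULL
#     00000003                      state: length 3 (bytes follow)
#
# with the 3 state bytes appended immediately after. The complete COPY
# stream is HEADER, one such tuple per row, then TRAILER.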
def read(self, size=-1):
# We don't handle "read everything in one go".
# assert size is not None and size > 0
if self._done:
return b''
if len(self._buffer) < size:
self._buffer.extend(bytearray(size - len(self._buffer)))
count = self.readinto(self._buffer)
if not count:
return b''
return bytes(self._buffer)
def readinto(self, buf):
# We basically ignore the size of the buffer,
# writing more into it if we need to.
if self._done:
return 0
requested = len(buf)
# bytearray.clear() is only in Python 3
del buf[:]
buf.extend(self._header)
self._header = b''
while len(buf) < requested:
try:
self._read_tuple(buf)
except StopIteration:
buf.extend(self.TRAILER)
self._done = True
break
return len(buf)
def __len__(self):
return len(self.state_oid_tid_iterable)
def _next_row(self): # pylint:disable=method-hidden
return next(self._iter)
def _read_one_tuple_md5(self,
buf,
_pack_into=WITH_SUM.pack_into,
_header_size=WITH_SUM.size,
_blank_header=bytearray(WITH_SUM.size)):
data, oid_int, tid_int = self._next_row()
if data is None:
# A deleted object. These should be quite rare relative to
# everything else. It won't have an MD5, so it can take the same
# code path as the normal no-md5 case, as long as we arrange for
# that path to get the right data. (A push-back iterator would
# simplify this.)
self._next_row = lambda: (data, oid_int, tid_int)
try:
self._read_one_tuple_no_md5(buf)
finally:
del self._next_row
return
len_data = len(data)
md5 = self._digester(data)
offset = len(buf)
buf.extend(_blank_header)
_pack_into(
buf, offset,
4,
8, oid_int,
8, tid_int,
32, md5,
len_data
)
buf.extend(data)
def _read_one_tuple_no_md5(self,
buf,
_pack_into=NO_SUM.pack_into,
_header_size=NO_SUM.size,
_blank_header=bytearray(NO_SUM.size)):
data, oid_int, tid_int = self._next_row()
len_data = len(data) if data is not None else -1
offset = len(buf)
buf.extend(_blank_header)
_pack_into(
buf, offset,
4,
8, oid_int,
8, tid_int,
-1,
len_data
)
buf.extend(data or b'')