##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""IObjectMover implementation.
"""
from __future__ import absolute_import

import os
import sys

from zope.interface import implementer

from ..interfaces import IObjectMover
from ..mover import AbstractObjectMover
from ..mover import RowBatcherStoreTemps
from ..mover import metricmethod_sampled


class OracleRowBatcherStoreTemps(RowBatcherStoreTemps):

    def store_temp_into_batcher(self, batcher, oid, prev_tid, data):
        md5sum = self._compute_md5sum(data)
        size = len(data)
        if size <= 2000:
            # Send data inline for speed. Oracle docs say maximum size
            # of a RAW is 2000 bytes.
            stmt = "BEGIN relstorage_op.store_temp(:1, :2, :3, :4); END;"
            batcher.add_array_op(
                stmt,
                'oid prev_tid md5sum rawdata',
                (oid, prev_tid, md5sum, data),
                rowkey=oid,
                size=size,
            )
        else:
            # Send data as a BLOB
            row = {
                'oid': oid,
                'prev_tid': prev_tid,
                'md5sum': md5sum,
                'blobdata': data,
            }
            batcher.insert_into(
                "temp_store (zoid, prev_tid, md5, state)",
                ":oid, :prev_tid, :md5sum, :blobdata",
                row,
                rowkey=oid,
                size=size,
            )
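
# A minimal usage sketch (hypothetical ``driver`` and ``batcher`` objects;
# ``keep_history`` and ``driver.Binary`` normally come from the adapter, as in
# OracleObjectMover.__init__ below):
#
#   storer = OracleRowBatcherStoreTemps(keep_history, driver.Binary)
#   storer.store_temp_into_batcher(batcher, oid, prev_tid, b'x' * 100)   # <= 2000 bytes: inline RAW
#   storer.store_temp_into_batcher(batcher, oid, prev_tid, b'x' * 4096)  # > 2000 bytes: BLOB insert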


@implementer(IObjectMover)
class OracleObjectMover(OracleRowBatcherStoreTemps,
                        AbstractObjectMover):

    # This is assigned to by the adapter.
    inputsizes = None

    def __init__(self, *args, **kwargs):
        AbstractObjectMover.__init__(self, *args, **kwargs)
        OracleRowBatcherStoreTemps.__init__(
            self,
            self.keep_history,
            self.driver.Binary,
            batcher_factory=self.make_batcher)
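
    # Note: the two base classes above are initialized explicitly rather than
    # cooperatively via super() because they take different arguments.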

    @metricmethod_sampled
    def get_object_tid_after(self, cursor, oid, tid):
        """Returns the tid of the next change after an object revision.

        Returns None if no later state exists.
        """
        stmt = """
        SELECT MIN(tid)
        FROM object_state
        WHERE zoid = :1
            AND tid > :2
        """
        cursor.execute(stmt, (oid, tid))
        rows = cursor.fetchall()
        if not rows:
            return None
        # XXX: If we can use rowcount here, we can combine
        # with superclass.
        assert len(rows) == 1
        return rows[0][0]
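
    # For example, with a hypothetical history in which object 7 changed at
    # tids 100, 250 and 300:
    #
    #   mover.get_object_tid_after(cursor, 7, 100)  # -> 250
    #   mover.get_object_tid_after(cursor, 7, 300)  # -> None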

    # no store connection initialization needed for Oracle
    def on_store_opened(self, cursor, restart=False):
        pass

    @metricmethod_sampled
    def restore(self, cursor, batcher, oid, tid, data):
        """Store an object directly, without conflict detection.

        Used for copying transactions into this database.
        """
        md5sum = self._compute_md5sum(data)
        size = len(data) if data is not None else 0
        if size <= 2000:
            # Send data inline for speed. Oracle docs say maximum size
            # of a RAW is 2000 bytes.
            # XXX: Revisit this. cx_Oracle docs suggest that blobs up to 1GB can
            # be sent inline very fast.
            if self.keep_history:
                stmt = "BEGIN relstorage_op.restore(:1, :2, :3, :4); END;"
                batcher.add_array_op(
                    stmt,
                    'oid tid md5sum rawdata',
                    (oid, tid, md5sum, data),
                    rowkey=(oid, tid),
                    size=size,
                )
            else:
                stmt = "BEGIN relstorage_op.restore(:1, :2, :3); END;"
                batcher.add_array_op(
                    stmt,
                    'oid tid rawdata',
                    (oid, tid, data),
                    rowkey=(oid, tid),
                    size=size,
                )
        else:
            # pylint:disable=else-if-used
            # Send as a BLOB
            if self.keep_history:
                row = {
                    'oid': oid,
                    'tid': tid,
                    'md5sum': md5sum,
                    'state_size': size,
                    'blobdata': data,
                }
                row_schema = """
                    :oid, :tid,
                    COALESCE((SELECT tid
                              FROM current_object
                              WHERE zoid = :oid), 0),
                    :md5sum, :state_size, :blobdata
                """
                batcher.insert_into(
                    "object_state (zoid, tid, prev_tid, md5, state_size, state)",
                    row_schema,
                    row,
                    rowkey=(oid, tid),
                    size=size,
                )
            else:
                batcher.delete_from('object_state', zoid=oid)
                if data:
                    row = {
                        'oid': oid,
                        'tid': tid,
                        'state_size': size,
                        'blobdata': data,
                    }
                    batcher.insert_into(
                        "object_state (zoid, tid, state_size, state)",
                        ":oid, :tid, :state_size, :blobdata",
                        row,
                        rowkey=oid,
                        size=size,
                    )
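
    # As with store_temp_into_batcher above, revisions of 2000 bytes or less
    # take the inline PL/SQL path and larger ones are inserted as BLOBs, e.g.
    # (illustrative values):
    #
    #   mover.restore(cursor, batcher, oid, tid, b'abc')         # inline RAW
    #   mover.restore(cursor, batcher, oid, tid, b'x' * 100000)  # BLOB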

    @metricmethod_sampled
    def download_blob(self, cursor, oid, tid, filename):
        """Download a blob into a file."""
        stmt = """
        SELECT chunk
        FROM blob_chunk
        WHERE zoid = :1
            AND tid = :2
        ORDER BY chunk_num
        """
        f = None
        bytecount = 0
        # Current versions of cx_Oracle only support offsets up
        # to sys.maxsize or 4GB, whichever comes first.
        maxsize = min(sys.maxsize, 1 << 32)
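        # The driver may install an outputtypehandler that converts LOBs as
        # they are fetched; removing it here (an assumption about the driver
        # setup) lets the query below return LOB locators that can be read
        # in chunks. The handler is restored in the ``finally`` clause.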
        oth = cursor.connection.outputtypehandler
        del cursor.connection.outputtypehandler
        try:
            cursor.execute(stmt, (oid, tid))
            while True:
                try:
                    blob, = cursor.fetchone()
                except TypeError:
                    # No more chunks. Note: if there are no chunks at
                    # all, then this method should not write a file.
                    break
                if f is None:
                    f = open(filename, 'wb')  # pylint:disable=consider-using-with
                # round off the chunk-size to be a multiple of the oracle
                # blob chunk size to maximize performance
                read_chunk_size = int(
                    max(
                        round(1.0 * self.blob_chunk_size / blob.getchunksize()),
                        1)
                    * blob.getchunksize())
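                # For example (assumed values): with a blob_chunk_size of
                # 1048576 bytes and an Oracle LOB chunk size of 8132 bytes,
                # round(1048576.0 / 8132) == 129, so each read asks for
                # 129 * 8132 == 1049028 bytes.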
                offset = 1  # Oracle still uses 1-based indexing.
                reader = iter(lambda: blob.read(offset, read_chunk_size), b'')
                for read_chunk in reader:
                    f.write(read_chunk)
                    bytecount += len(read_chunk)
                    offset += len(read_chunk)
                    if offset > maxsize:
                        # We have already read the maximum we can store
                        # so we can assume we are done. If we do not break
                        # off here, cx_Oracle will throw an overflow
                        # exception anyway.
                        break
        except:
            if f is not None:
                f.close()
                os.remove(filename)
            raise
        finally:
            cursor.connection.outputtypehandler = oth
        if f is not None:
            f.close()
        return bytecount

    @metricmethod_sampled
    def upload_blob(self, cursor, oid, tid, filename):
        """Upload a blob from a file.

        If tid is None, upload to the temporary table.
        """
        # pylint:disable=too-many-locals
        if tid is not None:
            if self.keep_history:
                delete_stmt = """
                DELETE FROM blob_chunk
                WHERE zoid = :1 AND tid = :2
                """
                cursor.execute(delete_stmt, (oid, tid))
            else:
                delete_stmt = "DELETE FROM blob_chunk WHERE zoid = :1"
                cursor.execute(delete_stmt, (oid,))
            use_tid = True
            insert_stmt = """
            INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
            VALUES (:oid, :tid, :chunk_num, empty_blob())
            RETURNING chunk INTO :newblob
            """
        else:
            use_tid = False
            delete_stmt = "DELETE FROM temp_blob_chunk WHERE zoid = :1"
            cursor.execute(delete_stmt, (oid,))
            insert_stmt = """
            INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)
            VALUES (:oid, :chunk_num, empty_blob())
            RETURNING chunk INTO :newblob
            """
        f = open(filename, 'rb')
        blob_var = cursor.var(self.driver.BLOB)
        # pylint:disable=use-dict-literal
        params = dict(oid=oid, chunk_num=0, newblob=blob_var)
        if use_tid:
            params['tid'] = tid
        cursor.execute(insert_stmt, params)
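        # A DML RETURNING bind variable yields a list of values (one per
        # affected row); exactly one row was inserted here, so unpack the
        # single LOB locator.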
        blob, = blob_var.getvalue()
        offset = 1  # Oracle uses 1-based indexing
        try:
            with f:
                while True:
                    data = f.read(blob.getchunksize())
                    if data:
                        blob.write(data, offset)
                    else:
                        break
                    offset += len(data)
        finally:
            if blob is not None and blob.isopen():
                blob.close()
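
# A minimal round-trip sketch (hypothetical ``cursor``, ``oid``, ``tid`` and
# file paths; in practice both calls are driven by the blob-storage layer):
#
#   mover.upload_blob(cursor, oid, None, '/tmp/new.blob')   # to temp_blob_chunk
#   mover.upload_blob(cursor, oid, tid, '/tmp/new.blob')    # to blob_chunk
#   nbytes = mover.download_blob(cursor, oid, tid, '/tmp/copy.blob')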