Hello Nikolaus,
compiling the extension from the release tarball works now, thanks!
It looks like the release tarball
( https://github.com/s3ql/s3ql/releases/download/release-5.0.0-pre1/s3ql-5.0.0.tar.gz )
was built from a different commit than the corresponding tag
( https://github.com/s3ql/s3ql/archive/refs/tags/release-5.0.0-pre1.tar.gz ).
Have a look at the attached diff for the changes.
Could you maybe re-upload the release tarball after a hard reset of your
working copy to the release-5.0.0-pre1 tag
(sha adb2d04c588b636a4251d8b4f455b36b92dbb33b)?
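Something along these lines should do it (just a sketch, assuming "origin"
points at the GitHub repository and that the sdist is then rebuilt from the
clean tree):

  git fetch --tags origin
  git checkout release-5.0.0-pre1
  git reset --hard    # discard any local modifications
  git clean -dxf      # drop untracked/ignored build artifacts
  git rev-parse HEAD  # should print adb2d04c588b636a4251d8b4f455b36b92dbb33b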
Thanks,
Daniel
diff --color -ru ~/s3ql-release-5.0.0-pre1/setup.cfg ~/s3ql-5.0.0/setup.cfg
--- ~/s3ql-release-5.0.0-pre1/setup.cfg 2023-05-17 10:39:01.000000000 +0200
+++ ~/s3ql-5.0.0/setup.cfg 2023-05-19 18:58:31.888473300 +0200
@@ -1,6 +1,10 @@
-# Don't automatically download and install any dependencies
[easy_install]
allow_hosts = None
[aliases]
-test=pytest
\ No newline at end of file
+test = pytest
+
+[egg_info]
+tag_build =
+tag_date = 0
+
diff --color -ru ~/s3ql-release-5.0.0-pre1/src/s3ql/backends/common.py ~/s3ql-5.0.0/src/s3ql/backends/common.py
--- ~/s3ql-release-5.0.0-pre1/src/s3ql/backends/common.py 2023-05-17 10:39:01.000000000 +0200
+++ ~/s3ql-5.0.0/src/s3ql/backends/common.py 2023-05-17 11:14:50.000000000 +0200
@@ -20,6 +20,7 @@
import time
from abc import ABCMeta, abstractmethod
from functools import wraps
+from io import BytesIO
from typing import BinaryIO
from .. import BUFSIZE
@@ -297,28 +298,50 @@
``backend.fetch(key)[0]``.
"""
- def do_read(fh):
- data = fh.read()
- return (data, fh.metadata)
+ fh = BytesIO()
+ metadata = self.readinto_fh(key, fh)
+ return (fh.getvalue(), metadata)
- return self.perform_read(do_read, key)
+ def readinto_fh(self, key: str, fh: BinaryIO):
+ '''Transfer data stored under *key* into *fh*, return metadata.
- def readinto(self, key: str, ofh: BinaryIO):
- """Read data stored under `key` into *fh*, return metadata."""
+ The data will be inserted at offset zero, and the file handle truncated to the size of the
+ data.
- off = ofh.tell()
+ If a temporary error (as defined by `is_temp_failure`) occurs, the operation is retried.
+ '''
def do_read(ifh: BinaryIO):
- ofh.seek(off)
+ fh.seek(0)
while True:
buf = ifh.read(BUFSIZE)
if not buf:
break
- ofh.write(buf)
+ fh.write(buf)
+ fh.truncate()
return ifh.metadata
return self.perform_read(do_read, key)
+ def write_fh(self, key: str, fh: BinaryIO, metadata=None):
+ '''Upload data from *fh* under *key*.
+
+ The data will be read at offset zero. If a temporary error (as defined by `is_temp_failure`)
+ occurs, the operation is retried. Returns the amount of data written.
+ '''
+
+ def do_write(ofh: BinaryIO):
+ fh.seek(0)
+ while True:
+ buf = fh.read(BUFSIZE)
+ if not buf:
+ break
+ ofh.write(buf)
+ return ofh
+
+ fh = self.perform_write(do_write, key, metadata)
+ return fh.get_obj_size()
+
def store(self, key, val, metadata=None):
"""Store data under `key`.
@@ -331,7 +354,7 @@
equivalent to ``backend.store(key, val)``.
"""
- self.perform_write(lambda fh: fh.write(val), key, metadata)
+ return self.write_fh(key, BytesIO(val), metadata)
@abstractmethod
def is_temp_failure(self, exc):
diff --color -ru ~/s3ql-release-5.0.0-pre1/src/s3ql/backends/comprenc.py ~/s3ql-5.0.0/src/s3ql/backends/comprenc.py
--- ~/s3ql-release-5.0.0-pre1/src/s3ql/backends/comprenc.py 2023-05-17 10:39:01.000000000 +0200
+++ ~/s3ql-5.0.0/src/s3ql/backends/comprenc.py 2023-05-17 11:14:50.000000000 +0200
@@ -12,9 +12,11 @@
import io
import logging
import lzma
+import shutil
import struct
import time
import zlib
+from typing import Any, BinaryIO, Optional, Dict
import cryptography.hazmat.backends as crypto_backends
import cryptography.hazmat.primitives.ciphers as crypto_ciphers
@@ -157,6 +159,129 @@
except ThawError:
raise CorruptedObjectError('Invalid metadata')
+ def readinto_fh(self, key: str, fh: BinaryIO):
+ '''Transfer data stored under *key* into *fh*, return metadata.
+
+ The data will be inserted at offset zero, and the file handle truncated to the size of the
+ data.
+ '''
+
+ buf1 = fh
+ buf2 = io.BytesIO()
+
+ # Read into buf1
+ meta_raw = self.backend.readinto_fh(key, buf1)
+ (nonce, meta) = self._verify_meta(key, meta_raw)
+ if nonce:
+ data_key = sha256(self.passphrase + nonce)
+
+ # The `payload_offset` key only exists if the storage object was created with on old S3QL
+ # version. In order to avoid having to download and re-upload the entire object during the
+ # upgrade, the upgrade procedure adds this header to tell us how many bytes at the beginning
+ # of the object we have to skip to get to the payload.
+ if 'payload_offset' in meta_raw:
+ buf1.seek(meta_raw['payload_offset'])
+ else:
+ buf1.seek(0)
+
+ # Decrypt into buf2
+ encr_alg = meta_raw['encryption']
+ if encr_alg == 'AES_v2':
+ buf2.seek(0)
+ decrypt_fh(buf1, buf2, data_key)
+ buf2.truncate()
+ buf2.seek(0)
+ elif encr_alg != 'None':
+ raise RuntimeError('Unsupported encryption: %s' % encr_alg)
+ else:
+ (buf1, buf2) = (buf2, buf1)
+
+ # Decompress into buf1
+ compr_alg = meta_raw['compression']
+ if compr_alg != 'None':
+ if compr_alg == 'BZIP2':
+ decompressor = bz2.BZ2Decompressor()
+ elif compr_alg == 'LZMA':
+ decompressor = lzma.LZMADecompressor()
+ elif compr_alg == 'ZLIB':
+ decompressor = zlib.decompressobj()
+ else:
+ raise RuntimeError('Unsupported compression: %s' % compr_alg)
+ buf1.seek(0)
+ decompress_fh(buf2, buf1, decompressor)
+ buf1.truncate()
+ buf1.seek(0)
+ else:
+ (buf1, buf2) = (buf2, buf1)
+
+ # Make sure final output is in fh
+ if buf1 is not fh:
+ fh.seek(0)
+ shutil.copyfileobj(buf1, fh, BUFSIZE)
+ fh.truncate()
+
+ return meta
+
+ def write_fh(
+ self,
+ key: str,
+ fh: BinaryIO,
+ metadata: Optional[Dict[str, Any]] = None,
+ is_compressed: bool = False,
+ ):
+ '''Upload data from *fh* under *key*.
+
+ The data will be read at offset zero.
+
+ If *is_compressed* is True, do not compress the data.
+
+ Returns the size of the resulting storage object (which may be less due to compression).
+ '''
+
+ if metadata is None:
+ metadata = dict()
+
+ meta_buf = freeze_basic_mapping(metadata)
+ meta_raw = dict(format_version=2)
+
+ if is_compressed or self.compression[0] is None:
+ meta_raw['compression'] = 'None'
+ else:
+ if self.compression[0] == 'zlib':
+ compr = zlib.compressobj(self.compression[1])
+ meta_raw['compression'] = 'ZLIB'
+ elif self.compression[0] == 'bzip2':
+ compr = bz2.BZ2Compressor(self.compression[1])
+ meta_raw['compression'] = 'BZIP2'
+ elif self.compression[0] == 'lzma':
+ compr = lzma.LZMACompressor(preset=self.compression[1])
+ meta_raw['compression'] = 'LZMA'
+ buf = io.BytesIO()
+ compress_fh(fh, buf, compr)
+ buf.seek(0)
+ fh = buf
+
+ if self.passphrase is None:
+ meta_raw['encryption'] = 'None'
+ meta_raw['data'] = meta_buf
+ else:
+ nonce = struct.pack('<d', time.time()) + key.encode('utf-8')
+ meta_key = sha256(self.passphrase + nonce + b'meta')
+ encryptor = aes_encryptor(meta_key)
+ meta_raw['encryption'] = 'AES_v2'
+ meta_raw['nonce'] = nonce
+ meta_raw['data'] = encryptor.update(meta_buf) + encryptor.finalize()
+ meta_raw['object_id'] = key
+ meta_raw['signature'] = checksum_basic_mapping(meta_raw, meta_key)
+ data_key = sha256(self.passphrase + nonce)
+ encryptor = aes_encryptor(data_key)
+ buf = io.BytesIO()
+ encrypt_fh(fh, buf, encryptor)
+ buf.seek(0)
+ fh = buf
+
+ return self.backend.write_fh(key, fh, meta_raw)
+
def open_read(self, key):
"""
If the backend has a password set but the object is not encrypted,
@@ -267,6 +392,47 @@
self.backend.close()
+def compress_fh(ifh: BinaryIO, ofh: BinaryIO, compr):
+ '''Compress *ifh* into *ofh* using compr'''
+
+ while True:
+ buf = ifh.read(BUFSIZE)
+ if not buf:
+ break
+ buf = compr.compress(buf)
+ if buf:
+ ofh.write(buf)
+
+ buf = compr.flush()
+ if buf:
+ ofh.write(buf)
+
+
+def encrypt_fh(ifh: BinaryIO, ofh: BinaryIO, key: bytes):
+ '''Decrypt contents of *ifh* into *ofh*'''
+
+ encryptor = aes_encryptor(key)
+ hmac_ = hmac.new(key, digestmod=hashlib.sha256)
+
+ while True:
+ buf = ifh.read(BUFSIZE)
+ if not buf:
+ break
+
+ header = struct.pack(b'<I', len(buf))
+ hmac_.update(header)
+ ofh.write(encryptor.update(header))
+
+ hmac_.update(buf)
+ ofh.write(encryptor.update(buf))
+
+ # Packet length of 0 indicates end of stream, only HMAC follows
+ buf = struct.pack(b'<I', 0)
+ hmac_.update(buf)
+ ofh.write(encryptor.update(buf))
+ ofh.write(encryptor.update(hmac_.digest()))
+
+
class CompressFilter:
'''Compress data while writing'''
@@ -396,7 +562,7 @@
return b''
try:
- buf = decompress(self.decomp, buf)
+ buf = decompress_buf(self.decomp, buf)
except CorruptedObjectError:
# Read rest of stream, so that we raise HMAC or MD5 error instead
# if problem is on lower layer
@@ -483,6 +649,58 @@
return self.obj_size
+def decompress_fh(ifh: BinaryIO, ofh: BinaryIO, decompressor):
+ '''Decompress contents of *ifh* into *ofh*'''
+
+ while True:
+ buf = ifh.read(BUFSIZE)
+ if not buf:
+ break
+ buf = decompress_buf(decompressor, buf)
+ if buf:
+ ofh.write(buf)
+
+ if not decompressor.eof:
+ raise CorruptedObjectError('Premature end of stream.')
+ if decompressor.unused_data:
+ raise CorruptedObjectError('Data after end of compressed stream')
+
+
+def decrypt_fh(ifh: BinaryIO, ofh: BinaryIO, key: bytes):
+ '''Decrypt contents of *ifh* into *ofh*'''
+
+ off_size = struct.calcsize(b'<I')
+ decryptor = aes_decryptor(key)
+ hmac_ = hmac.new(key, digestmod=hashlib.sha256)
+
+ while True:
+ buf = ifh.read(off_size)
+ if not buf:
+ raise CorruptedObjectError('Premature end of stream.')
+ buf = decryptor.update(buf)
+ hmac_.update(buf)
+ assert len(buf) == off_size
+ to_read = struct.unpack(b'<I', buf)[0]
+ if to_read == 0:
+ break
+ while to_read:
+ buf = ifh.read(min(to_read, BUFSIZE))
+ if not buf:
+ raise CorruptedObjectError('Premature end of stream.')
+ to_read -= len(buf)
+ buf = decryptor.update(buf)
+ hmac_.update(buf)
+ ofh.write(buf)
+
+ buf = ifh.read(HMAC_SIZE)
+ buf = decryptor.update(buf)
+ if ifh.read(BUFSIZE):
+ raise CorruptedObjectError('Extraneous data at end of object')
+
+ if not hmac.compare_digest(buf, hmac_.digest()):
+ raise CorruptedObjectError('HMAC mismatch')
+
+
class DecryptFilter(InputFilter):
'''Decrypt data while reading
@@ -606,7 +824,7 @@
return False
-def decompress(decomp, buf):
+def decompress_buf(decomp, buf):
'''Decompress *buf* using *decomp*
This method encapsulates exception handling for different
diff --color -ru ~/s3ql-release-5.0.0-pre1/src/s3ql/block_cache.py ~/s3ql-5.0.0/src/s3ql/block_cache.py
--- ~/s3ql-release-5.0.0-pre1/src/s3ql/block_cache.py 2023-05-17 10:39:01.000000000 +0200
+++ ~/s3ql-5.0.0/src/s3ql/block_cache.py 2023-05-17 11:14:50.000000000 +0200
@@ -10,7 +10,6 @@
import logging
import os
import re
-import shutil
import sys
import threading
import time
@@ -327,15 +326,6 @@
This method runs in a separate thread outside the trio event loop.
'''
- def do_write(fh):
- el.seek(0)
- while True:
- buf = el.read(BUFSIZE)
- if not buf:
- break
- fh.write(buf)
- return fh
-
success = False
async def with_event_loop(exc_info):
@@ -377,16 +367,12 @@
with self.backend_pool() as backend:
if log.isEnabledFor(logging.DEBUG):
time_ = time.time()
- obj_size = backend.perform_write(
- do_write, 's3ql_data_%d' % obj_id
- ).get_obj_size()
+ obj_size = backend.write_fh('s3ql_data_%d' % obj_id, el)
time_ = time.time() - time_
rate = el.size / (1024**2 * time_) if time_ != 0 else 0
log.debug('uploaded %d bytes in %.3f seconds, %.2f MiB/s', el.size, time_, rate)
else:
- obj_size = backend.perform_write(
- do_write, 's3ql_data_%d' % obj_id
- ).get_obj_size()
+ obj_size = backend.write_fh('s3ql_data_%d' % obj_id, el)
success = True
finally:
self.in_transit.remove(el)
@@ -704,12 +690,6 @@
log.debug('downloading object %d..', obj_id)
tmpfh = open(filename + '.tmp', 'wb')
try:
-
- def do_read(fh):
- tmpfh.seek(0)
- tmpfh.truncate()
- shutil.copyfileobj(fh, tmpfh, BUFSIZE)
-
# Lock object. This ensures that we wait until the object
# is uploaded. We don't have to worry about deletion, because
# as long as the current cache entry exists, there will always be
@@ -720,7 +700,7 @@
def with_lock_released():
with self.backend_pool() as backend:
- backend.perform_read(do_read, 's3ql_data_%d' % obj_id)
+ backend.readinto_fh('s3ql_data_%d' % obj_id, tmpfh)
await trio.to_thread.run_sync(with_lock_released)
Only in ~/s3ql-5.0.0/src/s3ql: sqlite3ext.cpp
Only in ~/s3ql-5.0.0/src: s3ql.egg-info
diff --color -ru ~/s3ql-release-5.0.0-pre1/tests/t1_backends.py ~/s3ql-5.0.0/tests/t1_backends.py
--- ~/s3ql-release-5.0.0-pre1/tests/t1_backends.py 2023-05-17 10:39:01.000000000 +0200
+++ ~/s3ql-5.0.0/tests/t1_backends.py 2023-05-17 11:14:50.000000000 +0200
@@ -23,6 +23,7 @@
import threading
import time
from argparse import Namespace
+from io import BytesIO
import mock_server
import pytest
@@ -33,7 +34,7 @@
import s3ql.backends.common
from s3ql import BUFSIZE, backends
-from s3ql.backends.common import CorruptedObjectError, NoSuchObject
+from s3ql.backends.common import AbstractBackend, CorruptedObjectError, NoSuchObject
from s3ql.backends.comprenc import ComprencBackend, ObjectNotEncrypted
from s3ql.backends.gs import Backend as GSBackend
from s3ql.backends.local import Backend as LocalBackend
@@ -400,6 +401,28 @@
assert metadata == metadata2
assert lookup_object(backend, key) == metadata
+
+@pytest.mark.with_backend('*/*')
+def test_readinto_write_fh(backend: AbstractBackend):
+ key = newname()
+ value = newvalue()
+ metadata = {'jimmy': 'jups@42'}
+ buf = BytesIO()
+
+ assert key not in backend
+ assert_raises(NoSuchObject, backend.lookup, key)
+ assert_raises(NoSuchObject, backend.readinto_fh, key, buf)
+
+ backend.write_fh(key, BytesIO(value), metadata)
+
+ assert key in backend
+
+ metadata2 = backend.readinto_fh(key, buf)
+
+ assert value == buf.getvalue()
+ assert metadata == metadata2
+ assert lookup_object(backend, key) == metadata
+
@pytest.mark.with_backend('swift/raw')
def test_issue114(backend, monkeypatch):