cherrypick
This commit is contained in:
parent
313d03b75b
commit
bb4927470a
3 changed files with 292 additions and 5 deletions
|
@ -84,7 +84,7 @@ class BaseStorage(StoragePaths):
|
||||||
|
|
||||||
def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
|
def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
|
||||||
""" Copy the specified number of bytes from the input file stream to the output stream. If
|
""" Copy the specified number of bytes from the input file stream to the output stream. If
|
||||||
num_bytes < 0 copy until the stream ends.
|
num_bytes < 0 copy until the stream ends. Returns the number of bytes copied.
|
||||||
"""
|
"""
|
||||||
bytes_copied = 0
|
bytes_copied = 0
|
||||||
while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
|
while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
|
||||||
|
|
|
@ -182,12 +182,28 @@ class _CloudStorage(BaseStorageV2):
|
||||||
**self._upload_params)
|
**self._upload_params)
|
||||||
|
|
||||||
def stream_write(self, path, fp, content_type=None, content_encoding=None):
|
def stream_write(self, path, fp, content_type=None, content_encoding=None):
|
||||||
self._stream_write_internal(path, fp, content_type, content_encoding)
|
""" Writes the data found in the file-like stream tot he given path. Raises an IOError
|
||||||
|
if the write fails.
|
||||||
|
"""
|
||||||
|
_, write_error = self._stream_write_internal(path, fp, content_type, content_encoding)
|
||||||
|
if write_error is not None:
|
||||||
|
logger.error('Error when trying to stream_write path `%s`: %s', path, write_error)
|
||||||
|
raise IOError('Exception when trying to stream_write path')
|
||||||
|
|
||||||
def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
|
def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
|
||||||
cancel_on_error=True, size=filelike.READ_UNTIL_END):
|
cancel_on_error=True, size=filelike.READ_UNTIL_END):
|
||||||
|
""" Writes the data found in the file-like stream to the given path, with optional limit
|
||||||
|
on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
|
||||||
|
*not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
|
||||||
|
the returned tuple on calls to this method.
|
||||||
|
"""
|
||||||
write_error = None
|
write_error = None
|
||||||
mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
|
|
||||||
|
try:
|
||||||
|
mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
|
||||||
|
except S3ResponseError as e:
|
||||||
|
logger.exception('Exception when initiating multipart upload')
|
||||||
|
return 0, e
|
||||||
|
|
||||||
# We are going to reuse this but be VERY careful to only read the number of bytes written to it
|
# We are going to reuse this but be VERY careful to only read the number of bytes written to it
|
||||||
buf = StringIO.StringIO()
|
buf = StringIO.StringIO()
|
||||||
|
@ -211,7 +227,7 @@ class _CloudStorage(BaseStorageV2):
|
||||||
mp.upload_part_from_file(buf, num_part, size=bytes_staged)
|
mp.upload_part_from_file(buf, num_part, size=bytes_staged)
|
||||||
total_bytes_written += bytes_staged
|
total_bytes_written += bytes_staged
|
||||||
num_part += 1
|
num_part += 1
|
||||||
except IOError as e:
|
except (S3ResponseError, IOError) as e:
|
||||||
logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
|
logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
|
||||||
write_error = e
|
write_error = e
|
||||||
|
|
||||||
|
@ -219,7 +235,11 @@ class _CloudStorage(BaseStorageV2):
|
||||||
self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
|
self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
|
||||||
|
|
||||||
if cancel_on_error:
|
if cancel_on_error:
|
||||||
mp.cancel_upload()
|
try:
|
||||||
|
mp.cancel_upload()
|
||||||
|
except (S3ResponseError, IOError):
|
||||||
|
logger.exception('Could not cancel upload')
|
||||||
|
|
||||||
return 0, write_error
|
return 0, write_error
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
@ -263,6 +283,7 @@ class _CloudStorage(BaseStorageV2):
|
||||||
return k.etag[1:-1][:7]
|
return k.etag[1:-1][:7]
|
||||||
|
|
||||||
def copy_to(self, destination, path):
|
def copy_to(self, destination, path):
|
||||||
|
""" Copies the given path from this storage to the destination storage. """
|
||||||
self._initialize_cloud_conn()
|
self._initialize_cloud_conn()
|
||||||
|
|
||||||
# First try to copy directly via boto, but only if the storages are the
|
# First try to copy directly via boto, but only if the storages are the
|
||||||
|
@ -527,6 +548,11 @@ class GoogleCloudStorage(_CloudStorage):
|
||||||
|
|
||||||
def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
|
def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
|
||||||
cancel_on_error=True, size=filelike.READ_UNTIL_END):
|
cancel_on_error=True, size=filelike.READ_UNTIL_END):
|
||||||
|
""" Writes the data found in the file-like stream to the given path, with optional limit
|
||||||
|
on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
|
||||||
|
*not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
|
||||||
|
the returned tuple on calls to this method.
|
||||||
|
"""
|
||||||
# Minimum size of upload part size on S3 is 5MB
|
# Minimum size of upload part size on S3 is 5MB
|
||||||
self._initialize_cloud_conn()
|
self._initialize_cloud_conn()
|
||||||
path = self._init_path(path)
|
path = self._init_path(path)
|
||||||
|
|
261
storage/test/test_cloud_storage.py
Normal file
261
storage/test/test_cloud_storage.py
Normal file
|
@ -0,0 +1,261 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
from StringIO import StringIO
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import moto
|
||||||
|
import boto
|
||||||
|
|
||||||
|
from moto import mock_s3
|
||||||
|
|
||||||
|
from storage import S3Storage, StorageContext
|
||||||
|
from storage.cloud import _CloudStorage, _PartUploadMetadata
|
||||||
|
from storage.cloud import _CHUNKS_KEY
|
||||||
|
|
||||||
|
_TEST_CONTENT = os.urandom(1024)
|
||||||
|
_TEST_BUCKET = 'some_bucket'
|
||||||
|
_TEST_USER = 'someuser'
|
||||||
|
_TEST_PASSWORD = 'somepassword'
|
||||||
|
_TEST_PATH = 'some/cool/path'
|
||||||
|
_TEST_CONTEXT = StorageContext('nyc', None, None, None, None)
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def storage_engine():
|
||||||
|
mock = moto.mock_s3()
|
||||||
|
mock.start()
|
||||||
|
|
||||||
|
# Create a test bucket and put some test content.
|
||||||
|
boto.connect_s3().create_bucket(_TEST_BUCKET)
|
||||||
|
engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
|
||||||
|
engine.put_content(_TEST_PATH, _TEST_CONTENT)
|
||||||
|
|
||||||
|
yield engine
|
||||||
|
|
||||||
|
mock.stop()
|
||||||
|
|
||||||
|
|
||||||
|
def test_basicop(storage_engine):
|
||||||
|
# Ensure the content exists.
|
||||||
|
assert storage_engine.exists(_TEST_PATH)
|
||||||
|
|
||||||
|
# Verify it can be retrieved.
|
||||||
|
assert storage_engine.get_content(_TEST_PATH) == _TEST_CONTENT
|
||||||
|
|
||||||
|
# Retrieve a checksum for the content.
|
||||||
|
storage_engine.get_checksum(_TEST_PATH)
|
||||||
|
|
||||||
|
# Remove the file.
|
||||||
|
storage_engine.remove(_TEST_PATH)
|
||||||
|
|
||||||
|
# Ensure it no longer exists.
|
||||||
|
with pytest.raises(IOError):
|
||||||
|
storage_engine.get_content(_TEST_PATH)
|
||||||
|
|
||||||
|
with pytest.raises(IOError):
|
||||||
|
storage_engine.get_checksum(_TEST_PATH)
|
||||||
|
|
||||||
|
assert not storage_engine.exists(_TEST_PATH)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('bucket, username, password', [
|
||||||
|
pytest.param(_TEST_BUCKET, _TEST_USER, _TEST_PASSWORD, id='same credentials'),
|
||||||
|
pytest.param('another_bucket', 'blech', 'password', id='different credentials'),
|
||||||
|
])
|
||||||
|
def test_copy(bucket, username, password, storage_engine):
|
||||||
|
# Copy the content to another engine.
|
||||||
|
another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER,
|
||||||
|
_TEST_PASSWORD)
|
||||||
|
boto.connect_s3().create_bucket('another_bucket')
|
||||||
|
storage_engine.copy_to(another_engine, _TEST_PATH)
|
||||||
|
|
||||||
|
# Verify it can be retrieved.
|
||||||
|
assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT
|
||||||
|
|
||||||
|
|
||||||
|
def test_copy_with_error(storage_engine):
|
||||||
|
another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'anotherbucket', 'foo',
|
||||||
|
'bar')
|
||||||
|
|
||||||
|
with pytest.raises(IOError):
|
||||||
|
storage_engine.copy_to(another_engine, _TEST_PATH)
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_read(storage_engine):
|
||||||
|
# Read the streaming content.
|
||||||
|
data = ''.join(storage_engine.stream_read(_TEST_PATH))
|
||||||
|
assert data == _TEST_CONTENT
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_read_file(storage_engine):
|
||||||
|
with storage_engine.stream_read_file(_TEST_PATH) as f:
|
||||||
|
assert f.read() == _TEST_CONTENT
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_write(storage_engine):
|
||||||
|
new_data = os.urandom(4096)
|
||||||
|
storage_engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type')
|
||||||
|
assert storage_engine.get_content(_TEST_PATH) == new_data
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_write_error():
|
||||||
|
with mock_s3():
|
||||||
|
# Create an engine but not the bucket.
|
||||||
|
engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
|
||||||
|
|
||||||
|
# Attempt to write to the uncreated bucket, which should raise an error.
|
||||||
|
with pytest.raises(IOError):
|
||||||
|
engine.stream_write(_TEST_PATH, StringIO('hello world'), content_type='Cool/Type')
|
||||||
|
|
||||||
|
assert not engine.exists(_TEST_PATH)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('chunk_count', [
|
||||||
|
0,
|
||||||
|
1,
|
||||||
|
50,
|
||||||
|
])
|
||||||
|
@pytest.mark.parametrize('force_client_side', [
|
||||||
|
False,
|
||||||
|
True
|
||||||
|
])
|
||||||
|
def test_chunk_upload(storage_engine, chunk_count, force_client_side):
|
||||||
|
if chunk_count == 0 and force_client_side:
|
||||||
|
return
|
||||||
|
|
||||||
|
upload_id, metadata = storage_engine.initiate_chunked_upload()
|
||||||
|
final_data = ''
|
||||||
|
|
||||||
|
for index in range(0, chunk_count):
|
||||||
|
chunk_data = os.urandom(1024)
|
||||||
|
final_data = final_data + chunk_data
|
||||||
|
bytes_written, new_metadata, error = storage_engine.stream_upload_chunk(upload_id, 0,
|
||||||
|
len(chunk_data),
|
||||||
|
StringIO(chunk_data),
|
||||||
|
metadata)
|
||||||
|
metadata = new_metadata
|
||||||
|
|
||||||
|
assert bytes_written == len(chunk_data)
|
||||||
|
assert error is None
|
||||||
|
assert len(metadata[_CHUNKS_KEY]) == index + 1
|
||||||
|
|
||||||
|
# Complete the chunked upload.
|
||||||
|
storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', metadata,
|
||||||
|
force_client_side=force_client_side)
|
||||||
|
|
||||||
|
# Ensure the file contents are valid.
|
||||||
|
assert storage_engine.get_content('some/chunked/path') == final_data
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('chunk_count', [
|
||||||
|
0,
|
||||||
|
1,
|
||||||
|
50,
|
||||||
|
])
|
||||||
|
def test_cancel_chunked_upload(storage_engine, chunk_count):
|
||||||
|
upload_id, metadata = storage_engine.initiate_chunked_upload()
|
||||||
|
|
||||||
|
for _ in range(0, chunk_count):
|
||||||
|
chunk_data = os.urandom(1024)
|
||||||
|
_, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
|
||||||
|
len(chunk_data),
|
||||||
|
StringIO(chunk_data),
|
||||||
|
metadata)
|
||||||
|
metadata = new_metadata
|
||||||
|
|
||||||
|
# Cancel the upload.
|
||||||
|
storage_engine.cancel_chunked_upload(upload_id, metadata)
|
||||||
|
|
||||||
|
# Ensure all chunks were deleted.
|
||||||
|
for chunk in metadata[_CHUNKS_KEY]:
|
||||||
|
assert not storage_engine.exists(chunk.path)
|
||||||
|
|
||||||
|
|
||||||
|
def test_large_chunks_upload(storage_engine):
|
||||||
|
# Make the max chunk size much smaller for testing.
|
||||||
|
storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2
|
||||||
|
|
||||||
|
upload_id, metadata = storage_engine.initiate_chunked_upload()
|
||||||
|
|
||||||
|
# Write a "super large" chunk, to ensure that it is broken into smaller chunks.
|
||||||
|
chunk_data = os.urandom(int(storage_engine.maximum_chunk_size * 2.5))
|
||||||
|
bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
|
||||||
|
-1,
|
||||||
|
StringIO(chunk_data),
|
||||||
|
metadata)
|
||||||
|
assert len(chunk_data) == bytes_written
|
||||||
|
|
||||||
|
# Complete the chunked upload.
|
||||||
|
storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
|
||||||
|
|
||||||
|
# Ensure the file contents are valid.
|
||||||
|
assert len(chunk_data) == len(storage_engine.get_content('some/chunked/path'))
|
||||||
|
assert storage_engine.get_content('some/chunked/path') == chunk_data
|
||||||
|
|
||||||
|
|
||||||
|
def test_large_chunks_with_ragged_edge(storage_engine):
|
||||||
|
# Make the max chunk size much smaller for testing and force it to have a ragged edge.
|
||||||
|
storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2 + 10
|
||||||
|
|
||||||
|
upload_id, metadata = storage_engine.initiate_chunked_upload()
|
||||||
|
|
||||||
|
# Write a few "super large" chunks, to ensure that it is broken into smaller chunks.
|
||||||
|
all_data = ''
|
||||||
|
for _ in range(0, 2):
|
||||||
|
chunk_data = os.urandom(int(storage_engine.maximum_chunk_size) + 20)
|
||||||
|
bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
|
||||||
|
-1,
|
||||||
|
StringIO(chunk_data),
|
||||||
|
metadata)
|
||||||
|
assert len(chunk_data) == bytes_written
|
||||||
|
all_data = all_data + chunk_data
|
||||||
|
metadata = new_metadata
|
||||||
|
|
||||||
|
# Complete the chunked upload.
|
||||||
|
storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
|
||||||
|
|
||||||
|
# Ensure the file contents are valid.
|
||||||
|
assert len(all_data) == len(storage_engine.get_content('some/chunked/path'))
|
||||||
|
assert storage_engine.get_content('some/chunked/path') == all_data
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('max_size, parts', [
|
||||||
|
(50, [
|
||||||
|
_PartUploadMetadata('foo', 0, 50),
|
||||||
|
_PartUploadMetadata('foo', 50, 50),
|
||||||
|
]),
|
||||||
|
|
||||||
|
(40, [
|
||||||
|
_PartUploadMetadata('foo', 0, 25),
|
||||||
|
_PartUploadMetadata('foo', 25, 25),
|
||||||
|
_PartUploadMetadata('foo', 50, 25),
|
||||||
|
_PartUploadMetadata('foo', 75, 25)
|
||||||
|
]),
|
||||||
|
|
||||||
|
(51, [
|
||||||
|
_PartUploadMetadata('foo', 0, 50),
|
||||||
|
_PartUploadMetadata('foo', 50, 50),
|
||||||
|
]),
|
||||||
|
|
||||||
|
(49, [
|
||||||
|
_PartUploadMetadata('foo', 0, 25),
|
||||||
|
_PartUploadMetadata('foo', 25, 25),
|
||||||
|
_PartUploadMetadata('foo', 50, 25),
|
||||||
|
_PartUploadMetadata('foo', 75, 25),
|
||||||
|
]),
|
||||||
|
|
||||||
|
(99, [
|
||||||
|
_PartUploadMetadata('foo', 0, 50),
|
||||||
|
_PartUploadMetadata('foo', 50, 50),
|
||||||
|
]),
|
||||||
|
|
||||||
|
(100, [
|
||||||
|
_PartUploadMetadata('foo', 0, 100),
|
||||||
|
]),
|
||||||
|
])
|
||||||
|
def test_rechunked(max_size, parts):
|
||||||
|
chunk = _PartUploadMetadata('foo', 0, 100)
|
||||||
|
rechunked = list(_CloudStorage._rechunk(chunk, max_size))
|
||||||
|
assert len(rechunked) == len(parts)
|
||||||
|
for index, chunk in enumerate(rechunked):
|
||||||
|
assert chunk == parts[index]
|
Reference in a new issue