From bb4927470a11d3a3aaca01c7c65018ee3451ffa5 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Wed, 31 Oct 2018 12:46:49 -0400
Subject: [PATCH] cherrypick

---
 storage/basestorage.py             |   2 +-
 storage/cloud.py                   |  34 +++-
 storage/test/test_cloud_storage.py | 261 +++++++++++++++++++++++++++++
 3 files changed, 292 insertions(+), 5 deletions(-)
 create mode 100644 storage/test/test_cloud_storage.py

diff --git a/storage/basestorage.py b/storage/basestorage.py
index 1ec0cfb77..6cc4e96ac 100644
--- a/storage/basestorage.py
+++ b/storage/basestorage.py
@@ -84,7 +84,7 @@ class BaseStorage(StoragePaths):
 
   def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
     """ Copy the specified number of bytes from the input file stream to the output stream. If
-        num_bytes < 0 copy until the stream ends.
+        num_bytes < 0 copy until the stream ends. Returns the number of bytes copied.
     """
     bytes_copied = 0
     while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
diff --git a/storage/cloud.py b/storage/cloud.py
index a04f50ed6..87964259d 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -182,12 +182,28 @@ class _CloudStorage(BaseStorageV2):
                                                         **self._upload_params)
 
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
-    self._stream_write_internal(path, fp, content_type, content_encoding)
+    """ Writes the data found in the file-like stream to the given path. Raises an IOError
+        if the write fails.
+    """
+    _, write_error = self._stream_write_internal(path, fp, content_type, content_encoding)
+    if write_error is not None:
+      logger.error('Error when trying to stream_write path `%s`: %s', path, write_error)
+      raise IOError('Exception when trying to stream_write path')
 
   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    """ Writes the data found in the file-like stream to the given path, with optional limit
+        on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
+        *not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
+        the returned tuple on calls to this method.
+    """
     write_error = None
-    mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+
+    try:
+      mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+    except S3ResponseError as e:
+      logger.exception('Exception when initiating multipart upload')
+      return 0, e
 
     # We are going to reuse this but be VERY careful to only read the number of bytes written to it
     buf = StringIO.StringIO()
@@ -211,7 +227,7 @@ class _CloudStorage(BaseStorageV2):
         mp.upload_part_from_file(buf, num_part, size=bytes_staged)
         total_bytes_written += bytes_staged
         num_part += 1
-      except IOError as e:
+      except (S3ResponseError, IOError) as e:
         logger.warn('Error when writing to stream in stream_write_internal at path %s: %s',
                     path, e)
         write_error = e
@@ -219,7 +235,11 @@ class _CloudStorage(BaseStorageV2):
           self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
 
         if cancel_on_error:
-          mp.cancel_upload()
+          try:
+            mp.cancel_upload()
+          except (S3ResponseError, IOError):
+            logger.exception('Could not cancel upload')
+
           return 0, write_error
         else:
           break
@@ -263,6 +283,7 @@ class _CloudStorage(BaseStorageV2):
     return k.etag[1:-1][:7]
 
   def copy_to(self, destination, path):
+    """ Copies the given path from this storage to the destination storage. """
""" self._initialize_cloud_conn() # First try to copy directly via boto, but only if the storages are the @@ -527,6 +548,11 @@ class GoogleCloudStorage(_CloudStorage): def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None, cancel_on_error=True, size=filelike.READ_UNTIL_END): + """ Writes the data found in the file-like stream to the given path, with optional limit + on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should + *not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check + the returned tuple on calls to this method. + """ # Minimum size of upload part size on S3 is 5MB self._initialize_cloud_conn() path = self._init_path(path) diff --git a/storage/test/test_cloud_storage.py b/storage/test/test_cloud_storage.py new file mode 100644 index 000000000..9db320161 --- /dev/null +++ b/storage/test/test_cloud_storage.py @@ -0,0 +1,261 @@ +import os + +from StringIO import StringIO + +import pytest + +import moto +import boto + +from moto import mock_s3 + +from storage import S3Storage, StorageContext +from storage.cloud import _CloudStorage, _PartUploadMetadata +from storage.cloud import _CHUNKS_KEY + +_TEST_CONTENT = os.urandom(1024) +_TEST_BUCKET = 'some_bucket' +_TEST_USER = 'someuser' +_TEST_PASSWORD = 'somepassword' +_TEST_PATH = 'some/cool/path' +_TEST_CONTEXT = StorageContext('nyc', None, None, None, None) + +@pytest.fixture() +def storage_engine(): + mock = moto.mock_s3() + mock.start() + + # Create a test bucket and put some test content. + boto.connect_s3().create_bucket(_TEST_BUCKET) + engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD) + engine.put_content(_TEST_PATH, _TEST_CONTENT) + + yield engine + + mock.stop() + + +def test_basicop(storage_engine): + # Ensure the content exists. + assert storage_engine.exists(_TEST_PATH) + + # Verify it can be retrieved. + assert storage_engine.get_content(_TEST_PATH) == _TEST_CONTENT + + # Retrieve a checksum for the content. + storage_engine.get_checksum(_TEST_PATH) + + # Remove the file. + storage_engine.remove(_TEST_PATH) + + # Ensure it no longer exists. + with pytest.raises(IOError): + storage_engine.get_content(_TEST_PATH) + + with pytest.raises(IOError): + storage_engine.get_checksum(_TEST_PATH) + + assert not storage_engine.exists(_TEST_PATH) + + +@pytest.mark.parametrize('bucket, username, password', [ + pytest.param(_TEST_BUCKET, _TEST_USER, _TEST_PASSWORD, id='same credentials'), + pytest.param('another_bucket', 'blech', 'password', id='different credentials'), +]) +def test_copy(bucket, username, password, storage_engine): + # Copy the content to another engine. + another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER, + _TEST_PASSWORD) + boto.connect_s3().create_bucket('another_bucket') + storage_engine.copy_to(another_engine, _TEST_PATH) + + # Verify it can be retrieved. + assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT + + +def test_copy_with_error(storage_engine): + another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'anotherbucket', 'foo', + 'bar') + + with pytest.raises(IOError): + storage_engine.copy_to(another_engine, _TEST_PATH) + + +def test_stream_read(storage_engine): + # Read the streaming content. 
+  data = ''.join(storage_engine.stream_read(_TEST_PATH))
+  assert data == _TEST_CONTENT
+
+
+def test_stream_read_file(storage_engine):
+  with storage_engine.stream_read_file(_TEST_PATH) as f:
+    assert f.read() == _TEST_CONTENT
+
+
+def test_stream_write(storage_engine):
+  new_data = os.urandom(4096)
+  storage_engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type')
+  assert storage_engine.get_content(_TEST_PATH) == new_data
+
+
+def test_stream_write_error():
+  with mock_s3():
+    # Create an engine but not the bucket.
+    engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
+
+    # Attempt to write to the uncreated bucket, which should raise an error.
+    with pytest.raises(IOError):
+      engine.stream_write(_TEST_PATH, StringIO('hello world'), content_type='Cool/Type')
+
+    assert not engine.exists(_TEST_PATH)
+
+
+@pytest.mark.parametrize('chunk_count', [
+  0,
+  1,
+  50,
+])
+@pytest.mark.parametrize('force_client_side', [
+  False,
+  True
+])
+def test_chunk_upload(storage_engine, chunk_count, force_client_side):
+  if chunk_count == 0 and force_client_side:
+    return
+
+  upload_id, metadata = storage_engine.initiate_chunked_upload()
+  final_data = ''
+
+  for index in range(0, chunk_count):
+    chunk_data = os.urandom(1024)
+    final_data = final_data + chunk_data
+    bytes_written, new_metadata, error = storage_engine.stream_upload_chunk(upload_id, 0,
+                                                                            len(chunk_data),
+                                                                            StringIO(chunk_data),
+                                                                            metadata)
+    metadata = new_metadata
+
+    assert bytes_written == len(chunk_data)
+    assert error is None
+    assert len(metadata[_CHUNKS_KEY]) == index + 1
+
+  # Complete the chunked upload.
+  storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', metadata,
+                                         force_client_side=force_client_side)
+
+  # Ensure the file contents are valid.
+  assert storage_engine.get_content('some/chunked/path') == final_data
+
+
+@pytest.mark.parametrize('chunk_count', [
+  0,
+  1,
+  50,
+])
+def test_cancel_chunked_upload(storage_engine, chunk_count):
+  upload_id, metadata = storage_engine.initiate_chunked_upload()
+
+  for _ in range(0, chunk_count):
+    chunk_data = os.urandom(1024)
+    _, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
+                                                            len(chunk_data),
+                                                            StringIO(chunk_data),
+                                                            metadata)
+    metadata = new_metadata
+
+  # Cancel the upload.
+  storage_engine.cancel_chunked_upload(upload_id, metadata)
+
+  # Ensure all chunks were deleted.
+  for chunk in metadata[_CHUNKS_KEY]:
+    assert not storage_engine.exists(chunk.path)
+
+
+def test_large_chunks_upload(storage_engine):
+  # Make the max chunk size much smaller for testing.
+  storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2
+
+  upload_id, metadata = storage_engine.initiate_chunked_upload()
+
+  # Write a "super large" chunk, to ensure that it is broken into smaller chunks.
+  chunk_data = os.urandom(int(storage_engine.maximum_chunk_size * 2.5))
+  bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
+                                                                      -1,
+                                                                      StringIO(chunk_data),
+                                                                      metadata)
+  assert len(chunk_data) == bytes_written
+
+  # Complete the chunked upload.
+  storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
+
+  # Ensure the file contents are valid.
+  assert len(chunk_data) == len(storage_engine.get_content('some/chunked/path'))
+  assert storage_engine.get_content('some/chunked/path') == chunk_data
+
+
+def test_large_chunks_with_ragged_edge(storage_engine):
+  # Make the max chunk size much smaller for testing and force it to have a ragged edge.
+  storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2 + 10
+
+  upload_id, metadata = storage_engine.initiate_chunked_upload()
+
+  # Write a few "super large" chunks, to ensure that it is broken into smaller chunks.
+  all_data = ''
+  for _ in range(0, 2):
+    chunk_data = os.urandom(int(storage_engine.maximum_chunk_size) + 20)
+    bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
+                                                                        -1,
+                                                                        StringIO(chunk_data),
+                                                                        metadata)
+    assert len(chunk_data) == bytes_written
+    all_data = all_data + chunk_data
+    metadata = new_metadata
+
+  # Complete the chunked upload.
+  storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
+
+  # Ensure the file contents are valid.
+  assert len(all_data) == len(storage_engine.get_content('some/chunked/path'))
+  assert storage_engine.get_content('some/chunked/path') == all_data
+
+
+@pytest.mark.parametrize('max_size, parts', [
+  (50, [
+    _PartUploadMetadata('foo', 0, 50),
+    _PartUploadMetadata('foo', 50, 50),
+  ]),
+
+  (40, [
+    _PartUploadMetadata('foo', 0, 25),
+    _PartUploadMetadata('foo', 25, 25),
+    _PartUploadMetadata('foo', 50, 25),
+    _PartUploadMetadata('foo', 75, 25)
+  ]),
+
+  (51, [
+    _PartUploadMetadata('foo', 0, 50),
+    _PartUploadMetadata('foo', 50, 50),
+  ]),
+
+  (49, [
+    _PartUploadMetadata('foo', 0, 25),
+    _PartUploadMetadata('foo', 25, 25),
+    _PartUploadMetadata('foo', 50, 25),
+    _PartUploadMetadata('foo', 75, 25),
+  ]),
+
+  (99, [
+    _PartUploadMetadata('foo', 0, 50),
+    _PartUploadMetadata('foo', 50, 50),
+  ]),
+
+  (100, [
+    _PartUploadMetadata('foo', 0, 100),
+  ]),
+])
+def test_rechunked(max_size, parts):
+  chunk = _PartUploadMetadata('foo', 0, 100)
+  rechunked = list(_CloudStorage._rechunk(chunk, max_size))
+  assert len(rechunked) == len(parts)
+
+  for index, chunk in enumerate(rechunked):
+    assert chunk == parts[index]
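
Usage sketch (not part of the commit) of the calling convention the new docstrings describe:
_stream_write_internal reports failures through its (bytes_written, write_error) return value
and should not raise, while the public stream_write turns a reported error into an IOError.
The engine is assumed to be an already-configured S3Storage such as the one built by the
storage_engine fixture above; the helper names are illustrative only.

    from StringIO import StringIO

    def write_checked(engine, path, data):
      # Internal API: failures come back in the returned tuple, so ALWAYS inspect it;
      # the method itself should not raise for upload problems.
      bytes_written, write_error = engine._stream_write_internal(path, StringIO(data))
      if write_error is not None:
        # The upload failed part-way through; do not assume the path now exists.
        return 0, write_error
      return bytes_written, None

    def write_or_raise(engine, path, data):
      # Public API: stream_write converts a reported write error into an IOError.
      engine.stream_write(path, StringIO(data))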