From 3a0adfcb11bc3a24c07e7acfe3dcf732086c7704 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Wed, 31 Oct 2018 12:46:49 -0400
Subject: [PATCH] Fix stream_write to properly raise an exception on failure,
 instead of just silently failing

This was causing problems for customers using georeplication over unstable
storage engines.

Also adds tests for stream_write and copy, to ensure we detect failure.
---
 storage/basestorage.py             |  2 +-
 storage/cloud.py                   | 34 ++++++++++++++++++++++---
 storage/test/test_cloud_storage.py | 41 ++++++++++++++++++++----------
 3 files changed, 59 insertions(+), 18 deletions(-)

diff --git a/storage/basestorage.py b/storage/basestorage.py
index 1ec0cfb77..6cc4e96ac 100644
--- a/storage/basestorage.py
+++ b/storage/basestorage.py
@@ -84,7 +84,7 @@ class BaseStorage(StoragePaths):
 
   def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
     """ Copy the specified number of bytes from the input file stream to the output stream. If
-        num_bytes < 0 copy until the stream ends.
+        num_bytes < 0 copy until the stream ends. Returns the number of bytes copied.
     """
     bytes_copied = 0
     while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
diff --git a/storage/cloud.py b/storage/cloud.py
index ed2f6305f..561ddd88f 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -182,12 +182,28 @@ class _CloudStorage(BaseStorageV2):
                                                       **self._upload_params)
 
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
-    self._stream_write_internal(path, fp, content_type, content_encoding)
+    """ Writes the data found in the file-like stream to the given path. Raises an IOError
+        if the write fails.
+    """
+    _, write_error = self._stream_write_internal(path, fp, content_type, content_encoding)
+    if write_error is not None:
+      logger.error('Error when trying to stream_write path `%s`: %s', path, write_error)
+      raise IOError('Exception when trying to stream_write path')
 
   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    """ Writes the data found in the file-like stream to the given path, with optional limit
+        on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
+        *not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
+        the returned tuple on calls to this method.
+    """
     write_error = None
-    mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+
+    try:
+      mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+    except S3ResponseError as e:
+      logger.exception('Exception when initiating multipart upload')
+      return 0, e
 
     # We are going to reuse this but be VERY careful to only read the number of bytes written to it
     buf = StringIO.StringIO()
@@ -211,7 +227,7 @@ class _CloudStorage(BaseStorageV2):
         mp.upload_part_from_file(buf, num_part, size=bytes_staged)
         total_bytes_written += bytes_staged
         num_part += 1
-      except IOError as e:
+      except (S3ResponseError, IOError) as e:
         logger.warn('Error when writing to stream in stream_write_internal at path %s: %s',
                     path, e)
         write_error = e
@@ -219,7 +235,11 @@ class _CloudStorage(BaseStorageV2):
           self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
 
         if cancel_on_error:
-          mp.cancel_upload()
+          try:
+            mp.cancel_upload()
+          except (S3ResponseError, IOError):
+            logger.exception('Could not cancel upload')
+
           return 0, write_error
         else:
           break
@@ -263,6 +283,7 @@ class _CloudStorage(BaseStorageV2):
     return k.etag[1:-1][:7]
 
   def copy_to(self, destination, path):
+    """ Copies the given path from this storage to the destination storage. """
     self._initialize_cloud_conn()
 
     # First try to copy directly via boto, but only if the storages are the
@@ -527,6 +548,11 @@ class GoogleCloudStorage(_CloudStorage):
 
   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    """ Writes the data found in the file-like stream to the given path, with optional limit
+        on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
+        *not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
+        the returned tuple on calls to this method.
+    """
     # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     path = self._init_path(path)
diff --git a/storage/test/test_cloud_storage.py b/storage/test/test_cloud_storage.py
index a57677c89..9db320161 100644
--- a/storage/test/test_cloud_storage.py
+++ b/storage/test/test_cloud_storage.py
@@ -7,6 +7,8 @@ import pytest
 import moto
 import boto
 
+from moto import mock_s3
+
 from storage import S3Storage, StorageContext
 from storage.cloud import _CloudStorage, _PartUploadMetadata
 from storage.cloud import _CHUNKS_KEY
@@ -56,28 +58,29 @@ def test_basicop(storage_engine):
   assert not storage_engine.exists(_TEST_PATH)
 
 
-def test_copy_samecreds(storage_engine):
+@pytest.mark.parametrize('bucket, username, password', [
+  pytest.param(_TEST_BUCKET, _TEST_USER, _TEST_PASSWORD, id='same credentials'),
+  pytest.param('another_bucket', 'blech', 'password', id='different credentials'),
+])
+def test_copy(bucket, username, password, storage_engine):
   # Copy the content to another engine.
   another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER,
                              _TEST_PASSWORD)
-  storage_engine.copy_to(another_engine, _TEST_PATH)
-
-  # Verify it can be retrieved.
-  assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT
-
-
-def test_copy_differentcreds(storage_engine):
-  # Copy the content to another engine.
-  another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech',
-                             'password')
   boto.connect_s3().create_bucket('another_bucket')
-
   storage_engine.copy_to(another_engine, _TEST_PATH)
 
   # Verify it can be retrieved.
   assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT
 
 
+def test_copy_with_error(storage_engine):
+  another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'anotherbucket', 'foo',
+                             'bar')
+
+  with pytest.raises(IOError):
+    storage_engine.copy_to(another_engine, _TEST_PATH)
+
+
 def test_stream_read(storage_engine):
   # Read the streaming content.
   data = ''.join(storage_engine.stream_read(_TEST_PATH))
@@ -95,6 +98,18 @@ def test_stream_write(storage_engine):
   assert storage_engine.get_content(_TEST_PATH) == new_data
 
 
+def test_stream_write_error():
+  with mock_s3():
+    # Create an engine but not the bucket.
+    engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
+
+    # Attempt to write to the uncreated bucket, which should raise an error.
+    with pytest.raises(IOError):
+      engine.stream_write(_TEST_PATH, StringIO('hello world'), content_type='Cool/Type')
+
+    assert not engine.exists(_TEST_PATH)
+
+
 @pytest.mark.parametrize('chunk_count', [
   0,
   1,
@@ -107,7 +122,7 @@ def test_stream_write(storage_engine):
 def test_chunk_upload(storage_engine, chunk_count, force_client_side):
   if chunk_count == 0 and force_client_side:
     return
-  
+
   upload_id, metadata = storage_engine.initiate_chunked_upload()
   final_data = ''
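
For illustration, a minimal sketch (not part of the patch) of the calling conventions the new
docstrings describe: stream_write now surfaces upload failures as an IOError, while
_stream_write_internal reports them only through its returned (bytes_written, write_error)
tuple. Here `engine` stands for any configured S3Storage instance and `handle_failed_write`
is a hypothetical error handler:

    from StringIO import StringIO

    # stream_write raises IOError on failure instead of returning silently.
    try:
      engine.stream_write('some/path', StringIO('payload'), content_type='text/plain')
    except IOError:
      handle_failed_write()  # hypothetical; e.g. requeue the georeplication task

    # _stream_write_internal never raises; always check the returned tuple.
    bytes_written, write_error = engine._stream_write_internal('some/path', StringIO('payload'))
    if write_error is not None:
      handle_failed_write()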