Fix stream_write to properly raise an exception on failure, instead of just silently failing

This was causing problems for customers using georeplication over unstable storage engines

Also adds tests for stream_write and copy, to ensure we detect failure
Joseph Schorr 2018-10-31 12:46:49 -04:00
parent a048ff3633
commit 3a0adfcb11
3 changed files with 59 additions and 18 deletions
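
With this change, a failed write surfaces to callers as an IOError instead of returning silently. A minimal caller-side sketch (the helper and names below are illustrative, not part of this commit):

import logging

logger = logging.getLogger(__name__)

def replicate_blob(storage, path, fp):
  # Illustrative: push a blob and let the new IOError surface to the caller,
  # so georeplication workers can log and retry instead of assuming success.
  try:
    storage.stream_write(path, fp, content_type='application/octet-stream')
  except IOError:
    logger.exception('stream_write failed for path %s', path)
    raise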


@@ -84,7 +84,7 @@ class BaseStorage(StoragePaths):
 
   def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
     """ Copy the specified number of bytes from the input file stream to the output stream. If
-        num_bytes < 0 copy until the stream ends.
+        num_bytes < 0 copy until the stream ends. Returns the number of bytes copied.
     """
     bytes_copied = 0
     while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
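
Since stream_write_to_fp now documents its return value, callers can check how much data was actually copied. A small sketch under that contract (names are illustrative):

def copy_stream_to_file(engine, in_fp, out_path):
  # Illustrative: rely on the documented return value (bytes copied).
  with open(out_path, 'wb') as out_fp:
    bytes_copied = engine.stream_write_to_fp(in_fp, out_fp)
  return bytes_copied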


@@ -182,12 +182,28 @@ class _CloudStorage(BaseStorageV2):
                                              **self._upload_params)
 
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
-    self._stream_write_internal(path, fp, content_type, content_encoding)
+    """ Writes the data found in the file-like stream to the given path. Raises an IOError
+        if the write fails.
+    """
+    _, write_error = self._stream_write_internal(path, fp, content_type, content_encoding)
+    if write_error is not None:
+      logger.error('Error when trying to stream_write path `%s`: %s', path, write_error)
+      raise IOError('Exception when trying to stream_write path')
 
   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    """ Writes the data found in the file-like stream to the given path, with optional limit
+        on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
+        *not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
+        the returned tuple on calls to this method.
+    """
     write_error = None
-    mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+
+    try:
+      mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+    except S3ResponseError as e:
+      logger.exception('Exception when initiating multipart upload')
+      return 0, e
 
     # We are going to reuse this but be VERY careful to only read the number of bytes written to it
     buf = StringIO.StringIO()
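
The division of labor above: _stream_write_internal reports problems through a (bytes_written, write_error) tuple and never raises, while the public stream_write converts a non-None error into an IOError. A condensed sketch of that pattern, not the actual implementation:

def _write_internal(upload_fn, path, fp):
  # Sketch: never raises; report the outcome as (bytes_written, write_error).
  try:
    return upload_fn(path, fp), None
  except (IOError, OSError) as e:
    return 0, e

def write(upload_fn, path, fp):
  # Sketch: the public wrapper turns a reported error back into an IOError.
  _, write_error = _write_internal(upload_fn, path, fp)
  if write_error is not None:
    raise IOError('write failed for path `%s`' % path)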
@@ -211,7 +227,7 @@ class _CloudStorage(BaseStorageV2):
         mp.upload_part_from_file(buf, num_part, size=bytes_staged)
         total_bytes_written += bytes_staged
         num_part += 1
-      except IOError as e:
+      except (S3ResponseError, IOError) as e:
         logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
         write_error = e
 
@@ -219,7 +235,11 @@ class _CloudStorage(BaseStorageV2):
           self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
 
         if cancel_on_error:
-          mp.cancel_upload()
+          try:
+            mp.cancel_upload()
+          except (S3ResponseError, IOError):
+            logger.exception('Could not cancel upload')
+
           return 0, write_error
         else:
           break
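
Cancellation is now best-effort: if aborting the multipart upload also fails, that failure is logged but never masks the original write error. The general cleanup pattern, sketched with illustrative names:

import logging

logger = logging.getLogger(__name__)

def best_effort_cancel(upload):
  # Sketch: abort an in-flight upload without letting cleanup errors escape,
  # so the caller still receives the original write_error.
  try:
    upload.cancel_upload()
  except Exception:
    logger.exception('Could not cancel upload')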
@@ -263,6 +283,7 @@ class _CloudStorage(BaseStorageV2):
     return k.etag[1:-1][:7]
 
   def copy_to(self, destination, path):
+    """ Copies the given path from this storage to the destination storage. """
     self._initialize_cloud_conn()
 
     # First try to copy directly via boto, but only if the storages are the
@@ -527,6 +548,11 @@ class GoogleCloudStorage(_CloudStorage):
 
   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    """ Writes the data found in the file-like stream to the given path, with optional limit
+        on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
+        *not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
+        the returned tuple on calls to this method.
+    """
     # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     path = self._init_path(path)


@@ -7,6 +7,8 @@ import pytest
 import moto
 import boto
 
+from moto import mock_s3
+
 from storage import S3Storage, StorageContext
 from storage.cloud import _CloudStorage, _PartUploadMetadata
 from storage.cloud import _CHUNKS_KEY
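
The new mock_s3 import lets the failure test run against an in-memory S3 stand-in with no bucket provisioned. Roughly, and assuming the pinned moto release supports boto 2's connect_s3 (as the existing tests already rely on):

import boto
from moto import mock_s3

with mock_s3():
  conn = boto.connect_s3()
  # Buckets only exist if created inside the mock; the error test skips this
  # step on purpose so the write fails.
  conn.create_bucket('somebucket')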
@@ -56,28 +58,29 @@ def test_basicop(storage_engine):
   assert not storage_engine.exists(_TEST_PATH)
 
 
-def test_copy_samecreds(storage_engine):
+@pytest.mark.parametrize('bucket, username, password', [
+  pytest.param(_TEST_BUCKET, _TEST_USER, _TEST_PASSWORD, id='same credentials'),
+  pytest.param('another_bucket', 'blech', 'password', id='different credentials'),
+])
+def test_copy(bucket, username, password, storage_engine):
   # Copy the content to another engine.
   another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER,
                              _TEST_PASSWORD)
-  storage_engine.copy_to(another_engine, _TEST_PATH)
-
-  # Verify it can be retrieved.
-  assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT
-
-
-def test_copy_differentcreds(storage_engine):
-  # Copy the content to another engine.
-  another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech',
-                             'password')
   boto.connect_s3().create_bucket('another_bucket')
   storage_engine.copy_to(another_engine, _TEST_PATH)
 
   # Verify it can be retrieved.
   assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT
 
 
+def test_copy_with_error(storage_engine):
+  another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'anotherbucket', 'foo',
+                             'bar')
+
+  with pytest.raises(IOError):
+    storage_engine.copy_to(another_engine, _TEST_PATH)
+
+
 def test_stream_read(storage_engine):
   # Read the streaming content.
   data = ''.join(storage_engine.stream_read(_TEST_PATH))
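
The new test_copy_with_error case points copy_to at a destination with bad credentials and an uncreated bucket, expecting an IOError instead of a silent no-op. Caller-side, georeplication code could treat that the same way (illustrative sketch):

def replicate(source_engine, dest_engine, path):
  # Illustrative: treat IOError from copy_to as a replication failure.
  try:
    source_engine.copy_to(dest_engine, path)
    return True
  except IOError:
    return False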
@@ -95,6 +98,18 @@ def test_stream_write(storage_engine):
   assert storage_engine.get_content(_TEST_PATH) == new_data
 
 
+def test_stream_write_error():
+  with mock_s3():
+    # Create an engine but not the bucket.
+    engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
+
+    # Attempt to write to the uncreated bucket, which should raise an error.
+    with pytest.raises(IOError):
+      engine.stream_write(_TEST_PATH, StringIO('hello world'), content_type='Cool/Type')
+
+    assert not engine.exists(_TEST_PATH)
+
+
 @pytest.mark.parametrize('chunk_count', [
   0,
   1,
@@ -107,7 +122,7 @@ def test_stream_write(storage_engine):
 def test_chunk_upload(storage_engine, chunk_count, force_client_side):
   if chunk_count == 0 and force_client_side:
     return
 
   upload_id, metadata = storage_engine.initiate_chunked_upload()
   final_data = ''