Merge pull request #1056 from coreos-inc/dont-hide-ioerror

Handle IOErrors in v2 uploads
This commit is contained in:
Silas Sewell 2015-12-14 14:46:46 -05:00
commit 881fd53714
6 changed files with 63 additions and 42 deletions

View file

@ -124,8 +124,10 @@ def start_blob_upload(namespace, repo_name):
return accepted return accepted
else: else:
# The user plans to send us the entire body right now # The user plans to send us the entire body right now
uploaded = _upload_chunk(namespace, repo_name, new_upload_uuid) uploaded, error = _upload_chunk(namespace, repo_name, new_upload_uuid)
uploaded.save() uploaded.save()
if error:
_range_not_satisfiable(uploaded.byte_count)
return _finish_upload(namespace, repo_name, uploaded, digest) return _finish_upload(namespace, repo_name, uploaded, digest)
@ -219,16 +221,16 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
input_fp = wrap_with_handler(input_fp, found.sha_state.update) input_fp = wrap_with_handler(input_fp, found.sha_state.update)
try: try:
length_written, new_metadata = storage.stream_upload_chunk(location_set, upload_uuid, length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
start_offset, length, input_fp, start_offset, length, input_fp,
found.storage_metadata, found.storage_metadata,
content_type=BLOB_CONTENT_TYPE) content_type=BLOB_CONTENT_TYPE)
except InvalidChunkException: except InvalidChunkException:
_range_not_satisfiable(found.byte_count) _range_not_satisfiable(found.byte_count)
found.storage_metadata = new_metadata found.storage_metadata = new_metadata
found.byte_count += length_written found.byte_count += length_written
return found return found, error
def _finish_upload(namespace, repo_name, upload_obj, expected_digest): def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
@ -272,9 +274,12 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
@require_repo_write @require_repo_write
@anon_protect @anon_protect
def upload_chunk(namespace, repo_name, upload_uuid): def upload_chunk(namespace, repo_name, upload_uuid):
upload = _upload_chunk(namespace, repo_name, upload_uuid) upload, error = _upload_chunk(namespace, repo_name, upload_uuid)
upload.save() upload.save()
if error:
_range_not_satisfiable(upload.byte_count)
accepted = make_response('', 204) accepted = make_response('', 204)
accepted.headers['Location'] = _current_request_path() accepted.headers['Location'] = _current_request_path()
accepted.headers['Range'] = _render_range(upload.byte_count, with_bytes_prefix=False) accepted.headers['Range'] = _render_range(upload.byte_count, with_bytes_prefix=False)
@ -291,7 +296,12 @@ def monolithic_upload_or_last_chunk(namespace, repo_name, upload_uuid):
if digest is None: if digest is None:
raise BlobUploadInvalid() raise BlobUploadInvalid()
found = _upload_chunk(namespace, repo_name, upload_uuid) found, error = _upload_chunk(namespace, repo_name, upload_uuid)
if error:
found.save()
_range_not_satisfiable(found.byte_count)
return _finish_upload(namespace, repo_name, found, digest) return _finish_upload(namespace, repo_name, found, digest)

View file

@ -100,15 +100,11 @@ class BaseStorage(StoragePaths):
if size_to_read < 0: if size_to_read < 0:
size_to_read = self.buffer_size size_to_read = self.buffer_size
try: buf = in_fp.read(size_to_read)
buf = in_fp.read(size_to_read) if not buf:
if not buf:
break
out_fp.write(buf)
bytes_copied += len(buf)
except IOError as err:
logger.error('Failed to stream_write_to_fp: %s', err)
break break
out_fp.write(buf)
bytes_copied += len(buf)
return bytes_copied return bytes_copied

View file

@ -166,17 +166,16 @@ class _CloudStorage(BaseStorageV2):
**self._upload_params) **self._upload_params)
def stream_write(self, path, fp, content_type=None, content_encoding=None): def stream_write(self, path, fp, content_type=None, content_encoding=None):
return self._stream_write_internal(path, fp, content_type, content_encoding) self._stream_write_internal(path, fp, content_type, content_encoding)
def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None, def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
cancel_on_error=True, size=filelike.READ_UNTIL_END): cancel_on_error=True, size=filelike.READ_UNTIL_END):
error = None
mp = self.__initiate_multipart_upload(path, content_type, content_encoding) mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
# We are going to reuse this but be VERY careful to only read the number of bytes written to it # We are going to reuse this but be VERY careful to only read the number of bytes written to it
buf = StringIO.StringIO() buf = StringIO.StringIO()
chunk_sizes = []
num_part = 1 num_part = 1
total_bytes_written = 0 total_bytes_written = 0
while size == filelike.READ_UNTIL_END or total_bytes_written < size: while size == filelike.READ_UNTIL_END or total_bytes_written < size:
@ -192,26 +191,24 @@ class _CloudStorage(BaseStorageV2):
if bytes_staged == 0: if bytes_staged == 0:
break break
chunk_sizes.append(bytes_staged)
buf.seek(0) buf.seek(0)
mp.upload_part_from_file(buf, num_part, size=bytes_staged) mp.upload_part_from_file(buf, num_part, size=bytes_staged)
total_bytes_written += bytes_staged total_bytes_written += bytes_staged
num_part += 1 num_part += 1
except IOError: except IOError as ex:
logger.warn('stream write error: %s', ex)
error = ex
app.metric_queue.put('MultipartUploadFailure', 1) app.metric_queue.put('MultipartUploadFailure', 1)
if cancel_on_error: if cancel_on_error:
mp.cancel_upload() mp.cancel_upload()
return 0 return 0, error
else:
break
if total_bytes_written > 0: if total_bytes_written > 0:
app.metric_queue.put('MultipartUploadSuccess', 1) app.metric_queue.put('MultipartUploadSuccess', 1)
try: mp.complete_upload()
mp.complete_upload() return total_bytes_written, error
except:
logger.error('complete_upload failed: chunk_sizes: %s', chunk_sizes)
raise
return total_bytes_written
def list_directory(self, path=None): def list_directory(self, path=None):
self._initialize_cloud_conn() self._initialize_cloud_conn()
@ -307,8 +304,8 @@ class _CloudStorage(BaseStorageV2):
# We are going to upload each chunk to a separate key # We are going to upload each chunk to a separate key
chunk_path = self._rel_upload_path(str(uuid4())) chunk_path = self._rel_upload_path(str(uuid4()))
bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False, bytes_written, error = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
size=length, content_type=content_type) size=length, content_type=content_type)
new_metadata = copy.deepcopy(storage_metadata) new_metadata = copy.deepcopy(storage_metadata)
@ -316,7 +313,7 @@ class _CloudStorage(BaseStorageV2):
if bytes_written > 0: if bytes_written > 0:
new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written)) new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written))
return bytes_written, new_metadata return bytes_written, new_metadata, error
def _chunk_generator(self, chunk_list): def _chunk_generator(self, chunk_list):
for chunk in chunk_list: for chunk in chunk_list:
@ -480,9 +477,12 @@ class GoogleCloudStorage(_CloudStorage):
fp = filelike.StreamSlice(fp, 0, size) fp = filelike.StreamSlice(fp, 0, size)
# TODO figure out how to handle cancel_on_error=False # TODO figure out how to handle cancel_on_error=False
key.set_contents_from_stream(fp) try:
key.set_contents_from_stream(fp)
except IOError as ex:
return 0, ex
return key.size return key.size, None
def complete_chunked_upload(self, uuid, final_path, storage_metadata): def complete_chunked_upload(self, uuid, final_path, storage_metadata):
self._initialize_cloud_conn() self._initialize_cloud_conn()

View file

@ -57,7 +57,10 @@ class FakeStorage(BaseStorageV2):
def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None): def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
upload_storage = _FAKE_STORAGE_MAP[uuid] upload_storage = _FAKE_STORAGE_MAP[uuid]
upload_storage.seek(offset) upload_storage.seek(offset)
return self.stream_write_to_fp(in_fp, upload_storage, length), {} try:
return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
except IOError as ex:
return 0, {}, ex
def complete_chunked_upload(self, uuid, final_path, _): def complete_chunked_upload(self, uuid, final_path, _):
_FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid] _FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid]

View file

@ -106,9 +106,12 @@ class LocalStorage(BaseStorageV2):
return new_uuid, {} return new_uuid, {}
def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None): def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage: try:
upload_storage.seek(offset) with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
return self.stream_write_to_fp(in_fp, upload_storage, length), {} upload_storage.seek(offset)
return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
except IOError as ex:
return 0, {}, ex
def complete_chunked_upload(self, uuid, final_path, _): def complete_chunked_upload(self, uuid, final_path, _):
content_path = self._rel_upload_path(uuid) content_path = self._rel_upload_path(uuid)

View file

@ -269,15 +269,22 @@ class SwiftStorage(BaseStorage):
return random_uuid, metadata return random_uuid, metadata
def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None): def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
error = None
if length == 0: if length == 0:
return 0, storage_metadata return 0, storage_metadata, error
# Note: Swift limits segments to a maximum of 5GB, so we keep writing segments until we # Note: Swift limits segments to a maximum of 5GB, so we keep writing segments until we
# are finished hitting the data limit. # are finished hitting the data limit.
total_bytes_written = 0 total_bytes_written = 0
while True: while True:
bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp, try:
storage_metadata, content_type) bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
storage_metadata, content_type)
except IOError as ex:
logger.warn('stream write error: %s', ex)
error = ex
break
if length != filelike.READ_UNTIL_END: if length != filelike.READ_UNTIL_END:
length = length - bytes_written length = length - bytes_written
@ -285,7 +292,9 @@ class SwiftStorage(BaseStorage):
offset = offset + bytes_written offset = offset + bytes_written
total_bytes_written = total_bytes_written + bytes_written total_bytes_written = total_bytes_written + bytes_written
if bytes_written == 0 or length <= 0: if bytes_written == 0 or length <= 0:
return total_bytes_written, storage_metadata return total_bytes_written, storage_metadata, error
return total_bytes_written, storage_metadata, error
def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata, content_type): def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata, content_type):
updated_metadata = copy.deepcopy(storage_metadata) updated_metadata = copy.deepcopy(storage_metadata)