Handle IOErrors in v2 uploads
parent 35437c9f55
commit 2dcc1f13a6

6 changed files with 63 additions and 42 deletions
@@ -124,8 +124,10 @@ def start_blob_upload(namespace, repo_name):
     return accepted
   else:
     # The user plans to send us the entire body right now
-    uploaded = _upload_chunk(namespace, repo_name, new_upload_uuid)
+    uploaded, error = _upload_chunk(namespace, repo_name, new_upload_uuid)
     uploaded.save()
+    if error:
+      _range_not_satisfiable(uploaded.byte_count)
 
     return _finish_upload(namespace, repo_name, uploaded, digest)
 
@@ -210,16 +212,16 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
   input_fp = wrap_with_handler(input_fp, found.sha_state.update)
 
   try:
-    length_written, new_metadata = storage.stream_upload_chunk(location_set, upload_uuid,
+    length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
                                                                 start_offset, length, input_fp,
                                                                 found.storage_metadata,
                                                                 content_type=BLOB_CONTENT_TYPE)
   except InvalidChunkException:
     _range_not_satisfiable(found.byte_count)
 
   found.storage_metadata = new_metadata
   found.byte_count += length_written
-  return found
+  return found, error
 
 
 def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
@@ -263,9 +265,12 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
 @require_repo_write
 @anon_protect
 def upload_chunk(namespace, repo_name, upload_uuid):
-  upload = _upload_chunk(namespace, repo_name, upload_uuid)
+  upload, error = _upload_chunk(namespace, repo_name, upload_uuid)
   upload.save()
 
+  if error:
+    _range_not_satisfiable(upload.byte_count)
+
   accepted = make_response('', 204)
   accepted.headers['Location'] = _current_request_path()
   accepted.headers['Range'] = _render_range(upload.byte_count, with_bytes_prefix=False)
@@ -282,7 +287,12 @@ def monolithic_upload_or_last_chunk(namespace, repo_name, upload_uuid):
   if digest is None:
     raise BlobUploadInvalid()
 
-  found = _upload_chunk(namespace, repo_name, upload_uuid)
+  found, error = _upload_chunk(namespace, repo_name, upload_uuid)
+
+  if error:
+    found.save()
+    _range_not_satisfiable(found.byte_count)
+
   return _finish_upload(namespace, repo_name, found, digest)
 
 
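The four endpoint hunks above all apply the same pattern: _upload_chunk now hands back (upload, error) instead of relying on the storage layer to raise, the caller saves whatever byte count was actually written, and only then fails the request via _range_not_satisfiable. Below is a minimal, self-contained sketch of that caller-side pattern; the names (FlakyStorage, save, RangeNotSatisfiable) are stand-ins for illustration, not the actual endpoint or database code.

class RangeNotSatisfiable(Exception):
  """Stand-in for the 416 response raised by _range_not_satisfiable."""


def save(upload):
  pass  # stand-in for the model's save() call


def upload_chunk(storage, upload, chunk):
  # The storage layer returns the error instead of raising, so partial
  # progress can be persisted before the request is failed.
  written, error = storage.write(upload['uuid'], chunk)
  upload['byte_count'] += written
  save(upload)  # keep the resumable offset even on failure
  if error:
    raise RangeNotSatisfiable('valid range is 0-%d' % upload['byte_count'])
  return upload


class FlakyStorage(object):
  """Toy backend that writes half of each chunk and then reports an IOError."""
  def write(self, uuid, chunk):
    return len(chunk) // 2, IOError('disk full')


upload = {'uuid': 'abc', 'byte_count': 0}
try:
  upload_chunk(FlakyStorage(), upload, b'0123456789')
except RangeNotSatisfiable:
  assert upload['byte_count'] == 5  # partial progress survived the failure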
@@ -100,15 +100,11 @@ class BaseStorage(StoragePaths):
       if size_to_read < 0:
         size_to_read = self.buffer_size
 
-      try:
-        buf = in_fp.read(size_to_read)
-        if not buf:
-          break
-        out_fp.write(buf)
-        bytes_copied += len(buf)
-      except IOError as err:
-        logger.error('Failed to stream_write_to_fp: %s', err)
-        break
+      buf = in_fp.read(size_to_read)
+      if not buf:
+        break
+      out_fp.write(buf)
+      bytes_copied += len(buf)
 
     return bytes_copied
 
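This hunk deletes the blanket try/except IOError from stream_write_to_fp, so a failed read or write now propagates to the individual storage drivers, which decide whether to convert it into a returned error (see the driver hunks below). A rough, self-contained sketch of the resulting behaviour, with a simplified copy loop and a hypothetical failing reader:

import io


def stream_write_to_fp(in_fp, out_fp, num_bytes, buffer_size=64 * 1024):
  # Simplified copy loop: after this change an IOError from read() or
  # write() escapes to the caller instead of being logged and swallowed.
  bytes_copied = 0
  while bytes_copied < num_bytes:
    buf = in_fp.read(min(buffer_size, num_bytes - bytes_copied))
    if not buf:
      break
    out_fp.write(buf)
    bytes_copied += len(buf)
  return bytes_copied


class FailingReader(object):
  def read(self, size):
    raise IOError('connection reset by peer')


try:
  stream_write_to_fp(FailingReader(), io.BytesIO(), 10)
except IOError:
  pass  # the drivers (local, fake, cloud, swift) now handle this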
@@ -166,17 +166,16 @@ class _CloudStorage(BaseStorageV2):
                                          **self._upload_params)
 
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
-    return self._stream_write_internal(path, fp, content_type, content_encoding)
+    self._stream_write_internal(path, fp, content_type, content_encoding)
 
   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    error = None
     mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
 
     # We are going to reuse this but be VERY careful to only read the number of bytes written to it
     buf = StringIO.StringIO()
 
-    chunk_sizes = []
-
     num_part = 1
     total_bytes_written = 0
     while size == filelike.READ_UNTIL_END or total_bytes_written < size:
@@ -192,26 +191,24 @@ class _CloudStorage(BaseStorageV2):
         if bytes_staged == 0:
           break
 
-        chunk_sizes.append(bytes_staged)
-
         buf.seek(0)
         mp.upload_part_from_file(buf, num_part, size=bytes_staged)
         total_bytes_written += bytes_staged
         num_part += 1
-      except IOError:
+      except IOError as ex:
+        logger.warn('stream write error: %s', ex)
+        error = ex
         app.metric_queue.put('MultipartUploadFailure', 1)
         if cancel_on_error:
           mp.cancel_upload()
-          return 0
+          return 0, error
+        else:
+          break
 
     if total_bytes_written > 0:
       app.metric_queue.put('MultipartUploadSuccess', 1)
-      try:
-        mp.complete_upload()
-      except:
-        logger.error('complete_upload failed: chunk_sizes: %s', chunk_sizes)
-        raise
-    return total_bytes_written
+      mp.complete_upload()
+    return total_bytes_written, error
 
   def list_directory(self, path=None):
     self._initialize_cloud_conn()
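The _stream_write_internal hunks above keep uploading parts until the input is exhausted, remember the first IOError they see, and either cancel the multipart upload (when cancel_on_error is set) or return the error alongside the byte count. A toy, self-contained version of that accumulate-and-return pattern; parts, read_chunk, and the fake chunk source are stand-ins for the boto multipart upload and the staging buffer:

def write_parts(read_chunk, cancel_on_error=True):
  # Returns (total_bytes_written, error); error is None on success.
  error = None
  parts = []                 # stands in for the boto MultiPartUpload object
  total_bytes_written = 0
  while True:
    try:
      chunk = read_chunk()   # stands in for staging bytes into the buffer
      if not chunk:
        break
      parts.append(chunk)    # stands in for mp.upload_part_from_file
      total_bytes_written += len(chunk)
    except IOError as ex:
      error = ex
      if cancel_on_error:
        del parts[:]         # stands in for mp.cancel_upload()
        return 0, error
      break
  return total_bytes_written, error


chunks = iter([b'aaaa', IOError('stream write error')])

def read_chunk():
  item = next(chunks, b'')
  if isinstance(item, IOError):
    raise item
  return item

total, err = write_parts(read_chunk, cancel_on_error=False)
assert total == 4 and isinstance(err, IOError)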
@@ -307,8 +304,8 @@ class _CloudStorage(BaseStorageV2):
 
     # We are going to upload each chunk to a separate key
     chunk_path = self._rel_upload_path(str(uuid4()))
-    bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
+    bytes_written, error = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
                                                  size=length, content_type=content_type)
 
     new_metadata = copy.deepcopy(storage_metadata)
 
@@ -316,7 +313,7 @@ class _CloudStorage(BaseStorageV2):
     if bytes_written > 0:
       new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written))
 
-    return bytes_written, new_metadata
+    return bytes_written, new_metadata, error
 
   def _chunk_generator(self, chunk_list):
     for chunk in chunk_list:
@@ -480,9 +477,12 @@ class GoogleCloudStorage(_CloudStorage):
       fp = filelike.StreamSlice(fp, 0, size)
 
     # TODO figure out how to handle cancel_on_error=False
-    key.set_contents_from_stream(fp)
+    try:
+      key.set_contents_from_stream(fp)
+    except IOError as ex:
+      return 0, ex
 
-    return key.size
+    return key.size, None
 
   def complete_chunked_upload(self, uuid, final_path, storage_metadata):
     self._initialize_cloud_conn()
@@ -57,7 +57,10 @@ class FakeStorage(BaseStorageV2):
   def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
     upload_storage = _FAKE_STORAGE_MAP[uuid]
     upload_storage.seek(offset)
-    return self.stream_write_to_fp(in_fp, upload_storage, length), {}
+    try:
+      return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
+    except IOError as ex:
+      return 0, {}, ex
 
   def complete_chunked_upload(self, uuid, final_path, _):
     _FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid]
@@ -106,9 +106,12 @@ class LocalStorage(BaseStorageV2):
     return new_uuid, {}
 
   def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
-    with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
-      upload_storage.seek(offset)
-      return self.stream_write_to_fp(in_fp, upload_storage, length), {}
+    try:
+      with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
+        upload_storage.seek(offset)
+        return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
+    except IOError as ex:
+      return 0, {}, ex
 
   def complete_chunked_upload(self, uuid, final_path, _):
     content_path = self._rel_upload_path(uuid)
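The FakeStorage and LocalStorage hunks wrap the copy in try/except IOError and widen stream_upload_chunk to a (bytes_written, metadata, error) triple. A small, self-contained sketch of that wrapper, using in-memory streams and a hypothetical BrokenReader instead of the real upload files:

import io


def stream_write_to_fp(in_fp, out_fp, length):
  # Minimal stand-in for the base-class copy helper; IOError propagates.
  buf = in_fp.read(length)
  out_fp.write(buf)
  return len(buf)


def stream_upload_chunk(in_fp, upload_storage, length):
  # Driver-level wrapper: convert a raised IOError into the third element
  # of the (bytes_written, metadata, error) tuple.
  try:
    return stream_write_to_fp(in_fp, upload_storage, length), {}, None
  except IOError as ex:
    return 0, {}, ex


class BrokenReader(object):
  def read(self, size):
    raise IOError('read failed')


assert stream_upload_chunk(io.BytesIO(b'data'), io.BytesIO(), 4)[2] is None
assert isinstance(stream_upload_chunk(BrokenReader(), io.BytesIO(), 4)[2], IOError)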
@@ -269,15 +269,22 @@ class SwiftStorage(BaseStorage):
     return random_uuid, metadata
 
   def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
+    error = None
+
     if length == 0:
-      return 0, storage_metadata
+      return 0, storage_metadata, error
 
     # Note: Swift limits segments to a maximum of 5GB, so we keep writing segments until we
     # are finished hitting the data limit.
     total_bytes_written = 0
     while True:
-      bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
-                                                                    storage_metadata, content_type)
+      try:
+        bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
+                                                                      storage_metadata, content_type)
+      except IOError as ex:
+        logger.warn('stream write error: %s', ex)
+        error = ex
+        break
 
       if length != filelike.READ_UNTIL_END:
         length = length - bytes_written
@@ -285,7 +292,9 @@ class SwiftStorage(BaseStorage):
       offset = offset + bytes_written
       total_bytes_written = total_bytes_written + bytes_written
       if bytes_written == 0 or length <= 0:
-        return total_bytes_written, storage_metadata
+        return total_bytes_written, storage_metadata, error
 
+    return total_bytes_written, storage_metadata, error
+
   def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata, content_type):
     updated_metadata = copy.deepcopy(storage_metadata)
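In the Swift driver the segment loop now records the first IOError, stops uploading further segments, and returns the error together with the bytes that did make it. A toy, self-contained version of that loop; read_segment and the fake outcome list stand in for _stream_upload_segment and the Swift client:

def upload_segments(read_segment):
  # Returns (total_bytes_written, error); stops on the first IOError.
  error = None
  total_bytes_written = 0
  while True:
    try:
      bytes_written = read_segment()  # stands in for _stream_upload_segment
    except IOError as ex:
      error = ex
      break
    total_bytes_written += bytes_written
    if bytes_written == 0:
      break
  return total_bytes_written, error


outcomes = iter([1024, IOError('timed out')])

def read_segment():
  item = next(outcomes)
  if isinstance(item, IOError):
    raise item
  return item

total, err = upload_segments(read_segment)
assert total == 1024 and isinstance(err, IOError)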