diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py
index 42439650a..56d6dbd10 100644
--- a/endpoints/v2/blob.py
+++ b/endpoints/v2/blob.py
@@ -124,8 +124,10 @@ def start_blob_upload(namespace, repo_name):
     return accepted
   else:
     # The user plans to send us the entire body right now
-    uploaded = _upload_chunk(namespace, repo_name, new_upload_uuid)
+    uploaded, error = _upload_chunk(namespace, repo_name, new_upload_uuid)
     uploaded.save()
+    if error:
+      _range_not_satisfiable(uploaded.byte_count)
 
     return _finish_upload(namespace, repo_name, uploaded, digest)
 
@@ -210,16 +212,16 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
     input_fp = wrap_with_handler(input_fp, found.sha_state.update)
 
   try:
-    length_written, new_metadata = storage.stream_upload_chunk(location_set, upload_uuid,
-                                                               start_offset, length, input_fp,
-                                                               found.storage_metadata,
-                                                               content_type=BLOB_CONTENT_TYPE)
+    length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
+                                                                      start_offset, length, input_fp,
+                                                                      found.storage_metadata,
+                                                                      content_type=BLOB_CONTENT_TYPE)
   except InvalidChunkException:
     _range_not_satisfiable(found.byte_count)
 
   found.storage_metadata = new_metadata
   found.byte_count += length_written
-  return found
+  return found, error
 
 
 def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
@@ -263,9 +265,12 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
 @require_repo_write
 @anon_protect
 def upload_chunk(namespace, repo_name, upload_uuid):
-  upload = _upload_chunk(namespace, repo_name, upload_uuid)
+  upload, error = _upload_chunk(namespace, repo_name, upload_uuid)
   upload.save()
 
+  if error:
+    _range_not_satisfiable(upload.byte_count)
+
   accepted = make_response('', 204)
   accepted.headers['Location'] = _current_request_path()
   accepted.headers['Range'] = _render_range(upload.byte_count, with_bytes_prefix=False)
@@ -282,7 +287,12 @@ def monolithic_upload_or_last_chunk(namespace, repo_name, upload_uuid):
   if digest is None:
     raise BlobUploadInvalid()
 
-  found = _upload_chunk(namespace, repo_name, upload_uuid)
+  found, error = _upload_chunk(namespace, repo_name, upload_uuid)
+
+  if error:
+    found.save()
+    _range_not_satisfiable(found.byte_count)
+
   return _finish_upload(namespace, repo_name, found, digest)
 
 
diff --git a/storage/basestorage.py b/storage/basestorage.py
index 37d473177..dee4937c9 100644
--- a/storage/basestorage.py
+++ b/storage/basestorage.py
@@ -100,15 +100,11 @@ class BaseStorage(StoragePaths):
       if size_to_read < 0:
         size_to_read = self.buffer_size
 
-      try:
-        buf = in_fp.read(size_to_read)
-        if not buf:
-          break
-        out_fp.write(buf)
-        bytes_copied += len(buf)
-      except IOError as err:
-        logger.error('Failed to stream_write_to_fp: %s', err)
+      buf = in_fp.read(size_to_read)
+      if not buf:
         break
+      out_fp.write(buf)
+      bytes_copied += len(buf)
 
     return bytes_copied
 
diff --git a/storage/cloud.py b/storage/cloud.py
index 9b6c41eaa..4c0ee9d60 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -166,17 +166,16 @@ class _CloudStorage(BaseStorageV2):
                                                         **self._upload_params)
 
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
-    return self._stream_write_internal(path, fp, content_type, content_encoding)
+    self._stream_write_internal(path, fp, content_type, content_encoding)
 
   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    error = None
     mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
 
     # We are going to reuse this but be VERY careful to only read the number of bytes written to it
     buf = StringIO.StringIO()
 
-    chunk_sizes = []
-
     num_part = 1
     total_bytes_written = 0
     while size == filelike.READ_UNTIL_END or total_bytes_written < size:
@@ -192,26 +191,24 @@ class _CloudStorage(BaseStorageV2):
         if bytes_staged == 0:
           break
 
-        chunk_sizes.append(bytes_staged)
-
         buf.seek(0)
         mp.upload_part_from_file(buf, num_part, size=bytes_staged)
         total_bytes_written += bytes_staged
         num_part += 1
-      except IOError:
+      except IOError as ex:
+        logger.warn('stream write error: %s', ex)
+        error = ex
         app.metric_queue.put('MultipartUploadFailure', 1)
         if cancel_on_error:
           mp.cancel_upload()
-          return 0
+          return 0, error
+        else:
+          break
 
     if total_bytes_written > 0:
       app.metric_queue.put('MultipartUploadSuccess', 1)
-      try:
-        mp.complete_upload()
-      except:
-        logger.error('complete_upload failed: chunk_sizes: %s', chunk_sizes)
-        raise
-    return total_bytes_written
+      mp.complete_upload()
+    return total_bytes_written, error
 
   def list_directory(self, path=None):
     self._initialize_cloud_conn()
@@ -307,8 +304,8 @@
 
     # We are going to upload each chunk to a separate key
     chunk_path = self._rel_upload_path(str(uuid4()))
-    bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
-                                                size=length, content_type=content_type)
+    bytes_written, error = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
+                                                       size=length, content_type=content_type)
 
     new_metadata = copy.deepcopy(storage_metadata)
 
@@ -316,7 +313,7 @@
     if bytes_written > 0:
       new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written))
 
-    return bytes_written, new_metadata
+    return bytes_written, new_metadata, error
 
   def _chunk_generator(self, chunk_list):
     for chunk in chunk_list:
@@ -480,9 +477,12 @@ class GoogleCloudStorage(_CloudStorage):
       fp = filelike.StreamSlice(fp, 0, size)
 
     # TODO figure out how to handle cancel_on_error=False
-    key.set_contents_from_stream(fp)
+    try:
+      key.set_contents_from_stream(fp)
+    except IOError as ex:
+      return 0, ex
 
-    return key.size
+    return key.size, None
 
   def complete_chunked_upload(self, uuid, final_path, storage_metadata):
     self._initialize_cloud_conn()
diff --git a/storage/fakestorage.py b/storage/fakestorage.py
index e9f6305ea..12440a32c 100644
--- a/storage/fakestorage.py
+++ b/storage/fakestorage.py
@@ -57,7 +57,10 @@ class FakeStorage(BaseStorageV2):
   def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
     upload_storage = _FAKE_STORAGE_MAP[uuid]
     upload_storage.seek(offset)
-    return self.stream_write_to_fp(in_fp, upload_storage, length), {}
+    try:
+      return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
+    except IOError as ex:
+      return 0, {}, ex
 
   def complete_chunked_upload(self, uuid, final_path, _):
     _FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid]
diff --git a/storage/local.py b/storage/local.py
index 7a4c0d10d..d574293bc 100644
--- a/storage/local.py
+++ b/storage/local.py
@@ -106,9 +106,12 @@ class LocalStorage(BaseStorageV2):
     return new_uuid, {}
 
   def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
-    with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
-      upload_storage.seek(offset)
-      return self.stream_write_to_fp(in_fp, upload_storage, length), {}
+    try:
+      with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
+        upload_storage.seek(offset)
+        return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
+    except IOError as ex:
+      return 0, {}, ex
 
   def complete_chunked_upload(self, uuid, final_path, _):
     content_path = self._rel_upload_path(uuid)
diff --git a/storage/swift.py b/storage/swift.py
index 71631a608..472a8c92f 100644
--- a/storage/swift.py
+++ b/storage/swift.py
@@ -269,15 +269,22 @@ class SwiftStorage(BaseStorage):
     return random_uuid, metadata
 
   def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
+    error = None
+
     if length == 0:
-      return 0, storage_metadata
+      return 0, storage_metadata, error
 
     # Note: Swift limits segments to a maximum of 5GB, so we keep writing segments until we
     # are finished hitting the data limit.
     total_bytes_written = 0
     while True:
-      bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
-                                                                    storage_metadata, content_type)
+      try:
+        bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
+                                                                      storage_metadata, content_type)
+      except IOError as ex:
+        logger.warn('stream write error: %s', ex)
+        error = ex
+        break
 
       if length != filelike.READ_UNTIL_END:
         length = length - bytes_written
@@ -285,7 +292,9 @@
       offset = offset + bytes_written
       total_bytes_written = total_bytes_written + bytes_written
       if bytes_written == 0 or length <= 0:
-        return total_bytes_written, storage_metadata
+        return total_bytes_written, storage_metadata, error
+
+    return total_bytes_written, storage_metadata, error
 
   def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata, content_type):
     updated_metadata = copy.deepcopy(storage_metadata)
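
Reviewer note, not part of the diff: a minimal sketch of the driver contract these changes establish, assuming a hypothetical SketchStorage class. Every stream_upload_chunk implementation now returns a (bytes_written, storage_metadata, error) triple instead of letting IOError escape, so callers can persist the byte count before acting on the error.

import io

# Hypothetical driver illustrating the new return contract; the real drivers are in
# storage/local.py, storage/cloud.py, storage/swift.py and storage/fakestorage.py.
class SketchStorage(object):
  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
    try:
      data = in_fp.read(length)        # stand-in for the real backend write
      return len(data), storage_metadata, None
    except IOError as ex:
      return 0, storage_metadata, ex   # surface the failure instead of raising

# Caller-side pattern mirroring endpoints/v2/blob.py: record the bytes written first,
# then act on the error (the endpoints call _range_not_satisfiable at that point).
bytes_written, metadata, error = SketchStorage().stream_upload_chunk(
    'some-uuid', 0, 5, io.BytesIO(b'hello'), {})
assert (bytes_written, error) == (5, None)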