From cbf7c2bf44683cabc51aa166cda7fce7263a3457 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Wed, 20 Jul 2016 17:53:43 -0400
Subject: [PATCH] Add better logging to blob uploads

Fixes #1635
---
 endpoints/v2/blob.py   | 56 ++++++++++++++++++++++--------------------
 storage/basestorage.py | 10 +++-----
 storage/cloud.py       | 19 +++++++-------
 storage/swift.py       | 28 +++++++++++----------
 4 files changed, 58 insertions(+), 55 deletions(-)

diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py
index aacf0d905..5e6109665 100644
--- a/endpoints/v2/blob.py
+++ b/endpoints/v2/blob.py
@@ -18,7 +18,6 @@ from util.cache import cache_control
 from util.registry.filelike import wrap_with_handler, StreamSlice
 from util.registry.gzipstream import calculate_size_handler
 from util.registry.torrent import PieceHasher
-from storage.basestorage import InvalidChunkException
 
 logger = logging.getLogger(__name__)
 
@@ -134,12 +133,15 @@ def start_blob_upload(namespace_name, repo_name):
     return accepted
   else:
     # The user plans to send us the entire body right now
-    uploaded, error = _upload_chunk(namespace_name, repo_name, new_upload_uuid)
-    uploaded.save()
-    if error:
-      _range_not_satisfiable(uploaded.byte_count)
+    blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, new_upload_uuid)
+    blob_upload.save()
 
-    return _finish_upload(namespace_name, repo_name, uploaded, digest)
+    if upload_error:
+      logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
+                   new_upload_uuid, namespace_name, repo_name, upload_error)
+      _range_not_satisfiable(blob_upload.byte_count)
+
+    return _finish_upload(namespace_name, repo_name, blob_upload, digest)
 
 
 @v2_bp.route('//blobs/uploads/', methods=['GET'])
@@ -192,6 +194,7 @@ def _parse_range_header(range_header_text):
 def _upload_chunk(namespace_name, repo_name, upload_uuid):
   """ Common code among the various uploading paths for appending data to blobs. Callers MUST
       call .save() or .delete_instance() on the returned database object.
+      Returns the BlobUpload object and the error that occurred, if any (or None).
   """
   try:
     found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
@@ -211,6 +214,7 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):
 
   location_set = {found.location.name}
 
+  upload_error = None
   with database.CloseForLongOperation(app.config):
     input_fp = get_input_stream(request)
 
@@ -227,7 +231,7 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):
     # We use this to escape early in case we have already processed all of the bytes the user
     # wants to upload
    if length == 0:
-      return found
+      return found, None
 
     input_fp = wrap_with_handler(input_fp, found.sha_state.update)
 
@@ -252,14 +256,10 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):
       size_info, fn = calculate_size_handler()
       input_fp = wrap_with_handler(input_fp, fn)
 
-    try:
-      length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
-                                                                        start_offset, length,
-                                                                        input_fp,
-                                                                        found.storage_metadata,
-                                                                        content_type=BLOB_CONTENT_TYPE)
-    except InvalidChunkException:
-      _range_not_satisfiable(found.byte_count)
+    chunk_result = storage.stream_upload_chunk(location_set, upload_uuid, start_offset, length,
+                                               input_fp, found.storage_metadata,
+                                               content_type=BLOB_CONTENT_TYPE)
+    length_written, new_metadata, upload_error = chunk_result
 
     # If we determined an uncompressed size and this is the first chunk, add it to the blob.
     # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
@@ -277,7 +277,7 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):
   found.storage_metadata = new_metadata
   found.byte_count += length_written
   found.chunk_count += 1
-  return found, error
+  return found, upload_error
 
 
 def _finish_upload(namespace_name, repo_name, upload_obj, expected_digest):
@@ -330,15 +330,17 @@ def _finish_upload(namespace_name, repo_name, upload_obj, expected_digest):
 @require_repo_write
 @anon_protect
 def upload_chunk(namespace_name, repo_name, upload_uuid):
-  upload, error = _upload_chunk(namespace_name, repo_name, upload_uuid)
-  upload.save()
+  blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid)
+  blob_upload.save()
 
-  if error:
-    _range_not_satisfiable(upload.byte_count)
+  if upload_error:
+    logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
+                 upload_uuid, namespace_name, repo_name, upload_error)
+    _range_not_satisfiable(blob_upload.byte_count)
 
   accepted = make_response('', 204)
   accepted.headers['Location'] = _current_request_path()
-  accepted.headers['Range'] = _render_range(upload.byte_count, with_bytes_prefix=False)
+  accepted.headers['Range'] = _render_range(blob_upload.byte_count, with_bytes_prefix=False)
   accepted.headers['Docker-Upload-UUID'] = upload_uuid
   return accepted
 
@@ -353,13 +355,15 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
   if digest is None:
     raise BlobUploadInvalid()
 
-  found, error = _upload_chunk(namespace_name, repo_name, upload_uuid)
+  blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid)
+  blob_upload.save()
 
-  if error:
-    found.save()
-    _range_not_satisfiable(found.byte_count)
+  if upload_error:
+    logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
+                 upload_uuid, namespace_name, repo_name, upload_error)
+    _range_not_satisfiable(blob_upload.byte_count)
 
-  return _finish_upload(namespace_name, repo_name, found, digest)
+  return _finish_upload(namespace_name, repo_name, blob_upload, digest)
 
 
 @v2_bp.route('//blobs/uploads/', methods=['DELETE'])
diff --git a/storage/basestorage.py b/storage/basestorage.py
index 008eaba47..f7c070a24 100644
--- a/storage/basestorage.py
+++ b/storage/basestorage.py
@@ -101,10 +101,6 @@ class BaseStorage(StoragePaths):
     raise NotImplementedError
 
 
-class InvalidChunkException(RuntimeError):
-  pass
-
-
 class BaseStorageV2(BaseStorage):
   def initiate_chunked_upload(self):
     """ Start a new chunked upload, returning the uuid and any associated storage metadata
@@ -113,9 +109,9 @@
 
   def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
     """ Upload the specified amount of data from the given file pointer to the chunked destination
-        specified, starting at the given offset. Returns the number of bytes uploaded, and a new
-        version of the storage_metadata. Raises InvalidChunkException if the offset or length can
-        not be accepted. Pass length as -1 to upload as much data from the in_fp as possible.
+        specified, starting at the given offset. Returns the number of bytes uploaded, a new
+        version of the storage_metadata, and an error object (or None if no error occurred).
+        Pass length as -1 to upload as much data from the in_fp as possible.
""" raise NotImplementedError diff --git a/storage/cloud.py b/storage/cloud.py index 5ef0249a8..6f7d185c4 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -174,7 +174,7 @@ class _CloudStorage(BaseStorageV2): def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None, cancel_on_error=True, size=filelike.READ_UNTIL_END): - error = None + write_error = None mp = self.__initiate_multipart_upload(path, content_type, content_encoding) # We are going to reuse this but be VERY careful to only read the number of bytes written to it @@ -199,9 +199,9 @@ class _CloudStorage(BaseStorageV2): mp.upload_part_from_file(buf, num_part, size=bytes_staged) total_bytes_written += bytes_staged num_part += 1 - except IOError as ex: - logger.warn('stream write error: %s', ex) - error = ex + except IOError as e: + logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e) + write_error = e if self._metric_queue is not None: self._metric_queue.put_deprecated('MultipartUploadFailure', 1) @@ -209,7 +209,7 @@ class _CloudStorage(BaseStorageV2): if cancel_on_error: mp.cancel_upload() - return 0, error + return 0, write_error else: break @@ -220,7 +220,7 @@ class _CloudStorage(BaseStorageV2): mp.complete_upload() - return total_bytes_written, error + return total_bytes_written, write_error def exists(self, path): self._initialize_cloud_conn() @@ -295,8 +295,9 @@ class _CloudStorage(BaseStorageV2): # We are going to upload each chunk to a separate key chunk_path = self._rel_upload_path(str(uuid4())) - bytes_written, error = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False, - size=length, content_type=content_type) + bytes_written, write_error = self._stream_write_internal(chunk_path, in_fp, + cancel_on_error=False, size=length, + content_type=content_type) new_metadata = copy.deepcopy(storage_metadata) @@ -304,7 +305,7 @@ class _CloudStorage(BaseStorageV2): if bytes_written > 0: new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written)) - return bytes_written, new_metadata, error + return bytes_written, new_metadata, write_error def _chunk_generator(self, chunk_list): for chunk in chunk_list: diff --git a/storage/swift.py b/storage/swift.py index 9fb0dc6ed..742e7851c 100644 --- a/storage/swift.py +++ b/storage/swift.py @@ -91,8 +91,8 @@ class SwiftStorage(BaseStorage): _, obj = self._get_connection().get_object(self._swift_container, path, resp_chunk_size=chunk_size) return obj - except Exception: - logger.exception('Could not get object: %s', path) + except Exception as ex: + logger.exception('Could not get object at path %s: %s', path, ex) raise IOError('Path %s not found' % path) def _put_object(self, path, content, chunk=None, content_type=None, content_encoding=None, @@ -111,16 +111,16 @@ class SwiftStorage(BaseStorage): # We re-raise client exception here so that validation of config during setup can see # the client exception messages. 
       raise
-    except Exception:
-      logger.exception('Could not put object: %s', path)
+    except Exception as ex:
+      logger.exception('Could not put object at path %s: %s', path, ex)
       raise IOError("Could not put content: %s" % path)
 
   def _head_object(self, path):
     path = self._normalize_path(path)
     try:
       return self._get_connection().head_object(self._swift_container, path)
-    except Exception:
-      logger.exception('Could not head object: %s', path)
+    except Exception as ex:
+      logger.exception('Could not head object at path %s: %s', path, ex)
       return None
 
   def get_direct_download_url(self, object_path, expires_in=60, requires_cors=False, head=False):
@@ -233,22 +233,24 @@ class SwiftStorage(BaseStorage):
     return random_uuid, metadata
 
   def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
-    error = None
-
     if length == 0:
-      return 0, storage_metadata, error
+      return 0, storage_metadata, None
 
     # Note: Swift limits segments to a maximum of 5GB, so we keep writing segments until we
     # are finished hitting the data limit.
     total_bytes_written = 0
+    upload_error = None
+
     while True:
       try:
         bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
                                                                       storage_metadata,
                                                                       content_type)
       except IOError as ex:
-        logger.warn('stream write error: %s', ex)
-        error = ex
+        message = ('Error writing to stream in stream_upload_chunk for uuid %s (offset %s' +
+                   ', length %s, metadata: %s): %s')
+        logger.exception(message, uuid, offset, length, storage_metadata, ex)
+        upload_error = ex
         break
 
       if length != filelike.READ_UNTIL_END:
@@ -257,9 +259,9 @@
       offset = offset + bytes_written
       total_bytes_written = total_bytes_written + bytes_written
       if bytes_written == 0 or length <= 0:
-        return total_bytes_written, storage_metadata, error
+        return total_bytes_written, storage_metadata, upload_error
 
-    return total_bytes_written, storage_metadata, error
+    return total_bytes_written, storage_metadata, upload_error
 
   def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata, content_type):
     updated_metadata = copy.deepcopy(storage_metadata)
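
Note (editorial, not part of the patch): with InvalidChunkException removed, stream_upload_chunk now reports
failures through the third element of its return value instead of raising. The sketch below is a minimal
illustration of that (bytes_written, new_metadata, error) contract; the _InMemoryStorage class and the
append_chunk helper are hypothetical stand-ins, not Quay code.

    import logging

    logger = logging.getLogger(__name__)


    class _InMemoryStorage(object):
      """ Hypothetical toy storage used only to illustrate the new return contract. """

      def __init__(self):
        self._chunks = {}

      def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
        # Mirror the patched BaseStorageV2 docstring: return the bytes written, the updated
        # metadata, and an error object (or None) rather than raising on failure.
        try:
          data = in_fp.read() if length < 0 else in_fp.read(length)
          self._chunks.setdefault(uuid, []).append((offset, data))
          return len(data), storage_metadata, None
        except IOError as ex:
          logger.exception('Error streaming chunk for upload %s at offset %s: %s', uuid, offset, ex)
          return 0, storage_metadata, ex


    def append_chunk(storage, uuid, offset, length, in_fp, storage_metadata):
      # Caller-side pattern matching _upload_chunk above: inspect the returned error instead of
      # catching InvalidChunkException, log it, and let the caller decide how to respond (e.g. 416).
      bytes_written, new_metadata, upload_error = storage.stream_upload_chunk(uuid, offset, length,
                                                                              in_fp, storage_metadata)
      if upload_error is not None:
        logger.error('Chunk upload %s failed after %s bytes: %s', uuid, bytes_written, upload_error)
      return bytes_written, new_metadata, upload_error

Returning the error alongside the partial byte count and metadata is what lets the endpoint handlers above
persist the BlobUpload row via .save() before surfacing the 416 response.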