Merge pull request #1639 from coreos-inc/swift-logging

Add better logging to blob uploads

commit 640012103c
4 changed files with 58 additions and 55 deletions
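
The change follows a single pattern: the blob-upload paths stop signalling storage failures by raising InvalidChunkException and instead return the error alongside the result, so each call site can persist the partial upload state and then log the failure with request context before responding. The sketch below is illustrative only (the helper names are not from this commit); it shows the (result, error) convention the endpoints and storage engines converge on.

import io
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def write_chunk(out_fp, buf):
  """Illustrative helper: failures come back as a returned error object
  rather than an exception, mirroring the storage engines in this commit."""
  try:
    out_fp.write(buf)
    return len(buf), None   # (bytes written, no error)
  except IOError as ex:
    logger.exception('Error writing chunk: %s', ex)
    return 0, ex            # (nothing written durably, the error)


def handle_chunk(out_fp, buf):
  # Call-site shape used by the endpoints after this change: record state
  # first, then log and surface the error if one came back.
  bytes_written, upload_error = write_chunk(out_fp, buf)
  if upload_error is not None:
    logger.error('Got error when uploading chunk: %s', upload_error)
  return bytes_written


handle_chunk(io.BytesIO(), b'example bytes')
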
@@ -18,7 +18,6 @@ from util.cache import cache_control
 from util.registry.filelike import wrap_with_handler, StreamSlice
 from util.registry.gzipstream import calculate_size_handler
 from util.registry.torrent import PieceHasher
-from storage.basestorage import InvalidChunkException


 logger = logging.getLogger(__name__)
@@ -134,12 +133,15 @@ def start_blob_upload(namespace_name, repo_name):
     return accepted
   else:
     # The user plans to send us the entire body right now
-    uploaded, error = _upload_chunk(namespace_name, repo_name, new_upload_uuid)
-    uploaded.save()
-    if error:
-      _range_not_satisfiable(uploaded.byte_count)
+    blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, new_upload_uuid)
+    blob_upload.save()
+
+    if upload_error:
+      logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
+                   namespace_name, repo_name, new_upload_uuid, upload_error)
+      _range_not_satisfiable(blob_upload.byte_count)

-    return _finish_upload(namespace_name, repo_name, uploaded, digest)
+    return _finish_upload(namespace_name, repo_name, blob_upload, digest)


 @v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
@@ -192,6 +194,7 @@ def _parse_range_header(range_header_text):
 def _upload_chunk(namespace_name, repo_name, upload_uuid):
   """ Common code among the various uploading paths for appending data to blobs.
       Callers MUST call .save() or .delete_instance() on the returned database object.
+      Returns the BlobUpload object and the error that occurred, if any (or None if none).
   """
   try:
     found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
@@ -211,6 +214,7 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):

   location_set = {found.location.name}

+  upload_error = None
   with database.CloseForLongOperation(app.config):
     input_fp = get_input_stream(request)

@@ -227,7 +231,7 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):
     # We use this to escape early in case we have already processed all of the bytes the user
     # wants to upload
     if length == 0:
-      return found
+      return found, None

     input_fp = wrap_with_handler(input_fp, found.sha_state.update)

@@ -252,14 +256,10 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):
       size_info, fn = calculate_size_handler()
       input_fp = wrap_with_handler(input_fp, fn)

-    try:
-      length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
-                                                                        start_offset, length,
-                                                                        input_fp,
-                                                                        found.storage_metadata,
-                                                                        content_type=BLOB_CONTENT_TYPE)
-    except InvalidChunkException:
-      _range_not_satisfiable(found.byte_count)
+    chunk_result = storage.stream_upload_chunk(location_set, upload_uuid, start_offset, length,
+                                               input_fp, found.storage_metadata,
+                                               content_type=BLOB_CONTENT_TYPE)
+    length_written, new_metadata, upload_error = chunk_result

   # If we determined an uncompressed size and this is the first chunk, add it to the blob.
   # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
@@ -277,7 +277,7 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):
   found.storage_metadata = new_metadata
   found.byte_count += length_written
   found.chunk_count += 1
-  return found, error
+  return found, upload_error


 def _finish_upload(namespace_name, repo_name, upload_obj, expected_digest):
@@ -330,15 +330,17 @@ def _finish_upload(namespace_name, repo_name, upload_obj, expected_digest):
 @require_repo_write
 @anon_protect
 def upload_chunk(namespace_name, repo_name, upload_uuid):
-  upload, error = _upload_chunk(namespace_name, repo_name, upload_uuid)
-  upload.save()
+  blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid)
+  blob_upload.save()

-  if error:
-    _range_not_satisfiable(upload.byte_count)
+  if upload_error:
+    logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
+                 namespace_name, repo_name, upload_uuid, upload_error)
+    _range_not_satisfiable(blob_upload.byte_count)

   accepted = make_response('', 204)
   accepted.headers['Location'] = _current_request_path()
-  accepted.headers['Range'] = _render_range(upload.byte_count, with_bytes_prefix=False)
+  accepted.headers['Range'] = _render_range(blob_upload.byte_count, with_bytes_prefix=False)
   accepted.headers['Docker-Upload-UUID'] = upload_uuid
   return accepted

@@ -353,13 +355,15 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
   if digest is None:
     raise BlobUploadInvalid()

-  found, error = _upload_chunk(namespace_name, repo_name, upload_uuid)
+  blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid)
+  blob_upload.save()

-  if error:
-    found.save()
-    _range_not_satisfiable(found.byte_count)
+  if upload_error:
+    logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
+                 namespace_name, repo_name, upload_uuid, upload_error)
+    _range_not_satisfiable(blob_upload.byte_count)

-  return _finish_upload(namespace_name, repo_name, found, digest)
+  return _finish_upload(namespace_name, repo_name, blob_upload, digest)


 @v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
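
The endpoint hunks above all lean on the same _upload_chunk contract: it returns the BlobUpload database object together with an error (or None), and, per the docstring added above, the caller must call .save() or .delete_instance() on that object and only then log and translate the error into a response (here via _range_not_satisfiable). A self-contained restatement of that obligation with stand-in classes, since the real model objects and helpers live outside this diff:

import logging

logger = logging.getLogger(__name__)


class FakeBlobUpload(object):
  """Stand-in for the BlobUpload row; only the calls the docstring
  requires of callers are modelled."""
  byte_count = 0

  def save(self):
    logger.info('persisting partial upload state')

  def delete_instance(self):
    logger.info('discarding the upload row instead')


def fake_upload_chunk():
  # Stand-in for _upload_chunk: returns (BlobUpload object, error or None).
  return FakeBlobUpload(), None


def put_chunk():
  blob_upload, upload_error = fake_upload_chunk()
  blob_upload.save()   # must happen whether or not an error came back
  if upload_error is not None:
    logger.error('Got error when uploading chunk: %s', upload_error)
    raise IOError('range not satisfiable')   # stand-in for _range_not_satisfiable
  return blob_upload


put_chunk()
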
@@ -101,10 +101,6 @@ class BaseStorage(StoragePaths):
     raise NotImplementedError


-class InvalidChunkException(RuntimeError):
-  pass
-
-
 class BaseStorageV2(BaseStorage):
   def initiate_chunked_upload(self):
     """ Start a new chunked upload, returning the uuid and any associated storage metadata
@@ -113,9 +109,9 @@ class BaseStorageV2(BaseStorage):

   def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
     """ Upload the specified amount of data from the given file pointer to the chunked destination
-        specified, starting at the given offset. Returns the number of bytes uploaded, and a new
-        version of the storage_metadata. Raises InvalidChunkException if the offset or length can
-        not be accepted. Pass length as -1 to upload as much data from the in_fp as possible.
+        specified, starting at the given offset. Returns the number of bytes uploaded, a new
+        version of the storage_metadata and an error object (if one occurred or None if none).
+        Pass length as -1 to upload as much data from the in_fp as possible.
     """
     raise NotImplementedError

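
The docstring edit above is the interface-level statement of the change: implementations of stream_upload_chunk now report write failures through a third return value instead of raising InvalidChunkException, which is deleted outright. A toy in-memory implementation of that contract (not taken from the commit) for reference:

import io
import uuid as uuidlib


class InMemoryChunkedStorage(object):
  """Toy storage whose stream_upload_chunk follows the updated contract:
  (bytes uploaded, new storage_metadata, error or None)."""

  def __init__(self):
    self._uploads = {}

  def initiate_chunked_upload(self):
    upload_id = str(uuidlib.uuid4())
    self._uploads[upload_id] = b''
    return upload_id, {'chunks': 0}

  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
    try:
      data = in_fp.read() if length < 0 else in_fp.read(length)
    except IOError as ex:
      # Report the failure instead of raising, mirroring the new contract.
      return 0, storage_metadata, ex

    self._uploads[uuid] += data
    new_metadata = dict(storage_metadata, chunks=storage_metadata.get('chunks', 0) + 1)
    return len(data), new_metadata, None


# Upload one chunk and inspect the error slot of the returned tuple.
storage = InMemoryChunkedStorage()
upload_id, metadata = storage.initiate_chunked_upload()
written, metadata, err = storage.stream_upload_chunk(upload_id, 0, -1, io.BytesIO(b'hello'), metadata)
assert written == 5 and err is None
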
@@ -174,7 +174,7 @@ class _CloudStorage(BaseStorageV2):

   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
-    error = None
+    write_error = None
     mp = self.__initiate_multipart_upload(path, content_type, content_encoding)

     # We are going to reuse this but be VERY careful to only read the number of bytes written to it
@@ -199,9 +199,9 @@ class _CloudStorage(BaseStorageV2):
         mp.upload_part_from_file(buf, num_part, size=bytes_staged)
         total_bytes_written += bytes_staged
         num_part += 1
-      except IOError as ex:
-        logger.warn('stream write error: %s', ex)
-        error = ex
+      except IOError as e:
+        logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
+        write_error = e

         if self._metric_queue is not None:
           self._metric_queue.put_deprecated('MultipartUploadFailure', 1)
@@ -209,7 +209,7 @@ class _CloudStorage(BaseStorageV2):

         if cancel_on_error:
           mp.cancel_upload()
-          return 0, error
+          return 0, write_error
         else:
           break

@@ -220,7 +220,7 @@ class _CloudStorage(BaseStorageV2):

       mp.complete_upload()

-    return total_bytes_written, error
+    return total_bytes_written, write_error

   def exists(self, path):
     self._initialize_cloud_conn()
@@ -295,8 +295,9 @@ class _CloudStorage(BaseStorageV2):

     # We are going to upload each chunk to a separate key
     chunk_path = self._rel_upload_path(str(uuid4()))
-    bytes_written, error = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
-                                                       size=length, content_type=content_type)
+    bytes_written, write_error = self._stream_write_internal(chunk_path, in_fp,
+                                                             cancel_on_error=False, size=length,
+                                                             content_type=content_type)

     new_metadata = copy.deepcopy(storage_metadata)

@@ -304,7 +305,7 @@ class _CloudStorage(BaseStorageV2):
     if bytes_written > 0:
       new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written))

-    return bytes_written, new_metadata, error
+    return bytes_written, new_metadata, write_error

   def _chunk_generator(self, chunk_list):
     for chunk in chunk_list:
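
In the cloud engine the same idea appears one level lower: _stream_write_internal now reports (total_bytes_written, write_error), and with cancel_on_error=False the chunked path above keeps whatever bytes did land and passes the error upward rather than cancelling the multipart upload. A generic, self-contained sketch of that return shape (a plain stream copy, not the boto-backed code above):

import io
import logging

logger = logging.getLogger(__name__)


def copy_stream(dst, src, cancel_on_error=True):
  """Sketch of the (bytes_written, error) shape _stream_write_internal now
  returns. With cancel_on_error=False a partial byte count is still reported,
  so the caller can keep the bytes that were written before the failure."""
  total_bytes_written = 0
  write_error = None
  try:
    while True:
      buf = src.read(4096)
      if not buf:
        break
      dst.write(buf)
      total_bytes_written += len(buf)
  except IOError as e:
    logger.warn('Error when writing to stream: %s', e)
    write_error = e
    if cancel_on_error:
      return 0, write_error
  return total_bytes_written, write_error


written, err = copy_stream(io.BytesIO(), io.BytesIO(b'x' * 10000), cancel_on_error=False)
assert written == 10000 and err is None
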
@@ -91,8 +91,8 @@ class SwiftStorage(BaseStorage):
       _, obj = self._get_connection().get_object(self._swift_container, path,
                                                  resp_chunk_size=chunk_size)
       return obj
-    except Exception:
-      logger.exception('Could not get object: %s', path)
+    except Exception as ex:
+      logger.exception('Could not get object at path %s: %s', path, ex)
       raise IOError('Path %s not found' % path)

   def _put_object(self, path, content, chunk=None, content_type=None, content_encoding=None,
@@ -111,16 +111,16 @@ class SwiftStorage(BaseStorage):
       # We re-raise client exception here so that validation of config during setup can see
       # the client exception messages.
       raise
-    except Exception:
-      logger.exception('Could not put object: %s', path)
+    except Exception as ex:
+      logger.exception('Could not put object at path %s: %s', path, ex)
       raise IOError("Could not put content: %s" % path)

   def _head_object(self, path):
     path = self._normalize_path(path)
     try:
       return self._get_connection().head_object(self._swift_container, path)
-    except Exception:
-      logger.exception('Could not head object: %s', path)
+    except Exception as ex:
+      logger.exception('Could not head object at path %s: %s', path, ex)
       return None

   def get_direct_download_url(self, object_path, expires_in=60, requires_cors=False, head=False):
@@ -233,22 +233,24 @@ class SwiftStorage(BaseStorage):
     return random_uuid, metadata

   def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
-    error = None
     if length == 0:
-      return 0, storage_metadata, error
+      return 0, storage_metadata, None

     # Note: Swift limits segments to a maximum of 5GB, so we keep writing segments until we
     # are finished hitting the data limit.
     total_bytes_written = 0
+    upload_error = None

     while True:
       try:
         bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
                                                                       storage_metadata,
                                                                       content_type)
       except IOError as ex:
-        logger.warn('stream write error: %s', ex)
-        error = ex
+        message = ('Error writing to stream in stream_upload_chunk for uuid %s (offset %s' +
+                   ', length %s, metadata: %s): %s')
+        logger.exception(message, uuid, offset, length, storage_metadata, ex)
+        upload_error = ex
         break

       if length != filelike.READ_UNTIL_END:
@@ -257,9 +259,9 @@ class SwiftStorage(BaseStorage):
       offset = offset + bytes_written
       total_bytes_written = total_bytes_written + bytes_written
       if bytes_written == 0 or length <= 0:
-        return total_bytes_written, storage_metadata, error
+        return total_bytes_written, storage_metadata, upload_error

-    return total_bytes_written, storage_metadata, error
+    return total_bytes_written, storage_metadata, upload_error

   def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata, content_type):
     updated_metadata = copy.deepcopy(storage_metadata)
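
The Swift handler also trades logger.warn for logger.exception and adds the uuid, offset, length and storage metadata to the message. Called inside an except block, logger.exception logs at ERROR level and appends the active traceback, so segment-write failures now carry both request context and a stack trace. A minimal standard-library demonstration:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

try:
  raise IOError('simulated segment write failure')
except IOError as ex:
  # logger.exception is logger.error plus the current traceback; logger.warn
  # would have recorded only the interpolated message.
  logger.exception('Error writing to stream for uuid %s (offset %s, length %s): %s',
                   'example-uuid', 0, -1, ex)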