Switch the checksums to use the registry computed value, remove all assumptions of namespaced paths for legacy storage, fix an upload race condition in the registry code.
This commit is contained in:
parent
246a216f80
commit
78c5aec5b9
11 changed files with 112 additions and 264 deletions
|
@ -38,27 +38,29 @@ class SocketReader(object):
|
|||
return buf
|
||||
|
||||
|
||||
def image_is_uploading(namespace, repository, image_id, repo_image):
|
||||
if repo_image and repo_image.storage and repo_image.storage.uploading is not None:
|
||||
def image_is_uploading(repo_image):
|
||||
if repo_image is None:
|
||||
return False
|
||||
|
||||
if repo_image.storage.uploading is not None:
|
||||
return repo_image.storage.uploading
|
||||
|
||||
logger.warning('Setting legacy upload flag')
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
mark_path = store.image_mark_path(namespace, repository, image_id, uuid)
|
||||
logger.warning('Checking legacy upload flag')
|
||||
mark_path = store.image_mark_path(repo_image.storage.uuid)
|
||||
return store.exists(mark_path)
|
||||
|
||||
|
||||
def mark_upload_complete(namespace, repository, image_id, repo_image):
|
||||
if repo_image and repo_image.storage and repo_image.storage.uploading is not None:
|
||||
repo_image.storage.uploading = False
|
||||
repo_image.storage.save()
|
||||
else:
|
||||
def set_uploading_flag(repo_image, is_image_uploading):
|
||||
if repo_image.storage.uploading is None and not is_image_uploading:
|
||||
logger.warning('Removing legacy upload flag')
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
mark_path = store.image_mark_path(namespace, repository, image_id, uuid)
|
||||
uuid = repo_image.storage.uuid
|
||||
mark_path = store.image_mark_path(uuid)
|
||||
if store.exists(mark_path):
|
||||
store.remove(mark_path)
|
||||
|
||||
repo_image.storage.uploading = is_image_uploading
|
||||
repo_image.storage.save()
|
||||
|
||||
|
||||
def require_completion(f):
|
||||
"""This make sure that the image push correctly finished."""
|
||||
|
@ -66,7 +68,7 @@ def require_completion(f):
|
|||
def wrapper(namespace, repository, *args, **kwargs):
|
||||
image_id = kwargs['image_id']
|
||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
||||
if image_is_uploading(namespace, repository, image_id, repo_image):
|
||||
if image_is_uploading(repo_image):
|
||||
abort(400, 'Image %(image_id)s is being uploaded, retry later',
|
||||
issue='upload-in-progress', image_id=kwargs['image_id'])
|
||||
|
||||
|
@ -111,21 +113,20 @@ def get_image_layer(namespace, repository, image_id, headers):
|
|||
profile.debug('Looking up repo image')
|
||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
||||
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
|
||||
profile.debug('Looking up the layer path')
|
||||
path = store.image_layer_path(namespace, repository, image_id, uuid)
|
||||
|
||||
profile.debug('Looking up the direct download URL')
|
||||
direct_download_url = store.get_direct_download_url(path)
|
||||
|
||||
if direct_download_url:
|
||||
profile.debug('Returning direct download URL')
|
||||
return redirect(direct_download_url)
|
||||
try:
|
||||
path = store.image_layer_path(repo_image.storage.uuid)
|
||||
|
||||
profile.debug('Looking up the direct download URL')
|
||||
direct_download_url = store.get_direct_download_url(path)
|
||||
|
||||
if direct_download_url:
|
||||
profile.debug('Returning direct download URL')
|
||||
return redirect(direct_download_url)
|
||||
|
||||
profile.debug('Streaming layer data')
|
||||
return Response(store.stream_read(path), headers=headers)
|
||||
except IOError:
|
||||
except (IOError, AttributeError):
|
||||
profile.debug('Image not found')
|
||||
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
|
||||
image_id=image_id)
|
||||
|
@ -144,21 +145,19 @@ def put_image_layer(namespace, repository, image_id):
|
|||
|
||||
profile.debug('Retrieving image')
|
||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
||||
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
try:
|
||||
profile.debug('Retrieving image data')
|
||||
json_data = store.get_content(store.image_json_path(namespace, repository,
|
||||
image_id, uuid))
|
||||
except IOError:
|
||||
uuid = repo_image.storage.uuid
|
||||
json_data = store.get_content(store.image_json_path(uuid))
|
||||
except (IOError, AttributeError):
|
||||
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
|
||||
image_id=image_id)
|
||||
|
||||
profile.debug('Retrieving image path info')
|
||||
layer_path = store.image_layer_path(namespace, repository, image_id, uuid)
|
||||
layer_path = store.image_layer_path(uuid)
|
||||
|
||||
if (store.exists(layer_path) and not
|
||||
image_is_uploading(namespace, repository, image_id, repo_image)):
|
||||
image_is_uploading(repo_image)):
|
||||
abort(409, 'Image already exists', issue='image-exists', image_id=image_id)
|
||||
|
||||
profile.debug('Storing layer data')
|
||||
|
@ -193,9 +192,7 @@ def put_image_layer(namespace, repository, image_id):
|
|||
'{0}'.format(e))
|
||||
|
||||
try:
|
||||
checksum = store.get_content(store.image_checksum_path(namespace,
|
||||
repository,
|
||||
image_id, uuid))
|
||||
checksum = store.get_content(store.image_checksum_path(uuid))
|
||||
except IOError:
|
||||
# We don't have a checksum stored yet, that's fine skipping the check.
|
||||
# Not removing the mark though, image is not downloadable yet.
|
||||
|
@ -209,7 +206,7 @@ def put_image_layer(namespace, repository, image_id):
|
|||
issue='checksum-mismatch', image_id=image_id)
|
||||
|
||||
# Checksum is ok, we remove the marker
|
||||
mark_upload_complete(namespace, repository, image_id, repo_image)
|
||||
set_uploading_flag(repo_image, False)
|
||||
|
||||
# The layer is ready for download, send a job to the work queue to
|
||||
# process it.
|
||||
|
@ -232,9 +229,11 @@ def put_image_checksum(namespace, repository, image_id):
|
|||
if not permission.can():
|
||||
abort(403)
|
||||
|
||||
checksum = request.headers.get('X-Docker-Checksum')
|
||||
checksum = (request.headers.get('X-Docker-Checksum-Payload', None) or
|
||||
request.headers.get('X-Docker-Checksum'))
|
||||
if not checksum:
|
||||
abort(400, "Missing checksum for image %(image_id)s", issue='missing-checksum', image_id=image_id)
|
||||
abort(400, "Missing checksum for image %(image_id)s", issue='missing-checksum',
|
||||
image_id=image_id)
|
||||
|
||||
if not session.get('checksum'):
|
||||
abort(400, 'Checksum not found in Cookie for image %(image_id)s',
|
||||
|
@ -242,21 +241,19 @@ def put_image_checksum(namespace, repository, image_id):
|
|||
|
||||
profile.debug('Looking up repo image')
|
||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
||||
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
uuid = repo_image.storage.uuid
|
||||
|
||||
profile.debug('Looking up repo layer data')
|
||||
if not store.exists(store.image_json_path(namespace, repository, image_id,
|
||||
uuid)):
|
||||
if not store.exists(store.image_json_path(uuid)):
|
||||
abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
|
||||
|
||||
profile.debug('Marking image path')
|
||||
if not image_is_uploading(namespace, repository, image_id, repo_image):
|
||||
if not image_is_uploading(repo_image):
|
||||
abort(409, 'Cannot set checksum for image %(image_id)s',
|
||||
issue='image-write-error', image_id=image_id)
|
||||
|
||||
profile.debug('Storing image checksum')
|
||||
err = store_checksum(namespace, repository, image_id, uuid, checksum)
|
||||
err = store_checksum(repo_image.storage, checksum)
|
||||
if err:
|
||||
abort(400, err)
|
||||
|
||||
|
@ -268,7 +265,7 @@ def put_image_checksum(namespace, repository, image_id):
|
|||
issue='checksum-mismatch', image_id=image_id)
|
||||
|
||||
# Checksum is ok, we remove the marker
|
||||
mark_upload_complete(namespace, repository, image_id, repo_image)
|
||||
set_uploading_flag(repo_image, False)
|
||||
|
||||
# The layer is ready for download, send a job to the work queue to
|
||||
# process it.
|
||||
|
@ -296,13 +293,11 @@ def get_image_json(namespace, repository, image_id, headers):
|
|||
|
||||
profile.debug('Looking up repo image')
|
||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
|
||||
profile.debug('Looking up repo layer data')
|
||||
try:
|
||||
data = store.get_content(store.image_json_path(namespace, repository,
|
||||
image_id, uuid))
|
||||
except IOError:
|
||||
data = store.get_content(store.image_json_path(repo_image.storage.uuid))
|
||||
except (IOError, AttributeError):
|
||||
flask_abort(404)
|
||||
|
||||
profile.debug('Looking up repo layer size')
|
||||
|
@ -312,12 +307,6 @@ def get_image_json(namespace, repository, image_id, headers):
|
|||
except OSError:
|
||||
pass
|
||||
|
||||
profile.debug('Retrieving checksum')
|
||||
checksum_path = store.image_checksum_path(namespace, repository, image_id,
|
||||
uuid)
|
||||
if store.exists(checksum_path):
|
||||
headers['X-Docker-Checksum'] = store.get_content(checksum_path)
|
||||
|
||||
response = make_response(data, 200)
|
||||
response.headers.extend(headers)
|
||||
return response
|
||||
|
@ -337,13 +326,11 @@ def get_image_ancestry(namespace, repository, image_id, headers):
|
|||
|
||||
profile.debug('Looking up repo image')
|
||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
|
||||
profile.debug('Looking up image data')
|
||||
try:
|
||||
data = store.get_content(store.image_ancestry_path(namespace, repository,
|
||||
image_id, uuid))
|
||||
except IOError:
|
||||
data = store.get_content(store.image_ancestry_path(repo_image.storage.uuid))
|
||||
except (IOError, AttributeError):
|
||||
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
|
||||
image_id=image_id)
|
||||
|
||||
|
@ -355,32 +342,32 @@ def get_image_ancestry(namespace, repository, image_id, headers):
|
|||
return response
|
||||
|
||||
|
||||
def generate_ancestry(namespace, repository, image_id, uuid, parent_id=None,
|
||||
def generate_ancestry(image_id, uuid, parent_id=None,
|
||||
parent_uuid=None):
|
||||
if not parent_id:
|
||||
store.put_content(store.image_ancestry_path(namespace, repository,
|
||||
image_id, uuid),
|
||||
store.put_content(store.image_ancestry_path(uuid),
|
||||
json.dumps([image_id]))
|
||||
return
|
||||
data = store.get_content(store.image_ancestry_path(namespace, repository,
|
||||
parent_id, parent_uuid))
|
||||
data = store.get_content(store.image_ancestry_path(parent_uuid))
|
||||
data = json.loads(data)
|
||||
data.insert(0, image_id)
|
||||
store.put_content(store.image_ancestry_path(namespace, repository,
|
||||
image_id, uuid),
|
||||
store.put_content(store.image_ancestry_path(uuid),
|
||||
json.dumps(data))
|
||||
|
||||
|
||||
def store_checksum(namespace, repository, image_id, uuid, checksum):
|
||||
def store_checksum(image_storage, checksum):
|
||||
checksum_parts = checksum.split(':')
|
||||
if len(checksum_parts) != 2:
|
||||
return 'Invalid checksum format'
|
||||
|
||||
# We store the checksum
|
||||
checksum_path = store.image_checksum_path(namespace, repository, image_id,
|
||||
uuid)
|
||||
checksum_path = store.image_checksum_path(image_storage.uuid)
|
||||
store.put_content(checksum_path, checksum)
|
||||
|
||||
# And store it in the db
|
||||
image_storage.checksum = checksum
|
||||
image_storage.save()
|
||||
|
||||
|
||||
@registry.route('/images/<image_id>/json', methods=['PUT'])
|
||||
@process_auth
|
||||
|
@ -393,9 +380,10 @@ def put_image_json(namespace, repository, image_id):
|
|||
|
||||
profile.debug('Parsing image JSON')
|
||||
try:
|
||||
data = json.loads(request.data)
|
||||
except json.JSONDecodeError:
|
||||
data = json.loads(request.data.decode('utf8'))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if not data or not isinstance(data, dict):
|
||||
abort(400, 'Invalid JSON for image: %(image_id)s\nJSON: %(json)s',
|
||||
issue='invalid-request', image_id=image_id, json=request.data)
|
||||
|
@ -406,22 +394,8 @@ def put_image_json(namespace, repository, image_id):
|
|||
|
||||
profile.debug('Looking up repo image')
|
||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
uuid = repo_image.storage.uuid
|
||||
|
||||
# Read the checksum
|
||||
checksum = request.headers.get('X-Docker-Checksum')
|
||||
if checksum:
|
||||
# Storing the checksum is optional at this stage
|
||||
profile.debug('Storing image checksum')
|
||||
err = store_checksum(namespace, repository, image_id, uuid, checksum)
|
||||
if err:
|
||||
abort(400, err, issue='write-error')
|
||||
|
||||
else:
|
||||
# We cleanup any old checksum in case it's a retry after a fail
|
||||
profile.debug('Cleanup old checksum')
|
||||
store.remove(store.image_checksum_path(namespace, repository, image_id,
|
||||
uuid))
|
||||
if image_id != data['id']:
|
||||
abort(400, 'JSON data contains invalid id for image: %(image_id)s',
|
||||
issue='invalid-request', image_id=image_id)
|
||||
|
@ -433,26 +407,33 @@ def put_image_json(namespace, repository, image_id):
|
|||
profile.debug('Looking up parent image')
|
||||
parent_image = model.get_repo_image(namespace, repository, parent_id)
|
||||
|
||||
parent_uuid = (parent_image and parent_image.storage and
|
||||
parent_image.storage.uuid)
|
||||
parent_uuid = parent_image and parent_image.storage.uuid
|
||||
|
||||
if parent_id:
|
||||
profile.debug('Looking up parent image data')
|
||||
|
||||
if (parent_id and not
|
||||
store.exists(store.image_json_path(namespace, repository, parent_id,
|
||||
parent_uuid))):
|
||||
store.exists(store.image_json_path(parent_uuid))):
|
||||
abort(400, 'Image %(image_id)s depends on non existing parent image %(parent_id)s',
|
||||
issue='invalid-request', image_id=image_id, parent_id=parent_id)
|
||||
|
||||
profile.debug('Looking up image storage paths')
|
||||
json_path = store.image_json_path(namespace, repository, image_id, uuid)
|
||||
json_path = store.image_json_path(uuid)
|
||||
|
||||
profile.debug('Checking if image already exists')
|
||||
if (store.exists(json_path) and not
|
||||
image_is_uploading(namespace, repository, image_id, repo_image)):
|
||||
image_is_uploading(repo_image)):
|
||||
abort(409, 'Image already exists', issue='image-exists', image_id=image_id)
|
||||
|
||||
set_uploading_flag(repo_image, True)
|
||||
|
||||
# We cleanup any old checksum in case it's a retry after a fail
|
||||
profile.debug('Cleanup old checksum')
|
||||
try:
|
||||
store.remove(store.image_checksum_path(uuid))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# If we reach that point, it means that this is a new image or a retry
|
||||
# on a failed push
|
||||
# save the metadata
|
||||
|
@ -468,8 +449,7 @@ def put_image_json(namespace, repository, image_id):
|
|||
store.put_content(json_path, request.data)
|
||||
|
||||
profile.debug('Generating image ancestry')
|
||||
generate_ancestry(namespace, repository, image_id, uuid, parent_id,
|
||||
parent_uuid)
|
||||
generate_ancestry(image_id, uuid, parent_id, parent_uuid)
|
||||
|
||||
profile.debug('Done')
|
||||
return make_response('true', 200)
|
||||
|
@ -479,12 +459,10 @@ def process_image_changes(namespace, repository, image_id):
|
|||
logger.debug('Generating diffs for image: %s' % image_id)
|
||||
|
||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
||||
uuid = repo_image and repo_image.storage and repo_image.storage.uuid
|
||||
uuid = repo_image.storage.uuid
|
||||
|
||||
image_diffs_path = store.image_file_diffs_path(namespace, repository,
|
||||
image_id, uuid)
|
||||
image_trie_path = store.image_file_trie_path(namespace, repository,
|
||||
image_id, uuid)
|
||||
image_diffs_path = store.image_file_diffs_path(uuid)
|
||||
image_trie_path = store.image_file_trie_path(uuid)
|
||||
|
||||
if store.exists(image_diffs_path):
|
||||
logger.debug('Diffs already exist for image: %s' % image_id)
|
||||
|
@ -506,7 +484,7 @@ def process_image_changes(namespace, repository, image_id):
|
|||
parent_trie.frombytes(parent_trie_bytes)
|
||||
|
||||
# Read in the file entries from the layer tar file
|
||||
layer_path = store.image_layer_path(namespace, repository, image_id, uuid)
|
||||
layer_path = store.image_layer_path(uuid)
|
||||
with store.stream_read_file(layer_path) as layer_tar_stream:
|
||||
removed_files = set()
|
||||
layer_files = changes.files_and_dirs_from_tar(layer_tar_stream,
|
||||
|
|
Reference in a new issue