quay/endpoints/v1/registry.py
Matt Jibson a821ad2b01 Return an error on failed S3 uploads
The previous change to this file didn't raise the error up to
stream_write, so the complete_upload function still ran because the loop
had merely been broken out of. It then errored because the upload had
already been canceled. This is better than what we had before, which was
to fail silently but report success (even internally to ourselves!) on a
bad image upload.

This means we discovered a bug: a user's image upload could fail, yet
quay would still write that image to the repository, potentially writing
broken images to S3.
2015-09-01 15:53:32 -04:00
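For context, the failure described above reduces to a worker loop that
breaks on error instead of raising. Below is a minimal sketch of the
before/after pattern; UploadFailedError, upload_chunk, stream_write_broken,
and stream_write_fixed are illustrative names for this sketch, not Quay's
actual storage code.

class UploadFailedError(IOError):
  pass

def upload_chunk(chunk):
  # Stand-in for writing one part of a multipart S3 upload.
  if chunk == 'bad':
    raise IOError('simulated S3 failure')

def stream_write_broken(chunks):
  for chunk in chunks:
    try:
      upload_chunk(chunk)
    except IOError:
      # BUG: the error is swallowed and the loop merely broken out of, so
      # the completion step below still runs and the caller reports
      # success for a partial upload.
      break
  return 'complete_upload ran'

def stream_write_fixed(chunks):
  try:
    for chunk in chunks:
      upload_chunk(chunk)
  except IOError:
    # Surface the failure so the caller can cancel the upload and abort
    # the request instead of marking the image as successfully stored.
    raise UploadFailedError('upload canceled after failed chunk')
  return 'complete_upload ran'

With the raise in place, the except IOError handler in put_image_layer
below can abort with an error (HTTP 520) instead of recording a broken
layer.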


import logging
import json
import features

from flask import make_response, request, session, Response, redirect, abort as flask_abort
from functools import wraps
from datetime import datetime
from time import time

from app import storage as store, image_diff_queue, image_replication_queue, app
from auth.auth import process_auth, extract_namespace_repo_from_session
from auth.auth_context import get_authenticated_user, get_grant_user_context
from digest import checksums
from util.registry import changes
from util.http import abort, exact_abort
from auth.permissions import (ReadRepositoryPermission,
                              ModifyRepositoryPermission)
from data import model, database
from util.registry import gzipstream
from endpoints.v1 import v1_bp
from endpoints.decorators import anon_protect


logger = logging.getLogger(__name__)


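# SocketReader wraps the request's input stream and fans each read out to
# registered handler callbacks (checksum, layer size, tarsum) while the
# data is streamed to storage.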
class SocketReader(object):
  def __init__(self, fp):
    self._fp = fp
    self.handlers = []

  def add_handler(self, handler):
    self.handlers.append(handler)

  def read(self, n=-1):
    buf = self._fp.read(n)
    if not buf:
      return ''

    for handler in self.handlers:
      handler(buf)
    return buf

  def tell(self):
    raise IOError('Stream is not seekable.')


def image_is_uploading(repo_image):
  if repo_image is None:
    return False

  return repo_image.storage.uploading


def set_uploading_flag(repo_image, is_image_uploading):
  repo_image.storage.uploading = is_image_uploading
  repo_image.storage.save()


def _finish_image(namespace, repository, repo_image):
  # Checksum is ok, we remove the marker
  set_uploading_flag(repo_image, False)

  image_id = repo_image.docker_image_id

  # The layer is ready for download, send a job to the work queue to
  # process it.
  logger.debug('Adding layer to diff queue')
  repo = model.repository.get_repository(namespace, repository)
  image_diff_queue.put([repo.namespace_user.username, repository, image_id], json.dumps({
    'namespace_user_id': repo.namespace_user.id,
    'repository': repository,
    'image_id': image_id,
  }))

  # Send a job to the work queue to replicate the image layer.
  if features.STORAGE_REPLICATION:
    image_replication_queue.put([repo_image.storage.uuid], json.dumps({
      'namespace_user_id': repo.namespace_user.id,
      'storage_id': repo_image.storage.uuid,
    }))


def require_completion(f):
  """This makes sure that the image push finished correctly."""
  @wraps(f)
  def wrapper(namespace, repository, *args, **kwargs):
    image_id = kwargs['image_id']
    repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
    if image_is_uploading(repo_image):
      abort(400, 'Image %(image_id)s is being uploaded, retry later',
            issue='upload-in-progress', image_id=kwargs['image_id'])

    return f(namespace, repository, *args, **kwargs)
  return wrapper


def set_cache_headers(f):
  """Returns HTTP headers suitable for caching."""
  @wraps(f)
  def wrapper(*args, **kwargs):
    # Set TTL to 1 year by default.
    ttl = 31536000
    expires = datetime.fromtimestamp(int(time()) + ttl)
    expires = expires.strftime('%a, %d %b %Y %H:%M:%S GMT')
    headers = {
      'Cache-Control': 'public, max-age={0}'.format(ttl),
      'Expires': expires,
      'Last-Modified': 'Thu, 01 Jan 1970 00:00:00 GMT',
    }
    if 'If-Modified-Since' in request.headers:
      response = make_response('Not modified', 304)
      response.headers.extend(headers)
      return response

    kwargs['headers'] = headers

    # Prevent the session cookie from being sent when the object is cacheable.
    session.modified = False
    return f(*args, **kwargs)
  return wrapper


@v1_bp.route('/images/<image_id>/layer', methods=['HEAD'])
@process_auth
@extract_namespace_repo_from_session
@require_completion
@set_cache_headers
@anon_protect
def head_image_layer(namespace, repository, image_id, headers):
  permission = ReadRepositoryPermission(namespace, repository)

  logger.debug('Checking repo permissions')
  if permission.can() or model.repository.repository_is_public(namespace, repository):
    logger.debug('Looking up repo image')
    repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
    if not repo_image:
      logger.debug('Image not found')
      abort(404, 'Image %(image_id)s not found', issue='unknown-image',
            image_id=image_id)

    extra_headers = {}

    # Add the Accept-Ranges header if the storage engine supports resumable
    # downloads.
    if store.get_supports_resumable_downloads(repo_image.storage.locations):
      logger.debug('Storage supports resumable downloads')
      extra_headers['Accept-Ranges'] = 'bytes'

    resp = make_response('')
    resp.headers.extend(headers)
    resp.headers.extend(extra_headers)
    return resp

  abort(403)


@v1_bp.route('/images/<image_id>/layer', methods=['GET'])
@process_auth
@extract_namespace_repo_from_session
@require_completion
@set_cache_headers
@anon_protect
def get_image_layer(namespace, repository, image_id, headers):
  permission = ReadRepositoryPermission(namespace, repository)

  logger.debug('Checking repo permissions')
  if permission.can() or model.repository.repository_is_public(namespace, repository):
    logger.debug('Looking up repo image')
    repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
    if not repo_image:
      logger.debug('Image not found')
      abort(404, 'Image %(image_id)s not found', issue='unknown-image',
            image_id=image_id)

    logger.debug('Looking up the layer path')
    try:
      path = store.image_layer_path(repo_image.storage.uuid)

      logger.debug('Looking up the direct download URL')
      direct_download_url = store.get_direct_download_url(repo_image.storage.locations, path)
      if direct_download_url:
        logger.debug('Returning direct download URL')
        resp = redirect(direct_download_url)
        return resp

      logger.debug('Streaming layer data')

      # Close the database handle here for this process before we send the long download.
      database.close_db_filter(None)

      return Response(store.stream_read(repo_image.storage.locations, path), headers=headers)
    except (IOError, AttributeError):
      logger.exception('Image layer data not found')
      abort(404, 'Image %(image_id)s not found', issue='unknown-image',
            image_id=image_id)

  abort(403)


@v1_bp.route('/images/<image_id>/layer', methods=['PUT'])
@process_auth
@extract_namespace_repo_from_session
@anon_protect
def put_image_layer(namespace, repository, image_id):
  logger.debug('Checking repo permissions')
  permission = ModifyRepositoryPermission(namespace, repository)
  if not permission.can():
    abort(403)

  logger.debug('Retrieving image')
  repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
  try:
    logger.debug('Retrieving image data')
    uuid = repo_image.storage.uuid
    json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
  except (IOError, AttributeError):
    logger.exception('Exception when retrieving image data')
    abort(404, 'Image %(image_id)s not found', issue='unknown-image',
          image_id=image_id)

  logger.debug('Retrieving image path info')
  layer_path = store.image_layer_path(uuid)

  if (store.exists(repo_image.storage.locations, layer_path) and not
      image_is_uploading(repo_image)):
    exact_abort(409, 'Image already exists')

  logger.debug('Storing layer data')

  input_stream = request.stream
  if request.headers.get('transfer-encoding') == 'chunked':
    # Careful, this might work only with WSGI servers supporting chunked
    # encoding (Gunicorn).
    input_stream = request.environ['wsgi.input']

  # Create a socket reader to read the input stream containing the layer data.
  sr = SocketReader(input_stream)

  # Add a handler that copies the data into a temp file. This is used to calculate the tarsum,
  # which is only needed for older versions of Docker.
  requires_tarsum = session.get('checksum_format') == 'tarsum'
  if requires_tarsum:
    tmp, tmp_hndlr = store.temp_store_handler()
    sr.add_handler(tmp_hndlr)

  # Add a handler to compute the compressed and uncompressed sizes of the layer.
  size_info, size_hndlr = gzipstream.calculate_size_handler()
  sr.add_handler(size_hndlr)

  # Add a handler which computes the checksum.
  h, sum_hndlr = checksums.simple_checksum_handler(json_data)
  sr.add_handler(sum_hndlr)

  # Stream write the data to storage.
  with database.CloseForLongOperation(app.config):
    try:
      store.stream_write(repo_image.storage.locations, layer_path, sr)
    except IOError:
      logger.exception('Exception when writing image data')
      abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id)

  # Append the computed checksum.
  csums = []
  csums.append('sha256:{0}'.format(h.hexdigest()))

  try:
    # Save the size of the image.
    model.image.set_image_size(image_id, namespace, repository, size_info.compressed_size,
                               size_info.uncompressed_size)

    if requires_tarsum:
      tmp.seek(0)
      csums.append(checksums.compute_tarsum(tmp, json_data))
      tmp.close()
  except (IOError, checksums.TarError) as exc:
    logger.debug('put_image_layer: Error when computing tarsum %s', exc)

  if repo_image.storage.checksum is None:
    # We don't have a checksum stored yet, so it's fine to skip the check.
    # We don't remove the uploading mark though: the image is not downloadable yet.
    session['checksum'] = csums
    return make_response('true', 200)

  checksum = repo_image.storage.checksum

  # Check whether the checksum provided matches one of the checksums we computed.
  if checksum not in csums:
    logger.warning('put_image_layer: Wrong checksum')
    abort(400, 'Checksum mismatch; ignoring the layer for image %(image_id)s',
          issue='checksum-mismatch', image_id=image_id)

  # Mark the image as uploaded.
  _finish_image(namespace, repository, repo_image)

  return make_response('true', 200)


@v1_bp.route('/images/<image_id>/checksum', methods=['PUT'])
@process_auth
@extract_namespace_repo_from_session
@anon_protect
def put_image_checksum(namespace, repository, image_id):
  logger.debug('Checking repo permissions')
  permission = ModifyRepositoryPermission(namespace, repository)
  if not permission.can():
    abort(403)

  # Docker Version < 0.10 (tarsum+sha):
  old_checksum = request.headers.get('X-Docker-Checksum')

  # Docker Version >= 0.10 (sha):
  new_checksum = request.headers.get('X-Docker-Checksum-Payload')

  # Store whether we need to calculate the tarsum.
  if new_checksum:
    session['checksum_format'] = 'sha256'
  else:
    session['checksum_format'] = 'tarsum'

  checksum = new_checksum or old_checksum
  if not checksum:
    abort(400, "Missing checksum for image %(image_id)s", issue='missing-checksum',
          image_id=image_id)

  if not session.get('checksum'):
    abort(400, 'Checksum not found in Cookie for image %(image_id)s',
          issue='missing-checksum-cookie', image_id=image_id)

  logger.debug('Looking up repo image')
  repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
  if not repo_image or not repo_image.storage:
    abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)

  uuid = repo_image.storage.uuid

  logger.debug('Looking up repo layer data')
  if not store.exists(repo_image.storage.locations, store.image_json_path(uuid)):
    abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)

  logger.debug('Marking image path')
  if not image_is_uploading(repo_image):
    abort(409, 'Cannot set checksum for image %(image_id)s',
          issue='image-write-error', image_id=image_id)

  logger.debug('Storing image checksum')
  err = store_checksum(repo_image.storage, checksum)
  if err:
    abort(400, err)

  if checksum not in session.get('checksum', []):
    logger.debug('session checksums: %s', session.get('checksum', []))
    logger.debug('client supplied checksum: %s', checksum)
    logger.debug('put_image_checksum: Wrong checksum')
    abort(400, 'Checksum mismatch for image: %(image_id)s',
          issue='checksum-mismatch', image_id=image_id)

  # Mark the image as uploaded.
  _finish_image(namespace, repository, repo_image)

  return make_response('true', 200)


@v1_bp.route('/images/<image_id>/json', methods=['GET'])
@process_auth
@extract_namespace_repo_from_session
@require_completion
@set_cache_headers
@anon_protect
def get_image_json(namespace, repository, image_id, headers):
  logger.debug('Checking repo permissions')
  permission = ReadRepositoryPermission(namespace, repository)
  if not permission.can() and not model.repository.repository_is_public(namespace, repository):
    abort(403)

  logger.debug('Looking up repo image')
  repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)

  logger.debug('Looking up repo layer data')
  try:
    uuid = repo_image.storage.uuid
    data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
  except (IOError, AttributeError):
    flask_abort(404)

  logger.debug('Looking up repo layer size')
  size = repo_image.storage.image_size
  headers['X-Docker-Size'] = str(size)

  response = make_response(data, 200)
  response.headers.extend(headers)
  return response


@v1_bp.route('/images/<image_id>/ancestry', methods=['GET'])
@process_auth
@extract_namespace_repo_from_session
@require_completion
@set_cache_headers
@anon_protect
def get_image_ancestry(namespace, repository, image_id, headers):
  logger.debug('Checking repo permissions')
  permission = ReadRepositoryPermission(namespace, repository)
  if not permission.can() and not model.repository.repository_is_public(namespace, repository):
    abort(403)

  logger.debug('Looking up repo image')
  repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)

  logger.debug('Looking up image data')
  try:
    uuid = repo_image.storage.uuid
    data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
  except (IOError, AttributeError):
    abort(404, 'Image %(image_id)s not found', issue='unknown-image',
          image_id=image_id)

  logger.debug('Converting to <-> from JSON')
  response = make_response(json.dumps(json.loads(data)), 200)
  response.headers.extend(headers)

  logger.debug('Done')
  return response


def generate_ancestry(image_id, uuid, locations, parent_id=None, parent_uuid=None,
                      parent_locations=None):
  if not parent_id:
    store.put_content(locations, store.image_ancestry_path(uuid), json.dumps([image_id]))
    return

  data = store.get_content(parent_locations, store.image_ancestry_path(parent_uuid))
  data = json.loads(data)
  data.insert(0, image_id)
  store.put_content(locations, store.image_ancestry_path(uuid), json.dumps(data))


def store_checksum(image_storage, checksum):
  checksum_parts = checksum.split(':')
  if len(checksum_parts) != 2:
    return 'Invalid checksum format'

  # We store the checksum
  image_storage.checksum = checksum
  image_storage.save()


@v1_bp.route('/images/<image_id>/json', methods=['PUT'])
@process_auth
@extract_namespace_repo_from_session
@anon_protect
def put_image_json(namespace, repository, image_id):
  logger.debug('Checking repo permissions')
  permission = ModifyRepositoryPermission(namespace, repository)
  if not permission.can():
    abort(403)

  logger.debug('Parsing image JSON')
  data = None  # Ensure `data` is defined even if the JSON fails to parse.
  try:
    data = json.loads(request.data.decode('utf8'))
  except ValueError:
    pass

  if not data or not isinstance(data, dict):
    abort(400, 'Invalid JSON for image: %(image_id)s\nJSON: %(json)s',
          issue='invalid-request', image_id=image_id, json=request.data)

  if 'id' not in data:
    abort(400, 'Missing key `id` in JSON for image: %(image_id)s',
          issue='invalid-request', image_id=image_id)

  logger.debug('Looking up repo image')
  repo = model.repository.get_repository(namespace, repository)
  if repo is None:
    abort(404, 'Repository does not exist: %(namespace)s/%(repository)s', issue='no-repo',
          namespace=namespace, repository=repository)

  repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
  if not repo_image:
    username = (get_authenticated_user() and get_authenticated_user().username or
                get_grant_user_context())
    logger.debug('Image not found, creating image with initiating user context: %s', username)
    repo_image = model.image.find_create_or_link_image(image_id, repo, username, {},
                                                       store.preferred_locations[0])

  # Create a temporary tag to prevent this image from getting garbage collected while the push
  # is in progress.
  model.tag.create_temporary_hidden_tag(repo, repo_image,
                                        app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])

  uuid = repo_image.storage.uuid

  if image_id != data['id']:
    abort(400, 'JSON data contains invalid id for image: %(image_id)s',
          issue='invalid-request', image_id=image_id)

  parent_id = data.get('parent')
  parent_image = None
  if parent_id:
    logger.debug('Looking up parent image')
    parent_image = model.image.get_repo_image_extended(namespace, repository, parent_id)

  parent_uuid = parent_image and parent_image.storage.uuid
  parent_locations = parent_image and parent_image.storage.locations

  if parent_id:
    logger.debug('Looking up parent image data')
    if (parent_id and not
        store.exists(parent_locations, store.image_json_path(parent_uuid))):
      abort(400, 'Image %(image_id)s depends on non existing parent image %(parent_id)s',
            issue='invalid-request', image_id=image_id, parent_id=parent_id)

  logger.debug('Looking up image storage paths')
  json_path = store.image_json_path(uuid)

  logger.debug('Checking if image already exists')
  if (store.exists(repo_image.storage.locations, json_path) and not
      image_is_uploading(repo_image)):
    exact_abort(409, 'Image already exists')

  set_uploading_flag(repo_image, True)

  # If we reach this point, this is a new image or a retry of a failed
  # push, so save the metadata.
  command_list = data.get('container_config', {}).get('Cmd', None)
  command = json.dumps(command_list) if command_list else None

  logger.debug('Setting image metadata')
  model.image.set_image_metadata(image_id, namespace, repository, data.get('created'),
                                 data.get('comment'), command, parent_image)

  logger.debug('Putting json path')
  store.put_content(repo_image.storage.locations, json_path, request.data)

  logger.debug('Generating image ancestry')
  try:
    generate_ancestry(image_id, uuid, repo_image.storage.locations, parent_id, parent_uuid,
                      parent_locations)
  except IOError as ioe:
    logger.debug('Error when generating ancestry: %s', ioe.message)
    abort(404)

  logger.debug('Done')
  return make_response('true', 200)


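# Computes the added/changed/removed file lists for a layer, recursing to
# the parent image first, and writes the results to storage. This backs
# the layer-diff jobs enqueued on image_diff_queue in _finish_image above.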
def process_image_changes(namespace, repository, image_id):
  logger.debug('Generating diffs for image: %s', image_id)
  repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
  if not repo_image:
    logger.warning('No image for id: %s', image_id)
    return None, None

  uuid = repo_image.storage.uuid

  image_diffs_path = store.image_file_diffs_path(uuid)
  image_trie_path = store.image_file_trie_path(uuid)

  if store.exists(repo_image.storage.locations, image_diffs_path):
    logger.debug('Diffs already exist for image: %s', image_id)
    return image_trie_path, repo_image.storage.locations

  image = model.image.get_image_by_id(namespace, repository, image_id)
  parents = model.image.get_parent_images(namespace, repository, image)

  # Compute the diffs and fs for the parent first if necessary.
  parent_trie_path = None
  if parents:
    parent_trie_path, parent_locations = process_image_changes(namespace, repository,
                                                               parents[-1].docker_image_id)

  # Read in the collapsed layer state of the filesystem for the parent.
  parent_trie = changes.empty_fs()
  if parent_trie_path:
    parent_trie_bytes = store.get_content(parent_locations, parent_trie_path)
    parent_trie.frombytes(parent_trie_bytes)

  # Read in the file entries from the layer tar file.
  layer_path = store.image_layer_path(uuid)
  with store.stream_read_file(image.storage.locations, layer_path) as layer_tar_stream:
    removed_files = set()
    layer_files = changes.files_and_dirs_from_tar(layer_tar_stream,
                                                  removed_files)

  new_metadata = changes.compute_new_diffs_and_fs(parent_trie, layer_files, removed_files)
  (new_trie, added, changed, removed) = new_metadata

  # Write out the new trie.
  store.put_content(image.storage.locations, image_trie_path, new_trie.tobytes())

  # Write out the diffs.
  diffs = {}
  sections = ('added', 'changed', 'removed')
  for section, source_trie in zip(sections, new_metadata[1:]):
    diffs[section] = list(source_trie)
    diffs[section].sort()
  store.put_content(image.storage.locations, image_diffs_path, json.dumps(diffs, indent=2))

  return image_trie_path, image.storage.locations