Fix verbs in manifestlist

All registry_tests now pass
This commit is contained in:
Joseph Schorr 2016-09-01 19:00:11 -04:00 committed by Jimmy Zelinskie
parent 783c9e7a73
commit 3c8b87e086
18 changed files with 517 additions and 247 deletions

View file

@ -9,7 +9,7 @@ from data.database import db
from auth.auth_context import get_authenticated_user
from endpoints.notificationhelper import spawn_notification
from util.names import escape_tag
from util.morecollections import AttrDict
logger = logging.getLogger(__name__)
@ -72,7 +72,13 @@ def start_build(repository, prepared_build, pull_robot_name=None):
model.log.log_action('build_dockerfile', repository.namespace_user.username,
ip=request.remote_addr, metadata=event_log_metadata, repository=repository)
spawn_notification(repository, 'build_queued', event_log_metadata,
# TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
repo = AttrDict({
'namespace_name': repository.namespace_user.username,
'name': repository.name,
})
spawn_notification(repo, 'build_queued', event_log_metadata,
subpage='build/%s' % build_request.uuid,
pathargs=['build', build_request.uuid])

View file

@ -155,6 +155,10 @@ def put_image_layer(namespace, repository, image_id):
if model.storage_exists(namespace, repository, image_id):
exact_abort(409, 'Image already exists')
v1_metadata = model.docker_v1_metadata(namespace, repository, image_id)
if v1_metadata is None:
abort(404)
logger.debug('Storing layer data')
input_stream = request.stream
@ -182,7 +186,6 @@ def put_image_layer(namespace, repository, image_id):
sr.add_handler(piece_hasher.update)
# Add a handler which computes the checksum.
v1_metadata = model.docker_v1_metadata(namespace, repository, image_id)
h, sum_hndlr = checksums.simple_checksum_handler(v1_metadata.compat_json)
sr.add_handler(sum_hndlr)

View file

@ -104,7 +104,7 @@ def write_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
if manifest.tag != manifest_ref:
raise TagInvalid()
return _write_manifest(namespace_name, repo_name, manifest)
return _write_manifest_and_log(namespace_name, repo_name, manifest)
@v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['PUT'])
@ -113,16 +113,16 @@ def write_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def write_manifest_by_digest(namespace_name, repo_name, digest):
def write_manifest_by_digest(namespace_name, repo_name, manifest_ref):
try:
manifest = DockerSchema1Manifest(request.data)
except ManifestException as me:
raise ManifestInvalid(detail={'message': me.message})
if manifest.digest != digest:
if manifest.digest != manifest_ref:
raise ManifestInvalid(detail={'message': 'manifest digest mismatch'})
return _write_manifest(namespace_name, repo_name, manifest)
return _write_manifest_and_log(namespace_name, repo_name, manifest)
def _write_manifest(namespace_name, repo_name, manifest):
@ -178,6 +178,12 @@ def _write_manifest(namespace_name, repo_name, manifest):
model.save_manifest(namespace_name, repo_name, manifest.tag, leaf_layer_id, manifest.digest,
manifest.bytes)
return repo, storage_map
def _write_manifest_and_log(namespace_name, repo_name, manifest):
repo, storage_map = _write_manifest(namespace_name, repo_name, manifest)
# Queue all blob manifests for replication.
# TODO(jschorr): Find a way to optimize this insertion.
if features.STORAGE_REPLICATION:

View file

@ -1,5 +1,4 @@
import logging
import json
import hashlib
from flask import redirect, Blueprint, abort, send_file, make_response, request
@ -10,7 +9,8 @@ from app import app, signer, storage, metric_queue
from auth.auth import process_auth
from auth.auth_context import get_authenticated_user
from auth.permissions import ReadRepositoryPermission
from data import model, database
from data import database
from data.interfaces.verbs import PreOCIModel as model
from endpoints.common import route_show_if, parse_repository_name
from endpoints.decorators import anon_protect
from endpoints.trackhelper import track_and_log
@ -29,8 +29,7 @@ verbs = Blueprint('verbs', __name__)
logger = logging.getLogger(__name__)
def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image,
handlers):
def _open_stream(formatter, namespace, repository, tag, derived_image_id, repo_image, handlers):
"""
This method generates a stream of data which will be replicated and read from the queue files.
This method runs in a separate process.
@ -38,12 +37,7 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
# For performance reasons, we load the full image list here, cache it, then disconnect from
# the database.
with database.UseThenDisconnect(app.config):
image_list = list(model.image.get_parent_images_with_placements(namespace, repository,
repo_image))
image_list.insert(0, repo_image)
def get_image_json(image):
return json.loads(image.v1_json_metadata)
image_list = list(model.get_manifest_layers_with_blobs(repo_image))
def get_next_image():
for current_image in image_list:
@ -52,18 +46,16 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
def get_next_layer():
# Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
store = Storage(app, metric_queue)
for current_image_entry in image_list:
current_image_path = model.storage.get_layer_path(current_image_entry.storage)
current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
for current_image in image_list:
current_image_path = model.get_blob_path(current_image.blob)
current_image_stream = store.stream_read_file(current_image.blob.locations,
current_image_path)
current_image_id = current_image_entry.id
logger.debug('Returning image layer %s (%s): %s', current_image_id,
current_image_entry.docker_image_id, current_image_path)
logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path)
yield current_image_stream
stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id, image_json,
get_next_image, get_next_layer, get_image_json)
stream = formatter.build_stream(namespace, repository, tag, repo_image, derived_image_id,
get_next_image, get_next_layer)
for handler_fn in handlers:
stream = wrap_with_handler(stream, handler_fn)
@ -71,75 +63,58 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
return stream.read
def _sign_synthetic_image(verb, linked_storage_uuid, queue_file):
def _sign_derived_image(verb, derived_image, queue_file):
""" Read from the queue file and sign the contents which are generated. This method runs in a
separate process. """
signature = None
try:
signature = signer.detached_sign(queue_file)
except:
logger.exception('Exception when signing %s image %s', verb, linked_storage_uuid)
logger.exception('Exception when signing %s deriving image %s', verb, derived_image.ref)
return
# Setup the database (since this is a new process) and then disconnect immediately
# once the operation completes.
if not queue_file.raised_exception:
with database.UseThenDisconnect(app.config):
try:
derived = model.storage.get_storage_by_uuid(linked_storage_uuid)
except model.storage.InvalidImageException:
return
signature_entry = model.storage.find_or_create_storage_signature(derived, signer.name)
signature_entry.signature = signature
signature_entry.uploading = False
signature_entry.save()
model.set_derived_image_signature(derived_image, signer.name, signature)
def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations, queue_file):
def _write_derived_image_to_storage(verb, derived_image, queue_file):
""" Read from the generated stream and write it back to the storage engine. This method runs in a
separate process.
"""
def handle_exception(ex):
logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex)
logger.debug('Exception when building %s derived image %s: %s', verb, derived_image.ref, ex)
with database.UseThenDisconnect(app.config):
model.image.delete_derived_storage_by_uuid(linked_storage_uuid)
model.delete_derived_image(derived_image)
queue_file.add_exception_handler(handle_exception)
# Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
store = Storage(app, metric_queue)
image_path = store.v1_image_layer_path(linked_storage_uuid)
store.stream_write(linked_locations, image_path, queue_file)
image_path = model.get_blob_path(derived_image.blob)
store.stream_write(derived_image.blob.locations, image_path, queue_file)
queue_file.close()
if not queue_file.raised_exception:
# Setup the database (since this is a new process) and then disconnect immediately
# once the operation completes.
with database.UseThenDisconnect(app.config):
done_uploading = model.storage.get_storage_by_uuid(linked_storage_uuid)
done_uploading.uploading = False
done_uploading.save()
def _torrent_for_storage(storage_ref, is_public):
""" Returns a response containing the torrent file contents for the given storage. May abort
def _torrent_for_blob(blob, is_public):
""" Returns a response containing the torrent file contents for the given blob. May abort
with an error if the state is not valid (e.g. non-public, non-user request).
"""
# Make sure the storage has a size.
if not storage_ref.image_size:
if not blob.size:
abort(404)
# Lookup the torrent information for the storage.
try:
torrent_info = model.storage.get_torrent_info(storage_ref)
except model.TorrentInfoDoesNotExist:
torrent_info = model.get_torrent_info(blob)
if torrent_info is None:
abort(404)
# Lookup the webseed path for the storage.
path = model.storage.get_layer_path(storage_ref)
webseed = storage.get_direct_download_url(storage_ref.locations, path,
path = model.get_blob_path(blob)
webseed = storage.get_direct_download_url(blob.locations, path,
expires_in=app.config['BITTORRENT_WEBSEED_LIFETIME'])
if webseed is None:
# We cannot support webseeds for storages that cannot provide direct downloads.
@ -147,17 +122,17 @@ def _torrent_for_storage(storage_ref, is_public):
# Build the filename for the torrent.
if is_public:
name = public_torrent_filename(storage_ref.uuid)
name = public_torrent_filename(blob.uuid)
else:
user = get_authenticated_user()
if not user:
abort(403)
name = per_user_torrent_filename(user.uuid, storage_ref.uuid)
name = per_user_torrent_filename(user.uuid, blob.uuid)
# Return the torrent file.
torrent_file = make_torrent(name, webseed, storage_ref.image_size,
torrent_info.piece_length, torrent_info.pieces)
torrent_file = make_torrent(name, webseed, blob.size, torrent_info.piece_length,
torrent_info.pieces)
headers = {'Content-Type': 'application/x-bittorrent',
'Content-Disposition': 'attachment; filename={0}.torrent'.format(name)}
@ -173,60 +148,46 @@ def _torrent_repo_verb(repo_image, tag, verb, **kwargs):
# Lookup an *existing* derived storage for the verb. If the verb's image storage doesn't exist,
# we cannot create it here, so we 406.
derived = model.image.find_derived_storage_for_image(repo_image, verb,
varying_metadata={'tag': tag})
if not derived:
derived_image = model.lookup_derived_image(repo_image, verb, varying_metadata={'tag': tag})
if derived_image is None:
abort(406)
# Return the torrent.
public_repo = model.repository.is_repository_public(repo_image.repository)
torrent = _torrent_for_storage(derived, public_repo)
public_repo = model.repository_is_public(repo_image.repository.namespace_name,
repo_image.repository.name)
torrent = _torrent_for_blob(derived_image.blob, public_repo)
# Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, torrent=True, **kwargs)
return torrent
def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None):
def _verify_repo_verb(_, namespace, repository, tag, verb, checker=None):
permission = ReadRepositoryPermission(namespace, repository)
if not permission.can() and not model.repository.repository_is_public(namespace, repository):
if not permission.can() and not model.repository_is_public(namespace, repository):
abort(403)
# Lookup the requested tag.
try:
tag_image = model.tag.get_tag_image(namespace, repository, tag)
except model.DataModelException:
abort(404)
# Lookup the tag's image and storage.
repo_image = model.image.get_repo_image_extended(namespace, repository, tag_image.docker_image_id)
if not repo_image:
tag_image = model.get_tag_image(namespace, repository, tag)
if tag_image is None:
abort(404)
# If there is a data checker, call it first.
image_json = None
if checker is not None:
image_json = json.loads(repo_image.v1_json_metadata)
if not checker(image_json):
if not checker(tag_image):
logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repository, tag, verb)
abort(404)
return (repo_image, tag_image, image_json)
return tag_image
def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs):
# Verify that the image exists and that we have access to it.
result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
(repo_image, _, _) = result
repo_image = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
# Lookup the derived image storage for the verb.
derived = model.image.find_derived_storage_for_image(repo_image, verb,
varying_metadata={'tag': tag})
if derived is None or derived.uploading:
# derived_image the derived image storage for the verb.
derived_image = model.lookup_derived_image(repo_image, verb, varying_metadata={'tag': tag})
if derived_image is None or derived_image.blob.uploading:
return make_response('', 202)
# Check if we have a valid signer configured.
@ -234,18 +195,17 @@ def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwarg
abort(404)
# Lookup the signature for the verb.
signature_entry = model.storage.lookup_storage_signature(derived, signer.name)
if signature_entry is None:
signature_value = model.get_derived_image_signature(derived_image, signer.name)
if signature_value is None:
abort(404)
# Return the signature.
return make_response(signature_entry.signature)
return make_response(signature_value)
def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs):
# Verify that the image exists and that we have access to it.
result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
(repo_image, tag_image, image_json) = result
repo_image = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
# Check for torrent. If found, we return a torrent for the repo verb image (if the derived
# image already exists).
@ -257,36 +217,30 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb])
# Lookup/create the derived image storage for the verb and repo image.
derived = model.image.find_or_create_derived_storage(repo_image, verb,
# Lookup/create the derived image for the verb and repo image.
derived_image = model.lookup_or_create_derived_image(repo_image, verb,
storage.preferred_locations[0],
varying_metadata={'tag': tag})
if not derived.uploading:
logger.debug('Derived %s image %s exists in storage', verb, derived.uuid)
derived_layer_path = model.storage.get_layer_path(derived)
if not derived_image.blob.uploading:
logger.debug('Derived %s image %s exists in storage', verb, derived_image.ref)
derived_layer_path = model.get_blob_path(derived_image.blob)
is_head_request = request.method == 'HEAD'
download_url = storage.get_direct_download_url(derived.locations, derived_layer_path,
download_url = storage.get_direct_download_url(derived_image.blob.locations, derived_layer_path,
head=is_head_request)
if download_url:
logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid)
logger.debug('Redirecting to download URL for derived %s image %s', verb, derived_image.ref)
return redirect(download_url)
# Close the database handle here for this process before we send the long download.
database.close_db_filter(None)
logger.debug('Sending cached derived %s image %s', verb, derived.uuid)
return send_file(storage.stream_read_file(derived.locations, derived_layer_path))
logger.debug('Sending cached derived %s image %s', verb, derived_image.ref)
return send_file(storage.stream_read_file(derived_image.blob.locations, derived_layer_path))
logger.debug('Building and returning derived %s image %s', verb, derived_image.ref)
logger.debug('Building and returning derived %s image %s', verb, derived.uuid)
# Load the image's JSON layer.
if not image_json:
image_json = json.loads(repo_image.v1_json_metadata)
# Calculate a synthetic image ID.
synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':' + verb).hexdigest()
# Calculate a derived image ID.
derived_image_id = hashlib.sha256(repo_image.image_id + ':' + verb).hexdigest()
def _cleanup():
# Close any existing DB connection once the process has exited.
@ -296,16 +250,14 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
def _store_metadata_and_cleanup():
with database.UseThenDisconnect(app.config):
model.storage.save_torrent_info(derived, app.config['BITTORRENT_PIECE_SIZE'],
hasher.final_piece_hashes())
derived.image_size = hasher.hashed_bytes
derived.save()
model.set_torrent_info(derived_image.blob, app.config['BITTORRENT_PIECE_SIZE'],
hasher.final_piece_hashes())
model.set_blob_size(derived_image.blob, hasher.hashed_bytes)
# Create a queue process to generate the data. The queue files will read from the process
# and send the results to the client and storage.
handlers = [hasher.update]
args = (formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image,
handlers)
args = (formatter, namespace, repository, tag, derived_image_id, repo_image, handlers)
queue_process = QueueProcess(_open_stream,
8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max
args, finished=_store_metadata_and_cleanup)
@ -322,12 +274,12 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
queue_process.run()
# Start the storage saving.
storage_args = (verb, derived.uuid, derived.locations, storage_queue_file)
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
storage_args = (verb, derived_image, storage_queue_file)
QueueProcess.run_process(_write_derived_image_to_storage, storage_args, finished=_cleanup)
if sign and signer.name:
signing_args = (verb, derived.uuid, signing_queue_file)
QueueProcess.run_process(_sign_synthetic_image, signing_args, finished=_cleanup)
signing_args = (verb, derived_image, signing_queue_file)
QueueProcess.run_process(_sign_derived_image, signing_args, finished=_cleanup)
# Close the database handle here for this process before we send the long download.
database.close_db_filter(None)
@ -337,7 +289,9 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
def os_arch_checker(os, arch):
def checker(image_json):
def checker(repo_image):
image_json = repo_image.compat_metadata
# Verify the architecture and os.
operating_system = image_json.get('os', 'linux')
if operating_system != os:
@ -391,7 +345,7 @@ def get_squashed_tag(namespace, repository, tag):
@parse_repository_name()
def get_tag_torrent(namespace_name, repo_name, digest):
permission = ReadRepositoryPermission(namespace_name, repo_name)
public_repo = model.repository.repository_is_public(namespace_name, repo_name)
public_repo = model.repository_is_public(namespace_name, repo_name)
if not permission.can() and not public_repo:
abort(403)
@ -400,10 +354,9 @@ def get_tag_torrent(namespace_name, repo_name, digest):
# We can not generate a private torrent cluster without a user uuid (e.g. token auth)
abort(403)
try:
blob = model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest)
except model.BlobDoesNotExist:
blob = model.get_repo_blob_by_digest(namespace_name, repo_name, digest)
if blob is None:
abort(404)
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'torrent'])
return _torrent_for_storage(blob, public_repo)
return _torrent_for_blob(blob, public_repo)