Move verbs endpoint to use new registry data model

This commit is contained in:
Joseph Schorr 2018-08-28 22:58:19 -04:00
parent bafab2e734
commit f252b0b16f
14 changed files with 145 additions and 469 deletions

View file

@ -254,6 +254,7 @@ def get_storage_by_uuid(storage_uuid):
def get_layer_path(storage_record): def get_layer_path(storage_record):
""" Returns the path in the storage engine to the layer data referenced by the storage row. """ """ Returns the path in the storage engine to the layer data referenced by the storage row. """
assert storage_record.cas_path is not None
return get_layer_path_for_storage(storage_record.uuid, storage_record.cas_path, return get_layer_path_for_storage(storage_record.uuid, storage_record.cas_path,
storage_record.content_checksum) storage_record.content_checksum)
@ -280,7 +281,8 @@ def lookup_repo_storages_by_content_checksum(repo, checksums):
query_alias = 'q{0}'.format(counter) query_alias = 'q{0}'.format(counter)
candidate_subq = (ImageStorage candidate_subq = (ImageStorage
.select(ImageStorage.id, ImageStorage.content_checksum, .select(ImageStorage.id, ImageStorage.content_checksum,
ImageStorage.image_size, ImageStorage.uuid) ImageStorage.image_size, ImageStorage.uuid, ImageStorage.cas_path,
ImageStorage.uncompressed_size)
.join(Image) .join(Image)
.where(Image.repository == repo, ImageStorage.content_checksum == checksum) .where(Image.repository == repo, ImageStorage.content_checksum == checksum)
.limit(1) .limit(1)

View file

@ -28,6 +28,7 @@ class RepositoryReference(datatype('Repository', [])):
def _repository_obj(self): def _repository_obj(self):
return model.repository.lookup_repository(self._db_id) return model.repository.lookup_repository(self._db_id)
@property
def namespace_name(self): def namespace_name(self):
""" Returns the namespace name of this repository. """ Returns the namespace name of this repository.
""" """

View file

@ -398,6 +398,8 @@ class PreOCIModel(RegistryDataInterface):
return None return None
image_storage = storage_map[digest_str] image_storage = storage_map[digest_str]
assert image_storage.cas_path is not None
placements = None placements = None
if include_placements: if include_placements:
placements = list(model.storage.get_storage_locations(image_storage.uuid)) placements = list(model.storage.get_storage_locations(image_storage.uuid))
@ -551,6 +553,8 @@ class PreOCIModel(RegistryDataInterface):
except model.BlobDoesNotExist: except model.BlobDoesNotExist:
return None return None
assert image_storage.cas_path is not None
placements = None placements = None
if include_placements: if include_placements:
placements = list(model.storage.get_storage_locations(image_storage.uuid)) placements = list(model.storage.get_storage_locations(image_storage.uuid))

View file

@ -35,6 +35,8 @@ def test_find_matching_tag(names, expected, pre_oci_model):
assert found is None assert found is None
else: else:
assert found.name in expected assert found.name in expected
assert found.repository.namespace_name == 'devtable'
assert found.repository.name == 'simple'
@pytest.mark.parametrize('repo_namespace, repo_name, expected', [ @pytest.mark.parametrize('repo_namespace, repo_name, expected', [

View file

@ -10,13 +10,14 @@ from auth.auth_context import get_authenticated_user
from auth.decorators import process_auth from auth.decorators import process_auth
from auth.permissions import ReadRepositoryPermission from auth.permissions import ReadRepositoryPermission
from data import database from data import database
from data import model
from data.registry_model import registry_model
from endpoints.decorators import anon_protect, anon_allowed, route_show_if, parse_repository_name from endpoints.decorators import anon_protect, anon_allowed, route_show_if, parse_repository_name
from endpoints.verbs.models_pre_oci import pre_oci_model as model
from endpoints.v2.blob import BLOB_DIGEST_ROUTE from endpoints.v2.blob import BLOB_DIGEST_ROUTE
from image.appc import AppCImageFormatter from image.appc import AppCImageFormatter
from image.docker.squashed import SquashedDockerImageFormatter from image.docker.squashed import SquashedDockerImageFormatter
from storage import Storage from storage import Storage
from util.audit import track_and_log from util.audit import track_and_log, wrap_repository
from util.http import exact_abort from util.http import exact_abort
from util.registry.filelike import wrap_with_handler from util.registry.filelike import wrap_with_handler
from util.registry.queuefile import QueueFile from util.registry.queuefile import QueueFile
@ -40,7 +41,7 @@ class VerbReporter(TarLayerFormatterReporter):
metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count]) metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count])
def _open_stream(formatter, repo_image, tag, derived_image_id, handlers, reporter): def _open_stream(formatter, tag, manifest, derived_image_id, handlers, reporter):
""" """
This method generates a stream of data which will be replicated and read from the queue files. This method generates a stream of data which will be replicated and read from the queue files.
This method runs in a separate process. This method runs in a separate process.
@ -48,29 +49,25 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers, reporte
# For performance reasons, we load the full image list here, cache it, then disconnect from # For performance reasons, we load the full image list here, cache it, then disconnect from
# the database. # the database.
with database.UseThenDisconnect(app.config): with database.UseThenDisconnect(app.config):
image_list = list(model.get_manifest_layers_with_blobs(repo_image)) layers = registry_model.list_manifest_layers(manifest, include_placements=True)
def get_next_image(): def image_stream_getter(store, blob):
for current_image in image_list:
yield current_image
def image_stream_getter(store, current_image):
def get_stream_for_storage(): def get_stream_for_storage():
current_image_path = model.get_blob_path(current_image.blob) current_image_stream = store.stream_read_file(blob.placements, blob.storage_path)
current_image_stream = store.stream_read_file(current_image.blob.locations, logger.debug('Returning blob %s: %s', blob.digest, blob.storage_path)
current_image_path)
logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path)
return current_image_stream return current_image_stream
return get_stream_for_storage return get_stream_for_storage
def tar_stream_getter_iterator(): def tar_stream_getter_iterator():
# Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver) store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
for current_image in image_list:
yield image_stream_getter(store, current_image)
stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image, # Note: We reverse because we have to start at the leaf layer and move upward,
# as per the spec for the formatters.
for layer in reversed(layers):
yield image_stream_getter(store, layer.blob)
stream = formatter.build_stream(tag, manifest, derived_image_id, layers,
tar_stream_getter_iterator, reporter=reporter) tar_stream_getter_iterator, reporter=reporter)
for handler_fn in handlers: for handler_fn in handlers:
@ -86,14 +83,14 @@ def _sign_derived_image(verb, derived_image, queue_file):
try: try:
signature = signer.detached_sign(queue_file) signature = signer.detached_sign(queue_file)
except: except:
logger.exception('Exception when signing %s deriving image %s', verb, derived_image.ref) logger.exception('Exception when signing %s deriving image %s', verb, derived_image)
return return
# Setup the database (since this is a new process) and then disconnect immediately # Setup the database (since this is a new process) and then disconnect immediately
# once the operation completes. # once the operation completes.
if not queue_file.raised_exception: if not queue_file.raised_exception:
with database.UseThenDisconnect(app.config): with database.UseThenDisconnect(app.config):
model.set_derived_image_signature(derived_image, signer.name, signature) registry_model.set_derived_image_signature(derived_image, signer.name, signature)
def _write_derived_image_to_storage(verb, derived_image, queue_file): def _write_derived_image_to_storage(verb, derived_image, queue_file):
@ -102,17 +99,16 @@ def _write_derived_image_to_storage(verb, derived_image, queue_file):
""" """
def handle_exception(ex): def handle_exception(ex):
logger.debug('Exception when building %s derived image %s: %s', verb, derived_image.ref, ex) logger.debug('Exception when building %s derived image %s: %s', verb, derived_image, ex)
with database.UseThenDisconnect(app.config): with database.UseThenDisconnect(app.config):
model.delete_derived_image(derived_image) registry_model.delete_derived_image(derived_image)
queue_file.add_exception_handler(handle_exception) queue_file.add_exception_handler(handle_exception)
# Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver) store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
image_path = model.get_blob_path(derived_image.blob) store.stream_write(derived_image.blob.placements, derived_image.blob.storage_path, queue_file)
store.stream_write(derived_image.blob.locations, image_path, queue_file)
queue_file.close() queue_file.close()
@ -121,17 +117,16 @@ def _torrent_for_blob(blob, is_public):
with an error if the state is not valid (e.g. non-public, non-user request). with an error if the state is not valid (e.g. non-public, non-user request).
""" """
# Make sure the storage has a size. # Make sure the storage has a size.
if not blob.size: if not blob.compressed_size:
abort(404) abort(404)
# Lookup the torrent information for the storage. # Lookup the torrent information for the storage.
torrent_info = model.get_torrent_info(blob) torrent_info = registry_model.get_torrent_info(blob)
if torrent_info is None: if torrent_info is None:
abort(404) abort(404)
# Lookup the webseed path for the storage. # Lookup the webseed path for the storage.
path = model.get_blob_path(blob) webseed = storage.get_direct_download_url(blob.placements, blob.storage_path,
webseed = storage.get_direct_download_url(blob.locations, path,
expires_in=app.config['BITTORRENT_WEBSEED_LIFETIME']) expires_in=app.config['BITTORRENT_WEBSEED_LIFETIME'])
if webseed is None: if webseed is None:
# We cannot support webseeds for storages that cannot provide direct downloads. # We cannot support webseeds for storages that cannot provide direct downloads.
@ -151,8 +146,8 @@ def _torrent_for_blob(blob, is_public):
name = per_user_torrent_filename(torrent_config, user.uuid, blob.uuid) name = per_user_torrent_filename(torrent_config, user.uuid, blob.uuid)
# Return the torrent file. # Return the torrent file.
torrent_file = make_torrent(torrent_config, name, webseed, blob.size, torrent_info.piece_length, torrent_file = make_torrent(torrent_config, name, webseed, blob.compressed_size,
torrent_info.pieces) torrent_info.piece_length, torrent_info.pieces)
headers = { headers = {
'Content-Type': 'application/x-bittorrent', 'Content-Type': 'application/x-bittorrent',
@ -161,7 +156,7 @@ def _torrent_for_blob(blob, is_public):
return make_response(torrent_file, 200, headers) return make_response(torrent_file, 200, headers)
def _torrent_repo_verb(repo_image, tag, verb, **kwargs): def _torrent_repo_verb(repository, tag, manifest, verb, **kwargs):
""" Handles returning a torrent for the given verb on the given image and tag. """ """ Handles returning a torrent for the given verb on the given image and tag. """
if not features.BITTORRENT: if not features.BITTORRENT:
# Torrent feature is not enabled. # Torrent feature is not enabled.
@ -169,60 +164,73 @@ def _torrent_repo_verb(repo_image, tag, verb, **kwargs):
# Lookup an *existing* derived storage for the verb. If the verb's image storage doesn't exist, # Lookup an *existing* derived storage for the verb. If the verb's image storage doesn't exist,
# we cannot create it here, so we 406. # we cannot create it here, so we 406.
derived_image = model.lookup_derived_image(repo_image, verb, varying_metadata={'tag': tag}) derived_image = registry_model.lookup_derived_image(manifest, verb,
varying_metadata={'tag': tag.name},
include_placements=True)
if derived_image is None: if derived_image is None:
abort(406) abort(406)
# Return the torrent. # Return the torrent.
repo = model.get_repository(repo_image.repository.namespace_name, repo_image.repository.name) torrent = _torrent_for_blob(derived_image.blob, model.repository.is_repository_public(repository))
repo_is_public = repo is not None and repo.is_public
torrent = _torrent_for_blob(derived_image.blob, repo_is_public)
# Log the action. # Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, torrent=True, **kwargs) track_and_log('repo_verb', wrap_repository(repository), tag=tag.name, verb=verb, torrent=True,
**kwargs)
return torrent return torrent
def _verify_repo_verb(_, namespace, repo_name, tag, verb, checker=None): def _verify_repo_verb(_, namespace, repo_name, tag_name, verb, checker=None):
permission = ReadRepositoryPermission(namespace, repo_name) permission = ReadRepositoryPermission(namespace, repo_name)
repo = model.get_repository(namespace, repo_name) repo = model.repository.get_repository(namespace, repo_name)
repo_is_public = repo is not None and repo.is_public repo_is_public = repo is not None and model.repository.is_repository_public(repo)
if not permission.can() and not repo_is_public: if not permission.can() and not repo_is_public:
logger.debug('No permission to read repository %s/%s for user %s with verb %s', namespace, logger.debug('No permission to read repository %s/%s for user %s with verb %s', namespace,
repo_name, get_authenticated_user(), verb) repo_name, get_authenticated_user(), verb)
abort(403) abort(403)
# Lookup the requested tag. if repo is not None and repo.kind.name != 'image':
tag_image = model.get_tag_image(namespace, repo_name, tag)
if tag_image is None:
logger.debug('Tag %s does not exist in repository %s/%s for user %s', tag, namespace, repo_name,
get_authenticated_user())
abort(404)
if repo is not None and repo.kind != 'image':
logger.debug('Repository %s/%s for user %s is not an image repo', namespace, repo_name, logger.debug('Repository %s/%s for user %s is not an image repo', namespace, repo_name,
get_authenticated_user()) get_authenticated_user())
abort(405) abort(405)
# Make sure the repo's namespace isn't disabled. # Make sure the repo's namespace isn't disabled.
if not model.is_namespace_enabled(namespace): if not registry_model.is_namespace_enabled(namespace):
abort(400) abort(400)
# Lookup the requested tag.
repo_ref = registry_model.lookup_repository(namespace, repo_name)
tag = registry_model.get_repo_tag(repo_ref, tag_name)
if tag is None:
logger.debug('Tag %s does not exist in repository %s/%s for user %s', tag, namespace, repo_name,
get_authenticated_user())
abort(404)
# Get its associated manifest.
manifest = registry_model.get_manifest_for_tag(tag, backfill_if_necessary=True)
if manifest is None:
logger.debug('Could not get manifest on %s/%s:%s::%s', namespace, repo_name, tag.name, verb)
abort(404)
# If there is a data checker, call it first. # If there is a data checker, call it first.
if checker is not None: if checker is not None:
if not checker(tag_image): if not checker(tag, manifest):
logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repo_name, tag, verb) logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repo_name, tag.name, verb)
abort(404) abort(404)
return tag_image # Preload the tag's repository information, so it gets cached.
assert tag.repository.namespace_name
assert tag.repository.name
return tag, manifest
def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs): def _repo_verb_signature(namespace, repository, tag_name, verb, checker=None, **kwargs):
# Verify that the image exists and that we have access to it. # Verify that the tag exists and that we have access to it.
repo_image = _verify_repo_verb(storage, namespace, repository, tag, verb, checker) tag, manifest = _verify_repo_verb(storage, namespace, repository, tag_name, verb, checker)
# derived_image the derived image storage for the verb. # Find the derived image storage for the verb.
derived_image = model.lookup_derived_image(repo_image, verb, varying_metadata={'tag': tag}) derived_image = registry_model.lookup_derived_image(manifest, verb,
varying_metadata={'tag': tag.name})
if derived_image is None or derived_image.blob.uploading: if derived_image is None or derived_image.blob.uploading:
return make_response('', 202) return make_response('', 202)
@ -231,7 +239,7 @@ def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwarg
abort(404) abort(404)
# Lookup the signature for the verb. # Lookup the signature for the verb.
signature_value = model.get_derived_image_signature(derived_image, signer.name) signature_value = registry_model.get_derived_image_signature(derived_image, signer.name)
if signature_value is None: if signature_value is None:
abort(404) abort(404)
@ -239,53 +247,57 @@ def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwarg
return make_response(signature_value) return make_response(signature_value)
def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs): def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, checker=None,
**kwargs):
# Verify that the image exists and that we have access to it. # Verify that the image exists and that we have access to it.
logger.debug('Verifying repo verb %s for repository %s/%s with user %s with mimetype %s', logger.debug('Verifying repo verb %s for repository %s/%s with user %s with mimetype %s',
verb, namespace, repository, get_authenticated_user(), request.accept_mimetypes.best) verb, namespace, repository, get_authenticated_user(), request.accept_mimetypes.best)
repo_image = _verify_repo_verb(storage, namespace, repository, tag, verb, checker) tag, manifest = _verify_repo_verb(storage, namespace, repository, tag_name, verb, checker)
# Load the repository for later.
repo = model.repository.get_repository(namespace, repository)
if repo is None:
abort(404)
# Check for torrent. If found, we return a torrent for the repo verb image (if the derived # Check for torrent. If found, we return a torrent for the repo verb image (if the derived
# image already exists). # image already exists).
if request.accept_mimetypes.best == 'application/x-bittorrent': if request.accept_mimetypes.best == 'application/x-bittorrent':
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent', True]) metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent', True])
return _torrent_repo_verb(repo_image, tag, verb, **kwargs) return _torrent_repo_verb(repo, tag, manifest, verb, **kwargs)
# Log the action. # Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs) track_and_log('repo_verb', wrap_repository(repo), tag=tag.name, verb=verb, **kwargs)
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb, True]) metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb, True])
# Lookup/create the derived image for the verb and repo image. # Lookup/create the derived image for the verb and repo image.
derived_image = model.lookup_or_create_derived_image( derived_image = registry_model.lookup_or_create_derived_image(
repo_image, verb, storage.preferred_locations[0], varying_metadata={'tag': tag}) manifest, verb, storage.preferred_locations[0], varying_metadata={'tag': tag.name},
include_placements=True)
if not derived_image.blob.uploading: if not derived_image.blob.uploading:
logger.debug('Derived %s image %s exists in storage', verb, derived_image.ref) logger.debug('Derived %s image %s exists in storage', verb, derived_image)
derived_layer_path = model.get_blob_path(derived_image.blob)
is_head_request = request.method == 'HEAD' is_head_request = request.method == 'HEAD'
download_url = storage.get_direct_download_url(derived_image.blob.locations, download_url = storage.get_direct_download_url(derived_image.blob.placements,
derived_layer_path, head=is_head_request) derived_image.blob.storage_path,
head=is_head_request)
if download_url: if download_url:
logger.debug('Redirecting to download URL for derived %s image %s', verb, derived_image.ref) logger.debug('Redirecting to download URL for derived %s image %s', verb, derived_image)
return redirect(download_url) return redirect(download_url)
# Close the database handle here for this process before we send the long download. # Close the database handle here for this process before we send the long download.
database.close_db_filter(None) database.close_db_filter(None)
logger.debug('Sending cached derived %s image %s', verb, derived_image.ref) logger.debug('Sending cached derived %s image %s', verb, derived_image)
return send_file( return send_file(
storage.stream_read_file(derived_image.blob.locations, derived_layer_path), storage.stream_read_file(derived_image.blob.placements, derived_image.blob.storage_path),
mimetype=LAYER_MIMETYPE) mimetype=LAYER_MIMETYPE)
logger.debug('Building and returning derived %s image %s', verb, derived_image.ref) logger.debug('Building and returning derived %s image %s', verb, derived_image)
# Close the database connection before any process forking occurs. This is important because # Close the database connection before any process forking occurs. This is important because
# the Postgres driver does not react kindly to forking, so we need to make sure it is closed # the Postgres driver does not react kindly to forking, so we need to make sure it is closed
# so that each process will get its own unique connection. # so that each process will get its own unique connection.
database.close_db_filter(None) database.close_db_filter(None)
# Calculate a derived image ID.
derived_image_id = hashlib.sha256(repo_image.image_id + ':' + verb).hexdigest()
def _cleanup(): def _cleanup():
# Close any existing DB connection once the process has exited. # Close any existing DB connection once the process has exited.
database.close_db_filter(None) database.close_db_filter(None)
@ -294,15 +306,15 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
def _store_metadata_and_cleanup(): def _store_metadata_and_cleanup():
with database.UseThenDisconnect(app.config): with database.UseThenDisconnect(app.config):
model.set_torrent_info(derived_image.blob, app.config['BITTORRENT_PIECE_SIZE'], registry_model.set_torrent_info(derived_image.blob, app.config['BITTORRENT_PIECE_SIZE'],
hasher.final_piece_hashes()) hasher.final_piece_hashes())
model.set_blob_size(derived_image.blob, hasher.hashed_bytes) registry_model.set_derived_image_size(derived_image, hasher.hashed_bytes)
# Create a queue process to generate the data. The queue files will read from the process # Create a queue process to generate the data. The queue files will read from the process
# and send the results to the client and storage. # and send the results to the client and storage.
handlers = [hasher.update] handlers = [hasher.update]
reporter = VerbReporter(verb) reporter = VerbReporter(verb)
args = (formatter, repo_image, tag, derived_image_id, handlers, reporter) args = (formatter, tag, manifest, derived_image.unique_id, handlers, reporter)
queue_process = QueueProcess( queue_process = QueueProcess(
_open_stream, _open_stream,
8 * 1024, 8 * 1024,
@ -337,8 +349,8 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
def os_arch_checker(os, arch): def os_arch_checker(os, arch):
def checker(repo_image): def checker(tag, manifest):
image_json = repo_image.compat_metadata image_json = manifest.leaf_layer.v1_metadata
# Verify the architecture and os. # Verify the architecture and os.
operating_system = image_json.get('os', 'linux') operating_system = image_json.get('os', 'linux')
@ -394,8 +406,8 @@ def get_squashed_tag(namespace, repository, tag):
@process_auth @process_auth
@parse_repository_name() @parse_repository_name()
def get_tag_torrent(namespace_name, repo_name, digest): def get_tag_torrent(namespace_name, repo_name, digest):
repo = model.get_repository(namespace_name, repo_name) repo = model.repository.get_repository(namespace_name, repo_name)
repo_is_public = repo is not None and repo.is_public repo_is_public = repo is not None and model.repository.is_repository_public(repo)
permission = ReadRepositoryPermission(namespace_name, repo_name) permission = ReadRepositoryPermission(namespace_name, repo_name)
if not permission.can() and not repo_is_public: if not permission.can() and not repo_is_public:
@ -406,10 +418,14 @@ def get_tag_torrent(namespace_name, repo_name, digest):
# We can not generate a private torrent cluster without a user uuid (e.g. token auth) # We can not generate a private torrent cluster without a user uuid (e.g. token auth)
abort(403) abort(403)
if repo is not None and repo.kind != 'image': if repo is not None and repo.kind.name != 'image':
abort(405) abort(405)
blob = model.get_repo_blob_by_digest(namespace_name, repo_name, digest) repo_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repo_ref is None:
abort(404)
blob = registry_model.get_repo_blob_by_digest(repo_ref, digest, include_placements=True)
if blob is None: if blob is None:
abort(404) abort(404)

View file

@ -1,159 +0,0 @@
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from six import add_metaclass
class Repository(
namedtuple('Repository', ['id', 'name', 'namespace_name', 'description', 'is_public',
'kind'])):
"""
Repository represents a namespaced collection of tags.
:type id: int
:type name: string
:type namespace_name: string
:type description: string
:type is_public: bool
:type kind: string
"""
class DerivedImage(namedtuple('DerivedImage', ['ref', 'blob', 'internal_source_image_db_id'])):
"""
DerivedImage represents a user-facing alias for an image which was derived from another image.
"""
class RepositoryReference(namedtuple('RepositoryReference', ['id', 'name', 'namespace_name'])):
"""
RepositoryReference represents a reference to a Repository, without its full metadata.
"""
class ImageWithBlob(
namedtuple('Image', [
'image_id', 'blob', 'compat_metadata', 'repository', 'internal_db_id', 'v1_metadata'])):
"""
ImageWithBlob represents a user-facing alias for referencing an image, along with its blob.
"""
class Blob(namedtuple('Blob', ['uuid', 'size', 'uncompressed_size', 'uploading', 'locations'])):
"""
Blob represents an opaque binary blob saved to the storage system.
"""
class TorrentInfo(namedtuple('TorrentInfo', ['piece_length', 'pieces'])):
"""
TorrentInfo represents the torrent piece information associated with a blob.
"""
@add_metaclass(ABCMeta)
class VerbsDataInterface(object):
"""
Interface that represents all data store interactions required by the registry's custom HTTP
verbs.
"""
@abstractmethod
def get_repository(self, namespace_name, repo_name):
"""
Returns a repository tuple for the repository with the given name under the given namespace.
Returns None if no such repository was found.
"""
pass
@abstractmethod
def get_manifest_layers_with_blobs(self, repo_image):
"""
Returns the full set of manifest layers and their associated blobs starting at the given
repository image and working upwards to the root image.
"""
pass
@abstractmethod
def get_blob_path(self, blob):
"""
Returns the storage path for the given blob.
"""
pass
@abstractmethod
def get_derived_image_signature(self, derived_image, signer_name):
"""
Returns the signature associated with the derived image and a specific signer or None if none.
"""
pass
@abstractmethod
def set_derived_image_signature(self, derived_image, signer_name, signature):
"""
Sets the calculated signature for the given derived image and signer to that specified.
"""
pass
@abstractmethod
def delete_derived_image(self, derived_image):
"""
Deletes a derived image and all of its storage.
"""
pass
@abstractmethod
def set_blob_size(self, blob, size):
"""
Sets the size field on a blob to the value specified.
"""
pass
@abstractmethod
def get_repo_blob_by_digest(self, namespace_name, repo_name, digest):
"""
Returns the blob with the given digest under the matching repository or None if none.
"""
pass
@abstractmethod
def get_torrent_info(self, blob):
"""
Returns the torrent information associated with the given blob or None if none.
"""
pass
@abstractmethod
def set_torrent_info(self, blob, piece_length, pieces):
"""
Sets the torrent infomation associated with the given blob to that specified.
"""
pass
@abstractmethod
def lookup_derived_image(self, repo_image, verb, varying_metadata=None):
"""
Looks up the derived image for the given repository image, verb and optional varying metadata
and returns it or None if none.
"""
pass
@abstractmethod
def lookup_or_create_derived_image(self, repo_image, verb, location, varying_metadata=None):
"""
Looks up the derived image for the given repository image, verb and optional varying metadata
and returns it. If none exists, a new derived image is created.
"""
pass
@abstractmethod
def get_tag_image(self, namespace_name, repo_name, tag_name):
"""
Returns the image associated with the live tag with the given name under the matching repository
or None if none.
"""
pass
@abstractmethod
def is_namespace_enabled(self, namespace_name):
""" Returns whether the given namespace exists and is enabled. """
pass

View file

@ -1,205 +0,0 @@
import json
from data import model
from image.docker.v1 import DockerV1Metadata
from endpoints.verbs.models_interface import (
Blob,
DerivedImage,
ImageWithBlob,
Repository,
RepositoryReference,
TorrentInfo,
VerbsDataInterface,)
class PreOCIModel(VerbsDataInterface):
  """
  PreOCIModel implements the data model for the registry's custom HTTP verbs using a database schema
  before it was changed to support the OCI specification.
  """

  def get_repository(self, namespace_name, repo_name):
    """ Looks up the repository under the given namespace with the given name, returning a
        Repository object or None if no such repository exists.
    """
    repo = model.repository.get_repository(namespace_name, repo_name)
    if repo is None:
      return None

    return _repository_for_repo(repo)

  def get_manifest_layers_with_blobs(self, repo_image):
    """ Generator yielding the given ImageWithBlob followed by one ImageWithBlob for each of its
        parent images, with each parent's blob, storage locations and parsed V1 metadata attached.
    """
    repo_image_record = model.image.get_image_by_id(
      repo_image.repository.namespace_name, repo_image.repository.name, repo_image.image_id)

    parents = model.image.get_parent_images(
      repo_image.repository.namespace_name, repo_image.repository.name, repo_image_record)

    # Batch-load the storage placements for all parents up front, keyed by storage id.
    placements_map = model.image.get_placements_for_images(parents)

    yield repo_image

    for parent in parents:
      metadata = {}
      try:
        metadata = json.loads(parent.v1_json_metadata)
      except ValueError:
        # Unparseable V1 metadata is treated as empty rather than aborting the stream.
        pass

      locations = [placement.location.name for placement in placements_map[parent.storage.id]]
      yield ImageWithBlob(
        image_id=parent.docker_image_id,
        blob=_blob(parent.storage, locations=locations),
        repository=repo_image.repository,
        compat_metadata=metadata,
        v1_metadata=_docker_v1_metadata(repo_image.repository.namespace_name,
                                        repo_image.repository.name, parent),
        internal_db_id=parent.id,)

  def get_derived_image_signature(self, derived_image, signer_name):
    """ Returns the stored signature for the given derived image and signer, or None if no
        signature has been recorded.
    """
    storage = model.storage.get_storage_by_uuid(derived_image.blob.uuid)
    signature_entry = model.storage.lookup_storage_signature(storage, signer_name)
    if signature_entry is None:
      return None

    return signature_entry.signature

  def set_derived_image_signature(self, derived_image, signer_name, signature):
    """ Creates or updates the signature for the given derived image under the given signer and
        marks it as no longer uploading.
    """
    storage = model.storage.get_storage_by_uuid(derived_image.blob.uuid)
    signature_entry = model.storage.find_or_create_storage_signature(storage, signer_name)
    signature_entry.signature = signature
    signature_entry.uploading = False
    signature_entry.save()

  def delete_derived_image(self, derived_image):
    """ Deletes the storage backing the given derived image. """
    model.image.delete_derived_storage_by_uuid(derived_image.blob.uuid)

  def set_blob_size(self, blob, size):
    """ Records the final size of the blob's storage and marks the upload as complete. """
    storage_entry = model.storage.get_storage_by_uuid(blob.uuid)
    storage_entry.image_size = size
    storage_entry.uploading = False
    storage_entry.save()

  def get_blob_path(self, blob):
    """ Returns the path in the storage engine at which the given blob's data lives. """
    blob_record = model.storage.get_storage_by_uuid(blob.uuid)
    return model.storage.get_layer_path(blob_record)

  def get_repo_blob_by_digest(self, namespace_name, repo_name, digest):
    """ Looks up the blob with the given digest under the matching repository, returning a Blob
        object or None if not found.
    """
    try:
      blob_record = model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest)
    except model.BlobDoesNotExist:
      return None

    return _blob(blob_record)

  def get_torrent_info(self, blob):
    """ Returns the TorrentInfo (pieces and piece length) recorded for the given blob, or None if
        no torrent info exists.
    """
    blob_record = model.storage.get_storage_by_uuid(blob.uuid)
    try:
      torrent_info = model.storage.get_torrent_info(blob_record)
    except model.TorrentInfoDoesNotExist:
      return None

    return TorrentInfo(
      pieces=torrent_info.pieces,
      piece_length=torrent_info.piece_length,)

  def set_torrent_info(self, blob, piece_length, pieces):
    """ Persists the torrent piece data for the given blob. """
    blob_record = model.storage.get_storage_by_uuid(blob.uuid)
    model.storage.save_torrent_info(blob_record, piece_length, pieces)

  def lookup_derived_image(self, repo_image, verb, varying_metadata=None):
    """ Returns the DerivedImage previously created for the given image and verb (and optional
        varying metadata), or None if none exists.
    """
    blob_record = model.image.find_derived_storage_for_image(repo_image.internal_db_id, verb,
                                                             varying_metadata)
    if blob_record is None:
      return None

    return _derived_image(blob_record, repo_image)

  def lookup_or_create_derived_image(self, repo_image, verb, location, varying_metadata=None):
    """ Returns the DerivedImage for the given image and verb, creating its backing storage in the
        given location if it does not yet exist.
    """
    blob_record = model.image.find_or_create_derived_storage(repo_image.internal_db_id, verb,
                                                             location, varying_metadata)
    return _derived_image(blob_record, repo_image)

  def get_tag_image(self, namespace_name, repo_name, tag_name):
    """ Returns an ImageWithBlob for the image behind the live tag with the given name under the
        matching repository, or None if the tag does not exist.
    """
    try:
      found = model.tag.get_tag_image(namespace_name, repo_name, tag_name, include_storage=True)
    except model.DataModelException:
      return None

    metadata = {}
    try:
      metadata = json.loads(found.v1_json_metadata)
    except ValueError:
      # Unparseable V1 metadata falls back to an empty dict.
      pass

    return ImageWithBlob(
      image_id=found.docker_image_id,
      blob=_blob(found.storage),
      repository=RepositoryReference(
        namespace_name=namespace_name,
        name=repo_name,
        id=found.repository_id,),
      compat_metadata=metadata,
      v1_metadata=_docker_v1_metadata(namespace_name, repo_name, found),
      internal_db_id=found.id,)

  def is_namespace_enabled(self, namespace_name):
    """ Returns True if a namespace user with the given name exists and is enabled. """
    namespace = model.user.get_namespace_user(namespace_name)
    return namespace is not None and namespace.enabled
# Singleton instance of the Pre-OCI verbs data model, shared by the verbs endpoints.
pre_oci_model = PreOCIModel()
def _docker_v1_metadata(namespace_name, repo_name, repo_image):
  """
  Builds a DockerV1Metadata object for the given Pre-OCI repo_image under the repository with
  the given namespace and name. The namespace and name are passed here purely as an optimization
  and are *not checked* against the image.
  """
  fields = {
    'namespace_name': namespace_name,
    'repo_name': repo_name,
    'image_id': repo_image.docker_image_id,
    'checksum': repo_image.v1_checksum,
    'compat_json': repo_image.v1_json_metadata,
    'created': repo_image.created,
    'comment': repo_image.comment,
    'command': repo_image.command,

    # Note: These are not needed in verbs and are expensive to load, so we just skip them.
    'content_checksum': None,
    'parent_image_id': None,
  }
  return DockerV1Metadata(**fields)
def _derived_image(blob_record, repo_image):
  """ Converts a Pre-OCI data model storage row and its source repo_image into a DerivedImage
      tuple.
  """
  source_db_id = repo_image.internal_db_id
  return DerivedImage(ref=source_db_id,
                      blob=_blob(blob_record),
                      internal_source_image_db_id=source_db_id,)
def _blob(blob_record, locations=None):
  """ Converts a Pre-OCI data model storage row into a Blob tuple. When `locations` is falsy, the
      storage locations are loaded from the database.
  """
  if not locations:
    locations = model.storage.get_storage_locations(blob_record.uuid)

  return Blob(uuid=blob_record.uuid,
              size=blob_record.image_size,
              uncompressed_size=blob_record.uncompressed_size,
              uploading=blob_record.uploading,
              locations=locations,)
def _repository_for_repo(repo):
  """ Converts a Pre-OCI data model repository row into a Repository tuple, resolving its
      visibility and kind via the model layer.
  """
  namespace = repo.namespace_user.username
  return Repository(id=repo.id,
                    name=repo.name,
                    namespace_name=namespace,
                    description=repo.description,
                    is_public=model.repository.is_repository_public(repo),
                    kind=model.repository.get_repo_kind_name(repo),)

View file

@ -21,7 +21,7 @@ class AppCImageFormatter(TarImageFormatter):
def stream_generator(self, tag, manifest, synthetic_image_id, layer_iterator, def stream_generator(self, tag, manifest, synthetic_image_id, layer_iterator,
tar_stream_getter_iterator, reporter=None): tar_stream_getter_iterator, reporter=None):
image_mtime = 0 image_mtime = 0
created = manifest.created_datetime created = manifest.get_parsed_manifest().created_datetime
if created is not None: if created is not None:
image_mtime = calendar.timegm(created.utctimetuple()) image_mtime = calendar.timegm(created.utctimetuple())
@ -30,12 +30,12 @@ class AppCImageFormatter(TarImageFormatter):
# rootfs - The root file system # rootfs - The root file system
# Yield the manifest. # Yield the manifest.
manifest = json.dumps(DockerV1ToACIManifestTranslator.build_manifest( aci_manifest = json.dumps(DockerV1ToACIManifestTranslator.build_manifest(
tag, tag,
manifest, manifest,
synthetic_image_id synthetic_image_id
)) ))
yield self.tar_file('manifest', manifest, mtime=image_mtime) yield self.tar_file('manifest', aci_manifest, mtime=image_mtime)
# Yield the merged layer data. # Yield the merged layer data.
yield self.tar_folder('rootfs', mtime=image_mtime) yield self.tar_folder('rootfs', mtime=image_mtime)

View file

@ -31,7 +31,8 @@ class SquashedDockerImageFormatter(TarImageFormatter):
def stream_generator(self, tag, manifest, synthetic_image_id, layer_iterator, def stream_generator(self, tag, manifest, synthetic_image_id, layer_iterator,
tar_stream_getter_iterator, reporter=None): tar_stream_getter_iterator, reporter=None):
image_mtime = 0 image_mtime = 0
created = manifest.created_datetime parsed_manifest = manifest.get_parsed_manifest()
created = parsed_manifest.created_datetime
if created is not None: if created is not None:
image_mtime = calendar.timegm(created.utctimetuple()) image_mtime = calendar.timegm(created.utctimetuple())
@ -46,7 +47,7 @@ class SquashedDockerImageFormatter(TarImageFormatter):
# Yield the repositories file: # Yield the repositories file:
synthetic_layer_info = {} synthetic_layer_info = {}
synthetic_layer_info[tag + '.squash'] = synthetic_image_id synthetic_layer_info[tag.name + '.squash'] = synthetic_image_id
hostname = app.config['SERVER_HOSTNAME'] hostname = app.config['SERVER_HOSTNAME']
repositories = {} repositories = {}
@ -60,7 +61,7 @@ class SquashedDockerImageFormatter(TarImageFormatter):
yield self.tar_folder(synthetic_image_id, mtime=image_mtime) yield self.tar_folder(synthetic_image_id, mtime=image_mtime)
# Yield the JSON layer data. # Yield the JSON layer data.
layer_json = SquashedDockerImageFormatter._build_layer_json(manifest, synthetic_image_id) layer_json = SquashedDockerImageFormatter._build_layer_json(parsed_manifest, synthetic_image_id)
yield self.tar_file(synthetic_image_id + '/json', json.dumps(layer_json), mtime=image_mtime) yield self.tar_file(synthetic_image_id + '/json', json.dumps(layer_json), mtime=image_mtime)
# Yield the VERSION file. # Yield the VERSION file.
@ -69,7 +70,7 @@ class SquashedDockerImageFormatter(TarImageFormatter):
# Yield the merged layer data's header. # Yield the merged layer data's header.
estimated_file_size = 0 estimated_file_size = 0
for layer in layer_iterator: for layer in layer_iterator:
estimated_file_size += layer.estimated_file_size(SquashedDockerImageFormatter.SIZE_MULTIPLIER) estimated_file_size += layer.estimated_size(SquashedDockerImageFormatter.SIZE_MULTIPLIER)
# Make sure the estimated file size is an integer number of bytes. # Make sure the estimated file size is an integer number of bytes.
estimated_file_size = int(math.ceil(estimated_file_size)) estimated_file_size = int(math.ceil(estimated_file_size))
@ -108,8 +109,7 @@ class SquashedDockerImageFormatter(TarImageFormatter):
@staticmethod @staticmethod
def _build_layer_json(manifest, synthetic_image_id): def _build_layer_json(manifest, synthetic_image_id):
layer_json = manifest.leaf_layer.v1_metadata updated_json = json.loads(manifest.leaf_layer.raw_v1_metadata)
updated_json = copy.deepcopy(layer_json)
updated_json['id'] = synthetic_image_id updated_json['id'] = synthetic_image_id
if 'parent' in updated_json: if 'parent' in updated_json:

View file

@ -26,7 +26,7 @@ class FakeStorage(BaseStorageV2):
def get_content(self, path): def get_content(self, path):
if not path in _FAKE_STORAGE_MAP: if not path in _FAKE_STORAGE_MAP:
raise IOError('Fake file %s not found' % path) raise IOError('Fake file %s not found. Exist: %s' % (path, _FAKE_STORAGE_MAP.keys()))
_FAKE_STORAGE_MAP.get(path).seek(0) _FAKE_STORAGE_MAP.get(path).seek(0)
return _FAKE_STORAGE_MAP.get(path).read() return _FAKE_STORAGE_MAP.get(path).read()

View file

@ -30,8 +30,10 @@ def sized_images():
parent_bytes = layer_bytes_for_contents('parent contents', mode='') parent_bytes = layer_bytes_for_contents('parent contents', mode='')
image_bytes = layer_bytes_for_contents('some contents', mode='') image_bytes = layer_bytes_for_contents('some contents', mode='')
return [ return [
Image(id='parentid', bytes=parent_bytes, parent_id=None, size=len(parent_bytes)), Image(id='parentid', bytes=parent_bytes, parent_id=None, size=len(parent_bytes),
Image(id='someid', bytes=image_bytes, parent_id='parentid', size=len(image_bytes)), config={'foo': 'bar'}),
Image(id='someid', bytes=image_bytes, parent_id='parentid', size=len(image_bytes),
config={'foo': 'childbar'}),
] ]

View file

@ -953,7 +953,7 @@ def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session
tar = tarfile.open(fileobj=StringIO(response.content)) tar = tarfile.open(fileobj=StringIO(response.content))
# Verify the squashed image. # Verify the squashed image.
expected_image_id = '13a2d9711a3e242fcd50a7627c02d86901ac801b78dcea3147e8ff640078de52' expected_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
expected_names = ['repositories', expected_names = ['repositories',
expected_image_id, expected_image_id,
'%s/json' % expected_image_id, '%s/json' % expected_image_id,
@ -968,9 +968,14 @@ def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session
# Ensure the JSON loads and parses. # Ensure the JSON loads and parses.
result = json.loads(json_data) result = json.loads(json_data)
assert result['id'] == expected_image_id assert result['id'] == expected_image_id
assert result['config']['foo'] == 'childbar'
# Ensure that squashed layer tar can be opened. # Ensure that squashed layer tar can be opened.
tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id))) tar = tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id)))
assert tar.getnames() == ['contents']
# Check the contents.
assert tar.extractfile('contents').read() == 'some contents'
@pytest.mark.parametrize('push_user, push_namespace, push_repo, mount_repo_name, expected_failure', [ @pytest.mark.parametrize('push_user, push_namespace, push_repo, mount_repo_name, expected_failure', [

View file

@ -2103,7 +2103,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
# Create the repo. # Create the repo.
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images)
initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' initial_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
# Pull the squashed version of the tag. # Pull the squashed version of the tag.
tar, _ = self.get_squashed_image(auth=('devtable', 'password')) tar, _ = self.get_squashed_image(auth=('devtable', 'password'))
@ -2119,7 +2119,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
# Create the repo. # Create the repo.
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images)
initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' initial_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
# Pull the squashed version of the tag. # Pull the squashed version of the tag.
tar, _ = self.get_squashed_image() tar, _ = self.get_squashed_image()
@ -2134,7 +2134,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
] ]
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=updated_images) self.do_push('devtable', 'newrepo', 'devtable', 'password', images=updated_images)
updated_image_id = '38df4bd4cdffc6b7d656dbd2813c73e864f2d362ad887c999ac315224ad281ac' updated_image_id = '57e8d9226ca95ed4d9b303a4104cb6475605e00f6ce536fe6ed54a5d1f559882'
# Pull the squashed version of the tag and ensure it has changed. # Pull the squashed version of the tag and ensure it has changed.
tar, _ = self.get_squashed_image() tar, _ = self.get_squashed_image()
@ -2157,7 +2157,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
self.conduct('POST', '/__test/removeuncompressed/initialid') self.conduct('POST', '/__test/removeuncompressed/initialid')
# Pull the squashed version of the tag. # Pull the squashed version of the tag.
initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' initial_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
tar, _ = self.get_squashed_image() tar, _ = self.get_squashed_image()
self.assertTrue(initial_image_id in tar.getnames()) self.assertTrue(initial_image_id in tar.getnames())
@ -2179,7 +2179,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
# Pull the squashed version of the tag. # Pull the squashed version of the tag.
expected_image_id = 'bd590ae79fba5ebc6550aaf016c0bd0f49b1d78178e0f83e0ca1c56c2bb7e7bf' expected_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
expected_names = ['repositories', expected_names = ['repositories',
expected_image_id, expected_image_id,
@ -2212,7 +2212,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
# Create the repo. # Create the repo.
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images)
initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' initial_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
# Try to pull the torrent of the squashed image. This should fail with a 406 since the # Try to pull the torrent of the squashed image. This should fail with a 406 since the
# squashed image doesn't yet exist. # squashed image doesn't yet exist.

View file

@ -1,6 +1,7 @@
import logging import logging
import random import random
from collections import namedtuple
from urlparse import urlparse from urlparse import urlparse
from flask import request from flask import request
@ -11,6 +12,13 @@ from auth.auth_context import get_authenticated_context, get_authenticated_user
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Lightweight view of a repository row, carrying only the fields needed for event logging.
Repository = namedtuple('Repository', ['namespace_name', 'name', 'id'])


def wrap_repository(repo_obj):
  """ Wraps a database repository row into a Repository tuple for tracking and logging. """
  owner = repo_obj.namespace_user
  return Repository(namespace_name=owner.username, name=repo_obj.name, id=repo_obj.id)
def track_and_log(event_name, repo_obj, analytics_name=None, analytics_sample=1, **kwargs): def track_and_log(event_name, repo_obj, analytics_name=None, analytics_sample=1, **kwargs):
repo_name = repo_obj.name repo_name = repo_obj.name
namespace_name = repo_obj.namespace_name namespace_name = repo_obj.namespace_name