This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/registry_model/registry_pre_oci_model.py

290 lines
12 KiB
Python
Raw Normal View History

# pylint: disable=protected-access
from collections import defaultdict
from data import database
from data import model
from data.registry_model.interface import RegistryDataInterface
from data.registry_model.datatypes import (Tag, RepositoryReference, Manifest, LegacyImage, Label,
SecurityScanStatus)
class PreOCIModel(RegistryDataInterface):
"""
PreOCIModel implements the data model for the registry API using a database schema
before it was changed to support the OCI specification.
"""
def find_matching_tag(self, repository_ref, tag_names):
""" Finds an alive tag in the repository matching one of the given tag names and returns it
or None if none.
"""
found_tag = model.tag.find_matching_tag(repository_ref._db_id, tag_names)
return Tag.for_repository_tag(found_tag)
def get_most_recent_tag(self, repository_ref):
""" Returns the most recently pushed alive tag in the repository, if any. If none, returns
None.
"""
found_tag = model.tag.get_most_recent_tag(repository_ref._db_id)
return Tag.for_repository_tag(found_tag)
def lookup_repository(self, namespace_name, repo_name, kind_filter=None):
""" Looks up and returns a reference to the repository with the given namespace and name,
or None if none. """
repo = model.repository.get_repository(namespace_name, repo_name, kind_filter=kind_filter)
return RepositoryReference.for_repo_obj(repo)
def get_manifest_for_tag(self, tag):
""" Returns the manifest associated with the given tag. """
try:
tag_manifest = database.TagManifest.get(tag_id=tag._db_id)
except database.TagManifest.DoesNotExist:
return
return Manifest.for_tag_manifest(tag_manifest)
def lookup_manifest_by_digest(self, repository_ref, manifest_digest, allow_dead=False,
include_legacy_image=False):
""" Looks up the manifest with the given digest under the given repository and returns it
or None if none. """
repo = model.repository.lookup_repository(repository_ref._db_id)
if repo is None:
return None
try:
tag_manifest = model.tag.load_manifest_by_digest(repo.namespace_user.username,
repo.name,
manifest_digest,
allow_dead=allow_dead)
except model.tag.InvalidManifestException:
return None
legacy_image = None
if include_legacy_image:
legacy_image = self.get_legacy_image(repository_ref, tag_manifest.tag.image.docker_image_id,
include_parents=True)
return Manifest.for_tag_manifest(tag_manifest, legacy_image)
def get_legacy_images(self, repository_ref):
"""
Returns an iterator of all the LegacyImage's defined in the matching repository.
"""
repo = model.repository.lookup_repository(repository_ref._db_id)
if repo is None:
return None
all_images = model.image.get_repository_images_without_placements(repo)
all_images_map = {image.id: image for image in all_images}
all_tags = model.tag.list_repository_tags(repo.namespace_user.username, repo.name)
tags_by_image_id = defaultdict(list)
for tag in all_tags:
tags_by_image_id[tag.image_id].append(tag)
return [LegacyImage.for_image(image, images_map=all_images_map, tags_map=tags_by_image_id)
for image in all_images]
def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False):
"""
Returns the matching LegacyImages under the matching repository, if any. If none,
returns None.
"""
repo = model.repository.lookup_repository(repository_ref._db_id)
if repo is None:
return None
image = model.image.get_image(repository_ref._db_id, docker_image_id)
if image is None:
return None
parent_images_map = None
if include_parents:
parent_images = model.image.get_parent_images(repo.namespace_user.username, repo.name, image)
parent_images_map = {image.id: image for image in parent_images}
return LegacyImage.for_image(image, images_map=parent_images_map)
def create_manifest_label(self, manifest, key, value, source_type_name, media_type_name=None):
""" Creates a label on the manifest with the given key and value. """
try:
tag_manifest = database.TagManifest.get(id=manifest._db_id)
except database.TagManifest.DoesNotExist:
return None
label = model.label.create_manifest_label(tag_manifest, key, value, source_type_name,
media_type_name)
return Label.for_label(label)
def list_manifest_labels(self, manifest, key_prefix=None):
""" Returns all labels found on the manifest. If specified, the key_prefix will filter the
labels returned to those keys that start with the given prefix.
"""
labels = model.label.list_manifest_labels(manifest._db_id, prefix_filter=key_prefix)
return [Label.for_label(l) for l in labels]
def get_manifest_label(self, manifest, label_uuid):
""" Returns the label with the specified UUID on the manifest or None if none. """
return Label.for_label(model.label.get_manifest_label(label_uuid, manifest._db_id))
def delete_manifest_label(self, manifest, label_uuid):
""" Delete the label with the specified UUID on the manifest. Returns the label deleted
or None if none.
"""
return Label.for_label(model.label.delete_manifest_label(label_uuid, manifest._db_id))
def list_repository_tags(self, repository_ref, include_legacy_images=False):
"""
Returns a list of all the active tags in the repository. Note that this can be a *heavy*
operation on repositories with a lot of tags, and should be avoided for more targetted
operations wherever possible.
"""
# NOTE: include_legacy_images isn't used here because `list_active_repo_tags` includes the
# information already, so we might as well just use it. However, the new model classes will
# *not* include it by default, so we make it a parameter now.
tags = model.tag.list_active_repo_tags(repository_ref._db_id)
return [Tag.for_repository_tag(tag,
legacy_image=LegacyImage.for_image(tag.image),
manifest_digest=(tag.tagmanifest.digest
if hasattr(tag, 'tagmanifest')
else None))
for tag in tags]
def list_repository_tag_history(self, repository_ref, page=1, size=100, specific_tag_name=None):
"""
Returns the history of all tags in the repository (unless filtered). This includes tags that
have been made in-active due to newer versions of those tags coming into service.
"""
tags, manifest_map, has_more = model.tag.list_repository_tag_history(repository_ref._db_id,
page, size,
specific_tag_name)
return [Tag.for_repository_tag(tag, manifest_map.get(tag.id),
legacy_image=LegacyImage.for_image(tag.image))
for tag in tags], has_more
def get_repo_tag(self, repository_ref, tag_name, include_legacy_image=False):
"""
Returns the latest, *active* tag found in the repository, with the matching name
or None if none.
"""
tag = model.tag.get_active_tag_for_repo(repository_ref._db_id, tag_name)
if tag is None:
return None
legacy_image = LegacyImage.for_image(tag.image) if include_legacy_image else None
tag_manifest = model.tag.get_tag_manifest(tag)
manifest_digest = tag_manifest.digest if tag_manifest else None
return Tag.for_repository_tag(tag, legacy_image=legacy_image, manifest_digest=manifest_digest)
def retarget_tag(self, repository_ref, tag_name, manifest_or_legacy_image,
is_reversion=False):
"""
Creates, updates or moves a tag to a new entry in history, pointing to the manifest or
legacy image specified. If is_reversion is set to True, this operation is considered a
reversion over a previous tag move operation. Returns the updated Tag or None on error.
"""
# TODO: unify this.
if not is_reversion:
if isinstance(manifest_or_legacy_image, Manifest):
raise NotImplementedError('Not yet implemented')
else:
model.tag.create_or_update_tag_for_repo(repository_ref._db_id, tag_name,
manifest_or_legacy_image.docker_image_id)
else:
if isinstance(manifest_or_legacy_image, Manifest):
model.tag.restore_tag_to_manifest(repository_ref._db_id, tag_name,
manifest_or_legacy_image.digest)
else:
model.tag.restore_tag_to_image(repository_ref._db_id, tag_name,
manifest_or_legacy_image.docker_image_id)
return self.get_repo_tag(repository_ref, tag_name, include_legacy_image=True)
def delete_tag(self, repository_ref, tag_name):
"""
Deletes the latest, *active* tag with the given name in the repository.
"""
repo = model.repository.lookup_repository(repository_ref._db_id)
if repo is None:
return None
deleted_tag = model.tag.delete_tag(repo.namespace_user.username, repo.name, tag_name)
return Tag.for_repository_tag(deleted_tag)
def change_repository_tag_expiration(self, tag, expiration_date):
""" Sets the expiration date of the tag under the matching repository to that given. If the
expiration date is None, then the tag will not expire. Returns a tuple of the previous
expiration timestamp in seconds (if any), and whether the operation succeeded.
"""
try:
tag_obj = database.RepositoryTag.get(id=tag._db_id)
except database.RepositoryTag.DoesNotExist:
return (None, False)
return model.tag.change_tag_expiration(tag_obj, expiration_date)
def get_legacy_images_owned_by_tag(self, tag):
""" Returns all legacy images *solely owned and used* by the given tag. """
try:
tag_obj = database.RepositoryTag.get(id=tag._db_id)
except database.RepositoryTag.DoesNotExist:
return None
# Collect the IDs of all images that the tag uses.
tag_image_ids = set()
tag_image_ids.add(tag_obj.image.id)
tag_image_ids.update(tag_obj.image.ancestor_id_list())
# Remove any images shared by other tags.
for current_tag in model.tag.list_active_repo_tags(tag_obj.repository_id):
if current_tag == tag_obj:
continue
tag_image_ids.discard(current_tag.image.id)
tag_image_ids = tag_image_ids.difference(current_tag.image.ancestor_id_list())
if not tag_image_ids:
return []
if not tag_image_ids:
return []
# Load the images we need to return.
images = database.Image.select().where(database.Image.id << list(tag_image_ids))
all_image_ids = set()
for image in images:
all_image_ids.add(image.id)
all_image_ids.update(image.ancestor_id_list())
# Build a map of all the images and their parents.
images_map = {}
all_images = database.Image.select().where(database.Image.id << list(all_image_ids))
for image in all_images:
images_map[image.id] = image
return [LegacyImage.for_image(image, images_map=images_map) for image in images]
def get_security_status(self, manifest_or_legacy_image):
""" Returns the security status for the given manifest or legacy image or None if none. """
image = None
if isinstance(manifest_or_legacy_image, Manifest):
try:
tag_manifest = database.TagManifest.get(id=manifest_or_legacy_image._db_id)
image = tag_manifest.tag.image
except database.TagManifest.DoesNotExist:
return None
else:
try:
image = database.Image.get(id=manifest_or_legacy_image._db_id)
except database.Image.DoesNotExist:
return None
if image.security_indexed_engine is not None and image.security_indexed_engine >= 0:
return SecurityScanStatus.SCANNED if image.security_indexed else SecurityScanStatus.FAILED
return SecurityScanStatus.QUEUED
pre_oci_model = PreOCIModel()