This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/registry_model/registry_pre_oci_model.py

65 lines
2.6 KiB
Python
Raw Normal View History

from data import database
from data import model
from data.registry_model.interface import RegistryDataInterface
from data.registry_model.datatypes import Tag, RepositoryReference, Manifest
class PreOCIModel(RegistryDataInterface):
"""
PreOCIModel implements the data model for the registry API using a database schema
before it was changed to support the OCI specification.
"""
def find_matching_tag(self, repository_ref, tag_names):
""" Finds an alive tag in the repository matching one of the given tag names and returns it
or None if none.
"""
found_tag = model.tag.find_matching_tag(repository_ref.repo_id, tag_names)
return Tag.for_repository_tag(found_tag)
def get_most_recent_tag(self, repository_ref):
""" Returns the most recently pushed alive tag in the repository, if any. If none, returns
None.
"""
found_tag = model.tag.get_most_recent_tag(repository_ref.repo_id)
return Tag.for_repository_tag(found_tag)
def lookup_repository(self, namespace_name, repo_name, kind_filter=None):
""" Looks up and returns a reference to the repository with the given namespace and name,
or None if none. """
repo = model.repository.get_repository(namespace_name, repo_name, kind_filter=kind_filter)
return RepositoryReference.for_repo_obj(repo)
def get_manifest_for_tag(self, tag):
""" Returns the manifest associated with the given tag. """
try:
tag_manifest = database.TagManifest.get(tag_id=tag.id)
except database.TagManifest.DoesNotExist:
return
return Manifest.for_tag_manifest(tag_manifest)
def lookup_manifest_by_digest(self, repository_ref, manifest_digest, allow_dead=False):
""" Looks up the manifest with the given digest under the given repository and returns it
or None if none. """
repo = model.repository.lookup_repository(repository_ref.repo_id)
if repo is None:
return None
tag_manifest = model.tag.load_manifest_by_digest(repo.namespace_user.username,
repo.name,
manifest_digest, allow_dead=allow_dead)
return Manifest.for_tag_manifest(tag_manifest)
def create_manifest_label(self, manifest, key, value, source_type_name, media_type_name=None):
""" Creates a label on the manifest with the given key and value. """
try:
tag_manifest = database.TagManifest.get(id=manifest.id)
except database.TagManifest.DoesNotExist:
return
model.label.create_manifest_label(tag_manifest, key, value, source_type_name, media_type_name)
pre_oci_model = PreOCIModel()