Merge pull request #3219 from quay/joseph.schorr/QUAY-1030/interfacing-part4

Change tags API endpoint to use new registry model interface
This commit is contained in:
Joseph Schorr 2018-08-23 15:30:16 -04:00 committed by GitHub
commit dcaa98a428
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 517 additions and 850 deletions

View file

@ -515,7 +515,7 @@ def restore_tag_to_manifest(repo_obj, tag_name, manifest_digest):
# Change the tag manifest to point to the updated image.
docker_image_id = tag_manifest.tag.image.docker_image_id
updated_tag = create_or_update_tag_for_repo(repo_obj.id, tag_name, docker_image_id,
updated_tag = create_or_update_tag_for_repo(repo_obj, tag_name, docker_image_id,
reversion=True)
tag_manifest.tag = updated_tag
tag_manifest.save()
@ -544,8 +544,7 @@ def restore_tag_to_image(repo_obj, tag_name, docker_image_id):
except DataModelException:
existing_image = None
create_or_update_tag(repo_obj.namespace_user.username, repo_obj.name, tag_name,
docker_image_id, reversion=True)
create_or_update_tag_for_repo(repo_obj, tag_name, docker_image_id, reversion=True)
return existing_image
@ -589,6 +588,14 @@ def get_active_tag(namespace, repo_name, tag_name):
.where(RepositoryTag.name == tag_name, Repository.name == repo_name,
Namespace.username == namespace)).get()
def get_active_tag_for_repo(repo, tag_name):
try:
return _tag_alive(RepositoryTag
.select()
.where(RepositoryTag.name == tag_name,
RepositoryTag.repository == repo)).get()
except RepositoryTag.DoesNotExist:
return None
def get_possibly_expired_tag(namespace, repo_name, tag_name):
return (RepositoryTag
@ -641,6 +648,13 @@ def populate_manifest(repository, manifest, legacy_image, storage_ids):
return manifest_row
def get_tag_manifest(tag):
try:
return TagManifest.get(tag=tag)
except TagManifest.DoesNotExist:
return None
def load_tag_manifest(namespace, repo_name, tag_name):
try:
return (_load_repo_manifests(namespace, repo_name)

View file

@ -39,7 +39,7 @@ def requiresinput(input_name):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self._inputs.get(input_name) is None:
raise Exception('Cannot invoke function with missing input `%s`', input_name)
raise Exception('Cannot invoke function with missing input `%s`' % input_name)
kwargs[input_name] = self._inputs[input_name]
result = func(self, *args, **kwargs)

View file

@ -22,14 +22,29 @@ class Label(datatype('Label', ['key', 'value', 'uuid', 'source_type_name', 'medi
source_type_name=label.source_type.name)
class Tag(datatype('Tag', ['name'])):
class Tag(datatype('Tag', ['name', 'reversion', 'manifest_digest', 'lifetime_start_ts',
'lifetime_end_ts'])):
""" Tag represents a tag in a repository, which points to a manifest or image. """
@classmethod
def for_repository_tag(cls, repository_tag):
def for_repository_tag(cls, repository_tag, manifest_digest=None, legacy_image=None):
if repository_tag is None:
return None
return Tag(db_id=repository_tag.id, name=repository_tag.name)
return Tag(db_id=repository_tag.id,
name=repository_tag.name,
reversion=repository_tag.reversion,
lifetime_start_ts=repository_tag.lifetime_start_ts,
lifetime_end_ts=repository_tag.lifetime_end_ts,
manifest_digest=manifest_digest,
inputs=dict(legacy_image=legacy_image))
@property
@requiresinput('legacy_image')
def legacy_image(self, legacy_image):
""" Returns the legacy Docker V1-style image for this tag. Note that this
will be None for tags whose manifests point to other manifests instead of images.
"""
return legacy_image
class Manifest(datatype('Manifest', ['digest', 'manifest_bytes'])):
@ -78,7 +93,7 @@ class LegacyImage(datatype('LegacyImage', ['docker_image_id', 'created', 'commen
not been loaded before this property is invoked.
"""
return [LegacyImage.for_image(images_map[ancestor_id], images_map=images_map)
for ancestor_id in ancestor_id_list
for ancestor_id in reversed(ancestor_id_list)
if images_map.get(ancestor_id)]
@property

View file

@ -70,3 +70,51 @@ class RegistryDataInterface(object):
""" Delete the label with the specified UUID on the manifest. Returns the label deleted
or None if none.
"""
@abstractmethod
def list_repository_tags(self, repository_ref, include_legacy_images=False):
"""
Returns a list of all the active tags in the repository. Note that this can be a *heavy*
operation on repositories with a lot of tags, and should be avoided for more targetted
operations wherever possible.
"""
@abstractmethod
def list_repository_tag_history(self, repository_ref, page=1, size=100, specific_tag_name=None):
"""
Returns the history of all tags in the repository (unless filtered). This includes tags that
have been made in-active due to newer versions of those tags coming into service.
"""
@abstractmethod
def get_repo_tag(self, repository_ref, tag_name, include_legacy_image=False):
"""
Returns the latest, *active* tag found in the repository, with the matching name
or None if none.
"""
@abstractmethod
def retarget_tag(self, repository_ref, tag_name, manifest_or_legacy_image,
is_reversion=False):
"""
Creates, updates or moves a tag to a new entry in history, pointing to the manifest or
legacy image specified. If is_reversion is set to True, this operation is considered a
reversion over a previous tag move operation. Returns the updated Tag or None on error.
"""
@abstractmethod
def delete_tag(self, repository_ref, tag_name):
"""
Deletes the latest, *active* tag with the given name in the repository.
"""
@abstractmethod
def change_repository_tag_expiration(self, tag, expiration_date):
""" Sets the expiration date of the tag under the matching repository to that given. If the
expiration date is None, then the tag will not expire. Returns a tuple of the previous
expiration timestamp in seconds (if any), and whether the operation succeeded.
"""
@abstractmethod
def get_legacy_images_owned_by_tag(self, tag):
""" Returns all legacy images *solely owned and used* by the given tag. """

View file

@ -133,4 +133,139 @@ class PreOCIModel(RegistryDataInterface):
"""
return Label.for_label(model.label.delete_manifest_label(label_uuid, manifest._db_id))
def list_repository_tags(self, repository_ref, include_legacy_images=False):
"""
Returns a list of all the active tags in the repository. Note that this can be a *heavy*
operation on repositories with a lot of tags, and should be avoided for more targetted
operations wherever possible.
"""
# NOTE: include_legacy_images isn't used here because `list_active_repo_tags` includes the
# information already, so we might as well just use it. However, the new model classes will
# *not* include it by default, so we make it a parameter now.
tags = model.tag.list_active_repo_tags(repository_ref._db_id)
return [Tag.for_repository_tag(tag,
legacy_image=LegacyImage.for_image(tag.image),
manifest_digest=(tag.tagmanifest.digest
if hasattr(tag, 'tagmanifest')
else None))
for tag in tags]
def list_repository_tag_history(self, repository_ref, page=1, size=100, specific_tag_name=None):
"""
Returns the history of all tags in the repository (unless filtered). This includes tags that
have been made in-active due to newer versions of those tags coming into service.
"""
tags, manifest_map, has_more = model.tag.list_repository_tag_history(repository_ref._db_id,
page, size,
specific_tag_name)
return [Tag.for_repository_tag(tag, manifest_map.get(tag.id),
legacy_image=LegacyImage.for_image(tag.image))
for tag in tags], has_more
def get_repo_tag(self, repository_ref, tag_name, include_legacy_image=False):
"""
Returns the latest, *active* tag found in the repository, with the matching name
or None if none.
"""
tag = model.tag.get_active_tag_for_repo(repository_ref._db_id, tag_name)
if tag is None:
return None
legacy_image = LegacyImage.for_image(tag.image) if include_legacy_image else None
tag_manifest = model.tag.get_tag_manifest(tag)
manifest_digest = tag_manifest.digest if tag_manifest else None
return Tag.for_repository_tag(tag, legacy_image=legacy_image, manifest_digest=manifest_digest)
def retarget_tag(self, repository_ref, tag_name, manifest_or_legacy_image,
is_reversion=False):
"""
Creates, updates or moves a tag to a new entry in history, pointing to the manifest or
legacy image specified. If is_reversion is set to True, this operation is considered a
reversion over a previous tag move operation. Returns the updated Tag or None on error.
"""
# TODO: unify this.
if not is_reversion:
if isinstance(manifest_or_legacy_image, Manifest):
raise NotImplementedError('Not yet implemented')
else:
model.tag.create_or_update_tag_for_repo(repository_ref._db_id, tag_name,
manifest_or_legacy_image.docker_image_id)
else:
if isinstance(manifest_or_legacy_image, Manifest):
image = model.tag.restore_tag_to_manifest(repository_ref._db_id, tag_name,
manifest_or_legacy_image.digest)
if image is None:
return None
else:
image = model.tag.restore_tag_to_image(repository_ref._db_id, tag_name,
manifest_or_legacy_image.docker_image_id)
if image is None:
return None
return self.get_repo_tag(repository_ref, tag_name, include_legacy_image=True)
def delete_tag(self, repository_ref, tag_name):
"""
Deletes the latest, *active* tag with the given name in the repository.
"""
repo = model.repository.lookup_repository(repository_ref._db_id)
if repo is None:
return None
deleted_tag = model.tag.delete_tag(repo.namespace_user.username, repo.name, tag_name)
return Tag.for_repository_tag(deleted_tag)
def change_repository_tag_expiration(self, tag, expiration_date):
""" Sets the expiration date of the tag under the matching repository to that given. If the
expiration date is None, then the tag will not expire. Returns a tuple of the previous
expiration timestamp in seconds (if any), and whether the operation succeeded.
"""
try:
tag_obj = database.RepositoryTag.get(id=tag._db_id)
except database.RepositoryTag.DoesNotExist:
return (None, False)
return model.tag.change_tag_expiration(tag_obj, expiration_date)
def get_legacy_images_owned_by_tag(self, tag):
""" Returns all legacy images *solely owned and used* by the given tag. """
try:
tag_obj = database.RepositoryTag.get(id=tag._db_id)
except database.RepositoryTag.DoesNotExist:
return None
# Collect the IDs of all images that the tag uses.
tag_image_ids = set()
tag_image_ids.add(tag_obj.image.id)
tag_image_ids.update(tag_obj.image.ancestor_id_list())
# Remove any images shared by other tags.
for current_tag in model.tag.list_active_repo_tags(tag_obj.repository_id):
if current_tag == tag_obj:
continue
tag_image_ids.discard(current_tag.image.id)
tag_image_ids = tag_image_ids.difference(current_tag.image.ancestor_id_list())
if not tag_image_ids:
return []
if not tag_image_ids:
return []
# Load the images we need to return.
images = database.Image.select().where(database.Image.id << list(tag_image_ids))
all_image_ids = set()
for image in images:
all_image_ids.add(image.id)
all_image_ids.update(image.ancestor_id_list())
# Build a map of all the images and their parents.
images_map = {}
all_images = database.Image.select().where(database.Image.id << list(all_image_ids))
for image in all_images:
images_map[image.id] = image
return [LegacyImage.for_image(image, images_map=images_map) for image in images]
pre_oci_model = PreOCIModel()

View file

@ -1,3 +1,5 @@
from datetime import datetime, timedelta
import pytest
from data import model
@ -103,7 +105,7 @@ def test_legacy_images(repo_namespace, repo_name, pre_oci_model):
# Check against the actual DB row.
model_image = model.image.get_image(repository_ref._db_id, found_image.docker_image_id)
assert model_image.id == found_image._db_id
assert ([pid for pid in model_image.ancestor_id_list()] ==
assert ([pid for pid in reversed(model_image.ancestor_id_list())] ==
[p._db_id for p in found_image.parents])
# Try without parents and ensure it raises an exception.
@ -145,3 +147,149 @@ def test_manifest_labels(pre_oci_model):
assert pre_oci_model.delete_manifest_label(found_manifest, created.uuid)
assert pre_oci_model.get_manifest_label(found_manifest, created.uuid) is None
assert created not in pre_oci_model.list_manifest_labels(found_manifest)
@pytest.mark.parametrize('repo_namespace, repo_name', [
('devtable', 'simple'),
('devtable', 'complex'),
('devtable', 'history'),
('buynlarge', 'orgrepo'),
])
def test_repository_tags(repo_namespace, repo_name, pre_oci_model):
repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
tags = pre_oci_model.list_repository_tags(repository_ref)
assert len(tags)
for tag in tags:
found_tag = pre_oci_model.get_repo_tag(repository_ref, tag.name, include_legacy_image=True)
assert found_tag == tag
if found_tag.legacy_image is None:
continue
found_image = pre_oci_model.get_legacy_image(repository_ref,
found_tag.legacy_image.docker_image_id)
assert found_image == found_tag.legacy_image
def test_repository_tag_history(pre_oci_model):
repository_ref = pre_oci_model.lookup_repository('devtable', 'history')
history, has_more = pre_oci_model.list_repository_tag_history(repository_ref)
assert not has_more
assert len(history) == 2
@pytest.mark.parametrize('repo_namespace, repo_name', [
('devtable', 'simple'),
('devtable', 'complex'),
('devtable', 'history'),
('buynlarge', 'orgrepo'),
])
def test_delete_tags(repo_namespace, repo_name, pre_oci_model):
repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
tags = pre_oci_model.list_repository_tags(repository_ref)
assert len(tags)
# Save history before the deletions.
previous_history, _ = pre_oci_model.list_repository_tag_history(repository_ref, size=1000)
assert len(previous_history) >= len(tags)
# Delete every tag in the repository.
for tag in tags:
assert pre_oci_model.delete_tag(repository_ref, tag.name)
# Make sure the tag is no longer found.
found_tag = pre_oci_model.get_repo_tag(repository_ref, tag.name, include_legacy_image=True)
assert found_tag is None
# Ensure all tags have been deleted.
tags = pre_oci_model.list_repository_tags(repository_ref)
assert not len(tags)
# Ensure that the tags all live in history.
history, _ = pre_oci_model.list_repository_tag_history(repository_ref, size=1000)
assert len(history) == len(previous_history)
@pytest.mark.parametrize('use_manifest', [
True,
False,
])
def test_retarget_tag_history(use_manifest, pre_oci_model):
repository_ref = pre_oci_model.lookup_repository('devtable', 'history')
history, _ = pre_oci_model.list_repository_tag_history(repository_ref)
if use_manifest:
manifest_or_legacy_image = pre_oci_model.lookup_manifest_by_digest(repository_ref,
history[1].manifest_digest,
allow_dead=True)
else:
manifest_or_legacy_image = history[1].legacy_image
# Retarget the tag.
assert manifest_or_legacy_image
updated_tag = pre_oci_model.retarget_tag(repository_ref, 'latest', manifest_or_legacy_image,
is_reversion=True)
# Ensure the tag has changed targets.
if use_manifest:
assert updated_tag.manifest_digest == manifest_or_legacy_image.digest
else:
assert updated_tag.legacy_image == manifest_or_legacy_image
# Ensure history has been updated.
new_history, _ = pre_oci_model.list_repository_tag_history(repository_ref)
assert len(new_history) == len(history) + 1
def test_retarget_tag(pre_oci_model):
repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
history, _ = pre_oci_model.list_repository_tag_history(repository_ref)
prod_tag = pre_oci_model.get_repo_tag(repository_ref, 'prod', include_legacy_image=True)
# Retarget the tag.
updated_tag = pre_oci_model.retarget_tag(repository_ref, 'latest', prod_tag.legacy_image)
# Ensure the tag has changed targets.
assert updated_tag.legacy_image == prod_tag.legacy_image
# Ensure history has been updated.
new_history, _ = pre_oci_model.list_repository_tag_history(repository_ref)
assert len(new_history) == len(history) + 1
def test_change_repository_tag_expiration(pre_oci_model):
repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
assert tag.lifetime_end_ts is None
new_datetime = datetime.utcnow() + timedelta(days=2)
previous, okay = pre_oci_model.change_repository_tag_expiration(tag, new_datetime)
assert okay
assert previous is None
tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
assert tag.lifetime_end_ts is not None
@pytest.mark.parametrize('repo_namespace, repo_name, expected_non_empty', [
('devtable', 'simple', []),
('devtable', 'complex', ['prod', 'v2.0']),
('devtable', 'history', ['latest']),
('buynlarge', 'orgrepo', []),
('devtable', 'gargantuan', ['v2.0', 'v3.0', 'v4.0', 'v5.0', 'v6.0']),
])
def test_get_legacy_images_owned_by_tag(repo_namespace, repo_name, expected_non_empty,
pre_oci_model):
repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
tags = pre_oci_model.list_repository_tags(repository_ref)
assert len(tags)
non_empty = set()
for tag in tags:
if pre_oci_model.get_legacy_images_owned_by_tag(tag):
non_empty.add(tag.name)
assert non_empty == set(expected_non_empty)

View file

@ -1,20 +1,40 @@
""" Manage the tags of a repository. """
from datetime import datetime, timedelta
from datetime import datetime
from flask import request, abort
from auth.auth_context import get_authenticated_user
from data.model import DataModelException
from data.registry_model import registry_model
from endpoints.api import (resource, nickname, require_repo_read, require_repo_write,
RepositoryParamResource, log_action, validate_json_request, path_param,
parse_args, query_param, truthy_bool, disallow_for_app_repositories)
from endpoints.api.tag_models_interface import Repository
from endpoints.api.tag_models_pre_oci import pre_oci_model as model
from endpoints.api.image import image_dict
from endpoints.exception import NotFound, InvalidRequest
from endpoints.v2.manifest import _generate_and_store_manifest
from util.names import TAG_ERROR, TAG_REGEX
def _tag_dict(tag):
tag_info = {
'name': tag.name,
'reversion': tag.reversion,
}
if tag.lifetime_start_ts > 0:
tag_info['start_ts'] = tag.lifetime_start_ts
if tag.lifetime_end_ts > 0:
tag_info['end_ts'] = tag.lifetime_end_ts
if tag.manifest_digest:
tag_info['manifest_digest'] = tag.manifest_digest
if tag.legacy_image:
tag_info['docker_image_id'] = tag.legacy_image.docker_image_id
return tag_info
@resource('/v1/repository/<apirepopath:repository>/tag/')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class ListRepositoryTags(RepositoryParamResource):
@ -33,17 +53,17 @@ class ListRepositoryTags(RepositoryParamResource):
page = max(1, parsed_args.get('page', 1))
limit = min(100, max(1, parsed_args.get('limit', 50)))
tag_history = model.list_repository_tag_history(namespace_name=namespace,
repository_name=repository, page=page,
size=limit, specific_tag=specific_tag)
if not tag_history:
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
history, has_more = registry_model.list_repository_tag_history(repo_ref, page=page,
size=limit,
specific_tag_name=specific_tag)
return {
'tags': [tag.to_dict() for tag in tag_history.tags],
'tags': [_tag_dict(tag) for tag in history],
'page': page,
'has_additional': tag_history.more,
'has_additional': has_more,
}
@ -75,59 +95,67 @@ class RepositoryTag(RepositoryParamResource):
@validate_json_request('ChangeTag')
def put(self, namespace, repository, tag):
""" Change which image a tag points to or create a new tag."""
if not TAG_REGEX.match(tag):
abort(400, TAG_ERROR)
repo = model.get_repo(namespace, repository)
if not repo:
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
if 'expiration' in request.get_json():
tag_ref = registry_model.get_repo_tag(repo_ref, tag)
if tag_ref is None:
raise NotFound()
expiration = request.get_json().get('expiration')
expiration_date = None
if expiration is not None:
try:
expiration_date = datetime.utcfromtimestamp(float(expiration))
except ValueError:
abort(400)
try:
expiration_date = datetime.utcfromtimestamp(float(expiration))
except ValueError:
abort(400)
if expiration_date <= datetime.now():
abort(400)
if expiration_date <= datetime.now():
abort(400)
existing_end_ts, ok = model.change_repository_tag_expiration(namespace, repository, tag,
expiration_date)
existing_end_ts, ok = registry_model.change_repository_tag_expiration(tag_ref,
expiration_date)
if ok:
if not (existing_end_ts is None and expiration_date is None):
log_action('change_tag_expiration', namespace, {
'username': get_authenticated_user().username,
'repo': repository,
'tag': tag,
'namespace': namespace,
'expiration_date': expiration_date,
'old_expiration_date': existing_end_ts
}, repo_name=repository)
if not (existing_end_ts is None and expiration_date is None):
log_action('change_tag_expiration', namespace, {
'username': get_authenticated_user().username,
'repo': repository,
'tag': tag,
'namespace': namespace,
'expiration_date': expiration_date,
'old_expiration_date': existing_end_ts
}, repo_name=repository)
else:
raise InvalidRequest('Could not update tag expiration; Tag has probably changed')
if 'image' in request.get_json():
existing_tag = registry_model.get_repo_tag(repo_ref, tag, include_legacy_image=True)
image_id = request.get_json()['image']
image = model.get_repository_image(namespace, repository, image_id)
image = registry_model.get_legacy_image(repo_ref, image_id)
if image is None:
raise NotFound()
original_image_id = model.get_repo_tag_image(repo, tag)
model.create_or_update_tag(namespace, repository, tag, image_id)
if not registry_model.retarget_tag(repo_ref, tag, image):
raise InvalidRequest('Could not move tag')
username = get_authenticated_user().username
log_action('move_tag' if original_image_id else 'create_tag', namespace, {
log_action('move_tag' if existing_tag else 'create_tag', namespace, {
'username': username,
'repo': repository,
'tag': tag,
'namespace': namespace,
'image': image_id,
'original_image': original_image_id
'original_image': existing_tag.legacy_image.docker_image_id if existing_tag else None,
}, repo_name=repository)
# TODO(jschorr): Move this into the retarget_tag call
_generate_and_store_manifest(namespace, repository, tag)
return 'Updated', 201
@ -137,7 +165,11 @@ class RepositoryTag(RepositoryParamResource):
@nickname('deleteFullTag')
def delete(self, namespace, repository, tag):
""" Delete the specified repository tag. """
model.delete_tag(namespace, repository, tag)
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
registry_model.delete_tag(repo_ref, tag)
username = get_authenticated_user().username
log_action('delete_tag', namespace,
@ -163,37 +195,28 @@ class RepositoryTagImages(RepositoryParamResource):
type=truthy_bool, default=False)
def get(self, namespace, repository, tag, parsed_args):
""" List the images for the specified repository tag. """
try:
tag_image = model.get_repo_tag_image(
Repository(namespace_name=namespace, repository_name=repository), tag)
except DataModelException:
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
if tag_image is None:
tag_ref = registry_model.get_repo_tag(repo_ref, tag, include_legacy_image=True)
if tag_ref is None:
raise NotFound()
# Find all the parent images for the tag.
parent_images = model.get_parent_images(namespace, repository, tag_image.docker_image_id)
all_images = [tag_image] + list(parent_images)
image_map = {image.docker_image_id: image for image in all_images}
skip_set = set()
image_id = tag_ref.legacy_image.docker_image_id
# Filter the images returned to those not found in the ancestry of any of the other tags in
# the repository.
all_images = None
if parsed_args['owned']:
all_tags = model.list_repository_tags(namespace, repository)
for current_tag in all_tags:
if current_tag.name == tag:
continue
all_images = registry_model.get_legacy_images_owned_by_tag(tag_ref)
else:
image_with_parents = registry_model.get_legacy_image(repo_ref, image_id, include_parents=True)
if image_with_parents is None:
raise NotFound()
skip_set.add(current_tag.image.ancestor_id)
skip_set = skip_set | set(current_tag.image.ancestor_id_list)
all_images = [image_with_parents] + image_with_parents.parents
return {
'images': [
image.to_dict(image_map) for image in all_images
if not parsed_args['owned'] or (image.ancestor_id not in skip_set)
]
'images': [image_dict(image) for image in all_images],
}
@ -226,6 +249,9 @@ class RestoreTag(RepositoryParamResource):
@validate_json_request('RestoreTag')
def post(self, namespace, repository, tag):
""" Restores a repository tag back to a previous image in the repository. """
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
# Restore the tag back to the previous image.
image_id = request.get_json()['image']
@ -239,19 +265,26 @@ class RestoreTag(RepositoryParamResource):
'tag': tag,
'image': image_id,
}
repo = Repository(namespace, repository)
if manifest_digest is not None:
existing_image = model.restore_tag_to_manifest(repo, tag, manifest_digest)
else:
existing_image = model.restore_tag_to_image(repo, tag, image_id)
_generate_and_store_manifest(namespace, repository, tag)
if existing_image is not None:
log_data['original_image'] = existing_image.docker_image_id
manifest_or_legacy_image = None
if manifest_digest is not None:
manifest_or_legacy_image = registry_model.lookup_manifest_by_digest(repo_ref, manifest_digest,
allow_dead=True)
else:
manifest_or_legacy_image = registry_model.get_legacy_image(repo_ref, image_id)
if manifest_or_legacy_image is None:
raise NotFound()
if not registry_model.retarget_tag(repo_ref, tag, manifest_or_legacy_image, is_reversion=True):
raise InvalidRequest('Could not restore tag')
if manifest_digest is None:
# TODO(jschorr): Move this into the retarget_tag call
_generate_and_store_manifest(namespace, repository, tag)
log_action('revert_tag', namespace, log_data, repo_name=repository)
return {
'image_id': image_id,
'original_image_id': existing_image.docker_image_id if existing_image else None,
}

View file

@ -1,187 +0,0 @@
import json
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from six import add_metaclass
from endpoints.api import format_date
class Tag(
namedtuple('Tag', [
'name', 'image', 'reversion', 'lifetime_start_ts', 'lifetime_end_ts', 'manifest_list',
'docker_image_id'
])):
"""
Tag represents a name to an image.
:type name: string
:type image: Image
:type reversion: boolean
:type lifetime_start_ts: int
:type lifetime_end_ts: int
:type manifest_list: [manifest_digest]
:type docker_image_id: string
"""
def to_dict(self):
tag_info = {
'name': self.name,
'docker_image_id': self.docker_image_id,
'reversion': self.reversion,
}
if self.lifetime_start_ts > 0:
tag_info['start_ts'] = self.lifetime_start_ts
if self.lifetime_end_ts > 0:
tag_info['end_ts'] = self.lifetime_end_ts
if self.manifest_list:
tag_info['manifest_digest'] = self.manifest_list
return tag_info
class RepositoryTagHistory(namedtuple('RepositoryTagHistory', ['tags', 'more'])):
"""
Tag represents a name to an image.
:type tags: [Tag]
:type more: boolean
"""
class Repository(namedtuple('Repository', ['namespace_name', 'repository_name'])):
"""
Repository a single quay repository
:type namespace_name: string
:type repository_name: string
"""
class Image(
namedtuple('Image', [
'docker_image_id', 'created', 'comment', 'command', 'storage_image_size',
'storage_uploading', 'ancestor_id_list', 'ancestor_id'
])):
"""
Image
:type docker_image_id: string
:type created: datetime
:type comment: string
:type command: string
:type storage_image_size: int
:type storage_uploading: boolean
:type ancestor_id_list: [int]
:type ancestor_id: int
"""
def to_dict(self, image_map, include_ancestors=True):
command = self.command
def docker_id(aid):
if aid not in image_map:
return ''
return image_map[aid].docker_image_id
image_data = {
'id': self.docker_image_id,
'created': format_date(self.created),
'comment': self.comment,
'command': json.loads(command) if command else None,
'size': self.storage_image_size,
'uploading': self.storage_uploading,
'sort_index': len(self.ancestor_id_list),
}
if include_ancestors:
# Calculate the ancestors string, with the DBID's replaced with the docker IDs.
ancestors = [docker_id(a) for a in self.ancestor_id_list]
image_data['ancestors'] = '/{0}/'.format('/'.join(ancestors))
return image_data
@add_metaclass(ABCMeta)
class TagDataInterface(object):
"""
Interface that represents all data store interactions required by a Tag.
"""
@abstractmethod
def list_repository_tag_history(self, namespace_name, repository_name, page=1, size=100,
specific_tag=None):
"""
Returns a RepositoryTagHistory with a list of historic tags and whether there are more tags then returned.
"""
@abstractmethod
def get_repo(self, namespace_name, repository_name):
"""
Returns a repository associated with the given namespace and repository name.
"""
@abstractmethod
def get_repo_tag_image(self, repository, tag_name):
"""
Returns an image associated with the repository and tag_name
"""
@abstractmethod
def create_or_update_tag(self, namespace_name, repository_name, tag_name, docker_image_id):
"""
Returns the repository tag if it is created.
"""
@abstractmethod
def delete_tag(self, namespace_name, repository_name, tag_name):
"""
Returns the tag for the given namespace and repository if it was created
"""
@abstractmethod
def get_parent_images(self, namespace, repository, tag_name):
"""
Returns a list of the parent images for the namespace, repository and tag specified.
"""
@abstractmethod
def list_repository_tags(self, namespace_name, repository_name):
"""
Returns a list of all tags associated with namespace_nam and repository_name
"""
@abstractmethod
def get_repository(self, namespace_name, repository_name):
"""
Returns the repository associated with the namespace_name and repository_name
"""
@abstractmethod
def get_repository_image(self, namespace_name, repository_name, docker_image_id):
"""
Returns the repository image associated with the namespace_name, repository_name, and docker
image ID.
"""
@abstractmethod
def restore_tag_to_manifest(self, repository_name, tag_name, manifest_digest):
"""
Returns the existing repo tag image if it exists or else returns None.
Side effects include adding the tag with associated name to the manifest_digest in the named repo.
"""
@abstractmethod
def restore_tag_to_image(self, repository_name, tag_name, image_id):
"""
Returns the existing repo tag image if it exists or else returns None
Side effects include adding the tag with associated name to the image with the associated id in the named repo.
"""
@abstractmethod
def change_repository_tag_expiration(self, namespace_name, repository_name, tag_name,
expiration_date):
""" Sets the expiration date of the tag under the matching repository to that given. If the
expiration date is None, then the tag will not expire. Returns a tuple of the previous
expiration timestamp in seconds (if any), and whether the operation succeeded.
"""

View file

@ -1,133 +0,0 @@
from data import model
from data.model import DataModelException, InvalidImageException
from endpoints.api.tag_models_interface import TagDataInterface, Tag, RepositoryTagHistory, Repository, Image
class PreOCIModel(TagDataInterface):
"""
PreOCIModel implements the data model for the Tags using a database schema
before it was changed to support the OCI specification.
"""
def list_repository_tag_history(self, namespace_name, repository_name, page=1, size=100,
specific_tag=None):
repository = model.repository.get_repository(namespace_name, repository_name)
if repository is None:
return None
tags, manifest_map, more = model.tag.list_repository_tag_history(repository, page, size,
specific_tag)
repository_tag_history = []
for tag in tags:
manifest_list = None
if tag.id in manifest_map:
manifest_list = manifest_map[tag.id]
repository_tag_history.append(convert_tag(tag, manifest_list))
return RepositoryTagHistory(tags=repository_tag_history, more=more)
def get_repo(self, namespace_name, repository_name):
repo = model.repository.get_repository(namespace_name, repository_name)
if repo is None:
return None
return Repository(repo.namespace_user, repo.name)
def get_repo_tag_image(self, repository, tag_name):
repo = model.repository.get_repository(str(repository.namespace_name), str(repository.repository_name))
if repo is None:
return None
try:
image = model.tag.get_repo_tag_image(repo, tag_name)
except DataModelException:
return None
return convert_image(image)
def create_or_update_tag(self, namespace_name, repository_name, tag_name, docker_image_id):
return model.tag.create_or_update_tag(namespace_name, repository_name, tag_name,
docker_image_id)
def delete_tag(self, namespace_name, repository_name, tag_name):
return model.tag.delete_tag(namespace_name, repository_name, tag_name)
def get_parent_images(self, namespace_name, repository_name, docker_image_id):
try:
image = model.image.get_image_by_id(namespace_name, repository_name, docker_image_id)
except InvalidImageException:
return []
parent_tags = model.image.get_parent_images(namespace_name, repository_name, image)
return_tags = []
for image in parent_tags:
return_tags.append(convert_image(image))
return return_tags
def list_repository_tags(self, namespace_name, repository_name):
tags = model.tag.list_repository_tags(namespace_name, repository_name)
new_tags = []
for tag in tags:
new_tags.append(convert_tag(tag))
return new_tags
def get_repository_image(self, namespace_name, repository_name, docker_image_id):
image = model.image.get_repo_image(namespace_name, repository_name, docker_image_id)
if image is None:
return None
return convert_image(image)
def get_repository(self, namespace_name, repository_name):
repo = model.repository.get_repository(namespace_name, repository_name)
if repo is None:
return None
return Repository(namespace_name=namespace_name, repository_name=repository_name)
def restore_tag_to_manifest(self, repository, tag_name, manifest_digest):
repo = model.repository.get_repository(repository.namespace_name, repository.repository_name)
if repo is None:
return None
image = model.tag.restore_tag_to_manifest(repo, tag_name, manifest_digest)
if image is None:
return None
return convert_image(image)
def restore_tag_to_image(self, repository, tag_name, image_id):
repo = model.repository.get_repository(repository.namespace_name, repository.repository_name)
if repo is None:
return None
image = model.tag.restore_tag_to_image(repo, tag_name, image_id)
if image is None:
return None
return convert_image(image)
def change_repository_tag_expiration(self, namespace_name, repository_name, tag_name,
expiration_date):
return model.tag.change_repository_tag_expiration(namespace_name, repository_name, tag_name,
expiration_date)
def convert_image(database_image):
return Image(docker_image_id=database_image.docker_image_id, created=database_image.created,
comment=database_image.comment, command=database_image.command,
storage_image_size=database_image.storage.image_size,
storage_uploading=database_image.storage.uploading,
ancestor_id_list=database_image.ancestor_id_list(),
ancestor_id=database_image.id)
def convert_tag(tag, manifest_list=None):
return Tag(name=tag.name, image=convert_image(tag.image), reversion=tag.reversion,
lifetime_start_ts=tag.lifetime_start_ts, lifetime_end_ts=tag.lifetime_end_ts,
manifest_list=manifest_list, docker_image_id=tag.image.docker_image_id)
pre_oci_model = PreOCIModel()

View file

@ -788,11 +788,11 @@ SECURITY_TESTS = [
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'public/publicrepo'}, {u'image': 'WXNG'}, 'freshuser', 403),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'public/publicrepo'}, {u'image': 'WXNG'}, 'reader', 403),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, None, 401),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, 'devtable', 400),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, 'devtable', 404),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, 'freshuser', 403),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, 'reader', 403),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, None, 401),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, 'devtable', 400),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, 'devtable', 404),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, 'freshuser', 403),
(RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, 'reader', 403),

View file

@ -1,129 +1,13 @@
import json
import pytest
from mock import patch, Mock, MagicMock, call
from data.registry_model import registry_model
from data.model import DataModelException
from endpoints.api.tag_models_interface import RepositoryTagHistory, Tag
from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
from endpoints.api.tag import RepositoryTag, RestoreTag, ListRepositoryTags, RepositoryTagImages
from features import FeatureNameValue
from test.fixtures import *
@pytest.fixture()
def get_repo_image():
def mock_callable(namespace, repository, image_id):
mock = Mock(namespace_user='devtable')
mock.name = 'simple'
img = Mock(repository=mock, docker_image_id=12) if image_id == 'image1' else None
return img
with patch('endpoints.api.tag_models_pre_oci.model.image.get_repo_image',
side_effect=mock_callable) as mk:
yield mk
@pytest.fixture()
def get_repository():
with patch('endpoints.api.tag_models_pre_oci.model.image.get_repo_image',
return_value='mock_repo') as mk:
yield mk
@pytest.fixture()
def get_repo_tag_image():
def mock_get_repo_tag_image(repository, tag):
storage_mock = Mock(image_size=1234, uploading='uploading')
def fake_ancestor_id_list():
return []
if tag == 'existing-tag':
return Mock(docker_image_id='mock_docker_image_id', created=12345, comment='comment',
command='command', storage=storage_mock, ancestors=[],
ancestor_id_list=fake_ancestor_id_list)
else:
raise DataModelException('Unable to find image for tag.')
with patch('endpoints.api.tag_models_pre_oci.model.tag.get_repo_tag_image',
side_effect=mock_get_repo_tag_image):
yield
@pytest.fixture()
def restore_tag_to_manifest():
def mock_restore_tag_to_manifest(repository, tag, manifest_digest):
tag_img = Mock(docker_image_id='mock_docker_image_id') if tag == 'existing-tag' else None
return tag_img
with patch('endpoints.api.tag_models_pre_oci.model.tag.restore_tag_to_manifest',
side_effect=mock_restore_tag_to_manifest):
yield
@pytest.fixture()
def restore_tag_to_image():
def mock_restore_tag_to_image(repository, tag, image_id):
tag_img = Mock(docker_image_id='mock_docker_image_id') if tag == 'existing-tag' else None
return tag_img
with patch('endpoints.api.tag_models_pre_oci.model.tag.restore_tag_to_image',
side_effect=mock_restore_tag_to_image):
yield
@pytest.fixture()
def create_or_update_tag():
with patch('endpoints.api.tag_models_pre_oci.model.tag.create_or_update_tag') as mk:
yield mk
@pytest.fixture()
def generate_manifest():
def mock_callable(namespace, repository, tag):
if tag == 'generatemanifestfail':
raise Exception('test_failure')
with patch('endpoints.api.tag._generate_and_store_manifest', side_effect=mock_callable) as mk:
yield mk
@pytest.fixture()
def authd_client(client):
with client_with_identity('devtable', client) as cl:
yield cl
@pytest.fixture()
def list_repository_tag_history():
def list_repository_tag_history(namespace_name, repository_name, page, size, specific_tag):
return RepositoryTagHistory(tags=[
Tag(name='First Tag', image='image', reversion=False, lifetime_start_ts=0, lifetime_end_ts=0,
manifest_list=[], docker_image_id='first docker image id'),
Tag(name='Second Tag', image='second image', reversion=True, lifetime_start_ts=10,
lifetime_end_ts=100, manifest_list=[], docker_image_id='second docker image id')
], more=False)
with patch('endpoints.api.tag.model.list_repository_tag_history',
side_effect=list_repository_tag_history):
yield
@pytest.fixture()
def find_no_repo_tag_history():
def list_repository_tag_history(namespace_name, repository_name, page, size, specific_tag):
return None
with patch('endpoints.api.tag.model.list_repository_tag_history',
side_effect=list_repository_tag_history):
yield
@pytest.mark.parametrize('expiration_time, expected_status', [
(None, 201),
('aksdjhasd', 400),
@ -161,126 +45,50 @@ def test_change_tag_expiration(client, app):
assert tag.lifetime_end_ts == updated_expiration
@pytest.mark.parametrize('test_image,test_tag,expected_status', [
('image1', '-INVALID-TAG-NAME', 400),
('image1', '.INVALID-TAG-NAME', 400),
('image1',
@pytest.mark.parametrize('image_exists,test_tag,expected_status', [
(True, '-INVALID-TAG-NAME', 400),
(True, '.INVALID-TAG-NAME', 400),
(True,
'INVALID-TAG_NAME-BECAUSE-THIS-IS-WAY-WAY-TOO-LOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOONG',
400),
('nonexistantimage', 'newtag', 404),
('image1', 'generatemanifestfail', None),
('image1', 'existing-tag', 201),
('image1', 'newtag', 201),
(False, 'newtag', 404),
(True, 'generatemanifestfail', None),
(True, 'latest', 201),
(True, 'newtag', 201),
])
def test_move_tag(test_image, test_tag, expected_status, get_repo_image, get_repo_tag_image,
create_or_update_tag, generate_manifest, authd_client):
params = {'repository': 'devtable/simple', 'tag': test_tag}
request_body = {'image': test_image}
if expected_status is None:
with pytest.raises(Exception):
conduct_api_call(authd_client, RepositoryTag, 'put', params, request_body, expected_status)
else:
conduct_api_call(authd_client, RepositoryTag, 'put', params, request_body, expected_status)
def test_move_tag(image_exists, test_tag, expected_status, client, app):
with client_with_identity('devtable', client) as cl:
test_image = 'unknown'
if image_exists:
repo_ref = registry_model.lookup_repository('devtable', 'simple')
tag_ref = registry_model.get_repo_tag(repo_ref, 'latest', include_legacy_image=True)
assert tag_ref
test_image = tag_ref.legacy_image.docker_image_id
params = {'repository': 'devtable/simple', 'tag': test_tag}
request_body = {'image': test_image}
if expected_status is None:
with pytest.raises(Exception):
conduct_api_call(cl, RepositoryTag, 'put', params, request_body, expected_status)
else:
conduct_api_call(cl, RepositoryTag, 'put', params, request_body, expected_status)
@pytest.mark.parametrize(
'namespace, repository, specific_tag, page, limit, expected_response_code, expected', [
('devtable', 'simple', None, 1, 10, 200, {
'has_additional': False
}),
('devtable', 'simple', None, 1, 10, 200, {
'page': 1
}),
('devtable', 'simple', None, 1, 10, 200, {
'tags': [{
'docker_image_id': 'first docker image id',
'name': 'First Tag',
'reversion': False
}, {
'docker_image_id': 'second docker image id',
'end_ts': 100,
'name': 'Second Tag',
'reversion': True,
'start_ts': 10
}]
}),
])
def test_list_repository_tags_view_is_correct(namespace, repository, specific_tag, page, limit,
list_repository_tag_history, expected_response_code,
expected, authd_client):
params = {
'repository': namespace + '/' + repository,
'specificTag': specific_tag,
'page': page,
'limit': limit
}
response = conduct_api_call(authd_client, ListRepositoryTags, 'get', params,
expected_code=expected_response_code)
compare_list_history_tags_response(expected, response.json)
def compare_list_history_tags_response(expected, actual):
if 'has_additional' in expected:
assert expected['has_additional'] == actual['has_additional']
if 'page' in expected:
assert expected['page'] == actual['page']
if 'tags' in expected:
assert expected['tags'] == actual['tags']
def test_no_repo_tag_history(find_no_repo_tag_history, authd_client):
params = {'repository': 'devtable/simple', 'specificTag': None, 'page': 1, 'limit': 10}
conduct_api_call(authd_client, ListRepositoryTags, 'get', params, expected_code=404)
@pytest.mark.parametrize(
'specific_tag, page, limit, expected_specific_tag, expected_page, expected_limit', [
(None, None, None, None, 1, 50),
('specific_tag', 12, 13, 'specific_tag', 12, 13),
('specific_tag', -1, 101, 'specific_tag', 1, 100),
('specific_tag', 0, 0, 'specific_tag', 1, 1),
])
def test_repo_tag_history_param_parse(specific_tag, page, limit, expected_specific_tag,
expected_page, expected_limit, authd_client):
mock = MagicMock()
mock.return_value = RepositoryTagHistory(tags=[], more=False)
with patch('endpoints.api.tag.model.list_repository_tag_history', side_effect=mock):
params = {
'repository': 'devtable/simple',
'specificTag': specific_tag,
'page': page,
'limit': limit
}
conduct_api_call(authd_client, ListRepositoryTags, 'get', params)
assert mock.call_args == call(namespace_name='devtable', repository_name='simple',
page=expected_page, size=expected_limit,
specific_tag=expected_specific_tag)
@pytest.mark.parametrize('test_manifest,test_tag,manifest_generated,expected_status', [
(None, 'newtag', True, 200),
(None, 'generatemanifestfail', True, None),
('manifest1', 'newtag', False, 200),
@pytest.mark.parametrize('repo_namespace, repo_name', [
('devtable', 'simple'),
('devtable', 'history'),
('devtable', 'complex'),
('buynlarge', 'orgrepo'),
])
def test_restore_tag(test_manifest, test_tag, manifest_generated, expected_status, get_repository,
restore_tag_to_manifest, restore_tag_to_image, generate_manifest,
authd_client):
params = {'repository': 'devtable/simple', 'tag': test_tag}
request_body = {'image': 'image1'}
if test_manifest is not None:
request_body['manifest_digest'] = test_manifest
if expected_status is None:
with pytest.raises(Exception):
conduct_api_call(authd_client, RestoreTag, 'post', params, request_body, expected_status)
else:
conduct_api_call(authd_client, RestoreTag, 'post', params, request_body, expected_status)
def test_list_repo_tags(repo_namespace, repo_name, client, app):
params = {'repository': repo_namespace + '/' + repo_name}
with client_with_identity('devtable', client) as cl:
tags = conduct_api_call(cl, ListRepositoryTags, 'get', params).json['tags']
repo_ref = registry_model.lookup_repository(repo_namespace, repo_name)
history, _ = registry_model.list_repository_tag_history(repo_ref)
assert len(tags) == len(history)
if manifest_generated:
generate_manifest.assert_called_with('devtable', 'simple', test_tag)
@pytest.mark.parametrize('repository, tag, owned, expect_images', [
('devtable/simple', 'prod', False, True),
@ -291,7 +99,8 @@ def test_restore_tag(test_manifest, test_tag, manifest_generated, expected_statu
('devtable/complex', 'prod', False, True),
('devtable/complex', 'prod', True, True),
])
def test_list_tag_images(repository, tag, owned, expect_images, authd_client):
params = {'repository': repository, 'tag': tag, 'owned': owned}
result = conduct_api_call(authd_client, RepositoryTagImages, 'get', params, None, 200).json
assert bool(result['images']) == expect_images
def test_list_tag_images(repository, tag, owned, expect_images, client, app):
with client_with_identity('devtable', client) as cl:
params = {'repository': repository, 'tag': tag, 'owned': owned}
result = conduct_api_call(cl, RepositoryTagImages, 'get', params, None, 200).json
assert bool(result['images']) == expect_images

View file

@ -1,215 +0,0 @@
import pytest
from data.model import DataModelException, InvalidImageException
from endpoints.api.tag_models_interface import RepositoryTagHistory, Tag, Repository
from mock import Mock, call
from data import model
from endpoints.api.tag_models_pre_oci import pre_oci_model
from util.morecollections import AttrDict
EMPTY_REPOSITORY = 'empty_repository'
EMPTY_NAMESPACE = 'empty_namespace'
BAD_REPOSITORY_NAME = 'bad_repository_name'
BAD_NAMESPACE_NAME = 'bad_namespace_name'
@pytest.fixture
def get_monkeypatch(monkeypatch):
return monkeypatch
def mock_out_get_repository(monkeypatch, namespace_name, repository_name):
def return_none(namespace_name, repository_name):
return None
def return_repository(namespace_name, repository_name):
return 'repository'
if namespace_name == BAD_NAMESPACE_NAME or repository_name == BAD_REPOSITORY_NAME:
return_function = return_none
else:
return_function = return_repository
monkeypatch.setattr(model.repository, 'get_repository', return_function)
def get_repo_mock(monkeypatch, return_value):
def return_return_value(namespace_name, repository_name):
return return_value
monkeypatch.setattr(model.repository, 'get_repository', return_return_value)
def test_get_repo_not_exists(get_monkeypatch):
namespace_name = 'namespace_name'
repository_name = 'repository_name'
get_repo_mock(get_monkeypatch, None)
repo = pre_oci_model.get_repo(namespace_name, repository_name)
assert repo is None
def test_get_repo_exists(get_monkeypatch):
namespace_name = 'namespace_name'
repository_name = 'repository_name'
mock = Mock()
mock.namespace_user = namespace_name
mock.name = repository_name
mock.repository = mock
get_repo_mock(get_monkeypatch, mock)
repo = pre_oci_model.get_repo(namespace_name, repository_name)
assert repo is not None
assert repo.repository_name == repository_name
assert repo.namespace_name == namespace_name
def get_repository_mock(monkeypatch, return_value):
def return_return_value(namespace_name, repository_name, kind_filter=None):
return return_value
monkeypatch.setattr(model.repository, 'get_repository', return_return_value)
def get_repo_tag_image_mock(monkeypatch, return_value):
def return_return_value(repo, tag_name, include_storage=False):
return return_value
monkeypatch.setattr(model.tag, 'get_repo_tag_image', return_return_value)
def test_get_repo_tag_image_with_repo_and_repo_tag(get_monkeypatch):
mock_storage = Mock()
mock_image = Mock()
mock_image.docker_image_id = 'some docker image id'
mock_image.created = 1235
mock_image.comment = 'some comment'
mock_image.command = 'some command'
mock_image.storage = mock_storage
mock_image.ancestors = []
get_repository_mock(get_monkeypatch, mock_image)
get_repo_tag_image_mock(get_monkeypatch, mock_image)
image = pre_oci_model.get_repo_tag_image(
Repository('namespace_name', 'repository_name'), 'tag_name')
assert image is not None
assert image.docker_image_id == 'some docker image id'
def test_get_repo_tag_image_without_repo(get_monkeypatch):
get_repository_mock(get_monkeypatch, None)
image = pre_oci_model.get_repo_tag_image(
Repository('namespace_name', 'repository_name'), 'tag_name')
assert image is None
def test_get_repo_tag_image_without_repo_tag_image(get_monkeypatch):
mock = Mock()
mock.docker_image_id = 'some docker image id'
get_repository_mock(get_monkeypatch, mock)
def raise_exception(repo, tag_name, include_storage=False):
raise DataModelException()
get_monkeypatch.setattr(model.tag, 'get_repo_tag_image', raise_exception)
image = pre_oci_model.get_repo_tag_image(
Repository('namespace_name', 'repository_name'), 'tag_name')
assert image is None
def test_create_or_update_tag(get_monkeypatch):
mock = Mock()
get_monkeypatch.setattr(model.tag, 'create_or_update_tag', mock)
pre_oci_model.create_or_update_tag('namespace_name', 'repository_name', 'tag_name',
'docker_image_id')
assert mock.call_count == 1
assert mock.call_args == call('namespace_name', 'repository_name', 'tag_name', 'docker_image_id')
def test_delete_tag(get_monkeypatch):
mock = Mock()
get_monkeypatch.setattr(model.tag, 'delete_tag', mock)
pre_oci_model.delete_tag('namespace_name', 'repository_name', 'tag_name')
assert mock.call_count == 1
assert mock.call_args == call('namespace_name', 'repository_name', 'tag_name')
def test_get_parent_images_with_exception(get_monkeypatch):
mock = Mock(side_effect=InvalidImageException)
get_monkeypatch.setattr(model.image, 'get_image_by_id', mock)
images = pre_oci_model.get_parent_images('namespace_name', 'repository_name', 'tag_name')
assert images == []
def test_get_parent_images_empty_parent_images(get_monkeypatch):
get_image_by_id_mock = Mock()
get_monkeypatch.setattr(model.image, 'get_image_by_id', get_image_by_id_mock)
get_parent_images_mock = Mock(return_value=[])
get_monkeypatch.setattr(model.image, 'get_parent_images', get_parent_images_mock)
images = pre_oci_model.get_parent_images('namespace_name', 'repository_name', 'tag_name')
assert images == []
def test_list_repository_tags(get_monkeypatch):
mock = Mock(return_value=[])
get_monkeypatch.setattr(model.tag, 'list_repository_tags', mock)
pre_oci_model.list_repository_tags('namespace_name', 'repository_name')
mock.assert_called_once_with('namespace_name', 'repository_name')
def test_get_repository(get_monkeypatch):
mock = Mock()
get_monkeypatch.setattr(model.repository, 'get_repository', mock)
pre_oci_model.get_repository('namespace_name', 'repository_name')
mock.assert_called_once_with('namespace_name', 'repository_name')
def test_tag_to_manifest(get_monkeypatch):
repo_mock = Mock()
restore_tag_mock = Mock(return_value=None)
get_repository_mock = Mock(return_value=repo_mock)
get_monkeypatch.setattr(model.tag, 'restore_tag_to_manifest', restore_tag_mock)
get_monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock)
pre_oci_model.restore_tag_to_manifest(
Repository('namespace', 'repository'), 'tag_name', 'manifest_digest')
get_repository_mock.assert_called_once_with('namespace', 'repository')
restore_tag_mock.assert_called_once_with(repo_mock, 'tag_name', 'manifest_digest')
def test__tag_to_image(get_monkeypatch):
repo_mock = Mock()
restore_tag_mock = Mock(return_value=None)
get_repository_mock = Mock(return_value=repo_mock)
get_monkeypatch.setattr(model.tag, 'restore_tag_to_image', restore_tag_mock)
get_monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock)
pre_oci_model.restore_tag_to_image(Repository('namespace', 'repository'), 'tag_name', 'image_id')
get_repository_mock.assert_called_once_with('namespace', 'repository')
restore_tag_mock.assert_called_once_with(repo_mock, 'tag_name', 'image_id')

View file

@ -2738,14 +2738,14 @@ class TestRestoreTag(ApiTestCase):
self.postResponse(RestoreTag,
params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='invalidtag'),
data=dict(image='invalid_image'), expected_code=400)
data=dict(image='invalid_image'), expected_code=404)
def test_restoretag_invalidimage(self):
self.login(ADMIN_ACCESS_USER)
self.postResponse(RestoreTag,
params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest'),
data=dict(image='invalid_image'), expected_code=400)
data=dict(image='invalid_image'), expected_code=404)
def test_restoretag_invalidmanifest(self):
self.login(ADMIN_ACCESS_USER)