Change tags API endpoint to use new registry model interface
This commit is contained in:
		
							parent
							
								
									8225c61a1f
								
							
						
					
					
						commit
						bc99dd7963
					
				
					 7 changed files with 148 additions and 841 deletions
				
			
		|  | @ -1,20 +1,40 @@ | |||
| """ Manage the tags of a repository. """ | ||||
| 
 | ||||
| from datetime import datetime, timedelta | ||||
| from datetime import datetime | ||||
| from flask import request, abort | ||||
| 
 | ||||
| from auth.auth_context import get_authenticated_user | ||||
| from data.model import DataModelException | ||||
| from data.registry_model import registry_model | ||||
| from endpoints.api import (resource, nickname, require_repo_read, require_repo_write, | ||||
|                            RepositoryParamResource, log_action, validate_json_request, path_param, | ||||
|                            parse_args, query_param, truthy_bool, disallow_for_app_repositories) | ||||
| from endpoints.api.tag_models_interface import Repository | ||||
| from endpoints.api.tag_models_pre_oci import pre_oci_model as model | ||||
| from endpoints.api.image import image_dict | ||||
| from endpoints.exception import NotFound, InvalidRequest | ||||
| from endpoints.v2.manifest import _generate_and_store_manifest | ||||
| from util.names import TAG_ERROR, TAG_REGEX | ||||
| 
 | ||||
| 
 | ||||
| def _tag_dict(tag): | ||||
|   tag_info = { | ||||
|     'name': tag.name, | ||||
|     'reversion': tag.reversion, | ||||
|   } | ||||
| 
 | ||||
|   if tag.lifetime_start_ts > 0: | ||||
|     tag_info['start_ts'] = tag.lifetime_start_ts | ||||
| 
 | ||||
|   if tag.lifetime_end_ts > 0: | ||||
|     tag_info['end_ts'] = tag.lifetime_end_ts | ||||
| 
 | ||||
|   if tag.manifest_digest: | ||||
|     tag_info['manifest_digest'] = tag.manifest_digest | ||||
| 
 | ||||
|   if tag.legacy_image: | ||||
|     tag_info['docker_image_id'] = tag.legacy_image.docker_image_id | ||||
| 
 | ||||
|   return tag_info | ||||
| 
 | ||||
| 
 | ||||
| @resource('/v1/repository/<apirepopath:repository>/tag/') | ||||
| @path_param('repository', 'The full path of the repository. e.g. namespace/name') | ||||
| class ListRepositoryTags(RepositoryParamResource): | ||||
|  | @ -33,17 +53,17 @@ class ListRepositoryTags(RepositoryParamResource): | |||
|     page = max(1, parsed_args.get('page', 1)) | ||||
|     limit = min(100, max(1, parsed_args.get('limit', 50))) | ||||
| 
 | ||||
|     tag_history = model.list_repository_tag_history(namespace_name=namespace, | ||||
|                                                     repository_name=repository, page=page, | ||||
|                                                     size=limit, specific_tag=specific_tag) | ||||
| 
 | ||||
|     if not tag_history: | ||||
|     repo_ref = registry_model.lookup_repository(namespace, repository) | ||||
|     if repo_ref is None: | ||||
|       raise NotFound() | ||||
| 
 | ||||
|     history, has_more = registry_model.list_repository_tag_history(repo_ref, page=page, | ||||
|                                                                    size=limit, | ||||
|                                                                    specific_tag_name=specific_tag) | ||||
|     return { | ||||
|       'tags': [tag.to_dict() for tag in tag_history.tags], | ||||
|       'tags': [_tag_dict(tag) for tag in history], | ||||
|       'page': page, | ||||
|       'has_additional': tag_history.more, | ||||
|       'has_additional': has_more, | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
|  | @ -75,59 +95,67 @@ class RepositoryTag(RepositoryParamResource): | |||
|   @validate_json_request('ChangeTag') | ||||
|   def put(self, namespace, repository, tag): | ||||
|     """ Change which image a tag points to or create a new tag.""" | ||||
| 
 | ||||
|     if not TAG_REGEX.match(tag): | ||||
|       abort(400, TAG_ERROR) | ||||
| 
 | ||||
|     repo = model.get_repo(namespace, repository) | ||||
|     if not repo: | ||||
|     repo_ref = registry_model.lookup_repository(namespace, repository) | ||||
|     if repo_ref is None: | ||||
|       raise NotFound() | ||||
| 
 | ||||
|     if 'expiration' in request.get_json(): | ||||
|       tag_ref = registry_model.get_repo_tag(repo_ref, tag) | ||||
|       if tag_ref is None: | ||||
|         raise NotFound() | ||||
| 
 | ||||
|       expiration = request.get_json().get('expiration') | ||||
|       expiration_date = None | ||||
|       if expiration is not None: | ||||
|        try: | ||||
|          expiration_date = datetime.utcfromtimestamp(float(expiration)) | ||||
|        except ValueError: | ||||
|          abort(400) | ||||
|         try: | ||||
|           expiration_date = datetime.utcfromtimestamp(float(expiration)) | ||||
|         except ValueError: | ||||
|           abort(400) | ||||
| 
 | ||||
|        if expiration_date <= datetime.now(): | ||||
|          abort(400) | ||||
|         if expiration_date <= datetime.now(): | ||||
|           abort(400) | ||||
| 
 | ||||
|       existing_end_ts, ok = model.change_repository_tag_expiration(namespace, repository, tag, | ||||
|                                                                    expiration_date) | ||||
|       existing_end_ts, ok = registry_model.change_repository_tag_expiration(tag_ref, | ||||
|                                                                             expiration_date) | ||||
|       if ok: | ||||
|        if not (existing_end_ts is None and expiration_date is None): | ||||
|          log_action('change_tag_expiration', namespace, { | ||||
|            'username': get_authenticated_user().username, | ||||
|            'repo': repository, | ||||
|            'tag': tag, | ||||
|            'namespace': namespace, | ||||
|            'expiration_date': expiration_date, | ||||
|            'old_expiration_date': existing_end_ts | ||||
|          }, repo_name=repository) | ||||
|         if not (existing_end_ts is None and expiration_date is None): | ||||
|           log_action('change_tag_expiration', namespace, { | ||||
|             'username': get_authenticated_user().username, | ||||
|             'repo': repository, | ||||
|             'tag': tag, | ||||
|             'namespace': namespace, | ||||
|             'expiration_date': expiration_date, | ||||
|             'old_expiration_date': existing_end_ts | ||||
|           }, repo_name=repository) | ||||
|       else: | ||||
|         raise InvalidRequest('Could not update tag expiration; Tag has probably changed') | ||||
| 
 | ||||
|     if 'image' in request.get_json(): | ||||
|       existing_tag = registry_model.get_repo_tag(repo_ref, tag, include_legacy_image=True) | ||||
| 
 | ||||
|       image_id = request.get_json()['image'] | ||||
|       image = model.get_repository_image(namespace, repository, image_id) | ||||
|       image = registry_model.get_legacy_image(repo_ref, image_id) | ||||
|       if image is None: | ||||
|         raise NotFound() | ||||
| 
 | ||||
|       original_image_id = model.get_repo_tag_image(repo, tag) | ||||
|       model.create_or_update_tag(namespace, repository, tag, image_id) | ||||
|       if not registry_model.retarget_tag(repo_ref, tag, image): | ||||
|         raise InvalidRequest('Could not move tag') | ||||
| 
 | ||||
|       username = get_authenticated_user().username | ||||
|       log_action('move_tag' if original_image_id else 'create_tag', namespace, { | ||||
| 
 | ||||
|       log_action('move_tag' if existing_tag else 'create_tag', namespace, { | ||||
|         'username': username, | ||||
|         'repo': repository, | ||||
|         'tag': tag, | ||||
|         'namespace': namespace, | ||||
|         'image': image_id, | ||||
|         'original_image': original_image_id | ||||
|         'original_image': existing_tag.legacy_image.docker_image_id if existing_tag else None, | ||||
|       }, repo_name=repository) | ||||
| 
 | ||||
|       # TODO(jschorr): Move this into the retarget_tag call | ||||
|       _generate_and_store_manifest(namespace, repository, tag) | ||||
| 
 | ||||
|     return 'Updated', 201 | ||||
|  | @ -137,7 +165,11 @@ class RepositoryTag(RepositoryParamResource): | |||
|   @nickname('deleteFullTag') | ||||
|   def delete(self, namespace, repository, tag): | ||||
|     """ Delete the specified repository tag. """ | ||||
|     model.delete_tag(namespace, repository, tag) | ||||
|     repo_ref = registry_model.lookup_repository(namespace, repository) | ||||
|     if repo_ref is None: | ||||
|       raise NotFound() | ||||
| 
 | ||||
|     registry_model.delete_tag(repo_ref, tag) | ||||
| 
 | ||||
|     username = get_authenticated_user().username | ||||
|     log_action('delete_tag', namespace, | ||||
|  | @ -163,37 +195,28 @@ class RepositoryTagImages(RepositoryParamResource): | |||
|                type=truthy_bool, default=False) | ||||
|   def get(self, namespace, repository, tag, parsed_args): | ||||
|     """ List the images for the specified repository tag. """ | ||||
|     try: | ||||
|       tag_image = model.get_repo_tag_image( | ||||
|         Repository(namespace_name=namespace, repository_name=repository), tag) | ||||
|     except DataModelException: | ||||
|     repo_ref = registry_model.lookup_repository(namespace, repository) | ||||
|     if repo_ref is None: | ||||
|       raise NotFound() | ||||
| 
 | ||||
|     if tag_image is None: | ||||
|     tag_ref = registry_model.get_repo_tag(repo_ref, tag, include_legacy_image=True) | ||||
|     if tag_ref is None: | ||||
|       raise NotFound() | ||||
| 
 | ||||
|     # Find all the parent images for the tag. | ||||
|     parent_images = model.get_parent_images(namespace, repository, tag_image.docker_image_id) | ||||
|     all_images = [tag_image] + list(parent_images) | ||||
|     image_map = {image.docker_image_id: image for image in all_images} | ||||
|     skip_set = set() | ||||
|     image_id = tag_ref.legacy_image.docker_image_id | ||||
| 
 | ||||
|     # Filter the images returned to those not found in the ancestry of any of the other tags in | ||||
|     # the repository. | ||||
|     all_images = None | ||||
|     if parsed_args['owned']: | ||||
|       all_tags = model.list_repository_tags(namespace, repository) | ||||
|       for current_tag in all_tags: | ||||
|         if current_tag.name == tag: | ||||
|           continue | ||||
|       all_images = registry_model.get_legacy_images_owned_by_tag(tag_ref) | ||||
|     else: | ||||
|       image_with_parents = registry_model.get_legacy_image(repo_ref, image_id, include_parents=True) | ||||
|       if image_with_parents is None: | ||||
|         raise NotFound() | ||||
| 
 | ||||
|         skip_set.add(current_tag.image.ancestor_id) | ||||
|         skip_set = skip_set | set(current_tag.image.ancestor_id_list) | ||||
|       all_images = [image_with_parents] + image_with_parents.parents | ||||
| 
 | ||||
|     return { | ||||
|       'images': [ | ||||
|         image.to_dict(image_map) for image in all_images | ||||
|         if not parsed_args['owned'] or (image.ancestor_id not in skip_set) | ||||
|       ] | ||||
|       'images': [image_dict(image) for image in all_images], | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
|  | @ -226,6 +249,9 @@ class RestoreTag(RepositoryParamResource): | |||
|   @validate_json_request('RestoreTag') | ||||
|   def post(self, namespace, repository, tag): | ||||
|     """ Restores a repository tag back to a previous image in the repository. """ | ||||
|     repo_ref = registry_model.lookup_repository(namespace, repository) | ||||
|     if repo_ref is None: | ||||
|       raise NotFound() | ||||
| 
 | ||||
|     # Restore the tag back to the previous image. | ||||
|     image_id = request.get_json()['image'] | ||||
|  | @ -239,19 +265,26 @@ class RestoreTag(RepositoryParamResource): | |||
|       'tag': tag, | ||||
|       'image': image_id, | ||||
|     } | ||||
|     repo = Repository(namespace, repository) | ||||
|     if manifest_digest is not None: | ||||
|       existing_image = model.restore_tag_to_manifest(repo, tag, manifest_digest) | ||||
|     else: | ||||
|       existing_image = model.restore_tag_to_image(repo, tag, image_id) | ||||
|       _generate_and_store_manifest(namespace, repository, tag) | ||||
| 
 | ||||
|     if existing_image is not None: | ||||
|       log_data['original_image'] = existing_image.docker_image_id | ||||
|     manifest_or_legacy_image = None | ||||
|     if manifest_digest is not None: | ||||
|       manifest_or_legacy_image = registry_model.lookup_manifest_by_digest(repo_ref, manifest_digest, | ||||
|                                                                           allow_dead=True) | ||||
|     else: | ||||
|       manifest_or_legacy_image = registry_model.get_legacy_image(repo_ref, image_id) | ||||
| 
 | ||||
|     if manifest_or_legacy_image is None: | ||||
|       raise NotFound() | ||||
| 
 | ||||
|     if not registry_model.retarget_tag(repo_ref, tag, manifest_or_legacy_image, is_reversion=True): | ||||
|       raise InvalidRequest('Could not restore tag') | ||||
| 
 | ||||
|     if manifest_digest is None: | ||||
|       # TODO(jschorr): Move this into the retarget_tag call | ||||
|       _generate_and_store_manifest(namespace, repository, tag) | ||||
| 
 | ||||
|     log_action('revert_tag', namespace, log_data, repo_name=repository) | ||||
| 
 | ||||
|     return { | ||||
|       'image_id': image_id, | ||||
|       'original_image_id': existing_image.docker_image_id if existing_image else None, | ||||
|     } | ||||
|  |  | |||
|  | @ -1,187 +0,0 @@ | |||
| import json | ||||
| from abc import ABCMeta, abstractmethod | ||||
| from collections import namedtuple | ||||
| 
 | ||||
| from six import add_metaclass | ||||
| 
 | ||||
| from endpoints.api import format_date | ||||
| 
 | ||||
| 
 | ||||
| class Tag( | ||||
|   namedtuple('Tag', [ | ||||
|     'name', 'image', 'reversion', 'lifetime_start_ts', 'lifetime_end_ts', 'manifest_list', | ||||
|     'docker_image_id' | ||||
|   ])): | ||||
|   """ | ||||
|   Tag represents a name to an image. | ||||
|   :type name: string | ||||
|   :type image: Image | ||||
|   :type reversion: boolean | ||||
|   :type lifetime_start_ts: int | ||||
|   :type lifetime_end_ts: int | ||||
|   :type manifest_list: [manifest_digest] | ||||
|   :type docker_image_id: string | ||||
|   """ | ||||
| 
 | ||||
|   def to_dict(self): | ||||
|     tag_info = { | ||||
|       'name': self.name, | ||||
|       'docker_image_id': self.docker_image_id, | ||||
|       'reversion': self.reversion, | ||||
|     } | ||||
| 
 | ||||
|     if self.lifetime_start_ts > 0: | ||||
|       tag_info['start_ts'] = self.lifetime_start_ts | ||||
| 
 | ||||
|     if self.lifetime_end_ts > 0: | ||||
|       tag_info['end_ts'] = self.lifetime_end_ts | ||||
| 
 | ||||
|     if self.manifest_list: | ||||
|       tag_info['manifest_digest'] = self.manifest_list | ||||
| 
 | ||||
|     return tag_info | ||||
| 
 | ||||
| 
 | ||||
| class RepositoryTagHistory(namedtuple('RepositoryTagHistory', ['tags', 'more'])): | ||||
|   """ | ||||
|   Tag represents a name to an image. | ||||
|   :type tags: [Tag] | ||||
|   :type more: boolean | ||||
|   """ | ||||
| 
 | ||||
| 
 | ||||
| class Repository(namedtuple('Repository', ['namespace_name', 'repository_name'])): | ||||
|   """ | ||||
|   Repository a single quay repository | ||||
|   :type namespace_name: string | ||||
|   :type repository_name: string | ||||
|   """ | ||||
| 
 | ||||
| 
 | ||||
| class Image( | ||||
|   namedtuple('Image', [ | ||||
|     'docker_image_id', 'created', 'comment', 'command', 'storage_image_size', | ||||
|     'storage_uploading', 'ancestor_id_list', 'ancestor_id' | ||||
|   ])): | ||||
|   """ | ||||
|   Image | ||||
|   :type docker_image_id: string | ||||
|   :type created: datetime | ||||
|   :type comment: string | ||||
|   :type command: string | ||||
|   :type storage_image_size: int | ||||
|   :type storage_uploading: boolean | ||||
|   :type ancestor_id_list: [int] | ||||
|   :type ancestor_id: int | ||||
|   """ | ||||
| 
 | ||||
|   def to_dict(self, image_map, include_ancestors=True): | ||||
|     command = self.command | ||||
| 
 | ||||
|     def docker_id(aid): | ||||
|       if aid not in image_map: | ||||
|         return '' | ||||
| 
 | ||||
|       return image_map[aid].docker_image_id | ||||
| 
 | ||||
|     image_data = { | ||||
|       'id': self.docker_image_id, | ||||
|       'created': format_date(self.created), | ||||
|       'comment': self.comment, | ||||
|       'command': json.loads(command) if command else None, | ||||
|       'size': self.storage_image_size, | ||||
|       'uploading': self.storage_uploading, | ||||
|       'sort_index': len(self.ancestor_id_list), | ||||
|     } | ||||
| 
 | ||||
|     if include_ancestors: | ||||
|       # Calculate the ancestors string, with the DBID's replaced with the docker IDs. | ||||
|       ancestors = [docker_id(a) for a in self.ancestor_id_list] | ||||
|       image_data['ancestors'] = '/{0}/'.format('/'.join(ancestors)) | ||||
| 
 | ||||
|     return image_data | ||||
| 
 | ||||
| 
 | ||||
| @add_metaclass(ABCMeta) | ||||
| class TagDataInterface(object): | ||||
|   """ | ||||
|   Interface that represents all data store interactions required by a Tag. | ||||
|   """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def list_repository_tag_history(self, namespace_name, repository_name, page=1, size=100, | ||||
|                                   specific_tag=None): | ||||
|     """ | ||||
|     Returns a RepositoryTagHistory with a list of historic tags and whether there are more tags then returned. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def get_repo(self, namespace_name, repository_name): | ||||
|     """ | ||||
|     Returns a repository associated with the given namespace and repository name. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def get_repo_tag_image(self, repository, tag_name): | ||||
|     """ | ||||
|     Returns an image associated with the repository and tag_name | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def create_or_update_tag(self, namespace_name, repository_name, tag_name, docker_image_id): | ||||
|     """ | ||||
|     Returns the repository tag if it is created. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def delete_tag(self, namespace_name, repository_name, tag_name): | ||||
|     """ | ||||
|     Returns the tag for the given namespace and repository if it was created | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def get_parent_images(self, namespace, repository, tag_name): | ||||
|     """ | ||||
|     Returns a list of the parent images for the namespace, repository and tag specified. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def list_repository_tags(self, namespace_name, repository_name): | ||||
|     """ | ||||
|     Returns a list of all tags associated with namespace_nam and repository_name | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def get_repository(self, namespace_name, repository_name): | ||||
|     """ | ||||
|     Returns the repository associated with the namespace_name and repository_name | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def get_repository_image(self, namespace_name, repository_name, docker_image_id): | ||||
|     """ | ||||
|     Returns the repository image associated with the namespace_name, repository_name, and docker | ||||
|     image ID. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def restore_tag_to_manifest(self, repository_name, tag_name, manifest_digest): | ||||
|     """ | ||||
|     Returns the existing repo tag image if it exists or else returns None. | ||||
|     Side effects include adding the tag with associated name to the manifest_digest in the named repo. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def restore_tag_to_image(self, repository_name, tag_name, image_id): | ||||
|     """ | ||||
|     Returns the existing repo tag image if it exists or else returns None | ||||
|     Side effects include adding the tag with associated name to the image with the associated id in the named repo. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def change_repository_tag_expiration(self, namespace_name, repository_name, tag_name, | ||||
|                                        expiration_date): | ||||
|     """ Sets the expiration date of the tag under the matching repository to that given. If the | ||||
|         expiration date is None, then the tag will not expire. Returns a tuple of the previous | ||||
|         expiration timestamp in seconds (if any), and whether the operation succeeded. | ||||
|     """ | ||||
|  | @ -1,133 +0,0 @@ | |||
| from data import model | ||||
| from data.model import DataModelException, InvalidImageException | ||||
| from endpoints.api.tag_models_interface import TagDataInterface, Tag, RepositoryTagHistory, Repository, Image | ||||
| 
 | ||||
| 
 | ||||
| class PreOCIModel(TagDataInterface): | ||||
|   """ | ||||
|   PreOCIModel implements the data model for the Tags using a database schema | ||||
|   before it was changed to support the OCI specification. | ||||
|   """ | ||||
| 
 | ||||
|   def list_repository_tag_history(self, namespace_name, repository_name, page=1, size=100, | ||||
|                                   specific_tag=None): | ||||
|     repository = model.repository.get_repository(namespace_name, repository_name) | ||||
|     if repository is None: | ||||
|       return None | ||||
| 
 | ||||
|     tags, manifest_map, more = model.tag.list_repository_tag_history(repository, page, size, | ||||
|                                                                      specific_tag) | ||||
|     repository_tag_history = [] | ||||
|     for tag in tags: | ||||
|       manifest_list = None | ||||
|       if tag.id in manifest_map: | ||||
|         manifest_list = manifest_map[tag.id] | ||||
| 
 | ||||
|       repository_tag_history.append(convert_tag(tag, manifest_list)) | ||||
| 
 | ||||
|     return RepositoryTagHistory(tags=repository_tag_history, more=more) | ||||
| 
 | ||||
|   def get_repo(self, namespace_name, repository_name): | ||||
|     repo = model.repository.get_repository(namespace_name, repository_name) | ||||
|     if repo is None: | ||||
|       return None | ||||
| 
 | ||||
|     return Repository(repo.namespace_user, repo.name) | ||||
| 
 | ||||
|   def get_repo_tag_image(self, repository, tag_name): | ||||
|     repo = model.repository.get_repository(str(repository.namespace_name), str(repository.repository_name)) | ||||
|     if repo is None: | ||||
|       return None | ||||
| 
 | ||||
|     try: | ||||
|       image = model.tag.get_repo_tag_image(repo, tag_name) | ||||
|     except DataModelException: | ||||
|       return None | ||||
| 
 | ||||
|     return convert_image(image) | ||||
| 
 | ||||
|   def create_or_update_tag(self, namespace_name, repository_name, tag_name, docker_image_id): | ||||
|     return model.tag.create_or_update_tag(namespace_name, repository_name, tag_name, | ||||
|                                           docker_image_id) | ||||
| 
 | ||||
|   def delete_tag(self, namespace_name, repository_name, tag_name): | ||||
|     return model.tag.delete_tag(namespace_name, repository_name, tag_name) | ||||
| 
 | ||||
|   def get_parent_images(self, namespace_name, repository_name, docker_image_id): | ||||
|     try: | ||||
|       image = model.image.get_image_by_id(namespace_name, repository_name, docker_image_id) | ||||
|     except InvalidImageException: | ||||
|       return [] | ||||
| 
 | ||||
|     parent_tags = model.image.get_parent_images(namespace_name, repository_name, image) | ||||
|     return_tags = [] | ||||
|     for image in parent_tags: | ||||
|       return_tags.append(convert_image(image)) | ||||
| 
 | ||||
|     return return_tags | ||||
| 
 | ||||
|   def list_repository_tags(self, namespace_name, repository_name): | ||||
|     tags = model.tag.list_repository_tags(namespace_name, repository_name) | ||||
|     new_tags = [] | ||||
|     for tag in tags: | ||||
|       new_tags.append(convert_tag(tag)) | ||||
|     return new_tags | ||||
| 
 | ||||
|   def get_repository_image(self, namespace_name, repository_name, docker_image_id): | ||||
|     image = model.image.get_repo_image(namespace_name, repository_name, docker_image_id) | ||||
|     if image is None: | ||||
|       return None | ||||
| 
 | ||||
|     return convert_image(image) | ||||
| 
 | ||||
|   def get_repository(self, namespace_name, repository_name): | ||||
|     repo = model.repository.get_repository(namespace_name, repository_name) | ||||
|     if repo is None: | ||||
|       return None | ||||
| 
 | ||||
|     return Repository(namespace_name=namespace_name, repository_name=repository_name) | ||||
| 
 | ||||
|   def restore_tag_to_manifest(self, repository, tag_name, manifest_digest): | ||||
|     repo = model.repository.get_repository(repository.namespace_name, repository.repository_name) | ||||
|     if repo is None: | ||||
|       return None | ||||
| 
 | ||||
|     image = model.tag.restore_tag_to_manifest(repo, tag_name, manifest_digest) | ||||
|     if image is None: | ||||
|       return None | ||||
| 
 | ||||
|     return convert_image(image) | ||||
| 
 | ||||
|   def restore_tag_to_image(self, repository, tag_name, image_id): | ||||
|     repo = model.repository.get_repository(repository.namespace_name, repository.repository_name) | ||||
|     if repo is None: | ||||
|       return None | ||||
| 
 | ||||
|     image = model.tag.restore_tag_to_image(repo, tag_name, image_id) | ||||
|     if image is None: | ||||
|       return None | ||||
| 
 | ||||
|     return convert_image(image) | ||||
| 
 | ||||
|   def change_repository_tag_expiration(self, namespace_name, repository_name, tag_name, | ||||
|                                        expiration_date): | ||||
|     return model.tag.change_repository_tag_expiration(namespace_name, repository_name, tag_name, | ||||
|                                                       expiration_date) | ||||
| 
 | ||||
| 
 | ||||
| def convert_image(database_image): | ||||
|   return Image(docker_image_id=database_image.docker_image_id, created=database_image.created, | ||||
|                comment=database_image.comment, command=database_image.command, | ||||
|                storage_image_size=database_image.storage.image_size, | ||||
|                storage_uploading=database_image.storage.uploading, | ||||
|                ancestor_id_list=database_image.ancestor_id_list(), | ||||
|                ancestor_id=database_image.id) | ||||
| 
 | ||||
| 
 | ||||
| def convert_tag(tag, manifest_list=None): | ||||
|   return Tag(name=tag.name, image=convert_image(tag.image), reversion=tag.reversion, | ||||
|              lifetime_start_ts=tag.lifetime_start_ts, lifetime_end_ts=tag.lifetime_end_ts, | ||||
|              manifest_list=manifest_list, docker_image_id=tag.image.docker_image_id) | ||||
| 
 | ||||
| 
 | ||||
| pre_oci_model = PreOCIModel() | ||||
|  | @ -788,11 +788,11 @@ SECURITY_TESTS = [ | |||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'public/publicrepo'}, {u'image': 'WXNG'}, 'freshuser', 403), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'public/publicrepo'}, {u'image': 'WXNG'}, 'reader', 403), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, None, 401), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, 'devtable', 400), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, 'devtable', 404), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, 'freshuser', 403), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'devtable/shared'}, {u'image': 'WXNG'}, 'reader', 403), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, None, 401), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, 'devtable', 400), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, 'devtable', 404), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, 'freshuser', 403), | ||||
|   (RestoreTag, 'POST', {'tag': 'HP8R', 'repository': 'buynlarge/orgrepo'}, {u'image': 'WXNG'}, 'reader', 403), | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,129 +1,13 @@ | |||
| import json | ||||
| 
 | ||||
| import pytest | ||||
| 
 | ||||
| from mock import patch, Mock, MagicMock, call | ||||
| from data.registry_model import registry_model | ||||
| 
 | ||||
| from data.model import DataModelException | ||||
| from endpoints.api.tag_models_interface import RepositoryTagHistory, Tag | ||||
| from endpoints.api.test.shared import conduct_api_call | ||||
| from endpoints.test.shared import client_with_identity | ||||
| from endpoints.api.tag import RepositoryTag, RestoreTag, ListRepositoryTags, RepositoryTagImages | ||||
| 
 | ||||
| from features import FeatureNameValue | ||||
| 
 | ||||
| from test.fixtures import * | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def get_repo_image(): | ||||
|   def mock_callable(namespace, repository, image_id): | ||||
|     mock = Mock(namespace_user='devtable') | ||||
|     mock.name = 'simple' | ||||
|     img = Mock(repository=mock, docker_image_id=12) if image_id == 'image1' else None | ||||
|     return img | ||||
| 
 | ||||
|   with patch('endpoints.api.tag_models_pre_oci.model.image.get_repo_image', | ||||
|              side_effect=mock_callable) as mk: | ||||
|     yield mk | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def get_repository(): | ||||
|   with patch('endpoints.api.tag_models_pre_oci.model.image.get_repo_image', | ||||
|              return_value='mock_repo') as mk: | ||||
|     yield mk | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def get_repo_tag_image(): | ||||
|   def mock_get_repo_tag_image(repository, tag): | ||||
|     storage_mock = Mock(image_size=1234, uploading='uploading') | ||||
| 
 | ||||
|     def fake_ancestor_id_list(): | ||||
|       return [] | ||||
| 
 | ||||
|     if tag == 'existing-tag': | ||||
|       return Mock(docker_image_id='mock_docker_image_id', created=12345, comment='comment', | ||||
|                   command='command', storage=storage_mock, ancestors=[], | ||||
|                   ancestor_id_list=fake_ancestor_id_list) | ||||
|     else: | ||||
|       raise DataModelException('Unable to find image for tag.') | ||||
| 
 | ||||
|   with patch('endpoints.api.tag_models_pre_oci.model.tag.get_repo_tag_image', | ||||
|              side_effect=mock_get_repo_tag_image): | ||||
|     yield | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def restore_tag_to_manifest(): | ||||
|   def mock_restore_tag_to_manifest(repository, tag, manifest_digest): | ||||
|     tag_img = Mock(docker_image_id='mock_docker_image_id') if tag == 'existing-tag' else None | ||||
|     return tag_img | ||||
| 
 | ||||
|   with patch('endpoints.api.tag_models_pre_oci.model.tag.restore_tag_to_manifest', | ||||
|              side_effect=mock_restore_tag_to_manifest): | ||||
|     yield | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def restore_tag_to_image(): | ||||
|   def mock_restore_tag_to_image(repository, tag, image_id): | ||||
|     tag_img = Mock(docker_image_id='mock_docker_image_id') if tag == 'existing-tag' else None | ||||
|     return tag_img | ||||
| 
 | ||||
|   with patch('endpoints.api.tag_models_pre_oci.model.tag.restore_tag_to_image', | ||||
|              side_effect=mock_restore_tag_to_image): | ||||
|     yield | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def create_or_update_tag(): | ||||
|   with patch('endpoints.api.tag_models_pre_oci.model.tag.create_or_update_tag') as mk: | ||||
|     yield mk | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def generate_manifest(): | ||||
|   def mock_callable(namespace, repository, tag): | ||||
|     if tag == 'generatemanifestfail': | ||||
|       raise Exception('test_failure') | ||||
| 
 | ||||
|   with patch('endpoints.api.tag._generate_and_store_manifest', side_effect=mock_callable) as mk: | ||||
|     yield mk | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def authd_client(client): | ||||
|   with client_with_identity('devtable', client) as cl: | ||||
|     yield cl | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def list_repository_tag_history(): | ||||
|   def list_repository_tag_history(namespace_name, repository_name, page, size, specific_tag): | ||||
|     return RepositoryTagHistory(tags=[ | ||||
|       Tag(name='First Tag', image='image', reversion=False, lifetime_start_ts=0, lifetime_end_ts=0, | ||||
|           manifest_list=[], docker_image_id='first docker image id'), | ||||
|       Tag(name='Second Tag', image='second image', reversion=True, lifetime_start_ts=10, | ||||
|           lifetime_end_ts=100, manifest_list=[], docker_image_id='second docker image id') | ||||
|     ], more=False) | ||||
| 
 | ||||
|   with patch('endpoints.api.tag.model.list_repository_tag_history', | ||||
|              side_effect=list_repository_tag_history): | ||||
|     yield | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def find_no_repo_tag_history(): | ||||
|   def list_repository_tag_history(namespace_name, repository_name, page, size, specific_tag): | ||||
|     return None | ||||
| 
 | ||||
|   with patch('endpoints.api.tag.model.list_repository_tag_history', | ||||
|              side_effect=list_repository_tag_history): | ||||
|     yield | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize('expiration_time, expected_status', [ | ||||
|   (None, 201), | ||||
|   ('aksdjhasd', 400), | ||||
|  | @ -161,126 +45,50 @@ def test_change_tag_expiration(client, app): | |||
|     assert tag.lifetime_end_ts == updated_expiration | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize('test_image,test_tag,expected_status', [ | ||||
|   ('image1', '-INVALID-TAG-NAME', 400), | ||||
|   ('image1', '.INVALID-TAG-NAME', 400), | ||||
|   ('image1', | ||||
| @pytest.mark.parametrize('image_exists,test_tag,expected_status', [ | ||||
|   (True, '-INVALID-TAG-NAME', 400), | ||||
|   (True, '.INVALID-TAG-NAME', 400), | ||||
|   (True, | ||||
|    'INVALID-TAG_NAME-BECAUSE-THIS-IS-WAY-WAY-TOO-LOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOONG', | ||||
|    400), | ||||
|   ('nonexistantimage', 'newtag', 404), | ||||
|   ('image1', 'generatemanifestfail', None), | ||||
|   ('image1', 'existing-tag', 201), | ||||
|   ('image1', 'newtag', 201), | ||||
|   (False, 'newtag', 404), | ||||
|   (True, 'generatemanifestfail', None), | ||||
|   (True, 'latest', 201), | ||||
|   (True, 'newtag', 201), | ||||
| ]) | ||||
| def test_move_tag(test_image, test_tag, expected_status, get_repo_image, get_repo_tag_image, | ||||
|                   create_or_update_tag, generate_manifest, authd_client): | ||||
|   params = {'repository': 'devtable/simple', 'tag': test_tag} | ||||
|   request_body = {'image': test_image} | ||||
|   if expected_status is None: | ||||
|     with pytest.raises(Exception): | ||||
|       conduct_api_call(authd_client, RepositoryTag, 'put', params, request_body, expected_status) | ||||
|   else: | ||||
|     conduct_api_call(authd_client, RepositoryTag, 'put', params, request_body, expected_status) | ||||
| def test_move_tag(image_exists, test_tag, expected_status, client, app): | ||||
|   with client_with_identity('devtable', client) as cl: | ||||
|     test_image = 'unknown' | ||||
|     if image_exists: | ||||
|       repo_ref = registry_model.lookup_repository('devtable', 'simple') | ||||
|       tag_ref = registry_model.get_repo_tag(repo_ref, 'latest', include_legacy_image=True) | ||||
|       assert tag_ref | ||||
| 
 | ||||
|       test_image = tag_ref.legacy_image.docker_image_id | ||||
| 
 | ||||
|     params = {'repository': 'devtable/simple', 'tag': test_tag} | ||||
|     request_body = {'image': test_image} | ||||
|     if expected_status is None: | ||||
|       with pytest.raises(Exception): | ||||
|         conduct_api_call(cl, RepositoryTag, 'put', params, request_body, expected_status) | ||||
|     else: | ||||
|       conduct_api_call(cl, RepositoryTag, 'put', params, request_body, expected_status) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|   'namespace, repository, specific_tag, page, limit, expected_response_code, expected', [ | ||||
|     ('devtable', 'simple', None, 1, 10, 200, { | ||||
|       'has_additional': False | ||||
|     }), | ||||
|     ('devtable', 'simple', None, 1, 10, 200, { | ||||
|       'page': 1 | ||||
|     }), | ||||
|     ('devtable', 'simple', None, 1, 10, 200, { | ||||
|       'tags': [{ | ||||
|         'docker_image_id': 'first docker image id', | ||||
|         'name': 'First Tag', | ||||
|         'reversion': False | ||||
|       }, { | ||||
|         'docker_image_id': 'second docker image id', | ||||
|         'end_ts': 100, | ||||
|         'name': 'Second Tag', | ||||
|         'reversion': True, | ||||
|         'start_ts': 10 | ||||
|       }] | ||||
|     }), | ||||
|   ]) | ||||
| def test_list_repository_tags_view_is_correct(namespace, repository, specific_tag, page, limit, | ||||
|                                               list_repository_tag_history, expected_response_code, | ||||
|                                               expected, authd_client): | ||||
|   params = { | ||||
|     'repository': namespace + '/' + repository, | ||||
|     'specificTag': specific_tag, | ||||
|     'page': page, | ||||
|     'limit': limit | ||||
|   } | ||||
|   response = conduct_api_call(authd_client, ListRepositoryTags, 'get', params, | ||||
|                               expected_code=expected_response_code) | ||||
|   compare_list_history_tags_response(expected, response.json) | ||||
| 
 | ||||
| 
 | ||||
| def compare_list_history_tags_response(expected, actual): | ||||
|   if 'has_additional' in expected: | ||||
|     assert expected['has_additional'] == actual['has_additional'] | ||||
| 
 | ||||
|   if 'page' in expected: | ||||
|     assert expected['page'] == actual['page'] | ||||
| 
 | ||||
|   if 'tags' in expected: | ||||
|     assert expected['tags'] == actual['tags'] | ||||
| 
 | ||||
| 
 | ||||
| def test_no_repo_tag_history(find_no_repo_tag_history, authd_client): | ||||
|   params = {'repository': 'devtable/simple', 'specificTag': None, 'page': 1, 'limit': 10} | ||||
|   conduct_api_call(authd_client, ListRepositoryTags, 'get', params, expected_code=404) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|   'specific_tag, page, limit, expected_specific_tag, expected_page, expected_limit', [ | ||||
|     (None, None, None, None, 1, 50), | ||||
|     ('specific_tag', 12, 13, 'specific_tag', 12, 13), | ||||
|     ('specific_tag', -1, 101, 'specific_tag', 1, 100), | ||||
|     ('specific_tag', 0, 0, 'specific_tag', 1, 1), | ||||
|   ]) | ||||
| def test_repo_tag_history_param_parse(specific_tag, page, limit, expected_specific_tag, | ||||
|                                       expected_page, expected_limit, authd_client): | ||||
|   mock = MagicMock() | ||||
|   mock.return_value = RepositoryTagHistory(tags=[], more=False) | ||||
| 
 | ||||
|   with patch('endpoints.api.tag.model.list_repository_tag_history', side_effect=mock): | ||||
|     params = { | ||||
|       'repository': 'devtable/simple', | ||||
|       'specificTag': specific_tag, | ||||
|       'page': page, | ||||
|       'limit': limit | ||||
|     } | ||||
|     conduct_api_call(authd_client, ListRepositoryTags, 'get', params) | ||||
| 
 | ||||
|   assert mock.call_args == call(namespace_name='devtable', repository_name='simple', | ||||
|                                 page=expected_page, size=expected_limit, | ||||
|                                 specific_tag=expected_specific_tag) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize('test_manifest,test_tag,manifest_generated,expected_status', [ | ||||
|   (None, 'newtag', True, 200), | ||||
|   (None, 'generatemanifestfail', True, None), | ||||
|   ('manifest1', 'newtag', False, 200), | ||||
| @pytest.mark.parametrize('repo_namespace, repo_name', [ | ||||
|   ('devtable', 'simple'), | ||||
|   ('devtable', 'history'), | ||||
|   ('devtable', 'complex'), | ||||
|   ('buynlarge', 'orgrepo'), | ||||
| ]) | ||||
| def test_restore_tag(test_manifest, test_tag, manifest_generated, expected_status, get_repository, | ||||
|                      restore_tag_to_manifest, restore_tag_to_image, generate_manifest, | ||||
|                      authd_client): | ||||
|   params = {'repository': 'devtable/simple', 'tag': test_tag} | ||||
|   request_body = {'image': 'image1'} | ||||
|   if test_manifest is not None: | ||||
|     request_body['manifest_digest'] = test_manifest | ||||
|   if expected_status is None: | ||||
|     with pytest.raises(Exception): | ||||
|       conduct_api_call(authd_client, RestoreTag, 'post', params, request_body, expected_status) | ||||
|   else: | ||||
|     conduct_api_call(authd_client, RestoreTag, 'post', params, request_body, expected_status) | ||||
| def test_list_repo_tags(repo_namespace, repo_name, client, app): | ||||
|   params = {'repository': repo_namespace + '/' + repo_name} | ||||
|   with client_with_identity('devtable', client) as cl: | ||||
|     tags = conduct_api_call(cl, ListRepositoryTags, 'get', params).json['tags'] | ||||
|     repo_ref = registry_model.lookup_repository(repo_namespace, repo_name) | ||||
|     history, _ = registry_model.list_repository_tag_history(repo_ref) | ||||
|     assert len(tags) == len(history) | ||||
| 
 | ||||
|   if manifest_generated: | ||||
|     generate_manifest.assert_called_with('devtable', 'simple', test_tag) | ||||
| 
 | ||||
| @pytest.mark.parametrize('repository, tag, owned, expect_images', [ | ||||
|   ('devtable/simple', 'prod', False, True), | ||||
|  | @ -291,7 +99,8 @@ def test_restore_tag(test_manifest, test_tag, manifest_generated, expected_statu | |||
|   ('devtable/complex', 'prod', False, True), | ||||
|   ('devtable/complex', 'prod', True, True), | ||||
| ]) | ||||
| def test_list_tag_images(repository, tag, owned, expect_images, authd_client): | ||||
|   params = {'repository': repository, 'tag': tag, 'owned': owned} | ||||
|   result = conduct_api_call(authd_client, RepositoryTagImages, 'get', params, None, 200).json | ||||
|   assert bool(result['images']) == expect_images | ||||
| def test_list_tag_images(repository, tag, owned, expect_images, client, app): | ||||
|   with client_with_identity('devtable', client) as cl: | ||||
|     params = {'repository': repository, 'tag': tag, 'owned': owned} | ||||
|     result = conduct_api_call(cl, RepositoryTagImages, 'get', params, None, 200).json | ||||
|     assert bool(result['images']) == expect_images | ||||
|  |  | |||
|  | @ -1,215 +0,0 @@ | |||
| import pytest | ||||
| 
 | ||||
| from data.model import DataModelException, InvalidImageException | ||||
| from endpoints.api.tag_models_interface import RepositoryTagHistory, Tag, Repository | ||||
| from mock import Mock, call | ||||
| 
 | ||||
| from data import model | ||||
| from endpoints.api.tag_models_pre_oci import pre_oci_model | ||||
| from util.morecollections import AttrDict | ||||
| 
 | ||||
| EMPTY_REPOSITORY = 'empty_repository' | ||||
| EMPTY_NAMESPACE = 'empty_namespace' | ||||
| BAD_REPOSITORY_NAME = 'bad_repository_name' | ||||
| BAD_NAMESPACE_NAME = 'bad_namespace_name' | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture | ||||
| def get_monkeypatch(monkeypatch): | ||||
|   return monkeypatch | ||||
| 
 | ||||
| 
 | ||||
| def mock_out_get_repository(monkeypatch, namespace_name, repository_name): | ||||
|   def return_none(namespace_name, repository_name): | ||||
|     return None | ||||
| 
 | ||||
|   def return_repository(namespace_name, repository_name): | ||||
|     return 'repository' | ||||
| 
 | ||||
|   if namespace_name == BAD_NAMESPACE_NAME or repository_name == BAD_REPOSITORY_NAME: | ||||
|     return_function = return_none | ||||
|   else: | ||||
|     return_function = return_repository | ||||
| 
 | ||||
|   monkeypatch.setattr(model.repository, 'get_repository', return_function) | ||||
| 
 | ||||
| 
 | ||||
| def get_repo_mock(monkeypatch, return_value): | ||||
|   def return_return_value(namespace_name, repository_name): | ||||
|     return return_value | ||||
| 
 | ||||
|   monkeypatch.setattr(model.repository, 'get_repository', return_return_value) | ||||
| 
 | ||||
| 
 | ||||
| def test_get_repo_not_exists(get_monkeypatch): | ||||
|   namespace_name = 'namespace_name' | ||||
|   repository_name = 'repository_name' | ||||
|   get_repo_mock(get_monkeypatch, None) | ||||
|   repo = pre_oci_model.get_repo(namespace_name, repository_name) | ||||
| 
 | ||||
|   assert repo is None | ||||
| 
 | ||||
| 
 | ||||
| def test_get_repo_exists(get_monkeypatch): | ||||
|   namespace_name = 'namespace_name' | ||||
|   repository_name = 'repository_name' | ||||
|   mock = Mock() | ||||
|   mock.namespace_user = namespace_name | ||||
|   mock.name = repository_name | ||||
|   mock.repository = mock | ||||
|   get_repo_mock(get_monkeypatch, mock) | ||||
| 
 | ||||
|   repo = pre_oci_model.get_repo(namespace_name, repository_name) | ||||
| 
 | ||||
|   assert repo is not None | ||||
|   assert repo.repository_name == repository_name | ||||
|   assert repo.namespace_name == namespace_name | ||||
| 
 | ||||
| 
 | ||||
| def get_repository_mock(monkeypatch, return_value): | ||||
|   def return_return_value(namespace_name, repository_name, kind_filter=None): | ||||
|     return return_value | ||||
| 
 | ||||
|   monkeypatch.setattr(model.repository, 'get_repository', return_return_value) | ||||
| 
 | ||||
| 
 | ||||
| def get_repo_tag_image_mock(monkeypatch, return_value): | ||||
|   def return_return_value(repo, tag_name, include_storage=False): | ||||
|     return return_value | ||||
| 
 | ||||
|   monkeypatch.setattr(model.tag, 'get_repo_tag_image', return_return_value) | ||||
| 
 | ||||
| 
 | ||||
| def test_get_repo_tag_image_with_repo_and_repo_tag(get_monkeypatch): | ||||
|   mock_storage = Mock() | ||||
|   mock_image = Mock() | ||||
|   mock_image.docker_image_id = 'some docker image id' | ||||
|   mock_image.created = 1235 | ||||
|   mock_image.comment = 'some comment' | ||||
|   mock_image.command = 'some command' | ||||
|   mock_image.storage = mock_storage | ||||
|   mock_image.ancestors = [] | ||||
| 
 | ||||
|   get_repository_mock(get_monkeypatch, mock_image) | ||||
|   get_repo_tag_image_mock(get_monkeypatch, mock_image) | ||||
| 
 | ||||
|   image = pre_oci_model.get_repo_tag_image( | ||||
|     Repository('namespace_name', 'repository_name'), 'tag_name') | ||||
| 
 | ||||
|   assert image is not None | ||||
|   assert image.docker_image_id == 'some docker image id' | ||||
| 
 | ||||
| 
 | ||||
| def test_get_repo_tag_image_without_repo(get_monkeypatch): | ||||
|   get_repository_mock(get_monkeypatch, None) | ||||
| 
 | ||||
|   image = pre_oci_model.get_repo_tag_image( | ||||
|     Repository('namespace_name', 'repository_name'), 'tag_name') | ||||
| 
 | ||||
|   assert image is None | ||||
| 
 | ||||
| 
 | ||||
| def test_get_repo_tag_image_without_repo_tag_image(get_monkeypatch): | ||||
|   mock = Mock() | ||||
|   mock.docker_image_id = 'some docker image id' | ||||
|   get_repository_mock(get_monkeypatch, mock) | ||||
| 
 | ||||
|   def raise_exception(repo, tag_name, include_storage=False): | ||||
|     raise DataModelException() | ||||
| 
 | ||||
|   get_monkeypatch.setattr(model.tag, 'get_repo_tag_image', raise_exception) | ||||
| 
 | ||||
|   image = pre_oci_model.get_repo_tag_image( | ||||
|     Repository('namespace_name', 'repository_name'), 'tag_name') | ||||
| 
 | ||||
|   assert image is None | ||||
| 
 | ||||
| 
 | ||||
| def test_create_or_update_tag(get_monkeypatch): | ||||
|   mock = Mock() | ||||
|   get_monkeypatch.setattr(model.tag, 'create_or_update_tag', mock) | ||||
| 
 | ||||
|   pre_oci_model.create_or_update_tag('namespace_name', 'repository_name', 'tag_name', | ||||
|                                      'docker_image_id') | ||||
| 
 | ||||
|   assert mock.call_count == 1 | ||||
|   assert mock.call_args == call('namespace_name', 'repository_name', 'tag_name', 'docker_image_id') | ||||
| 
 | ||||
| 
 | ||||
| def test_delete_tag(get_monkeypatch): | ||||
|   mock = Mock() | ||||
|   get_monkeypatch.setattr(model.tag, 'delete_tag', mock) | ||||
| 
 | ||||
|   pre_oci_model.delete_tag('namespace_name', 'repository_name', 'tag_name') | ||||
| 
 | ||||
|   assert mock.call_count == 1 | ||||
|   assert mock.call_args == call('namespace_name', 'repository_name', 'tag_name') | ||||
| 
 | ||||
| 
 | ||||
| def test_get_parent_images_with_exception(get_monkeypatch): | ||||
|   mock = Mock(side_effect=InvalidImageException) | ||||
| 
 | ||||
|   get_monkeypatch.setattr(model.image, 'get_image_by_id', mock) | ||||
| 
 | ||||
|   images = pre_oci_model.get_parent_images('namespace_name', 'repository_name', 'tag_name') | ||||
|   assert images == [] | ||||
| 
 | ||||
| 
 | ||||
| def test_get_parent_images_empty_parent_images(get_monkeypatch): | ||||
|   get_image_by_id_mock = Mock() | ||||
|   get_monkeypatch.setattr(model.image, 'get_image_by_id', get_image_by_id_mock) | ||||
| 
 | ||||
|   get_parent_images_mock = Mock(return_value=[]) | ||||
|   get_monkeypatch.setattr(model.image, 'get_parent_images', get_parent_images_mock) | ||||
| 
 | ||||
|   images = pre_oci_model.get_parent_images('namespace_name', 'repository_name', 'tag_name') | ||||
|   assert images == [] | ||||
| 
 | ||||
| 
 | ||||
| def test_list_repository_tags(get_monkeypatch): | ||||
|   mock = Mock(return_value=[]) | ||||
| 
 | ||||
|   get_monkeypatch.setattr(model.tag, 'list_repository_tags', mock) | ||||
| 
 | ||||
|   pre_oci_model.list_repository_tags('namespace_name', 'repository_name') | ||||
| 
 | ||||
|   mock.assert_called_once_with('namespace_name', 'repository_name') | ||||
| 
 | ||||
| 
 | ||||
| def test_get_repository(get_monkeypatch): | ||||
|   mock = Mock() | ||||
| 
 | ||||
|   get_monkeypatch.setattr(model.repository, 'get_repository', mock) | ||||
| 
 | ||||
|   pre_oci_model.get_repository('namespace_name', 'repository_name') | ||||
| 
 | ||||
|   mock.assert_called_once_with('namespace_name', 'repository_name') | ||||
| 
 | ||||
| 
 | ||||
| def test_tag_to_manifest(get_monkeypatch): | ||||
|   repo_mock = Mock() | ||||
|   restore_tag_mock = Mock(return_value=None) | ||||
|   get_repository_mock = Mock(return_value=repo_mock) | ||||
| 
 | ||||
|   get_monkeypatch.setattr(model.tag, 'restore_tag_to_manifest', restore_tag_mock) | ||||
|   get_monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock) | ||||
| 
 | ||||
|   pre_oci_model.restore_tag_to_manifest( | ||||
|     Repository('namespace', 'repository'), 'tag_name', 'manifest_digest') | ||||
| 
 | ||||
|   get_repository_mock.assert_called_once_with('namespace', 'repository') | ||||
|   restore_tag_mock.assert_called_once_with(repo_mock, 'tag_name', 'manifest_digest') | ||||
| 
 | ||||
| 
 | ||||
| def test__tag_to_image(get_monkeypatch): | ||||
|   repo_mock = Mock() | ||||
|   restore_tag_mock = Mock(return_value=None) | ||||
|   get_repository_mock = Mock(return_value=repo_mock) | ||||
| 
 | ||||
|   get_monkeypatch.setattr(model.tag, 'restore_tag_to_image', restore_tag_mock) | ||||
|   get_monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock) | ||||
| 
 | ||||
|   pre_oci_model.restore_tag_to_image(Repository('namespace', 'repository'), 'tag_name', 'image_id') | ||||
| 
 | ||||
|   get_repository_mock.assert_called_once_with('namespace', 'repository') | ||||
|   restore_tag_mock.assert_called_once_with(repo_mock, 'tag_name', 'image_id') | ||||
		Reference in a new issue