from flask import request, abort from endpoints.api import (resource, nickname, require_repo_read, require_repo_write, RepositoryParamResource, log_action, NotFound, validate_json_request, path_param, format_date, parse_args, query_param) from endpoints.api.image import image_view from data import model from auth.auth_context import get_authenticated_user from datetime import datetime @resource('/v1/repository//tag/') @path_param('repository', 'The full path of the repository. e.g. namespace/name') class ListRepositoryTags(RepositoryParamResource): """ Resource for listing repository tags. """ @require_repo_write @parse_args @query_param('specificTag', 'Filters the tags to the specific tag.', type=str, default='') @query_param('limit', 'Limit to the number of results to return. Max 100.', type=int, default=50) @nickname('listRepoTags') def get(self, args, namespace, repository): repo = model.get_repository(namespace, repository) if not repo: abort(404) def tag_view(tag): tag_info = { 'name': tag.name, 'docker_image_id': tag.image.docker_image_id, 'reversion': tag.reversion, } if tag.lifetime_start_ts > 0: tag_info['start_ts'] = tag.lifetime_start_ts if tag.lifetime_end_ts > 0: tag_info['end_ts'] = tag.lifetime_end_ts return tag_info specific_tag = args.get('specificTag') or None limit = min(100, max(1, args.get('limit', 50))) tags = model.list_repository_tag_history(repo, limit=limit, specific_tag=specific_tag) return {'tags': [tag_view(tag) for tag in tags]} @resource('/v1/repository//tag/') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('tag', 'The name of the tag') class RepositoryTag(RepositoryParamResource): """ Resource for managing repository tags. """ schemas = { 'MoveTag': { 'id': 'MoveTag', 'type': 'object', 'description': 'Description of to which image a new or existing tag should point', 'required': [ 'image', ], 'properties': { 'image': { 'type': 'string', 'description': 'Image identifier to which the tag should point', }, }, }, } @require_repo_write @nickname('changeTagImage') @validate_json_request('MoveTag') def put(self, namespace, repository, tag): """ Change which image a tag points to or create a new tag.""" image_id = request.get_json()['image'] image = model.get_repo_image(namespace, repository, image_id) if not image: raise NotFound() original_image_id = None try: original_tag_image = model.get_tag_image(namespace, repository, tag) if original_tag_image: original_image_id = original_tag_image.docker_image_id except model.DataModelException: # This is a new tag. pass model.create_or_update_tag(namespace, repository, tag, image_id) model.garbage_collect_repository(namespace, repository) username = get_authenticated_user().username log_action('move_tag' if original_image_id else 'create_tag', namespace, {'username': username, 'repo': repository, 'tag': tag, 'image': image_id, 'original_image': original_image_id}, repo=model.get_repository(namespace, repository)) return 'Updated', 201 @require_repo_write @nickname('deleteFullTag') def delete(self, namespace, repository, tag): """ Delete the specified repository tag. """ model.delete_tag(namespace, repository, tag) model.garbage_collect_repository(namespace, repository) username = get_authenticated_user().username log_action('delete_tag', namespace, {'username': username, 'repo': repository, 'tag': tag}, repo=model.get_repository(namespace, repository)) return 'Deleted', 204 @resource('/v1/repository//tag//images') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('tag', 'The name of the tag') class RepositoryTagImages(RepositoryParamResource): """ Resource for listing the images in a specific repository tag. """ @require_repo_read @nickname('listTagImages') def get(self, namespace, repository, tag): """ List the images for the specified repository tag. """ try: tag_image = model.get_tag_image(namespace, repository, tag) except model.DataModelException: raise NotFound() parent_images = model.get_parent_images(namespace, repository, tag_image) image_map = {} for image in parent_images: image_map[str(image.id)] = image parents = list(parent_images) parents.reverse() all_images = [tag_image] + parents return { 'images': [image_view(image, image_map) for image in all_images] } @resource('/v1/repository//tag//revert') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('tag', 'The name of the tag') class RevertTag(RepositoryParamResource): """ Resource for reverting a repository tag back to a previous image. """ schemas = { 'RevertTag': { 'id': 'RevertTag', 'type': 'object', 'description': 'Reverts a tag to a specific image', 'required': [ 'image', ], 'properties': { 'image': { 'type': 'string', 'description': 'Image identifier to which the tag should point', }, }, }, } @require_repo_write @nickname('revertTag') @validate_json_request('RevertTag') def post(self, namespace, repository, tag): """ Reverts a repository tag back to a previous image in the repository. """ try: tag_image = model.get_tag_image(namespace, repository, tag) except model.DataModelException: raise NotFound() # Revert the tag back to the previous image. image_id = request.get_json()['image'] model.revert_tag(tag_image.repository, tag, image_id) model.garbage_collect_repository(namespace, repository) # Log the reversion. username = get_authenticated_user().username log_action('revert_tag', namespace, {'username': username, 'repo': repository, 'tag': tag, 'image': image_id, 'original_image': tag_image.docker_image_id}, repo=model.get_repository(namespace, repository)) return { 'image_id': image_id, 'original_image_id': tag_image.docker_image_id }