""" Manage the tags of a repository. """

from datetime import datetime, timedelta

from flask import request, abort

from auth.auth_context import get_authenticated_user
from data.model import DataModelException
from endpoints.api import (resource, nickname, require_repo_read, require_repo_write,
                           RepositoryParamResource, log_action, validate_json_request,
                           path_param, parse_args, query_param, truthy_bool,
                           disallow_for_app_repositories)
from endpoints.api.tag_models_interface import Repository
from endpoints.api.tag_models_pre_oci import pre_oci_model as model
from endpoints.exception import NotFound, InvalidRequest
from endpoints.v2.manifest import _generate_and_store_manifest
from util.names import TAG_ERROR, TAG_REGEX


# NOTE(review): none of the @resource routes in this module contain URL placeholders
# (e.g. '/v1/repository//tag/'), although the handlers receive `repository` and `tag`
# arguments. It looks like the placeholder segments were stripped from this copy of the
# file -- confirm the route strings against the canonical source before deploying.


def _parse_expiration_date(expiration):
    """ Converts a client-supplied UNIX timestamp into a naive UTC datetime.

    `expiration` is the raw `expiration` value from the request body: a number of seconds
    since the epoch, or None. None is returned unchanged so the caller can pass it straight
    to the data model.

    Aborts the request with HTTP 400 when the timestamp cannot be represented as a datetime
    or does not lie strictly in the future.
    """
    if expiration is None:
        return None

    try:
        expiration_date = datetime.utcfromtimestamp(float(expiration))
    except (ValueError, OverflowError, OSError):
        # NaN, infinite and out-of-range timestamps raise different exception types
        # depending on the value and the platform; all of them are client errors.
        abort(400)

    # utcfromtimestamp() yields a naive datetime expressed in UTC, so it must be compared
    # against the naive UTC "now" rather than the server's local time.
    if expiration_date <= datetime.utcnow():
        abort(400)

    return expiration_date


@resource('/v1/repository//tag/')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class ListRepositoryTags(RepositoryParamResource):
    """ Resource for listing full repository tag history, alive *and dead*. """

    @require_repo_read
    @disallow_for_app_repositories
    @parse_args()
    @query_param('specificTag', 'Filters the tags to the specific tag.', type=str, default='')
    @query_param('limit', 'Limit to the number of results to return per page. Max 100.',
                 type=int, default=50)
    @query_param('page', 'Page index for the results. Default 1.', type=int, default=1)
    @nickname('listRepoTags')
    def get(self, namespace, repository, parsed_args):
        """ List one page of the tag history of the repository. """
        # An empty filter string means "no filter".
        specific_tag = parsed_args.get('specificTag') or None

        # Clamp paging arguments: pages start at 1, page size is kept within 1..100.
        page = max(1, parsed_args.get('page', 1))
        limit = min(100, max(1, parsed_args.get('limit', 50)))

        tag_history = model.list_repository_tag_history(namespace_name=namespace,
                                                        repository_name=repository,
                                                        page=page, size=limit,
                                                        specific_tag=specific_tag)
        if not tag_history:
            raise NotFound()

        return {
            'tags': [tag.to_dict() for tag in tag_history.tags],
            'page': page,
            'has_additional': tag_history.more,
        }


@resource('/v1/repository//tag/')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('tag', 'The name of the tag')
class RepositoryTag(RepositoryParamResource):
    """ Resource for managing repository tags. """
    schemas = {
        'ChangeTag': {
            'type': 'object',
            'description': 'Makes changes to a specific tag',
            'properties': {
                'image': {
                    'type': ['string', 'null'],
                    'description': '(If specified) Image identifier to which the tag should point',
                },
                'expiration': {
                    'type': ['number', 'null'],
                    'description': '(If specified) The expiration for the image',
                },
            },
        },
    }

    @require_repo_write
    @disallow_for_app_repositories
    @nickname('changeTag')
    @validate_json_request('ChangeTag')
    def put(self, namespace, repository, tag):
        """ Change which image a tag points to or create a new tag."""
        if not TAG_REGEX.match(tag):
            abort(400, TAG_ERROR)

        repo = model.get_repo(namespace, repository)
        if not repo:
            raise NotFound()

        # Read the request body once and reuse it for both optional operations.
        payload = request.get_json()

        if 'expiration' in payload:
            expiration_date = _parse_expiration_date(payload.get('expiration'))

            existing_end_ts, ok = model.change_repository_tag_expiration(namespace, repository,
                                                                         tag, expiration_date)
            if not ok:
                raise InvalidRequest('Could not update tag expiration; Tag has probably changed')

            # Only log when something changed: skip the case where the tag had no
            # expiration before and still has none.
            if not (existing_end_ts is None and expiration_date is None):
                log_action('change_tag_expiration', namespace, {
                    'username': get_authenticated_user().username,
                    'repo': repository,
                    'tag': tag,
                    'namespace': namespace,
                    'expiration_date': expiration_date,
                    'old_expiration_date': existing_end_ts
                }, repo_name=repository)

        if 'image' in payload:
            image_id = payload['image']
            image = model.get_repository_image(namespace, repository, image_id)
            if image is None:
                raise NotFound()

            # Looked up before the update so the log can tell a move from a creation.
            original_image_id = model.get_repo_tag_image(repo, tag)
            model.create_or_update_tag(namespace, repository, tag, image_id)

            username = get_authenticated_user().username
            log_action('move_tag' if original_image_id else 'create_tag', namespace, {
                'username': username,
                'repo': repository,
                'tag': tag,
                'namespace': namespace,
                'image': image_id,
                'original_image': original_image_id
            }, repo_name=repository)

            _generate_and_store_manifest(namespace, repository, tag)

        return 'Updated', 201

    @require_repo_write
    @disallow_for_app_repositories
    @nickname('deleteFullTag')
    def delete(self, namespace, repository, tag):
        """ Delete the specified repository tag. """
        model.delete_tag(namespace, repository, tag)

        username = get_authenticated_user().username
        log_action('delete_tag', namespace, {
            'username': username,
            'repo': repository,
            'namespace': namespace,
            'tag': tag
        }, repo_name=repository)

        return '', 204


@resource('/v1/repository//tag//images')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('tag', 'The name of the tag')
class RepositoryTagImages(RepositoryParamResource):
    """ Resource for listing the images in a specific repository tag. """

    @require_repo_read
    @nickname('listTagImages')
    @disallow_for_app_repositories
    @parse_args()
    @query_param('owned', 'If specified, only images wholely owned by this tag are returned.',
                 type=truthy_bool, default=False)
    def get(self, namespace, repository, tag, parsed_args):
        """ List the images for the specified repository tag. """
        try:
            tag_image = model.get_repo_tag_image(
                Repository(namespace_name=namespace, repository_name=repository), tag)
        except DataModelException:
            raise NotFound()

        if tag_image is None:
            raise NotFound()

        parent_images = model.get_parent_images(namespace, repository,
                                                tag_image.docker_image_id)

        # `image_map` is pruned below when filtering by ownership; `image_map_all` keeps the
        # complete mapping, which is what each image receives when it is serialized.
        image_map = {str(tag_image.docker_image_id): tag_image}
        for image in parent_images:
            image_map[str(image.docker_image_id)] = image

        image_map_all = dict(image_map)
        all_images = [tag_image] + list(parent_images)

        owned_only = parsed_args['owned']

        # Filter the images returned to those not found in the ancestry of any of the other
        # tags in the repository.
        if owned_only:
            for current_tag in model.list_repository_tags(namespace, repository):
                if current_tag.name == tag:
                    continue

                # Remove the tag's image ID.
                image_map.pop(str(current_tag.docker_image_id), None)

                # Remove any ancestors:
                for ancestor_id in current_tag.image.ancestors.split('/'):
                    image_map.pop(ancestor_id, None)

        return {
            'images': [
                image.to_dict(image_map_all) for image in all_images
                if not owned_only or (str(image.docker_image_id) in image_map)
            ]
        }


@resource('/v1/repository//tag//restore')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('tag', 'The name of the tag')
class RestoreTag(RepositoryParamResource):
    """ Resource for restoring a repository tag back to a previous image. """
    schemas = {
        'RestoreTag': {
            'type': 'object',
            'description': 'Restores a tag to a specific image',
            'required': ['image',],
            'properties': {
                'image': {
                    'type': 'string',
                    'description': 'Image identifier to which the tag should point',
                },
                'manifest_digest': {
                    'type': 'string',
                    'description': 'If specified, the manifest digest that should be used',
                },
            },
        },
    }

    @require_repo_write
    @disallow_for_app_repositories
    @nickname('restoreTag')
    @validate_json_request('RestoreTag')
    def post(self, namespace, repository, tag):
        """ Restores a repository tag back to a previous image in the repository. """
        # Read the request body once; `image` is required by the schema above.
        payload = request.get_json()
        image_id = payload['image']
        manifest_digest = payload.get('manifest_digest', None)

        # Data for logging the reversion/restoration.
        username = get_authenticated_user().username
        log_data = {
            'username': username,
            'repo': repository,
            'tag': tag,
            'image': image_id,
        }

        repo = Repository(namespace, repository)

        # A manifest digest, when supplied, takes precedence over the image ID. The manifest
        # is regenerated only on the image-based path.
        if manifest_digest is not None:
            existing_image = model.restore_tag_to_manifest(repo, tag, manifest_digest)
        else:
            existing_image = model.restore_tag_to_image(repo, tag, image_id)
            _generate_and_store_manifest(namespace, repository, tag)

        if existing_image is not None:
            log_data['original_image'] = existing_image.docker_image_id

        log_action('revert_tag', namespace, log_data, repo_name=repository)

        return {
            'image_id': image_id,
            'original_image_id': existing_image.docker_image_id if existing_image else None,
        }