227 lines
7.7 KiB
Python
227 lines
7.7 KiB
Python
""" Manage the tags of a repository. """
|
|
|
|
from flask import request, abort
|
|
|
|
from endpoints.api import (resource, nickname, require_repo_read, require_repo_write,
|
|
RepositoryParamResource, log_action, NotFound, validate_json_request,
|
|
path_param, parse_args, query_param, truthy_bool)
|
|
from endpoints.api.image import image_view
|
|
from data import model
|
|
from auth.auth_context import get_authenticated_user
|
|
from util.names import TAG_ERROR, TAG_REGEX
|
|
|
|
|
|
@resource('/v1/repository/<repopath:repository>/tag/')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class ListRepositoryTags(RepositoryParamResource):
    """ Resource for listing full repository tag history, alive *and dead*. """

    @require_repo_read
    @parse_args
    @query_param('specificTag', 'Filters the tags to the specific tag.', type=str, default='')
    @query_param('limit', 'Limit to the number of results to return per page. Max 100.',
                 type=int, default=50)
    @query_param('page', 'Page index for the results. Default 1.', type=int, default=1)
    @nickname('listRepoTags')
    def get(self, args, namespace, repository):
        """ List one page of the tag history of the repository. """
        # Returns a dict with:
        #   'tags':           the tag views for this page (at most `limit` entries),
        #   'page':           the (clamped) page index that was served,
        #   'has_additional': True when at least one more entry exists past this page.
        # Raises NotFound when the repository lookup returns nothing.
        repo = model.repository.get_repository(namespace, repository)
        if not repo:
            raise NotFound()

        def tag_view(tag):
            """ Build the response dict describing a single tag history entry. """
            tag_info = {
                'name': tag.name,
                'docker_image_id': tag.image.docker_image_id,
                'reversion': tag.reversion,
            }

            # Timestamps are only reported when positive. The `or 0` keeps the
            # comparison well-defined should a timestamp be None.
            # NOTE(review): presumably the end timestamp is unset for tags that are
            # still alive -- confirm against the tag model.
            if (tag.lifetime_start_ts or 0) > 0:
                tag_info['start_ts'] = tag.lifetime_start_ts

            if (tag.lifetime_end_ts or 0) > 0:
                tag_info['end_ts'] = tag.lifetime_end_ts

            return tag_info

        # An empty filter string means "no filter".
        specific_tag = args.get('specificTag') or None

        # Clamp paging arguments: page is at least 1, limit is within [1, 100].
        page = max(1, args.get('page', 1))
        limit = min(100, max(1, args.get('limit', 50)))

        # Note: We ask for limit+1 here, so we can check to see if there are
        # additional pages of results.
        tags = model.tag.list_repository_tag_history(repo, page=page, size=limit + 1,
                                                     specific_tag=specific_tag)
        tags = list(tags)

        return {
            'tags': [tag_view(tag) for tag in tags[0:limit]],
            'page': page,
            # Only the extra (limit+1'th) row proves that another page exists; a page
            # holding exactly `limit` rows may well be the last one.
            'has_additional': len(tags) > limit,
        }
|
|
|
|
|
|
@resource('/v1/repository/<repopath:repository>/tag/<tag>')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('tag', 'The name of the tag')
class RepositoryTag(RepositoryParamResource):
    """ Resource for creating, moving and deleting a single repository tag. """

    # JSON schema for the body accepted by `put`.
    schemas = {
        'MoveTag': {
            'type': 'object',
            'description': 'Description of to which image a new or existing tag should point',
            'required': ['image'],
            'properties': {
                'image': {
                    'type': 'string',
                    'description': 'Image identifier to which the tag should point',
                },
            },
        },
    }

    @require_repo_write
    @nickname('changeTagImage')
    @validate_json_request('MoveTag')
    def put(self, namespace, repository, tag):
        """ Point a tag at the given image, creating the tag if it does not exist. """
        # Reject tag names that do not match the allowed pattern.
        if not TAG_REGEX.match(tag):
            abort(400, TAG_ERROR)

        # The target image must be found in this repository.
        target_image_id = request.get_json()['image']
        target_image = model.image.get_repo_image(namespace, repository, target_image_id)
        if not target_image:
            raise NotFound()

        # Look up what the tag pointed at before the change (if anything); this
        # decides below whether the action is logged as a move or as a creation.
        try:
            previous_image = model.tag.get_tag_image(namespace, repository, tag)
            previous_image_id = previous_image.docker_image_id if previous_image else None
        except model.DataModelException:
            # This is a new tag.
            previous_image_id = None

        model.tag.create_or_update_tag(namespace, repository, tag, target_image_id)

        action = 'move_tag' if previous_image_id else 'create_tag'
        log_metadata = {
            'username': get_authenticated_user().username,
            'repo': repository,
            'tag': tag,
            'image': target_image_id,
            'original_image': previous_image_id,
        }
        repo = model.repository.get_repository(namespace, repository)
        log_action(action, namespace, log_metadata, repo=repo)

        return 'Updated', 201

    @require_repo_write
    @nickname('deleteFullTag')
    def delete(self, namespace, repository, tag):
        """ Remove the named tag from the repository. """
        model.tag.delete_tag(namespace, repository, tag)

        # Record who deleted which tag.
        log_metadata = {
            'username': get_authenticated_user().username,
            'repo': repository,
            'tag': tag,
        }
        repo = model.repository.get_repository(namespace, repository)
        log_action('delete_tag', namespace, log_metadata, repo=repo)

        return 'Deleted', 204
|
|
|
|
|
|
@resource('/v1/repository/<repopath:repository>/tag/<tag>/images')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('tag', 'The name of the tag')
class RepositoryTagImages(RepositoryParamResource):
    """ Resource exposing the images that make up a specific repository tag. """

    @require_repo_read
    @nickname('listTagImages')
    @parse_args
    @query_param('owned', 'If specified, only images wholely owned by this tag are returned.',
                 type=truthy_bool, default=False)
    def get(self, args, namespace, repository, tag):
        """ Return the tag's image and its parent images, optionally only the owned ones. """
        try:
            tag_image = model.tag.get_tag_image(namespace, repository, tag)
        except model.DataModelException:
            raise NotFound()

        parent_images = model.image.get_parent_images(namespace, repository, tag_image)

        # Index the tag's image and every parent by stringified database ID.
        candidates = {str(tag_image.id): tag_image}
        for parent in parent_images:
            candidates[str(parent.id)] = parent

        # Snapshot of the full index (handed to image_view) taken before any
        # pruning, plus the images in response order: tag image first, then parents.
        image_map_all = dict(candidates)
        all_images = [tag_image] + list(parent_images)

        owned_only = args['owned']
        if not owned_only:
            selected = all_images
        else:
            # Drop every image that is also reachable from another tag in the
            # repository: the other tag's own image and each of its ancestors.
            for other_tag in model.tag.list_repository_tags(namespace, repository):
                if other_tag.name != tag:
                    candidates.pop(str(other_tag.image_id), None)
                    for ancestor_id in other_tag.image.ancestors.split('/'):
                        candidates.pop(ancestor_id, None)

            selected = [image for image in all_images if str(image.id) in candidates]

        return {
            'images': [image_view(image, image_map_all) for image in selected]
        }
|
|
|
|
|
|
|
|
@resource('/v1/repository/<repopath:repository>/tag/<tag>/revert')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('tag', 'The name of the tag')
class RevertTag(RepositoryParamResource):
    """ Resource for pointing a repository tag back at an earlier image. """

    # JSON schema for the body accepted by `post`.
    schemas = {
        'RevertTag': {
            'type': 'object',
            'description': 'Reverts a tag to a specific image',
            'required': ['image'],
            'properties': {
                'image': {
                    'type': 'string',
                    'description': 'Image identifier to which the tag should point',
                },
            },
        },
    }

    @require_repo_write
    @nickname('revertTag')
    @validate_json_request('RevertTag')
    def post(self, namespace, repository, tag):
        """ Revert the tag to the image named in the request body. """
        # The tag must currently resolve to an image, otherwise there is nothing to revert.
        try:
            current_image = model.tag.get_tag_image(namespace, repository, tag)
        except model.DataModelException:
            raise NotFound()

        # Hand the requested image to the model layer to perform the reversion.
        target_image_id = request.get_json()['image']
        model.tag.revert_tag(current_image.repository, tag, target_image_id)

        # Log the reversion, recording both the new and the previous image.
        log_metadata = {
            'username': get_authenticated_user().username,
            'repo': repository,
            'tag': tag,
            'image': target_image_id,
            'original_image': current_image.docker_image_id,
        }
        repo = model.repository.get_repository(namespace, repository)
        log_action('revert_tag', namespace, log_metadata, repo=repo)

        return {
            'image_id': target_image_id,
            'original_image_id': current_image.docker_image_id,
        }
|