This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/repository.py

378 lines
13 KiB
Python
Raw Normal View History

2015-05-14 20:47:38 +00:00
""" List, create and manage repositories. """
import logging
2015-03-10 05:03:39 +00:00
import datetime
import features
2015-03-11 00:22:46 +00:00
from datetime import timedelta
from flask import request
from data import model
from data.database import (Repository as RepositoryTable, Visibility, RepositoryTag,
RepositoryActionCount, Namespace, fn)
from endpoints.api import (truthy_bool, format_date, nickname, log_action, validate_json_request,
require_repo_read, require_repo_write, require_repo_admin,
RepositoryParamResource, resource, query_param, parse_args, ApiResource,
request_error, require_scope, Unauthorized, NotFound, InvalidRequest,
path_param, ExceedsLicenseException)
from endpoints.api.billing import lookup_allowed_private_repos
from auth.permissions import (ModifyRepositoryPermission, AdministerRepositoryPermission,
CreateRepositoryPermission)
from auth.auth_context import get_authenticated_user
from auth import scopes
from util.names import REPOSITORY_NAME_REGEX
2014-03-11 03:54:55 +00:00
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def check_allowed_private_repos(namespace):
    """ Verify that the given namespace may still hold another private repository.

        Raises an ExceedsLicenseException when billing is enabled and
        lookup_allowed_private_repos reports a falsy result for the namespace.
        Returns None otherwise.
    """
    # The lookup is skipped entirely when billing is disabled (short-circuit).
    if features.BILLING and not lookup_allowed_private_repos(namespace):
        raise ExceedsLicenseException()
@resource('/v1/repository')
class RepositoryList(ApiResource):
    """Operations for creating and listing repositories."""

    # JSON schema used by validate_json_request to check the body of POST requests.
    schemas = {
        'NewRepo': {
            'type': 'object',
            'description': 'Description of a new repository',
            'required': [
                'repository',
                'visibility',
                'description',
            ],
            'properties': {
                'repository': {
                    'type': 'string',
                    'description': 'Repository name',
                },
                'visibility': {
                    'type': 'string',
                    'description': 'Visibility which the repository will start with',
                    'enum': [
                        'public',
                        'private',
                    ],
                },
                'namespace': {
                    'type': 'string',
                    'description': ('Namespace in which the repository should be created. If omitted, the '
                                    'username of the caller is used'),
                },
                'description': {
                    'type': 'string',
                    'description': 'Markdown encoded description for the repository',
                },
            },
        },
    }

    @require_scope(scopes.CREATE_REPO)
    @nickname('createRepo')
    @validate_json_request('NewRepo')
    def post(self):
        """ Create a new repository.

            The target namespace is taken from the request body when present, and
            otherwise defaults to the username of the authenticated caller.

            Returns a ({'namespace': ..., 'name': ...}, 201) tuple on success.

            Raises InvalidRequest when no namespace can be determined or the
            repository name does not match REPOSITORY_NAME_REGEX, Unauthorized when
            the caller lacks create permission on the namespace, and a request error
            when the repository already exists.
        """
        owner = get_authenticated_user()
        req = request.get_json()

        # The membership test must be against the parsed body (req), not the string
        # literal 'req'; the latter is a substring check that is always true.
        if owner is None and 'namespace' not in req:
            raise InvalidRequest('Must provide a namespace or must be logged in.')

        # owner.username is only touched when the body carries no namespace, so an
        # anonymous (owner is None) request with an explicit namespace is safe here.
        namespace_name = req['namespace'] if 'namespace' in req else owner.username

        permission = CreateRepositoryPermission(namespace_name)
        if not permission.can():
            raise Unauthorized()

        repository_name = req['repository']
        visibility = req['visibility']

        existing = model.repository.get_repository(namespace_name, repository_name)
        if existing:
            raise request_error(message='Repository already exists')

        if visibility == 'private':
            check_allowed_private_repos(namespace_name)

        # Verify that the repository name is valid.
        if not REPOSITORY_NAME_REGEX.match(repository_name):
            raise InvalidRequest('Invalid repository name')

        repo = model.repository.create_repository(namespace_name, repository_name, owner, visibility)
        repo.description = req['description']
        repo.save()

        log_action('create_repo', namespace_name,
                   {'repo': repository_name, 'namespace': namespace_name}, repo=repo)
        return {
            'namespace': namespace_name,
            'name': repository_name
        }, 201

    def _load_repositories(self, namespace=None, public=False, starred=False, limit=None, page=None):
        """ Loads the filtered list of repositories and returns it and the star lookup set.

            Returns a (repositories, star_lookup) tuple, where star_lookup is a set
            of repository IDs built from the authenticated user's starred
            repositories (empty when nobody is authenticated).
        """
        # Load the authenticated user and username (if applicable).
        user = get_authenticated_user()
        username = user.username if user else None

        # If starred (and only starred) repositories were requested, then load them directly.
        if starred and namespace is None and not public:
            if not username:
                return [], set()

            repositories = model.repository.get_user_starred_repositories(user,
                                                                          limit=limit,
                                                                          page=page)
            return repositories, {repo.id for repo in repositories}

        # Otherwise, conduct a full filtered lookup and (optionally) filter by the
        # starred repositories.
        star_lookup = set()
        if username:
            starred_repos = model.repository.get_user_starred_repositories(user)
            star_lookup = {repo.id for repo in starred_repos}

        # If the user asked for only public repositories, limit to only public repos.
        if public and (not namespace and not starred):
            username = None

        # Find the matching repositories.
        repositories = model.repository.get_visible_repositories(username=username,
                                                                 limit=limit,
                                                                 page=page,
                                                                 include_public=public,
                                                                 namespace=namespace)

        # Filter down to just the starred repositories, if asked.
        if starred:
            return [repo for repo in repositories if repo.id in star_lookup], star_lookup

        return repositories, star_lookup

    @require_scope(scopes.READ_REPO)
    @nickname('listRepos')
    @parse_args
    @query_param('page', 'Offset page number. (int)', type=int)
    @query_param('limit', 'Limit on the number of results (int)', type=int)
    @query_param('namespace', 'Filters the repositories returned to this namespace', type=str)
    @query_param('starred', 'Filters the repositories returned to those starred by the user',
                 type=truthy_bool, default=False)
    @query_param('public', 'Adds any repositories visible to the user by virtue of being public',
                 type=truthy_bool, default=False)
    @query_param('last_modified', 'Whether to include when the repository was last modified.',
                 type=truthy_bool, default=False)
    @query_param('popularity', 'Whether to include the repository\'s popularity metric.',
                 type=truthy_bool, default=False)
    def get(self, args):
        """ Fetch the list of repositories visible to the current user under a variety of situations.

            At least one of the namespace, starred or public arguments must be
            supplied; otherwise InvalidRequest is raised.

            Returns a dict with a single 'repositories' key holding one view dict
            per repository.
        """
        if not args['namespace'] and not args['starred'] and not args['public']:
            raise InvalidRequest('namespace, starred or public are required for this API call')

        repositories, star_lookup = self._load_repositories(args['namespace'], args['public'],
                                                            args['starred'], args['limit'],
                                                            args['page'])

        # Collect the IDs of the repositories found for subsequent lookup of popularity
        # and/or last modified.
        repository_ids = [repo.id for repo in repositories]

        # Each map is only built (and only read in repo_view) when its flag is set.
        if args['last_modified']:
            last_modified_map = model.repository.get_when_last_modified(repository_ids)

        if args['popularity']:
            action_count_map = model.repository.get_action_counts(repository_ids)

        def repo_view(repo_obj):
            """ Builds the response dict for a single repository. """
            repo = {
                'namespace': repo_obj.namespace_user.username,
                'name': repo_obj.name,
                'description': repo_obj.description,
                'is_public': repo_obj.visibility_id == model.repository.get_public_repo_visibility().id,
            }

            repo_id = repo_obj.id

            if args['last_modified']:
                repo['last_modified'] = last_modified_map.get(repo_id)

            if args['popularity']:
                repo['popularity'] = action_count_map.get(repo_id, 0)

            # The starred flag is only reported when there is an authenticated user.
            if get_authenticated_user():
                repo['is_starred'] = repo_id in star_lookup

            return repo

        return {
            'repositories': [repo_view(repo) for repo in repositories]
        }
@resource('/v1/repository/<repopath:repository>')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class Repository(RepositoryParamResource):
    """Operations for managing a specific repository."""

    # JSON schema used by validate_json_request to check the body of PUT requests.
    schemas = {
        'RepoUpdate': {
            'type': 'object',
            'description': 'Fields which can be updated in a repository.',
            'required': [
                'description',
            ],
            'properties': {
                'description': {
                    'type': 'string',
                    'description': 'Markdown encoded description for the repository',
                },
            }
        }
    }

    @require_repo_read
    @nickname('getRepo')
    def get(self, namespace, repository):
        """ Fetch the specified repository.

            Returns a dict describing the repository: its tags, the caller's
            write/admin permissions, visibility, starred state and pull/push
            statistics for the last day and the last thirty days.

            Raises NotFound when the repository does not exist.
        """
        # Pass the arguments to the logger rather than pre-formatting the message,
        # so the string is only built when debug logging is actually enabled.
        logger.debug('Get repo: %s/%s', namespace, repository)

        def tag_view(tag):
            """ Builds the response dict for a single tag. """
            tag_info = {
                'name': tag.name,
                'image_id': tag.image.docker_image_id,
                'size': tag.image.aggregate_size
            }

            # 'last_modified' is only included for a positive start timestamp.
            if tag.lifetime_start_ts > 0:
                last_modified = format_date(datetime.datetime.fromtimestamp(tag.lifetime_start_ts))
                tag_info['last_modified'] = last_modified

            return tag_info

        repo = model.repository.get_repository(namespace, repository)
        if not repo:
            raise NotFound()

        tags = model.tag.list_repository_tags(namespace, repository, include_storage=True)
        tag_dict = {tag.name: tag_view(tag) for tag in tags}

        can_write = ModifyRepositoryPermission(namespace, repository).can()
        can_admin = AdministerRepositoryPermission(namespace, repository).can()

        # Anonymous callers never have the repository starred.
        user = get_authenticated_user()
        is_starred = model.repository.repository_is_starred(user, repo) if user else False
        is_public = model.repository.is_repository_public(repo)

        (pull_today, pull_thirty_day) = model.log.get_repository_pulls(repo, timedelta(days=1),
                                                                       timedelta(days=30))

        (push_today, push_thirty_day) = model.log.get_repository_pushes(repo, timedelta(days=1),
                                                                        timedelta(days=30))

        return {
            'namespace': namespace,
            'name': repository,
            'description': repo.description,
            'tags': tag_dict,
            'can_write': can_write,
            'can_admin': can_admin,
            'is_public': is_public,
            'is_organization': repo.namespace_user.organization,
            'is_starred': is_starred,
            # The badge token is only exposed for non-public repositories.
            'status_token': repo.badge_token if not is_public else '',
            'stats': {
                'pulls': {
                    'today': pull_today,
                    'thirty_day': pull_thirty_day
                },
                'pushes': {
                    'today': push_today,
                    'thirty_day': push_thirty_day
                }
            }
        }

    @require_repo_write
    @nickname('updateRepo')
    @validate_json_request('RepoUpdate')
    def put(self, namespace, repository):
        """ Update the description in the specified repository.

            Returns {'success': True} once the description is saved and the change
            logged. Raises NotFound when the repository does not exist.
        """
        repo = model.repository.get_repository(namespace, repository)
        if not repo:
            raise NotFound()

        values = request.get_json()
        repo.description = values['description']
        repo.save()

        log_action('set_repo_description', namespace,
                   {'repo': repository, 'description': values['description']},
                   repo=repo)
        return {
            'success': True
        }

    @require_repo_admin
    @nickname('deleteRepository')
    def delete(self, namespace, repository):
        """ Delete a repository.

            Purges the repository, logs the deletion and returns ('Deleted', 204).
        """
        model.repository.purge_repository(namespace, repository)
        log_action('delete_repo', namespace,
                   {'repo': repository, 'namespace': namespace})
        return 'Deleted', 204
@resource('/v1/repository/<repopath:repository>/changevisibility')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class RepositoryVisibility(RepositoryParamResource):
    """ Custom verb for changing the visibility of the repository. """

    # JSON schema used by validate_json_request to check the body of POST requests.
    schemas = {
        'ChangeVisibility': {
            'type': 'object',
            'description': 'Change the visibility for the repository.',
            'required': [
                'visibility',
            ],
            'properties': {
                'visibility': {
                    'type': 'string',
                    'description': 'Visibility which the repository will start with',
                    'enum': [
                        'public',
                        'private',
                    ],
                },
            }
        }
    }

    @require_repo_admin
    @nickname('changeRepoVisibility')
    @validate_json_request('ChangeVisibility')
    def post(self, namespace, repository):
        """ Change the visibility of a repository.

            When switching to 'private', check_allowed_private_repos is consulted
            first for the namespace.

            Returns {'success': True} once the visibility is changed and the change
            logged. Raises NotFound when the repository does not exist.
        """
        repo = model.repository.get_repository(namespace, repository)
        if not repo:
            # Raise explicitly instead of falling through and returning None.
            raise NotFound()

        values = request.get_json()
        visibility = values['visibility']
        if visibility == 'private':
            check_allowed_private_repos(namespace)

        model.repository.set_repository_visibility(repo, visibility)
        log_action('change_repo_visibility', namespace,
                   {'repo': repository, 'visibility': values['visibility']},
                   repo=repo)
        return {'success': True}