""" List, create and manage repositories. """

import logging
import datetime
import features

from datetime import timedelta

from flask import request

from data import model
from data.database import (Repository as RepositoryTable, Visibility, RepositoryTag,
                           RepositoryActionCount, Namespace, fn)
from endpoints.api import (truthy_bool, format_date, nickname, log_action,
                           validate_json_request, require_repo_read, require_repo_write,
                           require_repo_admin, RepositoryParamResource, resource, query_param,
                           parse_args, ApiResource, request_error, require_scope, Unauthorized,
                           NotFound, InvalidRequest, path_param, ExceedsLicenseException)
from endpoints.api.billing import lookup_allowed_private_repos, get_namespace_plan
from endpoints.common import check_repository_usage
from auth.permissions import (ModifyRepositoryPermission, AdministerRepositoryPermission,
                              CreateRepositoryPermission)
from auth.auth_context import get_authenticated_user
from auth import scopes
from util.names import REPOSITORY_NAME_REGEX

logger = logging.getLogger(__name__)


def check_allowed_private_repos(namespace):
    """ Checks to see if the given namespace has reached its private repository limit.

    Does nothing when the BILLING feature is disabled. Otherwise raises an
    ExceedsLicenseException if lookup_allowed_private_repos() reports a falsy value
    for the namespace.
    """
    # Not enabled if billing is disabled.
    if not features.BILLING:
        return

    if not lookup_allowed_private_repos(namespace):
        raise ExceedsLicenseException()


@resource('/v1/repository')
class RepositoryList(ApiResource):
    """Operations for creating and listing repositories."""
    schemas = {
        'NewRepo': {
            'type': 'object',
            'description': 'Description of a new repository',
            'required': [
                'repository',
                'visibility',
                'description',
            ],
            'properties': {
                'repository': {
                    'type': 'string',
                    'description': 'Repository name',
                },
                'visibility': {
                    'type': 'string',
                    'description': 'Visibility which the repository will start with',
                    'enum': [
                        'public',
                        'private',
                    ],
                },
                'namespace': {
                    'type': 'string',
                    'description': ('Namespace in which the repository should be created. '
                                    'If omitted, the '
                                    'username of the caller is used'),
                },
                'description': {
                    'type': 'string',
                    'description': 'Markdown encoded description for the repository',
                },
            },
        },
    }

    @require_scope(scopes.CREATE_REPO)
    @nickname('createRepo')
    @validate_json_request('NewRepo')
    def post(self):
        """Create a new repository.

        The namespace is taken from the request body when present, otherwise from the
        authenticated user's username.

        Returns a ``(body, 201)`` tuple naming the created repository.

        Raises InvalidRequest when neither a namespace nor an authenticated user is
        available or the repository name does not match REPOSITORY_NAME_REGEX,
        Unauthorized when the caller may not create repositories in the namespace, and
        a request error when the repository already exists.
        """
        owner = get_authenticated_user()
        req = request.get_json()

        # The membership test must be made against the request body itself; without a
        # namespace there is no way to fall back to the (missing) user's username.
        if owner is None and 'namespace' not in req:
            raise InvalidRequest('Must provide a namespace or must be logged in.')

        namespace_name = req['namespace'] if 'namespace' in req else owner.username

        permission = CreateRepositoryPermission(namespace_name)
        if not permission.can():
            raise Unauthorized()

        repository_name = req['repository']
        visibility = req['visibility']

        existing = model.repository.get_repository(namespace_name, repository_name)
        if existing:
            raise request_error(message='Repository already exists')

        if visibility == 'private':
            check_allowed_private_repos(namespace_name)

        # Verify that the repository name is valid.
        if not REPOSITORY_NAME_REGEX.match(repository_name):
            raise InvalidRequest('Invalid repository name')

        repo = model.repository.create_repository(namespace_name, repository_name, owner,
                                                  visibility)
        repo.description = req['description']
        repo.save()

        log_action('create_repo', namespace_name,
                   {'repo': repository_name, 'namespace': namespace_name}, repo=repo)
        return {
            'namespace': namespace_name,
            'name': repository_name
        }, 201

    def _load_repositories(self, namespace=None, public=False, starred=False, limit=None,
                           page=None):
        """ Loads the filtered list of repositories and returns it and the star lookup set.

        Returns a ``(repositories, star_lookup)`` pair, where ``star_lookup`` is the set
        of IDs of the repositories starred by the authenticated user (empty when there
        is no authenticated user).
        """
        # Load the authenticated user and username (if applicable).
        user = get_authenticated_user()
        username = user.username if user else None

        # If starred (and only starred) repositories were requested, then load them
        # directly.
        if starred and namespace is None and not public:
            if not username:
                return [], set()

            repositories = model.repository.get_user_starred_repositories(user, limit=limit,
                                                                          page=page)
            return repositories, {repo.id for repo in repositories}

        # Otherwise, conduct a full filtered lookup and (optionally) filter by the
        # starred repositories.
        star_lookup = set()
        if username:
            starred_repos = model.repository.get_user_starred_repositories(user)
            star_lookup = {repo.id for repo in starred_repos}

        # If the user asked for only public repositories, limit to only public repos.
        if public and (not namespace and not starred):
            username = None

        # Find the matching repositories.
        repositories = model.repository.get_visible_repositories(username=username,
                                                                 limit=limit,
                                                                 page=page,
                                                                 include_public=public,
                                                                 namespace=namespace)

        # Filter down to just the starred repositories, if asked.
        if starred:
            return [repo for repo in repositories if repo.id in star_lookup], star_lookup

        return repositories, star_lookup

    @require_scope(scopes.READ_REPO)
    @nickname('listRepos')
    @parse_args
    @query_param('page', 'Offset page number. (int)', type=int)
    @query_param('limit', 'Limit on the number of results (int)', type=int)
    @query_param('namespace', 'Filters the repositories returned to this namespace', type=str)
    @query_param('starred', 'Filters the repositories returned to those starred by the user',
                 type=truthy_bool, default=False)
    @query_param('public', 'Adds any repositories visible to the user by virtue of being public',
                 type=truthy_bool, default=False)
    @query_param('last_modified', 'Whether to include when the repository was last modified.',
                 type=truthy_bool, default=False)
    @query_param('popularity', 'Whether to include the repository\'s popularity metric.',
                 type=truthy_bool, default=False)
    def get(self, args):
        """ Fetch the list of repositories visible to the current user under a variety of
            situations.

        At least one of the ``namespace``, ``starred`` or ``public`` arguments must be
        truthy; otherwise InvalidRequest is raised.
        """
        if not args['namespace'] and not args['starred'] and not args['public']:
            raise InvalidRequest('namespace, starred or public are required for this API call')

        repositories, star_lookup = self._load_repositories(args['namespace'], args['public'],
                                                            args['starred'], args['limit'],
                                                            args['page'])

        # Collect the IDs of the repositories found for subsequent lookup of popularity
        # and/or last modified.
        repository_ids = [repo.id for repo in repositories]

        # The maps are only loaded (and only read in repo_view) when requested.
        last_modified_map = None
        if args['last_modified']:
            last_modified_map = model.repository.get_when_last_modified(repository_ids)

        action_count_map = None
        if args['popularity']:
            action_count_map = model.repository.get_action_counts(repository_ids)

        def repo_view(repo_obj):
            """ Builds the API dictionary describing a single repository. """
            public_visibility_id = model.repository.get_public_repo_visibility().id
            repo = {
                'namespace': repo_obj.namespace_user.username,
                'name': repo_obj.name,
                'description': repo_obj.description,
                'is_public': repo_obj.visibility_id == public_visibility_id,
            }

            repo_id = repo_obj.id

            if args['last_modified']:
                repo['last_modified'] = last_modified_map.get(repo_id)

            if args['popularity']:
                repo['popularity'] = action_count_map.get(repo_id, 0)

            if get_authenticated_user():
                repo['is_starred'] = repo_id in star_lookup

            return repo

        return {
            'repositories': [repo_view(repo) for repo in repositories]
        }


# NOTE(review): this route contains no placeholder for the 'repository' path parameter
# declared below; it looks like one may have been lost -- confirm the intended URL rule.
@resource('/v1/repository/')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class Repository(RepositoryParamResource):
    """Operations for managing a specific repository."""
    schemas = {
        'RepoUpdate': {
            'type': 'object',
            'description': 'Fields which can be updated in a repository.',
            'required': [
                'description',
            ],
            'properties': {
                'description': {
                    'type': 'string',
                    'description': 'Markdown encoded description for the repository',
                },
            }
        }
    }

    @require_repo_read
    @nickname('getRepo')
    def get(self, namespace, repository):
        """Fetch the specified repository.

        Returns a dictionary with the repository's description, tags, the caller's
        write/admin permissions, star and visibility state, and pull/push counts for
        the last day and last thirty days. Raises NotFound if the repository lookup
        returns nothing.
        """
        logger.debug('Get repo: %s/%s', namespace, repository)

        def tag_view(tag):
            """ Builds the API dictionary describing a single tag. """
            tag_info = {
                'name': tag.name,
                'image_id': tag.image.docker_image_id,
                'size': tag.image.aggregate_size
            }

            # Only tags with a positive start timestamp report a last-modified date.
            if tag.lifetime_start_ts > 0:
                started = datetime.datetime.fromtimestamp(tag.lifetime_start_ts)
                tag_info['last_modified'] = format_date(started)

            return tag_info

        repo = model.repository.get_repository(namespace, repository)
        if not repo:
            raise NotFound()

        tags = model.tag.list_repository_tags(namespace, repository, include_storage=True)
        tag_dict = {tag.name: tag_view(tag) for tag in tags}

        can_write = ModifyRepositoryPermission(namespace, repository).can()
        can_admin = AdministerRepositoryPermission(namespace, repository).can()

        user = get_authenticated_user()
        is_starred = model.repository.repository_is_starred(user, repo) if user else False
        is_public = model.repository.is_repository_public(repo)

        (pull_today, pull_thirty_day) = model.log.get_repository_pulls(repo, timedelta(days=1),
                                                                       timedelta(days=30))

        (push_today, push_thirty_day) = model.log.get_repository_pushes(repo, timedelta(days=1),
                                                                        timedelta(days=30))

        return {
            'namespace': namespace,
            'name': repository,
            'description': repo.description,
            'tags': tag_dict,
            'can_write': can_write,
            'can_admin': can_admin,
            'is_public': is_public,
            'is_organization': repo.namespace_user.organization,
            'is_starred': is_starred,
            # The badge token is only exposed for non-public repositories.
            'status_token': repo.badge_token if not is_public else '',
            'stats': {
                'pulls': {
                    'today': pull_today,
                    'thirty_day': pull_thirty_day
                },
                'pushes': {
                    'today': push_today,
                    'thirty_day': push_thirty_day
                }
            }
        }

    @require_repo_write
    @nickname('updateRepo')
    @validate_json_request('RepoUpdate')
    def put(self, namespace, repository):
        """ Update the description in the specified repository.

        Raises NotFound if the repository lookup returns nothing.
        """
        repo = model.repository.get_repository(namespace, repository)
        if not repo:
            raise NotFound()

        values = request.get_json()
        repo.description = values['description']
        repo.save()

        log_action('set_repo_description', namespace,
                   {'repo': repository, 'description': values['description']},
                   repo=repo)
        return {
            'success': True
        }

    @require_repo_admin
    @nickname('deleteRepository')
    def delete(self, namespace, repository):
        """ Delete a repository.

        Purges the repository, then runs check_repository_usage() for the namespace's
        user and plan and logs the deletion.
        """
        model.repository.purge_repository(namespace, repository)

        user = model.user.get_namespace_user(namespace)
        plan = get_namespace_plan(namespace)
        check_repository_usage(user, plan)

        log_action('delete_repo', namespace,
                   {'repo': repository, 'namespace': namespace})
        return 'Deleted', 204


# NOTE(review): this route contains no placeholder for the 'repository' path parameter
# declared below (note the empty '//' segment); it looks like one may have been lost --
# confirm the intended URL rule.
@resource('/v1/repository//changevisibility')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class RepositoryVisibility(RepositoryParamResource):
    """ Custom verb for changing the visibility of the repository. """
    schemas = {
        'ChangeVisibility': {
            'type': 'object',
            'description': 'Change the visibility for the repository.',
            'required': [
                'visibility',
            ],
            'properties': {
                'visibility': {
                    'type': 'string',
                    'description': 'Visibility which the repository will start with',
                    'enum': [
                        'public',
                        'private',
                    ],
                },
            }
        }
    }

    @require_repo_admin
    @nickname('changeRepoVisibility')
    @validate_json_request('ChangeVisibility')
    def post(self, namespace, repository):
        """ Change the visibility of a repository.

        Switching to 'private' first checks the namespace's private repository
        allowance via check_allowed_private_repos(). Raises NotFound if the repository
        lookup returns nothing, matching the other repository endpoints in this module.
        """
        repo = model.repository.get_repository(namespace, repository)
        if not repo:
            raise NotFound()

        values = request.get_json()
        visibility = values['visibility']
        if visibility == 'private':
            check_allowed_private_repos(namespace)

        model.repository.set_repository_visibility(repo, visibility)
        log_action('change_repo_visibility', namespace,
                   {'repo': repository, 'visibility': values['visibility']},
                   repo=repo)
        return {'success': True}