import logging

from peewee import JOIN_LEFT_OUTER

from data.database import (AccessToken, AccessTokenKind, Repository, Namespace, Role,
                           RepositoryBuildTrigger, LogEntryKind)
from data.model import DataModelException, _basequery, InvalidTokenException


logger = logging.getLogger(__name__)


def create_access_token(repo, role, kind=None, friendly_name=None):
    """ Create and return a temporary access token on `repo`.

        `role` is the name of the Role row to attach. `kind`, when given, is the
        name of the AccessTokenKind row to attach; otherwise the token's kind is
        left as None. `friendly_name` is stored on the token as-is.

        The role and kind are looked up with peewee's `get`, which raises the
        model's `DoesNotExist` when no row matches.
    """
    role_ref = Role.get(Role.name == role)

    kind_ref = None
    if kind is not None:
        kind_ref = AccessTokenKind.get(AccessTokenKind.name == kind)

    return AccessToken.create(repository=repo, temporary=True, role=role_ref,
                              kind=kind_ref, friendly_name=friendly_name)


def create_delegate_token(namespace_name, repository_name, friendly_name, role='read'):
    """ Create and return a non-temporary (delegate) access token.

        The token is attached to the repository resolved by
        `_basequery.get_existing_repository(namespace_name, repository_name)` and
        to the Role row named `role` (defaults to 'read').

        NOTE(review): how `get_existing_repository` reports a missing repository
        is not visible from this module; confirm before relying on it.
    """
    role_ref = Role.get(Role.name == role)
    repo = _basequery.get_existing_repository(namespace_name, repository_name)
    return AccessToken.create(repository=repo, role=role_ref,
                              friendly_name=friendly_name, temporary=False)


def get_repository_delegate_tokens(namespace_name, repository_name):
    """ Return a query for the delegate tokens of the given repository.

        The query selects AccessToken rows together with their Role, restricted to
        non-temporary tokens on the named repository under the named namespace.
        Tokens that have a joined RepositoryBuildTrigger row are excluded.
    """
    return (AccessToken
            .select(AccessToken, Role)
            .join(Repository)
            .join(Namespace, on=(Repository.namespace_user == Namespace.id))
            .switch(AccessToken)
            .join(Role)
            .switch(AccessToken)
            # Left outer join so tokens without a build trigger are kept; the
            # `uuid >> None` (IS NULL) filter below then drops the ones with one.
            .join(RepositoryBuildTrigger, JOIN_LEFT_OUTER)
            .where(Repository.name == repository_name,
                   Namespace.username == namespace_name,
                   # `==` (not `is`) is required so peewee builds a SQL expression.
                   AccessToken.temporary == False,
                   RepositoryBuildTrigger.uuid >> None))


def get_repo_delegate_token(namespace_name, repository_name, code):
    """ Return the delegate token with the given code on the given repository.

        Raises InvalidTokenException if no such delegate token exists.
    """
    repo_query = get_repository_delegate_tokens(namespace_name, repository_name)

    try:
        return repo_query.where(AccessToken.code == code).get()
    except AccessToken.DoesNotExist:
        raise InvalidTokenException('Unable to find token with code: %s' % code)


def set_repo_delegate_token_role(namespace_name, repository_name, code, role):
    """ Change the role of a delegate token and return the saved token.

        Only the 'read' and 'write' roles may be assigned; any other value raises
        DataModelException. Raises InvalidTokenException if the token cannot be
        found on the given repository.
    """
    # Reject bad input before touching the database, so an invalid role never
    # costs a token lookup and is always reported as the input error it is.
    if role not in ('read', 'write'):
        raise DataModelException('Invalid role for delegate token: %s' % role)

    token = get_repo_delegate_token(namespace_name, repository_name, code)
    token.role = Role.get(Role.name == role)
    token.save()
    return token


def delete_delegate_token(namespace_name, repository_name, code):
    """ Delete a delegate token and return the (now deleted) token instance.

        The deletion is recursive, so rows that depend on the token are removed
        along with it. Raises InvalidTokenException if the token cannot be found
        on the given repository.
    """
    token = get_repo_delegate_token(namespace_name, repository_name, code)
    token.delete_instance(recursive=True)
    return token


def load_token_data(code):
    """ Load the permissions for any token by code.

        Returns the AccessToken selected together with its Role, Repository and
        the repository's Namespace. Raises InvalidTokenException if no token has
        the given code.
    """
    try:
        return (AccessToken
                .select(AccessToken, Repository, Namespace, Role)
                .join(Role)
                .switch(AccessToken)
                .join(Repository)
                .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                .where(AccessToken.code == code)
                .get())
    except AccessToken.DoesNotExist:
        raise InvalidTokenException('Invalid delegate token code: %s' % code)