87 lines
3 KiB
Python
87 lines
3 KiB
Python
import logging
|
|
|
|
from peewee import JOIN_LEFT_OUTER
|
|
|
|
from data.database import (AccessToken, AccessTokenKind, Repository, Namespace, Role,
|
|
RepositoryBuildTrigger, LogEntryKind)
|
|
from data.model import DataModelException, _basequery, InvalidTokenException
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_access_token(repo, role, kind=None, friendly_name=None):
    """ Create and return a temporary access token for the given repository.

    `role` is the name of the Role to attach to the token. `kind`, when it is
    not None, is the name of the AccessTokenKind to attach; otherwise the token
    is created with no kind. Both names are resolved to rows before the token
    is created.
    """
    # Resolve the role first, then the optional kind (same lookup order as before).
    role_ref = Role.get(Role.name == role)
    kind_ref = None if kind is None else AccessTokenKind.get(AccessTokenKind.name == kind)

    return AccessToken.create(repository=repo, temporary=True, role=role_ref, kind=kind_ref,
                              friendly_name=friendly_name)
|
|
|
|
|
|
def create_delegate_token(namespace_name, repository_name, friendly_name,
                          role='read'):
    """ Create and return a non-temporary access token for an existing repository.

    The repository is looked up from `namespace_name` and `repository_name`, and
    `role` (default 'read') is the name of the Role attached to the new token.
    """
    # The role is whatever the caller asked for; it is only 'read' by default.
    token_role = Role.get(name=role)
    repository = _basequery.get_existing_repository(namespace_name, repository_name)

    return AccessToken.create(repository=repository, role=token_role,
                              friendly_name=friendly_name, temporary=False)
|
|
|
|
|
|
def get_repository_delegate_tokens(namespace_name, repository_name):
    """ Return the query selecting the delegate tokens of a repository.

    The query selects AccessToken together with its Role, restricted to the
    repository named `repository_name` under the namespace `namespace_name`.
    Only non-temporary tokens that have no RepositoryBuildTrigger row joined to
    them are matched. The query is returned unexecuted so callers can refine it.
    """
    tokens_with_joins = (AccessToken
                         .select(AccessToken, Role)
                         .join(Repository)
                         .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                         .switch(AccessToken)
                         .join(Role)
                         .switch(AccessToken)
                         .join(RepositoryBuildTrigger, JOIN_LEFT_OUTER))

    # These are peewee expressions, so `== False` and `>> None` (IS NULL) are
    # SQL operators here and must not be rewritten as `not` / `is None`.
    is_repository = Repository.name == repository_name
    is_namespace = Namespace.username == namespace_name
    is_permanent = AccessToken.temporary == False
    has_no_trigger = RepositoryBuildTrigger.uuid >> None

    return tokens_with_joins.where(is_repository, is_namespace, is_permanent, has_no_trigger)
|
|
|
|
|
|
def get_repo_delegate_token(namespace_name, repository_name, code):
    """ Return the delegate token of the given repository whose code is `code`.

    Raises InvalidTokenException if the repository has no such delegate token.
    """
    candidates = get_repository_delegate_tokens(namespace_name, repository_name)
    matching = candidates.where(AccessToken.code == code)

    # Only the fetch itself can signal a missing row.
    try:
        return matching.get()
    except AccessToken.DoesNotExist:
        raise InvalidTokenException('Unable to find token with code: %s' % code)
|
|
|
|
|
|
def set_repo_delegate_token_role(namespace_name, repository_name, code, role):
    """ Change the role of a repository delegate token and return the saved token.

    The token is looked up first, so a missing token raises
    InvalidTokenException before the role is checked. Raises DataModelException
    if `role` is neither 'read' nor 'write'.
    """
    delegate_token = get_repo_delegate_token(namespace_name, repository_name, code)

    if role not in ('read', 'write'):
        raise DataModelException('Invalid role for delegate token: %s' % role)

    delegate_token.role = Role.get(Role.name == role)
    delegate_token.save()
    return delegate_token
|
|
|
|
|
|
def delete_delegate_token(namespace_name, repository_name, code):
    """ Delete a repository delegate token and return the deleted token object.

    The token is deleted with `recursive=True`. Raises InvalidTokenException
    (from the lookup) if the repository has no delegate token with this code.
    """
    doomed_token = get_repo_delegate_token(namespace_name, repository_name, code)
    doomed_token.delete_instance(recursive=True)
    return doomed_token
|
|
|
|
|
|
def load_token_data(code):
    """ Load the permissions for any token by code.

    Returns the AccessToken whose code is `code`, selected together with its
    Role, Repository and the repository's Namespace. Raises
    InvalidTokenException if no token has that code.
    """
    token_query = (AccessToken
                   .select(AccessToken, Repository, Namespace, Role)
                   .join(Role)
                   .switch(AccessToken)
                   .join(Repository)
                   .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                   .where(AccessToken.code == code))

    # Building the query touches no rows; only the fetch can miss.
    try:
        return token_query.get()
    except AccessToken.DoesNotExist:
        raise InvalidTokenException('Invalid delegate token code: %s' % code)
|