Accidental refactor, split out legacy.py into separate sumodules and update all call sites.
This commit is contained in:
parent
2109d24483
commit
3efaa255e8
92 changed files with 4458 additions and 4269 deletions
87
data/model/token.py
Normal file
87
data/model/token.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
import logging
|
||||
|
||||
from peewee import JOIN_LEFT_OUTER
|
||||
|
||||
from data.database import (AccessToken, AccessTokenKind, Repository, Namespace, Role,
|
||||
RepositoryBuildTrigger, LogEntryKind)
|
||||
from data.model import DataModelException, _basequery, InvalidTokenException
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_access_token(repo, role, kind=None, friendly_name=None):
|
||||
role = Role.get(Role.name == role)
|
||||
kind_ref = None
|
||||
if kind is not None:
|
||||
kind_ref = AccessTokenKind.get(AccessTokenKind.name == kind)
|
||||
|
||||
new_token = AccessToken.create(repository=repo, temporary=True, role=role, kind=kind_ref,
|
||||
friendly_name=friendly_name)
|
||||
return new_token
|
||||
|
||||
|
||||
def create_delegate_token(namespace_name, repository_name, friendly_name,
|
||||
role='read'):
|
||||
read_only = Role.get(name=role)
|
||||
repo = _basequery.get_existing_repository(namespace_name, repository_name)
|
||||
new_token = AccessToken.create(repository=repo, role=read_only,
|
||||
friendly_name=friendly_name, temporary=False)
|
||||
return new_token
|
||||
|
||||
|
||||
def get_repository_delegate_tokens(namespace_name, repository_name):
|
||||
return (AccessToken
|
||||
.select(AccessToken, Role)
|
||||
.join(Repository)
|
||||
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
||||
.switch(AccessToken)
|
||||
.join(Role)
|
||||
.switch(AccessToken)
|
||||
.join(RepositoryBuildTrigger, JOIN_LEFT_OUTER)
|
||||
.where(Repository.name == repository_name, Namespace.username == namespace_name,
|
||||
AccessToken.temporary == False, RepositoryBuildTrigger.uuid >> None))
|
||||
|
||||
|
||||
def get_repo_delegate_token(namespace_name, repository_name, code):
|
||||
repo_query = get_repository_delegate_tokens(namespace_name, repository_name)
|
||||
|
||||
try:
|
||||
return repo_query.where(AccessToken.code == code).get()
|
||||
except AccessToken.DoesNotExist:
|
||||
raise InvalidTokenException('Unable to find token with code: %s' % code)
|
||||
|
||||
|
||||
def set_repo_delegate_token_role(namespace_name, repository_name, code, role):
|
||||
token = get_repo_delegate_token(namespace_name, repository_name, code)
|
||||
|
||||
if role != 'read' and role != 'write':
|
||||
raise DataModelException('Invalid role for delegate token: %s' % role)
|
||||
|
||||
new_role = Role.get(Role.name == role)
|
||||
token.role = new_role
|
||||
token.save()
|
||||
|
||||
return token
|
||||
|
||||
|
||||
def delete_delegate_token(namespace_name, repository_name, code):
|
||||
token = get_repo_delegate_token(namespace_name, repository_name, code)
|
||||
token.delete_instance(recursive=True)
|
||||
return token
|
||||
|
||||
|
||||
def load_token_data(code):
|
||||
""" Load the permissions for any token by code. """
|
||||
try:
|
||||
return (AccessToken
|
||||
.select(AccessToken, Repository, Namespace, Role)
|
||||
.join(Role)
|
||||
.switch(AccessToken)
|
||||
.join(Repository)
|
||||
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
||||
.where(AccessToken.code == code)
|
||||
.get())
|
||||
|
||||
except AccessToken.DoesNotExist:
|
||||
raise InvalidTokenException('Invalid delegate token code: %s' % code)
|
Reference in a new issue