First stab at token auth. The UI could use a little bit of polishing.

This commit is contained in:
yackob03 2013-10-16 14:24:10 -04:00
parent f1746417b1
commit 283f9b81ae
9 changed files with 360 additions and 91 deletions

View file

@ -26,6 +26,10 @@ class InvalidPasswordException(DataModelException):
pass
class InvalidTokenException(DataModelException):
pass
def create_user(username, password, email):
if not validate_email(email):
raise InvalidEmailAddressException('Invalid email address: %s' % email)
@ -159,25 +163,6 @@ def verify_user(username, password):
return None
def create_access_token(user, repository):
new_token = AccessToken.create(user=user, repository=repository)
return new_token
def verify_token(code, namespace_name, repository_name):
joined = AccessToken.select(AccessToken, Repository).join(Repository)
tokens = list(joined.where(AccessToken.code == code,
Repository.namespace == namespace_name,
Repository.name == repository_name))
if tokens:
return tokens[0]
return None
def get_token(code):
return AccessToken.get(AccessToken.code == code)
def get_visible_repositories(username=None, include_public=True, limit=None,
sort=False):
if not username and not include_public:
@ -485,3 +470,69 @@ def get_private_repo_count(username):
joined = Repository.select().join(Visibility)
return joined.where(Repository.namespace == username,
Visibility.name == 'private').count()
def create_access_token(repository, role):
role = Role.get(Role.name == role)
new_token = AccessToken.create(repository=repository, temporary=True,
role=role)
return new_token
def create_delegate_token(namespace_name, repository_name, friendly_name):
read_only = Role.get(name='read')
repo = Repository.get(Repository.name == repository_name,
Repository.namespace == namespace_name)
new_token = AccessToken.create(repository=repo, role=read_only,
friendly_name=friendly_name, temporary=False)
return new_token
def get_repository_delegate_tokens(namespace_name, repository_name):
selected = AccessToken.select(AccessToken, Role)
with_repo = selected.join(Repository)
with_role = with_repo.switch(AccessToken).join(Role)
return with_role.where(Repository.name == repository_name,
Repository.namespace == namespace_name,
AccessToken.temporary == False)
def get_repo_delegate_token(namespace_name, repository_name, code):
repo_query = get_repository_delegate_tokens(namespace_name, repository_name)
found = list(repo_query.where(AccessToken.code == code))
if found:
return found[0]
else:
raise InvalidTokenException('Unable to find token with code: %s' % code)
def set_repo_delegate_token_role(namespace_name, repository_name, code, role):
token = get_repo_delegate_token(namespace_name, repository_name, code)
if role != 'read' and role != 'write':
raise DataModelException('Invalid role for delegate token: %s' % role)
new_role = Role.get(Role.name == role)
token.role = new_role
token.save()
return token
def delete_delegate_token(namespace_name, repository_name, code):
token = get_repo_delegate_token(namespace_name, repository_name, code)
token.delete_instance()
def load_token_data(code):
""" Load the permissions for any token by code. """
selected = AccessToken.select(AccessToken, Repository, Role)
with_role = selected.join(Role)
with_repo = with_role.switch(AccessToken).join(Repository)
fetched = list(with_repo.where(AccessToken.code == code))
if fetched:
return fetched[0]
else:
raise InvalidTokenException('Invalid delegate token code: %s' % code)