284 lines
9.7 KiB
Python
284 lines
9.7 KiB
Python
import bcrypt
|
|
import logging
|
|
import dateutil.parser
|
|
|
|
from database import *
|
|
from util.validation import (validate_email, validate_username,
|
|
validate_password)
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DataModelException(Exception):
    """Error raised by the functions in this module for data-model failures."""
|
|
|
|
|
|
def create_user(username, password, email):
    """Validate the supplied fields and create a new ``User`` row.

    The e-mail address, username and password are checked, in that order,
    with the helpers imported from ``util.validation``. The password is
    only hashed once all three checks have passed.

    Returns the object returned by ``User.create``.

    Raises ``DataModelException`` if any field fails validation or if
    ``User.create`` raises.
    """
    if not validate_email(email):
        raise DataModelException('Invalid email address: %s' % email)
    if not validate_username(username):
        raise DataModelException('Invalid username: %s' % username)
    if not validate_password(password):
        raise DataModelException('Invalid password, password must be at least ' +
                                 '8 characters and contain no whitespace.')

    # Hash after validation so rejected input never pays for a bcrypt round
    # and bad input surfaces as a DataModelException rather than a bcrypt
    # error.
    pw_hash = bcrypt.hashpw(password, bcrypt.gensalt())

    try:
        return User.create(username=username, password_hash=pw_hash,
                           email=email)
    except Exception as ex:
        # str(ex) instead of ex.message: the latter does not exist on
        # Python 3 and is empty for multi-argument exceptions on Python 2.
        raise DataModelException(str(ex))
|
|
|
|
|
|
def get_user(username):
    """Look up a ``User`` by exact username.

    Returns the matching user, or ``None`` when ``User.DoesNotExist`` is
    raised by the lookup.
    """
    try:
        found = User.get(User.username == username)
    except User.DoesNotExist:
        found = None
    return found
|
|
|
|
|
|
def get_matching_users(username_prefix):
    """Return a list of at most 10 users whose username matches the prefix.

    The prefix is turned into the pattern ``<prefix>%`` and applied to
    ``User.username`` with the ORM's ``**`` operator.
    """
    pattern = username_prefix + '%'
    query = User.select().where(User.username ** pattern).limit(10)
    return list(query)
|
|
|
|
def verify_user(username, password):
    """Check a username/password pair against the stored bcrypt hash.

    Returns the ``User`` when the password matches, otherwise ``None``
    (both for an unknown username and for a wrong password).
    """
    try:
        candidate = User.get(User.username == username)
    except User.DoesNotExist:
        return None

    # The stored hash is passed as the salt argument; the result equals the
    # stored hash only when the password is the one originally hashed.
    if bcrypt.hashpw(password, candidate.password_hash) != candidate.password_hash:
        # We weren't able to authorize the user
        return None

    return candidate
|
|
|
|
|
|
def create_access_token(user, repository):
    """Create and return an ``AccessToken`` for ``user`` on ``repository``."""
    return AccessToken.create(user=user, repository=repository)
|
|
|
|
|
|
def verify_token(code, namespace_name, repository_name):
    """Find the access token with ``code`` that belongs to the named repo.

    Returns the first matching ``AccessToken`` (selected together with its
    ``Repository``), or ``None`` when nothing matches.
    """
    query = (AccessToken
             .select(AccessToken, Repository)
             .join(Repository)
             .where(AccessToken.code == code,
                    Repository.namespace == namespace_name,
                    Repository.name == repository_name))
    matches = list(query)
    return matches[0] if matches else None
|
|
|
|
|
|
def get_token(code):
    """Return the ``AccessToken`` whose code equals ``code``.

    There is no exception handling here, so whatever ``AccessToken.get``
    raises when no row matches propagates to the caller.
    """
    code_matches = AccessToken.code == code
    return AccessToken.get(code_matches)
|
|
|
|
|
|
def get_matching_repositories(repo_term):
    """Return a list of at most 10 repositories that mention ``repo_term``.

    The term is wrapped as ``%<term>%`` and matched with the ORM's ``**``
    operator against the repository name, namespace and description; a
    match on any one of the three is enough.
    """
    pattern = '%' + repo_term + '%'
    matches_any = (Repository.name ** pattern |
                   Repository.namespace ** pattern |
                   Repository.description ** pattern)
    return list(Repository.select().where(matches_any).limit(10))
|
|
|
|
|
|
def change_password(user, new_password):
    """Replace ``user``'s password hash with a bcrypt hash of ``new_password``.

    The new password is checked with ``validate_password`` first, the same
    check ``create_user`` applies, so the policy cannot be bypassed by
    changing the password after sign-up.

    Raises ``DataModelException`` if the new password fails validation.
    """
    if not validate_password(new_password):
        raise DataModelException('Invalid password, password must be at least ' +
                                 '8 characters and contain no whitespace.')

    user.password_hash = bcrypt.hashpw(new_password, bcrypt.gensalt())
    user.save()
|
|
|
|
|
|
def update_email(user, new_email):
    """Set ``user``'s e-mail address and mark the account as unverified.

    The address is checked with ``validate_email`` first, the same check
    ``create_user`` applies.

    Raises ``DataModelException`` if the address fails validation.
    """
    if not validate_email(new_email):
        raise DataModelException('Invalid email address: %s' % new_email)

    user.email = new_email
    # A changed address has not been confirmed yet.
    user.verified = False
    user.save()
|
|
|
|
|
|
def get_all_user_permissions(user):
    """Build the query for every repository permission held by ``user``.

    The query starts from ``User``, joins ``RepositoryPermission`` and then
    ``Repository`` and ``Role``, and filters on ``user.username``. The
    query object is returned as built; this function does not iterate it.
    """
    return (User
            .select(User, Repository, RepositoryPermission, Role)
            .join(RepositoryPermission)
            .join(Repository)
            .switch(RepositoryPermission)
            .join(Role)
            .where(User.username == user.username))
|
|
|
|
|
|
def get_all_repo_users(namespace_name, repository_name):
    """Build the query for all permissions on the named repository.

    Selects ``User.username``, ``Role.name`` and the ``RepositoryPermission``
    columns, joining ``User``, ``Role`` and ``Repository`` from
    ``RepositoryPermission``. The query object is returned as built; this
    function does not iterate it.
    """
    return (RepositoryPermission
            .select(User.username, Role.name, RepositoryPermission)
            .join(User)
            .switch(RepositoryPermission)
            .join(Role)
            .switch(RepositoryPermission)
            .join(Repository)
            .where(Repository.namespace == namespace_name,
                   Repository.name == repository_name))
|
|
|
|
|
|
def get_repository(namespace, name):
    """Look up a ``Repository`` by namespace and name.

    Returns the repository, or ``None`` when ``Repository.DoesNotExist`` is
    raised by the lookup.
    """
    try:
        repo = Repository.get(Repository.name == name,
                              Repository.namespace == namespace)
    except Repository.DoesNotExist:
        repo = None
    return repo
|
|
|
|
|
|
def get_user_repositories(user):
    """Build the query for the repository permissions belonging to ``user``.

    Selects ``RepositoryPermission``, ``Repository`` and ``Role`` columns,
    filtered on ``user.username``. The query object is returned as built;
    this function does not iterate it.
    """
    return (RepositoryPermission
            .select(RepositoryPermission, Repository, Role)
            .join(User)
            .where(User.username == user.username)
            .switch(RepositoryPermission)
            .join(Role)
            .switch(RepositoryPermission)
            .join(Repository))
|
|
|
|
|
|
def create_repository(namespace, name, owner):
    """Create a repository and give ``owner`` the ``admin`` role on it.

    The repository is created with the ``Visibility`` row named
    ``'private'``; a ``RepositoryPermission`` linking ``owner``, the new
    repository and the ``Role`` named ``'admin'`` is created afterwards.

    Returns the new ``Repository``.

    NOTE(review): the two inserts are not wrapped in a transaction in this
    function -- confirm whether the caller provides one.
    """
    private = Visibility.get(name='private')
    repo = Repository.create(namespace=namespace, name=name,
                             visibility=private)

    admin = Role.get(name='admin')
    # Only the side effect matters; the created permission row is not used.
    RepositoryPermission.create(user=owner, repository=repo, role=admin)
    return repo
|
|
|
|
|
|
def create_image(image_id, repository):
    """Create and return an ``Image`` row with ``image_id`` in ``repository``."""
    return Image.create(image_id=image_id, repository=repository)
|
|
|
|
|
|
def set_image_checksum(image_id, repository, checksum):
    """Store ``checksum`` on the image with ``image_id`` in ``repository``.

    Returns the updated ``Image``. The lookup is not guarded, so whatever
    ``Image.get`` raises when no row matches propagates to the caller.
    """
    image = Image.get(Image.image_id == image_id,
                      Image.repository == repository)
    image.checksum = checksum
    image.save()
    return image
|
|
|
|
|
|
def set_image_metadata(image_id, namespace_name, repository_name,
                       created_date_str, comment):
    """Set the creation date and comment on an image in the named repository.

    ``created_date_str`` is parsed with ``dateutil.parser.parse`` and stored
    in ``created``; ``comment`` is stored as given.

    Returns the updated ``Image`` (the first match of the lookup).

    Raises ``DataModelException`` when no image with ``image_id`` exists in
    the repository.
    """
    query = Image.select().join(Repository).where(
        Repository.name == repository_name,
        Repository.namespace == namespace_name,
        Image.image_id == image_id)
    matches = list(query)
    if not matches:
        raise DataModelException('No image with specified id and repository')

    image = matches[0]
    image.created = dateutil.parser.parse(created_date_str)
    image.comment = comment
    image.save()
    return image
|
|
|
|
|
|
def get_repository_images(namespace_name, repository_name):
    """Build the query for all images in the named repository.

    The query object is returned as built; this function does not iterate
    it.
    """
    in_repo = (Repository.name == repository_name,
               Repository.namespace == namespace_name)
    return Image.select().join(Repository).where(*in_repo)
|
|
|
|
def get_tag_images(namespace_name, repository_name, tag_name):
    """Return a list of the images joined to tag ``tag_name`` in the named repo.

    ``Image`` is joined to ``RepositoryTag`` and on to ``Repository``; rows
    are filtered by repository name, namespace and tag name.
    """
    query = (Image
             .select()
             .join(RepositoryTag)
             .join(Repository)
             .where(Repository.name == repository_name,
                    Repository.namespace == namespace_name,
                    RepositoryTag.name == tag_name))
    return list(query)
|
|
|
|
def list_repository_tags(namespace_name, repository_name):
    """Build the query for all tags (with their images) in the named repo.

    Selects ``RepositoryTag`` and ``Image`` columns. The query object is
    returned as built; this function does not iterate it.
    """
    return (RepositoryTag
            .select(RepositoryTag, Image)
            .join(Repository)
            .switch(RepositoryTag)
            .join(Image)
            .where(Repository.name == repository_name,
                   Repository.namespace == namespace_name))
|
|
|
|
|
|
def get_tag_image(namespace_name, repository_name, tag_name):
    """Return the first image found by ``get_tag_images`` for the tag.

    Raises ``DataModelException`` when ``get_tag_images`` returns nothing.
    """
    images = get_tag_images(namespace_name, repository_name, tag_name)
    if images:
        return images[0]

    raise DataModelException('Unable to find image for tag.')
|
|
|
|
|
|
def create_or_update_tag(namespace_name, repository_name, tag_name,
                         tag_image_id):
    """Point tag ``tag_name`` in the named repository at ``tag_image_id``.

    If the tag already exists in the repository its image is replaced and
    the tag saved; otherwise a new ``RepositoryTag`` is created.

    Returns the updated or newly created ``RepositoryTag``.

    The repository and image lookups are not guarded, so whatever
    ``Repository.get`` / ``Image.get`` raise when no row matches propagates
    to the caller.
    """
    repo = Repository.get(Repository.name == repository_name,
                          Repository.namespace == namespace_name)
    # Scope the image lookup to this repository: images are created per
    # repository (see create_image / set_image_checksum), so matching on
    # image_id alone could pick up another repository's image.
    image = Image.get(Image.image_id == tag_image_id,
                      Image.repository == repo)

    # Keep the try body to the lookup that is expected to miss.
    try:
        tag = RepositoryTag.get(RepositoryTag.repository == repo,
                                RepositoryTag.name == tag_name)
    except RepositoryTag.DoesNotExist:
        return RepositoryTag.create(repository=repo, image=image,
                                    name=tag_name)

    tag.image = image
    tag.save()
    return tag
|
|
|
|
|
|
def delete_tag(namespace_name, repository_name, tag_name):
    """Delete the tag ``tag_name`` from the named repository.

    Neither lookup is guarded, so whatever ``Repository.get`` /
    ``RepositoryTag.get`` raise when no row matches propagates to the
    caller.
    """
    owning_repo = Repository.get(Repository.name == repository_name,
                                 Repository.namespace == namespace_name)
    RepositoryTag.get(RepositoryTag.repository == owning_repo,
                      RepositoryTag.name == tag_name).delete_instance()
|
|
|
|
|
|
def delete_all_repository_tags(namespace_name, repository_name):
    """Delete every tag that belongs to the named repository.

    The repository lookup is not guarded, so whatever ``Repository.get``
    raises when no row matches propagates to the caller.
    """
    repo = Repository.get(Repository.name == repository_name,
                          Repository.namespace == namespace_name)
    # Model.delete() only builds the query; without execute() nothing is
    # sent to the database and no tags are removed.
    RepositoryTag.delete().where(RepositoryTag.repository == repo).execute()
|
|
|
|
|
|
def get_user_repo_permissions(user, repository):
    """Build the query for ``user``'s permissions on ``repository``.

    The query object is returned as built; this function does not iterate
    it.
    """
    return RepositoryPermission.select().where(
        RepositoryPermission.user == user,
        RepositoryPermission.repository == repository)
|
|
|
|
|
|
def user_permission_repo_query(username, namespace_name, repository_name):
    """Build the query for one user's permission on one named repository.

    Selects ``User``, ``Repository``, ``Role`` and ``RepositoryPermission``
    columns, joining all three tables from ``RepositoryPermission``. The
    query object is returned as built; this function does not iterate it.
    """
    return (RepositoryPermission
            .select(User, Repository, Role, RepositoryPermission)
            .join(User)
            .switch(RepositoryPermission)
            .join(Role)
            .switch(RepositoryPermission)
            .join(Repository)
            .where(Repository.name == repository_name,
                   Repository.namespace == namespace_name,
                   User.username == username))
|
|
|
|
|
|
def get_user_reponame_permission(username, namespace_name, repository_name):
    """Return the user's permission row for the named repository.

    Runs ``user_permission_repo_query`` and returns its first result.

    Raises ``DataModelException`` when the query yields no rows.
    """
    query = user_permission_repo_query(username, namespace_name,
                                       repository_name)
    permissions = list(query)
    if permissions:
        return permissions[0]

    raise DataModelException('User does not have permission for repo.')
|
|
|
|
|
|
def set_user_repo_permission(username, namespace_name, repository_name,
                             role_name):
    """Give ``username`` the role ``role_name`` on the named repository.

    An existing permission for the user on the repository has its role
    replaced; otherwise a new ``RepositoryPermission`` is created.

    Returns the updated or newly created ``RepositoryPermission``.

    Raises ``DataModelException`` when ``username`` equals
    ``namespace_name``. The user, repository and role lookups are not
    guarded, so whatever they raise on a miss propagates to the caller.
    """
    if username == namespace_name:
        raise DataModelException('Namespace owner must always be admin.')

    target_user = User.get(User.username == username)
    target_repo = Repository.get(Repository.name == repository_name,
                                 Repository.namespace == namespace_name)
    role = Role.get(Role.name == role_name)

    # Update the existing permission if there is one, otherwise create it.
    try:
        existing = RepositoryPermission.get(
            RepositoryPermission.user == target_user,
            RepositoryPermission.repository == target_repo)
        existing.role = role
        existing.save()
    except RepositoryPermission.DoesNotExist:
        return RepositoryPermission.create(repository=target_repo,
                                           user=target_user, role=role)
    else:
        return existing
|
|
|
|
def delete_user_permission(username, namespace_name, repository_name):
    """Remove ``username``'s permission on the named repository.

    Deletes the first row returned by ``user_permission_repo_query``.

    Raises ``DataModelException`` when ``username`` equals
    ``namespace_name`` or when the user has no permission row for the
    repository.
    """
    if username == namespace_name:
        raise DataModelException('Namespace owner must always be admin.')

    query = user_permission_repo_query(username, namespace_name,
                                       repository_name)
    permissions = list(query)
    if permissions:
        permissions[0].delete_instance()
        return

    raise DataModelException('User does not have permission for repo.')
|