# This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
# quay/data/model.py
#
# 574 lines
# 18 KiB
# Python

import bcrypt
import logging
import dateutil.parser
import operator
from database import *
from util.validation import *
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class DataModelException(Exception):
    """ Base class for the errors raised by the data model layer. """
class InvalidEmailAddressException(DataModelException):
    """ An email address failed validation, is already used, or is unknown. """
class InvalidUsernameException(DataModelException):
    """ A username failed validation or has already been taken. """
class InvalidPasswordException(DataModelException):
    """ A password failed validation. """
class InvalidTokenException(DataModelException):
    """ No access token could be found for the supplied code. """
class InvalidRepositoryBuildException(DataModelException):
    """ No repository build could be found for the supplied id. """
def create_user(username, password, email):
    """ Validates the supplied values and creates a new User row.

    A password of None is accepted (federated login case); no password hash
    is stored for such a user.

    Raises InvalidEmailAddressException, InvalidUsernameException or
    InvalidPasswordException when a value fails validation or the username or
    email is already in use, and DataModelException when hashing the password
    or creating the row fails.
    """
    if not validate_email(email):
        raise InvalidEmailAddressException('Invalid email address: %s' % email)

    if not validate_username(username):
        raise InvalidUsernameException('Invalid username: %s' % username)

    # We allow password none for the federated login case.
    if password is not None and not validate_password(password):
        raise InvalidPasswordException(INVALID_PASSWORD_MESSAGE)

    try:
        clash = User.get((User.username == username) | (User.email == email))
    except User.DoesNotExist:
        # This is actually the happy path
        logger.debug('Email and username are unique!')
    else:
        logger.debug('Existing user with same username or email.')

        # A user already exists with either the same username or email
        if clash.username == username:
            raise InvalidUsernameException(
                'Username has already been taken: %s' % username)
        raise InvalidEmailAddressException(
            'Email has already been used: %s' % email)

    try:
        if password is None:
            pw_hash = None
        else:
            pw_hash = bcrypt.hashpw(password, bcrypt.gensalt())

        return User.create(username=username, password_hash=pw_hash,
                           email=email)
    except Exception as ex:
        # Any failure here is reported as a generic data model error.
        raise DataModelException(ex.message)
def create_federated_user(username, email, service_name, service_id):
    """ Creates a password-less, already verified user and links it to the
    login service named service_name under the identifier service_id.

    Returns the new user.
    """
    user = create_user(username, None, email)
    user.verified = True
    user.save()

    login_service = LoginService.get(LoginService.name == service_name)
    FederatedLogin.create(user=user, service=login_service,
                          service_ident=service_id)
    return user
def verify_federated_login(service_name, service_id):
    """ Returns the user linked to the given identifier on the named login
    service, or None when no such link exists.
    """
    query = (FederatedLogin
             .select(FederatedLogin, User)
             .join(LoginService)
             .switch(FederatedLogin)
             .join(User)
             .where(FederatedLogin.service_ident == service_id,
                    LoginService.name == service_name))

    matches = list(query)
    return matches[0].user if matches else None
def create_confirm_email_code(user):
    """ Creates and returns an email confirmation record for the user. """
    return EmailConfirmation.create(user=user, email_confirm=True)
def confirm_user_email(code):
    """ Marks the user owning the given email confirmation code as verified,
    deletes the confirmation record and returns the user.

    The lookup is not guarded here, so a failed EmailConfirmation.get
    propagates to the caller.
    """
    # `== True` builds a query expression; it must not be written as `is`.
    confirmation = EmailConfirmation.get(
        EmailConfirmation.code == code,
        EmailConfirmation.email_confirm == True)

    confirmed_user = confirmation.user
    confirmed_user.verified = True
    confirmed_user.save()

    confirmation.delete_instance()
    return confirmed_user
def create_reset_password_email_code(email):
    """ Creates and returns a password reset record for the user with the
    given email address.

    Raises InvalidEmailAddressException when no user has that address.
    """
    try:
        account = User.get(User.email == email)
    except User.DoesNotExist:
        raise InvalidEmailAddressException('Email address was not found.')

    return EmailConfirmation.create(user=account, pw_reset=True)
def validate_reset_code(code):
    """ Consumes a password reset code.

    Returns the user the code belongs to and deletes the code record, or
    returns None when no password reset record matches the code.
    """
    try:
        # `== True` builds a query expression; it must not be written as `is`.
        reset_record = EmailConfirmation.get(
            EmailConfirmation.code == code,
            EmailConfirmation.pw_reset == True)
    except EmailConfirmation.DoesNotExist:
        return None

    owner = reset_record.user
    reset_record.delete_instance()
    return owner
def get_user(username):
    """ Returns the user with the given username, or None if there is none. """
    try:
        found = User.get(User.username == username)
    except User.DoesNotExist:
        found = None
    return found
def get_matching_users(username_prefix):
    """ Returns a list of at most 10 users whose username matches the pattern
    username_prefix + '%' under peewee's `**` operator.
    """
    pattern = username_prefix + '%'
    matching = User.select().where(User.username ** pattern).limit(10)
    return list(matching)
def verify_user(username, password):
    """ Checks a username/password pair.

    Returns the matching user when the password hashes to the stored hash,
    otherwise None (unknown user, no stored hash, or wrong password).
    """
    try:
        candidate = User.get(User.username == username)
    except User.DoesNotExist:
        return None

    stored_hash = candidate.password_hash
    if not stored_hash:
        # Users without a stored hash can never authenticate by password.
        return None

    # Re-hash with the stored hash as the salt and compare the results.
    if bcrypt.hashpw(password, stored_hash) == stored_hash:
        return candidate

    # We weren't able to authorize the user
    return None
def get_visible_repositories(username=None, include_public=True, limit=None,
                             sort=False):
    """ Builds a query for the repositories visible to a user.

    A repository is selected when it is public (if include_public is set)
    or when the named user has a permission row on it (if username is given).
    With sort set, results are ordered by image creation date, newest first;
    with limit set, the query is limited to that many rows.

    Returns an empty list when neither a username nor public repositories
    were requested, otherwise the (unevaluated) query.
    """
    if not (username or include_public):
        return []

    visibility_filters = []
    repos = Repository.select().distinct().join(Visibility)

    if include_public:
        visibility_filters.append(Visibility.name == 'public')

    if username:
        repos = (repos
                 .switch(Repository)
                 .join(RepositoryPermission, JOIN_LEFT_OUTER)
                 .join(User))
        visibility_filters.append(User.username == username)

    if sort:
        repos = (repos
                 .switch(Repository)
                 .join(Image, JOIN_LEFT_OUTER)
                 .order_by(Image.created.desc()))

    if visibility_filters:
        repos = repos.where(reduce(operator.or_, visibility_filters))

    if limit:
        repos = repos.limit(limit)

    return repos
def get_matching_repositories(repo_term, username=None):
    """ Returns a list of at most 10 repositories visible to the user that
    match the search term.

    A plain term matches when it occurs in either the repository name or the
    namespace. A term of the form 'namespace/name' must match both parts.
    """
    visible = get_visible_repositories(username)

    # Handle the case where the user has already entered a namespace path.
    if repo_term.find('/') > 0:
        namespace_term, name_term = repo_term.split('/', 1)
        search_clauses = (
            (Repository.name ** ('%' + name_term + '%')) &
            (Repository.namespace ** ('%' + namespace_term + '%')))
    else:
        search_clauses = (
            (Repository.name ** ('%' + repo_term + '%')) |
            (Repository.namespace ** ('%' + repo_term + '%')))

    return list(visible.where(search_clauses).limit(10))
def change_password(user, new_password):
    """ Stores a freshly salted bcrypt hash of new_password on the user.

    Raises InvalidPasswordException when the password fails validation.
    """
    if not validate_password(new_password):
        raise InvalidPasswordException(INVALID_PASSWORD_MESSAGE)

    user.password_hash = bcrypt.hashpw(new_password, bcrypt.gensalt())
    user.save()
def update_email(user, new_email):
    """ Sets the user's email address and marks the user as unverified.

    The new address is not validated here.
    """
    user.verified = False
    user.email = new_email
    user.save()
def get_all_user_permissions(user):
    """ Returns a query over the user's repository permissions, selecting the
    user, repository, permission and role columns.
    """
    query = (User
             .select(User, Repository, RepositoryPermission, Role)
             .join(RepositoryPermission)
             .join(Repository)
             .switch(RepositoryPermission)
             .join(Role))
    return query.where(User.username == user.username)
def get_all_repo_users(namespace_name, repository_name):
    """ Returns a query over the permissions on the given repository,
    selecting the username, role name and permission columns.
    """
    query = (RepositoryPermission
             .select(User.username, Role.name, RepositoryPermission)
             .join(User)
             .switch(RepositoryPermission)
             .join(Role)
             .switch(RepositoryPermission)
             .join(Repository))
    return query.where(Repository.namespace == namespace_name,
                       Repository.name == repository_name)
def get_repository(namespace_name, repository_name):
    """ Returns the repository with the given namespace and name, or None. """
    try:
        found = Repository.get(Repository.name == repository_name,
                               Repository.namespace == namespace_name)
    except Repository.DoesNotExist:
        found = None
    return found
def get_repo_image(namespace_name, repository_name, image_id):
    """ Returns the image with the given docker image id in the given
    repository, or None when there is no such image.
    """
    query = (Image
             .select()
             .join(Repository)
             .where(Repository.name == repository_name,
                    Repository.namespace == namespace_name,
                    Image.docker_image_id == image_id)
             .limit(1))

    matches = list(query)
    return matches[0] if matches else None
def repository_is_public(namespace_name, repository_name):
    """ Returns True when the given repository exists and its visibility is
    'public', False otherwise.
    """
    public_matches = (Repository
                      .select()
                      .join(Visibility)
                      .where(Repository.namespace == namespace_name,
                             Repository.name == repository_name,
                             Visibility.name == 'public'))
    return bool(list(public_matches))
def set_repository_visibility(repo, visibility):
    """ Sets the repository's visibility to the Visibility row with the given
    name and saves it. Does nothing when the lookup yields a falsy result.
    """
    # NOTE(review): Visibility.get may raise rather than return a falsy
    # value for an unknown name, making this guard dead code - confirm.
    target = Visibility.get(name=visibility)
    if target:
        repo.visibility = target
        repo.save()
def create_repository(namespace, name, owner, visibility='private'):
    """ Creates a repository with the named visibility and grants the owner
    the 'admin' role on it. Returns the new repository.
    """
    visibility_obj = Visibility.get(name=visibility)
    new_repo = Repository.create(namespace=namespace, name=name,
                                 visibility=visibility_obj)

    admin_role = Role.get(name='admin')
    RepositoryPermission.create(user=owner, repository=new_repo,
                                role=admin_role)
    return new_repo
def create_image(docker_image_id, repository):
    """ Creates and returns an image row for the docker image id in the
    given repository.
    """
    return Image.create(docker_image_id=docker_image_id,
                        repository=repository)
def set_image_checksum(docker_image_id, repository, checksum):
    """ Stores the checksum on the repository's image with the given docker
    image id and returns the updated image.
    """
    image = Image.get(Image.docker_image_id == docker_image_id,
                      Image.repository == repository)
    image.checksum = checksum
    image.save()
    return image
def set_image_metadata(docker_image_id, namespace_name, repository_name,
                       created_date_str, comment, parent=None):
    """ Stores the creation date, comment and (optionally) ancestry of an
    image and returns the updated image.

    created_date_str is parsed with dateutil. When a parent image is given,
    the ancestors path becomes the parent's path followed by the parent's
    database id and a trailing slash.

    Raises DataModelException when the repository has no such image.
    """
    matches = list(Image
                   .select()
                   .join(Repository)
                   .where(Repository.name == repository_name,
                          Repository.namespace == namespace_name,
                          Image.docker_image_id == docker_image_id))
    if not matches:
        raise DataModelException('No image with specified id and repository')

    image = matches[0]
    image.created = dateutil.parser.parse(created_date_str)
    image.comment = comment
    if parent:
        image.ancestors = '%s%s/' % (parent.ancestors, parent.id)

    image.save()
    return image
def get_repository_images(namespace_name, repository_name):
    """ Returns a query over all images in the given repository. """
    images = Image.select().join(Repository)
    in_repo = images.where(Repository.name == repository_name,
                           Repository.namespace == namespace_name)
    return in_repo
def list_repository_tags(namespace_name, repository_name):
    """ Returns a query over the tags of the given repository, selecting
    each tag together with its image.
    """
    query = (RepositoryTag
             .select(RepositoryTag, Image)
             .join(Repository)
             .switch(RepositoryTag)
             .join(Image))
    return query.where(Repository.name == repository_name,
                       Repository.namespace == namespace_name)
def get_tag_image(namespace_name, repository_name, tag_name):
    """ Returns the image the named tag points to in the given repository.

    Raises DataModelException when no such image is found.
    """
    query = (Image
             .select()
             .join(RepositoryTag)
             .join(Repository)
             .where(Repository.name == repository_name,
                    Repository.namespace == namespace_name,
                    RepositoryTag.name == tag_name))

    matches = list(query)
    if matches:
        return matches[0]
    raise DataModelException('Unable to find image for tag.')
def get_image_by_id(namespace_name, repository_name, docker_image_id):
    """ Returns the image with the given docker image id in the given
    repository.

    Raises DataModelException when no such image is found.
    """
    query = (Image
             .select()
             .join(Repository)
             .where(Repository.name == repository_name,
                    Repository.namespace == namespace_name,
                    Image.docker_image_id == docker_image_id))

    matches = list(query)
    if matches:
        return matches[0]
    raise DataModelException('Unable to find image for tag with repo.')
def get_parent_images(image_obj):
    """ Returns a list of parent Image objects in chronological order.

    The order is that of the database ids in image_obj.ancestors, a
    slash-separated path. An empty path yields an empty list.
    """
    ancestor_ids = image_obj.ancestors.strip('/').split('/')
    if ancestor_ids == ['']:
        return []

    id_filter = reduce(operator.or_,
                       [(Image.id == db_id) for db_id in ancestor_ids])

    # Index the fetched rows by their id as text, since the ids parsed out
    # of the ancestors path are strings.
    images_by_id = {}
    for image in Image.select().where(id_filter):
        images_by_id[unicode(image.id)] = image

    return [images_by_id[db_id] for db_id in ancestor_ids]
def create_or_update_tag(namespace_name, repository_name, tag_name,
                         tag_docker_image_id):
    """ Points the named tag of the given repository at the image with the
    given docker image id, creating the tag if it does not exist yet.

    Returns the tag. The repository and image lookups are not guarded, so
    their DoesNotExist errors propagate to the caller.
    """
    repo = Repository.get(Repository.name == repository_name,
                          Repository.namespace == namespace_name)

    # Image rows are created per repository, so the lookup has to be scoped
    # to this repository. Otherwise the tag could be bound to another
    # repository's image that happens to share the docker image id.
    image = Image.get(Image.docker_image_id == tag_docker_image_id,
                      Image.repository == repo)

    try:
        tag = RepositoryTag.get(RepositoryTag.repository == repo,
                                RepositoryTag.name == tag_name)
    except RepositoryTag.DoesNotExist:
        return RepositoryTag.create(repository=repo, image=image,
                                    name=tag_name)

    tag.image = image
    tag.save()
    return tag
def delete_tag(namespace_name, repository_name, tag_name):
    """ Deletes the named tag from the given repository.

    The lookups are not guarded, so a missing repository or tag propagates
    as the corresponding DoesNotExist error.
    """
    owning_repo = Repository.get(Repository.name == repository_name,
                                 Repository.namespace == namespace_name)
    doomed_tag = RepositoryTag.get(RepositoryTag.repository == owning_repo,
                                   RepositoryTag.name == tag_name)
    doomed_tag.delete_instance()
def delete_all_repository_tags(namespace_name, repository_name):
    """ Deletes every tag of the given repository.

    The repository lookup is not guarded, so a missing repository propagates
    as Repository.DoesNotExist.
    """
    repo = Repository.get(Repository.name == repository_name,
                          Repository.namespace == namespace_name)

    # A peewee delete query is lazy: without execute() it is only built and
    # no rows are removed.
    RepositoryTag.delete().where(RepositoryTag.repository == repo).execute()
def get_user_repo_permissions(user, repository):
    """ Returns a query over the user's permission rows on the repository. """
    return RepositoryPermission.select().where(
        RepositoryPermission.user == user,
        RepositoryPermission.repository == repository)
def user_permission_repo_query(username, namespace_name, repository_name):
    """ Returns a query over the named user's permissions on the given
    repository, selecting user, repository, role and permission columns.
    """
    query = (RepositoryPermission
             .select(User, Repository, Role, RepositoryPermission)
             .join(User)
             .switch(RepositoryPermission)
             .join(Role)
             .switch(RepositoryPermission)
             .join(Repository))
    return query.where(Repository.name == repository_name,
                       Repository.namespace == namespace_name,
                       User.username == username)
def get_user_reponame_permission(username, namespace_name, repository_name):
    """ Returns the named user's permission on the given repository.

    Raises DataModelException when the user has no permission on it.
    """
    perms = list(user_permission_repo_query(username, namespace_name,
                                            repository_name))
    if perms:
        return perms[0]
    raise DataModelException('User does not have permission for repo.')
def set_user_repo_permission(username, namespace_name, repository_name,
                             role_name):
    """ Gives the named user the named role on the given repository, updating
    an existing permission or creating a new one. Returns the permission.

    Raises DataModelException when the username equals the namespace name.
    """
    if username == namespace_name:
        raise DataModelException('Namespace owner must always be admin.')

    target_user = User.get(User.username == username)
    target_repo = Repository.get(Repository.name == repository_name,
                                 Repository.namespace == namespace_name)
    role = Role.get(Role.name == role_name)

    # Fetch any existing permission for this user on the repo
    try:
        existing = RepositoryPermission.get(
            RepositoryPermission.user == target_user,
            RepositoryPermission.repository == target_repo)
    except RepositoryPermission.DoesNotExist:
        return RepositoryPermission.create(repository=target_repo,
                                           user=target_user, role=role)

    existing.role = role
    existing.save()
    return existing
def delete_user_permission(username, namespace_name, repository_name):
    """ Removes the named user's permission on the given repository.

    Raises DataModelException when the username equals the namespace name or
    when the user has no permission on the repository.
    """
    if username == namespace_name:
        raise DataModelException('Namespace owner must always be admin.')

    perms = list(user_permission_repo_query(username, namespace_name,
                                            repository_name))
    if not perms:
        raise DataModelException('User does not have permission for repo.')

    perms[0].delete_instance()
def purge_repository(namespace_name, repository_name):
    """ Deletes the given repository with a recursive delete that also
    removes rows referencing it through nullable foreign keys.
    """
    doomed = Repository.get(Repository.name == repository_name,
                            Repository.namespace == namespace_name)
    doomed.delete_instance(recursive=True, delete_nullable=True)
def get_private_repo_count(username):
    """ Returns the number of 'private' repositories whose namespace equals
    the username.
    """
    private_repos = (Repository
                     .select()
                     .join(Visibility)
                     .where(Repository.namespace == username,
                            Visibility.name == 'private'))
    return private_repos.count()
def create_access_token(repository, role):
    """ Creates and returns a temporary access token for the repository with
    the role of the given name.
    """
    role_obj = Role.get(Role.name == role)
    return AccessToken.create(repository=repository, temporary=True,
                              role=role_obj)
def create_delegate_token(namespace_name, repository_name, friendly_name):
    """ Creates and returns a non-temporary access token with the 'read'
    role and the given friendly name for the given repository.
    """
    read_role = Role.get(name='read')
    target_repo = Repository.get(Repository.name == repository_name,
                                 Repository.namespace == namespace_name)
    return AccessToken.create(repository=target_repo, role=read_role,
                              friendly_name=friendly_name, temporary=False)
def get_repository_delegate_tokens(namespace_name, repository_name):
    """ Returns a query over the non-temporary access tokens of the given
    repository, selecting each token together with its role.
    """
    query = (AccessToken
             .select(AccessToken, Role)
             .join(Repository)
             .switch(AccessToken)
             .join(Role))
    # `== False` builds a query expression; it must not be written as `is`.
    return query.where(Repository.name == repository_name,
                       Repository.namespace == namespace_name,
                       AccessToken.temporary == False)
def get_repo_delegate_token(namespace_name, repository_name, code):
    """ Returns the given repository's delegate token with the given code.

    Raises InvalidTokenException when there is no such token.
    """
    tokens = get_repository_delegate_tokens(namespace_name, repository_name)
    matches = list(tokens.where(AccessToken.code == code))
    if not matches:
        raise InvalidTokenException(
            'Unable to find token with code: %s' % code)
    return matches[0]
def set_repo_delegate_token_role(namespace_name, repository_name, code, role):
    """ Changes the role of a delegate token to 'read' or 'write' and returns
    the updated token.

    Raises InvalidTokenException when the token does not exist (checked
    first) and DataModelException for any other role name.
    """
    token = get_repo_delegate_token(namespace_name, repository_name, code)

    if role not in ('read', 'write'):
        raise DataModelException('Invalid role for delegate token: %s' % role)

    token.role = Role.get(Role.name == role)
    token.save()
    return token
def delete_delegate_token(namespace_name, repository_name, code):
    """ Deletes the given repository's delegate token with the given code.

    Raises InvalidTokenException when there is no such token.
    """
    get_repo_delegate_token(namespace_name, repository_name,
                            code).delete_instance()
def load_token_data(code):
    """ Load the permissions for any token by code.

    Returns the token with its repository and role selected; raises
    InvalidTokenException when no token has the given code.
    """
    query = (AccessToken
             .select(AccessToken, Repository, Role)
             .join(Role)
             .switch(AccessToken)
             .join(Repository)
             .where(AccessToken.code == code))

    matches = list(query)
    if not matches:
        raise InvalidTokenException(
            'Invalid delegate token code: %s' % code)
    return matches[0]
def get_repository_build(request_dbid):
    """ Returns the repository build with the given database id.

    Raises InvalidRepositoryBuildException when there is no such build.
    """
    try:
        # Compare against the primary key field. Comparing the model class
        # itself to the id yields a plain Python bool, not a query
        # expression.
        # NOTE(review): assumes RepositoryBuild uses the default `id`
        # primary key, as Image does elsewhere in this file - confirm.
        return RepositoryBuild.get(RepositoryBuild.id == request_dbid)
    except RepositoryBuild.DoesNotExist:
        msg = 'Unable to locate a build by id: %s' % request_dbid
        raise InvalidRepositoryBuildException(msg)
def list_repository_builds(namespace_name, repository_name):
    """ Returns a list of all builds of the given repository. """
    builds = (RepositoryBuild
              .select()
              .join(Repository)
              .where(Repository.name == repository_name,
                     Repository.namespace == namespace_name))
    return list(builds)
def create_repository_build(repo, access_token, resource_key, tag):
    """ Creates and returns a build record for the repository. """
    new_build = RepositoryBuild.create(repository=repo,
                                       access_token=access_token,
                                       resource_key=resource_key, tag=tag)
    return new_build