This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/model.py

170 lines
5.3 KiB
Python

import bcrypt
import logging
from database import *
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def create_user(username, password, email):
    """Create a User row with a bcrypt-hashed password and return it.

    A fresh salt is generated on every call; only the resulting hash is
    handed to ``User.create``.
    """
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password, salt)
    return User.create(username=username, password_hash=hashed, email=email)
def get_user(username):
    """Return the User whose username matches, or None if there is none."""
    try:
        found = User.get(User.username == username)
    except User.DoesNotExist:
        found = None
    return found
def verify_user(username, password):
    """Return the User if the password matches the stored hash, else None.

    None is returned both when no user has the given username and when the
    password does not hash to the stored value.
    """
    try:
        candidate = User.get(User.username == username)
    except User.DoesNotExist:
        return None

    # Re-hash the supplied password using the stored hash as the salt; a
    # correct password reproduces the stored hash.
    stored_hash = candidate.password_hash
    if bcrypt.hashpw(password, stored_hash) != stored_hash:
        # The credentials did not check out.
        return None
    return candidate
def verify_token(code, namespace_name, repository_name):
    """Return the AccessToken matching code and repository, or None.

    The token must have the given ``code`` and its joined Repository must
    have the given namespace and name. When several rows match, the first
    one is returned.

    The conditions are combined with ``&`` (each comparison parenthesised).
    Python's ``and`` cannot be overloaded by the ORM expressions: it
    evaluates to just one of its operands, so the other conditions would
    never reach the query.
    """
    joined = AccessToken.select(AccessToken, Repository).join(Repository)
    matching = joined.where((AccessToken.code == code) &
                            (Repository.namespace == namespace_name) &
                            (Repository.name == repository_name))
    tokens = list(matching)
    if tokens:
        return tokens[0]
    return None
def change_password(user, new_password):
    """Store a freshly salted bcrypt hash of new_password on user and save."""
    user.password_hash = bcrypt.hashpw(new_password, bcrypt.gensalt())
    user.save()
def update_email(user, new_email):
    """Set a new email on user, clear its verified flag, and save it."""
    # A changed address is marked unverified before it is persisted.
    user.verified = False
    user.email = new_email
    user.save()
def get_all_repo_permissions(user):
    """Return a query over User joined to RepositoryPermission and Repository.

    The query is restricted to rows whose username equals ``user.username``.
    """
    query = (User
             .select(User, Repository, RepositoryPermission)
             .join(RepositoryPermission)
             .join(Repository))
    return query.where(User.username == user.username)
def get_repository(namespace, name):
    """Return the Repository with the given namespace and name, or None.

    Both conditions are combined with ``&`` (each comparison parenthesised).
    Python's ``and`` cannot be overloaded by the ORM expressions and would
    evaluate to only one of the two conditions.
    """
    try:
        return Repository.get((Repository.name == name) &
                              (Repository.namespace == namespace))
    except Repository.DoesNotExist:
        return None
def get_user_repositories(user):
    """Return a RepositoryPermission query joined to User, Role and Repository.

    Rows are restricted to those whose joined User has ``user.username``.
    """
    return (RepositoryPermission
            .select(RepositoryPermission, Repository, Role)
            .join(User)
            .where(User.username == user.username)
            # Switch back so each further join starts at RepositoryPermission.
            .switch(RepositoryPermission)
            .join(Role)
            .switch(RepositoryPermission)
            .join(Repository))
def create_repository(namespace, name, owner):
    """Create a repository with 'private' visibility, owned by owner.

    A RepositoryPermission row giving ``owner`` the 'admin' role on the new
    repository is created as well. Returns the new Repository.
    """
    repo = Repository.create(namespace=namespace, name=name,
                             visibility=Visibility.get(name='private'))
    RepositoryPermission.create(user=owner, repository=repo,
                                role=Role.get(name='admin'))
    return repo
def create_image(image_id):
    """Create an Image row with the given image_id and return it."""
    return Image.create(image_id=image_id)
def set_image_checksum(image_id, checksum):
    """Set the checksum on the Image with image_id, save it, and return it.

    The lookup is not guarded here, so a failed ``Image.get`` propagates to
    the caller.
    """
    image = Image.get(Image.image_id == image_id)
    image.checksum = checksum
    image.save()
    return image
def assign_image_repository(repository, image, tag):
    """Create and return a RepositoryImage linking repository, image and tag."""
    return RepositoryImage.create(repository=repository, image=image, tag=tag)
def get_repository_images(namespace_name, repository_name):
    """Return a query of Images (with RepositoryImage) for one repository.

    The repository is identified by both its namespace and its name. The
    conditions are combined with ``&`` (each comparison parenthesised),
    because Python's ``and`` cannot be overloaded and would evaluate to only
    one of them.
    """
    select = Image.select(Image, RepositoryImage)
    joined = select.join(RepositoryImage).join(Repository)
    return joined.where((Repository.name == repository_name) &
                        (Repository.namespace == namespace_name))
def list_repository_tags(namespace_name, repository_name):
    """Return a query of RepositoryTags (with Image) for one repository.

    The repository is identified by both its namespace and its name. The
    conditions are combined with ``&`` (each comparison parenthesised),
    because Python's ``and`` cannot be overloaded and would evaluate to only
    one of them.
    """
    select = RepositoryTag.select(RepositoryTag, Image)
    with_repo = select.join(Repository)
    # Switch back so the Image join starts at RepositoryTag, not Repository.
    with_image = with_repo.switch(RepositoryTag).join(Image)
    return with_image.where((Repository.name == repository_name) &
                            (Repository.namespace == namespace_name))
def get_tag_image(namespace_name, repository_name, tag_name):
    """Return a query of Images for the named tag in one repository.

    All three conditions (repository name, repository namespace, tag name)
    are combined with ``&`` (each comparison parenthesised), because
    Python's ``and`` cannot be overloaded and would evaluate to only one of
    them.
    """
    joined = Image.select().join(RepositoryTag).join(Repository)
    return joined.where((Repository.name == repository_name) &
                        (Repository.namespace == namespace_name) &
                        (RepositoryTag.name == tag_name))
def create_or_update_tag(namespace_name, repository_name, tag_name,
                         tag_image_id):
    """Point a repository's tag at an image, creating the tag if needed.

    Looks up the repository by namespace and name, and the image by
    ``tag_image_id``. If the repository already has a tag named
    ``tag_name``, its image is replaced and the tag saved; otherwise a new
    RepositoryTag is created. Returns the tag.

    The repository and image lookups are not guarded here, so a failed
    ``get`` on either propagates to the caller.

    Conditions are combined with ``&`` (each comparison parenthesised),
    because Python's ``and`` cannot be overloaded and would evaluate to only
    one of them.
    """
    repo = Repository.get((Repository.name == repository_name) &
                          (Repository.namespace == namespace_name))
    image = Image.get(Image.image_id == tag_image_id)

    # Only the lookup sits inside the try, so DoesNotExist can only mean
    # "this tag is not there yet".
    try:
        tag = RepositoryTag.get((RepositoryTag.repository == repo) &
                                (RepositoryTag.name == tag_name))
    except RepositoryTag.DoesNotExist:
        tag = RepositoryTag.create(repository=repo, image=image,
                                   name=tag_name)
    else:
        tag.image = image
        tag.save()
    return tag
def delete_tag(namespace_name, repository_name, tag_name):
    """Delete the tag named tag_name from the given repository.

    The repository and tag lookups are not guarded here, so a failed
    ``get`` on either propagates to the caller.

    Conditions are combined with ``&`` (each comparison parenthesised),
    because Python's ``and`` cannot be overloaded and would evaluate to only
    one of them, which could select a tag from a different repository.
    """
    repo = Repository.get((Repository.name == repository_name) &
                          (Repository.namespace == namespace_name))
    tag = RepositoryTag.get((RepositoryTag.repository == repo) &
                            (RepositoryTag.name == tag_name))
    tag.delete_instance()
def delete_all_repository_tags(namespace_name, repository_name):
    """Delete every RepositoryTag belonging to the given repository.

    The repository lookup is not guarded here, so a failed ``get``
    propagates to the caller.

    The lookup conditions are combined with ``&`` (each comparison
    parenthesised), because Python's ``and`` cannot be overloaded and would
    evaluate to only one of them.
    """
    repo = Repository.get((Repository.name == repository_name) &
                          (Repository.namespace == namespace_name))
    # Building the delete query does not run it; execute() issues the DELETE.
    RepositoryTag.delete().where(RepositoryTag.repository == repo).execute()
def create_access_token(repository, user):
    """Create and return an AccessToken for user on repository."""
    return AccessToken.create(user=user, repository=repository)
def get_user_repo_permissions(user, repository):
    """Return a query of the RepositoryPermissions user holds on repository.

    Both conditions are combined with ``&`` (each comparison parenthesised).
    Python's ``and`` cannot be overloaded and would evaluate to only one of
    them, so the result would not be restricted to ``user``.
    """
    select = RepositoryPermission.select()
    return select.where((RepositoryPermission.user == user) &
                        (RepositoryPermission.repository == repository))