This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/model.py

91 lines
2.4 KiB
Python
Raw Normal View History

import bcrypt
from database import User, Repository, Image, RepositoryImage, AccessToken
def create_user(username, password, email):
    """Create and return a new User row.

    The plaintext ``password`` is never stored; it is hashed with bcrypt
    using a freshly generated salt, and only the hash is persisted.
    """
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password, salt)
    return User.create(username=username, password_hash=hashed, email=email)
def verify_user(username, password):
    """Return the User matching ``username`` if ``password`` is correct.

    Returns None when no such user exists or when the password does not
    match the stored hash.
    """
    try:
        candidate = User.get(User.username == username)
    except User.DoesNotExist:
        return None

    # Re-hash the supplied password using the stored hash as the salt
    # argument; a correct password reproduces the stored hash.
    stored_hash = candidate.password_hash
    recomputed = bcrypt.hashpw(password, stored_hash)

    # A mismatch means we weren't able to authorize the user.
    return candidate if recomputed == stored_hash else None
def change_password(user, new_password):
    """Replace the password hash on ``user`` and save the record.

    ``new_password`` is hashed with bcrypt and a newly generated salt.
    """
    user.password_hash = bcrypt.hashpw(new_password, bcrypt.gensalt())
    user.save()
def update_email(user, new_email):
    """Set a new email address on ``user`` and save the record.

    The ``verified`` flag is reset to False along with the address change.
    """
    user.verified = False
    user.email = new_email
    user.save()
def create_or_fetch_repository(namespace, name):
    """Return the Repository identified by ``namespace`` and ``name``.

    If no such repository exists, one is created and returned.
    """
    # The two conditions must be combined with ``&`` (each side wrapped in
    # parentheses). Python's ``and`` cannot be overloaded: it would evaluate
    # to its second operand only and silently drop the ``name`` condition.
    try:
        repo = Repository.get((Repository.name == name) &
                              (Repository.namespace == namespace))
    except Repository.DoesNotExist:
        repo = Repository.create(namespace=namespace, name=name)
    return repo
def create_image(image_id):
    """Create and return a new Image row with the given ``image_id``."""
    return Image.create(image_id=image_id)
def set_image_checksum(image_id, checksum):
    """Store ``checksum`` on the Image with ``image_id`` and return the image.

    There is no handling for a failed lookup here; any exception raised by
    ``Image.get`` propagates to the caller.
    """
    image = Image.get(Image.image_id == image_id)
    image.checksum = checksum
    image.save()
    return image
def assign_image_repository(repository, image, tag):
    """Create and return a RepositoryImage row for the given arguments.

    The new row records ``repository``, ``image`` and ``tag`` together.
    """
    return RepositoryImage.create(
        repository=repository,
        image=image,
        tag=tag,
    )
def get_repository_images(namespace_name, repository_name):
    """Return a query for the images of one repository.

    The query selects Image together with RepositoryImage, joined through
    to Repository, and is restricted to the repository identified by
    ``namespace_name`` and ``repository_name``.
    """
    query = (Image
             .select(Image, RepositoryImage)
             .join(RepositoryImage)
             .join(Repository))
    # Combine the conditions with ``&`` rather than ``and``: ``and`` would
    # evaluate to its second operand only, dropping the name condition and
    # returning images from every repository in the namespace.
    return query.where((Repository.name == repository_name) &
                       (Repository.namespace == namespace_name))
def create_access_token(repository):
    """Create and return a new AccessToken row for ``repository``."""
    return AccessToken.create(repository=repository)
def verify_token(namespace_name, repository_name, code):
    """Return the AccessToken matching ``code`` for the given repository.

    Returns None when the repository identified by ``namespace_name`` and
    ``repository_name`` does not exist, or when no token with ``code`` is
    attached to that repository.
    """
    # Conditions are combined with ``&`` (each side parenthesized). Python's
    # ``and`` would evaluate to its second operand only, which previously
    # meant the namespace was ignored in the repository lookup and, worse,
    # ``code`` was ignored in the token lookup.
    try:
        repo = Repository.get((Repository.namespace == namespace_name) &
                              (Repository.name == repository_name))
    except Repository.DoesNotExist:
        return None

    try:
        return AccessToken.get((AccessToken.code == code) &
                               (AccessToken.repository == repo))
    except AccessToken.DoesNotExist:
        return None