850 lines
29 KiB
Python
850 lines
29 KiB
Python
import bcrypt
|
|
import logging
|
|
import dateutil.parser
|
|
import operator
|
|
|
|
from database import *
|
|
from util.validation import *
|
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DataModelException(Exception):
    """Base class for the data model exceptions defined in this module."""
|
|
|
|
|
|
class InvalidEmailAddressException(DataModelException):
    """An email address is invalid, already in use, or was not found."""
|
|
|
|
|
|
class InvalidUsernameException(DataModelException):
    """A username is invalid or has already been taken."""
|
|
|
|
|
|
class InvalidOrganizationException(DataModelException):
    """An organization name is invalid or does not name an organization."""
|
|
|
|
|
|
class InvalidTeamException(DataModelException):
    """A team name is invalid or the team does not exist."""
|
|
|
|
|
|
class InvalidPasswordException(DataModelException):
    """A password failed validation."""
|
|
|
|
|
|
class InvalidTokenException(DataModelException):
    """No access token could be found for a given code."""
|
|
|
|
|
|
class InvalidRepositoryBuildException(DataModelException):
    """Data model error for repository builds.

    NOTE(review): not raised anywhere in the visible part of this module;
    presumably used by callers elsewhere.
    """
|
|
|
|
|
|
def create_user(username, password, email):
    """Create and return a new ``User`` row after validating the inputs.

    ``password`` may be ``None`` (the federated login case); then no password
    hash is stored.

    Raises:
        InvalidEmailAddressException: the email fails validation or is
            already used by an existing user.
        InvalidUsernameException: the username fails validation or is
            already taken.
        InvalidPasswordException: a non-None password fails validation.
        DataModelException: hashing the password or creating the row failed.
    """
    if not validate_email(email):
        raise InvalidEmailAddressException(
            'Invalid email address: %s' % email)
    if not validate_username(username):
        raise InvalidUsernameException('Invalid username: %s' % username)

    # A missing password is allowed for the federated login case.
    if not (password is None or validate_password(password)):
        raise InvalidPasswordException(INVALID_PASSWORD_MESSAGE)

    duplicate_filter = (User.username == username) | (User.email == email)
    try:
        clash = User.get(duplicate_filter)
    except User.DoesNotExist:
        # Nothing clashes: this is the happy path.
        logger.debug('Email and username are unique!')
    else:
        logger.debug('Existing user with same username or email.')

        # Report the username clash first, otherwise it must be the email.
        if clash.username == username:
            raise InvalidUsernameException(
                'Username has already been taken: %s' % username)
        raise InvalidEmailAddressException(
            'Email has already been used: %s' % email)

    try:
        if password is None:
            hashed = None
        else:
            hashed = bcrypt.hashpw(password, bcrypt.gensalt())

        return User.create(username=username, password_hash=hashed,
                           email=email)
    except Exception as ex:
        # Any failure while hashing or inserting is reported as a generic
        # data model error.
        raise DataModelException(ex.message)
|
|
|
|
|
|
def create_organization(name, email, creating_user):
    """Create an organization plus its 'owners' admin team.

    The organization is stored as a ``User`` row (created without a password)
    whose ``organization`` flag is set. ``creating_user`` becomes a member of
    the new 'owners' team.

    Returns:
        The new organization row.

    Raises:
        InvalidOrganizationException: the name was rejected as a username.
    """
    try:
        # Organizations share the user table; flag the fresh row as an org.
        org = create_user(name, None, email)
        org.organization = True
        org.save()

        # The creator joins a new admin team called 'owners'.
        owners = create_team('owners', org, 'admin')
        add_user_to_team(creating_user, owners)

        return org
    except InvalidUsernameException:
        raise InvalidOrganizationException(
            'Invalid organization name: %s' % name)
|
|
|
|
|
|
def create_team(name, org, team_role_name, description=''):
    """Create and return a team called ``name`` inside organization ``org``.

    Team names are checked with the same validator as usernames. The team's
    role is the ``TeamRole`` row named ``team_role_name``.

    Raises:
        InvalidTeamException: ``name`` fails validation.
        InvalidOrganizationException: ``org`` is not flagged as an
            organization.
    """
    if not validate_username(name):
        raise InvalidTeamException('Invalid team name: %s' % name)

    if not org.organization:
        raise InvalidOrganizationException(
            'User with name %s is not an org.' % org.username)

    role = TeamRole.get(TeamRole.name == team_role_name)
    new_team = Team.create(name=name, organization=org, role=role,
                           description=description)
    return new_team
|
|
|
|
|
|
def __get_user_admin_teams(org_name, team_name, username):
    """Build a query for the admin-role teams of ``username`` in an org.

    ``team_name`` is accepted but not used by the query.

    Returns:
        An unevaluated ``Team`` query filtered to teams the user belongs to
        that are owned by the organization ``org_name`` and whose role is
        named 'admin'.
    """
    # The organization lives in the user table, so it needs its own alias.
    Org = User.alias()
    return (Team
            .select()
            .join(TeamMember)
            .join(User)
            .switch(Team)
            .join(Org, on=(Org.id == Team.organization))
            .switch(Team)
            .join(TeamRole)
            .where(User.username == username,
                   Org.username == org_name,
                   TeamRole.name == 'admin'))
|
|
|
|
|
|
def remove_team(org_name, team_name, removed_by_username):
    """Delete team ``team_name`` from organization ``org_name``.

    An admin team is only deleted when ``removed_by_username`` belongs to
    more than one admin team in the organization.

    Raises:
        InvalidTeamException: no such team exists in the organization.
        DataModelException: deleting the team would remove the acting user's
            only admin team.
    """
    team_query = (Team
                  .select(Team, TeamRole)
                  .join(User)
                  .switch(Team)
                  .join(TeamRole)
                  .where(User.organization == True,
                         User.username == org_name,
                         Team.name == team_name))
    matches = list(team_query)
    if not matches:
        raise InvalidTeamException(
            'Team \'%s\' is not a team in org \'%s\'' % (team_name, org_name))

    target = matches[0]
    if target.role.name == 'admin':
        acting_user_admin_teams = list(
            __get_user_admin_teams(org_name, team_name, removed_by_username))

        if len(acting_user_admin_teams) <= 1:
            # The team being removed is the user's only admin team.
            msg = ('Deleting team \'%s\' would remove all admin from user '
                   '\'%s\'' % (team_name, removed_by_username))
            raise DataModelException(msg)

    target.delete_instance(recursive=True, delete_nullable=True)
|
|
|
|
|
|
def add_user_to_team(user, team):
    """Create and return the ``TeamMember`` row linking ``user`` to ``team``."""
    membership = TeamMember.create(user=user, team=team)
    return membership
|
|
|
|
|
|
def remove_user_from_team(org_name, team_name, username, removed_by_username):
    """Remove ``username`` from team ``team_name`` of organization ``org_name``.

    A user removing themselves is refused when the team is their only admin
    team in the organization.

    Raises:
        DataModelException: the user is not a member of the team, or the
            removal would strip the acting user of their only admin team.
    """
    # The organization lives in the user table, so it needs its own alias.
    Org = User.alias()
    membership_query = (TeamMember
                        .select()
                        .join(User)
                        .switch(TeamMember)
                        .join(Team)
                        .join(TeamRole)
                        .switch(Team)
                        .join(Org, on=(Org.id == Team.organization))
                        .where(User.username == username,
                               Org.username == org_name,
                               Team.name == team_name))
    found = list(membership_query)

    if not found:
        # Fixed: this used the undefined name `teamname`, which raised a
        # NameError instead of the intended DataModelException.
        raise DataModelException('User %s does not belong to team %s' %
                                 (username, team_name))

    if username == removed_by_username:
        admin_team_query = __get_user_admin_teams(org_name, team_name,
                                                  username)
        admin_team_names = {team.name for team in admin_team_query}
        if team_name in admin_team_names and len(admin_team_names) <= 1:
            msg = 'User cannot remove themselves from their only admin team.'
            raise DataModelException(msg)

    found[0].delete_instance()
|
|
|
|
|
|
def get_team_org_role(team):
    """Fetch the ``TeamRole`` row whose id matches ``team.role.id``."""
    role_id = team.role.id
    return TeamRole.get(TeamRole.id == role_id)
|
|
|
|
|
|
def set_team_org_permission(team, team_role_name, set_by_username):
    """Change ``team``'s role to the ``TeamRole`` named ``team_role_name``.

    Demoting an admin team is refused when it is the only admin team that
    ``set_by_username`` belongs to in the team's organization.

    Returns:
        The saved team.

    Raises:
        DataModelException: the change would remove the acting user's last
            admin team in the organization.
    """
    demoting_admin = team.role.name == 'admin' and team_role_name != 'admin'
    if demoting_admin:
        # Make sure the acting user keeps at least one admin team.
        admin_query = __get_user_admin_teams(team.organization.username,
                                             team.name, set_by_username)
        admin_names = {admin_team.name for admin_team in admin_query}
        if team.name in admin_names and len(admin_names) <= 1:
            msg = (('Cannot remove admin from team \'%s\' because calling user ' +
                    'would no longer have admin on org \'%s\'') %
                   (team.name, team.organization.username))
            raise DataModelException(msg)

    team.role = TeamRole.get(TeamRole.name == team_role_name)
    team.save()
    return team
|
|
|
|
|
|
def create_federated_user(username, email, service_name, service_id):
    """Create a verified, password-less user tied to an external login.

    A ``FederatedLogin`` row is created that links the new user to the
    ``LoginService`` named ``service_name`` under identifier ``service_id``.

    Returns:
        The new user row.
    """
    user = create_user(username, None, email)
    user.verified = True
    user.save()

    login_service = LoginService.get(LoginService.name == service_name)
    FederatedLogin.create(user=user, service=login_service,
                          service_ident=service_id)

    return user
|
|
|
|
def verify_federated_login(service_name, service_id):
    """Find the user linked to an external login identity.

    Returns:
        The user of the first ``FederatedLogin`` row matching the service
        name and identifier, or ``None`` when there is no match.
    """
    login_query = (FederatedLogin
                   .select(FederatedLogin, User)
                   .join(LoginService)
                   .switch(FederatedLogin)
                   .join(User)
                   .where(FederatedLogin.service_ident == service_id,
                          LoginService.name == service_name))
    logins = list(login_query)
    return logins[0].user if logins else None
|
|
|
|
|
|
def create_confirm_email_code(user):
    """Create and return an ``EmailConfirmation`` row flagged for email
    confirmation for ``user``."""
    return EmailConfirmation.create(user=user, email_confirm=True)
|
|
|
|
|
|
def confirm_user_email(code):
    """Mark the user behind an email confirmation code as verified.

    Looks up the ``EmailConfirmation`` row with this code and the
    ``email_confirm`` flag set, marks its user verified, then deletes the
    row.

    Returns:
        The verified user.
    """
    confirmation = EmailConfirmation.get(
        EmailConfirmation.code == code,
        EmailConfirmation.email_confirm == True)

    confirmed_user = confirmation.user
    confirmed_user.verified = True
    confirmed_user.save()

    confirmation.delete_instance()

    return confirmed_user
|
|
|
|
|
|
def create_reset_password_email_code(email):
    """Create a password-reset ``EmailConfirmation`` row for an address.

    Returns:
        The new row, created with ``pw_reset=True``.

    Raises:
        InvalidEmailAddressException: no user has this email address.
    """
    try:
        account = User.get(User.email == email)
    except User.DoesNotExist:
        raise InvalidEmailAddressException('Email address was not found.')

    return EmailConfirmation.create(user=account, pw_reset=True)
|
|
|
|
|
|
def validate_reset_code(code):
    """Consume a password-reset code.

    Returns:
        The user the code belongs to, or ``None`` when no password-reset
        ``EmailConfirmation`` row has this code. A matching row is deleted.
    """
    try:
        reset_entry = EmailConfirmation.get(
            EmailConfirmation.code == code,
            EmailConfirmation.pw_reset == True)
    except EmailConfirmation.DoesNotExist:
        return None

    # Read the user before the row is deleted.
    owner = reset_entry.user
    reset_entry.delete_instance()

    return owner
|
|
|
|
|
|
def get_user(username):
    """Return the non-organization user named ``username``, or ``None``."""
    not_an_org = User.organization == False
    try:
        return User.get(User.username == username, not_an_org)
    except User.DoesNotExist:
        return None
|
|
|
|
|
|
def get_matching_teams(team_prefix, organization):
    """Return up to 10 teams of ``organization`` whose name matches
    ``team_prefix`` followed by a wildcard."""
    name_pattern = team_prefix + '%'
    matches = (Team
               .select()
               .where(Team.name ** name_pattern,
                      Team.organization == organization)
               .limit(10))
    return list(matches)
|
|
|
|
|
|
def get_matching_users(username_prefix, organization=None):
    """Find non-organization users whose username matches a prefix.

    Returns:
        A generator of up to 10 result objects, each with a ``username``
        attribute and an ``is_org_member`` attribute. ``is_org_member`` is
        ``None`` when no ``organization`` is given; otherwise it says whether
        the organization username in the result row equals
        ``organization.username``.
    """
    # The organization lives in the user table, so it needs its own alias.
    Org = User.alias()
    prefix_pattern = username_prefix + '%'
    only_real_users = (User.username ** prefix_pattern &
                       (User.organization == False))
    # NOTE(review): Org.username is selected even when no organization is
    # given, in which case Org is never joined - confirm this is intended.
    query = User.select(User.username, Org.username).where(only_real_users)

    if organization:
        # Outer joins keep users that belong to no team at all.
        query = (query
                 .join(TeamMember, JOIN_LEFT_OUTER)
                 .join(Team, JOIN_LEFT_OUTER)
                 .join(Org, JOIN_LEFT_OUTER,
                       on=(Org.id == Team.organization))
                 .where((Org.id == organization) | (Org.id >> None)))

    class MatchingUserResult(object):
        """One result row: the username plus the org-membership flag."""

        def __init__(self, *row):
            self.username = row[0]
            self.is_org_member = (
                row[1] == organization.username if organization else None)

    return (MatchingUserResult(*row) for row in query.tuples().limit(10))
|
|
|
|
|
|
def verify_user(username, password):
    """Check a username/password pair.

    Returns:
        The matching user when a password hash is stored and hashing
        ``password`` with it reproduces that hash; ``None`` when the user
        does not exist, has no password hash, or the password does not match.
    """
    try:
        candidate = User.get(User.username == username)
    except User.DoesNotExist:
        return None

    stored_hash = candidate.password_hash
    if stored_hash and bcrypt.hashpw(password, stored_hash) == stored_hash:
        return candidate

    # We weren't able to authorize the user
    return None
|
|
|
|
|
|
def get_user_organizations(username):
    """Build a query for the organizations in which ``username`` is a member
    of at least one team."""
    # `User` is the organization here; the alias is the team member.
    Member = User.alias()
    return (User
            .select()
            .join(Team)
            .join(TeamMember)
            .join(Member, on=(Member.id == TeamMember.user))
            .where(User.organization == True,
                   Member.username == username))
|
|
|
|
|
|
def get_organization(name):
    """Return the organization row named ``name``.

    Raises:
        InvalidOrganizationException: no organization has that name.
    """
    try:
        org = User.get(username=name, organization=True)
    except User.DoesNotExist:
        raise InvalidOrganizationException(
            'Organization does not exist: %s' % name)
    return org
|
|
|
|
|
|
def get_organization_team(orgname, teamname):
    """Return team ``teamname`` of organization ``orgname``.

    Raises:
        InvalidTeamException: the organization has no team with that name.
    """
    query = (Team
             .select()
             .join(User)
             .where(Team.name == teamname,
                    User.organization == True,
                    User.username == orgname)
             .limit(1))
    result = list(query)
    if not result:
        # Fixed: the names were passed as extra exception arguments
        # (logging style) instead of being formatted into the message.
        raise InvalidTeamException('Team does not exist: %s/%s' %
                                   (orgname, teamname))

    return result[0]
|
|
|
|
|
|
def get_organization_team_members(teamid):
    """Build a query for the users who are members of the team with id
    ``teamid``."""
    return (User
            .select()
            .join(TeamMember)
            .join(Team)
            .where(Team.id == teamid))
|
|
|
|
|
|
def get_organization_member_set(orgname):
    """Return the set of usernames belonging to any team of ``orgname``.

    Fixed: the organization used to be joined on ``Org.username == orgname``
    only, which never tied it to the team's owner, so members of teams in
    every organization were returned. The join now follows
    ``Team.organization`` and the name is applied as a filter.
    """
    # The organization lives in the user table, so it needs its own alias.
    Org = User.alias()
    members = (User
               .select(User.username)
               .join(TeamMember)
               .join(Team)
               .join(Org, on=(Org.id == Team.organization))
               .where(Org.username == orgname))
    return {member.username for member in members}
|
|
|
|
|
|
def get_teams_within_org(organization):
    """Build a query for every team owned by ``organization``."""
    owned_by_org = Team.organization == organization
    return Team.select().where(owned_by_org)
|
|
|
|
|
|
def get_user_teams_within_org(username, organization):
    """Build a query for the teams of ``organization`` that ``username`` is
    a member of."""
    return (Team
            .select()
            .join(TeamMember)
            .join(User)
            .where(Team.organization == organization,
                   User.username == username))
|
|
|
|
|
|
def get_visible_repositories(username=None, include_public=True, limit=None,
                             sort=False, namespace=None):
    """Build a query for the repositories visible to a user.

    A repository is visible when it is public (if ``include_public``) or when
    ``username`` holds a direct permission on it.

    Args:
        username: user whose direct permissions grant visibility, or None.
        include_public: also include repositories with 'public' visibility.
        limit: maximum number of rows; falsy means no limit.
        sort: order by image creation date, newest first.
        namespace: restrict results to this namespace when given.

    Returns:
        An empty list when there is neither a username nor public
        repositories to include; otherwise an unevaluated distinct
        ``Repository`` query.
    """
    if not username and not include_public:
        return []

    query = Repository.select().distinct()

    if namespace:
        query = query.where(Repository.namespace == namespace)

    query = query.join(Visibility)
    or_clauses = []
    if include_public:
        or_clauses.append(Visibility.name == 'public')

    if username:
        # Fixed: User was inner-joined after the LEFT OUTER join on
        # RepositoryPermission, which dropped public repositories that have
        # no user permission row. Both joins are outer now; the OR filter
        # below still discards rows that are neither public nor the user's.
        query = (query
                 .switch(Repository)
                 .join(RepositoryPermission, JOIN_LEFT_OUTER)
                 .join(User, JOIN_LEFT_OUTER))
        or_clauses.append(User.username == username)

    if sort:
        query = (query
                 .switch(Repository)
                 .join(Image, JOIN_LEFT_OUTER)
                 .order_by(Image.created.desc()))

    if or_clauses:
        query = query.where(reduce(operator.or_, or_clauses))

    if limit:
        query = query.limit(limit)

    return query
|
|
|
|
|
|
def get_matching_repositories(repo_term, username=None):
    """Search the repositories visible to ``username`` for ``repo_term``.

    Without a namespace part, the term may match the repository name OR the
    namespace. When the term contains a '/' after its first character, it is
    split into namespace and name, and both parts must match.

    Returns:
        A list of at most 10 matching repositories.
    """
    visible = get_visible_repositories(username)

    if repo_term.find('/') > 0:
        # The user has already entered a namespace path.
        namespace_term, name_term = repo_term.split('/', 1)
        search_clauses = (
            Repository.name ** ('%' + name_term + '%') &
            Repository.namespace ** ('%' + namespace_term + '%'))
    else:
        anywhere = '%' + repo_term + '%'
        search_clauses = (Repository.name ** anywhere |
                          Repository.namespace ** anywhere)

    return list(visible.where(search_clauses).limit(10))
|
|
|
|
|
|
def change_password(user, new_password):
    """Validate ``new_password`` and store its bcrypt hash on ``user``.

    Raises:
        InvalidPasswordException: the password fails validation.
    """
    if not validate_password(new_password):
        raise InvalidPasswordException(INVALID_PASSWORD_MESSAGE)

    user.password_hash = bcrypt.hashpw(new_password, bcrypt.gensalt())
    user.save()
|
|
|
|
|
|
def update_email(user, new_email):
    """Set ``user``'s email to ``new_email``, clear the verified flag and
    save the row."""
    # A changed address has to be verified again.
    user.email, user.verified = new_email, False
    user.save()
|
|
|
|
|
|
def get_all_user_permissions(user):
    """Build a query for the repository permissions ``user`` holds either
    directly or through membership in a team.

    The query selects ``RepositoryPermission`` together with ``Role`` and
    ``Repository``.
    """
    # Second reference to the user table, reached through team membership.
    UserThroughTeam = User.alias()

    return (RepositoryPermission
            .select(RepositoryPermission, Role, Repository)
            .join(Role)
            .switch(RepositoryPermission)
            .join(Repository)
            .switch(RepositoryPermission)
            .join(User, JOIN_LEFT_OUTER)
            .switch(RepositoryPermission)
            .join(Team, JOIN_LEFT_OUTER)
            .join(TeamMember, JOIN_LEFT_OUTER)
            .join(UserThroughTeam, JOIN_LEFT_OUTER,
                  on=(UserThroughTeam.id == TeamMember.user))
            .where((User.id == user) | (UserThroughTeam.id == user)))
|
|
|
|
|
|
def get_org_wide_permissions(user):
    """Build a query for the teams ``user`` belongs to in organizations.

    The query selects ``Team`` together with the owning organization and the
    team's ``TeamRole``.
    """
    # The organization lives in the user table, so it needs its own alias.
    Org = User.alias()
    return (Team
            .select(Team, Org, TeamRole)
            .join(TeamRole)
            .switch(Team)
            .join(Org, on=(Team.organization == Org.id))
            .switch(Team)
            .join(TeamMember)
            .join(User)
            .where(User.id == user, Org.organization == True))
|
|
|
|
|
|
def get_all_repo_teams(namespace_name, repository_name):
    """Build a query for the team permissions on a repository.

    Selects the team name (aliased 'team_name'), the role name and the
    ``RepositoryPermission`` row.
    """
    return (RepositoryPermission
            .select(Team.name.alias('team_name'), Role.name,
                    RepositoryPermission)
            .join(Team)
            .switch(RepositoryPermission)
            .join(Role)
            .switch(RepositoryPermission)
            .join(Repository)
            .where(Repository.namespace == namespace_name,
                   Repository.name == repository_name))
|
|
|
|
|
|
def get_all_repo_users(namespace_name, repository_name):
    """Build a query for the user permissions on a repository.

    Selects the username, the role name and the ``RepositoryPermission``
    row.
    """
    return (RepositoryPermission
            .select(User.username, Role.name, RepositoryPermission)
            .join(User)
            .switch(RepositoryPermission)
            .join(Role)
            .switch(RepositoryPermission)
            .join(Repository)
            .where(Repository.namespace == namespace_name,
                   Repository.name == repository_name))
|
|
|
|
|
|
def get_repository(namespace_name, repository_name):
    """Return the repository with this namespace and name, or ``None`` when
    it does not exist."""
    try:
        repo = Repository.get(Repository.name == repository_name,
                              Repository.namespace == namespace_name)
    except Repository.DoesNotExist:
        return None
    return repo
|
|
|
|
|
|
def get_repo_image(namespace_name, repository_name, image_id):
    """Return the image with docker id ``image_id`` in the given repository,
    or ``None`` when there is none."""
    image_query = (Image
                   .select()
                   .join(Repository)
                   .where(Repository.name == repository_name,
                          Repository.namespace == namespace_name,
                          Image.docker_image_id == image_id)
                   .limit(1))
    images = list(image_query)
    return images[0] if images else None
|
|
|
|
|
|
def repository_is_public(namespace_name, repository_name):
    """Return True when the named repository exists with 'public'
    visibility, False otherwise."""
    public_match = (Repository
                    .select()
                    .join(Visibility)
                    .where(Repository.namespace == namespace_name,
                           Repository.name == repository_name,
                           Visibility.name == 'public'))
    return bool(list(public_match))
|
|
|
|
|
|
def set_repository_visibility(repo, visibility):
    """Set ``repo``'s visibility to the ``Visibility`` row named
    ``visibility`` and save it.

    An unknown visibility name leaves the repository untouched.

    Fixed: the lookup raises ``Visibility.DoesNotExist`` for an unknown name
    rather than returning a falsy value, so the ``if not visibility_obj``
    guard could never trigger. The exception is now caught to get the
    early return that guard was written for.
    """
    try:
        visibility_obj = Visibility.get(name=visibility)
    except Visibility.DoesNotExist:
        return

    repo.visibility = visibility_obj
    repo.save()
|
|
|
|
|
|
def create_repository(namespace, name, owner, visibility='private'):
    """Create and return a repository.

    When ``owner`` is given and is not an organization, the owner is granted
    the 'admin' role on the new repository.
    """
    visibility_row = Visibility.get(name=visibility)
    new_repo = Repository.create(namespace=namespace, name=name,
                                 visibility=visibility_row)
    admin_role = Role.get(name='admin')

    owner_is_plain_user = owner and not owner.organization
    if owner_is_plain_user:
        RepositoryPermission.create(user=owner, repository=new_repo,
                                    role=admin_role)
    return new_repo
|
|
|
|
|
|
def create_image(docker_image_id, repository):
    """Create and return an ``Image`` row for ``docker_image_id`` in
    ``repository``."""
    return Image.create(docker_image_id=docker_image_id,
                        repository=repository)
|
|
|
|
|
|
def set_image_checksum(docker_image_id, repository, checksum):
    """Store ``checksum`` on the image with this docker id in ``repository``
    and return the saved image."""
    image = Image.get(Image.docker_image_id == docker_image_id,
                      Image.repository == repository)
    image.checksum = checksum
    image.save()
    return image
|
|
|
|
|
|
def set_image_metadata(docker_image_id, namespace_name, repository_name,
                       created_date_str, comment, parent=None):
    """Store creation date, comment and ancestry on an image.

    ``created_date_str`` is parsed with ``dateutil``. When ``parent`` is
    given, the ancestors string becomes the parent's ancestors followed by
    the parent's id and a '/'.

    Returns:
        The saved image.

    Raises:
        DataModelException: the repository has no image with that docker id.
    """
    image_query = (Image
                   .select()
                   .join(Repository)
                   .where(Repository.name == repository_name,
                          Repository.namespace == namespace_name,
                          Image.docker_image_id == docker_image_id))
    matches = list(image_query)

    if not matches:
        raise DataModelException('No image with specified id and repository')

    image = matches[0]
    image.created = dateutil.parser.parse(created_date_str)
    image.comment = comment

    if parent:
        image.ancestors = '%s%s/' % (parent.ancestors, parent.id)

    image.save()
    return image
|
|
|
|
|
|
def get_repository_images(namespace_name, repository_name):
    """Build a query for all images of the named repository."""
    return (Image
            .select()
            .join(Repository)
            .where(Repository.name == repository_name,
                   Repository.namespace == namespace_name))
|
|
|
|
def list_repository_tags(namespace_name, repository_name):
    """Build a query for the tags of the named repository, selecting each
    tag together with its image."""
    return (RepositoryTag
            .select(RepositoryTag, Image)
            .join(Repository)
            .switch(RepositoryTag)
            .join(Image)
            .where(Repository.name == repository_name,
                   Repository.namespace == namespace_name))
|
|
|
|
|
|
def get_tag_image(namespace_name, repository_name, tag_name):
    """Return the image that tag ``tag_name`` points to in a repository.

    Raises:
        DataModelException: no image is found for the tag.
    """
    tagged = (Image
              .select()
              .join(RepositoryTag)
              .join(Repository)
              .where(Repository.name == repository_name,
                     Repository.namespace == namespace_name,
                     RepositoryTag.name == tag_name))
    images = list(tagged)

    if not images:
        raise DataModelException('Unable to find image for tag.')

    return images[0]
|
|
|
|
|
|
def get_image_by_id(namespace_name, repository_name, docker_image_id):
    """Return the image with ``docker_image_id`` in the named repository.

    Raises:
        DataModelException: the repository has no such image.
    """
    by_docker_id = (Image
                    .select()
                    .join(Repository)
                    .where(Repository.name == repository_name,
                           Repository.namespace == namespace_name,
                           Image.docker_image_id == docker_image_id))
    images = list(by_docker_id)

    if not images:
        raise DataModelException('Unable to find image for tag with repo.')

    return images[0]
|
|
|
|
|
|
def get_parent_images(image_obj):
    """Return the parent Image objects in the order they appear in
    ``image_obj.ancestors``.

    ``ancestors`` is a '/'-separated string of database ids. An empty
    ancestry yields an empty list.
    """
    ancestry = image_obj.ancestors.strip('/')
    if not ancestry:
        return []

    ancestor_ids = ancestry.split('/')
    id_filter = reduce(operator.or_,
                       [(Image.id == db_id) for db_id in ancestor_ids])

    # Index the fetched rows by their id as text, to match the split strings.
    images_by_id = {}
    for image in Image.select().where(id_filter):
        images_by_id[unicode(image.id)] = image

    return [images_by_id[db_id] for db_id in ancestor_ids]
|
|
|
|
|
|
def create_or_update_tag(namespace_name, repository_name, tag_name,
                         tag_docker_image_id):
    """Point tag ``tag_name`` of a repository at an image, creating the tag
    when it does not exist yet.

    Fixed: the image was looked up by docker id alone, so the tag could end
    up pointing at an image row of a different repository with the same
    docker id. The lookup is now restricted to this repository.

    Returns:
        The created or updated ``RepositoryTag``.
    """
    repo = Repository.get(Repository.name == repository_name,
                          Repository.namespace == namespace_name)
    image = Image.get(Image.docker_image_id == tag_docker_image_id,
                      Image.repository == repo)

    try:
        tag = RepositoryTag.get(RepositoryTag.repository == repo,
                                RepositoryTag.name == tag_name)
        tag.image = image
        tag.save()
    except RepositoryTag.DoesNotExist:
        tag = RepositoryTag.create(repository=repo, image=image,
                                   name=tag_name)

    return tag
|
|
|
|
|
|
def delete_tag(namespace_name, repository_name, tag_name):
    """Delete tag ``tag_name`` from the named repository."""
    owning_repo = Repository.get(Repository.name == repository_name,
                                 Repository.namespace == namespace_name)
    RepositoryTag.get(RepositoryTag.repository == owning_repo,
                      RepositoryTag.name == tag_name).delete_instance()
|
|
|
|
|
|
def delete_all_repository_tags(namespace_name, repository_name):
    """Delete every tag of the named repository.

    Fixed: the delete query was built but never executed, so no tag was
    actually removed.
    """
    repo = Repository.get(Repository.name == repository_name,
                          Repository.namespace == namespace_name)
    RepositoryTag.delete().where(RepositoryTag.repository == repo).execute()
|
|
|
|
|
|
def __entity_permission_repo_query(entity_id, entity_table,
                                   entity_id_property, namespace_name,
                                   repository_name):
    """Build the permission query for one entity on one repository.

    Works for both users and teams: ``entity_table`` is the model to join
    and ``entity_id_property`` the field compared against ``entity_id``.
    """
    return (RepositoryPermission
            .select(entity_table, Repository, Role, RepositoryPermission)
            .join(entity_table)
            .switch(RepositoryPermission)
            .join(Role)
            .switch(RepositoryPermission)
            .join(Repository)
            .where(Repository.name == repository_name,
                   Repository.namespace == namespace_name,
                   entity_id_property == entity_id))
|
|
|
|
|
|
def get_user_reponame_permission(username, namespace_name, repository_name):
    """Return the permission ``username`` holds on the named repository.

    Raises:
        DataModelException: the user has no permission on the repository.
    """
    perm_query = __entity_permission_repo_query(username, User, User.username,
                                                namespace_name,
                                                repository_name)
    perms = list(perm_query)
    if not perms:
        raise DataModelException('User does not have permission for repo.')

    return perms[0]
|
|
|
|
|
|
def get_team_reponame_permission(team_name, namespace_name, repository_name):
    """Return the permission team ``team_name`` holds on the named
    repository.

    Raises:
        DataModelException: the team has no permission on the repository.
    """
    perm_query = __entity_permission_repo_query(team_name, Team, Team.name,
                                                namespace_name,
                                                repository_name)
    perms = list(perm_query)
    if not perms:
        raise DataModelException('Team does not have permission for repo.')

    return perms[0]
|
|
|
|
|
|
def delete_user_permission(username, namespace_name, repository_name):
    """Delete the permission ``username`` holds on the named repository.

    Raises:
        DataModelException: ``username`` equals the namespace (the namespace
            owner must stay admin), or the user has no permission on the
            repository.
    """
    if username == namespace_name:
        raise DataModelException('Namespace owner must always be admin.')

    perm_query = __entity_permission_repo_query(username, User, User.username,
                                                namespace_name,
                                                repository_name)
    perms = list(perm_query)
    if not perms:
        raise DataModelException('User does not have permission for repo.')

    perms[0].delete_instance()
|
|
|
|
|
|
def delete_team_permission(team_name, namespace_name, repository_name):
    """Delete the permission team ``team_name`` holds on the named
    repository.

    Raises:
        DataModelException: the team has no permission on the repository.
    """
    perm_query = __entity_permission_repo_query(team_name, Team, Team.name,
                                                namespace_name,
                                                repository_name)
    perms = list(perm_query)
    if not perms:
        raise DataModelException('Team does not have permission for repo.')

    perms[0].delete_instance()
|
|
|
|
|
|
def __set_entity_repo_permission(entity_id, entity_table, entity_id_property,
                                 permission_entity_property, namespace_name,
                                 repository_name, role_name):
    """Grant an entity (user or team) a role on a repository.

    An existing permission for the entity on the repository has its role
    replaced; otherwise a new permission is created.
    ``permission_entity_property`` names the ``RepositoryPermission`` field
    that refers to the entity.

    Returns:
        The updated or newly created ``RepositoryPermission``.
    """
    # NOTE(review): the entity is looked up by `entity_id_property` alone;
    # for teams that is the name with no organization filter - confirm team
    # names are unique enough for this.
    entity = entity_table.get(entity_id_property == entity_id)
    repo = Repository.get(Repository.name == repository_name,
                          Repository.namespace == namespace_name)
    role = Role.get(Role.name == role_name)

    entity_field = getattr(RepositoryPermission, permission_entity_property)
    try:
        # Update any existing permission for this entity on the repo.
        existing = RepositoryPermission.get(
            entity_field == entity,
            RepositoryPermission.repository == repo)
        existing.role = role
        existing.save()
        return existing
    except RepositoryPermission.DoesNotExist:
        create_kwargs = {
            'repository': repo,
            'role': role,
            permission_entity_property: entity,
        }
        return RepositoryPermission.create(**create_kwargs)
|
|
|
|
|
|
def set_user_repo_permission(username, namespace_name, repository_name,
                             role_name):
    """Grant ``username`` the role ``role_name`` on the named repository.

    Returns:
        The resulting ``RepositoryPermission``.

    Raises:
        DataModelException: ``username`` equals the namespace; the namespace
            owner must always be admin.
    """
    if username == namespace_name:
        raise DataModelException('Namespace owner must always be admin.')

    permission = __set_entity_repo_permission(
        username, User, User.username, 'user', namespace_name,
        repository_name, role_name)
    return permission
|
|
|
|
|
|
def set_team_repo_permission(team_name, namespace_name, repository_name,
                             role_name):
    """Grant team ``team_name`` the role ``role_name`` on the named
    repository and return the resulting ``RepositoryPermission``."""
    permission = __set_entity_repo_permission(
        team_name, Team, Team.name, 'team', namespace_name, repository_name,
        role_name)
    return permission
|
|
|
|
|
|
def purge_repository(namespace_name, repository_name):
    """Delete the named repository with a recursive delete that also removes
    rows referencing it through nullable foreign keys."""
    doomed = Repository.get(Repository.name == repository_name,
                            Repository.namespace == namespace_name)
    doomed.delete_instance(recursive=True, delete_nullable=True)
|
|
|
|
|
|
def get_private_repo_count(username):
    """Count the repositories with 'private' visibility in the namespace
    ``username``."""
    private_repos = (Repository
                     .select()
                     .join(Visibility)
                     .where(Repository.namespace == username,
                            Visibility.name == 'private'))
    return private_repos.count()
|
|
|
|
|
|
def create_access_token(repository, role):
    """Create and return a temporary ``AccessToken`` on ``repository``.

    ``role`` is the name of the ``Role`` the token carries.
    """
    role_row = Role.get(Role.name == role)
    return AccessToken.create(repository=repository, temporary=True,
                              role=role_row)
|
|
|
|
|
|
def create_delegate_token(namespace_name, repository_name, friendly_name):
    """Create and return a non-temporary 'read' token for the named
    repository, labelled with ``friendly_name``."""
    read_role = Role.get(name='read')
    target_repo = Repository.get(Repository.name == repository_name,
                                 Repository.namespace == namespace_name)
    return AccessToken.create(repository=target_repo, role=read_role,
                              friendly_name=friendly_name, temporary=False)
|
|
|
|
|
|
def get_repository_delegate_tokens(namespace_name, repository_name):
    """Build a query for the non-temporary access tokens of the named
    repository, selecting each token together with its role."""
    return (AccessToken
            .select(AccessToken, Role)
            .join(Repository)
            .switch(AccessToken)
            .join(Role)
            .where(Repository.name == repository_name,
                   Repository.namespace == namespace_name,
                   AccessToken.temporary == False))
|
|
|
|
|
|
def get_repo_delegate_token(namespace_name, repository_name, code):
    """Return the delegate token with ``code`` on the named repository.

    Raises:
        InvalidTokenException: the repository has no delegate token with
            that code.
    """
    delegate_tokens = get_repository_delegate_tokens(namespace_name,
                                                     repository_name)
    matches = list(delegate_tokens.where(AccessToken.code == code))

    if not matches:
        raise InvalidTokenException(
            'Unable to find token with code: %s' % code)
    return matches[0]
|
|
|
|
|
|
def set_repo_delegate_token_role(namespace_name, repository_name, code, role):
    """Change the role of a delegate token to 'read' or 'write'.

    The token is looked up before the role name is checked, so an unknown
    code is reported before an invalid role.

    Returns:
        The saved token.

    Raises:
        InvalidTokenException: the token lookup found no token with that
            code.
        DataModelException: ``role`` is neither 'read' nor 'write'.
    """
    token = get_repo_delegate_token(namespace_name, repository_name, code)

    if role not in ('read', 'write'):
        raise DataModelException('Invalid role for delegate token: %s' % role)

    token.role = Role.get(Role.name == role)
    token.save()

    return token
|
|
|
|
|
|
def delete_delegate_token(namespace_name, repository_name, code):
    """Look up the delegate token with ``code`` on the named repository and
    delete it."""
    get_repo_delegate_token(namespace_name, repository_name,
                            code).delete_instance()
|
|
|
|
|
|
def load_token_data(code):
    """Load the permissions for any token by code.

    Returns:
        The ``AccessToken`` with this code, selected together with its
        repository and role.

    Raises:
        InvalidTokenException: no token has this code.
    """
    token_query = (AccessToken
                   .select(AccessToken, Repository, Role)
                   .join(Role)
                   .switch(AccessToken)
                   .join(Repository)
                   .where(AccessToken.code == code))
    tokens = list(token_query)

    if not tokens:
        raise InvalidTokenException(
            'Invalid delegate token code: %s' % code)
    return tokens[0]
|
|
|
|
|
|
def list_repository_builds(namespace_name, repository_name,
                           include_inactive=True):
    """List the builds of the named repository.

    With ``include_inactive=False``, builds whose phase is 'error' or
    'complete' are left out.
    """
    builds = RepositoryBuild.select().join(Repository)
    if not include_inactive:
        builds = builds.where(RepositoryBuild.phase != 'error',
                              RepositoryBuild.phase != 'complete')
    builds = builds.where(Repository.name == repository_name,
                          Repository.namespace == namespace_name)
    return list(builds)
|
|
|
|
|
|
def create_repository_build(repo, access_token, resource_key, tag):
    """Create and return a ``RepositoryBuild`` row for ``repo`` with the
    given access token, resource key and tag."""
    build_fields = {
        'repository': repo,
        'access_token': access_token,
        'resource_key': resource_key,
        'tag': tag,
    }
    return RepositoryBuild.create(**build_fields)
|