- Add a "can_create_repo" entry to the organization and have orgs grayed out in the new repo view if the user cannot create a repo - Fix the multiple-orgs bug in the model - Have the "create new repository" button disappear on landing if the org is selected and the user does not have create permissions for that org
857 lines
29 KiB
Python
857 lines
29 KiB
Python
import bcrypt
|
|
import logging
|
|
import dateutil.parser
|
|
import operator
|
|
|
|
from database import *
|
|
from util.validation import *
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DataModelException(Exception):
|
|
pass
|
|
|
|
|
|
class InvalidEmailAddressException(DataModelException):
|
|
pass
|
|
|
|
|
|
class InvalidUsernameException(DataModelException):
|
|
pass
|
|
|
|
|
|
class InvalidOrganizationException(DataModelException):
|
|
pass
|
|
|
|
|
|
class InvalidTeamException(DataModelException):
|
|
pass
|
|
|
|
|
|
class InvalidPasswordException(DataModelException):
|
|
pass
|
|
|
|
|
|
class InvalidTokenException(DataModelException):
|
|
pass
|
|
|
|
|
|
class InvalidRepositoryBuildException(DataModelException):
|
|
pass
|
|
|
|
|
|
def create_user(username, password, email):
|
|
if not validate_email(email):
|
|
raise InvalidEmailAddressException('Invalid email address: %s' % email)
|
|
if not validate_username(username):
|
|
raise InvalidUsernameException('Invalid username: %s' % username)
|
|
|
|
# We allow password none for the federated login case.
|
|
if password is not None and not validate_password(password):
|
|
raise InvalidPasswordException(INVALID_PASSWORD_MESSAGE)
|
|
|
|
try:
|
|
existing = User.get((User.username == username) | (User.email == email))
|
|
|
|
logger.debug('Existing user with same username or email.')
|
|
|
|
# A user already exists with either the same username or email
|
|
if existing.username == username:
|
|
raise InvalidUsernameException('Username has already been taken: %s' %
|
|
username)
|
|
raise InvalidEmailAddressException('Email has already been used: %s' %
|
|
email)
|
|
|
|
except User.DoesNotExist:
|
|
# This is actually the happy path
|
|
logger.debug('Email and username are unique!')
|
|
pass
|
|
|
|
try:
|
|
pw_hash = None
|
|
if password is not None:
|
|
pw_hash = bcrypt.hashpw(password, bcrypt.gensalt())
|
|
|
|
new_user = User.create(username=username, password_hash=pw_hash,
|
|
email=email)
|
|
return new_user
|
|
except Exception as ex:
|
|
raise DataModelException(ex.message)
|
|
|
|
|
|
def create_organization(name, email, creating_user):
|
|
try:
|
|
# Create the org
|
|
new_org = create_user(name, None, email)
|
|
new_org.organization = True
|
|
new_org.save()
|
|
|
|
# Create a team for the owners
|
|
owners_team = create_team('owners', new_org, 'admin')
|
|
|
|
# Add the user who created the org to the owners team
|
|
add_user_to_team(creating_user, owners_team)
|
|
|
|
return new_org
|
|
except InvalidUsernameException:
|
|
raise InvalidOrganizationException('Invalid organization name: %s' % name)
|
|
|
|
|
|
def create_team(name, org, team_role_name, description=''):
|
|
if not validate_username(name):
|
|
raise InvalidTeamException('Invalid team name: %s' % name)
|
|
|
|
if not org.organization:
|
|
raise InvalidOrganizationException('User with name %s is not an org.' %
|
|
org.username)
|
|
|
|
team_role = TeamRole.get(TeamRole.name == team_role_name)
|
|
return Team.create(name=name, organization=org, role=team_role,
|
|
description=description)
|
|
|
|
|
|
def __get_user_admin_teams(org_name, team_name, username):
|
|
Org = User.alias()
|
|
user_teams = Team.select().join(TeamMember).join(User)
|
|
with_org = user_teams.switch(Team).join(Org,
|
|
on=(Org.id == Team.organization))
|
|
with_role = with_org.switch(Team).join(TeamRole)
|
|
admin_teams = with_role.where(User.username == username,
|
|
Org.username == org_name,
|
|
TeamRole.name == 'admin')
|
|
return admin_teams
|
|
|
|
|
|
def remove_team(org_name, team_name, removed_by_username):
|
|
joined = Team.select(Team, TeamRole).join(User).switch(Team).join(TeamRole)
|
|
|
|
found = list(joined.where(User.organization == True,
|
|
User.username == org_name,
|
|
Team.name == team_name))
|
|
if not found:
|
|
raise InvalidTeamException('Team \'%s\' is not a team in org \'%s\'' %
|
|
(team_name, org_name))
|
|
|
|
team = found[0]
|
|
if team.role.name == 'admin':
|
|
admin_teams = list(__get_user_admin_teams(org_name, team_name,
|
|
removed_by_username))
|
|
|
|
if len(admin_teams) <= 1:
|
|
# The team we are trying to remove is the only admin team for this user
|
|
msg = ('Deleting team \'%s\' would remove all admin from user \'%s\'' %
|
|
(team_name, removed_by_username))
|
|
raise DataModelException(msg)
|
|
|
|
team.delete_instance(recursive=True, delete_nullable=True)
|
|
|
|
|
|
def add_user_to_team(user, team):
|
|
return TeamMember.create(user=user, team=team)
|
|
|
|
|
|
def remove_user_from_team(org_name, team_name, username, removed_by_username):
|
|
Org = User.alias()
|
|
joined = TeamMember.select().join(User).switch(TeamMember).join(Team)
|
|
with_role = joined.join(TeamRole)
|
|
with_org = with_role.switch(Team).join(Org,
|
|
on=(Org.id == Team.organization))
|
|
found = list(with_org.where(User.username == username,
|
|
Org.username == org_name,
|
|
Team.name == team_name))
|
|
|
|
if not found:
|
|
raise DataModelException('User %s does not belong to team %s' %
|
|
(username, teamname))
|
|
|
|
if username == removed_by_username:
|
|
admin_team_query = __get_user_admin_teams(org_name, team_name, username)
|
|
admin_team_names = {team.name for team in admin_team_query}
|
|
if team_name in admin_team_names and len(admin_team_names) <= 1:
|
|
msg = 'User cannot remove themselves from their only admin team.'
|
|
raise DataModelException(msg)
|
|
|
|
user_in_team = found[0]
|
|
user_in_team.delete_instance()
|
|
|
|
|
|
def get_team_org_role(team):
|
|
return TeamRole.get(TeamRole.id == team.role.id)
|
|
|
|
|
|
def set_team_org_permission(team, team_role_name, set_by_username):
|
|
if team.role.name == 'admin' and team_role_name != 'admin':
|
|
# We need to make sure we're not removing the users only admin role
|
|
user_admin_teams = __get_user_admin_teams(team.organization.username,
|
|
team.name, set_by_username)
|
|
admin_team_set = {admin_team.name for admin_team in user_admin_teams}
|
|
if team.name in admin_team_set and len(admin_team_set) <= 1:
|
|
msg = (('Cannot remove admin from team \'%s\' because calling user ' +
|
|
'would no longer have admin on org \'%s\'') %
|
|
(team.name, team.organization.username))
|
|
raise DataModelException(msg)
|
|
|
|
new_role = TeamRole.get(TeamRole.name == team_role_name)
|
|
team.role = new_role
|
|
team.save()
|
|
return team
|
|
|
|
|
|
def create_federated_user(username, email, service_name, service_id):
|
|
new_user = create_user(username, None, email)
|
|
new_user.verified = True
|
|
new_user.save()
|
|
|
|
service = LoginService.get(LoginService.name == service_name)
|
|
federated_user = FederatedLogin.create(user=new_user, service=service,
|
|
service_ident=service_id)
|
|
|
|
return new_user
|
|
|
|
def verify_federated_login(service_name, service_id):
|
|
selected = FederatedLogin.select(FederatedLogin, User)
|
|
with_service = selected.join(LoginService)
|
|
with_user = with_service.switch(FederatedLogin).join(User)
|
|
found = with_user.where(FederatedLogin.service_ident == service_id,
|
|
LoginService.name == service_name)
|
|
|
|
found_list = list(found)
|
|
|
|
if found_list:
|
|
return found_list[0].user
|
|
|
|
return None
|
|
|
|
|
|
def create_confirm_email_code(user):
|
|
code = EmailConfirmation.create(user=user, email_confirm=True)
|
|
return code
|
|
|
|
|
|
def confirm_user_email(code):
|
|
code = EmailConfirmation.get(EmailConfirmation.code == code,
|
|
EmailConfirmation.email_confirm == True)
|
|
|
|
user = code.user
|
|
user.verified = True
|
|
user.save()
|
|
|
|
code.delete_instance()
|
|
|
|
return user
|
|
|
|
|
|
def create_reset_password_email_code(email):
|
|
try:
|
|
user = User.get(User.email == email)
|
|
except User.DoesNotExist:
|
|
raise InvalidEmailAddressException('Email address was not found.');
|
|
|
|
code = EmailConfirmation.create(user=user, pw_reset=True)
|
|
return code
|
|
|
|
|
|
def validate_reset_code(code):
|
|
try:
|
|
code = EmailConfirmation.get(EmailConfirmation.code == code,
|
|
EmailConfirmation.pw_reset == True)
|
|
except EmailConfirmation.DoesNotExist:
|
|
return None
|
|
|
|
user = code.user
|
|
code.delete_instance()
|
|
|
|
return user
|
|
|
|
|
|
def get_user(username):
|
|
try:
|
|
return User.get(User.username == username, User.organization == False)
|
|
except User.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def get_matching_teams(team_prefix, organization):
|
|
query = Team.select().where(Team.name ** (team_prefix + '%'),
|
|
Team.organization == organization)
|
|
return list(query.limit(10))
|
|
|
|
|
|
def get_matching_users(username_prefix, organization=None):
|
|
Org = User.alias()
|
|
users_no_orgs = (User.username ** (username_prefix + '%') &
|
|
(User.organization == False))
|
|
query = User.select(User.username, Org.username).where(users_no_orgs)
|
|
|
|
if organization:
|
|
with_team = query.join(TeamMember, JOIN_LEFT_OUTER).join(Team,
|
|
JOIN_LEFT_OUTER)
|
|
with_org = with_team.join(Org, JOIN_LEFT_OUTER,
|
|
on=(Org.id == Team.organization))
|
|
query = with_org.where((Org.id == organization) | (Org.id >> None))
|
|
|
|
|
|
class MatchingUserResult(object):
|
|
def __init__(self, *args):
|
|
self.username = args[0]
|
|
if organization:
|
|
self.is_org_member = (args[1] == organization.username)
|
|
else:
|
|
self.is_org_member = None
|
|
|
|
|
|
return (MatchingUserResult(*args) for args in query.tuples().limit(10))
|
|
|
|
|
|
def verify_user(username, password):
|
|
try:
|
|
fetched = User.get(User.username == username)
|
|
except User.DoesNotExist:
|
|
return None
|
|
|
|
if (fetched.password_hash and
|
|
bcrypt.hashpw(password, fetched.password_hash) ==
|
|
fetched.password_hash):
|
|
return fetched
|
|
|
|
# We weren't able to authorize the user
|
|
return None
|
|
|
|
|
|
def get_user_organizations(username):
|
|
UserAlias = User.alias()
|
|
all_teams = User.select().distinct().join(Team).join(TeamMember)
|
|
with_user = all_teams.join(UserAlias, on=(UserAlias.id == TeamMember.user))
|
|
return with_user.where(User.organization == True,
|
|
UserAlias.username == username)
|
|
|
|
|
|
def get_organization(name):
|
|
try:
|
|
return User.get(username=name, organization=True)
|
|
except User.DoesNotExist:
|
|
raise InvalidOrganizationException('Organization does not exist: %s' %
|
|
name)
|
|
|
|
|
|
def get_organization_team(orgname, teamname):
|
|
joined = Team.select().join(User)
|
|
query = joined.where(Team.name == teamname, User.organization == True,
|
|
User.username == orgname).limit(1)
|
|
result = list(query)
|
|
if not result:
|
|
raise InvalidTeamException('Team does not exist: %s/%s', orgname,
|
|
teamname)
|
|
|
|
return result[0]
|
|
|
|
|
|
def get_organization_members_with_teams(organization):
|
|
joined = TeamMember.select().annotate(Team).annotate(User)
|
|
query = joined.where(Team.organization == organization)
|
|
return query
|
|
|
|
|
|
def get_organization_team_members(teamid):
|
|
joined = User.select().join(TeamMember).join(Team)
|
|
query = joined.where(Team.id == teamid)
|
|
return query
|
|
|
|
|
|
def get_organization_member_set(orgname):
|
|
Org = User.alias()
|
|
user_teams = User.select(User.username).join(TeamMember).join(Team)
|
|
with_org = user_teams.join(Org, on=(Org.username == orgname))
|
|
return {user.username for user in with_org}
|
|
|
|
|
|
def get_teams_within_org(organization):
|
|
return Team.select().where(Team.organization == organization)
|
|
|
|
|
|
def get_user_teams_within_org(username, organization):
|
|
joined = Team.select().join(TeamMember).join(User)
|
|
return joined.where(Team.organization == organization,
|
|
User.username == username)
|
|
|
|
|
|
def get_visible_repositories(username=None, include_public=True, limit=None,
|
|
sort=False, namespace=None):
|
|
if not username and not include_public:
|
|
return []
|
|
|
|
query = Repository.select().distinct()
|
|
|
|
if namespace:
|
|
query = query.where(Repository.namespace == namespace)
|
|
|
|
query = query.join(Visibility)
|
|
or_clauses = []
|
|
if include_public:
|
|
or_clauses.append((Visibility.name == 'public'))
|
|
|
|
|
|
if username:
|
|
with_perms = query.switch(Repository).join(RepositoryPermission,
|
|
JOIN_LEFT_OUTER)
|
|
query = with_perms.join(User)
|
|
or_clauses.append(User.username == username)
|
|
|
|
if sort:
|
|
with_images = query.switch(Repository).join(Image, JOIN_LEFT_OUTER)
|
|
query = with_images.order_by(Image.created.desc())
|
|
|
|
if (or_clauses):
|
|
query = query.where(reduce(operator.or_, or_clauses))
|
|
|
|
if limit:
|
|
query = query.limit(limit)
|
|
|
|
return query
|
|
|
|
|
|
def get_matching_repositories(repo_term, username=None):
|
|
namespace_term = repo_term
|
|
name_term = repo_term
|
|
|
|
visible = get_visible_repositories(username)
|
|
|
|
search_clauses = (Repository.name ** ('%' + name_term + '%') |
|
|
Repository.namespace ** ('%' + namespace_term + '%'))
|
|
|
|
# Handle the case where the user has already entered a namespace path.
|
|
if repo_term.find('/') > 0:
|
|
parts = repo_term.split('/', 1)
|
|
namespace_term = '/'.join(parts[:-1])
|
|
name_term = parts[-1]
|
|
|
|
search_clauses = (Repository.name ** ('%' + name_term + '%') &
|
|
Repository.namespace ** ('%' + namespace_term + '%'))
|
|
|
|
final = visible.where(search_clauses).limit(10)
|
|
return list(final)
|
|
|
|
|
|
def change_password(user, new_password):
|
|
if not validate_password(new_password):
|
|
raise InvalidPasswordException(INVALID_PASSWORD_MESSAGE)
|
|
|
|
pw_hash = bcrypt.hashpw(new_password, bcrypt.gensalt())
|
|
user.password_hash = pw_hash
|
|
user.save()
|
|
|
|
|
|
def update_email(user, new_email):
|
|
user.email = new_email
|
|
user.verified = False
|
|
user.save()
|
|
|
|
|
|
def get_all_user_permissions(user):
|
|
select = RepositoryPermission.select(RepositoryPermission, Role, Repository)
|
|
with_role = select.join(Role)
|
|
with_repo = with_role.switch(RepositoryPermission).join(Repository)
|
|
through_user = with_repo.switch(RepositoryPermission).join(User,
|
|
JOIN_LEFT_OUTER)
|
|
as_perm = through_user.switch(RepositoryPermission)
|
|
through_team = as_perm.join(Team, JOIN_LEFT_OUTER).join(TeamMember,
|
|
JOIN_LEFT_OUTER)
|
|
|
|
UserThroughTeam = User.alias()
|
|
with_team_member = through_team.join(UserThroughTeam, JOIN_LEFT_OUTER,
|
|
on=(UserThroughTeam.id ==
|
|
TeamMember.user))
|
|
|
|
return with_team_member.where((User.id == user) |
|
|
(UserThroughTeam.id == user))
|
|
|
|
|
|
def get_org_wide_permissions(user):
|
|
Org = User.alias()
|
|
team_with_role = Team.select(Team, Org, TeamRole).join(TeamRole)
|
|
with_org = team_with_role.switch(Team).join(Org, on=(Team.organization ==
|
|
Org.id))
|
|
with_user = with_org.switch(Team).join(TeamMember).join(User)
|
|
return with_user.where(User.id == user, Org.organization == True)
|
|
|
|
|
|
def get_all_repo_teams(namespace_name, repository_name):
|
|
select = RepositoryPermission.select(Team.name.alias('team_name'),
|
|
Role.name, RepositoryPermission)
|
|
with_team = select.join(Team)
|
|
with_role = with_team.switch(RepositoryPermission).join(Role)
|
|
with_repo = with_role.switch(RepositoryPermission).join(Repository)
|
|
return with_repo.where(Repository.namespace == namespace_name,
|
|
Repository.name == repository_name)
|
|
|
|
|
|
def get_all_repo_users(namespace_name, repository_name):
|
|
select = RepositoryPermission.select(User.username, Role.name,
|
|
RepositoryPermission)
|
|
with_user = select.join(User)
|
|
with_role = with_user.switch(RepositoryPermission).join(Role)
|
|
with_repo = with_role.switch(RepositoryPermission).join(Repository)
|
|
return with_repo.where(Repository.namespace == namespace_name,
|
|
Repository.name == repository_name)
|
|
|
|
|
|
def get_repository(namespace_name, repository_name):
|
|
try:
|
|
return Repository.get(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
except Repository.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def get_repo_image(namespace_name, repository_name, image_id):
|
|
joined = Image.select().join(Repository)
|
|
query = joined.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name,
|
|
Image.docker_image_id == image_id).limit(1)
|
|
result = list(query)
|
|
if not result:
|
|
return None
|
|
|
|
return result[0]
|
|
|
|
|
|
def repository_is_public(namespace_name, repository_name):
|
|
joined = Repository.select().join(Visibility)
|
|
query = joined.where(Repository.namespace == namespace_name,
|
|
Repository.name == repository_name,
|
|
Visibility.name == 'public')
|
|
return len(list(query)) > 0
|
|
|
|
|
|
def set_repository_visibility(repo, visibility):
|
|
visibility_obj = Visibility.get(name=visibility)
|
|
if not visibility_obj:
|
|
return
|
|
|
|
repo.visibility = visibility_obj
|
|
repo.save()
|
|
|
|
|
|
def create_repository(namespace, name, owner, visibility='private'):
|
|
private = Visibility.get(name=visibility)
|
|
repo = Repository.create(namespace=namespace, name=name,
|
|
visibility=private)
|
|
admin = Role.get(name='admin')
|
|
|
|
if owner and not owner.organization:
|
|
permission = RepositoryPermission.create(user=owner, repository=repo,
|
|
role=admin)
|
|
return repo
|
|
|
|
|
|
def create_image(docker_image_id, repository):
|
|
new_image = Image.create(docker_image_id=docker_image_id,
|
|
repository=repository)
|
|
return new_image
|
|
|
|
|
|
def set_image_checksum(docker_image_id, repository, checksum):
|
|
fetched = Image.get(Image.docker_image_id == docker_image_id,
|
|
Image.repository == repository)
|
|
fetched.checksum = checksum
|
|
fetched.save()
|
|
return fetched
|
|
|
|
|
|
def set_image_metadata(docker_image_id, namespace_name, repository_name,
|
|
created_date_str, comment, parent=None):
|
|
joined = Image.select().join(Repository)
|
|
image_list = list(joined.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name,
|
|
Image.docker_image_id == docker_image_id))
|
|
|
|
if not image_list:
|
|
raise DataModelException('No image with specified id and repository')
|
|
|
|
fetched = image_list[0]
|
|
fetched.created = dateutil.parser.parse(created_date_str)
|
|
fetched.comment = comment
|
|
|
|
if parent:
|
|
fetched.ancestors = '%s%s/' % (parent.ancestors, parent.id)
|
|
|
|
fetched.save()
|
|
return fetched
|
|
|
|
|
|
def get_repository_images(namespace_name, repository_name):
|
|
joined = Image.select().join(Repository)
|
|
return joined.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
|
|
|
|
def list_repository_tags(namespace_name, repository_name):
|
|
select = RepositoryTag.select(RepositoryTag, Image)
|
|
with_repo = select.join(Repository)
|
|
with_image = with_repo.switch(RepositoryTag).join(Image)
|
|
return with_image.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
|
|
|
|
def get_tag_image(namespace_name, repository_name, tag_name):
|
|
joined = Image.select().join(RepositoryTag).join(Repository)
|
|
fetched = list(joined.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name,
|
|
RepositoryTag.name == tag_name))
|
|
|
|
if not fetched:
|
|
raise DataModelException('Unable to find image for tag.')
|
|
|
|
return fetched[0]
|
|
|
|
|
|
def get_image_by_id(namespace_name, repository_name, docker_image_id):
|
|
joined = Image.select().join(Repository)
|
|
fetched = list(joined.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name,
|
|
Image.docker_image_id == docker_image_id))
|
|
|
|
if not fetched:
|
|
raise DataModelException('Unable to find image for tag with repo.')
|
|
|
|
return fetched[0]
|
|
|
|
|
|
def get_parent_images(image_obj):
|
|
""" Returns a list of parent Image objects in chronilogical order. """
|
|
parents = image_obj.ancestors
|
|
parent_db_ids = parents.strip('/').split('/')
|
|
|
|
if parent_db_ids == ['']:
|
|
return []
|
|
|
|
or_clauses = [(Image.id == db_id) for db_id in parent_db_ids]
|
|
parent_images = Image.select().where(reduce(operator.or_, or_clauses))
|
|
id_to_image = {unicode(image.id): image for image in parent_images}
|
|
|
|
return [id_to_image[parent_id] for parent_id in parent_db_ids]
|
|
|
|
|
|
def create_or_update_tag(namespace_name, repository_name, tag_name,
|
|
tag_docker_image_id):
|
|
repo = Repository.get(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
image = Image.get(Image.docker_image_id == tag_docker_image_id)
|
|
|
|
try:
|
|
tag = RepositoryTag.get(RepositoryTag.repository == repo,
|
|
RepositoryTag.name == tag_name)
|
|
tag.image = image
|
|
tag.save()
|
|
except RepositoryTag.DoesNotExist:
|
|
tag = RepositoryTag.create(repository=repo, image=image, name=tag_name)
|
|
|
|
return tag
|
|
|
|
|
|
def delete_tag(namespace_name, repository_name, tag_name):
|
|
repo = Repository.get(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
tag = RepositoryTag.get(RepositoryTag.repository == repo,
|
|
RepositoryTag.name == tag_name)
|
|
tag.delete_instance()
|
|
|
|
|
|
def delete_all_repository_tags(namespace_name, repository_name):
|
|
repo = Repository.get(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
RepositoryTag.delete().where(RepositoryTag.repository == repo)
|
|
|
|
|
|
def __entity_permission_repo_query(entity_id, entity_table,
|
|
entity_id_property, namespace_name,
|
|
repository_name):
|
|
""" This method works for both users and teams. """
|
|
selected = RepositoryPermission.select(entity_table, Repository, Role,
|
|
RepositoryPermission)
|
|
with_user = selected.join(entity_table)
|
|
with_role = with_user.switch(RepositoryPermission).join(Role)
|
|
with_repo = with_role.switch(RepositoryPermission).join(Repository)
|
|
return with_repo.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name,
|
|
entity_id_property == entity_id)
|
|
|
|
|
|
def get_user_reponame_permission(username, namespace_name, repository_name):
|
|
fetched = list(__entity_permission_repo_query(username, User, User.username,
|
|
namespace_name,
|
|
repository_name))
|
|
if not fetched:
|
|
raise DataModelException('User does not have permission for repo.')
|
|
|
|
return fetched[0]
|
|
|
|
|
|
def get_team_reponame_permission(team_name, namespace_name, repository_name):
|
|
fetched = list(__entity_permission_repo_query(team_name, Team, Team.name,
|
|
namespace_name,
|
|
repository_name))
|
|
if not fetched:
|
|
raise DataModelException('Team does not have permission for repo.')
|
|
|
|
return fetched[0]
|
|
|
|
|
|
def delete_user_permission(username, namespace_name, repository_name):
|
|
if username == namespace_name:
|
|
raise DataModelException('Namespace owner must always be admin.')
|
|
|
|
fetched = list(__entity_permission_repo_query(username, User, User.username,
|
|
namespace_name,
|
|
repository_name))
|
|
if not fetched:
|
|
raise DataModelException('User does not have permission for repo.')
|
|
|
|
fetched[0].delete_instance()
|
|
|
|
|
|
def delete_team_permission(team_name, namespace_name, repository_name):
|
|
fetched = list(__entity_permission_repo_query(team_name, Team, Team.name,
|
|
namespace_name,
|
|
repository_name))
|
|
if not fetched:
|
|
raise DataModelException('Team does not have permission for repo.')
|
|
|
|
fetched[0].delete_instance()
|
|
|
|
|
|
def __set_entity_repo_permission(entity_id, entity_table, entity_id_property,
|
|
permission_entity_property, namespace_name,
|
|
repository_name, role_name):
|
|
entity = entity_table.get(entity_id_property == entity_id)
|
|
repo = Repository.get(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
new_role = Role.get(Role.name == role_name)
|
|
|
|
# Fetch any existing permission for this user on the repo
|
|
try:
|
|
entity_attr = getattr(RepositoryPermission, permission_entity_property)
|
|
perm = RepositoryPermission.get(entity_attr == entity,
|
|
RepositoryPermission.repository == repo)
|
|
perm.role = new_role
|
|
perm.save()
|
|
return perm
|
|
except RepositoryPermission.DoesNotExist:
|
|
set_entity_kwargs = {permission_entity_property: entity}
|
|
new_perm = RepositoryPermission.create(repository=repo, role=new_role,
|
|
**set_entity_kwargs)
|
|
return new_perm
|
|
|
|
|
|
def set_user_repo_permission(username, namespace_name, repository_name,
|
|
role_name):
|
|
if username == namespace_name:
|
|
raise DataModelException('Namespace owner must always be admin.')
|
|
|
|
return __set_entity_repo_permission(username, User, User.username, 'user',
|
|
namespace_name, repository_name,
|
|
role_name)
|
|
|
|
|
|
def set_team_repo_permission(team_name, namespace_name, repository_name,
|
|
role_name):
|
|
return __set_entity_repo_permission(team_name, Team, Team.name, 'team',
|
|
namespace_name, repository_name,
|
|
role_name)
|
|
|
|
|
|
def purge_repository(namespace_name, repository_name):
|
|
fetched = Repository.get(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
fetched.delete_instance(recursive=True, delete_nullable=True)
|
|
|
|
|
|
def get_private_repo_count(username):
|
|
joined = Repository.select().join(Visibility)
|
|
return joined.where(Repository.namespace == username,
|
|
Visibility.name == 'private').count()
|
|
|
|
|
|
def create_access_token(repository, role):
|
|
role = Role.get(Role.name == role)
|
|
new_token = AccessToken.create(repository=repository, temporary=True,
|
|
role=role)
|
|
return new_token
|
|
|
|
|
|
def create_delegate_token(namespace_name, repository_name, friendly_name):
|
|
read_only = Role.get(name='read')
|
|
repo = Repository.get(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name)
|
|
new_token = AccessToken.create(repository=repo, role=read_only,
|
|
friendly_name=friendly_name, temporary=False)
|
|
return new_token
|
|
|
|
|
|
def get_repository_delegate_tokens(namespace_name, repository_name):
|
|
selected = AccessToken.select(AccessToken, Role)
|
|
with_repo = selected.join(Repository)
|
|
with_role = with_repo.switch(AccessToken).join(Role)
|
|
return with_role.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name,
|
|
AccessToken.temporary == False)
|
|
|
|
|
|
def get_repo_delegate_token(namespace_name, repository_name, code):
|
|
repo_query = get_repository_delegate_tokens(namespace_name, repository_name)
|
|
found = list(repo_query.where(AccessToken.code == code))
|
|
|
|
if found:
|
|
return found[0]
|
|
else:
|
|
raise InvalidTokenException('Unable to find token with code: %s' % code)
|
|
|
|
|
|
def set_repo_delegate_token_role(namespace_name, repository_name, code, role):
|
|
token = get_repo_delegate_token(namespace_name, repository_name, code)
|
|
|
|
if role != 'read' and role != 'write':
|
|
raise DataModelException('Invalid role for delegate token: %s' % role)
|
|
|
|
new_role = Role.get(Role.name == role)
|
|
token.role = new_role
|
|
token.save()
|
|
|
|
return token
|
|
|
|
|
|
def delete_delegate_token(namespace_name, repository_name, code):
|
|
token = get_repo_delegate_token(namespace_name, repository_name, code)
|
|
token.delete_instance()
|
|
|
|
|
|
def load_token_data(code):
|
|
""" Load the permissions for any token by code. """
|
|
selected = AccessToken.select(AccessToken, Repository, Role)
|
|
with_role = selected.join(Role)
|
|
with_repo = with_role.switch(AccessToken).join(Repository)
|
|
fetched = list(with_repo.where(AccessToken.code == code))
|
|
|
|
if fetched:
|
|
return fetched[0]
|
|
else:
|
|
raise InvalidTokenException('Invalid delegate token code: %s' % code)
|
|
|
|
|
|
def list_repository_builds(namespace_name, repository_name,
|
|
include_inactive=True):
|
|
joined = RepositoryBuild.select().join(Repository)
|
|
filtered = joined
|
|
if not include_inactive:
|
|
filtered = filtered.where(RepositoryBuild.phase != 'error',
|
|
RepositoryBuild.phase != 'complete')
|
|
fetched = list(filtered.where(Repository.name == repository_name,
|
|
Repository.namespace == namespace_name))
|
|
return fetched
|
|
|
|
|
|
def create_repository_build(repo, access_token, resource_key, tag):
|
|
return RepositoryBuild.create(repository=repo, access_token=access_token,
|
|
resource_key=resource_key, tag=tag)
|