This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/model/organization.py
Joseph Schorr 462f47924e More detailed namespace validation
Fixes namespace validation to use the proper regex for checking length, as well as showing the proper messaging if the entered namespace is invalid

[Delivers #137830461]
2017-01-17 17:31:59 -05:00

147 lines
4.9 KiB
Python

from data.database import (User, FederatedLogin, TeamMember, Team, TeamRole, RepositoryPermission,
Repository, Namespace)
from data.model import (user, team, DataModelException, InvalidOrganizationException,
InvalidUsernameException, db_transaction, _basequery)
def create_organization(name, email, creating_user, email_required=True):
    """ Creates a new organization called `name` and makes `creating_user` one of its owners.

    The organization row is created via `user.create_user_noverify` and then flagged as an
    organization. An 'owners' team with the 'admin' role is created for it and
    `creating_user` is added to that team.

    All of the steps run inside a single `db_transaction`, so a failure part-way through
    (e.g. while creating the owners team) does not leave behind an organization that has
    no owners team.

    Returns the newly created organization.

    Raises InvalidOrganizationException if an InvalidUsernameException is raised while
    creating the organization.
    """
    with db_transaction():
        try:
            # Create the org.
            new_org = user.create_user_noverify(name, email, email_required=email_required)
            new_org.organization = True
            new_org.save()

            # Create a team for the owners.
            owners_team = team.create_team('owners', new_org, 'admin')

            # Add the user who created the org to the owners team.
            team.add_user_to_team(creating_user, owners_team)

            return new_org
        except InvalidUsernameException as iue:
            # `BaseException.message` only exists on Python 2; prefer it when present and
            # non-empty, otherwise fall back to the exception's string form so the reason
            # is never lost (and no AttributeError masks the real problem).
            reason = getattr(iue, 'message', None) or str(iue)
            raise InvalidOrganizationException(reason)
def get_organization(name):
    """ Looks up the organization whose username is `name`.

    Returns the matching User row (one with `organization=True`).

    Raises InvalidOrganizationException if no such row exists.
    """
    try:
        found = User.get(username=name, organization=True)
    except User.DoesNotExist:
        raise InvalidOrganizationException('Organization does not exist: %s' % name)

    return found
def convert_user_to_organization(user_obj, admin_user):
    """ Converts the user account `user_obj` into an organization administered by `admin_user`.

    Inside a single transaction this marks the account as an organization, clears its
    password hash, deletes its federated logins and its direct repository permissions,
    creates an 'owners' team with the 'admin' role and adds `admin_user` to that team.

    Returns `user_obj`.

    Raises DataModelException if `user_obj` is a robot account.
    """
    if user_obj.robot:
        raise DataModelException('Cannot convert a robot into an organization')

    with db_transaction():
        # Flag the account as an organization and disable it for login by dropping
        # its password hash.
        user_obj.organization = True
        user_obj.password_hash = None
        user_obj.save()

        # Remove every federated auth entry that points at this account.
        federated_cleanup = FederatedLogin.delete().where(FederatedLogin.user == user_obj)
        federated_cleanup.execute()

        # Remove the permissions this account held directly on repositories.
        permission_cleanup = RepositoryPermission.delete().where(
            RepositoryPermission.user == user_obj)
        permission_cleanup.execute()

        # Build the owners team, then put the designated admin on it.
        new_owners_team = team.create_team('owners', user_obj, 'admin')
        team.add_user_to_team(admin_user, new_owners_team)

    return user_obj
def get_user_organizations(username):
    """ Returns the result of `_basequery.get_user_organizations` for `username`. """
    organizations = _basequery.get_user_organizations(username)
    return organizations
def get_organization_team_members(teamid):
    """ Returns a query for the users that are members of the team with id `teamid`. """
    return (User
            .select()
            .join(TeamMember)
            .join(Team)
            .where(Team.id == teamid))
def __get_org_admin_users(org):
    """ Returns a query for the distinct non-robot users who belong to a team of `org`
        whose role is named 'admin'.
    """
    members_with_roles = User.select().join(TeamMember).join(Team).join(TeamRole)

    # `== False` is intentional: it is an ORM column comparison, not a Python truth test.
    admins_only = members_with_roles.where(Team.organization == org,
                                           TeamRole.name == 'admin',
                                           User.robot == False)
    return admins_only.distinct()
def get_admin_users(org):
    """ Returns the owner users for the organization.

    Public entry point for the module-private `__get_org_admin_users` query.
    """
    admin_users = __get_org_admin_users(org)
    return admin_users
def remove_organization_member(org, user_obj):
    """ Removes `user_obj` from the organization `org`.

    Deletes the user's direct permissions on repositories under the organization and the
    user's memberships in the organization's teams, both within one transaction.

    Raises DataModelException if `user_obj` is the organization's only admin user.
    """
    admin_names = [admin.username for admin in __get_org_admin_users(org)]
    is_sole_admin = len(admin_names) == 1 and user_obj.username in admin_names
    if is_sole_admin:
        raise DataModelException('Cannot remove user as they are the only organization admin')

    with db_transaction():
        # Collect, then delete, the user's permissions on repositories under the org.
        repo_permission_query = (RepositoryPermission
                                 .select(RepositoryPermission.id)
                                 .join(Repository)
                                 .where(Repository.namespace_user == org,
                                        RepositoryPermission.user == user_obj))
        repo_permissions = list(repo_permission_query)
        if repo_permissions:
            (RepositoryPermission
             .delete()
             .where(RepositoryPermission.id << repo_permissions)
             .execute())

        # Collect, then delete, the user's memberships in teams under the org.
        membership_query = (TeamMember
                            .select(TeamMember.id)
                            .join(Team)
                            .where(Team.organization == org, TeamMember.user == user_obj))
        memberships = list(membership_query)
        if memberships:
            (TeamMember
             .delete()
             .where(TeamMember.id << memberships)
             .execute())
def get_organization_member_set(orgname):
    """ Returns the set of usernames of all users who are members of at least one team
        belonging to the organization named `orgname`.
    """
    # The organization is itself stored in the User table, so an alias is needed to join
    # User (the members) against User (the organization).
    OrgAlias = User.alias()

    member_rows = (User
                   .select(User.username)
                   .join(TeamMember)
                   .join(Team)
                   .join(OrgAlias, on=(OrgAlias.id == Team.organization))
                   .where(OrgAlias.username == orgname)
                   .distinct())

    return {member.username for member in member_rows}
def get_all_repo_users_transitive_via_teams(namespace_name, repository_name):
    """ Returns a query for the distinct users who are members of a team that has a
        permission on the repository `namespace_name`/`repository_name`.
    """
    namespace_join = Repository.namespace_user == Namespace.id

    team_linked_users = (User
                         .select()
                         .distinct()
                         .join(TeamMember)
                         .join(Team)
                         .join(RepositoryPermission)
                         .join(Repository)
                         .join(Namespace, on=namespace_join))

    return team_linked_users.where(Namespace.username == namespace_name,
                                   Repository.name == repository_name)
def get_organizations():
    """ Returns a query for all User rows flagged as organizations and not as robots. """
    # The explicit `== True` / `== False` comparisons are ORM column comparisons, not
    # Python truth tests, so they must not be "simplified".
    is_organization = User.organization == True
    is_not_robot = User.robot == False
    return User.select().where(is_organization, is_not_robot)
def get_active_org_count():
    """ Returns the number of rows matched by the `get_organizations` query. """
    all_orgs = get_organizations()
    return all_orgs.count()
def add_user_as_admin(user_obj, org_obj):
    """ Adds `user_obj` to the team of `org_obj` whose role is the 'admin' TeamRole.

    If the user is already a member of that team, the call is a no-op.
    """
    try:
        role = TeamRole.get(name='admin')
        has_admin_role = Team.role == role
        in_organization = Team.organization == org_obj
        target_team = Team.select().where(has_admin_role, in_organization).get()
        team.add_user_to_team(user_obj, target_team)
    except team.UserAlreadyInTeam:
        # Already an admin team member: nothing to do.
        pass