diff --git a/data/model.py b/data/model.py index 2e0a9241f..9fcc608ea 100644 --- a/data/model.py +++ b/data/model.py @@ -112,15 +112,39 @@ def create_team(name, org, team_role_name, description=''): description=description) -def remove_team(org_name, team_name): - found = list(Team.select().join(User).where(User.organization == True, - User.username == org_name, - Team.name == team_name)) +def __get_user_admin_teams(org_name, team_name, username): + Org = User.alias() + user_teams = Team.select().join(TeamMember).join(User) + with_org = user_teams.switch(Team).join(Org, + on=(Org.id == Team.organization)) + with_role = with_org.switch(Team).join(TeamRole) + admin_teams = with_role.where(User.username == username, + Org.username == org_name, + TeamRole.name == 'admin') + return admin_teams + + +def remove_team(org_name, team_name, removed_by_username): + joined = Team.select(Team, TeamRole).join(User).switch(Team).join(TeamRole) + + found = list(joined.where(User.organization == True, + User.username == org_name, + Team.name == team_name)) if not found: - raise InvalidTeamException('Team named: %s is not a team in org: %s' % + raise InvalidTeamException('Team \'%s\' is not a team in org \'%s\'' % (team_name, org_name)) team = found[0] + if team.role.name == 'admin': + admin_teams = list(__get_user_admin_teams(org_name, team_name, + removed_by_username)) + + if len(admin_teams) <= 1: + # The team we are trying to remove is the only admin team for this user + msg = ('Deleting team \'%s\' would remove all admin from user \'%s\'' % + (team_name, removed_by_username)) + raise DataModelException(msg) + team.delete_instance(recursive=True, delete_nullable=True) @@ -128,18 +152,47 @@ def add_user_to_team(user, team): return TeamMember.create(user=user, team=team) -def remove_user_from_team(user, team): - try: - found = TeamMember.get(user=user, team=team) - found.delete_instance() - except TeamMember.DoesNotExist: - raise InvalidTeamException('User does not belong to team.') +def remove_user_from_team(org_name, team_name, username, removed_by_username): + Org = User.alias() + joined = TeamMember.select().join(User).switch(TeamMember).join(Team) + with_role = joined.join(TeamRole) + with_org = with_role.switch(Team).join(Org, + on=(Org.id == Team.organization)) + found = list(with_org.where(User.username == username, + Org.username == org_name, + Team.name == team_name)) + + if not found: + raise DataModelException('User %s does not belong to team %s' % + (username, teamname)) + + if username == removed_by_username: + admin_team_query = __get_user_admin_teams(org_name, team_name, username) + admin_team_names = {team.name for team in admin_team_query} + if team_name in admin_team_names and len(admin_team_names) <= 1: + msg = 'User cannot remove themselves from their only admin team.' + raise DataModelException(msg) + + user_in_team = found[0] + user_in_team.delete_instance() def get_team_org_role(team): return TeamRole.get(TeamRole.id == team.role.id) -def set_team_org_permission(team, team_role_name): + +def set_team_org_permission(team, team_role_name, set_by_username): + if team.role.name == 'admin' and team_role_name != 'admin': + # We need to make sure we're not removing the users only admin role + user_admin_teams = __get_user_admin_teams(team.organization.username, + team.name, set_by_username) + admin_team_set = {admin_team.name for admin_team in user_admin_teams} + if team.name in admin_team_set and len(admin_team_set) <= 1: + msg = (('Cannot remove admin from team \'%s\' because calling user ' + + 'would no longer have admin on org \'%s\'') % + (team.name, team.organization.username)) + raise DataModelException(msg) + new_role = TeamRole.get(TeamRole.name == team_role_name) team.role = new_role team.save() diff --git a/endpoints/api.py b/endpoints/api.py index 9e228ddb5..e13de3e8e 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -341,9 +341,13 @@ def update_organization_team(orgname, teamname): team.description = json['description'] team.save() if 'role' in json: - team = model.set_team_org_permission(team, json['role']) + team = model.set_team_org_permission(team, json['role'], + current_user.db_user().username) - return jsonify(team_view(orgname, team)) + resp = jsonify(team_view(orgname, team)) + if not is_existing: + resp.status_code = 201 + return resp abort(403) @@ -353,7 +357,7 @@ def update_organization_team(orgname, teamname): def delete_organization_team(orgname, teamname): permission = AdministerOrganizationPermission(orgname) if permission.can(): - model.remove_team(orgname, teamname) + model.remove_team(orgname, teamname, current_user.db_user().username) return make_response('Deleted', 204) abort(403) @@ -415,22 +419,9 @@ def update_organization_team_member(orgname, teamname, membername): def delete_organization_team_member(orgname, teamname, membername): permission = AdministerOrganizationPermission(orgname) if permission.can(): - team = None - user = None - - # Find the team. - try: - team = model.get_organization_team(orgname, teamname) - except: - abort(404) - - # Find the user. - user = model.get_user(membername) - if not user: - abort(400) - # Remote the user from the team. - model.remove_user_from_team(user, team) + invoking_user = current_user.db_user().username + model.remove_user_from_team(orgname, teamname, membername, invoking_user) return make_response('Deleted', 204) abort(403)