253 lines
9.6 KiB
Python
253 lines
9.6 KiB
Python
""" Manage user and organization robot accounts. """
|
|
|
|
from endpoints.api import (resource, nickname, ApiResource, log_action, related_user_resource,
|
|
Unauthorized, require_user_admin, internal_only, require_scope,
|
|
path_param, parse_args, truthy_bool, query_param)
|
|
from auth.permissions import AdministerOrganizationPermission, OrganizationMemberPermission
|
|
from auth.auth_context import get_authenticated_user
|
|
from auth import scopes
|
|
from data import model
|
|
from data.database import User, Team, Repository, FederatedLogin
|
|
from util.names import format_robot_username
|
|
from flask import abort
|
|
from app import avatar
|
|
|
|
def robot_view(name, token):
    """ Build the API representation of a single robot account.

    The result is a dict holding the robot's name under 'name' and its
    token under 'token'.
    """
    return dict(name=name, token=token)
|
|
|
|
|
|
def permission_view(permission):
    """ Build the API representation of a repository permission.

    Reads the repository's name and visibility name, plus the role name,
    from the given permission object. 'is_public' is true only when the
    visibility name is exactly 'public'.
    """
    repo = permission.repository
    repo_info = {
        'name': repo.name,
        'is_public': repo.visibility.name == 'public',
    }
    return {'repository': repo_info, 'role': permission.role.name}
|
|
|
|
|
|
def robots_list(prefix, include_permissions=False):
    """ List the robots found for the entity named by `prefix`.

    The rows returned by `model.list_entity_robot_permission_teams` are folded
    into one entry per robot name. Each entry always carries 'name' and
    'token'. When `include_permissions` is true, each entry additionally
    carries 'teams' (dicts with 'name' and 'avatar') and 'repositories'
    (repository names), de-duplicated per robot in first-seen order.

    Returns a dict of the form {'robots': [entry, ...]}.
    """
    tuples = model.list_entity_robot_permission_teams(
        prefix, include_permissions=include_permissions)

    robots = {}
    # (robot name, team name) / (robot name, repository name) pairs already
    # recorded. Tuple keys cannot collide the way a ':'-joined string could,
    # and set membership avoids rescanning the per-robot lists on every row.
    seen_teams = set()
    seen_repositories = set()

    for robot_tuple in tuples:
        robot_name = robot_tuple.get(User.username)
        if robot_name not in robots:
            robots[robot_name] = {
                'name': robot_name,
                'token': robot_tuple.get(FederatedLogin.service_ident),
            }

            if include_permissions:
                robots[robot_name].update({
                    'teams': [],
                    'repositories': [],
                })

        if not include_permissions:
            continue

        entry = robots[robot_name]
        team_name = robot_tuple.get(Team.name)
        repository_name = robot_tuple.get(Repository.name)

        if team_name is not None:
            team_key = (robot_name, team_name)
            if team_key not in seen_teams:
                seen_teams.add(team_key)
                entry['teams'].append({
                    'name': team_name,
                    'avatar': avatar.get_data(team_name, team_name, 'team'),
                })

        if repository_name is not None:
            repository_key = (robot_name, repository_name)
            if repository_key not in seen_repositories:
                seen_repositories.add(repository_key)
                entry['repositories'].append(repository_name)

    # Materialize the values: on Python 3 `dict.values()` is a view object,
    # which is not a list and cannot be JSON-serialized directly.
    return {'robots': list(robots.values())}
|
|
|
|
@resource('/v1/user/robots')
class UserRobotList(ApiResource):
    """ Resource for listing the robots of the current user. """

    @require_user_admin
    @nickname('getUserRobots')
    @parse_args
    @query_param(
        'permissions',
        'Whether to include repostories and teams in which the robots have permission.',
        type=truthy_bool,
        default=False)
    def get(self, args):
        """ List the robots available to the authenticated user. """
        # The listing is keyed on the authenticated user's own username.
        owner_name = get_authenticated_user().username
        with_permissions = args.get('permissions', False)
        return robots_list(owner_name, include_permissions=with_permissions)
|
|
|
|
|
|
@resource('/v1/user/robots/<robot_shortname>')
@path_param(
    'robot_shortname',
    'The short name for the robot, without any user or organization prefix')
class UserRobot(ApiResource):
    """ Resource for reading, creating and deleting a user's robots. """

    @require_user_admin
    @nickname('getUserRobot')
    def get(self, robot_shortname):
        """ Return the authenticated user's robot with the given short name. """
        owner = get_authenticated_user()
        robot, token = model.get_robot(robot_shortname, owner)
        return robot_view(robot.username, token)

    @require_user_admin
    @nickname('createUserRobot')
    def put(self, robot_shortname):
        """ Create a new robot with the given short name for the user. """
        owner = get_authenticated_user()
        robot, token = model.create_robot(robot_shortname, owner)
        # Log under the owner's username, recording the robot's short name.
        log_action('create_robot', owner.username, {'robot': robot_shortname})
        return robot_view(robot.username, token), 201

    @require_user_admin
    @nickname('deleteUserRobot')
    def delete(self, robot_shortname):
        """ Delete one of the user's existing robots. """
        owner = get_authenticated_user()
        # delete_robot is given the name produced by format_robot_username,
        # not the bare short name.
        full_name = format_robot_username(owner.username, robot_shortname)
        model.delete_robot(full_name)
        log_action('delete_robot', owner.username, {'robot': robot_shortname})
        return 'Deleted', 204
|
|
|
|
|
|
@resource('/v1/organization/<orgname>/robots')
@path_param('orgname', 'The name of the organization')
@related_user_resource(UserRobotList)
class OrgRobotList(ApiResource):
    """ Resource for listing the robots of an organization. """

    @require_scope(scopes.ORG_ADMIN)
    @nickname('getOrgRobots')
    @parse_args
    @query_param(
        'permissions',
        'Whether to include repostories and teams in which the robots have permission.',
        type=truthy_bool,
        default=False)
    def get(self, args, orgname):
        """ List the robots belonging to the organization. """
        # Listing is gated on the organization *member* permission.
        if not OrganizationMemberPermission(orgname).can():
            raise Unauthorized()

        with_permissions = args.get('permissions', False)
        return robots_list(orgname, include_permissions=with_permissions)
|
|
|
|
|
|
@resource('/v1/organization/<orgname>/robots/<robot_shortname>')
@path_param('orgname', 'The name of the organization')
@path_param(
    'robot_shortname',
    'The short name for the robot, without any user or organization prefix')
@related_user_resource(UserRobot)
class OrgRobot(ApiResource):
    """ Resource for reading, creating and deleting an organization's robots. """

    @require_scope(scopes.ORG_ADMIN)
    @nickname('getOrgRobot')
    def get(self, orgname, robot_shortname):
        """ Return the organization's robot with the given short name. """
        if not AdministerOrganizationPermission(orgname).can():
            raise Unauthorized()

        organization = model.get_organization(orgname)
        robot, token = model.get_robot(robot_shortname, organization)
        return robot_view(robot.username, token)

    @require_scope(scopes.ORG_ADMIN)
    @nickname('createOrgRobot')
    def put(self, orgname, robot_shortname):
        """ Create a new robot inside the organization. """
        if not AdministerOrganizationPermission(orgname).can():
            raise Unauthorized()

        organization = model.get_organization(orgname)
        robot, token = model.create_robot(robot_shortname, organization)
        # Log under the organization name, recording the robot's short name.
        log_action('create_robot', orgname, {'robot': robot_shortname})
        return robot_view(robot.username, token), 201

    @require_scope(scopes.ORG_ADMIN)
    @nickname('deleteOrgRobot')
    def delete(self, orgname, robot_shortname):
        """ Delete one of the organization's existing robots. """
        if not AdministerOrganizationPermission(orgname).can():
            raise Unauthorized()

        # delete_robot is given the name produced by format_robot_username,
        # not the bare short name.
        full_name = format_robot_username(orgname, robot_shortname)
        model.delete_robot(full_name)
        log_action('delete_robot', orgname, {'robot': robot_shortname})
        return 'Deleted', 204
|
|
|
|
|
|
@resource('/v1/user/robots/<robot_shortname>/permissions')
@path_param(
    'robot_shortname',
    'The short name for the robot, without any user or organization prefix')
class UserRobotPermissions(ApiResource):
    """ Resource for listing the permissions held by a user's robot. """

    @require_user_admin
    @nickname('getUserRobotPermissions')
    def get(self, robot_shortname):
        """ Return the repository permissions of the user's robot. """
        owner = get_authenticated_user()
        # Only the robot itself is needed here; its token is not returned.
        robot, _token = model.get_robot(robot_shortname, owner)
        granted = model.list_robot_permissions(robot.username)
        views = [permission_view(entry) for entry in granted]
        return {'permissions': views}
|
|
|
|
|
|
@resource('/v1/organization/<orgname>/robots/<robot_shortname>/permissions')
@path_param('orgname', 'The name of the organization')
@path_param('robot_shortname', 'The short name for the robot, without any user or organization prefix')
@related_user_resource(UserRobotPermissions)
class OrgRobotPermissions(ApiResource):
    """ Resource for listing the permissions an org's robot has in the system. """

    # NOTE(review): the other organization resources in this file are guarded
    # with @require_scope(scopes.ORG_ADMIN); this one uses @require_user_admin.
    # Left unchanged here -- confirm which scope is intended.
    @require_user_admin
    @nickname('getOrgRobotPermissions')
    def get(self, orgname, robot_shortname):
        """ Returns the list of repository permissions for the org's robot. """
        permission = AdministerOrganizationPermission(orgname)
        if permission.can():
            parent = model.get_organization(orgname)
            # Only the robot itself is needed; its token is not returned.
            robot, _password = model.get_robot(robot_shortname, parent)
            permissions = model.list_robot_permissions(robot.username)

            return {
                'permissions': [permission_view(entry) for entry in permissions]
            }

        # Deny the same way every other organization handler in this file
        # does, instead of a bare flask abort(403).
        raise Unauthorized()
|
|
|
|
|
|
@resource('/v1/user/robots/<robot_shortname>/regenerate')
@path_param(
    'robot_shortname',
    'The short name for the robot, without any user or organization prefix')
class RegenerateUserRobot(ApiResource):
    """ Resource for regenerating the token of a user's robot. """

    @require_user_admin
    @nickname('regenerateUserRobotToken')
    def post(self, robot_shortname):
        """ Regenerate the token of one of the user's robots. """
        owner = get_authenticated_user()
        robot, new_token = model.regenerate_robot_token(robot_shortname, owner)
        # Log under the owner's username, recording the robot's short name.
        log_action('regenerate_robot_token', owner.username, {'robot': robot_shortname})
        return robot_view(robot.username, new_token)
|
|
|
|
|
|
@resource('/v1/organization/<orgname>/robots/<robot_shortname>/regenerate')
@path_param('orgname', 'The name of the organization')
@path_param(
    'robot_shortname',
    'The short name for the robot, without any user or organization prefix')
@related_user_resource(RegenerateUserRobot)
class RegenerateOrgRobot(ApiResource):
    """ Resource for regenerating the token of an organization's robot. """

    @require_scope(scopes.ORG_ADMIN)
    @nickname('regenerateOrgRobotToken')
    def post(self, orgname, robot_shortname):
        """ Regenerate the token of one of the organization's robots. """
        if not AdministerOrganizationPermission(orgname).can():
            raise Unauthorized()

        organization = model.get_organization(orgname)
        robot, new_token = model.regenerate_robot_token(robot_shortname, organization)
        # Log under the organization name, recording the robot's short name.
        log_action('regenerate_robot_token', orgname, {'robot': robot_shortname})
        return robot_view(robot.username, new_token)
|