261 lines
		
	
	
	
		
			9.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			261 lines
		
	
	
	
		
			9.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """ Manage user and organization robot accounts. """
 | |
| 
 | |
| from endpoints.api import (resource, nickname, ApiResource, log_action, related_user_resource,
 | |
|                            require_user_admin, require_scope, path_param, parse_args,
 | |
|                            truthy_bool, query_param)
 | |
| from endpoints.exception import Unauthorized
 | |
| from auth.permissions import AdministerOrganizationPermission, OrganizationMemberPermission
 | |
| from auth.auth_context import get_authenticated_user
 | |
| from auth import scopes
 | |
| from data import model
 | |
| from data.database import User, Team, Repository, FederatedLogin
 | |
| from util.names import format_robot_username
 | |
| from flask import abort
 | |
| from app import avatar
 | |
| 
 | |
def robot_view(name, token):
  """ Builds the API view for a single robot: its name together with its token. """
  view = dict(name=name, token=token)
  return view
 | |
| 
 | |
| 
 | |
def permission_view(permission):
  """ Builds the API view for a repository permission.

      The view holds the repository's name, whether its visibility is named 'public', and the
      name of the permission's role.
  """
  repository = permission.repository
  repository_view = {
    'name': repository.name,
    'is_public': repository.visibility.name == 'public',
  }
  return {'repository': repository_view, 'role': permission.role.name}
 | |
| 
 | |
| 
 | |
def robots_list(prefix, include_permissions=False):
  """ Returns a dict with a single 'robots' key holding a list of robot views for the given
      prefix (the callers in this file pass a username or an organization name).

      Every robot view has a 'name' and a 'token'. When include_permissions is set, each view
      also has 'teams' (a list of dicts with the team 'name' and 'avatar' data) and
      'repositories' (a list of repository names); both lists are de-duplicated.
  """
  tuples = model.user.list_entity_robot_permission_teams(prefix,
                                                         include_permissions=include_permissions)

  robots = {}

  # (robot name, team name) pairs already emitted. A tuple key cannot collide the way a
  # ':'-joined string could if either name contained the separator.
  seen_robot_teams = set()

  # NOTE(review): the same robot appears to come back in several rows (presumably one per
  # team/repository combination), hence the de-duplication below -- confirm against the model.
  for robot_tuple in tuples:
    robot_name = robot_tuple.get(User.username)
    if robot_name not in robots:
      robots[robot_name] = {
        'name': robot_name,
        'token': robot_tuple.get(FederatedLogin.service_ident)
      }

      if include_permissions:
        robots[robot_name].update({
          'teams': [],
          'repositories': []
        })

    if not include_permissions:
      continue

    robot = robots[robot_name]
    team_name = robot_tuple.get(Team.name)
    repository_name = robot_tuple.get(Repository.name)

    if team_name is not None:
      team_key = (robot_name, team_name)
      if team_key not in seen_robot_teams:
        seen_robot_teams.add(team_key)
        robot['teams'].append({
          'name': team_name,
          'avatar': avatar.get_data(team_name, team_name, 'team')
        })

    if repository_name is not None and repository_name not in robot['repositories']:
      robot['repositories'].append(repository_name)

  # Materialize the values: on Python 3 dict.values() is a view object, not a list.
  return {'robots': list(robots.values())}
 | |
| 
 | |
@resource('/v1/user/robots')
class UserRobotList(ApiResource):
  """ Resource for listing user robots. """
  @require_user_admin
  @nickname('getUserRobots')
  @parse_args()
  @query_param('permissions',
               'Whether to include repositories and teams in which the robots have permission.',
               type=truthy_bool, default=False)
  def get(self, parsed_args):
    """ List the available robots for the user. """
    # The robots are looked up under the authenticated user's own username.
    user = get_authenticated_user()
    include_permissions = parsed_args.get('permissions', False)
    return robots_list(user.username, include_permissions=include_permissions)
 | |
| 
 | |
| 
 | |
@resource('/v1/user/robots/<robot_shortname>')
@path_param('robot_shortname',
            'The short name for the robot, without any user or organization prefix')
class UserRobot(ApiResource):
  """ Resource for managing a user's robots. """
  @require_user_admin
  @nickname('getUserRobot')
  def get(self, robot_shortname):
    """ Returns the user's robot with the specified name. """
    owner = get_authenticated_user()
    robot, token = model.user.get_robot(robot_shortname, owner)
    return robot_view(robot.username, token)

  @require_user_admin
  @nickname('createUserRobot')
  def put(self, robot_shortname):
    """ Create a new user robot with the specified name. """
    owner = get_authenticated_user()
    robot, token = model.user.create_robot(robot_shortname, owner)

    # Record the creation only once the model call has succeeded.
    log_action('create_robot', owner.username, {'robot': robot_shortname})
    return robot_view(robot.username, token), 201

  @require_user_admin
  @nickname('deleteUserRobot')
  def delete(self, robot_shortname):
    """ Delete an existing robot. """
    owner = get_authenticated_user()

    # The model deletes by full robot username, built from the owner's name and the short name.
    robot_username = format_robot_username(owner.username, robot_shortname)
    model.user.delete_robot(robot_username)

    log_action('delete_robot', owner.username, {'robot': robot_shortname})
    return 'Deleted', 204
 | |
| 
 | |
| 
 | |
@resource('/v1/organization/<orgname>/robots')
@path_param('orgname', 'The name of the organization')
@related_user_resource(UserRobotList)
class OrgRobotList(ApiResource):
  """ Resource for listing an organization's robots. """
  @require_scope(scopes.ORG_ADMIN)
  @nickname('getOrgRobots')
  @parse_args()
  @query_param('permissions',
               'Whether to include repositories and teams in which the robots have permission.',
               type=truthy_bool, default=False)
  def get(self, orgname, parsed_args):
    """ List the organization's robots.

        Raises Unauthorized when the OrganizationMemberPermission check for the organization
        fails.
    """
    # NOTE(review): robots_list includes each robot's token, and this endpoint only requires
    # the member permission, whereas fetching a single org robot (and its token) requires
    # AdministerOrganizationPermission. Confirm that exposing tokens here is intended.
    if not OrganizationMemberPermission(orgname).can():
      raise Unauthorized()

    include_permissions = parsed_args.get('permissions', False)
    return robots_list(orgname, include_permissions=include_permissions)
 | |
| 
 | |
| 
 | |
@resource('/v1/organization/<orgname>/robots/<robot_shortname>')
@path_param('orgname', 'The name of the organization')
@path_param('robot_shortname',
            'The short name for the robot, without any user or organization prefix')
@related_user_resource(UserRobot)
class OrgRobot(ApiResource):
  """ Resource for managing an organization's robots. """
  @require_scope(scopes.ORG_ADMIN)
  @nickname('getOrgRobot')
  def get(self, orgname, robot_shortname):
    """ Returns the organization's robot with the specified name. """
    if not AdministerOrganizationPermission(orgname).can():
      raise Unauthorized()

    organization = model.organization.get_organization(orgname)
    robot, token = model.user.get_robot(robot_shortname, organization)
    return robot_view(robot.username, token)

  @require_scope(scopes.ORG_ADMIN)
  @nickname('createOrgRobot')
  def put(self, orgname, robot_shortname):
    """ Create a new robot in the organization. """
    if not AdministerOrganizationPermission(orgname).can():
      raise Unauthorized()

    organization = model.organization.get_organization(orgname)
    robot, token = model.user.create_robot(robot_shortname, organization)

    # Record the creation only once the model call has succeeded.
    log_action('create_robot', orgname, {'robot': robot_shortname})
    return robot_view(robot.username, token), 201

  @require_scope(scopes.ORG_ADMIN)
  @nickname('deleteOrgRobot')
  def delete(self, orgname, robot_shortname):
    """ Delete an existing organization robot. """
    if not AdministerOrganizationPermission(orgname).can():
      raise Unauthorized()

    # The model deletes by full robot username, built from the org name and the short name.
    robot_username = format_robot_username(orgname, robot_shortname)
    model.user.delete_robot(robot_username)

    log_action('delete_robot', orgname, {'robot': robot_shortname})
    return 'Deleted', 204
 | |
| 
 | |
| 
 | |
@resource('/v1/user/robots/<robot_shortname>/permissions')
@path_param('robot_shortname',
            'The short name for the robot, without any user or organization prefix')
class UserRobotPermissions(ApiResource):
  """ Resource for listing the permissions a user's robot has in the system. """
  @require_user_admin
  @nickname('getUserRobotPermissions')
  def get(self, robot_shortname):
    """ Returns the list of repository permissions for the user's robot. """
    owner = get_authenticated_user()

    # Only the robot itself is needed here; the second element of the pair is ignored.
    robot, _ = model.user.get_robot(robot_shortname, owner)

    views = []
    for robot_permission in model.permission.list_robot_permissions(robot.username):
      views.append(permission_view(robot_permission))

    return {'permissions': views}
 | |
| 
 | |
| 
 | |
@resource('/v1/organization/<orgname>/robots/<robot_shortname>/permissions')
@path_param('orgname', 'The name of the organization')
@path_param('robot_shortname',
            'The short name for the robot, without any user or organization prefix')
@related_user_resource(UserRobotPermissions)
class OrgRobotPermissions(ApiResource):
  """ Resource for listing the permissions an org's robot has in the system. """
  # NOTE(review): the other organization resources in this file use
  # require_scope(scopes.ORG_ADMIN) and raise Unauthorized(); this one uses require_user_admin
  # and abort(403). Kept as-is to preserve behavior -- confirm whether that is intended.
  @require_user_admin
  @nickname('getOrgRobotPermissions')
  def get(self, orgname, robot_shortname):
    """ Returns the list of repository permissions for the org's robot. """
    if not AdministerOrganizationPermission(orgname).can():
      abort(403)

    organization = model.organization.get_organization(orgname)

    # Only the robot itself is needed here; the second element of the pair is ignored.
    robot, _ = model.user.get_robot(robot_shortname, organization)

    views = []
    for robot_permission in model.permission.list_robot_permissions(robot.username):
      views.append(permission_view(robot_permission))

    return {'permissions': views}
 | |
| 
 | |
| 
 | |
@resource('/v1/user/robots/<robot_shortname>/regenerate')
@path_param('robot_shortname',
            'The short name for the robot, without any user or organization prefix')
class RegenerateUserRobot(ApiResource):
  """ Resource for regenerating a user's robot's token. """
  @require_user_admin
  @nickname('regenerateUserRobotToken')
  def post(self, robot_shortname):
    """ Regenerates the token for a user's robot. """
    parent = get_authenticated_user()
    robot, password = model.user.regenerate_robot_token(robot_shortname, parent)

    # Record the regeneration only once the model call has succeeded.
    log_action('regenerate_robot_token', parent.username, {'robot': robot_shortname})
    return robot_view(robot.username, password)
 | |
| 
 | |
| 
 | |
@resource('/v1/organization/<orgname>/robots/<robot_shortname>/regenerate')
@path_param('orgname', 'The name of the organization')
@path_param('robot_shortname',
            'The short name for the robot, without any user or organization prefix')
@related_user_resource(RegenerateUserRobot)
class RegenerateOrgRobot(ApiResource):
  """ Resource for regenerating an organization's robot's token. """
  @require_scope(scopes.ORG_ADMIN)
  @nickname('regenerateOrgRobotToken')
  def post(self, orgname, robot_shortname):
    """ Regenerates the token for an organization robot. """
    if not AdministerOrganizationPermission(orgname).can():
      raise Unauthorized()

    organization = model.organization.get_organization(orgname)
    robot, token = model.user.regenerate_robot_token(robot_shortname, organization)

    # Record the regeneration only once the model call has succeeded.
    log_action('regenerate_robot_token', orgname, {'robot': robot_shortname})
    return robot_view(robot.username, token)
 |