311 lines
		
	
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			311 lines
		
	
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import string
 | |
| import logging
 | |
| import json
 | |
| import os
 | |
| 
 | |
| from random import SystemRandom
 | |
| from app import app, avatar, superusers
 | |
| from flask import request
 | |
| 
 | |
| from endpoints.api import (ApiResource, nickname, resource, validate_json_request, request_error,
 | |
|                            log_action, internal_only, NotFound, require_user_admin, format_date,
 | |
|                            InvalidToken, require_scope, format_date, hide_if, show_if, parse_args,
 | |
|                            query_param, abort, require_fresh_login, path_param, verify_not_prod)
 | |
| 
 | |
| from endpoints.api.logs import get_logs
 | |
| 
 | |
| from data import model
 | |
| from auth.permissions import SuperUserPermission
 | |
| from auth.auth_context import get_authenticated_user
 | |
| from util.useremails import send_confirmation_email, send_recovery_email
 | |
| 
 | |
| import features
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| def get_immediate_subdirectories(directory):
 | |
|   return [name for name in os.listdir(directory) if os.path.isdir(os.path.join(directory, name))]
 | |
| 
 | |
| def get_services():
 | |
|   services = set(get_immediate_subdirectories(app.config['SYSTEM_SERVICES_PATH']))
 | |
|   services = services - set(app.config['SYSTEM_SERVICE_BLACKLIST'])
 | |
|   return services
 | |
| 
 | |
| 
 | |
| @resource('/v1/superuser/systemlogs/<service>')
 | |
| @internal_only
 | |
| @show_if(features.SUPER_USERS)
 | |
| class SuperUserGetLogsForService(ApiResource):
 | |
|   """ Resource for fetching the kinds of system logs in the system. """
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('getSystemLogs')
 | |
|   def get(self, service):
 | |
|     """ Returns the logs for the specific service. """
 | |
|     if SuperUserPermission().can():
 | |
|       if not service in get_services():
 | |
|         abort(404)
 | |
| 
 | |
|       try:
 | |
|         with open(app.config['SYSTEM_SERVICE_LOGS_PATH'] % service, 'r') as f:
 | |
|           logs = f.read()
 | |
|       except Exception as ex:
 | |
|         logger.exception('Cannot read logs')
 | |
|         abort(400)
 | |
| 
 | |
|       return {
 | |
|         'logs': logs
 | |
|       }
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
| 
 | |
| @resource('/v1/superuser/systemlogs/')
 | |
| @internal_only
 | |
| @show_if(features.SUPER_USERS)
 | |
| class SuperUserSystemLogServices(ApiResource):
 | |
|   """ Resource for fetching the kinds of system logs in the system. """
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('listSystemLogServices')
 | |
|   def get(self):
 | |
|     """ List the system logs for the current system. """
 | |
|     if SuperUserPermission().can():
 | |
|       return {
 | |
|         'services': list(get_services())
 | |
|       }
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
| 
 | |
| 
 | |
| @resource('/v1/superuser/logs')
 | |
| @internal_only
 | |
| @show_if(features.SUPER_USERS)
 | |
| class SuperUserLogs(ApiResource):
 | |
|   """ Resource for fetching all logs in the system. """
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('listAllLogs')
 | |
|   @parse_args
 | |
|   @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
 | |
|   @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
 | |
|   @query_param('performer', 'Username for which to filter logs.', type=str)
 | |
|   def get(self, args):
 | |
|     """ List the usage logs for the current system. """
 | |
|     if SuperUserPermission().can():
 | |
|       performer_name = args['performer']
 | |
|       start_time = args['starttime']
 | |
|       end_time = args['endtime']
 | |
| 
 | |
|       return get_logs(start_time, end_time)
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
| 
 | |
| def user_view(user):
 | |
|   return  {
 | |
|     'username': user.username,
 | |
|     'email': user.email,
 | |
|     'verified': user.verified,
 | |
|     'avatar': avatar.get_data_for_user(user),
 | |
|     'super_user': superusers.is_superuser(user.username)
 | |
|   }
 | |
| 
 | |
| @resource('/v1/superuser/usage/')
 | |
| @internal_only
 | |
| @show_if(features.SUPER_USERS)
 | |
| class UsageInformation(ApiResource):
 | |
|   """ Resource for returning the usage information for enterprise customers. """
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('getSystemUsage')
 | |
|   def get(self):
 | |
|     """ Returns the number of repository handles currently held. """
 | |
|     if SuperUserPermission().can():
 | |
|       return {
 | |
|         'usage': model.get_repository_usage(),
 | |
|         'allowed': app.config.get('MAXIMUM_REPOSITORY_USAGE', 20)
 | |
|       }
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
| 
 | |
| 
 | |
| @resource('/v1/superuser/users/')
 | |
| @internal_only
 | |
| @show_if(features.SUPER_USERS)
 | |
| class SuperUserList(ApiResource):
 | |
|   """ Resource for listing users in the system. """
 | |
|   schemas = {
 | |
|     'CreateInstallUser': {
 | |
|       'id': 'CreateInstallUser',
 | |
|       'description': 'Data for creating a user',
 | |
|       'required': ['username', 'email'],
 | |
|       'properties': {
 | |
|         'username': {
 | |
|           'type': 'string',
 | |
|           'description': 'The username of the user being created'
 | |
|         },
 | |
| 
 | |
|         'email': {
 | |
|           'type': 'string',
 | |
|           'description': 'The email address of the user being created'
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('listAllUsers')
 | |
|   def get(self):
 | |
|     """ Returns a list of all users in the system. """
 | |
|     if SuperUserPermission().can():
 | |
|       users = model.get_active_users()
 | |
|       return {
 | |
|         'users': [user_view(user) for user in users]
 | |
|       }
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
| 
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('createInstallUser')
 | |
|   @validate_json_request('CreateInstallUser')
 | |
|   def post(self):
 | |
|     """ Creates a new user. """
 | |
|     user_information = request.get_json()
 | |
|     if SuperUserPermission().can():
 | |
|       username = user_information['username']
 | |
|       email = user_information['email']
 | |
| 
 | |
|       # Generate a temporary password for the user.
 | |
|       random = SystemRandom()
 | |
|       password =  ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(32)])
 | |
| 
 | |
|       # Create the user.
 | |
|       user = model.create_user(username, password, email, auto_verify=not features.MAILING)
 | |
| 
 | |
|       # If mailing is turned on, send the user a verification email.
 | |
|       if features.MAILING:
 | |
|         confirmation = model.create_confirm_email_code(user)
 | |
|         send_confirmation_email(user.username, user.email, confirmation.code)
 | |
| 
 | |
|       return {
 | |
|         'username': username,
 | |
|         'email': email,
 | |
|         'password': password
 | |
|       }
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
| 
 | |
| @resource('/v1/superusers/users/<username>/sendrecovery')
 | |
| @internal_only
 | |
| @show_if(features.SUPER_USERS)
 | |
| @show_if(features.MAILING)
 | |
| class SuperUserSendRecoveryEmail(ApiResource):
 | |
|   """ Resource for sending a recovery user on behalf of a user. """
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('sendInstallUserRecoveryEmail')
 | |
|   def post(self, username):
 | |
|     if SuperUserPermission().can():
 | |
|       user = model.get_user(username)
 | |
|       if not user or user.organization or user.robot:
 | |
|         abort(404)
 | |
| 
 | |
|       if superusers.is_superuser(username):
 | |
|           abort(403)
 | |
| 
 | |
|       code = model.create_reset_password_email_code(user.email)
 | |
|       send_recovery_email(user.email, code.code)
 | |
|       return {
 | |
|         'email': user.email
 | |
|       }
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
| 
 | |
| @resource('/v1/superuser/users/<username>')
 | |
| @path_param('username', 'The username of the user being managed')
 | |
| @internal_only
 | |
| @show_if(features.SUPER_USERS)
 | |
| class SuperUserManagement(ApiResource):
 | |
|   """ Resource for managing users in the system. """
 | |
|   schemas = {
 | |
|     'UpdateUser': {
 | |
|       'id': 'UpdateUser',
 | |
|       'type': 'object',
 | |
|       'description': 'Description of updates for a user',
 | |
|       'properties': {
 | |
|         'password': {
 | |
|           'type': 'string',
 | |
|           'description': 'The new password for the user',
 | |
|         },
 | |
|         'email': {
 | |
|           'type': 'string',
 | |
|           'description': 'The new e-mail address for the user',
 | |
|         }
 | |
|       },
 | |
|     },
 | |
|   }
 | |
| 
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('getInstallUser')
 | |
|   def get(self, username):
 | |
|     """ Returns information about the specified user. """
 | |
|     if SuperUserPermission().can():
 | |
|       user = model.get_user(username)
 | |
|       if not user or user.organization or user.robot:
 | |
|         abort(404)
 | |
| 
 | |
|       return user_view(user)
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('deleteInstallUser')
 | |
|   def delete(self, username):
 | |
|     """ Deletes the specified user. """
 | |
|     if SuperUserPermission().can():
 | |
|       user = model.get_user(username)
 | |
|       if not user or user.organization or user.robot:
 | |
|         abort(404)
 | |
| 
 | |
|       if superusers.is_superuser(username):
 | |
|           abort(403)
 | |
| 
 | |
|       model.delete_user(user)
 | |
|       return 'Deleted', 204
 | |
| 
 | |
|     abort(403)
 | |
| 
 | |
|   @require_fresh_login
 | |
|   @verify_not_prod
 | |
|   @nickname('changeInstallUser')
 | |
|   @validate_json_request('UpdateUser')
 | |
|   def put(self, username):
 | |
|     """ Updates information about the specified user. """
 | |
|     if SuperUserPermission().can():
 | |
|         user = model.get_user(username)
 | |
|         if not user or user.organization or user.robot:
 | |
|           abort(404)
 | |
| 
 | |
|         if superusers.is_superuser(username):
 | |
|           abort(403)
 | |
| 
 | |
|         user_data = request.get_json()
 | |
|         if 'password' in user_data:
 | |
|           model.change_password(user, user_data['password'])
 | |
| 
 | |
|         if 'email' in user_data:
 | |
|           model.update_email(user, user_data['email'])
 | |
| 
 | |
|         return user_view(user)
 | |
| 
 | |
|     abort(403)
 |