""" Superuser API. """ import string import logging import os from random import SystemRandom from flask import request import features from app import app, avatar, superusers, authentication, config_provider from endpoints.api import (ApiResource, nickname, resource, validate_json_request, internal_only, require_scope, show_if, parse_args, query_param, abort, require_fresh_login, path_param, verify_not_prod, page_support) from endpoints.api.logs import get_logs, get_aggregate_logs from data import model from auth.permissions import SuperUserPermission from auth import scopes from util.useremails import send_confirmation_email, send_recovery_email logger = logging.getLogger(__name__) def get_immediate_subdirectories(directory): return [name for name in os.listdir(directory) if os.path.isdir(os.path.join(directory, name))] def get_services(): services = set(get_immediate_subdirectories(app.config['SYSTEM_SERVICES_PATH'])) services = services - set(app.config['SYSTEM_SERVICE_BLACKLIST']) return services @resource('/v1/superuser/systemlogs/') @internal_only @show_if(features.SUPER_USERS) class SuperUserGetLogsForService(ApiResource): """ Resource for fetching the kinds of system logs in the system. """ @require_fresh_login @verify_not_prod @nickname('getSystemLogs') @require_scope(scopes.SUPERUSER) def get(self, service): """ Returns the logs for the specific service. """ if SuperUserPermission().can(): if not service in get_services(): abort(404) logs = [] try: with open(app.config['SYSTEM_LOGS_FILE'], 'r') as f: logs = [line for line in f if line.find(service + '[') >= 0] except Exception: logger.exception('Cannot read logs') abort(400) return { 'logs': '\n'.join(logs) } abort(403) @resource('/v1/superuser/systemlogs/') @internal_only @show_if(features.SUPER_USERS) class SuperUserSystemLogServices(ApiResource): """ Resource for fetching the kinds of system logs in the system. """ @require_fresh_login @verify_not_prod @nickname('listSystemLogServices') @require_scope(scopes.SUPERUSER) def get(self): """ List the system logs for the current system. """ if SuperUserPermission().can(): return { 'services': list(get_services()) } abort(403) @resource('/v1/superuser/aggregatelogs') @internal_only class SuperUserAggregateLogs(ApiResource): """ Resource for fetching aggregated logs for the current user. """ @require_fresh_login @verify_not_prod @nickname('listAllAggregateLogs') @parse_args() @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str) @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str) def get(self, parsed_args): """ Returns the aggregated logs for the current system. """ if SuperUserPermission().can(): start_time = parsed_args['starttime'] end_time = parsed_args['endtime'] return get_aggregate_logs(start_time, end_time) abort(403) @resource('/v1/superuser/logs') @internal_only @show_if(features.SUPER_USERS) class SuperUserLogs(ApiResource): """ Resource for fetching all logs in the system. """ @require_fresh_login @verify_not_prod @nickname('listAllLogs') @parse_args() @query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str) @query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str) @query_param('page', 'The page number for the logs', type=int, default=1) @page_support() @require_scope(scopes.SUPERUSER) def get(self, parsed_args, page_token): """ List the usage logs for the current system. """ if SuperUserPermission().can(): start_time = parsed_args['starttime'] end_time = parsed_args['endtime'] return get_logs(start_time, end_time, page_token=page_token) abort(403) def org_view(org): return { 'name': org.username, 'email': org.email, 'avatar': avatar.get_data_for_org(org), } def user_view(user, password=None): user_data = { 'username': user.username, 'email': user.email, 'verified': user.verified, 'avatar': avatar.get_data_for_user(user), 'super_user': superusers.is_superuser(user.username), 'enabled': user.enabled, } if password is not None: user_data['encrypted_password'] = authentication.encrypt_user_password(password) return user_data @resource('/v1/superuser/changelog/') @internal_only @show_if(features.SUPER_USERS) class ChangeLog(ApiResource): """ Resource for returning the change log for enterprise customers. """ @require_fresh_login @verify_not_prod @nickname('getChangeLog') @require_scope(scopes.SUPERUSER) def get(self): """ Returns the change log for this installation. """ if SuperUserPermission().can(): with open('CHANGELOG.md', 'r') as f: return { 'log': f.read() } abort(403) @resource('/v1/superuser/organizations/') @internal_only @show_if(features.SUPER_USERS) class SuperUserOrganizationList(ApiResource): """ Resource for listing organizations in the system. """ @require_fresh_login @verify_not_prod @nickname('listAllOrganizations') @require_scope(scopes.SUPERUSER) def get(self): """ Returns a list of all organizations in the system. """ if SuperUserPermission().can(): orgs = model.organization.get_organizations() return { 'organizations': [org_view(org) for org in orgs] } abort(403) @resource('/v1/superuser/users/') @internal_only @show_if(features.SUPER_USERS) class SuperUserList(ApiResource): """ Resource for listing users in the system. """ schemas = { 'CreateInstallUser': { 'id': 'CreateInstallUser', 'description': 'Data for creating a user', 'required': ['username', 'email'], 'properties': { 'username': { 'type': 'string', 'description': 'The username of the user being created' }, 'email': { 'type': 'string', 'description': 'The email address of the user being created' } } } } @require_fresh_login @verify_not_prod @nickname('listAllUsers') @require_scope(scopes.SUPERUSER) def get(self): """ Returns a list of all users in the system. """ if SuperUserPermission().can(): users = model.user.get_active_users() return { 'users': [user_view(user) for user in users] } abort(403) @require_fresh_login @verify_not_prod @nickname('createInstallUser') @validate_json_request('CreateInstallUser') @require_scope(scopes.SUPERUSER) def post(self): """ Creates a new user. """ # Ensure that we are using database auth. if app.config['AUTHENTICATION_TYPE'] != 'Database': abort(400) user_information = request.get_json() if SuperUserPermission().can(): username = user_information['username'] email = user_information['email'] # Generate a temporary password for the user. random = SystemRandom() password = ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(32)]) # Create the user. user = model.user.create_user(username, password, email, auto_verify=not features.MAILING) # If mailing is turned on, send the user a verification email. if features.MAILING: confirmation = model.user.create_confirm_email_code(user) send_confirmation_email(user.username, user.email, confirmation.code) return { 'username': username, 'email': email, 'password': password, 'encrypted_password': authentication.encrypt_user_password(password), } abort(403) @resource('/v1/superusers/users//sendrecovery') @internal_only @show_if(features.SUPER_USERS) @show_if(features.MAILING) class SuperUserSendRecoveryEmail(ApiResource): """ Resource for sending a recovery user on behalf of a user. """ @require_fresh_login @verify_not_prod @nickname('sendInstallUserRecoveryEmail') @require_scope(scopes.SUPERUSER) def post(self, username): # Ensure that we are using database auth. if app.config['AUTHENTICATION_TYPE'] != 'Database': abort(400) if SuperUserPermission().can(): user = model.user.get_nonrobot_user(username) if not user: abort(404) if superusers.is_superuser(username): abort(403) code = model.user.create_reset_password_email_code(user.email) send_recovery_email(user.email, code.code) return { 'email': user.email } abort(403) @resource('/v1/superuser/users/') @path_param('username', 'The username of the user being managed') @internal_only @show_if(features.SUPER_USERS) class SuperUserManagement(ApiResource): """ Resource for managing users in the system. """ schemas = { 'UpdateUser': { 'id': 'UpdateUser', 'type': 'object', 'description': 'Description of updates for a user', 'properties': { 'password': { 'type': 'string', 'description': 'The new password for the user', }, 'email': { 'type': 'string', 'description': 'The new e-mail address for the user', }, 'enabled': { 'type': 'boolean', 'description': 'Whether the user is enabled' } }, }, } @require_fresh_login @verify_not_prod @nickname('getInstallUser') @require_scope(scopes.SUPERUSER) def get(self, username): """ Returns information about the specified user. """ if SuperUserPermission().can(): user = model.user.get_nonrobot_user(username) if not user: abort(404) return user_view(user) abort(403) @require_fresh_login @verify_not_prod @nickname('deleteInstallUser') @require_scope(scopes.SUPERUSER) def delete(self, username): """ Deletes the specified user. """ if SuperUserPermission().can(): user = model.user.get_nonrobot_user(username) if not user: abort(404) if superusers.is_superuser(username): abort(403) model.user.delete_user(user) return 'Deleted', 204 abort(403) @require_fresh_login @verify_not_prod @nickname('changeInstallUser') @validate_json_request('UpdateUser') @require_scope(scopes.SUPERUSER) def put(self, username): """ Updates information about the specified user. """ if SuperUserPermission().can(): user = model.user.get_nonrobot_user(username) if not user: abort(404) if superusers.is_superuser(username): abort(403) user_data = request.get_json() if 'password' in user_data: # Ensure that we are using database auth. if app.config['AUTHENTICATION_TYPE'] != 'Database': abort(400) model.user.change_password(user, user_data['password']) if 'email' in user_data: # Ensure that we are using database auth. if app.config['AUTHENTICATION_TYPE'] != 'Database': abort(400) model.user.update_email(user, user_data['email'], auto_verify=True) if 'enabled' in user_data: # Disable/enable the user. user.enabled = bool(user_data['enabled']) user.save() if 'superuser' in user_data: config_object = config_provider.get_config() superusers_set = set(config_object['SUPER_USERS']) if user_data['superuser']: superusers_set.add(username) elif username in superusers_set: superusers_set.remove(username) config_object['SUPER_USERS'] = list(superusers_set) config_provider.save_config(config_object) return user_view(user, password=user_data.get('password')) abort(403) @resource('/v1/superuser/organizations/') @path_param('name', 'The name of the organizaton being managed') @internal_only @show_if(features.SUPER_USERS) class SuperUserOrganizationManagement(ApiResource): """ Resource for managing organizations in the system. """ schemas = { 'UpdateOrg': { 'id': 'UpdateOrg', 'type': 'object', 'description': 'Description of updates for an organization', 'properties': { 'name': { 'type': 'string', 'description': 'The new name for the organization', } }, }, } @require_fresh_login @verify_not_prod @nickname('deleteOrganization') @require_scope(scopes.SUPERUSER) def delete(self, name): """ Deletes the specified organization. """ if SuperUserPermission().can(): org = model.organization.get_organization(name) model.user.delete_user(org) return 'Deleted', 204 abort(403) @require_fresh_login @verify_not_prod @nickname('changeOrganization') @validate_json_request('UpdateOrg') @require_scope(scopes.SUPERUSER) def put(self, name): """ Updates information about the specified user. """ if SuperUserPermission().can(): org = model.organization.get_organization(name) org_data = request.get_json() if 'name' in org_data: org = model.user.change_username(org.id, org_data['name']) return org_view(org) abort(403)