244 lines
6.9 KiB
Python
244 lines
6.9 KiB
Python
import string
|
|
import logging
|
|
import json
|
|
|
|
from random import SystemRandom
|
|
from app import app
|
|
from flask import request
|
|
|
|
from endpoints.api import (ApiResource, nickname, resource, validate_json_request, request_error,
|
|
log_action, internal_only, NotFound, require_user_admin, format_date,
|
|
InvalidToken, require_scope, format_date, hide_if, show_if, parse_args,
|
|
query_param, abort, require_fresh_login, path_param)
|
|
|
|
from endpoints.api.logs import get_logs
|
|
|
|
from data import model
|
|
from auth.permissions import SuperUserPermission
|
|
from auth.auth_context import get_authenticated_user
|
|
from util.useremails import send_confirmation_email, send_recovery_email
|
|
|
|
import features
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@resource('/v1/superuser/logs')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserLogs(ApiResource):
|
|
""" Resource for fetching all logs in the system. """
|
|
@nickname('listAllLogs')
|
|
@parse_args
|
|
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
|
|
@query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
|
|
@query_param('performer', 'Username for which to filter logs.', type=str)
|
|
def get(self, args):
|
|
""" List the logs for the current system. """
|
|
if SuperUserPermission().can():
|
|
performer_name = args['performer']
|
|
start_time = args['starttime']
|
|
end_time = args['endtime']
|
|
|
|
return get_logs(start_time, end_time)
|
|
|
|
abort(403)
|
|
|
|
|
|
def user_view(user):
|
|
return {
|
|
'username': user.username,
|
|
'email': user.email,
|
|
'verified': user.verified,
|
|
'super_user': user.username in app.config['SUPER_USERS']
|
|
}
|
|
|
|
@resource('/v1/superuser/usage/')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class UsageInformation(ApiResource):
|
|
""" Resource for returning the usage information for enterprise customers. """
|
|
@require_fresh_login
|
|
@nickname('getSystemUsage')
|
|
def get(self):
|
|
""" Returns the number of repository handles currently held. """
|
|
if SuperUserPermission().can():
|
|
return {
|
|
'usage': model.get_repository_usage(),
|
|
'allowed': app.config.get('MAXIMUM_REPOSITORY_USAGE', 20)
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
|
|
@resource('/v1/superuser/users/')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserList(ApiResource):
|
|
""" Resource for listing users in the system. """
|
|
schemas = {
|
|
'CreateInstallUser': {
|
|
'id': 'CreateInstallUser',
|
|
'description': 'Data for creating a user',
|
|
'required': ['username', 'email'],
|
|
'properties': {
|
|
'username': {
|
|
'type': 'string',
|
|
'description': 'The username of the user being created'
|
|
},
|
|
|
|
'email': {
|
|
'type': 'string',
|
|
'description': 'The email address of the user being created'
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@require_fresh_login
|
|
@nickname('listAllUsers')
|
|
def get(self):
|
|
""" Returns a list of all users in the system. """
|
|
if SuperUserPermission().can():
|
|
users = model.get_active_users()
|
|
return {
|
|
'users': [user_view(user) for user in users]
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@require_fresh_login
|
|
@nickname('createInstallUser')
|
|
@validate_json_request('CreateInstallUser')
|
|
def post(self):
|
|
""" Creates a new user. """
|
|
user_information = request.get_json()
|
|
if SuperUserPermission().can():
|
|
username = user_information['username']
|
|
email = user_information['email']
|
|
|
|
# Generate a temporary password for the user.
|
|
random = SystemRandom()
|
|
password = ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(32)])
|
|
|
|
# Create the user.
|
|
user = model.create_user(username, password, email, auto_verify=not features.MAILING)
|
|
|
|
# If mailing is turned on, send the user a verification email.
|
|
if features.MAILING:
|
|
confirmation = model.create_confirm_email_code(user, new_email=user.email)
|
|
send_confirmation_email(user.username, user.email, confirmation.code)
|
|
|
|
return {
|
|
'username': username,
|
|
'email': email,
|
|
'password': password
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superusers/users/<username>/sendrecovery')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
@show_if(features.MAILING)
|
|
class SuperUserSendRecoveryEmail(ApiResource):
|
|
""" Resource for sending a recovery user on behalf of a user. """
|
|
@require_fresh_login
|
|
@nickname('sendInstallUserRecoveryEmail')
|
|
def post(self, username):
|
|
if SuperUserPermission().can():
|
|
user = model.get_user(username)
|
|
if not user or user.organization or user.robot:
|
|
abort(404)
|
|
|
|
if username in app.config['SUPER_USERS']:
|
|
abort(403)
|
|
|
|
code = model.create_reset_password_email_code(user.email)
|
|
send_recovery_email(user.email, code.code)
|
|
return {
|
|
'email': user.email
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/users/<username>')
|
|
@path_param('username', 'The username of the user being managed')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserManagement(ApiResource):
|
|
""" Resource for managing users in the system. """
|
|
schemas = {
|
|
'UpdateUser': {
|
|
'id': 'UpdateUser',
|
|
'type': 'object',
|
|
'description': 'Description of updates for a user',
|
|
'properties': {
|
|
'password': {
|
|
'type': 'string',
|
|
'description': 'The new password for the user',
|
|
},
|
|
'email': {
|
|
'type': 'string',
|
|
'description': 'The new e-mail address for the user',
|
|
}
|
|
},
|
|
},
|
|
}
|
|
|
|
@require_fresh_login
|
|
@nickname('getInstallUser')
|
|
def get(self, username):
|
|
""" Returns information about the specified user. """
|
|
if SuperUserPermission().can():
|
|
user = model.get_user(username)
|
|
if not user or user.organization or user.robot:
|
|
abort(404)
|
|
|
|
return user_view(user)
|
|
|
|
abort(403)
|
|
|
|
@require_fresh_login
|
|
@nickname('deleteInstallUser')
|
|
def delete(self, username):
|
|
""" Deletes the specified user. """
|
|
if SuperUserPermission().can():
|
|
user = model.get_user(username)
|
|
if not user or user.organization or user.robot:
|
|
abort(404)
|
|
|
|
if username in app.config['SUPER_USERS']:
|
|
abort(403)
|
|
|
|
model.delete_user(user)
|
|
return 'Deleted', 204
|
|
|
|
abort(403)
|
|
|
|
@require_fresh_login
|
|
@nickname('changeInstallUser')
|
|
@validate_json_request('UpdateUser')
|
|
def put(self, username):
|
|
""" Updates information about the specified user. """
|
|
if SuperUserPermission().can():
|
|
user = model.get_user(username)
|
|
if not user or user.organization or user.robot:
|
|
abort(404)
|
|
|
|
if username in app.config['SUPER_USERS']:
|
|
abort(403)
|
|
|
|
user_data = request.get_json()
|
|
if 'password' in user_data:
|
|
model.change_password(user, user_data['password'])
|
|
|
|
if 'email' in user_data:
|
|
model.update_email(user, user_data['email'])
|
|
|
|
return user_view(user)
|
|
|
|
abort(403)
|