43aed7c6f4
Return an empty body on API requests with status code 204, which means "No content". Incorrect 'Deleted' responses were being returned after successful DELETE operations despite the "No Content" definition of 204.
895 lines
26 KiB
Python
895 lines
26 KiB
Python
""" Superuser API. """
|
|
|
|
import logging
|
|
import os
|
|
import string
|
|
|
|
from datetime import datetime
|
|
from random import SystemRandom
|
|
|
|
from flask import request, make_response, jsonify
|
|
|
|
import features
|
|
|
|
from app import (app, avatar, superusers, authentication, config_provider, license_validator,
|
|
all_queues)
|
|
from auth import scopes
|
|
from auth.auth_context import get_authenticated_user
|
|
from auth.permissions import SuperUserPermission
|
|
from endpoints.api import (ApiResource, nickname, resource, validate_json_request,
|
|
internal_only, require_scope, show_if, parse_args,
|
|
query_param, abort, require_fresh_login, path_param, verify_not_prod,
|
|
page_support, log_action, InvalidRequest)
|
|
from endpoints.api.logs import get_logs, get_aggregate_logs
|
|
from data import model
|
|
from data.database import ServiceKeyApprovalType
|
|
from util.useremails import send_confirmation_email, send_recovery_email
|
|
from util.license import decode_license, LicenseDecodeError
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_immediate_subdirectories(directory):
|
|
return [name for name in os.listdir(directory) if os.path.isdir(os.path.join(directory, name))]
|
|
|
|
|
|
def get_services():
|
|
services = set(get_immediate_subdirectories(app.config['SYSTEM_SERVICES_PATH']))
|
|
services = services - set(app.config['SYSTEM_SERVICE_BLACKLIST'])
|
|
return services
|
|
|
|
|
|
@resource('/v1/superuser/systemlogs/<service>')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserGetLogsForService(ApiResource):
|
|
""" Resource for fetching the kinds of system logs in the system. """
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('getSystemLogs')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self, service):
|
|
""" Returns the logs for the specific service. """
|
|
if SuperUserPermission().can():
|
|
if not service in get_services():
|
|
abort(404)
|
|
|
|
logs = []
|
|
try:
|
|
with open(app.config['SYSTEM_LOGS_FILE'], 'r') as f:
|
|
logs = [line for line in f if line.find(service + '[') >= 0]
|
|
|
|
except Exception:
|
|
logger.exception('Cannot read logs')
|
|
abort(400)
|
|
|
|
return {
|
|
'logs': '\n'.join(logs)
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/systemlogs/')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserSystemLogServices(ApiResource):
|
|
""" Resource for fetching the kinds of system logs in the system. """
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('listSystemLogServices')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self):
|
|
""" List the system logs for the current system. """
|
|
if SuperUserPermission().can():
|
|
return {
|
|
'services': list(get_services())
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/aggregatelogs')
|
|
@internal_only
|
|
class SuperUserAggregateLogs(ApiResource):
|
|
""" Resource for fetching aggregated logs for the current user. """
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('listAllAggregateLogs')
|
|
@parse_args()
|
|
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
|
|
@query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
|
|
def get(self, parsed_args):
|
|
""" Returns the aggregated logs for the current system. """
|
|
if SuperUserPermission().can():
|
|
start_time = parsed_args['starttime']
|
|
end_time = parsed_args['endtime']
|
|
|
|
return get_aggregate_logs(start_time, end_time)
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/logs')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserLogs(ApiResource):
|
|
""" Resource for fetching all logs in the system. """
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('listAllLogs')
|
|
@parse_args()
|
|
@query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str)
|
|
@query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str)
|
|
@query_param('page', 'The page number for the logs', type=int, default=1)
|
|
@page_support()
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self, parsed_args, page_token):
|
|
""" List the usage logs for the current system. """
|
|
if SuperUserPermission().can():
|
|
start_time = parsed_args['starttime']
|
|
end_time = parsed_args['endtime']
|
|
|
|
return get_logs(start_time, end_time, page_token=page_token)
|
|
|
|
abort(403)
|
|
|
|
|
|
def org_view(org):
|
|
return {
|
|
'name': org.username,
|
|
'email': org.email,
|
|
'avatar': avatar.get_data_for_org(org),
|
|
}
|
|
|
|
def user_view(user, password=None):
|
|
user_data = {
|
|
'kind': 'user',
|
|
'name': user.username,
|
|
'username': user.username,
|
|
'email': user.email,
|
|
'verified': user.verified,
|
|
'avatar': avatar.get_data_for_user(user),
|
|
'super_user': superusers.is_superuser(user.username),
|
|
'enabled': user.enabled,
|
|
}
|
|
|
|
if password is not None:
|
|
user_data['encrypted_password'] = authentication.encrypt_user_password(password)
|
|
|
|
return user_data
|
|
|
|
@resource('/v1/superuser/changelog/')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class ChangeLog(ApiResource):
|
|
""" Resource for returning the change log for enterprise customers. """
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('getChangeLog')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self):
|
|
""" Returns the change log for this installation. """
|
|
if SuperUserPermission().can():
|
|
with open('CHANGELOG.md', 'r') as f:
|
|
return {
|
|
'log': f.read()
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
|
|
@resource('/v1/superuser/organizations/')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserOrganizationList(ApiResource):
|
|
""" Resource for listing organizations in the system. """
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('listAllOrganizations')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self):
|
|
""" Returns a list of all organizations in the system. """
|
|
if SuperUserPermission().can():
|
|
orgs = model.organization.get_organizations()
|
|
return {
|
|
'organizations': [org_view(org) for org in orgs]
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/users/')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserList(ApiResource):
|
|
""" Resource for listing users in the system. """
|
|
schemas = {
|
|
'CreateInstallUser': {
|
|
'id': 'CreateInstallUser',
|
|
'description': 'Data for creating a user',
|
|
'required': ['username'],
|
|
'properties': {
|
|
'username': {
|
|
'type': 'string',
|
|
'description': 'The username of the user being created'
|
|
},
|
|
|
|
'email': {
|
|
'type': 'string',
|
|
'description': 'The email address of the user being created'
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('listAllUsers')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self):
|
|
""" Returns a list of all users in the system. """
|
|
if SuperUserPermission().can():
|
|
users = model.user.get_active_users()
|
|
return {
|
|
'users': [user_view(user) for user in users]
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('createInstallUser')
|
|
@validate_json_request('CreateInstallUser')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def post(self):
|
|
""" Creates a new user. """
|
|
# Ensure that we are using database auth.
|
|
if app.config['AUTHENTICATION_TYPE'] != 'Database':
|
|
abort(400)
|
|
|
|
user_information = request.get_json()
|
|
if SuperUserPermission().can():
|
|
# Generate a temporary password for the user.
|
|
random = SystemRandom()
|
|
password = ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(32)])
|
|
|
|
# Create the user.
|
|
username = user_information['username']
|
|
email = user_information.get('email')
|
|
prompts = model.user.get_default_user_prompts(features)
|
|
user = model.user.create_user(username, password, email, auto_verify=not features.MAILING,
|
|
email_required=features.MAILING, prompts=prompts)
|
|
|
|
# If mailing is turned on, send the user a verification email.
|
|
if features.MAILING:
|
|
confirmation = model.user.create_confirm_email_code(user)
|
|
send_confirmation_email(user.username, user.email, confirmation.code)
|
|
|
|
return {
|
|
'username': username,
|
|
'email': email,
|
|
'password': password,
|
|
'encrypted_password': authentication.encrypt_user_password(password),
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superusers/users/<username>/sendrecovery')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
@show_if(features.MAILING)
|
|
class SuperUserSendRecoveryEmail(ApiResource):
|
|
""" Resource for sending a recovery user on behalf of a user. """
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('sendInstallUserRecoveryEmail')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def post(self, username):
|
|
# Ensure that we are using database auth.
|
|
if app.config['AUTHENTICATION_TYPE'] != 'Database':
|
|
abort(400)
|
|
|
|
if SuperUserPermission().can():
|
|
user = model.user.get_nonrobot_user(username)
|
|
if not user:
|
|
abort(404)
|
|
|
|
if superusers.is_superuser(username):
|
|
abort(403)
|
|
|
|
code = model.user.create_reset_password_email_code(user.email)
|
|
send_recovery_email(user.email, code.code)
|
|
return {
|
|
'email': user.email
|
|
}
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/users/<username>')
|
|
@path_param('username', 'The username of the user being managed')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserManagement(ApiResource):
|
|
""" Resource for managing users in the system. """
|
|
schemas = {
|
|
'UpdateUser': {
|
|
'id': 'UpdateUser',
|
|
'type': 'object',
|
|
'description': 'Description of updates for a user',
|
|
'properties': {
|
|
'password': {
|
|
'type': 'string',
|
|
'description': 'The new password for the user',
|
|
},
|
|
'email': {
|
|
'type': 'string',
|
|
'description': 'The new e-mail address for the user',
|
|
},
|
|
'enabled': {
|
|
'type': 'boolean',
|
|
'description': 'Whether the user is enabled'
|
|
}
|
|
},
|
|
},
|
|
}
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('getInstallUser')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self, username):
|
|
""" Returns information about the specified user. """
|
|
if SuperUserPermission().can():
|
|
user = model.user.get_nonrobot_user(username)
|
|
if not user:
|
|
abort(404)
|
|
|
|
return user_view(user)
|
|
|
|
abort(403)
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('deleteInstallUser')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def delete(self, username):
|
|
""" Deletes the specified user. """
|
|
if SuperUserPermission().can():
|
|
user = model.user.get_nonrobot_user(username)
|
|
if not user:
|
|
abort(404)
|
|
|
|
if superusers.is_superuser(username):
|
|
abort(403)
|
|
|
|
model.user.delete_user(user, all_queues, force=True)
|
|
return '', 204
|
|
|
|
abort(403)
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('changeInstallUser')
|
|
@validate_json_request('UpdateUser')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def put(self, username):
|
|
""" Updates information about the specified user. """
|
|
if SuperUserPermission().can():
|
|
user = model.user.get_nonrobot_user(username)
|
|
if not user:
|
|
abort(404)
|
|
|
|
if superusers.is_superuser(username):
|
|
abort(403)
|
|
|
|
user_data = request.get_json()
|
|
if 'password' in user_data:
|
|
# Ensure that we are using database auth.
|
|
if app.config['AUTHENTICATION_TYPE'] != 'Database':
|
|
abort(400)
|
|
|
|
model.user.change_password(user, user_data['password'])
|
|
|
|
if 'email' in user_data:
|
|
# Ensure that we are using database auth.
|
|
if app.config['AUTHENTICATION_TYPE'] != 'Database':
|
|
abort(400)
|
|
|
|
model.user.update_email(user, user_data['email'], auto_verify=True)
|
|
|
|
if 'enabled' in user_data:
|
|
# Disable/enable the user.
|
|
user.enabled = bool(user_data['enabled'])
|
|
user.save()
|
|
|
|
if 'superuser' in user_data:
|
|
config_object = config_provider.get_config()
|
|
superusers_set = set(config_object['SUPER_USERS'])
|
|
|
|
if user_data['superuser']:
|
|
superusers_set.add(username)
|
|
elif username in superusers_set:
|
|
superusers_set.remove(username)
|
|
|
|
config_object['SUPER_USERS'] = list(superusers_set)
|
|
config_provider.save_config(config_object)
|
|
|
|
return user_view(user, password=user_data.get('password'))
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/takeownership/<namespace>')
|
|
@path_param('namespace', 'The namespace of the user or organization being managed')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserTakeOwnership(ApiResource):
|
|
""" Resource for a superuser to take ownership of a namespace. """
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('takeOwnership')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def post(self, namespace):
|
|
""" Takes ownership of the specified organization or user. """
|
|
if SuperUserPermission().can():
|
|
# Disallow for superusers.
|
|
if superusers.is_superuser(namespace):
|
|
abort(400)
|
|
|
|
entity = model.user.get_user_or_org(namespace)
|
|
if entity is None:
|
|
abort(404)
|
|
|
|
authed_user = get_authenticated_user()
|
|
was_user = not entity.organization
|
|
if entity.organization:
|
|
# Add the superuser as an admin to the owners team of the org.
|
|
model.organization.add_user_as_admin(authed_user, entity)
|
|
else:
|
|
# If the entity is a user, convert it to an organization and add the current superuser
|
|
# as the admin.
|
|
model.organization.convert_user_to_organization(entity, get_authenticated_user())
|
|
|
|
# Log the change.
|
|
log_metadata = {
|
|
'entity_id': entity.id,
|
|
'namespace': namespace,
|
|
'was_user': was_user,
|
|
'superuser': authed_user.username,
|
|
}
|
|
|
|
log_action('take_ownership', authed_user.username, log_metadata)
|
|
|
|
return jsonify({
|
|
'namespace': namespace
|
|
})
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/organizations/<name>')
|
|
@path_param('name', 'The name of the organizaton being managed')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserOrganizationManagement(ApiResource):
|
|
""" Resource for managing organizations in the system. """
|
|
schemas = {
|
|
'UpdateOrg': {
|
|
'id': 'UpdateOrg',
|
|
'type': 'object',
|
|
'description': 'Description of updates for an organization',
|
|
'properties': {
|
|
'name': {
|
|
'type': 'string',
|
|
'description': 'The new name for the organization',
|
|
}
|
|
},
|
|
},
|
|
}
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('deleteOrganization')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def delete(self, name):
|
|
""" Deletes the specified organization. """
|
|
if SuperUserPermission().can():
|
|
org = model.organization.get_organization(name)
|
|
|
|
model.user.delete_user(org, all_queues)
|
|
return '', 204
|
|
|
|
abort(403)
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('changeOrganization')
|
|
@validate_json_request('UpdateOrg')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def put(self, name):
|
|
""" Updates information about the specified user. """
|
|
if SuperUserPermission().can():
|
|
org = model.organization.get_organization(name)
|
|
org_data = request.get_json()
|
|
|
|
if 'name' in org_data:
|
|
org = model.user.change_username(org.id, org_data['name'])
|
|
|
|
return org_view(org)
|
|
|
|
abort(403)
|
|
|
|
|
|
def key_view(key):
|
|
return {
|
|
'name': key.name,
|
|
'kid': key.kid,
|
|
'service': key.service,
|
|
'jwk': key.jwk,
|
|
'metadata': key.metadata,
|
|
'created_date': key.created_date,
|
|
'expiration_date': key.expiration_date,
|
|
'rotation_duration': key.rotation_duration,
|
|
'approval': approval_view(key.approval) if key.approval is not None else None,
|
|
}
|
|
|
|
|
|
def approval_view(approval):
|
|
return {
|
|
'approver': user_view(approval.approver) if approval.approver else None,
|
|
'approval_type': approval.approval_type,
|
|
'approved_date': approval.approved_date,
|
|
'notes': approval.notes,
|
|
}
|
|
|
|
|
|
@resource('/v1/superuser/keys')
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserServiceKeyManagement(ApiResource):
|
|
""" Resource for managing service keys."""
|
|
schemas = {
|
|
'CreateServiceKey': {
|
|
'id': 'CreateServiceKey',
|
|
'type': 'object',
|
|
'description': 'Description of creation of a service key',
|
|
'required': ['service', 'expiration'],
|
|
'properties': {
|
|
'service': {
|
|
'type': 'string',
|
|
'description': 'The service authenticating with this key',
|
|
},
|
|
'name': {
|
|
'type': 'string',
|
|
'description': 'The friendly name of a service key',
|
|
},
|
|
'metadata': {
|
|
'type': 'object',
|
|
'description': 'The key/value pairs of this key\'s metadata',
|
|
},
|
|
'notes': {
|
|
'type': 'string',
|
|
'description': 'If specified, the extra notes for the key',
|
|
},
|
|
'expiration': {
|
|
'description': 'The expiration date as a unix timestamp',
|
|
'anyOf': [{'type': 'number'}, {'type': 'null'}],
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
@verify_not_prod
|
|
@nickname('listServiceKeys')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self):
|
|
if SuperUserPermission().can():
|
|
keys = model.service_keys.list_all_keys()
|
|
|
|
return jsonify({
|
|
'keys': [key_view(key) for key in keys],
|
|
})
|
|
|
|
abort(403)
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('createServiceKey')
|
|
@require_scope(scopes.SUPERUSER)
|
|
@validate_json_request('CreateServiceKey')
|
|
def post(self):
|
|
if SuperUserPermission().can():
|
|
body = request.get_json()
|
|
|
|
# Ensure we have a valid expiration date if specified.
|
|
expiration_date = body.get('expiration', None)
|
|
if expiration_date is not None:
|
|
try:
|
|
expiration_date = datetime.utcfromtimestamp(float(expiration_date))
|
|
except ValueError:
|
|
abort(400)
|
|
|
|
if expiration_date <= datetime.now():
|
|
abort(400)
|
|
|
|
# Create the metadata for the key.
|
|
user = get_authenticated_user()
|
|
metadata = body.get('metadata', {})
|
|
metadata.update({
|
|
'created_by': 'Quay Superuser Panel',
|
|
'creator': user.username,
|
|
'ip': request.remote_addr,
|
|
})
|
|
|
|
# Generate a key with a private key that we *never save*.
|
|
(private_key, key) = model.service_keys.generate_service_key(body['service'], expiration_date,
|
|
metadata=metadata,
|
|
name=body.get('name', ''))
|
|
# Auto-approve the service key.
|
|
model.service_keys.approve_service_key(key.kid, user, ServiceKeyApprovalType.SUPERUSER,
|
|
notes=body.get('notes', ''))
|
|
|
|
# Log the creation and auto-approval of the service key.
|
|
key_log_metadata = {
|
|
'kid': key.kid,
|
|
'preshared': True,
|
|
'service': body['service'],
|
|
'name': body.get('name', ''),
|
|
'expiration_date': expiration_date,
|
|
'auto_approved': True,
|
|
}
|
|
|
|
log_action('service_key_create', None, key_log_metadata)
|
|
log_action('service_key_approve', None, key_log_metadata)
|
|
|
|
return jsonify({
|
|
'kid': key.kid,
|
|
'name': body.get('name', ''),
|
|
'service': body['service'],
|
|
'public_key': private_key.publickey().exportKey('PEM'),
|
|
'private_key': private_key.exportKey('PEM'),
|
|
})
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/keys/<kid>')
|
|
@path_param('kid', 'The unique identifier for a service key')
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserServiceKey(ApiResource):
|
|
""" Resource for managing service keys. """
|
|
schemas = {
|
|
'PutServiceKey': {
|
|
'id': 'PutServiceKey',
|
|
'type': 'object',
|
|
'description': 'Description of updates for a service key',
|
|
'properties': {
|
|
'name': {
|
|
'type': 'string',
|
|
'description': 'The friendly name of a service key',
|
|
},
|
|
'metadata': {
|
|
'type': 'object',
|
|
'description': 'The key/value pairs of this key\'s metadata',
|
|
},
|
|
'expiration': {
|
|
'description': 'The expiration date as a unix timestamp',
|
|
'anyOf': [{'type': 'number'}, {'type': 'null'}],
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
@verify_not_prod
|
|
@nickname('getServiceKey')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def get(self, kid):
|
|
if SuperUserPermission().can():
|
|
try:
|
|
key = model.service_keys.get_service_key(kid, approved_only=False, alive_only=False)
|
|
return jsonify(key_view(key))
|
|
except model.service_keys.ServiceKeyDoesNotExist:
|
|
abort(404)
|
|
|
|
abort(403)
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('updateServiceKey')
|
|
@require_scope(scopes.SUPERUSER)
|
|
@validate_json_request('PutServiceKey')
|
|
def put(self, kid):
|
|
if SuperUserPermission().can():
|
|
body = request.get_json()
|
|
try:
|
|
key = model.service_keys.get_service_key(kid, approved_only=False, alive_only=False)
|
|
except model.service_keys.ServiceKeyDoesNotExist:
|
|
abort(404)
|
|
|
|
key_log_metadata = {
|
|
'kid': key.kid,
|
|
'service': key.service,
|
|
'name': body.get('name', key.name),
|
|
'expiration_date': key.expiration_date,
|
|
}
|
|
|
|
if 'expiration' in body:
|
|
expiration_date = body['expiration']
|
|
if expiration_date is not None and expiration_date != '':
|
|
try:
|
|
expiration_date = datetime.utcfromtimestamp(float(expiration_date))
|
|
except ValueError:
|
|
abort(400)
|
|
|
|
if expiration_date <= datetime.now():
|
|
abort(400)
|
|
|
|
key_log_metadata.update({
|
|
'old_expiration_date': key.expiration_date,
|
|
'expiration_date': expiration_date,
|
|
})
|
|
|
|
log_action('service_key_extend', None, key_log_metadata)
|
|
model.service_keys.set_key_expiration(kid, expiration_date)
|
|
|
|
|
|
if 'name' in body or 'metadata' in body:
|
|
model.service_keys.update_service_key(kid, body.get('name'), body.get('metadata'))
|
|
log_action('service_key_modify', None, key_log_metadata)
|
|
|
|
updated_key = model.service_keys.get_service_key(kid, approved_only=False, alive_only=False)
|
|
return jsonify(key_view(updated_key))
|
|
|
|
abort(403)
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('deleteServiceKey')
|
|
@require_scope(scopes.SUPERUSER)
|
|
def delete(self, kid):
|
|
if SuperUserPermission().can():
|
|
try:
|
|
key = model.service_keys.delete_service_key(kid)
|
|
except model.service_keys.ServiceKeyDoesNotExist:
|
|
abort(404)
|
|
|
|
key_log_metadata = {
|
|
'kid': kid,
|
|
'service': key.service,
|
|
'name': key.name,
|
|
'created_date': key.created_date,
|
|
'expiration_date': key.expiration_date,
|
|
}
|
|
|
|
log_action('service_key_delete', None, key_log_metadata)
|
|
return make_response('', 204)
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/approvedkeys/<kid>')
|
|
@path_param('kid', 'The unique identifier for a service key')
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserServiceKeyApproval(ApiResource):
|
|
""" Resource for approving service keys. """
|
|
|
|
schemas = {
|
|
'ApproveServiceKey': {
|
|
'id': 'ApproveServiceKey',
|
|
'type': 'object',
|
|
'description': 'Information for approving service keys',
|
|
'properties': {
|
|
'notes': {
|
|
'type': 'string',
|
|
'description': 'Optional approval notes',
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
@require_fresh_login
|
|
@verify_not_prod
|
|
@nickname('approveServiceKey')
|
|
@require_scope(scopes.SUPERUSER)
|
|
@validate_json_request('ApproveServiceKey')
|
|
def post(self, kid):
|
|
if SuperUserPermission().can():
|
|
notes = request.get_json().get('notes', '')
|
|
approver = get_authenticated_user()
|
|
try:
|
|
key = model.service_keys.approve_service_key(kid, approver, ServiceKeyApprovalType.SUPERUSER,
|
|
notes=notes)
|
|
|
|
# Log the approval of the service key.
|
|
key_log_metadata = {
|
|
'kid': kid,
|
|
'service': key.service,
|
|
'name': key.name,
|
|
'expiration_date': key.expiration_date,
|
|
}
|
|
|
|
log_action('service_key_approve', None, key_log_metadata)
|
|
except model.ServiceKeyDoesNotExist:
|
|
abort(404)
|
|
except model.ServiceKeyAlreadyApproved:
|
|
pass
|
|
|
|
return make_response('', 201)
|
|
|
|
abort(403)
|
|
|
|
|
|
@resource('/v1/superuser/license')
|
|
@internal_only
|
|
@show_if(features.SUPER_USERS)
|
|
class SuperUserLicense(ApiResource):
|
|
""" Resource for getting and setting a license. """
|
|
schemas = {
|
|
'UpdateLicense': {
|
|
'type': 'object',
|
|
'description': 'Updates a license',
|
|
'required': [
|
|
'license',
|
|
],
|
|
'properties': {
|
|
'license': {
|
|
'type': 'string'
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
@nickname('getLicense')
|
|
@require_fresh_login
|
|
@require_scope(scopes.SUPERUSER)
|
|
@verify_not_prod
|
|
def get(self):
|
|
""" Returns the current decoded license. """
|
|
if SuperUserPermission().can():
|
|
try:
|
|
decoded_license = config_provider.get_license()
|
|
except LicenseDecodeError as le:
|
|
raise InvalidRequest(le.message)
|
|
|
|
statuses = decoded_license.validate(app.config)
|
|
all_met = all(status.is_met() for status in statuses)
|
|
|
|
return {
|
|
'status': [status.as_dict(for_private=True) for status in statuses],
|
|
'success': all_met,
|
|
}
|
|
|
|
abort(403)
|
|
|
|
@nickname('updateLicense')
|
|
@require_fresh_login
|
|
@require_scope(scopes.SUPERUSER)
|
|
@verify_not_prod
|
|
@validate_json_request('UpdateLicense')
|
|
def put(self):
|
|
""" Validates the given license contents and then saves it to the config volume. """
|
|
if SuperUserPermission().can():
|
|
license_contents = request.get_json()['license']
|
|
try:
|
|
decoded_license = decode_license(license_contents)
|
|
except LicenseDecodeError as le:
|
|
raise InvalidRequest(le.message)
|
|
|
|
statuses = decoded_license.validate(app.config)
|
|
all_met = all(status.is_met() for status in statuses)
|
|
if all_met:
|
|
# Save the license and update the license check thread.
|
|
config_provider.save_license(license_contents)
|
|
license_validator.compute_license_sufficiency()
|
|
|
|
return {
|
|
'status': [status.as_dict(for_private=True) for status in statuses],
|
|
'success': all_met,
|
|
}
|
|
|
|
abort(403)
|