Add tests for the new super user config API and make sure both super user API endpoint sets are all guarded against being used in production
This commit is contained in:
parent
575d4c5062
commit
7933bd44fd
4 changed files with 224 additions and 15 deletions
|
@ -280,6 +280,23 @@ require_user_read = require_user_permission(UserReadPermission, scopes.READ_USER
|
||||||
require_user_admin = require_user_permission(UserAdminPermission, None)
|
require_user_admin = require_user_permission(UserAdminPermission, None)
|
||||||
require_fresh_user_admin = require_user_permission(UserAdminPermission, None)
|
require_fresh_user_admin = require_user_permission(UserAdminPermission, None)
|
||||||
|
|
||||||
|
|
||||||
|
def verify_not_prod(func):
|
||||||
|
@add_method_metadata('enterprise_only', True)
|
||||||
|
@wraps(func)
|
||||||
|
def wrapped(*args, **kwargs):
|
||||||
|
# Verify that we are not running on a production (i.e. hosted) stack. If so, we fail.
|
||||||
|
# This should never happen (because of the feature-flag on SUPER_USERS), but we want to be
|
||||||
|
# absolutely sure.
|
||||||
|
if app.config['SERVER_HOSTNAME'].find('quay.io') >= 0:
|
||||||
|
logger.error('!!! Super user method called IN PRODUCTION !!!')
|
||||||
|
raise NotFound()
|
||||||
|
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
def require_fresh_login(func):
|
def require_fresh_login(func):
|
||||||
@add_method_metadata('requires_fresh_login', True)
|
@add_method_metadata('requires_fresh_login', True)
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
|
|
|
@ -4,7 +4,7 @@ import json
|
||||||
|
|
||||||
from flask import abort
|
from flask import abort
|
||||||
from endpoints.api import (ApiResource, nickname, resource, internal_only, show_if, hide_if,
|
from endpoints.api import (ApiResource, nickname, resource, internal_only, show_if, hide_if,
|
||||||
require_fresh_login, request, validate_json_request)
|
require_fresh_login, request, validate_json_request, verify_not_prod)
|
||||||
|
|
||||||
from endpoints.common import common_login
|
from endpoints.common import common_login
|
||||||
from app import app, OVERRIDE_CONFIG_YAML_FILENAME, OVERRIDE_CONFIG_DIRECTORY
|
from app import app, OVERRIDE_CONFIG_YAML_FILENAME, OVERRIDE_CONFIG_DIRECTORY
|
||||||
|
@ -21,43 +21,45 @@ import features
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def database_is_valid():
|
def database_is_valid():
|
||||||
|
""" Returns whether the database, as configured, is valid. """
|
||||||
try:
|
try:
|
||||||
User.select().limit(1)
|
User.select().limit(1)
|
||||||
return True
|
return True
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def database_has_users():
|
def database_has_users():
|
||||||
|
""" Returns whether the database has any users defined. """
|
||||||
return bool(list(User.select().limit(1)))
|
return bool(list(User.select().limit(1)))
|
||||||
|
|
||||||
|
def config_file_exists():
|
||||||
|
""" Returns whether a configuration file exists. """
|
||||||
|
return os.path.exists(OVERRIDE_CONFIG_YAML_FILENAME)
|
||||||
|
|
||||||
|
|
||||||
@resource('/v1/superuser/registrystatus')
|
@resource('/v1/superuser/registrystatus')
|
||||||
@internal_only
|
@internal_only
|
||||||
@show_if(features.SUPER_USERS)
|
@show_if(features.SUPER_USERS)
|
||||||
@hide_if(features.BILLING) # Make sure it is never allowed in prod.
|
|
||||||
class SuperUserRegistryStatus(ApiResource):
|
class SuperUserRegistryStatus(ApiResource):
|
||||||
""" Resource for determining the status of the registry, such as if config exists,
|
""" Resource for determining the status of the registry, such as if config exists,
|
||||||
if a database is configured, and if it has any defined users.
|
if a database is configured, and if it has any defined users.
|
||||||
"""
|
"""
|
||||||
@nickname('scRegistryStatus')
|
@nickname('scRegistryStatus')
|
||||||
|
@verify_not_prod
|
||||||
def get(self):
|
def get(self):
|
||||||
""" Returns whether a valid configuration, database and users exist. """
|
""" Returns whether a valid configuration, database and users exist. """
|
||||||
current_user = get_authenticated_user()
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'dir_exists': os.path.exists(OVERRIDE_CONFIG_DIRECTORY),
|
'dir_exists': os.path.exists(OVERRIDE_CONFIG_DIRECTORY),
|
||||||
'file_exists': os.path.exists(OVERRIDE_CONFIG_YAML_FILENAME),
|
'file_exists': os.path.exists(OVERRIDE_CONFIG_YAML_FILENAME),
|
||||||
'is_testing': app.config['TESTING'],
|
'is_testing': app.config['TESTING'],
|
||||||
'valid_db': database_is_valid(),
|
'valid_db': database_is_valid(),
|
||||||
'ready': current_user and current_user.username in app.config['SUPER_USERS']
|
'ready': not app.config['TESTING'] and file_exists and bool(app.config['SUPER_USERS'])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@resource('/v1/superuser/config')
|
@resource('/v1/superuser/config')
|
||||||
@internal_only
|
@internal_only
|
||||||
@show_if(features.SUPER_USERS)
|
@show_if(features.SUPER_USERS)
|
||||||
@hide_if(features.BILLING) # Make sure it is never allowed in prod.
|
|
||||||
class SuperUserConfig(ApiResource):
|
class SuperUserConfig(ApiResource):
|
||||||
""" Resource for fetching and updating the current configuration, if any. """
|
""" Resource for fetching and updating the current configuration, if any. """
|
||||||
schemas = {
|
schemas = {
|
||||||
|
@ -81,6 +83,7 @@ class SuperUserConfig(ApiResource):
|
||||||
}
|
}
|
||||||
|
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('scGetConfig')
|
@nickname('scGetConfig')
|
||||||
def get(self):
|
def get(self):
|
||||||
""" Returns the currently defined configuration, if any. """
|
""" Returns the currently defined configuration, if any. """
|
||||||
|
@ -98,12 +101,13 @@ class SuperUserConfig(ApiResource):
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
@nickname('scUpdateConfig')
|
@nickname('scUpdateConfig')
|
||||||
|
@verify_not_prod
|
||||||
@validate_json_request('UpdateConfig')
|
@validate_json_request('UpdateConfig')
|
||||||
def put(self):
|
def put(self):
|
||||||
""" Updates the config.yaml file. """
|
""" Updates the config.yaml file. """
|
||||||
# Note: This method is called to set the database configuration before super users exists,
|
# Note: This method is called to set the database configuration before super users exists,
|
||||||
# so we also allow it to be called if there is no valid registry configuration setup.
|
# so we also allow it to be called if there is no valid registry configuration setup.
|
||||||
if not os.path.exists(OVERRIDE_CONFIG_YAML_FILENAME) or SuperUserPermission().can():
|
if not config_file_exists() or SuperUserPermission().can():
|
||||||
config_object = request.get_json()['config']
|
config_object = request.get_json()['config']
|
||||||
hostname = request.get_json()['hostname']
|
hostname = request.get_json()['hostname']
|
||||||
|
|
||||||
|
@ -124,10 +128,10 @@ class SuperUserConfig(ApiResource):
|
||||||
@resource('/v1/superuser/config/file/<filename>')
|
@resource('/v1/superuser/config/file/<filename>')
|
||||||
@internal_only
|
@internal_only
|
||||||
@show_if(features.SUPER_USERS)
|
@show_if(features.SUPER_USERS)
|
||||||
@hide_if(features.BILLING) # Make sure it is never allowed in prod.
|
|
||||||
class SuperUserConfigFile(ApiResource):
|
class SuperUserConfigFile(ApiResource):
|
||||||
""" Resource for fetching the status of config files and overriding them. """
|
""" Resource for fetching the status of config files and overriding them. """
|
||||||
@nickname('scConfigFileExists')
|
@nickname('scConfigFileExists')
|
||||||
|
@verify_not_prod
|
||||||
def get(self, filename):
|
def get(self, filename):
|
||||||
""" Returns whether the configuration file with the given name exists. """
|
""" Returns whether the configuration file with the given name exists. """
|
||||||
if not filename in SSL_FILENAMES:
|
if not filename in SSL_FILENAMES:
|
||||||
|
@ -141,6 +145,7 @@ class SuperUserConfigFile(ApiResource):
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
@nickname('scUpdateConfigFile')
|
@nickname('scUpdateConfigFile')
|
||||||
|
@verify_not_prod
|
||||||
def post(self, filename):
|
def post(self, filename):
|
||||||
""" Updates the configuration file with the given name. """
|
""" Updates the configuration file with the given name. """
|
||||||
if not filename in SSL_FILENAMES:
|
if not filename in SSL_FILENAMES:
|
||||||
|
@ -149,7 +154,7 @@ class SuperUserConfigFile(ApiResource):
|
||||||
if SuperUserPermission().can():
|
if SuperUserPermission().can():
|
||||||
uploaded_file = request.files['file']
|
uploaded_file = request.files['file']
|
||||||
if not uploaded_file:
|
if not uploaded_file:
|
||||||
abort(404)
|
abort(400)
|
||||||
|
|
||||||
uploaded_file.save(os.path.join(OVERRIDE_CONFIG_DIRECTORY, filename))
|
uploaded_file.save(os.path.join(OVERRIDE_CONFIG_DIRECTORY, filename))
|
||||||
return {
|
return {
|
||||||
|
@ -162,7 +167,6 @@ class SuperUserConfigFile(ApiResource):
|
||||||
@resource('/v1/superuser/config/createsuperuser')
|
@resource('/v1/superuser/config/createsuperuser')
|
||||||
@internal_only
|
@internal_only
|
||||||
@show_if(features.SUPER_USERS)
|
@show_if(features.SUPER_USERS)
|
||||||
@hide_if(features.BILLING) # Make sure it is never allowed in prod.
|
|
||||||
class SuperUserCreateInitialSuperUser(ApiResource):
|
class SuperUserCreateInitialSuperUser(ApiResource):
|
||||||
""" Resource for creating the initial super user. """
|
""" Resource for creating the initial super user. """
|
||||||
schemas = {
|
schemas = {
|
||||||
|
@ -193,6 +197,7 @@ class SuperUserCreateInitialSuperUser(ApiResource):
|
||||||
}
|
}
|
||||||
|
|
||||||
@nickname('scCreateInitialSuperuser')
|
@nickname('scCreateInitialSuperuser')
|
||||||
|
@verify_not_prod
|
||||||
@validate_json_request('CreateSuperUser')
|
@validate_json_request('CreateSuperUser')
|
||||||
def post(self):
|
def post(self):
|
||||||
""" Creates the initial super user, updates the underlying configuration and
|
""" Creates the initial super user, updates the underlying configuration and
|
||||||
|
@ -204,7 +209,7 @@ class SuperUserCreateInitialSuperUser(ApiResource):
|
||||||
#
|
#
|
||||||
# We do this special security check because at the point this method is called, the database
|
# We do this special security check because at the point this method is called, the database
|
||||||
# is clean but does not (yet) have any super users for our permissions code to check against.
|
# is clean but does not (yet) have any super users for our permissions code to check against.
|
||||||
if os.path.exists(OVERRIDE_CONFIG_YAML_FILENAME) and not database_has_users():
|
if config_file_exists() and not database_has_users():
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
username = data['username']
|
username = data['username']
|
||||||
password = data['password']
|
password = data['password']
|
||||||
|
@ -231,7 +236,6 @@ class SuperUserCreateInitialSuperUser(ApiResource):
|
||||||
@resource('/v1/superuser/config/validate/<service>')
|
@resource('/v1/superuser/config/validate/<service>')
|
||||||
@internal_only
|
@internal_only
|
||||||
@show_if(features.SUPER_USERS)
|
@show_if(features.SUPER_USERS)
|
||||||
@hide_if(features.BILLING) # Make sure it is never allowed in prod.
|
|
||||||
class SuperUserConfigValidate(ApiResource):
|
class SuperUserConfigValidate(ApiResource):
|
||||||
""" Resource for validating a block of configuration against an external service. """
|
""" Resource for validating a block of configuration against an external service. """
|
||||||
schemas = {
|
schemas = {
|
||||||
|
@ -251,13 +255,14 @@ class SuperUserConfigValidate(ApiResource):
|
||||||
}
|
}
|
||||||
|
|
||||||
@nickname('scValidateConfig')
|
@nickname('scValidateConfig')
|
||||||
|
@verify_not_prod
|
||||||
@validate_json_request('ValidateConfig')
|
@validate_json_request('ValidateConfig')
|
||||||
def post(self, service):
|
def post(self, service):
|
||||||
""" Validates the given config for the given service. """
|
""" Validates the given config for the given service. """
|
||||||
# Note: This method is called to validate the database configuration before super users exists,
|
# Note: This method is called to validate the database configuration before super users exists,
|
||||||
# so we also allow it to be called if there is no valid registry configuration setup. Note that
|
# so we also allow it to be called if there is no valid registry configuration setup. Note that
|
||||||
# this is also safe since this method does not access any information not given in the request.
|
# this is also safe since this method does not access any information not given in the request.
|
||||||
if not os.path.exists(OVERRIDE_CONFIG_YAML_FILENAME) or SuperUserPermission().can():
|
if not config_file_exists() or SuperUserPermission().can():
|
||||||
config = request.get_json()['config']
|
config = request.get_json()['config']
|
||||||
return validate_service_for_config(service, config)
|
return validate_service_for_config(service, config)
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ from flask import request
|
||||||
from endpoints.api import (ApiResource, nickname, resource, validate_json_request, request_error,
|
from endpoints.api import (ApiResource, nickname, resource, validate_json_request, request_error,
|
||||||
log_action, internal_only, NotFound, require_user_admin, format_date,
|
log_action, internal_only, NotFound, require_user_admin, format_date,
|
||||||
InvalidToken, require_scope, format_date, hide_if, show_if, parse_args,
|
InvalidToken, require_scope, format_date, hide_if, show_if, parse_args,
|
||||||
query_param, abort, require_fresh_login, path_param)
|
query_param, abort, require_fresh_login, path_param, verify_not_prod)
|
||||||
|
|
||||||
from endpoints.api.logs import get_logs
|
from endpoints.api.logs import get_logs
|
||||||
|
|
||||||
|
@ -38,6 +38,7 @@ def get_services():
|
||||||
class SuperUserGetLogsForService(ApiResource):
|
class SuperUserGetLogsForService(ApiResource):
|
||||||
""" Resource for fetching the kinds of system logs in the system. """
|
""" Resource for fetching the kinds of system logs in the system. """
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('getSystemLogs')
|
@nickname('getSystemLogs')
|
||||||
def get(self, service):
|
def get(self, service):
|
||||||
""" Returns the logs for the specific service. """
|
""" Returns the logs for the specific service. """
|
||||||
|
@ -65,6 +66,7 @@ class SuperUserGetLogsForService(ApiResource):
|
||||||
class SuperUserSystemLogServices(ApiResource):
|
class SuperUserSystemLogServices(ApiResource):
|
||||||
""" Resource for fetching the kinds of system logs in the system. """
|
""" Resource for fetching the kinds of system logs in the system. """
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('listSystemLogServices')
|
@nickname('listSystemLogServices')
|
||||||
def get(self):
|
def get(self):
|
||||||
""" List the system logs for the current system. """
|
""" List the system logs for the current system. """
|
||||||
|
@ -83,6 +85,7 @@ class SuperUserSystemLogServices(ApiResource):
|
||||||
class SuperUserLogs(ApiResource):
|
class SuperUserLogs(ApiResource):
|
||||||
""" Resource for fetching all logs in the system. """
|
""" Resource for fetching all logs in the system. """
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('listAllLogs')
|
@nickname('listAllLogs')
|
||||||
@parse_args
|
@parse_args
|
||||||
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
|
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
|
||||||
|
@ -115,6 +118,7 @@ def user_view(user):
|
||||||
class UsageInformation(ApiResource):
|
class UsageInformation(ApiResource):
|
||||||
""" Resource for returning the usage information for enterprise customers. """
|
""" Resource for returning the usage information for enterprise customers. """
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('getSystemUsage')
|
@nickname('getSystemUsage')
|
||||||
def get(self):
|
def get(self):
|
||||||
""" Returns the number of repository handles currently held. """
|
""" Returns the number of repository handles currently held. """
|
||||||
|
@ -153,6 +157,7 @@ class SuperUserList(ApiResource):
|
||||||
}
|
}
|
||||||
|
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('listAllUsers')
|
@nickname('listAllUsers')
|
||||||
def get(self):
|
def get(self):
|
||||||
""" Returns a list of all users in the system. """
|
""" Returns a list of all users in the system. """
|
||||||
|
@ -166,6 +171,7 @@ class SuperUserList(ApiResource):
|
||||||
|
|
||||||
|
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('createInstallUser')
|
@nickname('createInstallUser')
|
||||||
@validate_json_request('CreateInstallUser')
|
@validate_json_request('CreateInstallUser')
|
||||||
def post(self):
|
def post(self):
|
||||||
|
@ -203,6 +209,7 @@ class SuperUserList(ApiResource):
|
||||||
class SuperUserSendRecoveryEmail(ApiResource):
|
class SuperUserSendRecoveryEmail(ApiResource):
|
||||||
""" Resource for sending a recovery user on behalf of a user. """
|
""" Resource for sending a recovery user on behalf of a user. """
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('sendInstallUserRecoveryEmail')
|
@nickname('sendInstallUserRecoveryEmail')
|
||||||
def post(self, username):
|
def post(self, username):
|
||||||
if SuperUserPermission().can():
|
if SuperUserPermission().can():
|
||||||
|
@ -247,6 +254,7 @@ class SuperUserManagement(ApiResource):
|
||||||
}
|
}
|
||||||
|
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('getInstallUser')
|
@nickname('getInstallUser')
|
||||||
def get(self, username):
|
def get(self, username):
|
||||||
""" Returns information about the specified user. """
|
""" Returns information about the specified user. """
|
||||||
|
@ -260,6 +268,7 @@ class SuperUserManagement(ApiResource):
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('deleteInstallUser')
|
@nickname('deleteInstallUser')
|
||||||
def delete(self, username):
|
def delete(self, username):
|
||||||
""" Deletes the specified user. """
|
""" Deletes the specified user. """
|
||||||
|
@ -277,6 +286,7 @@ class SuperUserManagement(ApiResource):
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
@require_fresh_login
|
@require_fresh_login
|
||||||
|
@verify_not_prod
|
||||||
@nickname('changeInstallUser')
|
@nickname('changeInstallUser')
|
||||||
@validate_json_request('UpdateUser')
|
@validate_json_request('UpdateUser')
|
||||||
def put(self, username):
|
def put(self, username):
|
||||||
|
|
177
test/test_suconfig_api.py
Normal file
177
test/test_suconfig_api.py
Normal file
|
@ -0,0 +1,177 @@
|
||||||
|
from test.test_api_usage import ApiTestCase, READ_ACCESS_USER, ADMIN_ACCESS_USER
|
||||||
|
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
|
||||||
|
SuperUserCreateInitialSuperUser, SuperUserConfigValidate)
|
||||||
|
from app import OVERRIDE_CONFIG_YAML_FILENAME
|
||||||
|
from data.database import User
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
|
||||||
|
class TestSuperUserRegistryStatus(ApiTestCase):
|
||||||
|
def test_registry_status(self):
|
||||||
|
json = self.getJsonResponse(SuperUserRegistryStatus)
|
||||||
|
self.assertTrue(json['is_testing'])
|
||||||
|
self.assertTrue(json['valid_db'])
|
||||||
|
self.assertFalse(json['file_exists'])
|
||||||
|
self.assertFalse(json['ready'])
|
||||||
|
|
||||||
|
|
||||||
|
class TestSuperUserConfigFile(ApiTestCase):
|
||||||
|
def test_get_non_superuser(self):
|
||||||
|
# No user.
|
||||||
|
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
|
||||||
|
|
||||||
|
# Non-superuser.
|
||||||
|
self.login(READ_ACCESS_USER)
|
||||||
|
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
|
||||||
|
|
||||||
|
def test_get_superuser_invalid_filename(self):
|
||||||
|
self.login(ADMIN_ACCESS_USER)
|
||||||
|
self.getResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404)
|
||||||
|
|
||||||
|
def test_get_superuser(self):
|
||||||
|
self.login(ADMIN_ACCESS_USER)
|
||||||
|
result = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
|
||||||
|
self.assertFalse(result['exists'])
|
||||||
|
|
||||||
|
def test_post_non_superuser(self):
|
||||||
|
# No user.
|
||||||
|
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
|
||||||
|
|
||||||
|
# Non-superuser.
|
||||||
|
self.login(READ_ACCESS_USER)
|
||||||
|
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
|
||||||
|
|
||||||
|
def test_post_superuser_invalid_filename(self):
|
||||||
|
self.login(ADMIN_ACCESS_USER)
|
||||||
|
self.postResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404)
|
||||||
|
|
||||||
|
def test_post_superuser(self):
|
||||||
|
self.login(ADMIN_ACCESS_USER)
|
||||||
|
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSuperUserCreateInitialSuperUser(ApiTestCase):
|
||||||
|
def test_no_config_file(self):
|
||||||
|
# If there is no config.yaml, then this method should security fail.
|
||||||
|
data = dict(username='cooluser', password='password', email='fake@example.com')
|
||||||
|
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
|
||||||
|
|
||||||
|
def test_config_file_with_db_users(self):
|
||||||
|
try:
|
||||||
|
# Write some config.
|
||||||
|
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
|
||||||
|
|
||||||
|
# If there is a config.yaml, but existing DB users exist, then this method should security
|
||||||
|
# fail.
|
||||||
|
data = dict(username='cooluser', password='password', email='fake@example.com')
|
||||||
|
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
|
||||||
|
finally:
|
||||||
|
os.remove(OVERRIDE_CONFIG_YAML_FILENAME)
|
||||||
|
|
||||||
|
def test_config_file_with_no_db_users(self):
|
||||||
|
try:
|
||||||
|
# Write some config.
|
||||||
|
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
|
||||||
|
|
||||||
|
# Delete all the users in the DB.
|
||||||
|
for user in list(User.select()):
|
||||||
|
user.delete_instance(recursive=True)
|
||||||
|
|
||||||
|
# This method should now succeed.
|
||||||
|
data = dict(username='cooluser', password='password', email='fake@example.com')
|
||||||
|
result = self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data)
|
||||||
|
self.assertTrue(result['status'])
|
||||||
|
|
||||||
|
# Verify the superuser was created.
|
||||||
|
User.get(User.username == 'cooluser')
|
||||||
|
|
||||||
|
# Verify the superuser was placed into the config.
|
||||||
|
result = self.getJsonResponse(SuperUserConfig)
|
||||||
|
self.assertEquals(['cooluser'], result['config']['SUPER_USERS'])
|
||||||
|
|
||||||
|
finally:
|
||||||
|
os.remove(OVERRIDE_CONFIG_YAML_FILENAME)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSuperUserConfigValidate(ApiTestCase):
|
||||||
|
def test_nonsuperuser_noconfig(self):
|
||||||
|
self.login(ADMIN_ACCESS_USER)
|
||||||
|
result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'),
|
||||||
|
data=dict(config={}))
|
||||||
|
|
||||||
|
self.assertFalse(result['status'])
|
||||||
|
|
||||||
|
|
||||||
|
def test_nonsuperuser_config(self):
|
||||||
|
try:
|
||||||
|
# The validate config call works if there is no config.yaml OR the user is a superuser.
|
||||||
|
# Add a config, and verify it breaks when unauthenticated.
|
||||||
|
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
|
||||||
|
self.assertTrue(json['exists'])
|
||||||
|
|
||||||
|
self.postResponse(SuperUserConfigValidate, params=dict(service='someservice'),
|
||||||
|
data=dict(config={}),
|
||||||
|
expected_code=403)
|
||||||
|
|
||||||
|
# Now login as a superuser.
|
||||||
|
self.login(ADMIN_ACCESS_USER)
|
||||||
|
result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'),
|
||||||
|
data=dict(config={}))
|
||||||
|
|
||||||
|
self.assertFalse(result['status'])
|
||||||
|
finally:
|
||||||
|
os.remove(OVERRIDE_CONFIG_YAML_FILENAME)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSuperUserConfig(ApiTestCase):
|
||||||
|
def test_get_non_superuser(self):
|
||||||
|
# No user.
|
||||||
|
self.getResponse(SuperUserConfig, expected_code=401)
|
||||||
|
|
||||||
|
# Non-superuser.
|
||||||
|
self.login(READ_ACCESS_USER)
|
||||||
|
self.getResponse(SuperUserConfig, expected_code=403)
|
||||||
|
|
||||||
|
def test_get_superuser(self):
|
||||||
|
self.login(ADMIN_ACCESS_USER)
|
||||||
|
json = self.getJsonResponse(SuperUserConfig)
|
||||||
|
|
||||||
|
# Note: We expect the config to be none because a config.yaml should never be checked into
|
||||||
|
# the directory.
|
||||||
|
self.assertIsNone(json['config'])
|
||||||
|
|
||||||
|
def test_put(self):
|
||||||
|
try:
|
||||||
|
# The update config call works if there is no config.yaml OR the user is a superuser. First
|
||||||
|
# try writing it without a superuser present.
|
||||||
|
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
|
||||||
|
self.assertTrue(json['exists'])
|
||||||
|
|
||||||
|
# Verify the config.yaml file exists.
|
||||||
|
self.assertTrue(os.path.exists(OVERRIDE_CONFIG_YAML_FILENAME))
|
||||||
|
|
||||||
|
# Try writing it again. This should now fail, since the config.yaml exists.
|
||||||
|
self.putResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'), expected_code=403)
|
||||||
|
|
||||||
|
# Login as a non-superuser.
|
||||||
|
self.login(READ_ACCESS_USER)
|
||||||
|
|
||||||
|
# Try writing it again. This should fail.
|
||||||
|
self.putResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'), expected_code=403)
|
||||||
|
|
||||||
|
# Login as a superuser.
|
||||||
|
self.login(ADMIN_ACCESS_USER)
|
||||||
|
|
||||||
|
# This should succeed.
|
||||||
|
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'))
|
||||||
|
self.assertTrue(json['exists'])
|
||||||
|
|
||||||
|
json = self.getJsonResponse(SuperUserConfig)
|
||||||
|
self.assertIsNotNone(json['config'])
|
||||||
|
|
||||||
|
finally:
|
||||||
|
os.remove(OVERRIDE_CONFIG_YAML_FILENAME)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Reference in a new issue