2017-02-14 19:30:53 +00:00
|
|
|
from test.test_api_usage import ApiTestCase, READ_ACCESS_USER, ADMIN_ACCESS_USER
|
2015-01-08 17:53:36 +00:00
|
|
|
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
|
|
|
|
SuperUserCreateInitialSuperUser, SuperUserConfigValidate)
|
2016-08-09 21:58:33 +00:00
|
|
|
from app import config_provider, all_queues
|
2015-01-08 17:53:36 +00:00
|
|
|
from data.database import User
|
2016-08-09 21:58:33 +00:00
|
|
|
from data import model
|
2015-01-08 17:53:36 +00:00
|
|
|
|
|
|
|
import unittest
|
2015-01-09 21:23:31 +00:00
|
|
|
|
|
|
|
|
2017-02-14 19:30:53 +00:00
|
|
|
class FreshConfigProvider(object):
    """Context manager that resets the shared ``config_provider`` around a test.

    ``config_provider.reset_for_test()`` is invoked once when the ``with``
    block is entered and once more when it is left.
    """

    def __enter__(self):
        """Reset the shared provider and hand it to the ``with`` body."""
        provider = config_provider
        provider.reset_for_test()
        return provider

    def __exit__(self, type, value, traceback):
        """Reset the shared provider again.

        Nothing is returned, so an exception raised inside the ``with`` body
        is not suppressed.
        """
        config_provider.reset_for_test()
|
|
|
|
|
|
|
|
|
2015-01-08 17:53:36 +00:00
|
|
|
class TestSuperUserRegistryStatus(ApiTestCase):
    """Exercises ``SuperUserRegistryStatus`` against a freshly reset provider."""

    def test_registry_status(self):
        """The status endpoint reports ``'upload-license'`` after a reset."""
        with FreshConfigProvider():
            response = self.getJsonResponse(SuperUserRegistryStatus)
            # assertEqual is the supported spelling; the assertEquals alias is
            # deprecated and no longer exists in recent unittest releases.
            self.assertEqual('upload-license', response['status'])
|
2015-01-08 17:53:36 +00:00
|
|
|
|
|
|
|
|
|
|
|
class TestSuperUserConfigFile(ApiTestCase):
    """GET/POST access checks for the ``SuperUserConfigFile`` endpoint."""

    def test_get_non_superuser(self):
        """GET is rejected with 403 both anonymously and for a read-only user."""
        with FreshConfigProvider():
            # No user.
            self.getResponse(
                SuperUserConfigFile,
                params={'filename': 'ssl.cert'},
                expected_code=403,
            )

            # Non-superuser.
            self.login(READ_ACCESS_USER)
            self.getResponse(
                SuperUserConfigFile,
                params={'filename': 'ssl.cert'},
                expected_code=403,
            )

    def test_get_superuser_invalid_filename(self):
        """GET with the filename 'somefile' yields 404 for the admin user."""
        with FreshConfigProvider():
            self.login(ADMIN_ACCESS_USER)
            self.getResponse(
                SuperUserConfigFile,
                params={'filename': 'somefile'},
                expected_code=404,
            )

    def test_get_superuser(self):
        """GET for 'ssl.cert' as admin reports that the file does not exist."""
        with FreshConfigProvider():
            self.login(ADMIN_ACCESS_USER)
            file_info = self.getJsonResponse(
                SuperUserConfigFile,
                params={'filename': 'ssl.cert'},
            )
            self.assertFalse(file_info['exists'])

    def test_post_non_superuser(self):
        """POST without superuser rights: 400 before config exists, 403 after."""
        with FreshConfigProvider():
            # No user, before config.yaml exists.
            self.postResponse(
                SuperUserConfigFile,
                params={'filename': 'ssl.cert'},
                expected_code=400,
            )

            # Write some config.
            self.putJsonResponse(
                SuperUserConfig,
                data={'config': {}, 'hostname': 'foobar'},
            )

            # No user, with config.yaml.
            self.postResponse(
                SuperUserConfigFile,
                params={'filename': 'ssl.cert'},
                expected_code=403,
            )

            # Non-superuser.
            self.login(READ_ACCESS_USER)
            self.postResponse(
                SuperUserConfigFile,
                params={'filename': 'ssl.cert'},
                expected_code=403,
            )

    def test_post_superuser_invalid_filename(self):
        """POST with the filename 'somefile' yields 404 for the admin user."""
        with FreshConfigProvider():
            self.login(ADMIN_ACCESS_USER)
            self.postResponse(
                SuperUserConfigFile,
                params={'filename': 'somefile'},
                expected_code=404,
            )

    def test_post_superuser(self):
        """POST for 'ssl.cert' as admin (no upload supplied here) yields 400."""
        with FreshConfigProvider():
            self.login(ADMIN_ACCESS_USER)
            self.postResponse(
                SuperUserConfigFile,
                params={'filename': 'ssl.cert'},
                expected_code=400,
            )
|
2015-01-08 17:53:36 +00:00
|
|
|
|
|
|
|
|
|
|
|
class TestSuperUserCreateInitialSuperUser(ApiTestCase):
    """Covers when ``SuperUserCreateInitialSuperUser`` may create a user."""

    def test_no_config_file(self):
        """Without any written config the creation call is rejected with 403."""
        with FreshConfigProvider():
            # If there is no config.yaml, then this method should security fail.
            data = dict(username='cooluser', password='password',
                        email='fake@example.com')
            self.postResponse(SuperUserCreateInitialSuperUser, data=data,
                              expected_code=403)

    def test_config_file_with_db_users(self):
        """With config written but users still in the DB the call yields 403."""
        with FreshConfigProvider():
            # Write some config.
            self.putJsonResponse(SuperUserConfig,
                                 data=dict(config={}, hostname='foobar'))

            # If there is a config.yaml, but existing DB users exist, then this
            # method should security fail.
            data = dict(username='cooluser', password='password',
                        email='fake@example.com')
            self.postResponse(SuperUserCreateInitialSuperUser, data=data,
                              expected_code=403)

    def test_config_file_with_no_db_users(self):
        """With config written and an empty user table the call succeeds.

        Also checks that the new user row exists and that the username shows
        up as the only entry of ``SUPER_USERS`` in the returned config.
        """
        with FreshConfigProvider():
            # Write some config.
            self.putJsonResponse(SuperUserConfig,
                                 data=dict(config={}, hostname='foobar'))

            # Delete all the users in the DB. The query is materialised into a
            # list first so rows are not deleted while the select is iterated.
            for user in list(User.select()):
                model.user.delete_user(user, all_queues)

            # This method should now succeed.
            data = dict(username='cooluser', password='password',
                        email='fake@example.com')
            result = self.postJsonResponse(SuperUserCreateInitialSuperUser,
                                           data=data)
            self.assertTrue(result['status'])

            # Verify the superuser was created.
            # NOTE(review): presumably User.get raises when no row matches,
            # which is what makes this bare call a check - confirm.
            User.get(User.username == 'cooluser')

            # Verify the superuser was placed into the config.
            result = self.getJsonResponse(SuperUserConfig)
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(['cooluser'], result['config']['SUPER_USERS'])
|
|
|
|
|
|
|
|
|
|
|
|
class TestSuperUserConfigValidate(ApiTestCase):
    """Access and result checks for the ``SuperUserConfigValidate`` endpoint."""

    def test_nonsuperuser_noconfig(self):
        """Validating 'someservice' with no config written reports a false status.

        NOTE(review): despite the test name, this logs in as ADMIN_ACCESS_USER
        before calling the endpoint - confirm that is intended.
        """
        with FreshConfigProvider():
            self.login(ADMIN_ACCESS_USER)
            validation = self.postJsonResponse(
                SuperUserConfigValidate,
                params={'service': 'someservice'},
                data={'config': {}},
            )
            self.assertFalse(validation['status'])

    def test_nonsuperuser_config(self):
        """Once config exists, anonymous validation is 403 but admin still works."""
        with FreshConfigProvider():
            # The validate config call works if there is no config.yaml OR the
            # user is a superuser. Add a config, and verify it breaks when
            # unauthenticated.
            written = self.putJsonResponse(
                SuperUserConfig,
                data={'config': {}, 'hostname': 'foobar'},
            )
            self.assertTrue(written['exists'])

            self.postResponse(
                SuperUserConfigValidate,
                params={'service': 'someservice'},
                data={'config': {}},
                expected_code=403,
            )

            # Now login as a superuser.
            self.login(ADMIN_ACCESS_USER)
            validation = self.postJsonResponse(
                SuperUserConfigValidate,
                params={'service': 'someservice'},
                data={'config': {}},
            )
            self.assertFalse(validation['status'])
|
|
|
|
|
|
|
|
|
|
|
|
class TestSuperUserConfig(ApiTestCase):
    """Read and write checks for the ``SuperUserConfig`` endpoint."""

    def test_get_non_superuser(self):
        """GET yields 401 anonymously and 403 for a read-only user."""
        with FreshConfigProvider():
            # No user.
            self.getResponse(SuperUserConfig, expected_code=401)

            # Non-superuser.
            self.login(READ_ACCESS_USER)
            self.getResponse(SuperUserConfig, expected_code=403)

    def test_get_superuser(self):
        """GET as admin returns a payload whose 'config' entry is None."""
        with FreshConfigProvider():
            self.login(ADMIN_ACCESS_USER)
            payload = self.getJsonResponse(SuperUserConfig)

            # Note: We expect the config to be none because a config.yaml
            # should never be checked into the directory.
            self.assertIsNone(payload['config'])

    def test_put(self):
        """PUT succeeds before config exists, then only for the admin user."""
        with FreshConfigProvider() as provider:
            # The update config call works if there is no config.yaml OR the
            # user is a superuser. First try writing it without a superuser
            # present.
            payload = self.putJsonResponse(
                SuperUserConfig,
                data={'config': {}, 'hostname': 'foobar'},
            )
            self.assertTrue(payload['exists'])

            # Verify the config file exists.
            self.assertTrue(provider.config_exists())

            # Try writing it again. This should now fail, since the config.yaml
            # exists.
            self.putResponse(
                SuperUserConfig,
                data={'config': {}, 'hostname': 'barbaz'},
                expected_code=403,
            )

            # Login as a non-superuser.
            self.login(READ_ACCESS_USER)

            # Try writing it again. This should fail.
            self.putResponse(
                SuperUserConfig,
                data={'config': {}, 'hostname': 'barbaz'},
                expected_code=403,
            )

            # Login as a superuser.
            self.login(ADMIN_ACCESS_USER)

            # This should succeed.
            payload = self.putJsonResponse(
                SuperUserConfig,
                data={'config': {}, 'hostname': 'barbaz'},
            )
            self.assertTrue(payload['exists'])

            payload = self.getJsonResponse(SuperUserConfig)
            self.assertIsNotNone(payload['config'])
|
|
|
|
|
|
|
|
|
|
|
|
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|