"""Tests for the superuser configuration API resources in ``endpoints.api.suconfig``.

Every test runs inside ``ConfigForTesting`` so that the shared ``config_provider``
is reset before the test body starts and again after it finishes.
"""

from test.test_api_usage import ApiTestCase, READ_ACCESS_USER, ADMIN_ACCESS_USER
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig,
                                    SuperUserConfigFile,
                                    SuperUserCreateInitialSuperUser,
                                    SuperUserConfigValidate)
from app import config_provider, all_queues
from data.database import User
from data import model

import unittest


class ConfigForTesting(object):
    """Context manager that resets ``config_provider`` on entry and on exit.

    The value bound by ``with ConfigForTesting() as config`` is the
    ``config_provider`` object itself.
    """

    def __enter__(self):
        """Reset the provider and hand it to the ``with`` body."""
        config_provider.reset_for_test()
        return config_provider

    def __exit__(self, type, value, traceback):
        """Reset the provider again; returns None, so exceptions propagate."""
        config_provider.reset_for_test()


class TestSuperUserRegistryStatus(ApiTestCase):
    """Tests for the ``SuperUserRegistryStatus`` resource."""

    def test_registry_status(self):
        """After a reset, the reported status is 'upload-license'."""
        with ConfigForTesting():
            response = self.getJsonResponse(SuperUserRegistryStatus)
            self.assertEqual('upload-license', response['status'])


class TestSuperUserConfigFile(ApiTestCase):
    """Tests for GET and POST on the ``SuperUserConfigFile`` resource."""

    def test_get_non_superuser(self):
        """GET is rejected with 403 for anonymous and non-superuser callers."""
        with ConfigForTesting():
            # No user.
            self.getResponse(SuperUserConfigFile,
                             params=dict(filename='ssl.cert'),
                             expected_code=403)

            # Non-superuser.
            self.login(READ_ACCESS_USER)
            self.getResponse(SuperUserConfigFile,
                             params=dict(filename='ssl.cert'),
                             expected_code=403)

    def test_get_superuser_invalid_filename(self):
        """GET of an unrecognized filename returns 404 for a superuser."""
        with ConfigForTesting():
            self.login(ADMIN_ACCESS_USER)
            self.getResponse(SuperUserConfigFile,
                             params=dict(filename='somefile'),
                             expected_code=404)

    def test_get_superuser(self):
        """GET of 'ssl.cert' as a superuser reports that the file does not exist."""
        with ConfigForTesting():
            self.login(ADMIN_ACCESS_USER)
            result = self.getJsonResponse(SuperUserConfigFile,
                                          params=dict(filename='ssl.cert'))
            self.assertFalse(result['exists'])

    def test_post_non_superuser(self):
        """POST without superuser rights yields 400 before config exists, 403 after."""
        with ConfigForTesting():
            # No user, before config.yaml exists.
            self.postResponse(SuperUserConfigFile,
                              params=dict(filename='ssl.cert'),
                              expected_code=400)

            # Write some config.
            self.putJsonResponse(SuperUserConfig,
                                 data=dict(config={}, hostname='foobar'))

            # No user, with config.yaml.
            self.postResponse(SuperUserConfigFile,
                              params=dict(filename='ssl.cert'),
                              expected_code=403)

            # Non-superuser.
            self.login(READ_ACCESS_USER)
            self.postResponse(SuperUserConfigFile,
                              params=dict(filename='ssl.cert'),
                              expected_code=403)

    def test_post_superuser_invalid_filename(self):
        """POST of an unrecognized filename returns 404 for a superuser."""
        with ConfigForTesting():
            self.login(ADMIN_ACCESS_USER)
            self.postResponse(SuperUserConfigFile,
                              params=dict(filename='somefile'),
                              expected_code=404)

    def test_post_superuser(self):
        """POST of 'ssl.cert' as a superuser with no upload attached returns 400."""
        with ConfigForTesting():
            self.login(ADMIN_ACCESS_USER)
            self.postResponse(SuperUserConfigFile,
                              params=dict(filename='ssl.cert'),
                              expected_code=400)


class TestSuperUserCreateInitialSuperUser(ApiTestCase):
    """Tests for the ``SuperUserCreateInitialSuperUser`` resource."""

    def test_no_config_file(self):
        """Creating the initial superuser is refused (403) without a config.yaml."""
        with ConfigForTesting():
            # If there is no config.yaml, then this method should security fail.
            data = dict(username='cooluser', password='password',
                        email='fake@example.com')
            self.postResponse(SuperUserCreateInitialSuperUser, data=data,
                              expected_code=403)

    def test_config_file_with_db_users(self):
        """Creating the initial superuser is refused (403) when DB users exist."""
        with ConfigForTesting():
            # Write some config.
            self.putJsonResponse(SuperUserConfig,
                                 data=dict(config={}, hostname='foobar'))

            # If there is a config.yaml, but existing DB users exist, then this
            # method should security fail.
            data = dict(username='cooluser', password='password',
                        email='fake@example.com')
            self.postResponse(SuperUserCreateInitialSuperUser, data=data,
                              expected_code=403)

    def test_config_file_with_no_db_users(self):
        """With a config.yaml and an empty user table, the superuser is created."""
        with ConfigForTesting():
            # Write some config.
            self.putJsonResponse(SuperUserConfig,
                                 data=dict(config={}, hostname='foobar'))

            # Delete all the users in the DB. The query result is copied into
            # a list first so rows are not deleted while the select is still
            # being iterated.
            for user in list(User.select()):
                model.user.delete_user(user, all_queues, force=True)

            # This method should now succeed.
            data = dict(username='cooluser', password='password',
                        email='fake@example.com')
            result = self.postJsonResponse(SuperUserCreateInitialSuperUser,
                                           data=data)
            self.assertTrue(result['status'])

            # Verify the superuser was created. The lookup itself is expected
            # to raise if no such row exists; the assertion makes the intent
            # of the check explicit.
            self.assertIsNotNone(User.get(User.username == 'cooluser'))

            # Verify the superuser was placed into the config.
            result = self.getJsonResponse(SuperUserConfig)
            self.assertEqual(['cooluser'], result['config']['SUPER_USERS'])


class TestSuperUserConfigValidate(ApiTestCase):
    """Tests for the ``SuperUserConfigValidate`` resource."""

    def test_nonsuperuser_noconfig(self):
        """Validating an unknown service with an empty config reports a false status.

        NOTE(review): despite the test name, this logs in as ADMIN_ACCESS_USER.
        """
        with ConfigForTesting():
            self.login(ADMIN_ACCESS_USER)
            result = self.postJsonResponse(SuperUserConfigValidate,
                                           params=dict(service='someservice'),
                                           data=dict(config={}))

            self.assertFalse(result['status'])

    def test_nonsuperuser_config(self):
        """Once a config exists, validation needs a superuser (403 otherwise)."""
        with ConfigForTesting():
            # The validate config call works if there is no config.yaml OR the
            # user is a superuser. Add a config, and verify it breaks when
            # unauthenticated.
            response = self.putJsonResponse(
                SuperUserConfig, data=dict(config={}, hostname='foobar'))
            self.assertTrue(response['exists'])

            self.postResponse(SuperUserConfigValidate,
                              params=dict(service='someservice'),
                              data=dict(config={}),
                              expected_code=403)

            # Now login as a superuser.
            self.login(ADMIN_ACCESS_USER)
            result = self.postJsonResponse(SuperUserConfigValidate,
                                           params=dict(service='someservice'),
                                           data=dict(config={}))

            self.assertFalse(result['status'])


class TestSuperUserConfig(ApiTestCase):
    """Tests for GET and PUT on the ``SuperUserConfig`` resource."""

    def test_get_non_superuser(self):
        """GET returns 401 for an anonymous caller and 403 for a non-superuser."""
        with ConfigForTesting():
            # No user.
            self.getResponse(SuperUserConfig, expected_code=401)

            # Non-superuser.
            self.login(READ_ACCESS_USER)
            self.getResponse(SuperUserConfig, expected_code=403)

    def test_get_superuser(self):
        """GET as a superuser returns a null config when none has been written."""
        with ConfigForTesting():
            self.login(ADMIN_ACCESS_USER)
            response = self.getJsonResponse(SuperUserConfig)

            # Note: We expect the config to be none because a config.yaml
            # should never be checked into the directory.
            self.assertIsNone(response['config'])

    def test_put(self):
        """PUT is open until a config exists; afterwards only a superuser may write."""
        with ConfigForTesting() as config:
            # The update config call works if there is no config.yaml OR the
            # user is a superuser. First try writing it without a superuser
            # present.
            response = self.putJsonResponse(
                SuperUserConfig, data=dict(config={}, hostname='foobar'))
            self.assertTrue(response['exists'])

            # Verify the config file exists.
            self.assertTrue(config.config_exists())

            # Try writing it again. This should now fail, since the
            # config.yaml exists.
            self.putResponse(SuperUserConfig,
                             data=dict(config={}, hostname='barbaz'),
                             expected_code=403)

            # Login as a non-superuser.
            self.login(READ_ACCESS_USER)

            # Try writing it again. This should fail.
            self.putResponse(SuperUserConfig,
                             data=dict(config={}, hostname='barbaz'),
                             expected_code=403)

            # Login as a superuser.
            self.login(ADMIN_ACCESS_USER)

            # This should succeed.
            response = self.putJsonResponse(
                SuperUserConfig, data=dict(config={}, hostname='barbaz'))
            self.assertTrue(response['exists'])

            response = self.getJsonResponse(SuperUserConfig)
            self.assertIsNotNone(response['config'])


if __name__ == '__main__':
    unittest.main()