diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 7903fe012..89ea3e7ad 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -6,6 +6,7 @@ import logging import re import json as py_json +from StringIO import StringIO from urllib import urlencode from urlparse import urlparse, urlunparse, parse_qs @@ -14,7 +15,7 @@ from playhouse.test_utils import assert_query_count, _QueryLogHandler from endpoints.api import api_bp, api from endpoints.building import PreparedBuild from endpoints.webhooks import webhooks -from app import app +from app import app, config_provider from buildtrigger.basehandler import BuildTriggerHandler from initdb import setup_database_for_testing, finished_database_for_testing from data import database, model @@ -50,7 +51,9 @@ from endpoints.api.organization import (OrganizationList, OrganizationMember, from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repository from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission, RepositoryTeamPermissionList, RepositoryUserPermissionList) -from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement) +from endpoints.api.superuser import SuperUserLogs, SuperUserList, SuperUserManagement +from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, + SuperUserCreateInitialSuperUser) try: app.register_blueprint(api_bp, url_prefix='/api') @@ -116,6 +119,7 @@ class ApiTestCase(unittest.TestCase): def tearDown(self): finished_database_for_testing(self) + config_provider.clear() self.ctx.__exit__(True, None, None) def setCsrfToken(self, token): @@ -129,10 +133,15 @@ class ApiTestCase(unittest.TestCase): parsed = py_json.loads(data) return parsed - def postResponse(self, resource_name, params={}, data={}, expected_code=200): - rv = self.app.post(self.url_for(resource_name, params), - data=py_json.dumps(data), - headers={"Content-Type": "application/json"}) + def postResponse(self, resource_name, params={}, data={}, file=None, expected_code=200): + data = py_json.dumps(data) + headers = {"Content-Type": "application/json"} + + if file is not None: + data = {'file': file} + headers = None + + rv = self.app.post(self.url_for(resource_name, params), data=data, headers=headers) self.assertEquals(rv.status_code, expected_code) return rv.data @@ -3322,6 +3331,98 @@ class TestSuperUserList(ApiTestCase): assert len(json['users']) > 0 +class TestSuperUserCreateInitialSuperUser(ApiTestCase): + def test_create_superuser(self): + data = { + 'username': 'newsuper', + 'password': 'password', + 'email': 'jschorr+fake@devtable.com', + } + + # Try to write before some config. Should 403. + self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) + + # Add some fake config. + fake_config = { + 'AUTHENTICATION_TYPE': 'Database', + 'SECRET_KEY': 'fakekey', + } + + self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost')) + + # Try to write with config. Should 403 since there are users in the DB. + self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) + + # Delete all users in the DB. + for user in list(database.User.select()): + user.delete_instance(recursive=True) + + # Create the superuser. + self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data) + + # Ensure the user exists in the DB. + self.assertIsNotNone(model.user.get_user('newsuper')) + + # Ensure that the current user is newsuper. + json = self.getJsonResponse(User) + self.assertEquals('newsuper', json['username']) + + # Ensure that the current user is a superuser in the config. + json = self.getJsonResponse(SuperUserConfig) + self.assertEquals(['newsuper'], json['config']['SUPER_USERS']) + + # Ensure that the current user is a superuser in memory by trying to call an API + # that will fail otherwise. + self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) + + +class TestSuperUserConfig(ApiTestCase): + def test_get_status_update_config(self): + # With no config the status should be 'config-db'. + json = self.getJsonResponse(SuperUserRegistryStatus) + self.assertEquals('config-db', json['status']) + + # And the config should 401. + self.getResponse(SuperUserConfig, expected_code=401) + + # Add some fake config. + fake_config = { + 'AUTHENTICATION_TYPE': 'Database', + 'SECRET_KEY': 'fakekey', + } + + json = self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost')) + self.assertEquals('fakekey', json['config']['SECRET_KEY']) + self.assertEquals('fakehost', json['config']['SERVER_HOSTNAME']) + self.assertEquals('Database', json['config']['AUTHENTICATION_TYPE']) + + # With config the status should be 'setup-db'. + json = self.getJsonResponse(SuperUserRegistryStatus) + self.assertEquals('setup-db', json['status']) + + def test_config_file(self): + # Try without an account. Should 403. + self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403) + + # Login to a superuser. + self.login(ADMIN_ACCESS_USER) + + # Try for an invalid file. Should 404. + self.getResponse(SuperUserConfigFile, params=dict(filename='foobar'), expected_code=404) + + # Try for a valid filename. Should not exist. + json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) + self.assertFalse(json['exists']) + + # Add the file. + self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), + file=(StringIO('my file contents'), 'ssl.cert')) + + # Should now exist. + json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) + self.assertTrue(json['exists']) + + class TestSuperUserManagement(ApiTestCase): def test_get_user(self): self.login(ADMIN_ACCESS_USER) diff --git a/util/config/provider/testprovider.py b/util/config/provider/testprovider.py index 96ccf77ae..7c8e1054f 100644 --- a/util/config/provider/testprovider.py +++ b/util/config/provider/testprovider.py @@ -7,8 +7,11 @@ class TestConfigProvider(BaseProvider): """ Implementation of the config provider for testing. Everything is kept in-memory instead on the real file system. """ def __init__(self): + self.clear() + + def clear(self): self.files = {} - self._config = None + self._config = {} @property def provider_id(self):