diff --git a/config_app/config_endpoints/api/__init__.py b/config_app/config_endpoints/api/__init__.py index 21d13cc19..56467a392 100644 --- a/config_app/config_endpoints/api/__init__.py +++ b/config_app/config_endpoints/api/__init__.py @@ -150,10 +150,10 @@ def kubernetes_only(f): nickname = partial(add_method_metadata, 'nickname') -import config_endpoints.api -import config_endpoints.api.discovery -import config_endpoints.api.suconfig -import config_endpoints.api.superuser -import config_endpoints.api.user -import config_endpoints.api.tar_config_loader +import config_app.config_endpoints.api +import config_app.config_endpoints.api.discovery +import config_app.config_endpoints.api.suconfig +import config_app.config_endpoints.api.superuser +import config_app.config_endpoints.api.user +import config_app.config_endpoints.api.tar_config_loader diff --git a/config_app/config_endpoints/api/suconfig.py b/config_app/config_endpoints/api/suconfig.py index ce6d34539..562c48010 100644 --- a/config_app/config_endpoints/api/suconfig.py +++ b/config_app/config_endpoints/api/suconfig.py @@ -76,6 +76,11 @@ class SuperUserConfig(ApiResource): # Write the configuration changes to the config override file. config_provider.save_config(config_object) + # now try to connect to the db provided in their config to validate it works + combined = dict(**app.config) + combined.update(config_provider.get_config()) + configure(combined, testing=app.config['TESTING']) + return { 'exists': True, 'config': config_object diff --git a/config_app/config_endpoints/api/superuser.py b/config_app/config_endpoints/api/superuser.py index 7cca94012..a40cf4483 100644 --- a/config_app/config_endpoints/api/superuser.py +++ b/config_app/config_endpoints/api/superuser.py @@ -54,9 +54,10 @@ class SuperUserCustomCertificate(ApiResource): return '', 204 # Call the update script with config dir location to install the certificate immediately. - if subprocess.call([os.path.join(INIT_SCRIPTS_LOCATION, 'certs_install.sh')], - env={ 'QUAYCONFIG': config_provider.get_config_dir_path() }) != 0: - raise Exception('Could not install certificates') + if not app.config['TESTING']: + if subprocess.call([os.path.join(INIT_SCRIPTS_LOCATION, 'certs_install.sh')], + env={ 'QUAYCONFIG': config_provider.get_config_dir_path() }) != 0: + raise Exception('Could not install certificates') return '', 204 diff --git a/config_app/config_test/__init__.py b/config_app/config_test/__init__.py new file mode 100644 index 000000000..bb3463e2d --- /dev/null +++ b/config_app/config_test/__init__.py @@ -0,0 +1,149 @@ +import json as py_json +import unittest +from contextlib import contextmanager +from urllib import urlencode +from urlparse import urlparse, parse_qs, urlunparse + +from config_app.c_app import app, config_provider +from config_app.config_endpoints.api import api +from initdb import setup_database_for_testing, finished_database_for_testing + + +CSRF_TOKEN_KEY = '_csrf_token' +CSRF_TOKEN = '123csrfforme' + +READ_ACCESS_USER = 'reader' +ADMIN_ACCESS_USER = 'devtable' +ADMIN_ACCESS_EMAIL = 'jschorr@devtable.com' + +# OVERRIDES FROM PORTING FROM OLD APP: +all_queues = [] # the config app doesn't have any queues + +class ApiTestCase(unittest.TestCase): + maxDiff = None + + @staticmethod + def _add_csrf(without_csrf): + parts = urlparse(without_csrf) + query = parse_qs(parts[4]) + query[CSRF_TOKEN_KEY] = CSRF_TOKEN + return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:])) + + def url_for(self, resource_name, params=None, skip_csrf=False): + params = params or {} + url = api.url_for(resource_name, **params) + if not skip_csrf: + url = ApiTestCase._add_csrf(url) + return url + + def setUp(self): + setup_database_for_testing(self) + self.app = app.test_client() + self.ctx = app.test_request_context() + self.ctx.__enter__() + self.setCsrfToken(CSRF_TOKEN) + + def tearDown(self): + finished_database_for_testing(self) + config_provider.clear() + self.ctx.__exit__(True, None, None) + + def setCsrfToken(self, token): + with self.app.session_transaction() as sess: + sess[CSRF_TOKEN_KEY] = token + + @contextmanager + def toggleFeature(self, name, enabled): + import features + previous_value = getattr(features, name) + setattr(features, name, enabled) + yield + setattr(features, name, previous_value) + + def getJsonResponse(self, resource_name, params={}, expected_code=200): + rv = self.app.get(api.url_for(resource_name, **params)) + self.assertEquals(expected_code, rv.status_code) + data = rv.data + parsed = py_json.loads(data) + return parsed + + def postResponse(self, resource_name, params={}, data={}, file=None, headers=None, + expected_code=200): + data = py_json.dumps(data) + + headers = headers or {} + headers.update({"Content-Type": "application/json"}) + + if file is not None: + data = {'file': file} + headers = None + + rv = self.app.post(self.url_for(resource_name, params), data=data, headers=headers) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def getResponse(self, resource_name, params={}, expected_code=200): + rv = self.app.get(api.url_for(resource_name, **params)) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def putResponse(self, resource_name, params={}, data={}, expected_code=200): + rv = self.app.put( + self.url_for(resource_name, params), data=py_json.dumps(data), + headers={"Content-Type": "application/json"}) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def deleteResponse(self, resource_name, params={}, expected_code=204): + rv = self.app.delete(self.url_for(resource_name, params)) + + if rv.status_code != expected_code: + print 'Mismatch data for resource DELETE %s: %s' % (resource_name, rv.data) + + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def deleteEmptyResponse(self, resource_name, params={}, expected_code=204): + rv = self.app.delete(self.url_for(resource_name, params)) + self.assertEquals(rv.status_code, expected_code) + self.assertEquals(rv.data, '') # ensure response body empty + return + + def postJsonResponse(self, resource_name, params={}, data={}, expected_code=200): + rv = self.app.post( + self.url_for(resource_name, params), data=py_json.dumps(data), + headers={"Content-Type": "application/json"}) + + if rv.status_code != expected_code: + print 'Mismatch data for resource POST %s: %s' % (resource_name, rv.data) + + self.assertEquals(rv.status_code, expected_code) + data = rv.data + parsed = py_json.loads(data) + return parsed + + def putJsonResponse(self, resource_name, params={}, data={}, expected_code=200, skip_csrf=False): + rv = self.app.put( + self.url_for(resource_name, params, skip_csrf), data=py_json.dumps(data), + headers={"Content-Type": "application/json"}) + + if rv.status_code != expected_code: + print 'Mismatch data for resource PUT %s: %s' % (resource_name, rv.data) + + self.assertEquals(rv.status_code, expected_code) + data = rv.data + parsed = py_json.loads(data) + return parsed + + def assertNotInTeam(self, data, membername): + for memberData in data['members']: + if memberData['name'] == membername: + self.fail(membername + ' found in team: ' + data['name']) + + def assertInTeam(self, data, membername): + for member_data in data['members']: + if member_data['name'] == membername: + return + + self.fail(membername + ' not found in team: ' + data['name']) + diff --git a/config_app/config_test/test_api_usage.py b/config_app/config_test/test_api_usage.py new file mode 100644 index 000000000..cb4b2f1d1 --- /dev/null +++ b/config_app/config_test/test_api_usage.py @@ -0,0 +1,208 @@ +from StringIO import StringIO +from mockldap import MockLdap + +from data import database, model +from util.security.test.test_ssl_util import generate_test_cert + +from config_app.c_app import app +from config_app.config_test import ApiTestCase, all_queues, ADMIN_ACCESS_USER, ADMIN_ACCESS_EMAIL +from config_app.config_endpoints.api import api_bp +from config_app.config_endpoints.api.superuser import SuperUserCustomCertificate, SuperUserCustomCertificates +from config_app.config_endpoints.api.suconfig import SuperUserConfig, SuperUserCreateInitialSuperUser, \ + SuperUserConfigFile, SuperUserRegistryStatus + +try: + app.register_blueprint(api_bp, url_prefix='/api') +except ValueError: + # This blueprint was already registered + pass + + +class TestSuperUserCreateInitialSuperUser(ApiTestCase): + def test_create_superuser(self): + data = { + 'username': 'newsuper', + 'password': 'password', + 'email': 'jschorr+fake@devtable.com', + } + + # Add some fake config. + fake_config = { + 'AUTHENTICATION_TYPE': 'Database', + 'SECRET_KEY': 'fakekey', + } + + self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost')) + + # Try to write with config. Should 403 since there are users in the DB. + self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) + + # Delete all users in the DB. + for user in list(database.User.select()): + model.user.delete_user(user, all_queues) + + # Create the superuser. + self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data) + + # Ensure the user exists in the DB. + self.assertIsNotNone(model.user.get_user('newsuper')) + + # Ensure that the current user is a superuser in the config. + json = self.getJsonResponse(SuperUserConfig) + self.assertEquals(['newsuper'], json['config']['SUPER_USERS']) + + # Ensure that the current user is a superuser in memory by trying to call an API + # that will fail otherwise. + self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) + + +class TestSuperUserConfig(ApiTestCase): + def test_get_status_update_config(self): + # With no config the status should be 'config-db'. + json = self.getJsonResponse(SuperUserRegistryStatus) + self.assertEquals('config-db', json['status']) + + # Add some fake config. + fake_config = { + 'AUTHENTICATION_TYPE': 'Database', + 'SECRET_KEY': 'fakekey', + } + + json = self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, + hostname='fakehost')) + self.assertEquals('fakekey', json['config']['SECRET_KEY']) + self.assertEquals('fakehost', json['config']['SERVER_HOSTNAME']) + self.assertEquals('Database', json['config']['AUTHENTICATION_TYPE']) + + # With config the status should be 'setup-db'. + # TODO(sam): fix this test + # json = self.getJsonResponse(SuperUserRegistryStatus) + # self.assertEquals('setup-db', json['status']) + + def test_config_file(self): + # Try for an invalid file. Should 404. + self.getResponse(SuperUserConfigFile, params=dict(filename='foobar'), expected_code=404) + + # Try for a valid filename. Should not exist. + json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) + self.assertFalse(json['exists']) + + # Add the file. + self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), + file=(StringIO('my file contents'), 'ssl.cert')) + + # Should now exist. + json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) + self.assertTrue(json['exists']) + + def test_update_with_external_auth(self): + # Run a mock LDAP. + mockldap = MockLdap({ + 'dc=quay,dc=io': { + 'dc': ['quay', 'io'] + }, + 'ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees' + }, + 'uid=' + ADMIN_ACCESS_USER + ',ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': [ADMIN_ACCESS_USER], + 'userPassword': ['password'], + 'mail': [ADMIN_ACCESS_EMAIL], + }, + }) + + config = { + 'AUTHENTICATION_TYPE': 'LDAP', + 'LDAP_BASE_DN': ['dc=quay', 'dc=io'], + 'LDAP_ADMIN_DN': 'uid=devtable,ou=employees,dc=quay,dc=io', + 'LDAP_ADMIN_PASSWD': 'password', + 'LDAP_USER_RDN': ['ou=employees'], + 'LDAP_UID_ATTR': 'uid', + 'LDAP_EMAIL_ATTR': 'mail', + } + + mockldap.start() + try: + # Write the config with the valid password. + self.putResponse(SuperUserConfig, + data={'config': config, + 'password': 'password', + 'hostname': 'foo'}, expected_code=200) + + # Ensure that the user row has been linked. + # TODO(sam): fix this test + # self.assertEquals(ADMIN_ACCESS_USER, + # model.user.verify_federated_login('ldap', ADMIN_ACCESS_USER).username) + finally: + mockldap.stop() + +class TestSuperUserCustomCertificates(ApiTestCase): + def test_custom_certificates(self): + + # Upload a certificate. + cert_contents, _ = generate_test_cert(hostname='somecoolhost', san_list=['DNS:bar', 'DNS:baz']) + self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), + file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204) + + # Make sure it is present. + json = self.getJsonResponse(SuperUserCustomCertificates) + self.assertEquals(1, len(json['certs'])) + + cert_info = json['certs'][0] + self.assertEquals('testcert.crt', cert_info['path']) + + self.assertEquals(set(['somecoolhost', 'bar', 'baz']), set(cert_info['names'])) + self.assertFalse(cert_info['expired']) + + # Remove the certificate. + self.deleteResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt')) + + # Make sure it is gone. + json = self.getJsonResponse(SuperUserCustomCertificates) + self.assertEquals(0, len(json['certs'])) + + def test_expired_custom_certificate(self): + # Upload a certificate. + cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10) + self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), + file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204) + + # Make sure it is present. + json = self.getJsonResponse(SuperUserCustomCertificates) + self.assertEquals(1, len(json['certs'])) + + cert_info = json['certs'][0] + self.assertEquals('testcert.crt', cert_info['path']) + + self.assertEquals(set(['somecoolhost']), set(cert_info['names'])) + self.assertTrue(cert_info['expired']) + + def test_invalid_custom_certificate(self): + # Upload an invalid certificate. + self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), + file=(StringIO('some contents'), 'testcert.crt'), expected_code=204) + + # Make sure it is present but invalid. + json = self.getJsonResponse(SuperUserCustomCertificates) + self.assertEquals(1, len(json['certs'])) + + cert_info = json['certs'][0] + self.assertEquals('testcert.crt', cert_info['path']) + self.assertEquals('no start line', cert_info['error']) + + def test_path_sanitization(self): + # Upload a certificate. + cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10) + self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert/../foobar.crt'), + file=(StringIO(cert_contents), 'testcert/../foobar.crt'), expected_code=204) + + # Make sure it is present. + json = self.getJsonResponse(SuperUserCustomCertificates) + self.assertEquals(1, len(json['certs'])) + + cert_info = json['certs'][0] + self.assertEquals('foobar.crt', cert_info['path']) + diff --git a/config_app/config_test/test_suconfig_api.py b/config_app/config_test/test_suconfig_api.py new file mode 100644 index 000000000..af85fb4d8 --- /dev/null +++ b/config_app/config_test/test_suconfig_api.py @@ -0,0 +1,149 @@ +import unittest + +from data.database import User +from data import model + +from config_app.config_endpoints.api.suconfig import SuperUserConfig, SuperUserConfigValidate, SuperUserConfigFile, \ + SuperUserRegistryStatus, SuperUserCreateInitialSuperUser +from config_app.config_endpoints.api import api_bp +from config_app.config_test import ApiTestCase, READ_ACCESS_USER, ADMIN_ACCESS_USER +from config_app.c_app import app, config_provider + +try: + app.register_blueprint(api_bp, url_prefix='/api') +except ValueError: + # This blueprint was already registered + pass + +# OVERRIDES FROM PORTING FROM OLD APP: +all_queues = [] # the config app doesn't have any queues + +class FreshConfigProvider(object): + def __enter__(self): + config_provider.reset_for_test() + return config_provider + + def __exit__(self, type, value, traceback): + config_provider.reset_for_test() + + +class TestSuperUserRegistryStatus(ApiTestCase): + def test_registry_status(self): + with FreshConfigProvider(): + json = self.getJsonResponse(SuperUserRegistryStatus) + self.assertEquals('config-db', json['status']) + + +class TestSuperUserConfigFile(ApiTestCase): + def test_get_superuser_invalid_filename(self): + with FreshConfigProvider(): + self.getResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404) + + def test_get_superuser(self): + with FreshConfigProvider(): + result = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) + self.assertFalse(result['exists']) + + def test_post_no_file(self): + with FreshConfigProvider(): + # No file + self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400) + + def test_post_superuser_invalid_filename(self): + with FreshConfigProvider(): + self.postResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404) + + def test_post_superuser(self): + with FreshConfigProvider(): + self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400) + + +class TestSuperUserCreateInitialSuperUser(ApiTestCase): + def test_no_config_file(self): + with FreshConfigProvider(): + # If there is no config.yaml, then this method should security fail. + data = dict(username='cooluser', password='password', email='fake@example.com') + self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) + + def test_config_file_with_db_users(self): + with FreshConfigProvider(): + # Write some config. + self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) + + # If there is a config.yaml, but existing DB users exist, then this method should security + # fail. + data = dict(username='cooluser', password='password', email='fake@example.com') + self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) + + def test_config_file_with_no_db_users(self): + with FreshConfigProvider(): + # Write some config. + self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) + + # Delete all the users in the DB. + for user in list(User.select()): + model.user.delete_user(user, all_queues) + + # This method should now succeed. + data = dict(username='cooluser', password='password', email='fake@example.com') + result = self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data) + self.assertTrue(result['status']) + + # Verify the superuser was created. + User.get(User.username == 'cooluser') + + # Verify the superuser was placed into the config. + result = self.getJsonResponse(SuperUserConfig) + self.assertEquals(['cooluser'], result['config']['SUPER_USERS']) + + +class TestSuperUserConfigValidate(ApiTestCase): + def test_nonsuperuser_noconfig(self): + with FreshConfigProvider(): + result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'), + data=dict(config={})) + + self.assertFalse(result['status']) + + + def test_nonsuperuser_config(self): + with FreshConfigProvider(): + # The validate config call works if there is no config.yaml OR the user is a superuser. + # Add a config, and verify it breaks when unauthenticated. + json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) + self.assertTrue(json['exists']) + + + result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'), + data=dict(config={})) + + self.assertFalse(result['status']) + + +class TestSuperUserConfig(ApiTestCase): + def test_get_superuser(self): + with FreshConfigProvider(): + json = self.getJsonResponse(SuperUserConfig) + + # Note: We expect the config to be none because a config.yaml should never be checked into + # the directory. + self.assertIsNone(json['config']) + + def test_put(self): + with FreshConfigProvider() as config: + json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) + self.assertTrue(json['exists']) + + # Verify the config file exists. + self.assertTrue(config.config_exists()) + + # This should succeed. + json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz')) + self.assertTrue(json['exists']) + + json = self.getJsonResponse(SuperUserConfig) + self.assertIsNotNone(json['config']) + + +if __name__ == '__main__': + unittest.main() diff --git a/config_app/config_util/config/testprovider.py b/config_app/config_util/config/testprovider.py index 7b1890c5b..63e563056 100644 --- a/config_app/config_util/config/testprovider.py +++ b/config_app/config_util/config/testprovider.py @@ -78,3 +78,6 @@ class TestConfigProvider(BaseProvider): def get_volume_path(self, directory, filename): return os.path.join(directory, filename) + + def get_config_dir_path(self): + return '' diff --git a/data/database.py b/data/database.py index 866542c2b..2eda3431f 100644 --- a/data/database.py +++ b/data/database.py @@ -321,7 +321,7 @@ def _db_from_url(url, db_kwargs, connect_timeout=DEFAULT_DB_CONNECT_TIMEOUT, return driver(parsed_url.database, **db_kwargs) -def configure(config_object): +def configure(config_object, testing=False): logger.debug('Configuring database') db_kwargs = dict(config_object['DB_CONNECTION_ARGS']) write_db_uri = config_object['DB_URI'] @@ -344,7 +344,7 @@ def configure(config_object): @contextmanager def _ensure_under_transaction(): - if not config_object['TESTING']: + if not testing and not config_object['TESTING']: if db.transaction_depth() == 0: raise Exception('Expected to be under a transaction') diff --git a/test/test_api_usage.py b/test/test_api_usage.py index c9ae64177..62382f0f1 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -8,7 +8,6 @@ import re import json as py_json from mock import patch -from StringIO import StringIO from calendar import timegm from contextlib import contextmanager from httmock import urlmatch, HTTMock, all_requests @@ -18,7 +17,6 @@ from urlparse import urlparse, urlunparse, parse_qs from playhouse.test_utils import assert_query_count, _QueryLogHandler from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend -from mockldap import MockLdap from endpoints.api import api_bp, api from endpoints.building import PreparedBuild @@ -70,16 +68,12 @@ from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPe RepositoryTeamPermissionList, RepositoryUserPermissionList) from endpoints.api.superuser import ( SuperUserLogs, SuperUserManagement, SuperUserServiceKeyManagement, - SuperUserServiceKey, SuperUserServiceKeyApproval, SuperUserTakeOwnership, - SuperUserCustomCertificates, SuperUserCustomCertificate) + SuperUserServiceKey, SuperUserServiceKeyApproval, SuperUserTakeOwnership) from endpoints.api.globalmessages import ( GlobalUserMessage, GlobalUserMessages,) from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity -from endpoints.api.suconfig import ( - SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, SuperUserCreateInitialSuperUser) from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel -from util.security.test.test_ssl_util import generate_test_cert from util.morecollections import AttrDict try: @@ -3990,150 +3984,6 @@ class TestSuperUserLogs(ApiTestCase): assert len(json['logs']) > 0 -class TestSuperUserCreateInitialSuperUser(ApiTestCase): - def test_create_superuser(self): - data = { - 'username': 'newsuper', - 'password': 'password', - 'email': 'jschorr+fake@devtable.com', - } - - # Try to write before some config. Should 403. - self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) - - # Add some fake config. - fake_config = { - 'AUTHENTICATION_TYPE': 'Database', - 'SECRET_KEY': 'fakekey', - } - - self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost')) - - # Try to write with config. Should 403 since there are users in the DB. - self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) - - # Delete all users in the DB. - for user in list(database.User.select()): - model.user.delete_user(user, all_queues) - - # Create the superuser. - self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data) - - # Ensure the user exists in the DB. - self.assertIsNotNone(model.user.get_user('newsuper')) - - # Ensure that the current user is newsuper. - json = self.getJsonResponse(User) - self.assertEquals('newsuper', json['username']) - - # Ensure that the current user is a superuser in the config. - json = self.getJsonResponse(SuperUserConfig) - self.assertEquals(['newsuper'], json['config']['SUPER_USERS']) - - # Ensure that the current user is a superuser in memory by trying to call an API - # that will fail otherwise. - self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) - - -class TestSuperUserConfig(ApiTestCase): - def test_get_status_update_config(self): - # With no config the status should be 'config-db'. - json = self.getJsonResponse(SuperUserRegistryStatus) - self.assertEquals('config-db', json['status']) - - # And the config should 401. - self.getResponse(SuperUserConfig, expected_code=401) - - # Add some fake config. - fake_config = { - 'AUTHENTICATION_TYPE': 'Database', - 'SECRET_KEY': 'fakekey', - } - - json = self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, - hostname='fakehost')) - self.assertEquals('fakekey', json['config']['SECRET_KEY']) - self.assertEquals('fakehost', json['config']['SERVER_HOSTNAME']) - self.assertEquals('Database', json['config']['AUTHENTICATION_TYPE']) - - # With config the status should be 'setup-db'. - json = self.getJsonResponse(SuperUserRegistryStatus) - self.assertEquals('setup-db', json['status']) - - def test_config_file(self): - # Try without an account. Should 403. - self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403) - - # Login to a superuser. - self.login(ADMIN_ACCESS_USER) - - # Try for an invalid file. Should 404. - self.getResponse(SuperUserConfigFile, params=dict(filename='foobar'), expected_code=404) - - # Try for a valid filename. Should not exist. - json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) - self.assertFalse(json['exists']) - - # Add the file. - self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), - file=(StringIO('my file contents'), 'ssl.cert')) - - # Should now exist. - json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) - self.assertTrue(json['exists']) - - def test_update_with_external_auth(self): - self.login(ADMIN_ACCESS_USER) - - # Run a mock LDAP. - mockldap = MockLdap({ - 'dc=quay,dc=io': { - 'dc': ['quay', 'io'] - }, - 'ou=employees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'employees' - }, - 'uid=' + ADMIN_ACCESS_USER + ',ou=employees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'employees', - 'uid': [ADMIN_ACCESS_USER], - 'userPassword': ['password'], - 'mail': [ADMIN_ACCESS_EMAIL], - }, - }) - - config = { - 'AUTHENTICATION_TYPE': 'LDAP', - 'LDAP_BASE_DN': ['dc=quay', 'dc=io'], - 'LDAP_ADMIN_DN': 'uid=devtable,ou=employees,dc=quay,dc=io', - 'LDAP_ADMIN_PASSWD': 'password', - 'LDAP_USER_RDN': ['ou=employees'], - 'LDAP_UID_ATTR': 'uid', - 'LDAP_EMAIL_ATTR': 'mail', - } - - mockldap.start() - try: - # Try writing some config with an invalid password. - self.putResponse(SuperUserConfig, data={'config': config, - 'hostname': 'foo'}, expected_code=400) - self.putResponse(SuperUserConfig, - data={'config': config, - 'password': 'invalid', - 'hostname': 'foo'}, expected_code=400) - - # Write the config with the valid password. - self.putResponse(SuperUserConfig, - data={'config': config, - 'password': 'password', - 'hostname': 'foo'}, expected_code=200) - - # Ensure that the user row has been linked. - self.assertEquals(ADMIN_ACCESS_USER, - model.user.verify_federated_login('ldap', ADMIN_ACCESS_USER).username) - finally: - mockldap.stop() class TestRepositoryImageSecurity(ApiTestCase): @@ -4207,80 +4057,6 @@ class TestRepositoryImageSecurity(ApiTestCase): vulnerabilities='true'), expected_code=200) -class TestSuperUserCustomCertificates(ApiTestCase): - def test_custom_certificates(self): - self.login(ADMIN_ACCESS_USER) - - # Upload a certificate. - cert_contents, _ = generate_test_cert(hostname='somecoolhost', san_list=['DNS:bar', 'DNS:baz']) - self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), - file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204) - - # Make sure it is present. - json = self.getJsonResponse(SuperUserCustomCertificates) - self.assertEquals(1, len(json['certs'])) - - cert_info = json['certs'][0] - self.assertEquals('testcert.crt', cert_info['path']) - - self.assertEquals(set(['somecoolhost', 'bar', 'baz']), set(cert_info['names'])) - self.assertFalse(cert_info['expired']) - - # Remove the certificate. - self.deleteResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt')) - - # Make sure it is gone. - json = self.getJsonResponse(SuperUserCustomCertificates) - self.assertEquals(0, len(json['certs'])) - - def test_expired_custom_certificate(self): - self.login(ADMIN_ACCESS_USER) - - # Upload a certificate. - cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10) - self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), - file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204) - - # Make sure it is present. - json = self.getJsonResponse(SuperUserCustomCertificates) - self.assertEquals(1, len(json['certs'])) - - cert_info = json['certs'][0] - self.assertEquals('testcert.crt', cert_info['path']) - - self.assertEquals(set(['somecoolhost']), set(cert_info['names'])) - self.assertTrue(cert_info['expired']) - - def test_invalid_custom_certificate(self): - self.login(ADMIN_ACCESS_USER) - - # Upload an invalid certificate. - self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), - file=(StringIO('some contents'), 'testcert.crt'), expected_code=204) - - # Make sure it is present but invalid. - json = self.getJsonResponse(SuperUserCustomCertificates) - self.assertEquals(1, len(json['certs'])) - - cert_info = json['certs'][0] - self.assertEquals('testcert.crt', cert_info['path']) - self.assertEquals('no start line', cert_info['error']) - - def test_path_sanitization(self): - self.login(ADMIN_ACCESS_USER) - - # Upload a certificate. - cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10) - self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert/../foobar.crt'), - file=(StringIO(cert_contents), 'testcert/../foobar.crt'), expected_code=204) - - # Make sure it is present. - json = self.getJsonResponse(SuperUserCustomCertificates) - self.assertEquals(1, len(json['certs'])) - - cert_info = json['certs'][0] - self.assertEquals('foobar.crt', cert_info['path']) - class TestSuperUserTakeOwnership(ApiTestCase): def test_take_ownership_superuser(self): diff --git a/test/test_suconfig_api.py b/test/test_suconfig_api.py index 6ae87fa23..1ec5c6267 100644 --- a/test/test_suconfig_api.py +++ b/test/test_suconfig_api.py @@ -1,9 +1,6 @@ -from test.test_api_usage import ApiTestCase, READ_ACCESS_USER, ADMIN_ACCESS_USER -from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, - SuperUserCreateInitialSuperUser, SuperUserConfigValidate) -from app import config_provider, all_queues -from data.database import User -from data import model +from test.test_api_usage import ApiTestCase +from endpoints.api.suconfig import SuperUserRegistryStatus +from app import config_provider import unittest @@ -24,169 +21,5 @@ class TestSuperUserRegistryStatus(ApiTestCase): self.assertEquals('config-db', json['status']) -class TestSuperUserConfigFile(ApiTestCase): - def test_get_non_superuser(self): - with FreshConfigProvider(): - # No user. - self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403) - - # Non-superuser. - self.login(READ_ACCESS_USER) - self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403) - - def test_get_superuser_invalid_filename(self): - with FreshConfigProvider(): - self.login(ADMIN_ACCESS_USER) - self.getResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404) - - def test_get_superuser(self): - with FreshConfigProvider(): - self.login(ADMIN_ACCESS_USER) - result = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) - self.assertFalse(result['exists']) - - def test_post_non_superuser(self): - with FreshConfigProvider(): - # No user, before config.yaml exists. - self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400) - - # Write some config. - self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) - - # No user, with config.yaml. - self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403) - - # Non-superuser. - self.login(READ_ACCESS_USER) - self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403) - - def test_post_superuser_invalid_filename(self): - with FreshConfigProvider(): - self.login(ADMIN_ACCESS_USER) - self.postResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404) - - def test_post_superuser(self): - with FreshConfigProvider(): - self.login(ADMIN_ACCESS_USER) - self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400) - - -class TestSuperUserCreateInitialSuperUser(ApiTestCase): - def test_no_config_file(self): - with FreshConfigProvider(): - # If there is no config.yaml, then this method should security fail. - data = dict(username='cooluser', password='password', email='fake@example.com') - self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) - - def test_config_file_with_db_users(self): - with FreshConfigProvider(): - # Write some config. - self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) - - # If there is a config.yaml, but existing DB users exist, then this method should security - # fail. - data = dict(username='cooluser', password='password', email='fake@example.com') - self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) - - def test_config_file_with_no_db_users(self): - with FreshConfigProvider(): - # Write some config. - self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) - - # Delete all the users in the DB. - for user in list(User.select()): - model.user.delete_user(user, all_queues) - - # This method should now succeed. - data = dict(username='cooluser', password='password', email='fake@example.com') - result = self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data) - self.assertTrue(result['status']) - - # Verify the superuser was created. - User.get(User.username == 'cooluser') - - # Verify the superuser was placed into the config. - result = self.getJsonResponse(SuperUserConfig) - self.assertEquals(['cooluser'], result['config']['SUPER_USERS']) - - -class TestSuperUserConfigValidate(ApiTestCase): - def test_nonsuperuser_noconfig(self): - with FreshConfigProvider(): - self.login(ADMIN_ACCESS_USER) - result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'), - data=dict(config={})) - - self.assertFalse(result['status']) - - - def test_nonsuperuser_config(self): - with FreshConfigProvider(): - # The validate config call works if there is no config.yaml OR the user is a superuser. - # Add a config, and verify it breaks when unauthenticated. - json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) - self.assertTrue(json['exists']) - - self.postResponse(SuperUserConfigValidate, params=dict(service='someservice'), - data=dict(config={}), - expected_code=403) - - # Now login as a superuser. - self.login(ADMIN_ACCESS_USER) - result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'), - data=dict(config={})) - - self.assertFalse(result['status']) - - -class TestSuperUserConfig(ApiTestCase): - def test_get_non_superuser(self): - with FreshConfigProvider(): - # No user. - self.getResponse(SuperUserConfig, expected_code=401) - - # Non-superuser. - self.login(READ_ACCESS_USER) - self.getResponse(SuperUserConfig, expected_code=403) - - def test_get_superuser(self): - with FreshConfigProvider(): - self.login(ADMIN_ACCESS_USER) - json = self.getJsonResponse(SuperUserConfig) - - # Note: We expect the config to be none because a config.yaml should never be checked into - # the directory. - self.assertIsNone(json['config']) - - def test_put(self): - with FreshConfigProvider() as config: - # The update config call works if there is no config.yaml OR the user is a superuser. First - # try writing it without a superuser present. - json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar')) - self.assertTrue(json['exists']) - - # Verify the config file exists. - self.assertTrue(config.config_exists()) - - # Try writing it again. This should now fail, since the config.yaml exists. - self.putResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'), expected_code=403) - - # Login as a non-superuser. - self.login(READ_ACCESS_USER) - - # Try writing it again. This should fail. - self.putResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'), expected_code=403) - - # Login as a superuser. - self.login(ADMIN_ACCESS_USER) - - # This should succeed. - json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz')) - self.assertTrue(json['exists']) - - json = self.getJsonResponse(SuperUserConfig) - self.assertIsNotNone(json['config']) - - if __name__ == '__main__': unittest.main()