from StringIO import StringIO from mockldap import MockLdap from data import database, model from util.security.test.test_ssl_util import generate_test_cert from config_app.c_app import app from config_app.config_test import ApiTestCase, all_queues, ADMIN_ACCESS_USER, ADMIN_ACCESS_EMAIL from config_app.config_endpoints.api import api_bp from config_app.config_endpoints.api.superuser import SuperUserCustomCertificate, SuperUserCustomCertificates from config_app.config_endpoints.api.suconfig import SuperUserConfig, SuperUserCreateInitialSuperUser, \ SuperUserConfigFile, SuperUserRegistryStatus try: app.register_blueprint(api_bp, url_prefix='/api') except ValueError: # This blueprint was already registered pass class TestSuperUserCreateInitialSuperUser(ApiTestCase): def test_create_superuser(self): data = { 'username': 'newsuper', 'password': 'password', 'email': 'jschorr+fake@devtable.com', } # Add some fake config. fake_config = { 'AUTHENTICATION_TYPE': 'Database', 'SECRET_KEY': 'fakekey', } self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost')) # Try to write with config. Should 403 since there are users in the DB. self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) # Delete all users in the DB. for user in list(database.User.select()): model.user.delete_user(user, all_queues) # Create the superuser. self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data) # Ensure the user exists in the DB. self.assertIsNotNone(model.user.get_user('newsuper')) # Ensure that the current user is a superuser in the config. json = self.getJsonResponse(SuperUserConfig) self.assertEquals(['newsuper'], json['config']['SUPER_USERS']) # Ensure that the current user is a superuser in memory by trying to call an API # that will fail otherwise. self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) class TestSuperUserConfig(ApiTestCase): def test_get_status_update_config(self): # With no config the status should be 'config-db'. json = self.getJsonResponse(SuperUserRegistryStatus) self.assertEquals('config-db', json['status']) # Add some fake config. fake_config = { 'AUTHENTICATION_TYPE': 'Database', 'SECRET_KEY': 'fakekey', } json = self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost')) self.assertEquals('fakekey', json['config']['SECRET_KEY']) self.assertEquals('fakehost', json['config']['SERVER_HOSTNAME']) self.assertEquals('Database', json['config']['AUTHENTICATION_TYPE']) # With config the status should be 'setup-db'. # TODO(sam): fix this test # json = self.getJsonResponse(SuperUserRegistryStatus) # self.assertEquals('setup-db', json['status']) def test_config_file(self): # Try for an invalid file. Should 404. self.getResponse(SuperUserConfigFile, params=dict(filename='foobar'), expected_code=404) # Try for a valid filename. Should not exist. json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) self.assertFalse(json['exists']) # Add the file. self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), file=(StringIO('my file contents'), 'ssl.cert')) # Should now exist. json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) self.assertTrue(json['exists']) def test_update_with_external_auth(self): # Run a mock LDAP. mockldap = MockLdap({ 'dc=quay,dc=io': { 'dc': ['quay', 'io'] }, 'ou=employees,dc=quay,dc=io': { 'dc': ['quay', 'io'], 'ou': 'employees' }, 'uid=' + ADMIN_ACCESS_USER + ',ou=employees,dc=quay,dc=io': { 'dc': ['quay', 'io'], 'ou': 'employees', 'uid': [ADMIN_ACCESS_USER], 'userPassword': ['password'], 'mail': [ADMIN_ACCESS_EMAIL], }, }) config = { 'AUTHENTICATION_TYPE': 'LDAP', 'LDAP_BASE_DN': ['dc=quay', 'dc=io'], 'LDAP_ADMIN_DN': 'uid=devtable,ou=employees,dc=quay,dc=io', 'LDAP_ADMIN_PASSWD': 'password', 'LDAP_USER_RDN': ['ou=employees'], 'LDAP_UID_ATTR': 'uid', 'LDAP_EMAIL_ATTR': 'mail', } mockldap.start() try: # Write the config with the valid password. self.putResponse(SuperUserConfig, data={'config': config, 'password': 'password', 'hostname': 'foo'}, expected_code=200) # Ensure that the user row has been linked. # TODO(sam): fix this test # self.assertEquals(ADMIN_ACCESS_USER, # model.user.verify_federated_login('ldap', ADMIN_ACCESS_USER).username) finally: mockldap.stop() class TestSuperUserCustomCertificates(ApiTestCase): def test_custom_certificates(self): # Upload a certificate. cert_contents, _ = generate_test_cert(hostname='somecoolhost', san_list=['DNS:bar', 'DNS:baz']) self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204) # Make sure it is present. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(1, len(json['certs'])) cert_info = json['certs'][0] self.assertEquals('testcert.crt', cert_info['path']) self.assertEquals(set(['somecoolhost', 'bar', 'baz']), set(cert_info['names'])) self.assertFalse(cert_info['expired']) # Remove the certificate. self.deleteResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt')) # Make sure it is gone. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(0, len(json['certs'])) def test_expired_custom_certificate(self): # Upload a certificate. cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10) self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204) # Make sure it is present. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(1, len(json['certs'])) cert_info = json['certs'][0] self.assertEquals('testcert.crt', cert_info['path']) self.assertEquals(set(['somecoolhost']), set(cert_info['names'])) self.assertTrue(cert_info['expired']) def test_invalid_custom_certificate(self): # Upload an invalid certificate. self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'), file=(StringIO('some contents'), 'testcert.crt'), expected_code=204) # Make sure it is present but invalid. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(1, len(json['certs'])) cert_info = json['certs'][0] self.assertEquals('testcert.crt', cert_info['path']) self.assertEquals('no start line', cert_info['error']) def test_path_sanitization(self): # Upload a certificate. cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10) self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert/../foobar.crt'), file=(StringIO(cert_contents), 'testcert/../foobar.crt'), expected_code=204) # Make sure it is present. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(1, len(json['certs'])) cert_info = json['certs'][0] self.assertEquals('foobar.crt', cert_info['path'])