import json
import unittest

import moto
import redis
from httmock import urlmatch, HTTMock

# Local imports are kept in their original relative order in case any of
# them relies on import-time side effects of an earlier one.
from initdb import setup_database_for_testing, finished_database_for_testing
from util.config.validator import VALIDATORS, ConfigValidationException
from util.morecollections import AttrDict
from app import app


class TestValidateConfig(unittest.TestCase):
    """Exercise the config validators registered in ``VALIDATORS``.

    Each test feeds a validator a deliberately incomplete, invalid, or
    HTTP-mocked configuration and checks the resulting error (or success).
    ``tearDownClass`` then verifies that every key of ``VALIDATORS`` was
    recorded in ``validated`` by at least one test.
    """

    # Shared across all test instances on purpose: names of the services
    # whose validators have been exercised (or explicitly skipped).
    validated = set()

    def setUp(self):
        """Prepare the test database, a test client and a request context."""
        setup_database_for_testing(self)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

    def tearDown(self):
        """Release the test database and leave the request context."""
        finished_database_for_testing(self)
        # A clean exit of the context-manager protocol passes None for the
        # exception type, value and traceback.
        self.ctx.__exit__(None, None, None)

    def validate(self, service, config, user=None, password=None):
        """Run the validator registered for ``service``.

        Records ``service`` in ``validated`` and sets ``config['TESTING']``
        to ``True`` (the passed-in dict is modified in place) before calling
        ``VALIDATORS[service](config, user, password)``. Any exception raised
        by the validator propagates to the caller.
        """
        self.validated.add(service)
        config['TESTING'] = True
        VALIDATORS[service](config, user, password)

    def test_validate_redis(self):
        """Redis: missing hostname is rejected; a bogus host fails to connect."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing redis hostname'):
            self.validate('redis', {})

        with self.assertRaises(redis.ConnectionError):
            self.validate('redis', {
                'BUILDLOGS_REDIS': {
                    'host': 'somehost',
                },
            })

    def test_validate_mail(self):
        """Mail: not exercised; only marked as covered."""
        # Skip mail.
        self.validated.add('mail')

    def test_validate_database(self):
        """Database: an unreachable DB_URI raises an initialization error."""
        with self.assertRaisesRegexp(Exception, 'database not properly initialized'):
            self.validate('database', {
                'DB_URI': 'mysql://somehost',
            })

    def test_validate_jwt(self):
        """JWT: each missing setting is reported in turn."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing JWT Verification endpoint'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'Missing JWT Issuer ID'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
                'JWT_VERIFY_ENDPOINT': 'somehost',
            })

        with self.assertRaisesRegexp(Exception, 'JWT Authentication public key file'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
                'JWT_VERIFY_ENDPOINT': 'somehost',
                'JWT_AUTH_ISSUER': 'someissuer',
            })

        # TODO(jschorr): Add another test once we switch JWT auth to use the
        # config provider to find the file

    def test_validate_registry_storage(self):
        """Registry storage: missing, local and (mocked) S3 configurations."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Storage configuration required'):
            self.validate('registry-storage', {})

        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Locally mounted directory not'):
            self.validate('registry-storage', {
                'FEATURE_STORAGE_REPLICATION': True,
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                }
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'No such file or directory'):
            self.validate('registry-storage', {
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                }
            })

        with moto.mock_s3():
            with self.assertRaisesRegexp(ConfigValidationException,
                                         'S3ResponseError: 404 Not Found'):
                self.validate('registry-storage', {
                    'DISTRIBUTED_STORAGE_CONFIG': {
                        'default': ('S3Storage', {
                            's3_access_key': 'invalid',
                            's3_secret_key': 'invalid',
                            's3_bucket': 'somebucket',
                            'storage_path': ''
                        }),
                    }
                })

    def test_validate_bittorrent(self):
        """BitTorrent: announce URL is required and is contacted when given."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing announce URL'):
            self.validate('bittorrent', {})

        # One-element list so the nested handler can flip the flag.
        announcer_hit = [False]

        @urlmatch(netloc=r'somehost', path='/announce')
        def handler(url, request):
            announcer_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            self.validate('bittorrent', {
                'BITTORRENT_ANNOUNCE_URL': 'http://somehost/announce',
            })

        self.assertTrue(announcer_hit[0])

    def test_validate_ssl(self):
        """SSL: http and externally terminated https pass; bare https fails."""
        self.validate('ssl', {
            'PREFERRED_URL_SCHEME': 'http',
        })

        self.validate('ssl', {
            'PREFERRED_URL_SCHEME': 'https',
            'EXTERNAL_TLS_TERMINATION': True,
        })

        with self.assertRaisesRegexp(ConfigValidationException, 'Missing required SSL file'):
            self.validate('ssl', {
                'PREFERRED_URL_SCHEME': 'https',
            })

        # TODO(jschorr): Add SSL verification tests once file lookup is fixed.

    def test_validate_keystone(self):
        """Keystone: superuser verification against a bogus host fails."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Verification of superuser someuser failed'):
            self.validate('keystone', {
                'AUTHENTICATION_TYPE': 'Keystone',
                'KEYSTONE_AUTH_URL': 'somehost',
                'KEYSTONE_AUTH_VERSION': 2,
                'KEYSTONE_ADMIN_USERNAME': 'someusername',
                'KEYSTONE_ADMIN_PASSWORD': 'somepassword',
                'KEYSTONE_ADMIN_TENANT': 'sometenant',
            }, user=AttrDict(dict(username='someuser')))

    def test_validate_ldap(self):
        """LDAP: missing admin settings and an unreachable server are reported."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing Admin DN for LDAP'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
            })

        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing Admin Password for LDAP'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
                'LDAP_ADMIN_DN': 'somedn',
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'Can\'t contact LDAP server'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
                'LDAP_ADMIN_DN': 'somedn',
                'LDAP_ADMIN_PASSWD': 'somepass',
                'LDAP_URI': 'ldap://localhost',
            })

    def test_validate_signer(self):
        """Signer: an unrecognized signing engine is rejected."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Unknown signing engine'):
            self.validate('signer', {
                'SIGNING_ENGINE': 'foobar',
            })

    def test_validate_security_scanner(self):
        """Security scanner: validation passes against a mocked endpoint."""
        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        # NOTE(review): unlike the other HTTP-mocked tests, url_hit is never
        # asserted here. Confirm whether the validator is expected to contact
        # the endpoint when TESTING is set before adding the assertion.
        with HTTMock(handler):
            self.validate('security-scanner', {
                'DISTRIBUTED_STORAGE_PREFERENCE': ['local'],
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                },
                'SECURITY_SCANNER_ENDPOINT': 'http://somehost',
            })

    def test_validate_github_trigger(self):
        """GitHub trigger: client id is required; a non-GitHub endpoint fails."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
            self.validate('github-trigger', {})

        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
                self.validate('github-trigger', {
                    'GITHUB_TRIGGER_CONFIG': {
                        'GITHUB_ENDPOINT': 'http://somehost',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    def test_validate_github_login(self):
        """GitHub login: client id is required; a non-GitHub endpoint fails."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
            self.validate('github-login', {})

        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
                self.validate('github-login', {
                    'GITHUB_LOGIN_CONFIG': {
                        'GITHUB_ENDPOINT': 'http://somehost',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    def test_validate_bitbucket_trigger(self):
        """Bitbucket trigger: credentials are required and bitbucket.org is hit."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing client ID and client secret'):
            self.validate('bitbucket-trigger', {})

        url_hit = [False]

        @urlmatch(netloc=r'bitbucket.org')
        def handler(url, request):
            url_hit[0] = True
            return {
                'status_code': 200,
                'content': 'oauth_token=foo&oauth_token_secret=bar',
            }

        with HTTMock(handler):
            self.validate('bitbucket-trigger', {
                'BITBUCKET_TRIGGER_CONFIG': {
                    'CONSUMER_KEY': 'foo',
                    'CONSUMER_SECRET': 'bar',
                },
            })

        self.assertTrue(url_hit[0])

    def test_validate_google_login(self):
        """Google login: credentials are required and the token URL is hit."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing client ID and client secret'):
            self.validate('google-login', {})

        url_hit = [False]

        @urlmatch(netloc=r'www.googleapis.com', path='/oauth2/v3/token')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            self.validate('google-login', {
                'GOOGLE_LOGIN_CONFIG': {
                    'CLIENT_ID': 'foo',
                    'CLIENT_SECRET': 'bar',
                },
            })

        self.assertTrue(url_hit[0])

    def test_validate_gitlab_trigger(self):
        """GitLab trigger: client id is required; bad credentials are rejected."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitLab client id'):
            self.validate('gitlab-trigger', {})

        url_hit = [False]

        @urlmatch(netloc=r'somegitlab', path='/oauth/token')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': '{}'}

        with HTTMock(handler):
            with self.assertRaisesRegexp(ConfigValidationException,
                                         "Invalid client id or client secret"):
                self.validate('gitlab-trigger', {
                    'GITLAB_TRIGGER_CONFIG': {
                        'GITLAB_ENDPOINT': 'http://somegitlab',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    @classmethod
    def tearDownClass(cls):
        """Fail if any validator in ``VALIDATORS`` was never exercised.

        Raises:
            AssertionError: carrying the set of service names that no test
                recorded in ``validated``.
        """
        not_run = set(VALIDATORS.keys()) - cls.validated
        # Raised explicitly (rather than via ``assert``) so the coverage
        # check still runs when Python is started with -O.
        if not_run:
            raise AssertionError(not_run)


if __name__ == '__main__':
    unittest.main()