322 lines
		
	
	
		
			No EOL
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			322 lines
		
	
	
		
			No EOL
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| import redis
 | |
| import moto
 | |
| import json
 | |
| 
 | |
| from httmock import urlmatch, HTTMock
 | |
| 
 | |
| from initdb import setup_database_for_testing, finished_database_for_testing
 | |
| 
 | |
| from util.config.validator import VALIDATORS, ConfigValidationException
 | |
| from util.morecollections import AttrDict
 | |
| 
 | |
| from app import app
 | |
| 
 | |
| class TestValidateConfig(unittest.TestCase):
 | |
|   validated = set([])
 | |
| 
 | |
|   def setUp(self):
 | |
|     setup_database_for_testing(self)
 | |
| 
 | |
|     self.app = app.test_client()
 | |
|     self.ctx = app.test_request_context()
 | |
|     self.ctx.__enter__()
 | |
| 
 | |
|   def tearDown(self):
 | |
|     finished_database_for_testing(self)
 | |
|     self.ctx.__exit__(True, None, None)
 | |
| 
 | |
|   def validate(self, service, config, user=None, password=None):
 | |
|     self.validated.add(service)
 | |
|     config['TESTING'] = True
 | |
|     VALIDATORS[service](config, user, password)
 | |
| 
 | |
|   def test_validate_redis(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing redis hostname'):
 | |
|       self.validate('redis', {})
 | |
| 
 | |
|     with self.assertRaises(redis.ConnectionError):
 | |
|       self.validate('redis', {
 | |
|         'BUILDLOGS_REDIS': {
 | |
|           'host': 'somehost',
 | |
|         },
 | |
|       })
 | |
| 
 | |
|   def test_validate_mail(self):
 | |
|     # Skip mail.
 | |
|     self.validated.add('mail')
 | |
| 
 | |
|   def test_validate_database(self):
 | |
|     with self.assertRaisesRegexp(Exception, 'database not properly initialized'):
 | |
|       self.validate('database', {
 | |
|         'DB_URI': 'mysql://somehost',
 | |
|       })
 | |
| 
 | |
|   def test_validate_jwt(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing JWT Verification endpoint'):
 | |
|       self.validate('jwt', {
 | |
|         'AUTHENTICATION_TYPE': 'JWT',
 | |
|       })
 | |
| 
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing JWT Issuer ID'):
 | |
|       self.validate('jwt', {
 | |
|         'AUTHENTICATION_TYPE': 'JWT',
 | |
|         'JWT_VERIFY_ENDPOINT': 'somehost',
 | |
|       })
 | |
| 
 | |
|     with self.assertRaisesRegexp(Exception, 'JWT Authentication public key file'):
 | |
|       self.validate('jwt', {
 | |
|         'AUTHENTICATION_TYPE': 'JWT',
 | |
|         'JWT_VERIFY_ENDPOINT': 'somehost',
 | |
|         'JWT_AUTH_ISSUER': 'someissuer',
 | |
|       })
 | |
| 
 | |
|    # TODO(jschorr): Add another test once we switch JWT auth to use the config provider to
 | |
|    # find the file
 | |
| 
 | |
|   def test_validate_registry_storage(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Storage configuration required'):
 | |
|       self.validate('registry-storage', {})
 | |
| 
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Locally mounted directory not'):
 | |
|       self.validate('registry-storage', {
 | |
|         'FEATURE_STORAGE_REPLICATION': True,
 | |
|         'DISTRIBUTED_STORAGE_CONFIG': {
 | |
|           'default': ('LocalStorage', {
 | |
|             'storage_path': '',
 | |
|           }),
 | |
|         }
 | |
|       })
 | |
| 
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'No such file or directory'):
 | |
|       self.validate('registry-storage', {
 | |
|         'DISTRIBUTED_STORAGE_CONFIG': {
 | |
|           'default': ('LocalStorage', {
 | |
|             'storage_path': '',
 | |
|           }),
 | |
|         }
 | |
|       })
 | |
| 
 | |
|     with moto.mock_s3():
 | |
|       with self.assertRaisesRegexp(ConfigValidationException, 'S3ResponseError: 404 Not Found'):
 | |
|         self.validate('registry-storage', {
 | |
|           'DISTRIBUTED_STORAGE_CONFIG': {
 | |
|             'default': ('S3Storage', {
 | |
|               's3_access_key': 'invalid',
 | |
|               's3_secret_key': 'invalid',
 | |
|               's3_bucket': 'somebucket',
 | |
|               'storage_path': ''
 | |
|             }),
 | |
|           }
 | |
|         })
 | |
| 
 | |
|   def test_validate_bittorrent(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing announce URL'):
 | |
|       self.validate('bittorrent', {})
 | |
| 
 | |
|     announcer_hit = [False]
 | |
| 
 | |
|     @urlmatch(netloc=r'somehost', path='/announce')
 | |
|     def handler(url, request):
 | |
|       announcer_hit[0] = True
 | |
|       return {'status_code': 200, 'content': ''}
 | |
| 
 | |
|     with HTTMock(handler):
 | |
|       self.validate('bittorrent', {
 | |
|         'BITTORRENT_ANNOUNCE_URL': 'http://somehost/announce',
 | |
|       })
 | |
| 
 | |
|       self.assertTrue(announcer_hit[0])
 | |
| 
 | |
|   def test_validate_ssl(self):
 | |
|     self.validate('ssl', {
 | |
|       'PREFERRED_URL_SCHEME': 'http',
 | |
|     })
 | |
| 
 | |
|     self.validate('ssl', {
 | |
|       'PREFERRED_URL_SCHEME': 'https',
 | |
|       'EXTERNAL_TLS_TERMINATION': True,
 | |
|     })
 | |
| 
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing required SSL file'):
 | |
|       self.validate('ssl', {
 | |
|         'PREFERRED_URL_SCHEME': 'https',
 | |
|       })
 | |
| 
 | |
|     # TODO(jschorr): Add SSL verification tests once file lookup is fixed.
 | |
| 
 | |
|   def test_validate_keystone(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException,
 | |
|                                  'Verification of superuser someuser failed'):
 | |
|       self.validate('keystone', {
 | |
|         'AUTHENTICATION_TYPE': 'Keystone',
 | |
|         'KEYSTONE_AUTH_URL': 'somehost',
 | |
|         'KEYSTONE_AUTH_VERSION': 2,
 | |
|         'KEYSTONE_ADMIN_USERNAME': 'someusername',
 | |
|         'KEYSTONE_ADMIN_PASSWORD': 'somepassword',
 | |
|         'KEYSTONE_ADMIN_TENANT': 'sometenant',
 | |
|       }, user=AttrDict(dict(username='someuser')))
 | |
| 
 | |
|   def test_validate_ldap(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing Admin DN for LDAP'):
 | |
|       self.validate('ldap', {
 | |
|         'AUTHENTICATION_TYPE': 'LDAP',
 | |
|       })
 | |
| 
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing Admin Password for LDAP'):
 | |
|       self.validate('ldap', {
 | |
|         'AUTHENTICATION_TYPE': 'LDAP',
 | |
|         'LDAP_ADMIN_DN': 'somedn',
 | |
|       })
 | |
| 
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Can\'t contact LDAP server'):
 | |
|       self.validate('ldap', {
 | |
|         'AUTHENTICATION_TYPE': 'LDAP',
 | |
|         'LDAP_ADMIN_DN': 'somedn',
 | |
|         'LDAP_ADMIN_PASSWD': 'somepass',
 | |
|         'LDAP_URI': 'ldap://localhost',
 | |
|       })
 | |
| 
 | |
|   def test_validate_signer(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Unknown signing engine'):
 | |
|       self.validate('signer', {
 | |
|         'SIGNING_ENGINE': 'foobar',
 | |
|       })
 | |
| 
 | |
|   def test_validate_security_scanner(self):
 | |
|     url_hit = [False]
 | |
|     @urlmatch(netloc=r'somehost')
 | |
|     def handler(url, request):
 | |
|       url_hit[0] = True
 | |
|       return {'status_code': 200, 'content': ''}
 | |
| 
 | |
|     with HTTMock(handler):
 | |
|       self.validate('security-scanner', {
 | |
|         'DISTRIBUTED_STORAGE_PREFERENCE': ['local'],
 | |
|         'DISTRIBUTED_STORAGE_CONFIG': {
 | |
|           'default': ('LocalStorage', {
 | |
|             'storage_path': '',
 | |
|           }),
 | |
|         },
 | |
|         'SECURITY_SCANNER_ENDPOINT': 'http://somehost',
 | |
|       })
 | |
| 
 | |
| 
 | |
|   def test_validate_github_trigger(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
 | |
|       self.validate('github-trigger', {})
 | |
| 
 | |
|     url_hit = [False]
 | |
|     @urlmatch(netloc=r'somehost')
 | |
|     def handler(url, request):
 | |
|       url_hit[0] = True
 | |
|       return {'status_code': 200, 'content': ''}
 | |
| 
 | |
|     with HTTMock(handler):
 | |
|       with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
 | |
|         self.validate('github-trigger', {
 | |
|           'GITHUB_TRIGGER_CONFIG': {
 | |
|             'GITHUB_ENDPOINT': 'http://somehost',
 | |
|             'CLIENT_ID': 'foo',
 | |
|             'CLIENT_SECRET': 'bar',
 | |
|           },
 | |
|         })
 | |
| 
 | |
|       self.assertTrue(url_hit[0])
 | |
| 
 | |
|   def test_validate_github_login(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
 | |
|       self.validate('github-login', {})
 | |
| 
 | |
|     url_hit = [False]
 | |
|     @urlmatch(netloc=r'somehost')
 | |
|     def handler(url, request):
 | |
|       url_hit[0] = True
 | |
|       return {'status_code': 200, 'content': ''}
 | |
| 
 | |
|     with HTTMock(handler):
 | |
|       with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
 | |
|         self.validate('github-login', {
 | |
|           'GITHUB_LOGIN_CONFIG': {
 | |
|             'GITHUB_ENDPOINT': 'http://somehost',
 | |
|             'CLIENT_ID': 'foo',
 | |
|             'CLIENT_SECRET': 'bar',
 | |
|           },
 | |
|         })
 | |
| 
 | |
|       self.assertTrue(url_hit[0])
 | |
| 
 | |
|   def test_validate_bitbucket_trigger(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing client ID and client secret'):
 | |
|       self.validate('bitbucket-trigger', {})
 | |
| 
 | |
|     url_hit = [False]
 | |
|     @urlmatch(netloc=r'bitbucket.org')
 | |
|     def handler(url, request):
 | |
|       url_hit[0] = True
 | |
|       return {
 | |
|         'status_code': 200,
 | |
|         'content': 'oauth_token=foo&oauth_token_secret=bar',
 | |
|       }
 | |
| 
 | |
|     with HTTMock(handler):
 | |
|       self.validate('bitbucket-trigger', {
 | |
|         'BITBUCKET_TRIGGER_CONFIG': {
 | |
|           'CONSUMER_KEY': 'foo',
 | |
|           'CONSUMER_SECRET': 'bar',
 | |
|         },
 | |
|       })
 | |
| 
 | |
|       self.assertTrue(url_hit[0])
 | |
| 
 | |
|   def test_validate_google_login(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing client ID and client secret'):
 | |
|       self.validate('google-login', {})
 | |
| 
 | |
|     url_hit = [False]
 | |
|     @urlmatch(netloc=r'www.googleapis.com', path='/oauth2/v3/token')
 | |
|     def handler(url, request):
 | |
|       url_hit[0] = True
 | |
|       return {'status_code': 200, 'content': ''}
 | |
| 
 | |
|     with HTTMock(handler):
 | |
|       self.validate('google-login', {
 | |
|         'GOOGLE_LOGIN_CONFIG': {
 | |
|           'CLIENT_ID': 'foo',
 | |
|           'CLIENT_SECRET': 'bar',
 | |
|         },
 | |
|       })
 | |
| 
 | |
|       self.assertTrue(url_hit[0])
 | |
| 
 | |
|   def test_validate_gitlab_trigger(self):
 | |
|     with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitLab client id'):
 | |
|       self.validate('gitlab-trigger', {})
 | |
| 
 | |
|     url_hit = [False]
 | |
|     @urlmatch(netloc=r'somegitlab', path='/oauth/token')
 | |
|     def handler(url, request):
 | |
|       url_hit[0] = True
 | |
|       return {'status_code': 200, 'content': '{}'}
 | |
| 
 | |
|     with HTTMock(handler):
 | |
|       with self.assertRaisesRegexp(ConfigValidationException, "Invalid client id or client secret"):
 | |
|         self.validate('gitlab-trigger', {
 | |
|           'GITLAB_TRIGGER_CONFIG': {
 | |
|             'GITLAB_ENDPOINT': 'http://somegitlab',
 | |
|             'CLIENT_ID': 'foo',
 | |
|             'CLIENT_SECRET': 'bar',
 | |
|           },
 | |
|         })
 | |
| 
 | |
|       self.assertTrue(url_hit[0])
 | |
| 
 | |
| 
 | |
|   @classmethod
 | |
|   def tearDownClass(cls):
 | |
|     not_run = set(VALIDATORS.keys()) - cls.validated
 | |
|     assert not not_run, not_run
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|   unittest.main() |