"""Unit tests that exercise the configuration validators registered in ``VALIDATORS``."""

import unittest
import redis
import moto
import json

from httmock import urlmatch, HTTMock

from initdb import setup_database_for_testing, finished_database_for_testing

from util.config.validator import VALIDATORS, ConfigValidationException
from util.morecollections import AttrDict

from app import app

class TestValidateConfig(unittest.TestCase):
    """Call each validator in ``VALIDATORS`` with incomplete, invalid or mocked configuration.

    Every test goes through :meth:`validate`, which records the service name in
    ``validated``. ``tearDownClass`` then fails the run if any key of ``VALIDATORS``
    was never recorded, so adding a validator without a test is caught.
    """

    # Names of the validators exercised so far. This is intentionally a class
    # attribute: unittest creates one instance per test method, and all of them must
    # add to the same set for the tearDownClass coverage check to work.
    validated = set()

    def setUp(self):
        """Run the initdb test-setup hook, create a test client and enter a request context."""
        setup_database_for_testing(self)

        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

    def tearDown(self):
        """Run the initdb test-teardown hook and leave the context entered in setUp."""
        finished_database_for_testing(self)
        # A clean (exception-free) exit is signalled with three Nones; the first
        # argument is an exception *type*, so a boolean is not a valid value for it.
        self.ctx.__exit__(None, None, None)

    def validate(self, service, config, user=None, password=None):
        """Invoke the validator registered for ``service`` and record that it was run.

        Args:
            service: Key into ``VALIDATORS``.
            config: Configuration dict for the validator. It is shallow-copied and
                ``TESTING`` is set to ``True`` on the copy, so the caller's dict is
                not modified.
            user: Forwarded unchanged to the validator.
            password: Forwarded unchanged to the validator.

        Raises:
            Whatever the validator raises; nothing is caught here.
        """
        self.validated.add(service)

        test_config = dict(config)
        test_config['TESTING'] = True
        VALIDATORS[service](test_config, user, password)

    def test_validate_redis(self):
        # An empty config must be rejected with a validation error.
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing redis hostname'):
            self.validate('redis', {})

        # With a hostname present, the expected failure is a connection error.
        with self.assertRaises(redis.ConnectionError):
            self.validate('redis', {
                'BUILDLOGS_REDIS': {
                    'host': 'somehost',
                },
            })

    def test_validate_mail(self):
        # Skip mail: mark it as covered without calling the validator.
        self.validated.add('mail')

    def test_validate_database(self):
        with self.assertRaisesRegexp(Exception, 'database not properly initialized'):
            self.validate('database', {
                'DB_URI': 'mysql://somehost',
            })

    def test_validate_jwt(self):
        # Each step supplies one more setting and expects the next error in turn.
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing JWT Verification endpoint'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'Missing JWT Issuer ID'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
                'JWT_VERIFY_ENDPOINT': 'somehost',
            })

        with self.assertRaisesRegexp(Exception, 'JWT Authentication public key file'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
                'JWT_VERIFY_ENDPOINT': 'somehost',
                'JWT_AUTH_ISSUER': 'someissuer',
            })

        # TODO(jschorr): Add another test once we switch JWT auth to use the config provider to
        # find the file

    def test_validate_registry_storage(self):
        with self.assertRaisesRegexp(ConfigValidationException, 'Storage configuration required'):
            self.validate('registry-storage', {})

        # Same local storage config as the next case, but with replication enabled.
        with self.assertRaisesRegexp(ConfigValidationException, 'Locally mounted directory not'):
            self.validate('registry-storage', {
                'FEATURE_STORAGE_REPLICATION': True,
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                }
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'No such file or directory'):
            self.validate('registry-storage', {
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                }
            })

        # S3 is mocked by moto for the duration of this block.
        with moto.mock_s3():
            with self.assertRaisesRegexp(ConfigValidationException,
                                         'S3ResponseError: 404 Not Found'):
                self.validate('registry-storage', {
                    'DISTRIBUTED_STORAGE_CONFIG': {
                        'default': ('S3Storage', {
                            's3_access_key': 'invalid',
                            's3_secret_key': 'invalid',
                            's3_bucket': 'somebucket',
                            'storage_path': ''
                        }),
                    }
                })

    def test_validate_bittorrent(self):
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing announce URL'):
            self.validate('bittorrent', {})

        # One-element list so the nested handler can flip the flag without `nonlocal`.
        announcer_hit = [False]

        @urlmatch(netloc=r'somehost', path='/announce')
        def handler(url, request):
            announcer_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            self.validate('bittorrent', {
                'BITTORRENT_ANNOUNCE_URL': 'http://somehost/announce',
            })

        self.assertTrue(announcer_hit[0])

    def test_validate_ssl(self):
        # These two configurations are expected to validate without raising.
        self.validate('ssl', {
            'PREFERRED_URL_SCHEME': 'http',
        })

        self.validate('ssl', {
            'PREFERRED_URL_SCHEME': 'https',
            'EXTERNAL_TLS_TERMINATION': True,
        })

        with self.assertRaisesRegexp(ConfigValidationException, 'Missing required SSL file'):
            self.validate('ssl', {
                'PREFERRED_URL_SCHEME': 'https',
            })

        # TODO(jschorr): Add SSL verification tests once file lookup is fixed.

    def test_validate_keystone(self):
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Verification of superuser someuser failed'):
            self.validate('keystone', {
                'AUTHENTICATION_TYPE': 'Keystone',
                'KEYSTONE_AUTH_URL': 'somehost',
                'KEYSTONE_AUTH_VERSION': 2,
                'KEYSTONE_ADMIN_USERNAME': 'someusername',
                'KEYSTONE_ADMIN_PASSWORD': 'somepassword',
                'KEYSTONE_ADMIN_TENANT': 'sometenant',
            }, user=AttrDict(dict(username='someuser')))

    def test_validate_ldap(self):
        # Each step supplies one more setting and expects the next error in turn.
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing Admin DN for LDAP'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
            })

        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing Admin Password for LDAP'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
                'LDAP_ADMIN_DN': 'somedn',
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'Can\'t contact LDAP server'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
                'LDAP_ADMIN_DN': 'somedn',
                'LDAP_ADMIN_PASSWD': 'somepass',
                'LDAP_URI': 'ldap://localhost',
            })

    def test_validate_signer(self):
        with self.assertRaisesRegexp(ConfigValidationException, 'Unknown signing engine'):
            self.validate('signer', {
                'SIGNING_ENGINE': 'foobar',
            })

    def test_validate_security_scanner(self):
        # NOTE(review): url_hit is recorded but never asserted, unlike the sibling
        # tests. Confirm whether the validator is expected to contact the endpoint
        # before adding an assertTrue here.
        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            self.validate('security-scanner', {
                'DISTRIBUTED_STORAGE_PREFERENCE': ['local'],
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                },
                'SECURITY_SCANNER_ENDPOINT': 'http://somehost',
            })

    def test_validate_github_trigger(self):
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
            self.validate('github-trigger', {})

        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
                self.validate('github-trigger', {
                    'GITHUB_TRIGGER_CONFIG': {
                        'GITHUB_ENDPOINT': 'http://somehost',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    def test_validate_github_login(self):
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
            self.validate('github-login', {})

        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
                self.validate('github-login', {
                    'GITHUB_LOGIN_CONFIG': {
                        'GITHUB_ENDPOINT': 'http://somehost',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    def test_validate_bitbucket_trigger(self):
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing client ID and client secret'):
            self.validate('bitbucket-trigger', {})

        url_hit = [False]

        @urlmatch(netloc=r'bitbucket.org')
        def handler(url, request):
            url_hit[0] = True
            return {
                'status_code': 200,
                'content': 'oauth_token=foo&oauth_token_secret=bar',
            }

        with HTTMock(handler):
            self.validate('bitbucket-trigger', {
                'BITBUCKET_TRIGGER_CONFIG': {
                    'CONSUMER_KEY': 'foo',
                    'CONSUMER_SECRET': 'bar',
                },
            })

        self.assertTrue(url_hit[0])

    def test_validate_google_login(self):
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing client ID and client secret'):
            self.validate('google-login', {})

        url_hit = [False]

        @urlmatch(netloc=r'www.googleapis.com', path='/oauth2/v3/token')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            self.validate('google-login', {
                'GOOGLE_LOGIN_CONFIG': {
                    'CLIENT_ID': 'foo',
                    'CLIENT_SECRET': 'bar',
                },
            })

        self.assertTrue(url_hit[0])

    def test_validate_gitlab_trigger(self):
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitLab client id'):
            self.validate('gitlab-trigger', {})

        url_hit = [False]

        @urlmatch(netloc=r'somegitlab', path='/oauth/token')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': '{}'}

        with HTTMock(handler):
            with self.assertRaisesRegexp(ConfigValidationException,
                                         "Invalid client id or client secret"):
                self.validate('gitlab-trigger', {
                    'GITLAB_TRIGGER_CONFIG': {
                        'GITLAB_ENDPOINT': 'http://somegitlab',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    @classmethod
    def tearDownClass(cls):
        """Fail if any validator registered in ``VALIDATORS`` was never exercised."""
        not_run = set(VALIDATORS.keys()) - cls.validated
        # The assertion message is the set of missing validator names.
        assert not not_run, not_run

# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()