236655adb4
Note that the test suite doesn't fully verify that each validation succeeds; rather, it ensures that the proper system (storage, security scanning, etc.) is called with the configuration and returns at all (usually with an expected error). This should prevent us from forgetting to update these code paths when we change config-based systems. Longer term, we might want to have these tests stand up fake/mock versions of the endpoint services as well, for end-to-end testing.
331 lines
No EOL
10 KiB
Python
331 lines
No EOL
10 KiB
Python
import unittest
|
|
import redis
|
|
import moto
|
|
import json
|
|
|
|
from httmock import urlmatch, HTTMock
|
|
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
|
|
from util.config.validator import VALIDATORS, ConfigValidationException
|
|
from util.morecollections import AttrDict
|
|
|
|
from app import app
|
|
|
|
class TestValidateConfig(unittest.TestCase):
    """Smoke tests for the config validators registered in ``VALIDATORS``.

    Each test hands a (usually incomplete or bogus) configuration to one
    validator and checks that it either returns or fails with the expected
    error.  ``tearDownClass`` then fails the run if any key of ``VALIDATORS``
    was never recorded in ``validated``.
    """

    # Names of the validators exercised so far.  Deliberately a class-level
    # (shared) set: every test instance adds to it and ``tearDownClass``
    # compares it against ``VALIDATORS``.
    validated = set()

    def setUp(self):
        """Prepare the test database and enter a request context."""
        setup_database_for_testing(self)

        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

    def tearDown(self):
        """Release the test database and leave the request context."""
        finished_database_for_testing(self)
        # No exception is being propagated here, so report a clean exit
        # (exc_type, exc_value, traceback all None).
        self.ctx.__exit__(None, None, None)

    def validate(self, service, config, user=None, password=None):
        """Run the validator registered under ``service``.

        Records ``service`` as exercised, then calls ``VALIDATORS[service]``
        with a copy of ``config`` that has ``TESTING`` set to True, followed
        by ``user`` and ``password``.  Whatever the validator raises
        propagates to the caller.
        """
        self.validated.add(service)
        # Work on a copy so the caller's dict is never modified.
        test_config = dict(config)
        test_config['TESTING'] = True
        VALIDATORS[service](test_config, user, password)

    def test_validate_redis(self):
        """Redis: missing hostname, then an unreachable host."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing redis hostname'):
            self.validate('redis', {})

        with self.assertRaises(redis.ConnectionError):
            self.validate('redis', {
                'BUILDLOGS_REDIS': {
                    'host': 'somehost',
                },
            })

    def test_validate_mail(self):
        """Mail: not exercised; only marked as covered."""
        # Skip mail.
        self.validated.add('mail')

    def test_validate_database(self):
        """Database: a bogus URI must produce the expected error."""
        with self.assertRaisesRegexp(Exception, 'database not properly initialized'):
            self.validate('database', {
                'DB_URI': 'mysql://somehost',
            })

    def test_validate_jwt(self):
        """JWT: each missing setting is reported in turn."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing JWT Verification endpoint'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'Missing JWT Issuer ID'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
                'JWT_VERIFY_ENDPOINT': 'somehost',
            })

        with self.assertRaisesRegexp(Exception, 'JWT Authentication public key file'):
            self.validate('jwt', {
                'AUTHENTICATION_TYPE': 'JWT',
                'JWT_VERIFY_ENDPOINT': 'somehost',
                'JWT_AUTH_ISSUER': 'someissuer',
            })

        # TODO(jschorr): Add another test once we switch JWT auth to use the config provider to
        # find the file

    def test_validate_registry_storage(self):
        """Registry storage: local-storage error paths and a mocked S3 failure."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Storage configuration required'):
            self.validate('registry-storage', {})

        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Locally mounted directory not'):
            self.validate('registry-storage', {
                'FEATURE_STORAGE_REPLICATION': True,
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                }
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'No such file or directory'):
            self.validate('registry-storage', {
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                }
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'not under a mounted volume'):
            self.validate('registry-storage', {
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '/tmp/somepath',
                    }),
                }
            })

        with moto.mock_s3():
            with self.assertRaisesRegexp(ConfigValidationException,
                                         'S3ResponseError: 404 Not Found'):
                self.validate('registry-storage', {
                    'DISTRIBUTED_STORAGE_CONFIG': {
                        'default': ('S3Storage', {
                            's3_access_key': 'invalid',
                            's3_secret_key': 'invalid',
                            's3_bucket': 'somebucket',
                            'storage_path': ''
                        }),
                    }
                })

    def test_validate_bittorrent(self):
        """BitTorrent: missing announce URL, then a mocked announcer is hit."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing announce URL'):
            self.validate('bittorrent', {})

        # One-element list so the nested handler can flip the flag.
        announcer_hit = [False]

        @urlmatch(netloc=r'somehost', path='/announce')
        def handler(url, request):
            announcer_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            self.validate('bittorrent', {
                'BITTORRENT_ANNOUNCE_URL': 'http://somehost/announce',
            })

        self.assertTrue(announcer_hit[0])

    def test_validate_ssl(self):
        """SSL: plain HTTP and external TLS termination pass; bare HTTPS fails."""
        self.validate('ssl', {
            'PREFERRED_URL_SCHEME': 'http',
        })

        self.validate('ssl', {
            'PREFERRED_URL_SCHEME': 'https',
            'EXTERNAL_TLS_TERMINATION': True,
        })

        with self.assertRaisesRegexp(ConfigValidationException, 'Missing required SSL file'):
            self.validate('ssl', {
                'PREFERRED_URL_SCHEME': 'https',
            })

        # TODO(jschorr): Add SSL verification tests once file lookup is fixed.

    def test_validate_keystone(self):
        """Keystone: superuser verification fails against a bogus endpoint."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Verification of superuser someuser failed'):
            self.validate('keystone', {
                'AUTHENTICATION_TYPE': 'Keystone',
                'KEYSTONE_AUTH_URL': 'somehost',
                'KEYSTONE_AUTH_VERSION': 2,
                'KEYSTONE_ADMIN_USERNAME': 'someusername',
                'KEYSTONE_ADMIN_PASSWORD': 'somepassword',
                'KEYSTONE_ADMIN_TENANT': 'sometenant',
            }, user=AttrDict(dict(username='someuser')))

    def test_validate_ldap(self):
        """LDAP: missing admin DN, missing admin password, unreachable server."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing Admin DN for LDAP'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
            })

        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing Admin Password for LDAP'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
                'LDAP_ADMIN_DN': 'somedn',
            })

        with self.assertRaisesRegexp(ConfigValidationException, 'Can\'t contact LDAP server'):
            self.validate('ldap', {
                'AUTHENTICATION_TYPE': 'LDAP',
                'LDAP_ADMIN_DN': 'somedn',
                'LDAP_ADMIN_PASSWD': 'somepass',
                'LDAP_URI': 'ldap://localhost',
            })

    def test_validate_signer(self):
        """Signer: an unknown signing engine is rejected."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Unknown signing engine'):
            self.validate('signer', {
                'SIGNING_ENGINE': 'foobar',
            })

    def test_validate_security_scanner(self):
        """Security scanner: validation returns with a mocked endpoint."""
        # NOTE(review): unlike the other HTTP-mocked tests, this flag is
        # recorded but never asserted -- confirm whether the validator is
        # expected to contact the endpoint when TESTING is set.
        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            self.validate('security-scanner', {
                'DISTRIBUTED_STORAGE_PREFERENCE': ['local'],
                'DISTRIBUTED_STORAGE_CONFIG': {
                    'default': ('LocalStorage', {
                        'storage_path': '',
                    }),
                },
                'SECURITY_SCANNER_ENDPOINT': 'http://somehost',
            })

    def test_validate_github_trigger(self):
        """GitHub trigger: missing client id, then a non-GitHub endpoint."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
            self.validate('github-trigger', {})

        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
                self.validate('github-trigger', {
                    'GITHUB_TRIGGER_CONFIG': {
                        'GITHUB_ENDPOINT': 'http://somehost',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    def test_validate_github_login(self):
        """GitHub login: missing client id, then a non-GitHub endpoint."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
            self.validate('github-login', {})

        url_hit = [False]

        @urlmatch(netloc=r'somehost')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
                self.validate('github-login', {
                    'GITHUB_LOGIN_CONFIG': {
                        'GITHUB_ENDPOINT': 'http://somehost',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    def test_validate_bitbucket_trigger(self):
        """Bitbucket trigger: missing credentials, then a mocked token exchange."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing client ID and client secret'):
            self.validate('bitbucket-trigger', {})

        url_hit = [False]

        # Dot escaped so the pattern matches the literal host name only.
        @urlmatch(netloc=r'bitbucket\.org')
        def handler(url, request):
            url_hit[0] = True
            return {
                'status_code': 200,
                'content': 'oauth_token=foo&oauth_token_secret=bar',
            }

        with HTTMock(handler):
            self.validate('bitbucket-trigger', {
                'BITBUCKET_TRIGGER_CONFIG': {
                    'CONSUMER_KEY': 'foo',
                    'CONSUMER_SECRET': 'bar',
                },
            })

        self.assertTrue(url_hit[0])

    def test_validate_google_login(self):
        """Google login: missing credentials, then a mocked token endpoint."""
        with self.assertRaisesRegexp(ConfigValidationException,
                                     'Missing client ID and client secret'):
            self.validate('google-login', {})

        url_hit = [False]

        # Dots escaped so the pattern matches the literal host name only.
        @urlmatch(netloc=r'www\.googleapis\.com', path='/oauth2/v3/token')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': ''}

        with HTTMock(handler):
            self.validate('google-login', {
                'GOOGLE_LOGIN_CONFIG': {
                    'CLIENT_ID': 'foo',
                    'CLIENT_SECRET': 'bar',
                },
            })

        self.assertTrue(url_hit[0])

    def test_validate_gitlab_trigger(self):
        """GitLab trigger: missing client id, then rejected credentials."""
        with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitLab client id'):
            self.validate('gitlab-trigger', {})

        url_hit = [False]

        @urlmatch(netloc=r'somegitlab', path='/oauth/token')
        def handler(url, request):
            url_hit[0] = True
            return {'status_code': 200, 'content': '{}'}

        with HTTMock(handler):
            with self.assertRaisesRegexp(ConfigValidationException,
                                         "Invalid client id or client secret"):
                self.validate('gitlab-trigger', {
                    'GITLAB_TRIGGER_CONFIG': {
                        'GITLAB_ENDPOINT': 'http://somegitlab',
                        'CLIENT_ID': 'foo',
                        'CLIENT_SECRET': 'bar',
                    },
                })

        self.assertTrue(url_hit[0])

    @classmethod
    def tearDownClass(cls):
        """Fail the run if any key of ``VALIDATORS`` was never recorded.

        Raises:
            AssertionError: when at least one registered validator name is
                missing from ``validated``.
        """
        not_run = set(VALIDATORS.keys()) - cls.validated
        # Raised explicitly (rather than via ``assert``) so the coverage
        # check still fires when Python runs with -O.
        if not_run:
            raise AssertionError('Validators without a test: %s' % sorted(not_run))
|
|
|
|
|
|
# Run the suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()