Merge pull request #2360 from coreos-inc/code-with-quay

Code with Quay: Refactor the config validation logic into their own classes and add pytest-based unittesting
This commit is contained in:
josephschorr 2017-02-27 13:51:37 -05:00 committed by GitHub
commit 609c942746
39 changed files with 1301 additions and 825 deletions

View file

@ -1,12 +1,9 @@
import logging
from data.database import TeamRole
from util.config.validator import validate_database_url
from data.database import TeamRole, validate_database_url
logger = logging.getLogger(__name__)
def check_health(app_config):
# Attempt to connect to the database first. If the DB is not responding,
# using the validate_database_url will timeout quickly, as opposed to

View file

@ -28,11 +28,12 @@ class ExternalJWTAuthN(FederatedUsers):
default_key_path = os.path.join(override_config_dir, ExternalJWTAuthN.PUBLIC_KEY_FILENAME)
public_key_path = public_key_path or default_key_path
if not os.path.exists(public_key_path):
error_message = ('JWT Authentication public key file "%s" not found in directory %s' %
(ExternalJWTAuthN.PUBLIC_KEY_FILENAME, override_config_dir))
error_message = ('JWT Authentication public key file "%s" not found' % public_key_path)
raise Exception(error_message)
self.public_key_path = public_key_path
with open(public_key_path) as public_key_file:
self.public_key = public_key_file.read()

View file

@ -68,3 +68,4 @@ trollius
tzlocal
xhtml2pdf
recaptcha2
mockredispy

View file

@ -52,6 +52,7 @@ marisa-trie==0.7.2
MarkupSafe==0.23
mixpanel==4.3.1
mock==2.0.0
mockredispy==2.9.3
-e git+https://github.com/coreos/mockldap.git@59a46efbe8c7cd8146a87a7c4f2b09746b953e11#egg=mockldap
monotonic==1.2
moto==0.4.25

View file

@ -36,7 +36,8 @@ def fake_jwt(requires_email=True):
getuser_url = server_url + '/user/get'
jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
app.config['HTTPCLIENT'], 300, public_key.name,
app.config['HTTPCLIENT'], 300,
public_key_path=public_key.name,
requires_email=requires_email)
with liveserver_app(jwt_app, port):

View file

@ -1,320 +0,0 @@
import unittest
import redis
import moto
import json
from httmock import urlmatch, HTTMock
from initdb import setup_database_for_testing, finished_database_for_testing
from util.config.validator import VALIDATORS, ConfigValidationException
from util.morecollections import AttrDict
from app import app
class TestValidateConfig(unittest.TestCase):
validated = set([])
def setUp(self):
setup_database_for_testing(self)
self.app = app.test_client()
self.ctx = app.test_request_context()
self.ctx.__enter__()
def tearDown(self):
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def validate(self, service, config, user=None, password=None):
self.validated.add(service)
config['TESTING'] = True
VALIDATORS[service](config, user, password)
def test_validate_redis(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing redis hostname'):
self.validate('redis', {})
with self.assertRaises(redis.ConnectionError):
self.validate('redis', {
'BUILDLOGS_REDIS': {
'host': 'somehost',
},
})
def test_validate_mail(self):
# Skip mail.
self.validated.add('mail')
def test_validate_database(self):
with self.assertRaisesRegexp(Exception, 'database not properly initialized'):
self.validate('database', {
'DB_URI': 'mysql://somehost',
})
def test_validate_jwt(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing JWT Verification endpoint'):
self.validate('jwt', {
'AUTHENTICATION_TYPE': 'JWT',
})
with self.assertRaisesRegexp(ConfigValidationException, 'Missing JWT Issuer ID'):
self.validate('jwt', {
'AUTHENTICATION_TYPE': 'JWT',
'JWT_VERIFY_ENDPOINT': 'somehost',
})
with self.assertRaisesRegexp(Exception, 'JWT Authentication public key file'):
self.validate('jwt', {
'AUTHENTICATION_TYPE': 'JWT',
'JWT_VERIFY_ENDPOINT': 'somehost',
'JWT_AUTH_ISSUER': 'someissuer',
})
# TODO(jschorr): Add another test once we switch JWT auth to use the config provider to
# find the file
def test_validate_registry_storage(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Storage configuration required'):
self.validate('registry-storage', {})
with self.assertRaisesRegexp(ConfigValidationException, 'Locally mounted directory not'):
self.validate('registry-storage', {
'FEATURE_STORAGE_REPLICATION': True,
'DISTRIBUTED_STORAGE_CONFIG': {
'default': ('LocalStorage', {
'storage_path': '',
}),
}
})
with self.assertRaisesRegexp(ConfigValidationException, 'No such file or directory'):
self.validate('registry-storage', {
'DISTRIBUTED_STORAGE_CONFIG': {
'default': ('LocalStorage', {
'storage_path': '',
}),
}
})
with moto.mock_s3():
with self.assertRaisesRegexp(ConfigValidationException, 'S3ResponseError: 404 Not Found'):
self.validate('registry-storage', {
'DISTRIBUTED_STORAGE_CONFIG': {
'default': ('S3Storage', {
's3_access_key': 'invalid',
's3_secret_key': 'invalid',
's3_bucket': 'somebucket',
'storage_path': ''
}),
}
})
def test_validate_bittorrent(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing announce URL'):
self.validate('bittorrent', {})
announcer_hit = [False]
@urlmatch(netloc=r'somehost', path='/announce')
def handler(url, request):
announcer_hit[0] = True
return {'status_code': 200, 'content': ''}
with HTTMock(handler):
self.validate('bittorrent', {
'BITTORRENT_ANNOUNCE_URL': 'http://somehost/announce',
})
self.assertTrue(announcer_hit[0])
def test_validate_ssl(self):
self.validate('ssl', {
'PREFERRED_URL_SCHEME': 'http',
})
self.validate('ssl', {
'PREFERRED_URL_SCHEME': 'https',
'EXTERNAL_TLS_TERMINATION': True,
})
with self.assertRaisesRegexp(ConfigValidationException, 'Missing required SSL file'):
self.validate('ssl', {
'PREFERRED_URL_SCHEME': 'https',
})
def test_validate_keystone(self):
with self.assertRaisesRegexp(ConfigValidationException,
'Verification of superuser someuser failed'):
self.validate('keystone', {
'AUTHENTICATION_TYPE': 'Keystone',
'KEYSTONE_AUTH_URL': 'somehost',
'KEYSTONE_AUTH_VERSION': 2,
'KEYSTONE_ADMIN_USERNAME': 'someusername',
'KEYSTONE_ADMIN_PASSWORD': 'somepassword',
'KEYSTONE_ADMIN_TENANT': 'sometenant',
}, user=AttrDict(dict(username='someuser')))
def test_validate_ldap(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing Admin DN for LDAP'):
self.validate('ldap', {
'AUTHENTICATION_TYPE': 'LDAP',
})
with self.assertRaisesRegexp(ConfigValidationException, 'Missing Admin Password for LDAP'):
self.validate('ldap', {
'AUTHENTICATION_TYPE': 'LDAP',
'LDAP_ADMIN_DN': 'somedn',
})
with self.assertRaisesRegexp(ConfigValidationException, 'Can\'t contact LDAP server'):
self.validate('ldap', {
'AUTHENTICATION_TYPE': 'LDAP',
'LDAP_ADMIN_DN': 'somedn',
'LDAP_ADMIN_PASSWD': 'somepass',
'LDAP_URI': 'ldap://localhost',
})
def test_validate_signer(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Unknown signing engine'):
self.validate('signer', {
'SIGNING_ENGINE': 'foobar',
})
def test_validate_security_scanner(self):
url_hit = [False]
@urlmatch(netloc=r'somehost')
def handler(url, request):
url_hit[0] = True
return {'status_code': 200, 'content': ''}
with HTTMock(handler):
self.validate('security-scanner', {
'DISTRIBUTED_STORAGE_PREFERENCE': ['local'],
'DISTRIBUTED_STORAGE_CONFIG': {
'default': ('LocalStorage', {
'storage_path': '',
}),
},
'SECURITY_SCANNER_ENDPOINT': 'http://somehost',
})
def test_validate_github_trigger(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
self.validate('github-trigger', {})
url_hit = [False]
@urlmatch(netloc=r'somehost')
def handler(url, request):
url_hit[0] = True
return {'status_code': 200, 'content': ''}
with HTTMock(handler):
with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
self.validate('github-trigger', {
'GITHUB_TRIGGER_CONFIG': {
'GITHUB_ENDPOINT': 'http://somehost',
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'bar',
},
})
self.assertTrue(url_hit[0])
def test_validate_github_login(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitHub client id'):
self.validate('github-login', {})
url_hit = [False]
@urlmatch(netloc=r'somehost')
def handler(url, request):
url_hit[0] = True
return {'status_code': 200, 'content': ''}
with HTTMock(handler):
with self.assertRaisesRegexp(Exception, 'Endpoint is not a Github'):
self.validate('github-login', {
'GITHUB_LOGIN_CONFIG': {
'GITHUB_ENDPOINT': 'http://somehost',
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'bar',
},
})
self.assertTrue(url_hit[0])
def test_validate_bitbucket_trigger(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing client ID and client secret'):
self.validate('bitbucket-trigger', {})
url_hit = [False]
@urlmatch(netloc=r'bitbucket.org')
def handler(url, request):
url_hit[0] = True
return {
'status_code': 200,
'content': 'oauth_token=foo&oauth_token_secret=bar',
}
with HTTMock(handler):
self.validate('bitbucket-trigger', {
'BITBUCKET_TRIGGER_CONFIG': {
'CONSUMER_KEY': 'foo',
'CONSUMER_SECRET': 'bar',
},
})
self.assertTrue(url_hit[0])
def test_validate_google_login(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing client ID and client secret'):
self.validate('google-login', {})
url_hit = [False]
@urlmatch(netloc=r'www.googleapis.com', path='/oauth2/v3/token')
def handler(url, request):
url_hit[0] = True
return {'status_code': 200, 'content': ''}
with HTTMock(handler):
self.validate('google-login', {
'GOOGLE_LOGIN_CONFIG': {
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'bar',
},
})
self.assertTrue(url_hit[0])
def test_validate_gitlab_trigger(self):
with self.assertRaisesRegexp(ConfigValidationException, 'Missing GitLab client id'):
self.validate('gitlab-trigger', {})
url_hit = [False]
@urlmatch(netloc=r'somegitlab', path='/oauth/token')
def handler(url, request):
url_hit[0] = True
return {'status_code': 200, 'content': '{}'}
with HTTMock(handler):
with self.assertRaisesRegexp(ConfigValidationException, "Invalid client id or client secret"):
self.validate('gitlab-trigger', {
'GITLAB_TRIGGER_CONFIG': {
'GITLAB_ENDPOINT': 'http://somegitlab',
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'bar',
},
})
self.assertTrue(url_hit[0])
@classmethod
def tearDownClass(cls):
not_run = set(VALIDATORS.keys()) - cls.validated
assert not not_run, not_run
if __name__ == '__main__':
unittest.main()

View file

@ -1,34 +1,23 @@
import logging
import subprocess
import time
from StringIO import StringIO
from hashlib import sha1
import ldap
import peewee
import redis
from flask import Flask
from flask_mail import Mail, Message
from app import app, config_provider, get_app_url, OVERRIDE_CONFIG_DIRECTORY
from auth.auth_context import get_authenticated_user
from bitbucket import BitBucket
from boot import setup_jwt_proxy
from data.database import validate_database_url
from data.users import LDAP_CERT_FILENAME
from data.users.externaljwt import ExternalJWTAuthN
from data.users.externalldap import LDAPConnection, LDAPUsers
from data.users.keystone import get_keystone_users
from storage import get_storage_driver
from oauth.services.github import GithubOAuthService
from oauth.services.google import GoogleOAuthService
from oauth.services.gitlab import GitLabOAuthService
from util.secscan.api import SecurityScannerAPI
from util.registry.torrent import torrent_jwt
from util.security.signing import SIGNING_ENGINES
from util.security.ssl import load_certificate, CertInvalidException, KeyInvalidException
from util.config.validators.validate_database import DatabaseValidator
from util.config.validators.validate_redis import RedisValidator
from util.config.validators.validate_storage import StorageValidator
from util.config.validators.validate_email import EmailValidator
from util.config.validators.validate_ldap import LDAPValidator
from util.config.validators.validate_keystone import KeystoneValidator
from util.config.validators.validate_jwt import JWTAuthValidator
from util.config.validators.validate_secscan import SecurityScannerValidator
from util.config.validators.validate_signer import SignerValidator
from util.config.validators.validate_torrent import BittorrentValidator
from util.config.validators.validate_ssl import SSLValidator, SSL_FILENAMES
from util.config.validators.validate_google_login import GoogleLoginValidator
from util.config.validators.validate_bitbucket_trigger import BitbucketTriggerValidator
from util.config.validators.validate_gitlab_trigger import GitLabTriggerValidator
from util.config.validators.validate_github import GitHubLoginValidator, GitHubTriggerValidator
logger = logging.getLogger(__name__)
@ -36,9 +25,7 @@ class ConfigValidationException(Exception):
""" Exception raised when the configuration fails to validate for a known reason. """
pass
# Note: Only add files required for HTTPS to the SSL_FILESNAMES list.
SSL_FILENAMES = ['ssl.cert', 'ssl.key']
DB_SSL_FILENAMES = ['database.pem']
JWT_FILENAMES = ['jwt-authn.cert']
ACI_CERT_FILENAMES = ['signing-public.gpg', 'signing-private.gpg']
@ -47,19 +34,24 @@ CONFIG_FILENAMES = (SSL_FILENAMES + DB_SSL_FILENAMES + JWT_FILENAMES + ACI_CERT_
LDAP_FILENAMES)
EXTRA_CA_DIRECTORY = 'extra_ca_certs'
def get_storage_providers(config):
storage_config = config.get('DISTRIBUTED_STORAGE_CONFIG', {})
drivers = {}
try:
for name, parameters in storage_config.items():
drivers[name] = (parameters[0], get_storage_driver(None, None, None, parameters))
except TypeError:
logger.exception('Missing required storage configuration provider')
raise ConfigValidationException('Missing required parameter(s) for storage %s' % name)
return drivers
VALIDATORS = {
DatabaseValidator.name: DatabaseValidator.validate,
RedisValidator.name: RedisValidator.validate,
StorageValidator.name: StorageValidator.validate,
EmailValidator.name: EmailValidator.validate,
GitHubLoginValidator.name: GitHubLoginValidator.validate,
GitHubTriggerValidator.name: GitHubTriggerValidator.validate,
GitLabTriggerValidator.name: GitLabTriggerValidator.validate,
BitbucketTriggerValidator.name: BitbucketTriggerValidator.validate,
GoogleLoginValidator.name: GoogleLoginValidator.validate,
SSLValidator.name: SSLValidator.validate,
LDAPValidator.name: LDAPValidator.validate,
JWTAuthValidator.name: JWTAuthValidator.validate,
KeystoneValidator.name: KeystoneValidator.validate,
SignerValidator.name: SignerValidator.validate,
SecurityScannerValidator.name: SecurityScannerValidator.validate,
BittorrentValidator.name: BittorrentValidator.validate,
}
def validate_service_for_config(service, config, password=None):
""" Attempts to validate the configuration for the given service. """
@ -79,458 +71,3 @@ def validate_service_for_config(service, config, password=None):
'status': False,
'reason': str(ex)
}
def _validate_database(config, user_obj, _):
""" Validates connecting to the database. """
try:
validate_database_url(config['DB_URI'], config.get('DB_CONNECTION_ARGS', {}))
except peewee.OperationalError as ex:
if ex.args and len(ex.args) > 1:
raise ConfigValidationException(ex.args[1])
else:
raise ex
def _validate_redis(config, user_obj, _):
""" Validates connecting to redis. """
redis_config = config.get('BUILDLOGS_REDIS', {})
if not 'host' in redis_config:
raise ConfigValidationException('Missing redis hostname')
client = redis.StrictRedis(socket_connect_timeout=5, **redis_config)
client.ping()
def _validate_registry_storage(config, user_obj, _):
""" Validates registry storage. """
replication_enabled = config.get('FEATURE_STORAGE_REPLICATION', False)
providers = get_storage_providers(config).items()
if not providers:
raise ConfigValidationException('Storage configuration required')
for name, (storage_type, driver) in providers:
try:
if replication_enabled and storage_type == 'LocalStorage':
raise ConfigValidationException('Locally mounted directory not supported ' +
'with storage replication')
# Run validation on the driver.
driver.validate(app.config['HTTPCLIENT'])
# Run setup on the driver if the read/write succeeded.
driver.setup()
except Exception as ex:
raise ConfigValidationException('Invalid storage configuration: %s: %s' % (name, str(ex)))
def _validate_mailing(config, user_obj, _):
""" Validates sending email. """
test_app = Flask("mail-test-app")
test_app.config.update(config)
test_app.config.update({
'MAIL_FAIL_SILENTLY': False,
'TESTING': False
})
test_mail = Mail(test_app)
test_msg = Message("Test e-mail from %s" % app.config['REGISTRY_TITLE'],
sender=config.get('MAIL_DEFAULT_SENDER'))
test_msg.add_recipient(user_obj.email)
test_mail.send(test_msg)
def _validate_gitlab(config, user_obj, _):
""" Validates the OAuth credentials and API endpoint for a GitLab service. """
github_config = config.get('GITLAB_TRIGGER_CONFIG')
if not github_config:
raise ConfigValidationException('Missing GitLab client id and client secret')
endpoint = github_config.get('GITLAB_ENDPOINT')
if not endpoint:
raise ConfigValidationException('Missing GitLab Endpoint')
if endpoint.find('http://') != 0 and endpoint.find('https://') != 0:
raise ConfigValidationException('GitLab Endpoint must start with http:// or https://')
if not github_config.get('CLIENT_ID'):
raise ConfigValidationException('Missing Client ID')
if not github_config.get('CLIENT_SECRET'):
raise ConfigValidationException('Missing Client Secret')
client = app.config['HTTPCLIENT']
oauth = GitLabOAuthService(config, 'GITLAB_TRIGGER_CONFIG')
result = oauth.validate_client_id_and_secret(client, app.config)
if not result:
raise ConfigValidationException('Invalid client id or client secret')
def _validate_github(config_key):
return lambda config, user_obj, _: _validate_github_with_key(config_key, config)
def _validate_github_with_key(config_key, config):
""" Validates the OAuth credentials and API endpoint for a Github service. """
github_config = config.get(config_key)
if not github_config:
raise ConfigValidationException('Missing GitHub client id and client secret')
endpoint = github_config.get('GITHUB_ENDPOINT')
if not endpoint:
raise ConfigValidationException('Missing GitHub Endpoint')
if endpoint.find('http://') != 0 and endpoint.find('https://') != 0:
raise ConfigValidationException('Github Endpoint must start with http:// or https://')
if not github_config.get('CLIENT_ID'):
raise ConfigValidationException('Missing Client ID')
if not github_config.get('CLIENT_SECRET'):
raise ConfigValidationException('Missing Client Secret')
if github_config.get('ORG_RESTRICT') and not github_config.get('ALLOWED_ORGANIZATIONS'):
raise ConfigValidationException('Organization restriction must have at least one allowed ' +
'organization')
client = app.config['HTTPCLIENT']
oauth = GithubOAuthService(config, config_key)
result = oauth.validate_client_id_and_secret(client, app.config)
if not result:
raise ConfigValidationException('Invalid client id or client secret')
if github_config.get('ALLOWED_ORGANIZATIONS'):
for org_id in github_config.get('ALLOWED_ORGANIZATIONS'):
if not oauth.validate_organization(org_id, client):
raise ConfigValidationException('Invalid organization: %s' % org_id)
def _validate_bitbucket(config, user_obj, _):
""" Validates the config for BitBucket. """
trigger_config = config.get('BITBUCKET_TRIGGER_CONFIG')
if not trigger_config:
raise ConfigValidationException('Missing client ID and client secret')
if not trigger_config.get('CONSUMER_KEY'):
raise ConfigValidationException('Missing Consumer Key')
if not trigger_config.get('CONSUMER_SECRET'):
raise ConfigValidationException('Missing Consumer Secret')
key = trigger_config['CONSUMER_KEY']
secret = trigger_config['CONSUMER_SECRET']
callback_url = '%s/oauth1/bitbucket/callback/trigger/' % (get_app_url())
bitbucket_client = BitBucket(key, secret, callback_url)
(result, _, _) = bitbucket_client.get_authorization_url()
if not result:
raise ConfigValidationException('Invalid consumer key or secret')
def _validate_google_login(config, user_obj, _):
""" Validates the Google Login client ID and secret. """
google_login_config = config.get('GOOGLE_LOGIN_CONFIG')
if not google_login_config:
raise ConfigValidationException('Missing client ID and client secret')
if not google_login_config.get('CLIENT_ID'):
raise ConfigValidationException('Missing Client ID')
if not google_login_config.get('CLIENT_SECRET'):
raise ConfigValidationException('Missing Client Secret')
client = app.config['HTTPCLIENT']
oauth = GoogleOAuthService(config, 'GOOGLE_LOGIN_CONFIG')
result = oauth.validate_client_id_and_secret(client, app.config)
if not result:
raise ConfigValidationException('Invalid client id or client secret')
def _validate_ssl(config, user_obj, _):
""" Validates the SSL configuration (if enabled). """
# Skip if non-SSL.
if config.get('PREFERRED_URL_SCHEME', 'http') != 'https':
return
# Skip if externally terminated.
if config.get('EXTERNAL_TLS_TERMINATION', False) is True:
return
# Verify that we have all the required SSL files.
for filename in SSL_FILENAMES:
if not config_provider.volume_file_exists(filename):
raise ConfigValidationException('Missing required SSL file: %s' % filename)
# Read the contents of the SSL certificate.
with config_provider.get_volume_file(SSL_FILENAMES[0]) as f:
cert_contents = f.read()
# Validate the certificate.
try:
certificate = load_certificate(cert_contents)
except CertInvalidException as cie:
raise ConfigValidationException('Could not load SSL certificate: %s' % cie.message)
# Verify the certificate has not expired.
if certificate.expired:
raise ConfigValidationException('The specified SSL certificate has expired.')
# Verify the hostname matches the name in the certificate.
if not certificate.matches_name(config['SERVER_HOSTNAME']):
msg = ('Supported names "%s" in SSL cert do not match server hostname "%s"' %
(', '.join(list(certificate.names)), config['SERVER_HOSTNAME']))
raise ConfigValidationException(msg)
# Verify the private key against the certificate.
private_key_path = None
with config_provider.get_volume_file(SSL_FILENAMES[1]) as f:
private_key_path = f.name
if not private_key_path:
# Only in testing.
return
try:
certificate.validate_private_key(private_key_path)
except KeyInvalidException as kie:
raise ConfigValidationException('SSL private key failed to validate: %s' % kie.message)
def _validate_ldap(config, user_obj, password):
""" Validates the LDAP connection. """
if config.get('AUTHENTICATION_TYPE', 'Database') != 'LDAP':
return
# If there is a custom LDAP certificate, then reinstall the certificates for the container.
if config_provider.volume_file_exists(LDAP_CERT_FILENAME):
subprocess.check_call(['/conf/init/certs_install.sh'])
# Note: raises ldap.INVALID_CREDENTIALS on failure
admin_dn = config.get('LDAP_ADMIN_DN')
admin_passwd = config.get('LDAP_ADMIN_PASSWD')
if not admin_dn:
raise ConfigValidationException('Missing Admin DN for LDAP configuration')
if not admin_passwd:
raise ConfigValidationException('Missing Admin Password for LDAP configuration')
ldap_uri = config.get('LDAP_URI', 'ldap://localhost')
if not ldap_uri.startswith('ldap://') and not ldap_uri.startswith('ldaps://'):
raise ConfigValidationException('LDAP URI must start with ldap:// or ldaps://')
allow_tls_fallback = config.get('LDAP_ALLOW_INSECURE_FALLBACK', False)
try:
with LDAPConnection(ldap_uri, admin_dn, admin_passwd, allow_tls_fallback):
pass
except ldap.LDAPError as ex:
values = ex.args[0] if ex.args else {}
if not isinstance(values, dict):
raise ConfigValidationException(str(ex.args))
raise ConfigValidationException(values.get('desc', 'Unknown error'))
# Verify that the superuser exists. If not, raise an exception.
base_dn = config.get('LDAP_BASE_DN')
user_rdn = config.get('LDAP_USER_RDN', [])
uid_attr = config.get('LDAP_UID_ATTR', 'uid')
email_attr = config.get('LDAP_EMAIL_ATTR', 'mail')
requires_email = config.get('FEATURE_MAILING', True)
users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr,
allow_tls_fallback, requires_email=requires_email)
username = user_obj.username
(result, err_msg) = users.verify_credentials(username, password)
if not result:
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not exist ' +
'in the remote authentication system ' +
'OR LDAP auth is misconfigured.') % (username, err_msg)
raise ConfigValidationException(msg)
def _validate_jwt(config, user_obj, password):
""" Validates the JWT authentication system. """
if config.get('AUTHENTICATION_TYPE', 'Database') != 'JWT':
return
verify_endpoint = config.get('JWT_VERIFY_ENDPOINT')
query_endpoint = config.get('JWT_QUERY_ENDPOINT', None)
getuser_endpoint = config.get('JWT_GETUSER_ENDPOINT', None)
issuer = config.get('JWT_AUTH_ISSUER')
if not verify_endpoint:
raise ConfigValidationException('Missing JWT Verification endpoint')
if not issuer:
raise ConfigValidationException('Missing JWT Issuer ID')
# Try to instatiate the JWT authentication mechanism. This will raise an exception if
# the key cannot be found.
users = ExternalJWTAuthN(verify_endpoint, query_endpoint, getuser_endpoint, issuer,
OVERRIDE_CONFIG_DIRECTORY,
app.config['HTTPCLIENT'],
app.config.get('JWT_AUTH_MAX_FRESH_S', 300),
requires_email=config.get('FEATURE_MAILING', True))
# Verify that the superuser exists. If not, raise an exception.
username = user_obj.username
(result, err_msg) = users.verify_credentials(username, password)
if not result:
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not ' +
'exist in the remote authentication system ' +
'OR JWT auth is misconfigured') % (username, err_msg)
raise ConfigValidationException(msg)
# If the query endpoint exists, ensure we can query to find the current user and that we can
# look up users directly.
if query_endpoint:
(results, err_msg) = users.query_users(username)
if not results:
err_msg = err_msg or ('Could not find users matching query: %s' % username)
raise ConfigValidationException('Query endpoint is misconfigured or not returning ' +
'proper users: %s' % err_msg)
# Make sure the get user endpoint is also configured.
if not getuser_endpoint:
raise ConfigValidationException('The lookup user endpoint must be configured if the ' +
'query endpoint is set')
(result, err_msg) = users.get_user(username)
if not result:
err_msg = err_msg or ('Could not find user %s' % username)
raise ConfigValidationException('Lookup endpoint is misconfigured or not returning ' +
'properly: %s' % err_msg)
def _validate_keystone(config, user_obj, password):
""" Validates the Keystone authentication system. """
if config.get('AUTHENTICATION_TYPE', 'Database') != 'Keystone':
return
auth_url = config.get('KEYSTONE_AUTH_URL')
auth_version = int(config.get('KEYSTONE_AUTH_VERSION', 2))
admin_username = config.get('KEYSTONE_ADMIN_USERNAME')
admin_password = config.get('KEYSTONE_ADMIN_PASSWORD')
admin_tenant = config.get('KEYSTONE_ADMIN_TENANT')
if not auth_url:
raise ConfigValidationException('Missing authentication URL')
if not admin_username:
raise ConfigValidationException('Missing admin username')
if not admin_password:
raise ConfigValidationException('Missing admin password')
if not admin_tenant:
raise ConfigValidationException('Missing admin tenant')
requires_email = config.get('FEATURE_MAILING', True)
users = get_keystone_users(auth_version, auth_url, admin_username, admin_password, admin_tenant,
requires_email)
# Verify that the superuser exists. If not, raise an exception.
username = user_obj.username
(result, err_msg) = users.verify_credentials(username, password)
if not result:
msg = ('Verification of superuser %s failed: %s \n\nThe user either does not ' +
'exist in the remote authentication system ' +
'OR Keystone auth is misconfigured.') % (username, err_msg)
raise ConfigValidationException(msg)
def _validate_signer(config, user_obj, _):
""" Validates the GPG public+private key pair used for signing converted ACIs. """
if config.get('SIGNING_ENGINE') is None:
return
if config['SIGNING_ENGINE'] not in SIGNING_ENGINES:
raise ConfigValidationException('Unknown signing engine: %s' % config['SIGNING_ENGINE'])
engine = SIGNING_ENGINES[config['SIGNING_ENGINE']](config, config_provider)
engine.detached_sign(StringIO('test string'))
def _validate_security_scanner(config, user_obj, _):
""" Validates the configuration for talking to a Quay Security Scanner. """
if not config.get('TESTING', False):
# Generate a temporary Quay key to use for signing the outgoing requests.
setup_jwt_proxy()
# Wait a few seconds for the JWT proxy to startup.
time.sleep(2)
# Make a ping request to the security service.
client = app.config['HTTPCLIENT']
api = SecurityScannerAPI(app, config, None, client=client, skip_validation=True)
response = api.ping()
if response.status_code != 200:
message = 'Expected 200 status code, got %s: %s' % (response.status_code, response.text)
raise ConfigValidationException('Could not ping security scanner: %s' % message)
def _validate_bittorrent(config, user_obj, _):
""" Validates the configuration for using BitTorrent for downloads. """
announce_url = config.get('BITTORRENT_ANNOUNCE_URL')
if not announce_url:
raise ConfigValidationException('Missing announce URL')
# Ensure that the tracker is reachable and accepts requests signed with a registry key.
client = app.config['HTTPCLIENT']
params = {
'info_hash': sha1('somedata').digest(),
'peer_id': '-QUAY00-6wfG2wk6wWLc',
'uploaded': 0,
'downloaded': 0,
'left': 0,
'numwant': 0,
'port': 80,
}
encoded_jwt = torrent_jwt(params)
params['jwt'] = encoded_jwt
resp = client.get(announce_url, timeout=5, params=params)
logger.debug('Got tracker response: %s: %s', resp.status_code, resp.text)
if resp.status_code == 404:
raise ConfigValidationException('Announce path not found; did you forget `/announce`?')
if resp.status_code == 500:
raise ConfigValidationException('Did not get expected response from Tracker; ' +
'please check your settings')
if resp.status_code == 200:
if 'invalid jwt' in resp.text:
raise ConfigValidationException('Could not authorize to Tracker; is your Tracker ' +
'properly configured?')
if 'failure reason' in resp.text:
raise ConfigValidationException('Could not validate signed announce request: ' + resp.text)
VALIDATORS = {
'database': _validate_database,
'redis': _validate_redis,
'registry-storage': _validate_registry_storage,
'mail': _validate_mailing,
'github-login': _validate_github('GITHUB_LOGIN_CONFIG'),
'github-trigger': _validate_github('GITHUB_TRIGGER_CONFIG'),
'gitlab-trigger': _validate_gitlab,
'bitbucket-trigger': _validate_bitbucket,
'google-login': _validate_google_login,
'ssl': _validate_ssl,
'ldap': _validate_ldap,
'jwt': _validate_jwt,
'keystone': _validate_keystone,
'signer': _validate_signer,
'security-scanner': _validate_security_scanner,
'bittorrent': _validate_bittorrent,
}

View file

@ -0,0 +1,20 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from six import add_metaclass
class ConfigValidationException(Exception):
""" Exception raised when the configuration fails to validate for a known reason. """
pass
@add_metaclass(ABCMeta)
class BaseValidator(object):
@abstractproperty
def name(self):
""" The key for the validation API. """
pass
@classmethod
@abstractmethod
def validate(cls, config, user, user_password):
""" Raises Exception if failure to validate. """
pass

View file

@ -0,0 +1,88 @@
import os
import pytest
import shutil
from flask import Flask, jsonify
from flask.ext.login import LoginManager
from peewee import SqliteDatabase
from app import app as application
from data import model
from data.database import (close_db_filter, db)
from data.model.user import LoginWrappedDBUser
from endpoints.api import api_bp
from initdb import initialize_database, populate_database
from path_converters import APIRepositoryPathConverter, RegexConverter
# TODO(jschorr): Unify with the API conftest once the other PR gets in.
@pytest.fixture()
def app(appconfig):
""" Used by pytest-flask plugin to inject app by test for client See test_security by name injection of client. """
app = Flask(__name__)
login_manager = LoginManager(app)
@app.errorhandler(model.DataModelException)
def handle_dme(ex):
response = jsonify({'message': ex.message})
response.status_code = 400
return response
@login_manager.user_loader
def load_user(user_uuid):
return LoginWrappedDBUser(user_uuid)
app.url_map.converters['regex'] = RegexConverter
app.url_map.converters['apirepopath'] = APIRepositoryPathConverter
app.register_blueprint(api_bp, url_prefix='/api')
app.config.update(appconfig)
return app
@pytest.fixture(scope="session")
def init_db_path(tmpdir_factory):
""" Creates a new db and appropriate configuration. Used for parameter by name injection. """
sqlitedb_file = str(tmpdir_factory.mktemp("data").join("test.db"))
sqlitedb = 'sqlite:///{0}'.format(sqlitedb_file)
conf = {"TESTING": True,
"DEBUG": True,
"DB_URI": sqlitedb}
os.environ['TEST_DATABASE_URI'] = str(sqlitedb)
os.environ['DB_URI'] = str(sqlitedb)
db.initialize(SqliteDatabase(sqlitedb_file))
application.config.update(conf)
application.config.update({"DB_URI": sqlitedb})
initialize_database()
populate_database()
close_db_filter(None)
return str(sqlitedb_file)
@pytest.fixture()
def database_uri(monkeypatch, init_db_path, sqlitedb_file):
""" Creates the db uri. Used for parameter by name injection. """
shutil.copy2(init_db_path, sqlitedb_file)
db.initialize(SqliteDatabase(sqlitedb_file))
db_path = 'sqlite:///{0}'.format(sqlitedb_file)
monkeypatch.setenv("DB_URI", db_path)
return db_path
@pytest.fixture()
def sqlitedb_file(tmpdir):
""" Makes file for db. Used for parameter by name injection. """
test_db_file = tmpdir.mkdir("quaydb").join("test.db")
return str(test_db_file)
@pytest.fixture()
def appconfig(database_uri):
""" Makes conf with database_uri. Used for parameter by name injection """
conf = {
"TESTING": True,
"DEBUG": True,
"DB_URI": database_uri,
"SECRET_KEY": 'superdupersecret!!!1',
}
return conf

View file

@ -0,0 +1,40 @@
import pytest
from httmock import urlmatch, HTTMock
from util.config.validators import ConfigValidationException
from util.config.validators.validate_bitbucket_trigger import BitbucketTriggerValidator
@pytest.mark.parametrize('unvalidated_config', [
({}),
({'BITBUCKET_TRIGGER_CONFIG': {}}),
({'BITBUCKET_TRIGGER_CONFIG': {'CONSUMER_KEY': 'foo'}}),
({'BITBUCKET_TRIGGER_CONFIG': {'CONSUMER_SECRET': 'foo'}}),
])
def test_validate_invalid_bitbucket_trigger_config(unvalidated_config):
validator = BitbucketTriggerValidator()
with pytest.raises(ConfigValidationException):
validator.validate(unvalidated_config, None, None)
def test_validate_bitbucket_trigger():
url_hit = [False]
@urlmatch(netloc=r'bitbucket.org')
def handler(url, request):
url_hit[0] = True
return {
'status_code': 200,
'content': 'oauth_token=foo&oauth_token_secret=bar',
}
with HTTMock(handler):
validator = BitbucketTriggerValidator()
validator.validate({
'BITBUCKET_TRIGGER_CONFIG': {
'CONSUMER_KEY': 'foo',
'CONSUMER_SECRET': 'bar',
},
}, None, None)
assert url_hit[0]

View file

@ -0,0 +1,21 @@
import pytest
from util.config.validators import ConfigValidationException
from util.config.validators.validate_database import DatabaseValidator
@pytest.mark.parametrize('unvalidated_config,user,user_password,expected', [
(None, None, None, TypeError),
({}, None, None, KeyError),
({'DB_URI': 'sqlite:///:memory:'}, None, None, None),
({'DB_URI': 'invalid:///:memory:'}, None, None, KeyError),
({'DB_NOTURI': 'sqlite:///:memory:'}, None, None, KeyError),
({'DB_URI': 'mysql:///someinvalid'}, None, None, ConfigValidationException),
])
def test_validate_database(unvalidated_config, user, user_password, expected):
validator = DatabaseValidator()
if expected is not None:
with pytest.raises(expected):
validator.validate(unvalidated_config, user, user_password)
else:
validator.validate(unvalidated_config, user, user_password)

View file

@ -0,0 +1,62 @@
import pytest
from httmock import urlmatch, HTTMock
from util.config.validators import ConfigValidationException
from util.config.validators.validate_github import GitHubLoginValidator, GitHubTriggerValidator
@pytest.fixture(params=[GitHubLoginValidator, GitHubTriggerValidator])
def github_validator(request):
return request.param
@pytest.mark.parametrize('github_config', [
({}),
({'GITHUB_ENDPOINT': 'foo'}),
({'GITHUB_ENDPOINT': 'http://github.com'}),
({'GITHUB_ENDPOINT': 'http://github.com', 'CLIENT_ID': 'foo'}),
({'GITHUB_ENDPOINT': 'http://github.com', 'CLIENT_SECRET': 'foo'}),
({
'GITHUB_ENDPOINT': 'http://github.com',
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'foo',
'ORG_RESTRICT': True
}),
({
'GITHUB_ENDPOINT': 'http://github.com',
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'foo',
'ORG_RESTRICT': True,
'ALLOWED_ORGANIZATIONS': [],
}),
])
def test_validate_invalid_github_config(github_config, github_validator):
with pytest.raises(ConfigValidationException):
unvalidated_config = {}
unvalidated_config[github_validator.config_key] = github_config
github_validator.validate(unvalidated_config, None, None)
def test_validate_github(github_validator):
url_hit = [False, False]
@urlmatch(netloc=r'somehost')
def handler(url, request):
url_hit[0] = True
return {'status_code': 200, 'content': '', 'headers': {'X-GitHub-Request-Id': 'foo'}}
@urlmatch(netloc=r'somehost', path=r'/api/v3/applications/foo/tokens/foo')
def app_handler(url, request):
url_hit[1] = True
return {'status_code': 404, 'content': '', 'headers': {'X-GitHub-Request-Id': 'foo'}}
with HTTMock(app_handler, handler):
github_validator.validate({
github_validator.config_key: {
'GITHUB_ENDPOINT': 'http://somehost',
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'bar',
},
}, None, None)
assert url_hit[0]
assert url_hit[1]

View file

@ -0,0 +1,41 @@
import json
import pytest
from httmock import urlmatch, HTTMock
from util.config.validators import ConfigValidationException
from util.config.validators.validate_gitlab_trigger import GitLabTriggerValidator
@pytest.mark.parametrize('unvalidated_config', [
({}),
({'GITLAB_TRIGGER_CONFIG': {}}),
({'GITLAB_TRIGGER_CONFIG': {'GITLAB_ENDPOINT': 'foo'}}),
({'GITLAB_TRIGGER_CONFIG': {'GITLAB_ENDPOINT': 'http://someendpoint', 'CLIENT_ID': 'foo'}}),
({'GITLAB_TRIGGER_CONFIG': {'GITLAB_ENDPOINT': 'http://someendpoint', 'CLIENT_SECRET': 'foo'}}),
])
def test_validate_invalid_gitlab_trigger_config(unvalidated_config):
validator = GitLabTriggerValidator()
with pytest.raises(ConfigValidationException):
validator.validate(unvalidated_config, None, None)
def test_validate_gitlab_trigger():
url_hit = [False]
@urlmatch(netloc=r'somegitlab', path='/oauth/token')
def handler(_, __):
url_hit[0] = True
return {'status_code': 400, 'content': json.dumps({'error': 'invalid code'})}
with HTTMock(handler):
validator = GitLabTriggerValidator()
validator.validate({
'GITLAB_TRIGGER_CONFIG': {
'GITLAB_ENDPOINT': 'http://somegitlab',
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'bar',
},
}, None, None)
assert url_hit[0]

View file

@ -0,0 +1,37 @@
import pytest
from httmock import urlmatch, HTTMock
from util.config.validators import ConfigValidationException
from util.config.validators.validate_google_login import GoogleLoginValidator
@pytest.mark.parametrize('unvalidated_config', [
({}),
({'GOOGLE_LOGIN_CONFIG': {}}),
({'GOOGLE_LOGIN_CONFIG': {'CLIENT_ID': 'foo'}}),
({'GOOGLE_LOGIN_CONFIG': {'CLIENT_SECRET': 'foo'}}),
])
def test_validate_invalid_google_login_config(unvalidated_config):
validator = GoogleLoginValidator()
with pytest.raises(ConfigValidationException):
validator.validate(unvalidated_config, None, None)
def test_validate_google_login():
url_hit = [False]
@urlmatch(netloc=r'www.googleapis.com', path='/oauth2/v3/token')
def handler(_, __):
url_hit[0] = True
return {'status_code': 200, 'content': ''}
validator = GoogleLoginValidator()
with HTTMock(handler):
validator.validate({
'GOOGLE_LOGIN_CONFIG': {
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'bar',
},
}, None, None)
assert url_hit[0]

View file

@ -0,0 +1,49 @@
import pytest
from util.config.validators import ConfigValidationException
from util.config.validators.validate_jwt import JWTAuthValidator
from util.morecollections import AttrDict
from test.test_external_jwt_authn import fake_jwt
@pytest.mark.parametrize('unvalidated_config', [
({}),
({'AUTHENTICATION_TYPE': 'Database'}),
])
def test_validate_noop(unvalidated_config):
JWTAuthValidator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('unvalidated_config', [
({'AUTHENTICATION_TYPE': 'JWT'}),
({'AUTHENTICATION_TYPE': 'JWT', 'JWT_AUTH_ISSUER': 'foo'}),
({'AUTHENTICATION_TYPE': 'JWT', 'JWT_VERIFY_ENDPOINT': 'foo'}),
])
def test_invalid_config(unvalidated_config):
with pytest.raises(ConfigValidationException):
JWTAuthValidator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('username, password, expected_exception', [
('invaliduser', 'invalidpass', ConfigValidationException),
('cool.user', 'invalidpass', ConfigValidationException),
('invaliduser', 'somepass', ConfigValidationException),
('cool.user', 'password', None),
])
def test_validated_jwt(username, password, expected_exception):
with fake_jwt() as jwt_auth:
config = {}
config['AUTHENTICATION_TYPE'] = 'JWT'
config['JWT_AUTH_ISSUER'] = jwt_auth.issuer
config['JWT_VERIFY_ENDPOINT'] = jwt_auth.verify_url
config['JWT_QUERY_ENDPOINT'] = jwt_auth.query_url
config['JWT_GETUSER_ENDPOINT'] = jwt_auth.getuser_url
if expected_exception is not None:
with pytest.raises(ConfigValidationException):
JWTAuthValidator.validate(config, AttrDict(dict(username=username)), password,
public_key_path=jwt_auth.public_key_path)
else:
JWTAuthValidator.validate(config, AttrDict(dict(username=username)), password,
public_key_path=jwt_auth.public_key_path)

View file

@ -0,0 +1,51 @@
import pytest
from util.config.validators import ConfigValidationException
from util.config.validators.validate_keystone import KeystoneValidator
from util.morecollections import AttrDict
from test.test_keystone_auth import fake_keystone
@pytest.mark.parametrize('unvalidated_config', [
({}),
({'AUTHENTICATION_TYPE': 'Database'}),
])
def test_validate_noop(unvalidated_config):
KeystoneValidator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('unvalidated_config', [
({'AUTHENTICATION_TYPE': 'Keystone'}),
({'AUTHENTICATION_TYPE': 'Keystone', 'KEYSTONE_AUTH_URL': 'foo'}),
({'AUTHENTICATION_TYPE': 'Keystone', 'KEYSTONE_AUTH_URL': 'foo',
'KEYSTONE_ADMIN_USERNAME': 'bar'}),
({'AUTHENTICATION_TYPE': 'Keystone', 'KEYSTONE_AUTH_URL': 'foo',
'KEYSTONE_ADMIN_USERNAME': 'bar', 'KEYSTONE_ADMIN_PASSWORD': 'baz'}),
])
def test_invalid_config(unvalidated_config):
with pytest.raises(ConfigValidationException):
KeystoneValidator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('username, password, expected_exception', [
('invaliduser', 'invalidpass', ConfigValidationException),
('cool.user', 'invalidpass', ConfigValidationException),
('invaliduser', 'somepass', ConfigValidationException),
('cool.user', 'password', None),
])
def test_validated_keystone(username, password, expected_exception):
with fake_keystone(2) as keystone_auth:
auth_url = keystone_auth.auth_url
config = {}
config['AUTHENTICATION_TYPE'] = 'Keystone'
config['KEYSTONE_AUTH_URL'] = auth_url
config['KEYSTONE_ADMIN_USERNAME'] = 'adminuser'
config['KEYSTONE_ADMIN_PASSWORD'] = 'adminpass'
config['KEYSTONE_ADMIN_TENANT'] = 'admintenant'
if expected_exception is not None:
with pytest.raises(ConfigValidationException):
KeystoneValidator.validate(config, AttrDict(dict(username=username)), password)
else:
KeystoneValidator.validate(config, AttrDict(dict(username=username)), password)

View file

@ -0,0 +1,64 @@
import pytest
from util.config.validators import ConfigValidationException
from util.config.validators.validate_ldap import LDAPValidator
from util.morecollections import AttrDict
from test.test_ldap import mock_ldap
@pytest.mark.parametrize('unvalidated_config', [
({}),
({'AUTHENTICATION_TYPE': 'Database'}),
])
def test_validate_noop(unvalidated_config):
LDAPValidator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('unvalidated_config', [
({'AUTHENTICATION_TYPE': 'LDAP'}),
({'AUTHENTICATION_TYPE': 'LDAP', 'LDAP_ADMIN_DN': 'foo'}),
])
def test_invalid_config(unvalidated_config):
with pytest.raises(ConfigValidationException):
LDAPValidator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('uri', [
'foo',
'http://foo',
'ldap:foo',
])
def test_invalid_uri(uri):
config = {}
config['AUTHENTICATION_TYPE'] = 'LDAP'
config['LDAP_BASE_DN'] = ['dc=quay', 'dc=io']
config['LDAP_ADMIN_DN'] = 'uid=testy,ou=employees,dc=quay,dc=io'
config['LDAP_ADMIN_PASSWD'] = 'password'
config['LDAP_USER_RDN'] = ['ou=employees']
config['LDAP_URI'] = uri
with pytest.raises(ConfigValidationException):
LDAPValidator.validate(config, None, None)
@pytest.mark.parametrize('username, password, expected_exception', [
('invaliduser', 'invalidpass', ConfigValidationException),
('someuser', 'invalidpass', ConfigValidationException),
('invaliduser', 'somepass', ConfigValidationException),
('someuser', 'somepass', None),
])
def test_validated_ldap(username, password, expected_exception):
config = {}
config['AUTHENTICATION_TYPE'] = 'LDAP'
config['LDAP_BASE_DN'] = ['dc=quay', 'dc=io']
config['LDAP_ADMIN_DN'] = 'uid=testy,ou=employees,dc=quay,dc=io'
config['LDAP_ADMIN_PASSWD'] = 'password'
config['LDAP_USER_RDN'] = ['ou=employees']
if expected_exception is not None:
with pytest.raises(ConfigValidationException):
with mock_ldap():
LDAPValidator.validate(config, AttrDict(dict(username=username)), password)
else:
with mock_ldap():
LDAPValidator.validate(config, AttrDict(dict(username=username)), password)

View file

@ -0,0 +1,24 @@
import pytest
import redis
from mock import patch
from mockredis import mock_strict_redis_client
from util.config.validators import ConfigValidationException
from util.config.validators.validate_redis import RedisValidator
@pytest.mark.parametrize('unvalidated_config,user,user_password,use_mock,expected', [
({}, None, None, False, ConfigValidationException),
({'BUILDLOGS_REDIS': {}}, None, None, False, ConfigValidationException),
({'BUILDLOGS_REDIS': {'host': 'somehost'}}, None, None, False, redis.ConnectionError),
({'BUILDLOGS_REDIS': {'host': 'localhost'}}, None, None, True, None),
])
def test_validate_redis(unvalidated_config, user, user_password, use_mock, expected):
with patch('redis.StrictRedis' if use_mock else 'redis.None', mock_strict_redis_client):
validator = RedisValidator()
if expected is not None:
with pytest.raises(expected):
validator.validate(unvalidated_config, user, user_password)
else:
validator.validate(unvalidated_config, user, user_password)

View file

@ -0,0 +1,36 @@
import pytest
from util.config.validators import ConfigValidationException
from util.config.validators.validate_secscan import SecurityScannerValidator
from util.secscan.fake import fake_security_scanner
@pytest.mark.parametrize('unvalidated_config', [
({'DISTRIBUTED_STORAGE_PREFERENCE': []}),
])
def test_validate_noop(unvalidated_config, app):
SecurityScannerValidator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('unvalidated_config, expected_error, error_message', [
({
'TESTING': True,
'DISTRIBUTED_STORAGE_PREFERENCE': [],
'FEATURE_SECURITY_SCANNER': True,
'SECURITY_SCANNER_ENDPOINT': 'http://invalidhost',
}, Exception, 'Connection error when trying to connect to security scanner endpoint'),
({
'TESTING': True,
'DISTRIBUTED_STORAGE_PREFERENCE': [],
'FEATURE_SECURITY_SCANNER': True,
'SECURITY_SCANNER_ENDPOINT': 'http://fakesecurityscanner',
}, None, None),
])
def test_validate(unvalidated_config, expected_error, error_message, app):
with fake_security_scanner(hostname='fakesecurityscanner'):
if expected_error is not None:
with pytest.raises(expected_error) as ipe:
SecurityScannerValidator.validate(unvalidated_config, None, None)
assert ipe.value.message == error_message
else:
SecurityScannerValidator.validate(unvalidated_config, None, None)

View file

@ -0,0 +1,17 @@
import pytest
from util.config.validators import ConfigValidationException
from util.config.validators.validate_signer import SignerValidator
@pytest.mark.parametrize('unvalidated_config,expected', [
({}, None),
({'SIGNING_ENGINE': 'foobar'}, ConfigValidationException),
({'SIGNING_ENGINE': 'gpg2'}, Exception),
])
def test_validate_signer(unvalidated_config,expected):
validator = SignerValidator()
if expected is not None:
with pytest.raises(expected):
validator.validate(unvalidated_config, None, None)
else:
validator.validate(unvalidated_config, None, None)

View file

@ -0,0 +1,62 @@
import pytest
from mock import patch
from tempfile import NamedTemporaryFile
from util.config.validators import ConfigValidationException
from util.config.validators.validate_ssl import SSLValidator, SSL_FILENAMES
from test.test_ssl_util import generate_test_cert
@pytest.mark.parametrize('unvalidated_config', [
({}),
({'PREFERRED_URL_SCHEME': 'http'}),
({'PREFERRED_URL_SCHEME': 'https', 'EXTERNAL_TLS_TERMINATION': True}),
])
def test_skip_validate_ssl(unvalidated_config):
validator = SSLValidator()
validator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('cert, expected_error, error_message', [
('invalidcert', ConfigValidationException, 'Could not load SSL certificate: no start line'),
(generate_test_cert(hostname='someserver'), None, None),
(generate_test_cert(hostname='invalidserver'), ConfigValidationException,
'Supported names "invalidserver" in SSL cert do not match server hostname "someserver"'),
])
def test_validate_ssl(cert, expected_error, error_message):
with NamedTemporaryFile(delete=False) as cert_file:
cert_file.write(cert[0])
cert_file.seek(0)
with NamedTemporaryFile(delete=False) as key_file:
key_file.write(cert[1])
key_file.seek(0)
def return_true(filename):
return True
def get_volume_file(filename):
if filename == SSL_FILENAMES[0]:
return open(cert_file.name)
if filename == SSL_FILENAMES[1]:
return open(key_file.name)
return None
config = {
'PREFERRED_URL_SCHEME': 'https',
'SERVER_HOSTNAME': 'someserver',
}
with patch('app.config_provider.volume_file_exists', return_true):
with patch('app.config_provider.get_volume_file', get_volume_file):
validator = SSLValidator()
if expected_error is not None:
with pytest.raises(expected_error) as ipe:
validator.validate(config, None, None)
assert ipe.value.message == error_message
else:
validator.validate(config, None, None)

View file

@ -0,0 +1,36 @@
import moto
import pytest
from util.config.validators import ConfigValidationException
from util.config.validators.validate_storage import StorageValidator
@pytest.mark.parametrize('unvalidated_config, expected', [
({}, ConfigValidationException),
({'DISTRIBUTED_STORAGE_CONFIG': {}}, ConfigValidationException),
({'DISTRIBUTED_STORAGE_CONFIG': {'local': None}}, ConfigValidationException),
({'DISTRIBUTED_STORAGE_CONFIG': {'local': ['FakeStorage', {}]}}, None),
])
def test_validate_storage(unvalidated_config, expected):
validator = StorageValidator()
if expected is not None:
with pytest.raises(expected):
validator.validate(unvalidated_config, None, None)
else:
validator.validate(unvalidated_config, None, None)
def test_validate_s3_storage():
validator = StorageValidator()
with moto.mock_s3():
with pytest.raises(ConfigValidationException) as ipe:
validator.validate({
'DISTRIBUTED_STORAGE_CONFIG': {
'default': ('S3Storage', {
's3_access_key': 'invalid',
's3_secret_key': 'invalid',
's3_bucket': 'somebucket',
'storage_path': ''
}),
}
}, None, None)
assert ipe.value.message == 'Invalid storage configuration: default: S3ResponseError: 404 Not Found'

View file

@ -0,0 +1,28 @@
import pytest
from httmock import urlmatch, HTTMock
from util.config.validators import ConfigValidationException
from util.config.validators.validate_torrent import BittorrentValidator
@pytest.mark.parametrize('unvalidated_config,expected', [
({}, ConfigValidationException),
({'BITTORRENT_ANNOUNCE_URL': 'http://faketorrent/announce'}, None),
])
def test_validate_torrent(unvalidated_config,expected):
announcer_hit = [False]
@urlmatch(netloc=r'faketorrent', path='/announce')
def handler(url, request):
announcer_hit[0] = True
return {'status_code': 200, 'content': ''}
with HTTMock(handler):
validator = BittorrentValidator()
if expected is not None:
with pytest.raises(expected):
validator.validate(unvalidated_config, None, None)
assert not announcer_hit[0]
else:
validator.validate(unvalidated_config, None, None)
assert announcer_hit[0]

View file

@ -0,0 +1,29 @@
from bitbucket import BitBucket
from app import get_app_url
from util.config.validators import BaseValidator, ConfigValidationException
class BitbucketTriggerValidator(BaseValidator):
name = "bitbucket-trigger"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the config for BitBucket. """
trigger_config = config.get('BITBUCKET_TRIGGER_CONFIG')
if not trigger_config:
raise ConfigValidationException('Missing client ID and client secret')
if not trigger_config.get('CONSUMER_KEY'):
raise ConfigValidationException('Missing Consumer Key')
if not trigger_config.get('CONSUMER_SECRET'):
raise ConfigValidationException('Missing Consumer Secret')
key = trigger_config['CONSUMER_KEY']
secret = trigger_config['CONSUMER_SECRET']
callback_url = '%s/oauth1/bitbucket/callback/trigger/' % (get_app_url())
bitbucket_client = BitBucket(key, secret, callback_url)
(result, _, _) = bitbucket_client.get_authorization_url()
if not result:
raise ConfigValidationException('Invalid consumer key or secret')

View file

@ -0,0 +1,18 @@
from peewee import OperationalError
from data.database import validate_database_url
from util.config.validators import BaseValidator, ConfigValidationException
class DatabaseValidator(BaseValidator):
name = "database"
@classmethod
def validate(cls, config, user, user_password):
""" Validates connecting to the database. """
try:
validate_database_url(config['DB_URI'], config.get('DB_CONNECTION_ARGS', {}))
except OperationalError as ex:
if ex.args and len(ex.args) > 1:
raise ConfigValidationException(ex.args[1])
else:
raise ex

View file

@ -0,0 +1,25 @@
from flask import Flask
from flask_mail import Mail, Message
from app import app
from util.config.validators import BaseValidator
class EmailValidator(BaseValidator):
name = "mail"
@classmethod
def validate(cls, config, user, user_password):
""" Validates sending email. """
with app.app_context():
test_app = Flask("mail-test-app")
test_app.config.update(config)
test_app.config.update({
'MAIL_FAIL_SILENTLY': False,
'TESTING': False
})
test_mail = Mail(test_app)
test_msg = Message("Test e-mail from %s" % app.config['REGISTRY_TITLE'],
sender=config.get('MAIL_DEFAULT_SENDER'))
test_msg.add_recipient(user.email)
test_mail.send(test_msg)

View file

@ -0,0 +1,51 @@
from app import app
from oauth.services.github import GithubOAuthService
from util.config.validators import BaseValidator, ConfigValidationException
class BaseGitHubValidator(BaseValidator):
name = None
config_key = None
@classmethod
def validate(cls, config, user, user_password):
""" Validates the OAuth credentials and API endpoint for a Github service. """
github_config = config.get(cls.config_key)
if not github_config:
raise ConfigValidationException('Missing GitHub client id and client secret')
endpoint = github_config.get('GITHUB_ENDPOINT')
if not endpoint:
raise ConfigValidationException('Missing GitHub Endpoint')
if endpoint.find('http://') != 0 and endpoint.find('https://') != 0:
raise ConfigValidationException('Github Endpoint must start with http:// or https://')
if not github_config.get('CLIENT_ID'):
raise ConfigValidationException('Missing Client ID')
if not github_config.get('CLIENT_SECRET'):
raise ConfigValidationException('Missing Client Secret')
if github_config.get('ORG_RESTRICT') and not github_config.get('ALLOWED_ORGANIZATIONS'):
raise ConfigValidationException('Organization restriction must have at least one allowed ' +
'organization')
client = app.config['HTTPCLIENT']
oauth = GithubOAuthService(config, cls.config_key)
result = oauth.validate_client_id_and_secret(client, app.config)
if not result:
raise ConfigValidationException('Invalid client id or client secret')
if github_config.get('ALLOWED_ORGANIZATIONS'):
for org_id in github_config.get('ALLOWED_ORGANIZATIONS'):
if not oauth.validate_organization(org_id, client):
raise ConfigValidationException('Invalid organization: %s' % org_id)
class GitHubLoginValidator(BaseGitHubValidator):
name = "github-login"
config_key = "GITHUB_LOGIN_CONFIG"
class GitHubTriggerValidator(BaseGitHubValidator):
name = "github-trigger"
config_key = "GITHUB_TRIGGER_CONFIG"

View file

@ -0,0 +1,32 @@
from app import app
from oauth.services.gitlab import GitLabOAuthService
from util.config.validators import BaseValidator, ConfigValidationException
class GitLabTriggerValidator(BaseValidator):
name = "gitlab-trigger"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the OAuth credentials and API endpoint for a GitLab service. """
github_config = config.get('GITLAB_TRIGGER_CONFIG')
if not github_config:
raise ConfigValidationException('Missing GitLab client id and client secret')
endpoint = github_config.get('GITLAB_ENDPOINT')
if not endpoint:
raise ConfigValidationException('Missing GitLab Endpoint')
if endpoint.find('http://') != 0 and endpoint.find('https://') != 0:
raise ConfigValidationException('GitLab Endpoint must start with http:// or https://')
if not github_config.get('CLIENT_ID'):
raise ConfigValidationException('Missing Client ID')
if not github_config.get('CLIENT_SECRET'):
raise ConfigValidationException('Missing Client Secret')
client = app.config['HTTPCLIENT']
oauth = GitLabOAuthService(config, 'GITLAB_TRIGGER_CONFIG')
result = oauth.validate_client_id_and_secret(client, app.config)
if not result:
raise ConfigValidationException('Invalid client id or client secret')

View file

@ -0,0 +1,25 @@
from app import app
from oauth.services.google import GoogleOAuthService
from util.config.validators import BaseValidator, ConfigValidationException
class GoogleLoginValidator(BaseValidator):
name = "google-login"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the Google Login client ID and secret. """
google_login_config = config.get('GOOGLE_LOGIN_CONFIG')
if not google_login_config:
raise ConfigValidationException('Missing client ID and client secret')
if not google_login_config.get('CLIENT_ID'):
raise ConfigValidationException('Missing Client ID')
if not google_login_config.get('CLIENT_SECRET'):
raise ConfigValidationException('Missing Client Secret')
client = app.config['HTTPCLIENT']
oauth = GoogleOAuthService(config, 'GOOGLE_LOGIN_CONFIG')
result = oauth.validate_client_id_and_secret(client, app.config)
if not result:
raise ConfigValidationException('Invalid client id or client secret')

View file

@ -0,0 +1,62 @@
from app import app, OVERRIDE_CONFIG_DIRECTORY
from data.users.externaljwt import ExternalJWTAuthN
from util.config.validators import BaseValidator, ConfigValidationException
class JWTAuthValidator(BaseValidator):
name = "jwt"
@classmethod
def validate(cls, config, user, user_password, public_key_path=None):
""" Validates the JWT authentication system. """
if config.get('AUTHENTICATION_TYPE', 'Database') != 'JWT':
return
verify_endpoint = config.get('JWT_VERIFY_ENDPOINT')
query_endpoint = config.get('JWT_QUERY_ENDPOINT', None)
getuser_endpoint = config.get('JWT_GETUSER_ENDPOINT', None)
issuer = config.get('JWT_AUTH_ISSUER')
if not verify_endpoint:
raise ConfigValidationException('Missing JWT Verification endpoint')
if not issuer:
raise ConfigValidationException('Missing JWT Issuer ID')
# Try to instatiate the JWT authentication mechanism. This will raise an exception if
# the key cannot be found.
users = ExternalJWTAuthN(verify_endpoint, query_endpoint, getuser_endpoint, issuer,
OVERRIDE_CONFIG_DIRECTORY,
app.config['HTTPCLIENT'],
app.config.get('JWT_AUTH_MAX_FRESH_S', 300),
public_key_path=public_key_path,
requires_email=config.get('FEATURE_MAILING', True))
# Verify that the superuser exists. If not, raise an exception.
username = user.username
(result, err_msg) = users.verify_credentials(username, user_password)
if not result:
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not ' +
'exist in the remote authentication system ' +
'OR JWT auth is misconfigured') % (username, err_msg)
raise ConfigValidationException(msg)
# If the query endpoint exists, ensure we can query to find the current user and that we can
# look up users directly.
if query_endpoint:
(results, _, err_msg) = users.query_users(username)
if not results:
err_msg = err_msg or ('Could not find users matching query: %s' % username)
raise ConfigValidationException('Query endpoint is misconfigured or not returning ' +
'proper users: %s' % err_msg)
# Make sure the get user endpoint is also configured.
if not getuser_endpoint:
raise ConfigValidationException('The lookup user endpoint must be configured if the ' +
'query endpoint is set')
(result, err_msg) = users.get_user(username)
if not result:
err_msg = err_msg or ('Could not find user %s' % username)
raise ConfigValidationException('Lookup endpoint is misconfigured or not returning ' +
'properly: %s' % err_msg)

View file

@ -0,0 +1,42 @@
from util.config.validators import BaseValidator, ConfigValidationException
from data.users.keystone import get_keystone_users
class KeystoneValidator(BaseValidator):
name = "keystone"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the Keystone authentication system. """
if config.get('AUTHENTICATION_TYPE', 'Database') != 'Keystone':
return
auth_url = config.get('KEYSTONE_AUTH_URL')
auth_version = int(config.get('KEYSTONE_AUTH_VERSION', 2))
admin_username = config.get('KEYSTONE_ADMIN_USERNAME')
admin_password = config.get('KEYSTONE_ADMIN_PASSWORD')
admin_tenant = config.get('KEYSTONE_ADMIN_TENANT')
if not auth_url:
raise ConfigValidationException('Missing authentication URL')
if not admin_username:
raise ConfigValidationException('Missing admin username')
if not admin_password:
raise ConfigValidationException('Missing admin password')
if not admin_tenant:
raise ConfigValidationException('Missing admin tenant')
requires_email = config.get('FEATURE_MAILING', True)
users = get_keystone_users(auth_version, auth_url, admin_username, admin_password, admin_tenant,
requires_email)
# Verify that the superuser exists. If not, raise an exception.
username = user.username
(result, err_msg) = users.verify_credentials(username, user_password)
if not result:
msg = ('Verification of superuser %s failed: %s \n\nThe user either does not ' +
'exist in the remote authentication system ' +
'OR Keystone auth is misconfigured.') % (username, err_msg)
raise ConfigValidationException(msg)

View file

@ -0,0 +1,64 @@
import ldap
import subprocess
from app import app, config_provider
from data.users import LDAP_CERT_FILENAME
from data.users.externalldap import LDAPConnection, LDAPUsers
from util.config.validators import BaseValidator, ConfigValidationException
class LDAPValidator(BaseValidator):
name = "ldap"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the LDAP connection. """
if config.get('AUTHENTICATION_TYPE', 'Database') != 'LDAP':
return
# If there is a custom LDAP certificate, then reinstall the certificates for the container.
if config_provider.volume_file_exists(LDAP_CERT_FILENAME):
subprocess.check_call(['/conf/init/certs_install.sh'])
# Note: raises ldap.INVALID_CREDENTIALS on failure
admin_dn = config.get('LDAP_ADMIN_DN')
admin_passwd = config.get('LDAP_ADMIN_PASSWD')
if not admin_dn:
raise ConfigValidationException('Missing Admin DN for LDAP configuration')
if not admin_passwd:
raise ConfigValidationException('Missing Admin Password for LDAP configuration')
ldap_uri = config.get('LDAP_URI', 'ldap://localhost')
if not ldap_uri.startswith('ldap://') and not ldap_uri.startswith('ldaps://'):
raise ConfigValidationException('LDAP URI must start with ldap:// or ldaps://')
allow_tls_fallback = config.get('LDAP_ALLOW_INSECURE_FALLBACK', False)
try:
with LDAPConnection(ldap_uri, admin_dn, admin_passwd, allow_tls_fallback):
pass
except ldap.LDAPError as ex:
values = ex.args[0] if ex.args else {}
if not isinstance(values, dict):
raise ConfigValidationException(str(ex.args))
raise ConfigValidationException(values.get('desc', 'Unknown error'))
# Verify that the superuser exists. If not, raise an exception.
base_dn = config.get('LDAP_BASE_DN')
user_rdn = config.get('LDAP_USER_RDN', [])
uid_attr = config.get('LDAP_UID_ATTR', 'uid')
email_attr = config.get('LDAP_EMAIL_ATTR', 'mail')
requires_email = config.get('FEATURE_MAILING', True)
users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr,
allow_tls_fallback, requires_email=requires_email)
username = user.username
(result, err_msg) = users.verify_credentials(username, user_password)
if not result:
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not exist ' +
'in the remote authentication system ' +
'OR LDAP auth is misconfigured.') % (username, err_msg)
raise ConfigValidationException(msg)

View file

@ -0,0 +1,16 @@
import redis
from util.config.validators import BaseValidator, ConfigValidationException
class RedisValidator(BaseValidator):
name = "redis"
@classmethod
def validate(cls, config, user, user_password):
""" Validates connecting to redis. """
redis_config = config.get('BUILDLOGS_REDIS', {})
if not 'host' in redis_config:
raise ConfigValidationException('Missing redis hostname')
client = redis.StrictRedis(socket_connect_timeout=5, **redis_config)
client.ping()

View file

@ -0,0 +1,36 @@
import time
from app import app
from boot import setup_jwt_proxy
from util.secscan.api import SecurityScannerAPI
from util.config.validators import BaseValidator, ConfigValidationException
class SecurityScannerValidator(BaseValidator):
name = "security-scanner"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the configuration for talking to a Quay Security Scanner. """
if not config.get('FEATURE_SECURITY_SCANNER', False):
return
client = app.config['HTTPCLIENT']
api = SecurityScannerAPI(app, config, None, client=client, skip_validation=True)
if not config.get('TESTING', False):
# Generate a temporary Quay key to use for signing the outgoing requests.
setup_jwt_proxy()
# We have to wait for JWT proxy to restart with the newly generated key.
max_tries = 5
response = None
while max_tries > 0:
response = api.ping()
if response.status_code == 200:
return
time.sleep(1)
max_tries = max_tries - 1
message = 'Expected 200 status code, got %s: %s' % (response.status_code, response.text)
raise ConfigValidationException('Could not ping security scanner: %s' % message)

View file

@ -0,0 +1,20 @@
from StringIO import StringIO
from app import config_provider
from util.config.validators import BaseValidator, ConfigValidationException
from util.security.signing import SIGNING_ENGINES
class SignerValidator(BaseValidator):
name = "signer"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the GPG public+private key pair used for signing converted ACIs. """
if config.get('SIGNING_ENGINE') is None:
return
if config['SIGNING_ENGINE'] not in SIGNING_ENGINES:
raise ConfigValidationException('Unknown signing engine: %s' % config['SIGNING_ENGINE'])
engine = SIGNING_ENGINES[config['SIGNING_ENGINE']](config, config_provider)
engine.detached_sign(StringIO('test string'))

View file

@ -0,0 +1,59 @@
from app import config_provider
from util.config.validators import BaseValidator, ConfigValidationException
from util.security.ssl import load_certificate, CertInvalidException, KeyInvalidException
SSL_FILENAMES = ['ssl.cert', 'ssl.key']
class SSLValidator(BaseValidator):
name = "ssl"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the SSL configuration (if enabled). """
# Skip if non-SSL.
if config.get('PREFERRED_URL_SCHEME', 'http') != 'https':
return
# Skip if externally terminated.
if config.get('EXTERNAL_TLS_TERMINATION', False) is True:
return
# Verify that we have all the required SSL files.
for filename in SSL_FILENAMES:
if not config_provider.volume_file_exists(filename):
raise ConfigValidationException('Missing required SSL file: %s' % filename)
# Read the contents of the SSL certificate.
with config_provider.get_volume_file(SSL_FILENAMES[0]) as f:
cert_contents = f.read()
# Validate the certificate.
try:
certificate = load_certificate(cert_contents)
except CertInvalidException as cie:
raise ConfigValidationException('Could not load SSL certificate: %s' % cie.message)
# Verify the certificate has not expired.
if certificate.expired:
raise ConfigValidationException('The specified SSL certificate has expired.')
# Verify the hostname matches the name in the certificate.
if not certificate.matches_name(config['SERVER_HOSTNAME']):
msg = ('Supported names "%s" in SSL cert do not match server hostname "%s"' %
(', '.join(list(certificate.names)), config['SERVER_HOSTNAME']))
raise ConfigValidationException(msg)
# Verify the private key against the certificate.
private_key_path = None
with config_provider.get_volume_file(SSL_FILENAMES[1]) as f:
private_key_path = f.name
if not private_key_path:
# Only in testing.
return
try:
certificate.validate_private_key(private_key_path)
except KeyInvalidException as kie:
raise ConfigValidationException('SSL private key failed to validate: %s' % kie.message)

View file

@ -0,0 +1,43 @@
from app import app
from storage import get_storage_driver
from util.config.validators import BaseValidator, ConfigValidationException
class StorageValidator(BaseValidator):
name = "registry-storage"
@classmethod
def validate(cls, config, user, user_password):
""" Validates registry storage. """
replication_enabled = config.get('FEATURE_STORAGE_REPLICATION', False)
providers = _get_storage_providers(config).items()
if not providers:
raise ConfigValidationException('Storage configuration required')
for name, (storage_type, driver) in providers:
try:
if replication_enabled and storage_type == 'LocalStorage':
raise ConfigValidationException('Locally mounted directory not supported ' +
'with storage replication')
# Run validation on the driver.
driver.validate(app.config['HTTPCLIENT'])
# Run setup on the driver if the read/write succeeded.
driver.setup()
except Exception as ex:
msg = str(ex).strip()
raise ConfigValidationException('Invalid storage configuration: %s: %s' % (name, msg))
def _get_storage_providers(config):
storage_config = config.get('DISTRIBUTED_STORAGE_CONFIG', {})
drivers = {}
try:
for name, parameters in storage_config.items():
drivers[name] = (parameters[0], get_storage_driver(None, None, None, parameters))
except TypeError:
raise ConfigValidationException('Missing required parameter(s) for storage %s' % name)
return drivers

View file

@ -0,0 +1,53 @@
import logging
from hashlib import sha1
from app import app
from util.config.validators import BaseValidator, ConfigValidationException
from util.registry.torrent import torrent_jwt
logger = logging.getLogger(__name__)
class BittorrentValidator(BaseValidator):
name = "bittorrent"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the configuration for using BitTorrent for downloads. """
announce_url = config.get('BITTORRENT_ANNOUNCE_URL')
if not announce_url:
raise ConfigValidationException('Missing announce URL')
# Ensure that the tracker is reachable and accepts requests signed with a registry key.
client = app.config['HTTPCLIENT']
params = {
'info_hash': sha1('somedata').digest(),
'peer_id': '-QUAY00-6wfG2wk6wWLc',
'uploaded': 0,
'downloaded': 0,
'left': 0,
'numwant': 0,
'port': 80,
}
encoded_jwt = torrent_jwt(params)
params['jwt'] = encoded_jwt
resp = client.get(announce_url, timeout=5, params=params)
logger.debug('Got tracker response: %s: %s', resp.status_code, resp.text)
if resp.status_code == 404:
raise ConfigValidationException('Announce path not found; did you forget `/announce`?')
if resp.status_code == 500:
raise ConfigValidationException('Did not get expected response from Tracker; ' +
'please check your settings')
if resp.status_code == 200:
if 'invalid jwt' in resp.text:
raise ConfigValidationException('Could not authorize to Tracker; is your Tracker ' +
'properly configured?')
if 'failure reason' in resp.text:
raise ConfigValidationException('Could not validate signed announce request: ' + resp.text)

View file

@ -316,6 +316,13 @@ class FakeSecurityScanner(object):
'content': json.dumps(response),
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/metrics$', method='GET')
def metrics(url, _):
return {
'status_code': 200,
'content': json.dumps({'fake': True}),
}
@all_requests
def response_content(url, _):
return {
@ -324,4 +331,4 @@ class FakeSecurityScanner(object):
}
return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification,
delete_notification, response_content]
delete_notification, metrics, response_content]