This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/config/validator.py

170 lines
No EOL
5 KiB
Python

import redis
import os
import json
import ldap
from data.users import LDAPConnection
from flask import Flask
from flask.ext.mail import Mail, Message
from data.database import validate_database_url, User
from storage import get_storage_driver
from app import app, OVERRIDE_CONFIG_DIRECTORY
from auth.auth_context import get_authenticated_user
from util.oauth import GoogleOAuthConfig, GithubOAuthConfig
SSL_FILENAMES = ['ssl.cert', 'ssl.key']
def validate_service_for_config(service, config):
""" Attempts to validate the configuration for the given service. """
if not service in _VALIDATORS:
return {
'status': False
}
try:
_VALIDATORS[service](config)
return {
'status': True
}
except Exception as ex:
return {
'status': False,
'reason': str(ex)
}
def _validate_database(config):
""" Validates connecting to the database. """
validate_database_url(config['DB_URI'])
def _validate_redis(config):
""" Validates connecting to redis. """
redis_config = config['BUILDLOGS_REDIS']
client = redis.StrictRedis(socket_connect_timeout=5, **redis_config)
client.ping()
def _validate_registry_storage(config):
""" Validates registry storage. """
parameters = config.get('DISTRIBUTED_STORAGE_CONFIG', {}).get('local', ['LocalStorage', {}])
try:
driver = get_storage_driver(parameters)
except TypeError:
raise Exception('Missing required storage configuration parameter(s)')
# Put and remove a temporary file.
driver.put_content('_verify', 'testing 123')
driver.remove('_verify')
def _validate_mailing(config):
""" Validates sending email. """
test_app = Flask("mail-test-app")
test_app.config.update(config)
test_app.config.update({
'MAIL_FAIL_SILENTLY': False,
'TESTING': False
})
test_mail = Mail(test_app)
test_msg = Message("Test e-mail from %s" % app.config['REGISTRY_TITLE'])
test_msg.add_recipient(get_authenticated_user().email)
test_mail.send(test_msg)
def _validate_github(config_key):
return lambda config: _validate_github_with_key(config_key, config)
def _validate_github_with_key(config_key, config):
""" Validates the OAuth credentials and API endpoint for a Github service. """
github_config = config.get(config_key)
if not github_config:
raise Exception('Missing Github client id and client secret')
endpoint = github_config.get('GITHUB_ENDPOINT')
if not endpoint:
raise Exception('Missing Github Endpoint')
if endpoint.find('http://') != 0 and endpoint.find('https://') != 0:
raise Exception('Github Endpoint must start with http:// or https://')
if not github_config.get('CLIENT_ID'):
raise Exception('Missing Client ID')
if not github_config.get('CLIENT_SECRET'):
raise Exception('Missing Client Secret')
client = app.config['HTTPCLIENT']
oauth = GithubOAuthConfig(config, config_key)
result = oauth.validate_client_id_and_secret(client)
if not result:
raise Exception('Invalid client id or client secret')
def _validate_google_login(config):
""" Validates the Google Login client ID and secret. """
google_login_config = config.get('GOOGLE_LOGIN_CONFIG')
if not google_login_config:
raise Exception('Missing client ID and client secret')
if not google_login_config.get('CLIENT_ID'):
raise Exception('Missing Client ID')
if not google_login_config.get('CLIENT_SECRET'):
raise Exception('Missing Client Secret')
client = app.config['HTTPCLIENT']
oauth = GoogleOAuthConfig(config, 'GOOGLE_LOGIN_CONFIG')
result = oauth.validate_client_id_and_secret(client)
if not result:
raise Exception('Invalid client id or client secret')
def _validate_ssl(config):
""" Validates the SSL configuration (if enabled). """
if config.get('PREFERRED_URL_SCHEME', 'http') != 'https':
return
for filename in SSL_FILENAMES:
if not os.path.exists(os.path.join(OVERRIDE_CONFIG_DIRECTORY, filename)):
raise Exception('Missing required SSL file: %s' % filename)
def _validate_ldap(config):
""" Validates the LDAP connection. """
if config.get('AUTHENTICATION_TYPE', 'Database') != 'LDAP':
return
# Note: raises ldap.INVALID_CREDENTIALS on failure
admin_dn = config.get('LDAP_ADMIN_DN')
admin_passwd = config.get('LDAP_ADMIN_PASSWD')
if not admin_dn:
raise Exception('Missing Admin DN for LDAP configuration')
if not admin_passwd:
raise Exception('Missing Admin Password for LDAP configuration')
ldap_uri = config.get('LDAP_URI', 'ldap://localhost')
try:
with LDAPConnection(ldap_uri, admin_dn, admin_passwd):
pass
except ldap.LDAPError as ex:
values = ex.args[0] if ex.args else {}
raise Exception(values.get('desc', 'Unknown error'))
_VALIDATORS = {
'database': _validate_database,
'redis': _validate_redis,
'registry-storage': _validate_registry_storage,
'mail': _validate_mailing,
'github-login': _validate_github('GITHUB_LOGIN_CONFIG'),
'github-trigger': _validate_github('GITHUB_TRIGGER_CONFIG'),
'google-login': _validate_google_login,
'ssl': _validate_ssl,
'ldap': _validate_ldap,
}