Pull out JWT auth validation into validator class
Also fixes a small bug in validation (yay tests!)
This commit is contained in:
parent
678f868bc4
commit
c0f7530b29
5 changed files with 118 additions and 60 deletions
|
@ -28,11 +28,12 @@ class ExternalJWTAuthN(FederatedUsers):
|
||||||
default_key_path = os.path.join(override_config_dir, ExternalJWTAuthN.PUBLIC_KEY_FILENAME)
|
default_key_path = os.path.join(override_config_dir, ExternalJWTAuthN.PUBLIC_KEY_FILENAME)
|
||||||
public_key_path = public_key_path or default_key_path
|
public_key_path = public_key_path or default_key_path
|
||||||
if not os.path.exists(public_key_path):
|
if not os.path.exists(public_key_path):
|
||||||
error_message = ('JWT Authentication public key file "%s" not found in directory %s' %
|
error_message = ('JWT Authentication public key file "%s" not found' % public_key_path)
|
||||||
(ExternalJWTAuthN.PUBLIC_KEY_FILENAME, override_config_dir))
|
|
||||||
|
|
||||||
raise Exception(error_message)
|
raise Exception(error_message)
|
||||||
|
|
||||||
|
self.public_key_path = public_key_path
|
||||||
|
|
||||||
with open(public_key_path) as public_key_file:
|
with open(public_key_path) as public_key_file:
|
||||||
self.public_key = public_key_file.read()
|
self.public_key = public_key_file.read()
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,8 @@ def fake_jwt(requires_email=True):
|
||||||
getuser_url = server_url + '/user/get'
|
getuser_url = server_url + '/user/get'
|
||||||
|
|
||||||
jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
|
jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
|
||||||
app.config['HTTPCLIENT'], 300, public_key.name,
|
app.config['HTTPCLIENT'], 300,
|
||||||
|
public_key_path=public_key.name,
|
||||||
requires_email=requires_email)
|
requires_email=requires_email)
|
||||||
|
|
||||||
with liveserver_app(jwt_app, port):
|
with liveserver_app(jwt_app, port):
|
||||||
|
|
|
@ -14,7 +14,6 @@ from bitbucket import BitBucket
|
||||||
from boot import setup_jwt_proxy
|
from boot import setup_jwt_proxy
|
||||||
from data.database import validate_database_url
|
from data.database import validate_database_url
|
||||||
from data.users import LDAP_CERT_FILENAME
|
from data.users import LDAP_CERT_FILENAME
|
||||||
from data.users.externaljwt import ExternalJWTAuthN
|
|
||||||
from oauth.services.github import GithubOAuthService
|
from oauth.services.github import GithubOAuthService
|
||||||
from oauth.services.google import GoogleOAuthService
|
from oauth.services.google import GoogleOAuthService
|
||||||
from oauth.services.gitlab import GitLabOAuthService
|
from oauth.services.gitlab import GitLabOAuthService
|
||||||
|
@ -29,6 +28,7 @@ from util.config.validators.validate_storage import StorageValidator
|
||||||
from util.config.validators.validate_email import EmailValidator
|
from util.config.validators.validate_email import EmailValidator
|
||||||
from util.config.validators.validate_ldap import LDAPValidator
|
from util.config.validators.validate_ldap import LDAPValidator
|
||||||
from util.config.validators.validate_keystone import KeystoneValidator
|
from util.config.validators.validate_keystone import KeystoneValidator
|
||||||
|
from util.config.validators.validate_jwt import JWTAuthValidator
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -236,61 +236,6 @@ def _validate_ssl(config, user_obj, _):
|
||||||
raise ConfigValidationException('SSL private key failed to validate: %s' % kie.message)
|
raise ConfigValidationException('SSL private key failed to validate: %s' % kie.message)
|
||||||
|
|
||||||
|
|
||||||
def _validate_jwt(config, user_obj, password):
|
|
||||||
""" Validates the JWT authentication system. """
|
|
||||||
if config.get('AUTHENTICATION_TYPE', 'Database') != 'JWT':
|
|
||||||
return
|
|
||||||
|
|
||||||
verify_endpoint = config.get('JWT_VERIFY_ENDPOINT')
|
|
||||||
query_endpoint = config.get('JWT_QUERY_ENDPOINT', None)
|
|
||||||
getuser_endpoint = config.get('JWT_GETUSER_ENDPOINT', None)
|
|
||||||
|
|
||||||
issuer = config.get('JWT_AUTH_ISSUER')
|
|
||||||
|
|
||||||
if not verify_endpoint:
|
|
||||||
raise ConfigValidationException('Missing JWT Verification endpoint')
|
|
||||||
|
|
||||||
if not issuer:
|
|
||||||
raise ConfigValidationException('Missing JWT Issuer ID')
|
|
||||||
|
|
||||||
# Try to instatiate the JWT authentication mechanism. This will raise an exception if
|
|
||||||
# the key cannot be found.
|
|
||||||
users = ExternalJWTAuthN(verify_endpoint, query_endpoint, getuser_endpoint, issuer,
|
|
||||||
OVERRIDE_CONFIG_DIRECTORY,
|
|
||||||
app.config['HTTPCLIENT'],
|
|
||||||
app.config.get('JWT_AUTH_MAX_FRESH_S', 300),
|
|
||||||
requires_email=config.get('FEATURE_MAILING', True))
|
|
||||||
|
|
||||||
# Verify that the superuser exists. If not, raise an exception.
|
|
||||||
username = user_obj.username
|
|
||||||
(result, err_msg) = users.verify_credentials(username, password)
|
|
||||||
if not result:
|
|
||||||
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not ' +
|
|
||||||
'exist in the remote authentication system ' +
|
|
||||||
'OR JWT auth is misconfigured') % (username, err_msg)
|
|
||||||
raise ConfigValidationException(msg)
|
|
||||||
|
|
||||||
# If the query endpoint exists, ensure we can query to find the current user and that we can
|
|
||||||
# look up users directly.
|
|
||||||
if query_endpoint:
|
|
||||||
(results, err_msg) = users.query_users(username)
|
|
||||||
if not results:
|
|
||||||
err_msg = err_msg or ('Could not find users matching query: %s' % username)
|
|
||||||
raise ConfigValidationException('Query endpoint is misconfigured or not returning ' +
|
|
||||||
'proper users: %s' % err_msg)
|
|
||||||
|
|
||||||
# Make sure the get user endpoint is also configured.
|
|
||||||
if not getuser_endpoint:
|
|
||||||
raise ConfigValidationException('The lookup user endpoint must be configured if the ' +
|
|
||||||
'query endpoint is set')
|
|
||||||
|
|
||||||
(result, err_msg) = users.get_user(username)
|
|
||||||
if not result:
|
|
||||||
err_msg = err_msg or ('Could not find user %s' % username)
|
|
||||||
raise ConfigValidationException('Lookup endpoint is misconfigured or not returning ' +
|
|
||||||
'properly: %s' % err_msg)
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_signer(config, user_obj, _):
|
def _validate_signer(config, user_obj, _):
|
||||||
""" Validates the GPG public+private key pair used for signing converted ACIs. """
|
""" Validates the GPG public+private key pair used for signing converted ACIs. """
|
||||||
if config.get('SIGNING_ENGINE') is None:
|
if config.get('SIGNING_ENGINE') is None:
|
||||||
|
@ -380,7 +325,7 @@ VALIDATORS = {
|
||||||
'google-login': _validate_google_login,
|
'google-login': _validate_google_login,
|
||||||
'ssl': _validate_ssl,
|
'ssl': _validate_ssl,
|
||||||
LDAPValidator.name: LDAPValidator.validate,
|
LDAPValidator.name: LDAPValidator.validate,
|
||||||
'jwt': _validate_jwt,
|
JWTAuthValidator.name: JWTAuthValidator.validate,
|
||||||
KeystoneValidator.name: KeystoneValidator.validate,
|
KeystoneValidator.name: KeystoneValidator.validate,
|
||||||
'signer': _validate_signer,
|
'signer': _validate_signer,
|
||||||
'security-scanner': _validate_security_scanner,
|
'security-scanner': _validate_security_scanner,
|
||||||
|
|
49
util/config/validators/test/test_validate_jwt.py
Normal file
49
util/config/validators/test/test_validate_jwt.py
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from util.config.validators import ConfigValidationException
|
||||||
|
from util.config.validators.validate_jwt import JWTAuthValidator
|
||||||
|
from util.morecollections import AttrDict
|
||||||
|
|
||||||
|
from test.test_external_jwt_authn import fake_jwt
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('unvalidated_config', [
|
||||||
|
({}),
|
||||||
|
({'AUTHENTICATION_TYPE': 'Database'}),
|
||||||
|
])
|
||||||
|
def test_validate_noop(unvalidated_config):
|
||||||
|
JWTAuthValidator.validate(unvalidated_config, None, None)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('unvalidated_config', [
|
||||||
|
({'AUTHENTICATION_TYPE': 'JWT'}),
|
||||||
|
({'AUTHENTICATION_TYPE': 'JWT', 'JWT_AUTH_ISSUER': 'foo'}),
|
||||||
|
({'AUTHENTICATION_TYPE': 'JWT', 'JWT_VERIFY_ENDPOINT': 'foo'}),
|
||||||
|
])
|
||||||
|
def test_invalid_config(unvalidated_config):
|
||||||
|
with pytest.raises(ConfigValidationException):
|
||||||
|
JWTAuthValidator.validate(unvalidated_config, None, None)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('username, password, expected_exception', [
|
||||||
|
('invaliduser', 'invalidpass', ConfigValidationException),
|
||||||
|
('cool.user', 'invalidpass', ConfigValidationException),
|
||||||
|
('invaliduser', 'somepass', ConfigValidationException),
|
||||||
|
('cool.user', 'password', None),
|
||||||
|
])
|
||||||
|
def test_validated_jwt(username, password, expected_exception):
|
||||||
|
with fake_jwt() as jwt_auth:
|
||||||
|
config = {}
|
||||||
|
config['AUTHENTICATION_TYPE'] = 'JWT'
|
||||||
|
config['JWT_AUTH_ISSUER'] = jwt_auth.issuer
|
||||||
|
config['JWT_VERIFY_ENDPOINT'] = jwt_auth.verify_url
|
||||||
|
config['JWT_QUERY_ENDPOINT'] = jwt_auth.query_url
|
||||||
|
config['JWT_GETUSER_ENDPOINT'] = jwt_auth.getuser_url
|
||||||
|
|
||||||
|
if expected_exception is not None:
|
||||||
|
with pytest.raises(ConfigValidationException):
|
||||||
|
JWTAuthValidator.validate(config, AttrDict(dict(username=username)), password,
|
||||||
|
public_key_path=jwt_auth.public_key_path)
|
||||||
|
else:
|
||||||
|
JWTAuthValidator.validate(config, AttrDict(dict(username=username)), password,
|
||||||
|
public_key_path=jwt_auth.public_key_path)
|
62
util/config/validators/validate_jwt.py
Normal file
62
util/config/validators/validate_jwt.py
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
from app import app, OVERRIDE_CONFIG_DIRECTORY
|
||||||
|
from data.users.externaljwt import ExternalJWTAuthN
|
||||||
|
from util.config.validators import BaseValidator, ConfigValidationException
|
||||||
|
|
||||||
|
class JWTAuthValidator(BaseValidator):
|
||||||
|
name = "jwt"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def validate(cls, config, user, user_password, public_key_path=None):
|
||||||
|
""" Validates the JWT authentication system. """
|
||||||
|
if config.get('AUTHENTICATION_TYPE', 'Database') != 'JWT':
|
||||||
|
return
|
||||||
|
|
||||||
|
verify_endpoint = config.get('JWT_VERIFY_ENDPOINT')
|
||||||
|
query_endpoint = config.get('JWT_QUERY_ENDPOINT', None)
|
||||||
|
getuser_endpoint = config.get('JWT_GETUSER_ENDPOINT', None)
|
||||||
|
|
||||||
|
issuer = config.get('JWT_AUTH_ISSUER')
|
||||||
|
|
||||||
|
if not verify_endpoint:
|
||||||
|
raise ConfigValidationException('Missing JWT Verification endpoint')
|
||||||
|
|
||||||
|
if not issuer:
|
||||||
|
raise ConfigValidationException('Missing JWT Issuer ID')
|
||||||
|
|
||||||
|
# Try to instatiate the JWT authentication mechanism. This will raise an exception if
|
||||||
|
# the key cannot be found.
|
||||||
|
users = ExternalJWTAuthN(verify_endpoint, query_endpoint, getuser_endpoint, issuer,
|
||||||
|
OVERRIDE_CONFIG_DIRECTORY,
|
||||||
|
app.config['HTTPCLIENT'],
|
||||||
|
app.config.get('JWT_AUTH_MAX_FRESH_S', 300),
|
||||||
|
public_key_path=public_key_path,
|
||||||
|
requires_email=config.get('FEATURE_MAILING', True))
|
||||||
|
|
||||||
|
# Verify that the superuser exists. If not, raise an exception.
|
||||||
|
username = user.username
|
||||||
|
(result, err_msg) = users.verify_credentials(username, user_password)
|
||||||
|
if not result:
|
||||||
|
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not ' +
|
||||||
|
'exist in the remote authentication system ' +
|
||||||
|
'OR JWT auth is misconfigured') % (username, err_msg)
|
||||||
|
raise ConfigValidationException(msg)
|
||||||
|
|
||||||
|
# If the query endpoint exists, ensure we can query to find the current user and that we can
|
||||||
|
# look up users directly.
|
||||||
|
if query_endpoint:
|
||||||
|
(results, _, err_msg) = users.query_users(username)
|
||||||
|
if not results:
|
||||||
|
err_msg = err_msg or ('Could not find users matching query: %s' % username)
|
||||||
|
raise ConfigValidationException('Query endpoint is misconfigured or not returning ' +
|
||||||
|
'proper users: %s' % err_msg)
|
||||||
|
|
||||||
|
# Make sure the get user endpoint is also configured.
|
||||||
|
if not getuser_endpoint:
|
||||||
|
raise ConfigValidationException('The lookup user endpoint must be configured if the ' +
|
||||||
|
'query endpoint is set')
|
||||||
|
|
||||||
|
(result, err_msg) = users.get_user(username)
|
||||||
|
if not result:
|
||||||
|
err_msg = err_msg or ('Could not find user %s' % username)
|
||||||
|
raise ConfigValidationException('Lookup endpoint is misconfigured or not returning ' +
|
||||||
|
'properly: %s' % err_msg)
|
Reference in a new issue