Add ping method to auth engines to determine if they are reachable

This commit is contained in:
Joseph Schorr 2017-05-10 20:55:10 -07:00
parent a6ea16abc5
commit 0dfb6806e3
6 changed files with 67 additions and 0 deletions

View file

@ -150,6 +150,10 @@ class UserAuthentication(object):
return data.get('password', encrypted) return data.get('password', encrypted)
def ping(self):
""" Returns whether the authentication engine is reachable and working. """
return self.state.ping()
@property @property
def federated_service(self): def federated_service(self):
""" Returns the name of the federated service for the auth system. If none, should return None. """ Returns the name of the federated service for the auth system. If none, should return None.

View file

@ -5,6 +5,10 @@ class DatabaseUsers(object):
def federated_service(self): def federated_service(self):
return None return None
def ping(self):
""" Always assumed to be working. If the DB is broken, other checks will handle it. """
return (True, None)
def verify_credentials(self, username_or_email, password): def verify_credentials(self, username_or_email, password):
""" Simply delegate to the model implementation. """ """ Simply delegate to the model implementation. """
result = model.user.verify_user(username_or_email, password) result = model.user.verify_user(username_or_email, password)

View file

@ -37,6 +37,12 @@ class ExternalJWTAuthN(FederatedUsers):
with open(public_key_path) as public_key_file: with open(public_key_path) as public_key_file:
self.public_key = public_key_file.read() self.public_key = public_key_file.read()
def ping(self):
result = self.client.get(self.getuser_url, timeout=2)
if result.status_code // 100 != 4:
return (False, result.text or 'Could not reach JWT authn endpoint')
return (True, None)
def get_user(self, username_or_email): def get_user(self, username_or_email):
if self.getuser_url is None: if self.getuser_url is None:

View file

@ -193,6 +193,18 @@ class LDAPUsers(FederatedUsers):
email = response.get(self._email_attr, [None])[0] email = response.get(self._email_attr, [None])[0]
return (UserInformation(username=username, email=email, id=username), None) return (UserInformation(username=username, email=email, id=username), None)
def ping(self):
try:
with self._ldap.get_connection():
pass
except ldap.INVALID_CREDENTIALS:
return (False, 'LDAP Admin dn or password is invalid')
except ldap.LDAPError as lde:
logger.exception('Exception when trying to health check LDAP')
return (False, lde.message)
return (True, None)
def get_user(self, username_or_email): def get_user(self, username_or_email):
""" Looks up a username or email in LDAP. """ """ Looks up a username or email in LDAP. """
logger.debug('Looking up LDAP username or email %s', username_or_email) logger.debug('Looking up LDAP username or email %s', username_or_email)

View file

@ -36,6 +36,21 @@ class KeystoneV2Users(FederatedUsers):
self.debug = os.environ.get('USERS_DEBUG') == '1' self.debug = os.environ.get('USERS_DEBUG') == '1'
self.requires_email = requires_email self.requires_email = requires_email
def ping(self):
try:
keystone_client = kclient.Client(username=self.admin_username, password=self.admin_password,
tenant_name=self.admin_tenant, auth_url=self.auth_url,
timeout=self.timeout, debug=self.debug)
keystone_client.user_id # Make sure we loaded a valid user.
except KeystoneUnauthorized as kut:
logger.exception('Keystone unauthorized admin')
return (False, 'Keystone admin credentials are invalid: %s' % kut.message)
except Exception:
logger.exception('Keystone unauthorized admin')
return (False, 'Keystone ping check failed: %s' % kut.message)
return (True, None)
def verify_credentials(self, username_or_email, password): def verify_credentials(self, username_or_email, password):
try: try:
keystone_client = kclient.Client(username=username_or_email, password=password, keystone_client = kclient.Client(username=username_or_email, password=password,
@ -89,6 +104,18 @@ class KeystoneV3Users(FederatedUsers):
tenant_name=self.admin_tenant, auth_url=self.auth_url, tenant_name=self.admin_tenant, auth_url=self.auth_url,
timeout=self.timeout, debug=self.debug) timeout=self.timeout, debug=self.debug)
def ping(self):
try:
self._get_admin_client().user_id # Make sure we loaded a valid user
except KeystoneUnauthorized as kut:
logger.exception('Keystone unauthorized admin')
return (False, 'Keystone admin credentials are invalid: %s' % kut.message)
except Exception:
logger.exception('Keystone unauthorized admin')
return (False, 'Keystone ping check failed: %s' % kut.message)
return (True, None)
def verify_credentials(self, username_or_email, password): def verify_credentials(self, username_or_email, password):
try: try:
keystone_client = kv3client.Client(username=username_or_email, password=password, keystone_client = kv3client.Client(username=username_or_email, password=password,

View file

@ -6,6 +6,7 @@ from data.database import model
from data.users.federated import DISABLED_MESSAGE from data.users.federated import DISABLED_MESSAGE
from test.test_ldap import mock_ldap from test.test_ldap import mock_ldap
from test.test_keystone_auth import fake_keystone from test.test_keystone_auth import fake_keystone
from test.test_external_jwt_authn import fake_jwt
from test.fixtures import * from test.fixtures import *
@ -34,3 +35,16 @@ def test_auth_createuser(auth_system_builder, user1, user2, config, app):
new_user, err = auth.verify_and_link_user(*user2) new_user, err = auth.verify_and_link_user(*user2)
assert new_user is None assert new_user is None
assert err == DISABLED_MESSAGE assert err == DISABLED_MESSAGE
@pytest.mark.parametrize('auth_system_builder,auth_kwargs', [
(mock_ldap, {}),
(fake_keystone, {'version': 3}),
(fake_keystone, {'version': 2}),
(fake_jwt, {}),
])
def test_ping(auth_system_builder, auth_kwargs, app):
with auth_system_builder(**auth_kwargs) as auth:
status, err = auth.ping()
assert status
assert err is None