Add check_group_lookup_args and service_metadata to auth providers
This commit is contained in:
parent
1cfc4a8341
commit
ecfac81721
5 changed files with 71 additions and 0 deletions
|
@ -186,6 +186,18 @@ class UserAuthentication(object):
|
|||
""" Verifies that the given username and password credentials are valid. """
|
||||
return self.state.verify_credentials(username_or_email, password)
|
||||
|
||||
def check_group_lookup_args(self, group_lookup_args):
|
||||
""" Verifies that the given group lookup args point to a valid group. Returns a tuple consisting
|
||||
of a boolean status and an error message (if any).
|
||||
"""
|
||||
return self.state.check_group_lookup_args(group_lookup_args)
|
||||
|
||||
def service_metadata(self):
|
||||
""" Returns a dictionary of extra metadata to present to *superusers* about this auth engine.
|
||||
For example, LDAP returns the base DN so we can display to the user during sync setup.
|
||||
"""
|
||||
return self.state.service_metadata()
|
||||
|
||||
def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
|
||||
""" Returns a tuple of an iterator over all the members of the group matching the given lookup
|
||||
args dictionary, or the error that occurred if the initial call failed or is unsupported.
|
||||
|
|
|
@ -28,3 +28,14 @@ class DatabaseUsers(object):
|
|||
""" No need to implement, as we already query for users directly in the database. """
|
||||
return (None, '', '')
|
||||
|
||||
def check_group_lookup_args(self, group_lookup_args):
|
||||
""" Never used since all groups, by definition, are in the database. """
|
||||
return (False, 'Not supported')
|
||||
|
||||
def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
|
||||
""" Never used since all groups, by definition, are in the database. """
|
||||
return (None, 'Not supported')
|
||||
|
||||
def service_metadata(self):
|
||||
""" Never used since database has no metadata """
|
||||
return {}
|
||||
|
|
|
@ -259,6 +259,26 @@ class LDAPUsers(FederatedUsers):
|
|||
|
||||
return self._build_user_information(found_response)
|
||||
|
||||
def service_metadata(self):
|
||||
return {
|
||||
'base_dn': self._base_dn,
|
||||
}
|
||||
|
||||
def check_group_lookup_args(self, group_lookup_args, disable_pagination=False):
|
||||
if not group_lookup_args.get('group_dn'):
|
||||
return (False, 'Missing group_dn')
|
||||
|
||||
(it, err) = self.iterate_group_members(group_lookup_args, page_size=1,
|
||||
disable_pagination=disable_pagination)
|
||||
if err is not None:
|
||||
return (False, err)
|
||||
|
||||
results = list(it)
|
||||
if not results:
|
||||
return (False, 'Group does not exist or is empty')
|
||||
|
||||
return (True, None)
|
||||
|
||||
def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
|
||||
try:
|
||||
with self._ldap.get_connection():
|
||||
|
|
|
@ -72,6 +72,18 @@ class FederatedUsers(object):
|
|||
|
||||
return (db_user, None)
|
||||
|
||||
def service_metadata(self):
|
||||
""" Returns a dictionary of extra metadata to present to *superusers* about this auth engine.
|
||||
For example, LDAP returns the base DN so we can display to the user during sync setup.
|
||||
"""
|
||||
return {}
|
||||
|
||||
def check_group_lookup_args(self, group_lookup_args):
|
||||
""" Verifies that the given group lookup args point to a valid group. Returns a tuple consisting
|
||||
of a boolean status and an error message (if any).
|
||||
"""
|
||||
return (False, 'Not supported')
|
||||
|
||||
def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
|
||||
""" Returns an iterator over all the members of the group matching the given lookup args
|
||||
dictionary. The format of the lookup args dictionary is specific to the implementation.
|
||||
|
|
|
@ -327,6 +327,22 @@ class TestLDAP(unittest.TestCase):
|
|||
self.assertEquals('someuser', second.username)
|
||||
self.assertEquals('foo@bar.com', second.email)
|
||||
|
||||
def test_check_group_lookup_args(self):
|
||||
with mock_ldap() as ldap:
|
||||
(result, err) = ldap.check_group_lookup_args({'group_dn': 'cn=invalid'},
|
||||
disable_pagination=True)
|
||||
self.assertFalse(result)
|
||||
self.assertIsNotNone(err)
|
||||
|
||||
(result, err) = ldap.check_group_lookup_args({'group_dn': 'cn=AwesomeFolk'},
|
||||
disable_pagination=True)
|
||||
self.assertTrue(result)
|
||||
self.assertIsNone(err)
|
||||
|
||||
def test_metadata(self):
|
||||
with mock_ldap() as ldap:
|
||||
assert 'base_dn' in ldap.service_metadata()
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
|
|
Reference in a new issue