commit
1117a2cdc6
4 changed files with 70 additions and 3 deletions
|
@ -566,6 +566,12 @@ def list_federated_logins(user):
|
|||
FederatedLogin.user == user)
|
||||
|
||||
|
||||
def lookup_federated_login(user, service_name):
|
||||
try:
|
||||
return list_federated_logins(user).where(LoginService.name == service_name).get()
|
||||
except FederatedLogin.DoesNotExist:
|
||||
return None
|
||||
|
||||
def create_confirm_email_code(user, new_email=None):
|
||||
if new_email:
|
||||
if not validate_email(new_email):
|
||||
|
|
|
@ -28,6 +28,8 @@ class DatabaseUsers(object):
|
|||
|
||||
return (result, None)
|
||||
|
||||
def confirm_existing_user(self, username, password):
|
||||
return self.verify_user(username, password)
|
||||
|
||||
def user_exists(self, username):
|
||||
return model.get_user(username) is not None
|
||||
|
@ -43,6 +45,7 @@ class LDAPConnection(object):
|
|||
def __enter__(self):
|
||||
trace_level = 2 if os.environ.get('LDAP_DEBUG') == '1' else 0
|
||||
self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level)
|
||||
self._conn.set_option(ldap.OPT_REFERRALS, 1)
|
||||
self._conn.simple_bind_s(self._user_dn, self._user_pw)
|
||||
|
||||
return self._conn
|
||||
|
@ -85,7 +88,21 @@ class LDAPUsers(object):
|
|||
|
||||
return None
|
||||
|
||||
def verify_user(self, username_or_email, password):
|
||||
def confirm_existing_user(self, username, password):
|
||||
""" Verify the username and password by looking up the *LDAP* username and confirming the
|
||||
password.
|
||||
"""
|
||||
db_user = model.get_user(username)
|
||||
if not db_user:
|
||||
return (None, 'Invalid user')
|
||||
|
||||
federated_login = model.lookup_federated_login(db_user, 'ldap')
|
||||
if not federated_login:
|
||||
return (None, 'Invalid user')
|
||||
|
||||
return self.verify_user(federated_login.service_ident, password, create_new_user=False)
|
||||
|
||||
def verify_user(self, username_or_email, password, create_new_user=True):
|
||||
""" Verify the credentials with LDAP and if they are valid, create or update the user
|
||||
in our database. """
|
||||
|
||||
|
@ -121,6 +138,9 @@ class LDAPUsers(object):
|
|||
db_user = model.verify_federated_login('ldap', username)
|
||||
|
||||
if not db_user:
|
||||
if not create_new_user:
|
||||
return (None, 'Invalid user')
|
||||
|
||||
# We must create the user in our db
|
||||
valid_username = None
|
||||
for valid_username in generate_valid_usernames(username):
|
||||
|
@ -232,6 +252,13 @@ class UserAuthentication(object):
|
|||
|
||||
return data.get('password', encrypted)
|
||||
|
||||
def confirm_existing_user(self, username, password):
|
||||
""" Verifies that the given password matches to the given DB username. Unlike verify_user, this
|
||||
call first translates the DB user via the FederatedLogin table (where applicable).
|
||||
"""
|
||||
return self.state.confirm_existing_user(username, password)
|
||||
|
||||
|
||||
def verify_user(self, username_or_email, password, basic_auth=False):
|
||||
# First try to decode the password as a signed token.
|
||||
if basic_auth:
|
||||
|
|
|
@ -370,8 +370,7 @@ class ClientKey(ApiResource):
|
|||
""" Return's the user's private client key. """
|
||||
username = get_authenticated_user().username
|
||||
password = request.get_json()['password']
|
||||
|
||||
(result, error_message) = authentication.verify_user(username, password)
|
||||
(result, error_message) = authentication.confirm_existing_user(username, password)
|
||||
if not result:
|
||||
raise request_error(message=error_message)
|
||||
|
||||
|
|
|
@ -38,6 +38,13 @@ class TestLDAP(unittest.TestCase):
|
|||
'ou': 'employees',
|
||||
'uid': ['nomail'],
|
||||
'userPassword': ['somepass']
|
||||
},
|
||||
'uid=cool.user,ou=employees,dc=quay,dc=io': {
|
||||
'dc': ['quay', 'io'],
|
||||
'ou': 'employees',
|
||||
'uid': ['cool.user'],
|
||||
'userPassword': ['somepass'],
|
||||
'mail': ['foo@bar.com']
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -59,9 +66,14 @@ class TestLDAP(unittest.TestCase):
|
|||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
# Verify we can login.
|
||||
(response, _) = ldap.verify_user('someuser', 'somepass')
|
||||
self.assertEquals(response.username, 'someuser')
|
||||
|
||||
# Verify we can confirm the user.
|
||||
(response, _) = ldap.confirm_existing_user('someuser', 'somepass')
|
||||
self.assertEquals(response.username, 'someuser')
|
||||
|
||||
def test_missing_mail(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
|
@ -77,6 +89,29 @@ class TestLDAP(unittest.TestCase):
|
|||
self.assertIsNone(response)
|
||||
self.assertEquals('Missing mail field "mail" in user record', err_msg)
|
||||
|
||||
def test_confirm_different_username(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
admin_passwd = 'password'
|
||||
user_rdn = ['ou=employees']
|
||||
uid_attr = 'uid'
|
||||
email_attr = 'mail'
|
||||
|
||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
# Verify that the user is logged in and their username was adjusted.
|
||||
(response, _) = ldap.verify_user('cool.user', 'somepass')
|
||||
self.assertEquals(response.username, 'cool_user')
|
||||
|
||||
# Verify we can confirm the user's quay username.
|
||||
(response, _) = ldap.confirm_existing_user('cool_user', 'somepass')
|
||||
self.assertEquals(response.username, 'cool_user')
|
||||
|
||||
# Verify that we *cannot* confirm the LDAP username.
|
||||
(response, _) = ldap.confirm_existing_user('cool.user', 'somepass')
|
||||
self.assertIsNone(response)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Reference in a new issue