diff --git a/data/model/legacy.py b/data/model/legacy.py index 67daaa540..16191b8a6 100644 --- a/data/model/legacy.py +++ b/data/model/legacy.py @@ -566,6 +566,12 @@ def list_federated_logins(user): FederatedLogin.user == user) +def lookup_federated_login(user, service_name): + try: + return list_federated_logins(user).where(LoginService.name == service_name).get() + except FederatedLogin.DoesNotExist: + return None + def create_confirm_email_code(user, new_email=None): if new_email: if not validate_email(new_email): diff --git a/data/users.py b/data/users.py index 22c51273e..1accd1fdc 100644 --- a/data/users.py +++ b/data/users.py @@ -28,6 +28,8 @@ class DatabaseUsers(object): return (result, None) + def confirm_existing_user(self, username, password): + return self.verify_user(username, password) def user_exists(self, username): return model.get_user(username) is not None @@ -86,7 +88,21 @@ class LDAPUsers(object): return None - def verify_user(self, username_or_email, password): + def confirm_existing_user(self, username, password): + """ Verify the username and password by looking up the *LDAP* username and confirming the + password. + """ + db_user = model.get_user(username) + if not db_user: + return (None, 'Invalid user') + + federated_login = model.lookup_federated_login(db_user, 'ldap') + if not federated_login: + return (None, 'Invalid user') + + return self.verify_user(federated_login.service_ident, password, create_new_user=False) + + def verify_user(self, username_or_email, password, create_new_user=True): """ Verify the credentials with LDAP and if they are valid, create or update the user in our database. """ @@ -122,6 +138,9 @@ class LDAPUsers(object): db_user = model.verify_federated_login('ldap', username) if not db_user: + if not create_new_user: + return (None, 'Invalid user') + # We must create the user in our db valid_username = None for valid_username in generate_valid_usernames(username): @@ -233,6 +252,13 @@ class UserAuthentication(object): return data.get('password', encrypted) + def confirm_existing_user(self, username, password): + """ Verifies that the given password matches to the given DB username. Unlike verify_user, this + call first translates the DB user via the FederatedLogin table (where applicable). + """ + return self.state.confirm_existing_user(username, password) + + def verify_user(self, username_or_email, password, basic_auth=False): # First try to decode the password as a signed token. if basic_auth: diff --git a/endpoints/api/user.py b/endpoints/api/user.py index b03c5f87b..161c97c88 100644 --- a/endpoints/api/user.py +++ b/endpoints/api/user.py @@ -370,8 +370,7 @@ class ClientKey(ApiResource): """ Return's the user's private client key. """ username = get_authenticated_user().username password = request.get_json()['password'] - - (result, error_message) = authentication.verify_user(username, password) + (result, error_message) = authentication.confirm_existing_user(username, password) if not result: raise request_error(message=error_message) diff --git a/test/test_ldap.py b/test/test_ldap.py index 323a87a4e..49fd5979c 100644 --- a/test/test_ldap.py +++ b/test/test_ldap.py @@ -38,6 +38,13 @@ class TestLDAP(unittest.TestCase): 'ou': 'employees', 'uid': ['nomail'], 'userPassword': ['somepass'] + }, + 'uid=cool.user,ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': ['cool.user'], + 'userPassword': ['somepass'], + 'mail': ['foo@bar.com'] } }) @@ -59,9 +66,14 @@ class TestLDAP(unittest.TestCase): ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr) + # Verify we can login. (response, _) = ldap.verify_user('someuser', 'somepass') self.assertEquals(response.username, 'someuser') + # Verify we can confirm the user. + (response, _) = ldap.confirm_existing_user('someuser', 'somepass') + self.assertEquals(response.username, 'someuser') + def test_missing_mail(self): base_dn = ['dc=quay', 'dc=io'] admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' @@ -77,6 +89,29 @@ class TestLDAP(unittest.TestCase): self.assertIsNone(response) self.assertEquals('Missing mail field "mail" in user record', err_msg) + def test_confirm_different_username(self): + base_dn = ['dc=quay', 'dc=io'] + admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' + admin_passwd = 'password' + user_rdn = ['ou=employees'] + uid_attr = 'uid' + email_attr = 'mail' + + ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, + uid_attr, email_attr) + + # Verify that the user is logged in and their username was adjusted. + (response, _) = ldap.verify_user('cool.user', 'somepass') + self.assertEquals(response.username, 'cool_user') + + # Verify we can confirm the user's quay username. + (response, _) = ldap.confirm_existing_user('cool_user', 'somepass') + self.assertEquals(response.username, 'cool_user') + + # Verify that we *cannot* confirm the LDAP username. + (response, _) = ldap.confirm_existing_user('cool.user', 'somepass') + self.assertIsNone(response) + if __name__ == '__main__': unittest.main()