From f6fea27c12d738a80de6822c4b5fb3863944a3e8 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 20 May 2015 16:37:09 -0400 Subject: [PATCH] Fix encrypted password generator to use the LDAP username, not the Quay username. Currently, we use the Quay username via `verify_user` when we go to create the encrypted password. This is only correct if Quay has not generated its own different username for the LDAP user, and fails if it has. We therefore add a new method `confirm_existing_user`, which looks up the federated login for the LDAP user and then runs the auth flow using that username. --- data/model/legacy.py | 6 ++++++ data/users.py | 28 +++++++++++++++++++++++++++- endpoints/api/user.py | 3 +-- test/test_ldap.py | 35 +++++++++++++++++++++++++++++++++++ 4 files changed, 69 insertions(+), 3 deletions(-) diff --git a/data/model/legacy.py b/data/model/legacy.py index 67daaa540..16191b8a6 100644 --- a/data/model/legacy.py +++ b/data/model/legacy.py @@ -566,6 +566,12 @@ def list_federated_logins(user): FederatedLogin.user == user) +def lookup_federated_login(user, service_name): + try: + return list_federated_logins(user).where(LoginService.name == service_name).get() + except FederatedLogin.DoesNotExist: + return None + def create_confirm_email_code(user, new_email=None): if new_email: if not validate_email(new_email): diff --git a/data/users.py b/data/users.py index 22c51273e..1accd1fdc 100644 --- a/data/users.py +++ b/data/users.py @@ -28,6 +28,8 @@ class DatabaseUsers(object): return (result, None) + def confirm_existing_user(self, username, password): + return self.verify_user(username, password) def user_exists(self, username): return model.get_user(username) is not None @@ -86,7 +88,21 @@ class LDAPUsers(object): return None - def verify_user(self, username_or_email, password): + def confirm_existing_user(self, username, password): + """ Verify the username and password by looking up the *LDAP* username and confirming the + password. + """ + db_user = model.get_user(username) + if not db_user: + return (None, 'Invalid user') + + federated_login = model.lookup_federated_login(db_user, 'ldap') + if not federated_login: + return (None, 'Invalid user') + + return self.verify_user(federated_login.service_ident, password, create_new_user=False) + + def verify_user(self, username_or_email, password, create_new_user=True): """ Verify the credentials with LDAP and if they are valid, create or update the user in our database. """ @@ -122,6 +138,9 @@ class LDAPUsers(object): db_user = model.verify_federated_login('ldap', username) if not db_user: + if not create_new_user: + return (None, 'Invalid user') + # We must create the user in our db valid_username = None for valid_username in generate_valid_usernames(username): @@ -233,6 +252,13 @@ class UserAuthentication(object): return data.get('password', encrypted) + def confirm_existing_user(self, username, password): + """ Verifies that the given password matches to the given DB username. Unlike verify_user, this + call first translates the DB user via the FederatedLogin table (where applicable). + """ + return self.state.confirm_existing_user(username, password) + + def verify_user(self, username_or_email, password, basic_auth=False): # First try to decode the password as a signed token. if basic_auth: diff --git a/endpoints/api/user.py b/endpoints/api/user.py index b03c5f87b..161c97c88 100644 --- a/endpoints/api/user.py +++ b/endpoints/api/user.py @@ -370,8 +370,7 @@ class ClientKey(ApiResource): """ Return's the user's private client key. """ username = get_authenticated_user().username password = request.get_json()['password'] - - (result, error_message) = authentication.verify_user(username, password) + (result, error_message) = authentication.confirm_existing_user(username, password) if not result: raise request_error(message=error_message) diff --git a/test/test_ldap.py b/test/test_ldap.py index 323a87a4e..49fd5979c 100644 --- a/test/test_ldap.py +++ b/test/test_ldap.py @@ -38,6 +38,13 @@ class TestLDAP(unittest.TestCase): 'ou': 'employees', 'uid': ['nomail'], 'userPassword': ['somepass'] + }, + 'uid=cool.user,ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': ['cool.user'], + 'userPassword': ['somepass'], + 'mail': ['foo@bar.com'] } }) @@ -59,9 +66,14 @@ class TestLDAP(unittest.TestCase): ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr) + # Verify we can login. (response, _) = ldap.verify_user('someuser', 'somepass') self.assertEquals(response.username, 'someuser') + # Verify we can confirm the user. + (response, _) = ldap.confirm_existing_user('someuser', 'somepass') + self.assertEquals(response.username, 'someuser') + def test_missing_mail(self): base_dn = ['dc=quay', 'dc=io'] admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' @@ -77,6 +89,29 @@ class TestLDAP(unittest.TestCase): self.assertIsNone(response) self.assertEquals('Missing mail field "mail" in user record', err_msg) + def test_confirm_different_username(self): + base_dn = ['dc=quay', 'dc=io'] + admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' + admin_passwd = 'password' + user_rdn = ['ou=employees'] + uid_attr = 'uid' + email_attr = 'mail' + + ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, + uid_attr, email_attr) + + # Verify that the user is logged in and their username was adjusted. + (response, _) = ldap.verify_user('cool.user', 'somepass') + self.assertEquals(response.username, 'cool_user') + + # Verify we can confirm the user's quay username. + (response, _) = ldap.confirm_existing_user('cool_user', 'somepass') + self.assertEquals(response.username, 'cool_user') + + # Verify that we *cannot* confirm the LDAP username. + (response, _) = ldap.confirm_existing_user('cool.user', 'somepass') + self.assertIsNone(response) + if __name__ == '__main__': unittest.main()