Fix encrypted password generator to use the LDAP username, not the Quay username.
Currently, we use the Quay username via `verify_user` when we go to create the encrypted password. This is only correct if Quay has not generated its own different username for the LDAP user, and fails if it has. We therefore add a new method `confirm_existing_user`, which looks up the federated login for the LDAP user and then runs the auth flow using that username.
This commit is contained in:
parent
d5e70c6e2a
commit
b0d763b5ff
4 changed files with 69 additions and 3 deletions
|
@ -566,6 +566,12 @@ def list_federated_logins(user):
|
|||
FederatedLogin.user == user)
|
||||
|
||||
|
||||
def lookup_federated_login(user, service_name):
|
||||
try:
|
||||
return list_federated_logins(user).where(LoginService.name == service_name).get()
|
||||
except FederatedLogin.DoesNotExist:
|
||||
return None
|
||||
|
||||
def create_confirm_email_code(user, new_email=None):
|
||||
if new_email:
|
||||
if not validate_email(new_email):
|
||||
|
|
|
@ -28,6 +28,8 @@ class DatabaseUsers(object):
|
|||
|
||||
return (result, None)
|
||||
|
||||
def confirm_existing_user(self, username, password):
|
||||
return self.verify_user(username, password)
|
||||
|
||||
def user_exists(self, username):
|
||||
return model.get_user(username) is not None
|
||||
|
@ -86,7 +88,21 @@ class LDAPUsers(object):
|
|||
|
||||
return None
|
||||
|
||||
def verify_user(self, username_or_email, password):
|
||||
def confirm_existing_user(self, username, password):
|
||||
""" Verify the username and password by looking up the *LDAP* username and confirming the
|
||||
password.
|
||||
"""
|
||||
db_user = model.get_user(username)
|
||||
if not db_user:
|
||||
return (None, 'Invalid user')
|
||||
|
||||
federated_login = model.lookup_federated_login(db_user, 'ldap')
|
||||
if not federated_login:
|
||||
return (None, 'Invalid user')
|
||||
|
||||
return self.verify_user(federated_login.service_ident, password, create_new_user=False)
|
||||
|
||||
def verify_user(self, username_or_email, password, create_new_user=True):
|
||||
""" Verify the credentials with LDAP and if they are valid, create or update the user
|
||||
in our database. """
|
||||
|
||||
|
@ -122,6 +138,9 @@ class LDAPUsers(object):
|
|||
db_user = model.verify_federated_login('ldap', username)
|
||||
|
||||
if not db_user:
|
||||
if not create_new_user:
|
||||
return (None, 'Invalid user')
|
||||
|
||||
# We must create the user in our db
|
||||
valid_username = None
|
||||
for valid_username in generate_valid_usernames(username):
|
||||
|
@ -233,6 +252,13 @@ class UserAuthentication(object):
|
|||
|
||||
return data.get('password', encrypted)
|
||||
|
||||
def confirm_existing_user(self, username, password):
|
||||
""" Verifies that the given password matches to the given DB username. Unlike verify_user, this
|
||||
call first translates the DB user via the FederatedLogin table (where applicable).
|
||||
"""
|
||||
return self.state.confirm_existing_user(username, password)
|
||||
|
||||
|
||||
def verify_user(self, username_or_email, password, basic_auth=False):
|
||||
# First try to decode the password as a signed token.
|
||||
if basic_auth:
|
||||
|
|
|
@ -370,8 +370,7 @@ class ClientKey(ApiResource):
|
|||
""" Return's the user's private client key. """
|
||||
username = get_authenticated_user().username
|
||||
password = request.get_json()['password']
|
||||
|
||||
(result, error_message) = authentication.verify_user(username, password)
|
||||
(result, error_message) = authentication.confirm_existing_user(username, password)
|
||||
if not result:
|
||||
raise request_error(message=error_message)
|
||||
|
||||
|
|
|
@ -38,6 +38,13 @@ class TestLDAP(unittest.TestCase):
|
|||
'ou': 'employees',
|
||||
'uid': ['nomail'],
|
||||
'userPassword': ['somepass']
|
||||
},
|
||||
'uid=cool.user,ou=employees,dc=quay,dc=io': {
|
||||
'dc': ['quay', 'io'],
|
||||
'ou': 'employees',
|
||||
'uid': ['cool.user'],
|
||||
'userPassword': ['somepass'],
|
||||
'mail': ['foo@bar.com']
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -59,9 +66,14 @@ class TestLDAP(unittest.TestCase):
|
|||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
# Verify we can login.
|
||||
(response, _) = ldap.verify_user('someuser', 'somepass')
|
||||
self.assertEquals(response.username, 'someuser')
|
||||
|
||||
# Verify we can confirm the user.
|
||||
(response, _) = ldap.confirm_existing_user('someuser', 'somepass')
|
||||
self.assertEquals(response.username, 'someuser')
|
||||
|
||||
def test_missing_mail(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
|
@ -77,6 +89,29 @@ class TestLDAP(unittest.TestCase):
|
|||
self.assertIsNone(response)
|
||||
self.assertEquals('Missing mail field "mail" in user record', err_msg)
|
||||
|
||||
def test_confirm_different_username(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
admin_passwd = 'password'
|
||||
user_rdn = ['ou=employees']
|
||||
uid_attr = 'uid'
|
||||
email_attr = 'mail'
|
||||
|
||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
# Verify that the user is logged in and their username was adjusted.
|
||||
(response, _) = ldap.verify_user('cool.user', 'somepass')
|
||||
self.assertEquals(response.username, 'cool_user')
|
||||
|
||||
# Verify we can confirm the user's quay username.
|
||||
(response, _) = ldap.confirm_existing_user('cool_user', 'somepass')
|
||||
self.assertEquals(response.username, 'cool_user')
|
||||
|
||||
# Verify that we *cannot* confirm the LDAP username.
|
||||
(response, _) = ldap.confirm_existing_user('cool.user', 'somepass')
|
||||
self.assertIsNone(response)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Reference in a new issue