SECURITY FIX FOR LDAP
It appears the recent migration of the LDAP code and add of a check for the admin username/password being invalid *broke the LDAP password check*, allowing any password to succeed for login. This fixes the problem, add unit tests to verify the fix and add some tests to our other external auth test suite. A release will be needed immediately along with an announcement
This commit is contained in:
parent
c3518c2c99
commit
0854d20cbd
3 changed files with 34 additions and 11 deletions
|
@ -9,6 +9,16 @@ from data.users.federated import FederatedUsers, VerifiedCredentials
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LDAPConnectionBuilder(object):
|
||||
def __init__(self, ldap_uri, user_dn, user_pw):
|
||||
self._ldap_uri = ldap_uri
|
||||
self._user_dn = user_dn
|
||||
self._user_pw = user_pw
|
||||
|
||||
def get_connection(self):
|
||||
return LDAPConnection(self._ldap_uri, self._user_dn, self._user_pw)
|
||||
|
||||
|
||||
class LDAPConnection(object):
|
||||
def __init__(self, ldap_uri, user_dn, user_pw):
|
||||
self._ldap_uri = ldap_uri
|
||||
|
@ -20,13 +30,7 @@ class LDAPConnection(object):
|
|||
trace_level = 2 if os.environ.get('USERS_DEBUG') == '1' else 0
|
||||
self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level)
|
||||
self._conn.set_option(ldap.OPT_REFERRALS, 1)
|
||||
|
||||
try:
|
||||
self._conn.simple_bind_s(self._user_dn, self._user_pw)
|
||||
except ldap.INVALID_CREDENTIALS:
|
||||
logger.exception('LDAP admin dn or password are invalid')
|
||||
return None
|
||||
|
||||
self._conn.simple_bind_s(self._user_dn, self._user_pw)
|
||||
return self._conn
|
||||
|
||||
def __exit__(self, exc_type, value, tb):
|
||||
|
@ -38,7 +42,7 @@ class LDAPUsers(FederatedUsers):
|
|||
|
||||
def __init__(self, ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr):
|
||||
super(LDAPUsers, self).__init__('ldap')
|
||||
self._ldap_conn = LDAPConnection(ldap_uri, admin_dn, admin_passwd)
|
||||
self._ldap = LDAPConnectionBuilder(ldap_uri, admin_dn, admin_passwd)
|
||||
self._ldap_uri = ldap_uri
|
||||
self._base_dn = base_dn
|
||||
self._user_rdn = user_rdn
|
||||
|
@ -65,10 +69,15 @@ class LDAPUsers(FederatedUsers):
|
|||
return referral_dn
|
||||
|
||||
def _ldap_user_search(self, username_or_email):
|
||||
with self._ldap_conn as conn:
|
||||
if conn is None:
|
||||
return (None, 'LDAP Admin dn or password is invalid')
|
||||
# Verify the admin connection works first. We do this here to avoid wrapping
|
||||
# the entire block in the INVALID CREDENTIALS check.
|
||||
try:
|
||||
with self._ldap.get_connection():
|
||||
pass
|
||||
except ldap.INVALID_CREDENTIALS:
|
||||
return (None, 'LDAP Admin dn or password is invalid')
|
||||
|
||||
with self._ldap.get_connection() as conn:
|
||||
logger.debug('Incoming username or email param: %s', username_or_email.__repr__())
|
||||
user_search_dn = ','.join(self._user_rdn + self._base_dn)
|
||||
query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr,
|
||||
|
|
|
@ -114,6 +114,9 @@ class JWTAuthTestCase(LiveServerTestCase):
|
|||
self.assertIsNotNone(result)
|
||||
|
||||
# Confirm a user with the same internal and external username.
|
||||
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'invalidpassword')
|
||||
self.assertIsNone(result)
|
||||
|
||||
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password')
|
||||
self.assertIsNotNone(result)
|
||||
self.assertEquals('cooluser', result.username)
|
||||
|
|
|
@ -112,6 +112,17 @@ class TestLDAP(unittest.TestCase):
|
|||
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
|
||||
self.assertEquals(response.username, 'someuser')
|
||||
|
||||
def test_invalid_password(self):
|
||||
# Verify we cannot login with an invalid password.
|
||||
(response, err_msg) = self.ldap.verify_and_link_user('someuser', 'invalidpass')
|
||||
self.assertIsNone(response)
|
||||
self.assertEquals(err_msg, 'Invalid password')
|
||||
|
||||
# Verify we cannot confirm the user.
|
||||
(response, err_msg) = self.ldap.confirm_existing_user('someuser', 'invalidpass')
|
||||
self.assertIsNone(response)
|
||||
self.assertEquals(err_msg, 'Invalid user')
|
||||
|
||||
def test_missing_mail(self):
|
||||
(response, err_msg) = self.ldap.verify_and_link_user('nomail', 'somepass')
|
||||
self.assertIsNone(response)
|
||||
|
|
Reference in a new issue