import unittest

from app import app
from initdb import setup_database_for_testing, finished_database_for_testing
from data.users import LDAPUsers
from data import model
from mockldap import MockLdap
from mock import patch

class TestLDAP(unittest.TestCase):
  def setUp(self):
    setup_database_for_testing(self)
    self.app = app.test_client()
    self.ctx = app.test_request_context()
    self.ctx.__enter__()

    self.mockldap = MockLdap({
      'dc=quay,dc=io': {'dc': ['quay', 'io']},
      'ou=employees,dc=quay,dc=io': {
        'dc': ['quay', 'io'],
        'ou': 'employees'
      },
      'ou=otheremployees,dc=quay,dc=io': {
        'dc': ['quay', 'io'],
        'ou': 'otheremployees'
      },
      'uid=testy,ou=employees,dc=quay,dc=io': {
        'dc': ['quay', 'io'],
        'ou': 'employees',
        'uid': 'testy',
        'userPassword': ['password']
      },
      'uid=someuser,ou=employees,dc=quay,dc=io': {
        'dc': ['quay', 'io'],
        'ou': 'employees',
        'uid': ['someuser'],
        'userPassword': ['somepass'],
        'mail': ['foo@bar.com']
      },
      'uid=nomail,ou=employees,dc=quay,dc=io': {
        'dc': ['quay', 'io'],
        'ou': 'employees',
        'uid': ['nomail'],
        'userPassword': ['somepass']
      },
      'uid=cool.user,ou=employees,dc=quay,dc=io': {
        'dc': ['quay', 'io'],
        'ou': 'employees',
        'uid': ['cool.user', 'referred'],
        'userPassword': ['somepass'],
        'mail': ['foo@bar.com']
      },
      'uid=referred,ou=employees,dc=quay,dc=io': {
        'uid': ['referred'],
        '_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
      },
      'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
        'uid': ['invalidreferred'],
        '_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
      },
      'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
        'uid': ['multientry'],
        'mail': ['foo@bar.com'],
        'userPassword': ['somepass'],
      },
      'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
        'uid': ['multientry'],
        'another': ['key']
      },
       'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': {
        'dc': ['quay', 'io'],
        'ou': 'otheremployees',
        'uid': ['secondaryuser'],
        'userPassword': ['somepass'],
        'mail': ['foosecondary@bar.com']
      },
    })

    self.mockldap.start()
    self.ldap = self._create_ldap(requires_email=True)

  def tearDown(self):
    self.mockldap.stop()
    finished_database_for_testing(self)
    self.ctx.__exit__(True, None, None)

  def _create_ldap(self, requires_email=True):
    base_dn = ['dc=quay', 'dc=io']
    admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
    admin_passwd = 'password'
    user_rdn = ['ou=employees']
    uid_attr = 'uid'
    email_attr = 'mail'
    secondary_user_rdns = ['ou=otheremployees']

    ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
                     uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
                     requires_email=requires_email)
    return ldap

  def test_invalid_admin_password(self):
    base_dn = ['dc=quay', 'dc=io']
    admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
    admin_passwd = 'INVALIDPASSWORD'
    user_rdn = ['ou=employees']
    uid_attr = 'uid'
    email_attr = 'mail'

    ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
                     uid_attr, email_attr)

    self.ldap = ldap

    # Try to login.
    (response, err_msg) = self.ldap.verify_and_link_user('someuser', 'somepass')
    self.assertIsNone(response)
    self.assertEquals('LDAP Admin dn or password is invalid', err_msg)

  def test_login(self):
    # Verify we can login.
    (response, _) = self.ldap.verify_and_link_user('someuser', 'somepass')
    self.assertEquals(response.username, 'someuser')
    self.assertTrue(model.user.has_user_prompt(response, 'confirm_username'))

    # Verify we can confirm the user.
    (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
    self.assertEquals(response.username, 'someuser')

  def test_login_secondary(self):
    # Verify we can login.
    (response, _) = self.ldap.verify_and_link_user('secondaryuser', 'somepass')
    self.assertEquals(response.username, 'secondaryuser')

    # Verify we can confirm the user.
    (response, _) = self.ldap.confirm_existing_user('secondaryuser', 'somepass')
    self.assertEquals(response.username, 'secondaryuser')

  def test_invalid_password(self):
    # Verify we cannot login with an invalid password.
    (response, err_msg) = self.ldap.verify_and_link_user('someuser', 'invalidpass')
    self.assertIsNone(response)
    self.assertEquals(err_msg, 'Invalid password')

    # Verify we cannot confirm the user.
    (response, err_msg) = self.ldap.confirm_existing_user('someuser', 'invalidpass')
    self.assertIsNone(response)
    self.assertEquals(err_msg, 'Invalid user')

  def test_missing_mail(self):
    (response, err_msg) = self.ldap.get_user('nomail')
    self.assertIsNone(response)
    self.assertEquals('Missing mail field "mail" in user record', err_msg)

  def test_missing_mail_allowed(self):
    ldap = self._create_ldap(requires_email=False)
    (response, _) = ldap.get_user('nomail')
    self.assertEquals(response.username, 'nomail')

  def test_confirm_different_username(self):
    # Verify that the user is logged in and their username was adjusted.
    (response, _) = self.ldap.verify_and_link_user('cool.user', 'somepass')
    self.assertEquals(response.username, 'cool_user')

    # Verify we can confirm the user's quay username.
    (response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
    self.assertEquals(response.username, 'cool_user')

    # Verify that we *cannot* confirm the LDAP username.
    (response, _) = self.ldap.confirm_existing_user('cool.user', 'somepass')
    self.assertIsNone(response)

  def test_referral(self):
    (response, _) = self.ldap.verify_and_link_user('referred', 'somepass')
    self.assertEquals(response.username, 'cool_user')

    # Verify we can confirm the user's quay username.
    (response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
    self.assertEquals(response.username, 'cool_user')

  def test_invalid_referral(self):
    (response, _) = self.ldap.verify_and_link_user('invalidreferred', 'somepass')
    self.assertIsNone(response)

  def test_multientry(self):
    (response, _) = self.ldap.verify_and_link_user('multientry', 'somepass')
    self.assertEquals(response.username, 'multientry')

  def test_login_empty_userdn(self):
    base_dn = ['ou=employees', 'dc=quay', 'dc=io']
    admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
    admin_passwd = 'password'
    user_rdn = []
    uid_attr = 'uid'
    email_attr = 'mail'
    secondary_user_rdns = ['ou=otheremployees']

    ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
                     uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns)

    self.ldap = ldap

    # Verify we can login.
    (response, _) = self.ldap.verify_and_link_user('someuser', 'somepass')
    self.assertEquals(response.username, 'someuser')

    # Verify we can confirm the user.
    (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
    self.assertEquals(response.username, 'someuser')

  def test_link_user(self):
    # Link someuser.
    user, error_message = self.ldap.link_user('someuser')
    self.assertIsNone(error_message)
    self.assertIsNotNone(user)
    self.assertEquals('someuser', user.username)

    # Link again. Should return the same user record.
    user_again, _ = self.ldap.link_user('someuser')
    self.assertEquals(user_again.id, user.id)

    # Confirm someuser.
    result, _ = self.ldap.confirm_existing_user('someuser', 'somepass')
    self.assertIsNotNone(result)
    self.assertEquals('someuser', result.username)
    self.assertTrue(model.user.has_user_prompt(user, 'confirm_username'))

  def test_query(self):
    def initializer(uri, trace_level=0):
      obj = self.mockldap[uri]

      # Seed to "support" wildcard queries, which MockLDAP does not support natively.
      obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
        ('uid=cool.user,ou=employees,dc=quay,dc=io', {
          'dc': ['quay', 'io'],
          'ou': 'employees',
          'uid': ['cool.user', 'referred'],
          'userPassword': ['somepass'],
          'mail': ['foo@bar.com']
        })
      ])

      obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([])

      obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
      obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
                        '(|(uid=unknown*)(mail=unknown*))')([])
      return obj

    with patch('ldap.initialize', new=initializer):
      # Lookup cool.
      (response, error_message) = self.ldap.query_users('cool')
      self.assertIsNone(error_message)
      self.assertEquals(1, len(response))

      user_info = response[0]
      self.assertEquals("cool.user", user_info.username)
      self.assertEquals("foo@bar.com", user_info.email)

      # Lookup unknown.
      (response, error_message) = self.ldap.query_users('unknown')
      self.assertIsNone(error_message)
      self.assertEquals(0, len(response))


if __name__ == '__main__':
  unittest.main()