This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_external_jwt_authn.py

275 lines
8.7 KiB
Python

import base64
import unittest
from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
import jwt
import requests
from Crypto.PublicKey import RSA
from flask import Flask, jsonify, request, make_response
from flask_testing import LiveServerTestCase
from app import app
from data.users import ExternalJWTAuthN
from initdb import setup_database_for_testing, finished_database_for_testing
# Base port for the fake JWT live server. `JWTAuthTestCase.create_app` bumps
# this module-level counter by one on every call and assigns the new value to
# the app's LIVESERVER_PORT, so the first server listens on 5002.
_PORT_NUMBER = 5001
class JWTAuthTestCase(LiveServerTestCase):
    """Exercise ``ExternalJWTAuthN`` against a live, in-process fake JWT server.

    ``create_app`` builds a small Flask application exposing ``/user/query``,
    ``/user/get`` and ``/user/verify``. Each endpoint answers with an RS256
    token signed by a throw-away RSA key generated once per class; the
    matching public key is written to a temporary file whose path is handed
    to ``ExternalJWTAuthN`` in ``setUp``.
    """

    maxDiff = None

    @classmethod
    def setUpClass(cls):
        """Generate the RSA key pair shared by all tests in this class."""
        super(JWTAuthTestCase, cls).setUpClass()

        public_key = NamedTemporaryFile(delete=True)
        key = RSA.generate(1024)
        private_key_data = key.exportKey('PEM')

        pubkey = key.publickey()
        public_key.write(pubkey.exportKey('OpenSSH'))
        # seek() also flushes the buffered write, so the key can be read back
        # through the file's path while the handle stays open.
        public_key.seek(0)

        JWTAuthTestCase.public_key = public_key
        JWTAuthTestCase.private_key_data = private_key_data

    @classmethod
    def tearDownClass(cls):
        """Close (and thereby delete) the temporary public key file."""
        JWTAuthTestCase.public_key.close()
        super(JWTAuthTestCase, cls).tearDownClass()

    def create_app(self):
        """Build the fake external JWT authentication server.

        Every call bumps the module-level ``_PORT_NUMBER`` and configures the
        returned app with the new value as its ``LIVESERVER_PORT``.

        Returns:
            The configured Flask application.
        """
        global _PORT_NUMBER
        _PORT_NUMBER = _PORT_NUMBER + 1

        users = [
            {'name': 'cooluser', 'email': 'user@domain.com', 'password': 'password'},
            {'name': 'some.neat.user', 'email': 'neat@domain.com', 'password': 'foobar'}
        ]

        jwt_app = Flask('testjwt')
        private_key = JWTAuthTestCase.private_key_data
        jwt_app.config['LIVESERVER_PORT'] = _PORT_NUMBER

        def _get_basic_auth():
            """Return ``[username, password]`` from the Basic auth header."""
            encoded = request.headers['Authorization'][len('Basic '):]
            # b64decode() yields bytes on Python 3; decode before splitting on
            # a text separator so the helper works on both Python 2 and 3.
            return base64.b64decode(encoded).decode('utf-8').split(':', 1)

        def _find_user(identifier):
            """Return the first user whose name or email equals `identifier`."""
            for user in users:
                if identifier in (user['name'], user['email']):
                    return user
            return None

        def _token_response(audience, **claims):
            """Return a JSON response wrapping a signed, 60-second token."""
            now = datetime.utcnow()
            token_data = {
                'iss': 'authy',
                'aud': audience,
                'nbf': now,
                'iat': now,
                'exp': now + timedelta(seconds=60),
            }
            token_data.update(claims)

            encoded = jwt.encode(token_data, private_key, 'RS256')
            return jsonify({
                'token': encoded
            })

        @jwt_app.route('/user/query', methods=['GET'])
        def query_users():
            """Return every user whose name starts with the `query` argument."""
            query = request.args.get('query')
            results = [
                {'username': user['name'], 'email': user['email']}
                for user in users
                if user['name'].startswith(query)
            ]
            return _token_response('quay.io/jwtauthn/query', results=results)

        @jwt_app.route('/user/get', methods=['GET'])
        def get_user():
            """Look up a single user by name or email."""
            username = request.args.get('username')
            if username == 'disabled':
                return make_response('User is currently disabled', 401)

            user = _find_user(username)
            if user is None:
                return make_response('Invalid username or password', 404)

            return _token_response('quay.io/jwtauthn/getuser',
                                   sub=user['name'], email=user['email'])

        @jwt_app.route('/user/verify', methods=['GET'])
        def verify_user():
            """Check Basic auth credentials against the in-memory user list."""
            username, password = _get_basic_auth()
            if username == 'disabled':
                return make_response('User is currently disabled', 401)

            user = _find_user(username)
            if user is None:
                return make_response('Invalid username or password', 404)

            if password != user['password']:
                return make_response('', 404)

            return _token_response('quay.io/jwtauthn',
                                   sub=user['name'], email=user['email'])

        jwt_app.config['TESTING'] = True
        return jwt_app

    def setUp(self):
        """Prepare the database, a request context and the auth handler."""
        setup_database_for_testing(self)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

        self.session = requests.Session()

        server_url = self.get_server_url()
        verify_url = server_url + '/user/verify'
        query_url = server_url + '/user/query'
        getuser_url = server_url + '/user/get'

        # NOTE(review): 'authy' matches the 'iss' claim issued by the fake
        # server; the meaning of the remaining positional arguments is defined
        # by ExternalJWTAuthN, which is not visible from this file.
        self.jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
                                         app.config['HTTPCLIENT'], 300,
                                         JWTAuthTestCase.public_key.name)

    def tearDown(self):
        """Undo everything done by ``setUp``."""
        finished_database_for_testing(self)
        # A clean context-manager exit is signalled by three None values.
        self.ctx.__exit__(None, None, None)
        self.session.close()

    def test_verify_and_link_user(self):
        result, error_message = self.jwt_auth.verify_and_link_user('invaliduser', 'foobar')
        self.assertEqual('Invalid username or password', error_message)
        self.assertIsNone(result)

        result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'invalidpassword')
        self.assertIsNone(result)

        result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'password')
        self.assertIsNotNone(result)
        self.assertEqual('cooluser', result.username)

        result, _ = self.jwt_auth.verify_and_link_user('some.neat.user', 'foobar')
        self.assertIsNotNone(result)
        self.assertEqual('some_neat_user', result.username)

    def test_confirm_existing_user(self):
        # Create the users in the DB.
        result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'password')
        self.assertIsNotNone(result)

        result, _ = self.jwt_auth.verify_and_link_user('some.neat.user', 'foobar')
        self.assertIsNotNone(result)

        # Confirm a user with the same internal and external username.
        result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'invalidpassword')
        self.assertIsNone(result)

        result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password')
        self.assertIsNotNone(result)
        self.assertEqual('cooluser', result.username)

        # Fail to confirm the *external* username, which should return nothing.
        result, _ = self.jwt_auth.confirm_existing_user('some.neat.user', 'password')
        self.assertIsNone(result)

        # Now confirm the internal username.
        result, _ = self.jwt_auth.confirm_existing_user('some_neat_user', 'foobar')
        self.assertIsNotNone(result)
        self.assertEqual('some_neat_user', result.username)

    def test_disabled_user_custom_error(self):
        result, error_message = self.jwt_auth.verify_and_link_user('disabled', 'password')
        self.assertIsNone(result)
        self.assertEqual('User is currently disabled', error_message)

    def test_query(self):
        # Lookup `cool`.
        results, identifier, error_message = self.jwt_auth.query_users('cool')
        self.assertIsNone(error_message)
        self.assertEqual('jwtauthn', identifier)
        self.assertEqual(1, len(results))

        self.assertEqual('cooluser', results[0].username)
        self.assertEqual('user@domain.com', results[0].email)

        # Lookup `some`.
        results, identifier, error_message = self.jwt_auth.query_users('some')
        self.assertIsNone(error_message)
        self.assertEqual('jwtauthn', identifier)
        self.assertEqual(1, len(results))

        self.assertEqual('some.neat.user', results[0].username)
        self.assertEqual('neat@domain.com', results[0].email)

        # Lookup `unknown`.
        results, identifier, error_message = self.jwt_auth.query_users('unknown')
        self.assertIsNone(error_message)
        self.assertEqual('jwtauthn', identifier)
        self.assertEqual(0, len(results))

    def test_get_user(self):
        # Lookup cooluser.
        result, error_message = self.jwt_auth.get_user('cooluser')
        self.assertIsNone(error_message)
        self.assertIsNotNone(result)

        self.assertEqual('cooluser', result.username)
        self.assertEqual('user@domain.com', result.email)

        # Lookup some.neat.user.
        result, error_message = self.jwt_auth.get_user('some.neat.user')
        self.assertIsNone(error_message)
        self.assertIsNotNone(result)

        self.assertEqual('some.neat.user', result.username)
        self.assertEqual('neat@domain.com', result.email)

        # Lookup unknown user.
        result, _ = self.jwt_auth.get_user('unknownuser')
        self.assertIsNone(result)

    def test_link_user(self):
        # Link cooluser.
        user, error_message = self.jwt_auth.link_user('cooluser')
        self.assertIsNone(error_message)
        self.assertIsNotNone(user)
        self.assertEqual('cooluser', user.username)

        # Link again. Should return the same user record.
        user_again, _ = self.jwt_auth.link_user('cooluser')
        self.assertEqual(user_again.id, user.id)

        # Confirm cooluser.
        result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password')
        self.assertIsNotNone(result)
        self.assertEqual('cooluser', result.username)

    def test_link_invalid_user(self):
        user, error_message = self.jwt_auth.link_user('invaliduser')
        self.assertIsNotNone(error_message)
        self.assertIsNone(user)
# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()