This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_jwt_auth.py
Joseph Schorr 8aac3fd86e Add support for an external JWT-based authentication system
This authentication system hits two HTTP endpoints to check and verify the existence of users:

Existence endpoint:
GET http://endpoint/ with Authorization: Basic (username:) =>
    Returns 200 if the username/email exists, 4** otherwise

Verification endpoint:
GET http://endpoint/ with Authorization: Basic (username:password) =>
    Returns 200 and a signed JWT with the user's username and email address if the username+password validates, 4** otherwise with the body containing an optional error message

The JWT produced by the endpoint must be issued with an issuer matching that configured in the config.yaml, and the audience must be "quay.io/jwtauthn". The JWT is signed using a private key and then validated on the Quay.io side with the associated public key, found as "jwt-authn.cert" in the conf/stack directory.
2015-06-05 13:20:10 -04:00

156 lines
4.9 KiB
Python

import unittest
import requests
import jwt
import base64
from app import app
from flask import Flask, abort, jsonify, request, make_response
from flask.ext.testing import LiveServerTestCase
from initdb import setup_database_for_testing, finished_database_for_testing
from data.users import JWTAuthUsers
from tempfile import NamedTemporaryFile
from Crypto.PublicKey import RSA
from datetime import datetime, timedelta
class JWTAuthTestCase(LiveServerTestCase):
    """Exercise ``JWTAuthUsers`` against a stub JWT authentication server.

    ``create_app`` builds a small Flask app exposing ``/user/exists`` and
    ``/user/verify``. Tokens returned by the verify endpoint are signed with
    an RSA private key generated once per class in ``setUpClass``; the
    matching public key is written to a temporary file whose path is handed
    to ``JWTAuthUsers`` in ``setUp``.
    """

    maxDiff = None

    @classmethod
    def setUpClass(cls):
        """Generate the RSA key pair and write the public key to a temp file."""
        super(JWTAuthTestCase, cls).setUpClass()

        key = RSA.generate(1024)

        public_key = NamedTemporaryFile(delete=True)
        public_key.write(key.publickey().exportKey('OpenSSH'))
        # Push the key out of the write buffer so it can be read back by path.
        public_key.flush()
        public_key.seek(0)

        cls.public_key = public_key
        cls.private_key_data = key.exportKey('PEM')

    @classmethod
    def tearDownClass(cls):
        """Close the public-key temp file (created with ``delete=True``)."""
        cls.public_key.close()
        super(JWTAuthTestCase, cls).tearDownClass()

    def create_app(self):
        """Build the stub Flask app that plays the external JWT auth service.

        Returns:
            A Flask application with ``/user/exists`` and ``/user/verify``
            routes backed by a fixed in-memory list of users.
        """
        users = [
            {'name': 'cooluser', 'email': 'user@domain.com', 'password': 'password'},
            {'name': 'some.neat.user', 'email': 'neat@domain.com', 'password': 'foobar'},
        ]

        jwt_app = Flask('testjwt')
        private_key = self.private_key_data

        def _get_basic_auth():
            """Split the decoded Basic ``Authorization`` header on the first ':'."""
            encoded = request.headers['Authorization'][len('Basic '):]
            return base64.b64decode(encoded).split(':', 1)

        def _find_user(username):
            """Return the first user whose name or email equals ``username``, else None."""
            for user in users:
                if username in (user['name'], user['email']):
                    return user
            return None

        @jwt_app.route('/user/exists', methods=['GET'])
        def user_exists():
            """Respond 'OK' when the username/email is known, 404 otherwise."""
            username, _ = _get_basic_auth()
            if _find_user(username) is None:
                abort(404)
            return 'OK'

        @jwt_app.route('/user/verify', methods=['GET'])
        def verify_user():
            """Return a signed JWT for valid credentials.

            The username 'disabled' yields a 401 with a custom message; an
            unknown user or a wrong password yields an empty 404.
            """
            username, password = _get_basic_auth()
            if username == 'disabled':
                return make_response('User is currently disabled', 401)

            user = _find_user(username)
            if user is None or password != user['password']:
                return make_response('', 404)

            # Sample the clock once so 'nbf' and 'exp' are exactly 60s apart.
            now = datetime.utcnow()
            token_data = {
                'iss': 'authy',
                'aud': 'quay.io/jwtauthn',
                'nbf': now,
                'exp': now + timedelta(seconds=60),
                'sub': user['name'],
                'email': user['email'],
            }
            encoded = jwt.encode(token_data, private_key, 'RS256')
            return jsonify({'token': encoded})

        jwt_app.config['TESTING'] = True
        jwt_app.config['DEBUG'] = True
        return jwt_app

    def setUp(self):
        """Prepare the test database, a request context and the auth client."""
        setup_database_for_testing(self)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()
        self.session = requests.Session()

        server_url = self.get_server_url()
        self.jwt_auth = JWTAuthUsers(
            server_url + '/user/exists',
            server_url + '/user/verify',
            'authy',
            self.public_key.name,
        )

    def tearDown(self):
        """Release the database, the request context and the HTTP session."""
        try:
            finished_database_for_testing(self)
        finally:
            # Always leave the request context, even if the database teardown
            # raised. The context-manager protocol takes
            # (exc_type, exc_value, traceback); all None means a clean exit.
            self.ctx.__exit__(None, None, None)
            self.session.close()

    def test_user_exists(self):
        # Unknown names are rejected; known names and emails are accepted.
        self.assertFalse(self.jwt_auth.user_exists('testuser'))
        self.assertFalse(self.jwt_auth.user_exists('anotheruser'))

        self.assertTrue(self.jwt_auth.user_exists('cooluser'))
        self.assertTrue(self.jwt_auth.user_exists('user@domain.com'))

    def test_verify_user(self):
        # Unknown user: no result and the generic error message.
        result, error_message = self.jwt_auth.verify_user('invaliduser', 'foobar')
        self.assertEqual('Invalid username or password', error_message)
        self.assertIsNone(result)

        # Known user, wrong password.
        result, _ = self.jwt_auth.verify_user('cooluser', 'invalidpassword')
        self.assertIsNone(result)

        # Known user, correct password.
        result, _ = self.jwt_auth.verify_user('cooluser', 'password')
        self.assertIsNotNone(result)
        self.assertEqual('cooluser', result.username)

        # The external name 'some.neat.user' is expected back as 'some_neat_user'.
        result, _ = self.jwt_auth.verify_user('some.neat.user', 'foobar')
        self.assertIsNotNone(result)
        self.assertEqual('some_neat_user', result.username)

    def test_confirm_existing_user(self):
        # Create the users in the DB.
        result, _ = self.jwt_auth.verify_user('cooluser', 'password')
        self.assertIsNotNone(result)

        result, _ = self.jwt_auth.verify_user('some.neat.user', 'foobar')
        self.assertIsNotNone(result)

        # Confirm a user with the same internal and external username.
        result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password')
        self.assertIsNotNone(result)
        self.assertEqual('cooluser', result.username)

        # Fail to confirm the *external* username, which should return nothing.
        result, _ = self.jwt_auth.confirm_existing_user('some.neat.user', 'password')
        self.assertIsNone(result)

        # Now confirm the internal username.
        result, _ = self.jwt_auth.confirm_existing_user('some_neat_user', 'foobar')
        self.assertIsNotNone(result)
        self.assertEqual('some_neat_user', result.username)

    def test_disabled_user_custom_erorr(self):
        # NOTE: the method name keeps its original spelling so existing test
        # selectors continue to match.
        # The stub's 401 body should surface as the error message.
        result, error_message = self.jwt_auth.verify_user('disabled', 'password')
        self.assertIsNone(result)
        self.assertEqual('User is currently disabled', error_message)
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()