Merge pull request #2751 from jzelinskie/registry-tests

test: convert registry auth test to pytest
This commit is contained in:
Jimmy Zelinskie 2017-07-13 15:09:57 -04:00 committed by GitHub
commit 9f4ffca736
10 changed files with 270 additions and 371 deletions

View file

@ -8,9 +8,11 @@ from data import model
from test.fixtures import *
def _token(username, password):
return 'basic ' + b64encode('%s:%s' % (username, password))
@pytest.mark.parametrize('token, expected_result', [
('', ValidateResult(AuthKind.basic, missing=True)),
('someinvalidtoken', ValidateResult(AuthKind.basic, missing=True)),
@ -18,25 +20,18 @@ def _token(username, password):
('basic ', ValidateResult(AuthKind.basic, missing=True)),
('basic some token', ValidateResult(AuthKind.basic, missing=True)),
('basic sometoken', ValidateResult(AuthKind.basic, missing=True)),
(_token(ACCESS_TOKEN_USERNAME, 'invalid'),
ValidateResult(AuthKind.basic, error_message='Invalid access token')),
(_token(ACCESS_TOKEN_USERNAME, 'invalid'), ValidateResult(AuthKind.basic,
error_message='Invalid access token')),
(_token(OAUTH_TOKEN_USERNAME, 'invalid'),
ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),
(_token('devtable', 'invalid'),
ValidateResult(AuthKind.basic, error_message='Invalid Username or Password')),
(_token('devtable+somebot', 'invalid'),
ValidateResult(AuthKind.basic,
error_message='Could not find robot with username: devtable+somebot ' +
'and supplied password.')),
(_token('disabled', 'password'),
ValidateResult(AuthKind.basic,
error_message='This user has been disabled. Please contact your administrator.')),
])
(_token('devtable', 'invalid'), ValidateResult(AuthKind.basic,
error_message='Invalid Username or Password')),
(_token('devtable+somebot', 'invalid'), ValidateResult(
AuthKind.basic, error_message='Could not find robot with username: devtable+somebot ' +
'and supplied password.')),
(_token('disabled', 'password'), ValidateResult(
AuthKind.basic,
error_message='This user has been disabled. Please contact your administrator.')),])
def test_validate_basic_auth_token(token, expected_result, app):
result = validate_basic_auth(token)
assert result == expected_result

View file

@ -7,9 +7,11 @@ from data import model
from auth.cookie import validate_session_cookie
from test.fixtures import *
def test_anonymous_cookie(app):
assert validate_session_cookie().missing
def test_invalidformatted_cookie(app):
# "Login" with a non-UUID reference.
someuser = model.user.get_user('devtable')
@ -22,6 +24,7 @@ def test_invalidformatted_cookie(app):
assert not result.has_user
assert result.error_message == 'Invalid session cookie format'
def test_disabled_user(app):
# "Login" with a disabled user.
someuser = model.user.get_user('disabled')
@ -34,6 +37,7 @@ def test_disabled_user(app):
assert not result.has_user
assert result.error_message == 'User account is disabled'
def test_valid_user(app):
# Login with a valid user.
someuser = model.user.get_user('devtable')
@ -45,6 +49,7 @@ def test_valid_user(app):
assert result.has_user
assert result.error_message is None
def test_valid_organization(app):
# "Login" with a valid organization.
someorg = model.user.get_namespace_user('buynlarge')

View file

@ -6,11 +6,12 @@ from werkzeug.exceptions import HTTPException
from app import LoginWrappedDBUser
from auth.auth_context import get_authenticated_user
from auth.decorators import (extract_namespace_repo_from_session, require_session_login,
process_auth_or_cookie)
from auth.decorators import (
extract_namespace_repo_from_session, require_session_login, process_auth_or_cookie)
from data import model
from test.fixtures import *
def test_extract_namespace_repo_from_session_missing(app):
def emptyfunc():
pass
@ -102,4 +103,3 @@ def test_process_auth_or_cookie_valid_user(app):
# Ensure the authenticated user was updated.
assert get_authenticated_user() == someuser

View file

@ -5,17 +5,17 @@ from auth.validateresult import AuthKind, ValidateResult
from data import model
from test.fixtures import *
@pytest.mark.parametrize('header, expected_result', [
('', ValidateResult(AuthKind.oauth, missing=True)),
('somerandomtoken', ValidateResult(AuthKind.oauth, missing=True)),
('bearer some random token', ValidateResult(AuthKind.oauth, missing=True)),
('bearer invalidtoken',
ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),
])
ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),])
def test_bearer(header, expected_result, app):
assert validate_bearer_auth(header) == expected_result
def test_valid_oauth(app):
user = model.user.get_user('devtable')
token = list(model.oauth.list_access_tokens_for_user(user))[0]
@ -25,6 +25,7 @@ def test_valid_oauth(app):
assert result.authed_user == user
assert result.auth_valid
def test_disabled_user_oauth(app):
user = model.user.get_user('disabled')
token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin',
@ -36,6 +37,7 @@ def test_disabled_user_oauth(app):
assert not result.auth_valid
assert result.error_message == 'Granter of the oauth access token is disabled'
def test_expired_token(app):
user = model.user.get_user('devtable')
token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin',

View file

@ -0,0 +1,188 @@
import time
import jwt
import pytest
from app import app, instance_keys
from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException
from data import model # TODO(jzelinskie): remove this after service keys are decoupled
from data.database import ServiceKeyApprovalType
from initdb import setup_database_for_testing, finished_database_for_testing
from util.morecollections import AttrDict
from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
TEST_USER = AttrDict({'username': 'joeuser'})
MAX_SIGNED_S = 3660
TOKEN_VALIDITY_LIFETIME_S = 60 * 60 # 1 hour
ANONYMOUS_SUB = '(anonymous)'
SERVICE_NAME = 'quay'
# This import has to come below any references to "app".
from test.fixtures import *
def _access(typ='repository', name='somens/somerepo', actions=None):
actions = [] if actions is None else actions
return [{
'type': typ,
'name': name,
'actions': actions,}]
def _delete_field(token_data, field_name):
token_data.pop(field_name)
return token_data
def _token_data(access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None,
exp=None, nbf=None, iss=None, subject=None):
if subject is None:
_, subject = build_context_and_subject(user=user)
return {
'iss': iss or instance_keys.service_name,
'aud': audience,
'nbf': nbf if nbf is not None else int(time.time()),
'iat': iat if iat is not None else int(time.time()),
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
'sub': subject,
'access': access,
'context': context,}
def _token(token_data, key_id=None, private_key=None, skip_header=False, alg=None):
key_id = key_id or instance_keys.local_key_id
private_key = private_key or instance_keys.local_private_key
if alg == "none":
private_key = None
token_headers = {'kid': key_id}
if skip_header:
token_headers = {}
token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers)
return 'Bearer {0}'.format(token_data)
def _parse_token(token):
return identity_from_bearer_token(token)[0]
def test_accepted_token(initialized_db):
token = _token(_token_data())
identity = _parse_token(token)
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username,
identity.id)
assert len(identity.provides) == 0
anon_token = _token(_token_data(user=None))
anon_identity = _parse_token(anon_token)
assert anon_identity.id == ANONYMOUS_SUB, 'should be %s, but was %s' % (ANONYMOUS_SUB,
anon_identity.id)
assert len(identity.provides) == 0
@pytest.mark.parametrize('access', [
(_access(actions=['pull', 'push'])),
(_access(actions=['pull', '*'])),
(_access(actions=['*', 'push'])),
(_access(actions=['*'])),
(_access(actions=['pull', '*', 'push'])),])
def test_token_with_access(access, initialized_db):
token = _token(_token_data(access=access))
identity = _parse_token(token)
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username,
identity.id)
assert len(identity.provides) == 1
role = list(identity.provides)[0][3]
if "*" in access[0]['actions']:
assert role == 'admin'
elif "push" in access[0]['actions']:
assert role == 'write'
elif "pull" in access[0]['actions']:
assert role == 'read'
@pytest.mark.parametrize('token', [
(_token(
_token_data(access=[{
'toipe': 'repository',
'namesies': 'somens/somerepo',
'akshuns': ['pull', 'push', '*']}]))),
(_token(_token_data(audience='someotherapp'))),
(_token(_delete_field(_token_data(), 'aud'))),
(_token(_token_data(nbf=int(time.time()) + 600))),
(_token(_delete_field(_token_data(), 'nbf'))),
(_token(_token_data(iat=int(time.time()) + 600))),
(_token(_delete_field(_token_data(), 'iat'))),
(_token(_token_data(exp=int(time.time()) + MAX_SIGNED_S * 2))),
(_token(_token_data(exp=int(time.time()) - 60))),
(_token(_delete_field(_token_data(), 'exp'))),
(_token(_delete_field(_token_data(), 'sub'))),
(_token(_token_data(iss='badissuer'))),
(_token(_delete_field(_token_data(), 'iss'))),
(_token(_token_data(), skip_header=True)),
(_token(_token_data(), key_id='someunknownkey')),
(_token(_token_data(), key_id='kid7')),
(_token(_token_data(), alg='none', private_key=None)),
('some random token'),
('Bearer: sometokenhere'),
('\nBearer: dGVzdA'),])
def test_invalid_jwt(token, initialized_db):
with pytest.raises(InvalidJWTException):
_parse_token(token)
def test_mixing_keys_e2e(initialized_db):
token_data = _token_data()
# Create a new key for testing.
p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey',
name='newkey', metadata={})
private_key = p.exportKey('PEM')
# Test first with the new valid, but unapproved key.
unapproved_key_token = _token(token_data, key_id='newkey', private_key=private_key)
with pytest.raises(InvalidJWTException):
_parse_token(unapproved_key_token)
# Approve the key and try again.
admin_user = model.user.get_user('devtable')
model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER)
valid_token = _token(token_data, key_id='newkey', private_key=private_key)
identity = _parse_token(valid_token)
assert identity.id == TEST_USER.username
assert len(identity.provides) == 0
# Try using a different private key with the existing key ID.
bad_private_token = _token(token_data, key_id='newkey',
private_key=instance_keys.local_private_key)
with pytest.raises(InvalidJWTException):
_parse_token(bad_private_token)
# Try using a different key ID with the existing private key.
kid_mismatch_token = _token(token_data, key_id=instance_keys.local_key_id,
private_key=private_key)
with pytest.raises(InvalidJWTException):
_parse_token(kid_mismatch_token)
# Delete the new key.
key.delete_instance(recursive=True)
# Ensure it still works (via the cache.)
deleted_key_token = _token(token_data, key_id='newkey', private_key=private_key)
identity = _parse_token(deleted_key_token)
assert identity.id == TEST_USER.username
assert len(identity.provides) == 0
# Break the cache.
instance_keys.clear_cache()
# Ensure the key no longer works.
with pytest.raises(InvalidJWTException):
_parse_token(deleted_key_token)

View file

@ -1,32 +1,34 @@
import pytest
from auth.scopes import (scopes_from_scope_string, validate_scope_string, ALL_SCOPES,
is_subset_string)
from auth.scopes import (
scopes_from_scope_string, validate_scope_string, ALL_SCOPES, is_subset_string)
@pytest.mark.parametrize('scopes_string, expected', [
# Valid single scopes.
('repo:read', ['repo:read']),
('repo:admin', ['repo:admin']),
# Invalid scopes.
('not:valid', []),
('repo:admins', []),
@pytest.mark.parametrize(
'scopes_string, expected',
[
# Valid single scopes.
('repo:read', ['repo:read']),
('repo:admin', ['repo:admin']),
# Valid scope strings.
('repo:read repo:admin', ['repo:read', 'repo:admin']),
('repo:read,repo:admin', ['repo:read', 'repo:admin']),
('repo:read,repo:admin repo:write', ['repo:read', 'repo:admin', 'repo:write']),
# Invalid scopes.
('not:valid', []),
('repo:admins', []),
# Partially invalid scopes.
('repo:read,not:valid', []),
('repo:read repo:admins', []),
# Valid scope strings.
('repo:read repo:admin', ['repo:read', 'repo:admin']),
('repo:read,repo:admin', ['repo:read', 'repo:admin']),
('repo:read,repo:admin repo:write', ['repo:read', 'repo:admin', 'repo:write']),
# Invalid scope strings.
('repo:read|repo:admin', []),
# Partially invalid scopes.
('repo:read,not:valid', []),
('repo:read repo:admins', []),
# Mixture of delimiters.
('repo:read, repo:admin', []),
])
# Invalid scope strings.
('repo:read|repo:admin', []),
# Mixture of delimiters.
('repo:read, repo:admin', []),])
def test_parsing(scopes_string, expected):
expected_scope_set = {ALL_SCOPES[scope_name] for scope_name in expected}
parsed_scope_set = scopes_from_scope_string(scopes_string)
@ -40,11 +42,9 @@ def test_parsing(scopes_string, expected):
('repo:read,repo:admin', 'repo:read', True),
('repo:read,repo:admin', 'repo:admin', True),
('repo:read,repo:admin', 'repo:admin repo:read', True),
('', 'repo:read', False),
('unknown:tag', 'repo:read', False),
('repo:read unknown:tag', 'repo:read', False),
('repo:read,unknown:tag', 'repo:read', False),
])
('repo:read,unknown:tag', 'repo:read', False),])
def test_subset_string(superset, subset, result):
assert is_subset_string(superset, subset) == result

View file

@ -3,16 +3,18 @@ import pytest
from auth.signedgrant import validate_signed_grant, generate_signed_token, SIGNATURE_PREFIX
from auth.validateresult import AuthKind, ValidateResult
@pytest.mark.parametrize('header, expected_result', [
('', ValidateResult(AuthKind.signed_grant, missing=True)),
('somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)),
('token somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)),
('token ' + SIGNATURE_PREFIX + 'foo',
ValidateResult(AuthKind.signed_grant, error_message='Signed grant could not be validated')),
('token ' + generate_signed_token({'a': 'b'}, {'c': 'd'}),
ValidateResult(AuthKind.signed_grant, signed_data={'grants': {'a': 'b'}, 'user_context': {'c': 'd'}})),
])
('token ' + generate_signed_token({
'a': 'b'}, {'c': 'd'}), ValidateResult(AuthKind.signed_grant, signed_data={
'grants': {
'a': 'b'},
'user_context': {
'c': 'd'}})),])
def test_token(header, expected_result):
assert validate_signed_grant(header) == expected_result

View file

@ -1,7 +1,7 @@
import pytest
from auth.auth_context import (get_authenticated_user, get_grant_context, get_validated_token,
get_validated_oauth_token)
from auth.auth_context import (
get_authenticated_user, get_grant_context, get_validated_token, get_validated_oauth_token)
from auth.validateresult import AuthKind, ValidateResult
from data import model
from test.fixtures import *
@ -10,27 +10,31 @@ from test.fixtures import *
def get_user():
return model.user.get_user('devtable')
def get_robot():
robot, _ = model.user.create_robot('somebot', get_user())
return robot
def get_token():
return model.token.create_delegate_token('devtable', 'simple', 'sometoken')
def get_oauthtoken():
user = model.user.get_user('devtable')
return list(model.oauth.list_access_tokens_for_user(user))[0]
def get_signeddata():
return {'grants': {'a': 'b'}, 'user_context': {'c': 'd'}}
@pytest.mark.parametrize('get_entity,entity_kind', [
(get_user, 'user'),
(get_robot, 'robot'),
(get_token, 'token'),
(get_oauthtoken, 'oauthtoken'),
(get_signeddata, 'signed_data'),
])
(get_signeddata, 'signed_data'),])
def test_apply_context(get_entity, entity_kind, app):
assert get_authenticated_user() is None
assert get_validated_token() is None
@ -52,9 +56,9 @@ def test_apply_context(get_entity, entity_kind, app):
expected_oauth = entity if entity_kind == 'oauthtoken' else None
fake_grant = {
'user': {'c': 'd'},
'kind': 'user',
}
'user': {
'c': 'd'},
'kind': 'user',}
expected_grant = fake_grant if entity_kind == 'signed_data' else None
assert get_authenticated_user() == expected_user

View file

@ -1,15 +1,15 @@
import pytest
import flask
from flask_principal import Identity, Principal
from endpoints.v2.v2auth import get_tuf_root
from auth import permissions
from util.security.registry_jwt import QUAY_TUF_ROOT, SIGNER_TUF_ROOT, DISABLED_TUF_ROOT
from test import testconfig
from flask_principal import Identity, Principal
from mock import Mock
from auth import permissions
from endpoints.v2.v2auth import get_tuf_root
from test import testconfig
from util.security.registry_jwt import QUAY_TUF_ROOT, SIGNER_TUF_ROOT, DISABLED_TUF_ROOT
def admin_identity(namespace, reponame):
identity = Identity('admin')
@ -17,24 +17,28 @@ def admin_identity(namespace, reponame):
identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'admin'))
return identity
def write_identity(namespace, reponame):
identity = Identity('writer')
identity.provides.add(permissions._RepositoryNeed(namespace, reponame, 'write'))
identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'write'))
return identity
def read_identity(namespace, reponame):
identity = Identity('reader')
identity.provides.add(permissions._RepositoryNeed(namespace, reponame, 'read'))
identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'read'))
return identity
def app_with_principal():
app = flask.Flask(__name__)
app.config.from_object(testconfig.TestConfig())
principal = Principal(app)
principal = Principal(app)
return app, principal
@pytest.mark.parametrize('identity,expected', [
(Identity('anon'), QUAY_TUF_ROOT),
(read_identity("namespace", "repo"), QUAY_TUF_ROOT),
@ -50,8 +54,8 @@ def test_get_tuf_root(identity, expected):
principal.set_identity(identity)
actual = get_tuf_root(Mock(), "namespace", "repo")
assert actual == expected, "should be %s, but was %s" % (expected, actual)
@pytest.mark.parametrize('trust_enabled,tuf_root', [
(True, QUAY_TUF_ROOT),
(False, DISABLED_TUF_ROOT),

View file

@ -1,301 +0,0 @@
import unittest
import time
import jwt
from app import app, instance_keys
from data import model
from data.database import ServiceKeyApprovalType
from initdb import setup_database_for_testing, finished_database_for_testing
from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S
from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException
from util.morecollections import AttrDict
from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject,
generate_bearer_token)
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
TEST_USER = AttrDict({'username': 'joeuser'})
MAX_SIGNED_S = 3660
class TestRegistryV2Auth(unittest.TestCase):
def setUp(self):
setup_database_for_testing(self)
def tearDown(self):
finished_database_for_testing(self)
def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None,
exp=None, nbf=None, iss=None):
_, subject = build_context_and_subject(user=user)
return {
'iss': iss or instance_keys.service_name,
'aud': audience,
'nbf': nbf if nbf is not None else int(time.time()),
'iat': iat if iat is not None else int(time.time()),
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
'sub': subject,
'access': access,
'context': context,
}
def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, alg=None):
key_id = key_id or instance_keys.local_key_id
private_key = private_key or instance_keys.local_private_key
if alg == "none":
private_key = None
token_headers = {
'kid': key_id,
}
if skip_header:
token_headers = {}
token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers)
return 'Bearer {0}'.format(token_data)
def _parse_token(self, token):
return identity_from_bearer_token(token)[0]
def test_accepted_token(self):
token = self._generate_token(self._generate_token_data())
identity = self._parse_token(token)
self.assertEqual(identity.id, TEST_USER.username)
self.assertEqual(0, len(identity.provides))
anon_token = self._generate_token(self._generate_token_data(user=None))
anon_identity = self._parse_token(anon_token)
self.assertEqual(anon_identity.id, ANONYMOUS_SUB)
self.assertEqual(0, len(identity.provides))
def test_token_with_access(self):
valid_access = [
[
{
'type': 'repository',
'name': 'somens/somerepo',
'actions': ['pull', 'push'],
}
],
[
{
'type': 'repository',
'name': 'somens/somerepo',
'actions': ['pull', '*'],
}
],
[
{
'type': 'repository',
'name': 'somens/somerepo',
'actions': ['*', 'push'],
}
],
[
{
'type': 'repository',
'name': 'somens/somerepo',
'actions': ['*'],
}
],
[
{
'type': 'repository',
'name': 'somens/somerepo',
'actions': ['pull', '*', 'push'],
}
],
]
for access in valid_access:
token = self._generate_token(self._generate_token_data(access=access))
identity = self._parse_token(token)
self.assertEqual(identity.id, TEST_USER.username)
self.assertEqual(1, len(identity.provides))
role = list(identity.provides)[0][3]
if "*" in access[0]['actions']:
self.assertEqual(role, 'admin')
elif "push" in access[0]['actions']:
self.assertEqual(role, 'write')
elif "pull" in access[0]['actions']:
self.assertEqual(role, 'read')
def test_malformed_access(self):
access = [
{
'toipe': 'repository',
'namesies': 'somens/somerepo',
'akshuns': ['pull', 'push', '*'],
}
]
token = self._generate_token(self._generate_token_data(access=access))
with self.assertRaises(InvalidJWTException):
self._parse_token(token)
def test_audience(self):
token_data = self._generate_token_data(audience='someotherapp')
token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(token)
token_data.pop('aud')
no_aud = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(no_aud)
def test_nbf(self):
future = int(time.time()) + 60
token_data = self._generate_token_data(nbf=future)
token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(token)
token_data.pop('nbf')
no_nbf_token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(no_nbf_token)
def test_iat(self):
future = int(time.time()) + 60
token_data = self._generate_token_data(iat=future)
token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(token)
token_data.pop('iat')
no_iat_token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(no_iat_token)
def test_exp(self):
too_far = int(time.time()) + MAX_SIGNED_S * 2
token_data = self._generate_token_data(exp=too_far)
token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(token)
past = int(time.time()) - 60
token_data['exp'] = past
expired_token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(expired_token)
token_data.pop('exp')
no_exp_token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(no_exp_token)
def test_no_sub(self):
token_data = self._generate_token_data()
token_data.pop('sub')
token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(token)
def test_iss(self):
token_data = self._generate_token_data(iss='badissuer')
token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(token)
token_data.pop('iss')
no_iss_token = self._generate_token(token_data)
with self.assertRaises(InvalidJWTException):
self._parse_token(no_iss_token)
def test_missing_header(self):
token_data = self._generate_token_data()
missing_header_token = self._generate_token(token_data, skip_header=True)
with self.assertRaises(InvalidJWTException):
self._parse_token(missing_header_token)
def test_invalid_key(self):
token_data = self._generate_token_data()
invalid_key_token = self._generate_token(token_data, key_id='someunknownkey')
with self.assertRaises(InvalidJWTException):
self._parse_token(invalid_key_token)
def test_expired_key(self):
token_data = self._generate_token_data()
expired_key_token = self._generate_token(token_data, key_id='kid7')
with self.assertRaises(InvalidJWTException):
self._parse_token(expired_key_token)
def test_mixing_keys(self):
token_data = self._generate_token_data()
# Create a new key for testing.
p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey',
name='newkey', metadata={})
private_key = p.exportKey('PEM')
# Test first with the new valid, but unapproved key.
unapproved_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
with self.assertRaises(InvalidJWTException):
self._parse_token(unapproved_key_token)
# Approve the key and try again.
admin_user = model.user.get_user('devtable')
model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER)
valid_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
identity = self._parse_token(valid_token)
self.assertEqual(identity.id, TEST_USER.username)
self.assertEqual(0, len(identity.provides))
# Try using a different private key with the existing key ID.
bad_private_token = self._generate_token(token_data, key_id='newkey', private_key=instance_keys.local_private_key)
with self.assertRaises(InvalidJWTException):
self._parse_token(bad_private_token)
# Try using a different key ID with the existing private key.
kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, private_key=private_key)
with self.assertRaises(InvalidJWTException):
self._parse_token(kid_mismatch_token)
# Delete the new key.
key.delete_instance(recursive=True)
# Ensure it still works (via the cache.)
deleted_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
identity = self._parse_token(deleted_key_token)
self.assertEqual(identity.id, TEST_USER.username)
self.assertEqual(0, len(identity.provides))
# Break the cache.
instance_keys.clear_cache()
# Ensure the key no longer works.
with self.assertRaises(InvalidJWTException):
self._parse_token(deleted_key_token)
def test_bad_token(self):
with self.assertRaises(InvalidJWTException):
self._parse_token("some random token here")
def test_bad_bearer_token(self):
with self.assertRaises(InvalidJWTException):
self._parse_token("Bearer: sometokenhere")
def test_bad_bearer_newline_token(self):
with self.assertRaises(InvalidJWTException):
self._parse_token("\nBearer: dGVzdA")
def test_ensure_no_none(self):
token_data = self._generate_token_data()
none_token = self._generate_token(token_data, alg='none', private_key=None)
with self.assertRaises(InvalidJWTException):
self._parse_token(none_token)
if __name__ == '__main__':
unittest.main()