diff --git a/auth/test/test_basic.py b/auth/test/test_basic.py index 67cc2afc7..042901bb1 100644 --- a/auth/test/test_basic.py +++ b/auth/test/test_basic.py @@ -8,9 +8,11 @@ from data import model from test.fixtures import * + def _token(username, password): return 'basic ' + b64encode('%s:%s' % (username, password)) + @pytest.mark.parametrize('token, expected_result', [ ('', ValidateResult(AuthKind.basic, missing=True)), ('someinvalidtoken', ValidateResult(AuthKind.basic, missing=True)), @@ -18,25 +20,18 @@ def _token(username, password): ('basic ', ValidateResult(AuthKind.basic, missing=True)), ('basic some token', ValidateResult(AuthKind.basic, missing=True)), ('basic sometoken', ValidateResult(AuthKind.basic, missing=True)), - - (_token(ACCESS_TOKEN_USERNAME, 'invalid'), - ValidateResult(AuthKind.basic, error_message='Invalid access token')), - + (_token(ACCESS_TOKEN_USERNAME, 'invalid'), ValidateResult(AuthKind.basic, + error_message='Invalid access token')), (_token(OAUTH_TOKEN_USERNAME, 'invalid'), ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')), - - (_token('devtable', 'invalid'), - ValidateResult(AuthKind.basic, error_message='Invalid Username or Password')), - - (_token('devtable+somebot', 'invalid'), - ValidateResult(AuthKind.basic, - error_message='Could not find robot with username: devtable+somebot ' + - 'and supplied password.')), - - (_token('disabled', 'password'), - ValidateResult(AuthKind.basic, - error_message='This user has been disabled. Please contact your administrator.')), -]) + (_token('devtable', 'invalid'), ValidateResult(AuthKind.basic, + error_message='Invalid Username or Password')), + (_token('devtable+somebot', 'invalid'), ValidateResult( + AuthKind.basic, error_message='Could not find robot with username: devtable+somebot ' + + 'and supplied password.')), + (_token('disabled', 'password'), ValidateResult( + AuthKind.basic, + error_message='This user has been disabled. Please contact your administrator.')),]) def test_validate_basic_auth_token(token, expected_result, app): result = validate_basic_auth(token) assert result == expected_result diff --git a/auth/test/test_cookie.py b/auth/test/test_cookie.py index d0f04e5df..022b220cf 100644 --- a/auth/test/test_cookie.py +++ b/auth/test/test_cookie.py @@ -7,9 +7,11 @@ from data import model from auth.cookie import validate_session_cookie from test.fixtures import * + def test_anonymous_cookie(app): assert validate_session_cookie().missing + def test_invalidformatted_cookie(app): # "Login" with a non-UUID reference. someuser = model.user.get_user('devtable') @@ -22,6 +24,7 @@ def test_invalidformatted_cookie(app): assert not result.has_user assert result.error_message == 'Invalid session cookie format' + def test_disabled_user(app): # "Login" with a disabled user. someuser = model.user.get_user('disabled') @@ -34,6 +37,7 @@ def test_disabled_user(app): assert not result.has_user assert result.error_message == 'User account is disabled' + def test_valid_user(app): # Login with a valid user. someuser = model.user.get_user('devtable') @@ -45,6 +49,7 @@ def test_valid_user(app): assert result.has_user assert result.error_message is None + def test_valid_organization(app): # "Login" with a valid organization. someorg = model.user.get_namespace_user('buynlarge') diff --git a/auth/test/test_decorators.py b/auth/test/test_decorators.py index ec6ed28fa..b0477f7bd 100644 --- a/auth/test/test_decorators.py +++ b/auth/test/test_decorators.py @@ -6,11 +6,12 @@ from werkzeug.exceptions import HTTPException from app import LoginWrappedDBUser from auth.auth_context import get_authenticated_user -from auth.decorators import (extract_namespace_repo_from_session, require_session_login, - process_auth_or_cookie) +from auth.decorators import ( + extract_namespace_repo_from_session, require_session_login, process_auth_or_cookie) from data import model from test.fixtures import * + def test_extract_namespace_repo_from_session_missing(app): def emptyfunc(): pass @@ -102,4 +103,3 @@ def test_process_auth_or_cookie_valid_user(app): # Ensure the authenticated user was updated. assert get_authenticated_user() == someuser - diff --git a/auth/test/test_oauth.py b/auth/test/test_oauth.py index bb307459c..35959c12c 100644 --- a/auth/test/test_oauth.py +++ b/auth/test/test_oauth.py @@ -5,17 +5,17 @@ from auth.validateresult import AuthKind, ValidateResult from data import model from test.fixtures import * + @pytest.mark.parametrize('header, expected_result', [ ('', ValidateResult(AuthKind.oauth, missing=True)), ('somerandomtoken', ValidateResult(AuthKind.oauth, missing=True)), ('bearer some random token', ValidateResult(AuthKind.oauth, missing=True)), - ('bearer invalidtoken', - ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')), -]) + ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),]) def test_bearer(header, expected_result, app): assert validate_bearer_auth(header) == expected_result + def test_valid_oauth(app): user = model.user.get_user('devtable') token = list(model.oauth.list_access_tokens_for_user(user))[0] @@ -25,6 +25,7 @@ def test_valid_oauth(app): assert result.authed_user == user assert result.auth_valid + def test_disabled_user_oauth(app): user = model.user.get_user('disabled') token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin', @@ -36,6 +37,7 @@ def test_disabled_user_oauth(app): assert not result.auth_valid assert result.error_message == 'Granter of the oauth access token is disabled' + def test_expired_token(app): user = model.user.get_user('devtable') token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin', diff --git a/auth/test/test_registry_jwt.py b/auth/test/test_registry_jwt.py index 37db6555f..a174cc324 100644 --- a/auth/test/test_registry_jwt.py +++ b/auth/test/test_registry_jwt.py @@ -9,7 +9,6 @@ from initdb import setup_database_for_testing, finished_database_for_testing from util.morecollections import AttrDict from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject - TEST_AUDIENCE = app.config['SERVER_HOSTNAME'] TEST_USER = AttrDict({'username': 'joeuser'}) MAX_SIGNED_S = 3660 @@ -22,8 +21,7 @@ def _access(typ='repository', name='somens/somerepo', actions=None): return [{ 'type': typ, 'name': name, - 'actions': actions, - }] + 'actions': actions,}] def _delete_field(token_data, field_name): @@ -31,8 +29,8 @@ def _delete_field(token_data, field_name): return token_data -def _token_data(access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None, exp=None, - nbf=None, iss=None, subject=None): +def _token_data(access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None, + exp=None, nbf=None, iss=None, subject=None): if subject is None: _, subject = build_context_and_subject(user=user) return { @@ -43,8 +41,7 @@ def _token_data(access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, 'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S), 'sub': subject, 'access': access, - 'context': context, - } + 'context': context,} def _token(token_data, key_id=None, private_key=None, skip_header=False, alg=None): @@ -70,12 +67,14 @@ def _parse_token(token): def test_accepted_token(): token = _token(_token_data()) identity = _parse_token(token) - assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, identity.id) + assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, + identity.id) assert len(identity.provides) == 0 anon_token = _token(_token_data(user=None)) anon_identity = _parse_token(anon_token) - assert anon_identity.id == ANONYMOUS_SUB, 'should be %s, but was %s' % (ANONYMOUS_SUB, anon_identity.id) + assert anon_identity.id == ANONYMOUS_SUB, 'should be %s, but was %s' % (ANONYMOUS_SUB, + anon_identity.id) assert len(identity.provides) == 0 @@ -84,12 +83,12 @@ def test_accepted_token(): (_access(actions=['pull', '*'])), (_access(actions=['*', 'push'])), (_access(actions=['*'])), - (_access(actions=['pull', '*', 'push'])), -]) + (_access(actions=['pull', '*', 'push'])),]) def test_token_with_access(access): token = _token(_token_data(access=access)) identity = _parse_token(token) - assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, identity.id) + assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, + identity.id) assert len(identity.provides) == 1 role = list(identity.provides)[0][3] @@ -102,7 +101,11 @@ def test_token_with_access(access): @pytest.mark.parametrize('token', [ - (_token(_token_data(access=[{'toipe': 'repository', 'namesies': 'somens/somerepo', 'akshuns': ['pull', 'push', '*']}]))), + (_token( + _token_data(access=[{ + 'toipe': 'repository', + 'namesies': 'somens/somerepo', + 'akshuns': ['pull', 'push', '*']}]))), (_token(_token_data(audience='someotherapp'))), (_token(_delete_field(_token_data(), 'aud'))), (_token(_token_data(nbf=int(time.time()) + 60))), @@ -121,8 +124,7 @@ def test_token_with_access(access): (_token(_token_data(), alg='none', private_key=None)), ('some random token'), ('Bearer: sometokenhere'), - ('\nBearer: dGVzdA'), -]) + ('\nBearer: dGVzdA'),]) def test_invalid_jwt(token): with pytest.raises(InvalidJWTException): _parse_token(token) diff --git a/auth/test/test_registry_jwt_e2e.py b/auth/test/test_registry_jwt_e2e.py index 94c2affbe..d7715b7a7 100644 --- a/auth/test/test_registry_jwt_e2e.py +++ b/auth/test/test_registry_jwt_e2e.py @@ -11,7 +11,6 @@ from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTExcepti from util.morecollections import AttrDict from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject) - TEST_AUDIENCE = app.config['SERVER_HOSTNAME'] TEST_USER = AttrDict({'username': 'joeuser'}) MAX_SIGNED_S = 3660 @@ -24,8 +23,8 @@ class TestRegistryV2Auth(unittest.TestCase): def tearDown(self): finished_database_for_testing(self) - def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None, - exp=None, nbf=None, iss=None): + def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, + iat=None, exp=None, nbf=None, iss=None): _, subject = build_context_and_subject(user=user) return { @@ -36,10 +35,10 @@ class TestRegistryV2Auth(unittest.TestCase): 'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S), 'sub': subject, 'access': access, - 'context': context, - } + 'context': context,} - def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, alg=None): + def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, + alg=None): key_id = key_id or instance_keys.local_key_id private_key = private_key or instance_keys.local_private_key @@ -47,8 +46,7 @@ class TestRegistryV2Auth(unittest.TestCase): private_key = None token_headers = { - 'kid': key_id, - } + 'kid': key_id,} if skip_header: token_headers = {} @@ -63,13 +61,14 @@ class TestRegistryV2Auth(unittest.TestCase): token_data = self._generate_token_data() # Create a new key for testing. - p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey', - name='newkey', metadata={}) + p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, + kid='newkey', name='newkey', metadata={}) private_key = p.exportKey('PEM') # Test first with the new valid, but unapproved key. - unapproved_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) + unapproved_key_token = self._generate_token(token_data, key_id='newkey', + private_key=private_key) with self.assertRaises(InvalidJWTException): self._parse_token(unapproved_key_token) @@ -84,12 +83,14 @@ class TestRegistryV2Auth(unittest.TestCase): self.assertEqual(0, len(identity.provides)) # Try using a different private key with the existing key ID. - bad_private_token = self._generate_token(token_data, key_id='newkey', private_key=instance_keys.local_private_key) + bad_private_token = self._generate_token(token_data, key_id='newkey', + private_key=instance_keys.local_private_key) with self.assertRaises(InvalidJWTException): self._parse_token(bad_private_token) # Try using a different key ID with the existing private key. - kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, private_key=private_key) + kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, + private_key=private_key) with self.assertRaises(InvalidJWTException): self._parse_token(kid_mismatch_token) diff --git a/auth/test/test_scopes.py b/auth/test/test_scopes.py index 4f03fae10..b71140136 100644 --- a/auth/test/test_scopes.py +++ b/auth/test/test_scopes.py @@ -1,32 +1,34 @@ import pytest -from auth.scopes import (scopes_from_scope_string, validate_scope_string, ALL_SCOPES, - is_subset_string) +from auth.scopes import ( + scopes_from_scope_string, validate_scope_string, ALL_SCOPES, is_subset_string) -@pytest.mark.parametrize('scopes_string, expected', [ - # Valid single scopes. - ('repo:read', ['repo:read']), - ('repo:admin', ['repo:admin']), - # Invalid scopes. - ('not:valid', []), - ('repo:admins', []), +@pytest.mark.parametrize( + 'scopes_string, expected', + [ + # Valid single scopes. + ('repo:read', ['repo:read']), + ('repo:admin', ['repo:admin']), - # Valid scope strings. - ('repo:read repo:admin', ['repo:read', 'repo:admin']), - ('repo:read,repo:admin', ['repo:read', 'repo:admin']), - ('repo:read,repo:admin repo:write', ['repo:read', 'repo:admin', 'repo:write']), + # Invalid scopes. + ('not:valid', []), + ('repo:admins', []), - # Partially invalid scopes. - ('repo:read,not:valid', []), - ('repo:read repo:admins', []), + # Valid scope strings. + ('repo:read repo:admin', ['repo:read', 'repo:admin']), + ('repo:read,repo:admin', ['repo:read', 'repo:admin']), + ('repo:read,repo:admin repo:write', ['repo:read', 'repo:admin', 'repo:write']), - # Invalid scope strings. - ('repo:read|repo:admin', []), + # Partially invalid scopes. + ('repo:read,not:valid', []), + ('repo:read repo:admins', []), - # Mixture of delimiters. - ('repo:read, repo:admin', []), -]) + # Invalid scope strings. + ('repo:read|repo:admin', []), + + # Mixture of delimiters. + ('repo:read, repo:admin', []),]) def test_parsing(scopes_string, expected): expected_scope_set = {ALL_SCOPES[scope_name] for scope_name in expected} parsed_scope_set = scopes_from_scope_string(scopes_string) @@ -40,11 +42,9 @@ def test_parsing(scopes_string, expected): ('repo:read,repo:admin', 'repo:read', True), ('repo:read,repo:admin', 'repo:admin', True), ('repo:read,repo:admin', 'repo:admin repo:read', True), - ('', 'repo:read', False), ('unknown:tag', 'repo:read', False), ('repo:read unknown:tag', 'repo:read', False), - ('repo:read,unknown:tag', 'repo:read', False), -]) + ('repo:read,unknown:tag', 'repo:read', False),]) def test_subset_string(superset, subset, result): assert is_subset_string(superset, subset) == result diff --git a/auth/test/test_signedgrant.py b/auth/test/test_signedgrant.py index 620a162ee..8eacbfac6 100644 --- a/auth/test/test_signedgrant.py +++ b/auth/test/test_signedgrant.py @@ -3,16 +3,18 @@ import pytest from auth.signedgrant import validate_signed_grant, generate_signed_token, SIGNATURE_PREFIX from auth.validateresult import AuthKind, ValidateResult + @pytest.mark.parametrize('header, expected_result', [ ('', ValidateResult(AuthKind.signed_grant, missing=True)), ('somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)), ('token somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)), - ('token ' + SIGNATURE_PREFIX + 'foo', ValidateResult(AuthKind.signed_grant, error_message='Signed grant could not be validated')), - - ('token ' + generate_signed_token({'a': 'b'}, {'c': 'd'}), - ValidateResult(AuthKind.signed_grant, signed_data={'grants': {'a': 'b'}, 'user_context': {'c': 'd'}})), -]) + ('token ' + generate_signed_token({ + 'a': 'b'}, {'c': 'd'}), ValidateResult(AuthKind.signed_grant, signed_data={ + 'grants': { + 'a': 'b'}, + 'user_context': { + 'c': 'd'}})),]) def test_token(header, expected_result): assert validate_signed_grant(header) == expected_result diff --git a/auth/test/test_validateresult.py b/auth/test/test_validateresult.py index 993075645..4b7d66c8f 100644 --- a/auth/test/test_validateresult.py +++ b/auth/test/test_validateresult.py @@ -1,7 +1,7 @@ import pytest -from auth.auth_context import (get_authenticated_user, get_grant_context, get_validated_token, - get_validated_oauth_token) +from auth.auth_context import ( + get_authenticated_user, get_grant_context, get_validated_token, get_validated_oauth_token) from auth.validateresult import AuthKind, ValidateResult from data import model from test.fixtures import * @@ -10,27 +10,31 @@ from test.fixtures import * def get_user(): return model.user.get_user('devtable') + def get_robot(): robot, _ = model.user.create_robot('somebot', get_user()) return robot + def get_token(): return model.token.create_delegate_token('devtable', 'simple', 'sometoken') + def get_oauthtoken(): user = model.user.get_user('devtable') return list(model.oauth.list_access_tokens_for_user(user))[0] + def get_signeddata(): return {'grants': {'a': 'b'}, 'user_context': {'c': 'd'}} + @pytest.mark.parametrize('get_entity,entity_kind', [ (get_user, 'user'), (get_robot, 'robot'), (get_token, 'token'), (get_oauthtoken, 'oauthtoken'), - (get_signeddata, 'signed_data'), -]) + (get_signeddata, 'signed_data'),]) def test_apply_context(get_entity, entity_kind, app): assert get_authenticated_user() is None assert get_validated_token() is None @@ -52,9 +56,9 @@ def test_apply_context(get_entity, entity_kind, app): expected_oauth = entity if entity_kind == 'oauthtoken' else None fake_grant = { - 'user': {'c': 'd'}, - 'kind': 'user', - } + 'user': { + 'c': 'd'}, + 'kind': 'user',} expected_grant = fake_grant if entity_kind == 'signed_data' else None assert get_authenticated_user() == expected_user