auth/test: yapf format
This commit is contained in:
parent
92877fa70f
commit
da4fb02423
9 changed files with 98 additions and 87 deletions
|
@ -8,9 +8,11 @@ from data import model
|
||||||
|
|
||||||
from test.fixtures import *
|
from test.fixtures import *
|
||||||
|
|
||||||
|
|
||||||
def _token(username, password):
|
def _token(username, password):
|
||||||
return 'basic ' + b64encode('%s:%s' % (username, password))
|
return 'basic ' + b64encode('%s:%s' % (username, password))
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('token, expected_result', [
|
@pytest.mark.parametrize('token, expected_result', [
|
||||||
('', ValidateResult(AuthKind.basic, missing=True)),
|
('', ValidateResult(AuthKind.basic, missing=True)),
|
||||||
('someinvalidtoken', ValidateResult(AuthKind.basic, missing=True)),
|
('someinvalidtoken', ValidateResult(AuthKind.basic, missing=True)),
|
||||||
|
@ -18,25 +20,18 @@ def _token(username, password):
|
||||||
('basic ', ValidateResult(AuthKind.basic, missing=True)),
|
('basic ', ValidateResult(AuthKind.basic, missing=True)),
|
||||||
('basic some token', ValidateResult(AuthKind.basic, missing=True)),
|
('basic some token', ValidateResult(AuthKind.basic, missing=True)),
|
||||||
('basic sometoken', ValidateResult(AuthKind.basic, missing=True)),
|
('basic sometoken', ValidateResult(AuthKind.basic, missing=True)),
|
||||||
|
(_token(ACCESS_TOKEN_USERNAME, 'invalid'), ValidateResult(AuthKind.basic,
|
||||||
(_token(ACCESS_TOKEN_USERNAME, 'invalid'),
|
error_message='Invalid access token')),
|
||||||
ValidateResult(AuthKind.basic, error_message='Invalid access token')),
|
|
||||||
|
|
||||||
(_token(OAUTH_TOKEN_USERNAME, 'invalid'),
|
(_token(OAUTH_TOKEN_USERNAME, 'invalid'),
|
||||||
ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),
|
ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),
|
||||||
|
(_token('devtable', 'invalid'), ValidateResult(AuthKind.basic,
|
||||||
(_token('devtable', 'invalid'),
|
error_message='Invalid Username or Password')),
|
||||||
ValidateResult(AuthKind.basic, error_message='Invalid Username or Password')),
|
(_token('devtable+somebot', 'invalid'), ValidateResult(
|
||||||
|
AuthKind.basic, error_message='Could not find robot with username: devtable+somebot ' +
|
||||||
(_token('devtable+somebot', 'invalid'),
|
'and supplied password.')),
|
||||||
ValidateResult(AuthKind.basic,
|
(_token('disabled', 'password'), ValidateResult(
|
||||||
error_message='Could not find robot with username: devtable+somebot ' +
|
AuthKind.basic,
|
||||||
'and supplied password.')),
|
error_message='This user has been disabled. Please contact your administrator.')),])
|
||||||
|
|
||||||
(_token('disabled', 'password'),
|
|
||||||
ValidateResult(AuthKind.basic,
|
|
||||||
error_message='This user has been disabled. Please contact your administrator.')),
|
|
||||||
])
|
|
||||||
def test_validate_basic_auth_token(token, expected_result, app):
|
def test_validate_basic_auth_token(token, expected_result, app):
|
||||||
result = validate_basic_auth(token)
|
result = validate_basic_auth(token)
|
||||||
assert result == expected_result
|
assert result == expected_result
|
||||||
|
|
|
@ -7,9 +7,11 @@ from data import model
|
||||||
from auth.cookie import validate_session_cookie
|
from auth.cookie import validate_session_cookie
|
||||||
from test.fixtures import *
|
from test.fixtures import *
|
||||||
|
|
||||||
|
|
||||||
def test_anonymous_cookie(app):
|
def test_anonymous_cookie(app):
|
||||||
assert validate_session_cookie().missing
|
assert validate_session_cookie().missing
|
||||||
|
|
||||||
|
|
||||||
def test_invalidformatted_cookie(app):
|
def test_invalidformatted_cookie(app):
|
||||||
# "Login" with a non-UUID reference.
|
# "Login" with a non-UUID reference.
|
||||||
someuser = model.user.get_user('devtable')
|
someuser = model.user.get_user('devtable')
|
||||||
|
@ -22,6 +24,7 @@ def test_invalidformatted_cookie(app):
|
||||||
assert not result.has_user
|
assert not result.has_user
|
||||||
assert result.error_message == 'Invalid session cookie format'
|
assert result.error_message == 'Invalid session cookie format'
|
||||||
|
|
||||||
|
|
||||||
def test_disabled_user(app):
|
def test_disabled_user(app):
|
||||||
# "Login" with a disabled user.
|
# "Login" with a disabled user.
|
||||||
someuser = model.user.get_user('disabled')
|
someuser = model.user.get_user('disabled')
|
||||||
|
@ -34,6 +37,7 @@ def test_disabled_user(app):
|
||||||
assert not result.has_user
|
assert not result.has_user
|
||||||
assert result.error_message == 'User account is disabled'
|
assert result.error_message == 'User account is disabled'
|
||||||
|
|
||||||
|
|
||||||
def test_valid_user(app):
|
def test_valid_user(app):
|
||||||
# Login with a valid user.
|
# Login with a valid user.
|
||||||
someuser = model.user.get_user('devtable')
|
someuser = model.user.get_user('devtable')
|
||||||
|
@ -45,6 +49,7 @@ def test_valid_user(app):
|
||||||
assert result.has_user
|
assert result.has_user
|
||||||
assert result.error_message is None
|
assert result.error_message is None
|
||||||
|
|
||||||
|
|
||||||
def test_valid_organization(app):
|
def test_valid_organization(app):
|
||||||
# "Login" with a valid organization.
|
# "Login" with a valid organization.
|
||||||
someorg = model.user.get_namespace_user('buynlarge')
|
someorg = model.user.get_namespace_user('buynlarge')
|
||||||
|
|
|
@ -6,11 +6,12 @@ from werkzeug.exceptions import HTTPException
|
||||||
|
|
||||||
from app import LoginWrappedDBUser
|
from app import LoginWrappedDBUser
|
||||||
from auth.auth_context import get_authenticated_user
|
from auth.auth_context import get_authenticated_user
|
||||||
from auth.decorators import (extract_namespace_repo_from_session, require_session_login,
|
from auth.decorators import (
|
||||||
process_auth_or_cookie)
|
extract_namespace_repo_from_session, require_session_login, process_auth_or_cookie)
|
||||||
from data import model
|
from data import model
|
||||||
from test.fixtures import *
|
from test.fixtures import *
|
||||||
|
|
||||||
|
|
||||||
def test_extract_namespace_repo_from_session_missing(app):
|
def test_extract_namespace_repo_from_session_missing(app):
|
||||||
def emptyfunc():
|
def emptyfunc():
|
||||||
pass
|
pass
|
||||||
|
@ -102,4 +103,3 @@ def test_process_auth_or_cookie_valid_user(app):
|
||||||
|
|
||||||
# Ensure the authenticated user was updated.
|
# Ensure the authenticated user was updated.
|
||||||
assert get_authenticated_user() == someuser
|
assert get_authenticated_user() == someuser
|
||||||
|
|
||||||
|
|
|
@ -5,17 +5,17 @@ from auth.validateresult import AuthKind, ValidateResult
|
||||||
from data import model
|
from data import model
|
||||||
from test.fixtures import *
|
from test.fixtures import *
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('header, expected_result', [
|
@pytest.mark.parametrize('header, expected_result', [
|
||||||
('', ValidateResult(AuthKind.oauth, missing=True)),
|
('', ValidateResult(AuthKind.oauth, missing=True)),
|
||||||
('somerandomtoken', ValidateResult(AuthKind.oauth, missing=True)),
|
('somerandomtoken', ValidateResult(AuthKind.oauth, missing=True)),
|
||||||
('bearer some random token', ValidateResult(AuthKind.oauth, missing=True)),
|
('bearer some random token', ValidateResult(AuthKind.oauth, missing=True)),
|
||||||
|
|
||||||
('bearer invalidtoken',
|
('bearer invalidtoken',
|
||||||
ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),
|
ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),])
|
||||||
])
|
|
||||||
def test_bearer(header, expected_result, app):
|
def test_bearer(header, expected_result, app):
|
||||||
assert validate_bearer_auth(header) == expected_result
|
assert validate_bearer_auth(header) == expected_result
|
||||||
|
|
||||||
|
|
||||||
def test_valid_oauth(app):
|
def test_valid_oauth(app):
|
||||||
user = model.user.get_user('devtable')
|
user = model.user.get_user('devtable')
|
||||||
token = list(model.oauth.list_access_tokens_for_user(user))[0]
|
token = list(model.oauth.list_access_tokens_for_user(user))[0]
|
||||||
|
@ -25,6 +25,7 @@ def test_valid_oauth(app):
|
||||||
assert result.authed_user == user
|
assert result.authed_user == user
|
||||||
assert result.auth_valid
|
assert result.auth_valid
|
||||||
|
|
||||||
|
|
||||||
def test_disabled_user_oauth(app):
|
def test_disabled_user_oauth(app):
|
||||||
user = model.user.get_user('disabled')
|
user = model.user.get_user('disabled')
|
||||||
token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin',
|
token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin',
|
||||||
|
@ -36,6 +37,7 @@ def test_disabled_user_oauth(app):
|
||||||
assert not result.auth_valid
|
assert not result.auth_valid
|
||||||
assert result.error_message == 'Granter of the oauth access token is disabled'
|
assert result.error_message == 'Granter of the oauth access token is disabled'
|
||||||
|
|
||||||
|
|
||||||
def test_expired_token(app):
|
def test_expired_token(app):
|
||||||
user = model.user.get_user('devtable')
|
user = model.user.get_user('devtable')
|
||||||
token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin',
|
token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin',
|
||||||
|
|
|
@ -9,7 +9,6 @@ from initdb import setup_database_for_testing, finished_database_for_testing
|
||||||
from util.morecollections import AttrDict
|
from util.morecollections import AttrDict
|
||||||
from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject
|
from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject
|
||||||
|
|
||||||
|
|
||||||
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
|
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
|
||||||
TEST_USER = AttrDict({'username': 'joeuser'})
|
TEST_USER = AttrDict({'username': 'joeuser'})
|
||||||
MAX_SIGNED_S = 3660
|
MAX_SIGNED_S = 3660
|
||||||
|
@ -22,8 +21,7 @@ def _access(typ='repository', name='somens/somerepo', actions=None):
|
||||||
return [{
|
return [{
|
||||||
'type': typ,
|
'type': typ,
|
||||||
'name': name,
|
'name': name,
|
||||||
'actions': actions,
|
'actions': actions,}]
|
||||||
}]
|
|
||||||
|
|
||||||
|
|
||||||
def _delete_field(token_data, field_name):
|
def _delete_field(token_data, field_name):
|
||||||
|
@ -31,8 +29,8 @@ def _delete_field(token_data, field_name):
|
||||||
return token_data
|
return token_data
|
||||||
|
|
||||||
|
|
||||||
def _token_data(access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None, exp=None,
|
def _token_data(access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None,
|
||||||
nbf=None, iss=None, subject=None):
|
exp=None, nbf=None, iss=None, subject=None):
|
||||||
if subject is None:
|
if subject is None:
|
||||||
_, subject = build_context_and_subject(user=user)
|
_, subject = build_context_and_subject(user=user)
|
||||||
return {
|
return {
|
||||||
|
@ -43,8 +41,7 @@ def _token_data(access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER,
|
||||||
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
|
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
|
||||||
'sub': subject,
|
'sub': subject,
|
||||||
'access': access,
|
'access': access,
|
||||||
'context': context,
|
'context': context,}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _token(token_data, key_id=None, private_key=None, skip_header=False, alg=None):
|
def _token(token_data, key_id=None, private_key=None, skip_header=False, alg=None):
|
||||||
|
@ -70,12 +67,14 @@ def _parse_token(token):
|
||||||
def test_accepted_token():
|
def test_accepted_token():
|
||||||
token = _token(_token_data())
|
token = _token(_token_data())
|
||||||
identity = _parse_token(token)
|
identity = _parse_token(token)
|
||||||
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, identity.id)
|
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username,
|
||||||
|
identity.id)
|
||||||
assert len(identity.provides) == 0
|
assert len(identity.provides) == 0
|
||||||
|
|
||||||
anon_token = _token(_token_data(user=None))
|
anon_token = _token(_token_data(user=None))
|
||||||
anon_identity = _parse_token(anon_token)
|
anon_identity = _parse_token(anon_token)
|
||||||
assert anon_identity.id == ANONYMOUS_SUB, 'should be %s, but was %s' % (ANONYMOUS_SUB, anon_identity.id)
|
assert anon_identity.id == ANONYMOUS_SUB, 'should be %s, but was %s' % (ANONYMOUS_SUB,
|
||||||
|
anon_identity.id)
|
||||||
assert len(identity.provides) == 0
|
assert len(identity.provides) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@ -84,12 +83,12 @@ def test_accepted_token():
|
||||||
(_access(actions=['pull', '*'])),
|
(_access(actions=['pull', '*'])),
|
||||||
(_access(actions=['*', 'push'])),
|
(_access(actions=['*', 'push'])),
|
||||||
(_access(actions=['*'])),
|
(_access(actions=['*'])),
|
||||||
(_access(actions=['pull', '*', 'push'])),
|
(_access(actions=['pull', '*', 'push'])),])
|
||||||
])
|
|
||||||
def test_token_with_access(access):
|
def test_token_with_access(access):
|
||||||
token = _token(_token_data(access=access))
|
token = _token(_token_data(access=access))
|
||||||
identity = _parse_token(token)
|
identity = _parse_token(token)
|
||||||
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, identity.id)
|
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username,
|
||||||
|
identity.id)
|
||||||
assert len(identity.provides) == 1
|
assert len(identity.provides) == 1
|
||||||
|
|
||||||
role = list(identity.provides)[0][3]
|
role = list(identity.provides)[0][3]
|
||||||
|
@ -102,7 +101,11 @@ def test_token_with_access(access):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('token', [
|
@pytest.mark.parametrize('token', [
|
||||||
(_token(_token_data(access=[{'toipe': 'repository', 'namesies': 'somens/somerepo', 'akshuns': ['pull', 'push', '*']}]))),
|
(_token(
|
||||||
|
_token_data(access=[{
|
||||||
|
'toipe': 'repository',
|
||||||
|
'namesies': 'somens/somerepo',
|
||||||
|
'akshuns': ['pull', 'push', '*']}]))),
|
||||||
(_token(_token_data(audience='someotherapp'))),
|
(_token(_token_data(audience='someotherapp'))),
|
||||||
(_token(_delete_field(_token_data(), 'aud'))),
|
(_token(_delete_field(_token_data(), 'aud'))),
|
||||||
(_token(_token_data(nbf=int(time.time()) + 60))),
|
(_token(_token_data(nbf=int(time.time()) + 60))),
|
||||||
|
@ -121,8 +124,7 @@ def test_token_with_access(access):
|
||||||
(_token(_token_data(), alg='none', private_key=None)),
|
(_token(_token_data(), alg='none', private_key=None)),
|
||||||
('some random token'),
|
('some random token'),
|
||||||
('Bearer: sometokenhere'),
|
('Bearer: sometokenhere'),
|
||||||
('\nBearer: dGVzdA'),
|
('\nBearer: dGVzdA'),])
|
||||||
])
|
|
||||||
def test_invalid_jwt(token):
|
def test_invalid_jwt(token):
|
||||||
with pytest.raises(InvalidJWTException):
|
with pytest.raises(InvalidJWTException):
|
||||||
_parse_token(token)
|
_parse_token(token)
|
||||||
|
|
|
@ -11,7 +11,6 @@ from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTExcepti
|
||||||
from util.morecollections import AttrDict
|
from util.morecollections import AttrDict
|
||||||
from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject)
|
from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject)
|
||||||
|
|
||||||
|
|
||||||
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
|
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
|
||||||
TEST_USER = AttrDict({'username': 'joeuser'})
|
TEST_USER = AttrDict({'username': 'joeuser'})
|
||||||
MAX_SIGNED_S = 3660
|
MAX_SIGNED_S = 3660
|
||||||
|
@ -24,8 +23,8 @@ class TestRegistryV2Auth(unittest.TestCase):
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
finished_database_for_testing(self)
|
finished_database_for_testing(self)
|
||||||
|
|
||||||
def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None,
|
def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER,
|
||||||
exp=None, nbf=None, iss=None):
|
iat=None, exp=None, nbf=None, iss=None):
|
||||||
|
|
||||||
_, subject = build_context_and_subject(user=user)
|
_, subject = build_context_and_subject(user=user)
|
||||||
return {
|
return {
|
||||||
|
@ -36,10 +35,10 @@ class TestRegistryV2Auth(unittest.TestCase):
|
||||||
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
|
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
|
||||||
'sub': subject,
|
'sub': subject,
|
||||||
'access': access,
|
'access': access,
|
||||||
'context': context,
|
'context': context,}
|
||||||
}
|
|
||||||
|
|
||||||
def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, alg=None):
|
def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False,
|
||||||
|
alg=None):
|
||||||
key_id = key_id or instance_keys.local_key_id
|
key_id = key_id or instance_keys.local_key_id
|
||||||
private_key = private_key or instance_keys.local_private_key
|
private_key = private_key or instance_keys.local_private_key
|
||||||
|
|
||||||
|
@ -47,8 +46,7 @@ class TestRegistryV2Auth(unittest.TestCase):
|
||||||
private_key = None
|
private_key = None
|
||||||
|
|
||||||
token_headers = {
|
token_headers = {
|
||||||
'kid': key_id,
|
'kid': key_id,}
|
||||||
}
|
|
||||||
|
|
||||||
if skip_header:
|
if skip_header:
|
||||||
token_headers = {}
|
token_headers = {}
|
||||||
|
@ -63,13 +61,14 @@ class TestRegistryV2Auth(unittest.TestCase):
|
||||||
token_data = self._generate_token_data()
|
token_data = self._generate_token_data()
|
||||||
|
|
||||||
# Create a new key for testing.
|
# Create a new key for testing.
|
||||||
p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey',
|
p, key = model.service_keys.generate_service_key(instance_keys.service_name, None,
|
||||||
name='newkey', metadata={})
|
kid='newkey', name='newkey', metadata={})
|
||||||
|
|
||||||
private_key = p.exportKey('PEM')
|
private_key = p.exportKey('PEM')
|
||||||
|
|
||||||
# Test first with the new valid, but unapproved key.
|
# Test first with the new valid, but unapproved key.
|
||||||
unapproved_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
|
unapproved_key_token = self._generate_token(token_data, key_id='newkey',
|
||||||
|
private_key=private_key)
|
||||||
with self.assertRaises(InvalidJWTException):
|
with self.assertRaises(InvalidJWTException):
|
||||||
self._parse_token(unapproved_key_token)
|
self._parse_token(unapproved_key_token)
|
||||||
|
|
||||||
|
@ -84,12 +83,14 @@ class TestRegistryV2Auth(unittest.TestCase):
|
||||||
self.assertEqual(0, len(identity.provides))
|
self.assertEqual(0, len(identity.provides))
|
||||||
|
|
||||||
# Try using a different private key with the existing key ID.
|
# Try using a different private key with the existing key ID.
|
||||||
bad_private_token = self._generate_token(token_data, key_id='newkey', private_key=instance_keys.local_private_key)
|
bad_private_token = self._generate_token(token_data, key_id='newkey',
|
||||||
|
private_key=instance_keys.local_private_key)
|
||||||
with self.assertRaises(InvalidJWTException):
|
with self.assertRaises(InvalidJWTException):
|
||||||
self._parse_token(bad_private_token)
|
self._parse_token(bad_private_token)
|
||||||
|
|
||||||
# Try using a different key ID with the existing private key.
|
# Try using a different key ID with the existing private key.
|
||||||
kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, private_key=private_key)
|
kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id,
|
||||||
|
private_key=private_key)
|
||||||
with self.assertRaises(InvalidJWTException):
|
with self.assertRaises(InvalidJWTException):
|
||||||
self._parse_token(kid_mismatch_token)
|
self._parse_token(kid_mismatch_token)
|
||||||
|
|
||||||
|
|
|
@ -1,32 +1,34 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from auth.scopes import (scopes_from_scope_string, validate_scope_string, ALL_SCOPES,
|
from auth.scopes import (
|
||||||
is_subset_string)
|
scopes_from_scope_string, validate_scope_string, ALL_SCOPES, is_subset_string)
|
||||||
|
|
||||||
@pytest.mark.parametrize('scopes_string, expected', [
|
|
||||||
# Valid single scopes.
|
|
||||||
('repo:read', ['repo:read']),
|
|
||||||
('repo:admin', ['repo:admin']),
|
|
||||||
|
|
||||||
# Invalid scopes.
|
@pytest.mark.parametrize(
|
||||||
('not:valid', []),
|
'scopes_string, expected',
|
||||||
('repo:admins', []),
|
[
|
||||||
|
# Valid single scopes.
|
||||||
|
('repo:read', ['repo:read']),
|
||||||
|
('repo:admin', ['repo:admin']),
|
||||||
|
|
||||||
# Valid scope strings.
|
# Invalid scopes.
|
||||||
('repo:read repo:admin', ['repo:read', 'repo:admin']),
|
('not:valid', []),
|
||||||
('repo:read,repo:admin', ['repo:read', 'repo:admin']),
|
('repo:admins', []),
|
||||||
('repo:read,repo:admin repo:write', ['repo:read', 'repo:admin', 'repo:write']),
|
|
||||||
|
|
||||||
# Partially invalid scopes.
|
# Valid scope strings.
|
||||||
('repo:read,not:valid', []),
|
('repo:read repo:admin', ['repo:read', 'repo:admin']),
|
||||||
('repo:read repo:admins', []),
|
('repo:read,repo:admin', ['repo:read', 'repo:admin']),
|
||||||
|
('repo:read,repo:admin repo:write', ['repo:read', 'repo:admin', 'repo:write']),
|
||||||
|
|
||||||
# Invalid scope strings.
|
# Partially invalid scopes.
|
||||||
('repo:read|repo:admin', []),
|
('repo:read,not:valid', []),
|
||||||
|
('repo:read repo:admins', []),
|
||||||
|
|
||||||
# Mixture of delimiters.
|
# Invalid scope strings.
|
||||||
('repo:read, repo:admin', []),
|
('repo:read|repo:admin', []),
|
||||||
])
|
|
||||||
|
# Mixture of delimiters.
|
||||||
|
('repo:read, repo:admin', []),])
|
||||||
def test_parsing(scopes_string, expected):
|
def test_parsing(scopes_string, expected):
|
||||||
expected_scope_set = {ALL_SCOPES[scope_name] for scope_name in expected}
|
expected_scope_set = {ALL_SCOPES[scope_name] for scope_name in expected}
|
||||||
parsed_scope_set = scopes_from_scope_string(scopes_string)
|
parsed_scope_set = scopes_from_scope_string(scopes_string)
|
||||||
|
@ -40,11 +42,9 @@ def test_parsing(scopes_string, expected):
|
||||||
('repo:read,repo:admin', 'repo:read', True),
|
('repo:read,repo:admin', 'repo:read', True),
|
||||||
('repo:read,repo:admin', 'repo:admin', True),
|
('repo:read,repo:admin', 'repo:admin', True),
|
||||||
('repo:read,repo:admin', 'repo:admin repo:read', True),
|
('repo:read,repo:admin', 'repo:admin repo:read', True),
|
||||||
|
|
||||||
('', 'repo:read', False),
|
('', 'repo:read', False),
|
||||||
('unknown:tag', 'repo:read', False),
|
('unknown:tag', 'repo:read', False),
|
||||||
('repo:read unknown:tag', 'repo:read', False),
|
('repo:read unknown:tag', 'repo:read', False),
|
||||||
('repo:read,unknown:tag', 'repo:read', False),
|
('repo:read,unknown:tag', 'repo:read', False),])
|
||||||
])
|
|
||||||
def test_subset_string(superset, subset, result):
|
def test_subset_string(superset, subset, result):
|
||||||
assert is_subset_string(superset, subset) == result
|
assert is_subset_string(superset, subset) == result
|
||||||
|
|
|
@ -3,16 +3,18 @@ import pytest
|
||||||
from auth.signedgrant import validate_signed_grant, generate_signed_token, SIGNATURE_PREFIX
|
from auth.signedgrant import validate_signed_grant, generate_signed_token, SIGNATURE_PREFIX
|
||||||
from auth.validateresult import AuthKind, ValidateResult
|
from auth.validateresult import AuthKind, ValidateResult
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('header, expected_result', [
|
@pytest.mark.parametrize('header, expected_result', [
|
||||||
('', ValidateResult(AuthKind.signed_grant, missing=True)),
|
('', ValidateResult(AuthKind.signed_grant, missing=True)),
|
||||||
('somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)),
|
('somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)),
|
||||||
('token somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)),
|
('token somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)),
|
||||||
|
|
||||||
('token ' + SIGNATURE_PREFIX + 'foo',
|
('token ' + SIGNATURE_PREFIX + 'foo',
|
||||||
ValidateResult(AuthKind.signed_grant, error_message='Signed grant could not be validated')),
|
ValidateResult(AuthKind.signed_grant, error_message='Signed grant could not be validated')),
|
||||||
|
('token ' + generate_signed_token({
|
||||||
('token ' + generate_signed_token({'a': 'b'}, {'c': 'd'}),
|
'a': 'b'}, {'c': 'd'}), ValidateResult(AuthKind.signed_grant, signed_data={
|
||||||
ValidateResult(AuthKind.signed_grant, signed_data={'grants': {'a': 'b'}, 'user_context': {'c': 'd'}})),
|
'grants': {
|
||||||
])
|
'a': 'b'},
|
||||||
|
'user_context': {
|
||||||
|
'c': 'd'}})),])
|
||||||
def test_token(header, expected_result):
|
def test_token(header, expected_result):
|
||||||
assert validate_signed_grant(header) == expected_result
|
assert validate_signed_grant(header) == expected_result
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from auth.auth_context import (get_authenticated_user, get_grant_context, get_validated_token,
|
from auth.auth_context import (
|
||||||
get_validated_oauth_token)
|
get_authenticated_user, get_grant_context, get_validated_token, get_validated_oauth_token)
|
||||||
from auth.validateresult import AuthKind, ValidateResult
|
from auth.validateresult import AuthKind, ValidateResult
|
||||||
from data import model
|
from data import model
|
||||||
from test.fixtures import *
|
from test.fixtures import *
|
||||||
|
@ -10,27 +10,31 @@ from test.fixtures import *
|
||||||
def get_user():
|
def get_user():
|
||||||
return model.user.get_user('devtable')
|
return model.user.get_user('devtable')
|
||||||
|
|
||||||
|
|
||||||
def get_robot():
|
def get_robot():
|
||||||
robot, _ = model.user.create_robot('somebot', get_user())
|
robot, _ = model.user.create_robot('somebot', get_user())
|
||||||
return robot
|
return robot
|
||||||
|
|
||||||
|
|
||||||
def get_token():
|
def get_token():
|
||||||
return model.token.create_delegate_token('devtable', 'simple', 'sometoken')
|
return model.token.create_delegate_token('devtable', 'simple', 'sometoken')
|
||||||
|
|
||||||
|
|
||||||
def get_oauthtoken():
|
def get_oauthtoken():
|
||||||
user = model.user.get_user('devtable')
|
user = model.user.get_user('devtable')
|
||||||
return list(model.oauth.list_access_tokens_for_user(user))[0]
|
return list(model.oauth.list_access_tokens_for_user(user))[0]
|
||||||
|
|
||||||
|
|
||||||
def get_signeddata():
|
def get_signeddata():
|
||||||
return {'grants': {'a': 'b'}, 'user_context': {'c': 'd'}}
|
return {'grants': {'a': 'b'}, 'user_context': {'c': 'd'}}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('get_entity,entity_kind', [
|
@pytest.mark.parametrize('get_entity,entity_kind', [
|
||||||
(get_user, 'user'),
|
(get_user, 'user'),
|
||||||
(get_robot, 'robot'),
|
(get_robot, 'robot'),
|
||||||
(get_token, 'token'),
|
(get_token, 'token'),
|
||||||
(get_oauthtoken, 'oauthtoken'),
|
(get_oauthtoken, 'oauthtoken'),
|
||||||
(get_signeddata, 'signed_data'),
|
(get_signeddata, 'signed_data'),])
|
||||||
])
|
|
||||||
def test_apply_context(get_entity, entity_kind, app):
|
def test_apply_context(get_entity, entity_kind, app):
|
||||||
assert get_authenticated_user() is None
|
assert get_authenticated_user() is None
|
||||||
assert get_validated_token() is None
|
assert get_validated_token() is None
|
||||||
|
@ -52,9 +56,9 @@ def test_apply_context(get_entity, entity_kind, app):
|
||||||
expected_oauth = entity if entity_kind == 'oauthtoken' else None
|
expected_oauth = entity if entity_kind == 'oauthtoken' else None
|
||||||
|
|
||||||
fake_grant = {
|
fake_grant = {
|
||||||
'user': {'c': 'd'},
|
'user': {
|
||||||
'kind': 'user',
|
'c': 'd'},
|
||||||
}
|
'kind': 'user',}
|
||||||
expected_grant = fake_grant if entity_kind == 'signed_data' else None
|
expected_grant = fake_grant if entity_kind == 'signed_data' else None
|
||||||
|
|
||||||
assert get_authenticated_user() == expected_user
|
assert get_authenticated_user() == expected_user
|
||||||
|
|
Reference in a new issue