parent
1cf930eb9c
commit
d0dc8fe45d
5 changed files with 416 additions and 64 deletions
110
test/test_v2_endpoint_security.py
Normal file
110
test/test_v2_endpoint_security.py
Normal file
|
@ -0,0 +1,110 @@
|
|||
import unittest
|
||||
import endpoints.decorated
|
||||
import json
|
||||
|
||||
from app import app
|
||||
from util.names import parse_namespace_repository
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from specs import build_v2_index_specs
|
||||
from endpoints.v2 import v2_bp
|
||||
|
||||
app.register_blueprint(v2_bp, url_prefix='/v2')
|
||||
|
||||
NO_ACCESS_USER = 'freshuser'
|
||||
READ_ACCESS_USER = 'reader'
|
||||
ADMIN_ACCESS_USER = 'devtable'
|
||||
|
||||
|
||||
class EndpointTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
setup_database_for_testing(self)
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
|
||||
|
||||
class _SpecTestBuilder(type):
|
||||
@staticmethod
|
||||
def _test_generator(url, test_spec, attrs):
|
||||
def test(self):
|
||||
with app.test_client() as c:
|
||||
headers = []
|
||||
expected_index_status = getattr(test_spec, attrs['result_attr'])
|
||||
|
||||
if attrs['auth_username']:
|
||||
expected_auth_status = getattr(test_spec, 'auth_' + attrs['result_attr'])
|
||||
|
||||
# Get a signed JWT.
|
||||
username = attrs['auth_username']
|
||||
password = 'password'
|
||||
|
||||
jwt_scope = test_spec.get_scope_string()
|
||||
query_string = 'service=' + app.config['SERVER_HOSTNAME'] + '&scope=' + jwt_scope
|
||||
|
||||
arv = c.open('/v2/auth',
|
||||
headers=[('authorization', test_spec.gen_basic_auth(username, password))],
|
||||
query_string=query_string)
|
||||
|
||||
msg = 'Auth failed for %s %s: got %s, expected: %s' % (
|
||||
test_spec.method_name, test_spec.index_name, arv.status_code, expected_auth_status)
|
||||
self.assertEqual(arv.status_code, expected_auth_status, msg)
|
||||
|
||||
if arv.status_code == 200:
|
||||
headers = [('authorization', 'Bearer ' + json.loads(arv.data)['token'])]
|
||||
|
||||
rv = c.open(url, headers=headers, method=test_spec.method_name)
|
||||
msg = '%s %s: got %s, expected: %s (auth: %s | headers %s)' % (test_spec.method_name,
|
||||
test_spec.index_name, rv.status_code, expected_index_status, attrs['auth_username'],
|
||||
len(headers))
|
||||
|
||||
self.assertEqual(rv.status_code, expected_index_status, msg)
|
||||
|
||||
return test
|
||||
|
||||
|
||||
def __new__(cls, name, bases, attrs):
|
||||
with app.test_request_context() as ctx:
|
||||
specs = attrs['spec_func']()
|
||||
for test_spec in specs:
|
||||
test_name = '%s_%s_%s_%s_%s' % (test_spec.index_name, test_spec.method_name,
|
||||
test_spec.repo_name, attrs['auth_username'] or 'anon',
|
||||
attrs['result_attr'])
|
||||
test_name = test_name.replace('/', '_').replace('-', '_')
|
||||
|
||||
test_name = 'test_' + test_name.lower().replace('v2.', 'v2_')
|
||||
url = test_spec.get_url()
|
||||
attrs[test_name] = _SpecTestBuilder._test_generator(url, test_spec, attrs)
|
||||
|
||||
return type(name, bases, attrs)
|
||||
|
||||
|
||||
class TestAnonymousAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_v2_index_specs
|
||||
result_attr = 'anon_code'
|
||||
auth_username = None
|
||||
|
||||
|
||||
class TestNoAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_v2_index_specs
|
||||
result_attr = 'no_access_code'
|
||||
auth_username = NO_ACCESS_USER
|
||||
|
||||
|
||||
class TestReadAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_v2_index_specs
|
||||
result_attr = 'read_code'
|
||||
auth_username = READ_ACCESS_USER
|
||||
|
||||
|
||||
class TestAdminAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_v2_index_specs
|
||||
result_attr = 'admin_code'
|
||||
auth_username = ADMIN_ACCESS_USER
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Reference in a new issue