From 8ac20edfb2719ea9bf124afb54d7d971931da37e Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 28 Jun 2017 12:48:02 +0300 Subject: [PATCH] Move verbs security tests into pytest style --- endpoints/test/shared.py | 15 +++- endpoints/verbs/test/test_security.py | 69 ++++++++++++++++++ test/fixtures.py | 2 + test/specs.py | 97 ------------------------- test/test_verbs_endpoint_security.py | 100 -------------------------- 5 files changed, 85 insertions(+), 198 deletions(-) create mode 100644 endpoints/verbs/test/test_security.py delete mode 100644 test/test_verbs_endpoint_security.py diff --git a/endpoints/test/shared.py b/endpoints/test/shared.py index baf7de18f..abb22ded9 100644 --- a/endpoints/test/shared.py +++ b/endpoints/test/shared.py @@ -1,9 +1,13 @@ import datetime import json +import base64 from contextlib import contextmanager from data import model +from flask import g +from flask_principal import Identity + CSRF_TOKEN_KEY = '_csrf_token' CSRF_TOKEN = '123csrfforme' @@ -36,7 +40,13 @@ def add_csrf_param(params): return params -def conduct_call(client, resource, url_for, method, params, body=None, expected_code=200, headers=None): +def gen_basic_auth(username, password): + """ Generates a basic auth header. """ + return 'Basic ' + base64.b64encode("%s:%s" % (username, password)) + + +def conduct_call(client, resource, url_for, method, params, body=None, expected_code=200, + headers=None): """ Conducts a call to a Flask endpoint. """ params = add_csrf_param(params) @@ -48,6 +58,9 @@ def conduct_call(client, resource, url_for, method, params, body=None, expected_ if body is not None: body = json.dumps(body) + # Required for anonymous calls to not exception. + g.identity = Identity(None, 'none') + rv = client.open(final_url, method=method, data=body, headers=headers) msg = '%s %s: got %s expected: %s | %s' % (method, final_url, rv.status_code, expected_code, rv.data) diff --git a/endpoints/verbs/test/test_security.py b/endpoints/verbs/test/test_security.py new file mode 100644 index 000000000..5e53c68a8 --- /dev/null +++ b/endpoints/verbs/test/test_security.py @@ -0,0 +1,69 @@ +import pytest + +from flask import url_for +from endpoints.test.shared import conduct_call, gen_basic_auth +from test.fixtures import * + +NO_ACCESS_USER = 'freshuser' +READ_ACCESS_USER = 'reader' +ADMIN_ACCESS_USER = 'devtable' +CREATOR_ACCESS_USER = 'creator' + +PUBLIC_REPO = 'public/publicrepo' +PRIVATE_REPO = 'devtable/shared' +ORG_REPO = 'buynlarge/orgrepo' +ANOTHER_ORG_REPO = 'buynlarge/anotherorgrepo' + +ACI_ARGS = { + 'server': 'someserver', + 'tag': 'fake', + 'os': 'linux', + 'arch': 'x64', +} + +@pytest.mark.parametrize('user', [ + (0, None), + (1, NO_ACCESS_USER), + (2, READ_ACCESS_USER), + (3, CREATOR_ACCESS_USER), + (4, ADMIN_ACCESS_USER), +]) +@pytest.mark.parametrize('endpoint,method,repository,single_repo_path,params,expected_statuses', [ + ('get_aci_signature', 'GET', PUBLIC_REPO, False, ACI_ARGS, (404, 404, 404, 404, 404)), + ('get_aci_signature', 'GET', PRIVATE_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)), + ('get_aci_signature', 'GET', ORG_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)), + ('get_aci_signature', 'GET', ANOTHER_ORG_REPO, False, ACI_ARGS, (403, 403, 403, 403, 404)), + + # get_aci_image + ('get_aci_image', 'GET', PUBLIC_REPO, False, ACI_ARGS, (404, 404, 404, 404, 404)), + ('get_aci_image', 'GET', PRIVATE_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)), + ('get_aci_image', 'GET', ORG_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)), + ('get_aci_image', 'GET', ANOTHER_ORG_REPO, False, ACI_ARGS, (403, 403, 403, 403, 404)), + + # get_squashed_tag + ('get_squashed_tag', 'GET', PUBLIC_REPO, False, dict(tag='fake'), (404, 404, 404, 404, 404)), + ('get_squashed_tag', 'GET', PRIVATE_REPO, False, dict(tag='fake'), (403, 403, 404, 403, 404)), + ('get_squashed_tag', 'GET', ORG_REPO, False, dict(tag='fake'), (403, 403, 404, 403, 404)), + ('get_squashed_tag', 'GET', ANOTHER_ORG_REPO, False, dict(tag='fake'), (403, 403, 403, 403, 404)), + + # get_tag_torrent + ('get_tag_torrent', 'GET', PUBLIC_REPO, True, dict(digest='sha256:1234'), (404, 404, 404, 404, 404)), + ('get_tag_torrent', 'GET', PRIVATE_REPO, True, dict(digest='sha256:1234'), (403, 403, 404, 403, 404)), + ('get_tag_torrent', 'GET', ORG_REPO, True, dict(digest='sha256:1234'), (403, 403, 404, 403, 404)), + ('get_tag_torrent', 'GET', ANOTHER_ORG_REPO, True, dict(digest='sha256:1234'), (403, 403, 403, 403, 404)), +]) +def test_verbs_security(user, endpoint, method, repository, single_repo_path, params, + expected_statuses, app, client): + headers = {} + if user[1] is not None: + headers['Authorization'] = gen_basic_auth(user[1], 'password') + + if single_repo_path: + params['repository'] = repository + else: + (namespace, repo_name) = repository.split('/') + params['namespace'] = namespace + params['repository'] = repo_name + + conduct_call(client, 'verbs.' + endpoint, url_for, method, params, + expected_code=expected_statuses[user[0]], headers=headers) diff --git a/test/fixtures.py b/test/fixtures.py index bee8199e1..c1f9e3b74 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -15,6 +15,7 @@ from data.model.user import LoginWrappedDBUser from endpoints.api import api_bp from endpoints.appr import appr_bp from endpoints.web import web +from endpoints.verbs import verbs as verbs_bp from initdb import initialize_database, populate_database @@ -166,6 +167,7 @@ def app(appconfig, initialized_db): app.register_blueprint(api_bp, url_prefix='/api') app.register_blueprint(appr_bp, url_prefix='/cnr') app.register_blueprint(web, url_prefix='/') + app.register_blueprint(verbs_bp, url_prefix='/c1') app.config.update(appconfig) return app diff --git a/test/specs.py b/test/specs.py index d7bb79061..54d6d8c64 100644 --- a/test/specs.py +++ b/test/specs.py @@ -509,100 +509,3 @@ def build_v2_index_specs(): request_status(401, 401, 401, 401, 404), ] - -class VerbTestSpec(object): - def __init__(self, index_name, method_name, repo_name, rpath=False, **kwargs): - self.index_name = index_name - self.repo_name = repo_name - self.method_name = method_name - self.single_repository_path = rpath - - self.kwargs = kwargs - - self.anon_code = 401 - self.no_access_code = 403 - self.read_code = 200 - self.admin_code = 200 - self.creator_code = 200 - - def request_status(self, anon_code=401, no_access_code=403, read_code=200, creator_code=200, - admin_code=200): - self.anon_code = anon_code - self.no_access_code = no_access_code - self.read_code = read_code - self.creator_code = creator_code - self.admin_code = admin_code - return self - - def get_url(self): - if self.single_repository_path: - return url_for(self.index_name, repository=self.repo_name, **self.kwargs) - else: - (namespace, repo_name) = self.repo_name.split('/') - return url_for(self.index_name, namespace=namespace, repository=repo_name, **self.kwargs) - - def gen_basic_auth(self, username, password): - encoded = b64encode('%s:%s' % (username, password)) - return 'basic %s' % encoded - -ACI_ARGS = { - 'server': 'someserver', - 'tag': 'fake', - 'os': 'linux', - 'arch': 'x64', -} - -def build_verbs_specs(): - return [ - # get_aci_signature - VerbTestSpec('verbs.get_aci_signature', 'GET', PUBLIC_REPO, **ACI_ARGS). - request_status(404, 404, 404, 404, 404), - - VerbTestSpec('verbs.get_aci_signature', 'GET', PRIVATE_REPO, **ACI_ARGS). - request_status(403, 403, 404, 403, 404), - - VerbTestSpec('verbs.get_aci_signature', 'GET', ORG_REPO, **ACI_ARGS). - request_status(403, 403, 404, 403, 404), - - VerbTestSpec('verbs.get_aci_signature', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS). - request_status(403, 403, 403, 403, 404), - - # get_aci_image - VerbTestSpec('verbs.get_aci_image', 'GET', PUBLIC_REPO, **ACI_ARGS). - request_status(404, 404, 404, 404, 404), - - VerbTestSpec('verbs.get_aci_image', 'GET', PRIVATE_REPO, **ACI_ARGS). - request_status(403, 403, 404, 403, 404), - - VerbTestSpec('verbs.get_aci_image', 'GET', ORG_REPO, **ACI_ARGS). - request_status(403, 403, 404, 403, 404), - - VerbTestSpec('verbs.get_aci_image', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS). - request_status(403, 403, 403, 403, 404), - - # get_squashed_tag - VerbTestSpec('verbs.get_squashed_tag', 'GET', PUBLIC_REPO, tag='fake'). - request_status(404, 404, 404, 404, 404), - - VerbTestSpec('verbs.get_squashed_tag', 'GET', PRIVATE_REPO, tag='fake'). - request_status(403, 403, 404, 403, 404), - - VerbTestSpec('verbs.get_squashed_tag', 'GET', ORG_REPO, tag='fake'). - request_status(403, 403, 404, 403, 404), - - VerbTestSpec('verbs.get_squashed_tag', 'GET', ANOTHER_ORG_REPO, tag='fake'). - request_status(403, 403, 403, 403, 404), - - # get_tag_torrent - VerbTestSpec('verbs.get_tag_torrent', 'GET', PUBLIC_REPO, digest='sha256:1234', rpath=True). - request_status(404, 404, 404, 404, 404), - - VerbTestSpec('verbs.get_tag_torrent', 'GET', PRIVATE_REPO, digest='sha256:1234', rpath=True). - request_status(403, 403, 404, 403, 404), - - VerbTestSpec('verbs.get_tag_torrent', 'GET', ORG_REPO, digest='sha256:1234', rpath=True). - request_status(403, 403, 404, 403, 404), - - VerbTestSpec('verbs.get_tag_torrent', 'GET', ANOTHER_ORG_REPO, digest='sha256:1234', rpath=True). - request_status(403, 403, 403, 403, 404), - ] diff --git a/test/test_verbs_endpoint_security.py b/test/test_verbs_endpoint_security.py deleted file mode 100644 index ac6ac36b9..000000000 --- a/test/test_verbs_endpoint_security.py +++ /dev/null @@ -1,100 +0,0 @@ -import unittest - -import endpoints.decorated # Register the various exceptions via decorators. - -from app import app -from endpoints.verbs import verbs -from initdb import setup_database_for_testing, finished_database_for_testing -from test.specs import build_verbs_specs - -app.register_blueprint(verbs, url_prefix='/c1') - -NO_ACCESS_USER = 'freshuser' -READ_ACCESS_USER = 'reader' -ADMIN_ACCESS_USER = 'devtable' -CREATOR_ACCESS_USER = 'creator' - - -class EndpointTestCase(unittest.TestCase): - def setUp(self): - setup_database_for_testing(self) - - def tearDown(self): - finished_database_for_testing(self) - - -class _SpecTestBuilder(type): - @staticmethod - def _test_generator(url, test_spec, attrs): - def test(self): - with app.test_client() as c: - headers = {} - - if attrs['auth_username']: - headers['Authorization'] = test_spec.gen_basic_auth(attrs['auth_username'], 'password') - - expected_status = getattr(test_spec, attrs['result_attr']) - - rv = c.open(url, headers=headers, method=test_spec.method_name) - msg = '%s %s: got %s, expected: %s (auth: %s | headers %s)' % (test_spec.method_name, - test_spec.index_name, rv.status_code, expected_status, attrs['auth_username'], - headers) - - self.assertEqual(rv.status_code, expected_status, msg) - - return test - - - def __new__(cls, name, bases, attrs): - with app.test_request_context() as ctx: - specs = attrs['spec_func']() - for test_spec in specs: - test_name = '%s_%s_%s_%s_%s' % (test_spec.index_name, test_spec.method_name, - test_spec.repo_name, attrs['auth_username'] or 'anon', - attrs['result_attr']) - test_name = test_name.replace('/', '_').replace('-', '_') - - test_name = 'test_' + test_name.lower().replace('verbs.', 'verbs_') - url = test_spec.get_url() - attrs[test_name] = _SpecTestBuilder._test_generator(url, test_spec, attrs) - - return type(name, bases, attrs) - - -class TestAnonymousAccess(EndpointTestCase): - __metaclass__ = _SpecTestBuilder - spec_func = build_verbs_specs - result_attr = 'anon_code' - auth_username = None - - -class TestNoAccess(EndpointTestCase): - __metaclass__ = _SpecTestBuilder - spec_func = build_verbs_specs - result_attr = 'no_access_code' - auth_username = NO_ACCESS_USER - - -class TestReadAccess(EndpointTestCase): - __metaclass__ = _SpecTestBuilder - spec_func = build_verbs_specs - result_attr = 'read_code' - auth_username = READ_ACCESS_USER - - -class TestCreatorAccess(EndpointTestCase): - __metaclass__ = _SpecTestBuilder - spec_func = build_verbs_specs - result_attr = 'creator_code' - auth_username = CREATOR_ACCESS_USER - - -class TestAdminAccess(EndpointTestCase): - __metaclass__ = _SpecTestBuilder - spec_func = build_verbs_specs - result_attr = 'admin_code' - auth_username = ADMIN_ACCESS_USER - - -if __name__ == '__main__': - unittest.main()