From 8a738c2bf9d4a5c46a497cea7e32551cc188e318 Mon Sep 17 00:00:00 2001 From: yackob03 Date: Thu, 7 Nov 2013 17:10:57 -0500 Subject: [PATCH] Add some tests to make sure our docker API is properly respecting auth. --- test/analytics.py | 2 +- test/specs.py | 163 ++++++++++++++++++++++++++++++++- test/test_api_security.py | 19 ++-- test/test_endpoint_security.py | 107 ++++++++++++++++++++++ test/teststorage.py | 2 +- 5 files changed, 278 insertions(+), 15 deletions(-) create mode 100644 test/test_endpoint_security.py diff --git a/test/analytics.py b/test/analytics.py index f181a8ad0..0114dd9f4 100644 --- a/test/analytics.py +++ b/test/analytics.py @@ -1,5 +1,5 @@ class FakeMixpanel(object): - def send(self, endpoint, json_message): + def track(*args, **kwargs): pass def init_app(app): diff --git a/test/specs.py b/test/specs.py index e1de69850..cf33a61a9 100644 --- a/test/specs.py +++ b/test/specs.py @@ -3,10 +3,10 @@ import json from flask import url_for from collections import OrderedDict from uuid import uuid4 +from base64 import b64encode - +NO_REPO = None PUBLIC_REPO = 'public/publicrepo' - PRIVATE_REPO = 'devtable/shared' ORG = 'devtableorg' @@ -383,3 +383,162 @@ def build_specs(): TestSpec(url_for('get_org_subscription', orgname=ORG)), ] + + +class IndexTestSpec(object): + def __init__(self, url, sess_repo=None, anon_code=403, no_access_code=403, + read_code=200, admin_code=200): + self._url = url + self._method = 'GET' + self._data = None + + self.sess_repo = sess_repo + + self.anon_code = anon_code + self.no_access_code = no_access_code + self.read_code = read_code + self.admin_code = admin_code + + def gen_basic_auth(self, username, password): + encoded = b64encode('%s:%s' % (username, password)) + return 'basic %s' % encoded + + def set_data_from_obj(self, json_serializable): + self._data = json.dumps(json_serializable) + return self + + def set_method(self, method): + self._method = method + return self + + def get_client_args(self): + kwargs = { + 'method': self._method + } + + if self._data or self._method == 'POST' or self._method == 'PUT': + kwargs['data'] = self._data if self._data else '{}' + kwargs['content_type'] = 'application/json' + + return self._url, kwargs + + +def build_index_specs(): + return [ + IndexTestSpec(url_for('get_image_layer', image_id=FAKE_IMAGE_ID), + PUBLIC_REPO, 200, 200, 200, 200), + IndexTestSpec(url_for('get_image_layer', image_id=FAKE_IMAGE_ID), + PRIVATE_REPO), + IndexTestSpec(url_for('get_image_layer', image_id=FAKE_IMAGE_ID), + ORG_REPO), + + IndexTestSpec(url_for('put_image_layer', image_id=FAKE_IMAGE_ID), + PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'), + IndexTestSpec(url_for('put_image_layer', image_id=FAKE_IMAGE_ID), + PRIVATE_REPO, 403, 403, 403, 404).set_method('PUT'), + IndexTestSpec(url_for('put_image_layer', image_id=FAKE_IMAGE_ID), + ORG_REPO, 403, 403, 403, 404).set_method('PUT'), + + IndexTestSpec(url_for('put_image_checksum', image_id=FAKE_IMAGE_ID), + PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'), + IndexTestSpec(url_for('put_image_checksum', image_id=FAKE_IMAGE_ID), + PRIVATE_REPO, 403, 403, 403, 400).set_method('PUT'), + IndexTestSpec(url_for('put_image_checksum', image_id=FAKE_IMAGE_ID), + ORG_REPO, 403, 403, 403, 400).set_method('PUT'), + + IndexTestSpec(url_for('get_image_json', image_id=FAKE_IMAGE_ID), + PUBLIC_REPO, 404, 404, 404, 404), + IndexTestSpec(url_for('get_image_json', image_id=FAKE_IMAGE_ID), + PRIVATE_REPO, 403, 403, 404, 404), + IndexTestSpec(url_for('get_image_json', image_id=FAKE_IMAGE_ID), + ORG_REPO, 403, 403, 404, 404), + + IndexTestSpec(url_for('get_image_ancestry', image_id=FAKE_IMAGE_ID), + PUBLIC_REPO, 404, 404, 404, 404), + IndexTestSpec(url_for('get_image_ancestry', image_id=FAKE_IMAGE_ID), + PRIVATE_REPO, 403, 403, 404, 404), + IndexTestSpec(url_for('get_image_ancestry', image_id=FAKE_IMAGE_ID), + ORG_REPO, 403, 403, 404, 404), + + IndexTestSpec(url_for('put_image_json', image_id=FAKE_IMAGE_ID), + PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'), + IndexTestSpec(url_for('put_image_json', image_id=FAKE_IMAGE_ID), + PRIVATE_REPO, 403, 403, 403, 400).set_method('PUT'), + IndexTestSpec(url_for('put_image_json', image_id=FAKE_IMAGE_ID), + ORG_REPO, 403, 403, 403, 400).set_method('PUT'), + + IndexTestSpec(url_for('create_user'), NO_REPO, 201, 201, 201, + 201).set_method('POST').set_data_from_obj(NEW_USER_DETAILS), + + IndexTestSpec(url_for('get_user'), NO_REPO, 404, 200, 200, 200), + + IndexTestSpec(url_for('update_user', username=FAKE_USERNAME), + NO_REPO, 403, 403, 403, 403).set_method('PUT'), + + IndexTestSpec(url_for('create_repository', repository=PUBLIC_REPO), + NO_REPO, 403, 403, 403, 403).set_method('PUT'), + IndexTestSpec(url_for('create_repository', repository=PRIVATE_REPO), + NO_REPO, 403, 403, 403, 201).set_method('PUT'), + IndexTestSpec(url_for('create_repository', repository=ORG_REPO), + NO_REPO, 403, 403, 403, 201).set_method('PUT'), + + IndexTestSpec(url_for('update_images', repository=PUBLIC_REPO), NO_REPO, + 403, 403, 403, 403).set_method('PUT'), + IndexTestSpec(url_for('update_images', repository=PRIVATE_REPO), NO_REPO, + 403, 403, 403, 204).set_method('PUT'), + IndexTestSpec(url_for('update_images', repository=ORG_REPO), NO_REPO, + 403, 403, 403, 204).set_method('PUT'), + + IndexTestSpec(url_for('get_repository_images', repository=PUBLIC_REPO), + NO_REPO, 200, 200, 200, 200), + IndexTestSpec(url_for('get_repository_images', repository=PRIVATE_REPO)), + IndexTestSpec(url_for('get_repository_images', repository=ORG_REPO)), + + IndexTestSpec(url_for('delete_repository_images', repository=PUBLIC_REPO), + NO_REPO, 501, 501, 501, 501).set_method('DELETE'), + + IndexTestSpec(url_for('put_repository_auth', repository=PUBLIC_REPO), + NO_REPO, 501, 501, 501, 501).set_method('PUT'), + + IndexTestSpec(url_for('get_search'), NO_REPO, 501, 501, 501, 501), + + IndexTestSpec(url_for('ping'), NO_REPO, 200, 200, 200, 200), + + IndexTestSpec(url_for('get_tags', repository=PUBLIC_REPO), NO_REPO, + 200, 200, 200, 200), + IndexTestSpec(url_for('get_tags', repository=PRIVATE_REPO)), + IndexTestSpec(url_for('get_tags', repository=ORG_REPO)), + + IndexTestSpec(url_for('get_tag', repository=PUBLIC_REPO, + tag=FAKE_TAG_NAME), NO_REPO, 400, 400, 400, 400), + IndexTestSpec(url_for('get_tag', repository=PRIVATE_REPO, + tag=FAKE_TAG_NAME), NO_REPO, 403, 403, 400, 400), + IndexTestSpec(url_for('get_tag', repository=ORG_REPO, + tag=FAKE_TAG_NAME), NO_REPO, 403, 403, 400, 400), + + IndexTestSpec(url_for('put_tag', repository=PUBLIC_REPO, + tag=FAKE_TAG_NAME), + NO_REPO, 403, 403, 403, 403).set_method('PUT'), + IndexTestSpec(url_for('put_tag', repository=PRIVATE_REPO, + tag=FAKE_TAG_NAME), + NO_REPO, 403, 403, 403, 400).set_method('PUT'), + IndexTestSpec(url_for('put_tag', repository=ORG_REPO, tag=FAKE_TAG_NAME), + NO_REPO, 403, 403, 403, 400).set_method('PUT'), + + IndexTestSpec(url_for('delete_tag', repository=PUBLIC_REPO, + tag=FAKE_TAG_NAME), + NO_REPO, 403, 403, 403, 403).set_method('DELETE'), + IndexTestSpec(url_for('delete_tag', repository=PRIVATE_REPO, + tag=FAKE_TAG_NAME), + NO_REPO, 403, 403, 403, 400).set_method('DELETE'), + IndexTestSpec(url_for('delete_tag', repository=ORG_REPO, + tag=FAKE_TAG_NAME), + NO_REPO, 403, 403, 403, 400).set_method('DELETE'), + + IndexTestSpec(url_for('delete_repository_tags', repository=PUBLIC_REPO), + NO_REPO, 403, 403, 403, 403).set_method('DELETE'), + IndexTestSpec(url_for('delete_repository_tags', repository=PRIVATE_REPO), + NO_REPO, 403, 403, 403, 204).set_method('DELETE'), + IndexTestSpec(url_for('delete_repository_tags', repository=ORG_REPO), + NO_REPO, 403, 403, 403, 204).set_method('DELETE'), + ] \ No newline at end of file diff --git a/test/test_api_security.py b/test/test_api_security.py index 6a8ffd743..49a5bc8a5 100644 --- a/test/test_api_security.py +++ b/test/test_api_security.py @@ -4,7 +4,6 @@ import json import endpoints.api from app import app -from data import model from initdb import wipe_database, initialize_database, populate_database from specs import build_specs @@ -21,7 +20,7 @@ class ApiTestCase(unittest.TestCase): populate_database() -class SpecTestBuilder(type): +class _SpecTestBuilder(type): @staticmethod def _test_generator(url, expected_status, open_kwargs, auth_username=None): def test(self): @@ -42,8 +41,6 @@ class SpecTestBuilder(type): rv = c.open(url, **open_kwargs) msg = '%s %s: %s expected: %s' % (open_kwargs['method'], url, rv.status_code, expected_status) - if rv.status_code != expected_status: - print msg self.assertEqual(rv.status_code, expected_status, msg) return test @@ -54,9 +51,9 @@ class SpecTestBuilder(type): for test_spec in specs: url, open_kwargs = test_spec.get_client_args() expected_status = getattr(test_spec, attrs['result_attr']) - test = SpecTestBuilder._test_generator(url, expected_status, - open_kwargs, - attrs['auth_username']) + test = _SpecTestBuilder._test_generator(url, expected_status, + open_kwargs, + attrs['auth_username']) test_name_url = url.replace('/', '_').replace('-', '_') test_name = 'test_%s_%s' % (open_kwargs['method'].lower(), @@ -67,28 +64,28 @@ class SpecTestBuilder(type): class TestAnonymousAccess(ApiTestCase): - __metaclass__ = SpecTestBuilder + __metaclass__ = _SpecTestBuilder spec_func = build_specs result_attr = 'anon_code' auth_username = None class TestNoAccess(ApiTestCase): - __metaclass__ = SpecTestBuilder + __metaclass__ = _SpecTestBuilder spec_func = build_specs result_attr = 'no_access_code' auth_username = NO_ACCESS_USER class TestReadAccess(ApiTestCase): - __metaclass__ = SpecTestBuilder + __metaclass__ = _SpecTestBuilder spec_func = build_specs result_attr = 'read_code' auth_username = READ_ACCESS_USER class TestAdminAccess(ApiTestCase): - __metaclass__ = SpecTestBuilder + __metaclass__ = _SpecTestBuilder spec_func = build_specs result_attr = 'admin_code' auth_username = ADMIN_ACCESS_USER diff --git a/test/test_endpoint_security.py b/test/test_endpoint_security.py new file mode 100644 index 000000000..33845ec88 --- /dev/null +++ b/test/test_endpoint_security.py @@ -0,0 +1,107 @@ +import unittest + +import endpoints.registry +import endpoints.index +import endpoints.tags + +from app import app +from util.names import parse_namespace_repository +from initdb import wipe_database, initialize_database, populate_database +from specs import build_index_specs + + +NO_ACCESS_USER = 'freshuser' +READ_ACCESS_USER = 'reader' +ADMIN_ACCESS_USER = 'devtable' + + +class EndpointTestCase(unittest.TestCase): + def setUp(self): + wipe_database() + initialize_database() + populate_database() + + +class _SpecTestBuilder(type): + @staticmethod + def _test_generator(url, expected_status, open_kwargs, session_var_list): + def test(self): + with app.test_client() as c: + if session_var_list: + # Temporarily remove the teardown functions + teardown_funcs = app.teardown_request_funcs[None] + app.teardown_request_funcs[None] = [] + + with c.session_transaction() as sess: + for sess_key, sess_val in session_var_list: + sess[sess_key] = sess_val + + # Restore the teardown functions + app.teardown_request_funcs[None] = teardown_funcs + + rv = c.open(url, **open_kwargs) + msg = '%s %s: %s expected: %s' % (open_kwargs['method'], url, + rv.status_code, expected_status) + if rv.status_code != expected_status: + print msg + self.assertEqual(rv.status_code, expected_status, msg) + return test + + + def __new__(cls, name, bases, attrs): + with app.test_request_context() as ctx: + specs = attrs['spec_func']() + for test_spec in specs: + url, open_kwargs = test_spec.get_client_args() + + if attrs['auth_username']: + basic_auth = test_spec.gen_basic_auth(attrs['auth_username'], + 'password') + open_kwargs['headers'] = [('authorization', '%s' % basic_auth)] + + session_vars = [] + if test_spec.sess_repo: + ns, repo = parse_namespace_repository(test_spec.sess_repo) + session_vars.append(('namespace', ns)) + session_vars.append(('repository', repo)) + + expected_status = getattr(test_spec, attrs['result_attr']) + test = _SpecTestBuilder._test_generator(url, expected_status, + open_kwargs, + session_vars) + + test_name_url = url.replace('/', '_').replace('-', '_') + sess_repo = str(test_spec.sess_repo).replace('/', '_') + test_name = 'test_%s%s_%s' % (open_kwargs['method'].lower(), + test_name_url, sess_repo) + attrs[test_name] = test + + return type(name, bases, attrs) + + +class TestAnonymousAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_index_specs + result_attr = 'anon_code' + auth_username = None + + +class TestNoAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_index_specs + result_attr = 'no_access_code' + auth_username = NO_ACCESS_USER + + +class TestReadAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_index_specs + result_attr = 'read_code' + auth_username = READ_ACCESS_USER + + +class TestAdminAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_index_specs + result_attr = 'admin_code' + auth_username = ADMIN_ACCESS_USER diff --git a/test/teststorage.py b/test/teststorage.py index d597bfa0e..48634075b 100644 --- a/test/teststorage.py +++ b/test/teststorage.py @@ -23,7 +23,7 @@ class FakeStorage(Storage): pass def exists(self, path): - return True + return False class FakeUserfiles(object):