Add some tests to make sure our docker API is properly respecting auth.

This commit is contained in:
yackob03 2013-11-07 17:10:57 -05:00
parent 05ccd4e793
commit 8a738c2bf9
5 changed files with 278 additions and 15 deletions

View file

@ -1,5 +1,5 @@
class FakeMixpanel(object): class FakeMixpanel(object):
def send(self, endpoint, json_message): def track(*args, **kwargs):
pass pass
def init_app(app): def init_app(app):

View file

@ -3,10 +3,10 @@ import json
from flask import url_for from flask import url_for
from collections import OrderedDict from collections import OrderedDict
from uuid import uuid4 from uuid import uuid4
from base64 import b64encode
NO_REPO = None
PUBLIC_REPO = 'public/publicrepo' PUBLIC_REPO = 'public/publicrepo'
PRIVATE_REPO = 'devtable/shared' PRIVATE_REPO = 'devtable/shared'
ORG = 'devtableorg' ORG = 'devtableorg'
@ -383,3 +383,162 @@ def build_specs():
TestSpec(url_for('get_org_subscription', orgname=ORG)), TestSpec(url_for('get_org_subscription', orgname=ORG)),
] ]
class IndexTestSpec(object):
def __init__(self, url, sess_repo=None, anon_code=403, no_access_code=403,
read_code=200, admin_code=200):
self._url = url
self._method = 'GET'
self._data = None
self.sess_repo = sess_repo
self.anon_code = anon_code
self.no_access_code = no_access_code
self.read_code = read_code
self.admin_code = admin_code
def gen_basic_auth(self, username, password):
encoded = b64encode('%s:%s' % (username, password))
return 'basic %s' % encoded
def set_data_from_obj(self, json_serializable):
self._data = json.dumps(json_serializable)
return self
def set_method(self, method):
self._method = method
return self
def get_client_args(self):
kwargs = {
'method': self._method
}
if self._data or self._method == 'POST' or self._method == 'PUT':
kwargs['data'] = self._data if self._data else '{}'
kwargs['content_type'] = 'application/json'
return self._url, kwargs
def build_index_specs():
return [
IndexTestSpec(url_for('get_image_layer', image_id=FAKE_IMAGE_ID),
PUBLIC_REPO, 200, 200, 200, 200),
IndexTestSpec(url_for('get_image_layer', image_id=FAKE_IMAGE_ID),
PRIVATE_REPO),
IndexTestSpec(url_for('get_image_layer', image_id=FAKE_IMAGE_ID),
ORG_REPO),
IndexTestSpec(url_for('put_image_layer', image_id=FAKE_IMAGE_ID),
PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'),
IndexTestSpec(url_for('put_image_layer', image_id=FAKE_IMAGE_ID),
PRIVATE_REPO, 403, 403, 403, 404).set_method('PUT'),
IndexTestSpec(url_for('put_image_layer', image_id=FAKE_IMAGE_ID),
ORG_REPO, 403, 403, 403, 404).set_method('PUT'),
IndexTestSpec(url_for('put_image_checksum', image_id=FAKE_IMAGE_ID),
PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'),
IndexTestSpec(url_for('put_image_checksum', image_id=FAKE_IMAGE_ID),
PRIVATE_REPO, 403, 403, 403, 400).set_method('PUT'),
IndexTestSpec(url_for('put_image_checksum', image_id=FAKE_IMAGE_ID),
ORG_REPO, 403, 403, 403, 400).set_method('PUT'),
IndexTestSpec(url_for('get_image_json', image_id=FAKE_IMAGE_ID),
PUBLIC_REPO, 404, 404, 404, 404),
IndexTestSpec(url_for('get_image_json', image_id=FAKE_IMAGE_ID),
PRIVATE_REPO, 403, 403, 404, 404),
IndexTestSpec(url_for('get_image_json', image_id=FAKE_IMAGE_ID),
ORG_REPO, 403, 403, 404, 404),
IndexTestSpec(url_for('get_image_ancestry', image_id=FAKE_IMAGE_ID),
PUBLIC_REPO, 404, 404, 404, 404),
IndexTestSpec(url_for('get_image_ancestry', image_id=FAKE_IMAGE_ID),
PRIVATE_REPO, 403, 403, 404, 404),
IndexTestSpec(url_for('get_image_ancestry', image_id=FAKE_IMAGE_ID),
ORG_REPO, 403, 403, 404, 404),
IndexTestSpec(url_for('put_image_json', image_id=FAKE_IMAGE_ID),
PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'),
IndexTestSpec(url_for('put_image_json', image_id=FAKE_IMAGE_ID),
PRIVATE_REPO, 403, 403, 403, 400).set_method('PUT'),
IndexTestSpec(url_for('put_image_json', image_id=FAKE_IMAGE_ID),
ORG_REPO, 403, 403, 403, 400).set_method('PUT'),
IndexTestSpec(url_for('create_user'), NO_REPO, 201, 201, 201,
201).set_method('POST').set_data_from_obj(NEW_USER_DETAILS),
IndexTestSpec(url_for('get_user'), NO_REPO, 404, 200, 200, 200),
IndexTestSpec(url_for('update_user', username=FAKE_USERNAME),
NO_REPO, 403, 403, 403, 403).set_method('PUT'),
IndexTestSpec(url_for('create_repository', repository=PUBLIC_REPO),
NO_REPO, 403, 403, 403, 403).set_method('PUT'),
IndexTestSpec(url_for('create_repository', repository=PRIVATE_REPO),
NO_REPO, 403, 403, 403, 201).set_method('PUT'),
IndexTestSpec(url_for('create_repository', repository=ORG_REPO),
NO_REPO, 403, 403, 403, 201).set_method('PUT'),
IndexTestSpec(url_for('update_images', repository=PUBLIC_REPO), NO_REPO,
403, 403, 403, 403).set_method('PUT'),
IndexTestSpec(url_for('update_images', repository=PRIVATE_REPO), NO_REPO,
403, 403, 403, 204).set_method('PUT'),
IndexTestSpec(url_for('update_images', repository=ORG_REPO), NO_REPO,
403, 403, 403, 204).set_method('PUT'),
IndexTestSpec(url_for('get_repository_images', repository=PUBLIC_REPO),
NO_REPO, 200, 200, 200, 200),
IndexTestSpec(url_for('get_repository_images', repository=PRIVATE_REPO)),
IndexTestSpec(url_for('get_repository_images', repository=ORG_REPO)),
IndexTestSpec(url_for('delete_repository_images', repository=PUBLIC_REPO),
NO_REPO, 501, 501, 501, 501).set_method('DELETE'),
IndexTestSpec(url_for('put_repository_auth', repository=PUBLIC_REPO),
NO_REPO, 501, 501, 501, 501).set_method('PUT'),
IndexTestSpec(url_for('get_search'), NO_REPO, 501, 501, 501, 501),
IndexTestSpec(url_for('ping'), NO_REPO, 200, 200, 200, 200),
IndexTestSpec(url_for('get_tags', repository=PUBLIC_REPO), NO_REPO,
200, 200, 200, 200),
IndexTestSpec(url_for('get_tags', repository=PRIVATE_REPO)),
IndexTestSpec(url_for('get_tags', repository=ORG_REPO)),
IndexTestSpec(url_for('get_tag', repository=PUBLIC_REPO,
tag=FAKE_TAG_NAME), NO_REPO, 400, 400, 400, 400),
IndexTestSpec(url_for('get_tag', repository=PRIVATE_REPO,
tag=FAKE_TAG_NAME), NO_REPO, 403, 403, 400, 400),
IndexTestSpec(url_for('get_tag', repository=ORG_REPO,
tag=FAKE_TAG_NAME), NO_REPO, 403, 403, 400, 400),
IndexTestSpec(url_for('put_tag', repository=PUBLIC_REPO,
tag=FAKE_TAG_NAME),
NO_REPO, 403, 403, 403, 403).set_method('PUT'),
IndexTestSpec(url_for('put_tag', repository=PRIVATE_REPO,
tag=FAKE_TAG_NAME),
NO_REPO, 403, 403, 403, 400).set_method('PUT'),
IndexTestSpec(url_for('put_tag', repository=ORG_REPO, tag=FAKE_TAG_NAME),
NO_REPO, 403, 403, 403, 400).set_method('PUT'),
IndexTestSpec(url_for('delete_tag', repository=PUBLIC_REPO,
tag=FAKE_TAG_NAME),
NO_REPO, 403, 403, 403, 403).set_method('DELETE'),
IndexTestSpec(url_for('delete_tag', repository=PRIVATE_REPO,
tag=FAKE_TAG_NAME),
NO_REPO, 403, 403, 403, 400).set_method('DELETE'),
IndexTestSpec(url_for('delete_tag', repository=ORG_REPO,
tag=FAKE_TAG_NAME),
NO_REPO, 403, 403, 403, 400).set_method('DELETE'),
IndexTestSpec(url_for('delete_repository_tags', repository=PUBLIC_REPO),
NO_REPO, 403, 403, 403, 403).set_method('DELETE'),
IndexTestSpec(url_for('delete_repository_tags', repository=PRIVATE_REPO),
NO_REPO, 403, 403, 403, 204).set_method('DELETE'),
IndexTestSpec(url_for('delete_repository_tags', repository=ORG_REPO),
NO_REPO, 403, 403, 403, 204).set_method('DELETE'),
]

View file

@ -4,7 +4,6 @@ import json
import endpoints.api import endpoints.api
from app import app from app import app
from data import model
from initdb import wipe_database, initialize_database, populate_database from initdb import wipe_database, initialize_database, populate_database
from specs import build_specs from specs import build_specs
@ -21,7 +20,7 @@ class ApiTestCase(unittest.TestCase):
populate_database() populate_database()
class SpecTestBuilder(type): class _SpecTestBuilder(type):
@staticmethod @staticmethod
def _test_generator(url, expected_status, open_kwargs, auth_username=None): def _test_generator(url, expected_status, open_kwargs, auth_username=None):
def test(self): def test(self):
@ -42,8 +41,6 @@ class SpecTestBuilder(type):
rv = c.open(url, **open_kwargs) rv = c.open(url, **open_kwargs)
msg = '%s %s: %s expected: %s' % (open_kwargs['method'], url, msg = '%s %s: %s expected: %s' % (open_kwargs['method'], url,
rv.status_code, expected_status) rv.status_code, expected_status)
if rv.status_code != expected_status:
print msg
self.assertEqual(rv.status_code, expected_status, msg) self.assertEqual(rv.status_code, expected_status, msg)
return test return test
@ -54,9 +51,9 @@ class SpecTestBuilder(type):
for test_spec in specs: for test_spec in specs:
url, open_kwargs = test_spec.get_client_args() url, open_kwargs = test_spec.get_client_args()
expected_status = getattr(test_spec, attrs['result_attr']) expected_status = getattr(test_spec, attrs['result_attr'])
test = SpecTestBuilder._test_generator(url, expected_status, test = _SpecTestBuilder._test_generator(url, expected_status,
open_kwargs, open_kwargs,
attrs['auth_username']) attrs['auth_username'])
test_name_url = url.replace('/', '_').replace('-', '_') test_name_url = url.replace('/', '_').replace('-', '_')
test_name = 'test_%s_%s' % (open_kwargs['method'].lower(), test_name = 'test_%s_%s' % (open_kwargs['method'].lower(),
@ -67,28 +64,28 @@ class SpecTestBuilder(type):
class TestAnonymousAccess(ApiTestCase): class TestAnonymousAccess(ApiTestCase):
__metaclass__ = SpecTestBuilder __metaclass__ = _SpecTestBuilder
spec_func = build_specs spec_func = build_specs
result_attr = 'anon_code' result_attr = 'anon_code'
auth_username = None auth_username = None
class TestNoAccess(ApiTestCase): class TestNoAccess(ApiTestCase):
__metaclass__ = SpecTestBuilder __metaclass__ = _SpecTestBuilder
spec_func = build_specs spec_func = build_specs
result_attr = 'no_access_code' result_attr = 'no_access_code'
auth_username = NO_ACCESS_USER auth_username = NO_ACCESS_USER
class TestReadAccess(ApiTestCase): class TestReadAccess(ApiTestCase):
__metaclass__ = SpecTestBuilder __metaclass__ = _SpecTestBuilder
spec_func = build_specs spec_func = build_specs
result_attr = 'read_code' result_attr = 'read_code'
auth_username = READ_ACCESS_USER auth_username = READ_ACCESS_USER
class TestAdminAccess(ApiTestCase): class TestAdminAccess(ApiTestCase):
__metaclass__ = SpecTestBuilder __metaclass__ = _SpecTestBuilder
spec_func = build_specs spec_func = build_specs
result_attr = 'admin_code' result_attr = 'admin_code'
auth_username = ADMIN_ACCESS_USER auth_username = ADMIN_ACCESS_USER

View file

@ -0,0 +1,107 @@
import unittest
import endpoints.registry
import endpoints.index
import endpoints.tags
from app import app
from util.names import parse_namespace_repository
from initdb import wipe_database, initialize_database, populate_database
from specs import build_index_specs
NO_ACCESS_USER = 'freshuser'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
class EndpointTestCase(unittest.TestCase):
def setUp(self):
wipe_database()
initialize_database()
populate_database()
class _SpecTestBuilder(type):
@staticmethod
def _test_generator(url, expected_status, open_kwargs, session_var_list):
def test(self):
with app.test_client() as c:
if session_var_list:
# Temporarily remove the teardown functions
teardown_funcs = app.teardown_request_funcs[None]
app.teardown_request_funcs[None] = []
with c.session_transaction() as sess:
for sess_key, sess_val in session_var_list:
sess[sess_key] = sess_val
# Restore the teardown functions
app.teardown_request_funcs[None] = teardown_funcs
rv = c.open(url, **open_kwargs)
msg = '%s %s: %s expected: %s' % (open_kwargs['method'], url,
rv.status_code, expected_status)
if rv.status_code != expected_status:
print msg
self.assertEqual(rv.status_code, expected_status, msg)
return test
def __new__(cls, name, bases, attrs):
with app.test_request_context() as ctx:
specs = attrs['spec_func']()
for test_spec in specs:
url, open_kwargs = test_spec.get_client_args()
if attrs['auth_username']:
basic_auth = test_spec.gen_basic_auth(attrs['auth_username'],
'password')
open_kwargs['headers'] = [('authorization', '%s' % basic_auth)]
session_vars = []
if test_spec.sess_repo:
ns, repo = parse_namespace_repository(test_spec.sess_repo)
session_vars.append(('namespace', ns))
session_vars.append(('repository', repo))
expected_status = getattr(test_spec, attrs['result_attr'])
test = _SpecTestBuilder._test_generator(url, expected_status,
open_kwargs,
session_vars)
test_name_url = url.replace('/', '_').replace('-', '_')
sess_repo = str(test_spec.sess_repo).replace('/', '_')
test_name = 'test_%s%s_%s' % (open_kwargs['method'].lower(),
test_name_url, sess_repo)
attrs[test_name] = test
return type(name, bases, attrs)
class TestAnonymousAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_index_specs
result_attr = 'anon_code'
auth_username = None
class TestNoAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_index_specs
result_attr = 'no_access_code'
auth_username = NO_ACCESS_USER
class TestReadAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_index_specs
result_attr = 'read_code'
auth_username = READ_ACCESS_USER
class TestAdminAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_index_specs
result_attr = 'admin_code'
auth_username = ADMIN_ACCESS_USER

View file

@ -23,7 +23,7 @@ class FakeStorage(Storage):
pass pass
def exists(self, path): def exists(self, path):
return True return False
class FakeUserfiles(object): class FakeUserfiles(object):