diff --git a/data/interfaces/verbs.py b/data/interfaces/verbs.py index 5f1e02c70..6222f46b7 100644 --- a/data/interfaces/verbs.py +++ b/data/interfaces/verbs.py @@ -159,7 +159,11 @@ class PreOCIModel(VerbsDataInterface): """ def get_repository(self, namespace_name, repo_name): - return _repository_for_repo(model.repository.get_repository(namespace_name, repo_name)) + repo = model.repository.get_repository(namespace_name, repo_name) + if repo is None: + return None + + return _repository_for_repo(repo) def get_manifest_layers_with_blobs(self, repo_image): repo_image_record = model.image.get_image_by_id(repo_image.repository.namespace_name, diff --git a/endpoints/verbs/__init__.py b/endpoints/verbs/__init__.py index 16453ed08..1c6f0af92 100644 --- a/endpoints/verbs/__init__.py +++ b/endpoints/verbs/__init__.py @@ -176,7 +176,7 @@ def _verify_repo_verb(_, namespace, repo_name, tag, verb, checker=None): if tag_image is None: abort(404) - if repo.kind != 'image': + if repo is not None and repo.kind != 'image': abort(405) # If there is a data checker, call it first. @@ -351,10 +351,10 @@ def get_squashed_tag(namespace, repository, tag): @process_auth @parse_repository_name() def get_tag_torrent(namespace_name, repo_name, digest): - permission = ReadRepositoryPermission(namespace_name, repo_name) repo = model.get_repository(namespace_name, repo_name) repo_is_public = repo is not None and repo.is_public + permission = ReadRepositoryPermission(namespace_name, repo_name) if not permission.can() and not repo_is_public: abort(403) @@ -363,7 +363,7 @@ def get_tag_torrent(namespace_name, repo_name, digest): # We can not generate a private torrent cluster without a user uuid (e.g. token auth) abort(403) - if repo.kind != 'image': + if repo is not None and repo.kind != 'image': abort(405) blob = model.get_repo_blob_by_digest(namespace_name, repo_name, digest) diff --git a/test/specs.py b/test/specs.py index abbfe04de..d7bb79061 100644 --- a/test/specs.py +++ b/test/specs.py @@ -508,3 +508,101 @@ def build_v2_index_specs(): IndexV2TestSpec('v2.cancel_upload', 'DELETE', ANOTHER_ORG_REPO, upload_uuid=FAKE_UPLOAD_ID). request_status(401, 401, 401, 401, 404), ] + + +class VerbTestSpec(object): + def __init__(self, index_name, method_name, repo_name, rpath=False, **kwargs): + self.index_name = index_name + self.repo_name = repo_name + self.method_name = method_name + self.single_repository_path = rpath + + self.kwargs = kwargs + + self.anon_code = 401 + self.no_access_code = 403 + self.read_code = 200 + self.admin_code = 200 + self.creator_code = 200 + + def request_status(self, anon_code=401, no_access_code=403, read_code=200, creator_code=200, + admin_code=200): + self.anon_code = anon_code + self.no_access_code = no_access_code + self.read_code = read_code + self.creator_code = creator_code + self.admin_code = admin_code + return self + + def get_url(self): + if self.single_repository_path: + return url_for(self.index_name, repository=self.repo_name, **self.kwargs) + else: + (namespace, repo_name) = self.repo_name.split('/') + return url_for(self.index_name, namespace=namespace, repository=repo_name, **self.kwargs) + + def gen_basic_auth(self, username, password): + encoded = b64encode('%s:%s' % (username, password)) + return 'basic %s' % encoded + +ACI_ARGS = { + 'server': 'someserver', + 'tag': 'fake', + 'os': 'linux', + 'arch': 'x64', +} + +def build_verbs_specs(): + return [ + # get_aci_signature + VerbTestSpec('verbs.get_aci_signature', 'GET', PUBLIC_REPO, **ACI_ARGS). + request_status(404, 404, 404, 404, 404), + + VerbTestSpec('verbs.get_aci_signature', 'GET', PRIVATE_REPO, **ACI_ARGS). + request_status(403, 403, 404, 403, 404), + + VerbTestSpec('verbs.get_aci_signature', 'GET', ORG_REPO, **ACI_ARGS). + request_status(403, 403, 404, 403, 404), + + VerbTestSpec('verbs.get_aci_signature', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS). + request_status(403, 403, 403, 403, 404), + + # get_aci_image + VerbTestSpec('verbs.get_aci_image', 'GET', PUBLIC_REPO, **ACI_ARGS). + request_status(404, 404, 404, 404, 404), + + VerbTestSpec('verbs.get_aci_image', 'GET', PRIVATE_REPO, **ACI_ARGS). + request_status(403, 403, 404, 403, 404), + + VerbTestSpec('verbs.get_aci_image', 'GET', ORG_REPO, **ACI_ARGS). + request_status(403, 403, 404, 403, 404), + + VerbTestSpec('verbs.get_aci_image', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS). + request_status(403, 403, 403, 403, 404), + + # get_squashed_tag + VerbTestSpec('verbs.get_squashed_tag', 'GET', PUBLIC_REPO, tag='fake'). + request_status(404, 404, 404, 404, 404), + + VerbTestSpec('verbs.get_squashed_tag', 'GET', PRIVATE_REPO, tag='fake'). + request_status(403, 403, 404, 403, 404), + + VerbTestSpec('verbs.get_squashed_tag', 'GET', ORG_REPO, tag='fake'). + request_status(403, 403, 404, 403, 404), + + VerbTestSpec('verbs.get_squashed_tag', 'GET', ANOTHER_ORG_REPO, tag='fake'). + request_status(403, 403, 403, 403, 404), + + # get_tag_torrent + VerbTestSpec('verbs.get_tag_torrent', 'GET', PUBLIC_REPO, digest='sha256:1234', rpath=True). + request_status(404, 404, 404, 404, 404), + + VerbTestSpec('verbs.get_tag_torrent', 'GET', PRIVATE_REPO, digest='sha256:1234', rpath=True). + request_status(403, 403, 404, 403, 404), + + VerbTestSpec('verbs.get_tag_torrent', 'GET', ORG_REPO, digest='sha256:1234', rpath=True). + request_status(403, 403, 404, 403, 404), + + VerbTestSpec('verbs.get_tag_torrent', 'GET', ANOTHER_ORG_REPO, digest='sha256:1234', rpath=True). + request_status(403, 403, 403, 403, 404), + ] diff --git a/test/test_verbs_endpoint_security.py b/test/test_verbs_endpoint_security.py new file mode 100644 index 000000000..ac6ac36b9 --- /dev/null +++ b/test/test_verbs_endpoint_security.py @@ -0,0 +1,100 @@ +import unittest + +import endpoints.decorated # Register the various exceptions via decorators. + +from app import app +from endpoints.verbs import verbs +from initdb import setup_database_for_testing, finished_database_for_testing +from test.specs import build_verbs_specs + +app.register_blueprint(verbs, url_prefix='/c1') + +NO_ACCESS_USER = 'freshuser' +READ_ACCESS_USER = 'reader' +ADMIN_ACCESS_USER = 'devtable' +CREATOR_ACCESS_USER = 'creator' + + +class EndpointTestCase(unittest.TestCase): + def setUp(self): + setup_database_for_testing(self) + + def tearDown(self): + finished_database_for_testing(self) + + +class _SpecTestBuilder(type): + @staticmethod + def _test_generator(url, test_spec, attrs): + def test(self): + with app.test_client() as c: + headers = {} + + if attrs['auth_username']: + headers['Authorization'] = test_spec.gen_basic_auth(attrs['auth_username'], 'password') + + expected_status = getattr(test_spec, attrs['result_attr']) + + rv = c.open(url, headers=headers, method=test_spec.method_name) + msg = '%s %s: got %s, expected: %s (auth: %s | headers %s)' % (test_spec.method_name, + test_spec.index_name, rv.status_code, expected_status, attrs['auth_username'], + headers) + + self.assertEqual(rv.status_code, expected_status, msg) + + return test + + + def __new__(cls, name, bases, attrs): + with app.test_request_context() as ctx: + specs = attrs['spec_func']() + for test_spec in specs: + test_name = '%s_%s_%s_%s_%s' % (test_spec.index_name, test_spec.method_name, + test_spec.repo_name, attrs['auth_username'] or 'anon', + attrs['result_attr']) + test_name = test_name.replace('/', '_').replace('-', '_') + + test_name = 'test_' + test_name.lower().replace('verbs.', 'verbs_') + url = test_spec.get_url() + attrs[test_name] = _SpecTestBuilder._test_generator(url, test_spec, attrs) + + return type(name, bases, attrs) + + +class TestAnonymousAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_verbs_specs + result_attr = 'anon_code' + auth_username = None + + +class TestNoAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_verbs_specs + result_attr = 'no_access_code' + auth_username = NO_ACCESS_USER + + +class TestReadAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_verbs_specs + result_attr = 'read_code' + auth_username = READ_ACCESS_USER + + +class TestCreatorAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_verbs_specs + result_attr = 'creator_code' + auth_username = CREATOR_ACCESS_USER + + +class TestAdminAccess(EndpointTestCase): + __metaclass__ = _SpecTestBuilder + spec_func = build_verbs_specs + result_attr = 'admin_code' + auth_username = ADMIN_ACCESS_USER + + +if __name__ == '__main__': + unittest.main()