commit
3ccf3c5f33
40 changed files with 790 additions and 136 deletions
Binary file not shown.
|
@ -168,6 +168,7 @@ class FailureCodes:
|
|||
INVALID_REGISTRY = ('invalidregistry', 404, 404)
|
||||
DOES_NOT_EXIST = ('doesnotexist', 404, 404)
|
||||
INVALID_REQUEST = ('invalidrequest', 400, 400)
|
||||
APP_REPOSITORY = ('apprepository', 405, 405)
|
||||
|
||||
def _get_expected_code(expected_failure, version, success_status_code):
|
||||
""" Returns the HTTP status code for the expected failure under the specified protocol version
|
||||
|
@ -545,6 +546,8 @@ class V2RegistryPushMixin(V2RegistryMixin):
|
|||
expected_auth_code = 200
|
||||
if expect_failure == FailureCodes.INVALID_REGISTRY:
|
||||
expected_auth_code = 400
|
||||
elif expect_failure == FailureCodes.APP_REPOSITORY:
|
||||
expected_auth_code = 405
|
||||
|
||||
self.do_auth(username, password, namespace, repository, scopes=scopes or ['push', 'pull'],
|
||||
expected_code=expected_auth_code)
|
||||
|
@ -685,6 +688,8 @@ class V2RegistryPullMixin(V2RegistryMixin):
|
|||
expected_auth_code = 200
|
||||
if expect_failure == FailureCodes.UNAUTHENTICATED:
|
||||
expected_auth_code = 401
|
||||
elif expect_failure == FailureCodes.APP_REPOSITORY:
|
||||
expected_auth_code = 405
|
||||
|
||||
self.do_auth(username, password, namespace, repository, scopes=['pull'],
|
||||
expected_code=expected_auth_code)
|
||||
|
@ -765,6 +770,26 @@ class V2RegistryLoginMixin(object):
|
|||
|
||||
|
||||
class RegistryTestsMixin(object):
|
||||
def test_application_repo(self):
|
||||
# Create an application repository via the API.
|
||||
self.conduct_api_login('devtable', 'password')
|
||||
data = {
|
||||
'repository': 'someapprepo',
|
||||
'visibility': 'private',
|
||||
'kind': 'application',
|
||||
'description': 'test app repo',
|
||||
}
|
||||
self.conduct('POST', '/api/v1/repository', json_data=data, expected_code=201)
|
||||
|
||||
# Try to push to the repo, which should fail with a 405.
|
||||
self.do_push('devtable', 'someapprepo', 'devtable', 'password',
|
||||
expect_failure=FailureCodes.APP_REPOSITORY)
|
||||
|
||||
# Try to pull from the repo, which should fail with a 405.
|
||||
self.do_pull('devtable', 'someapprepo', 'devtable', 'password',
|
||||
expect_failure=FailureCodes.APP_REPOSITORY)
|
||||
|
||||
|
||||
def test_middle_layer_different_sha(self):
|
||||
if self.push_version == 'v1':
|
||||
# No SHAs to munge in V1.
|
||||
|
|
|
@ -508,3 +508,101 @@ def build_v2_index_specs():
|
|||
IndexV2TestSpec('v2.cancel_upload', 'DELETE', ANOTHER_ORG_REPO, upload_uuid=FAKE_UPLOAD_ID).
|
||||
request_status(401, 401, 401, 401, 404),
|
||||
]
|
||||
|
||||
|
||||
class VerbTestSpec(object):
|
||||
def __init__(self, index_name, method_name, repo_name, rpath=False, **kwargs):
|
||||
self.index_name = index_name
|
||||
self.repo_name = repo_name
|
||||
self.method_name = method_name
|
||||
self.single_repository_path = rpath
|
||||
|
||||
self.kwargs = kwargs
|
||||
|
||||
self.anon_code = 401
|
||||
self.no_access_code = 403
|
||||
self.read_code = 200
|
||||
self.admin_code = 200
|
||||
self.creator_code = 200
|
||||
|
||||
def request_status(self, anon_code=401, no_access_code=403, read_code=200, creator_code=200,
|
||||
admin_code=200):
|
||||
self.anon_code = anon_code
|
||||
self.no_access_code = no_access_code
|
||||
self.read_code = read_code
|
||||
self.creator_code = creator_code
|
||||
self.admin_code = admin_code
|
||||
return self
|
||||
|
||||
def get_url(self):
|
||||
if self.single_repository_path:
|
||||
return url_for(self.index_name, repository=self.repo_name, **self.kwargs)
|
||||
else:
|
||||
(namespace, repo_name) = self.repo_name.split('/')
|
||||
return url_for(self.index_name, namespace=namespace, repository=repo_name, **self.kwargs)
|
||||
|
||||
def gen_basic_auth(self, username, password):
|
||||
encoded = b64encode('%s:%s' % (username, password))
|
||||
return 'basic %s' % encoded
|
||||
|
||||
ACI_ARGS = {
|
||||
'server': 'someserver',
|
||||
'tag': 'fake',
|
||||
'os': 'linux',
|
||||
'arch': 'x64',
|
||||
}
|
||||
|
||||
def build_verbs_specs():
|
||||
return [
|
||||
# get_aci_signature
|
||||
VerbTestSpec('verbs.get_aci_signature', 'GET', PUBLIC_REPO, **ACI_ARGS).
|
||||
request_status(404, 404, 404, 404, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_signature', 'GET', PRIVATE_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_signature', 'GET', ORG_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_signature', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 403, 403, 404),
|
||||
|
||||
# get_aci_image
|
||||
VerbTestSpec('verbs.get_aci_image', 'GET', PUBLIC_REPO, **ACI_ARGS).
|
||||
request_status(404, 404, 404, 404, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_image', 'GET', PRIVATE_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_image', 'GET', ORG_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_image', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 403, 403, 404),
|
||||
|
||||
# get_squashed_tag
|
||||
VerbTestSpec('verbs.get_squashed_tag', 'GET', PUBLIC_REPO, tag='fake').
|
||||
request_status(404, 404, 404, 404, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_squashed_tag', 'GET', PRIVATE_REPO, tag='fake').
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_squashed_tag', 'GET', ORG_REPO, tag='fake').
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_squashed_tag', 'GET', ANOTHER_ORG_REPO, tag='fake').
|
||||
request_status(403, 403, 403, 403, 404),
|
||||
|
||||
# get_tag_torrent
|
||||
VerbTestSpec('verbs.get_tag_torrent', 'GET', PUBLIC_REPO, digest='sha256:1234', rpath=True).
|
||||
request_status(404, 404, 404, 404, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_tag_torrent', 'GET', PRIVATE_REPO, digest='sha256:1234', rpath=True).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_tag_torrent', 'GET', ORG_REPO, digest='sha256:1234', rpath=True).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_tag_torrent', 'GET', ANOTHER_ORG_REPO, digest='sha256:1234', rpath=True).
|
||||
request_status(403, 403, 403, 403, 404),
|
||||
]
|
||||
|
|
|
@ -2551,8 +2551,8 @@ class TestRepoBuilds(ApiTestCase):
|
|||
def test_getrepo_nobuilds(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
# Queries: Permission + the list query
|
||||
with assert_query_count(2):
|
||||
# Queries: Permission + the list query + app check
|
||||
with assert_query_count(3):
|
||||
json = self.getJsonResponse(RepositoryBuildList,
|
||||
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
|
||||
|
||||
|
@ -2561,8 +2561,8 @@ class TestRepoBuilds(ApiTestCase):
|
|||
def test_getrepobuilds(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
# Queries: Permission + the list query
|
||||
with assert_query_count(2):
|
||||
# Queries: Permission + the list query + app check
|
||||
with assert_query_count(3):
|
||||
json = self.getJsonResponse(RepositoryBuildList,
|
||||
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
|
||||
|
||||
|
|
100
test/test_verbs_endpoint_security.py
Normal file
100
test/test_verbs_endpoint_security.py
Normal file
|
@ -0,0 +1,100 @@
|
|||
import unittest
|
||||
|
||||
import endpoints.decorated # Register the various exceptions via decorators.
|
||||
|
||||
from app import app
|
||||
from endpoints.verbs import verbs
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from test.specs import build_verbs_specs
|
||||
|
||||
app.register_blueprint(verbs, url_prefix='/c1')
|
||||
|
||||
NO_ACCESS_USER = 'freshuser'
|
||||
READ_ACCESS_USER = 'reader'
|
||||
ADMIN_ACCESS_USER = 'devtable'
|
||||
CREATOR_ACCESS_USER = 'creator'
|
||||
|
||||
|
||||
class EndpointTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
setup_database_for_testing(self)
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
|
||||
|
||||
class _SpecTestBuilder(type):
|
||||
@staticmethod
|
||||
def _test_generator(url, test_spec, attrs):
|
||||
def test(self):
|
||||
with app.test_client() as c:
|
||||
headers = {}
|
||||
|
||||
if attrs['auth_username']:
|
||||
headers['Authorization'] = test_spec.gen_basic_auth(attrs['auth_username'], 'password')
|
||||
|
||||
expected_status = getattr(test_spec, attrs['result_attr'])
|
||||
|
||||
rv = c.open(url, headers=headers, method=test_spec.method_name)
|
||||
msg = '%s %s: got %s, expected: %s (auth: %s | headers %s)' % (test_spec.method_name,
|
||||
test_spec.index_name, rv.status_code, expected_status, attrs['auth_username'],
|
||||
headers)
|
||||
|
||||
self.assertEqual(rv.status_code, expected_status, msg)
|
||||
|
||||
return test
|
||||
|
||||
|
||||
def __new__(cls, name, bases, attrs):
|
||||
with app.test_request_context() as ctx:
|
||||
specs = attrs['spec_func']()
|
||||
for test_spec in specs:
|
||||
test_name = '%s_%s_%s_%s_%s' % (test_spec.index_name, test_spec.method_name,
|
||||
test_spec.repo_name, attrs['auth_username'] or 'anon',
|
||||
attrs['result_attr'])
|
||||
test_name = test_name.replace('/', '_').replace('-', '_')
|
||||
|
||||
test_name = 'test_' + test_name.lower().replace('verbs.', 'verbs_')
|
||||
url = test_spec.get_url()
|
||||
attrs[test_name] = _SpecTestBuilder._test_generator(url, test_spec, attrs)
|
||||
|
||||
return type(name, bases, attrs)
|
||||
|
||||
|
||||
class TestAnonymousAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'anon_code'
|
||||
auth_username = None
|
||||
|
||||
|
||||
class TestNoAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'no_access_code'
|
||||
auth_username = NO_ACCESS_USER
|
||||
|
||||
|
||||
class TestReadAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'read_code'
|
||||
auth_username = READ_ACCESS_USER
|
||||
|
||||
|
||||
class TestCreatorAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'creator_code'
|
||||
auth_username = CREATOR_ACCESS_USER
|
||||
|
||||
|
||||
class TestAdminAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'admin_code'
|
||||
auth_username = ADMIN_ACCESS_USER
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Reference in a new issue