Add verb security tests and fix small issues
This commit is contained in:
parent
d5fa2ad0c0
commit
df1e7f90e0
4 changed files with 206 additions and 4 deletions
|
@ -159,7 +159,11 @@ class PreOCIModel(VerbsDataInterface):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def get_repository(self, namespace_name, repo_name):
|
def get_repository(self, namespace_name, repo_name):
|
||||||
return _repository_for_repo(model.repository.get_repository(namespace_name, repo_name))
|
repo = model.repository.get_repository(namespace_name, repo_name)
|
||||||
|
if repo is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return _repository_for_repo(repo)
|
||||||
|
|
||||||
def get_manifest_layers_with_blobs(self, repo_image):
|
def get_manifest_layers_with_blobs(self, repo_image):
|
||||||
repo_image_record = model.image.get_image_by_id(repo_image.repository.namespace_name,
|
repo_image_record = model.image.get_image_by_id(repo_image.repository.namespace_name,
|
||||||
|
|
|
@ -176,7 +176,7 @@ def _verify_repo_verb(_, namespace, repo_name, tag, verb, checker=None):
|
||||||
if tag_image is None:
|
if tag_image is None:
|
||||||
abort(404)
|
abort(404)
|
||||||
|
|
||||||
if repo.kind != 'image':
|
if repo is not None and repo.kind != 'image':
|
||||||
abort(405)
|
abort(405)
|
||||||
|
|
||||||
# If there is a data checker, call it first.
|
# If there is a data checker, call it first.
|
||||||
|
@ -351,10 +351,10 @@ def get_squashed_tag(namespace, repository, tag):
|
||||||
@process_auth
|
@process_auth
|
||||||
@parse_repository_name()
|
@parse_repository_name()
|
||||||
def get_tag_torrent(namespace_name, repo_name, digest):
|
def get_tag_torrent(namespace_name, repo_name, digest):
|
||||||
permission = ReadRepositoryPermission(namespace_name, repo_name)
|
|
||||||
repo = model.get_repository(namespace_name, repo_name)
|
repo = model.get_repository(namespace_name, repo_name)
|
||||||
repo_is_public = repo is not None and repo.is_public
|
repo_is_public = repo is not None and repo.is_public
|
||||||
|
|
||||||
|
permission = ReadRepositoryPermission(namespace_name, repo_name)
|
||||||
if not permission.can() and not repo_is_public:
|
if not permission.can() and not repo_is_public:
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
|
@ -363,7 +363,7 @@ def get_tag_torrent(namespace_name, repo_name, digest):
|
||||||
# We can not generate a private torrent cluster without a user uuid (e.g. token auth)
|
# We can not generate a private torrent cluster without a user uuid (e.g. token auth)
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
if repo.kind != 'image':
|
if repo is not None and repo.kind != 'image':
|
||||||
abort(405)
|
abort(405)
|
||||||
|
|
||||||
blob = model.get_repo_blob_by_digest(namespace_name, repo_name, digest)
|
blob = model.get_repo_blob_by_digest(namespace_name, repo_name, digest)
|
||||||
|
|
|
@ -508,3 +508,101 @@ def build_v2_index_specs():
|
||||||
IndexV2TestSpec('v2.cancel_upload', 'DELETE', ANOTHER_ORG_REPO, upload_uuid=FAKE_UPLOAD_ID).
|
IndexV2TestSpec('v2.cancel_upload', 'DELETE', ANOTHER_ORG_REPO, upload_uuid=FAKE_UPLOAD_ID).
|
||||||
request_status(401, 401, 401, 401, 404),
|
request_status(401, 401, 401, 401, 404),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class VerbTestSpec(object):
|
||||||
|
def __init__(self, index_name, method_name, repo_name, rpath=False, **kwargs):
|
||||||
|
self.index_name = index_name
|
||||||
|
self.repo_name = repo_name
|
||||||
|
self.method_name = method_name
|
||||||
|
self.single_repository_path = rpath
|
||||||
|
|
||||||
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
self.anon_code = 401
|
||||||
|
self.no_access_code = 403
|
||||||
|
self.read_code = 200
|
||||||
|
self.admin_code = 200
|
||||||
|
self.creator_code = 200
|
||||||
|
|
||||||
|
def request_status(self, anon_code=401, no_access_code=403, read_code=200, creator_code=200,
|
||||||
|
admin_code=200):
|
||||||
|
self.anon_code = anon_code
|
||||||
|
self.no_access_code = no_access_code
|
||||||
|
self.read_code = read_code
|
||||||
|
self.creator_code = creator_code
|
||||||
|
self.admin_code = admin_code
|
||||||
|
return self
|
||||||
|
|
||||||
|
def get_url(self):
|
||||||
|
if self.single_repository_path:
|
||||||
|
return url_for(self.index_name, repository=self.repo_name, **self.kwargs)
|
||||||
|
else:
|
||||||
|
(namespace, repo_name) = self.repo_name.split('/')
|
||||||
|
return url_for(self.index_name, namespace=namespace, repository=repo_name, **self.kwargs)
|
||||||
|
|
||||||
|
def gen_basic_auth(self, username, password):
|
||||||
|
encoded = b64encode('%s:%s' % (username, password))
|
||||||
|
return 'basic %s' % encoded
|
||||||
|
|
||||||
|
ACI_ARGS = {
|
||||||
|
'server': 'someserver',
|
||||||
|
'tag': 'fake',
|
||||||
|
'os': 'linux',
|
||||||
|
'arch': 'x64',
|
||||||
|
}
|
||||||
|
|
||||||
|
def build_verbs_specs():
|
||||||
|
return [
|
||||||
|
# get_aci_signature
|
||||||
|
VerbTestSpec('verbs.get_aci_signature', 'GET', PUBLIC_REPO, **ACI_ARGS).
|
||||||
|
request_status(404, 404, 404, 404, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_aci_signature', 'GET', PRIVATE_REPO, **ACI_ARGS).
|
||||||
|
request_status(403, 403, 404, 403, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_aci_signature', 'GET', ORG_REPO, **ACI_ARGS).
|
||||||
|
request_status(403, 403, 404, 403, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_aci_signature', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS).
|
||||||
|
request_status(403, 403, 403, 403, 404),
|
||||||
|
|
||||||
|
# get_aci_image
|
||||||
|
VerbTestSpec('verbs.get_aci_image', 'GET', PUBLIC_REPO, **ACI_ARGS).
|
||||||
|
request_status(404, 404, 404, 404, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_aci_image', 'GET', PRIVATE_REPO, **ACI_ARGS).
|
||||||
|
request_status(403, 403, 404, 403, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_aci_image', 'GET', ORG_REPO, **ACI_ARGS).
|
||||||
|
request_status(403, 403, 404, 403, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_aci_image', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS).
|
||||||
|
request_status(403, 403, 403, 403, 404),
|
||||||
|
|
||||||
|
# get_squashed_tag
|
||||||
|
VerbTestSpec('verbs.get_squashed_tag', 'GET', PUBLIC_REPO, tag='fake').
|
||||||
|
request_status(404, 404, 404, 404, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_squashed_tag', 'GET', PRIVATE_REPO, tag='fake').
|
||||||
|
request_status(403, 403, 404, 403, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_squashed_tag', 'GET', ORG_REPO, tag='fake').
|
||||||
|
request_status(403, 403, 404, 403, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_squashed_tag', 'GET', ANOTHER_ORG_REPO, tag='fake').
|
||||||
|
request_status(403, 403, 403, 403, 404),
|
||||||
|
|
||||||
|
# get_tag_torrent
|
||||||
|
VerbTestSpec('verbs.get_tag_torrent', 'GET', PUBLIC_REPO, digest='sha256:1234', rpath=True).
|
||||||
|
request_status(404, 404, 404, 404, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_tag_torrent', 'GET', PRIVATE_REPO, digest='sha256:1234', rpath=True).
|
||||||
|
request_status(403, 403, 404, 403, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_tag_torrent', 'GET', ORG_REPO, digest='sha256:1234', rpath=True).
|
||||||
|
request_status(403, 403, 404, 403, 404),
|
||||||
|
|
||||||
|
VerbTestSpec('verbs.get_tag_torrent', 'GET', ANOTHER_ORG_REPO, digest='sha256:1234', rpath=True).
|
||||||
|
request_status(403, 403, 403, 403, 404),
|
||||||
|
]
|
||||||
|
|
100
test/test_verbs_endpoint_security.py
Normal file
100
test/test_verbs_endpoint_security.py
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
import endpoints.decorated # Register the various exceptions via decorators.
|
||||||
|
|
||||||
|
from app import app
|
||||||
|
from endpoints.verbs import verbs
|
||||||
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||||
|
from test.specs import build_verbs_specs
|
||||||
|
|
||||||
|
app.register_blueprint(verbs, url_prefix='/c1')
|
||||||
|
|
||||||
|
NO_ACCESS_USER = 'freshuser'
|
||||||
|
READ_ACCESS_USER = 'reader'
|
||||||
|
ADMIN_ACCESS_USER = 'devtable'
|
||||||
|
CREATOR_ACCESS_USER = 'creator'
|
||||||
|
|
||||||
|
|
||||||
|
class EndpointTestCase(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
setup_database_for_testing(self)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
finished_database_for_testing(self)
|
||||||
|
|
||||||
|
|
||||||
|
class _SpecTestBuilder(type):
|
||||||
|
@staticmethod
|
||||||
|
def _test_generator(url, test_spec, attrs):
|
||||||
|
def test(self):
|
||||||
|
with app.test_client() as c:
|
||||||
|
headers = {}
|
||||||
|
|
||||||
|
if attrs['auth_username']:
|
||||||
|
headers['Authorization'] = test_spec.gen_basic_auth(attrs['auth_username'], 'password')
|
||||||
|
|
||||||
|
expected_status = getattr(test_spec, attrs['result_attr'])
|
||||||
|
|
||||||
|
rv = c.open(url, headers=headers, method=test_spec.method_name)
|
||||||
|
msg = '%s %s: got %s, expected: %s (auth: %s | headers %s)' % (test_spec.method_name,
|
||||||
|
test_spec.index_name, rv.status_code, expected_status, attrs['auth_username'],
|
||||||
|
headers)
|
||||||
|
|
||||||
|
self.assertEqual(rv.status_code, expected_status, msg)
|
||||||
|
|
||||||
|
return test
|
||||||
|
|
||||||
|
|
||||||
|
def __new__(cls, name, bases, attrs):
|
||||||
|
with app.test_request_context() as ctx:
|
||||||
|
specs = attrs['spec_func']()
|
||||||
|
for test_spec in specs:
|
||||||
|
test_name = '%s_%s_%s_%s_%s' % (test_spec.index_name, test_spec.method_name,
|
||||||
|
test_spec.repo_name, attrs['auth_username'] or 'anon',
|
||||||
|
attrs['result_attr'])
|
||||||
|
test_name = test_name.replace('/', '_').replace('-', '_')
|
||||||
|
|
||||||
|
test_name = 'test_' + test_name.lower().replace('verbs.', 'verbs_')
|
||||||
|
url = test_spec.get_url()
|
||||||
|
attrs[test_name] = _SpecTestBuilder._test_generator(url, test_spec, attrs)
|
||||||
|
|
||||||
|
return type(name, bases, attrs)
|
||||||
|
|
||||||
|
|
||||||
|
class TestAnonymousAccess(EndpointTestCase):
|
||||||
|
__metaclass__ = _SpecTestBuilder
|
||||||
|
spec_func = build_verbs_specs
|
||||||
|
result_attr = 'anon_code'
|
||||||
|
auth_username = None
|
||||||
|
|
||||||
|
|
||||||
|
class TestNoAccess(EndpointTestCase):
|
||||||
|
__metaclass__ = _SpecTestBuilder
|
||||||
|
spec_func = build_verbs_specs
|
||||||
|
result_attr = 'no_access_code'
|
||||||
|
auth_username = NO_ACCESS_USER
|
||||||
|
|
||||||
|
|
||||||
|
class TestReadAccess(EndpointTestCase):
|
||||||
|
__metaclass__ = _SpecTestBuilder
|
||||||
|
spec_func = build_verbs_specs
|
||||||
|
result_attr = 'read_code'
|
||||||
|
auth_username = READ_ACCESS_USER
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreatorAccess(EndpointTestCase):
|
||||||
|
__metaclass__ = _SpecTestBuilder
|
||||||
|
spec_func = build_verbs_specs
|
||||||
|
result_attr = 'creator_code'
|
||||||
|
auth_username = CREATOR_ACCESS_USER
|
||||||
|
|
||||||
|
|
||||||
|
class TestAdminAccess(EndpointTestCase):
|
||||||
|
__metaclass__ = _SpecTestBuilder
|
||||||
|
spec_func = build_verbs_specs
|
||||||
|
result_attr = 'admin_code'
|
||||||
|
auth_username = ADMIN_ACCESS_USER
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Reference in a new issue