Move verbs security tests into pytest style

This commit is contained in:
Joseph Schorr 2017-06-28 12:48:02 +03:00
parent 2f018046ec
commit 8ac20edfb2
5 changed files with 85 additions and 198 deletions

View file

@ -1,9 +1,13 @@
import datetime
import json
import base64
from contextlib import contextmanager
from data import model
from flask import g
from flask_principal import Identity
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
@ -36,7 +40,13 @@ def add_csrf_param(params):
return params
def conduct_call(client, resource, url_for, method, params, body=None, expected_code=200, headers=None):
def gen_basic_auth(username, password):
""" Generates a basic auth header. """
return 'Basic ' + base64.b64encode("%s:%s" % (username, password))
def conduct_call(client, resource, url_for, method, params, body=None, expected_code=200,
headers=None):
""" Conducts a call to a Flask endpoint. """
params = add_csrf_param(params)
@ -48,6 +58,9 @@ def conduct_call(client, resource, url_for, method, params, body=None, expected_
if body is not None:
body = json.dumps(body)
# Required for anonymous calls to not exception.
g.identity = Identity(None, 'none')
rv = client.open(final_url, method=method, data=body, headers=headers)
msg = '%s %s: got %s expected: %s | %s' % (method, final_url, rv.status_code, expected_code,
rv.data)

View file

@ -0,0 +1,69 @@
import pytest
from flask import url_for
from endpoints.test.shared import conduct_call, gen_basic_auth
from test.fixtures import *
NO_ACCESS_USER = 'freshuser'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
CREATOR_ACCESS_USER = 'creator'
PUBLIC_REPO = 'public/publicrepo'
PRIVATE_REPO = 'devtable/shared'
ORG_REPO = 'buynlarge/orgrepo'
ANOTHER_ORG_REPO = 'buynlarge/anotherorgrepo'
ACI_ARGS = {
'server': 'someserver',
'tag': 'fake',
'os': 'linux',
'arch': 'x64',
}
@pytest.mark.parametrize('user', [
(0, None),
(1, NO_ACCESS_USER),
(2, READ_ACCESS_USER),
(3, CREATOR_ACCESS_USER),
(4, ADMIN_ACCESS_USER),
])
@pytest.mark.parametrize('endpoint,method,repository,single_repo_path,params,expected_statuses', [
('get_aci_signature', 'GET', PUBLIC_REPO, False, ACI_ARGS, (404, 404, 404, 404, 404)),
('get_aci_signature', 'GET', PRIVATE_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)),
('get_aci_signature', 'GET', ORG_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)),
('get_aci_signature', 'GET', ANOTHER_ORG_REPO, False, ACI_ARGS, (403, 403, 403, 403, 404)),
# get_aci_image
('get_aci_image', 'GET', PUBLIC_REPO, False, ACI_ARGS, (404, 404, 404, 404, 404)),
('get_aci_image', 'GET', PRIVATE_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)),
('get_aci_image', 'GET', ORG_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)),
('get_aci_image', 'GET', ANOTHER_ORG_REPO, False, ACI_ARGS, (403, 403, 403, 403, 404)),
# get_squashed_tag
('get_squashed_tag', 'GET', PUBLIC_REPO, False, dict(tag='fake'), (404, 404, 404, 404, 404)),
('get_squashed_tag', 'GET', PRIVATE_REPO, False, dict(tag='fake'), (403, 403, 404, 403, 404)),
('get_squashed_tag', 'GET', ORG_REPO, False, dict(tag='fake'), (403, 403, 404, 403, 404)),
('get_squashed_tag', 'GET', ANOTHER_ORG_REPO, False, dict(tag='fake'), (403, 403, 403, 403, 404)),
# get_tag_torrent
('get_tag_torrent', 'GET', PUBLIC_REPO, True, dict(digest='sha256:1234'), (404, 404, 404, 404, 404)),
('get_tag_torrent', 'GET', PRIVATE_REPO, True, dict(digest='sha256:1234'), (403, 403, 404, 403, 404)),
('get_tag_torrent', 'GET', ORG_REPO, True, dict(digest='sha256:1234'), (403, 403, 404, 403, 404)),
('get_tag_torrent', 'GET', ANOTHER_ORG_REPO, True, dict(digest='sha256:1234'), (403, 403, 403, 403, 404)),
])
def test_verbs_security(user, endpoint, method, repository, single_repo_path, params,
expected_statuses, app, client):
headers = {}
if user[1] is not None:
headers['Authorization'] = gen_basic_auth(user[1], 'password')
if single_repo_path:
params['repository'] = repository
else:
(namespace, repo_name) = repository.split('/')
params['namespace'] = namespace
params['repository'] = repo_name
conduct_call(client, 'verbs.' + endpoint, url_for, method, params,
expected_code=expected_statuses[user[0]], headers=headers)

View file

@ -15,6 +15,7 @@ from data.model.user import LoginWrappedDBUser
from endpoints.api import api_bp
from endpoints.appr import appr_bp
from endpoints.web import web
from endpoints.verbs import verbs as verbs_bp
from initdb import initialize_database, populate_database
@ -166,6 +167,7 @@ def app(appconfig, initialized_db):
app.register_blueprint(api_bp, url_prefix='/api')
app.register_blueprint(appr_bp, url_prefix='/cnr')
app.register_blueprint(web, url_prefix='/')
app.register_blueprint(verbs_bp, url_prefix='/c1')
app.config.update(appconfig)
return app

View file

@ -509,100 +509,3 @@ def build_v2_index_specs():
request_status(401, 401, 401, 401, 404),
]
class VerbTestSpec(object):
def __init__(self, index_name, method_name, repo_name, rpath=False, **kwargs):
self.index_name = index_name
self.repo_name = repo_name
self.method_name = method_name
self.single_repository_path = rpath
self.kwargs = kwargs
self.anon_code = 401
self.no_access_code = 403
self.read_code = 200
self.admin_code = 200
self.creator_code = 200
def request_status(self, anon_code=401, no_access_code=403, read_code=200, creator_code=200,
admin_code=200):
self.anon_code = anon_code
self.no_access_code = no_access_code
self.read_code = read_code
self.creator_code = creator_code
self.admin_code = admin_code
return self
def get_url(self):
if self.single_repository_path:
return url_for(self.index_name, repository=self.repo_name, **self.kwargs)
else:
(namespace, repo_name) = self.repo_name.split('/')
return url_for(self.index_name, namespace=namespace, repository=repo_name, **self.kwargs)
def gen_basic_auth(self, username, password):
encoded = b64encode('%s:%s' % (username, password))
return 'basic %s' % encoded
ACI_ARGS = {
'server': 'someserver',
'tag': 'fake',
'os': 'linux',
'arch': 'x64',
}
def build_verbs_specs():
return [
# get_aci_signature
VerbTestSpec('verbs.get_aci_signature', 'GET', PUBLIC_REPO, **ACI_ARGS).
request_status(404, 404, 404, 404, 404),
VerbTestSpec('verbs.get_aci_signature', 'GET', PRIVATE_REPO, **ACI_ARGS).
request_status(403, 403, 404, 403, 404),
VerbTestSpec('verbs.get_aci_signature', 'GET', ORG_REPO, **ACI_ARGS).
request_status(403, 403, 404, 403, 404),
VerbTestSpec('verbs.get_aci_signature', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS).
request_status(403, 403, 403, 403, 404),
# get_aci_image
VerbTestSpec('verbs.get_aci_image', 'GET', PUBLIC_REPO, **ACI_ARGS).
request_status(404, 404, 404, 404, 404),
VerbTestSpec('verbs.get_aci_image', 'GET', PRIVATE_REPO, **ACI_ARGS).
request_status(403, 403, 404, 403, 404),
VerbTestSpec('verbs.get_aci_image', 'GET', ORG_REPO, **ACI_ARGS).
request_status(403, 403, 404, 403, 404),
VerbTestSpec('verbs.get_aci_image', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS).
request_status(403, 403, 403, 403, 404),
# get_squashed_tag
VerbTestSpec('verbs.get_squashed_tag', 'GET', PUBLIC_REPO, tag='fake').
request_status(404, 404, 404, 404, 404),
VerbTestSpec('verbs.get_squashed_tag', 'GET', PRIVATE_REPO, tag='fake').
request_status(403, 403, 404, 403, 404),
VerbTestSpec('verbs.get_squashed_tag', 'GET', ORG_REPO, tag='fake').
request_status(403, 403, 404, 403, 404),
VerbTestSpec('verbs.get_squashed_tag', 'GET', ANOTHER_ORG_REPO, tag='fake').
request_status(403, 403, 403, 403, 404),
# get_tag_torrent
VerbTestSpec('verbs.get_tag_torrent', 'GET', PUBLIC_REPO, digest='sha256:1234', rpath=True).
request_status(404, 404, 404, 404, 404),
VerbTestSpec('verbs.get_tag_torrent', 'GET', PRIVATE_REPO, digest='sha256:1234', rpath=True).
request_status(403, 403, 404, 403, 404),
VerbTestSpec('verbs.get_tag_torrent', 'GET', ORG_REPO, digest='sha256:1234', rpath=True).
request_status(403, 403, 404, 403, 404),
VerbTestSpec('verbs.get_tag_torrent', 'GET', ANOTHER_ORG_REPO, digest='sha256:1234', rpath=True).
request_status(403, 403, 403, 403, 404),
]

View file

@ -1,100 +0,0 @@
import unittest
import endpoints.decorated # Register the various exceptions via decorators.
from app import app
from endpoints.verbs import verbs
from initdb import setup_database_for_testing, finished_database_for_testing
from test.specs import build_verbs_specs
app.register_blueprint(verbs, url_prefix='/c1')
NO_ACCESS_USER = 'freshuser'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
CREATOR_ACCESS_USER = 'creator'
class EndpointTestCase(unittest.TestCase):
def setUp(self):
setup_database_for_testing(self)
def tearDown(self):
finished_database_for_testing(self)
class _SpecTestBuilder(type):
@staticmethod
def _test_generator(url, test_spec, attrs):
def test(self):
with app.test_client() as c:
headers = {}
if attrs['auth_username']:
headers['Authorization'] = test_spec.gen_basic_auth(attrs['auth_username'], 'password')
expected_status = getattr(test_spec, attrs['result_attr'])
rv = c.open(url, headers=headers, method=test_spec.method_name)
msg = '%s %s: got %s, expected: %s (auth: %s | headers %s)' % (test_spec.method_name,
test_spec.index_name, rv.status_code, expected_status, attrs['auth_username'],
headers)
self.assertEqual(rv.status_code, expected_status, msg)
return test
def __new__(cls, name, bases, attrs):
with app.test_request_context() as ctx:
specs = attrs['spec_func']()
for test_spec in specs:
test_name = '%s_%s_%s_%s_%s' % (test_spec.index_name, test_spec.method_name,
test_spec.repo_name, attrs['auth_username'] or 'anon',
attrs['result_attr'])
test_name = test_name.replace('/', '_').replace('-', '_')
test_name = 'test_' + test_name.lower().replace('verbs.', 'verbs_')
url = test_spec.get_url()
attrs[test_name] = _SpecTestBuilder._test_generator(url, test_spec, attrs)
return type(name, bases, attrs)
class TestAnonymousAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_verbs_specs
result_attr = 'anon_code'
auth_username = None
class TestNoAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_verbs_specs
result_attr = 'no_access_code'
auth_username = NO_ACCESS_USER
class TestReadAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_verbs_specs
result_attr = 'read_code'
auth_username = READ_ACCESS_USER
class TestCreatorAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_verbs_specs
result_attr = 'creator_code'
auth_username = CREATOR_ACCESS_USER
class TestAdminAccess(EndpointTestCase):
__metaclass__ = _SpecTestBuilder
spec_func = build_verbs_specs
result_attr = 'admin_code'
auth_username = ADMIN_ACCESS_USER
if __name__ == '__main__':
unittest.main()