Add very basic security tests for CNR APIs
This commit is contained in:
parent
f1dccc9554
commit
ef4569f2c5
3 changed files with 82 additions and 10 deletions
|
@ -13,12 +13,13 @@ CSRF_TOKEN = '123csrfforme'
|
|||
@contextmanager
|
||||
def client_with_identity(auth_username, client):
|
||||
with client.session_transaction() as sess:
|
||||
if auth_username:
|
||||
if auth_username is not None:
|
||||
if auth_username and auth_username is not None:
|
||||
loaded = model.user.get_user(auth_username)
|
||||
sess['user_id'] = loaded.uuid
|
||||
sess['login_time'] = datetime.datetime.now()
|
||||
sess[CSRF_TOKEN_KEY] = CSRF_TOKEN
|
||||
else:
|
||||
sess['user_id'] = 'anonymous'
|
||||
|
||||
yield client
|
||||
|
||||
|
|
|
@ -181,11 +181,9 @@ def push(namespace, package_name):
|
|||
logger.debug('Found invalid repository name CNR push: %s', reponame)
|
||||
raise InvalidUsage()
|
||||
|
||||
values = request.get_json(force=True, silent=True)
|
||||
release_version = values['release']
|
||||
media_type = values['media_type']
|
||||
force = request.args.get('force', 'false') == 'true'
|
||||
values = request.get_json(force=True, silent=True) or {}
|
||||
private = values.get('visibility', 'public')
|
||||
|
||||
owner = get_authenticated_user()
|
||||
if not Package.exists(reponame):
|
||||
if not CreateRepositoryPermission(namespace).can():
|
||||
|
@ -198,6 +196,13 @@ def push(namespace, package_name):
|
|||
raise UnauthorizedAccess("Unauthorized access for: %s" % reponame,
|
||||
{"package": reponame, "scopes": ['push']})
|
||||
|
||||
if not 'release' in values:
|
||||
raise InvalidUsage('Missing release')
|
||||
|
||||
release_version = values['release']
|
||||
media_type = values['media_type']
|
||||
force = request.args.get('force', 'false') == 'true'
|
||||
|
||||
blob = Blob(reponame, values['blob'])
|
||||
app_release = cnr_registry.push(reponame, release_version, media_type, blob, force,
|
||||
package_class=Package, user=owner, visibility=private)
|
||||
|
|
66
endpoints/appr/test/test_api_security.py
Normal file
66
endpoints/appr/test/test_api_security.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
import base64
|
||||
import pytest
|
||||
|
||||
from flask import url_for
|
||||
|
||||
from data import model
|
||||
from endpoints.test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file
|
||||
from endpoints.appr.registry import appr_bp, blobs
|
||||
from endpoints.api.test.shared import client_with_identity
|
||||
|
||||
@pytest.mark.parametrize('resource,method,params,owned_by,identity,expected', [
|
||||
('appr.blobs', 'GET', {'digest': 'abcd1235'}, 'devtable', 'public', 401),
|
||||
('appr.blobs', 'GET', {'digest': 'abcd1235'}, 'devtable', 'devtable', 404),
|
||||
|
||||
('appr.delete_package', 'DELETE', {'release': 'r', 'media_type': 'foo'}, 'devtable', 'public', 401),
|
||||
('appr.delete_package', 'DELETE', {'release': 'r', 'media_type': 'foo'}, 'devtable', 'devtable', 404),
|
||||
|
||||
('appr.show_package', 'GET', {'release': 'r', 'media_type': 'foo'}, 'devtable', 'public', 401),
|
||||
('appr.show_package', 'GET', {'release': 'r', 'media_type': 'foo'}, 'devtable', 'devtable', 404),
|
||||
|
||||
('appr.show_package_releases', 'GET', {}, 'devtable', 'public', 401),
|
||||
('appr.show_package_releases', 'GET', {}, 'devtable', 'devtable', 200),
|
||||
|
||||
('appr.show_package_releasse_manifests', 'GET', {'release': 'r'}, 'devtable', 'public', 401),
|
||||
('appr.show_package_releasse_manifests', 'GET', {'release': 'r'}, 'devtable', 'devtable', 200),
|
||||
|
||||
('appr.pull', 'GET', {'release': 'r', 'media_type': 'foo'}, 'devtable', 'public', 401),
|
||||
('appr.pull', 'GET', {'release': 'r', 'media_type': 'foo'}, 'devtable', 'devtable', 404),
|
||||
|
||||
('appr.push', 'POST', {}, 'devtable', 'public', 401),
|
||||
('appr.push', 'POST', {}, 'devtable', 'devtable', 400),
|
||||
|
||||
('appr.list_channels', 'GET', {}, 'devtable', 'public', 401),
|
||||
('appr.list_channels', 'GET', {}, 'devtable', 'devtable', 200),
|
||||
|
||||
('appr.show_channel', 'GET', {'channel_name': 'c'}, 'devtable', 'public', 401),
|
||||
('appr.show_channel', 'GET', {'channel_name': 'c'}, 'devtable', 'devtable', 404),
|
||||
|
||||
('appr.delete_channel', 'DELETE', {'channel_name': 'c'}, 'devtable', 'public', 401),
|
||||
('appr.delete_channel', 'DELETE', {'channel_name': 'c'}, 'devtable', 'devtable', 404),
|
||||
|
||||
('appr.add_channel_release', 'POST', {'channel_name': 'c', 'release': 'r'}, 'devtable', 'public', 401),
|
||||
('appr.add_channel_release', 'POST', {'channel_name': 'c', 'release': 'r'}, 'devtable', 'devtable', 404),
|
||||
|
||||
('appr.delete_channel_release', 'DELETE', {'channel_name': 'c', 'release': 'r'}, 'devtable', 'public', 401),
|
||||
('appr.delete_channel_release', 'DELETE', {'channel_name': 'c', 'release': 'r'}, 'devtable', 'devtable', 404),
|
||||
])
|
||||
def test_api_security(resource, method, params, owned_by, identity, expected, app, client):
|
||||
app.register_blueprint(appr_bp, url_prefix='/cnr')
|
||||
|
||||
with client_with_identity(identity, client) as cl:
|
||||
owner = model.user.get_user(owned_by)
|
||||
model.repository.create_repository(owned_by, 'someapprepo', owner, repo_kind='application')
|
||||
|
||||
params['namespace'] = owned_by
|
||||
params['package_name'] = 'someapprepo'
|
||||
params['_csrf_token'] = '123csrfforme'
|
||||
|
||||
url = url_for(resource, **params)
|
||||
headers = {}
|
||||
if identity is not None:
|
||||
headers['authorization'] = 'basic ' + base64.b64encode('%s:password' % identity)
|
||||
|
||||
rv = cl.open(url, headers=headers, method=method)
|
||||
assert rv.status_code == expected
|
||||
|
Reference in a new issue