update(security_test.py): moving tests to new framework

We should be moving tests over to pytest

[none]
This commit is contained in:
Charlton Austin 2017-01-27 11:22:40 -05:00
parent 0b51ea34b1
commit 85bcb63439
7 changed files with 190 additions and 107 deletions

53
app.py
View file

@ -21,10 +21,12 @@ from data import model
from data.archivedlogs import LogArchive from data.archivedlogs import LogArchive
from data.billing import Billing from data.billing import Billing
from data.buildlogs import BuildLogs from data.buildlogs import BuildLogs
from data.model.user import LoginWrappedDBUser
from data.queue import WorkQueue, BuildMetricQueueReporter from data.queue import WorkQueue, BuildMetricQueueReporter
from data.userevent import UserEventsBuilderModule from data.userevent import UserEventsBuilderModule
from data.userfiles import Userfiles from data.userfiles import Userfiles
from data.users import UserAuthentication from data.users import UserAuthentication
from path_converters import RegexConverter, RepositoryPathConverter, APIRepositoryPathConverter
from oauth.services.github import GithubOAuthService from oauth.services.github import GithubOAuthService
from oauth.services.gitlab import GitLabOAuthService from oauth.services.gitlab import GitLabOAuthService
from oauth.loginmanager import OAuthLoginManager from oauth.loginmanager import OAuthLoginManager
@ -134,37 +136,6 @@ for handler in root_logger.handlers:
app.request_class = RequestWithId app.request_class = RequestWithId
# Register custom converters. # Register custom converters.
class RegexConverter(BaseConverter):
""" Converter for handling custom regular expression patterns in paths. """
def __init__(self, url_map, regex_value):
super(RegexConverter, self).__init__(url_map)
self.regex = regex_value
class RepositoryPathConverter(BaseConverter):
""" Converter for handling repository paths. Handles both library and non-library paths (if
configured).
"""
def __init__(self, url_map):
super(RepositoryPathConverter, self).__init__(url_map)
self.weight = 200
if features.LIBRARY_SUPPORT:
# Allow names without namespaces.
self.regex = r'[^/]+(/[^/]+)?'
else:
self.regex = r'([^/]+/[^/]+)'
class APIRepositoryPathConverter(BaseConverter):
""" Converter for handling repository paths. Does not handle library paths.
"""
def __init__(self, url_map):
super(APIRepositoryPathConverter, self).__init__(url_map)
self.weight = 200
self.regex = r'([^/]+/[^/]+)'
app.url_map.converters['regex'] = RegexConverter app.url_map.converters['regex'] = RegexConverter
app.url_map.converters['repopath'] = RepositoryPathConverter app.url_map.converters['repopath'] = RepositoryPathConverter
app.url_map.converters['apirepopath'] = APIRepositoryPathConverter app.url_map.converters['apirepopath'] = APIRepositoryPathConverter
@ -242,25 +213,5 @@ def load_user(user_uuid):
logger.debug('User loader loading deferred user with uuid: %s', user_uuid) logger.debug('User loader loading deferred user with uuid: %s', user_uuid)
return LoginWrappedDBUser(user_uuid) return LoginWrappedDBUser(user_uuid)
class LoginWrappedDBUser(UserMixin):
def __init__(self, user_uuid, db_user=None):
self._uuid = user_uuid
self._db_user = db_user
def db_user(self):
if not self._db_user:
self._db_user = model.user.get_user_by_uuid(self._uuid)
return self._db_user
@property
def is_authenticated(self):
return self.db_user() is not None
@property
def is_active(self):
return self.db_user().verified
def get_id(self):
return unicode(self._uuid)
get_app_url = partial(get_app_url, app.config) get_app_url = partial(get_app_url, app.config)

View file

@ -2,6 +2,7 @@ import bcrypt
import logging import logging
import json import json
import uuid import uuid
from flask.ext.login import UserMixin
from peewee import JOIN_LEFT_OUTER, IntegrityError, fn from peewee import JOIN_LEFT_OUTER, IntegrityError, fn
from uuid import uuid4 from uuid import uuid4
@ -841,3 +842,25 @@ def get_federated_logins(user_ids, service_name):
.join(LoginService) .join(LoginService)
.where(FederatedLogin.user << user_ids, .where(FederatedLogin.user << user_ids,
LoginService.name == service_name)) LoginService.name == service_name))
class LoginWrappedDBUser(UserMixin):
def __init__(self, user_uuid, db_user=None):
self._uuid = user_uuid
self._db_user = db_user
def db_user(self):
if not self._db_user:
self._db_user = get_user_by_uuid(self._uuid)
return self._db_user
@property
def is_authenticated(self):
return self.db_user() is not None
@property
def is_active(self):
return self.db_user().verified
def get_id(self):
return unicode(self._uuid)

View file

@ -0,0 +1,86 @@
import os
import pytest
import shutil
from flask import Flask, jsonify
from flask.ext.login import LoginManager
from peewee import SqliteDatabase
from app import app as application
from data import model
from data.database import (close_db_filter, db)
from data.model.user import LoginWrappedDBUser
from endpoints.api import api_bp
from initdb import initialize_database, populate_database
from path_converters import APIRepositoryPathConverter, RegexConverter
@pytest.fixture()
def app(appconfig):
""" Used by pytest-flask plugin to inject app by test for client See test_security by name injection of client. """
app = Flask(__name__)
login_manager = LoginManager(app)
@app.errorhandler(model.DataModelException)
def handle_dme(ex):
response = jsonify({'message': ex.message})
response.status_code = 400
return response
@login_manager.user_loader
def load_user(user_uuid):
return LoginWrappedDBUser(user_uuid)
app.url_map.converters['regex'] = RegexConverter
app.url_map.converters['apirepopath'] = APIRepositoryPathConverter
app.register_blueprint(api_bp, url_prefix='/api')
app.config.update(appconfig)
return app
@pytest.fixture(scope="session")
def init_db_path(tmpdir_factory):
""" Creates a new db and appropriate configuration. Used for parameter by name injection. """
sqlitedb_file = str(tmpdir_factory.mktemp("data").join("test.db"))
sqlitedb = 'sqlite:///{0}'.format(sqlitedb_file)
conf = {"TESTING": True,
"DEBUG": True,
"DB_URI": sqlitedb}
os.environ['TEST_DATABASE_URI'] = str(sqlitedb)
os.environ['DB_URI'] = str(sqlitedb)
db.initialize(SqliteDatabase(sqlitedb_file))
application.config.update(conf)
application.config.update({"DB_URI": sqlitedb})
initialize_database()
populate_database()
close_db_filter(None)
return str(sqlitedb_file)
@pytest.fixture()
def database_uri(monkeypatch, init_db_path, sqlitedb_file):
""" Creates the db uri. Used for parameter by name injection. """
shutil.copy2(init_db_path, sqlitedb_file)
db.initialize(SqliteDatabase(sqlitedb_file))
db_path = 'sqlite:///{0}'.format(sqlitedb_file)
monkeypatch.setenv("DB_URI", db_path)
return db_path
@pytest.fixture()
def sqlitedb_file(tmpdir):
""" Makes file for db. Used for parameter by name injection. """
test_db_file = tmpdir.mkdir("quaydb").join("test.db")
return str(test_db_file)
@pytest.fixture()
def appconfig(database_uri):
""" Makes conf with database_uri. Used for parameter by name injection """
conf = {
"TESTING": True,
"DEBUG": True,
"DB_URI": database_uri,
"SECRET_KEY": 'superdupersecret!!!1',
}
return conf

View file

@ -0,0 +1,43 @@
import datetime
import pytest
from data import model
from endpoints.api import api
from endpoints.api.superuser import SuperUserRepositoryBuildLogs, SuperUserRepositoryBuildResource
from endpoints.api.superuser import SuperUserRepositoryBuildStatus
def client_with_identity(auth_username, client):
with client.session_transaction() as sess:
if auth_username:
if auth_username is not None:
loaded = model.user.get_user(auth_username)
sess['user_id'] = loaded.uuid
sess['login_time'] = datetime.datetime.now()
return client
@pytest.mark.parametrize('resource,identity,expected', [
(SuperUserRepositoryBuildLogs, None, 401),
(SuperUserRepositoryBuildLogs, 'freshuser', 403),
(SuperUserRepositoryBuildLogs, 'reader', 403),
(SuperUserRepositoryBuildLogs, 'devtable', 400),
(SuperUserRepositoryBuildStatus, None, 401),
(SuperUserRepositoryBuildStatus, 'freshuser', 403),
(SuperUserRepositoryBuildStatus, 'reader', 403),
(SuperUserRepositoryBuildStatus, 'devtable', 400),
(SuperUserRepositoryBuildResource, None, 401),
(SuperUserRepositoryBuildResource, 'freshuser', 403),
(SuperUserRepositoryBuildResource, 'reader', 403),
(SuperUserRepositoryBuildResource, 'devtable', 404),
])
def test_super_user_build_endpoints(resource, identity, expected, client):
cl = client_with_identity(identity, client)
final_url = api.url_for(resource, build_uuid='1234')
rv = cl.open(final_url)
msg = '%s %s: %s expected: %s' % ('GET', final_url, rv.status_code, expected)
assert rv.status_code == expected, msg

34
path_converters.py Normal file
View file

@ -0,0 +1,34 @@
from werkzeug.routing import BaseConverter
import features
class APIRepositoryPathConverter(BaseConverter):
""" Converter for handling repository paths. Does not handle library paths.
"""
def __init__(self, url_map):
super(APIRepositoryPathConverter, self).__init__(url_map)
self.weight = 200
self.regex = r'([^/]+/[^/]+)'
class RepositoryPathConverter(BaseConverter):
""" Converter for handling repository paths. Handles both library and non-library paths (if
configured).
"""
def __init__(self, url_map):
super(RepositoryPathConverter, self).__init__(url_map)
self.weight = 200
if features.LIBRARY_SUPPORT:
# Allow names without namespaces.
self.regex = r'[^/]+(/[^/]+)?'
else:
self.regex = r'([^/]+/[^/]+)'
class RegexConverter(BaseConverter):
""" Converter for handling custom regular expression patterns in paths. """
def __init__(self, url_map, regex_value):
super(RegexConverter, self).__init__(url_map)
self.regex = regex_value

View file

@ -4351,60 +4351,6 @@ class TestSuperUserMessage(ApiTestCase):
self._run_test('DELETE', 204, 'devtable', None) self._run_test('DELETE', 204, 'devtable', None)
class TestSuperUserRepositoryBuildLogs(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)
self._set_url(SuperUserRepositoryBuildLogs, build_uuid='1234')
def test_get_anonymous(self):
self._run_test('GET', 401, None, None)
def test_get_freshuser(self):
self._run_test('GET', 403, 'freshuser', None)
def test_get_reader(self):
self._run_test('GET', 403, 'reader', None)
def test_get_devtable(self):
self._run_test('GET', 400, 'devtable', None)
class TestSuperUserRepositoryBuildStatus(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)
self._set_url(SuperUserRepositoryBuildStatus, build_uuid='1234')
def test_get_anonymous(self):
self._run_test('GET', 401, None, None)
def test_get_freshuser(self):
self._run_test('GET', 403, 'freshuser', None)
def test_get_reader(self):
self._run_test('GET', 403, 'reader', None)
def test_get_devtable(self):
self._run_test('GET', 400, 'devtable', None)
class TestSuperUserRepositoryBuildResource(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)
self._set_url(SuperUserRepositoryBuildResource, build_uuid='1234')
def test_get_anonymous(self):
self._run_test('GET', 401, None, None)
def test_get_freshuser(self):
self._run_test('GET', 403, 'freshuser', None)
def test_get_reader(self):
self._run_test('GET', 403, 'reader', None)
def test_get_devtable(self):
self._run_test('GET', 404, 'devtable', None)
class TestUserInvoiceFieldList(ApiTestCase): class TestUserInvoiceFieldList(ApiTestCase):
def setUp(self): def setUp(self):
ApiTestCase.setUp(self) ApiTestCase.setUp(self)

View file

@ -1104,7 +1104,7 @@ class TestCreateOrganization(ApiTestCase):
email='testorg@example.com'), email='testorg@example.com'),
expected_code=201) expected_code=201)
self.assertEquals('"Created"', data) self.assertEquals('"Created"', data.strip())
# Ensure the org was created. # Ensure the org was created.
organization = model.organization.get_organization('neworg') organization = model.organization.get_organization('neworg')
@ -1145,7 +1145,7 @@ class TestCreateOrganization(ApiTestCase):
headers=dict(Authorization='Bearer ' + token.access_token), headers=dict(Authorization='Bearer ' + token.access_token),
expected_code=201) expected_code=201)
self.assertEquals('"Created"', data) self.assertEquals('"Created"', data.strip())
class TestGetOrganization(ApiTestCase): class TestGetOrganization(ApiTestCase):