diff --git a/auth/test/test_signedgrant.py b/auth/test/test_signedgrant.py index 8eacbfac6..e200f0bf1 100644 --- a/auth/test/test_signedgrant.py +++ b/auth/test/test_signedgrant.py @@ -5,16 +5,28 @@ from auth.validateresult import AuthKind, ValidateResult @pytest.mark.parametrize('header, expected_result', [ - ('', ValidateResult(AuthKind.signed_grant, missing=True)), - ('somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)), - ('token somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)), - ('token ' + SIGNATURE_PREFIX + 'foo', - ValidateResult(AuthKind.signed_grant, error_message='Signed grant could not be validated')), - ('token ' + generate_signed_token({ - 'a': 'b'}, {'c': 'd'}), ValidateResult(AuthKind.signed_grant, signed_data={ - 'grants': { - 'a': 'b'}, - 'user_context': { - 'c': 'd'}})),]) + pytest.param('', ValidateResult(AuthKind.signed_grant, missing=True), id='Missing'), + pytest.param('somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True), + id='Invalid header'), + pytest.param('token somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True), + id='Random Token'), + pytest.param('token ' + SIGNATURE_PREFIX + 'foo', + ValidateResult(AuthKind.signed_grant, + error_message='Signed grant could not be validated'), + id='Invalid token'), +]) def test_token(header, expected_result): assert validate_signed_grant(header) == expected_result + + +def test_valid_grant(): + header = 'token ' + generate_signed_token({'a': 'b'}, {'c': 'd'}) + expected = ValidateResult(AuthKind.signed_grant, signed_data={ + 'grants': { + 'a': 'b', + }, + 'user_context': { + 'c': 'd' + }, + }) + assert validate_signed_grant(header) == expected diff --git a/buildtrigger/test/test_githosthandler.py b/buildtrigger/test/test_githosthandler.py index 53005d4c0..e7fbbeeed 100644 --- a/buildtrigger/test/test_githosthandler.py +++ b/buildtrigger/test/test_githosthandler.py @@ -116,10 +116,7 @@ def test_list_build_sources_for_namespace(namespace, expected, githost_trigger): assert githost_trigger.list_build_sources_for_namespace(namespace) == expected -def test_activate(githost_trigger): +def test_activate_and_deactivate(githost_trigger): _, private_key = githost_trigger.activate('http://some/url') assert 'private_key' in private_key - - -def test_deactivate(githost_trigger): githost_trigger.deactivate() diff --git a/test/registry/liveserverfixture.py b/test/registry/liveserverfixture.py index 18185a5d1..e506ef689 100644 --- a/test/registry/liveserverfixture.py +++ b/test/registry/liveserverfixture.py @@ -75,6 +75,7 @@ class liveFlaskServer(object): raise RuntimeError("Failed to start the server after %d seconds. " % timeout) if self._can_connect(): + self.app.config['SERVER_HOSTNAME'] = 'localhost:%s' % self._port_value.value break def _can_connect(self): diff --git a/test/specs.py b/test/specs.py index 54d6d8c64..0590715a8 100644 --- a/test/specs.py +++ b/test/specs.py @@ -2,7 +2,6 @@ import json import hashlib from flask import url_for -from uuid import uuid4 from base64 import b64encode @@ -27,19 +26,19 @@ ORG_OWNERS = 'owners' ORG_READERS = 'readers' FAKE_MANIFEST = 'unknown_tag' -FAKE_DIGEST = 'sha256:' + hashlib.sha256(str(uuid4())).hexdigest() -FAKE_IMAGE_ID = str(uuid4()) -FAKE_UPLOAD_ID = str(uuid4()) -FAKE_TAG_NAME = str(uuid4()) -FAKE_USERNAME = str(uuid4()) -FAKE_TOKEN = str(uuid4()) -FAKE_WEBHOOK = str(uuid4()) +FAKE_DIGEST = 'sha256:' + hashlib.sha256('fake').hexdigest() +FAKE_IMAGE_ID = 'fake-image' +FAKE_UPLOAD_ID = 'fake-upload' +FAKE_TAG_NAME = 'fake-tag' +FAKE_USERNAME = 'fakeuser' +FAKE_TOKEN = 'fake-token' +FAKE_WEBHOOK = 'fake-webhook' BUILD_UUID = '123' TRIGGER_UUID = '123' NEW_ORG_REPO_DETAILS = { - 'repository': str(uuid4()), + 'repository': 'fake-repository', 'visibility': 'private', 'description': '', 'namespace': ORG, @@ -69,7 +68,7 @@ CHANGE_PERMISSION_DETAILS = { } CREATE_BUILD_DETAILS = { - 'file_id': str(uuid4()), + 'file_id': 'fake-file-id', } CHANGE_VISIBILITY_DETAILS = { diff --git a/test/test_storageproxy.py b/test/test_storageproxy.py index 174c3c854..b0a4577d4 100644 --- a/test/test_storageproxy.py +++ b/test/test_storageproxy.py @@ -1,92 +1,95 @@ import os + +import pytest import requests -import unittest from flask import Flask from flask_testing import LiveServerTestCase -from initdb import setup_database_for_testing, finished_database_for_testing from storage import Storage from util.security.instancekeys import InstanceKeys -_PORT_NUMBER = 5001 - -class TestStorageProxy(LiveServerTestCase): - def setUp(self): - setup_database_for_testing(self) - - def tearDown(self): - finished_database_for_testing(self) - - def create_app(self): - global _PORT_NUMBER - _PORT_NUMBER = _PORT_NUMBER + 1 - - self.test_app = Flask('teststorageproxy') - self.test_app.config['LIVESERVER_PORT'] = _PORT_NUMBER - - if os.environ.get('DEBUG') == 'true': - self.test_app.config['DEBUG'] = True - - self.test_app.config['TESTING'] = True - self.test_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER - - self.test_app.config['INSTANCE_SERVICE_KEY_KID_LOCATION'] = 'test/data/test.kid' - self.test_app.config['INSTANCE_SERVICE_KEY_LOCATION'] = 'test/data/test.pem' - self.test_app.config['INSTANCE_SERVICE_KEY_SERVICE'] = 'quay' - - # UGH... Such a stupid hack! - self.test_app.config['FEATURE_PROXY_STORAGE'] = self.id().find('notinstalled') < 0 - - self.test_app.config['DISTRIBUTED_STORAGE_CONFIG'] = { - 'test': ['FakeStorage', {}], - } - - instance_keys = InstanceKeys(self.test_app) - self.storage = Storage(self.test_app, instance_keys=instance_keys) - self.test_app.config['DISTRIBUTED_STORAGE_PREFERENCE'] = ['test'] - return self.test_app - - @unittest.skipIf(os.environ.get('TEST_DATABASE_URI'), "not supported for non SQLite testing") - def test_storage_proxy_auth_notinstalled(self): - # Active direct download on the fake storage. - self.storage.put_content(['test'], 'supports_direct_download', 'true') - - # Get the unwrapped URL. - direct_download_url = self.storage.get_direct_download_url(['test'], 'somepath') - self.assertEquals(-1, direct_download_url.find('/_storage_proxy/')) - - # Ensure that auth returns 404. - headers = { - 'X-Original-URI': 'someurihere' - } - - resp = requests.get('http://%s/_storage_proxy_auth' % self.test_app.config['SERVER_HOSTNAME'], - headers=headers) - self.assertEquals(404, resp.status_code) +from test.registry.liveserverfixture import * +from test.fixtures import * - @unittest.skipIf(os.environ.get('TEST_DATABASE_URI'), "not supported for non SQLite testing") - def test_storage_proxy_auth(self): - # Active direct download on the fake storage. - self.storage.put_content(['test'], 'supports_direct_download', 'true') - - # Get the wrapped URL. - direct_download_url = self.storage.get_direct_download_url(['test'], 'somepath') - - # Ensure it refers to the storage proxy. - proxy_index = direct_download_url.find('/_storage_proxy/') - self.assertTrue(proxy_index > 0) - - # Ensure that auth returns 200 for the URL pieces. - headers = { - 'X-Original-URI': direct_download_url[proxy_index:] - } - - resp = requests.get('http://%s/_storage_proxy_auth' % self.test_app.config['SERVER_HOSTNAME'], - headers=headers) - self.assertEquals(200, resp.status_code) +@pytest.fixture(params=[True, False]) +def is_proxying_enabled(request): + return request.param -if __name__ == '__main__': - unittest.main() \ No newline at end of file +@pytest.fixture() +def server_executor(app): + def reload_app(server_hostname): + # Close any existing connection. + close_db_filter(None) + + # Reload the database config. + app.config['SERVER_HOSTNAME'] = server_hostname[len('http://'):] + configure(app.config) + return 'OK' + + executor = LiveServerExecutor() + executor.register('reload_app', reload_app) + return executor + + +@pytest.fixture() +def liveserver_app(app, server_executor, init_db_path, is_proxying_enabled): + server_executor.apply_blueprint_to_app(app) + + if os.environ.get('DEBUG') == 'true': + app.config['DEBUG'] = True + + app.config['TESTING'] = True + app.config['INSTANCE_SERVICE_KEY_KID_LOCATION'] = 'test/data/test.kid' + app.config['INSTANCE_SERVICE_KEY_LOCATION'] = 'test/data/test.pem' + app.config['INSTANCE_SERVICE_KEY_SERVICE'] = 'quay' + + app.config['FEATURE_PROXY_STORAGE'] = is_proxying_enabled + + app.config['DISTRIBUTED_STORAGE_CONFIG'] = { + 'test': ['FakeStorage', {}], + } + app.config['DISTRIBUTED_STORAGE_PREFERENCE'] = ['test'] + return app + + +@pytest.fixture() +def instance_keys(liveserver_app): + return InstanceKeys(liveserver_app) + + +@pytest.fixture() +def storage(liveserver_app, instance_keys): + return Storage(liveserver_app, instance_keys=instance_keys) + + +@pytest.fixture() +def app_reloader(liveserver, server_executor): + server_executor.on(liveserver).reload_app(liveserver.url) + yield + + +@pytest.mark.skipif(os.environ.get('TEST_DATABASE_URI'), + reason="not supported for non SQLite testing") +def test_storage_proxy_auth(storage, liveserver_app, liveserver_session, is_proxying_enabled, + app_reloader): + # Activate direct download on the fake storage. + storage.put_content(['test'], 'supports_direct_download', 'true') + + # Get the unwrapped URL. + direct_download_url = storage.get_direct_download_url(['test'], 'somepath') + proxy_index = direct_download_url.find('/_storage_proxy/') + if is_proxying_enabled: + assert proxy_index >= 0 + else: + assert proxy_index == -1 + + # Ensure that auth returns the expected value. + headers = { + 'X-Original-URI': direct_download_url[proxy_index:] if proxy_index else 'someurihere' + } + + resp = liveserver_session.get('_storage_proxy_auth', headers=headers) + assert resp.status_code == (404 if not is_proxying_enabled else 200) diff --git a/test/test_storagereplication.py b/test/test_storagereplication.py index fd3b679e8..c5c6c3ad9 100644 --- a/test/test_storagereplication.py +++ b/test/test_storagereplication.py @@ -1,66 +1,69 @@ -import unittest import hashlib +import pytest + from app import storage -from initdb import setup_database_for_testing, finished_database_for_testing from data import model, database -from workers.storagereplication import StorageReplicationWorker from storage.basestorage import StoragePaths +from workers.storagereplication import StorageReplicationWorker -class TestStorageReplication(unittest.TestCase): - def setUp(self): - setup_database_for_testing(self) - - self.worker = StorageReplicationWorker(None) - self.paths = StoragePaths() - - # Add both regions for a user. - self.user = model.user.get_user('devtable') - database.UserRegion.create(user=self.user, - location=database.ImageStorageLocation.get(name='local_us')) - database.UserRegion.create(user=self.user, - location=database.ImageStorageLocation.get(name='local_eu')) - - def tearDown(self): - finished_database_for_testing(self) - - def test_storage_replication_v1(self): - # Add a storage entry with a V1 path. - v1_storage = model.storage.create_v1_storage('local_us') - content_path = self.paths.v1_image_layer_path(v1_storage.uuid) - storage.put_content(['local_us'], content_path, 'some content') - - # Call replicate on it and verify it replicates. - result = self.worker.replicate_storage(self.user, v1_storage.uuid) - self.assertTrue(result) - - # Ensure that the data was replicated to the other "region". - self.assertEquals('some content', storage.get_content(['local_eu'], content_path)) - - locations = model.storage.get_storage_locations(v1_storage.uuid) - self.assertEquals(2, len(locations)) - - def test_storage_replication_cas(self): - # Add a storage entry with a CAS path. - content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() - cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) - - location = database.ImageStorageLocation.get(name='local_us') - database.ImageStoragePlacement.create(storage=cas_storage, location=location) - - content_path = self.paths.blob_path(cas_storage.content_checksum) - storage.put_content(['local_us'], content_path, 'some content') - - # Call replicate on it and verify it replicates. - result = self.worker.replicate_storage(self.user, cas_storage.uuid) - self.assertTrue(result) - - # Ensure that the data was replicated to the other "region". - self.assertEquals('some content', storage.get_content(['local_eu'], content_path)) - - locations = model.storage.get_storage_locations(cas_storage.uuid) - self.assertEquals(2, len(locations)) +from test.fixtures import * -if __name__ == '__main__': - unittest.main() +@pytest.fixture() +def storage_user(app): + user = model.user.get_user('devtable') + database.UserRegion.create(user=user, + location=database.ImageStorageLocation.get(name='local_us')) + database.UserRegion.create(user=user, + location=database.ImageStorageLocation.get(name='local_eu')) + return user + + +@pytest.fixture() +def storage_paths(): + return StoragePaths() + + +@pytest.fixture() +def replication_worker(): + return StorageReplicationWorker(None) + + +def test_storage_replication_v1(storage_user, storage_paths, replication_worker, app): + # Add a storage entry with a V1 path. + v1_storage = model.storage.create_v1_storage('local_us') + content_path = storage_paths.v1_image_layer_path(v1_storage.uuid) + storage.put_content(['local_us'], content_path, 'some content') + + # Call replicate on it and verify it replicates. + result = replication_worker.replicate_storage(storage_user, v1_storage.uuid) + assert result + + # Ensure that the data was replicated to the other "region". + assert storage.get_content(['local_eu'], content_path) == 'some content' + + locations = model.storage.get_storage_locations(v1_storage.uuid) + assert len(locations) == 2 + + +def test_storage_replication_cas(storage_user, storage_paths, replication_worker, app): + # Add a storage entry with a CAS path. + content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() + cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) + + location = database.ImageStorageLocation.get(name='local_us') + database.ImageStoragePlacement.create(storage=cas_storage, location=location) + + content_path = storage_paths.blob_path(cas_storage.content_checksum) + storage.put_content(['local_us'], content_path, 'some content') + + # Call replicate on it and verify it replicates. + result = replication_worker.replicate_storage(storage_user, cas_storage.uuid) + assert result + + # Ensure that the data was replicated to the other "region". + assert storage.get_content(['local_eu'], content_path) == 'some content' + + locations = model.storage.get_storage_locations(cas_storage.uuid) + assert len(locations) == 2