Merge remote-tracking branch 'origin/master' into rockyhorror
This commit is contained in:
commit
b154e7acef
83 changed files with 5630 additions and 589 deletions
|
@ -3576,7 +3576,7 @@ class TestSuperUserLogs(ApiTestCase):
|
|||
self._set_url(SuperUserLogs)
|
||||
|
||||
def test_get_anonymous(self):
|
||||
self._run_test('GET', 403, None, None)
|
||||
self._run_test('GET', 401, None, None)
|
||||
|
||||
def test_get_freshuser(self):
|
||||
self._run_test('GET', 403, 'freshuser', None)
|
||||
|
|
96
test/test_imagetree.py
Normal file
96
test/test_imagetree.py
Normal file
|
@ -0,0 +1,96 @@
|
|||
import unittest
|
||||
|
||||
from app import app
|
||||
from util.imagetree import ImageTree
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from data import model
|
||||
|
||||
NAMESPACE = 'devtable'
|
||||
SIMPLE_REPO = 'simple'
|
||||
COMPLEX_REPO = 'complex'
|
||||
|
||||
class TestImageTree(unittest.TestCase):
|
||||
def setUp(self):
|
||||
setup_database_for_testing(self)
|
||||
self.app = app.test_client()
|
||||
self.ctx = app.test_request_context()
|
||||
self.ctx.__enter__()
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
self.ctx.__exit__(True, None, None)
|
||||
|
||||
def _get_base_image(self, all_images):
|
||||
for image in all_images:
|
||||
if image.ancestors == '/':
|
||||
return image
|
||||
|
||||
return None
|
||||
|
||||
def test_longest_path_simple_repo(self):
|
||||
all_images = list(model.get_repository_images(NAMESPACE, SIMPLE_REPO))
|
||||
all_tags = list(model.list_repository_tags(NAMESPACE, SIMPLE_REPO))
|
||||
tree = ImageTree(all_images, all_tags)
|
||||
|
||||
base_image = self._get_base_image(all_images)
|
||||
tag_image = all_tags[0].image
|
||||
|
||||
def checker(index, image):
|
||||
return True
|
||||
|
||||
ancestors = tag_image.ancestors.split('/')[2:-1] # Skip the first image.
|
||||
result = tree.find_longest_path(base_image.id, checker)
|
||||
self.assertEquals(3, len(result))
|
||||
for index in range(0, 2):
|
||||
self.assertEquals(int(ancestors[index]), result[index].id)
|
||||
|
||||
self.assertEquals('latest', tree.tag_containing_image(result[-1]))
|
||||
|
||||
def test_longest_path_complex_repo(self):
|
||||
all_images = list(model.get_repository_images(NAMESPACE, COMPLEX_REPO))
|
||||
all_tags = list(model.list_repository_tags(NAMESPACE, COMPLEX_REPO))
|
||||
tree = ImageTree(all_images, all_tags)
|
||||
|
||||
base_image = self._get_base_image(all_images)
|
||||
|
||||
def checker(index, image):
|
||||
return True
|
||||
|
||||
result = tree.find_longest_path(base_image.id, checker)
|
||||
self.assertEquals(4, len(result))
|
||||
self.assertEquals('v2.0', tree.tag_containing_image(result[-1]))
|
||||
|
||||
def test_filtering(self):
|
||||
all_images = list(model.get_repository_images(NAMESPACE, COMPLEX_REPO))
|
||||
all_tags = list(model.list_repository_tags(NAMESPACE, COMPLEX_REPO))
|
||||
tree = ImageTree(all_images, all_tags, base_filter=1245)
|
||||
|
||||
base_image = self._get_base_image(all_images)
|
||||
|
||||
def checker(index, image):
|
||||
return True
|
||||
|
||||
result = tree.find_longest_path(base_image.id, checker)
|
||||
self.assertEquals(0, len(result))
|
||||
|
||||
def test_find_tag_parent_image(self):
|
||||
all_images = list(model.get_repository_images(NAMESPACE, COMPLEX_REPO))
|
||||
all_tags = list(model.list_repository_tags(NAMESPACE, COMPLEX_REPO))
|
||||
tree = ImageTree(all_images, all_tags)
|
||||
|
||||
base_image = self._get_base_image(all_images)
|
||||
|
||||
def checker(index, image):
|
||||
return True
|
||||
|
||||
result = tree.find_longest_path(base_image.id, checker)
|
||||
self.assertEquals(4, len(result))
|
||||
|
||||
# Only use the first two images. They don't have tags, but the method should
|
||||
# still return the tag that contains them.
|
||||
self.assertEquals('v2.0', tree.tag_containing_image(result[0]))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
186
test/test_suconfig_api.py
Normal file
186
test/test_suconfig_api.py
Normal file
|
@ -0,0 +1,186 @@
|
|||
from test.test_api_usage import ApiTestCase, READ_ACCESS_USER, ADMIN_ACCESS_USER
|
||||
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
|
||||
SuperUserCreateInitialSuperUser, SuperUserConfigValidate)
|
||||
from app import CONFIG_PROVIDER
|
||||
from data.database import User
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class ConfigForTesting(object):
|
||||
|
||||
def __enter__(self):
|
||||
CONFIG_PROVIDER.reset_for_test()
|
||||
return CONFIG_PROVIDER
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
CONFIG_PROVIDER.reset_for_test()
|
||||
|
||||
|
||||
class TestSuperUserRegistryStatus(ApiTestCase):
|
||||
def test_registry_status(self):
|
||||
with ConfigForTesting():
|
||||
json = self.getJsonResponse(SuperUserRegistryStatus)
|
||||
self.assertEquals('config-db', json['status'])
|
||||
|
||||
|
||||
class TestSuperUserConfigFile(ApiTestCase):
|
||||
def test_get_non_superuser(self):
|
||||
with ConfigForTesting():
|
||||
# No user.
|
||||
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
|
||||
|
||||
# Non-superuser.
|
||||
self.login(READ_ACCESS_USER)
|
||||
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
|
||||
|
||||
def test_get_superuser_invalid_filename(self):
|
||||
with ConfigForTesting():
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
self.getResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404)
|
||||
|
||||
def test_get_superuser(self):
|
||||
with ConfigForTesting():
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
result = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
|
||||
self.assertFalse(result['exists'])
|
||||
|
||||
def test_post_non_superuser(self):
|
||||
with ConfigForTesting():
|
||||
# No user.
|
||||
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
|
||||
|
||||
# Non-superuser.
|
||||
self.login(READ_ACCESS_USER)
|
||||
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
|
||||
|
||||
def test_post_superuser_invalid_filename(self):
|
||||
with ConfigForTesting():
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
self.postResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404)
|
||||
|
||||
def test_post_superuser(self):
|
||||
with ConfigForTesting():
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400)
|
||||
|
||||
|
||||
class TestSuperUserCreateInitialSuperUser(ApiTestCase):
|
||||
def test_no_config_file(self):
|
||||
with ConfigForTesting():
|
||||
# If there is no config.yaml, then this method should security fail.
|
||||
data = dict(username='cooluser', password='password', email='fake@example.com')
|
||||
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
|
||||
|
||||
def test_config_file_with_db_users(self):
|
||||
with ConfigForTesting():
|
||||
# Write some config.
|
||||
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
|
||||
|
||||
# If there is a config.yaml, but existing DB users exist, then this method should security
|
||||
# fail.
|
||||
data = dict(username='cooluser', password='password', email='fake@example.com')
|
||||
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
|
||||
|
||||
def test_config_file_with_no_db_users(self):
|
||||
with ConfigForTesting():
|
||||
# Write some config.
|
||||
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
|
||||
|
||||
# Delete all the users in the DB.
|
||||
for user in list(User.select()):
|
||||
user.delete_instance(recursive=True)
|
||||
|
||||
# This method should now succeed.
|
||||
data = dict(username='cooluser', password='password', email='fake@example.com')
|
||||
result = self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data)
|
||||
self.assertTrue(result['status'])
|
||||
|
||||
# Verify the superuser was created.
|
||||
User.get(User.username == 'cooluser')
|
||||
|
||||
# Verify the superuser was placed into the config.
|
||||
result = self.getJsonResponse(SuperUserConfig)
|
||||
self.assertEquals(['cooluser'], result['config']['SUPER_USERS'])
|
||||
|
||||
|
||||
class TestSuperUserConfigValidate(ApiTestCase):
|
||||
def test_nonsuperuser_noconfig(self):
|
||||
with ConfigForTesting():
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'),
|
||||
data=dict(config={}))
|
||||
|
||||
self.assertFalse(result['status'])
|
||||
|
||||
|
||||
def test_nonsuperuser_config(self):
|
||||
with ConfigForTesting():
|
||||
# The validate config call works if there is no config.yaml OR the user is a superuser.
|
||||
# Add a config, and verify it breaks when unauthenticated.
|
||||
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
|
||||
self.assertTrue(json['exists'])
|
||||
|
||||
self.postResponse(SuperUserConfigValidate, params=dict(service='someservice'),
|
||||
data=dict(config={}),
|
||||
expected_code=403)
|
||||
|
||||
# Now login as a superuser.
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'),
|
||||
data=dict(config={}))
|
||||
|
||||
self.assertFalse(result['status'])
|
||||
|
||||
|
||||
class TestSuperUserConfig(ApiTestCase):
|
||||
def test_get_non_superuser(self):
|
||||
with ConfigForTesting():
|
||||
# No user.
|
||||
self.getResponse(SuperUserConfig, expected_code=401)
|
||||
|
||||
# Non-superuser.
|
||||
self.login(READ_ACCESS_USER)
|
||||
self.getResponse(SuperUserConfig, expected_code=403)
|
||||
|
||||
def test_get_superuser(self):
|
||||
with ConfigForTesting():
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
json = self.getJsonResponse(SuperUserConfig)
|
||||
|
||||
# Note: We expect the config to be none because a config.yaml should never be checked into
|
||||
# the directory.
|
||||
self.assertIsNone(json['config'])
|
||||
|
||||
def test_put(self):
|
||||
with ConfigForTesting() as config:
|
||||
# The update config call works if there is no config.yaml OR the user is a superuser. First
|
||||
# try writing it without a superuser present.
|
||||
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
|
||||
self.assertTrue(json['exists'])
|
||||
|
||||
# Verify the config file exists.
|
||||
self.assertTrue(config.yaml_exists())
|
||||
|
||||
# Try writing it again. This should now fail, since the config.yaml exists.
|
||||
self.putResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'), expected_code=403)
|
||||
|
||||
# Login as a non-superuser.
|
||||
self.login(READ_ACCESS_USER)
|
||||
|
||||
# Try writing it again. This should fail.
|
||||
self.putResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'), expected_code=403)
|
||||
|
||||
# Login as a superuser.
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
# This should succeed.
|
||||
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'))
|
||||
self.assertTrue(json['exists'])
|
||||
|
||||
json = self.getJsonResponse(SuperUserConfig)
|
||||
self.assertIsNotNone(json['config'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -19,6 +19,7 @@ TEST_DB_FILE = NamedTemporaryFile(delete=True)
|
|||
|
||||
class TestConfig(DefaultConfig):
|
||||
TESTING = True
|
||||
SECRET_KEY = 'a36c9d7d-25a9-4d3f-a586-3d2f8dc40a83'
|
||||
|
||||
DB_URI = os.environ.get('TEST_DATABASE_URI', 'sqlite:///{0}'.format(TEST_DB_FILE.name))
|
||||
DB_CONNECTION_ARGS = {
|
||||
|
|
Reference in a new issue