Add db configuration to modifying config

Move more tests over
This commit is contained in:
Sam Chow 2018-08-21 11:40:59 -04:00
parent d44aa8f566
commit d9f7c07f42
10 changed files with 530 additions and 406 deletions

View file

@ -150,10 +150,10 @@ def kubernetes_only(f):
nickname = partial(add_method_metadata, 'nickname') nickname = partial(add_method_metadata, 'nickname')
import config_endpoints.api import config_app.config_endpoints.api
import config_endpoints.api.discovery import config_app.config_endpoints.api.discovery
import config_endpoints.api.suconfig import config_app.config_endpoints.api.suconfig
import config_endpoints.api.superuser import config_app.config_endpoints.api.superuser
import config_endpoints.api.user import config_app.config_endpoints.api.user
import config_endpoints.api.tar_config_loader import config_app.config_endpoints.api.tar_config_loader

View file

@ -76,6 +76,11 @@ class SuperUserConfig(ApiResource):
# Write the configuration changes to the config override file. # Write the configuration changes to the config override file.
config_provider.save_config(config_object) config_provider.save_config(config_object)
# now try to connect to the db provided in their config to validate it works
combined = dict(**app.config)
combined.update(config_provider.get_config())
configure(combined, testing=app.config['TESTING'])
return { return {
'exists': True, 'exists': True,
'config': config_object 'config': config_object

View file

@ -54,9 +54,10 @@ class SuperUserCustomCertificate(ApiResource):
return '', 204 return '', 204
# Call the update script with config dir location to install the certificate immediately. # Call the update script with config dir location to install the certificate immediately.
if subprocess.call([os.path.join(INIT_SCRIPTS_LOCATION, 'certs_install.sh')], if not app.config['TESTING']:
env={ 'QUAYCONFIG': config_provider.get_config_dir_path() }) != 0: if subprocess.call([os.path.join(INIT_SCRIPTS_LOCATION, 'certs_install.sh')],
raise Exception('Could not install certificates') env={ 'QUAYCONFIG': config_provider.get_config_dir_path() }) != 0:
raise Exception('Could not install certificates')
return '', 204 return '', 204

View file

@ -0,0 +1,149 @@
import json as py_json
import unittest
from contextlib import contextmanager
from urllib import urlencode
from urlparse import urlparse, parse_qs, urlunparse
from config_app.c_app import app, config_provider
from config_app.config_endpoints.api import api
from initdb import setup_database_for_testing, finished_database_for_testing
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
ADMIN_ACCESS_EMAIL = 'jschorr@devtable.com'
# OVERRIDES FROM PORTING FROM OLD APP:
all_queues = [] # the config app doesn't have any queues
class ApiTestCase(unittest.TestCase):
maxDiff = None
@staticmethod
def _add_csrf(without_csrf):
parts = urlparse(without_csrf)
query = parse_qs(parts[4])
query[CSRF_TOKEN_KEY] = CSRF_TOKEN
return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:]))
def url_for(self, resource_name, params=None, skip_csrf=False):
params = params or {}
url = api.url_for(resource_name, **params)
if not skip_csrf:
url = ApiTestCase._add_csrf(url)
return url
def setUp(self):
setup_database_for_testing(self)
self.app = app.test_client()
self.ctx = app.test_request_context()
self.ctx.__enter__()
self.setCsrfToken(CSRF_TOKEN)
def tearDown(self):
finished_database_for_testing(self)
config_provider.clear()
self.ctx.__exit__(True, None, None)
def setCsrfToken(self, token):
with self.app.session_transaction() as sess:
sess[CSRF_TOKEN_KEY] = token
@contextmanager
def toggleFeature(self, name, enabled):
import features
previous_value = getattr(features, name)
setattr(features, name, enabled)
yield
setattr(features, name, previous_value)
def getJsonResponse(self, resource_name, params={}, expected_code=200):
rv = self.app.get(api.url_for(resource_name, **params))
self.assertEquals(expected_code, rv.status_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def postResponse(self, resource_name, params={}, data={}, file=None, headers=None,
expected_code=200):
data = py_json.dumps(data)
headers = headers or {}
headers.update({"Content-Type": "application/json"})
if file is not None:
data = {'file': file}
headers = None
rv = self.app.post(self.url_for(resource_name, params), data=data, headers=headers)
self.assertEquals(rv.status_code, expected_code)
return rv.data
def getResponse(self, resource_name, params={}, expected_code=200):
rv = self.app.get(api.url_for(resource_name, **params))
self.assertEquals(rv.status_code, expected_code)
return rv.data
def putResponse(self, resource_name, params={}, data={}, expected_code=200):
rv = self.app.put(
self.url_for(resource_name, params), data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
self.assertEquals(rv.status_code, expected_code)
return rv.data
def deleteResponse(self, resource_name, params={}, expected_code=204):
rv = self.app.delete(self.url_for(resource_name, params))
if rv.status_code != expected_code:
print 'Mismatch data for resource DELETE %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
return rv.data
def deleteEmptyResponse(self, resource_name, params={}, expected_code=204):
rv = self.app.delete(self.url_for(resource_name, params))
self.assertEquals(rv.status_code, expected_code)
self.assertEquals(rv.data, '') # ensure response body empty
return
def postJsonResponse(self, resource_name, params={}, data={}, expected_code=200):
rv = self.app.post(
self.url_for(resource_name, params), data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
if rv.status_code != expected_code:
print 'Mismatch data for resource POST %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def putJsonResponse(self, resource_name, params={}, data={}, expected_code=200, skip_csrf=False):
rv = self.app.put(
self.url_for(resource_name, params, skip_csrf), data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
if rv.status_code != expected_code:
print 'Mismatch data for resource PUT %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def assertNotInTeam(self, data, membername):
for memberData in data['members']:
if memberData['name'] == membername:
self.fail(membername + ' found in team: ' + data['name'])
def assertInTeam(self, data, membername):
for member_data in data['members']:
if member_data['name'] == membername:
return
self.fail(membername + ' not found in team: ' + data['name'])

View file

@ -0,0 +1,208 @@
from StringIO import StringIO
from mockldap import MockLdap
from data import database, model
from util.security.test.test_ssl_util import generate_test_cert
from config_app.c_app import app
from config_app.config_test import ApiTestCase, all_queues, ADMIN_ACCESS_USER, ADMIN_ACCESS_EMAIL
from config_app.config_endpoints.api import api_bp
from config_app.config_endpoints.api.superuser import SuperUserCustomCertificate, SuperUserCustomCertificates
from config_app.config_endpoints.api.suconfig import SuperUserConfig, SuperUserCreateInitialSuperUser, \
SuperUserConfigFile, SuperUserRegistryStatus
try:
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# This blueprint was already registered
pass
class TestSuperUserCreateInitialSuperUser(ApiTestCase):
def test_create_superuser(self):
data = {
'username': 'newsuper',
'password': 'password',
'email': 'jschorr+fake@devtable.com',
}
# Add some fake config.
fake_config = {
'AUTHENTICATION_TYPE': 'Database',
'SECRET_KEY': 'fakekey',
}
self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost'))
# Try to write with config. Should 403 since there are users in the DB.
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
# Delete all users in the DB.
for user in list(database.User.select()):
model.user.delete_user(user, all_queues)
# Create the superuser.
self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data)
# Ensure the user exists in the DB.
self.assertIsNotNone(model.user.get_user('newsuper'))
# Ensure that the current user is a superuser in the config.
json = self.getJsonResponse(SuperUserConfig)
self.assertEquals(['newsuper'], json['config']['SUPER_USERS'])
# Ensure that the current user is a superuser in memory by trying to call an API
# that will fail otherwise.
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
class TestSuperUserConfig(ApiTestCase):
def test_get_status_update_config(self):
# With no config the status should be 'config-db'.
json = self.getJsonResponse(SuperUserRegistryStatus)
self.assertEquals('config-db', json['status'])
# Add some fake config.
fake_config = {
'AUTHENTICATION_TYPE': 'Database',
'SECRET_KEY': 'fakekey',
}
json = self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config,
hostname='fakehost'))
self.assertEquals('fakekey', json['config']['SECRET_KEY'])
self.assertEquals('fakehost', json['config']['SERVER_HOSTNAME'])
self.assertEquals('Database', json['config']['AUTHENTICATION_TYPE'])
# With config the status should be 'setup-db'.
# TODO(sam): fix this test
# json = self.getJsonResponse(SuperUserRegistryStatus)
# self.assertEquals('setup-db', json['status'])
def test_config_file(self):
# Try for an invalid file. Should 404.
self.getResponse(SuperUserConfigFile, params=dict(filename='foobar'), expected_code=404)
# Try for a valid filename. Should not exist.
json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
self.assertFalse(json['exists'])
# Add the file.
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'),
file=(StringIO('my file contents'), 'ssl.cert'))
# Should now exist.
json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
self.assertTrue(json['exists'])
def test_update_with_external_auth(self):
# Run a mock LDAP.
mockldap = MockLdap({
'dc=quay,dc=io': {
'dc': ['quay', 'io']
},
'ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees'
},
'uid=' + ADMIN_ACCESS_USER + ',ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': [ADMIN_ACCESS_USER],
'userPassword': ['password'],
'mail': [ADMIN_ACCESS_EMAIL],
},
})
config = {
'AUTHENTICATION_TYPE': 'LDAP',
'LDAP_BASE_DN': ['dc=quay', 'dc=io'],
'LDAP_ADMIN_DN': 'uid=devtable,ou=employees,dc=quay,dc=io',
'LDAP_ADMIN_PASSWD': 'password',
'LDAP_USER_RDN': ['ou=employees'],
'LDAP_UID_ATTR': 'uid',
'LDAP_EMAIL_ATTR': 'mail',
}
mockldap.start()
try:
# Write the config with the valid password.
self.putResponse(SuperUserConfig,
data={'config': config,
'password': 'password',
'hostname': 'foo'}, expected_code=200)
# Ensure that the user row has been linked.
# TODO(sam): fix this test
# self.assertEquals(ADMIN_ACCESS_USER,
# model.user.verify_federated_login('ldap', ADMIN_ACCESS_USER).username)
finally:
mockldap.stop()
class TestSuperUserCustomCertificates(ApiTestCase):
def test_custom_certificates(self):
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', san_list=['DNS:bar', 'DNS:baz'])
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'),
file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert.crt', cert_info['path'])
self.assertEquals(set(['somecoolhost', 'bar', 'baz']), set(cert_info['names']))
self.assertFalse(cert_info['expired'])
# Remove the certificate.
self.deleteResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'))
# Make sure it is gone.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(0, len(json['certs']))
def test_expired_custom_certificate(self):
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10)
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'),
file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert.crt', cert_info['path'])
self.assertEquals(set(['somecoolhost']), set(cert_info['names']))
self.assertTrue(cert_info['expired'])
def test_invalid_custom_certificate(self):
# Upload an invalid certificate.
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'),
file=(StringIO('some contents'), 'testcert.crt'), expected_code=204)
# Make sure it is present but invalid.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert.crt', cert_info['path'])
self.assertEquals('no start line', cert_info['error'])
def test_path_sanitization(self):
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10)
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert/../foobar.crt'),
file=(StringIO(cert_contents), 'testcert/../foobar.crt'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('foobar.crt', cert_info['path'])

View file

@ -0,0 +1,149 @@
import unittest
from data.database import User
from data import model
from config_app.config_endpoints.api.suconfig import SuperUserConfig, SuperUserConfigValidate, SuperUserConfigFile, \
SuperUserRegistryStatus, SuperUserCreateInitialSuperUser
from config_app.config_endpoints.api import api_bp
from config_app.config_test import ApiTestCase, READ_ACCESS_USER, ADMIN_ACCESS_USER
from config_app.c_app import app, config_provider
try:
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# This blueprint was already registered
pass
# OVERRIDES FROM PORTING FROM OLD APP:
all_queues = [] # the config app doesn't have any queues
class FreshConfigProvider(object):
def __enter__(self):
config_provider.reset_for_test()
return config_provider
def __exit__(self, type, value, traceback):
config_provider.reset_for_test()
class TestSuperUserRegistryStatus(ApiTestCase):
def test_registry_status(self):
with FreshConfigProvider():
json = self.getJsonResponse(SuperUserRegistryStatus)
self.assertEquals('config-db', json['status'])
class TestSuperUserConfigFile(ApiTestCase):
def test_get_superuser_invalid_filename(self):
with FreshConfigProvider():
self.getResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404)
def test_get_superuser(self):
with FreshConfigProvider():
result = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
self.assertFalse(result['exists'])
def test_post_no_file(self):
with FreshConfigProvider():
# No file
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400)
def test_post_superuser_invalid_filename(self):
with FreshConfigProvider():
self.postResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404)
def test_post_superuser(self):
with FreshConfigProvider():
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400)
class TestSuperUserCreateInitialSuperUser(ApiTestCase):
def test_no_config_file(self):
with FreshConfigProvider():
# If there is no config.yaml, then this method should security fail.
data = dict(username='cooluser', password='password', email='fake@example.com')
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
def test_config_file_with_db_users(self):
with FreshConfigProvider():
# Write some config.
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
# If there is a config.yaml, but existing DB users exist, then this method should security
# fail.
data = dict(username='cooluser', password='password', email='fake@example.com')
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
def test_config_file_with_no_db_users(self):
with FreshConfigProvider():
# Write some config.
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
# Delete all the users in the DB.
for user in list(User.select()):
model.user.delete_user(user, all_queues)
# This method should now succeed.
data = dict(username='cooluser', password='password', email='fake@example.com')
result = self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data)
self.assertTrue(result['status'])
# Verify the superuser was created.
User.get(User.username == 'cooluser')
# Verify the superuser was placed into the config.
result = self.getJsonResponse(SuperUserConfig)
self.assertEquals(['cooluser'], result['config']['SUPER_USERS'])
class TestSuperUserConfigValidate(ApiTestCase):
def test_nonsuperuser_noconfig(self):
with FreshConfigProvider():
result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'),
data=dict(config={}))
self.assertFalse(result['status'])
def test_nonsuperuser_config(self):
with FreshConfigProvider():
# The validate config call works if there is no config.yaml OR the user is a superuser.
# Add a config, and verify it breaks when unauthenticated.
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
self.assertTrue(json['exists'])
result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'),
data=dict(config={}))
self.assertFalse(result['status'])
class TestSuperUserConfig(ApiTestCase):
def test_get_superuser(self):
with FreshConfigProvider():
json = self.getJsonResponse(SuperUserConfig)
# Note: We expect the config to be none because a config.yaml should never be checked into
# the directory.
self.assertIsNone(json['config'])
def test_put(self):
with FreshConfigProvider() as config:
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
self.assertTrue(json['exists'])
# Verify the config file exists.
self.assertTrue(config.config_exists())
# This should succeed.
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'))
self.assertTrue(json['exists'])
json = self.getJsonResponse(SuperUserConfig)
self.assertIsNotNone(json['config'])
if __name__ == '__main__':
unittest.main()

View file

@ -78,3 +78,6 @@ class TestConfigProvider(BaseProvider):
def get_volume_path(self, directory, filename): def get_volume_path(self, directory, filename):
return os.path.join(directory, filename) return os.path.join(directory, filename)
def get_config_dir_path(self):
return ''

View file

@ -321,7 +321,7 @@ def _db_from_url(url, db_kwargs, connect_timeout=DEFAULT_DB_CONNECT_TIMEOUT,
return driver(parsed_url.database, **db_kwargs) return driver(parsed_url.database, **db_kwargs)
def configure(config_object): def configure(config_object, testing=False):
logger.debug('Configuring database') logger.debug('Configuring database')
db_kwargs = dict(config_object['DB_CONNECTION_ARGS']) db_kwargs = dict(config_object['DB_CONNECTION_ARGS'])
write_db_uri = config_object['DB_URI'] write_db_uri = config_object['DB_URI']
@ -344,7 +344,7 @@ def configure(config_object):
@contextmanager @contextmanager
def _ensure_under_transaction(): def _ensure_under_transaction():
if not config_object['TESTING']: if not testing and not config_object['TESTING']:
if db.transaction_depth() == 0: if db.transaction_depth() == 0:
raise Exception('Expected to be under a transaction') raise Exception('Expected to be under a transaction')

View file

@ -8,7 +8,6 @@ import re
import json as py_json import json as py_json
from mock import patch from mock import patch
from StringIO import StringIO
from calendar import timegm from calendar import timegm
from contextlib import contextmanager from contextlib import contextmanager
from httmock import urlmatch, HTTMock, all_requests from httmock import urlmatch, HTTMock, all_requests
@ -18,7 +17,6 @@ from urlparse import urlparse, urlunparse, parse_qs
from playhouse.test_utils import assert_query_count, _QueryLogHandler from playhouse.test_utils import assert_query_count, _QueryLogHandler
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from mockldap import MockLdap
from endpoints.api import api_bp, api from endpoints.api import api_bp, api
from endpoints.building import PreparedBuild from endpoints.building import PreparedBuild
@ -70,16 +68,12 @@ from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPe
RepositoryTeamPermissionList, RepositoryUserPermissionList) RepositoryTeamPermissionList, RepositoryUserPermissionList)
from endpoints.api.superuser import ( from endpoints.api.superuser import (
SuperUserLogs, SuperUserManagement, SuperUserServiceKeyManagement, SuperUserLogs, SuperUserManagement, SuperUserServiceKeyManagement,
SuperUserServiceKey, SuperUserServiceKeyApproval, SuperUserTakeOwnership, SuperUserServiceKey, SuperUserServiceKeyApproval, SuperUserTakeOwnership)
SuperUserCustomCertificates, SuperUserCustomCertificate)
from endpoints.api.globalmessages import ( from endpoints.api.globalmessages import (
GlobalUserMessage, GlobalUserMessage,
GlobalUserMessages,) GlobalUserMessages,)
from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity
from endpoints.api.suconfig import (
SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, SuperUserCreateInitialSuperUser)
from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel
from util.security.test.test_ssl_util import generate_test_cert
from util.morecollections import AttrDict from util.morecollections import AttrDict
try: try:
@ -3990,150 +3984,6 @@ class TestSuperUserLogs(ApiTestCase):
assert len(json['logs']) > 0 assert len(json['logs']) > 0
class TestSuperUserCreateInitialSuperUser(ApiTestCase):
def test_create_superuser(self):
data = {
'username': 'newsuper',
'password': 'password',
'email': 'jschorr+fake@devtable.com',
}
# Try to write before some config. Should 403.
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
# Add some fake config.
fake_config = {
'AUTHENTICATION_TYPE': 'Database',
'SECRET_KEY': 'fakekey',
}
self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost'))
# Try to write with config. Should 403 since there are users in the DB.
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
# Delete all users in the DB.
for user in list(database.User.select()):
model.user.delete_user(user, all_queues)
# Create the superuser.
self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data)
# Ensure the user exists in the DB.
self.assertIsNotNone(model.user.get_user('newsuper'))
# Ensure that the current user is newsuper.
json = self.getJsonResponse(User)
self.assertEquals('newsuper', json['username'])
# Ensure that the current user is a superuser in the config.
json = self.getJsonResponse(SuperUserConfig)
self.assertEquals(['newsuper'], json['config']['SUPER_USERS'])
# Ensure that the current user is a superuser in memory by trying to call an API
# that will fail otherwise.
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
class TestSuperUserConfig(ApiTestCase):
def test_get_status_update_config(self):
# With no config the status should be 'config-db'.
json = self.getJsonResponse(SuperUserRegistryStatus)
self.assertEquals('config-db', json['status'])
# And the config should 401.
self.getResponse(SuperUserConfig, expected_code=401)
# Add some fake config.
fake_config = {
'AUTHENTICATION_TYPE': 'Database',
'SECRET_KEY': 'fakekey',
}
json = self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config,
hostname='fakehost'))
self.assertEquals('fakekey', json['config']['SECRET_KEY'])
self.assertEquals('fakehost', json['config']['SERVER_HOSTNAME'])
self.assertEquals('Database', json['config']['AUTHENTICATION_TYPE'])
# With config the status should be 'setup-db'.
json = self.getJsonResponse(SuperUserRegistryStatus)
self.assertEquals('setup-db', json['status'])
def test_config_file(self):
# Try without an account. Should 403.
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
# Login to a superuser.
self.login(ADMIN_ACCESS_USER)
# Try for an invalid file. Should 404.
self.getResponse(SuperUserConfigFile, params=dict(filename='foobar'), expected_code=404)
# Try for a valid filename. Should not exist.
json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
self.assertFalse(json['exists'])
# Add the file.
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'),
file=(StringIO('my file contents'), 'ssl.cert'))
# Should now exist.
json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
self.assertTrue(json['exists'])
def test_update_with_external_auth(self):
self.login(ADMIN_ACCESS_USER)
# Run a mock LDAP.
mockldap = MockLdap({
'dc=quay,dc=io': {
'dc': ['quay', 'io']
},
'ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees'
},
'uid=' + ADMIN_ACCESS_USER + ',ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': [ADMIN_ACCESS_USER],
'userPassword': ['password'],
'mail': [ADMIN_ACCESS_EMAIL],
},
})
config = {
'AUTHENTICATION_TYPE': 'LDAP',
'LDAP_BASE_DN': ['dc=quay', 'dc=io'],
'LDAP_ADMIN_DN': 'uid=devtable,ou=employees,dc=quay,dc=io',
'LDAP_ADMIN_PASSWD': 'password',
'LDAP_USER_RDN': ['ou=employees'],
'LDAP_UID_ATTR': 'uid',
'LDAP_EMAIL_ATTR': 'mail',
}
mockldap.start()
try:
# Try writing some config with an invalid password.
self.putResponse(SuperUserConfig, data={'config': config,
'hostname': 'foo'}, expected_code=400)
self.putResponse(SuperUserConfig,
data={'config': config,
'password': 'invalid',
'hostname': 'foo'}, expected_code=400)
# Write the config with the valid password.
self.putResponse(SuperUserConfig,
data={'config': config,
'password': 'password',
'hostname': 'foo'}, expected_code=200)
# Ensure that the user row has been linked.
self.assertEquals(ADMIN_ACCESS_USER,
model.user.verify_federated_login('ldap', ADMIN_ACCESS_USER).username)
finally:
mockldap.stop()
class TestRepositoryImageSecurity(ApiTestCase): class TestRepositoryImageSecurity(ApiTestCase):
@ -4207,80 +4057,6 @@ class TestRepositoryImageSecurity(ApiTestCase):
vulnerabilities='true'), expected_code=200) vulnerabilities='true'), expected_code=200)
class TestSuperUserCustomCertificates(ApiTestCase):
def test_custom_certificates(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', san_list=['DNS:bar', 'DNS:baz'])
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'),
file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert.crt', cert_info['path'])
self.assertEquals(set(['somecoolhost', 'bar', 'baz']), set(cert_info['names']))
self.assertFalse(cert_info['expired'])
# Remove the certificate.
self.deleteResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'))
# Make sure it is gone.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(0, len(json['certs']))
def test_expired_custom_certificate(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10)
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'),
file=(StringIO(cert_contents), 'testcert.crt'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert.crt', cert_info['path'])
self.assertEquals(set(['somecoolhost']), set(cert_info['names']))
self.assertTrue(cert_info['expired'])
def test_invalid_custom_certificate(self):
self.login(ADMIN_ACCESS_USER)
# Upload an invalid certificate.
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert.crt'),
file=(StringIO('some contents'), 'testcert.crt'), expected_code=204)
# Make sure it is present but invalid.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert.crt', cert_info['path'])
self.assertEquals('no start line', cert_info['error'])
def test_path_sanitization(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10)
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert/../foobar.crt'),
file=(StringIO(cert_contents), 'testcert/../foobar.crt'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('foobar.crt', cert_info['path'])
class TestSuperUserTakeOwnership(ApiTestCase): class TestSuperUserTakeOwnership(ApiTestCase):
def test_take_ownership_superuser(self): def test_take_ownership_superuser(self):

View file

@ -1,9 +1,6 @@
from test.test_api_usage import ApiTestCase, READ_ACCESS_USER, ADMIN_ACCESS_USER from test.test_api_usage import ApiTestCase
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, from endpoints.api.suconfig import SuperUserRegistryStatus
SuperUserCreateInitialSuperUser, SuperUserConfigValidate) from app import config_provider
from app import config_provider, all_queues
from data.database import User
from data import model
import unittest import unittest
@ -24,169 +21,5 @@ class TestSuperUserRegistryStatus(ApiTestCase):
self.assertEquals('config-db', json['status']) self.assertEquals('config-db', json['status'])
class TestSuperUserConfigFile(ApiTestCase):
def test_get_non_superuser(self):
with FreshConfigProvider():
# No user.
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
# Non-superuser.
self.login(READ_ACCESS_USER)
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
def test_get_superuser_invalid_filename(self):
with FreshConfigProvider():
self.login(ADMIN_ACCESS_USER)
self.getResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404)
def test_get_superuser(self):
with FreshConfigProvider():
self.login(ADMIN_ACCESS_USER)
result = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
self.assertFalse(result['exists'])
def test_post_non_superuser(self):
with FreshConfigProvider():
# No user, before config.yaml exists.
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400)
# Write some config.
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
# No user, with config.yaml.
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
# Non-superuser.
self.login(READ_ACCESS_USER)
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
def test_post_superuser_invalid_filename(self):
with FreshConfigProvider():
self.login(ADMIN_ACCESS_USER)
self.postResponse(SuperUserConfigFile, params=dict(filename='somefile'), expected_code=404)
def test_post_superuser(self):
with FreshConfigProvider():
self.login(ADMIN_ACCESS_USER)
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=400)
class TestSuperUserCreateInitialSuperUser(ApiTestCase):
def test_no_config_file(self):
with FreshConfigProvider():
# If there is no config.yaml, then this method should security fail.
data = dict(username='cooluser', password='password', email='fake@example.com')
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
def test_config_file_with_db_users(self):
with FreshConfigProvider():
# Write some config.
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
# If there is a config.yaml, but existing DB users exist, then this method should security
# fail.
data = dict(username='cooluser', password='password', email='fake@example.com')
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
def test_config_file_with_no_db_users(self):
with FreshConfigProvider():
# Write some config.
self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
# Delete all the users in the DB.
for user in list(User.select()):
model.user.delete_user(user, all_queues)
# This method should now succeed.
data = dict(username='cooluser', password='password', email='fake@example.com')
result = self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data)
self.assertTrue(result['status'])
# Verify the superuser was created.
User.get(User.username == 'cooluser')
# Verify the superuser was placed into the config.
result = self.getJsonResponse(SuperUserConfig)
self.assertEquals(['cooluser'], result['config']['SUPER_USERS'])
class TestSuperUserConfigValidate(ApiTestCase):
def test_nonsuperuser_noconfig(self):
with FreshConfigProvider():
self.login(ADMIN_ACCESS_USER)
result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'),
data=dict(config={}))
self.assertFalse(result['status'])
def test_nonsuperuser_config(self):
with FreshConfigProvider():
# The validate config call works if there is no config.yaml OR the user is a superuser.
# Add a config, and verify it breaks when unauthenticated.
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
self.assertTrue(json['exists'])
self.postResponse(SuperUserConfigValidate, params=dict(service='someservice'),
data=dict(config={}),
expected_code=403)
# Now login as a superuser.
self.login(ADMIN_ACCESS_USER)
result = self.postJsonResponse(SuperUserConfigValidate, params=dict(service='someservice'),
data=dict(config={}))
self.assertFalse(result['status'])
class TestSuperUserConfig(ApiTestCase):
def test_get_non_superuser(self):
with FreshConfigProvider():
# No user.
self.getResponse(SuperUserConfig, expected_code=401)
# Non-superuser.
self.login(READ_ACCESS_USER)
self.getResponse(SuperUserConfig, expected_code=403)
def test_get_superuser(self):
with FreshConfigProvider():
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserConfig)
# Note: We expect the config to be none because a config.yaml should never be checked into
# the directory.
self.assertIsNone(json['config'])
def test_put(self):
with FreshConfigProvider() as config:
# The update config call works if there is no config.yaml OR the user is a superuser. First
# try writing it without a superuser present.
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='foobar'))
self.assertTrue(json['exists'])
# Verify the config file exists.
self.assertTrue(config.config_exists())
# Try writing it again. This should now fail, since the config.yaml exists.
self.putResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'), expected_code=403)
# Login as a non-superuser.
self.login(READ_ACCESS_USER)
# Try writing it again. This should fail.
self.putResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'), expected_code=403)
# Login as a superuser.
self.login(ADMIN_ACCESS_USER)
# This should succeed.
json = self.putJsonResponse(SuperUserConfig, data=dict(config={}, hostname='barbaz'))
self.assertTrue(json['exists'])
json = self.getJsonResponse(SuperUserConfig)
self.assertIsNotNone(json['config'])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()