Custom SSL certificates config panel

Adds a new panel to the superuser config tool, for managing custom SSL certificates in the config bundle

[Delivers #135586525]
This commit is contained in:
Joseph Schorr 2017-01-11 18:45:46 -05:00
parent 773f271daa
commit 7e0fbeb625
14 changed files with 434 additions and 41 deletions

View file

@ -51,7 +51,9 @@ from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserMana
SuperUserOrganizationManagement, SuperUserOrganizationList,
SuperUserAggregateLogs, SuperUserServiceKeyManagement,
SuperUserServiceKey, SuperUserServiceKeyApproval,
SuperUserTakeOwnership, SuperUserLicense)
SuperUserTakeOwnership, SuperUserLicense,
SuperUserCustomCertificates,
SuperUserCustomCertificate)
from endpoints.api.globalmessages import GlobalUserMessage, GlobalUserMessages
from endpoints.api.secscan import RepositoryImageSecurity
from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel
@ -4170,6 +4172,54 @@ class TestSuperUserList(ApiTestCase):
self._run_test('GET', 200, 'devtable', None)
class TestSuperUserCustomCertificates(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)
self._set_url(SuperUserCustomCertificates)
def test_get_anonymous(self):
self._run_test('GET', 401, None, None)
def test_get_freshuser(self):
self._run_test('GET', 403, 'freshuser', None)
def test_get_reader(self):
self._run_test('GET', 403, 'reader', None)
def test_get_devtable(self):
self._run_test('GET', 200, 'devtable', None)
class TestSuperUserCustomCertificate(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)
self._set_url(SuperUserCustomCertificate, certpath='somecert.crt')
def test_post_anonymous(self):
self._run_test('POST', 401, None, None)
def test_post_freshuser(self):
self._run_test('POST', 403, 'freshuser', None)
def test_post_reader(self):
self._run_test('POST', 403, 'reader', None)
def test_post_devtable(self):
self._run_test('POST', 400, 'devtable', None)
def test_delete_anonymous(self):
self._run_test('DELETE', 401, None, None)
def test_delete_freshuser(self):
self._run_test('DELETE', 403, 'freshuser', None)
def test_delete_reader(self):
self._run_test('DELETE', 403, 'reader', None)
def test_delete_devtable(self):
self._run_test('DELETE', 204, 'devtable', None)
class TestSuperUserLicense(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)

View file

@ -66,12 +66,14 @@ from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPe
RepositoryTeamPermissionList, RepositoryUserPermissionList)
from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement,
SuperUserServiceKeyManagement, SuperUserServiceKey,
SuperUserServiceKeyApproval, SuperUserTakeOwnership,)
SuperUserServiceKeyApproval, SuperUserTakeOwnership,
SuperUserCustomCertificates, SuperUserCustomCertificate)
from endpoints.api.globalmessages import (GlobalUserMessage, GlobalUserMessages,)
from endpoints.api.secscan import RepositoryImageSecurity
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
SuperUserCreateInitialSuperUser)
from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel
from test.test_ssl_util import generate_test_cert
try:
@ -4240,6 +4242,81 @@ class TestRepositoryImageSecurity(ApiTestCase):
self.assertEquals(1, response['data']['Layer']['IndexedByVersion'])
class TestSuperUserCustomCertificates(ApiTestCase):
def test_custom_certificates(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', san_list=['DNS:bar', 'DNS:baz'])
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'),
file=(StringIO(cert_contents), 'testcert'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert', cert_info['path'])
self.assertEquals(set(['somecoolhost', 'bar', 'baz']), set(cert_info['names']))
self.assertFalse(cert_info['expired'])
# Remove the certificate.
self.deleteResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'))
# Make sure it is gone.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(0, len(json['certs']))
def test_expired_custom_certificate(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10)
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'),
file=(StringIO(cert_contents), 'testcert'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert', cert_info['path'])
self.assertEquals(set(['somecoolhost']), set(cert_info['names']))
self.assertTrue(cert_info['expired'])
def test_invalid_custom_certificate(self):
self.login(ADMIN_ACCESS_USER)
# Upload an invalid certificate.
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'),
file=(StringIO('some contents'), 'testcert'), expected_code=204)
# Make sure it is present but invalid.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert', cert_info['path'])
self.assertEquals('no start line', cert_info['error'])
def test_path_sanitization(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10)
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert/../foobar'),
file=(StringIO(cert_contents), 'testcert/../foobar'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('foobar', cert_info['path'])
class TestSuperUserTakeOwnership(ApiTestCase):
def test_take_ownership_superuser(self):
self.login(ADMIN_ACCESS_USER)

View file

@ -5,42 +5,45 @@ from OpenSSL import crypto
from util.security.ssl import load_certificate, CertInvalidException, KeyInvalidException
def generate_test_cert(hostname='somehostname', san_list=None, expires=1000000):
""" Generates a test SSL certificate and returns the certificate data and private key data. """
# Based on: http://blog.richardknop.com/2012/08/create-a-self-signed-x509-certificate-in-python/
# Create a key pair.
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 1024)
# Create a self-signed cert.
cert = crypto.X509()
cert.get_subject().CN = hostname
# Add the subjectAltNames (if necessary).
if san_list is not None:
cert.add_extensions([crypto.X509Extension("subjectAltName", False, ", ".join(san_list))])
cert.set_serial_number(1000)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(expires)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.sign(k, 'sha1')
# Dump the certificate and private key in PEM format.
cert_data = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
key_data = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
return (cert_data, key_data)
class TestSSLCertificate(unittest.TestCase):
def _generate_cert(self, hostname='somehostname', san_list=None, expires=1000000):
# Based on: http://blog.richardknop.com/2012/08/create-a-self-signed-x509-certificate-in-python/
# Create a key pair.
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 1024)
# Create a self-signed cert.
cert = crypto.X509()
cert.get_subject().CN = hostname
# Add the subjectAltNames (if necessary).
if san_list is not None:
cert.add_extensions([crypto.X509Extension("subjectAltName", False, ", ".join(san_list))])
cert.set_serial_number(1000)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(expires)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.sign(k, 'sha1')
# Dump the certificate and private key in PEM format.
cert_data = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
key_data = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
return (cert_data, key_data)
def test_load_certificate(self):
# Try loading an invalid certificate.
with self.assertRaisesRegexp(CertInvalidException, 'no start line'):
load_certificate('someinvalidcontents')
# Load a valid certificate.
(public_key_data, _) = self._generate_cert()
(public_key_data, _) = generate_test_cert()
cert = load_certificate(public_key_data)
self.assertFalse(cert.expired)
@ -48,13 +51,13 @@ class TestSSLCertificate(unittest.TestCase):
self.assertTrue(cert.matches_name('somehostname'))
def test_expired_certificate(self):
(public_key_data, _) = self._generate_cert(expires=-100)
(public_key_data, _) = generate_test_cert(expires=-100)
cert = load_certificate(public_key_data)
self.assertTrue(cert.expired)
def test_hostnames(self):
(public_key_data, _) = self._generate_cert(hostname='foo', san_list=['DNS:bar', 'DNS:baz'])
(public_key_data, _) = generate_test_cert(hostname='foo', san_list=['DNS:bar', 'DNS:baz'])
cert = load_certificate(public_key_data)
self.assertEquals(set(['foo', 'bar', 'baz']), cert.names)
@ -62,12 +65,12 @@ class TestSSLCertificate(unittest.TestCase):
self.assertTrue(cert.matches_name(name))
def test_nondns_hostnames(self):
(public_key_data, _) = self._generate_cert(hostname='foo', san_list=['URI:yarg'])
(public_key_data, _) = generate_test_cert(hostname='foo', san_list=['URI:yarg'])
cert = load_certificate(public_key_data)
self.assertEquals(set(['foo']), cert.names)
def test_validate_private_key(self):
(public_key_data, private_key_data) = self._generate_cert()
(public_key_data, private_key_data) = generate_test_cert()
private_key = NamedTemporaryFile(delete=True)
private_key.write(private_key_data)
@ -77,7 +80,7 @@ class TestSSLCertificate(unittest.TestCase):
cert.validate_private_key(private_key.name)
def test_invalid_private_key(self):
(public_key_data, _) = self._generate_cert()
(public_key_data, _) = generate_test_cert()
private_key = NamedTemporaryFile(delete=True)
private_key.write('somerandomdata')
@ -88,8 +91,8 @@ class TestSSLCertificate(unittest.TestCase):
cert.validate_private_key(private_key.name)
def test_mismatch_private_key(self):
(public_key_data, _) = self._generate_cert()
(_, private_key_data) = self._generate_cert()
(public_key_data, _) = generate_test_cert()
(_, private_key_data) = generate_test_cert()
private_key = NamedTemporaryFile(delete=True)
private_key.write(private_key_data)