import logging
import os
import subprocess

import pathvalidate
from flask import request, jsonify

from util.config.validator import EXTRA_CA_DIRECTORY
from config_app.config_endpoints.exception import InvalidRequest
from config_app.config_endpoints.api import resource, ApiResource, nickname
from config_app.config_endpoints.api.superuser_models_pre_oci import pre_oci_model
from config_app.config_util.ssl import load_certificate, CertInvalidException
from config_app.c_app import config_provider, INIT_SCRIPTS_LOCATION

logger = logging.getLogger(__name__)


# The route carries the `certpath` URL variable because both `post` and
# `delete` below take a `certpath` argument; without it neither handler can
# be invoked.
@resource('/v1/superuser/customcerts/<certpath>')
class SuperUserCustomCertificate(ApiResource):
    """ Resource for managing a custom certificate. """

    @nickname('uploadCustomCertificate')
    def post(self, certpath):
        """ Saves the uploaded file as a custom certificate and installs it.

        The upload is read from the `file` part of the request, stored under
        EXTRA_CA_DIRECTORY using the sanitized `certpath` as its filename, and
        then loaded to check that it parses. A certificate that fails to load
        is left in place and the request still answers 204; the install script
        is only run for certificates that load successfully.

        Raises:
          InvalidRequest: if no file was uploaded or the name lacks `.crt`.
          Exception: if the certificate install script exits non-zero.
        """
        # `.get` rather than indexing: a missing `file` part must reach the
        # InvalidRequest below instead of failing on the lookup itself.
        uploaded_file = request.files.get('file')
        if not uploaded_file:
            raise InvalidRequest('Missing certificate file')

        # Save the certificate.
        certpath = pathvalidate.sanitize_filename(certpath)
        if not certpath.endswith('.crt'):
            raise InvalidRequest('Invalid certificate file: must have suffix `.crt`')

        logger.debug('Saving custom certificate %s', certpath)
        cert_full_path = config_provider.get_volume_path(EXTRA_CA_DIRECTORY, certpath)
        config_provider.save_volume_file(cert_full_path, uploaded_file)
        logger.debug('Saved custom certificate %s', certpath)

        # Validate the certificate.
        try:
            logger.debug('Loading custom certificate %s', certpath)
            with config_provider.get_volume_file(cert_full_path) as f:
                load_certificate(f.read())
        except CertInvalidException:
            logger.exception('Got certificate invalid error for cert %s', certpath)
            return '', 204
        except IOError:
            logger.exception('Got IO error for cert %s', certpath)
            return '', 204

        # Call the update script with config dir location to install the certificate immediately.
        # NOTE(review): `env` replaces the child's whole environment, so the
        # script only sees QUAYCONFIG -- confirm this is intended.
        install_script = os.path.join(INIT_SCRIPTS_LOCATION, 'certs_install.sh')
        install_env = {'QUAYCONFIG': config_provider.get_config_dir_path()}
        if subprocess.call([install_script], env=install_env) != 0:
            raise Exception('Could not install certificates')

        return '', 204

    @nickname('deleteCustomCertificate')
    def delete(self, certpath):
        """ Removes the custom certificate stored under the given name. """
        # Sanitize the same way `post` does, so the name removed here is the
        # name that was saved and cannot point outside EXTRA_CA_DIRECTORY.
        certpath = pathvalidate.sanitize_filename(certpath)
        cert_full_path = config_provider.get_volume_path(EXTRA_CA_DIRECTORY, certpath)
        config_provider.remove_volume_file(cert_full_path)
        return '', 204


@resource('/v1/superuser/customcerts')
class SuperUserCustomCertificates(ApiResource):
    """ Resource for managing custom certificates. """

    @nickname('getCustomCertificates')
    def get(self):
        """ Lists the custom certificates found under EXTRA_CA_DIRECTORY.

        Returns a dict with a `status` of `directory` plus one `certs` entry
        per listed file. When the directory listing is None the status is
        `file` if something exists at that path and `none` otherwise. Entries
        that cannot be read or parsed carry an `error` string instead of the
        certificate details.
        """
        has_extra_certs_path = config_provider.volume_file_exists(EXTRA_CA_DIRECTORY)
        extra_certs_found = config_provider.list_volume_directory(EXTRA_CA_DIRECTORY)
        if extra_certs_found is None:
            return {
                'status': 'file' if has_extra_certs_path else 'none',
            }

        cert_views = []
        for extra_cert_path in extra_certs_found:
            try:
                cert_full_path = config_provider.get_volume_path(EXTRA_CA_DIRECTORY, extra_cert_path)
                with config_provider.get_volume_file(cert_full_path) as f:
                    certificate = load_certificate(f.read())
                    cert_views.append({
                        'path': extra_cert_path,
                        'names': list(certificate.names),
                        'expired': certificate.expired,
                    })
            except (CertInvalidException, IOError) as err:
                # str() instead of `.message`: that attribute is absent on
                # Python 3 exceptions and is empty for errno-style IOErrors
                # on Python 2.
                cert_views.append({
                    'path': extra_cert_path,
                    'error': str(err),
                })

        return {
            'status': 'directory',
            'certs': cert_views,
        }


@resource('/v1/superuser/keys')
class SuperUserServiceKeyManagement(ApiResource):
    """ Resource for managing service keys."""
    schemas = {
        'CreateServiceKey': {
            'id': 'CreateServiceKey',
            'type': 'object',
            'description': 'Description of creation of a service key',
            'required': ['service', 'expiration'],
            'properties': {
                'service': {
                    'type': 'string',
                    'description': 'The service authenticating with this key',
                },
                'name': {
                    'type': 'string',
                    'description': 'The friendly name of a service key',
                },
                'metadata': {
                    'type': 'object',
                    'description': 'The key/value pairs of this key\'s metadata',
                },
                'notes': {
                    'type': 'string',
                    'description': 'If specified, the extra notes for the key',
                },
                'expiration': {
                    'description': 'The expiration date as a unix timestamp',
                    'anyOf': [{'type': 'number'}, {'type': 'null'}],
                },
            },
        },
    }

    @nickname('listServiceKeys')
    def get(self):
        """ Returns every service key as a JSON object under `keys`. """
        keys = pre_oci_model.list_all_service_keys()
        return jsonify({
            'keys': [key.to_dict() for key in keys],
        })