import os import logging import pathvalidate from flask import request from config_endpoints.exception import InvalidRequest from config_endpoints.api import resource, ApiResource, verify_not_prod, nickname from config_util.ssl import load_certificate, CertInvalidException from config_app import app, config_provider logger = logging.getLogger(__name__) EXTRA_CA_DIRECTORY = 'extra_ca_certs' @resource('/v1/superuser/customcerts/') class SuperUserCustomCertificate(ApiResource): """ Resource for managing a custom certificate. """ @nickname('uploadCustomCertificate') @verify_not_prod def post(self, certpath): uploaded_file = request.files['file'] if not uploaded_file: raise InvalidRequest('Missing certificate file') # Save the certificate. certpath = pathvalidate.sanitize_filename(certpath) if not certpath.endswith('.crt'): raise InvalidRequest('Invalid certificate file: must have suffix `.crt`') logger.debug('Saving custom certificate %s', certpath) cert_full_path = config_provider.get_volume_path(EXTRA_CA_DIRECTORY, certpath) config_provider.save_volume_file(cert_full_path, uploaded_file) logger.debug('Saved custom certificate %s', certpath) # Validate the certificate. try: logger.debug('Loading custom certificate %s', certpath) with config_provider.get_volume_file(cert_full_path) as f: load_certificate(f.read()) except CertInvalidException: logger.exception('Got certificate invalid error for cert %s', certpath) return '', 204 except IOError: logger.exception('Got IO error for cert %s', certpath) return '', 204 # Call the update script to install the certificate immediately. if not app.config['TESTING']: logger.debug('Calling certs_install.sh') if os.system('/conf/init/certs_install.sh') != 0: raise Exception('Could not install certificates') logger.debug('certs_install.sh completed') return '', 204 @nickname('deleteCustomCertificate') @verify_not_prod def delete(self, certpath): cert_full_path = config_provider.get_volume_path(EXTRA_CA_DIRECTORY, certpath) config_provider.remove_volume_file(cert_full_path) return '', 204 @resource('/v1/superuser/customcerts') class SuperUserCustomCertificates(ApiResource): """ Resource for managing custom certificates. """ @nickname('getCustomCertificates') @verify_not_prod def get(self): has_extra_certs_path = config_provider.volume_file_exists(EXTRA_CA_DIRECTORY) extra_certs_found = config_provider.list_volume_directory(EXTRA_CA_DIRECTORY) if extra_certs_found is None: return { 'status': 'file' if has_extra_certs_path else 'none', } cert_views = [] for extra_cert_path in extra_certs_found: try: cert_full_path = config_provider.get_volume_path(EXTRA_CA_DIRECTORY, extra_cert_path) with config_provider.get_volume_file(cert_full_path) as f: certificate = load_certificate(f.read()) cert_views.append({ 'path': extra_cert_path, 'names': list(certificate.names), 'expired': certificate.expired, }) except CertInvalidException as cie: cert_views.append({ 'path': extra_cert_path, 'error': cie.message, }) except IOError as ioe: cert_views.append({ 'path': extra_cert_path, 'error': ioe.message, }) return { 'status': 'directory', 'certs': cert_views, } # TODO(config) port this endpoint when (https://github.com/quay/quay/pull/3055) merged to ensure no conflicts # @resource('/v1/superuser/keys') # class SuperUserServiceKeyManagement(ApiResource): # """ Resource for managing service keys.""" # schemas = { # 'CreateServiceKey': { # 'id': 'CreateServiceKey', # 'type': 'object', # 'description': 'Description of creation of a service key', # 'required': ['service', 'expiration'], # 'properties': { # 'service': { # 'type': 'string', # 'description': 'The service authenticating with this key', # }, # 'name': { # 'type': 'string', # 'description': 'The friendly name of a service key', # }, # 'metadata': { # 'type': 'object', # 'description': 'The key/value pairs of this key\'s metadata', # }, # 'notes': { # 'type': 'string', # 'description': 'If specified, the extra notes for the key', # }, # 'expiration': { # 'description': 'The expiration date as a unix timestamp', # 'anyOf': [{'type': 'number'}, {'type': 'null'}], # }, # }, # }, # } # # @verify_not_prod # @nickname('listServiceKeys') # def get(self): # keys = pre_oci_model.list_all_service_keys() # # return jsonify({ # 'keys': [key.to_dict() for key in keys], # }) #