This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/security/signing.py

82 lines
2.3 KiB
Python

import gpgme
import os
import features
import logging
logger = logging.getLogger(__name__)
from StringIO import StringIO
class GPG2Signer(object):
    """ Helper class for signing data using GPG2.

    The signer is built from a configuration mapping and a config provider.
    The private key file named in the configuration is imported into a
    dedicated gpgme context at construction time; that context is then
    reused for every call to `detached_sign`.
    """

    # Configuration keys that must all be present (and truthy). They are
    # checked in this order, so the first missing one is the one reported.
    _REQUIRED_CONFIG_KEYS = (
        'GPG2_PRIVATE_KEY_NAME',
        'GPG2_PRIVATE_KEY_FILENAME',
        'GPG2_PUBLIC_KEY_FILENAME',
    )

    def __init__(self, config, config_provider):
        """ Validates the configuration and imports the private key.

        config: mapping holding the GPG2_* settings listed in
          `_REQUIRED_CONFIG_KEYS`.
        config_provider: object exposing `volume_file_exists(filename)` and
          `get_volume_file(filename, mode=...)`.

        Raises Exception if a required key is missing/empty or if the private
        key file does not exist according to the config provider.
        """
        for key in self._REQUIRED_CONFIG_KEYS:
            if not config.get(key):
                raise Exception('Missing configuration key %s' % key)

        self._ctx = gpgme.Context()
        # NOTE(review): presumably requests ASCII-armored output from gpgme
        # rather than binary signatures -- confirm against the gpgme docs.
        self._ctx.armor = True
        self._private_key_name = config['GPG2_PRIVATE_KEY_NAME']
        self._public_key_filename = config['GPG2_PUBLIC_KEY_FILENAME']
        self._config_provider = config_provider

        private_key_filename = config['GPG2_PRIVATE_KEY_FILENAME']
        if not config_provider.volume_file_exists(private_key_filename):
            raise Exception('Missing key file %s' % private_key_filename)

        with config_provider.get_volume_file(private_key_filename, mode='rb') as fp:
            self._ctx.import_(fp)

    @property
    def name(self):
        """ Identifier of this signing engine. """
        return 'gpg2'

    def open_public_key_file(self):
        """ Opens the configured public key file in binary read mode and
            returns whatever file object the config provider hands back.
        """
        return self._config_provider.get_volume_file(self._public_key_filename, mode='rb')

    def detached_sign(self, stream):
        """ Signs the given stream, returning the signature.

        Raises Exception('Invalid private key name') if the configured key
        cannot be looked up in the gpgme context.
        """
        ctx = self._ctx
        try:
            ctx.signers = [ctx.get_key(self._private_key_name)]
        except Exception:
            # Deliberately not a bare `except:` so that KeyboardInterrupt and
            # SystemExit are not misreported as a bad key name.
            raise Exception('Invalid private key name')

        signature = StringIO()
        ctx.sign(stream, signature, gpgme.SIG_MODE_DETACH)
        # getvalue() returns the whole buffer regardless of the current
        # position, so no rewind is needed.
        return signature.getvalue()
class Signer(object):
    """ Facade over the configured signing engine.

    `state` holds the engine instance, or None when no engine is configured,
    signing is disabled, or the engine failed to initialize. Attribute lookups
    that are not found on the Signer itself are forwarded to `state`; a name
    the engine does not have (or any name when `state` is None) yields None.
    """

    def __init__(self, app=None, config_provider=None):
        """ Stores `app` and, when one is given, initializes the engine. """
        self.app = app
        if app is not None:
            self.state = self.init_app(app, config_provider)
        else:
            self.state = None

    def init_app(self, app, config_provider):
        """ Builds the signing engine selected by app.config['SIGNING_ENGINE'].

        Returns None when no engine is configured, when
        features.ACI_CONVERSION is falsy, or when constructing the engine
        fails (the failure is logged rather than raised).
        """
        preference = app.config.get('SIGNING_ENGINE', None)
        if preference is None:
            return None

        if not features.ACI_CONVERSION:
            return None

        try:
            return SIGNING_ENGINES[preference](app.config, config_provider)
        except Exception:
            # Best-effort: an unknown engine name or a broken engine
            # configuration is logged and leaves signing disabled.
            # KeyboardInterrupt/SystemExit are intentionally not swallowed.
            logger.exception('Could not initialize signing engine')
            return None

    def __getattr__(self, name):
        """ Forwards unknown attribute lookups to the engine in `state`. """
        # __getattr__ only runs when normal lookup fails, so being asked for
        # 'state' means it was never assigned (e.g. instance created without
        # __init__). Reading self.state below would then recurse forever.
        if name == 'state':
            raise AttributeError(name)
        return getattr(self.state, name, None)
# Registry of available signing engines, keyed by the value expected in the
# SIGNING_ENGINE configuration entry.
SIGNING_ENGINES = dict(gpg2=GPG2Signer)