This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.

88 lines
3.0 KiB

import os
import logging
import base64
from collections import namedtuple
from cryptography.hazmat.primitives.ciphers.aead import AESCCM
from import convert_secret_key
class DecryptionFailureException(Exception):
""" Exception raised if a field could not be decrypted. """
EncryptionVersion = namedtuple('EncryptionVersion', ['prefix', 'encrypt', 'decrypt'])
logger = logging.getLogger(__name__)
def _encrypt_ccm(secret_key, value, field_max_length=None):
aesccm = AESCCM(secret_key)
nonce = os.urandom(AES_CCM_NONCE_LENGTH)
ct = aesccm.encrypt(nonce, value.encode('utf-8'), None)
encrypted = base64.b64encode(nonce + ct)
if field_max_length:
msg = 'Tried to encode a value too large for this field'
assert (len(encrypted) + _RESERVED_FIELD_SPACE) <= field_max_length, msg
return encrypted
def _decrypt_ccm(secret_key, value):
aesccm = AESCCM(secret_key)
decoded = base64.b64decode(value)
nonce = decoded[:AES_CCM_NONCE_LENGTH]
ct = decoded[AES_CCM_NONCE_LENGTH:]
decrypted = aesccm.decrypt(nonce, ct, None)
return decrypted.decode('utf-8')
except Exception:
logger.exception('Got exception when trying to decrypt value `%s`', value)
raise DecryptionFailureException()
# Defines the versions of encryptions we support. This will allow us to upgrade to newer encryption
# protocols (fairly seamlessly) if need be in the future.
'v0': EncryptionVersion('v0', _encrypt_ccm, _decrypt_ccm),
_RESERVED_FIELD_SPACE = len(_SEPARATOR) + max([len(k) for k in _VERSIONS.keys()])
class FieldEncrypter(object):
""" Helper object for defining how fields are encrypted and decrypted between the database
and the application.
def __init__(self, secret_key, version='v0'):
# NOTE: secret_key will be None when the system is being first initialized, so we allow that
# case here, but make sure to assert that it is *not* None below if any encryption is actually
# needed.
self._secret_key = convert_secret_key(secret_key) if secret_key is not None else None
self._encryption_version = _VERSIONS[version]
def encrypt_value(self, value, field_max_length=None):
""" Encrypts the value using the current version of encryption. """
assert self._secret_key is not None
encrypted_value = self._encryption_version.encrypt(self._secret_key, value, field_max_length)
return '%s%s%s' % (self._encryption_version.prefix, _SEPARATOR, encrypted_value)
def decrypt_value(self, value):
""" Decrypts the value, returning it. If the value cannot be decrypted
raises a DecryptionFailureException.
assert self._secret_key is not None
if _SEPARATOR not in value:
raise DecryptionFailureException('Invalid encrypted value')
version_prefix, data = value.split(_SEPARATOR, 1)
if version_prefix not in _VERSIONS:
raise DecryptionFailureException('Unknown version prefix %s' % version_prefix)
return _VERSIONS[version_prefix].decrypt(self._secret_key, data)