Change spacing from 4 spaces to 2 spaces

This commit is contained in:
Sam Chow 2018-08-15 15:32:24 -04:00
parent ec14007268
commit efa66d84e4
28 changed files with 936 additions and 913 deletions

View file

@ -6,41 +6,42 @@ from config_app.config_util.k8saccessor import KubernetesAccessorSingleton
class TransientDirectoryProvider(FileConfigProvider):
""" Implementation of the config provider that reads and writes the data
from/to the file system, only using temporary directories,
deleting old dirs and creating new ones as requested.
""" Implementation of the config provider that reads and writes the data
from/to the file system, only using temporary directories,
deleting old dirs and creating new ones as requested.
"""
def __init__(self, config_volume, yaml_filename, py_filename):
# Create a temp directory that will be cleaned up when we change the config path
# This should ensure we have no "pollution" of different configs:
# no uploaded config should ever affect subsequent config modifications/creations
temp_dir = TemporaryDirectory()
self.temp_dir = temp_dir
super(TransientDirectoryProvider, self).__init__(temp_dir.name, yaml_filename, py_filename)
@property
def provider_id(self):
return 'transient'
def new_config_dir(self):
"""
def __init__(self, config_volume, yaml_filename, py_filename):
# Create a temp directory that will be cleaned up when we change the config path
# This should ensure we have no "pollution" of different configs:
# no uploaded config should ever affect subsequent config modifications/creations
temp_dir = TemporaryDirectory()
self.temp_dir = temp_dir
super(TransientDirectoryProvider, self).__init__(temp_dir.name, yaml_filename, py_filename)
Update the path with a new temporary directory, deleting the old one in the process
"""
self.temp_dir.cleanup()
temp_dir = TemporaryDirectory()
@property
def provider_id(self):
return 'transient'
self.config_volume = temp_dir.name
self.temp_dir = temp_dir
self.yaml_path = os.path.join(temp_dir.name, self.yaml_filename)
def new_config_dir(self):
"""
Update the path with a new temporary directory, deleting the old one in the process
"""
self.temp_dir.cleanup()
temp_dir = TemporaryDirectory()
def get_config_dir_path(self):
return self.config_volume
self.config_volume = temp_dir.name
self.temp_dir = temp_dir
self.yaml_path = os.path.join(temp_dir.name, self.yaml_filename)
def save_configuration_to_kubernetes(self):
config_path = self.get_config_dir_path()
def get_config_dir_path(self):
return self.config_volume
for name in os.listdir(config_path):
file_path = os.path.join(self.config_volume, name)
KubernetesAccessorSingleton.get_instance().save_file_as_secret(name, file_path)
def save_configuration_to_kubernetes(self):
config_path = self.get_config_dir_path()
for name in os.listdir(config_path):
file_path = os.path.join(self.config_volume, name)
KubernetesAccessorSingleton.get_instance().save_file_as_secret(name, file_path)
return 200
return 200

View file

@ -4,9 +4,9 @@ from config_app.config_util.config.TransientDirectoryProvider import TransientDi
def get_config_provider(config_volume, yaml_filename, py_filename, testing=False):
""" Loads and returns the config provider for the current environment. """
""" Loads and returns the config provider for the current environment. """
if testing:
return TestConfigProvider()
if testing:
return TestConfigProvider()
return TransientDirectoryProvider(config_volume, yaml_filename, py_filename)
return TransientDirectoryProvider(config_volume, yaml_filename, py_filename)

View file

@ -8,64 +8,65 @@ logger = logging.getLogger(__name__)
class BaseFileProvider(BaseProvider):
""" Base implementation of the config provider that reads the data from the file system. """
def __init__(self, config_volume, yaml_filename, py_filename):
self.config_volume = config_volume
self.yaml_filename = yaml_filename
self.py_filename = py_filename
""" Base implementation of the config provider that reads the data from the file system. """
self.yaml_path = os.path.join(config_volume, yaml_filename)
self.py_path = os.path.join(config_volume, py_filename)
def __init__(self, config_volume, yaml_filename, py_filename):
self.config_volume = config_volume
self.yaml_filename = yaml_filename
self.py_filename = py_filename
def update_app_config(self, app_config):
if os.path.exists(self.py_path):
logger.debug('Applying config file: %s', self.py_path)
app_config.from_pyfile(self.py_path)
self.yaml_path = os.path.join(config_volume, yaml_filename)
self.py_path = os.path.join(config_volume, py_filename)
if os.path.exists(self.yaml_path):
logger.debug('Applying config file: %s', self.yaml_path)
import_yaml(app_config, self.yaml_path)
def update_app_config(self, app_config):
if os.path.exists(self.py_path):
logger.debug('Applying config file: %s', self.py_path)
app_config.from_pyfile(self.py_path)
def get_config(self):
if not self.config_exists():
return None
if os.path.exists(self.yaml_path):
logger.debug('Applying config file: %s', self.yaml_path)
import_yaml(app_config, self.yaml_path)
config_obj = {}
import_yaml(config_obj, self.yaml_path)
return config_obj
def get_config(self):
if not self.config_exists():
return None
def config_exists(self):
return self.volume_file_exists(self.yaml_filename)
config_obj = {}
import_yaml(config_obj, self.yaml_path)
return config_obj
def volume_exists(self):
return os.path.exists(self.config_volume)
def config_exists(self):
return self.volume_file_exists(self.yaml_filename)
def volume_file_exists(self, filename):
return os.path.exists(os.path.join(self.config_volume, filename))
def volume_exists(self):
return os.path.exists(self.config_volume)
def get_volume_file(self, filename, mode='r'):
return open(os.path.join(self.config_volume, filename), mode=mode)
def volume_file_exists(self, filename):
return os.path.exists(os.path.join(self.config_volume, filename))
def get_volume_path(self, directory, filename):
return os.path.join(directory, filename)
def get_volume_file(self, filename, mode='r'):
return open(os.path.join(self.config_volume, filename), mode=mode)
def list_volume_directory(self, path):
dirpath = os.path.join(self.config_volume, path)
if not os.path.exists(dirpath):
return None
def get_volume_path(self, directory, filename):
return os.path.join(directory, filename)
if not os.path.isdir(dirpath):
return None
def list_volume_directory(self, path):
dirpath = os.path.join(self.config_volume, path)
if not os.path.exists(dirpath):
return None
return os.listdir(dirpath)
if not os.path.isdir(dirpath):
return None
def requires_restart(self, app_config):
file_config = self.get_config()
if not file_config:
return False
return os.listdir(dirpath)
for key in file_config:
if app_config.get(key) != file_config[key]:
return True
def requires_restart(self, app_config):
file_config = self.get_config()
if not file_config:
return False
return False
for key in file_config:
if app_config.get(key) != file_config[key]:
return True
return False

View file

@ -4,57 +4,57 @@ import logging
from config_app.config_util.config.baseprovider import export_yaml, CannotWriteConfigException
from config_app.config_util.config.basefileprovider import BaseFileProvider
logger = logging.getLogger(__name__)
def _ensure_parent_dir(filepath):
""" Ensures that the parent directory of the given file path exists. """
try:
parentpath = os.path.abspath(os.path.join(filepath, os.pardir))
if not os.path.isdir(parentpath):
os.makedirs(parentpath)
except IOError as ioe:
raise CannotWriteConfigException(str(ioe))
""" Ensures that the parent directory of the given file path exists. """
try:
parentpath = os.path.abspath(os.path.join(filepath, os.pardir))
if not os.path.isdir(parentpath):
os.makedirs(parentpath)
except IOError as ioe:
raise CannotWriteConfigException(str(ioe))
class FileConfigProvider(BaseFileProvider):
""" Implementation of the config provider that reads and writes the data
from/to the file system. """
def __init__(self, config_volume, yaml_filename, py_filename):
super(FileConfigProvider, self).__init__(config_volume, yaml_filename, py_filename)
""" Implementation of the config provider that reads and writes the data
from/to the file system. """
@property
def provider_id(self):
return 'file'
def __init__(self, config_volume, yaml_filename, py_filename):
super(FileConfigProvider, self).__init__(config_volume, yaml_filename, py_filename)
def save_config(self, config_obj):
export_yaml(config_obj, self.yaml_path)
@property
def provider_id(self):
return 'file'
def write_volume_file(self, filename, contents):
filepath = os.path.join(self.config_volume, filename)
_ensure_parent_dir(filepath)
def save_config(self, config_obj):
export_yaml(config_obj, self.yaml_path)
try:
with open(filepath, mode='w') as f:
f.write(contents)
except IOError as ioe:
raise CannotWriteConfigException(str(ioe))
def write_volume_file(self, filename, contents):
filepath = os.path.join(self.config_volume, filename)
_ensure_parent_dir(filepath)
return filepath
try:
with open(filepath, mode='w') as f:
f.write(contents)
except IOError as ioe:
raise CannotWriteConfigException(str(ioe))
def remove_volume_file(self, filename):
filepath = os.path.join(self.config_volume, filename)
os.remove(filepath)
return filepath
def save_volume_file(self, filename, flask_file):
filepath = os.path.join(self.config_volume, filename)
_ensure_parent_dir(filepath)
def remove_volume_file(self, filename):
filepath = os.path.join(self.config_volume, filename)
os.remove(filepath)
# Write the file.
try:
flask_file.save(filepath)
except IOError as ioe:
raise CannotWriteConfigException(str(ioe))
def save_volume_file(self, filename, flask_file):
filepath = os.path.join(self.config_volume, filename)
_ensure_parent_dir(filepath)
return filepath
# Write the file.
try:
flask_file.save(filepath)
except IOError as ioe:
raise CannotWriteConfigException(str(ioe))
return filepath

View file

@ -1,7 +1,6 @@
import json
import io
import os
from datetime import datetime, timedelta
from config_app.config_util.config.baseprovider import BaseProvider
@ -9,73 +8,73 @@ REAL_FILES = ['test/data/signing-private.gpg', 'test/data/signing-public.gpg', '
class TestConfigProvider(BaseProvider):
""" Implementation of the config provider for testing. Everything is kept in-memory instead on
the real file system. """
def __init__(self):
self.clear()
""" Implementation of the config provider for testing. Everything is kept in-memory instead on
the real file system. """
def clear(self):
self.files = {}
self._config = {}
def __init__(self):
self.clear()
@property
def provider_id(self):
return 'test'
def clear(self):
self.files = {}
self._config = {}
def update_app_config(self, app_config):
self._config = app_config
@property
def provider_id(self):
return 'test'
def get_config(self):
if not 'config.yaml' in self.files:
return None
def update_app_config(self, app_config):
self._config = app_config
return json.loads(self.files.get('config.yaml', '{}'))
def get_config(self):
if not 'config.yaml' in self.files:
return None
def save_config(self, config_obj):
self.files['config.yaml'] = json.dumps(config_obj)
return json.loads(self.files.get('config.yaml', '{}'))
def config_exists(self):
return 'config.yaml' in self.files
def save_config(self, config_obj):
self.files['config.yaml'] = json.dumps(config_obj)
def volume_exists(self):
return True
def config_exists(self):
return 'config.yaml' in self.files
def volume_file_exists(self, filename):
if filename in REAL_FILES:
return True
def volume_exists(self):
return True
return filename in self.files
def volume_file_exists(self, filename):
if filename in REAL_FILES:
return True
def save_volume_file(self, filename, flask_file):
self.files[filename] = flask_file.read()
return filename in self.files
def write_volume_file(self, filename, contents):
self.files[filename] = contents
def save_volume_file(self, filename, flask_file):
self.files[filename] = flask_file.read()
def get_volume_file(self, filename, mode='r'):
if filename in REAL_FILES:
return open(filename, mode=mode)
def write_volume_file(self, filename, contents):
self.files[filename] = contents
return io.BytesIO(self.files[filename])
def get_volume_file(self, filename, mode='r'):
if filename in REAL_FILES:
return open(filename, mode=mode)
def remove_volume_file(self, filename):
self.files.pop(filename, None)
return io.BytesIO(self.files[filename])
def list_volume_directory(self, path):
paths = []
for filename in self.files:
if filename.startswith(path):
paths.append(filename[len(path)+1:])
def remove_volume_file(self, filename):
self.files.pop(filename, None)
return paths
def list_volume_directory(self, path):
paths = []
for filename in self.files:
if filename.startswith(path):
paths.append(filename[len(path) + 1:])
def requires_restart(self, app_config):
return False
return paths
def reset_for_test(self):
self._config['SUPER_USERS'] = ['devtable']
self.files = {}
def requires_restart(self, app_config):
return False
def get_volume_path(self, directory, filename):
return os.path.join(directory, filename)
def reset_for_test(self):
self._config['SUPER_USERS'] = ['devtable']
self.files = {}
def get_volume_path(self, directory, filename):
return os.path.join(directory, filename)

View file

@ -11,135 +11,135 @@ logger = logging.getLogger(__name__)
QE_DEPLOYMENT_LABEL = 'quay-enterprise-component'
class KubernetesAccessorSingleton(object):
""" Singleton allowing access to kubernetes operations """
_instance = None
""" Singleton allowing access to kubernetes operations """
_instance = None
def __init__(self, kube_config=None):
self.kube_config = kube_config
if kube_config is None:
self.kube_config = KubernetesConfig.from_env()
def __init__(self, kube_config=None):
self.kube_config = kube_config
if kube_config is None:
self.kube_config = KubernetesConfig.from_env()
KubernetesAccessorSingleton._instance = self
KubernetesAccessorSingleton._instance = self
@classmethod
def get_instance(cls, kube_config=None):
"""
Singleton getter implementation, returns the instance if one exists, otherwise creates the
instance and ties it to the class.
:return: KubernetesAccessorSingleton
"""
if cls._instance is None:
return cls(kube_config)
@classmethod
def get_instance(cls, kube_config=None):
"""
Singleton getter implementation, returns the instance if one exists, otherwise creates the
instance and ties it to the class.
:return: KubernetesAccessorSingleton
"""
if cls._instance is None:
return cls(kube_config)
return cls._instance
return cls._instance
def save_file_as_secret(self, name, file_path):
with open(file_path) as f:
value = f.read()
self._update_secret_file(name, value)
def save_file_as_secret(self, name, file_path):
with open(file_path) as f:
value = f.read()
self._update_secret_file(name, value)
def get_qe_deployments(self):
""""
Returns all deployments matching the label selector provided in the KubeConfig
"""
deployment_selector_url = 'namespaces/%s/deployments?labelSelector=%s%%3D%s' % (
self.kube_config.qe_namespace, QE_DEPLOYMENT_LABEL, self.kube_config.qe_deployment_selector
)
def get_qe_deployments(self):
""""
Returns all deployments matching the label selector provided in the KubeConfig
"""
deployment_selector_url = 'namespaces/%s/deployments?labelSelector=%s%%3D%s' % (
self.kube_config.qe_namespace, QE_DEPLOYMENT_LABEL, self.kube_config.qe_deployment_selector
)
response = self._execute_k8s_api('GET', deployment_selector_url, api_prefix='apis/extensions/v1beta1')
if response.status_code != 200:
return None
return json.loads(response.text)
response = self._execute_k8s_api('GET', deployment_selector_url, api_prefix='apis/extensions/v1beta1')
if response.status_code != 200:
return None
return json.loads(response.text)
def cycle_qe_deployments(self, deployment_names):
""""
Triggers a rollout of all desired deployments in the qe namespace
"""
def cycle_qe_deployments(self, deployment_names):
""""
Triggers a rollout of all desired deployments in the qe namespace
"""
for name in deployment_names:
logger.debug('Cycling deployment %s', name)
deployment_url = 'namespaces/%s/deployments/%s' % (self.kube_config.qe_namespace, name)
for name in deployment_names:
logger.debug('Cycling deployment %s', name)
deployment_url = 'namespaces/%s/deployments/%s' % (self.kube_config.qe_namespace, name)
# There is currently no command to simply rolling restart all the pods: https://github.com/kubernetes/kubernetes/issues/13488
# Instead, we modify the template of the deployment with a dummy env variable to trigger a cycle of the pods
# (based off this comment: https://github.com/kubernetes/kubernetes/issues/13488#issuecomment-240393845)
self._assert_success(self._execute_k8s_api('PATCH', deployment_url, {
'spec': {
'template': {
'spec': {
'containers': [{
'name': 'quay-enterprise-app', 'env': [{
'name': 'RESTART_TIME',
'value': str(datetime.datetime.now())
}]
# There is currently no command to simply rolling restart all the pods: https://github.com/kubernetes/kubernetes/issues/13488
# Instead, we modify the template of the deployment with a dummy env variable to trigger a cycle of the pods
# (based off this comment: https://github.com/kubernetes/kubernetes/issues/13488#issuecomment-240393845)
self._assert_success(self._execute_k8s_api('PATCH', deployment_url, {
'spec': {
'template': {
'spec': {
'containers': [{
'name': 'quay-enterprise-app', 'env': [{
'name': 'RESTART_TIME',
'value': str(datetime.datetime.now())
}]
}
}]
}
}
}, api_prefix='apis/extensions/v1beta1', content_type='application/strategic-merge-patch+json'))
def _assert_success(self, response):
if response.status_code != 200:
logger.error('Kubernetes API call failed with response: %s => %s', response.status_code,
response.text)
raise Exception('Kubernetes API call failed: %s' % response.text)
def _update_secret_file(self, relative_file_path, value=None):
if '/' in relative_file_path:
raise Exception('Expected path from get_volume_path, but found slashes')
# Check first that the namespace for Quay Enterprise exists. If it does not, report that
# as an error, as it seems to be a common issue.
namespace_url = 'namespaces/%s' % (self.kube_config.qe_namespace)
response = self._execute_k8s_api('GET', namespace_url)
if response.status_code // 100 != 2:
msg = 'A Kubernetes namespace with name `%s` must be created to save config' % self.kube_config.qe_namespace
raise Exception(msg)
# Check if the secret exists. If not, then we create an empty secret and then update the file
# inside.
secret_url = 'namespaces/%s/secrets/%s' % (self.kube_config.qe_namespace, self.kube_config.qe_config_secret)
secret = self._lookup_secret()
if secret is None:
self._assert_success(self._execute_k8s_api('POST', secret_url, {
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": self.kube_config.qe_config_secret
},
"data": {}
}))
# Update the secret to reflect the file change.
secret['data'] = secret.get('data', {})
if value is not None:
secret['data'][relative_file_path] = base64.b64encode(value)
else:
secret['data'].pop(relative_file_path)
self._assert_success(self._execute_k8s_api('PUT', secret_url, secret))
def _lookup_secret(self):
secret_url = 'namespaces/%s/secrets/%s' % (self.kube_config.qe_namespace, self.kube_config.qe_config_secret)
response = self._execute_k8s_api('GET', secret_url)
if response.status_code != 200:
return None
return json.loads(response.text)
def _execute_k8s_api(self, method, relative_url, data=None, api_prefix='api/v1', content_type='application/json'):
headers = {
'Authorization': 'Bearer ' + self.kube_config.service_account_token
}
}, api_prefix='apis/extensions/v1beta1', content_type='application/strategic-merge-patch+json'))
if data:
headers['Content-Type'] = content_type
def _assert_success(self, response):
if response.status_code != 200:
logger.error('Kubernetes API call failed with response: %s => %s', response.status_code,
response.text)
raise Exception('Kubernetes API call failed: %s' % response.text)
data = json.dumps(data) if data else None
session = Session()
url = 'https://%s/%s/%s' % (self.kube_config.api_host, api_prefix, relative_url)
def _update_secret_file(self, relative_file_path, value=None):
if '/' in relative_file_path:
raise Exception('Expected path from get_volume_path, but found slashes')
request = Request(method, url, data=data, headers=headers)
return session.send(request.prepare(), verify=False, timeout=2)
# Check first that the namespace for Quay Enterprise exists. If it does not, report that
# as an error, as it seems to be a common issue.
namespace_url = 'namespaces/%s' % (self.kube_config.qe_namespace)
response = self._execute_k8s_api('GET', namespace_url)
if response.status_code // 100 != 2:
msg = 'A Kubernetes namespace with name `%s` must be created to save config' % self.kube_config.qe_namespace
raise Exception(msg)
# Check if the secret exists. If not, then we create an empty secret and then update the file
# inside.
secret_url = 'namespaces/%s/secrets/%s' % (self.kube_config.qe_namespace, self.kube_config.qe_config_secret)
secret = self._lookup_secret()
if secret is None:
self._assert_success(self._execute_k8s_api('POST', secret_url, {
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": self.kube_config.qe_config_secret
},
"data": {}
}))
# Update the secret to reflect the file change.
secret['data'] = secret.get('data', {})
if value is not None:
secret['data'][relative_file_path] = base64.b64encode(value)
else:
secret['data'].pop(relative_file_path)
self._assert_success(self._execute_k8s_api('PUT', secret_url, secret))
def _lookup_secret(self):
secret_url = 'namespaces/%s/secrets/%s' % (self.kube_config.qe_namespace, self.kube_config.qe_config_secret)
response = self._execute_k8s_api('GET', secret_url)
if response.status_code != 200:
return None
return json.loads(response.text)
def _execute_k8s_api(self, method, relative_url, data=None, api_prefix='api/v1', content_type='application/json'):
headers = {
'Authorization': 'Bearer ' + self.kube_config.service_account_token
}
if data:
headers['Content-Type'] = content_type
data = json.dumps(data) if data else None
session = Session()
url = 'https://%s/%s/%s' % (self.kube_config.api_host, api_prefix, relative_url)
request = Request(method, url, data=data, headers=headers)
return session.send(request.prepare(), verify=False, timeout=2)

View file

@ -8,9 +8,11 @@ DEFAULT_QE_CONFIG_SECRET = 'quay-enterprise-config-secret'
# The name of the quay enterprise deployment (not config app) that is used to query & rollout
DEFAULT_QE_DEPLOYMENT_SELECTOR = 'app'
def get_k8s_namespace():
return os.environ.get('QE_K8S_NAMESPACE', DEFAULT_QE_NAMESPACE)
class KubernetesConfig(object):
def __init__(self, api_host='', service_account_token=SERVICE_ACCOUNT_TOKEN_PATH,
qe_namespace=DEFAULT_QE_NAMESPACE,
@ -31,7 +33,7 @@ class KubernetesConfig(object):
with open(SERVICE_ACCOUNT_TOKEN_PATH, 'r') as f:
service_token = f.read()
api_host=os.environ.get('KUBERNETES_SERVICE_HOST', '')
api_host = os.environ.get('KUBERNETES_SERVICE_HOST', '')
port = os.environ.get('KUBERNETES_SERVICE_PORT')
if port:
api_host += ':' + port
@ -42,6 +44,3 @@ class KubernetesConfig(object):
return cls(api_host=api_host, service_account_token=service_token, qe_namespace=qe_namespace,
qe_config_secret=qe_config_secret, qe_deployment_selector=qe_deployment_selector)

View file

@ -3,45 +3,45 @@ from config_app._init_config import CONF_DIR
def logfile_path(jsonfmt=False, debug=False):
"""
Returns the a logfileconf path following this rules:
- conf/logging_debug_json.conf # jsonfmt=true, debug=true
- conf/logging_json.conf # jsonfmt=true, debug=false
- conf/logging_debug.conf # jsonfmt=false, debug=true
- conf/logging.conf # jsonfmt=false, debug=false
Can be parametrized via envvars: JSONLOG=true, DEBUGLOG=true
"""
_json = ""
_debug = ""
"""
Returns the a logfileconf path following this rules:
- conf/logging_debug_json.conf # jsonfmt=true, debug=true
- conf/logging_json.conf # jsonfmt=true, debug=false
- conf/logging_debug.conf # jsonfmt=false, debug=true
- conf/logging.conf # jsonfmt=false, debug=false
Can be parametrized via envvars: JSONLOG=true, DEBUGLOG=true
"""
_json = ""
_debug = ""
if jsonfmt or os.getenv('JSONLOG', 'false').lower() == 'true':
_json = "_json"
if jsonfmt or os.getenv('JSONLOG', 'false').lower() == 'true':
_json = "_json"
if debug or os.getenv('DEBUGLOG', 'false').lower() == 'true':
_debug = "_debug"
if debug or os.getenv('DEBUGLOG', 'false').lower() == 'true':
_debug = "_debug"
return os.path.join(CONF_DIR, "logging%s%s.conf" % (_debug, _json))
return os.path.join(CONF_DIR, "logging%s%s.conf" % (_debug, _json))
def filter_logs(values, filtered_fields):
"""
Takes a dict and a list of keys to filter.
eg:
with filtered_fields:
[{'key': ['k1', k2'], 'fn': lambda x: 'filtered'}]
and values:
{'k1': {'k2': 'some-secret'}, 'k3': 'some-value'}
the returned dict is:
{'k1': {k2: 'filtered'}, 'k3': 'some-value'}
"""
for field in filtered_fields:
cdict = values
"""
Takes a dict and a list of keys to filter.
eg:
with filtered_fields:
[{'key': ['k1', k2'], 'fn': lambda x: 'filtered'}]
and values:
{'k1': {'k2': 'some-secret'}, 'k3': 'some-value'}
the returned dict is:
{'k1': {k2: 'filtered'}, 'k3': 'some-value'}
"""
for field in filtered_fields:
cdict = values
for key in field['key'][:-1]:
if key in cdict:
cdict = cdict[key]
for key in field['key'][:-1]:
if key in cdict:
cdict = cdict[key]
last_key = field['key'][-1]
last_key = field['key'][-1]
if last_key in cdict and cdict[last_key]:
cdict[last_key] = field['fn'](cdict[last_key])
if last_key in cdict and cdict[last_key]:
cdict[last_key] = field['fn'](cdict[last_key])

View file

@ -2,10 +2,12 @@ from fnmatch import fnmatch
import OpenSSL
class CertInvalidException(Exception):
""" Exception raised when a certificate could not be parsed/loaded. """
pass
class KeyInvalidException(Exception):
""" Exception raised when a key could not be parsed/loaded or successfully applied to a cert. """
pass
@ -24,8 +26,10 @@ def load_certificate(cert_contents):
_SUBJECT_ALT_NAME = 'subjectAltName'
class SSLCertificate(object):
""" Helper class for easier working with SSL certificates. """
def __init__(self, openssl_cert):
self.openssl_cert = openssl_cert

View file

@ -1,20 +1,22 @@
from util.config.validator import EXTRA_CA_DIRECTORY
def strip_absolute_path_and_add_trailing_dir(path):
"""
Removes the initial trailing / from the prefix path, and add the last dir one
"""
return path[1:] + '/'
"""
Removes the initial trailing / from the prefix path, and add the last dir one
"""
return path[1:] + '/'
def tarinfo_filter_partial(prefix):
def tarinfo_filter(tarinfo):
# remove leading directory info
tarinfo.name = tarinfo.name.replace(prefix, '')
def tarinfo_filter(tarinfo):
# remove leading directory info
tarinfo.name = tarinfo.name.replace(prefix, '')
# ignore any directory that isn't the specified extra ca one:
if tarinfo.isdir() and not tarinfo.name == EXTRA_CA_DIRECTORY:
return None
# ignore any directory that isn't the specified extra ca one:
if tarinfo.isdir() and not tarinfo.name == EXTRA_CA_DIRECTORY:
return None
return tarinfo
return tarinfo
return tarinfo_filter
return tarinfo_filter

View file

@ -1,23 +1,25 @@
import pytest
import re
from httmock import urlmatch, HTTMock, response
from config_app.config_util.k8saccessor import KubernetesAccessorSingleton
from config_app.config_util.k8sconfig import KubernetesConfig
@pytest.mark.parametrize('kube_config, expected_api, expected_query', [
({'api_host':'www.customhost.com'},
({'api_host': 'www.customhost.com'},
'/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments', 'labelSelector=quay-enterprise-component%3Dapp'),
({'api_host':'www.customhost.com', 'qe_deployment_selector':'custom-selector'},
'/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments', 'labelSelector=quay-enterprise-component%3Dcustom-selector'),
({'api_host': 'www.customhost.com', 'qe_deployment_selector': 'custom-selector'},
'/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments',
'labelSelector=quay-enterprise-component%3Dcustom-selector'),
({'api_host':'www.customhost.com', 'qe_namespace':'custom-namespace'},
({'api_host': 'www.customhost.com', 'qe_namespace': 'custom-namespace'},
'/apis/extensions/v1beta1/namespaces/custom-namespace/deployments', 'labelSelector=quay-enterprise-component%3Dapp'),
({'api_host':'www.customhost.com', 'qe_namespace':'custom-namespace', 'qe_deployment_selector':'custom-selector'},
'/apis/extensions/v1beta1/namespaces/custom-namespace/deployments', 'labelSelector=quay-enterprise-component%3Dcustom-selector'),
({'api_host': 'www.customhost.com', 'qe_namespace': 'custom-namespace', 'qe_deployment_selector': 'custom-selector'},
'/apis/extensions/v1beta1/namespaces/custom-namespace/deployments',
'labelSelector=quay-enterprise-component%3Dcustom-selector'),
])
def test_get_qe_deployments(kube_config, expected_api, expected_query):
config = KubernetesConfig(**kube_config)
@ -36,12 +38,15 @@ def test_get_qe_deployments(kube_config, expected_api, expected_query):
assert url_hit[0]
@pytest.mark.parametrize('kube_config, deployment_names, expected_api_hits', [
({'api_host':'www.customhost.com'}, [], []),
({'api_host':'www.customhost.com'}, ['myDeployment'], ['/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments/myDeployment']),
({'api_host':'www.customhost.com', 'qe_namespace':'custom-namespace'},
({'api_host': 'www.customhost.com'}, [], []),
({'api_host': 'www.customhost.com'}, ['myDeployment'],
['/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments/myDeployment']),
({'api_host': 'www.customhost.com', 'qe_namespace': 'custom-namespace'},
['myDeployment', 'otherDeployment'],
['/apis/extensions/v1beta1/namespaces/custom-namespace/deployments/myDeployment', '/apis/extensions/v1beta1/namespaces/custom-namespace/deployments/otherDeployment']),
['/apis/extensions/v1beta1/namespaces/custom-namespace/deployments/myDeployment',
'/apis/extensions/v1beta1/namespaces/custom-namespace/deployments/otherDeployment']),
])
def test_cycle_qe_deployments(kube_config, deployment_names, expected_api_hits):
KubernetesAccessorSingleton._instance = None

View file

@ -6,24 +6,27 @@ from util.config.validator import EXTRA_CA_DIRECTORY
from test.fixtures import *
class MockTarInfo:
def __init__(self, name, isdir):
self.name = name
self.isdir = lambda: isdir
def __eq__(self, other):
return other is not None and self.name == other.name
class MockTarInfo:
def __init__(self, name, isdir):
self.name = name
self.isdir = lambda: isdir
def __eq__(self, other):
return other is not None and self.name == other.name
@pytest.mark.parametrize('prefix,tarinfo,expected', [
# It should handle simple files
('Users/sam/', MockTarInfo('Users/sam/config.yaml', False), MockTarInfo('config.yaml', False)),
# It should allow the extra CA dir
('Users/sam/', MockTarInfo('Users/sam/%s' % EXTRA_CA_DIRECTORY, True), MockTarInfo('%s' % EXTRA_CA_DIRECTORY, True)),
# it should allow a file in that extra dir
('Users/sam/', MockTarInfo('Users/sam/%s/cert.crt' % EXTRA_CA_DIRECTORY, False), MockTarInfo('%s/cert.crt' % EXTRA_CA_DIRECTORY, False)),
# it should not allow a directory that isn't the CA dir
('Users/sam/', MockTarInfo('Users/sam/dirignore', True), None),
# It should handle simple files
('Users/sam/', MockTarInfo('Users/sam/config.yaml', False), MockTarInfo('config.yaml', False)),
# It should allow the extra CA dir
('Users/sam/', MockTarInfo('Users/sam/%s' % EXTRA_CA_DIRECTORY, True), MockTarInfo('%s' % EXTRA_CA_DIRECTORY, True)),
# it should allow a file in that extra dir
('Users/sam/', MockTarInfo('Users/sam/%s/cert.crt' % EXTRA_CA_DIRECTORY, False),
MockTarInfo('%s/cert.crt' % EXTRA_CA_DIRECTORY, False)),
# it should not allow a directory that isn't the CA dir
('Users/sam/', MockTarInfo('Users/sam/dirignore', True), None),
])
def test_tarinfo_filter(prefix, tarinfo, expected):
partial = tarinfo_filter_partial(prefix)
assert partial(tarinfo) == expected
partial = tarinfo_filter_partial(prefix)
assert partial(tarinfo) == expected