Add cycling deployments and updating config

Add kube config with refactor to kube accessor

Add tests for k8s accessor, some styling changes
This commit is contained in:
Sam Chow 2018-08-09 16:43:11 -04:00
parent d387ba171f
commit eea5fe3391
23 changed files with 830 additions and 560 deletions

View file

@ -2,7 +2,7 @@ import os
from backports.tempfile import TemporaryDirectory
from config_app.config_util.config.fileprovider import FileConfigProvider
from config_app.config_util.k8sinterface import kubernetes_access_instance
from config_app.config_util.k8saccessor import KubernetesAccessorSingleton
class TransientDirectoryProvider(FileConfigProvider):
@ -41,6 +41,6 @@ class TransientDirectoryProvider(FileConfigProvider):
for name in os.listdir(config_path):
file_path = os.path.join(self.config_volume, name)
kubernetes_access_instance.save_file_as_secret(name, file_path)
KubernetesAccessorSingleton.get_instance().save_file_as_secret(name, file_path)
return 200

View file

@ -0,0 +1,145 @@
import logging
import json
import base64
import datetime
from requests import Request, Session
from config_app.config_util.k8sconfig import KubernetesConfig
logger = logging.getLogger(__name__)
QE_DEPLOYMENT_LABEL = 'quay-enterprise-component'
class KubernetesAccessorSingleton(object):
""" Singleton allowing access to kubernetes operations """
_instance = None
def __init__(self, kube_config=None):
self.kube_config = kube_config
if kube_config is None:
self.kube_config = KubernetesConfig.from_env()
KubernetesAccessorSingleton._instance = self
@classmethod
def get_instance(cls, kube_config=None):
"""
Singleton getter implementation, returns the instance if one exists, otherwise creates the
instance and ties it to the class.
:return: KubernetesAccessorSingleton
"""
if cls._instance is None:
return cls(kube_config)
return cls._instance
def save_file_as_secret(self, name, file_path):
with open(file_path) as f:
value = f.read()
self._update_secret_file(name, value)
def get_qe_deployments(self):
""""
Returns all deployments matching the label selector provided in the KubeConfig
"""
deployment_selector_url = 'namespaces/%s/deployments?labelSelector=%s%%3D%s' % (
self.kube_config.qe_namespace, QE_DEPLOYMENT_LABEL, self.kube_config.qe_deployment_selector
)
response = self._execute_k8s_api('GET', deployment_selector_url, api_prefix='apis/extensions/v1beta1')
if response.status_code != 200:
return None
return json.loads(response.text)
def cycle_qe_deployments(self, deployment_names):
""""
Triggers a rollout of all desired deployments in the qe namespace
"""
for name in deployment_names:
logger.debug('Cycling deployment %s', name)
deployment_url = 'namespaces/%s/deployments/%s' % (self.kube_config.qe_namespace, name)
# There is currently no command to simply rolling restart all the pods: https://github.com/kubernetes/kubernetes/issues/13488
# Instead, we modify the template of the deployment with a dummy env variable to trigger a cycle of the pods
# (based off this comment: https://github.com/kubernetes/kubernetes/issues/13488#issuecomment-240393845)
self._assert_success(self._execute_k8s_api('PATCH', deployment_url, {
'spec': {
'template': {
'spec': {
'containers': [{
'name': 'quay-enterprise-app', 'env': [{
'name': 'RESTART_TIME',
'value': str(datetime.datetime.now())
}]
}]
}
}
}
}, api_prefix='apis/extensions/v1beta1', content_type='application/strategic-merge-patch+json'))
def _assert_success(self, response):
if response.status_code != 200:
logger.error('Kubernetes API call failed with response: %s => %s', response.status_code,
response.text)
raise Exception('Kubernetes API call failed: %s' % response.text)
def _update_secret_file(self, relative_file_path, value=None):
if '/' in relative_file_path:
raise Exception('Expected path from get_volume_path, but found slashes')
# Check first that the namespace for Quay Enterprise exists. If it does not, report that
# as an error, as it seems to be a common issue.
namespace_url = 'namespaces/%s' % (self.kube_config.qe_namespace)
response = self._execute_k8s_api('GET', namespace_url)
if response.status_code // 100 != 2:
msg = 'A Kubernetes namespace with name `%s` must be created to save config' % self.kube_config.qe_namespace
raise Exception(msg)
# Check if the secret exists. If not, then we create an empty secret and then update the file
# inside.
secret_url = 'namespaces/%s/secrets/%s' % (self.kube_config.qe_namespace, self.kube_config.qe_config_secret)
secret = self._lookup_secret()
if secret is None:
self._assert_success(self._execute_k8s_api('POST', secret_url, {
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": self.kube_config.qe_config_secret
},
"data": {}
}))
# Update the secret to reflect the file change.
secret['data'] = secret.get('data', {})
if value is not None:
secret['data'][relative_file_path] = base64.b64encode(value)
else:
secret['data'].pop(relative_file_path)
self._assert_success(self._execute_k8s_api('PUT', secret_url, secret))
def _lookup_secret(self):
secret_url = 'namespaces/%s/secrets/%s' % (self.kube_config.qe_namespace, self.kube_config.qe_config_secret)
response = self._execute_k8s_api('GET', secret_url)
if response.status_code != 200:
return None
return json.loads(response.text)
def _execute_k8s_api(self, method, relative_url, data=None, api_prefix='api/v1', content_type='application/json'):
headers = {
'Authorization': 'Bearer ' + self.kube_config.service_account_token
}
if data:
headers['Content-Type'] = content_type
data = json.dumps(data) if data else None
session = Session()
url = 'https://%s/%s/%s' % (self.kube_config.api_host, api_prefix, relative_url)
request = Request(method, url, data=data, headers=headers)
return session.send(request.prepare(), verify=False, timeout=2)

View file

@ -0,0 +1,47 @@
import os
SERVICE_ACCOUNT_TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'
DEFAULT_QE_NAMESPACE = 'quay-enterprise'
DEFAULT_QE_CONFIG_SECRET = 'quay-enterprise-config-secret'
# The name of the quay enterprise deployment (not config app) that is used to query & rollout
DEFAULT_QE_DEPLOYMENT_SELECTOR = 'app'
def get_k8s_namespace():
return os.environ.get('QE_K8S_NAMESPACE', DEFAULT_QE_NAMESPACE)
class KubernetesConfig(object):
def __init__(self, api_host='', service_account_token=SERVICE_ACCOUNT_TOKEN_PATH,
qe_namespace=DEFAULT_QE_NAMESPACE,
qe_config_secret=DEFAULT_QE_CONFIG_SECRET,
qe_deployment_selector=DEFAULT_QE_DEPLOYMENT_SELECTOR):
self.api_host = api_host
self.qe_namespace = qe_namespace
self.qe_config_secret = qe_config_secret
self.qe_deployment_selector = qe_deployment_selector
self.service_account_token = service_account_token
@classmethod
def from_env(cls):
# Load the service account token from the local store.
if not os.path.exists(SERVICE_ACCOUNT_TOKEN_PATH):
raise Exception('Cannot load Kubernetes service account token')
with open(SERVICE_ACCOUNT_TOKEN_PATH, 'r') as f:
service_token = f.read()
api_host=os.environ.get('KUBERNETES_SERVICE_HOST', '')
port = os.environ.get('KUBERNETES_SERVICE_PORT')
if port:
api_host += ':' + port
qe_namespace = get_k8s_namespace()
qe_config_secret = os.environ.get('QE_K8S_CONFIG_SECRET', DEFAULT_QE_CONFIG_SECRET)
qe_deployment_selector = os.environ.get('QE_DEPLOYMENT_SELECTOR', DEFAULT_QE_DEPLOYMENT_SELECTOR)
return cls(api_host=api_host, service_account_token=service_token, qe_namespace=qe_namespace,
qe_config_secret=qe_config_secret, qe_deployment_selector=qe_deployment_selector)

View file

@ -1,189 +0,0 @@
import os
import logging
import json
import base64
import time
from cStringIO import StringIO
from requests import Request, Session
logger = logging.getLogger(__name__)
KUBERNETES_API_HOST = os.environ.get('KUBERNETES_SERVICE_HOST', '')
port = os.environ.get('KUBERNETES_SERVICE_PORT')
if port:
KUBERNETES_API_HOST += ':' + port
SERVICE_ACCOUNT_TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'
QE_NAMESPACE = os.environ.get('QE_K8S_NAMESPACE', 'quay-enterprise')
QE_CONFIG_SECRET = os.environ.get('QE_K8S_CONFIG_SECRET', 'quay-enterprise-config-secret')
# The name of the quay enterprise deployment (not config app) that is used to query & rollout
QE_DEPLOYMENT_SELECTOR = os.environ.get('QE_DEPLOYMENT_SELECTOR', 'app')
class KubernetesAccessInterface:
""" Implementation of the config provider that reads and writes configuration
data from a Kubernetes Secret. """
def __init__(self, api_host=None, service_account_token_path=None):
service_account_token_path = service_account_token_path or SERVICE_ACCOUNT_TOKEN_PATH
api_host = api_host or KUBERNETES_API_HOST
# Load the service account token from the local store.
if not os.path.exists(service_account_token_path):
raise Exception('Cannot load Kubernetes service account token')
with open(service_account_token_path, 'r') as f:
self._service_token = f.read()
self._api_host = api_host
# def volume_file_exists(self, relative_file_path):
# if '/' in relative_file_path:
# raise Exception('Expected path from get_volume_path, but found slashes')
#
# # NOTE: Overridden because we don't have subdirectories, which aren't supported
# # in Kubernetes secrets.
# secret = self._lookup_secret()
# if not secret or not secret.get('data'):
# return False
# return relative_file_path in secret['data']
#
# def list_volume_directory(self, path):
# # NOTE: Overridden because we don't have subdirectories, which aren't supported
# # in Kubernetes secrets.
# secret = self._lookup_secret()
#
# if not secret:
# return []
#
# paths = []
# for filename in secret.get('data', {}):
# if filename.startswith(path):
# paths.append(filename[len(path) + 1:])
# return paths
#
# def save_config(self, config_obj):
# self._update_secret_file(self.yaml_filename, get_yaml(config_obj))
#
# def remove_volume_file(self, relative_file_path):
# try:
# self._update_secret_file(relative_file_path, None)
# except IOError as ioe:
# raise CannotWriteConfigException(str(ioe))
#
# def save_volume_file(self, flask_file, relative_file_path):
# # Write the file to a temp location.
# buf = StringIO()
# try:
# try:
# flask_file.save(buf)
# except IOError as ioe:
# raise Exception(str(ioe))
#
# self._update_secret_file(relative_file_path, buf.getvalue())
# finally:
# buf.close()
def save_file_as_secret(self, name, file_path):
with open(file_path) as f:
value = f.read()
self._update_secret_file(name, value)
def get_qe_deployments(self):
pod_selector_url = 'namespaces/%s/deployments?labelSelector=quay-enterprise-component%%3D%s' % (QE_NAMESPACE, QE_DEPLOYMENT_SELECTOR)
response = self._execute_k8s_api('GET', pod_selector_url, api_prefix='apis/extensions/v1beta1')
if response.status_code != 200:
return None
return json.loads(response.text)
def _assert_success(self, response):
if response.status_code != 200:
logger.error('Kubernetes API call failed with response: %s => %s', response.status_code,
response.text)
raise Exception('Kubernetes API call failed: %s' % response.text)
def _update_secret_file(self, relative_file_path, value=None):
if '/' in relative_file_path:
raise Exception('Expected path from get_volume_path, but found slashes')
# Check first that the namespace for Quay Enterprise exists. If it does not, report that
# as an error, as it seems to be a common issue.
namespace_url = 'namespaces/%s' % (QE_NAMESPACE)
response = self._execute_k8s_api('GET', namespace_url)
if response.status_code // 100 != 2:
msg = 'A Kubernetes namespace with name `%s` must be created to save config' % QE_NAMESPACE
raise Exception(msg)
# Check if the secret exists. If not, then we create an empty secret and then update the file
# inside.
secret_url = 'namespaces/%s/secrets/%s' % (QE_NAMESPACE, QE_CONFIG_SECRET)
secret = self._lookup_secret()
if secret is None:
self._assert_success(self._execute_k8s_api('POST', secret_url, {
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": QE_CONFIG_SECRET
},
"data": {}
}))
# Update the secret to reflect the file change.
secret['data'] = secret.get('data', {})
if value is not None:
secret['data'][relative_file_path] = base64.b64encode(value)
else:
secret['data'].pop(relative_file_path)
self._assert_success(self._execute_k8s_api('PUT', secret_url, secret))
# Wait until the local mounted copy of the secret has been updated, as
# this is an eventual consistency operation, but the caller expects immediate
# consistency.
# while True:
# matching_files = set()
# for secret_filename, encoded_value in secret['data'].iteritems():
# expected_value = base64.b64decode(encoded_value)
# try:
# with self.get_volume_file(secret_filename) as f:
# contents = f.read()
#
# if contents == expected_value:
# matching_files.add(secret_filename)
#
# except IOError:
# continue
#
# if matching_files == set(secret['data'].keys()):
# break
#
# # Sleep for a second and then try again.
# time.sleep(1)
def _lookup_secret(self):
secret_url = 'namespaces/%s/secrets/%s' % (QE_NAMESPACE, QE_CONFIG_SECRET)
response = self._execute_k8s_api('GET', secret_url)
if response.status_code != 200:
return None
return json.loads(response.text)
def _execute_k8s_api(self, method, relative_url, data=None, api_prefix='api/v1'):
headers = {
'Authorization': 'Bearer ' + self._service_token
}
if data:
headers['Content-Type'] = 'application/json'
data = json.dumps(data) if data else None
session = Session()
url = 'https://%s/%s/%s' % (self._api_host, api_prefix, relative_url)
request = Request(method, url, data=data, headers=headers)
return session.send(request.prepare(), verify=False, timeout=2)
kubernetes_access_instance = KubernetesAccessInterface()

View file

@ -0,0 +1,63 @@
import pytest
import re
from httmock import urlmatch, HTTMock, response
from config_app.config_util.k8saccessor import KubernetesAccessorSingleton
from config_app.config_util.k8sconfig import KubernetesConfig
@pytest.mark.parametrize('kube_config, expected_api, expected_query', [
({'api_host':'www.customhost.com'},
'/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments', 'labelSelector=quay-enterprise-component%3Dapp'),
({'api_host':'www.customhost.com', 'qe_deployment_selector':'custom-selector'},
'/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments', 'labelSelector=quay-enterprise-component%3Dcustom-selector'),
({'api_host':'www.customhost.com', 'qe_namespace':'custom-namespace'},
'/apis/extensions/v1beta1/namespaces/custom-namespace/deployments', 'labelSelector=quay-enterprise-component%3Dapp'),
({'api_host':'www.customhost.com', 'qe_namespace':'custom-namespace', 'qe_deployment_selector':'custom-selector'},
'/apis/extensions/v1beta1/namespaces/custom-namespace/deployments', 'labelSelector=quay-enterprise-component%3Dcustom-selector'),
])
def test_get_qe_deployments(kube_config, expected_api, expected_query):
config = KubernetesConfig(**kube_config)
url_hit = [False]
@urlmatch(netloc=r'www.customhost.com')
def handler(request, _):
assert request.path == expected_api
assert request.query == expected_query
url_hit[0] = True
return response(200, '{}')
with HTTMock(handler):
KubernetesAccessorSingleton._instance = None
assert KubernetesAccessorSingleton.get_instance(config).get_qe_deployments() is not None
assert url_hit[0]
@pytest.mark.parametrize('kube_config, deployment_names, expected_api_hits', [
({'api_host':'www.customhost.com'}, [], []),
({'api_host':'www.customhost.com'}, ['myDeployment'], ['/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments/myDeployment']),
({'api_host':'www.customhost.com', 'qe_namespace':'custom-namespace'},
['myDeployment', 'otherDeployment'],
['/apis/extensions/v1beta1/namespaces/custom-namespace/deployments/myDeployment', '/apis/extensions/v1beta1/namespaces/custom-namespace/deployments/otherDeployment']),
])
def test_cycle_qe_deployments(kube_config, deployment_names, expected_api_hits):
KubernetesAccessorSingleton._instance = None
config = KubernetesConfig(**kube_config)
url_hit = [False] * len(expected_api_hits)
i = [0]
@urlmatch(netloc=r'www.customhost.com', method='PATCH')
def handler(request, _):
assert request.path == expected_api_hits[i[0]]
url_hit[i[0]] = True
i[0] += 1
return response(200, '{}')
with HTTMock(handler):
KubernetesAccessorSingleton.get_instance(config).cycle_qe_deployments(deployment_names)
assert all(url_hit)