This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/config/provider/k8sprovider.py
2017-07-20 14:07:06 +02:00

153 lines
No EOL
5.2 KiB
Python

import os
import logging
import json
import base64
from requests import Request, Session
from util.config.provider.baseprovider import get_yaml, CannotWriteConfigException
from util.config.provider.fileprovider import FileConfigProvider
logger = logging.getLogger(__name__)
KUBERNETES_API_HOST = os.environ.get('KUBERNETES_SERVICE_HOST', '')
port = os.environ.get('KUBERNETES_SERVICE_PORT')
if port:
KUBERNETES_API_HOST += ':' + port
SERVICE_ACCOUNT_TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'
QE_NAMESPACE = os.environ.get('QE_K8S_NAMESPACE', 'quay-enterprise')
QE_CONFIG_SECRET = os.environ.get('QE_K8S_CONFIG_SECRET', 'quay-enterprise-config-secret')
class KubernetesConfigProvider(FileConfigProvider):
""" Implementation of the config provider that reads and writes configuration
data from a Kubernetes Secret. """
def __init__(self, config_volume, yaml_filename, py_filename):
super(KubernetesConfigProvider, self).__init__(config_volume, yaml_filename, py_filename)
self.yaml_filename = yaml_filename
# Load the service account token from the local store.
if not os.path.exists(SERVICE_ACCOUNT_TOKEN_PATH):
raise Exception('Cannot load Kubernetes service account token')
with open(SERVICE_ACCOUNT_TOKEN_PATH, 'r') as f:
self._service_token = f.read()
# Make sure the configuration volume exists.
if not self.volume_exists():
os.makedirs(config_volume)
@property
def provider_id(self):
return 'k8s'
def save_config(self, config_obj):
self._update_secret_file(self.yaml_filename, get_yaml(config_obj))
super(KubernetesConfigProvider, self).save_config(config_obj)
def write_volume_file(self, filename, contents):
super(KubernetesConfigProvider, self).write_volume_file(filename, contents)
try:
self._update_secret_file(filename, contents)
except IOError as ioe:
raise CannotWriteConfigException(str(ioe))
def volume_file_exists(self, filename):
secret = self._lookup_secret()
if not secret or not secret.get('data'):
return False
return filename in secret['data']
def list_volume_directory(self, path):
secret = self._lookup_secret()
if not secret:
return []
paths = []
for filename in secret.get('data', {}):
if filename.startswith(path):
paths.append(filename[len(path) + 1:])
return paths
def remove_volume_file(self, filename):
super(KubernetesConfigProvider, self).remove_volume_file(filename)
try:
self._update_secret_file(filename, None)
except IOError as ioe:
raise CannotWriteConfigException(str(ioe))
def save_volume_file(self, filename, flask_file):
filepath = super(KubernetesConfigProvider, self).save_volume_file(filename, flask_file)
with open(filepath, 'r') as f:
self.write_volume_file(filename, f.read())
def _assert_success(self, response):
if response.status_code != 200:
logger.error('Kubernetes API call failed with response: %s => %s', response.status_code,
response.text)
raise CannotWriteConfigException('Kubernetes API call failed: %s' % response.text)
def _update_secret_file(self, filename, value=None):
# Check first that the namespace for Quay Enterprise exists. If it does not, report that
# as an error, as it seems to be a common issue.
namespace_url = 'namespaces/%s' % (QE_NAMESPACE)
response = self._execute_k8s_api('GET', namespace_url)
if response.status_code // 100 != 2:
msg = 'A Kubernetes namespace with name `%s` must be created to save config' % QE_NAMESPACE
raise CannotWriteConfigException(msg)
# Check if the secret exists. If not, then we create an empty secret and then update the file
# inside.
secret_url = 'namespaces/%s/secrets/%s' % (QE_NAMESPACE, QE_CONFIG_SECRET)
secret = self._lookup_secret()
if secret is None:
self._assert_success(self._execute_k8s_api('POST', secret_url, {
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": QE_CONFIG_SECRET
},
"data": {}
}))
# Update the secret to reflect the file change.
secret['data'] = secret.get('data', {})
if value is not None:
secret['data'][filename] = base64.b64encode(value)
else:
secret['data'].pop(filename)
self._assert_success(self._execute_k8s_api('PUT', secret_url, secret))
def _lookup_secret(self):
secret_url = 'namespaces/%s/secrets/%s' % (QE_NAMESPACE, QE_CONFIG_SECRET)
response = self._execute_k8s_api('GET', secret_url)
if response.status_code != 200:
return None
return json.loads(response.text)
def _execute_k8s_api(self, method, relative_url, data=None):
headers = {
'Authorization': 'Bearer ' + self._service_token
}
if data:
headers['Content-Type'] = 'application/json'
data = json.dumps(data) if data else None
session = Session()
url = 'https://%s/api/v1/%s' % (KUBERNETES_API_HOST, relative_url)
request = Request(method, url, data=data, headers=headers)
return session.send(request.prepare(), verify=False, timeout=2)
def get_volume_path(self, directory, filename):
return "_".join([directory.rstrip('/'), filename])