184 lines
6.3 KiB
Python
184 lines
6.3 KiB
Python
import os
|
|
import logging
|
|
import json
|
|
import base64
|
|
import time
|
|
|
|
from cStringIO import StringIO
|
|
from requests import Request, Session
|
|
|
|
from util.config.provider.baseprovider import CannotWriteConfigException, get_yaml
|
|
from util.config.provider.basefileprovider import BaseFileProvider
|
|
|
|
|
|
logger = logging.getLogger(__name__)

# Address of the Kubernetes API server, read from the environment. When a
# service port is set it is appended as `host:port`.
port = os.environ.get('KUBERNETES_SERVICE_PORT')
KUBERNETES_API_HOST = os.environ.get('KUBERNETES_SERVICE_HOST', '')
if port:
    KUBERNETES_API_HOST = KUBERNETES_API_HOST + ':' + port

# Default location of the service account token used to authenticate API calls.
SERVICE_ACCOUNT_TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'

# Namespace and secret name holding the configuration; both can be overridden
# through environment variables.
QE_NAMESPACE = os.environ.get('QE_K8S_NAMESPACE', 'quay-enterprise')
QE_CONFIG_SECRET = os.environ.get('QE_K8S_CONFIG_SECRET', 'quay-enterprise-config-secret')
|
|
|
|
class KubernetesConfigProvider(BaseFileProvider):
    """ Implementation of the config provider that reads and writes configuration
        data from a Kubernetes Secret.

        Every config file is stored as one key of the secret named by
        QE_CONFIG_SECRET in the QE_NAMESPACE namespace. Secret keys cannot contain
        slashes, so "directory/filename" pairs are flattened to "directory_filename"
        (see get_volume_path).
    """

    def __init__(self, config_volume, yaml_filename, py_filename, api_host=None,
                 service_account_token_path=None):
        """ Creates the provider and loads the service account token.

            api_host defaults to KUBERNETES_API_HOST and service_account_token_path
            defaults to SERVICE_ACCOUNT_TOKEN_PATH. Raises Exception if the token
            file does not exist.
        """
        super(KubernetesConfigProvider, self).__init__(config_volume, yaml_filename, py_filename)
        service_account_token_path = service_account_token_path or SERVICE_ACCOUNT_TOKEN_PATH
        api_host = api_host or KUBERNETES_API_HOST

        # Load the service account token from the local store.
        if not os.path.exists(service_account_token_path):
            raise Exception('Cannot load Kubernetes service account token')

        with open(service_account_token_path, 'r') as f:
            # Strip surrounding whitespace: a trailing newline would otherwise end up
            # inside the Authorization header value built in _execute_k8s_api.
            self._service_token = f.read().strip()

        self._api_host = api_host

    @property
    def provider_id(self):
        """ Returns the identifier of this provider ('k8s'). """
        return 'k8s'

    def get_volume_path(self, directory, filename):
        """ Returns the flattened secret key for `filename` inside `directory`. """
        # NOTE: Overridden to ensure we don't have subdirectories, which aren't supported
        # in Kubernetes secrets.
        return "_".join([directory.rstrip('/'), filename])

    def volume_file_exists(self, relative_file_path):
        """ Returns whether the secret contains a key named `relative_file_path`.

            Raises Exception if the path contains a slash, i.e. was not produced by
            get_volume_path.
        """
        if '/' in relative_file_path:
            raise Exception('Expected path from get_volume_path, but found slashes')

        # NOTE: Overridden because we don't have subdirectories, which aren't supported
        # in Kubernetes secrets.
        secret = self._lookup_secret()
        if not secret or not secret.get('data'):
            return False
        return relative_file_path in secret['data']

    def list_volume_directory(self, path):
        """ Returns the names of the files stored under the flattened directory `path`.

            Returns an empty list if the secret cannot be loaded.
        """
        # NOTE: Overridden because we don't have subdirectories, which aren't supported
        # in Kubernetes secrets.
        secret = self._lookup_secret()
        if not secret:
            return []

        # Match on the directory name *plus* the separator used by get_volume_path, so
        # that directory `foo` does not also pick up keys such as `foobar_x`, and so
        # that a trailing slash on `path` does not shift the slice offset.
        prefix = path.rstrip('/') + '_'

        # `data` may be missing or null on an empty secret.
        data = secret.get('data') or {}
        return [filename[len(prefix):] for filename in data if filename.startswith(prefix)]

    def save_config(self, config_obj):
        """ Serializes `config_obj` to YAML and stores it under the YAML filename. """
        self._update_secret_file(self.yaml_filename, get_yaml(config_obj))

    def remove_volume_file(self, relative_file_path):
        """ Removes `relative_file_path` from the secret.

            Raises CannotWriteConfigException if the update fails with an IOError.
        """
        try:
            self._update_secret_file(relative_file_path, None)
        except IOError as ioe:
            raise CannotWriteConfigException(str(ioe))

    def save_volume_file(self, flask_file, relative_file_path):
        """ Stores the contents of the uploaded `flask_file` under `relative_file_path`.

            Raises CannotWriteConfigException if the upload cannot be read.
        """
        # Buffer the upload in memory; the secret API needs the full contents at once.
        buf = StringIO()
        try:
            try:
                flask_file.save(buf)
            except IOError as ioe:
                raise CannotWriteConfigException(str(ioe))

            self._update_secret_file(relative_file_path, buf.getvalue())
        finally:
            buf.close()

    def _assert_success(self, response):
        """ Raises CannotWriteConfigException unless `response` has a 2xx status. """
        # Accept any 2xx: the Kubernetes API answers a successful create with 201, not 200.
        if response.status_code // 100 != 2:
            logger.error('Kubernetes API call failed with response: %s => %s',
                         response.status_code, response.text)
            raise CannotWriteConfigException('Kubernetes API call failed: %s' % response.text)

    def _local_file_matches(self, secret_filename, expected_value):
        """ Returns whether the locally mounted copy of the file equals `expected_value`. """
        try:
            with self.get_volume_file(secret_filename) as f:
                return f.read() == expected_value
        except IOError:
            # The file has not shown up in the mounted volume (yet).
            return False

    def _update_secret_file(self, relative_file_path, value=None):
        """ Sets (or, when `value` is None, removes) a single file inside the secret.

            The secret is created first if it does not exist. After the update this
            blocks until the locally mounted copy of the secret reflects the new
            contents.

            Raises Exception if the path contains a slash, and
            CannotWriteConfigException if the namespace is missing or an API call
            fails.
        """
        if '/' in relative_file_path:
            raise Exception('Expected path from get_volume_path, but found slashes')

        # Check first that the namespace for Quay Enterprise exists. If it does not, report that
        # as an error, as it seems to be a common issue.
        namespace_url = 'namespaces/%s' % (QE_NAMESPACE)
        response = self._execute_k8s_api('GET', namespace_url)
        if response.status_code // 100 != 2:
            msg = 'A Kubernetes namespace with name `%s` must be created to save config' % QE_NAMESPACE
            raise CannotWriteConfigException(msg)

        # Check if the secret exists. If not, then we create an empty secret and then update the
        # file inside.
        secrets_url = 'namespaces/%s/secrets' % (QE_NAMESPACE)
        secret_url = '%s/%s' % (secrets_url, QE_CONFIG_SECRET)
        secret = self._lookup_secret()
        if secret is None:
            # Creation is a POST against the *collection* URL; the response body is the
            # newly created secret, which we then modify and PUT back below.
            response = self._execute_k8s_api('POST', secrets_url, {
                "kind": "Secret",
                "apiVersion": "v1",
                "metadata": {
                    "name": QE_CONFIG_SECRET
                },
                "data": {}
            })
            self._assert_success(response)
            secret = json.loads(response.text)

        # Update the secret to reflect the file change. `data` may be missing or null.
        secret['data'] = secret.get('data') or {}

        if value is not None:
            secret['data'][relative_file_path] = base64.b64encode(value)
        else:
            # Removing a file that is not present is a no-op rather than a KeyError.
            secret['data'].pop(relative_file_path, None)

        self._assert_success(self._execute_k8s_api('PUT', secret_url, secret))

        # Wait until the local mounted copy of the secret has been updated, as
        # this is an eventual consistency operation, but the caller expects immediate
        # consistency.
        # NOTE(review): this wait has no upper bound; if the mounted volume never
        # converges the call blocks forever. Consider adding a deadline.
        while not all(self._local_file_matches(secret_filename, base64.b64decode(encoded_value))
                      for secret_filename, encoded_value in secret['data'].items()):
            # Sleep for a second and then try again.
            time.sleep(1)

    def _lookup_secret(self):
        """ Returns the config secret as a parsed JSON object, or None if the API
            call did not return status 200.
        """
        secret_url = 'namespaces/%s/secrets/%s' % (QE_NAMESPACE, QE_CONFIG_SECRET)
        response = self._execute_k8s_api('GET', secret_url)
        if response.status_code != 200:
            return None
        return json.loads(response.text)

    def _execute_k8s_api(self, method, relative_url, data=None):
        """ Performs an authenticated call against the Kubernetes v1 API.

            `relative_url` is appended to `https://<api_host>/api/v1/`. `data`, when
            non-empty, is sent as a JSON body. Returns the response object.
        """
        headers = {
            'Authorization': 'Bearer ' + self._service_token
        }

        if data:
            headers['Content-Type'] = 'application/json'

        data = json.dumps(data) if data else None
        url = 'https://%s/api/v1/%s' % (self._api_host, relative_url)
        request = Request(method, url, data=data, headers=headers)

        # Use the session as a context manager so its connection pool is released
        # after every call instead of leaking one session per request.
        # NOTE(review): verify=False disables TLS certificate verification while a
        # bearer token is being sent; consider verifying against the cluster CA.
        with Session() as session:
            return session.send(request.prepare(), verify=False, timeout=2)
|