# quay/buildman/orchestrator.py
from abc import ABCMeta, abstractmethod
from collections import namedtuple
import datetime
import json
import logging
import re
import time
from enum import IntEnum, unique
from six import add_metaclass, iteritems
from trollius import async, coroutine, From, Return
from urllib3.exceptions import ReadTimeoutError, ProtocolError
import etcd
import redis
from buildman.asyncutil import wrap_with_threadpool
from util import slash_join
from util.expiresdict import ExpiresDict
logger = logging.getLogger(__name__)
ONE_DAY = 60 * 60 * 24
ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION = 5
DEFAULT_LOCK_EXPIRATION = 10000
ETCD_READ_TIMEOUT = 5
ETCD_MAX_WATCH_TIMEOUT = 30
REDIS_EXPIRING_SUFFIX = '/expiring'
REDIS_DEFAULT_PUBSUB_KEY = 'orchestrator_events'
REDIS_EVENT_KIND_MESSAGE = 'message'
REDIS_EVENT_KIND_PMESSAGE = 'pmessage'
REDIS_NONEXPIRING_KEY = -1
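# Note: redis reports a TTL of -1 for a key that exists without an expiration, which is what
# REDIS_NONEXPIRING_KEY encodes (see RedisOrchestrator.get_prefixed_keys).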
# This constant defines the Redis configuration flags used to watch [K]eyspace and e[x]pired
# events on keys. For more info, see https://redis.io/topics/notifications#configuration
REDIS_KEYSPACE_EVENT_CONFIG_VALUE = 'Kx'
REDIS_KEYSPACE_EVENT_CONFIG_KEY = 'notify-keyspace-events'
REDIS_KEYSPACE_KEY_PATTERN = '__keyspace@%s__:%s'
REDIS_EXPIRED_KEYSPACE_PATTERN = slash_join(REDIS_KEYSPACE_KEY_PATTERN, REDIS_EXPIRING_SUFFIX)
REDIS_EXPIRED_KEYSPACE_REGEX = re.compile(REDIS_EXPIRED_KEYSPACE_PATTERN % (r'(\S+)', r'(\S+)'))
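# For example, with db=0, a hypothetical key 'building/job1/expiring' that expires is announced
# on the channel '__keyspace@0__:building/job1/expiring' with the message 'expired'.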
def orchestrator_from_config(manager_config, canceller_only=False):
"""
  Allocates a new Orchestrator from the 'ORCHESTRATOR' block in the provided manager config.
  Checks for legacy configuration prefixed with 'ETCD_' when 'ORCHESTRATOR' is not present.
:param manager_config: the configuration for the orchestrator
:type manager_config: dict
:rtype: :class: Orchestrator
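
  Illustrative example (a minimal sketch; the 'MEM_'-prefixed block is a hypothetical config):

    manager_config = {'ORCHESTRATOR': {'MEM_UNUSED': True}}
    orchestrator = orchestrator_from_config(manager_config)  # a MemoryOrchestrator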
"""
# Legacy codepath only knows how to configure etcd.
if manager_config.get('ORCHESTRATOR') is None:
manager_config['ORCHESTRATOR'] = {key: value
for (key, value) in iteritems(manager_config)
if key.startswith('ETCD_') and not key.endswith('_PREFIX')}
# Sanity check that legacy prefixes are no longer being used.
for key in manager_config['ORCHESTRATOR'].keys():
words = key.split('_')
if len(words) > 1 and words[-1].lower() == 'prefix':
raise AssertionError('legacy prefix used, use ORCHESTRATOR_PREFIX instead')
def _dict_key_prefix(d):
"""
:param d: the dict that has keys prefixed with underscore
:type d: {str: any}
:rtype: str
"""
    return list(d.keys())[0].split('_', 1)[0].lower()
orchestrator_name = _dict_key_prefix(manager_config['ORCHESTRATOR'])
def format_key(key):
return key.lower().split('_', 1)[1]
orchestrator_kwargs = {format_key(key): value
for (key, value) in iteritems(manager_config['ORCHESTRATOR'])}
if manager_config.get('ORCHESTRATOR_PREFIX') is not None:
orchestrator_kwargs['orchestrator_prefix'] = manager_config['ORCHESTRATOR_PREFIX']
orchestrator_kwargs['canceller_only'] = canceller_only
logger.debug('attempting to create orchestrator %s with kwargs %s',
orchestrator_name, orchestrator_kwargs)
return orchestrator_by_name(orchestrator_name, **orchestrator_kwargs)
def orchestrator_by_name(name, **kwargs):
_ORCHESTRATORS = {
'etcd': Etcd2Orchestrator,
'mem': MemoryOrchestrator,
'redis': RedisOrchestrator,
}
return _ORCHESTRATORS.get(name, MemoryOrchestrator)(**kwargs)
class OrchestratorError(Exception):
pass
# TODO: replace with ConnectionError when this codebase is Python 3.
class OrchestratorConnectionError(OrchestratorError):
pass
@unique
class KeyEvent(IntEnum):
CREATE = 1
SET = 2
DELETE = 3
EXPIRE = 4
class KeyChange(namedtuple('KeyChange', ['event', 'key', 'value'])):
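  """
  e.g. KeyChange(KeyEvent.SET, 'building/job1', '{"phase": "building"}'); the key and value
  shown here are hypothetical examples of what a build manager might store.
  """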
pass
@add_metaclass(ABCMeta)
class Orchestrator(object):
"""
Orchestrator is the interface that is used to synchronize the build states
across build managers.
This interface assumes that storage is being done by a key-value store
that supports watching for events on keys.
  Missing keys should raise KeyError; all other failures should raise an
  OrchestratorError.
:param key_prefix: the prefix of keys being watched
:type key_prefix: str
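
  Illustrative usage (a sketch; the key prefix and callback are hypothetical):

    orchestrator = orchestrator_by_name('mem')
    orchestrator.on_key_change('building/', my_callback)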
"""
@abstractmethod
def on_key_change(self, key, callback, restarter=None):
"""
The callback parameter takes in a KeyChange object as a parameter.
"""
pass
@abstractmethod
def get_prefixed_keys(self, prefix):
"""
    :returns: a dict of the key-value pairs whose keys begin with the given prefix
:rtype: {str: str}
"""
pass
@abstractmethod
def get_key(self, key):
"""
:returns: the value stored at the provided key
:rtype: str
"""
pass
@abstractmethod
def set_key(self, key, value, overwrite=False, expiration=None):
"""
:param key: the identifier for the value
:type key: str
:param value: the value being stored
:type value: str
    :param overwrite: if False, a KeyError is raised when the key already exists
:type overwrite: bool
:param expiration: the duration in seconds that a key should be available
:type expiration: int
"""
pass
@abstractmethod
def set_key_sync(self, key, value, overwrite=False, expiration=None):
"""
set_key, but without trollius coroutines.
"""
pass
@abstractmethod
def delete_key(self, key):
"""
Deletes a key that has been set in the orchestrator.
:param key: the identifier for the key
:type key: str
"""
pass
@abstractmethod
def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
"""
Takes a lock for synchronizing exclusive operations cluster-wide.
:param key: the identifier for the lock
:type key: str
:param expiration: the duration until the lock expires
:type expiration: :class:`datetime.timedelta` or int (seconds)
:returns: whether or not the lock was acquired
:rtype: bool
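
    Illustrative usage (a sketch; the lock key is hypothetical):

      acquired = yield From(orchestrator.lock('janitor/leader'))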
"""
pass
@abstractmethod
  def shutdown(self):
"""
This function should shutdown any final resources allocated by the Orchestrator.
"""
pass
def _sleep_orchestrator():
"""
This function blocks the trollius event loop by sleeping in order to backoff if a failure
such as a ConnectionError has occurred.
"""
logger.exception('Connecting to etcd failed; sleeping for %s and then trying again',
ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION)
time.sleep(ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION)
logger.exception('Connecting to etcd failed; slept for %s and now trying again',
ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION)
class EtcdAction(object):
""" Enumeration of the various kinds of etcd actions we can observe via a watch. """
GET = 'get'
SET = 'set'
EXPIRE = 'expire'
UPDATE = 'update'
DELETE = 'delete'
CREATE = 'create'
COMPARE_AND_SWAP = 'compareAndSwap'
COMPARE_AND_DELETE = 'compareAndDelete'
class Etcd2Orchestrator(Orchestrator):
def __init__(self, host='127.0.0.1', port=2379, cert_and_key=None, ca_cert=None,
client_threads=5, canceller_only=False, **kwargs):
self.is_canceller_only = canceller_only
logger.debug('initializing async etcd client')
self._sync_etcd_client = etcd.Client(
host=host,
port=port,
cert=tuple(cert_and_key) if cert_and_key is not None else None,
ca_cert=ca_cert,
protocol='http' if cert_and_key is None else 'https',
read_timeout=ETCD_READ_TIMEOUT,
)
if not self.is_canceller_only:
(self._etcd_client, self._async_executor) = wrap_with_threadpool(self._sync_etcd_client,
client_threads)
logger.debug('creating initial orchestrator state')
self._shutting_down = False
self._watch_tasks = {}
@staticmethod
def _sanity_check_ttl(ttl):
"""
A TTL of < 0 in etcd results in the key *never being expired*.
We use a max here to ensure that if the TTL is < 0, the key will expire immediately.
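    For example, _sanity_check_ttl(-5) returns 0, so the key expires immediately.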
"""
return max(ttl, 0)
def _watch_etcd(self, key, callback, restarter=None, start_index=None):
def callback_wrapper(changed_key_future):
new_index = start_index
etcd_result = None
if not changed_key_future.cancelled():
try:
etcd_result = changed_key_future.result()
existing_index = getattr(etcd_result, 'etcd_index', None)
new_index = etcd_result.modifiedIndex + 1
logger.debug('Got watch of key: %s at #%s with result: %s',
key, existing_index, etcd_result)
except ReadTimeoutError:
logger.debug('Read-timeout on etcd watch %s, rescheduling', key)
except etcd.EtcdEventIndexCleared:
# This happens if etcd2 has moved forward too fast for us to start watching at the index
# we retrieved. We therefore start a new watch at HEAD and (if specified) call the
# restarter method which should conduct a read and reset the state of the manager.
logger.debug('Etcd moved forward too quickly. Restarting watch cycle.')
new_index = None
if restarter is not None:
async(restarter())
except (KeyError, etcd.EtcdKeyError):
logger.debug('Etcd key already cleared: %s', key)
return
except etcd.EtcdConnectionFailed:
_sleep_orchestrator()
except etcd.EtcdException as eex:
# TODO: This is a quick and dirty hack and should be replaced with a proper
# exception check.
if str(eex.message).find('Read timed out') >= 0:
logger.debug('Read-timeout on etcd watch %s, rescheduling', key)
else:
logger.exception('Exception on etcd watch: %s', key)
except ProtocolError:
logger.exception('Exception on etcd watch: %s', key)
if key not in self._watch_tasks or self._watch_tasks[key].done():
self._watch_etcd(key, callback, start_index=new_index, restarter=restarter)
if etcd_result and etcd_result.value is not None:
async(callback(self._etcd_result_to_keychange(etcd_result)))
if not self._shutting_down:
logger.debug('Scheduling watch of key: %s at start index %s', key, start_index)
watch_future = self._etcd_client.watch(key, recursive=True, index=start_index,
timeout=ETCD_MAX_WATCH_TIMEOUT)
watch_future.add_done_callback(callback_wrapper)
self._watch_tasks[key] = async(watch_future)
@staticmethod
def _etcd_result_to_keychange(etcd_result):
event = Etcd2Orchestrator._etcd_result_to_keyevent(etcd_result)
return KeyChange(event, etcd_result.key, etcd_result.value)
@staticmethod
def _etcd_result_to_keyevent(etcd_result):
if etcd_result.action == EtcdAction.CREATE:
return KeyEvent.CREATE
if etcd_result.action == EtcdAction.SET:
return KeyEvent.CREATE if etcd_result.createdIndex == etcd_result.modifiedIndex else KeyEvent.SET
if etcd_result.action == EtcdAction.DELETE:
return KeyEvent.DELETE
if etcd_result.action == EtcdAction.EXPIRE:
return KeyEvent.EXPIRE
    raise AssertionError('etcd action must have an equivalent keyevent')
def on_key_change(self, key, callback, restarter=None):
assert not self.is_canceller_only
logger.debug('creating watch on %s', key)
self._watch_etcd(key, callback, restarter=restarter)
@coroutine
def get_prefixed_keys(self, prefix):
assert not self.is_canceller_only
try:
etcd_result = yield From(self._etcd_client.read(prefix, recursive=True))
raise Return({leaf.key: leaf.value for leaf in etcd_result.leaves})
except etcd.EtcdKeyError:
raise KeyError
except etcd.EtcdConnectionFailed as ex:
raise OrchestratorConnectionError(ex)
except etcd.EtcdException as ex:
raise OrchestratorError(ex)
@coroutine
def get_key(self, key):
assert not self.is_canceller_only
try:
# Ignore pylint: the value property on EtcdResult is added dynamically using setattr.
etcd_result = yield From(self._etcd_client.read(key))
raise Return(etcd_result.value)
except etcd.EtcdKeyError:
raise KeyError
except etcd.EtcdConnectionFailed as ex:
raise OrchestratorConnectionError(ex)
except etcd.EtcdException as ex:
raise OrchestratorError(ex)
@coroutine
def set_key(self, key, value, overwrite=False, expiration=None):
assert not self.is_canceller_only
    yield From(self._etcd_client.write(key, value, prevExist=overwrite,
                                       ttl=self._sanity_check_ttl(expiration)))
def set_key_sync(self, key, value, overwrite=False, expiration=None):
    self._sync_etcd_client.write(key, value, prevExist=overwrite,
                                 ttl=self._sanity_check_ttl(expiration))
@coroutine
def delete_key(self, key):
assert not self.is_canceller_only
try:
yield From(self._etcd_client.delete(key))
except etcd.EtcdKeyError:
raise KeyError
except etcd.EtcdConnectionFailed as ex:
raise OrchestratorConnectionError(ex)
except etcd.EtcdException as ex:
raise OrchestratorError(ex)
@coroutine
def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
assert not self.is_canceller_only
try:
yield From(self._etcd_client.write(key, {}, prevExist=False,
ttl=self._sanity_check_ttl(expiration)))
raise Return(True)
except (KeyError, etcd.EtcdKeyError):
raise Return(False)
except etcd.EtcdConnectionFailed:
logger.exception('Could not get etcd atomic lock as etcd is down')
raise Return(False)
except etcd.EtcdException as ex:
raise OrchestratorError(ex)
def shutdown(self):
logger.debug('Shutting down etcd client.')
self._shutting_down = True
if self.is_canceller_only:
return
    for key, task in self._watch_tasks.items():
if not task.done():
logger.debug('Canceling watch task for %s', key)
task.cancel()
if self._async_executor is not None:
self._async_executor.shutdown()
class MemoryOrchestrator(Orchestrator):
def __init__(self, **kwargs):
self.state = ExpiresDict()
self.callbacks = {}
def _callbacks_prefixed(self, prefix):
return (callback for (key, callback) in iteritems(self.callbacks)
if key.startswith(prefix))
def on_key_change(self, key, callback, restarter=None):
self.callbacks[key] = callback
@coroutine
def get_prefixed_keys(self, prefix):
raise Return({k: value for (k, value) in self.state.items()
if k.startswith(prefix)})
@coroutine
def get_key(self, key):
raise Return(self.state[key])
@coroutine
def set_key(self, key, value, overwrite=False, expiration=None):
    preexisting_key = key in self.state
if preexisting_key and not overwrite:
raise KeyError
absolute_expiration = None
if expiration is not None:
absolute_expiration = datetime.datetime.now() + datetime.timedelta(seconds=expiration)
self.state.set(key, value, expires=absolute_expiration)
event = KeyEvent.CREATE if not preexisting_key else KeyEvent.SET
for callback in self._callbacks_prefixed(key):
yield From(callback(KeyChange(event, key, value)))
def set_key_sync(self, key, value, overwrite=False, expiration=None):
"""
set_key, but without trollius coroutines.
"""
    preexisting_key = key in self.state
if preexisting_key and not overwrite:
raise KeyError
absolute_expiration = None
if expiration is not None:
absolute_expiration = datetime.datetime.now() + datetime.timedelta(seconds=expiration)
self.state.set(key, value, expires=absolute_expiration)
event = KeyEvent.CREATE if not preexisting_key else KeyEvent.SET
for callback in self._callbacks_prefixed(key):
callback(KeyChange(event, key, value))
@coroutine
def delete_key(self, key):
value = self.state[key]
del self.state[key]
for callback in self._callbacks_prefixed(key):
yield From(callback(KeyChange(KeyEvent.DELETE, key, value)))
@coroutine
def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
if key in self.state:
raise Return(False)
    # ExpiresDict expects an absolute datetime here, as in set_key above.
    self.state.set(key, None,
                   expires=datetime.datetime.now() + datetime.timedelta(seconds=expiration))
raise Return(True)
def shutdown(self):
self.state = None
self.callbacks = None
class RedisOrchestrator(Orchestrator):
def __init__(self, host='127.0.0.1', port=6379, password=None, db=0, cert_and_key=None,
ca_cert=None, client_threads=5, ssl=False, skip_keyspace_event_setup=False,
canceller_only=False, **kwargs):
self.is_canceller_only = canceller_only
(cert, key) = tuple(cert_and_key) if cert_and_key is not None else (None, None)
self._sync_client = redis.StrictRedis(
host=host,
port=port,
password=password,
db=db,
ssl_certfile=cert,
ssl_keyfile=key,
ssl_ca_certs=ca_cert,
ssl=ssl,
)
self._shutting_down = False
self._tasks = {}
self._watched_keys = {}
self._pubsub_key = slash_join(kwargs.get('orchestrator_prefix', ''),
REDIS_DEFAULT_PUBSUB_KEY).lstrip('/')
if not self.is_canceller_only:
(self._client, self._async_executor) = wrap_with_threadpool(self._sync_client, client_threads)
# Configure a subscription to watch events that the orchestrator manually publishes.
logger.debug('creating pubsub with key %s', self._pubsub_key)
published_pubsub = self._sync_client.pubsub()
published_pubsub.subscribe(self._pubsub_key)
(self._pubsub, self._async_executor_pub) = wrap_with_threadpool(published_pubsub)
self._watch_published_key()
# Configure a subscription to watch expired keyspace events.
if not skip_keyspace_event_setup:
self._sync_client.config_set(REDIS_KEYSPACE_EVENT_CONFIG_KEY,
REDIS_KEYSPACE_EVENT_CONFIG_VALUE)
expiring_pubsub = self._sync_client.pubsub()
expiring_pubsub.psubscribe(REDIS_EXPIRED_KEYSPACE_PATTERN % (db, '*'))
(self._pubsub_expiring, self._async_executor_ex) = wrap_with_threadpool(expiring_pubsub)
self._watch_expiring_key()
def _watch_published_key(self):
def published_callback_wrapper(event_future):
logger.debug('published callback called')
event_result = None
if not event_future.cancelled():
try:
event_result = event_future.result()
(redis_event, event_key, event_value) = event_result
logger.debug('Got watch of key: (%s, %s, %s)', redis_event, event_key, event_value)
except redis.ConnectionError:
_sleep_orchestrator()
except redis.RedisError:
          # event_key may be unbound if result() itself raised, so it cannot be logged here.
          logger.exception('Exception watching redis publish')
# Schedule creating a new future if this one has been consumed.
if 'pub' not in self._tasks or self._tasks['pub'].done():
self._watch_published_key()
if event_result is not None and redis_event == REDIS_EVENT_KIND_MESSAGE:
keychange = self._publish_to_keychange(event_value)
for watched_key, callback in iteritems(self._watched_keys):
if keychange.key.startswith(watched_key):
async(callback(keychange))
if not self._shutting_down:
logger.debug('Scheduling watch of publish stream')
watch_future = self._pubsub.parse_response()
watch_future.add_done_callback(published_callback_wrapper)
self._tasks['pub'] = async(watch_future)
  def _watch_expiring_key(self):
    @coroutine
    def handle_expired_key(event_result):
      key = None
      try:
        # Get the value of the original key before the expiration happened.
        key = self._key_from_expiration(event_result)
        expired_value = yield From(self._client.get(key))

        # $KEY/expiring is gone, but the original key still remains; set an expiration for it
        # so that other managers have time to get the event and still read the expired value.
        yield From(self._client.expire(key, ONE_DAY))

        if expired_value is not None:
          for watched_key, callback in iteritems(self._watched_keys):
            if key.startswith(watched_key):
              async(callback(KeyChange(KeyEvent.EXPIRE, key, expired_value)))
      except redis.ConnectionError:
        _sleep_orchestrator()
      except redis.RedisError:
        logger.exception('Exception watching redis expirations: %s', key)

    def expiring_callback_wrapper(event_future):
      logger.debug('expiring callback called')
      event_result = None

      if not event_future.cancelled():
        try:
          event_result = event_future.result()
        except redis.ConnectionError:
          _sleep_orchestrator()
        except redis.RedisError:
          logger.exception('Exception watching redis expirations')

      # Schedule creating a new future if this one has been consumed.
      if 'expire' not in self._tasks or self._tasks['expire'].done():
        self._watch_expiring_key()

      if self._is_expired_keyspace_event(event_result):
        # The redis reads in handle_expired_key must run as a coroutine; a plain done-callback
        # cannot yield, so the handler is scheduled on the event loop instead.
        async(handle_expired_key(event_result))

    if not self._shutting_down:
      logger.debug('Scheduling watch of expiration')
      watch_future = self._pubsub_expiring.parse_response()
      watch_future.add_done_callback(expiring_callback_wrapper)
      self._tasks['expire'] = async(watch_future)
def on_key_change(self, key, callback, restarter=None):
assert not self.is_canceller_only
logger.debug('watching key: %s', key)
self._watched_keys[key] = callback
@staticmethod
def _is_expired_keyspace_event(event_result):
"""
Sanity check that this isn't an unrelated keyspace event.
There could be a more efficient keyspace event config to avoid this client-side filter.
"""
if event_result is None:
return False
(redis_event, _pattern, matched_key, expired) = event_result
return (redis_event == REDIS_EVENT_KIND_PMESSAGE and
expired == 'expired' and
REDIS_EXPIRED_KEYSPACE_REGEX.match(matched_key) is not None)
@staticmethod
def _key_from_expiration(event_result):
(_redis_event, _pattern, matched_key, _expired) = event_result
return REDIS_EXPIRED_KEYSPACE_REGEX.match(matched_key).groups()[1]
@staticmethod
def _publish_to_keychange(event_value):
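    # event_value is the JSON payload written by _publish / set_key_sync below,
    # e.g. '{"event": 2, "key": "building/job1", "value": "..."}' (key name hypothetical).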
e = json.loads(event_value)
return KeyChange(KeyEvent(e['event']), e['key'], e['value'])
@coroutine
def get_prefixed_keys(self, prefix):
assert not self.is_canceller_only
# TODO: This can probably be done with redis pipelines to make it transactional.
keys = yield From(self._client.keys(prefix + '*'))
# Yielding to the event loop is required, thus this cannot be written as a dict comprehension.
results = {}
for key in keys:
if key.endswith(REDIS_EXPIRING_SUFFIX):
continue
ttl = yield From(self._client.ttl(key))
if ttl != REDIS_NONEXPIRING_KEY:
# Only redis keys without expirations are live build manager keys.
value = yield From(self._client.get(key))
        results[key] = value
raise Return(results)
@coroutine
def get_key(self, key):
assert not self.is_canceller_only
value = yield From(self._client.get(key))
raise Return(value)
@coroutine
def set_key(self, key, value, overwrite=False, expiration=None):
assert not self.is_canceller_only
already_exists = yield From(self._client.exists(key))
yield From(self._client.set(key, value, xx=overwrite))
if expiration is not None:
yield From(self._client.set(slash_join(key, REDIS_EXPIRING_SUFFIX), value,
xx=overwrite, ex=expiration))
key_event = KeyEvent.SET if already_exists else KeyEvent.CREATE
yield From(self._publish(event=key_event, key=key, value=value))
def set_key_sync(self, key, value, overwrite=False, expiration=None):
already_exists = self._sync_client.exists(key)
self._sync_client.set(key, value, xx=overwrite)
if expiration is not None:
self._sync_client.set(slash_join(key, REDIS_EXPIRING_SUFFIX), value,
xx=overwrite, ex=expiration)
self._sync_client.publish(self._pubsub_key, json.dumps({
'event': int(KeyEvent.SET if already_exists else KeyEvent.CREATE),
'key': key,
'value': value,
}))
@coroutine
def _publish(self, **kwargs):
kwargs['event'] = int(kwargs['event'])
event_json = json.dumps(kwargs)
logger.debug('publishing event: %s', event_json)
yield From(self._client.publish(self._pubsub_key, event_json))
@coroutine
def delete_key(self, key):
assert not self.is_canceller_only
value = yield From(self._client.get(key))
yield From(self._client.delete(key))
yield From(self._client.delete(slash_join(key, REDIS_EXPIRING_SUFFIX)))
yield From(self._publish(event=KeyEvent.DELETE, key=key, value=value))
@coroutine
def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
assert not self.is_canceller_only
    # Note: unlike the etcd implementation, this always "acquires" the lock, since the
    # underlying redis SET does not enforce that the key is absent.
    yield From(self.set_key(key, '', expiration=expiration))
raise Return(True)
@coroutine
def shutdown(self):
logger.debug('Shutting down redis client.')
self._shutting_down = True
if self.is_canceller_only:
return
for key, task in iteritems(self._tasks):
if not task.done():
logger.debug('Canceling watch task for %s', key)
task.cancel()
if self._async_executor is not None:
self._async_executor.shutdown()
if self._async_executor_ex is not None:
self._async_executor_ex.shutdown()
if self._async_executor_pub is not None:
self._async_executor_pub.shutdown()