Merge pull request #181 from coreos-inc/fixetcd

Fix etcd watching
This commit is contained in:
josephschorr 2015-06-25 22:24:55 -04:00
commit 0953bae44d
4 changed files with 54 additions and 8 deletions

View file

@ -4,6 +4,10 @@ The following are features that have been merged, but not yet deployed:
- Nothing yet! - Nothing yet!
### 1.9.6
- Temporary fix for etcd-related issues with the ephemeral build manager (#181)
### 1.9.5 ### 1.9.5
- Added changelog (#178) - Added changelog (#178)

View file

@ -17,6 +17,7 @@ from buildman.jobutil.workererror import WorkerError
from data.database import BUILD_PHASE from data.database import BUILD_PHASE
HEARTBEAT_DELTA = datetime.timedelta(seconds=30) HEARTBEAT_DELTA = datetime.timedelta(seconds=30)
BUILD_HEARTBEAT_DELAY = datetime.timedelta(seconds=30)
HEARTBEAT_TIMEOUT = 10 HEARTBEAT_TIMEOUT = 10
INITIAL_TIMEOUT = 25 INITIAL_TIMEOUT = 25
@ -76,8 +77,8 @@ class BuildComponent(BaseComponent):
@trollius.coroutine @trollius.coroutine
def start_build(self, build_job): def start_build(self, build_job):
""" Starts a build. """ """ Starts a build. """
logger.debug('Starting build for component %s (worker version: %s)', logger.debug('Starting build for component %s (build %s, worker version: %s)',
self.builder_realm, self._worker_version) self.builder_realm, build_job.repo_build.uuid, self._worker_version)
self._current_job = build_job self._current_job = build_job
self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid) self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid)
@ -150,6 +151,11 @@ class BuildComponent(BaseComponent):
self.call("io.quay.builder.build", **build_arguments).add_done_callback(self._build_complete) self.call("io.quay.builder.build", **build_arguments).add_done_callback(self._build_complete)
# Set the heartbeat for the future. If the builder never receives the build call,
# then this will cause a timeout after 30 seconds. We know the builder has registered
# by this point, so it makes sense to have a timeout.
self._last_heartbeat = datetime.datetime.utcnow() + BUILD_HEARTBEAT_DELAY
@staticmethod @staticmethod
def _commit_sha(build_config): def _commit_sha(build_config):
""" Determines whether the metadata is using an old schema or not and returns the commit. """ """ Determines whether the metadata is using an old schema or not and returns the commit. """
@ -401,8 +407,8 @@ class BuildComponent(BaseComponent):
yield From(self._timeout()) yield From(self._timeout())
raise Return() raise Return()
logger.debug('Heartbeat on realm %s is valid: %s.', self.builder_realm, logger.debug('Heartbeat on realm %s is valid: %s (%s).', self.builder_realm,
self._last_heartbeat) self._last_heartbeat, self._component_status)
yield From(trollius.sleep(HEARTBEAT_TIMEOUT)) yield From(trollius.sleep(HEARTBEAT_TIMEOUT))

View file

@ -68,7 +68,8 @@ class EphemeralBuilderManager(BaseManager):
super(EphemeralBuilderManager, self).__init__(*args, **kwargs) super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True): def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True,
restarter=None):
watch_task_key = (etcd_key, recursive) watch_task_key = (etcd_key, recursive)
def callback_wrapper(changed_key_future): def callback_wrapper(changed_key_future):
new_index = start_index new_index = start_index
@ -89,8 +90,19 @@ class EphemeralBuilderManager(BaseManager):
except (ProtocolError, etcd.EtcdException): except (ProtocolError, etcd.EtcdException):
logger.exception('Exception on etcd watch: %s', etcd_key) logger.exception('Exception on etcd watch: %s', etcd_key)
except etcd.EtcdEventIndexCleared:
# This happens if etcd2 has moved forward too fast for us to start watching
# at the index we retrieved. We therefore start a new watch at HEAD and
# (if specified) call the restarter method which should conduct a read and
# reset the state of the manager.
# TODO: Remove this hack once Etcd is fixed.
logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
new_index = None
if restarter is not None:
async(restarter())
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
self._watch_etcd(etcd_key, change_callback, start_index=new_index) self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter)
if etcd_result: if etcd_result:
change_callback(etcd_result) change_callback(etcd_result)
@ -165,18 +177,35 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Registering realm with manager: %s', realm_spec['realm']) logger.debug('Registering realm with manager: %s', realm_spec['realm'])
component = self.register_component(realm_spec['realm'], BuildComponent, component = self.register_component(realm_spec['realm'], BuildComponent,
token=realm_spec['token']) token=realm_spec['token'])
if component in self._component_to_job:
logger.debug('Realm already registered with manager: %s', realm_spec['realm'])
return component
build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
self._component_to_job[component] = build_job self._component_to_job[component] = build_job
self._component_to_builder[component] = realm_spec['builder_id'] self._component_to_builder[component] = realm_spec['builder_id']
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
return component
@coroutine
def _register_existing_realms(self):
  """ Registers every realm currently stored in etcd and forgets components whose realm is gone.

      Reads the realm prefix recursively, registers each non-directory entry through
      _register_realm (which returns the component for that realm), and then drops the
      job/builder bookkeeping of any tracked component that the read did not return.
      A missing realm prefix (KeyError / etcd.EtcdKeyError) is treated as "no realms yet".
  """
  try:
    all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True))

    # Register all existing realms found.
    encountered = set()
    for realm in all_realms.children:
      if not realm.dir:
        encountered.add(self._register_realm(json.loads(realm.value)))

    # Remove any components not encountered so we can clean up. Iterate over a snapshot of
    # the keys because the dict is mutated inside the loop.
    for found in list(self._component_to_job.keys()):
      if found not in encountered:
        # Pop the stale entry itself (`found`); a default is supplied so that a component
        # missing from one map cannot raise a KeyError that the handler below would
        # silently swallow, aborting the rest of the cleanup.
        self._component_to_job.pop(found, None)
        self._component_to_builder.pop(found, None)
        # NOTE(review): _job_uuid_to_component is filled alongside these maps when a realm
        # is registered but is not pruned here - confirm whether stale entries should be
        # dropped as well.

  except (KeyError, etcd.EtcdKeyError):
    # no realms have been registered yet
    pass
@ -212,7 +241,8 @@ class EphemeralBuilderManager(BaseManager):
self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration) self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration)
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change) self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
restarter=self._register_existing_realms)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/') self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')

View file

@ -36,6 +36,7 @@ class BuilderServer(object):
self._loop = None self._loop = None
self._current_status = BuildServerStatus.STARTING self._current_status = BuildServerStatus.STARTING
self._current_components = [] self._current_components = []
self._realm_map = {}
self._job_count = 0 self._job_count = 0
self._session_factory = RouterSessionFactory(RouterFactory()) self._session_factory = RouterSessionFactory(RouterFactory())
@ -111,6 +112,9 @@ class BuilderServer(object):
BaseComponent. BaseComponent.
""" """
logger.debug('Registering component with realm %s', realm) logger.debug('Registering component with realm %s', realm)
if realm in self._realm_map:
logger.debug('Component with realm %s already registered', realm)
return self._realm_map[realm]
component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs) component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs)
component.server = self component.server = self
@ -119,6 +123,7 @@ class BuilderServer(object):
component.user_files = self._user_files component.user_files = self._user_files
component.registry_hostname = self._registry_hostname component.registry_hostname = self._registry_hostname
self._realm_map[realm] = component
self._current_components.append(component) self._current_components.append(component)
self._session_factory.add(component) self._session_factory.add(component)
return component return component
@ -127,6 +132,7 @@ class BuilderServer(object):
logger.debug('Unregistering component with realm %s and token %s', logger.debug('Unregistering component with realm %s and token %s',
component.builder_realm, component.expected_token) component.builder_realm, component.expected_token)
self._realm_map.pop(component.builder_realm)
self._current_components.remove(component) self._current_components.remove(component)
self._session_factory.remove(component) self._session_factory.remove(component)