From 52fa9aad5b3adb04a6db7b61097beb5494a4f48b Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 25 Jun 2015 21:22:39 -0400 Subject: [PATCH] Fix etcd watching Etcd can miss events on watches if they are occurring fast enough, so if we get an exception indicating that we've missed an index, we reset the state of our local tracking structures by re-reading the *full* list and starting a new watch at HEAD --- buildman/manager/ephemeral.py | 29 ++++++++++++++++++++++++++++- buildman/server.py | 6 ++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 7ea9ea8eb..73d5450d9 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -89,6 +89,16 @@ class EphemeralBuilderManager(BaseManager): except (ProtocolError, etcd.EtcdException): logger.exception('Exception on etcd watch: %s', etcd_key) + except etcd.EtcdEventIndexCleared: + # This happens if etcd2 has moved forward too fast for us to start watching + # at the index we retrieved. We therefore start a new watch at HEAD and + # call the method to read the key and load ALL realms to handle any we might + # have missed. + # TODO: Remove this hack once Etcd is fixed. + logger.exception('Etcd moved forward too quickly. 
Restarting watch cycle.') + new_index = None + async(self._register_existing_realms()) + if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): self._watch_etcd(etcd_key, change_callback, start_index=new_index) @@ -165,18 +175,35 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Registering realm with manager: %s', realm_spec['realm']) component = self.register_component(realm_spec['realm'], BuildComponent, token=realm_spec['token']) + + if component in self._component_to_job: + logger.debug('Realm already registered with manager: %s', realm_spec['realm']) + return component + build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) self._component_to_job[component] = build_job self._component_to_builder[component] = realm_spec['builder_id'] self._job_uuid_to_component[build_job.job_details['build_uuid']] = component + return component @coroutine def _register_existing_realms(self): try: all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True)) + + # Register all existing realms found. + encountered = set() for realm in all_realms.children: if not realm.dir: - self._register_realm(json.loads(realm.value)) + component = self._register_realm(json.loads(realm.value)) + encountered.add(component) + + # Remove any components not encountered so we can clean up. 
+ for found in list(self._component_to_job.keys()): + if not found in encountered: + self._component_to_job.pop(component) + self._component_to_builder.pop(component) + except (KeyError, etcd.EtcdKeyError): # no realms have been registered yet pass diff --git a/buildman/server.py b/buildman/server.py index 878b8d5fd..655b78495 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -36,6 +36,7 @@ class BuilderServer(object): self._loop = None self._current_status = BuildServerStatus.STARTING self._current_components = [] + self._realm_map = {} self._job_count = 0 self._session_factory = RouterSessionFactory(RouterFactory()) @@ -111,6 +112,9 @@ class BuilderServer(object): BaseComponent. """ logger.debug('Registering component with realm %s', realm) + if realm in self._realm_map: + logger.debug('Component with realm %s already registered', realm) + return self._realm_map[realm] component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs) component.server = self @@ -119,6 +123,7 @@ class BuilderServer(object): component.user_files = self._user_files component.registry_hostname = self._registry_hostname + self._realm_map[realm] = component self._current_components.append(component) self._session_factory.add(component) return component @@ -127,6 +132,7 @@ class BuilderServer(object): logger.debug('Unregistering component with realm %s and token %s', component.builder_realm, component.expected_token) + self._realm_map.pop(component.builder_realm) self._current_components.remove(component) self._session_factory.remove(component)