diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py
index 7ea9ea8eb..73d5450d9 100644
--- a/buildman/manager/ephemeral.py
+++ b/buildman/manager/ephemeral.py
@@ -89,6 +89,18 @@ class EphemeralBuilderManager(BaseManager):
+        except etcd.EtcdEventIndexCleared:
+          # NOTE: This clause must come before the generic etcd.EtcdException handler
+          # below; EtcdEventIndexCleared is a subclass and would otherwise never match.
+          # This happens if etcd2 has moved forward too fast for us to start watching
+          # at the index we retrieved. We therefore start a new watch at HEAD and
+          # call the method to read the key and load ALL realms to handle any we might
+          # have missed.
+          # TODO: Remove this hack once Etcd is fixed.
+          logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
+          new_index = None
+          async(self._register_existing_realms())
+
         except (ProtocolError, etcd.EtcdException):
           logger.exception('Exception on etcd watch: %s', etcd_key)
 
       if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
         self._watch_etcd(etcd_key, change_callback, start_index=new_index)
 
@@ -165,18 +177,43 @@ class EphemeralBuilderManager(BaseManager):
     logger.debug('Registering realm with manager: %s', realm_spec['realm'])
     component = self.register_component(realm_spec['realm'], BuildComponent,
                                         token=realm_spec['token'])
+
+    if component in self._component_to_job:
+      logger.debug('Realm already registered with manager: %s', realm_spec['realm'])
+      return component
+
     build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
     self._component_to_job[component] = build_job
     self._component_to_builder[component] = realm_spec['builder_id']
     self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
+    return component
 
   @coroutine
   def _register_existing_realms(self):
+    """ Reads every realm under the etcd realm prefix, registers each one, and drops
+        the local bookkeeping for components whose realm was not found in that read.
+    """
     try:
       all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True))
+
+      # Register all existing realms found.
+      encountered = set()
       for realm in all_realms.children:
         if not realm.dir:
-          self._register_realm(json.loads(realm.value))
+          component = self._register_realm(json.loads(realm.value))
+          encountered.add(component)
+
+      # Remove any components not encountered so we can clean up. Each pop targets the
+      # stale entry itself and tolerates a missing key, so a lookup failure here is not
+      # mistaken for "no realms" by the handler below.
+      for found, build_job in list(self._component_to_job.items()):
+        if found not in encountered:
+          self._component_to_job.pop(found, None)
+          self._component_to_builder.pop(found, None)
+          build_uuid = build_job.job_details['build_uuid']
+          if self._job_uuid_to_component.get(build_uuid) is found:
+            del self._job_uuid_to_component[build_uuid]
+
     except (KeyError, etcd.EtcdKeyError):
       # no realms have been registered yet
       pass
diff --git a/buildman/server.py b/buildman/server.py
index 878b8d5fd..655b78495 100644
--- a/buildman/server.py
+++ b/buildman/server.py
@@ -36,6 +36,7 @@ class BuilderServer(object):
     self._loop = None
     self._current_status = BuildServerStatus.STARTING
     self._current_components = []
+    self._realm_map = {}  # realm -> component already registered for that realm
     self._job_count = 0
 
     self._session_factory = RouterSessionFactory(RouterFactory())
@@ -111,6 +112,9 @@ class BuilderServer(object):
         BaseComponent.
     """
     logger.debug('Registering component with realm %s', realm)
+    if realm in self._realm_map:
+      logger.debug('Component with realm %s already registered', realm)
+      return self._realm_map[realm]
 
     component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs)
     component.server = self
@@ -119,6 +123,7 @@ class BuilderServer(object):
     component.user_files = self._user_files
     component.registry_hostname = self._registry_hostname
 
+    self._realm_map[realm] = component
     self._current_components.append(component)
     self._session_factory.add(component)
     return component
@@ -127,6 +132,7 @@ class BuilderServer(object):
     logger.debug('Unregistering component with realm %s and token %s',
                  component.builder_realm, component.expected_token)
 
+    self._realm_map.pop(component.builder_realm)
     self._current_components.remove(component)
     self._session_factory.remove(component)
 