From 52fa9aad5b3adb04a6db7b61097beb5494a4f48b Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 25 Jun 2015 21:22:39 -0400 Subject: [PATCH 1/5] Fix etcd watching Etcd can miss events on watches if they are occurring fast enough, so if we can get an exception indicating that we've missed an index, we reset the state of our local tracking structures by re-reading the *full* list and starting a new watch at HEAD --- buildman/manager/ephemeral.py | 29 ++++++++++++++++++++++++++++- buildman/server.py | 6 ++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 7ea9ea8eb..73d5450d9 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -89,6 +89,16 @@ class EphemeralBuilderManager(BaseManager): except (ProtocolError, etcd.EtcdException): logger.exception('Exception on etcd watch: %s', etcd_key) + except etcd.EtcdEventIndexCleared: + # This happens if etcd2 has moved forward too fast for us to start watching + # at the index we retrieved. We therefore start a new watch at HEAD and + # call the method to read the key and load ALL realms to handle any we might + # have missed. + # TODO: Remove this hack once Etcd is fixed. + logger.exception('Etcd moved forward too quickly. Restarting watch cycle.') + new_index = None + async(self._register_existing_realms()) + if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): self._watch_etcd(etcd_key, change_callback, start_index=new_index) @@ -165,18 +175,35 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Registering realm with manager: %s', realm_spec['realm']) component = self.register_component(realm_spec['realm'], BuildComponent, token=realm_spec['token']) + + if component in self._component_to_job: + logger.debug('Realm already registered with manager: %s', realm_spec['realm']) + return component + build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) self._component_to_job[component] = build_job self._component_to_builder[component] = realm_spec['builder_id'] self._job_uuid_to_component[build_job.job_details['build_uuid']] = component + return component @coroutine def _register_existing_realms(self): try: all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True)) + + # Register all existing realms found. + encountered = set() for realm in all_realms.children: if not realm.dir: - self._register_realm(json.loads(realm.value)) + component = self._register_realm(json.loads(realm.value)) + encountered.add(component) + + # Remove any components not encountered so we can clean up. + for found in list(self._component_to_job.keys()): + if not found in encountered: + self._component_to_job.pop(component) + self._component_to_builder.pop(component) + except (KeyError, etcd.EtcdKeyError): # no realms have been registered yet pass diff --git a/buildman/server.py b/buildman/server.py index 878b8d5fd..655b78495 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -36,6 +36,7 @@ class BuilderServer(object): self._loop = None self._current_status = BuildServerStatus.STARTING self._current_components = [] + self._realm_map = {} self._job_count = 0 self._session_factory = RouterSessionFactory(RouterFactory()) @@ -111,6 +112,9 @@ class BuilderServer(object): BaseComponent. """ logger.debug('Registering component with realm %s', realm) + if realm in self._realm_map: + logger.debug('Component with realm %s already registered', realm) + return self._realm_map[realm] component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs) component.server = self @@ -119,6 +123,7 @@ class BuilderServer(object): component.user_files = self._user_files component.registry_hostname = self._registry_hostname + self._realm_map[realm] = component self._current_components.append(component) self._session_factory.add(component) return component @@ -127,6 +132,7 @@ class BuilderServer(object): logger.debug('Unregistering component with realm %s and token %s', component.builder_realm, component.expected_token) + self._realm_map.pop(component.builder_realm) self._current_components.remove(component) self._session_factory.remove(component) From 9f5f71398cdd2f2951d846cf11d037263ca9af96 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 25 Jun 2015 21:36:58 -0400 Subject: [PATCH 2/5] Abstract out the concept of a restart function --- buildman/manager/ephemeral.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 73d5450d9..1e5575a05 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -68,7 +68,8 @@ class EphemeralBuilderManager(BaseManager): super(EphemeralBuilderManager, self).__init__(*args, **kwargs) - def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True): + def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True, + restarter=None): watch_task_key = (etcd_key, recursive) def callback_wrapper(changed_key_future): new_index = start_index @@ -97,10 +98,11 @@ class EphemeralBuilderManager(BaseManager): # TODO: Remove this hack once Etcd is fixed. logger.exception('Etcd moved forward too quickly. Restarting watch cycle.') new_index = None - async(self._register_existing_realms()) + if restarter is not None: + async(restarter()) if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): - self._watch_etcd(etcd_key, change_callback, start_index=new_index) + self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter) if etcd_result: change_callback(etcd_result) @@ -239,7 +241,8 @@ class EphemeralBuilderManager(BaseManager): self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration) self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') - self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change) + self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change, + restarter=self._register_existing_realms) self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/') From ecebc063436845439c72175d0622a33289224a22 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 25 Jun 2015 21:53:42 -0400 Subject: [PATCH 3/5] Update comment now that restarter is abstracted --- buildman/manager/ephemeral.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 1e5575a05..a1b7809c2 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -93,8 +93,8 @@ class EphemeralBuilderManager(BaseManager): except etcd.EtcdEventIndexCleared: # This happens if etcd2 has moved forward too fast for us to start watching # at the index we retrieved. We therefore start a new watch at HEAD and - # call the method to read the key and load ALL realms to handle any we might - # have missed. + # (if specified) call the restarter method which should conduct a read and + # reset the state of the manager. # TODO: Remove this hack once Etcd is fixed. logger.exception('Etcd moved forward too quickly. Restarting watch cycle.') new_index = None From bead839abd3ee6be43d64fae93ec5cd61c04ad1c Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 25 Jun 2015 22:13:01 -0400 Subject: [PATCH 4/5] Make sure build components timeout if the initial connection fails --- buildman/component/buildcomponent.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/buildman/component/buildcomponent.py b/buildman/component/buildcomponent.py index c5fe8a24a..1d3ee078b 100644 --- a/buildman/component/buildcomponent.py +++ b/buildman/component/buildcomponent.py @@ -17,6 +17,7 @@ from buildman.jobutil.workererror import WorkerError from data.database import BUILD_PHASE HEARTBEAT_DELTA = datetime.timedelta(seconds=30) +BUILD_HEARTBEAT_DELAY = datetime.timedelta(seconds=30) HEARTBEAT_TIMEOUT = 10 INITIAL_TIMEOUT = 25 @@ -76,8 +77,8 @@ class BuildComponent(BaseComponent): @trollius.coroutine def start_build(self, build_job): """ Starts a build. """ - logger.debug('Starting build for component %s (worker version: %s)', - self.builder_realm, self._worker_version) + logger.debug('Starting build for component %s (build %s, worker version: %s)', + self.builder_realm, build_job.repo_build.uuid, self._worker_version) self._current_job = build_job self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid) @@ -150,6 +151,11 @@ class BuildComponent(BaseComponent): self.call("io.quay.builder.build", **build_arguments).add_done_callback(self._build_complete) + # Set the heartbeat for the future. If the builder never receives the build call, + # then this will cause a timeout after 30 seconds. We know the builder has registered + # by this point, so it makes sense to have a timeout. + self._last_heartbeat = datetime.datetime.utcnow() + BUILD_HEARTBEAT_DELAY + @staticmethod def _commit_sha(build_config): """ Determines whether the metadata is using an old schema or not and returns the commit. """ @@ -401,8 +407,8 @@ class BuildComponent(BaseComponent): yield From(self._timeout()) raise Return() - logger.debug('Heartbeat on realm %s is valid: %s.', self.builder_realm, - self._last_heartbeat) + logger.debug('Heartbeat on realm %s is valid: %s (%s).', self.builder_realm, + self._last_heartbeat, self._component_status) yield From(trollius.sleep(HEARTBEAT_TIMEOUT)) From 9bcf5c0c1fa78a2e05aede1ad304aa0328871b3b Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 25 Jun 2015 22:23:48 -0400 Subject: [PATCH 5/5] Add changelog for the deploy --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 360179caa..19a5f1026 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ The following are features that have been merged, but not yet deployed: - Nothing yet! +### 1.9.6 + +- Temporary fix for etcd-related issues with the ephemeral build manager (#181) + ### 1.9.5 - Added changelog (#178)