diff --git a/CHANGELOG.md b/CHANGELOG.md index 360179caa..19a5f1026 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ The following are features that have been merged, but not yet deployed: - Nothing yet! +### 1.9.6 + +- Temporary fix for etcd-related issues with the ephemeral build manager (#181) + ### 1.9.5 - Added changelog (#178) diff --git a/buildman/component/buildcomponent.py b/buildman/component/buildcomponent.py index c5fe8a24a..1d3ee078b 100644 --- a/buildman/component/buildcomponent.py +++ b/buildman/component/buildcomponent.py @@ -17,6 +17,7 @@ from buildman.jobutil.workererror import WorkerError from data.database import BUILD_PHASE HEARTBEAT_DELTA = datetime.timedelta(seconds=30) +BUILD_HEARTBEAT_DELAY = datetime.timedelta(seconds=30) HEARTBEAT_TIMEOUT = 10 INITIAL_TIMEOUT = 25 @@ -76,8 +77,8 @@ class BuildComponent(BaseComponent): @trollius.coroutine def start_build(self, build_job): """ Starts a build. """ - logger.debug('Starting build for component %s (worker version: %s)', - self.builder_realm, self._worker_version) + logger.debug('Starting build for component %s (build %s, worker version: %s)', + self.builder_realm, build_job.repo_build.uuid, self._worker_version) self._current_job = build_job self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid) @@ -150,6 +151,11 @@ class BuildComponent(BaseComponent): self.call("io.quay.builder.build", **build_arguments).add_done_callback(self._build_complete) + # Set the heartbeat for the future. If the builder never receives the build call, + # then this will cause a timeout after 30 seconds. We know the builder has registered + # by this point, so it makes sense to have a timeout. + self._last_heartbeat = datetime.datetime.utcnow() + BUILD_HEARTBEAT_DELAY + @staticmethod def _commit_sha(build_config): """ Determines whether the metadata is using an old schema or not and returns the commit. 
""" @@ -401,8 +407,8 @@ class BuildComponent(BaseComponent): yield From(self._timeout()) raise Return() - logger.debug('Heartbeat on realm %s is valid: %s.', self.builder_realm, - self._last_heartbeat) + logger.debug('Heartbeat on realm %s is valid: %s (%s).', self.builder_realm, + self._last_heartbeat, self._component_status) yield From(trollius.sleep(HEARTBEAT_TIMEOUT)) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 7ea9ea8eb..a1b7809c2 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -68,7 +68,8 @@ class EphemeralBuilderManager(BaseManager): super(EphemeralBuilderManager, self).__init__(*args, **kwargs) - def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True): + def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True, + restarter=None): watch_task_key = (etcd_key, recursive) def callback_wrapper(changed_key_future): new_index = start_index @@ -89,8 +90,19 @@ class EphemeralBuilderManager(BaseManager): except (ProtocolError, etcd.EtcdException): logger.exception('Exception on etcd watch: %s', etcd_key) + except etcd.EtcdEventIndexCleared: + # This happens if etcd2 has moved forward too fast for us to start watching + # at the index we retrieved. We therefore start a new watch at HEAD and + # (if specified) call the restarter method which should conduct a read and + # reset the state of the manager. + # TODO: Remove this hack once Etcd is fixed. + logger.exception('Etcd moved forward too quickly. 
Restarting watch cycle.') + new_index = None + if restarter is not None: + async(restarter()) + if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): - self._watch_etcd(etcd_key, change_callback, start_index=new_index) + self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter) if etcd_result: change_callback(etcd_result) @@ -165,18 +177,35 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Registering realm with manager: %s', realm_spec['realm']) component = self.register_component(realm_spec['realm'], BuildComponent, token=realm_spec['token']) + + if component in self._component_to_job: + logger.debug('Realm already registered with manager: %s', realm_spec['realm']) + return component + build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) self._component_to_job[component] = build_job self._component_to_builder[component] = realm_spec['builder_id'] self._job_uuid_to_component[build_job.job_details['build_uuid']] = component + return component @coroutine def _register_existing_realms(self): try: all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True)) + + # Register all existing realms found. + encountered = set() for realm in all_realms.children: if not realm.dir: - self._register_realm(json.loads(realm.value)) + component = self._register_realm(json.loads(realm.value)) + encountered.add(component) + + # Remove any components not encountered so we can clean up. 
+ for found in list(self._component_to_job.keys()): + if not found in encountered: + self._component_to_job.pop(found) + self._component_to_builder.pop(found) + except (KeyError, etcd.EtcdKeyError): # no realms have been registered yet pass @@ -212,7 +241,8 @@ class EphemeralBuilderManager(BaseManager): self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration) self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') - self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change) + self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change, + restarter=self._register_existing_realms) self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/') diff --git a/buildman/server.py b/buildman/server.py index 878b8d5fd..655b78495 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -36,6 +36,7 @@ class BuilderServer(object): self._loop = None self._current_status = BuildServerStatus.STARTING self._current_components = [] + self._realm_map = {} self._job_count = 0 self._session_factory = RouterSessionFactory(RouterFactory()) @@ -111,6 +112,9 @@ class BuilderServer(object): BaseComponent. 
""" logger.debug('Registering component with realm %s', realm) + if realm in self._realm_map: + logger.debug('Component with realm %s already registered', realm) + return self._realm_map[realm] component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs) component.server = self @@ -119,6 +123,7 @@ class BuilderServer(object): component.user_files = self._user_files component.registry_hostname = self._registry_hostname + self._realm_map[realm] = component self._current_components.append(component) self._session_factory.add(component) return component @@ -127,6 +132,7 @@ class BuilderServer(object): logger.debug('Unregistering component with realm %s and token %s', component.builder_realm, component.expected_token) + self._realm_map.pop(component.builder_realm) self._current_components.remove(component) self._session_factory.remove(component)