diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 5db77f662..81d864eeb 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -25,12 +25,15 @@ logger = logging.getLogger(__name__) ETCD_MAX_WATCH_TIMEOUT = 30 +ETCD_ATOMIC_OP_TIMEOUT = 30 RETRY_IMMEDIATELY_TIMEOUT = 0 NO_WORKER_AVAILABLE_TIMEOUT = 10 DEFAULT_EPHEMERAL_API_TIMEOUT = 20 DEFAULT_EPHEMERAL_SETUP_TIMEOUT = 300 + class EtcdAction(object): + """ Enumeration of the various kinds of etcd actions we can observe via a watch. """ GET = 'get' SET = 'set' EXPIRE = 'expire' @@ -70,6 +73,7 @@ class EphemeralBuilderManager(BaseManager): self._etcd_realm_prefix = None self._etcd_job_prefix = None + self._etcd_lock_prefix = None self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT self._ephemeral_setup_timeout = DEFAULT_EPHEMERAL_SETUP_TIMEOUT @@ -155,16 +159,16 @@ class EphemeralBuilderManager(BaseManager): if etcd_result is None: raise Return() - if etcd_result.action == EtcdAction.CREATE: + if etcd_result.action in (EtcdAction.CREATE, EtcdAction.SET): raise Return() elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE): - # Handle the expiration/deletion + # Handle the expiration/deletion. job_metadata = json.loads(etcd_result._prev_node.value) build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) logger.debug('Got "%s" of job %s', etcd_result.action, build_job.build_uuid) - # Pop the build info. + # Get the build info. build_info = self._build_uuid_to_info.get(build_job.build_uuid, None) if build_info is None: logger.debug('No build info for "%s" job %s (%s); probably already deleted by this manager', @@ -185,9 +189,15 @@ class EphemeralBuilderManager(BaseManager): if not job_metadata.get('had_heartbeat', True): logger.warning('Build executor failed to successfully boot with execution id %s', execution_id) - self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) - # Finally, we terminate the build execution for the job. + # Take a lock to ensure that only one manager reports the build as incomplete. + got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid)) + if got_lock: + logger.warning('Marking job %s as incomplete', build_job.build_uuid) + self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) + + # Finally, we terminate the build execution for the job. We don't do this under a lock as + # terminating a node is an atomic operation; better to make sure it is terminated than not. logger.info('Terminating expired build executor for job %s with execution id %s', build_job.build_uuid, execution_id) yield From(self.kill_builder_executor(build_job.build_uuid)) @@ -340,6 +350,8 @@ class EphemeralBuilderManager(BaseManager): self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change, restarter=self._register_existing_realms) + self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'lock/') + self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT', DEFAULT_EPHEMERAL_API_TIMEOUT) @@ -591,6 +603,23 @@ class EphemeralBuilderManager(BaseManager): yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl)) self.job_heartbeat_callback(build_job) + + @coroutine + def _take_etcd_atomic_lock(self, path, *args): + """ Takes a lock for atomic operations via etcd over the given path. Returns true if the lock + was granted and false otherwise. + """ + pieces = [self._etcd_lock_prefix, path] + pieces.extend(*args) + + lock_key = os.path.join(*pieces) + try: + yield From(self._etcd_client.write(lock_key, {}, prevExist=False, ttl=ETCD_ATOMIC_OP_TIMEOUT)) + raise Return(True) + except (KeyError, etcd.EtcdKeyError): + raise Return(False) + + def _etcd_job_key(self, build_job): """ Create a key which is used to track a job in etcd. """