diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index c98cc7ef1..7bcdf3a80 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) ETCD_MAX_WATCH_TIMEOUT = 30 -ETCD_ATOMIC_OP_TIMEOUT = 120 +ETCD_ATOMIC_OP_TIMEOUT = 10000 RETRY_IMMEDIATELY_TIMEOUT = 0 NO_WORKER_AVAILABLE_TIMEOUT = 10 DEFAULT_EPHEMERAL_API_TIMEOUT = 20 @@ -186,12 +186,14 @@ class EphemeralBuilderManager(BaseManager): # If we have not yet received a heartbeat, then the node failed to boot in some way. We mark # the job as incomplete here. - if not job_metadata.get('had_heartbeat', True): + if not job_metadata.get('had_heartbeat', False): logger.warning('Build executor failed to successfully boot with execution id %s', execution_id) - # Take a lock to ensure that only one manager reports the build as incomplete. - got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid)) + # Take a lock to ensure that only one manager reports the build as incomplete for this + # execution. + got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid, + execution_id)) if got_lock: logger.warning('Marking job %s as incomplete', build_job.build_uuid) self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) @@ -419,7 +421,6 @@ class EphemeralBuilderManager(BaseManager): } lock_payload = json.dumps(payload) - logger.debug('Writing key for job %s with expiration in %s seconds', build_uuid, self._ephemeral_setup_timeout) try: @@ -427,17 +428,17 @@ class EphemeralBuilderManager(BaseManager): ttl=self._ephemeral_setup_timeout)) except (KeyError, etcd.EtcdKeyError): # The job was already taken by someone else, we are probably a retry - logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid) + logger.warning('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid) raise Return(False, self._ephemeral_api_timeout) except etcd.EtcdException: logger.exception('Exception when writing job %s to etcd', build_uuid) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) + # Got a lock, now lets boot the job via one of the registered executors. started_with_executor = None execution_id = None logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors]) - for executor in self._ordered_executors: # Check if we can use this executor based on its whitelist, by namespace. namespace = build_job.namespace @@ -474,10 +475,19 @@ class EphemeralBuilderManager(BaseManager): # Break out of the loop now that we've started a builder successfully. break + # If we didn't start the job, cleanup and return it to the queue. if started_with_executor is None: logger.error('Could not start ephemeral worker for build %s', build_uuid) + + # Delete the associated build job record. + try: + yield From(self._etcd_client.delete(job_key)) + except (KeyError, etcd.EtcdKeyError): + logger.warning('Could not delete job key %s', job_key) + raise Return(False, self._ephemeral_api_timeout) + # Job was started! logger.debug('Started execution with ID %s for job: %s with executor: %s', execution_id, build_uuid, started_with_executor.name) @@ -581,7 +591,10 @@ class EphemeralBuilderManager(BaseManager): @coroutine def job_heartbeat(self, build_job): - # Extend the deadline in etcd + # Extend the queue item. + self.job_heartbeat_callback(build_job) + + # Extend the deadline in etcd. job_key = self._etcd_job_key(build_job) try: @@ -607,7 +620,6 @@ class EphemeralBuilderManager(BaseManager): # to ensure that if the TTL is < 0, the key will expire immediately. etcd_ttl = max(ttl, 0) yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl)) - self.job_heartbeat_callback(build_job) @coroutine @@ -616,7 +628,7 @@ class EphemeralBuilderManager(BaseManager): was granted and false otherwise. """ pieces = [self._etcd_lock_prefix, path] - pieces.extend(*args) + pieces.extend(args) lock_key = os.path.join(*pieces) try: diff --git a/buildman/server.py b/buildman/server.py index f94ef3a69..fcc909714 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -17,15 +17,14 @@ from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database from app import app, metric_queue -from app import app logger = logging.getLogger(__name__) WORK_CHECK_TIMEOUT = 10 TIMEOUT_PERIOD_MINUTES = 20 JOB_TIMEOUT_SECONDS = 300 -SETUP_LEEWAY_SECONDS = 10 -MINIMUM_JOB_EXTENSION = timedelta(minutes=2) +SETUP_LEEWAY_SECONDS = 30 +MINIMUM_JOB_EXTENSION = timedelta(minutes=1) HEARTBEAT_PERIOD_SEC = 30