From f9f60b9fafa823c67758a1c5ec20f05b92adf967 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 15 Sep 2016 14:37:45 -0400 Subject: [PATCH] Fix some issues around state in the build managers - Make sure to cleanup the job if the executor could not be started - Change the setup leeway to further ensure there isn't any crossover between the queue item timing out and the cleanup of the jobs - Make the lock used for marking jobs as internal error extremely long, but also based on the execution ID. This should ensure we don't get duplicates while allowing different executions to be handled properly. - Make sure to invoke the callback update for the queue before we run off to etcd; should reduce certain timeouts Hopefully Fixes #1836 --- buildman/manager/ephemeral.py | 32 ++++++++++++++++++++++---------- buildman/server.py | 5 ++--- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index c98cc7ef1..7bcdf3a80 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) ETCD_MAX_WATCH_TIMEOUT = 30 -ETCD_ATOMIC_OP_TIMEOUT = 120 +ETCD_ATOMIC_OP_TIMEOUT = 10000 RETRY_IMMEDIATELY_TIMEOUT = 0 NO_WORKER_AVAILABLE_TIMEOUT = 10 DEFAULT_EPHEMERAL_API_TIMEOUT = 20 @@ -186,12 +186,14 @@ class EphemeralBuilderManager(BaseManager): # If we have not yet received a heartbeat, then the node failed to boot in some way. We mark # the job as incomplete here. - if not job_metadata.get('had_heartbeat', True): + if not job_metadata.get('had_heartbeat', False): logger.warning('Build executor failed to successfully boot with execution id %s', execution_id) - # Take a lock to ensure that only one manager reports the build as incomplete. - got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid)) + # Take a lock to ensure that only one manager reports the build as incomplete for this + # execution. + got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid, + execution_id)) if got_lock: logger.warning('Marking job %s as incomplete', build_job.build_uuid) self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) @@ -419,7 +421,6 @@ class EphemeralBuilderManager(BaseManager): } lock_payload = json.dumps(payload) - logger.debug('Writing key for job %s with expiration in %s seconds', build_uuid, self._ephemeral_setup_timeout) try: @@ -427,17 +428,17 @@ class EphemeralBuilderManager(BaseManager): ttl=self._ephemeral_setup_timeout)) except (KeyError, etcd.EtcdKeyError): # The job was already taken by someone else, we are probably a retry - logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid) + logger.warning('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid) raise Return(False, self._ephemeral_api_timeout) except etcd.EtcdException: logger.exception('Exception when writing job %s to etcd', build_uuid) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) + # Got a lock, now lets boot the job via one of the registered executors. started_with_executor = None execution_id = None logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors]) - for executor in self._ordered_executors: # Check if we can use this executor based on its whitelist, by namespace. namespace = build_job.namespace @@ -474,10 +475,19 @@ class EphemeralBuilderManager(BaseManager): # Break out of the loop now that we've started a builder successfully. break + # If we didn't start the job, cleanup and return it to the queue. if started_with_executor is None: logger.error('Could not start ephemeral worker for build %s', build_uuid) + + # Delete the associated build job record. + try: + yield From(self._etcd_client.delete(job_key)) + except (KeyError, etcd.EtcdKeyError): + logger.warning('Could not delete job key %s', job_key) + raise Return(False, self._ephemeral_api_timeout) + # Job was started! logger.debug('Started execution with ID %s for job: %s with executor: %s', execution_id, build_uuid, started_with_executor.name) @@ -581,7 +591,10 @@ class EphemeralBuilderManager(BaseManager): @coroutine def job_heartbeat(self, build_job): - # Extend the deadline in etcd + # Extend the queue item. + self.job_heartbeat_callback(build_job) + + # Extend the deadline in etcd. job_key = self._etcd_job_key(build_job) try: @@ -607,7 +620,6 @@ class EphemeralBuilderManager(BaseManager): # to ensure that if the TTL is < 0, the key will expire immediately. etcd_ttl = max(ttl, 0) yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl)) - self.job_heartbeat_callback(build_job) @coroutine @@ -616,7 +628,7 @@ class EphemeralBuilderManager(BaseManager): was granted and false otherwise. """ pieces = [self._etcd_lock_prefix, path] - pieces.extend(*args) + pieces.extend(args) lock_key = os.path.join(*pieces) try: diff --git a/buildman/server.py b/buildman/server.py index f94ef3a69..fcc909714 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -17,15 +17,14 @@ from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database from app import app, metric_queue -from app import app logger = logging.getLogger(__name__) WORK_CHECK_TIMEOUT = 10 TIMEOUT_PERIOD_MINUTES = 20 JOB_TIMEOUT_SECONDS = 300 -SETUP_LEEWAY_SECONDS = 10 -MINIMUM_JOB_EXTENSION = timedelta(minutes=2) +SETUP_LEEWAY_SECONDS = 30 +MINIMUM_JOB_EXTENSION = timedelta(minutes=1) HEARTBEAT_PERIOD_SEC = 30