diff --git a/buildman/manager/basemanager.py b/buildman/manager/basemanager.py
index 2c57ac095..83c192d36 100644
--- a/buildman/manager/basemanager.py
+++ b/buildman/manager/basemanager.py
@@ -32,8 +32,9 @@ class BaseManager(object):
 
   @coroutine
   def schedule(self, build_job):
-    """ Schedules a queue item to be built. Returns True if the item was properly scheduled
-        and False if all workers are busy.
+    """ Schedules a queue item to be built. Returns a 2-tuple with (True, None) if the item was
+        properly scheduled and (False, a retry timeout in seconds) if all workers are busy or an
+        error occurs.
     """
     raise NotImplementedError
 
diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py
index 0ce69e508..d74c50ea9 100644
--- a/buildman/manager/enterprise.py
+++ b/buildman/manager/enterprise.py
@@ -8,6 +8,7 @@ from buildman.manager.basemanager import BaseManager
 from trollius import From, Return, coroutine
 
 REGISTRATION_REALM = 'registration'
+RETRY_TIMEOUT = 5
 
 logger = logging.getLogger(__name__)
 
 class DynamicRegistrationComponent(BaseComponent):
@@ -61,13 +62,13 @@ class EnterpriseManager(BaseManager):
   def schedule(self, build_job):
     """ Schedules a build for an Enterprise Registry. """
     if self.shutting_down or not self.ready_components:
-      raise Return(False)
+      raise Return(False, RETRY_TIMEOUT)
 
     component = self.ready_components.pop()
 
     yield From(component.start_build(build_job))
 
-    raise Return(True)
+    raise Return(True, None)
 
   @coroutine
   def build_component_ready(self, build_component):
diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py
index ed6b5229a..f5910c6b0 100644
--- a/buildman/manager/ephemeral.py
+++ b/buildman/manager/ephemeral.py
@@ -22,6 +22,8 @@ logger = logging.getLogger(__name__)
 
 
 ETCD_DISABLE_TIMEOUT = 0
+EC2_API_TIMEOUT = 20
+RETRY_IMMEDIATELY_TIMEOUT = 0
 
 
 class EtcdAction(object):
@@ -214,22 +216,23 @@ class EphemeralBuilderManager(BaseManager):
       workers_alive = 0
     except etcd.EtcdException:
       logger.exception('Exception when reading job count from etcd')
-      raise Return(False)
+      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
 
     logger.debug('Total jobs: %s', workers_alive)
 
     if workers_alive >= allowed_worker_count:
       logger.info('Too many workers alive, unable to start new worker. %s >= %s',
                   workers_alive, allowed_worker_count)
-      raise Return(False)
+      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
 
     job_key = self._etcd_job_key(build_job)
 
     # First try to take a lock for this job, meaning we will be responsible for its lifeline
     realm = str(uuid.uuid4())
     token = str(uuid.uuid4())
-    ttl = self.setup_time()
-    expiration = datetime.utcnow() + timedelta(seconds=ttl)
+    nonce = str(uuid.uuid4())
+    setup_time = self.setup_time()
+    expiration = datetime.utcnow() + timedelta(seconds=setup_time)
 
     machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
     max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
@@ -237,17 +240,20 @@ class EphemeralBuilderManager(BaseManager):
     payload = {
       'expiration': calendar.timegm(expiration.timetuple()),
       'max_expiration': calendar.timegm(max_expiration.timetuple()),
+      'nonce': nonce,
     }
 
+    lock_payload = json.dumps(payload)
     try:
-      yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl))
+      yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
+                                         ttl=EC2_API_TIMEOUT))
     except (KeyError, etcd.EtcdKeyError):
       # The job was already taken by someone else, we are probably a retry
       logger.exception('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
       raise Return(False)
     except etcd.EtcdException:
       logger.exception('Exception when writing job %s to etcd', build_uuid)
-      raise Return(False)
+      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
 
     logger.debug('Starting builder with executor: %s', self._executor)
 
@@ -255,15 +261,16 @@ class EphemeralBuilderManager(BaseManager):
       builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
     except:
       logger.exception('Exception when starting builder for job: %s', build_uuid)
-      raise Return(False)
+      raise Return(False, EC2_API_TIMEOUT)
 
     # Store the builder in etcd associated with the job id
    try:
       payload['builder_id'] = builder_id
-      yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl))
+      yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload,
+                                         ttl=setup_time))
     except etcd.EtcdException:
       logger.exception('Exception when writing job %s to etcd', build_uuid)
-      raise Return(False)
+      raise Return(False, EC2_API_TIMEOUT)
 
     # Store the realm spec which will allow any manager to accept this builder when it connects
     realm_spec = json.dumps({
@@ -275,15 +282,15 @@ class EphemeralBuilderManager(BaseManager):
 
     try:
       yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
-                                         ttl=ttl))
+                                         ttl=setup_time))
     except (KeyError, etcd.EtcdKeyError):
       logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.')
-      raise Return(False)
+      raise Return(False, setup_time)
     except etcd.EtcdException:
       logger.exception('Exception when writing realm %s to etcd', realm)
       raise Return(False)
 
-    raise Return(True)
+    raise Return(True, None)
 
   @coroutine
   def build_component_ready(self, build_component):
diff --git a/buildman/server.py b/buildman/server.py
index c49fe7b00..179be4e5d 100644
--- a/buildman/server.py
+++ b/buildman/server.py
@@ -172,13 +172,13 @@ class BuilderServer(object):
       logger.debug('Build job found. Checking for an avaliable worker.')
 
       try:
-        scheduled = yield From(self._lifecycle_manager.schedule(build_job))
+        schedule_success, retry_timeout = yield From(self._lifecycle_manager.schedule(build_job))
       except:
         logger.exception('Exception when scheduling job')
         self._current_status = BuildServerStatus.EXCEPTION
         return
 
-      if scheduled:
+      if schedule_success:
         logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid)
         status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
         status_handler.set_phase('build-scheduled')
@@ -188,7 +188,7 @@ class BuilderServer(object):
                      self._job_count)
       else:
         logger.debug('All workers are busy. Requeuing.')
-        self._queue.incomplete(job_item, restore_retry=True, retry_after=WORK_CHECK_TIMEOUT + 5)
+        self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout)
 
   @trollius.coroutine
   def _queue_metrics_updater(self):
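
Below is a minimal, self-contained sketch (not part of the patch) of the scheduling contract this diff introduces: schedule() now returns a (success, retry timeout) 2-tuple, and the caller in buildman/server.py requeues with the manager-supplied timeout instead of a fixed delay. FakeManager and FakeQueue are hypothetical stand-ins for the trollius-based lifecycle managers and the work queue; only the tuple-unpacking pattern mirrors the real code.

RETRY_TIMEOUT = 5  # seconds; mirrors the constant added to enterprise.py


class FakeManager(object):
  """ Hypothetical stand-in for a BaseManager subclass after this change. """

  def __init__(self, ready_components):
    self.ready_components = list(ready_components)

  def schedule(self, build_job):
    # Busy: report failure plus how long the caller should wait before retrying.
    if not self.ready_components:
      return (False, RETRY_TIMEOUT)

    # Scheduled: no retry timeout is needed.
    self.ready_components.pop()
    return (True, None)


class FakeQueue(object):
  """ Hypothetical stand-in for the work queue used by buildman/server.py. """

  def incomplete(self, job_item, restore_retry=False, retry_after=None):
    print('Requeued %s; retry in %s seconds' % (job_item, retry_after))


def check_work(manager, queue, job_item, build_job):
  # Mirrors the server.py change: unpack the 2-tuple and requeue using the
  # manager-provided retry timeout rather than a fixed WORK_CHECK_TIMEOUT + 5.
  schedule_success, retry_timeout = manager.schedule(build_job)
  if not schedule_success:
    queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout)
  return schedule_success


if __name__ == '__main__':
  manager = FakeManager(ready_components=['worker-1'])
  queue = FakeQueue()
  print(check_work(manager, queue, 'job-1', object()))  # True: a component was free
  print(check_work(manager, queue, 'job-2', object()))  # False: requeued after 5 seconds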