From d31e25d5cdec2570ccc2be00d891ee3d2e13470c Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Wed, 10 Jun 2015 14:17:32 -0400 Subject: [PATCH 1/5] Allow the individual build manager types to specify how long the queue should wait before retring a job that fails to schedule. --- buildman/manager/basemanager.py | 5 +++-- buildman/manager/enterprise.py | 5 +++-- buildman/manager/ephemeral.py | 31 +++++++++++++++++++------------ buildman/server.py | 6 +++--- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/buildman/manager/basemanager.py b/buildman/manager/basemanager.py index 2c57ac095..83c192d36 100644 --- a/buildman/manager/basemanager.py +++ b/buildman/manager/basemanager.py @@ -32,8 +32,9 @@ class BaseManager(object): @coroutine def schedule(self, build_job): - """ Schedules a queue item to be built. Returns True if the item was properly scheduled - and False if all workers are busy. + """ Schedules a queue item to be built. Returns a 2-tuple with (True, None) if the item was + properly scheduled and (False, a retry timeout in seconds) if all workers are busy or an + error occurs. """ raise NotImplementedError diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py index 0ce69e508..d74c50ea9 100644 --- a/buildman/manager/enterprise.py +++ b/buildman/manager/enterprise.py @@ -8,6 +8,7 @@ from buildman.manager.basemanager import BaseManager from trollius import From, Return, coroutine REGISTRATION_REALM = 'registration' +RETRY_TIMEOUT = 5 logger = logging.getLogger(__name__) class DynamicRegistrationComponent(BaseComponent): @@ -61,13 +62,13 @@ class EnterpriseManager(BaseManager): def schedule(self, build_job): """ Schedules a build for an Enterprise Registry. """ if self.shutting_down or not self.ready_components: - raise Return(False) + raise Return(False, RETRY_TIMEOUT) component = self.ready_components.pop() yield From(component.start_build(build_job)) - raise Return(True) + raise Return(True, None) @coroutine def build_component_ready(self, build_component): diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index ed6b5229a..f5910c6b0 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -22,6 +22,8 @@ logger = logging.getLogger(__name__) ETCD_DISABLE_TIMEOUT = 0 +EC2_API_TIMEOUT = 20 +RETRY_IMMEDIATELY_TIMEOUT = 0 class EtcdAction(object): @@ -214,22 +216,23 @@ class EphemeralBuilderManager(BaseManager): workers_alive = 0 except etcd.EtcdException: logger.exception('Exception when reading job count from etcd') - raise Return(False) + raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) logger.debug('Total jobs: %s', workers_alive) if workers_alive >= allowed_worker_count: logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive, allowed_worker_count) - raise Return(False) + raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) job_key = self._etcd_job_key(build_job) # First try to take a lock for this job, meaning we will be responsible for its lifeline realm = str(uuid.uuid4()) token = str(uuid.uuid4()) - ttl = self.setup_time() - expiration = datetime.utcnow() + timedelta(seconds=ttl) + nonce = str(uuid.uuid4()) + setup_time = self.setup_time() + expiration = datetime.utcnow() + timedelta(seconds=setup_time) machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200) max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration) @@ -237,17 +240,20 @@ class EphemeralBuilderManager(BaseManager): payload = { 'expiration': calendar.timegm(expiration.timetuple()), 'max_expiration': calendar.timegm(max_expiration.timetuple()), + 'nonce': nonce, } + lock_payload = json.dumps(payload) try: - yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl)) + yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False, + ttl=EC2_API_TIMEOUT)) except (KeyError, etcd.EtcdKeyError): # The job was already taken by someone else, we are probably a retry logger.exception('Job already exists in etcd, are timeouts misconfigured or is the queue broken?') raise Return(False) except etcd.EtcdException: logger.exception('Exception when writing job %s to etcd', build_uuid) - raise Return(False) + raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) logger.debug('Starting builder with executor: %s', self._executor) @@ -255,15 +261,16 @@ class EphemeralBuilderManager(BaseManager): builder_id = yield From(self._executor.start_builder(realm, token, build_uuid)) except: logger.exception('Exception when starting builder for job: %s', build_uuid) - raise Return(False) + raise Return(False, EC2_API_TIMEOUT) # Store the builder in etcd associated with the job id try: payload['builder_id'] = builder_id - yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl)) + yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload, + ttl=setup_time)) except etcd.EtcdException: logger.exception('Exception when writing job %s to etcd', build_uuid) - raise Return(False) + raise Return(False, EC2_API_TIMEOUT) # Store the realm spec which will allow any manager to accept this builder when it connects realm_spec = json.dumps({ @@ -275,15 +282,15 @@ class EphemeralBuilderManager(BaseManager): try: yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False, - ttl=ttl)) + ttl=setup_time)) except (KeyError, etcd.EtcdKeyError): logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.') - raise Return(False) + raise Return(False, setup_time) except etcd.EtcdException: logger.exception('Exception when writing realm %s to etcd', realm) raise Return(False) - raise Return(True) + raise Return(True, None) @coroutine def build_component_ready(self, build_component): diff --git a/buildman/server.py b/buildman/server.py index c49fe7b00..179be4e5d 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -172,13 +172,13 @@ class BuilderServer(object): logger.debug('Build job found. Checking for an avaliable worker.') try: - scheduled = yield From(self._lifecycle_manager.schedule(build_job)) + schedule_success, retry_timeout = yield From(self._lifecycle_manager.schedule(build_job)) except: logger.exception('Exception when scheduling job') self._current_status = BuildServerStatus.EXCEPTION return - if scheduled: + if schedule_success: logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid) status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) status_handler.set_phase('build-scheduled') @@ -188,7 +188,7 @@ class BuilderServer(object): self._job_count) else: logger.debug('All workers are busy. Requeuing.') - self._queue.incomplete(job_item, restore_retry=True, retry_after=WORK_CHECK_TIMEOUT + 5) + self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout) @trollius.coroutine def _queue_metrics_updater(self): From 884fedd22931e30eea0ddb00074d49817b46b78e Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Wed, 10 Jun 2015 14:18:12 -0400 Subject: [PATCH 2/5] Improve the log messages in the buildman. --- buildman/manager/ephemeral.py | 20 +++++++++++--------- buildman/server.py | 8 +++++--- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index f5910c6b0..902da86f0 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -215,14 +215,14 @@ class EphemeralBuilderManager(BaseManager): except (KeyError, etcd.EtcdKeyError): workers_alive = 0 except etcd.EtcdException: - logger.exception('Exception when reading job count from etcd') + logger.exception('Exception when reading job count from etcd for job: %s', build_uuid) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) logger.debug('Total jobs: %s', workers_alive) if workers_alive >= allowed_worker_count: - logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive, - allowed_worker_count) + logger.info('Too many workers alive, unable to start new worker for build job: %s. %s >= %s', + build_uuid, workers_alive, allowed_worker_count) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) job_key = self._etcd_job_key(build_job) @@ -249,13 +249,14 @@ class EphemeralBuilderManager(BaseManager): ttl=EC2_API_TIMEOUT)) except (KeyError, etcd.EtcdKeyError): # The job was already taken by someone else, we are probably a retry - logger.exception('Job already exists in etcd, are timeouts misconfigured or is the queue broken?') - raise Return(False) + logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid) + raise Return(False, EC2_API_TIMEOUT) except etcd.EtcdException: logger.exception('Exception when writing job %s to etcd', build_uuid) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) - logger.debug('Starting builder with executor: %s', self._executor) + executor_type = self._executor.__class__.__name__ + logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type) try: builder_id = yield From(self._executor.start_builder(realm, token, build_uuid)) @@ -284,11 +285,12 @@ class EphemeralBuilderManager(BaseManager): yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False, ttl=setup_time)) except (KeyError, etcd.EtcdKeyError): - logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.') + logger.error('Realm %s already exists in etcd for job %s ' + + 'UUID collision or something is very very wrong.', realm, build_uuid) raise Return(False, setup_time) except etcd.EtcdException: - logger.exception('Exception when writing realm %s to etcd', realm) - raise Return(False) + logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid) + raise Return(False, setup_time) raise Return(True, None) diff --git a/buildman/server.py b/buildman/server.py index 179be4e5d..fa8f4ea90 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -169,12 +169,13 @@ class BuilderServer(object): self._queue.incomplete(job_item, restore_retry=False) continue - logger.debug('Build job found. Checking for an avaliable worker.') + logger.debug('Checking for an avaliable worker for build job %s', + build_job.repo_build.uuid) try: schedule_success, retry_timeout = yield From(self._lifecycle_manager.schedule(build_job)) except: - logger.exception('Exception when scheduling job') + logger.exception('Exception when scheduling job: %s', build_job.repo_build.uuid) self._current_status = BuildServerStatus.EXCEPTION return @@ -187,7 +188,8 @@ class BuilderServer(object): logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid, self._job_count) else: - logger.debug('All workers are busy. Requeuing.') + logger.debug('All workers are busy for job %s Requeuing after %s seconds.', + build_job.repo_build.uuid, retry_timeout) self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout) @trollius.coroutine From 79f1181a635384cf16c50df2f460b2f66cae66c4 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Thu, 14 May 2015 15:37:16 -0400 Subject: [PATCH 3/5] Switch build-scheduled to an official build phase. --- buildman/server.py | 2 +- data/database.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/buildman/server.py b/buildman/server.py index fa8f4ea90..878b8d5fd 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -182,7 +182,7 @@ class BuilderServer(object): if schedule_success: logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid) status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) - status_handler.set_phase('build-scheduled') + status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED) self._job_count = self._job_count + 1 logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid, diff --git a/data/database.py b/data/database.py index 6caae2d73..648b5058c 100644 --- a/data/database.py +++ b/data/database.py @@ -515,6 +515,7 @@ class BUILD_PHASE(object): """ Build phases enum """ ERROR = 'error' INTERNAL_ERROR = 'internalerror' + BUILD_SCHEDULED = 'build-scheduled' UNPACKING = 'unpacking' PULLING = 'pulling' BUILDING = 'building' From f767fc4d03d674d02fa92ca5f36ea6d40f3b0871 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Wed, 20 May 2015 11:32:37 -0400 Subject: [PATCH 4/5] Track whether builders ever came online in etcd. Mark builds which never successfully heartbeated as incomplete. --- buildman/manager/ephemeral.py | 32 ++++++++++++++++++++++++++++++-- test/test_buildman.py | 32 ++++++++++++++++++++++++++++---- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 902da86f0..9a17b38fe 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -15,6 +15,7 @@ from buildman.manager.executor import PopenExecutor, EC2Executor from buildman.component.buildcomponent import BuildComponent from buildman.jobutil.buildjob import BuildJob from buildman.asyncutil import AsyncWrapper +from buildman.server import BuildJobResult from util.morecollections import AttrDict @@ -91,6 +92,7 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '') self._watch_tasks[watch_task_key] = async(watch_future) + @coroutine def _handle_builder_expiration(self, etcd_result): if etcd_result is None: return @@ -101,8 +103,23 @@ class EphemeralBuilderManager(BaseManager): job_metadata = json.loads(etcd_result._prev_node.value) if 'builder_id' in job_metadata: - logger.info('Terminating expired build node.') - async(self._executor.stop_builder(job_metadata['builder_id'])) + builder_id = job_metadata['builder_id'] + + try: + lock_key = self._etcd_lock_key(builder_id) + yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time())) + except (KeyError, etcd.EtcdKeyError): + logger.debug('Somebody else is cleaning up the build node: %s', builder_id) + return + + if not job_metadata.get('had_heartbeat', True): + logger.warning('Build node failed to successfully boot: %s', builder_id) + build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) + self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) + + logger.info('Terminating expired build node: %s', builder_id) + yield From(self._executor.stop_builder(builder_id)) + def _handle_realm_change(self, etcd_result): if etcd_result is None: @@ -182,6 +199,8 @@ class EphemeralBuilderManager(BaseManager): self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change) + self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/') + # Load components for all realms currently known to the cluster async(self._register_existing_realms()) @@ -241,6 +260,8 @@ class EphemeralBuilderManager(BaseManager): 'expiration': calendar.timegm(expiration.timetuple()), 'max_expiration': calendar.timegm(max_expiration.timetuple()), 'nonce': nonce, + 'had_heartbeat': False, + 'job_queue_item': build_job.job_item, } lock_payload = json.dumps(payload) @@ -342,7 +363,9 @@ class EphemeralBuilderManager(BaseManager): payload = { 'expiration': calendar.timegm(new_expiration.timetuple()), 'builder_id': build_job_metadata['builder_id'], + 'job_queue_item': build_job.job_item, 'max_expiration': build_job_metadata['max_expiration'], + 'had_heartbeat': True, } yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) @@ -354,6 +377,11 @@ class EphemeralBuilderManager(BaseManager): """ return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid']) + def _etcd_lock_key(self, unique_lock_id): + """ Create a key which is used to create a temporary lock in etcd. + """ + return os.path.join(self._etcd_lock_prefix, unique_lock_id) + def _etcd_realm_key(self, realm): """ Create a key which is used to track an incoming connection on a realm. """ diff --git a/test/test_buildman.py b/test/test_buildman.py index 543c5fc5d..9e7a0f825 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -5,7 +5,7 @@ import time import json from trollius import coroutine, get_event_loop, From, Future, sleep, Return -from mock import Mock +from mock import Mock, ANY from threading import Event from urllib3.exceptions import ReadTimeoutError @@ -191,13 +191,36 @@ class TestEphemeral(unittest.TestCase): expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) - self.manager._handle_builder_expiration(expired_result) - - yield From(sleep(.01)) + yield From(self.manager._handle_builder_expiration(expired_result)) self.test_executor.stop_builder.assert_called_once_with('1234') self.assertEqual(self.test_executor.stop_builder.call_count, 1) + @async_test + def test_builder_never_starts(self): + test_component = yield From(self._setup_job_for_managers()) + + # Test that we are watching before anything else happens + self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0) + + # Send a signal to the callback that a worker has expired + expired_result = Mock(spec=etcd.EtcdResult) + expired_result.action = EtcdAction.EXPIRE + expired_result.key = self.mock_job_key + expired_result._prev_node = Mock(spec=etcd.EtcdResult) + expired_result._prev_node.value = json.dumps({ + 'builder_id': '1234', + 'had_heartbeat': False, + 'job_queue_item': self.mock_job.job_item, + }) + + yield From(self.manager._handle_builder_expiration(expired_result)) + + self.test_executor.stop_builder.assert_called_once_with('1234') + self.assertEqual(self.test_executor.stop_builder.call_count, 1) + + self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE) + @async_test def test_change_worker(self): # Send a signal to the callback that a worker key has been changed @@ -233,3 +256,4 @@ class TestEphemeral(unittest.TestCase): if __name__ == '__main__': unittest.main() + From c435f5c127497274aeadd97f2bbc824e85863b5d Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Wed, 10 Jun 2015 15:44:26 -0400 Subject: [PATCH 5/5] Add a comment about why we are taking a lock when terminating a builder machine. --- buildman/manager/ephemeral.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 9a17b38fe..5f3ebb68c 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -105,6 +105,8 @@ class EphemeralBuilderManager(BaseManager): if 'builder_id' in job_metadata: builder_id = job_metadata['builder_id'] + # Before we delete the build node, we take a lock to make sure that only one manager + # can terminate the node. try: lock_key = self._etcd_lock_key(builder_id) yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time()))