diff --git a/buildman/manager/basemanager.py b/buildman/manager/basemanager.py
index 2c57ac095..83c192d36 100644
--- a/buildman/manager/basemanager.py
+++ b/buildman/manager/basemanager.py
@@ -32,8 +32,9 @@ class BaseManager(object):
 
   @coroutine
   def schedule(self, build_job):
-    """ Schedules a queue item to be built. Returns True if the item was properly scheduled
-        and False if all workers are busy.
+    """ Schedules a queue item to be built. Returns a 2-tuple with (True, None) if the item was
+        properly scheduled and (False, a retry timeout in seconds) if all workers are busy or an
+        error occurs.
     """
     raise NotImplementedError
 
diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py
index 0ce69e508..d74c50ea9 100644
--- a/buildman/manager/enterprise.py
+++ b/buildman/manager/enterprise.py
@@ -8,6 +8,7 @@ from buildman.manager.basemanager import BaseManager
 from trollius import From, Return, coroutine
 
 REGISTRATION_REALM = 'registration'
+RETRY_TIMEOUT = 5
 logger = logging.getLogger(__name__)
 
 class DynamicRegistrationComponent(BaseComponent):
@@ -61,13 +62,13 @@ class EnterpriseManager(BaseManager):
 
   def schedule(self, build_job):
     """ Schedules a build for an Enterprise Registry. """
     if self.shutting_down or not self.ready_components:
-      raise Return(False)
+      raise Return(False, RETRY_TIMEOUT)
 
     component = self.ready_components.pop()
 
     yield From(component.start_build(build_job))
-    raise Return(True)
+    raise Return(True, None)
 
   @coroutine
   def build_component_ready(self, build_component):
diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py
index ed6b5229a..5f3ebb68c 100644
--- a/buildman/manager/ephemeral.py
+++ b/buildman/manager/ephemeral.py
@@ -15,6 +15,7 @@ from buildman.manager.executor import PopenExecutor, EC2Executor
 from buildman.component.buildcomponent import BuildComponent
 from buildman.jobutil.buildjob import BuildJob
 from buildman.asyncutil import AsyncWrapper
+from buildman.server import BuildJobResult
 from util.morecollections import AttrDict
 
 
@@ -22,6 +23,8 @@ logger = logging.getLogger(__name__)
 
 
 ETCD_DISABLE_TIMEOUT = 0
+EC2_API_TIMEOUT = 20
+RETRY_IMMEDIATELY_TIMEOUT = 0
 
 
 class EtcdAction(object):
@@ -89,6 +92,7 @@ class EphemeralBuilderManager(BaseManager):
     logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '')
     self._watch_tasks[watch_task_key] = async(watch_future)
 
+  @coroutine
   def _handle_builder_expiration(self, etcd_result):
     if etcd_result is None:
       return
@@ -99,8 +103,25 @@
     job_metadata = json.loads(etcd_result._prev_node.value)
 
     if 'builder_id' in job_metadata:
-      logger.info('Terminating expired build node.')
-      async(self._executor.stop_builder(job_metadata['builder_id']))
+      builder_id = job_metadata['builder_id']
+
+      # Before we delete the build node, we take a lock to make sure that only one manager
+      # can terminate the node.
+      try:
+        lock_key = self._etcd_lock_key(builder_id)
+        yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time()))
+      except (KeyError, etcd.EtcdKeyError):
+        logger.debug('Somebody else is cleaning up the build node: %s', builder_id)
+        return
+
+      if not job_metadata.get('had_heartbeat', True):
+        logger.warning('Build node failed to successfully boot: %s', builder_id)
+        build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
+        self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
+
+      logger.info('Terminating expired build node: %s', builder_id)
+      yield From(self._executor.stop_builder(builder_id))
+
 
   def _handle_realm_change(self, etcd_result):
     if etcd_result is None:
@@ -180,6 +201,8 @@ class EphemeralBuilderManager(BaseManager):
     self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
     self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change)
 
+    self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
+
     # Load components for all realms currently known to the cluster
     async(self._register_existing_realms())
 
@@ -213,23 +236,24 @@ class EphemeralBuilderManager(BaseManager):
     except (KeyError, etcd.EtcdKeyError):
       workers_alive = 0
     except etcd.EtcdException:
-      logger.exception('Exception when reading job count from etcd')
-      raise Return(False)
+      logger.exception('Exception when reading job count from etcd for job: %s', build_uuid)
+      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
 
     logger.debug('Total jobs: %s', workers_alive)
 
     if workers_alive >= allowed_worker_count:
-      logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive,
-                  allowed_worker_count)
-      raise Return(False)
+      logger.info('Too many workers alive, unable to start new worker for build job: %s. %s >= %s',
+                  build_uuid, workers_alive, allowed_worker_count)
+      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
 
     job_key = self._etcd_job_key(build_job)
 
     # First try to take a lock for this job, meaning we will be responsible for its lifeline
     realm = str(uuid.uuid4())
     token = str(uuid.uuid4())
-    ttl = self.setup_time()
-    expiration = datetime.utcnow() + timedelta(seconds=ttl)
+    nonce = str(uuid.uuid4())
+    setup_time = self.setup_time()
+    expiration = datetime.utcnow() + timedelta(seconds=setup_time)
 
     machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
     max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
 
@@ -237,33 +261,40 @@ class EphemeralBuilderManager(BaseManager):
     payload = {
       'expiration': calendar.timegm(expiration.timetuple()),
       'max_expiration': calendar.timegm(max_expiration.timetuple()),
+      'nonce': nonce,
+      'had_heartbeat': False,
+      'job_queue_item': build_job.job_item,
     }
+    lock_payload = json.dumps(payload)
 
     try:
-      yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl))
+      yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
+                                         ttl=EC2_API_TIMEOUT))
     except (KeyError, etcd.EtcdKeyError):
       # The job was already taken by someone else, we are probably a retry
-      logger.exception('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
-      raise Return(False)
+      logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
+      raise Return(False, EC2_API_TIMEOUT)
     except etcd.EtcdException:
       logger.exception('Exception when writing job %s to etcd', build_uuid)
-      raise Return(False)
+      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
 
-    logger.debug('Starting builder with executor: %s', self._executor)
+    executor_type = self._executor.__class__.__name__
+    logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
 
     try:
       builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
     except:
       logger.exception('Exception when starting builder for job: %s', build_uuid)
-      raise Return(False)
+      raise Return(False, EC2_API_TIMEOUT)
 
     # Store the builder in etcd associated with the job id
     try:
       payload['builder_id'] = builder_id
-      yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl))
+      yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload,
+                                         ttl=setup_time))
     except etcd.EtcdException:
       logger.exception('Exception when writing job %s to etcd', build_uuid)
-      raise Return(False)
+      raise Return(False, EC2_API_TIMEOUT)
 
     # Store the realm spec which will allow any manager to accept this builder when it connects
     realm_spec = json.dumps({
@@ -275,15 +306,16 @@ class EphemeralBuilderManager(BaseManager):
 
     try:
       yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
-                                         ttl=ttl))
+                                         ttl=setup_time))
     except (KeyError, etcd.EtcdKeyError):
-      logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.')
-      raise Return(False)
+      logger.error('Realm %s already exists in etcd for job %s. ' +
+                   'UUID collision or something is very very wrong.', realm, build_uuid)
+      raise Return(False, setup_time)
     except etcd.EtcdException:
-      logger.exception('Exception when writing realm %s to etcd', realm)
-      raise Return(False)
+      logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
+      raise Return(False, setup_time)
 
-    raise Return(True)
+    raise Return(True, None)
 
   @coroutine
   def build_component_ready(self, build_component):
@@ -333,7 +365,9 @@ class EphemeralBuilderManager(BaseManager):
     payload = {
       'expiration': calendar.timegm(new_expiration.timetuple()),
       'builder_id': build_job_metadata['builder_id'],
+      'job_queue_item': build_job.job_item,
       'max_expiration': build_job_metadata['max_expiration'],
+      'had_heartbeat': True,
     }
 
     yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
@@ -345,6 +379,11 @@ class EphemeralBuilderManager(BaseManager):
     """
     return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid'])
 
+  def _etcd_lock_key(self, unique_lock_id):
+    """ Create a key which is used to create a temporary lock in etcd.
+    """
+    return os.path.join(self._etcd_lock_prefix, unique_lock_id)
+
   def _etcd_realm_key(self, realm):
     """ Create a key which is used to track an incoming connection on a realm.
     """
diff --git a/buildman/server.py b/buildman/server.py
index c49fe7b00..878b8d5fd 100644
--- a/buildman/server.py
+++ b/buildman/server.py
@@ -169,26 +169,28 @@ class BuilderServer(object):
         self._queue.incomplete(job_item, restore_retry=False)
         continue
 
-      logger.debug('Build job found. Checking for an avaliable worker.')
+      logger.debug('Checking for an available worker for build job %s',
+                   build_job.repo_build.uuid)
 
       try:
-        scheduled = yield From(self._lifecycle_manager.schedule(build_job))
+        schedule_success, retry_timeout = yield From(self._lifecycle_manager.schedule(build_job))
       except:
-        logger.exception('Exception when scheduling job')
+        logger.exception('Exception when scheduling job: %s', build_job.repo_build.uuid)
         self._current_status = BuildServerStatus.EXCEPTION
         return
 
-      if scheduled:
+      if schedule_success:
        logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid)
        status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
-        status_handler.set_phase('build-scheduled')
+        status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED)
 
        self._job_count = self._job_count + 1
        logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid,
                     self._job_count)
       else:
-        logger.debug('All workers are busy. Requeuing.')
-        self._queue.incomplete(job_item, restore_retry=True, retry_after=WORK_CHECK_TIMEOUT + 5)
+        logger.debug('All workers are busy for job %s. Requeuing after %s seconds.',
+                     build_job.repo_build.uuid, retry_timeout)
+        self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout)
 
   @trollius.coroutine
   def _queue_metrics_updater(self):
diff --git a/data/database.py b/data/database.py
index 6caae2d73..648b5058c 100644
--- a/data/database.py
+++ b/data/database.py
@@ -515,6 +515,7 @@ class BUILD_PHASE(object):
   """ Build phases enum """
   ERROR = 'error'
   INTERNAL_ERROR = 'internalerror'
+  BUILD_SCHEDULED = 'build-scheduled'
   UNPACKING = 'unpacking'
   PULLING = 'pulling'
   BUILDING = 'building'
diff --git a/test/test_buildman.py b/test/test_buildman.py
index 543c5fc5d..9e7a0f825 100644
--- a/test/test_buildman.py
+++ b/test/test_buildman.py
@@ -5,7 +5,7 @@
 import time
 import json
 
 from trollius import coroutine, get_event_loop, From, Future, sleep, Return
-from mock import Mock
+from mock import Mock, ANY
 from threading import Event
 from urllib3.exceptions import ReadTimeoutError
@@ -191,13 +191,36 @@ class TestEphemeral(unittest.TestCase):
     expired_result._prev_node = Mock(spec=etcd.EtcdResult)
     expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
 
-    self.manager._handle_builder_expiration(expired_result)
-
-    yield From(sleep(.01))
+    yield From(self.manager._handle_builder_expiration(expired_result))
 
     self.test_executor.stop_builder.assert_called_once_with('1234')
     self.assertEqual(self.test_executor.stop_builder.call_count, 1)
 
+  @async_test
+  def test_builder_never_starts(self):
+    test_component = yield From(self._setup_job_for_managers())
+
+    # Test that we are watching before anything else happens
+    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0)
+
+    # Send a signal to the callback that a worker has expired
+    expired_result = Mock(spec=etcd.EtcdResult)
+    expired_result.action = EtcdAction.EXPIRE
+    expired_result.key = self.mock_job_key
+    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
+    expired_result._prev_node.value = json.dumps({
+      'builder_id': '1234',
+      'had_heartbeat': False,
+      'job_queue_item': self.mock_job.job_item,
+    })
+
+    yield From(self.manager._handle_builder_expiration(expired_result))
+
+    self.test_executor.stop_builder.assert_called_once_with('1234')
+    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
+
+    self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
+
   @async_test
   def test_change_worker(self):
     # Send a signal to the callback that a worker key has been changed
@@ -233,3 +256,4 @@ class TestEphemeral(unittest.TestCase):
 
 if __name__ == '__main__':
   unittest.main()
+
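For reference, an illustrative sketch (not part of the diff above) of how a lifecycle manager can satisfy the new schedule() contract from basemanager.py: raise Return(True, None) once the job has been handed to a worker, or Return(False, retry timeout in seconds) when the server loop should requeue the item and retry later. The SingleSlotManager class, its _busy flag, its RETRY_TIMEOUT value, and the sleep(0) placeholder are hypothetical and exist only for this example.

# Hypothetical example manager -- not part of this change.
from trollius import From, Return, coroutine, sleep

from buildman.manager.basemanager import BaseManager

RETRY_TIMEOUT = 5  # assumed retry delay, mirroring the constant added to enterprise.py


class SingleSlotManager(BaseManager):
  """ Hypothetical manager that accepts at most one build job at a time. """
  _busy = False

  @coroutine
  def schedule(self, build_job):
    if self._busy:
      # Tell the server loop to requeue the item and retry after RETRY_TIMEOUT seconds.
      raise Return(False, RETRY_TIMEOUT)

    self._busy = True
    yield From(sleep(0))  # placeholder for handing the job off to a real worker
    # (True, None) marks the job as scheduled; no retry timeout is needed.
    raise Return(True, None)

Returning the timeout alongside the success flag lets each failure mode choose its own backoff, as the ephemeral manager does above: RETRY_IMMEDIATELY_TIMEOUT for transient etcd errors, EC2_API_TIMEOUT when the cloud API is involved, and setup_time for realm collisions.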