Fix ephemeral build manager to ask for watches in index order with no gaps
This commit is contained in:
		
							parent
							
								
									88949f773c
								
							
						
					
					
						commit
						b4c39e8ec0
					
				
					 2 changed files with 31 additions and 18 deletions
				
			
		|  | @ -68,28 +68,41 @@ class EphemeralBuilderManager(BaseManager): | ||||||
| 
 | 
 | ||||||
|     super(EphemeralBuilderManager, self).__init__(*args, **kwargs) |     super(EphemeralBuilderManager, self).__init__(*args, **kwargs) | ||||||
| 
 | 
 | ||||||
|   def _watch_etcd(self, etcd_key, change_callback, recursive=True): |   def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True): | ||||||
|     watch_task_key = (etcd_key, recursive) |     watch_task_key = (etcd_key, recursive) | ||||||
|     def callback_wrapper(changed_key_future): |     def callback_wrapper(changed_key_future): | ||||||
|       if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): |       new_index = start_index | ||||||
|         self._watch_etcd(etcd_key, change_callback) |       etcd_result = None | ||||||
| 
 |  | ||||||
|       if changed_key_future.cancelled(): |  | ||||||
|         # Due to lack of interest, tomorrow has been cancelled |  | ||||||
|         return |  | ||||||
| 
 | 
 | ||||||
|  |       if not changed_key_future.cancelled(): | ||||||
|         try: |         try: | ||||||
|           etcd_result = changed_key_future.result() |           etcd_result = changed_key_future.result() | ||||||
|       except (ReadTimeoutError, ProtocolError, etcd.EtcdException): |           existing_index = getattr(etcd_result, 'etcd_index', None) | ||||||
|         return |           new_index = etcd_result.modifiedIndex + 1 | ||||||
| 
 | 
 | ||||||
|  |           logger.debug('Got watch of key: %s%s at #%s with result: %s', etcd_key, | ||||||
|  |                        '*' if recursive else '', existing_index, etcd_result) | ||||||
|  | 
 | ||||||
|  |         except ReadTimeoutError: | ||||||
|  |           logger.debug('Read-timeout on etcd watch: %s', etcd_key) | ||||||
|  | 
 | ||||||
|  |         except (ProtocolError, etcd.EtcdException): | ||||||
|  |           logger.exception('Exception on etcd watch: %s', etcd_key) | ||||||
|  | 
 | ||||||
|  |       if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): | ||||||
|  |         self._watch_etcd(etcd_key, change_callback, start_index=new_index) | ||||||
|  | 
 | ||||||
|  |       if etcd_result: | ||||||
|         change_callback(etcd_result) |         change_callback(etcd_result) | ||||||
| 
 | 
 | ||||||
|     if not self._shutting_down: |     if not self._shutting_down: | ||||||
|       watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, |       logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key, | ||||||
|  |                    '*' if recursive else '', start_index) | ||||||
|  | 
 | ||||||
|  |       watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index, | ||||||
|                                              timeout=ETCD_DISABLE_TIMEOUT) |                                              timeout=ETCD_DISABLE_TIMEOUT) | ||||||
|       watch_future.add_done_callback(callback_wrapper) |       watch_future.add_done_callback(callback_wrapper) | ||||||
|       logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '') | 
 | ||||||
|       self._watch_tasks[watch_task_key] = async(watch_future) |       self._watch_tasks[watch_task_key] = async(watch_future) | ||||||
| 
 | 
 | ||||||
|   @coroutine |   @coroutine | ||||||
|  | @ -329,7 +342,7 @@ class EphemeralBuilderManager(BaseManager): | ||||||
|                    job.job_details['build_uuid'], build_component.builder_realm) |                    job.job_details['build_uuid'], build_component.builder_realm) | ||||||
|       yield From(build_component.start_build(job)) |       yield From(build_component.start_build(job)) | ||||||
|     except (KeyError, etcd.EtcdKeyError): |     except (KeyError, etcd.EtcdKeyError): | ||||||
|       logger.exception('Builder is asking for more work, but work already completed') |       logger.warning('Builder is asking for more work, but work already completed') | ||||||
| 
 | 
 | ||||||
|   def build_component_disposed(self, build_component, timed_out): |   def build_component_disposed(self, build_component, timed_out): | ||||||
|     logger.debug('Calling build_component_disposed.') |     logger.debug('Calling build_component_disposed.') | ||||||
|  |  | ||||||
|  | @ -104,7 +104,7 @@ class TestEphemeral(unittest.TestCase): | ||||||
|   @coroutine |   @coroutine | ||||||
|   def _setup_job_for_managers(self): |   def _setup_job_for_managers(self): | ||||||
|     # Test that we are watching the realm location before anything else happens |     # Test that we are watching the realm location before anything else happens | ||||||
|     self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0) |     self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0, index=None) | ||||||
| 
 | 
 | ||||||
|     self.etcd_client_mock.read = Mock(side_effect=KeyError) |     self.etcd_client_mock.read = Mock(side_effect=KeyError) | ||||||
|     test_component = Mock(spec=BuildComponent) |     test_component = Mock(spec=BuildComponent) | ||||||
|  | @ -182,7 +182,7 @@ class TestEphemeral(unittest.TestCase): | ||||||
|   @async_test |   @async_test | ||||||
|   def test_expiring_worker(self): |   def test_expiring_worker(self): | ||||||
|     # Test that we are watching before anything else happens |     # Test that we are watching before anything else happens | ||||||
|     self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0) |     self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None) | ||||||
| 
 | 
 | ||||||
|     # Send a signal to the callback that a worker has expired |     # Send a signal to the callback that a worker has expired | ||||||
|     expired_result = Mock(spec=etcd.EtcdResult) |     expired_result = Mock(spec=etcd.EtcdResult) | ||||||
|  | @ -201,7 +201,7 @@ class TestEphemeral(unittest.TestCase): | ||||||
|     test_component = yield From(self._setup_job_for_managers()) |     test_component = yield From(self._setup_job_for_managers()) | ||||||
| 
 | 
 | ||||||
|     # Test that we are watching before anything else happens |     # Test that we are watching before anything else happens | ||||||
|     self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0) |     self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None) | ||||||
| 
 | 
 | ||||||
|     # Send a signal to the callback that a worker has expired |     # Send a signal to the callback that a worker has expired | ||||||
|     expired_result = Mock(spec=etcd.EtcdResult) |     expired_result = Mock(spec=etcd.EtcdResult) | ||||||
|  |  | ||||||
		Reference in a new issue