diff --git a/buildman/server.py b/buildman/server.py index ce185fff6..72da134da 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -120,8 +120,8 @@ class BuilderServer(object): self._session_factory.remove(component) def _job_heartbeat(self, build_job): - WorkQueue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, - minimum_extension=MINIMUM_JOB_EXTENSION) + self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, + minimum_extension=MINIMUM_JOB_EXTENSION) def _job_complete(self, build_job, job_status): if job_status == BuildJobResult.INCOMPLETE: diff --git a/data/queue.py b/data/queue.py index 865511519..e560e7cd9 100644 --- a/data/queue.py +++ b/data/queue.py @@ -128,16 +128,12 @@ class WorkQueue(object): incomplete_item_obj.save() self._currently_processing = False - @staticmethod - def extend_processing(queue_item_info, seconds_from_now, retry_count=None, - minimum_extension=MINIMUM_EXTENSION): - queue_item = QueueItem.get(QueueItem.id == queue_item_info.id) - new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now) + def extend_processing(self, seconds_from_now, minimum_extension=MINIMUM_EXTENSION): + with self._transaction_factory(db): + queue_item = QueueItem.get(QueueItem.id == self.id) + new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now) - # Only actually write the new expiration to the db if it moves the expiration some minimum - if new_expiration - queue_item.processing_expires > minimum_extension: - if retry_count is not None: - queue_item.retries_remaining = retry_count - - queue_item.processing_expires = new_expiration - queue_item.save() \ No newline at end of file + # Only actually write the new expiration to the db if it moves the expiration some minimum + if new_expiration - queue_item.processing_expires > minimum_extension: + queue_item.processing_expires = new_expiration + queue_item.save() \ No newline at end of file diff --git a/test/test_buildman.py b/test/test_buildman.py index 89658f65d..a34da811e 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -231,3 +231,6 @@ class TestEphemeral(unittest.TestCase): self.job_heartbeat_callback.assert_called_once_with(self.mock_job) self.assertEqual(self.etcd_client_mock.write.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_queue.py b/test/test_queue.py index 6c1660eb7..3d31978c8 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -162,3 +162,8 @@ class TestQueue(QueueTestCase): one = self.queue.get() self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) + + +if __name__ == '__main__': + unittest.main() + diff --git a/workers/worker.py b/workers/worker.py index 9f2851343..66ab38ba4 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -98,7 +98,7 @@ class Worker(object): def extend_processing(self, seconds_from_now): with self._current_item_lock: if self.current_queue_item is not None: - WorkQueue.extend_processing(self.current_queue_item, seconds_from_now) + self._queue.extend_processing(self.current_queue_item, seconds_from_now) def run_watchdog(self): logger.debug('Running watchdog.')