Add a transaction around the extend_processing call
This commit is contained in:
parent
838bfe23b1
commit
3872d29de9
5 changed files with 19 additions and 15 deletions
|
@ -120,8 +120,8 @@ class BuilderServer(object):
|
|||
self._session_factory.remove(component)
|
||||
|
||||
def _job_heartbeat(self, build_job):
|
||||
WorkQueue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
|
||||
minimum_extension=MINIMUM_JOB_EXTENSION)
|
||||
self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
|
||||
minimum_extension=MINIMUM_JOB_EXTENSION)
|
||||
|
||||
def _job_complete(self, build_job, job_status):
|
||||
if job_status == BuildJobResult.INCOMPLETE:
|
||||
|
|
|
@ -128,16 +128,12 @@ class WorkQueue(object):
|
|||
incomplete_item_obj.save()
|
||||
self._currently_processing = False
|
||||
|
||||
@staticmethod
|
||||
def extend_processing(queue_item_info, seconds_from_now, retry_count=None,
|
||||
minimum_extension=MINIMUM_EXTENSION):
|
||||
queue_item = QueueItem.get(QueueItem.id == queue_item_info.id)
|
||||
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
|
||||
def extend_processing(self, seconds_from_now, minimum_extension=MINIMUM_EXTENSION):
|
||||
with self._transaction_factory(db):
|
||||
queue_item = QueueItem.get(QueueItem.id == self.id)
|
||||
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
|
||||
|
||||
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
||||
if new_expiration - queue_item.processing_expires > minimum_extension:
|
||||
if retry_count is not None:
|
||||
queue_item.retries_remaining = retry_count
|
||||
|
||||
queue_item.processing_expires = new_expiration
|
||||
queue_item.save()
|
||||
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
||||
if new_expiration - queue_item.processing_expires > minimum_extension:
|
||||
queue_item.processing_expires = new_expiration
|
||||
queue_item.save()
|
|
@ -231,3 +231,6 @@ class TestEphemeral(unittest.TestCase):
|
|||
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
|
||||
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
|
||||
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -162,3 +162,8 @@ class TestQueue(QueueTestCase):
|
|||
one = self.queue.get()
|
||||
self.assertNotEqual(None, one)
|
||||
self.assertEqual(self.TEST_MESSAGE_1, one.body)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
|
|
|
@ -98,7 +98,7 @@ class Worker(object):
|
|||
def extend_processing(self, seconds_from_now):
|
||||
with self._current_item_lock:
|
||||
if self.current_queue_item is not None:
|
||||
WorkQueue.extend_processing(self.current_queue_item, seconds_from_now)
|
||||
self._queue.extend_processing(self.current_queue_item, seconds_from_now)
|
||||
|
||||
def run_watchdog(self):
|
||||
logger.debug('Running watchdog.')
|
||||
|
|
Reference in a new issue