Fix the metrics so they are usable for scaling the workers down and up. Switch all datetimes which touch the database from now to utcnow. Fix the worker Dockerfile.

This commit is contained in:
Jake Moshenko 2014-05-23 14:16:26 -04:00
parent f4c488f9b6
commit 0b6552d6cc
10 changed files with 137 additions and 75 deletions

View file

@ -12,6 +12,7 @@ class WorkQueue(object):
self._queue_name = queue_name
self._reporter = reporter
self._transaction_factory = transaction_factory
self._currently_processing = False
if canonical_name_match_list is None:
self._canonical_name_match_list = []
@ -39,25 +40,21 @@ class WorkQueue(object):
def _name_match_query(self):
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
def _report_queue_metrics(self):
if self._reporter is None:
return
now = datetime.now()
name_match_query = self._name_match_query()
running_query = self._running_jobs(now, name_match_query)
running_count =running_query.distinct().count()
avialable_query = self._available_jobs(now, name_match_query, running_query)
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
self._reporter(running_count, running_count + available_count)
def update_metrics(self):
with self._transaction_factory(db):
self._report_queue_metrics()
if self._reporter is None:
return
now = datetime.utcnow()
name_match_query = self._name_match_query()
running_query = self._running_jobs(now, name_match_query)
running_count =running_query.distinct().count()
avialable_query = self._available_jobs(now, name_match_query, running_query)
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
self._reporter(self._currently_processing, running_count, running_count + available_count)
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
"""
@ -72,19 +69,18 @@ class WorkQueue(object):
}
if available_after:
available_date = datetime.now() + timedelta(seconds=available_after)
available_date = datetime.utcnow() + timedelta(seconds=available_after)
params['available_after'] = available_date
with self._transaction_factory(db):
QueueItem.create(**params)
self._report_queue_metrics()
def get(self, processing_time=300):
"""
Get an available item and mark it as unavailable for the default of five
minutes.
"""
now = datetime.now()
now = datetime.utcnow()
name_match_query = self._name_match_query()
@ -99,21 +95,22 @@ class WorkQueue(object):
item.processing_expires = now + timedelta(seconds=processing_time)
item.retries_remaining -= 1
item.save()
except QueueItem.DoesNotExist:
pass
self._report_queue_metrics()
self._currently_processing = True
except QueueItem.DoesNotExist:
self._currently_processing = False
pass
return item
def complete(self, completed_item):
with self._transaction_factory(db):
completed_item.delete_instance()
self._report_queue_metrics()
self._currently_processing = False
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
with self._transaction_factory(db):
retry_date = datetime.now() + timedelta(seconds=retry_after)
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
incomplete_item.available_after = retry_date
incomplete_item.available = True
@ -121,11 +118,11 @@ class WorkQueue(object):
incomplete_item.retries_remaining += 1
incomplete_item.save()
self._report_queue_metrics()
self._currently_processing = False
@staticmethod
def extend_processing(queue_item, seconds_from_now):
new_expiration = datetime.now() + timedelta(seconds=seconds_from_now)
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
# Only actually write the new expiration to the db if it moves the expiration some minimum
if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION: