Add a bulk insert for queue and notifications.

Use it for Clair spawned notifications.
This commit is contained in:
Jake Moshenko 2016-12-05 23:58:20 -05:00
parent eb363876cd
commit 21e3001446
4 changed files with 110 additions and 36 deletions

View file

@ -1,10 +1,12 @@
from datetime import datetime, timedelta
from contextlib import contextmanager
from data.database import QueueItem, db, db_for_update, db_random_func
from util.morecollections import AttrDict
MINIMUM_EXTENSION = timedelta(seconds=20)
DEFAULT_BATCH_SIZE = 1000
class NoopWith:
@ -160,18 +162,45 @@ class WorkQueue(object):
except QueueItem.DoesNotExist:
return False
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
"""
Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after. Returns the ID of the queue item added.
"""
item = QueueItem.create(
def _queue_body(self, canonical_name_list, message, available_after, retries_remaining):
return dict(
queue_name=self._canonical_name([self._queue_name] + canonical_name_list),
body=message,
retries_remaining=retries_remaining,
available_after=datetime.utcnow() + timedelta(seconds=available_after or 0),
)
@contextmanager
def batch_insert(self, batch_size=DEFAULT_BATCH_SIZE):
items_to_insert = []
def batch_put(canonical_name_list, message, available_after=0, retries_remaining=5):
"""
Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after. Returns the ID of the queue item added.
"""
items_to_insert.append(self._queue_body(canonical_name_list, message, available_after,
retries_remaining))
yield batch_put
# Chunk the inserted items into batch_size chunks and insert_many
remaining = list(items_to_insert)
while remaining:
QueueItem.insert_many(remaining[0:batch_size]).execute()
remaining = remaining[batch_size:]
if self._metric_queue:
self._metric_queue.put_deprecated('Added', len(items_to_insert),
dimensions={'queue': self._queue_name})
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
"""
Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after. Returns the ID of the queue item added.
"""
item = QueueItem.create(**self._queue_body(canonical_name_list, message, available_after,
retries_remaining))
if self._metric_queue:
self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})