diff --git a/data/queue.py b/data/queue.py index f05ee9c8f..b2e6fa976 100644 --- a/data/queue.py +++ b/data/queue.py @@ -274,3 +274,21 @@ class WorkQueue(object): except QueueItem.DoesNotExist: return + +def delete_expired(expiration_threshold, deletion_threshold, batch_size): + """ + Deletes all queue items that are older than the provided expiration threshold in batches of the + provided size. If there are less items than the deletion threshold, this method does nothing. + + Returns the number of items deleted. + """ + to_delete = list(QueueItem + .select() + .where(QueueItem.processing_expires <= expiration_threshold) + .limit(batch_size)) + + if len(to_delete) < deletion_threshold: + return 0 + + QueueItem.delete().where(QueueItem.id << to_delete).execute() + return len(to_delete) diff --git a/workers/queuecleanupworker.py b/workers/queuecleanupworker.py index 2e453f3d0..ad5b3784d 100644 --- a/workers/queuecleanupworker.py +++ b/workers/queuecleanupworker.py @@ -1,10 +1,13 @@ import logging -from app import app -from data.database import UseThenDisconnect, QueueItem -from workers.worker import Worker from datetime import timedelta, datetime +from app import app +from data.database import UseThenDisconnect +from data.queue import delete_expired +from workers.worker import Worker + + logger = logging.getLogger(__name__) DELETION_DATE_THRESHOLD = timedelta(days=7) @@ -12,6 +15,7 @@ DELETION_COUNT_THRESHOLD = 50 BATCH_SIZE = 500 QUEUE_CLEANUP_FREQUENCY = app.config.get('QUEUE_CLEANUP_FREQUENCY', 60*60*24) + class QueueCleanupWorker(Worker): def __init__(self): super(QueueCleanupWorker, self).__init__() @@ -22,16 +26,11 @@ class QueueCleanupWorker(Worker): with UseThenDisconnect(app.config): while True: # Find all queue items older than the threshold (typically a week) and delete them. - threshold_ago = datetime.now() - DELETION_DATE_THRESHOLD - to_delete = list(QueueItem.select() - .where(QueueItem.processing_expires <= threshold_ago) - .limit(BATCH_SIZE)) - - if len(to_delete) < DELETION_COUNT_THRESHOLD: + expiration_threshold = datetime.now() - DELETION_DATE_THRESHOLD + deleted_count = delete_expired(expiration_threshold, DELETION_COUNT_THRESHOLD, BATCH_SIZE) + if deleted_count == 0: return - QueueItem.delete().where(QueueItem.id << to_delete).execute() - if __name__ == "__main__": worker = QueueCleanupWorker()