workers.queuecleanup: remove direct peewee usage

This commit is contained in:
Jimmy Zelinskie 2016-10-20 13:46:00 -04:00
parent efbbeeb07f
commit 20ef43d5fb
2 changed files with 28 additions and 11 deletions

View file

@ -274,3 +274,21 @@ class WorkQueue(object):
except QueueItem.DoesNotExist:
return
def delete_expired(expiration_threshold, deletion_threshold, batch_size):
"""
Deletes all queue items that are older than the provided expiration threshold in batches of the
provided size. If there are less items than the deletion threshold, this method does nothing.
Returns the number of items deleted.
"""
to_delete = list(QueueItem
.select()
.where(QueueItem.processing_expires <= expiration_threshold)
.limit(batch_size))
if len(to_delete) < deletion_threshold:
return 0
QueueItem.delete().where(QueueItem.id << to_delete).execute()
return len(to_delete)

View file

@ -1,10 +1,13 @@
import logging
from app import app
from data.database import UseThenDisconnect, QueueItem
from workers.worker import Worker
from datetime import timedelta, datetime
from app import app
from data.database import UseThenDisconnect
from data.queue import delete_expired
from workers.worker import Worker
logger = logging.getLogger(__name__)
DELETION_DATE_THRESHOLD = timedelta(days=7)
@ -12,6 +15,7 @@ DELETION_COUNT_THRESHOLD = 50
BATCH_SIZE = 500
QUEUE_CLEANUP_FREQUENCY = app.config.get('QUEUE_CLEANUP_FREQUENCY', 60*60*24)
class QueueCleanupWorker(Worker):
def __init__(self):
super(QueueCleanupWorker, self).__init__()
@ -22,16 +26,11 @@ class QueueCleanupWorker(Worker):
with UseThenDisconnect(app.config):
while True:
# Find all queue items older than the threshold (typically a week) and delete them.
threshold_ago = datetime.now() - DELETION_DATE_THRESHOLD
to_delete = list(QueueItem.select()
.where(QueueItem.processing_expires <= threshold_ago)
.limit(BATCH_SIZE))
if len(to_delete) < DELETION_COUNT_THRESHOLD:
expiration_threshold = datetime.now() - DELETION_DATE_THRESHOLD
deleted_count = delete_expired(expiration_threshold, DELETION_COUNT_THRESHOLD, BATCH_SIZE)
if deleted_count == 0:
return
QueueItem.delete().where(QueueItem.id << to_delete).execute()
if __name__ == "__main__":
worker = QueueCleanupWorker()