"""Worker that periodically garbage-collects old rows from the queueitem table."""

import logging
from datetime import timedelta, datetime

from app import app
from data.database import UseThenDisconnect, QueueItem
from workers.worker import Worker


logger = logging.getLogger(__name__)

# Rows whose `processing_expires` is at or before (now - this delta) are
# eligible for deletion.
DELETION_DATE_THRESHOLD = timedelta(days=7)

# A fetched batch smaller than this is left in place and ends the cleanup pass.
DELETION_COUNT_THRESHOLD = 50

# Upper bound on the number of rows fetched (and deleted) per loop iteration.
BATCH_SIZE = 500

# Interval handed to `add_operation`; overridable through app config.
# NOTE(review): the default of 60*60*24 suggests seconds (one day) -- confirm
# against Worker.add_operation.
QUEUE_CLEANUP_FREQUENCY = app.config.get('QUEUE_CLEANUP_FREQUENCY', 60*60*24)


class QueueCleanupWorker(Worker):
    """Worker that registers a recurring cleanup pass over the queue items."""

    def __init__(self):
        super(QueueCleanupWorker, self).__init__()
        self.add_operation(self._cleanup_queue, QUEUE_CLEANUP_FREQUENCY)

    def _cleanup_queue(self):
        """Delete expired queue items in batches.

        Repeatedly fetches up to BATCH_SIZE rows whose `processing_expires`
        is at or before the cutoff (now minus DELETION_DATE_THRESHOLD,
        typically a week) and deletes them. The pass stops, without deleting,
        as soon as a fetched batch holds fewer than DELETION_COUNT_THRESHOLD
        rows.
        """
        with UseThenDisconnect(app.config):
            while True:
                # The cutoff is recomputed on every iteration, so it tracks
                # the wall clock while the pass is running.
                cutoff = datetime.now() - DELETION_DATE_THRESHOLD

                expired_query = QueueItem.select().where(
                    QueueItem.processing_expires <= cutoff
                )
                expired_batch = list(expired_query.limit(BATCH_SIZE))

                # Too few stale rows to be worth a delete: end this pass.
                if len(expired_batch) < DELETION_COUNT_THRESHOLD:
                    break

                removal = QueueItem.delete().where(QueueItem.id << expired_batch)
                removal.execute()


if __name__ == "__main__":
    worker = QueueCleanupWorker()
    worker.start()