diff --git a/conf/init/service/queuecleanupworker/log/run b/conf/init/service/queuecleanupworker/log/run
new file mode 100755
index 000000000..ec239adfa
--- /dev/null
+++ b/conf/init/service/queuecleanupworker/log/run
@@ -0,0 +1,2 @@
+#!/bin/sh
+exec logger -i -t queuecleanupworker
\ No newline at end of file
diff --git a/conf/init/service/queuecleanupworker/run b/conf/init/service/queuecleanupworker/run
new file mode 100755
index 000000000..ba04d5019
--- /dev/null
+++ b/conf/init/service/queuecleanupworker/run
@@ -0,0 +1,8 @@
+#! /bin/bash
+
+echo 'Starting Queue cleanup worker'
+
+cd /
+venv/bin/python -m workers.queuecleanupworker 2>&1
+
+echo 'Repository Queue cleanup exited'
\ No newline at end of file
diff --git a/workers/queuecleanupworker.py b/workers/queuecleanupworker.py
new file mode 100644
index 000000000..8bd8713e3
--- /dev/null
+++ b/workers/queuecleanupworker.py
@@ -0,0 +1,55 @@
+import logging
+
+from app import app
+from data.database import UseThenDisconnect, QueueItem
+from workers.worker import Worker
+from datetime import timedelta, datetime
+
+logger = logging.getLogger(__name__)
+
+# Queue items must have expired at least this long ago before they are deleted.
+DELETION_DATE_THRESHOLD = timedelta(days=7)
+# Cleanup stops once a batch contains fewer than this many deletable items.
+DELETION_COUNT_THRESHOLD = 50
+# Maximum number of queue items selected and deleted per batch.
+BATCH_SIZE = 500
+# Interval passed to Worker.add_operation; presumably seconds (default 60*60*24).
+QUEUE_CLEANUP_FREQUENCY = app.config.get('QUEUE_CLEANUP_FREQUENCY', 60*60*24)
+
+
+class QueueCleanupWorker(Worker):
+    """ Worker that garbage collects stale rows from the queueitem table. """
+
+    def __init__(self):
+        super(QueueCleanupWorker, self).__init__()
+        self.add_operation(self._cleanup_queue, QUEUE_CLEANUP_FREQUENCY)
+
+    def _cleanup_queue(self):
+        """ Performs garbage collection on the queueitem table.
+
+        Repeatedly selects up to BATCH_SIZE queue items that have no retries remaining and
+        whose processing_expires is at least DELETION_DATE_THRESHOLD in the past, and
+        deletes them. Returns as soon as a batch holds fewer than
+        DELETION_COUNT_THRESHOLD items; such a remainder is left for a later run.
+        """
+        with UseThenDisconnect(app.config):
+            while True:
+                # Find all queue items older than the threshold (typically a week) that have
+                # no additional retries and delete them. The comparison has to be <=: with >=
+                # the query matches items that expired after the cutoff (or not yet at all)
+                # and never touches the old rows this worker exists to remove.
+                threshold_ago = datetime.now() - DELETION_DATE_THRESHOLD
+                to_delete = list(QueueItem.select()
+                                 .where(QueueItem.processing_expires <= threshold_ago,
+                                        QueueItem.retries_remaining == 0)
+                                 .limit(BATCH_SIZE))
+
+                if len(to_delete) < DELETION_COUNT_THRESHOLD:
+                    return
+
+                QueueItem.delete().where(QueueItem.id << to_delete).execute()
+
+
+if __name__ == "__main__":
+    worker = QueueCleanupWorker()
+    worker.start()