diff --git a/conf/init/service/blobuploadcleanupworker/log/run b/conf/init/service/blobuploadcleanupworker/log/run
new file mode 100755
index 000000000..0f881fd59
--- /dev/null
+++ b/conf/init/service/blobuploadcleanupworker/log/run
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+# Ensure dependencies start before the logger
+sv check syslog-ng > /dev/null || exit 1
+
+# Start the logger
+exec logger -i -t blobuploadcleanupworker
diff --git a/conf/init/service/blobuploadcleanupworker/run b/conf/init/service/blobuploadcleanupworker/run
new file mode 100755
index 000000000..5f6f273ce
--- /dev/null
+++ b/conf/init/service/blobuploadcleanupworker/run
@@ -0,0 +1,8 @@
+#! /bin/bash
+
+echo 'Starting Blob upload cleanup worker'
+
+cd /
+venv/bin/python -m workers.blobuploadcleanupworker 2>&1
+
+echo 'Blob upload cleanup exited'
\ No newline at end of file
diff --git a/data/model/blob.py b/data/model/blob.py
index 470bb519d..cd830d6cb 100644
--- a/data/model/blob.py
+++ b/data/model/blob.py
@@ -1,9 +1,10 @@
+from datetime import datetime
 from uuid import uuid4
 
 from data.model import (tag, _basequery, BlobDoesNotExist, InvalidBlobUpload, db_transaction,
                         storage as storage_model, InvalidImageException)
 from data.database import (Repository, Namespace, ImageStorage, Image, ImageStoragePlacement,
-                           BlobUpload, ImageStorageLocation)
+                           BlobUpload, ImageStorageLocation, db_random_func)
 
 
 def get_repo_blob_by_digest(namespace, repo_name, blob_digest):
@@ -58,6 +59,35 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
   return storage
 
 
+def get_stale_blob_upload(stale_timespan):
+  """ Returns a random blob upload which was created before the stale timespan. """
+  stale_threshold = datetime.now() - stale_timespan
+
+  try:
+    candidates = (BlobUpload
+                  .select()
+                  .where(BlobUpload.created <= stale_threshold)
+                  .limit(500)
+                  .distinct()
+                  .alias('candidates'))
+
+    found = (BlobUpload
+             .select(candidates.c.id)
+             .from_(candidates)
+             .order_by(db_random_func())
+             .get())
+    if not found:
+      return None
+
+    return (BlobUpload
+            .select(BlobUpload, ImageStorageLocation)
+            .join(ImageStorageLocation)
+            .where(BlobUpload.id == found.id)
+            .get())
+  except BlobUpload.DoesNotExist:
+    return None
+
+
 def get_blob_upload(namespace, repo_name, upload_uuid):
   """ Load the upload which is already in progress.
   """
diff --git a/workers/blobuploadcleanupworker.py b/workers/blobuploadcleanupworker.py
new file mode 100644
index 000000000..bfd8bb5d5
--- /dev/null
+++ b/workers/blobuploadcleanupworker.py
@@ -0,0 +1,56 @@
+import logging
+import logging.config
+
+from datetime import timedelta
+
+from app import app, storage
+from data.database import UseThenDisconnect, BlobUpload
+from data import model
+from workers.worker import Worker
+
+
+logger = logging.getLogger(__name__)
+
+
+DELETION_DATE_THRESHOLD = timedelta(days=2)
+BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get('BLOBUPLOAD_CLEANUP_FREQUENCY', 60*60)
+
+
+class BlobUploadCleanupWorker(Worker):
+  def __init__(self):
+    super(BlobUploadCleanupWorker, self).__init__()
+    self.add_operation(self._cleanup_uploads, BLOBUPLOAD_CLEANUP_FREQUENCY)
+
+  def _cleanup_uploads(self):
+    """ Performs garbage collection on the blobupload table. """
+    while True:
+      # Find all blob uploads older than the deletion threshold (two days) and delete them.
+      with UseThenDisconnect(app.config):
+        stale_upload = model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD)
+        if stale_upload is None:
+          logger.debug('No additional stale blob uploads found')
+          return
+
+      # Remove the stale upload from storage.
+      logger.debug('Removing stale blob upload %s', stale_upload.uuid)
+      try:
+        storage.cancel_chunked_upload([stale_upload.location.name], stale_upload.uuid,
+                                      stale_upload.storage_metadata)
+      except Exception as ex:
+        logger.debug('Got error when trying to cancel chunked upload %s: %s', stale_upload.uuid,
+                     ex.message)
+
+      # Delete the stale upload's row.
+      with UseThenDisconnect(app.config):
+        try:
+          stale_upload.delete_instance()
+        except BlobUpload.DoesNotExist:
+          continue
+
+      logger.debug('Removed stale blob upload %s', stale_upload.uuid)
+
+
+if __name__ == "__main__":
+  logging.config.fileConfig('conf/logging_debug.conf', disable_existing_loggers=False)
+  worker = BlobUploadCleanupWorker()
+  worker.start()
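Note: a minimal sketch of the threshold arithmetic the worker relies on, using only the standard library. The snippet is illustrative and not part of the change; it mirrors the DELETION_DATE_THRESHOLD constant from the diff to show which uploads get_stale_blob_upload would consider stale.

from datetime import datetime, timedelta

DELETION_DATE_THRESHOLD = timedelta(days=2)
stale_threshold = datetime.now() - DELETION_DATE_THRESHOLD

# An upload created three days ago falls at or before the threshold, so the
# worker would treat it as a cleanup candidate; one created an hour ago is
# left alone until it ages past the threshold.
print((datetime.now() - timedelta(days=3)) <= stale_threshold)   # True
print((datetime.now() - timedelta(hours=1)) <= stale_threshold)  # False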