diff --git a/conf/init/service/blobuploadcleanupworker/run b/conf/init/service/blobuploadcleanupworker/run index be3c48e6d..29759be69 100755 --- a/conf/init/service/blobuploadcleanupworker/run +++ b/conf/init/service/blobuploadcleanupworker/run @@ -5,6 +5,6 @@ echo 'Starting Blob upload cleanup worker' QUAYPATH=${QUAYPATH:-"."} cd ${QUAYDIR:-"/"} -PYTHONPATH=$QUAYPATH venv/bin/python -m workers.blobuploadcleanupworker 2>&1 +PYTHONPATH=$QUAYPATH venv/bin/python -m workers.blobuploadcleanupworker.blobuploadcleanupworker 2>&1 echo 'Blob upload cleanup exited' \ No newline at end of file diff --git a/data/model/blob.py b/data/model/blob.py index cd830d6cb..0a3f1a39f 100644 --- a/data/model/blob.py +++ b/data/model/blob.py @@ -88,6 +88,17 @@ def get_stale_blob_upload(stale_timespan): return None +def get_blob_upload_by_uuid(upload_uuid): + """ Loads the upload with the given UUID, if any. """ + try: + return (BlobUpload + .select() + .where(BlobUpload.uuid == upload_uuid) + .get()) + except BlobUpload.DoesNotExist: + return None + + def get_blob_upload(namespace, repo_name, upload_uuid): """ Load the upload which is already in progress. 
""" diff --git a/workers/blobuploadcleanupworker/__init__.py b/workers/blobuploadcleanupworker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/workers/blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py similarity index 81% rename from workers/blobuploadcleanupworker.py rename to workers/blobuploadcleanupworker/blobuploadcleanupworker.py index bb705d10a..82d3acda3 100644 --- a/workers/blobuploadcleanupworker.py +++ b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py @@ -4,16 +4,15 @@ import logging.config from datetime import timedelta from app import app, storage -from data.database import UseThenDisconnect, BlobUpload -from data import model +from data.database import UseThenDisconnect +from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model from workers.worker import Worker from util.log import logfile_path logger = logging.getLogger(__name__) - DELETION_DATE_THRESHOLD = timedelta(days=2) -BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get('BLOBUPLOAD_CLEANUP_FREQUENCY', 60*60) +BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get('BLOBUPLOAD_CLEANUP_FREQUENCY', 60 * 60) class BlobUploadCleanupWorker(Worker): @@ -26,7 +25,7 @@ class BlobUploadCleanupWorker(Worker): while True: # Find all blob uploads older than the threshold (typically a week) and delete them. with UseThenDisconnect(app.config): - stale_upload = model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD) + stale_upload = model.get_stale_blob_upload(DELETION_DATE_THRESHOLD) if stale_upload is None: logger.debug('No additional stale blob uploads found') return @@ -34,7 +33,7 @@ class BlobUploadCleanupWorker(Worker): # Remove the stale upload from storage. 
logger.debug('Removing stale blob upload %s', stale_upload.uuid) try: - storage.cancel_chunked_upload([stale_upload.location.name], stale_upload.uuid, + storage.cancel_chunked_upload([stale_upload.location_name], stale_upload.uuid, stale_upload.storage_metadata) except Exception as ex: logger.debug('Got error when trying to cancel chunked upload %s: %s', stale_upload.uuid, @@ -42,10 +41,7 @@ class BlobUploadCleanupWorker(Worker): # Delete the stale upload's row. with UseThenDisconnect(app.config): - try: - stale_upload.delete_instance() - except BlobUpload.DoesNotExist: - continue + model.delete_blob_upload(stale_upload) logger.debug('Removed stale blob upload %s', stale_upload.uuid) diff --git a/workers/blobuploadcleanupworker/models_interface.py b/workers/blobuploadcleanupworker/models_interface.py new file mode 100644 index 000000000..000a26771 --- /dev/null +++ b/workers/blobuploadcleanupworker/models_interface.py @@ -0,0 +1,37 @@ +from abc import ABCMeta, abstractmethod +from collections import namedtuple +from six import add_metaclass + + +class BlobUpload(namedtuple('BlobUpload', ['uuid', 'storage_metadata', 'location_name'])): + """ + BlobUpload represents a single upload of a blob in progress or previously started. + """ + + +@add_metaclass(ABCMeta) +class BlobUploadCleanupWorkerDataInterface(object): + """ + Interface that represents all data store interactions required by the blob upload cleanup worker. + """ + + @abstractmethod + def get_stale_blob_upload(self, stale_threshold): + """ Returns a BlobUpload that was created on or before the current date/time minus the + stale threshold. If none, returns None. """ + pass + + @abstractmethod + def delete_blob_upload(self, blob_upload): + """ Deletes a blob upload from the database. """ + pass + + @abstractmethod + def create_stale_upload_for_testing(self): + """ Creates a new stale blob upload for testing. 
""" + pass + + @abstractmethod + def blob_upload_exists(self, upload_uuid): + """ Returns True if a blob upload with the given UUID exists. """ + pass diff --git a/workers/blobuploadcleanupworker/models_pre_oci.py b/workers/blobuploadcleanupworker/models_pre_oci.py new file mode 100644 index 000000000..97db6e159 --- /dev/null +++ b/workers/blobuploadcleanupworker/models_pre_oci.py @@ -0,0 +1,38 @@ +from datetime import datetime, timedelta + +from data import model +from data.database import BlobUpload as BlobUploadTable +from workers.blobuploadcleanupworker.models_interface import ( + BlobUpload, BlobUploadCleanupWorkerDataInterface) + + +class PreOCIModel(BlobUploadCleanupWorkerDataInterface): + def get_stale_blob_upload(self, stale_threshold): + blob_upload = model.blob.get_stale_blob_upload(stale_threshold) + if blob_upload is None: + return None + + return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) + + def delete_blob_upload(self, blob_upload): + blob_upload = model.blob.get_blob_upload_by_uuid(blob_upload.uuid) + if blob_upload is None: + return + + try: + blob_upload.delete_instance() + except BlobUploadTable.DoesNotExist: + pass + + def create_stale_upload_for_testing(self): + blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {}) + blob_upload.created = datetime.now() - timedelta(days=60) + blob_upload.save() + return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) + + def blob_upload_exists(self, upload_uuid): + blob_upload = model.blob.get_blob_upload_by_uuid(upload_uuid) + return blob_upload is not None + + +pre_oci_model = PreOCIModel() diff --git a/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py new file mode 100644 index 000000000..fb61e1389 --- /dev/null +++ b/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py @@ -0,0 +1,27 @@ 
+from contextlib import contextmanager +from mock import patch, Mock + +from test.fixtures import * +from workers.blobuploadcleanupworker.blobuploadcleanupworker import BlobUploadCleanupWorker +from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model + +def test_blobuploadcleanupworker(initialized_db): + # Create a blob upload older than the threshold. + blob_upload = model.create_stale_upload_for_testing() + + # Note: We need to override UseThenDisconnect to ensure it remains connected to the test DB. + @contextmanager + def noop(_): + yield + + storage_mock = Mock() + with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.UseThenDisconnect', noop): + with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.storage', storage_mock): + # Call cleanup and ensure it is canceled. + worker = BlobUploadCleanupWorker() + worker._cleanup_uploads() + + storage_mock.cancel_chunked_upload.assert_called_once() + + # Ensure the blob no longer exists. + assert not model.blob_upload_exists(blob_upload.uuid)