diff --git a/data/model/blob.py b/data/model/blob.py index cd830d6cb..0a3f1a39f 100644 --- a/data/model/blob.py +++ b/data/model/blob.py @@ -88,6 +88,17 @@ def get_stale_blob_upload(stale_timespan): return None +def get_blob_upload_by_uuid(upload_uuid): + """ Loads the upload with the given UUID, if any. """ + try: + return (BlobUpload + .select() + .where(BlobUpload.uuid == upload_uuid) + .get()) + except BlobUpload.DoesNotExist: + return None + + def get_blob_upload(namespace, repo_name, upload_uuid): """ Load the upload which is already in progress. """ diff --git a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py index bb705d10a..4c664641f 100644 --- a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py +++ b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py @@ -4,8 +4,8 @@ import logging.config from datetime import timedelta from app import app, storage -from data.database import UseThenDisconnect, BlobUpload -from data import model +from data.database import UseThenDisconnect +from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model from workers.worker import Worker from util.log import logfile_path @@ -26,7 +26,7 @@ class BlobUploadCleanupWorker(Worker): while True: # Find all blob uploads older than the threshold (typically a week) and delete them. with UseThenDisconnect(app.config): - stale_upload = model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD) + stale_upload = model.get_stale_blob_upload(DELETION_DATE_THRESHOLD) if stale_upload is None: logger.debug('No additional stale blob uploads found') return @@ -34,7 +34,7 @@ class BlobUploadCleanupWorker(Worker): # Remove the stale upload from storage. logger.debug('Removing stale blob upload %s', stale_upload.uuid) try: - storage.cancel_chunked_upload([stale_upload.location.name], stale_upload.uuid, + storage.cancel_chunked_upload([stale_upload.location_name], stale_upload.uuid, stale_upload.storage_metadata) except Exception as ex: logger.debug('Got error when trying to cancel chunked upload %s: %s', stale_upload.uuid, @@ -42,10 +42,7 @@ class BlobUploadCleanupWorker(Worker): # Delete the stale upload's row. with UseThenDisconnect(app.config): - try: - stale_upload.delete_instance() - except BlobUpload.DoesNotExist: - continue + model.delete_blob_upload(stale_upload) logger.debug('Removed stale blob upload %s', stale_upload.uuid) diff --git a/workers/blobuploadcleanupworker/models_interface.py b/workers/blobuploadcleanupworker/models_interface.py new file mode 100644 index 000000000..000a26771 --- /dev/null +++ b/workers/blobuploadcleanupworker/models_interface.py @@ -0,0 +1,37 @@ +from abc import ABCMeta, abstractmethod +from collections import namedtuple +from six import add_metaclass + + +class BlobUpload(namedtuple('BlobUpload', ['uuid', 'storage_metadata', 'location_name'])): + """ + BlobUpload represents a single upload of a blob in progress or previously started. + """ + + +@add_metaclass(ABCMeta) +class BlobUploadCleanupWorkerDataInterface(object): + """ + Interface that represents all data store interactions required by the blob upload cleanup worker. + """ + + @abstractmethod + def get_stale_blob_upload(self, stale_threshold): + """ Returns a BlobUpload that was created on or before the current date/time minus the + stale threshold. If none, returns None. """ + pass + + @abstractmethod + def delete_blob_upload(self, blob_upload): + """ Deletes a blob upload from the database. """ + pass + + @abstractmethod + def create_stale_upload_for_testing(self): + """ Creates a new stale blob upload for testing. """ + pass + + @abstractmethod + def blob_upload_exists(self, upload_uuid): + """ Returns True if a blob upload with the given UUID exists. """ + pass diff --git a/workers/blobuploadcleanupworker/models_pre_oci.py b/workers/blobuploadcleanupworker/models_pre_oci.py new file mode 100644 index 000000000..e7d0dbd20 --- /dev/null +++ b/workers/blobuploadcleanupworker/models_pre_oci.py @@ -0,0 +1,36 @@ +from datetime import datetime, timedelta + +from data import model +from data.database import BlobUpload as BlobUploadTable +from workers.blobuploadcleanupworker.models_interface import (BlobUpload, + BlobUploadCleanupWorkerDataInterface) + +class PreOCIModel(BlobUploadCleanupWorkerDataInterface): + def get_stale_blob_upload(self, stale_threshold): + blob_upload = model.blob.get_stale_blob_upload(stale_threshold) + if blob_upload is None: + return None + + return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) + + def delete_blob_upload(self, blob_upload): + blob_upload = model.blob.get_blob_upload_by_uuid(blob_upload.uuid) + if blob_upload is None: + return + + try: + blob_upload.delete_instance() + except BlobUploadTable.DoesNotExist: + pass + + def create_stale_upload_for_testing(self): + blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {}) + blob_upload.created = datetime.now() - timedelta(days=60) + blob_upload.save() + return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) + + def blob_upload_exists(self, upload_uuid): + blob_upload = model.blob.get_blob_upload_by_uuid(upload_uuid) + return blob_upload is not None + +pre_oci_model = PreOCIModel() diff --git a/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py index 702292063..fb61e1389 100644 --- a/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py +++ b/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py @@ -1,21 +1,13 @@ -import pytest - from contextlib import contextmanager -from datetime import datetime, timedelta from mock import patch, Mock from test.fixtures import * -from data import model -from workers.blobuploadcleanupworker.blobuploadcleanupworker import (BlobUploadCleanupWorker, - DELETION_DATE_THRESHOLD) +from workers.blobuploadcleanupworker.blobuploadcleanupworker import BlobUploadCleanupWorker +from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model def test_blobuploadcleanupworker(initialized_db): # Create a blob upload older than the threshold. - blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {}) - blob_upload.created = datetime.now() - (DELETION_DATE_THRESHOLD + timedelta(days=100)) - blob_upload.save() - - assert model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD) is not None + blob_upload = model.create_stale_upload_for_testing() # Note: We need to override UseThenDisconnect to ensure to remains connected to the test DB. @contextmanager @@ -32,5 +24,4 @@ def test_blobuploadcleanupworker(initialized_db): storage_mock.cancel_chunked_upload.assert_called_once() # Ensure the blob no longer exists. - with pytest.raises(model.InvalidBlobUpload): - model.blob.get_blob_upload('devtable', 'simple', blob_upload.uuid) + model.blob_upload_exists(blob_upload.uuid)