From b87415129f985b6b60b277c4c5a59415a8b46799 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 11 Jul 2017 15:38:10 +0300 Subject: [PATCH 1/4] Move blobuploadcleanupworker into its own package --- conf/init/service/blobuploadcleanupworker/run | 2 +- workers/blobuploadcleanupworker/__init__.py | 0 .../{ => blobuploadcleanupworker}/blobuploadcleanupworker.py | 0 3 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 workers/blobuploadcleanupworker/__init__.py rename workers/{ => blobuploadcleanupworker}/blobuploadcleanupworker.py (100%) diff --git a/conf/init/service/blobuploadcleanupworker/run b/conf/init/service/blobuploadcleanupworker/run index be3c48e6d..29759be69 100755 --- a/conf/init/service/blobuploadcleanupworker/run +++ b/conf/init/service/blobuploadcleanupworker/run @@ -5,6 +5,6 @@ echo 'Starting Blob upload cleanup worker' QUAYPATH=${QUAYPATH:-"."} cd ${QUAYDIR:-"/"} -PYTHONPATH=$QUAYPATH venv/bin/python -m workers.blobuploadcleanupworker 2>&1 +PYTHONPATH=$QUAYPATH venv/bin/python -m workers.blobuploadcleanupworker.blobuploadcleanupworker 2>&1 echo 'Blob upload cleanup exited' \ No newline at end of file diff --git a/workers/blobuploadcleanupworker/__init__.py b/workers/blobuploadcleanupworker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/workers/blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py similarity index 100% rename from workers/blobuploadcleanupworker.py rename to workers/blobuploadcleanupworker/blobuploadcleanupworker.py From b2053829f9b7364bca9e377e325d2deedade18df Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 11 Jul 2017 16:35:10 +0300 Subject: [PATCH 2/4] Add a basic test for blob upload cleanup --- .../test/test_blobuploadcleanupworker.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py diff --git a/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py new file mode 100644 index 000000000..702292063 --- /dev/null +++ b/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py @@ -0,0 +1,36 @@ +import pytest + +from contextlib import contextmanager +from datetime import datetime, timedelta +from mock import patch, Mock + +from test.fixtures import * +from data import model +from workers.blobuploadcleanupworker.blobuploadcleanupworker import (BlobUploadCleanupWorker, + DELETION_DATE_THRESHOLD) + +def test_blobuploadcleanupworker(initialized_db): + # Create a blob upload older than the threshold. + blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {}) + blob_upload.created = datetime.now() - (DELETION_DATE_THRESHOLD + timedelta(days=100)) + blob_upload.save() + + assert model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD) is not None + + # Note: We need to override UseThenDisconnect to ensure to remains connected to the test DB. + @contextmanager + def noop(_): + yield + + storage_mock = Mock() + with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.UseThenDisconnect', noop): + with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.storage', storage_mock): + # Call cleanup and ensure it is canceled. + worker = BlobUploadCleanupWorker() + worker._cleanup_uploads() + + storage_mock.cancel_chunked_upload.assert_called_once() + + # Ensure the blob no longer exists. + with pytest.raises(model.InvalidBlobUpload): + model.blob.get_blob_upload('devtable', 'simple', blob_upload.uuid) From bdab367285fab3c4938db83984eb756b0f5363c4 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 11 Jul 2017 16:58:09 +0300 Subject: [PATCH 3/4] Change blobuploadcleanupworker to use a data interface --- data/model/blob.py | 11 ++++++ .../blobuploadcleanupworker.py | 13 +++---- .../models_interface.py | 37 +++++++++++++++++++ .../blobuploadcleanupworker/models_pre_oci.py | 36 ++++++++++++++++++ .../test/test_blobuploadcleanupworker.py | 17 ++------- 5 files changed, 93 insertions(+), 21 deletions(-) create mode 100644 workers/blobuploadcleanupworker/models_interface.py create mode 100644 workers/blobuploadcleanupworker/models_pre_oci.py diff --git a/data/model/blob.py b/data/model/blob.py index cd830d6cb..0a3f1a39f 100644 --- a/data/model/blob.py +++ b/data/model/blob.py @@ -88,6 +88,17 @@ def get_stale_blob_upload(stale_timespan): return None +def get_blob_upload_by_uuid(upload_uuid): + """ Loads the upload with the given UUID, if any. """ + try: + return (BlobUpload + .select() + .where(BlobUpload.uuid == upload_uuid) + .get()) + except BlobUpload.DoesNotExist: + return None + + def get_blob_upload(namespace, repo_name, upload_uuid): """ Load the upload which is already in progress. """ diff --git a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py index bb705d10a..4c664641f 100644 --- a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py +++ b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py @@ -4,8 +4,8 @@ import logging.config from datetime import timedelta from app import app, storage -from data.database import UseThenDisconnect, BlobUpload -from data import model +from data.database import UseThenDisconnect +from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model from workers.worker import Worker from util.log import logfile_path @@ -26,7 +26,7 @@ class BlobUploadCleanupWorker(Worker): while True: # Find all blob uploads older than the threshold (typically a week) and delete them. with UseThenDisconnect(app.config): - stale_upload = model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD) + stale_upload = model.get_stale_blob_upload(DELETION_DATE_THRESHOLD) if stale_upload is None: logger.debug('No additional stale blob uploads found') return @@ -34,7 +34,7 @@ class BlobUploadCleanupWorker(Worker): # Remove the stale upload from storage. logger.debug('Removing stale blob upload %s', stale_upload.uuid) try: - storage.cancel_chunked_upload([stale_upload.location.name], stale_upload.uuid, + storage.cancel_chunked_upload([stale_upload.location_name], stale_upload.uuid, stale_upload.storage_metadata) except Exception as ex: logger.debug('Got error when trying to cancel chunked upload %s: %s', stale_upload.uuid, @@ -42,10 +42,7 @@ class BlobUploadCleanupWorker(Worker): # Delete the stale upload's row. with UseThenDisconnect(app.config): - try: - stale_upload.delete_instance() - except BlobUpload.DoesNotExist: - continue + model.delete_blob_upload(stale_upload) logger.debug('Removed stale blob upload %s', stale_upload.uuid) diff --git a/workers/blobuploadcleanupworker/models_interface.py b/workers/blobuploadcleanupworker/models_interface.py new file mode 100644 index 000000000..000a26771 --- /dev/null +++ b/workers/blobuploadcleanupworker/models_interface.py @@ -0,0 +1,37 @@ +from abc import ABCMeta, abstractmethod +from collections import namedtuple +from six import add_metaclass + + +class BlobUpload(namedtuple('BlobUpload', ['uuid', 'storage_metadata', 'location_name'])): + """ + BlobUpload represents a single upload of a blob in progress or previously started. + """ + + +@add_metaclass(ABCMeta) +class BlobUploadCleanupWorkerDataInterface(object): + """ + Interface that represents all data store interactions required by the blob upload cleanup worker. + """ + + @abstractmethod + def get_stale_blob_upload(self, stale_threshold): + """ Returns a BlobUpload that was created on or before the current date/time minus the + stale threshold. If none, returns None. """ + pass + + @abstractmethod + def delete_blob_upload(self, blob_upload): + """ Deletes a blob upload from the database. """ + pass + + @abstractmethod + def create_stale_upload_for_testing(self): + """ Creates a new stale blob upload for testing. """ + pass + + @abstractmethod + def blob_upload_exists(self, upload_uuid): + """ Returns True if a blob upload with the given UUID exists. """ + pass diff --git a/workers/blobuploadcleanupworker/models_pre_oci.py b/workers/blobuploadcleanupworker/models_pre_oci.py new file mode 100644 index 000000000..e7d0dbd20 --- /dev/null +++ b/workers/blobuploadcleanupworker/models_pre_oci.py @@ -0,0 +1,36 @@ +from datetime import datetime, timedelta + +from data import model +from data.database import BlobUpload as BlobUploadTable +from workers.blobuploadcleanupworker.models_interface import (BlobUpload, + BlobUploadCleanupWorkerDataInterface) + +class PreOCIModel(BlobUploadCleanupWorkerDataInterface): + def get_stale_blob_upload(self, stale_threshold): + blob_upload = model.blob.get_stale_blob_upload(stale_threshold) + if blob_upload is None: + return None + + return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) + + def delete_blob_upload(self, blob_upload): + blob_upload = model.blob.get_blob_upload_by_uuid(blob_upload.uuid) + if blob_upload is None: + return + + try: + blob_upload.delete_instance() + except BlobUploadTable.DoesNotExist: + pass + + def create_stale_upload_for_testing(self): + blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {}) + blob_upload.created = datetime.now() - timedelta(days=60) + blob_upload.save() + return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) + + def blob_upload_exists(self, upload_uuid): + blob_upload = model.blob.get_blob_upload_by_uuid(upload_uuid) + return blob_upload is not None + +pre_oci_model = PreOCIModel() diff --git a/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py index 702292063..fb61e1389 100644 --- a/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py +++ b/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py @@ -1,21 +1,13 @@ -import pytest - from contextlib import contextmanager -from datetime import datetime, timedelta from mock import patch, Mock from test.fixtures import * -from data import model -from workers.blobuploadcleanupworker.blobuploadcleanupworker import (BlobUploadCleanupWorker, - DELETION_DATE_THRESHOLD) +from workers.blobuploadcleanupworker.blobuploadcleanupworker import BlobUploadCleanupWorker +from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model def test_blobuploadcleanupworker(initialized_db): # Create a blob upload older than the threshold. - blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {}) - blob_upload.created = datetime.now() - (DELETION_DATE_THRESHOLD + timedelta(days=100)) - blob_upload.save() - - assert model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD) is not None + blob_upload = model.create_stale_upload_for_testing() # Note: We need to override UseThenDisconnect to ensure to remains connected to the test DB. @contextmanager @@ -32,5 +24,4 @@ def test_blobuploadcleanupworker(initialized_db): storage_mock.cancel_chunked_upload.assert_called_once() # Ensure the blob no longer exists. - with pytest.raises(model.InvalidBlobUpload): - model.blob.get_blob_upload('devtable', 'simple', blob_upload.uuid) + model.blob_upload_exists(blob_upload.uuid) From 8ded8f573d7edca3798d9389452d882f82f8e46d Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 11 Jul 2017 16:58:30 +0300 Subject: [PATCH 4/4] yapf --- workers/blobuploadcleanupworker/blobuploadcleanupworker.py | 3 +-- workers/blobuploadcleanupworker/models_pre_oci.py | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py index 4c664641f..82d3acda3 100644 --- a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py +++ b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py @@ -11,9 +11,8 @@ from util.log import logfile_path logger = logging.getLogger(__name__) - DELETION_DATE_THRESHOLD = timedelta(days=2) -BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get('BLOBUPLOAD_CLEANUP_FREQUENCY', 60*60) +BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get('BLOBUPLOAD_CLEANUP_FREQUENCY', 60 * 60) class BlobUploadCleanupWorker(Worker): diff --git a/workers/blobuploadcleanupworker/models_pre_oci.py b/workers/blobuploadcleanupworker/models_pre_oci.py index e7d0dbd20..97db6e159 100644 --- a/workers/blobuploadcleanupworker/models_pre_oci.py +++ b/workers/blobuploadcleanupworker/models_pre_oci.py @@ -2,8 +2,9 @@ from datetime import datetime, timedelta from data import model from data.database import BlobUpload as BlobUploadTable -from workers.blobuploadcleanupworker.models_interface import (BlobUpload, - BlobUploadCleanupWorkerDataInterface) +from workers.blobuploadcleanupworker.models_interface import ( + BlobUpload, BlobUploadCleanupWorkerDataInterface) + class PreOCIModel(BlobUploadCleanupWorkerDataInterface): def get_stale_blob_upload(self, stale_threshold): @@ -33,4 +34,5 @@ class PreOCIModel(BlobUploadCleanupWorkerDataInterface): blob_upload = model.blob.get_blob_upload_by_uuid(upload_uuid) return blob_upload is not None + pre_oci_model = PreOCIModel()