Change blobuploadcleanupworker to use a data interface
This commit is contained in:
parent
b2053829f9
commit
bdab367285
5 changed files with 93 additions and 21 deletions
|
@ -88,6 +88,17 @@ def get_stale_blob_upload(stale_timespan):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_blob_upload_by_uuid(upload_uuid):
|
||||||
|
""" Loads the upload with the given UUID, if any. """
|
||||||
|
try:
|
||||||
|
return (BlobUpload
|
||||||
|
.select()
|
||||||
|
.where(BlobUpload.uuid == upload_uuid)
|
||||||
|
.get())
|
||||||
|
except BlobUpload.DoesNotExist:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_blob_upload(namespace, repo_name, upload_uuid):
|
def get_blob_upload(namespace, repo_name, upload_uuid):
|
||||||
""" Load the upload which is already in progress.
|
""" Load the upload which is already in progress.
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -4,8 +4,8 @@ import logging.config
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from app import app, storage
|
from app import app, storage
|
||||||
from data.database import UseThenDisconnect, BlobUpload
|
from data.database import UseThenDisconnect
|
||||||
from data import model
|
from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model
|
||||||
from workers.worker import Worker
|
from workers.worker import Worker
|
||||||
from util.log import logfile_path
|
from util.log import logfile_path
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ class BlobUploadCleanupWorker(Worker):
|
||||||
while True:
|
while True:
|
||||||
# Find all blob uploads older than the threshold (typically a week) and delete them.
|
# Find all blob uploads older than the threshold (typically a week) and delete them.
|
||||||
with UseThenDisconnect(app.config):
|
with UseThenDisconnect(app.config):
|
||||||
stale_upload = model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD)
|
stale_upload = model.get_stale_blob_upload(DELETION_DATE_THRESHOLD)
|
||||||
if stale_upload is None:
|
if stale_upload is None:
|
||||||
logger.debug('No additional stale blob uploads found')
|
logger.debug('No additional stale blob uploads found')
|
||||||
return
|
return
|
||||||
|
@ -34,7 +34,7 @@ class BlobUploadCleanupWorker(Worker):
|
||||||
# Remove the stale upload from storage.
|
# Remove the stale upload from storage.
|
||||||
logger.debug('Removing stale blob upload %s', stale_upload.uuid)
|
logger.debug('Removing stale blob upload %s', stale_upload.uuid)
|
||||||
try:
|
try:
|
||||||
storage.cancel_chunked_upload([stale_upload.location.name], stale_upload.uuid,
|
storage.cancel_chunked_upload([stale_upload.location_name], stale_upload.uuid,
|
||||||
stale_upload.storage_metadata)
|
stale_upload.storage_metadata)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
logger.debug('Got error when trying to cancel chunked upload %s: %s', stale_upload.uuid,
|
logger.debug('Got error when trying to cancel chunked upload %s: %s', stale_upload.uuid,
|
||||||
|
@ -42,10 +42,7 @@ class BlobUploadCleanupWorker(Worker):
|
||||||
|
|
||||||
# Delete the stale upload's row.
|
# Delete the stale upload's row.
|
||||||
with UseThenDisconnect(app.config):
|
with UseThenDisconnect(app.config):
|
||||||
try:
|
model.delete_blob_upload(stale_upload)
|
||||||
stale_upload.delete_instance()
|
|
||||||
except BlobUpload.DoesNotExist:
|
|
||||||
continue
|
|
||||||
|
|
||||||
logger.debug('Removed stale blob upload %s', stale_upload.uuid)
|
logger.debug('Removed stale blob upload %s', stale_upload.uuid)
|
||||||
|
|
||||||
|
|
37
workers/blobuploadcleanupworker/models_interface.py
Normal file
37
workers/blobuploadcleanupworker/models_interface.py
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
from abc import ABCMeta, abstractmethod
|
||||||
|
from collections import namedtuple
|
||||||
|
from six import add_metaclass
|
||||||
|
|
||||||
|
|
||||||
|
class BlobUpload(namedtuple('BlobUpload', ['uuid', 'storage_metadata', 'location_name'])):
|
||||||
|
"""
|
||||||
|
BlobUpload represents a single upload of a blob in progress or previously started.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@add_metaclass(ABCMeta)
|
||||||
|
class BlobUploadCleanupWorkerDataInterface(object):
|
||||||
|
"""
|
||||||
|
Interface that represents all data store interactions required by the blob upload cleanup worker.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_stale_blob_upload(self, stale_threshold):
|
||||||
|
""" Returns a BlobUpload that was created on or before the current date/time minus the
|
||||||
|
stale threshold. If none, returns None. """
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def delete_blob_upload(self, blob_upload):
|
||||||
|
""" Deletes a blob upload from the database. """
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def create_stale_upload_for_testing(self):
|
||||||
|
""" Creates a new stale blob upload for testing. """
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def blob_upload_exists(self, upload_uuid):
|
||||||
|
""" Returns True if a blob upload with the given UUID exists. """
|
||||||
|
pass
|
36
workers/blobuploadcleanupworker/models_pre_oci.py
Normal file
36
workers/blobuploadcleanupworker/models_pre_oci.py
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
from data import model
|
||||||
|
from data.database import BlobUpload as BlobUploadTable
|
||||||
|
from workers.blobuploadcleanupworker.models_interface import (BlobUpload,
|
||||||
|
BlobUploadCleanupWorkerDataInterface)
|
||||||
|
|
||||||
|
class PreOCIModel(BlobUploadCleanupWorkerDataInterface):
|
||||||
|
def get_stale_blob_upload(self, stale_threshold):
|
||||||
|
blob_upload = model.blob.get_stale_blob_upload(stale_threshold)
|
||||||
|
if blob_upload is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name)
|
||||||
|
|
||||||
|
def delete_blob_upload(self, blob_upload):
|
||||||
|
blob_upload = model.blob.get_blob_upload_by_uuid(blob_upload.uuid)
|
||||||
|
if blob_upload is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
blob_upload.delete_instance()
|
||||||
|
except BlobUploadTable.DoesNotExist:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_stale_upload_for_testing(self):
|
||||||
|
blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {})
|
||||||
|
blob_upload.created = datetime.now() - timedelta(days=60)
|
||||||
|
blob_upload.save()
|
||||||
|
return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name)
|
||||||
|
|
||||||
|
def blob_upload_exists(self, upload_uuid):
|
||||||
|
blob_upload = model.blob.get_blob_upload_by_uuid(upload_uuid)
|
||||||
|
return blob_upload is not None
|
||||||
|
|
||||||
|
pre_oci_model = PreOCIModel()
|
|
@ -1,21 +1,13 @@
|
||||||
import pytest
|
|
||||||
|
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from datetime import datetime, timedelta
|
|
||||||
from mock import patch, Mock
|
from mock import patch, Mock
|
||||||
|
|
||||||
from test.fixtures import *
|
from test.fixtures import *
|
||||||
from data import model
|
from workers.blobuploadcleanupworker.blobuploadcleanupworker import BlobUploadCleanupWorker
|
||||||
from workers.blobuploadcleanupworker.blobuploadcleanupworker import (BlobUploadCleanupWorker,
|
from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model
|
||||||
DELETION_DATE_THRESHOLD)
|
|
||||||
|
|
||||||
def test_blobuploadcleanupworker(initialized_db):
|
def test_blobuploadcleanupworker(initialized_db):
|
||||||
# Create a blob upload older than the threshold.
|
# Create a blob upload older than the threshold.
|
||||||
blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {})
|
blob_upload = model.create_stale_upload_for_testing()
|
||||||
blob_upload.created = datetime.now() - (DELETION_DATE_THRESHOLD + timedelta(days=100))
|
|
||||||
blob_upload.save()
|
|
||||||
|
|
||||||
assert model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD) is not None
|
|
||||||
|
|
||||||
# Note: We need to override UseThenDisconnect to ensure to remains connected to the test DB.
|
# Note: We need to override UseThenDisconnect to ensure to remains connected to the test DB.
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
@ -32,5 +24,4 @@ def test_blobuploadcleanupworker(initialized_db):
|
||||||
storage_mock.cancel_chunked_upload.assert_called_once()
|
storage_mock.cancel_chunked_upload.assert_called_once()
|
||||||
|
|
||||||
# Ensure the blob no longer exists.
|
# Ensure the blob no longer exists.
|
||||||
with pytest.raises(model.InvalidBlobUpload):
|
model.blob_upload_exists(blob_upload.uuid)
|
||||||
model.blob.get_blob_upload('devtable', 'simple', blob_upload.uuid)
|
|
||||||
|
|
Reference in a new issue