Merge pull request #2768 from coreos-inc/joseph.schorr/QUAY-653/blobuploadcleanupworker-data-interface

Change blobuploadcleanupworker to use a data interface
This commit is contained in:
josephschorr 2017-07-12 00:32:09 +03:00 committed by GitHub
commit dc6c6b30fc
7 changed files with 120 additions and 11 deletions

View file

@ -5,6 +5,6 @@ echo 'Starting Blob upload cleanup worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.blobuploadcleanupworker 2>&1
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.blobuploadcleanupworker.blobuploadcleanupworker 2>&1
echo 'Blob upload cleanup exited'

View file

@ -88,6 +88,17 @@ def get_stale_blob_upload(stale_timespan):
return None
def get_blob_upload_by_uuid(upload_uuid):
    """ Looks up the blob upload whose UUID matches `upload_uuid`.

        Returns the matching BlobUpload row, or None when no such row exists.
    """
    matching = BlobUpload.select().where(BlobUpload.uuid == upload_uuid)
    try:
        return matching.get()
    except BlobUpload.DoesNotExist:
        # No upload with this UUID; callers treat None as "not found".
        return None
def get_blob_upload(namespace, repo_name, upload_uuid):
""" Load the upload which is already in progress.
"""

View file

@ -4,16 +4,15 @@ import logging.config
from datetime import timedelta
from app import app, storage
from data.database import UseThenDisconnect, BlobUpload
from data import model
from data.database import UseThenDisconnect
from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model
from workers.worker import Worker
from util.log import logfile_path
logger = logging.getLogger(__name__)
DELETION_DATE_THRESHOLD = timedelta(days=2)
BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get('BLOBUPLOAD_CLEANUP_FREQUENCY', 60*60)
BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get('BLOBUPLOAD_CLEANUP_FREQUENCY', 60 * 60)
class BlobUploadCleanupWorker(Worker):
@ -26,7 +25,7 @@ class BlobUploadCleanupWorker(Worker):
while True:
# Find all blob uploads older than DELETION_DATE_THRESHOLD and delete them.
with UseThenDisconnect(app.config):
stale_upload = model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD)
stale_upload = model.get_stale_blob_upload(DELETION_DATE_THRESHOLD)
if stale_upload is None:
logger.debug('No additional stale blob uploads found')
return
@ -34,7 +33,7 @@ class BlobUploadCleanupWorker(Worker):
# Remove the stale upload from storage.
logger.debug('Removing stale blob upload %s', stale_upload.uuid)
try:
storage.cancel_chunked_upload([stale_upload.location.name], stale_upload.uuid,
storage.cancel_chunked_upload([stale_upload.location_name], stale_upload.uuid,
stale_upload.storage_metadata)
except Exception as ex:
logger.debug('Got error when trying to cancel chunked upload %s: %s', stale_upload.uuid,
@ -42,10 +41,7 @@ class BlobUploadCleanupWorker(Worker):
# Delete the stale upload's row.
with UseThenDisconnect(app.config):
try:
stale_upload.delete_instance()
except BlobUpload.DoesNotExist:
continue
model.delete_blob_upload(stale_upload)
logger.debug('Removed stale blob upload %s', stale_upload.uuid)

View file

@ -0,0 +1,37 @@
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from six import add_metaclass
class BlobUpload(namedtuple('BlobUpload', ['uuid', 'storage_metadata', 'location_name'])):
    """
    BlobUpload represents a single upload of a blob in progress or previously started.

    Fields:
      uuid: identifier of the upload.
      storage_metadata: metadata handed back to the storage engine when the upload is canceled.
      location_name: name of the storage location holding the upload's data.
    """
    # Without an empty __slots__, a namedtuple subclass gains a per-instance __dict__, which
    # wastes memory and lets callers attach arbitrary attributes to what should be an immutable
    # value object.
    __slots__ = ()
@add_metaclass(ABCMeta)
class BlobUploadCleanupWorkerDataInterface(object):
    """
    Abstract description of every data store operation the blob upload cleanup worker relies on.
    Concrete implementations must provide all of the methods below.
    """

    @abstractmethod
    def get_stale_blob_upload(self, stale_threshold):
        """ Looks up a BlobUpload created on or before (now - stale_threshold).
            Returns None when there is no such upload.
        """

    @abstractmethod
    def delete_blob_upload(self, blob_upload):
        """ Removes the given blob upload from the database. """

    @abstractmethod
    def create_stale_upload_for_testing(self):
        """ Test helper: creates a new blob upload that is already stale. """

    @abstractmethod
    def blob_upload_exists(self, upload_uuid):
        """ Reports (True/False) whether a blob upload with the given UUID exists. """

View file

@ -0,0 +1,38 @@
from datetime import datetime, timedelta
from data import model
from data.database import BlobUpload as BlobUploadTable
from workers.blobuploadcleanupworker.models_interface import (
BlobUpload, BlobUploadCleanupWorkerDataInterface)
class PreOCIModel(BlobUploadCleanupWorkerDataInterface):
    """
    Implementation of the cleanup worker's data interface on top of `model.blob`, translating
    database rows into BlobUpload tuples.
    """

    def get_stale_blob_upload(self, stale_threshold):
        """ Returns a stale upload as a BlobUpload tuple, or None if nothing is stale. """
        row = model.blob.get_stale_blob_upload(stale_threshold)
        if row is not None:
            return BlobUpload(uuid=row.uuid,
                              storage_metadata=row.storage_metadata,
                              location_name=row.location.name)

        return None

    def delete_blob_upload(self, blob_upload):
        """ Deletes the row for the given upload; a row that is already gone is ignored. """
        row = model.blob.get_blob_upload_by_uuid(blob_upload.uuid)
        if row is not None:
            try:
                row.delete_instance()
            except BlobUploadTable.DoesNotExist:
                # The row vanished between lookup and delete; nothing left to do.
                pass

    def create_stale_upload_for_testing(self):
        """ Creates an upload and backdates its creation time by 60 days. """
        row = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {})
        sixty_days = timedelta(days=60)
        row.created = datetime.now() - sixty_days
        row.save()
        return BlobUpload(uuid=row.uuid,
                          storage_metadata=row.storage_metadata,
                          location_name=row.location.name)

    def blob_upload_exists(self, upload_uuid):
        """ Returns True when an upload with the given UUID can be found. """
        return model.blob.get_blob_upload_by_uuid(upload_uuid) is not None
# Module-level instance of the pre-OCI data model; consumers import it as `model`.
pre_oci_model = PreOCIModel()

View file

@ -0,0 +1,27 @@
from contextlib import contextmanager
from mock import patch, Mock
from test.fixtures import *
from workers.blobuploadcleanupworker.blobuploadcleanupworker import BlobUploadCleanupWorker
from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model
def test_blobuploadcleanupworker(initialized_db):
    """ Verifies that the cleanup worker cancels a stale upload in storage and deletes its row. """
    # Create a blob upload older than the threshold.
    blob_upload = model.create_stale_upload_for_testing()
    assert model.blob_upload_exists(blob_upload.uuid)

    # Note: We need to override UseThenDisconnect to ensure it remains connected to the test DB.
    @contextmanager
    def noop(_):
        yield

    storage_mock = Mock()
    with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.UseThenDisconnect', noop):
        with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.storage', storage_mock):
            # Call cleanup and ensure it is canceled.
            worker = BlobUploadCleanupWorker()
            worker._cleanup_uploads()

            storage_mock.cancel_chunked_upload.assert_called_once()

            # Ensure the blob no longer exists. The result must be asserted; a bare call
            # would let the test pass even if the row was never deleted.
            assert not model.blob_upload_exists(blob_upload.uuid)