This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py

28 lines
1 KiB
Python
Raw Normal View History

2019-11-12 16:09:47 +00:00
from contextlib import contextmanager
from mock import patch, Mock
from test.fixtures import *
from workers.blobuploadcleanupworker.blobuploadcleanupworker import BlobUploadCleanupWorker
from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model
def test_blobuploadcleanupworker(initialized_db):
    """Check that the cleanup worker cancels and removes a stale blob upload.

    A stale upload is created through the model, the worker's
    ``UseThenDisconnect`` and ``storage`` module attributes are patched, and
    ``_cleanup_uploads`` is run once. The test then verifies that the chunked
    upload was canceled in storage exactly once and that the upload record is
    no longer reported as existing by the model.

    Args:
        initialized_db: pytest fixture (from ``test.fixtures``); requested for
            its setup side effect and not referenced directly in the body.
    """
    # Create a blob upload older than the threshold.
    blob_upload = model.create_stale_upload_for_testing()

    # Note: We need to override UseThenDisconnect to ensure it remains
    # connected to the test DB.
    @contextmanager
    def noop(_):
        yield

    storage_mock = Mock()
    with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.UseThenDisconnect', noop):
        with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.storage', storage_mock):
            # Call cleanup and ensure it is canceled.
            worker = BlobUploadCleanupWorker()
            worker._cleanup_uploads()

            storage_mock.cancel_chunked_upload.assert_called_once()

        # Ensure the blob no longer exists. The result must be asserted:
        # calling the lookup and discarding its return value checks nothing.
        assert not model.blob_upload_exists(blob_upload.uuid)