This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/blobuploadcleanupworker/test/test_blobuploadcleanupworker.py
2017-07-11 16:35:10 +03:00

36 lines
1.4 KiB
Python

import pytest
from contextlib import contextmanager
from datetime import datetime, timedelta
from mock import patch, Mock
from test.fixtures import *
from data import model
from workers.blobuploadcleanupworker.blobuploadcleanupworker import (BlobUploadCleanupWorker,
DELETION_DATE_THRESHOLD)
def test_blobuploadcleanupworker(initialized_db):
# Create a blob upload older than the threshold.
blob_upload = model.blob.initiate_upload('devtable', 'simple', 'foobarbaz', 'local_us', {})
blob_upload.created = datetime.now() - (DELETION_DATE_THRESHOLD + timedelta(days=100))
blob_upload.save()
assert model.blob.get_stale_blob_upload(DELETION_DATE_THRESHOLD) is not None
# Note: We need to override UseThenDisconnect to ensure to remains connected to the test DB.
@contextmanager
def noop(_):
yield
storage_mock = Mock()
with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.UseThenDisconnect', noop):
with patch('workers.blobuploadcleanupworker.blobuploadcleanupworker.storage', storage_mock):
# Call cleanup and ensure it is canceled.
worker = BlobUploadCleanupWorker()
worker._cleanup_uploads()
storage_mock.cancel_chunked_upload.assert_called_once()
# Ensure the blob no longer exists.
with pytest.raises(model.InvalidBlobUpload):
model.blob.get_blob_upload('devtable', 'simple', blob_upload.uuid)