This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/test/test_exportactionlogsworker.py

66 lines
2.2 KiB
Python

import json
from app import storage
from datetime import datetime, timedelta
from httmock import urlmatch, HTTMock
from data import model, database
from workers.exportactionlogsworker import ExportActionLogsWorker
from test.fixtures import *
@pytest.mark.parametrize('namespace,repo_name,expects_logs', [
('buynlarge', 'orgrepo', True),
('devtable', 'history', False),
])
def test_process_queue_item(namespace, repo_name, expects_logs, app):
end_time = datetime.utcnow() + timedelta(days=365)
start_time = datetime.utcnow() - timedelta(days=365)
repo = model.repository.get_repository(namespace, repo_name)
assert (model.log.get_maximum_id_for_logs(end_time, repository_id=repo.id) is not None) == expects_logs
assert (model.log.get_minimum_id_for_logs(start_time, repository_id=repo.id) is not None) == expects_logs
worker = ExportActionLogsWorker(None)
called = [{}]
@urlmatch(netloc=r'testcallback')
def handle_request(url, request):
called[0] = json.loads(request.body)
return {'status_code': 200, 'content': '{}'}
def format_date(datetime):
return datetime.strftime("%m/%d/%Y")
with HTTMock(handle_request):
worker.process_queue_item({
'export_id': 'someid',
'repository_id': repo.id,
'namespace_id': repo.namespace_user.id,
'namespace_name': namespace,
'repository_name': repo_name,
'start_time': format_date(start_time),
'end_time': format_date(end_time),
'callback_url': 'http://testcallback/',
'callback_email': None,
})
assert called[0]
assert called[0][u'export_id'] == 'someid'
assert called[0][u'status'] == 'success'
url = called[0][u'exported_data_url']
assert url.find('http://localhost:5000/exportedlogs/') == 0
storage_id = url[len('http://localhost:5000/exportedlogs/'):]
created = storage.get_content(storage.preferred_locations, 'exportedactionlogs/' + storage_id)
created_json = json.loads(created)
expected_count = database.LogEntry3.select().where(database.LogEntry3.repository == repo).count()
assert (expected_count > 1) == expects_logs
assert created_json['export_id'] == 'someid'
assert len(created_json['logs']) == (expected_count + 1)