import json from app import storage from datetime import datetime, timedelta from httmock import urlmatch, HTTMock from data import model from data.logs_model import logs_model from workers.exportactionlogsworker import ExportActionLogsWorker, POLL_PERIOD_SECONDS from test.fixtures import * # TODO(sidchen): Test logging to elastic search log model. @pytest.mark.parametrize('namespace,repo_name,expects_logs', [ ('buynlarge', 'orgrepo', True), ('devtable', 'history', False), ]) def test_process_queue_item(namespace, repo_name, expects_logs, app): end_time = datetime.utcnow() + timedelta(days=365) start_time = datetime.utcnow() - timedelta(days=365) repo = model.repository.get_repository(namespace, repo_name) worker = ExportActionLogsWorker(None) called = [{}] @urlmatch(netloc=r'testcallback') def handle_request(url, request): called[0] = json.loads(request.body) return {'status_code': 200, 'content': '{}'} def format_date(datetime): return datetime.strftime("%m/%d/%Y") with HTTMock(handle_request): worker.process_queue_item({ 'export_id': 'someid', 'repository_id': repo.id, 'namespace_id': repo.namespace_user.id, 'namespace_name': namespace, 'repository_name': repo_name, 'start_time': format_date(start_time), 'end_time': format_date(end_time), 'callback_url': 'http://testcallback/', 'callback_email': None, }) assert called[0] assert called[0][u'export_id'] == 'someid' assert called[0][u'status'] == 'success' url = called[0][u'exported_data_url'] assert url.find('http://localhost:5000/exportedlogs/') == 0 storage_id = url[len('http://localhost:5000/exportedlogs/'):] created = storage.get_content(storage.preferred_locations, 'exportedactionlogs/' + storage_id) created_json = json.loads(created) expected_count = len(logs_model.lookup_logs(start_time, end_time, namespace_name=namespace, repository_name=repo_name).logs) assert (expected_count > 1) == expects_logs assert created_json['export_id'] == 'someid' assert len(created_json['logs']) == (expected_count + 1) assert POLL_PERIOD_SECONDS == 60