import os

import pytest

from datetime import datetime, timedelta

from data.logs_model.table_logs_model import TableLogsModel
from data.logs_model.interface import LogsIterationTimeout
from data.database import LogEntry, LogEntry2, LogEntry3, LogEntryKind
from data import model

from test.fixtures import *


@pytest.fixture()
def clear_db_logs(initialized_db):
    # Ensure each test starts from an empty set of log tables.
    LogEntry.delete().execute()
    LogEntry2.delete().execute()
    LogEntry3.delete().execute()


@pytest.fixture(params=[TableLogsModel()])
def logs_model(request, clear_db_logs):
    return request.param


def _lookup_logs(logs_model, start_time, end_time, **kwargs):
    """ Pages through lookup_logs and returns all logs found, asserting there are no duplicates. """
    logs_found = []
    page_token = None
    while True:
        found = logs_model.lookup_logs(start_time, end_time, page_token=page_token, **kwargs)
        logs_found.extend(found.logs)
        page_token = found.next_page_token
        if not found.logs or not page_token:
            break

    assert len(logs_found) == len(set(logs_found))
    return logs_found


@pytest.mark.skipif(os.environ.get('TEST_DATABASE_URI', '').find('mysql') >= 0,
                    reason='Flaky on MySQL')
@pytest.mark.parametrize('namespace_name, repo_name, performer_name, check_args, expect_results', [
    pytest.param('devtable', 'simple', 'devtable', {}, True, id='no filters'),

    pytest.param('devtable', 'simple', 'devtable', {
        'performer_name': 'devtable',
    }, True, id='matching performer'),

    pytest.param('devtable', 'simple', 'devtable', {
        'namespace_name': 'devtable',
    }, True, id='matching namespace'),

    pytest.param('devtable', 'simple', 'devtable', {
        'namespace_name': 'devtable',
        'repository_name': 'simple',
    }, True, id='matching repository'),

    pytest.param('devtable', 'simple', 'devtable', {
        'performer_name': 'public',
    }, False, id='different performer'),

    pytest.param('devtable', 'simple', 'devtable', {
        'namespace_name': 'public',
    }, False, id='different namespace'),

    pytest.param('devtable', 'simple', 'devtable', {
        'namespace_name': 'devtable',
        'repository_name': 'complex',
    }, False, id='different repository'),
])
def test_logs(namespace_name, repo_name, performer_name, check_args, expect_results, logs_model):
    # Add some logs.
    kinds = list(LogEntryKind.select())
    user = model.user.get_user(performer_name)

    start_timestamp = datetime.utcnow()
    timestamp = start_timestamp

    for kind in kinds:
        for index in range(0, 3):
            logs_model.log_action(kind.name, namespace_name=namespace_name,
                                  repository_name=repo_name, performer=user,
                                  ip='1.2.3.4', timestamp=timestamp)
            timestamp = timestamp + timedelta(seconds=1)

    # Lookup the logs and verify the expected results.
    found = _lookup_logs(logs_model, start_timestamp, start_timestamp + timedelta(minutes=10),
                         **check_args)
    if expect_results:
        assert len(found) == len(kinds) * 3
    else:
        assert not found

    aggregated_counts = logs_model.get_aggregated_log_counts(
        start_timestamp, start_timestamp + timedelta(minutes=10), **check_args)
    if expect_results:
        assert len(aggregated_counts) == len(kinds)
        for ac in aggregated_counts:
            assert ac.count == 3
    else:
        assert not aggregated_counts


def test_count_repository_actions(logs_model):
    # Log some actions.
    logs_model.log_action('push_repo', namespace_name='devtable', repository_name='simple',
                          ip='1.2.3.4')

    logs_model.log_action('pull_repo', namespace_name='devtable', repository_name='simple',
                          ip='1.2.3.4')
    logs_model.log_action('pull_repo', namespace_name='devtable', repository_name='simple',
                          ip='1.2.3.4')

    # Log some actions to a different repo.
    logs_model.log_action('pull_repo', namespace_name='devtable', repository_name='complex',
                          ip='1.2.3.4')
    logs_model.log_action('pull_repo', namespace_name='devtable', repository_name='complex',
                          ip='1.2.3.4')

    # Count the actions.
    day = datetime.today() - timedelta(minutes=60)

    simple_repo = model.repository.get_repository('devtable', 'simple')
    count = logs_model.count_repository_actions(simple_repo, day)
    assert count == 3

    complex_repo = model.repository.get_repository('devtable', 'complex')
    count = logs_model.count_repository_actions(complex_repo, day)
    assert count == 2


@pytest.mark.skipif(os.environ.get('TEST_DATABASE_URI', '').find('mysql') >= 0,
                    reason='Flaky on MySQL')
def test_yield_logs_for_export(logs_model):
    # Add some logs.
    kinds = list(LogEntryKind.select())
    user = model.user.get_user('devtable')

    start_timestamp = datetime.utcnow()
    timestamp = start_timestamp

    for kind in kinds:
        for index in range(0, 10):
            logs_model.log_action(kind.name, namespace_name='devtable', repository_name='simple',
                                  performer=user, ip='1.2.3.4', timestamp=timestamp)
            timestamp = timestamp + timedelta(seconds=1)

    # Yield the logs.
    simple_repo = model.repository.get_repository('devtable', 'simple')
    logs_found = []
    for logs in logs_model.yield_logs_for_export(start_timestamp, timestamp + timedelta(minutes=10),
                                                 repository_id=simple_repo.id):
        logs_found.extend(logs)

    # Ensure we found all added logs.
    assert len(logs_found) == len(kinds) * 10


def test_yield_logs_for_export_timeout(logs_model):
    # Add some logs.
    kinds = list(LogEntryKind.select())
    user = model.user.get_user('devtable')

    start_timestamp = datetime.utcnow()
    timestamp = start_timestamp

    for kind in kinds:
        for _ in range(0, 2):
            logs_model.log_action(kind.name, namespace_name='devtable', repository_name='simple',
                                  performer=user, ip='1.2.3.4', timestamp=timestamp)
            timestamp = timestamp + timedelta(seconds=1)

    # Yield the logs. Since the max query time is set to zero, iteration should fail immediately.
    simple_repo = model.repository.get_repository('devtable', 'simple')
    with pytest.raises(LogsIterationTimeout):
        list(logs_model.yield_logs_for_export(start_timestamp, timestamp + timedelta(minutes=1),
                                              repository_id=simple_repo.id,
                                              max_query_time=timedelta(seconds=0)))