This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/logs_model/test/test_logs_interface.py
2019-02-13 17:08:31 -05:00

172 lines
6.2 KiB
Python

from datetime import datetime, timedelta
from data.logs_model.table_logs_model import TableLogsModel
from data.logs_model.interface import LogsIterationTimeout
from data.database import LogEntry, LogEntry2, LogEntry3, LogEntryKind
from data import model
from test.fixtures import *
@pytest.fixture()
def clear_db_logs(initialized_db):
LogEntry.delete().execute()
LogEntry2.delete().execute()
LogEntry3.delete().execute()
@pytest.fixture(params=[TableLogsModel()])
def logs_model(request, clear_db_logs):
return request.param
def _lookup_logs(logs_model, start_time, end_time, **kwargs):
logs_found = []
page_token = None
while True:
found = logs_model.lookup_logs(start_time, end_time, page_token=page_token, **kwargs)
logs_found.extend(found.logs)
page_token = found.next_page_token
if not found.logs:
break
assert len(logs_found) == len(set(logs_found))
return logs_found
@pytest.mark.skipif(os.environ.get('TEST_DATABASE_URI', '').find('mysql') >= 0,
reason='Flaky on MySQL')
@pytest.mark.parametrize('namespace_name, repo_name, performer_name, check_args, expect_results', [
pytest.param('devtable', 'simple', 'devtable', {}, True, id='no filters'),
pytest.param('devtable', 'simple', 'devtable', {
'performer_name': 'devtable',
}, True, id='matching performer'),
pytest.param('devtable', 'simple', 'devtable', {
'namespace_name': 'devtable',
}, True, id='matching namespace'),
pytest.param('devtable', 'simple', 'devtable', {
'namespace_name': 'devtable',
'repository_name': 'simple',
}, True, id='matching repository'),
pytest.param('devtable', 'simple', 'devtable', {
'performer_name': 'public',
}, False, id='different performer'),
pytest.param('devtable', 'simple', 'devtable', {
'namespace_name': 'public',
}, False, id='different namespace'),
pytest.param('devtable', 'simple', 'devtable', {
'namespace_name': 'devtable',
'repository_name': 'complex',
}, False, id='different repository'),
])
def test_logs(namespace_name, repo_name, performer_name, check_args, expect_results, logs_model):
# Add some logs.
kinds = list(LogEntryKind.select())
user = model.user.get_user(performer_name)
start_timestamp = datetime.utcnow()
timestamp = start_timestamp
for kind in kinds:
for index in range(0, 3):
logs_model.log_action(kind.name, namespace_name=namespace_name, repository_name=repo_name,
performer=user, ip='1.2.3.4', timestamp=timestamp)
timestamp = timestamp + timedelta(seconds=1)
found = _lookup_logs(logs_model, start_timestamp, start_timestamp + timedelta(minutes=10),
**check_args)
if expect_results:
assert len(found) == len(kinds) * 3
else:
assert not found
aggregated_counts = logs_model.get_aggregated_log_counts(start_timestamp,
start_timestamp + timedelta(minutes=10),
**check_args)
if expect_results:
assert len(aggregated_counts) == len(kinds)
for ac in aggregated_counts:
assert ac.count == 3
else:
assert not aggregated_counts
def test_count_repository_actions(logs_model):
# Log some actions.
logs_model.log_action('push_repo', namespace_name='devtable', repository_name='simple',
ip='1.2.3.4')
logs_model.log_action('pull_repo', namespace_name='devtable', repository_name='simple',
ip='1.2.3.4')
logs_model.log_action('pull_repo', namespace_name='devtable', repository_name='simple',
ip='1.2.3.4')
# Log some actions to a different repo.
logs_model.log_action('pull_repo', namespace_name='devtable', repository_name='complex',
ip='1.2.3.4')
logs_model.log_action('pull_repo', namespace_name='devtable', repository_name='complex',
ip='1.2.3.4')
# Count the actions.
day = datetime.today() - timedelta(minutes=60)
simple_repo = model.repository.get_repository('devtable', 'simple')
count = logs_model.count_repository_actions(simple_repo, day)
assert count == 3
complex_repo = model.repository.get_repository('devtable', 'complex')
count = logs_model.count_repository_actions(complex_repo, day)
assert count == 2
@pytest.mark.skipif(os.environ.get('TEST_DATABASE_URI', '').find('mysql') >= 0,
reason='Flaky on MySQL')
def test_yield_logs_for_export(logs_model):
# Add some logs.
kinds = list(LogEntryKind.select())
user = model.user.get_user('devtable')
start_timestamp = datetime.utcnow()
timestamp = start_timestamp
for kind in kinds:
for index in range(0, 10):
logs_model.log_action(kind.name, namespace_name='devtable', repository_name='simple',
performer=user, ip='1.2.3.4', timestamp=timestamp)
timestamp = timestamp + timedelta(seconds=1)
# Yield the logs.
simple_repo = model.repository.get_repository('devtable', 'simple')
logs_found = []
for logs in logs_model.yield_logs_for_export(start_timestamp, timestamp + timedelta(minutes=10),
repository_id=simple_repo.id):
logs_found.extend(logs)
# Ensure we found all added logs.
assert len(logs_found) == len(kinds) * 10
def test_yield_logs_for_export_timeout(logs_model):
# Add some logs.
kinds = list(LogEntryKind.select())
user = model.user.get_user('devtable')
start_timestamp = datetime.utcnow()
timestamp = start_timestamp
for kind in kinds:
for _ in range(0, 2):
logs_model.log_action(kind.name, namespace_name='devtable', repository_name='simple',
performer=user, ip='1.2.3.4', timestamp=timestamp)
timestamp = timestamp + timedelta(seconds=1)
# Yield the logs. Since we set the timeout to nothing, it should immediately fail.
simple_repo = model.repository.get_repository('devtable', 'simple')
with pytest.raises(LogsIterationTimeout):
list(logs_model.yield_logs_for_export(start_timestamp, timestamp + timedelta(minutes=1),
repository_id=simple_repo.id,
max_query_time=timedelta(seconds=0)))