Add LogEntry3 table without the extra indexes and switch to writing to it

Joseph Schorr 2019-01-03 13:50:43 -05:00
parent b6db002729
commit cdb49dbfd3
12 changed files with 114 additions and 49 deletions


@@ -1039,6 +1039,26 @@ class LogEntry2(BaseModel):
)
class LogEntry3(BaseModel):
id = BigAutoField()
kind = ForeignKeyField(LogEntryKind)
account = IntegerField(db_column='account_id')
performer = IntegerField(null=True, db_column='performer_id')
repository = IntegerField(null=True, db_column='repository_id')
datetime = DateTimeField(default=datetime.now, index=True)
ip = CharField(null=True)
metadata_json = TextField(default='{}')
class Meta:
database = db
read_slaves = (read_slave,)
indexes = (
(('account', 'datetime'), False),
(('performer', 'datetime'), False),
(('repository', 'datetime', 'kind'), False),
)
class RepositoryActionCount(BaseModel):
repository = ForeignKeyField(Repository)
count = IntegerField()
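
For reference, the composite indexes declared on LogEntry3 line up with the read paths used later in this commit. Below is a minimal sketch of the kind of query the (repository, datetime, kind) index serves; it is purely illustrative and not part of the commit, and the repository id is a made-up placeholder.

from datetime import datetime, timedelta

from data.database import LogEntry3, LogEntryKind

# Hypothetical repository id; in Quay this would come from a Repository row.
some_repository_id = 1
pull_kind = LogEntryKind.get(name='pull_repo')
one_day_ago = datetime.now() - timedelta(days=1)

# Fetch yesterday's pulls for one repository from the new table; this walks the
# (repository, datetime, kind) composite index declared in Meta.indexes.
recent_pulls = (LogEntry3
                .select()
                .where(LogEntry3.repository == some_repository_id,
                       LogEntry3.datetime >= one_day_ago,
                       LogEntry3.kind == pull_kind))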


@@ -0,0 +1,42 @@
"""Add LogEntry3 table
Revision ID: 6ec8726c0ace
Revises: 54492a68a3cf
Create Date: 2019-01-03 13:41:02.897957
"""
# revision identifiers, used by Alembic.
revision = '6ec8726c0ace'
down_revision = '54492a68a3cf'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade(tables, tester):
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('logentry3',
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('kind_id', sa.Integer(), nullable=False),
sa.Column('account_id', sa.Integer(), nullable=False),
sa.Column('performer_id', sa.Integer(), nullable=True),
sa.Column('repository_id', sa.Integer(), nullable=True),
sa.Column('datetime', sa.DateTime(), nullable=False),
sa.Column('ip', sa.String(length=255), nullable=True),
sa.Column('metadata_json', sa.Text(), nullable=False),
sa.ForeignKeyConstraint(['kind_id'], ['logentrykind.id'], name=op.f('fk_logentry3_kind_id_logentrykind')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_logentry3'))
)
op.create_index('logentry3_account_id_datetime', 'logentry3', ['account_id', 'datetime'], unique=False)
op.create_index('logentry3_datetime', 'logentry3', ['datetime'], unique=False)
op.create_index('logentry3_kind_id', 'logentry3', ['kind_id'], unique=False)
op.create_index('logentry3_performer_id_datetime', 'logentry3', ['performer_id', 'datetime'], unique=False)
op.create_index('logentry3_repository_id_datetime_kind_id', 'logentry3', ['repository_id', 'datetime', 'kind_id'], unique=False)
# ### end Alembic commands ###
def downgrade(tables, tester):
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('logentry3')
# ### end Alembic commands ###
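
To confirm the table and its reduced index set after the migration has run, a quick check with SQLAlchemy's inspector might look like the sketch below. The connection URL is a placeholder and this snippet is not part of the commit.

import sqlalchemy as sa

# Placeholder URL; point this at a database that has been migrated to 6ec8726c0ace.
engine = sa.create_engine('mysql+pymysql://quay:quay@localhost/quay')
inspector = sa.inspect(engine)

assert 'logentry3' in inspector.get_table_names()
for index in inspector.get_indexes('logentry3'):
    print(index['name'], index['column_names'])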


@@ -7,7 +7,8 @@ from datetime import datetime, timedelta
from cachetools import lru_cache
import data
from data.database import LogEntry, LogEntryKind, User, RepositoryActionCount, db, LogEntry2
from data.database import (LogEntry, LogEntryKind, User, RepositoryActionCount, db, LogEntry2,
LogEntry3)
from data.model import config, user, DataModelException
logger = logging.getLogger(__name__)
@@ -16,9 +17,9 @@ ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING = ['pull_repo']
def _logs_query(selections, start_time=None, end_time=None, performer=None, repository=None,
namespace=None, ignore=None, model=LogEntry2, id_range=None):
namespace=None, ignore=None, model=LogEntry3, id_range=None):
""" Returns a query for selecting logs from the table, with various options and filters. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
assert (start_time is not None and end_time is not None) or (id_range is not None)
joined = (model.select(*selections).switch(model))
@@ -64,9 +65,9 @@ def _get_log_entry_kind(name):
def get_aggregated_logs(start_time, end_time, performer=None, repository=None, namespace=None,
ignore=None, model=LogEntry2):
ignore=None, model=LogEntry3):
""" Returns the count of logs, by kind and day, for the logs matching the given filters. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
date = db.extract_date('day', model.datetime)
selections = [model.kind, date.alias('day'), fn.Count(model.id).alias('count')]
query = _logs_query(selections, start_time, end_time, performer, repository, namespace, ignore,
@@ -75,9 +76,9 @@ def get_aggregated_logs(start_time, end_time, performer=None, repository=None, n
def get_logs_query(start_time=None, end_time=None, performer=None, repository=None, namespace=None,
ignore=None, model=LogEntry2, id_range=None):
ignore=None, model=LogEntry3, id_range=None):
""" Returns the logs matching the given filters. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
Performer = User.alias()
Account = User.alias()
selections = [model, Performer]
@@ -137,7 +138,7 @@ def log_action(kind_name, user_or_organization_name, performer=None, repository=
}
try:
LogEntry2.create(**log_data)
LogEntry3.create(**log_data)
except PeeweeException as ex:
strict_logging_disabled = config.app_config.get('ALLOW_PULLS_WITHOUT_STRICT_LOGGING')
if strict_logging_disabled and kind_name in ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING:
@@ -148,7 +149,7 @@ def log_action(kind_name, user_or_organization_name, performer=None, repository=
def get_stale_logs_start_id(model):
""" Gets the oldest log entry. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
try:
return (model.select(model.id).order_by(model.id).limit(1).tuples())[0][0]
except IndexError:
@@ -157,7 +158,7 @@ def get_stale_logs_start_id(model):
def get_stale_logs_cutoff_id(cutoff_date, model):
""" Gets the most recent ID created before the cutoff_date. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
try:
return (model.select(fn.Max(model.id)).where(model.datetime <= cutoff_date)
.tuples())[0][0]
@@ -167,13 +168,13 @@ def get_stale_logs_cutoff_id(cutoff_date, model):
def get_stale_logs(start_id, end_id, model):
""" Returns all the logs with IDs between start_id and end_id inclusively. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
return model.select().where((model.id >= start_id), (model.id <= end_id))
def delete_stale_logs(start_id, end_id, model):
""" Deletes all the logs with IDs between start_id and end_id. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
model.delete().where((model.id >= start_id), (model.id <= end_id)).execute()
@@ -205,7 +206,7 @@ def get_repositories_action_sums(repository_ids):
return action_count_map
def get_minimum_id_for_logs(start_time, repository_id=None, namespace_id=None, model=LogEntry2):
def get_minimum_id_for_logs(start_time, repository_id=None, namespace_id=None, model=LogEntry3):
""" Returns the minimum ID for logs matching the given repository or namespace in
the logs table, starting at the given start time.
"""
@@ -221,7 +222,7 @@ def get_minimum_id_for_logs(start_time, repository_id=None, namespace_id=None, m
model=model)
def get_maximum_id_for_logs(end_time, repository_id=None, namespace_id=None, model=LogEntry2):
def get_maximum_id_for_logs(end_time, repository_id=None, namespace_id=None, model=LogEntry3):
""" Returns the maximum ID for logs matching the given repository or namespace in
the logs table, ending at the given end time.
"""
@@ -238,7 +239,7 @@ def get_maximum_id_for_logs(end_time, repository_id=None, namespace_id=None, mod
def _get_bounded_id(fn, filter_clause, repository_id, namespace_id, reduction_clause=None,
model=LogEntry2):
model=LogEntry3):
assert (namespace_id is not None) or (repository_id is not None)
query = (model
.select(fn(model.id))
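
Note that only the defaults change in this file; every helper still accepts an explicit model, so callers can keep reading from the older tables while the migration is in flight. Roughly, as an illustrative sketch that is not part of the diff:

from datetime import datetime, timedelta

from data.database import LogEntry2
from data.model import log

start = datetime.now() - timedelta(days=7)
end = datetime.now()

# Reads now default to LogEntry3...
recent = log.get_logs_query(start_time=start, end_time=end)

# ...but the previous table can still be queried explicitly during the transition.
older = log.get_logs_query(start_time=start, end_time=end, model=LogEntry2)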


@@ -4,7 +4,7 @@ from collections import namedtuple
from peewee import IntegrityError
from datetime import date, timedelta, datetime
from data.database import (Repository, LogEntry, LogEntry2, RepositoryActionCount,
from data.database import (Repository, LogEntry, LogEntry2, LogEntry3, RepositoryActionCount,
RepositorySearchScore, db_random_func, fn)
logger = logging.getLogger(__name__)
@@ -52,7 +52,7 @@ def count_repository_actions(to_count):
today = date.today()
yesterday = today - timedelta(days=1)
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
def lookup_action_count(model):
return (model
.select()
@@ -61,7 +61,8 @@ def count_repository_actions(to_count):
model.datetime < today)
.count())
actions = lookup_action_count(LogEntry) + lookup_action_count(LogEntry2)
actions = (lookup_action_count(LogEntry3) + lookup_action_count(LogEntry2) +
lookup_action_count(LogEntry))
try:
RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions)
return True
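
Since all three tables expose the same repository and datetime fields used by lookup_action_count, the triple sum could equally be written as a loop over the models, mirroring the pattern the log rotation worker uses further down; a small sketch:

actions = sum(lookup_action_count(model)
              for model in (LogEntry3, LogEntry2, LogEntry))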


@@ -1,6 +1,6 @@
import pytest
from data.database import LogEntry2, User
from data.database import LogEntry3, User
from data.model import config as _config
from data.model.log import log_action
@@ -21,8 +21,8 @@ def logentry_kind():
@pytest.fixture()
def logentry(logentry_kind):
with patch('data.database.LogEntry2.create', spec=True):
yield LogEntry2
with patch('data.database.LogEntry3.create', spec=True):
yield LogEntry3
@pytest.fixture()
def user():


@@ -50,7 +50,7 @@ class PreOCIModel(LogEntryDataInterface):
if performer_name:
performer = model.user.get_user(performer_name)
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
def get_logs(m):
logs_query = model.log.get_logs_query(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
@@ -62,19 +62,19 @@
max_page=app.config['ACTION_LOG_MAX_PAGE'])
return LogEntryPage([create_log(log) for log in logs], next_page_token)
# First check the LogEntry2 table for the most recent logs, unless we've been expressly told
# to look inside the "second" table.
TOKEN_TABLE_KEY2 = 'ttk2'
is_temp_table = page_token is not None and page_token.get(TOKEN_TABLE_KEY2) == 1
if is_temp_table:
page_result = get_logs(database.LogEntry)
else:
page_result = get_logs(database.LogEntry2)
# First check the LogEntry3 table for the most recent logs, unless we've been expressly told
# to look inside the other tables.
TOKEN_TABLE_ID = 'tti'
tables = [database.LogEntry3, database.LogEntry2, database.LogEntry]
if page_result.next_page_token is None and not is_temp_table:
page_result = page_result._replace(next_page_token={TOKEN_TABLE_KEY2: 1})
elif is_temp_table and page_result.next_page_token is not None:
page_result.next_page_token[TOKEN_TABLE_KEY2] = 1
table_index = 0
table_specified = page_token is not None and page_token.get(TOKEN_TABLE_ID) is not None
if table_specified:
table_index = page_token.get(TOKEN_TABLE_ID)
page_result = get_logs(tables[table_index])
if page_result.next_page_token is None and table_index < len(tables) - 1:
page_result = page_result._replace(next_page_token={TOKEN_TABLE_ID: table_index + 1})
return page_result
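
The page token now carries a table index ('tti') rather than a one-shot flag, so pagination falls through LogEntry3, then LogEntry2, then LogEntry as each table is exhausted: the first request reads LogEntry3 with no token, and when a table has no further pages the returned token simply points at the next table. A rough sketch of a caller draining all three tables this way; get_logs_page is a hypothetical wrapper around this method, not part of the commit:

def fetch_all_logs(get_logs_page):
    """ Drain every page across all log tables.

    get_logs_page(page_token) -> (logs, next_page_token) is a hypothetical helper;
    the token it returns is either None or a dict such as {'tti': 1}.
    """
    logs, page_token = [], None
    while True:
        page, page_token = get_logs_page(page_token)
        logs.extend(page)
        if page_token is None:
            return logs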
@@ -97,16 +97,19 @@ class PreOCIModel(LogEntryDataInterface):
if performer_name:
performer = model.user.get_user(performer_name)
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
# TODO(LogMigrate): Remove the branch once we're back on a single table.
aggregated_logs = model.log.get_aggregated_logs(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore, model=database.LogEntry)
aggregated_logs_2 = model.log.get_aggregated_logs(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore, model=database.LogEntry2)
aggregated_logs_3 = model.log.get_aggregated_logs(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore, model=database.LogEntry3)
entries = {}
for log in itertools.chain(aggregated_logs, aggregated_logs_2):
for log in itertools.chain(aggregated_logs, aggregated_logs_2, aggregated_logs_3):
key = '%s-%s' % (log.kind_id, log.day)
if key in entries:
entries[key] = AggregatedLogEntry(log.count + entries[key].count, log.kind_id, log.day)
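
Merging on the '%s-%s' % (kind_id, day) key just adds the per-table counts together, which is why the test further down moves its expectation from 18 to 21 once a third table contributes. A tiny worked example with the values used in that test:

# Counts for kind_id=4 on day '1' from LogEntry, LogEntry2 and LogEntry3 respectively.
per_table_counts = [6, 12, 3]
assert sum(per_table_counts) == 21  # matches the updated assertion below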


@@ -134,8 +134,6 @@ class SuperUserAggregateLogs(ApiResource):
if SuperUserPermission().can():
(start_time, end_time) = _validate_logs_arguments(parsed_args['starttime'],
parsed_args['endtime'])
# TODO(LogMigrate): Change to a unified log lookup util lib once we're back on LogEntry only.
aggregated_logs = log_model.get_aggregated_logs(start_time, end_time)
kinds = log_model.get_log_entry_kinds()
return {
@@ -166,9 +164,8 @@ class SuperUserLogs(ApiResource):
if SuperUserPermission().can():
start_time = parsed_args['starttime']
end_time = parsed_args['endtime']
(start_time, end_time) = _validate_logs_arguments(start_time, end_time)
# TODO(LogMigrate): Change to a unified log lookup util lib once we're back on LogEntry only.
(start_time, end_time) = _validate_logs_arguments(start_time, end_time)
log_page = log_model.get_logs_query(start_time, end_time, page_token=page_token)
kinds = log_model.get_log_entry_kinds()
return {


@@ -111,7 +111,8 @@ def test_does_repo_exist_returns_true(monkeypatch):
def test_get_aggregated_logs(monkeypatch):
get_aggregated_logs_mock = Mock()
get_aggregated_logs_mock.side_effect = [[AttrDict({'day': '1', 'kind_id': 4, 'count': 6})],
[AttrDict({'day': '1', 'kind_id': 4, 'count': 12})]]
[AttrDict({'day': '1', 'kind_id': 4, 'count': 12})],
[AttrDict({'day': '1', 'kind_id': 4, 'count': 3})]]
monkeypatch.setattr(model.log, 'get_aggregated_logs', get_aggregated_logs_mock)
repo_mock = Mock()
@@ -127,4 +128,4 @@ def test_get_aggregated_logs(monkeypatch):
actual = pre_oci_model.get_aggregated_logs('start_time', 'end_time', 'performer_name', 'repository_name',
'namespace_name', set())
assert actual == [AggregatedLogEntry(18, 4, '1')]
assert actual == [AggregatedLogEntry(21, 4, '1')]


@@ -920,7 +920,7 @@ def populate_database(minimal=False, with_storage=False):
model.repositoryactioncount.update_repository_score(to_count)
WHITELISTED_EMPTY_MODELS = ['DeletedNamespace', 'LogEntry', 'ManifestChild',
WHITELISTED_EMPTY_MODELS = ['DeletedNamespace', 'LogEntry', 'LogEntry2', 'ManifestChild',
'NamespaceGeoRestriction']
def find_models_missing_data():


@@ -3,7 +3,7 @@ import time
import socket
from contextlib import contextmanager
from data.database import LogEntryKind, LogEntry2
from data.database import LogEntryKind, LogEntry3
class assert_action_logged(object):
""" Specialized assertion for ensuring that a log entry of a particular kind was added under the
@@ -14,7 +14,7 @@ class assert_action_logged(object):
self.existing_count = 0
def _get_log_count(self):
return LogEntry2.select().where(LogEntry2.kind == LogEntryKind.get(name=self.log_kind)).count()
return LogEntry3.select().where(LogEntry3.kind == LogEntryKind.get(name=self.log_kind)).count()
def __enter__(self):
self.existing_count = self._get_log_count()
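
For context, the helper is used as a context manager wrapped around an action that should emit a log entry of the given kind; typical usage looks roughly like the sketch below (do_push stands in for whatever triggers the action, and the constructor argument is assumed to be the kind name):

with assert_action_logged('push_repo'):
    do_push()  # hypothetical action expected to create a 'push_repo' row in LogEntry3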


@@ -8,7 +8,7 @@ from tempfile import SpooledTemporaryFile
import features
from app import app, storage
from data.database import UseThenDisconnect, LogEntry, LogEntry2
from data.database import UseThenDisconnect, LogEntry, LogEntry2, LogEntry3
from data.model.log import (get_stale_logs, get_stale_logs_start_id,
get_stale_logs_cutoff_id, delete_stale_logs)
from data.userfiles import DelegateUserfiles
@@ -36,8 +36,8 @@ class LogRotateWorker(Worker):
self.add_operation(self._archive_logs, WORKER_FREQUENCY)
def _archive_logs(self):
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
models = [LogEntry, LogEntry2]
# TODO(LogMigrate): Remove the branch once we're back on a single table.
models = [LogEntry, LogEntry2, LogEntry3]
for model in models:
self._archive_logs_for_model(model)


@@ -59,7 +59,7 @@ def test_process_queue_item(namespace, repo_name, expects_logs, app):
created = storage.get_content(storage.preferred_locations, 'exportedactionlogs/' + storage_id)
created_json = json.loads(created)
expected_count = database.LogEntry2.select().where(database.LogEntry2.repository == repo).count()
expected_count = database.LogEntry3.select().where(database.LogEntry3.repository == repo).count()
assert (expected_count > 1) == expects_logs
assert created_json['export_id'] == 'someid'