Temporarily change to storing logs in a new LogEntry2 table

This will prevent us from running out of auto-incrementing ID values until such time as we can upgrade to peewee 3 and change the field type to a BigInt

Fixes https://jira.coreos.com/browse/QUAY-943
This commit is contained in:
Joseph Schorr 2018-05-18 12:54:38 -04:00
parent 66b4e45929
commit a007332d4c
13 changed files with 201 additions and 113 deletions

View file

@ -981,6 +981,27 @@ class LogEntry(BaseModel):
)
class LogEntry2(BaseModel):
""" TEMP FOR QUAY.IO ONLY. DO NOT RELEASE INTO QUAY ENTERPRISE. """
kind = ForeignKeyField(LogEntryKind)
account = IntegerField(index=True, db_column='account_id')
performer = IntegerField(index=True, null=True, db_column='performer_id')
repository = IntegerField(index=True, null=True, db_column='repository_id')
datetime = DateTimeField(default=datetime.now, index=True)
ip = CharField(null=True)
metadata_json = TextField(default='{}')
class Meta:
database = db
read_slaves = (read_slave,)
indexes = (
(('account', 'datetime'), False),
(('performer', 'datetime'), False),
(('repository', 'datetime'), False),
(('repository', 'datetime', 'kind'), False),
)
class RepositoryActionCount(BaseModel):
repository = ForeignKeyField(Repository)
count = IntegerField()

View file

@ -0,0 +1,46 @@
"""Add LogEntry2 table - QUAY.IO ONLY
Revision ID: 1783530bee68
Revises: 5b7503aada1b
Create Date: 2018-05-17 16:32:28.532264
"""
# revision identifiers, used by Alembic.
revision = '1783530bee68'
down_revision = '5b7503aada1b'
from alembic import op
import sqlalchemy as sa
def upgrade(tables):
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('logentry2',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('kind_id', sa.Integer(), nullable=False),
sa.Column('account_id', sa.Integer(), nullable=False),
sa.Column('performer_id', sa.Integer(), nullable=True),
sa.Column('repository_id', sa.Integer(), nullable=True),
sa.Column('datetime', sa.DateTime(), nullable=False),
sa.Column('ip', sa.String(length=255), nullable=True),
sa.Column('metadata_json', sa.Text(), nullable=False),
sa.ForeignKeyConstraint(['kind_id'], ['logentrykind.id'], name=op.f('fk_logentry2_kind_id_logentrykind')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_logentry2'))
)
op.create_index('logentry2_account_id', 'logentry2', ['account_id'], unique=False)
op.create_index('logentry2_account_id_datetime', 'logentry2', ['account_id', 'datetime'], unique=False)
op.create_index('logentry2_datetime', 'logentry2', ['datetime'], unique=False)
op.create_index('logentry2_kind_id', 'logentry2', ['kind_id'], unique=False)
op.create_index('logentry2_performer_id', 'logentry2', ['performer_id'], unique=False)
op.create_index('logentry2_performer_id_datetime', 'logentry2', ['performer_id', 'datetime'], unique=False)
op.create_index('logentry2_repository_id', 'logentry2', ['repository_id'], unique=False)
op.create_index('logentry2_repository_id_datetime', 'logentry2', ['repository_id', 'datetime'], unique=False)
op.create_index('logentry2_repository_id_datetime_kind_id', 'logentry2', ['repository_id', 'datetime', 'kind_id'], unique=False)
# ### end Alembic commands ###
def downgrade(tables):
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('logentry2')
# ### end Alembic commands ###

View file

@ -7,7 +7,7 @@ from datetime import datetime, timedelta
from cachetools import lru_cache
import data
from data.database import LogEntry, LogEntryKind, User, RepositoryActionCount, db
from data.database import LogEntry, LogEntry2, LogEntryKind, User, RepositoryActionCount, db
from data.model import config, user, DataModelException
logger = logging.getLogger(__name__)
@ -16,27 +16,29 @@ ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING = ['pull_repo']
def _logs_query(selections, start_time, end_time, performer=None, repository=None, namespace=None,
ignore=None):
joined = (LogEntry.select(*selections).switch(LogEntry)
.where(LogEntry.datetime >= start_time, LogEntry.datetime < end_time))
ignore=None, model=LogEntry):
""" Returns a query for selecting logs from the table, with various options and filters. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
joined = (model.select(*selections).switch(model)
.where(model.datetime >= start_time, model.datetime < end_time))
if repository:
joined = joined.where(LogEntry.repository == repository)
joined = joined.where(model.repository == repository)
if performer:
joined = joined.where(LogEntry.performer == performer)
joined = joined.where(model.performer == performer)
if namespace and not repository:
namespace_user = user.get_user_or_org(namespace)
if namespace_user is None:
raise DataModelException('Invalid namespace requested')
joined = joined.where(LogEntry.account == namespace_user.id)
joined = joined.where(model.account == namespace_user.id)
if ignore:
kind_map = get_log_entry_kinds()
ignore_ids = [kind_map[kind_name] for kind_name in ignore]
joined = joined.where(~(LogEntry.kind << ignore_ids))
joined = joined.where(~(model.kind << ignore_ids))
return joined
@ -57,29 +59,35 @@ def _get_log_entry_kind(name):
def get_aggregated_logs(start_time, end_time, performer=None, repository=None, namespace=None,
ignore=None):
date = db.extract_date('day', LogEntry.datetime)
selections = [LogEntry.kind, date.alias('day'), fn.Count(LogEntry.id).alias('count')]
query = _logs_query(selections, start_time, end_time, performer, repository, namespace, ignore)
return query.group_by(date, LogEntry.kind)
ignore=None, model=LogEntry):
""" Returns the count of logs, by kind and day, for the logs matching the given filters. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
date = db.extract_date('day', model.datetime)
selections = [model.kind, date.alias('day'), fn.Count(model.id).alias('count')]
query = _logs_query(selections, start_time, end_time, performer, repository, namespace, ignore,
model=model)
return query.group_by(date, model.kind)
def get_logs_query(start_time, end_time, performer=None, repository=None, namespace=None,
ignore=None):
ignore=None, model=LogEntry):
""" Returns the logs matching the given filters. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
Performer = User.alias()
Account = User.alias()
selections = [LogEntry, Performer]
selections = [model, Performer]
if namespace is None and repository is None:
selections.append(Account)
query = _logs_query(selections, start_time, end_time, performer, repository, namespace, ignore)
query = (query.switch(LogEntry).join(Performer, JOIN_LEFT_OUTER,
on=(LogEntry.performer == Performer.id).alias('performer')))
query = _logs_query(selections, start_time, end_time, performer, repository, namespace, ignore,
model=model)
query = (query.switch(model).join(Performer, JOIN_LEFT_OUTER,
on=(model.performer == Performer.id).alias('performer')))
if namespace is None and repository is None:
query = (query.switch(LogEntry).join(Account, JOIN_LEFT_OUTER,
on=(LogEntry.account == Account.id).alias('account')))
query = (query.switch(model).join(Account, JOIN_LEFT_OUTER,
on=(model.account == Account.id).alias('account')))
return query
@ -93,6 +101,7 @@ def _json_serialize(obj):
def log_action(kind_name, user_or_organization_name, performer=None, repository=None, ip=None,
metadata={}, timestamp=None):
""" Logs an entry in the LogEntry2 table. """
if not timestamp:
timestamp = datetime.today()
@ -123,7 +132,7 @@ def log_action(kind_name, user_or_organization_name, performer=None, repository=
}
try:
LogEntry.create(**log_data)
LogEntry2.create(**log_data)
except PeeweeException as ex:
strict_logging_disabled = config.app_config.get('ALLOW_PULLS_WITHOUT_STRICT_LOGGING')
if strict_logging_disabled and kind_name in ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING:
@ -132,39 +141,47 @@ def log_action(kind_name, user_or_organization_name, performer=None, repository=
raise
def get_stale_logs_start_id():
def get_stale_logs_start_id(model):
""" Gets the oldest log entry. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
try:
return (LogEntry.select(LogEntry.id).order_by(LogEntry.id).limit(1).tuples())[0][0]
return (model.select(model.id).order_by(model.id).limit(1).tuples())[0][0]
except IndexError:
return None
def get_stale_logs_cutoff_id(cutoff_date):
def get_stale_logs_cutoff_id(cutoff_date, model):
""" Gets the most recent ID created before the cutoff_date. """
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
try:
return (LogEntry.select(fn.Max(LogEntry.id)).where(LogEntry.datetime <= cutoff_date)
return (model.select(fn.Max(model.id)).where(model.datetime <= cutoff_date)
.tuples())[0][0]
except IndexError:
return None
def get_stale_logs(start_id, end_id):
def get_stale_logs(start_id, end_id, model):
""" Returns all the logs with IDs between start_id and end_id inclusively. """
return LogEntry.select().where((LogEntry.id >= start_id), (LogEntry.id <= end_id))
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
return model.select().where((model.id >= start_id), (model.id <= end_id))
def delete_stale_logs(start_id, end_id):
def delete_stale_logs(start_id, end_id, model):
""" Deletes all the logs with IDs between start_id and end_id. """
LogEntry.delete().where((LogEntry.id >= start_id), (LogEntry.id <= end_id)).execute()
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
model.delete().where((model.id >= start_id), (model.id <= end_id)).execute()
def get_repository_action_counts(repo, start_date):
""" Returns the daily aggregated action counts for the given repository, starting at the given
start date.
"""
return RepositoryActionCount.select().where(RepositoryActionCount.repository == repo,
RepositoryActionCount.date >= start_date)
def get_repositories_action_sums(repository_ids):
""" Returns a map from repository ID to total actions within that repository in the last week. """
if not repository_ids:
return {}

View file

@ -4,8 +4,8 @@ from collections import namedtuple
from peewee import IntegrityError
from datetime import date, timedelta, datetime
from data.database import (Repository, LogEntry, RepositoryActionCount, RepositorySearchScore,
db_random_func, fn)
from data.database import (Repository, LogEntry, LogEntry2, RepositoryActionCount,
RepositorySearchScore, db_random_func, fn)
logger = logging.getLogger(__name__)
@ -52,13 +52,16 @@ def count_repository_actions(to_count):
today = date.today()
yesterday = today - timedelta(days=1)
actions = (LogEntry
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
def lookup_action_count(model):
return (model
.select()
.where(LogEntry.repository == to_count,
LogEntry.datetime >= yesterday,
LogEntry.datetime < today)
.where(model.repository == to_count,
model.datetime >= yesterday,
model.datetime < today)
.count())
actions = lookup_action_count(LogEntry) + lookup_action_count(LogEntry2)
try:
RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions)
return True

View file

@ -1,6 +1,6 @@
import pytest
from data.database import LogEntry, User
from data.database import LogEntry, LogEntry2, User
from data.model import config as _config
from data.model.log import log_action
@ -21,8 +21,8 @@ def logentry_kind():
@pytest.fixture()
def logentry(logentry_kind):
with patch('data.database.LogEntry.create', spec=True):
yield LogEntry
with patch('data.database.LogEntry2.create', spec=True):
yield LogEntry2
@pytest.fixture()
def user():

View file

@ -1,3 +1,5 @@
import itertools
from data import model, database
from endpoints.api.logs_models_interface import LogEntryDataInterface, LogEntryPage, LogEntry, AggregatedLogEntry
@ -47,15 +49,33 @@ class PreOCIModel(LogEntryDataInterface):
if performer_name:
performer = model.user.get_user(performer_name)
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
def get_logs(m):
logs_query = model.log.get_logs_query(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore)
logs, next_page_token = model.modelutil.paginate(logs_query, database.LogEntry, descending=True,
page_token=page_token, limit=20)
ignore=ignore, model=m)
logs, next_page_token = model.modelutil.paginate(logs_query, m,
descending=True, page_token=page_token,
limit=20)
return LogEntryPage([_create_log(log) for log in logs], next_page_token)
# First check the LogEntry2 table for the most recent logs, unless we've been expressly told
# to look inside the first table.
TOKEN_TABLE_KEY = 'ttk'
is_old_table = page_token is not None and page_token.get(TOKEN_TABLE_KEY) == 1
if is_old_table:
page_result = get_logs(database.LogEntry)
else:
page_result = get_logs(database.LogEntry2)
if page_result.next_page_token is None and not is_old_table:
page_result = page_result._replace(next_page_token={TOKEN_TABLE_KEY: 1})
elif is_old_table and page_result.next_page_token is not None:
page_result.next_page_token[TOKEN_TABLE_KEY] = 1
return page_result
def get_log_entry_kinds(self):
return model.log.get_log_entry_kinds()
@ -75,10 +95,23 @@ class PreOCIModel(LogEntryDataInterface):
if performer_name:
performer = model.user.get_user(performer_name)
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
aggregated_logs = model.log.get_aggregated_logs(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore)
return [AggregatedLogEntry(log.count, log.kind_id, log.day) for log in aggregated_logs]
ignore=ignore, model=database.LogEntry)
aggregated_logs_2 = model.log.get_aggregated_logs(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore, model=database.LogEntry2)
entries = {}
for log in itertools.chain(aggregated_logs, aggregated_logs_2):
key = '%s-%s' % (log.kind_id, log.day)
if key in entries:
entries[key] = AggregatedLogEntry(log.count + entries[key].count, log.kind_id, log.day)
else:
entries[key] = AggregatedLogEntry(log.count, log.kind_id, log.day)
return entries.values()
pre_oci_model = PreOCIModel()

View file

@ -27,6 +27,7 @@ from endpoints.api.build import get_logs_or_log_url
from endpoints.api.superuser_models_pre_oci import (pre_oci_model, ServiceKeyDoesNotExist,
ServiceKeyAlreadyApproved,
InvalidRepositoryBuildException)
from endpoints.api.logs_models_pre_oci import pre_oci_model as log_model
from util.useremails import send_confirmation_email, send_recovery_email
from util.security.ssl import load_certificate, CertInvalidException
from util.config.validator import EXTRA_CA_DIRECTORY
@ -137,10 +138,12 @@ class SuperUserAggregateLogs(ApiResource):
if SuperUserPermission().can():
(start_time, end_time) = _validate_logs_arguments(parsed_args['starttime'],
parsed_args['endtime'])
aggregated_logs = pre_oci_model.get_aggregated_logs(start_time, end_time)
# TODO(LogMigrate): Change to a unified log lookup util lib once we're back on LogEntry only.
aggregated_logs = log_model.get_aggregated_logs(start_time, end_time)
kinds = log_model.get_log_entry_kinds()
return {
'aggregated': [log.to_dict() for log in aggregated_logs]
'aggregated': [log.to_dict(kinds, start_time) for log in aggregated_logs]
}
raise Unauthorized()
@ -168,12 +171,14 @@ class SuperUserLogs(ApiResource):
start_time = parsed_args['starttime']
end_time = parsed_args['endtime']
(start_time, end_time) = _validate_logs_arguments(start_time, end_time)
log_page = pre_oci_model.get_logs_query(start_time, end_time, page_token=page_token)
# TODO(LogMigrate): Change to a unified log lookup util lib once we're back on LogEntry only.
log_page = log_model.get_logs_query(start_time, end_time, page_token=page_token)
kinds = log_model.get_log_entry_kinds()
return {
'start_time': format_date(start_time),
'end_time': format_date(end_time),
'logs': [log.to_dict() for log in log_page.logs],
'logs': [log.to_dict(kinds, include_namespace=True) for log in log_page.logs],
}, log_page.next_page_token
raise Unauthorized()

View file

@ -22,7 +22,7 @@ def test_get_logs_query(monkeypatch):
monkeypatch.setattr(model.modelutil, 'paginate', paginate_mock)
assert pre_oci_model.get_logs_query('start_time', 'end_time', 'preformer_namne', 'repository_name', 'namespace_name',
set(), 'page_token') == LogEntryPage([], {})
set(), None) == LogEntryPage([], {})
def test_get_logs_query_returns_list_log_entries(monkeypatch):
@ -52,6 +52,7 @@ def test_get_logs_query_returns_list_log_entries(monkeypatch):
False, 'account_username', 'account_email', False, 1)], {'key': 'value'})
@pytest.mark.skip('Turned off until we move back to a single LogEntry table')
def test_get_logs_query_calls_get_repository(monkeypatch):
repo_mock = Mock()
performer_mock = Mock()
@ -109,7 +110,8 @@ def test_does_repo_exist_returns_true(monkeypatch):
def test_get_aggregated_logs(monkeypatch):
get_aggregated_logs_mock = Mock()
get_aggregated_logs_mock.return_value = [AttrDict({'day': '1', 'kind_id': 4, 'count': 12})]
get_aggregated_logs_mock.side_effect = [[AttrDict({'day': '1', 'kind_id': 4, 'count': 6})],
[AttrDict({'day': '1', 'kind_id': 4, 'count': 12})]]
monkeypatch.setattr(model.log, 'get_aggregated_logs', get_aggregated_logs_mock)
repo_mock = Mock()
@ -125,4 +127,4 @@ def test_get_aggregated_logs(monkeypatch):
actual = pre_oci_model.get_aggregated_logs('start_time', 'end_time', 'performer_name', 'repository_name',
'namespace_name', set())
assert actual == [AggregatedLogEntry(12, 4, '1')]
assert actual == [AggregatedLogEntry(18, 4, '1')]

View file

@ -109,7 +109,7 @@ def require_xhr_from_browser(func):
if app.config.get('BROWSER_API_CALLS_XHR_ONLY', False):
if request.method == 'GET' and request.user_agent.browser:
has_xhr_header = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
if not has_xhr_header:
if not has_xhr_header and not app.config.get('DEBUGGING') == True:
logger.warning('Disallowed possible RTA to URL %s with user agent %s',
request.path, request.user_agent)
abort(400)

View file

@ -899,7 +899,7 @@ def populate_database(minimal=False, with_storage=False):
model.repositoryactioncount.update_repository_score(to_count)
WHITELISTED_EMPTY_MODELS = ['DeletedNamespace']
WHITELISTED_EMPTY_MODELS = ['DeletedNamespace', 'LogEntry']
def find_models_missing_data():
# As a sanity check we are going to make sure that all db tables have some data, unless explicitly

View file

@ -2,8 +2,8 @@ import multiprocessing
import time
import socket
from data.database import LogEntryKind, LogEntry
from contextlib import contextmanager
from data.database import LogEntryKind, LogEntry2
class assert_action_logged(object):
""" Specialized assertion for ensuring that a log entry of a particular kind was added under the
@ -14,7 +14,7 @@ class assert_action_logged(object):
self.existing_count = 0
def _get_log_count(self):
return LogEntry.select().where(LogEntry.kind == LogEntryKind.get(name=self.log_kind)).count()
return LogEntry2.select().where(LogEntry2.kind == LogEntryKind.get(name=self.log_kind)).count()
def __enter__(self):
self.existing_count = self._get_log_count()

View file

@ -1,45 +0,0 @@
import logging
from datetime import timedelta, datetime
from app import app
from data.database import LogEntry
logger = logging.getLogger(__name__)
LOG_FORMAT = "%(asctime)s [%(process)d] [%(levelname)s] [%(name)s] %(message)s"
BATCH_SIZE = 1000
def delete_old_logentries(delete_before):
delete_up_to_id = (LogEntry
.select(LogEntry.id)
.where(LogEntry.datetime <= delete_before)
.order_by(LogEntry.id.desc())
.limit(1)
.tuples())[0][0]
logger.debug('Deleting up to id: %s', delete_up_to_id)
start_from_id = (LogEntry
.select(LogEntry.id)
.order_by(LogEntry.id)
.limit(1)
.tuples())[0][0]
logger.debug('Starting from id: %s', start_from_id)
deleted = 1
current_batch_end = min(start_from_id + BATCH_SIZE, delete_up_to_id)
while deleted > 0 or current_batch_end < delete_up_to_id:
deleted = (LogEntry
.delete()
.where(LogEntry.id <= current_batch_end)
.execute())
logger.debug('Deleted %s entries', deleted)
current_batch_end = min(current_batch_end + BATCH_SIZE, delete_up_to_id)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
now = datetime.now()
one_month_ago = now - timedelta(days=30)
delete_old_logentries(one_month_ago)

View file

@ -8,7 +8,7 @@ from tempfile import SpooledTemporaryFile
import features
from app import app, storage
from data.database import UseThenDisconnect
from data.database import UseThenDisconnect, LogEntry, LogEntry2
from data.model.log import (get_stale_logs, get_stale_logs_start_id,
get_stale_logs_cutoff_id, delete_stale_logs)
from data.userfiles import DelegateUserfiles
@ -35,11 +35,17 @@ class LogRotateWorker(Worker):
self.add_operation(self._archive_logs, WORKER_FREQUENCY)
def _archive_logs(self):
# TODO(LogMigrate): Remove the branch once we're back on LogEntry only.
models = [LogEntry, LogEntry2]
for model in models:
self._archive_logs_for_model(model)
def _archive_logs_for_model(self, model):
logger.debug('Attempting to rotate log entries')
with UseThenDisconnect(app.config):
cutoff_date = datetime.now() - STALE_AFTER
cutoff_id = get_stale_logs_cutoff_id(cutoff_date)
cutoff_id = get_stale_logs_cutoff_id(cutoff_date, model)
if cutoff_id is None:
logger.warning('Failed to find cutoff id')
return
@ -48,11 +54,11 @@ class LogRotateWorker(Worker):
while logs_archived:
try:
with GlobalLock('ACTION_LOG_ROTATION'):
logs_archived = self._perform_archiving(cutoff_id)
logs_archived = self._perform_archiving(cutoff_id, model)
except LockNotAcquiredException:
return
def _perform_archiving(self, cutoff_id):
def _perform_archiving(self, cutoff_id, model):
save_location = SAVE_LOCATION
if not save_location:
# Pick the *same* save location for all instances. This is a fallback if
@ -62,7 +68,7 @@ class LogRotateWorker(Worker):
log_archive = DelegateUserfiles(app, storage, save_location, SAVE_PATH)
with UseThenDisconnect(app.config):
start_id = get_stale_logs_start_id()
start_id = get_stale_logs_start_id(model)
if start_id is None:
logger.warning('Failed to find start id')
@ -76,7 +82,7 @@ class LogRotateWorker(Worker):
return False
end_id = start_id + MIN_LOGS_PER_ROTATION
logs = [log_dict(log) for log in get_stale_logs(start_id, end_id)]
logs = [log_dict(log) for log in get_stale_logs(start_id, end_id, model)]
logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
@ -85,14 +91,14 @@ class LogRotateWorker(Worker):
zipstream.write(chunk)
tempfile.seek(0)
filename = '%d-%d.txt.gz' % (start_id, end_id)
filename = '%d-%d-%s.txt.gz' % (start_id, end_id, model.__name__.lower())
log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
file_id=filename)
logger.debug('Finished archiving logs from IDs %s to %s', start_id, end_id)
with UseThenDisconnect(app.config):
logger.debug('Deleting logs from IDs %s to %s', start_id, end_id)
delete_stale_logs(start_id, end_id)
delete_stale_logs(start_id, end_id, model)
return True