Merge pull request #3383 from quay/joseph.schorr/QUAY-1371/optimize-log-archive

Optimize the logs archiving worker to not issue extremely long queries
This commit is contained in:
Joseph Schorr 2019-02-26 16:05:50 -05:00 committed by GitHub
commit 07fccb8703
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 32 deletions

View file

@ -151,25 +151,17 @@ def get_stale_logs_start_id(model):
""" Gets the oldest log entry. """ """ Gets the oldest log entry. """
# TODO(LogMigrate): Remove the branch once we're back on a single table. # TODO(LogMigrate): Remove the branch once we're back on a single table.
try: try:
return (model.select(model.id).order_by(model.id).limit(1).tuples())[0][0] return (model.select(fn.Min(model.id)).tuples())[0][0]
except IndexError: except IndexError:
return None return None
def get_stale_logs_cutoff_id(cutoff_date, model): def get_stale_logs(start_id, end_id, model, cutoff_date):
""" Gets the most recent ID created before the cutoff_date. """
# TODO(LogMigrate): Remove the branch once we're back on a single table.
try:
return (model.select(fn.Max(model.id)).where(model.datetime <= cutoff_date)
.tuples())[0][0]
except IndexError:
return None
def get_stale_logs(start_id, end_id, model):
""" Returns all the logs with IDs between start_id and end_id inclusively. """ """ Returns all the logs with IDs between start_id and end_id inclusively. """
# TODO(LogMigrate): Remove the branch once we're back on a single table. # TODO(LogMigrate): Remove the branch once we're back on a single table.
return model.select().where((model.id >= start_id), (model.id <= end_id)) return model.select().where((model.id >= start_id),
(model.id <= end_id),
model.datetime <= cutoff_date)
def delete_stale_logs(start_id, end_id, model): def delete_stale_logs(start_id, end_id, model):

View file

@ -10,7 +10,7 @@ import features
from app import app, storage from app import app, storage
from data.database import UseThenDisconnect, LogEntry, LogEntry2, LogEntry3 from data.database import UseThenDisconnect, LogEntry, LogEntry2, LogEntry3
from data.model.log import (get_stale_logs, get_stale_logs_start_id, from data.model.log import (get_stale_logs, get_stale_logs_start_id,
get_stale_logs_cutoff_id, delete_stale_logs) delete_stale_logs)
from data.userfiles import DelegateUserfiles from data.userfiles import DelegateUserfiles
from util.locking import GlobalLock, LockNotAcquiredException from util.locking import GlobalLock, LockNotAcquiredException
from util.log import logfile_path from util.log import logfile_path
@ -21,7 +21,7 @@ from workers.worker import Worker
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
JSON_MIMETYPE = 'application/json' JSON_MIMETYPE = 'application/json'
MIN_LOGS_PER_ROTATION = 10000 MIN_LOGS_PER_ROTATION = 5000
MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024 MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024
WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 12) WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 12)
@ -44,22 +44,16 @@ class LogRotateWorker(Worker):
def _archive_logs_for_model(self, model): def _archive_logs_for_model(self, model):
logger.debug('Attempting to rotate log entries') logger.debug('Attempting to rotate log entries')
with UseThenDisconnect(app.config):
cutoff_date = datetime.now() - STALE_AFTER
cutoff_id = get_stale_logs_cutoff_id(cutoff_date, model)
if cutoff_id is None:
logger.warning('Failed to find cutoff id')
return
logs_archived = True logs_archived = True
cutoff_date = datetime.now() - STALE_AFTER
while logs_archived: while logs_archived:
try: try:
with GlobalLock('ACTION_LOG_ROTATION'): with GlobalLock('ACTION_LOG_ROTATION'):
logs_archived = self._perform_archiving(cutoff_id, model) logs_archived = self._perform_archiving(model, cutoff_date)
except LockNotAcquiredException: except LockNotAcquiredException:
return return
def _perform_archiving(self, cutoff_id, model): def _perform_archiving(self, model, cutoff_date):
save_location = SAVE_LOCATION save_location = SAVE_LOCATION
if not save_location: if not save_location:
# Pick the *same* save location for all instances. This is a fallback if # Pick the *same* save location for all instances. This is a fallback if
@ -75,20 +69,21 @@ class LogRotateWorker(Worker):
logger.warning('Failed to find start id') logger.warning('Failed to find start id')
return False return False
logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id) logger.debug('Found starting ID %s', start_id)
lookup_end_id = start_id + MIN_LOGS_PER_ROTATION
logs = [log for log in get_stale_logs(start_id, lookup_end_id, model, cutoff_date)]
approx_count = cutoff_id - start_id if not logs:
if approx_count < MIN_LOGS_PER_ROTATION: logger.debug('No further logs found')
logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count) return False
return False
end_id = start_id + MIN_LOGS_PER_ROTATION end_id = max([log.id for log in logs])
logs = [log_dict(log) for log in get_stale_logs(start_id, end_id, model)] formatted_logs = [log_dict(log) for log in logs]
logger.debug('Archiving logs from IDs %s to %s', start_id, end_id) logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile: with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
with GzipFile('temp_action_log_rotate', fileobj=tempfile, compresslevel=1) as zipstream: with GzipFile('temp_action_log_rotate', fileobj=tempfile, compresslevel=1) as zipstream:
for chunk in StreamingJSONEncoder().iterencode(logs): for chunk in StreamingJSONEncoder().iterencode(formatted_logs):
zipstream.write(chunk) zipstream.write(chunk)
tempfile.seek(0) tempfile.seek(0)

View file

@ -0,0 +1,60 @@
from datetime import datetime, timedelta
from data import model
from data.database import LogEntry3
from workers.logrotateworker import LogRotateWorker
from test.fixtures import *
def test_logrotateworker(initialized_db):
worker = LogRotateWorker()
logs = list(LogEntry3.select())
# Ensure there are logs.
assert logs
# Ensure we don't find any to archive that are new.
assert not worker._perform_archiving(LogEntry3, datetime.utcnow() - timedelta(weeks=6))
assert len(list(LogEntry3.select())) == len(logs)
# Archive all the logs.
assert worker._perform_archiving(LogEntry3, datetime.utcnow() + timedelta(days=1))
# Ensure all the logs were archived.
assert not list(LogEntry3.select())
def test_logrotateworker_withcutoff(initialized_db):
# Delete all existing logs.
LogEntry3.delete().execute()
# Create a new set of logs.
start_timestamp = datetime.now()
for day in range(0, 100):
model.log.log_action('push_repo', 'devtable', timestamp=start_timestamp + timedelta(days=day))
logs = list(LogEntry3.select())
# Sort and find a midpoint in the logs.
logs.sort(key=lambda l: l.datetime)
midpoint = logs[0:len(logs)/2]
assert midpoint
assert len(midpoint) < len(logs)
# Archive the earlier logs.
worker = LogRotateWorker()
assert worker._perform_archiving(LogEntry3, midpoint[-1].datetime)
# Ensure the earlier logs were archived.
for log in midpoint:
try:
LogEntry3.get(id=log.id)
assert False, "Found unexpected log"
except LogEntry3.DoesNotExist:
pass
# Ensure the later logs were not.
for log in logs[len(logs)/2:]:
LogEntry3.get(id=log.id)