Merge pull request #3323 from quay/joseph.schorr/QUAY-1282/log-interfacing

Interface out all action log data model operations
This commit is contained in:
Joseph Schorr 2019-01-28 15:09:25 -05:00 committed by GitHub
commit 9f09d68ad8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 714 additions and 902 deletions

View file

@@ -11,10 +11,10 @@ from enum import Enum, unique
import features
from app import app, export_action_logs_queue, storage, get_app_url
from data import model
from app import app, export_action_logs_queue, storage, get_app_url, avatar
from endpoints.api import format_date
from endpoints.api.logs_models_pre_oci import create_log
from data.logs_model import logs_model
from data.logs_model.interface import LogsIterationTimeout
from workers.queueworker import QueueWorker, JobException
from util.log import logfile_path
from util.useremails import send_logs_exported_email
@@ -29,10 +29,6 @@ MAXIMUM_WORK_PERIOD_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_SECONDS
MAXIMUM_QUERY_TIME_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_QUERY_TIME_SECONDS', 30)
EXPORTED_LOGS_EXPIRATION_SECONDS = app.config.get('EXPORT_ACTION_LOGS_SECONDS', 60 * 60) # 1 hour
MINIMUM_RANGE_SIZE = 1000
MAXIMUM_RANGE_SIZE = 100000
EXPECTED_ITERATION_LOG_COUNT = 1000
@unique
class ExportResult(Enum):
@@ -79,29 +75,6 @@ class ExportActionLogsWorker(QueueWorker):
repository_id = job_details['repository_id']
max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)
min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_time,
repository_id,
namespace_id))
if elapsed > max_query_time:
logger.error('Retrieval of min ID for export logs `%s` timed out with time of `%s`',
export_id, elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return
max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_time,
repository_id,
namespace_id))
if elapsed > max_query_time:
logger.error('Retrieval of max ID for export logs `%s` timed out with time of `%s`',
export_id, elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return
min_id = min_id or 1
max_id = max_id or 1
logger.info('Found log range of %s to %s for export logs `%s`', min_id, max_id, export_id)
# Generate a file key so that if we return an API URL, it cannot simply be constructed from
# just the export ID.
file_key = str(uuid.uuid4())
@@ -131,10 +104,13 @@ class ExportActionLogsWorker(QueueWorker):
BytesIO(str(prefix_data)), upload_metadata)
uploaded_byte_count = len(prefix_data)
logs_iterator = logs_model.yield_logs_for_export(start_time, end_time, repository_id,
namespace_id, max_query_time)
try:
# Stream the logs to storage as chunks.
updated_metadata, uploaded_byte_count = self._stream_logs(upload_id, upload_metadata,
uploaded_byte_count, min_id, max_id,
uploaded_byte_count, logs_iterator,
job_details)
if updated_metadata is None:
storage.cancel_chunked_upload(upload_id, upload_metadata)
@@ -169,69 +145,35 @@ class ExportActionLogsWorker(QueueWorker):
self._report_results(job_details, ExportResult.SUCCESSFUL_EXPORT, export_url)
def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, min_id, max_id,
def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, logs_iterator,
job_details):
export_id = job_details['export_id']
max_work_period = timedelta(seconds=MAXIMUM_WORK_PERIOD_SECONDS)
max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)
kinds = model.log.get_log_entry_kinds()
# Using an adjusting scale, start downloading log rows in batches, starting at
# MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
# the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
# longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out.
batch_start_time = datetime.utcnow()
current_start_id = min_id
current_batch_size = MINIMUM_RANGE_SIZE
try:
for logs in logs_iterator:
work_elapsed = datetime.utcnow() - batch_start_time
if work_elapsed > max_work_period:
logger.error('Retrieval of logs `%s` timed out with time of `%s`',
export_id, work_elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return None, None
while current_start_id <= max_id:
# Verify we haven't been working for too long.
work_elapsed = datetime.utcnow() - batch_start_time
if work_elapsed > max_work_period:
logger.error('Retrieval of logs `%s` timed out with time of `%s`',
export_id, work_elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return None, None
logs_data = ''
if logs:
logs_data = ','.join([json.dumps(log.to_dict(avatar, False)) for log in logs]) + ','
id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]
# Load the next set of logs.
def retrieve_and_write_logs():
namespace_id = job_details['namespace_id'] if not job_details.get('repository_id') else None
repository_id = job_details.get('repository_id')
logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
id_range, namespace_id, repository_id)
logs_query = model.log.get_logs_query(namespace=namespace_id,
repository=repository_id,
id_range=id_range)
return [create_log(log) for log in logs_query]
logs, elapsed = _run_and_time(retrieve_and_write_logs)
if elapsed > max_query_time:
logger.error('Retrieval of logs for export logs `%s` with range `%s` timed out at `%s`',
export_id, id_range, elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return None, None
# Write the logs to storage.
logger.debug('Writing %s retrieved logs for range %s', len(logs), id_range)
if logs:
logs_data = ','.join([json.dumps(log.to_dict(kinds, False)) for log in logs]) + ','
logs_data = logs_data.encode('utf-8')
upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id,
uploaded_byte_count, -1,
BytesIO(logs_data),
upload_metadata)
uploaded_byte_count += len(logs_data)
# Move forward.
current_start_id = id_range[1] + 1
# Increase the batch size if necessary.
if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)
except LogsIterationTimeout:
logger.error('Retrieval of logs for export logs timed out at `%s`', work_elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return None, None
return upload_metadata, uploaded_byte_count
@@ -271,11 +213,6 @@ def _parse_time(specified_time):
except ValueError:
return None
def _run_and_time(fn):
start_time = datetime.utcnow()
result = fn()
return result, datetime.utcnow() - start_time
if __name__ == "__main__":
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

View file

@@ -1,7 +1,10 @@
import logging
from datetime import date, timedelta
from app import app # This is required to initialize the database.
from data import model
from data.logs_model import logs_model
from workers.worker import Worker
POLL_PERIOD_SECONDS = 10
@@ -21,8 +24,16 @@ class RepositoryActionCountWorker(Worker):
logger.debug('No further repositories to count')
return False
yesterday = date.today() - timedelta(days=1)
logger.debug('Found repository #%s to count', to_count.id)
was_counted = model.repositoryactioncount.count_repository_actions(to_count)
daily_count = logs_model.count_repository_actions(to_count, yesterday)
if daily_count is None:
logger.debug('Could not load count for repository #%s', to_count.id)
return False
was_counted = model.repositoryactioncount.store_repository_action_count(to_count, yesterday,
daily_count)
if not was_counted:
logger.debug('Repository #%s was counted by another worker', to_count.id)
return False

View file

@@ -5,7 +5,8 @@ from datetime import datetime, timedelta
from httmock import urlmatch, HTTMock
from data import model, database
from data import model
from data.logs_model import logs_model
from workers.exportactionlogsworker import ExportActionLogsWorker
from test.fixtures import *
@@ -21,9 +22,6 @@ def test_process_queue_item(namespace, repo_name, expects_logs, app):
repo = model.repository.get_repository(namespace, repo_name)
assert (model.log.get_maximum_id_for_logs(end_time, repository_id=repo.id) is not None) == expects_logs
assert (model.log.get_minimum_id_for_logs(start_time, repository_id=repo.id) is not None) == expects_logs
worker = ExportActionLogsWorker(None)
called = [{}]
@@ -59,7 +57,9 @@ def test_process_queue_item(namespace, repo_name, expects_logs, app):
created = storage.get_content(storage.preferred_locations, 'exportedactionlogs/' + storage_id)
created_json = json.loads(created)
expected_count = database.LogEntry3.select().where(database.LogEntry3.repository == repo).count()
expected_count = len(logs_model.lookup_logs(start_time, end_time, namespace_name=namespace,
repository_name=repo_name).logs)
assert (expected_count > 1) == expects_logs
assert created_json['export_id'] == 'someid'