Change bulk logs downloader to use datetime ranges as well
parent d4c74bc1d3
commit ec9913326a

1 changed file with 17 additions and 37 deletions
@@ -16,8 +16,8 @@ from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage, _
 
 logger = logging.getLogger(__name__)
 
-MINIMUM_RANGE_SIZE = 1000
-MAXIMUM_RANGE_SIZE = 100000
+MINIMUM_RANGE_SIZE = 1 # second
+MAXIMUM_RANGE_SIZE = 60 * 60 * 24 * 30 # seconds ~= 1 month
 EXPECTED_ITERATION_LOG_COUNT = 1000
 
 
@@ -151,40 +151,16 @@ class TableLogsModel(ActionLogsDataInterface):
 
   def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
                             namespace_id=None, max_query_time=None):
-    # Lookup the starting and ending IDs for the log range in the table. This operation is quite
-    # quick, so we use it as a bounding box for the later lookups.
-    min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_datetime,
-                                                                              repository_id,
-                                                                              namespace_id))
-    if elapsed > max_query_time:
-      logger.error('Retrieval of min ID for export logs `%s/%s` timed out with time of `%s`',
-                   namespace_id, repository_id, elapsed)
-      raise LogsIterationTimeout()
-
-    max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_datetime,
-                                                                              repository_id,
-                                                                              namespace_id))
-    if elapsed > max_query_time:
-      logger.error('Retrieval of max ID for export logs `%s/%s` timed out with time of `%s`',
-                   namespace_id, repository_id, elapsed)
-      raise LogsIterationTimeout()
-
-    min_id = min_id or 1
-    max_id = max_id or 1
-
-    logger.info('Found log range of %s to %s for export logs `%s/%s`', min_id, max_id,
-                namespace_id, repository_id)
-
     # Using an adjusting scale, start downloading log rows in batches, starting at
     # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
     # the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
     # longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out.
     batch_start_time = datetime.utcnow()
 
-    current_start_id = min_id
-    current_batch_size = MINIMUM_RANGE_SIZE
+    current_start_datetime = start_datetime
+    current_batch_size = timedelta(seconds=MINIMUM_RANGE_SIZE)
 
-    while current_start_id <= max_id:
+    while current_start_datetime < end_datetime:
       # Verify we haven't been working for too long.
       work_elapsed = datetime.utcnow() - batch_start_time
       if work_elapsed > max_query_time:
@@ -192,32 +168,36 @@ class TableLogsModel(ActionLogsDataInterface):
                      namespace_id, repository_id, work_elapsed)
         raise LogsIterationTimeout()
 
-      id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]
+      current_end_datetime = current_start_datetime + current_batch_size
+      current_end_datetime = min(current_end_datetime, end_datetime)
 
       # Load the next set of logs.
       def load_logs():
-        logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
-                     id_range, namespace_id, repository_id)
+        logger.debug('Retrieving logs over range %s-%s with namespace %s and repository %s',
+                     current_start_datetime, current_end_datetime, namespace_id, repository_id)
 
         logs_query = model.log.get_logs_query(namespace=namespace_id,
                                               repository=repository_id,
-                                              id_range=id_range)
+                                              start_time=current_start_datetime,
+                                              end_time=current_end_datetime)
         return [Log.for_logentry(log) for log in logs_query]
 
       logs, elapsed = _run_and_time(load_logs)
       if elapsed > max_query_time:
-        logger.error('Retrieval of logs for export logs `%s/%s` with range `%s` timed out at `%s`',
-                     namespace_id, repository_id, id_range, elapsed)
+        logger.error('Retrieval of logs for export `%s/%s` with range `%s-%s` timed out at `%s`',
+                     namespace_id, repository_id, current_start_datetime, current_end_datetime,
+                     elapsed)
         raise LogsIterationTimeout()
 
       yield logs
 
       # Move forward.
-      current_start_id = id_range[1] + 1
+      current_start_datetime = current_end_datetime
 
       # Increase the batch size if necessary.
       if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
-        current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)
+        seconds = min(MAXIMUM_RANGE_SIZE, current_batch_size.total_seconds() * 2)
+        current_batch_size = timedelta(seconds=seconds)
 
 
 def _run_and_time(fn):
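
For reference, the new iteration strategy reads more easily outside the diff. The sketch below is a minimal standalone version of the adaptive windowing loop this commit introduces, assuming a hypothetical fetch_logs(start, end) callable in place of the real model.log.get_logs_query lookup; the work-period timeout handling is omitted for brevity.

from datetime import datetime, timedelta

MINIMUM_RANGE_SIZE = 1 # second
MAXIMUM_RANGE_SIZE = 60 * 60 * 24 * 30 # seconds ~= 1 month
EXPECTED_ITERATION_LOG_COUNT = 1000

def yield_log_batches(start_datetime, end_datetime, fetch_logs):
  # Walk the requested datetime range in windows, starting small.
  current_start = start_datetime
  window = timedelta(seconds=MINIMUM_RANGE_SIZE)

  while current_start < end_datetime:
    # Clamp the window to the end of the requested range.
    current_end = min(current_start + window, end_datetime)
    logs = fetch_logs(current_start, current_end)
    yield logs

    # Move forward.
    current_start = current_end

    # Double the window (up to the cap) whenever a batch comes back smaller
    # than expected, so sparse stretches of the log table are covered quickly.
    if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
      seconds = min(MAXIMUM_RANGE_SIZE, window.total_seconds() * 2)
      window = timedelta(seconds=seconds)

Because the window only grows when a batch is undersized, dense ranges keep batches near EXPECTED_ITERATION_LOG_COUNT, while an empty span is crossed in a logarithmic number of queries until the window hits the roughly one-month cap.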
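The hunk context cuts off at _run_and_time, whose body is not part of this diff. Judging from the call sites (a result paired with an elapsed value compared against max_query_time), it is presumably a small timing wrapper; the following shape is an assumption, not the file's actual implementation.

from datetime import datetime

def _run_and_time(fn):
  # Assumed shape: run the callable and return its result together with the
  # wall-clock elapsed time as a timedelta comparable to max_query_time.
  start_time = datetime.utcnow()
  result = fn()
  return result, datetime.utcnow() - start_time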