Merge pull request #618 from jzelinskie/logsworker
add a log rotation worker
This commit is contained in:
		
						commit
						f439ad7804
					
				
					 5 changed files with 147 additions and 0 deletions
				
			
		
							
								
								
									
										2
									
								
								conf/init/service/logrotateworker/log/run
									
										
									
									
									
										Executable file
									
								
							
							
						
						
									
										2
									
								
								conf/init/service/logrotateworker/log/run
									
										
									
									
									
										Executable file
									
								
							|  | @ -0,0 +1,2 @@ | |||
#!/bin/sh
# Replace this shell with logger(1), which forwards its standard input to the
# system log. Entries are tagged "logrotateworker" (-t) and include the logger
# process ID (-i).
exec logger -i -t logrotateworker
							
								
								
									
										8
									
								
								conf/init/service/logrotateworker/run
									
										
									
									
									
										Executable file
									
								
							
							
						
						
									
										8
									
								
								conf/init/service/logrotateworker/run
									
										
									
									
									
										Executable file
									
								
							|  | @ -0,0 +1,8 @@ | |||
#! /bin/bash

# Service run script for the log rotation worker: announces start-up, runs the
# worker module in the foreground, and announces when it has exited.

echo 'Starting log rotation worker'

# The interpreter path below is relative to /, so abort rather than launch
# from the wrong directory if the cd fails.
cd / || exit 1
venv/bin/python -m workers.logrotateworker

echo 'Log rotation worker exited'
|  | @ -197,6 +197,9 @@ class DefaultConfig(object): | |||
|   # Documentation: http://pythonhosted.org/semantic_version/reference.html#semantic_version.Spec | ||||
|   BLACKLIST_V2_SPEC = '<1.6.0' | ||||
| 
 | ||||
|   # Feature Flag: Whether or not to rotate old action logs to storage. | ||||
|   FEATURE_ACTION_LOG_ROTATION = False | ||||
| 
 | ||||
|   BUILD_MANAGER = ('enterprise', {}) | ||||
| 
 | ||||
|   DISTRIBUTED_STORAGE_CONFIG = { | ||||
|  | @ -218,6 +221,10 @@ class DefaultConfig(object): | |||
|   LOG_ARCHIVE_LOCATION = 'local_us' | ||||
|   LOG_ARCHIVE_PATH = 'logarchive/' | ||||
| 
 | ||||
|   # Action logs archive | ||||
|   ACTION_LOG_ARCHIVE_LOCATION = 'local_us' | ||||
|   ACTION_LOG_ARCHIVE_PATH = 'actionlogarchive/' | ||||
| 
 | ||||
|   # For enterprise: | ||||
|   MAXIMUM_REPOSITORY_USAGE = 20 | ||||
| 
 | ||||
|  |  | |||
|  | @ -124,3 +124,38 @@ def get_repository_usage(): | |||
|           .where(LogEntry.datetime >= one_month_ago) | ||||
|           .group_by(LogEntry.ip, LogEntry.repository) | ||||
|           .count()) | ||||
| 
 | ||||
| 
 | ||||
def get_stale_logs_start_id():
  """ Returns the lowest LogEntry ID, or None when no row is returned. """
  lowest_id_rows = LogEntry.select(LogEntry.id).order_by(LogEntry.id).limit(1).tuples()
  try:
    # Rows are 1-tuples holding only the ID; indexing an empty result raises IndexError.
    return lowest_id_rows[0][0]
  except IndexError:
    return None
| 
 | ||||
| 
 | ||||
def get_stale_logs_end_id(cutoff_date):
  """ Returns the highest LogEntry ID whose datetime is at or before cutoff_date,
      or None when no row matches.
  """
  at_or_before_cutoff = LogEntry.select(LogEntry.id).where(LogEntry.datetime <= cutoff_date)
  highest_id_rows = at_or_before_cutoff.order_by(LogEntry.id.desc()).limit(1).tuples()
  try:
    # Rows are 1-tuples holding only the ID; indexing an empty result raises IndexError.
    return highest_id_rows[0][0]
  except IndexError:
    return None
| 
 | ||||
| 
 | ||||
def get_stale_logs(start_id, end_id):
  """ Builds the query selecting every log whose ID lies in [start_id, end_id]. """
  in_id_range = (LogEntry.id >= start_id) & (LogEntry.id <= end_id)
  return LogEntry.select().where(in_id_range)
| 
 | ||||
| 
 | ||||
def delete_stale_logs(start_id, end_id):
  """ Executes a delete of every log whose ID lies in [start_id, end_id]. """
  in_id_range = (LogEntry.id >= start_id) & (LogEntry.id <= end_id)
  removal = LogEntry.delete().where(in_id_range)
  removal.execute()
|  |  | |||
							
								
								
									
										95
									
								
								workers/logrotateworker.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										95
									
								
								workers/logrotateworker.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,95 @@ | |||
| import logging | ||||
| import json | ||||
| import time | ||||
| 
 | ||||
| from datetime import timedelta, datetime | ||||
| from itertools import chain | ||||
| 
 | ||||
| import features | ||||
| from app import app, storage | ||||
| from data.database import UseThenDisconnect | ||||
| from data.model import db_transaction | ||||
| from data.model.log import (get_stale_logs, get_stale_logs_start_id, | ||||
|                             get_stale_logs_end_id, delete_stale_logs) | ||||
| from util.registry.gzipwrap import GzipWrap | ||||
| from workers.globalworkerbase import GlobalWorker | ||||
| 
 | ||||
logger = logging.getLogger(__name__)

# Passed to the worker base class as sleep_period_seconds.
WORKER_FREQUENCY = 3600
# Subtracted from datetime.now() to compute the cutoff for stale log entries.
STALE_AFTER = timedelta(days=30)
# A rotation is skipped while (end_id - start_id) is below this threshold.
MIN_LOGS_PER_ROTATION = 10000
# Number of consecutive log IDs handled per read batch and per delete batch.
BATCH_SIZE = 500
# Prefix of the archive filename; main() idles instead of starting the worker when this is None.
SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
# First argument to storage.stream_write; main() also idles when this is None.
SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION')
| 
 | ||||
class LogRotateWorker(GlobalWorker):
  """ Worker used to rotate old logs out the database and into storage.

      On each run the worker finds the ID range of log entries dated at or before
      (now - STALE_AFTER), streams them as gzipped JSON lines to storage and then
      deletes them from the database in batches of BATCH_SIZE IDs.
  """
  def __init__(self):
    super(LogRotateWorker, self).__init__(app, sleep_period_seconds=WORKER_FREQUENCY)

  def perform_global_work(self):
    """ Archives and then deletes stale log entries inside a single DB transaction.

        Returns early (doing nothing) when there are no candidate logs or when the
        stale ID span is smaller than MIN_LOGS_PER_ROTATION.
    """
    logger.debug('Attempting to rotate log entries')
    with UseThenDisconnect(app.config):
      with db_transaction():
        cutoff = datetime.now() - STALE_AFTER
        start_id = get_stale_logs_start_id()
        end_id = get_stale_logs_end_id(cutoff)

        if start_id is None or end_id is None:
          logger.warning('No logs to be archived.')
          return

        logger.debug('Found starting ID %s and ending ID %s', start_id, end_id)

        # IDs may have gaps, so the span is only an upper bound on the row count.
        approx_count = end_id - start_id
        if approx_count < MIN_LOGS_PER_ROTATION:
          logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
          return

        logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
        filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
        archive_lines = self._serialized_logs(start_id, end_id)
        storage.stream_write(SAVE_LOCATION, filename, GzipWrap(archive_lines))

        for batch_start, batch_end in self._batch_bounds(start_id, end_id):
          logger.debug('Deleting archived logs from IDs %s to %s', batch_start, batch_end)
          delete_stale_logs(batch_start, batch_end)

  @staticmethod
  def _batch_bounds(start_id, end_id):
    """ Yields inclusive (first_id, last_id) pairs covering [start_id, end_id] in
        steps of BATCH_SIZE, without materializing the individual IDs.
    """
    batch_start = start_id
    while batch_start <= end_id:
      batch_end = min(batch_start + BATCH_SIZE - 1, end_id)
      yield batch_start, batch_end
      batch_start = batch_end + 1

  def _serialized_logs(self, start_id, end_id):
    """ Lazily yields one newline-terminated JSON line per log in [start_id, end_id],
        reading the logs one batch at a time from a single flat generator.
    """
    for batch_start, batch_end in self._batch_bounds(start_id, end_id):
      logger.debug('Reading archived logs from IDs %s to %s', batch_start, batch_end)
      for log in get_stale_logs(batch_start, batch_end):
        yield pretty_print_in_json(log) + '\n'
| 
 | ||||
| 
 | ||||
| def _take(count, lst): | ||||
|   return lst[:count], lst[count:] | ||||
| 
 | ||||
def pretty_print_in_json(log):
  """ Serializes a LogEntry to a single JSON object string.

      The entry's metadata_json text is decoded and embedded as a nested JSON value
      rather than as an escaped string; datetime and ip are stringified.
  """
  return json.dumps({'kind_id': log.kind_id,
                     'account_id': log.account_id,
                     'performer_id': log.performer_id,
                     'repository_id': log.repository_id,
                     'datetime': str(log.datetime),
                     'ip': str(log.ip),
                     # Decode the text as-is: coercing it through str() first raises
                     # UnicodeEncodeError on Python 2 for non-ASCII metadata.
                     'metadata_json': json.loads(log.metadata_json)})
| 
 | ||||
| 
 | ||||
def main():
  """ Starts the log rotation worker when the ACTION_LOG_ROTATION feature is on and
      both archive settings are present; otherwise sleeps forever without working.
  """
  if features.ACTION_LOG_ROTATION and None not in (SAVE_PATH, SAVE_LOCATION):
    LogRotateWorker().start()
    return

  # Rotation is disabled or unconfigured: idle indefinitely instead of exiting.
  while True:
    time.sleep(100000)


if __name__ == '__main__':
  main()
		Reference in a new issue