142 lines
		
	
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			142 lines
		
	
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import json
 | |
| import time
 | |
| 
 | |
| from datetime import timedelta, datetime
 | |
| from gzip import GzipFile
 | |
| from tempfile import SpooledTemporaryFile
 | |
| 
 | |
| import features
 | |
| from app import app, storage
 | |
| from data.database import UseThenDisconnect, LogEntry, LogEntry2, LogEntry3
 | |
| from data.model.log import (get_stale_logs, get_stale_logs_start_id,
 | |
|                             get_stale_logs_cutoff_id, delete_stale_logs)
 | |
| from data.userfiles import DelegateUserfiles
 | |
| from util.locking import GlobalLock, LockNotAcquiredException
 | |
| from util.log import logfile_path
 | |
| from util.streamingjsonencoder import StreamingJSONEncoder
 | |
| from util.timedeltastring import convert_to_timedelta
 | |
| from workers.worker import Worker
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| JSON_MIMETYPE = 'application/json'
 | |
| MIN_LOGS_PER_ROTATION = 10000
 | |
| MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024
 | |
| 
 | |
| WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 12)
 | |
| STALE_AFTER = convert_to_timedelta(app.config.get('ACTION_LOG_ROTATION_THRESHOLD', '30d'))
 | |
| SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
 | |
| SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION')
 | |
| 
 | |
| class LogRotateWorker(Worker):
 | |
|   """ Worker used to rotate old logs out the database and into storage. """
 | |
|   def __init__(self):
 | |
|     super(LogRotateWorker, self).__init__()
 | |
|     self.add_operation(self._archive_logs, WORKER_FREQUENCY)
 | |
| 
 | |
|   def _archive_logs(self):
 | |
|     # TODO(LogMigrate): Remove the branch once we're back on a single table.
 | |
|     models = [LogEntry, LogEntry2, LogEntry3]
 | |
|     for model in models:
 | |
|       self._archive_logs_for_model(model)
 | |
| 
 | |
|   def _archive_logs_for_model(self, model):
 | |
|     logger.debug('Attempting to rotate log entries')
 | |
| 
 | |
|     with UseThenDisconnect(app.config):
 | |
|       cutoff_date = datetime.now() - STALE_AFTER
 | |
|       cutoff_id = get_stale_logs_cutoff_id(cutoff_date, model)
 | |
|       if cutoff_id is None:
 | |
|         logger.warning('Failed to find cutoff id')
 | |
|         return
 | |
| 
 | |
|     logs_archived = True
 | |
|     while logs_archived:
 | |
|       try:
 | |
|         with GlobalLock('ACTION_LOG_ROTATION'):
 | |
|           logs_archived = self._perform_archiving(cutoff_id, model)
 | |
|       except LockNotAcquiredException:
 | |
|         return
 | |
| 
 | |
|   def _perform_archiving(self, cutoff_id, model):
 | |
|     save_location = SAVE_LOCATION
 | |
|     if not save_location:
 | |
|       # Pick the *same* save location for all instances. This is a fallback if
 | |
|       # a location was not configured.
 | |
|       save_location = storage.locations[0]
 | |
| 
 | |
|     log_archive = DelegateUserfiles(app, storage, save_location, SAVE_PATH)
 | |
| 
 | |
|     with UseThenDisconnect(app.config):
 | |
|       start_id = get_stale_logs_start_id(model)
 | |
| 
 | |
|       if start_id is None:
 | |
|         logger.warning('Failed to find start id')
 | |
|         return False
 | |
| 
 | |
|       logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id)
 | |
| 
 | |
|       approx_count = cutoff_id - start_id
 | |
|       if approx_count < MIN_LOGS_PER_ROTATION:
 | |
|         logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
 | |
|         return False
 | |
| 
 | |
|       end_id = start_id + MIN_LOGS_PER_ROTATION
 | |
|       logs = [log_dict(log) for log in get_stale_logs(start_id, end_id, model)]
 | |
| 
 | |
|     logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
 | |
|     with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
 | |
|       with GzipFile('temp_action_log_rotate', fileobj=tempfile, compresslevel=1) as zipstream:
 | |
|         for chunk in StreamingJSONEncoder().iterencode(logs):
 | |
|           zipstream.write(chunk)
 | |
| 
 | |
|       tempfile.seek(0)
 | |
|       filename = '%d-%d-%s.txt.gz' % (start_id, end_id, model.__name__.lower())
 | |
|       log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
 | |
|                              file_id=filename)
 | |
|       logger.debug('Finished archiving logs from IDs %s to %s', start_id, end_id)
 | |
| 
 | |
|     with UseThenDisconnect(app.config):
 | |
|       logger.debug('Deleting logs from IDs %s to %s', start_id, end_id)
 | |
|       delete_stale_logs(start_id, end_id, model)
 | |
| 
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def log_dict(log):
 | |
|   """ Pretty prints a LogEntry in JSON. """
 | |
|   try:
 | |
|     metadata_json = json.loads(str(log.metadata_json))
 | |
|   except ValueError:
 | |
|     logger.exception('Could not parse metadata JSON for log entry %s', log.id)
 | |
|     metadata_json = {'__raw': log.metadata_json}
 | |
|   except TypeError:
 | |
|     logger.exception('Could not parse metadata JSON for log entry %s', log.id)
 | |
|     metadata_json = {'__raw': log.metadata_json}
 | |
| 
 | |
|   return {
 | |
|     'kind_id': log.kind_id,
 | |
|     'account_id': log.account_id,
 | |
|     'performer_id': log.performer_id,
 | |
|     'repository_id': log.repository_id,
 | |
|     'datetime': str(log.datetime),
 | |
|     'ip': str(log.ip),
 | |
|     'metadata_json': metadata_json,
 | |
|   }
 | |
| 
 | |
| 
 | |
| def main():
 | |
|   logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
 | |
| 
 | |
|   if not features.LOG_EXPORT:
 | |
|     logger.debug('Log export not enabled; skipping')
 | |
|     while True:
 | |
|       time.sleep(100000)
 | |
| 
 | |
|   worker = LogRotateWorker()
 | |
|   worker.start()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|   main()
 |