179 lines
6.2 KiB
Python
179 lines
6.2 KiB
Python
import json
|
|
|
|
from calendar import timegm
|
|
from peewee import JOIN_LEFT_OUTER, SQL, fn
|
|
from datetime import datetime, timedelta, date
|
|
from cachetools import lru_cache
|
|
|
|
from data.database import LogEntry, LogEntryKind, User, db
|
|
from data.model import config
|
|
|
|
def _logs_query(selections, start_time, end_time, performer=None, repository=None, namespace=None,
                ignore=None):
    """ Builds the base LogEntry query shared by the log listing helpers.

    Selects `selections` from LogEntry for entries with start_time <= datetime < end_time,
    optionally narrowed to a repository, a performer, and a namespace (matched against the
    username of the entry's account). Kinds named in `ignore` are excluded.

    Raises KeyError if a name in `ignore` is not a known log entry kind.
    """
    joined = (LogEntry
              .select(*selections)
              .switch(LogEntry)
              .where(LogEntry.datetime >= start_time, LogEntry.datetime < end_time))

    if repository:
        joined = joined.where(LogEntry.repository == repository)

    if performer:
        joined = joined.where(LogEntry.performer == performer)

    if namespace:
        # LogEntry points at User more than once (account and performer). Name the join
        # condition explicitly rather than letting the ORM choose a foreign key. The
        # namespace is the entry's account (see log_action).
        joined = (joined
                  .join(User, on=(User.id == LogEntry.account))
                  .where(User.username == namespace))

    if ignore:
        kind_map = get_log_entry_kinds()
        ignore_ids = [kind_map[kind_name] for kind_name in ignore]
        joined = joined.where(~(LogEntry.kind << ignore_ids))

    return joined
|
|
|
|
|
|
@lru_cache(maxsize=1)
def get_log_entry_kinds():
    """ Returns a bidirectional lookup of log entry kinds.

    Every LogEntryKind row contributes two entries: id -> name and name -> id.
    The result is memoized, so it is built from the database only once.
    """
    lookup = {}
    for entry_kind in LogEntryKind.select():
        lookup.update({entry_kind.id: entry_kind.name, entry_kind.name: entry_kind.id})
    return lookup
|
|
|
|
|
|
def get_aggregated_logs(start_time, end_time, performer=None, repository=None, namespace=None,
                        ignore=None):
    """ Returns per-day, per-kind counts of log entries in the given window.

    Each row selects the entry kind, the day (aliased 'day') and the number of
    entries (aliased 'count'), grouped by day and kind. The filters match _logs_query.
    """
    day_expr = db.extract_date('day', LogEntry.datetime)
    columns = [LogEntry.kind, day_expr.alias('day'), fn.Count(LogEntry.id).alias('count')]
    base = _logs_query(columns, start_time, end_time, performer, repository, namespace, ignore)
    return base.group_by(day_expr, LogEntry.kind)
|
|
|
|
|
|
def get_logs_query(start_time, end_time, performer=None, repository=None, namespace=None,
                   ignore=None):
    """ Returns a query over full log entries together with their performing user.

    The performer is LEFT OUTER joined through an alias of User. The join is exposed
    under the name 'performer', so entries without a performer are still returned.
    """
    performer_user = User.alias()
    base = _logs_query([LogEntry, performer_user], start_time, end_time, performer,
                       repository, namespace, ignore)
    performer_on = (LogEntry.performer == performer_user.id).alias('performer')
    return base.switch(LogEntry).join(performer_user, JOIN_LEFT_OUTER, on=performer_on)
|
|
|
|
|
|
def _json_serialize(obj):
|
|
if isinstance(obj, datetime):
|
|
return timegm(obj.utctimetuple())
|
|
|
|
return obj
|
|
|
|
|
|
def log_action(kind_name, user_or_organization_name, performer=None, repository=None,
               ip=None, metadata=None, timestamp=None):
    """ Records a LogEntry of the kind named `kind_name`.

    The entry's account is the user/organization named `user_or_organization_name`.
    When that name is None, the account is the SERVICE_LOG_ACCOUNT_ID config value,
    falling back to the user with the lowest id. `metadata` (default: empty dict) is
    stored as JSON, with datetimes encoded as epoch seconds. `timestamp` defaults to
    datetime.today().

    Propagates the ORM lookup error if the named user or log entry kind does not exist.
    """
    # None sentinel instead of a shared mutable `{}` default.
    if metadata is None:
        metadata = {}

    if not timestamp:
        timestamp = datetime.today()

    if user_or_organization_name is not None:
        account = User.get(User.username == user_or_organization_name).id
    else:
        account = config.app_config.get('SERVICE_LOG_ACCOUNT_ID')
        if account is None:
            account = User.select(fn.Min(User.id)).tuples().get()[0]

    kind = LogEntryKind.get(LogEntryKind.name == kind_name)
    metadata_json = json.dumps(metadata, default=_json_serialize)
    LogEntry.create(kind=kind, account=account, performer=performer,
                    repository=repository, ip=ip, metadata_json=metadata_json,
                    datetime=timestamp)
|
|
|
|
|
|
def _get_repository_events(repository, time_delta, time_delta_earlier, clause):
    """ Returns a pair representing the count of the number of events for the given
    repository in each of the specified time deltas.

    Both date ranges end now and start at today minus the given delta. The second range
    must reach further back than the first (time_delta_earlier > time_delta), so both
    counts can be computed in a single query. `clause` further filters which events
    are counted.

    Returns (events since today - time_delta, events since today - time_delta_earlier).
    Raises ValueError if time_delta_earlier does not exceed time_delta.
    """
    # Read the clock once so both window boundaries are derived from the same day.
    today = date.today()
    since = today - time_delta
    since_earlier = today - time_delta_earlier

    if since_earlier >= since:
        raise ValueError('time_delta_earlier must be greater than time_delta')

    # The query covers the wider (earlier) window. A CASE WHEN sum counts just the
    # rows inside the recent window. The interpolated value is a locally formatted
    # date, not caller-supplied text.
    formatted = since.strftime('%Y-%m-%d')
    case_query = 'CASE WHEN datetime >= \'%s\' THEN 1 ELSE 0 END' % formatted

    result = (LogEntry
              .select(fn.Sum(SQL(case_query)), fn.Count(SQL('*')))
              .where(LogEntry.repository == repository)
              .where(clause)
              .where(LogEntry.datetime >= since_earlier)
              .tuples()
              .get())

    # SUM yields NULL when no rows match, so normalize falsy aggregates to 0.
    recent_count, total_count = result[0], result[1]
    return (int(recent_count) if recent_count else 0, int(total_count) if total_count else 0)
|
|
|
|
|
|
def get_repository_pushes(repository, time_delta, time_delta_earlier):
    """ Counts 'push_repo' events for `repository` in two windows.

    See _get_repository_events for the meaning of the two deltas and the returned pair.
    """
    push_kind = LogEntryKind.get(name='push_repo')
    is_push = (LogEntry.kind == push_kind)
    return _get_repository_events(repository, time_delta, time_delta_earlier, is_push)
|
|
|
|
|
|
def get_repository_pulls(repository, time_delta, time_delta_earlier):
    """ Counts pull events ('pull_repo' or 'repo_verb') for `repository` in two windows.

    See _get_repository_events for the meaning of the two deltas and the returned pair.
    """
    pull_kind = LogEntryKind.get(name='pull_repo')
    verb_kind = LogEntryKind.get(name='repo_verb')
    is_pull_event = (LogEntry.kind == pull_kind) | (LogEntry.kind == verb_kind)
    return _get_repository_events(repository, time_delta, time_delta_earlier, is_pull_event)
|
|
|
|
|
|
def get_repository_usage():
    """ Counts distinct (ip, repository) pairs with pull activity in the last four weeks.

    Pull activity means 'pull_repo' or 'repo_verb' entries that have a repository set.
    """
    cutoff = date.today() - timedelta(weeks=4)
    pull_kind = LogEntryKind.get(name='pull_repo')
    verb_kind = LogEntryKind.get(name='repo_verb')

    is_pull_event = (LogEntry.kind == pull_kind) | (LogEntry.kind == verb_kind)
    has_repository = ~(LogEntry.repository >> None)

    usage = (LogEntry
             .select(LogEntry.ip, LogEntry.repository)
             .where(is_pull_event)
             .where(has_repository)
             .where(LogEntry.datetime >= cutoff)
             .group_by(LogEntry.ip, LogEntry.repository))
    return usage.count()
|
|
|
|
|
|
def get_stale_logs_start_id():
    """ Returns the smallest LogEntry id, or None when there are no log entries. """
    first_row = (LogEntry
                 .select(LogEntry.id)
                 .order_by(LogEntry.id)
                 .limit(1)
                 .tuples())
    try:
        return first_row[0][0]
    except IndexError:
        # The table is empty.
        return None
|
|
|
|
|
|
def get_stale_logs_cutoff_id(cutoff_date):
    """ Gets the most recent ID created at or before the cutoff_date.

    Returns None when no log entry is that old (MAX over no rows yields NULL).
    """
    # MAX, not MIN: the caller needs the upper bound of the stale id range. MIN would
    # return the oldest id, so the range starting at get_stale_logs_start_id would
    # collapse to a single entry.
    # NOTE(review): assumes ids increase with insertion time, so everything up to this
    # id is old enough. Confirm if entries can be backdated.
    try:
        return (LogEntry
                .select(fn.Max(LogEntry.id))
                .where(LogEntry.datetime <= cutoff_date)
                .tuples())[0][0]
    except IndexError:
        return None
|
|
|
|
|
|
def get_stale_logs(start_id, end_id):
    """ Returns a query for log entries with start_id <= id <= end_id (both inclusive). """
    in_range = (LogEntry.id >= start_id) & (LogEntry.id <= end_id)
    return LogEntry.select().where(in_range)
|
|
|
|
|
|
def delete_stale_logs(start_id, end_id):
    """ Deletes log entries with start_id <= id <= end_id (both inclusive). """
    in_range = (LogEntry.id >= start_id) & (LogEntry.id <= end_id)
    deletion = LogEntry.delete().where(in_range)
    deletion.execute()
|