# karma - A maubot plugin to track the karma of users.
# Copyright (C) 2019 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
|
|
from typing import Tuple, Optional, Type, Iterable, Dict, Any, NamedTuple
|
|
from time import time
|
|
|
|
from sqlalchemy import (Column, String, Integer, BigInteger, Text, Table,
|
|
select, and_, or_, func, case, asc, desc)
|
|
from sqlalchemy.sql.base import ImmutableColumnCollection
|
|
from sqlalchemy.engine.base import Engine
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
|
|
from mautrix.types import Event, UserID, EventID, RoomID
|
|
|
|
# The keyword-argument form ``NamedTuple("Name", field=type, ...)`` is deprecated since
# Python 3.13 and removed in 3.15, so the tuples are declared with class syntax instead.
# Field names and order are unchanged, so positional construction from a row still works.
class EventKarmaStats(NamedTuple):
    """Aggregated karma of a single event, as produced by ``Karma.get_event_stats``."""
    room_id: RoomID
    event_id: EventID
    sender: UserID
    content: str
    total: int
    positive: int
    negative: int


class UserKarmaStats(NamedTuple):
    """Aggregated karma of a single user, as produced by ``Karma.get_user_stats``."""
    user_id: UserID
    total: int
    positive: int
    negative: int
|
|
|
|
|
|
class Karma:
    """Declarative mixin for one row of the ``karma`` table plus the queries over it.

    The class is not mapped by itself: ``make_tables`` combines it with a declarative base
    and then fills in ``db``, ``t`` and ``c``. Until that has happened those three
    attributes are ``None`` and none of the query methods can be used.
    """
    __tablename__ = "karma"
    # Engine, table and column collection; assigned by make_tables().
    db: Engine = None
    t: Table = None
    c: ImmutableColumnCollection = None

    # The four columns below form the composite primary key.
    given_to: UserID = Column(String(255), primary_key=True)
    given_by: UserID = Column(String(255), primary_key=True)
    given_in: RoomID = Column(String(255), primary_key=True)
    given_for: EventID = Column(String(255), primary_key=True)

    # Looked up by is_vote_event() / get_by_given_from(); unique across the table.
    given_from: EventID = Column(String(255), unique=True)
    # Set by insert() / update() to the current time in milliseconds.
    given_at: int = Column(BigInteger)
    value: int = Column(Integer)
    # NOTE(review): presumably the text of the event the karma was given for -- confirm.
    content: str = Column(Text)

    @classmethod
    def _vote_sums(cls) -> list:
        """Build the labelled ``total`` / ``positive`` / ``negative`` aggregate columns."""
        value = cls.c.value
        return [
            func.sum(value).label("total"),
            func.sum(case([(value > 0, value)], else_=0)).label("positive"),
            # Negative votes are summed and then reported as a non-negative magnitude.
            func.abs(func.sum(case([(value < 0, value)], else_=0))).label("negative"),
        ]

    @classmethod
    def _from_row(cls, row) -> 'Karma':
        """Turn a full ``SELECT *`` row of the karma table into an instance."""
        (given_to, given_by, given_in, given_for,
         given_from, given_at, value, content) = row
        return cls(given_to=given_to, given_by=given_by, given_in=given_in,
                   given_for=given_for, given_from=given_from, given_at=given_at,
                   value=value, content=content)

    def _identity_clause(self):
        """Return the WHERE clause matching this instance's primary key."""
        return and_(self.c.given_to == self.given_to, self.c.given_by == self.given_by,
                    self.c.given_in == self.given_in, self.c.given_for == self.given_for)

    @classmethod
    def get_best_events(cls, limit: int = 10) -> Iterable['EventKarmaStats']:
        """Return up to ``limit`` events, highest total karma first."""
        return cls.get_event_stats(direction=desc, limit=limit)

    @classmethod
    def get_worst_events(cls, limit: int = 10) -> Iterable['EventKarmaStats']:
        """Return up to ``limit`` events, lowest total karma first."""
        return cls.get_event_stats(direction=asc, limit=limit)

    @classmethod
    def get_event_stats(cls, direction, limit: int = 10) -> Iterable['EventKarmaStats']:
        """Aggregate karma per target event.

        :param direction: ``sqlalchemy.asc`` or ``sqlalchemy.desc``; applied to the total.
        :param limit: Maximum number of events to return.
        :return: A generator of ``EventKarmaStats``; ties on the total are ordered by
                 ascending event ID.
        """
        c = cls.c
        # NOTE(review): given_in, given_to and content are selected without being grouped
        # or aggregated. SQLite accepts that; stricter backends may reject the query --
        # confirm which databases have to be supported before relying on it.
        query = (select([c.given_in, c.given_for, c.given_to, c.content, *cls._vote_sums()])
                 .group_by(c.given_for)
                 .order_by(direction("total"), asc(c.given_for))
                 .limit(limit))
        return (EventKarmaStats(*row) for row in cls.db.execute(query))

    @classmethod
    def get_top_users(cls, limit: int = 10) -> Iterable['UserKarmaStats']:
        """Return up to ``limit`` users, highest total karma first."""
        return cls.get_user_stats(direction=desc, limit=limit)

    @classmethod
    def get_bottom_users(cls, limit: int = 10) -> Iterable['UserKarmaStats']:
        """Return up to ``limit`` users, lowest total karma first."""
        return cls.get_user_stats(direction=asc, limit=limit)

    @classmethod
    def get_user_stats(cls, direction, limit: int = 10) -> Iterable['UserKarmaStats']:
        """Aggregate karma per receiving user.

        :param direction: ``sqlalchemy.asc`` or ``sqlalchemy.desc``; applied to the total.
        :param limit: Maximum number of users to return.
        :return: A generator of ``UserKarmaStats``; ties on the total are ordered by
                 ascending user ID.
        """
        c = cls.c
        query = (select([c.given_to, *cls._vote_sums()])
                 .group_by(c.given_to)
                 .order_by(direction("total"), asc(c.given_to))
                 .limit(limit))
        return (UserKarmaStats(*row) for row in cls.db.execute(query))

    @classmethod
    def get_karma(cls, user_id: UserID) -> Optional['UserKarmaStats']:
        """Return the aggregated karma of ``user_id``, or ``None`` if they have no votes."""
        c = cls.c
        # Without GROUP BY an aggregate query yields exactly one row even when nothing
        # matches (all columns NULL), so the "no karma" case could never be detected.
        # Grouping by the user makes the result empty when the user has no rows.
        query = (select([c.given_to, *cls._vote_sums()])
                 .where(c.given_to == user_id)
                 .group_by(c.given_to))
        row = cls.db.execute(query).first()
        if row is None:
            return None
        return UserKarmaStats(*row)

    @classmethod
    def find_index_from_top(cls, user_id: UserID) -> int:
        """Return the zero-based rank of ``user_id`` by total karma, or ``-1`` if absent.

        The ordering matches ``get_top_users``: total descending, then user ID ascending.
        """
        c = cls.c
        rows = cls.db.execute(select([c.given_to])
                              .group_by(c.given_to)
                              .order_by(desc(func.sum(c.value)), asc(c.given_to)))
        for i, row in enumerate(rows):
            if row[0] == user_id:
                return i
        return -1

    @classmethod
    def all(cls, user_id: UserID) -> Iterable['Karma']:
        """Return a generator over every vote received by ``user_id``."""
        rows = cls.db.execute(cls.t.select().where(cls.c.given_to == user_id))
        return (cls._from_row(row) for row in rows)

    @classmethod
    def export(cls, user_id: UserID) -> Iterable['Karma']:
        """Return a generator over every vote either received or given by ``user_id``."""
        rows = cls.db.execute(cls.t.select().where(or_(cls.c.given_to == user_id,
                                                       cls.c.given_by == user_id)))
        return (cls._from_row(row) for row in rows)

    @classmethod
    def is_vote_event(cls, event_id: EventID) -> bool:
        """Tell whether ``event_id`` is stored as the ``given_from`` of some vote."""
        # first() also closes the result, so no cursor is left open after the check.
        row = cls.db.execute(cls.t.select().where(cls.c.given_from == event_id)).first()
        return row is not None

    @classmethod
    def get(cls, given_to: UserID, given_by: UserID, given_in: RoomID, given_for: EventID
            ) -> Optional['Karma']:
        """Fetch the vote identified by the full primary key, or ``None`` if missing."""
        row = cls.db.execute(cls.t.select().where(and_(
            cls.c.given_to == given_to, cls.c.given_by == given_by,
            cls.c.given_in == given_in, cls.c.given_for == given_for))).first()
        return cls._from_row(row) if row is not None else None

    @classmethod
    def get_by_given_from(cls, given_from: EventID) -> Optional['Karma']:
        """Fetch the vote whose ``given_from`` equals the given event ID, or ``None``."""
        row = cls.db.execute(cls.t.select().where(cls.c.given_from == given_from)).first()
        return cls._from_row(row) if row is not None else None

    def delete(self) -> None:
        """Delete the row matching this instance's primary key."""
        self.db.execute(self.t.delete().where(self._identity_clause()))

    def insert(self) -> None:
        """Insert this vote, stamping ``given_at`` with the current time in milliseconds."""
        self.given_at = int(time() * 1000)
        self.db.execute(self.t.insert().values(given_to=self.given_to, given_by=self.given_by,
                                               given_in=self.given_in, given_for=self.given_for,
                                               given_from=self.given_from, value=self.value,
                                               given_at=self.given_at, content=self.content))

    def update(self, new_value: int) -> None:
        """Store ``new_value`` for this vote and refresh ``given_at``.

        ``given_from`` is written as well, so callers can change it on the instance
        before calling this method. ``content`` is not written.
        """
        self.given_at = int(time() * 1000)
        self.value = new_value
        self.db.execute(self.t.update().where(self._identity_clause())
                        .values(given_from=self.given_from, value=self.value,
                                given_at=self.given_at))

    def to_dict(self) -> Dict[str, Any]:
        """Return the vote as a plain dict keyed by the column names minus ``given_``."""
        return {
            "to": self.given_to,
            "by": self.given_by,
            "in": self.given_in,
            "for": self.given_for,
            "from": self.given_from,
            "at": self.given_at,
            "value": self.value,
            "content": self.content,
        }
|
|
|
|
|
|
class Version:
    """Declarative mixin for the ``version`` table, whose only column is an integer key.

    NOTE(review): presumably holds the database schema version -- nothing in this file
    reads or writes it, so confirm against the callers.
    """
    __tablename__ = "version"

    # Engine, table and column collection; assigned by make_tables().
    db: Engine = None
    t: Table = None
    c: ImmutableColumnCollection = None

    version: int = Column(Integer, primary_key=True)
|
|
|
|
|
|
def make_tables(engine: Engine) -> Tuple[Type[Karma], Type[Version]]:
    """Build the mapped table classes for ``engine`` and create any missing tables.

    A new declarative base is created on every call and bound to ``engine``. The
    ``Karma`` and ``Version`` mixins are mapped onto it, and each mapped class gets its
    ``db``, ``t`` and ``c`` attributes filled in so the query methods can run.

    :param engine: The SQLAlchemy engine the tables live in.
    :return: The mapped karma class and the mapped version class, in that order.
    """
    base = declarative_base()

    class KarmaImpl(Karma, base):
        __table__: Table

    class VersionImpl(Version, base):
        __table__: Table

    base.metadata.bind = engine
    for model in (KarmaImpl, VersionImpl):
        mapped_table = model.__table__
        model.db = engine
        model.t = mapped_table
        model.c = mapped_table.c
        # Both mapped classes receive a reference to the karma class.
        model.Karma = KarmaImpl

    # TODO replace with alembic
    base.metadata.create_all()

    return KarmaImpl, VersionImpl
|