# maubot_karma/karma/db.py
#
# 224 lines
# 9.3 KiB
# Python

# karma - A maubot plugin to track the karma of users.
# Copyright (C) 2019 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Tuple, Optional, Type, Iterable, Dict, Any, NamedTuple
from time import time
from sqlalchemy import (Column, String, Integer, BigInteger, Text, Table,
select, and_, or_, func, case, asc, desc)
from sqlalchemy.sql.base import ImmutableColumnCollection
from sqlalchemy.engine.base import Engine
from sqlalchemy.ext.declarative import declarative_base
from mautrix.types import Event, UserID, EventID, RoomID
# NOTE: these use the class syntax because the keyword-argument form
# ``NamedTuple("Name", field=type)`` is deprecated since Python 3.13.
# Field names and order are unchanged, so positional construction from a
# result row (``EventKarmaStats(*row)``) keeps working.
class EventKarmaStats(NamedTuple):
    """Aggregated karma of one event, as built by ``Karma.get_event_stats()``.

    The fields are filled positionally from the selected columns
    ``given_in, given_for, given_to, content, total, positive, negative``.
    """
    room_id: RoomID
    event_id: EventID
    sender: UserID
    content: str
    total: int
    positive: int
    negative: int


class UserKarmaStats(NamedTuple):
    """Aggregated karma of one user, as built by ``Karma.get_user_stats()`` and
    ``Karma.get_karma()``.

    ``total`` is the sum of all vote values, ``positive`` the sum of the values
    above zero and ``negative`` the absolute value of the sum of the values
    below zero.
    """
    user_id: UserID
    total: int
    positive: int
    negative: int
class Karma:
    """Column definitions and query helpers for the ``karma`` table.

    This class is used as a mixin: ``make_tables()`` combines it with a
    declarative base and fills in ``db``, ``t`` and ``c`` on the resulting
    subclass. Until then those three attributes are ``None`` and none of the
    query methods can be used.
    """
    __tablename__ = "karma"
    db: Engine = None
    t: Table = None
    c: ImmutableColumnCollection = None

    # The four columns below form the composite primary key.
    given_to: UserID = Column(String(255), primary_key=True)
    given_by: UserID = Column(String(255), primary_key=True)
    given_in: RoomID = Column(String(255), primary_key=True)
    given_for: EventID = Column(String(255), primary_key=True)
    # Looked up by is_vote_event() / get_by_given_from(); unique per row.
    given_from: EventID = Column(String(255), unique=True)
    # Millisecond unix timestamp, refreshed by insert() and update().
    given_at: int = Column(BigInteger)
    value: int = Column(Integer)
    content: str = Column(Text)

    # ------------------------------------------------------------------ helpers

    @classmethod
    def _sum_columns(cls) -> list:
        """Return the labelled ``total`` / ``positive`` / ``negative`` aggregates."""
        value = cls.c.value
        return [
            func.sum(value).label("total"),
            func.sum(case([(value > 0, value)], else_=0)).label("positive"),
            func.abs(func.sum(case([(value < 0, value)], else_=0))).label("negative"),
        ]

    @classmethod
    def _from_row(cls, row) -> 'Karma':
        """Build an instance from a full table row (columns in definition order)."""
        (given_to, given_by, given_in, given_for,
         given_from, given_at, value, content) = row
        return cls(given_to=given_to, given_by=given_by, given_in=given_in,
                   given_for=given_for, given_from=given_from, given_at=given_at,
                   value=value, content=content)

    def _pk_clause(self):
        """Return the WHERE clause matching this instance's primary key."""
        return and_(
            self.c.given_to == self.given_to, self.c.given_by == self.given_by,
            self.c.given_in == self.given_in, self.c.given_for == self.given_for)

    # ------------------------------------------------------------- event stats

    @classmethod
    def get_best_events(cls, limit: int = 10) -> Iterable['EventKarmaStats']:
        """Return up to ``limit`` events ordered by total karma, highest first."""
        return cls.get_event_stats(direction=desc, limit=limit)

    @classmethod
    def get_worst_events(cls, limit: int = 10) -> Iterable['EventKarmaStats']:
        """Return up to ``limit`` events ordered by total karma, lowest first."""
        return cls.get_event_stats(direction=asc, limit=limit)

    @classmethod
    def get_event_stats(cls, direction, limit: int = 10) -> Iterable['EventKarmaStats']:
        """Aggregate karma per event.

        :param direction: ``sqlalchemy.asc`` or ``sqlalchemy.desc``; applied to
                          the ``total`` column. Ties are broken by ascending
                          ``given_for``.
        :param limit: maximum number of rows to return.
        :return: a lazy iterable of :class:`EventKarmaStats`; the query itself
                 is executed when this method is called.
        """
        c = cls.c
        # NOTE(review): given_in, given_to and content are selected without
        # being aggregated or grouped. SQLite accepts that; stricter databases
        # may reject the query. Left as-is to keep the result set unchanged.
        query = (select([c.given_in, c.given_for, c.given_to, c.content,
                         *cls._sum_columns()])
                 .group_by(c.given_for)
                 .order_by(direction("total"), asc(c.given_for))
                 .limit(limit))
        return (EventKarmaStats(*row) for row in cls.db.execute(query))

    # -------------------------------------------------------------- user stats

    @classmethod
    def get_top_users(cls, limit: int = 10) -> Iterable['UserKarmaStats']:
        """Return up to ``limit`` users ordered by total karma, highest first."""
        return cls.get_user_stats(direction=desc, limit=limit)

    @classmethod
    def get_bottom_users(cls, limit: int = 10) -> Iterable['UserKarmaStats']:
        """Return up to ``limit`` users ordered by total karma, lowest first."""
        return cls.get_user_stats(direction=asc, limit=limit)

    @classmethod
    def get_user_stats(cls, direction, limit: int = 10) -> Iterable['UserKarmaStats']:
        """Aggregate karma per receiving user.

        :param direction: ``sqlalchemy.asc`` or ``sqlalchemy.desc``; applied to
                          the ``total`` column. Ties are broken by ascending
                          ``given_to``.
        :param limit: maximum number of rows to return.
        :return: a lazy iterable of :class:`UserKarmaStats`; the query itself
                 is executed when this method is called.
        """
        c = cls.c
        query = (select([c.given_to, *cls._sum_columns()])
                 .group_by(c.given_to)
                 .order_by(direction("total"), asc(c.given_to))
                 .limit(limit))
        return (UserKarmaStats(*row) for row in cls.db.execute(query))

    @classmethod
    def get_karma(cls, user_id: UserID) -> Optional['UserKarmaStats']:
        """Return the aggregated karma of ``user_id``.

        :return: the user's :class:`UserKarmaStats`, or ``None`` when there are
                 no rows with ``given_to == user_id``.
        """
        c = cls.c
        # The GROUP BY matters: an aggregate query without it yields one row of
        # NULLs even when nothing matches, so "no karma" could never be
        # reported as None.
        query = (select([c.given_to, *cls._sum_columns()])
                 .where(c.given_to == user_id)
                 .group_by(c.given_to))
        # first() returns None on an empty result and closes the result set.
        row = cls.db.execute(query).first()
        if row is None:
            return None
        return UserKarmaStats(*row)

    @classmethod
    def find_index_from_top(cls, user_id: UserID) -> int:
        """Return the zero-based rank of ``user_id`` by total karma.

        Users are ordered by descending karma sum, then ascending user ID.

        :return: the index of the user in that ordering, or ``-1`` if the user
                 has no rows.
        """
        c = cls.c
        rows = cls.db.execute(select([c.given_to])
                              .group_by(c.given_to)
                              .order_by(desc(func.sum(c.value)), asc(c.given_to)))
        for index, row in enumerate(rows):
            if row[0] == user_id:
                return index
        return -1

    # ------------------------------------------------------------- row lookups

    @classmethod
    def all(cls, user_id: UserID) -> Iterable['Karma']:
        """Return every row whose ``given_to`` is ``user_id`` (lazily converted)."""
        query = cls.t.select().where(cls.c.given_to == user_id)
        return (cls._from_row(row) for row in cls.db.execute(query))

    @classmethod
    def export(cls, user_id: UserID) -> Iterable['Karma']:
        """Return every row where ``user_id`` is either ``given_to`` or ``given_by``."""
        query = cls.t.select().where(or_(cls.c.given_to == user_id,
                                         cls.c.given_by == user_id))
        return (cls._from_row(row) for row in cls.db.execute(query))

    @classmethod
    def is_vote_event(cls, event_id: EventID) -> bool:
        """Return whether any row has ``event_id`` as its ``given_from``."""
        query = cls.t.select().where(cls.c.given_from == event_id)
        return cls.db.execute(query).first() is not None

    @classmethod
    def get(cls, given_to: UserID, given_by: UserID, given_in: RoomID, given_for: EventID
            ) -> Optional['Karma']:
        """Look a row up by its full primary key.

        :return: the matching row, or ``None`` if there is none.
        """
        query = cls.t.select().where(and_(
            cls.c.given_to == given_to, cls.c.given_by == given_by,
            cls.c.given_in == given_in, cls.c.given_for == given_for))
        row = cls.db.execute(query).first()
        return cls._from_row(row) if row is not None else None

    @classmethod
    def get_by_given_from(cls, given_from: EventID) -> Optional['Karma']:
        """Look a row up by its unique ``given_from`` event ID.

        :return: the matching row, or ``None`` if there is none.
        """
        query = cls.t.select().where(cls.c.given_from == given_from)
        row = cls.db.execute(query).first()
        return cls._from_row(row) if row is not None else None

    # ---------------------------------------------------------------- mutation

    def delete(self) -> None:
        """Delete the row identified by this instance's primary key."""
        self.db.execute(self.t.delete().where(self._pk_clause()))

    def insert(self) -> None:
        """Insert this instance as a new row, stamping ``given_at`` with the
        current time in milliseconds."""
        self.given_at = int(time() * 1000)
        self.db.execute(self.t.insert().values(
            given_to=self.given_to, given_by=self.given_by,
            given_in=self.given_in, given_for=self.given_for,
            given_from=self.given_from, value=self.value,
            given_at=self.given_at, content=self.content))

    def update(self, new_value: int) -> None:
        """Store ``new_value`` for this row and refresh ``given_at``.

        The instance's current ``given_from`` is written as well, so a caller
        that wants to change it must assign it before calling this method.
        """
        self.given_at = int(time() * 1000)
        self.value = new_value
        self.db.execute(self.t.update().where(self._pk_clause()).values(
            given_from=self.given_from, value=self.value, given_at=self.given_at))

    def to_dict(self) -> Dict[str, Any]:
        """Return the row as a plain dict with the ``given_`` prefixes dropped."""
        return {
            "to": self.given_to,
            "by": self.given_by,
            "in": self.given_in,
            "for": self.given_for,
            "from": self.given_from,
            "at": self.given_at,
            "value": self.value,
            "content": self.content,
        }
class Version:
    """Column definitions for the single-column ``version`` table.

    Used as a mixin by ``make_tables()``, which fills in ``db``, ``t`` and
    ``c`` on the concrete subclass; they are ``None`` until then.
    """
    __tablename__ = "version"

    # Populated by make_tables().
    db: Engine = None
    t: Table = None
    c: ImmutableColumnCollection = None

    version: int = Column(Integer, primary_key=True)
def make_tables(engine: Engine) -> Tuple[Type[Karma], Type[Version]]:
    """Create the concrete table classes bound to ``engine``.

    A fresh declarative base is made on every call, the ``Karma`` and
    ``Version`` mixins are combined with it, and the ``db`` / ``t`` / ``c``
    shortcuts used by their query helpers are filled in. Missing tables are
    created via ``metadata.create_all()`` before returning.

    :param engine: the SQLAlchemy engine the tables should use.
    :return: the concrete ``(Karma, Version)`` classes.
    """
    base = declarative_base()

    class KarmaImpl(Karma, base):
        __table__: Table

    class VersionImpl(Version, base):
        __table__: Table

    base.metadata.bind = engine

    for model in (KarmaImpl, VersionImpl):
        sa_table = model.__table__
        model.db = engine
        model.t = sa_table
        model.c = sa_table.c
        # Both classes get a reference to the concrete karma class.
        model.Karma = KarmaImpl

    # TODO replace with alembic
    base.metadata.create_all()

    return KarmaImpl, VersionImpl