diff --git a/karma/bot.py b/karma/bot.py
index a417967..0db01e2 100644
--- a/karma/bot.py
+++ b/karma/bot.py
@@ -13,12 +13,16 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import Awaitable, Type
+from typing import Awaitable, Type, Optional
+import json
+import html
from sqlalchemy.engine.base import Engine
from maubot import Plugin, CommandSpec, Command, PassiveCommand, Argument, MessageEvent
-from mautrix.types import Event, StateEvent
+from mautrix.types import (Event, StateEvent, EventID, UserID, FileInfo, MessageType,
+ MediaMessageEventContent)
+from mautrix.client.api.types.event.message import media_reply_fallback_body_map
from .db import make_tables, Karma, KarmaCache, Version
@@ -26,9 +30,15 @@ COMMAND_PASSIVE_UPVOTE = "xyz.maubot.karma.up"
COMMAND_PASSIVE_DOWNVOTE = "xyz.maubot.karma.down"
ARG_LIST = "$list"
+ARG_LIST_MATCHES = "top|bot(?:tom)?|best|worst"
COMMAND_KARMA_LIST = f"karma {ARG_LIST}"
-
-COMMAND_OWN_KARMA = "karma"
+ARG_USER = "$user"
+ARG_USER_MATCHES = ".+"
+COMMAND_KARMA_VIEW = f"karma {ARG_USER}"
+COMMAND_KARMA_STATS = "karma stats"
+COMMAND_OWN_KARMA_VIEW = "karma"
+COMMAND_OWN_KARMA_BREAKDOWN = "karma breakdown"
+COMMAND_OWN_KARMA_EXPORT = "karma export"
COMMAND_UPVOTE = "upvote"
COMMAND_DOWNVOTE = "downvote"
@@ -54,10 +64,16 @@ class KarmaBot(Plugin):
self.db = self.request_db_engine()
self.karma_cache, self.karma, self.version = make_tables(self.db)
self.set_command_spec(CommandSpec(commands=[
+ Command(syntax=COMMAND_KARMA_STATS, description="View global karma statistics"),
+ Command(syntax=COMMAND_OWN_KARMA_VIEW, description="View your karma"),
+ Command(syntax=COMMAND_OWN_KARMA_BREAKDOWN, description="View your karma breakdown"),
+ Command(syntax=COMMAND_OWN_KARMA_EXPORT, description="Export the data of your karma"),
Command(syntax=COMMAND_KARMA_LIST, description="View the karma top lists",
- arguments={ARG_LIST: Argument(matches="(top|bot(tom)?|high(score)?|low)",
- required=True, description="The list to view")}),
- Command(syntax=COMMAND_OWN_KARMA, description="View your karma"),
+ arguments={ARG_LIST: Argument(matches=ARG_LIST_MATCHES, required=True,
+ description="The list to view")}),
+ Command(syntax=COMMAND_KARMA_VIEW, description="View the karma of a specific user",
+ arguments={ARG_USER: Argument(matches=ARG_USER_MATCHES, required=True,
+ description="The user whose karma to view")}),
Command(syntax=COMMAND_UPVOTE, description="Upvote a message"),
Command(syntax=COMMAND_DOWNVOTE, description="Downvote a message"),
], passive_commands=[
@@ -69,24 +85,40 @@ class KarmaBot(Plugin):
self.client.add_command_handler(COMMAND_PASSIVE_DOWNVOTE, self.downvote)
self.client.add_command_handler(COMMAND_UPVOTE, self.upvote)
self.client.add_command_handler(COMMAND_DOWNVOTE, self.downvote)
- self.client.add_command_handler(COMMAND_KARMA_LIST, self.view_karma_list)
- self.client.add_command_handler(COMMAND_OWN_KARMA, self.view_karma)
+
+ self.client.add_command_handler(COMMAND_KARMA_LIST, self.karma_list)
+ self.client.add_command_handler(COMMAND_KARMA_VIEW, self.view_karma)
+ self.client.add_command_handler(COMMAND_KARMA_STATS, self.karma_stats)
+ self.client.add_command_handler(COMMAND_OWN_KARMA_VIEW, self.view_own_karma)
+ self.client.add_command_handler(COMMAND_OWN_KARMA_BREAKDOWN, self.own_karma_breakdown)
+ self.client.add_command_handler(COMMAND_OWN_KARMA_EXPORT, self.export_own_karma)
async def stop(self) -> None:
self.client.remove_command_handler(COMMAND_PASSIVE_UPVOTE, self.upvote)
self.client.remove_command_handler(COMMAND_PASSIVE_DOWNVOTE, self.downvote)
self.client.remove_command_handler(COMMAND_UPVOTE, self.upvote)
self.client.remove_command_handler(COMMAND_DOWNVOTE, self.downvote)
- self.client.remove_command_handler(COMMAND_KARMA_LIST, self.view_karma_list)
- self.client.remove_command_handler(COMMAND_OWN_KARMA, self.view_karma)
- @staticmethod
- def parse_content(evt: Event) -> str:
+ self.client.remove_command_handler(COMMAND_KARMA_LIST, self.karma_list)
+ self.client.remove_command_handler(COMMAND_KARMA_VIEW, self.view_karma)
+ self.client.remove_command_handler(COMMAND_KARMA_STATS, self.karma_stats)
+ self.client.remove_command_handler(COMMAND_OWN_KARMA_VIEW, self.view_own_karma)
+ self.client.remove_command_handler(COMMAND_OWN_KARMA_BREAKDOWN, self.own_karma_breakdown)
+ self.client.remove_command_handler(COMMAND_OWN_KARMA_EXPORT, self.export_own_karma)
+
+ def parse_content(self, evt: Event) -> str:
if isinstance(evt, MessageEvent):
- return "message event"
+ if evt.content.msgtype in (MessageType.NOTICE, MessageType.TEXT, MessageType.EMOTE):
+ if evt.content.msgtype == MessageType.EMOTE:
+ evt.content.body = "/me " + evt.content.body
+ return (html.escape(evt.content.body[:50]) + " \u2026"
+ if len(evt.content.body) > 60
+ else html.escape(evt.content.body))
+ name = media_reply_fallback_body_map[evt.content.msgtype]
+ return f"[{name}]({self.client.get_download_url(evt.content.url)})"
elif isinstance(evt, StateEvent):
- return "state event"
- return "unknown event"
+ return "a state event"
+ return "an unknown event"
@staticmethod
def sign(value: int) -> str:
@@ -97,11 +129,13 @@ class KarmaBot(Plugin):
else:
return "±0"
- async def vote(self, evt: MessageEvent, value: int) -> None:
- reply_to = evt.content.get_reply_to()
- if not reply_to:
+ async def vote(self, evt: MessageEvent, target: EventID, value: int) -> None:
+ if not target:
return
- karma_target = await self.client.get_event(evt.room_id, reply_to)
+ if self.karma.is_vote_event(target):
+ await evt.reply("Sorry, you can't vote on votes.")
+ return
+ karma_target = await self.client.get_event(evt.room_id, target)
if not karma_target:
return
if karma_target.sender == evt.sender and value > 0:
@@ -122,12 +156,81 @@ class KarmaBot(Plugin):
await evt.mark_read()
def upvote(self, evt: MessageEvent) -> Awaitable[None]:
- return self.vote(evt, +1)
+ return self.vote(evt, evt.content.get_reply_to(), +1)
def downvote(self, evt: MessageEvent) -> Awaitable[None]:
- return self.vote(evt, -1)
+ return self.vote(evt, evt.content.get_reply_to(), -1)
+
+ async def karma_stats(self, evt: MessageEvent) -> None:
+ await evt.reply("Not yet implemented :(")
+
+ def denotify(self, mxid: UserID) -> str:
+ localpart, _ = self.client.parse_mxid(mxid)
+ return localpart.replace("", "\u2063")
+
+ async def karma_list(self, evt: MessageEvent) -> None:
+ list_type = evt.content.command.arguments[ARG_LIST]
+ if not list_type:
+ await evt.reply("**Usage**: !karma [top|bottom|best|worst]")
+ return
+ message = None
+ if list_type in ("top", "bot", "bottom"):
+ message = self.karma_user_list(list_type)
+ elif list_type in ("best", "worst"):
+ message = self.karma_message_list(list_type)
+ if message is not None:
+ await evt.reply(message)
+
+ def karma_user_list(self, list_type: str) -> Optional[str]:
+ if list_type == "top":
+ karma_list = self.karma_cache.get_high()
+ message = "#### Highest karma\n\n"
+ elif list_type in ("bot", "bottom"):
+ karma_list = self.karma_cache.get_low()
+ message = "#### Lowest karma\n\n"
+ else:
+ return None
+ message += "\n".join(
+ f"{index + 1}. [{self.denotify(karma.user_id)}](https://matrix.to/#/{karma.user_id}): "
+ f"{self.sign(karma.total)} (+{karma.positive}/-{karma.negative})"
+ for index, karma in enumerate(karma_list))
+ return message
+
+ def karma_message_list(self, list_type: str) -> Optional[str]:
+ if list_type == "best":
+ karma_list = self.karma.get_best_events()
+ message = "#### Best messages\n\n"
+ elif list_type == "worst":
+ karma_list = self.karma.get_worst_events()
+ message = "#### Worst messages\n\n"
+ else:
+ return None
+ message += "\n".join(
+ f"{index + 1}. [Event](https://matrix.to/#/{event.event_id}) by "
+ f"[{self.denotify(event.sender)}](https://matrix.to/#/{event.sender}) with "
+ f"{self.sign(event.total)} karma (+{event.positive}/-{event.negative})\n"
+ f" > {event.content}"
+ for index, event in enumerate(karma_list))
+ return message
async def view_karma(self, evt: MessageEvent) -> None:
+ pass
+
+ async def export_own_karma(self, evt: MessageEvent) -> None:
+ karma_list = [karma.to_dict() for karma in self.karma.export(evt.sender)]
+ data = json.dumps(karma_list).encode("utf-8")
+ url = await self.client.upload_media(data, mime_type="application/json")
+ await evt.reply(MediaMessageEventContent(
+ msgtype=MessageType.FILE,
+ body=f"karma-{evt.sender}.json",
+ url=url,
+ info=FileInfo(
+ mimetype="application/json",
+ size=len(data),
+ )
+ ))
+
+ async def view_own_karma(self, evt: MessageEvent) -> None:
karma = self.karma_cache.get_karma(evt.sender)
if karma is None:
await evt.reply("You don't have any karma :(")
@@ -136,20 +239,5 @@ class KarmaBot(Plugin):
await evt.reply(f"You have {karma.total} karma (+{karma.positive}/-{karma.negative})"
f" and are #{index} on the top list.")
- async def view_karma_list(self, evt: MessageEvent) -> None:
- list_type = evt.content.command.arguments[ARG_LIST]
- if not list_type:
- await evt.reply("**Usage**: !karma [top|bottom]")
- return
- if list_type in ("top", "high", "highscore"):
- karma_list = self.karma_cache.get_high()
- message = "#### Highest karma\n\n"
- elif list_type in ("bot", "bottom", "low"):
- karma_list = self.karma_cache.get_low()
- message = "#### Lowest karma\n\n"
- else:
- return
- message += "\n".join(f"{index + 1}. [{karma.user_id}](https://matrix.to/#/{karma.user_id}):"
- f" {karma.total} (+{karma.positive}/-{karma.negative})"
- for index, karma in enumerate(karma_list))
- await evt.reply(message)
+ async def own_karma_breakdown(self, evt: MessageEvent) -> None:
+ await evt.reply("Not yet implemented :(")
diff --git a/karma/db.py b/karma/db.py
index fc1aae0..e3d86c4 100644
--- a/karma/db.py
+++ b/karma/db.py
@@ -13,10 +13,11 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import List, Tuple, Optional, Type
+from typing import List, Tuple, Optional, Type, Iterable, Dict, Any, NamedTuple
from time import time
-from sqlalchemy import Column, String, Integer, BigInteger, Text, Table, select, and_
+from sqlalchemy import (Column, String, Integer, BigInteger, Text, Table,
+ select, and_, or_, func, case, asc, desc)
from sqlalchemy.sql.base import ImmutableColumnCollection
from sqlalchemy.engine.base import Engine, Connection
from sqlalchemy.ext.declarative import declarative_base
@@ -116,6 +117,11 @@ class KarmaCache:
negative=negative_diff).insert(conn)
+EventKarmaStats = NamedTuple("EventKarmaStats", event_id=EventID, sender=UserID, content=str,
+ total=int, positive=int, negative=int)
+UserKarmaStats = NamedTuple("UserKarmaStats", user_id=UserID, total=int, positive=int, negative=int)
+
+
class Karma:
__tablename__ = "karma"
db: Engine = None
@@ -128,17 +134,86 @@ class Karma:
given_in: RoomID = Column(String(255), primary_key=True)
given_for: EventID = Column(String(255), primary_key=True)
- given_from: EventID = Column(String(255))
+ given_from: EventID = Column(String(255), unique=True)
given_at: int = Column(BigInteger)
value: int = Column(Integer)
content: str = Column(Text)
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "to": self.given_to,
+ "by": self.given_by,
+ "in": self.given_in,
+ "for": self.given_for,
+ "from": self.given_from,
+ "at": self.given_at,
+ "value": self.value,
+ "content": self.content,
+ }
+
@classmethod
- def all(cls, user_id: UserID) -> List['Karma']:
- return [cls(given_to=given_to, given_by=given_by, given_in=given_in, given_for=given_for,
+ def get_best_events(cls, limit: int = 10) -> Iterable['EventKarmaStats']:
+ return cls.get_event_stats(direction=desc, limit=limit)
+
+ @classmethod
+ def get_worst_events(cls, limit: int = 10) -> Iterable['EventKarmaStats']:
+ return cls.get_event_stats(direction=asc, limit=limit)
+
+ @classmethod
+ def get_top_users(cls, limit: int = 10) -> Iterable['UserKarmaStats']:
+ return cls.get_user_stats(direction=desc, limit=limit)
+
+ @classmethod
+ def get_bottom_users(cls, limit: int = 10) -> Iterable['UserKarmaStats']:
+ return cls.get_user_stats(direction=asc, limit=limit)
+
+ @classmethod
+ def get_event_stats(cls, direction, limit: int = 10) -> Iterable['EventKarmaStats']:
+ c = cls.c
+ return (EventKarmaStats(*row) for row in cls.db.execute(
+ select([c.given_for, c.given_to, c.content,
+ func.sum(c.value).label("total"),
+ func.sum(case([(c.value > 0, c.value)], else_=0)).label("positive"),
+ func.abs(func.sum(case([(c.value < 0, c.value)], else_=0))).label("negative")])
+ .group_by(c.given_for)
+ .order_by(direction("total"))
+ .limit(limit)))
+
+ @classmethod
+ def get_user_stats(cls, direction, limit: int = 10) -> Iterable['UserKarmaStats']:
+ c = cls.c
+ return (UserKarmaStats(*row) for row in cls.db.execute(
+ select([c.given_to,
+ func.sum(c.value).label("total"),
+ func.sum(case([(c.value > 0, c.value)], else_=0)).label("positive"),
+ func.abs(func.sum(case([(c.value < 0, c.value)], else_=0))).label("negative")])
+ .group_by(c.given_to)
+ .order_by(direction("total"))
+ .limit(limit)))
+
+ @classmethod
+ def all(cls, user_id: UserID) -> Iterable['Karma']:
+ return (cls(given_to=given_to, given_by=given_by, given_in=given_in, given_for=given_for,
given_from=given_from, given_at=given_at, value=value, content=content)
for given_to, given_by, given_in, given_for, given_from, given_at, value, content
- in cls.db.execute(cls.t.select().where(cls.c.given_to == user_id))]
+ in cls.db.execute(cls.t.select().where(cls.c.given_to == user_id)))
+
+ @classmethod
+ def export(cls, user_id: UserID) -> Iterable['Karma']:
+ return (cls(given_to=given_to, given_by=given_by, given_in=given_in, given_for=given_for,
+ given_from=given_from, given_at=given_at, value=value, content=content)
+ for given_to, given_by, given_in, given_for, given_from, given_at, value, content
+ in cls.db.execute(cls.t.select().where(or_(cls.c.given_to == user_id,
+ cls.c.given_by == user_id))))
+
+ @classmethod
+ def is_vote_event(cls, event_id: EventID) -> bool:
+ rows = cls.db.execute(cls.t.select().where(cls.c.given_from == event_id))
+ try:
+ next(rows)
+ return True
+ except StopIteration:
+ return False
@classmethod
def get(cls, given_to: UserID, given_by: UserID, given_in: RoomID, given_for: Event