diff --git a/karma/bot.py b/karma/bot.py
index cd3219e..ae50ea3 100644
--- a/karma/bot.py
+++ b/karma/bot.py
@@ -13,13 +13,13 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import Awaitable, Type, Optional
+from typing import Awaitable, Type, Optional, Tuple
import json
import html
-from sqlalchemy.engine.base import Engine
-
-from maubot import Plugin, CommandSpec, Command, PassiveCommand, Argument, MessageEvent
+from maubot import Plugin, MessageEvent
+from maubot.handlers import command
+from mautrix.client import Client
from mautrix.types import (Event, StateEvent, EventID, UserID, FileInfo, MessageType,
MediaMessageEventContent)
from mautrix.client.api.types.event.message import media_reply_fallback_body_map
@@ -27,23 +27,6 @@ from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from .db import make_tables, Karma, Version
-COMMAND_PASSIVE_UPVOTE = "xyz.maubot.karma.up"
-COMMAND_PASSIVE_DOWNVOTE = "xyz.maubot.karma.down"
-
-ARG_LIST = "$list"
-ARG_LIST_MATCHES = "top|bot(?:tom)?|best|worst"
-COMMAND_KARMA_LIST = f"karma {ARG_LIST}"
-ARG_USER = "$user"
-ARG_USER_MATCHES = "@[^:]+:.+"
-COMMAND_KARMA_VIEW = f"karma {ARG_USER}"
-COMMAND_KARMA_STATS = "karma stats"
-COMMAND_OWN_KARMA_VIEW = "karma"
-COMMAND_OWN_KARMA_BREAKDOWN = "karma breakdown"
-COMMAND_OWN_KARMA_EXPORT = "karma export"
-
-COMMAND_UPVOTE = "upvote"
-COMMAND_DOWNVOTE = "downvote"
-
UPVOTE_EMOJI = r"(?:\U0001F44D[\U0001F3FB-\U0001F3FF]?)"
UPVOTE_EMOJI_SHORTHAND = r"(?:\:\+1\:)|(?:\:thumbsup\:)"
UPVOTE_TEXT = r"(?:\+(?:1|\+)?)"
@@ -64,56 +47,94 @@ class Config(BaseProxyConfig):
class KarmaBot(Plugin):
karma: Type[Karma]
version: Type[Version]
- db: Engine
async def start(self) -> None:
+ await super().start()
self.config.load_and_update()
- self.db = self.request_db_engine()
- self.karma, self.version = make_tables(self.db)
- self.set_command_spec(CommandSpec(commands=[
- Command(syntax=COMMAND_KARMA_STATS, description="View global karma statistics"),
- Command(syntax=COMMAND_OWN_KARMA_VIEW, description="View your karma"),
- Command(syntax=COMMAND_OWN_KARMA_BREAKDOWN, description="View your karma breakdown"),
- Command(syntax=COMMAND_OWN_KARMA_EXPORT, description="Export the data of your karma"),
- Command(syntax=COMMAND_KARMA_LIST, description="View the karma top lists",
- arguments={ARG_LIST: Argument(matches=ARG_LIST_MATCHES, required=True,
- description="The list to view")}),
- Command(syntax=COMMAND_KARMA_VIEW, description="View the karma of a specific user",
- arguments={ARG_USER: Argument(matches=ARG_USER_MATCHES, required=True,
- description="The user whose karma to view")}),
- Command(syntax=COMMAND_UPVOTE, description="Upvote a message"),
- Command(syntax=COMMAND_DOWNVOTE, description="Downvote a message"),
- ], passive_commands=[
- PassiveCommand(COMMAND_PASSIVE_UPVOTE, match_against="body", matches=UPVOTE),
- PassiveCommand(COMMAND_PASSIVE_DOWNVOTE, match_against="body", matches=DOWNVOTE)
- ]))
+ self.karma, self.version = make_tables(self.database)
- self.client.add_command_handler(COMMAND_PASSIVE_UPVOTE, self.upvote)
- self.client.add_command_handler(COMMAND_PASSIVE_DOWNVOTE, self.downvote)
- self.client.add_command_handler(COMMAND_UPVOTE, self.upvote)
- self.client.add_command_handler(COMMAND_DOWNVOTE, self.downvote)
+ @command.new("karma", help="View users' karma or karma top lists")
+ async def karma(self) -> None:
+ pass
- self.client.add_command_handler(COMMAND_KARMA_LIST, self.karma_list)
- self.client.add_command_handler(COMMAND_KARMA_VIEW, self.view_karma)
- self.client.add_command_handler(COMMAND_KARMA_STATS, self.karma_stats)
- self.client.add_command_handler(COMMAND_OWN_KARMA_VIEW, self.view_own_karma)
- self.client.add_command_handler(COMMAND_OWN_KARMA_BREAKDOWN, self.own_karma_breakdown)
- self.client.add_command_handler(COMMAND_OWN_KARMA_EXPORT, self.export_own_karma)
+ @karma.subcommand("up", help="Upvote a message")
+ def upvote(self, evt: MessageEvent) -> Awaitable[None]:
+ return self._vote(evt, evt.content.get_reply_to(), +1)
- async def stop(self) -> None:
- self.client.remove_command_handler(COMMAND_PASSIVE_UPVOTE, self.upvote)
- self.client.remove_command_handler(COMMAND_PASSIVE_DOWNVOTE, self.downvote)
- self.client.remove_command_handler(COMMAND_UPVOTE, self.upvote)
- self.client.remove_command_handler(COMMAND_DOWNVOTE, self.downvote)
+ @karma.subcommand("down", help="Downvote a message")
+ def downvote(self, evt: MessageEvent) -> Awaitable[None]:
+ return self._vote(evt, evt.content.get_reply_to(), -1)
- self.client.remove_command_handler(COMMAND_KARMA_LIST, self.karma_list)
- self.client.remove_command_handler(COMMAND_KARMA_VIEW, self.view_karma)
- self.client.remove_command_handler(COMMAND_KARMA_STATS, self.karma_stats)
- self.client.remove_command_handler(COMMAND_OWN_KARMA_VIEW, self.view_own_karma)
- self.client.remove_command_handler(COMMAND_OWN_KARMA_BREAKDOWN, self.own_karma_breakdown)
- self.client.remove_command_handler(COMMAND_OWN_KARMA_EXPORT, self.export_own_karma)
+ @command.passive(UPVOTE)
+ def upvote(self, evt: MessageEvent) -> Awaitable[None]:
+ return self._vote(evt, evt.content.get_reply_to(), +1)
- def parse_content(self, evt: Event) -> str:
+ @command.passive(DOWNVOTE)
+ def downvote(self, evt: MessageEvent) -> Awaitable[None]:
+ return self._vote(evt, evt.content.get_reply_to(), -1)
+
+ @karma.subcommand("stats", help="View global karma statistics")
+ async def karma_stats(self, evt: MessageEvent) -> None:
+ await evt.reply("Not yet implemented :(")
+
+ @karma.subcommand("view", help="View your or another users karma")
+ @command.argument("user", required=False, parser=Client.parse_mxid)
+ async def view_karma(self, evt: MessageEvent, user: Optional[Tuple[str, str]]) -> None:
+ if user is not None:
+ mxid = UserID(f"@{user[0]}:{user[1]}")
+ name = f"[{user[0]}](https://matrix.to/#/{mxid})"
+ word_have = "has"
+ word_to_be = "is"
+ else:
+ mxid = evt.sender
+ name = "You"
+ word_have = "have"
+ word_to_be = "are"
+ karma = self.karma.get_karma(mxid)
+ if karma is None or karma.total is None:
+ await evt.reply(f"{name} {word_have} no karma :(")
+ return
+ index = self.karma.find_index_from_top(mxid)
+ await evt.reply(f"{name} {word_have} {karma.total} karma "
+ f"(+{karma.positive}/-{karma.negative}) "
+ f"and {word_to_be} #{index + 1 or '∞'} on the top list.")
+
+ @karma.subcommand("export", help="Export the data of your karma")
+ async def export_own_karma(self, evt: MessageEvent) -> None:
+ karma_list = [karma.to_dict() for karma in self.karma.export(evt.sender)]
+ data = json.dumps(karma_list).encode("utf-8")
+ url = await self.client.upload_media(data, mime_type="application/json")
+ await evt.reply(MediaMessageEventContent(
+ msgtype=MessageType.FILE,
+ body=f"karma-{evt.sender}.json",
+ url=url,
+ info=FileInfo(
+ mimetype="application/json",
+ size=len(data),
+ )
+ ))
+
+ @karma.subcommand("breakdown", help="View your karma breakdown")
+ async def own_karma_breakdown(self, evt: MessageEvent) -> None:
+ await evt.reply("Not yet implemented :(")
+
+ @karma.subcommand("top", help="View the highest rated users")
+ async def karma_list(self, evt: MessageEvent) -> None:
+ await evt.reply(self._karma_user_list("top"))
+
+ @karma.subcommand("bottom", help="View the lowest rated users")
+ async def karma_list(self, evt: MessageEvent) -> None:
+ await evt.reply(self._karma_user_list("bottom"))
+
+ @karma.subcommand("best", help="View the highest rated messages")
+ async def karma_list(self, evt: MessageEvent) -> None:
+ await evt.reply(self._karma_message_list("best"))
+
+ @karma.subcommand("worst", help="View the lowest rated messages")
+ async def karma_list(self, evt: MessageEvent) -> None:
+ await evt.reply(self._karma_message_list("worst"))
+
+ def _parse_content(self, evt: Event) -> str:
if isinstance(evt, MessageEvent):
if evt.content.msgtype in (MessageType.NOTICE, MessageType.TEXT, MessageType.EMOTE):
body = evt.content.body
@@ -131,7 +152,7 @@ class KarmaBot(Plugin):
return "an unknown event"
@staticmethod
- def sign(value: int) -> str:
+ def _sign(value: int) -> str:
if value > 0:
return f"+{value}"
elif value < 0:
@@ -139,7 +160,7 @@ class KarmaBot(Plugin):
else:
return "±0"
- async def vote(self, evt: MessageEvent, target: EventID, value: int) -> None:
+ async def _vote(self, evt: MessageEvent, target: EventID, value: int) -> None:
if not target:
return
in_filter = evt.sender in self.config["filter"]
@@ -160,42 +181,20 @@ class KarmaBot(Plugin):
existing = self.karma.get(**karma_id)
if existing is not None:
if existing.value == value:
- await evt.reply(f"You already {self.sign(value)}'d that message.")
+ await evt.reply(f"You already {self._sign(value)}'d that message.")
return
existing.update(new_value=value)
else:
karma = self.karma(**karma_id, given_from=evt.event_id, value=value,
- content=self.parse_content(karma_target))
+ content=self._parse_content(karma_target))
karma.insert()
await evt.mark_read()
- def upvote(self, evt: MessageEvent) -> Awaitable[None]:
- return self.vote(evt, evt.content.get_reply_to(), +1)
-
- def downvote(self, evt: MessageEvent) -> Awaitable[None]:
- return self.vote(evt, evt.content.get_reply_to(), -1)
-
- async def karma_stats(self, evt: MessageEvent) -> None:
- await evt.reply("Not yet implemented :(")
-
- def denotify(self, mxid: UserID) -> str:
+ def _denotify(self, mxid: UserID) -> str:
localpart, _ = self.client.parse_mxid(mxid)
return "\u2063".join(localpart)
- async def karma_list(self, evt: MessageEvent) -> None:
- list_type = evt.content.command.arguments[ARG_LIST]
- if not list_type:
- await evt.reply("**Usage**: !karma [top|bottom|best|worst]")
- return
- message = None
- if list_type in ("top", "bot", "bottom"):
- message = self.karma_user_list(list_type)
- elif list_type in ("best", "worst"):
- message = self.karma_message_list(list_type)
- if message is not None:
- await evt.reply(message)
-
- def karma_user_list(self, list_type: str) -> Optional[str]:
+ def _karma_user_list(self, list_type: str) -> Optional[str]:
if list_type == "top":
karma_list = self.karma.get_top_users()
message = "#### Highest karma\n\n"
@@ -205,12 +204,12 @@ class KarmaBot(Plugin):
else:
return None
message += "\n".join(
- f"{index + 1}. [{self.denotify(karma.user_id)}](https://matrix.to/#/{karma.user_id}): "
- f"{self.sign(karma.total)} (+{karma.positive}/-{karma.negative})"
+ f"{index + 1}. [{self._denotify(karma.user_id)}](https://matrix.to/#/{karma.user_id}): "
+ f"{self._sign(karma.total)} (+{karma.positive}/-{karma.negative})"
for index, karma in enumerate(karma_list))
return message
- def karma_message_list(self, list_type: str) -> Optional[str]:
+ def _karma_message_list(self, list_type: str) -> Optional[str]:
if list_type == "best":
karma_list = self.karma.get_best_events()
message = "#### Best messages\n\n"
@@ -221,53 +220,12 @@ class KarmaBot(Plugin):
return None
message += "\n".join(
f"{index + 1}. Event"
- f" by [{self.denotify(event.sender)}](https://matrix.to/#/{event.sender}) with"
- f" {self.sign(event.total)} karma (+{event.positive}/-{event.negative})\n"
+ f" by [{self._denotify(event.sender)}](https://matrix.to/#/{event.sender}) with"
+ f" {self._sign(event.total)} karma (+{event.positive}/-{event.negative})\n"
f" > {event.content}"
for index, event in enumerate(karma_list))
return message
- async def view_karma(self, evt: MessageEvent) -> None:
- try:
- localpart, server_name = self.client.parse_mxid(evt.content.command.arguments[ARG_USER])
- except (ValueError, KeyError):
- return
- mxid = UserID(f"@{localpart}:{server_name}")
- karma = self.karma.get_karma(mxid)
- if karma is None or karma.total is None:
- await evt.reply(f"[{localpart}](https://matrix.to/#/{mxid}) has no karma :(")
- return
- index = self.karma.find_index_from_top(mxid)
- await evt.reply(f"[{localpart}](https://matrix.to/#/{mxid}) has {karma.total} karma "
- f"(+{karma.positive}/-{karma.negative}) "
- f"and is #{index + 1 or '∞'} on the top list.")
-
- async def export_own_karma(self, evt: MessageEvent) -> None:
- karma_list = [karma.to_dict() for karma in self.karma.export(evt.sender)]
- data = json.dumps(karma_list).encode("utf-8")
- url = await self.client.upload_media(data, mime_type="application/json")
- await evt.reply(MediaMessageEventContent(
- msgtype=MessageType.FILE,
- body=f"karma-{evt.sender}.json",
- url=url,
- info=FileInfo(
- mimetype="application/json",
- size=len(data),
- )
- ))
-
- async def view_own_karma(self, evt: MessageEvent) -> None:
- karma = self.karma.get_karma(evt.sender)
- if karma is None or karma.total is None:
- await evt.reply("You don't have any karma :(")
- return
- index = self.karma.find_index_from_top(evt.sender)
- await evt.reply(f"You have {karma.total} karma (+{karma.positive}/-{karma.negative}) "
- f"and are #{index + 1 or '∞'} on the top list.")
-
- async def own_karma_breakdown(self, evt: MessageEvent) -> None:
- await evt.reply("Not yet implemented :(")
-
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
diff --git a/karma/db.py b/karma/db.py
index dc64835..57fa8a2 100644
--- a/karma/db.py
+++ b/karma/db.py
@@ -100,7 +100,7 @@ class Karma:
return None
@classmethod
- def find_index_from_top(cls, user_id: UserKarmaStats) -> int:
+ def find_index_from_top(cls, user_id: UserID) -> int:
c = cls.c
rows = cls.db.execute(select([c.given_to])
.group_by(c.given_to)