# karma - A maubot plugin to track the karma of users.
# Copyright (C) 2018 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
from typing import Awaitable, Type, Optional, Tuple
import json
import html
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.client import Client
from mautrix.types import (Event, StateEvent, EventID, UserID, FileInfo, MessageType,
MediaMessageEventContent)
from mautrix.client.api.types.event.message import media_reply_fallback_body_map
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from .db import make_tables, Karma, Version
UPVOTE_EMOJI = r"(?:\U0001F44D[\U0001F3FB-\U0001F3FF]?)"
UPVOTE_EMOJI_SHORTHAND = r"(?:\:\+1\:)|(?:\:thumbsup\:)"
UPVOTE_TEXT = r"(?:\+(?:1|\+)?)"
UPVOTE = f"^(?:{UPVOTE_EMOJI}|{UPVOTE_EMOJI_SHORTHAND}|{UPVOTE_TEXT})$"
DOWNVOTE_EMOJI = r"(?:\U0001F44E[\U0001F3FB-\U0001F3FF]?)"
DOWNVOTE_EMOJI_SHORTHAND = r"(?:\:-1\:)|(?:\:thumbsdown\:)"
DOWNVOTE_TEXT = r"(?:-(?:1|-)?)"
DOWNVOTE = f"^(?:{DOWNVOTE_EMOJI}|{DOWNVOTE_EMOJI_SHORTHAND}|{DOWNVOTE_TEXT})$"
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("democracy")
helper.copy("filter")
class KarmaBot(Plugin):
karma: Type[Karma]
version: Type[Version]
async def start(self) -> None:
await super().start()
self.config.load_and_update()
self.karma, self.version = make_tables(self.database)
@command.new("karma", help="View users' karma or karma top lists")
async def karma(self) -> None:
pass
@karma.subcommand("up", help="Upvote a message")
def upvote(self, evt: MessageEvent) -> Awaitable[None]:
return self._vote(evt, evt.content.get_reply_to(), +1)
@karma.subcommand("down", help="Downvote a message")
def downvote(self, evt: MessageEvent) -> Awaitable[None]:
return self._vote(evt, evt.content.get_reply_to(), -1)
@command.passive(UPVOTE)
def upvote(self, evt: MessageEvent) -> Awaitable[None]:
return self._vote(evt, evt.content.get_reply_to(), +1)
@command.passive(DOWNVOTE)
def downvote(self, evt: MessageEvent) -> Awaitable[None]:
return self._vote(evt, evt.content.get_reply_to(), -1)
@karma.subcommand("stats", help="View global karma statistics")
async def karma_stats(self, evt: MessageEvent) -> None:
await evt.reply("Not yet implemented :(")
@karma.subcommand("view", help="View your or another users karma")
@command.argument("user", required=False, parser=Client.parse_mxid)
async def view_karma(self, evt: MessageEvent, user: Optional[Tuple[str, str]]) -> None:
if user is not None:
mxid = UserID(f"@{user[0]}:{user[1]}")
name = f"[{user[0]}](https://matrix.to/#/{mxid})"
word_have = "has"
word_to_be = "is"
else:
mxid = evt.sender
name = "You"
word_have = "have"
word_to_be = "are"
karma = self.karma.get_karma(mxid)
if karma is None or karma.total is None:
await evt.reply(f"{name} {word_have} no karma :(")
return
index = self.karma.find_index_from_top(mxid)
await evt.reply(f"{name} {word_have} {karma.total} karma "
f"(+{karma.positive}/-{karma.negative}) "
f"and {word_to_be} #{index + 1 or '∞'} on the top list.")
@karma.subcommand("export", help="Export the data of your karma")
async def export_own_karma(self, evt: MessageEvent) -> None:
karma_list = [karma.to_dict() for karma in self.karma.export(evt.sender)]
data = json.dumps(karma_list).encode("utf-8")
url = await self.client.upload_media(data, mime_type="application/json")
await evt.reply(MediaMessageEventContent(
msgtype=MessageType.FILE,
body=f"karma-{evt.sender}.json",
url=url,
info=FileInfo(
mimetype="application/json",
size=len(data),
)
))
@karma.subcommand("breakdown", help="View your karma breakdown")
async def own_karma_breakdown(self, evt: MessageEvent) -> None:
await evt.reply("Not yet implemented :(")
@karma.subcommand("top", help="View the highest rated users")
async def karma_list(self, evt: MessageEvent) -> None:
await evt.reply(self._karma_user_list("top"))
@karma.subcommand("bottom", help="View the lowest rated users")
async def karma_list(self, evt: MessageEvent) -> None:
await evt.reply(self._karma_user_list("bottom"))
@karma.subcommand("best", help="View the highest rated messages")
async def karma_list(self, evt: MessageEvent) -> None:
await evt.reply(self._karma_message_list("best"))
@karma.subcommand("worst", help="View the lowest rated messages")
async def karma_list(self, evt: MessageEvent) -> None:
await evt.reply(self._karma_message_list("worst"))
def _parse_content(self, evt: Event) -> str:
if isinstance(evt, MessageEvent):
if evt.content.msgtype in (MessageType.NOTICE, MessageType.TEXT, MessageType.EMOTE):
body = evt.content.body
if evt.content.msgtype == MessageType.EMOTE:
body = "/me " + body
body = body.split("\n")[0]
if len(body) > 60:
body = body[:50] + " \u2026"
body = html.escape(body)
return body
name = media_reply_fallback_body_map[evt.content.msgtype]
return f"[{name}]({self.client.api.get_download_url(evt.content.url)})"
elif isinstance(evt, StateEvent):
return "a state event"
return "an unknown event"
@staticmethod
def _sign(value: int) -> str:
if value > 0:
return f"+{value}"
elif value < 0:
return str(value)
else:
return "±0"
async def _vote(self, evt: MessageEvent, target: EventID, value: int) -> None:
if not target:
return
in_filter = evt.sender in self.config["filter"]
if self.config["democracy"] == in_filter:
await evt.reply("Sorry, you're not allowed to vote.")
return
if self.karma.is_vote_event(target):
await evt.reply("Sorry, you can't vote on votes.")
return
karma_target = await self.client.get_event(evt.room_id, target)
if not karma_target:
return
if karma_target.sender == evt.sender and value > 0:
await evt.reply("Hey! You can't upvote yourself!")
return
karma_id = dict(given_to=karma_target.sender, given_by=evt.sender, given_in=evt.room_id,
given_for=karma_target.event_id)
existing = self.karma.get(**karma_id)
if existing is not None:
if existing.value == value:
await evt.reply(f"You already {self._sign(value)}'d that message.")
return
existing.update(new_value=value)
else:
karma = self.karma(**karma_id, given_from=evt.event_id, value=value,
content=self._parse_content(karma_target))
karma.insert()
await evt.mark_read()
def _denotify(self, mxid: UserID) -> str:
localpart, _ = self.client.parse_mxid(mxid)
return "\u2063".join(localpart)
def _karma_user_list(self, list_type: str) -> Optional[str]:
if list_type == "top":
karma_list = self.karma.get_top_users()
message = "#### Highest karma\n\n"
elif list_type in ("bot", "bottom"):
karma_list = self.karma.get_bottom_users()
message = "#### Lowest karma\n\n"
else:
return None
message += "\n".join(
f"{index + 1}. [{self._denotify(karma.user_id)}](https://matrix.to/#/{karma.user_id}): "
f"{self._sign(karma.total)} (+{karma.positive}/-{karma.negative})"
for index, karma in enumerate(karma_list))
return message
def _karma_message_list(self, list_type: str) -> Optional[str]:
if list_type == "best":
karma_list = self.karma.get_best_events()
message = "#### Best messages\n\n"
elif list_type == "worst":
karma_list = self.karma.get_worst_events()
message = "#### Worst messages\n\n"
else:
return None
message += "\n".join(
f"{index + 1}. Event"
f" by [{self._denotify(event.sender)}](https://matrix.to/#/{event.sender}) with"
f" {self._sign(event.total)} karma (+{event.positive}/-{event.negative})\n"
f" > {event.content}"
for index, event in enumerate(karma_list))
return message
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config