maubot_karma/karma/bot.py

289 lines
12 KiB
Python
Raw Permalink Normal View History

2018-10-24 08:54:09 +00:00
# karma - A maubot plugin to track the karma of users.
2019-06-08 14:43:48 +00:00
# Copyright (C) 2019 Tulir Asokan
2018-10-24 08:54:09 +00:00
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2018-12-17 22:55:17 +00:00
from typing import Awaitable, Type, Optional, Tuple
import hashlib
import json
import html
2018-10-24 08:54:09 +00:00
2018-12-17 22:55:17 +00:00
from mautrix.client import Client
2020-08-03 11:03:41 +00:00
from mautrix.types import (Event, StateEvent, EventID, UserID, FileInfo, MessageType, EventType,
MediaMessageEventContent, ReactionEvent, RedactionEvent)
from mautrix.types.event.message import media_reply_fallback_body_map
2018-11-16 23:21:33 +00:00
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
2018-12-24 22:36:09 +00:00
from maubot import Plugin, MessageEvent
from maubot.handlers import command, event
2018-10-24 08:54:09 +00:00
from .db import make_tables, Karma, Version
2018-10-24 08:54:09 +00:00
# --- Patterns recognised as an upvote ---------------------------------------
# Thumbs-up emoji (U+1F44D), optionally followed by a skin tone modifier
# (U+1F3FB..U+1F3FF).
UPVOTE_EMOJI = r"(?:\U0001F44D[\U0001F3FB-\U0001F3FF]?)"
# Textual emoji shortcodes ":+1:" and ":thumbsup:".
UPVOTE_EMOJI_SHORTHAND = r"(?:\:\+1\:)|(?:\:thumbsup\:)"
# Plain text "+", "+1" or "++".
UPVOTE_TEXT = r"(?:\+(?:1|\+)?)"
# Anchored alternation of all of the above: the whole text must be a vote.
UPVOTE = "^(?:" + "|".join((UPVOTE_EMOJI, UPVOTE_EMOJI_SHORTHAND, UPVOTE_TEXT)) + ")$"

# --- Patterns recognised as a downvote --------------------------------------
# Thumbs-down emoji (U+1F44E), optionally followed by a skin tone modifier.
DOWNVOTE_EMOJI = r"(?:\U0001F44E[\U0001F3FB-\U0001F3FF]?)"
# Textual emoji shortcodes ":-1:" and ":thumbsdown:".
DOWNVOTE_EMOJI_SHORTHAND = r"(?:\:-1\:)|(?:\:thumbsdown\:)"
# Plain text "-", "-1" or "--".
DOWNVOTE_TEXT = r"(?:-(?:1|-)?)"
# Anchored alternation of all of the above: the whole text must be a vote.
DOWNVOTE = "^(?:" + "|".join((DOWNVOTE_EMOJI, DOWNVOTE_EMOJI_SHORTHAND, DOWNVOTE_TEXT)) + ")$"
2018-11-16 23:21:33 +00:00
class Config(BaseProxyConfig):
    """Plugin configuration: declares which keys are copied on a config update."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every known configuration key through ``helper``.

        The keys are copied one at a time, in the order listed below.
        """
        copied_keys = (
            "democracy",
            "opt_out",
            "show_content",
            "store_content",
            "filter",
            "errors.filtered_users",
            "errors.vote_on_vote",
            "errors.upvote_self",
            "errors.already_voted",
        )
        for key in copied_keys:
            helper.copy(key)
2018-11-16 23:21:33 +00:00
def sha1(val: str) -> str:
return hashlib.sha1(val.encode("utf-8")).hexdigest()
2018-10-24 08:54:09 +00:00
class KarmaBot(Plugin):
    """Plugin that records karma votes on room events and reports the totals.

    Votes reach :meth:`_vote` through three kinds of handlers defined here:
    the ``karma up`` / ``karma down`` subcommands, passive text handlers
    matching ``UPVOTE`` / ``DOWNVOTE`` on replies, and reaction handlers
    matching ``UPVOTE_EMOJI`` / ``DOWNVOTE_EMOJI``.
    """

    # Table classes produced by make_tables() in start().
    karma_t: Type[Karma]
    version: Type[Version]

    async def start(self) -> None:
        """Start the plugin: load the config and create the table classes."""
        await super().start()
        self.config.load_and_update()
        self.karma_t, self.version = make_tables(self.database)

    @command.new("karma", help="View users' karma or karma top lists")
    async def karma(self) -> None:
        """Root ``karma`` command; does nothing itself, it only hosts subcommands."""
        pass

    @karma.subcommand("up", help="Upvote an event")
    @command.argument("event_id", "Event ID", required=True)
    def upvote(self, evt: MessageEvent, event_id: EventID) -> Awaitable[None]:
        """Upvote the event with the explicitly given ``event_id``."""
        return self._vote(evt, event_id, +1)

    @karma.subcommand("down", help="Downvote a message")
    @command.argument("event_id", "Event ID", required=True)
    def downvote(self, evt: MessageEvent, event_id: EventID) -> Awaitable[None]:
        """Downvote the event with the explicitly given ``event_id``."""
        return self._vote(evt, event_id, -1)

    # NOTE(review): the two passive handlers below rebind the class attributes
    # ``upvote`` / ``downvote``. The subcommands defined above under the same
    # names are then only reachable through whatever registration
    # ``karma.subcommand`` performed. Confirm that is intended before renaming.
    @command.passive(UPVOTE)
    def upvote(self, evt: MessageEvent, _: Tuple[str]) -> Awaitable[None]:
        """Upvote the event that ``evt`` replies to (no-op target if it is not a reply)."""
        return self._vote(evt, evt.content.get_reply_to(), +1)

    @command.passive(DOWNVOTE)
    def downvote(self, evt: MessageEvent, _: Tuple[str]) -> Awaitable[None]:
        """Downvote the event that ``evt`` replies to (no-op target if it is not a reply)."""
        return self._vote(evt, evt.content.get_reply_to(), -1)

    @command.passive(regex=UPVOTE_EMOJI, field=lambda evt: evt.content.relates_to.key,
                     event_type=EventType.REACTION, msgtypes=None)
    async def upvote_react(self, evt: ReactionEvent, key: Tuple[str]) -> None:
        """Upvote the event that the reaction ``evt`` is attached to.

        A ``KeyError`` while reading the reaction target is swallowed and the
        reaction is ignored.
        """
        # Only the target lookup is guarded. This handler is a coroutine so
        # the ignore path still hands the caller something awaitable (a plain
        # ``def`` would return ``None`` there).
        try:
            target = evt.content.relates_to.event_id
        except KeyError:
            return
        await self._vote(evt, target, 1)

    @command.passive(regex=DOWNVOTE_EMOJI, field=lambda evt: evt.content.relates_to.key,
                     event_type=EventType.REACTION, msgtypes=None)
    async def downvote_react(self, evt: ReactionEvent, key: Tuple[str]) -> None:
        """Downvote the event that the reaction ``evt`` is attached to.

        A ``KeyError`` while reading the reaction target is swallowed and the
        reaction is ignored.
        """
        # See upvote_react for why this is a coroutine with a narrow try block.
        try:
            target = evt.content.relates_to.event_id
        except KeyError:
            return
        await self._vote(evt, target, -1)

    @event.on(EventType.ROOM_REDACTION)
    async def redact(self, evt: RedactionEvent) -> None:
        """Delete the stored vote whose originating event was redacted, if any."""
        karma = self.karma_t.get_by_given_from(evt.redacts)
        if karma:
            self.log.debug(f"Deleting {karma} due to redaction by {evt.sender}.")
            karma.delete()

    @karma.subcommand("stats", help="View global karma statistics")
    async def karma_stats(self, evt: MessageEvent) -> None:
        """Placeholder: replies that the feature is not implemented."""
        await evt.reply("Not yet implemented :(")

    @karma.subcommand("view", help="View your or another users karma")
    @command.argument("user", "user ID", required=False,
                      parser=lambda val: Client.parse_user_id(val) if val else None)
    async def view_karma(self, evt: MessageEvent, user: Optional[Tuple[str, str]]) -> None:
        """Reply with the karma total and top-list position of a user.

        :param evt: The command message; its sender is used when ``user`` is None.
        :param user: Parsed ``(localpart, server)`` pair of the user to look up,
            or None to look up the sender.
        """
        if user is not None:
            mxid = UserID(f"@{user[0]}:{user[1]}")
            name = f"[{user[0]}](https://matrix.to/#/{mxid})"
            word_have = "has"
            word_to_be = "is"
        else:
            mxid = evt.sender
            name = "You"
            word_have = "have"
            word_to_be = "are"
        karma = self.karma_t.get_karma(mxid)
        if karma is None or karma.total is None:
            await evt.reply(f"{name} {word_have} no karma :(")
            return
        index = self.karma_t.find_index_from_top(mxid)
        # ``index + 1 or ''`` renders an empty position when index + 1 is 0.
        await evt.reply(f"{name} {word_have} {karma.total} karma "
                        f"(+{karma.positive}/-{karma.negative}) "
                        f"and {word_to_be} #{index + 1 or ''} on the top list.")

    @karma.subcommand("export", help="Export the data of your karma")
    async def export_own_karma(self, evt: MessageEvent) -> None:
        """Upload the sender's karma rows as a JSON file and reply with it."""
        karma_list = [karma.to_dict() for karma in self.karma_t.export(evt.sender)]
        data = json.dumps(karma_list).encode("utf-8")
        url = await self.client.upload_media(data, mime_type="application/json")
        await evt.reply(MediaMessageEventContent(
            msgtype=MessageType.FILE,
            body=f"karma-{evt.sender}.json",
            url=url,
            info=FileInfo(
                mimetype="application/json",
                size=len(data),
            )
        ))

    @karma.subcommand("breakdown", help="View your karma breakdown")
    async def own_karma_breakdown(self, evt: MessageEvent) -> None:
        """Placeholder: replies that the feature is not implemented."""
        await evt.reply("Not yet implemented :(")

    @karma.subcommand("top", help="View the highest rated users")
    async def karma_top(self, evt: MessageEvent) -> None:
        """Reply with the list of users with the highest karma."""
        await evt.reply(self._karma_user_list("top"))

    @karma.subcommand("bottom", help="View the lowest rated users")
    async def karma_bottom(self, evt: MessageEvent) -> None:
        """Reply with the list of users with the lowest karma."""
        await evt.reply(self._karma_user_list("bottom"))

    @karma.subcommand("best", help="View the highest rated messages")
    async def karma_best(self, evt: MessageEvent) -> None:
        """Reply with the list of the highest rated messages."""
        await evt.reply(self._karma_message_list("best"))

    @karma.subcommand("worst", help="View the lowest rated messages")
    async def karma_worst(self, evt: MessageEvent) -> None:
        """Reply with the list of the lowest rated messages."""
        await evt.reply(self._karma_message_list("worst"))

    def _parse_content(self, evt: Event) -> str:
        """Build the content snippet stored alongside a vote on ``evt``.

        :return: ``""`` when the ``store_content`` option is falsy. For
            notice/text/emote messages the body (emotes prefixed with
            ``/me``); when ``store_content`` is ``"partial"`` only the first
            line, cut to 50 characters plus an ellipsis if it is longer than
            60. For other message types a markdown link to the media. For
            state events and anything else a fixed placeholder string.
        """
        if not self.config["store_content"]:
            return ""
        if isinstance(evt, MessageEvent):
            if evt.content.msgtype in (MessageType.NOTICE, MessageType.TEXT, MessageType.EMOTE):
                body = evt.content.body
                if evt.content.msgtype == MessageType.EMOTE:
                    body = "/me " + body
                if self.config["store_content"] == "partial":
                    body = body.split("\n")[0]
                    if len(body) > 60:
                        body = body[:50] + " \u2026"
                return body
            name = media_reply_fallback_body_map[evt.content.msgtype]
            return f"[{name}]({self.client.api.get_download_url(evt.content.url)})"
        elif isinstance(evt, StateEvent):
            return "a state event"
        return "an unknown event"

    @staticmethod
    def _sign(value: int) -> str:
        """Format ``value`` with an explicit sign: ``+N``, ``-N`` or ``±0``."""
        if value > 0:
            return f"+{value}"
        elif value < 0:
            return str(value)
        else:
            return "±0"

    async def _vote(self, evt: MessageEvent, target: EventID, value: int) -> None:
        """Record a vote of ``value`` by ``evt.sender`` on the event ``target``.

        ``evt`` may also be a reaction event (see the ``*_react`` handlers);
        error replies and the read marker are only sent when it is a
        ``MessageEvent``.

        :param evt: The event that casts the vote.
        :param target: ID of the event being voted on; a falsy value is ignored.
        :param value: Vote value; callers in this class pass ``+1`` or ``-1``.
        """
        if not target:
            return
        # The sender is rejected when membership in "filter" equals the
        # "democracy" flag, or when the sender's hashed ID is in "opt_out".
        in_filter = evt.sender in self.config["filter"]
        if self.config["democracy"] == in_filter or sha1(evt.sender) in self.config["opt_out"]:
            if self.config["errors.filtered_users"] and isinstance(evt, MessageEvent):
                await evt.reply("Sorry, you're not allowed to vote.")
            return
        if self.karma_t.is_vote_event(target):
            if self.config["errors.vote_on_vote"] and isinstance(evt, MessageEvent):
                await evt.reply("Sorry, you can't vote on votes.")
            return
        karma_target = await self.client.get_event(evt.room_id, target)
        if not karma_target:
            return
        if karma_target.sender == evt.sender and value > 0:
            if self.config["errors.upvote_self"] and isinstance(evt, MessageEvent):
                await evt.reply("Hey! You can't upvote yourself!")
            return
        karma_id = dict(given_to=karma_target.sender, given_by=evt.sender, given_in=evt.room_id,
                        given_for=karma_target.event_id)
        # Votes on opted-out users are stored without recipient or content.
        anonymize = sha1(karma_target.sender) in self.config["opt_out"]
        if anonymize:
            karma_id["given_to"] = ""
        existing = self.karma_t.get(**karma_id)
        if existing is not None:
            if existing.value == value:
                if self.config["errors.already_voted"] and isinstance(evt, MessageEvent):
                    await evt.reply(f"You already {self._sign(value)}'d that message.")
                return
            # Same voter and target but a different value: change the vote.
            existing.update(new_value=value)
        else:
            karma = self.karma_t(**karma_id, given_from=evt.event_id, value=value,
                                 content=self._parse_content(karma_target) if not anonymize else "")
            karma.insert()
        if isinstance(evt, MessageEvent):
            await evt.mark_read()

    def _denotify(self, mxid: UserID) -> str:
        """Return the localpart of ``mxid`` with U+2063 inserted between characters."""
        localpart, _ = self.client.parse_user_id(mxid)
        return "\u2063".join(localpart)

    def _user_link(self, user_id: UserID) -> str:
        """Return a markdown matrix.to link for ``user_id``, or ``"Anonymous"`` if empty."""
        if not user_id:
            return "Anonymous"
        return f"[{self._denotify(user_id)}](https://matrix.to/#/{user_id})"

    def _karma_user_list(self, list_type: str) -> Optional[str]:
        """Render the user ranking as markdown.

        :param list_type: ``"top"`` for the highest, ``"bot"``/``"bottom"`` for
            the lowest karma.
        :return: The markdown text, or None for any other ``list_type``.
        """
        if list_type == "top":
            karma_list = self.karma_t.get_top_users()
            message = "#### Highest karma\n\n"
        elif list_type in ("bot", "bottom"):
            karma_list = self.karma_t.get_bottom_users()
            message = "#### Lowest karma\n\n"
        else:
            return None
        # Entries without a user_id are skipped, but they still consume a
        # position number because enumerate() runs before the filter.
        message += "\n".join(
            f"{index + 1}. {self._user_link(karma.user_id)}: "
            f"{self._sign(karma.total)} (+{karma.positive}/-{karma.negative})"
            for index, karma in enumerate(karma_list) if karma.user_id)
        return message

    def _message_text(self, index, event) -> str:
        """Render one entry of a message ranking.

        :param index: Zero-based position in the list; shown one-based.
        :param event: Row providing ``room_id``, ``event_id``, ``sender``,
            ``total``, ``positive``, ``negative`` and ``content``.
        """
        text = (f"{index + 1}. [Event](https://matrix.to/#/{event.room_id}/{event.event_id})"
                f" by {self._user_link(event.sender)} with"
                f" {self._sign(event.total)} karma (+{event.positive}/-{event.negative})\n")
        # The stored content is quoted only when the "show_content" option is on.
        if event.content and self.config["show_content"]:
            text += f"    \n    > {html.escape(event.content)}\n"
        return text

    def _karma_message_list(self, list_type: str) -> Optional[str]:
        """Render the message ranking as markdown.

        :param list_type: ``"best"`` or ``"worst"``.
        :return: The markdown text, or None for any other ``list_type``.
        """
        if list_type == "best":
            karma_list = self.karma_t.get_best_events()
            message = "#### Best messages\n\n"
        elif list_type == "worst":
            karma_list = self.karma_t.get_worst_events()
            message = "#### Worst messages\n\n"
        else:
            return None
        message += "\n".join(self._message_text(index, event)
                             for index, event in enumerate(karma_list))
        return message

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the config class used by this plugin."""
        return Config