parent
1ab388d503
commit
c59e0c99b7
2 changed files with 208 additions and 45 deletions
166
karma/bot.py
166
karma/bot.py
|
@ -13,12 +13,16 @@
|
|||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Awaitable, Type
|
||||
from typing import Awaitable, Type, Optional
|
||||
import json
|
||||
import html
|
||||
|
||||
from sqlalchemy.engine.base import Engine
|
||||
|
||||
from maubot import Plugin, CommandSpec, Command, PassiveCommand, Argument, MessageEvent
|
||||
from mautrix.types import Event, StateEvent
|
||||
from mautrix.types import (Event, StateEvent, EventID, UserID, FileInfo, MessageType,
|
||||
MediaMessageEventContent)
|
||||
from mautrix.client.api.types.event.message import media_reply_fallback_body_map
|
||||
|
||||
from .db import make_tables, Karma, KarmaCache, Version
|
||||
|
||||
|
@ -26,9 +30,15 @@ COMMAND_PASSIVE_UPVOTE = "xyz.maubot.karma.up"
|
|||
COMMAND_PASSIVE_DOWNVOTE = "xyz.maubot.karma.down"
|
||||
|
||||
ARG_LIST = "$list"
|
||||
ARG_LIST_MATCHES = "top|bot(?:tom)?|best|worst"
|
||||
COMMAND_KARMA_LIST = f"karma {ARG_LIST}"
|
||||
|
||||
COMMAND_OWN_KARMA = "karma"
|
||||
ARG_USER = "$user"
|
||||
ARG_USER_MATCHES = ".+"
|
||||
COMMAND_KARMA_VIEW = f"karma {ARG_USER}"
|
||||
COMMAND_KARMA_STATS = "karma stats"
|
||||
COMMAND_OWN_KARMA_VIEW = "karma"
|
||||
COMMAND_OWN_KARMA_BREAKDOWN = "karma breakdown"
|
||||
COMMAND_OWN_KARMA_EXPORT = "karma export"
|
||||
|
||||
COMMAND_UPVOTE = "upvote"
|
||||
COMMAND_DOWNVOTE = "downvote"
|
||||
|
@ -54,10 +64,16 @@ class KarmaBot(Plugin):
|
|||
self.db = self.request_db_engine()
|
||||
self.karma_cache, self.karma, self.version = make_tables(self.db)
|
||||
self.set_command_spec(CommandSpec(commands=[
|
||||
Command(syntax=COMMAND_KARMA_STATS, description="View global karma statistics"),
|
||||
Command(syntax=COMMAND_OWN_KARMA_VIEW, description="View your karma"),
|
||||
Command(syntax=COMMAND_OWN_KARMA_BREAKDOWN, description="View your karma breakdown"),
|
||||
Command(syntax=COMMAND_OWN_KARMA_EXPORT, description="Export the data of your karma"),
|
||||
Command(syntax=COMMAND_KARMA_LIST, description="View the karma top lists",
|
||||
arguments={ARG_LIST: Argument(matches="(top|bot(tom)?|high(score)?|low)",
|
||||
required=True, description="The list to view")}),
|
||||
Command(syntax=COMMAND_OWN_KARMA, description="View your karma"),
|
||||
arguments={ARG_LIST: Argument(matches=ARG_LIST_MATCHES, required=True,
|
||||
description="The list to view")}),
|
||||
Command(syntax=COMMAND_KARMA_VIEW, description="View the karma of a specific user",
|
||||
arguments={ARG_USER: Argument(matches=ARG_USER_MATCHES, required=True,
|
||||
description="The user whose karma to view")}),
|
||||
Command(syntax=COMMAND_UPVOTE, description="Upvote a message"),
|
||||
Command(syntax=COMMAND_DOWNVOTE, description="Downvote a message"),
|
||||
], passive_commands=[
|
||||
|
@ -69,24 +85,40 @@ class KarmaBot(Plugin):
|
|||
self.client.add_command_handler(COMMAND_PASSIVE_DOWNVOTE, self.downvote)
|
||||
self.client.add_command_handler(COMMAND_UPVOTE, self.upvote)
|
||||
self.client.add_command_handler(COMMAND_DOWNVOTE, self.downvote)
|
||||
self.client.add_command_handler(COMMAND_KARMA_LIST, self.view_karma_list)
|
||||
self.client.add_command_handler(COMMAND_OWN_KARMA, self.view_karma)
|
||||
|
||||
self.client.add_command_handler(COMMAND_KARMA_LIST, self.karma_list)
|
||||
self.client.add_command_handler(COMMAND_KARMA_VIEW, self.view_karma)
|
||||
self.client.add_command_handler(COMMAND_KARMA_STATS, self.karma_stats)
|
||||
self.client.add_command_handler(COMMAND_OWN_KARMA_VIEW, self.view_own_karma)
|
||||
self.client.add_command_handler(COMMAND_OWN_KARMA_BREAKDOWN, self.own_karma_breakdown)
|
||||
self.client.add_command_handler(COMMAND_OWN_KARMA_EXPORT, self.export_own_karma)
|
||||
|
||||
async def stop(self) -> None:
|
||||
self.client.remove_command_handler(COMMAND_PASSIVE_UPVOTE, self.upvote)
|
||||
self.client.remove_command_handler(COMMAND_PASSIVE_DOWNVOTE, self.downvote)
|
||||
self.client.remove_command_handler(COMMAND_UPVOTE, self.upvote)
|
||||
self.client.remove_command_handler(COMMAND_DOWNVOTE, self.downvote)
|
||||
self.client.remove_command_handler(COMMAND_KARMA_LIST, self.view_karma_list)
|
||||
self.client.remove_command_handler(COMMAND_OWN_KARMA, self.view_karma)
|
||||
|
||||
@staticmethod
|
||||
def parse_content(evt: Event) -> str:
|
||||
self.client.remove_command_handler(COMMAND_KARMA_LIST, self.karma_list)
|
||||
self.client.remove_command_handler(COMMAND_KARMA_VIEW, self.view_karma)
|
||||
self.client.remove_command_handler(COMMAND_KARMA_STATS, self.karma_stats)
|
||||
self.client.remove_command_handler(COMMAND_OWN_KARMA_VIEW, self.view_own_karma)
|
||||
self.client.remove_command_handler(COMMAND_OWN_KARMA_BREAKDOWN, self.own_karma_breakdown)
|
||||
self.client.remove_command_handler(COMMAND_OWN_KARMA_EXPORT, self.export_own_karma)
|
||||
|
||||
def parse_content(self, evt: Event) -> str:
|
||||
if isinstance(evt, MessageEvent):
|
||||
return "message event"
|
||||
if evt.content.msgtype in (MessageType.NOTICE, MessageType.TEXT, MessageType.EMOTE):
|
||||
if evt.content.msgtype == MessageType.EMOTE:
|
||||
evt.content.body = "/me " + evt.content.body
|
||||
return (html.escape(evt.content.body[:50]) + " \u2026"
|
||||
if len(evt.content.body) > 60
|
||||
else html.escape(evt.content.body))
|
||||
name = media_reply_fallback_body_map[evt.content.msgtype]
|
||||
return f"[{name}]({self.client.get_download_url(evt.content.url)})"
|
||||
elif isinstance(evt, StateEvent):
|
||||
return "state event"
|
||||
return "unknown event"
|
||||
return "a state event"
|
||||
return "an unknown event"
|
||||
|
||||
@staticmethod
|
||||
def sign(value: int) -> str:
|
||||
|
@ -97,11 +129,13 @@ class KarmaBot(Plugin):
|
|||
else:
|
||||
return "±0"
|
||||
|
||||
async def vote(self, evt: MessageEvent, value: int) -> None:
|
||||
reply_to = evt.content.get_reply_to()
|
||||
if not reply_to:
|
||||
async def vote(self, evt: MessageEvent, target: EventID, value: int) -> None:
|
||||
if not target:
|
||||
return
|
||||
karma_target = await self.client.get_event(evt.room_id, reply_to)
|
||||
if self.karma.is_vote_event(target):
|
||||
await evt.reply("Sorry, you can't vote on votes.")
|
||||
return
|
||||
karma_target = await self.client.get_event(evt.room_id, target)
|
||||
if not karma_target:
|
||||
return
|
||||
if karma_target.sender == evt.sender and value > 0:
|
||||
|
@ -122,12 +156,81 @@ class KarmaBot(Plugin):
|
|||
await evt.mark_read()
|
||||
|
||||
def upvote(self, evt: MessageEvent) -> Awaitable[None]:
|
||||
return self.vote(evt, +1)
|
||||
return self.vote(evt, evt.content.get_reply_to(), +1)
|
||||
|
||||
def downvote(self, evt: MessageEvent) -> Awaitable[None]:
|
||||
return self.vote(evt, -1)
|
||||
return self.vote(evt, evt.content.get_reply_to(), -1)
|
||||
|
||||
async def karma_stats(self, evt: MessageEvent) -> None:
|
||||
await evt.reply("Not yet implemented :(")
|
||||
|
||||
def denotify(self, mxid: UserID) -> str:
|
||||
localpart, _ = self.client.parse_mxid(mxid)
|
||||
return localpart.replace("", "\u2063")
|
||||
|
||||
async def karma_list(self, evt: MessageEvent) -> None:
|
||||
list_type = evt.content.command.arguments[ARG_LIST]
|
||||
if not list_type:
|
||||
await evt.reply("**Usage**: !karma [top|bottom|best|worst]")
|
||||
return
|
||||
message = None
|
||||
if list_type in ("top", "bot", "bottom"):
|
||||
message = self.karma_user_list(list_type)
|
||||
elif list_type in ("best", "worst"):
|
||||
message = self.karma_message_list(list_type)
|
||||
if message is not None:
|
||||
await evt.reply(message)
|
||||
|
||||
def karma_user_list(self, list_type: str) -> Optional[str]:
|
||||
if list_type == "top":
|
||||
karma_list = self.karma_cache.get_high()
|
||||
message = "#### Highest karma\n\n"
|
||||
elif list_type in ("bot", "bottom"):
|
||||
karma_list = self.karma_cache.get_low()
|
||||
message = "#### Lowest karma\n\n"
|
||||
else:
|
||||
return None
|
||||
message += "\n".join(
|
||||
f"{index + 1}. [{self.denotify(karma.user_id)}](https://matrix.to/#/{karma.user_id}): "
|
||||
f"{self.sign(karma.total)} (+{karma.positive}/-{karma.negative})"
|
||||
for index, karma in enumerate(karma_list))
|
||||
return message
|
||||
|
||||
def karma_message_list(self, list_type: str) -> Optional[str]:
|
||||
if list_type == "best":
|
||||
karma_list = self.karma.get_best_events()
|
||||
message = "#### Best messages\n\n"
|
||||
elif list_type == "worst":
|
||||
karma_list = self.karma.get_worst_events()
|
||||
message = "#### Worst messages\n\n"
|
||||
else:
|
||||
return None
|
||||
message += "\n".join(
|
||||
f"{index + 1}. [Event](https://matrix.to/#/{event.event_id}) by "
|
||||
f"[{self.denotify(event.sender)}](https://matrix.to/#/{event.sender}) with "
|
||||
f"{self.sign(event.total)} karma (+{event.positive}/-{event.negative})\n"
|
||||
f" > {event.content}"
|
||||
for index, event in enumerate(karma_list))
|
||||
return message
|
||||
|
||||
async def view_karma(self, evt: MessageEvent) -> None:
|
||||
pass
|
||||
|
||||
async def export_own_karma(self, evt: MessageEvent) -> None:
|
||||
karma_list = [karma.to_dict() for karma in self.karma.export(evt.sender)]
|
||||
data = json.dumps(karma_list).encode("utf-8")
|
||||
url = await self.client.upload_media(data, mime_type="application/json")
|
||||
await evt.reply(MediaMessageEventContent(
|
||||
msgtype=MessageType.FILE,
|
||||
body=f"karma-{evt.sender}.json",
|
||||
url=url,
|
||||
info=FileInfo(
|
||||
mimetype="application/json",
|
||||
size=len(data),
|
||||
)
|
||||
))
|
||||
|
||||
async def view_own_karma(self, evt: MessageEvent) -> None:
|
||||
karma = self.karma_cache.get_karma(evt.sender)
|
||||
if karma is None:
|
||||
await evt.reply("You don't have any karma :(")
|
||||
|
@ -136,20 +239,5 @@ class KarmaBot(Plugin):
|
|||
await evt.reply(f"You have {karma.total} karma (+{karma.positive}/-{karma.negative})"
|
||||
f" and are #{index} on the top list.")
|
||||
|
||||
async def view_karma_list(self, evt: MessageEvent) -> None:
|
||||
list_type = evt.content.command.arguments[ARG_LIST]
|
||||
if not list_type:
|
||||
await evt.reply("**Usage**: !karma [top|bottom]")
|
||||
return
|
||||
if list_type in ("top", "high", "highscore"):
|
||||
karma_list = self.karma_cache.get_high()
|
||||
message = "#### Highest karma\n\n"
|
||||
elif list_type in ("bot", "bottom", "low"):
|
||||
karma_list = self.karma_cache.get_low()
|
||||
message = "#### Lowest karma\n\n"
|
||||
else:
|
||||
return
|
||||
message += "\n".join(f"{index + 1}. [{karma.user_id}](https://matrix.to/#/{karma.user_id}):"
|
||||
f" {karma.total} (+{karma.positive}/-{karma.negative})"
|
||||
for index, karma in enumerate(karma_list))
|
||||
await evt.reply(message)
|
||||
async def own_karma_breakdown(self, evt: MessageEvent) -> None:
|
||||
await evt.reply("Not yet implemented :(")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue