maubot_reactbot/reactbot/bot.py

100 lines
3.4 KiB
Python
Raw Normal View History

2019-06-23 10:43:10 +00:00
# reactbot - A maubot plugin that reacts to messages that match predefined rules.
# Copyright (C) 2019 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2019-06-23 11:14:28 +00:00
from typing import Type, Tuple, Dict
import time
2019-06-23 10:43:10 +00:00
2019-06-23 11:14:28 +00:00
from attr import dataclass
from mautrix.types import EventType, MessageType, UserID, RoomID
2019-06-23 10:43:10 +00:00
from mautrix.util.config import BaseProxyConfig
from maubot import Plugin, MessageEvent
from maubot.handlers import event
from .config import Config, ConfigError
2019-06-23 11:14:28 +00:00
@dataclass
class FloodInfo:
    """Flood counter for one key (a user or a room).

    ``bump`` registers one more event and reports whether the counter has
    gone over its limit.
    """
    # Number of bumps accepted before ``bump`` starts returning True.
    max: int
    # Seconds after the last accepted bump before ``count`` starts over.
    delay: int
    # Bumps registered since the counter last started over.
    count: int
    # Unix time (whole seconds) of the last bump that was accepted.
    last_message: int

    def bump(self) -> bool:
        """Register one event.

        Returns:
            True if the event exceeds the limit (flooding), False if it is
            within the limit.
        """
        current = int(time.time())
        window_expired = self.last_message + self.delay < current
        if window_expired:
            self.count = 0
        self.count += 1
        if self.count <= self.max:
            # Only accepted events move the window; rejected ones leave
            # last_message untouched.
            self.last_message = current
            return False
        return True
2019-06-23 10:43:10 +00:00
class ReactBot(Plugin):
    """Plugin that executes the first configured rule matching a message.

    Rule execution is throttled per sender and per room with ``FloodInfo``
    counters whose limits come from the ``antispam.*`` config values.
    """
    # Only messages with one of these msgtypes are matched against the rules.
    allowed_msgtypes: Tuple[MessageType, ...] = (MessageType.TEXT, MessageType.EMOTE)
    # Flood counters keyed by sender / room; entries are created lazily.
    user_flood: Dict[UserID, FloodInfo]
    room_flood: Dict[RoomID, FloodInfo]

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the config class for this plugin."""
        return Config

    async def start(self) -> None:
        """Start the plugin with empty flood counters and load the config."""
        await super().start()
        self.user_flood = {}
        self.room_flood = {}
        self.on_external_config_update()

    def on_external_config_update(self) -> None:
        """Reload and re-parse the config, then apply the new antispam limits.

        A ``ConfigError`` from parsing is logged rather than raised.
        """
        self.config.load_and_update()
        try:
            self.config.parse_data()
        except ConfigError:
            self.log.exception("Failed to load config")
        # Existing counters hold a copy of the limits taken when they were
        # created; without this they would keep the old limits after a reload.
        self._refresh_flood_limits(self.user_flood, "user")
        self._refresh_flood_limits(self.room_flood, "room")

    def _refresh_flood_limits(self, flood_map: dict, for_type: str) -> None:
        """Copy the current ``antispam.<for_type>`` limits into every counter in flood_map."""
        if not flood_map:
            # Nothing tracked yet (e.g. right after start): new counters read
            # the config themselves when they are created.
            return
        limit = self.config[f"antispam.{for_type}.max"]
        delay = self.config[f"antispam.{for_type}.delay"]
        for info in flood_map.values():
            info.max = limit
            info.delay = delay

    def _make_flood_info(self, for_type: str) -> 'FloodInfo':
        """Build a fresh counter using the ``antispam.<for_type>`` limits."""
        return FloodInfo(max=self.config[f"antispam.{for_type}.max"],
                         delay=self.config[f"antispam.{for_type}.delay"],
                         count=0, last_message=0)

    def _get_flood_info(self, flood_map: dict, key: str, for_type: str) -> 'FloodInfo':
        """Return the counter stored under key, creating and storing it if missing."""
        try:
            return flood_map[key]
        except KeyError:
            fi = flood_map[key] = self._make_flood_info(for_type)
            return fi

    def is_flood(self, evt: MessageEvent) -> bool:
        """Bump the sender's and the room's counters; True if either is over its limit.

        The room counter is only bumped when the sender's counter is within
        its limit (``or`` short-circuits).
        """
        return (self._get_flood_info(self.user_flood, evt.sender, "user").bump()
                or self._get_flood_info(self.room_flood, evt.room_id, "room").bump())

    @event.on(EventType.ROOM_MESSAGE)
    async def event_handler(self, evt: MessageEvent) -> None:
        """Run the first rule that matches evt, unless flood limits are exceeded."""
        # Ignore the bot's own messages and unsupported message types.
        if evt.sender == self.client.mxid or evt.content.msgtype not in self.allowed_msgtypes:
            return
        for name, rule in self.config.rules.items():
            match = rule.match(evt)
            if match is not None:
                # Flood counters are only bumped for messages that match a rule.
                if self.is_flood(evt):
                    return
                try:
                    await rule.execute(evt, match)
                except Exception:
                    self.log.exception(f"Failed to execute {name}")
                # Only the first matching rule is executed.
                return