97 lines
3.6 KiB
Python
97 lines
3.6 KiB
Python
|
# reminder - A maubot plugin that reacts to messages that match predefined rules.
|
||
|
# Copyright (C) 2019 Tulir Asokan
|
||
|
#
|
||
|
# This program is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU Affero General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU Affero General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU Affero General Public License
|
||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
from typing import Union, Dict, List, Tuple, Any
|
||
|
from itertools import chain
|
||
|
import json
|
||
|
import copy
|
||
|
import re
|
||
|
|
||
|
from attr import dataclass
|
||
|
from jinja2 import Template as JinjaTemplate
|
||
|
|
||
|
from mautrix.types import EventType, Event
|
||
|
|
||
|
|
||
|
class Key(str):
|
||
|
pass
|
||
|
|
||
|
|
||
|
variable_regex = re.compile(r"\$\${([0-9A-Za-z-_]+)}")
|
||
|
|
||
|
Index = Union[str, int, Key]
|
||
|
|
||
|
|
||
|
@dataclass
|
||
|
class Template:
|
||
|
type: EventType
|
||
|
variables: Dict[str, JinjaTemplate]
|
||
|
content: Union[Dict[str, Any], JinjaTemplate]
|
||
|
|
||
|
_variable_locations: List[Tuple[Index, ...]] = None
|
||
|
|
||
|
def init(self) -> 'Template':
|
||
|
self._variable_locations = []
|
||
|
self._map_variable_locations((), self.content)
|
||
|
return self
|
||
|
|
||
|
def _map_variable_locations(self, path: Tuple[Index, ...], data: Any) -> None:
|
||
|
if isinstance(data, list):
|
||
|
for i, v in enumerate(data):
|
||
|
self._map_variable_locations((*path, i), v)
|
||
|
elif isinstance(data, dict):
|
||
|
for k, v in data.items():
|
||
|
if variable_regex.match(k):
|
||
|
self._variable_locations.append((*path, Key(k)))
|
||
|
self._map_variable_locations((*path, k), v)
|
||
|
elif isinstance(data, str):
|
||
|
if variable_regex.match(data):
|
||
|
self._variable_locations.append(path)
|
||
|
|
||
|
@classmethod
|
||
|
def _recurse(cls, content: Any, path: Tuple[Index, ...]) -> Any:
|
||
|
if len(path) == 0:
|
||
|
return content
|
||
|
return cls._recurse(content[path[0]], path[1:])
|
||
|
|
||
|
@staticmethod
|
||
|
def _replace_variables(tpl: str, variables: Dict[str, Any]) -> str:
|
||
|
for match in variable_regex.finditer(tpl):
|
||
|
val = variables[match.group(1)]
|
||
|
if match.start() == 0 and match.end() == len(tpl):
|
||
|
# Whole field is a single variable, just return the value to allow non-string types.
|
||
|
return val
|
||
|
tpl = tpl[:match.start()] + val + tpl[match.end():]
|
||
|
return tpl
|
||
|
|
||
|
def execute(self, evt: Event, rule_vars: Dict[str, JinjaTemplate], extra_vars: Dict[str, str]
|
||
|
) -> Dict[str, Any]:
|
||
|
variables = {**{name: template.render(event=evt)
|
||
|
for name, template in chain(self.variables.items(), rule_vars.items())},
|
||
|
**extra_vars}
|
||
|
if isinstance(self.content, JinjaTemplate):
|
||
|
raw_json = self.content.render(event=evt, **variables)
|
||
|
return json.loads(raw_json)
|
||
|
content = copy.deepcopy(self.content)
|
||
|
for path in self._variable_locations:
|
||
|
data: Dict[str, Any] = self._recurse(content, path[:-1])
|
||
|
key = path[-1]
|
||
|
if isinstance(key, Key):
|
||
|
key = str(key)
|
||
|
data[self._replace_variables(key, variables)] = data.pop(key)
|
||
|
else:
|
||
|
data[key] = self._replace_variables(data[key], variables)
|
||
|
return content
|