Blacken and isort code, add pre-commit and CI linting

This commit is contained in:
Tulir Asokan 2023-10-05 22:22:10 +03:00
parent 3507b3b63a
commit 3ca366fea9
8 changed files with 125 additions and 52 deletions

View file

@ -13,18 +13,18 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List, Union, Dict, Any
from typing import Any, Dict, List, Union
import re
from jinja2 import Template as JinjaStringTemplate
from jinja2.nativetypes import NativeTemplate as JinjaNativeTemplate
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from mautrix.types import EventType
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from .rule import RPattern, Rule
from .simplepattern import SimplePattern
from .template import Template
from .rule import Rule, RPattern
InputPattern = Union[str, Dict[str, str]]
@ -49,28 +49,32 @@ class Config(BaseProxyConfig):
self.rules = {}
self.default_flags = self._get_flags(self["default_flags"])
self.templates = {name: self._make_template(name, tpl)
for name, tpl in self["templates"].items()}
self.rules = {name: self._make_rule(name, rule)
for name, rule in self["rules"].items()}
self.templates = {
name: self._make_template(name, tpl) for name, tpl in self["templates"].items()
}
self.rules = {name: self._make_rule(name, rule) for name, rule in self["rules"].items()}
def _make_rule(self, name: str, rule: Dict[str, Any]) -> Rule:
try:
return Rule(rooms=set(rule.get("rooms", [])),
not_rooms=set(rule.get("not_rooms", [])),
matches=self._compile_all(rule["matches"]),
not_matches=self._compile_all(rule.get("not_matches", [])),
type=EventType.find(rule["type"]) if "type" in rule else None,
template=self.templates[rule["template"]],
variables=self._parse_variables(rule))
return Rule(
rooms=set(rule.get("rooms", [])),
not_rooms=set(rule.get("not_rooms", [])),
matches=self._compile_all(rule["matches"]),
not_matches=self._compile_all(rule.get("not_matches", [])),
type=EventType.find(rule["type"]) if "type" in rule else None,
template=self.templates[rule["template"]],
variables=self._parse_variables(rule),
)
except Exception as e:
raise ConfigError(f"Failed to load {name}") from e
def _make_template(self, name: str, tpl: Dict[str, Any]) -> Template:
try:
return Template(type=EventType.find(tpl.get("type", "m.room.message")),
variables=self._parse_variables(tpl),
content=self._parse_content(tpl.get("content", None))).init()
return Template(
type=EventType.find(tpl.get("type", "m.room.message")),
variables=self._parse_variables(tpl),
content=self._parse_content(tpl.get("content", None)),
).init()
except Exception as e:
raise ConfigError(f"Failed to load {name}") from e
@ -93,13 +97,19 @@ class Config(BaseProxyConfig):
@staticmethod
def _parse_variables(data: Dict[str, Any]) -> Dict[str, Any]:
return {name: (JinjaNativeTemplate(var_tpl)
if isinstance(var_tpl, str) and var_tpl.startswith("{{")
else var_tpl)
for name, var_tpl in data.get("variables", {}).items()}
return {
name: (
JinjaNativeTemplate(var_tpl)
if isinstance(var_tpl, str) and var_tpl.startswith("{{")
else var_tpl
)
for name, var_tpl in data.get("variables", {}).items()
}
@staticmethod
def _parse_content(content: Union[Dict[str, Any], str]) -> Union[Dict[str, Any], JinjaStringTemplate]:
def _parse_content(
content: Union[Dict[str, Any], str]
) -> Union[Dict[str, Any], JinjaStringTemplate]:
if not content:
return {}
elif isinstance(content, str):