Resubscribe when subscription tasks raise an exception

This commit is contained in:
Sophie Tauchert 2023-01-10 14:31:44 +01:00
parent 78fd69de32
commit 420f7bc1cb
No known key found for this signature in database
GPG key ID: 52701DE5F5F51125

View file

@ -1,7 +1,7 @@
import asyncio import asyncio
import html import html
import json import json
from typing import Any, List, Tuple from typing import Any, Dict, Tuple
from aiohttp import ClientTimeout from aiohttp import ClientTimeout
from maubot import MessageEvent, Plugin from maubot import MessageEvent, Plugin
@ -20,7 +20,7 @@ from .emoji import EMOJI_FALLBACK, WHITE_CHECK_MARK, parse_tags
class NtfyBot(Plugin): class NtfyBot(Plugin):
db: DB db: DB
config: Config config: Config
tasks: List[asyncio.Task] = [] tasks: Dict[int, asyncio.Task] = {}
async def start(self) -> None: async def start(self) -> None:
await super().start() await super().start()
@ -44,7 +44,7 @@ class NtfyBot(Plugin):
await self.subscribe_to_topics() await self.subscribe_to_topics()
async def clear_subscriptions(self) -> None: async def clear_subscriptions(self) -> None:
tasks = self.tasks[:] tasks = list(self.tasks.values())
if not tasks: if not tasks:
return None return None
@ -58,7 +58,7 @@ class NtfyBot(Plugin):
pass pass
except Exception as exc: except Exception as exc:
self.log.exception("Subscription task errored", exc_info=exc) self.log.exception("Subscription task errored", exc_info=exc)
self.tasks[:] = [] self.tasks.clear()
async def can_use_command(self, evt: MessageEvent) -> bool: async def can_use_command(self, evt: MessageEvent) -> bool:
if evt.sender in self.config["admins"]: if evt.sender in self.config["admins"]:
@ -117,15 +117,18 @@ class NtfyBot(Plugin):
async def subscribe_to_topics(self) -> None: async def subscribe_to_topics(self) -> None:
topics = await self.db.get_topics() topics = await self.db.get_topics()
for topic in topics: for topic in topics:
await self.subscribe_to_topic(topic) self.subscribe_to_topic(topic)
async def subscribe_to_topic(self, topic: Topic) -> None: def subscribe_to_topic(self, topic: Topic) -> None:
def log_task_exc(task: asyncio.Task) -> None: def log_task_exc(task: asyncio.Task) -> None:
t2 = self.tasks.pop(topic.id, None)
if t2 != task:
self.log.warn("stored task doesn't match callback")
if task.done() and not task.cancelled(): if task.done() and not task.cancelled():
exc = task.exception() exc = task.exception()
self.log.exception( self.log.exception(
"Subscription task errored", exc_info=exc) "Subscription task errored, resubscribing", exc_info=exc)
# TODO: restart subscription# self.subscribe_to_topic(topic)
self.log.info("Subscribing to %s/%s", topic.server, topic.topic) self.log.info("Subscribing to %s/%s", topic.server, topic.topic)
url = "%s/%s/json" % (topic.server, topic.topic) url = "%s/%s/json" % (topic.server, topic.topic)
@ -137,8 +140,7 @@ class NtfyBot(Plugin):
self.log.debug("Subscribing to URL %s", url) self.log.debug("Subscribing to URL %s", url)
task = self.loop.create_task( task = self.loop.create_task(
self.run_topic_subscription(topic, url)) self.run_topic_subscription(topic, url))
self.tasks.append(task) self.tasks[topic.id] = task
task.add_done_callback(self.tasks.remove)
task.add_done_callback(log_task_exc) task.add_done_callback(log_task_exc)
async def run_topic_subscription(self, topic: Topic, url: str) -> None: async def run_topic_subscription(self, topic: Topic, url: str) -> None: