From fdee54f9218ae7e668627214865ab6eca3bfb434 Mon Sep 17 00:00:00 2001 From: binwiederhier Date: Wed, 11 Jan 2023 21:38:10 -0500 Subject: [PATCH] Account sync in action --- server/server_account.go | 1 + web/src/app/AccountApi.js | 42 +++++++++++++++++++++++++++++- web/src/app/Api.js | 11 ++------ web/src/app/ConnectionManager.js | 14 +++++----- web/src/app/SubscriptionManager.js | 18 +++++++++---- web/src/components/App.js | 15 +++++++++++ web/src/components/Navigation.js | 8 +++--- web/src/components/hooks.js | 41 ++++++++++++++++++++++++----- 8 files changed, 118 insertions(+), 32 deletions(-) diff --git a/server/server_account.go b/server/server_account.go index 0b2df19..2f5d04f 100644 --- a/server/server_account.go +++ b/server/server_account.go @@ -46,6 +46,7 @@ func (s *Server) handleAccountGet(w http.ResponseWriter, _ *http.Request, v *vis return err } limits, stats := info.Limits, info.Stats + response := &apiAccountResponse{ Limits: &apiAccountLimits{ Basis: string(limits.Basis), diff --git a/web/src/app/AccountApi.js b/web/src/app/AccountApi.js index 5a50b89..52aa836 100644 --- a/web/src/app/AccountApi.js +++ b/web/src/app/AccountApi.js @@ -6,7 +6,7 @@ import { accountSubscriptionSingleUrl, accountSubscriptionUrl, accountTokenUrl, - accountUrl, + accountUrl, maybeWithAuth, topicUrl, withBasicAuth, withBearerAuth } from "./utils"; @@ -15,6 +15,7 @@ import subscriptionManager from "./SubscriptionManager"; import i18n from "i18next"; import prefs from "./Prefs"; import routes from "../components/routes"; +import userManager from "./UserManager"; const delayMillis = 45000; // 45 seconds const intervalMillis = 900000; // 15 minutes @@ -23,6 +24,11 @@ class AccountApi { constructor() { this.timer = null; this.listener = null; // Fired when account is fetched from remote + + // Random ID used to identify this client when sending/receiving "sync" events + // to the sync topic of an account. This ID doesn't matter much, but it will prevent + // a client from reacting to its own message. + this.identity = Math.floor(Math.random() * 2586000); } registerListener(listener) { @@ -164,6 +170,7 @@ class AccountApi { } else if (response.status !== 200) { throw new Error(`Unexpected server response ${response.status}`); } + this.triggerChange(); // Dangle! } async addSubscription(payload) { @@ -182,6 +189,7 @@ class AccountApi { } const subscription = await response.json(); console.log(`[AccountApi] Subscription`, subscription); + this.triggerChange(); // Dangle! return subscription; } @@ -201,6 +209,7 @@ class AccountApi { } const subscription = await response.json(); console.log(`[AccountApi] Subscription`, subscription); + this.triggerChange(); // Dangle! return subscription; } @@ -216,6 +225,7 @@ class AccountApi { } else if (response.status !== 200) { throw new Error(`Unexpected server response ${response.status}`); } + this.triggerChange(); // Dangle! } async upsertAccess(topic, everyone) { @@ -236,6 +246,7 @@ class AccountApi { } else if (response.status !== 200) { throw new Error(`Unexpected server response ${response.status}`); } + this.triggerChange(); // Dangle! } async deleteAccess(topic) { @@ -250,6 +261,7 @@ class AccountApi { } else if (response.status !== 200) { throw new Error(`Unexpected server response ${response.status}`); } + this.triggerChange(); // Dangle! } async sync() { @@ -285,6 +297,34 @@ class AccountApi { } } + async triggerChange() { + const account = await this.get(); + if (!account.sync_topic) { + return; + } + const url = topicUrl(config.base_url, account.sync_topic); + console.log(`[AccountApi] Triggering account change to ${url}`); + const user = await userManager.get(config.base_url); + const headers = { + Cache: "no" // We really don't need to store this! + }; + try { + const response = await fetch(url, { + method: 'PUT', + body: JSON.stringify({ + event: "sync", + source: this.identity + }), + headers: maybeWithAuth(headers, user) + }); + if (response.status < 200 || response.status > 299) { + throw new Error(`Unexpected response: ${response.status}`); + } + } catch (e) { + console.log(`[AccountApi] Publishing to sync topic failed`, e); + } + } + startWorker() { if (this.timer !== null) { return; diff --git a/web/src/app/Api.js b/web/src/app/Api.js index 4e3214a..d94f021 100644 --- a/web/src/app/Api.js +++ b/web/src/app/Api.js @@ -1,13 +1,6 @@ import { - accountPasswordUrl, - accountSettingsUrl, - accountSubscriptionSingleUrl, - accountSubscriptionUrl, - accountTokenUrl, - accountUrl, - fetchLinesIterator, maybeWithAuth, - withBasicAuth, - withBearerAuth, + fetchLinesIterator, + maybeWithAuth, topicShortUrl, topicUrl, topicUrlAuth, diff --git a/web/src/app/ConnectionManager.js b/web/src/app/ConnectionManager.js index c825ff1..1e805eb 100644 --- a/web/src/app/ConnectionManager.js +++ b/web/src/app/ConnectionManager.js @@ -11,7 +11,7 @@ class ConnectionManager { constructor() { this.connections = new Map(); // ConnectionId -> Connection (hash, see below) this.stateListener = null; // Fired when connection state changes - this.notificationListener = null; // Fired when new notifications arrive + this.messageListener = null; // Fired when new notifications arrive } registerStateListener(listener) { @@ -22,12 +22,12 @@ class ConnectionManager { this.stateListener = null; } - registerNotificationListener(listener) { - this.notificationListener = listener; + registerMessageListener(listener) { + this.messageListener = listener; } - resetNotificationListener() { - this.notificationListener = null; + resetMessageListener() { + this.messageListener = null; } /** @@ -97,9 +97,9 @@ class ConnectionManager { } notificationReceived(subscriptionId, notification) { - if (this.notificationListener) { + if (this.messageListener) { try { - this.notificationListener(subscriptionId, notification); + this.messageListener(subscriptionId, notification); } catch (e) { console.error(`[ConnectionManager] Error handling notification for ${subscriptionId}`, e); } diff --git a/web/src/app/SubscriptionManager.js b/web/src/app/SubscriptionManager.js index 2f283ff..ec8c13b 100644 --- a/web/src/app/SubscriptionManager.js +++ b/web/src/app/SubscriptionManager.js @@ -29,7 +29,8 @@ class SubscriptionManager { topic: topic, mutedUntil: 0, last: null, - remoteId: null + remoteId: null, + internal: false }; await db.subscriptions.put(subscription); return subscription; @@ -44,9 +45,11 @@ class SubscriptionManager { const remote = remoteSubscriptions[i]; const local = await this.add(remote.base_url, remote.topic); const reservation = remoteReservations?.find(r => remote.base_url === config.base_url && remote.topic === r.topic) || null; - await this.setRemoteId(local.id, remote.id); - await this.setDisplayName(local.id, remote.display_name); - await this.setReservation(local.id, reservation); // May be null! + await this.update(local.id, { + remoteId: remote.id, + displayName: remote.display_name, + reservation: reservation // May be null! + }); remoteIds.push(remote.id); } @@ -54,7 +57,8 @@ class SubscriptionManager { const localSubscriptions = await db.subscriptions.toArray(); for (let i = 0; i < localSubscriptions.length; i++) { const local = localSubscriptions[i]; - if (!local.remoteId || !remoteIds.includes(local.remoteId)) { + const remoteExists = local.remoteId && remoteIds.includes(local.remoteId); + if (!local.internal && !remoteExists) { await this.remove(local.id); } } @@ -182,6 +186,10 @@ class SubscriptionManager { }); } + async update(subscriptionId, params) { + await db.subscriptions.update(subscriptionId, params); + } + async pruneNotifications(thresholdTimestamp) { await db.notifications .where("time").below(thresholdTimestamp) diff --git a/web/src/components/App.js b/web/src/components/App.js index 7bb3695..331119a 100644 --- a/web/src/components/App.js +++ b/web/src/components/App.js @@ -27,6 +27,7 @@ import Login from "./Login"; import Pricing from "./Pricing"; import Signup from "./Signup"; import Account from "./Account"; +import accountApi from "../app/AccountApi"; export const AccountContext = createContext(null); @@ -79,6 +80,20 @@ const Layout = () => { useBackgroundProcesses(); useEffect(() => updateTitle(newNotificationsCount), [newNotificationsCount]); + useEffect(() => { + if (!account || !account.sync_topic) { + return; + } + (async () => { + const subscription = await subscriptionManager.add(config.base_url, account.sync_topic); + if (!subscription.hidden) { + await subscriptionManager.update(subscription.id, { + internal: true + }); + } + })(); + }, [account]); + return ( { }; const SubscriptionList = (props) => { - const sortedSubscriptions = props.subscriptions.sort( (a, b) => { - return (topicUrl(a.baseUrl, a.topic) < topicUrl(b.baseUrl, b.topic)) ? -1 : 1; - }); + const sortedSubscriptions = props.subscriptions + .filter(s => !s.internal) + .sort((a, b) => { + return (topicUrl(a.baseUrl, a.topic) < topicUrl(b.baseUrl, b.topic)) ? -1 : 1; + }); return ( <> {sortedSubscriptions.map(subscription => diff --git a/web/src/components/hooks.js b/web/src/components/hooks.js index a9a7e2d..77d6d99 100644 --- a/web/src/components/hooks.js +++ b/web/src/components/hooks.js @@ -1,5 +1,5 @@ import {useNavigate, useParams} from "react-router-dom"; -import {useEffect, useState} from "react"; +import {useContext, useEffect, useState} from "react"; import subscriptionManager from "../app/SubscriptionManager"; import {disallowedTopic, expandSecureUrl, topicUrl} from "../app/utils"; import notifier from "../app/Notifier"; @@ -10,6 +10,7 @@ import pruner from "../app/Pruner"; import session from "../app/Session"; import {UnauthorizedError} from "../app/AccountApi"; import accountApi from "../app/AccountApi"; +import {AccountContext} from "./App"; /** * Wire connectionManager and subscriptionManager so that subscriptions are updated when the connection @@ -20,6 +21,34 @@ export const useConnectionListeners = (subscriptions, users) => { const navigate = useNavigate(); useEffect(() => { + const handleMessage = async (subscriptionId, message) => { + const subscription = await subscriptionManager.get(subscriptionId); + if (subscription.internal) { + await handleInternalMessage(message); + } else { + await handleNotification(subscriptionId, message); + } + }; + + const handleInternalMessage = async (message) => { + console.log(`[ConnectionListener] Received message on sync topic`, message.message); + try { + const data = JSON.parse(message.message); + if (data.event === "sync") { + if (data.source !== accountApi.identity) { + console.log(`[ConnectionListener] Triggering account sync`); + await accountApi.sync(); + } else { + console.log(`[ConnectionListener] I triggered the account sync, ignoring message`); + } + } else { + console.log(`[ConnectionListener] Unknown message type. Doing nothing.`); + } + } catch (e) { + console.log(`[ConnectionListener] Error parsing sync topic message`, e); + } + }; + const handleNotification = async (subscriptionId, notification) => { const added = await subscriptionManager.addNotification(subscriptionId, notification); if (added) { @@ -28,10 +57,10 @@ export const useConnectionListeners = (subscriptions, users) => { } }; connectionManager.registerStateListener(subscriptionManager.updateState); - connectionManager.registerNotificationListener(handleNotification); + connectionManager.registerMessageListener(handleMessage); return () => { connectionManager.resetStateListener(); - connectionManager.resetNotificationListener(); + connectionManager.resetMessageListener(); } }, // We have to disable dep checking for "navigate". This is fine, it never changes. @@ -100,11 +129,9 @@ export const useBackgroundProcesses = () => { export const useAccountListener = (setAccount) => { useEffect(() => { accountApi.registerListener(setAccount); - (async () => { - await accountApi.sync(); - })(); + accountApi.sync(); // Dangle return () => { - accountApi.registerListener(); + accountApi.resetListener(); } }, []); }