No flickering for sync topic
This commit is contained in:
parent
eecd689ad5
commit
1771cb3fdb
4 changed files with 28 additions and 42 deletions
|
@ -38,24 +38,16 @@ import (
|
||||||
TODO
|
TODO
|
||||||
--
|
--
|
||||||
|
|
||||||
- HIGH Rate limiting: Sensitive endpoints (account/login/change-password/...)
|
|
||||||
- HIGH Rate limiting: dailyLimitToRate is wrong? + TESTS
|
- HIGH Rate limiting: dailyLimitToRate is wrong? + TESTS
|
||||||
|
- HIGH Rate limiting: Sensitive endpoints (account/login/change-password/...)
|
||||||
- HIGH Rate limiting: Bandwidth limit must be in tier + TESTS
|
- HIGH Rate limiting: Bandwidth limit must be in tier + TESTS
|
||||||
- HIGH Sync problems with "deleteAfter=0" and "displayName="
|
- MEDIUM: Races with v.user (see publishSyncEventAsync test)
|
||||||
- Reservation (UI): Show "This topic is reserved" error message when trying to reserve a reserved topic (Thorben)
|
- MEDIUM: Reservation (UI): Show "This topic is reserved" error message when trying to reserve a reserved topic (Thorben)
|
||||||
- Reservation (UI): Ask for confirmation when removing reservation (deadcade)
|
- MEDIUM: Reservation (UI): Ask for confirmation when removing reservation (deadcade)
|
||||||
- Reservation table delete button: dialog "keep or delete messages?"
|
- MEDIUM: Reservation table delete button: dialog "keep or delete messages?"
|
||||||
- UI: Flickering upgrade banner when logging in
|
- LOW: UI: Flickering upgrade banner when logging in
|
||||||
- JS constants
|
- LOW: JS constants
|
||||||
|
- LOW: Payments reconciliation process
|
||||||
races:
|
|
||||||
- v.user --> see publishSyncEventAsync() test
|
|
||||||
|
|
||||||
payments:
|
|
||||||
- reconciliation
|
|
||||||
|
|
||||||
delete messages + reserved topics on ResetTier delete attachments in access.go
|
|
||||||
|
|
||||||
|
|
||||||
Limits & rate limiting:
|
Limits & rate limiting:
|
||||||
users without tier: should the stats be persisted? are they meaningful? -> test that the visitor is based on the IP address!
|
users without tier: should the stats be persisted? are they meaningful? -> test that the visitor is based on the IP address!
|
||||||
|
@ -1030,12 +1022,12 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
// Subscription connections can be canceled externally, see topic.CancelSubscribers
|
// Subscription connections can be canceled externally, see topic.CancelSubscribers
|
||||||
subscriberContext, cancel := context.WithCancel(context.Background())
|
cancelCtx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Use errgroup to run WebSocket reader and writer in Go routines
|
// Use errgroup to run WebSocket reader and writer in Go routines
|
||||||
var wlock sync.Mutex
|
var wlock sync.Mutex
|
||||||
g, gctx := errgroup.WithContext(subscriberContext)
|
g, gctx := errgroup.WithContext(cancelCtx)
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
pongWait := s.config.KeepaliveInterval + wsPongWait
|
pongWait := s.config.KeepaliveInterval + wsPongWait
|
||||||
conn.SetReadLimit(wsReadLimit)
|
conn.SetReadLimit(wsReadLimit)
|
||||||
|
@ -1072,7 +1064,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
|
||||||
select {
|
select {
|
||||||
case <-gctx.Done():
|
case <-gctx.Done():
|
||||||
return nil
|
return nil
|
||||||
case <-subscriberContext.Done():
|
case <-cancelCtx.Done():
|
||||||
log.Trace("%s Cancel received, closing subscriber connection", logHTTPPrefix(v, r))
|
log.Trace("%s Cancel received, closing subscriber connection", logHTTPPrefix(v, r))
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
|
return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
|
||||||
|
|
|
@ -17,7 +17,7 @@ class SubscriptionManager {
|
||||||
return await db.subscriptions.get(subscriptionId)
|
return await db.subscriptions.get(subscriptionId)
|
||||||
}
|
}
|
||||||
|
|
||||||
async add(baseUrl, topic) {
|
async add(baseUrl, topic, internal) {
|
||||||
const id = topicUrl(baseUrl, topic);
|
const id = topicUrl(baseUrl, topic);
|
||||||
const existingSubscription = await this.get(id);
|
const existingSubscription = await this.get(id);
|
||||||
if (existingSubscription) {
|
if (existingSubscription) {
|
||||||
|
@ -30,7 +30,7 @@ class SubscriptionManager {
|
||||||
mutedUntil: 0,
|
mutedUntil: 0,
|
||||||
last: null,
|
last: null,
|
||||||
remoteId: null,
|
remoteId: null,
|
||||||
internal: false
|
internal: internal || false
|
||||||
};
|
};
|
||||||
await db.subscriptions.put(subscription);
|
await db.subscriptions.put(subscription);
|
||||||
return subscription;
|
return subscription;
|
||||||
|
@ -47,7 +47,7 @@ class SubscriptionManager {
|
||||||
const reservation = remoteReservations?.find(r => remote.base_url === config.base_url && remote.topic === r.topic) || null;
|
const reservation = remoteReservations?.find(r => remote.base_url === config.base_url && remote.topic === r.topic) || null;
|
||||||
await this.update(local.id, {
|
await this.update(local.id, {
|
||||||
remoteId: remote.id,
|
remoteId: remote.id,
|
||||||
displayName: remote.display_name,
|
displayName: remote.display_name, // May be undefined
|
||||||
reservation: reservation // May be null!
|
reservation: reservation // May be null!
|
||||||
});
|
});
|
||||||
remoteIds.push(remote.id);
|
remoteIds.push(remote.id);
|
||||||
|
|
|
@ -71,25 +71,11 @@ const Layout = () => {
|
||||||
|| (config.base_url === s.baseUrl && params.topic === s.topic)
|
|| (config.base_url === s.baseUrl && params.topic === s.topic)
|
||||||
});
|
});
|
||||||
|
|
||||||
useConnectionListeners(subscriptions, users);
|
useConnectionListeners(account, subscriptions, users);
|
||||||
useAccountListener(setAccount)
|
useAccountListener(setAccount)
|
||||||
useBackgroundProcesses();
|
useBackgroundProcesses();
|
||||||
useEffect(() => updateTitle(newNotificationsCount), [newNotificationsCount]);
|
useEffect(() => updateTitle(newNotificationsCount), [newNotificationsCount]);
|
||||||
|
|
||||||
useEffect(() => {
|
|
||||||
if (!account || !account.sync_topic) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
(async () => {
|
|
||||||
const subscription = await subscriptionManager.add(config.base_url, account.sync_topic);
|
|
||||||
if (!subscription.hidden) {
|
|
||||||
await subscriptionManager.update(subscription.id, {
|
|
||||||
internal: true
|
|
||||||
});
|
|
||||||
}
|
|
||||||
})();
|
|
||||||
}, [account]);
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<Box sx={{display: 'flex'}}>
|
<Box sx={{display: 'flex'}}>
|
||||||
<ActionBar
|
<ActionBar
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import {useNavigate, useParams} from "react-router-dom";
|
import {useNavigate, useParams} from "react-router-dom";
|
||||||
import {useContext, useEffect, useState} from "react";
|
import {useEffect, useState} from "react";
|
||||||
import subscriptionManager from "../app/SubscriptionManager";
|
import subscriptionManager from "../app/SubscriptionManager";
|
||||||
import {disallowedTopic, expandSecureUrl, topicUrl} from "../app/utils";
|
import {disallowedTopic, expandSecureUrl, topicUrl} from "../app/utils";
|
||||||
import notifier from "../app/Notifier";
|
import notifier from "../app/Notifier";
|
||||||
|
@ -8,18 +8,17 @@ import connectionManager from "../app/ConnectionManager";
|
||||||
import poller from "../app/Poller";
|
import poller from "../app/Poller";
|
||||||
import pruner from "../app/Pruner";
|
import pruner from "../app/Pruner";
|
||||||
import session from "../app/Session";
|
import session from "../app/Session";
|
||||||
import {UnauthorizedError} from "../app/AccountApi";
|
import accountApi, {UnauthorizedError} from "../app/AccountApi";
|
||||||
import accountApi from "../app/AccountApi";
|
|
||||||
import {AccountContext} from "./App";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wire connectionManager and subscriptionManager so that subscriptions are updated when the connection
|
* Wire connectionManager and subscriptionManager so that subscriptions are updated when the connection
|
||||||
* state changes. Conversely, when the subscription changes, the connection is refreshed (which may lead
|
* state changes. Conversely, when the subscription changes, the connection is refreshed (which may lead
|
||||||
* to the connection being re-established).
|
* to the connection being re-established).
|
||||||
*/
|
*/
|
||||||
export const useConnectionListeners = (subscriptions, users) => {
|
export const useConnectionListeners = (account, subscriptions, users) => {
|
||||||
const navigate = useNavigate();
|
const navigate = useNavigate();
|
||||||
|
|
||||||
|
// Register listeners for incoming messages, and connection state changes
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
const handleMessage = async (subscriptionId, message) => {
|
const handleMessage = async (subscriptionId, message) => {
|
||||||
const subscription = await subscriptionManager.get(subscriptionId);
|
const subscription = await subscriptionManager.get(subscriptionId);
|
||||||
|
@ -64,6 +63,15 @@ export const useConnectionListeners = (subscriptions, users) => {
|
||||||
[]
|
[]
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Sync topic listener: For accounts with sync_topic, subscribe to an internal topic
|
||||||
|
useEffect(() => {
|
||||||
|
if (!account || !account.sync_topic) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
subscriptionManager.add(config.base_url, account.sync_topic, true); // Dangle!
|
||||||
|
}, [account]);
|
||||||
|
|
||||||
|
// When subscriptions or users change, refresh the connections
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
connectionManager.refresh(subscriptions, users); // Dangle
|
connectionManager.refresh(subscriptions, users); // Dangle
|
||||||
}, [subscriptions, users]);
|
}, [subscriptions, users]);
|
||||||
|
|
Loading…
Reference in a new issue