backend: Periodically send client aliveness pings in event stream websocket connection (#729)

* backend: Periodically send client aliveness pings following event websocket connection

* backend: Single persistent global ping goroutine instead of per-session ticker
This commit is contained in:
LINKIWI 2024-01-31 11:20:56 -08:00 committed by GitHub
parent c55d421326
commit aace77ec40
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -2,8 +2,9 @@
package v1 package v1
import ( import (
"fmt" "encoding/json"
"net/http" "net/http"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hay-kot/homebox/backend/internal/core/services" "github.com/hay-kot/homebox/backend/internal/core/services"
@ -144,6 +145,10 @@ func (ctrl *V1Controller) HandleCurrency() errchain.HandlerFunc {
} }
func (ctrl *V1Controller) HandleCacheWS() errchain.HandlerFunc { func (ctrl *V1Controller) HandleCacheWS() errchain.HandlerFunc {
type eventMsg struct {
Event string `json:"event"`
}
m := melody.New() m := melody.New()
m.HandleConnect(func(s *melody.Session) { m.HandleConnect(func(s *melody.Session) {
@ -159,9 +164,15 @@ func (ctrl *V1Controller) HandleCacheWS() errchain.HandlerFunc {
return return
} }
jsonStr := fmt.Sprintf(`{"event": "%s"}`, e) msg := &eventMsg{Event: e}
_ = m.BroadcastFilter([]byte(jsonStr), func(s *melody.Session) bool { jsonBytes, err := json.Marshal(msg)
if err != nil {
log.Log().Msgf("error marshling event data %v: %v", data, err)
return
}
_ = m.BroadcastFilter(jsonBytes, func(s *melody.Session) bool {
groupIDStr, ok := s.Get("gid") groupIDStr, ok := s.Get("gid")
if !ok { if !ok {
return false return false
@ -177,6 +188,25 @@ func (ctrl *V1Controller) HandleCacheWS() errchain.HandlerFunc {
ctrl.bus.Subscribe(eventbus.EventLocationMutation, factory("location.mutation")) ctrl.bus.Subscribe(eventbus.EventLocationMutation, factory("location.mutation"))
ctrl.bus.Subscribe(eventbus.EventItemMutation, factory("item.mutation")) ctrl.bus.Subscribe(eventbus.EventItemMutation, factory("item.mutation"))
// Persistent asynchronous ticker that keeps all websocket connections alive with periodic pings.
go func() {
const interval = 10 * time.Second
ping := time.NewTicker(interval)
defer ping.Stop()
for range ping.C {
msg := &eventMsg{Event: "ping"}
pingBytes, err := json.Marshal(msg)
if err != nil {
log.Log().Msgf("error marshaling ping: %v", err)
} else {
_ = m.Broadcast(pingBytes)
}
}
}()
return func(w http.ResponseWriter, r *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error {
return m.HandleRequest(w, r) return m.HandleRequest(w, r)
} }