feat: WebSocket based implementation of server sent events for cache busting (#527)

* rough implementation of WS based event system for server side notifications of mutation

* fix test construction

* fix deadlock on event bus

* disable linter error

* add item mutation events

* remove old event bus code

* refactor event system to use composables

* refresh items table when new item is added

* fix create form errors

* cleanup unnecessary calls

* fix importer erorrs + limit fn calls on import
This commit is contained in:
Hayden 2023-08-02 13:00:57 -08:00 committed by GitHub
parent cceec06148
commit 2cbcc8bb1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 458 additions and 208 deletions

View file

@ -1,12 +1,18 @@
package v1
import (
"fmt"
"net/http"
"github.com/google/uuid"
"github.com/hay-kot/homebox/backend/internal/core/services"
"github.com/hay-kot/homebox/backend/internal/core/services/reporting/eventbus"
"github.com/hay-kot/homebox/backend/internal/data/repo"
"github.com/hay-kot/httpkit/errchain"
"github.com/hay-kot/httpkit/server"
"github.com/rs/zerolog/log"
"github.com/olahol/melody"
)
type Results[T any] struct {
@ -49,6 +55,7 @@ type V1Controller struct {
maxUploadSize int64
isDemo bool
allowRegistration bool
bus *eventbus.EventBus
}
type (
@ -77,11 +84,12 @@ func BaseUrlFunc(prefix string) func(s string) string {
}
}
func NewControllerV1(svc *services.AllServices, repos *repo.AllRepos, options ...func(*V1Controller)) *V1Controller {
func NewControllerV1(svc *services.AllServices, repos *repo.AllRepos, bus *eventbus.EventBus, options ...func(*V1Controller)) *V1Controller {
ctrl := &V1Controller{
repo: repos,
svc: svc,
allowRegistration: true,
bus: bus,
}
for _, opt := range options {
@ -110,3 +118,42 @@ func (ctrl *V1Controller) HandleBase(ready ReadyFunc, build Build) errchain.Hand
})
}
}
func (ctrl *V1Controller) HandleCacheWS() errchain.HandlerFunc {
m := melody.New()
m.HandleConnect(func(s *melody.Session) {
auth := services.NewContext(s.Request.Context())
s.Set("gid", auth.GID)
})
factory := func(e string) func(data any) {
return func(data any) {
eventData, ok := data.(eventbus.GroupMutationEvent)
if !ok {
log.Log().Msgf("invalid event data: %v", data)
return
}
jsonStr := fmt.Sprintf(`{"event": "%s"}`, e)
_ = m.BroadcastFilter([]byte(jsonStr), func(s *melody.Session) bool {
groupIDStr, ok := s.Get("gid")
if !ok {
return false
}
GID := groupIDStr.(uuid.UUID)
return GID == eventData.GID
})
}
}
ctrl.bus.Subscribe(eventbus.EventLabelMutation, factory("label.mutation"))
ctrl.bus.Subscribe(eventbus.EventLocationMutation, factory("location.mutation"))
ctrl.bus.Subscribe(eventbus.EventItemMutation, factory("item.mutation"))
return func(w http.ResponseWriter, r *http.Request) error {
return m.HandleRequest(w, r)
}
}