From 630ecd351fb9ffeeeea253d181d486edb9a2e08b Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sat, 23 Oct 2021 13:21:33 -0400 Subject: [PATCH] SSE --- server/index.html | 132 ++++++++++++++++++++++++++-------------------- server/server.go | 36 +++++++++++-- 2 files changed, 108 insertions(+), 60 deletions(-) diff --git a/server/index.html b/server/index.html index 9ab3b9b..eaa4d5f 100644 --- a/server/index.html +++ b/server/index.html @@ -8,71 +8,89 @@

ntfy.sh

-Topics: - +

+ ntfy.sh is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications + via scripts, without signup or cost. It's entirely free and open source. +

- - - +

+ Usage: You can subscribe to a topic either in this web UI, or in your own app by subscribing to an SSE/EventSource + or JSON feed. Once subscribed, you can publish messages via PUT or POST. +

+ +
+ + +
+ +Topics: + + diff --git a/server/server.go b/server/server.go index 3b839b8..a78019c 100644 --- a/server/server.go +++ b/server/server.go @@ -5,6 +5,7 @@ import ( _ "embed" // required for go:embed "encoding/json" "errors" + "fmt" "github.com/gorilla/websocket" "io" "log" @@ -31,8 +32,9 @@ const ( var ( topicRegex = regexp.MustCompile(`^/[^/]+$`) - wsRegex = regexp.MustCompile(`^/[^/]+/ws$`) jsonRegex = regexp.MustCompile(`^/[^/]+/json$`) + sseRegex = regexp.MustCompile(`^/[^/]+/sse$`) + wsRegex = regexp.MustCompile(`^/[^/]+/ws$`) wsUpgrader = websocket.Upgrader{ ReadBufferSize: messageLimit, WriteBufferSize: messageLimit, @@ -82,7 +84,9 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { } else if r.Method == http.MethodGet && wsRegex.MatchString(r.URL.Path) { return s.handleSubscribeWS(w, r) } else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) { - return s.handleSubscribeHTTP(w, r) + return s.handleSubscribeJSON(w, r) + } else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) { + return s.handleSubscribeSSE(w, r) } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) { return s.handlePublishHTTP(w, r) } @@ -112,7 +116,7 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error return t.Publish(msg) } -func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request) error { +func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) error { t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/json")) // Hack subscriberID := t.Subscribe(func (msg *message) error { if err := json.NewEncoder(w).Encode(&msg); err != nil { @@ -131,6 +135,32 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request) err return nil } +func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) error { + t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/sse")) // Hack + subscriberID := t.Subscribe(func (msg *message) error { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(&msg); err != nil { + return err + } + m := fmt.Sprintf("data: %s\n\n", buf.String()) + if _, err := io.WriteString(w, m); err != nil { + return err + } + if fl, ok := w.(http.Flusher); ok { + fl.Flush() + } + return nil + }) + defer t.Unsubscribe(subscriberID) + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + select { + case <-t.ctx.Done(): + case <-r.Context().Done(): + } + return nil +} + func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request) error { conn, err := wsUpgrader.Upgrade(w, r, nil) if err != nil {