From b775e6dfce07e2d4be5e4107cd175c03c1701d35 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Mon, 1 Nov 2021 16:39:40 -0400 Subject: [PATCH] Limits --- config/config.go | 47 +++++++++++++++++++--------------- server/index.html | 29 +++++++++++++++++++++ server/server.go | 26 +++++++++++++------ server/visitor.go | 16 ++++++------ util/limit.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 146 insertions(+), 37 deletions(-) create mode 100644 util/limit.go diff --git a/config/config.go b/config/config.go index 2d3db1c..cfa9e68 100644 --- a/config/config.go +++ b/config/config.go @@ -14,36 +14,41 @@ const ( DefaultManagerInterval = time.Minute ) -// Defines the max number of requests, here: -// 50 requests bucket, replenished at a rate of 1 per second +// Defines all the limits +// - request limit: max number of PUT/GET/.. requests (here: 50 requests bucket, replenished at a rate of 1 per second) +// - global topic limit: max number of topics overall +// - subscription limit: max number of subscriptions (active HTTP connections) per per-visitor/IP var ( - defaultRequestLimit = rate.Every(time.Second) - defaultRequestLimitBurst = 50 - defaultSubscriptionLimit = 30 // per visitor + defaultGlobalTopicLimit = 5000 + defaultVisitorRequestLimit = rate.Every(time.Second) + defaultVisitorRequestLimitBurst = 50 + defaultVisitorSubscriptionLimit = 30 ) // Config is the main config struct for the application. Use New to instantiate a default config struct. type Config struct { - ListenHTTP string - FirebaseKeyFile string - MessageBufferDuration time.Duration - KeepaliveInterval time.Duration - ManagerInterval time.Duration - RequestLimit rate.Limit - RequestLimitBurst int - SubscriptionLimit int + ListenHTTP string + FirebaseKeyFile string + MessageBufferDuration time.Duration + KeepaliveInterval time.Duration + ManagerInterval time.Duration + GlobalTopicLimit int + VisitorRequestLimit rate.Limit + VisitorRequestLimitBurst int + VisitorSubscriptionLimit int } // New instantiates a default new config func New(listenHTTP string) *Config { return &Config{ - ListenHTTP: listenHTTP, - FirebaseKeyFile: "", - MessageBufferDuration: DefaultMessageBufferDuration, - KeepaliveInterval: DefaultKeepaliveInterval, - ManagerInterval: DefaultManagerInterval, - RequestLimit: defaultRequestLimit, - RequestLimitBurst: defaultRequestLimitBurst, - SubscriptionLimit: defaultSubscriptionLimit, + ListenHTTP: listenHTTP, + FirebaseKeyFile: "", + MessageBufferDuration: DefaultMessageBufferDuration, + KeepaliveInterval: DefaultKeepaliveInterval, + ManagerInterval: DefaultManagerInterval, + GlobalTopicLimit: defaultGlobalTopicLimit, + VisitorRequestLimit: defaultVisitorRequestLimit, + VisitorRequestLimitBurst: defaultVisitorRequestLimitBurst, + VisitorSubscriptionLimit: defaultVisitorSubscriptionLimit, } } diff --git a/server/index.html b/server/index.html index b41afd7..551670c 100644 --- a/server/index.html +++ b/server/index.html @@ -81,6 +81,12 @@ +

Subscribe via phone

+

+ Once it's approved, you can use the Ntfy Android App to receive notifications directly on your phone. Just like + the server, this app is also open source. +

+

Subscribe via your app, or via the CLI

Using EventSource in JS, you can consume @@ -142,6 +148,7 @@ $ curl -s "ntfy.sh/mytopic/json?poll=1&since=10m"
# Returns messages from up to 10 minutes ago and ends the connection +

FAQ

Isn't this like ...?
@@ -165,6 +172,28 @@ That said, the logs do not contain any topic names or other details about you. Check the code if you don't believe me.

+

+ Why is Firebase used?
+ In addition to caching messages locally and delivering them to long-polling subscribers, all messages are also + published to Firebase Cloud Messaging (FCM) (if FirebaseKeyFile is set, which it is on ntfy.sh). This + is to facilitate instant notifications on Android. I tried really, really hard to avoid using FCM, but newer + versions of Android made it impossible to implement background services>. + I'm sorry. +

+ +

Privacy policy

+

+ Neither the server nor the app record any personal information, or share any of the messages and topics with + any outside service. All data is exclusively used to make the service function properly. The notable exception + is the Firebase Cloud Messaging (FCM) service, which is required to provide instant Android notifications (see + FAQ for details). +

+ +

+ The web server does not log or otherwise store request paths, remote IP addresses or even topics or messages, + aside from a short on-disk cache (up to a day) to support the since= feature and service restarts. +

+
Made with ❤️ by Philipp C. Heckel
diff --git a/server/server.go b/server/server.go index 6811a62..dcee296 100644 --- a/server/server.go +++ b/server/server.go @@ -24,6 +24,7 @@ import ( // TODO add "max messages in a topic" limit // TODO implement persistence +// TODO implement "since=" // Server is the main server type Server struct { @@ -146,7 +147,7 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { } else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) { return s.handleStatic(w, r) } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) { - return s.handlePublish(w, r) + return s.handlePublish(w, r, v) } else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) { return s.handleSubscribeJSON(w, r, v) } else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) { @@ -169,8 +170,11 @@ func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request) error { return nil } -func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request) error { - t := s.createTopic(r.URL.Path[1:]) +func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error { + t, err := s.topic(r.URL.Path[1:]) + if err != nil { + return err + } reader := io.LimitReader(r.Body, messageLimit) b, err := io.ReadAll(reader) if err != nil { @@ -223,10 +227,13 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visitor, format string, contentType string, encoder messageEncoder) error { if err := v.AddSubscription(); err != nil { - return err + return errHTTPTooManyRequests } defer v.RemoveSubscription() - t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack + t, err := s.topic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack + if err != nil { + return err + } since, err := parseSince(r) if err != nil { return err @@ -304,16 +311,19 @@ func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error { return nil } -func (s *Server) createTopic(id string) *topic { +func (s *Server) topic(id string) (*topic, error) { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.topics[id]; !ok { + if len(s.topics) >= s.config.GlobalTopicLimit { + return nil, errHTTPTooManyRequests + } s.topics[id] = newTopic(id) if s.firebase != nil { s.topics[id].Subscribe(s.firebase) } } - return s.topics[id] + return s.topics[id], nil } func (s *Server) updateStatsAndExpire() { @@ -331,7 +341,7 @@ func (s *Server) updateStatsAndExpire() { for _, t := range s.topics { t.Prune(s.config.MessageBufferDuration) subs, msgs := t.Stats() - if msgs == 0 && (subs == 0 || (s.firebase != nil && subs == 1)) { + if msgs == 0 && (subs == 0 || (s.firebase != nil && subs == 1)) { // Firebase is a subscriber! delete(s.topics, t.id) } } diff --git a/server/visitor.go b/server/visitor.go index 06ee32d..3e028ac 100644 --- a/server/visitor.go +++ b/server/visitor.go @@ -3,6 +3,7 @@ package server import ( "golang.org/x/time/rate" "heckel.io/ntfy/config" + "heckel.io/ntfy/util" "sync" "time" ) @@ -15,16 +16,17 @@ const ( type visitor struct { config *config.Config limiter *rate.Limiter - subscriptions int + subscriptions *util.Limiter seen time.Time mu sync.Mutex } func newVisitor(conf *config.Config) *visitor { return &visitor{ - config: conf, - limiter: rate.NewLimiter(conf.RequestLimit, conf.RequestLimitBurst), - seen: time.Now(), + config: conf, + limiter: rate.NewLimiter(conf.VisitorRequestLimit, conf.VisitorRequestLimitBurst), + subscriptions: util.NewLimiter(int64(conf.VisitorSubscriptionLimit)), + seen: time.Now(), } } @@ -38,17 +40,16 @@ func (v *visitor) RequestAllowed() error { func (v *visitor) AddSubscription() error { v.mu.Lock() defer v.mu.Unlock() - if v.subscriptions >= v.config.SubscriptionLimit { + if err := v.subscriptions.Add(1); err != nil { return errHTTPTooManyRequests } - v.subscriptions++ return nil } func (v *visitor) RemoveSubscription() { v.mu.Lock() defer v.mu.Unlock() - v.subscriptions-- + v.subscriptions.Sub(1) } func (v *visitor) Keepalive() { @@ -60,6 +61,5 @@ func (v *visitor) Keepalive() { func (v *visitor) Stale() bool { v.mu.Lock() defer v.mu.Unlock() - v.seen = time.Now() return time.Since(v.seen) > visitorExpungeAfter } diff --git a/util/limit.go b/util/limit.go new file mode 100644 index 0000000..962757f --- /dev/null +++ b/util/limit.go @@ -0,0 +1,65 @@ +package util + +import ( + "errors" + "sync" +) + +// ErrLimitReached is the error returned by the Limiter and LimitWriter when the predefined limit has been reached +var ErrLimitReached = errors.New("limit reached") + +// Limiter is a helper that allows adding values up to a well-defined limit. Once the limit is reached +// ErrLimitReached will be returned. Limiter may be used by multiple goroutines. +type Limiter struct { + value int64 + limit int64 + mu sync.Mutex +} + +// NewLimiter creates a new Limiter +func NewLimiter(limit int64) *Limiter { + return &Limiter{ + limit: limit, + } +} + +// Add adds n to the limiters internal value, but only if the limit has not been reached. If the limit would be +// exceeded after adding n, ErrLimitReached is returned. +func (l *Limiter) Add(n int64) error { + l.mu.Lock() + defer l.mu.Unlock() + if l.limit == 0 { + l.value += n + return nil + } else if l.value+n <= l.limit { + l.value += n + return nil + } else { + return ErrLimitReached + } +} + +// Sub subtracts a value from the limiters internal value +func (l *Limiter) Sub(n int64) { + l.Add(-n) +} + +// Set sets the value of the limiter to n. This function ignores the limit. It is meant to set the value +// based on reality. +func (l *Limiter) Set(n int64) { + l.mu.Lock() + l.value = n + l.mu.Unlock() +} + +// Value returns the internal value of the limiter +func (l *Limiter) Value() int64 { + l.mu.Lock() + defer l.mu.Unlock() + return l.value +} + +// Limit returns the defined limit +func (l *Limiter) Limit() int64 { + return l.limit +}