Limits
This commit is contained in:
parent
fa7a45902f
commit
b775e6dfce
5 changed files with 146 additions and 37 deletions
|
@ -24,6 +24,7 @@ import (
|
|||
|
||||
// TODO add "max messages in a topic" limit
|
||||
// TODO implement persistence
|
||||
// TODO implement "since=<ID>"
|
||||
|
||||
// Server is the main server
|
||||
type Server struct {
|
||||
|
@ -146,7 +147,7 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error {
|
|||
} else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
|
||||
return s.handleStatic(w, r)
|
||||
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) {
|
||||
return s.handlePublish(w, r)
|
||||
return s.handlePublish(w, r, v)
|
||||
} else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) {
|
||||
return s.handleSubscribeJSON(w, r, v)
|
||||
} else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) {
|
||||
|
@ -169,8 +170,11 @@ func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request) error {
|
||||
t := s.createTopic(r.URL.Path[1:])
|
||||
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
||||
t, err := s.topic(r.URL.Path[1:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reader := io.LimitReader(r.Body, messageLimit)
|
||||
b, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
|
@ -223,10 +227,13 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v
|
|||
|
||||
func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visitor, format string, contentType string, encoder messageEncoder) error {
|
||||
if err := v.AddSubscription(); err != nil {
|
||||
return err
|
||||
return errHTTPTooManyRequests
|
||||
}
|
||||
defer v.RemoveSubscription()
|
||||
t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack
|
||||
t, err := s.topic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
since, err := parseSince(r)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -304,16 +311,19 @@ func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) createTopic(id string) *topic {
|
||||
func (s *Server) topic(id string) (*topic, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if _, ok := s.topics[id]; !ok {
|
||||
if len(s.topics) >= s.config.GlobalTopicLimit {
|
||||
return nil, errHTTPTooManyRequests
|
||||
}
|
||||
s.topics[id] = newTopic(id)
|
||||
if s.firebase != nil {
|
||||
s.topics[id].Subscribe(s.firebase)
|
||||
}
|
||||
}
|
||||
return s.topics[id]
|
||||
return s.topics[id], nil
|
||||
}
|
||||
|
||||
func (s *Server) updateStatsAndExpire() {
|
||||
|
@ -331,7 +341,7 @@ func (s *Server) updateStatsAndExpire() {
|
|||
for _, t := range s.topics {
|
||||
t.Prune(s.config.MessageBufferDuration)
|
||||
subs, msgs := t.Stats()
|
||||
if msgs == 0 && (subs == 0 || (s.firebase != nil && subs == 1)) {
|
||||
if msgs == 0 && (subs == 0 || (s.firebase != nil && subs == 1)) { // Firebase is a subscriber!
|
||||
delete(s.topics, t.id)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue