This commit is contained in:
Philipp Heckel 2021-11-02 14:08:21 -04:00
parent b775e6dfce
commit 67922b0ae5
8 changed files with 175 additions and 6 deletions

View file

@ -23,7 +23,6 @@ import (
)
// TODO add "max messages in a topic" limit
// TODO implement persistence
// TODO implement "since=<ID>"
// Server is the main server
@ -33,6 +32,7 @@ type Server struct {
visitors map[string]*visitor
firebase subscriber
messages int64
cache *cache
mu sync.Mutex
}
@ -78,14 +78,32 @@ func New(conf *config.Config) (*Server, error) {
return nil, err
}
}
cache, err := maybeCreateCache(conf)
if err != nil {
return nil, err
}
topics := make(map[string]*topic)
if cache != nil {
if topics, err = cache.Load(); err != nil {
return nil, err
}
}
return &Server{
config: conf,
cache: cache,
firebase: firebaseSubscriber,
topics: make(map[string]*topic),
topics: topics,
visitors: make(map[string]*visitor),
}, nil
}
func maybeCreateCache(conf *config.Config) (*cache, error) {
if conf.CacheFile == "" {
return nil, nil
}
return newCache(conf.CacheFile)
}
func createFirebaseSubscriber(conf *config.Config) (subscriber, error) {
fb, err := firebase.NewApp(context.Background(), nil, option.WithCredentialsFile(conf.FirebaseKeyFile))
if err != nil {
@ -180,9 +198,13 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
if err != nil {
return err
}
if err := t.Publish(newDefaultMessage(t.id, string(b))); err != nil {
m := newDefaultMessage(t.id, string(b))
if err := t.Publish(m); err != nil {
return err
}
if s.cache != nil {
s.cache.Add(m)
}
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
s.mu.Lock()
s.messages++
@ -337,6 +359,13 @@ func (s *Server) updateStatsAndExpire() {
}
}
// Prune cache
if s.cache != nil {
if err := s.cache.Prune(s.config.MessageBufferDuration); err != nil {
log.Printf("error pruning cache: %s", err.Error())
}
}
// Prune old messages, remove subscriptions without subscribers
for _, t := range s.topics {
t.Prune(s.config.MessageBufferDuration)