So much logging

This commit is contained in:
Philipp Heckel 2022-06-01 23:24:44 -04:00
parent ab955d4d1c
commit 7845eb0124
12 changed files with 264 additions and 122 deletions

View file

@ -32,22 +32,22 @@ import (
// Server is the main server, providing the UI and API for ntfy
type Server struct {
config *Config
httpServer *http.Server
httpsServer *http.Server
unixListener net.Listener
smtpServer *smtp.Server
smtpBackend *smtpBackend
topics map[string]*topic
visitors map[string]*visitor
firebaseClient *firebaseClient
mailer mailer
messages int64
auth auth.Auther
messageCache *messageCache
fileCache *fileCache
closeChan chan bool
mu sync.Mutex
config *Config
httpServer *http.Server
httpsServer *http.Server
unixListener net.Listener
smtpServer *smtp.Server
smtpServerBackend *smtpBackend
smtpSender mailer
topics map[string]*topic
visitors map[string]*visitor
firebaseClient *firebaseClient
messages int64
auth auth.Auther
messageCache *messageCache
fileCache *fileCache
closeChan chan bool
mu sync.Mutex
}
// handleFunc extends the normal http.HandlerFunc to be able to easily return errors
@ -147,7 +147,7 @@ func New(conf *Config) (*Server, error) {
messageCache: messageCache,
fileCache: fileCache,
firebaseClient: firebaseClient,
mailer: mailer,
smtpSender: mailer,
topics: topics,
auth: auther,
visitors: make(map[string]*visitor),
@ -246,14 +246,14 @@ func (s *Server) Stop() {
func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
v := s.visitor(r)
log.Debug("%s HTTP %s %s", v.ip, r.Method, r.URL.Path)
log.Debug("%s Dispatching request", logHTTPPrefix(v, r))
if err := s.handleInternal(w, r, v); err != nil {
if websocket.IsWebSocketUpgrade(r) {
isNormalError := websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || strings.Contains(err.Error(), "i/o timeout")
isNormalError := strings.Contains(err.Error(), "i/o timeout")
if isNormalError {
log.Debug("%s WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error())
log.Debug("%s WebSocket error (this error is okay, it happens a lot): %s", logHTTPPrefix(v, r), err.Error())
} else {
log.Warn("%s WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error())
log.Warn("%s WebSocket error: %s", logHTTPPrefix(v, r), err.Error())
}
return // Do not attempt to write to upgraded connection
}
@ -261,13 +261,12 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
if !ok {
httpErr = errHTTPInternalError
}
isNormalError := httpErr.Code == 404
isNormalError := httpErr.HTTPCode == http.StatusNotFound
if isNormalError {
log.Debug("%s HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error())
log.Debug("%s Connection closed with HTTP %d (ntfy error %d): %s", logHTTPPrefix(v, r), httpErr.HTTPCode, httpErr.Code, err.Error())
} else {
log.Info("%s HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error())
log.Info("%s Connection closed with HTTP %d (ntfy error %d): %s", logHTTPPrefix(v, r), httpErr.HTTPCode, httpErr.Code, err.Error())
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
w.WriteHeader(httpErr.HTTPCode)
@ -444,8 +443,11 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
m.Message = emptyMessageBody
}
delayed := m.Time > time.Now().Unix()
log.Debug("%s Received message: ev=%s, body=%d bytes, delayed=%t, fb=%t, cache=%t, up=%t, email=%s",
logPrefix(v, m), m.Event, len(body.PeekedBytes), delayed, firebase, cache, unifiedpush, email)
log.Debug("%s Received message: event=%s, body=%d byte(s), delayed=%t, firebase=%t, cache=%t, up=%t, email=%s",
logMessagePrefix(v, m), m.Event, len(m.Message), delayed, firebase, cache, unifiedpush, email)
if log.IsTrace() {
log.Trace("%s Message body: %s", logMessagePrefix(v, m), maybeMarshalJSON(m))
}
if !delayed {
if err := t.Publish(v, m); err != nil {
return err
@ -453,14 +455,14 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
if s.firebaseClient != nil && firebase {
go s.sendToFirebase(v, m)
}
if s.mailer != nil && email != "" {
if s.smtpSender != nil && email != "" {
go s.sendEmail(v, m, email)
}
if s.config.UpstreamBaseURL != "" {
go s.forwardPollRequest(v, m)
}
} else {
log.Debug("%s Message delayed, will process later", logPrefix(v, m))
log.Debug("%s Message delayed, will process later", logMessagePrefix(v, m))
}
if cache {
if err := s.messageCache.AddMessage(m); err != nil {
@ -479,16 +481,16 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
}
func (s *Server) sendToFirebase(v *visitor, m *message) {
log.Debug("%s Publishing to Firebase", logPrefix(v, m))
log.Debug("%s Publishing to Firebase", logMessagePrefix(v, m))
if err := s.firebaseClient.Send(v, m); err != nil {
log.Warn("%s Unable to publish to Firebase: %v", logPrefix(v, m), err.Error())
log.Warn("%s Unable to publish to Firebase: %v", logMessagePrefix(v, m), err.Error())
}
}
func (s *Server) sendEmail(v *visitor, m *message, email string) {
log.Debug("%s Sending email to %s", logPrefix(v, m), email)
if err := s.mailer.Send(v.ip, email, m); err != nil {
log.Warn("%s Unable to send email: %v", logPrefix(v, m), err.Error())
log.Debug("%s Sending email to %s", logMessagePrefix(v, m), email)
if err := s.smtpSender.Send(v, m, email); err != nil {
log.Warn("%s Unable to send email: %v", logMessagePrefix(v, m), err.Error())
}
}
@ -496,10 +498,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) {
topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
log.Debug("%s Publishing poll request to %s", logPrefix(v, m), forwardURL)
log.Debug("%s Publishing poll request to %s", logMessagePrefix(v, m), forwardURL)
req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
if err != nil {
log.Warn("%s Unable to publish poll request: %v", logPrefix(v, m), err.Error())
log.Warn("%s Unable to publish poll request: %v", logMessagePrefix(v, m), err.Error())
return
}
req.Header.Set("X-Poll-ID", m.ID)
@ -508,10 +510,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) {
}
response, err := httpClient.Do(req)
if err != nil {
log.Warn("%s Unable to publish poll request: %v", logPrefix(v, m), err.Error())
log.Warn("%s Unable to publish poll request: %v", logMessagePrefix(v, m), err.Error())
return
} else if response.StatusCode != http.StatusOK {
log.Warn("%s Unable to publish poll request, unexpected HTTP status: %d", logPrefix(v, m), response.StatusCode)
log.Warn("%s Unable to publish poll request, unexpected HTTP status: %d", logMessagePrefix(v, m), response.StatusCode)
return
}
}
@ -553,7 +555,7 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca
return false, false, "", false, errHTTPTooManyRequestsLimitEmails
}
}
if s.mailer == nil && email != "" {
if s.smtpSender == nil && email != "" {
return false, false, "", false, errHTTPBadRequestEmailDisabled
}
messageStr := strings.ReplaceAll(readParam(r, "x-message", "message", "m"), "\\n", "\n")
@ -627,7 +629,7 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca
// If file.txt is > message limit, treat it as an attachment
func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, unifiedpush bool) error {
if m.Event == pollRequestEvent { // Case 1
return nil
return s.handleBodyDiscard(body)
} else if unifiedpush {
return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
} else if m.Attachment != nil && m.Attachment.URL != "" {
@ -640,6 +642,12 @@ func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body
return s.handleBodyAsAttachment(r, v, m, body) // Case 6
}
func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
_, err := io.Copy(io.Discard, body)
_ = body.Close()
return err
}
func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
if utf8.Valid(body.PeekedBytes) {
m.Message = string(body.PeekedBytes) // Do not trim
@ -739,6 +747,8 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v
}
func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
log.Debug("%s HTTP stream connection opened", logHTTPPrefix(v, r))
defer log.Debug("%s HTTP stream connection closed", logHTTPPrefix(v, r))
if err := v.SubscriptionAllowed(); err != nil {
return errHTTPTooManyRequestsLimitSubscriptions
}
@ -795,6 +805,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
case <-r.Context().Done():
return nil
case <-time.After(s.config.KeepaliveInterval):
log.Trace("%s Sending keepalive message", logHTTPPrefix(v, r))
v.Keepalive()
if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
return err
@ -811,6 +822,8 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
return errHTTPTooManyRequestsLimitSubscriptions
}
defer v.RemoveSubscription()
log.Debug("%s WebSocket connection opened", logHTTPPrefix(v, r))
defer log.Debug("%s WebSocket connection closed", logHTTPPrefix(v, r))
topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
if err != nil {
return err
@ -840,6 +853,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
return err
}
conn.SetPongHandler(func(appData string) error {
log.Trace("%s Received WebSocket pong", logHTTPPrefix(v, r))
return conn.SetReadDeadline(time.Now().Add(pongWait))
})
for {
@ -856,6 +870,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
return err
}
log.Trace("%s Sending WebSocket ping", logHTTPPrefix(v, r))
return conn.WriteMessage(websocket.PingMessage, nil)
}
for {
@ -901,8 +916,9 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
return err
}
err = g.Wait()
if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
return nil // Normal closures are not errors
if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Trace("%s WebSocket connection closed: %s", logHTTPPrefix(v, r), err.Error())
return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
}
return err
}
@ -1025,12 +1041,15 @@ func (s *Server) updateStatsAndPrune() {
defer s.mu.Unlock()
// Expire visitors from rate visitors map
staleVisitors := 0
for ip, v := range s.visitors {
if v.Stale() {
log.Debug("Deleting stale visitor %s", v.ip)
delete(s.visitors, ip)
staleVisitors++
}
}
log.Debug("Manager: Deleted %d stale visitor(s)", staleVisitors)
// Delete expired attachments
if s.fileCache != nil {
@ -1038,20 +1057,20 @@ func (s *Server) updateStatsAndPrune() {
if err != nil {
log.Warn("Error retrieving expired attachments: %s", err.Error())
} else if len(ids) > 0 {
log.Debug("Deleting expired attachments: %v", ids)
log.Debug("Manager: Deleting expired attachments: %v", ids)
if err := s.fileCache.Remove(ids...); err != nil {
log.Warn("Error deleting attachments: %s", err.Error())
}
} else {
log.Debug("No expired attachments to delete")
log.Debug("Manager: No expired attachments to delete")
}
}
// Prune message cache
olderThan := time.Now().Add(-1 * s.config.CacheDuration)
log.Debug("Pruning messages older tha %v", olderThan)
log.Debug("Manager: Pruning messages older than %s", olderThan.Format("2006-01-02 15:04:05"))
if err := s.messageCache.Prune(olderThan); err != nil {
log.Warn("Error pruning cache: %s", err.Error())
log.Warn("Manager: Error pruning cache: %s", err.Error())
}
// Prune old topics, remove subscriptions without subscribers
@ -1060,7 +1079,7 @@ func (s *Server) updateStatsAndPrune() {
subs := t.Subscribers()
msgs, err := s.messageCache.MessageCount(t.ID)
if err != nil {
log.Warn("Cannot get stats for topic %s: %s", t.ID, err.Error())
log.Warn("Manager: Cannot get stats for topic %s: %s", t.ID, err.Error())
continue
}
if msgs == 0 && subs == 0 {
@ -1072,19 +1091,25 @@ func (s *Server) updateStatsAndPrune() {
}
// Mail stats
var mailSuccess, mailFailure int64
if s.smtpBackend != nil {
mailSuccess, mailFailure = s.smtpBackend.Counts()
var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64
if s.smtpServerBackend != nil {
receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts()
}
var sentMailTotal, sentMailSuccess, sentMailFailure int64
if s.smtpSender != nil {
sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts()
}
// Print stats
log.Info("Stats: %d message(s) published, %d in cache, %d successful mails, %d failed, %d topic(s) active, %d subscriber(s), %d visitor(s)",
s.messages, messages, mailSuccess, mailFailure, len(s.topics), subscribers, len(s.visitors))
log.Info("Stats: %d messages published, %d in cache, %d topic(s) active, %d subscriber(s), %d visitor(s), %d mails received (%d successful, %d failed), %d mails sent (%d successful, %d failed)",
s.messages, messages, len(s.topics), subscribers, len(s.visitors),
receivedMailTotal, receivedMailSuccess, receivedMailFailure,
sentMailTotal, sentMailSuccess, sentMailFailure)
}
func (s *Server) runSMTPServer() error {
s.smtpBackend = newMailBackend(s.config, s.handle)
s.smtpServer = smtp.NewServer(s.smtpBackend)
s.smtpServerBackend = newMailBackend(s.config, s.handle)
s.smtpServer = smtp.NewServer(s.smtpServerBackend)
s.smtpServer.Addr = s.config.SMTPServerListen
s.smtpServer.Domain = s.config.SMTPServerDomain
s.smtpServer.ReadTimeout = 10 * time.Second
@ -1099,7 +1124,6 @@ func (s *Server) runManager() {
for {
select {
case <-time.After(s.config.ManagerInterval):
log.Debug("Running manager")
s.updateStatsAndPrune()
case <-s.closeChan:
return
@ -1107,19 +1131,6 @@ func (s *Server) runManager() {
}
}
func (s *Server) runDelayedSender() {
for {
select {
case <-time.After(s.config.DelayedSenderInterval):
if err := s.sendDelayedMessages(); err != nil {
log.Warn("error sending scheduled messages: %s", err.Error())
}
case <-s.closeChan:
return
}
}
}
func (s *Server) runFirebaseKeepaliver() {
if s.firebaseClient == nil {
return
@ -1137,6 +1148,19 @@ func (s *Server) runFirebaseKeepaliver() {
}
}
func (s *Server) runDelayedSender() {
for {
select {
case <-time.After(s.config.DelayedSenderInterval):
if err := s.sendDelayedMessages(); err != nil {
log.Warn("Error sending delayed messages: %s", err.Error())
}
case <-s.closeChan:
return
}
}
}
func (s *Server) sendDelayedMessages() error {
messages, err := s.messageCache.MessagesDue()
if err != nil {
@ -1145,7 +1169,7 @@ func (s *Server) sendDelayedMessages() error {
for _, m := range messages {
v := s.visitorFromIP(m.Sender)
if err := s.sendDelayedMessage(v, m); err != nil {
log.Warn("%s Error sending delayed message: %s", logPrefix(v, m), err.Error())
log.Warn("%s Error sending delayed message: %s", logMessagePrefix(v, m), err.Error())
}
}
return nil
@ -1154,13 +1178,13 @@ func (s *Server) sendDelayedMessages() error {
func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
s.mu.Lock()
defer s.mu.Unlock()
log.Debug("%s Sending delayed message", logPrefix(v, m))
log.Debug("%s Sending delayed message", logMessagePrefix(v, m))
t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
if ok {
go func() {
// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
if err := t.Publish(v, m); err != nil {
log.Warn("%s Unable to publish message: %v", logPrefix(v, m), err.Error())
log.Warn("%s Unable to publish message: %v", logMessagePrefix(v, m), err.Error())
}
}()
}
@ -1333,7 +1357,3 @@ func (s *Server) visitorFromIP(ip string) *visitor {
v.Keepalive()
return v
}
func logPrefix(v *visitor, m *message) string {
return fmt.Sprintf("%s/%s/%s", v.ip, m.Topic, m.ID)
}