WIP: Logging

This commit is contained in:
binwiederhier 2023-02-03 22:21:50 -05:00
parent af4175a5bc
commit a6641980c2
15 changed files with 631 additions and 168 deletions

View file

@ -138,6 +138,19 @@ const (
wsPongWait = 15 * time.Second
)
// Log tags
const (
tagPublish = "publish"
tagFirebase = "firebase"
tagEmail = "email" // Send email
tagSMTP = "smtp" // Receive email
tagPay = "pay"
tagAccount = "account"
tagManager = "manager"
tagResetter = "resetter"
tagWebsocket = "websocket"
)
// New instantiates a new Server. It creates the cache and adds a Firebase
// subscriber (if configured).
func New(conf *Config) (*Server, error) {
@ -305,9 +318,9 @@ func (s *Server) closeDatabases() {
func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
if err == nil {
log.Debug("%s Dispatching request", logHTTPPrefix(v, r))
logvr(v, r).Debug("Dispatching request")
if log.IsTrace() {
log.Trace("%s Entire request (headers and body):\n%s", logHTTPPrefix(v, r), renderHTTPRequest(r))
logvr(v, r).Trace("Entire request (headers and body):\n%s", renderHTTPRequest(r))
}
err = s.handleInternal(w, r, v)
}
@ -315,9 +328,9 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
if websocket.IsWebSocketUpgrade(r) {
isNormalError := strings.Contains(err.Error(), "i/o timeout")
if isNormalError {
log.Debug("%s WebSocket error (this error is okay, it happens a lot): %s", logHTTPPrefix(v, r), err.Error())
logvr(v, r).Tag(tagWebsocket).Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
} else {
log.Info("%s WebSocket error: %s", logHTTPPrefix(v, r), err.Error())
logvr(v, r).Tag(tagWebsocket).Info("WebSocket error: %s", err.Error())
}
return // Do not attempt to write to upgraded connection
}
@ -331,9 +344,21 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
}
isNormalError := httpErr.HTTPCode == http.StatusNotFound || httpErr.HTTPCode == http.StatusBadRequest
if isNormalError {
log.Debug("%s Connection closed with HTTP %d (ntfy error %d): %s", logHTTPPrefix(v, r), httpErr.HTTPCode, httpErr.Code, err.Error())
logvr(v, r).
Fields(map[string]any{
"error": err,
"error_code": httpErr.Code,
"http_status": httpErr.HTTPCode,
}).
Debug("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error())
} else {
log.Info("%s Connection closed with HTTP %d (ntfy error %d): %s", logHTTPPrefix(v, r), httpErr.HTTPCode, httpErr.Code, err.Error())
logvr(v, r).
Fields(map[string]any{
"error": err,
"error_code": httpErr.Code,
"http_status": httpErr.HTTPCode,
}).
Info("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error())
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
@ -586,10 +611,20 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
m.Message = emptyMessageBody
}
delayed := m.Time > time.Now().Unix()
log.Debug("%s Received message: event=%s, user=%s, body=%d byte(s), delayed=%t, firebase=%t, cache=%t, up=%t, email=%s",
logMessagePrefix(v, m), m.Event, m.User, len(m.Message), delayed, firebase, cache, unifiedpush, email)
logvrm(v, r, m).
Tag(tagPublish).
Fields(map[string]any{
"message_delayed": delayed,
"message_firebase": firebase,
"message_unifiedpush": unifiedpush,
"message_email": email,
}).
Debug("Received message")
if log.IsTrace() {
log.Trace("%s Message body: %s", logMessagePrefix(v, m), util.MaybeMarshalJSON(m))
logvrm(v, r, m).
Tag(tagPublish).
Field("message_body", util.MaybeMarshalJSON(m)).
Trace("Message body")
}
if !delayed {
if err := t.Publish(v, m); err != nil {
@ -605,10 +640,10 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
go s.forwardPollRequest(v, m)
}
} else {
log.Debug("%s Message delayed, will process later", logMessagePrefix(v, m))
logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
}
if cache {
log.Debug("%s Adding message to cache", logMessagePrefix(v, m))
logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
if err := s.messageCache.AddMessage(m); err != nil {
return nil, err
}
@ -640,20 +675,20 @@ func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *
}
func (s *Server) sendToFirebase(v *visitor, m *message) {
log.Debug("%s Publishing to Firebase", logMessagePrefix(v, m))
logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
if err := s.firebaseClient.Send(v, m); err != nil {
if err == errFirebaseTemporarilyBanned {
log.Debug("%s Unable to publish to Firebase: %v", logMessagePrefix(v, m), err.Error())
logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
} else {
log.Warn("%s Unable to publish to Firebase: %v", logMessagePrefix(v, m), err.Error())
logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
}
}
}
func (s *Server) sendEmail(v *visitor, m *message, email string) {
log.Debug("%s Sending email to %s", logMessagePrefix(v, m), email)
logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
if err := s.smtpSender.Send(v, m, email); err != nil {
log.Warn("%s Unable to send email to %s: %v", logMessagePrefix(v, m), email, err.Error())
logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
}
}
@ -661,10 +696,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) {
topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
log.Debug("%s Publishing poll request to %s", logMessagePrefix(v, m), forwardURL)
logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
if err != nil {
log.Warn("%s Unable to publish poll request: %v", logMessagePrefix(v, m), err.Error())
logvm(v, m).Err(err).Warn("Unable to publish poll request")
return
}
req.Header.Set("X-Poll-ID", m.ID)
@ -673,10 +708,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) {
}
response, err := httpClient.Do(req)
if err != nil {
log.Warn("%s Unable to publish poll request: %v", logMessagePrefix(v, m), err.Error())
logvm(v, m).Err(err).Warn("Unable to publish poll request")
return
} else if response.StatusCode != http.StatusOK {
log.Warn("%s Unable to publish poll request, unexpected HTTP status: %d", logMessagePrefix(v, m), response.StatusCode)
logvm(v, m).Err(err).Warn("Unable to publish poll request, unexpected HTTP status: %d")
return
}
}
@ -924,8 +959,8 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v
}
func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
log.Debug("%s HTTP stream connection opened", logHTTPPrefix(v, r))
defer log.Debug("%s HTTP stream connection closed", logHTTPPrefix(v, r))
logvr(v, r).Debug("HTTP stream connection opened")
defer logvr(v, r).Debug("HTTP stream connection closed")
if !v.SubscriptionAllowed() {
return errHTTPTooManyRequestsLimitSubscriptions
}
@ -993,7 +1028,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
case <-r.Context().Done():
return nil
case <-time.After(s.config.KeepaliveInterval):
log.Trace("%s Sending keepalive message", logHTTPPrefix(v, r))
logvr(v, r).Trace("Sending keepalive message")
v.Keepalive()
if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
return err
@ -1010,8 +1045,8 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
return errHTTPTooManyRequestsLimitSubscriptions
}
defer v.RemoveSubscription()
log.Debug("%s WebSocket connection opened", logHTTPPrefix(v, r))
defer log.Debug("%s WebSocket connection closed", logHTTPPrefix(v, r))
logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
if err != nil {
return err
@ -1047,7 +1082,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
return err
}
conn.SetPongHandler(func(appData string) error {
log.Trace("%s Received WebSocket pong", logHTTPPrefix(v, r))
logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
return conn.SetReadDeadline(time.Now().Add(pongWait))
})
for {
@ -1069,7 +1104,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
return err
}
log.Trace("%s Sending WebSocket ping", logHTTPPrefix(v, r))
logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
return conn.WriteMessage(websocket.PingMessage, nil)
}
for {
@ -1077,7 +1112,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
case <-gctx.Done():
return nil
case <-cancelCtx.Done():
log.Trace("%s Cancel received, closing subscriber connection", logHTTPPrefix(v, r))
logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
conn.Close()
return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
case <-time.After(s.config.KeepaliveInterval):
@ -1120,7 +1155,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
}
err = g.Wait()
if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Trace("%s WebSocket connection closed: %s", logHTTPPrefix(v, r), err.Error())
logvr(v, r).Tag(tagWebsocket).Err(err).Trace("WebSocket connection closed")
return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
}
return err
@ -1251,8 +1286,8 @@ func (s *Server) topicFromID(id string) (*topic, error) {
}
func (s *Server) execManager() {
log.Debug("Manager: Starting")
defer log.Debug("Manager: Finished")
log.Tag(tagManager).Debug("Starting manager")
defer log.Tag(tagManager).Debug("Finished manager")
// WARNING: Make sure to only selectively lock with the mutex, and be aware that this
// there is no mutex for the entire function.
@ -1393,13 +1428,13 @@ func (s *Server) runStatsResetter() {
for {
runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
timer := time.NewTimer(time.Until(runAt))
log.Debug("Stats resetter: Waiting until %v to reset visitor stats", runAt)
log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
select {
case <-timer.C:
log.Debug("Stats resetter: Running")
log.Tag(tagResetter).Debug("Running stats resetter")
s.resetStats()
case <-s.closeChan:
log.Debug("Stats resetter: Stopping timer")
log.Tag(tagResetter).Debug("Stopping stats resetter")
timer.Stop()
return
}
@ -1415,7 +1450,7 @@ func (s *Server) resetStats() {
}
if s.userManager != nil {
if err := s.userManager.ResetStats(); err != nil {
log.Warn("Failed to write to database: %s", err.Error())
log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
}
}
}
@ -1442,7 +1477,7 @@ func (s *Server) runDelayedSender() {
select {
case <-time.After(s.config.DelayedSenderInterval):
if err := s.sendDelayedMessages(); err != nil {
log.Warn("Error sending delayed messages: %s", err.Error())
log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
}
case <-s.closeChan:
return
@ -1460,20 +1495,20 @@ func (s *Server) sendDelayedMessages() error {
if s.userManager != nil && m.User != "" {
u, err = s.userManager.User(m.User)
if err != nil {
log.Warn("Error sending delayed message %s: %s", m.ID, err.Error())
log.Context(m).Err(err).Warn("Error sending delayed message")
continue
}
}
v := s.visitor(m.Sender, u)
if err := s.sendDelayedMessage(v, m); err != nil {
log.Warn("%s Error sending delayed message: %s", logMessagePrefix(v, m), err.Error())
logvm(v, m).Err(err).Warn("Error sending delayed message")
}
}
return nil
}
func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
log.Debug("%s Sending delayed message", logMessagePrefix(v, m))
logvm(v, m).Debug("Sending delayed message")
s.mu.Lock()
t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
s.mu.Unlock()
@ -1481,7 +1516,7 @@ func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
go func() {
// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
if err := t.Publish(v, m); err != nil {
log.Warn("%s Unable to publish message: %v", logMessagePrefix(v, m), err.Error())
logvm(v, m).Err(err).Warn("Unable to publish message")
}
}()
}
@ -1595,7 +1630,7 @@ func (s *Server) autorizeTopic(next handleFunc, perm user.Permission) handleFunc
u := v.User()
for _, t := range topics {
if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
log.Info("unauthorized: %s", err.Error())
logvr(v, r).Err(err).Debug("Unauthorized")
return errHTTPForbidden
}
}
@ -1609,7 +1644,7 @@ func (s *Server) maybeAuthenticate(r *http.Request) (v *visitor, err error) {
ip := extractIPAddress(r, s.config.BehindProxy)
var u *user.User // may stay nil if no auth header!
if u, err = s.authenticate(r); err != nil {
log.Debug("authentication failed: %s", err.Error())
logr(r).Debug("Authentication failed: %s", err.Error())
err = errHTTPUnauthorized // Always return visitor, even when error occurs!
}
v = s.visitor(ip, u)