Continued logging work
This commit is contained in:
parent
27bd79febf
commit
7cc8c81bd8
28 changed files with 287 additions and 171 deletions
|
@ -44,6 +44,7 @@ func (c *fileCache) Write(id string, in io.Reader, limiters ...util.Limiter) (in
|
|||
if !fileIDRegex.MatchString(id) {
|
||||
return 0, errInvalidFileID
|
||||
}
|
||||
log.Tag(tagFileCache).Field("message_id", id).Debug("Writing attachment")
|
||||
file := filepath.Join(c.dir, id)
|
||||
if _, err := os.Stat(file); err == nil {
|
||||
return 0, errFileExists
|
||||
|
@ -75,10 +76,10 @@ func (c *fileCache) Remove(ids ...string) error {
|
|||
if !fileIDRegex.MatchString(id) {
|
||||
return errInvalidFileID
|
||||
}
|
||||
log.Debug("File Cache: Deleting attachment %s", id)
|
||||
log.Tag(tagFileCache).Field("message_id", id).Debug("Deleting attachment")
|
||||
file := filepath.Join(c.dir, id)
|
||||
if err := os.Remove(file); err != nil {
|
||||
log.Debug("File Cache: Error deleting attachment %s: %s", id, err.Error())
|
||||
log.Tag(tagFileCache).Field("message_id", id).Err(err).Debug("Error deleting attachment")
|
||||
}
|
||||
}
|
||||
size, err := dirSize(c.dir)
|
||||
|
|
|
@ -11,30 +11,36 @@ import (
|
|||
"unicode/utf8"
|
||||
)
|
||||
|
||||
// logr creates a new log event with HTTP request fields
|
||||
func logr(r *http.Request) *log.Event {
|
||||
return log.Fields(httpFields(r))
|
||||
}
|
||||
|
||||
// logr creates a new log event with visitor fields
|
||||
func logv(v *visitor) *log.Event {
|
||||
return log.Context(v)
|
||||
return log.With(v)
|
||||
}
|
||||
|
||||
// logr creates a new log event with HTTP request and visitor fields
|
||||
func logvr(v *visitor, r *http.Request) *log.Event {
|
||||
return logv(v).Fields(httpFields(r))
|
||||
}
|
||||
|
||||
// logvrm creates a new log event with HTTP request, visitor fields and message fields
|
||||
func logvrm(v *visitor, r *http.Request, m *message) *log.Event {
|
||||
return logvr(v, r).Context(m)
|
||||
return logvr(v, r).With(m)
|
||||
}
|
||||
|
||||
// logvrm creates a new log event with visitor fields and message fields
|
||||
func logvm(v *visitor, m *message) *log.Event {
|
||||
return logv(v).Context(m)
|
||||
return logv(v).With(m)
|
||||
}
|
||||
|
||||
// logem creates a new log event with email fields
|
||||
func logem(state *smtp.ConnectionState) *log.Event {
|
||||
return log.
|
||||
Tag(tagSMTP).
|
||||
Fields(map[string]any{
|
||||
Fields(log.Context{
|
||||
"smtp_hostname": state.Hostname,
|
||||
"smtp_remote_addr": state.RemoteAddr.String(),
|
||||
})
|
||||
|
|
|
@ -369,10 +369,10 @@ func (c *messageCache) addMessages(ms []*message) error {
|
|||
}
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Error("Message Cache: Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
|
||||
log.Tag(tagMessageCache).Err(err).Error("Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
|
||||
return err
|
||||
}
|
||||
log.Debug("Message Cache: Wrote %d message(s) in %v", len(ms), time.Since(start))
|
||||
log.Tag(tagMessageCache).Debug("Wrote %d message(s) in %v", len(ms), time.Since(start))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -609,7 +609,7 @@ func (c *messageCache) processMessageBatches() {
|
|||
}
|
||||
for messages := range c.queue.Dequeue() {
|
||||
if err := c.addMessages(messages); err != nil {
|
||||
log.Error("Message Cache: %s", err.Error())
|
||||
log.Tag(tagMessageCache).Err(err).Error("Cannot write message batch")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -766,7 +766,7 @@ func setupNewCacheDB(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func migrateFrom0(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 0 to 1")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 0 to 1")
|
||||
if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -780,7 +780,7 @@ func migrateFrom0(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom1(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 1 to 2")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 1 to 2")
|
||||
if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -791,7 +791,7 @@ func migrateFrom1(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom2(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 2 to 3")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 2 to 3")
|
||||
if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -802,7 +802,7 @@ func migrateFrom2(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom3(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 3 to 4")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 3 to 4")
|
||||
if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -813,7 +813,7 @@ func migrateFrom3(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom4(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 4 to 5")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 4 to 5")
|
||||
if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -824,7 +824,7 @@ func migrateFrom4(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom5(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 5 to 6")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 5 to 6")
|
||||
if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -835,7 +835,7 @@ func migrateFrom5(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom6(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 6 to 7")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 6 to 7")
|
||||
if _, err := db.Exec(migrate6To7AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -846,7 +846,7 @@ func migrateFrom6(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom7(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 7 to 8")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 7 to 8")
|
||||
if _, err := db.Exec(migrate7To8AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -857,7 +857,7 @@ func migrateFrom7(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom8(db *sql.DB, _ time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 8 to 9")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 8 to 9")
|
||||
if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -868,7 +868,7 @@ func migrateFrom8(db *sql.DB, _ time.Duration) error {
|
|||
}
|
||||
|
||||
func migrateFrom9(db *sql.DB, cacheDuration time.Duration) error {
|
||||
log.Info("Migrating cache database schema: from 9 to 10")
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 9 to 10")
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -40,7 +40,6 @@ import (
|
|||
- HIGH CLI "ntfy tier [add|list|delete]"
|
||||
- HIGH CLI "ntfy user" should show tier
|
||||
- HIGH Self-review
|
||||
- HIGH Stripe webhook failures cannot be diagnosed because of missing logs
|
||||
- MEDIUM: Test for expiring messages after reservation removal
|
||||
- MEDIUM: Test new token endpoints & never-expiring token
|
||||
- LOW: UI: Flickering upgrade banner when logging in
|
||||
|
@ -140,16 +139,18 @@ const (
|
|||
|
||||
// Log tags
|
||||
const (
|
||||
tagPublish = "publish"
|
||||
tagFirebase = "firebase"
|
||||
tagEmail = "email" // Send email
|
||||
tagSMTP = "smtp" // Receive email
|
||||
tagPay = "pay"
|
||||
tagAccount = "account"
|
||||
tagManager = "manager"
|
||||
tagResetter = "resetter"
|
||||
tagWebsocket = "websocket"
|
||||
tagMatrix = "matrix"
|
||||
tagPublish = "publish"
|
||||
tagFirebase = "firebase"
|
||||
tagEmail = "email" // Send email
|
||||
tagSMTP = "smtp" // Receive email
|
||||
tagFileCache = "file_cache"
|
||||
tagMessageCache = "message_cache"
|
||||
tagStripe = "stripe"
|
||||
tagAccount = "account"
|
||||
tagManager = "manager"
|
||||
tagResetter = "resetter"
|
||||
tagWebsocket = "websocket"
|
||||
tagMatrix = "matrix"
|
||||
)
|
||||
|
||||
// New instantiates a new Server. It creates the cache and adds a Firebase
|
||||
|
@ -234,6 +235,10 @@ func (s *Server) Run() error {
|
|||
listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
|
||||
}
|
||||
log.Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
|
||||
if log.IsFile() {
|
||||
fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s, log file is %s\n", listenStr, s.config.Version, log.File())
|
||||
fmt.Fprintln(os.Stderr, "No more output is expected.")
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", s.handle)
|
||||
errChan := make(chan error)
|
||||
|
@ -346,19 +351,19 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
|
|||
isNormalError := httpErr.HTTPCode == http.StatusNotFound || httpErr.HTTPCode == http.StatusBadRequest
|
||||
if isNormalError {
|
||||
logvr(v, r).
|
||||
Fields(map[string]any{
|
||||
"error": err,
|
||||
Fields(log.Context{
|
||||
"error_code": httpErr.Code,
|
||||
"http_status": httpErr.HTTPCode,
|
||||
}).
|
||||
Err(err).
|
||||
Debug("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error())
|
||||
} else {
|
||||
logvr(v, r).
|
||||
Fields(map[string]any{
|
||||
"error": err,
|
||||
Fields(log.Context{
|
||||
"error_code": httpErr.Code,
|
||||
"http_status": httpErr.HTTPCode,
|
||||
}).
|
||||
Err(err).
|
||||
Info("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error())
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
@ -614,7 +619,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
|
|||
delayed := m.Time > time.Now().Unix()
|
||||
logvrm(v, r, m).
|
||||
Tag(tagPublish).
|
||||
Fields(map[string]any{
|
||||
Fields(log.Context{
|
||||
"message_delayed": delayed,
|
||||
"message_firebase": firebase,
|
||||
"message_unifiedpush": unifiedpush,
|
||||
|
@ -1496,7 +1501,7 @@ func (s *Server) sendDelayedMessages() error {
|
|||
if s.userManager != nil && m.User != "" {
|
||||
u, err = s.userManager.User(m.User)
|
||||
if err != nil {
|
||||
log.Context(m).Err(err).Warn("Error sending delayed message")
|
||||
log.With(m).Err(err).Warn("Error sending delayed message")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
|
@ -233,10 +233,15 @@
|
|||
# stripe-secret-key:
|
||||
# stripe-webhook-key:
|
||||
|
||||
# Log level, can be TRACE, DEBUG, INFO, WARN or ERROR
|
||||
# Log level, can be "trace", "debug", "info", "warn" or "error"
|
||||
# This option can be hot-reloaded by calling "kill -HUP $pid" or "systemctl reload ntfy".
|
||||
#
|
||||
# Be aware that DEBUG (and particularly TRACE) can be VERY CHATTY. Only turn them on for
|
||||
# FIXME
|
||||
#
|
||||
# Be aware that "debug" (and particularly "trace"") can be VERY CHATTY. Only turn them on for
|
||||
# debugging purposes, or your disk will fill up quickly.
|
||||
#
|
||||
# log-level: INFO
|
||||
# log-level: info
|
||||
# log-level-overrides:
|
||||
# log-format: text
|
||||
# log-file:
|
||||
|
|
|
@ -2,7 +2,6 @@ package server
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"heckel.io/ntfy/log"
|
||||
"heckel.io/ntfy/user"
|
||||
"heckel.io/ntfy/util"
|
||||
"net/http"
|
||||
|
@ -155,7 +154,7 @@ func (s *Server) handleAccountDelete(w http.ResponseWriter, r *http.Request, v *
|
|||
return errHTTPBadRequestIncorrectPasswordConfirmation
|
||||
}
|
||||
if u.Billing.StripeSubscriptionID != "" {
|
||||
logvr(v, r).Tag(tagPay).Info("Canceling billing subscription for user %s", u.Name)
|
||||
logvr(v, r).Tag(tagStripe).Info("Canceling billing subscription for user %s", u.Name)
|
||||
if _, err := s.stripe.CancelSubscription(u.Billing.StripeSubscriptionID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -488,7 +487,7 @@ func (s *Server) maybeRemoveMessagesAndExcessReservations(r *http.Request, v *vi
|
|||
func (s *Server) publishSyncEventAsync(v *visitor) {
|
||||
go func() {
|
||||
if err := s.publishSyncEvent(v); err != nil {
|
||||
log.Trace("%s Error publishing to user's sync topic: %s", v.String(), err.Error())
|
||||
logv(v).Err(err).Trace("Error publishing to user's sync topic")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -499,7 +498,7 @@ func (s *Server) publishSyncEvent(v *visitor) error {
|
|||
if u == nil || u.SyncTopic == "" {
|
||||
return nil
|
||||
}
|
||||
log.Trace("Publishing sync event to user %s's sync topic %s", u.Name, u.SyncTopic)
|
||||
logv(v).Field("sync_topic", u.SyncTopic).Trace("Publishing sync event to user's sync topic")
|
||||
syncTopic, err := s.topicFromID(u.SyncTopic)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -683,7 +683,7 @@ func TestAccount_Reservation_Add_Kills_Other_Subscribers(t *testing.T) {
|
|||
func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) {
|
||||
conf := newTestConfigWithAuthFile(t)
|
||||
conf.AuthDefault = user.PermissionReadWrite
|
||||
conf.AuthStatsQueueWriterInterval = 100 * time.Millisecond
|
||||
conf.AuthStatsQueueWriterInterval = 200 * time.Millisecond
|
||||
s := newTestServer(t, conf)
|
||||
defer s.closeDatabases()
|
||||
|
||||
|
@ -706,7 +706,7 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) {
|
|||
require.Equal(t, 200, rr.Code)
|
||||
|
||||
// Wait for stats queue writer
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// Verify that message stats were persisted
|
||||
u, err := s.userManager.User("phil")
|
||||
|
|
|
@ -46,7 +46,7 @@ func (c *firebaseClient) Send(v *visitor, m *message) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if log.IsTrace() {
|
||||
if log.Tag(tagFirebase).IsTrace() {
|
||||
logvm(v, m).
|
||||
Tag(tagFirebase).
|
||||
Field("firebase_message", util.MaybeMarshalJSON(fbm)).
|
||||
|
|
|
@ -121,8 +121,8 @@ func (s *Server) handleAccountBillingSubscriptionCreate(w http.ResponseWriter, r
|
|||
return errNotAPaidTier
|
||||
}
|
||||
logvr(v, r).
|
||||
Tag(tagPay).
|
||||
Fields(map[string]any{
|
||||
Tag(tagStripe).
|
||||
Fields(log.Context{
|
||||
"tier": tier,
|
||||
"stripe_price_id": tier.StripePriceID,
|
||||
}).
|
||||
|
@ -196,8 +196,8 @@ func (s *Server) handleAccountBillingSubscriptionCreateSuccess(w http.ResponseWr
|
|||
}
|
||||
v.SetUser(u)
|
||||
logvr(v, r).
|
||||
Tag(tagPay).
|
||||
Fields(map[string]any{
|
||||
Tag(tagStripe).
|
||||
Fields(log.Context{
|
||||
"tier_id": tier.ID,
|
||||
"tier_name": tier.Name,
|
||||
"stripe_price_id": tier.StripePriceID,
|
||||
|
@ -241,8 +241,8 @@ func (s *Server) handleAccountBillingSubscriptionUpdate(w http.ResponseWriter, r
|
|||
return err
|
||||
}
|
||||
logvr(v, r).
|
||||
Tag(tagPay).
|
||||
Fields(map[string]any{
|
||||
Tag(tagStripe).
|
||||
Fields(log.Context{
|
||||
"new_tier_id": tier.ID,
|
||||
"new_tier_name": tier.Name,
|
||||
"new_tier_stripe_price_id": tier.StripePriceID,
|
||||
|
@ -275,7 +275,7 @@ func (s *Server) handleAccountBillingSubscriptionUpdate(w http.ResponseWriter, r
|
|||
// handleAccountBillingSubscriptionDelete facilitates downgrading a paid user to a tier-less user,
|
||||
// and cancelling the Stripe subscription entirely
|
||||
func (s *Server) handleAccountBillingSubscriptionDelete(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
||||
logvr(v, r).Tag(tagPay).Info("Deleting Stripe subscription")
|
||||
logvr(v, r).Tag(tagStripe).Info("Deleting Stripe subscription")
|
||||
u := v.User()
|
||||
if u.Billing.StripeSubscriptionID != "" {
|
||||
params := &stripe.SubscriptionParams{
|
||||
|
@ -292,7 +292,7 @@ func (s *Server) handleAccountBillingSubscriptionDelete(w http.ResponseWriter, r
|
|||
// handleAccountBillingPortalSessionCreate creates a session to the customer billing portal, and returns the
|
||||
// redirect URL. The billing portal allows customers to change their payment methods, and cancel the subscription.
|
||||
func (s *Server) handleAccountBillingPortalSessionCreate(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
||||
logvr(v, r).Tag(tagPay).Info("Creating Stripe billing portal session")
|
||||
logvr(v, r).Tag(tagStripe).Info("Creating Stripe billing portal session")
|
||||
u := v.User()
|
||||
if u.Billing.StripeCustomerID == "" {
|
||||
return errHTTPBadRequestNotAPaidUser
|
||||
|
@ -338,7 +338,7 @@ func (s *Server) handleAccountBillingWebhook(_ http.ResponseWriter, r *http.Requ
|
|||
return s.handleAccountBillingWebhookSubscriptionDeleted(r, v, event)
|
||||
default:
|
||||
logvr(v, r).
|
||||
Tag(tagPay).
|
||||
Tag(tagStripe).
|
||||
Field("stripe_webhook_type", event.Type).
|
||||
Warn("Unhandled Stripe webhook event %s received", event.Type)
|
||||
return nil
|
||||
|
@ -354,8 +354,8 @@ func (s *Server) handleAccountBillingWebhookSubscriptionUpdated(r *http.Request,
|
|||
}
|
||||
subscriptionID, priceID := ev.ID, ev.Items.Data[0].Price.ID
|
||||
logvr(v, r).
|
||||
Tag(tagPay).
|
||||
Fields(map[string]any{
|
||||
Tag(tagStripe).
|
||||
Fields(log.Context{
|
||||
"stripe_webhook_type": event.Type,
|
||||
"stripe_customer_id": ev.Customer,
|
||||
"stripe_subscription_id": ev.ID,
|
||||
|
@ -400,7 +400,7 @@ func (s *Server) handleAccountBillingWebhookSubscriptionDeleted(r *http.Request,
|
|||
}
|
||||
v.SetUser(u)
|
||||
logvr(v, r).
|
||||
Tag(tagPay).
|
||||
Tag(tagStripe).
|
||||
Field("stripe_webhook_type", event.Type).
|
||||
Info("Subscription deleted, downgrading to unpaid tier")
|
||||
if err := s.updateSubscriptionAndTier(r, v, u, nil, ev.Customer, "", "", 0, 0); err != nil {
|
||||
|
@ -419,14 +419,14 @@ func (s *Server) updateSubscriptionAndTier(r *http.Request, v *visitor, u *user.
|
|||
return err
|
||||
}
|
||||
if tier == nil && u.Tier != nil {
|
||||
logvr(v, r).Tag(tagPay).Info("Resetting tier for user %s", u.Name)
|
||||
logvr(v, r).Tag(tagStripe).Info("Resetting tier for user %s", u.Name)
|
||||
if err := s.userManager.ResetTier(u.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if tier != nil && u.TierID() != tier.ID {
|
||||
logvr(v, r).
|
||||
Tag(tagPay).
|
||||
Fields(map[string]any{
|
||||
Tag(tagStripe).
|
||||
Fields(log.Context{
|
||||
"new_tier_id": tier.ID,
|
||||
"new_tier_name": tier.Name,
|
||||
"new_tier_stripe_price_id": tier.StripePriceID,
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/netip"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -26,8 +27,9 @@ import (
|
|||
"heckel.io/ntfy/util"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// log.SetLevel(log.DebugLevel)
|
||||
func TestMain(m *testing.M) {
|
||||
log.SetLevel(log.ErrorLevel)
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
func TestServer_PublishAndPoll(t *testing.T) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
_ "embed" // required by go:embed
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"heckel.io/ntfy/log"
|
||||
"heckel.io/ntfy/util"
|
||||
"mime"
|
||||
"net"
|
||||
|
@ -38,7 +39,7 @@ func (s *smtpSender) Send(v *visitor, m *message, to string) error {
|
|||
auth := smtp.PlainAuth("", s.config.SMTPSenderUser, s.config.SMTPSenderPass, host)
|
||||
logvm(v, m).
|
||||
Tag(tagEmail).
|
||||
Fields(map[string]any{
|
||||
Fields(log.Context{
|
||||
"email_via": s.config.SMTPSenderAddr,
|
||||
"email_user": s.config.SMTPSenderUser,
|
||||
"email_to": to,
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"heckel.io/ntfy/log"
|
||||
"heckel.io/ntfy/user"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
|
@ -42,7 +43,7 @@ type message struct {
|
|||
User string `json:"-"` // Username of the uploader, used to associated attachments
|
||||
}
|
||||
|
||||
func (m *message) Context() map[string]any {
|
||||
func (m *message) Context() log.Context {
|
||||
fields := map[string]any{
|
||||
"message_id": m.ID,
|
||||
"message_time": m.Time,
|
||||
|
|
|
@ -135,25 +135,14 @@ func newVisitor(conf *Config, messageCache *messageCache, userManager *user.Mana
|
|||
return v
|
||||
}
|
||||
|
||||
func (v *visitor) String() string {
|
||||
func (v *visitor) Context() log.Context {
|
||||
v.mu.Lock()
|
||||
defer v.mu.Unlock()
|
||||
return v.stringNoLock()
|
||||
return v.contextNoLock()
|
||||
}
|
||||
|
||||
func (v *visitor) stringNoLock() string {
|
||||
if v.user != nil && v.user.Billing.StripeCustomerID != "" {
|
||||
return fmt.Sprintf("%s/%s/%s", v.ip.String(), v.user.ID, v.user.Billing.StripeCustomerID)
|
||||
} else if v.user != nil {
|
||||
return fmt.Sprintf("%s/%s", v.ip.String(), v.user.ID)
|
||||
}
|
||||
return v.ip.String()
|
||||
}
|
||||
|
||||
func (v *visitor) Context() map[string]any {
|
||||
v.mu.Lock()
|
||||
defer v.mu.Unlock()
|
||||
fields := map[string]any{
|
||||
func (v *visitor) contextNoLock() log.Context {
|
||||
fields := log.Context{
|
||||
"visitor_ip": v.ip.String(),
|
||||
}
|
||||
if v.user != nil {
|
||||
|
@ -320,7 +309,7 @@ func (v *visitor) MaybeUserID() string {
|
|||
}
|
||||
|
||||
func (v *visitor) resetLimitersNoLock(messages, emails int64, enqueueUpdate bool) {
|
||||
log.Context(v).Debug("%s Resetting limiters for visitor", v.stringNoLock())
|
||||
log.Fields(v.contextNoLock()).Debug("Resetting limiters for visitor")
|
||||
limits := v.limitsNoLock()
|
||||
v.requestLimiter = rate.NewLimiter(limits.RequestLimitReplenish, limits.RequestLimitBurst)
|
||||
v.messagesLimiter = util.NewFixedLimiterWithValue(limits.MessageLimit, messages)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue