Logging in subscribe and publish command
This commit is contained in:
parent
5cc0b194d3
commit
e12995e218
10 changed files with 67 additions and 48 deletions
|
@ -7,9 +7,9 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"heckel.io/ntfy/log"
|
||||
"heckel.io/ntfy/util"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -102,6 +102,7 @@ func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishO
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
log.Debug("%s Publishing message with headers %s", util.ShortTopicURL(topicURL), req.Header)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -136,6 +137,7 @@ func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, err
|
|||
msgChan := make(chan *Message)
|
||||
errChan := make(chan error)
|
||||
topicURL := c.expandTopicURL(topic)
|
||||
log.Debug("%s Polling from topic", util.ShortTopicURL(topicURL))
|
||||
options = append(options, WithPoll())
|
||||
go func() {
|
||||
err := performSubscribeRequest(ctx, msgChan, topicURL, "", options...)
|
||||
|
@ -171,6 +173,7 @@ func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
|
|||
defer c.mu.Unlock()
|
||||
subscriptionID := util.RandomString(10)
|
||||
topicURL := c.expandTopicURL(topic)
|
||||
log.Debug("%s Subscribing to topic", util.ShortTopicURL(topicURL))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c.subscriptions[subscriptionID] = &subscription{
|
||||
ID: subscriptionID,
|
||||
|
@ -226,11 +229,11 @@ func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicUR
|
|||
// TODO The retry logic is crude and may lose messages. It should record the last message like the
|
||||
// Android client, use since=, and do incremental backoff too
|
||||
if err := performSubscribeRequest(ctx, msgChan, topicURL, subcriptionID, options...); err != nil {
|
||||
log.Printf("Connection to %s failed: %s", topicURL, err.Error())
|
||||
log.Warn("%s Connection failed: %s", util.ShortTopicURL(topicURL), err.Error())
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("Connection to %s exited", topicURL)
|
||||
log.Info("Connection exited", util.ShortTopicURL(topicURL))
|
||||
return
|
||||
case <-time.After(10 * time.Second): // TODO Add incremental backoff
|
||||
}
|
||||
|
@ -238,7 +241,9 @@ func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicUR
|
|||
}
|
||||
|
||||
func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, subscriptionID string, options ...SubscribeOption) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
|
||||
streamURL := fmt.Sprintf("%s/json", topicURL)
|
||||
log.Debug("%s Listening to %s", util.ShortTopicURL(topicURL), streamURL)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -261,10 +266,12 @@ func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicUR
|
|||
}
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
for scanner.Scan() {
|
||||
m, err := toMessage(scanner.Text(), topicURL, subscriptionID)
|
||||
messageJSON := scanner.Text()
|
||||
m, err := toMessage(messageJSON, topicURL, subscriptionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Trace("%s Message received: %s", util.ShortTopicURL(topicURL), messageJSON)
|
||||
if m.Event == MessageEvent {
|
||||
msgChan <- m
|
||||
}
|
||||
|
|
|
@ -5,12 +5,13 @@ import (
|
|||
"fmt"
|
||||
"github.com/urfave/cli/v2"
|
||||
"heckel.io/ntfy/client"
|
||||
"heckel.io/ntfy/log"
|
||||
"heckel.io/ntfy/util"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
|
@ -32,7 +33,6 @@ var flagsSubscribe = append(
|
|||
&cli.BoolFlag{Name: "from-config", Aliases: []string{"C"}, Usage: "read subscriptions from config file (service mode)"},
|
||||
&cli.BoolFlag{Name: "poll", Aliases: []string{"p"}, Usage: "return events and exit, do not listen for new events"},
|
||||
&cli.BoolFlag{Name: "scheduled", Aliases: []string{"sched", "S"}, Usage: "also return scheduled/delayed events"},
|
||||
&cli.BoolFlag{Name: "verbose", Aliases: []string{"v"}, Usage: "print verbose output"},
|
||||
)
|
||||
|
||||
var cmdSubscribe = &cli.Command{
|
||||
|
@ -190,6 +190,7 @@ func doSubscribe(c *cli.Context, cl *client.Client, conf *client.Config, topic,
|
|||
if !ok {
|
||||
continue
|
||||
}
|
||||
log.Debug("%s Dispatching received message: %s", logMessagePrefix(m), m.Raw)
|
||||
printMessageOrRunCommand(c, m, cmd)
|
||||
}
|
||||
return nil
|
||||
|
@ -199,26 +200,26 @@ func printMessageOrRunCommand(c *cli.Context, m *client.Message, command string)
|
|||
if command != "" {
|
||||
runCommand(c, command, m)
|
||||
} else {
|
||||
log.Debug("%s Printing raw message", logMessagePrefix(m))
|
||||
fmt.Fprintln(c.App.Writer, m.Raw)
|
||||
}
|
||||
}
|
||||
|
||||
func runCommand(c *cli.Context, command string, m *client.Message) {
|
||||
if err := runCommandInternal(c, command, m); err != nil {
|
||||
fmt.Fprintf(c.App.ErrWriter, "Command failed: %s\n", err.Error())
|
||||
log.Warn("%s Command failed: %s", logMessagePrefix(m), err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func runCommandInternal(c *cli.Context, script string, m *client.Message) error {
|
||||
scriptFile := fmt.Sprintf("%s/ntfy-subscribe-%s.%s", os.TempDir(), util.RandomString(10), scriptExt)
|
||||
if err := os.WriteFile(scriptFile, []byte(scriptHeader+script), 0700); err != nil {
|
||||
log.Debug("%s Running command '%s' via temporary script %s", logMessagePrefix(m), script, scriptFile)
|
||||
script = scriptHeader + script
|
||||
if err := os.WriteFile(scriptFile, []byte(script), 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.Remove(scriptFile)
|
||||
verbose := c.Bool("verbose")
|
||||
if verbose {
|
||||
log.Printf("[%s] Executing: %s (for message: %s)", util.ShortTopicURL(m.TopicURL), script, m.Raw)
|
||||
}
|
||||
log.Debug("%s Executing script %s", logMessagePrefix(m), scriptFile)
|
||||
cmd := exec.Command(scriptLauncher[0], append(scriptLauncher[1:], scriptFile)...)
|
||||
cmd.Stdin = c.App.Reader
|
||||
cmd.Stdout = c.App.Writer
|
||||
|
@ -228,7 +229,7 @@ func runCommandInternal(c *cli.Context, script string, m *client.Message) error
|
|||
}
|
||||
|
||||
func envVars(m *client.Message) []string {
|
||||
env := os.Environ()
|
||||
env := make([]string, 0)
|
||||
env = append(env, envVar(m.ID, "NTFY_ID", "id")...)
|
||||
env = append(env, envVar(m.Topic, "NTFY_TOPIC", "topic")...)
|
||||
env = append(env, envVar(fmt.Sprintf("%d", m.Time), "NTFY_TIME", "time")...)
|
||||
|
@ -237,7 +238,11 @@ func envVars(m *client.Message) []string {
|
|||
env = append(env, envVar(fmt.Sprintf("%d", m.Priority), "NTFY_PRIORITY", "priority", "prio", "p")...)
|
||||
env = append(env, envVar(strings.Join(m.Tags, ","), "NTFY_TAGS", "tags", "tag", "ta")...)
|
||||
env = append(env, envVar(m.Raw, "NTFY_RAW", "raw")...)
|
||||
return env
|
||||
sort.Strings(env)
|
||||
if log.IsTrace() {
|
||||
log.Trace("%s With environment:\n%s", logMessagePrefix(m), strings.Join(env, "\n"))
|
||||
}
|
||||
return append(os.Environ(), env...)
|
||||
}
|
||||
|
||||
func envVar(value string, vars ...string) []string {
|
||||
|
@ -276,3 +281,7 @@ func defaultClientConfigFileWindows() string {
|
|||
homeDir, _ := os.UserConfigDir()
|
||||
return filepath.Join(homeDir, clientUserConfigFileWindowsRelative)
|
||||
}
|
||||
|
||||
func logMessagePrefix(m *client.Message) string {
|
||||
return fmt.Sprintf("%s/%s", util.ShortTopicURL(m.TopicURL), m.ID)
|
||||
}
|
||||
|
|
|
@ -833,9 +833,9 @@ contents. The `TRACE` setting will also print the message contents.
|
|||
Both options are very verbose and should only be enabled in production for short periods of time. Otherwise,
|
||||
you're going to run out of disk space pretty quickly.
|
||||
|
||||
You can also hot-reload the `log-level` by sending the `SIGHUP` signal to the process (or by calling
|
||||
`systemctl reload ntfy` if it's running inside systemd). You can do so by calling `kill -HUP $(pidof ntfy)`.
|
||||
If succesfull, you'll see something like this:
|
||||
You can also hot-reload the `log-level` by sending the `SIGHUP` signal to the process after editing the `server.yml` file.
|
||||
You can do so by calling `systemctl reload ntfy` (if ntfy is running inside systemd), or by calling `kill -HUP $(pidof ntfy)`.
|
||||
If successful, you'll see something like this:
|
||||
|
||||
```
|
||||
$ ntfy serve
|
||||
|
|
|
@ -44,7 +44,7 @@ fi
|
|||
|
||||
## Server-sent messages in your web app
|
||||
Just as you can [subscribe to topics in the Web UI](subscribe/web.md), you can use ntfy in your own
|
||||
web application. Check out the <a href="/example.html">live example</a> or just look the source of this page.
|
||||
web application. Check out the <a href="/example.html">live example</a>.
|
||||
|
||||
## Notify on SSH login
|
||||
Years ago my home server was broken into. That shook me hard, so every time someone logs into any machine that I
|
||||
|
|
|
@ -10,12 +10,14 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
|
|||
|
||||
* Italian (thanks to [@Genio2003](https://hosted.weblate.org/user/Genio2003/))
|
||||
|
||||
-->
|
||||
|
||||
## ntfy server v1.25.0 (UNRELEASED)
|
||||
## ntfy server v1.25.0
|
||||
Released June 2, 2022
|
||||
|
||||
**Features:**
|
||||
|
||||
* Advanced logging, with different log levels and hot reloading of the log level (no ticket)
|
||||
* Advanced logging, with different log levels and hot reloading of the log level ([#284](https://github.com/binwiederhier/ntfy/pull/284))
|
||||
|
||||
**Bugs**:
|
||||
|
||||
|
@ -28,18 +30,16 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
|
|||
|
||||
**Documentation**:
|
||||
|
||||
* ⚠️ [Privacy policy](privacy.md) updated to reflect additional debug/tracing feature (no ticket)
|
||||
* [Examples](examples.md) for [Home Assistant](https://www.home-assistant.io/) ([#282](https://github.com/binwiederhier/ntfy/pull/282), thanks to [@poblabs](https://github.com/poblabs))
|
||||
* Install instructions for [NixOS/Nix](https://ntfy.sh/docs/install/#nixos-nix) ([#282](https://github.com/binwiederhier/ntfy/pull/282), thanks to [@arjan-s](https://github.com/arjan-s))
|
||||
* Clarify `poll_request` wording for [iOS push notifications](https://ntfy.sh/docs/config/#ios-instant-notifications) ([#300](https://github.com/binwiederhier/ntfy/issues/300), thanks to [@prabirshrestha](https://github.com/prabirshrestha) for reporting)
|
||||
* Example for using ntfy with docker-compose.yml without root privileges ([#304](https://github.com/binwiederhier/ntfy/pull/304), thanks to [@ksurl](https://github.com/ksurl))
|
||||
* Example for using ntfy with docker-compose.yml without root privileges ([#304](https://github.com/binwiederhier/ntfy/pull/304), thanks to [@ksurl](https://github.com/ksurl))
|
||||
|
||||
**Additional translations:**
|
||||
|
||||
* Chinese/Simplified (thanks to [@yufei.im](https://hosted.weblate.org/user/yufei.im/))
|
||||
|
||||
-->
|
||||
|
||||
|
||||
## ntfy iOS app v1.1
|
||||
Released May 31, 2022
|
||||
|
||||
|
|
|
@ -6,8 +6,8 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
_ "github.com/mattn/go-sqlite3" // SQLite driver
|
||||
"heckel.io/ntfy/log"
|
||||
"heckel.io/ntfy/util"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
@ -540,7 +540,7 @@ func setupNewCacheDB(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func migrateFrom0(db *sql.DB) error {
|
||||
log.Print("Migrating cache database schema: from 0 to 1")
|
||||
log.Info("Migrating cache database schema: from 0 to 1")
|
||||
if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -554,7 +554,7 @@ func migrateFrom0(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func migrateFrom1(db *sql.DB) error {
|
||||
log.Print("Migrating cache database schema: from 1 to 2")
|
||||
log.Info("Migrating cache database schema: from 1 to 2")
|
||||
if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -565,7 +565,7 @@ func migrateFrom1(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func migrateFrom2(db *sql.DB) error {
|
||||
log.Print("Migrating cache database schema: from 2 to 3")
|
||||
log.Info("Migrating cache database schema: from 2 to 3")
|
||||
if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -576,7 +576,7 @@ func migrateFrom2(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func migrateFrom3(db *sql.DB) error {
|
||||
log.Print("Migrating cache database schema: from 3 to 4")
|
||||
log.Info("Migrating cache database schema: from 3 to 4")
|
||||
if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -587,7 +587,7 @@ func migrateFrom3(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func migrateFrom4(db *sql.DB) error {
|
||||
log.Print("Migrating cache database schema: from 4 to 5")
|
||||
log.Info("Migrating cache database schema: from 4 to 5")
|
||||
if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -598,7 +598,7 @@ func migrateFrom4(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func migrateFrom5(db *sql.DB) error {
|
||||
log.Print("Migrating cache database schema: from 5 to 6")
|
||||
log.Info("Migrating cache database schema: from 5 to 6")
|
||||
if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -609,7 +609,7 @@ func migrateFrom5(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func migrateFrom6(db *sql.DB) error {
|
||||
log.Print("Migrating cache database schema: from 6 to 7")
|
||||
log.Info("Migrating cache database schema: from 6 to 7")
|
||||
if _, err := db.Exec(migrate6To7AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -253,7 +253,7 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
|
|||
if isNormalError {
|
||||
log.Debug("%s WebSocket error (this error is okay, it happens a lot): %s", logHTTPPrefix(v, r), err.Error())
|
||||
} else {
|
||||
log.Warn("%s WebSocket error: %s", logHTTPPrefix(v, r), err.Error())
|
||||
log.Info("%s WebSocket error: %s", logHTTPPrefix(v, r), err.Error())
|
||||
}
|
||||
return // Do not attempt to write to upgraded connection
|
||||
}
|
||||
|
@ -446,7 +446,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
|
|||
log.Debug("%s Received message: event=%s, body=%d byte(s), delayed=%t, firebase=%t, cache=%t, up=%t, email=%s",
|
||||
logMessagePrefix(v, m), m.Event, len(m.Message), delayed, firebase, cache, unifiedpush, email)
|
||||
if log.IsTrace() {
|
||||
log.Trace("%s Message body: %s", logMessagePrefix(v, m), maybeMarshalJSON(m))
|
||||
log.Trace("%s Message body: %s", logMessagePrefix(v, m), util.MaybeMarshalJSON(m))
|
||||
}
|
||||
if !delayed {
|
||||
if err := t.Publish(v, m); err != nil {
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"google.golang.org/api/option"
|
||||
"heckel.io/ntfy/auth"
|
||||
"heckel.io/ntfy/log"
|
||||
"heckel.io/ntfy/util"
|
||||
"strings"
|
||||
)
|
||||
|
||||
|
@ -45,7 +46,7 @@ func (c *firebaseClient) Send(v *visitor, m *message) error {
|
|||
return err
|
||||
}
|
||||
if log.IsTrace() {
|
||||
log.Trace("%s Firebase message: %s", logMessagePrefix(v, m), maybeMarshalJSON(fbm))
|
||||
log.Trace("%s Firebase message: %s", logMessagePrefix(v, m), util.MaybeMarshalJSON(fbm))
|
||||
}
|
||||
err = c.sender.Send(fbm)
|
||||
if err == errFirebaseQuotaExceeded {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/emersion/go-smtp"
|
||||
"net/http"
|
||||
|
@ -59,14 +58,3 @@ func logHTTPPrefix(v *visitor, r *http.Request) string {
|
|||
func logSMTPPrefix(state *smtp.ConnectionState) string {
|
||||
return fmt.Sprintf("%s/%s SMTP", state.Hostname, state.RemoteAddr.String())
|
||||
}
|
||||
|
||||
func maybeMarshalJSON(v interface{}) string {
|
||||
messageJSON, err := json.MarshalIndent(v, "", " ")
|
||||
if err != nil {
|
||||
return "<cannot serialize>"
|
||||
}
|
||||
if len(messageJSON) > 5000 {
|
||||
return string(messageJSON)[:5000]
|
||||
}
|
||||
return string(messageJSON)
|
||||
}
|
||||
|
|
14
util/util.go
14
util/util.go
|
@ -2,6 +2,7 @@ package util
|
|||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gabriel-vasile/mimetype"
|
||||
|
@ -264,3 +265,16 @@ func ReadPassword(in io.Reader) ([]byte, error) {
|
|||
func BasicAuth(user, pass string) string {
|
||||
return fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", user, pass))))
|
||||
}
|
||||
|
||||
// MaybeMarshalJSON returns a JSON string of the given object, or "<cannot serialize>" if serialization failed.
|
||||
// This is useful for logging purposes where a failure doesn't matter that much.
|
||||
func MaybeMarshalJSON(v interface{}) string {
|
||||
jsonBytes, err := json.MarshalIndent(v, "", " ")
|
||||
if err != nil {
|
||||
return "<cannot serialize>"
|
||||
}
|
||||
if len(jsonBytes) > 5000 {
|
||||
return string(jsonBytes)[:5000]
|
||||
}
|
||||
return string(jsonBytes)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue