2021-10-23 01:26:01 +00:00
package server
import (
"bytes"
2021-10-29 03:50:38 +00:00
"context"
2022-05-27 11:55:57 +00:00
"crypto/sha256"
2021-12-23 23:03:04 +00:00
"embed"
2022-01-17 18:28:07 +00:00
"encoding/base64"
2021-10-23 01:26:01 +00:00
"encoding/json"
2022-12-03 20:20:59 +00:00
"errors"
2021-10-23 17:21:33 +00:00
"fmt"
2022-12-25 16:41:38 +00:00
"heckel.io/ntfy/user"
2021-10-23 01:26:01 +00:00
"io"
2021-10-24 02:49:50 +00:00
"net"
2021-10-23 01:26:01 +00:00
"net/http"
2022-10-05 20:42:07 +00:00
"net/netip"
2022-01-14 17:13:14 +00:00
"net/url"
2022-01-10 21:28:13 +00:00
"os"
2022-01-14 17:13:14 +00:00
"path"
2022-01-02 22:56:12 +00:00
"path/filepath"
2021-10-23 01:26:01 +00:00
"regexp"
2022-06-20 16:11:52 +00:00
"sort"
2021-10-29 17:58:14 +00:00
"strconv"
2021-10-23 01:26:01 +00:00
"strings"
"sync"
"time"
2022-01-02 22:56:12 +00:00
"unicode/utf8"
2022-05-13 17:08:07 +00:00
2022-06-11 01:33:39 +00:00
"heckel.io/ntfy/log"
2022-05-13 17:08:07 +00:00
"github.com/emersion/go-smtp"
"github.com/gorilla/websocket"
"golang.org/x/sync/errgroup"
"heckel.io/ntfy/util"
2021-10-23 01:26:01 +00:00
)
2022-12-08 01:44:20 +00:00
/*
	TODO
		expire tokens
		auto-extend tokens from UI
		use token auth in "SubscribeDialog"
		upload files based on user limit
		database migration
		publishXHR + poll should pick current user, not from userManager
		reserve topics
		purge accounts that were not logged into in X
		sync subscription display name
		reset daily limits for users
		store users
		Pages:
		- Home
		- Password reset
		- Pricing
		- change email
		-
		Polishing:
			aria-label for everything
		Tests:
		- APIs
		- CRUD tokens
		- Expire tokens
		-
*/
2021-12-07 16:45:15 +00:00
// Server is the main server, providing the UI and API for ntfy
2021-10-23 01:26:01 +00:00
type Server struct {
2022-06-02 03:24:44 +00:00
config * Config
httpServer * http . Server
httpsServer * http . Server
unixListener net . Listener
smtpServer * smtp . Server
smtpServerBackend * smtpBackend
smtpSender mailer
topics map [ string ] * topic
2022-12-02 20:37:48 +00:00
visitors map [ string ] * visitor // ip:<ip> or user:<user>
2022-06-02 03:24:44 +00:00
firebaseClient * firebaseClient
messages int64
2022-12-25 16:41:38 +00:00
userManager user . Manager
2022-06-02 03:24:44 +00:00
messageCache * messageCache
fileCache * fileCache
closeChan chan bool
mu sync . Mutex
2021-10-23 01:26:01 +00:00
}
2022-01-22 03:22:27 +00:00
// handleFunc is the handler signature used throughout the server. Unlike a plain
// http.HandlerFunc it additionally receives the visitor for the request and
// returns an error, so that error rendering can happen in one place.
type handleFunc func(w http.ResponseWriter, r *http.Request, v *visitor) error
2021-10-23 01:26:01 +00:00
var (
2022-01-31 16:44:58 +00:00
// If changed, don't forget to update Android App and auth_sqlite.go
2022-03-23 18:29:55 +00:00
topicRegex = regexp . MustCompile ( ` ^[-_A-Za-z0-9] { 1,64}$ ` ) // No /!
topicPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}$ ` ) // Regex must match JS & Android app!
externalTopicPathRegex = regexp . MustCompile ( ` ^/[^/]+\.[^/]+/[-_A-Za-z0-9] { 1,64}$ ` ) // Extended topic path, for web-app, e.g. /example.com/mytopic
jsonPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/json$ ` )
ssePathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/sse$ ` )
rawPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/raw$ ` )
wsPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/ws$ ` )
authPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/auth$ ` )
publishPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}/(publish|send|trigger)$ ` )
2021-10-29 17:58:14 +00:00
2022-12-15 04:11:22 +00:00
webConfigPath = "/config.js"
2022-12-24 17:26:56 +00:00
healthPath = "/v1/health"
2022-12-15 04:11:22 +00:00
accountPath = "/v1/account"
accountTokenPath = "/v1/account/token"
2022-12-16 03:07:04 +00:00
accountPasswordPath = "/v1/account/password"
2022-12-15 04:11:22 +00:00
accountSettingsPath = "/v1/account/settings"
accountSubscriptionPath = "/v1/account/subscription"
accountSubscriptionSingleRegex = regexp . MustCompile ( ` ^/v1/account/subscription/([-_A-Za-z0-9] { 16})$ ` )
matrixPushPath = "/_matrix/push/v1/notify"
staticRegex = regexp . MustCompile ( ` ^/static/.+ ` )
docsRegex = regexp . MustCompile ( ` ^/docs(|/.*)$ ` )
fileRegex = regexp . MustCompile ( ` ^/file/([-_A-Za-z0-9] { 1,64})(?:\.[A-Za-z0-9] { 1,16})?$ ` )
2022-12-21 18:19:07 +00:00
disallowedTopics = [ ] string { "docs" , "static" , "file" , "app" , "account" , "settings" , "pricing" , "signup" , "login" , "reset-password" } // If updated, also update in Android and web app
2022-12-15 04:11:22 +00:00
urlRegex = regexp . MustCompile ( ` ^https?:// ` )
2021-10-23 01:26:01 +00:00
2022-03-06 01:24:10 +00:00
//go:embed site
webFs embed . FS
webFsCached = & util . CachingEmbedFS { ModTime : time . Now ( ) , FS : webFs }
webSiteDir = "/site"
2022-03-06 02:28:25 +00:00
webHomeIndex = "/home.html" // Landing page, only if "web-root: home"
2022-03-06 01:24:10 +00:00
webAppIndex = "/app.html" // React app
2021-10-24 18:22:53 +00:00
2021-12-02 22:27:31 +00:00
//go:embed docs
2021-12-07 15:38:58 +00:00
docsStaticFs embed . FS
2021-12-02 22:27:31 +00:00
docsStaticCached = & util . CachingEmbedFS { ModTime : time . Now ( ) , FS : docsStaticFs }
2021-10-23 01:26:01 +00:00
)
2021-12-14 03:30:28 +00:00
// Well-known topic names and default message texts. The two "~" topics are
// shared with the mobile apps, so they must not be changed on the server alone.
const (
	// Special topics
	firebaseControlTopic = "~control" // See Android if changed
	firebasePollTopic    = "~poll"    // See iOS if changed

	// Default message bodies
	emptyMessageBody         = "triggered"               // Used if message body is empty
	newMessageBody           = "New message"             // Used in poll requests as generic message
	defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment

	// Supported message encodings
	encodingBase64 = "base64"
)
// WebSocket constants: timeouts and size limits applied to WebSocket
// subscriber connections.
const (
	// Durations
	wsWriteWait = 2 * time.Second
	wsPongWait  = 15 * time.Second

	// Sizes, in bytes
	wsBufferSize = 1024
	wsReadLimit  = 64 // We only ever receive PINGs
)
2021-12-07 16:45:15 +00:00
// New instantiates a new Server. It creates the cache and adds a Firebase
// subscriber (if configured).
2021-12-19 03:02:36 +00:00
func New ( conf * Config ) ( * Server , error ) {
2021-12-23 23:03:04 +00:00
var mailer mailer
2021-12-27 15:39:28 +00:00
if conf . SMTPSenderAddr != "" {
mailer = & smtpSender { config : conf }
2021-12-23 23:03:04 +00:00
}
2022-02-27 19:47:28 +00:00
messageCache , err := createMessageCache ( conf )
2021-11-02 18:08:21 +00:00
if err != nil {
return nil , err
}
2022-02-27 19:47:28 +00:00
topics , err := messageCache . Topics ( )
2021-11-03 01:09:49 +00:00
if err != nil {
return nil , err
2021-11-02 18:08:21 +00:00
}
2022-01-07 13:49:28 +00:00
var fileCache * fileCache
2022-01-02 22:56:12 +00:00
if conf . AttachmentCacheDir != "" {
2022-12-21 18:19:07 +00:00
fileCache , err = newFileCache ( conf . AttachmentCacheDir , conf . AttachmentTotalSizeLimit )
2022-01-07 13:49:28 +00:00
if err != nil {
2022-01-02 22:56:12 +00:00
return nil , err
}
}
2022-12-25 16:41:38 +00:00
var auther user . Manager
2022-01-23 04:01:20 +00:00
if conf . AuthFile != "" {
2022-12-25 16:41:38 +00:00
auther , err = user . NewSQLiteAuthManager ( conf . AuthFile , conf . AuthDefaultRead , conf . AuthDefaultWrite )
2022-01-23 04:01:20 +00:00
if err != nil {
return nil , err
}
2022-01-22 19:47:27 +00:00
}
2022-06-01 03:16:44 +00:00
var firebaseClient * firebaseClient
2022-02-01 00:33:22 +00:00
if conf . FirebaseKeyFile != "" {
2022-06-01 03:16:44 +00:00
sender , err := newFirebaseSender ( conf . FirebaseKeyFile )
2022-02-01 00:33:22 +00:00
if err != nil {
return nil , err
}
2022-06-01 03:16:44 +00:00
firebaseClient = newFirebaseClient ( sender , auther )
2022-02-01 00:33:22 +00:00
}
2021-10-23 01:26:01 +00:00
return & Server {
2022-06-01 03:16:44 +00:00
config : conf ,
messageCache : messageCache ,
fileCache : fileCache ,
firebaseClient : firebaseClient ,
2022-06-02 03:24:44 +00:00
smtpSender : mailer ,
2022-06-01 03:16:44 +00:00
topics : topics ,
2022-12-25 16:41:38 +00:00
userManager : auther ,
2022-12-02 20:37:48 +00:00
visitors : make ( map [ string ] * visitor ) ,
2021-10-29 03:50:38 +00:00
} , nil
2021-10-23 01:26:01 +00:00
}
2022-02-27 19:47:28 +00:00
func createMessageCache ( conf * Config ) ( * messageCache , error ) {
2021-12-09 15:23:17 +00:00
if conf . CacheDuration == 0 {
2022-02-27 14:38:46 +00:00
return newNopCache ( )
2021-12-09 15:23:17 +00:00
} else if conf . CacheFile != "" {
2022-11-16 15:28:20 +00:00
return newSqliteCache ( conf . CacheFile , conf . CacheStartupQueries , conf . CacheBatchSize , conf . CacheBatchTimeout , false )
2021-11-02 18:08:21 +00:00
}
2022-02-27 14:38:46 +00:00
return newMemCache ( )
2021-11-02 18:08:21 +00:00
}
2021-12-07 16:45:15 +00:00
// Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
// a manager go routine to print stats and prune messages.
2021-10-23 01:26:01 +00:00
func ( s * Server ) Run ( ) error {
2022-01-10 21:28:13 +00:00
var listenStr string
if s . config . ListenHTTP != "" {
listenStr += fmt . Sprintf ( " %s[http]" , s . config . ListenHTTP )
}
2021-12-02 13:52:48 +00:00
if s . config . ListenHTTPS != "" {
2022-01-10 21:28:13 +00:00
listenStr += fmt . Sprintf ( " %s[https]" , s . config . ListenHTTPS )
}
if s . config . ListenUnix != "" {
2022-07-03 23:33:01 +00:00
listenStr += fmt . Sprintf ( " %s[unix]" , s . config . ListenUnix )
2021-12-02 13:52:48 +00:00
}
2021-12-27 21:06:40 +00:00
if s . config . SMTPServerListen != "" {
2022-01-10 21:28:13 +00:00
listenStr += fmt . Sprintf ( " %s[smtp]" , s . config . SMTPServerListen )
2021-12-27 21:06:40 +00:00
}
2022-06-12 15:54:58 +00:00
log . Info ( "Listening on%s, ntfy %s, log level is %s" , listenStr , s . config . Version , log . CurrentLevel ( ) . String ( ) )
2021-12-22 22:45:19 +00:00
mux := http . NewServeMux ( )
mux . HandleFunc ( "/" , s . handle )
2021-12-02 13:52:48 +00:00
errChan := make ( chan error )
2021-12-22 13:17:50 +00:00
s . mu . Lock ( )
s . closeChan = make ( chan bool )
2022-01-10 21:28:13 +00:00
if s . config . ListenHTTP != "" {
s . httpServer = & http . Server { Addr : s . config . ListenHTTP , Handler : mux }
go func ( ) {
errChan <- s . httpServer . ListenAndServe ( )
} ( )
}
2021-12-02 13:52:48 +00:00
if s . config . ListenHTTPS != "" {
2022-01-06 13:45:23 +00:00
s . httpsServer = & http . Server { Addr : s . config . ListenHTTPS , Handler : mux }
2021-12-02 13:52:48 +00:00
go func ( ) {
2021-12-22 13:17:50 +00:00
errChan <- s . httpsServer . ListenAndServeTLS ( s . config . CertFile , s . config . KeyFile )
2021-12-02 13:52:48 +00:00
} ( )
}
2022-01-10 21:28:13 +00:00
if s . config . ListenUnix != "" {
go func ( ) {
var err error
s . mu . Lock ( )
os . Remove ( s . config . ListenUnix )
s . unixListener , err = net . Listen ( "unix" , s . config . ListenUnix )
if err != nil {
2022-07-03 23:33:01 +00:00
s . mu . Unlock ( )
2022-01-10 21:28:13 +00:00
errChan <- err
return
}
2022-07-03 23:33:01 +00:00
defer s . unixListener . Close ( )
if s . config . ListenUnixMode > 0 {
if err := os . Chmod ( s . config . ListenUnix , s . config . ListenUnixMode ) ; err != nil {
s . mu . Unlock ( )
errChan <- err
return
}
2022-07-03 19:27:36 +00:00
}
2022-01-10 21:28:13 +00:00
s . mu . Unlock ( )
httpServer := & http . Server { Handler : mux }
errChan <- httpServer . Serve ( s . unixListener )
} ( )
}
2021-12-27 15:39:28 +00:00
if s . config . SMTPServerListen != "" {
2021-12-27 14:48:09 +00:00
go func ( ) {
2021-12-27 21:06:40 +00:00
errChan <- s . runSMTPServer ( )
2021-12-27 14:48:09 +00:00
} ( )
}
2021-12-22 13:17:50 +00:00
s . mu . Unlock ( )
2021-12-22 22:20:43 +00:00
go s . runManager ( )
2022-06-01 00:38:56 +00:00
go s . runDelayedSender ( )
2022-01-21 19:17:59 +00:00
go s . runFirebaseKeepaliver ( )
2021-12-27 14:48:09 +00:00
2021-12-02 13:52:48 +00:00
return <- errChan
2021-10-23 01:26:01 +00:00
}
2021-12-22 13:17:50 +00:00
// Stop stops HTTP (+HTTPS) server and all managers
func ( s * Server ) Stop ( ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
if s . httpServer != nil {
s . httpServer . Close ( )
}
if s . httpsServer != nil {
s . httpsServer . Close ( )
}
2022-01-10 21:28:13 +00:00
if s . unixListener != nil {
s . unixListener . Close ( )
}
2021-12-27 21:06:40 +00:00
if s . smtpServer != nil {
s . smtpServer . Close ( )
}
2021-12-22 13:17:50 +00:00
close ( s . closeChan )
}
2021-10-23 01:26:01 +00:00
func ( s * Server ) handle ( w http . ResponseWriter , r * http . Request ) {
2022-12-02 20:37:48 +00:00
v , err := s . visitor ( r ) // Note: Always returns v, even when error is returned
if err == nil {
log . Debug ( "%s Dispatching request" , logHTTPPrefix ( v , r ) )
if log . IsTrace ( ) {
log . Trace ( "%s Entire request (headers and body):\n%s" , logHTTPPrefix ( v , r ) , renderHTTPRequest ( r ) )
}
err = s . handleInternal ( w , r , v )
2022-06-20 01:25:35 +00:00
}
2022-12-02 20:37:48 +00:00
if err != nil {
2022-01-16 03:33:35 +00:00
if websocket . IsWebSocketUpgrade ( r ) {
2022-06-02 03:24:44 +00:00
isNormalError := strings . Contains ( err . Error ( ) , "i/o timeout" )
2022-06-01 20:57:35 +00:00
if isNormalError {
2022-06-02 03:24:44 +00:00
log . Debug ( "%s WebSocket error (this error is okay, it happens a lot): %s" , logHTTPPrefix ( v , r ) , err . Error ( ) )
2022-06-01 20:57:35 +00:00
} else {
2022-06-02 15:59:22 +00:00
log . Info ( "%s WebSocket error: %s" , logHTTPPrefix ( v , r ) , err . Error ( ) )
2022-06-01 20:57:35 +00:00
}
2022-01-16 03:33:35 +00:00
return // Do not attempt to write to upgraded connection
2021-10-24 02:49:50 +00:00
}
2022-06-16 00:36:49 +00:00
if matrixErr , ok := err . ( * errMatrix ) ; ok {
writeMatrixError ( w , r , v , matrixErr )
return
}
2022-01-16 03:33:35 +00:00
httpErr , ok := err . ( * errHTTP )
if ! ok {
httpErr = errHTTPInternalError
}
2022-06-03 00:59:07 +00:00
isNormalError := httpErr . HTTPCode == http . StatusNotFound || httpErr . HTTPCode == http . StatusBadRequest
2022-06-01 20:57:35 +00:00
if isNormalError {
2022-06-02 03:24:44 +00:00
log . Debug ( "%s Connection closed with HTTP %d (ntfy error %d): %s" , logHTTPPrefix ( v , r ) , httpErr . HTTPCode , httpErr . Code , err . Error ( ) )
2022-06-01 20:57:35 +00:00
} else {
2022-06-02 03:24:44 +00:00
log . Info ( "%s Connection closed with HTTP %d (ntfy error %d): %s" , logHTTPPrefix ( v , r ) , httpErr . HTTPCode , httpErr . Code , err . Error ( ) )
2022-06-01 20:57:35 +00:00
}
2021-12-25 14:15:05 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
2022-01-16 03:33:35 +00:00
w . WriteHeader ( httpErr . HTTPCode )
io . WriteString ( w , httpErr . JSON ( ) + "\n" )
2021-10-23 01:26:01 +00:00
}
}
2022-02-14 21:09:59 +00:00
func ( s * Server ) handleInternal ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-05-13 18:42:25 +00:00
if r . Method == http . MethodGet && r . URL . Path == "/" {
return s . ensureWebEnabled ( s . handleHome ) ( w , r , v )
2021-11-05 17:46:27 +00:00
} else if r . Method == http . MethodHead && r . URL . Path == "/" {
2022-05-13 18:42:25 +00:00
return s . ensureWebEnabled ( s . handleEmpty ) ( w , r , v )
2022-12-24 17:22:54 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == healthPath {
return s . handleHealth ( w , r , v )
2022-05-13 18:42:25 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == webConfigPath {
return s . ensureWebEnabled ( s . handleWebConfig ) ( w , r , v )
2022-12-15 04:11:22 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == accountPath {
2022-12-16 03:07:04 +00:00
return s . handleAccountCreate ( w , r , v )
2022-12-17 20:17:52 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == accountPath {
return s . handleAccountGet ( w , r , v )
2022-12-16 03:07:04 +00:00
} else if r . Method == http . MethodDelete && r . URL . Path == accountPath {
return s . handleAccountDelete ( w , r , v )
} else if r . Method == http . MethodPost && r . URL . Path == accountPasswordPath {
return s . handleAccountPasswordChange ( w , r , v )
2022-12-25 16:41:38 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == accountTokenPath {
return s . handleAccountTokenIssue ( w , r , v )
} else if r . Method == http . MethodPatch && r . URL . Path == accountTokenPath {
return s . handleAccountTokenExtend ( w , r , v )
2022-12-15 04:11:22 +00:00
} else if r . Method == http . MethodDelete && r . URL . Path == accountTokenPath {
return s . handleAccountTokenDelete ( w , r , v )
2022-12-25 16:41:38 +00:00
} else if r . Method == http . MethodPatch && r . URL . Path == accountSettingsPath {
2022-12-16 03:07:04 +00:00
return s . handleAccountSettingsChange ( w , r , v )
2022-12-15 04:11:22 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == accountSubscriptionPath {
return s . handleAccountSubscriptionAdd ( w , r , v )
} else if r . Method == http . MethodDelete && accountSubscriptionSingleRegex . MatchString ( r . URL . Path ) {
return s . handleAccountSubscriptionDelete ( w , r , v )
2022-06-15 20:03:12 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == matrixPushPath {
return s . handleMatrixDiscovery ( w )
2022-05-13 18:42:25 +00:00
} else if r . Method == http . MethodGet && staticRegex . MatchString ( r . URL . Path ) {
return s . ensureWebEnabled ( s . handleStatic ) ( w , r , v )
} else if r . Method == http . MethodGet && docsRegex . MatchString ( r . URL . Path ) {
return s . ensureWebEnabled ( s . handleDocs ) ( w , r , v )
2022-06-11 01:33:39 +00:00
} else if ( r . Method == http . MethodGet || r . Method == http . MethodHead ) && fileRegex . MatchString ( r . URL . Path ) && s . config . AttachmentCacheDir != "" {
2022-01-22 03:22:27 +00:00
return s . limitRequests ( s . handleFile ) ( w , r , v )
2021-11-05 17:46:27 +00:00
} else if r . Method == http . MethodOptions {
2022-05-13 18:42:25 +00:00
return s . ensureWebEnabled ( s . handleOptions ) ( w , r , v )
2022-03-16 18:16:54 +00:00
} else if ( r . Method == http . MethodPut || r . Method == http . MethodPost ) && r . URL . Path == "/" {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . transformBodyJSON ( s . authorizeTopicWrite ( s . handlePublish ) ) ) ( w , r , v )
2022-06-14 02:07:30 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == matrixPushPath {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . transformMatrixJSON ( s . authorizeTopicWrite ( s . handlePublishMatrix ) ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if ( r . Method == http . MethodPut || r . Method == http . MethodPost ) && topicPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicWrite ( s . handlePublish ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if r . Method == http . MethodGet && publishPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicWrite ( s . handlePublish ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if r . Method == http . MethodGet && jsonPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleSubscribeJSON ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if r . Method == http . MethodGet && ssePathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleSubscribeSSE ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if r . Method == http . MethodGet && rawPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleSubscribeRaw ) ) ( w , r , v )
2022-01-15 18:23:35 +00:00
} else if r . Method == http . MethodGet && wsPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleSubscribeWS ) ) ( w , r , v )
2022-01-26 04:04:09 +00:00
} else if r . Method == http . MethodGet && authPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleTopicAuth ) ) ( w , r , v )
2022-03-23 18:29:55 +00:00
} else if r . Method == http . MethodGet && ( topicPathRegex . MatchString ( r . URL . Path ) || externalTopicPathRegex . MatchString ( r . URL . Path ) ) {
2022-05-13 18:42:25 +00:00
return s . ensureWebEnabled ( s . handleTopic ) ( w , r , v )
2021-10-23 01:26:01 +00:00
}
2021-10-24 02:49:50 +00:00
return errHTTPNotFound
2021-10-23 01:26:01 +00:00
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleHome ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-03-06 02:28:25 +00:00
if s . config . WebRootIsApp {
r . URL . Path = webAppIndex
} else {
r . URL . Path = webHomeIndex
}
2022-05-13 18:42:25 +00:00
return s . handleStatic ( w , r , v )
2021-10-23 01:26:01 +00:00
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleTopic ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-01-16 04:17:46 +00:00
unifiedpush := readBoolParam ( r , false , "x-unifiedpush" , "unifiedpush" , "up" ) // see PUT/POST too!
2021-12-25 21:07:55 +00:00
if unifiedpush {
w . Header ( ) . Set ( "Content-Type" , "application/json" )
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
_ , err := io . WriteString ( w , ` { "unifiedpush": { "version":1}} ` + "\n" )
return err
}
2022-03-06 01:24:10 +00:00
r . URL . Path = webAppIndex
2022-05-13 18:42:25 +00:00
return s . handleStatic ( w , r , v )
2021-12-25 21:07:55 +00:00
}
2022-01-26 04:04:09 +00:00
func ( s * Server ) handleEmpty ( _ http . ResponseWriter , _ * http . Request , _ * visitor ) error {
2021-11-05 17:46:27 +00:00
return nil
}
2022-01-26 04:04:09 +00:00
// handleTopicAuth writes a fixed JSON success document. It performs no checks of
// its own; in handleInternal it is wrapped in authorizeTopicRead, so reaching it
// means that wrapper let the request through.
func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
	header := w.Header()
	header.Set("Content-Type", "application/json")
	header.Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
	_, err := io.WriteString(w, ` { "success":true} `+"\n")
	return err
}
2022-12-24 17:22:54 +00:00
// handleHealth writes a JSON health document with Healthy set to true.
//
// The response is sent as "application/json", the registered media type for
// JSON and the one used by every other JSON response in this file.
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
	response := &apiHealthResponse{
		Healthy: true,
	}
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
	return json.NewEncoder(w).Encode(response)
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleWebConfig ( w http . ResponseWriter , _ * http . Request , _ * visitor ) error {
2022-03-10 04:28:55 +00:00
appRoot := "/"
if ! s . config . WebRootIsApp {
appRoot = "/app"
}
disallowedTopicsStr := ` " ` + strings . Join ( disallowedTopics , ` ", " ` ) + ` " `
2022-03-11 20:56:54 +00:00
w . Header ( ) . Set ( "Content-Type" , "text/javascript" )
2022-03-10 04:28:55 +00:00
_ , err := io . WriteString ( w , fmt . Sprintf ( ` // Generated server configuration
2022-03-11 20:56:54 +00:00
var config = {
2022-12-21 18:19:07 +00:00
baseUrl : window . location . origin ,
2022-03-10 04:28:55 +00:00
appRoot : "%s" ,
2022-12-21 18:19:07 +00:00
enableLogin : % t ,
enableSignup : % t ,
enableResetPassword : % t ,
disallowedTopics : [ % s ] ,
} ; ` , appRoot , s . config . EnableLogin , s . config . EnableSignup , s . config . EnableResetPassword , disallowedTopicsStr ) )
2022-03-10 04:28:55 +00:00
return err
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleStatic ( w http . ResponseWriter , r * http . Request , _ * visitor ) error {
2022-03-06 01:24:10 +00:00
r . URL . Path = webSiteDir + r . URL . Path
2022-03-11 02:55:56 +00:00
util . Gzip ( http . FileServer ( http . FS ( webFsCached ) ) ) . ServeHTTP ( w , r )
2021-10-29 17:58:14 +00:00
return nil
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleDocs ( w http . ResponseWriter , r * http . Request , _ * visitor ) error {
2022-03-11 02:55:56 +00:00
util . Gzip ( http . FileServer ( http . FS ( docsStaticCached ) ) ) . ServeHTTP ( w , r )
2021-12-02 22:27:31 +00:00
return nil
}
2022-01-12 23:52:07 +00:00
func ( s * Server ) handleFile ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-01-02 22:56:12 +00:00
if s . config . AttachmentCacheDir == "" {
2022-01-03 23:55:08 +00:00
return errHTTPInternalError
2022-01-02 22:56:12 +00:00
}
matches := fileRegex . FindStringSubmatch ( r . URL . Path )
if len ( matches ) != 2 {
return errHTTPInternalErrorInvalidFilePath
}
messageID := matches [ 1 ]
file := filepath . Join ( s . config . AttachmentCacheDir , messageID )
stat , err := os . Stat ( file )
if err != nil {
return errHTTPNotFound
}
2022-06-11 01:33:39 +00:00
if r . Method == http . MethodGet {
if err := v . BandwidthLimiter ( ) . Allow ( stat . Size ( ) ) ; err != nil {
return errHTTPTooManyRequestsAttachmentBandwidthLimit
}
2022-01-12 23:52:07 +00:00
}
2022-01-12 16:05:04 +00:00
w . Header ( ) . Set ( "Content-Length" , fmt . Sprintf ( "%d" , stat . Size ( ) ) )
2022-03-25 21:17:24 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
2022-06-11 01:33:39 +00:00
if r . Method == http . MethodGet {
f , err := os . Open ( file )
if err != nil {
return err
}
defer f . Close ( )
_ , err = io . Copy ( util . NewContentTypeWriter ( w , r . URL . Path ) , f )
2022-01-02 22:56:12 +00:00
return err
}
2022-06-11 01:33:39 +00:00
return nil
2022-01-02 22:56:12 +00:00
}
2022-06-15 20:03:12 +00:00
func ( s * Server ) handleMatrixDiscovery ( w http . ResponseWriter ) error {
2022-06-20 01:25:35 +00:00
if s . config . BaseURL == "" {
return errHTTPInternalErrorMissingBaseURL
}
2022-06-16 15:40:56 +00:00
return writeMatrixDiscoveryResponse ( w )
2022-06-15 20:03:12 +00:00
}
2022-06-15 00:43:17 +00:00
func ( s * Server ) handlePublishWithoutResponse ( r * http . Request , v * visitor ) ( * message , error ) {
2021-12-15 14:41:55 +00:00
t , err := s . topicFromPath ( r . URL . Path )
2021-11-01 20:39:40 +00:00
if err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-11-01 20:39:40 +00:00
}
2022-04-03 16:39:52 +00:00
body , err := util . Peek ( r . Body , s . config . MessageLimit )
2021-10-23 01:26:01 +00:00
if err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-10-23 01:26:01 +00:00
}
2022-01-02 22:56:12 +00:00
m := newDefaultMessage ( t . ID , "" )
2022-01-17 18:28:07 +00:00
cache , firebase , email , unifiedpush , err := s . parsePublishParams ( r , v , m )
2021-12-10 16:31:42 +00:00
if err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-10-29 03:50:38 +00:00
}
2022-05-27 11:55:57 +00:00
if m . PollID != "" {
m = newPollRequestMessage ( t . ID , m . PollID )
}
2022-12-20 02:42:36 +00:00
if v . user != nil {
m . User = v . user . Name
}
2022-01-17 18:28:07 +00:00
if err := s . handlePublishBody ( r , v , m , body , unifiedpush ) ; err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-12-23 23:03:04 +00:00
}
2021-12-15 14:41:55 +00:00
if m . Message == "" {
2021-12-23 23:03:04 +00:00
m . Message = emptyMessageBody
2021-12-15 14:41:55 +00:00
}
2021-12-10 16:31:42 +00:00
delayed := m . Time > time . Now ( ) . Unix ( )
2022-12-20 02:42:36 +00:00
log . Debug ( "%s Received message: event=%s, user=%s, body=%d byte(s), delayed=%t, firebase=%t, cache=%t, up=%t, email=%s" ,
logMessagePrefix ( v , m ) , m . Event , m . User , len ( m . Message ) , delayed , firebase , cache , unifiedpush , email )
2022-06-02 03:24:44 +00:00
if log . IsTrace ( ) {
2022-06-02 15:59:22 +00:00
log . Trace ( "%s Message body: %s" , logMessagePrefix ( v , m ) , util . MaybeMarshalJSON ( m ) )
2022-06-02 03:24:44 +00:00
}
2021-12-10 16:31:42 +00:00
if ! delayed {
2022-06-01 00:38:56 +00:00
if err := t . Publish ( v , m ) ; err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-12-10 16:31:42 +00:00
}
2022-06-01 20:57:35 +00:00
if s . firebaseClient != nil && firebase {
go s . sendToFirebase ( v , m )
}
2022-06-02 03:24:44 +00:00
if s . smtpSender != nil && email != "" {
2022-12-19 14:59:32 +00:00
v . IncrEmails ( )
2022-06-01 20:57:35 +00:00
go s . sendEmail ( v , m , email )
}
if s . config . UpstreamBaseURL != "" {
go s . forwardPollRequest ( v , m )
}
} else {
2022-06-02 03:24:44 +00:00
log . Debug ( "%s Message delayed, will process later" , logMessagePrefix ( v , m ) )
2022-05-27 11:55:57 +00:00
}
2021-12-09 15:23:17 +00:00
if cache {
2022-11-16 15:28:20 +00:00
log . Debug ( "%s Adding message to cache" , logMessagePrefix ( v , m ) )
if err := s . messageCache . AddMessage ( m ) ; err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2022-11-16 15:28:20 +00:00
}
2021-11-02 18:08:21 +00:00
}
2022-12-19 14:59:32 +00:00
v . IncrMessages ( )
2022-12-21 02:18:33 +00:00
if v . user != nil {
2022-12-25 16:41:38 +00:00
s . userManager . EnqueueStats ( v . user )
2022-12-21 02:18:33 +00:00
}
2022-06-15 00:43:17 +00:00
s . mu . Lock ( )
s . messages ++
s . mu . Unlock ( )
return m , nil
}
func ( s * Server ) handlePublish ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
m , err := s . handlePublishWithoutResponse ( r , v )
if err != nil {
return err
}
2021-12-15 21:12:40 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2021-10-24 17:34:15 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
2021-11-03 15:33:34 +00:00
if err := json . NewEncoder ( w ) . Encode ( m ) ; err != nil {
return err
}
2022-06-15 00:43:17 +00:00
return nil
}
func ( s * Server ) handlePublishMatrix ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
_ , err := s . handlePublishWithoutResponse ( r , v )
if err != nil {
2022-06-16 00:36:49 +00:00
return & errMatrix { pushKey : r . Header . Get ( matrixPushKeyHeader ) , err : err }
2022-06-15 00:43:17 +00:00
}
2022-06-15 20:03:12 +00:00
return writeMatrixSuccess ( w )
2021-10-23 01:26:01 +00:00
}
2022-05-28 00:30:20 +00:00
func ( s * Server ) sendToFirebase ( v * visitor , m * message ) {
2022-06-02 03:24:44 +00:00
log . Debug ( "%s Publishing to Firebase" , logMessagePrefix ( v , m ) )
2022-06-01 03:16:44 +00:00
if err := s . firebaseClient . Send ( v , m ) ; err != nil {
2022-06-03 00:59:07 +00:00
if err == errFirebaseTemporarilyBanned {
log . Debug ( "%s Unable to publish to Firebase: %v" , logMessagePrefix ( v , m ) , err . Error ( ) )
} else {
log . Warn ( "%s Unable to publish to Firebase: %v" , logMessagePrefix ( v , m ) , err . Error ( ) )
}
2022-05-28 00:30:20 +00:00
}
}
func ( s * Server ) sendEmail ( v * visitor , m * message , email string ) {
2022-06-02 03:24:44 +00:00
log . Debug ( "%s Sending email to %s" , logMessagePrefix ( v , m ) , email )
if err := s . smtpSender . Send ( v , m , email ) ; err != nil {
2022-06-02 14:50:05 +00:00
log . Warn ( "%s Unable to send email to %s: %v" , logMessagePrefix ( v , m ) , email , err . Error ( ) )
2022-05-28 00:30:20 +00:00
}
}
func ( s * Server ) forwardPollRequest ( v * visitor , m * message ) {
topicURL := fmt . Sprintf ( "%s/%s" , s . config . BaseURL , m . Topic )
topicHash := fmt . Sprintf ( "%x" , sha256 . Sum256 ( [ ] byte ( topicURL ) ) )
forwardURL := fmt . Sprintf ( "%s/%s" , s . config . UpstreamBaseURL , topicHash )
2022-06-02 03:24:44 +00:00
log . Debug ( "%s Publishing poll request to %s" , logMessagePrefix ( v , m ) , forwardURL )
2022-05-28 00:30:20 +00:00
req , err := http . NewRequest ( "POST" , forwardURL , strings . NewReader ( "" ) )
if err != nil {
2022-06-02 03:24:44 +00:00
log . Warn ( "%s Unable to publish poll request: %v" , logMessagePrefix ( v , m ) , err . Error ( ) )
2022-05-28 00:30:20 +00:00
return
}
req . Header . Set ( "X-Poll-ID" , m . ID )
2022-06-01 01:39:19 +00:00
var httpClient = & http . Client {
Timeout : time . Second * 10 ,
}
response , err := httpClient . Do ( req )
2022-05-28 00:30:20 +00:00
if err != nil {
2022-06-02 03:24:44 +00:00
log . Warn ( "%s Unable to publish poll request: %v" , logMessagePrefix ( v , m ) , err . Error ( ) )
2022-05-28 00:30:20 +00:00
return
} else if response . StatusCode != http . StatusOK {
2022-06-02 03:24:44 +00:00
log . Warn ( "%s Unable to publish poll request, unexpected HTTP status: %d" , logMessagePrefix ( v , m ) , response . StatusCode )
2022-05-28 00:30:20 +00:00
return
}
}
2022-01-17 18:28:07 +00:00
func ( s * Server ) parsePublishParams ( r * http . Request , v * visitor , m * message ) ( cache bool , firebase bool , email string , unifiedpush bool , err error ) {
2022-01-16 04:17:46 +00:00
cache = readBoolParam ( r , true , "x-cache" , "cache" )
firebase = readBoolParam ( r , true , "x-firebase" , "firebase" )
2021-12-22 08:44:16 +00:00
m . Title = readParam ( r , "x-title" , "title" , "t" )
2022-01-04 22:40:41 +00:00
m . Click = readParam ( r , "x-click" , "click" )
2022-07-17 21:47:21 +00:00
icon := readParam ( r , "x-icon" , "icon" )
2022-01-08 20:47:08 +00:00
filename := readParam ( r , "x-filename" , "filename" , "file" , "f" )
2022-01-14 17:13:14 +00:00
attach := readParam ( r , "x-attach" , "attach" , "a" )
2022-01-08 20:47:08 +00:00
if attach != "" || filename != "" {
m . Attachment = & attachment { }
}
2022-01-14 17:13:14 +00:00
if filename != "" {
m . Attachment . Name = filename
}
2022-01-08 20:47:08 +00:00
if attach != "" {
2022-07-17 21:47:21 +00:00
if ! urlRegex . MatchString ( attach ) {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestAttachmentURLInvalid
2022-01-08 20:47:08 +00:00
}
m . Attachment . URL = attach
2022-01-14 17:13:14 +00:00
if m . Attachment . Name == "" {
u , err := url . Parse ( m . Attachment . URL )
if err == nil {
m . Attachment . Name = path . Base ( u . Path )
if m . Attachment . Name == "." || m . Attachment . Name == "/" {
m . Attachment . Name = ""
}
}
}
if m . Attachment . Name == "" {
m . Attachment . Name = "attachment"
}
2022-01-08 20:47:08 +00:00
}
2022-07-17 21:47:21 +00:00
if icon != "" {
if ! urlRegex . MatchString ( icon ) {
return false , false , "" , false , errHTTPBadRequestIconURLInvalid
}
m . Icon = icon
}
2022-01-08 20:47:08 +00:00
email = readParam ( r , "x-email" , "x-e-mail" , "email" , "e-mail" , "mail" , "e" )
if email != "" {
if err := v . EmailAllowed ( ) ; err != nil {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPTooManyRequestsLimitEmails
2022-01-08 20:47:08 +00:00
}
}
2022-06-02 03:24:44 +00:00
if s . smtpSender == nil && email != "" {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestEmailDisabled
2022-01-08 20:47:08 +00:00
}
2022-02-13 14:23:55 +00:00
messageStr := strings . ReplaceAll ( readParam ( r , "x-message" , "message" , "m" ) , "\\n" , "\n" )
2021-12-15 14:41:55 +00:00
if messageStr != "" {
m . Message = messageStr
}
2021-12-17 01:33:01 +00:00
m . Priority , err = util . ParsePriority ( readParam ( r , "x-priority" , "priority" , "prio" , "p" ) )
if err != nil {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestPriorityInvalid
2021-11-27 21:12:08 +00:00
}
2021-12-22 08:44:16 +00:00
tagsStr := readParam ( r , "x-tags" , "tags" , "tag" , "ta" )
2021-11-27 21:12:08 +00:00
if tagsStr != "" {
2021-12-10 16:31:42 +00:00
m . Tags = make ( [ ] string , 0 )
2021-12-21 20:22:27 +00:00
for _ , s := range util . SplitNoEmpty ( tagsStr , "," ) {
2021-12-10 16:31:42 +00:00
m . Tags = append ( m . Tags , strings . TrimSpace ( s ) )
2021-12-07 20:39:42 +00:00
}
2021-11-27 21:12:08 +00:00
}
2021-12-15 14:41:55 +00:00
delayStr := readParam ( r , "x-delay" , "delay" , "x-at" , "at" , "x-in" , "in" )
2021-12-11 05:06:25 +00:00
if delayStr != "" {
2021-12-10 16:31:42 +00:00
if ! cache {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayNoCache
2021-12-10 16:31:42 +00:00
}
2021-12-23 23:03:04 +00:00
if email != "" {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
2021-12-23 23:03:04 +00:00
}
2021-12-11 05:06:25 +00:00
delay , err := util . ParseFutureTime ( delayStr , time . Now ( ) )
2021-12-10 16:31:42 +00:00
if err != nil {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayCannotParse
2021-12-11 05:06:25 +00:00
} else if delay . Unix ( ) < time . Now ( ) . Add ( s . config . MinDelay ) . Unix ( ) {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayTooSmall
2021-12-11 05:06:25 +00:00
} else if delay . Unix ( ) > time . Now ( ) . Add ( s . config . MaxDelay ) . Unix ( ) {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayTooLarge
2021-12-10 16:31:42 +00:00
}
2021-12-11 05:06:25 +00:00
m . Time = delay . Unix ( )
2022-10-07 21:16:20 +00:00
m . Sender = v . ip // Important for rate limiting
2021-12-10 16:31:42 +00:00
}
2022-04-16 20:17:58 +00:00
actionsStr := readParam ( r , "x-actions" , "actions" , "action" )
if actionsStr != "" {
2022-04-19 13:14:32 +00:00
m . Actions , err = parseActions ( actionsStr )
if err != nil {
2022-04-27 13:51:23 +00:00
return false , false , "" , false , wrapErrHTTP ( errHTTPBadRequestActionsInvalid , err . Error ( ) )
2022-04-17 18:29:43 +00:00
}
2022-04-16 20:17:58 +00:00
}
2022-01-17 18:28:07 +00:00
unifiedpush = readBoolParam ( r , false , "x-unifiedpush" , "unifiedpush" , "up" ) // see GET too!
2021-12-25 21:07:55 +00:00
if unifiedpush {
firebase = false
2022-01-17 18:28:07 +00:00
unifiedpush = true
2021-12-25 21:07:55 +00:00
}
2022-05-29 02:06:46 +00:00
m . PollID = readParam ( r , "x-poll-id" , "poll-id" )
2022-05-27 11:55:57 +00:00
if m . PollID != "" {
unifiedpush = false
cache = false
email = ""
}
2022-01-17 18:28:07 +00:00
return cache , firebase , email , unifiedpush , nil
2021-11-27 21:12:08 +00:00
}
2022-01-08 20:47:08 +00:00
// handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
//
2022-09-27 16:37:02 +00:00
// 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
// If a message is flagged as poll request, the body does not matter and is discarded
// 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
// If body is binary, encode as base64, if not do not encode
// 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
// Body must be a message, because we attached an external URL
// 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
// Body must be attachment, because we passed a filename
// 5. curl -T file.txt ntfy.sh/mytopic
// If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
// 6. curl -T file.txt ntfy.sh/mytopic
// If file.txt is > message limit, treat it as an attachment
2022-04-03 16:39:52 +00:00
func ( s * Server ) handlePublishBody ( r * http . Request , v * visitor , m * message , body * util . PeekedReadCloser , unifiedpush bool ) error {
2022-05-28 00:30:20 +00:00
if m . Event == pollRequestEvent { // Case 1
2022-06-02 03:24:44 +00:00
return s . handleBodyDiscard ( body )
2022-05-27 11:55:57 +00:00
} else if unifiedpush {
2022-05-28 00:30:20 +00:00
return s . handleBodyAsMessageAutoDetect ( m , body ) // Case 2
2022-01-17 18:28:07 +00:00
} else if m . Attachment != nil && m . Attachment . URL != "" {
2022-05-28 00:30:20 +00:00
return s . handleBodyAsTextMessage ( m , body ) // Case 3
2022-01-08 20:47:08 +00:00
} else if m . Attachment != nil && m . Attachment . Name != "" {
2022-05-28 00:30:20 +00:00
return s . handleBodyAsAttachment ( r , v , m , body ) // Case 4
2022-04-03 16:39:52 +00:00
} else if ! body . LimitReached && utf8 . Valid ( body . PeekedBytes ) {
2022-05-28 00:30:20 +00:00
return s . handleBodyAsTextMessage ( m , body ) // Case 5
2022-01-08 20:47:08 +00:00
}
2022-05-28 00:30:20 +00:00
return s . handleBodyAsAttachment ( r , v , m , body ) // Case 6
2022-01-17 18:28:07 +00:00
}
2022-06-02 03:24:44 +00:00
// handleBodyDiscard reads the body to the end without keeping any of it and then closes it.
// Only a read error is reported; an error from closing the body is deliberately ignored.
func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
	defer func() {
		_ = body.Close() // Nothing useful can be done if closing a discarded body fails
	}()
	_, copyErr := io.Copy(io.Discard, body)
	return copyErr
}
2022-04-03 16:39:52 +00:00
func ( s * Server ) handleBodyAsMessageAutoDetect ( m * message , body * util . PeekedReadCloser ) error {
if utf8 . Valid ( body . PeekedBytes ) {
m . Message = string ( body . PeekedBytes ) // Do not trim
2022-01-17 18:28:07 +00:00
} else {
2022-04-03 16:39:52 +00:00
m . Message = base64 . StdEncoding . EncodeToString ( body . PeekedBytes )
2022-01-17 18:28:07 +00:00
m . Encoding = encodingBase64
}
return nil
2022-01-08 20:47:08 +00:00
}
2022-04-03 16:39:52 +00:00
func ( s * Server ) handleBodyAsTextMessage ( m * message , body * util . PeekedReadCloser ) error {
if ! utf8 . Valid ( body . PeekedBytes ) {
2022-01-08 20:47:08 +00:00
return errHTTPBadRequestMessageNotUTF8
}
2022-04-03 16:39:52 +00:00
if len ( body . PeekedBytes ) > 0 { // Empty body should not override message (publish via GET!)
m . Message = strings . TrimSpace ( string ( body . PeekedBytes ) ) // Truncates the message to the peek limit if required
2022-01-08 20:47:08 +00:00
}
2022-01-10 18:38:51 +00:00
if m . Attachment != nil && m . Attachment . Name != "" && m . Message == "" {
m . Message = fmt . Sprintf ( defaultAttachmentMessage , m . Attachment . Name )
}
2022-01-08 20:47:08 +00:00
return nil
}
2022-04-03 16:39:52 +00:00
func ( s * Server ) handleBodyAsAttachment ( r * http . Request , v * visitor , m * message , body * util . PeekedReadCloser ) error {
2022-01-12 16:05:04 +00:00
if s . fileCache == nil || s . config . BaseURL == "" || s . config . AttachmentCacheDir == "" {
2022-01-08 20:47:08 +00:00
return errHTTPBadRequestAttachmentsDisallowed
} else if m . Time > time . Now ( ) . Add ( s . config . AttachmentExpiryDuration ) . Unix ( ) {
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery
}
2022-12-21 02:18:33 +00:00
stats , err := v . Stats ( )
2022-01-11 17:58:11 +00:00
if err != nil {
return err
}
contentLengthStr := r . Header . Get ( "Content-Length" )
if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
contentLength , err := strconv . ParseInt ( contentLengthStr , 10 , 64 )
2022-12-21 02:18:33 +00:00
if err == nil && ( contentLength > stats . AttachmentTotalSizeRemaining || contentLength > stats . AttachmentFileSizeLimit ) {
2022-04-02 21:06:26 +00:00
return errHTTPEntityTooLargeAttachmentTooLarge
2022-01-11 17:58:11 +00:00
}
}
2022-01-08 20:47:08 +00:00
if m . Attachment == nil {
m . Attachment = & attachment { }
}
2022-01-10 18:38:51 +00:00
var ext string
2022-06-01 01:39:19 +00:00
m . Sender = v . ip // Important for attachment rate limiting
2022-01-08 20:47:08 +00:00
m . Attachment . Expires = time . Now ( ) . Add ( s . config . AttachmentExpiryDuration ) . Unix ( )
2022-04-03 16:39:52 +00:00
m . Attachment . Type , ext = util . DetectContentType ( body . PeekedBytes , m . Attachment . Name )
2022-01-08 20:47:08 +00:00
m . Attachment . URL = fmt . Sprintf ( "%s/file/%s%s" , s . config . BaseURL , m . ID , ext )
if m . Attachment . Name == "" {
m . Attachment . Name = fmt . Sprintf ( "attachment%s" , ext )
}
if m . Message == "" {
2022-01-10 18:38:51 +00:00
m . Message = fmt . Sprintf ( defaultAttachmentMessage , m . Attachment . Name )
2022-01-03 23:55:08 +00:00
}
2022-12-21 18:19:07 +00:00
limiters := [ ] util . Limiter {
v . BandwidthLimiter ( ) ,
util . NewFixedLimiter ( stats . AttachmentFileSizeLimit ) ,
util . NewFixedLimiter ( stats . AttachmentTotalSizeRemaining ) ,
}
m . Attachment . Size , err = s . fileCache . Write ( m . ID , body , limiters ... )
2022-01-07 13:49:28 +00:00
if err == util . ErrLimitReached {
2022-04-02 21:06:26 +00:00
return errHTTPEntityTooLargeAttachmentTooLarge
2022-01-07 13:49:28 +00:00
} else if err != nil {
2022-01-02 22:56:12 +00:00
return err
}
return nil
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeJSON ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
var buf bytes . Buffer
if err := json . NewEncoder ( & buf ) . Encode ( & msg ) ; err != nil {
return "" , err
2021-10-23 01:26:01 +00:00
}
2021-10-27 18:56:17 +00:00
return buf . String ( ) , nil
2021-10-23 01:26:01 +00:00
}
2022-01-16 04:17:46 +00:00
return s . handleSubscribeHTTP ( w , r , v , "application/x-ndjson" , encoder )
2021-10-23 01:26:01 +00:00
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeSSE ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
2021-10-23 17:21:33 +00:00
var buf bytes . Buffer
if err := json . NewEncoder ( & buf ) . Encode ( & msg ) ; err != nil {
2021-10-27 18:56:17 +00:00
return "" , err
2021-10-23 17:21:33 +00:00
}
2021-10-29 12:29:27 +00:00
if msg . Event != messageEvent {
2021-10-27 18:56:17 +00:00
return fmt . Sprintf ( "event: %s\ndata: %s\n" , msg . Event , buf . String ( ) ) , nil // Browser's .onmessage() does not fire on this!
2021-10-23 17:21:33 +00:00
}
2021-10-27 18:56:17 +00:00
return fmt . Sprintf ( "data: %s\n" , buf . String ( ) ) , nil
2021-10-23 19:22:17 +00:00
}
2022-01-16 04:17:46 +00:00
return s . handleSubscribeHTTP ( w , r , v , "text/event-stream" , encoder )
2021-10-23 17:21:33 +00:00
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeRaw ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
2021-11-02 18:10:56 +00:00
if msg . Event == messageEvent { // only handle default events
2021-10-27 18:56:17 +00:00
return strings . ReplaceAll ( msg . Message , "\n" , " " ) + "\n" , nil
}
return "\n" , nil // "keepalive" and "open" events just send an empty line
}
2022-01-16 04:17:46 +00:00
return s . handleSubscribeHTTP ( w , r , v , "text/plain" , encoder )
2021-10-27 18:56:17 +00:00
}
2022-01-16 04:17:46 +00:00
func ( s * Server ) handleSubscribeHTTP ( w http . ResponseWriter , r * http . Request , v * visitor , contentType string , encoder messageEncoder ) error {
2022-06-02 03:24:44 +00:00
log . Debug ( "%s HTTP stream connection opened" , logHTTPPrefix ( v , r ) )
defer log . Debug ( "%s HTTP stream connection closed" , logHTTPPrefix ( v , r ) )
2021-12-25 14:15:05 +00:00
if err := v . SubscriptionAllowed ( ) ; err != nil {
return errHTTPTooManyRequestsLimitSubscriptions
2021-11-01 19:21:38 +00:00
}
defer v . RemoveSubscription ( )
2022-01-16 04:17:46 +00:00
topics , topicsStr , err := s . topicsFromPath ( r . URL . Path )
2021-11-01 20:39:40 +00:00
if err != nil {
return err
}
2022-01-16 04:17:46 +00:00
poll , since , scheduled , filters , err := parseSubscribeParams ( r )
2021-12-21 20:22:27 +00:00
if err != nil {
return err
}
2021-12-22 08:44:16 +00:00
var wlock sync . Mutex
2022-06-22 17:47:54 +00:00
defer func ( ) {
// Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
// It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
// this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
// data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
wlock . TryLock ( )
} ( )
2022-06-01 00:38:56 +00:00
sub := func ( v * visitor , msg * message ) error {
2022-01-16 04:17:46 +00:00
if ! filters . Pass ( msg ) {
2021-12-21 20:22:27 +00:00
return nil
}
2021-10-27 18:56:17 +00:00
m , err := encoder ( msg )
if err != nil {
return err
}
2021-12-21 20:22:27 +00:00
wlock . Lock ( )
defer wlock . Unlock ( )
2021-10-27 18:56:17 +00:00
if _ , err := w . Write ( [ ] byte ( m ) ) ; err != nil {
2021-10-24 01:29:45 +00:00
return err
}
if fl , ok := w . ( http . Flusher ) ; ok {
fl . Flush ( )
}
return nil
2021-10-27 18:56:17 +00:00
}
2021-11-07 18:08:03 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
w . Header ( ) . Set ( "Content-Type" , contentType + "; charset=utf-8" ) // Android/Volley client needs charset!
2021-10-29 17:58:14 +00:00
if poll {
2022-06-01 00:38:56 +00:00
return s . sendOldMessages ( topics , since , scheduled , v , sub )
2021-10-29 17:58:14 +00:00
}
2021-11-15 12:56:58 +00:00
subscriberIDs := make ( [ ] int , 0 )
for _ , t := range topics {
subscriberIDs = append ( subscriberIDs , t . Subscribe ( sub ) )
}
defer func ( ) {
for i , subscriberID := range subscriberIDs {
topics [ i ] . Unsubscribe ( subscriberID ) // Order!
}
} ( )
2022-06-01 00:38:56 +00:00
if err := sub ( v , newOpenMessage ( topicsStr ) ) ; err != nil { // Send out open message
2021-10-29 17:58:14 +00:00
return err
}
2022-06-01 00:38:56 +00:00
if err := s . sendOldMessages ( topics , since , scheduled , v , sub ) ; err != nil {
2021-10-27 18:56:17 +00:00
return err
}
for {
select {
case <- r . Context ( ) . Done ( ) :
return nil
case <- time . After ( s . config . KeepaliveInterval ) :
2022-06-02 03:24:44 +00:00
log . Trace ( "%s Sending keepalive message" , logHTTPPrefix ( v , r ) )
2021-11-01 19:21:38 +00:00
v . Keepalive ( )
2022-06-01 00:38:56 +00:00
if err := sub ( v , newKeepaliveMessage ( topicsStr ) ) ; err != nil { // Send keepalive message
2021-10-27 18:56:17 +00:00
return err
}
}
2021-10-24 01:29:45 +00:00
}
}
2022-01-15 18:23:35 +00:00
func ( s * Server ) handleSubscribeWS ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-04-29 17:23:04 +00:00
if strings . ToLower ( r . Header . Get ( "Upgrade" ) ) != "websocket" {
2022-01-16 22:54:15 +00:00
return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
}
2022-01-15 18:23:35 +00:00
if err := v . SubscriptionAllowed ( ) ; err != nil {
return errHTTPTooManyRequestsLimitSubscriptions
}
defer v . RemoveSubscription ( )
2022-06-02 03:24:44 +00:00
log . Debug ( "%s WebSocket connection opened" , logHTTPPrefix ( v , r ) )
defer log . Debug ( "%s WebSocket connection closed" , logHTTPPrefix ( v , r ) )
2022-01-16 04:17:46 +00:00
topics , topicsStr , err := s . topicsFromPath ( r . URL . Path )
2022-01-15 18:23:35 +00:00
if err != nil {
return err
}
2022-01-16 04:17:46 +00:00
poll , since , scheduled , filters , err := parseSubscribeParams ( r )
2022-01-15 18:23:35 +00:00
if err != nil {
return err
}
upgrader := & websocket . Upgrader {
ReadBufferSize : wsBufferSize ,
WriteBufferSize : wsBufferSize ,
CheckOrigin : func ( r * http . Request ) bool {
return true // We're open for business!
} ,
}
conn , err := upgrader . Upgrade ( w , r , nil )
if err != nil {
return err
}
defer conn . Close ( )
2022-01-16 05:07:32 +00:00
var wlock sync . Mutex
2022-01-15 18:23:35 +00:00
g , ctx := errgroup . WithContext ( context . Background ( ) )
g . Go ( func ( ) error {
pongWait := s . config . KeepaliveInterval + wsPongWait
conn . SetReadLimit ( wsReadLimit )
if err := conn . SetReadDeadline ( time . Now ( ) . Add ( pongWait ) ) ; err != nil {
return err
}
conn . SetPongHandler ( func ( appData string ) error {
2022-06-02 03:24:44 +00:00
log . Trace ( "%s Received WebSocket pong" , logHTTPPrefix ( v , r ) )
2022-01-15 18:23:35 +00:00
return conn . SetReadDeadline ( time . Now ( ) . Add ( pongWait ) )
} )
for {
_ , _ , err := conn . NextReader ( )
if err != nil {
return err
}
}
} )
g . Go ( func ( ) error {
ping := func ( ) error {
2022-01-16 05:07:32 +00:00
wlock . Lock ( )
defer wlock . Unlock ( )
2022-01-15 18:23:35 +00:00
if err := conn . SetWriteDeadline ( time . Now ( ) . Add ( wsWriteWait ) ) ; err != nil {
return err
}
2022-06-02 03:24:44 +00:00
log . Trace ( "%s Sending WebSocket ping" , logHTTPPrefix ( v , r ) )
2022-01-15 18:23:35 +00:00
return conn . WriteMessage ( websocket . PingMessage , nil )
}
for {
select {
case <- ctx . Done ( ) :
return nil
case <- time . After ( s . config . KeepaliveInterval ) :
v . Keepalive ( )
if err := ping ( ) ; err != nil {
return err
}
}
}
} )
2022-06-01 00:38:56 +00:00
sub := func ( v * visitor , msg * message ) error {
2022-01-16 04:17:46 +00:00
if ! filters . Pass ( msg ) {
2022-01-15 18:23:35 +00:00
return nil
}
2022-01-16 05:07:32 +00:00
wlock . Lock ( )
defer wlock . Unlock ( )
2022-01-15 18:23:35 +00:00
if err := conn . SetWriteDeadline ( time . Now ( ) . Add ( wsWriteWait ) ) ; err != nil {
return err
}
return conn . WriteJSON ( msg )
}
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
if poll {
2022-06-01 00:38:56 +00:00
return s . sendOldMessages ( topics , since , scheduled , v , sub )
2022-01-15 18:23:35 +00:00
}
subscriberIDs := make ( [ ] int , 0 )
for _ , t := range topics {
subscriberIDs = append ( subscriberIDs , t . Subscribe ( sub ) )
}
defer func ( ) {
for i , subscriberID := range subscriberIDs {
topics [ i ] . Unsubscribe ( subscriberID ) // Order!
}
} ( )
2022-06-01 00:38:56 +00:00
if err := sub ( v , newOpenMessage ( topicsStr ) ) ; err != nil { // Send out open message
2022-01-15 18:23:35 +00:00
return err
}
2022-06-01 00:38:56 +00:00
if err := s . sendOldMessages ( topics , since , scheduled , v , sub ) ; err != nil {
2022-01-15 18:23:35 +00:00
return err
}
2022-01-16 03:33:35 +00:00
err = g . Wait ( )
2022-06-02 03:24:44 +00:00
if err != nil && websocket . IsCloseError ( err , websocket . CloseNormalClosure , websocket . CloseGoingAway , websocket . CloseAbnormalClosure ) {
log . Trace ( "%s WebSocket connection closed: %s" , logHTTPPrefix ( v , r ) , err . Error ( ) )
return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
2022-01-16 03:33:35 +00:00
}
return err
2022-01-15 18:23:35 +00:00
}
2022-02-26 20:57:10 +00:00
func parseSubscribeParams ( r * http . Request ) ( poll bool , since sinceMarker , scheduled bool , filters * queryFilter , err error ) {
2022-01-16 04:17:46 +00:00
poll = readBoolParam ( r , false , "x-poll" , "poll" , "po" )
scheduled = readBoolParam ( r , false , "x-scheduled" , "scheduled" , "sched" )
since , err = parseSince ( r , poll )
if err != nil {
return
2021-12-21 20:22:27 +00:00
}
2022-01-16 04:17:46 +00:00
filters , err = parseQueryFilters ( r )
if err != nil {
return
2021-12-21 20:22:27 +00:00
}
2022-01-16 04:17:46 +00:00
return
2021-12-21 20:22:27 +00:00
}
2022-06-20 16:11:52 +00:00
// sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
// marker, returning only messages that are newer than the marker.
2022-06-01 00:38:56 +00:00
func ( s * Server ) sendOldMessages ( topics [ ] * topic , since sinceMarker , scheduled bool , v * visitor , sub subscriber ) error {
2021-11-08 14:46:31 +00:00
if since . IsNone ( ) {
2021-10-29 17:58:14 +00:00
return nil
}
2022-06-20 16:11:52 +00:00
messages := make ( [ ] * message , 0 )
2021-11-15 12:56:58 +00:00
for _ , t := range topics {
2022-06-20 16:11:52 +00:00
topicMessages , err := s . messageCache . Messages ( t . ID , since , scheduled )
2021-11-15 12:56:58 +00:00
if err != nil {
2021-10-29 17:58:14 +00:00
return err
}
2022-06-20 16:11:52 +00:00
messages = append ( messages , topicMessages ... )
}
sort . Slice ( messages , func ( i , j int ) bool {
return messages [ i ] . Time < messages [ j ] . Time
} )
for _ , m := range messages {
if err := sub ( v , m ) ; err != nil {
return err
2021-11-15 12:56:58 +00:00
}
2021-10-29 17:58:14 +00:00
}
2021-10-24 17:34:15 +00:00
return nil
}
2021-11-08 14:46:31 +00:00
// parseSince returns a timestamp identifying the time span from which cached messages should be received.
//
// Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
// "all" for all messages.
2022-02-26 20:57:10 +00:00
func parseSince ( r * http . Request , poll bool ) ( sinceMarker , error ) {
2021-12-22 08:44:16 +00:00
since := readParam ( r , "x-since" , "since" , "si" )
2022-02-26 20:57:10 +00:00
// Easy cases (empty, all, none)
2021-12-22 08:44:16 +00:00
if since == "" {
if poll {
2021-11-08 14:46:31 +00:00
return sinceAllMessages , nil
}
return sinceNoMessages , nil
2022-02-26 20:57:10 +00:00
} else if since == "all" {
2021-11-08 14:46:31 +00:00
return sinceAllMessages , nil
2022-02-26 20:57:10 +00:00
} else if since == "none" {
return sinceNoMessages , nil
}
// ID, timestamp, duration
if validMessageID ( since ) {
return newSinceID ( since ) , nil
2021-12-22 08:44:16 +00:00
} else if s , err := strconv . ParseInt ( since , 10 , 64 ) ; err == nil {
2022-02-26 20:57:10 +00:00
return newSinceTime ( s ) , nil
2021-12-22 08:44:16 +00:00
} else if d , err := time . ParseDuration ( since ) ; err == nil {
2022-02-26 20:57:10 +00:00
return newSinceTime ( time . Now ( ) . Add ( - 1 * d ) . Unix ( ) ) , nil
2021-10-29 17:58:14 +00:00
}
2021-12-25 14:15:05 +00:00
return sinceNoMessages , errHTTPBadRequestSinceInvalid
2021-10-29 17:58:14 +00:00
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleOptions ( w http . ResponseWriter , _ * http . Request , _ * visitor ) error {
2022-12-25 16:41:38 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Methods" , "GET, PUT, POST, PATCH, DELETE" )
2022-03-11 03:58:24 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
w . Header ( ) . Set ( "Access-Control-Allow-Headers" , "*" ) // CORS, allow auth via JS // FIXME is this terrible?
2021-10-24 18:22:53 +00:00
return nil
}
2021-12-15 14:41:55 +00:00
func ( s * Server ) topicFromPath ( path string ) ( * topic , error ) {
parts := strings . Split ( path , "/" )
if len ( parts ) < 2 {
2021-12-25 14:15:05 +00:00
return nil , errHTTPBadRequestTopicInvalid
2021-12-15 14:41:55 +00:00
}
topics , err := s . topicsFromIDs ( parts [ 1 ] )
2021-11-15 12:56:58 +00:00
if err != nil {
return nil , err
}
return topics [ 0 ] , nil
}
2022-01-16 04:17:46 +00:00
// topicsFromPath returns the topics named by the first element of the URL path, which may be a
// comma-separated list of topic IDs, along with that path element as a string. Topics are
// created if necessary. Any failure, including every error reported by topicsFromIDs, is
// returned as errHTTPBadRequestTopicInvalid.
func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
	segments := strings.Split(path, "/")
	if len(segments) < 2 {
		return nil, "", errHTTPBadRequestTopicInvalid
	}
	topicsStr := segments[1]
	topics, err := s.topicsFromIDs(util.SplitNoEmpty(topicsStr, ",")...)
	if err != nil {
		return nil, "", errHTTPBadRequestTopicInvalid
	}
	return topics, topicsStr, nil
}
2021-11-27 21:12:08 +00:00
func ( s * Server ) topicsFromIDs ( ids ... string ) ( [ ] * topic , error ) {
2021-10-23 01:26:01 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2021-11-15 12:56:58 +00:00
topics := make ( [ ] * topic , 0 )
2021-11-27 21:12:08 +00:00
for _ , id := range ids {
2022-10-01 19:50:48 +00:00
if util . Contains ( disallowedTopics , id ) {
2021-12-25 14:15:05 +00:00
return nil , errHTTPBadRequestTopicDisallowed
2021-12-09 03:13:59 +00:00
}
2021-11-15 12:56:58 +00:00
if _ , ok := s . topics [ id ] ; ! ok {
2022-01-02 22:56:12 +00:00
if len ( s . topics ) >= s . config . TotalTopicLimit {
2022-01-12 16:05:04 +00:00
return nil , errHTTPTooManyRequestsLimitTotalTopics
2021-11-15 12:56:58 +00:00
}
2021-12-09 03:57:31 +00:00
s . topics [ id ] = newTopic ( id )
2021-10-29 17:58:14 +00:00
}
2021-11-15 12:56:58 +00:00
topics = append ( topics , s . topics [ id ] )
2021-10-23 01:26:01 +00:00
}
2021-11-15 12:56:58 +00:00
return topics , nil
2021-10-23 01:26:01 +00:00
}
2021-12-11 03:57:01 +00:00
func ( s * Server ) updateStatsAndPrune ( ) {
2022-06-22 00:07:08 +00:00
log . Debug ( "Manager: Starting" )
defer log . Debug ( "Manager: Finished" )
2022-06-21 23:07:27 +00:00
// WARNING: Make sure to only selectively lock with the mutex, and be aware that this
// there is no mutex for the entire function.
2021-10-29 17:58:14 +00:00
// Expire visitors from rate visitors map
2022-06-21 23:07:27 +00:00
s . mu . Lock ( )
2022-06-02 03:24:44 +00:00
staleVisitors := 0
2021-10-29 17:58:14 +00:00
for ip , v := range s . visitors {
2021-11-01 19:21:38 +00:00
if v . Stale ( ) {
2022-06-21 23:45:23 +00:00
log . Trace ( "Deleting stale visitor %s" , v . ip )
2021-10-29 17:58:14 +00:00
delete ( s . visitors , ip )
2022-06-02 03:24:44 +00:00
staleVisitors ++
2021-10-29 17:58:14 +00:00
}
2021-10-23 01:26:01 +00:00
}
2022-06-21 23:07:27 +00:00
s . mu . Unlock ( )
2022-06-02 03:24:44 +00:00
log . Debug ( "Manager: Deleted %d stale visitor(s)" , staleVisitors )
2021-10-24 01:29:45 +00:00
2022-12-25 16:41:38 +00:00
// Delete expired user tokens
if err := s . userManager . RemoveExpiredTokens ( ) ; err != nil {
log . Warn ( "Error expiring user tokens: %s" , err . Error ( ) )
}
2022-01-07 14:15:33 +00:00
// Delete expired attachments
2022-07-08 14:00:04 +00:00
if s . fileCache != nil && s . config . AttachmentExpiryDuration > 0 {
olderThan := time . Now ( ) . Add ( - 1 * s . config . AttachmentExpiryDuration )
ids , err := s . fileCache . Expired ( olderThan )
2022-06-01 20:57:35 +00:00
if err != nil {
log . Warn ( "Error retrieving expired attachments: %s" , err . Error ( ) )
} else if len ( ids ) > 0 {
2022-06-02 03:24:44 +00:00
log . Debug ( "Manager: Deleting expired attachments: %v" , ids )
2022-01-08 17:14:43 +00:00
if err := s . fileCache . Remove ( ids ... ) ; err != nil {
2022-05-30 02:14:14 +00:00
log . Warn ( "Error deleting attachments: %s" , err . Error ( ) )
2022-01-08 17:14:43 +00:00
}
} else {
2022-06-02 03:24:44 +00:00
log . Debug ( "Manager: No expired attachments to delete" )
2022-01-07 14:15:33 +00:00
}
}
2021-12-11 03:57:01 +00:00
// Prune message cache
2021-12-09 03:57:31 +00:00
olderThan := time . Now ( ) . Add ( - 1 * s . config . CacheDuration )
2022-06-02 03:24:44 +00:00
log . Debug ( "Manager: Pruning messages older than %s" , olderThan . Format ( "2006-01-02 15:04:05" ) )
2022-02-27 19:47:28 +00:00
if err := s . messageCache . Prune ( olderThan ) ; err != nil {
2022-06-02 03:24:44 +00:00
log . Warn ( "Manager: Error pruning cache: %s" , err . Error ( ) )
2021-11-02 18:08:21 +00:00
}
2022-06-21 23:07:27 +00:00
// Message count per topic
var messages int
messageCounts , err := s . messageCache . MessageCounts ( )
if err != nil {
log . Warn ( "Manager: Cannot get message counts: %s" , err . Error ( ) )
messageCounts = make ( map [ string ] int ) // Empty, so we can continue
}
for _ , count := range messageCounts {
messages += count
}
2022-06-21 23:45:23 +00:00
// Remove subscriptions without subscribers
2022-06-21 23:07:27 +00:00
s . mu . Lock ( )
var subscribers int
2021-10-29 17:58:14 +00:00
for _ , t := range s . topics {
2022-06-21 23:45:23 +00:00
subs := t . SubscribersCount ( )
2022-06-21 23:07:27 +00:00
msgs , exists := messageCounts [ t . ID ]
if subs == 0 && ( ! exists || msgs == 0 ) {
2022-06-21 23:45:23 +00:00
log . Trace ( "Deleting empty topic %s" , t . ID )
2021-12-09 03:57:31 +00:00
delete ( s . topics , t . ID )
2021-11-03 01:09:49 +00:00
continue
2021-10-29 17:58:14 +00:00
}
subscribers += subs
2021-10-24 01:29:45 +00:00
}
2022-06-21 23:07:27 +00:00
s . mu . Unlock ( )
2021-11-03 01:09:49 +00:00
2021-12-27 21:18:15 +00:00
// Mail stats
2022-06-02 03:24:44 +00:00
var receivedMailTotal , receivedMailSuccess , receivedMailFailure int64
if s . smtpServerBackend != nil {
receivedMailTotal , receivedMailSuccess , receivedMailFailure = s . smtpServerBackend . Counts ( )
}
var sentMailTotal , sentMailSuccess , sentMailFailure int64
if s . smtpSender != nil {
sentMailTotal , sentMailSuccess , sentMailFailure = s . smtpSender . Counts ( )
2021-12-27 21:18:15 +00:00
}
2021-12-27 21:06:40 +00:00
2021-11-03 01:09:49 +00:00
// Print stats
2022-06-22 19:11:50 +00:00
s . mu . Lock ( )
messagesCount , topicsCount , visitorsCount := s . messages , len ( s . topics ) , len ( s . visitors )
s . mu . Unlock ( )
2022-06-02 03:24:44 +00:00
log . Info ( "Stats: %d messages published, %d in cache, %d topic(s) active, %d subscriber(s), %d visitor(s), %d mails received (%d successful, %d failed), %d mails sent (%d successful, %d failed)" ,
2022-06-22 19:11:50 +00:00
messagesCount , messages , topicsCount , subscribers , visitorsCount ,
2022-06-02 03:24:44 +00:00
receivedMailTotal , receivedMailSuccess , receivedMailFailure ,
sentMailTotal , sentMailSuccess , sentMailFailure )
2021-10-24 01:29:45 +00:00
}
2021-10-24 02:49:50 +00:00
2021-12-27 21:06:40 +00:00
func ( s * Server ) runSMTPServer ( ) error {
2022-06-02 03:24:44 +00:00
s . smtpServerBackend = newMailBackend ( s . config , s . handle )
s . smtpServer = smtp . NewServer ( s . smtpServerBackend )
2021-12-27 21:06:40 +00:00
s . smtpServer . Addr = s . config . SMTPServerListen
s . smtpServer . Domain = s . config . SMTPServerDomain
s . smtpServer . ReadTimeout = 10 * time . Second
s . smtpServer . WriteTimeout = 10 * time . Second
2021-12-28 00:26:20 +00:00
s . smtpServer . MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
2021-12-27 21:06:40 +00:00
s . smtpServer . MaxRecipients = 1
s . smtpServer . AllowInsecureAuth = true
return s . smtpServer . ListenAndServe ( )
2021-12-27 14:48:09 +00:00
}
2021-12-15 14:13:16 +00:00
func ( s * Server ) runManager ( ) {
2021-12-22 13:17:50 +00:00
for {
select {
case <- time . After ( s . config . ManagerInterval ) :
2021-12-15 14:13:16 +00:00
s . updateStatsAndPrune ( )
2021-12-22 13:17:50 +00:00
case <- s . closeChan :
return
2021-12-15 14:13:16 +00:00
}
2021-12-22 13:17:50 +00:00
}
2021-12-15 14:13:16 +00:00
}
2022-01-21 19:17:59 +00:00
func ( s * Server ) runFirebaseKeepaliver ( ) {
2022-06-01 03:16:44 +00:00
if s . firebaseClient == nil {
2021-12-15 14:13:16 +00:00
return
}
2022-12-02 20:37:48 +00:00
v := newVisitor ( s . config , s . messageCache , netip . IPv4Unspecified ( ) , nil ) // Background process, not a real visitor, uses IP 0.0.0.0
2021-12-15 14:13:16 +00:00
for {
2021-12-22 13:17:50 +00:00
select {
case <- time . After ( s . config . FirebaseKeepaliveInterval ) :
2022-06-01 03:16:44 +00:00
s . sendToFirebase ( v , newKeepaliveMessage ( firebaseControlTopic ) )
2022-05-26 01:39:46 +00:00
case <- time . After ( s . config . FirebasePollInterval ) :
2022-06-01 03:16:44 +00:00
s . sendToFirebase ( v , newKeepaliveMessage ( firebasePollTopic ) )
2021-12-22 13:17:50 +00:00
case <- s . closeChan :
return
2021-12-15 14:13:16 +00:00
}
}
}
2021-12-22 13:17:50 +00:00
2022-06-02 03:24:44 +00:00
// runDelayedSender calls sendDelayedMessages every time the configured delayed sender
// interval elapses, logging (but otherwise ignoring) any error. It returns once the
// server's close channel is signalled.
func (s *Server) runDelayedSender() {
	for {
		tick := time.After(s.config.DelayedSenderInterval)
		select {
		case <-s.closeChan:
			return
		case <-tick:
			err := s.sendDelayedMessages()
			if err != nil {
				log.Warn("Error sending delayed messages: %s", err.Error())
			}
		}
	}
}
2021-12-10 16:31:42 +00:00
func ( s * Server ) sendDelayedMessages ( ) error {
2022-02-27 19:47:28 +00:00
messages , err := s . messageCache . MessagesDue ( )
2021-12-10 16:31:42 +00:00
if err != nil {
return err
}
for _ , m := range messages {
2022-12-22 02:55:39 +00:00
var v * visitor
if m . User != "" {
2022-12-25 16:41:38 +00:00
user , err := s . userManager . User ( m . User )
2022-12-22 02:55:39 +00:00
if err != nil {
log . Warn ( "%s Error sending delayed message: %s" , logMessagePrefix ( v , m ) , err . Error ( ) )
continue
}
v = s . visitorFromUser ( user , m . Sender )
} else {
v = s . visitorFromIP ( m . Sender )
}
2022-06-01 00:38:56 +00:00
if err := s . sendDelayedMessage ( v , m ) ; err != nil {
2022-06-02 03:24:44 +00:00
log . Warn ( "%s Error sending delayed message: %s" , logMessagePrefix ( v , m ) , err . Error ( ) )
2022-01-10 20:36:12 +00:00
}
2022-06-01 00:38:56 +00:00
}
return nil
}
func ( s * Server ) sendDelayedMessage ( v * visitor , m * message ) error {
2022-06-02 03:24:44 +00:00
log . Debug ( "%s Sending delayed message" , logMessagePrefix ( v , m ) )
2022-06-22 00:07:08 +00:00
s . mu . Lock ( )
2022-06-01 00:38:56 +00:00
t , ok := s . topics [ m . Topic ] // If no subscribers, just mark message as published
2022-06-22 00:07:08 +00:00
s . mu . Unlock ( )
2022-06-01 00:38:56 +00:00
if ok {
2022-06-01 01:39:19 +00:00
go func ( ) {
// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
if err := t . Publish ( v , m ) ; err != nil {
2022-06-02 03:24:44 +00:00
log . Warn ( "%s Unable to publish message: %v" , logMessagePrefix ( v , m ) , err . Error ( ) )
2021-12-10 16:31:42 +00:00
}
2022-06-01 01:39:19 +00:00
} ( )
2022-06-01 00:38:56 +00:00
}
2022-06-01 03:16:44 +00:00
if s . firebaseClient != nil { // Firebase subscribers may not show up in topics map
2022-06-01 01:39:19 +00:00
go s . sendToFirebase ( v , m )
}
if s . config . UpstreamBaseURL != "" {
go s . forwardPollRequest ( v , m )
2021-12-10 16:31:42 +00:00
}
2022-06-01 00:38:56 +00:00
if err := s . messageCache . MarkPublished ( m ) ; err != nil {
return err
2021-12-10 16:31:42 +00:00
}
return nil
}
2022-01-22 03:22:27 +00:00
func ( s * Server ) limitRequests ( next handleFunc ) handleFunc {
return func ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-10-07 21:16:20 +00:00
if util . ContainsIP ( s . config . VisitorRequestExemptIPAddrs , v . ip ) {
2022-02-14 21:09:59 +00:00
return next ( w , r , v )
} else if err := v . RequestAllowed ( ) ; err != nil {
2022-01-22 03:22:27 +00:00
return errHTTPTooManyRequestsLimitRequests
}
return next ( w , r , v )
}
}
2022-05-13 18:42:25 +00:00
// ensureWebEnabled wraps next so that it is only invoked when the web app is enabled in
// the config; otherwise the request fails with errHTTPNotFound.
func (s *Server) ensureWebEnabled(next handleFunc) handleFunc {
	return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
		if s.config.EnableWeb {
			return next(w, r, v)
		}
		return errHTTPNotFound
	}
}
2022-04-03 16:39:52 +00:00
// transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
2022-03-16 18:16:54 +00:00
// before passing it on to the next handler. This is meant to be used in combination with handlePublish.
func ( s * Server ) transformBodyJSON ( next handleFunc ) handleFunc {
2022-03-15 20:00:59 +00:00
return func ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-04-03 16:39:52 +00:00
body , err := util . Peek ( r . Body , s . config . MessageLimit )
2022-03-15 20:00:59 +00:00
if err != nil {
return err
}
defer r . Body . Close ( )
var m publishMessage
if err := json . NewDecoder ( body ) . Decode ( & m ) ; err != nil {
2022-03-16 18:16:54 +00:00
return errHTTPBadRequestJSONInvalid
2022-03-15 20:00:59 +00:00
}
if ! topicRegex . MatchString ( m . Topic ) {
2022-03-16 18:16:54 +00:00
return errHTTPBadRequestTopicInvalid
2022-03-15 20:00:59 +00:00
}
if m . Message == "" {
m . Message = emptyMessageBody
}
r . URL . Path = "/" + m . Topic
r . Body = io . NopCloser ( strings . NewReader ( m . Message ) )
if m . Title != "" {
r . Header . Set ( "X-Title" , m . Title )
}
2022-03-16 18:16:54 +00:00
if m . Priority != 0 {
r . Header . Set ( "X-Priority" , fmt . Sprintf ( "%d" , m . Priority ) )
2022-03-15 20:00:59 +00:00
}
2022-03-16 18:16:54 +00:00
if m . Tags != nil && len ( m . Tags ) > 0 {
r . Header . Set ( "X-Tags" , strings . Join ( m . Tags , "," ) )
2022-03-15 20:00:59 +00:00
}
if m . Attach != "" {
r . Header . Set ( "X-Attach" , m . Attach )
}
if m . Filename != "" {
r . Header . Set ( "X-Filename" , m . Filename )
}
if m . Click != "" {
r . Header . Set ( "X-Click" , m . Click )
}
2022-07-16 19:31:03 +00:00
if m . Icon != "" {
r . Header . Set ( "X-Icon" , m . Icon )
}
2022-04-16 20:17:58 +00:00
if len ( m . Actions ) > 0 {
actionsStr , err := json . Marshal ( m . Actions )
if err != nil {
return errHTTPBadRequestJSONInvalid
}
r . Header . Set ( "X-Actions" , string ( actionsStr ) )
}
2022-03-29 19:40:26 +00:00
if m . Email != "" {
r . Header . Set ( "X-Email" , m . Email )
}
if m . Delay != "" {
r . Header . Set ( "X-Delay" , m . Delay )
}
2022-03-15 20:00:59 +00:00
return next ( w , r , v )
}
}
2022-06-14 02:07:30 +00:00
func ( s * Server ) transformMatrixJSON ( next handleFunc ) handleFunc {
return func ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-06-16 00:36:49 +00:00
newRequest , err := newRequestFromMatrixJSON ( r , s . config . BaseURL , s . config . MessageLimit )
2022-06-14 02:07:30 +00:00
if err != nil {
return err
}
2022-06-16 00:36:49 +00:00
if err := next ( w , newRequest , v ) ; err != nil {
return & errMatrix { pushKey : newRequest . Header . Get ( matrixPushKeyHeader ) , err : err }
2022-06-14 02:07:30 +00:00
}
return nil
}
}
2022-12-03 20:20:59 +00:00
func ( s * Server ) authorizeTopicWrite ( next handleFunc ) handleFunc {
2022-12-25 16:41:38 +00:00
return s . autorizeTopic ( next , user . PermissionWrite )
2022-01-22 03:22:27 +00:00
}
2022-12-03 20:20:59 +00:00
func ( s * Server ) authorizeTopicRead ( next handleFunc ) handleFunc {
2022-12-25 16:41:38 +00:00
return s . autorizeTopic ( next , user . PermissionRead )
2022-01-22 03:22:27 +00:00
}
2022-12-25 16:41:38 +00:00
func ( s * Server ) autorizeTopic ( next handleFunc , perm user . Permission ) handleFunc {
2022-01-22 03:22:27 +00:00
return func ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-12-25 16:41:38 +00:00
if s . userManager == nil {
2022-01-22 03:22:27 +00:00
return next ( w , r , v )
}
2022-01-27 17:49:05 +00:00
topics , _ , err := s . topicsFromPath ( r . URL . Path )
2022-01-22 03:22:27 +00:00
if err != nil {
return err
}
2022-01-27 17:49:05 +00:00
for _ , t := range topics {
2022-12-25 16:41:38 +00:00
if err := s . userManager . Authorize ( v . user , t . ID , perm ) ; err != nil {
2022-05-30 02:14:14 +00:00
log . Info ( "unauthorized: %s" , err . Error ( ) )
2022-01-27 17:49:05 +00:00
return errHTTPForbidden
}
2022-01-22 03:22:27 +00:00
}
return next ( w , r , v )
2021-11-05 17:46:27 +00:00
}
}
2021-10-24 02:49:50 +00:00
// visitor creates or retrieves a rate.Limiter for the given visitor.
2022-12-02 20:37:48 +00:00
// Note that this function will always return a visitor, even if an error occurs.
func ( s * Server ) visitor ( r * http . Request ) ( v * visitor , err error ) {
2022-12-22 02:55:39 +00:00
ip := extractIPAddress ( r , s . config . BehindProxy )
2022-12-25 16:41:38 +00:00
var user * user . User // may stay nil if no auth header!
2022-12-03 20:20:59 +00:00
if user , err = s . authenticate ( r ) ; err != nil {
log . Debug ( "authentication failed: %s" , err . Error ( ) )
err = errHTTPUnauthorized // Always return visitor, even when error occurs!
}
if user != nil {
2022-12-22 02:55:39 +00:00
v = s . visitorFromUser ( user , ip )
} else {
v = s . visitorFromIP ( ip )
2022-12-02 20:37:48 +00:00
}
2022-12-22 02:55:39 +00:00
v . user = user // Update user -- FIXME race?
2022-12-02 20:37:48 +00:00
return v , err // Always return visitor, even when error occurs!
}
2022-12-21 02:18:33 +00:00
// authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
// The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
// support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
// query param is effectively double base64 encoded. Its format is base64(Basic base64(user:pass)).
2022-12-25 16:41:38 +00:00
func ( s * Server ) authenticate ( r * http . Request ) ( user * user . User , err error ) {
2022-12-03 20:20:59 +00:00
value := r . Header . Get ( "Authorization" )
queryParam := readQueryParam ( r , "authorization" , "auth" )
if queryParam != "" {
a , err := base64 . RawURLEncoding . DecodeString ( queryParam )
if err != nil {
return nil , err
}
value = string ( a )
}
if value == "" {
return nil , nil
}
if strings . HasPrefix ( value , "Bearer" ) {
return s . authenticateBearerAuth ( value )
}
return s . authenticateBasicAuth ( r , value )
}
2022-12-25 16:41:38 +00:00
func ( s * Server ) authenticateBasicAuth ( r * http . Request , value string ) ( user * user . User , err error ) {
2022-12-03 20:20:59 +00:00
r . Header . Set ( "Authorization" , value )
username , password , ok := r . BasicAuth ( )
if ! ok {
return nil , errors . New ( "invalid basic auth" )
}
2022-12-25 16:41:38 +00:00
return s . userManager . Authenticate ( username , password )
2022-12-03 20:20:59 +00:00
}
2022-12-25 16:41:38 +00:00
func ( s * Server ) authenticateBearerAuth ( value string ) ( user * user . User , err error ) {
2022-12-03 20:20:59 +00:00
token := strings . TrimSpace ( strings . TrimPrefix ( value , "Bearer" ) )
2022-12-25 16:41:38 +00:00
return s . userManager . AuthenticateToken ( token )
2022-12-03 20:20:59 +00:00
}
2022-12-25 16:41:38 +00:00
func ( s * Server ) visitorFromID ( visitorID string , ip netip . Addr , user * user . User ) * visitor {
2022-12-02 20:37:48 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
v , exists := s . visitors [ visitorID ]
if ! exists {
s . visitors [ visitorID ] = newVisitor ( s . config , s . messageCache , ip , user )
return s . visitors [ visitorID ]
}
v . Keepalive ( )
return v
}
2022-12-22 02:55:39 +00:00
// visitorFromIP returns the visitor for the given IP address, i.e. the one stored under
// the visitor ID "ip:<address>", without a user.
func (s *Server) visitorFromIP(ip netip.Addr) *visitor {
	visitorID := "ip:" + ip.String()
	return s.visitorFromID(visitorID, ip, nil)
}
2022-12-25 16:41:38 +00:00
func ( s * Server ) visitorFromUser ( user * user . User , ip netip . Addr ) * visitor {
2022-12-22 02:55:39 +00:00
return s . visitorFromID ( fmt . Sprintf ( "user:%s" , user . Name ) , ip , user )
2021-10-24 02:49:50 +00:00
}