2021-10-23 01:26:01 +00:00
package server
import (
"bytes"
2021-10-29 03:50:38 +00:00
"context"
2022-05-27 11:55:57 +00:00
"crypto/sha256"
2021-12-23 23:03:04 +00:00
"embed"
2022-01-17 18:28:07 +00:00
"encoding/base64"
2021-10-23 01:26:01 +00:00
"encoding/json"
2022-12-03 20:20:59 +00:00
"errors"
2021-10-23 17:21:33 +00:00
"fmt"
2023-01-30 01:11:58 +00:00
"github.com/emersion/go-smtp"
"github.com/gorilla/websocket"
"golang.org/x/sync/errgroup"
"heckel.io/ntfy/log"
2022-12-25 16:41:38 +00:00
"heckel.io/ntfy/user"
2023-01-30 01:11:58 +00:00
"heckel.io/ntfy/util"
2021-10-23 01:26:01 +00:00
"io"
2021-10-24 02:49:50 +00:00
"net"
2021-10-23 01:26:01 +00:00
"net/http"
2022-10-05 20:42:07 +00:00
"net/netip"
2022-01-14 17:13:14 +00:00
"net/url"
2022-01-10 21:28:13 +00:00
"os"
2022-01-14 17:13:14 +00:00
"path"
2022-01-02 22:56:12 +00:00
"path/filepath"
2021-10-23 01:26:01 +00:00
"regexp"
2022-06-20 16:11:52 +00:00
"sort"
2021-10-29 17:58:14 +00:00
"strconv"
2021-10-23 01:26:01 +00:00
"strings"
"sync"
"time"
2022-01-02 22:56:12 +00:00
"unicode/utf8"
2021-10-23 01:26:01 +00:00
)
2022-12-08 01:44:20 +00:00
/*
TODO:
- MEDIUM fail2ban to work with ntfy log not nginx log
- HIGH Docs
  - tiers
  - api
  - tokens
*/
2021-12-07 16:45:15 +00:00
// Server is the main server, providing the UI and API for ntfy
2021-10-23 01:26:01 +00:00
type Server struct {
2022-06-02 03:24:44 +00:00
config * Config
httpServer * http . Server
httpsServer * http . Server
unixListener net . Listener
smtpServer * smtp . Server
smtpServerBackend * smtpBackend
smtpSender mailer
topics map [ string ] * topic
2022-12-02 20:37:48 +00:00
visitors map [ string ] * visitor // ip:<ip> or user:<user>
2022-06-02 03:24:44 +00:00
firebaseClient * firebaseClient
messages int64
2023-01-19 04:01:26 +00:00
userManager * user . Manager // Might be nil!
messageCache * messageCache // Database that stores the messages
fileCache * fileCache // File system based cache that stores attachments
stripe stripeAPI // Stripe API, can be replaced with a mock
2023-01-18 20:50:06 +00:00
priceCache * util . LookupCache [ map [ string ] string ] // Stripe price ID -> formatted price
2022-06-02 03:24:44 +00:00
closeChan chan bool
mu sync . Mutex
2021-10-23 01:26:01 +00:00
}
2022-01-22 03:22:27 +00:00
// handleFunc extends the normal http.HandlerFunc in two ways: it additionally receives the
// visitor that belongs to the request, and it returns an error instead of writing the failure
// itself. The main entry point (handle) passes any returned error to handleError.
type handleFunc func ( http . ResponseWriter , * http . Request , * visitor ) error
2021-10-23 01:26:01 +00:00
var (
2022-01-31 16:44:58 +00:00
// If changed, don't forget to update Android App and auth_sqlite.go
2022-03-23 18:29:55 +00:00
topicRegex = regexp . MustCompile ( ` ^[-_A-Za-z0-9] { 1,64}$ ` ) // No /!
topicPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}$ ` ) // Regex must match JS & Android app!
externalTopicPathRegex = regexp . MustCompile ( ` ^/[^/]+\.[^/]+/[-_A-Za-z0-9] { 1,64}$ ` ) // Extended topic path, for web-app, e.g. /example.com/mytopic
jsonPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/json$ ` )
ssePathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/sse$ ` )
rawPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/raw$ ` )
wsPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/ws$ ` )
authPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/auth$ ` )
publishPathRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}/(publish|send|trigger)$ ` )
2021-10-29 17:58:14 +00:00
2023-01-17 15:09:37 +00:00
webConfigPath = "/config.js"
accountPath = "/account"
matrixPushPath = "/_matrix/push/v1/notify"
apiHealthPath = "/v1/health"
2023-01-18 18:46:40 +00:00
apiTiers = "/v1/tiers"
2023-01-17 15:09:37 +00:00
apiAccountPath = "/v1/account"
apiAccountTokenPath = "/v1/account/token"
apiAccountPasswordPath = "/v1/account/password"
apiAccountSettingsPath = "/v1/account/settings"
apiAccountSubscriptionPath = "/v1/account/subscription"
apiAccountReservationPath = "/v1/account/reservation"
apiAccountBillingPortalPath = "/v1/account/billing/portal"
apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp . MustCompile ( ` /v1/account/billing/subscription/success/(.+)$ ` )
apiAccountReservationSingleRegex = regexp . MustCompile ( ` /v1/account/reservation/([-_A-Za-z0-9] { 1,64})$ ` )
staticRegex = regexp . MustCompile ( ` ^/static/.+ ` )
docsRegex = regexp . MustCompile ( ` ^/docs(|/.*)$ ` )
fileRegex = regexp . MustCompile ( ` ^/file/([-_A-Za-z0-9] { 1,64})(?:\.[A-Za-z0-9] { 1,16})?$ ` )
urlRegex = regexp . MustCompile ( ` ^https?:// ` )
2021-10-23 01:26:01 +00:00
2022-03-06 01:24:10 +00:00
//go:embed site
webFs embed . FS
webFsCached = & util . CachingEmbedFS { ModTime : time . Now ( ) , FS : webFs }
webSiteDir = "/site"
2022-03-06 02:28:25 +00:00
webHomeIndex = "/home.html" // Landing page, only if "web-root: home"
2022-03-06 01:24:10 +00:00
webAppIndex = "/app.html" // React app
2021-10-24 18:22:53 +00:00
2021-12-02 22:27:31 +00:00
//go:embed docs
2021-12-07 15:38:58 +00:00
docsStaticFs embed . FS
2021-12-02 22:27:31 +00:00
docsStaticCached = & util . CachingEmbedFS { ModTime : time . Now ( ) , FS : docsStaticFs }
2021-10-23 01:26:01 +00:00
)
2021-12-14 03:30:28 +00:00
// Message and topic related constants
const (
	// Special Firebase topics
	firebaseControlTopic = "~control" // See Android if changed
	firebasePollTopic    = "~poll"    // See iOS if changed

	// Default message bodies
	emptyMessageBody         = "triggered"               // Used if message body is empty
	newMessageBody           = "New message"             // Used in poll requests as generic message
	defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment

	// Used mainly for binary UnifiedPush messages
	encodingBase64 = "base64"

	jsonBodyBytesLimit = 16384

	subscriberBilledTopicPrefix = "up_"
	subscriberBilledValidity    = 12 * time.Hour
)
// Timeouts and size limits for WebSocket connections
const (
	wsWriteWait = 2 * time.Second

	wsBufferSize = 1024

	// We only ever receive PINGs
	wsReadLimit = 64

	wsPongWait = 15 * time.Second
)
2023-02-04 03:21:50 +00:00
// Tags attached to log events, to tell apart which part of the server emitted them
const (
	tagStartup   = "startup"
	tagPublish   = "publish"
	tagSubscribe = "subscribe"
	tagFirebase  = "firebase"

	// Receive email
	tagSMTP = "smtp"

	// Send email
	tagEmail = "email"

	tagFileCache    = "file_cache"
	tagMessageCache = "message_cache"
	tagStripe       = "stripe"
	tagAccount      = "account"
	tagManager      = "manager"
	tagResetter     = "resetter"
	tagWebsocket    = "websocket"
	tagMatrix       = "matrix"
)
2021-12-07 16:45:15 +00:00
// New instantiates a new Server. It creates the cache and adds a Firebase
// subscriber (if configured).
2021-12-19 03:02:36 +00:00
func New ( conf * Config ) ( * Server , error ) {
2021-12-23 23:03:04 +00:00
var mailer mailer
2021-12-27 15:39:28 +00:00
if conf . SMTPSenderAddr != "" {
mailer = & smtpSender { config : conf }
2021-12-23 23:03:04 +00:00
}
2023-01-19 04:01:26 +00:00
var stripe stripeAPI
if conf . StripeSecretKey != "" {
stripe = newStripeAPI ( )
}
2022-02-27 19:47:28 +00:00
messageCache , err := createMessageCache ( conf )
2021-11-02 18:08:21 +00:00
if err != nil {
return nil , err
}
2022-02-27 19:47:28 +00:00
topics , err := messageCache . Topics ( )
2021-11-03 01:09:49 +00:00
if err != nil {
return nil , err
2021-11-02 18:08:21 +00:00
}
2022-01-07 13:49:28 +00:00
var fileCache * fileCache
2022-01-02 22:56:12 +00:00
if conf . AttachmentCacheDir != "" {
2022-12-21 18:19:07 +00:00
fileCache , err = newFileCache ( conf . AttachmentCacheDir , conf . AttachmentTotalSizeLimit )
2022-01-07 13:49:28 +00:00
if err != nil {
2022-01-02 22:56:12 +00:00
return nil , err
}
}
2022-12-28 03:14:14 +00:00
var userManager * user . Manager
2022-01-23 04:01:20 +00:00
if conf . AuthFile != "" {
2023-01-29 01:29:06 +00:00
userManager , err = user . NewManager ( conf . AuthFile , conf . AuthStartupQueries , conf . AuthDefault , conf . AuthBcryptCost , conf . AuthStatsQueueWriterInterval )
2022-01-23 04:01:20 +00:00
if err != nil {
return nil , err
}
2022-01-22 19:47:27 +00:00
}
2022-06-01 03:16:44 +00:00
var firebaseClient * firebaseClient
2022-02-01 00:33:22 +00:00
if conf . FirebaseKeyFile != "" {
2022-06-01 03:16:44 +00:00
sender , err := newFirebaseSender ( conf . FirebaseKeyFile )
2022-02-01 00:33:22 +00:00
if err != nil {
return nil , err
}
2022-12-28 03:14:14 +00:00
firebaseClient = newFirebaseClient ( sender , userManager )
2022-02-01 00:33:22 +00:00
}
2023-01-19 04:01:26 +00:00
s := & Server {
2022-06-01 03:16:44 +00:00
config : conf ,
messageCache : messageCache ,
fileCache : fileCache ,
firebaseClient : firebaseClient ,
2022-06-02 03:24:44 +00:00
smtpSender : mailer ,
2022-06-01 03:16:44 +00:00
topics : topics ,
2022-12-28 03:14:14 +00:00
userManager : userManager ,
2022-12-02 20:37:48 +00:00
visitors : make ( map [ string ] * visitor ) ,
2023-01-19 04:01:26 +00:00
stripe : stripe ,
}
s . priceCache = util . NewLookupCache ( s . fetchStripePrices , conf . StripePriceCacheDuration )
return s , nil
2021-10-23 01:26:01 +00:00
}
2022-02-27 19:47:28 +00:00
func createMessageCache ( conf * Config ) ( * messageCache , error ) {
2021-12-09 15:23:17 +00:00
if conf . CacheDuration == 0 {
2022-02-27 14:38:46 +00:00
return newNopCache ( )
2021-12-09 15:23:17 +00:00
} else if conf . CacheFile != "" {
2023-01-09 21:21:00 +00:00
return newSqliteCache ( conf . CacheFile , conf . CacheStartupQueries , conf . CacheDuration , conf . CacheBatchSize , conf . CacheBatchTimeout , false )
2021-11-02 18:08:21 +00:00
}
2022-02-27 14:38:46 +00:00
return newMemCache ( )
2021-11-02 18:08:21 +00:00
}
2021-12-07 16:45:15 +00:00
// Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
// a manager go routine to print stats and prune messages.
2021-10-23 01:26:01 +00:00
func ( s * Server ) Run ( ) error {
2022-01-10 21:28:13 +00:00
var listenStr string
if s . config . ListenHTTP != "" {
listenStr += fmt . Sprintf ( " %s[http]" , s . config . ListenHTTP )
}
2021-12-02 13:52:48 +00:00
if s . config . ListenHTTPS != "" {
2022-01-10 21:28:13 +00:00
listenStr += fmt . Sprintf ( " %s[https]" , s . config . ListenHTTPS )
}
if s . config . ListenUnix != "" {
2022-07-03 23:33:01 +00:00
listenStr += fmt . Sprintf ( " %s[unix]" , s . config . ListenUnix )
2021-12-02 13:52:48 +00:00
}
2021-12-27 21:06:40 +00:00
if s . config . SMTPServerListen != "" {
2022-01-10 21:28:13 +00:00
listenStr += fmt . Sprintf ( " %s[smtp]" , s . config . SMTPServerListen )
2021-12-27 21:06:40 +00:00
}
2023-02-07 21:20:49 +00:00
log . Tag ( tagStartup ) . Info ( "Listening on%s, ntfy %s, log level is %s" , listenStr , s . config . Version , log . CurrentLevel ( ) . String ( ) )
2023-02-06 04:34:27 +00:00
if log . IsFile ( ) {
2023-02-07 17:02:25 +00:00
fmt . Fprintf ( os . Stderr , "Listening on%s, ntfy %s\n" , listenStr , s . config . Version )
fmt . Fprintf ( os . Stderr , "Logs are written to %s\n" , log . File ( ) )
2023-02-06 04:34:27 +00:00
}
2021-12-22 22:45:19 +00:00
mux := http . NewServeMux ( )
mux . HandleFunc ( "/" , s . handle )
2021-12-02 13:52:48 +00:00
errChan := make ( chan error )
2021-12-22 13:17:50 +00:00
s . mu . Lock ( )
s . closeChan = make ( chan bool )
2022-01-10 21:28:13 +00:00
if s . config . ListenHTTP != "" {
s . httpServer = & http . Server { Addr : s . config . ListenHTTP , Handler : mux }
go func ( ) {
errChan <- s . httpServer . ListenAndServe ( )
} ( )
}
2021-12-02 13:52:48 +00:00
if s . config . ListenHTTPS != "" {
2022-01-06 13:45:23 +00:00
s . httpsServer = & http . Server { Addr : s . config . ListenHTTPS , Handler : mux }
2021-12-02 13:52:48 +00:00
go func ( ) {
2021-12-22 13:17:50 +00:00
errChan <- s . httpsServer . ListenAndServeTLS ( s . config . CertFile , s . config . KeyFile )
2021-12-02 13:52:48 +00:00
} ( )
}
2022-01-10 21:28:13 +00:00
if s . config . ListenUnix != "" {
go func ( ) {
var err error
s . mu . Lock ( )
os . Remove ( s . config . ListenUnix )
s . unixListener , err = net . Listen ( "unix" , s . config . ListenUnix )
if err != nil {
2022-07-03 23:33:01 +00:00
s . mu . Unlock ( )
2022-01-10 21:28:13 +00:00
errChan <- err
return
}
2022-07-03 23:33:01 +00:00
defer s . unixListener . Close ( )
if s . config . ListenUnixMode > 0 {
if err := os . Chmod ( s . config . ListenUnix , s . config . ListenUnixMode ) ; err != nil {
s . mu . Unlock ( )
errChan <- err
return
}
2022-07-03 19:27:36 +00:00
}
2022-01-10 21:28:13 +00:00
s . mu . Unlock ( )
httpServer := & http . Server { Handler : mux }
errChan <- httpServer . Serve ( s . unixListener )
} ( )
}
2021-12-27 15:39:28 +00:00
if s . config . SMTPServerListen != "" {
2021-12-27 14:48:09 +00:00
go func ( ) {
2021-12-27 21:06:40 +00:00
errChan <- s . runSMTPServer ( )
2021-12-27 14:48:09 +00:00
} ( )
}
2021-12-22 13:17:50 +00:00
s . mu . Unlock ( )
2021-12-22 22:20:43 +00:00
go s . runManager ( )
2023-01-11 03:51:51 +00:00
go s . runStatsResetter ( )
2022-06-01 00:38:56 +00:00
go s . runDelayedSender ( )
2022-01-21 19:17:59 +00:00
go s . runFirebaseKeepaliver ( )
2021-12-27 14:48:09 +00:00
2021-12-02 13:52:48 +00:00
return <- errChan
2021-10-23 01:26:01 +00:00
}
2021-12-22 13:17:50 +00:00
// Stop stops HTTP (+HTTPS) server and all managers
func ( s * Server ) Stop ( ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
if s . httpServer != nil {
s . httpServer . Close ( )
}
if s . httpsServer != nil {
s . httpsServer . Close ( )
}
2022-01-10 21:28:13 +00:00
if s . unixListener != nil {
s . unixListener . Close ( )
}
2021-12-27 21:06:40 +00:00
if s . smtpServer != nil {
s . smtpServer . Close ( )
}
2023-01-28 14:03:14 +00:00
s . closeDatabases ( )
2021-12-22 13:17:50 +00:00
close ( s . closeChan )
}
2023-01-28 14:03:14 +00:00
// closeDatabases closes the user manager (if there is one) and then the message cache
func (s *Server) closeDatabases() {
	if users := s.userManager; users != nil {
		users.Close()
	}
	s.messageCache.Close()
}
2023-02-10 01:49:45 +00:00
// handle is the main entry point for all HTTP requests
2021-10-23 01:26:01 +00:00
func ( s * Server ) handle ( w http . ResponseWriter , r * http . Request ) {
2023-01-27 03:57:18 +00:00
v , err := s . maybeAuthenticate ( r ) // Note: Always returns v, even when error is returned
2022-12-02 20:37:48 +00:00
if err != nil {
2023-02-10 01:49:45 +00:00
s . handleError ( w , r , v , err )
return
}
if log . IsTrace ( ) {
logvr ( v , r ) . Field ( "http_request" , renderHTTPRequest ( r ) ) . Trace ( "HTTP request started" )
} else if log . IsDebug ( ) {
logvr ( v , r ) . Debug ( "HTTP request started" )
}
logvr ( v , r ) .
Timing ( func ( ) {
if err := s . handleInternal ( w , r , v ) ; err != nil {
s . handleError ( w , r , v , err )
return
2022-06-01 20:57:35 +00:00
}
2023-02-10 01:49:45 +00:00
} ) .
Debug ( "HTTP request finished" )
}
func ( s * Server ) handleError ( w http . ResponseWriter , r * http . Request , v * visitor , err error ) {
if websocket . IsWebSocketUpgrade ( r ) {
isNormalError := strings . Contains ( err . Error ( ) , "i/o timeout" )
2022-06-01 20:57:35 +00:00
if isNormalError {
2023-02-10 01:49:45 +00:00
logvr ( v , r ) . Tag ( tagWebsocket ) . Err ( err ) . Fields ( websocketErrorContext ( err ) ) . Debug ( "WebSocket error (this error is okay, it happens a lot): %s" , err . Error ( ) )
2022-06-01 20:57:35 +00:00
} else {
2023-02-10 01:49:45 +00:00
logvr ( v , r ) . Tag ( tagWebsocket ) . Err ( err ) . Fields ( websocketErrorContext ( err ) ) . Info ( "WebSocket error: %s" , err . Error ( ) )
2022-06-01 20:57:35 +00:00
}
2023-02-10 01:49:45 +00:00
return // Do not attempt to write to upgraded connection
}
if matrixErr , ok := err . ( * errMatrix ) ; ok {
writeMatrixError ( w , r , v , matrixErr )
return
}
httpErr , ok := err . ( * errHTTP )
if ! ok {
httpErr = errHTTPInternalError
}
isNormalError := httpErr . HTTPCode == http . StatusNotFound || httpErr . HTTPCode == http . StatusBadRequest || httpErr . HTTPCode == http . StatusTooManyRequests
if isNormalError {
logvr ( v , r ) .
Err ( httpErr ) .
Debug ( "Connection closed with HTTP %d (ntfy error %d)" , httpErr . HTTPCode , httpErr . Code )
} else {
logvr ( v , r ) .
Err ( httpErr ) .
Info ( "Connection closed with HTTP %d (ntfy error %d)" , httpErr . HTTPCode , httpErr . Code )
2021-10-23 01:26:01 +00:00
}
2023-02-10 01:49:45 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , s . config . AccessControlAllowOrigin ) // CORS, allow cross-origin requests
w . WriteHeader ( httpErr . HTTPCode )
io . WriteString ( w , httpErr . JSON ( ) + "\n" )
2021-10-23 01:26:01 +00:00
}
2022-02-14 21:09:59 +00:00
func ( s * Server ) handleInternal ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-05-13 18:42:25 +00:00
if r . Method == http . MethodGet && r . URL . Path == "/" {
return s . ensureWebEnabled ( s . handleHome ) ( w , r , v )
2021-11-05 17:46:27 +00:00
} else if r . Method == http . MethodHead && r . URL . Path == "/" {
2022-05-13 18:42:25 +00:00
return s . ensureWebEnabled ( s . handleEmpty ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == apiHealthPath {
2022-12-24 17:22:54 +00:00
return s . handleHealth ( w , r , v )
2022-05-13 18:42:25 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == webConfigPath {
return s . ensureWebEnabled ( s . handleWebConfig ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == apiAccountPath {
2022-12-29 00:55:11 +00:00
return s . ensureUserManager ( s . handleAccountCreate ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == apiAccountPath {
2022-12-28 03:14:14 +00:00
return s . handleAccountGet ( w , r , v ) // Allowed by anonymous
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodDelete && r . URL . Path == apiAccountPath {
2023-01-16 21:35:37 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountDelete ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == apiAccountPasswordPath {
2022-12-29 00:55:11 +00:00
return s . ensureUser ( s . handleAccountPasswordChange ) ( w , r , v )
2023-01-28 04:10:59 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == apiAccountTokenPath {
return s . ensureUser ( s . withAccountSync ( s . handleAccountTokenCreate ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPatch && r . URL . Path == apiAccountTokenPath {
2023-01-28 04:10:59 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountTokenUpdate ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodDelete && r . URL . Path == apiAccountTokenPath {
2023-01-28 04:10:59 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountTokenDelete ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPatch && r . URL . Path == apiAccountSettingsPath {
2023-01-16 21:35:37 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountSettingsChange ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == apiAccountSubscriptionPath {
2023-01-16 21:35:37 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountSubscriptionAdd ) ) ( w , r , v )
2023-02-12 19:09:44 +00:00
} else if r . Method == http . MethodPatch && r . URL . Path == apiAccountSubscriptionPath {
2023-01-16 21:35:37 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountSubscriptionChange ) ) ( w , r , v )
2023-02-12 19:09:44 +00:00
} else if r . Method == http . MethodDelete && r . URL . Path == apiAccountSubscriptionPath {
2023-01-16 21:35:37 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountSubscriptionDelete ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == apiAccountReservationPath {
2023-01-16 21:35:37 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountReservationAdd ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodDelete && apiAccountReservationSingleRegex . MatchString ( r . URL . Path ) {
2023-01-16 21:35:37 +00:00
return s . ensureUser ( s . withAccountSync ( s . handleAccountReservationDelete ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == apiAccountBillingSubscriptionPath {
2023-01-16 21:35:37 +00:00
return s . ensurePaymentsEnabled ( s . ensureUser ( s . handleAccountBillingSubscriptionCreate ) ) ( w , r , v ) // Account sync via incoming Stripe webhook
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex . MatchString ( r . URL . Path ) {
2023-01-16 21:35:37 +00:00
return s . ensurePaymentsEnabled ( s . ensureUserManager ( s . handleAccountBillingSubscriptionCreateSuccess ) ) ( w , r , v ) // No user context!
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPut && r . URL . Path == apiAccountBillingSubscriptionPath {
2023-01-18 18:46:40 +00:00
return s . ensurePaymentsEnabled ( s . ensureStripeCustomer ( s . handleAccountBillingSubscriptionUpdate ) ) ( w , r , v ) // Account sync via incoming Stripe webhook
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodDelete && r . URL . Path == apiAccountBillingSubscriptionPath {
2023-01-16 21:35:37 +00:00
return s . ensurePaymentsEnabled ( s . ensureStripeCustomer ( s . handleAccountBillingSubscriptionDelete ) ) ( w , r , v ) // Account sync via incoming Stripe webhook
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == apiAccountBillingPortalPath {
2023-01-16 21:35:37 +00:00
return s . ensurePaymentsEnabled ( s . ensureStripeCustomer ( s . handleAccountBillingPortalSessionCreate ) ) ( w , r , v )
2023-01-17 15:09:37 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == apiAccountBillingWebhookPath {
2023-01-18 18:46:40 +00:00
return s . ensurePaymentsEnabled ( s . ensureUserManager ( s . handleAccountBillingWebhook ) ) ( w , r , v ) // This request comes from Stripe!
} else if r . Method == http . MethodGet && r . URL . Path == apiTiers {
return s . ensurePaymentsEnabled ( s . handleBillingTiersGet ) ( w , r , v )
2022-06-15 20:03:12 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == matrixPushPath {
return s . handleMatrixDiscovery ( w )
2022-05-13 18:42:25 +00:00
} else if r . Method == http . MethodGet && staticRegex . MatchString ( r . URL . Path ) {
return s . ensureWebEnabled ( s . handleStatic ) ( w , r , v )
} else if r . Method == http . MethodGet && docsRegex . MatchString ( r . URL . Path ) {
return s . ensureWebEnabled ( s . handleDocs ) ( w , r , v )
2022-06-11 01:33:39 +00:00
} else if ( r . Method == http . MethodGet || r . Method == http . MethodHead ) && fileRegex . MatchString ( r . URL . Path ) && s . config . AttachmentCacheDir != "" {
2022-01-22 03:22:27 +00:00
return s . limitRequests ( s . handleFile ) ( w , r , v )
2021-11-05 17:46:27 +00:00
} else if r . Method == http . MethodOptions {
2023-02-03 00:07:16 +00:00
return s . limitRequests ( s . handleOptions ) ( w , r , v ) // Should work even if the web app is not enabled, see #598
2022-03-16 18:16:54 +00:00
} else if ( r . Method == http . MethodPut || r . Method == http . MethodPost ) && r . URL . Path == "/" {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . transformBodyJSON ( s . authorizeTopicWrite ( s . handlePublish ) ) ) ( w , r , v )
2022-06-14 02:07:30 +00:00
} else if r . Method == http . MethodPost && r . URL . Path == matrixPushPath {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . transformMatrixJSON ( s . authorizeTopicWrite ( s . handlePublishMatrix ) ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if ( r . Method == http . MethodPut || r . Method == http . MethodPost ) && topicPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicWrite ( s . handlePublish ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if r . Method == http . MethodGet && publishPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicWrite ( s . handlePublish ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if r . Method == http . MethodGet && jsonPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleSubscribeJSON ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if r . Method == http . MethodGet && ssePathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleSubscribeSSE ) ) ( w , r , v )
2021-12-27 21:06:40 +00:00
} else if r . Method == http . MethodGet && rawPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleSubscribeRaw ) ) ( w , r , v )
2022-01-15 18:23:35 +00:00
} else if r . Method == http . MethodGet && wsPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleSubscribeWS ) ) ( w , r , v )
2022-01-26 04:04:09 +00:00
} else if r . Method == http . MethodGet && authPathRegex . MatchString ( r . URL . Path ) {
2022-12-03 20:20:59 +00:00
return s . limitRequests ( s . authorizeTopicRead ( s . handleTopicAuth ) ) ( w , r , v )
2022-03-23 18:29:55 +00:00
} else if r . Method == http . MethodGet && ( topicPathRegex . MatchString ( r . URL . Path ) || externalTopicPathRegex . MatchString ( r . URL . Path ) ) {
2022-05-13 18:42:25 +00:00
return s . ensureWebEnabled ( s . handleTopic ) ( w , r , v )
2021-10-23 01:26:01 +00:00
}
2021-10-24 02:49:50 +00:00
return errHTTPNotFound
2021-10-23 01:26:01 +00:00
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleHome ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-03-06 02:28:25 +00:00
if s . config . WebRootIsApp {
r . URL . Path = webAppIndex
} else {
r . URL . Path = webHomeIndex
}
2022-05-13 18:42:25 +00:00
return s . handleStatic ( w , r , v )
2021-10-23 01:26:01 +00:00
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleTopic ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-01-16 04:17:46 +00:00
unifiedpush := readBoolParam ( r , false , "x-unifiedpush" , "unifiedpush" , "up" ) // see PUT/POST too!
2021-12-25 21:07:55 +00:00
if unifiedpush {
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2023-01-23 19:05:41 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , s . config . AccessControlAllowOrigin ) // CORS, allow cross-origin requests
2021-12-25 21:07:55 +00:00
_ , err := io . WriteString ( w , ` { "unifiedpush": { "version":1}} ` + "\n" )
return err
}
2022-03-06 01:24:10 +00:00
r . URL . Path = webAppIndex
2022-05-13 18:42:25 +00:00
return s . handleStatic ( w , r , v )
2021-12-25 21:07:55 +00:00
}
2022-01-26 04:04:09 +00:00
func ( s * Server ) handleEmpty ( _ http . ResponseWriter , _ * http . Request , _ * visitor ) error {
2021-11-05 17:46:27 +00:00
return nil
}
2022-01-26 04:04:09 +00:00
func ( s * Server ) handleTopicAuth ( w http . ResponseWriter , _ * http . Request , _ * visitor ) error {
2023-01-18 20:50:06 +00:00
return s . writeJSON ( w , newSuccessResponse ( ) )
2022-01-26 04:04:09 +00:00
}
2022-12-24 17:22:54 +00:00
func ( s * Server ) handleHealth ( w http . ResponseWriter , _ * http . Request , _ * visitor ) error {
response := & apiHealthResponse {
Healthy : true ,
}
2023-01-18 20:50:06 +00:00
return s . writeJSON ( w , response )
2022-12-24 17:22:54 +00:00
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleWebConfig ( w http . ResponseWriter , _ * http . Request , _ * visitor ) error {
2022-03-10 04:28:55 +00:00
appRoot := "/"
if ! s . config . WebRootIsApp {
appRoot = "/app"
}
2023-01-05 01:34:22 +00:00
response := & apiConfigResponse {
2023-01-11 03:51:51 +00:00
BaseURL : "" , // Will translate to window.location.origin
AppRoot : appRoot ,
EnableLogin : s . config . EnableLogin ,
EnableSignup : s . config . EnableSignup ,
2023-01-18 18:46:40 +00:00
EnablePayments : s . config . StripeSecretKey != "" ,
2023-01-11 03:51:51 +00:00
EnableReservations : s . config . EnableReservations ,
2023-02-09 13:32:51 +00:00
DisallowedTopics : s . config . DisallowedTopics ,
2023-01-05 01:34:22 +00:00
}
2023-01-10 01:37:13 +00:00
b , err := json . MarshalIndent ( response , "" , " " )
2023-01-05 01:34:22 +00:00
if err != nil {
return err
}
2022-03-11 20:56:54 +00:00
w . Header ( ) . Set ( "Content-Type" , "text/javascript" )
2023-01-05 01:34:22 +00:00
_ , err = io . WriteString ( w , fmt . Sprintf ( "// Generated server configuration\nvar config = %s;\n" , string ( b ) ) )
2022-03-10 04:28:55 +00:00
return err
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleStatic ( w http . ResponseWriter , r * http . Request , _ * visitor ) error {
2022-03-06 01:24:10 +00:00
r . URL . Path = webSiteDir + r . URL . Path
2022-03-11 02:55:56 +00:00
util . Gzip ( http . FileServer ( http . FS ( webFsCached ) ) ) . ServeHTTP ( w , r )
2021-10-29 17:58:14 +00:00
return nil
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleDocs ( w http . ResponseWriter , r * http . Request , _ * visitor ) error {
2022-03-11 02:55:56 +00:00
util . Gzip ( http . FileServer ( http . FS ( docsStaticCached ) ) ) . ServeHTTP ( w , r )
2021-12-02 22:27:31 +00:00
return nil
}
2023-01-29 20:11:26 +00:00
// handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
// Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
// can associate the download bandwidth with the uploader.
2022-01-12 23:52:07 +00:00
func ( s * Server ) handleFile ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-01-02 22:56:12 +00:00
if s . config . AttachmentCacheDir == "" {
2022-01-03 23:55:08 +00:00
return errHTTPInternalError
2022-01-02 22:56:12 +00:00
}
matches := fileRegex . FindStringSubmatch ( r . URL . Path )
if len ( matches ) != 2 {
2022-12-29 14:57:42 +00:00
return errHTTPInternalErrorInvalidPath
2022-01-02 22:56:12 +00:00
}
messageID := matches [ 1 ]
file := filepath . Join ( s . config . AttachmentCacheDir , messageID )
stat , err := os . Stat ( file )
if err != nil {
return errHTTPNotFound
}
2023-01-23 19:05:41 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , s . config . AccessControlAllowOrigin ) // CORS, allow cross-origin requests
2023-01-29 20:11:26 +00:00
w . Header ( ) . Set ( "Content-Length" , fmt . Sprintf ( "%d" , stat . Size ( ) ) )
if r . Method == http . MethodHead {
return nil
}
// Find message in database, and associate bandwidth to the uploader user
// This is an easy way to
// - avoid abuse (e.g. 1 uploader, 1k downloaders)
// - and also uses the higher bandwidth limits of a paying user
m , err := s . messageCache . Message ( messageID )
if err == errMessageNotFound {
2023-02-11 02:44:12 +00:00
if s . config . CacheBatchTimeout > 0 {
// Strange edge case: If we immediately after upload request the file (the web app does this for images),
// and messages are persisted asynchronously, retry fetching from the database
m , err = util . Retry ( func ( ) ( * message , error ) {
return s . messageCache . Message ( messageID )
} , s . config . CacheBatchTimeout , 100 * time . Millisecond , 300 * time . Millisecond , 600 * time . Millisecond )
}
if err != nil {
return errHTTPNotFound
}
2023-01-29 20:11:26 +00:00
} else if err != nil {
return err
}
bandwidthVisitor := v
if s . userManager != nil && m . User != "" {
u , err := s . userManager . UserByID ( m . User )
2022-06-11 01:33:39 +00:00
if err != nil {
return err
}
2023-01-29 20:11:26 +00:00
bandwidthVisitor = s . visitor ( v . IP ( ) , u )
2023-02-08 03:10:51 +00:00
} else if m . Sender . IsValid ( ) {
2023-01-29 20:11:26 +00:00
bandwidthVisitor = s . visitor ( m . Sender , nil )
}
if ! bandwidthVisitor . BandwidthAllowed ( stat . Size ( ) ) {
return errHTTPTooManyRequestsLimitAttachmentBandwidth
}
// Actually send file
f , err := os . Open ( file )
if err != nil {
2022-01-02 22:56:12 +00:00
return err
}
2023-01-29 20:11:26 +00:00
defer f . Close ( )
_ , err = io . Copy ( util . NewContentTypeWriter ( w , r . URL . Path ) , f )
return err
2022-01-02 22:56:12 +00:00
}
2022-06-15 20:03:12 +00:00
func ( s * Server ) handleMatrixDiscovery ( w http . ResponseWriter ) error {
2022-06-20 01:25:35 +00:00
if s . config . BaseURL == "" {
return errHTTPInternalErrorMissingBaseURL
}
2022-06-16 15:40:56 +00:00
return writeMatrixDiscoveryResponse ( w )
2022-06-15 20:03:12 +00:00
}
2022-06-15 00:43:17 +00:00
func ( s * Server ) handlePublishWithoutResponse ( r * http . Request , v * visitor ) ( * message , error ) {
2021-12-15 14:41:55 +00:00
t , err := s . topicFromPath ( r . URL . Path )
2021-11-01 20:39:40 +00:00
if err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-11-01 20:39:40 +00:00
}
2023-01-27 16:33:51 +00:00
if ! v . MessageAllowed ( ) {
2023-01-09 20:40:46 +00:00
return nil , errHTTPTooManyRequestsLimitMessages
2023-01-09 01:46:46 +00:00
}
2022-04-03 16:39:52 +00:00
body , err := util . Peek ( r . Body , s . config . MessageLimit )
2021-10-23 01:26:01 +00:00
if err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-10-23 01:26:01 +00:00
}
2022-01-02 22:56:12 +00:00
m := newDefaultMessage ( t . ID , "" )
2022-01-17 18:28:07 +00:00
cache , firebase , email , unifiedpush , err := s . parsePublishParams ( r , v , m )
2021-12-10 16:31:42 +00:00
if err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-10-29 03:50:38 +00:00
}
2022-05-27 11:55:57 +00:00
if m . PollID != "" {
m = newPollRequestMessage ( t . ID , m . PollID )
}
2023-02-08 03:10:51 +00:00
m . Sender = v . IP ( )
2023-01-27 16:33:51 +00:00
m . User = v . MaybeUserID ( )
2023-01-27 03:57:18 +00:00
m . Expires = time . Now ( ) . Add ( v . Limits ( ) . MessageExpiryDuration ) . Unix ( )
2022-01-17 18:28:07 +00:00
if err := s . handlePublishBody ( r , v , m , body , unifiedpush ) ; err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-12-23 23:03:04 +00:00
}
2021-12-15 14:41:55 +00:00
if m . Message == "" {
2021-12-23 23:03:04 +00:00
m . Message = emptyMessageBody
2021-12-15 14:41:55 +00:00
}
2021-12-10 16:31:42 +00:00
delayed := m . Time > time . Now ( ) . Unix ( )
2023-02-04 03:21:50 +00:00
logvrm ( v , r , m ) .
Tag ( tagPublish ) .
2023-02-06 04:34:27 +00:00
Fields ( log . Context {
2023-02-04 03:21:50 +00:00
"message_delayed" : delayed ,
"message_firebase" : firebase ,
"message_unifiedpush" : unifiedpush ,
"message_email" : email ,
} ) .
Debug ( "Received message" )
2022-06-02 03:24:44 +00:00
if log . IsTrace ( ) {
2023-02-04 03:21:50 +00:00
logvrm ( v , r , m ) .
Tag ( tagPublish ) .
Field ( "message_body" , util . MaybeMarshalJSON ( m ) ) .
Trace ( "Message body" )
2022-06-02 03:24:44 +00:00
}
2021-12-10 16:31:42 +00:00
if ! delayed {
2022-06-01 00:38:56 +00:00
if err := t . Publish ( v , m ) ; err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2021-12-10 16:31:42 +00:00
}
2022-06-01 20:57:35 +00:00
if s . firebaseClient != nil && firebase {
go s . sendToFirebase ( v , m )
}
2022-06-02 03:24:44 +00:00
if s . smtpSender != nil && email != "" {
2022-06-01 20:57:35 +00:00
go s . sendEmail ( v , m , email )
}
if s . config . UpstreamBaseURL != "" {
go s . forwardPollRequest ( v , m )
}
} else {
2023-02-04 03:21:50 +00:00
logvrm ( v , r , m ) . Tag ( tagPublish ) . Debug ( "Message delayed, will process later" )
2022-05-27 11:55:57 +00:00
}
2021-12-09 15:23:17 +00:00
if cache {
2023-02-04 03:21:50 +00:00
logvrm ( v , r , m ) . Tag ( tagPublish ) . Debug ( "Adding message to cache" )
2022-11-16 15:28:20 +00:00
if err := s . messageCache . AddMessage ( m ) ; err != nil {
2022-06-15 00:43:17 +00:00
return nil , err
2022-11-16 15:28:20 +00:00
}
2021-11-02 18:08:21 +00:00
}
2023-01-27 16:33:51 +00:00
u := v . User ( )
if s . userManager != nil && u != nil && u . Tier != nil {
2023-02-09 20:24:12 +00:00
go s . userManager . EnqueueUserStats ( u . ID , v . Stats ( ) )
2022-12-21 02:18:33 +00:00
}
2022-06-15 00:43:17 +00:00
s . mu . Lock ( )
s . messages ++
s . mu . Unlock ( )
return m , nil
}
func ( s * Server ) handlePublish ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
m , err := s . handlePublishWithoutResponse ( r , v )
if err != nil {
return err
}
2023-01-18 20:50:06 +00:00
return s . writeJSON ( w , m )
2022-06-15 00:43:17 +00:00
}
func ( s * Server ) handlePublishMatrix ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
_ , err := s . handlePublishWithoutResponse ( r , v )
if err != nil {
2022-06-16 00:36:49 +00:00
return & errMatrix { pushKey : r . Header . Get ( matrixPushKeyHeader ) , err : err }
2022-06-15 00:43:17 +00:00
}
2022-06-15 20:03:12 +00:00
return writeMatrixSuccess ( w )
2021-10-23 01:26:01 +00:00
}
2022-05-28 00:30:20 +00:00
func ( s * Server ) sendToFirebase ( v * visitor , m * message ) {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Tag ( tagFirebase ) . Debug ( "Publishing to Firebase" )
2022-06-01 03:16:44 +00:00
if err := s . firebaseClient . Send ( v , m ) ; err != nil {
2022-06-03 00:59:07 +00:00
if err == errFirebaseTemporarilyBanned {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Tag ( tagFirebase ) . Err ( err ) . Debug ( "Unable to publish to Firebase: %v" , err . Error ( ) )
2022-06-03 00:59:07 +00:00
} else {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Tag ( tagFirebase ) . Err ( err ) . Warn ( "Unable to publish to Firebase: %v" , err . Error ( ) )
2022-06-03 00:59:07 +00:00
}
2022-05-28 00:30:20 +00:00
}
}
func ( s * Server ) sendEmail ( v * visitor , m * message , email string ) {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Tag ( tagEmail ) . Field ( "email" , email ) . Debug ( "Sending email to %s" , email )
2022-06-02 03:24:44 +00:00
if err := s . smtpSender . Send ( v , m , email ) ; err != nil {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Tag ( tagEmail ) . Field ( "email" , email ) . Err ( err ) . Warn ( "Unable to send email to %s: %v" , email , err . Error ( ) )
2022-05-28 00:30:20 +00:00
}
}
func ( s * Server ) forwardPollRequest ( v * visitor , m * message ) {
topicURL := fmt . Sprintf ( "%s/%s" , s . config . BaseURL , m . Topic )
topicHash := fmt . Sprintf ( "%x" , sha256 . Sum256 ( [ ] byte ( topicURL ) ) )
forwardURL := fmt . Sprintf ( "%s/%s" , s . config . UpstreamBaseURL , topicHash )
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Debug ( "Publishing poll request to %s" , forwardURL )
2022-05-28 00:30:20 +00:00
req , err := http . NewRequest ( "POST" , forwardURL , strings . NewReader ( "" ) )
if err != nil {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Err ( err ) . Warn ( "Unable to publish poll request" )
2022-05-28 00:30:20 +00:00
return
}
req . Header . Set ( "X-Poll-ID" , m . ID )
2022-06-01 01:39:19 +00:00
var httpClient = & http . Client {
Timeout : time . Second * 10 ,
}
response , err := httpClient . Do ( req )
2022-05-28 00:30:20 +00:00
if err != nil {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Err ( err ) . Warn ( "Unable to publish poll request" )
2022-05-28 00:30:20 +00:00
return
} else if response . StatusCode != http . StatusOK {
2023-02-05 02:26:01 +00:00
logvm ( v , m ) . Err ( err ) . Warn ( "Unable to publish poll request, unexpected HTTP status: %d" , response . StatusCode )
2022-05-28 00:30:20 +00:00
return
}
}
2022-01-17 18:28:07 +00:00
func ( s * Server ) parsePublishParams ( r * http . Request , v * visitor , m * message ) ( cache bool , firebase bool , email string , unifiedpush bool , err error ) {
2022-01-16 04:17:46 +00:00
cache = readBoolParam ( r , true , "x-cache" , "cache" )
firebase = readBoolParam ( r , true , "x-firebase" , "firebase" )
2021-12-22 08:44:16 +00:00
m . Title = readParam ( r , "x-title" , "title" , "t" )
2022-01-04 22:40:41 +00:00
m . Click = readParam ( r , "x-click" , "click" )
2022-07-17 21:47:21 +00:00
icon := readParam ( r , "x-icon" , "icon" )
2022-01-08 20:47:08 +00:00
filename := readParam ( r , "x-filename" , "filename" , "file" , "f" )
2022-01-14 17:13:14 +00:00
attach := readParam ( r , "x-attach" , "attach" , "a" )
2022-01-08 20:47:08 +00:00
if attach != "" || filename != "" {
m . Attachment = & attachment { }
}
2022-01-14 17:13:14 +00:00
if filename != "" {
m . Attachment . Name = filename
}
2022-01-08 20:47:08 +00:00
if attach != "" {
2022-07-17 21:47:21 +00:00
if ! urlRegex . MatchString ( attach ) {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestAttachmentURLInvalid
2022-01-08 20:47:08 +00:00
}
m . Attachment . URL = attach
2022-01-14 17:13:14 +00:00
if m . Attachment . Name == "" {
u , err := url . Parse ( m . Attachment . URL )
if err == nil {
m . Attachment . Name = path . Base ( u . Path )
if m . Attachment . Name == "." || m . Attachment . Name == "/" {
m . Attachment . Name = ""
}
}
}
if m . Attachment . Name == "" {
m . Attachment . Name = "attachment"
}
2022-01-08 20:47:08 +00:00
}
2022-07-17 21:47:21 +00:00
if icon != "" {
if ! urlRegex . MatchString ( icon ) {
return false , false , "" , false , errHTTPBadRequestIconURLInvalid
}
m . Icon = icon
}
2022-01-08 20:47:08 +00:00
email = readParam ( r , "x-email" , "x-e-mail" , "email" , "e-mail" , "mail" , "e" )
if email != "" {
2023-01-27 16:33:51 +00:00
if ! v . EmailAllowed ( ) {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPTooManyRequestsLimitEmails
2022-01-08 20:47:08 +00:00
}
}
2022-06-02 03:24:44 +00:00
if s . smtpSender == nil && email != "" {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestEmailDisabled
2022-01-08 20:47:08 +00:00
}
2022-02-13 14:23:55 +00:00
messageStr := strings . ReplaceAll ( readParam ( r , "x-message" , "message" , "m" ) , "\\n" , "\n" )
2021-12-15 14:41:55 +00:00
if messageStr != "" {
m . Message = messageStr
}
2021-12-17 01:33:01 +00:00
m . Priority , err = util . ParsePriority ( readParam ( r , "x-priority" , "priority" , "prio" , "p" ) )
if err != nil {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestPriorityInvalid
2021-11-27 21:12:08 +00:00
}
2021-12-22 08:44:16 +00:00
tagsStr := readParam ( r , "x-tags" , "tags" , "tag" , "ta" )
2021-11-27 21:12:08 +00:00
if tagsStr != "" {
2021-12-10 16:31:42 +00:00
m . Tags = make ( [ ] string , 0 )
2021-12-21 20:22:27 +00:00
for _ , s := range util . SplitNoEmpty ( tagsStr , "," ) {
2021-12-10 16:31:42 +00:00
m . Tags = append ( m . Tags , strings . TrimSpace ( s ) )
2021-12-07 20:39:42 +00:00
}
2021-11-27 21:12:08 +00:00
}
2021-12-15 14:41:55 +00:00
delayStr := readParam ( r , "x-delay" , "delay" , "x-at" , "at" , "x-in" , "in" )
2021-12-11 05:06:25 +00:00
if delayStr != "" {
2021-12-10 16:31:42 +00:00
if ! cache {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayNoCache
2021-12-10 16:31:42 +00:00
}
2021-12-23 23:03:04 +00:00
if email != "" {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
2021-12-23 23:03:04 +00:00
}
2021-12-11 05:06:25 +00:00
delay , err := util . ParseFutureTime ( delayStr , time . Now ( ) )
2021-12-10 16:31:42 +00:00
if err != nil {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayCannotParse
2021-12-11 05:06:25 +00:00
} else if delay . Unix ( ) < time . Now ( ) . Add ( s . config . MinDelay ) . Unix ( ) {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayTooSmall
2021-12-11 05:06:25 +00:00
} else if delay . Unix ( ) > time . Now ( ) . Add ( s . config . MaxDelay ) . Unix ( ) {
2022-01-17 18:28:07 +00:00
return false , false , "" , false , errHTTPBadRequestDelayTooLarge
2021-12-10 16:31:42 +00:00
}
2021-12-11 05:06:25 +00:00
m . Time = delay . Unix ( )
2021-12-10 16:31:42 +00:00
}
2022-04-16 20:17:58 +00:00
actionsStr := readParam ( r , "x-actions" , "actions" , "action" )
if actionsStr != "" {
2022-04-19 13:14:32 +00:00
m . Actions , err = parseActions ( actionsStr )
if err != nil {
2022-04-27 13:51:23 +00:00
return false , false , "" , false , wrapErrHTTP ( errHTTPBadRequestActionsInvalid , err . Error ( ) )
2022-04-17 18:29:43 +00:00
}
2022-04-16 20:17:58 +00:00
}
2022-01-17 18:28:07 +00:00
unifiedpush = readBoolParam ( r , false , "x-unifiedpush" , "unifiedpush" , "up" ) // see GET too!
2021-12-25 21:07:55 +00:00
if unifiedpush {
firebase = false
2022-01-17 18:28:07 +00:00
unifiedpush = true
2021-12-25 21:07:55 +00:00
}
2022-05-29 02:06:46 +00:00
m . PollID = readParam ( r , "x-poll-id" , "poll-id" )
2022-05-27 11:55:57 +00:00
if m . PollID != "" {
unifiedpush = false
cache = false
email = ""
}
2022-01-17 18:28:07 +00:00
return cache , firebase , email , unifiedpush , nil
2021-11-27 21:12:08 +00:00
}
2022-01-08 20:47:08 +00:00
// handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
//
2022-09-27 16:37:02 +00:00
// 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
// If a message is flagged as poll request, the body does not matter and is discarded
// 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
// If body is binary, encode as base64, if not do not encode
// 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
// Body must be a message, because we attached an external URL
// 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
// Body must be attachment, because we passed a filename
// 5. curl -T file.txt ntfy.sh/mytopic
// If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
// 6. curl -T file.txt ntfy.sh/mytopic
// If file.txt is > message limit, treat it as an attachment
2022-04-03 16:39:52 +00:00
func ( s * Server ) handlePublishBody ( r * http . Request , v * visitor , m * message , body * util . PeekedReadCloser , unifiedpush bool ) error {
2022-05-28 00:30:20 +00:00
if m . Event == pollRequestEvent { // Case 1
2022-06-02 03:24:44 +00:00
return s . handleBodyDiscard ( body )
2022-05-27 11:55:57 +00:00
} else if unifiedpush {
2022-05-28 00:30:20 +00:00
return s . handleBodyAsMessageAutoDetect ( m , body ) // Case 2
2022-01-17 18:28:07 +00:00
} else if m . Attachment != nil && m . Attachment . URL != "" {
2022-05-28 00:30:20 +00:00
return s . handleBodyAsTextMessage ( m , body ) // Case 3
2022-01-08 20:47:08 +00:00
} else if m . Attachment != nil && m . Attachment . Name != "" {
2022-05-28 00:30:20 +00:00
return s . handleBodyAsAttachment ( r , v , m , body ) // Case 4
2022-04-03 16:39:52 +00:00
} else if ! body . LimitReached && utf8 . Valid ( body . PeekedBytes ) {
2022-05-28 00:30:20 +00:00
return s . handleBodyAsTextMessage ( m , body ) // Case 5
2022-01-08 20:47:08 +00:00
}
2022-05-28 00:30:20 +00:00
return s . handleBodyAsAttachment ( r , v , m , body ) // Case 6
2022-01-17 18:28:07 +00:00
}
2022-06-02 03:24:44 +00:00
// handleBodyDiscard reads the body to the end, throwing away its contents, and then closes it. The error from
// reading is returned; the error from closing is deliberately ignored.
func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
	_, copyErr := io.Copy(io.Discard, body)
	_ = body.Close() // best effort, only the read error is reported
	return copyErr
}
2022-04-03 16:39:52 +00:00
func ( s * Server ) handleBodyAsMessageAutoDetect ( m * message , body * util . PeekedReadCloser ) error {
if utf8 . Valid ( body . PeekedBytes ) {
m . Message = string ( body . PeekedBytes ) // Do not trim
2022-01-17 18:28:07 +00:00
} else {
2022-04-03 16:39:52 +00:00
m . Message = base64 . StdEncoding . EncodeToString ( body . PeekedBytes )
2022-01-17 18:28:07 +00:00
m . Encoding = encodingBase64
}
return nil
2022-01-08 20:47:08 +00:00
}
2022-04-03 16:39:52 +00:00
func ( s * Server ) handleBodyAsTextMessage ( m * message , body * util . PeekedReadCloser ) error {
if ! utf8 . Valid ( body . PeekedBytes ) {
2022-01-08 20:47:08 +00:00
return errHTTPBadRequestMessageNotUTF8
}
2022-04-03 16:39:52 +00:00
if len ( body . PeekedBytes ) > 0 { // Empty body should not override message (publish via GET!)
m . Message = strings . TrimSpace ( string ( body . PeekedBytes ) ) // Truncates the message to the peek limit if required
2022-01-08 20:47:08 +00:00
}
2022-01-10 18:38:51 +00:00
if m . Attachment != nil && m . Attachment . Name != "" && m . Message == "" {
m . Message = fmt . Sprintf ( defaultAttachmentMessage , m . Attachment . Name )
}
2022-01-08 20:47:08 +00:00
return nil
}
2022-04-03 16:39:52 +00:00
func ( s * Server ) handleBodyAsAttachment ( r * http . Request , v * visitor , m * message , body * util . PeekedReadCloser ) error {
2022-01-12 16:05:04 +00:00
if s . fileCache == nil || s . config . BaseURL == "" || s . config . AttachmentCacheDir == "" {
2022-01-08 20:47:08 +00:00
return errHTTPBadRequestAttachmentsDisallowed
2023-01-07 14:34:02 +00:00
}
2023-01-09 20:40:46 +00:00
vinfo , err := v . Info ( )
if err != nil {
return err
2023-01-07 14:34:02 +00:00
}
2023-01-09 20:40:46 +00:00
attachmentExpiry := time . Now ( ) . Add ( vinfo . Limits . AttachmentExpiryDuration ) . Unix ( )
2023-01-07 14:34:02 +00:00
if m . Time > attachmentExpiry {
2022-01-08 20:47:08 +00:00
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery
}
2022-01-11 17:58:11 +00:00
contentLengthStr := r . Header . Get ( "Content-Length" )
if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
contentLength , err := strconv . ParseInt ( contentLengthStr , 10 , 64 )
2023-01-09 20:40:46 +00:00
if err == nil && ( contentLength > vinfo . Stats . AttachmentTotalSizeRemaining || contentLength > vinfo . Limits . AttachmentFileSizeLimit ) {
2022-12-29 14:57:42 +00:00
return errHTTPEntityTooLargeAttachment
2022-01-11 17:58:11 +00:00
}
}
2022-01-08 20:47:08 +00:00
if m . Attachment == nil {
m . Attachment = & attachment { }
}
2022-01-10 18:38:51 +00:00
var ext string
2023-01-07 14:34:02 +00:00
m . Attachment . Expires = attachmentExpiry
2022-04-03 16:39:52 +00:00
m . Attachment . Type , ext = util . DetectContentType ( body . PeekedBytes , m . Attachment . Name )
2022-01-08 20:47:08 +00:00
m . Attachment . URL = fmt . Sprintf ( "%s/file/%s%s" , s . config . BaseURL , m . ID , ext )
if m . Attachment . Name == "" {
m . Attachment . Name = fmt . Sprintf ( "attachment%s" , ext )
}
if m . Message == "" {
2022-01-10 18:38:51 +00:00
m . Message = fmt . Sprintf ( defaultAttachmentMessage , m . Attachment . Name )
2022-01-03 23:55:08 +00:00
}
2022-12-21 18:19:07 +00:00
limiters := [ ] util . Limiter {
v . BandwidthLimiter ( ) ,
2023-01-09 20:40:46 +00:00
util . NewFixedLimiter ( vinfo . Limits . AttachmentFileSizeLimit ) ,
util . NewFixedLimiter ( vinfo . Stats . AttachmentTotalSizeRemaining ) ,
2022-12-21 18:19:07 +00:00
}
m . Attachment . Size , err = s . fileCache . Write ( m . ID , body , limiters ... )
2022-01-07 13:49:28 +00:00
if err == util . ErrLimitReached {
2022-12-29 14:57:42 +00:00
return errHTTPEntityTooLargeAttachment
2022-01-07 13:49:28 +00:00
} else if err != nil {
2022-01-02 22:56:12 +00:00
return err
}
return nil
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeJSON ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
var buf bytes . Buffer
if err := json . NewEncoder ( & buf ) . Encode ( & msg ) ; err != nil {
return "" , err
2021-10-23 01:26:01 +00:00
}
2021-10-27 18:56:17 +00:00
return buf . String ( ) , nil
2021-10-23 01:26:01 +00:00
}
2022-01-16 04:17:46 +00:00
return s . handleSubscribeHTTP ( w , r , v , "application/x-ndjson" , encoder )
2021-10-23 01:26:01 +00:00
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeSSE ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
2021-10-23 17:21:33 +00:00
var buf bytes . Buffer
if err := json . NewEncoder ( & buf ) . Encode ( & msg ) ; err != nil {
2021-10-27 18:56:17 +00:00
return "" , err
2021-10-23 17:21:33 +00:00
}
2021-10-29 12:29:27 +00:00
if msg . Event != messageEvent {
2021-10-27 18:56:17 +00:00
return fmt . Sprintf ( "event: %s\ndata: %s\n" , msg . Event , buf . String ( ) ) , nil // Browser's .onmessage() does not fire on this!
2021-10-23 17:21:33 +00:00
}
2021-10-27 18:56:17 +00:00
return fmt . Sprintf ( "data: %s\n" , buf . String ( ) ) , nil
2021-10-23 19:22:17 +00:00
}
2022-01-16 04:17:46 +00:00
return s . handleSubscribeHTTP ( w , r , v , "text/event-stream" , encoder )
2021-10-23 17:21:33 +00:00
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeRaw ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
2021-11-02 18:10:56 +00:00
if msg . Event == messageEvent { // only handle default events
2021-10-27 18:56:17 +00:00
return strings . ReplaceAll ( msg . Message , "\n" , " " ) + "\n" , nil
}
return "\n" , nil // "keepalive" and "open" events just send an empty line
}
2022-01-16 04:17:46 +00:00
return s . handleSubscribeHTTP ( w , r , v , "text/plain" , encoder )
2021-10-27 18:56:17 +00:00
}
2022-01-16 04:17:46 +00:00
func ( s * Server ) handleSubscribeHTTP ( w http . ResponseWriter , r * http . Request , v * visitor , contentType string , encoder messageEncoder ) error {
2023-02-09 20:24:12 +00:00
logvr ( v , r ) . Tag ( tagSubscribe ) . Debug ( "HTTP stream connection opened" )
defer logvr ( v , r ) . Tag ( tagSubscribe ) . Debug ( "HTTP stream connection closed" )
2023-01-27 16:33:51 +00:00
if ! v . SubscriptionAllowed ( ) {
2021-12-25 14:15:05 +00:00
return errHTTPTooManyRequestsLimitSubscriptions
2021-11-01 19:21:38 +00:00
}
defer v . RemoveSubscription ( )
2022-01-16 04:17:46 +00:00
topics , topicsStr , err := s . topicsFromPath ( r . URL . Path )
2021-11-01 20:39:40 +00:00
if err != nil {
return err
}
2022-01-16 04:17:46 +00:00
poll , since , scheduled , filters , err := parseSubscribeParams ( r )
2021-12-21 20:22:27 +00:00
if err != nil {
return err
}
2021-12-22 08:44:16 +00:00
var wlock sync . Mutex
2022-06-22 17:47:54 +00:00
defer func ( ) {
// Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
// It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
// this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
// data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
wlock . TryLock ( )
} ( )
2022-06-01 00:38:56 +00:00
sub := func ( v * visitor , msg * message ) error {
2022-01-16 04:17:46 +00:00
if ! filters . Pass ( msg ) {
2021-12-21 20:22:27 +00:00
return nil
}
2021-10-27 18:56:17 +00:00
m , err := encoder ( msg )
if err != nil {
return err
}
2021-12-21 20:22:27 +00:00
wlock . Lock ( )
defer wlock . Unlock ( )
2021-10-27 18:56:17 +00:00
if _ , err := w . Write ( [ ] byte ( m ) ) ; err != nil {
2021-10-24 01:29:45 +00:00
return err
}
if fl , ok := w . ( http . Flusher ) ; ok {
fl . Flush ( )
}
return nil
2021-10-27 18:56:17 +00:00
}
2023-01-23 19:05:41 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , s . config . AccessControlAllowOrigin ) // CORS, allow cross-origin requests
w . Header ( ) . Set ( "Content-Type" , contentType + "; charset=utf-8" ) // Android/Volley client needs charset!
2021-10-29 17:58:14 +00:00
if poll {
2022-06-01 00:38:56 +00:00
return s . sendOldMessages ( topics , since , scheduled , v , sub )
2021-10-29 17:58:14 +00:00
}
2023-01-23 19:05:41 +00:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
2021-11-15 12:56:58 +00:00
subscriberIDs := make ( [ ] int , 0 )
for _ , t := range topics {
2023-02-14 19:07:32 +00:00
subscriberIDs = append ( subscriberIDs , t . Subscribe ( sub , v , cancel ) )
2021-11-15 12:56:58 +00:00
}
defer func ( ) {
for i , subscriberID := range subscriberIDs {
topics [ i ] . Unsubscribe ( subscriberID ) // Order!
}
} ( )
2022-06-01 00:38:56 +00:00
if err := sub ( v , newOpenMessage ( topicsStr ) ) ; err != nil { // Send out open message
2021-10-29 17:58:14 +00:00
return err
}
2022-06-01 00:38:56 +00:00
if err := s . sendOldMessages ( topics , since , scheduled , v , sub ) ; err != nil {
2021-10-27 18:56:17 +00:00
return err
}
for {
select {
2023-01-23 19:05:41 +00:00
case <- ctx . Done ( ) :
return nil
2021-10-27 18:56:17 +00:00
case <- r . Context ( ) . Done ( ) :
return nil
case <- time . After ( s . config . KeepaliveInterval ) :
2023-02-09 20:24:12 +00:00
logvr ( v , r ) . Tag ( tagSubscribe ) . Trace ( "Sending keepalive message" )
2021-11-01 19:21:38 +00:00
v . Keepalive ( )
2022-06-01 00:38:56 +00:00
if err := sub ( v , newKeepaliveMessage ( topicsStr ) ) ; err != nil { // Send keepalive message
2021-10-27 18:56:17 +00:00
return err
}
}
2021-10-24 01:29:45 +00:00
}
}
2022-01-15 18:23:35 +00:00
func ( s * Server ) handleSubscribeWS ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-04-29 17:23:04 +00:00
if strings . ToLower ( r . Header . Get ( "Upgrade" ) ) != "websocket" {
2022-01-16 22:54:15 +00:00
return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
}
2023-01-27 16:33:51 +00:00
if ! v . SubscriptionAllowed ( ) {
2022-01-15 18:23:35 +00:00
return errHTTPTooManyRequestsLimitSubscriptions
}
defer v . RemoveSubscription ( )
2023-02-04 03:21:50 +00:00
logvr ( v , r ) . Tag ( tagWebsocket ) . Debug ( "WebSocket connection opened" )
defer logvr ( v , r ) . Tag ( tagWebsocket ) . Debug ( "WebSocket connection closed" )
2022-01-16 04:17:46 +00:00
topics , topicsStr , err := s . topicsFromPath ( r . URL . Path )
2022-01-15 18:23:35 +00:00
if err != nil {
return err
}
2022-01-16 04:17:46 +00:00
poll , since , scheduled , filters , err := parseSubscribeParams ( r )
2022-01-15 18:23:35 +00:00
if err != nil {
return err
}
upgrader := & websocket . Upgrader {
ReadBufferSize : wsBufferSize ,
WriteBufferSize : wsBufferSize ,
CheckOrigin : func ( r * http . Request ) bool {
return true // We're open for business!
} ,
}
conn , err := upgrader . Upgrade ( w , r , nil )
if err != nil {
return err
}
defer conn . Close ( )
2023-01-23 19:05:41 +00:00
// Subscription connections can be canceled externally, see topic.CancelSubscribers
2023-01-24 20:31:39 +00:00
cancelCtx , cancel := context . WithCancel ( context . Background ( ) )
2023-01-23 19:05:41 +00:00
defer cancel ( )
// Use errgroup to run WebSocket reader and writer in Go routines
2022-01-16 05:07:32 +00:00
var wlock sync . Mutex
2023-01-24 20:31:39 +00:00
g , gctx := errgroup . WithContext ( cancelCtx )
2022-01-15 18:23:35 +00:00
g . Go ( func ( ) error {
pongWait := s . config . KeepaliveInterval + wsPongWait
conn . SetReadLimit ( wsReadLimit )
if err := conn . SetReadDeadline ( time . Now ( ) . Add ( pongWait ) ) ; err != nil {
return err
}
conn . SetPongHandler ( func ( appData string ) error {
2023-02-04 03:21:50 +00:00
logvr ( v , r ) . Tag ( tagWebsocket ) . Trace ( "Received WebSocket pong" )
2022-01-15 18:23:35 +00:00
return conn . SetReadDeadline ( time . Now ( ) . Add ( pongWait ) )
} )
for {
_ , _ , err := conn . NextReader ( )
if err != nil {
return err
}
2023-01-23 19:05:41 +00:00
select {
case <- gctx . Done ( ) :
return nil
default :
}
2022-01-15 18:23:35 +00:00
}
} )
g . Go ( func ( ) error {
ping := func ( ) error {
2022-01-16 05:07:32 +00:00
wlock . Lock ( )
defer wlock . Unlock ( )
2022-01-15 18:23:35 +00:00
if err := conn . SetWriteDeadline ( time . Now ( ) . Add ( wsWriteWait ) ) ; err != nil {
return err
}
2023-02-04 03:21:50 +00:00
logvr ( v , r ) . Tag ( tagWebsocket ) . Trace ( "Sending WebSocket ping" )
2022-01-15 18:23:35 +00:00
return conn . WriteMessage ( websocket . PingMessage , nil )
}
for {
select {
2023-01-23 19:05:41 +00:00
case <- gctx . Done ( ) :
2022-01-15 18:23:35 +00:00
return nil
2023-01-24 20:31:39 +00:00
case <- cancelCtx . Done ( ) :
2023-02-04 03:21:50 +00:00
logvr ( v , r ) . Tag ( tagWebsocket ) . Trace ( "Cancel received, closing subscriber connection" )
2023-01-24 19:44:14 +00:00
conn . Close ( )
return & websocket . CloseError { Code : websocket . CloseNormalClosure , Text : "subscription was canceled" }
2022-01-15 18:23:35 +00:00
case <- time . After ( s . config . KeepaliveInterval ) :
v . Keepalive ( )
if err := ping ( ) ; err != nil {
return err
}
}
}
} )
2022-06-01 00:38:56 +00:00
sub := func ( v * visitor , msg * message ) error {
2022-01-16 04:17:46 +00:00
if ! filters . Pass ( msg ) {
2022-01-15 18:23:35 +00:00
return nil
}
2022-01-16 05:07:32 +00:00
wlock . Lock ( )
defer wlock . Unlock ( )
2022-01-15 18:23:35 +00:00
if err := conn . SetWriteDeadline ( time . Now ( ) . Add ( wsWriteWait ) ) ; err != nil {
return err
}
return conn . WriteJSON ( msg )
}
2023-01-23 19:05:41 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , s . config . AccessControlAllowOrigin ) // CORS, allow cross-origin requests
2022-01-15 18:23:35 +00:00
if poll {
2022-06-01 00:38:56 +00:00
return s . sendOldMessages ( topics , since , scheduled , v , sub )
2022-01-15 18:23:35 +00:00
}
subscriberIDs := make ( [ ] int , 0 )
for _ , t := range topics {
2023-02-14 19:07:32 +00:00
subscriberIDs = append ( subscriberIDs , t . Subscribe ( sub , v , cancel ) )
2022-01-15 18:23:35 +00:00
}
defer func ( ) {
for i , subscriberID := range subscriberIDs {
topics [ i ] . Unsubscribe ( subscriberID ) // Order!
}
} ( )
2022-06-01 00:38:56 +00:00
if err := sub ( v , newOpenMessage ( topicsStr ) ) ; err != nil { // Send out open message
2022-01-15 18:23:35 +00:00
return err
}
2022-06-01 00:38:56 +00:00
if err := s . sendOldMessages ( topics , since , scheduled , v , sub ) ; err != nil {
2022-01-15 18:23:35 +00:00
return err
}
2022-01-16 03:33:35 +00:00
err = g . Wait ( )
2022-06-02 03:24:44 +00:00
if err != nil && websocket . IsCloseError ( err , websocket . CloseNormalClosure , websocket . CloseGoingAway , websocket . CloseAbnormalClosure ) {
2023-02-06 21:01:32 +00:00
logvr ( v , r ) . Tag ( tagWebsocket ) . Err ( err ) . Fields ( websocketErrorContext ( err ) ) . Trace ( "WebSocket connection closed" )
2022-06-02 03:24:44 +00:00
return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
2022-01-16 03:33:35 +00:00
}
return err
2022-01-15 18:23:35 +00:00
}
2022-02-26 20:57:10 +00:00
func parseSubscribeParams ( r * http . Request ) ( poll bool , since sinceMarker , scheduled bool , filters * queryFilter , err error ) {
2022-01-16 04:17:46 +00:00
poll = readBoolParam ( r , false , "x-poll" , "poll" , "po" )
scheduled = readBoolParam ( r , false , "x-scheduled" , "scheduled" , "sched" )
since , err = parseSince ( r , poll )
if err != nil {
return
2021-12-21 20:22:27 +00:00
}
2022-01-16 04:17:46 +00:00
filters , err = parseQueryFilters ( r )
if err != nil {
return
2021-12-21 20:22:27 +00:00
}
2022-01-16 04:17:46 +00:00
return
2021-12-21 20:22:27 +00:00
}
2022-06-20 16:11:52 +00:00
// sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
// marker, returning only messages that are newer than the marker.
2022-06-01 00:38:56 +00:00
func ( s * Server ) sendOldMessages ( topics [ ] * topic , since sinceMarker , scheduled bool , v * visitor , sub subscriber ) error {
2021-11-08 14:46:31 +00:00
if since . IsNone ( ) {
2021-10-29 17:58:14 +00:00
return nil
}
2022-06-20 16:11:52 +00:00
messages := make ( [ ] * message , 0 )
2021-11-15 12:56:58 +00:00
for _ , t := range topics {
2022-06-20 16:11:52 +00:00
topicMessages , err := s . messageCache . Messages ( t . ID , since , scheduled )
2021-11-15 12:56:58 +00:00
if err != nil {
2021-10-29 17:58:14 +00:00
return err
}
2022-06-20 16:11:52 +00:00
messages = append ( messages , topicMessages ... )
}
sort . Slice ( messages , func ( i , j int ) bool {
return messages [ i ] . Time < messages [ j ] . Time
} )
for _ , m := range messages {
if err := sub ( v , m ) ; err != nil {
return err
2021-11-15 12:56:58 +00:00
}
2021-10-29 17:58:14 +00:00
}
2021-10-24 17:34:15 +00:00
return nil
}
2021-11-08 14:46:31 +00:00
// parseSince returns a timestamp identifying the time span from which cached messages should be received.
//
// Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
// "all" for all messages.
2022-02-26 20:57:10 +00:00
func parseSince ( r * http . Request , poll bool ) ( sinceMarker , error ) {
2021-12-22 08:44:16 +00:00
since := readParam ( r , "x-since" , "since" , "si" )
2022-02-26 20:57:10 +00:00
// Easy cases (empty, all, none)
2021-12-22 08:44:16 +00:00
if since == "" {
if poll {
2021-11-08 14:46:31 +00:00
return sinceAllMessages , nil
}
return sinceNoMessages , nil
2022-02-26 20:57:10 +00:00
} else if since == "all" {
2021-11-08 14:46:31 +00:00
return sinceAllMessages , nil
2022-02-26 20:57:10 +00:00
} else if since == "none" {
return sinceNoMessages , nil
}
// ID, timestamp, duration
if validMessageID ( since ) {
return newSinceID ( since ) , nil
2021-12-22 08:44:16 +00:00
} else if s , err := strconv . ParseInt ( since , 10 , 64 ) ; err == nil {
2022-02-26 20:57:10 +00:00
return newSinceTime ( s ) , nil
2021-12-22 08:44:16 +00:00
} else if d , err := time . ParseDuration ( since ) ; err == nil {
2022-02-26 20:57:10 +00:00
return newSinceTime ( time . Now ( ) . Add ( - 1 * d ) . Unix ( ) ) , nil
2021-10-29 17:58:14 +00:00
}
2021-12-25 14:15:05 +00:00
return sinceNoMessages , errHTTPBadRequestSinceInvalid
2021-10-29 17:58:14 +00:00
}
2022-05-13 18:42:25 +00:00
func ( s * Server ) handleOptions ( w http . ResponseWriter , _ * http . Request , _ * visitor ) error {
2022-12-25 16:41:38 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Methods" , "GET, PUT, POST, PATCH, DELETE" )
2023-01-18 20:50:06 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , s . config . AccessControlAllowOrigin ) // CORS, allow cross-origin requests
w . Header ( ) . Set ( "Access-Control-Allow-Headers" , "*" ) // CORS, allow auth via JS // FIXME is this terrible?
2021-10-24 18:22:53 +00:00
return nil
}
2021-12-15 14:41:55 +00:00
func ( s * Server ) topicFromPath ( path string ) ( * topic , error ) {
parts := strings . Split ( path , "/" )
if len ( parts ) < 2 {
2021-12-25 14:15:05 +00:00
return nil , errHTTPBadRequestTopicInvalid
2021-12-15 14:41:55 +00:00
}
2023-01-23 19:05:41 +00:00
return s . topicFromID ( parts [ 1 ] )
2021-11-15 12:56:58 +00:00
}
2022-01-16 04:17:46 +00:00
// topicsFromPath resolves the comma-separated topic list in the second "/"-separated element of
// path (e.g. "/topic1,topic2") to topics. It returns the topics along with the raw, unsplit list.
//
// Note that every failure, including any error returned by topicsFromIDs, is reported as
// errHTTPBadRequestTopicInvalid.
func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
	segments := strings.Split(path, "/")
	if len(segments) < 2 {
		return nil, "", errHTTPBadRequestTopicInvalid
	}
	topicsStr := segments[1]
	topics, err := s.topicsFromIDs(util.SplitNoEmpty(topicsStr, ",")...)
	if err != nil {
		return nil, "", errHTTPBadRequestTopicInvalid
	}
	return topics, topicsStr, nil
}
2021-11-27 21:12:08 +00:00
func ( s * Server ) topicsFromIDs ( ids ... string ) ( [ ] * topic , error ) {
2021-10-23 01:26:01 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2021-11-15 12:56:58 +00:00
topics := make ( [ ] * topic , 0 )
2021-11-27 21:12:08 +00:00
for _ , id := range ids {
2023-02-09 13:32:51 +00:00
if util . Contains ( s . config . DisallowedTopics , id ) {
2021-12-25 14:15:05 +00:00
return nil , errHTTPBadRequestTopicDisallowed
2021-12-09 03:13:59 +00:00
}
2021-11-15 12:56:58 +00:00
if _ , ok := s . topics [ id ] ; ! ok {
2022-01-02 22:56:12 +00:00
if len ( s . topics ) >= s . config . TotalTopicLimit {
2022-01-12 16:05:04 +00:00
return nil , errHTTPTooManyRequestsLimitTotalTopics
2021-11-15 12:56:58 +00:00
}
2021-12-09 03:57:31 +00:00
s . topics [ id ] = newTopic ( id )
2021-10-29 17:58:14 +00:00
}
2021-11-15 12:56:58 +00:00
topics = append ( topics , s . topics [ id ] )
2021-10-23 01:26:01 +00:00
}
2021-11-15 12:56:58 +00:00
return topics , nil
2021-10-23 01:26:01 +00:00
}
2023-01-23 19:05:41 +00:00
// topicFromID is the single-topic variant of topicsFromIDs: it returns the topic for the given
// ID, or the error reported by topicsFromIDs.
func (s *Server) topicFromID(id string) (*topic, error) {
	found, err := s.topicsFromIDs(id)
	if err == nil {
		return found[0], nil
	}
	return nil, err
}
2022-12-28 03:14:14 +00:00
func ( s * Server ) execManager ( ) {
2022-06-21 23:07:27 +00:00
// WARNING: Make sure to only selectively lock with the mutex, and be aware that this
// there is no mutex for the entire function.
2021-10-29 17:58:14 +00:00
// Expire visitors from rate visitors map
2022-06-02 03:24:44 +00:00
staleVisitors := 0
2023-02-09 20:24:12 +00:00
log .
Tag ( tagManager ) .
Timing ( func ( ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
for ip , v := range s . visitors {
if v . Stale ( ) {
log . Tag ( tagManager ) . With ( v ) . Trace ( "Deleting stale visitor" )
delete ( s . visitors , ip )
staleVisitors ++
}
}
} ) .
Field ( "stale_visitors" , staleVisitors ) .
Debug ( "Deleted %d stale visitor(s)" , staleVisitors )
2021-10-24 01:29:45 +00:00
2023-01-23 03:21:30 +00:00
// Delete expired user tokens and users
2022-12-28 03:14:14 +00:00
if s . userManager != nil {
2023-02-09 20:24:12 +00:00
log .
Tag ( tagManager ) .
Timing ( func ( ) {
if err := s . userManager . RemoveExpiredTokens ( ) ; err != nil {
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Error expiring user tokens" )
}
if err := s . userManager . RemoveDeletedUsers ( ) ; err != nil {
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Error deleting soft-deleted users" )
}
} ) .
Debug ( "Removed expired tokens and users" )
2022-12-25 16:41:38 +00:00
}
2022-01-07 14:15:33 +00:00
// Delete expired attachments
2023-01-07 14:34:02 +00:00
if s . fileCache != nil {
2023-02-09 20:24:12 +00:00
log .
Tag ( tagManager ) .
Timing ( func ( ) {
ids , err := s . messageCache . AttachmentsExpired ( )
if err != nil {
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Error retrieving expired attachments" )
} else if len ( ids ) > 0 {
if log . Tag ( tagManager ) . IsDebug ( ) {
log . Tag ( tagManager ) . Debug ( "Deleting attachments %s" , strings . Join ( ids , ", " ) )
}
if err := s . fileCache . Remove ( ids ... ) ; err != nil {
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Error deleting attachments" )
}
if err := s . messageCache . MarkAttachmentsDeleted ( ids ... ) ; err != nil {
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Error marking attachments deleted" )
}
} else {
log . Tag ( tagManager ) . Debug ( "No expired attachments to delete" )
}
} ) .
Debug ( "Deleted expired attachments" )
2022-01-07 14:15:33 +00:00
}
2023-01-23 03:21:30 +00:00
// Prune messages
2023-02-09 20:24:12 +00:00
log .
Tag ( tagManager ) .
Timing ( func ( ) {
expiredMessageIDs , err := s . messageCache . MessagesExpired ( )
if err != nil {
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Error retrieving expired messages" )
} else if len ( expiredMessageIDs ) > 0 {
if err := s . fileCache . Remove ( expiredMessageIDs ... ) ; err != nil {
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Error deleting attachments for expired messages" )
}
if err := s . messageCache . DeleteMessages ( expiredMessageIDs ... ) ; err != nil {
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Error marking attachments deleted" )
}
} else {
log . Tag ( tagManager ) . Debug ( "No expired messages to delete" )
}
} ) .
Debug ( "Pruned messages" )
2021-11-02 18:08:21 +00:00
2022-06-21 23:07:27 +00:00
// Message count per topic
2023-02-07 21:20:49 +00:00
var messagesCached int
2022-06-21 23:07:27 +00:00
messageCounts , err := s . messageCache . MessageCounts ( )
if err != nil {
2023-02-07 21:20:49 +00:00
log . Tag ( tagManager ) . Err ( err ) . Warn ( "Cannot get message counts" )
2022-06-21 23:07:27 +00:00
messageCounts = make ( map [ string ] int ) // Empty, so we can continue
}
for _ , count := range messageCounts {
2023-02-07 21:20:49 +00:00
messagesCached += count
2022-06-21 23:07:27 +00:00
}
2022-06-21 23:45:23 +00:00
// Remove subscriptions without subscribers
2023-02-09 20:24:12 +00:00
var emptyTopics , subscribers int
log .
Tag ( tagManager ) .
Timing ( func ( ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
for _ , t := range s . topics {
subs := t . SubscribersCount ( )
log . Tag ( tagManager ) . Trace ( "- topic %s: %d subscribers" , t . ID , subs )
msgs , exists := messageCounts [ t . ID ]
if subs == 0 && ( ! exists || msgs == 0 ) {
log . Tag ( tagManager ) . Trace ( "Deleting empty topic %s" , t . ID )
emptyTopics ++
delete ( s . topics , t . ID )
continue
}
subscribers += subs
}
} ) .
Debug ( "Removed %d empty topic(s)" , emptyTopics )
2021-11-03 01:09:49 +00:00
2021-12-27 21:18:15 +00:00
// Mail stats
2022-06-02 03:24:44 +00:00
var receivedMailTotal , receivedMailSuccess , receivedMailFailure int64
if s . smtpServerBackend != nil {
receivedMailTotal , receivedMailSuccess , receivedMailFailure = s . smtpServerBackend . Counts ( )
}
var sentMailTotal , sentMailSuccess , sentMailFailure int64
if s . smtpSender != nil {
sentMailTotal , sentMailSuccess , sentMailFailure = s . smtpSender . Counts ( )
2021-12-27 21:18:15 +00:00
}
2021-12-27 21:06:40 +00:00
2021-11-03 01:09:49 +00:00
// Print stats
2022-06-22 19:11:50 +00:00
s . mu . Lock ( )
messagesCount , topicsCount , visitorsCount := s . messages , len ( s . topics ) , len ( s . visitors )
s . mu . Unlock ( )
2023-02-07 21:20:49 +00:00
log .
Tag ( tagManager ) .
Fields ( log . Context {
"messages_published" : messagesCount ,
"messages_cached" : messagesCached ,
"topics_active" : topicsCount ,
"subscribers" : subscribers ,
"visitors" : visitorsCount ,
"emails_received" : receivedMailTotal ,
"emails_received_success" : receivedMailSuccess ,
"emails_received_failure" : receivedMailFailure ,
"emails_sent" : sentMailTotal ,
"emails_sent_success" : sentMailSuccess ,
"emails_sent_failure" : sentMailFailure ,
} ) .
Info ( "Server stats" )
2021-10-24 01:29:45 +00:00
}
2021-10-24 02:49:50 +00:00
2021-12-27 21:06:40 +00:00
func ( s * Server ) runSMTPServer ( ) error {
2022-06-02 03:24:44 +00:00
s . smtpServerBackend = newMailBackend ( s . config , s . handle )
s . smtpServer = smtp . NewServer ( s . smtpServerBackend )
2021-12-27 21:06:40 +00:00
s . smtpServer . Addr = s . config . SMTPServerListen
s . smtpServer . Domain = s . config . SMTPServerDomain
s . smtpServer . ReadTimeout = 10 * time . Second
s . smtpServer . WriteTimeout = 10 * time . Second
2021-12-28 00:26:20 +00:00
s . smtpServer . MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
2021-12-27 21:06:40 +00:00
s . smtpServer . MaxRecipients = 1
s . smtpServer . AllowInsecureAuth = true
return s . smtpServer . ListenAndServe ( )
2021-12-27 14:48:09 +00:00
}
2021-12-15 14:13:16 +00:00
func ( s * Server ) runManager ( ) {
2021-12-22 13:17:50 +00:00
for {
select {
case <- time . After ( s . config . ManagerInterval ) :
2023-02-09 20:24:12 +00:00
log .
Tag ( tagManager ) .
Timing ( s . execManager ) .
Debug ( "Manager finished" )
2021-12-22 13:17:50 +00:00
case <- s . closeChan :
return
2021-12-15 14:13:16 +00:00
}
2021-12-22 13:17:50 +00:00
}
2021-12-15 14:13:16 +00:00
}
2023-01-18 18:46:40 +00:00
// runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
// email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
2023-01-11 03:51:51 +00:00
func ( s * Server ) runStatsResetter ( ) {
for {
runAt := util . NextOccurrenceUTC ( s . config . VisitorStatsResetTime , time . Now ( ) )
timer := time . NewTimer ( time . Until ( runAt ) )
2023-02-04 03:21:50 +00:00
log . Tag ( tagResetter ) . Debug ( "Waiting until %v to reset visitor stats" , runAt )
2023-01-11 03:51:51 +00:00
select {
case <- timer . C :
2023-02-04 03:21:50 +00:00
log . Tag ( tagResetter ) . Debug ( "Running stats resetter" )
2023-01-11 03:51:51 +00:00
s . resetStats ( )
case <- s . closeChan :
2023-02-04 03:21:50 +00:00
log . Tag ( tagResetter ) . Debug ( "Stopping stats resetter" )
2023-01-11 03:51:51 +00:00
timer . Stop ( )
return
}
}
}
func ( s * Server ) resetStats ( ) {
log . Info ( "Resetting all visitor stats (daily task)" )
s . mu . Lock ( )
defer s . mu . Unlock ( ) // Includes the database query to avoid races with other processes
for _ , v := range s . visitors {
v . ResetStats ( )
}
if s . userManager != nil {
if err := s . userManager . ResetStats ( ) ; err != nil {
2023-02-04 03:21:50 +00:00
log . Tag ( tagResetter ) . Warn ( "Failed to write to database: %s" , err . Error ( ) )
2023-01-11 03:51:51 +00:00
}
}
}
2022-01-21 19:17:59 +00:00
func ( s * Server ) runFirebaseKeepaliver ( ) {
2022-06-01 03:16:44 +00:00
if s . firebaseClient == nil {
2021-12-15 14:13:16 +00:00
return
}
2023-01-03 01:08:37 +00:00
v := newVisitor ( s . config , s . messageCache , s . userManager , netip . IPv4Unspecified ( ) , nil ) // Background process, not a real visitor, uses IP 0.0.0.0
2021-12-15 14:13:16 +00:00
for {
2021-12-22 13:17:50 +00:00
select {
case <- time . After ( s . config . FirebaseKeepaliveInterval ) :
2022-06-01 03:16:44 +00:00
s . sendToFirebase ( v , newKeepaliveMessage ( firebaseControlTopic ) )
2022-05-26 01:39:46 +00:00
case <- time . After ( s . config . FirebasePollInterval ) :
2022-06-01 03:16:44 +00:00
s . sendToFirebase ( v , newKeepaliveMessage ( firebasePollTopic ) )
2021-12-22 13:17:50 +00:00
case <- s . closeChan :
return
2021-12-15 14:13:16 +00:00
}
}
}
2021-12-22 13:17:50 +00:00
2022-06-02 03:24:44 +00:00
func ( s * Server ) runDelayedSender ( ) {
for {
select {
case <- time . After ( s . config . DelayedSenderInterval ) :
if err := s . sendDelayedMessages ( ) ; err != nil {
2023-02-04 03:21:50 +00:00
log . Tag ( tagPublish ) . Err ( err ) . Warn ( "Error sending delayed messages" )
2022-06-02 03:24:44 +00:00
}
case <- s . closeChan :
return
}
}
}
2021-12-10 16:31:42 +00:00
func ( s * Server ) sendDelayedMessages ( ) error {
2022-02-27 19:47:28 +00:00
messages , err := s . messageCache . MessagesDue ( )
2021-12-10 16:31:42 +00:00
if err != nil {
return err
}
for _ , m := range messages {
2023-01-27 03:57:18 +00:00
var u * user . User
2022-12-29 16:09:45 +00:00
if s . userManager != nil && m . User != "" {
2023-01-27 03:57:18 +00:00
u , err = s . userManager . User ( m . User )
2022-12-22 02:55:39 +00:00
if err != nil {
2023-02-06 04:34:27 +00:00
log . With ( m ) . Err ( err ) . Warn ( "Error sending delayed message" )
2022-12-22 02:55:39 +00:00
continue
}
}
2023-01-27 03:57:18 +00:00
v := s . visitor ( m . Sender , u )
2022-06-01 00:38:56 +00:00
if err := s . sendDelayedMessage ( v , m ) ; err != nil {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Err ( err ) . Warn ( "Error sending delayed message" )
2022-01-10 20:36:12 +00:00
}
2022-06-01 00:38:56 +00:00
}
return nil
}
func ( s * Server ) sendDelayedMessage ( v * visitor , m * message ) error {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Debug ( "Sending delayed message" )
2022-06-22 00:07:08 +00:00
s . mu . Lock ( )
2022-06-01 00:38:56 +00:00
t , ok := s . topics [ m . Topic ] // If no subscribers, just mark message as published
2022-06-22 00:07:08 +00:00
s . mu . Unlock ( )
2022-06-01 00:38:56 +00:00
if ok {
2022-06-01 01:39:19 +00:00
go func ( ) {
// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
if err := t . Publish ( v , m ) ; err != nil {
2023-02-04 03:21:50 +00:00
logvm ( v , m ) . Err ( err ) . Warn ( "Unable to publish message" )
2021-12-10 16:31:42 +00:00
}
2022-06-01 01:39:19 +00:00
} ( )
2022-06-01 00:38:56 +00:00
}
2022-06-01 03:16:44 +00:00
if s . firebaseClient != nil { // Firebase subscribers may not show up in topics map
2022-06-01 01:39:19 +00:00
go s . sendToFirebase ( v , m )
}
if s . config . UpstreamBaseURL != "" {
go s . forwardPollRequest ( v , m )
2021-12-10 16:31:42 +00:00
}
2022-06-01 00:38:56 +00:00
if err := s . messageCache . MarkPublished ( m ) ; err != nil {
return err
2021-12-10 16:31:42 +00:00
}
return nil
}
2022-04-03 16:39:52 +00:00
// transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
2022-03-16 18:16:54 +00:00
// before passing it on to the next handler. This is meant to be used in combination with handlePublish.
func ( s * Server ) transformBodyJSON ( next handleFunc ) handleFunc {
2022-03-15 20:00:59 +00:00
return func ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2023-01-28 04:10:59 +00:00
m , err := readJSONWithLimit [ publishMessage ] ( r . Body , s . config . MessageLimit * 2 , false ) // 2x to account for JSON format overhead
2022-03-15 20:00:59 +00:00
if err != nil {
return err
}
if ! topicRegex . MatchString ( m . Topic ) {
2022-03-16 18:16:54 +00:00
return errHTTPBadRequestTopicInvalid
2022-03-15 20:00:59 +00:00
}
if m . Message == "" {
m . Message = emptyMessageBody
}
r . URL . Path = "/" + m . Topic
r . Body = io . NopCloser ( strings . NewReader ( m . Message ) )
if m . Title != "" {
r . Header . Set ( "X-Title" , m . Title )
}
2022-03-16 18:16:54 +00:00
if m . Priority != 0 {
r . Header . Set ( "X-Priority" , fmt . Sprintf ( "%d" , m . Priority ) )
2022-03-15 20:00:59 +00:00
}
2022-03-16 18:16:54 +00:00
if m . Tags != nil && len ( m . Tags ) > 0 {
r . Header . Set ( "X-Tags" , strings . Join ( m . Tags , "," ) )
2022-03-15 20:00:59 +00:00
}
if m . Attach != "" {
r . Header . Set ( "X-Attach" , m . Attach )
}
if m . Filename != "" {
r . Header . Set ( "X-Filename" , m . Filename )
}
if m . Click != "" {
r . Header . Set ( "X-Click" , m . Click )
}
2022-07-16 19:31:03 +00:00
if m . Icon != "" {
r . Header . Set ( "X-Icon" , m . Icon )
}
2022-04-16 20:17:58 +00:00
if len ( m . Actions ) > 0 {
actionsStr , err := json . Marshal ( m . Actions )
if err != nil {
2022-12-29 14:57:42 +00:00
return errHTTPBadRequestMessageJSONInvalid
2022-04-16 20:17:58 +00:00
}
r . Header . Set ( "X-Actions" , string ( actionsStr ) )
}
2022-03-29 19:40:26 +00:00
if m . Email != "" {
r . Header . Set ( "X-Email" , m . Email )
}
if m . Delay != "" {
r . Header . Set ( "X-Delay" , m . Delay )
}
2022-03-15 20:00:59 +00:00
return next ( w , r , v )
}
}
2022-06-14 02:07:30 +00:00
func ( s * Server ) transformMatrixJSON ( next handleFunc ) handleFunc {
return func ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-06-16 00:36:49 +00:00
newRequest , err := newRequestFromMatrixJSON ( r , s . config . BaseURL , s . config . MessageLimit )
2022-06-14 02:07:30 +00:00
if err != nil {
2023-02-05 02:26:01 +00:00
logvr ( v , r ) . Tag ( tagMatrix ) . Err ( err ) . Trace ( "Invalid Matrix request" )
2022-06-14 02:07:30 +00:00
return err
}
2022-06-16 00:36:49 +00:00
if err := next ( w , newRequest , v ) ; err != nil {
return & errMatrix { pushKey : newRequest . Header . Get ( matrixPushKeyHeader ) , err : err }
2022-06-14 02:07:30 +00:00
}
return nil
}
}
2022-12-03 20:20:59 +00:00
func ( s * Server ) authorizeTopicWrite ( next handleFunc ) handleFunc {
2022-12-25 16:41:38 +00:00
return s . autorizeTopic ( next , user . PermissionWrite )
2022-01-22 03:22:27 +00:00
}
2022-12-03 20:20:59 +00:00
func ( s * Server ) authorizeTopicRead ( next handleFunc ) handleFunc {
2022-12-25 16:41:38 +00:00
return s . autorizeTopic ( next , user . PermissionRead )
2022-01-22 03:22:27 +00:00
}
2022-12-25 16:41:38 +00:00
func ( s * Server ) autorizeTopic ( next handleFunc , perm user . Permission ) handleFunc {
2022-01-22 03:22:27 +00:00
return func ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2022-12-25 16:41:38 +00:00
if s . userManager == nil {
2022-01-22 03:22:27 +00:00
return next ( w , r , v )
}
2022-01-27 17:49:05 +00:00
topics , _ , err := s . topicsFromPath ( r . URL . Path )
2022-01-22 03:22:27 +00:00
if err != nil {
return err
}
2023-01-28 14:03:14 +00:00
u := v . User ( )
2022-01-27 17:49:05 +00:00
for _ , t := range topics {
2023-01-28 14:03:14 +00:00
if err := s . userManager . Authorize ( u , t . ID , perm ) ; err != nil {
2023-02-05 02:26:01 +00:00
logvr ( v , r ) . Err ( err ) . Field ( "message_topic" , t . ID ) . Debug ( "Access to topic %s not authorized" , t . ID )
2022-01-27 17:49:05 +00:00
return errHTTPForbidden
}
2022-01-22 03:22:27 +00:00
}
return next ( w , r , v )
2021-11-05 17:46:27 +00:00
}
}
2023-02-08 20:20:44 +00:00
// maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
// if it is set.
//
// - If the header is not set, an IP-based visitor is returned
// - If the header is set, authenticate will be called to check the username/password (Basic auth),
// or the token (Bearer auth), and read the user from the database
//
// This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
// that subsequent logging calls still have a visitor context.
func ( s * Server ) maybeAuthenticate ( r * http . Request ) ( * visitor , error ) {
// Read "Authorization" header value, and exit out early if it's not set
2022-12-22 02:55:39 +00:00
ip := extractIPAddress ( r , s . config . BehindProxy )
2023-02-08 20:20:44 +00:00
vip := s . visitor ( ip , nil )
header , err := readAuthHeader ( r )
if err != nil {
return vip , err
} else if header == "" {
return vip , nil
} else if s . userManager == nil {
return vip , errHTTPUnauthorized
2022-12-03 20:20:59 +00:00
}
2023-02-08 20:20:44 +00:00
// If we're trying to auth, check the rate limiter first
if ! vip . AuthAllowed ( ) {
return vip , errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
}
u , err := s . authenticate ( r , header )
if err != nil {
vip . AuthFailed ( )
logr ( r ) . Err ( err ) . Debug ( "Authentication failed" )
return vip , errHTTPUnauthorized // Always return visitor, even when error occurs!
}
// Authentication with user was successful
return s . visitor ( ip , u ) , nil
2022-12-02 20:37:48 +00:00
}
2022-12-21 02:18:33 +00:00
// authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
// The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
// support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
2023-02-08 20:20:44 +00:00
// query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
func ( s * Server ) authenticate ( r * http . Request , header string ) ( user * user . User , err error ) {
if strings . HasPrefix ( header , "Bearer" ) {
return s . authenticateBearerAuth ( r , strings . TrimSpace ( strings . TrimPrefix ( header , "Bearer" ) ) )
}
return s . authenticateBasicAuth ( r , header )
}
// readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
// or from the ?auth... query parameter
func readAuthHeader ( r * http . Request ) ( string , error ) {
2022-12-28 03:14:14 +00:00
value := strings . TrimSpace ( r . Header . Get ( "Authorization" ) )
2022-12-03 20:20:59 +00:00
queryParam := readQueryParam ( r , "authorization" , "auth" )
if queryParam != "" {
a , err := base64 . RawURLEncoding . DecodeString ( queryParam )
if err != nil {
2023-02-08 20:20:44 +00:00
return "" , err
2022-12-03 20:20:59 +00:00
}
2022-12-28 03:14:14 +00:00
value = strings . TrimSpace ( string ( a ) )
2022-12-03 20:20:59 +00:00
}
2023-02-08 20:20:44 +00:00
return value , nil
2022-12-03 20:20:59 +00:00
}
2022-12-25 16:41:38 +00:00
func ( s * Server ) authenticateBasicAuth ( r * http . Request , value string ) ( user * user . User , err error ) {
2022-12-03 20:20:59 +00:00
r . Header . Set ( "Authorization" , value )
username , password , ok := r . BasicAuth ( )
if ! ok {
return nil , errors . New ( "invalid basic auth" )
2023-01-29 21:15:08 +00:00
} else if username == "" {
return s . authenticateBearerAuth ( r , password ) // Treat password as token
2022-12-03 20:20:59 +00:00
}
2022-12-25 16:41:38 +00:00
return s . userManager . Authenticate ( username , password )
2022-12-03 20:20:59 +00:00
}
2023-01-29 21:15:08 +00:00
func ( s * Server ) authenticateBearerAuth ( r * http . Request , token string ) ( * user . User , error ) {
2023-01-29 01:29:06 +00:00
u , err := s . userManager . AuthenticateToken ( token )
if err != nil {
return nil , err
}
ip := extractIPAddress ( r , s . config . BehindProxy )
go s . userManager . EnqueueTokenUpdate ( token , & user . TokenUpdate {
LastAccess : time . Now ( ) ,
LastOrigin : ip ,
} )
return u , nil
2022-12-03 20:20:59 +00:00
}
2023-01-27 03:57:18 +00:00
func ( s * Server ) visitor ( ip netip . Addr , user * user . User ) * visitor {
2022-12-02 20:37:48 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2023-01-27 03:57:18 +00:00
id := visitorID ( ip , user )
v , exists := s . visitors [ id ]
2022-12-02 20:37:48 +00:00
if ! exists {
2023-01-27 03:57:18 +00:00
s . visitors [ id ] = newVisitor ( s . config , s . messageCache , s . userManager , ip , user )
return s . visitors [ id ]
2022-12-02 20:37:48 +00:00
}
v . Keepalive ( )
2023-02-08 20:20:44 +00:00
v . SetUser ( user ) // Always update with the latest user, may be nil!
2022-12-02 20:37:48 +00:00
return v
}
2023-01-18 20:50:06 +00:00
// writeJSON sets the JSON content type and the CORS origin header (from the server config), and
// then encodes v as JSON into the response. It returns the encoder's error, if any.
func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
	header := w.Header()
	header.Set("Content-Type", "application/json")
	header.Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
	return json.NewEncoder(w).Encode(v)
}