Merge branch 'enable-subscriber-rate-limiting' into matrix-507-reject

This commit is contained in:
binwiederhier 2023-03-03 20:34:33 -05:00
commit 3eeeac2c13
28 changed files with 637 additions and 80 deletions

View file

@ -124,6 +124,7 @@ type Config struct {
VisitorAuthFailureLimitBurst int
VisitorAuthFailureLimitReplenish time.Duration
VisitorStatsResetTime time.Time // Time of the day at which to reset visitor stats
VisitorSubscriberRateLimiting bool // Enable subscriber-based rate limiting for UnifiedPush topics
BehindProxy bool
StripeSecretKey string
StripeWebhookKey string
@ -198,10 +199,12 @@ func NewConfig() *Config {
VisitorAuthFailureLimitBurst: DefaultVisitorAuthFailureLimitBurst,
VisitorAuthFailureLimitReplenish: DefaultVisitorAuthFailureLimitReplenish,
VisitorStatsResetTime: DefaultVisitorStatsResetTime,
VisitorSubscriberRateLimiting: false,
BehindProxy: false,
StripeSecretKey: "",
StripeWebhookKey: "",
StripePriceCacheDuration: DefaultStripePriceCacheDuration,
BillingContact: "",
EnableWeb: true,
EnableSignup: false,
EnableLogin: false,

View file

@ -31,7 +31,7 @@ const (
)
var (
normalErrorCodes = []int{http.StatusNotFound, http.StatusBadRequest, http.StatusTooManyRequests, http.StatusUnauthorized, http.StatusInsufficientStorage}
normalErrorCodes = []int{http.StatusNotFound, http.StatusBadRequest, http.StatusTooManyRequests, http.StatusUnauthorized, http.StatusForbidden, http.StatusInsufficientStorage}
rateLimitingErrorCodes = []int{http.StatusTooManyRequests, http.StatusRequestEntityTooLarge}
)

View file

@ -597,7 +597,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
if e != nil {
return nil, e.With(t)
}
if unifiedpush && t.RateVisitor() == nil {
if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
// UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
// the subscription as invalid if any 400-499 code (except 429/408) is returned.
@ -1197,14 +1197,19 @@ func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, schedu
// maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
// to that topic will be rate limited against the rate visitor instead of the publishing visitor.
//
// Setting the rate visitor is ony allowed if
// Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
// - auth-file is not set (everything is open by default)
// - the topic is reserved, and v.user is the owner
// - the topic is not reserved, and v.user has write access
// - or the topic is reserved, and v.user is the owner
// - or the topic is not reserved, and v.user has write access
//
// Note: This TEMPORARILY also registers all topics starting with "up" (= UnifiedPush). This is to ease the transition
// until the Android app will send the "Rate-Topics" header.
func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic, rateTopics []string) error {
// Bail out if not enabled
if !s.config.VisitorSubscriberRateLimiting {
return nil
}
// Make a list of topics that we'll actually set the RateVisitor on
eligibleRateTopics := make([]*topic, 0)
for _, t := range topics {

View file

@ -117,18 +117,19 @@
# attachment-expiry-duration: "3h"
# If enabled, allow outgoing e-mail notifications via the 'X-Email' header. If this header is set,
# messages will additionally be sent out as e-mail using an external SMTP server. As of today, only
# SMTP servers with plain text auth and STARTLS are supported. Please also refer to the rate limiting settings
# below (visitor-email-limit-burst & visitor-email-limit-burst).
# messages will additionally be sent out as e-mail using an external SMTP server.
#
# As of today, only SMTP servers with plain text auth (or no auth at all), and STARTLS are supported.
# Please also refer to the rate limiting settings below (visitor-email-limit-burst & visitor-email-limit-burst).
#
# - smtp-sender-addr is the hostname:port of the SMTP server
# - smtp-sender-user/smtp-sender-pass are the username and password of the SMTP user
# - smtp-sender-from is the e-mail address of the sender
# - smtp-sender-user/smtp-sender-pass are the username and password of the SMTP user (leave blank for no auth)
#
# smtp-sender-addr:
# smtp-sender-from:
# smtp-sender-user:
# smtp-sender-pass:
# smtp-sender-from:
# If enabled, ntfy will launch a lightweight SMTP server for incoming messages. Once configured, users can send
# emails to a topic e-mail address to publish messages to a topic.
@ -234,6 +235,21 @@
# visitor-attachment-total-size-limit: "100M"
# visitor-attachment-daily-bandwidth-limit: "500M"
# Rate limiting: Enable subscriber-based rate limiting (mostly used for UnifiedPush)
#
# If enabled, subscribers may opt to have published messages counted against their own rate limits, as opposed
# to the publisher's rate limits. This is especially useful to increase the amount of messages that high-volume
# publishers (e.g. Matrix/Mastodon servers) are allowed to send.
#
# Once enabled, a client may send a "Rate-Topics: <topic1>,<topic2>,..." header when subscribing to topics via
# HTTP stream, or websockets, thereby registering itself as the "rate visitor", i.e. the visitor whose rate limits
# to use when publishing on this topic. Note: Setting the rate visitor requires READ-WRITE permission on the topic.
#
# UnifiedPush only: If this setting is enabled, publishing to UnifiedPush topics will lead to a HTTP 507 response if
# no "rate visitor" has been previously registered. This is to avoid burning the publisher's "visitor-message-daily-limit".
#
# visitor-subscriber-rate-limiting: false
# Payments integration via Stripe
#
# - stripe-secret-key is the key used for the Stripe API communication. Setting this values

View file

@ -657,6 +657,17 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) {
m2 := toMessage(t, rr.Body.String())
require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, m2.ID))
// Pre-verify message count and file
ms, err := s.messageCache.Messages("mytopic1", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 1, len(ms))
require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, m1.ID))
ms, err = s.messageCache.Messages("mytopic2", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 1, len(ms))
require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, m2.ID))
// Delete reservation
rr = request(t, s, "DELETE", "/v1/account/reservation/mytopic1", ``, map[string]string{
"X-Delete-Messages": "true",
@ -672,9 +683,13 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) {
// Verify that messages and attachments were deleted
// This does not explicitly call the manager!
time.Sleep(time.Second)
waitFor(t, func() bool {
ms, err := s.messageCache.Messages("mytopic1", sinceAllMessages, false)
require.Nil(t, err)
return len(ms) == 0 && !util.FileExists(filepath.Join(s.config.AttachmentCacheDir, m1.ID))
})
ms, err := s.messageCache.Messages("mytopic1", sinceAllMessages, false)
ms, err = s.messageCache.Messages("mytopic1", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 0, len(ms))
require.NoFileExists(t, filepath.Join(s.config.AttachmentCacheDir, m1.ID))
@ -712,13 +727,12 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) {
})
require.Equal(t, 200, rr.Code)
// Wait for stats queue writer
time.Sleep(600 * time.Millisecond)
// Verify that message stats were persisted
u, err := s.userManager.User("phil")
require.Nil(t, err)
require.Equal(t, int64(1), u.Stats.Messages)
// Wait for stats queue writer, verify that message stats were persisted
waitFor(t, func() bool {
u, err := s.userManager.User("phil")
require.Nil(t, err)
return int64(1) == u.Stats.Messages
})
// Change tier, make a request (to reset limiters)
require.Nil(t, s.userManager.ChangeTier("phil", "pro"))
@ -736,10 +750,11 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) {
require.Equal(t, 200, rr.Code)
// Verify that message stats were persisted
time.Sleep(600 * time.Millisecond)
u, err = s.userManager.User("phil")
require.Nil(t, err)
require.Equal(t, int64(2), u.Stats.Messages) // v.EnqueueUserStats had run!
waitFor(t, func() bool {
u, err := s.userManager.User("phil")
require.Nil(t, err)
return int64(2) == u.Stats.Messages // v.EnqueueUserStats had run!
})
// Stats keep counting
rr = request(t, s, "GET", "/v1/account", "", map[string]string{

View file

@ -15,6 +15,7 @@ import (
"net/netip"
"os"
"path/filepath"
"runtime/debug"
"strings"
"sync"
"testing"
@ -914,7 +915,15 @@ func TestServer_StatsResetter(t *testing.T) {
require.Equal(t, int64(2), account.Stats.Messages)
// Wait for stats resetter to run
time.Sleep(2200 * time.Millisecond)
waitFor(t, func() bool {
response = request(t, s, "GET", "/v1/account", "", map[string]string{
"Authorization": util.BasicAuth("phil", "phil"),
})
require.Equal(t, 200, response.Code)
account, err = util.UnmarshalJSON[apiAccountResponse](io.NopCloser(response.Body))
require.Nil(t, err)
return account.Stats.Messages == 0
})
// User stats show 0 messages now!
response = request(t, s, "GET", "/v1/account", "", map[string]string{
@ -1283,7 +1292,9 @@ func TestServer_MatrixGateway_Push_Success(t *testing.T) {
}
func TestServer_MatrixGateway_Push_Failure_NoSubscriber(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
c := newTestConfig(t)
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}`
response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
require.Equal(t, 507, response.Code)
@ -1661,9 +1672,10 @@ func TestServer_PublishAttachmentAndExpire(t *testing.T) {
require.Equal(t, content, response.Body.String())
// Prune and makes sure it's gone
time.Sleep(time.Second) // Sigh ...
s.execManager()
require.NoFileExists(t, file)
waitFor(t, func() bool {
s.execManager() // May run many times
return !util.FileExists(file)
})
response = request(t, s, "GET", path, "", nil)
require.Equal(t, 404, response.Code)
}
@ -2020,6 +2032,7 @@ func TestServer_AnonymousUser_And_NonTierUser_Are_Same_Visitor(t *testing.T) {
func TestServer_SubscriberRateLimiting_Success(t *testing.T) {
c := newTestConfigWithAuthFile(t)
c.VisitorRequestLimitBurst = 3
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
// "Register" visitor 1.2.3.4 to topic "subscriber1topic" as a rate limit visitor
@ -2031,6 +2044,7 @@ func TestServer_SubscriberRateLimiting_Success(t *testing.T) {
}, subscriber1Fn)
require.Equal(t, 200, rr.Code)
require.Equal(t, "", rr.Body.String())
require.Equal(t, "1.2.3.4", s.topics["subscriber1topic"].rateVisitor.ip.String())
// "Register" visitor 8.7.7.1 to topic "up012345678912" as a rate limit visitor (implicitly via topic name)
subscriber2Fn := func(r *http.Request) {
@ -2039,6 +2053,7 @@ func TestServer_SubscriberRateLimiting_Success(t *testing.T) {
rr = request(t, s, "GET", "/up012345678912/json?poll=1", "", nil, subscriber2Fn)
require.Equal(t, 200, rr.Code)
require.Equal(t, "", rr.Body.String())
require.Equal(t, "8.7.7.1", s.topics["up012345678912"].rateVisitor.ip.String())
// Publish 2 messages to "subscriber1topic" as visitor 9.9.9.9. It'd be 3 normally, but the
// GET request before is also counted towards the request limiter.
@ -2070,9 +2085,47 @@ func TestServer_SubscriberRateLimiting_Success(t *testing.T) {
require.Equal(t, 429, rr.Code)
}
func TestServer_SubscriberRateLimiting_NotEnabled_Failed(t *testing.T) {
c := newTestConfigWithAuthFile(t)
c.VisitorRequestLimitBurst = 3
c.VisitorSubscriberRateLimiting = false
s := newTestServer(t, c)
// Subscriber rate limiting is disabled!
// Registering visitor 1.2.3.4 to topic has no effect
rr := request(t, s, "GET", "/subscriber1topic/json?poll=1", "", map[string]string{
"Rate-Topics": "subscriber1topic",
}, func(r *http.Request) {
r.RemoteAddr = "1.2.3.4"
})
require.Equal(t, 200, rr.Code)
require.Equal(t, "", rr.Body.String())
require.Nil(t, s.topics["subscriber1topic"].rateVisitor)
// Registering visitor 8.7.7.1 to topic has no effect
rr = request(t, s, "GET", "/up012345678912/json?poll=1", "", nil, func(r *http.Request) {
r.RemoteAddr = "8.7.7.1"
})
require.Equal(t, 200, rr.Code)
require.Equal(t, "", rr.Body.String())
require.Nil(t, s.topics["up012345678912"].rateVisitor)
// Publish 3 messages to "subscriber1topic" as visitor 9.9.9.9
for i := 0; i < 3; i++ {
rr := request(t, s, "PUT", "/subscriber1topic", "some message", nil)
require.Equal(t, 200, rr.Code)
}
rr = request(t, s, "PUT", "/subscriber1topic", "some message", nil)
require.Equal(t, 429, rr.Code)
rr = request(t, s, "PUT", "/up012345678912", "some message", nil)
require.Equal(t, 429, rr.Code)
}
func TestServer_SubscriberRateLimiting_UP_Only(t *testing.T) {
c := newTestConfigWithAuthFile(t)
c.VisitorRequestLimitBurst = 3
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
// "Register" 5 different UnifiedPush visitors
@ -2096,6 +2149,7 @@ func TestServer_SubscriberRateLimiting_UP_Only(t *testing.T) {
func TestServer_Matrix_SubscriberRateLimiting_UP_Only(t *testing.T) {
c := newTestConfig(t)
c.VisitorRequestLimitBurst = 3
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
// "Register" 5 different UnifiedPush visitors
@ -2123,6 +2177,7 @@ func TestServer_Matrix_SubscriberRateLimiting_UP_Only(t *testing.T) {
func TestServer_SubscriberRateLimiting_VisitorExpiration(t *testing.T) {
c := newTestConfig(t)
c.VisitorRequestLimitBurst = 3
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
// "Register" rate visitor
@ -2158,6 +2213,7 @@ func TestServer_SubscriberRateLimiting_VisitorExpiration(t *testing.T) {
func TestServer_SubscriberRateLimiting_ProtectedTopics(t *testing.T) {
c := newTestConfigWithAuthFile(t)
c.AuthDefault = user.PermissionDenyAll
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
// Create some ACLs
@ -2205,6 +2261,7 @@ func TestServer_SubscriberRateLimiting_ProtectedTopics(t *testing.T) {
func TestServer_SubscriberRateLimiting_ProtectedTopics_WithDefaultReadWrite(t *testing.T) {
c := newTestConfigWithAuthFile(t)
c.AuthDefault = user.PermissionReadWrite
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
// Create some ACLs
@ -2311,3 +2368,18 @@ func readAll(t *testing.T, rc io.ReadCloser) string {
}
return string(b)
}
func waitFor(t *testing.T, f func() bool) {
waitForWithMaxWait(t, 5*time.Second, f)
}
func waitForWithMaxWait(t *testing.T, maxWait time.Duration, f func() bool) {
start := time.Now()
for time.Since(start) < maxWait {
if f() {
return
}
time.Sleep(100 * time.Millisecond)
}
t.Fatalf("Function f did not succeed after %v: %v", maxWait, string(debug.Stack()))
}

View file

@ -36,7 +36,10 @@ func (s *smtpSender) Send(v *visitor, m *message, to string) error {
if err != nil {
return err
}
auth := smtp.PlainAuth("", s.config.SMTPSenderUser, s.config.SMTPSenderPass, host)
var auth smtp.Auth
if s.config.SMTPSenderUser != "" {
auth = smtp.PlainAuth("", s.config.SMTPSenderUser, s.config.SMTPSenderPass, host)
}
ev := logvm(v, m).
Tag(tagEmail).
Fields(log.Context{

View file

@ -143,6 +143,7 @@ func (v *visitor) contextNoLock() log.Context {
fields := log.Context{
"visitor_id": visitorID(v.ip, v.user),
"visitor_ip": v.ip.String(),
"visitor_seen": util.FormatTime(v.seen),
"visitor_messages": info.Stats.Messages,
"visitor_messages_limit": info.Limits.MessageLimit,
"visitor_messages_remaining": info.Stats.MessagesRemaining,