Improve redbean concurrency (#1332)

While playing with redbean I was confused by how the shared state was
behaving, and then noticed that some of it may be getting modified by
multiple processes concurrently. I tried to improve things by changing
the definitions of the counter variables to be explicitly atomic. Claude
assures me that most modern Unixes support cross-process atomics, so I
just went with it on that front.

I also added some mutexes to the shared state to try to synchronize some
other fields that workers might read or write but that couldn't be made
atomic, mainly the rusage and time values. I probably could have been
less granular and used a single global shared-state lock, but I opted to
be fairly granular as a starting point.

This also moves the resetting of the lastmeltdown timespec so that it
happens before the SIGUSR2 signal is sent (previously it happened
afterwards); hopefully this is okay.
This commit is contained in:
Steven Dee (Jōshin) 2024-12-02 17:05:38 -05:00 committed by GitHub
parent 3142758675
commit b40140e6c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -181,12 +181,8 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon
#define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a) #define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a)
#define HeaderEqualCase(H, S) \ #define HeaderEqualCase(H, S) \
SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H)) SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H))
#define LockInc(P) \ #define LockInc(P) atomic_fetch_add_explicit(P, +1, memory_order_relaxed)
atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), +1, \ #define LockDec(P) atomic_fetch_add_explicit(P, -1, memory_order_relaxed)
memory_order_relaxed)
#define LockDec(P) \
atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), -1, \
memory_order_relaxed)
#define TRACE_BEGIN \ #define TRACE_BEGIN \
do { \ do { \
@ -377,19 +373,21 @@ struct Blackhole {
} blackhole; } blackhole;
static struct Shared { static struct Shared {
int workers; _Atomic(int) workers;
struct timespec nowish;
struct timespec lastreindex;
struct timespec lastmeltdown; struct timespec lastmeltdown;
struct timespec nowish;
char currentdate[32]; char currentdate[32];
struct rusage server; struct rusage server;
struct rusage children; struct rusage children;
struct Counters { struct Counters {
#define C(x) long x; #define C(x) _Atomic(long) x;
#include "tool/net/counters.inc" #include "tool/net/counters.inc"
#undef C #undef C
} c; } c;
pthread_spinlock_t montermlock; pthread_mutex_t datetime_mu;
pthread_mutex_t server_mu;
pthread_mutex_t children_mu;
pthread_mutex_t lastmeltdown_mu;
} *shared; } *shared;
static const char kCounterNames[] = static const char kCounterNames[] =
@ -1350,8 +1348,8 @@ static void CallSimpleHookIfDefined(const char *s) {
} }
static void ReportWorkerExit(int pid, int ws) { static void ReportWorkerExit(int pid, int ws) {
int workers; int workers =
workers = atomic_fetch_sub((_Atomic(int) *)&shared->workers, 1) - 1; atomic_fetch_sub_explicit(&shared->workers, 1, memory_order_release);
if (WIFEXITED(ws)) { if (WIFEXITED(ws)) {
if (WEXITSTATUS(ws)) { if (WEXITSTATUS(ws)) {
LockInc(&shared->c.failedchildren); LockInc(&shared->c.failedchildren);
@ -1383,7 +1381,9 @@ static void ReportWorkerResources(int pid, struct rusage *ru) {
static void HandleWorkerExit(int pid, int ws, struct rusage *ru) { static void HandleWorkerExit(int pid, int ws, struct rusage *ru) {
LockInc(&shared->c.connectionshandled); LockInc(&shared->c.connectionshandled);
unassert(!pthread_mutex_lock(&shared->children_mu));
rusage_add(&shared->children, ru); rusage_add(&shared->children, ru);
unassert(!pthread_mutex_unlock(&shared->children_mu));
ReportWorkerExit(pid, ws); ReportWorkerExit(pid, ws);
ReportWorkerResources(pid, ru); ReportWorkerResources(pid, ru);
if (hasonprocessdestroy) { if (hasonprocessdestroy) {
@ -2129,9 +2129,11 @@ static void UpdateCurrentDate(struct timespec now) {
int64_t t; int64_t t;
struct tm tm; struct tm tm;
t = now.tv_sec; t = now.tv_sec;
shared->nowish = now;
gmtime_r(&t, &tm); gmtime_r(&t, &tm);
unassert(!pthread_mutex_lock(&shared->datetime_mu));
shared->nowish = now;
FormatHttpDateTime(shared->currentdate, &tm); FormatHttpDateTime(shared->currentdate, &tm);
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
} }
static int64_t GetGmtOffset(int64_t t) { static int64_t GetGmtOffset(int64_t t) {
@ -2364,7 +2366,10 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) {
p = stpcpy(p, directive); p = stpcpy(p, directive);
} }
p = AppendCrlf(p); p = AppendCrlf(p);
return AppendExpires(p, shared->nowish.tv_sec + seconds); unassert(!pthread_mutex_lock(&shared->datetime_mu));
long nowish_sec = shared->nowish.tv_sec;
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
return AppendExpires(p, nowish_sec + seconds);
} }
static inline char *AppendContentLength(char *p, size_t n) { static inline char *AppendContentLength(char *p, size_t n) {
@ -3103,9 +3108,12 @@ td { padding-right: 3em; }\r\n\
<td valign=\"top\">\r\n\ <td valign=\"top\">\r\n\
<a href=\"/statusz\">/statusz</a>\r\n\ <a href=\"/statusz\">/statusz</a>\r\n\
"); ");
if (shared->c.connectionshandled) { if (atomic_load_explicit(&shared->c.connectionshandled,
memory_order_acquire)) {
appends(&cpm.outbuf, "says your redbean<br>\r\n"); appends(&cpm.outbuf, "says your redbean<br>\r\n");
unassert(!pthread_mutex_lock(&shared->children_mu));
AppendResourceReport(&cpm.outbuf, &shared->children, "<br>\r\n"); AppendResourceReport(&cpm.outbuf, &shared->children, "<br>\r\n");
unassert(!pthread_mutex_unlock(&shared->children_mu));
} }
appends(&cpm.outbuf, "<td valign=\"top\">\r\n"); appends(&cpm.outbuf, "<td valign=\"top\">\r\n");
and = ""; and = "";
@ -3127,12 +3135,12 @@ td { padding-right: 3em; }\r\n\
} }
appendf(&cpm.outbuf, "%s%,ld second%s of operation<br>\r\n", and, y.rem, appendf(&cpm.outbuf, "%s%,ld second%s of operation<br>\r\n", and, y.rem,
y.rem == 1 ? "" : "s"); y.rem == 1 ? "" : "s");
x = shared->c.messageshandled; x = atomic_load_explicit(&shared->c.messageshandled, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld message%s handled<br>\r\n", x, x == 1 ? "" : "s"); appendf(&cpm.outbuf, "%,ld message%s handled<br>\r\n", x, x == 1 ? "" : "s");
x = shared->c.connectionshandled; x = atomic_load_explicit(&shared->c.connectionshandled, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld connection%s handled<br>\r\n", x, appendf(&cpm.outbuf, "%,ld connection%s handled<br>\r\n", x,
x == 1 ? "" : "s"); x == 1 ? "" : "s");
x = shared->workers; x = atomic_load_explicit(&shared->workers, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld connection%s active<br>\r\n", x, appendf(&cpm.outbuf, "%,ld connection%s active<br>\r\n", x,
x == 1 ? "" : "s"); x == 1 ? "" : "s");
appends(&cpm.outbuf, "</table>\r\n"); appends(&cpm.outbuf, "</table>\r\n");
@ -3184,11 +3192,11 @@ static void AppendRusage(const char *a, struct rusage *ru) {
} }
static void ServeCounters(void) { static void ServeCounters(void) {
const long *c; const _Atomic(long) *c;
const char *s; const char *s;
for (c = (const long *)&shared->c, s = kCounterNames; *s; for (c = (const _Atomic(long) *)&shared->c, s = kCounterNames; *s;
++c, s += strlen(s) + 1) { ++c, s += strlen(s) + 1) {
AppendLong1(s, *c); AppendLong1(s, atomic_load_explicit(c, memory_order_relaxed));
} }
} }
@ -3201,12 +3209,17 @@ static char *ServeStatusz(void) {
AppendLong1("pid", getpid()); AppendLong1("pid", getpid());
AppendLong1("ppid", getppid()); AppendLong1("ppid", getppid());
AppendLong1("now", timespec_real().tv_sec); AppendLong1("now", timespec_real().tv_sec);
unassert(!pthread_mutex_lock(&shared->datetime_mu));
AppendLong1("nowish", shared->nowish.tv_sec); AppendLong1("nowish", shared->nowish.tv_sec);
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
AppendLong1("gmtoff", gmtoff); AppendLong1("gmtoff", gmtoff);
AppendLong1("CLK_TCK", CLK_TCK); AppendLong1("CLK_TCK", CLK_TCK);
AppendLong1("startserver", startserver.tv_sec); AppendLong1("startserver", startserver.tv_sec);
unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu));
AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec); AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec);
AppendLong1("workers", shared->workers); unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu));
AppendLong1("workers",
atomic_load_explicit(&shared->workers, memory_order_relaxed));
AppendLong1("assets.n", assets.n); AppendLong1("assets.n", assets.n);
#ifndef STATIC #ifndef STATIC
lua_State *L = GL; lua_State *L = GL;
@ -3214,8 +3227,12 @@ static char *ServeStatusz(void) {
lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB)); lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB));
#endif #endif
ServeCounters(); ServeCounters();
unassert(!pthread_mutex_lock(&shared->server_mu));
AppendRusage("server", &shared->server); AppendRusage("server", &shared->server);
unassert(!pthread_mutex_unlock(&shared->server_mu));
unassert(!pthread_mutex_lock(&shared->children_mu));
AppendRusage("children", &shared->children); AppendRusage("children", &shared->children);
unassert(!pthread_mutex_unlock(&shared->children_mu));
p = SetStatus(200, "OK"); p = SetStatus(200, "OK");
p = AppendContentType(p, "text/plain"); p = AppendContentType(p, "text/plain");
if (cpm.msg.version >= 11) { if (cpm.msg.version >= 11) {
@ -3980,7 +3997,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) {
#include "tool/net/fetch.inc" #include "tool/net/fetch.inc"
static int LuaGetDate(lua_State *L) { static int LuaGetDate(lua_State *L) {
unassert(!pthread_mutex_lock(&shared->datetime_mu));
lua_pushinteger(L, shared->nowish.tv_sec); lua_pushinteger(L, shared->nowish.tv_sec);
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
return 1; return 1;
} }
@ -5034,7 +5053,7 @@ static int LuaProgramTokenBucket(lua_State *L) {
npassert(pid != -1); npassert(pid != -1);
if (!pid) if (!pid)
Replenisher(); Replenisher();
++shared->workers; atomic_fetch_add_explicit(&shared->workers, 1, memory_order_acquire);
return 0; return 0;
} }
@ -5679,7 +5698,8 @@ static void LogClose(const char *reason) {
if (amtread || meltdown || killed) { if (amtread || meltdown || killed) {
LockInc(&shared->c.fumbles); LockInc(&shared->c.fumbles);
INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)", INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)",
DescribeClient(), reason, amtread, messageshandled, shared->workers); DescribeClient(), reason, amtread, messageshandled,
atomic_load_explicit(&shared->workers, memory_order_relaxed));
} else { } else {
DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason, DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason,
messageshandled); messageshandled);
@ -5737,14 +5757,18 @@ Content-Length: 22\r\n\
} }
static void EnterMeltdownMode(void) { static void EnterMeltdownMode(void) {
unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu));
if (timespec_cmp(timespec_sub(timespec_real(), shared->lastmeltdown), if (timespec_cmp(timespec_sub(timespec_real(), shared->lastmeltdown),
(struct timespec){1}) < 0) { (struct timespec){1}) < 0) {
unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu));
return; return;
} }
WARNF("(srvr) server is melting down (%,d workers)", shared->workers);
LOGIFNEG1(kill(0, SIGUSR2));
shared->lastmeltdown = timespec_real(); shared->lastmeltdown = timespec_real();
++shared->c.meltdowns; pthread_mutex_unlock(&shared->lastmeltdown_mu);
WARNF("(srvr) server is melting down (%,d workers)",
atomic_load_explicit(&shared->workers, memory_order_relaxed));
LOGIFNEG1(kill(0, SIGUSR2));
LockInc(&shared->c.meltdowns);
} }
static char *HandlePayloadDisconnect(void) { static char *HandlePayloadDisconnect(void) {
@ -5861,7 +5885,9 @@ static void HandleHeartbeat(void) {
size_t i; size_t i;
UpdateCurrentDate(timespec_real()); UpdateCurrentDate(timespec_real());
Reindex(); Reindex();
unassert(!pthread_mutex_lock(&shared->server_mu));
getrusage(RUSAGE_SELF, &shared->server); getrusage(RUSAGE_SELF, &shared->server);
unassert(!pthread_mutex_unlock(&shared->server_mu));
#ifndef STATIC #ifndef STATIC
CallSimpleHookIfDefined("OnServerHeartbeat"); CallSimpleHookIfDefined("OnServerHeartbeat");
CollectGarbage(); CollectGarbage();
@ -6481,7 +6507,9 @@ static bool HandleMessageActual(void) {
DEBUGF("(clnt) could not synchronize message stream"); DEBUGF("(clnt) could not synchronize message stream");
} }
if (cpm.msg.version >= 10) { if (cpm.msg.version >= 10) {
unassert(!pthread_mutex_lock(&shared->datetime_mu));
p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate)); p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate));
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
if (!cpm.branded) if (!cpm.branded)
p = stpcpy(p, serverheader); p = stpcpy(p, serverheader);
if (extrahdrs) if (extrahdrs)
@ -6751,7 +6779,9 @@ static int HandleConnection(size_t i) {
DEBUGF("(token) can't acquire accept() token for client"); DEBUGF("(token) can't acquire accept() token for client");
} }
startconnection = timespec_real(); startconnection = timespec_real();
if (UNLIKELY(maxworkers) && shared->workers >= maxworkers) { if (UNLIKELY(maxworkers) &&
atomic_load_explicit(&shared->workers, memory_order_relaxed) >=
maxworkers) {
EnterMeltdownMode(); EnterMeltdownMode();
SendServiceUnavailable(); SendServiceUnavailable();
close(client); close(client);
@ -7346,6 +7376,14 @@ void RedBean(int argc, char *argv[]) {
(shared = mmap(NULL, ROUNDUP(sizeof(struct Shared), getgransize()), (shared = mmap(NULL, ROUNDUP(sizeof(struct Shared), getgransize()),
PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
-1, 0))); -1, 0)));
pthread_mutexattr_t attr;
unassert(!pthread_mutexattr_init(&attr));
unassert(!pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED));
unassert(!pthread_mutex_init(&shared->datetime_mu, &attr));
unassert(!pthread_mutex_init(&shared->server_mu, &attr));
unassert(!pthread_mutex_init(&shared->children_mu, &attr));
unassert(!pthread_mutex_init(&shared->lastmeltdown_mu, &attr));
unassert(!pthread_mutexattr_destroy(&attr));
if (daemonize) { if (daemonize) {
for (int i = 0; i < 256; ++i) { for (int i = 0; i < 256; ++i) {
close(i); close(i);