diff --git a/tool/net/redbean.c b/tool/net/redbean.c index 660319f0a..23bb0b4a4 100644 --- a/tool/net/redbean.c +++ b/tool/net/redbean.c @@ -181,10 +181,8 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon #define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a) #define HeaderEqualCase(H, S) \ SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H)) -#define LockInc(P) \ - atomic_fetch_add_explicit(P, +1, memory_order_relaxed) -#define LockDec(P) \ - atomic_fetch_add_explicit(P, -1, memory_order_relaxed) +#define LockInc(P) atomic_fetch_add_explicit(P, +1, memory_order_relaxed) +#define LockDec(P) atomic_fetch_add_explicit(P, -1, memory_order_relaxed) #define TRACE_BEGIN \ do { \ @@ -375,7 +373,7 @@ struct Blackhole { } blackhole; static struct Shared { - int workers; + _Atomic(int) workers; struct timespec nowish; struct timespec lastreindex; struct timespec lastmeltdown; @@ -387,7 +385,9 @@ static struct Shared { #include "tool/net/counters.inc" #undef C } c; - pthread_spinlock_t montermlock; + pthread_mutex_t datetime_mu; + pthread_mutex_t server_mu; + pthread_mutex_t children_mu; } *shared; static const char kCounterNames[] = @@ -1348,8 +1348,8 @@ static void CallSimpleHookIfDefined(const char *s) { } static void ReportWorkerExit(int pid, int ws) { - int workers; - workers = atomic_fetch_sub((_Atomic(int) *)&shared->workers, 1) - 1; + int workers = + atomic_fetch_sub_explicit(&shared->workers, 1, memory_order_release); if (WIFEXITED(ws)) { if (WEXITSTATUS(ws)) { LockInc(&shared->c.failedchildren); @@ -1381,7 +1381,9 @@ static void ReportWorkerResources(int pid, struct rusage *ru) { static void HandleWorkerExit(int pid, int ws, struct rusage *ru) { LockInc(&shared->c.connectionshandled); + unassert(!pthread_mutex_lock(&shared->children_mu)); rusage_add(&shared->children, ru); + unassert(!pthread_mutex_unlock(&shared->children_mu)); ReportWorkerExit(pid, ws); ReportWorkerResources(pid, ru); if (hasonprocessdestroy) { @@ -2127,9 +2129,11 @@ static void UpdateCurrentDate(struct timespec now) { int64_t t; struct tm tm; t = now.tv_sec; - shared->nowish = now; gmtime_r(&t, &tm); + unassert(pthread_mutex_lock(&shared->datetime_mu)); + shared->nowish = now; FormatHttpDateTime(shared->currentdate, &tm); + unassert(pthread_mutex_unlock(&shared->datetime_mu)); } static int64_t GetGmtOffset(int64_t t) { @@ -2362,7 +2366,10 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) { p = stpcpy(p, directive); } p = AppendCrlf(p); - return AppendExpires(p, shared->nowish.tv_sec + seconds); + unassert(pthread_mutex_lock(&shared->datetime_mu)); + long nowish_sec = shared->nowish.tv_sec; + unassert(pthread_mutex_unlock(&shared->datetime_mu)); + return AppendExpires(p, nowish_sec + seconds); } static inline char *AppendContentLength(char *p, size_t n) { @@ -3101,9 +3108,12 @@ td { padding-right: 3em; }\r\n\ \r\n\ /statusz\r\n\ "); - if (shared->c.connectionshandled) { + if (atomic_load_explicit(&shared->c.connectionshandled, + memory_order_acquire)) { appends(&cpm.outbuf, "says your redbean
\r\n"); + unassert(pthread_mutex_lock(&shared->children_mu)); AppendResourceReport(&cpm.outbuf, &shared->children, "
\r\n"); + unassert(pthread_mutex_unlock(&shared->children_mu)); } appends(&cpm.outbuf, "\r\n"); and = ""; @@ -3125,12 +3135,12 @@ td { padding-right: 3em; }\r\n\ } appendf(&cpm.outbuf, "%s%,ld second%s of operation
\r\n", and, y.rem, y.rem == 1 ? "" : "s"); - x = shared->c.messageshandled; + x = atomic_load_explicit(&shared->c.messageshandled, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld message%s handled
\r\n", x, x == 1 ? "" : "s"); - x = shared->c.connectionshandled; + x = atomic_load_explicit(&shared->c.connectionshandled, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld connection%s handled
\r\n", x, x == 1 ? "" : "s"); - x = shared->workers; + x = atomic_load_explicit(&shared->workers, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld connection%s active
\r\n", x, x == 1 ? "" : "s"); appends(&cpm.outbuf, "\r\n"); @@ -3182,11 +3192,11 @@ static void AppendRusage(const char *a, struct rusage *ru) { } static void ServeCounters(void) { - const long *c; + const _Atomic(long) *c; const char *s; - for (c = (const long *)&shared->c, s = kCounterNames; *s; + for (c = (const _Atomic(long) *)&shared->c, s = kCounterNames; *s; ++c, s += strlen(s) + 1) { - AppendLong1(s, *c); + AppendLong1(s, atomic_load_explicit(c, memory_order_relaxed)); } } @@ -3199,12 +3209,15 @@ static char *ServeStatusz(void) { AppendLong1("pid", getpid()); AppendLong1("ppid", getppid()); AppendLong1("now", timespec_real().tv_sec); + unassert(pthread_mutex_lock(&shared->datetime_mu)); AppendLong1("nowish", shared->nowish.tv_sec); + unassert(pthread_mutex_unlock(&shared->datetime_mu)); AppendLong1("gmtoff", gmtoff); AppendLong1("CLK_TCK", CLK_TCK); AppendLong1("startserver", startserver.tv_sec); AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec); - AppendLong1("workers", shared->workers); + AppendLong1("workers", + atomic_load_explicit(&shared->workers, memory_order_relaxed)); AppendLong1("assets.n", assets.n); #ifndef STATIC lua_State *L = GL; @@ -3212,8 +3225,12 @@ static char *ServeStatusz(void) { lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB)); #endif ServeCounters(); + unassert(pthread_mutex_lock(&shared->server_mu)); AppendRusage("server", &shared->server); + unassert(pthread_mutex_unlock(&shared->server_mu)); + unassert(pthread_mutex_lock(&shared->children_mu)); AppendRusage("children", &shared->children); + unassert(pthread_mutex_unlock(&shared->children_mu)); p = SetStatus(200, "OK"); p = AppendContentType(p, "text/plain"); if (cpm.msg.version >= 11) { @@ -3978,7 +3995,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) { #include "tool/net/fetch.inc" static int LuaGetDate(lua_State *L) { + unassert(pthread_mutex_lock(&shared->datetime_mu)); lua_pushinteger(L, shared->nowish.tv_sec); + unassert(pthread_mutex_unlock(&shared->datetime_mu)); return 1; } @@ -5032,7 +5051,7 @@ static int LuaProgramTokenBucket(lua_State *L) { npassert(pid != -1); if (!pid) Replenisher(); - ++shared->workers; + atomic_fetch_add_explicit(&shared->workers, 1, memory_order_acquire); return 0; } @@ -5677,7 +5696,8 @@ static void LogClose(const char *reason) { if (amtread || meltdown || killed) { LockInc(&shared->c.fumbles); INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)", - DescribeClient(), reason, amtread, messageshandled, shared->workers); + DescribeClient(), reason, amtread, messageshandled, + atomic_load_explicit(&shared->workers, memory_order_relaxed)); } else { DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason, messageshandled); @@ -5739,10 +5759,11 @@ static void EnterMeltdownMode(void) { (struct timespec){1}) < 0) { return; } - WARNF("(srvr) server is melting down (%,d workers)", shared->workers); + WARNF("(srvr) server is melting down (%,d workers)", + atomic_load_explicit(&shared->workers, memory_order_relaxed)); LOGIFNEG1(kill(0, SIGUSR2)); shared->lastmeltdown = timespec_real(); - ++shared->c.meltdowns; + LockInc(&shared->c.meltdowns); } static char *HandlePayloadDisconnect(void) { @@ -5859,7 +5880,9 @@ static void HandleHeartbeat(void) { size_t i; UpdateCurrentDate(timespec_real()); Reindex(); + unassert(pthread_mutex_lock(&shared->server_mu)); getrusage(RUSAGE_SELF, &shared->server); + unassert(pthread_mutex_unlock(&shared->server_mu)); #ifndef STATIC CallSimpleHookIfDefined("OnServerHeartbeat"); CollectGarbage(); @@ -6479,7 +6502,9 @@ static bool HandleMessageActual(void) { DEBUGF("(clnt) could not synchronize message stream"); } if (cpm.msg.version >= 10) { + unassert(pthread_mutex_lock(&shared->datetime_mu)); p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate)); + unassert(pthread_mutex_unlock(&shared->datetime_mu)); if (!cpm.branded) p = stpcpy(p, serverheader); if (extrahdrs) @@ -6749,7 +6774,9 @@ static int HandleConnection(size_t i) { DEBUGF("(token) can't acquire accept() token for client"); } startconnection = timespec_real(); - if (UNLIKELY(maxworkers) && shared->workers >= maxworkers) { + if (UNLIKELY(maxworkers) && + atomic_load_explicit(&shared->workers, memory_order_relaxed) >= + maxworkers) { EnterMeltdownMode(); SendServiceUnavailable(); close(client);