workers is atomic, add some mutexes

This commit is contained in:
Steven Dee (Jōshin) 2024-12-01 22:03:04 -05:00
parent 58c6f24d9d
commit 493249a02f
No known key found for this signature in database

View file

@ -181,10 +181,8 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon
#define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a) #define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a)
#define HeaderEqualCase(H, S) \ #define HeaderEqualCase(H, S) \
SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H)) SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H))
#define LockInc(P) \ #define LockInc(P) atomic_fetch_add_explicit(P, +1, memory_order_relaxed)
atomic_fetch_add_explicit(P, +1, memory_order_relaxed) #define LockDec(P) atomic_fetch_add_explicit(P, -1, memory_order_relaxed)
#define LockDec(P) \
atomic_fetch_add_explicit(P, -1, memory_order_relaxed)
#define TRACE_BEGIN \ #define TRACE_BEGIN \
do { \ do { \
@ -375,7 +373,7 @@ struct Blackhole {
} blackhole; } blackhole;
static struct Shared { static struct Shared {
int workers; _Atomic(int) workers;
struct timespec nowish; struct timespec nowish;
struct timespec lastreindex; struct timespec lastreindex;
struct timespec lastmeltdown; struct timespec lastmeltdown;
@ -387,7 +385,9 @@ static struct Shared {
#include "tool/net/counters.inc" #include "tool/net/counters.inc"
#undef C #undef C
} c; } c;
pthread_spinlock_t montermlock; pthread_mutex_t datetime_mu;
pthread_mutex_t server_mu;
pthread_mutex_t children_mu;
} *shared; } *shared;
static const char kCounterNames[] = static const char kCounterNames[] =
@ -1348,8 +1348,8 @@ static void CallSimpleHookIfDefined(const char *s) {
} }
static void ReportWorkerExit(int pid, int ws) { static void ReportWorkerExit(int pid, int ws) {
int workers; int workers =
workers = atomic_fetch_sub((_Atomic(int) *)&shared->workers, 1) - 1; atomic_fetch_sub_explicit(&shared->workers, 1, memory_order_release);
if (WIFEXITED(ws)) { if (WIFEXITED(ws)) {
if (WEXITSTATUS(ws)) { if (WEXITSTATUS(ws)) {
LockInc(&shared->c.failedchildren); LockInc(&shared->c.failedchildren);
@ -1381,7 +1381,9 @@ static void ReportWorkerResources(int pid, struct rusage *ru) {
static void HandleWorkerExit(int pid, int ws, struct rusage *ru) { static void HandleWorkerExit(int pid, int ws, struct rusage *ru) {
LockInc(&shared->c.connectionshandled); LockInc(&shared->c.connectionshandled);
unassert(!pthread_mutex_lock(&shared->children_mu));
rusage_add(&shared->children, ru); rusage_add(&shared->children, ru);
unassert(!pthread_mutex_unlock(&shared->children_mu));
ReportWorkerExit(pid, ws); ReportWorkerExit(pid, ws);
ReportWorkerResources(pid, ru); ReportWorkerResources(pid, ru);
if (hasonprocessdestroy) { if (hasonprocessdestroy) {
@ -2127,9 +2129,11 @@ static void UpdateCurrentDate(struct timespec now) {
int64_t t; int64_t t;
struct tm tm; struct tm tm;
t = now.tv_sec; t = now.tv_sec;
shared->nowish = now;
gmtime_r(&t, &tm); gmtime_r(&t, &tm);
unassert(pthread_mutex_lock(&shared->datetime_mu));
shared->nowish = now;
FormatHttpDateTime(shared->currentdate, &tm); FormatHttpDateTime(shared->currentdate, &tm);
unassert(pthread_mutex_unlock(&shared->datetime_mu));
} }
static int64_t GetGmtOffset(int64_t t) { static int64_t GetGmtOffset(int64_t t) {
@ -2362,7 +2366,10 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) {
p = stpcpy(p, directive); p = stpcpy(p, directive);
} }
p = AppendCrlf(p); p = AppendCrlf(p);
return AppendExpires(p, shared->nowish.tv_sec + seconds); unassert(pthread_mutex_lock(&shared->datetime_mu));
long nowish_sec = shared->nowish.tv_sec;
unassert(pthread_mutex_unlock(&shared->datetime_mu));
return AppendExpires(p, nowish_sec + seconds);
} }
static inline char *AppendContentLength(char *p, size_t n) { static inline char *AppendContentLength(char *p, size_t n) {
@ -3101,9 +3108,12 @@ td { padding-right: 3em; }\r\n\
<td valign=\"top\">\r\n\ <td valign=\"top\">\r\n\
<a href=\"/statusz\">/statusz</a>\r\n\ <a href=\"/statusz\">/statusz</a>\r\n\
"); ");
if (shared->c.connectionshandled) { if (atomic_load_explicit(&shared->c.connectionshandled,
memory_order_acquire)) {
appends(&cpm.outbuf, "says your redbean<br>\r\n"); appends(&cpm.outbuf, "says your redbean<br>\r\n");
unassert(pthread_mutex_lock(&shared->children_mu));
AppendResourceReport(&cpm.outbuf, &shared->children, "<br>\r\n"); AppendResourceReport(&cpm.outbuf, &shared->children, "<br>\r\n");
unassert(pthread_mutex_unlock(&shared->children_mu));
} }
appends(&cpm.outbuf, "<td valign=\"top\">\r\n"); appends(&cpm.outbuf, "<td valign=\"top\">\r\n");
and = ""; and = "";
@ -3125,12 +3135,12 @@ td { padding-right: 3em; }\r\n\
} }
appendf(&cpm.outbuf, "%s%,ld second%s of operation<br>\r\n", and, y.rem, appendf(&cpm.outbuf, "%s%,ld second%s of operation<br>\r\n", and, y.rem,
y.rem == 1 ? "" : "s"); y.rem == 1 ? "" : "s");
x = shared->c.messageshandled; x = atomic_load_explicit(&shared->c.messageshandled, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld message%s handled<br>\r\n", x, x == 1 ? "" : "s"); appendf(&cpm.outbuf, "%,ld message%s handled<br>\r\n", x, x == 1 ? "" : "s");
x = shared->c.connectionshandled; x = atomic_load_explicit(&shared->c.connectionshandled, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld connection%s handled<br>\r\n", x, appendf(&cpm.outbuf, "%,ld connection%s handled<br>\r\n", x,
x == 1 ? "" : "s"); x == 1 ? "" : "s");
x = shared->workers; x = atomic_load_explicit(&shared->workers, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld connection%s active<br>\r\n", x, appendf(&cpm.outbuf, "%,ld connection%s active<br>\r\n", x,
x == 1 ? "" : "s"); x == 1 ? "" : "s");
appends(&cpm.outbuf, "</table>\r\n"); appends(&cpm.outbuf, "</table>\r\n");
@ -3182,11 +3192,11 @@ static void AppendRusage(const char *a, struct rusage *ru) {
} }
static void ServeCounters(void) { static void ServeCounters(void) {
const long *c; const _Atomic(long) *c;
const char *s; const char *s;
for (c = (const long *)&shared->c, s = kCounterNames; *s; for (c = (const _Atomic(long) *)&shared->c, s = kCounterNames; *s;
++c, s += strlen(s) + 1) { ++c, s += strlen(s) + 1) {
AppendLong1(s, *c); AppendLong1(s, atomic_load_explicit(c, memory_order_relaxed));
} }
} }
@ -3199,12 +3209,15 @@ static char *ServeStatusz(void) {
AppendLong1("pid", getpid()); AppendLong1("pid", getpid());
AppendLong1("ppid", getppid()); AppendLong1("ppid", getppid());
AppendLong1("now", timespec_real().tv_sec); AppendLong1("now", timespec_real().tv_sec);
unassert(pthread_mutex_lock(&shared->datetime_mu));
AppendLong1("nowish", shared->nowish.tv_sec); AppendLong1("nowish", shared->nowish.tv_sec);
unassert(pthread_mutex_unlock(&shared->datetime_mu));
AppendLong1("gmtoff", gmtoff); AppendLong1("gmtoff", gmtoff);
AppendLong1("CLK_TCK", CLK_TCK); AppendLong1("CLK_TCK", CLK_TCK);
AppendLong1("startserver", startserver.tv_sec); AppendLong1("startserver", startserver.tv_sec);
AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec); AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec);
AppendLong1("workers", shared->workers); AppendLong1("workers",
atomic_load_explicit(&shared->workers, memory_order_relaxed));
AppendLong1("assets.n", assets.n); AppendLong1("assets.n", assets.n);
#ifndef STATIC #ifndef STATIC
lua_State *L = GL; lua_State *L = GL;
@ -3212,8 +3225,12 @@ static char *ServeStatusz(void) {
lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB)); lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB));
#endif #endif
ServeCounters(); ServeCounters();
unassert(pthread_mutex_lock(&shared->server_mu));
AppendRusage("server", &shared->server); AppendRusage("server", &shared->server);
unassert(pthread_mutex_unlock(&shared->server_mu));
unassert(pthread_mutex_lock(&shared->children_mu));
AppendRusage("children", &shared->children); AppendRusage("children", &shared->children);
unassert(pthread_mutex_unlock(&shared->children_mu));
p = SetStatus(200, "OK"); p = SetStatus(200, "OK");
p = AppendContentType(p, "text/plain"); p = AppendContentType(p, "text/plain");
if (cpm.msg.version >= 11) { if (cpm.msg.version >= 11) {
@ -3978,7 +3995,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) {
#include "tool/net/fetch.inc" #include "tool/net/fetch.inc"
static int LuaGetDate(lua_State *L) { static int LuaGetDate(lua_State *L) {
unassert(pthread_mutex_lock(&shared->datetime_mu));
lua_pushinteger(L, shared->nowish.tv_sec); lua_pushinteger(L, shared->nowish.tv_sec);
unassert(pthread_mutex_unlock(&shared->datetime_mu));
return 1; return 1;
} }
@ -5032,7 +5051,7 @@ static int LuaProgramTokenBucket(lua_State *L) {
npassert(pid != -1); npassert(pid != -1);
if (!pid) if (!pid)
Replenisher(); Replenisher();
++shared->workers; atomic_fetch_add_explicit(&shared->workers, 1, memory_order_acquire);
return 0; return 0;
} }
@ -5677,7 +5696,8 @@ static void LogClose(const char *reason) {
if (amtread || meltdown || killed) { if (amtread || meltdown || killed) {
LockInc(&shared->c.fumbles); LockInc(&shared->c.fumbles);
INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)", INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)",
DescribeClient(), reason, amtread, messageshandled, shared->workers); DescribeClient(), reason, amtread, messageshandled,
atomic_load_explicit(&shared->workers, memory_order_relaxed));
} else { } else {
DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason, DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason,
messageshandled); messageshandled);
@ -5739,10 +5759,11 @@ static void EnterMeltdownMode(void) {
(struct timespec){1}) < 0) { (struct timespec){1}) < 0) {
return; return;
} }
WARNF("(srvr) server is melting down (%,d workers)", shared->workers); WARNF("(srvr) server is melting down (%,d workers)",
atomic_load_explicit(&shared->workers, memory_order_relaxed));
LOGIFNEG1(kill(0, SIGUSR2)); LOGIFNEG1(kill(0, SIGUSR2));
shared->lastmeltdown = timespec_real(); shared->lastmeltdown = timespec_real();
++shared->c.meltdowns; LockInc(&shared->c.meltdowns);
} }
static char *HandlePayloadDisconnect(void) { static char *HandlePayloadDisconnect(void) {
@ -5859,7 +5880,9 @@ static void HandleHeartbeat(void) {
size_t i; size_t i;
UpdateCurrentDate(timespec_real()); UpdateCurrentDate(timespec_real());
Reindex(); Reindex();
unassert(pthread_mutex_lock(&shared->server_mu));
getrusage(RUSAGE_SELF, &shared->server); getrusage(RUSAGE_SELF, &shared->server);
unassert(pthread_mutex_unlock(&shared->server_mu));
#ifndef STATIC #ifndef STATIC
CallSimpleHookIfDefined("OnServerHeartbeat"); CallSimpleHookIfDefined("OnServerHeartbeat");
CollectGarbage(); CollectGarbage();
@ -6479,7 +6502,9 @@ static bool HandleMessageActual(void) {
DEBUGF("(clnt) could not synchronize message stream"); DEBUGF("(clnt) could not synchronize message stream");
} }
if (cpm.msg.version >= 10) { if (cpm.msg.version >= 10) {
unassert(pthread_mutex_lock(&shared->datetime_mu));
p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate)); p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate));
unassert(pthread_mutex_unlock(&shared->datetime_mu));
if (!cpm.branded) if (!cpm.branded)
p = stpcpy(p, serverheader); p = stpcpy(p, serverheader);
if (extrahdrs) if (extrahdrs)
@ -6749,7 +6774,9 @@ static int HandleConnection(size_t i) {
DEBUGF("(token) can't acquire accept() token for client"); DEBUGF("(token) can't acquire accept() token for client");
} }
startconnection = timespec_real(); startconnection = timespec_real();
if (UNLIKELY(maxworkers) && shared->workers >= maxworkers) { if (UNLIKELY(maxworkers) &&
atomic_load_explicit(&shared->workers, memory_order_relaxed) >=
maxworkers) {
EnterMeltdownMode(); EnterMeltdownMode();
SendServiceUnavailable(); SendServiceUnavailable();
close(client); close(client);