From 3ee3736a015d021680c3f917886c773a2073e243 Mon Sep 17 00:00:00 2001 From: Justine Tunney Date: Mon, 3 Oct 2022 21:50:00 -0700 Subject: [PATCH] Put finishing touches on turfwar http server - Shutdown process now has optimal cancellation latency - Fairer techniques for shedding connections under load - We no longer need to call poll() which is now removed --- libc/calls/g_sighandrvas.c | 2 - net/turfwar/turfwar.c | 172 +++++++++++++++++++++++++------------ third_party/unzip/fileio.c | 4 +- 3 files changed, 117 insertions(+), 61 deletions(-) diff --git a/libc/calls/g_sighandrvas.c b/libc/calls/g_sighandrvas.c index 3e0780d59..deb3e93de 100644 --- a/libc/calls/g_sighandrvas.c +++ b/libc/calls/g_sighandrvas.c @@ -19,7 +19,5 @@ #include "libc/calls/state.internal.h" #include "libc/thread/thread.h" -// TODO(jart): These should be _Thread_local but doing that currently -// causes a regression with runitd.com on Windows. unsigned __sighandrvas[NSIG]; unsigned __sighandflags[NSIG]; diff --git a/net/turfwar/turfwar.c b/net/turfwar/turfwar.c index e49493f8d..65a1ac413 100644 --- a/net/turfwar/turfwar.c +++ b/net/turfwar/turfwar.c @@ -21,6 +21,7 @@ #include "libc/calls/pledge.h" #include "libc/calls/struct/iovec.h" #include "libc/calls/struct/sigaction.h" +#include "libc/calls/struct/sigset.h" #include "libc/calls/struct/stat.h" #include "libc/calls/struct/timespec.h" #include "libc/calls/struct/timeval.h" @@ -79,20 +80,22 @@ * @fileoverview production webserver for turfwar online game */ -#define PORT 8080 -#define WORKERS 9001 -#define HEARTBEAT 2000 -#define KEEPALIVE_MS 1000 -#define POLL_ASSETS_MS 250 -#define DATE_UPDATE_MS 500 -#define SCORE_UPDATE_MS 15000 -#define CLAIM_DEADLINE_MS 100 -#define QUEUE_MAX 800 -#define BATCH_MAX 64 -#define NICK_MAX 40 -#define MSG_MAX 10 -#define INBUF_SIZE PAGESIZE -#define OUTBUF_SIZE PAGESIZE +#define PORT 8080 // default server listening port +#define WORKERS 9001 // size of http client thread pool +#define KEEPALIVE_MS 60000 // max time to keep idle conn open +#define MELTALIVE_MS 2000 // panic keepalive under heavy load +#define POLL_ASSETS_MS 1000 // how often to stat() asset files +#define DATE_UPDATE_MS 500 // how often to do tzdata crunching +#define SCORE_UPDATE_MS 15000 // how often to regeenrate /score json +#define CLAIM_DEADLINE_MS 100 // how long /claim may block if queue is full +#define PANIC_LOAD .85 // meltdown if this percent of pool connected +#define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown +#define QUEUE_MAX 800 // maximum pending claim items in queue +#define BATCH_MAX 64 // max claims to insert per transaction +#define NICK_MAX 40 // max length of user nickname string + +#define INBUF_SIZE PAGESIZE +#define OUTBUF_SIZE PAGESIZE #define GETOPTS "dvp:w:k:" #define USAGE \ @@ -194,6 +197,14 @@ int g_keepalive = KEEPALIVE_MS; nsync_note g_shutdown; nsync_note g_terminate; +atomic_int g_connections; + +struct Worker { + pthread_t th; + atomic_int msgcount; + atomic_bool connected; + struct timespec startread; +} * g_worker; struct Recent { nsync_mu mu; @@ -407,25 +418,27 @@ void DontRunOnFirstCpus(int i) { } } -// thousands of threads for handling client connections +// thousands of http client servicing threads +// load balance incoming connections for port 8080 across all threads +// hangup on any browser clients that lag for more than a few seconds void *HttpWorker(void *arg) { int server; int yes = 1; char name[16]; + sigset_t mask; int id = (intptr_t)arg; char *inbuf = NewSafeBuffer(INBUF_SIZE); char *outbuf = NewSafeBuffer(OUTBUF_SIZE); - struct HttpMessage *msg = _gc(xmalloc(sizeof(struct HttpMessage))); - STRACE("HttpWorker #%d started", id); - DontRunOnFirstCpus(2); - ksnprintf(name, sizeof(name), "HTTP #%d", id); - pthread_setname_np(pthread_self(), name); - - // load balance incoming connections for port 8080 across all threads - // hangup on any browser clients that lag for more than a few seconds struct timeval timeo = {g_keepalive / 1000, g_keepalive % 1000}; + struct HttpMessage *msg = _gc(xmalloc(sizeof(struct HttpMessage))); struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(g_port)}; + sigfillset(&mask); + DontRunOnFirstCpus(2); + sigdelset(&mask, SIGUSR1); + sigprocmask(SIG_SETMASK, &mask, 0); + ksnprintf(name, sizeof(name), "HTTP #%d", id); + pthread_setname_np(pthread_self(), name); CHECK_NE(-1, (server = socket(AF_INET, SOCK_STREAM, 0))); setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo)); @@ -433,46 +446,44 @@ void *HttpWorker(void *arg) { setsockopt(server, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)); setsockopt(server, SOL_TCP, TCP_FASTOPEN, &yes, sizeof(yes)); setsockopt(server, SOL_TCP, TCP_QUICKACK, &yes, sizeof(yes)); - errno = 0; - CHECK_NE(-1, bind(server, &addr, sizeof(addr))); CHECK_NE(-1, listen(server, 1)); + errno = 0; // connection loop while (!nsync_note_is_notified(g_shutdown)) { - int msgcount; struct Data d; struct Url url; - bool comp, ipv6; - struct Asset *a; ssize_t got, sent; - struct iovec iov[2]; uint32_t ip, clientip; char ipbuf[32], *p, *q; uint32_t clientaddrsize; struct sockaddr_in clientaddr; int client, inmsglen, outmsglen; - // this slows the server down a lot but is needed on non-Linux to - // react to keyboard ctrl-c - if (!IsLinux() && - poll(&(struct pollfd){server, POLLIN}, 1, HEARTBEAT) < 1) { - continue; - } - // wait for client connection clientaddrsize = sizeof(clientaddr); client = accept(server, (struct sockaddr *)&clientaddr, &clientaddrsize); if (client == -1) continue; ip = clientip = ntohl(clientaddr.sin_addr.s_addr); + g_worker[id].connected = true; + g_worker[id].msgcount = 0; + ++g_connections; // strict message loop w/o pipelining - msgcount = 0; do { + struct Asset *a; + bool comp, ipv6; + sigdelset(&mask, SIGUSR1); + sigprocmask(SIG_SETMASK, &mask, 0); InitHttpMessage(msg, kHttpRequest); + g_worker[id].startread = _timespec_real(); if ((got = read(client, inbuf, INBUF_SIZE)) <= 0) break; + sigaddset(&mask, SIGUSR1); + sigprocmask(SIG_SETMASK, &mask, 0); if ((inmsglen = ParseHttpMessage(msg, inbuf, got)) <= 0) break; if (msg->version != 11) break; // cloudflare won't send 0.9 or 1.0 + ++g_worker[id].msgcount; // get the ip address again // we assume a firewall only lets the frontend talk to this server @@ -502,7 +513,9 @@ void *HttpWorker(void *arg) { } if (a) { - comp = HeaderHas(msg, inbuf, kHttpAcceptEncoding, "gzip", 4); + struct iovec iov[2]; + comp = a->gzip.n < a->data.n && + HeaderHas(msg, inbuf, kHttpAcceptEncoding, "gzip", 4); p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept-Encoding\r\n" "Date: "); @@ -654,18 +667,19 @@ void *HttpWorker(void *arg) { // amount, then since we sent the content length and checked // that the client didn't attach a payload, we are so synced // thus we can safely process more messages - } while (got == inmsglen && // - sent == outmsglen && // - ++msgcount < MSG_MAX && // + } while (got == inmsglen && // + sent == outmsglen && // !msg->headers[kHttpContentLength].a && !msg->headers[kHttpTransferEncoding].a && (msg->method == kHttpGet || msg->method == kHttpHead) && !nsync_note_is_notified(g_shutdown)); DestroyHttpMessage(msg); close(client); + g_worker[id].connected = false; + --g_connections; } - STRACE("HttpWorker #%d exiting", id); + LOG("HttpWorker #%d exiting", id); FreeSafeBuffer(outbuf); FreeSafeBuffer(inbuf); close(server); @@ -756,8 +770,12 @@ void FreeAsset(struct Asset *a) { free(a->gzip.p); } +void IgnoreSignal(int sig) { + // so worker i/o routines may eintr safely +} + void OnCtrlC(int sig) { - LOG("Got ctrl-c...\n"); + LOG("Received %s shutting down...\n", strsignal(sig)); nsync_note_notify(g_shutdown); } @@ -1034,17 +1052,43 @@ void *NowWorker(void *arg) { return 0; } -// single thread for monitoring assets on disk -void *AssetWorker(void *arg) { +// we're permissive in allowing http connection keepalive until the +// moment worker resources start becoming scarce. when that happens +// we'll (1) cancel read operations that have not sent us a message +// in a while; (2) cancel clients who are sending lots of messages. +void Meltdown(void) { + int i, marks; + struct timespec now; + LOG("Panicking because %d out of %d workers is connected\n", g_connections, + g_workers); + now = _timespec_real(); + for (marks = i = 0; i < g_workers; ++i) { + if (g_worker[i].connected && + (g_worker[i].msgcount > PANIC_MSGS || + _timespec_gte(_timespec_sub(now, g_worker[i].startread), + _timespec_frommillis(MELTALIVE_MS)))) { + tkill(pthread_getunique_np(g_worker[i].th), SIGUSR1); + ++marks; + } + } + LOG("Melted down %d connections\n", marks); +} + +// main thread worker +void *Supervisor(void *arg) { nsync_time deadline; OnlyRunOnCpu(0); - pthread_setname_np(pthread_self(), "AssetWorker"); + pthread_setname_np(pthread_self(), "Supervisor"); for (deadline = _timespec_real();;) { deadline = _timespec_add(deadline, _timespec_frommillis(POLL_ASSETS_MS)); if (!nsync_note_wait(g_shutdown, deadline)) { + if (g_workers > 1 && 1. / g_workers * g_connections > PANIC_LOAD) { + Meltdown(); + } ReloadAsset(&g_asset.index); ReloadAsset(&g_asset.about); ReloadAsset(&g_asset.user); + ReloadAsset(&g_asset.favicon); } else { break; } @@ -1053,7 +1097,7 @@ void *AssetWorker(void *arg) { } int main(int argc, char *argv[]) { - ShowCrashReports(); + // ShowCrashReports(); GetOpts(argc, argv); __enable_threads(); @@ -1073,6 +1117,17 @@ int main(int argc, char *argv[]) { __pledge_mode = PLEDGE_PENALTY_RETURN_EPERM; CHECK_EQ(0, pledge("stdio flock rpath wpath cpath inet", 0)); + // signal handling + struct sigaction sa; + sa.sa_flags = 0; + sa.sa_handler = OnCtrlC; + sigfillset(&sa.sa_mask); + sigaction(SIGHUP, &sa, 0); + sigaction(SIGINT, &sa, 0); + sigaction(SIGTERM, &sa, 0); + sa.sa_handler = IgnoreSignal; + sigaction(SIGUSR1, &sa, 0); + // create threads pthread_t scorer; CHECK_EQ(1, GenerateScore(&g_asset.score)); @@ -1085,25 +1140,28 @@ int main(int argc, char *argv[]) { pthread_t nower; UpdateNow(); CHECK_EQ(0, pthread_create(&nower, 0, NowWorker, 0)); - pthread_t *httper = _gc(xcalloc(g_workers, sizeof(pthread_t))); + g_worker = _gc(xcalloc(g_workers, sizeof(*g_worker))); for (intptr_t i = 0; i < g_workers; ++i) { LOG("Starting http worker #%d", i); - CHECK_EQ(0, pthread_create(httper + i, 0, HttpWorker, (void *)i)); + CHECK_EQ(0, pthread_create(&g_worker[i].th, 0, HttpWorker, (void *)i)); } - // main thread activity - struct sigaction sa = {.sa_handler = OnCtrlC}; - sigaction(SIGHUP, &sa, 0); - sigaction(SIGINT, &sa, 0); - sigaction(SIGTERM, &sa, 0); - LOG("Server is ready\n"); - AssetWorker(0); + // time to serve + LOG("Hello\n"); + Supervisor(0); + + // cancel accept and read for fast shutdown + LOG("Interrupting workers...\n"); + for (int i = 0; i < g_workers; ++i) { + tkill(pthread_getunique_np(g_worker[i].th), SIGUSR1); + } // wait for producers to finish LOG("Waiting for workers to finish...\n"); for (int i = 0; i < g_workers; ++i) { - CHECK_EQ(0, pthread_join(httper[i], 0)); + CHECK_EQ(0, pthread_join(g_worker[i].th, 0)); } + LOG("Waiting for helpers to finish...\n"); CHECK_EQ(0, pthread_join(recentr, 0)); CHECK_EQ(0, pthread_join(scorer, 0)); CHECK_EQ(0, pthread_join(nower, 0)); diff --git a/third_party/unzip/fileio.c b/third_party/unzip/fileio.c index 16292fac5..148383004 100644 --- a/third_party/unzip/fileio.c +++ b/third_party/unzip/fileio.c @@ -2887,7 +2887,7 @@ zvoid *memset(buf, init, len) /* Function memcmp() */ /*********************/ -int memcmp(b1, b2, len) +int memcmp_(b1, b2, len) register ZCONST zvoid *b1; register ZCONST zvoid *b2; register unsigned int len; @@ -2908,7 +2908,7 @@ int memcmp(b1, b2, len) /* Function memcpy() */ /*********************/ -zvoid *memcpy(dst, src, len) +zvoid *memcpy_(dst, src, len) register zvoid *dst; register ZCONST zvoid *src; register unsigned int len;