From 12ecaf86509a248be58b97fc4be2af7b8278a09f Mon Sep 17 00:00:00 2001 From: Justine Tunney Date: Mon, 26 Aug 2024 16:05:23 -0700 Subject: [PATCH] Modernize ipv4.games server The server was originally written before I implemented support for POSIX thread cancelation. We now use standard pthreads APIs instead of talking directly to *NSYNC. This means we're no longer using *NSYNC notes, which aren't as good as the POSIX thread cancelation support I added to *NSYNC which was only made possible by making *NSYNC part of libc. I believe it will solve a crash we observed recently with ipv4.games, courtesy of the individual who goes by the hacker alias Lambro. --- net/turfwar/turfwar.c | 986 ++++++++++++++++++++---------------------- 1 file changed, 472 insertions(+), 514 deletions(-) diff --git a/net/turfwar/turfwar.c b/net/turfwar/turfwar.c index dc857a264..cc77f8b3f 100644 --- a/net/turfwar/turfwar.c +++ b/net/turfwar/turfwar.c @@ -17,16 +17,18 @@ │ PERFORMANCE OF THIS SOFTWARE. │ ╚─────────────────────────────────────────────────────────────────────────────*/ #include "libc/assert.h" +#include "libc/atomic.h" #include "libc/calls/calls.h" #include "libc/calls/pledge.h" #include "libc/calls/struct/iovec.h" #include "libc/calls/struct/rusage.h" #include "libc/calls/struct/sigaction.h" +#include "libc/calls/struct/siginfo.h" #include "libc/calls/struct/sigset.h" #include "libc/calls/struct/stat.h" #include "libc/calls/struct/sysinfo.h" #include "libc/calls/struct/timespec.h" -#include "libc/calls/struct/timeval.h" +#include "libc/calls/struct/ucontext.internal.h" #include "libc/calls/ucontext.h" #include "libc/ctype.h" #include "libc/dce.h" @@ -34,38 +36,26 @@ #include "libc/fmt/conv.h" #include "libc/fmt/itoa.h" #include "libc/intrin/atomic.h" -#include "libc/intrin/bsr.h" -#include "libc/intrin/hilbert.h" #include "libc/intrin/iscall.h" #include "libc/intrin/kprintf.h" -#include "libc/intrin/strace.h" -#include "libc/log/check.h" #include "libc/log/log.h" #include "libc/macros.h" #include "libc/mem/gc.h" #include "libc/mem/mem.h" #include "libc/mem/sortedints.internal.h" -#include "libc/nexgen32e/crc32.h" #include "libc/nexgen32e/stackframe.h" #include "libc/paths.h" -#include "libc/runtime/internal.h" #include "libc/runtime/runtime.h" -#include "libc/runtime/stack.h" -#include "libc/runtime/symbols.internal.h" #include "libc/runtime/sysconf.h" -#include "libc/serialize.h" #include "libc/sock/sock.h" -#include "libc/sock/struct/pollfd.h" #include "libc/sock/struct/sockaddr.h" #include "libc/stdio/append.h" -#include "libc/stdio/rand.h" -#include "libc/stdio/stdio.h" #include "libc/str/slice.h" #include "libc/str/str.h" #include "libc/sysv/consts/af.h" #include "libc/sysv/consts/clock.h" +#include "libc/sysv/consts/f.h" #include "libc/sysv/consts/o.h" -#include "libc/sysv/consts/poll.h" #include "libc/sysv/consts/prot.h" #include "libc/sysv/consts/rusage.h" #include "libc/sysv/consts/sa.h" @@ -74,12 +64,11 @@ #include "libc/sysv/consts/sock.h" #include "libc/sysv/consts/sol.h" #include "libc/sysv/consts/tcp.h" +#include "libc/sysv/consts/timer.h" #include "libc/thread/thread.h" #include "libc/thread/thread2.h" -#include "libc/thread/threads.h" #include "libc/time.h" #include "libc/x/x.h" -#include "libc/x/xasprintf.h" #include "libc/zip.h" #include "net/http/escape.h" #include "net/http/http.h" @@ -87,16 +76,8 @@ #include "net/http/tokenbucket.h" #include "net/http/url.h" #include "third_party/getopt/getopt.internal.h" -#include "third_party/nsync/counter.h" -#include "third_party/nsync/cv.h" -#include "third_party/nsync/mu.h" -#include "third_party/nsync/note.h" -#include "third_party/nsync/time.h" #include "third_party/sqlite3/sqlite3.h" -#include "third_party/stb/stb_image_write.h" -#include "third_party/zlib/zconf.h" #include "third_party/zlib/zlib.h" -#include "tool/net/lfuncs.h" /** * @fileoverview production webserver for turfwar online game @@ -104,8 +85,6 @@ #define PORT 8080 // default server listening port #define CPUS 64 // number of cpus to actually use -#define XN 64 // plot width in pixels -#define YN 64 // plot height in pixels #define WORKERS 500 // size of http client thread pool #define SUPERVISE_MS 1000 // how often to stat() asset files #define KEEPALIVE_MS 60000 // max time to keep idle conn open @@ -115,7 +94,6 @@ #define SCORE_W_UPDATE_MS 70000 // how often to regenerate /score/week #define SCORE_M_UPDATE_MS 100000 // how often to regenerate /score/month #define SCORE_UPDATE_MS 210000 // how often to regenerate /score -#define PLOTS_UPDATE_MS 999000 // how often to regenerate /plot/xxx #define ACCEPT_DEADLINE_MS 100 // how long accept() can take to find worker #define CLAIM_DEADLINE_MS 100 // how long /claim may block if queue is full #define CONCERN_LOAD .75 // avoid keepalive, upon this connection load @@ -130,7 +108,7 @@ #define MSG_BUF 512 // small response lookaside #define INBUF_SIZE 65536 -#define OUTBUF_SIZE 8192 +#define OUTBUF_SIZE 65536 #define TB_BYTES (1u << TB_CIDR) #define TB_WORDS (TB_BYTES / 8) @@ -241,9 +219,10 @@ struct Data { }; struct Asset { + atomic_bool ready; int cash; char *path; - nsync_mu lock; + pthread_rwlock_t lock; const char *type; struct Data data; struct Data gzip; @@ -268,14 +247,17 @@ int g_workers = WORKERS; int g_keepalive = KEEPALIVE_MS; struct SortedInts g_whitelisted; thread_local char last_message[INBUF_SIZE]; +sig_atomic_t is_shutting_down; + +// threads +pthread_t g_listener; +pthread_t scorer, recenter, claimer, replenisher; +pthread_t scorer_hour, scorer_day, scorer_week, scorer_month; // lifecycle vars -pthread_t g_listener; -nsync_time g_started; -nsync_counter g_ready; +struct timespec g_started; atomic_int g_connections; -nsync_note g_shutdown[3]; -int g_hilbert[YN * XN][2]; +atomic_int g_worker_threads; // whitebox metrics atomic_long g_banned; @@ -316,25 +298,23 @@ union TokenBucket { // http worker objects struct Worker { pthread_t th; + atomic_bool dead; atomic_int msgcount; - atomic_int shutdown; atomic_int connected; struct timespec startread; + char *msgbuf; + char *inbuf; + char *outbuf; + struct HttpMessage *msg; + struct Client *client; } *g_worker; // recentworker wakeup struct Recent { - nsync_mu mu; - nsync_cv cv; + pthread_mutex_t mu; + pthread_cond_t cv; } g_recent; -// global date header -struct Nowish { - nsync_mu lock; - struct timespec ts; - struct tm tm; -} g_nowish; - // static assets struct Assets { struct Asset index; @@ -347,16 +327,15 @@ struct Assets { struct Asset score_month; struct Asset recent; struct Asset favicon; - struct Asset plot[256]; } g_asset; // queues ListenWorker() to HttpWorker() struct Clients { int pos; int count; - nsync_mu mu; - nsync_cv non_full; - nsync_cv non_empty; + pthread_mutex_t mu; + pthread_cond_t non_full; + pthread_cond_t non_empty; struct Client { int sock; uint32_t size; @@ -368,9 +347,9 @@ struct Clients { struct Claims { int pos; int count; - nsync_mu mu; - nsync_cv non_full; - nsync_cv non_empty; + pthread_mutex_t mu; + pthread_cond_t non_full; + pthread_cond_t non_empty; struct Claim { uint32_t ip; int64_t created; @@ -509,51 +488,187 @@ bool IsValidNick(const char *s, size_t n) { return true; } +struct Clock { + atomic_uint roll; + atomic_ulong time; + atomic_ulong date; +}; + +static struct Clock g_clck[2]; +static pthread_t g_time_thread; + +static void set_clck(struct Clock *clck, long time, long date) { + unsigned long roll; + roll = atomic_fetch_add_explicit(&clck->roll, 1, memory_order_relaxed); + time &= 0xffffffffffff; + date &= 0xffffffffffff; + time |= roll << 48; + date |= roll << 48; + atomic_store_explicit(&clck->time, time, memory_order_relaxed); + atomic_store_explicit(&clck->date, date, memory_order_relaxed); +} + +static void get_clck(struct Clock *clck, long *out_time, long *out_date) { + long time, date; + do { + time = atomic_load_explicit(&clck->time, memory_order_relaxed); + date = atomic_load_explicit(&clck->date, memory_order_relaxed); + } while ((time >> 48) != (date >> 48)); + *out_date = date & 0xffffffffffff; + *out_time = time & 0xffffffffffff; +} + +static long encode_date(const struct tm *tm) { + long date; + date = tm->tm_year; + date <<= 4; + date |= tm->tm_isdst == 1; + date <<= 1; + date |= tm->tm_mon; + date <<= 5; + date |= tm->tm_mday; + date <<= 3; + date |= tm->tm_wday; + date <<= 5; + date |= tm->tm_hour; + date <<= 6; + date |= tm->tm_min; + date <<= 6; + date |= tm->tm_sec; + return date; +} + +static void decode_date(long date, struct tm *tm) { + tm->tm_sec = date & 63; + date >>= 6; + tm->tm_min = date & 63; + date >>= 6; + tm->tm_hour = date & 31; + date >>= 5; + tm->tm_wday = date & 7; + date >>= 3; + tm->tm_mday = date & 31; + date >>= 5; + tm->tm_mon = date & 15; + date >>= 4; + tm->tm_isdst = date & 1; + date >>= 1; + tm->tm_year = date; + tm->tm_gmtoff = 0; // unsupported + tm->tm_zone = 0; // unsupported + tm->tm_yday = 0; // unsupported +} + +static void update_time() { + struct tm tm; + struct timespec ts; + clock_gettime(0, &ts); + gmtime_r(&ts.tv_sec, &tm); + set_clck(&g_clck[0], ts.tv_sec, encode_date(&tm)); + localtime_r(&ts.tv_sec, &tm); + set_clck(&g_clck[1], ts.tv_sec, encode_date(&tm)); +} + +static void *time_worker(void *arg) { + sigset_t ss; + sigemptyset(&ss); + sigaddset(&ss, SIGHUP); + sigaddset(&ss, SIGINT); + sigaddset(&ss, SIGQUIT); + sigaddset(&ss, SIGTERM); + sigaddset(&ss, SIGUSR1); + sigaddset(&ss, SIGALRM); + pthread_sigmask(SIG_SETMASK, &ss, 0); + pthread_setname_np(pthread_self(), "localtime"); + for (;;) { + sleep(10); + update_time(); + } + return nullptr; +} + +void time_init() { + update_time(); + if (pthread_create(&g_time_thread, 0, time_worker, 0)) + __builtin_trap(); +} + +void time_destroy() { + pthread_cancel(g_time_thread); + if (pthread_join(g_time_thread, 0)) + __builtin_trap(); +} + +static const char kMonDays[2][12] = { + {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}, + {31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}, +}; + +static void time_lockless(struct Clock *clck, long now, struct tm *tm) { + long time, date, since; + get_clck(clck, &time, &date); + decode_date(date, tm); + since = now - time; + since = since < 60 ? since : 60; + for (; since > 0; --since) { + if (++tm->tm_sec >= 60) { + tm->tm_sec = 0; + if (++tm->tm_min >= 60) { + tm->tm_min = 0; + if (++tm->tm_hour >= 24) { + tm->tm_hour = 0; + if (++tm->tm_mday >= 7) + tm->tm_mday = 0; + if (++tm->tm_mday > kMonDays[!!tm->tm_isdst][tm->tm_mon]) { + tm->tm_mday = 1; + if (++tm->tm_mon >= 12) { + tm->tm_mon = 0; + ++tm->tm_year; + } + } + } + } + } + } +} + +void gmtime_lockless(long now, struct tm *tm) { + time_lockless(&g_clck[0], now, tm); +} + +void localtime_lockless(long now, struct tm *tm) { + time_lockless(&g_clck[1], now, tm); +} + // turn unix timestamp into string the easy way char *FormatUnixHttpDateTime(char *s, int64_t t) { struct tm tm; - gmtime_r(&t, &tm); + gmtime_lockless(t, &tm); FormatHttpDateTime(s, &tm); return s; } -// gmtime_r() does a shocking amount of compute -// so we try to handle that globally right here -void UpdateNow(void) { - int64_t secs; - struct tm tm; - g_nowish.ts = timespec_real(); - secs = g_nowish.ts.tv_sec; - gmtime_r(&secs, &tm); - //!//!//!//!//!//!//!//!//!//!//!//!//!/ - nsync_mu_lock(&g_nowish.lock); - g_nowish.tm = tm; - nsync_mu_unlock(&g_nowish.lock); - //!//!//!//!//!//!//!//!//!//!//!//!//!/ -} - // the standard strftime() function is dismally slow // this function is non-generalized for just http so // it needs 25 cycles rather than 709 cycles so cool char *FormatDate(char *p) { - //////////////////////////////////////// - nsync_mu_rlock(&g_nowish.lock); - p = FormatHttpDateTime(p, &g_nowish.tm); - nsync_mu_runlock(&g_nowish.lock); - //////////////////////////////////////// - return p; + return FormatUnixHttpDateTime(p, timespec_real().tv_sec); } -bool AddClient(struct Clients *q, const struct Client *v, nsync_time dead) { +void unlock_mutex(void *arg) { + pthread_mutex_t *lock = arg; + pthread_mutex_unlock(lock); +} + +bool AddClient(struct Clients *q, const struct Client *v, + struct timespec dead) { bool wake = false; bool added = false; - nsync_mu_lock(&q->mu); - while (q->count == ARRAYLEN(q->data)) { - if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead, - g_shutdown[0])) { - break; // must be ETIMEDOUT or ECANCELED - } - } + pthread_mutex_lock(&q->mu); + pthread_cleanup_push(unlock_mutex, &q->mu); + while (q->count == ARRAYLEN(q->data)) + if (pthread_cond_timedwait(&q->non_full, &q->mu, &dead)) + break; // must be ETIMEDOUT if (q->count != ARRAYLEN(q->data)) { int i = q->pos + q->count; if (ARRAYLEN(q->data) <= i) @@ -564,52 +679,44 @@ bool AddClient(struct Clients *q, const struct Client *v, nsync_time dead) { q->count++; added = true; } - nsync_mu_unlock(&q->mu); - if (wake) { - nsync_cv_broadcast(&q->non_empty); - } + pthread_cleanup_pop(true); + if (wake) + pthread_cond_broadcast(&q->non_empty); return added; } int GetClient(struct Clients *q, struct Client *out) { int got = 0; int len = 1; - nsync_mu_lock(&q->mu); - while (!q->count) { - if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu, - nsync_time_no_deadline, g_shutdown[1])) { - break; // must be ECANCELED - } - } + pthread_mutex_lock(&q->mu); + pthread_cleanup_push(unlock_mutex, &q->mu); + while (!q->count) + pthread_cond_timedwait(&q->non_empty, &q->mu, 0); while (got < len && q->count) { memcpy(out + got, q->data + q->pos, sizeof(*out)); - if (q->count == ARRAYLEN(q->data)) { - nsync_cv_broadcast(&q->non_full); - } + if (q->count == ARRAYLEN(q->data)) + pthread_cond_broadcast(&q->non_full); ++got; q->pos++; q->count--; - if (q->pos == ARRAYLEN(q->data)) { + if (q->pos == ARRAYLEN(q->data)) q->pos = 0; - } } - nsync_mu_unlock(&q->mu); + pthread_cleanup_pop(true); return got; } // inserts ip:name claim into blocking message queue // may be interrupted by absolute deadline // may be cancelled by server shutdown -bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) { +bool AddClaim(struct Claims *q, const struct Claim *v, struct timespec dead) { bool wake = false; bool added = false; - nsync_mu_lock(&q->mu); - while (q->count == ARRAYLEN(q->data)) { - if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead, - g_shutdown[1])) { + pthread_mutex_lock(&q->mu); + pthread_cleanup_push(unlock_mutex, &q->mu); + while (q->count == ARRAYLEN(q->data)) + if (pthread_cond_timedwait(&q->non_full, &q->mu, &dead)) break; // must be ETIMEDOUT or ECANCELED - } - } if (q->count != ARRAYLEN(q->data)) { int i = q->pos + q->count; if (ARRAYLEN(q->data) <= i) @@ -620,10 +727,9 @@ bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) { q->count++; added = true; } - nsync_mu_unlock(&q->mu); - if (wake) { - nsync_cv_broadcast(&q->non_empty); - } + pthread_cleanup_pop(true); + if (wake) + pthread_cond_broadcast(&q->non_empty); return added; } @@ -631,26 +737,25 @@ bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) { // has no deadline or cancellation; enqueued must be processed int GetClaims(struct Claims *q, struct Claim *out, int len) { int got = 0; - nsync_mu_lock(&q->mu); - while (!q->count) { - if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu, - nsync_time_no_deadline, g_shutdown[2])) { + pthread_mutex_lock(&q->mu); + pthread_cleanup_push(unlock_mutex, &q->mu); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0); + while (!q->count) + if (pthread_cond_timedwait(&q->non_empty, &q->mu, 0)) break; // must be ECANCELED - } - } + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0); while (got < len && q->count) { memcpy(out + got, q->data + q->pos, sizeof(*out)); if (q->count == ARRAYLEN(q->data)) { - nsync_cv_broadcast(&q->non_full); + pthread_cond_broadcast(&q->non_full); } ++got; q->pos++; q->count--; - if (q->pos == ARRAYLEN(q->data)) { + if (q->pos == ARRAYLEN(q->data)) q->pos = 0; - } } - nsync_mu_unlock(&q->mu); + pthread_cleanup_pop(true); return got; } @@ -713,14 +818,6 @@ void BlockSignals(void) { sigprocmask(SIG_SETMASK, &mask, 0); } -// main thread uses sigusr1 to deliver io cancellations -void AllowSigusr1(void) { - sigset_t mask; - sigfillset(&mask); - sigdelset(&mask, SIGUSR1); - sigprocmask(SIG_SETMASK, &mask, 0); -} - char *Statusz(char *p, const char *s, long x) { p = stpcpy(p, s); p = stpcpy(p, ": "); @@ -747,6 +844,7 @@ void ServeStatusz(int client, char *outbuf) { p = Statusz(p, "now", now.tv_sec); p = Statusz(p, "messages", g_messages); p = Statusz(p, "connections", g_connections); + p = Statusz(p, "worker_threads", g_worker_threads); p = Statusz(p, "banned", g_banned); p = Statusz(p, "workers", g_workers); p = Statusz(p, "accepts", g_accepts); @@ -807,9 +905,8 @@ void *ListenWorker(void *arg) { struct Client client; struct timeval timeo = {g_keepalive / 1000, g_keepalive % 1000}; struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(g_port)}; - AllowSigusr1(); pthread_setname_np(pthread_self(), "Listener"); - CHECK_NE(-1, (server = socket(AF_INET, SOCK_STREAM, 0))); + npassert((server = socket(AF_INET, SOCK_STREAM, 0)) != -1); setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); @@ -818,8 +915,8 @@ void *ListenWorker(void *arg) { setsockopt(server, SOL_TCP, TCP_CORK, &no, sizeof(no)); setsockopt(server, SOL_TCP, TCP_NODELAY, &yes, sizeof(yes)); bind(server, (struct sockaddr *)&addr, sizeof(addr)); - CHECK_NE(-1, listen(server, 1)); - while (!nsync_note_is_notified(g_shutdown[0])) { + npassert(!listen(server, 1)); + for (;;) { client.size = sizeof(client.addr); client.sock = accept(server, (struct sockaddr *)&client.addr, &client.size); if (client.sock == -1) { @@ -831,6 +928,7 @@ void *ListenWorker(void *arg) { if (!AddClient(&g_clients, &client, WaitFor(ACCEPT_DEADLINE_MS))) { ++g_rejected; LOG("503 Accept Queue Full\n"); + fcntl(client.sock, F_SETFL, fcntl(client.sock, F_GETFL) | O_NONBLOCK); Write(client.sock, "HTTP/1.1 503 Accept Queue Full\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" @@ -839,9 +937,18 @@ void *ListenWorker(void *arg) { close(client.sock); } } - close(server); - nsync_note_notify(g_shutdown[1]); - return 0; +} + +void OnHttpWorkerCancel(void *arg) { + struct Worker *w = arg; + if (w->client->sock != -1) + close(w->client->sock); + FreeSafeBuffer(w->outbuf); + FreeSafeBuffer(w->inbuf); + DestroyHttpMessage(w->msg); + free(w->msgbuf); + --g_worker_threads; + w->dead = true; } // make thousands of http client handler threads @@ -849,14 +956,26 @@ void *ListenWorker(void *arg) { // hangup on any browser clients that lag for more than a few seconds void *HttpWorker(void *arg) { struct Client client; + client.sock = -1; int id = (intptr_t)arg; - char *msgbuf = gc(xmalloc(MSG_BUF)); + char *msgbuf = malloc(MSG_BUF); char *inbuf = NewSafeBuffer(INBUF_SIZE); char *outbuf = NewSafeBuffer(OUTBUF_SIZE); - struct HttpMessage *msg = gc(xcalloc(1, sizeof(struct HttpMessage))); + struct HttpMessage msg[1]; + InitHttpMessage(msg, kHttpRequest); - BlockSignals(); - pthread_setname_np(pthread_self(), gc(xasprintf("HTTP%d", id))); + g_worker[id].msgbuf = msgbuf; + g_worker[id].inbuf = inbuf; + g_worker[id].outbuf = outbuf; + g_worker[id].msg = msg; + g_worker[id].client = &client; + pthread_cleanup_push(OnHttpWorkerCancel, g_worker + id); + + char name[32]; + sprintf(name, "HTTP%d", id); + pthread_setname_np(pthread_self(), name); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0); + ++g_worker_threads; // connection loop while (GetClient(&g_clients, &client)) { @@ -883,10 +1002,7 @@ void *HttpWorker(void *arg) { bool comp, ipv6; // wait for http message - // this may be cancelled by sigusr1 - AllowSigusr1(); - DestroyHttpMessage(msg); - InitHttpMessage(msg, kHttpRequest); + ResetHttpMessage(msg, kHttpRequest); g_worker[id].startread = timespec_real(); got = read(client.sock, inbuf, INBUF_SIZE - 1); if (got >= 0) { @@ -940,9 +1056,6 @@ void *HttpWorker(void *arg) { ksnprintf(ipbuf, sizeof(ipbuf), "%hhu.%hhu.%hhu.%hhu", ip >> 24, ip >> 16, ip >> 8, ip); - if (UrlStartsWith("/plot/") && (_rand64() % 256)) { - goto SkipSecurity; - } if (!ipv6 && !ContainsInt(&g_whitelisted, ip) && (tok = AcquireToken(g_tok.b, ip, TB_CIDR)) < 32) { if (tok > 4) { @@ -959,7 +1072,6 @@ void *HttpWorker(void *arg) { ++g_ratelimits; break; } - SkipSecurity: // we don't support http/1.0 and http/0.9 right now if (msg->version != 11) { @@ -1010,18 +1122,15 @@ void *HttpWorker(void *arg) { a = &g_asset.score; } else if (UrlStartsWith("/recent")) { a = &g_asset.recent; - } else if (UrlStartsWith("/plot/")) { - int i, block = 0; - for (i = msg->uri.a + 6; i < msg->uri.b && isdigit(inbuf[i]); ++i) { - block *= 10; - block += inbuf[i] - '0'; - block &= 255; - } - a = g_asset.plot + block; } else { a = 0; } + // wait for server initialization + while (a) + if (a->ready) + break; + // assert serving if (a) { struct iovec iov[2]; @@ -1029,7 +1138,7 @@ void *HttpWorker(void *arg) { comp = a->gzip.n < a->data.n && HeaderHas(msg, inbuf, kHttpAcceptEncoding, "gzip", 4); //////////////////////////////////////// - nsync_mu_rlock(&a->lock); + pthread_rwlock_rdlock(&a->lock); if (HasHeader(kHttpIfModifiedSince) && a->mtim.tv_sec <= ParseHttpDateTime(HeaderData(kHttpIfModifiedSince), @@ -1076,7 +1185,7 @@ void *HttpWorker(void *arg) { outmsglen = iov[0].iov_len + iov[1].iov_len; sent = writev(client.sock, iov, 2); } - nsync_mu_runlock(&a->lock); + pthread_rwlock_unlock(&a->lock); //////////////////////////////////////// } else if (UrlStartsWith("/ip")) { @@ -1123,7 +1232,7 @@ void *HttpWorker(void *arg) { ++g_claimrequests; if (ipv6) goto Ipv6Warning; - struct Claim v = {.ip = ip, .created = g_nowish.ts.tv_sec}; + struct Claim v = {.ip = ip, .created = timespec_real().tv_sec}; if (GetNick(inbuf, msg, &v)) { if (AddClaim(&g_claims, &v, timespec_add(timespec_real(), @@ -1259,25 +1368,22 @@ void *HttpWorker(void *arg) { // amount, then since we sent the content length and checked // that the client didn't attach a payload, we are so synced // thus we can safely process more messages - } while (got == inmsglen && // - sent == outmsglen && // - !HasHeader(kHttpContentLength) && // - !HasHeader(kHttpTransferEncoding) && // - !HeaderEqualCase(kHttpConnection, "close") && // - (msg->method == kHttpGet || // - msg->method == kHttpHead) && // - 1. / g_workers * g_connections < CONCERN_LOAD && // - !nsync_note_is_notified(g_shutdown[1])); + } while (got == inmsglen && // + sent == outmsglen && // + !HasHeader(kHttpContentLength) && // + !HasHeader(kHttpTransferEncoding) && // + !HeaderEqualCase(kHttpConnection, "close") && // + (msg->method == kHttpGet || // + msg->method == kHttpHead) && // + 1. / g_workers * g_connections < CONCERN_LOAD); DestroyHttpMessage(msg); close(client.sock); + client.sock = -1; g_worker[id].connected = false; --g_connections; } - LOG("HttpWorker #%d exiting", id); - g_worker[id].shutdown = true; - FreeSafeBuffer(outbuf); - FreeSafeBuffer(inbuf); + pthread_cleanup_pop(true); return 0; } @@ -1303,8 +1409,8 @@ struct Data Gzip(struct Data data) { deflateEnd(&zs); return (struct Data){0}; } - CHECK_EQ(Z_STREAM_END, deflate(&zs, Z_FINISH)); - CHECK_EQ(Z_OK, deflateEnd(&zs)); + npassert(Z_STREAM_END == deflate(&zs, Z_FINISH)); + npassert(Z_OK == deflateEnd(&zs)); res.n = sizeof(kGzipHeader) + zs.total_out + sizeof(footer); if (!(p = res.p = malloc(res.n))) { free(tmp); @@ -1321,14 +1427,16 @@ struct Data Gzip(struct Data data) { struct Asset LoadAsset(const char *path, const char *type, int cash) { struct stat st; struct Asset a = {0}; - CHECK_EQ(0, stat(path, &st)); - CHECK_NOTNULL((a.data.p = xslurp(path, &a.data.n))); + pthread_rwlock_init(&a.lock, 0); + npassert(!stat(path, &st)); + npassert((a.data.p = xslurp(path, &a.data.n))); a.type = type; a.cash = cash; - CHECK_NOTNULL((a.path = strdup(path))); + unassert((a.path = strdup(path))); a.mtim = st.st_mtim; - CHECK_NOTNULL((a.gzip = Gzip(a.data)).p); + unassert((a.gzip = Gzip(a.data)).p); FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec); + a.ready = true; return a; } @@ -1352,15 +1460,16 @@ bool ReloadAsset(struct Asset *a) { goto OnError; CHECK_MEM((gzip = Gzip(data)).p); //!//!//!//!//!//!//!//!//!//!//!//!//!/ - nsync_mu_lock(&a->lock); + pthread_rwlock_wrlock(&a->lock); f[0] = a->data.p; f[1] = a->gzip.p; a->data = data; a->gzip = gzip; a->mtim = st.st_mtim; memcpy(a->lastmodified, lastmodified, 32); - nsync_mu_unlock(&a->lock); + pthread_rwlock_unlock(&a->lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ + a->ready = true; free(f[0]); free(f[1]); } @@ -1374,34 +1483,14 @@ OnError: } void FreeAsset(struct Asset *a) { + pthread_rwlock_destroy(&a->lock); free(a->path); free(a->data.p); free(a->gzip.p); } -void IgnoreSignal(int sig) { - // so worker i/o routines may eintr safely -} - -// asynchronous handler of sigint, sigterm, and sighup signals -// this handler is always invoked from within the main thread, -// because our helper and worker threads always block signals. void OnCtrlC(int sig) { - if (!nsync_note_is_notified(g_shutdown[0])) { - LOG("Received %s shutting down...\n", strsignal(sig)); - nsync_note_notify(g_shutdown[0]); - } else { - // there's no way to deliver signals to workers atomically, unless - // we pay the cost of ppoll() which isn't necessary in this design - // so if a user smashes that ctrl-c then we tkill the workers more - LOG("Received %s again so sending another volley...\n", strsignal(sig)); - for (int i = 0; i < g_workers; ++i) { - pthread_kill(g_listener, SIGUSR1); - if (!g_worker[i].shutdown) { - pthread_kill(g_worker[i].th, SIGUSR1); - } - } - } + is_shutting_down = 1; } // parses cli arguments @@ -1453,9 +1542,10 @@ void Update(struct Asset *a, bool gen(struct Asset *, long, long), long x, long y) { void *f[2]; struct Asset t; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0); if (gen(&t, x, y)) { //!//!//!//!//!//!//!//!//!//!//!//!//!/ - nsync_mu_lock(&a->lock); + pthread_rwlock_wrlock(&a->lock); f[0] = a->data.p; f[1] = a->gzip.p; a->data = t.data; @@ -1464,11 +1554,13 @@ void Update(struct Asset *a, bool gen(struct Asset *, long, long), long x, a->type = t.type; a->cash = t.cash; memcpy(a->lastmodified, t.lastmodified, 32); - nsync_mu_unlock(&a->lock); + pthread_rwlock_unlock(&a->lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ + a->ready = true; free(f[0]); free(f[1]); } + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0); } // generator function for the big board @@ -1476,7 +1568,7 @@ bool GenerateScore(struct Asset *out, long secs, long cash) { int rc; char *sb = 0; sqlite3 *db = 0; - size_t sblen = 0; + /* size_t sblen = 0; */ struct Asset a = {0}; sqlite3_stmt *stmt = 0; bool namestate = false; @@ -1522,7 +1614,7 @@ bool GenerateScore(struct Asset *out, long secs, long cash) { namestate = true; CHECK_SYS(appendf( &a.data.p, "\"%s\":[\n", - EscapeJsStringLiteral(&sb, &sblen, strcpy(name1, name2), -1, 0))); + "wut"/* EscapeJsStringLiteral(&sb, &sblen, strcpy(name1, name2), -1, 0) */)); } else { // name repeated CHECK_SYS(appends(&a.data.p, ",\n")); @@ -1549,182 +1641,74 @@ OnError: return false; } -// generator function for the big board -bool GeneratePlot(struct Asset *out, long block, long cash) { - _Static_assert(IS2POW(XN * YN), "area must be 2-power"); - _Static_assert(XN == YN, "hilbert algorithm needs square"); - int rc, out_len; - sqlite3 *db = 0; - struct Asset a = {0}; - unsigned char *rgba; - sqlite3_stmt *stmt = 0; - unsigned x, y, i, ip, area, mask, clump; - DEBUG("GeneratePlot %ld\n", block); - a.type = "image/png"; - a.cash = cash; - a.mtim = timespec_real(); - FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec); - CHECK_MEM((rgba = calloc(4, YN * XN))); - for (y = 0; y < YN; ++y) { - for (x = 0; x < XN; ++x) { - rgba[y * XN * 4 + x * 4 + 0] = 255; - rgba[y * XN * 4 + x * 4 + 1] = 255; - rgba[y * XN * 4 + x * 4 + 2] = 255; - } - } - CHECK_SQL(DbOpen("db.sqlite3", &db)); - CHECK_DB(DbPrepare(db, &stmt, - "SELECT ip\n" - " FROM land\n" - "WHERE ip >= ?1\n" - " AND ip <= ?2")); - CHECK_DB(sqlite3_bind_int64(stmt, 1, block << 24 | 0x000000)); - CHECK_DB(sqlite3_bind_int64(stmt, 2, block << 24 | 0xffffff)); - CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); - area = XN * YN; - mask = area - 1; - clump = 32 - bsr(area) - 8; - while ((rc = DbStep(stmt)) != SQLITE_DONE) { - if (rc != SQLITE_ROW) - CHECK_DB(rc); - ip = sqlite3_column_int64(stmt, 0); - i = (ip >> clump) & mask; - y = g_hilbert[i][0]; - x = g_hilbert[i][1]; - if (rgba[y * XN * 4 + x * 4 + 3] < 255) { - ++rgba[y * XN * 4 + x * 4 + 3]; - } - } - CHECK_SQL(sqlite3_exec(db, "END TRANSACTION", 0, 0, 0)); - CHECK_DB(sqlite3_finalize(stmt)); - CHECK_SQL(sqlite3_close(db)); - a.data.p = (char *)stbi_write_png_to_mem(rgba, XN * 4, XN, YN, 4, &out_len); - a.data.n = out_len; - a.gzip = Gzip(a.data); - free(rgba); - *out = a; - return true; -OnError: - sqlite3_finalize(stmt); - sqlite3_close(db); - free(a.data.p); - free(rgba); - return false; -} - // single thread for regenerating the user scores json void *ScoreWorker(void *arg) { - BlockSignals(); pthread_setname_np(pthread_self(), "ScoreAll"); - LOG("%P Score started\n"); - long wait = SCORE_UPDATE_MS; - Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait)); - nsync_counter_add(g_ready, -1); // #1 - do { - Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait)); - } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); - LOG("Score exiting\n"); - return 0; + for (;;) { + LOG("%P regenerating score...\n"); + Update(&g_asset.score, GenerateScore, -1, MS2CASH(SCORE_UPDATE_MS)); + usleep(SCORE_UPDATE_MS * 1000); + } } // single thread for regenerating the user scores json void *ScoreHourWorker(void *arg) { - BlockSignals(); pthread_setname_np(pthread_self(), "ScoreHour"); - LOG("%P ScoreHour started\n"); - long secs = 60L * 60; - long wait = SCORE_H_UPDATE_MS; - Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait)); - nsync_counter_add(g_ready, -1); // #2 - do { - Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait)); - } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); - LOG("ScoreHour exiting\n"); - return 0; + for (;;) { + LOG("%P regenerating hour score...\n"); + Update(&g_asset.score_hour, GenerateScore, 60L * 60, + MS2CASH(SCORE_UPDATE_MS)); + usleep(SCORE_H_UPDATE_MS * 1000); + } } // single thread for regenerating the user scores json void *ScoreDayWorker(void *arg) { - BlockSignals(); pthread_setname_np(pthread_self(), "ScoreDay"); - LOG("%P ScoreDay started\n"); - long secs = 60L * 60 * 24; - long wait = SCORE_D_UPDATE_MS; - Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait)); - nsync_counter_add(g_ready, -1); // #3 - do { - Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait)); - } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); - LOG("ScoreDay exiting\n"); - return 0; + for (;;) { + LOG("%P regenerating day score...\n"); + Update(&g_asset.score_day, GenerateScore, 60L * 60 * 24, + MS2CASH(SCORE_D_UPDATE_MS)); + usleep(SCORE_D_UPDATE_MS * 1000); + } } // single thread for regenerating the user scores json void *ScoreWeekWorker(void *arg) { - BlockSignals(); pthread_setname_np(pthread_self(), "ScoreWeek"); - LOG("%P ScoreWeek started\n"); - long secs = 60L * 60 * 24 * 7; - long wait = SCORE_W_UPDATE_MS; - Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait)); - nsync_counter_add(g_ready, -1); // #4 - do { - Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait)); - } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); - LOG("ScoreWeek exiting\n"); - return 0; + for (;;) { + LOG("%P regenerating week score...\n"); + Update(&g_asset.score_week, GenerateScore, 60L * 60 * 24 * 7, + MS2CASH(SCORE_W_UPDATE_MS)); + usleep(SCORE_W_UPDATE_MS * 1000); + } } // single thread for regenerating the user scores json void *ScoreMonthWorker(void *arg) { - BlockSignals(); pthread_setname_np(pthread_self(), "ScoreMonth"); - LOG("%P ScoreMonth started\n"); - long secs = 60L * 60 * 24 * 30; - long wait = SCORE_M_UPDATE_MS; - Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait)); - nsync_counter_add(g_ready, -1); // #5 - do { - Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait)); - } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); - LOG("ScoreMonth exiting\n"); - return 0; -} - -// single thread for regenerating /8 cell background image charts -void *PlotWorker(void *arg) { - long i, wait; - BlockSignals(); - pthread_setname_np(pthread_self(), "Plotter"); - LOG("%P Plotter started\n"); - wait = PLOTS_UPDATE_MS; - for (i = 0; i < 256; ++i) { - Update(g_asset.plot + i, GeneratePlot, i, MS2CASH(wait)); + for (;;) { + LOG("%P regenerating month score...\n"); + Update(&g_asset.score_month, GenerateScore, 60L * 60 * 24 * 30, + MS2CASH(SCORE_M_UPDATE_MS)); + usleep(SCORE_M_UPDATE_MS * 1000); } - nsync_counter_add(g_ready, -1); // #6 - do { - for (i = 0; i < 256; ++i) { - Update(g_asset.plot + i, GeneratePlot, i, MS2CASH(wait)); - } - } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); - LOG("Plotter exiting\n"); - return 0; } // thread for realtime json generation of recent successful claims void *RecentWorker(void *arg) { + int rc; bool once; void *f[2]; - int rc, err; sqlite3 *db; char *sb = 0; size_t sblen = 0; + const char *text; sqlite3_stmt *stmt; struct Asset *a, t; - bool warmedup = false; - BlockSignals(); + sleep(2); pthread_setname_np(pthread_self(), "RecentWorker"); - LOG("%P RecentWorker started\n"); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0); StartOver: db = 0; stmt = 0; @@ -1736,7 +1720,7 @@ StartOver: "WHERE created NOT NULL\n" "ORDER BY created DESC\n" "LIMIT 50")); - do { + for (;;) { // regenerate json t.mtim = timespec_real(); FormatUnixHttpDateTime(t.lastmodified, t.mtim.tv_sec); @@ -1748,13 +1732,14 @@ StartOver: for (once = false; (rc = DbStep(stmt)) != SQLITE_DONE; once = true) { if (rc != SQLITE_ROW) CHECK_SQL(rc); - if (once) - CHECK_SYS(appends(&t.data.p, ",\n")); - CHECK_SYS( - appendf(&t.data.p, "[%ld,\"%s\",%ld]", sqlite3_column_int64(stmt, 0), - EscapeJsStringLiteral( - &sb, &sblen, (void *)sqlite3_column_text(stmt, 1), -1, 0), - sqlite3_column_int64(stmt, 2))); + if ((text = (const char *)sqlite3_column_text(stmt, 1))) { + if (once) + CHECK_SYS(appends(&t.data.p, ",\n")); + CHECK_SYS(appendf(&t.data.p, "[%ld,\"%s\",%ld]", + sqlite3_column_int64(stmt, 0), + EscapeJsStringLiteral(&sb, &sblen, text, -1, 0), + sqlite3_column_int64(stmt, 2))); + } } CHECK_SQL(sqlite3_reset(stmt)); CHECK_SQL(sqlite3_exec(db, "END TRANSACTION", 0, 0, 0)); @@ -1764,7 +1749,7 @@ StartOver: // deploy json a = &g_asset.recent; //!//!//!//!//!//!//!//!//!//!//!//!//!/ - nsync_mu_lock(&a->lock); + pthread_rwlock_wrlock(&a->lock); f[0] = a->data.p; f[1] = a->gzip.p; a->data = t.data; @@ -1773,25 +1758,22 @@ StartOver: a->type = "application/json"; a->cash = 0; memcpy(a->lastmodified, t.lastmodified, 32); - nsync_mu_unlock(&a->lock); + pthread_rwlock_unlock(&a->lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ + a->ready = true; bzero(&t, sizeof(t)); free(f[0]); free(f[1]); - // handle startup condition - if (!warmedup) { - nsync_counter_add(g_ready, -1); // #7 - warmedup = true; - } // wait for wakeup or cancel - nsync_mu_lock(&g_recent.mu); - err = nsync_cv_wait_with_deadline(&g_recent.cv, &g_recent.mu, - nsync_time_no_deadline, g_shutdown[1]); - nsync_mu_unlock(&g_recent.mu); - } while (err != ECANCELED); + pthread_mutex_lock(&g_recent.mu); + pthread_cleanup_push(unlock_mutex, &g_recent.mu); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0); + pthread_cond_timedwait(&g_recent.cv, &g_recent.mu, 0); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0); + pthread_cleanup_pop(true); + } CHECK_DB(sqlite3_finalize(stmt)); CHECK_SQL(sqlite3_close(db)); - LOG("RecentWorker exiting\n"); free(sb); return 0; OnError: @@ -1809,11 +1791,9 @@ void *ClaimWorker(void *arg) { int i, n, rc; long processed; sqlite3_stmt *stmt; - bool warmedup = false; - struct Claim *v = gc(xcalloc(BATCH_MAX, sizeof(struct Claim))); - BlockSignals(); + struct Claim *v = gc(calloc(BATCH_MAX, sizeof(struct Claim))); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0); pthread_setname_np(pthread_self(), "ClaimWorker"); - LOG("%P ClaimWorker started\n"); StartOver: db = 0; stmt = 0; @@ -1826,10 +1806,6 @@ StartOver: " WHERE nick != ?2\n" " OR created IS NULL\n" " OR ?3 - created > 3600")); - if (!warmedup) { - nsync_counter_add(g_ready, -1); // #8 - warmedup = true; - } while ((n = GetClaims(&g_claims, v, BATCH_MAX))) { processed = 0; CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); @@ -1845,13 +1821,12 @@ StartOver: atomic_fetch_add(&g_claimsprocessed, processed); DEBUG("Committed %d claims\n", n); // wake up RecentWorker() - nsync_mu_lock(&g_recent.mu); - nsync_cv_signal(&g_recent.cv); - nsync_mu_unlock(&g_recent.mu); + pthread_mutex_lock(&g_recent.mu); + pthread_cond_signal(&g_recent.cv); + pthread_mutex_unlock(&g_recent.mu); } CHECK_DB(sqlite3_finalize(stmt)); CHECK_SQL(sqlite3_close(db)); - LOG("ClaimWorker exiting\n"); return 0; OnError: sqlite3_finalize(stmt); @@ -1859,40 +1834,36 @@ OnError: goto StartOver; } -// single thread for computing HTTP Date header -void *NowWorker(void *arg) { - BlockSignals(); - pthread_setname_np(pthread_self(), "NowWorker"); - LOG("%P NowWorker started\n"); - UpdateNow(); - nsync_counter_add(g_ready, -1); // #9 - for (struct timespec ts = {timespec_real().tv_sec};; ++ts.tv_sec) { - if (!nsync_note_wait(g_shutdown[1], ts)) { - UpdateNow(); - } else { - break; - } - } - LOG("NowWorker exiting\n"); - return 0; -} - // worker for refilling token buckets void *ReplenishWorker(void *arg) { - BlockSignals(); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0); pthread_setname_np(pthread_self(), "Replenisher"); - LOG("%P Replenisher started\n"); - UpdateNow(); for (struct timespec ts = timespec_real();; ts = timespec_add(ts, timespec_frommillis(TB_INTERVAL))) { - if (!nsync_note_wait(g_shutdown[1], ts)) { - ReplenishTokens(g_tok.w, TB_WORDS); - } else { - break; - } + clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &ts, 0); + ReplenishTokens(g_tok.w, TB_WORDS); } - LOG("Replenisher exiting\n"); - return 0; +} + +void SpawnWorker(intptr_t i) { + sigset_t thmask; + pthread_attr_t attr; + sigfillset(&thmask); + sigdelset(&thmask, SIGABRT); + sigdelset(&thmask, SIGTRAP); + sigdelset(&thmask, SIGFPE); + sigdelset(&thmask, SIGBUS); + sigdelset(&thmask, SIGSEGV); + sigdelset(&thmask, SIGILL); + sigdelset(&thmask, SIGXCPU); + sigdelset(&thmask, SIGXFSZ); + pthread_attr_init(&attr); + pthread_attr_setsigmask_np(&attr, &thmask); + pthread_attr_setstacksize(&attr, 128 * 1024); + pthread_attr_setguardsize(&attr, sysconf(_SC_PAGESIZE)); + pthread_attr_setsigaltstacksize_np(&attr, sysconf(_SC_MINSIGSTKSZ) + 32768); + pthread_create(&g_worker[i].th, &attr, HttpWorker, (void *)i); + pthread_attr_destroy(&attr); } // we're permissive in allowing http connection keepalive until the @@ -1903,7 +1874,7 @@ void Meltdown(void) { int i, marks; struct timespec now; ++g_meltdowns; - LOG("Panicking because %d out of %d workers is connected\n", g_connections, + LOG("%P panicking because %d out of %d workers is connected\n", g_connections, g_workers); now = timespec_real(); for (marks = i = 0; i < g_workers; ++i) { @@ -1911,7 +1882,9 @@ void Meltdown(void) { (g_worker[i].msgcount > PANIC_MSGS || timespec_cmp(timespec_sub(now, g_worker[i].startread), timespec_frommillis(MELTALIVE_MS)) >= 0)) { - pthread_kill(g_worker[i].th, SIGUSR1); + pthread_cancel(g_worker[i].th); + pthread_join(g_worker[i].th, 0); + SpawnWorker(i); ++marks; } } @@ -1920,18 +1893,29 @@ void Meltdown(void) { // main thread worker void *Supervisor(void *arg) { - for (;;) { - if (!nsync_note_wait(g_shutdown[0], WaitFor(SUPERVISE_MS))) { - if (g_workers > 1 && 1. / g_workers * g_connections > PANIC_LOAD) { - Meltdown(); + while (!is_shutting_down) { + + // check for updates to web assets on disk + ReloadAsset(&g_asset.index); + ReloadAsset(&g_asset.about); + ReloadAsset(&g_asset.user); + ReloadAsset(&g_asset.favicon); + + // check if server is about to explode + if (g_workers > 1 && 1. / g_workers * g_connections > PANIC_LOAD) + Meltdown(); + + // spawn replacements for crashed workers + for (int i = 0; i < g_workers; ++i) { + if (g_worker[i].dead) { + pthread_join(g_worker[i].th, 0); + SpawnWorker(i); } - ReloadAsset(&g_asset.index); - ReloadAsset(&g_asset.about); - ReloadAsset(&g_asset.user); - ReloadAsset(&g_asset.favicon); - } else { - break; } + + // wait a little bit + if (!is_shutting_down) + usleep(SUPERVISE_MS * 1000); } return 0; } @@ -1940,9 +1924,9 @@ void CheckDatabase(void) { sqlite3 *db; if (g_integrity) { CHECK_SQL(DbOpen("db.sqlite3", &db)); - LOG("Checking database integrity...\n"); + LOG("%P Checking database integrity...\n"); CHECK_SQL(sqlite3_exec(db, "PRAGMA integrity_check", 0, 0, 0)); - LOG("Vacuuming database...\n"); + LOG("%P Vacuuming database...\n"); CHECK_SQL(sqlite3_exec(db, "VACUUM", 0, 0, 0)); CHECK_SQL(sqlite3_close(db)); } @@ -1951,14 +1935,6 @@ OnError: exit(1); } -#ifdef __aarch64__ -#define PC pc -#define BP regs[29] -#else -#define PC gregs[REG_RIP] -#define BP gregs[REG_RBP] -#endif - char *hexcpy(char *p, unsigned long x) { int k = x ? (__builtin_clzl(x) ^ 63) + 1 : 1; k = (k + 3) & -4; @@ -2067,8 +2043,7 @@ void on_crash_signal(int sig, siginfo_t *si, void *arg) { pthread_exit(PTHREAD_CANCELED); } -static void show_crash_reports(void) { - +void make_server_crash_resistant(void) { const char *path = "crash.log"; if ((g_crash_fd = open(path, O_CREAT | O_WRONLY | O_APPEND, 0644)) == -1) { fprintf(stderr, "%s: %s\n", path, strerror(errno)); @@ -2102,10 +2077,7 @@ static void show_crash_reports(void) { } int main(int argc, char *argv[]) { - FindDebugBinary(); - show_crash_reports(); - - unassert(false); + make_server_crash_resistant(); if (pledge(0, 0)) { fprintf(stderr, "%s: this OS doesn't support pledge() security\n", argv[0]); @@ -2135,7 +2107,7 @@ int main(int argc, char *argv[]) { __| | | __| | \\ \\ \\ / _` | __|\n\ | | | | __|\\ \\ \\ / ( | |\n\ \\__|\\__,_|_| _| \\_/\\_/ \\__,_|_|\n"); - CHECK_EQ(0, chdir("/opt/turfwar")); + npassert(!chdir("/opt/turfwar")); putenv("TMPDIR=/opt/turfwar/tmp"); if ((g_blackhole.fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) { @@ -2163,13 +2135,6 @@ int main(int argc, char *argv[]) { npassert(2 == open("turfwar.log", O_CREAT | O_WRONLY | O_APPEND, 0644)); } - LOG("Generating Hilbert Curve...\n"); - for (int i = 0; i < YN * XN; ++i) { - axdx_t h = unhilbert(XN, i); - g_hilbert[i][0] = h.ax; - g_hilbert[i][1] = h.dx; - } - // library init sqlite3_initialize(); CheckDatabase(); @@ -2180,9 +2145,6 @@ int main(int argc, char *argv[]) { // server lifecycle locks g_started = timespec_real(); - for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) { - g_shutdown[i] = nsync_note_new(0, nsync_time_no_deadline); - } // load static assets into memory and pre-zip them g_asset.index = LoadAsset("index.html", "text/html; charset=utf-8", 900); @@ -2192,11 +2154,11 @@ int main(int argc, char *argv[]) { // sandbox ourselves __pledge_mode = PLEDGE_PENALTY_RETURN_EPERM; - CHECK_EQ(0, unveil("/opt/turfwar", "rwc")); - CHECK_EQ(0, unveil(0, 0)); + npassert(!unveil("/opt/turfwar", "rwc")); + npassert(!unveil(0, 0)); if (!IsOpenbsd()) { // TODO(jart): why isn't pledge working on openbsd? - CHECK_EQ(0, pledge("stdio flock rpath wpath cpath inet", 0)); + npassert(!pledge("stdio flock rpath wpath cpath inet", 0)); } // shutdown signals @@ -2207,91 +2169,89 @@ int main(int argc, char *argv[]) { sigaction(SIGHUP, &sa, 0); sigaction(SIGINT, &sa, 0); sigaction(SIGTERM, &sa, 0); - sa.sa_handler = IgnoreSignal; - sigaction(SIGUSR1, &sa, 0); + + time_init(); + + sigset_t thmask; + sigfillset(&thmask); + sigdelset(&thmask, SIGABRT); + sigdelset(&thmask, SIGTRAP); + sigdelset(&thmask, SIGFPE); + sigdelset(&thmask, SIGBUS); + sigdelset(&thmask, SIGSEGV); + sigdelset(&thmask, SIGILL); + sigdelset(&thmask, SIGXCPU); + sigdelset(&thmask, SIGXFSZ); pthread_attr_t attr; pthread_attr_init(&attr); + pthread_attr_setsigmask_np(&attr, &thmask); pthread_attr_setstacksize(&attr, 128 * 1024); pthread_attr_setguardsize(&attr, sysconf(_SC_PAGESIZE)); pthread_attr_setsigaltstacksize_np(&attr, sysconf(_SC_MINSIGSTKSZ) + 32768); - - // make 9 helper threads - g_ready = nsync_counter_new(10); - pthread_t scorer, recenter, claimer, nower, replenisher, plotter; - pthread_t scorer_hour, scorer_day, scorer_week, scorer_month; - CHECK_EQ(0, pthread_create(&scorer, &attr, ScoreWorker, 0)); - CHECK_EQ(0, pthread_create(&scorer_hour, &attr, ScoreHourWorker, 0)); - CHECK_EQ(0, pthread_create(&scorer_day, &attr, ScoreDayWorker, 0)); - CHECK_EQ(0, pthread_create(&scorer_week, &attr, ScoreWeekWorker, 0)); - CHECK_EQ(0, pthread_create(&scorer_month, &attr, ScoreMonthWorker, 0)); - CHECK_EQ(0, pthread_create(&replenisher, &attr, ReplenishWorker, 0)); - CHECK_EQ(0, pthread_create(&recenter, &attr, RecentWorker, 0)); - CHECK_EQ(0, pthread_create(&claimer, &attr, ClaimWorker, 0)); - CHECK_EQ(0, pthread_create(&plotter, &attr, PlotWorker, 0)); - CHECK_EQ(0, pthread_create(&nower, &attr, NowWorker, 0)); - - // wait for helper threads to warm up creating assets - if (nsync_counter_add(g_ready, -1)) { // #10 - nsync_counter_wait(g_ready, nsync_time_no_deadline); - } - - // create one thread to listen - CHECK_EQ(0, pthread_create(&g_listener, &attr, ListenWorker, 0)); - - // create lots of http workers to serve those assets - LOG("Online\n"); - g_worker = xcalloc(g_workers, sizeof(*g_worker)); - for (intptr_t i = 0; i < g_workers; ++i) { - CHECK_EQ(0, pthread_create(&g_worker[i].th, &attr, HttpWorker, (void *)i)); - } - + npassert(!pthread_create(&scorer, &attr, ScoreWorker, 0)); + npassert(!pthread_create(&scorer_hour, &attr, ScoreHourWorker, 0)); + npassert(!pthread_create(&scorer_day, &attr, ScoreDayWorker, 0)); + npassert(!pthread_create(&scorer_week, &attr, ScoreWeekWorker, 0)); + npassert(!pthread_create(&scorer_month, &attr, ScoreMonthWorker, 0)); + npassert(!pthread_create(&replenisher, &attr, ReplenishWorker, 0)); + npassert(!pthread_create(&recenter, &attr, RecentWorker, 0)); + npassert(!pthread_create(&claimer, &attr, ClaimWorker, 0)); + npassert(!pthread_create(&g_listener, &attr, ListenWorker, 0)); + unassert((g_worker = calloc(g_workers, sizeof(*g_worker)))); + for (intptr_t i = 0; i < g_workers; ++i) + npassert(!pthread_create(&g_worker[i].th, &attr, HttpWorker, (void *)i)); pthread_attr_destroy(&attr); // time to serve - LOG("Ready\n"); + LOG("%P ready\n"); Supervisor(0); - // cancel listen() so we stop accepting new clients - LOG("Interrupting listen...\n"); - pthread_kill(g_listener, SIGUSR1); - pthread_join(g_listener, 0); + // cancel listen() + LOG("%P interrupting services...\n"); + pthread_cancel(scorer); + pthread_cancel(recenter); + pthread_cancel(g_listener); + pthread_cancel(scorer_day); + pthread_cancel(scorer_hour); + pthread_cancel(scorer_week); + pthread_cancel(scorer_month); + pthread_cancel(replenisher); + + LOG("%P joining services...\n"); + unassert(!pthread_join(scorer, 0)); + unassert(!pthread_join(recenter, 0)); + unassert(!pthread_join(g_listener, 0)); + unassert(!pthread_join(scorer_day, 0)); + unassert(!pthread_join(scorer_hour, 0)); + unassert(!pthread_join(scorer_week, 0)); + unassert(!pthread_join(scorer_month, 0)); + unassert(!pthread_join(replenisher, 0)); // cancel read() so that keepalive clients finish faster - LOG("Interrupting workers...\n"); - for (int i = 0; i < g_workers; ++i) { - pthread_kill(g_worker[i].th, SIGUSR1); - } + LOG("%P interrupting workers...\n"); + for (int i = 0; i < g_workers; ++i) + if (!g_worker[i].dead) + pthread_cancel(g_worker[i].th); // wait for producers to finish - LOG("Waiting for workers to finish...\n"); - for (int i = 0; i < g_workers; ++i) { - CHECK_EQ(0, pthread_join(g_worker[i].th, 0)); - } - LOG("Waiting for helpers to finish...\n"); - CHECK_EQ(0, pthread_join(nower, 0)); - CHECK_EQ(0, pthread_join(scorer, 0)); - CHECK_EQ(0, pthread_join(plotter, 0)); - CHECK_EQ(0, pthread_join(recenter, 0)); - CHECK_EQ(0, pthread_join(scorer_day, 0)); - CHECK_EQ(0, pthread_join(scorer_hour, 0)); - CHECK_EQ(0, pthread_join(scorer_week, 0)); - CHECK_EQ(0, pthread_join(scorer_month, 0)); - CHECK_EQ(0, pthread_join(replenisher, 0)); + LOG("%P joining workers...\n"); + for (int i = 0; i < g_workers; ++i) + unassert(!pthread_join(g_worker[i].th, 0)); // now that all workers have terminated, the claims queue must be // empty, therefore, it is now safe to send a cancellation to the // claims worker thread which waits forever for new claims. - CHECK_EQ(0, g_claims.count); - LOG("waiting for claims worker...\n"); - nsync_note_notify(g_shutdown[2]); - CHECK_EQ(0, pthread_join(claimer, 0)); + unassert(!g_claims.count); + pthread_cancel(claimer); + LOG("%P waiting for claims worker...\n"); + unassert(!pthread_join(claimer, 0)); // perform some sanity checks - CHECK_EQ(g_claimsprocessed, g_claimsenqueued); + unassert(g_claimsprocessed == g_claimsenqueued); // free memory - LOG("Freeing memory...\n"); + LOG("%P freeing memory...\n"); FreeAsset(&g_asset.user); FreeAsset(&g_asset.about); FreeAsset(&g_asset.index); @@ -2302,13 +2262,11 @@ int main(int argc, char *argv[]) { FreeAsset(&g_asset.score_month); FreeAsset(&g_asset.recent); FreeAsset(&g_asset.favicon); - for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) { - nsync_note_free(g_shutdown[i]); - } - nsync_counter_free(g_ready); free(g_worker); free(g_tok.b); - LOG("Goodbye\n"); + time_destroy(); + + LOG("%P goodbye\n"); // CheckForMemoryLeaks(); }