/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│ │ vi: set et ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi │ ╞══════════════════════════════════════════════════════════════════════════════╡ │ Copyright 2022 Justine Alexandra Roberts Tunney │ │ │ │ Permission to use, copy, modify, and/or distribute this software for │ │ any purpose with or without fee is hereby granted, provided that the │ │ above copyright notice and this permission notice appear in all copies. │ │ │ │ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL │ │ WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED │ │ WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE │ │ AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL │ │ DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR │ │ PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER │ │ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │ │ PERFORMANCE OF THIS SOFTWARE. │ ╚─────────────────────────────────────────────────────────────────────────────*/ #include "libc/assert.h" #include "libc/calls/calls.h" #include "libc/calls/pledge.h" #include "libc/calls/struct/iovec.h" #include "libc/calls/struct/rusage.h" #include "libc/calls/struct/sigaction.h" #include "libc/calls/struct/sigset.h" #include "libc/calls/struct/stat.h" #include "libc/calls/struct/sysinfo.h" #include "libc/calls/struct/timespec.h" #include "libc/calls/struct/timeval.h" #include "libc/calls/ucontext.h" #include "libc/ctype.h" #include "libc/dce.h" #include "libc/errno.h" #include "libc/fmt/conv.h" #include "libc/fmt/itoa.h" #include "libc/intrin/atomic.h" #include "libc/intrin/bsr.h" #include "libc/intrin/hilbert.h" #include "libc/intrin/iscall.h" #include "libc/intrin/kprintf.h" #include "libc/intrin/strace.h" #include "libc/log/check.h" #include "libc/log/log.h" #include "libc/macros.h" #include "libc/mem/gc.h" #include "libc/mem/mem.h" #include "libc/mem/sortedints.internal.h" #include "libc/nexgen32e/crc32.h" #include "libc/nexgen32e/stackframe.h" #include "libc/paths.h" #include "libc/runtime/internal.h" #include "libc/runtime/runtime.h" #include "libc/runtime/stack.h" #include "libc/runtime/symbols.internal.h" #include "libc/runtime/sysconf.h" #include "libc/serialize.h" #include "libc/sock/sock.h" #include "libc/sock/struct/pollfd.h" #include "libc/sock/struct/sockaddr.h" #include "libc/stdio/append.h" #include "libc/stdio/rand.h" #include "libc/stdio/stdio.h" #include "libc/str/slice.h" #include "libc/str/str.h" #include "libc/sysv/consts/af.h" #include "libc/sysv/consts/clock.h" #include "libc/sysv/consts/o.h" #include "libc/sysv/consts/poll.h" #include "libc/sysv/consts/prot.h" #include "libc/sysv/consts/rusage.h" #include "libc/sysv/consts/sa.h" #include "libc/sysv/consts/sig.h" #include "libc/sysv/consts/so.h" #include "libc/sysv/consts/sock.h" #include "libc/sysv/consts/sol.h" #include "libc/sysv/consts/tcp.h" #include "libc/thread/thread.h" #include "libc/thread/thread2.h" #include "libc/thread/threads.h" #include "libc/time.h" #include "libc/x/x.h" #include "libc/x/xasprintf.h" #include "libc/zip.h" #include "net/http/escape.h" #include "net/http/http.h" #include "net/http/ip.h" #include "net/http/tokenbucket.h" #include "net/http/url.h" #include "third_party/getopt/getopt.internal.h" #include "third_party/nsync/counter.h" #include "third_party/nsync/cv.h" #include "third_party/nsync/mu.h" #include "third_party/nsync/note.h" #include "third_party/nsync/time.h" #include "third_party/sqlite3/sqlite3.h" #include "third_party/stb/stb_image_write.h" #include "third_party/zlib/zconf.h" #include "third_party/zlib/zlib.h" #include "tool/net/lfuncs.h" /** * @fileoverview production webserver for turfwar online game */ #define PORT 8080 // default server listening port #define CPUS 64 // number of cpus to actually use #define XN 64 // plot width in pixels #define YN 64 // plot height in pixels #define WORKERS 500 // size of http client thread pool #define SUPERVISE_MS 1000 // how often to stat() asset files #define KEEPALIVE_MS 60000 // max time to keep idle conn open #define MELTALIVE_MS 2000 // panic keepalive under heavy load #define SCORE_H_UPDATE_MS 10000 // how often to regenerate /score/hour #define SCORE_D_UPDATE_MS 30000 // how often to regenerate /score/day #define SCORE_W_UPDATE_MS 70000 // how often to regenerate /score/week #define SCORE_M_UPDATE_MS 100000 // how often to regenerate /score/month #define SCORE_UPDATE_MS 210000 // how often to regenerate /score #define PLOTS_UPDATE_MS 999000 // how often to regenerate /plot/xxx #define ACCEPT_DEADLINE_MS 100 // how long accept() can take to find worker #define CLAIM_DEADLINE_MS 100 // how long /claim may block if queue is full #define CONCERN_LOAD .75 // avoid keepalive, upon this connection load #define PANIC_LOAD .85 // meltdown if this percent of pool connected #define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown #define QUEUE_MAX 800 // maximum pending claim items in queue #define BATCH_MAX 64 // max claims to insert per transaction #define NICK_MAX 40 // max length of user nickname string #define TB_INTERVAL 1000 // millis between token replenishes #define TB_CIDR 24 // token bucket cidr specificity #define SOCK_MAX 100 // max length of socket queue #define MSG_BUF 512 // small response lookaside #define INBUF_SIZE 65536 #define OUTBUF_SIZE 8192 #define TB_BYTES (1u << TB_CIDR) #define TB_WORDS (TB_BYTES / 8) #define GETOPTS "idvp:w:k:W:" #define USAGE \ "\ Usage: turfwar.com [-dv] ARGS...\n\ -i integrity check and vacuum at startup\n\ -d daemonize\n\ -v verbosity\n\ -W IP whitelist\n\ -p INT port\n\ -w INT workers\n\ -k INT keepalive\n\ " #define STANDARD_RESPONSE_HEADERS \ "Server: turfwar\r\n" \ "Referrer-Policy: origin\r\n" \ "Access-Control-Allow-Origin: *\r\n" #define MS2CASH(x) (x / 1000 / 2) #define HasHeader(H) (!!msg->headers[H].a) #define HeaderData(H) (inbuf + msg->headers[H].a) #define HeaderLength(H) (msg->headers[H].b - msg->headers[H].a) #define HeaderEqual(H, S) \ SlicesEqual(S, strlen(S), HeaderData(H), HeaderLength(H)) #define HeaderEqualCase(H, S) \ SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H)) #define UrlEqual(S) \ SlicesEqual(inbuf + msg->uri.a, msg->uri.b - msg->uri.a, S, strlen(S)) #define UrlStartsWith(S) \ (msg->uri.b - msg->uri.a >= strlen(S) && \ !memcmp(inbuf + msg->uri.a, S, strlen(S))) // logging is line-buffered when LOG("foo\n") is used // log lines show ephemerally when LOG("foo") is used #if 1 #define LOG(...) kprintf("\r\e[K" __VA_ARGS__) #else #define LOG(...) (void)0 #endif #if 0 #define DEBUG(...) kprintf("\r\e[K" __VA_ARGS__) #else #define DEBUG(...) (void)0 #endif // cosmo's CHECK_EQ() macros are designed to succeed or die // these macros are similar but designed to return on error #define CHECK_MEM(x) \ do { \ if (!CheckMem(__FILE__, __LINE__, x)) { \ ++g_memfails; \ goto OnError; \ } \ } while (0) #define CHECK_SYS(x) \ do { \ if (!CheckSys(__FILE__, __LINE__, x)) { \ ++g_sysfails; \ goto OnError; \ } \ } while (0) #define CHECK_SQL(x) \ do { \ int e = errno; \ if (!CheckSql(__FILE__, __LINE__, x)) { \ ++g_dbfails; \ goto OnError; \ } \ errno = e; \ } while (0) #define CHECK_DB(x) \ do { \ int e = errno; \ if (!CheckDb(__FILE__, __LINE__, x, db)) { \ ++g_dbfails; \ goto OnError; \ } \ errno = e; \ } while (0) // mandatory header for gzip payloads static const uint8_t kGzipHeader[] = { 0x1F, // MAGNUM 0x8B, // MAGNUM 0x08, // CM: DEFLATE 0x00, // FLG: NONE 0x00, // MTIME: NONE 0x00, // 0x00, // 0x00, // 0x00, // XFL kZipOsUnix, // OS }; // 1x1 pixel transparent gif data static const char kPixel[43] = "\x47\x49\x46\x38\x39\x61\x01\x00\x01\x00\x80\x00\x00\xff\xff\xff" "\x00\x00\x00\x21\xf9\x04\x01\x00\x00\x00\x00\x2c\x00\x00\x00\x00" "\x01\x00\x01\x00\x00\x02\x02\x44\x01\x00\x3b"; struct Data { char *p; size_t n; }; struct Asset { int cash; char *path; nsync_mu lock; const char *type; struct Data data; struct Data gzip; struct timespec mtim; char lastmodified[32]; }; struct Blackhole { struct sockaddr_un addr; int fd; } g_blackhole = {{ AF_UNIX, "/var/run/blackhole.sock", }}; // cli flags bool g_integrity; bool g_daemonize; int g_crash_fd; int g_port = PORT; int g_workers = WORKERS; int g_keepalive = KEEPALIVE_MS; struct SortedInts g_whitelisted; thread_local char last_message[INBUF_SIZE]; // lifecycle vars pthread_t g_listener; nsync_time g_started; nsync_counter g_ready; atomic_int g_connections; nsync_note g_shutdown[3]; int g_hilbert[YN * XN][2]; // whitebox metrics atomic_long g_banned; atomic_long g_accepts; atomic_long g_dbfails; atomic_long g_proxied; atomic_long g_messages; atomic_long g_memfails; atomic_long g_sysfails; atomic_long g_rejected; atomic_long g_unproxied; atomic_long g_readfails; atomic_long g_notfounds; atomic_long g_meltdowns; atomic_long g_parsefails; atomic_long g_iprequests; atomic_long g_queuefulls; atomic_long g_htmlclaims; atomic_long g_ratelimits; atomic_long g_emptyclaims; atomic_long g_acceptfails; atomic_long g_badversions; atomic_long g_plainclaims; atomic_long g_imageclaims; atomic_long g_invalidnames; atomic_long g_ipv6forwards; atomic_long g_assetrequests; atomic_long g_claimrequests; atomic_long g_claimsenqueued; atomic_long g_claimsprocessed; atomic_long g_statuszrequests; union TokenBucket { atomic_schar *b; atomic_uint_fast64_t *w; } g_tok; // http worker objects struct Worker { pthread_t th; atomic_int msgcount; atomic_int shutdown; atomic_int connected; struct timespec startread; } *g_worker; // recentworker wakeup struct Recent { nsync_mu mu; nsync_cv cv; } g_recent; // global date header struct Nowish { nsync_mu lock; struct timespec ts; struct tm tm; } g_nowish; // static assets struct Assets { struct Asset index; struct Asset about; struct Asset user; struct Asset score; struct Asset score_hour; struct Asset score_day; struct Asset score_week; struct Asset score_month; struct Asset recent; struct Asset favicon; struct Asset plot[256]; } g_asset; // queues ListenWorker() to HttpWorker() struct Clients { int pos; int count; nsync_mu mu; nsync_cv non_full; nsync_cv non_empty; struct Client { int sock; uint32_t size; struct sockaddr_in addr; } data[SOCK_MAX]; } g_clients; // queues /claim to ClaimWorker() struct Claims { int pos; int count; nsync_mu mu; nsync_cv non_full; nsync_cv non_empty; struct Claim { uint32_t ip; int64_t created; char name[NICK_MAX + 1]; } data[QUEUE_MAX]; } g_claims; long GetTotalRam(void) { struct sysinfo si; si.totalram = 256 * 1024 * 1024; sysinfo(&si); return si.totalram; } // easy string sender ssize_t Write(int fd, const char *s) { return write(fd, s, strlen(s)); } // turns relative timeout into an absolute timeout struct timespec WaitFor(int millis) { return timespec_add(timespec_real(), timespec_frommillis(millis)); } // helper functions for check macro implementation bool CheckMem(const char *file, int line, void *ptr) { if (ptr) return true; kprintf("%s:%d: %P: out of memory: %s\n", file, line, strerror(errno)); return false; } bool CheckSys(const char *file, int line, long rc) { if (rc != -1) return true; kprintf("%s:%d: %P: %s\n", file, line, strerror(errno)); return false; } bool CheckSql(const char *file, int line, int rc) { if (rc == SQLITE_OK) return true; kprintf("%s:%d: %P: %s\n", file, line, sqlite3_errstr(rc)); return false; } bool CheckDb(const char *file, int line, int rc, sqlite3 *db) { if (rc == SQLITE_OK) return true; kprintf("%s:%d: %P: %s: %s\n", file, line, sqlite3_errstr(rc), sqlite3_errmsg(db)); return false; } // if we try to open a WAL database at the same time from multiple // threads then it's likely we'll get a SQLITE_BUSY conflict since // WAL mode does a complicated dance to initialize itself thus all // we need to do is wait a little bit, and use exponential backoff int DbOpen(const char *path, sqlite3 **db) { int i, rc; char sql[128]; rc = sqlite3_open(path, db); if (rc != SQLITE_OK) return rc; if (!IsWindows() && !IsOpenbsd()) { ksnprintf(sql, sizeof(sql), "PRAGMA mmap_size=%ld", GetTotalRam()); rc = sqlite3_exec(*db, sql, 0, 0, 0); if (rc != SQLITE_OK) return rc; } for (i = 0; i < 7; ++i) { rc = sqlite3_exec(*db, "PRAGMA journal_mode=WAL", 0, 0, 0); if (rc == SQLITE_OK) break; if (rc != SQLITE_BUSY) return rc; usleep(1000L << i); } return sqlite3_exec(*db, "PRAGMA synchronous=NORMAL", 0, 0, 0); } int DbStep(sqlite3_stmt *stmt) { int i, rc; for (i = 0; i < 12; ++i) { rc = sqlite3_step(stmt); if (rc == SQLITE_ROW) break; if (rc == SQLITE_DONE) break; if (rc != SQLITE_BUSY) return rc; usleep(1000L << i); } return rc; } // why not make the statement prepare api a little less hairy too int DbPrepare(sqlite3 *db, sqlite3_stmt **stmt, const char *sql) { return sqlite3_prepare_v2(db, sql, -1, stmt, 0); } bool Blackhole(uint32_t ip) { char buf[4]; WRITE32BE(buf, ip); if (sendto(g_blackhole.fd, buf, 4, 0, (struct sockaddr *)&g_blackhole.addr, sizeof(g_blackhole.addr)) == 4) { return true; } else { kprintf("error: sendto(%#s) failed: %s\n", g_blackhole.addr.sun_path, strerror(errno)); return false; } } // validates name registration validity bool IsValidNick(const char *s, size_t n) { size_t i; if (n == -1) n = strlen(s); if (!n) return false; if (n > NICK_MAX) return false; for (i = 0; i < n; ++i) { if (!(isalnum(s[i]) || // s[i] == '@' || // s[i] == '/' || // s[i] == ':' || // s[i] == '.' || // s[i] == '^' || // s[i] == '+' || // s[i] == '!' || // s[i] == '-' || // s[i] == '_' || // s[i] == '*')) { return false; } } return true; } // turn unix timestamp into string the easy way char *FormatUnixHttpDateTime(char *s, int64_t t) { struct tm tm; gmtime_r(&t, &tm); FormatHttpDateTime(s, &tm); return s; } // gmtime_r() does a shocking amount of compute // so we try to handle that globally right here void UpdateNow(void) { int64_t secs; struct tm tm; g_nowish.ts = timespec_real(); secs = g_nowish.ts.tv_sec; gmtime_r(&secs, &tm); //!//!//!//!//!//!//!//!//!//!//!//!//!/ nsync_mu_lock(&g_nowish.lock); g_nowish.tm = tm; nsync_mu_unlock(&g_nowish.lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ } // the standard strftime() function is dismally slow // this function is non-generalized for just http so // it needs 25 cycles rather than 709 cycles so cool char *FormatDate(char *p) { //////////////////////////////////////// nsync_mu_rlock(&g_nowish.lock); p = FormatHttpDateTime(p, &g_nowish.tm); nsync_mu_runlock(&g_nowish.lock); //////////////////////////////////////// return p; } bool AddClient(struct Clients *q, const struct Client *v, nsync_time dead) { bool wake = false; bool added = false; nsync_mu_lock(&q->mu); while (q->count == ARRAYLEN(q->data)) { if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead, g_shutdown[0])) { break; // must be ETIMEDOUT or ECANCELED } } if (q->count != ARRAYLEN(q->data)) { int i = q->pos + q->count; if (ARRAYLEN(q->data) <= i) i -= ARRAYLEN(q->data); memcpy(q->data + i, v, sizeof(*v)); if (!q->count) wake = true; q->count++; added = true; } nsync_mu_unlock(&q->mu); if (wake) { nsync_cv_broadcast(&q->non_empty); } return added; } int GetClient(struct Clients *q, struct Client *out) { int got = 0; int len = 1; nsync_mu_lock(&q->mu); while (!q->count) { if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu, nsync_time_no_deadline, g_shutdown[1])) { break; // must be ECANCELED } } while (got < len && q->count) { memcpy(out + got, q->data + q->pos, sizeof(*out)); if (q->count == ARRAYLEN(q->data)) { nsync_cv_broadcast(&q->non_full); } ++got; q->pos++; q->count--; if (q->pos == ARRAYLEN(q->data)) { q->pos = 0; } } nsync_mu_unlock(&q->mu); return got; } // inserts ip:name claim into blocking message queue // may be interrupted by absolute deadline // may be cancelled by server shutdown bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) { bool wake = false; bool added = false; nsync_mu_lock(&q->mu); while (q->count == ARRAYLEN(q->data)) { if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead, g_shutdown[1])) { break; // must be ETIMEDOUT or ECANCELED } } if (q->count != ARRAYLEN(q->data)) { int i = q->pos + q->count; if (ARRAYLEN(q->data) <= i) i -= ARRAYLEN(q->data); memcpy(q->data + i, v, sizeof(*v)); if (!q->count) wake = true; q->count++; added = true; } nsync_mu_unlock(&q->mu); if (wake) { nsync_cv_broadcast(&q->non_empty); } return added; } // removes batch of ip:name claims from blocking message queue // has no deadline or cancellation; enqueued must be processed int GetClaims(struct Claims *q, struct Claim *out, int len) { int got = 0; nsync_mu_lock(&q->mu); while (!q->count) { if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu, nsync_time_no_deadline, g_shutdown[2])) { break; // must be ECANCELED } } while (got < len && q->count) { memcpy(out + got, q->data + q->pos, sizeof(*out)); if (q->count == ARRAYLEN(q->data)) { nsync_cv_broadcast(&q->non_full); } ++got; q->pos++; q->count--; if (q->pos == ARRAYLEN(q->data)) { q->pos = 0; } } nsync_mu_unlock(&q->mu); return got; } // parses request uri query string and extracts ?name=value static bool GetNick(char *inbuf, struct HttpMessage *msg, struct Claim *v) { size_t i; struct Url url; void *f[2] = {0}; bool found = false; f[0] = ParseUrl(inbuf + msg->uri.a, msg->uri.b - msg->uri.a, &url, kUrlPlus | kUrlLatin1); f[1] = url.params.p; for (i = 0; i < url.params.n; ++i) { if (SlicesEqual("name", 4, url.params.p[i].key.p, url.params.p[i].key.n) && url.params.p[i].val.p && IsValidNick(url.params.p[i].val.p, url.params.p[i].val.n)) { memcpy(v->name, url.params.p[i].val.p, url.params.p[i].val.n); found = true; break; } } free(f[1]); free(f[0]); return found; } // allocates memory with hardware-accelerated buffer overflow detection // so if it gets hacked it'll at least crash instead of get compromised void *NewSafeBuffer(size_t n) { char *p; long pagesize = sysconf(_SC_PAGESIZE); size_t m = ROUNDUP(n, pagesize); npassert((p = valloc(m + pagesize))); npassert(!mprotect(p + m, pagesize, PROT_NONE)); return p; } // frees memory with hardware-accelerated buffer overflow detection void FreeSafeBuffer(void *p) { long pagesize = sysconf(_SC_PAGESIZE); size_t n = malloc_usable_size(p); size_t m = ROUNDDOWN(n, pagesize); npassert(!mprotect(p, m, PROT_READ | PROT_WRITE)); free(p); } // signals by default get delivered to any random thread // solution is to block every signal possible in threads void BlockSignals(void) { sigset_t mask; sigfillset(&mask); sigdelset(&mask, SIGABRT); sigdelset(&mask, SIGTRAP); sigdelset(&mask, SIGFPE); sigdelset(&mask, SIGBUS); sigdelset(&mask, SIGSEGV); sigdelset(&mask, SIGILL); sigdelset(&mask, SIGXCPU); sigdelset(&mask, SIGXFSZ); sigprocmask(SIG_SETMASK, &mask, 0); } // main thread uses sigusr1 to deliver io cancellations void AllowSigusr1(void) { sigset_t mask; sigfillset(&mask); sigdelset(&mask, SIGUSR1); sigprocmask(SIG_SETMASK, &mask, 0); } char *Statusz(char *p, const char *s, long x) { p = stpcpy(p, s); p = stpcpy(p, ": "); p = FormatInt64(p, x); p = stpcpy(p, "\n"); return p; } // public /statusz endpoint for monitoring server internals void ServeStatusz(int client, char *outbuf) { char *p; struct rusage ru; struct timespec now; now = timespec_real(); p = outbuf; p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Cache-Control: max-age=0, must-revalidate\r\n" "Connection: close\r\n" "\r\n"); p = Statusz(p, "qps", g_messages / MAX(1, timespec_sub(now, g_started).tv_sec)); p = Statusz(p, "started", g_started.tv_sec); p = Statusz(p, "now", now.tv_sec); p = Statusz(p, "messages", g_messages); p = Statusz(p, "connections", g_connections); p = Statusz(p, "banned", g_banned); p = Statusz(p, "workers", g_workers); p = Statusz(p, "accepts", g_accepts); p = Statusz(p, "dbfails", g_dbfails); p = Statusz(p, "proxied", g_proxied); p = Statusz(p, "memfails", g_memfails); p = Statusz(p, "sysfails", g_sysfails); p = Statusz(p, "rejected", g_rejected); p = Statusz(p, "unproxied", g_unproxied); p = Statusz(p, "readfails", g_readfails); p = Statusz(p, "notfounds", g_notfounds); p = Statusz(p, "meltdowns", g_meltdowns); p = Statusz(p, "parsefails", g_parsefails); p = Statusz(p, "iprequests", g_iprequests); p = Statusz(p, "queuefulls", g_queuefulls); p = Statusz(p, "htmlclaims", g_htmlclaims); p = Statusz(p, "ratelimits", g_ratelimits); p = Statusz(p, "emptyclaims", g_emptyclaims); p = Statusz(p, "acceptfails", g_acceptfails); p = Statusz(p, "badversions", g_badversions); p = Statusz(p, "plainclaims", g_plainclaims); p = Statusz(p, "imageclaims", g_imageclaims); p = Statusz(p, "invalidnames", g_invalidnames); p = Statusz(p, "ipv6forwards", g_ipv6forwards); p = Statusz(p, "assetrequests", g_assetrequests); p = Statusz(p, "claimrequests", g_claimrequests); p = Statusz(p, "claimsenqueued", g_claimsenqueued); p = Statusz(p, "claimsprocessed", g_claimsprocessed); p = Statusz(p, "statuszrequests", g_statuszrequests); if (!getrusage(RUSAGE_SELF, &ru)) { p = Statusz(p, "ru_utime.tv_sec", ru.ru_utime.tv_sec); p = Statusz(p, "ru_utime.tv_usec", ru.ru_utime.tv_usec); p = Statusz(p, "ru_stime.tv_sec", ru.ru_stime.tv_sec); p = Statusz(p, "ru_stime.tv_usec", ru.ru_stime.tv_usec); p = Statusz(p, "ru_maxrss", ru.ru_maxrss); p = Statusz(p, "ru_ixrss", ru.ru_ixrss); p = Statusz(p, "ru_idrss", ru.ru_idrss); p = Statusz(p, "ru_isrss", ru.ru_isrss); p = Statusz(p, "ru_minflt", ru.ru_minflt); p = Statusz(p, "ru_majflt", ru.ru_majflt); p = Statusz(p, "ru_nswap", ru.ru_nswap); p = Statusz(p, "ru_inblock", ru.ru_inblock); p = Statusz(p, "ru_oublock", ru.ru_oublock); p = Statusz(p, "ru_msgsnd", ru.ru_msgsnd); p = Statusz(p, "ru_msgrcv", ru.ru_msgrcv); p = Statusz(p, "ru_nsignals", ru.ru_nsignals); p = Statusz(p, "ru_nvcsw", ru.ru_nvcsw); p = Statusz(p, "ru_nivcsw", ru.ru_nivcsw); } write(client, outbuf, p - outbuf); } void *ListenWorker(void *arg) { int server; int no = 0; int yes = 1; int fastopen = 5; struct Client client; struct timeval timeo = {g_keepalive / 1000, g_keepalive % 1000}; struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(g_port)}; AllowSigusr1(); pthread_setname_np(pthread_self(), "Listener"); CHECK_NE(-1, (server = socket(AF_INET, SOCK_STREAM, 0))); setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); setsockopt(server, SOL_TCP, TCP_FASTOPEN, &fastopen, sizeof(fastopen)); setsockopt(server, SOL_TCP, TCP_QUICKACK, &no, sizeof(no)); setsockopt(server, SOL_TCP, TCP_CORK, &no, sizeof(no)); setsockopt(server, SOL_TCP, TCP_NODELAY, &yes, sizeof(yes)); bind(server, (struct sockaddr *)&addr, sizeof(addr)); CHECK_NE(-1, listen(server, 1)); while (!nsync_note_is_notified(g_shutdown[0])) { client.size = sizeof(client.addr); client.sock = accept(server, (struct sockaddr *)&client.addr, &client.size); if (client.sock == -1) { if (errno != EAGAIN) { // spinning on SO_RCVTIMEO ++g_acceptfails; } continue; } if (!AddClient(&g_clients, &client, WaitFor(ACCEPT_DEADLINE_MS))) { ++g_rejected; LOG("503 Accept Queue Full\n"); Write(client.sock, "HTTP/1.1 503 Accept Queue Full\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" "\r\n" "Accept Queue Full\n"); close(client.sock); } } close(server); nsync_note_notify(g_shutdown[1]); return 0; } // make thousands of http client handler threads // load balance incoming connections for port 8080 across all threads // hangup on any browser clients that lag for more than a few seconds void *HttpWorker(void *arg) { struct Client client; int id = (intptr_t)arg; char *msgbuf = gc(xmalloc(MSG_BUF)); char *inbuf = NewSafeBuffer(INBUF_SIZE); char *outbuf = NewSafeBuffer(OUTBUF_SIZE); struct HttpMessage *msg = gc(xcalloc(1, sizeof(struct HttpMessage))); BlockSignals(); pthread_setname_np(pthread_self(), gc(xasprintf("HTTP%d", id))); // connection loop while (GetClient(&g_clients, &client)) { struct Data d; ssize_t got, sent; uint32_t ip, clientip; int tok, inmsglen, outmsglen; char ipbuf[32], *p, *q, cashbuf[64]; clientip = ntohl(client.addr.sin_addr.s_addr); g_worker[id].connected = true; g_worker[id].msgcount = 0; ++g_accepts; ++g_connections; // simple http/1.1 message loop // let's assume we're behind a well-behaved frontend // each read() should give us just *one* HTTP message // if we get less than one message, we drop connection // if we get more than one message, we Connection: close // let's not bother with cray proto stuff like 100-expect do { struct Asset *a; bool comp, ipv6; // wait for http message // this may be cancelled by sigusr1 AllowSigusr1(); DestroyHttpMessage(msg); InitHttpMessage(msg, kHttpRequest); g_worker[id].startread = timespec_real(); got = read(client.sock, inbuf, INBUF_SIZE - 1); if (got >= 0) { memcpy(last_message, inbuf, got); last_message[got] = 0; } if (got <= 0) { ++g_readfails; break; } BlockSignals(); // parse http message // we're only doing one-shot parsing right now if ((inmsglen = ParseHttpMessage(msg, inbuf, got, INBUF_SIZE)) <= 0) { ++g_parsefails; break; } ++g_messages; ++g_worker[id].msgcount; ipv6 = false; ip = clientip; // get client address from frontend if (HasHeader(kHttpXForwardedFor)) { if (!IsLoopbackIp(clientip) && !IsPrivateIp(clientip)) { LOG("Got X-Forwarded-For from untrusted IPv4 client address " "%hhu.%hhu.%hhu.%hhu\n", clientip >> 24, clientip >> 16, clientip >> 8, clientip); ipv6 = false; ip = clientip; ++g_unproxied; } else if (ParseForwarded(HeaderData(kHttpXForwardedFor), HeaderLength(kHttpXForwardedFor), &ip, 0) != -1) { ipv6 = false; ++g_proxied; } else { ipv6 = true; ip = clientip; ++g_ipv6forwards; ++g_proxied; } } else { ipv6 = false; ip = clientip; ++g_unproxied; } ksnprintf(ipbuf, sizeof(ipbuf), "%hhu.%hhu.%hhu.%hhu", ip >> 24, ip >> 16, ip >> 8, ip); if (UrlStartsWith("/plot/") && (_rand64() % 256)) { goto SkipSecurity; } if (!ipv6 && !ContainsInt(&g_whitelisted, ip) && (tok = AcquireToken(g_tok.b, ip, TB_CIDR)) < 32) { if (tok > 4) { LOG("%s rate limiting client\n", ipbuf, msg->version); Write(client.sock, "HTTP/1.1 429 Too Many Requests\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" "\r\n" "429 Too Many Requests\n"); } else { Blackhole(ip); ++g_banned; } ++g_ratelimits; break; } SkipSecurity: // we don't support http/1.0 and http/0.9 right now if (msg->version != 11) { LOG("%s used unsupported http/%d version\n", ipbuf, msg->version); Write(client.sock, "HTTP/1.1 505 HTTP Version Not Supported\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" "\r\n" "HTTP Version Not Supported\n"); ++g_badversions; break; } // access log char method[9] = {0}; WRITE64LE(method, msg->method); LOG("%6P %16s %s %.*s %.*s %.*s %#.*s\n", ipbuf, method, msg->uri.b - msg->uri.a, inbuf + msg->uri.a, HeaderLength(kHttpCfIpcountry), HeaderData(kHttpCfIpcountry), HeaderLength(kHttpSecChUaPlatform), HeaderData(kHttpSecChUaPlatform), HeaderLength(kHttpReferer), HeaderData(kHttpReferer)); // export monitoring data if (UrlEqual("/statusz")) { ServeStatusz(client.sock, outbuf); ++g_statuszrequests; break; } // asset routing if (UrlEqual("/") || UrlStartsWith("/index.html")) { a = &g_asset.index; } else if (UrlStartsWith("/favicon.ico")) { a = &g_asset.favicon; } else if (UrlStartsWith("/about.html")) { a = &g_asset.about; } else if (UrlStartsWith("/user.html")) { a = &g_asset.user; } else if (UrlStartsWith("/score/hour")) { a = &g_asset.score_hour; } else if (UrlStartsWith("/score/day")) { a = &g_asset.score_day; } else if (UrlStartsWith("/score/week")) { a = &g_asset.score_week; } else if (UrlStartsWith("/score/month")) { a = &g_asset.score_month; } else if (UrlStartsWith("/score")) { a = &g_asset.score; } else if (UrlStartsWith("/recent")) { a = &g_asset.recent; } else if (UrlStartsWith("/plot/")) { int i, block = 0; for (i = msg->uri.a + 6; i < msg->uri.b && isdigit(inbuf[i]); ++i) { block *= 10; block += inbuf[i] - '0'; block &= 255; } a = g_asset.plot + block; } else { a = 0; } // assert serving if (a) { struct iovec iov[2]; ++g_assetrequests; comp = a->gzip.n < a->data.n && HeaderHas(msg, inbuf, kHttpAcceptEncoding, "gzip", 4); //////////////////////////////////////// nsync_mu_rlock(&a->lock); if (HasHeader(kHttpIfModifiedSince) && a->mtim.tv_sec <= ParseHttpDateTime(HeaderData(kHttpIfModifiedSince), HeaderLength(kHttpIfModifiedSince))) { p = stpcpy(outbuf, "HTTP/1.1 304 Not Modified\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept-Encoding\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nLast-Modified: "); p = stpcpy(p, a->lastmodified); p = stpcpy(p, "\r\nContent-Type: "); p = stpcpy(p, a->type); p = stpcpy(p, "\r\nCache-Control: "); ksnprintf(cashbuf, sizeof(cashbuf), "max-age=%d, must-revalidate", a->cash); p = stpcpy(p, cashbuf); p = stpcpy(p, "\r\n\r\n"); outmsglen = p - outbuf; sent = write(client.sock, outbuf, outmsglen); } else { p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept-Encoding\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nLast-Modified: "); p = stpcpy(p, a->lastmodified); p = stpcpy(p, "\r\nContent-Type: "); p = stpcpy(p, a->type); p = stpcpy(p, "\r\nCache-Control: "); ksnprintf(cashbuf, sizeof(cashbuf), "max-age=%d, must-revalidate", a->cash); p = stpcpy(p, cashbuf); if (comp) p = stpcpy(p, "\r\nContent-Encoding: gzip"); p = stpcpy(p, "\r\nContent-Length: "); d = comp ? a->gzip : a->data; p = FormatInt32(p, d.n); p = stpcpy(p, "\r\n\r\n"); iov[0].iov_base = outbuf; iov[0].iov_len = p - outbuf; iov[1].iov_base = d.p; iov[1].iov_len = msg->method == kHttpHead ? 0 : d.n; outmsglen = iov[0].iov_len + iov[1].iov_len; sent = writev(client.sock, iov, 2); } nsync_mu_runlock(&a->lock); //////////////////////////////////////// } else if (UrlStartsWith("/ip")) { // what is my ip endpoint ++g_iprequests; if (!ipv6) { p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" "Content-Type: text/plain\r\n" "Cache-Control: max-age=3600, private\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nContent-Length: "); p = FormatInt32(p, strlen(ipbuf)); p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, ipbuf); outmsglen = p - outbuf; sent = write(client.sock, outbuf, outmsglen); } else { Ipv6Warning: DEBUG("%.*s via %s: 400 Need IPv4\n", HeaderLength(kHttpXForwardedFor), HeaderData(kHttpXForwardedFor), ipbuf); q = "IPv4 Games only supports IPv4 right now"; p = stpcpy(outbuf, "HTTP/1.1 400 Need IPv4\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" "Content-Type: text/plain\r\n" "Cache-Control: private\r\n" "Connection: close\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nContent-Length: "); p = FormatInt32(p, strlen(q)); p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); outmsglen = p - outbuf; sent = write(client.sock, outbuf, p - outbuf); break; } } else if (UrlStartsWith("/claim")) { // ip:name registration endpoint ++g_claimrequests; if (ipv6) goto Ipv6Warning; struct Claim v = {.ip = ip, .created = g_nowish.ts.tv_sec}; if (GetNick(inbuf, msg, &v)) { if (AddClaim(&g_claims, &v, timespec_add(timespec_real(), timespec_frommillis(CLAIM_DEADLINE_MS)))) { ++g_claimsenqueued; DEBUG("%s claimed by %s\n", ipbuf, v.name); if (HasHeader(kHttpAccept) && (HeaderHas(msg, inbuf, kHttpAccept, "image/*", 7) || HeaderHas(msg, inbuf, kHttpAccept, "image/gif", 9))) { ++g_imageclaims; p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" "Cache-Control: private\r\n" "Content-Type: image/gif\r\n" "Connection: close\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nContent-Length: "); p = FormatInt32(p, sizeof(kPixel)); p = stpcpy(p, "\r\n\r\n"); p = mempcpy(p, kPixel, sizeof(kPixel)); } else if (HasHeader(kHttpAccept) && HeaderHas(msg, inbuf, kHttpAccept, "text/plain", 10) && !HeaderHas(msg, inbuf, kHttpAccept, "text/html", 9)) { ++g_plainclaims; ksnprintf(msgbuf, MSG_BUF, "The land at %s was claimed for %s\n", ipbuf, v.name); q = msgbuf; p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" "Cache-Control: private\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nContent-Length: "); p = FormatInt32(p, strlen(q)); p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); } else if (!HasHeader(kHttpAccept) || (HeaderHas(msg, inbuf, kHttpAccept, "text/html", 9) || HeaderHas(msg, inbuf, kHttpAccept, "text/*", 6) || HeaderHas(msg, inbuf, kHttpAccept, "*/*", 3))) { ++g_htmlclaims; ksnprintf(msgbuf, MSG_BUF, "\n" "The land at %s was claimed for %s.\n" "\n" "The land at %s was claimed for %s.\n" "

\nBack to homepage\n", ipbuf, v.name, ipbuf, v.name, v.name); q = msgbuf; p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" "Cache-Control: private\r\n" "Content-Type: text/html\r\n" "Connection: close\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nContent-Length: "); p = FormatInt32(p, strlen(q)); p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); } else { ++g_emptyclaims; p = stpcpy(outbuf, "HTTP/1.1 204 No Content\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" "Cache-Control: private\r\n" "Content-Length: 0\r\n" "Connection: close\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\n\r\n"); } outmsglen = p - outbuf; sent = write(client.sock, outbuf, p - outbuf); break; } else { LOG("%s: 503 Claims Queue Full\n", ipbuf); Write(client.sock, "HTTP/1.1 503 Claims Queue Full\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" "\r\n" "Claims Queue Full\n"); ++g_queuefulls; break; } } else { ++g_invalidnames; LOG("%s: 400 invalid name\n", ipbuf); q = "invalid name"; p = stpcpy(outbuf, "HTTP/1.1 400 Invalid Name\r\n" STANDARD_RESPONSE_HEADERS "Content-Type: text/plain\r\n" "Cache-Control: private\r\n" "Connection: close\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nContent-Length: "); p = FormatInt32(p, strlen(q)); p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); outmsglen = p - outbuf; sent = write(client.sock, outbuf, p - outbuf); break; } } else { // default endpoint ++g_notfounds; LOG("%s: 400 not found %#.*s\n", ipbuf, msg->uri.b - msg->uri.a, inbuf + msg->uri.a); q = "\r\n" "404 not found\r\n" "

404 not found

\r\n"; p = stpcpy(outbuf, "HTTP/1.1 404 Not Found\r\n" STANDARD_RESPONSE_HEADERS "Content-Type: text/html; charset=utf-8\r\n" "Date: "); p = FormatDate(p); p = stpcpy(p, "\r\nContent-Length: "); p = FormatInt32(p, strlen(q)); p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); outmsglen = p - outbuf; sent = write(client.sock, outbuf, p - outbuf); } // if the client isn't pipelining and write() wrote the full // amount, then since we sent the content length and checked // that the client didn't attach a payload, we are so synced // thus we can safely process more messages } while (got == inmsglen && // sent == outmsglen && // !HasHeader(kHttpContentLength) && // !HasHeader(kHttpTransferEncoding) && // !HeaderEqualCase(kHttpConnection, "close") && // (msg->method == kHttpGet || // msg->method == kHttpHead) && // 1. / g_workers * g_connections < CONCERN_LOAD && // !nsync_note_is_notified(g_shutdown[1])); DestroyHttpMessage(msg); close(client.sock); g_worker[id].connected = false; --g_connections; } LOG("HttpWorker #%d exiting", id); g_worker[id].shutdown = true; FreeSafeBuffer(outbuf); FreeSafeBuffer(inbuf); return 0; } // helper to precompress gzip responses in background struct Data Gzip(struct Data data) { char *p; void *tmp; uint32_t crc; char footer[8]; z_stream zs = {0}; struct Data res = {0}; crc = crc32_z(0, data.p, data.n); WRITE32LE(footer + 0, crc); WRITE32LE(footer + 4, data.n); if (Z_OK != deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -MAX_WBITS, DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY)) { return (struct Data){0}; } zs.next_in = (const Bytef *)data.p; zs.avail_in = data.n; zs.avail_out = compressBound(data.n); if (!(zs.next_out = tmp = malloc(zs.avail_out))) { deflateEnd(&zs); return (struct Data){0}; } CHECK_EQ(Z_STREAM_END, deflate(&zs, Z_FINISH)); CHECK_EQ(Z_OK, deflateEnd(&zs)); res.n = sizeof(kGzipHeader) + zs.total_out + sizeof(footer); if (!(p = res.p = malloc(res.n))) { free(tmp); return (struct Data){0}; } p = mempcpy(p, kGzipHeader, sizeof(kGzipHeader)); p = mempcpy(p, tmp, zs.total_out); p = mempcpy(p, footer, sizeof(footer)); free(tmp); return res; } // slurps asset off disk once during startup struct Asset LoadAsset(const char *path, const char *type, int cash) { struct stat st; struct Asset a = {0}; CHECK_EQ(0, stat(path, &st)); CHECK_NOTNULL((a.data.p = xslurp(path, &a.data.n))); a.type = type; a.cash = cash; CHECK_NOTNULL((a.path = strdup(path))); a.mtim = st.st_mtim; CHECK_NOTNULL((a.gzip = Gzip(a.data)).p); FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec); return a; } // reslurps asset off disk if its mtim changed bool ReloadAsset(struct Asset *a) { int fd; void *f[2]; ssize_t rc; struct stat st; char lastmodified[32]; struct Data data = {0}; struct Data gzip = {0}; CHECK_SYS((fd = open(a->path, O_RDONLY))); CHECK_SYS(fstat(fd, &st)); if (timespec_cmp(st.st_mtim, a->mtim) > 0) { FormatUnixHttpDateTime(lastmodified, st.st_mtim.tv_sec); CHECK_MEM((data.p = malloc(st.st_size))); CHECK_SYS((rc = read(fd, data.p, st.st_size))); data.n = st.st_size; if (rc != st.st_size) goto OnError; CHECK_MEM((gzip = Gzip(data)).p); //!//!//!//!//!//!//!//!//!//!//!//!//!/ nsync_mu_lock(&a->lock); f[0] = a->data.p; f[1] = a->gzip.p; a->data = data; a->gzip = gzip; a->mtim = st.st_mtim; memcpy(a->lastmodified, lastmodified, 32); nsync_mu_unlock(&a->lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ free(f[0]); free(f[1]); } close(fd); return true; OnError: free(data.p); free(gzip.p); close(fd); return false; } void FreeAsset(struct Asset *a) { free(a->path); free(a->data.p); free(a->gzip.p); } void IgnoreSignal(int sig) { // so worker i/o routines may eintr safely } // asynchronous handler of sigint, sigterm, and sighup signals // this handler is always invoked from within the main thread, // because our helper and worker threads always block signals. void OnCtrlC(int sig) { if (!nsync_note_is_notified(g_shutdown[0])) { LOG("Received %s shutting down...\n", strsignal(sig)); nsync_note_notify(g_shutdown[0]); } else { // there's no way to deliver signals to workers atomically, unless // we pay the cost of ppoll() which isn't necessary in this design // so if a user smashes that ctrl-c then we tkill the workers more LOG("Received %s again so sending another volley...\n", strsignal(sig)); for (int i = 0; i < g_workers; ++i) { pthread_kill(g_listener, SIGUSR1); if (!g_worker[i].shutdown) { pthread_kill(g_worker[i].th, SIGUSR1); } } } } // parses cli arguments void GetOpts(int argc, char *argv[]) { int opt; int64_t ip; while ((opt = getopt(argc, argv, GETOPTS)) != -1) { switch (opt) { case 'i': g_integrity = true; break; case 'd': g_daemonize = true; break; case 'p': g_port = atoi(optarg); break; case 'w': g_workers = atoi(optarg); break; case 'k': g_keepalive = atoi(optarg); break; case 'v': ++__log_level; break; case 'W': if ((ip = ParseIp(optarg, -1)) != -1) { if (InsertInt(&g_whitelisted, ip, true)) { LOG("whitelisted %s", optarg); } } else { kprintf("error: could not parse -w %#s IP address\n", optarg); _Exit(1); } break; case '?': write(1, USAGE, sizeof(USAGE) - 1); exit(0); default: write(2, USAGE, sizeof(USAGE) - 1); exit(64); } } } // atomically swaps out asset with newer version void Update(struct Asset *a, bool gen(struct Asset *, long, long), long x, long y) { void *f[2]; struct Asset t; if (gen(&t, x, y)) { //!//!//!//!//!//!//!//!//!//!//!//!//!/ nsync_mu_lock(&a->lock); f[0] = a->data.p; f[1] = a->gzip.p; a->data = t.data; a->gzip = t.gzip; a->mtim = t.mtim; a->type = t.type; a->cash = t.cash; memcpy(a->lastmodified, t.lastmodified, 32); nsync_mu_unlock(&a->lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ free(f[0]); free(f[1]); } } // generator function for the big board bool GenerateScore(struct Asset *out, long secs, long cash) { int rc; char *sb = 0; sqlite3 *db = 0; size_t sblen = 0; struct Asset a = {0}; sqlite3_stmt *stmt = 0; bool namestate = false; char name1[NICK_MAX + 1] = {0}; char name2[NICK_MAX + 1]; DEBUG("GenerateScore %ld\n", secs); a.type = "application/json"; a.cash = cash; a.mtim = timespec_real(); FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec); CHECK_SYS(appends(&a.data.p, "{\n")); CHECK_SYS(appendf(&a.data.p, "\"now\":[%ld,%ld],\n", a.mtim.tv_sec, a.mtim.tv_nsec)); CHECK_SYS(appends(&a.data.p, "\"score\":{\n")); CHECK_SQL(DbOpen("db.sqlite3", &db)); if (secs == -1) { CHECK_DB(DbPrepare(db, &stmt, "SELECT nick, (ip >> 24), COUNT(*)\n" "FROM land\n" "GROUP BY nick, (ip >> 24)")); } else { CHECK_DB(DbPrepare(db, &stmt, "SELECT nick, (ip >> 24), COUNT(*)\n" " FROM land\n" "WHERE created NOT NULL\n" " AND created >= ?1\n" "GROUP BY nick, (ip >> 24)")); CHECK_DB(sqlite3_bind_int64(stmt, 1, a.mtim.tv_sec - secs)); } // be sure to always use transactions with sqlite as in always // otherwise.. you can use --strace to see the fcntl bloodbath CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); while ((rc = DbStep(stmt)) != SQLITE_DONE) { if (rc != SQLITE_ROW) CHECK_DB(rc); strlcpy(name2, (void *)sqlite3_column_text(stmt, 0), sizeof(name2)); if (!IsValidNick(name2, -1)) continue; if (strcmp(name1, name2)) { // name changed if (namestate) CHECK_SYS(appends(&a.data.p, "],\n")); namestate = true; CHECK_SYS(appendf( &a.data.p, "\"%s\":[\n", EscapeJsStringLiteral(&sb, &sblen, strcpy(name1, name2), -1, 0))); } else { // name repeated CHECK_SYS(appends(&a.data.p, ",\n")); } CHECK_SYS(appendf(&a.data.p, " [%ld,%ld]", sqlite3_column_int64(stmt, 1), sqlite3_column_int64(stmt, 2))); } CHECK_SQL(sqlite3_exec(db, "END TRANSACTION", 0, 0, 0)); if (namestate) CHECK_SYS(appends(&a.data.p, "]\n")); CHECK_SYS(appends(&a.data.p, "}}\n")); CHECK_DB(sqlite3_finalize(stmt)); CHECK_SQL(sqlite3_close(db)); a.data.n = appendz(a.data.p).i; a.gzip = Gzip(a.data); free(sb); *out = a; return true; OnError: sqlite3_finalize(stmt); sqlite3_close(db); free(a.data.p); free(sb); return false; } // generator function for the big board bool GeneratePlot(struct Asset *out, long block, long cash) { _Static_assert(IS2POW(XN * YN), "area must be 2-power"); _Static_assert(XN == YN, "hilbert algorithm needs square"); int rc, out_len; sqlite3 *db = 0; struct Asset a = {0}; unsigned char *rgba; sqlite3_stmt *stmt = 0; unsigned x, y, i, ip, area, mask, clump; DEBUG("GeneratePlot %ld\n", block); a.type = "image/png"; a.cash = cash; a.mtim = timespec_real(); FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec); CHECK_MEM((rgba = calloc(4, YN * XN))); for (y = 0; y < YN; ++y) { for (x = 0; x < XN; ++x) { rgba[y * XN * 4 + x * 4 + 0] = 255; rgba[y * XN * 4 + x * 4 + 1] = 255; rgba[y * XN * 4 + x * 4 + 2] = 255; } } CHECK_SQL(DbOpen("db.sqlite3", &db)); CHECK_DB(DbPrepare(db, &stmt, "SELECT ip\n" " FROM land\n" "WHERE ip >= ?1\n" " AND ip <= ?2")); CHECK_DB(sqlite3_bind_int64(stmt, 1, block << 24 | 0x000000)); CHECK_DB(sqlite3_bind_int64(stmt, 2, block << 24 | 0xffffff)); CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); area = XN * YN; mask = area - 1; clump = 32 - bsr(area) - 8; while ((rc = DbStep(stmt)) != SQLITE_DONE) { if (rc != SQLITE_ROW) CHECK_DB(rc); ip = sqlite3_column_int64(stmt, 0); i = (ip >> clump) & mask; y = g_hilbert[i][0]; x = g_hilbert[i][1]; if (rgba[y * XN * 4 + x * 4 + 3] < 255) { ++rgba[y * XN * 4 + x * 4 + 3]; } } CHECK_SQL(sqlite3_exec(db, "END TRANSACTION", 0, 0, 0)); CHECK_DB(sqlite3_finalize(stmt)); CHECK_SQL(sqlite3_close(db)); a.data.p = (char *)stbi_write_png_to_mem(rgba, XN * 4, XN, YN, 4, &out_len); a.data.n = out_len; a.gzip = Gzip(a.data); free(rgba); *out = a; return true; OnError: sqlite3_finalize(stmt); sqlite3_close(db); free(a.data.p); free(rgba); return false; } // single thread for regenerating the user scores json void *ScoreWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreAll"); LOG("%P Score started\n"); long wait = SCORE_UPDATE_MS; Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #1 do { Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait)); } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("Score exiting\n"); return 0; } // single thread for regenerating the user scores json void *ScoreHourWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreHour"); LOG("%P ScoreHour started\n"); long secs = 60L * 60; long wait = SCORE_H_UPDATE_MS; Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #2 do { Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait)); } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("ScoreHour exiting\n"); return 0; } // single thread for regenerating the user scores json void *ScoreDayWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreDay"); LOG("%P ScoreDay started\n"); long secs = 60L * 60 * 24; long wait = SCORE_D_UPDATE_MS; Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #3 do { Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait)); } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("ScoreDay exiting\n"); return 0; } // single thread for regenerating the user scores json void *ScoreWeekWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreWeek"); LOG("%P ScoreWeek started\n"); long secs = 60L * 60 * 24 * 7; long wait = SCORE_W_UPDATE_MS; Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #4 do { Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait)); } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("ScoreWeek exiting\n"); return 0; } // single thread for regenerating the user scores json void *ScoreMonthWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreMonth"); LOG("%P ScoreMonth started\n"); long secs = 60L * 60 * 24 * 30; long wait = SCORE_M_UPDATE_MS; Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #5 do { Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait)); } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("ScoreMonth exiting\n"); return 0; } // single thread for regenerating /8 cell background image charts void *PlotWorker(void *arg) { long i, wait; BlockSignals(); pthread_setname_np(pthread_self(), "Plotter"); LOG("%P Plotter started\n"); wait = PLOTS_UPDATE_MS; for (i = 0; i < 256; ++i) { Update(g_asset.plot + i, GeneratePlot, i, MS2CASH(wait)); } nsync_counter_add(g_ready, -1); // #6 do { for (i = 0; i < 256; ++i) { Update(g_asset.plot + i, GeneratePlot, i, MS2CASH(wait)); } } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("Plotter exiting\n"); return 0; } // thread for realtime json generation of recent successful claims void *RecentWorker(void *arg) { bool once; void *f[2]; int rc, err; sqlite3 *db; char *sb = 0; size_t sblen = 0; sqlite3_stmt *stmt; struct Asset *a, t; bool warmedup = false; BlockSignals(); pthread_setname_np(pthread_self(), "RecentWorker"); LOG("%P RecentWorker started\n"); StartOver: db = 0; stmt = 0; bzero(&t, sizeof(t)); CHECK_SQL(DbOpen("db.sqlite3", &db)); CHECK_DB(DbPrepare(db, &stmt, "SELECT ip, nick, created\n" "FROM land\n" "WHERE created NOT NULL\n" "ORDER BY created DESC\n" "LIMIT 50")); do { // regenerate json t.mtim = timespec_real(); FormatUnixHttpDateTime(t.lastmodified, t.mtim.tv_sec); CHECK_SYS(appends(&t.data.p, "{\n")); CHECK_SYS(appendf(&t.data.p, "\"now\":[%ld,%ld],\n", t.mtim.tv_sec, t.mtim.tv_nsec)); CHECK_SYS(appends(&t.data.p, "\"recent\":[\n")); CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); for (once = false; (rc = DbStep(stmt)) != SQLITE_DONE; once = true) { if (rc != SQLITE_ROW) CHECK_SQL(rc); if (once) CHECK_SYS(appends(&t.data.p, ",\n")); CHECK_SYS( appendf(&t.data.p, "[%ld,\"%s\",%ld]", sqlite3_column_int64(stmt, 0), EscapeJsStringLiteral( &sb, &sblen, (void *)sqlite3_column_text(stmt, 1), -1, 0), sqlite3_column_int64(stmt, 2))); } CHECK_SQL(sqlite3_reset(stmt)); CHECK_SQL(sqlite3_exec(db, "END TRANSACTION", 0, 0, 0)); CHECK_SYS(appends(&t.data.p, "]}\n")); t.data.n = appendz(t.data.p).i; CHECK_MEM((t.gzip = Gzip(t.data)).p); // deploy json a = &g_asset.recent; //!//!//!//!//!//!//!//!//!//!//!//!//!/ nsync_mu_lock(&a->lock); f[0] = a->data.p; f[1] = a->gzip.p; a->data = t.data; a->gzip = t.gzip; a->mtim = t.mtim; a->type = "application/json"; a->cash = 0; memcpy(a->lastmodified, t.lastmodified, 32); nsync_mu_unlock(&a->lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ bzero(&t, sizeof(t)); free(f[0]); free(f[1]); // handle startup condition if (!warmedup) { nsync_counter_add(g_ready, -1); // #7 warmedup = true; } // wait for wakeup or cancel nsync_mu_lock(&g_recent.mu); err = nsync_cv_wait_with_deadline(&g_recent.cv, &g_recent.mu, nsync_time_no_deadline, g_shutdown[1]); nsync_mu_unlock(&g_recent.mu); } while (err != ECANCELED); CHECK_DB(sqlite3_finalize(stmt)); CHECK_SQL(sqlite3_close(db)); LOG("RecentWorker exiting\n"); free(sb); return 0; OnError: sqlite3_finalize(stmt); sqlite3_close(db); free(t.data.p); free(t.gzip.p); goto StartOver; } // single thread for inserting batched claims into the database // this helps us avoid over 9000 threads having fcntl bloodbath void *ClaimWorker(void *arg) { sqlite3 *db; int i, n, rc; long processed; sqlite3_stmt *stmt; bool warmedup = false; struct Claim *v = gc(xcalloc(BATCH_MAX, sizeof(struct Claim))); BlockSignals(); pthread_setname_np(pthread_self(), "ClaimWorker"); LOG("%P ClaimWorker started\n"); StartOver: db = 0; stmt = 0; CHECK_SQL(DbOpen("db.sqlite3", &db)); CHECK_DB(DbPrepare(db, &stmt, "INSERT INTO land (ip, nick, created)\n" "VALUES (?1, ?2, ?3)\n" "ON CONFLICT (ip) DO\n" "UPDATE SET (nick, created) = (?2, ?3)\n" " WHERE nick != ?2\n" " OR created IS NULL\n" " OR ?3 - created > 3600")); if (!warmedup) { nsync_counter_add(g_ready, -1); // #8 warmedup = true; } while ((n = GetClaims(&g_claims, v, BATCH_MAX))) { processed = 0; CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); for (i = 0; i < n; ++i) { CHECK_DB(sqlite3_bind_int64(stmt, 1, v[i].ip)); CHECK_DB(sqlite3_bind_text(stmt, 2, v[i].name, -1, SQLITE_TRANSIENT)); CHECK_DB(sqlite3_bind_int64(stmt, 3, v[i].created)); CHECK_DB((rc = DbStep(stmt)) == SQLITE_DONE ? SQLITE_OK : rc); CHECK_DB(sqlite3_reset(stmt)); ++processed; } CHECK_SQL(sqlite3_exec(db, "COMMIT TRANSACTION", 0, 0, 0)); atomic_fetch_add(&g_claimsprocessed, processed); DEBUG("Committed %d claims\n", n); // wake up RecentWorker() nsync_mu_lock(&g_recent.mu); nsync_cv_signal(&g_recent.cv); nsync_mu_unlock(&g_recent.mu); } CHECK_DB(sqlite3_finalize(stmt)); CHECK_SQL(sqlite3_close(db)); LOG("ClaimWorker exiting\n"); return 0; OnError: sqlite3_finalize(stmt); sqlite3_close(db); goto StartOver; } // single thread for computing HTTP Date header void *NowWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "NowWorker"); LOG("%P NowWorker started\n"); UpdateNow(); nsync_counter_add(g_ready, -1); // #9 for (struct timespec ts = {timespec_real().tv_sec};; ++ts.tv_sec) { if (!nsync_note_wait(g_shutdown[1], ts)) { UpdateNow(); } else { break; } } LOG("NowWorker exiting\n"); return 0; } // worker for refilling token buckets void *ReplenishWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "Replenisher"); LOG("%P Replenisher started\n"); UpdateNow(); for (struct timespec ts = timespec_real();; ts = timespec_add(ts, timespec_frommillis(TB_INTERVAL))) { if (!nsync_note_wait(g_shutdown[1], ts)) { ReplenishTokens(g_tok.w, TB_WORDS); } else { break; } } LOG("Replenisher exiting\n"); return 0; } // we're permissive in allowing http connection keepalive until the // moment worker resources start becoming scarce. when that happens // we'll (1) cancel read operations that have not sent us a message // in a while; (2) cancel clients who are sending lots of messages. void Meltdown(void) { int i, marks; struct timespec now; ++g_meltdowns; LOG("Panicking because %d out of %d workers is connected\n", g_connections, g_workers); now = timespec_real(); for (marks = i = 0; i < g_workers; ++i) { if (g_worker[i].connected && (g_worker[i].msgcount > PANIC_MSGS || timespec_cmp(timespec_sub(now, g_worker[i].startread), timespec_frommillis(MELTALIVE_MS)) >= 0)) { pthread_kill(g_worker[i].th, SIGUSR1); ++marks; } } LOG("Melted down %d connections\n", marks); } // main thread worker void *Supervisor(void *arg) { for (;;) { if (!nsync_note_wait(g_shutdown[0], WaitFor(SUPERVISE_MS))) { if (g_workers > 1 && 1. / g_workers * g_connections > PANIC_LOAD) { Meltdown(); } ReloadAsset(&g_asset.index); ReloadAsset(&g_asset.about); ReloadAsset(&g_asset.user); ReloadAsset(&g_asset.favicon); } else { break; } } return 0; } void CheckDatabase(void) { sqlite3 *db; if (g_integrity) { CHECK_SQL(DbOpen("db.sqlite3", &db)); LOG("Checking database integrity...\n"); CHECK_SQL(sqlite3_exec(db, "PRAGMA integrity_check", 0, 0, 0)); LOG("Vacuuming database...\n"); CHECK_SQL(sqlite3_exec(db, "VACUUM", 0, 0, 0)); CHECK_SQL(sqlite3_close(db)); } return; OnError: exit(1); } #ifdef __aarch64__ #define PC pc #define BP regs[29] #else #define PC gregs[REG_RIP] #define BP gregs[REG_RBP] #endif char *hexcpy(char *p, unsigned long x) { int k = x ? (__builtin_clzl(x) ^ 63) + 1 : 1; k = (k + 3) & -4; while (k > 0) *p++ = "0123456789abcdef"[(x >> (k -= 4)) & 15]; *p = '\0'; return p; } char *describe_backtrace(char *p, size_t len, const struct StackFrame *sf) { char *pe = p + len; bool gotsome = false; // show address of each function while (sf) { if (kisdangerous(sf)) { if (p + 1 + 9 + 1 < pe) { if (gotsome) *p++ = ' '; p = stpcpy(p, "DANGEROUS"); if (p + 16 + 1 < pe) { *p++ = ' '; p = hexcpy(p, (long)sf); } } break; } if (p + 16 + 1 < pe) { unsigned char *ip = (unsigned char *)sf->addr; #ifdef __x86_64__ // x86 advances the progrem counter before an instruction // begins executing. return addresses in backtraces shall // point to code after the call, which means addr2line is // going to print unrelated code unless we fixup the addr if (!kisdangerous(ip)) ip -= __is_call(ip); #endif if (gotsome) *p++ = ' '; else gotsome = true; p = hexcpy(p, (long)ip); } else { break; } sf = sf->next; } // terminate string if (p < pe) *p = '\0'; return p; } // abashed the devil stood // and felt how awful goodness is char *describe_crash(char *buf, size_t len, int sig, siginfo_t *si, void *arg) { char *p = buf; // check minimum length if (len < 64) return p; // describe crash char signame[21]; p = stpcpy(p, strsignal_r(sig, signame)); if (si && // (sig == SIGFPE || // sig == SIGILL || // sig == SIGBUS || // sig == SIGSEGV || // sig == SIGTRAP)) { p = stpcpy(p, " at "); p = hexcpy(p, (long)si->si_addr); } // get stack frame daisy chain struct StackFrame pc; struct StackFrame *sf; ucontext_t *ctx; if ((ctx = (ucontext_t *)arg)) { pc.addr = ctx->uc_mcontext.PC; pc.next = (struct StackFrame *)ctx->uc_mcontext.BP; sf = &pc; } else { sf = (struct StackFrame *)__builtin_frame_address(0); } // describe backtrace p = stpcpy(p, " bt "); p = describe_backtrace(p, len - (p - buf), sf); return p; } void on_crash_signal(int sig, siginfo_t *si, void *arg) { char *p; char message[512]; write(2, "crash!\n", 7); p = describe_crash(message, sizeof(message), sig, si, arg); write(g_crash_fd, "crash: ", 7); write(g_crash_fd, message, p - message); write(g_crash_fd, "\n", 1); write(g_crash_fd, last_message, strlen(last_message)); write(g_crash_fd, "\n", 1); pthread_exit(PTHREAD_CANCELED); } static void show_crash_reports(void) { const char *path = "crash.log"; if ((g_crash_fd = open(path, O_CREAT | O_WRONLY | O_APPEND, 0644)) == -1) { fprintf(stderr, "%s: %s\n", path, strerror(errno)); exit(1); } struct sigaction sa; sa.sa_flags = SA_SIGINFO; sigemptyset(&sa.sa_mask); sa.sa_sigaction = on_crash_signal; sigaddset(&sa.sa_mask, SIGABRT); sigaddset(&sa.sa_mask, SIGTRAP); sigaddset(&sa.sa_mask, SIGFPE); sigaddset(&sa.sa_mask, SIGBUS); sigaddset(&sa.sa_mask, SIGSEGV); sigaddset(&sa.sa_mask, SIGILL); sigaddset(&sa.sa_mask, SIGXCPU); sigaddset(&sa.sa_mask, SIGXFSZ); sigaction(SIGABRT, &sa, 0); sigaction(SIGTRAP, &sa, 0); sigaction(SIGFPE, &sa, 0); sigaction(SIGILL, &sa, 0); sigaction(SIGXCPU, &sa, 0); sigaction(SIGXFSZ, &sa, 0); sa.sa_flags |= SA_ONSTACK; sigaction(SIGBUS, &sa, 0); sigaction(SIGSEGV, &sa, 0); } int main(int argc, char *argv[]) { FindDebugBinary(); show_crash_reports(); unassert(false); if (pledge(0, 0)) { fprintf(stderr, "%s: this OS doesn't support pledge() security\n", argv[0]); exit(1); } if (unveil("", 0) < 2) { fprintf(stderr, "%s: need OpenBSD or Landlock LSM v3+\n", argv[0]); exit(1); } if (IsLinux()) { Write(2, "Enabling TCP_FASTOPEN for server sockets...\n"); system("sudo sh -c 'echo 3 >/proc/sys/net/ipv4/tcp_fastopen'"); } // we don't have proper futexes on these platforms // we'll be somewhat less aggressive about workers if (IsXnu() || IsNetbsd()) { g_workers = MIN(g_workers, (unsigned)__get_cpu_count()); } // user interface GetOpts(argc, argv); kprintf("\ | _| \n\ __| | | __| | \\ \\ \\ / _` | __|\n\ | | | | __|\\ \\ \\ / ( | |\n\ \\__|\\__,_|_| _| \\_/\\_/ \\__,_|_|\n"); CHECK_EQ(0, chdir("/opt/turfwar")); putenv("TMPDIR=/opt/turfwar/tmp"); if ((g_blackhole.fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) { kprintf("error: socket(AF_UNIX) failed: %s\n", strerror(errno)); _Exit(3); } if (!Blackhole(0)) { kprintf("turfwar isn't able to protect your kernel from level 4 ddos\n"); kprintf("please run the blackholed program, see https://justine.lol/\n"); } // the power to serve if (g_daemonize) { if (fork() > 0) _Exit(0); setsid(); if (fork() > 0) _Exit(0); umask(0); if (closefrom(0)) for (int i = 0; i < 256; ++i) // close(i); npassert(0 == open(_PATH_DEVNULL, O_RDWR)); npassert(1 == dup(0)); npassert(2 == open("turfwar.log", O_CREAT | O_WRONLY | O_APPEND, 0644)); } LOG("Generating Hilbert Curve...\n"); for (int i = 0; i < YN * XN; ++i) { axdx_t h = unhilbert(XN, i); g_hilbert[i][0] = h.ax; g_hilbert[i][1] = h.dx; } // library init sqlite3_initialize(); CheckDatabase(); // fill token buckets g_tok.b = malloc(TB_BYTES); memset(g_tok.b, 127, TB_BYTES); // server lifecycle locks g_started = timespec_real(); for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) { g_shutdown[i] = nsync_note_new(0, nsync_time_no_deadline); } // load static assets into memory and pre-zip them g_asset.index = LoadAsset("index.html", "text/html; charset=utf-8", 900); g_asset.about = LoadAsset("about.html", "text/html; charset=utf-8", 900); g_asset.user = LoadAsset("user.html", "text/html; charset=utf-8", 900); g_asset.favicon = LoadAsset("favicon.ico", "image/vnd.microsoft.icon", 86400); // sandbox ourselves __pledge_mode = PLEDGE_PENALTY_RETURN_EPERM; CHECK_EQ(0, unveil("/opt/turfwar", "rwc")); CHECK_EQ(0, unveil(0, 0)); if (!IsOpenbsd()) { // TODO(jart): why isn't pledge working on openbsd? CHECK_EQ(0, pledge("stdio flock rpath wpath cpath inet", 0)); } // shutdown signals struct sigaction sa; sa.sa_flags = 0; sa.sa_handler = OnCtrlC; sigfillset(&sa.sa_mask); sigaction(SIGHUP, &sa, 0); sigaction(SIGINT, &sa, 0); sigaction(SIGTERM, &sa, 0); sa.sa_handler = IgnoreSignal; sigaction(SIGUSR1, &sa, 0); pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 128 * 1024); pthread_attr_setguardsize(&attr, sysconf(_SC_PAGESIZE)); pthread_attr_setsigaltstacksize_np(&attr, sysconf(_SC_MINSIGSTKSZ) + 32768); // make 9 helper threads g_ready = nsync_counter_new(10); pthread_t scorer, recenter, claimer, nower, replenisher, plotter; pthread_t scorer_hour, scorer_day, scorer_week, scorer_month; CHECK_EQ(0, pthread_create(&scorer, &attr, ScoreWorker, 0)); CHECK_EQ(0, pthread_create(&scorer_hour, &attr, ScoreHourWorker, 0)); CHECK_EQ(0, pthread_create(&scorer_day, &attr, ScoreDayWorker, 0)); CHECK_EQ(0, pthread_create(&scorer_week, &attr, ScoreWeekWorker, 0)); CHECK_EQ(0, pthread_create(&scorer_month, &attr, ScoreMonthWorker, 0)); CHECK_EQ(0, pthread_create(&replenisher, &attr, ReplenishWorker, 0)); CHECK_EQ(0, pthread_create(&recenter, &attr, RecentWorker, 0)); CHECK_EQ(0, pthread_create(&claimer, &attr, ClaimWorker, 0)); CHECK_EQ(0, pthread_create(&plotter, &attr, PlotWorker, 0)); CHECK_EQ(0, pthread_create(&nower, &attr, NowWorker, 0)); // wait for helper threads to warm up creating assets if (nsync_counter_add(g_ready, -1)) { // #10 nsync_counter_wait(g_ready, nsync_time_no_deadline); } // create one thread to listen CHECK_EQ(0, pthread_create(&g_listener, &attr, ListenWorker, 0)); // create lots of http workers to serve those assets LOG("Online\n"); g_worker = xcalloc(g_workers, sizeof(*g_worker)); for (intptr_t i = 0; i < g_workers; ++i) { CHECK_EQ(0, pthread_create(&g_worker[i].th, &attr, HttpWorker, (void *)i)); } pthread_attr_destroy(&attr); // time to serve LOG("Ready\n"); Supervisor(0); // cancel listen() so we stop accepting new clients LOG("Interrupting listen...\n"); pthread_kill(g_listener, SIGUSR1); pthread_join(g_listener, 0); // cancel read() so that keepalive clients finish faster LOG("Interrupting workers...\n"); for (int i = 0; i < g_workers; ++i) { pthread_kill(g_worker[i].th, SIGUSR1); } // wait for producers to finish LOG("Waiting for workers to finish...\n"); for (int i = 0; i < g_workers; ++i) { CHECK_EQ(0, pthread_join(g_worker[i].th, 0)); } LOG("Waiting for helpers to finish...\n"); CHECK_EQ(0, pthread_join(nower, 0)); CHECK_EQ(0, pthread_join(scorer, 0)); CHECK_EQ(0, pthread_join(plotter, 0)); CHECK_EQ(0, pthread_join(recenter, 0)); CHECK_EQ(0, pthread_join(scorer_day, 0)); CHECK_EQ(0, pthread_join(scorer_hour, 0)); CHECK_EQ(0, pthread_join(scorer_week, 0)); CHECK_EQ(0, pthread_join(scorer_month, 0)); CHECK_EQ(0, pthread_join(replenisher, 0)); // now that all workers have terminated, the claims queue must be // empty, therefore, it is now safe to send a cancellation to the // claims worker thread which waits forever for new claims. CHECK_EQ(0, g_claims.count); LOG("waiting for claims worker...\n"); nsync_note_notify(g_shutdown[2]); CHECK_EQ(0, pthread_join(claimer, 0)); // perform some sanity checks CHECK_EQ(g_claimsprocessed, g_claimsenqueued); // free memory LOG("Freeing memory...\n"); FreeAsset(&g_asset.user); FreeAsset(&g_asset.about); FreeAsset(&g_asset.index); FreeAsset(&g_asset.score); FreeAsset(&g_asset.score_hour); FreeAsset(&g_asset.score_day); FreeAsset(&g_asset.score_week); FreeAsset(&g_asset.score_month); FreeAsset(&g_asset.recent); FreeAsset(&g_asset.favicon); for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) { nsync_note_free(g_shutdown[i]); } nsync_counter_free(g_ready); free(g_worker); free(g_tok.b); LOG("Goodbye\n"); // CheckForMemoryLeaks(); }