From fe3216e961d03fa1484c880b8a5045802d105239 Mon Sep 17 00:00:00 2001 From: Justine Tunney Date: Tue, 4 Oct 2022 23:32:16 -0700 Subject: [PATCH] Perform some code cleanup --- libc/calls/read.c | 5 +- libc/calls/readv.c | 6 +- libc/calls/setrlimit.c | 4 +- libc/calls/tkill.c | 3 +- libc/calls/write.c | 25 +- libc/calls/writev.c | 3 +- libc/runtime/clone.c | 8 +- libc/thread/wait0.c | 3 + net/http/ip.h | 1 + net/http/iscloudflare.c | 42 + net/turfwar/turfwar.c | 914 +++++++++++++------ test/libc/calls/write_test.c | 68 ++ test/libc/calls/writev_test.c | 31 +- test/libc/thread/pthread_barrier_wait_test.c | 2 +- 14 files changed, 803 insertions(+), 312 deletions(-) create mode 100644 net/http/iscloudflare.c diff --git a/libc/calls/read.c b/libc/calls/read.c index a5ffa8531..d6056da03 100644 --- a/libc/calls/read.c +++ b/libc/calls/read.c @@ -41,7 +41,6 @@ * @param size in range [1..0x7ffff000] is reasonable * @return [1..size] bytes on success, 0 on EOF, or -1 w/ errno; with * exception of size==0, in which case return zero means no error - * @see write(), pread(), readv() * @asyncsignalsafe * @restartable * @vforksafe @@ -49,7 +48,7 @@ ssize_t read(int fd, void *buf, size_t size) { ssize_t rc; if (fd >= 0) { - if (IsAsan() && !__asan_is_valid(buf, size)) { + if ((!buf && size) || (IsAsan() && !__asan_is_valid(buf, size))) { rc = efault(); } else if (fd < g_fds.n && g_fds.p[fd].kind == kFdZip) { rc = _weaken(__zipos_read)( @@ -65,7 +64,7 @@ ssize_t read(int fd, void *buf, size_t size) { rc = sys_readv_nt(g_fds.p + fd, &(struct iovec){buf, size}, 1); } } else { - rc = einval(); + rc = ebadf(); } DATATRACE("read(%d, [%#.*hhs%s], %'zu) → %'zd% m", fd, MAX(0, MIN(40, rc)), buf, rc > 40 ? "..." : "", size, rc); diff --git a/libc/calls/readv.c b/libc/calls/readv.c index c0a1f5f21..6d412f83d 100644 --- a/libc/calls/readv.c +++ b/libc/calls/readv.c @@ -46,8 +46,8 @@ * @restartable */ ssize_t readv(int fd, const struct iovec *iov, int iovlen) { - int i; ssize_t rc; + if (fd >= 0 && iovlen >= 0) { if (IsAsan() && !__asan_is_valid_iov(iov, iovlen)) { rc = efault(); @@ -67,9 +67,12 @@ ssize_t readv(int fd, const struct iovec *iov, int iovlen) { } else { rc = sys_readv_nt(g_fds.p + fd, iov, iovlen); } + } else if (fd < 0) { + rc = ebadf(); } else { rc = einval(); } + #if defined(SYSDEBUG) && _DATATRACE if (UNLIKELY(__strace > 0)) { if (rc == -1 && errno == EFAULT) { @@ -81,5 +84,6 @@ ssize_t readv(int fd, const struct iovec *iov, int iovlen) { } } #endif + return rc; } diff --git a/libc/calls/setrlimit.c b/libc/calls/setrlimit.c index 0039ad90c..8d4c9381c 100644 --- a/libc/calls/setrlimit.c +++ b/libc/calls/setrlimit.c @@ -18,11 +18,11 @@ ╚─────────────────────────────────────────────────────────────────────────────*/ #include "libc/assert.h" #include "libc/calls/calls.h" -#include "libc/intrin/strace.internal.h" #include "libc/calls/struct/rlimit.internal.h" #include "libc/dce.h" #include "libc/intrin/asan.internal.h" #include "libc/intrin/describeflags.internal.h" +#include "libc/intrin/strace.internal.h" #include "libc/sysv/consts/rlimit.h" #include "libc/sysv/errfuns.h" @@ -43,6 +43,8 @@ * - `RLIMIT_FSIZE` causes `SIGXFSZ` to sent to the process when the * soft limit on file size is exceeded and the process is destroyed * when the hard limit is exceeded. It works everywhere but Windows + * and it also causes `EFBIG` to be returned by i/o functions after + * the `SIGXFSZ` signal is delivered or ignored * * - `RLIMIT_NPROC` limits the number of simultaneous processes and it * should work on all platforms except Windows. Please be advised it diff --git a/libc/calls/tkill.c b/libc/calls/tkill.c index 291988bc2..0be116697 100644 --- a/libc/calls/tkill.c +++ b/libc/calls/tkill.c @@ -17,10 +17,10 @@ │ PERFORMANCE OF THIS SOFTWARE. │ ╚─────────────────────────────────────────────────────────────────────────────*/ #include "libc/calls/calls.h" -#include "libc/intrin/strace.internal.h" #include "libc/calls/syscall-sysv.internal.h" #include "libc/calls/syscall_support-nt.internal.h" #include "libc/dce.h" +#include "libc/intrin/strace.internal.h" #include "libc/nt/enum/threadaccess.h" #include "libc/nt/runtime.h" #include "libc/nt/thread.h" @@ -48,6 +48,7 @@ static textwindows int sys_tkill_nt(int tid, int sig) { * @param tid is thread id * @param sig does nothing on xnu * @return 0 on success, or -1 w/ errno + * @asyncsignalsafe */ int tkill(int tid, int sig) { int rc; diff --git a/libc/calls/write.c b/libc/calls/write.c index 8b5a4cb08..9606be55c 100644 --- a/libc/calls/write.c +++ b/libc/calls/write.c @@ -34,12 +34,27 @@ * This function changes the current file position. For documentation * on file position behaviors and gotchas, see the lseek() function. * - * @param fd is something open()'d earlier + * @param fd is open file descriptor * @param buf is copied from, cf. copy_file_range(), sendfile(), etc. - * @param size in range [1..0x7ffff000] is reasonable * @return [1..size] bytes on success, or -1 w/ errno; noting zero is * impossible unless size was passed as zero to do an error check - * @see read(), pwrite(), writev(), SIGPIPE + * @raise EBADF if `fd` is negative or not an open file descriptor + * @raise EBADF if `fd` wasn't opened with `O_WRONLY` or `O_RDWR` + * @raise EPIPE if `fd` is a pipe whose other reader end was closed, + * after the `SIGPIPE` signal was delivered, blocked, or ignored + * @raise EFBIG if `RLIMIT_FSIZE` soft limit was exceeded, after the + * `SIGXFSZ` signal was either delivered, blocked, or ignored + * @raise EFAULT if `size` is nonzero and `buf` points to bad memory + * @raise EPERM if pledge() is in play without the stdio promise + * @raise ENOSPC if device containing `fd` is full + * @raise EIO if low-level i/o error happened + * @raise EINTR if signal was delivered instead + * @raise EAGAIN if `O_NONBLOCK` is in play and write needs to block + * @raise ENOBUFS if kernel lacked internal resources; which FreeBSD + * and XNU say could happen with sockets, and OpenBSD documents it + * as a general possibility; whereas other system don't specify it + * @raise ENXIO is specified only by POSIX and XNU when a request is + * made of a nonexistent device or outside device capabilities * @asyncsignalsafe * @restartable * @vforksafe @@ -47,7 +62,7 @@ ssize_t write(int fd, const void *buf, size_t size) { ssize_t rc; if (fd >= 0) { - if (IsAsan() && !__asan_is_valid(buf, size)) { + if ((!buf && size) || (IsAsan() && !__asan_is_valid(buf, size))) { rc = efault(); } else if (fd < g_fds.n && g_fds.p[fd].kind == kFdZip) { rc = _weaken(__zipos_write)( @@ -63,7 +78,7 @@ ssize_t write(int fd, const void *buf, size_t size) { rc = sys_writev_nt(fd, &(struct iovec){buf, size}, 1); } } else { - rc = einval(); + rc = ebadf(); } DATATRACE("write(%d, %#.*hhs%s, %'zu) → %'zd% m", fd, MAX(0, MIN(40, rc)), buf, rc > 40 ? "..." : "", size, rc); diff --git a/libc/calls/writev.c b/libc/calls/writev.c index ada4dc8b1..da2a0791c 100644 --- a/libc/calls/writev.c +++ b/libc/calls/writev.c @@ -50,7 +50,6 @@ * @restartable */ ssize_t writev(int fd, const struct iovec *iov, int iovlen) { - int i; ssize_t rc; if (fd >= 0 && iovlen >= 0) { @@ -72,6 +71,8 @@ ssize_t writev(int fd, const struct iovec *iov, int iovlen) { } else { rc = sys_writev_nt(fd, iov, iovlen); } + } else if (fd < 0) { + rc = ebadf(); } else { rc = einval(); } diff --git a/libc/runtime/clone.c b/libc/runtime/clone.c index cebcedf18..3c84277a4 100644 --- a/libc/runtime/clone.c +++ b/libc/runtime/clone.c @@ -488,13 +488,11 @@ static int CloneLinux(int (*func)(void *arg, int tid), char *stk, size_t stksz, // COSMOPOLITAN /** - * Creates thread without malloc being linked, e.g. + * Creates thread without malloc being linked. * - * int worker(void *arg) { - * return 0; - * } + * If you use clone() you're on you're own, e.g. * - * // NOTE: See _mktls() for _Thread_local support. + * int worker(void *arg) { return 0; } * struct CosmoTib tib = {.tib_self = &tib, .tib_tid = -1}; * char *stk = _mapstack(); * tid = clone(worker, stk, GetStackSize() - 16, diff --git a/libc/thread/wait0.c b/libc/thread/wait0.c index 2c4bdf124..b4df4641a 100644 --- a/libc/thread/wait0.c +++ b/libc/thread/wait0.c @@ -100,4 +100,7 @@ void _wait0(const atomic_int *ctid) { _wait0_poll(&ts); } } + if (IsOpenbsd()) { + sched_yield(); // TODO(jart): whhhy? + } } diff --git a/net/http/ip.h b/net/http/ip.h index 048862759..622b7bf66 100644 --- a/net/http/ip.h +++ b/net/http/ip.h @@ -39,6 +39,7 @@ bool IsMulticastIp(uint32_t); bool IsAnonymousIp(uint32_t); int CategorizeIp(uint32_t); const char *GetIpCategoryName(int); +bool IsCloudflareIp(uint32_t); COSMOPOLITAN_C_END_ #endif /* !(__ASSEMBLER__ + __LINKER__ + 0) */ diff --git a/net/http/iscloudflare.c b/net/http/iscloudflare.c new file mode 100644 index 000000000..af5b3caa8 --- /dev/null +++ b/net/http/iscloudflare.c @@ -0,0 +1,42 @@ +/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│ +│vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi│ +╞══════════════════════════════════════════════════════════════════════════════╡ +│ Copyright 2022 Justine Alexandra Roberts Tunney │ +│ │ +│ Permission to use, copy, modify, and/or distribute this software for │ +│ any purpose with or without fee is hereby granted, provided that the │ +│ above copyright notice and this permission notice appear in all copies. │ +│ │ +│ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL │ +│ WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED │ +│ WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE │ +│ AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL │ +│ DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR │ +│ PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER │ +│ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │ +│ PERFORMANCE OF THIS SOFTWARE. │ +╚─────────────────────────────────────────────────────────────────────────────*/ +#include "net/http/ip.h" + +/** + * Returns true if `x` is Cloudflare IPv4 address. + * + * @see https://www.cloudflare.com/ips/ (April 8, 2021) + */ +bool IsCloudflareIp(uint32_t x) { + return (x & 0xfffffc00) == 0x6715f400 || // 103.21.244.0/22 + (x & 0xfffffc00) == 0x6716c800 || // 103.22.200.0/22 + (x & 0xfffffc00) == 0x671f0400 || // 103.31.4.0/22 + (x & 0xfff80000) == 0x68100000 || // 104.16.0.0/13 + (x & 0xfffc0000) == 0x68180000 || // 104.24.0.0/14 + (x & 0xffffc000) == 0x6ca2c000 || // 108.162.192.0/18 + (x & 0xfffffc00) == 0x83004800 || // 131.0.72.0/22 + (x & 0xffffc000) == 0x8d654000 || // 141.101.64.0/18 + (x & 0xfffe0000) == 0xa29e0000 || // 162.158.0.0/15 + (x & 0xfff80000) == 0xac400000 || // 172.64.0.0/13 + (x & 0xfffff000) == 0xadf53000 || // 173.245.48.0/20 + (x & 0xfffff000) == 0xbc726000 || // 188.114.96.0/20 + (x & 0xfffff000) == 0xbe5df000 || // 190.93.240.0/20 + (x & 0xfffffc00) == 0xc5eaf000 || // 197.234.240.0/22 + (x & 0xffff8000) == 0xc6298000; // 198.41.128.0/17 +} diff --git a/net/turfwar/turfwar.c b/net/turfwar/turfwar.c index 16c8f64a0..41c8eac42 100644 --- a/net/turfwar/turfwar.c +++ b/net/turfwar/turfwar.c @@ -20,6 +20,7 @@ #include "libc/calls/calls.h" #include "libc/calls/pledge.h" #include "libc/calls/struct/iovec.h" +#include "libc/calls/struct/rusage.h" #include "libc/calls/struct/sigaction.h" #include "libc/calls/struct/sigset.h" #include "libc/calls/struct/stat.h" @@ -37,6 +38,7 @@ #include "libc/mem/gc.h" #include "libc/mem/mem.h" #include "libc/nexgen32e/crc32.h" +#include "libc/paths.h" #include "libc/runtime/internal.h" #include "libc/runtime/runtime.h" #include "libc/runtime/stack.h" @@ -53,6 +55,7 @@ #include "libc/sysv/consts/o.h" #include "libc/sysv/consts/poll.h" #include "libc/sysv/consts/prot.h" +#include "libc/sysv/consts/rusage.h" #include "libc/sysv/consts/sig.h" #include "libc/sysv/consts/so.h" #include "libc/sysv/consts/sock.h" @@ -66,11 +69,14 @@ #include "libc/zip.h" #include "net/http/escape.h" #include "net/http/http.h" +#include "net/http/ip.h" #include "net/http/url.h" #include "third_party/getopt/getopt.h" +#include "third_party/nsync/counter.h" #include "third_party/nsync/cv.h" #include "third_party/nsync/mu.h" #include "third_party/nsync/note.h" +#include "third_party/nsync/time.h" #include "third_party/sqlite3/sqlite3.h" #include "third_party/zlib/zconf.h" #include "third_party/zlib/zlib.h" @@ -82,16 +88,16 @@ #define PORT 8080 // default server listening port #define WORKERS 9001 // size of http client thread pool +#define SUPERVISE_MS 1000 // how often to stat() asset files #define KEEPALIVE_MS 60000 // max time to keep idle conn open #define MELTALIVE_MS 2000 // panic keepalive under heavy load -#define POLL_ASSETS_MS 1000 // how often to stat() asset files #define DATE_UPDATE_MS 500 // how often to do tzdata crunching -#define SCORE_UPDATE_MS 60000 // how often to regeenrate /score json -#define SCORE_H_UPDATE_MS 15000 // how often to regeenrate /score json -#define SCORE_D_UPDATE_MS 15000 // how often to regeenrate /score json -#define SCORE_W_UPDATE_MS 15000 // how often to regeenrate /score json -#define SCORE_M_UPDATE_MS 15000 // how often to regeenrate /score json -#define CLAIM_DEADLINE_MS 100 // how long /claim may block if queue is full +#define SCORE_UPDATE_MS 90000 // how often to regenerate /score +#define SCORE_H_UPDATE_MS 10000 // how often to regenerate /score/hour +#define SCORE_D_UPDATE_MS 15000 // how often to regenerate /score/day +#define SCORE_W_UPDATE_MS 30000 // how often to regenerate /score/week +#define SCORE_M_UPDATE_MS 60000 // how often to regenerate /score/month +#define CLAIM_DEADLINE_MS 50 // how long /claim may block if queue is full #define PANIC_LOAD .85 // meltdown if this percent of pool connected #define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown #define QUEUE_MAX 800 // maximum pending claim items in queue @@ -100,7 +106,7 @@ #define MSG_BUF 512 // small response lookaside #define INBUF_SIZE PAGESIZE -#define OUTBUF_SIZE PAGESIZE +#define OUTBUF_SIZE 8192 #define GETOPTS "dvp:w:k:" #define USAGE \ @@ -118,6 +124,7 @@ Usage: turfwar.com [-dv] ARGS...\n\ "Referrer-Policy: origin\r\n" \ "Access-Control-Allow-Origin: *\r\n" +#define MS2CASH(x) (x / 1000 / 2) #define HasHeader(H) (!!msg->headers[H].a) #define HeaderData(H) (inbuf + msg->headers[H].a) #define HeaderLength(H) (msg->headers[H].b - msg->headers[H].a) @@ -131,27 +138,32 @@ Usage: turfwar.com [-dv] ARGS...\n\ (msg->uri.b - msg->uri.a >= strlen(S) && \ !memcmp(inbuf + msg->uri.a, S, strlen(S))) +// logging is line-buffered when LOG("foo\n") is used +// log lines show ephemerally when LOG("foo") is used #if 1 #define LOG(...) kprintf("\r\e[K" __VA_ARGS__) #else #define LOG(...) (void)0 #endif - #if 0 #define DEBUG(...) kprintf("\r\e[K" __VA_ARGS__) #else #define DEBUG(...) (void)0 #endif +// cosmo's CHECK_EQ() macros are designed to succeed or die +// these macros are similar but designed to return on error #define CHECK_MEM(x) \ do { \ if (!CheckMem(__FILE__, __LINE__, x)) { \ + ++g_memfails; \ goto OnError; \ } \ } while (0) #define CHECK_SYS(x) \ do { \ if (!CheckSys(__FILE__, __LINE__, x)) { \ + ++g_sysfails; \ goto OnError; \ } \ } while (0) @@ -159,6 +171,7 @@ Usage: turfwar.com [-dv] ARGS...\n\ do { \ int e = errno; \ if (!CheckSql(__FILE__, __LINE__, x)) { \ + ++g_dbfails; \ goto OnError; \ } \ errno = e; \ @@ -167,11 +180,13 @@ Usage: turfwar.com [-dv] ARGS...\n\ do { \ int e = errno; \ if (!CheckDb(__FILE__, __LINE__, x, db)) { \ + ++g_dbfails; \ goto OnError; \ } \ errno = e; \ } while (0) +// mandatory header for gzip payloads static const uint8_t kGzipHeader[] = { 0x1F, // MAGNUM 0x8B, // MAGNUM @@ -185,6 +200,7 @@ static const uint8_t kGzipHeader[] = { kZipOsUnix, // OS }; +// 1x1 pixel transparent gif data static const char kPixel[43] = "\x47\x49\x46\x38\x39\x61\x01\x00\x01\x00\x80\x00\x00\xff\xff\xff" "\x00\x00\x00\x21\xf9\x04\x01\x00\x00\x00\x00\x2c\x00\x00\x00\x00" @@ -196,25 +212,56 @@ struct Data { }; struct Asset { + int cash; char *path; nsync_mu lock; const char *type; - const char *cache; struct Data data; struct Data gzip; struct timespec mtim; - char lastmod[32]; + char lastmodified[32]; }; +// cli flags bool g_daemonize; int g_port = PORT; int g_workers = WORKERS; int g_keepalive = KEEPALIVE_MS; +// lifecycle vars +nsync_time g_started; +nsync_counter g_ready; nsync_note g_shutdown; nsync_note g_terminate; atomic_int g_connections; +// whitebox metrics +atomic_long g_accepts; +atomic_long g_dbfails; +atomic_long g_proxied; +atomic_long g_messages; +atomic_long g_memfails; +atomic_long g_sysfails; +atomic_long g_unproxied; +atomic_long g_readfails; +atomic_long g_notfounds; +atomic_long g_meltdowns; +atomic_long g_parsefails; +atomic_long g_iprequests; +atomic_long g_queuefulls; +atomic_long g_htmlclaims; +atomic_long g_emptyclaims; +atomic_long g_acceptfails; +atomic_long g_badversions; +atomic_long g_plainclaims; +atomic_long g_imageclaims; +atomic_long g_invalidnames; +atomic_long g_ipv6forwards; +atomic_long g_claimrequests; +atomic_long g_assetrequests; +atomic_long g_statuszrequests; + +// http worker objects struct Worker { pthread_t th; atomic_int msgcount; @@ -223,17 +270,20 @@ struct Worker { struct timespec startread; } * g_worker; +// recentworker wakeup struct Recent { nsync_mu mu; nsync_cv cv; } g_recent; +// global date header struct Nowish { nsync_mu lock; struct timespec ts; struct tm tm; } g_nowish; +// static assets struct Assets { struct Asset index; struct Asset about; @@ -247,6 +297,7 @@ struct Assets { struct Asset favicon; } g_asset; +// queues /claim to ClaimWorker() struct Claims { int pos; int count; @@ -260,24 +311,27 @@ struct Claims { } data[QUEUE_MAX]; } g_claims; +// easy string sender +ssize_t Write(int fd, const char *s) { + return write(fd, s, strlen(s)); +} + +// helper functions for check macro implementation bool CheckMem(const char *file, int line, void *ptr) { if (ptr) return true; kprintf("%s:%d: out of memory: %s\n", file, line, strerror(errno)); return false; } - bool CheckSys(const char *file, int line, long rc) { if (rc != -1) return true; kprintf("%s:%d: %s\n", file, line, strerror(errno)); return false; } - bool CheckSql(const char *file, int line, int rc) { if (rc == SQLITE_OK) return true; kprintf("%s:%d: %s\n", file, line, sqlite3_errstr(rc)); return false; } - bool CheckDb(const char *file, int line, int rc, sqlite3 *db) { if (rc == SQLITE_OK) return true; kprintf("%s:%d: %s: %s\n", file, line, sqlite3_errstr(rc), @@ -285,6 +339,29 @@ bool CheckDb(const char *file, int line, int rc, sqlite3 *db) { return false; } +// if we try to open a WAL database at the same time from multiple +// threads then it's likely we'll get a SQLITE_BUSY conflict since +// WAL mode does a complicated dance to initialize itself thus all +// we need to do is wait a little bit, and use exponential backoff +int DbOpen(const char *path, sqlite3 **db) { + int i, rc; + rc = sqlite3_open(path, db); + if (rc != SQLITE_OK) return rc; + for (i = 0; i < 7; ++i) { + rc = sqlite3_exec(*db, "PRAGMA journal_mode=WAL", 0, 0, 0); + if (rc == SQLITE_OK) break; + if (rc != SQLITE_BUSY) return rc; + usleep(1000L << i); + } + return sqlite3_exec(*db, "PRAGMA synchronous=NORMAL", 0, 0, 0); +} + +// why not make the statement prepare api a little less hairy too +int DbPrepare(sqlite3 *db, sqlite3_stmt **stmt, const char *sql) { + return sqlite3_prepare_v2(db, sql, -1, stmt, 0); +} + +// validates name registration validity bool IsValidNick(const char *s, size_t n) { size_t i; if (n == -1) n = strlen(s); @@ -308,6 +385,7 @@ bool IsValidNick(const char *s, size_t n) { return true; } +// turn unix timestamp into string the easy way char *FormatUnixHttpDateTime(char *s, int64_t t) { struct tm tm; gmtime_r(&t, &tm); @@ -315,6 +393,8 @@ char *FormatUnixHttpDateTime(char *s, int64_t t) { return s; } +// gmtime_r() does a shocking amount of compute +// so we try to handle that globally right here void UpdateNow(void) { int64_t secs; struct tm tm; @@ -328,6 +408,9 @@ void UpdateNow(void) { //!//!//!//!//!//!//!//!//!//!//!//!//!/ } +// the standard strftime() function is dismally slow +// this function is non-generalized for just http so +// it needs 25 cycles rather than 709 cycles so cool char *FormatDate(char *p) { //////////////////////////////////////// nsync_mu_rlock(&g_nowish.lock); @@ -337,6 +420,9 @@ char *FormatDate(char *p) { return p; } +// inserts ip:name claim into blocking message queue +// may be interrupted by absolute deadline +// may be cancelled by server shutdown bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) { bool wake = false; bool added = false; @@ -361,6 +447,9 @@ bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) { return added; } +// removes batch of ip:name claims from blocking message queue +// may be interrupted by absolute deadline +// may be cancelled by server termination int GetClaims(struct Claims *q, struct Claim *out, int len, nsync_time dead) { int got = 0; nsync_mu_lock(&q->mu); @@ -385,6 +474,7 @@ int GetClaims(struct Claims *q, struct Claim *out, int len, nsync_time dead) { return got; } +// parses request uri query string and extracts ?name=value static bool GetNick(char *inbuf, struct HttpMessage *msg, struct Claim *v) { size_t i, n; struct Url url; @@ -407,6 +497,8 @@ static bool GetNick(char *inbuf, struct HttpMessage *msg, struct Claim *v) { return found; } +// allocates memory with hardware-accelerated buffer overflow detection +// so if it gets hacked it'll at least crash instead of get compromised void *NewSafeBuffer(size_t n) { char *p; size_t m = ROUNDUP(n, PAGESIZE); @@ -415,6 +507,7 @@ void *NewSafeBuffer(size_t n) { return p; } +// frees memory with hardware-accelerated buffer overflow detection void FreeSafeBuffer(void *p) { size_t n = malloc_usable_size(p); size_t m = ROUNDDOWN(n, PAGESIZE); @@ -445,29 +538,113 @@ void DontRunOnFirstCpus(int i) { } } -// thousands of http client servicing threads +// signals by default get delivered to any random thread +// solution is to block every signal possible in threads +void BlockSignals(void) { + sigset_t mask; + sigfillset(&mask); + sigprocmask(SIG_SETMASK, &mask, 0); +} + +// main thread uses sigusr1 to deliver io cancellations +void AllowSigusr1(void) { + sigset_t mask; + sigfillset(&mask); + sigdelset(&mask, SIGUSR1); + sigprocmask(SIG_SETMASK, &mask, 0); +} + +char *Statusz(char *p, const char *s, long x) { + p = stpcpy(p, s); + p = stpcpy(p, ": "); + p = FormatInt64(p, x); + p = stpcpy(p, "\n"); + return p; +} + +// public /statusz endpoint for monitoring server internals +void ServeStatusz(int client, char *outbuf) { + char *p; + nsync_time now; + struct rusage ru; + now = nsync_time_now(); + p = outbuf; + p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain\r\n" + "Cache-Control: max-age=0, must-revalidate\r\n" + "Connection: close\r\n" + "\r\n"); + p = Statusz(p, "qps", + g_messages / MAX(1, nsync_time_sub(now, g_started).tv_sec)); + p = Statusz(p, "started", g_started.tv_sec); + p = Statusz(p, "now", now.tv_sec); + p = Statusz(p, "connections", g_connections); + p = Statusz(p, "workers", g_workers); + p = Statusz(p, "accepts", g_accepts); + p = Statusz(p, "messages", g_messages); + p = Statusz(p, "dbfails", g_dbfails); + p = Statusz(p, "proxied", g_proxied); + p = Statusz(p, "memfails", g_memfails); + p = Statusz(p, "sysfails", g_sysfails); + p = Statusz(p, "unproxied", g_unproxied); + p = Statusz(p, "readfails", g_readfails); + p = Statusz(p, "notfounds", g_notfounds); + p = Statusz(p, "meltdowns", g_meltdowns); + p = Statusz(p, "parsefails", g_parsefails); + p = Statusz(p, "iprequests", g_iprequests); + p = Statusz(p, "queuefulls", g_queuefulls); + p = Statusz(p, "htmlclaims", g_htmlclaims); + p = Statusz(p, "emptyclaims", g_emptyclaims); + p = Statusz(p, "acceptfails", g_acceptfails); + p = Statusz(p, "badversions", g_badversions); + p = Statusz(p, "plainclaims", g_plainclaims); + p = Statusz(p, "imageclaims", g_imageclaims); + p = Statusz(p, "invalidnames", g_invalidnames); + p = Statusz(p, "ipv6forwards", g_ipv6forwards); + p = Statusz(p, "claimrequests", g_claimrequests); + p = Statusz(p, "assetrequests", g_assetrequests); + p = Statusz(p, "statuszrequests", g_statuszrequests); + if (!getrusage(RUSAGE_SELF, &ru)) { + p = Statusz(p, "ru_utime.tv_sec", ru.ru_utime.tv_sec); + p = Statusz(p, "ru_utime.tv_usec", ru.ru_utime.tv_usec); + p = Statusz(p, "ru_stime.tv_sec", ru.ru_stime.tv_sec); + p = Statusz(p, "ru_stime.tv_usec", ru.ru_stime.tv_usec); + p = Statusz(p, "ru_maxrss", ru.ru_maxrss); + p = Statusz(p, "ru_ixrss", ru.ru_ixrss); + p = Statusz(p, "ru_idrss", ru.ru_idrss); + p = Statusz(p, "ru_isrss", ru.ru_isrss); + p = Statusz(p, "ru_minflt", ru.ru_minflt); + p = Statusz(p, "ru_majflt", ru.ru_majflt); + p = Statusz(p, "ru_nswap", ru.ru_nswap); + p = Statusz(p, "ru_inblock", ru.ru_inblock); + p = Statusz(p, "ru_oublock", ru.ru_oublock); + p = Statusz(p, "ru_msgsnd", ru.ru_msgsnd); + p = Statusz(p, "ru_msgrcv", ru.ru_msgrcv); + p = Statusz(p, "ru_nsignals", ru.ru_nsignals); + p = Statusz(p, "ru_nvcsw", ru.ru_nvcsw); + p = Statusz(p, "ru_nivcsw", ru.ru_nivcsw); + } + write(client, outbuf, p - outbuf); +} + +// make thousands of http client handler threads // load balance incoming connections for port 8080 across all threads // hangup on any browser clients that lag for more than a few seconds void *HttpWorker(void *arg) { int server; int yes = 1; - char name[16]; - sigset_t mask; int id = (intptr_t)arg; - char *msgbuf = _gc(malloc(MSG_BUF)); + char *msgbuf = _gc(xmalloc(MSG_BUF)); char *inbuf = NewSafeBuffer(INBUF_SIZE); char *outbuf = NewSafeBuffer(OUTBUF_SIZE); struct timeval timeo = {g_keepalive / 1000, g_keepalive % 1000}; struct HttpMessage *msg = _gc(xmalloc(sizeof(struct HttpMessage))); struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(g_port)}; - sigfillset(&mask); + BlockSignals(); DontRunOnFirstCpus(2); - sigdelset(&mask, SIGUSR1); - sigprocmask(SIG_SETMASK, &mask, 0); - ksnprintf(name, sizeof(name), "HTTP #%d", id); - pthread_setname_np(pthread_self(), name); CHECK_NE(-1, (server = socket(AF_INET, SOCK_STREAM, 0))); + pthread_setname_np(pthread_self(), _gc(xasprintf("HTTP #%d", id))); setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); @@ -476,7 +653,6 @@ void *HttpWorker(void *arg) { setsockopt(server, SOL_TCP, TCP_QUICKACK, &yes, sizeof(yes)); CHECK_NE(-1, bind(server, &addr, sizeof(addr))); CHECK_NE(-1, listen(server, 1)); - errno = 0; // connection loop while (!nsync_note_is_notified(g_shutdown)) { @@ -484,46 +660,116 @@ void *HttpWorker(void *arg) { struct Url url; ssize_t got, sent; uint32_t ip, clientip; - char ipbuf[32], *p, *q; uint32_t clientaddrsize; - struct sockaddr_in clientaddr; int client, inmsglen, outmsglen; + char ipbuf[32], *p, *q, cashbuf[64]; + struct sockaddr_in clientaddr = {0}; // wait for client connection + // this may be cancelled by sigusr1 + AllowSigusr1(); clientaddrsize = sizeof(clientaddr); client = accept(server, (struct sockaddr *)&clientaddr, &clientaddrsize); - if (client == -1) continue; - ip = clientip = ntohl(clientaddr.sin_addr.s_addr); + if (client == -1) { + if (errno != EAGAIN) { // spinning on SO_RCVTIMEO + ++g_acceptfails; + } + continue; + } + clientip = ntohl(clientaddr.sin_addr.s_addr); g_worker[id].connected = true; g_worker[id].msgcount = 0; + ++g_accepts; ++g_connections; - // strict message loop w/o pipelining + // simple http/1.1 message loop + // let's assume we're behind a well-behaved frontend + // each read() should give us just *one* HTTP message + // if we get less than one message, we drop connection + // if we get more than one message, we Connection: close + // let's not bother with cray proto stuff like 100-expect do { struct Asset *a; bool comp, ipv6; - sigdelset(&mask, SIGUSR1); - sigprocmask(SIG_SETMASK, &mask, 0); + + // wait for http message + // this may be cancelled by sigusr1 + AllowSigusr1(); InitHttpMessage(msg, kHttpRequest); g_worker[id].startread = _timespec_real(); - if ((got = read(client, inbuf, INBUF_SIZE)) <= 0) break; - sigaddset(&mask, SIGUSR1); - sigprocmask(SIG_SETMASK, &mask, 0); - if ((inmsglen = ParseHttpMessage(msg, inbuf, got)) <= 0) break; - if (msg->version != 11) break; // cloudflare won't send 0.9 or 1.0 + if ((got = read(client, inbuf, INBUF_SIZE)) <= 0) { + ++g_readfails; + break; + } + BlockSignals(); + + // parse http message + // we're only doing one-shot parsing right now + if ((inmsglen = ParseHttpMessage(msg, inbuf, got)) <= 0) { + ++g_parsefails; + break; + } + ++g_messages; ++g_worker[id].msgcount; - // get the ip address again - // we assume a firewall only lets the frontend talk to this server - ipv6 = false; - if (HasHeader(kHttpXForwardedFor) && - ParseForwarded(HeaderData(kHttpXForwardedFor), - HeaderLength(kHttpXForwardedFor), &ip, 0) == -1) { - ipv6 = true; + // get client address from frontend + if (HasHeader(kHttpXForwardedFor)) { + if (!IsLoopbackIp(clientip) && // + !IsPrivateIp(clientip) && // + !IsCloudflareIp(clientip)) { + LOG("Got X-Forwarded-For from untrusted IPv4 client address " + "%hhu.%hhu.%hhu.%hhu\n", + clientip >> 24, clientip >> 16, clientip >> 8, clientip); + ipv6 = false; + ip = clientip; + ++g_unproxied; + } else if (ParseForwarded(HeaderData(kHttpXForwardedFor), + HeaderLength(kHttpXForwardedFor), &ip, + 0) != -1) { + ipv6 = false; + ++g_proxied; + } else { + ipv6 = true; + ip = clientip; + ++g_ipv6forwards; + ++g_proxied; + } + } else { + ipv6 = false; + ip = clientip; + ++g_unproxied; } ksnprintf(ipbuf, sizeof(ipbuf), "%hhu.%hhu.%hhu.%hhu", ip >> 24, ip >> 16, ip >> 8, ip); + // we don't support http/1.0 and http/0.9 right now + if (msg->version != 11) { + LOG("%s used unsupported http/%d version\n", ipbuf, msg->version); + Write(client, "HTTP/1.1 505 HTTP Version Not Supported\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n" + "\r\n" + "HTTP Version Not Supported\n"); + ++g_badversions; + break; + } + + // access log + LOG("%16s %.*s %.*s %.*s %.*s %#.*s\n", ipbuf, + msg->xmethod.b - msg->xmethod.a, inbuf + msg->xmethod.a, + msg->uri.b - msg->uri.a, inbuf + msg->uri.a, + HeaderLength(kHttpCfIpcountry), HeaderData(kHttpCfIpcountry), + HeaderLength(kHttpSecChUaPlatform), HeaderData(kHttpSecChUaPlatform), + HeaderLength(kHttpReferer), HeaderData(kHttpReferer)); + + // export monitoring data + if (UrlEqual("/statusz")) { + ServeStatusz(client, outbuf); + ++g_statuszrequests; + break; + } + + // asset routing if (UrlEqual("/") || UrlStartsWith("/index.html")) { a = &g_asset.index; } else if (UrlStartsWith("/favicon.ico")) { @@ -548,37 +794,65 @@ void *HttpWorker(void *arg) { a = 0; } + // assert serving if (a) { struct iovec iov[2]; + ++g_assetrequests; comp = a->gzip.n < a->data.n && HeaderHas(msg, inbuf, kHttpAcceptEncoding, "gzip", 4); - p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS - "Vary: Accept-Encoding\r\n" - "Date: "); - p = FormatDate(p); //////////////////////////////////////// nsync_mu_rlock(&a->lock); - p = stpcpy(p, "\r\nLast-Modified: "); - p = stpcpy(p, a->lastmod); - p = stpcpy(p, "\r\nContent-Type: "); - p = stpcpy(p, a->type); - p = stpcpy(p, "\r\nCache-Control: "); - p = stpcpy(p, a->cache); - if (comp) p = stpcpy(p, "\r\nContent-Encoding: gzip"); - p = stpcpy(p, "\r\nContent-Length: "); - d = comp ? a->gzip : a->data; - p = FormatInt32(p, d.n); - p = stpcpy(p, "\r\n\r\n"); - iov[0].iov_base = outbuf; - iov[0].iov_len = p - outbuf; - iov[1].iov_base = d.p; - iov[1].iov_len = d.n; - sent = writev(client, iov, 2); - outmsglen = iov[0].iov_len + iov[1].iov_len; + if (HasHeader(kHttpIfModifiedSince) && + a->mtim.tv_sec <= + ParseHttpDateTime(HeaderData(kHttpIfModifiedSince), + HeaderLength(kHttpIfModifiedSince))) { + p = stpcpy(outbuf, + "HTTP/1.1 304 Not Modified\r\n" STANDARD_RESPONSE_HEADERS + "Vary: Accept-Encoding\r\n" + "Date: "); + p = FormatDate(p); + p = stpcpy(p, "\r\nLast-Modified: "); + p = stpcpy(p, a->lastmodified); + p = stpcpy(p, "\r\nContent-Type: "); + p = stpcpy(p, a->type); + p = stpcpy(p, "\r\nCache-Control: "); + ksnprintf(cashbuf, sizeof(cashbuf), "max-age=%d, must-revalidate", + a->cash); + p = stpcpy(p, cashbuf); + p = stpcpy(p, "\r\n\r\n"); + outmsglen = p - outbuf; + sent = write(client, outbuf, outmsglen); + } else { + p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS + "Vary: Accept-Encoding\r\n" + "Date: "); + p = FormatDate(p); + p = stpcpy(p, "\r\nLast-Modified: "); + p = stpcpy(p, a->lastmodified); + p = stpcpy(p, "\r\nContent-Type: "); + p = stpcpy(p, a->type); + p = stpcpy(p, "\r\nCache-Control: "); + ksnprintf(cashbuf, sizeof(cashbuf), "max-age=%d, must-revalidate", + a->cash); + p = stpcpy(p, cashbuf); + if (comp) p = stpcpy(p, "\r\nContent-Encoding: gzip"); + p = stpcpy(p, "\r\nContent-Length: "); + d = comp ? a->gzip : a->data; + p = FormatInt32(p, d.n); + p = stpcpy(p, "\r\n\r\n"); + iov[0].iov_base = outbuf; + iov[0].iov_len = p - outbuf; + iov[1].iov_base = d.p; + iov[1].iov_len = msg->method == kHttpHead ? 0 : d.n; + outmsglen = iov[0].iov_len + iov[1].iov_len; + sent = writev(client, iov, 2); + } nsync_mu_runlock(&a->lock); //////////////////////////////////////// } else if (UrlStartsWith("/ip")) { + // what is my ip endpoint + ++g_iprequests; if (!ipv6) { p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" @@ -616,6 +890,8 @@ void *HttpWorker(void *arg) { } } else if (UrlStartsWith("/claim")) { + // ip:name registration endpoint + ++g_claimrequests; if (ipv6) goto Ipv6Warning; struct Claim v = {.ip = ip, .created = g_nowish.ts.tv_sec}; if (GetNick(inbuf, msg, &v)) { @@ -623,10 +899,11 @@ void *HttpWorker(void *arg) { &g_claims, &v, _timespec_add(_timespec_real(), _timespec_frommillis(CLAIM_DEADLINE_MS)))) { - LOG("%s claimed by %s\n", ipbuf, v.name); + DEBUG("%s claimed by %s\n", ipbuf, v.name); if (HasHeader(kHttpAccept) && (HeaderHas(msg, inbuf, kHttpAccept, "image/*", 7) || HeaderHas(msg, inbuf, kHttpAccept, "image/gif", 9))) { + ++g_imageclaims; p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" "Cache-Control: private\r\n" @@ -640,6 +917,7 @@ void *HttpWorker(void *arg) { } else if (HasHeader(kHttpAccept) && HeaderHas(msg, inbuf, kHttpAccept, "text/plain", 10) && !HeaderHas(msg, inbuf, kHttpAccept, "text/html", 9)) { + ++g_plainclaims; ksnprintf(msgbuf, MSG_BUF, "The land at %s was claimed for %s\n", ipbuf, v.name); q = msgbuf; @@ -657,6 +935,7 @@ void *HttpWorker(void *arg) { (HeaderHas(msg, inbuf, kHttpAccept, "text/html", 9) || HeaderHas(msg, inbuf, kHttpAccept, "text/*", 6) || HeaderHas(msg, inbuf, kHttpAccept, "*/*", 3))) { + ++g_htmlclaims; ksnprintf(msgbuf, MSG_BUF, "\n" "The land at %s was claimed for %s.\n" @@ -678,6 +957,7 @@ void *HttpWorker(void *arg) { p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); } else { + ++g_emptyclaims; p = stpcpy(outbuf, "HTTP/1.1 204 No Content\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept\r\n" @@ -691,24 +971,16 @@ void *HttpWorker(void *arg) { sent = write(client, outbuf, p - outbuf); } else { LOG("%s: 502 Claims Queue Full\n", ipbuf); - q = "Claims Queue Full"; - p = stpcpy( - outbuf, - "HTTP/1.1 502 Claims Queue Full\r\n" STANDARD_RESPONSE_HEADERS - "Content-Type: text/plain\r\n" - "Cache-Control: private\r\n" - "Connection: close\r\n" - "Date: "); - p = FormatDate(p); - p = stpcpy(p, "\r\nContent-Length: "); - p = FormatInt32(p, strlen(q)); - p = stpcpy(p, "\r\n\r\n"); - p = stpcpy(p, q); - outmsglen = p - outbuf; - sent = write(client, outbuf, p - outbuf); + Write(client, "HTTP/1.1 502 Claims Queue Full\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n" + "\r\n" + "Claims Queue Full\n"); + ++g_queuefulls; break; } } else { + ++g_invalidnames; LOG("%s: 400 invalid name\n", ipbuf); q = "invalid name"; p = stpcpy(outbuf, @@ -728,6 +1000,8 @@ void *HttpWorker(void *arg) { } } else { + // default endpoint + ++g_notfounds; LOG("%s: 400 not found %#.*s\n", ipbuf, msg->uri.b - msg->uri.a, inbuf + msg->uri.a); q = "\r\n" @@ -750,11 +1024,12 @@ void *HttpWorker(void *arg) { // amount, then since we sent the content length and checked // that the client didn't attach a payload, we are so synced // thus we can safely process more messages - } while (got == inmsglen && // - sent == outmsglen && // - !msg->headers[kHttpContentLength].a && - !msg->headers[kHttpTransferEncoding].a && - (msg->method == kHttpGet || msg->method == kHttpHead) && + } while (got == inmsglen && // + sent == outmsglen && // + !HasHeader(kHttpContentLength) && // + !HasHeader(kHttpTransferEncoding) && // + (msg->method == kHttpGet || // + msg->method == kHttpHead) && // !nsync_note_is_notified(g_shutdown)); DestroyHttpMessage(msg); close(client); @@ -770,6 +1045,7 @@ void *HttpWorker(void *arg) { return 0; } +// helper to precompress gzip responses in background struct Data Gzip(struct Data data) { char *p; void *tmp; @@ -805,32 +1081,34 @@ struct Data Gzip(struct Data data) { return res; } -struct Asset LoadAsset(const char *path, const char *type) { +// slurps asset off disk once during startup +struct Asset LoadAsset(const char *path, const char *type, int cash) { struct stat st; struct Asset a = {0}; CHECK_EQ(0, stat(path, &st)); CHECK_NOTNULL((a.data.p = xslurp(path, &a.data.n))); a.type = type; - a.cache = "max-age=3600, must-revalidate"; + a.cash = cash; CHECK_NOTNULL((a.path = strdup(path))); a.mtim = st.st_mtim; CHECK_NOTNULL((a.gzip = Gzip(a.data)).p); - FormatUnixHttpDateTime(a.lastmod, a.mtim.tv_sec); + FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec); return a; } +// reslurps asset off disk if its mtim changed bool ReloadAsset(struct Asset *a) { int fd; void *f[2]; ssize_t rc; struct stat st; - char lastmod[32]; + char lastmodified[32]; struct Data data = {0}; struct Data gzip = {0}; CHECK_SYS((fd = open(a->path, O_RDONLY))); CHECK_SYS(fstat(fd, &st)); if (_timespec_gt(st.st_mtim, a->mtim)) { - FormatUnixHttpDateTime(lastmod, st.st_mtim.tv_sec); + FormatUnixHttpDateTime(lastmodified, st.st_mtim.tv_sec); CHECK_MEM((data.p = malloc(st.st_size))); CHECK_SYS((rc = read(fd, data.p, st.st_size))); data.n = st.st_size; @@ -843,7 +1121,7 @@ bool ReloadAsset(struct Asset *a) { a->data = data; a->gzip = gzip; a->mtim = st.st_mtim; - memcpy(a->lastmod, lastmod, 32); + memcpy(a->lastmodified, lastmodified, 32); nsync_mu_unlock(&a->lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ free(f[0]); @@ -868,6 +1146,9 @@ void IgnoreSignal(int sig) { // so worker i/o routines may eintr safely } +// asynchronous handler of sigint, sigterm, and sighup signals +// this handler is always invoked from within the main thread, +// because our helper and worker threads block always signals. void OnCtrlC(int sig) { if (!nsync_note_is_notified(g_shutdown)) { LOG("Received %s shutting down...\n", strsignal(sig)); @@ -875,6 +1156,7 @@ void OnCtrlC(int sig) { } else { // there's no way to deliver signals to workers atomically, unless // we pay the cost of ppoll() which isn't necessary in this design + // so if a user smashes that ctrl-c then we tkill the workers more LOG("Received %s again so sending another volley...\n", strsignal(sig)); for (int i = 0; i < g_workers; ++i) { if (!g_worker[i].shutdown) { @@ -884,6 +1166,7 @@ void OnCtrlC(int sig) { } } +// parses cli arguments static void GetOpts(int argc, char *argv[]) { int opt; while ((opt = getopt(argc, argv, GETOPTS)) != -1) { @@ -913,10 +1196,12 @@ static void GetOpts(int argc, char *argv[]) { } } -void Update(struct Asset *a, bool gen(struct Asset *, long), long arg) { +// atomically swaps out asset with newer version +void Update(struct Asset *a, bool gen(struct Asset *, long, long), long x, + long y) { void *f[2]; struct Asset t; - if (gen(&t, arg)) { + if (gen(&t, x, y)) { //!//!//!//!//!//!//!//!//!//!//!//!//!/ nsync_mu_lock(&a->lock); f[0] = a->data.p; @@ -924,7 +1209,9 @@ void Update(struct Asset *a, bool gen(struct Asset *, long), long arg) { a->data = t.data; a->gzip = t.gzip; a->mtim = t.mtim; - memcpy(a->lastmod, t.lastmod, 32); + a->type = t.type; + a->cash = t.cash; + memcpy(a->lastmodified, t.lastmodified, 32); nsync_mu_unlock(&a->lock); //!//!//!//!//!//!//!//!//!//!//!//!//!/ free(f[0]); @@ -932,7 +1219,8 @@ void Update(struct Asset *a, bool gen(struct Asset *, long), long arg) { } } -bool GenerateScore(struct Asset *out, long seconds) { +// generator function for the big board +bool GenerateScore(struct Asset *out, long secs, long cash) { int rc; char *sb = 0; sqlite3 *db = 0; @@ -942,34 +1230,32 @@ bool GenerateScore(struct Asset *out, long seconds) { bool namestate = false; char name1[NICK_MAX + 1] = {0}; char name2[NICK_MAX + 1]; - DEBUG("GenerateScore %ld\n", seconds); + DEBUG("GenerateScore %ld\n", secs); a.type = "application/json"; - a.cache = "max-age=15, must-revalidate"; + a.cash = cash; CHECK_SYS(clock_gettime(CLOCK_REALTIME, &a.mtim)); - FormatUnixHttpDateTime(a.lastmod, a.mtim.tv_sec); + FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec); CHECK_SYS(appends(&a.data.p, "{\n")); CHECK_SYS(appendf(&a.data.p, "\"now\":[%ld,%ld],\n", a.mtim.tv_sec, a.mtim.tv_nsec)); CHECK_SYS(appends(&a.data.p, "\"score\":{\n")); - CHECK_SQL(sqlite3_open("db.sqlite3", &db)); - CHECK_SQL(sqlite3_exec(db, "PRAGMA journal_mode=WAL", 0, 0, 0)); - CHECK_SQL(sqlite3_exec(db, "PRAGMA synchronous=NORMAL", 0, 0, 0)); - if (seconds == -1) { - CHECK_DB(sqlite3_prepare_v2(db, - "SELECT nick, (ip >> 24), COUNT(*)\n" - "FROM land\n" - "GROUP BY nick, (ip >> 24)", - -1, &stmt, 0)); + CHECK_SQL(DbOpen("db.sqlite3", &db)); + if (secs == -1) { + CHECK_DB(DbPrepare(db, &stmt, + "SELECT nick, (ip >> 24), COUNT(*)\n" + "FROM land\n" + "GROUP BY nick, (ip >> 24)")); } else { - CHECK_DB(sqlite3_prepare_v2(db, - "SELECT nick, (ip >> 24), COUNT(*)\n" - " FROM land\n" - "WHERE created NOT NULL\n" - " AND created >= ?1\n" - "GROUP BY nick, (ip >> 24)", - -1, &stmt, 0)); - CHECK_DB(sqlite3_bind_int64(stmt, 1, a.mtim.tv_sec - seconds)); + CHECK_DB(DbPrepare(db, &stmt, + "SELECT nick, (ip >> 24), COUNT(*)\n" + " FROM land\n" + "WHERE created NOT NULL\n" + " AND created >= ?1\n" + "GROUP BY nick, (ip >> 24)")); + CHECK_DB(sqlite3_bind_int64(stmt, 1, a.mtim.tv_sec - secs)); } + // be sure to always use transactions with sqlite as in always + // otherwise.. you can use --strace to see the fcntl bloodbath CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); while ((rc = sqlite3_step(stmt)) != SQLITE_DONE) { if (rc != SQLITE_ROW) CHECK_SQL(rc); @@ -1009,195 +1295,212 @@ OnError: // single thread for regenerating the user scores json void *ScoreWorker(void *arg) { - nsync_time deadline; - LOG("ScoreWorker started\n"); + BlockSignals(); + pthread_setname_np(pthread_self(), "ScoreAll"); + LOG("Score started\n"); + long wait = SCORE_UPDATE_MS; + Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait)); + nsync_counter_add(g_ready, -1); // #1 OnlyRunOnCpu(0); - pthread_setname_np(pthread_self(), "ScoreWorker"); - for (deadline = _timespec_real();;) { - deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_UPDATE_MS)); - if (!nsync_note_wait(g_shutdown, deadline)) { - Update(&g_asset.score, GenerateScore, -1); - } else { - break; - } + for (nsync_time deadline = _timespec_real();;) { + Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait)); + deadline = _timespec_add(deadline, _timespec_frommillis(wait)); + if (nsync_note_wait(g_shutdown, deadline)) break; } - LOG("ScoreWorker exiting\n"); + LOG("Score exiting\n"); return 0; } // single thread for regenerating the user scores json void *ScoreHourWorker(void *arg) { - nsync_time deadline; - LOG("ScoreHourWorker started\n"); + BlockSignals(); + pthread_setname_np(pthread_self(), "ScoreHour"); + LOG("ScoreHour started\n"); + long secs = 60L * 60; + long wait = SCORE_H_UPDATE_MS; + Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait)); + nsync_counter_add(g_ready, -1); // #2 OnlyRunOnCpu(0); - pthread_setname_np(pthread_self(), "ScoreHourWorker"); - for (deadline = _timespec_real();;) { - deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_D_UPDATE_MS)); - if (!nsync_note_wait(g_shutdown, deadline)) { - Update(&g_asset.score_hour, GenerateScore, 60L * 60); - } else { - break; - } + for (nsync_time deadline = _timespec_real();;) { + Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait)); + deadline = _timespec_add(deadline, _timespec_frommillis(wait)); + if (nsync_note_wait(g_shutdown, deadline)) break; } - LOG("ScoreHourWorker exiting\n"); + LOG("ScoreHour exiting\n"); return 0; } // single thread for regenerating the user scores json void *ScoreDayWorker(void *arg) { - nsync_time deadline; - LOG("ScoreDayWorker started\n"); + BlockSignals(); + pthread_setname_np(pthread_self(), "ScoreDay"); + LOG("ScoreDay started\n"); + long secs = 60L * 60 * 24; + long wait = SCORE_D_UPDATE_MS; + Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait)); + nsync_counter_add(g_ready, -1); // #3 OnlyRunOnCpu(0); - pthread_setname_np(pthread_self(), "ScoreDayWorker"); - for (deadline = _timespec_real();;) { - deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_D_UPDATE_MS)); - if (!nsync_note_wait(g_shutdown, deadline)) { - Update(&g_asset.score_day, GenerateScore, 60L * 60 * 24); - } else { - break; - } + for (nsync_time deadline = _timespec_real();;) { + Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait)); + deadline = _timespec_add(deadline, _timespec_frommillis(wait)); + if (nsync_note_wait(g_shutdown, deadline)) break; } - LOG("ScoreDayWorker exiting\n"); + LOG("ScoreDay exiting\n"); return 0; } // single thread for regenerating the user scores json void *ScoreWeekWorker(void *arg) { - nsync_time deadline; - LOG("ScoreWeekWorker started\n"); + BlockSignals(); + pthread_setname_np(pthread_self(), "ScoreWeek"); + LOG("ScoreWeek started\n"); + long secs = 60L * 60 * 24 * 7; + long wait = SCORE_W_UPDATE_MS; + Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait)); + nsync_counter_add(g_ready, -1); // #4 OnlyRunOnCpu(0); - pthread_setname_np(pthread_self(), "ScoreWeekWorker"); - for (deadline = _timespec_real();;) { - deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_D_UPDATE_MS)); - if (!nsync_note_wait(g_shutdown, deadline)) { - Update(&g_asset.score_week, GenerateScore, 60L * 60 * 24 * 7); - } else { - break; - } + for (nsync_time deadline = _timespec_real();;) { + Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait)); + deadline = _timespec_add(deadline, _timespec_frommillis(wait)); + if (nsync_note_wait(g_shutdown, deadline)) break; } - LOG("ScoreWeekWorker exiting\n"); + LOG("ScoreWeek exiting\n"); return 0; } // single thread for regenerating the user scores json void *ScoreMonthWorker(void *arg) { - nsync_time deadline; - LOG("ScoreMonthWorker started\n"); + BlockSignals(); + pthread_setname_np(pthread_self(), "ScoreMonth"); + LOG("ScoreMonth started\n"); + long secs = 60L * 60 * 24 * 30; + long wait = SCORE_M_UPDATE_MS; + Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait)); + nsync_counter_add(g_ready, -1); // #5 OnlyRunOnCpu(0); - pthread_setname_np(pthread_self(), "ScoreMonthWorker"); - for (deadline = _timespec_real();;) { - deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_D_UPDATE_MS)); - if (!nsync_note_wait(g_shutdown, deadline)) { - Update(&g_asset.score_month, GenerateScore, 60L * 60 * 24 * 30); - } else { - break; - } + for (nsync_time deadline = _timespec_real();;) { + Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait)); + deadline = _timespec_add(deadline, _timespec_frommillis(wait)); + if (nsync_note_wait(g_shutdown, deadline)) break; } - LOG("ScoreMonthWorker exiting\n"); + LOG("ScoreMonth exiting\n"); return 0; } -bool GenerateRecent(struct Asset *out, long arg) { - int rc; +// thread for realtime json generation of recent successful claims +void *RecentWorker(void *arg) { + bool once; + void *f[2]; + int rc, err; + sqlite3 *db; char *sb = 0; - sqlite3 *db = 0; size_t sblen = 0; - bool once = false; - struct Asset a = {0}; - sqlite3_stmt *stmt = 0; - DEBUG("GenerateRecent\n"); - OnlyRunOnCpu(0); - pthread_setname_np(pthread_self(), "GenerateRecent"); - a.type = "application/json"; - a.cache = "max-age=0, must-revalidate"; - CHECK_SYS(clock_gettime(CLOCK_REALTIME, &a.mtim)); - FormatUnixHttpDateTime(a.lastmod, a.mtim.tv_sec); - CHECK_SYS(appends(&a.data.p, "{\n")); - CHECK_SYS(appendf(&a.data.p, "\"now\":[%ld,%ld],\n", a.mtim.tv_sec, - a.mtim.tv_nsec)); - CHECK_SYS(appends(&a.data.p, "\"recent\":[\n")); - CHECK_SQL(sqlite3_open("db.sqlite3", &db)); - CHECK_SQL(sqlite3_exec(db, "PRAGMA journal_mode=WAL", 0, 0, 0)); - CHECK_SQL(sqlite3_exec(db, "PRAGMA synchronous=NORMAL", 0, 0, 0)); - CHECK_DB(sqlite3_prepare_v2(db, - "SELECT ip, nick, created\n" - "FROM land\n" - "WHERE created NOT NULL\n" - "ORDER BY created DESC\n" - "LIMIT 50", - -1, &stmt, 0)); - CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); - while ((rc = sqlite3_step(stmt)) != SQLITE_DONE) { - if (rc != SQLITE_ROW) CHECK_SQL(rc); - if (once) { - CHECK_SYS(appends(&a.data.p, ",\n")); - } else { - once = true; + sqlite3_stmt *stmt; + struct Asset *a, t; + bool warmedup = false; + BlockSignals(); + pthread_setname_np(pthread_self(), "RecentWorker"); + LOG("RecentWorker started\n"); +StartOver: + stmt = 0; + bzero(&t, sizeof(t)); + CHECK_SQL(DbOpen("db.sqlite3", &db)); + CHECK_DB(DbPrepare(db, &stmt, + "SELECT ip, nick, created\n" + "FROM land\n" + "WHERE created NOT NULL\n" + "ORDER BY created DESC\n" + "LIMIT 50")); + do { + // regenerate json + CHECK_SYS(clock_gettime(CLOCK_REALTIME, &t.mtim)); + FormatUnixHttpDateTime(t.lastmodified, t.mtim.tv_sec); + CHECK_SYS(appends(&t.data.p, "{\n")); + CHECK_SYS(appendf(&t.data.p, "\"now\":[%ld,%ld],\n", t.mtim.tv_sec, + t.mtim.tv_nsec)); + CHECK_SYS(appends(&t.data.p, "\"recent\":[\n")); + CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); + for (once = false; (rc = sqlite3_step(stmt)) != SQLITE_DONE; once = true) { + if (rc != SQLITE_ROW) CHECK_SQL(rc); + if (once) CHECK_SYS(appends(&t.data.p, ",\n")); + CHECK_SYS( + appendf(&t.data.p, "[%ld,\"%s\",%ld]", sqlite3_column_int64(stmt, 0), + EscapeJsStringLiteral( + &sb, &sblen, (void *)sqlite3_column_text(stmt, 1), -1, 0), + sqlite3_column_int64(stmt, 2))); } - CHECK_SYS( - appendf(&a.data.p, "[%ld,\"%s\",%ld]", sqlite3_column_int64(stmt, 0), - EscapeJsStringLiteral( - &sb, &sblen, (void *)sqlite3_column_text(stmt, 1), -1, 0), - sqlite3_column_int64(stmt, 2))); - } - CHECK_SQL(sqlite3_exec(db, "END TRANSACTION", 0, 0, 0)); - CHECK_SYS(appends(&a.data.p, "]}\n")); + CHECK_SQL(sqlite3_exec(db, "END TRANSACTION", 0, 0, 0)); + CHECK_SYS(appends(&t.data.p, "]}\n")); + t.data.n = appendz(t.data.p).i; + CHECK_MEM((t.gzip = Gzip(t.data)).p); + // deploy json + a = &g_asset.recent; + //!//!//!//!//!//!//!//!//!//!//!//!//!/ + nsync_mu_lock(&a->lock); + f[0] = a->data.p; + f[1] = a->gzip.p; + a->data = t.data; + a->gzip = t.gzip; + a->mtim = t.mtim; + a->type = "application/json"; + a->cash = 0; + memcpy(a->lastmodified, t.lastmodified, 32); + nsync_mu_unlock(&a->lock); + //!//!//!//!//!//!//!//!//!//!//!//!//!/ + bzero(&t, sizeof(t)); + free(f[0]); + free(f[1]); + // handle startup condition + if (!warmedup) { + OnlyRunOnCpu(1); + nsync_counter_add(g_ready, -1); // #6 + warmedup = true; + } + // wait for wakeup or cancel + nsync_mu_lock(&g_recent.mu); + err = nsync_cv_wait_with_deadline(&g_recent.cv, &g_recent.mu, + nsync_time_no_deadline, g_shutdown); + nsync_mu_unlock(&g_recent.mu); + } while (err != ECANCELED); CHECK_DB(sqlite3_finalize(stmt)); CHECK_SQL(sqlite3_close(db)); - a.data.n = appendz(a.data.p).i; - a.gzip = Gzip(a.data); + LOG("RecentWorker exiting\n"); free(sb); - *out = a; - return true; + return 0; OnError: sqlite3_finalize(stmt); sqlite3_close(db); - free(a.data.p); - free(sb); - return false; -} - -// thread for realtime json generation most recent successful claims -void *RecentWorker(void *arg) { - int rc; - OnlyRunOnCpu(1); - pthread_setname_np(pthread_self(), "RecentWorker"); - LOG("RecentWorker started\n"); - for (;;) { - nsync_mu_lock(&g_recent.mu); - rc = nsync_cv_wait_with_deadline(&g_recent.cv, &g_recent.mu, - nsync_time_no_deadline, g_shutdown); - nsync_mu_unlock(&g_recent.mu); - if (rc == ECANCELED) break; - Update(&g_asset.recent, GenerateRecent, -1); - } - LOG("RecentWorker exiting\n"); - return 0; + free(t.data.p); + free(t.gzip.p); + goto StartOver; } // single thread for inserting batched claims into the database +// this helps us avoid over 9000 threads having fcntl bloodbath void *ClaimWorker(void *arg) { int i, n, rc; sqlite3 *db = 0; sqlite3_stmt *stmt = 0; + bool warmedup = false; struct Claim *v = _gc(xcalloc(BATCH_MAX, sizeof(struct Claim))); + BlockSignals(); pthread_setname_np(pthread_self(), "ClaimWorker"); LOG("ClaimWorker started\n"); - OnlyRunOnCpu(0); StartOver: - CHECK_SQL(sqlite3_open("db.sqlite3", &db)); - CHECK_SQL(sqlite3_exec(db, "PRAGMA journal_mode=WAL", 0, 0, 0)); - CHECK_SQL(sqlite3_exec(db, "PRAGMA synchronous=NORMAL", 0, 0, 0)); - CHECK_DB(sqlite3_prepare_v2(db, - "INSERT INTO land (ip, nick, created)\n" - "VALUES (?1, ?2, ?3)\n" - "ON CONFLICT (ip) DO\n" - "UPDATE SET (nick, created) = (?2, ?3)\n" - "WHERE nick != ?2\n" - " OR created IS NULL\n" - " OR ?3 - created > 3600", - -1, &stmt, 0)); - LOG("ClaimWorker started\n"); + CHECK_SQL(DbOpen("db.sqlite3", &db)); + CHECK_DB(DbPrepare(db, &stmt, + "INSERT INTO land (ip, nick, created)\n" + "VALUES (?1, ?2, ?3)\n" + "ON CONFLICT (ip) DO\n" + "UPDATE SET (nick, created) = (?2, ?3)\n" + " WHERE nick != ?2\n" + " OR created IS NULL\n" + " OR ?3 - created > 3600")); + if (!warmedup) { + OnlyRunOnCpu(0); + nsync_counter_add(g_ready, -1); // #7 + warmedup = true; + } while ((n = GetClaims(&g_claims, v, BATCH_MAX, nsync_time_no_deadline))) { CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); for (i = 0; i < n; ++i) { @@ -1210,6 +1513,7 @@ StartOver: } CHECK_SQL(sqlite3_exec(db, "COMMIT TRANSACTION", 0, 0, 0)); DEBUG("Committed %d claims\n", n); + // wake up RecentWorker() nsync_mu_lock(&g_recent.mu); nsync_cv_signal(&g_recent.cv); nsync_mu_unlock(&g_recent.mu); @@ -1223,16 +1527,18 @@ OnError: sqlite3_close(db); stmt = 0; db = 0; - usleep(100 * 1000); goto StartOver; } // single thread for computing HTTP Date header void *NowWorker(void *arg) { - nsync_time deadline; - OnlyRunOnCpu(0); + BlockSignals(); pthread_setname_np(pthread_self(), "NowWorker"); - for (deadline = _timespec_real();;) { + LOG("NowWorker started\n"); + UpdateNow(); + OnlyRunOnCpu(0); + nsync_counter_add(g_ready, -1); // #8 + for (nsync_time deadline = _timespec_real();;) { deadline = _timespec_add(deadline, _timespec_frommillis(DATE_UPDATE_MS)); if (!nsync_note_wait(g_shutdown, deadline)) { UpdateNow(); @@ -1240,6 +1546,7 @@ void *NowWorker(void *arg) { break; } } + LOG("NowWorker exiting\n"); return 0; } @@ -1249,7 +1556,8 @@ void *NowWorker(void *arg) { // in a while; (2) cancel clients who are sending lots of messages. void Meltdown(void) { int i, marks; - struct timespec now; + nsync_time now; + ++g_meltdowns; LOG("Panicking because %d out of %d workers is connected\n", g_connections, g_workers); now = _timespec_real(); @@ -1267,11 +1575,8 @@ void Meltdown(void) { // main thread worker void *Supervisor(void *arg) { - nsync_time deadline; - OnlyRunOnCpu(0); - pthread_setname_np(pthread_self(), "Supervisor"); - for (deadline = _timespec_real();;) { - deadline = _timespec_add(deadline, _timespec_frommillis(POLL_ASSETS_MS)); + for (nsync_time deadline = _timespec_real();;) { + deadline = _timespec_add(deadline, _timespec_frommillis(SUPERVISE_MS)); if (!nsync_note_wait(g_shutdown, deadline)) { if (g_workers > 1 && 1. / g_workers * g_connections > PANIC_LOAD) { Meltdown(); @@ -1289,26 +1594,53 @@ void *Supervisor(void *arg) { int main(int argc, char *argv[]) { // ShowCrashReports(); - GetOpts(argc, argv); + // user interface + GetOpts(argc, argv); + kprintf("\ + | _| \n\ + __| | | __| | \\ \\ \\ / _` | __|\n\ + | | | | __|\\ \\ \\ / ( | |\n\ +\\__|\\__,_|_| _| \\_/\\_/ \\__,_|_|\n"); + CHECK_EQ(0, chdir("/opt/turfwar")); + putenv("TMPDIR=/opt/turfwar/tmp"); + + // the power to serve + if (g_daemonize) { + if (fork() > 0) _Exit(0); + setsid(); + if (fork() > 0) _Exit(0); + umask(0); + if (closefrom(0)) + for (int i = 0; i < 256; ++i) // + close(i); + _npassert(0 == open(_PATH_DEVNULL, O_RDWR)); + _npassert(1 == dup(0)); + _npassert(2 == open("turfwar.log", O_CREAT | O_WRONLY | O_APPEND, 0644)); + } + + // library init __enable_threads(); sqlite3_initialize(); + + // server lifecycle locks + g_started = nsync_time_now(); g_shutdown = nsync_note_new(0, nsync_time_no_deadline); g_terminate = nsync_note_new(0, nsync_time_no_deadline); - CHECK_EQ(0, chdir("/opt/turfwar")); - putenv("TMPDIR=/opt/turfwar/tmp"); - g_asset.index = LoadAsset("index.html", "text/html; charset=utf-8"); - g_asset.about = LoadAsset("about.html", "text/html; charset=utf-8"); - g_asset.user = LoadAsset("user.html", "text/html; charset=utf-8"); - g_asset.favicon = LoadAsset("favicon.ico", "image/vnd.microsoft.icon"); + // load static assets into memory and pre-zip them + g_asset.index = LoadAsset("index.html", "text/html; charset=utf-8", 900); + g_asset.about = LoadAsset("about.html", "text/html; charset=utf-8", 900); + g_asset.user = LoadAsset("user.html", "text/html; charset=utf-8", 900); + g_asset.favicon = LoadAsset("favicon.ico", "image/vnd.microsoft.icon", 86400); + // sandbox ourselves + __pledge_mode = PLEDGE_PENALTY_RETURN_EPERM; CHECK_EQ(0, unveil("/opt/turfwar", "rwc")); CHECK_EQ(0, unveil(0, 0)); - __pledge_mode = PLEDGE_PENALTY_RETURN_EPERM; CHECK_EQ(0, pledge("stdio flock rpath wpath cpath inet", 0)); - // signal handling + // shutdown signals struct sigaction sa; sa.sa_flags = 0; sa.sa_handler = OnCtrlC; @@ -1319,38 +1651,33 @@ int main(int argc, char *argv[]) { sa.sa_handler = IgnoreSignal; sigaction(SIGUSR1, &sa, 0); - // create threads - pthread_t scorer; - CHECK_EQ(1, GenerateScore(&g_asset.score, -1)); + // make 8 helper threads + g_ready = nsync_counter_new(9); + pthread_t scorer, recenter, claimer, nower; + pthread_t scorer_hour, scorer_day, scorer_week, scorer_month; CHECK_EQ(0, pthread_create(&scorer, 0, ScoreWorker, 0)); - pthread_t scorer_hour; - CHECK_EQ(1, GenerateScore(&g_asset.score_hour, 60L * 60)); CHECK_EQ(0, pthread_create(&scorer_hour, 0, ScoreHourWorker, 0)); - pthread_t scorer_day; - CHECK_EQ(1, GenerateScore(&g_asset.score_day, 60L * 60 * 24)); CHECK_EQ(0, pthread_create(&scorer_day, 0, ScoreDayWorker, 0)); - pthread_t scorer_week; - CHECK_EQ(1, GenerateScore(&g_asset.score_week, 60L * 60 * 24 * 7)); CHECK_EQ(0, pthread_create(&scorer_week, 0, ScoreWeekWorker, 0)); - pthread_t scorer_month; - CHECK_EQ(1, GenerateScore(&g_asset.score_month, 60L * 60 * 24 * 30)); CHECK_EQ(0, pthread_create(&scorer_month, 0, ScoreMonthWorker, 0)); - pthread_t recentr; - CHECK_EQ(1, GenerateRecent(&g_asset.recent, -1)); - CHECK_EQ(0, pthread_create(&recentr, 0, RecentWorker, 0)); - pthread_t claimer; + CHECK_EQ(0, pthread_create(&recenter, 0, RecentWorker, 0)); CHECK_EQ(0, pthread_create(&claimer, 0, ClaimWorker, 0)); - pthread_t nower; - UpdateNow(); CHECK_EQ(0, pthread_create(&nower, 0, NowWorker, 0)); + + // wait for helper threads to warm up creating assets + if (nsync_counter_add(g_ready, -1)) { // #9 + nsync_counter_wait(g_ready, nsync_time_no_deadline); + } + + // create lots of http listeners to serve those assets + LOG("Online\n"); g_worker = _gc(xcalloc(g_workers, sizeof(*g_worker))); for (intptr_t i = 0; i < g_workers; ++i) { - LOG("Starting http worker #%d", i); CHECK_EQ(0, pthread_create(&g_worker[i].th, 0, HttpWorker, (void *)i)); } // time to serve - LOG("Hello\n"); + LOG("Ready\n"); Supervisor(0); // cancel accept and read for fast shutdown @@ -1365,13 +1692,13 @@ int main(int argc, char *argv[]) { CHECK_EQ(0, pthread_join(g_worker[i].th, 0)); } LOG("Waiting for helpers to finish...\n"); - CHECK_EQ(0, pthread_join(recentr, 0)); + CHECK_EQ(0, pthread_join(nower, 0)); CHECK_EQ(0, pthread_join(scorer, 0)); - CHECK_EQ(0, pthread_join(scorer_hour, 0)); + CHECK_EQ(0, pthread_join(recenter, 0)); CHECK_EQ(0, pthread_join(scorer_day, 0)); + CHECK_EQ(0, pthread_join(scorer_hour, 0)); CHECK_EQ(0, pthread_join(scorer_week, 0)); CHECK_EQ(0, pthread_join(scorer_month, 0)); - CHECK_EQ(0, pthread_join(nower, 0)); // wait for consumers to finish LOG("Waiting for queue to empty...\n"); @@ -1393,7 +1720,8 @@ int main(int argc, char *argv[]) { FreeAsset(&g_asset.favicon); nsync_note_free(g_terminate); nsync_note_free(g_shutdown); - // CheckForMemoryLeaks(); + nsync_counter_free(g_ready); LOG("Goodbye\n"); + // CheckForMemoryLeaks(); } diff --git a/test/libc/calls/write_test.c b/test/libc/calls/write_test.c index f6e7cb4a5..56cda5e29 100644 --- a/test/libc/calls/write_test.c +++ b/test/libc/calls/write_test.c @@ -20,13 +20,81 @@ #include "libc/calls/internal.h" #include "libc/calls/struct/iovec.h" #include "libc/calls/struct/iovec.internal.h" +#include "libc/calls/struct/rlimit.h" +#include "libc/calls/struct/sigaction.h" +#include "libc/calls/struct/sigset.h" #include "libc/calls/syscall-sysv.internal.h" +#include "libc/dce.h" +#include "libc/errno.h" #include "libc/sock/internal.h" #include "libc/sysv/consts/nr.h" #include "libc/sysv/consts/o.h" +#include "libc/sysv/consts/rlimit.h" +#include "libc/sysv/consts/sig.h" #include "libc/testlib/ezbench.h" +#include "libc/testlib/subprocess.h" #include "libc/testlib/testlib.h" +char testlib_enable_tmp_setup_teardown; + +TEST(write, notOpen_ebadf) { + ASSERT_SYS(EBADF, -1, write(-1, 0, 0)); + ASSERT_SYS(EBADF, -1, write(+3, 0, 0)); +} + +TEST(write, readOnlyFd_ebadf) { + ASSERT_SYS(0, 0, touch("foo", 0644)); + ASSERT_SYS(0, 3, open("foo", O_RDONLY)); + ASSERT_SYS(EBADF, -1, write(3, "x", 1)); + ASSERT_SYS(0, 0, close(3)); +} + +TEST(write, badMemory_efault) { + ASSERT_SYS(EFAULT, -1, write(1, 0, 1)); + if (!IsAsan()) return; + ASSERT_SYS(EFAULT, -1, write(1, (void *)1, 1)); +} + +TEST(write, brokenPipe_sigpipeIgnored_returnsEpipe) { + int fds[2]; + SPAWN(fork); + signal(SIGPIPE, SIG_IGN); + ASSERT_SYS(0, 0, pipe(fds)); + ASSERT_SYS(0, 1, write(4, "x", 1)); + ASSERT_SYS(0, 0, close(3)); + ASSERT_SYS(EPIPE, -1, write(4, "x", 1)); + ASSERT_SYS(0, 0, close(4)); + EXITS(0); +} + +TEST(write, brokenPipe_sigpipeBlocked_returnsEpipe) { + int fds[2]; + sigset_t mask; + SPAWN(fork); + signal(SIGPIPE, SIG_DFL); + sigemptyset(&mask); + sigaddset(&mask, SIGPIPE); + sigprocmask(SIG_BLOCK, &mask, 0); + ASSERT_SYS(0, 0, pipe(fds)); + ASSERT_SYS(0, 0, close(3)); + ASSERT_SYS(EPIPE, -1, write(4, "x", 1)); + ASSERT_SYS(0, 0, close(4)); + EXITS(0); +} + +TEST(write, rlimitFsizeExceeded_raisesEfbig) { + if (IsWindows()) return; // not supported + struct rlimit rl = {1, 10}; + SPAWN(fork); + signal(SIGXFSZ, SIG_IGN); + ASSERT_SYS(0, 0, setrlimit(RLIMIT_FSIZE, &rl)); + ASSERT_SYS(0, 3, creat("foo", 0644)); + ASSERT_SYS(0, 1, write(3, "x", 1)); + ASSERT_SYS(EFBIG, -1, write(3, "x", 1)); + ASSERT_SYS(0, 0, close(3)); + EXITS(0); +} + static long Write(long fd, const void *data, unsigned long size) { long ax, di, si, dx; asm volatile("syscall" diff --git a/test/libc/calls/writev_test.c b/test/libc/calls/writev_test.c index 04173f2e9..8b716dad1 100644 --- a/test/libc/calls/writev_test.c +++ b/test/libc/calls/writev_test.c @@ -20,12 +20,15 @@ #include "libc/calls/struct/iovec.h" #include "libc/dce.h" #include "libc/errno.h" +#include "libc/limits.h" #include "libc/macros.internal.h" -#include "libc/mem/mem.h" +#include "libc/mem/gc.h" #include "libc/mem/gc.internal.h" +#include "libc/mem/mem.h" #include "libc/runtime/runtime.h" #include "libc/sock/sock.h" #include "libc/sysv/consts/auxv.h" +#include "libc/sysv/consts/iov.h" #include "libc/sysv/consts/o.h" #include "libc/testlib/testlib.h" @@ -35,6 +38,32 @@ void SetUpOnce(void) { ASSERT_SYS(0, 0, pledge("stdio rpath wpath cpath fattr", 0)); } +TEST(writev, negativeFd_ebadf) { + ASSERT_SYS(EBADF, -1, writev(-1, 0, 0)); +} + +TEST(writev, negativeCount_einval) { + ASSERT_SYS(EINVAL, -1, writev(1, 0, -1)); +} + +TEST(writev, negative_einvalOrEfault) { + struct iovec v[] = {{"", -1}}; + ASSERT_EQ(-1, writev(1, v, 1)); + ASSERT_TRUE(errno == EINVAL || errno == EFAULT); + errno = 0; +} + +TEST(writev, exceedsIovMax_einval) { + if (IsWindows()) return; // it's complicated + int i, n = IOV_MAX + 1; + struct iovec *v = _gc(malloc(sizeof(struct iovec) * n)); + for (i = 0; i < n; ++i) { + v[i].iov_base = "x"; + v[i].iov_len = 1; + } + ASSERT_SYS(EINVAL, -1, writev(1, v, n)); +} + TEST(writev, test) { int fd; char ba[1] = "a"; diff --git a/test/libc/thread/pthread_barrier_wait_test.c b/test/libc/thread/pthread_barrier_wait_test.c index 2a826fb41..65b623d8e 100644 --- a/test/libc/thread/pthread_barrier_wait_test.c +++ b/test/libc/thread/pthread_barrier_wait_test.c @@ -20,8 +20,8 @@ #include "libc/atomic.h" #include "libc/errno.h" #include "libc/intrin/atomic.h" -#include "libc/mem/mem.h" #include "libc/mem/gc.internal.h" +#include "libc/mem/mem.h" #include "libc/testlib/testlib.h" #include "libc/thread/spawn.h" #include "libc/thread/thread.h"