Do some work on TurfWar

This commit is contained in:
Justine Tunney 2022-10-07 03:11:07 -07:00
parent bc353f454b
commit 05197afca2
No known key found for this signature in database
GPG key ID: BE714B4575D6E328
12 changed files with 499 additions and 181 deletions

View file

@ -29,6 +29,7 @@
#include "libc/errno.h"
#include "libc/fmt/conv.h"
#include "libc/fmt/itoa.h"
#include "libc/intrin/atomic.h"
#include "libc/intrin/bits.h"
#include "libc/intrin/kprintf.h"
#include "libc/intrin/strace.internal.h"
@ -86,24 +87,27 @@
* @fileoverview production webserver for turfwar online game
*/
#define PORT 8080 // default server listening port
#define WORKERS 1001 // size of http client thread pool
#define SUPERVISE_MS 1000 // how often to stat() asset files
#define KEEPALIVE_MS 60000 // max time to keep idle conn open
#define MELTALIVE_MS 2000 // panic keepalive under heavy load
#define DATE_UPDATE_MS 500 // how often to do tzdata crunching
#define SCORE_UPDATE_MS 90000 // how often to regenerate /score
#define SCORE_H_UPDATE_MS 10000 // how often to regenerate /score/hour
#define SCORE_D_UPDATE_MS 15000 // how often to regenerate /score/day
#define SCORE_W_UPDATE_MS 30000 // how often to regenerate /score/week
#define SCORE_M_UPDATE_MS 60000 // how often to regenerate /score/month
#define CLAIM_DEADLINE_MS 50 // how long /claim may block if queue is full
#define PANIC_LOAD .85 // meltdown if this percent of pool connected
#define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown
#define QUEUE_MAX 800 // maximum pending claim items in queue
#define BATCH_MAX 64 // max claims to insert per transaction
#define NICK_MAX 40 // max length of user nickname string
#define MSG_BUF 512 // small response lookaside
#define PORT 8080 // default server listening port
#define CPUS 256 // number of cpus to actually use
#define WORKERS 1000 // size of http client thread pool
#define SUPERVISE_MS 1000 // how often to stat() asset files
#define KEEPALIVE_MS 60000 // max time to keep idle conn open
#define MELTALIVE_MS 2000 // panic keepalive under heavy load
#define SCORE_UPDATE_MS 90000 // how often to regenerate /score
#define SCORE_H_UPDATE_MS 10000 // how often to regenerate /score/hour
#define SCORE_D_UPDATE_MS 15000 // how often to regenerate /score/day
#define SCORE_W_UPDATE_MS 30000 // how often to regenerate /score/week
#define SCORE_M_UPDATE_MS 60000 // how often to regenerate /score/month
#define ACCEPT_DEADLINE_MS 100 // how long accept() can take to find worker
#define CLAIM_DEADLINE_MS 100 // how long /claim may block if queue is full
#define CONCERN_LOAD .75 // avoid keepalive, upon this connection load
#define PANIC_LOAD .85 // meltdown if this percent of pool connected
#define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown
#define QUEUE_MAX 800 // maximum pending claim items in queue
#define BATCH_MAX 64 // max claims to insert per transaction
#define NICK_MAX 40 // max length of user nickname string
#define SOCK_MAX 100 // max length of socket queue
#define MSG_BUF 512 // small response lookaside
#define INBUF_SIZE PAGESIZE
#define OUTBUF_SIZE 8192
@ -229,11 +233,11 @@ int g_workers = WORKERS;
int g_keepalive = KEEPALIVE_MS;
// lifecycle vars
pthread_t g_listener;
nsync_time g_started;
nsync_counter g_ready;
nsync_note g_shutdown;
nsync_note g_terminate;
atomic_int g_connections;
nsync_note g_shutdown[3];
// whitebox metrics
atomic_long g_accepts;
@ -242,6 +246,7 @@ atomic_long g_proxied;
atomic_long g_messages;
atomic_long g_memfails;
atomic_long g_sysfails;
atomic_long g_rejected;
atomic_long g_unproxied;
atomic_long g_readfails;
atomic_long g_notfounds;
@ -257,8 +262,10 @@ atomic_long g_plainclaims;
atomic_long g_imageclaims;
atomic_long g_invalidnames;
atomic_long g_ipv6forwards;
atomic_long g_claimrequests;
atomic_long g_assetrequests;
atomic_long g_claimrequests;
atomic_long g_claimsenqueued;
atomic_long g_claimsprocessed;
atomic_long g_statuszrequests;
// http worker objects
@ -297,6 +304,20 @@ struct Assets {
struct Asset favicon;
} g_asset;
// queues ListenWorker() to HttpWorker()
// fixed-capacity ring buffer of accepted connections; AddClient() is
// the producer side and GetClient() is the consumer side
struct Clients {
int pos;  // index in data[] of the oldest queued entry (next one dequeued)
int count;  // number of entries currently queued
nsync_mu mu;  // held by AddClient() and GetClient() while touching pos/count/data
nsync_cv non_full;  // AddClient() waits on this while count == SOCK_MAX
nsync_cv non_empty;  // GetClient() waits on this while count == 0
struct Client {
int sock;  // file descriptor returned by accept()
uint32_t size;  // address length passed to / filled in by accept()
struct sockaddr_in addr;  // peer address filled in by accept()
} data[SOCK_MAX];
} g_clients;
// queues /claim to ClaimWorker()
struct Claims {
int pos;
@ -316,25 +337,30 @@ ssize_t Write(int fd, const char *s) {
return write(fd, s, strlen(s));
}
// turns relative timeout into an absolute timeout
struct timespec WaitFor(int millis) {
return _timespec_add(_timespec_real(), _timespec_frommillis(millis));
}
// helper functions for check macro implementation

// returns true if ptr is non-null; otherwise logs an out of memory
// message tagged with file:line plus strerror(errno), and returns false
bool CheckMem(const char *file, int line, void *ptr) {
  if (ptr) return true;
  // %P consumes no argument (presumably kprintf's process id
  // directive; confirm against the kprintf documentation)
  kprintf("%s:%d: %P: out of memory: %s\n", file, line, strerror(errno));
  return false;
}
// returns true unless rc is -1, in which case strerror(errno) is
// logged once, tagged with file:line, and false is returned
bool CheckSys(const char *file, int line, long rc) {
  if (rc != -1) return true;
  kprintf("%s:%d: %P: %s\n", file, line, strerror(errno));
  return false;
}
// returns true if rc is SQLITE_OK; otherwise logs the generic sqlite
// description of rc once, tagged with file:line, and returns false
bool CheckSql(const char *file, int line, int rc) {
  if (rc == SQLITE_OK) return true;
  kprintf("%s:%d: %P: %s\n", file, line, sqlite3_errstr(rc));
  return false;
}
// like CheckSql() but additionally logs the most recent error message
// recorded on the db connection handle
bool CheckDb(const char *file, int line, int rc, sqlite3 *db) {
  if (rc == SQLITE_OK) return true;
  kprintf("%s:%d: %P: %s: %s\n", file, line, sqlite3_errstr(rc),
          sqlite3_errmsg(db));
  return false;
}
@ -398,7 +424,7 @@ char *FormatUnixHttpDateTime(char *s, int64_t t) {
void UpdateNow(void) {
int64_t secs;
struct tm tm;
clock_gettime(CLOCK_REALTIME, &g_nowish.ts);
g_nowish.ts = _timespec_real();
secs = g_nowish.ts.tv_sec;
gmtime_r(&secs, &tm);
//!//!//!//!//!//!//!//!//!//!//!//!//!/
@ -420,6 +446,57 @@ char *FormatDate(char *p) {
return p;
}
// inserts accepted connection into blocking client queue
// blocks while the queue is full, until the absolute deadline `dead`
// passes or g_shutdown[0] is notified
// returns true if *v was enqueued, false if the queue was still full
bool AddClient(struct Clients *q, const struct Client *v, nsync_time dead) {
  bool was_empty = false;
  bool enqueued = false;
  nsync_mu_lock(&q->mu);
  for (;;) {
    if (q->count != ARRAYLEN(q->data)) break;  // room available
    if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead,
                                    g_shutdown[0])) {
      break;  // must be ETIMEDOUT or ECANCELED
    }
  }
  if (q->count != ARRAYLEN(q->data)) {
    // tail slot of the ring buffer, wrapping past the end of data[]
    int slot = q->pos + q->count;
    if (slot >= ARRAYLEN(q->data)) {
      slot -= ARRAYLEN(q->data);
    }
    q->data[slot] = *v;
    was_empty = !q->count;
    ++q->count;
    enqueued = true;
  }
  nsync_mu_unlock(&q->mu);
  // consumers can only be sleeping when the queue had been empty
  if (was_empty) {
    nsync_cv_broadcast(&q->non_empty);
  }
  return enqueued;
}
// removes one accepted connection from blocking client queue
// waits without deadline for an entry; the wait is cancelled once
// g_shutdown[1] is notified
// returns number of entries stored to out, which is 0 or 1
int GetClient(struct Clients *q, struct Client *out) {
  const int want = 1;
  int n = 0;
  nsync_mu_lock(&q->mu);
  for (;;) {
    if (q->count) break;  // something is queued
    if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu,
                                    nsync_time_no_deadline, g_shutdown[1])) {
      break;  // must be ECANCELED
    }
  }
  for (; n < want && q->count; ++n) {
    out[n] = q->data[q->pos];
    // producers can only be sleeping when the queue had been full
    if (q->count == ARRAYLEN(q->data)) {
      nsync_cv_broadcast(&q->non_full);
    }
    if (++q->pos == ARRAYLEN(q->data)) {
      q->pos = 0;  // wrap around the ring buffer
    }
    --q->count;
  }
  nsync_mu_unlock(&q->mu);
  return n;
}
// inserts ip:name claim into blocking message queue
// may be interrupted by absolute deadline
// may be cancelled by server shutdown
@ -428,7 +505,8 @@ bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) {
bool added = false;
nsync_mu_lock(&q->mu);
while (q->count == ARRAYLEN(q->data)) {
if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead, g_shutdown)) {
if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead,
g_shutdown[1])) {
break; // must be ETIMEDOUT or ECANCELED
}
}
@ -448,14 +526,14 @@ bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) {
}
// removes batch of ip:name claims from blocking message queue
// may be interrupted by absolute deadline
// may be cancelled by server termination
int GetClaims(struct Claims *q, struct Claim *out, int len, nsync_time dead) {
// has no deadline or cancellation; enqueued must be processed
int GetClaims(struct Claims *q, struct Claim *out, int len) {
int got = 0;
nsync_mu_lock(&q->mu);
while (!q->count) {
if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu, dead, g_terminate)) {
break; // must be ETIMEDOUT or ECANCELED
if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu,
nsync_time_no_deadline, g_shutdown[2])) {
break; // must be ECANCELED
}
}
while (got < len && q->count) {
@ -516,26 +594,30 @@ void FreeSafeBuffer(void *p) {
}
// pins the calling thread to a single cpu
// the requested index i is clamped to the last usable cpu, where the
// usable count is the smaller of CPUS and GetCpuCount()
// failure of pthread_setaffinity_np() is deliberately not checked
void OnlyRunOnCpu(int i) {
  int n;
  cpu_set_t cpus;
  _Static_assert(CPUS > 0, "");
  n = GetCpuCount();
  n = MIN(CPUS, n);
  i = MIN(i, n - 1);
  CPU_ZERO(&cpus);
  CPU_SET(i, &cpus);
  CHECK_NE(0, CPU_COUNT(&cpus));  // the set must name at least one cpu
  pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus);
}
// restricts the calling thread to cpus i through n-1, where n is the
// smaller of CPUS and GetCpuCount()
// i is clamped to n-1 so the last usable cpu is always selected
// failure of pthread_setaffinity_np() is deliberately not checked
void DontRunOnFirstCpus(int i) {
  int n;
  cpu_set_t cpus;
  _Static_assert(CPUS > 0, "");
  n = GetCpuCount();
  n = MIN(CPUS, n);
  i = MIN(i, n - 1);
  CPU_ZERO(&cpus);
  for (; i < n; ++i) {
    CPU_SET(i, &cpus);
  }
  pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus);
}
// signals by default get delivered to any random thread
@ -565,9 +647,9 @@ char *Statusz(char *p, const char *s, long x) {
// public /statusz endpoint for monitoring server internals
void ServeStatusz(int client, char *outbuf) {
char *p;
nsync_time now;
struct rusage ru;
now = nsync_time_now();
struct timespec now;
now = _timespec_real();
p = outbuf;
p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
@ -575,7 +657,7 @@ void ServeStatusz(int client, char *outbuf) {
"Connection: close\r\n"
"\r\n");
p = Statusz(p, "qps",
g_messages / MAX(1, nsync_time_sub(now, g_started).tv_sec));
g_messages / MAX(1, _timespec_sub(now, g_started).tv_sec));
p = Statusz(p, "started", g_started.tv_sec);
p = Statusz(p, "now", now.tv_sec);
p = Statusz(p, "connections", g_connections);
@ -586,6 +668,7 @@ void ServeStatusz(int client, char *outbuf) {
p = Statusz(p, "proxied", g_proxied);
p = Statusz(p, "memfails", g_memfails);
p = Statusz(p, "sysfails", g_sysfails);
p = Statusz(p, "rejected", g_rejected);
p = Statusz(p, "unproxied", g_unproxied);
p = Statusz(p, "readfails", g_readfails);
p = Statusz(p, "notfounds", g_notfounds);
@ -601,8 +684,10 @@ void ServeStatusz(int client, char *outbuf) {
p = Statusz(p, "imageclaims", g_imageclaims);
p = Statusz(p, "invalidnames", g_invalidnames);
p = Statusz(p, "ipv6forwards", g_ipv6forwards);
p = Statusz(p, "claimrequests", g_claimrequests);
p = Statusz(p, "assetrequests", g_assetrequests);
p = Statusz(p, "claimrequests", g_claimrequests);
p = Statusz(p, "claimsenqueued", g_claimsenqueued);
p = Statusz(p, "claimsprocessed", g_claimsprocessed);
p = Statusz(p, "statuszrequests", g_statuszrequests);
if (!getrusage(RUSAGE_SELF, &ru)) {
p = Statusz(p, "ru_utime.tv_sec", ru.ru_utime.tv_sec);
@ -627,56 +712,74 @@ void ServeStatusz(int client, char *outbuf) {
write(client, outbuf, p - outbuf);
}
// accepts connections on g_port and hands them to the HttpWorker()
// threads through the g_clients queue
// runs until g_shutdown[0] is notified; it then closes the listening
// socket and notifies g_shutdown[1]
// a client that can't be enqueued within ACCEPT_DEADLINE_MS is sent a
// 502 response and disconnected
void *ListenWorker(void *arg) {
  int server;
  int yes = 1;
  struct Client client;
  // g_keepalive is in milliseconds; tv_usec wants microseconds
  struct timeval timeo = {g_keepalive / 1000, (g_keepalive % 1000) * 1000};
  struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(g_port)};

  AllowSigusr1();
  OnlyRunOnCpu(0);
  pthread_setname_np(pthread_self(), "Listener");

  CHECK_NE(-1, (server = socket(AF_INET, SOCK_STREAM, 0)));
  setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo));
  setsockopt(server, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo));
  setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  setsockopt(server, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
  setsockopt(server, SOL_TCP, TCP_FASTOPEN, &yes, sizeof(yes));
  setsockopt(server, SOL_TCP, TCP_QUICKACK, &yes, sizeof(yes));
  setsockopt(server, SOL_TCP, TCP_NODELAY, &yes, sizeof(yes));
  CHECK_NE(-1, bind(server, &addr, sizeof(addr)));
  CHECK_NE(-1, listen(server, 1));

  while (!nsync_note_is_notified(g_shutdown[0])) {
    client.size = sizeof(client.addr);
    client.sock = accept(server, (struct sockaddr *)&client.addr, &client.size);
    if (client.sock == -1) {
      if (errno != EAGAIN) {  // spinning on SO_RCVTIMEO
        ++g_acceptfails;
      }
      continue;
    }
    if (!AddClient(&g_clients, &client, WaitFor(ACCEPT_DEADLINE_MS))) {
      ++g_rejected;
      LOG("502 Accept Queue Full\n");
      Write(client.sock, "HTTP/1.1 502 Accept Queue Full\r\n"
                         "Content-Type: text/plain\r\n"
                         "Connection: close\r\n"
                         "\r\n"
                         "Accept Queue Full\n");
      close(client.sock);
    }
  }

  close(server);
  nsync_note_notify(g_shutdown[1]);
  return 0;
}
// make thousands of http client handler threads
// load balance incoming connections for port 8080 across all threads
// hangup on any browser clients that lag for more than a few seconds
void *HttpWorker(void *arg) {
struct Client client;
int id = (intptr_t)arg;
char *msgbuf = _gc(xmalloc(MSG_BUF));
char *inbuf = NewSafeBuffer(INBUF_SIZE);
char *outbuf = NewSafeBuffer(OUTBUF_SIZE);
struct HttpMessage *msg = _gc(xmalloc(sizeof(struct HttpMessage)));
BlockSignals();
DontRunOnFirstCpus(1);
pthread_setname_np(pthread_self(), _gc(xasprintf("HTTP%d", id)));
// connection loop
while (GetClient(&g_clients, &client)) {
struct Data d;
struct Url url;
ssize_t got, sent;
uint32_t ip, clientip;
int inmsglen, outmsglen;
char ipbuf[32], *p, *q, cashbuf[64];
clientip = ntohl(client.addr.sin_addr.s_addr);
g_worker[id].connected = true;
g_worker[id].msgcount = 0;
++g_accepts;
@ -697,7 +800,7 @@ void *HttpWorker(void *arg) {
AllowSigusr1();
InitHttpMessage(msg, kHttpRequest);
g_worker[id].startread = _timespec_real();
if ((got = read(client, inbuf, INBUF_SIZE)) <= 0) {
if ((got = read(client.sock, inbuf, INBUF_SIZE)) <= 0) {
++g_readfails;
break;
}
@ -745,11 +848,11 @@ void *HttpWorker(void *arg) {
// we don't support http/1.0 and http/0.9 right now
if (msg->version != 11) {
LOG("%s used unsupported http/%d version\n", ipbuf, msg->version);
Write(client, "HTTP/1.1 505 HTTP Version Not Supported\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"\r\n"
"HTTP Version Not Supported\n");
Write(client.sock, "HTTP/1.1 505 HTTP Version Not Supported\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"\r\n"
"HTTP Version Not Supported\n");
++g_badversions;
break;
}
@ -764,7 +867,7 @@ void *HttpWorker(void *arg) {
// export monitoring data
if (UrlEqual("/statusz")) {
ServeStatusz(client, outbuf);
ServeStatusz(client.sock, outbuf);
++g_statuszrequests;
break;
}
@ -821,7 +924,7 @@ void *HttpWorker(void *arg) {
p = stpcpy(p, cashbuf);
p = stpcpy(p, "\r\n\r\n");
outmsglen = p - outbuf;
sent = write(client, outbuf, outmsglen);
sent = write(client.sock, outbuf, outmsglen);
} else {
p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS
"Vary: Accept-Encoding\r\n"
@ -845,7 +948,7 @@ void *HttpWorker(void *arg) {
iov[1].iov_base = d.p;
iov[1].iov_len = msg->method == kHttpHead ? 0 : d.n;
outmsglen = iov[0].iov_len + iov[1].iov_len;
sent = writev(client, iov, 2);
sent = writev(client.sock, iov, 2);
}
nsync_mu_runlock(&a->lock);
////////////////////////////////////////
@ -865,7 +968,7 @@ void *HttpWorker(void *arg) {
p = stpcpy(p, "\r\n\r\n");
p = stpcpy(p, ipbuf);
outmsglen = p - outbuf;
sent = write(client, outbuf, outmsglen);
sent = write(client.sock, outbuf, outmsglen);
} else {
Ipv6Warning:
DEBUG("%.*s via %s: 400 Need IPv4\n",
@ -885,7 +988,7 @@ void *HttpWorker(void *arg) {
p = stpcpy(p, "\r\n\r\n");
p = stpcpy(p, q);
outmsglen = p - outbuf;
sent = write(client, outbuf, p - outbuf);
sent = write(client.sock, outbuf, p - outbuf);
break;
}
@ -899,6 +1002,7 @@ void *HttpWorker(void *arg) {
&g_claims, &v,
_timespec_add(_timespec_real(),
_timespec_frommillis(CLAIM_DEADLINE_MS)))) {
++g_claimsenqueued;
DEBUG("%s claimed by %s\n", ipbuf, v.name);
if (HasHeader(kHttpAccept) &&
(HeaderHas(msg, inbuf, kHttpAccept, "image/*", 7) ||
@ -968,14 +1072,14 @@ void *HttpWorker(void *arg) {
p = stpcpy(p, "\r\n\r\n");
}
outmsglen = p - outbuf;
sent = write(client, outbuf, p - outbuf);
sent = write(client.sock, outbuf, p - outbuf);
} else {
LOG("%s: 502 Claims Queue Full\n", ipbuf);
Write(client, "HTTP/1.1 502 Claims Queue Full\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"\r\n"
"Claims Queue Full\n");
Write(client.sock, "HTTP/1.1 502 Claims Queue Full\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"\r\n"
"Claims Queue Full\n");
++g_queuefulls;
break;
}
@ -995,7 +1099,7 @@ void *HttpWorker(void *arg) {
p = stpcpy(p, "\r\n\r\n");
p = stpcpy(p, q);
outmsglen = p - outbuf;
sent = write(client, outbuf, p - outbuf);
sent = write(client.sock, outbuf, p - outbuf);
break;
}
@ -1017,22 +1121,24 @@ void *HttpWorker(void *arg) {
p = stpcpy(p, "\r\n\r\n");
p = stpcpy(p, q);
outmsglen = p - outbuf;
sent = write(client, outbuf, p - outbuf);
sent = write(client.sock, outbuf, p - outbuf);
}
// if the client isn't pipelining and write() wrote the full
// amount, then since we sent the content length and checked
// that the client didn't attach a payload, we are so synced
// thus we can safely process more messages
} while (got == inmsglen && //
sent == outmsglen && //
!HasHeader(kHttpContentLength) && //
!HasHeader(kHttpTransferEncoding) && //
(msg->method == kHttpGet || //
msg->method == kHttpHead) && //
!nsync_note_is_notified(g_shutdown));
} while (got == inmsglen && //
sent == outmsglen && //
!HasHeader(kHttpContentLength) && //
!HasHeader(kHttpTransferEncoding) && //
!HeaderEqualCase(kHttpConnection, "close") && //
(msg->method == kHttpGet || //
msg->method == kHttpHead) && //
1. / g_workers * g_connections < CONCERN_LOAD && //
!nsync_note_is_notified(g_shutdown[1]));
DestroyHttpMessage(msg);
close(client);
close(client.sock);
g_worker[id].connected = false;
--g_connections;
}
@ -1041,7 +1147,6 @@ void *HttpWorker(void *arg) {
g_worker[id].shutdown = true;
FreeSafeBuffer(outbuf);
FreeSafeBuffer(inbuf);
close(server);
return 0;
}
@ -1148,17 +1253,18 @@ void IgnoreSignal(int sig) {
// asynchronous handler of sigint, sigterm, and sighup signals
// this handler is always invoked from within the main thread,
// because our helper and worker threads block always signals.
// because our helper and worker threads always block signals.
void OnCtrlC(int sig) {
if (!nsync_note_is_notified(g_shutdown)) {
if (!nsync_note_is_notified(g_shutdown[0])) {
LOG("Received %s shutting down...\n", strsignal(sig));
nsync_note_notify(g_shutdown);
nsync_note_notify(g_shutdown[0]);
} else {
// there's no way to deliver signals to workers atomically, unless
// we pay the cost of ppoll() which isn't necessary in this design
// so if a user smashes that ctrl-c then we tkill the workers more
LOG("Received %s again so sending another volley...\n", strsignal(sig));
for (int i = 0; i < g_workers; ++i) {
tkill(pthread_getunique_np(g_listener), SIGUSR1);
if (!g_worker[i].shutdown) {
tkill(pthread_getunique_np(g_worker[i].th), SIGUSR1);
}
@ -1233,7 +1339,7 @@ bool GenerateScore(struct Asset *out, long secs, long cash) {
DEBUG("GenerateScore %ld\n", secs);
a.type = "application/json";
a.cash = cash;
CHECK_SYS(clock_gettime(CLOCK_REALTIME, &a.mtim));
a.mtim = _timespec_real();
FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec);
CHECK_SYS(appends(&a.data.p, "{\n"));
CHECK_SYS(appendf(&a.data.p, "\"now\":[%ld,%ld],\n", a.mtim.tv_sec,
@ -1258,7 +1364,7 @@ bool GenerateScore(struct Asset *out, long secs, long cash) {
// otherwise.. you can use --strace to see the fcntl bloodbath
CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0));
while ((rc = sqlite3_step(stmt)) != SQLITE_DONE) {
if (rc != SQLITE_ROW) CHECK_SQL(rc);
if (rc != SQLITE_ROW) CHECK_DB(rc);
strlcpy(name2, (void *)sqlite3_column_text(stmt, 0), sizeof(name2));
if (!IsValidNick(name2, -1)) continue;
if (strcmp(name1, name2)) {
@ -1297,16 +1403,14 @@ OnError:
// thread that regenerates the g_asset.score asset
// generates it once before decrementing g_ready, then regenerates it
// and sleeps SCORE_UPDATE_MS per iteration until g_shutdown[1] is
// notified
void *ScoreWorker(void *arg) {
  BlockSignals();
  pthread_setname_np(pthread_self(), "ScoreAll");
  LOG("%P Score started\n");
  long wait = SCORE_UPDATE_MS;
  Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait));
  nsync_counter_add(g_ready, -1);  // #1
  OnlyRunOnCpu(0);
  do {
    Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait));
  } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait)));
  LOG("Score exiting\n");
  return 0;
}
@ -1315,17 +1419,15 @@ void *ScoreWorker(void *arg) {
// thread that regenerates the g_asset.score_hour asset, passing a
// window of one hour (in seconds) to GenerateScore
// generates it once before decrementing g_ready, then regenerates it
// and sleeps SCORE_H_UPDATE_MS per iteration until g_shutdown[1] is
// notified
void *ScoreHourWorker(void *arg) {
  BlockSignals();
  pthread_setname_np(pthread_self(), "ScoreHour");
  LOG("%P ScoreHour started\n");
  long secs = 60L * 60;
  long wait = SCORE_H_UPDATE_MS;
  Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait));
  nsync_counter_add(g_ready, -1);  // #2
  OnlyRunOnCpu(0);
  do {
    Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait));
  } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait)));
  LOG("ScoreHour exiting\n");
  return 0;
}
@ -1334,17 +1436,15 @@ void *ScoreHourWorker(void *arg) {
// thread that regenerates the g_asset.score_day asset, passing a
// window of one day (in seconds) to GenerateScore
// generates it once before decrementing g_ready, then regenerates it
// and sleeps SCORE_D_UPDATE_MS per iteration until g_shutdown[1] is
// notified
void *ScoreDayWorker(void *arg) {
  BlockSignals();
  pthread_setname_np(pthread_self(), "ScoreDay");
  LOG("%P ScoreDay started\n");
  long secs = 60L * 60 * 24;
  long wait = SCORE_D_UPDATE_MS;
  Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait));
  nsync_counter_add(g_ready, -1);  // #3
  OnlyRunOnCpu(0);
  do {
    Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait));
  } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait)));
  LOG("ScoreDay exiting\n");
  return 0;
}
@ -1353,17 +1453,15 @@ void *ScoreDayWorker(void *arg) {
// thread that regenerates the g_asset.score_week asset, passing a
// window of seven days (in seconds) to GenerateScore
// generates it once before decrementing g_ready, then regenerates it
// and sleeps SCORE_W_UPDATE_MS per iteration until g_shutdown[1] is
// notified
void *ScoreWeekWorker(void *arg) {
  BlockSignals();
  pthread_setname_np(pthread_self(), "ScoreWeek");
  LOG("%P ScoreWeek started\n");
  long secs = 60L * 60 * 24 * 7;
  long wait = SCORE_W_UPDATE_MS;
  Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait));
  nsync_counter_add(g_ready, -1);  // #4
  OnlyRunOnCpu(0);
  do {
    Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait));
  } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait)));
  LOG("ScoreWeek exiting\n");
  return 0;
}
@ -1372,17 +1470,15 @@ void *ScoreWeekWorker(void *arg) {
// thread that regenerates the g_asset.score_month asset, passing a
// window of thirty days (in seconds) to GenerateScore
// generates it once before decrementing g_ready, then regenerates it
// and sleeps SCORE_M_UPDATE_MS per iteration until g_shutdown[1] is
// notified
void *ScoreMonthWorker(void *arg) {
  BlockSignals();
  pthread_setname_np(pthread_self(), "ScoreMonth");
  LOG("%P ScoreMonth started\n");
  long secs = 60L * 60 * 24 * 30;
  long wait = SCORE_M_UPDATE_MS;
  Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait));
  nsync_counter_add(g_ready, -1);  // #5
  OnlyRunOnCpu(0);
  do {
    Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait));
  } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait)));
  LOG("ScoreMonth exiting\n");
  return 0;
}
@ -1400,7 +1496,7 @@ void *RecentWorker(void *arg) {
bool warmedup = false;
BlockSignals();
pthread_setname_np(pthread_self(), "RecentWorker");
LOG("RecentWorker started\n");
LOG("%P RecentWorker started\n");
StartOver:
db = 0;
stmt = 0;
@ -1414,7 +1510,7 @@ StartOver:
"LIMIT 50"));
do {
// regenerate json
CHECK_SYS(clock_gettime(CLOCK_REALTIME, &t.mtim));
t.mtim = _timespec_real();
FormatUnixHttpDateTime(t.lastmodified, t.mtim.tv_sec);
CHECK_SYS(appends(&t.data.p, "{\n"));
CHECK_SYS(appendf(&t.data.p, "\"now\":[%ld,%ld],\n", t.mtim.tv_sec,
@ -1461,7 +1557,7 @@ StartOver:
// wait for wakeup or cancel
nsync_mu_lock(&g_recent.mu);
err = nsync_cv_wait_with_deadline(&g_recent.cv, &g_recent.mu,
nsync_time_no_deadline, g_shutdown);
nsync_time_no_deadline, g_shutdown[1]);
nsync_mu_unlock(&g_recent.mu);
} while (err != ECANCELED);
CHECK_DB(sqlite3_finalize(stmt));
@ -1482,12 +1578,13 @@ OnError:
void *ClaimWorker(void *arg) {
sqlite3 *db;
int i, n, rc;
long processed;
sqlite3_stmt *stmt;
bool warmedup = false;
struct Claim *v = _gc(xcalloc(BATCH_MAX, sizeof(struct Claim)));
BlockSignals();
pthread_setname_np(pthread_self(), "ClaimWorker");
LOG("ClaimWorker started\n");
LOG("%P ClaimWorker started\n");
StartOver:
db = 0;
stmt = 0;
@ -1505,7 +1602,8 @@ StartOver:
nsync_counter_add(g_ready, -1); // #7
warmedup = true;
}
while ((n = GetClaims(&g_claims, v, BATCH_MAX, nsync_time_no_deadline))) {
while ((n = GetClaims(&g_claims, v, BATCH_MAX))) {
processed = 0;
CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0));
for (i = 0; i < n; ++i) {
CHECK_DB(sqlite3_bind_int64(stmt, 1, v[i].ip));
@ -1514,8 +1612,10 @@ StartOver:
CHECK_DB(sqlite3_bind_int64(stmt, 3, v[i].created));
CHECK_DB((rc = sqlite3_step(stmt)) == SQLITE_DONE ? SQLITE_OK : rc);
CHECK_DB(sqlite3_reset(stmt));
++processed;
}
CHECK_SQL(sqlite3_exec(db, "COMMIT TRANSACTION", 0, 0, 0));
atomic_fetch_add(&g_claimsprocessed, processed);
DEBUG("Committed %d claims\n", n);
// wake up RecentWorker()
nsync_mu_lock(&g_recent.mu);
@ -1536,13 +1636,12 @@ OnError:
void *NowWorker(void *arg) {
BlockSignals();
pthread_setname_np(pthread_self(), "NowWorker");
LOG("NowWorker started\n");
LOG("%P NowWorker started\n");
UpdateNow();
OnlyRunOnCpu(0);
nsync_counter_add(g_ready, -1); // #8
for (nsync_time deadline = _timespec_real();;) {
deadline = _timespec_add(deadline, _timespec_frommillis(DATE_UPDATE_MS));
if (!nsync_note_wait(g_shutdown, deadline)) {
for (struct timespec ts = {_timespec_real().tv_sec};; ++ts.tv_sec) {
if (!nsync_note_wait(g_shutdown[1], ts)) {
UpdateNow();
} else {
break;
@ -1558,7 +1657,7 @@ void *NowWorker(void *arg) {
// in a while; (2) cancel clients who are sending lots of messages.
void Meltdown(void) {
int i, marks;
nsync_time now;
struct timespec now;
++g_meltdowns;
LOG("Panicking because %d out of %d workers is connected\n", g_connections,
g_workers);
@ -1577,9 +1676,8 @@ void Meltdown(void) {
// main thread worker
void *Supervisor(void *arg) {
for (nsync_time deadline = _timespec_real();;) {
deadline = _timespec_add(deadline, _timespec_frommillis(SUPERVISE_MS));
if (!nsync_note_wait(g_shutdown, deadline)) {
for (;;) {
if (!nsync_note_wait(g_shutdown[0], WaitFor(SUPERVISE_MS))) {
if (g_workers > 1 && 1. / g_workers * g_connections > PANIC_LOAD) {
Meltdown();
}
@ -1626,9 +1724,10 @@ int main(int argc, char *argv[]) {
sqlite3_initialize();
// server lifecycle locks
g_started = nsync_time_now();
g_shutdown = nsync_note_new(0, nsync_time_no_deadline);
g_terminate = nsync_note_new(0, nsync_time_no_deadline);
g_started = _timespec_real();
for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) {
g_shutdown[i] = nsync_note_new(0, nsync_time_no_deadline);
}
// load static assets into memory and pre-zip them
g_asset.index = LoadAsset("index.html", "text/html; charset=utf-8", 900);
@ -1640,7 +1739,10 @@ int main(int argc, char *argv[]) {
__pledge_mode = PLEDGE_PENALTY_RETURN_EPERM;
CHECK_EQ(0, unveil("/opt/turfwar", "rwc"));
CHECK_EQ(0, unveil(0, 0));
CHECK_EQ(0, pledge("stdio flock rpath wpath cpath inet", 0));
if (!IsOpenbsd()) {
// TODO(jart): why isn't pledge working on openbsd?
CHECK_EQ(0, pledge("stdio flock rpath wpath cpath inet", 0));
}
// shutdown signals
struct sigaction sa;
@ -1671,7 +1773,10 @@ int main(int argc, char *argv[]) {
nsync_counter_wait(g_ready, nsync_time_no_deadline);
}
// create lots of http listeners to serve those assets
// create one thread to listen
CHECK_EQ(0, pthread_create(&g_listener, 0, ListenWorker, 0));
// create lots of http workers to serve those assets
LOG("Online\n");
g_worker = _gc(xcalloc(g_workers, sizeof(*g_worker)));
for (intptr_t i = 0; i < g_workers; ++i) {
@ -1682,7 +1787,12 @@ int main(int argc, char *argv[]) {
LOG("Ready\n");
Supervisor(0);
// cancel accept and read for fast shutdown
// cancel listen() so we stop accepting new clients
LOG("Interrupting listen...\n");
tkill(pthread_getunique_np(g_listener), SIGUSR1);
CHECK_EQ(0, pthread_join(g_listener, 0));
// cancel read() so that keepalive clients finish faster
LOG("Interrupting workers...\n");
for (int i = 0; i < g_workers; ++i) {
tkill(pthread_getunique_np(g_worker[i].th), SIGUSR1);
@ -1702,11 +1812,16 @@ int main(int argc, char *argv[]) {
CHECK_EQ(0, pthread_join(scorer_week, 0));
CHECK_EQ(0, pthread_join(scorer_month, 0));
// wait for consumers to finish
LOG("Waiting for queue to empty...\n");
nsync_note_notify(g_terminate);
CHECK_EQ(0, pthread_join(claimer, 0));
// now that all workers have terminated, the claims queue must be
// empty, therefore, it is now safe to send a cancellation to the
// claims worker thread which waits forever for new claims.
CHECK_EQ(0, g_claims.count);
LOG("waiting for claims worker...\n");
nsync_note_notify(g_shutdown[2]);
CHECK_EQ(0, pthread_join(claimer, 0));
// perform some sanity checks
CHECK_EQ(g_claimsprocessed, g_claimsenqueued);
// free memory
LOG("Freeing memory...\n");
@ -1720,8 +1835,9 @@ int main(int argc, char *argv[]) {
FreeAsset(&g_asset.score_month);
FreeAsset(&g_asset.recent);
FreeAsset(&g_asset.favicon);
nsync_note_free(g_terminate);
nsync_note_free(g_shutdown);
for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) {
nsync_note_free(g_shutdown[i]);
}
nsync_counter_free(g_ready);
LOG("Goodbye\n");