Further enhance IPv4 TurfWar server

- We now vary `/claim` responses based on the `Accept` header
- Remove xasprintf() call to boost `/claim` perf to 321k qps!
This commit is contained in:
Justine Tunney 2022-10-04 05:23:23 -07:00
parent 556697cbd8
commit 726f04e8aa
No known key found for this signature in database
GPG key ID: BE714B4575D6E328

View file

@ -86,13 +86,18 @@
#define MELTALIVE_MS 2000 // panic keepalive under heavy load #define MELTALIVE_MS 2000 // panic keepalive under heavy load
#define POLL_ASSETS_MS 1000 // how often to stat() asset files #define POLL_ASSETS_MS 1000 // how often to stat() asset files
#define DATE_UPDATE_MS 500 // how often to do tzdata crunching #define DATE_UPDATE_MS 500 // how often to do tzdata crunching
#define SCORE_UPDATE_MS 15000 // how often to regeenrate /score json #define SCORE_UPDATE_MS 60000 // how often to regeenrate /score json
#define SCORE_H_UPDATE_MS 15000 // how often to regeenrate /score json
#define SCORE_D_UPDATE_MS 15000 // how often to regeenrate /score json
#define SCORE_W_UPDATE_MS 15000 // how often to regeenrate /score json
#define SCORE_M_UPDATE_MS 15000 // how often to regeenrate /score json
#define CLAIM_DEADLINE_MS 100 // how long /claim may block if queue is full #define CLAIM_DEADLINE_MS 100 // how long /claim may block if queue is full
#define PANIC_LOAD .85 // meltdown if this percent of pool connected #define PANIC_LOAD .85 // meltdown if this percent of pool connected
#define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown #define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown
#define QUEUE_MAX 800 // maximum pending claim items in queue #define QUEUE_MAX 800 // maximum pending claim items in queue
#define BATCH_MAX 64 // max claims to insert per transaction #define BATCH_MAX 64 // max claims to insert per transaction
#define NICK_MAX 40 // max length of user nickname string #define NICK_MAX 40 // max length of user nickname string
#define MSG_BUF 512 // small response lookaside
#define INBUF_SIZE PAGESIZE #define INBUF_SIZE PAGESIZE
#define OUTBUF_SIZE PAGESIZE #define OUTBUF_SIZE PAGESIZE
@ -138,6 +143,12 @@ Usage: turfwar.com [-dv] ARGS...\n\
#define DEBUG(...) (void)0 #define DEBUG(...) (void)0
#endif #endif
#define CHECK_MEM(x) \
do { \
if (!CheckMem(__FILE__, __LINE__, x)) { \
goto OnError; \
} \
} while (0)
#define CHECK_SYS(x) \ #define CHECK_SYS(x) \
do { \ do { \
if (!CheckSys(__FILE__, __LINE__, x)) { \ if (!CheckSys(__FILE__, __LINE__, x)) { \
@ -174,6 +185,11 @@ static const uint8_t kGzipHeader[] = {
kZipOsUnix, // OS kZipOsUnix, // OS
}; };
static const char kPixel[43] =
"\x47\x49\x46\x38\x39\x61\x01\x00\x01\x00\x80\x00\x00\xff\xff\xff"
"\x00\x00\x00\x21\xf9\x04\x01\x00\x00\x00\x00\x2c\x00\x00\x00\x00"
"\x01\x00\x01\x00\x00\x02\x02\x44\x01\x00\x3b";
struct Data { struct Data {
char *p; char *p;
size_t n; size_t n;
@ -202,6 +218,7 @@ atomic_int g_connections;
struct Worker { struct Worker {
pthread_t th; pthread_t th;
atomic_int msgcount; atomic_int msgcount;
atomic_bool shutdown;
atomic_bool connected; atomic_bool connected;
struct timespec startread; struct timespec startread;
} * g_worker; } * g_worker;
@ -222,6 +239,10 @@ struct Assets {
struct Asset about; struct Asset about;
struct Asset user; struct Asset user;
struct Asset score; struct Asset score;
struct Asset score_hour;
struct Asset score_day;
struct Asset score_week;
struct Asset score_month;
struct Asset recent; struct Asset recent;
struct Asset favicon; struct Asset favicon;
} g_asset; } g_asset;
@ -239,6 +260,12 @@ struct Claims {
} data[QUEUE_MAX]; } data[QUEUE_MAX];
} g_claims; } g_claims;
bool CheckMem(const char *file, int line, void *ptr) {
if (ptr) return true;
kprintf("%s:%d: out of memory: %s\n", file, line, strerror(errno));
return false;
}
bool CheckSys(const char *file, int line, long rc) { bool CheckSys(const char *file, int line, long rc) {
if (rc != -1) return true; if (rc != -1) return true;
kprintf("%s:%d: %s\n", file, line, strerror(errno)); kprintf("%s:%d: %s\n", file, line, strerror(errno));
@ -427,6 +454,7 @@ void *HttpWorker(void *arg) {
char name[16]; char name[16];
sigset_t mask; sigset_t mask;
int id = (intptr_t)arg; int id = (intptr_t)arg;
char *msgbuf = _gc(malloc(MSG_BUF));
char *inbuf = NewSafeBuffer(INBUF_SIZE); char *inbuf = NewSafeBuffer(INBUF_SIZE);
char *outbuf = NewSafeBuffer(OUTBUF_SIZE); char *outbuf = NewSafeBuffer(OUTBUF_SIZE);
struct timeval timeo = {g_keepalive / 1000, g_keepalive % 1000}; struct timeval timeo = {g_keepalive / 1000, g_keepalive % 1000};
@ -504,6 +532,14 @@ void *HttpWorker(void *arg) {
a = &g_asset.about; a = &g_asset.about;
} else if (UrlStartsWith("/user.html")) { } else if (UrlStartsWith("/user.html")) {
a = &g_asset.user; a = &g_asset.user;
} else if (UrlStartsWith("/score/hour")) {
a = &g_asset.score_hour;
} else if (UrlStartsWith("/score/day")) {
a = &g_asset.score_day;
} else if (UrlStartsWith("/score/week")) {
a = &g_asset.score_week;
} else if (UrlStartsWith("/score/month")) {
a = &g_asset.score_month;
} else if (UrlStartsWith("/score")) { } else if (UrlStartsWith("/score")) {
a = &g_asset.score; a = &g_asset.score;
} else if (UrlStartsWith("/recent")) { } else if (UrlStartsWith("/recent")) {
@ -545,6 +581,7 @@ void *HttpWorker(void *arg) {
} else if (UrlStartsWith("/ip")) { } else if (UrlStartsWith("/ip")) {
if (!ipv6) { if (!ipv6) {
p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS
"Vary: Accept\r\n"
"Content-Type: text/plain\r\n" "Content-Type: text/plain\r\n"
"Cache-Control: max-age=3600, private\r\n" "Cache-Control: max-age=3600, private\r\n"
"Date: "); "Date: ");
@ -563,6 +600,7 @@ void *HttpWorker(void *arg) {
q = "IPv4 Games only supports IPv4 right now"; q = "IPv4 Games only supports IPv4 right now";
p = stpcpy(outbuf, p = stpcpy(outbuf,
"HTTP/1.1 400 Need IPv4\r\n" STANDARD_RESPONSE_HEADERS "HTTP/1.1 400 Need IPv4\r\n" STANDARD_RESPONSE_HEADERS
"Vary: Accept\r\n"
"Content-Type: text/plain\r\n" "Content-Type: text/plain\r\n"
"Cache-Control: private\r\n" "Cache-Control: private\r\n"
"Connection: close\r\n" "Connection: close\r\n"
@ -586,26 +624,71 @@ void *HttpWorker(void *arg) {
_timespec_add(_timespec_real(), _timespec_add(_timespec_real(),
_timespec_frommillis(CLAIM_DEADLINE_MS)))) { _timespec_frommillis(CLAIM_DEADLINE_MS)))) {
LOG("%s claimed by %s\n", ipbuf, v.name); LOG("%s claimed by %s\n", ipbuf, v.name);
q = xasprintf("<!doctype html>\n" if (HasHeader(kHttpAccept) &&
"<title>The land at %s was claimed for %s.</title>\n" (HeaderHas(msg, inbuf, kHttpAccept, "image/*", 7) ||
"<meta name=\"viewport\" " HeaderHas(msg, inbuf, kHttpAccept, "image/gif", 9))) {
"content=\"width=device-width, initial-scale=1\">\n" p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS
"The land at %s was claimed for <a " "Vary: Accept\r\n"
"href=\"/user.html?name=%s\">%s</a>.\n" "Cache-Control: private\r\n"
"<p>\n<a href=/>Back to homepage</a>\n", "Content-Type: image/gif\r\n"
ipbuf, v.name, ipbuf, v.name, v.name); "Date: ");
p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS p = FormatDate(p);
"Content-Type: text/html\r\n" p = stpcpy(p, "\r\nContent-Length: ");
"Cache-Control: private\r\n" p = FormatInt32(p, sizeof(kPixel));
"Date: "); p = stpcpy(p, "\r\n\r\n");
p = FormatDate(p); p = mempcpy(p, kPixel, sizeof(kPixel));
p = stpcpy(p, "\r\nContent-Length: "); } else if (HasHeader(kHttpAccept) &&
p = FormatInt32(p, strlen(q)); HeaderHas(msg, inbuf, kHttpAccept, "text/plain", 10) &&
p = stpcpy(p, "\r\n\r\n"); !HeaderHas(msg, inbuf, kHttpAccept, "text/html", 9)) {
p = stpcpy(p, q); ksnprintf(msgbuf, MSG_BUF, "The land at %s was claimed for %s\n",
ipbuf, v.name);
q = msgbuf;
p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS
"Vary: Accept\r\n"
"Cache-Control: private\r\n"
"Content-Type: text/plain\r\n"
"Date: ");
p = FormatDate(p);
p = stpcpy(p, "\r\nContent-Length: ");
p = FormatInt32(p, strlen(q));
p = stpcpy(p, "\r\n\r\n");
p = stpcpy(p, q);
} else if (!HasHeader(kHttpAccept) ||
(HeaderHas(msg, inbuf, kHttpAccept, "text/html", 9) ||
HeaderHas(msg, inbuf, kHttpAccept, "text/*", 6) ||
HeaderHas(msg, inbuf, kHttpAccept, "*/*", 3))) {
ksnprintf(msgbuf, MSG_BUF,
"<!doctype html>\n"
"<title>The land at %s was claimed for %s.</title>\n"
"<meta name=\"viewport\" "
"content=\"width=device-width, initial-scale=1\">\n"
"The land at %s was claimed for <a "
"href=\"/user.html?name=%s\">%s</a>.\n"
"<p>\n<a href=/>Back to homepage</a>\n",
ipbuf, v.name, ipbuf, v.name, v.name);
q = msgbuf;
p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS
"Vary: Accept\r\n"
"Cache-Control: private\r\n"
"Content-Type: text/html\r\n"
"Date: ");
p = FormatDate(p);
p = stpcpy(p, "\r\nContent-Length: ");
p = FormatInt32(p, strlen(q));
p = stpcpy(p, "\r\n\r\n");
p = stpcpy(p, q);
} else {
p = stpcpy(outbuf,
"HTTP/1.1 204 No Content\r\n" STANDARD_RESPONSE_HEADERS
"Vary: Accept\r\n"
"Cache-Control: private\r\n"
"Content-Length: 0\r\n"
"Date: ");
p = FormatDate(p);
p = stpcpy(p, "\r\n\r\n");
}
outmsglen = p - outbuf; outmsglen = p - outbuf;
sent = write(client, outbuf, p - outbuf); sent = write(client, outbuf, p - outbuf);
free(q);
} else { } else {
LOG("%s: 502 Claims Queue Full\n", ipbuf); LOG("%s: 502 Claims Queue Full\n", ipbuf);
q = "Claims Queue Full"; q = "Claims Queue Full";
@ -680,6 +763,7 @@ void *HttpWorker(void *arg) {
} }
LOG("HttpWorker #%d exiting", id); LOG("HttpWorker #%d exiting", id);
g_worker[id].shutdown = true;
FreeSafeBuffer(outbuf); FreeSafeBuffer(outbuf);
FreeSafeBuffer(inbuf); FreeSafeBuffer(inbuf);
close(server); close(server);
@ -691,21 +775,29 @@ struct Data Gzip(struct Data data) {
void *tmp; void *tmp;
uint32_t crc; uint32_t crc;
char footer[8]; char footer[8];
struct Data res;
z_stream zs = {0}; z_stream zs = {0};
struct Data res = {0};
crc = crc32_z(0, data.p, data.n); crc = crc32_z(0, data.p, data.n);
WRITE32LE(footer + 0, crc); WRITE32LE(footer + 0, crc);
WRITE32LE(footer + 4, data.n); WRITE32LE(footer + 4, data.n);
CHECK_EQ(Z_OK, deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, if (Z_OK != deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -MAX_WBITS,
-MAX_WBITS, DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY)); DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY)) {
return (struct Data){0};
}
zs.next_in = (const Bytef *)data.p; zs.next_in = (const Bytef *)data.p;
zs.avail_in = data.n; zs.avail_in = data.n;
zs.avail_out = compressBound(data.n); zs.avail_out = compressBound(data.n);
zs.next_out = tmp = xmalloc(zs.avail_out); if (!(zs.next_out = tmp = malloc(zs.avail_out))) {
deflateEnd(&zs);
return (struct Data){0};
}
CHECK_EQ(Z_STREAM_END, deflate(&zs, Z_FINISH)); CHECK_EQ(Z_STREAM_END, deflate(&zs, Z_FINISH));
CHECK_EQ(Z_OK, deflateEnd(&zs)); CHECK_EQ(Z_OK, deflateEnd(&zs));
res.n = sizeof(kGzipHeader) + zs.total_out + sizeof(footer); res.n = sizeof(kGzipHeader) + zs.total_out + sizeof(footer);
p = res.p = xmalloc(res.n); if (!(p = res.p = malloc(res.n))) {
free(tmp);
return (struct Data){0};
}
p = mempcpy(p, kGzipHeader, sizeof(kGzipHeader)); p = mempcpy(p, kGzipHeader, sizeof(kGzipHeader));
p = mempcpy(p, tmp, zs.total_out); p = mempcpy(p, tmp, zs.total_out);
p = mempcpy(p, footer, sizeof(footer)); p = mempcpy(p, footer, sizeof(footer));
@ -720,9 +812,9 @@ struct Asset LoadAsset(const char *path, const char *type) {
CHECK_NOTNULL((a.data.p = xslurp(path, &a.data.n))); CHECK_NOTNULL((a.data.p = xslurp(path, &a.data.n)));
a.type = type; a.type = type;
a.cache = "max-age=3600, must-revalidate"; a.cache = "max-age=3600, must-revalidate";
a.path = xstrdup(path); CHECK_NOTNULL((a.path = strdup(path)));
a.mtim = st.st_mtim; a.mtim = st.st_mtim;
a.gzip = Gzip(a.data); CHECK_NOTNULL((a.gzip = Gzip(a.data)).p);
FormatUnixHttpDateTime(a.lastmod, a.mtim.tv_sec); FormatUnixHttpDateTime(a.lastmod, a.mtim.tv_sec);
return a; return a;
} }
@ -737,12 +829,13 @@ bool ReloadAsset(struct Asset *a) {
struct Data gzip = {0}; struct Data gzip = {0};
CHECK_SYS((fd = open(a->path, O_RDONLY))); CHECK_SYS((fd = open(a->path, O_RDONLY)));
CHECK_SYS(fstat(fd, &st)); CHECK_SYS(fstat(fd, &st));
if (_timespec_gt(st.st_mtim, a->mtim) && (data.p = malloc(st.st_size))) { if (_timespec_gt(st.st_mtim, a->mtim)) {
FormatUnixHttpDateTime(lastmod, st.st_mtim.tv_sec); FormatUnixHttpDateTime(lastmod, st.st_mtim.tv_sec);
CHECK_MEM((data.p = malloc(st.st_size)));
CHECK_SYS((rc = read(fd, data.p, st.st_size))); CHECK_SYS((rc = read(fd, data.p, st.st_size)));
data.n = st.st_size; data.n = st.st_size;
if (rc != st.st_size) goto OnError; if (rc != st.st_size) goto OnError;
gzip = Gzip(data); CHECK_MEM((gzip = Gzip(data)).p);
//!//!//!//!//!//!//!//!//!//!//!//!//!/ //!//!//!//!//!//!//!//!//!//!//!//!//!/
nsync_mu_lock(&a->lock); nsync_mu_lock(&a->lock);
f[0] = a->data.p; f[0] = a->data.p;
@ -760,6 +853,7 @@ bool ReloadAsset(struct Asset *a) {
return true; return true;
OnError: OnError:
free(data.p); free(data.p);
free(gzip.p);
close(fd); close(fd);
return false; return false;
} }
@ -775,8 +869,19 @@ void IgnoreSignal(int sig) {
} }
void OnCtrlC(int sig) { void OnCtrlC(int sig) {
LOG("Received %s shutting down...\n", strsignal(sig)); if (!nsync_note_is_notified(g_shutdown)) {
nsync_note_notify(g_shutdown); LOG("Received %s shutting down...\n", strsignal(sig));
nsync_note_notify(g_shutdown);
} else {
// there's no way to deliver signals to workers atomically, unless
// we pay the cost of ppoll() which isn't necessary in this design
LOG("Received %s again so sending another volley...\n", strsignal(sig));
for (int i = 0; i < g_workers; ++i) {
if (!g_worker[i].shutdown) {
tkill(pthread_getunique_np(g_worker[i].th), SIGUSR1);
}
}
}
} }
static void GetOpts(int argc, char *argv[]) { static void GetOpts(int argc, char *argv[]) {
@ -808,10 +913,10 @@ static void GetOpts(int argc, char *argv[]) {
} }
} }
void Update(struct Asset *a, bool gen(struct Asset *)) { void Update(struct Asset *a, bool gen(struct Asset *, long), long arg) {
void *f[2]; void *f[2];
struct Asset t; struct Asset t;
if (gen(&t)) { if (gen(&t, arg)) {
//!//!//!//!//!//!//!//!//!//!//!//!//!/ //!//!//!//!//!//!//!//!//!//!//!//!//!/
nsync_mu_lock(&a->lock); nsync_mu_lock(&a->lock);
f[0] = a->data.p; f[0] = a->data.p;
@ -827,7 +932,7 @@ void Update(struct Asset *a, bool gen(struct Asset *)) {
} }
} }
bool GenerateScore(struct Asset *out) { bool GenerateScore(struct Asset *out, long seconds) {
int rc; int rc;
char *sb = 0; char *sb = 0;
sqlite3 *db = 0; sqlite3 *db = 0;
@ -837,9 +942,9 @@ bool GenerateScore(struct Asset *out) {
bool namestate = false; bool namestate = false;
char name1[NICK_MAX + 1] = {0}; char name1[NICK_MAX + 1] = {0};
char name2[NICK_MAX + 1]; char name2[NICK_MAX + 1];
DEBUG("GenerateScore\n"); DEBUG("GenerateScore %ld\n", seconds);
a.type = "application/json"; a.type = "application/json";
a.cache = "max-age=60, must-revalidate"; a.cache = "max-age=15, must-revalidate";
CHECK_SYS(clock_gettime(CLOCK_REALTIME, &a.mtim)); CHECK_SYS(clock_gettime(CLOCK_REALTIME, &a.mtim));
FormatUnixHttpDateTime(a.lastmod, a.mtim.tv_sec); FormatUnixHttpDateTime(a.lastmod, a.mtim.tv_sec);
CHECK_SYS(appends(&a.data.p, "{\n")); CHECK_SYS(appends(&a.data.p, "{\n"));
@ -849,11 +954,22 @@ bool GenerateScore(struct Asset *out) {
CHECK_SQL(sqlite3_open("db.sqlite3", &db)); CHECK_SQL(sqlite3_open("db.sqlite3", &db));
CHECK_SQL(sqlite3_exec(db, "PRAGMA journal_mode=WAL", 0, 0, 0)); CHECK_SQL(sqlite3_exec(db, "PRAGMA journal_mode=WAL", 0, 0, 0));
CHECK_SQL(sqlite3_exec(db, "PRAGMA synchronous=NORMAL", 0, 0, 0)); CHECK_SQL(sqlite3_exec(db, "PRAGMA synchronous=NORMAL", 0, 0, 0));
CHECK_DB(sqlite3_prepare_v2(db, if (seconds == -1) {
"SELECT nick, (ip >> 24), COUNT(*)\n" CHECK_DB(sqlite3_prepare_v2(db,
"FROM land\n" "SELECT nick, (ip >> 24), COUNT(*)\n"
"GROUP BY nick, (ip >> 24)", "FROM land\n"
-1, &stmt, 0)); "GROUP BY nick, (ip >> 24)",
-1, &stmt, 0));
} else {
CHECK_DB(sqlite3_prepare_v2(db,
"SELECT nick, (ip >> 24), COUNT(*)\n"
" FROM land\n"
"WHERE created NOT NULL\n"
" AND created >= ?1\n"
"GROUP BY nick, (ip >> 24)",
-1, &stmt, 0));
CHECK_DB(sqlite3_bind_int64(stmt, 1, a.mtim.tv_sec - seconds));
}
CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0));
while ((rc = sqlite3_step(stmt)) != SQLITE_DONE) { while ((rc = sqlite3_step(stmt)) != SQLITE_DONE) {
if (rc != SQLITE_ROW) CHECK_SQL(rc); if (rc != SQLITE_ROW) CHECK_SQL(rc);
@ -900,7 +1016,7 @@ void *ScoreWorker(void *arg) {
for (deadline = _timespec_real();;) { for (deadline = _timespec_real();;) {
deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_UPDATE_MS)); deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_UPDATE_MS));
if (!nsync_note_wait(g_shutdown, deadline)) { if (!nsync_note_wait(g_shutdown, deadline)) {
Update(&g_asset.score, GenerateScore); Update(&g_asset.score, GenerateScore, -1);
} else { } else {
break; break;
} }
@ -909,7 +1025,79 @@ void *ScoreWorker(void *arg) {
return 0; return 0;
} }
bool GenerateRecent(struct Asset *out) { // single thread for regenerating the user scores json
void *ScoreHourWorker(void *arg) {
nsync_time deadline;
LOG("ScoreHourWorker started\n");
OnlyRunOnCpu(0);
pthread_setname_np(pthread_self(), "ScoreHourWorker");
for (deadline = _timespec_real();;) {
deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_D_UPDATE_MS));
if (!nsync_note_wait(g_shutdown, deadline)) {
Update(&g_asset.score_hour, GenerateScore, 60L * 60);
} else {
break;
}
}
LOG("ScoreHourWorker exiting\n");
return 0;
}
// single thread for regenerating the user scores json
void *ScoreDayWorker(void *arg) {
nsync_time deadline;
LOG("ScoreDayWorker started\n");
OnlyRunOnCpu(0);
pthread_setname_np(pthread_self(), "ScoreDayWorker");
for (deadline = _timespec_real();;) {
deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_D_UPDATE_MS));
if (!nsync_note_wait(g_shutdown, deadline)) {
Update(&g_asset.score_day, GenerateScore, 60L * 60 * 24);
} else {
break;
}
}
LOG("ScoreDayWorker exiting\n");
return 0;
}
// single thread for regenerating the user scores json
void *ScoreWeekWorker(void *arg) {
nsync_time deadline;
LOG("ScoreWeekWorker started\n");
OnlyRunOnCpu(0);
pthread_setname_np(pthread_self(), "ScoreWeekWorker");
for (deadline = _timespec_real();;) {
deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_D_UPDATE_MS));
if (!nsync_note_wait(g_shutdown, deadline)) {
Update(&g_asset.score_week, GenerateScore, 60L * 60 * 24 * 7);
} else {
break;
}
}
LOG("ScoreWeekWorker exiting\n");
return 0;
}
// single thread for regenerating the user scores json
void *ScoreMonthWorker(void *arg) {
nsync_time deadline;
LOG("ScoreMonthWorker started\n");
OnlyRunOnCpu(0);
pthread_setname_np(pthread_self(), "ScoreMonthWorker");
for (deadline = _timespec_real();;) {
deadline = _timespec_add(deadline, _timespec_frommillis(SCORE_D_UPDATE_MS));
if (!nsync_note_wait(g_shutdown, deadline)) {
Update(&g_asset.score_month, GenerateScore, 60L * 60 * 24 * 30);
} else {
break;
}
}
LOG("ScoreMonthWorker exiting\n");
return 0;
}
bool GenerateRecent(struct Asset *out, long arg) {
int rc; int rc;
char *sb = 0; char *sb = 0;
sqlite3 *db = 0; sqlite3 *db = 0;
@ -981,7 +1169,7 @@ void *RecentWorker(void *arg) {
nsync_time_no_deadline, g_shutdown); nsync_time_no_deadline, g_shutdown);
nsync_mu_unlock(&g_recent.mu); nsync_mu_unlock(&g_recent.mu);
if (rc == ECANCELED) break; if (rc == ECANCELED) break;
Update(&g_asset.recent, GenerateRecent); Update(&g_asset.recent, GenerateRecent, -1);
} }
LOG("RecentWorker exiting\n"); LOG("RecentWorker exiting\n");
return 0; return 0;
@ -1005,7 +1193,9 @@ StartOver:
"VALUES (?1, ?2, ?3)\n" "VALUES (?1, ?2, ?3)\n"
"ON CONFLICT (ip) DO\n" "ON CONFLICT (ip) DO\n"
"UPDATE SET (nick, created) = (?2, ?3)\n" "UPDATE SET (nick, created) = (?2, ?3)\n"
"WHERE nick != ?2", "WHERE nick != ?2\n"
" AND (created IS NULL\n"
" OR (?3 - created) > 3600)",
-1, &stmt, 0)); -1, &stmt, 0));
LOG("ClaimWorker started\n"); LOG("ClaimWorker started\n");
while ((n = GetClaims(&g_claims, v, BATCH_MAX, nsync_time_no_deadline))) { while ((n = GetClaims(&g_claims, v, BATCH_MAX, nsync_time_no_deadline))) {
@ -1014,6 +1204,7 @@ StartOver:
CHECK_DB(sqlite3_bind_int64(stmt, 1, v[i].ip)); CHECK_DB(sqlite3_bind_int64(stmt, 1, v[i].ip));
CHECK_DB(sqlite3_bind_text(stmt, 2, v[i].name, -1, SQLITE_TRANSIENT)); CHECK_DB(sqlite3_bind_text(stmt, 2, v[i].name, -1, SQLITE_TRANSIENT));
CHECK_DB(sqlite3_bind_int64(stmt, 3, v[i].created)); CHECK_DB(sqlite3_bind_int64(stmt, 3, v[i].created));
CHECK_DB(sqlite3_bind_int64(stmt, 3, v[i].created));
CHECK_DB((rc = sqlite3_step(stmt)) == SQLITE_DONE ? SQLITE_OK : rc); CHECK_DB((rc = sqlite3_step(stmt)) == SQLITE_DONE ? SQLITE_OK : rc);
CHECK_DB(sqlite3_reset(stmt)); CHECK_DB(sqlite3_reset(stmt));
} }
@ -1130,10 +1321,22 @@ int main(int argc, char *argv[]) {
// create threads // create threads
pthread_t scorer; pthread_t scorer;
CHECK_EQ(1, GenerateScore(&g_asset.score)); CHECK_EQ(1, GenerateScore(&g_asset.score, -1));
CHECK_EQ(0, pthread_create(&scorer, 0, ScoreWorker, 0)); CHECK_EQ(0, pthread_create(&scorer, 0, ScoreWorker, 0));
pthread_t scorer_hour;
CHECK_EQ(1, GenerateScore(&g_asset.score_hour, 60L * 60));
CHECK_EQ(0, pthread_create(&scorer_hour, 0, ScoreHourWorker, 0));
pthread_t scorer_day;
CHECK_EQ(1, GenerateScore(&g_asset.score_day, 60L * 60 * 24));
CHECK_EQ(0, pthread_create(&scorer_day, 0, ScoreDayWorker, 0));
pthread_t scorer_week;
CHECK_EQ(1, GenerateScore(&g_asset.score_week, 60L * 60 * 24 * 7));
CHECK_EQ(0, pthread_create(&scorer_week, 0, ScoreWeekWorker, 0));
pthread_t scorer_month;
CHECK_EQ(1, GenerateScore(&g_asset.score_month, 60L * 60 * 24 * 30));
CHECK_EQ(0, pthread_create(&scorer_month, 0, ScoreMonthWorker, 0));
pthread_t recentr; pthread_t recentr;
CHECK_EQ(1, GenerateRecent(&g_asset.recent)); CHECK_EQ(1, GenerateRecent(&g_asset.recent, -1));
CHECK_EQ(0, pthread_create(&recentr, 0, RecentWorker, 0)); CHECK_EQ(0, pthread_create(&recentr, 0, RecentWorker, 0));
pthread_t claimer; pthread_t claimer;
CHECK_EQ(0, pthread_create(&claimer, 0, ClaimWorker, 0)); CHECK_EQ(0, pthread_create(&claimer, 0, ClaimWorker, 0));
@ -1164,6 +1367,10 @@ int main(int argc, char *argv[]) {
LOG("Waiting for helpers to finish...\n"); LOG("Waiting for helpers to finish...\n");
CHECK_EQ(0, pthread_join(recentr, 0)); CHECK_EQ(0, pthread_join(recentr, 0));
CHECK_EQ(0, pthread_join(scorer, 0)); CHECK_EQ(0, pthread_join(scorer, 0));
CHECK_EQ(0, pthread_join(scorer_hour, 0));
CHECK_EQ(0, pthread_join(scorer_day, 0));
CHECK_EQ(0, pthread_join(scorer_week, 0));
CHECK_EQ(0, pthread_join(scorer_month, 0));
CHECK_EQ(0, pthread_join(nower, 0)); CHECK_EQ(0, pthread_join(nower, 0));
// wait for consumers to finish // wait for consumers to finish
@ -1178,6 +1385,10 @@ int main(int argc, char *argv[]) {
FreeAsset(&g_asset.about); FreeAsset(&g_asset.about);
FreeAsset(&g_asset.index); FreeAsset(&g_asset.index);
FreeAsset(&g_asset.score); FreeAsset(&g_asset.score);
FreeAsset(&g_asset.score_hour);
FreeAsset(&g_asset.score_day);
FreeAsset(&g_asset.score_week);
FreeAsset(&g_asset.score_month);
FreeAsset(&g_asset.recent); FreeAsset(&g_asset.recent);
FreeAsset(&g_asset.favicon); FreeAsset(&g_asset.favicon);
nsync_note_free(g_terminate); nsync_note_free(g_terminate);