Support thread local storage

This commit is contained in:
Justine Tunney 2022-05-16 13:20:08 -07:00
parent 91ee2b19d4
commit 55de4ca6b5
197 changed files with 1483 additions and 874 deletions

View file

@ -8,6 +8,7 @@
*/
#endif
#include "libc/log/log.h"
#include "libc/runtime/symbols.internal.h"
/**
* @fileoverview How to print backtraces and cpu state on crash.

View file

@ -143,14 +143,6 @@ o/$(MODE)/examples/nesemu1.com: \
@$(COMPILE) -AZIP -T$@ o/$(MODE)/third_party/zip/zip.com -0qj $@ \
o/$(MODE)/examples/.nesemu1/.symtab
o/$(MODE)/examples/hello.com.dbg: \
$(EXAMPLES_DEPS) \
o/$(MODE)/examples/hello.o \
o/$(MODE)/examples/examples.pkg \
$(CRT) \
$(APE_NO_MODIFY_SELF)
@$(APELINK)
o/$(MODE)/examples/nesemu1.o: QUOTA += -M512m
$(EXAMPLES_OBJS): examples/examples.mk

View file

@ -16,6 +16,7 @@
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/assert.h"
#include "libc/bits/atomic.h"
#include "libc/calls/calls.h"
#include "libc/calls/sigbits.h"
@ -27,6 +28,7 @@
#include "libc/intrin/kprintf.h"
#include "libc/log/check.h"
#include "libc/log/log.h"
#include "libc/macros.internal.h"
#include "libc/mem/mem.h"
#include "libc/runtime/runtime.h"
#include "libc/sock/goodsocket.internal.h"
@ -90,7 +92,8 @@
*
*/
#define THREADS 512
#define PORT 8080
#define THREADS 10000
#define HEARTBEAT 100
#define KEEPALIVE 5000
#define LOGGING 0
@ -100,120 +103,39 @@
"Referrer-Policy: origin\r\n" \
"Cache-Control: private; max-age=0\r\n"
////////////////////////////////////////////////////////////////////////////////
// BEGIN: Chris Wellons's Public Domain GNU Atomics Library
#define BARRIER_INC(x) __atomic_add_fetch(x, 1, __ATOMIC_SEQ_CST)
#define BARRIER_GET(x) __atomic_load_n(x, __ATOMIC_SEQ_CST)
#define ATOMIC_LOAD(q) __atomic_load_n(q, __ATOMIC_ACQUIRE)
#define ATOMIC_RLOAD(q) __atomic_load_n(q, __ATOMIC_RELAXED)
#define ATOMIC_STORE(q, v) __atomic_store_n(q, v, __ATOMIC_RELEASE)
#define ATOMIC_ADD(q, c) __atomic_add_fetch(q, c, __ATOMIC_RELEASE)
#define ATOMIC_AND(q, m) __atomic_and_fetch(q, m, __ATOMIC_RELEASE)
#define ATOMIC_CAS(q, e, d) \
__atomic_compare_exchange_n(q, e, d, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)
// Return the array index for then next value to be pushed. The size of this
// array must be (1 << exp) elements. Write the value into this array index,
// then commit it. With a single-consumer queue, this element store need not
// be atomic. The value will appear in the queue after the commit. Returns
// -1 if the queue is full.
static int queue_push(uint32_t *q, int exp) {
uint32_t r = ATOMIC_LOAD(q);
int mask = (1u << exp) - 1;
int head = r & mask;
int tail = r >> 16 & mask;
int next = (head + 1u) & mask;
if (r & 0x8000) { // avoid overflow on commit
ATOMIC_AND(q, ~0x8000);
}
return next == tail ? -1 : head;
}
// Commits and completes the push operation. Do this after storing into the
// array. This operation cannot fail.
static void queue_push_commit(uint32_t *q) {
ATOMIC_ADD(q, 1);
}
// Return the array index for the next value to be popped. The size of this
// array must be (1 << exp) elements. Read from this array index, then
// commit the pop. This element load need not be atomic. The value will be
// removed from the queue after the commit. Returns -1 if the queue is
// empty.
static int queue_pop(uint32_t *q, int exp) {
uint32_t r = ATOMIC_LOAD(q);
int mask = (1u << exp) - 1;
int head = r & mask;
int tail = r >> 16 & mask;
return head == tail ? -1 : tail;
}
// Commits and completes the pop operation. Do this after loading from the
// array. This operation cannot fail.
static void queue_pop_commit(uint32_t *q) {
ATOMIC_ADD(q, 0x10000);
}
// Like queue_pop() but for multiple-consumer queues. The element load must
// be atomic since it is concurrent with the producer's push, though it can
// use a relaxed memory order. The loaded value must not be used unless the
// commit is successful. Stores a temporary "save" to be used at commit.
static int queue_mpop(uint32_t *q, int exp, uint32_t *save) {
uint32_t r = *save = ATOMIC_LOAD(q);
int mask = (1u << exp) - 1;
int head = r & mask;
int tail = r >> 16 & mask;
return head == tail ? -1 : tail;
}
// Like queue_pop_commit() but for multiple-consumer queues. It may fail if
// another consumer pops concurrently, in which case the pop must be retried
// from the beginning.
static bool queue_mpop_commit(uint32_t *q, uint32_t save) {
return ATOMIC_CAS(q, &save, save + 0x10000);
}
// Spin-lock barrier for n threads, where n is a power of two.
// Initialize *barrier to zero.
static void barrier_waitn(int *barrier, int n) {
int v = BARRIER_INC(barrier);
if (v & (n - 1)) {
for (v &= n; (BARRIER_GET(barrier) & n) == v;) {
donothing;
}
}
}
// END: Chris Wellons's Public Domain GNU Atomics Library
////////////////////////////////////////////////////////////////////////////////
int barrier1;
int itsbegun;
int workers;
int messages;
int connections;
int closingtime;
int barrier2;
int itsdone;
const char *status;
int Worker(void *id) {
int server, yes = 1;
kprintf(" %d", id);
barrier_waitn(&barrier1, THREADS);
itsbegun = true;
// load balance incoming connections for port 8080 across all threads
// hangup on any browser clients that lag for more than a few seconds
struct timeval timeo = {KEEPALIVE / 1000, KEEPALIVE % 1000};
struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(8080)};
CHECK_NE(-1, (server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)));
struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(PORT)};
server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (server == -1) {
if (LOGGING) kprintf("%s() failed %m\n", "socket");
goto WorkerFinished;
}
setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo));
setsockopt(server, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo));
setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
setsockopt(server, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
setsockopt(server, SOL_TCP, TCP_FASTOPEN, &yes, sizeof(yes));
setsockopt(server, SOL_TCP, TCP_QUICKACK, &yes, sizeof(yes));
CHECK_EQ(0, bind(server, &addr, sizeof(addr)));
CHECK_EQ(0, listen(server, 10));
if (bind(server, &addr, sizeof(addr)) == -1) {
if (LOGGING) kprintf("%s() failed %m\n", "socket");
goto WorkerFinished;
}
listen(server, 10);
// connection loop
while (!closingtime) {
@ -228,8 +150,12 @@ int Worker(void *id) {
char inbuf[1500], outbuf[512], *p, *q;
int clientip, client, inmsglen, outmsglen;
if (!IsLinux() &&
poll(&(struct pollfd){server, POLLIN}, 1, HEARTBEAT) < 1) {
continue;
}
// wait for client connection
if (poll(&(struct pollfd){server, POLLIN}, 1, HEARTBEAT) < 1) continue;
clientaddrsize = sizeof(clientaddr);
client = accept(server, &clientaddr, &clientaddrsize);
@ -245,6 +171,8 @@ int Worker(void *id) {
continue;
}
asm volatile("lock incl\t%0" : "+m"(connections));
// message loop
do {
// parse the incoming http message
@ -253,6 +181,7 @@ int Worker(void *id) {
if ((got = read(client, inbuf, sizeof(inbuf))) <= 0) break;
// check that client message wasn't fragmented into more reads
if (!(inmsglen = ParseHttpMessage(&msg, inbuf, got))) break;
asm volatile("lock incl\t%0" : "+m"(messages));
#if LOGGING
// log the incoming http message
@ -313,35 +242,46 @@ int Worker(void *id) {
(msg.method == kHttpGet || msg.method == kHttpHead));
DestroyHttpMessage(&msg);
close(client);
asm volatile("lock decl\t%0" : "+m"(connections));
}
// inform the parent that this clone has finished
WorkerFinished:
close(server);
kprintf(" %d", id);
barrier_waitn(&barrier2, THREADS);
itsdone = true;
asm volatile("lock decl\t%0" : "+m"(workers));
return 0;
}
void OnCtrlC(int sig) {
closingtime = true;
status = " shutting down...";
}
int main(int argc, char *argv[]) {
/* ShowCrashReports(); */
int64_t loadtzdbearly;
kprintf("welcome to greenbean\n");
gmtime(&loadtzdbearly);
for (int i = 0; i < THREADS; ++i) {
int i;
uint32_t *hostips;
ShowCrashReports();
sigaction(SIGINT, &(struct sigaction){.sa_handler = OnCtrlC}, 0);
for (hostips = GetHostIps(), i = 0; hostips[i]; ++i) {
kprintf("listening on http://%d.%d.%d.%d:%d\n",
(hostips[i] & 0xff000000) >> 030, (hostips[i] & 0x00ff0000) >> 020,
(hostips[i] & 0x0000ff00) >> 010, (hostips[i] & 0x000000ff) >> 000,
PORT);
}
workers = THREADS;
for (i = 0; i < THREADS; ++i) {
void *stack = mmap(0, 65536, PROT_READ | PROT_WRITE,
MAP_STACK | MAP_ANONYMOUS, -1, 0);
clone(Worker, stack, 65536,
CLONE_THREAD | CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND,
(void *)(intptr_t)i, 0, 0, 0, 0);
}
while (!ATOMIC_LOAD(&itsbegun)) usleep(HEARTBEAT * 1000);
sigaction(SIGINT, &(struct sigaction){.sa_handler = OnCtrlC}, 0);
kprintf("\nit's begun\n");
while (!ATOMIC_LOAD(&itsdone)) usleep(HEARTBEAT * 1000);
kprintf("\nthank you for flying greenbean\n");
status = "";
while (workers) {
kprintf(
"\r\e[K\e[32mgreenbean\e[0m workers=%d connections=%d messages=%d%s ",
workers, connections, messages, status);
usleep(HEARTBEAT * 1000);
}
kprintf("\r\e[K");
}