Polish greenbean example a bit more

Windows support for this example is still a work in progress. It's
encountering some unusual crashes. Thank you Chris Wellons for the cool
synchronization code too!
This commit is contained in:
Justine Tunney 2022-05-15 09:14:48 -07:00
parent e5e141d9b5
commit 91ee2b19d4
4 changed files with 145 additions and 81 deletions

View file

@ -16,6 +16,7 @@
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/bits/atomic.h"
#include "libc/calls/calls.h"
#include "libc/calls/sigbits.h"
#include "libc/calls/struct/sigset.h"
@ -27,6 +28,7 @@
#include "libc/log/check.h"
#include "libc/log/log.h"
#include "libc/mem/mem.h"
#include "libc/runtime/runtime.h"
#include "libc/sock/goodsocket.internal.h"
#include "libc/sock/sock.h"
#include "libc/str/str.h"
@ -71,25 +73,25 @@
* Like redbean, greenbean has superior performance too, with an
* advantage on benchmarks biased towards high connection counts
*
* $ sudo wrk -c 300 -t 32 --latency http://127.0.0.1:8080/
* Running 10s test @ http://127.0.0.1:8080/
* $ sudo wrk -c 300 -t 32 --latency http://10.10.10.124:8080/
* Running 10s test @ http://10.10.10.124:8080/
* 32 threads and 300 connections
* Thread Stats Avg Stdev Max +/- Stdev
* Latency 36.21us 133.39us 8.10ms 98.52%
* Req/Sec 73.24k 28.92k 131.06k 47.49%
* Latency 1.07ms 8.27ms 138.55ms 98.58%
* Req/Sec 37.98k 12.61k 117.65k 80.11%
* Latency Distribution
* 50% 22.00us
* 75% 29.00us
* 90% 40.00us
* 99% 333.00us
* 4356560 requests in 4.62s, 1.29GB read
* Requests/sec: 942663.73
* Transfer/sec: 284.98MB
* 50% 200.00us
* 75% 227.00us
* 90% 303.00us
* 99% 32.46ms
* 10033090 requests in 8.31s, 2.96GB read
* Requests/sec: 1207983.58
* Transfer/sec: 365.19MB
*
*/
#define THREADS 32
#define HEARTBEAT 500
#define THREADS 512
#define HEARTBEAT 100
#define KEEPALIVE 5000
#define LOGGING 0
@ -98,23 +100,106 @@
"Referrer-Policy: origin\r\n" \
"Cache-Control: private; max-age=0\r\n"
int workers;
int barrier;
////////////////////////////////////////////////////////////////////////////////
// BEGIN: Chris Wellons's Public Domain GNU Atomics Library
// Barrier primitives: sequentially consistent increment-and-fetch and
// load. These are the only operations barrier_waitn() performs on its
// counter.
#define BARRIER_INC(x) __atomic_add_fetch(x, 1, __ATOMIC_SEQ_CST)
#define BARRIER_GET(x) __atomic_load_n(x, __ATOMIC_SEQ_CST)
// Queue primitives. Loads use acquire ordering (ATOMIC_RLOAD is the
// relaxed variant); stores and read-modify-write operations use release
// ordering. The ADD and AND forms evaluate to the updated value.
#define ATOMIC_LOAD(q) __atomic_load_n(q, __ATOMIC_ACQUIRE)
#define ATOMIC_RLOAD(q) __atomic_load_n(q, __ATOMIC_RELAXED)
#define ATOMIC_STORE(q, v) __atomic_store_n(q, v, __ATOMIC_RELEASE)
#define ATOMIC_ADD(q, c) __atomic_add_fetch(q, c, __ATOMIC_RELEASE)
#define ATOMIC_AND(q, m) __atomic_and_fetch(q, m, __ATOMIC_RELEASE)
// Strong compare-and-swap (the `weak` argument is 0): release ordering
// on success, relaxed on failure. Evaluates to true when *q equaled *e
// and was replaced by d; otherwise *e receives the current value of *q.
#define ATOMIC_CAS(q, e, d) \
__atomic_compare_exchange_n(q, e, d, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)
// Starts a push. Returns the array index where the next value should
// be written, or -1 when the queue is full. The backing array must
// hold (1 << exp) elements. Store the value at the returned index and
// then call queue_push_commit(); the value only appears in the queue
// after that commit. With a single-consumer queue the element store
// need not be atomic.
static int queue_push(uint32_t *q, int exp) {
  uint32_t state = ATOMIC_LOAD(q);
  int wrap = (1u << exp) - 1;
  int wr = state & wrap;          // push index: low half of the word
  int rd = (state >> 16) & wrap;  // pop index: bits 16 and up
  int following = (wr + 1u) & wrap;
  if (state & 0x8000) {
    // bit 15 is set: clear it ahead of time so that the +1 performed
    // by queue_push_commit() cannot overflow out of the low half
    ATOMIC_AND(q, ~0x8000);
  }
  if (following == rd) {
    return -1;
  }
  return wr;
}
// Finishes a push begun with queue_push() by atomically adding one to
// the packed queue word. Call it only after the value has been stored
// into the array slot. There is no failure path.
static void queue_push_commit(uint32_t *q) {
  (void)ATOMIC_ADD(q, 1);
}
// Starts a pop. Returns the array index holding the next value to be
// removed, or -1 when the queue is empty. The backing array must hold
// (1 << exp) elements. Read the value at the returned index and then
// call queue_pop_commit(); the value stays in the queue until that
// commit. The element load need not be atomic.
static int queue_pop(uint32_t *q, int exp) {
  uint32_t state = ATOMIC_LOAD(q);
  int wrap = (1u << exp) - 1;
  int wr = state & wrap;          // push index: low half of the word
  int rd = (state >> 16) & wrap;  // pop index: bits 16 and up
  if (wr == rd) {
    return -1;
  }
  return rd;
}
// Finishes a pop begun with queue_pop() by atomically adding 0x10000
// to the packed queue word, which advances the pop index stored in the
// upper bits. Call it only after the value has been read from the
// array slot. There is no failure path.
static void queue_pop_commit(uint32_t *q) {
  (void)ATOMIC_ADD(q, 0x10000);
}
// Multiple-consumer variant of queue_pop(). Returns the array index of
// the next value to pop, or -1 when the queue is empty, and records the
// observed queue word in *save for queue_mpop_commit(). Because the
// element load races with the producer's push it must be atomic
// (relaxed ordering suffices), and the loaded value must be discarded
// unless the commit succeeds.
static int queue_mpop(uint32_t *q, int exp, uint32_t *save) {
  uint32_t state = ATOMIC_LOAD(q);
  *save = state;
  int wrap = (1u << exp) - 1;
  int wr = state & wrap;          // push index: low half of the word
  int rd = (state >> 16) & wrap;  // pop index: bits 16 and up
  if (wr == rd) {
    return -1;
  }
  return rd;
}
// Multiple-consumer variant of queue_pop_commit(). Tries to swap the
// queue word from the value saved by queue_mpop() to that value plus
// 0x10000. Returns true on success. Returns false when the word has
// changed in the meantime (e.g. another consumer popped concurrently),
// in which case the whole pop must be retried from queue_mpop().
static bool queue_mpop_commit(uint32_t *q, uint32_t save) {
  uint32_t desired = save + 0x10000;
  return ATOMIC_CAS(q, &save, desired);
}
// Spin-lock barrier for n threads, where n must be a power of two.
// *barrier must start out as zero. Each caller bumps the counter; the
// caller whose increment makes the low bits (n - 1) wrap to zero is
// the last to arrive and returns at once. Every other caller remembers
// the value of bit n it observed and spins until that bit changes.
static void barrier_waitn(int *barrier, int n) {
  int arrival = BARRIER_INC(barrier);
  if (!(arrival & (n - 1))) {
    return;  // last thread to arrive: nothing to wait for
  }
  int phase = arrival & n;
  while ((BARRIER_GET(barrier) & n) == phase) {
    donothing;
  }
}
// END: Chris Wellons's Public Domain GNU Atomics Library
////////////////////////////////////////////////////////////////////////////////
// Shared state coordinating main() and the Worker() threads. All of
// these rely on static zero-initialization.
int barrier1;     // barrier_waitn() counter the workers meet on at startup
int itsbegun;     // set to true by a worker after barrier1; polled by main()
int closingtime;  // workers run their connection loop while this is zero
int barrier2;     // barrier_waitn() counter the workers meet on at shutdown
int itsdone;      // set to true by a worker after barrier2; polled by main()
int Worker(void *id) {
int server, itsover, ready, yes = 1;
int server, yes = 1;
// announce to the main process this has spawned
kprintf(" #%.2ld", (intptr_t)id);
__atomic_add_fetch(&workers, 1, __ATOMIC_SEQ_CST);
// wait for all threads to spawn before we proceed
for (;;) {
__atomic_load(&barrier, &ready, __ATOMIC_SEQ_CST);
if (ready) break;
__builtin_ia32_pause();
}
kprintf(" %d", id);
barrier_waitn(&barrier1, THREADS);
itsbegun = true;
// load balance incoming connections for port 8080 across all threads
// hangup on any browser clients that lag for more than a few seconds
@ -131,7 +216,7 @@ int Worker(void *id) {
CHECK_EQ(0, listen(server, 10));
// connection loop
for (;;) {
while (!closingtime) {
struct tm tm;
int64_t unixts;
struct Url url;
@ -143,15 +228,8 @@ int Worker(void *id) {
char inbuf[1500], outbuf[512], *p, *q;
int clientip, client, inmsglen, outmsglen;
__atomic_load(&closingtime, &itsover, __ATOMIC_SEQ_CST);
if (itsover) break;
if (!IsLinux() &&
poll(&(struct pollfd){server, POLLIN}, 1, HEARTBEAT) < 1) {
continue;
}
// wait for client connection
if (poll(&(struct pollfd){server, POLLIN}, 1, HEARTBEAT) < 1) continue;
clientaddrsize = sizeof(clientaddr);
client = accept(server, &clientaddr, &clientaddrsize);
@ -163,7 +241,7 @@ int Worker(void *id) {
// inherited by the accepted sockets, but using them also has the
// side-effect that the listening socket fails with EAGAIN, every
// several seconds. we can use that to our advantage to check for
// the ctrl-c shutdown event; otherwise, we retry the accept call
// the ctrl-c shutdown event; otherwise, we retry the accept call
continue;
}
@ -179,7 +257,7 @@ int Worker(void *id) {
#if LOGGING
// log the incoming http message
clientip = ntohl(clientaddr.sin_addr.s_addr);
kprintf("#%.2ld get some %d.%d.%d.%d:%d %#.*s\n", (intptr_t)id,
kprintf("#%.4x get some %d.%d.%d.%d:%d %#.*s\n", (intptr_t)id,
(clientip & 0xff000000) >> 030, (clientip & 0x00ff0000) >> 020,
(clientip & 0x0000ff00) >> 010, (clientip & 0x000000ff) >> 000,
ntohs(clientaddr.sin_port), msg.uri.b - msg.uri.a,
@ -239,8 +317,9 @@ int Worker(void *id) {
// inform the parent that this clone has finished
close(server);
kprintf(" #%.2ld", (intptr_t)id);
__atomic_sub_fetch(&workers, 1, __ATOMIC_SEQ_CST);
kprintf(" %d", id);
barrier_waitn(&barrier2, THREADS);
itsdone = true;
return 0;
}
@ -249,45 +328,20 @@ void OnCtrlC(int sig) {
}
int main(int argc, char *argv[]) {
/* ShowCrashReports(); */
int64_t loadtzdbearly;
int i, gotsome, haveleft, ready = 1;
ShowCrashReports();
kprintf("welcome to greenbean\n");
gmtime(&loadtzdbearly);
// spawn a bunch of threads
for (i = 0; i < THREADS; ++i) {
for (int i = 0; i < THREADS; ++i) {
void *stack = mmap(0, 65536, PROT_READ | PROT_WRITE,
MAP_STACK | MAP_ANONYMOUS, -1, 0);
clone(Worker, stack, 65536,
CLONE_THREAD | CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND,
(void *)(intptr_t)i, 0, 0, 0, 0);
}
// wait for all threads to spawn
for (;;) {
__atomic_load(&workers, &gotsome, __ATOMIC_SEQ_CST);
if (workers == THREADS) break;
__builtin_ia32_pause();
}
// all threads are spawned so unleash the barrier
kprintf("\ngreenbean is ready to go\n");
while (!ATOMIC_LOAD(&itsbegun)) usleep(HEARTBEAT * 1000);
sigaction(SIGINT, &(struct sigaction){.sa_handler = OnCtrlC}, 0);
__atomic_store(&barrier, &ready, __ATOMIC_SEQ_CST);
// main process does nothing until it's closing time
for (;;) {
__atomic_load(&workers, &haveleft, __ATOMIC_SEQ_CST);
if (!haveleft) break;
__builtin_ia32_pause();
usleep(HEARTBEAT * 1000);
if (closingtime) {
kprintf("\rgreenbean is shutting down...\n");
}
}
kprintf("\n");
kprintf("thank you for flying greenbean\n");
kprintf("\nit's begun\n");
while (!ATOMIC_LOAD(&itsdone)) usleep(HEARTBEAT * 1000);
kprintf("\nthank you for flying greenbean\n");
}