mirror of
https://github.com/jart/cosmopolitan.git
synced 2025-07-14 15:09:09 +00:00
Make greenbean web server better
- Remove misguided __assert_disabled variable - Change EPROCLIM to be EAGAIN on BSD distros - Improve quality of greenbean with cancellations - Fix thread race condition crash with file descriptors
This commit is contained in:
parent
6cff3137c5
commit
0e087143fd
14 changed files with 247 additions and 149 deletions
|
@ -10,52 +10,34 @@
|
|||
#include "libc/assert.h"
|
||||
#include "libc/atomic.h"
|
||||
#include "libc/calls/calls.h"
|
||||
#include "libc/calls/pledge.h"
|
||||
#include "libc/calls/struct/sigaction.h"
|
||||
#include "libc/calls/struct/sigset.h"
|
||||
#include "libc/calls/struct/timespec.h"
|
||||
#include "libc/calls/struct/timeval.h"
|
||||
#include "libc/dce.h"
|
||||
#include "libc/errno.h"
|
||||
#include "libc/fmt/conv.h"
|
||||
#include "libc/fmt/itoa.h"
|
||||
#include "libc/intrin/atomic.h"
|
||||
#include "libc/intrin/kprintf.h"
|
||||
#include "libc/limits.h"
|
||||
#include "libc/log/check.h"
|
||||
#include "libc/log/log.h"
|
||||
#include "libc/macros.internal.h"
|
||||
#include "libc/mem/gc.internal.h"
|
||||
#include "libc/mem/mem.h"
|
||||
#include "libc/runtime/internal.h"
|
||||
#include "libc/runtime/runtime.h"
|
||||
#include "libc/runtime/stack.h"
|
||||
#include "libc/sock/sock.h"
|
||||
#include "libc/sock/struct/pollfd.h"
|
||||
#include "libc/sock/struct/sockaddr.h"
|
||||
#include "libc/str/str.h"
|
||||
#include "libc/sysv/consts/af.h"
|
||||
#include "libc/sysv/consts/clock.h"
|
||||
#include "libc/sysv/consts/clone.h"
|
||||
#include "libc/sysv/consts/ipproto.h"
|
||||
#include "libc/sysv/consts/map.h"
|
||||
#include "libc/sysv/consts/poll.h"
|
||||
#include "libc/sysv/consts/prot.h"
|
||||
#include "libc/sysv/consts/rlimit.h"
|
||||
#include "libc/sysv/consts/sig.h"
|
||||
#include "libc/sysv/consts/so.h"
|
||||
#include "libc/sysv/consts/sock.h"
|
||||
#include "libc/sysv/consts/sol.h"
|
||||
#include "libc/sysv/consts/tcp.h"
|
||||
#include "libc/thread/spawn.h"
|
||||
#include "libc/thread/thread.h"
|
||||
#include "libc/thread/tls.h"
|
||||
#include "libc/thread/wait0.internal.h"
|
||||
#include "libc/time/struct/tm.h"
|
||||
#include "libc/time/time.h"
|
||||
#include "libc/thread/thread2.h"
|
||||
#include "net/http/http.h"
|
||||
#include "net/http/url.h"
|
||||
|
||||
/**
|
||||
* @fileoverview greenbean lightweighht threaded web server
|
||||
* @fileoverview greenbean lightweight threaded web server
|
||||
*
|
||||
* $ make -j8 o//tool/net/greenbean.com
|
||||
* $ o//tool/net/greenbean.com &
|
||||
|
@ -95,9 +77,8 @@
|
|||
*/
|
||||
|
||||
#define PORT 8080
|
||||
#define HEARTBEAT 100
|
||||
#define KEEPALIVE 5000
|
||||
#define LOGGING 0
|
||||
#define KEEPALIVE 30000
|
||||
#define LOGGING 1
|
||||
|
||||
#define STANDARD_RESPONSE_HEADERS \
|
||||
"Server: greenbean/1.o\r\n" \
|
||||
|
@ -105,12 +86,25 @@
|
|||
"Cache-Control: private; max-age=0\r\n"
|
||||
|
||||
int threads;
|
||||
atomic_int workers;
|
||||
atomic_int messages;
|
||||
atomic_int listening;
|
||||
atomic_int connections;
|
||||
atomic_int closingtime;
|
||||
const char *volatile status;
|
||||
int alwaysclose;
|
||||
atomic_int a_termsig;
|
||||
atomic_int a_workers;
|
||||
atomic_int a_messages;
|
||||
atomic_int a_listening;
|
||||
atomic_int a_connections;
|
||||
pthread_cond_t statuscond;
|
||||
pthread_mutex_t statuslock;
|
||||
const char *volatile status = "";
|
||||
|
||||
void SomethingHappened(void) {
|
||||
unassert(!pthread_cond_signal(&statuscond));
|
||||
}
|
||||
|
||||
void SomethingImportantHappened(void) {
|
||||
unassert(!pthread_mutex_lock(&statuslock));
|
||||
unassert(!pthread_cond_signal(&statuscond));
|
||||
unassert(!pthread_mutex_unlock(&statuslock));
|
||||
}
|
||||
|
||||
void *Worker(void *id) {
|
||||
int server, yes = 1;
|
||||
|
@ -128,6 +122,8 @@ void *Worker(void *id) {
|
|||
goto WorkerFinished;
|
||||
}
|
||||
|
||||
// we don't bother checking for errors here since OS support for the
|
||||
// advanced features tends to be a bit spotty and harmless to ignore
|
||||
setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo));
|
||||
setsockopt(server, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo));
|
||||
setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
|
||||
|
@ -136,72 +132,84 @@ void *Worker(void *id) {
|
|||
setsockopt(server, SOL_TCP, TCP_QUICKACK, &yes, sizeof(yes));
|
||||
errno = 0;
|
||||
|
||||
// open our ears to incoming connections; so_reuseport makes it
|
||||
// possible for our many threads to bind to the same interface!
|
||||
// otherwise we'd need to create a complex multi-threaded queue
|
||||
if (bind(server, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
|
||||
kprintf("%s() failed %m\n", "socket");
|
||||
goto CloseWorker;
|
||||
}
|
||||
|
||||
listen(server, 1);
|
||||
unassert(!listen(server, 1));
|
||||
|
||||
// connection loop
|
||||
++listening;
|
||||
while (!closingtime) {
|
||||
struct tm tm;
|
||||
int64_t unixts;
|
||||
ssize_t got, sent;
|
||||
struct timespec ts;
|
||||
struct HttpMessage msg;
|
||||
++a_listening;
|
||||
SomethingImportantHappened();
|
||||
while (!a_termsig) {
|
||||
uint32_t clientaddrsize;
|
||||
struct sockaddr_in clientaddr;
|
||||
char inbuf[1500], outbuf[512], *p, *q;
|
||||
int client, inmsglen, outmsglen;
|
||||
char inbuf[1500], outbuf[512], *p, *q;
|
||||
|
||||
// this slows the server down a lot but is needed on non-Linux to
|
||||
// react to keyboard ctrl-c
|
||||
if (!IsLinux() &&
|
||||
poll(&(struct pollfd){server, POLLIN}, 1, HEARTBEAT) < 1) {
|
||||
continue;
|
||||
}
|
||||
// musl libc and cosmopolitan libc support a posix thread extension
|
||||
// that makes thread cancellation work much better your io routines
|
||||
// will just raise ECANCELED so you can check for cancellation with
|
||||
// normal logic rather than needing to push and pop cleanup handler
|
||||
// functions onto the stack, or worse dealing with async interrupts
|
||||
unassert(!pthread_setcancelstate(PTHREAD_CANCEL_MASKED, 0));
|
||||
|
||||
// wait for client connection
|
||||
// we don't bother with poll() because this is actually very speedy
|
||||
clientaddrsize = sizeof(clientaddr);
|
||||
client = accept(server, (struct sockaddr *)&clientaddr, &clientaddrsize);
|
||||
|
||||
// turns cancellation off so we don't interrupt active http clients
|
||||
unassert(!pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0));
|
||||
|
||||
// accept() can raise a very diverse number of errors but none of
|
||||
// them are really true showstoppers that would necessitate us to
|
||||
// panic and abort the entire server, so we can just ignore these
|
||||
if (client == -1) {
|
||||
// we used SO_RCVTIMEO and SO_SNDTIMEO because those settings are
|
||||
// inherited by the accepted sockets, but using them also has the
|
||||
// side-effect that the listening socket fails with EAGAIN, every
|
||||
// several seconds. we can use that to our advantage to check for
|
||||
// the ctrl-c shutdowne event; otherwise we retry the accept call
|
||||
// side-effect that the listening socket fails with EAGAIN errors
|
||||
// which are harmless, and so are most other errors accept raises
|
||||
// e.g. ECANCELED, which lets us check closingtime without delay!
|
||||
continue;
|
||||
}
|
||||
|
||||
++connections;
|
||||
++a_connections;
|
||||
SomethingHappened();
|
||||
|
||||
// message loop
|
||||
ssize_t got, sent;
|
||||
struct HttpMessage msg;
|
||||
do {
|
||||
// parse the incoming http message
|
||||
InitHttpMessage(&msg, kHttpRequest);
|
||||
// we're not terrible concerned when errors happen here
|
||||
unassert(!pthread_setcancelstate(PTHREAD_CANCEL_MASKED, 0));
|
||||
if ((got = read(client, inbuf, sizeof(inbuf))) <= 0) break;
|
||||
unassert(!pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0));
|
||||
// check that client message wasn't fragmented into more reads
|
||||
if ((inmsglen = ParseHttpMessage(&msg, inbuf, got)) <= 0) break;
|
||||
++messages;
|
||||
++a_messages;
|
||||
SomethingHappened();
|
||||
|
||||
#if LOGGING
|
||||
// log the incoming http message
|
||||
clientip = ntohl(clientaddr.sin_addr.s_addr);
|
||||
kprintf("%6P get some %d.%d.%d.%d:%d %#.*s\n",
|
||||
unsigned clientip = ntohl(clientaddr.sin_addr.s_addr);
|
||||
kprintf("\r\e[K%6P get some %d.%d.%d.%d:%d %#.*s\n",
|
||||
(clientip & 0xff000000) >> 030, (clientip & 0x00ff0000) >> 020,
|
||||
(clientip & 0x0000ff00) >> 010, (clientip & 0x000000ff) >> 000,
|
||||
ntohs(clientaddr.sin_port), msg.uri.b - msg.uri.a,
|
||||
inbuf + msg.uri.a);
|
||||
SomethingHappened();
|
||||
#endif
|
||||
|
||||
// display hello world html page for http://127.0.0.1:8080/
|
||||
struct tm tm;
|
||||
int64_t unixts;
|
||||
struct timespec ts;
|
||||
if (msg.method == kHttpGet &&
|
||||
(msg.uri.b - msg.uri.a == 1 && inbuf[msg.uri.a + 0] == '/')) {
|
||||
q = "<!doctype html>\r\n"
|
||||
|
@ -216,6 +224,9 @@ void *Worker(void *id) {
|
|||
p = FormatHttpDateTime(p, gmtime_r(&unixts, &tm));
|
||||
p = stpcpy(p, "\r\nContent-Length: ");
|
||||
p = FormatInt32(p, strlen(q));
|
||||
if (alwaysclose) {
|
||||
p = stpcpy(p, "\r\nConnection: close");
|
||||
}
|
||||
p = stpcpy(p, "\r\n\r\n");
|
||||
p = stpcpy(p, q);
|
||||
outmsglen = p - outbuf;
|
||||
|
@ -234,6 +245,9 @@ void *Worker(void *id) {
|
|||
p = FormatHttpDateTime(p, gmtime_r(&unixts, &tm));
|
||||
p = stpcpy(p, "\r\nContent-Length: ");
|
||||
p = FormatInt32(p, strlen(q));
|
||||
if (alwaysclose) {
|
||||
p = stpcpy(p, "\r\nConnection: close");
|
||||
}
|
||||
p = stpcpy(p, "\r\n\r\n");
|
||||
p = stpcpy(p, q);
|
||||
outmsglen = p - outbuf;
|
||||
|
@ -244,27 +258,27 @@ void *Worker(void *id) {
|
|||
// amount, then since we sent the content length and checked
|
||||
// that the client didn't attach a payload, we are so synced
|
||||
// thus we can safely process more messages
|
||||
} while (got == inmsglen && sent == outmsglen &&
|
||||
} while (!alwaysclose && //
|
||||
got == inmsglen && //
|
||||
sent == outmsglen && //
|
||||
!msg.headers[kHttpContentLength].a &&
|
||||
!msg.headers[kHttpTransferEncoding].a &&
|
||||
(msg.method == kHttpGet || msg.method == kHttpHead));
|
||||
DestroyHttpMessage(&msg);
|
||||
close(client);
|
||||
--connections;
|
||||
--a_connections;
|
||||
SomethingHappened();
|
||||
}
|
||||
--listening;
|
||||
--a_listening;
|
||||
|
||||
// inform the parent that this clone has finished
|
||||
CloseWorker:
|
||||
close(server);
|
||||
WorkerFinished:
|
||||
--workers;
|
||||
return 0;
|
||||
}
|
||||
--a_workers;
|
||||
|
||||
void OnCtrlC(int sig) {
|
||||
closingtime = true;
|
||||
status = " shutting down...";
|
||||
SomethingImportantHappened();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void PrintStatus(void) {
|
||||
|
@ -273,70 +287,170 @@ void PrintStatus(void) {
|
|||
"listening=%d "
|
||||
"connections=%d "
|
||||
"messages=%d%s ",
|
||||
workers, listening, connections, messages, status);
|
||||
a_workers, a_listening, a_connections, a_messages, status);
|
||||
}
|
||||
|
||||
void OnTerm(int sig) {
|
||||
a_termsig = sig;
|
||||
status = " shutting down...";
|
||||
SomethingHappened();
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
int i, rc;
|
||||
pthread_t *th;
|
||||
int i;
|
||||
|
||||
// print cpu registers and backtrace on crash
|
||||
// note that pledge'll makes backtraces worse
|
||||
// you can press ctrl+\ to trigger your crash
|
||||
ShowCrashReports();
|
||||
|
||||
// listen for ctrl-c, terminal close, and kill
|
||||
struct sigaction sa = {.sa_handler = OnTerm};
|
||||
unassert(!sigaction(SIGINT, &sa, 0));
|
||||
unassert(!sigaction(SIGHUP, &sa, 0));
|
||||
unassert(!sigaction(SIGTERM, &sa, 0));
|
||||
|
||||
// print all the ips that 0.0.0.0 would bind
|
||||
// Cosmo's GetHostIps() API is much easier than ioctl(SIOCGIFCONF)
|
||||
uint32_t *hostips;
|
||||
// ShowCrashReports();
|
||||
|
||||
// listen for ctrl-c, hangup, and kill which shut down greenbean
|
||||
status = "";
|
||||
struct sigaction sa = {.sa_handler = OnCtrlC};
|
||||
sigaction(SIGHUP, &sa, 0);
|
||||
sigaction(SIGINT, &sa, 0);
|
||||
sigaction(SIGTERM, &sa, 0);
|
||||
|
||||
// print all the ips that 0.0.0.0 will bind
|
||||
for (hostips = GetHostIps(), i = 0; hostips[i]; ++i) {
|
||||
for (hostips = gc(GetHostIps()), i = 0; hostips[i]; ++i) {
|
||||
kprintf("listening on http://%d.%d.%d.%d:%d\n",
|
||||
(hostips[i] & 0xff000000) >> 030, (hostips[i] & 0x00ff0000) >> 020,
|
||||
(hostips[i] & 0x0000ff00) >> 010, (hostips[i] & 0x000000ff) >> 000,
|
||||
PORT);
|
||||
}
|
||||
|
||||
// you can pass the number of threads you want as the first command arg
|
||||
threads = argc > 1 ? atoi(argv[1]) : __get_cpu_count();
|
||||
if (!(1 <= threads && threads <= 100000)) {
|
||||
kprintf("error: invalid number of threads: %d\n", threads);
|
||||
kprintf("\r\e[Kerror: invalid number of threads: %d\n", threads);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// caveat emptor microsofties
|
||||
if (IsWindows()) {
|
||||
kprintf("sorry but windows isn't supported by the greenbean demo yet\n"
|
||||
"because it doesn't support SO_REUSEPORT which is a nice for\n"
|
||||
"gaining great performance on UNIX systems, with simple code\n"
|
||||
"however windows will work fine if we limit it to one thread\n");
|
||||
threads = 1; // we're going to make just one web server thread
|
||||
alwaysclose = 1; // don't let client idle, since it'd block others
|
||||
}
|
||||
|
||||
// secure the server
|
||||
//
|
||||
// pledge() and unveil() let us whitelist which system calls and files
|
||||
// the server will be allowed to use. this way if it gets hacked, they
|
||||
// won't be able to do much damage, like compromising the whole server
|
||||
//
|
||||
// we use an internal api to force threads to enable beforehand, since
|
||||
// cosmopolitan code morphs the binary to support tls across platforms
|
||||
// and doing that requires extra permissions we don't need for serving
|
||||
//
|
||||
// pledge violations on openbsd are logged nicely to the system logger
|
||||
// but on linux we need to use a cosmopolitan extension to get details
|
||||
// although doing that slightly weakens the security pledge() provides
|
||||
//
|
||||
// if your operating system doesn't support these security features or
|
||||
// is too old, then pledge() and unveil() don't consider this an error
|
||||
// so it works. if security is critical there's a special call to test
|
||||
// which is npassert(!pledge(0, 0)), and npassert(unveil("", 0) != -1)
|
||||
__enable_threads();
|
||||
__pledge_mode = PLEDGE_PENALTY_KILL_THREAD | PLEDGE_STDERR_LOGGING;
|
||||
unveil("/dev/null", "rw");
|
||||
unveil(0, 0);
|
||||
pledge("stdio inet", 0);
|
||||
|
||||
// spawn over 9,000 worker threads
|
||||
th = calloc(threads, sizeof(pthread_t));
|
||||
//
|
||||
// you don't need weird i/o models, or event driven yoyo pattern code
|
||||
// to build a massively scalable server. the secret is to use threads
|
||||
// with tiny stacks. then you can write plain simple imperative code!
|
||||
//
|
||||
// we like pthread attributes since they generally make thread spawns
|
||||
// faster especially in cases where you need to make detached threads
|
||||
//
|
||||
// we block signals in our worker threads so we won't need messy code
|
||||
// to spin on eintr. operating systems also deliver signals to random
|
||||
// threads, and we'd have ctrl-c, etc. be handled by the main thread.
|
||||
//
|
||||
// alternatively you can just use signal() instead of sigaction(); it
|
||||
// uses SA_RESTART because all the syscalls the worker currently uses
|
||||
// are documented as @restartable which means no EINTR toil is needed
|
||||
unassert(!pthread_cond_init(&statuscond, 0));
|
||||
unassert(!pthread_mutex_init(&statuslock, 0));
|
||||
sigset_t block;
|
||||
sigfillset(&block);
|
||||
sigdelset(&block, SIGSEGV); // invalid memory access
|
||||
sigdelset(&block, SIGBUS); // another kind of bad memory access
|
||||
sigdelset(&block, SIGFPE); // divide by zero, etc.
|
||||
sigdelset(&block, SIGSYS); // pledge violations
|
||||
sigdelset(&block, SIGILL); // bad cpu opcode
|
||||
pthread_attr_t attr;
|
||||
unassert(!pthread_attr_init(&attr));
|
||||
unassert(!pthread_attr_setsigmask_np(&attr, &block));
|
||||
pthread_t *th = gc(calloc(threads, sizeof(pthread_t)));
|
||||
for (i = 0; i < threads; ++i) {
|
||||
++workers;
|
||||
if ((rc = pthread_create(th + i, 0, Worker, (void *)(intptr_t)i))) {
|
||||
--workers;
|
||||
kprintf("error: pthread_create(%d) failed %s\n", i, strerror(rc));
|
||||
int rc;
|
||||
++a_workers;
|
||||
if ((rc = pthread_create(th + i, &attr, Worker, (void *)(intptr_t)i))) {
|
||||
--a_workers;
|
||||
// rc will most likely be EAGAIN (we hit the process/thread limit)
|
||||
kprintf("\r\e[Kerror: pthread_create(%d) failed: %s\n"
|
||||
" try increasing RLIMIT_NPROC\n",
|
||||
i, strerror(rc));
|
||||
if (!i) exit(1);
|
||||
threads = i;
|
||||
break;
|
||||
}
|
||||
if (!(i % 500)) {
|
||||
if (!(i % 50)) {
|
||||
PrintStatus();
|
||||
}
|
||||
}
|
||||
unassert(!pthread_attr_destroy(&attr));
|
||||
|
||||
// wait for workers to terminate
|
||||
while (workers) {
|
||||
unassert(!pthread_mutex_lock(&statuslock));
|
||||
while (!a_termsig) {
|
||||
PrintStatus();
|
||||
usleep(HEARTBEAT * 1000);
|
||||
unassert(!pthread_cond_wait(&statuscond, &statuslock));
|
||||
usleep(20);
|
||||
}
|
||||
unassert(!pthread_mutex_unlock(&statuslock));
|
||||
|
||||
// cancel all the worker threads so they shut down asap
|
||||
// and it'll wait on active clients to gracefully close
|
||||
// you've never seen a production server close so fast!
|
||||
for (i = 0; i < threads; ++i) {
|
||||
pthread_cancel(th[i]);
|
||||
}
|
||||
|
||||
// print status in terminal as the shutdown progresses
|
||||
unassert(!pthread_mutex_lock(&statuslock));
|
||||
while (a_workers) {
|
||||
unassert(!pthread_cond_wait(&statuscond, &statuslock));
|
||||
PrintStatus();
|
||||
}
|
||||
unassert(!pthread_mutex_unlock(&statuslock));
|
||||
|
||||
// wait for final termination and free thread memory
|
||||
for (i = 0; i < threads; ++i) {
|
||||
unassert(!pthread_join(th[i], 0));
|
||||
}
|
||||
|
||||
// clean up terminal line
|
||||
kprintf("\r\e[K");
|
||||
kprintf("\r\e[Kthank you for choosing \e[32mgreenbean\e[0m\n");
|
||||
|
||||
// join the workers
|
||||
for (i = 0; i < threads; ++i) {
|
||||
pthread_join(th[i], 0);
|
||||
// clean up more resources
|
||||
unassert(!pthread_mutex_destroy(&statuslock));
|
||||
unassert(!pthread_cond_destroy(&statuscond));
|
||||
|
||||
// quality assurance
|
||||
if (IsModeDbg()) {
|
||||
CheckForMemoryLeaks();
|
||||
}
|
||||
|
||||
// clean up memory
|
||||
free(hostips);
|
||||
free(th);
|
||||
// propagate termination signal
|
||||
signal(a_termsig, SIG_DFL);
|
||||
raise(a_termsig);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue