Do code cleanup use duff device linenoise i/o

This commit is contained in:
Justine Tunney 2022-04-22 18:55:28 -07:00
parent 6ff46ca373
commit 2f56ebfe78
79 changed files with 1393 additions and 1484 deletions

View file

@ -39,6 +39,7 @@
#include "libc/fmt/conv.h"
#include "libc/fmt/itoa.h"
#include "libc/intrin/nomultics.internal.h"
#include "libc/intrin/spinlock.h"
#include "libc/log/backtrace.internal.h"
#include "libc/log/check.h"
#include "libc/log/log.h"
@ -50,10 +51,12 @@
#include "libc/nexgen32e/bsf.h"
#include "libc/nexgen32e/bsr.h"
#include "libc/nexgen32e/crc32.h"
#include "libc/nexgen32e/nt2sysv.h"
#include "libc/nexgen32e/rdtsc.h"
#include "libc/nexgen32e/rdtscp.h"
#include "libc/nt/enum/fileflagandattributes.h"
#include "libc/nt/runtime.h"
#include "libc/nt/thread.h"
#include "libc/rand/rand.h"
#include "libc/runtime/clktck.h"
#include "libc/runtime/directmap.internal.h"
@ -362,6 +365,7 @@ static bool printport;
static bool daemonize;
static bool logrusage;
static bool logbodies;
static bool isterminal;
static bool sslcliused;
static bool loglatency;
static bool terminated;
@ -835,12 +839,13 @@ static void DescribeAddress(char buf[40], uint32_t addr, uint16_t port) {
char *p;
const char *s;
p = buf;
p += uint64toarray_radix10((addr & 0xFF000000) >> 030, p), *p++ = '.';
p += uint64toarray_radix10((addr & 0x00FF0000) >> 020, p), *p++ = '.';
p += uint64toarray_radix10((addr & 0x0000FF00) >> 010, p), *p++ = '.';
p += uint64toarray_radix10((addr & 0x000000FF) >> 000, p), *p++ = ':';
p += uint64toarray_radix10(port, p);
*p++ = '\0';
p = FormatUint64(p, (addr & 0xFF000000) >> 030), *p++ = '.';
p = FormatUint64(p, (addr & 0x00FF0000) >> 020), *p++ = '.';
p = FormatUint64(p, (addr & 0x0000FF00) >> 010), *p++ = '.';
p = FormatUint64(p, (addr & 0x000000FF) >> 000), *p++ = ':';
p = FormatUint64(p, port);
*p = '\0';
assert(p - buf < 40);
}
static inline void GetServerAddr(uint32_t *ip, uint16_t *port) {
@ -1029,7 +1034,7 @@ static void Daemonize(void) {
umask(0);
if (pidpath) {
fd = open(pidpath, O_CREAT | O_WRONLY, 0644);
WRITE(fd, ibuf, uint64toarray_radix10(getpid(), ibuf));
WRITE(fd, ibuf, FormatInt32(ibuf, getpid()) - ibuf);
close(fd);
}
if (!logpath) ProgramLogPath("/dev/null");
@ -1486,6 +1491,7 @@ static void CertsDestroy(void) {
static void WipeServingKeys(void) {
size_t i;
long double t = nowl();
if (uniprocess) return;
mbedtls_ssl_ticket_free(&ssltick);
mbedtls_ssl_key_cert_free(conf.key_cert), conf.key_cert = 0;
@ -2083,7 +2089,7 @@ static char *AppendExpires(char *p, int64_t t) {
static char *AppendCache(char *p, int64_t seconds) {
if (seconds < 0) return p;
p = stpcpy(p, "Cache-Control: max-age=");
p += uint64toarray_radix10(seconds, p);
p = FormatUint64(p, seconds);
if (seconds) {
p = stpcpy(p, ", public");
} else {
@ -2095,21 +2101,21 @@ static char *AppendCache(char *p, int64_t seconds) {
static inline char *AppendContentLength(char *p, size_t n) {
p = stpcpy(p, "Content-Length: ");
p += uint64toarray_radix10(n, p);
p = FormatUint64(p, n);
return AppendCrlf(p);
}
static char *AppendContentRange(char *p, long a, long b, long c) {
p = stpcpy(p, "Content-Range: bytes ");
if (a >= 0 && b > 0) {
p += uint64toarray_radix10(a, p);
p = FormatUint64(p, a);
*p++ = '-';
p += uint64toarray_radix10(a + b - 1, p);
p = FormatUint64(p, a + b - 1);
} else {
*p++ = '*';
}
*p++ = '/';
p += uint64toarray_radix10(c, p);
p = FormatUint64(p, c);
return AppendCrlf(p);
}
@ -6396,6 +6402,7 @@ static bool StreamResponse(char *p) {
static bool HandleMessageActual(void) {
int rc;
char *p;
long double now;
if ((rc = ParseHttpMessage(&msg, inbuf.p, amtread)) != -1) {
if (!rc) return false;
hdrsize = rc;
@ -6437,8 +6444,11 @@ static bool HandleMessageActual(void) {
LockInc(&shared->c.messageshandled);
++messageshandled;
if (loglatency || LOGGABLE(kLogDebug)) {
LOGF(kLogDebug, "(stat) %`'.*s latency %,ldµs", msg.uri.b - msg.uri.a,
inbuf.p + msg.uri.a, (long)((nowl() - startrequest) * 1e6L));
now = nowl();
LOGF(kLogDebug, "(stat) %`'.*s latency r: %,ldµs c: %,ldµs",
msg.uri.b - msg.uri.a, inbuf.p + msg.uri.a,
(long)((now - startrequest) * 1e6L),
(long)((now - startconnection) * 1e6L));
}
if (!generator) {
return TransmitResponse(p);
@ -6702,7 +6712,6 @@ static int EnableSandbox(void) {
}
}
// returns 0 otherwise -1 if worker needs to unwind stack and exit
static int HandleConnection(size_t i) {
int pid, rc = 0;
clientaddrsize = sizeof(clientaddr);
@ -6731,10 +6740,16 @@ static int HandleConnection(size_t i) {
__kbirth = rdtsc();
}
if (funtrace) {
ftrace_install();
if (ftrace_install() != -1) {
g_ftrace = 1;
} else {
WARNF("ftrace failed to install %m");
}
}
}
CHECK_NE(-1, EnableSandbox());
if (sandboxed) {
CHECK_NE(-1, EnableSandbox());
}
if (hasonworkerstart) {
CallSimpleHook("OnWorkerStart");
}
@ -6824,45 +6839,6 @@ static int HandleConnection(size_t i) {
return rc;
}
// returns 2 if we should stay in the redbean event loop
// returns 1 if poll() says stdin has user input available
// returns 0 if poll() timed out after ms
// returns -1 if worker is unwinding exit
static int HandlePoll(int ms) {
size_t i;
int nfds;
if ((nfds = poll(polls, 1 + servers.n, ms)) != -1) {
for (i = 0; i < servers.n; ++i) {
if (polls[1 + i].revents) {
serveraddr = &servers.p[i].addr;
ishandlingconnection = true;
if (HandleConnection(i) == -1) return -1;
ishandlingconnection = false;
}
}
// are we polling stdin for the repl?
if (polls[0].fd >= 0) {
if (polls[0].revents) {
return 1; // user entered a keystroke
} else if (!nfds) {
return 0; // let linenoise know it timed out
}
}
} else {
if (errno == EINTR || errno == EAGAIN) {
LockInc(&shared->c.pollinterrupts);
} else if (errno == ENOMEM) {
LockInc(&shared->c.enomems);
WARNF("(srvr) %s ran out of memory");
meltdown = true;
} else {
DIEF("(srvr) poll error: %m");
}
errno = 0;
}
return 2;
}
static void RestoreApe(void) {
char *p;
size_t n;
@ -6883,6 +6859,97 @@ static void RestoreApe(void) {
}
}
static int HandleReadline(void) {
int status;
for (;;) {
status = lua_loadline(GL);
if (status < 0) {
if (status == -1) {
OnTerm(SIGHUP); // eof
INFOF("got repl eof");
write(1, "\r\n", 2);
return -1;
} else if (errno == EINTR) {
errno = 0;
OnInt(SIGINT);
INFOF("got repl interrupt");
return 0;
} else if (errno == EAGAIN) {
errno = 0;
return 0;
} else {
OnTerm(SIGIO); // error
return -1;
}
}
write(1, "\r\n", 2);
linenoiseDisableRawMode();
_spinlock(&lualock);
if (status == LUA_OK) {
status = lua_runchunk(GL, 0, LUA_MULTRET);
}
if (status == LUA_OK) {
lua_l_print(GL);
} else {
lua_report(GL, status);
}
_spunlock(&lualock);
if (lua_repl_isterminal) {
linenoiseEnableRawMode(0);
}
}
}
static int HandlePoll(int ms) {
int rc, nfds;
size_t pollid, serverid;
if ((nfds = poll(polls, 1 + servers.n, ms)) != -1) {
if (nfds) {
// handle pollid/o events
for (pollid = 0; pollid < 1 + servers.n; ++pollid) {
if (!polls[pollid].revents) continue;
if (polls[pollid].fd < 0) continue;
if (polls[pollid].fd) {
// handle listen socket
serverid = pollid - 1;
assert(0 <= serverid && serverid < servers.n);
serveraddr = &servers.p[serverid].addr;
ishandlingconnection = true;
_spinlock(&lualock);
rc = HandleConnection(serverid);
_spunlock(&lualock);
ishandlingconnection = false;
if (rc == -1) return -1;
} else {
// handle standard input
rc = HandleReadline();
if (rc == -1) return rc;
}
}
} else if (__replmode) {
// handle refresh repl line
if (!IsWindows()) {
rc = HandleReadline();
if (rc < 0) return rc;
} else {
linenoiseRefreshLine(lua_repl_linenoise);
}
}
} else {
if (errno == EINTR || errno == EAGAIN) {
LockInc(&shared->c.pollinterrupts);
} else if (errno == ENOMEM) {
LockInc(&shared->c.enomems);
WARNF("(srvr) %s ran out of memory");
meltdown = true;
} else {
DIEF("(srvr) poll error: %m");
}
errno = 0;
}
return 0;
}
static void Listen(void) {
char ipbuf[16];
size_t i, j, n;
@ -6960,10 +7027,9 @@ static void HandleShutdown(void) {
}
// this function coroutines with linenoise
static int EventLoop(int fd, int ms) {
int rc;
static int EventLoop(int ms) {
long double t;
rc = -1;
VERBOSEF("EventLoop()");
while (!terminated) {
errno = 0;
if (zombied) {
@ -6977,54 +7043,47 @@ static int EventLoop(int fd, int ms) {
} else if ((t = nowl()) - lastheartbeat > HEARTBEAT / 1000.) {
lastheartbeat = t;
HandleHeartbeat();
} else if ((rc = HandlePoll(ms)) != 2) {
break; // return control to linenoise
} else if (HandlePoll(ms) == -1) {
break;
}
}
return rc;
return -1;
}
static void ReplEventLoop(void) {
int status;
long double t;
lua_State *L = GL;
DEBUGF("ReplEventLoop()");
polls[0].fd = 0;
__nomultics = 2;
__replmode = true;
lua_initrepl("redbean");
linenoiseSetPollCallback(EventLoop);
for (;;) {
if ((status = lua_loadline(L)) == -1) {
if (errno == EINTR) {
LockInc(&shared->c.pollinterrupts);
if (terminated) {
break;
} else {
continue;
}
} else {
break;
}
}
if (status == LUA_OK) {
status = lua_runchunk(L, 0, LUA_MULTRET);
}
if (status == LUA_OK) {
lua_l_print(L);
} else {
lua_report(L, status);
}
lua_initrepl(GL, "redbean");
if (lua_repl_isterminal) {
linenoiseEnableRawMode(0);
}
if (!terminated && !isexitingworker) {
OnTerm(SIGHUP); // eof event
}
lua_settop(L, 0); // clear stack
lua_writeline();
__replmode = false;
__nomultics = 0;
EventLoop(100);
linenoiseDisableRawMode();
lua_freerepl();
lua_settop(GL, 0); // clear stack
polls[0].fd = -1;
}
static uint32_t WindowsReplThread(void *arg) {
DEBUGF("WindowsReplThread()");
lua_repl_blocking = true;
lua_initrepl(GL, "redbean");
if (lua_repl_isterminal) {
linenoiseEnableRawMode(0);
}
for (;;) {
if (HandleReadline() == -1) {
break;
}
}
linenoiseDisableRawMode();
lua_freerepl();
_spinlock(&lualock);
lua_settop(GL, 0); // clear stack
_spunlock(&lualock);
return 0;
}
static void SigInit(void) {
xsigaction(SIGINT, OnInt, 0, 0, 0);
xsigaction(SIGHUP, OnHup, 0, 0, 0);
@ -7217,13 +7276,6 @@ void RedBean(int argc, char *argv[]) {
if (daemonize) {
Daemonize();
} else {
// xxx: create process group to make it easier to propagate SIGTERM
// to children. the downside to doing this seems to be that
// ctrl-c isn't propagating as expected when running redbean
// underneath strace.com :|
if (!IsWindows()) {
setpgrp();
}
if (logpath) {
close(2);
open(logpath, O_APPEND | O_WRONLY | O_CREAT, 0640);
@ -7240,14 +7292,18 @@ void RedBean(int argc, char *argv[]) {
isinitialized = true;
CallSimpleHookIfDefined("OnServerStart");
#ifdef STATIC
EventLoop(-1, HEARTBEAT);
EventLoop(HEARTBEAT);
#else
GetResolvConf(); // for effect
GetHostsTxt(); // for effect
if (!IsWindows() && isatty(0)) {
ReplEventLoop();
GetResolvConf(); // for effect
if (daemonize || !linenoiseIsTerminal()) {
EventLoop(HEARTBEAT);
} else if (IsWindows()) {
uint32_t tid;
CreateThread(0, 0, NT2SYSV(WindowsReplThread), 0, 0, &tid);
EventLoop(100);
} else {
EventLoop(-1, HEARTBEAT);
ReplEventLoop();
}
#endif
if (!isexitingworker) {