mirror of
https://github.com/jart/cosmopolitan.git
synced 2025-07-06 03:08:31 +00:00
Overhaul process spawning
This commit is contained in:
parent
99dc1281f5
commit
26e254fb4d
96 changed files with 1848 additions and 1541 deletions
|
@ -16,8 +16,12 @@
|
|||
│ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │
|
||||
│ PERFORMANCE OF THIS SOFTWARE. │
|
||||
╚─────────────────────────────────────────────────────────────────────────────*/
|
||||
#include "libc/assert.h"
|
||||
#include "libc/atomic.h"
|
||||
#include "libc/calls/calls.h"
|
||||
#include "libc/calls/struct/rusage.h"
|
||||
#include "libc/calls/struct/sigaction.h"
|
||||
#include "libc/calls/struct/sigset.h"
|
||||
#include "libc/calls/struct/stat.h"
|
||||
#include "libc/calls/struct/timespec.h"
|
||||
#include "libc/calls/struct/timeval.h"
|
||||
|
@ -26,19 +30,27 @@
|
|||
#include "libc/fmt/conv.h"
|
||||
#include "libc/fmt/libgen.h"
|
||||
#include "libc/intrin/bits.h"
|
||||
#include "libc/intrin/kprintf.h"
|
||||
#include "libc/log/appendresourcereport.internal.h"
|
||||
#include "libc/log/check.h"
|
||||
#include "libc/log/log.h"
|
||||
#include "libc/macros.internal.h"
|
||||
#include "libc/mem/gc.h"
|
||||
#include "libc/mem/gc.internal.h"
|
||||
#include "libc/mem/mem.h"
|
||||
#include "libc/nexgen32e/crc32.h"
|
||||
#include "libc/runtime/runtime.h"
|
||||
#include "libc/runtime/syslib.internal.h"
|
||||
#include "libc/sock/sock.h"
|
||||
#include "libc/sock/struct/pollfd.h"
|
||||
#include "libc/sock/struct/sockaddr.h"
|
||||
#include "libc/stdio/append.h"
|
||||
#include "libc/stdio/posix_spawn.h"
|
||||
#include "libc/stdio/rand.h"
|
||||
#include "libc/stdio/stdio.h"
|
||||
#include "libc/str/str.h"
|
||||
#include "libc/sysv/consts/af.h"
|
||||
#include "libc/sysv/consts/at.h"
|
||||
#include "libc/sysv/consts/clock.h"
|
||||
#include "libc/sysv/consts/ex.h"
|
||||
#include "libc/sysv/consts/exit.h"
|
||||
#include "libc/sysv/consts/f.h"
|
||||
|
@ -48,15 +60,21 @@
|
|||
#include "libc/sysv/consts/itimer.h"
|
||||
#include "libc/sysv/consts/o.h"
|
||||
#include "libc/sysv/consts/poll.h"
|
||||
#include "libc/sysv/consts/posix.h"
|
||||
#include "libc/sysv/consts/sa.h"
|
||||
#include "libc/sysv/consts/sig.h"
|
||||
#include "libc/sysv/consts/so.h"
|
||||
#include "libc/sysv/consts/sock.h"
|
||||
#include "libc/sysv/consts/sol.h"
|
||||
#include "libc/sysv/consts/w.h"
|
||||
#include "libc/temp.h"
|
||||
#include "libc/thread/thread.h"
|
||||
#include "libc/thread/thread2.h"
|
||||
#include "libc/time/struct/tm.h"
|
||||
#include "libc/time/time.h"
|
||||
#include "libc/x/x.h"
|
||||
#include "libc/x/xasprintf.h"
|
||||
#include "libc/x/xsigaction.h"
|
||||
#include "net/http/escape.h"
|
||||
#include "net/https/https.h"
|
||||
#include "third_party/getopt/getopt.internal.h"
|
||||
#include "third_party/mbedtls/ssl.h"
|
||||
|
@ -104,45 +122,64 @@
|
|||
#define kLogFile "o/runitd.log"
|
||||
#define kLogMaxBytes (2 * 1000 * 1000)
|
||||
|
||||
#define LOG_LEVEL_WARN 0
|
||||
#define LOG_LEVEL_INFO 1
|
||||
#define LOG_LEVEL_VERB 3
|
||||
#define LOG_LEVEL_DEBU 3
|
||||
|
||||
#define DEBUF(FMT, ...) LOGF(DEBU, FMT, ##__VA_ARGS__)
|
||||
#define VERBF(FMT, ...) LOGF(VERB, FMT, ##__VA_ARGS__)
|
||||
#define INFOF(FMT, ...) LOGF(INFO, FMT, ##__VA_ARGS__)
|
||||
#define WARNF(FMT, ...) LOGF(WARN, FMT, ##__VA_ARGS__)
|
||||
|
||||
#define LOGF(LVL, FMT, ...) \
|
||||
do { \
|
||||
if (g_log_level >= LOG_LEVEL_##LVL) { \
|
||||
kprintf("%r" #LVL " %6P %'18T %s:%d " FMT "\n", __FILE__, __LINE__, \
|
||||
##__VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
struct Client {
|
||||
int fd;
|
||||
int pid;
|
||||
int pipe[2];
|
||||
pthread_t th;
|
||||
uint32_t addrsize;
|
||||
struct sockaddr_in addr;
|
||||
bool once;
|
||||
int zstatus;
|
||||
z_stream zs;
|
||||
struct {
|
||||
size_t off;
|
||||
size_t len;
|
||||
size_t cap;
|
||||
char *data;
|
||||
} rbuf;
|
||||
char *output;
|
||||
char exepath[128];
|
||||
char buf[32768];
|
||||
};
|
||||
|
||||
char *g_psk;
|
||||
int g_log_level;
|
||||
bool use_ftrace;
|
||||
bool use_strace;
|
||||
char *g_exepath;
|
||||
unsigned char g_buf[4096];
|
||||
volatile bool g_interrupted;
|
||||
char g_hostname[256];
|
||||
int g_bogusfd, g_servfd;
|
||||
atomic_bool g_interrupted;
|
||||
struct sockaddr_in g_servaddr;
|
||||
bool g_daemonize, g_sendready;
|
||||
int g_timeout, g_bogusfd, g_servfd, g_clifd, g_exefd;
|
||||
|
||||
void OnInterrupt(int sig) {
|
||||
g_interrupted = true;
|
||||
}
|
||||
|
||||
void OnChildTerminated(int sig) {
|
||||
int e, ws, pid;
|
||||
sigset_t ss, oldss;
|
||||
e = errno; // SIGCHLD can be called asynchronously
|
||||
sigfillset(&ss);
|
||||
sigdelset(&ss, SIGTERM);
|
||||
sigprocmask(SIG_BLOCK, &ss, &oldss);
|
||||
for (;;) {
|
||||
if ((pid = waitpid(-1, &ws, WNOHANG)) != -1) {
|
||||
if (pid) {
|
||||
if (WIFEXITED(ws)) {
|
||||
DEBUGF("worker %d exited with %d", pid, WEXITSTATUS(ws));
|
||||
} else {
|
||||
DEBUGF("worker %d terminated with %s", pid, strsignal(WTERMSIG(ws)));
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (errno == EINTR) continue;
|
||||
if (errno == ECHILD) break;
|
||||
FATALF("waitpid failed in sigchld");
|
||||
}
|
||||
void Close(int *fd) {
|
||||
if (*fd > 0) {
|
||||
close(*fd);
|
||||
*fd = -1; // poll ignores -1
|
||||
}
|
||||
sigprocmask(SIG_SETMASK, &oldss, 0);
|
||||
errno = e;
|
||||
}
|
||||
|
||||
wontreturn void ShowUsage(FILE *f, int rc) {
|
||||
|
@ -151,9 +188,18 @@ wontreturn void ShowUsage(FILE *f, int rc) {
|
|||
exit(rc);
|
||||
}
|
||||
|
||||
char *DescribeAddress(struct sockaddr_in *addr) {
|
||||
static _Thread_local char res[64];
|
||||
char ip4buf[64];
|
||||
sprintf(res, "%s:%hu",
|
||||
inet_ntop(addr->sin_family, &addr->sin_addr.s_addr, ip4buf,
|
||||
sizeof(ip4buf)),
|
||||
ntohs(addr->sin_port));
|
||||
return res;
|
||||
}
|
||||
|
||||
void GetOpts(int argc, char *argv[]) {
|
||||
int opt;
|
||||
g_timeout = RUNITD_TIMEOUT_MS;
|
||||
g_servaddr.sin_family = AF_INET;
|
||||
g_servaddr.sin_port = htons(RUNITD_PORT);
|
||||
g_servaddr.sin_addr.s_addr = INADDR_ANY;
|
||||
|
@ -166,10 +212,10 @@ void GetOpts(int argc, char *argv[]) {
|
|||
use_strace = true;
|
||||
break;
|
||||
case 'q':
|
||||
--__log_level;
|
||||
--g_log_level;
|
||||
break;
|
||||
case 'v':
|
||||
++__log_level;
|
||||
++g_log_level;
|
||||
break;
|
||||
case 'd':
|
||||
g_daemonize = true;
|
||||
|
@ -178,56 +224,44 @@ void GetOpts(int argc, char *argv[]) {
|
|||
g_sendready = true;
|
||||
break;
|
||||
case 't':
|
||||
g_timeout = atoi(optarg);
|
||||
break;
|
||||
case 'p':
|
||||
CHECK_NE(0xFFFF, (g_servaddr.sin_port = htons(parseport(optarg))));
|
||||
g_servaddr.sin_port = htons(parseport(optarg));
|
||||
break;
|
||||
case 'l':
|
||||
CHECK_EQ(1, inet_pton(AF_INET, optarg, &g_servaddr.sin_addr));
|
||||
inet_pton(AF_INET, optarg, &g_servaddr.sin_addr);
|
||||
break;
|
||||
case 'h':
|
||||
ShowUsage(stdout, EXIT_SUCCESS);
|
||||
__builtin_unreachable();
|
||||
default:
|
||||
ShowUsage(stderr, EX_USAGE);
|
||||
__builtin_unreachable();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
__wur char *DescribeAddress(struct sockaddr_in *addr) {
|
||||
char ip4buf[16];
|
||||
return xasprintf("%s:%hu",
|
||||
inet_ntop(addr->sin_family, &addr->sin_addr.s_addr, ip4buf,
|
||||
sizeof(ip4buf)),
|
||||
ntohs(addr->sin_port));
|
||||
}
|
||||
|
||||
void StartTcpServer(void) {
|
||||
int yes = true;
|
||||
uint32_t asize;
|
||||
|
||||
/*
|
||||
* TODO: How can we make close(serversocket) on Windows go fast?
|
||||
* That way we can put back SOCK_CLOEXEC.
|
||||
*/
|
||||
CHECK_NE(-1, (g_servfd =
|
||||
socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP)));
|
||||
|
||||
struct timeval timeo = {30};
|
||||
g_servfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
|
||||
if (g_servfd == -1) {
|
||||
fprintf(stderr, program_invocation_short_name,
|
||||
": socket failed: ", strerror(errno), "\n", NULL);
|
||||
exit(1);
|
||||
}
|
||||
struct timeval timeo = {DEATH_CLOCK_SECONDS / 10};
|
||||
setsockopt(g_servfd, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo));
|
||||
setsockopt(g_servfd, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo));
|
||||
|
||||
LOGIFNEG1(setsockopt(g_servfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)));
|
||||
setsockopt(g_servfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
|
||||
if (bind(g_servfd, (struct sockaddr *)&g_servaddr, sizeof(g_servaddr)) ==
|
||||
-1) {
|
||||
FATALF("bind failed %m");
|
||||
fprintf(stderr, program_invocation_short_name,
|
||||
": bind failed: ", strerror(errno), "\n", NULL);
|
||||
exit(1);
|
||||
}
|
||||
CHECK_NE(-1, listen(g_servfd, 10));
|
||||
unassert(!listen(g_servfd, 10));
|
||||
asize = sizeof(g_servaddr);
|
||||
CHECK_NE(-1, getsockname(g_servfd, (struct sockaddr *)&g_servaddr, &asize));
|
||||
INFOF("%s:%s", "listening on tcp", _gc(DescribeAddress(&g_servaddr)));
|
||||
unassert(!getsockname(g_servfd, (struct sockaddr *)&g_servaddr, &asize));
|
||||
INFOF("listening on tcp:%s", DescribeAddress(&g_servaddr));
|
||||
if (g_sendready) {
|
||||
printf("ready %hu\n", ntohs(g_servaddr.sin_port));
|
||||
fflush(stdout);
|
||||
|
@ -237,22 +271,28 @@ void StartTcpServer(void) {
|
|||
}
|
||||
|
||||
void SendExitMessage(int rc) {
|
||||
EzSanity();
|
||||
int res;
|
||||
unsigned char msg[4 + 1 + 1];
|
||||
DEBUF("SendExitMessage");
|
||||
msg[0 + 0] = (RUNITD_MAGIC & 0xff000000) >> 030;
|
||||
msg[0 + 1] = (RUNITD_MAGIC & 0x00ff0000) >> 020;
|
||||
msg[0 + 2] = (RUNITD_MAGIC & 0x0000ff00) >> 010;
|
||||
msg[0 + 3] = (RUNITD_MAGIC & 0x000000ff) >> 000;
|
||||
msg[4] = kRunitExit;
|
||||
msg[5] = rc;
|
||||
INFOF("mbedtls_ssl_write");
|
||||
CHECK_EQ(sizeof(msg), mbedtls_ssl_write(&ezssl, msg, sizeof(msg)));
|
||||
CHECK_EQ(0, EzTlsFlush(&ezbio, 0, 0));
|
||||
DEBUF("mbedtls_ssl_write");
|
||||
if (sizeof(msg) != (res = mbedtls_ssl_write(&ezssl, msg, sizeof(msg)))) {
|
||||
EzTlsDie("SendExitMessage mbedtls_ssl_write failed", res);
|
||||
}
|
||||
if ((res = EzTlsFlush(&ezbio, 0, 0))) {
|
||||
EzTlsDie("SendExitMessage EzTlsFlush failed", res);
|
||||
}
|
||||
}
|
||||
|
||||
void SendOutputFragmentMessage(enum RunitCommand kind, unsigned char *buf,
|
||||
size_t size) {
|
||||
void SendOutputFragmentMessage(enum RunitCommand kind, char *buf, size_t size) {
|
||||
EzSanity();
|
||||
ssize_t rc;
|
||||
size_t sent;
|
||||
unsigned char msg[4 + 1 + 4];
|
||||
msg[0 + 0] = (RUNITD_MAGIC & 0xff000000) >> 030;
|
||||
msg[0 + 1] = (RUNITD_MAGIC & 0x00ff0000) >> 020;
|
||||
|
@ -263,309 +303,451 @@ void SendOutputFragmentMessage(enum RunitCommand kind, unsigned char *buf,
|
|||
msg[5 + 1] = (size & 0x00ff0000) >> 020;
|
||||
msg[5 + 2] = (size & 0x0000ff00) >> 010;
|
||||
msg[5 + 3] = (size & 0x000000ff) >> 000;
|
||||
INFOF("mbedtls_ssl_write");
|
||||
CHECK_EQ(sizeof(msg), mbedtls_ssl_write(&ezssl, msg, sizeof(msg)));
|
||||
while (size) {
|
||||
CHECK_NE(-1, (rc = mbedtls_ssl_write(&ezssl, buf, size)));
|
||||
CHECK_LE((sent = (size_t)rc), size);
|
||||
size -= sent;
|
||||
buf += sent;
|
||||
DEBUF("mbedtls_ssl_write");
|
||||
if (sizeof(msg) != (rc = mbedtls_ssl_write(&ezssl, msg, sizeof(msg)))) {
|
||||
EzTlsDie("SendOutputFragmentMessage mbedtls_ssl_write failed", rc);
|
||||
}
|
||||
while (size) {
|
||||
if ((rc = mbedtls_ssl_write(&ezssl, buf, size)) <= 0) {
|
||||
EzTlsDie("SendOutputFragmentMessage mbedtls_ssl_write #2 failed", rc);
|
||||
}
|
||||
size -= rc;
|
||||
buf += rc;
|
||||
}
|
||||
if ((rc = EzTlsFlush(&ezbio, 0, 0))) {
|
||||
EzTlsDie("SendOutputFragmentMessage EzTlsFlush failed", rc);
|
||||
}
|
||||
CHECK_EQ(0, EzTlsFlush(&ezbio, 0, 0));
|
||||
}
|
||||
|
||||
void Recv(void *output, size_t outputsize) {
|
||||
void Recv(struct Client *client, void *output, size_t outputsize) {
|
||||
EzSanity();
|
||||
ssize_t chunk, received, totalgot;
|
||||
static bool once;
|
||||
static int zstatus;
|
||||
static char buf[32768];
|
||||
static z_stream zs;
|
||||
static struct {
|
||||
size_t off;
|
||||
size_t len;
|
||||
size_t cap;
|
||||
char *data;
|
||||
} rbuf;
|
||||
if (!once) {
|
||||
CHECK_EQ(Z_OK, inflateInit(&zs));
|
||||
once = true;
|
||||
if (!client->once) {
|
||||
unassert(Z_OK == inflateInit(&client->zs));
|
||||
client->once = true;
|
||||
}
|
||||
totalgot = 0;
|
||||
for (;;) {
|
||||
if (rbuf.len >= outputsize) {
|
||||
memcpy(output, rbuf.data + rbuf.off, outputsize);
|
||||
rbuf.len -= outputsize;
|
||||
rbuf.off += outputsize;
|
||||
if (client->rbuf.len >= outputsize) {
|
||||
memcpy(output, client->rbuf.data + client->rbuf.off, outputsize);
|
||||
client->rbuf.len -= outputsize;
|
||||
client->rbuf.off += outputsize;
|
||||
// trim dymanic buffer once it empties
|
||||
if (!rbuf.len) {
|
||||
rbuf.off = 0;
|
||||
rbuf.cap = 4096;
|
||||
rbuf.data = realloc(rbuf.data, rbuf.cap);
|
||||
if (!client->rbuf.len) {
|
||||
client->rbuf.off = 0;
|
||||
client->rbuf.cap = 4096;
|
||||
client->rbuf.data = realloc(client->rbuf.data, client->rbuf.cap);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (zstatus == Z_STREAM_END) {
|
||||
close(g_clifd);
|
||||
FATALF("recv zlib unexpected eof");
|
||||
if (client->zstatus == Z_STREAM_END) {
|
||||
WARNF("recv zlib unexpected eof");
|
||||
pthread_exit(0);
|
||||
}
|
||||
// get another fixed-size data packet from network
|
||||
// pass along error conditions to caller
|
||||
// pass along eof condition to zlib
|
||||
received = mbedtls_ssl_read(&ezssl, buf, sizeof(buf));
|
||||
received = mbedtls_ssl_read(&ezssl, client->buf, sizeof(client->buf));
|
||||
if (!received) {
|
||||
close(g_clifd);
|
||||
TlsDie("got unexpected eof", received);
|
||||
EzTlsDie("got unexpected eof", received);
|
||||
}
|
||||
if (received < 0) {
|
||||
close(g_clifd);
|
||||
TlsDie("read failed", received);
|
||||
EzTlsDie("read failed", received);
|
||||
}
|
||||
totalgot += received;
|
||||
// decompress packet completely
|
||||
// into a dynamical size buffer
|
||||
zs.avail_in = received;
|
||||
zs.next_in = (unsigned char *)buf;
|
||||
CHECK_EQ(Z_OK, zstatus);
|
||||
client->zs.avail_in = received;
|
||||
client->zs.next_in = (unsigned char *)client->buf;
|
||||
unassert(Z_OK == client->zstatus);
|
||||
do {
|
||||
// make sure we have a reasonable capacity for zlib output
|
||||
if (rbuf.cap - (rbuf.off + rbuf.len) < sizeof(buf)) {
|
||||
rbuf.cap += sizeof(buf);
|
||||
rbuf.data = realloc(rbuf.data, rbuf.cap);
|
||||
if (client->rbuf.cap - (client->rbuf.off + client->rbuf.len) <
|
||||
sizeof(client->buf)) {
|
||||
client->rbuf.cap += sizeof(client->buf);
|
||||
client->rbuf.data = realloc(client->rbuf.data, client->rbuf.cap);
|
||||
}
|
||||
// inflate packet, which naturally can be much larger
|
||||
// permit zlib no delay flushes that come from sender
|
||||
zs.next_out = (unsigned char *)rbuf.data + (rbuf.off + rbuf.len);
|
||||
zs.avail_out = chunk = rbuf.cap - (rbuf.off + rbuf.len);
|
||||
zstatus = inflate(&zs, Z_SYNC_FLUSH);
|
||||
CHECK_NE(Z_STREAM_ERROR, zstatus);
|
||||
switch (zstatus) {
|
||||
client->zs.next_out = (unsigned char *)client->rbuf.data +
|
||||
(client->rbuf.off + client->rbuf.len);
|
||||
client->zs.avail_out = chunk =
|
||||
client->rbuf.cap - (client->rbuf.off + client->rbuf.len);
|
||||
client->zstatus = inflate(&client->zs, Z_SYNC_FLUSH);
|
||||
unassert(Z_STREAM_ERROR != client->zstatus);
|
||||
switch (client->zstatus) {
|
||||
case Z_NEED_DICT:
|
||||
WARNF("tls recv Z_NEED_DICT %ld total %ld", received, totalgot);
|
||||
exit(1);
|
||||
pthread_exit(0);
|
||||
case Z_DATA_ERROR:
|
||||
WARNF("tls recv Z_DATA_ERROR %ld total %ld", received, totalgot);
|
||||
exit(1);
|
||||
pthread_exit(0);
|
||||
case Z_MEM_ERROR:
|
||||
WARNF("tls recv Z_MEM_ERROR %ld total %ld", received, totalgot);
|
||||
exit(1);
|
||||
pthread_exit(0);
|
||||
case Z_BUF_ERROR:
|
||||
zstatus = Z_OK; // harmless? nothing for inflate to do
|
||||
break; // it probably just our wraparound eof
|
||||
client->zstatus = Z_OK; // harmless? nothing for inflate to do
|
||||
break; // it probably just our wraparound eof
|
||||
default:
|
||||
rbuf.len += chunk - zs.avail_out;
|
||||
client->rbuf.len += chunk - client->zs.avail_out;
|
||||
break;
|
||||
}
|
||||
} while (!zs.avail_out);
|
||||
} while (!client->zs.avail_out);
|
||||
}
|
||||
}
|
||||
|
||||
void HandleClient(void) {
|
||||
ssize_t got;
|
||||
void SendProgramOutut(struct Client *client) {
|
||||
if (client->output) {
|
||||
SendOutputFragmentMessage(kRunitStderr, client->output,
|
||||
appendz(client->output).i);
|
||||
}
|
||||
}
|
||||
|
||||
void PrintProgramOutput(struct Client *client) {
|
||||
if (client->output) {
|
||||
char *p = client->output;
|
||||
size_t z = appendz(p).i;
|
||||
if ((p = IndentLines(p, z, &z, 2))) {
|
||||
fwrite(p, 1, z, stderr);
|
||||
free(p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FreeClient(struct Client *client) {
|
||||
DEBUF("FreeClient");
|
||||
if (client->pid) {
|
||||
kill(client->pid, SIGHUP);
|
||||
waitpid(client->pid, 0, 0);
|
||||
}
|
||||
Close(&client->fd);
|
||||
if (*client->exepath) {
|
||||
unlink(client->exepath);
|
||||
}
|
||||
if (client->once) {
|
||||
inflateEnd(&client->zs);
|
||||
}
|
||||
EzDestroy();
|
||||
free(client->rbuf.data);
|
||||
free(client->output);
|
||||
free(client);
|
||||
VERBF("---------------");
|
||||
}
|
||||
|
||||
void *ClientWorker(void *arg) {
|
||||
uint32_t crc;
|
||||
sigset_t sigmask;
|
||||
struct sockaddr_in addr;
|
||||
struct timespec now, deadline;
|
||||
int events, wstatus;
|
||||
struct Client *client = arg;
|
||||
uint32_t namesize, filesize;
|
||||
char *addrstr, *exename, *exe;
|
||||
unsigned char msg[4 + 1 + 4 + 4 + 4];
|
||||
uint32_t addrsize, namesize, filesize;
|
||||
int events, exitcode, wstatus, child, pipefds[2];
|
||||
|
||||
/* read request to run program */
|
||||
addrsize = sizeof(addr);
|
||||
INFOF("accept");
|
||||
do {
|
||||
g_clifd =
|
||||
accept4(g_servfd, (struct sockaddr *)&addr, &addrsize, SOCK_CLOEXEC);
|
||||
} while (g_clifd == -1 && errno == EAGAIN);
|
||||
CHECK_NE(-1, g_clifd);
|
||||
if (fork()) {
|
||||
close(g_clifd);
|
||||
return;
|
||||
}
|
||||
EzFd(g_clifd);
|
||||
INFOF("EzHandshake");
|
||||
SetupPresharedKeySsl(MBEDTLS_SSL_IS_SERVER, g_psk);
|
||||
defer(FreeClient, client);
|
||||
|
||||
// read request to run program
|
||||
EzFd(client->fd);
|
||||
DEBUF("EzHandshake");
|
||||
EzHandshake();
|
||||
addrstr = _gc(DescribeAddress(&addr));
|
||||
DEBUGF("%s %s %s", _gc(DescribeAddress(&g_servaddr)), "accepted", addrstr);
|
||||
addrstr = DescribeAddress(&client->addr);
|
||||
DEBUF("%s %s %s", DescribeAddress(&g_servaddr), "accepted", addrstr);
|
||||
|
||||
Recv(msg, sizeof(msg));
|
||||
CHECK_EQ(RUNITD_MAGIC, READ32BE(msg));
|
||||
CHECK_EQ(kRunitExecute, msg[4]);
|
||||
// get the executable
|
||||
Recv(client, msg, sizeof(msg));
|
||||
if (READ32BE(msg) != RUNITD_MAGIC) {
|
||||
WARNF("%s magic mismatch!", addrstr);
|
||||
pthread_exit(0);
|
||||
}
|
||||
if (msg[4] != kRunitExecute) {
|
||||
WARNF("%s unknown command!", addrstr);
|
||||
pthread_exit(0);
|
||||
}
|
||||
namesize = READ32BE(msg + 5);
|
||||
filesize = READ32BE(msg + 9);
|
||||
crc = READ32BE(msg + 13);
|
||||
exename = _gc(calloc(1, namesize + 1));
|
||||
Recv(exename, namesize);
|
||||
g_exepath = _gc(xasprintf("o/%d.%s", getpid(), basename(exename)));
|
||||
INFOF("%s asked we run %`'s (%,u bytes @ %`'s)", addrstr, exename, filesize,
|
||||
g_exepath);
|
||||
|
||||
exe = malloc(filesize);
|
||||
Recv(exe, filesize);
|
||||
exename = gc(calloc(1, namesize + 1));
|
||||
Recv(client, exename, namesize);
|
||||
INFOF("%s sent %#s (%'u bytes @ %#s)", addrstr, exename, filesize,
|
||||
client->exepath);
|
||||
exe = gc(malloc(filesize));
|
||||
Recv(client, exe, filesize);
|
||||
if (crc32_z(0, exe, filesize) != crc) {
|
||||
FATALF("%s crc mismatch! %`'s", addrstr, exename);
|
||||
WARNF("%s crc mismatch! %#s", addrstr, exename);
|
||||
pthread_exit(0);
|
||||
}
|
||||
CHECK_NE(-1, (g_exefd = creat(g_exepath, 0700)));
|
||||
LOGIFNEG1(ftruncate(g_exefd, filesize));
|
||||
CHECK_NE(-1, xwrite(g_exefd, exe, filesize));
|
||||
LOGIFNEG1(close(g_exefd));
|
||||
|
||||
/* run program, tee'ing stderr to both log and client */
|
||||
DEBUGF("spawning %s", exename);
|
||||
// create the executable file
|
||||
// if another thread vforks while we're writing it then a race
|
||||
// condition can happen, where etxtbsy is raised by our execve
|
||||
// we're using o_cloexec so it's guaranteed to fix itself fast
|
||||
// thus we use an optimistic approach to avoid expensive locks
|
||||
sprintf(client->exepath, "o/%s.XXXXXX.com", basename(exename));
|
||||
int exefd = openatemp(AT_FDCWD, client->exepath, 4, O_CLOEXEC, 0700);
|
||||
ftruncate(exefd, filesize);
|
||||
if (write(exefd, exe, filesize) != filesize) {
|
||||
WARNF("%s failed to write %#s", addrstr, exename);
|
||||
close(exefd);
|
||||
pthread_exit(0);
|
||||
}
|
||||
if (close(exefd)) {
|
||||
WARNF("%s failed to close %#s", addrstr, exename);
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
||||
// do the args
|
||||
int i = 0;
|
||||
char *args[8] = {0};
|
||||
if (!IsXnuSilicon()) {
|
||||
exe = client->exepath;
|
||||
} else {
|
||||
exe = "ape-m1.com";
|
||||
args[i++] = (char *)exe;
|
||||
args[i++] = "-";
|
||||
args[i++] = client->exepath;
|
||||
}
|
||||
args[i++] = client->exepath;
|
||||
if (use_strace) args[i++] = "--strace";
|
||||
if (use_ftrace) args[i++] = "--ftrace";
|
||||
|
||||
// run program, tee'ing stderr to both log and client
|
||||
DEBUF("spawning %s", client->exepath);
|
||||
sigemptyset(&sigmask);
|
||||
sigaddset(&sigmask, SIGINT);
|
||||
sigaddset(&sigmask, SIGQUIT);
|
||||
sigaddset(&sigmask, SIGCHLD);
|
||||
sigprocmask(SIG_BLOCK, &sigmask, 0);
|
||||
CHECK_NE(-1, pipe2(pipefds, O_CLOEXEC));
|
||||
CHECK_NE(-1, (child = fork()));
|
||||
if (!child) {
|
||||
dup2(g_bogusfd, 0);
|
||||
dup2(pipefds[1], 1);
|
||||
dup2(pipefds[1], 2);
|
||||
sigemptyset(&sigmask);
|
||||
sigprocmask(SIG_SETMASK, &sigmask, 0);
|
||||
int i = 0;
|
||||
const char *exe;
|
||||
char *args[8] = {0};
|
||||
if (!IsXnuSilicon()) {
|
||||
exe = g_exepath;
|
||||
} else {
|
||||
exe = "ape-m1.com";
|
||||
args[i++] = (char *)exe;
|
||||
args[i++] = "-";
|
||||
args[i++] = g_exepath;
|
||||
|
||||
// spawn the program
|
||||
int etxtbsy_tries = 0;
|
||||
RetryOnEtxtbsyRaceCondition:
|
||||
if (etxtbsy_tries++) {
|
||||
if (etxtbsy_tries == 24) { // ~30 seconds
|
||||
WARNF("%s failed to spawn on %s due because either (1) the ETXTBSY race "
|
||||
"condition kept happening or (2) the program in question actually "
|
||||
"is crashing with SIGVTALRM, without printing anything to out/err!",
|
||||
exename, g_hostname);
|
||||
pthread_exit(0);
|
||||
}
|
||||
if (usleep(1u << etxtbsy_tries)) {
|
||||
INFOF("interrupted exponential spawn backoff");
|
||||
pthread_exit(0);
|
||||
}
|
||||
args[i++] = g_exepath;
|
||||
if (use_strace) args[i++] = "--strace";
|
||||
if (use_ftrace) args[i++] = "--ftrace";
|
||||
execvp(exe, args);
|
||||
_Exit(127);
|
||||
}
|
||||
signal(SIGINT, SIG_IGN);
|
||||
signal(SIGQUIT, SIG_IGN);
|
||||
close(pipefds[1]);
|
||||
DEBUGF("communicating %s[%d]", exename, child);
|
||||
deadline =
|
||||
errno_t err;
|
||||
posix_spawnattr_t spawnattr;
|
||||
posix_spawn_file_actions_t spawnfila;
|
||||
sigemptyset(&sigmask);
|
||||
pipe2(client->pipe, O_CLOEXEC);
|
||||
posix_spawnattr_init(&spawnattr);
|
||||
posix_spawnattr_setflags(&spawnattr, POSIX_SPAWN_SETPGROUP);
|
||||
posix_spawnattr_setsigmask(&spawnattr, &sigmask);
|
||||
posix_spawn_file_actions_init(&spawnfila);
|
||||
posix_spawn_file_actions_adddup2(&spawnfila, g_bogusfd, 0);
|
||||
posix_spawn_file_actions_adddup2(&spawnfila, client->pipe[1], 1);
|
||||
posix_spawn_file_actions_adddup2(&spawnfila, client->pipe[1], 2);
|
||||
err = posix_spawn(&client->pid, exe, &spawnfila, &spawnattr, args, environ);
|
||||
if (err) {
|
||||
Close(&client->pipe[1]);
|
||||
Close(&client->pipe[0]);
|
||||
if (err == ETXTBSY) {
|
||||
goto RetryOnEtxtbsyRaceCondition;
|
||||
}
|
||||
WARNF("%s failed to spawn on %s due to %s", exename, g_hostname,
|
||||
strerror(err));
|
||||
pthread_exit(0);
|
||||
}
|
||||
posix_spawn_file_actions_destroy(&spawnfila);
|
||||
posix_spawnattr_destroy(&spawnattr);
|
||||
Close(&client->pipe[1]);
|
||||
|
||||
DEBUF("communicating %s[%d]", exename, client->pid);
|
||||
struct timespec deadline =
|
||||
timespec_add(timespec_real(), timespec_fromseconds(DEATH_CLOCK_SECONDS));
|
||||
for (;;) {
|
||||
now = timespec_real();
|
||||
if (timespec_cmp(now, deadline) >= 0) {
|
||||
WARNF("%s worker timed out", exename);
|
||||
if (g_interrupted) {
|
||||
WARNF("killing %d %s and hanging up %d due to interrupt", client->fd,
|
||||
exename, client->pid);
|
||||
HangupClientAndTerminateJob:
|
||||
SendProgramOutut(client);
|
||||
mbedtls_ssl_close_notify(&ezssl);
|
||||
TerminateJob:
|
||||
LOGIFNEG1(kill(child, 9));
|
||||
LOGIFNEG1(waitpid(child, 0, 0));
|
||||
LOGIFNEG1(close(g_clifd));
|
||||
LOGIFNEG1(close(pipefds[0]));
|
||||
LOGIFNEG1(unlink(g_exepath));
|
||||
_exit(1);
|
||||
PrintProgramOutput(client);
|
||||
pthread_exit(0);
|
||||
}
|
||||
struct timespec now = timespec_real();
|
||||
if (timespec_cmp(now, deadline) >= 0) {
|
||||
WARNF("killing %s (pid %d) which timed out after %d seconds", exename,
|
||||
client->pid, DEATH_CLOCK_SECONDS);
|
||||
goto HangupClientAndTerminateJob;
|
||||
}
|
||||
struct pollfd fds[2];
|
||||
fds[0].fd = g_clifd;
|
||||
fds[0].fd = client->fd;
|
||||
fds[0].events = POLLIN;
|
||||
fds[1].fd = pipefds[0];
|
||||
fds[1].fd = client->pipe[0];
|
||||
fds[1].events = POLLIN;
|
||||
int waitms = timespec_tomillis(timespec_sub(deadline, now));
|
||||
INFOF("polling for %d ms", waitms);
|
||||
events = poll(fds, ARRAYLEN(fds), waitms);
|
||||
CHECK_NE(-1, events); // EINTR shouldn't be possible
|
||||
events = poll(fds, ARRAYLEN(fds),
|
||||
timespec_tomillis(timespec_sub(deadline, now)));
|
||||
if (events == -1) {
|
||||
if (errno == EINTR) {
|
||||
INFOF("poll interrupted");
|
||||
continue;
|
||||
} else {
|
||||
WARNF("killing %d %s and hanging up %d because poll failed", client->fd,
|
||||
exename, client->pid);
|
||||
goto HangupClientAndTerminateJob;
|
||||
}
|
||||
}
|
||||
if (events) {
|
||||
if (fds[0].revents) {
|
||||
int received;
|
||||
char buf[512];
|
||||
INFOF("mbedtls_ssl_read");
|
||||
received = mbedtls_ssl_read(&ezssl, buf, sizeof(buf));
|
||||
if (!received) {
|
||||
WARNF("%s client disconnected so killing worker %d", exename, child);
|
||||
WARNF("%s client disconnected so killing worker %d", exename,
|
||||
client->pid);
|
||||
goto TerminateJob;
|
||||
}
|
||||
if (received > 0) {
|
||||
WARNF("%s client sent %d unexpected bytes so killing job", exename,
|
||||
received);
|
||||
goto TerminateJob;
|
||||
goto HangupClientAndTerminateJob;
|
||||
}
|
||||
if (received != MBEDTLS_ERR_SSL_WANT_READ) {
|
||||
WARNF("%s client ssl read failed with -0x%04x so killing job",
|
||||
exename, -received);
|
||||
goto TerminateJob;
|
||||
if (received == MBEDTLS_ERR_SSL_WANT_READ) { // EAGAIN SO_RCVTIMEO
|
||||
WARNF("%s (pid %d) is taking a really long time", exename,
|
||||
client->pid);
|
||||
continue;
|
||||
}
|
||||
INFOF("got spurious ssl data");
|
||||
WARNF("client ssl read failed with -0x%04x (%s) so killing %s",
|
||||
-received, GetTlsError(received), exename);
|
||||
goto TerminateJob;
|
||||
}
|
||||
if (fds[1].revents) {
|
||||
INFOF("read");
|
||||
got = read(pipefds[0], g_buf, sizeof(g_buf));
|
||||
CHECK_NE(-1, got); // EINTR shouldn't be possible
|
||||
char buf[512];
|
||||
ssize_t got = read(client->pipe[0], buf, sizeof(buf));
|
||||
if (got == -1) {
|
||||
WARNF("got %s reading %s output", strerror(errno), exename);
|
||||
goto HangupClientAndTerminateJob;
|
||||
}
|
||||
if (!got) {
|
||||
LOGIFNEG1(close(pipefds[0]));
|
||||
VERBF("got eof reading %s output", exename);
|
||||
Close(&client->pipe[0]);
|
||||
break;
|
||||
}
|
||||
fwrite(g_buf, got, 1, stderr);
|
||||
SendOutputFragmentMessage(kRunitStderr, g_buf, got);
|
||||
DEBUF("got %ld bytes reading %s output", got, exename);
|
||||
appendd(&client->output, buf, got);
|
||||
}
|
||||
}
|
||||
}
|
||||
INFOF("waitpid");
|
||||
CHECK_NE(-1, waitpid(child, &wstatus, 0)); // EINTR shouldn't be possible
|
||||
WaitAgain:
|
||||
DEBUF("waitpid");
|
||||
struct rusage rusage;
|
||||
int wrc = wait4(client->pid, &wstatus, 0, &rusage);
|
||||
if (wrc == -1) {
|
||||
if (errno == EINTR) {
|
||||
WARNF("waitpid interrupted; killing %s pid %d", exename, client->pid);
|
||||
kill(client->pid, SIGINT);
|
||||
goto WaitAgain;
|
||||
}
|
||||
WARNF("waitpid failed %m");
|
||||
client->pid = 0;
|
||||
goto HangupClientAndTerminateJob;
|
||||
}
|
||||
client->pid = 0;
|
||||
int exitcode;
|
||||
if (WIFEXITED(wstatus)) {
|
||||
if (WEXITSTATUS(wstatus)) {
|
||||
WARNF("%s exited with %d", exename, WEXITSTATUS(wstatus));
|
||||
WARNF("%s on %s exited with %d", exename, g_hostname,
|
||||
WEXITSTATUS(wstatus));
|
||||
appendf(&client->output, "------ %s %s $?=%d (0x%08x) ------\n",
|
||||
g_hostname, exename, WEXITSTATUS(wstatus), wstatus);
|
||||
} else {
|
||||
VERBOSEF("%s exited with %d", exename, WEXITSTATUS(wstatus));
|
||||
VERBF("%s on %s exited with %d", exename, g_hostname,
|
||||
WEXITSTATUS(wstatus));
|
||||
}
|
||||
exitcode = WEXITSTATUS(wstatus);
|
||||
} else {
|
||||
WARNF("%s terminated with %s", exename, strsignal(WTERMSIG(wstatus)));
|
||||
} else if (WIFSIGNALED(wstatus)) {
|
||||
if (WTERMSIG(wstatus) == SIGVTALRM && !client->output) {
|
||||
free(client->output);
|
||||
client->output = 0;
|
||||
goto RetryOnEtxtbsyRaceCondition;
|
||||
}
|
||||
WARNF("%s on %s terminated with %s", exename, g_hostname,
|
||||
strsignal(WTERMSIG(wstatus)));
|
||||
exitcode = 128 + WTERMSIG(wstatus);
|
||||
appendf(&client->output, "------ %s %s $?=%s (0x%08x) ------\n", g_hostname,
|
||||
exename, strsignal(WTERMSIG(wstatus)), wstatus);
|
||||
} else {
|
||||
WARNF("%s on %s died with wait status 0x%08x", exename, g_hostname,
|
||||
wstatus);
|
||||
exitcode = 127;
|
||||
}
|
||||
LOGIFNEG1(unlink(g_exepath));
|
||||
if (wstatus) {
|
||||
AppendResourceReport(&client->output, &rusage, "\n");
|
||||
PrintProgramOutput(client);
|
||||
}
|
||||
SendProgramOutut(client);
|
||||
SendExitMessage(exitcode);
|
||||
INFOF("mbedtls_ssl_close_notify");
|
||||
mbedtls_ssl_close_notify(&ezssl);
|
||||
LOGIFNEG1(close(g_clifd));
|
||||
_exit(0);
|
||||
if (etxtbsy_tries) {
|
||||
VERBF("encountered %d ETXTBSY race conditions spawning %s", etxtbsy_tries,
|
||||
exename);
|
||||
}
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
||||
int Poll(void) {
|
||||
int i, wait, evcount;
|
||||
struct pollfd fds[1];
|
||||
TryAgain:
|
||||
if (g_interrupted) return 0;
|
||||
fds[0].fd = g_servfd;
|
||||
fds[0].events = POLLIN | POLLERR | POLLHUP;
|
||||
wait = MIN(1000, g_timeout);
|
||||
evcount = poll(fds, ARRAYLEN(fds), wait);
|
||||
if (!evcount) g_timeout -= wait;
|
||||
if (evcount == -1 && errno == EINTR) goto TryAgain;
|
||||
CHECK_NE(-1, evcount);
|
||||
for (i = 0; i < evcount; ++i) {
|
||||
CHECK(fds[i].revents & POLLIN);
|
||||
HandleClient();
|
||||
void HandleClient(void) {
|
||||
struct Client *client;
|
||||
client = calloc(1, sizeof(struct Client));
|
||||
client->addrsize = sizeof(client->addr);
|
||||
for (;;) {
|
||||
if (g_interrupted) {
|
||||
free(client);
|
||||
return;
|
||||
}
|
||||
// poll() because we use SA_RESTART and accept() is @restartable
|
||||
if (poll(&(struct pollfd){g_servfd, POLLIN}, 1, -1) > 0) {
|
||||
client->fd = accept4(g_servfd, (struct sockaddr *)&client->addr,
|
||||
&client->addrsize, SOCK_CLOEXEC);
|
||||
if (client->fd != -1) {
|
||||
VERBF("accepted client fd %d", client->fd);
|
||||
break;
|
||||
} else if (errno != EINTR && errno != EAGAIN) {
|
||||
WARNF("accept4 failed %m");
|
||||
}
|
||||
} else if (errno != EINTR && errno != EAGAIN) {
|
||||
WARNF("poll failed %m");
|
||||
}
|
||||
}
|
||||
/* manually do this because of nt */
|
||||
while (waitpid(-1, NULL, WNOHANG) > 0) donothing;
|
||||
return evcount;
|
||||
sigset_t mask;
|
||||
pthread_attr_t attr;
|
||||
sigfillset(&mask);
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setsigmask_np(&attr, &mask);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
|
||||
pthread_create(&client->th, &attr, ClientWorker, client);
|
||||
pthread_attr_destroy(&attr);
|
||||
}
|
||||
|
||||
int Serve(void) {
|
||||
sigset_t mask;
|
||||
StartTcpServer();
|
||||
sigaction(SIGINT, (&(struct sigaction){.sa_handler = OnInterrupt}), 0);
|
||||
sigaction(SIGCHLD,
|
||||
(&(struct sigaction){.sa_handler = OnChildTerminated,
|
||||
.sa_flags = SA_RESTART}),
|
||||
0);
|
||||
for (;;) {
|
||||
if (!Poll() && (!g_timeout || g_interrupted)) break;
|
||||
sigemptyset(&mask);
|
||||
sigaddset(&mask, SIGCHLD);
|
||||
signal(SIGINT, OnInterrupt);
|
||||
sigprocmask(SIG_BLOCK, &mask, 0);
|
||||
while (!g_interrupted) {
|
||||
HandleClient();
|
||||
}
|
||||
if (g_interrupted) {
|
||||
WARNF("got ctrl-c, shutting down...");
|
||||
}
|
||||
WARNF("server exiting");
|
||||
close(g_servfd);
|
||||
if (!g_timeout) {
|
||||
INFOF("timeout expired, shutting down");
|
||||
} else {
|
||||
INFOF("got ctrl-c, shutting down");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void Daemonize(void) {
|
||||
VERBF("Daemonize");
|
||||
struct stat st;
|
||||
if (fork() > 0) _exit(0);
|
||||
setsid();
|
||||
|
@ -579,19 +761,29 @@ void Daemonize(void) {
|
|||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
int i;
|
||||
SetupPresharedKeySsl(MBEDTLS_SSL_IS_SERVER, GetRunitPsk());
|
||||
__log_level = kLogWarn;
|
||||
#if IsModeDbg()
|
||||
ShowCrashReports();
|
||||
#endif
|
||||
GetOpts(argc, argv);
|
||||
for (i = 3; i < 16; ++i) close(i);
|
||||
g_psk = GetRunitPsk();
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
setenv("TZ", "PST", true);
|
||||
gethostname(g_hostname, sizeof(g_hostname));
|
||||
for (int i = 3; i < 16; ++i) close(i);
|
||||
errno = 0;
|
||||
// poll()'ing /dev/null stdin file descriptor on xnu returns POLLNVAL?!
|
||||
if (IsWindows()) {
|
||||
CHECK_EQ(3, (g_bogusfd = open("/dev/null", O_RDONLY | O_CLOEXEC)));
|
||||
g_bogusfd = open("/dev/null", O_RDONLY | O_CLOEXEC);
|
||||
} else {
|
||||
CHECK_EQ(3, (g_bogusfd = open("/dev/zero", O_RDONLY | O_CLOEXEC)));
|
||||
g_bogusfd = open("/dev/zero", O_RDONLY | O_CLOEXEC);
|
||||
}
|
||||
if (!isdirectory("o")) CHECK_NE(-1, mkdir("o", 0700));
|
||||
if (g_daemonize) Daemonize();
|
||||
return Serve();
|
||||
mkdir("o", 0700);
|
||||
Serve();
|
||||
free(g_psk);
|
||||
#if IsModeDbg()
|
||||
void CheckForMemoryLeaks(void);
|
||||
CheckForMemoryLeaks();
|
||||
#endif
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue