Work towards improving signals and processes

This commit is contained in:
Justine Tunney 2021-01-27 19:34:02 -08:00
parent de703b182c
commit d7ac16a9ed
96 changed files with 1474 additions and 427 deletions

View file

@ -20,6 +20,7 @@
#include "libc/bits/bits.h"
#include "libc/bits/safemacros.h"
#include "libc/calls/calls.h"
#include "libc/calls/sigbits.h"
#include "libc/calls/struct/flock.h"
#include "libc/calls/struct/itimerval.h"
#include "libc/calls/struct/sigaction.h"
@ -119,7 +120,7 @@ char g_hostname[128];
uint16_t g_runitdport;
volatile bool alarmed;
/* Handler registered for SIGALRM elsewhere in this file: records that the
   deadline timer fired by setting the `alarmed` flag.
   NOTE(review): this listing looks like a commit diff whose +/- markers
   were stripped, so the two signature lines below are presumably the old
   (no-argument) and new (int sig) forms of the same function. */
static void OnAlarm(void) {
static void OnAlarm(int sig) {
alarmed = true;
}
@ -170,7 +171,9 @@ void DeployEphemeralRunItDaemonRemotelyViaSsh(struct addrinfo *ai) {
struct stat st;
char linebuf[32];
struct timeval now, then;
sigset_t chldmask, savemask;
int sshpid, wstatus, binfd, pipefds[2][2];
struct sigaction ignore, saveint, savequit;
mkdir("o", 0755);
CHECK_NE(-1, (lock = open(gc(xasprintf("o/lock.%s", g_hostname)),
O_RDWR | O_CREAT, 0644)));
@ -179,7 +182,7 @@ void DeployEphemeralRunItDaemonRemotelyViaSsh(struct addrinfo *ai) {
if (!read(lock, &then, 16) || ((now.tv_sec * 1000 + now.tv_usec / 1000) -
(then.tv_sec * 1000 + then.tv_usec / 1000)) >=
(RUNITD_TIMEOUT_MS >> 1)) {
DEBUGF("spawning %s on %s:%hu", g_runitd, g_hostname, g_runitdport);
DEBUGF("ssh %s:%hu to spawn %s", g_hostname, g_runitdport, g_runitd);
CHECK_NE(-1, (binfd = open(g_runitd, O_RDONLY | O_CLOEXEC)));
CHECK_NE(-1, fstat(binfd, &st));
args[0] = "ssh";
@ -189,16 +192,28 @@ void DeployEphemeralRunItDaemonRemotelyViaSsh(struct addrinfo *ai) {
args[4] = g_hostname;
args[5] = gc(MakeDeployScript(ai, st.st_size));
args[6] = NULL;
ignore.sa_flags = 0;
ignore.sa_handler = SIG_IGN;
LOGIFNEG1(sigemptyset(&ignore.sa_mask));
LOGIFNEG1(sigaction(SIGINT, &ignore, &saveint));
LOGIFNEG1(sigaction(SIGQUIT, &ignore, &savequit));
LOGIFNEG1(sigemptyset(&chldmask));
LOGIFNEG1(sigaddset(&chldmask, SIGCHLD));
LOGIFNEG1(sigprocmask(SIG_BLOCK, &chldmask, &savemask));
CHECK_NE(-1, pipe2(pipefds[0], O_CLOEXEC));
CHECK_NE(-1, pipe2(pipefds[1], O_CLOEXEC));
if (!(sshpid = vfork())) {
CHECK_NE(-1, (sshpid = fork()));
if (!sshpid) {
sigaction(SIGINT, &saveint, NULL);
sigaction(SIGQUIT, &savequit, NULL);
sigprocmask(SIG_SETMASK, &savemask, NULL);
dup2(pipefds[0][0], 0);
dup2(pipefds[1][1], 1);
execv(g_ssh, args);
abort();
_exit(127);
}
close(pipefds[0][0]);
close(pipefds[1][1]);
LOGIFNEG1(close(pipefds[0][0]));
LOGIFNEG1(close(pipefds[1][1]));
Upload(pipefds[0][1], binfd, &st);
LOGIFNEG1(close(pipefds[0][1]));
CHECK_NE(-1, (got = read(pipefds[1][0], linebuf, sizeof(linebuf))));
@ -212,7 +227,16 @@ void DeployEphemeralRunItDaemonRemotelyViaSsh(struct addrinfo *ai) {
g_runitdport = (uint16_t)atoi(&linebuf[6]);
LOGIFNEG1(close(pipefds[1][0]));
CHECK_NE(-1, waitpid(sshpid, &wstatus, 0));
CHECK_EQ(0, WEXITSTATUS(wstatus));
LOGIFNEG1(sigaction(SIGINT, &saveint, NULL));
LOGIFNEG1(sigaction(SIGQUIT, &savequit, NULL));
LOGIFNEG1(sigprocmask(SIG_SETMASK, &savemask, NULL));
if (WIFEXITED(wstatus)) {
DEBUGF("ssh %s exited with %d", g_hostname, WEXITSTATUS(wstatus));
} else {
DEBUGF("ssh %s terminated with %s", g_hostname,
strsignal(WTERMSIG(wstatus)));
}
CHECK(WIFEXITED(wstatus) && !WEXITSTATUS(wstatus), "wstatus=%#x", wstatus);
CHECK_NE(-1, gettimeofday(&now, 0));
CHECK_NE(-1, lseek(lock, 0, SEEK_SET));
CHECK_NE(-1, write(lock, &now, 16));
@ -223,7 +247,11 @@ void DeployEphemeralRunItDaemonRemotelyViaSsh(struct addrinfo *ai) {
}
/* Programs the ITIMER_REAL timer with `micros` in the microseconds field of
   the second timeval and zeros everywhere else. Callers in this file pass
   100000 before connect() and 0 right after it.
   NOTE(review): assuming the usual itimerval layout (interval first, value
   second) this is a one-shot timer and micros == 0 disarms it -- confirm
   against the struct definition.
   NOTE(review): the bare setitimer() call and the LOGIFNEG1-wrapped one
   are presumably the old and new versions of the same statement. */
void SetDeadline(int micros) {
setitimer(ITIMER_REAL, &(const struct itimerval){{0, 0}, {0, micros}}, NULL);
/* forget any earlier expiry before the timer is programmed again */
alarmed = false;
/* OnAlarm is (re)installed as the SIGALRM handler on every call */
LOGIFNEG1(
sigaction(SIGALRM, &(struct sigaction){.sa_handler = OnAlarm}, NULL));
LOGIFNEG1(setitimer(ITIMER_REAL,
&(const struct itimerval){{0, 0}, {0, micros}}, NULL));
}
void Connect(void) {
@ -242,28 +270,32 @@ void Connect(void) {
g_hostname, ip4[0], ip4[1], ip4[2], ip4[3]);
unreachable;
}
DEBUGF("connecting to %s (%hhu.%hhu.%hhu.%hhu) to run %s", g_hostname, ip4[0],
ip4[1], ip4[2], ip4[3], g_prog);
CHECK_NE(-1,
(g_sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)));
expo = 1;
TryAgain:
alarmed = false;
Reconnect:
DEBUGF("connecting to %s (%hhu.%hhu.%hhu.%hhu) to run %s", g_hostname, ip4[0],
ip4[1], ip4[2], ip4[3], g_prog);
SetDeadline(100000);
TryAgain:
rc = connect(g_sock, ai->ai_addr, ai->ai_addrlen);
err = errno;
SetDeadline(0);
if (rc == -1) {
if ((err == ECONNREFUSED || err == EHOSTUNREACH || err == ECONNRESET ||
err == EINTR)) {
if (err == EINTR) goto TryAgain;
if (err == ECONNREFUSED || err == EHOSTUNREACH || err == ECONNRESET) {
DEBUGF("got %s from %s (%hhu.%hhu.%hhu.%hhu)", strerror(err), g_hostname,
ip4[0], ip4[1], ip4[2], ip4[3]);
usleep((expo *= 2));
DeployEphemeralRunItDaemonRemotelyViaSsh(ai);
goto TryAgain;
goto Reconnect;
} else {
FATALF("%s(%s:%hu): %s", "connect", g_hostname, g_runitdport,
strerror(err));
unreachable;
}
} else {
DEBUGF("connected to %s", g_hostname);
}
freeaddrinfo(ai);
}
@ -275,6 +307,7 @@ void SendRequest(void) {
const char *name;
unsigned char *hdr;
size_t progsize, namesize, hdrsize;
DEBUGF("running %s on %s", g_prog, g_hostname);
CHECK_NE(-1, (fd = open(g_prog, O_RDONLY)));
CHECK_NE(-1, fstat(fd, &st));
CHECK_LE((namesize = strlen((name = basename(g_prog)))), PATH_MAX);
@ -370,8 +403,7 @@ int RunOnHost(char *spec) {
do {
Connect();
SendRequest();
rc = ReadResponse();
} while (rc == -1);
} while ((rc = ReadResponse()) == -1);
return rc;
}
@ -384,59 +416,56 @@ bool ShouldRunInParralel(void) {
return !IsWindows() && IsParallelBuild();
}
/* Calls RunOnHost() on hosts[0] .. hosts[count - 1] in order and stops at
   the first nonzero result.
   Returns that first nonzero result, or 0 when every call returned 0
   (including when count is 0). */
int RunRemoteTestsInSerial(char *hosts[], int count) {
int i, exitcode;
for (i = 0; i < count; ++i) {
if ((exitcode = RunOnHost(hosts[i]))) {
return exitcode;
}
}
return 0;
}
/* Signal handler (installed for SIGINT and SIGTERM by the setjmp-based code
   in RunRemoteTestsInParallel). The first invocation jumps back to
   g_jmpbuf with the value 128 + sig; any later invocation calls abort(). */
void OnInterrupt(int sig) {
/* remembers whether the handler has already run once */
static bool once;
if (!once) {
once = true;
gclongjmp(g_jmpbuf, 128 + sig);
} else {
abort();
}
}
/* Forks one child per entry of `hosts`; each child calls RunOnHost() on its
   entry. The parent then waits for the children and derives an exit code
   from their wait statuses.
   NOTE(review): this listing looks like a commit diff whose +/- markers
   were stripped. Two implementations are interleaved below and the braces
   do not balance as shown: one built on setjmp(g_jmpbuf) and OnInterrupt,
   and one that ignores SIGINT/SIGQUIT and blocks SIGCHLD in the parent.
   Which lines survive the commit cannot be determined from this view. */
int RunRemoteTestsInParallel(char *hosts[], int count) {
const struct sigaction onsigterm = {.sa_handler = (void *)OnInterrupt};
struct sigaction onsigint = {.sa_handler = (void *)OnInterrupt};
int i, rc, exitcode;
int64_t leader, *pids;
leader = getpid();
pids = gc(xcalloc(count, sizeof(char *)));
/* setjmp variant: a nonzero value here means OnInterrupt jumped back */
if (!(exitcode = setjmp(g_jmpbuf))) {
sigaction(SIGINT, &onsigint, NULL);
sigaction(SIGTERM, &onsigterm, NULL);
for (i = 0; i < count; ++i) {
CHECK_NE(-1, (pids[i] = fork()));
if (!pids[i]) {
/* child returns RunOnHost's result to this function's caller */
return RunOnHost(hosts[i]);
}
sigset_t chldmask, savemask;
int i, rc, ws, pid, *pids, exitcode;
struct sigaction ignore, saveint, savequit;
/* NOTE(review): the element size is sizeof(char *) although pids is
   declared as a pointer to int on the declaration line above, and the
   result is used below without a NULL check. */
pids = calloc(count, sizeof(char *));
/* parent ignores SIGINT and SIGQUIT and blocks SIGCHLD; the previous
   dispositions and mask are kept in saveint / savequit / savemask */
ignore.sa_flags = 0;
ignore.sa_handler = SIG_IGN;
LOGIFNEG1(sigemptyset(&ignore.sa_mask));
LOGIFNEG1(sigaction(SIGINT, &ignore, &saveint));
LOGIFNEG1(sigaction(SIGQUIT, &ignore, &savequit));
LOGIFNEG1(sigemptyset(&chldmask));
LOGIFNEG1(sigaddset(&chldmask, SIGCHLD));
LOGIFNEG1(sigprocmask(SIG_BLOCK, &chldmask, &savemask));
for (i = 0; i < count; ++i) {
CHECK_NE(-1, (pids[i] = fork()));
if (!pids[i]) {
/* child: put the saved signal state back before doing the work */
sigaction(SIGINT, &saveint, NULL);
sigaction(SIGQUIT, &savequit, NULL);
sigprocmask(SIG_SETMASK, &savemask, NULL);
/* child terminates here with RunOnHost's result as its exit status */
_exit(RunOnHost(hosts[i]));
}
/* setjmp variant: wait for each child in order and OR the exit statuses */
for (i = 0; i < count; ++i) {
CHECK_NE(-1, waitpid(pids[i], &rc, 0));
exitcode |= WEXITSTATUS(rc);
}
} else if (getpid() == leader) {
/* interrupted leader: ignore SIGINT itself, send SIGINT via kill(0, ...)
   and wait until no children remain */
onsigint.sa_handler = SIG_IGN;
sigaction(SIGINT, &onsigint, NULL);
kill(0, SIGINT);
while (waitpid(-1, NULL, 0) > 0) donothing;
}
}
/* wait() variant: reap children until ECHILD; EINTR is retried */
for (exitcode = 0;;) {
if ((pid = wait(&ws)) == -1) {
if (errno == EINTR) continue;
if (errno == ECHILD) break;
FATALF("wait failed");
}
/* map the reaped pid back to its host; only the first nonzero result is
   kept: the exit status, or 128 + signal number if killed by a signal */
for (i = 0; i < count; ++i) {
if (pids[i] != pid) continue;
if (WIFEXITED(ws)) {
DEBUGF("%s exited with %d", hosts[i], WEXITSTATUS(ws));
if (!exitcode) exitcode = WEXITSTATUS(ws);
} else {
DEBUGF("%s terminated with %s", hosts[i], strsignal(WTERMSIG(ws)));
if (!exitcode) exitcode = 128 + WTERMSIG(ws);
}
break;
}
}
/* restore the signal dispositions and mask saved before forking */
LOGIFNEG1(sigaction(SIGINT, &saveint, NULL));
LOGIFNEG1(sigaction(SIGQUIT, &savequit, NULL));
LOGIFNEG1(sigprocmask(SIG_SETMASK, &savemask, NULL));
free(pids);
return exitcode;
}
int main(int argc, char *argv[]) {
showcrashreports();
/* g_loglevel = kLogDebug; */
const struct sigaction onsigalrm = {.sa_handler = (void *)OnAlarm};
if (argc > 1 &&
(strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0)) {
ShowUsage(stdout, 0);
@ -447,9 +476,7 @@ int main(int argc, char *argv[]) {
CheckExists((g_runitd = argv[1]));
CheckExists((g_prog = argv[2]));
if (argc == 1 + 2) return 0; /* hosts list empty */
sigaction(SIGALRM, &onsigalrm, NULL);
g_sshport = 22;
g_runitdport = RUNITD_PORT;
return (ShouldRunInParralel() ? RunRemoteTestsInParallel
: RunRemoteTestsInSerial)(&argv[3], argc - 3);
return RunRemoteTestsInParallel(&argv[3], argc - 3);
}

View file

@ -19,6 +19,7 @@
#include "libc/bits/bits.h"
#include "libc/bits/safemacros.h"
#include "libc/calls/calls.h"
#include "libc/calls/sigbits.h"
#include "libc/calls/struct/sigaction.h"
#include "libc/calls/struct/stat.h"
#include "libc/dce.h"
@ -96,32 +97,30 @@
#define kLogFile "o/runitd.log"
#define kLogMaxBytes (2 * 1000 * 1000)
jmp_buf g_jb;
char *g_exepath;
volatile bool g_childterm;
volatile int g_childstatus;
struct sockaddr_in g_servaddr;
unsigned char g_buf[PAGESIZE];
bool g_daemonize, g_sendready;
int g_timeout, g_devnullfd, g_servfd, g_clifd, g_exefd;
/* Signal handler (installed for SIGINT and SIGTERM by the setjmp-based code
   in Serve). A second invocation calls abort(). The first one forwards
   `sig` with kill(0, sig), waits until waitpid(-1, ...) reports an error,
   and then jumps back to g_jb with `sig` as the value. */
void OnInterrupt(int sig) {
static bool once;
if (once) abort();
once = true;
kill(0, sig);
/* reap children until waitpid fails */
for (;;) {
if (waitpid(-1, NULL, 0) == -1) {
break;
}
}
gclongjmp(g_jb, sig);
unreachable;
}
/* Handler installed for SIGCHLD in Serve: reaps terminated children using
   waitpid(-1, ..., WNOHANG) so it never blocks.
   NOTE(review): this listing looks like a commit diff whose +/- markers
   were stripped. The `while` loop that records the status in
   g_childstatus and sets g_childterm, and the `for` loop that logs each
   worker, are presumably the old and new bodies; braces do not balance as
   shown.
   NOTE(review): DEBUGF / FATALF are called from signal-handler context --
   confirm they are safe to use there. */
void OnChildTerminated(int sig) {
while (waitpid(-1, &g_childstatus, WNOHANG) > 0) {
g_childterm = true;
int ws, pid;
for (;;) {
if ((pid = waitpid(-1, &ws, WNOHANG)) != -1) {
if (pid) {
/* a child was reaped: log how it ended */
if (WIFEXITED(ws)) {
DEBUGF("worker %d exited with %d", pid, WEXITSTATUS(ws));
} else {
DEBUGF("worker %d terminated with %s", pid, strsignal(WTERMSIG(ws)));
}
} else {
/* waitpid returned 0: nothing more to reap right now */
break;
}
} else {
/* retry on EINTR, stop when there are no children, otherwise fatal */
if (errno == EINTR) continue;
if (errno == ECHILD) break;
FATALF("waitpid failed in sigchld");
}
}
}
@ -190,14 +189,14 @@ void StartTcpServer(void) {
CHECK_NE(-1, listen(g_servfd, 10));
asize = sizeof(g_servaddr);
CHECK_NE(-1, getsockname(g_servfd, &g_servaddr, &asize));
CHECK_NE(-1, fcntl(g_servfd, F_SETFD, FD_CLOEXEC));
LOGF("%s:%s", "listening on tcp", gc(DescribeAddress(&g_servaddr)));
if (g_sendready) {
printf("ready %hu\n", ntohs(g_servaddr.sin_port));
fflush(stdout);
fclose(stdout);
stdout->fd = g_devnullfd;
dup2(g_devnullfd, stdout->fd);
}
CHECK_NE(-1, fcntl(g_servfd, F_SETFD, FD_CLOEXEC));
LOGF("%s:%s", "listening on tcp", gc(DescribeAddress(&g_servaddr)));
}
void SendExitMessage(int sock, int rc) {
@ -242,7 +241,9 @@ void HandleClient(void) {
ssize_t got, wrote;
struct sockaddr_in addr;
char *addrstr, *exename;
int rc, wstatus, child, pipefds[2];
sigset_t chldmask, savemask;
int exitcode, wstatus, child, pipefds[2];
struct sigaction ignore, saveint, savequit;
uint32_t addrsize, namesize, filesize, remaining;
/* read request to run program */
@ -252,7 +253,6 @@ void HandleClient(void) {
close(g_clifd);
return;
}
g_childterm = false;
addrstr = gc(DescribeAddress(&addr));
DEBUGF("%s %s %s", gc(DescribeAddress(&g_servaddr)), "accepted", addrstr);
got = recv(g_clifd, (p = &g_buf[0]), sizeof(g_buf), 0);
@ -303,13 +303,25 @@ void HandleClient(void) {
/* run program, tee'ing stderr to both log and client */
DEBUGF("spawning %s", exename);
ignore.sa_flags = 0;
ignore.sa_handler = SIG_IGN;
LOGIFNEG1(sigemptyset(&ignore.sa_mask));
LOGIFNEG1(sigaction(SIGINT, &ignore, &saveint));
LOGIFNEG1(sigaction(SIGQUIT, &ignore, &savequit));
LOGIFNEG1(sigemptyset(&chldmask));
LOGIFNEG1(sigaddset(&chldmask, SIGCHLD));
LOGIFNEG1(sigprocmask(SIG_BLOCK, &chldmask, &savemask));
CHECK_NE(-1, pipe2(pipefds, O_CLOEXEC));
if (!(child = vfork())) {
CHECK_NE(-1, (child = fork()));
if (!child) {
sigaction(SIGINT, &saveint, NULL);
sigaction(SIGQUIT, &savequit, NULL);
sigprocmask(SIG_SETMASK, &savemask, NULL);
dup2(pipefds[1], 2);
execv(g_exepath, (char *const[]){g_exepath, NULL});
abort();
_exit(127);
}
close(pipefds[1]);
LOGIFNEG1(close(pipefds[1]));
DEBUGF("communicating %s[%d]", exename, child);
for (;;) {
CHECK_NE(-1, (got = read(pipefds[0], g_buf, sizeof(g_buf))));
@ -320,23 +332,24 @@ void HandleClient(void) {
fwrite(g_buf, got, 1, stderr);
SendOutputFragmentMessage(g_clifd, kRunitStderr, g_buf, got);
}
if ((rc = waitpid(child, &wstatus, 0)) != -1) {
g_childstatus = wstatus;
} else {
CHECK_EQ(ECHILD, errno);
CHECK(g_childterm);
while (waitpid(child, &wstatus, 0) == -1) {
if (errno == EINTR) continue;
FATALF("waitpid failed");
}
if (WIFSIGNALED(g_childstatus)) {
rc = 128 + WTERMSIG(g_childstatus);
if (WIFEXITED(wstatus)) {
DEBUGF("%s exited with %d", exename, WEXITSTATUS(wstatus));
exitcode = WEXITSTATUS(wstatus);
} else {
rc = WEXITSTATUS(g_childstatus);
DEBUGF("%s terminated with %s", exename, strsignal(WTERMSIG(wstatus)));
exitcode = 128 + WTERMSIG(wstatus);
}
DEBUGF("exited %s[%d] -> %d", exename, child, rc);
LOGIFNEG1(sigaction(SIGINT, &saveint, NULL));
LOGIFNEG1(sigaction(SIGQUIT, &savequit, NULL));
LOGIFNEG1(sigprocmask(SIG_SETMASK, &savemask, NULL));
/* let client know how it went */
LOGIFNEG1(unlink(g_exepath));
SendExitMessage(g_clifd, rc);
SendExitMessage(g_clifd, exitcode);
LOGIFNEG1(shutdown(g_clifd, SHUT_RDWR));
LOGIFNEG1(close(g_clifd));
_exit(0);
@ -363,29 +376,17 @@ TryAgain:
}
/* Starts the TCP server via StartTcpServer(), installs signal handling, and
   calls Poll() in a loop until Poll() returns zero while g_timeout is zero.
   NOTE(review): this listing looks like a commit diff whose +/- markers
   were stripped. Two implementations are interleaved below and the braces
   do not balance as shown: a setjmp(g_jb) variant that installs
   OnInterrupt for SIGINT/SIGTERM and returns rc (0, or the signal number
   plus 128 after an interrupt), and a variant that only installs the
   SIGCHLD handler, closes g_servfd and returns 0. */
int Serve(void) {
int rc;
const struct sigaction onsigint = {.sa_handler = (void *)OnInterrupt,
.sa_flags = SA_NODEFER};
const struct sigaction onsigterm = {.sa_handler = (void *)OnInterrupt,
.sa_flags = SA_NODEFER};
const struct sigaction onsigchld = {.sa_handler = (void *)OnChildTerminated,
.sa_flags = SA_RESTART};
StartTcpServer();
defer(close_s, &g_servfd);
/* setjmp variant: a nonzero value is the signal passed by OnInterrupt */
if (!(rc = setjmp(g_jb))) {
sigaction(SIGINT, &onsigint, NULL);
sigaction(SIGTERM, &onsigterm, NULL);
sigaction(SIGCHLD, &onsigchld, NULL);
for (;;) {
if (!Poll() && !g_timeout) break;
}
LOGF("timeout expired, shutting down");
} else {
/* interrupted: emit a carriage return on a terminal before logging */
if (isatty(fileno(stderr))) fputc('\r', stderr);
LOGF("got %s, shutting down", strsignal(rc));
rc += 128;
/* handler-only variant: OnChildTerminated with SA_RESTART for SIGCHLD */
sigaction(SIGCHLD,
(&(struct sigaction){.sa_handler = (void *)OnChildTerminated,
.sa_flags = SA_RESTART}),
NULL);
for (;;) {
if (!Poll() && !g_timeout) break;
}
return rc;
close(g_servfd);
LOGF("timeout expired, shutting down");
return 0;
}
void Daemonize(void) {
@ -393,15 +394,17 @@ void Daemonize(void) {
if (fork() > 0) _exit(0);
setsid();
if (fork() > 0) _exit(0);
stdin->fd = g_devnullfd;
if (!g_sendready) stdout->fd = g_devnullfd;
if (stat(kLogFile, &st) != -1 && st.st_size > kLogMaxBytes) unlink(kLogFile);
freopen(kLogFile, "a", stderr);
dup2(g_devnullfd, stdin->fd);
if (!g_sendready) dup2(g_devnullfd, stdout->fd);
freopen(kLogFile, "ae", stderr);
if (fstat(fileno(stderr), &st) != -1 && st.st_size > kLogMaxBytes) {
ftruncate(fileno(stderr), 0);
}
}
int main(int argc, char *argv[]) {
showcrashreports();
g_loglevel = kLogDebug;
/* g_loglevel = kLogDebug; */
GetOpts(argc, argv);
CHECK_NE(-1, (g_devnullfd = open("/dev/null", O_RDWR)));
defer(close_s, &g_devnullfd);

View file

@ -23,6 +23,7 @@
#include "libc/calls/struct/stat.h"
#include "libc/calls/struct/timespec.h"
#include "libc/elf/def.h"
#include "libc/fmt/conv.h"
#include "libc/limits.h"
#include "libc/log/check.h"
#include "libc/log/log.h"
@ -68,6 +69,7 @@
char *symbol_;
char *outpath_;
char *yoink_;
int64_t image_base_;
const size_t kMinCompressSize = 32;
const char kNoCompressExts[][8] = {".gz", ".xz", ".jpg", ".png",
@ -83,7 +85,8 @@ wontreturn void PrintUsage(int rc, FILE *f) {
void GetOpts(int *argc, char ***argv) {
int opt;
yoink_ = "__zip_start";
while ((opt = getopt(*argc, *argv, "?ho:s:y:")) != -1) {
image_base_ = IMAGE_BASE_VIRTUAL;
while ((opt = getopt(*argc, *argv, "?ho:s:y:b:")) != -1) {
switch (opt) {
case 'o':
outpath_ = optarg;
@ -94,6 +97,9 @@ void GetOpts(int *argc, char ***argv) {
case 'y':
yoink_ = optarg;
break;
case 'b':
image_base_ = strtol(optarg, NULL, 0);
break;
case '?':
case 'h':
PrintUsage(EXIT_SUCCESS, stdout);
@ -261,7 +267,7 @@ void EmitZip(struct ElfWriter *elf, const char *name, size_t namesize,
ELF64_ST_INFO(STB_LOCAL, STT_OBJECT), STV_DEFAULT, 0,
kZipCdirHdrLinkableSize);
elfwriter_appendrela(elf, kZipCfileOffsetOffset, lfilesym, R_X86_64_32,
-IMAGE_BASE_VIRTUAL);
-image_base_);
elfwriter_commit(elf, kZipCdirHdrLinkableSize);
elfwriter_finishsection(elf);
}