diff --git a/tool/build/runit.c b/tool/build/runit.c index 41cf9f71b..c71780ec2 100644 --- a/tool/build/runit.c +++ b/tool/build/runit.c @@ -21,6 +21,7 @@ #include "libc/bits/safemacros.internal.h" #include "libc/calls/calls.h" #include "libc/calls/struct/flock.h" +#include "libc/calls/struct/sigaction.h" #include "libc/calls/struct/stat.h" #include "libc/dce.h" #include "libc/dns/dns.h" @@ -43,10 +44,12 @@ #include "libc/sysv/consts/fileno.h" #include "libc/sysv/consts/ipproto.h" #include "libc/sysv/consts/itimer.h" +#include "libc/sysv/consts/limits.h" #include "libc/sysv/consts/lock.h" #include "libc/sysv/consts/map.h" #include "libc/sysv/consts/o.h" #include "libc/sysv/consts/prot.h" +#include "libc/sysv/consts/sig.h" #include "libc/sysv/consts/sock.h" #include "libc/time/time.h" #include "libc/x/x.h" @@ -310,11 +313,12 @@ TryAgain: freeaddrinfo(ai); } -static bool Send(const void *output, size_t outputsize) { - int rc, have; +bool Send(int *pipes, int pipescount, const void *output, size_t outputsize) { + bool okall; + int rc, i, have; static bool once; static z_stream zs; - static char zbuf[4096]; + char *zbuf = gc(malloc(PIPE_BUF)); if (!once) { CHECK_EQ(Z_OK, deflateInit2(&zs, 4, Z_DEFLATED, MAX_WBITS, DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY)); @@ -322,26 +326,29 @@ static bool Send(const void *output, size_t outputsize) { } zs.next_in = output; zs.avail_in = outputsize; + okall = true; do { - zs.avail_out = sizeof(zbuf); + zs.avail_out = PIPE_BUF; zs.next_out = (unsigned char *)zbuf; rc = deflate(&zs, Z_SYNC_FLUSH); CHECK_NE(Z_STREAM_ERROR, rc); - have = sizeof(zbuf) - zs.avail_out; - rc = mbedtls_ssl_write(&ezssl, zbuf, have); - if (rc == MBEDTLS_ERR_NET_CONN_RESET) { - usleep((g_backoff = (g_backoff + 1000) * 2)); - return false; + have = PIPE_BUF - zs.avail_out; + for (i = 0; i < pipescount; ++i) { + rc = write(pipes[i * 2 + 1], zbuf, have); + if (rc != have) { + DEBUGF("write(%d, %d) → %d", pipes[i * 2 + 1], have, rc); + okall = false; + } } - CHECK_EQ(have, rc); } while (!zs.avail_out); - return true; + return okall; } -int SendRequest(void) { +bool SendRequest(int *pipes, int pipescount) { int fd; char *p; size_t i; + bool okall; ssize_t rc; uint32_t crc; struct stat st; @@ -364,21 +371,40 @@ int SendRequest(void) { q = WRITE32BE(q, crc); q = mempcpy(q, name, namesize); assert(hdrsize == q - hdr); - DEBUGF("running %s on %s", g_prog, g_hostname); - if (Send(hdr, hdrsize) && Send(p, progsize)) { - if (!(rc = EzTlsFlush(&ezbio, 0, 0))) { - rc = 0; - } else if (rc == MBEDTLS_ERR_NET_CONN_RESET) { - rc = -1; - } else { - CHECK_EQ(0, rc); - } - } else { - rc = -1; - } + okall = true; + okall &= Send(pipes, pipescount, hdr, hdrsize); + okall &= Send(pipes, pipescount, p, progsize); CHECK_NE(-1, munmap(p, st.st_size)); CHECK_NE(-1, close(fd)); - return rc; + for (i = 0; i < pipescount; ++i) { + okall &= !close(pipes[i * 2 + 1]); + } + return okall; +} + +bool RelayRequest(void) { + int i, rc, have, transferred; + char *buf = gc(malloc(PIPE_BUF)); + for (transferred = 0;;) { + rc = read(13, buf, PIPE_BUF); + CHECK_NE(-1, rc); + have = rc; + if (!rc) break; + transferred += have; + for (i = 0; i < have; i += rc) { + rc = mbedtls_ssl_write(&ezssl, buf + i, have - i); + if (rc <= 0) { + TlsDie("relay request failed", rc); + } + } + } + CHECK_NE(0, transferred); + rc = EzTlsFlush(&ezbio, 0, 0); + if (rc < 0) { + TlsDie("relay request failed to flush", rc); + } + close(13); + return true; } bool Recv(unsigned char *p, size_t n) { @@ -387,10 +413,7 @@ bool Recv(unsigned char *p, size_t n) { do { rc = mbedtls_ssl_read(&ezssl, p + i, n - i); } while (rc == MBEDTLS_ERR_SSL_WANT_READ); - if (!rc || rc == MBEDTLS_ERR_NET_CONN_RESET) { - usleep((g_backoff = (g_backoff + 1000) * 2)); - return false; - } else if (rc < 0) { + if (rc < 0) { TlsDie("read response failed", rc); } } @@ -458,7 +481,7 @@ int RunOnHost(char *spec) { WARNF("warning: got connection reset in handshake"); close(g_sock); } - } while ((rc = SendRequest()) == -1 || (rc = ReadResponse()) == -1); + } while ((rc = RelayRequest()) == -1 || (rc = ReadResponse()) == -1); return rc; } @@ -473,12 +496,23 @@ bool ShouldRunInParralel(void) { int SpawnSubprocesses(int argc, char *argv[]) { sigset_t chldmask, savemask; - int i, rc, ws, pid, *pids, exitcode; struct sigaction ignore, saveint, savequit; + int i, rc, ws, pid, *pids, *pipes, exitcode; char *args[5] = {argv[0], argv[1], argv[2]}; argc -= 3; argv += 3; + + // fork off 𝑛 subprocesses for each host on which we run binary. + // what's important here is htop in tree mode will report like: + // + // runit.com xnu freebsd netbsd + // ├─runit.com xnu + // ├─runit.com freebsd + // └─runit.com netbsd + // + // That way when one hangs, it's easy to know what o/s it is. pids = calloc(argc, sizeof(int)); + pipes = calloc(argc, sizeof(int) * 2); ignore.sa_flags = 0; ignore.sa_handler = SIG_IGN; sigemptyset(&ignore.sa_mask); @@ -489,16 +523,27 @@ int SpawnSubprocesses(int argc, char *argv[]) { sigprocmask(SIG_BLOCK, &chldmask, &savemask); for (i = 0; i < argc; ++i) { args[3] = argv[i]; + CHECK_NE(-1, pipe2(pipes + i * 2, O_CLOEXEC)); CHECK_NE(-1, (pids[i] = vfork())); if (!pids[i]) { + dup2(pipes[i * 2], 13); sigaction(SIGINT, &(struct sigaction){0}, 0); sigaction(SIGQUIT, &(struct sigaction){0}, 0); sigprocmask(SIG_SETMASK, &savemask, 0); - execve(args[0], args, environ); /* for htop */ + execve(args[0], args, environ); // for htop _Exit(127); } + close(pipes[i * 2]); // close reader } - for (exitcode = 0;;) { + + // the smart part is we only do the expensive deflate operation from + // the main process we assume file descriptor 13 is safely inherited + static z_stream zs; + sigaction(SIGPIPE, &ignore, 0); + exitcode = SendRequest(pipes, argc) ? 0 : 150; + + // now wait for the children to terminate + for (;;) { if ((pid = wait(&ws)) == -1) { if (errno == EINTR) continue; if (errno == ECHILD) break; @@ -523,6 +568,7 @@ int SpawnSubprocesses(int argc, char *argv[]) { sigprocmask(SIG_SETMASK, &savemask, 0); sigaction(SIGQUIT, &savequit, 0); sigaction(SIGINT, &saveint, 0); + free(pipes); free(pids); return exitcode; } diff --git a/tool/build/runitd.c b/tool/build/runitd.c index 588446d11..170da76b9 100644 --- a/tool/build/runitd.c +++ b/tool/build/runitd.c @@ -310,8 +310,14 @@ void Recv(void *output, size_t outputsize) { // pass along error conditions to caller // pass along eof condition to zlib received = mbedtls_ssl_read(&ezssl, buf, sizeof(buf)); - if (!received) TlsDie("got unexpected eof", received); - if (received < 0) TlsDie("read failed", received); + if (!received) { + close(g_clifd); + TlsDie("got unexpected eof", received); + } + if (received < 0) { + close(g_clifd); + TlsDie("read failed", received); + } // decompress packet completely // into a dynamical size buffer zs.avail_in = received;