Make mmap() scalable

It's now possible to create thousands of thousands of sparse independent
memory mappings, without any slowdown. The memory manager is better with
tracking memory protection now, particularly on Windows in a precise way
that can be restored during fork(). You now have the highest quality mem
manager possible. It's even better than some OSes like XNU, where mmap()
is implemented as an O(n) operation which means sadly things aren't much
improved over there. With this change the llamafile HTTP server endpoint
at /tokenize with a prompt of 50 tokens is now able to handle 2.6m r/sec
This commit is contained in:
Justine Tunney 2024-07-05 23:13:20 -07:00
parent 3756870635
commit 8c645fa1ee
No known key found for this signature in database
GPG key ID: BE714B4575D6E328
59 changed files with 1238 additions and 1067 deletions

View file

@ -134,12 +134,12 @@
#define INFOF(FMT, ...) LOGF(INFO, FMT, ##__VA_ARGS__)
#define WARNF(FMT, ...) LOGF(WARN, FMT, ##__VA_ARGS__)
#define LOGF(LVL, FMT, ...) \
do { \
if (g_log_level >= LOG_LEVEL_##LVL) { \
kprintf("%r" #LVL " %6P %'18T %s:%d " FMT "\n", __FILE__, __LINE__, \
##__VA_ARGS__); \
} \
#define LOGF(LVL, FMT, ...) \
do { \
if (g_log_level >= LOG_LEVEL_##LVL) { \
kprintf("%r" #LVL " %6P %6H %'18T %s:%d " FMT "\n", __FILE__, __LINE__, \
##__VA_ARGS__); \
} \
} while (0)
struct Client {
@ -443,12 +443,18 @@ void FreeClient(struct Client *client) {
void *ClientWorker(void *arg) {
uint32_t crc;
sigset_t sigmask;
struct timespec ts0;
struct timespec ts1;
struct timespec ts2;
int events, wstatus;
struct Client *client = arg;
uint32_t namesize, filesize;
char *addrstr, *origname;
unsigned char msg[4 + 1 + 4 + 4 + 4];
ts0 = timespec_real();
ts1 = timespec_real();
SetupPresharedKeySsl(MBEDTLS_SSL_IS_SERVER, g_psk);
defer(FreeClient, client);
@ -458,9 +464,15 @@ void *ClientWorker(void *arg) {
EzHandshake();
addrstr = DescribeAddress(&client->addr);
DEBUF("%s %s %s", DescribeAddress(&g_servaddr), "accepted", addrstr);
DEBUF("it took %'zu us to handshake client",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
// get the executable
ts1 = timespec_real();
ts2 = timespec_real();
Recv(client, msg, sizeof(msg));
DEBUF("it took %'zu us to receive #1",
timespec_tomicros(timespec_sub(timespec_real(), ts2)));
if (READ32BE(msg) != RUNITD_MAGIC) {
WARNF("%s magic mismatch!", addrstr);
pthread_exit(0);
@ -473,11 +485,19 @@ void *ClientWorker(void *arg) {
filesize = READ32BE(msg + 9);
crc = READ32BE(msg + 13);
origname = gc(calloc(1, namesize + 1));
ts2 = timespec_real();
Recv(client, origname, namesize);
DEBUF("it took %'zu us to receive #2",
timespec_tomicros(timespec_sub(timespec_real(), ts2)));
VERBF("%s sent %#s (%'u bytes @ %#s)", addrstr, origname, filesize,
client->tmpexepath);
char *exedata = gc(malloc(filesize));
ts2 = timespec_real();
Recv(client, exedata, filesize);
DEBUF("it took %'zu us to receive #3",
timespec_tomicros(timespec_sub(timespec_real(), ts2)));
DEBUF("it took %'zu us to receive executable from network",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
if (crc32_z(0, exedata, filesize) != crc) {
WARNF("%s crc mismatch! %#s", addrstr, origname);
pthread_exit(0);
@ -488,6 +508,7 @@ void *ClientWorker(void *arg) {
// condition can happen, where etxtbsy is raised by our execve
// we're using o_cloexec so it's guaranteed to fix itself fast
// thus we use an optimistic approach to avoid expensive locks
ts1 = timespec_real();
sprintf(client->tmpexepath, "o/%s.XXXXXX",
basename(stripext(gc(strdup(origname)))));
int exefd = openatemp(AT_FDCWD, client->tmpexepath, 0, O_CLOEXEC, 0700);
@ -510,6 +531,8 @@ void *ClientWorker(void *arg) {
WARNF("%s failed to close %#s due to %m", addrstr, origname);
pthread_exit(0);
}
DEBUF("it took %'zu us to write executable to disk",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
// do the args
int i = 0;
@ -560,8 +583,11 @@ RetryOnEtxtbsyRaceCondition:
posix_spawn_file_actions_adddup2(&spawnfila, g_bogusfd, 0);
posix_spawn_file_actions_adddup2(&spawnfila, client->pipe[1], 1);
posix_spawn_file_actions_adddup2(&spawnfila, client->pipe[1], 2);
ts1 = timespec_real();
err = posix_spawn(&client->pid, client->tmpexepath, &spawnfila, &spawnattr,
args, environ);
DEBUF("it took %'zu us to call posix_spawn",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
if (err) {
if (err == ETXTBSY) {
goto RetryOnEtxtbsyRaceCondition;
@ -599,8 +625,11 @@ RetryOnEtxtbsyRaceCondition:
fds[0].events = POLLIN;
fds[1].fd = client->pipe[0];
fds[1].events = POLLIN;
ts1 = timespec_real();
events = poll(fds, ARRAYLEN(fds),
timespec_tomillis(timespec_sub(deadline, now)));
DEBUF("it took %'zu us to call poll",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
if (events == -1) {
if (errno == EINTR) {
INFOF("poll interrupted");
@ -615,7 +644,10 @@ RetryOnEtxtbsyRaceCondition:
if (fds[0].revents) {
int received;
char buf[512];
ts1 = timespec_real();
received = mbedtls_ssl_read(&ezssl, buf, sizeof(buf));
DEBUF("it took %'zu us to call mbedtls_ssl_read",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
if (!received) {
WARNF("%s client disconnected so killing worker %d", origname,
client->pid);
@ -640,7 +672,10 @@ RetryOnEtxtbsyRaceCondition:
}
if (fds[1].revents) {
char buf[512];
ts1 = timespec_real();
ssize_t got = read(client->pipe[0], buf, sizeof(buf));
DEBUF("it took %'zu us to call read",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
if (got == -1) {
WARNF("got %s reading %s output", strerror(errno), origname);
goto HangupClientAndTerminateJob;
@ -658,7 +693,10 @@ RetryOnEtxtbsyRaceCondition:
WaitAgain:
DEBUF("waitpid");
struct rusage rusage;
ts1 = timespec_real();
int wrc = wait4(client->pid, &wstatus, 0, &rusage);
DEBUF("it took %'zu us to call wait4",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
if (wrc == -1) {
if (errno == EINTR) {
WARNF("waitpid interrupted; killing %s pid %d", origname, client->pid);
@ -711,13 +749,18 @@ WaitAgain:
AppendResourceReport(&client->output, &rusage, "\n");
PrintProgramOutput(client);
}
ts1 = timespec_real();
SendProgramOutput(client);
SendExitMessage(exitcode);
mbedtls_ssl_close_notify(&ezssl);
DEBUF("it took %'zu us to send result to client",
timespec_tomicros(timespec_sub(timespec_real(), ts1)));
if (etxtbsy_tries > 1) {
WARNF("encountered %d ETXTBSY race conditions spawning %s",
etxtbsy_tries - 1, origname);
}
DEBUF("it took %'zu us TO DO EVERYTHING",
timespec_tomicros(timespec_sub(timespec_real(), ts0)));
pthread_exit(0);
}