diff --git a/examples/ctrlc.c b/examples/ctrlc.c index f15f8dae4..f291add80 100644 --- a/examples/ctrlc.c +++ b/examples/ctrlc.c @@ -89,7 +89,7 @@ int main(int argc, char *argv[]) { // not the case when you use sigprocmask() to block signals which is // useful for kicking the can down the road. WRITE("doing cpu task...\n"); - for (volatile int i = 0; i < INT_MAX / 5; ++i) { + for (volatile int i = 0; i < INT_MAX / 3; ++i) { if (gotsig) { WRITE("\rgot ctrl+c asynchronously\n"); exit(0); diff --git a/libc/calls/poll-nt.c b/libc/calls/poll-nt.c index e7159a71d..360a80eac 100644 --- a/libc/calls/poll-nt.c +++ b/libc/calls/poll-nt.c @@ -28,6 +28,7 @@ #include "libc/errno.h" #include "libc/intrin/atomic.h" #include "libc/intrin/strace.h" +#include "libc/intrin/weaken.h" #include "libc/macros.h" #include "libc/mem/mem.h" #include "libc/nt/console.h" @@ -48,6 +49,7 @@ #include "libc/stdio/sysparam.h" #include "libc/sysv/consts/o.h" #include "libc/sysv/consts/poll.h" +#include "libc/sysv/consts/sicode.h" #include "libc/sysv/consts/sig.h" #include "libc/sysv/errfuns.h" #include "libc/thread/posixthread.internal.h" @@ -78,13 +80,13 @@ static textwindows int sys_poll_nt_impl(struct pollfd *fds, uint64_t nfds, uint32_t *ms, sigset_t sigmask) { bool ok; uint64_t millis; - uint32_t cm, avail, waitfor; struct sys_pollfd_nt pipefds[64]; struct sys_pollfd_nt sockfds[64]; int pipeindices[ARRAYLEN(pipefds)]; int sockindices[ARRAYLEN(sockfds)]; struct timespec deadline, remain, now; - int i, rc, sn, pn, gotinvals, gotpipes, gotsocks; + uint32_t cm, avail, waitfor, already_slept; + int i, rc, sn, pn, sig, gotinvals, gotpipes, gotsocks, handler_was_called; waitfor = ms ? *ms : -1u; deadline = timespec_add(timespec_mono(), timespec_frommillis(waitfor)); @@ -146,7 +148,20 @@ static textwindows int sys_poll_nt_impl(struct pollfd *fds, uint64_t nfds, // perform the i/o and sleeping and looping for (;;) { + + // determine how long to wait + now = timespec_mono(); + if (timespec_cmp(now, deadline) < 0) { + remain = timespec_sub(deadline, now); + millis = timespec_tomillis(remain); + waitfor = MIN(millis, 0xffffffffu); + waitfor = MIN(waitfor, POLL_INTERVAL_MS); + } else { + waitfor = 0; + } + // see if input is available on non-sockets + already_slept = 0; for (gotpipes = i = 0; i < pn; ++i) { if (pipefds[i].events & POLLWRNORM_) // we have no way of polling if a non-socket is writeable yet @@ -171,7 +186,7 @@ static textwindows int sys_poll_nt_impl(struct pollfd *fds, uint64_t nfds, // some programs like bash like to poll([stdin], 1, -1) so let's // avoid busy looping in such cases. we could generalize this to // always avoid busy loops, but we'd need poll to launch threads - if (0 && pn == 1 && sn == 0 && (pipefds[i].events & POLLRDNORM_)) { + if (!sn && (pipefds[i].events & POLLRDNORM_) && !already_slept++) { int err = errno; switch (CountConsoleInputBytesBlocking(waitfor, sigmask)) { case -1: @@ -212,10 +227,12 @@ static textwindows int sys_poll_nt_impl(struct pollfd *fds, uint64_t nfds, if (pipefds[i].revents) ++gotpipes; } + // if we haven't found any good results yet then here we // compute a small time slice we don't mind sleeping for if (sn) { - if ((gotsocks = WSAPoll(sockfds, sn, 0)) == -1) + already_slept = 1; + if ((gotsocks = WSAPoll(sockfds, sn, waitfor)) == -1) return __winsockerr(); } else { gotsocks = 0; @@ -223,19 +240,21 @@ static textwindows int sys_poll_nt_impl(struct pollfd *fds, uint64_t nfds, // add some artificial delay, which we use as an opportunity to also // check for pending signals, thread cancelation, etc. - waitfor = 0; - if (!gotinvals && !gotsocks && !gotpipes) { - now = timespec_mono(); - if (timespec_cmp(now, deadline) < 0) { - remain = timespec_sub(deadline, now); - millis = timespec_tomillis(remain); - waitfor = MIN(millis, 0xffffffffu); - waitfor = MIN(waitfor, POLL_INTERVAL_MS); - if (waitfor) { - POLLTRACE("poll() sleeping for %'d out of %'lu ms", waitfor, - timespec_tomillis(remain)); - if (_park_norestart(waitfor, sigmask) == -1) - return -1; // eintr, ecanceled, etc. + if (!gotinvals && !gotsocks && !gotpipes && waitfor) { + if (!already_slept) { + POLLTRACE("poll() parking for %'d out of %'lu ms", waitfor, + timespec_tomillis(remain)); + if (_park_norestart(waitfor, sigmask) == -1) + return -1; // eintr, ecanceled, etc. + } else { + if (_check_cancel() == -1) + return -1; + if (_weaken(__sig_get) && (sig = _weaken(__sig_get)(sigmask))) { + handler_was_called = _weaken(__sig_relay)(sig, SI_KERNEL, sigmask); + if (_check_cancel() == -1) + return -1; + if (handler_was_called) + return eintr(); } } } diff --git a/test/libc/calls/poll_latency_test.c b/test/libc/calls/poll_latency_test.c new file mode 100644 index 000000000..ab6aa5a4f --- /dev/null +++ b/test/libc/calls/poll_latency_test.c @@ -0,0 +1,205 @@ +/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│ +│ vi: set et ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi │ +╞══════════════════════════════════════════════════════════════════════════════╡ +│ Copyright 2024 Justine Alexandra Roberts Tunney │ +│ │ +│ Permission to use, copy, modify, and/or distribute this software for │ +│ any purpose with or without fee is hereby granted, provided that the │ +│ above copyright notice and this permission notice appear in all copies. │ +│ │ +│ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL │ +│ WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED │ +│ WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE │ +│ AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL │ +│ DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR │ +│ PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER │ +│ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │ +│ PERFORMANCE OF THIS SOFTWARE. │ +╚─────────────────────────────────────────────────────────────────────────────*/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define NUM_MEASUREMENTS 10 +#define BUFFER_SIZE sizeof(struct timespec) + +atomic_int global_state; + +typedef struct { + int port; + int client_sock; +} listener_data; + +void *sender_thread(void *arg) { + listener_data *data = (listener_data *)arg; + int sockfd = socket(data->port == 0 ? AF_INET : AF_INET6, SOCK_STREAM, 0); + if (sockfd < 0) { + perror("Socket creation failed"); + exit(EXIT_FAILURE); + } + + void *addr; + struct sockaddr_in addr_v4 = {0}; + struct sockaddr_in6 addr_v6 = {0}; + socklen_t addr_len; + + if (data->port == 0) { // IPv4 + addr_v4.sin_family = AF_INET; + addr_v4.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr_v4.sin_port = 0; + addr = &addr_v4; + addr_len = sizeof(addr_v4); + } else { // IPv6 + addr_v6.sin6_family = AF_INET6; + addr_v6.sin6_addr = in6addr_loopback; + addr_v6.sin6_port = 0; + addr = &addr_v6; + addr_len = sizeof(addr_v6); + } + + if (bind(sockfd, addr, addr_len) < 0) { + perror("Bind failed"); + exit(EXIT_FAILURE); + } + + if (getsockname(sockfd, addr, &addr_len) < 0) { + perror("getsockname failed"); + exit(EXIT_FAILURE); + } + + data->port = ntohs(data->port == 0 ? addr_v4.sin_port : addr_v6.sin6_port); + + if (listen(sockfd, 1) < 0) { + perror("Listen failed"); + exit(EXIT_FAILURE); + } + + atomic_fetch_add(&global_state, 1); + data->client_sock = accept(sockfd, NULL, NULL); + if (data->client_sock < 0) { + perror("Accept failed"); + exit(EXIT_FAILURE); + } + atomic_fetch_add(&global_state, 1); + + struct timespec ts; + for (int i = 0; i < NUM_MEASUREMENTS; i++) { + while (atomic_load(&global_state)) { + } + atomic_fetch_add(&global_state, 1); + clock_gettime(CLOCK_MONOTONIC, &ts); + send(data->client_sock, &ts, sizeof(ts), 0); + } + + close(data->client_sock); + close(sockfd); + return NULL; +} + +int main() { + ShowCrashReports(); + + pthread_t ipv4_thread, ipv6_thread; + listener_data ipv4_data = {0}, + ipv6_data = {1}; // Use port 0 for IPv4, 1 for IPv6 + + global_state = -5; + + if (pthread_create(&ipv4_thread, NULL, sender_thread, &ipv4_data) != 0) { + perror("Failed to create IPv4 thread"); + exit(EXIT_FAILURE); + } + + if (pthread_create(&ipv6_thread, NULL, sender_thread, &ipv6_data) != 0) { + perror("Failed to create IPv6 thread"); + exit(EXIT_FAILURE); + } + + // Wait for both listeners to be ready + while (atomic_load(&global_state) < -3) { + // Busy wait + } + + int ipv4_sock = socket(AF_INET, SOCK_STREAM, 0); + int ipv6_sock = socket(AF_INET6, SOCK_STREAM, 0); + + struct sockaddr_in ipv4_addr = {0}; + ipv4_addr.sin_family = AF_INET; + ipv4_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + ipv4_addr.sin_port = htons(ipv4_data.port); + + struct sockaddr_in6 ipv6_addr = {0}; + ipv6_addr.sin6_family = AF_INET6; + ipv6_addr.sin6_addr = in6addr_loopback; + ipv6_addr.sin6_port = htons(ipv6_data.port); + + if (connect(ipv4_sock, (struct sockaddr *)&ipv4_addr, sizeof(ipv4_addr)) < + 0) { + perror("IPv4 connect failed"); + exit(EXIT_FAILURE); + } + + if (connect(ipv6_sock, (struct sockaddr *)&ipv6_addr, sizeof(ipv6_addr)) < + 0) { + perror("IPv6 connect failed"); + exit(EXIT_FAILURE); + } + + // Wait for both listeners to be ready + while (atomic_load(&global_state) < -1) { + // Busy wait + } + atomic_fetch_add(&global_state, 1); + + struct pollfd fds[2]; + fds[0].fd = ipv4_sock; + fds[0].events = POLLIN; + fds[1].fd = ipv6_sock; + fds[1].events = POLLIN; + + struct timespec ts_sent, ts_now; + double total_latency = 0.0; + int total_measurements = 0; + + while (total_measurements < 2 * NUM_MEASUREMENTS) { + int ready = poll(fds, 2, -1); + if (ready < 0) { + perror("Poll failed"); + exit(EXIT_FAILURE); + } + + clock_gettime(CLOCK_MONOTONIC, &ts_now); + + for (int i = 0; i < 2; i++) { + if (fds[i].revents & POLLIN) { + ssize_t n = recv(fds[i].fd, &ts_sent, sizeof(ts_sent), 0); + if (n == sizeof(ts_sent)) { + total_latency += timespec_tonanos(timespec_sub(ts_now, ts_sent)); + total_measurements++; + atomic_fetch_sub(&global_state, 1); + } + } + } + } + + double mean_latency = total_latency / total_measurements; + printf("Mean poll() latency: %.2f ns\n", mean_latency); + + unassert(!close(ipv4_sock)); + unassert(!close(ipv6_sock)); + + unassert(!pthread_join(ipv4_thread, NULL)); + unassert(!pthread_join(ipv6_thread, NULL)); + + CheckForMemoryLeaks(); +} diff --git a/test/libc/sock/ipv4v6poll_test.c b/test/libc/sock/ipv4v6poll_test.c new file mode 100644 index 000000000..7fc6f9ed0 --- /dev/null +++ b/test/libc/sock/ipv4v6poll_test.c @@ -0,0 +1,247 @@ +/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│ +│ vi: set et ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi │ +╞══════════════════════════════════════════════════════════════════════════════╡ +│ Copyright 2024 Justine Alexandra Roberts Tunney │ +│ │ +│ Permission to use, copy, modify, and/or distribute this software for │ +│ any purpose with or without fee is hereby granted, provided that the │ +│ above copyright notice and this permission notice appear in all copies. │ +│ │ +│ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL │ +│ WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED │ +│ WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE │ +│ AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL │ +│ DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR │ +│ PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER │ +│ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │ +│ PERFORMANCE OF THIS SOFTWARE. │ +╚─────────────────────────────────────────────────────────────────────────────*/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define BUFFER_SIZE 1024 + +// States: +// 0: Initial state +// 1: IPv4 listener ready +// 2: IPv6 listener ready +// 3: Both listeners ready, main can connect +// 4: Main connected, IPv4 can send +// 5: IPv4 sent, IPv6 can send +// 6: All communication complete +atomic_int global_state = 0; + +typedef struct { + int port; + int client_sock; +} listener_data; + +void *ipv4_listener(void *arg) { + listener_data *data = (listener_data *)arg; + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { + perror("IPv4 socket creation failed"); + exit(EXIT_FAILURE); + } + + struct sockaddr_in addr = {0}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; // Random port + + if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + perror("IPv4 bind failed"); + exit(EXIT_FAILURE); + } + + socklen_t len = sizeof(addr); + if (getsockname(sockfd, (struct sockaddr *)&addr, &len) < 0) { + perror("getsockname failed"); + exit(EXIT_FAILURE); + } + + data->port = ntohs(addr.sin_port); + // printf("IPv4 listening on port %d\n", data->port); + + if (listen(sockfd, 1) < 0) { + perror("IPv4 listen failed"); + exit(EXIT_FAILURE); + } + + // Signal that IPv4 listener is ready + atomic_fetch_add(&global_state, 1); + + // Wait for IPv6 to be ready before accepting + while (atomic_load(&global_state) < 3) { + // Busy wait + } + + data->client_sock = accept(sockfd, NULL, NULL); + if (data->client_sock < 0) { + perror("IPv4 accept failed"); + exit(EXIT_FAILURE); + } + + while (atomic_load(&global_state) < 4) { + // Wait for main to signal it's connected + } + + const char *message = "Hello from IPv4!"; + unassert(send(data->client_sock, message, strlen(message), 0) > 0); + + unassert(!close(sockfd)); + return NULL; +} + +void *ipv6_listener(void *arg) { + listener_data *data = (listener_data *)arg; + int sockfd = socket(AF_INET6, SOCK_STREAM, 0); + if (sockfd < 0) { + perror("IPv6 socket creation failed"); + exit(EXIT_FAILURE); + } + + struct sockaddr_in6 addr = {0}; + addr.sin6_family = AF_INET6; + addr.sin6_addr = in6addr_loopback; + addr.sin6_port = 0; // Random port + + if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + perror("IPv6 bind failed"); + exit(EXIT_FAILURE); + } + + socklen_t len = sizeof(addr); + if (getsockname(sockfd, (struct sockaddr *)&addr, &len) < 0) { + perror("getsockname failed"); + exit(EXIT_FAILURE); + } + + data->port = ntohs(addr.sin6_port); + // printf("IPv6 listening on port %d\n", data->port); + + if (listen(sockfd, 1) < 0) { + perror("IPv6 listen failed"); + exit(EXIT_FAILURE); + } + + // Signal that IPv6 listener is ready and wait for IPv4 + int expected = 1; + while (!atomic_compare_exchange_weak(&global_state, &expected, 3)) { + expected = 1; // Reset expected value if CAS failed + } + + data->client_sock = accept(sockfd, NULL, NULL); + if (data->client_sock < 0) { + perror("IPv6 accept failed"); + exit(EXIT_FAILURE); + } + + while (atomic_load(&global_state) < 5) { + // Wait for IPv4 to send its message + } + + const char *message = "Hello from IPv6!"; + unassert(send(data->client_sock, message, strlen(message), 0) > 0); + + unassert(!close(sockfd)); + return NULL; +} + +int main() { + ShowCrashReports(); + + pthread_t ipv4_thread, ipv6_thread; + listener_data ipv4_data = {0}, ipv6_data = {0}; + + if (pthread_create(&ipv4_thread, NULL, ipv4_listener, &ipv4_data) != 0) { + perror("Failed to create IPv4 thread"); + exit(EXIT_FAILURE); + } + + if (pthread_create(&ipv6_thread, NULL, ipv6_listener, &ipv6_data) != 0) { + perror("Failed to create IPv6 thread"); + exit(EXIT_FAILURE); + } + + // Wait for both listeners to be ready + while (atomic_load(&global_state) < 3) { + // Busy wait + } + + int ipv4_sock = socket(AF_INET, SOCK_STREAM, 0); + int ipv6_sock = socket(AF_INET6, SOCK_STREAM, 0); + + struct sockaddr_in ipv4_addr = {0}; + ipv4_addr.sin_family = AF_INET; + ipv4_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + ipv4_addr.sin_port = htons(ipv4_data.port); + + struct sockaddr_in6 ipv6_addr = {0}; + ipv6_addr.sin6_family = AF_INET6; + ipv6_addr.sin6_addr = in6addr_loopback; + ipv6_addr.sin6_port = htons(ipv6_data.port); + + if (connect(ipv4_sock, (struct sockaddr *)&ipv4_addr, sizeof(ipv4_addr)) < + 0) { + perror("IPv4 connect failed"); + exit(EXIT_FAILURE); + } + + if (connect(ipv6_sock, (struct sockaddr *)&ipv6_addr, sizeof(ipv6_addr)) < + 0) { + perror("IPv6 connect failed"); + exit(EXIT_FAILURE); + } + + // Signal that main thread is connected + atomic_store(&global_state, 4); + + struct pollfd fds[2]; + fds[0].fd = ipv4_sock; + fds[0].events = POLLIN; + fds[1].fd = ipv6_sock; + fds[1].events = POLLIN; + + char buffer[BUFFER_SIZE]; + + while (atomic_load(&global_state) < 6) { + if (poll(fds, 2, -1) > 0) { + if (fds[0].revents & POLLIN) { + ssize_t n = recv(ipv4_sock, buffer, BUFFER_SIZE - 1, 0); + unassert(n != -1); + buffer[n] = '\0'; + // printf("Received from IPv4: %s\n", buffer); + unassert(atomic_load(&global_state) == 4); + atomic_store(&global_state, 5); + } + if (fds[1].revents & POLLIN) { + ssize_t n = recv(ipv6_sock, buffer, BUFFER_SIZE - 1, 0); + unassert(n != -1); + buffer[n] = '\0'; + // printf("Received from IPv6: %s\n", buffer); + unassert(atomic_load(&global_state) == 5); + atomic_store(&global_state, 6); + } + } + } + + unassert(!close(ipv4_sock)); + unassert(!close(ipv6_sock)); + + unassert(!pthread_join(ipv4_thread, NULL)); + unassert(!pthread_join(ipv6_thread, NULL)); + + CheckForMemoryLeaks(); + return 0; +} diff --git a/third_party/nsync/testing/mu_starvation_test.c b/third_party/nsync/testing/mu_starvation_test_.c similarity index 100% rename from third_party/nsync/testing/mu_starvation_test.c rename to third_party/nsync/testing/mu_starvation_test_.c