Rewrite Windows poll()

We can now await signals, files, pipes, and console simultaneously. This
change also gives a deeper review and testing to changes made yesterday.
This commit is contained in:
Justine Tunney 2024-09-10 18:59:06 -07:00
parent cceddd21b2
commit fbdf9d028c
No known key found for this signature in database
GPG key ID: BE714B4575D6E328
15 changed files with 425 additions and 191 deletions

View file

@ -117,6 +117,11 @@ int main(int argc, char *argv[]) {
void *p;
const char *prog;
// if you need the tiny64 program for windows:
//
// make -j o//tool/hello/life-pe.ape
// scp o//tool/hello/life-pe.ape windows:tiny64
//
if (argc <= 1) {
prog = "tiny64";
} else {

View file

@ -26,7 +26,6 @@ uint32_t sys_getuid_nt(void);
int __ensurefds_unlocked(int);
void __printfds(struct Fd *, size_t);
int CountConsoleInputBytes(void);
int CountConsoleInputBytesBlocking(uint32_t, sigset_t);
int FlushConsoleInputBytes(void);
int64_t GetConsoleInputHandle(void);
int64_t GetConsoleOutputHandle(void);

View file

@ -24,15 +24,18 @@
#include "libc/calls/struct/sigaction.h"
#include "libc/calls/struct/sigset.internal.h"
#include "libc/calls/struct/timespec.h"
#include "libc/calls/syscall_support-nt.internal.h"
#include "libc/dce.h"
#include "libc/errno.h"
#include "libc/intrin/atomic.h"
#include "libc/intrin/fds.h"
#include "libc/intrin/strace.h"
#include "libc/intrin/weaken.h"
#include "libc/macros.h"
#include "libc/mem/mem.h"
#include "libc/nt/console.h"
#include "libc/nt/enum/filetype.h"
#include "libc/nt/enum/wait.h"
#include "libc/nt/errors.h"
#include "libc/nt/files.h"
#include "libc/nt/ipc.h"
@ -41,6 +44,7 @@
#include "libc/nt/synchronization.h"
#include "libc/nt/thread.h"
#include "libc/nt/thunk/msabi.h"
#include "libc/nt/time.h"
#include "libc/nt/winsock.h"
#include "libc/runtime/runtime.h"
#include "libc/sock/internal.h"
@ -71,182 +75,14 @@
#define POLLPRI_ 0x0400 // MSDN unsupported
// </sync libc/sysv/consts.sh>
// Polls on the New Technology.
//
// This function is used to implement poll() and select(). You may poll
// on sockets, files and the console at the same time. We also poll for
// both signals and posix thread cancelation, while the poll is polling
static textwindows int sys_poll_nt_impl(struct pollfd *fds, uint64_t nfds,
uint32_t *ms, sigset_t sigmask) {
bool ok;
uint64_t millis;
struct sys_pollfd_nt pipefds[64];
struct sys_pollfd_nt sockfds[64];
int pipeindices[ARRAYLEN(pipefds)];
int sockindices[ARRAYLEN(sockfds)];
struct timespec deadline, remain, now;
uint32_t cm, avail, waitfor, already_slept;
int i, rc, sn, pn, sig, gotinvals, gotpipes, gotsocks, handler_was_called;
textwindows static dontinline struct timespec sys_poll_nt_now(void) {
uint64_t hectons;
QueryUnbiasedInterruptTimePrecise(&hectons);
return timespec_fromnanos(hectons * 100);
}
waitfor = ms ? *ms : -1u;
deadline = timespec_add(timespec_mono(), timespec_frommillis(waitfor));
// do the planning
// we need to read static variables
// we might need to spawn threads and open pipes
__fds_lock();
for (gotinvals = rc = sn = pn = i = 0; i < nfds; ++i) {
if (fds[i].fd < 0)
continue;
if (__isfdopen(fds[i].fd)) {
if (__isfdkind(fds[i].fd, kFdSocket)) {
if (sn < ARRAYLEN(sockfds)) {
// WSAPoll whines if we pass POLLNVAL, POLLHUP, or POLLERR.
sockindices[sn] = i;
sockfds[sn].handle = g_fds.p[fds[i].fd].handle;
sockfds[sn].events =
fds[i].events & (POLLRDNORM_ | POLLRDBAND_ | POLLWRNORM_);
sockfds[sn].revents = 0;
++sn;
} else {
// too many socket fds
rc = e2big();
break;
}
} else if (pn < ARRAYLEN(pipefds)) {
pipeindices[pn] = i;
pipefds[pn].handle = g_fds.p[fds[i].fd].handle;
pipefds[pn].events = 0;
pipefds[pn].revents = 0;
switch (g_fds.p[fds[i].fd].flags & O_ACCMODE) {
case O_RDONLY:
pipefds[pn].events = fds[i].events & POLLIN_;
break;
case O_WRONLY:
pipefds[pn].events = fds[i].events & POLLOUT_;
break;
case O_RDWR:
pipefds[pn].events = fds[i].events & (POLLIN_ | POLLOUT_);
break;
default:
break;
}
++pn;
} else {
// too many non-socket fds
rc = e2big();
break;
}
} else {
++gotinvals;
}
}
__fds_unlock();
if (rc)
// failed to create a polling solution
return rc;
// perform the i/o and sleeping and looping
for (;;) {
// determine how long to wait
now = timespec_mono();
if (timespec_cmp(now, deadline) < 0) {
remain = timespec_sub(deadline, now);
millis = timespec_tomillis(remain);
waitfor = MIN(millis, 0xffffffffu);
waitfor = MIN(waitfor, POLL_INTERVAL_MS);
} else {
waitfor = 0;
}
// see if input is available on non-sockets
already_slept = 0;
for (gotpipes = i = 0; i < pn; ++i) {
if (pipefds[i].events & POLLWRNORM_)
// we have no way of polling if a non-socket is writeable yet
// therefore we assume that if it can happen, it shall happen
pipefds[i].revents |= POLLWRNORM_;
if (GetFileType(pipefds[i].handle) == kNtFileTypePipe) {
ok = PeekNamedPipe(pipefds[i].handle, 0, 0, 0, &avail, 0);
POLLTRACE("PeekNamedPipe(%ld, 0, 0, 0, [%'u], 0) → {%hhhd, %d}",
pipefds[i].handle, avail, ok, GetLastError());
if (ok) {
if (avail)
pipefds[i].revents |= POLLRDNORM_;
} else if (GetLastError() == kNtErrorHandleEof ||
GetLastError() == kNtErrorBrokenPipe) {
pipefds[i].revents &= ~POLLWRNORM_;
pipefds[i].revents |= POLLHUP_;
} else {
pipefds[i].revents &= ~POLLWRNORM_;
pipefds[i].revents |= POLLERR_;
}
} else if (GetConsoleMode(pipefds[i].handle, &cm)) {
// some programs like bash like to poll([stdin], 1, -1) so let's
// avoid busy looping in such cases. we could generalize this to
// always avoid busy loops, but we'd need poll to launch threads
if (!sn && (pipefds[i].events & POLLRDNORM_) && !already_slept++) {
int err = errno;
switch (CountConsoleInputBytesBlocking(waitfor, sigmask)) {
case -1:
if (errno == EINTR || errno == ECANCELED)
return -1;
errno = err;
pipefds[i].revents &= ~POLLWRNORM_;
pipefds[i].revents |= POLLERR_;
break;
case 0:
pipefds[i].revents &= ~POLLWRNORM_;
pipefds[i].revents |= POLLHUP_;
break;
default:
pipefds[i].revents |= POLLRDNORM_;
break;
}
} else {
switch (CountConsoleInputBytes()) {
case 0:
break;
case -1:
pipefds[i].revents &= ~POLLWRNORM_;
pipefds[i].revents |= POLLHUP_;
break;
default:
pipefds[i].revents |= POLLRDNORM_;
break;
}
}
} else {
// we have no way of polling if a non-socket is readable yet
// therefore we assume that if it can happen it shall happen
pipefds[i].revents |= POLLRDNORM_;
}
if (!(pipefds[i].events & POLLRDNORM_))
pipefds[i].revents &= ~POLLRDNORM_;
if (pipefds[i].revents)
++gotpipes;
}
// if we haven't found any good results yet then here we
// compute a small time slice we don't mind sleeping for
if (sn) {
already_slept = 1;
if ((gotsocks = WSAPoll(sockfds, sn, waitfor)) == -1)
return __winsockerr();
} else {
gotsocks = 0;
}
// add some artificial delay, which we use as an opportunity to also
// check for pending signals, thread cancelation, etc.
if (!gotinvals && !gotsocks && !gotpipes && waitfor) {
if (!already_slept) {
POLLTRACE("poll() parking for %'d out of %'lu ms", waitfor,
timespec_tomillis(remain));
if (_park_norestart(waitfor, sigmask) == -1)
return -1; // eintr, ecanceled, etc.
} else {
textwindows static int sys_poll_nt_sigcheck(sigset_t sigmask) {
int sig, handler_was_called;
if (_check_cancel() == -1)
return -1;
if (_weaken(__sig_get) && (sig = _weaken(__sig_get)(sigmask))) {
@ -256,31 +92,251 @@ static textwindows int sys_poll_nt_impl(struct pollfd *fds, uint64_t nfds,
if (handler_was_called)
return eintr();
}
return 0;
}
// Polls on the New Technology.
//
// This function is used to implement poll() and select(). You may poll
// on sockets, files and the console at the same time. We also poll for
// both signals and posix thread cancelation, while the poll is polling
textwindows static int sys_poll_nt_impl(struct pollfd *fds, uint64_t nfds,
uint32_t *ms, sigset_t sigmask) {
bool ok;
uint64_t millis;
int fileindices[64];
int sockindices[64];
int64_t filehands[64];
struct PosixThread *pt;
int i, rc, ev, kind, gotsocks;
struct sys_pollfd_nt sockfds[64];
struct timespec deadline, remain, now;
uint32_t cm, fi, wi, sn, pn, avail, waitfor, already_slept;
waitfor = ms ? *ms : -1u;
deadline = timespec_add(sys_poll_nt_now(), timespec_frommillis(waitfor));
// ensure revents is cleared
for (i = 0; i < nfds; ++i)
fds[i].revents = 0;
// divide files from sockets
// check for invalid file descriptors
__fds_lock();
for (rc = sn = pn = i = 0; i < nfds; ++i) {
if (fds[i].fd < 0)
continue;
if (__isfdopen(fds[i].fd)) {
kind = g_fds.p[fds[i].fd].kind;
if (kind == kFdSocket) {
// we can use WSAPoll() for these fds
if (sn < ARRAYLEN(sockfds)) {
// WSAPoll whines if we pass POLLNVAL, POLLHUP, or POLLERR.
sockindices[sn] = i;
sockfds[sn].handle = g_fds.p[fds[i].fd].handle;
sockfds[sn].events =
fds[i].events & (POLLRDNORM_ | POLLRDBAND_ | POLLWRNORM_);
sockfds[sn].revents = 0;
++sn;
} else {
// too many sockets
rc = einval();
break;
}
} else if (kind == kFdFile || kind == kFdConsole) {
// we can use WaitForMultipleObjects() for these fds
if (pn < ARRAYLEN(fileindices) - 1) { // last slot for semaphore
fileindices[pn] = i;
filehands[pn] = g_fds.p[fds[i].fd].handle;
++pn;
} else {
// too many files
rc = einval();
break;
}
} else if (kind == kFdDevNull || kind == kFdDevRandom || kind == kFdZip) {
// we can't wait on these kinds via win32
if (fds[i].events & (POLLRDNORM_ | POLLWRNORM_)) {
// the linux kernel does this irrespective of oflags
fds[i].revents = fds[i].events & (POLLRDNORM_ | POLLWRNORM_);
}
} else {
// unsupported file type
fds[i].revents = POLLNVAL_;
}
} else {
// file not open
fds[i].revents = POLLNVAL_;
}
rc += !!fds[i].revents;
}
__fds_unlock();
if (rc)
return rc;
// perform poll operation
for (;;) {
// check input status of pipes / consoles without blocking
// this ensures any socket fds won't starve them of events
// if a file handle is POLLOUT only, we just mark it ready
for (i = 0; i < pn; ++i) {
fi = fileindices[i];
ev = fds[fi].events;
ev &= POLLRDNORM_ | POLLWRNORM_;
if ((g_fds.p[fds[fi].fd].flags & O_ACCMODE) == O_RDONLY)
ev &= ~POLLWRNORM_;
if ((g_fds.p[fds[fi].fd].flags & O_ACCMODE) == O_WRONLY)
ev &= ~POLLRDNORM_;
if ((ev & POLLWRNORM_) && !(ev & POLLRDNORM_)) {
fds[fi].revents = fds[fi].events & (POLLRDNORM_ | POLLWRNORM_);
} else if (GetFileType(filehands[i]) == kNtFileTypePipe) {
ok = PeekNamedPipe(filehands[i], 0, 0, 0, &avail, 0);
POLLTRACE("PeekNamedPipe(%ld, 0, 0, 0, [%'u], 0) → {%hhhd, %d}",
filehands[i], avail, ok, GetLastError());
if (ok) {
if (avail)
fds[fi].revents = POLLRDNORM_;
} else if (GetLastError() == kNtErrorHandleEof ||
GetLastError() == kNtErrorBrokenPipe) {
fds[fi].revents = POLLHUP_;
} else {
fds[fi].revents = POLLERR_;
}
} else if (GetConsoleMode(filehands[i], &cm)) {
switch (CountConsoleInputBytes()) {
case 0:
fds[fi].revents = fds[fi].events & POLLWRNORM_;
break;
case -1:
fds[fi].revents = POLLHUP_;
break;
default:
fds[fi].revents = fds[fi].events & (POLLRDNORM_ | POLLWRNORM_);
break;
}
}
rc += !!fds[fi].revents;
}
// determine how long to wait
now = sys_poll_nt_now();
if (timespec_cmp(now, deadline) < 0) {
remain = timespec_sub(deadline, now);
millis = timespec_tomillis(remain);
waitfor = MIN(millis, 0xffffffffu);
waitfor = MIN(waitfor, POLL_INTERVAL_MS);
} else {
waitfor = 0; // we timed out
}
// check for events and/or readiness on sockets
// we always do this due to issues with POLLOUT
if (sn) {
// if we need to wait, then we prefer to wait inside WSAPoll()
// this ensures network events are received in ~10µs not ~10ms
if (!rc && waitfor) {
if (sys_poll_nt_sigcheck(sigmask))
return -1;
already_slept = waitfor;
} else {
already_slept = 0;
}
if ((gotsocks = WSAPoll(sockfds, sn, already_slept)) == -1)
return __winsockerr();
if (gotsocks) {
for (i = 0; i < sn; ++i)
if (sockfds[i].revents) {
fds[sockindices[i]].revents = sockfds[i].revents;
++rc;
}
} else if (already_slept) {
if (sys_poll_nt_sigcheck(sigmask))
return -1;
}
} else {
already_slept = 0;
}
// return if we observed events
if (rc || !waitfor)
break;
// if nothing has happened and we haven't already waited in poll()
// then we can wait on consoles, pipes, and signals simultaneously
// this ensures low latency for apps like emacs which with no sock
// here we shall actually report that something can be written too
if (!already_slept) {
if (sys_poll_nt_sigcheck(sigmask))
return -1;
pt = _pthread_self();
filehands[pn] = pt->pt_semaphore = CreateSemaphore(0, 0, 1, 0);
atomic_store_explicit(&pt->pt_blocker, PT_BLOCKER_SEM,
memory_order_release);
wi = WaitForMultipleObjects(pn + 1, filehands, 0, waitfor);
atomic_store_explicit(&pt->pt_blocker, 0, memory_order_release);
CloseHandle(filehands[pn]);
if (wi == -1u) {
// win32 wait failure
return __winerr();
} else if (wi == pn) {
// our semaphore was signalled
if (sys_poll_nt_sigcheck(sigmask))
return -1;
} else if ((wi ^ kNtWaitAbandoned) < pn) {
// this is possibly because a process or thread was killed
fds[fileindices[wi ^ kNtWaitAbandoned]].revents = POLLERR_;
++rc;
} else if (wi < pn) {
fi = fileindices[wi];
// one of the handles we polled is ready for fi/o
if (GetConsoleMode(filehands[wi], &cm)) {
switch (CountConsoleInputBytes()) {
case 0:
// it's possible there was input and it was handled by the
// ICANON reader, and therefore should not be reported yet
if (fds[fi].events & POLLWRNORM_)
fds[fi].revents = POLLWRNORM_;
break;
case -1:
fds[fi].revents = POLLHUP_;
break;
default:
fds[fi].revents = fds[fi].events & (POLLRDNORM_ | POLLWRNORM_);
break;
}
} else if (GetFileType(filehands[wi]) == kNtFileTypePipe) {
if ((fds[fi].events & POLLRDNORM_) &&
(g_fds.p[fds[fi].fd].flags & O_ACCMODE) != O_WRONLY) {
if (PeekNamedPipe(filehands[wi], 0, 0, 0, &avail, 0)) {
fds[fi].revents = fds[fi].events & (POLLRDNORM_ | POLLWRNORM_);
} else if (GetLastError() == kNtErrorHandleEof ||
GetLastError() == kNtErrorBrokenPipe) {
fds[fi].revents = POLLHUP_;
} else {
fds[fi].revents = POLLERR_;
}
} else {
fds[fi].revents = fds[fi].events & (POLLRDNORM_ | POLLWRNORM_);
}
} else {
fds[fi].revents = fds[fi].events & (POLLRDNORM_ | POLLWRNORM_);
}
rc += !!fds[fi].revents;
} else {
// should only be possible on kNtWaitTimeout or semaphore abandoned
// keep looping for events and we'll catch timeout when appropriate
if (sys_poll_nt_sigcheck(sigmask))
return -1;
}
}
// we gave all the sockets and all the named pipes a shot
// if we found anything at all then it's time to end work
if (gotinvals || gotpipes || gotsocks || !waitfor)
// once again, return if we observed events
if (rc)
break;
}
// the system call is going to succeed
// it's now ok to start setting the output memory
for (i = 0; i < nfds; ++i) {
if (fds[i].fd < 0 || __isfdopen(fds[i].fd)) {
fds[i].revents = 0;
} else {
fds[i].revents = POLLNVAL_;
}
}
for (i = 0; i < pn; ++i)
fds[pipeindices[i]].revents = pipefds[i].revents;
for (i = 0; i < sn; ++i)
fds[sockindices[i]].revents = sockfds[i].revents;
// and finally return
return gotinvals + gotpipes + gotsocks;
return rc;
}
textwindows int sys_poll_nt(struct pollfd *fds, uint64_t nfds, uint32_t *ms,

View file

@ -29,7 +29,7 @@
* On Windows it's only possible to poll 64 file descriptors at a time.
* This is a limitation imposed by WSAPoll(). Cosmopolitan Libc's poll()
* polyfill can go higher in some cases. For example, you can actually
* poll 64 sockets and 64 pipes/terminals at the same time. Furthermore,
* poll 64 sockets and 63 non-sockets at the same time. Furthermore,
* elements whose fd field is set to a negative number are ignored and
* will not count against this limit.
*
@ -59,8 +59,10 @@
* @return fds[𝑖].revents is always zero initializaed and then will
* be populated with POLL{IN,OUT,PRI,HUP,ERR,NVAL} if something
* was determined about the file descriptor
* @raise E2BIG if we exceeded the 64 socket limit on Windows
* @raise EINVAL if we exceeded the 64 socket limit on Windows
* @raise ECANCELED if thread was cancelled in masked mode
* @raise EINVAL if `nfds` exceeded `RLIMIT_NOFILE`
* @raise ENOMEM on failure to allocate memory
* @raise EINTR if signal was delivered
* @cancelationpoint
* @asyncsignalsafe

View file

@ -90,8 +90,11 @@
* was determined about the file descriptor
* @param timeout if null will block indefinitely
* @param sigmask may be null in which case no mask change happens
* @raise E2BIG if we exceeded the 64 socket limit on Windows
* @raise EINVAL if we exceeded the 64 socket limit on Windows
* @raise ECANCELED if thread was cancelled in masked mode
* @raise EINVAL if `nfds` exceeded `RLIMIT_NOFILE`
* @raise ENOMEM on failure to allocate memory
* @raise EINVAL if `*timeout` is invalid
* @raise EINTR if signal was delivered
* @cancelationpoint
* @asyncsignalsafe
@ -104,6 +107,10 @@ int ppoll(struct pollfd *fds, size_t nfds, const struct timespec *timeout,
struct timespec ts, *tsp;
BEGIN_CANCELATION_POINT;
// validate timeout
if (timeout && timeout->tv_nsec >= 1000000000)
return einval();
// The OpenBSD poll() man pages claims it'll ignore POLLERR, POLLHUP,
// and POLLNVAL in pollfd::events except it doesn't actually do this.
size_t bytes = 0;

View file

@ -899,7 +899,8 @@ RestartOperation:
goto RestartOperation;
}
textwindows int CountConsoleInputBytesBlocking(uint32_t ms, sigset_t waitmask) {
textwindows static int CountConsoleInputBytesBlocking(uint32_t ms,
sigset_t waitmask) {
int got = CountConsoleInputBytes();
if (got == -1)
return 0;

View file

@ -18,8 +18,8 @@
*/
#include "libc/calls/cp.internal.h"
#include "libc/calls/internal.h"
#include "libc/intrin/fds.h"
#include "libc/dce.h"
#include "libc/intrin/fds.h"
#include "libc/intrin/strace.h"
#include "libc/sock/internal.h"
#include "libc/sock/sock.h"
@ -36,6 +36,12 @@
* also means getsockname() can be called to retrieve routing details.
*
* @return 0 on success or -1 w/ errno
* @raise EALREADY if a non-blocking connection request already happened
* @raise EADDRINUSE if local address is already in use
* @raise EINTR if a signal handler was called instead
* @raise ENETUNREACH if network is unreachable
* @raise ETIMEDOUT if connection timed out
* @raise EISCONN if already connected
* @cancelationpoint
* @asyncsignalsafe
* @restartable (unless SO_RCVTIMEO)

View file

@ -23,6 +23,10 @@
#include "third_party/nsync/cv.h"
#include "third_party/nsync/futex.internal.h"
__static_yoink("nsync_mu_lock");
__static_yoink("nsync_mu_unlock");
__static_yoink("nsync_mu_trylock");
/**
* Wakes all threads waiting on condition, e.g.
*
@ -43,9 +47,14 @@
errno_t pthread_cond_broadcast(pthread_cond_t *cond) {
#if PTHREAD_USE_NSYNC
// do nothing if pthread_cond_timedwait() hasn't been called yet
// this is because we dont know for certain if nsync use is safe
if (!atomic_load_explicit(&cond->_waited, memory_order_acquire))
return 0;
// favor *NSYNC if this is a process private condition variable
// if using Mike Burrows' code isn't possible, use a naive impl
if (!cond->_pshared && !IsXnuSilicon()) {
if (!cond->_footek) {
nsync_cv_broadcast((nsync_cv *)cond);
return 0;
}

View file

@ -30,11 +30,8 @@ errno_t pthread_cond_init(pthread_cond_t *cond,
const pthread_condattr_t *attr) {
*cond = (pthread_cond_t){0};
if (attr) {
cond->_footek = IsXnuSilicon() || attr->_pshared;
cond->_pshared = attr->_pshared;
cond->_clock = attr->_clock;
} else {
cond->_footek = IsXnuSilicon();
}
return 0;
}

View file

@ -22,6 +22,10 @@
#include "third_party/nsync/cv.h"
#include "third_party/nsync/futex.internal.h"
__static_yoink("nsync_mu_lock");
__static_yoink("nsync_mu_unlock");
__static_yoink("nsync_mu_trylock");
/**
* Wakes at least one thread waiting on condition, e.g.
*
@ -43,7 +47,7 @@ errno_t pthread_cond_signal(pthread_cond_t *cond) {
#if PTHREAD_USE_NSYNC
// do nothing if pthread_cond_timedwait() hasn't been called yet
// this is because we dont know for certain if nsync is safe
// this is because we dont know for certain if nsync use is safe
if (!atomic_load_explicit(&cond->_waited, memory_order_acquire))
return 0;

View file

@ -31,11 +31,21 @@
#include "third_party/nsync/futex.internal.h"
#include "third_party/nsync/time.h"
__static_yoink("nsync_mu_lock");
__static_yoink("nsync_mu_unlock");
__static_yoink("nsync_mu_trylock");
struct PthreadWait {
pthread_cond_t *cond;
pthread_mutex_t *mutex;
};
static bool can_use_nsync(uint64_t muword) {
return !IsXnuSilicon() && //
MUTEX_TYPE(muword) == PTHREAD_MUTEX_NORMAL &&
MUTEX_PSHARED(muword) == PTHREAD_PROCESS_PRIVATE;
}
static void pthread_cond_leave(void *arg) {
struct PthreadWait *wait = (struct PthreadWait *)arg;
if (pthread_mutex_lock(wait->mutex))
@ -117,19 +127,20 @@ errno_t pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
MUTEX_OWNER(muword) != gettid())
return EPERM;
// if the cond is process shared then the mutex needs to be too
if ((cond->_pshared == PTHREAD_PROCESS_SHARED) ^
(MUTEX_PSHARED(muword) == PTHREAD_PROCESS_SHARED))
return EINVAL;
#if PTHREAD_USE_NSYNC
// the first time pthread_cond_timedwait() is called we learn if the
// associated mutex is normal and private. that means *NSYNC is safe
// this decision is permanent. you can't use a recursive mutex later
if (!atomic_load_explicit(&cond->_waited, memory_order_acquire)) {
if (!cond->_footek)
if (MUTEX_TYPE(muword) != PTHREAD_MUTEX_NORMAL ||
MUTEX_PSHARED(muword) != PTHREAD_PROCESS_PRIVATE)
cond->_footek = true;
cond->_footek = !can_use_nsync(muword);
atomic_store_explicit(&cond->_waited, true, memory_order_release);
} else if (!cond->_footek) {
if (MUTEX_TYPE(muword) != PTHREAD_MUTEX_NORMAL ||
MUTEX_PSHARED(muword) != PTHREAD_PROCESS_PRIVATE)
if (!can_use_nsync(muword))
return EINVAL;
}
#endif

View file

@ -20,6 +20,7 @@
#include "libc/calls/calls.h"
#include "libc/calls/pledge.h"
#include "libc/calls/struct/sigaction.h"
#include "libc/calls/struct/timespec.h"
#include "libc/dce.h"
#include "libc/errno.h"
#include "libc/intrin/describeflags.h"
@ -34,6 +35,7 @@
#include "libc/sysv/consts/af.h"
#include "libc/sysv/consts/inaddr.h"
#include "libc/sysv/consts/ipproto.h"
#include "libc/sysv/consts/o.h"
#include "libc/sysv/consts/sig.h"
#include "libc/sysv/consts/sock.h"
#include "libc/testlib/testlib.h"
@ -44,8 +46,7 @@
bool gotsig;
void SetUpOnce(void) {
__pledge_mode = PLEDGE_PENALTY_KILL_PROCESS | PLEDGE_STDERR_LOGGING;
ASSERT_SYS(0, 0, pledge("stdio proc inet", 0));
testlib_enable_tmp_setup_teardown();
}
void SetUp(void) {
@ -60,6 +61,12 @@ TEST(poll, allZero_doesNothingPrettyMuch) {
EXPECT_SYS(0, 0, poll(0, 0, 0));
}
TEST(poll, allZeroWithTimeout_sleeps) {
struct timespec ts1 = timespec_mono();
EXPECT_SYS(0, 0, poll(0, 0, 100));
EXPECT_GE(timespec_tomillis(timespec_sub(timespec_mono(), ts1)), 100);
}
TEST(ppoll, weCanProveItChecksForSignals) {
if (IsXnu())
return;
@ -203,22 +210,141 @@ TEST(poll, pipe_hasInput) {
EXPECT_EQ(0, sigprocmask(SIG_SETMASK, &savemask, 0));
}
#if 0
TEST(poll, emptyFds_becomesSleep) {
// timing tests w/o mocks are always the hardest
int64_t a, b, c, p, i = 0;
do {
if (++i == 5) {
kprintf("too much cpu churn%n");
return;
}
p = TSC_AUX_CORE(rdpid());
a = rdtsc();
EXPECT_SYS(0, 0, poll(0, 0, 5));
b = rdtsc();
EXPECT_SYS(0, 0, poll(0, 0, 50));
c = rdtsc();
} while (TSC_AUX_CORE(rdpid()) != p);
EXPECT_LT((b - a) * 2, c - b);
TEST(poll, file_pollin) {
int fd;
EXPECT_SYS(0, 3, (fd = open("boop", O_CREAT | O_RDWR | O_TRUNC, 0644)));
struct pollfd fds[] = {{fd, POLLIN}};
EXPECT_SYS(0, 1, poll(fds, 1, -1));
EXPECT_TRUE(!!(fds[0].revents & POLLIN));
EXPECT_TRUE(!(fds[0].revents & POLLOUT));
EXPECT_SYS(0, 0, close(fd));
}
TEST(poll, file_pollout) {
int fd;
EXPECT_SYS(0, 3, (fd = open("boop", O_CREAT | O_RDWR | O_TRUNC, 0644)));
struct pollfd fds[] = {{fd, POLLOUT}};
EXPECT_SYS(0, 1, poll(fds, 1, -1));
EXPECT_TRUE(!(fds[0].revents & POLLIN));
EXPECT_TRUE(!!(fds[0].revents & POLLOUT));
EXPECT_SYS(0, 0, close(fd));
}
TEST(poll, file_pollinout) {
int fd;
EXPECT_SYS(0, 3, (fd = open("boop", O_CREAT | O_RDWR | O_TRUNC, 0644)));
struct pollfd fds[] = {{fd, POLLIN | POLLOUT}};
EXPECT_SYS(0, 1, poll(fds, 1, -1));
EXPECT_TRUE(!!(fds[0].revents & POLLIN));
EXPECT_TRUE(!!(fds[0].revents & POLLOUT));
EXPECT_SYS(0, 0, close(fd));
}
TEST(poll, file_rdonly_pollinout) {
int fd;
EXPECT_SYS(0, 3, (fd = open("boop", O_CREAT | O_RDWR | O_TRUNC, 0644)));
EXPECT_SYS(0, 0, close(fd));
EXPECT_SYS(0, 3, (fd = open("boop", O_RDONLY)));
struct pollfd fds[] = {{fd, POLLIN | POLLOUT}};
EXPECT_SYS(0, 1, poll(fds, 1, -1));
EXPECT_TRUE(!!(fds[0].revents & POLLIN));
EXPECT_TRUE(!!(fds[0].revents & POLLOUT)); // counter-intuitive
EXPECT_SYS(0, 0, close(fd));
}
TEST(poll, file_wronly_pollin) {
int fd;
EXPECT_SYS(0, 3, (fd = creat("boop", 0644)));
struct pollfd fds[] = {{fd, POLLIN}};
EXPECT_SYS(0, 1, poll(fds, 1, -1));
EXPECT_TRUE(!!(fds[0].revents & POLLIN));
EXPECT_TRUE(!(fds[0].revents & POLLOUT));
EXPECT_SYS(0, 0, close(fd));
}
TEST(poll, file_wronly_pollout) {
int fd;
EXPECT_SYS(0, 3, (fd = creat("boop", 0644)));
struct pollfd fds[] = {{fd, POLLOUT}};
EXPECT_SYS(0, 1, poll(fds, 1, -1));
EXPECT_TRUE(!(fds[0].revents & POLLIN));
EXPECT_TRUE(!!(fds[0].revents & POLLOUT));
EXPECT_SYS(0, 0, close(fd));
}
TEST(poll, file_wronly_pollinout) {
int fd;
EXPECT_SYS(0, 3, (fd = creat("boop", 0644)));
struct pollfd fds[] = {{fd, POLLIN | POLLOUT}};
EXPECT_SYS(0, 1, poll(fds, 1, -1));
EXPECT_TRUE(!!(fds[0].revents & POLLIN));
EXPECT_TRUE(!!(fds[0].revents & POLLOUT));
EXPECT_SYS(0, 0, close(fd));
}
TEST(poll, file_rdwr_pollinoutpri) {
int fd;
EXPECT_SYS(0, 3, (fd = open("boop", O_CREAT | O_RDWR | O_TRUNC, 0644)));
struct pollfd fds[] = {{fd, POLLIN | POLLOUT | POLLPRI}};
EXPECT_SYS(0, 1, poll(fds, 1, -1));
EXPECT_TRUE(!!(fds[0].revents & POLLIN));
EXPECT_TRUE(!!(fds[0].revents & POLLOUT));
if (IsXnu())
EXPECT_TRUE(!!(fds[0].revents & POLLPRI)); // wut
else
EXPECT_TRUE(!(fds[0].revents & POLLPRI));
EXPECT_SYS(0, 0, close(fd));
}
TEST(poll, pipein_pollout_blocks) {
if (IsFreebsd() || IsOpenbsd())
return;
int pipefds[2];
EXPECT_SYS(0, 0, pipe(pipefds));
struct pollfd fds[] = {{pipefds[0], POLLOUT}};
EXPECT_SYS(0, 0, poll(fds, 1, 0));
struct timespec ts1 = timespec_mono();
EXPECT_SYS(0, 0, poll(fds, 1, 10));
EXPECT_GE(timespec_tomillis(timespec_sub(timespec_mono(), ts1)), 10);
EXPECT_SYS(0, 0, close(pipefds[1]));
EXPECT_SYS(0, 0, close(pipefds[0]));
}
TEST(poll, pipeout_pollout) {
int pipefds[2];
EXPECT_SYS(0, 0, pipe(pipefds));
struct pollfd fds[] = {{pipefds[1], POLLOUT}};
EXPECT_SYS(0, 1, poll(fds, 1, 0));
EXPECT_TRUE(!(fds[0].revents & POLLIN));
EXPECT_TRUE(!!(fds[0].revents & POLLOUT));
EXPECT_SYS(0, 1, poll(fds, 1, 1));
EXPECT_TRUE(!(fds[0].revents & POLLIN));
EXPECT_TRUE(!!(fds[0].revents & POLLOUT));
EXPECT_SYS(0, 0, close(pipefds[1]));
EXPECT_SYS(0, 0, close(pipefds[0]));
}
TEST(poll, pipein_pollin_timeout) {
int pipefds[2];
EXPECT_SYS(0, 0, pipe(pipefds));
struct pollfd fds[] = {{pipefds[0], POLLIN}};
struct timespec ts1 = timespec_mono();
EXPECT_SYS(0, 0, poll(fds, 1, 10));
EXPECT_GE(timespec_tomillis(timespec_sub(timespec_mono(), ts1)), 10);
EXPECT_SYS(0, 0, close(pipefds[1]));
EXPECT_SYS(0, 0, close(pipefds[0]));
}
TEST(poll, pipein_pollinout_timeout) {
if (IsFreebsd() || IsOpenbsd())
return;
int pipefds[2];
EXPECT_SYS(0, 0, pipe(pipefds));
struct pollfd fds[] = {{pipefds[0], POLLIN | POLLOUT}};
EXPECT_SYS(0, 0, poll(fds, 1, 0));
struct timespec ts1 = timespec_mono();
EXPECT_SYS(0, 0, poll(fds, 1, 10));
EXPECT_GE(timespec_tomillis(timespec_sub(timespec_mono(), ts1)), 10);
EXPECT_SYS(0, 0, close(pipefds[1]));
EXPECT_SYS(0, 0, close(pipefds[0]));
}
#endif

View file

@ -46,14 +46,18 @@ TEST(connect, blocking) {
ASSERT_SYS(0, 0, bind(3, (struct sockaddr *)&addr, sizeof(addr)));
ASSERT_SYS(0, 0, getsockname(3, (struct sockaddr *)&addr, &addrsize));
ASSERT_SYS(0, 0, listen(3, SOMAXCONN));
SPAWN(fork);
while (!*sem)
pthread_yield();
ASSERT_SYS(0, 4, accept(3, (struct sockaddr *)&addr, &addrsize));
ASSERT_SYS(0, 2, read(4, buf, 16)); // hi
ASSERT_SYS(0, 5, write(4, "hello", 5));
ASSERT_SYS(0, 3, read(4, buf, 16)); // bye
PARENT();
ASSERT_SYS(0, 0, close(3));
ASSERT_SYS(0, 3, socket(AF_INET, SOCK_STREAM, IPPROTO_TCP));
ASSERT_SYS(0, 0, connect(3, (struct sockaddr *)&addr, sizeof(addr)));
@ -79,7 +83,9 @@ TEST(connect, blocking) {
ASSERT_STREQ("hello", buf);
ASSERT_SYS(0, 3, write(3, "bye", 3));
ASSERT_SYS(0, 0, close(3));
WAIT(exit, 0);
munmap(sem, sizeof(unsigned));
}
@ -99,20 +105,25 @@ TEST(connect, nonblocking) {
ASSERT_SYS(0, 0, bind(3, (struct sockaddr *)&addr, sizeof(addr)));
ASSERT_SYS(0, 0, getsockname(3, (struct sockaddr *)&addr, &addrsize));
ASSERT_SYS(0, 0, listen(3, SOMAXCONN));
SPAWN(fork);
while (!*sem)
pthread_yield();
ASSERT_SYS(0, 4, accept(3, (struct sockaddr *)&addr, &addrsize));
ASSERT_SYS(0, 2, read(4, buf, 16)); // hi
ASSERT_SYS(0, 5, write(4, "hello", 5));
ASSERT_SYS(0, 3, read(4, buf, 16)); // bye
PARENT();
ASSERT_SYS(0, 0, close(3));
ASSERT_SYS(0, 3, socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP));
ASSERT_SYS(EINPROGRESS, -1,
connect(3, (struct sockaddr *)&addr, sizeof(addr)));
if (!(IsLinux() || IsNetbsd())) {
// this doens't work on rhel7 and netbsd
if (!IsLinux() && !IsNetbsd() && !IsXnu()) {
// this doens't work on linux and netbsd
// on MacOS this can EISCONN before accept() is called
ASSERT_SYS(EALREADY, -1,
connect(3, (struct sockaddr *)&addr, sizeof(addr)));
}
@ -137,6 +148,8 @@ TEST(connect, nonblocking) {
ASSERT_STREQ("hello", buf);
ASSERT_SYS(0, 3, write(3, "bye", 3));
ASSERT_SYS(0, 0, close(3));
WAIT(exit, 0);
munmap(sem, sizeof(unsigned));
}

View file

@ -24,9 +24,7 @@
/* Aborts after printing the nul-terminated string s[]. */
void nsync_panic_ (const char *s) {
if (1)
__builtin_trap();
tinyprint(2, "error: nsync panic: ", s,
tinyprint (2, "error: nsync panic: ", s,
"cosmoaddr2line ", program_invocation_name, " ",
DescribeBacktrace (__builtin_frame_address (0)), "\n",
NULL);

View file

@ -1166,9 +1166,9 @@ static void PerformBestEffortIo(void) {
DEBUGF("poll() toto=%d [grace=%,ldns]", toto,
timespec_tonanos(GetGraceTime()));
if (toto) {
if (fds[0].revents & (POLLIN | POLLERR))
if (fds[0].revents & (POLLIN | POLLHUP | POLLERR))
ReadKeyboard();
if (fds[1].revents & (POLLOUT | POLLERR))
if (fds[1].revents & (POLLOUT | POLLHUP | POLLERR))
WriteVideo();
}
} else if (errno == EINTR) {
@ -1299,7 +1299,7 @@ static void PickDefaults(void) {
*
* strcmp(nulltoempty(getenv("TERM")), "xterm-direct") == 0
*/
if (strcmp(nulltoempty(getenv("TERM")), "xterm-kitty") == 0)
if (IsWindows() || !strcmp(nulltoempty(getenv("TERM")), "xterm-kitty"))
ttyquantsetup(kTtyQuantTrue, TTYQUANT()->chans, kTtyBlocksUnicode);
}