Fix an async signal delivery flake on Windows

Justine Tunney 2024-10-02 04:55:06 -07:00
parent e4d6eb382a
commit 85c58be942
2 changed files with 107 additions and 46 deletions


@@ -136,21 +136,22 @@ static textwindows wontreturn void __sig_terminate(int sig) {
   TerminateThisProcess(sig);
 }
 
-textwindows static void __sig_wake(struct PosixThread *pt, int sig) {
+textwindows static bool __sig_wake(struct PosixThread *pt, int sig) {
   atomic_int *blocker;
   blocker = atomic_load_explicit(&pt->pt_blocker, memory_order_acquire);
   if (!blocker)
-    return;
+    return false;
   // threads can create semaphores on an as-needed basis
   if (blocker == PT_BLOCKER_EVENT) {
     STRACE("%G set %d's event object", sig, _pthread_tid(pt));
     SetEvent(pt->pt_event);
-    return;
+    return !!atomic_load_explicit(&pt->pt_blocker, memory_order_acquire);
   }
   // all other blocking ops that aren't overlap should use futexes
   // we force restartable futexes to churn by waking w/o releasing
   STRACE("%G waking %d's futex", sig, _pthread_tid(pt));
   WakeByAddressSingle(blocker);
+  return !!atomic_load_explicit(&pt->pt_blocker, memory_order_acquire);
 }
 
 textwindows static bool __sig_start(struct PosixThread *pt, int sig,
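The bool return is the heart of this hunk: after poking the event or futex, __sig_wake re-reads pt_blocker to report whether the target still looks parked in an interruptible wait, so the caller can decide between trusting the i/o routine and preempting. A minimal standalone sketch of that wake-and-recheck idiom (hypothetical names, plain C11 atomics standing in for the win32 calls):

    #include <stdatomic.h>
    #include <stdbool.h>

    static _Atomic(atomic_int *) g_blocker;  // null when the target isn't parked

    // Wake the target if it's parked, then report whether it still appears
    // blocked afterwards; only a false return obliges the caller to preempt.
    static bool wake_and_recheck(void) {
      atomic_int *b = atomic_load_explicit(&g_blocker, memory_order_acquire);
      if (!b)
        return false;  // never was blocked: caller must deliver another way
      atomic_fetch_add_explicit(b, 1, memory_order_release);  // churn futex word
      // WakeByAddressSingle(b) / SetEvent(...) would go here on win32
      return !!atomic_load_explicit(&g_blocker, memory_order_acquire);
    }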
@@ -302,17 +303,48 @@ static textwindows int __sig_killer(struct PosixThread *pt, int sig, int sic) {
     return 0;
   }
 
-  // we can't preempt threads that masked sigs or are blocked. we also
-  // need to ensure we don't overflow the target thread's stack if many
-  // signals need to be delivered at once. we also need to make sure two
-  // threads can't deadlock by killing each other at the same time.
-  if ((atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire) &
-       (1ull << (sig - 1))) ||
-      atomic_exchange_explicit(&pt->pt_intoff, 1, memory_order_acquire)) {
-    atomic_fetch_or_explicit(&pt->tib->tib_sigpending, 1ull << (sig - 1),
-                             memory_order_relaxed);
-    __sig_wake(pt, sig);
-    return 0;
+  // we can't preempt threads that masked sigs or are blocked on i/o
+  while ((atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire) &
+          (1ull << (sig - 1)))) {
+    if (atomic_fetch_or_explicit(&pt->tib->tib_sigpending, 1ull << (sig - 1),
+                                 memory_order_acq_rel) &
+        (1ull << (sig - 1)))
+      // we believe signal was already enqueued
+      return 0;
+    if (__sig_wake(pt, sig))
+      // we believe i/o routine will handle signal
+      return 0;
+    if (atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire) &
+        (1ull << (sig - 1)))
+      // we believe ALLOW_SIGNALS will handle signal
+      return 0;
+    if (!(atomic_fetch_and_explicit(&pt->tib->tib_sigpending,
+                                    ~(1ull << (sig - 1)),
+                                    memory_order_acq_rel) &
+          (1ull << (sig - 1))))
+      // we believe another thread sniped our signal
+      return 0;
+    break;
+  }
+
+  // avoid race conditions and deadlocks with thread suspend process
+  if (atomic_exchange_explicit(&pt->pt_intoff, 1, memory_order_acquire)) {
+    // we believe another thread is asynchronously waking the mark
+    if (atomic_fetch_or_explicit(&pt->tib->tib_sigpending, 1ull << (sig - 1),
+                                 memory_order_acq_rel) &
+        (1ull << (sig - 1)))
+      // we believe our signal is already being delivered
+      return 0;
+    if (atomic_load_explicit(&pt->pt_intoff, memory_order_acquire) ||
+        atomic_exchange_explicit(&pt->pt_intoff, 1, memory_order_acquire))
+      // we believe __sig_tramp will deliver our signal
+      return 0;
+    if (!(atomic_fetch_and_explicit(&pt->tib->tib_sigpending,
+                                    ~(1ull << (sig - 1)),
+                                    memory_order_acq_rel) &
+          (1ull << (sig - 1))))
+      // we believe another thread sniped our signal
+      return 0;
   }
 
   // if there's no handler then killing a thread kills the process
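Both new branches lean on the same lock-free handshake over tib_sigpending: fetch_or tells the enqueuer whether the bit was already posted, and fetch_and tells whoever tries to take the bit back whether another thread sniped it first, so each pending bit has exactly one owner at any moment. A condensed sketch of that handshake, under assumed standalone names rather than the cosmo internals:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    static _Atomic uint64_t g_sigpending;

    // Post sig's bit; returns true only if the caller still owns delivery
    // after learning whether the target will handle the signal itself.
    static bool post_or_reclaim(int sig, bool target_will_handle) {
      uint64_t bit = 1ull << (sig - 1);
      if (atomic_fetch_or_explicit(&g_sigpending, bit,
                                   memory_order_acq_rel) & bit)
        return false;  // someone already enqueued this signal
      if (target_will_handle)
        return false;  // the woken i/o routine will consume the bit
      // reclaim the bit; losing it means another thread sniped our signal
      return atomic_fetch_and_explicit(&g_sigpending, ~bit,
                                       memory_order_acq_rel) & bit;
    }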
@@ -321,17 +353,10 @@ static textwindows int __sig_killer(struct PosixThread *pt, int sig, int sic) {
     __sig_terminate(sig);
   }
 
-  // ignore signals already pending
-  uintptr_t th = _pthread_syshand(pt);
-  if (atomic_load_explicit(&pt->tib->tib_sigpending, memory_order_acquire) &
-      (1ull << (sig - 1))) {
-    atomic_store_explicit(&pt->pt_intoff, 0, memory_order_release);
-    return 0;
-  }
-
   // take control of thread
   // suspending the thread happens asynchronously
   // however getting the context blocks until it's frozen
+  uintptr_t th = _pthread_syshand(pt);
   if (SuspendThread(th) == -1u) {
     STRACE("SuspendThread failed w/ %d", GetLastError());
     atomic_store_explicit(&pt->pt_intoff, 0, memory_order_release);
@@ -349,9 +374,7 @@ static textwindows int __sig_killer(struct PosixThread *pt, int sig, int sic) {
   // we can't preempt threads that masked sig or are blocked
   // we can't preempt threads that are running in win32 code
   // so we shall unblock the thread and let it signal itself
-  if ((atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire) &
-       (1ull << (sig - 1))) ||
-      !((uintptr_t)__executable_start <= nc.Rip &&
+  if (!((uintptr_t)__executable_start <= nc.Rip &&
         nc.Rip < (uintptr_t)__privileged_start)) {
     atomic_fetch_or_explicit(&pt->tib->tib_sigpending, 1ull << (sig - 1),
                              memory_order_relaxed);
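With the mask already handled up front, the only question left after SuspendThread is positional: the saved RIP must fall inside the image's unprivileged code before the killer may rewrite the context to run a signal trampoline. A small sketch of that bounds test, assuming the linker symbols named in the diff and an instruction pointer captured from the suspended thread's context:

    #include <stdbool.h>
    #include <stdint.h>

    extern char __executable_start[];  // start of the program image
    extern char __privileged_start[];  // code that must never be interrupted

    // Safe to redirect the suspended thread only while it executes our own
    // unprivileged code; in win32 or privileged code it must self-deliver.
    static bool may_preempt_at(uintptr_t rip) {
      return (uintptr_t)__executable_start <= rip &&
             rip < (uintptr_t)__privileged_start;
    }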
@@ -634,6 +657,7 @@ textwindows dontinstrument static uint32_t __sig_worker(void *arg) {
   __maps_track((char *)(((uintptr_t)sp + __pagesize - 1) & -__pagesize) - STKSZ,
                STKSZ);
   for (;;) {
+
     // dequeue all pending signals and fire them off. if there's no
     // thread that can handle them then __sig_generate will requeue
     // those signals back to __sig.process; hence the need for xchg
@@ -644,6 +668,39 @@ textwindows dontinstrument static uint32_t __sig_worker(void *arg) {
       sigs &= ~(1ull << (sig - 1));
       __sig_generate(sig, SI_KERNEL);
     }
+
+    // unblock stalled asynchronous signals in threads
+    _pthread_lock();
+    for (struct Dll *e = dll_first(_pthread_list); e;
+         e = dll_next(_pthread_list, e)) {
+      struct PosixThread *pt = POSIXTHREAD_CONTAINER(e);
+      if (atomic_load_explicit(&pt->pt_status, memory_order_acquire) >=
+          kPosixThreadTerminated) {
+        break;
+      }
+      sigset_t pending =
+          atomic_load_explicit(&pt->tib->tib_sigpending, memory_order_acquire);
+      sigset_t mask =
+          atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire);
+      if (pending & ~mask) {
+        _pthread_ref(pt);
+        _pthread_unlock();
+        while (!atomic_compare_exchange_weak_explicit(
+            &pt->tib->tib_sigpending, &pending, pending & ~mask,
+            memory_order_acq_rel, memory_order_relaxed)) {
+        }
+        while ((pending = pending & ~mask)) {
+          int sig = bsfl(pending) + 1;
+          pending &= ~(1ull << (sig - 1));
+          __sig_killer(pt, sig, SI_KERNEL);
+        }
+        _pthread_lock();
+        _pthread_unref(pt);
+      }
+    }
+    _pthread_unlock();
+
+    // wait until next scheduler quantum
     Sleep(POLL_INTERVAL_MS);
   }
   return 0;
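This sweep is what cures the flake: a signal can land in tib_sigpending while masked and then sit there after the mask clears, because nothing re-examines it until the next kill. The worker now scans every live thread each quantum, claims any deliverable bits, and re-runs __sig_killer on them. A standalone sketch of the claim-then-deliver step, with assumed names and the keep-masked-bits variant of the swap:

    #include <stdatomic.h>
    #include <stdint.h>

    static _Atomic uint64_t t_sigpending;  // stands in for tib_sigpending

    // Atomically claim all bits not covered by sigmask, leaving masked
    // signals pending, then deliver each claimed one lowest-numbered-first
    // (mirroring bsfl(pending) + 1 in the worker loop above).
    static void drain_unmasked(uint64_t sigmask, void (*deliver)(int sig)) {
      uint64_t pending =
          atomic_load_explicit(&t_sigpending, memory_order_acquire);
      while (!atomic_compare_exchange_weak_explicit(
          &t_sigpending, &pending, pending & sigmask,
          memory_order_acq_rel, memory_order_relaxed)) {
      }
      for (uint64_t claimed = pending & ~sigmask; claimed;
           claimed &= claimed - 1)
        deliver(__builtin_ffsll(claimed));  // 1-based index of lowest set bit
    }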


@@ -27,6 +27,7 @@ pthread_t sender_thread;
 pthread_t receiver_thread;
 struct timespec send_time;
 atomic_int sender_got_signal;
+atomic_int receiver_got_signal;
 double latencies[ITERATIONS];
 
 void sender_signal_handler(int signo) {
@@ -34,24 +35,7 @@ void sender_signal_handler(int signo) {
 }
 
 void receiver_signal_handler(int signo) {
-  struct timespec receive_time;
-  clock_gettime(CLOCK_MONOTONIC, &receive_time);
-  long sec_diff = receive_time.tv_sec - send_time.tv_sec;
-  long nsec_diff = receive_time.tv_nsec - send_time.tv_nsec;
-  double latency_ns = sec_diff * 1e9 + nsec_diff;
-  static int iteration = 0;
-  if (iteration < ITERATIONS)
-    latencies[iteration++] = latency_ns;
-
-  // Pong sender
-  if (pthread_kill(sender_thread, SIGUSR2))
-    exit(2);
-
-  // Exit if done
-  if (iteration >= ITERATIONS)
-    pthread_exit(0);
+  receiver_got_signal = 1;
 }
 
 void *sender_func(void *arg) {
@@ -84,9 +68,29 @@ void *sender_func(void *arg) {
 
 void *receiver_func(void *arg) {
   // Wait for asynchronous signals
-  volatile unsigned v = 0;
-  for (;;)
-    ++v;
+  for (;;) {
+    if (atomic_exchange_explicit(&receiver_got_signal, 0,
+                                 memory_order_acq_rel)) {
+      struct timespec receive_time;
+      clock_gettime(CLOCK_MONOTONIC, &receive_time);
+      long sec_diff = receive_time.tv_sec - send_time.tv_sec;
+      long nsec_diff = receive_time.tv_nsec - send_time.tv_nsec;
+      double latency_ns = sec_diff * 1e9 + nsec_diff;
+      static int iteration = 0;
+      if (iteration < ITERATIONS)
+        latencies[iteration++] = latency_ns;
+
+      // Pong sender
+      if (pthread_kill(sender_thread, SIGUSR2))
+        exit(2);
+
+      // Exit if done
+      if (iteration >= ITERATIONS)
+        pthread_exit(0);
+    }
+  }
   return 0;
 }
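The test side makes the handler as small as a handler can be: it only sets an atomic flag, while the loop that used to spin on ++v now owns the timing, the pong, and the exit. That keeps the handler trivially async-signal-safe and, more importantly for this fix, makes the busy-spinning thread the thing that must be asynchronously preempted. A self-contained sketch of the same flag-based pattern (assumed names, not the test itself):

    #include <pthread.h>
    #include <signal.h>
    #include <stdatomic.h>
    #include <stdio.h>
    #include <stdlib.h>

    static atomic_int got_signal;

    // the handler does the minimum: flag the delivery and return
    static void on_sig(int signo) {
      (void)signo;
      got_signal = 1;
    }

    // spin until an asynchronous delivery breaks the loop
    static void *spin(void *arg) {
      (void)arg;
      for (;;)
        if (atomic_exchange_explicit(&got_signal, 0, memory_order_acq_rel))
          return 0;
    }

    int main(void) {
      pthread_t spinner;
      signal(SIGUSR1, on_sig);
      if (pthread_create(&spinner, 0, spin, 0))
        exit(1);
      if (pthread_kill(spinner, SIGUSR1))  // must preempt the busy loop
        exit(2);
      if (pthread_join(spinner, 0))
        exit(3);
      puts("asynchronous delivery observed");
    }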