mirror of
https://github.com/jart/cosmopolitan.git
synced 2025-05-29 08:42:28 +00:00
Fix ordering of pthread_create(pthread_t *thread)
This change fixes a bug where signal_latency_async_test would flake less than 1/1000 of the time. What was happening was that pthread_kill(sender_thread) would return EFAULT. This was because pthread_create() was not storing the thread object pointer into *thread until after clone() had been called. So it was actually possible for the main thread to stall after calling clone(), and during that time the receiver would launch, receive a signal from the sender thread, and then fail when it tried to send a pong. At first I thought I'd use a barrier in the test to synchronize thread creation, but I firmly believe that pthread_create() was to blame, and now that's fixed.
This commit is contained in:
parent
ed6d133a27
commit
e939659b70
6 changed files with 57 additions and 63 deletions
|
@ -492,7 +492,7 @@ relegated bool TellOpenbsdThisIsStackMemory(void *addr, size_t size) {
|
||||||
|
|
||||||
// OpenBSD only permits RSP to occupy memory that's been explicitly
|
// OpenBSD only permits RSP to occupy memory that's been explicitly
|
||||||
// defined as stack memory, i.e. `lo <= %rsp < hi` must be the case
|
// defined as stack memory, i.e. `lo <= %rsp < hi` must be the case
|
||||||
relegated errno_t FixupCustomStackOnOpenbsd(pthread_attr_t *attr) {
|
relegated bool FixupCustomStackOnOpenbsd(pthread_attr_t *attr) {
|
||||||
|
|
||||||
// get interval
|
// get interval
|
||||||
uintptr_t lo = (uintptr_t)attr->__stackaddr;
|
uintptr_t lo = (uintptr_t)attr->__stackaddr;
|
||||||
|
@ -503,15 +503,11 @@ relegated errno_t FixupCustomStackOnOpenbsd(pthread_attr_t *attr) {
|
||||||
hi = hi & -__pagesize;
|
hi = hi & -__pagesize;
|
||||||
|
|
||||||
// tell os it's stack memory
|
// tell os it's stack memory
|
||||||
errno_t olderr = errno;
|
if (!TellOpenbsdThisIsStackMemory((void *)lo, hi - lo))
|
||||||
if (!TellOpenbsdThisIsStackMemory((void *)lo, hi - lo)) {
|
return false;
|
||||||
errno_t err = errno;
|
|
||||||
errno = olderr;
|
|
||||||
return err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// update attributes with usable stack address
|
// update attributes with usable stack address
|
||||||
attr->__stackaddr = (void *)lo;
|
attr->__stackaddr = (void *)lo;
|
||||||
attr->__stacksize = hi - lo;
|
attr->__stacksize = hi - lo;
|
||||||
return 0;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ void cosmo_stack_unlock(void);
|
||||||
void cosmo_stack_wipe(void);
|
void cosmo_stack_wipe(void);
|
||||||
|
|
||||||
bool TellOpenbsdThisIsStackMemory(void *, size_t);
|
bool TellOpenbsdThisIsStackMemory(void *, size_t);
|
||||||
errno_t FixupCustomStackOnOpenbsd(pthread_attr_t *);
|
bool FixupCustomStackOnOpenbsd(pthread_attr_t *);
|
||||||
|
|
||||||
COSMOPOLITAN_C_END_
|
COSMOPOLITAN_C_END_
|
||||||
#endif /* COSMOPOLITAN_LIBC_STACK_H_ */
|
#endif /* COSMOPOLITAN_LIBC_STACK_H_ */
|
||||||
|
|
|
@ -128,7 +128,7 @@ forceinline pureconst struct PosixThread *_pthread_self(void) {
|
||||||
}
|
}
|
||||||
|
|
||||||
forceinline void _pthread_ref(struct PosixThread *pt) {
|
forceinline void _pthread_ref(struct PosixThread *pt) {
|
||||||
atomic_fetch_add_explicit(&pt->pt_refs, 1, memory_order_acq_rel);
|
atomic_fetch_add_explicit(&pt->pt_refs, 1, memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
forceinline void _pthread_unref(struct PosixThread *pt) {
|
forceinline void _pthread_unref(struct PosixThread *pt) {
|
||||||
|
|
|
@ -199,14 +199,12 @@ static errno_t pthread_create_impl(pthread_t *thread,
|
||||||
const pthread_attr_t *attr,
|
const pthread_attr_t *attr,
|
||||||
void *(*start_routine)(void *), void *arg,
|
void *(*start_routine)(void *), void *arg,
|
||||||
sigset_t oldsigs) {
|
sigset_t oldsigs) {
|
||||||
int rc, e = errno;
|
errno_t err;
|
||||||
struct PosixThread *pt;
|
struct PosixThread *pt;
|
||||||
|
|
||||||
// create posix thread object
|
// create posix thread object
|
||||||
if (!(pt = calloc(1, sizeof(struct PosixThread)))) {
|
if (!(pt = calloc(1, sizeof(struct PosixThread))))
|
||||||
errno = e;
|
|
||||||
return EAGAIN;
|
return EAGAIN;
|
||||||
}
|
|
||||||
dll_init(&pt->list);
|
dll_init(&pt->list);
|
||||||
pt->pt_locale = &__global_locale;
|
pt->pt_locale = &__global_locale;
|
||||||
pt->pt_start = start_routine;
|
pt->pt_start = start_routine;
|
||||||
|
@ -215,7 +213,6 @@ static errno_t pthread_create_impl(pthread_t *thread,
|
||||||
// create thread local storage memory
|
// create thread local storage memory
|
||||||
if (!(pt->pt_tls = _mktls(&pt->tib))) {
|
if (!(pt->pt_tls = _mktls(&pt->tib))) {
|
||||||
free(pt);
|
free(pt);
|
||||||
errno = e;
|
|
||||||
return EAGAIN;
|
return EAGAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,9 +229,9 @@ static errno_t pthread_create_impl(pthread_t *thread,
|
||||||
// caller supplied their own stack
|
// caller supplied their own stack
|
||||||
// assume they know what they're doing as much as possible
|
// assume they know what they're doing as much as possible
|
||||||
if (IsOpenbsd()) {
|
if (IsOpenbsd()) {
|
||||||
if ((rc = FixupCustomStackOnOpenbsd(&pt->pt_attr))) {
|
if (!FixupCustomStackOnOpenbsd(&pt->pt_attr)) {
|
||||||
_pthread_free(pt);
|
_pthread_free(pt);
|
||||||
return rc;
|
return EPERM;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -259,7 +256,7 @@ static errno_t pthread_create_impl(pthread_t *thread,
|
||||||
if (!(pt->pt_attr.__sigaltstackaddr =
|
if (!(pt->pt_attr.__sigaltstackaddr =
|
||||||
malloc(pt->pt_attr.__sigaltstacksize))) {
|
malloc(pt->pt_attr.__sigaltstacksize))) {
|
||||||
_pthread_free(pt);
|
_pthread_free(pt);
|
||||||
return errno;
|
return EAGAIN;
|
||||||
}
|
}
|
||||||
pt->pt_flags |= PT_OWNSIGALTSTACK;
|
pt->pt_flags |= PT_OWNSIGALTSTACK;
|
||||||
}
|
}
|
||||||
|
@ -282,35 +279,41 @@ static errno_t pthread_create_impl(pthread_t *thread,
|
||||||
memory_order_relaxed);
|
memory_order_relaxed);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
_pthread_free(pt);
|
// pthread_attr_setdetachstate() makes this impossible
|
||||||
return EINVAL;
|
__builtin_unreachable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if pthread_attr_setdetachstate() was used then it's possible for
|
||||||
|
// the `pt` object to be freed before this clone call has returned!
|
||||||
|
atomic_store_explicit(&pt->pt_refs, 1, memory_order_relaxed);
|
||||||
|
|
||||||
// add thread to global list
|
// add thread to global list
|
||||||
// we add it to the beginning since zombies go at the end
|
// we add it to the beginning since zombies go at the end
|
||||||
_pthread_lock();
|
_pthread_lock();
|
||||||
dll_make_first(&_pthread_list, &pt->list);
|
dll_make_first(&_pthread_list, &pt->list);
|
||||||
_pthread_unlock();
|
_pthread_unlock();
|
||||||
|
|
||||||
// if pthread_attr_setdetachstate() was used then it's possible for
|
// we don't normally do this, but it's important to write the result
|
||||||
// the `pt` object to be freed before this clone call has returned!
|
// memory before spawning the thread, so it's visible to the threads
|
||||||
_pthread_ref(pt);
|
*thread = (pthread_t)pt;
|
||||||
|
|
||||||
// launch PosixThread(pt) in new thread
|
// launch PosixThread(pt) in new thread
|
||||||
if ((rc = clone(
|
if ((err = clone(
|
||||||
PosixThread, pt->pt_attr.__stackaddr, pt->pt_attr.__stacksize,
|
PosixThread, pt->pt_attr.__stackaddr, pt->pt_attr.__stacksize,
|
||||||
CLONE_VM | CLONE_THREAD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND |
|
CLONE_VM | CLONE_THREAD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND |
|
||||||
CLONE_SYSVSEM | CLONE_SETTLS | CLONE_PARENT_SETTID |
|
CLONE_SYSVSEM | CLONE_SETTLS | CLONE_PARENT_SETTID |
|
||||||
CLONE_CHILD_SETTID | CLONE_CHILD_CLEARTID,
|
CLONE_CHILD_SETTID | CLONE_CHILD_CLEARTID,
|
||||||
pt, &pt->tib->tib_ptid, __adj_tls(pt->tib), &pt->tib->tib_ctid))) {
|
pt, &pt->tib->tib_ptid, __adj_tls(pt->tib), &pt->tib->tib_ctid))) {
|
||||||
|
*thread = 0; // posix doesn't require we do this
|
||||||
_pthread_lock();
|
_pthread_lock();
|
||||||
dll_remove(&_pthread_list, &pt->list);
|
dll_remove(&_pthread_list, &pt->list);
|
||||||
_pthread_unlock();
|
_pthread_unlock();
|
||||||
_pthread_free(pt);
|
_pthread_free(pt);
|
||||||
return rc;
|
if (err == ENOMEM)
|
||||||
|
err = EAGAIN;
|
||||||
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
*thread = (pthread_t)pt;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -359,7 +362,7 @@ static const char *DescribeHandle(char buf[12], errno_t err, pthread_t *th) {
|
||||||
* └──────────────┘
|
* └──────────────┘
|
||||||
*
|
*
|
||||||
* @param thread is used to output the thread id upon success, which
|
* @param thread is used to output the thread id upon success, which
|
||||||
* must be non-null
|
* must be non-null; upon failure, its value is undefined
|
||||||
* @param attr points to launch configuration, or may be null
|
* @param attr points to launch configuration, or may be null
|
||||||
* to use sensible defaults; it must be initialized using
|
* to use sensible defaults; it must be initialized using
|
||||||
* pthread_attr_init()
|
* pthread_attr_init()
|
||||||
|
@ -375,6 +378,7 @@ static const char *DescribeHandle(char buf[12], errno_t err, pthread_t *th) {
|
||||||
errno_t pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
errno_t pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
||||||
void *(*start_routine)(void *), void *arg) {
|
void *(*start_routine)(void *), void *arg) {
|
||||||
errno_t err;
|
errno_t err;
|
||||||
|
errno_t olderr = errno;
|
||||||
_pthread_decimate(kPosixThreadZombie);
|
_pthread_decimate(kPosixThreadZombie);
|
||||||
BLOCK_SIGNALS;
|
BLOCK_SIGNALS;
|
||||||
err = pthread_create_impl(thread, attr, start_routine, arg, _SigMask);
|
err = pthread_create_impl(thread, attr, start_routine, arg, _SigMask);
|
||||||
|
@ -382,7 +386,10 @@ errno_t pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
||||||
STRACE("pthread_create([%s], %p, %t, %p) → %s",
|
STRACE("pthread_create([%s], %p, %t, %p) → %s",
|
||||||
DescribeHandle(alloca(12), err, thread), attr, start_routine, arg,
|
DescribeHandle(alloca(12), err, thread), attr, start_routine, arg,
|
||||||
DescribeErrno(err));
|
DescribeErrno(err));
|
||||||
if (!err)
|
if (!err) {
|
||||||
_pthread_unref(*(struct PosixThread **)thread);
|
_pthread_unref(*(struct PosixThread **)thread);
|
||||||
|
} else {
|
||||||
|
errno = olderr;
|
||||||
|
}
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,11 +40,10 @@ void receiver_signal_handler(int signo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *sender_func(void *arg) {
|
void *sender_func(void *arg) {
|
||||||
|
|
||||||
for (int i = 0; i < ITERATIONS; i++) {
|
for (int i = 0; i < ITERATIONS; i++) {
|
||||||
|
|
||||||
// Wait a bit sometimes
|
// Wait a bit sometimes
|
||||||
if (rand() % 2 == 1) {
|
if (rand() % 2) {
|
||||||
volatile unsigned v = 0;
|
volatile unsigned v = 0;
|
||||||
for (;;)
|
for (;;)
|
||||||
if (++v == 4000)
|
if (++v == 4000)
|
||||||
|
@ -67,32 +66,25 @@ void *sender_func(void *arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *receiver_func(void *arg) {
|
void *receiver_func(void *arg) {
|
||||||
|
static int iteration = 0;
|
||||||
// Wait for asynchronous signals
|
do {
|
||||||
for (;;) {
|
// wait for signal handler to be called
|
||||||
if (atomic_exchange_explicit(&receiver_got_signal, 0,
|
if (atomic_exchange_explicit(&receiver_got_signal, 0,
|
||||||
memory_order_acq_rel)) {
|
memory_order_acq_rel)) {
|
||||||
|
|
||||||
|
// record received time
|
||||||
struct timespec receive_time;
|
struct timespec receive_time;
|
||||||
clock_gettime(CLOCK_MONOTONIC, &receive_time);
|
clock_gettime(CLOCK_MONOTONIC, &receive_time);
|
||||||
|
|
||||||
long sec_diff = receive_time.tv_sec - send_time.tv_sec;
|
long sec_diff = receive_time.tv_sec - send_time.tv_sec;
|
||||||
long nsec_diff = receive_time.tv_nsec - send_time.tv_nsec;
|
long nsec_diff = receive_time.tv_nsec - send_time.tv_nsec;
|
||||||
double latency_ns = sec_diff * 1e9 + nsec_diff;
|
double latency_ns = sec_diff * 1e9 + nsec_diff;
|
||||||
|
latencies[iteration++] = latency_ns;
|
||||||
|
|
||||||
static int iteration = 0;
|
// pong sender
|
||||||
if (iteration < ITERATIONS)
|
|
||||||
latencies[iteration++] = latency_ns;
|
|
||||||
|
|
||||||
// Pong sender
|
|
||||||
if (pthread_kill(sender_thread, SIGUSR2))
|
if (pthread_kill(sender_thread, SIGUSR2))
|
||||||
exit(2);
|
exit(2);
|
||||||
|
|
||||||
// Exit if done
|
|
||||||
if (iteration >= ITERATIONS)
|
|
||||||
pthread_exit(0);
|
|
||||||
}
|
}
|
||||||
}
|
} while (iteration < ITERATIONS);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,11 +100,7 @@ int compare(const void *a, const void *b) {
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
|
|
||||||
// TODO(jart): fix flakes
|
// install handlers
|
||||||
if (1)
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
// Install signal handlers
|
|
||||||
struct sigaction sa;
|
struct sigaction sa;
|
||||||
sa.sa_handler = receiver_signal_handler;
|
sa.sa_handler = receiver_signal_handler;
|
||||||
sa.sa_flags = 0;
|
sa.sa_flags = 0;
|
||||||
|
@ -121,27 +109,27 @@ int main() {
|
||||||
sa.sa_handler = sender_signal_handler;
|
sa.sa_handler = sender_signal_handler;
|
||||||
sigaction(SIGUSR2, &sa, 0);
|
sigaction(SIGUSR2, &sa, 0);
|
||||||
|
|
||||||
// Create receiver thread first
|
// create receiver thread first
|
||||||
if (pthread_create(&receiver_thread, 0, receiver_func, 0))
|
if (pthread_create(&receiver_thread, 0, receiver_func, 0))
|
||||||
exit(11);
|
exit(11);
|
||||||
|
|
||||||
// Create sender thread
|
// create sender thread
|
||||||
if (pthread_create(&sender_thread, 0, sender_func, 0))
|
if (pthread_create(&sender_thread, 0, sender_func, 0))
|
||||||
exit(12);
|
exit(12);
|
||||||
|
|
||||||
// Wait for threads to finish
|
// wait for threads to finish
|
||||||
if (pthread_join(sender_thread, 0))
|
if (pthread_join(sender_thread, 0))
|
||||||
exit(13);
|
exit(13);
|
||||||
if (pthread_join(receiver_thread, 0))
|
if (pthread_join(receiver_thread, 0))
|
||||||
exit(14);
|
exit(14);
|
||||||
|
|
||||||
// Compute mean latency
|
// compute mean latency
|
||||||
double total_latency = 0;
|
double total_latency = 0;
|
||||||
for (int i = 0; i < ITERATIONS; i++)
|
for (int i = 0; i < ITERATIONS; i++)
|
||||||
total_latency += latencies[i];
|
total_latency += latencies[i];
|
||||||
double mean_latency = total_latency / ITERATIONS;
|
double mean_latency = total_latency / ITERATIONS;
|
||||||
|
|
||||||
// Sort latencies to compute percentiles
|
// sort latencies to compute percentiles
|
||||||
qsort(latencies, ITERATIONS, sizeof(double), compare);
|
qsort(latencies, ITERATIONS, sizeof(double), compare);
|
||||||
|
|
||||||
double p50 = latencies[(int)(0.50 * ITERATIONS)];
|
double p50 = latencies[(int)(0.50 * ITERATIONS)];
|
||||||
|
|
|
@ -6,17 +6,20 @@ import concurrent.futures
|
||||||
from collections import Counter
|
from collections import Counter
|
||||||
from typing import List, Dict, Tuple
|
from typing import List, Dict, Tuple
|
||||||
|
|
||||||
NUM_PARALLEL = int(os.cpu_count() * 1.5)
|
NUM_PARALLEL = int(os.cpu_count() * 20)
|
||||||
|
|
||||||
def find_test_files(root_dir: str) -> List[str]:
|
def find_test_files(root: str) -> List[str]:
|
||||||
"""Find all executable files ending with _test recursively."""
|
"""Find all executable files ending with _test recursively."""
|
||||||
test_files = []
|
test_files = []
|
||||||
for root, _, files in os.walk(root_dir):
|
if os.path.isdir(root):
|
||||||
for file in files:
|
for root, _, files in os.walk(root):
|
||||||
if file.endswith('_test'):
|
for file in files:
|
||||||
file_path = os.path.join(root, file)
|
if file.endswith('_test'):
|
||||||
if os.access(file_path, os.X_OK):
|
file_path = os.path.join(root, file)
|
||||||
test_files.append(file_path)
|
if os.access(file_path, os.X_OK):
|
||||||
|
test_files.append(file_path)
|
||||||
|
elif root.endswith('_test'):
|
||||||
|
test_files.append(root)
|
||||||
return test_files
|
return test_files
|
||||||
|
|
||||||
def run_single_test(test_path: str) -> int:
|
def run_single_test(test_path: str) -> int:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue