From c9f8973de78cfda9e909abe6993975b820396bf9 Mon Sep 17 00:00:00 2001 From: Lemaitre Date: Sun, 10 Oct 2021 12:06:18 +0200 Subject: [PATCH] Exponential back-off --- examples/thread.c | 4 +- libc/thread/nativesem.c | 84 +++++++++++++++++++++-------------------- libc/thread/nativesem.h | 2 +- 3 files changed, 47 insertions(+), 43 deletions(-) diff --git a/examples/thread.c b/examples/thread.c index 06ae57914..d156f89b6 100644 --- a/examples/thread.c +++ b/examples/thread.c @@ -35,7 +35,7 @@ int main() { cthread_t thread; int rc = cthread_create(&thread, NULL, &worker, NULL); if (rc == 0) { - cthread_native_sem_wait(&semaphore, 0, 0, NULL); + cthread_native_sem_wait(&semaphore, 0, NULL); //printf("thread created: %p\n", thread); sleep(1); #if 1 @@ -45,7 +45,7 @@ int main() { sleep(2); #endif cthread_native_sem_signal(&semaphore); - cthread_native_sem_wait(&semaphore, 0, 0, NULL); + cthread_native_sem_wait(&semaphore, 0, NULL); //printf("thread joined: %p -> %d\n", thread, rc); } else { printf("ERROR: thread could not be started: %d\n", rc); diff --git a/libc/thread/nativesem.c b/libc/thread/nativesem.c index 2e7eccafb..f260e177e 100644 --- a/libc/thread/nativesem.c +++ b/libc/thread/nativesem.c @@ -24,6 +24,16 @@ #define CTHREAD_THREAD_VAL_BITS 32 +static void pause(int attempt) { + if (attempt < 16) { + for (int i = 0; i < (1 << attempt); ++i) { + asm("pause"); + } + } else { + cthread_yield(); + } +} + int cthread_native_sem_init(cthread_native_sem_t* sem, int count) { sem->linux.count = count; return 0; @@ -54,8 +64,8 @@ int cthread_native_sem_signal(cthread_native_sem_t* sem) { return 0; } -int cthread_native_sem_wait_slow(cthread_native_sem_t* sem, - const struct timespec* timeout) { +int cthread_native_sem_wait_futex(cthread_native_sem_t* sem, + const struct timespec* timeout) { uint64_t count; // record current thread as waiter @@ -66,12 +76,15 @@ int cthread_native_sem_wait_slow(cthread_native_sem_t* sem, for (;;) { // try to acquire the semaphore, as well as remove itself from waiters - if ((uint32_t)count > 0 && - atomic_compare_exchange_weak( - &sem->linux.count, count, - count - 1 - ((uint64_t)1 << CTHREAD_THREAD_VAL_BITS))) - break; - + while ((uint32_t)count > 0) { + // without spin, we could miss a futex wake + if (atomic_compare_exchange_weak( + &sem->linux.count, count, + count - 1 - ((uint64_t)1 << CTHREAD_THREAD_VAL_BITS))) { + return 0; + } + } + int flags = FUTEX_WAIT; register struct timespec* timeout_ asm("r10") = timeout; @@ -88,45 +101,36 @@ int cthread_native_sem_wait_slow(cthread_native_sem_t* sem, return 0; } -int cthread_native_sem_wait_spin_yield(cthread_native_sem_t* sem, - uint64_t count, int yield, - const struct timespec* timeout) { - // spin on yield - while (yield-- > 0) { - if ((count >> CTHREAD_THREAD_VAL_BITS) != 0) - break; // a thread is already waiting in queue - if ((uint32_t)count > 0 && - atomic_compare_exchange_weak(&sem->linux.count, count, count - 1)) - return 0; - cthread_yield(); - } - - return cthread_native_sem_wait_slow(sem, timeout); -} - int cthread_native_sem_wait_spin(cthread_native_sem_t* sem, uint64_t count, - int spin, int yield, - const struct timespec* timeout) { + int spin, const struct timespec* timeout) { // spin on pause - while (spin-- > 0) { - if ((count >> CTHREAD_THREAD_VAL_BITS) != 0) break; - if ((uint32_t)count > 0 && - atomic_compare_exchange_weak(&sem->linux.count, count, count - 1)) - return 0; - asm volatile("pause"); + for (int attempt = 0; attempt < spin; ++attempt) { + //if ((count >> CTHREAD_THREAD_VAL_BITS) != 0) break; + while ((uint32_t)count > 0) { + // spin is useful if multiple waiters can acquire the semaphore at the same time + if (atomic_compare_exchange_weak( + &sem->linux.count, count, count - 1)) { + return 0; + } + } + pause(attempt); } - - return cthread_native_sem_wait_spin_yield(sem, count, yield, timeout); + + return cthread_native_sem_wait_futex(sem, timeout); } -int cthread_native_sem_wait(cthread_native_sem_t* sem, int spin, int yield, +int cthread_native_sem_wait(cthread_native_sem_t* sem, int spin, const struct timespec* timeout) { uint64_t count = atomic_load(&sem->linux.count); // uncontended - if ((count >> 32) == 0 && (uint32_t)count > 0 && - atomic_compare_exchange_weak(&sem->linux.count, count, count - 1)) - return 0; - - return cthread_native_sem_wait_spin(sem, count, spin, yield, timeout); + while ((uint32_t)count > 0) { + // spin is useful if multiple waiters can acquire the semaphore at the same time + if (atomic_compare_exchange_weak( + &sem->linux.count, count, count - 1)) { + return 0; + } + } + + return cthread_native_sem_wait_spin(sem, count, spin, timeout); } diff --git a/libc/thread/nativesem.h b/libc/thread/nativesem.h index df1b8583c..aab938b2a 100644 --- a/libc/thread/nativesem.h +++ b/libc/thread/nativesem.h @@ -18,7 +18,7 @@ struct timespec; int cthread_native_sem_init(cthread_native_sem_t*, int); int cthread_native_sem_destroy(cthread_native_sem_t*); -int cthread_native_sem_wait(cthread_native_sem_t*, int, int, const struct timespec*); +int cthread_native_sem_wait(cthread_native_sem_t*, int, const struct timespec*); int cthread_native_sem_signal(cthread_native_sem_t*);