mirror of
https://github.com/jart/cosmopolitan.git
synced 2025-08-06 09:50:28 +00:00
Exponential back-off
This commit is contained in:
parent
67b5200a0b
commit
c9f8973de7
3 changed files with 47 additions and 43 deletions
|
@ -35,7 +35,7 @@ int main() {
|
|||
cthread_t thread;
|
||||
int rc = cthread_create(&thread, NULL, &worker, NULL);
|
||||
if (rc == 0) {
|
||||
cthread_native_sem_wait(&semaphore, 0, 0, NULL);
|
||||
cthread_native_sem_wait(&semaphore, 0, NULL);
|
||||
//printf("thread created: %p\n", thread);
|
||||
sleep(1);
|
||||
#if 1
|
||||
|
@ -45,7 +45,7 @@ int main() {
|
|||
sleep(2);
|
||||
#endif
|
||||
cthread_native_sem_signal(&semaphore);
|
||||
cthread_native_sem_wait(&semaphore, 0, 0, NULL);
|
||||
cthread_native_sem_wait(&semaphore, 0, NULL);
|
||||
//printf("thread joined: %p -> %d\n", thread, rc);
|
||||
} else {
|
||||
printf("ERROR: thread could not be started: %d\n", rc);
|
||||
|
|
|
@ -24,6 +24,16 @@
|
|||
|
||||
#define CTHREAD_THREAD_VAL_BITS 32
|
||||
|
||||
static void pause(int attempt) {
|
||||
if (attempt < 16) {
|
||||
for (int i = 0; i < (1 << attempt); ++i) {
|
||||
asm("pause");
|
||||
}
|
||||
} else {
|
||||
cthread_yield();
|
||||
}
|
||||
}
|
||||
|
||||
int cthread_native_sem_init(cthread_native_sem_t* sem, int count) {
|
||||
sem->linux.count = count;
|
||||
return 0;
|
||||
|
@ -54,8 +64,8 @@ int cthread_native_sem_signal(cthread_native_sem_t* sem) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int cthread_native_sem_wait_slow(cthread_native_sem_t* sem,
|
||||
const struct timespec* timeout) {
|
||||
int cthread_native_sem_wait_futex(cthread_native_sem_t* sem,
|
||||
const struct timespec* timeout) {
|
||||
uint64_t count;
|
||||
|
||||
// record current thread as waiter
|
||||
|
@ -66,12 +76,15 @@ int cthread_native_sem_wait_slow(cthread_native_sem_t* sem,
|
|||
|
||||
for (;;) {
|
||||
// try to acquire the semaphore, as well as remove itself from waiters
|
||||
if ((uint32_t)count > 0 &&
|
||||
atomic_compare_exchange_weak(
|
||||
&sem->linux.count, count,
|
||||
count - 1 - ((uint64_t)1 << CTHREAD_THREAD_VAL_BITS)))
|
||||
break;
|
||||
|
||||
while ((uint32_t)count > 0) {
|
||||
// without spin, we could miss a futex wake
|
||||
if (atomic_compare_exchange_weak(
|
||||
&sem->linux.count, count,
|
||||
count - 1 - ((uint64_t)1 << CTHREAD_THREAD_VAL_BITS))) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int flags = FUTEX_WAIT;
|
||||
register struct timespec* timeout_ asm("r10") = timeout;
|
||||
|
||||
|
@ -88,45 +101,36 @@ int cthread_native_sem_wait_slow(cthread_native_sem_t* sem,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int cthread_native_sem_wait_spin_yield(cthread_native_sem_t* sem,
|
||||
uint64_t count, int yield,
|
||||
const struct timespec* timeout) {
|
||||
// spin on yield
|
||||
while (yield-- > 0) {
|
||||
if ((count >> CTHREAD_THREAD_VAL_BITS) != 0)
|
||||
break; // a thread is already waiting in queue
|
||||
if ((uint32_t)count > 0 &&
|
||||
atomic_compare_exchange_weak(&sem->linux.count, count, count - 1))
|
||||
return 0;
|
||||
cthread_yield();
|
||||
}
|
||||
|
||||
return cthread_native_sem_wait_slow(sem, timeout);
|
||||
}
|
||||
|
||||
int cthread_native_sem_wait_spin(cthread_native_sem_t* sem, uint64_t count,
|
||||
int spin, int yield,
|
||||
const struct timespec* timeout) {
|
||||
int spin, const struct timespec* timeout) {
|
||||
// spin on pause
|
||||
while (spin-- > 0) {
|
||||
if ((count >> CTHREAD_THREAD_VAL_BITS) != 0) break;
|
||||
if ((uint32_t)count > 0 &&
|
||||
atomic_compare_exchange_weak(&sem->linux.count, count, count - 1))
|
||||
return 0;
|
||||
asm volatile("pause");
|
||||
for (int attempt = 0; attempt < spin; ++attempt) {
|
||||
//if ((count >> CTHREAD_THREAD_VAL_BITS) != 0) break;
|
||||
while ((uint32_t)count > 0) {
|
||||
// spin is useful if multiple waiters can acquire the semaphore at the same time
|
||||
if (atomic_compare_exchange_weak(
|
||||
&sem->linux.count, count, count - 1)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
pause(attempt);
|
||||
}
|
||||
|
||||
return cthread_native_sem_wait_spin_yield(sem, count, yield, timeout);
|
||||
|
||||
return cthread_native_sem_wait_futex(sem, timeout);
|
||||
}
|
||||
|
||||
int cthread_native_sem_wait(cthread_native_sem_t* sem, int spin, int yield,
|
||||
int cthread_native_sem_wait(cthread_native_sem_t* sem, int spin,
|
||||
const struct timespec* timeout) {
|
||||
uint64_t count = atomic_load(&sem->linux.count);
|
||||
|
||||
// uncontended
|
||||
if ((count >> 32) == 0 && (uint32_t)count > 0 &&
|
||||
atomic_compare_exchange_weak(&sem->linux.count, count, count - 1))
|
||||
return 0;
|
||||
|
||||
return cthread_native_sem_wait_spin(sem, count, spin, yield, timeout);
|
||||
while ((uint32_t)count > 0) {
|
||||
// spin is useful if multiple waiters can acquire the semaphore at the same time
|
||||
if (atomic_compare_exchange_weak(
|
||||
&sem->linux.count, count, count - 1)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
return cthread_native_sem_wait_spin(sem, count, spin, timeout);
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ struct timespec;
|
|||
int cthread_native_sem_init(cthread_native_sem_t*, int);
|
||||
int cthread_native_sem_destroy(cthread_native_sem_t*);
|
||||
|
||||
int cthread_native_sem_wait(cthread_native_sem_t*, int, int, const struct timespec*);
|
||||
int cthread_native_sem_wait(cthread_native_sem_t*, int, const struct timespec*);
|
||||
int cthread_native_sem_signal(cthread_native_sem_t*);
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue