Use dynamic memory for *NSYNC waiters

This commit is contained in:
Justine Tunney 2023-11-10 01:42:06 -08:00
parent 15af5c2d7e
commit 241f949540
No known key found for this signature in database
GPG key ID: BE714B4575D6E328
12 changed files with 171 additions and 72 deletions

View file

@ -28,11 +28,11 @@ COSMOPOLITAN_C_START_
/* this header is included by 700+ files; therefore we */
/* hand-roll &__get_tls()->tib_errno to avoid #include */
/* cosmopolitan uses x28 as the tls register b/c apple */
#define errno \
(*({ \
errno_t *__ep; \
asm("sub\t%0,x28,#68" : "=r"(__ep)); \
__ep; \
#define errno \
(*({ \
errno_t *__ep; \
asm("sub\t%0,x28,#192-0x3c" : "=r"(__ep)); \
__ep; \
}))
#else
#define errno (*__errno_location())

View file

@ -67,7 +67,7 @@ __gc: .ftrace2
// if this code fails
// check if CosmoTib's size changed
sub x8,x28,#128 // __get_tls()
sub x8,x28,#192 // __get_tls()
ldr x9,[x8,0x18] // tib::garbages
ldr x10,[x9] // g->i
ldr x8,[x9,8] // g->p

View file

@ -112,7 +112,7 @@ vfork:
// } else {
// __get_tls()->tib_flags &= ~TIB_FLAG_VFORKED;
// }
sub x1,x28,#0x80 // RELIES ON TLS TIB ABI!
sub x1,x28,#192 // sizeof(CosmoTib)
ldr x2,[x1,64]
cbnz x0,2f
orr x2,x2,#TIB_FLAG_VFORKED

View file

@ -32,6 +32,7 @@
#include "libc/thread/thread.h"
#include "libc/thread/tls.h"
#include "third_party/nsync/futex.internal.h"
#include "third_party/nsync/wait_s.internal.h"
void _pthread_unwind(struct PosixThread *pt) {
struct _pthread_cleanup_buffer *cb;
@ -115,6 +116,9 @@ wontreturn void pthread_exit(void *rc) {
_weaken(__cxa_thread_finalize)();
}
_pthread_unkey(tib);
if (tib->tib_nsync) {
nsync_waiter_destroy(tib->tib_nsync);
}
_pthread_ungarbage();
_pthread_decimate();

View file

@ -37,6 +37,8 @@ struct CosmoTib {
uint32_t tib_sigstack_size;
uint32_t tib_sigstack_flags;
void **tib_keys;
void *tib_nsync;
void *tib_todo[63];
};
extern int __threaded;

View file

@ -15,7 +15,13 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "libc/calls/calls.h"
#include "libc/calls/syscall-sysv.internal.h"
#include "libc/dce.h"
#include "libc/intrin/dll.h"
#include "libc/nt/memory.h"
#include "libc/sysv/consts/map.h"
#include "libc/sysv/consts/prot.h"
#include "libc/thread/tls.h"
#include "third_party/nsync/atomic.h"
#include "third_party/nsync/atomic.internal.h"
@ -143,25 +149,112 @@ waiter *nsync_dll_waiter_samecond_ (struct Dll *e) {
/* -------------------------------- */
/* Initializes waiter struct. */
void nsync_waiter_init_ (waiter *w) {
w->tag = WAITER_TAG;
w->nw.tag = NSYNC_WAITER_TAG;
nsync_mu_semaphore_init (&w->sem);
w->nw.sem = &w->sem;
dll_init (&w->nw.q);
NSYNC_ATOMIC_UINT32_STORE_ (&w->nw.waiting, 0);
w->nw.flags = NSYNC_WAITER_FLAG_MUCV;
ATM_STORE (&w->remove_count, 0);
dll_init (&w->same_condition);
w->flags = WAITER_IN_USE;
/* Size in bytes of each anonymous mapping that nsync_malloc carves up;
   also the largest single request nsync_malloc accepts. */
#define kMallocBlockSize 16384
/* State of the bump allocator used by nsync_malloc on non-Windows systems. */
static struct {
nsync_atomic_uint32_ mu; /* spinlock; held while used and block are read or updated */
size_t used; /* offset into block where the next allocation will start */
char *block; /* current mapping; replaced when a request no longer fits */
} malloc;
/* Allocates size bytes for internal use; size must not exceed
   kMallocBlockSize.  On Windows the request is served by the process
   heap.  Elsewhere, memory is carved sequentially out of an anonymous
   private mapping of kMallocBlockSize bytes, and a new mapping is
   obtained whenever the current one cannot hold the request.  Calls
   nsync_panic_ with "out of memory" if the underlying allocation fails. */
static void *nsync_malloc (size_t size) {
	void *ptr;
	ASSERT (size <= kMallocBlockSize);

	/* Windows: one heap allocation per request. */
	if (IsWindows ()) {
		ptr = HeapAlloc (GetProcessHeap (), 0, size);
		if (!ptr) {
			nsync_panic_ ("out of memory\n");
		}
		return ptr;
	}

	/* Everything below runs with the allocator spinlock held. */
	nsync_spin_test_and_set_ (&malloc.mu, 1, 1, 0);
	if (!(malloc.block && malloc.used + size <= kMallocBlockSize)) {
		/* No block yet, or the request does not fit: start a new one. */
		malloc.used = 0;
		malloc.block = __sys_mmap (0, kMallocBlockSize, PROT_READ | PROT_WRITE,
		                           MAP_PRIVATE | MAP_ANONYMOUS, -1, 0, 0);
		if (malloc.block == MAP_FAILED) {
			nsync_panic_ ("out of memory\n");
		}
	}
	ptr = malloc.block + malloc.used;
	/* Advance past this allocation, rounding the offset up to a multiple of 16. */
	malloc.used = (malloc.used + size + 15) & ~(size_t) 15;
	ATM_STORE_REL (&malloc.mu, 0); /* release the spinlock */
	return ptr;
}
/* Destroys waiter struct. */
void nsync_waiter_destroy_ (waiter *w) {
if (w->tag) {
nsync_mu_semaphore_destroy (&w->sem);
w->tag = 0;
/* -------------------------------- */
static struct Dll *free_waiters = NULL;
/* free_waiters points to a doubly-linked list of free waiter structs. */
static nsync_atomic_uint32_ free_waiters_mu; /* spinlock; protects free_waiters */
/* The calling thread's cached waiter, kept in the tib_nsync slot of its
   thread information block as returned by __get_tls(). */
#define waiter_for_thread __get_tls()->tib_nsync
/* Releases a thread's reserved waiter: clears the thread's cached pointer,
   drops the WAITER_RESERVED flag and pushes the waiter onto free_waiters.
   v is the waiter pointer taken from tib_nsync; it is dereferenced without
   a NULL check, so callers must pass a valid waiter (pthread_exit checks
   tib_nsync before calling). */
void nsync_waiter_destroy (void *v) {
waiter *w = (waiter *) v;
/* Reset waiter_for_thread in case the destructor of another thread-local
   variable reuses the waiter after another thread has already taken it
   from free_waiters. This can happen because the destruction order of
   thread-local variables can be arbitrary on some platforms, e.g.
   POSIX. */
waiter_for_thread = NULL;
IGNORE_RACES_START ();
/* The waiter must be reserved for a thread and must not currently be in use. */
ASSERT ((w->flags & (WAITER_RESERVED|WAITER_IN_USE)) == WAITER_RESERVED);
w->flags &= ~WAITER_RESERVED;
/* Put the waiter at the head of the free list while holding its spinlock. */
nsync_spin_test_and_set_ (&free_waiters_mu, 1, 1, 0);
dll_make_first (&free_waiters, &w->nw.q);
ATM_STORE_REL (&free_waiters_mu, 0); /* release store */
IGNORE_RACES_END ();
}
/* Return a pointer to an unused waiter struct, marked WAITER_IN_USE.
   The calling thread's cached waiter (waiter_for_thread) is returned when
   it is reserved and idle.  Otherwise a waiter is taken from free_waiters,
   or allocated and initialized if that list is empty; if the thread had no
   cached waiter, the new one is marked WAITER_RESERVED and becomes the
   thread's cached waiter. */
waiter *nsync_waiter_new_ (void) {
	waiter *cached = waiter_for_thread;
	waiter *fresh = NULL;
	struct Dll *head;

	/* Fast path: the thread's reserved waiter exists and is not in use. */
	if (cached != NULL &&
	    (cached->flags & (WAITER_RESERVED|WAITER_IN_USE)) == WAITER_RESERVED) {
		cached->flags |= WAITER_IN_USE;
		return (cached);
	}

	/* Try to recycle a waiter from the shared free list. */
	nsync_spin_test_and_set_ (&free_waiters_mu, 1, 1, 0);
	head = dll_first (free_waiters);
	if (head != NULL) { /* Free list is non-empty: dequeue its first item. */
		dll_remove (&free_waiters, head);
		fresh = DLL_WAITER (head);
	}
	ATM_STORE_REL (&free_waiters_mu, 0); /* release store */

	/* Free list was empty: allocate and initialize a new waiter. */
	if (fresh == NULL) {
		fresh = (waiter *) nsync_malloc (sizeof (*fresh));
		fresh->tag = WAITER_TAG;
		fresh->nw.tag = NSYNC_WAITER_TAG;
		nsync_mu_semaphore_init (&fresh->sem);
		fresh->nw.sem = &fresh->sem;
		dll_init (&fresh->nw.q);
		NSYNC_ATOMIC_UINT32_STORE_ (&fresh->nw.waiting, 0);
		fresh->nw.flags = NSYNC_WAITER_FLAG_MUCV;
		ATM_STORE (&fresh->remove_count, 0);
		dll_init (&fresh->same_condition);
		fresh->flags = 0;
	}

	/* A thread with no cached waiter adopts this one as its reserved waiter. */
	if (cached == NULL) {
		fresh->flags |= WAITER_RESERVED;
		waiter_for_thread = fresh;
	}

	fresh->flags |= WAITER_IN_USE;
	return (fresh);
}
/* Release waiter *w, which must currently be marked WAITER_IN_USE.
   The in-use mark is cleared.  A waiter that carries WAITER_RESERVED is
   left alone otherwise; any other waiter is pushed onto free_waiters. */
void nsync_waiter_free_ (waiter *w) {
	ASSERT ((w->flags & WAITER_IN_USE) != 0);
	w->flags &= ~WAITER_IN_USE;

	/* Reserved waiters are not placed on the free list. */
	if ((w->flags & WAITER_RESERVED) != 0) {
		return;
	}

	/* Push onto the head of the free list while holding its spinlock. */
	nsync_spin_test_and_set_ (&free_waiters_mu, 1, 1, 0);
	dll_make_first (&free_waiters, &w->nw.q);
	ATM_STORE_REL (&free_waiters_mu, 0); /* release store */
}

View file

@ -25,9 +25,6 @@ void nsync_yield_(void);
/* Retrieve the per-thread cache of the waiter object. Platform specific. */
void *nsync_per_thread_waiter_(void (*dest)(void *));
/* Set the per-thread cache of the waiter object. Platform specific. */
void nsync_set_per_thread_waiter_(void *v, void (*dest)(void *));
/* Used in spinloops to delay resumption of the loop.
Usage:
unsigned attempts = 0;
@ -244,8 +241,12 @@ waiter *nsync_dll_waiter_(struct Dll *e);
: DLL_CONTAINER(struct waiter_s, same_condition, e))
waiter *nsync_dll_waiter_samecond_(struct Dll *e);
void nsync_waiter_init_(waiter *);
void nsync_waiter_destroy_(waiter *);
/* Return a pointer to an unused waiter struct.
Ensures that the enclosed timer is stopped and its channel drained. */
waiter *nsync_waiter_new_(void);
/* Return an unused waiter struct *w to the free pool. */
void nsync_waiter_free_(waiter *w);
/* ---------- */

View file

@ -182,7 +182,7 @@ struct nsync_cv_wait_with_deadline_s {
nsync_mu *cv_mu;
nsync_time abs_deadline;
nsync_note cancel_note;
waiter w;
waiter *w;
};
/* Wait until awoken or timeout, or back out of wait if the thread is being cancelled. */
@ -190,11 +190,11 @@ static int nsync_cv_wait_with_deadline_impl_ (struct nsync_cv_wait_with_deadline
int outcome = 0;
int attempts = 0;
IGNORE_RACES_START ();
while (ATM_LOAD_ACQ (&c->w.nw.waiting) != 0) { /* acquire load */
while (ATM_LOAD_ACQ (&c->w->nw.waiting) != 0) { /* acquire load */
if (c->sem_outcome == 0) {
c->sem_outcome = nsync_sem_wait_with_cancel_ (&c->w, c->abs_deadline, c->cancel_note);
c->sem_outcome = nsync_sem_wait_with_cancel_ (c->w, c->abs_deadline, c->cancel_note);
}
if (c->sem_outcome != 0 && ATM_LOAD (&c->w.nw.waiting) != 0) {
if (c->sem_outcome != 0 && ATM_LOAD (&c->w->nw.waiting) != 0) {
/* A timeout or cancellation occurred, and no wakeup.
Acquire *pcv's spinlock, and confirm. */
c->old_word = nsync_spin_test_and_set_ (&c->pcv->word, CV_SPINLOCK,
@ -204,28 +204,28 @@ static int nsync_cv_wait_with_deadline_impl_ (struct nsync_cv_wait_with_deadline
The test of remove_count confirms that the waiter *w
is still governed by *pcv's spinlock; otherwise, some
other thread is about to set w.waiting==0. */
if (ATM_LOAD (&c->w.nw.waiting) != 0) {
if (c->remove_count == ATM_LOAD (&c->w.remove_count)) {
if (ATM_LOAD (&c->w->nw.waiting) != 0) {
if (c->remove_count == ATM_LOAD (&c->w->remove_count)) {
uint32_t old_value;
/* still in cv waiter queue */
/* Not woken, so remove *w from cv
queue, and declare a
timeout/cancellation. */
outcome = c->sem_outcome;
dll_remove (&c->pcv->waiters, &c->w.nw.q);
dll_remove (&c->pcv->waiters, &c->w->nw.q);
do {
old_value = ATM_LOAD (&c->w.remove_count);
} while (!ATM_CAS (&c->w.remove_count, old_value, old_value+1));
old_value = ATM_LOAD (&c->w->remove_count);
} while (!ATM_CAS (&c->w->remove_count, old_value, old_value+1));
if (dll_is_empty (c->pcv->waiters)) {
c->old_word &= ~(CV_NON_EMPTY);
}
ATM_STORE_REL (&c->w.nw.waiting, 0); /* release store */
ATM_STORE_REL (&c->w->nw.waiting, 0); /* release store */
}
}
/* Release spinlock. */
ATM_STORE_REL (&c->pcv->word, c->old_word); /* release store */
}
if (ATM_LOAD (&c->w.nw.waiting) != 0) {
if (ATM_LOAD (&c->w->nw.waiting) != 0) {
/* The delay here causes this thread ultimately to
yield to another that has dequeued this thread, but
has not yet set the waiting field to zero; a
@ -234,10 +234,10 @@ static int nsync_cv_wait_with_deadline_impl_ (struct nsync_cv_wait_with_deadline
attempts = nsync_spin_delay_ (attempts);
}
}
if (c->cv_mu != NULL && c->w.cv_mu == NULL) { /* waiter was moved to *pmu's queue, and woken. */
if (c->cv_mu != NULL && c->w->cv_mu == NULL) { /* waiter was moved to *pmu's queue, and woken. */
/* Requeue on *pmu using existing waiter struct; current thread
is the designated waker. */
nsync_mu_lock_slow_ (c->cv_mu, &c->w, MU_DESIG_WAKER, c->w.l_type);
nsync_mu_lock_slow_ (c->cv_mu, c->w, MU_DESIG_WAKER, c->w->l_type);
} else {
/* Traditional case: We've woken from the cv, and need to reacquire *pmu. */
if (c->is_reader_mu) {
@ -246,7 +246,7 @@ static int nsync_cv_wait_with_deadline_impl_ (struct nsync_cv_wait_with_deadline
(*c->lock) (c->pmu);
}
}
nsync_waiter_destroy_ (&c->w);
nsync_waiter_free_ (c->w);
IGNORE_RACES_END ();
return (outcome);
}
@ -288,7 +288,7 @@ int nsync_cv_wait_with_deadline_generic (nsync_cv *pcv, void *pmu,
struct nsync_cv_wait_with_deadline_s c;
IGNORE_RACES_START ();
nsync_waiter_init_ (&c.w);
c.w = nsync_waiter_new_ ();
c.abs_deadline = abs_deadline;
c.cancel_note = cancel_note;
c.cv_mu = NULL;
@ -296,19 +296,19 @@ int nsync_cv_wait_with_deadline_generic (nsync_cv *pcv, void *pmu,
c.pcv = pcv;
c.pmu = pmu;
ATM_STORE (&c.w.nw.waiting, 1);
c.w.cond.f = NULL; /* Not using a conditional critical section. */
c.w.cond.v = NULL;
c.w.cond.eq = NULL;
ATM_STORE (&c.w->nw.waiting, 1);
c.w->cond.f = NULL; /* Not using a conditional critical section. */
c.w->cond.v = NULL;
c.w->cond.eq = NULL;
if (lock == &void_mu_lock ||
lock == (void (*) (void *)) &nsync_mu_lock ||
lock == (void (*) (void *)) &nsync_mu_rlock) {
c.cv_mu = (nsync_mu *) pmu;
}
c.w.cv_mu = c.cv_mu; /* If *pmu is an nsync_mu, record its address, else record NULL. */
c.w->cv_mu = c.cv_mu; /* If *pmu is an nsync_mu, record its address, else record NULL. */
c.is_reader_mu = 0; /* If true, an nsync_mu in reader mode. */
if (c.cv_mu == NULL) {
c.w.l_type = NULL;
c.w->l_type = NULL;
} else {
uint32_t old_mu_word = ATM_LOAD (&c.cv_mu->word);
int is_writer = (old_mu_word & MU_WHELD_IF_NON_ZERO) != 0;
@ -318,9 +318,9 @@ int nsync_cv_wait_with_deadline_generic (nsync_cv *pcv, void *pmu,
nsync_panic_ ("mu held in reader and writer mode simultaneously "
"on entry to nsync_cv_wait_with_deadline()\n");
}
c.w.l_type = nsync_writer_type_;
c.w->l_type = nsync_writer_type_;
} else if (is_reader) {
c.w.l_type = nsync_reader_type_;
c.w->l_type = nsync_reader_type_;
c.is_reader_mu = 1;
} else {
nsync_panic_ ("mu not held on entry to nsync_cv_wait_with_deadline()\n");
@ -329,8 +329,8 @@ int nsync_cv_wait_with_deadline_generic (nsync_cv *pcv, void *pmu,
/* acquire spinlock, set non-empty */
c.old_word = nsync_spin_test_and_set_ (&pcv->word, CV_SPINLOCK, CV_SPINLOCK|CV_NON_EMPTY, 0);
dll_make_last (&pcv->waiters, &c.w.nw.q);
c.remove_count = ATM_LOAD (&c.w.remove_count);
dll_make_last (&pcv->waiters, &c.w->nw.q);
c.remove_count = ATM_LOAD (&c.w->remove_count);
/* Release the spin lock. */
ATM_STORE_REL (&pcv->word, c.old_word|CV_NON_EMPTY); /* release store */

View file

@ -149,7 +149,7 @@ int nsync_mu_wait_with_deadline (nsync_mu *mu,
lock_type *l_type;
int first_wait;
int condition_is_true;
waiter w[1];
waiter *w;
int outcome;
/* Work out in which mode the lock is held. */
uint32_t old_word;
@ -165,12 +165,12 @@ int nsync_mu_wait_with_deadline (nsync_mu *mu,
l_type = nsync_reader_type_;
}
w->tag = 0; /* avoid allocating system resources */
first_wait = 1; /* first time through the loop below. */
condition_is_true = (condition == NULL || (*condition) (condition_arg));
/* Loop until either the condition becomes true, or "outcome" indicates
cancellation or timeout. */
w = NULL;
outcome = 0;
while (outcome == 0 && !condition_is_true) {
uint32_t has_condition;
@ -180,10 +180,8 @@ int nsync_mu_wait_with_deadline (nsync_mu *mu,
int sem_outcome;
unsigned attempts;
int have_lock;
/* initialize the waiter if we haven't already */
if (!w->tag) {
nsync_waiter_init_ (w);
if (w == NULL) {
w = nsync_waiter_new_ (); /* get a waiter struct if we need one. */
}
/* Prepare to wait. */
@ -261,7 +259,9 @@ int nsync_mu_wait_with_deadline (nsync_mu *mu,
}
condition_is_true = (condition == NULL || (*condition) (condition_arg));
}
nsync_waiter_destroy_ (w);
if (w != NULL) {
nsync_waiter_free_ (w); /* free waiter if we allocated one. */
}
if (condition_is_true) {
outcome = 0; /* condition is true trumps other outcomes. */
}

View file

@ -49,8 +49,7 @@ int nsync_wait_n (void *mu, void (*lock) (void *), void (*unlock) (void *),
int unlocked = 0;
int j;
int enqueued = 1;
waiter w[1];
nsync_waiter_init_ (w);
waiter *w = nsync_waiter_new_ ();
struct nsync_waiter_s nw_set[4];
struct nsync_waiter_s *nw = nw_set;
if (count > (int) (sizeof (nw_set) / sizeof (nw_set[0]))) {
@ -97,10 +96,10 @@ int nsync_wait_n (void *mu, void (*lock) (void *), void (*unlock) (void *),
}
}
nsync_waiter_destroy_ (w);
if (nw != nw_set) {
free (nw);
}
nsync_waiter_free_ (w);
if (unlocked) {
(*lock) (mu);
}

View file

@ -156,10 +156,9 @@ void nsync_mu_lock (nsync_mu *mu) {
if ((old_word&MU_WZERO_TO_ACQUIRE) != 0 ||
!ATM_CAS_ACQ (&mu->word, old_word,
(old_word+MU_WADD_TO_ACQUIRE) & ~MU_WCLEAR_ON_ACQUIRE)) {
waiter w;
nsync_waiter_init_ (&w);
nsync_mu_lock_slow_ (mu, &w, 0, nsync_writer_type_);
nsync_waiter_destroy_ (&w);
waiter *w = nsync_waiter_new_ ();
nsync_mu_lock_slow_ (mu, w, 0, nsync_writer_type_);
nsync_waiter_free_ (w);
}
}
IGNORE_RACES_END ();
@ -192,10 +191,9 @@ void nsync_mu_rlock (nsync_mu *mu) {
if ((old_word&MU_RZERO_TO_ACQUIRE) != 0 ||
!ATM_CAS_ACQ (&mu->word, old_word,
(old_word+MU_RADD_TO_ACQUIRE) & ~MU_RCLEAR_ON_ACQUIRE)) {
waiter w;
nsync_waiter_init_ (&w);
nsync_mu_lock_slow_ (mu, &w, 0, nsync_reader_type_);
nsync_waiter_destroy_ (&w);
waiter *w = nsync_waiter_new_ ();
nsync_mu_lock_slow_ (mu, w, 0, nsync_reader_type_);
nsync_waiter_free_ (w);
}
}
IGNORE_RACES_END ();

View file

@ -21,6 +21,8 @@ struct nsync_waiter_s {
/* set if waiter is embedded in Mu/CV's internal structures */
#define NSYNC_WAITER_FLAG_MUCV 0x1
void nsync_waiter_destroy(void *);
COSMOPOLITAN_C_END_
#endif /* !(__ASSEMBLER__ + __LINKER__ + 0) */
#endif /* COSMOPOLITAN_LIBC_THREAD_WAIT_INTERNAL_H_ */