Clean up some of the threading code

This commit is contained in:
Justine Tunney 2022-09-08 11:54:56 -07:00
parent 0547eabcd6
commit 9f963dc597
No known key found for this signature in database
GPG key ID: BE714B4575D6E328
62 changed files with 175 additions and 582 deletions

View file

@ -1,69 +0,0 @@
/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│
vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi
Copyright 2020 Justine Alexandra Roberts Tunney
Permission to use, copy, modify, and/or distribute this software for
any purpose with or without fee is hereby granted, provided that the
above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/errno.h"
#include "libc/runtime/stack.h"
#include "libc/thread/thread.h"
#define MIN_STACKSIZE (8 * PAGESIZE) // includes guard, rounds up to FRAMESIZE
#define MIN_GUARDSIZE PAGESIZE
// CTOR/DTOR
int cthread_attr_init(cthread_attr_t* attr) {
attr->stacksize = GetStackSize();
attr->guardsize = PAGESIZE;
attr->mode = CTHREAD_CREATE_JOINABLE;
return 0;
}
int cthread_attr_destroy(cthread_attr_t* attr) {
(void)attr;
return 0;
}
// stacksize
int cthread_attr_setstacksize(cthread_attr_t* attr, size_t size) {
if (size & (PAGESIZE - 1)) return EINVAL;
if (size < MIN_STACKSIZE) return EINVAL;
attr->stacksize = size;
return 0;
}
size_t cthread_attr_getstacksize(const cthread_attr_t* attr) {
return attr->stacksize;
}
// guardsize
int cthread_attr_setguardsize(cthread_attr_t* attr, size_t size) {
if (size & (PAGESIZE - 1)) return EINVAL;
if (size < MIN_GUARDSIZE) return EINVAL;
attr->guardsize = size;
return 0;
}
size_t cthread_attr_getguardsize(const cthread_attr_t* attr) {
return attr->guardsize;
}
// detachstate
int cthread_attr_setdetachstate(cthread_attr_t* attr, int mode) {
if (mode & ~(CTHREAD_CREATE_JOINABLE | CTHREAD_CREATE_DETACHED))
return EINVAL;
attr->mode = mode;
return 0;
}
int cthread_attr_getdetachstate(const cthread_attr_t* attr) {
return attr->mode;
}

View file

@ -53,7 +53,7 @@ static int PosixThread(void *arg, int tid) {
struct PosixThread *pt = arg;
enum PosixThreadStatus status;
if (!setjmp(pt->exiter)) {
((cthread_t)__get_tls())->pthread = pt;
((cthread_t)__get_tls())->pthread = (pthread_t)pt;
pt->rc = pt->start_routine(pt->arg);
}
if (weaken(_pthread_key_destruct)) {
@ -72,7 +72,20 @@ static int PosixThread(void *arg, int tid) {
}
/**
* Creates thread.
* Creates thread, e.g.
*
* void *worker(void *arg) {
* fputs(arg, stdout);
* return "there\n";
* }
*
* int main() {
* void *result;
* pthread_t id;
* pthread_create(&id, 0, worker, "hi ");
* pthread_join(id, &result);
* fputs(result, stdout);
* }
*
* Here's the OSI model of threads in Cosmopolitan:
*
@ -216,7 +229,7 @@ int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
}
if (thread) {
*thread = pt;
*thread = (pthread_t)pt;
}
return 0;
}

View file

@ -30,7 +30,7 @@
*/
int pthread_detach(pthread_t thread) {
enum PosixThreadStatus status;
struct PosixThread *pt = thread;
struct PosixThread *pt = (struct PosixThread *)thread;
for (;;) {
status = atomic_load_explicit(&pt->status, memory_order_relaxed);
if (status == kPosixThreadDetached || status == kPosixThreadZombie) {

View file

@ -25,7 +25,7 @@
* @return nonzero if equal, otherwise zero
*/
int pthread_equal(pthread_t t1, pthread_t t2) {
struct PosixThread *a = t1;
struct PosixThread *b = t2;
struct PosixThread *a = (struct PosixThread *)t1;
struct PosixThread *b = (struct PosixThread *)t2;
return a->spawn.ptid == b->spawn.ptid;
}

View file

@ -37,7 +37,7 @@
*/
wontreturn void pthread_exit(void *rc) {
struct PosixThread *pt;
if ((pt = ((cthread_t)__get_tls())->pthread)) {
if ((pt = (struct PosixThread *)((cthread_t)__get_tls())->pthread)) {
pt->rc = rc;
_gclongjmp(pt->exiter, 1);
} else {

View file

@ -22,7 +22,7 @@
#include "libc/thread/thread.h"
int pthread_getattr_np(pthread_t thread, pthread_attr_t *attr) {
struct PosixThread *pt = thread;
struct PosixThread *pt = (struct PosixThread *)thread;
memcpy(attr, &pt->attr, sizeof(pt->attr));
return 0;
}

View file

@ -23,6 +23,6 @@
* Returns thread id of POSIX thread.
*/
int64_t pthread_getunique_np(pthread_t thread) {
struct PosixThread *pt = thread;
struct PosixThread *pt = (struct PosixThread *)thread;
return pt->spawn.ptid;
}

View file

@ -31,7 +31,7 @@
* @raise EDEADLK if thread is detached
*/
int pthread_join(pthread_t thread, void **value_ptr) {
struct PosixThread *pt = thread;
struct PosixThread *pt = (struct PosixThread *)thread;
if (pt->status == kPosixThreadDetached || //
pt->status == kPosixThreadZombie) {
assert(!"badjoin");

View file

@ -1,28 +0,0 @@
/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│
vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi
Copyright 2022 Justine Alexandra Roberts Tunney
Permission to use, copy, modify, and/or distribute this software for
any purpose with or without fee is hereby granted, provided that the
above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/nexgen32e/gettls.h"
#include "libc/nexgen32e/threaded.h"
#include "libc/thread/thread.h"
/**
* Returns thread descriptor of the current thread.
*/
cthread_t(cthread_self)(void) {
return (cthread_t)__get_tls();
}

View file

@ -1,133 +0,0 @@
/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│
vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi
Copyright 2020 Justine Alexandra Roberts Tunney
Permission to use, copy, modify, and/or distribute this software for
any purpose with or without fee is hereby granted, provided that the
above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/intrin/atomic.h"
#include "libc/calls/calls.h"
#include "libc/thread/thread.h"
#define CTHREAD_THREAD_VAL_BITS 32
static void Pause(int attempt) {
if (attempt < 16) {
for (int i = 0; i < (1 << attempt); ++i) {
__builtin_ia32_pause();
}
} else {
sched_yield();
}
}
/**
* Initializes semaphore.
*/
int cthread_sem_init(cthread_sem_t* sem, int count) {
sem->linux.count = count;
return 0;
}
/**
* Destroys semaphore.
*/
int cthread_sem_destroy(cthread_sem_t* sem) {
(void)sem;
return 0;
}
/**
* Notifies a thread waiting on semaphore.
*/
int cthread_sem_signal(cthread_sem_t* sem) {
uint64_t count;
count = atomic_fetch_add(&sem->linux.count, 1);
if ((count >> CTHREAD_THREAD_VAL_BITS)) {
// WARNING: an offset of 4 bytes would be required on little-endian archs
void* wait_address = &sem->linux.count;
cthread_memory_wake32(wait_address, 1);
}
return 0;
}
/**
* Waits on semaphore with kernel assistance.
*/
int cthread_sem_wait_futex(cthread_sem_t* sem, const struct timespec* timeout) {
uint64_t count;
// record current thread as waiter
count = atomic_fetch_add(&sem->linux.count,
(uint64_t)1 << CTHREAD_THREAD_VAL_BITS);
for (;;) {
// try to acquire the semaphore, as well as remove itself from waiters
while ((uint32_t)count > 0) {
// without spin, we could miss a futex wake
if (atomic_compare_exchange_weak(
&sem->linux.count, &count,
count - 1 - ((uint64_t)1 << CTHREAD_THREAD_VAL_BITS))) {
return 0;
}
}
// WARNING: an offset of 4 bytes would be required on little-endian archs
void* wait_address = &sem->linux.count;
cthread_memory_wait32(wait_address, count, timeout);
count = atomic_load(&sem->linux.count);
}
return 0;
}
/**
* Waits on semaphore without kernel assistance.
*/
int cthread_sem_wait_spin(cthread_sem_t* sem, uint64_t count, int spin,
const struct timespec* timeout) {
// spin on pause
for (int attempt = 0; attempt < spin; ++attempt) {
// if ((count >> CTHREAD_THREAD_VAL_BITS) != 0) break;
while ((uint32_t)count > 0) {
// spin is useful if multiple waiters can acquire the semaphore at the
// same time
if (atomic_compare_exchange_weak(&sem->linux.count, &count, count - 1)) {
return 0;
}
}
Pause(attempt);
}
return cthread_sem_wait_futex(sem, timeout);
}
/**
* Waits on semaphore.
*/
int cthread_sem_wait(cthread_sem_t* sem, int spin,
const struct timespec* timeout) {
uint64_t count = atomic_load(&sem->linux.count);
// uncontended
while ((uint32_t)count > 0) {
// spin is useful if multiple waiters can acquire the semaphore at the same
// time
if (atomic_compare_exchange_weak(&sem->linux.count, &count, count - 1)) {
return 0;
}
}
return cthread_sem_wait_spin(sem, count, spin, timeout);
}

View file

@ -1,23 +1,11 @@
#ifndef COSMOPOLITAN_LIBC_THREAD_THREAD_H_
#define COSMOPOLITAN_LIBC_THREAD_THREAD_H_
#include "libc/calls/struct/timespec.h"
#include "libc/intrin/pthread.h"
#include "libc/runtime/runtime.h"
#include "libc/str/locale.h"
#define CTHREAD_CREATE_DETACHED 1
#define CTHREAD_CREATE_JOINABLE 0
#if !(__ASSEMBLER__ + __LINKER__ + 0)
COSMOPOLITAN_C_START_
enum cthread_state {
cthread_started = 0,
cthread_joining = 1,
cthread_finished = 2,
cthread_detached = 4,
};
struct FtraceTls { /* 16 */
bool once; /* 0 */
bool noreentry; /* 1 */
@ -38,32 +26,6 @@ struct cthread_descriptor_t {
typedef struct cthread_descriptor_t *cthread_t;
typedef union cthread_sem_t {
struct {
uint64_t count;
} linux;
} cthread_sem_t;
typedef struct cthread_attr_t {
size_t stacksize, guardsize;
int mode;
} cthread_attr_t;
cthread_t cthread_self(void);
int cthread_attr_init(cthread_attr_t *);
int cthread_attr_destroy(cthread_attr_t *);
int cthread_attr_setstacksize(cthread_attr_t *, size_t);
size_t thread_attr_getstacksize(const cthread_attr_t *);
int cthread_attr_setguardsize(cthread_attr_t *, size_t);
size_t cthread_attr_getguardsize(const cthread_attr_t *);
int cthread_attr_setdetachstate(cthread_attr_t *, int);
int cthread_attr_getdetachstate(const cthread_attr_t *);
int cthread_sem_init(cthread_sem_t *, int);
int cthread_sem_destroy(cthread_sem_t *);
int cthread_sem_wait(cthread_sem_t *, int, const struct timespec *);
int cthread_sem_signal(cthread_sem_t *);
int cthread_memory_wait32(int *, int, const struct timespec *);
int cthread_memory_wake32(int *, int);
void cthread_ungarbage(void);
COSMOPOLITAN_C_END_

View file

@ -1,43 +0,0 @@
/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│
vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi
Copyright 2020 Justine Alexandra Roberts Tunney
Permission to use, copy, modify, and/or distribute this software for
any purpose with or without fee is hereby granted, provided that the
above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/calls/calls.h"
#include "libc/calls/struct/timespec.h"
#include "libc/dce.h"
#include "libc/errno.h"
#include "libc/intrin/atomic.h"
#include "libc/intrin/futex.internal.h"
#include "libc/intrin/pthread.h"
#include "libc/thread/thread.h"
int cthread_memory_wait32(int* addr, int val, const struct timespec* timeout) {
size_t size;
if (IsLinux() || IsOpenbsd()) {
return _futex_wait(addr, val, PTHREAD_PROCESS_SHARED, timeout);
} else {
return sched_yield();
}
}
int cthread_memory_wake32(int* addr, int n) {
if (IsLinux() || IsOpenbsd()) {
return _futex_wake(addr, n, PTHREAD_PROCESS_SHARED);
} else {
return 0;
}
}