Implement pthread_atfork()

If threads are being used, then fork() will now acquire and release and
runtime locks so that fork() may be safely used from threads. This also
makes vfork() thread safe, because pthread mutexes will do nothing when
the process is a child of vfork(). More torture tests have been written
to confirm this all works like a charm. Additionally:

- Invent hexpcpy() api
- Rename nsync_malloc_() to kmalloc()
- Complete posix named semaphore implementation
- Make pthread_create() asynchronous signal safe
- Add rm, rmdir, and touch to command interpreter builtins
- Invent sigisprecious() and modify sigset functions to use it
- Add unit tests for posix_spawn() attributes and fix its bugs

One unresolved problem is the reclaiming of *NSYNC waiter memory in the
forked child processes, within apps which have threads waiting on locks
This commit is contained in:
Justine Tunney 2022-10-16 12:05:08 -07:00
parent 64c284003d
commit 60cb435cb4
No known key found for this signature in database
GPG key ID: BE714B4575D6E328
124 changed files with 2169 additions and 718 deletions

View file

@ -1,6 +1,7 @@
#ifndef COSMOPOLITAN_LIBC_THREAD_POSIXTHREAD_INTERNAL_H_
#define COSMOPOLITAN_LIBC_THREAD_POSIXTHREAD_INTERNAL_H_
#include "libc/calls/struct/sched_param.h"
#include "libc/calls/struct/sigset.h"
#include "libc/runtime/runtime.h"
#include "libc/thread/thread.h"
#include "libc/thread/tls.h"
@ -15,6 +16,8 @@ COSMOPOLITAN_C_START_
* @fileoverview Cosmopolitan POSIX Thread Internals
*/
typedef void (*atfork_f)(void);
// LEGAL TRANSITIONS ┌──> TERMINATED
// pthread_create ─┬─> JOINABLE ─┴┬─> DETACHED ──> ZOMBIE
// └──────────────┘
@ -73,6 +76,7 @@ struct PosixThread {
struct CosmoTib *tib; // middle of tls allocation
jmp_buf exiter; // for pthread_exit
pthread_attr_t attr;
sigset_t sigmask;
struct _pthread_cleanup_buffer *cleanup;
};
@ -82,8 +86,7 @@ hidden extern uint64_t _pthread_key_usage[(PTHREAD_KEYS_MAX + 63) / 64];
hidden extern pthread_key_dtor _pthread_key_dtor[PTHREAD_KEYS_MAX];
hidden extern _Thread_local void *_pthread_keys[PTHREAD_KEYS_MAX];
void _pthread_atfork(int) hidden;
void _pthread_funlock(pthread_mutex_t *, int) hidden;
int _pthread_atfork(atfork_f, atfork_f, atfork_f) hidden;
int _pthread_reschedule(struct PosixThread *) hidden;
int _pthread_setschedparam_freebsd(int, int, const struct sched_param *) hidden;
void _pthread_free(struct PosixThread *) hidden;
@ -91,9 +94,15 @@ void _pthread_cleanup(struct PosixThread *) hidden;
void _pthread_ungarbage(void) hidden;
void _pthread_wait(struct PosixThread *) hidden;
void _pthread_zombies_add(struct PosixThread *) hidden;
void _pthread_zombies_purge(void) hidden;
void _pthread_zombies_decimate(void) hidden;
void _pthread_zombies_harvest(void) hidden;
void _pthread_key_destruct(void *[PTHREAD_KEYS_MAX]) hidden;
void _pthread_key_lock(void) hidden;
void _pthread_key_unlock(void) hidden;
void _pthread_onfork_prepare(void) hidden;
void _pthread_onfork_parent(void) hidden;
void _pthread_onfork_child(void) hidden;
COSMOPOLITAN_C_END_
#endif /* !(__ASSEMBLER__ + __LINKER__ + 0) */

View file

@ -17,16 +17,63 @@
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/assert.h"
#include "libc/runtime/runtime.h"
#include "libc/thread/semaphore.h"
#include "libc/intrin/kmalloc.h"
#include "libc/thread/posixthread.internal.h"
#include "libc/thread/thread.h"
#include "libc/thread/tls.h"
/**
* Closes named semaphore.
*
* @param sem was created with sem_open()
* @return 0 on success, or -1 w/ errno
*/
int sem_close(sem_t *sem) {
_npassert(!munmap(sem, FRAMESIZE));
return 0;
static struct AtForks {
pthread_mutex_t lock;
struct AtFork {
struct AtFork *p[2];
atfork_f f[3];
} * list;
} _atforks;
static void _pthread_onfork(int i) {
struct AtFork *a;
struct PosixThread *pt;
_unassert(0 <= i && i <= 2);
if (!i) pthread_mutex_lock(&_atforks.lock);
for (a = _atforks.list; a; a = a->p[!i]) {
if (a->f[i]) a->f[i]();
_atforks.list = a;
}
if (i) pthread_mutex_unlock(&_atforks.lock);
if (i == 2) {
_pthread_zombies_purge();
if (__tls_enabled) {
pt = (struct PosixThread *)__get_tls()->tib_pthread;
pt->flags |= PT_MAINTHREAD;
}
}
}
void _pthread_onfork_prepare(void) {
_pthread_onfork(0);
}
void _pthread_onfork_parent(void) {
_pthread_onfork(1);
}
void _pthread_onfork_child(void) {
_pthread_onfork(2);
}
int _pthread_atfork(atfork_f prepare, atfork_f parent, atfork_f child) {
int rc;
struct AtFork *a;
a = kmalloc(sizeof(struct AtFork));
a->f[0] = prepare;
a->f[1] = parent;
a->f[2] = child;
pthread_mutex_lock(&_atforks.lock);
a->p[0] = 0;
a->p[1] = _atforks.list;
if (_atforks.list) _atforks.list->p[0] = a;
_atforks.list = a;
pthread_mutex_unlock(&_atforks.lock);
rc = 0;
return rc;
}

View file

@ -21,6 +21,7 @@
#include "libc/calls/sched-sysv.internal.h"
#include "libc/calls/state.internal.h"
#include "libc/calls/struct/sigaltstack.h"
#include "libc/calls/struct/sigset.h"
#include "libc/calls/syscall-sysv.internal.h"
#include "libc/dce.h"
#include "libc/errno.h"
@ -41,6 +42,7 @@
#include "libc/sysv/consts/clone.h"
#include "libc/sysv/consts/map.h"
#include "libc/sysv/consts/prot.h"
#include "libc/sysv/consts/sig.h"
#include "libc/sysv/consts/ss.h"
#include "libc/sysv/errfuns.h"
#include "libc/thread/posixthread.internal.h"
@ -52,6 +54,7 @@
STATIC_YOINK("nsync_mu_lock");
STATIC_YOINK("nsync_mu_unlock");
STATIC_YOINK("_pthread_atfork");
#define MAP_ANON_OPENBSD 0x1000
#define MAP_STACK_OPENBSD 0x4000
@ -76,30 +79,6 @@ void _pthread_free(struct PosixThread *pt) {
free(pt);
}
void _pthread_funlock(pthread_mutex_t *mu, int parent_tid) {
if (mu->_type == PTHREAD_MUTEX_NORMAL ||
(atomic_load_explicit(&mu->_lock, memory_order_relaxed) &&
mu->_owner != parent_tid)) {
atomic_store_explicit(&mu->_lock, 0, memory_order_relaxed);
mu->_nsync = 0;
mu->_depth = 0;
mu->_owner = 0;
}
}
void _pthread_atfork(int parent_tid) {
FILE *f;
if (_weaken(dlmalloc_atfork)) _weaken(dlmalloc_atfork)();
_pthread_funlock(&__fds_lock_obj, parent_tid);
_pthread_funlock(&__sig_lock_obj, parent_tid);
_pthread_funlock(&__fflush_lock_obj, parent_tid);
for (int i = 0; i < __fflush.handles.i; ++i) {
if ((f = __fflush.handles.p[i])) {
_pthread_funlock((pthread_mutex_t *)f->lock, parent_tid);
}
}
}
static int PosixThread(void *arg, int tid) {
struct PosixThread *pt = arg;
enum PosixThreadStatus status;
@ -118,6 +97,7 @@ static int PosixThread(void *arg, int tid) {
// set long jump handler so pthread_exit can bring control back here
if (!setjmp(pt->exiter)) {
__get_tls()->tib_pthread = (pthread_t)pt;
sigprocmask(SIG_SETMASK, &pt->sigmask, 0);
pt->rc = pt->start_routine(pt->arg);
// ensure pthread_cleanup_pop(), and pthread_exit() popped cleanup
_npassert(!pt->cleanup);
@ -158,62 +138,12 @@ static int FixupCustomStackOnOpenbsd(pthread_attr_t *attr) {
}
}
/**
* Creates thread, e.g.
*
* void *worker(void *arg) {
* fputs(arg, stdout);
* return "there\n";
* }
*
* int main() {
* void *result;
* pthread_t id;
* pthread_create(&id, 0, worker, "hi ");
* pthread_join(id, &result);
* fputs(result, stdout);
* }
*
* Here's the OSI model of threads in Cosmopolitan:
*
*
* pthread_create() - Standard
* Abstraction
*
* clone() - Polyfill
*
* - Kernel
* Interfaces
* sys_clone tfork
* CreateThread
*
* bsdthread_create thr_new
*
*
* _lwp_create
*
*
* @param thread if non-null is used to output the thread id
* upon successful completion
* @param attr points to launch configuration, or may be null
* to use sensible defaults; it must be initialized using
* pthread_attr_init()
* @param start_routine is your thread's callback function
* @param arg is an arbitrary value passed to `start_routine`
* @return 0 on success, or errno on error
* @raise EAGAIN if resources to create thread weren't available
* @raise EINVAL if `attr` was supplied and had unnaceptable data
* @raise EPERM if scheduling policy was requested and user account
* isn't authorized to use it
* @returnserrno
* @threadsafe
*/
errno_t pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg) {
static errno_t pthread_create_impl(pthread_t *thread,
const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg,
sigset_t oldsigs) {
int rc, e = errno;
struct PosixThread *pt;
__require_tls();
_pthread_zombies_decimate();
// create posix thread object
if (!(pt = calloc(1, sizeof(struct PosixThread)))) {
@ -323,6 +253,7 @@ errno_t pthread_create(pthread_t *thread, const pthread_attr_t *attr,
}
// launch PosixThread(pt) in new thread
pt->sigmask = oldsigs;
if (clone(PosixThread, pt->attr.__stackaddr,
pt->attr.__stacksize - (IsOpenbsd() ? 16 : 0),
CLONE_VM | CLONE_THREAD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND |
@ -340,3 +271,66 @@ errno_t pthread_create(pthread_t *thread, const pthread_attr_t *attr,
}
return 0;
}
/**
* Creates thread, e.g.
*
* void *worker(void *arg) {
* fputs(arg, stdout);
* return "there\n";
* }
*
* int main() {
* void *result;
* pthread_t id;
* pthread_create(&id, 0, worker, "hi ");
* pthread_join(id, &result);
* fputs(result, stdout);
* }
*
* Here's the OSI model of threads in Cosmopolitan:
*
*
* pthread_create() - Standard
* Abstraction
*
* clone() - Polyfill
*
* - Kernel
* Interfaces
* sys_clone tfork
* CreateThread
*
* bsdthread_create thr_new
*
*
* _lwp_create
*
*
* @param thread if non-null is used to output the thread id
* upon successful completion
* @param attr points to launch configuration, or may be null
* to use sensible defaults; it must be initialized using
* pthread_attr_init()
* @param start_routine is your thread's callback function
* @param arg is an arbitrary value passed to `start_routine`
* @return 0 on success, or errno on error
* @raise EAGAIN if resources to create thread weren't available
* @raise EINVAL if `attr` was supplied and had unnaceptable data
* @raise EPERM if scheduling policy was requested and user account
* isn't authorized to use it
* @returnserrno
* @threadsafe
*/
errno_t pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg) {
errno_t rc;
sigset_t blocksigs, oldsigs;
__require_tls();
_pthread_zombies_decimate();
sigfillset(&blocksigs);
_npassert(!sigprocmask(SIG_SETMASK, &blocksigs, &oldsigs));
rc = pthread_create_impl(thread, attr, start_routine, arg, oldsigs);
_npassert(!sigprocmask(SIG_SETMASK, &oldsigs, 0));
return rc;
}

View file

@ -23,9 +23,7 @@
#include "libc/thread/spawn.h"
#include "libc/thread/thread.h"
/**
* @fileoverview Memory collector for detached threads.
*/
// TODO(jart): track all threads, not just zombies
static struct Zombie {
struct Zombie *next;
@ -70,6 +68,16 @@ void _pthread_zombies_harvest(void) {
}
}
void _pthread_zombies_purge(void) {
struct Zombie *z, *n;
while ((z = atomic_load_explicit(&_pthread_zombies, memory_order_relaxed))) {
_pthread_free(z->pt);
n = z->next;
free(z);
atomic_store_explicit(&_pthread_zombies, n, memory_order_relaxed);
}
}
__attribute__((__constructor__)) static void init(void) {
atexit(_pthread_zombies_harvest);
}

View file

@ -16,17 +16,36 @@
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/assert.h"
#include "libc/intrin/atomic.h"
#include "libc/limits.h"
#include "libc/sysv/errfuns.h"
#include "libc/thread/semaphore.h"
/**
* Destroys unnamed semaphore.
*
* If `sem` was successfully initialized by sem_init() then
* sem_destroy() may be called multiple times, before it may be
* reinitialized again by calling sem_init().
*
* Calling sem_destroy() on a semaphore created by sem_open() has
* undefined behavior. Using `sem` after calling sem_destroy() is
* undefined behavior that will cause semaphore APIs to either crash or
* raise `EINVAL` until `sem` is passed to sem_init() again.
*
* @param sem was created by sem_init()
* @return 0 on success, or -1 w/ errno
* @raise EINVAL if `sem` wasn't valid
* @raise EBUSY if `sem` has waiters
*/
int sem_destroy(sem_t *sem) {
int waiters;
_npassert(sem->sem_magic != SEM_MAGIC_NAMED);
if (sem->sem_magic != SEM_MAGIC_UNNAMED) return einval();
waiters = atomic_load_explicit(&sem->sem_waiters, memory_order_relaxed);
_unassert(waiters >= 0);
if (waiters) return ebusy();
atomic_store_explicit(&sem->sem_value, INT_MIN, memory_order_relaxed);
return 0;
}

View file

@ -16,6 +16,8 @@
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/assert.h"
#include "libc/calls/calls.h"
#include "libc/intrin/atomic.h"
#include "libc/thread/semaphore.h"
@ -27,6 +29,7 @@
* @return 0 on success, or -1 w/ errno
*/
int sem_getvalue(sem_t *sem, int *sval) {
_unassert(sem->sem_pshared || sem->sem_pid == getpid());
*sval = atomic_load_explicit(&sem->sem_value, memory_order_relaxed);
return 0;
}

View file

@ -16,6 +16,8 @@
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/calls/calls.h"
#include "libc/dce.h"
#include "libc/intrin/atomic.h"
#include "libc/limits.h"
#include "libc/sysv/errfuns.h"
@ -25,15 +27,25 @@
/**
* Initializes unnamed semaphore.
*
* Calling sem_init() on an already initialized semaphore is undefined.
*
* @param sem should make its way to sem_destroy() if this succeeds
* @param pshared if semaphore may be shared between processes
* @param pshared if semaphore may be shared between processes, provided
* `sem` is backed by `mmap(MAP_ANONYMOUS | MAP_SHARED)` memory
* @param value is initial count of semaphore
* @return 0 on success, or -1 w/ errno
* @raise EINVAL if `value` exceeds `SEM_VALUE_MAX`
* @raise EPERM on OpenBSD if `pshared` is true
*/
int sem_init(sem_t *sem, int pshared, unsigned value) {
if (value > SEM_VALUE_MAX) return einval();
// OpenBSD MAP_ANONYMOUS|MAP_SHARED memory is kind of busted.
// The OpenBSD implementation of sem_init() also EPERMs here.
if (IsOpenbsd() && pshared) return eperm();
sem->sem_magic = SEM_MAGIC_UNNAMED;
atomic_store_explicit(&sem->sem_value, value, memory_order_relaxed);
sem->sem_pshared = pshared;
sem->sem_pshared = !!pshared;
sem->sem_pid = getpid();
sem->sem_waiters = 0;
return 0;
}

View file

@ -18,60 +18,298 @@
*/
#include "libc/assert.h"
#include "libc/calls/calls.h"
#include "libc/calls/struct/stat.h"
#include "libc/dce.h"
#include "libc/errno.h"
#include "libc/intrin/atomic.h"
#include "libc/intrin/nopl.internal.h"
#include "libc/intrin/strace.internal.h"
#include "libc/limits.h"
#include "libc/mem/mem.h"
#include "libc/runtime/runtime.h"
#include "libc/str/str.h"
#include "libc/sysv/consts/at.h"
#include "libc/sysv/consts/map.h"
#include "libc/sysv/consts/o.h"
#include "libc/sysv/consts/prot.h"
#include "libc/sysv/errfuns.h"
#include "libc/thread/semaphore.h"
#include "libc/thread/semaphore.internal.h"
#include "libc/thread/thread.h"
#include "libc/thread/tls.h"
/**
* Initializes and opens named semaphore.
*
* @param name can be absolute path or should be component w/o slashes
* @param oflga can have `O_CREAT` and/or `O_EXCL`
* @return semaphore object which needs sem_close(), or SEM_FAILED w/ errno
* @raise ENOTDIR if a directory component in `name` exists as non-directory
* @raise ENAMETOOLONG if symlink-resolved `name` length exceeds `PATH_MAX`
* @raise ENAMETOOLONG if component in `name` exists longer than `NAME_MAX`
* @raise ELOOP if `flags` had `O_NOFOLLOW` and `name` is a symbolic link
* @raise ENOSPC if file system is full when `name` would be `O_CREAT`ed
* @raise ELOOP if a loop was detected resolving components of `name`
* @raise EEXIST if `O_CREAT|O_EXCL` is used and semaphore exists
* @raise EACCES if we didn't have permission to create semaphore
* @raise EMFILE if process `RLIMIT_NOFILE` has been reached
* @raise ENFILE if system-wide file limit has been reached
* @raise EINTR if signal was delivered instead
*/
sem_t *sem_open(const char *name, int oflag, ...) {
static struct Semaphores {
pthread_once_t once;
pthread_mutex_t lock;
struct Semaphore {
struct Semaphore *next;
sem_t *sem;
char *path;
bool dead;
int refs;
} * list;
} g_semaphores;
static void sem_open_lock(void) {
pthread_mutex_lock(&g_semaphores.lock);
}
static void sem_open_unlock(void) {
pthread_mutex_unlock(&g_semaphores.lock);
}
static void sem_open_funlock(void) {
pthread_mutex_init(&g_semaphores.lock, 0);
}
static void sem_open_setup(void) {
sem_open_funlock();
pthread_atfork(sem_open_lock, sem_open_unlock, sem_open_funlock);
}
static void sem_open_init(void) {
pthread_once(&g_semaphores.once, sem_open_setup);
}
#ifdef _NOPL0
#define sem_open_lock() _NOPL0("__threadcalls", sem_open_lock)
#define sem_open_unlock() _NOPL0("__threadcalls", sem_open_unlock)
#endif
static sem_t *sem_open_impl(const char *path, int oflag, unsigned mode,
unsigned value) {
int fd;
sem_t *sem;
va_list va;
unsigned mode;
char path[PATH_MAX];
va_start(va, oflag);
mode = va_arg(va, unsigned);
va_end(va);
struct stat st;
oflag |= O_RDWR | O_CLOEXEC;
if ((fd = openat(AT_FDCWD, __sem_name(name, path), oflag, mode)) == -1) {
if ((fd = openat(AT_FDCWD, path, oflag, mode)) == -1) {
return SEM_FAILED;
}
if (ftruncate(fd, sizeof(sem_t)) == -1) {
_npassert(!fstat(fd, &st));
if (st.st_size < PAGESIZE && ftruncate(fd, PAGESIZE) == -1) {
_npassert(!close(fd));
return SEM_FAILED;
}
sem = mmap(0, sizeof(sem_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
sem = mmap(0, PAGESIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (sem != MAP_FAILED) {
atomic_store_explicit(&sem->sem_value, value, memory_order_relaxed);
sem->sem_magic = SEM_MAGIC_NAMED;
sem->sem_dev = st.st_dev;
sem->sem_ino = st.st_ino;
sem->sem_pshared = true;
} else {
sem = SEM_FAILED;
}
_npassert(!close(fd));
return sem;
}
static struct Semaphore *sem_open_find(const char *path) {
struct Semaphore *s;
for (s = g_semaphores.list; s; s = s->next) {
if (!strcmp(path, s->path)) {
return s;
}
}
return 0;
}
static struct Semaphore *sem_open_reopen(const char *path) {
int e = errno;
struct stat st;
struct Semaphore *s;
for (s = g_semaphores.list; s; s = s->next) {
if (!s->dead && //
!strcmp(path, s->path)) {
if (!fstatat(AT_FDCWD, path, &st, 0) && //
st.st_dev == s->sem->sem_dev && //
st.st_ino == s->sem->sem_ino) {
return s;
} else {
errno = e;
s->dead = true;
}
}
}
return 0;
}
static struct Semaphore *sem_open_get(const sem_t *sem,
struct Semaphore ***out_prev) {
struct Semaphore *s, *t, **p;
for (p = &g_semaphores.list, s = *p; s; p = &s->next, s = s->next) {
if (s && sem == s->sem) {
*out_prev = p;
return s;
}
}
return 0;
}
/**
* Initializes and opens named semaphore.
*
* This function tracks open semaphore objects within a process. When a
* process calls sem_open() multiple times with the same name, then the
* same shared memory address will be returned, unless it was unlinked.
*
* @param name is arbitrary string that begins with a slash character
* @param oflag can have any of:
* - `O_CREAT` to create the named semaphore if it doesn't exist,
* in which case two additional arguments must be supplied
* - `O_EXCL` to raise `EEXIST` if semaphore already exists
* @param mode is octal mode bits, required if `oflag & O_CREAT`
* @param value is initial of semaphore, required if `oflag & O_CREAT`
* @return semaphore object which needs sem_close(), or SEM_FAILED w/ errno
* @raise ENOSPC if file system is full when `name` would be `O_CREAT`ed
* @raise EINVAL if `oflag` has bits other than `O_CREAT | O_EXCL`
* @raise EINVAL if `value` is negative or exceeds `SEM_VALUE_MAX`
* @raise EEXIST if `O_CREAT|O_EXCL` is used and semaphore exists
* @raise EACCES if we didn't have permission to create semaphore
* @raise EACCES if recreating open semaphore pending an unlink
* @raise EMFILE if process `RLIMIT_NOFILE` has been reached
* @raise ENFILE if system-wide file limit has been reached
* @raise ENOMEM if we require more vespene gas
* @raise EINTR if signal handler was called
* @threadsafe
*/
sem_t *sem_open(const char *name, int oflag, ...) {
sem_t *sem;
va_list va;
struct Semaphore *s;
unsigned mode = 0, value = 0;
char *path, pathbuf[PATH_MAX];
if (oflag & ~(O_CREAT | O_EXCL)) {
einval();
return SEM_FAILED;
}
if (oflag & O_CREAT) {
va_start(va, oflag);
mode = va_arg(va, unsigned);
value = va_arg(va, unsigned);
va_end(va);
if (value > SEM_VALUE_MAX) {
einval();
return SEM_FAILED;
}
}
if (!(path = sem_path_np(name, pathbuf, sizeof(pathbuf)))) {
return SEM_FAILED;
}
sem_open_init();
sem_open_lock();
if ((s = sem_open_reopen(path))) {
if (s->sem->sem_lazydelete) {
if (oflag & O_CREAT) {
eacces();
} else {
enoent();
}
sem = SEM_FAILED;
} else if (~oflag & O_EXCL) {
sem = s->sem;
atomic_fetch_add_explicit(&sem->sem_prefs, 1, memory_order_acquire);
++s->refs;
} else {
eexist();
sem = SEM_FAILED;
}
} else if ((s = calloc(1, sizeof(struct Semaphore)))) {
if ((s->path = strdup(path))) {
if ((sem = sem_open_impl(path, oflag, mode, value)) != SEM_FAILED) {
atomic_fetch_add_explicit(&sem->sem_prefs, 1, memory_order_relaxed);
s->next = g_semaphores.list;
s->sem = sem;
s->refs = 1;
g_semaphores.list = s;
} else {
free(s->path);
free(s);
}
} else {
free(s);
sem = SEM_FAILED;
}
} else {
sem = SEM_FAILED;
}
sem_open_unlock();
return sem;
}
/**
* Closes named semaphore.
*
* Calling sem_close() on a semaphore not created by sem_open() has
* undefined behavior. Using `sem` after calling sem_close() from either
* the current process or forked processes sharing the same address is
* also undefined behavior. If any threads in this process or forked
* children are currently blocked on `sem` then calling sem_close() has
* undefined behavior.
*
* @param sem was created with sem_open()
* @return 0 on success, or -1 w/ errno
*/
int sem_close(sem_t *sem) {
int rc, prefs;
bool unmap, delete;
struct Semaphore *s, **p;
_npassert(sem->sem_magic == SEM_MAGIC_NAMED);
sem_open_init();
sem_open_lock();
_npassert((s = sem_open_get(sem, &p)));
prefs = atomic_fetch_add_explicit(&sem->sem_prefs, -1, memory_order_release);
_npassert(s->refs > 0);
if ((unmap = !--s->refs)) {
_npassert(prefs > 0);
delete = sem->sem_lazydelete && prefs == 1;
*p = s->next;
} else {
_npassert(prefs > 1);
delete = false;
}
sem_open_unlock();
if (unmap) {
_npassert(!munmap(sem, PAGESIZE));
}
if (delete) {
rc = unlink(s->path);
}
if (unmap) {
free(s->path);
free(s);
}
return 0;
}
/**
* Removes named semaphore.
*
* This causes the file resource to be deleted. If processes have this
* semaphore currently opened, then on platforms like Windows deletion
* may be postponed until the last process calls sem_close().
*
* @param name can be absolute path or should be component w/o slashes
* @return 0 on success, or -1 w/ errno
* @raise ACCESS if Windows is being fussy about deleting open files
* @raise EPERM if pledge() is in play w/o `cpath` promise
* @raise ENOENT if named semaphore doesn't exist
* @raise EACCES if permission is denied
* @raise ENAMETOOLONG if too long
*/
int sem_unlink(const char *name) {
int rc, e = errno;
struct Semaphore *s;
char *path, pathbuf[PATH_MAX];
if (!(path = sem_path_np(name, pathbuf, sizeof(pathbuf)))) return -1;
if ((rc = unlink(path)) == -1 && IsWindows() && errno == EACCES) {
sem_open_init();
sem_open_lock();
if ((s = sem_open_find(path))) {
s->sem->sem_lazydelete = true;
errno = e;
rc = 0;
}
sem_open_unlock();
}
return rc;
}

View file

@ -16,22 +16,38 @@
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/intrin/kprintf.h"
#include "libc/runtime/runtime.h"
#include "libc/dce.h"
#include "libc/str/blake2.h"
#include "libc/str/path.h"
#include "libc/str/str.h"
#include "libc/thread/semaphore.internal.h"
#include "libc/sysv/errfuns.h"
#include "libc/thread/thread.h"
const char *__sem_name(const char *name, char path[hasatleast PATH_MAX]) {
const char *res;
if (_isabspath(name)) {
res = name;
/**
* Returns filesystem pathname of named semaphore.
*
* @param name is `name` of semaphore which should begin with slash
* @param buf is temporary storage with at least `size` bytes
* @param size is size of `buf` in bytes
* @return pointer to file system path
* @raise ENAMETOOLONG if constructed path would exceed `size`
*/
const char *sem_path_np(const char *name, char *buf, size_t size) {
char *p;
unsigned n;
const char *path, *a;
uint8_t digest[BLAKE2B256_DIGEST_LENGTH];
a = "/tmp/", n = 5;
if (IsLinux()) a = "/dev/shm/", n = 9;
if (n + BLAKE2B256_DIGEST_LENGTH * 2 + 4 < size) {
BLAKE2B256(name, strlen(name), digest);
p = mempcpy(buf, a, n);
p = hexpcpy(p, digest, BLAKE2B256_DIGEST_LENGTH);
p = mempcpy(p, ".sem", 5);
path = buf;
} else {
strlcpy(path, kTmpPath, PATH_MAX);
strlcat(path, ".sem-", PATH_MAX);
strlcat(path, name, PATH_MAX);
res = path;
enametoolong();
path = 0;
}
return res;
return path;
}

View file

@ -17,9 +17,10 @@
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/assert.h"
#include "libc/calls/calls.h"
#include "libc/errno.h"
#include "libc/intrin/atomic.h"
#include "libc/intrin/strace.internal.h"
#include "libc/limits.h"
#include "libc/sysv/errfuns.h"
#include "libc/thread/semaphore.h"
#include "third_party/nsync/futex.internal.h"
@ -31,14 +32,17 @@
* @raise EINVAL if `sem` isn't valid
*/
int sem_post(sem_t *sem) {
int rc;
int old = atomic_fetch_add_explicit(&sem->sem_value, 1, memory_order_relaxed);
int rc, old, wakeups;
_unassert(sem->sem_pshared || sem->sem_pid == getpid());
old = atomic_fetch_add_explicit(&sem->sem_value, 1, memory_order_relaxed);
_unassert(old > INT_MIN);
if (old >= 0) {
_npassert(nsync_futex_wake_(&sem->sem_value, 1, sem->sem_pshared) >= 0);
wakeups = nsync_futex_wake_(&sem->sem_value, 1, sem->sem_pshared);
_npassert(wakeups >= 0);
rc = 0;
} else {
wakeups = 0;
rc = einval();
}
STRACE("sem_post(%p) → %d% m", sem, rc);
return rc;
}

View file

@ -21,10 +21,9 @@
#include "libc/calls/struct/timespec.internal.h"
#include "libc/errno.h"
#include "libc/intrin/atomic.h"
#include "libc/intrin/strace.internal.h"
#include "libc/limits.h"
#include "libc/sysv/errfuns.h"
#include "libc/thread/semaphore.h"
#include "libc/thread/semaphore.internal.h"
#include "third_party/nsync/futex.internal.h"
static void sem_delay(int n) {
@ -79,6 +78,9 @@ int sem_timedwait(sem_t *sem, const struct timespec *abstime) {
}
}
_unassert(atomic_fetch_add_explicit(&sem->sem_waiters, +1,
memory_order_acquire) >= 0);
do {
if (!(v = atomic_load_explicit(&sem->sem_value, memory_order_relaxed))) {
rc = nsync_futex_wait_(&sem->sem_value, v, sem->sem_pshared,
@ -101,13 +103,15 @@ int sem_timedwait(sem_t *sem, const struct timespec *abstime) {
} else if (v > 0) {
rc = 0;
} else {
_unassert(v > INT_MIN);
rc = einval();
}
} while (!rc && (!v || !atomic_compare_exchange_weak_explicit(
&sem->sem_value, &v, v - 1, memory_order_acquire,
memory_order_relaxed)));
STRACE("sem_timedwait(%p, %s) → %d% m", sem, DescribeTimespec(0, abstime),
rc);
_unassert(atomic_fetch_add_explicit(&sem->sem_waiters, -1,
memory_order_release) > 0);
return rc;
}

View file

@ -17,8 +17,10 @@
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/assert.h"
#include "libc/calls/calls.h"
#include "libc/errno.h"
#include "libc/intrin/atomic.h"
#include "libc/limits.h"
#include "libc/sysv/errfuns.h"
#include "libc/thread/semaphore.h"
@ -32,8 +34,10 @@
*/
int sem_trywait(sem_t *sem) {
int v;
_unassert(sem->sem_pshared || sem->sem_pid == getpid());
v = atomic_load_explicit(&sem->sem_value, memory_order_relaxed);
do {
_unassert(v > INT_MIN);
if (!v) return eagain();
if (v < 0) return einval();
} while (!atomic_compare_exchange_weak_explicit(

View file

@ -1,35 +0,0 @@
/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│
vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi
Copyright 2022 Justine Alexandra Roberts Tunney
Permission to use, copy, modify, and/or distribute this software for
any purpose with or without fee is hereby granted, provided that the
above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/calls/calls.h"
#include "libc/thread/semaphore.h"
#include "libc/thread/semaphore.internal.h"
/**
* Removes named semaphore.
*
* @param name can be absolute path or should be component w/o slashes
* @return 0 on success, or -1 w/ errno
* @raise EPERM if pledge() is in play w/o `cpath` promise
* @raise ENOENT if named semaphore doesn't exist
* @raise EACCES if permission is denied
*/
int sem_unlink(const char *name) {
char path[PATH_MAX];
return unlink(__sem_name(name, path));
}

View file

@ -4,12 +4,21 @@
#if !(__ASSEMBLER__ + __LINKER__ + 0)
COSMOPOLITAN_C_START_
#define SEM_FAILED ((sem_t *)0)
#define SEM_FAILED ((sem_t *)0)
#define SEM_MAGIC_NAMED 0xDEADBEEFu
#define SEM_MAGIC_UNNAMED 0xFEEDABEEu
typedef struct {
union {
struct {
_Atomic(int) sem_value;
_Atomic(int) sem_waiters;
_Atomic(int) sem_prefs; /* named only */
unsigned sem_magic;
int64_t sem_dev; /* named only */
int64_t sem_ino; /* named only */
int sem_pid; /* unnamed only */
bool sem_lazydelete; /* named only */
bool sem_pshared;
};
void *sem_space[32];
@ -26,6 +35,7 @@ int sem_getvalue(sem_t *, int *);
sem_t *sem_open(const char *, int, ...);
int sem_close(sem_t *);
int sem_unlink(const char *);
const char *sem_path_np(const char *, char *, size_t);
COSMOPOLITAN_C_END_
#endif /* !(__ASSEMBLER__ + __LINKER__ + 0) */

View file

@ -1,10 +0,0 @@
#ifndef COSMOPOLITAN_LIBC_THREAD_SEMAPHORE_INTERNAL_H_
#define COSMOPOLITAN_LIBC_THREAD_SEMAPHORE_INTERNAL_H_
#if !(__ASSEMBLER__ + __LINKER__ + 0)
COSMOPOLITAN_C_START_
const char *__sem_name(const char *, char[hasatleast PATH_MAX]) hidden;
COSMOPOLITAN_C_END_
#endif /* !(__ASSEMBLER__ + __LINKER__ + 0) */
#endif /* COSMOPOLITAN_LIBC_THREAD_SEMAPHORE_INTERNAL_H_ */

View file

@ -63,7 +63,7 @@ typedef struct pthread_mutex_s {
unsigned _pshared : 1;
unsigned _depth : 8;
unsigned _owner : 21;
void *_nsync;
long _pid;
} pthread_mutex_t;
typedef struct pthread_mutexattr_s {
@ -149,6 +149,7 @@ int pthread_mutexattr_gettype(const pthread_mutexattr_t *, int *);
int pthread_mutexattr_settype(pthread_mutexattr_t *, int);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *, int);
int pthread_mutexattr_getpshared(const pthread_mutexattr_t *, int *);
int pthread_atfork(void (*)(void), void (*)(void), void (*)(void));
int pthread_mutex_init(pthread_mutex_t *, const pthread_mutexattr_t *);
int pthread_mutex_lock(pthread_mutex_t *);
int pthread_mutex_unlock(pthread_mutex_t *);

View file

@ -4,6 +4,8 @@
#define TLS_ALIGNMENT 64
#define TIB_FLAG_TIME_CRITICAL 1
#define TIB_FLAG_VFORKED 2
#define TIB_FLAG_WINCRASHING 4
#if !(__ASSEMBLER__ + __LINKER__ + 0)
COSMOPOLITAN_C_START_
@ -24,8 +26,8 @@ struct CosmoTib {
struct CosmoTib *tib_self2; /* 0x30 */
_Atomic(int32_t) tib_tid; /* 0x38 */
int32_t tib_errno; /* 0x3c */
uint64_t tib_flags; /* 0x40 */
void *tib_nsync;
uint64_t tib_flags;
uint64_t tib_sigmask;
void *tib_reserved3;
void *tib_reserved4;