Add shared memory apis to redbean

You can now do things like implement mutexes using futexes in your
redbean lua code. This provides the fastest possible inter-process
communication for your production systems when SQLite alone as ipc
or things like pipes aren't sufficient.
This commit is contained in:
Justine Tunney 2022-10-06 04:55:26 -07:00
parent 81ee11a16e
commit 7822917fc2
No known key found for this signature in database
GPG key ID: BE714B4575D6E328
21 changed files with 988 additions and 23 deletions

View file

@ -49,7 +49,7 @@ int __ensurefds_unlocked(int fd) {
bool relocate;
if (fd < g_fds.n) return fd;
g_fds.n = fd + 1;
g_fds.e = _extend(g_fds.p, g_fds.n * sizeof(*g_fds.p), g_fds.e,
g_fds.e = _extend(g_fds.p, g_fds.n * sizeof(*g_fds.p), g_fds.e, MAP_PRIVATE,
kMemtrackFdsStart + kMemtrackFdsSize);
return fd;
}

View file

@ -29,11 +29,11 @@
#define G FRAMESIZE
static void _mapframe(void *p) {
static void _mapframe(void *p, int f) {
int prot, flags;
struct DirectMap dm;
prot = PROT_READ | PROT_WRITE;
flags = MAP_PRIVATE | MAP_ANONYMOUS | MAP_FIXED;
flags = f | MAP_ANONYMOUS | MAP_FIXED;
if ((dm = sys_mmap(p, G, prot, flags, -1, 0)).addr != p) {
notpossible;
}
@ -60,9 +60,10 @@ static void _mapframe(void *p) {
* @param n specifies how many bytes are needed
* @param e points to end of memory that's allocated
* @param h is highest address to which `e` may grow
* @param f should be `MAP_PRIVATE` or `MAP_SHARED`
* @return new value for `e`
*/
noasan void *_extend(void *p, size_t n, void *e, intptr_t h) {
noasan void *_extend(void *p, size_t n, void *e, int f, intptr_t h) {
char *q;
#ifndef NDEBUG
if ((uintptr_t)SHADOW(p) & (G - 1)) notpossible;
@ -71,10 +72,10 @@ noasan void *_extend(void *p, size_t n, void *e, intptr_t h) {
for (q = e; q < ((char *)p + n); q += 8) {
if (!((uintptr_t)q & (G - 1))) {
if (q + G > (char *)h) notpossible;
_mapframe(q);
_mapframe(q, f);
if (IsAsan()) {
if (!((uintptr_t)SHADOW(q) & (G - 1))) {
_mapframe(SHADOW(q));
_mapframe(SHADOW(q), f);
__asan_poison(q, G << kAsanScale, kAsanProtected);
}
}

View file

@ -3,7 +3,7 @@
#if !(__ASSEMBLER__ + __LINKER__ + 0)
COSMOPOLITAN_C_START_
void *_extend(void *, size_t, void *, intptr_t);
void *_extend(void *, size_t, void *, int, intptr_t);
COSMOPOLITAN_C_END_
#endif /* !(__ASSEMBLER__ + __LINKER__ + 0) */

View file

@ -23,6 +23,7 @@
#include "libc/intrin/weaken.h"
#include "libc/nt/runtime.h"
#include "libc/runtime/memtrack.internal.h"
#include "libc/sysv/consts/map.h"
#include "libc/sysv/consts/o.h"
#include "libc/thread/thread.h"
@ -47,7 +48,7 @@ textstartup void InitializeFileDescriptors(void) {
fds->p = fds->e = (void *)kMemtrackFdsStart;
fds->n = 4;
fds->f = 3;
fds->e = _extend(fds->p, fds->n * sizeof(*fds->p), fds->e,
fds->e = _extend(fds->p, fds->n * sizeof(*fds->p), fds->e, MAP_PRIVATE,
kMemtrackFdsStart + kMemtrackFdsSize);
if (IsMetal()) {
extern const char vga_console[];

View file

@ -21,7 +21,7 @@
#include "libc/macros.internal.h"
.privileged
// Asks kernel to let other threads be scheduled.
// Relinquishes scheduled quantum.
//
// @return 0 on success, or -1 w/ errno
// @norestart

31
libc/intrin/sys_umtx_op.S Normal file
View file

@ -0,0 +1,31 @@
/*-*- mode:unix-assembly; indent-tabs-mode:t; tab-width:8; coding:utf-8 -*-│
vi: set et ft=asm ts=8 tw=8 fenc=utf-8 :vi
Copyright 2022 Justine Alexandra Roberts Tunney
Permission to use, copy, modify, and/or distribute this software for
any purpose with or without fee is hereby granted, provided that the
above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/macros.internal.h"
.privileged
// Calls FreeBSD's futex() API.
// Normalizes return value to Linux ABI -errno convention.
sys_umtx_op:
mov $0x1c6,%eax
syscall
jc 1f
ret
1: neg %eax
ret
.endfn sys_umtx_op,globl,hidden

View file

@ -1,2 +1,2 @@
.include "o/libc/sysv/macros.internal.inc"
.scall sys_clock_nanosleep,0x1ddfff0f4ffff0e6,globl
.scall sys_clock_nanosleep,0x1ddfff0f4ffff0e6,globl,hidden

View file

@ -1,2 +0,0 @@
.include "o/libc/sysv/macros.internal.inc"
.scall sys_umtx_op,0xffffff1c6fffffff,globl

View file

@ -642,7 +642,7 @@ scall sys_bsdthread_register 0xfffffffff216efff globl hidden
#scall write_nocancel 0xfffffffff218dfff globl
#scall writev_nocancel 0xfffffffff219cfff globl
#──────────────────────────FREEBSD───────────────────────────
scall sys_umtx_op 0xffffff1c6fffffff globl
#scall sys_umtx_op 0xffffff1c6fffffff globl
#scall abort2 0xffffff1cffffffff globl
#scall afs3_syscall 0xffffff179fffffff globl
#scall bindat 0xffffff21afffffff globl

View file

@ -54,7 +54,7 @@ static void *__zipos_mmap(size_t mapsize) {
maptotal += mapsize;
start = (char *)kMemtrackZiposStart;
if (!mapend) mapend = start;
mapend = _extend(start, maptotal, mapend,
mapend = _extend(start, maptotal, mapend, MAP_PRIVATE,
kMemtrackZiposStart + kMemtrackZiposSize);
return start + offset;
}

View file

@ -0,0 +1,92 @@
/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│
vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi
Copyright 2022 Justine Alexandra Roberts Tunney
Permission to use, copy, modify, and/or distribute this software for
any purpose with or without fee is hereby granted, provided that the
above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/calls/calls.h"
#include "libc/errno.h"
#include "libc/intrin/bits.h"
#include "libc/intrin/intrin.h"
#include "libc/intrin/kprintf.h"
#include "libc/macros.internal.h"
#include "libc/runtime/runtime.h"
#include "libc/testlib/testlib.h"
#include "libc/thread/thread.h"
#define PROCESSES 8
#define ITERATIONS 100000
struct SharedMemory {
pthread_mutex_t mutex;
volatile long x;
} * shm;
void Worker(void) {
long t;
for (int i = 0; i < ITERATIONS; ++i) {
pthread_mutex_lock(&shm->mutex);
t = shm->x;
t += 1;
shm->x = t;
pthread_mutex_unlock(&shm->mutex);
}
}
TEST(lockipc, mutex) {
int e, rc, ws, pid;
// create shared memory
shm = _mapshared(FRAMESIZE);
// create shared mutex
pthread_mutexattr_t mattr;
pthread_mutexattr_init(&mattr);
pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_NORMAL);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&shm->mutex, &mattr);
pthread_mutexattr_destroy(&mattr);
// create processes
for (int i = 0; i < PROCESSES; ++i) {
ASSERT_NE(-1, (rc = fork()));
if (!rc) {
Worker();
_Exit(0);
}
}
// wait for processes to finish
for (;;) {
e = errno;
if ((pid = waitpid(0, &ws, 0)) != -1) {
if (WIFSIGNALED(ws)) {
kprintf("process %d terminated with %G\n", pid, WTERMSIG(ws));
testlib_incrementfailed();
} else if (WEXITSTATUS(ws)) {
kprintf("process %d exited with %d\n", pid, WEXITSTATUS(ws));
testlib_incrementfailed();
}
} else {
ASSERT_EQ(ECHILD, errno);
errno = e;
break;
}
}
EXPECT_EQ(PROCESSES * ITERATIONS, shm->x);
ASSERT_EQ(0, pthread_mutex_destroy(&shm->mutex));
ASSERT_SYS(0, 0, munmap(shm, FRAMESIZE));
}

View file

@ -0,0 +1,113 @@
-- Copyright 2022 Justine Alexandra Roberts Tunney
--
-- Permission to use, copy, modify, and/or distribute this software for
-- any purpose with or without fee is hereby granted, provided that the
-- above copyright notice and this permission notice appear in all copies.
--
-- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
-- WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
-- WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
-- AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
-- DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
-- PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
-- TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-- PERFORMANCE OF THIS SOFTWARE.
assert(unix.pledge("stdio proc"))
-- let's use futexes to implement a scalable mutex in lua
--
-- this example is designed to be the kind of thing that would be
-- extremely expensive if we were using spin locks, because we'll
-- have a lot of processes that wait a long time for something to
-- happen. futexes solve that by asking the kernel to help us out
-- and let the waiting processes sleep until the actual event.
--
-- here we have fifty processes, waiting one just one process, to
-- to finish a long-running task. if we used spin locks, then the
-- waiter procesess would eat up all the cpu. if you benchmark it
-- then be sure to note that WALL TIME will be the same, it's the
-- CPU USER TIME that gets pwnd.
--
-- uses 67 ms cpu time with futexes
-- uses 5,000 ms cpu time with spinlocks
millis = 300
waiters = 100
words = 2
mem = unix.mapshared(words * 8)
LOCK = 0 -- word index of our lock
RESULT = 1 -- word index of our result
-- From Futexes Are Tricky Version 1.1 § Mutex, Take 3;
-- Ulrich Drepper, Red Hat Incorporated, June 27, 2004.
function Lock()
local ok, old = mem:cmpxchg(LOCK, 0, 1)
if not ok then
if old == 1 then
old = mem:xchg(LOCK, 2)
end
while old > 0 do
mem:wait(LOCK, 2)
old = mem:xchg(LOCK, 2)
end
end
end
function Unlock()
local old = mem:add(LOCK, -1)
if old == 2 then
mem:store(LOCK, 0)
mem:wake(LOCK, 1)
end
end
-- -- try it out with spin locks instead
-- function Lock()
-- while mem:xchg(LOCK, 1) == 1 do
-- end
-- end
-- function Unlock()
-- mem:store(LOCK, 0)
-- end
function Worker()
assert(unix.nanosleep(math.floor(millis / 1000),
millis % 1000 * 1000 * 1000))
mem:store(RESULT, 123)
Unlock()
end
function Waiter()
Lock()
assert(mem:load(RESULT) == 123)
Unlock()
end
Lock()
for i = 1,waiters do
pid = assert(unix.fork())
if pid == 0 then
Waiter()
unix.exit(0)
end
end
Worker()
while true do
rc, ws = unix.wait(0)
if not rc then
assert(ws:errno() == unix.ECHILD)
break
end
if unix.WIFEXITED(ws) then
if unix.WEXITSTATUS(ws) ~= 0 then
print('process %d exited with %s' % {rc, unix.WEXITSTATUS(ws)})
unix.exit(1)
end
else
print('process %d terminated with %s' % {rc, unix.WTERMSIG(ws)})
unix.exit(1)
end
end

View file

@ -0,0 +1,85 @@
-- Copyright 2022 Justine Alexandra Roberts Tunney
--
-- Permission to use, copy, modify, and/or distribute this software for
-- any purpose with or without fee is hereby granted, provided that the
-- above copyright notice and this permission notice appear in all copies.
--
-- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
-- WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
-- WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
-- AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
-- DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
-- PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
-- TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-- PERFORMANCE OF THIS SOFTWARE.
assert(unix.pledge("stdio proc"))
words = 2
processes = 8
iterations = 10000
mem = unix.mapshared(words * 8)
--------------------------------------------------------------------------------
-- test shared memory string reading and writing
mem:write('hello')
assert(mem:read() == 'hello')
mem:write('hi')
assert(mem:read() == 'hi')
assert(mem:read(0, 5) == 'hi\0lo')
mem:write('H', 0, 1)
assert(mem:read(0, 5) == 'Hi\0lo')
assert(mem:read(1, 1) == 'i')
--------------------------------------------------------------------------------
-- test shared memory locking primitives
mem:store(0, 0)
assert(mem:xchg(0, 1) == 0)
assert(mem:xchg(0, 2) == 1)
mem:store(0, 0)
ok, old = mem:cmpxchg(0, 0, 1)
assert(ok and old == 0)
ok, old = mem:cmpxchg(0, 666, 777)
assert(not ok and old == 1)
assert(mem:add(0, 3) == 1)
assert(mem:load(0) == 4)
--------------------------------------------------------------------------------
-- test atomic addition across concurrent processes
function Worker()
for i = 1,iterations do
mem:add(0, 1)
end
end
mem:store(0, 0)
for i = 1,processes do
pid = assert(unix.fork())
if pid == 0 then
Worker()
unix.exit(0)
end
end
while true do
rc, ws = unix.wait(0)
if not rc then
assert(ws:errno() == unix.ECHILD)
break
end
if unix.WIFEXITED(ws) then
if unix.WEXITSTATUS(ws) ~= 0 then
print('process %d exited with %s' % {rc, unix.WEXITSTATUS(ws)})
unix.exit(1)
end
else
print('process %d terminated with %s' % {rc, unix.WTERMSIG(ws)})
unix.exit(1)
end
end
assert(mem:load(0) == processes * iterations)

View file

@ -0,0 +1,76 @@
-- Copyright 2022 Justine Alexandra Roberts Tunney
--
-- Permission to use, copy, modify, and/or distribute this software for
-- any purpose with or without fee is hereby granted, provided that the
-- above copyright notice and this permission notice appear in all copies.
--
-- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
-- WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
-- WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
-- AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
-- DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
-- PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
-- TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-- PERFORMANCE OF THIS SOFTWARE.
assert(unix.pledge("stdio proc"))
words = 2
processes = 8
iterations = 10000
mem = unix.mapshared(words * 8)
--------------------------------------------------------------------------------
-- let's use atomics to implement a spin lock in lua
LOCK = 0 -- word index of our lock
COUNTER = 1 -- word index of our number
function Lock()
while mem:xchg(LOCK, 1) == 1 do
end
end
function Unlock()
mem:store(LOCK, 0)
end
function Worker()
local x
for i = 1,iterations do
Lock()
x = mem:load(COUNTER)
x = x + 1
mem:store(COUNTER, x)
Unlock()
end
end
mem:store(LOCK, 0)
mem:store(COUNTER, 0)
for i = 1,processes do
pid = assert(unix.fork())
if pid == 0 then
Worker()
unix.exit(0)
end
end
while true do
rc, ws = unix.wait(0)
if not rc then
assert(ws:errno() == unix.ECHILD)
break
end
if unix.WIFEXITED(ws) then
if unix.WEXITSTATUS(ws) ~= 0 then
print('process %d exited with %s' % {rc, unix.WEXITSTATUS(ws)})
unix.exit(1)
end
else
print('process %d terminated with %s' % {rc, unix.WTERMSIG(ws)})
unix.exit(1)
end
end
assert(mem:load(COUNTER) == processes * iterations)

View file

@ -200,9 +200,11 @@ THIRD_PARTY_LUA_UNIX_DIRECTDEPS = \
LIBC_STR \
LIBC_STUBS \
LIBC_SYSV \
LIBC_THREAD \
LIBC_TIME \
LIBC_X \
THIRD_PARTY_LUA
THIRD_PARTY_LUA \
THIRD_PARTY_NSYNC
THIRD_PARTY_LUA_UNIX_DEPS := \
$(call uniq,$(foreach x,$(THIRD_PARTY_LUA_UNIX_DIRECTDEPS),$($(x))))

View file

@ -17,6 +17,7 @@
PERFORMANCE OF THIS SOFTWARE.
*/
#include "libc/assert.h"
#include "libc/atomic.h"
#include "libc/calls/calls.h"
#include "libc/calls/ioctl.h"
#include "libc/calls/makedev.h"
@ -36,22 +37,27 @@
#include "libc/calls/struct/timeval.h"
#include "libc/calls/struct/winsize.h"
#include "libc/calls/ucontext.h"
#include "libc/dce.h"
#include "libc/dns/dns.h"
#include "libc/errno.h"
#include "libc/fmt/conv.h"
#include "libc/fmt/fmt.h"
#include "libc/fmt/itoa.h"
#include "libc/fmt/magnumstrs.internal.h"
#include "libc/intrin/atomic.h"
#include "libc/intrin/bits.h"
#include "libc/intrin/strace.internal.h"
#include "libc/limits.h"
#include "libc/log/log.h"
#include "libc/macros.internal.h"
#include "libc/mem/fmt.h"
#include "libc/mem/mem.h"
#include "libc/nt/runtime.h"
#include "libc/nt/synchronization.h"
#include "libc/runtime/clktck.h"
#include "libc/runtime/memtrack.internal.h"
#include "libc/runtime/runtime.h"
#include "libc/runtime/sysconf.h"
#include "libc/sock/sock.h"
#include "libc/sock/struct/ifconf.h"
#include "libc/sock/struct/linger.h"
@ -72,11 +78,13 @@
#include "libc/sysv/consts/itimer.h"
#include "libc/sysv/consts/limits.h"
#include "libc/sysv/consts/log.h"
#include "libc/sysv/consts/map.h"
#include "libc/sysv/consts/msg.h"
#include "libc/sysv/consts/nr.h"
#include "libc/sysv/consts/o.h"
#include "libc/sysv/consts/ok.h"
#include "libc/sysv/consts/poll.h"
#include "libc/sysv/consts/prot.h"
#include "libc/sysv/consts/rlim.h"
#include "libc/sysv/consts/rlimit.h"
#include "libc/sysv/consts/rusage.h"
@ -93,6 +101,7 @@
#include "libc/sysv/consts/utime.h"
#include "libc/sysv/consts/w.h"
#include "libc/sysv/errfuns.h"
#include "libc/thread/thread.h"
#include "libc/time/struct/tm.h"
#include "libc/time/time.h"
#include "libc/x/x.h"
@ -102,6 +111,7 @@
#include "third_party/lua/lua.h"
#include "third_party/lua/luaconf.h"
#include "third_party/lua/lunix.h"
#include "third_party/nsync/futex.internal.h"
#include "tool/net/luacheck.h"
/**
@ -1855,7 +1865,7 @@ static int LuaUnixStrsignal(lua_State *L) {
// unix.WIFEXITED(wstatus)
// └─→ bool
static int LuaUnixWifexited(lua_State *L) {
return ReturnBoolean(L, WIFEXITED(luaL_checkinteger(L, 1)));
return ReturnBoolean(L, !!WIFEXITED(luaL_checkinteger(L, 1)));
}
// unix.WEXITSTATUS(wstatus)
@ -1867,7 +1877,7 @@ static int LuaUnixWexitstatus(lua_State *L) {
// unix.WIFSIGNALED(wstatus)
// └─→ bool
static int LuaUnixWifsignaled(lua_State *L) {
return ReturnBoolean(L, WIFSIGNALED(luaL_checkinteger(L, 1)));
return ReturnBoolean(L, !!WIFSIGNALED(luaL_checkinteger(L, 1)));
}
// unix.WTERMSIG(wstatus)
@ -1998,6 +2008,12 @@ static int LuaUnixTiocgwinsz(lua_State *L) {
}
}
// unix.sched_yield()
static int LuaUnixSchedYield(lua_State *L) {
sched_yield();
return 0;
}
////////////////////////////////////////////////////////////////////////////////
// unix.Stat object
@ -2636,6 +2652,302 @@ static void LuaUnixErrnoObj(lua_State *L) {
lua_pop(L, 1);
}
////////////////////////////////////////////////////////////////////////////////
// unix.Memory object
struct Memory {
union {
char *bytes;
atomic_long *words;
} u;
size_t size;
void *map;
size_t mapsize;
pthread_mutex_t *lock;
};
// unix.Memory:read([offset:int[, bytes:int]])
// └─→ str
static int LuaUnixMemoryRead(lua_State *L) {
size_t i, n;
struct Memory *m;
m = luaL_checkudata(L, 1, "unix.Memory");
i = luaL_optinteger(L, 2, 0);
if (lua_isnoneornil(L, 3)) {
// unix.Memory:read([offset:int])
// extracts nul-terminated string
if (i > m->size) {
luaL_error(L, "out of range");
unreachable;
}
n = strnlen(m->u.bytes + i, m->size - i);
} else {
// unix.Memory:read(offset:int, bytes:int)
// read binary data with boundary checking
n = luaL_checkinteger(L, 3);
if (i > m->size || n >= m->size || i + n > m->size) {
luaL_error(L, "out of range");
unreachable;
}
}
pthread_mutex_lock(m->lock);
lua_pushlstring(L, m->u.bytes + i, n);
pthread_mutex_unlock(m->lock);
return 1;
}
// unix.Memory:write(data:str[, offset:int[, bytes:int]])
static int LuaUnixMemoryWrite(lua_State *L) {
const char *s;
size_t i, n, j;
struct Memory *m;
m = luaL_checkudata(L, 1, "unix.Memory");
s = luaL_checklstring(L, 2, &n);
i = luaL_optinteger(L, 3, 0);
if (i > m->size) {
luaL_error(L, "out of range");
unreachable;
}
if (lua_isnoneornil(L, 4)) {
// unix.Memory:write(data:str[, offset:int])
// writes binary data, plus a nul terminator
if (i < n < m->size) {
// include lua string's implicit nul so this round trips with
// unix.Memory:read(offset:int) even when we're overwriting a
// larger string that was previously inserted
n += 1;
} else {
// there's no room to include the implicit nul-terminator so
// leave it out which is safe b/c Memory:read() uses strnlen
}
} else {
// unix.Memory:write(data:str, offset:int, bytes:int])
// writes binary data without including nul-terminator
j = luaL_checkinteger(L, 4);
if (j > n) {
luaL_argerror(L, 4, "bytes is more than what's in data");
unreachable;
}
n = j;
}
if (i + n > m->size) {
luaL_error(L, "out of range");
unreachable;
}
pthread_mutex_lock(m->lock);
memcpy(m->u.bytes + i, s, n);
pthread_mutex_unlock(m->lock);
return 0;
}
static atomic_long *GetWord(lua_State *L) {
size_t i;
struct Memory *m;
m = luaL_checkudata(L, 1, "unix.Memory");
i = luaL_checkinteger(L, 2);
if (i >= m->size / sizeof(*m->u.words)) {
luaL_error(L, "out of range");
unreachable;
}
return m->u.words + i;
}
// unix.Memory:load(word_index:int)
// └─→ int
static int LuaUnixMemoryLoad(lua_State *L) {
lua_pushinteger(L, atomic_load_explicit(GetWord(L), memory_order_relaxed));
return 1;
}
// unix.Memory:store(word_index:int, value:int)
static int LuaUnixMemoryStore(lua_State *L) {
atomic_store_explicit(GetWord(L), luaL_checkinteger(L, 3),
memory_order_relaxed);
return 0;
}
// unix.Memory:xchg(word_index:int, value:int)
// └─→ int
static int LuaUnixMemoryXchg(lua_State *L) {
lua_pushinteger(L, atomic_exchange(GetWord(L), luaL_checkinteger(L, 3)));
return 1;
}
// unix.Memory:cmpxchg(word_index:int, old:int, new:int)
// └─→ success:bool, old:int
static int LuaUnixMemoryCmpxchg(lua_State *L) {
long old = luaL_checkinteger(L, 3);
lua_pushboolean(L, atomic_compare_exchange_strong(GetWord(L), &old,
luaL_checkinteger(L, 4)));
lua_pushinteger(L, old);
return 2;
}
// unix.Memory:add(word_index:int, value:int)
// └─→ old:int
static int LuaUnixMemoryAdd(lua_State *L) {
lua_pushinteger(L, atomic_fetch_add(GetWord(L), luaL_checkinteger(L, 3)));
return 1;
}
// unix.Memory:and(word_index:int, value:int)
// └─→ old:int
static int LuaUnixMemoryAnd(lua_State *L) {
lua_pushinteger(L, atomic_fetch_and(GetWord(L), luaL_checkinteger(L, 3)));
return 1;
}
// unix.Memory:or(word_index:int, value:int)
// └─→ old:int
static int LuaUnixMemoryOr(lua_State *L) {
lua_pushinteger(L, atomic_fetch_or(GetWord(L), luaL_checkinteger(L, 3)));
return 1;
}
// unix.Memory:xor(word_index:int, value:int)
// └─→ old:int
static int LuaUnixMemoryXor(lua_State *L) {
lua_pushinteger(L, atomic_fetch_xor(GetWord(L), luaL_checkinteger(L, 3)));
return 1;
}
// unix.Memory:wait(word_index:int, expect:int[, abs_deadline:int[, nanos:int]])
// ├─→ 0
// ├─→ nil, unix.Errno(unix.EINTR)
// ├─→ nil, unix.Errno(unix.EAGAIN)
// └─→ nil, unix.Errno(unix.ETIMEDOUT)
static int LuaUnixMemoryWait(lua_State *L) {
lua_Integer expect;
int rc, olderr = errno;
struct timespec ts, now, *deadline;
expect = luaL_checkinteger(L, 3);
if (!(INT32_MIN <= expect && expect <= INT32_MAX)) {
luaL_argerror(L, 3, "must be an int32_t");
unreachable;
}
if (lua_isnoneornil(L, 4)) {
deadline = 0; // wait forever
} else {
ts.tv_sec = luaL_checkinteger(L, 4);
ts.tv_nsec = luaL_optinteger(L, 5, 0);
if (!FUTEX_TIMEOUT_IS_ABSOLUTE) {
now = _timespec_real();
if (_timespec_gt(now, ts)) {
ts = (struct timespec){0};
} else {
ts = _timespec_sub(ts, now);
}
}
deadline = &ts;
}
rc = nsync_futex_wait_((int *)GetWord(L), expect, PTHREAD_PROCESS_SHARED,
deadline);
if (rc < 0) errno = -rc, rc = -1;
return SysretInteger(L, "futex_wait", olderr, rc);
}
// unix.Memory:wake(index:int[, count:int])
// └─→ woken:int
static int LuaUnixMemoryWake(lua_State *L) {
int count, woken;
count = luaL_optinteger(L, 3, INT_MAX);
woken = nsync_futex_wake_((int *)GetWord(L), count, PTHREAD_PROCESS_SHARED);
_npassert(woken >= 0);
return ReturnInteger(L, woken);
}
static int LuaUnixMemoryTostring(lua_State *L) {
char s[128];
struct Memory *m;
m = luaL_checkudata(L, 1, "unix.Memory");
snprintf(s, sizeof(s), "unix.Memory(%zu)", m->size);
lua_pushstring(L, s);
return 1;
}
static int LuaUnixMemoryGc(lua_State *L) {
struct Memory *m;
m = luaL_checkudata(L, 1, "unix.Memory");
if (m->u.bytes) {
_npassert(!munmap(m->map, m->mapsize));
m->u.bytes = 0;
}
return 0;
}
static const luaL_Reg kLuaUnixMemoryMeth[] = {
{"read", LuaUnixMemoryRead}, //
{"write", LuaUnixMemoryWrite}, //
{"load", LuaUnixMemoryLoad}, //
{"store", LuaUnixMemoryStore}, //
{"xchg", LuaUnixMemoryXchg}, //
{"cmpxchg", LuaUnixMemoryCmpxchg}, //
{"add", LuaUnixMemoryAdd}, //
{"and", LuaUnixMemoryAnd}, //
{"or", LuaUnixMemoryOr}, //
{"xor", LuaUnixMemoryXor}, //
{"wait", LuaUnixMemoryWait}, //
{"wake", LuaUnixMemoryWake}, //
{0}, //
};
static const luaL_Reg kLuaUnixMemoryMeta[] = {
{"__tostring", LuaUnixMemoryTostring}, //
{"__repr", LuaUnixMemoryTostring}, //
{"__gc", LuaUnixMemoryGc}, //
{0}, //
};
static void LuaUnixMemoryObj(lua_State *L) {
luaL_newmetatable(L, "unix.Memory");
luaL_setfuncs(L, kLuaUnixMemoryMeta, 0);
luaL_newlibtable(L, kLuaUnixMemoryMeth);
luaL_setfuncs(L, kLuaUnixMemoryMeth, 0);
lua_setfield(L, -2, "__index");
lua_pop(L, 1);
}
static int LuaUnixMapshared(lua_State *L) {
char *p;
size_t n, c, g;
struct Memory *m;
pthread_mutexattr_t mattr;
n = luaL_checkinteger(L, 1);
if (!n) {
luaL_error(L, "can't map empty region");
unreachable;
}
if (n % sizeof(long)) {
luaL_error(L, "size must be multiple of word size");
unreachable;
}
if (!IsLegalSize(n)) {
luaL_error(L, "map size too big");
unreachable;
}
c = n;
c += sizeof(*m->lock);
g = sysconf(_SC_PAGESIZE);
c = ROUNDUP(c, g);
if (!(p = _mapshared(c))) {
luaL_error(L, "out of memory");
unreachable;
}
m = lua_newuserdatauv(L, sizeof(*m), 1);
luaL_setmetatable(L, "unix.Memory");
m->u.bytes = p + sizeof(*m->lock);
m->size = n;
m->map = p;
m->mapsize = c;
m->lock = (pthread_mutex_t *)p;
pthread_mutexattr_init(&mattr);
pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_NORMAL);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(m->lock, &mattr);
pthread_mutexattr_destroy(&mattr);
return 1;
}
////////////////////////////////////////////////////////////////////////////////
// unix.Sigset object
@ -2961,6 +3273,7 @@ static const luaL_Reg kLuaUnix[] = {
{"lseek", LuaUnixLseek}, // seek in file
{"major", LuaUnixMajor}, // extract device info
{"makedirs", LuaUnixMakedirs}, // make directory and parents too
{"mapshared", LuaUnixMapshared}, // mmap(MAP_SHARED) w/ mutex+atomics
{"minor", LuaUnixMinor}, // extract device info
{"mkdir", LuaUnixMkdir}, // make directory
{"nanosleep", LuaUnixNanosleep}, // sleep w/ nano precision
@ -2978,6 +3291,7 @@ static const luaL_Reg kLuaUnix[] = {
{"rename", LuaUnixRename}, // rename file or directory
{"rmdir", LuaUnixRmdir}, // remove empty directory
{"rmrf", LuaUnixRmrf}, // remove file recursively
{"sched_yield", LuaUnixSchedYield}, // relinquish scheduled quantum
{"send", LuaUnixSend}, // send tcp to some address
{"sendto", LuaUnixSendto}, // send udp to some address
{"setfsgid", LuaUnixSetfsgid}, // set/get group id for fs ops
@ -2994,8 +3308,8 @@ static const luaL_Reg kLuaUnix[] = {
{"setuid", LuaUnixSetuid}, // set real user id of process
{"shutdown", LuaUnixShutdown}, // make socket half empty or full
{"sigaction", LuaUnixSigaction}, // install signal handler
{"sigprocmask", LuaUnixSigprocmask}, // change signal mask
{"sigpending", LuaUnixSigpending}, // get pending signals
{"sigprocmask", LuaUnixSigprocmask}, // change signal mask
{"sigsuspend", LuaUnixSigsuspend}, // wait for signal
{"siocgifconf", LuaUnixSiocgifconf}, // get list of network interfaces
{"socket", LuaUnixSocket}, // create network communication fd
@ -3034,6 +3348,7 @@ int LuaUnix(lua_State *L) {
LuaUnixSigsetObj(L);
LuaUnixRusageObj(L);
LuaUnixStatfsObj(L);
LuaUnixMemoryObj(L);
LuaUnixErrnoObj(L);
LuaUnixStatObj(L);
LuaUnixDirObj(L);

View file

@ -1,6 +1,7 @@
#ifndef NSYNC_ATOMIC_INTERNAL_H_
#define NSYNC_ATOMIC_INTERNAL_H_
#include "libc/intrin/atomic.h"
#include "libc/intrin/cmpxchg.h"
#include "third_party/nsync/atomic.h"
#if !(__ASSEMBLER__ + __LINKER__ + 0)
COSMOPOLITAN_C_START_

View file

@ -106,7 +106,7 @@ int nsync_futex_wait_ (int *p, int expect, char pshare, struct timespec *timeout
uint32_t ms;
int rc, op, fop;
if (!FUTEX_IS_SUPPORTED) {
if (!FUTEX_IS_SUPPORTED || (IsWindows() && pshare)) {
nsync_yield_ ();
if (timeout) {
return -EINTR;
@ -166,12 +166,12 @@ int nsync_futex_wait_ (int *p, int expect, char pshare, struct timespec *timeout
}
int nsync_futex_wake_ (int *p, int count, char pshare) {
int rc, op, fop;
int e, rc, op, fop;
int wake (void *, int, int) asm ("_futex");
ASSERT (count == 1 || count == INT_MAX);
if (!FUTEX_IS_SUPPORTED) {
if (!FUTEX_IS_SUPPORTED || (IsWindows() && pshare)) {
nsync_yield_ ();
return 0;
}

View file

@ -21,6 +21,7 @@
#include "libc/intrin/extend.internal.h"
#include "libc/macros.internal.h"
#include "libc/runtime/memtrack.internal.h"
#include "libc/sysv/consts/map.h"
#include "third_party/nsync/common.internal.h"
#include "third_party/nsync/malloc.internal.h"
// clang-format off
@ -45,7 +46,7 @@ void *nsync_malloc_ (size_t size) {
if (!nsync_malloc_endptr_) nsync_malloc_endptr_ = start;
nsync_malloc_endptr_ =
_extend (start, nsync_malloc_total_, nsync_malloc_endptr_,
kMemtrackNsyncStart + kMemtrackNsyncSize);
MAP_PRIVATE, kMemtrackNsyncStart + kMemtrackNsyncSize);
atomic_store_explicit (&nsync_malloc_lock_, 0, memory_order_relaxed);
return start + offset;
}

View file

@ -5,7 +5,7 @@
COSMOPOLITAN_C_START_
typedef struct nsync_semaphore_s_ {
void *sem_space[32]; /* space used by implementation */
void *sem_space[1]; /* [jart] reduced to 8 bytes */
} nsync_semaphore;
/* Initialize *s; the initial value is 0. */

View file

@ -4467,6 +4467,255 @@ UNIX MODULE
should have better performance, because `kNtFileAttributeTemporary`
asks the kernel to more aggressively cache and reduce i/o ops.
unix.sched_yield()
Relinquishes scheduled quantum.
unix.mapshared(size:int)
└─→ unix.Memory()
Creates interprocess shared memory mapping.
This function allocates special memory that'll be inherited across
fork in a shared way. By default all memory in Redbean is "private"
memory that's only viewable and editable to the process that owns
it. When unix.fork() happens, memory is copied appropriately so
that changes to memory made in the child process, don't clobber
the memory at those same addresses in the parent process. If you
don't want that to happen, and you want the memory to be shared
similar to how it would be shared if you were using threads, then
you can use this function to achieve just that.
The memory object this function returns may be accessed using its
methods, which support atomics and futexes. It's very low-level.
For example, you can use it to implement scalable mutexes:
mem = unix.mapshared(8000 * 8)
LOCK = 0 -- pick an arbitrary word index for lock
-- From Futexes Are Tricky Version 1.1 § Mutex, Take 3;
-- Ulrich Drepper, Red Hat Incorporated, June 27, 2004.
function Lock()
local ok, old = mem:cmpxchg(LOCK, 0, 1)
if not ok then
if old == 1 then
old = mem:xchg(LOCK, 2)
end
while old > 0 do
mem:wait(LOCK, 2)
old = mem:xchg(LOCK, 2)
end
end
end
function Unlock()
old = mem:add(LOCK, -1)
if old == 2 then
mem:store(LOCK, 0)
mem:wake(LOCK, 1)
end
end
It's possible to accomplish the same thing as unix.mapshared()
using files and unix.fcntl() advisory locks. However this goes
significantly faster. For example, that's what SQLite does and
we recommend using SQLite for IPC in redbean. But, if your app
has thousands of forked processes fighting for a file lock you
might need something lower level than file locks, to implement
things like throttling. Shared memory is a good way to do that
since there's nothing that's faster.
The `size` parameter needs to be a multiple of 8. The returned
memory is zero initialized. When allocating shared memory, you
should try to get as much use out of it as possible, since the
overhead of allocating a single shared mapping is 500 words of
resident memory and 8000 words of virtual memory. It's because
the Cosmopolitan Libc mmap() granularity is 2**16.
This system call does not fail. An exception is instead thrown
if sufficient memory isn't available.
────────────────────────────────────────────────────────────────────────────────
UNIX MEMORY OBJECT
unix.Memory encapsulates memory that's shared across fork() and
this module provides the fundamental synchronization primitives
Redbean memory maps may be used in two ways:
1. as an array of bytes a.k.a. a string
2. as an array of words a.k.a. integers
They're aliased, union, or overlapped views of the same memory.
For example if you write a string to your memory region, you'll
be able to read it back as an integer.
Reads, writes, and word operations will throw an exception if a
memory boundary error or overflow occurs.
unix.Memory:read([offset:int[, bytes:int]])
└─→ str
`offset` is the starting byte index from which memory is copied,
which defaults to zero.
If `bytes` is none or nil, then the nul-terminated string at
`offset` is returned. You may specify `bytes` to safely read
binary data.
This operation happens atomically. Each shared mapping has a
single lock which is used to synchronize reads and writes to
that specific map. To make it scale, create additional maps.
unix.Memory:write(data:str[, offset:int[, bytes:int]])
Writes bytes to memory region.
`offset` is the starting byte index to which memory is copied,
which defaults to zero.
If `bytes` is none or nil, then an implicit nil-terminator
will be included after your `data` so things like json can
be easily serialized to shared memory.
This operation happens atomically. Each shared mapping has a
single lock which is used to synchronize reads and writes to
that specific map. To make it scale, create additional maps.
unix.Memory:load(word_index:int)
└─→ int
Loads word from memory region.
This operation is atomic and has relaxed barrier semantics.
unix.Memory:store(word_index:int, value:int)
Stores word from memory region.
This operation is atomic and has relaxed barrier semantics.
unix.Memory:xchg(word_index:int, value:int)
└─→ int
Exchanges value.
This sets word at `word_index` to `value` and returns the value
previously held in by the word.
This operation is atomic and provides the same memory barrier
semantics as the aligned x86 LOCK XCHG instruction.
unix.Memory:cmpxchg(word_index:int, old:int, new:int)
└─→ success:bool, old:int
Compares and exchanges value.
This inspects the word at `word_index` and if its value is the same
as `old` then it'll be replaced by the value `new`, in which case
`true, old` shall be returned. If a different value was held at
word, then `false` shall be returned along with the word.
This operation happens atomically and provides the same memory
barrier semantics as the aligned x86 LOCK CMPXCHG instruction.
unix.Memory:add(word_index:int, value:int)
└─→ old:int
Fetches then adds value.
This method modifies the word at `word_index` to contain the sum of
value and the `value` paremeter. This method then returns the value
as it existed before the addition was performed.
This operation is atomic and provides the same memory barrier
semantics as the aligned x86 LOCK XADD instruction.
unix.Memory:and(word_index:int, value:int)
└─→ int
Fetches and bitwise ands value.
This operation happens atomically and provides the same memory
barrier ordering semantics as its x86 implementation.
unix.Memory:or(word_index:int, value:int)
└─→ int
Fetches and bitwise ors value.
This operation happens atomically and provides the same memory
barrier ordering semantics as its x86 implementation.
unix.Memory:xor(word_index:int, value:int)
└─→ int
Fetches and bitwise xors value.
This operation happens atomically and provides the same memory
barrier ordering semantics as its x86 implementation.
unix.Memory:wait(word_index:int, expect:int[, abs_deadline:int[, nanos:int]])
├─→ 0
├─→ nil, unix.Errno(unix.EINTR)
├─→ nil, unix.Errno(unix.EAGAIN)
└─→ nil, unix.Errno(unix.ETIMEDOUT)
Waits for word to have a different value.
This method asks the kernel to suspend the process until either the
absolute deadline expires or we're woken up by another process that
calls unix.Memory:wake().
The `expect` parameter is used only upon entry to synchronize the
transition to kernelspace. The kernel doesn't actually poll the
memory location. It uses `expect` to make sure the process doesn't
get added to the wait list unless it's sure that it needs to wait,
since the kernel can only control the ordering of wait / wake calls
across processes.
The default behavior is to wait until the heat death of the universe
if necessary. You may alternatively specify an absolute deadline. If
it's less than or equal to the value returned by clock_gettime, then
this routine is non-blocking. Otherwise we'll block at most until
the current time reaches the absolute deadline.
Futexes are currently supported on Linux, FreeBSD, OpenBSD. On other
platforms this method calls sched_yield() and will either (1) return
unix.EINTR if a deadline is specified, otherwise (2) 0 is returned.
This means futexes will *work* on Windows, Mac, and NetBSD but they
won't be scalable in terms of CPU usage when many processes wait on
one process that holds a lock for a long time. In the future we may
polyfill futexes in userspace for these platforms to improve things
for folks who've adopted this api. If lock scalability is something
you need on Windows and MacOS today, then consider fcntl() which is
well-supported on all supported platforms but requires using files.
Please test your use case though, because it's kind of an edge case
to have the scenario above, and chances are this op will work fine.
`EINTR` if a signal is delivered while waiting on deadline. Callers
should use futexes inside a loop that is able to cope with spurious
wakeups. We don't actually guarantee the value at word has in fact
changed when this returns.
`EAGAIN` is raised if, upon entry, the word at `word_index` had a
different value than what's specified at `expect`.
`ETIMEDOUT` is raised when the absolute deadline expires.
unix.Memory:wake(index:int[, count:int])
└─→ woken:int
Wakes other processes waiting on word.
This method may be used to signal or broadcast to waiters. The
`count` specifies the number of processes that should be woken,
which defaults to `INT_MAX`.
The return value is the number of processes that were actually woken
as a result of the system call. No failure conditions are defined.
────────────────────────────────────────────────────────────────────────────────
UNIX DIR OBJECT