mirror of
https://github.com/jart/cosmopolitan.git
synced 2025-05-19 20:10:08 +00:00
Redbean stream yield implementation (#370)
* Simplify handling of coroutine state * Update redbean to allow yielding from Lua to support streaming * Add stack checks for Lua resume calls in redbean
This commit is contained in:
parent
e5314dedde
commit
87029ac3f9
1 changed files with 72 additions and 38 deletions
|
@ -385,6 +385,7 @@ static int gmtoff;
|
||||||
static int client;
|
static int client;
|
||||||
static int changeuid;
|
static int changeuid;
|
||||||
static int changegid;
|
static int changegid;
|
||||||
|
static int isyielding;
|
||||||
static int statuscode;
|
static int statuscode;
|
||||||
static int sslpskindex;
|
static int sslpskindex;
|
||||||
static int oldloglevel;
|
static int oldloglevel;
|
||||||
|
@ -395,7 +396,7 @@ static uint32_t clientaddrsize;
|
||||||
|
|
||||||
static size_t zsize;
|
static size_t zsize;
|
||||||
static char *outbuf;
|
static char *outbuf;
|
||||||
static lua_State *GL;
|
static lua_State *GL, *YL;
|
||||||
static char *content;
|
static char *content;
|
||||||
static uint8_t *zmap;
|
static uint8_t *zmap;
|
||||||
static uint8_t *zbase;
|
static uint8_t *zbase;
|
||||||
|
@ -1057,21 +1058,28 @@ static nodiscard char *LuaFormatStack(lua_State *L) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// calling convention for lua stack of L is:
|
// calling convention for lua stack of L is:
|
||||||
// -3 is lua_newthread() called by caller
|
|
||||||
// -2 is function
|
// -2 is function
|
||||||
// -1 is is argument (assuming nargs == 1)
|
// -1 is is argument (assuming nargs == 1)
|
||||||
// L will have this after the call
|
// L will have this after the call
|
||||||
// -2 is lua_newthread() called by caller
|
// -1 is error or result (assuming nres == 1)
|
||||||
// -1 is result (assuming nres == 1)
|
|
||||||
// @param L is main Lua interpreter
|
// @param L is main Lua interpreter
|
||||||
// @param C should be the result of lua_newthread()
|
|
||||||
// @note this needs to be reentrant
|
// @note this needs to be reentrant
|
||||||
static int LuaCallWithTrace(lua_State *L, lua_State *C, int nargs, int nres) {
|
static int LuaCallWithTrace(lua_State *L, int nargs, int nres, lua_State *C) {
|
||||||
int nresults, status;
|
int nresults, status;
|
||||||
|
bool canyield = !!C; // allow yield if coroutine is provided
|
||||||
|
if (!C) C = lua_newthread(L); // create a new coroutine if not passed
|
||||||
|
// move coroutine to the bottom of the stack (including one that is passed)
|
||||||
|
lua_insert(L, 1);
|
||||||
|
// make sure that there is enough stack space
|
||||||
|
if (!lua_checkstack(C, 1 + nargs)) {
|
||||||
|
lua_pushliteral(L, "too many arguments to resume");
|
||||||
|
return LUA_ERRRUN; /* error flag */
|
||||||
|
}
|
||||||
// move the function (and arguments) to the top of the coro stack
|
// move the function (and arguments) to the top of the coro stack
|
||||||
lua_xmove(L, C, 1 + nargs);
|
lua_xmove(L, C, 1 + nargs);
|
||||||
// resume the coroutine thus executing the function
|
// resume the coroutine thus executing the function
|
||||||
status = lua_resume(C, L, nargs, &nresults);
|
status = lua_resume(C, L, nargs, &nresults);
|
||||||
|
lua_remove(L, 1); // remove coroutine (still) at the bottom
|
||||||
if (status != LUA_OK && status != LUA_YIELD) {
|
if (status != LUA_OK && status != LUA_YIELD) {
|
||||||
// move the error message
|
// move the error message
|
||||||
lua_xmove(C, L, 1);
|
lua_xmove(C, L, 1);
|
||||||
|
@ -1079,15 +1087,17 @@ static int LuaCallWithTrace(lua_State *L, lua_State *C, int nargs, int nres) {
|
||||||
luaL_traceback(L, C, lua_tostring(L, -1), 0);
|
luaL_traceback(L, C, lua_tostring(L, -1), 0);
|
||||||
lua_remove(L, -2); // remove the error message
|
lua_remove(L, -2); // remove the error message
|
||||||
} else {
|
} else {
|
||||||
// move results to the main stack
|
if (!lua_checkstack(L, MAX(nresults, nres))) {
|
||||||
lua_xmove(C, L, nresults);
|
lua_pop(C, nresults); /* remove results anyway */
|
||||||
// make sure the stack has enough space to grow
|
lua_pushliteral(L, "too many results to resume");
|
||||||
luaL_checkstack(L, nres - nresults, NULL);
|
return LUA_ERRRUN; /* error flag */
|
||||||
|
}
|
||||||
|
lua_xmove(C, L, nresults); // move results to the main stack
|
||||||
// grow the stack in case returned fewer results
|
// grow the stack in case returned fewer results
|
||||||
// than the caller expects, as lua_resume
|
// than the caller expects, as lua_resume
|
||||||
// doesn't adjust the stack for needed results
|
// doesn't adjust the stack for needed results
|
||||||
for (; nresults < nres; nresults++) lua_pushnil(L);
|
for (; nresults < nres; nresults++) lua_pushnil(L);
|
||||||
status = LUA_OK; // treat LUA_YIELD the same as LUA_OK
|
if (!canyield) status = LUA_OK; // treat LUA_YIELD the same as LUA_OK
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
@ -1098,14 +1108,12 @@ static void LogLuaError(char *hook, char *err) {
|
||||||
|
|
||||||
static bool LuaRunCode(const char *code) {
|
static bool LuaRunCode(const char *code) {
|
||||||
lua_State *L = GL;
|
lua_State *L = GL;
|
||||||
lua_State *C = lua_newthread(L);
|
|
||||||
int status = luaL_loadstring(L, code);
|
int status = luaL_loadstring(L, code);
|
||||||
if (status != LUA_OK || LuaCallWithTrace(L, C, 0, 0) != LUA_OK) {
|
if (status != LUA_OK || LuaCallWithTrace(L, 0, 0, NULL) != LUA_OK) {
|
||||||
LogLuaError("lua code", lua_tostring(L, -1));
|
LogLuaError("lua code", lua_tostring(L, -1));
|
||||||
lua_pop(L, 2); // pop error and thread
|
lua_pop(L, 1); // pop error
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
lua_pop(L, 1); // pop thread
|
|
||||||
AssertLuaStackIsEmpty(L);
|
AssertLuaStackIsEmpty(L);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1115,7 +1123,6 @@ static bool LuaOnClientConnection(void) {
|
||||||
uint32_t ip, serverip;
|
uint32_t ip, serverip;
|
||||||
uint16_t port, serverport;
|
uint16_t port, serverport;
|
||||||
lua_State *L = GL;
|
lua_State *L = GL;
|
||||||
lua_State *C = lua_newthread(L);
|
|
||||||
lua_getglobal(L, "OnClientConnection");
|
lua_getglobal(L, "OnClientConnection");
|
||||||
GetClientAddr(&ip, &port);
|
GetClientAddr(&ip, &port);
|
||||||
GetServerAddr(&serverip, &serverport);
|
GetServerAddr(&serverip, &serverport);
|
||||||
|
@ -1123,14 +1130,13 @@ static bool LuaOnClientConnection(void) {
|
||||||
lua_pushinteger(L, port);
|
lua_pushinteger(L, port);
|
||||||
lua_pushinteger(L, serverip);
|
lua_pushinteger(L, serverip);
|
||||||
lua_pushinteger(L, serverport);
|
lua_pushinteger(L, serverport);
|
||||||
if (LuaCallWithTrace(L, C, 4, 1) == LUA_OK) {
|
if (LuaCallWithTrace(L, 4, 1, NULL) == LUA_OK) {
|
||||||
dropit = lua_toboolean(L, -1);
|
dropit = lua_toboolean(L, -1);
|
||||||
} else {
|
} else {
|
||||||
LogLuaError("OnClientConnection", lua_tostring(L, -1));
|
LogLuaError("OnClientConnection", lua_tostring(L, -1));
|
||||||
lua_pop(L, 1); // pop error
|
lua_pop(L, 1); // pop error
|
||||||
dropit = false;
|
dropit = false;
|
||||||
}
|
}
|
||||||
lua_pop(L, 1); // pop thread
|
|
||||||
AssertLuaStackIsEmpty(L);
|
AssertLuaStackIsEmpty(L);
|
||||||
return dropit;
|
return dropit;
|
||||||
}
|
}
|
||||||
|
@ -1139,7 +1145,6 @@ static void LuaOnProcessCreate(int pid) {
|
||||||
uint32_t ip, serverip;
|
uint32_t ip, serverip;
|
||||||
uint16_t port, serverport;
|
uint16_t port, serverport;
|
||||||
lua_State *L = GL;
|
lua_State *L = GL;
|
||||||
lua_State *C = lua_newthread(L);
|
|
||||||
lua_getglobal(L, "OnProcessCreate");
|
lua_getglobal(L, "OnProcessCreate");
|
||||||
GetClientAddr(&ip, &port);
|
GetClientAddr(&ip, &port);
|
||||||
GetServerAddr(&serverip, &serverport);
|
GetServerAddr(&serverip, &serverport);
|
||||||
|
@ -1148,24 +1153,21 @@ static void LuaOnProcessCreate(int pid) {
|
||||||
lua_pushinteger(L, port);
|
lua_pushinteger(L, port);
|
||||||
lua_pushinteger(L, serverip);
|
lua_pushinteger(L, serverip);
|
||||||
lua_pushinteger(L, serverport);
|
lua_pushinteger(L, serverport);
|
||||||
if (LuaCallWithTrace(L, C, 5, 0) != LUA_OK) {
|
if (LuaCallWithTrace(L, 5, 0, NULL) != LUA_OK) {
|
||||||
LogLuaError("OnProcessCreate", lua_tostring(L, -1));
|
LogLuaError("OnProcessCreate", lua_tostring(L, -1));
|
||||||
lua_pop(L, 1); // pop error
|
lua_pop(L, 1); // pop error
|
||||||
}
|
}
|
||||||
lua_pop(L, 1); // pop thread
|
|
||||||
AssertLuaStackIsEmpty(L);
|
AssertLuaStackIsEmpty(L);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void LuaOnProcessDestroy(int pid) {
|
static void LuaOnProcessDestroy(int pid) {
|
||||||
lua_State *L = GL;
|
lua_State *L = GL;
|
||||||
lua_State *C = lua_newthread(L);
|
|
||||||
lua_getglobal(L, "OnProcessDestroy");
|
lua_getglobal(L, "OnProcessDestroy");
|
||||||
lua_pushinteger(L, pid);
|
lua_pushinteger(L, pid);
|
||||||
if (LuaCallWithTrace(L, C, 1, 0) != LUA_OK) {
|
if (LuaCallWithTrace(L, 1, 0, NULL) != LUA_OK) {
|
||||||
LogLuaError("OnProcessDestroy", lua_tostring(L, -1));
|
LogLuaError("OnProcessDestroy", lua_tostring(L, -1));
|
||||||
lua_pop(L, 1); // pop error
|
lua_pop(L, 1); // pop error
|
||||||
}
|
}
|
||||||
lua_pop(L, 1); // pop thread
|
|
||||||
AssertLuaStackIsEmpty(L);
|
AssertLuaStackIsEmpty(L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1182,13 +1184,11 @@ static inline bool IsHookDefined(const char *s) {
|
||||||
|
|
||||||
static void CallSimpleHook(const char *s) {
|
static void CallSimpleHook(const char *s) {
|
||||||
lua_State *L = GL;
|
lua_State *L = GL;
|
||||||
lua_State *C = lua_newthread(L);
|
|
||||||
lua_getglobal(L, s);
|
lua_getglobal(L, s);
|
||||||
if (LuaCallWithTrace(L, C, 0, 0) != LUA_OK) {
|
if (LuaCallWithTrace(L, 0, 0, NULL) != LUA_OK) {
|
||||||
LogLuaError(s, lua_tostring(L, -1));
|
LogLuaError(s, lua_tostring(L, -1));
|
||||||
lua_pop(L, 1); // pop error
|
lua_pop(L, 1); // pop error
|
||||||
}
|
}
|
||||||
lua_pop(L, 1); // pop thread
|
|
||||||
AssertLuaStackIsEmpty(L);
|
AssertLuaStackIsEmpty(L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2413,6 +2413,45 @@ static char *ServeFailure(unsigned code, const char *reason) {
|
||||||
return ServeErrorImpl(code, reason, NULL);
|
return ServeErrorImpl(code, reason, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ssize_t YieldGenerator(struct iovec v[3]) {
|
||||||
|
int nresults, status;
|
||||||
|
if (isyielding > 1) {
|
||||||
|
do {
|
||||||
|
if (!YL || lua_status(YL) != LUA_YIELD) return 0; // done yielding
|
||||||
|
contentlength = 0;
|
||||||
|
status = lua_resume(YL, NULL, 0, &nresults);
|
||||||
|
if (status != LUA_OK && status != LUA_YIELD) {
|
||||||
|
LogLuaError("resume", lua_tostring(YL, -1));
|
||||||
|
lua_pop(YL, 1);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
lua_pop(YL, nresults);
|
||||||
|
if (!contentlength) UseOutput();
|
||||||
|
// continue yielding if nothing to return to keep generator running
|
||||||
|
} while (!contentlength);
|
||||||
|
}
|
||||||
|
DEBUGF("(lua) yielded with %ld bytes generated", contentlength);
|
||||||
|
isyielding++;
|
||||||
|
v[0].iov_base = content;
|
||||||
|
v[0].iov_len = contentlength;
|
||||||
|
return contentlength;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int LuaCallWithYield(lua_State *L) {
|
||||||
|
int status;
|
||||||
|
// since yield may happen in OnHttpRequest and in ServeLua,
|
||||||
|
// need to fully restart the yield generator;
|
||||||
|
// the second set of headers is not going to be sent
|
||||||
|
lua_State *co = lua_newthread(L);
|
||||||
|
if ((status = LuaCallWithTrace(L, 0, 0, co)) == LUA_YIELD) {
|
||||||
|
YL = co;
|
||||||
|
generator = YieldGenerator;
|
||||||
|
if (!isyielding) isyielding = 1;
|
||||||
|
status = LUA_OK;
|
||||||
|
}
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
static ssize_t DeflateGenerator(struct iovec v[3]) {
|
static ssize_t DeflateGenerator(struct iovec v[3]) {
|
||||||
int i, rc;
|
int i, rc;
|
||||||
size_t no;
|
size_t no;
|
||||||
|
@ -2987,12 +3026,10 @@ static bool IsLoopbackClient() {
|
||||||
static char *LuaOnHttpRequest(void) {
|
static char *LuaOnHttpRequest(void) {
|
||||||
char *error;
|
char *error;
|
||||||
lua_State *L = GL;
|
lua_State *L = GL;
|
||||||
lua_State *C = lua_newthread(L);
|
|
||||||
effectivepath.p = url.path.p;
|
effectivepath.p = url.path.p;
|
||||||
effectivepath.n = url.path.n;
|
effectivepath.n = url.path.n;
|
||||||
lua_getglobal(L, "OnHttpRequest");
|
lua_getglobal(L, "OnHttpRequest");
|
||||||
if (LuaCallWithTrace(L, C, 0, 0) == LUA_OK) {
|
if (LuaCallWithYield(L) == LUA_OK) {
|
||||||
lua_pop(L, 1); // pop thread
|
|
||||||
AssertLuaStackIsEmpty(L);
|
AssertLuaStackIsEmpty(L);
|
||||||
return CommitOutput(GetLuaResponse());
|
return CommitOutput(GetLuaResponse());
|
||||||
} else {
|
} else {
|
||||||
|
@ -3000,7 +3037,7 @@ static char *LuaOnHttpRequest(void) {
|
||||||
error =
|
error =
|
||||||
ServeErrorWithDetail(500, "Internal Server Error",
|
ServeErrorWithDetail(500, "Internal Server Error",
|
||||||
IsLoopbackClient() ? lua_tostring(L, -1) : NULL);
|
IsLoopbackClient() ? lua_tostring(L, -1) : NULL);
|
||||||
lua_pop(L, 2); // pop error and thread
|
lua_pop(L, 1); // pop error
|
||||||
AssertLuaStackIsEmpty(L);
|
AssertLuaStackIsEmpty(L);
|
||||||
return error;
|
return error;
|
||||||
}
|
}
|
||||||
|
@ -3010,7 +3047,6 @@ static char *ServeLua(struct Asset *a, const char *s, size_t n) {
|
||||||
char *code;
|
char *code;
|
||||||
size_t codelen;
|
size_t codelen;
|
||||||
lua_State *L = GL;
|
lua_State *L = GL;
|
||||||
lua_State *C = lua_newthread(L);
|
|
||||||
LockInc(&shared->c.dynamicrequests);
|
LockInc(&shared->c.dynamicrequests);
|
||||||
effectivepath.p = s;
|
effectivepath.p = s;
|
||||||
effectivepath.n = n;
|
effectivepath.n = n;
|
||||||
|
@ -3018,8 +3054,7 @@ static char *ServeLua(struct Asset *a, const char *s, size_t n) {
|
||||||
int status =
|
int status =
|
||||||
luaL_loadbuffer(L, code, codelen,
|
luaL_loadbuffer(L, code, codelen,
|
||||||
FreeLater(xasprintf("@%s", FreeLater(strndup(s, n)))));
|
FreeLater(xasprintf("@%s", FreeLater(strndup(s, n)))));
|
||||||
if (status == LUA_OK && LuaCallWithTrace(L, C, 0, 0) == LUA_OK) {
|
if (status == LUA_OK && LuaCallWithYield(L) == LUA_OK) {
|
||||||
lua_pop(L, 1); // pop thread
|
|
||||||
return CommitOutput(GetLuaResponse());
|
return CommitOutput(GetLuaResponse());
|
||||||
} else {
|
} else {
|
||||||
char *error;
|
char *error;
|
||||||
|
@ -3027,7 +3062,7 @@ static char *ServeLua(struct Asset *a, const char *s, size_t n) {
|
||||||
error =
|
error =
|
||||||
ServeErrorWithDetail(500, "Internal Server Error",
|
ServeErrorWithDetail(500, "Internal Server Error",
|
||||||
IsLoopbackClient() ? lua_tostring(L, -1) : NULL);
|
IsLoopbackClient() ? lua_tostring(L, -1) : NULL);
|
||||||
lua_pop(L, 2); // pop error and thread
|
lua_pop(L, 1); // pop error
|
||||||
return error;
|
return error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5695,18 +5730,16 @@ static bool LuaRunAsset(const char *path, bool mandatory) {
|
||||||
if ((a = GetAsset(path, pathlen))) {
|
if ((a = GetAsset(path, pathlen))) {
|
||||||
if ((code = FreeLater(LoadAsset(a, &codelen)))) {
|
if ((code = FreeLater(LoadAsset(a, &codelen)))) {
|
||||||
lua_State *L = GL;
|
lua_State *L = GL;
|
||||||
lua_State *C = lua_newthread(L);
|
|
||||||
effectivepath.p = path;
|
effectivepath.p = path;
|
||||||
effectivepath.n = pathlen;
|
effectivepath.n = pathlen;
|
||||||
DEBUGF("(lua) LuaRunAsset(%`'s)", path);
|
DEBUGF("(lua) LuaRunAsset(%`'s)", path);
|
||||||
status =
|
status =
|
||||||
luaL_loadbuffer(L, code, codelen, FreeLater(xasprintf("@%s", path)));
|
luaL_loadbuffer(L, code, codelen, FreeLater(xasprintf("@%s", path)));
|
||||||
if (status != LUA_OK || LuaCallWithTrace(L, C, 0, 0) != LUA_OK) {
|
if (status != LUA_OK || LuaCallWithTrace(L, 0, 0, NULL) != LUA_OK) {
|
||||||
LogLuaError("lua code", lua_tostring(L, -1));
|
LogLuaError("lua code", lua_tostring(L, -1));
|
||||||
lua_pop(L, 1); // pop error
|
lua_pop(L, 1); // pop error
|
||||||
if (mandatory) exit(1);
|
if (mandatory) exit(1);
|
||||||
}
|
}
|
||||||
lua_pop(L, 1); // pop thread
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return !!a;
|
return !!a;
|
||||||
|
@ -6774,6 +6807,7 @@ static void InitRequest(void) {
|
||||||
loops.n = 0;
|
loops.n = 0;
|
||||||
generator = 0;
|
generator = 0;
|
||||||
luaheaderp = 0;
|
luaheaderp = 0;
|
||||||
|
isyielding = 0;
|
||||||
contentlength = 0;
|
contentlength = 0;
|
||||||
hascontenttype = false;
|
hascontenttype = false;
|
||||||
referrerpolicy = 0;
|
referrerpolicy = 0;
|
||||||
|
|
Loading…
Add table
Reference in a new issue