Greatly expand system() shell code features

The cosmopolitan command interpreter now has 13 builtin commands,
variable support, support for ; / && / || syntax, asynchronous support,
and plenty of unit tests with bug fixes.

This change fixes a bug in posix_spawn() with null envp arg. strace
logging now uses atomic writes for scatter functions. Breaking change
renaming GetCpuCount() to _getcpucount(). TurfWar is now updated to use
the new token bucket algorithm. WIN32 affinity masks now inherit across
fork() and execve().
This commit is contained in:
Justine Tunney 2022-10-11 21:06:27 -07:00
parent e7329b7cba
commit b41f91c658
No known key found for this signature in database
GPG key ID: BE714B4575D6E328
80 changed files with 1370 additions and 344 deletions

View file

@ -26,6 +26,7 @@
#include "libc/calls/struct/stat.h"
#include "libc/calls/struct/timespec.h"
#include "libc/calls/struct/timeval.h"
#include "libc/dce.h"
#include "libc/errno.h"
#include "libc/fmt/conv.h"
#include "libc/fmt/itoa.h"
@ -43,7 +44,6 @@
#include "libc/runtime/internal.h"
#include "libc/runtime/runtime.h"
#include "libc/runtime/stack.h"
#include "libc/runtime/sysconf.h"
#include "libc/sock/sock.h"
#include "libc/sock/struct/pollfd.h"
#include "libc/sock/struct/sockaddr.h"
@ -71,6 +71,7 @@
#include "net/http/escape.h"
#include "net/http/http.h"
#include "net/http/ip.h"
#include "net/http/tokenbucket.h"
#include "net/http/url.h"
#include "third_party/getopt/getopt.h"
#include "third_party/nsync/counter.h"
@ -88,8 +89,8 @@
*/
#define PORT 8080 // default server listening port
#define CPUS 256 // number of cpus to actually use
#define WORKERS 1000 // size of http client thread pool
#define CPUS 64 // number of cpus to actually use
#define WORKERS 100 // size of http client thread pool
#define SUPERVISE_MS 1000 // how often to stat() asset files
#define KEEPALIVE_MS 60000 // max time to keep idle conn open
#define MELTALIVE_MS 2000 // panic keepalive under heavy load
@ -106,12 +107,17 @@
#define QUEUE_MAX 800 // maximum pending claim items in queue
#define BATCH_MAX 64 // max claims to insert per transaction
#define NICK_MAX 40 // max length of user nickname string
#define TB_INTERVAL 1000 // millis between token replenishes
#define TB_CIDR 22 // token bucket cidr specificity
#define SOCK_MAX 100 // max length of socket queue
#define MSG_BUF 512 // small response lookaside
#define INBUF_SIZE PAGESIZE
#define OUTBUF_SIZE 8192
#define TB_BYTES (1u << TB_CIDR)
#define TB_WORDS (TB_BYTES / 8)
#define GETOPTS "idvp:w:k:"
#define USAGE \
"\
@ -257,6 +263,7 @@ atomic_long g_parsefails;
atomic_long g_iprequests;
atomic_long g_queuefulls;
atomic_long g_htmlclaims;
atomic_long g_ratelimits;
atomic_long g_emptyclaims;
atomic_long g_acceptfails;
atomic_long g_badversions;
@ -270,6 +277,11 @@ atomic_long g_claimsenqueued;
atomic_long g_claimsprocessed;
atomic_long g_statuszrequests;
union TokenBucket {
atomic_schar *b;
atomic_uint_fast64_t *w;
} g_tok;
// http worker objects
struct Worker {
pthread_t th;
@ -599,7 +611,7 @@ void OnlyRunOnCpu(int i) {
int n;
cpu_set_t cpus;
_Static_assert(CPUS > 0, "");
n = GetCpuCount();
n = _getcpucount();
n = MIN(CPUS, n);
i = MIN(i, n - 1);
CPU_ZERO(&cpus);
@ -612,7 +624,7 @@ void DontRunOnFirstCpus(int i) {
int n;
cpu_set_t cpus;
_Static_assert(CPUS > 0, "");
n = GetCpuCount();
n = _getcpucount();
n = MIN(CPUS, n);
i = MIN(i, n - 1);
CPU_ZERO(&cpus);
@ -679,6 +691,7 @@ void ServeStatusz(int client, char *outbuf) {
p = Statusz(p, "iprequests", g_iprequests);
p = Statusz(p, "queuefulls", g_queuefulls);
p = Statusz(p, "htmlclaims", g_htmlclaims);
p = Statusz(p, "ratelimits", g_ratelimits);
p = Statusz(p, "emptyclaims", g_emptyclaims);
p = Statusz(p, "acceptfails", g_acceptfails);
p = Statusz(p, "badversions", g_badversions);
@ -860,6 +873,17 @@ void *HttpWorker(void *arg) {
break;
}
if (!AcquireToken(g_tok.b, ip, TB_CIDR)) {
LOG("%s rate limiting client\n", ipbuf, msg->version);
Write(client.sock, "HTTP/1.1 429 Too Many Requests\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"\r\n"
"429 Too Many Requests\n");
++g_ratelimits;
break;
}
// access log
LOG("%16s %.*s %.*s %.*s %.*s %#.*s\n", ipbuf,
msg->xmethod.b - msg->xmethod.a, inbuf + msg->xmethod.a,
@ -966,6 +990,8 @@ void *HttpWorker(void *arg) {
"Cache-Control: max-age=3600, private\r\n"
"Date: ");
p = FormatDate(p);
p = stpcpy(p, "\r\nX-Token-Count: ");
p = FormatInt32(p, CountTokens(g_tok.b, ip, TB_CIDR));
p = stpcpy(p, "\r\nContent-Length: ");
p = FormatInt32(p, strlen(ipbuf));
p = stpcpy(p, "\r\n\r\n");
@ -1662,6 +1688,25 @@ void *NowWorker(void *arg) {
return 0;
}
// worker for refilling token buckets
void *ReplenishWorker(void *arg) {
BlockSignals();
pthread_setname_np(pthread_self(), "Replenisher");
LOG("%P Replenisher started\n");
UpdateNow();
OnlyRunOnCpu(0);
for (struct timespec ts = _timespec_real();;
ts = _timespec_add(ts, _timespec_frommillis(TB_INTERVAL))) {
if (!nsync_note_wait(g_shutdown[1], ts)) {
ReplenishTokens(g_tok.w, TB_WORDS);
} else {
break;
}
}
LOG("Replenisher exiting\n");
return 0;
}
// we're permissive in allowing http connection keepalive until the
// moment worker resources start becoming scarce. when that happens
// we'll (1) cancel read operations that have not sent us a message
@ -1723,8 +1768,10 @@ int main(int argc, char *argv[]) {
// ShowCrashReports();
// we don't have proper futexes on these platforms
// so choose a smaller number of workers
g_workers = MIN(g_workers, 4);
// we'll be somewhat less aggressive about workers
if (IsXnu() || IsNetbsd()) {
g_workers = MIN(g_workers, _getcpucount());
}
// user interface
GetOpts(argc, argv);
@ -1755,6 +1802,10 @@ int main(int argc, char *argv[]) {
sqlite3_initialize();
CheckDatabase();
// fill token buckets
g_tok.b = malloc(TB_BYTES);
memset(g_tok.b, 127, TB_BYTES);
// server lifecycle locks
g_started = _timespec_real();
for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) {
@ -1789,13 +1840,14 @@ int main(int argc, char *argv[]) {
// make 8 helper threads
g_ready = nsync_counter_new(9);
pthread_t scorer, recenter, claimer, nower;
pthread_t scorer, recenter, claimer, nower, replenisher;
pthread_t scorer_hour, scorer_day, scorer_week, scorer_month;
CHECK_EQ(0, pthread_create(&scorer, 0, ScoreWorker, 0));
CHECK_EQ(0, pthread_create(&scorer_hour, 0, ScoreHourWorker, 0));
CHECK_EQ(0, pthread_create(&scorer_day, 0, ScoreDayWorker, 0));
CHECK_EQ(0, pthread_create(&scorer_week, 0, ScoreWeekWorker, 0));
CHECK_EQ(0, pthread_create(&scorer_month, 0, ScoreMonthWorker, 0));
CHECK_EQ(0, pthread_create(&replenisher, 0, ReplenishWorker, 0));
CHECK_EQ(0, pthread_create(&recenter, 0, RecentWorker, 0));
CHECK_EQ(0, pthread_create(&claimer, 0, ClaimWorker, 0));
CHECK_EQ(0, pthread_create(&nower, 0, NowWorker, 0));
@ -1843,6 +1895,7 @@ int main(int argc, char *argv[]) {
CHECK_EQ(0, pthread_join(scorer_hour, 0));
CHECK_EQ(0, pthread_join(scorer_week, 0));
CHECK_EQ(0, pthread_join(scorer_month, 0));
CHECK_EQ(0, pthread_join(replenisher, 0));
// now that all workers have terminated, the claims queue must be
// empty, therefore, it is now safe to send a cancellation to the
@ -1872,6 +1925,7 @@ int main(int argc, char *argv[]) {
}
nsync_counter_free(g_ready);
free(g_worker);
free(g_tok.b);
LOG("Goodbye\n");
// CheckForMemoryLeaks();