From e7329b7cbac2ebc06cd8461074e8843395465176 Mon Sep 17 00:00:00 2001 From: Justine Tunney <jtunney@gmail.com> Date: Tue, 11 Oct 2022 10:11:57 -0700 Subject: [PATCH] Implement atomic swar token bucket library --- net/http/tokenbucket.c | 90 ++++++++++++++++++++++++++++++++ net/http/tokenbucket.h | 13 +++++ test/net/http/tokenbucket_test.c | 73 ++++++++++++++++++++++++++ 3 files changed, 176 insertions(+) create mode 100644 net/http/tokenbucket.c create mode 100644 net/http/tokenbucket.h create mode 100644 test/net/http/tokenbucket_test.c diff --git a/net/http/tokenbucket.c b/net/http/tokenbucket.c new file mode 100644 index 000000000..e8ed5cb62 --- /dev/null +++ b/net/http/tokenbucket.c @@ -0,0 +1,90 @@ +/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│ +│vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi│ +╞══════════════════════════════════════════════════════════════════════════════╡ +│ Copyright 2022 Justine Alexandra Roberts Tunney │ +│ │ +│ Permission to use, copy, modify, and/or distribute this software for │ +│ any purpose with or without fee is hereby granted, provided that the │ +│ above copyright notice and this permission notice appear in all copies. │ +│ │ +│ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL │ +│ WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED │ +│ WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE │ +│ AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL │ +│ DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR │ +│ PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER │ +│ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │ +│ PERFORMANCE OF THIS SOFTWARE. │ +╚─────────────────────────────────────────────────────────────────────────────*/ +#include "libc/intrin/atomic.h" +#include "net/http/tokenbucket.h" + +/** + * Returns byte comparison mask w/ 0xff if equal otherwise 0x00. + */ +static inline uint64_t CompareEq(uint64_t x, uint64_t y) { + uint64_t mask, zoro = x ^ y; + mask = ((((zoro >> 1) | 0x8080808080808080) - zoro) & 0x8080808080808080); + return (mask << 1) - (mask >> 7); +} + +/** + * Atomically increments all signed bytes in array without overflow. + * + * This function should be called periodically so buckets have tokens. + * While many threads can consumes tokens, only a single thread can use + * the replenish operation. + * + * This function implements a SWAR algorithm offering the best possible + * performance under the constraint that operations happen atomically. + * This function should take 2ms to add a token to 2**22 buckets which + * need a 4mb array that has one bucket for every 1024 IPv4 addresses. + * However that doesn't matter since no locks are held during that 2ms + * therefore replenishing doesn't block threads that acquire tokens. + * + * @param w is word array that aliases byte token array + * @param n is number of 64-bit words in `w` array + */ +void ReplenishTokens(atomic_uint_fast64_t *w, size_t n) { + for (size_t i = 0; i < n; ++i) { + uint64_t x = atomic_load_explicit(w + i, memory_order_relaxed); + atomic_fetch_add_explicit( + w + i, 0x0101010101010101 & ~CompareEq(x, 0x7f7f7f7f7f7f7f7f), + memory_order_acq_rel); + } +} + +/** + * Atomically decrements signed byte index if it's positive. + * + * This function should be called to take a token from the right bucket + * whenever a client wants to use some type of resource. This routine + * discriminates based on `c` which is the netmask bit count. There must + * exist `1 << c` signed bytes (or buckets) in the `b` array. + * + * Tokens are considered available if the bucket corresponding `x` has a + * positive number. This function returns true of a token was atomically + * acquired using a lockeless spinless algorithm. Buckets are allowed to + * drift into a slightly negative state, but overflow is impractical. + * + * @param w is array of token buckets + * @param n is ipv4 address + * @param c is cidr + */ +bool AcquireToken(atomic_schar *b, uint32_t x, int c) { + uint32_t i = x >> (32 - c); + return atomic_load_explicit(b + i, memory_order_relaxed) > 0 && + atomic_fetch_add_explicit(b + i, -1, memory_order_acq_rel) > 0; +} + +/** + * Returns current number of tokens in bucket. + * + * @param w is array of token buckets + * @param n is ipv4 address + * @param c is cidr + */ +int CountTokens(atomic_schar *b, uint32_t x, int c) { + uint32_t i = x >> (32 - c); + return atomic_load_explicit(b + i, memory_order_relaxed); +} diff --git a/net/http/tokenbucket.h b/net/http/tokenbucket.h new file mode 100644 index 000000000..39b84789a --- /dev/null +++ b/net/http/tokenbucket.h @@ -0,0 +1,13 @@ +#ifndef COSMOPOLITAN_NET_HTTP_TOKENBUCKET_H_ +#define COSMOPOLITAN_NET_HTTP_TOKENBUCKET_H_ +#include "libc/atomic.h" +#if !(__ASSEMBLER__ + __LINKER__ + 0) +COSMOPOLITAN_C_START_ + +void ReplenishTokens(atomic_uint_fast64_t *, size_t); +bool AcquireToken(atomic_schar *, uint32_t, int); +int CountTokens(atomic_schar *, uint32_t, int); + +COSMOPOLITAN_C_END_ +#endif /* !(__ASSEMBLER__ + __LINKER__ + 0) */ +#endif /* COSMOPOLITAN_NET_HTTP_TOKENBUCKET_H_ */ diff --git a/test/net/http/tokenbucket_test.c b/test/net/http/tokenbucket_test.c new file mode 100644 index 000000000..17dd8ff9b --- /dev/null +++ b/test/net/http/tokenbucket_test.c @@ -0,0 +1,73 @@ +/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│ +│vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi│ +╞══════════════════════════════════════════════════════════════════════════════╡ +│ Copyright 2022 Justine Alexandra Roberts Tunney │ +│ │ +│ Permission to use, copy, modify, and/or distribute this software for │ +│ any purpose with or without fee is hereby granted, provided that the │ +│ above copyright notice and this permission notice appear in all copies. │ +│ │ +│ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL │ +│ WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED │ +│ WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE │ +│ AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL │ +│ DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR │ +│ PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER │ +│ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │ +│ PERFORMANCE OF THIS SOFTWARE. │ +╚─────────────────────────────────────────────────────────────────────────────*/ +#include "libc/assert.h" +#include "libc/atomic.h" +#include "libc/calls/struct/timespec.h" +#include "libc/intrin/atomic.h" +#include "libc/intrin/kprintf.h" +#include "libc/limits.h" +#include "libc/mem/mem.h" +#include "libc/stdio/stdio.h" +#include "libc/testlib/ezbench.h" +#include "libc/testlib/testlib.h" +#include "net/http/http.h" +#include "net/http/tokenbucket.h" + +#define TB_CIDR 22 +#define TB_BYTES (1u << TB_CIDR) +#define TB_WORDS (TB_BYTES / 8) + +union TokenBucket { + atomic_schar *b; + atomic_uint_fast64_t *w; +} tok; + +void SetUp(void) { + ASSERT_NE(NULL, (tok.b = calloc(TB_BYTES, 1))); +} + +void TearDown(void) { + free(tok.b); +} + +TEST(tokenbucket, test) { + ASSERT_FALSE(AcquireToken(tok.b, 0x7f000001, TB_CIDR)); + ASSERT_FALSE(AcquireToken(tok.b, 0x7f000002, TB_CIDR)); + ReplenishTokens(tok.w, TB_WORDS); + ReplenishTokens(tok.w, TB_WORDS); + ASSERT_TRUE(AcquireToken(tok.b, 0x7f000001, TB_CIDR)); + ASSERT_TRUE(AcquireToken(tok.b, 0x7f000002, TB_CIDR)); + ASSERT_FALSE(AcquireToken(tok.b, 0x7f000001, TB_CIDR)); + ASSERT_TRUE(AcquireToken(tok.b, 0x08080808, TB_CIDR)); + ReplenishTokens(tok.w, TB_WORDS); + ReplenishTokens(tok.w, TB_WORDS); + ASSERT_TRUE(AcquireToken(tok.b, 0x7f000001, TB_CIDR)); + ASSERT_TRUE(AcquireToken(tok.b, 0x7f000001, TB_CIDR)); + ASSERT_FALSE(AcquireToken(tok.b, 0x7f000002, TB_CIDR)); + ASSERT_TRUE(AcquireToken(tok.b, 0x08080808, TB_CIDR)); +} + +BENCH(tokenbucket, bench) { + struct timespec t1, t2; + clock_gettime(0, &t1); + ReplenishTokens(tok.w, TB_WORDS); + clock_gettime(0, &t2); + kprintf("token bucket replenish took %'ld us\n", + _timespec_tomicros(_timespec_sub(t2, t1))); +}