Improved Python client; lowered the server thread-pool count

pudepiedj 2024-02-21 17:33:03 +00:00
parent f7e29e5248
commit 1b04d5907b
4 changed files with 49 additions and 32 deletions

View file

@@ -17,10 +17,16 @@ def print_dict(data):
print_dict(entry)
elif isinstance(data, str):
print(f"Incoming string is {data}.\n")
else:
print("No intelligible data received.\n")
return
def print_response(text):
def title_print(text):
length = len(text)
print("\n" + "*" * length)
print(text)
print("*" * length + "\n")
def make_empty_bar(num_requests):
bar = []
@@ -28,7 +34,7 @@ def make_empty_bar(num_requests):
bar.append("\u2589")
bar = ' '.join(bar)
bar = bar.replace(' ','')
print(f"Bar is now {bar}.\n")
# print(f"Bar is now {bar}.\n")
return bar
def make_progress_bar(bar, count, num_requests):
@@ -38,39 +44,41 @@ def make_progress_bar(bar, count, num_requests):
if i == count:
# print(f"Bar position {i} is {bar[i]}\n")
bar = bar[:i*stride1] + "\u23F1" + bar[i*stride1 + stride2:]
print(f"Bar is now {bar}\n")
return bar
print(f"Bar is now {bar}\n")
return bar
def send_request(q, question, event, count, num_requests):
def send_request(q, system, question, event, count, num_requests):
delay = 0.1
global bar
data = {'prompt': question}
data = {'system prompt': system, 'prompt': question}
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code in [200,300]:
print(f"Current Queue Size: {q.qsize()}; processing request {count} / {num_requests}\n")
print(f"Status Code for {question}: {response.status_code}\n")
print(f"Response to {question}:\n")
if isinstance(response.text, str):
data = json.loads(response.text)
if isinstance(data, dict):
print_dict(data)
elif isinstance(data, str):
print(data)
else:
print("\nServer returned data of wrong type.\n")
# put the response text in the queue
q.put(response.text)
if not q.empty():
print(f"Completed task {count} / {num_requests}\n")
bar = make_progress_bar(bar, count, num_requests)
q.task_done()
with lockbar:
print(f"Current Client Queue Size: {q.qsize()}; processing request {count} / {num_requests}\n")
print(f"Status Code for {question}: {response.status_code}\n")
print(f"Response to {question}:\n")
if isinstance(response.text, str):
data = json.loads(response.text)
if isinstance(data, dict):
print_dict(data)
elif isinstance(data, str):
print(data)
else:
print("\nServer returned data of wrong type.\n")
# put the response text in the queue
q.put(response.text)
if not q.empty():
#with lockbar: # lock automatically releases when the update is done
title_print(f"Completed task {count} / {num_requests}")
bar = make_progress_bar(bar, count, num_requests)
q.task_done()
elif response.status_code == 429 and not q.empty():
# event.set()
event.set()
print("Server return too many requests; back off!! Reset event.")
else:
print(f"Server responded with code {response.status_code}\n")
@@ -82,11 +90,12 @@ def send_request(q, question, event, count, num_requests):
if __name__ == "__main__":
global bar
lockbar = threading.Lock()
url = "http://localhost:8080/completion"
num_requests = 256
q = Queue(maxsize = 256)
num_requests = 40
q = Queue(maxsize = 64)
threads = []
bar = make_empty_bar(num_requests)
@@ -97,6 +106,11 @@ if __name__ == "__main__":
'User-Agent': 'Llamaserver.py'
}
system = "You are a helpful and cheerful \
assistant who answers questions briefly, \
clearly and without undue repetition, \
paying very close attention to the requirements of the task set."
country_list = ["France", "Germany", "China", "USA", "Italy", "India",
"Ukraine", "Japan", "Australia", "New Zealand", "Indonesia", "Nigeria", "Saudi Arabia",
"Israel", "Egypt", "Kenya", "Chile", "Mexico", "Canada",
@@ -109,7 +123,7 @@ if __name__ == "__main__":
# NOTE: don't pass the parameter as a function call; pass in args
print(f"Processing request {i} / {num_requests}: {question}\n")
event = threading.Event()
t = threading.Thread(target=send_request, args=(q, question, event, i, num_requests))
t = threading.Thread(target=send_request, args=(q, system, question, event, i, num_requests))
t.start()
threads.append(t)
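The new lockbar exists because every worker thread now writes to the same console and the same global bar, and those updates have to be serialised to stay readable. A minimal sketch of that pattern (the names lockbar, bar and make_progress_bar mirror the diff, but the simplified signatures, the worker body and the join loop are illustrative assumptions):

import threading

lockbar = threading.Lock()
bar = ""                                   # shared progress bar, updated by every worker

def make_progress_bar(done, total):
    # stand-in for the real renderer: filled blocks for finished tasks, a marker for the rest
    return "\u2589" * done + "\u23F1" * (total - done)

def worker(count, num_requests):
    global bar
    with lockbar:                          # the lock is released when the block exits
        bar = make_progress_bar(count, num_requests)
        print(f"Completed task {count} / {num_requests}\n{bar}\n")

threads = [threading.Thread(target=worker, args=(i + 1, 8)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()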

View file

@@ -96,7 +96,7 @@
// the value here (8u, 16u, 32u, etc) is what governs max threads at 5126
#ifndef CPPHTTPLIB_THREAD_POOL_COUNT
#define CPPHTTPLIB_THREAD_POOL_COUNT \
((std::max)(128u, std::thread::hardware_concurrency() > 0 \
((std::max)(32u, std::thread::hardware_concurrency() > 0 \
? std::thread::hardware_concurrency() - 1 \
: 0))
#endif
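Dropping CPPHTTPLIB_THREAD_POOL_COUNT from 128 to 32 means the server handles far fewer requests concurrently, so a client that fires hundreds of requests at once will see more 429s. One client-side way to stay under that limit is to cap in-flight requests with a bounded executor; this sketch is illustrative and not part of the commit, and the max_workers value is only an assumption pegged below the new 32-thread pool:

from concurrent.futures import ThreadPoolExecutor
import requests

URL = "http://localhost:8080/completion"    # endpoint from the client above

def ask(question):
    # one blocking completion request; raises on HTTP errors
    r = requests.post(URL, json={"prompt": question}, timeout=600)
    r.raise_for_status()
    return r.text

questions = [f"Tell me briefly about item {i}." for i in range(40)]
with ThreadPoolExecutor(max_workers=16) as pool:   # stay well below the 32-thread server pool
    for answer in pool.map(ask, questions):
        print(answer[:80])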

View file

@@ -30,6 +30,7 @@
#include <condition_variable>
#include <atomic>
#include <signal.h>
#include <string>
#include <iostream> // do we still need this?
@@ -350,7 +351,7 @@ static void kvgraphics(std::vector<llama_client_slot>& slots, int cache_size) {
printf("\033[1;0H\033[K**************************\n\033[KKVcache occupancy by slot:\n\033[K**************************\n");
for(int i=0; i<num_blocks; i++) {
//printf("\033[K"); // clear the current line
printf("\033[K"); // clear the current line
for(int j=0; j < max_length; j++) {
int used = slots[i].cache_tokens.size() * max_length / slot_cache_size;
if((j < max_length / 2) && (j < used)) {
@@ -382,7 +383,7 @@ static void kvgraphics(std::vector<llama_client_slot>& slots, int cache_size) {
}
printf(" %4zu/%5zu %2d %s %s %s\n", slots[i].cache_tokens.size(), slot_cache_size, slots[i].id, slot_symbol1.c_str(), slot_symbol2.c_str(), slot_symbol3.c_str());
}
printf("\n\033[%dJ", num_blocks+5); // move cursor to end of cache display
printf("\n\033[%dJ", num_blocks+5); // move cursor to end of cache display
}
struct llama_server_context
@@ -2664,6 +2665,7 @@ int main(int argc, char **argv)
llama_backend_init();
llama_numa_init(params.numa);
ggml_time_init();
LOG_INFO("build info", {{"build", LLAMA_BUILD_NUMBER},
{"commit", LLAMA_COMMIT}});

View file

@@ -291,9 +291,10 @@ struct llama_server_queue {
// Start the main loop. Called from the very end of server.cpp
void start_loop() {
running = true;
//LOG_TEE("In start_loop have new task number %d.\n", id);
while (true) {
// new task arrived
LOG_TEE("In start_loop have new task number %d.\n", id);
// LOG_TEE("In start_loop have new task number %d.\n", id);
{
while (true)
{
@@ -377,7 +378,7 @@ struct llama_server_response {
typedef std::function<void(int, int, task_result&)> callback_multitask_t;
callback_multitask_t callback_update_multitask;
// for keeping track of all tasks waiting for the result
std::set<int> waiting_task_ids; // so this stores waiting tasks with no obvious limit
std::set<int> waiting_task_ids; // this stores waiting tasks with no obvious limit
// the main result queue
std::vector<task_result> queue_results;
std::mutex mutex_results;
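start_loop, touched in the first hunk of this file, is the server's task pump: it drains queued tasks, runs their callbacks, and then sleeps on a condition variable until new work is posted. Roughly that shape in a condensed Python sketch (illustrative only; the actual implementation is the C++ in this file):

import threading
from collections import deque

tasks = deque()
condition = threading.Condition()
running = True

def post(task):
    # called from request handlers: queue a task and wake the loop
    with condition:
        tasks.append(task)
        condition.notify_all()

def start_loop(process_task):
    # drain queued tasks, run their callbacks, then sleep until new work arrives
    while True:
        with condition:
            while not tasks and running:
                condition.wait()
            if not running:
                return
            task = tasks.popleft()
        process_task(task)          # run outside the lock so new tasks can be posted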