kvgraphics with interactio

This commit is contained in:
pudepiedj 2024-02-20 19:31:29 +00:00
parent 69cb1ef0b1
commit 4904b0a06e
4 changed files with 97 additions and 47 deletions

View file

@ -76,8 +76,8 @@ def send_request(q, question, event, count, num_requests):
print(f"Server responded with code {response.status_code}\n") print(f"Server responded with code {response.status_code}\n")
except Exception as e: except Exception as e:
print(f"Server returned exception error {e}") print(f"Server returned exception error {e}")
sleep(delay) # sleep(delay)
delay *= 2 # delay *= 2
if __name__ == "__main__": if __name__ == "__main__":
@ -105,7 +105,7 @@ if __name__ == "__main__":
for i in range(num_requests): for i in range(num_requests):
country = country_list[i % len(country_list)] country = country_list[i % len(country_list)]
question = f"When was the first democratic election (if any) in {country}?" question = f"What was the total population of {country} in 2018?"
# NOTE: don't pass the parameter as a function call; pass in args # NOTE: don't pass the parameter as a function call; pass in args
print(f"Processing request {i} / {num_requests}: {question}\n") print(f"Processing request {i} / {num_requests}: {question}\n")
event = threading.Event() event = threading.Event()

View file

@ -96,7 +96,7 @@
#ifndef CPPHTTPLIB_THREAD_POOL_COUNT #ifndef CPPHTTPLIB_THREAD_POOL_COUNT
#define CPPHTTPLIB_THREAD_POOL_COUNT \ #define CPPHTTPLIB_THREAD_POOL_COUNT \
((std::max)(8u, std::thread::hardware_concurrency() > 0 \ ((std::max)(32u, std::thread::hardware_concurrency() > 0 \
? std::thread::hardware_concurrency() - 1 \ ? std::thread::hardware_concurrency() - 1 \
: 0)) : 0))
#endif #endif

View file

@ -265,6 +265,7 @@ struct llama_client_slot
return state == IDLE && command == NONE; return state == IDLE && command == NONE;
} }
// this doesn't set the is_processing; it tests it and returns true only if either condition is met
bool is_processing() const { bool is_processing() const {
return (state == IDLE && command == LOAD_PROMPT) || state == PROCESSING; return (state == IDLE && command == LOAD_PROMPT) || state == PROCESSING;
} }
@ -301,13 +302,19 @@ struct llama_client_slot
}; };
} }
void print_timings() const { void print_timings(llama_client_slot &slot, bool flag = false) const {
LOG_TEE("\n"); printf("\033[21;0H");
LOG_TEE("Finished processing slot %d.\n", slot.id);
LOG_TEE("%s: prompt eval time = %10.2f ms / %5d tokens (%8.2f ms per token, %8.2f tokens per second)\n", LOG_TEE("%s: prompt eval time = %10.2f ms / %5d tokens (%8.2f ms per token, %8.2f tokens per second)\n",
__func__, t_prompt_processing, num_prompt_tokens_processed, t_prompt_processing / num_prompt_tokens_processed, 1e3 / t_prompt_processing * num_prompt_tokens_processed); __func__, t_prompt_processing, num_prompt_tokens_processed, t_prompt_processing / num_prompt_tokens_processed, 1e3 / t_prompt_processing * num_prompt_tokens_processed);
LOG_TEE("%s: eval time = %10.2f ms / %5d runs (%8.2f ms per token, %8.2f tokens per second)\n", LOG_TEE("%s: eval time = %10.2f ms / %5d runs (%8.2f ms per token, %8.2f tokens per second)\n",
__func__, t_token_generation, n_decoded,t_token_generation / n_decoded, 1e3 / t_token_generation * n_decoded); __func__, t_token_generation, n_decoded,t_token_generation / n_decoded, 1e3 / t_token_generation * n_decoded);
LOG_TEE("%s: total time = %10.2f ms\n", __func__, t_prompt_processing + t_token_generation); LOG_TEE("%s: total time = %10.2f ms\n", __func__, t_prompt_processing + t_token_generation);
if (flag) {
printf("\033[25;0HPress any key ... ");
getchar();
}
} }
}; };
@ -374,7 +381,7 @@ static void kvgraphics(std::vector<llama_client_slot>& slots, int cache_size) {
} }
printf(" %4zu/%5zu %2d %s %s %s\n", slots[i].cache_tokens.size(), slot_cache_size, slots[i].id, slot_symbol1.c_str(), slot_symbol2.c_str(), slot_symbol3.c_str()); printf(" %4zu/%5zu %2d %s %s %s\n", slots[i].cache_tokens.size(), slot_cache_size, slots[i].id, slot_symbol1.c_str(), slot_symbol2.c_str(), slot_symbol3.c_str());
} }
printf("\n\033[%dJ", num_blocks); // move cursor to end of cache display printf("\n\033[%dJ", num_blocks+5); // move cursor to end of cache display
} }
struct llama_server_context struct llama_server_context
@ -393,6 +400,7 @@ struct llama_server_context
bool all_slots_are_idle = false; bool all_slots_are_idle = false;
bool add_bos_token = true; bool add_bos_token = true;
bool skvgraphics = false; bool skvgraphics = false;
bool skvinteract = false;
int32_t n_ctx; // total context for all clients / slots int32_t n_ctx; // total context for all clients / slots
@ -561,19 +569,24 @@ struct llama_server_context
return prompt_tokens; return prompt_tokens;
} }
// the logic seems wrong in this function
// why should there be an id in a task matching a slot.id before a slot has been assigned?
// most commonly id = -1 so we deal with that first rather than the specified id > 0
llama_client_slot* get_slot(int id) { llama_client_slot* get_slot(int id) {
int64_t t_last = ggml_time_us(); int64_t t_last = ggml_time_us();
llama_client_slot *last_used = nullptr; llama_client_slot *last_used = nullptr;
for (llama_client_slot & slot : slots) for (llama_client_slot & slot : slots)
{ {
if (slot.state == IDLE && slot.command != LOAD_PROMPT) { if (slot.id == -1 && slot.available())
LOG_TEE("Hijacking the first available slot %d\n", slot.id); {
LOG_TEE("Unallocated task now using slot %d", slot.id);
return &slot; return &slot;
} }
if (slot.id == id && slot.available()) if (slot.id == id && slot.available())
{ {
LOG_TEE("Using if-based available slot called by id: %d", slot.id); LOG_TEE("Using id-based available slot called by id: %d", slot.id);
return &slot; return &slot;
} }
@ -581,8 +594,9 @@ struct llama_server_context
{ {
last_used = &slot; last_used = &slot;
t_last = slot.t_last_used; t_last = slot.t_last_used;
LOG_TEE("Using time-based slot id: %d\n", slot.id); // remove this commented-out break to choose the first available slot rather than the last
//break; // has the effect of reducing the number of slots used
// break;
} }
} }
@ -1301,14 +1315,14 @@ struct llama_server_context
void request_completion(int task_id, json data, bool infill, bool embedding, int multitask_id) void request_completion(int task_id, json data, bool infill, bool embedding, int multitask_id)
{ {
task_server task; task_server task; // create a task_server instance task
task.id = task_id; task.id = task_id; // assign it an id based on ... what? Is this the sequential number of the request?
task.target_id = 0; task.target_id = 0; // what is this?
task.data = std::move(data); task.data = std::move(data);
task.infill_mode = infill; task.infill_mode = infill; //
task.embedding_mode = embedding; task.embedding_mode = embedding; //
task.type = TASK_TYPE_COMPLETION; task.type = TASK_TYPE_COMPLETION; // what to do
task.multitask_id = multitask_id; task.multitask_id = multitask_id; //
// when a completion task's prompt array is not a singleton, we split it into multiple requests // when a completion task's prompt array is not a singleton, we split it into multiple requests
// otherwise, it's a single-prompt task, we actually queue it // otherwise, it's a single-prompt task, we actually queue it
@ -1407,13 +1421,13 @@ struct llama_server_context
void request_cancel(int task_id) void request_cancel(int task_id)
{ {
task_server task; task_server task; // doesn't this assign a completely empty task then cancel it
task.type = TASK_TYPE_CANCEL; task.type = TASK_TYPE_CANCEL; // seems as though the only relevant parameter is task_id as the target ???
task.target_id = task_id; task.target_id = task_id; // rather than cancel a task already assigned a task?
queue_tasks.post(task); queue_tasks.post(task); // add the task to cancel to the queue
} }
void split_multiprompt_task(int multitask_id, task_server& multiprompt_task) void split_multiprompt_task(int multitask_id, task_server& multiprompt_task) // what exactly is a multiprompt_task?
{ {
int prompt_count = multiprompt_task.data.at("prompt").size(); int prompt_count = multiprompt_task.data.at("prompt").size();
if (prompt_count <= 1) { if (prompt_count <= 1) {
@ -1447,14 +1461,25 @@ struct llama_server_context
switch (task.type) switch (task.type)
{ {
case TASK_TYPE_COMPLETION: { case TASK_TYPE_COMPLETION: {
printf("Task data %d.\n", task.id);
llama_client_slot *slot = get_slot(json_value(task.data, "slot_id", -1)); // returns nullptr if no slot available // we need to sort out the task_server / slots logic which isn't right
// why should task.data already contain a slot_id key when we haven't allocated it?
// because if it doesnt the returned value will be -1; what makes it anything else?
int requested_slot = json_value(task.data, "slot_id", -1);
LOG_TEE("Task %d requesting slot %d\n", task.id, requested_slot);
// why are we suddenly using 'slot' as a pointer here - confusing?
llama_client_slot *slot = get_slot(requested_slot); // returns nullptr if no slot available
if (slot == nullptr) if (slot == nullptr)
{ {
// if no slot is available, we defer this task for processing later // if no slot is available, we defer this task for processing later
LOG_VERBOSE("no slot is available", {}); LOG_TEE("no slot is available for task %d\n", task.id);
queue_tasks.defer(task); queue_tasks.defer(task);
break; break;
} else {
LOG_TEE("Activating slot %d.\n", (*slot).id);
(*slot).state = PROCESSING; // makes slot.is_processing true
(*slot).command = LOAD_PROMPT; // why not a new flag 'RUNNING'? does this do anything when state is PROC
} }
if (task.data.contains("system_prompt")) if (task.data.contains("system_prompt"))
@ -1542,20 +1567,22 @@ struct llama_server_context
return true; return true;
} }
task_server task; task_server task; // where is space made (malloc) for all these tasks of indeterminate number?
task.type = TASK_TYPE_NEXT_RESPONSE; task.type = TASK_TYPE_NEXT_RESPONSE; // where is this set?
task.target_id = -1; task.target_id = -1; // -1 means here as elsewhere 'available to be set'
queue_tasks.post(task); queue_tasks.post(task);
for (llama_client_slot &slot : slots) for (llama_client_slot &slot : slots)
{ {
if (slot.ga_n == 1) if (slot.ga_n == 1) // what is this?
{ {
// if we are processing and SYSTEM + CACHE > SLOT_KVCACHE_SIZE (but is this slot.n_ctx or slot.n_ctx_slot?)
// we need to do a context shift
if (slot.is_processing() && system_tokens.size() + slot.cache_tokens.size() >= (size_t) slot.n_ctx) if (slot.is_processing() && system_tokens.size() + slot.cache_tokens.size() >= (size_t) slot.n_ctx)
{ {
// Shift context // Shift context
const int n_left = system_tokens.size() + slot.n_past - slot.params.n_keep - 1; const int n_left = system_tokens.size() + slot.n_past - slot.params.n_keep - 1;
const int n_discard = n_left / 2; const int n_discard = n_left / 2; // is this arbitrary?
LOG_TEE("slot %d: context shift - n_keep = %d, n_left = %d, n_discard = %d\n", slot.id, slot.params.n_keep, n_left, n_discard); LOG_TEE("slot %d: context shift - n_keep = %d, n_left = %d, n_discard = %d\n", slot.id, slot.params.n_keep, n_left, n_discard);
llama_kv_cache_seq_rm (ctx, slot.id, slot.params.n_keep + 1 , slot.params.n_keep + n_discard + 1); llama_kv_cache_seq_rm (ctx, slot.id, slot.params.n_keep + 1 , slot.params.n_keep + n_discard + 1);
@ -1627,7 +1654,7 @@ struct llama_server_context
if (slot.state == IDLE && slot.command == LOAD_PROMPT && !has_prompt && !slot.infill) if (slot.state == IDLE && slot.command == LOAD_PROMPT && !has_prompt && !slot.infill)
{ {
slot.release(); slot.release();
slot.print_timings(); slot.print_timings(slot, skvinteract);
send_final_response(slot); send_final_response(slot);
continue; continue;
} }
@ -1860,6 +1887,8 @@ struct llama_server_context
0, 0, 0, // unused 0, 0, 0, // unused
}; };
// llama_decode returns 0 (success); 1 (cannot find space); -1 (error)
// n_batch is the size of batch we are looking for; ret is whether we succeed
const int ret = llama_decode(ctx, batch_view); const int ret = llama_decode(ctx, batch_view);
if (ret != 0) if (ret != 0)
@ -1871,12 +1900,14 @@ struct llama_server_context
return false; return false;
} }
// we get here if ret = 1 and n_batch != 1
LOG_TEE("%s : failed to find free space in the KV cache, retrying with smaller n_batch = %d\n", __func__, n_batch / 2); LOG_TEE("%s : failed to find free space in the KV cache, retrying with smaller n_batch = %d\n", __func__, n_batch / 2);
// retry with half the batch size to try to find a free slot in the KV cache // retry with half the batch size to try to find a free slot in the KV cache
n_batch /= 2; n_batch /= 2;
i -= n_batch; i -= n_batch; // this modifies a loop variable inside its own loop, which is potentially dangerous
continue; continue; // but where do we restore n_batch in cases where we do a context shift?
// apparently at line ~1840f but this doesn't always work because slots get jammed when full
} }
for (auto & slot : slots) for (auto & slot : slots)
@ -1925,7 +1956,7 @@ struct llama_server_context
if (!process_token(result, slot)) if (!process_token(result, slot))
{ {
slot.release(); slot.release();
slot.print_timings(); slot.print_timings(slot, skvinteract);
send_final_response(slot); send_final_response(slot);
} }
@ -2014,6 +2045,8 @@ static void server_print_usage(const char *argv0, const gpt_params &params,
printf(" --log-disable disables logging to a file.\n"); printf(" --log-disable disables logging to a file.\n");
printf(" --slots-endpoint-disable disables slots monitoring endpoint.\n"); printf(" --slots-endpoint-disable disables slots monitoring endpoint.\n");
printf(" -skvg, --show-graphics enable graphics displaying kvcache occupancy (default: false)"); printf(" -skvg, --show-graphics enable graphics displaying kvcache occupancy (default: false)");
printf(" -skvi, --show-interactive-graphics\n");
printf(" enable graphics displaying kvcache occupancy with user pause (default: false)\n");
printf("\n"); printf("\n");
printf(" -n, --n-predict maximum tokens to predict (default: %d)\n", params.n_predict); printf(" -n, --n-predict maximum tokens to predict (default: %d)\n", params.n_predict);
printf(" --override-kv KEY=TYPE:VALUE\n"); printf(" --override-kv KEY=TYPE:VALUE\n");
@ -2246,7 +2279,7 @@ static void server_params_parse(int argc, char **argv, server_params &sparams,
params.n_batch = std::stoi(argv[i]); params.n_batch = std::stoi(argv[i]);
params.n_batch = std::min(512, params.n_batch); params.n_batch = std::min(512, params.n_batch);
} }
else if (arg == "-skvg" || arg == "--show-graphics") else if (arg == "-skvg" || arg == "--show-graphics")
{ {
if (i >= argc) if (i >= argc)
{ {
@ -2254,6 +2287,17 @@ static void server_params_parse(int argc, char **argv, server_params &sparams,
break; break;
} }
llama.skvgraphics = true; llama.skvgraphics = true;
llama.skvinteract = false;
}
else if (arg == "-skvi" || arg == "--show-interactive-graphics")
{
if (i >= argc)
{
invalid_param = true;
break;
}
llama.skvgraphics = true;
llama.skvinteract = true;
} }
else if (arg == "--gpu-layers" || arg == "-ngl" || arg == "--n-gpu-layers") else if (arg == "--gpu-layers" || arg == "-ngl" || arg == "--n-gpu-layers")
{ {
@ -2813,6 +2857,7 @@ int main(int argc, char **argv)
LOG_INFO("HTTP server listening", log_data); LOG_INFO("HTTP server listening", log_data);
// run the HTTP server in a thread - see comment below // run the HTTP server in a thread - see comment below
// is there a limit on the number of threads the HTTP can handle?
std::thread t([&]() std::thread t([&]()
{ {
if (!svr.listen_after_bind()) if (!svr.listen_after_bind())
@ -2822,7 +2867,7 @@ int main(int argc, char **argv)
} }
return 0; return 0;
}); }); // end of thread t
// load the model // load the model
if (!llama.load_model(params)) if (!llama.load_model(params))
@ -2907,13 +2952,19 @@ int main(int argc, char **argv)
if (!validate_api_key(req, res)) { if (!validate_api_key(req, res)) {
return; return;
} }
// it appears that here we first get ONE request to parse; then TEN; then ONE-by-ONE
printf("Request body to parse: %s", req.body.c_str());
if (llama.skvinteract) {
getchar();
}
json data = json::parse(req.body); json data = json::parse(req.body);
const int task_id = llama.queue_tasks.get_new_id(); const int task_id = llama.queue_tasks.get_new_id(); // just returns a new id number
llama.queue_results.add_waiting_task_id(task_id); llama.queue_results.add_waiting_task_id(task_id);
LOG_TEE("Initiated new task %d.\n", task_id); LOG_TEE("Initiated new task %d.\n", task_id);
llama.request_completion(task_id, data, false, false, -1); llama.request_completion(task_id, data, false, false, -1);
if (!json_value(data, "stream", false)) { if (!json_value(data, "stream", false)) {
std::string completion_text; std::string completion_text; // is this the ANSWER? never used?
LOG_TEE("The answer is %s\n", completion_text.c_str());
task_result result = llama.queue_results.recv(task_id); task_result result = llama.queue_results.recv(task_id);
if (!result.error && result.stop) { if (!result.error && result.stop) {
res.set_content(result.result_json.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8"); res.set_content(result.result_json.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8");

View file

@ -162,9 +162,6 @@ template <typename T>
static T json_value(const json &body, const std::string &key, const T &default_value) static T json_value(const json &body, const std::string &key, const T &default_value)
{ {
// Fallback null to default value // Fallback null to default value
if (body.contains(key) && !body.at(key).is_null()) {
LOG_TEE("Body at %s in %d\n", key.c_str(), int(body.at(key)));
}
return body.contains(key) && !body.at(key).is_null() return body.contains(key) && !body.at(key).is_null()
? body.value(key, default_value) ? body.value(key, default_value)
: default_value; : default_value;
@ -238,7 +235,7 @@ struct llama_server_queue {
int post(task_server task) { int post(task_server task) {
std::unique_lock<std::mutex> lock(mutex_tasks); std::unique_lock<std::mutex> lock(mutex_tasks);
if (task.id == -1) { if (task.id == -1) {
task.id = id++; task.id = id; // originally id++ but this repeats get_new_id below
} }
queue_tasks.push_back(std::move(task)); queue_tasks.push_back(std::move(task));
//LOG_TEE("Queue now has %2zu members.\n", queue_tasks.size()); //LOG_TEE("Queue now has %2zu members.\n", queue_tasks.size());
@ -294,12 +291,12 @@ struct llama_server_queue {
condition_tasks.notify_all(); condition_tasks.notify_all();
} }
// Start the main loop. // Start the main loop. Called from the very end of server.cpp
void start_loop() { void start_loop() {
running = true; running = true;
while (true) { while (true) {
// new task arrived // new task arrived
LOG_VERBOSE("have new task number %d.\n", {}); LOG_TEE("In start_loop have new task number %d.\n", id);
{ {
while (true) while (true)
{ {
@ -383,7 +380,7 @@ struct llama_server_response {
typedef std::function<void(int, int, task_result&)> callback_multitask_t; typedef std::function<void(int, int, task_result&)> callback_multitask_t;
callback_multitask_t callback_update_multitask; callback_multitask_t callback_update_multitask;
// for keeping track of all tasks waiting for the result // for keeping track of all tasks waiting for the result
std::set<int> waiting_task_ids; std::set<int> waiting_task_ids; // so this stores waiting tasks with no obvious limit
// the main result queue // the main result queue
std::vector<task_result> queue_results; std::vector<task_result> queue_results;
std::mutex mutex_results; std::mutex mutex_results;
@ -392,11 +389,13 @@ struct llama_server_response {
void add_waiting_task_id(int task_id) { void add_waiting_task_id(int task_id) {
std::unique_lock<std::mutex> lock(mutex_results); std::unique_lock<std::mutex> lock(mutex_results);
waiting_task_ids.insert(task_id); waiting_task_ids.insert(task_id);
LOG_TEE("Waiting task list size after addition: %zu.\n", waiting_task_ids.size());
} }
void remove_waiting_task_id(int task_id) { void remove_waiting_task_id(int task_id) {
std::unique_lock<std::mutex> lock(mutex_results); std::unique_lock<std::mutex> lock(mutex_results);
waiting_task_ids.erase(task_id); waiting_task_ids.erase(task_id);
LOG_TEE("Waiting task list size after removal: %zu.\n", waiting_task_ids.size());
} }
// This function blocks the thread until there is a response for this task_id // This function blocks the thread until there is a response for this task_id