diff --git a/Llamaserver.py b/Llamaserver.py
index 4969a2410..3bdee15bc 100644
--- a/Llamaserver.py
+++ b/Llamaserver.py
@@ -76,8 +76,8 @@ def send_request(q, question, event, count, num_requests):
             print(f"Server responded with code {response.status_code}\n")
     except Exception as e:
         print(f"Server returned exception error {e}")
-        sleep(delay)
-        delay *= 2
+        # sleep(delay)
+        # delay *= 2
 
 
 if __name__ == "__main__":
@@ -105,7 +105,7 @@ if __name__ == "__main__":
     for i in range(num_requests):
         country = country_list[i % len(country_list)]
-        question = f"When was the first democratic election (if any) in {country}?"
+        question = f"What was the total population of {country} in 2018?"
         # NOTE: don't pass the parameter as a function call; pass in args
         print(f"Processing request {i} / {num_requests}: {question}\n")
         event = threading.Event()
diff --git a/examples/server/httplib.h b/examples/server/httplib.h
index 72d806516..2fa9ff35b 100644
--- a/examples/server/httplib.h
+++ b/examples/server/httplib.h
@@ -96,7 +96,7 @@
 
 #ifndef CPPHTTPLIB_THREAD_POOL_COUNT
 #define CPPHTTPLIB_THREAD_POOL_COUNT \
-  ((std::max)(8u, std::thread::hardware_concurrency() > 0 \
+  ((std::max)(32u, std::thread::hardware_concurrency() > 0 \
                   ? std::thread::hardware_concurrency() - 1 \
                   : 0))
 #endif
diff --git a/examples/server/server.cpp b/examples/server/server.cpp
index 7b674b69f..164089d27 100644
--- a/examples/server/server.cpp
+++ b/examples/server/server.cpp
@@ -265,6 +265,7 @@ struct llama_client_slot
         return state == IDLE && command == NONE;
     }
 
+    // this doesn't set any processing state; it tests it, returning true while a prompt is waiting to load or generation is under way
     bool is_processing() const {
         return (state == IDLE && command == LOAD_PROMPT) || state == PROCESSING;
     }
@@ -301,13 +302,19 @@ struct llama_client_slot
         };
     }
 
-    void print_timings() const {
-        LOG_TEE("\n");
+    void print_timings(llama_client_slot &slot, bool interactive = false) const {
+        printf("\033[21;0H");
+        LOG_TEE("Finished processing slot %d.\n", slot.id);
         LOG_TEE("%s: prompt eval time = %10.2f ms / %5d tokens (%8.2f ms per token, %8.2f tokens per second)\n",
             __func__, t_prompt_processing, num_prompt_tokens_processed, t_prompt_processing / num_prompt_tokens_processed, 1e3 / t_prompt_processing * num_prompt_tokens_processed);
         LOG_TEE("%s:        eval time = %10.2f ms / %5d runs   (%8.2f ms per token, %8.2f tokens per second)\n",
             __func__, t_token_generation, n_decoded, t_token_generation / n_decoded, 1e3 / t_token_generation * n_decoded);
         LOG_TEE("%s:       total time = %10.2f ms\n", __func__, t_prompt_processing + t_token_generation);
+
+        if (interactive) {
+            printf("\033[25;0HPress any key ... ");
+            getchar();
+        }
     }
 };
@@ -374,7 +381,7 @@ static void kvgraphics(std::vector<llama_client_slot> &slots, int cache_size) {
         }
         printf("  %4zu/%5zu  %2d %s %s %s\n", slots[i].cache_tokens.size(), slot_cache_size, slots[i].id, slot_symbol1.c_str(), slot_symbol2.c_str(), slot_symbol3.c_str());
     }
-    printf("\n\033[%dJ", num_blocks);   // move cursor to end of cache display
+    printf("\n\033[%dJ", num_blocks+5); // move cursor to end of cache display
 }
 
 struct llama_server_context
@@ -393,6 +400,7 @@ struct llama_server_context
     bool all_slots_are_idle = false;
     bool add_bos_token = true;
     bool skvgraphics = false;
+    bool skvinteract = false;
 
     int32_t n_ctx;  // total context for all clients / slots
@@ -561,19 +569,24 @@ struct llama_server_context
         return prompt_tokens;
     }
 
+    // the logic seems wrong in this function:
+    // why should a task arrive with an id matching a slot.id before any slot has been assigned to it?
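+    // (presumably because a client can pin a slot by passing "slot_id" in the request JSON,
+    //  e.g. {"prompt": "...", "slot_id": 3}, so that a follow-up request reuses that slot's
+    //  KV cache; json_value() falls back to -1 when the key is absent)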
+    // most commonly id == -1, so we deal with that first rather than with a specified id > 0
     llama_client_slot* get_slot(int id) {
         int64_t t_last = ggml_time_us();
         llama_client_slot *last_used = nullptr;
 
         for (llama_client_slot & slot : slots)
         {
-            if (slot.state == IDLE && slot.command != LOAD_PROMPT) {
-                LOG_TEE("Hijacking the first available slot %d\n", slot.id);
+            if (slot.id == -1 && slot.available())
+            {
+                LOG_TEE("Unallocated task now using slot %d\n", slot.id);
                 return &slot;
             }
+
             if (slot.id == id && slot.available())
             {
-                LOG_TEE("Using if-based available slot called by id: %d", slot.id);
+                LOG_TEE("Using id-based available slot called by id: %d\n", slot.id);
                 return &slot;
             }
 
             if (slot.available() && slot.t_last_used < t_last)
             {
                 last_used = &slot;
                 t_last = slot.t_last_used;
-                LOG_TEE("Using time-based slot id: %d\n", slot.id);
-                //break;
+                // uncommenting this break makes the loop take the first available slot rather than the last,
+                // which has the effect of reducing the number of slots used
+                // break;
             }
         }
@@ -1301,14 +1315,14 @@ struct llama_server_context
 
     void request_completion(int task_id, json data, bool infill, bool embedding, int multitask_id)
     {
-        task_server task;
-        task.id = task_id;
-        task.target_id = 0;
+        task_server task;                 // create a task_server instance for this request
+        task.id = task_id;                // assigned from queue_tasks.get_new_id() at the call site, i.e. the sequential request number
+        task.target_id = 0;               // unused here; request_cancel() uses target_id to name the task to cancel
         task.data = std::move(data);
-        task.infill_mode = infill;
-        task.embedding_mode = embedding;
-        task.type = TASK_TYPE_COMPLETION;
-        task.multitask_id = multitask_id;
+        task.infill_mode = infill;
+        task.embedding_mode = embedding;
+        task.type = TASK_TYPE_COMPLETION; // what kind of work to do
+        task.multitask_id = multitask_id;
 
         // when a completion task's prompt array is not a singleton, we split it into multiple requests
         // otherwise, it's a single-prompt task, we actually queue it
@@ -1407,13 +1421,13 @@ struct llama_server_context
 
     void request_cancel(int task_id)
    {
-        task_server task;
-        task.type = TASK_TYPE_CANCEL;
-        task.target_id = task_id;
-        queue_tasks.post(task);
+        task_server task;             // build a fresh task whose only job is to carry the cancellation
+        task.type = TASK_TYPE_CANCEL; // the only relevant field is target_id: the task to cancel
+        task.target_id = task_id;     // we post a cancel message rather than mutating the running task directly
+        queue_tasks.post(task);       // add the cancellation to the queue for the main loop to act on
     }
 
-    void split_multiprompt_task(int multitask_id, task_server& multiprompt_task)
+    void split_multiprompt_task(int multitask_id, task_server& multiprompt_task) // a multiprompt task is one whose "prompt" field is an array
     {
         int prompt_count = multiprompt_task.data.at("prompt").size();
         if (prompt_count <= 1) {
@@ -1447,14 +1461,25 @@ struct llama_server_context
         switch (task.type)
         {
             case TASK_TYPE_COMPLETION: {
-                printf("Task data %d.\n", task.id);
-                llama_client_slot *slot = get_slot(json_value(task.data, "slot_id", -1)); // returns nullptr if no slot available
+
+                // we need to sort out the task_server / slot logic, which isn't right yet:
+                // why would task.data already contain a "slot_id" key when we haven't allocated a slot?
+                // if it doesn't, json_value() returns the default of -1; what would make it anything else?
+                int requested_slot = json_value(task.data, "slot_id", -1);
+                LOG_TEE("Task %d requesting slot %d\n", task.id, requested_slot);
+
+                // why are we suddenly using 'slot' as a pointer here?
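+                // (because "no slot" must be representable: get_slot() returns nullptr when every
+                //  slot is busy, and a C++ reference cannot be null; the nullptr check below is
+                //  what lets us defer the task)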
+                llama_client_slot *slot = get_slot(requested_slot); // returns nullptr if no slot available
                 if (slot == nullptr)
                 {
                     // if no slot is available, we defer this task for processing later
-                    LOG_VERBOSE("no slot is available", {});
+                    LOG_TEE("no slot is available for task %d\n", task.id);
                     queue_tasks.defer(task);
                     break;
+                } else {
+                    LOG_TEE("Activating slot %d.\n", slot->id);
+                    slot->state = PROCESSING;    // makes slot.is_processing() true
+                    slot->command = LOAD_PROMPT; // why not a new flag 'RUNNING'? does this have any effect once state is PROCESSING?
                 }
 
                 if (task.data.contains("system_prompt"))
@@ -1542,20 +1567,22 @@ struct llama_server_context
             return true;
         }
 
-        task_server task;
-        task.type = TASK_TYPE_NEXT_RESPONSE;
-        task.target_id = -1;
+        task_server task;                    // tasks are stack values; post() moves them into queue_tasks, whose container owns the storage
+        task.type = TASK_TYPE_NEXT_RESPONSE; // where is this type handled?
+        task.target_id = -1;                 // here, as elsewhere, -1 means 'not yet set'
         queue_tasks.post(task);
 
         for (llama_client_slot &slot : slots)
         {
-            if (slot.ga_n == 1)
+            if (slot.ga_n == 1) // ga_n is the group-attention factor; context shifting only applies when self-extend is off
             {
+                // if this slot is processing and the system prompt plus its cached tokens have filled
+                // its context window (slot.n_ctx -- or should this be slot.n_ctx_slot?), we need a context shift
                 if (slot.is_processing() && system_tokens.size() + slot.cache_tokens.size() >= (size_t) slot.n_ctx)
                 {
                     // Shift context
                     const int n_left    = system_tokens.size() + slot.n_past - slot.params.n_keep - 1;
-                    const int n_discard = n_left / 2;
+                    const int n_discard = n_left / 2; // discarding half of the shiftable tokens looks like a heuristic rather than a tuned value
 
                     LOG_TEE("slot %d: context shift - n_keep = %d, n_left = %d, n_discard = %d\n", slot.id, slot.params.n_keep, n_left, n_discard);
                     llama_kv_cache_seq_rm (ctx, slot.id, slot.params.n_keep + 1            , slot.params.n_keep + n_discard + 1);
@@ -1627,7 +1654,7 @@ struct llama_server_context
                 if (slot.state == IDLE && slot.command == LOAD_PROMPT && !has_prompt && !slot.infill)
                 {
                     slot.release();
-                    slot.print_timings();
+                    slot.print_timings(slot, skvinteract);
                     send_final_response(slot);
                     continue;
                 }
@@ -1860,6 +1887,8 @@ struct llama_server_context
                 0, 0, 0, // unused
             };
 
+            // llama_decode returns 0 on success, 1 when it cannot find space in the KV cache, and a negative value on error
+            // n_batch is the batch size we are trying to decode; ret records the outcome
             const int ret = llama_decode(ctx, batch_view);
 
             if (ret != 0)
@@ -1871,12 +1900,14 @@ struct llama_server_context
                     return false;
                 }
 
+                // we get here when ret == 1 and n_batch > 1
                 LOG_TEE("%s : failed to find free space in the KV cache, retrying with smaller n_batch = %d\n", __func__, n_batch / 2);
 
                 // retry with half the batch size to try to find a free slot in the KV cache
                 n_batch /= 2;
-                i -= n_batch;
-                continue;
+                i -= n_batch; // this modifies a loop variable inside its own loop, which is potentially dangerous
+                continue;     // but where do we restore n_batch in cases where we do a context shift?
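+                // (mechanics of the retry, as I read it: n_batch is halved, i is rewound by the
+                //  new half-size, and the loop re-submits the same token span in smaller views
+                //  until llama_decode() stops returning 1; if n_batch is a local copy made at the
+                //  top of this function, the shrink would only last for the current pass anyway;
+                //  as for where it gets restored:)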
+                // apparently at line ~1840f, but this doesn't always work because slots get jammed when full
             }
 
             for (auto & slot : slots)
@@ -1925,7 +1956,7 @@ struct llama_server_context
                 if (!process_token(result, slot))
                 {
                     slot.release();
-                    slot.print_timings();
+                    slot.print_timings(slot, skvinteract);
                     send_final_response(slot);
                 }
 
@@ -2014,6 +2045,8 @@ static void server_print_usage(const char *argv0, const gpt_params &params,
     printf("  --log-disable             disables logging to a file.\n");
     printf("  --slots-endpoint-disable  disables slots monitoring endpoint.\n");
     printf("  -skvg, --show-graphics    enable graphics displaying kvcache occupancy (default: false)");
+    printf("  -skvi, --show-interactive-graphics\n");
+    printf("                            enable graphics displaying kvcache occupancy with user pause (default: false)\n");
     printf("\n");
     printf("  -n, --n-predict           maximum tokens to predict (default: %d)\n", params.n_predict);
     printf("  --override-kv KEY=TYPE:VALUE\n");
@@ -2246,7 +2279,7 @@ static void server_params_parse(int argc, char **argv, server_params &sparams,
             params.n_batch = std::stoi(argv[i]);
             params.n_batch = std::min(512, params.n_batch);
         }
-        else if (arg == "-skvg" || arg == "--show-graphics")
+        else if (arg == "-skvg" || arg == "--show-graphics")
         {
             if (i >= argc)
             {
@@ -2254,6 +2287,17 @@ static void server_params_parse(int argc, char **argv, server_params &sparams,
                 invalid_param = true;
                 break;
             }
             llama.skvgraphics = true;
+            llama.skvinteract = false;
+        }
+        else if (arg == "-skvi" || arg == "--show-interactive-graphics")
+        {
+            if (i >= argc)
+            {
+                invalid_param = true;
+                break;
+            }
+            llama.skvgraphics = true;
+            llama.skvinteract = true;
         }
         else if (arg == "--gpu-layers" || arg == "-ngl" || arg == "--n-gpu-layers")
@@ -2813,6 +2857,7 @@ int main(int argc, char **argv)
     LOG_INFO("HTTP server listening", log_data);
 
     // run the HTTP server in a thread - see comment below
+    // is there a limit on how many requests the HTTP server can handle at once? yes: CPPHTTPLIB_THREAD_POOL_COUNT, raised to 32 in httplib.h above
     std::thread t([&]()
     {
         if (!svr.listen_after_bind())
@@ -2822,7 +2867,7 @@ int main(int argc, char **argv)
         }
 
         return 0;
-    });
+    }); // end of the lambda run by thread t
 
     // load the model
     if (!llama.load_model(params))
@@ -2907,13 +2952,19 @@ int main(int argc, char **argv)
             if (!validate_api_key(req, res))
             {
                 return;
             }
+            // observed behaviour: first ONE request arrives to parse, then TEN, then ONE-by-ONE
+            printf("Request body to parse: %s\n", req.body.c_str());
+            if (llama.skvinteract) {
+                getchar();
+            }
             json data = json::parse(req.body);
-            const int task_id = llama.queue_tasks.get_new_id();
+            const int task_id = llama.queue_tasks.get_new_id(); // just returns the next sequential id
             llama.queue_results.add_waiting_task_id(task_id);
             LOG_TEE("Initiated new task %d.\n", task_id);
             llama.request_completion(task_id, data, false, false, -1);
             if (!json_value(data, "stream", false)) {
-                std::string completion_text;
+                std::string completion_text; // is this the ANSWER? never used?
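+                // (completion_text is empty at this point, so the LOG_TEE below can only print an
+                //  empty string; in this non-streaming path the actual answer arrives a few lines
+                //  down via queue_results.recv(task_id))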
+ LOG_TEE("The answer is %s\n", completion_text.c_str()); task_result result = llama.queue_results.recv(task_id); if (!result.error && result.stop) { res.set_content(result.result_json.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8"); diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp index f016221a6..e293a063c 100644 --- a/examples/server/utils.hpp +++ b/examples/server/utils.hpp @@ -162,9 +162,6 @@ template static T json_value(const json &body, const std::string &key, const T &default_value) { // Fallback null to default value - if (body.contains(key) && !body.at(key).is_null()) { - LOG_TEE("Body at %s in %d\n", key.c_str(), int(body.at(key))); - } return body.contains(key) && !body.at(key).is_null() ? body.value(key, default_value) : default_value; @@ -238,7 +235,7 @@ struct llama_server_queue { int post(task_server task) { std::unique_lock lock(mutex_tasks); if (task.id == -1) { - task.id = id++; + task.id = id; // originally id++ but this repeats get_new_id below } queue_tasks.push_back(std::move(task)); //LOG_TEE("Queue now has %2zu members.\n", queue_tasks.size()); @@ -294,12 +291,12 @@ struct llama_server_queue { condition_tasks.notify_all(); } - // Start the main loop. + // Start the main loop. Called from the very end of server.cpp void start_loop() { running = true; while (true) { // new task arrived - LOG_VERBOSE("have new task number %d.\n", {}); + LOG_TEE("In start_loop have new task number %d.\n", id); { while (true) { @@ -383,7 +380,7 @@ struct llama_server_response { typedef std::function callback_multitask_t; callback_multitask_t callback_update_multitask; // for keeping track of all tasks waiting for the result - std::set waiting_task_ids; + std::set waiting_task_ids; // so this stores waiting tasks with no obvious limit // the main result queue std::vector queue_results; std::mutex mutex_results; @@ -392,11 +389,13 @@ struct llama_server_response { void add_waiting_task_id(int task_id) { std::unique_lock lock(mutex_results); waiting_task_ids.insert(task_id); + LOG_TEE("Waiting task list size after addition: %zu.\n", waiting_task_ids.size()); } void remove_waiting_task_id(int task_id) { std::unique_lock lock(mutex_results); waiting_task_ids.erase(task_id); + LOG_TEE("Waiting task list size after removal: %zu.\n", waiting_task_ids.size()); } // This function blocks the thread until there is a response for this task_id