From ad1d746caa36295e1c238444231b1202153f756b Mon Sep 17 00:00:00 2001 From: Georgi Gerganov Date: Tue, 5 Mar 2024 14:57:15 +0200 Subject: [PATCH] server : normalize id vars --- examples/server/server.cpp | 299 +++++++++++++++++++------------------ examples/server/utils.hpp | 272 ++++++++++++++++----------------- 2 files changed, 283 insertions(+), 288 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 2ef8bf0e4..29fe96f83 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -81,7 +81,8 @@ struct server_params { struct server_slot { int id; - int task_id = -1; + int id_task = -1; + int id_multi = -1; struct slot_params params; @@ -141,9 +142,6 @@ struct server_slot { double t_prompt_processing; // ms double t_token_generation; // ms - // multitasks - int multitask_id = -1; - void reset() { n_prompt_tokens = 0; generated_text = ""; @@ -254,8 +252,8 @@ struct server_slot { t_token, n_tokens_second); LOG_INFO(buffer, { - {"slot_id", id}, - {"task_id", task_id}, + {"id_slot", id}, + {"id_task", id_task}, {"t_prompt_processing", t_prompt_processing}, {"n_prompt_tokens_processed", n_prompt_tokens_processed}, {"t_token", t_token}, @@ -270,8 +268,8 @@ struct server_slot { t_token, n_tokens_second); LOG_INFO(buffer, { - {"slot_id", id}, - {"task_id", task_id}, + {"id_slot", id}, + {"id_task", id_task}, {"t_token_generation", t_token_generation}, {"n_decoded", n_decoded}, {"t_token", t_token}, @@ -281,8 +279,8 @@ struct server_slot { snprintf(buffer, 512, " total time = %10.2f ms", t_prompt_processing + t_token_generation); LOG_INFO(buffer, { - {"slot_id", id}, - {"task_id", task_id}, + {"id_slot", id}, + {"id_task", id_task}, {"t_prompt_processing", t_prompt_processing}, {"t_token_generation", t_token_generation}, {"t_total", t_prompt_processing + t_token_generation}, @@ -405,7 +403,7 @@ struct llama_server_context { slot.n_predict = params.n_predict; LOG_INFO("new slot", { - {"slot_id", slot.id}, + {"id_slot", slot.id}, {"n_ctx_slot", slot.n_ctx} }); @@ -419,7 +417,7 @@ struct llama_server_context { //GGML_ASSERT(n_ctx >= n_ctx_train * ga_n && "n_ctx must be at least n_ctx_train * ga_n"); // NOLINT LOG_INFO("slot self-extend", { - {"slot_id", slot.id}, + {"id_slot", slot.id}, {"ga_n", ga_n}, {"ga_w", ga_w} }); @@ -591,7 +589,7 @@ struct llama_server_context { slot.sparams.use_penalty_prompt_tokens = true; LOG_VERBOSE("penalty_prompt_tokens", { - {"slot_id", slot.id}, + {"id_slot", slot.id}, {"tokens", slot.sparams.penalty_prompt_tokens}, }); } @@ -611,7 +609,7 @@ struct llama_server_context { slot.sparams.use_penalty_prompt_tokens = true; LOG_VERBOSE("penalty_prompt_tokens", { - {"slot_id", slot.id}, + {"id_slot", slot.id}, {"tokens", slot.sparams.penalty_prompt_tokens}, }); } @@ -686,8 +684,8 @@ struct llama_server_context { slot.command = LOAD_PROMPT; LOG_INFO("slot is processing task", { - {"slot_id", slot.id}, - {"task_id", slot.task_id}, + {"id_slot", slot.id}, + {"id_task", slot.id_task}, }); return true; @@ -843,7 +841,7 @@ struct llama_server_context { slot.has_next_token = false; LOG_VERBOSE("stopped by limit", { - {"slot_id", slot.id}, + {"id_slot", slot.id}, {"n_decoded", slot.n_decoded}, {"n_predict", slot.params.n_predict}, }); @@ -875,11 +873,11 @@ struct llama_server_context { LOG_TEE("task %i - error: %s\n", task.id, error.c_str()); task_result res; - res.id = task.id; - res.multitask_id = task.multitask_id; - res.stop = false; - res.error = true; - res.result_json = { { "content", error } }; + res.id = task.id; + 
res.id_multi = task.id_multi; + res.stop = false; + res.error = true; + res.result_json = { { "content", error } }; queue_results.send(res); } @@ -932,14 +930,14 @@ struct llama_server_context { void send_partial_response(server_slot & slot, completion_token_output tkn) { task_result res; - res.id = slot.task_id; - res.multitask_id = slot.multitask_id; - res.error = false; - res.stop = false; - res.result_json = json { + res.id = slot.id_task; + res.id_multi = slot.id_multi; + res.error = false; + res.stop = false; + res.result_json = json { {"content", tkn.text_to_send}, {"stop", false}, - {"slot_id", slot.id}, + {"id_slot", slot.id}, {"multimodal", false} }; @@ -969,13 +967,13 @@ struct llama_server_context { void send_final_response(const server_slot & slot) { task_result res; - res.id = slot.task_id; - res.multitask_id = slot.multitask_id; - res.error = false; - res.stop = true; - res.result_json = json { + res.id = slot.id_task; + res.id_multi = slot.id_multi; + res.error = false; + res.stop = true; + res.result_json = json { {"content", !slot.params.stream ? slot.generated_text : ""}, - {"slot_id", slot.id}, + {"id_slot", slot.id}, {"stop", true}, {"model", params.model_alias}, {"tokens_predicted", slot.n_decoded}, @@ -1018,10 +1016,10 @@ struct llama_server_context { void send_embedding(const server_slot & slot, const llama_batch & batch) { task_result res; - res.id = slot.task_id; - res.multitask_id = slot.multitask_id; - res.error = false; - res.stop = true; + res.id = slot.id_task; + res.id_multi = slot.id_multi; + res.error = false; + res.stop = true; const int n_embd = llama_n_embd(model); @@ -1040,15 +1038,19 @@ struct llama_server_context { const float * embd = llama_get_embeddings_seq(ctx, batch.seq_id[i][0]); if (embd == NULL) { embd = llama_get_embeddings_ith(ctx, i); - if (embd == NULL) { - LOG_ERROR("failed to get embeddings for token", {{"token", batch.token[i]}, {"seq_id", batch.seq_id[i][0]}}); + } - res.result_json = json { - {"embedding", std::vector(n_embd, 0.0f)}, - }; + if (embd == NULL) { + LOG_ERROR("failed to get embeddings", { + {"token", batch.token [i]}, + {"seq_id", batch.seq_id[i][0]} + }); - continue; - } + res.result_json = json { + {"embedding", std::vector(n_embd, 0.0f)}, + }; + + continue; } res.result_json = json { @@ -1060,15 +1062,15 @@ struct llama_server_context { queue_results.send(res); } - void request_completion(int task_id, json data, bool infill, bool embedding, int multitask_id) { + void request_completion(int id_task, int id_multi, json data, bool infill, bool embedding) { task_server task; - task.id = task_id; - task.target_id = 0; + task.id = id_task; + task.id_multi = id_multi; + task.id_target = 0; task.data = std::move(data); task.infill_mode = infill; task.embedding_mode = embedding; task.type = TASK_TYPE_COMPLETION; - task.multitask_id = multitask_id; // when a completion task's prompt array is not a singleton, we split it into multiple requests // otherwise, it's a single-prompt task, we actually queue it @@ -1090,22 +1092,22 @@ struct llama_server_context { if (numbers) { queue_tasks.post(task); } else { - split_multiprompt_task(task_id, task); + split_multiprompt_task(id_task, task); } } else { queue_tasks.post(task); } } - void request_cancel(int task_id) + void request_cancel(int id_task) { task_server task; task.type = TASK_TYPE_CANCEL; - task.target_id = task_id; + task.id_target = id_task; queue_tasks.post(task); } - void split_multiprompt_task(int multitask_id, task_server& multiprompt_task) + void 
split_multiprompt_task(int id_multi, task_server& multiprompt_task) { int prompt_count = multiprompt_task.data.at("prompt").size(); if (prompt_count <= 1) { @@ -1121,7 +1123,7 @@ struct llama_server_context { } // queue up the multitask so we can track its subtask progression - queue_tasks.add_multitask(multitask_id, subtask_ids); + queue_tasks.add_multitask(id_multi, subtask_ids); // add subtasks for (int i = 0; i < prompt_count; i++) @@ -1130,7 +1132,7 @@ struct llama_server_context { subtask_data["prompt"] = subtask_data["prompt"][i]; // subtasks inherit everything else (infill mode, embedding mode, etc.) - request_completion(subtask_ids[i], subtask_data, multiprompt_task.infill_mode, multiprompt_task.embedding_mode, multitask_id); + request_completion(subtask_ids[i], id_multi, subtask_data, multiprompt_task.infill_mode, multiprompt_task.embedding_mode); } } @@ -1138,10 +1140,10 @@ struct llama_server_context { switch (task.type) { case TASK_TYPE_COMPLETION: { - server_slot * slot = get_slot(json_value(task.data, "slot_id", -1)); + server_slot * slot = get_slot(json_value(task.data, "id_slot", -1)); if (slot == nullptr) { // if no slot is available, we defer this task for processing later - LOG_VERBOSE("no slot is available", {{"task_id", task.id}}); + LOG_VERBOSE("no slot is available", {{"id_task", task.id}}); queue_tasks.defer(task); break; } @@ -1160,10 +1162,10 @@ struct llama_server_context { slot->reset(); - slot->infill = task.infill_mode; - slot->embedding = task.embedding_mode; - slot->task_id = task.id; - slot->multitask_id = task.multitask_id; + slot->id_task = task.id; + slot->id_multi = task.id_multi; + slot->infill = task.infill_mode; + slot->embedding = task.embedding_mode; if (!launch_slot_with_data(*slot, task.data)) { @@ -1174,7 +1176,7 @@ struct llama_server_context { } break; case TASK_TYPE_CANCEL: { // release slot linked with the task id for (auto & slot : slots) { - if (slot.task_id == task.target_id) { + if (slot.id_task == task.id_target) { slot.release(); break; } @@ -1188,12 +1190,12 @@ struct llama_server_context { int n_idle_slots = 0; int n_processing_slots = 0; - for (server_slot &slot: slots) { + for (server_slot & slot: slots) { json slot_data = get_formated_generation(slot); - slot_data["id"] = slot.id; - slot_data["task_id"] = slot.task_id; - slot_data["state"] = slot.state; - slot_data["prompt"] = slot.prompt; + slot_data["id"] = slot.id; + slot_data["id_task"] = slot.id_task; + slot_data["state"] = slot.state; + slot_data["prompt"] = slot.prompt; slot_data["next_token"] = { {"has_next_token", slot.has_next_token}, {"n_remain", slot.n_remaining}, @@ -1211,21 +1213,23 @@ struct llama_server_context { slots_data.push_back(slot_data); } LOG_INFO("slot data", { - {"task_id", task.id}, + {"id_task", task.id}, {"n_idle_slots", n_idle_slots}, {"n_processing_slots", n_processing_slots} }); + LOG_VERBOSE("slot data", { - {"task_id", task.id}, + {"id_task", task.id}, {"n_idle_slots", n_idle_slots}, {"n_processing_slots", n_processing_slots}, {"slots", slots_data} }); + task_result res; - res.id = task.id; - res.multitask_id = task.multitask_id; - res.stop = true; - res.error = false; + res.id = task.id; + res.id_multi = task.id_multi; + res.stop = true; + res.error = false; res.result_json = { { "idle", n_idle_slots }, { "processing", n_processing_slots }, @@ -1244,6 +1248,7 @@ struct llama_server_context { { "slots", slots_data }, }; + metrics.reset_bucket(); queue_results.send(res); } break; @@ -1286,8 +1291,8 @@ struct llama_server_context { 
slot.t_last_used = ggml_time_us(); LOG_INFO("slot released", { - {"slot_id", slot.id}, - {"task_id", slot.task_id}, + {"id_slot", slot.id}, + {"id_task", slot.id_task}, {"n_ctx", n_ctx}, {"n_past", slot.n_past}, {"n_system_tokens", system_tokens.size()}, @@ -1299,6 +1304,7 @@ struct llama_server_context { } } + // check if all slots are idle { bool all_slots_are_idle = true; @@ -1324,7 +1330,7 @@ struct llama_server_context { { task_server task; task.type = TASK_TYPE_NEXT_RESPONSE; - task.target_id = -1; + task.id_target = -1; queue_tasks.post(task); } @@ -1338,8 +1344,8 @@ struct llama_server_context { const int n_discard = n_left / 2; LOG_INFO("slot context shift", { - {"slot_id", slot.id}, - {"task_id", slot.task_id}, + {"id_slot", slot.id}, + {"id_task", slot.id_task}, {"n_keep", n_keep}, {"n_left", n_left}, {"n_discard", n_discard}, @@ -1381,8 +1387,8 @@ struct llama_server_context { slot.n_past += 1; LOG_VERBOSE("slot decode token", { - {"slot_id", slot.id}, - {"task_id", slot.task_id}, + {"id_slot", slot.id}, + {"id_task", slot.id_task}, {"n_ctx", n_ctx}, {"n_past", slot.n_past}, {"n_system_tokens", system_tokens.size()}, @@ -1533,8 +1539,8 @@ struct llama_server_context { } LOG_INFO("slot progression", { - { "slot_id", slot.id }, - { "task_id", slot.task_id }, + { "id_slot", slot.id }, + { "id_task", slot.id_task }, { "n_past", slot.n_past }, { "n_past_se", slot.n_past_se }, { "ga_i", slot.ga_i }, @@ -1548,12 +1554,12 @@ struct llama_server_context { { // we have to evaluate at least 1 token to generate logits. LOG_INFO("we have to evaluate at least 1 token to generate logits", { - { "slot_id", slot.id }, - { "task_id", slot.task_id } + { "id_slot", slot.id }, + { "id_task", slot.id_task } }); + slot.n_past--; - if (slot.ga_i > 0) - { + if (slot.ga_i > 0) { slot.n_past_se--; } } @@ -1562,8 +1568,8 @@ struct llama_server_context { llama_kv_cache_seq_rm(ctx, slot.id, p0, -1); LOG_INFO("kv cache rm [p0, end)", { - { "slot_id", slot.id }, - { "task_id", slot.task_id }, + { "id_slot", slot.id }, + { "id_task", slot.id_task }, { "p0", p0 } }); @@ -2389,13 +2395,12 @@ static void server_params_parse(int argc, char ** argv, server_params & sparams, /* llama.cpp completion api semantics */ static json format_partial_response( - llama_server_context &llama, server_slot *slot, const std::string &content, const std::vector &probs + llama_server_context & llama, server_slot * slot, const std::string & content, const std::vector & probs ) { - json res = json - { + json res = json { {"content", content}, {"stop", false}, - {"slot_id", slot->id}, + {"id_slot", slot->id}, {"multimodal", false}, }; @@ -2527,7 +2532,7 @@ int main(int argc, char ** argv) { task_server task; task.id = llama.queue_tasks.get_new_id(); task.type = TASK_TYPE_METRICS; - task.target_id = -1; + task.id_target = -1; llama.queue_results.add_waiting_task_id(task.id); llama.queue_tasks.post(task); @@ -2574,7 +2579,7 @@ int main(int argc, char ** argv) { task_server task; task.id = llama.queue_tasks.get_new_id(); task.type = TASK_TYPE_METRICS; - task.target_id = -1; + task.id_target = -1; llama.queue_results.add_waiting_task_id(task.id); llama.queue_tasks.post(task); @@ -2594,7 +2599,7 @@ int main(int argc, char ** argv) { task_server task; task.id = llama.queue_tasks.get_new_id(); task.type = TASK_TYPE_METRICS; - task.target_id = -1; + task.id_target = -1; llama.queue_results.add_waiting_task_id(task.id); llama.queue_tasks.post(task); @@ -2822,12 +2827,12 @@ int main(int argc, char ** argv) { return; } json data = 
json::parse(req.body); - const int task_id = llama.queue_tasks.get_new_id(); - llama.queue_results.add_waiting_task_id(task_id); - llama.request_completion(task_id, data, false, false, -1); + const int id_task = llama.queue_tasks.get_new_id(); + llama.queue_results.add_waiting_task_id(id_task); + llama.request_completion(id_task, -1, data, false, false); if (!json_value(data, "stream", false)) { std::string completion_text; - task_result result = llama.queue_results.recv(task_id); + task_result result = llama.queue_results.recv(id_task); if (!result.error && result.stop) { res.set_content(result.result_json.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8"); } @@ -2836,13 +2841,13 @@ int main(int argc, char ** argv) { res.status = 404; res.set_content(result.result_json["content"], "text/plain; charset=utf-8"); } - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); } else { - const auto chunked_content_provider = [task_id, &llama](size_t, httplib::DataSink & sink) + const auto chunked_content_provider = [id_task, &llama](size_t, httplib::DataSink & sink) { while (true) { - task_result result = llama.queue_results.recv(task_id); + task_result result = llama.queue_results.recv(id_task); if (!result.error) { const std::string str = "data: " + @@ -2853,7 +2858,7 @@ int main(int argc, char ** argv) { }); if (!sink.write(str.c_str(), str.size())) { - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); return false; } if (result.stop) { @@ -2869,23 +2874,23 @@ int main(int argc, char ** argv) { }); if (!sink.write(str.c_str(), str.size())) { - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); return false; } break; } } - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); sink.done(); return true; }; - auto on_complete = [task_id, &llama] (bool) + auto on_complete = [id_task, &llama] (bool) { // cancel - llama.request_cancel(task_id); - llama.queue_results.remove_waiting_task_id(task_id); + llama.request_cancel(id_task); + llama.queue_results.remove_waiting_task_id(id_task); }; res.set_chunked_content_provider("text/event-stream", chunked_content_provider, on_complete); @@ -2921,13 +2926,13 @@ int main(int argc, char ** argv) { } json data = oaicompat_completion_params_parse(llama.model, json::parse(req.body), sparams.chat_template); - const int task_id = llama.queue_tasks.get_new_id(); - llama.queue_results.add_waiting_task_id(task_id); - llama.request_completion(task_id, data, false, false, -1); + const int id_task = llama.queue_tasks.get_new_id(); + llama.queue_results.add_waiting_task_id(id_task); + llama.request_completion(id_task, -1, data, false, false); if (!json_value(data, "stream", false)) { std::string completion_text; - task_result result = llama.queue_results.recv(task_id); + task_result result = llama.queue_results.recv(id_task); if (!result.error && result.stop) { json oaicompat_result = format_final_response_oaicompat(data, result); @@ -2939,11 +2944,11 @@ int main(int argc, char ** argv) { res.status = 500; res.set_content(result.result_json["content"], "text/plain; charset=utf-8"); } - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); } else { - const auto chunked_content_provider = [task_id, &llama](size_t, httplib::DataSink &sink) { + const auto 
chunked_content_provider = [id_task, &llama](size_t, httplib::DataSink &sink) { while (true) { - task_result llama_result = llama.queue_results.recv(task_id); + task_result llama_result = llama.queue_results.recv(id_task); if (!llama_result.error) { std::vector result_array = format_partial_response_oaicompat( llama_result); @@ -2956,7 +2961,7 @@ int main(int argc, char ** argv) { "\n\n"; LOG_VERBOSE("data stream", {{"to_send", str}}); if (!sink.write(str.c_str(), str.size())) { - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); return false; } } @@ -2972,21 +2977,21 @@ int main(int argc, char ** argv) { "\n\n"; LOG_VERBOSE("data stream", {{"to_send", str}}); if (!sink.write(str.c_str(), str.size())) { - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); return false; } break; } } sink.done(); - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); return true; }; - auto on_complete = [task_id, &llama](bool) { + auto on_complete = [id_task, &llama](bool) { // cancel request - llama.request_cancel(task_id); - llama.queue_results.remove_waiting_task_id(task_id); + llama.request_cancel(id_task); + llama.queue_results.remove_waiting_task_id(id_task); }; res.set_chunked_content_provider("text/event-stream", chunked_content_provider, on_complete); @@ -3003,12 +3008,12 @@ int main(int argc, char ** argv) { return; } json data = json::parse(req.body); - const int task_id = llama.queue_tasks.get_new_id(); - llama.queue_results.add_waiting_task_id(task_id); - llama.request_completion(task_id, data, true, false, -1); + const int id_task = llama.queue_tasks.get_new_id(); + llama.queue_results.add_waiting_task_id(id_task); + llama.request_completion(id_task, -1, data, true, false); if (!json_value(data, "stream", false)) { std::string completion_text; - task_result result = llama.queue_results.recv(task_id); + task_result result = llama.queue_results.recv(id_task); if (!result.error && result.stop) { res.set_content(result.result_json.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8"); @@ -3018,12 +3023,12 @@ int main(int argc, char ** argv) { res.status = 404; res.set_content(result.result_json["content"], "text/plain; charset=utf-8"); } - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); } else { - const auto chunked_content_provider = [task_id, &llama](size_t, httplib::DataSink & sink) { + const auto chunked_content_provider = [id_task, &llama](size_t, httplib::DataSink & sink) { while (true) { - task_result result = llama.queue_results.recv(task_id); + task_result result = llama.queue_results.recv(id_task); if (!result.error) { const std::string str = "data: " + @@ -3034,7 +3039,7 @@ int main(int argc, char ** argv) { }); if (!sink.write(str.c_str(), str.size())) { - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); return false; } if (result.stop) @@ -3048,15 +3053,15 @@ int main(int argc, char ** argv) { } } - llama.queue_results.remove_waiting_task_id(task_id); + llama.queue_results.remove_waiting_task_id(id_task); sink.done(); return true; }; - auto on_complete = [task_id, &llama] (bool) + auto on_complete = [id_task, &llama] (bool) { // cancel - llama.request_cancel(task_id); + llama.request_cancel(id_task); }; res.set_chunked_content_provider("text/event-stream", 
chunked_content_provider, on_complete); @@ -3109,13 +3114,13 @@ int main(int argc, char ** argv) { } // create and queue the task - const int task_id = llama.queue_tasks.get_new_id(); - llama.queue_results.add_waiting_task_id(task_id); - llama.request_completion(task_id, { {"prompt", prompt}, { "n_predict", 0} }, false, true, -1); + const int id_task = llama.queue_tasks.get_new_id(); + llama.queue_results.add_waiting_task_id(id_task); + llama.request_completion(id_task, -1, { {"prompt", prompt}, { "n_predict", 0} }, false, true); // get the result - task_result result = llama.queue_results.recv(task_id); - llama.queue_results.remove_waiting_task_id(task_id); + task_result result = llama.queue_results.recv(id_task); + llama.queue_results.remove_waiting_task_id(id_task); // send the result return res.set_content(result.result_json.dump(), "application/json; charset=utf-8"); @@ -3135,13 +3140,13 @@ int main(int argc, char ** argv) { json data = json::array(); int i = 0; for (const json &elem : prompt) { - const int task_id = llama.queue_tasks.get_new_id(); - llama.queue_results.add_waiting_task_id(task_id); - llama.request_completion(task_id, { {"prompt", elem}, { "n_predict", 0} }, false, true, -1); + const int id_task = llama.queue_tasks.get_new_id(); + llama.queue_results.add_waiting_task_id(id_task); + llama.request_completion(id_task, -1, { {"prompt", elem}, { "n_predict", 0} }, false, true); // get the result - task_result result = llama.queue_results.recv(task_id); - llama.queue_results.remove_waiting_task_id(task_id); + task_result result = llama.queue_results.recv(id_task); + llama.queue_results.remove_waiting_task_id(id_task); json embedding = json{ {"embedding", json_value(result.result_json, "embedding", json::array())}, @@ -3160,13 +3165,13 @@ int main(int argc, char ** argv) { } // create and queue the task - const int task_id = llama.queue_tasks.get_new_id(); - llama.queue_results.add_waiting_task_id(task_id); - llama.request_completion(task_id, { {"prompt", prompt}, { "n_predict", 0}}, false, true, -1); + const int id_task = llama.queue_tasks.get_new_id(); + llama.queue_results.add_waiting_task_id(id_task); + llama.request_completion(id_task, -1, { {"prompt", prompt}, { "n_predict", 0}}, false, true); // get the result - task_result result = llama.queue_results.recv(task_id); - llama.queue_results.remove_waiting_task_id(task_id); + task_result result = llama.queue_results.recv(id_task); + llama.queue_results.remove_waiting_task_id(id_task); json data = json::array({json{ {"embedding", json_value(result.result_json, "embedding", json::array())}, diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp index 9760d035c..89cd5bcb7 100644 --- a/examples/server/utils.hpp +++ b/examples/server/utils.hpp @@ -51,33 +51,37 @@ enum task_type { }; struct task_server { - int id = -1; // to be filled by llama_server_queue - int target_id; + int id = -1; // to be filled by llama_server_queue + int id_multi = -1; + int id_target = -1; + task_type type; json data; - bool infill_mode = false; + + bool infill_mode = false; bool embedding_mode = false; - int multitask_id = -1; }; struct task_result { - int id; - int multitask_id = -1; + int id = -1; + int id_multi = -1; + + json result_json; + bool stop; bool error; - json result_json; }; struct task_multi { - int id; - std::set subtasks_remaining{}; - std::vector results{}; + int id = -1; + + std::set subtasks_remaining; + std::vector results; }; // completion token output with probabilities struct completion_token_output { - struct 
token_prob - { + struct token_prob { llama_token tok; float prob; }; @@ -97,23 +101,23 @@ static inline void server_log(const char *level, const char *function, int line, std::stringstream ss_tid; ss_tid << std::this_thread::get_id(); json log = nlohmann::ordered_json{ - {"tid", ss_tid.str()}, + {"tid", ss_tid.str()}, {"timestamp", time(nullptr)}, }; if (server_log_json) { - log.merge_patch( - { - {"level", level}, - {"function", function}, - {"line", line}, - {"msg", message}, - }); + log.merge_patch( { + {"level", level}, + {"function", function}, + {"line", line}, + {"msg", message}, + }); + if (!extra.empty()) { log.merge_patch(extra); } - std::cout << log.dump(-1, ' ', false, json::error_handler_t::replace) << "\n" << std::flush; + printf("%s\n", log.dump(-1, ' ', false, json::error_handler_t::replace).c_str()); } else { char buf[1024]; snprintf(buf, 1024, "%4s [%24s] %s", level, function, message); @@ -163,7 +167,7 @@ inline std::string format_chat(const struct llama_model * model, const std::stri std::vector chat(messages.size()); for (size_t i = 0; i < messages.size(); ++i) { - auto &curr_msg = messages[i]; + const auto & curr_msg = messages[i]; str[i*2 + 0] = json_value(curr_msg, "role", std::string("")); str[i*2 + 1] = json_value(curr_msg, "content", std::string("")); alloc_size += str[i*2 + 1].length(); @@ -195,8 +199,10 @@ inline std::string format_chat(const struct llama_model * model, const std::stri struct llama_server_queue { int id = 0; - std::mutex mutex_tasks; bool running; + + std::mutex mutex_tasks; + // queues std::vector queue_tasks; std::vector queue_tasks_deferred; @@ -235,17 +241,17 @@ struct llama_server_queue { // Register function to process a new task void on_new_task(std::function callback) { - callback_new_task = callback; + callback_new_task = std::move(callback); } // Register function to process a multitask when it is finished void on_finish_multitask(std::function callback) { - callback_finish_multitask = callback; + callback_finish_multitask = std::move(callback); } // Register the function to be called when all slots data is ready to be processed void on_run_slots(std::function callback) { - callback_run_slots = callback; + callback_run_slots = std::move(callback); } // Call when the state of one slot is changed @@ -259,8 +265,7 @@ struct llama_server_queue { } // end the start_loop routine - void terminate() { - { + void terminate() { { std::unique_lock lock(mutex_tasks); running = false; } @@ -276,46 +281,46 @@ struct llama_server_queue { */ void start_loop() { running = true; + while (true) { LOG_VERBOSE("new task may arrive", {}); + + while (true) { - while (true) - { - std::unique_lock lock(mutex_tasks); - if (queue_tasks.empty()) { - lock.unlock(); - break; - } - task_server task = queue_tasks.front(); - queue_tasks.erase(queue_tasks.begin()); + std::unique_lock lock(mutex_tasks); + if (queue_tasks.empty()) { lock.unlock(); - LOG_VERBOSE("callback_new_task", {{"task_id", task.id}}); - callback_new_task(task); + break; } - LOG_VERBOSE("update_multitasks", {}); - // check if we have any finished multitasks - auto queue_iterator = queue_multitasks.begin(); - while (queue_iterator != queue_multitasks.end()) - { - if (queue_iterator->subtasks_remaining.empty()) - { - // all subtasks done == multitask is done - task_multi current_multitask = *queue_iterator; - callback_finish_multitask(current_multitask); - // remove this multitask - queue_iterator = queue_multitasks.erase(queue_iterator); - } - else - { - ++queue_iterator; - } - } - // all tasks in 
the current loop is processed, slots data is now ready - LOG_VERBOSE("callback_run_slots", {}); - callback_run_slots(); + task_server task = queue_tasks.front(); + queue_tasks.erase(queue_tasks.begin()); + lock.unlock(); + LOG_VERBOSE("callback_new_task", {{"id_task", task.id}}); + callback_new_task(task); } + + LOG_VERBOSE("update_multitasks", {}); + + // check if we have any finished multitasks + auto queue_iterator = queue_multitasks.begin(); + while (queue_iterator != queue_multitasks.end()) { + if (queue_iterator->subtasks_remaining.empty()) { + // all subtasks done == multitask is done + task_multi current_multitask = *queue_iterator; + callback_finish_multitask(current_multitask); + // remove this multitask + queue_iterator = queue_multitasks.erase(queue_iterator); + } else { + ++queue_iterator; + } + } + + // all tasks in the current loop is processed, slots data is now ready + LOG_VERBOSE("callback_run_slots", {}); + + callback_run_slots(); + LOG_VERBOSE("wait for new task", {}); - // wait for new task { std::unique_lock lock(mutex_tasks); if (queue_tasks.empty()) { @@ -336,22 +341,21 @@ struct llama_server_queue { // // add a multitask by specifying the id of all subtask (subtask is a task_server) - void add_multitask(int multitask_id, std::vector& sub_ids) - { + void add_multitask(int id_multi, std::vector& sub_ids) { std::lock_guard lock(mutex_tasks); task_multi multi; - multi.id = multitask_id; + multi.id = id_multi; std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end())); queue_multitasks.push_back(multi); } // updatethe remaining subtasks, while appending results to multitask - void update_multitask(int multitask_id, int subtask_id, task_result& result) + void update_multitask(int id_multi, int subtask_id, task_result& result) { std::lock_guard lock(mutex_tasks); for (auto& multitask : queue_multitasks) { - if (multitask.id == multitask_id) + if (multitask.id == id_multi) { multitask.subtasks_remaining.erase(subtask_id); multitask.results.push_back(result); @@ -363,41 +367,43 @@ struct llama_server_queue { struct llama_server_response { typedef std::function callback_multitask_t; callback_multitask_t callback_update_multitask; + // for keeping track of all tasks waiting for the result std::set waiting_task_ids; + // the main result queue std::vector queue_results; + std::mutex mutex_results; std::condition_variable condition_results; - // add the task_id to the list of tasks waiting for response - void add_waiting_task_id(int task_id) { - LOG_VERBOSE("waiting for task id", {{"task_id", task_id}}); + // add the id_task to the list of tasks waiting for response + void add_waiting_task_id(int id_task) { + LOG_VERBOSE("waiting for task id", {{"id_task", id_task}}); + std::unique_lock lock(mutex_results); - waiting_task_ids.insert(task_id); + waiting_task_ids.insert(id_task); } // when the request is finished, we can remove task associated with it - void remove_waiting_task_id(int task_id) { - LOG_VERBOSE("remove waiting for task id", {{"task_id", task_id}}); + void remove_waiting_task_id(int id_task) { + LOG_VERBOSE("remove waiting for task id", {{"id_task", id_task}}); + std::unique_lock lock(mutex_results); - waiting_task_ids.erase(task_id); + waiting_task_ids.erase(id_task); } - // This function blocks the thread until there is a response for this task_id - task_result recv(int task_id) { - while (true) - { + // This function blocks the thread until there is a response for this id_task + task_result recv(int id_task) 
{ + while (true) { std::unique_lock lock(mutex_results); condition_results.wait(lock, [&]{ return !queue_results.empty(); }); - for (int i = 0; i < (int) queue_results.size(); i++) - { - if (queue_results[i].id == task_id) - { - assert(queue_results[i].multitask_id == -1); + for (int i = 0; i < (int) queue_results.size(); i++) { + if (queue_results[i].id == id_task) { + assert(queue_results[i].id_multi == -1); task_result res = queue_results[i]; queue_results.erase(queue_results.begin() + i); return res; @@ -410,26 +416,25 @@ struct llama_server_response { // Register the function to update multitask void on_multitask_update(callback_multitask_t callback) { - callback_update_multitask = callback; + callback_update_multitask = std::move(callback); } - // Send a new result to a waiting task_id + // Send a new result to a waiting id_task void send(task_result result) { + LOG_VERBOSE("send new result", {{"id_task", result.id}}); + std::unique_lock lock(mutex_results); - LOG_VERBOSE("send new result", {{"task_id", result.id}}); - for (auto& task_id : waiting_task_ids) { - // LOG_TEE("waiting task id %i \n", task_id); + for (const auto & id_task : waiting_task_ids) { + // LOG_TEE("waiting task id %i \n", id_task); // for now, tasks that have associated parent multitasks just get erased once multitask picks up the result - if (result.multitask_id == task_id) - { - LOG_VERBOSE("callback_update_multitask", {{"task_id", task_id}}); - callback_update_multitask(task_id, result.id, result); + if (result.id_multi == id_task) { + LOG_VERBOSE("callback_update_multitask", {{"id_task", id_task}}); + callback_update_multitask(id_task, result.id, result); continue; } - if (result.id == task_id) - { - LOG_VERBOSE("queue_results.push_back", {{"task_id", task_id}}); + if (result.id == id_task) { + LOG_VERBOSE("queue_results.push_back", {{"id_task", id_task}}); queue_results.push_back(result); condition_results.notify_all(); return; @@ -447,13 +452,11 @@ static const std::string base64_chars = "abcdefghijklmnopqrstuvwxyz" "0123456789+/"; -static inline bool is_base64(uint8_t c) -{ +static inline bool is_base64(uint8_t c) { return (isalnum(c) || (c == '+') || (c == '/')); } -static inline std::vector base64_decode(const std::string & encoded_string) -{ +static inline std::vector base64_decode(const std::string & encoded_string) { int i = 0; int j = 0; int in_ = 0; @@ -516,8 +519,7 @@ static inline std::vector base64_decode(const std::string & encoded_str // random string / id // -static std::string random_string() -{ +static std::string random_string() { static const std::string str("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"); std::random_device rd; @@ -532,10 +534,10 @@ static std::string random_string() return result; } -static std::string gen_chatcmplid() -{ +static std::string gen_chatcmplid() { std::stringstream chatcmplid; chatcmplid << "chatcmpl-" << random_string(); + return chatcmplid.str(); } @@ -543,91 +545,79 @@ static std::string gen_chatcmplid() // other common utils // -static size_t common_part(const std::vector &a, const std::vector &b) -{ +static size_t common_part(const std::vector & a, const std::vector & b) { size_t i; - for (i = 0; i < a.size() && i < b.size() && a[i] == b[i]; i++) - { - } + for (i = 0; i < a.size() && i < b.size() && a[i] == b[i]; i++) {} + return i; } -static bool ends_with(const std::string &str, const std::string &suffix) -{ - return str.size() >= suffix.size() && - 0 == str.compare(str.size() - suffix.size(), suffix.size(), suffix); +static bool 
ends_with(const std::string & str, const std::string & suffix) { + return str.size() >= suffix.size() && 0 == str.compare(str.size() - suffix.size(), suffix.size(), suffix); } -static size_t find_partial_stop_string(const std::string &stop, - const std::string &text) -{ - if (!text.empty() && !stop.empty()) - { +static size_t find_partial_stop_string(const std::string &stop, const std::string &text) { + if (!text.empty() && !stop.empty()) { const char text_last_char = text.back(); - for (int64_t char_index = stop.size() - 1; char_index >= 0; char_index--) - { - if (stop[char_index] == text_last_char) - { + for (int64_t char_index = stop.size() - 1; char_index >= 0; char_index--) { + if (stop[char_index] == text_last_char) { const std::string current_partial = stop.substr(0, char_index + 1); - if (ends_with(text, current_partial)) - { + if (ends_with(text, current_partial)) { return text.size() - char_index - 1; } } } } + return std::string::npos; } // TODO: reuse llama_detokenize template -static std::string tokens_to_str(llama_context *ctx, Iter begin, Iter end) -{ +static std::string tokens_to_str(llama_context * ctx, Iter begin, Iter end) { std::string ret; - for (; begin != end; ++begin) - { + for (; begin != end; ++begin) { ret += llama_token_to_piece(ctx, *begin); } + return ret; } // format incomplete utf-8 multibyte character for output -static std::string tokens_to_output_formatted_string(const llama_context *ctx, const llama_token token) -{ +static std::string tokens_to_output_formatted_string(const llama_context * ctx, const llama_token token) { std::string out = token == -1 ? "" : llama_token_to_piece(ctx, token); // if the size is 1 and first bit is 1, meaning it's a partial character // (size > 1 meaning it's already a known token) - if (out.size() == 1 && (out[0] & 0x80) == 0x80) - { + if (out.size() == 1 && (out[0] & 0x80) == 0x80) { std::stringstream ss; ss << std::hex << (out[0] & 0xff); std::string res(ss.str()); out = "byte: \\x" + res; } + return out; } // convert a vector of completion_token_output to json -static json probs_vector_to_json(const llama_context *ctx, const std::vector &probs) -{ +static json probs_vector_to_json(const llama_context * ctx, const std::vector & probs) { json out = json::array(); - for (const auto &prob : probs) - { + for (const auto & prob : probs) { json probs_for_token = json::array(); - for (const auto &p : prob.probs) - { - std::string tok_str = tokens_to_output_formatted_string(ctx, p.tok); - probs_for_token.push_back(json - { + + for (const auto & p : prob.probs) { + const std::string tok_str = tokens_to_output_formatted_string(ctx, p.tok); + probs_for_token.push_back(json { {"tok_str", tok_str}, {"prob", p.prob}, }); } - std::string tok_str = tokens_to_output_formatted_string(ctx, prob.tok); - out.push_back(json{ + + const std::string tok_str = tokens_to_output_formatted_string(ctx, prob.tok); + out.push_back(json { {"content", tok_str}, {"probs", probs_for_token}, }); } + return out; }
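For reference, below is a minimal, self-contained sketch of the naming convention this patch settles on (id_task, id_multi, id_target on tasks, id_slot in logs) and the reordered request_completion() parameters, where the multitask id now follows the task id instead of trailing the argument list. The structs and the function body are simplified stand-ins for illustration only, not the actual server types from server.cpp / utils.hpp.

// Simplified sketch of the normalized id naming (not the real server code).
#include <cstdio>
#include <string>

struct task_server {          // stand-in for the struct in examples/server/utils.hpp
    int id        = -1;       // filled in by the task queue
    int id_multi  = -1;       // parent multitask id, -1 if none
    int id_target = -1;       // task targeted by e.g. a cancel request
};

struct server_slot {          // stand-in for the struct in examples/server/server.cpp
    int id       = 0;         // the slot's own id (logged as "id_slot")
    int id_task  = -1;        // task currently assigned to this slot
    int id_multi = -1;        // parent multitask of that task, if any
};

// New parameter order after the patch: (id_task, id_multi, data, infill, embedding).
static void request_completion(int id_task, int id_multi, const std::string & data, bool infill, bool embedding) {
    std::printf("completion: id_task=%d id_multi=%d infill=%d embedding=%d data=%s\n",
                id_task, id_multi, (int) infill, (int) embedding, data.c_str());
}

int main() {
    // single-prompt request: no parent multitask, so id_multi = -1
    request_completion(/*id_task=*/0, /*id_multi=*/-1, "{\"prompt\":\"hello\"}", false, false);

    server_slot slot;
    slot.id_task  = 0;
    slot.id_multi = -1;
    std::printf("slot: id_slot=%d id_task=%d id_multi=%d\n", slot.id, slot.id_task, slot.id_multi);

    return 0;
}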