diff --git a/examples/server/server.cpp b/examples/server/server.cpp
index 68153928e..e927b9c0d 100644
--- a/examples/server/server.cpp
+++ b/examples/server/server.cpp
@@ -25,7 +25,6 @@
 #include
 #include
-#include
 #include
 #include
 #include
@@ -328,10 +327,8 @@ struct llama_server_context
     // slots / clients
     std::vector<llama_client_slot> slots;
 
-    llama_server_queue<task_server> queue_tasks;
-    llama_server_response_event queue_results;
-    std::vector<task_multi> queue_multitasks;
-    std::mutex mutex_multitasks;
+    llama_server_queue    queue_tasks;
+    llama_server_response queue_results;
 
     ~llama_server_context()
     {
@@ -961,30 +958,6 @@ struct llama_server_context
         queue_results.send(res);
     }
 
-    void add_multitask(int id, std::vector<int>& sub_ids)
-    {
-        std::lock_guard<std::mutex> lock(mutex_multitasks);
-        task_multi multi;
-        multi.id = id;
-        std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end()));
-        queue_multitasks.push_back(multi);
-        // TODO @ngxson : Do we need to notify the queue_tasks?
-    }
-
-    void update_multitask(int multitask_id, int subtask_id, task_result& result)
-    {
-        std::lock_guard<std::mutex> lock(mutex_multitasks);
-        for (auto& multitask : queue_multitasks)
-        {
-            if (multitask.id == multitask_id)
-            {
-                multitask.subtasks_remaining.erase(subtask_id);
-                multitask.results.push_back(result);
-                // TODO @ngxson : Do we need to notify the queue_tasks?
-            }
-        }
-    }
-
     json get_model_props()
     {
         return get_formated_generation(slots[0]);
@@ -1120,7 +1093,7 @@ struct llama_server_context
         // parent multitask, if any, needs to be updated
         if (slot.multitask_id != -1)
         {
-            update_multitask(slot.multitask_id, slot.task_id, res);
+            queue_tasks.update_multitask(slot.multitask_id, slot.task_id, res);
         }
     }
 
@@ -1157,7 +1130,6 @@ struct llama_server_context
 
     int request_completion(json data, bool infill, bool embedding, int multitask_id)
     {
-        std::unique_lock<std::mutex> lock(mutex_multitasks);
         task_server task;
         task.target_id = 0;
         task.data = std::move(data);
@@ -1169,7 +1141,6 @@ struct llama_server_context
         // when a completion task's prompt array is not a singleton, we split it into multiple requests
         if (task.data.count("prompt") && task.data.at("prompt").size() > 1)
        {
-            lock.unlock(); // entering new func scope
             return split_multiprompt_task(task);
         }
 
@@ -1270,11 +1241,11 @@ struct llama_server_context
         }
 
         // queue up the multitask so we can track its subtask progression
-        add_multitask(multitask_id, subtask_ids);
+        queue_tasks.add_multitask(multitask_id, subtask_ids);
         return multitask_id;
     }
 
-    void process_single_task(task_server task)
+    void process_single_task(task_server& task)
     {
         switch (task.type)
         {
@@ -1283,7 +1254,7 @@ struct llama_server_context
             if (slot == nullptr)
             {
                 // if no slot is available, we defer this task for processing later
-                LOG_TEE("no slot\n");
+                LOG_VERBOSE("no slot is available", {});
                 queue_tasks.defer(task);
                 break;
             }
@@ -1333,42 +1304,23 @@ struct llama_server_context
         }
     }
 
-    void process_multitask()
+    void on_finish_multitask(task_multi& multitask)
     {
-        // remove finished multitasks from the queue of multitasks, and add the corresponding result to the result queue
-        std::vector<task_result> agg_results;
-        auto queue_iterator = queue_multitasks.begin();
-        while (queue_iterator != queue_multitasks.end())
+        // all subtasks done == multitask is done
+        task_result result;
+        result.id = multitask.id;
+        result.stop = true;
+        result.error = false;
+
+        // collect json results into one json result
+        std::vector<json> result_jsons;
+        for (auto& subres : multitask.results)
         {
-            if (queue_iterator->subtasks_remaining.empty())
-            {
-                // all subtasks done == multitask is done
-                task_result aggregate_result;
-                aggregate_result.id = queue_iterator->id;
-                aggregate_result.stop = true;
-                aggregate_result.error = false;
-
-                // collect json results into one json result
-                std::vector<json> result_jsons;
-                for (auto& subres : queue_iterator->results)
-                {
-                    result_jsons.push_back(subres.result_json);
-                    aggregate_result.error = aggregate_result.error && subres.error;
-                }
-                aggregate_result.result_json = json{ "results", result_jsons };
-                agg_results.push_back(aggregate_result);
-                queue_iterator = queue_multitasks.erase(queue_iterator);
-            }
-            else
-            {
-                ++queue_iterator;
-            }
-        }
-
-        // copy aggregate results of complete multi-tasks to the results queue
-        for (auto& res : agg_results) {
-            queue_results.send(res);
+            result_jsons.push_back(subres.result_json);
+            result.error = result.error && subres.error;
         }
+        result.result_json = json{ "results", result_jsons };
+        queue_results.send(result);
     }
 
     bool update_slots() {
@@ -1704,7 +1656,6 @@ struct llama_server_context
     }
 
     void run_on_all_tasks_finished() {
-        process_multitask();
         update_slots();
     }
 };
@@ -2861,16 +2812,18 @@ int main(int argc, char **argv)
 
     llama.queue_tasks.on_new_task(std::bind(
         &llama_server_context::process_single_task, &llama, std::placeholders::_1));
+    llama.queue_tasks.on_finish_multitask(std::bind(
+        &llama_server_context::on_finish_multitask, &llama, std::placeholders::_1));
     llama.queue_tasks.on_all_tasks_finished(std::bind(
        &llama_server_context::run_on_all_tasks_finished, &llama));
-    llama.queue_tasks.start_loop();
     llama.queue_results.on_multitask_update(std::bind(
-        &llama_server_context::update_multitask,
-        &llama,
+        &llama_server_queue::update_multitask,
+        &llama.queue_tasks,
         std::placeholders::_1,
         std::placeholders::_2,
         std::placeholders::_3
     ));
+    llama.queue_tasks.start_loop();
 
     t.join();
diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp
index b14898d68..80f57dc59 100644
--- a/examples/server/utils.hpp
+++ b/examples/server/utils.hpp
@@ -187,18 +187,21 @@ inline std::string format_chatml(std::vector<json> messages)
 // work queue utils
 //
 
-template <typename T>
 struct llama_server_queue {
     int id = 0;
     std::mutex mutex_tasks;
-    std::vector<T> queue_tasks;
-    std::vector<T> queue_tasks_deferred;
+    // queues
+    std::vector<task_server> queue_tasks;
+    std::vector<task_server> queue_tasks_deferred;
+    std::vector<task_multi> queue_multitasks;
     std::condition_variable condition_tasks;
-    std::function<void(T&)> callback_new_task;
+    // callback functions
+    std::function<void(task_server&)> callback_new_task;
+    std::function<void(task_multi&)> callback_finish_multitask;
     std::function<void(void)> callback_all_task_finished;
 
     // Add a new task to the end of the queue
-    int post(T task) {
+    int post(task_server task) {
         std::unique_lock<std::mutex> lock(mutex_tasks);
         task.id = id++;
         queue_tasks.push_back(std::move(task));
@@ -207,7 +210,7 @@ struct llama_server_queue {
     }
 
     // Add a new task, but defer until the next loop
-    void defer(T task) {
+    void defer(task_server task) {
         std::unique_lock<std::mutex> lock(mutex_tasks);
         queue_tasks_deferred.push_back(std::move(task));
     }
@@ -219,10 +222,15 @@ struct llama_server_queue {
     }
 
     // Register function to process a new task
-    void on_new_task(std::function<void(T&)> callback) {
+    void on_new_task(std::function<void(task_server&)> callback) {
         callback_new_task = callback;
     }
 
+    // Register function to process a multitask
+    void on_finish_multitask(std::function<void(task_multi&)> callback) {
+        callback_finish_multitask = callback;
+    }
+
     // Register the function to be called when the batch of tasks is finished
     void on_all_tasks_finished(std::function<void(void)> callback) {
         callback_all_task_finished = callback;
     }
@@ -257,6 +265,24 @@ struct llama_server_queue {
                     lock.unlock();
                 }
                 LOG_VERBOSE("callback_all_task_finished", {});
+                // process and update all the multitasks
+                auto queue_iterator = queue_multitasks.begin();
+                while (queue_iterator != queue_multitasks.end())
+                {
+                    if (queue_iterator->subtasks_remaining.empty())
+                    {
+                        // all subtasks done == multitask is done
+                        task_multi current_multitask = *queue_iterator;
+                        callback_finish_multitask(current_multitask);
+                        // remove this multitask
+                        queue_iterator = queue_multitasks.erase(queue_iterator);
+                    }
+                    else
+                    {
+                        ++queue_iterator;
+                    }
+                }
+                // all tasks in the current loop are finished
                 callback_all_task_finished();
             }
             LOG_VERBOSE("wait for new task", {});
@@ -271,13 +297,40 @@ struct llama_server_queue {
             }
         }
     }
+
+    //
+    // functions to manage multitasks
+    //
+
+    // add a multitask by specifying the ids of all its subtasks (a subtask is a task_server)
+    void add_multitask(int multitask_id, std::vector<int>& sub_ids)
+    {
+        std::lock_guard<std::mutex> lock(mutex_tasks);
+        task_multi multi;
+        multi.id = multitask_id;
+        std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end()));
+        queue_multitasks.push_back(multi);
+    }
+
+    // update the remaining subtasks, while appending results to the multitask
+    void update_multitask(int multitask_id, int subtask_id, task_result& result)
+    {
+        std::lock_guard<std::mutex> lock(mutex_tasks);
+        for (auto& multitask : queue_multitasks)
+        {
+            if (multitask.id == multitask_id)
+            {
+                multitask.subtasks_remaining.erase(subtask_id);
+                multitask.results.push_back(result);
+            }
+        }
+    }
 };
 
-struct llama_server_response_event {
+struct llama_server_response {
     typedef std::function<void(int, int, task_result&)> callback_multitask_t;
     callback_multitask_t callback_update_multitask;
     // for keeping track of all tasks waiting for the result
-    std::mutex mutex_task_ids;
     std::set<int> waiting_task_ids;
     // the main result queue
     std::vector<task_result> queue_results;
@@ -285,12 +338,12 @@ struct llama_server_response_event {
     std::condition_variable condition_results;
 
     void add_waiting_task_id(int task_id) {
-        std::unique_lock<std::mutex> lock(mutex_task_ids);
+        std::unique_lock<std::mutex> lock(mutex_results);
         waiting_task_ids.insert(task_id);
     }
 
     void remove_waiting_task_id(int task_id) {
-        std::unique_lock<std::mutex> lock(mutex_task_ids);
+        std::unique_lock<std::mutex> lock(mutex_results);
         waiting_task_ids.erase(task_id);
     }
@@ -327,7 +380,6 @@ struct llama_server_response_event {
     // Send a new result to a waiting task_id
     void send(task_result result) {
         std::unique_lock<std::mutex> lock(mutex_results);
-        std::unique_lock<std::mutex> lock1(mutex_task_ids);
         LOG_VERBOSE("send new result", {});
         for (auto& task_id : waiting_task_ids) {
             // LOG_TEE("waiting task id %i \n", task_id);
@@ -449,4 +501,4 @@ static std::string gen_chatcmplid()
     std::stringstream chatcmplid;
     chatcmplid << "chatcmpl-" << random_string();
     return chatcmplid.str();
-}
\ No newline at end of file
+}
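Below the patch, a minimal self-contained sketch of the multitask flow it introduces: subtask ids are registered up front, each finished subtask is recorded, and once none remain the queue loop fires a finish callback that aggregates the sub-results. The names used here (queue_sketch, result_t, multi_t, process_finished, on_finish) are illustrative stand-ins, not the server's real task_server / task_multi / task_result API, and the real implementation additionally locks mutex_tasks around every queue access.

// Sketch only: simplified stand-ins for the server's task types, not the real API.
#include <functional>
#include <iostream>
#include <set>
#include <string>
#include <vector>

struct result_t { int id; std::string json; };                                   // stand-in for task_result
struct multi_t  { int id; std::set<int> subtasks_remaining; std::vector<result_t> results; }; // stand-in for task_multi

struct queue_sketch {
    std::vector<multi_t> multitasks;
    std::function<void(multi_t&)> on_finish;   // plays the role of callback_finish_multitask

    // cf. llama_server_queue::add_multitask: remember which subtask ids belong to a multitask
    void add_multitask(int id, const std::vector<int>& sub_ids) {
        multi_t m;
        m.id = id;
        m.subtasks_remaining.insert(sub_ids.begin(), sub_ids.end());
        multitasks.push_back(std::move(m));
    }

    // cf. llama_server_queue::update_multitask: store one subtask result and mark it done
    void update_multitask(int id, int sub_id, result_t res) {
        for (auto& m : multitasks) {
            if (m.id == id) {
                m.subtasks_remaining.erase(sub_id);
                m.results.push_back(std::move(res));
            }
        }
    }

    // cf. the block added to start_loop(): fire the callback for every finished multitask
    void process_finished() {
        for (auto it = multitasks.begin(); it != multitasks.end();) {
            if (it->subtasks_remaining.empty()) {
                on_finish(*it);
                it = multitasks.erase(it);
            } else {
                ++it;
            }
        }
    }
};

int main() {
    queue_sketch q;
    q.on_finish = [](multi_t& m) {
        // in the server this role is played by llama_server_context::on_finish_multitask,
        // which merges the sub-results into one task_result and sends it
        std::cout << "multitask " << m.id << " done, " << m.results.size() << " sub-results\n";
    };

    q.add_multitask(100, {1, 2});          // e.g. a prompt array split into two subtasks
    q.update_multitask(100, 1, {1, "{}"});
    q.process_finished();                  // prints nothing: subtask 2 still pending
    q.update_multitask(100, 2, {2, "{}"});
    q.process_finished();                  // prints: multitask 100 done, 2 sub-results
    return 0;
}

The erase-while-iterating scan mirrors what start_loop() now runs at the end of each iteration, so aggregation happens on the queue thread instead of under the old mutex_multitasks inside llama_server_context.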