server: move all mutexes away from server.cpp
parent 58fe9cf572
commit d87b48fd55
2 changed files with 90 additions and 85 deletions
server.cpp

@@ -25,7 +25,6 @@
 #include <cstddef>
 #include <thread>
-#include <mutex>
 #include <chrono>
 #include <condition_variable>
 #include <atomic>

@@ -328,10 +327,8 @@ struct llama_server_context
     // slots / clients
     std::vector<llama_client_slot> slots;
 
-    llama_server_queue<task_server> queue_tasks;
-    llama_server_response_event queue_results;
-    std::vector<task_multi> queue_multitasks;
-    std::mutex mutex_multitasks;
+    llama_server_queue queue_tasks;
+    llama_server_response queue_results;
 
     ~llama_server_context()
     {
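The two members that replace the old four are now self-contained: each component owns the mutex that guards its own state, so llama_server_context never locks anything directly. Below is a minimal sketch of that ownership pattern; the types are simplified stand-ins, not the real task_server or llama_server_queue:

```cpp
#include <mutex>
#include <vector>

// simplified stand-in for task_server
struct task {
    int id = -1;
};

// the component owns both the data and the mutex guarding it
struct self_locking_queue {
    std::mutex mutex_tasks;
    std::vector<task> tasks;
    int next_id = 0;

    // thread-safe from any caller: locking is an internal detail
    int post(task t) {
        std::lock_guard<std::mutex> lock(mutex_tasks);
        t.id = next_id++;
        tasks.push_back(std::move(t));
        return tasks.back().id;
    }
};

int main() {
    self_locking_queue q;
    return q.post(task{}) == 0 ? 0 : 1;  // first posted task gets id 0
}
```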
@@ -961,30 +958,6 @@ struct llama_server_context
         queue_results.send(res);
     }
 
-    void add_multitask(int id, std::vector<int>& sub_ids)
-    {
-        std::lock_guard<std::mutex> lock(mutex_multitasks);
-        task_multi multi;
-        multi.id = id;
-        std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end()));
-        queue_multitasks.push_back(multi);
-        // TODO @ngxson : Do we need to notify the queue_tasks?
-    }
-
-    void update_multitask(int multitask_id, int subtask_id, task_result& result)
-    {
-        std::lock_guard<std::mutex> lock(mutex_multitasks);
-        for (auto& multitask : queue_multitasks)
-        {
-            if (multitask.id == multitask_id)
-            {
-                multitask.subtasks_remaining.erase(subtask_id);
-                multitask.results.push_back(result);
-                // TODO @ngxson : Do we need to notify the queue_tasks?
-            }
-        }
-    }
-
     json get_model_props()
     {
         return get_formated_generation(slots[0]);
@@ -1120,7 +1093,7 @@ struct llama_server_context
         // parent multitask, if any, needs to be updated
         if (slot.multitask_id != -1)
         {
-            update_multitask(slot.multitask_id, slot.task_id, res);
+            queue_tasks.update_multitask(slot.multitask_id, slot.task_id, res);
         }
     }
 
@@ -1157,7 +1130,6 @@ struct llama_server_context
 
     int request_completion(json data, bool infill, bool embedding, int multitask_id)
     {
-        std::unique_lock<std::mutex> lock(mutex_multitasks);
         task_server task;
         task.target_id = 0;
         task.data = std::move(data);
@@ -1169,7 +1141,6 @@ struct llama_server_context
         // when a completion task's prompt array is not a singleton, we split it into multiple requests
         if (task.data.count("prompt") && task.data.at("prompt").size() > 1)
         {
-            lock.unlock(); // entering new func scope
             return split_multiprompt_task(task);
         }
 
@@ -1270,11 +1241,11 @@ struct llama_server_context
         }
 
         // queue up the multitask so we can track its subtask progression
-        add_multitask(multitask_id, subtask_ids);
+        queue_tasks.add_multitask(multitask_id, subtask_ids);
         return multitask_id;
     }
 
-    void process_single_task(task_server task)
+    void process_single_task(task_server& task)
     {
         switch (task.type)
         {
@@ -1283,7 +1254,7 @@ struct llama_server_context
                 if (slot == nullptr)
                 {
                     // if no slot is available, we defer this task for processing later
-                    LOG_TEE("no slot\n");
+                    LOG_VERBOSE("no slot is available", {});
                     queue_tasks.defer(task);
                     break;
                 }
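The defer path parks a task when no slot can take it and re-injects it on a later pass of the queue loop. A small sketch of that pattern, with hypothetical simplified types in place of task_server and the slot bookkeeping:

```cpp
#include <deque>
#include <iostream>

// simplified stand-in for task_server
struct task { int id; };

struct worker {
    std::deque<task> queue;     // tasks for the current pass
    std::deque<task> deferred;  // tasks parked until the next pass
    bool slot_free = false;

    void process(task t) {
        if (!slot_free) {
            // same idea as queue_tasks.defer(task): park it, try again later
            deferred.push_back(t);
            return;
        }
        std::cout << "processed task " << t.id << "\n";
    }

    // at the start of the next loop pass, re-inject everything deferred
    void start_next_pass() {
        queue.insert(queue.end(), deferred.begin(), deferred.end());
        deferred.clear();
    }
};

int main() {
    worker w;
    w.process({1});      // no slot yet, so the task is deferred
    w.slot_free = true;  // a slot opened up
    w.start_next_pass();
    while (!w.queue.empty()) {
        w.process(w.queue.front());
        w.queue.pop_front();
    }
}
```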
@@ -1333,42 +1304,23 @@ struct llama_server_context
         }
     }
 
-    void process_multitask()
+    void on_finish_multitask(task_multi& multitask)
     {
-        // remove finished multitasks from the queue of multitasks, and add the corresponding result to the result queue
-        std::vector<task_result> agg_results;
-        auto queue_iterator = queue_multitasks.begin();
-        while (queue_iterator != queue_multitasks.end())
-        {
-            if (queue_iterator->subtasks_remaining.empty())
-            {
-                // all subtasks done == multitask is done
-                task_result aggregate_result;
-                aggregate_result.id = queue_iterator->id;
-                aggregate_result.stop = true;
-                aggregate_result.error = false;
-
-                // collect json results into one json result
-                std::vector<json> result_jsons;
-                for (auto& subres : queue_iterator->results)
-                {
-                    result_jsons.push_back(subres.result_json);
-                    aggregate_result.error = aggregate_result.error && subres.error;
-                }
-                aggregate_result.result_json = json{ "results", result_jsons };
-                agg_results.push_back(aggregate_result);
-                queue_iterator = queue_multitasks.erase(queue_iterator);
-            }
-            else
-            {
-                ++queue_iterator;
-            }
-        }
-
-        // copy aggregate results of complete multi-tasks to the results queue
-        for (auto& res : agg_results) {
-            queue_results.send(res);
-        }
+        // all subtasks done == multitask is done
+        task_result result;
+        result.id = multitask.id;
+        result.stop = true;
+        result.error = false;
+
+        // collect json results into one json result
+        std::vector<json> result_jsons;
+        for (auto& subres : multitask.results)
+        {
+            result_jsons.push_back(subres.result_json);
+            result.error = result.error && subres.error;
+        }
+        result.result_json = json{ "results", result_jsons };
+        queue_results.send(result);
     }
 
     bool update_slots() {
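on_finish_multitask folds the per-subtask payloads into one response for the waiting client. A sketch of that aggregation step, assuming nlohmann::json (the library behind the server's json alias) and a simplified result type; the error-flag handling here is illustrative, not the server's exact semantics:

```cpp
#include <iostream>
#include <vector>
#include <nlohmann/json.hpp>

using json = nlohmann::json;

// simplified stand-in for task_result
struct sub_result {
    json result_json;
    bool error = false;
};

int main() {
    // pretend two subtasks of one multitask have finished
    std::vector<sub_result> results = {
        { json{{"content", "first"}},  false },
        { json{{"content", "second"}}, false },
    };

    // fold every per-subtask payload into a single response
    std::vector<json> result_jsons;
    bool all_error = true;
    for (auto& subres : results) {
        result_jsons.push_back(subres.result_json);
        all_error = all_error && subres.error;  // error only if every subtask errored
    }

    json combined = {{"results", result_jsons}};
    std::cout << combined.dump(2) << "\n"
              << (all_error ? "error" : "ok") << std::endl;
}
```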
@@ -1704,7 +1656,6 @@ struct llama_server_context
     }
 
     void run_on_all_tasks_finished() {
-        process_multitask();
         update_slots();
     }
 };
@@ -2861,16 +2812,18 @@ int main(int argc, char **argv)
 
     llama.queue_tasks.on_new_task(std::bind(
         &llama_server_context::process_single_task, &llama, std::placeholders::_1));
+    llama.queue_tasks.on_finish_multitask(std::bind(
+        &llama_server_context::on_finish_multitask, &llama, std::placeholders::_1));
     llama.queue_tasks.on_all_tasks_finished(std::bind(
         &llama_server_context::run_on_all_tasks_finished, &llama));
-    llama.queue_tasks.start_loop();
     llama.queue_results.on_multitask_update(std::bind(
-        &llama_server_context::update_multitask,
-        &llama,
+        &llama_server_queue::update_multitask,
+        &llama.queue_tasks,
         std::placeholders::_1,
         std::placeholders::_2,
        std::placeholders::_3
     ));
+    llama.queue_tasks.start_loop();
 
     t.join();
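The wiring above adapts member functions to the std::function signatures the queue expects. A self-contained sketch of the same std::bind pattern with simplified types; a capturing lambda is an equivalent alternative:

```cpp
#include <functional>
#include <iostream>

struct task { int id; };

struct context {
    void process(task& t) { std::cout << "task " << t.id << "\n"; }
};

struct queue {
    std::function<void(task&)> callback_new_task;
    void on_new_task(std::function<void(task&)> cb) { callback_new_task = cb; }
};

int main() {
    context ctx;
    queue q;

    // same shape as: llama.queue_tasks.on_new_task(std::bind(
    //     &llama_server_context::process_single_task, &llama, std::placeholders::_1));
    q.on_new_task(std::bind(&context::process, &ctx, std::placeholders::_1));

    // equivalent wiring with a lambda:
    // q.on_new_task([&ctx](task& t) { ctx.process(t); });

    task t{42};
    q.callback_new_task(t);
}
```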

utils.hpp

@@ -187,18 +187,21 @@ inline std::string format_chatml(std::vector<json> messages)
 // work queue utils
 //
 
-template<typename T>
 struct llama_server_queue {
     int id = 0;
     std::mutex mutex_tasks;
-    std::vector<T> queue_tasks;
-    std::vector<T> queue_tasks_deferred;
+    // queues
+    std::vector<task_server> queue_tasks;
+    std::vector<task_server> queue_tasks_deferred;
+    std::vector<task_multi> queue_multitasks;
     std::condition_variable condition_tasks;
-    std::function<void(T)> callback_new_task;
+    // callback functions
+    std::function<void(task_server&)> callback_new_task;
+    std::function<void(task_multi&)> callback_finish_multitask;
     std::function<void(void)> callback_all_task_finished;
 
     // Add a new task to the end of the queue
-    int post(T task) {
+    int post(task_server task) {
         std::unique_lock<std::mutex> lock(mutex_tasks);
         task.id = id++;
         queue_tasks.push_back(std::move(task));
@@ -207,7 +210,7 @@ struct llama_server_queue {
     }
 
     // Add a new task, but defer until the next loop
-    void defer(T task) {
+    void defer(task_server task) {
         std::unique_lock<std::mutex> lock(mutex_tasks);
         queue_tasks_deferred.push_back(std::move(task));
     }
@@ -219,10 +222,15 @@ struct llama_server_queue {
     }
 
     // Register function to process a new task
-    void on_new_task(std::function<void(T)> callback) {
+    void on_new_task(std::function<void(task_server&)> callback) {
         callback_new_task = callback;
     }
 
+    // Register function to process a multitask
+    void on_finish_multitask(std::function<void(task_multi&)> callback) {
+        callback_finish_multitask = callback;
+    }
+
     // Register the function to be called when the batch of tasks is finished
     void on_all_tasks_finished(std::function<void(void)> callback) {
         callback_all_task_finished = callback;
@@ -257,6 +265,24 @@ struct llama_server_queue {
                 lock.unlock();
             }
             LOG_VERBOSE("callback_all_task_finished", {});
+            // process and update all the multitasks
+            auto queue_iterator = queue_multitasks.begin();
+            while (queue_iterator != queue_multitasks.end())
+            {
+                if (queue_iterator->subtasks_remaining.empty())
+                {
+                    // all subtasks done == multitask is done
+                    task_multi current_multitask = *queue_iterator;
+                    callback_finish_multitask(current_multitask);
+                    // remove this multitask
+                    queue_iterator = queue_multitasks.erase(queue_iterator);
+                }
+                else
+                {
+                    ++queue_iterator;
+                }
+            }
+            // all tasks in the current loop is finished
             callback_all_task_finished();
         }
         LOG_VERBOSE("wait for new task", {});
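The loop added to start_loop uses the standard erase-while-iterating idiom: vector::erase returns the iterator to the next element, so the iterator is only advanced by hand when nothing was erased. A minimal standalone version with a simplified element type:

```cpp
#include <iostream>
#include <set>
#include <vector>

struct multitask {
    int id;
    std::set<int> subtasks_remaining;
};

int main() {
    std::vector<multitask> queue = {
        {1, {}},      // finished: no subtasks left
        {2, {7, 8}},  // still running
    };

    auto it = queue.begin();
    while (it != queue.end()) {
        if (it->subtasks_remaining.empty()) {
            std::cout << "multitask " << it->id << " done\n";
            it = queue.erase(it);  // erase returns the next valid iterator
        } else {
            ++it;                  // only advance when we did not erase
        }
    }
    std::cout << queue.size() << " multitask(s) still running\n";
}
```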
@@ -271,13 +297,40 @@ struct llama_server_queue {
             }
         }
     }
 
+    //
+    // functions to manage multitasks
+    //
+
+    // add a multitask by specifying the id of all subtask (subtask is a task_server)
+    void add_multitask(int multitask_id, std::vector<int>& sub_ids)
+    {
+        std::lock_guard<std::mutex> lock(mutex_tasks);
+        task_multi multi;
+        multi.id = multitask_id;
+        std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end()));
+        queue_multitasks.push_back(multi);
+    }
+
+    // update the remaining subtasks, while appending results to multitask
+    void update_multitask(int multitask_id, int subtask_id, task_result& result)
+    {
+        std::lock_guard<std::mutex> lock(mutex_tasks);
+        for (auto& multitask : queue_multitasks)
+        {
+            if (multitask.id == multitask_id)
+            {
+                multitask.subtasks_remaining.erase(subtask_id);
+                multitask.results.push_back(result);
+            }
+        }
+    }
 };
 
-struct llama_server_response_event {
+struct llama_server_response {
+    typedef std::function<void(int, int, task_result&)> callback_multitask_t;
+    callback_multitask_t callback_update_multitask;
     // for keeping track of all tasks waiting for the result
-    std::mutex mutex_task_ids;
     std::set<int> waiting_task_ids;
     // the main result queue
     std::vector<task_result> queue_results;
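add_multitask and update_multitask track outstanding subtasks as a std::set of ids, filled through std::inserter and drained as results arrive; an empty set marks the multitask as finished. A standalone sketch of that bookkeeping:

```cpp
#include <algorithm>
#include <iostream>
#include <iterator>
#include <set>
#include <vector>

int main() {
    std::vector<int> sub_ids = {10, 11, 12};

    std::set<int> subtasks_remaining;
    // same call shape as add_multitask: copy the ids into the set
    std::copy(sub_ids.begin(), sub_ids.end(),
              std::inserter(subtasks_remaining, subtasks_remaining.end()));

    // as each subtask result arrives, drop its id (as in update_multitask)
    subtasks_remaining.erase(11);
    subtasks_remaining.erase(10);
    subtasks_remaining.erase(12);

    std::cout << (subtasks_remaining.empty() ? "multitask done" : "still waiting")
              << std::endl;
}
```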
@@ -285,12 +338,12 @@ struct llama_server_response_event {
     std::condition_variable condition_results;
 
     void add_waiting_task_id(int task_id) {
-        std::unique_lock<std::mutex> lock(mutex_task_ids);
+        std::unique_lock<std::mutex> lock(mutex_results);
         waiting_task_ids.insert(task_id);
     }
 
     void remove_waiting_task_id(int task_id) {
-        std::unique_lock<std::mutex> lock(mutex_task_ids);
+        std::unique_lock<std::mutex> lock(mutex_results);
         waiting_task_ids.erase(task_id);
     }
@@ -327,7 +380,6 @@ struct llama_server_response_event {
     // Send a new result to a waiting task_id
     void send(task_result result) {
         std::unique_lock<std::mutex> lock(mutex_results);
-        std::unique_lock<std::mutex> lock1(mutex_task_ids);
         LOG_VERBOSE("send new result", {});
         for (auto& task_id : waiting_task_ids) {
             // LOG_TEE("waiting task id %i \n", task_id);
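With mutex_task_ids folded into mutex_results, send() no longer has to hold two locks at once, which removes any lock-ordering concern on that path. A sketch contrasting the two layouts with simplified, hypothetical types; if two mutexes were kept, std::scoped_lock would be the standard deadlock-free way to take both:

```cpp
#include <mutex>
#include <set>
#include <vector>

struct two_lock_response {
    std::mutex mutex_results;
    std::mutex mutex_task_ids;
    std::set<int> waiting_task_ids;
    std::vector<int> queue_results;

    void send(int result_for_task) {
        // must hold both; scoped_lock acquires them without deadlock
        std::scoped_lock lock(mutex_results, mutex_task_ids);
        if (waiting_task_ids.count(result_for_task)) {
            queue_results.push_back(result_for_task);
        }
    }
};

struct one_lock_response {
    std::mutex mutex_results;  // guards both members below
    std::set<int> waiting_task_ids;
    std::vector<int> queue_results;

    void send(int result_for_task) {
        std::lock_guard<std::mutex> lock(mutex_results);
        if (waiting_task_ids.count(result_for_task)) {
            queue_results.push_back(result_for_task);
        }
    }
};

int main() {
    one_lock_response r;
    r.waiting_task_ids.insert(1);
    r.send(1);
    return r.queue_results.size() == 1 ? 0 : 1;  // 0 on success
}
```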