From f4dab52a241d6ce7b5b66da3e299fbbb15d6e9d3 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Wed, 22 Jan 2025 00:04:09 +0100 Subject: [PATCH] server : add more clean up when cancel_tasks is called --- examples/server/server.cpp | 42 ++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 5f08c4ecc..fb41775ca 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1428,6 +1428,10 @@ struct server_queue { } else { queue_tasks.push_back(std::move(task)); } + // if this is cancel task make sure to clean up pending tasks + if (task.type == SERVER_TASK_TYPE_CANCEL) { + cleanup_pending_task(task.id_target); + } condition_tasks.notify_one(); return task.id; } @@ -1445,6 +1449,10 @@ struct server_queue { } else { queue_tasks.push_back(std::move(task)); } + // if this is cancel task make sure to clean up pending tasks + if (task.type == SERVER_TASK_TYPE_CANCEL) { + cleanup_pending_task(task.id_target); + } } condition_tasks.notify_one(); return 0; @@ -1539,6 +1547,23 @@ struct server_queue { } } } + +private: + void cleanup_pending_task(int id_task) { + // no need lock because this is called exclusively by post() + for (size_t i = 0; i < queue_tasks.size(); i++) { + if (queue_tasks[i].id == id_task) { + queue_tasks.erase(queue_tasks.begin() + i); + break; + } + } + for (size_t i = 0; i < queue_tasks_deferred.size(); i++) { + if (queue_tasks_deferred[i].id == id_task) { + queue_tasks_deferred.erase(queue_tasks_deferred.begin() + i); + break; + } + } + } }; struct server_response { @@ -1574,6 +1599,13 @@ struct server_response { std::unique_lock lock(mutex_results); waiting_task_ids.erase(id_task); + // make sure to clean up all pending results + for (size_t i = 0; i < queue_results.size(); i++) { + if (queue_results[i]->id == id_task) { + queue_results.erase(queue_results.begin() + i); + i--; + } + } } void remove_waiting_task_ids(const std::unordered_set & id_tasks) { @@ -1593,7 +1625,7 @@ struct server_response { return !queue_results.empty(); }); - for (int i = 0; i < (int) queue_results.size(); i++) { + for (size_t i = 0; i < queue_results.size(); i++) { if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) { server_task_result_ptr res = std::move(queue_results[i]); queue_results.erase(queue_results.begin() + i); @@ -1610,10 +1642,8 @@ struct server_response { server_task_result_ptr recv_with_timeout(const std::unordered_set & id_tasks, int timeout) { while (true) { std::unique_lock lock(mutex_results); - bool cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout), [&]{ - return !queue_results.empty(); - }); - if (!cr_res) { + std::cv_status cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout)); + if (cr_res == std::cv_status::timeout) { return nullptr; } @@ -2368,8 +2398,8 @@ struct server_context { server_task task(SERVER_TASK_TYPE_CANCEL); task.id_target = id_task; - cancel_tasks.push_back(task); queue_results.remove_waiting_task_id(id_task); + cancel_tasks.push_back(task); } // push to beginning of the queue, so it has highest priority queue_tasks.post(cancel_tasks, true);