add missing notify_one

This commit is contained in:
Xuan Son Nguyen 2024-09-03 10:57:36 +02:00
parent ec882cc1ef
commit d3fedaa6d6

View file

@ -226,10 +226,6 @@ struct server_slot {
return n_remaining > 0; // no budget return n_remaining > 0; // no budget
} }
bool available() const {
return state == SLOT_STATE_IDLE;
}
bool is_processing() const { bool is_processing() const {
return state != SLOT_STATE_IDLE; return state != SLOT_STATE_IDLE;
} }
@ -249,7 +245,7 @@ struct server_slot {
{"id_slot", id}, {"id_slot", id},
{"id_task", id_task}, {"id_task", id_task},
{"n_past", n_past}, {"n_past", n_past},
{"truncated", truncated} {"truncated", truncated},
}); });
callback_on_release(id); callback_on_release(id);
} }
@ -436,6 +432,7 @@ struct server_queue {
void defer(server_task task) { void defer(server_task task) {
std::unique_lock<std::mutex> lock(mutex_tasks); std::unique_lock<std::mutex> lock(mutex_tasks);
queue_tasks_deferred.push_back(std::move(task)); queue_tasks_deferred.push_back(std::move(task));
condition_tasks.notify_one();
} }
// Get the next id for creating a new task // Get the next id for creating a new task
@ -458,7 +455,6 @@ struct server_queue {
// Call when the state of one slot is changed, it will move one task from deferred to main queue // Call when the state of one slot is changed, it will move one task from deferred to main queue
void pop_deferred_task() { void pop_deferred_task() {
// move deferred tasks back to main loop
std::unique_lock<std::mutex> lock(mutex_tasks); std::unique_lock<std::mutex> lock(mutex_tasks);
if (!queue_tasks_deferred.empty()) { if (!queue_tasks_deferred.empty()) {
server_task task = queue_tasks_deferred.front(); server_task task = queue_tasks_deferred.front();
@ -807,7 +803,7 @@ struct server_context {
for (server_slot & slot : slots) { for (server_slot & slot : slots) {
// skip the slot if it is not available // skip the slot if it is not available
if (!slot.available()) { if (slot.is_processing()) {
continue; continue;
} }
@ -849,7 +845,7 @@ struct server_context {
int64_t t_last = ggml_time_us(); int64_t t_last = ggml_time_us();
for (server_slot & slot : slots) { for (server_slot & slot : slots) {
// skip the slot if it is not available // skip the slot if it is not available
if (!slot.available()) { if (slot.is_processing()) {
continue; continue;
} }
@ -1631,7 +1627,7 @@ struct server_context {
queue_tasks.defer(task); queue_tasks.defer(task);
break; break;
} }
if (!slot->available()) { if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later // if requested slot is unavailable, we defer this task for processing later
LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}}); LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}});
queue_tasks.defer(task); queue_tasks.defer(task);
@ -1756,7 +1752,7 @@ struct server_context {
send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST);
break; break;
} }
if (!slot->available()) { if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later // if requested slot is unavailable, we defer this task for processing later
LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}}); LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}});
queue_tasks.defer(task); queue_tasks.defer(task);
@ -1797,7 +1793,7 @@ struct server_context {
send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST);
break; break;
} }
if (!slot->available()) { if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later // if requested slot is unavailable, we defer this task for processing later
LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}}); LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}});
queue_tasks.defer(task); queue_tasks.defer(task);
@ -1845,7 +1841,7 @@ struct server_context {
send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST);
break; break;
} }
if (!slot->available()) { if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later // if requested slot is unavailable, we defer this task for processing later
LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}}); LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}});
queue_tasks.defer(task); queue_tasks.defer(task);