diff --git a/examples/server-parallel/server.cpp b/examples/server-parallel/server.cpp
index 0d98db4de..b98477856 100644
--- a/examples/server-parallel/server.cpp
+++ b/examples/server-parallel/server.cpp
@@ -105,6 +105,10 @@ struct llama_client_slot
     }
 
     void addTokenString(string token) {
+        if(command == RELEASE) {
+            sampled_tokens.clear();
+            return;
+        }
         sampled_tokens.insert(sampled_tokens.begin(), token);
         n_tokens_predicted++;
     }
@@ -135,6 +139,7 @@ struct server_parallel_context {
     std::vector<llama_token_data> candidates;
     std::vector<llama_token> tokens_system;
     int32_t n_tokens_system = 0;
+    bool all_slots_are_idle = false;
     llama_batch batch;
 
     bool loadModel(gpt_params params_) {
@@ -172,6 +177,7 @@ struct server_parallel_context {
         user_name = "User:";
         assistant_name = "Assistant:";
         params.antiprompt.push_back(user_name);
+        all_slots_are_idle = true;
     }
 
     void updateSystemPrompt() {
@@ -217,6 +223,7 @@ struct server_parallel_context {
             slot.release();
         }
         waitAllAreIdle();
+        all_slots_are_idle = true;
         // wait until system prompt load
         update_system_prompt = true;
         while(update_system_prompt) {
@@ -244,6 +251,7 @@ struct server_parallel_context {
             if ((slot_id == -1 && slot.available()) || slot.id == slot_id)
             {
                 slot.start(prompt, temperature);
+                all_slots_are_idle = false;
                 return &slot; // return a pointer to slot (thread safe?)
             }
         }
@@ -292,6 +300,7 @@ struct server_parallel_context {
     }
 
     bool updateSlots() {
+        // updating the system prompt waits until all slots are idle
        if(update_system_prompt) {
            updateSystemPrompt();
        }
@@ -299,10 +308,13 @@
         batch.n_tokens = 0;
 
         int kv_cache_free = (n_ctx - n_tokens_system);
-
+        if(all_slots_are_idle) {
+            // all slots are idle: sleep briefly instead of spinning at 100% CPU
+            this_thread::sleep_for(chrono::milliseconds(5));
+        }
         // decode any currently ongoing sequences
         for (auto & slot : slots) {
-            if (slot.state == PROCESSING && slot.command == RELEASE)
+            if (slot.state == PROCESSING && slot.command == RELEASE && !slot.hasNewToken())
             {
                 LOG_TEE("slot %i released\n", slot.id);
                 llama_kv_cache_seq_rm(ctx, slot.id, n_tokens_system, n_ctx);
                 slot.command = NONE;
                 slot.n_prompt = 0;
                 slot.n_tokens_predicted = 0;
-                slot.sampled_tokens.clear();
                 continue;
             }
 
             kv_cache_free -= slot.n_prompt;
 
-            // no decode wait until the token had been send to client
-            // improves performance and avoid decoherence?
-
-            if (slot.state == IDLE) {
+            if (slot.state == IDLE || slot.command == RELEASE) {
                 continue;
             }
@@ -369,6 +377,7 @@
         }
 
         if (batch.n_tokens == 0) {
+            all_slots_are_idle = true;
             return true;
         }
@@ -853,10 +862,6 @@ int main(int argc, char **argv)
        // Verify if the slot exist
        if (slot) {
            auto content_provider = [slot](size_t /*offset*/, DataSink &sink) {
-                if(slot->available()) { // slot has been released
-                    sink.done();
-                    return false;
-                }
                if(slot->hasNewToken()) { // new token notification
                    stringstream ss;
                    json res_d = {
@@ -869,10 +874,17 @@ int main(int argc, char **argv)
                        slot->release();
                        return false;
                    }
-                }
+                } else {
+                    this_thread::sleep_for(chrono::milliseconds(5));
+                }
+                if(slot->available()) { // slot has been released
+                    sink.done();
+                    return false;
+                }
                return true;
            };
            auto on_complete = [slot] (bool) {
+                slot->sampled_tokens.clear();
                slot->release();
            };
            res.set_chunked_content_provider("text/event-stream", content_provider, on_complete);
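
Note: the reordering in the content provider matters because a slot can be marked released while tokens are still buffered in sampled_tokens; checking slot->available() before hasNewToken(), as the old code did, could close the stream and drop those tokens. The sketch below is a minimal, standalone illustration of that drain-before-close pattern combined with the 5 ms idle backoff this patch adds — Slot, provider_step, and the field names are hypothetical stand-ins for illustration, not the server's actual types.

// Minimal sketch (assumed names, not the PR's code): send buffered tokens
// first, back off briefly when there is nothing to send, and only close the
// stream once the slot is released AND its buffer is drained.
// Build: g++ -std=c++11 -pthread drain.cpp
#include <atomic>
#include <chrono>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

struct Slot {
    std::mutex mtx;
    std::deque<std::string> pending;   // tokens not yet sent to the client
    std::atomic<bool> released{false}; // generation finished / slot freed

    bool has_new_token() {
        std::lock_guard<std::mutex> lock(mtx);
        return !pending.empty();
    }
    std::string pop_token() {
        std::lock_guard<std::mutex> lock(mtx);
        std::string t = pending.front();
        pending.pop_front();
        return t;
    }
};

// One iteration of the streaming loop; returns false to stop streaming.
bool provider_step(Slot &slot) {
    if (slot.has_new_token()) {
        std::cout << "data: " << slot.pop_token() << "\n";
    } else {
        // nothing to send: sleep instead of spinning, mirroring the 5 ms
        // sleeps the patch adds in updateSlots() and the content provider
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
    if (slot.released && !slot.has_new_token()) {
        std::cout << "[done]\n"; // safe: the buffer is already drained
        return false;
    }
    return true;
}

int main() {
    Slot slot;
    std::thread producer([&slot] {
        for (int i = 0; i < 5; i++) {
            std::lock_guard<std::mutex> lock(slot.mtx);
            slot.pending.push_back("tok" + std::to_string(i));
        }
        slot.released = true; // released while tokens are still buffered
    });
    while (provider_step(slot)) {}
    producer.join(); // all five tokens print before [done]
}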