Merge branch 'master' into gg/refactor-k-shift

Georgi Gerganov 2024-02-25 15:20:13 +02:00
commit d141c749d9
9 changed files with 430 additions and 90 deletions

CMakeLists.txt

@@ -936,10 +936,16 @@ if (CMAKE_OSX_ARCHITECTURES STREQUAL "arm64" OR CMAKE_GENERATOR_PLATFORM_LWR STR
         list(APPEND ARCH_FLAGS -mfpu=neon-fp-armv8 -mno-unaligned-access)
     endif()
     if (${CMAKE_SYSTEM_PROCESSOR} MATCHES "armv7")
-        # Raspberry Pi 2
-        list(APPEND ARCH_FLAGS -mfpu=neon-fp-armv8 -mno-unaligned-access -funsafe-math-optimizations)
+        if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Android")
+            # Android armeabi-v7a
+            list(APPEND ARCH_FLAGS -mfpu=neon-vfpv4 -mno-unaligned-access -funsafe-math-optimizations)
+        else()
+            # Raspberry Pi 2
+            list(APPEND ARCH_FLAGS -mfpu=neon-fp-armv8 -mno-unaligned-access -funsafe-math-optimizations)
+        endif()
     endif()
     if (${CMAKE_SYSTEM_PROCESSOR} MATCHES "armv8")
+        # Android arm64-v8a
         # Raspberry Pi 3, 4, Zero 2 (32-bit)
         list(APPEND ARCH_FLAGS -mno-unaligned-access)
     endif()

examples/server/README.md

@@ -39,9 +39,12 @@ see https://github.com/ggerganov/llama.cpp/issues/1437
 - `--mmproj MMPROJ_FILE`: Path to a multimodal projector file for LLaVA.
 - `--grp-attn-n`: Set the group attention factor to extend context size through self-extend(default: 1=disabled), used together with group attention width `--grp-attn-w`
 - `--grp-attn-w`: Set the group attention width to extend context size through self-extend(default: 512), used together with group attention factor `--grp-attn-n`
-- `-n, --n-predict`: Set the maximum tokens to predict (default: -1)
+- `-n N, --n-predict N`: Set the maximum tokens to predict (default: -1)
 - `--slots-endpoint-disable`: To disable slots state monitoring endpoint. Slots state may contain user data, prompts included.
+- `--metrics`: enable prometheus `/metrics` compatible endpoint (default: disabled)
 - `--chat-template JINJA_TEMPLATE`: Set custom jinja chat template. This parameter accepts a string, not a file name (default: template taken from model's metadata). We only support [some pre-defined templates](https://github.com/ggerganov/llama.cpp/wiki/Templates-supported-by-llama_chat_apply_template)
+- `--log-disable`: Output logs to stdout only, default: enabled.
+- `--log-format FORMAT`: Define the log output to FORMAT: json or text (default: json)
 
 ## Build
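Editor's note: to illustrate the `--metrics` and `--log-format` options documented in the hunk above, here is a minimal sketch of launching the server with them from Python. The binary and model paths are assumptions, not part of this commit; only the flags come from the documented options.

```python
import subprocess

# Hypothetical paths; adjust to your build tree and model file.
SERVER_BIN = "./build/bin/server"
MODEL_PATH = "models/model.gguf"

# Start the llama.cpp server with the Prometheus endpoint enabled
# and plain-text logs (both flags are documented above).
proc = subprocess.Popen([
    SERVER_BIN, "-m", MODEL_PATH,
    "--metrics",
    "--log-format", "text",
])
```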
@@ -457,6 +460,18 @@ Notice that each `probs` is an array of length `n_probs`.
     ]
 ```
 
+- **GET** `/metrics`: [Prometheus](https://prometheus.io/) compatible metrics exporter endpoint if `--metrics` is enabled:
+
+  Available metrics:
+  - `llamacpp:prompt_tokens_total`: Number of prompt tokens processed.
+  - `llamacpp:tokens_predicted_total`: Number of generation tokens processed.
+  - `llamacpp:prompt_tokens_seconds`: Average prompt throughput in tokens/s.
+  - `llamacpp:predicted_tokens_seconds`: Average generation throughput in tokens/s.
+  - `llamacpp:kv_cache_usage_ratio`: KV-cache usage. 1 means 100 percent usage.
+  - `llamacpp:kv_cache_tokens`: KV-cache tokens.
+  - `llamacpp:requests_processing`: Number of request processing.
+  - `llamacpp:requests_deferred`: Number of request deferred.
+
 ## More examples
 
 ### Change system prompt on runtime
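Editor's note: a hedged usage sketch for the `/metrics` endpoint documented above. It scrapes the endpoint and decodes it with the same `prometheus_client` parser the test suite below pulls in; the host and port are assumed server defaults.

```python
import urllib.request

from prometheus_client.parser import text_string_to_metric_families

# Assumes a server started with --metrics on the default host/port.
METRICS_URL = "http://127.0.0.1:8080/metrics"

with urllib.request.urlopen(METRICS_URL) as resp:
    body = resp.read().decode("utf-8")

# Each family name carries the llamacpp: prefix shown in the list above.
for family in text_string_to_metric_families(body):
    for sample in family.samples:
        print(sample.name, sample.value)
```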

examples/server/server.cpp

@@ -43,9 +43,11 @@ struct server_params
     int32_t read_timeout = 600;
     int32_t write_timeout = 600;
     bool slots_endpoint = true;
+    bool metrics_endpoint = false;
 };
 
 bool server_verbose = false;
+bool server_log_json = true;
 
 static size_t common_part(const std::vector<llama_token> &a, const std::vector<llama_token> &b)
 {
@@ -301,12 +303,76 @@ struct llama_client_slot
     }
 
     void print_timings() const {
-        LOG_TEE("\n");
-        LOG_TEE("%s: prompt eval time = %10.2f ms / %5d tokens (%8.2f ms per token, %8.2f tokens per second)\n",
-            __func__, t_prompt_processing, num_prompt_tokens_processed, t_prompt_processing / num_prompt_tokens_processed, 1e3 / t_prompt_processing * num_prompt_tokens_processed);
-        LOG_TEE("%s: eval time = %10.2f ms / %5d runs (%8.2f ms per token, %8.2f tokens per second)\n",
-            __func__, t_token_generation, n_decoded,t_token_generation / n_decoded, 1e3 / t_token_generation * n_decoded);
-        LOG_TEE("%s: total time = %10.2f ms\n", __func__, t_prompt_processing + t_token_generation);
+        char buffer[512];
+        double t_token = t_prompt_processing / num_prompt_tokens_processed;
+        double n_tokens_second = 1e3 / t_prompt_processing * num_prompt_tokens_processed;
+        sprintf(buffer, "prompt eval time = %10.2f ms / %5d tokens (%8.2f ms per token, %8.2f tokens per second)",
+                t_prompt_processing, num_prompt_tokens_processed,
+                t_token, n_tokens_second);
+        LOG_INFO(buffer, {
+            {"slot_id", id},
+            {"task_id", task_id},
+            {"t_prompt_processing", t_prompt_processing},
+            {"num_prompt_tokens_processed", num_prompt_tokens_processed},
+            {"t_token", t_token},
+            {"n_tokens_second", n_tokens_second},
+        });
+
+        t_token = t_token_generation / n_decoded;
+        n_tokens_second = 1e3 / t_token_generation * n_decoded;
+        sprintf(buffer, "generation eval time = %10.2f ms / %5d runs (%8.2f ms per token, %8.2f tokens per second)",
+                t_token_generation, n_decoded,
+                t_token, n_tokens_second);
+        LOG_INFO(buffer, {
+            {"slot_id", id},
+            {"task_id", task_id},
+            {"t_token_generation", t_token_generation},
+            {"n_decoded", n_decoded},
+            {"t_token", t_token},
+            {"n_tokens_second", n_tokens_second},
+        });
+
+        sprintf(buffer, " total time = %10.2f ms", t_prompt_processing + t_token_generation);
+        LOG_INFO(buffer, {
+            {"slot_id", id},
+            {"task_id", task_id},
+            {"t_prompt_processing", t_prompt_processing},
+            {"t_token_generation", t_token_generation},
+            {"t_total", t_prompt_processing + t_token_generation},
+        });
+    }
+};
+
+struct llama_metrics {
+    uint64_t n_prompt_tokens_processed_total = 0;
+    uint64_t n_tokens_predicted_total = 0;
+
+    uint64_t n_prompt_tokens_processed = 0;
+    uint64_t t_prompt_processing = 0;
+
+    uint64_t n_tokens_predicted = 0;
+    uint64_t t_tokens_generation = 0;
+
+    void on_prompt_eval(const llama_client_slot &slot) {
+        n_prompt_tokens_processed_total += slot.num_prompt_tokens_processed;
+        n_prompt_tokens_processed += slot.num_prompt_tokens_processed;
+        t_prompt_processing += slot.t_prompt_processing;
+    }
+
+    void on_prediction(const llama_client_slot &slot) {
+        n_tokens_predicted_total += slot.n_decoded;
+        n_tokens_predicted += slot.n_decoded;
+        t_tokens_generation += slot.t_token_generation;
+    }
+
+    void reset_bucket() {
+        n_prompt_tokens_processed = 0;
+        t_prompt_processing = 0;
+        n_tokens_predicted = 0;
+        t_tokens_generation = 0;
     }
 };
@@ -344,6 +410,8 @@ struct llama_server_context
     llama_server_queue queue_tasks;
     llama_server_response queue_results;
 
+    llama_metrics metrics;
+
     ~llama_server_context()
     {
         if (ctx)
@@ -363,7 +431,7 @@ struct llama_server_context
         params = params_;
         if (!params.mmproj.empty()) {
             multimodal = true;
-            LOG_TEE("Multi Modal Mode Enabled");
+            LOG_INFO("Multi Modal Mode Enabled", {});
             clp_ctx = clip_model_load(params.mmproj.c_str(), /*verbosity=*/ 1);
             if(clp_ctx == nullptr) {
                 LOG_ERROR("unable to load clip model", {{"model", params.mmproj}});
@@ -416,7 +484,7 @@ struct llama_server_context
         const int32_t n_ctx_slot = n_ctx / params.n_parallel;
 
-        LOG_TEE("Available slots:\n");
+        LOG_INFO("initializing slots", {{"n_slots", params.n_parallel}});
         for (int i = 0; i < params.n_parallel; i++)
         {
             llama_client_slot slot;
@@ -425,7 +493,10 @@ struct llama_server_context
             slot.n_ctx = n_ctx_slot;
             slot.n_predict = params.n_predict;
 
-            LOG_TEE(" -> Slot %i - max context: %i\n", slot.id, n_ctx_slot);
+            LOG_INFO("new slot", {
+                {"slot_id", slot.id},
+                {"n_ctx_slot", slot.n_ctx}
+            });
 
             const int ga_n = params.grp_attn_n;
             const int ga_w = params.grp_attn_w;
@@ -435,7 +506,12 @@ struct llama_server_context
                 GGML_ASSERT(ga_w % ga_n == 0 && "ga_w must be a multiple of ga_n"); // NOLINT
                 //GGML_ASSERT(n_ctx_train % ga_w == 0 && "n_ctx_train must be a multiple of ga_w"); // NOLINT
                 //GGML_ASSERT(n_ctx >= n_ctx_train * ga_n && "n_ctx must be at least n_ctx_train * ga_n"); // NOLINT
-                LOG_TEE(" -> Slot %i - self-extend: ga_n = %d, ga_w = %d\n", slot.id, ga_n, ga_w);
+
+                LOG_INFO("slot self-extend", {
+                    {"slot_id", slot.id},
+                    {"ga_n", ga_n},
+                    {"ga_w", ga_w}
+                });
             }
 
             slot.ga_i = 0;
@@ -729,10 +805,16 @@ struct llama_server_context
                     img_sl.img_data = clip_image_u8_init();
                     if (!clip_image_load_from_bytes(image_buffer.data(), image_buffer.size(), img_sl.img_data))
                     {
-                        LOG_TEE("slot %i - failed to load image [id: %i]\n", slot->id, img_sl.id);
+                        LOG_ERROR("failed to load image", {
+                            {"slot_id", slot->id},
+                            {"img_sl_id", img_sl.id}
+                        });
                         return false;
                     }
-                    LOG_TEE("slot %i - loaded image\n", slot->id);
+                    LOG_VERBOSE("image loaded", {
+                        {"slot_id", slot->id},
+                        {"img_sl_id", img_sl.id}
+                    });
                     img_sl.request_encode_image = true;
                     slot->images.push_back(img_sl);
                 }
@@ -792,7 +874,10 @@ struct llama_server_context
         all_slots_are_idle = false;
 
-        LOG_TEE("slot %i is processing [task id: %i]\n", slot->id, slot->task_id);
+        LOG_INFO("slot is processing task", {
+            {"slot_id", slot->id},
+            {"task_id", slot->task_id},
+        });
 
         return true;
     }
@@ -1355,7 +1440,7 @@ struct llama_server_context
                 if (slot == nullptr)
                 {
                     // if no slot is available, we defer this task for processing later
-                    LOG_VERBOSE("no slot is available", {});
+                    LOG_VERBOSE("no slot is available", {{"task_id", task.id}});
                     queue_tasks.defer(task);
                     break;
                 }
@@ -1404,7 +1489,7 @@ struct llama_server_context
            case TASK_TYPE_NEXT_RESPONSE: {
                 // do nothing
             } break;
-            case TASK_TYPE_SLOTS_DATA: {
+            case TASK_TYPE_METRICS: {
                 json slots_data = json::array();
                 int n_idle_slots = 0;
                 int n_processing_slots = 0;
@@ -1431,17 +1516,41 @@ struct llama_server_context
                     }
                     slots_data.push_back(slot_data);
                 }
-                LOG_TEE("task %i - slots data: idle=%i processing=%i\n", task.id, n_idle_slots, n_processing_slots);
+                LOG_INFO("slot data", {
+                    {"task_id", task.id},
+                    {"n_idle_slots", n_idle_slots},
+                    {"n_processing_slots", n_processing_slots}
+                });
+                LOG_VERBOSE("slot data", {
+                    {"task_id", task.id},
+                    {"n_idle_slots", n_idle_slots},
+                    {"n_processing_slots", n_processing_slots},
+                    {"slots", slots_data}
+                });
                 task_result res;
                 res.id = task.id;
                 res.multitask_id = task.multitask_id;
                 res.stop = true;
                 res.error = false;
                 res.result_json = {
                     { "idle", n_idle_slots },
                     { "processing", n_processing_slots },
-                    { "slots", slots_data }
+                    { "deferred", queue_tasks.queue_tasks_deferred.size() },
+                    { "n_prompt_tokens_processed_total", metrics.n_prompt_tokens_processed_total},
+                    { "n_tokens_predicted_total", metrics.n_tokens_predicted_total},
+                    { "n_prompt_tokens_processed", metrics.n_prompt_tokens_processed},
+                    { "t_prompt_processing", metrics.t_prompt_processing},
+                    { "n_tokens_predicted", metrics.n_tokens_predicted},
+                    { "t_tokens_generation", metrics.t_tokens_generation},
+                    { "kv_cache_tokens_count", llama_get_kv_cache_token_count(ctx)},
+                    { "kv_cache_used_cells", llama_get_kv_cache_used_cells(ctx)},
+                    { "slots", slots_data },
                 };
+                metrics.reset_bucket();
                 queue_results.send(res);
             } break;
         }
@@ -1469,7 +1578,7 @@ struct llama_server_context
     bool update_slots() {
         if (system_need_update)
         {
-            LOG_TEE("updating system prompt\n");
+            LOG_INFO("updating system prompt", {});
             update_system_prompt();
         }
@@ -1479,12 +1588,13 @@ struct llama_server_context
         {
             if (system_prompt.empty() && clean_kv_cache)
             {
-                LOG_TEE("all slots are idle and system prompt is empty, clear the KV cache\n");
+                LOG_INFO("all slots are idle and system prompt is empty, clear the KV cache", {});
                 kv_cache_clear();
             }
             return true;
         }
 
+        LOG_VERBOSE("posting NEXT_RESPONSE", {});
         task_server task;
         task.type = TASK_TYPE_NEXT_RESPONSE;
         task.target_id = -1;
@@ -1498,10 +1608,20 @@ struct llama_server_context
                 {
                     // Shift context
                     const int n_keep = slot.params.n_keep + add_bos_token;
-                    const int n_left = system_tokens.size() + slot.n_past - n_keep;
+                    const int n_left = (int) system_tokens.size() + slot.n_past - n_keep;
                     const int n_discard = n_left / 2;
 
-                    LOG_TEE("slot %d: context shift - n_keep = %d, n_left = %d, n_discard = %d\n", slot.id, n_keep, n_left, n_discard);
+                    LOG_INFO("slot context shift", {
+                        {"slot_id", slot.id},
+                        {"task_id", slot.task_id},
+                        {"n_keep", n_keep},
+                        {"n_left", n_left},
+                        {"n_discard", n_discard},
+                        {"n_ctx", n_ctx},
+                        {"n_past", slot.n_past},
+                        {"n_system_tokens", system_tokens.size()},
+                        {"n_cache_tokens", slot.cache_tokens.size()}
+                    });
                     llama_kv_cache_seq_rm (ctx, slot.id, n_keep            , n_keep + n_discard);
                     llama_kv_cache_seq_add(ctx, slot.id, n_keep + n_discard, system_tokens.size() + slot.n_past, -n_discard);
@@ -1515,17 +1635,12 @@ struct llama_server_context
                     slot.n_past -= n_discard;
 
                     slot.truncated = true;
-
-                    LOG_VERBOSE("context shift", {
-                        { "n_ctx", n_ctx },
-                        { "n_keep", n_keep },
-                        { "n_left", n_left },
-                    });
                 }
             }
         }
 
         // decode any currently ongoing sequences
+        LOG_VERBOSE("decoding ongoing sequences", {});
         for (auto & slot : slots)
         {
             // release the slot
@@ -1535,7 +1650,15 @@ struct llama_server_context
                 slot.command = NONE;
                 slot.t_last_used = ggml_time_us();
 
-                LOG_TEE("slot %d released (%d tokens in cache)\n", slot.id, (int) slot.cache_tokens.size());
+                LOG_INFO("slot released", {
+                    {"slot_id", slot.id},
+                    {"task_id", slot.task_id},
+                    {"n_ctx", n_ctx},
+                    {"n_past", slot.n_past},
+                    {"n_system_tokens", system_tokens.size()},
+                    {"n_cache_tokens", slot.cache_tokens.size()},
+                    {"truncated", slot.truncated}
+                });
 
                 queue_tasks.notify_slot_changed();
                 continue;
@@ -1683,7 +1806,12 @@ struct llama_server_context
                             slot.ga_i = ga_i;
                         }
 
-                        LOG_TEE("slot %d : in cache: %i tokens | to process: %i tokens\n", slot.id, slot.n_past, slot.num_prompt_tokens_processed);
+                        LOG_INFO("slot progression", {
+                            { "slot_id", slot.id },
+                            { "task_id", slot.task_id },
+                            { "n_past", slot.n_past },
+                            { "num_prompt_tokens_processed", slot.num_prompt_tokens_processed }
+                        });
                     }
 
                     slot.cache_tokens = prompt_tokens;
@@ -1691,7 +1819,10 @@ struct llama_server_context
                     if (slot.n_past == slot.num_prompt_tokens && slot.n_past > 0)
                     {
                         // we have to evaluate at least 1 token to generate logits.
-                        LOG_TEE("slot %d : we have to evaluate at least 1 token to generate logits\n", slot.id);
+                        LOG_INFO("we have to evaluate at least 1 token to generate logits", {
+                            { "slot_id", slot.id },
+                            { "task_id", slot.task_id }
+                        });
                         slot.n_past--;
                         if (slot.ga_i > 0)
                         {
@@ -1699,9 +1830,13 @@ struct llama_server_context
                         }
                     }
 
-                    LOG_TEE("slot %d : kv cache rm - [%d, end)\n", slot.id, (int) system_tokens.size() + slot.n_past);
-
-                    llama_kv_cache_seq_rm(ctx, slot.id, system_tokens.size() + slot.n_past, -1);
+                    int p0 = (int) system_tokens.size() + slot.n_past;
+                    LOG_INFO("kv cache rm [p0, end)", {
+                        { "slot_id", slot.id },
+                        { "task_id", slot.task_id },
+                        { "p0", p0 }
+                    });
+                    llama_kv_cache_seq_rm(ctx, slot.id, p0, -1);
 
                     LOG_VERBOSE("prompt ingested", {
                         {"n_past", slot.n_past},
@@ -1736,7 +1871,13 @@ struct llama_server_context
                     if (has_images && !ingest_images(slot, n_batch))
                     {
-                        LOG_TEE("failed processing images\n");
+                        LOG_ERROR("failed processing images", {
+                            "slot_id", slot.id,
+                            "task_id", slot.task_id,
+                        });
+                        // FIXME @phymbert: to be properly tested
+                        //  early returning without changing the slot state will block the slot for ever
+                        // no one at the moment is checking the return value
                         return false;
                     }
@@ -1849,6 +1990,7 @@ struct llama_server_context
                 {
                     slot.t_start_genereration = ggml_time_us();
                     slot.t_prompt_processing = (slot.t_start_genereration - slot.t_start_process_prompt) / 1e3;
+                    metrics.on_prompt_eval(slot);
                 }
 
                 llama_token_data_array cur_p = { slot.ctx_sampling->cur.data(), slot.ctx_sampling->cur.size(), false };
@@ -1871,11 +2013,14 @@ struct llama_server_context
                     slot.release();
                     slot.print_timings();
                     send_final_response(slot);
+                    metrics.on_prediction(slot);
                 }
 
                 slot.i_batch = -1;
             }
         }
+
+        LOG_VERBOSE("slots updated", {});
         return true;
     }
@@ -1953,8 +2098,10 @@ static void server_print_usage(const char *argv0, const gpt_params &params,
     printf("  -ctv TYPE, --cache-type-v TYPE\n");
     printf("                            KV cache data type for V (default: f16)\n");
     printf("  --mmproj MMPROJ_FILE      path to a multimodal projector file for LLaVA.\n");
+    printf("  --log-format              log output format: json or text (default: json)\n");
     printf("  --log-disable             disables logging to a file.\n");
     printf("  --slots-endpoint-disable  disables slots monitoring endpoint.\n");
+    printf("  --metrics                 enable prometheus compatible metrics endpoint (default: %s).\n", sparams.metrics_endpoint ? "enabled" : "disabled");
     printf("\n");
     printf("  -n, --n-predict           maximum tokens to predict (default: %d)\n", params.n_predict);
     printf("  --override-kv KEY=TYPE:VALUE\n");
@@ -2405,6 +2552,27 @@ static void server_params_parse(int argc, char **argv, server_params &sparams,
             }
             params.mmproj = argv[i];
         }
+        else if (arg == "--log-format")
+        {
+            if (++i >= argc)
+            {
+                invalid_param = true;
+                break;
+            }
+            if (std::strcmp(argv[i], "json") == 0)
+            {
+                server_log_json = true;
+            }
+            else if (std::strcmp(argv[i], "text") == 0)
+            {
+                server_log_json = false;
+            }
+            else
+            {
+                invalid_param = true;
+                break;
+            }
+        }
         else if (arg == "--log-disable")
         {
             log_set_target(stdout);
@@ -2414,6 +2582,10 @@ static void server_params_parse(int argc, char **argv, server_params &sparams,
         {
             sparams.slots_endpoint = false;
         }
+        else if (arg == "--metrics")
+        {
+            sparams.metrics_endpoint = true;
+        }
         else if (arg == "--chat-template")
         {
            if (++i >= argc)
@@ -2514,32 +2686,40 @@ static json format_partial_response(
 static json format_tokenizer_response(const std::vector<llama_token> &tokens)
 {
-    return json{
-        {"tokens", tokens}};
+    return json {
+        {"tokens", tokens}
+    };
 }
 
 static json format_detokenized_response(std::string content)
 {
-    return json{
-        {"content", content}};
+    return json {
+        {"content", content}
+    };
 }
 
 static void log_server_request(const httplib::Request &req, const httplib::Response &res)
 {
+    // skip GH copilot requests when using default port
+    if (req.path == "/v1/health" || req.path == "/v1/completions")
+    {
+        return;
+    }
+
     LOG_INFO("request", {
         {"remote_addr", req.remote_addr},
         {"remote_port", req.remote_port},
         {"status", res.status},
         {"method", req.method},
         {"path", req.path},
         {"params", req.params},
     });
 
     LOG_VERBOSE("request", {
         {"request", req.body},
         {"response", res.body},
     });
 }
 
 struct token_translator
@@ -2621,7 +2801,7 @@ int main(int argc, char **argv)
             // request slots data using task queue
             task_server task;
             task.id = llama.queue_tasks.get_new_id();
-            task.type = TASK_TYPE_SLOTS_DATA;
+            task.type = TASK_TYPE_METRICS;
             task.target_id = -1;
             llama.queue_results.add_waiting_task_id(task.id);
@@ -2668,7 +2848,7 @@ int main(int argc, char **argv)
             // request slots data using task queue
             task_server task;
             task.id = llama.queue_tasks.get_new_id();
-            task.type = TASK_TYPE_SLOTS_DATA;
+            task.type = TASK_TYPE_METRICS;
             task.target_id = -1;
             llama.queue_results.add_waiting_task_id(task.id);
@@ -2683,6 +2863,87 @@ int main(int argc, char **argv)
         });
     }
 
+    if (sparams.metrics_endpoint) {
+        svr.Get("/metrics", [&](const httplib::Request&, httplib::Response& res) {
+            // request slots data using task queue
+            task_server task;
+            task.id = llama.queue_tasks.get_new_id();
+            task.type = TASK_TYPE_METRICS;
+            task.target_id = -1;
+
+            llama.queue_results.add_waiting_task_id(task.id);
+            llama.queue_tasks.post(task);
+
+            // get the result
+            task_result result = llama.queue_results.recv(task.id);
+            llama.queue_results.remove_waiting_task_id(task.id);
+
+            json data = result.result_json;
+
+            uint64_t n_prompt_tokens_processed = data["n_prompt_tokens_processed"];
+            uint64_t t_prompt_processing       = data["t_prompt_processing"];
+
+            uint64_t n_tokens_predicted  = data["n_tokens_predicted"];
+            uint64_t t_tokens_generation = data["t_tokens_generation"];
+
+            int32_t kv_cache_used_cells = data["kv_cache_used_cells"];
+
+            // metrics definition: https://prometheus.io/docs/practices/naming/#metric-names
+            json all_metrics_def = json {
+                {"counter", {{
+                    {"name", "prompt_tokens_total"},
+                    {"help", "Number of prompt tokens processed."},
+                    {"value", data["n_prompt_tokens_processed_total"]}
+                }, {
+                    {"name", "tokens_predicted_total"},
+                    {"help", "Number of generation tokens processed."},
+                    {"value", data["n_tokens_predicted_total"]}
+                }}},
+                {"gauge", {{
+                    {"name", "prompt_tokens_seconds"},
+                    {"help", "Average prompt throughput in tokens/s."},
+                    {"value", n_prompt_tokens_processed ? 1e3 / t_prompt_processing * n_prompt_tokens_processed : 0}
+                },{
+                    {"name", "predicted_tokens_seconds"},
+                    {"help", "Average generation throughput in tokens/s."},
+                    {"value", n_tokens_predicted ? 1e3 / t_tokens_generation * n_tokens_predicted : 0}
+                },{
+                    {"name", "kv_cache_usage_ratio"},
+                    {"help", "KV-cache usage. 1 means 100 percent usage."},
+                    {"value", 1. * kv_cache_used_cells / params.n_ctx}
+                },{
+                    {"name", "kv_cache_tokens"},
+                    {"help", "KV-cache tokens."},
+                    {"value", data["kv_cache_tokens_count"]}
+                },{
+                    {"name", "requests_processing"},
+                    {"help", "Number of request processing."},
+                    {"value", data["processing"]}
+                },{
+                    {"name", "requests_deferred"},
+                    {"help", "Number of request deferred."},
+                    {"value", data["deferred"]}
+                }}}
+            };
+
+            std::stringstream prometheus;
+            for (const auto& el : all_metrics_def.items()) {
+                const auto& type = el.key();
+                const auto& metrics_def = el.value();
+                for (const auto& metric_def : metrics_def) {
+                    std::string name = metric_def["name"];
+                    std::string help = metric_def["help"];
+                    prometheus << "# HELP llamacpp:" << name << " " << help << "\n"
+                               << "# TYPE llamacpp:" << name << " " << type << "\n"
+                               << "llamacpp:" << name << " " << metric_def["value"] << "\n";
+                }
+            }
+
+            res.set_content(prometheus.str(), "text/plain; version=0.0.4");
+            res.status = 200; // HTTP OK
+        });
+    }
+
     svr.set_logger(log_server_request);
 
     svr.set_exception_handler([](const httplib::Request &, httplib::Response &res, std::exception_ptr ep)
@@ -2735,9 +2996,6 @@ int main(int argc, char **argv)
     // Set the base directory for serving static files
     svr.set_base_dir(sparams.public_path);
 
-    // to make it ctrl+clickable:
-    LOG_TEE("\nllama server listening at http://%s:%d\n\n", sparams.hostname.c_str(), sparams.port);
-
     std::unordered_map<std::string, std::string> log_data;
     log_data["hostname"] = sparams.hostname;
     log_data["port"] = std::to_string(sparams.port);

examples/server/tests/README.md

@@ -32,6 +32,7 @@ It's possible to override some scenario steps values with environment variables:
 - `PORT` -> `context.server_port` to set the listening port of the server during scenario, default: `8080`
 - `LLAMA_SERVER_BIN_PATH` -> to change the server binary path, default: `../../../build/bin/server`
 - `DEBUG` -> "ON" to enable steps and server verbose mode `--verbose`
+- `SERVER_LOG_FORMAT_JSON` -> if set switch server logs to json format
 
 ### Run @bug, @wip or @wrong_usage annotated scenario
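Editor's note: as a hedged aside, the environment overrides listed above could be combined in a small Python driver like the sketch below; the working directory and tag filter are assumptions, only the variable names come from the list.

```python
import os
import subprocess

# Hypothetical invocation of the server test scenarios with a few of
# the environment overrides documented above.
env = dict(os.environ, PORT="8888", DEBUG="ON", SERVER_LOG_FORMAT_JSON="ON")

subprocess.run(
    ["behave", "--summary", "--stop"],
    cwd="examples/server/tests",  # assumed repo-relative path to the feature files
    env=env,
    check=True,
)
```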

examples/server/tests/features/environment.py

@@ -16,6 +16,8 @@ def before_scenario(context, scenario):
 
 
 def after_scenario(context, scenario):
+    if context.server_process is None:
+        return
     if scenario.status == "failed":
         if 'GITHUB_ACTIONS' in os.environ:
             print(f"\x1b[33;101mSCENARIO FAILED: {scenario.name} server logs:\x1b[0m\n\n")

examples/server/tests/features/server.feature

@@ -13,6 +13,7 @@ Feature: llama.cpp server
     And 1 slots
     And embeddings extraction
     And 32 server max tokens to predict
+    And prometheus compatible metrics exposed
     Then the server is starting
     Then the server is healthy
@@ -25,11 +26,12 @@ Feature: llama.cpp server
     And <n_predict> max tokens to predict
     And a completion request with no api error
     Then <n_predicted> tokens are predicted matching <re_content>
+    And prometheus metrics are exposed
 
     Examples: Prompts
       | prompt                           | n_predict | re_content                             | n_predicted |
-      | I believe the meaning of life is | 8         | read                                   | 8           |
-      | Write a joke about AI            | 64        | (park<or>friends<or>scared)+           | 32          |
+      | I believe the meaning of life is | 8         | (read<or>going)+                       | 8           |
+      | Write a joke about AI            | 64        | (park<or>friends<or>scared<or>always)+ | 32          |
 
   Scenario Outline: OAI Compatibility
     Given a model <model>

examples/server/tests/features/steps/steps.py

@@ -13,6 +13,7 @@ import aiohttp
 import openai
 from behave import step
 from behave.api.async_step import async_run_until_complete
+from prometheus_client import parser
 
 
 @step(u"a server listening on {server_fqdn}:{server_port}")
@@ -34,6 +35,8 @@ def step_server_config(context, server_fqdn, server_port):
     context.server_api_key = None
     context.server_continuous_batching = False
     context.server_embeddings = False
+    context.server_metrics = False
+    context.server_process = None
     context.server_seed = None
     context.user_api_key = None
@@ -82,6 +85,11 @@ def step_server_embeddings(context):
     context.server_embeddings = True
 
 
+@step(u'prometheus compatible metrics exposed')
+def step_server_metrics(context):
+    context.server_metrics = True
+
+
 @step(u"the server is starting")
 def step_start_server(context):
     start_server_background(context)
@@ -424,6 +432,23 @@ def step_check_options_header_value(context, cors_header, cors_header_value):
     assert context.options_response.headers[cors_header] == cors_header_value
 
 
+@step(u'prometheus metrics are exposed')
+@async_run_until_complete
+async def step_prometheus_metrics_exported(context):
+    async with aiohttp.ClientSession() as session:
+        async with await session.get(f'{context.base_url}/metrics') as metrics_response:
+            assert metrics_response.status == 200
+            assert metrics_response.headers['Content-Type'] == "text/plain; version=0.0.4"
+            metrics_raw = await metrics_response.text()
+            metric_exported = False
+            for metric in parser.text_string_to_metric_families(metrics_raw):
+                match metric.name:
+                    case "llamacpp:kv_cache_usage_ratio":
+                        assert len(metric.samples) > 0
+                        metric_exported = True
+            assert metric_exported, "No metrics exported"
+
+
 async def concurrent_requests(context, f_completion, *args, **kwargs):
     n_prompts = len(context.prompts)
     if context.debug:
@@ -753,6 +778,8 @@ def start_server_background(context):
         server_args.append('--cont-batching')
     if context.server_embeddings:
         server_args.append('--embedding')
+    if context.server_metrics:
+        server_args.append('--metrics')
     if context.model_alias is not None:
         server_args.extend(['--alias', context.model_alias])
     if context.n_ctx is not None:
@@ -765,6 +792,8 @@ def start_server_background(context):
         server_args.extend(['--api-key', context.server_api_key])
     if context.debug:
         server_args.append('--verbose')
+    if 'SERVER_LOG_FORMAT_JSON' not in os.environ:
+        server_args.extend(['--log-format', "text"])
     print(f"starting server with: {context.server_path}", *server_args)
     context.server_process = subprocess.Popen(
         [str(arg) for arg in [context.server_path, *server_args]],

examples/server/tests/requirements.txt

@@ -1,3 +1,4 @@
 aiohttp~=3.9.3
 behave~=1.2.6
 openai~=0.25.0
+prometheus-client~=0.20.0

examples/server/utils.hpp

@@ -14,6 +14,7 @@
 using json = nlohmann::json;
 
 extern bool server_verbose;
+extern bool server_log_json;
 
 #ifndef SERVER_VERBOSE
 #define SERVER_VERBOSE 1
@@ -27,14 +28,14 @@ extern bool server_verbose;
     { \
         if (server_verbose) \
         { \
-            server_log("VERBOSE", __func__, __LINE__, MSG, __VA_ARGS__); \
+            server_log("VERB", __func__, __LINE__, MSG, __VA_ARGS__); \
         } \
     } while (0)
 #endif
 
-#define LOG_ERROR( MSG, ...) server_log("ERROR", __func__, __LINE__, MSG, __VA_ARGS__)
-#define LOG_WARNING(MSG, ...) server_log("WARNING", __func__, __LINE__, MSG, __VA_ARGS__)
+#define LOG_ERROR( MSG, ...) server_log("ERR", __func__, __LINE__, MSG, __VA_ARGS__)
+#define LOG_WARNING(MSG, ...) server_log("WARN", __func__, __LINE__, MSG, __VA_ARGS__)
 #define LOG_INFO( MSG, ...) server_log("INFO", __func__, __LINE__, MSG, __VA_ARGS__)
 
 //
 // parallel
@@ -50,7 +51,7 @@ enum task_type {
     TASK_TYPE_COMPLETION,
     TASK_TYPE_CANCEL,
     TASK_TYPE_NEXT_RESPONSE,
-    TASK_TYPE_SLOTS_DATA
+    TASK_TYPE_METRICS
 };
 
 struct task_server {
@@ -133,26 +134,48 @@ struct completion_token_output
     std::string text_to_send;
 };
 
-static inline void server_log(const char *level, const char *function, int line,
-                              const char *message, const nlohmann::ordered_json &extra)
+static inline void server_log(const char *level, const char *function, int line, const char *message, const nlohmann::ordered_json &extra)
 {
-    nlohmann::ordered_json log
-    {
+    std::stringstream ss_tid;
+    ss_tid << std::this_thread::get_id();
+    json log = nlohmann::ordered_json{
+        {"tid", ss_tid.str()},
         {"timestamp", time(nullptr)},
-        {"level", level},
-        {"function", function},
-        {"line", line},
-        {"message", message},
     };
 
-    if (!extra.empty())
-    {
-        log.merge_patch(extra);
-    }
+    if (server_log_json) {
+        log.merge_patch(
+            {
+                {"level", level},
+                {"function", function},
+                {"line", line},
+                {"msg", message},
+            });
+        if (!extra.empty()) {
+            log.merge_patch(extra);
+        }
 
-    const std::string str = log.dump(-1, ' ', false, json::error_handler_t::replace);
-    printf("%.*s\n", (int)str.size(), str.data());
-    fflush(stdout);
+        std::cout << log.dump(-1, ' ', false, json::error_handler_t::replace) << "\n" << std::flush;
+    } else {
+        char buf[1024];
+        snprintf(buf, 1024, "%4s [%24s] %s", level, function, message);
+
+        if (!extra.empty()) {
+            log.merge_patch(extra);
+        }
+        std::stringstream ss;
+        ss << buf << " |";
+        for (const auto& el : log.items())
+        {
+            const std::string value = el.value().dump(-1, ' ', false, json::error_handler_t::replace);
+            snprintf(buf, 1024, " %s=%s", el.key().c_str(), value.c_str());
+            ss << buf;
+        }
+
+        const std::string str = ss.str();
+        printf("%.*s\n", (int)str.size(), str.data());
+        fflush(stdout);
+    }
 }
 
 //
@@ -234,6 +257,7 @@ struct llama_server_queue {
         std::unique_lock<std::mutex> lock(mutex_tasks);
         if (task.id == -1) {
             task.id = id++;
+            LOG_VERBOSE("new task id", {{"new_id", task.id}});
         }
         queue_tasks.push_back(std::move(task));
         condition_tasks.notify_one();
@@ -249,7 +273,9 @@ struct llama_server_queue {
     // Get the next id for creating anew task
     int get_new_id() {
         std::unique_lock<std::mutex> lock(mutex_tasks);
-        return id++;
+        int new_id = id++;
+        LOG_VERBOSE("new task id", {{"new_id", new_id}});
+        return new_id;
     }
 
     // Register function to process a new task
@@ -290,8 +316,7 @@ struct llama_server_queue {
     void start_loop() {
         running = true;
         while (true) {
-            // new task arrived
-            LOG_VERBOSE("have new task", {});
+            LOG_VERBOSE("new task may arrive", {});
             {
                 while (true)
                 {
@@ -303,7 +328,7 @@ struct llama_server_queue {
                 task_server task = queue_tasks.front();
                 queue_tasks.erase(queue_tasks.begin());
                 lock.unlock();
-                LOG_VERBOSE("callback_new_task", {});
+                LOG_VERBOSE("callback_new_task", {{"task_id", task.id}});
                 callback_new_task(task);
             }
             LOG_VERBOSE("callback_all_task_finished", {});
@@ -384,11 +409,13 @@ struct llama_server_response {
     std::condition_variable condition_results;
 
     void add_waiting_task_id(int task_id) {
+        LOG_VERBOSE("waiting for task id", {{"task_id", task_id}});
         std::unique_lock<std::mutex> lock(mutex_results);
         waiting_task_ids.insert(task_id);
     }
 
     void remove_waiting_task_id(int task_id) {
+        LOG_VERBOSE("remove waiting for task id", {{"task_id", task_id}});
         std::unique_lock<std::mutex> lock(mutex_results);
         waiting_task_ids.erase(task_id);
     }
@@ -401,7 +428,6 @@ struct llama_server_response {
             condition_results.wait(lock, [&]{
                 return !queue_results.empty();
             });
-            LOG_VERBOSE("condition_results unblock", {});
 
             for (int i = 0; i < (int) queue_results.size(); i++)
             {
@@ -426,22 +452,22 @@ struct llama_server_response {
     // Send a new result to a waiting task_id
     void send(task_result result) {
         std::unique_lock<std::mutex> lock(mutex_results);
-        LOG_VERBOSE("send new result", {});
+        LOG_VERBOSE("send new result", {{"task_id", result.id}});
 
         for (auto& task_id : waiting_task_ids) {
             // LOG_TEE("waiting task id %i \n", task_id);
             // for now, tasks that have associated parent multitasks just get erased once multitask picks up the result
             if (result.multitask_id == task_id)
             {
-                LOG_VERBOSE("callback_update_multitask", {});
+                LOG_VERBOSE("callback_update_multitask", {{"task_id", task_id}});
                 callback_update_multitask(task_id, result.id, result);
                 continue;
             }
 
             if (result.id == task_id)
             {
-                LOG_VERBOSE("queue_results.push_back", {});
+                LOG_VERBOSE("queue_results.push_back", {{"task_id", task_id}});
                 queue_results.push_back(result);
-                condition_results.notify_one();
+                condition_results.notify_all();
                 return;
             }
         }