From c8546879c416cf77baeb30c80f19743506ab8d93 Mon Sep 17 00:00:00 2001
From: Radoslav Gerganov
Date: Tue, 30 Apr 2024 13:08:53 +0300
Subject: [PATCH] Address review comments

---
 common/common.cpp      |   1 +
 examples/main/main.cpp |   1 -
 ggml-rpc.cpp           |  48 +++-----
 ggml-rpc.h             |  13 +--
 llama.cpp              | 256 +++++++++++++++++++++--------------------
 llama.h                |   4 +-
 6 files changed, 159 insertions(+), 164 deletions(-)

diff --git a/common/common.cpp b/common/common.cpp
index 2557d542f..96130ad54 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1839,6 +1839,7 @@ struct llama_model_params llama_model_params_from_gpt_params(const gpt_params &
     if (params.n_gpu_layers != -1) {
         mparams.n_gpu_layers = params.n_gpu_layers;
     }
+    mparams.rpc_servers  = params.rpc_servers.c_str();
     mparams.main_gpu     = params.main_gpu;
     mparams.split_mode   = params.split_mode;
     mparams.tensor_split = params.tensor_split;
diff --git a/examples/main/main.cpp b/examples/main/main.cpp
index 8d20b8ff0..9dee41001 100644
--- a/examples/main/main.cpp
+++ b/examples/main/main.cpp
@@ -187,7 +187,6 @@ int main(int argc, char ** argv) {
     LOG("%s: llama backend init\n", __func__);
     llama_backend_init();
     llama_numa_init(params.numa);
-    llama_rpc_init(params.rpc_servers.empty() ? nullptr : params.rpc_servers.c_str());

     llama_model * model;
     llama_context * ctx;
diff --git a/ggml-rpc.cpp b/ggml-rpc.cpp
index 021c4a170..e2242d3f1 100644
--- a/ggml-rpc.cpp
+++ b/ggml-rpc.cpp
@@ -102,6 +102,8 @@ static bool recv_data(int sockfd, void * data, size_t size) {
     return true;
 }

+// RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) |
+// RPC response: | response_size (8 bytes) | response_data (response_size bytes) |
 static bool send_rpc_cmd(int sockfd, enum rpc_cmd cmd, const std::vector<uint8_t> & input, std::vector<uint8_t> & output) {
     uint8_t cmd_byte = cmd;
     if (!send_data(sockfd, &cmd_byte, sizeof(cmd_byte))) {
@@ -468,36 +470,17 @@ static ggml_backend_i ggml_backend_rpc_interface = {
     /* .event_synchronize       = */ NULL,
 };

-static std::vector<std::string> endpoints;
+static std::unordered_map<std::string, ggml_backend_t> instances;

-GGML_API GGML_CALL void ggml_rpc_init(const char * rpc_servers) {
-    endpoints.clear();
-    GGML_ASSERT(rpc_servers != NULL);
-    std::string servers(rpc_servers);
-    size_t pos = 0;
-    while ((pos = servers.find(",")) != std::string::npos) {
-        std::string server = servers.substr(0, pos);
-        endpoints.push_back(server);
-        servers.erase(0, pos + 1);
-    }
-    endpoints.push_back(servers);
+GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const std::string & endpoint) {
+    ggml_backend_t backend = ggml_backend_rpc_init(endpoint);
+    return ggml_backend_rpc_get_default_buffer_type(backend);
 }

-static ggml_backend_t instances[GGML_RPC_MAX_SERVERS] = {0};
-
-GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(int server_id) {
-    ggml_backend_rpc_init(server_id);
-    return ggml_backend_rpc_get_default_buffer_type(instances[server_id]);
-}
-
-GGML_CALL ggml_backend_t ggml_backend_rpc_init(int server_id) {
-    if (server_id < 0 || server_id >= ggml_backend_rpc_get_server_count()) {
-        return nullptr;
+GGML_CALL ggml_backend_t ggml_backend_rpc_init(const std::string & endpoint) {
+    if (instances.find(endpoint) != instances.end()) {
+        return instances[endpoint];
     }
-    if (instances[server_id]) {
-        return instances[server_id];
-    }
-    std::string endpoint = endpoints[server_id];
     GGML_PRINT_DEBUG("Connecting to %s\n", endpoint.c_str());
     // split the endpoint into host and port
     size_t pos = endpoint.find(":");
@@ -508,7 +491,7 @@ GGML_CALL ggml_backend_t ggml_backend_rpc_init(int server_id) {
     ggml_backend_rpc_buffer_type_context * buft_ctx = new ggml_backend_rpc_buffer_type_context {
         /* .sockfd = */ sockfd,
-        /* .name   = */ "RPC" + std::to_string(server_id)
+        /* .name   = */ "RPC" + std::to_string(sockfd)
     };

     ggml_backend_buffer_type_t buft = new ggml_backend_buffer_type {
@@ -523,21 +506,24 @@ GGML_CALL ggml_backend_t ggml_backend_rpc_init(int server_id) {
         /* .buft      = */ buft
     };

-    instances[server_id] = new ggml_backend {
+    instances[endpoint] = new ggml_backend {
         /* .guid      = */ ggml_backend_rpc_guid(),
         /* .interface = */ ggml_backend_rpc_interface,
         /* .context   = */ ctx
     };
-    return instances[server_id];
+    return instances[endpoint];
 }

 GGML_API GGML_CALL bool ggml_backend_is_rpc(ggml_backend_t backend) {
     return backend != NULL && ggml_guid_matches(backend->guid, ggml_backend_rpc_guid());
 }

-GGML_API GGML_CALL int ggml_backend_rpc_get_server_count(void) {
-    return endpoints.size();
+GGML_API GGML_CALL void ggml_backend_rpc_get_device_memory(const std::string & endpoint, size_t * free, size_t * total) {
+    UNUSED(endpoint);
+    UNUSED(total);
+    // TODO: implement
+    *free = 1;
 }

 // RPC server-side implementation
diff --git a/ggml-rpc.h b/ggml-rpc.h
index 5695a37c5..2c26c82bc 100644
--- a/ggml-rpc.h
+++ b/ggml-rpc.h
@@ -2,13 +2,13 @@

 #include "ggml.h"
 #include "ggml-backend.h"
+#include <string>

 #ifdef __cplusplus
 extern "C" {
 #endif

-#define GGML_RPC_MAX_SERVERS 16
-
+// ggml_tensor is serialized into rpc_tensor
 struct rpc_tensor {
     uint64_t id;
     uint32_t type;
     uint64_t buffer;
@@ -25,6 +25,7 @@ struct rpc_tensor {
     char name[GGML_MAX_NAME];
 };

+// RPC commands
 enum rpc_cmd {
     ALLOC_BUFFER = 0,
     BUFFER_GET_BASE,
@@ -36,15 +37,13 @@ enum rpc_cmd {
     GRAPH_COMPUTE,
 };

-GGML_API GGML_CALL void ggml_rpc_init(const char * rpc_servers);
-
 // backend API
-GGML_API GGML_CALL ggml_backend_t ggml_backend_rpc_init(int server_id);
+GGML_API GGML_CALL ggml_backend_t ggml_backend_rpc_init(const std::string & endpoint);
 GGML_API GGML_CALL bool ggml_backend_is_rpc(ggml_backend_t backend);

-GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(int server_id);
+GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const std::string & endpoint);

-GGML_API GGML_CALL int ggml_backend_rpc_get_server_count(void);
+GGML_API GGML_CALL void ggml_backend_rpc_get_device_memory(const std::string & endpoint, size_t * free, size_t * total);

 GGML_API GGML_CALL void rpc_serve_client(ggml_backend_t backend, int sockfd);
diff --git a/llama.cpp b/llama.cpp
index 75101fd13..436562c47 100644
--- a/llama.cpp
+++ b/llama.cpp
@@ -1689,99 +1689,6 @@ static ggml_backend_buffer_type_t llama_default_buffer_type_cpu(bool host_buffer
     GGML_UNUSED(host_buffer);
 }

-static ggml_backend_buffer_type_t llama_default_buffer_type_offload(int gpu) {
-    ggml_backend_buffer_type_t buft = nullptr;
-
-#ifdef GGML_USE_METAL
-    buft = ggml_backend_metal_buffer_type();
-#elif defined(GGML_USE_RPC)
-    buft = ggml_backend_rpc_buffer_type(gpu);
-#elif defined(GGML_USE_CUDA)
-    buft = ggml_backend_cuda_buffer_type(gpu);
-#elif defined(GGML_USE_VULKAN)
-    buft = ggml_backend_vk_buffer_type(gpu);
-#elif defined(GGML_USE_SYCL)
-    buft = ggml_backend_sycl_buffer_type(gpu);
-#elif defined(GGML_USE_CLBLAST)
-    buft = ggml_backend_opencl_buffer_type();
-#elif defined(GGML_USE_KOMPUTE)
-    buft = ggml_backend_kompute_buffer_type(gpu);
-    if (buft == nullptr) {
-        LLAMA_LOG_WARN("%s: cannot use GPU %d, check `vulkaninfo --summary`\n", __func__, gpu);
-    }
-#endif
-
-    if (buft == nullptr) {
-        buft = llama_default_buffer_type_cpu(true);
-    }
-    return buft;
-
-    GGML_UNUSED(gpu);
-}
-
-static ggml_backend_buffer_type_t llama_default_buffer_type_split(int fallback_gpu, const float * tensor_split) {
-    ggml_backend_buffer_type_t buft = nullptr;
-
-#ifdef GGML_USE_CUDA
-    if (ggml_backend_cuda_get_device_count() > 1) {
-        buft = ggml_backend_cuda_split_buffer_type(tensor_split);
-    }
-#endif
-
-#ifdef GGML_USE_SYCL
-    if (ggml_backend_sycl_get_device_count() > 1) {
-        buft = ggml_backend_sycl_split_buffer_type(tensor_split);
-    }
-#endif
-
-    if (buft == nullptr) {
-        buft = llama_default_buffer_type_offload(fallback_gpu);
-    }
-    return buft;
-
-    GGML_UNUSED(tensor_split);
-}
-
-static size_t llama_get_device_count() {
-#if defined(GGML_USE_CUDA)
-    return ggml_backend_cuda_get_device_count();
-#elif defined(GGML_USE_RPC)
-    return ggml_backend_rpc_get_server_count();
-#elif defined(GGML_USE_SYCL)
-    return ggml_backend_sycl_get_device_count();
-#elif defined(GGML_USE_VULKAN)
-    return ggml_backend_vk_get_device_count();
-#else
-    return 1;
-#endif
-}
-
-static size_t llama_get_device_memory(int device) {
-#if defined(GGML_USE_CUDA)
-    size_t total;
-    size_t free;
-    ggml_backend_cuda_get_device_memory(device, &free, &total);
-    return free;
-#elif defined(GGML_USE_RPC)
-    // TODO: implement
-    GGML_UNUSED(device);
-    return 1;
-#elif defined(GGML_USE_SYCL)
-    size_t total;
-    size_t free;
-    ggml_backend_sycl_get_device_memory(device, &free, &total);
-    return free;
-#elif defined(GGML_USE_VULKAN)
-    size_t total;
-    size_t free;
-    ggml_backend_vk_get_device_memory(device, &free, &total);
-    return free;
-#else
-    return 1;
-    GGML_UNUSED(device);
-#endif
-}
-
 //
 // globals
 //
@@ -2222,6 +2129,8 @@ struct llama_model {
     int main_gpu;
     int n_gpu_layers;

+    std::vector<std::string> rpc_servers;
+
     // gguf metadata
     std::unordered_map<std::string, std::string> gguf_kv;

@@ -2365,6 +2274,104 @@ struct llama_context {
 #endif
 };

+static ggml_backend_buffer_type_t llama_default_buffer_type_offload(const llama_model & model, int gpu) {
+    ggml_backend_buffer_type_t buft = nullptr;
+
+#ifdef GGML_USE_RPC
+    std::string endpoint = model.rpc_servers[gpu];
+    buft = ggml_backend_rpc_buffer_type(endpoint);
+#elif defined(GGML_USE_METAL)
+    buft = ggml_backend_metal_buffer_type();
+#elif defined(GGML_USE_CUDA)
+    buft = ggml_backend_cuda_buffer_type(gpu);
+#elif defined(GGML_USE_VULKAN)
+    buft = ggml_backend_vk_buffer_type(gpu);
+#elif defined(GGML_USE_SYCL)
+    buft = ggml_backend_sycl_buffer_type(gpu);
+#elif defined(GGML_USE_CLBLAST)
+    buft = ggml_backend_opencl_buffer_type();
+#elif defined(GGML_USE_KOMPUTE)
+    buft = ggml_backend_kompute_buffer_type(gpu);
+    if (buft == nullptr) {
+        LLAMA_LOG_WARN("%s: cannot use GPU %d, check `vulkaninfo --summary`\n", __func__, gpu);
+    }
+#endif
+
+    if (buft == nullptr) {
+        buft = llama_default_buffer_type_cpu(true);
+    }
+    return buft;
+
+    GGML_UNUSED(gpu);
+}
+
+static ggml_backend_buffer_type_t llama_default_buffer_type_split(const llama_model & model, int fallback_gpu, const float * tensor_split) {
+    ggml_backend_buffer_type_t buft = nullptr;
+
+#ifdef GGML_USE_CUDA
+    if (ggml_backend_cuda_get_device_count() > 1) {
+        buft = ggml_backend_cuda_split_buffer_type(tensor_split);
+    }
+#endif
+
+#ifdef GGML_USE_SYCL
+    if (ggml_backend_sycl_get_device_count() > 1) {
+        buft = ggml_backend_sycl_split_buffer_type(tensor_split);
+    }
+#endif
+
+    if (buft == nullptr) {
+        buft = llama_default_buffer_type_offload(model, fallback_gpu);
+    }
+    return buft;
+
+    GGML_UNUSED(tensor_split);
+}
+
+static size_t llama_get_device_count(const llama_model & model) {
+#if defined(GGML_USE_RPC)
+    return model.rpc_servers.size();
+#elif defined(GGML_USE_CUDA)
+    return ggml_backend_cuda_get_device_count();
+#elif defined(GGML_USE_SYCL)
+    return ggml_backend_sycl_get_device_count();
+#elif defined(GGML_USE_VULKAN)
+    return ggml_backend_vk_get_device_count();
+#else
+    return 1;
+#endif
+    GGML_UNUSED(model);
+}
+
+static size_t llama_get_device_memory(const llama_model & model, int device) {
+#if defined(GGML_USE_RPC)
+    size_t total;
+    size_t free;
+    std::string endpoint = model.rpc_servers[device];
+    ggml_backend_rpc_get_device_memory(endpoint, &free, &total);
+    return free;
+#elif defined(GGML_USE_CUDA)
+    size_t total;
+    size_t free;
+    ggml_backend_cuda_get_device_memory(device, &free, &total);
+    return free;
+#elif defined(GGML_USE_SYCL)
+    size_t total;
+    size_t free;
+    ggml_backend_sycl_get_device_memory(device, &free, &total);
+    return free;
+#elif defined(GGML_USE_VULKAN)
+    size_t total;
+    size_t free;
+    ggml_backend_vk_get_device_memory(device, &free, &total);
+    return free;
+#else
+    return 1;
+    GGML_UNUSED(model);
+    GGML_UNUSED(device);
+#endif
+}
+
 //
 // kv cache helpers
 //
@@ -4803,13 +4810,13 @@ static bool llm_load_tensors(

     if (split_mode == LLAMA_SPLIT_MODE_LAYER) {
         // calculate the split points
-        int device_count = llama_get_device_count();
+        int device_count = llama_get_device_count(model);
         bool all_zero = tensor_split == nullptr || std::all_of(tensor_split, tensor_split + device_count, [](float x) { return x == 0.0f; });
         std::vector<float> splits(device_count);
         if (all_zero) {
             // default split, by free memory
             for (int i = 0; i < device_count; ++i) {
-                splits[i] = llama_get_device_memory(i);
+                splits[i] = llama_get_device_memory(model, i);
             }
         } else {
             std::copy(tensor_split, tensor_split + device_count, splits.begin());
@@ -4829,35 +4836,35 @@ static bool llm_load_tensors(
         int act_gpu_layers = std::min(n_gpu_layers, (int)n_layer + 1);
         for (int64_t i = i_gpu_start; i < n_layer; ++i) {
             int layer_gpu = std::upper_bound(splits.begin(), splits.begin() + device_count, float(i - i_gpu_start)/act_gpu_layers) - splits.begin();
-            model.buft_layer[i] = llama_default_buffer_type_offload(layer_gpu);
+            model.buft_layer[i] = llama_default_buffer_type_offload(model, layer_gpu);
         }
         // assign the output layer
         if (n_gpu_layers > n_layer) {
             int layer_gpu = std::upper_bound(splits.begin(), splits.begin() + device_count, float(act_gpu_layers - 1)/act_gpu_layers) - splits.begin();
-            model.buft_output = llama_default_buffer_type_offload(layer_gpu);
+            model.buft_output = llama_default_buffer_type_offload(model, layer_gpu);
         } else {
             model.buft_output = llama_default_buffer_type_cpu(true);
         }
     } else {
         ggml_backend_buffer_type_t split_buft;
         if (split_mode == LLAMA_SPLIT_MODE_ROW) {
-            split_buft = llama_default_buffer_type_split(main_gpu, tensor_split);
+            split_buft = llama_default_buffer_type_split(model, main_gpu, tensor_split);
         } else {
             // LLAMA_SPLIT_MODE_NONE or LLAMA_SPLIT_MODE_LAYER in backends where it is not supported
-            split_buft = llama_default_buffer_type_offload(main_gpu);
+            split_buft = llama_default_buffer_type_offload(model, main_gpu);
         }
         // assign the repeating layers
         for (int64_t i = i_gpu_start; i < n_layer; ++i) {
             model.buft_layer[i] = {
                 split_buft,
-                llama_default_buffer_type_offload(main_gpu)
+                llama_default_buffer_type_offload(model, main_gpu)
             };
         }
         // assign the output layer
         if (n_gpu_layers > n_layer) {
             model.buft_output = {
                 split_buft,
-                llama_default_buffer_type_offload(main_gpu)
+                llama_default_buffer_type_offload(model, main_gpu)
             };
         } else {
             model.buft_output = llama_default_buffer_type_cpu(true);
@@ -15402,6 +15409,7 @@ struct llama_model_params llama_model_default_params() {
         /*.split_mode                  =*/ LLAMA_SPLIT_MODE_LAYER,
         /*.main_gpu                    =*/ 0,
         /*.tensor_split                =*/ nullptr,
+        /*.rpc_servers                 =*/ nullptr,
         /*.progress_callback           =*/ nullptr,
        /*.progress_callback_user_data =*/ nullptr,
         /*.kv_overrides                =*/ nullptr,
@@ -15524,16 +15532,6 @@ void llama_numa_init(enum ggml_numa_strategy numa) {
     }
 }

-void llama_rpc_init(const char * rpc_servers) {
-#ifdef GGML_USE_RPC
-    ggml_rpc_init(rpc_servers);
-#else
-    if (rpc_servers != nullptr) {
-        LLAMA_LOG_WARN("%s: RPC support is not enabled in this build\n", __func__);
-    }
-#endif
-}
-
 void llama_backend_free(void) {
 #ifdef GGML_USE_MPI
     ggml_mpi_backend_free();
@@ -15568,7 +15566,17 @@ struct llama_model * llama_load_model_from_file(
             return true;
         };
     }
-
+    if (params.rpc_servers != nullptr) {
+        // split the servers set them into model->rpc_servers
+        std::string servers(params.rpc_servers);
+        size_t pos = 0;
+        while ((pos = servers.find(",")) != std::string::npos) {
+            std::string server = servers.substr(0, pos);
+            model->rpc_servers.push_back(server);
+            servers.erase(0, pos + 1);
+        }
+        model->rpc_servers.push_back(servers);
+    }
     int status = llama_model_load(path_model, *model, params);
     GGML_ASSERT(status <= 0);
     if (status < 0) {
@@ -15715,7 +15723,17 @@ struct llama_context * llama_new_context_with_model(

     if (!hparams.vocab_only) {
         // initialize backends
-#ifdef GGML_USE_METAL
+#if defined(GGML_USE_RPC)
+        for (auto & server : model->rpc_servers) {
+            ggml_backend_t backend = ggml_backend_rpc_init(server);
+            if (backend == nullptr) {
+                LLAMA_LOG_ERROR("%s: failed to initialize RPC backend, endpoint: %s\n", __func__, server.c_str());
+                llama_free(ctx);
+                return nullptr;
+            }
+            ctx->backends.push_back(backend);
+        }
+#elif defined(GGML_USE_METAL)
         if (model->n_gpu_layers > 0) {
             ctx->backend_metal = ggml_backend_metal_init();
             if (ctx->backend_metal == nullptr) {
@@ -15725,16 +15743,6 @@ struct llama_context * llama_new_context_with_model(
             }
             ctx->backends.push_back(ctx->backend_metal);
         }
-#elif defined(GGML_USE_RPC)
-        for (int server = 0; server < ggml_backend_rpc_get_server_count(); ++server) {
-            ggml_backend_t backend = ggml_backend_rpc_init(server);
-            if (backend == nullptr) {
-                LLAMA_LOG_ERROR("%s: failed to initialize RPC%d backend\n", __func__, server);
-                llama_free(ctx);
-                return nullptr;
-            }
-            ctx->backends.push_back(backend);
-        }
 #elif defined(GGML_USE_CUDA)
         if (model->split_mode == LLAMA_SPLIT_MODE_NONE || model->split_mode == LLAMA_SPLIT_MODE_ROW) {
             // with split_mode LLAMA_SPLIT_MODE_NONE or LLAMA_SPLIT_MODE_ROW, only the main GPU backend is used
@@ -15882,7 +15890,7 @@ struct llama_context * llama_new_context_with_model(

         // enabling pipeline parallelism in the scheduler increases memory usage, so it is only done when necessary
         bool pipeline_parallel =
-            llama_get_device_count() > 1 &&
+            llama_get_device_count(*model) > 1 &&
             model->n_gpu_layers > (int)model->hparams.n_layer &&
             model->split_mode == LLAMA_SPLIT_MODE_LAYER &&
             params.offload_kqv;
diff --git a/llama.h b/llama.h
index 7b93ac202..612e32c4e 100644
--- a/llama.h
+++ b/llama.h
@@ -242,6 +242,9 @@ extern "C" {
         // proportion of the model (layers or rows) to offload to each GPU, size: llama_max_devices()
         const float * tensor_split;

+        // comma separated list of RPC servers to use for offloading
+        const char * rpc_servers;
+
         // Called with a progress value between 0.0 and 1.0. Pass NULL to disable.
         // If the provided progress_callback returns true, model loading continues.
         // If it returns false, model loading is immediately aborted.
@@ -383,7 +386,6 @@ extern "C" {
     //optional:
     LLAMA_API void llama_numa_init(enum ggml_numa_strategy numa);
-    LLAMA_API void llama_rpc_init(const char * rpc_servers);

     // Call once at the end of the program - currently only used for MPI
     LLAMA_API void llama_backend_free(void);
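
Reviewer note (not part of the patch): a minimal sketch of how an application drives the new parameter end to end. llama_load_model_from_file() splits the comma-separated rpc_servers string into model->rpc_servers, and llama_new_context_with_model() then creates one RPC backend per endpoint. The endpoints and model path below are placeholders; a build with the RPC backend enabled (GGML_USE_RPC) and RPC servers already listening on those endpoints are assumed.

    #include "llama.h"

    int main() {
        llama_backend_init();

        llama_model_params mparams = llama_model_default_params();
        // hypothetical endpoints; each one must have an RPC server listening
        mparams.rpc_servers  = "192.168.1.10:50052,192.168.1.11:50052";
        mparams.n_gpu_layers = 99; // offloaded layers are distributed across the RPC devices

        llama_model * model = llama_load_model_from_file("model.gguf", mparams);
        if (model == nullptr) {
            return 1;
        }

        llama_context_params cparams = llama_context_default_params();
        llama_context * ctx = llama_new_context_with_model(model, cparams);

        // ... evaluate the model as usual ...

        llama_free(ctx);
        llama_free_model(model);
        llama_backend_free();
        return 0;
    }

Keying the backend instances by endpoint string (rather than by a server index, as before) means the same endpoint passed to ggml_backend_rpc_buffer_type() and ggml_backend_rpc_init() resolves to a single shared connection, with no separate registration step such as the removed llama_rpc_init().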