From bfdd4f4045756516f6deff4807b1c6cc61ef128b Mon Sep 17 00:00:00 2001 From: Radoslav Gerganov Date: Thu, 18 Apr 2024 14:07:22 +0300 Subject: [PATCH] ggml : add RPC backend The RPC backend proxies all operations to a remote server which runs a regular backend (CPU, CUDA, Metal, etc). --- CMakeLists.txt | 9 + common/common.cpp | 9 + common/common.h | 1 + examples/CMakeLists.txt | 3 + examples/main/main.cpp | 1 + examples/rpc/CMakeLists.txt | 2 + examples/rpc/rpc-server.cpp | 101 +++++ ggml-rpc.cpp | 767 ++++++++++++++++++++++++++++++++++++ ggml-rpc.h | 53 +++ llama.cpp | 34 +- llama.h | 1 + 11 files changed, 980 insertions(+), 1 deletion(-) create mode 100644 examples/rpc/CMakeLists.txt create mode 100644 examples/rpc/rpc-server.cpp create mode 100644 ggml-rpc.cpp create mode 100644 ggml-rpc.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c3b5c8e4..d61fa4520 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -123,6 +123,7 @@ set(LLAMA_METAL_MACOSX_VERSION_MIN "" CACHE STRING set(LLAMA_METAL_STD "" CACHE STRING "llama: metal standard version (-std flag)") option(LLAMA_KOMPUTE "llama: use Kompute" OFF) option(LLAMA_MPI "llama: use MPI" OFF) +option(LLAMA_RPC "llama: use RPC" OFF) option(LLAMA_QKK_64 "llama: use super-block size of 64 for k-quants" OFF) option(LLAMA_SYCL "llama: use SYCL" OFF) option(LLAMA_SYCL_F16 "llama: use 16 bit floats for sycl calculations" OFF) @@ -494,6 +495,13 @@ if (LLAMA_MPI) endif() endif() +if (LLAMA_RPC) + add_compile_definitions(GGML_USE_RPC) + + set(GGML_HEADERS_RPC ggml-rpc.h) + set(GGML_SOURCES_RPC ggml-rpc.cpp) +endif() + if (LLAMA_CLBLAST) find_package(CLBlast) if (CLBlast_FOUND) @@ -1176,6 +1184,7 @@ add_library(ggml OBJECT ${GGML_SOURCES_OPENCL} ${GGML_HEADERS_OPENCL} ${GGML_SOURCES_METAL} ${GGML_HEADERS_METAL} ${GGML_SOURCES_MPI} ${GGML_HEADERS_MPI} + ${GGML_SOURCES_RPC} ${GGML_HEADERS_RPC} ${GGML_SOURCES_EXTRA} ${GGML_HEADERS_EXTRA} ${GGML_SOURCES_SYCL} ${GGML_HEADERS_SYCL} ${GGML_SOURCES_KOMPUTE} ${GGML_HEADERS_KOMPUTE} diff --git a/common/common.cpp b/common/common.cpp index ba1ecf0e5..2557d542f 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -1060,6 +1060,14 @@ bool gpt_params_find_arg(int argc, char ** argv, const std::string & arg, gpt_pa #endif // GGML_USE_CUDA_SYCL_VULKAN return true; } + if (arg == "--rpc") { + if (++i >= argc) { + invalid_param = true; + return true; + } + params.rpc_servers = argv[i]; + return true; + } if (arg == "--no-mmap") { params.use_mmap = false; return true; @@ -1557,6 +1565,7 @@ void gpt_print_usage(int /*argc*/, char ** argv, const gpt_params & params) { printf(" -mg i, --main-gpu i the GPU to use for the model (with split-mode = none),\n"); printf(" or for intermediate results and KV (with split-mode = row) (default: %d)\n", params.main_gpu); } + printf(" --rpc SERVERS comma separated list of RPC servers\n"); printf(" --verbose-prompt print a verbose prompt before generation (default: %s)\n", params.verbose_prompt ? "true" : "false"); printf(" --no-display-prompt don't print prompt at generation (default: %s)\n", !params.display_prompt ? "true" : "false"); printf(" -gan N, --grp-attn-n N\n"); diff --git a/common/common.h b/common/common.h index d80344f2a..566490e2f 100644 --- a/common/common.h +++ b/common/common.h @@ -82,6 +82,7 @@ struct gpt_params { float yarn_beta_slow = 1.0f; // YaRN high correction dim int32_t yarn_orig_ctx = 0; // YaRN original context length float defrag_thold = -1.0f; // KV cache defragmentation threshold + std::string rpc_servers = ""; // comma separated list of RPC servers ggml_backend_sched_eval_callback cb_eval = nullptr; void * cb_eval_user_data = nullptr; diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f421769cc..b40ee4ccb 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -49,4 +49,7 @@ else() add_subdirectory(server) endif() add_subdirectory(export-lora) + if (LLAMA_RPC) + add_subdirectory(rpc) + endif() endif() diff --git a/examples/main/main.cpp b/examples/main/main.cpp index 9dee41001..8d20b8ff0 100644 --- a/examples/main/main.cpp +++ b/examples/main/main.cpp @@ -187,6 +187,7 @@ int main(int argc, char ** argv) { LOG("%s: llama backend init\n", __func__); llama_backend_init(); llama_numa_init(params.numa); + llama_rpc_init(params.rpc_servers.empty() ? nullptr : params.rpc_servers.c_str()); llama_model * model; llama_context * ctx; diff --git a/examples/rpc/CMakeLists.txt b/examples/rpc/CMakeLists.txt new file mode 100644 index 000000000..ae48fb98d --- /dev/null +++ b/examples/rpc/CMakeLists.txt @@ -0,0 +1,2 @@ +add_executable(rpc-server rpc-server.cpp) +target_link_libraries(rpc-server PRIVATE ggml llama) diff --git a/examples/rpc/rpc-server.cpp b/examples/rpc/rpc-server.cpp new file mode 100644 index 000000000..5fc315425 --- /dev/null +++ b/examples/rpc/rpc-server.cpp @@ -0,0 +1,101 @@ +#ifdef GGML_USE_CUDA +#include "ggml-cuda.h" +#endif + +#ifdef GGML_USE_METAL +#include "ggml-metal.h" +#endif + +#include "ggml-rpc.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static ggml_backend_t create_backend() { + ggml_backend_t backend = NULL; +#ifdef GGML_USE_CUDA + fprintf(stderr, "%s: using CUDA backend\n", __func__); + backend = ggml_backend_cuda_init(0); // init device 0 + if (!backend) { + fprintf(stderr, "%s: ggml_backend_cuda_init() failed\n", __func__); + } +#endif + +#ifdef GGML_USE_METAL + fprintf(stderr, "%s: using Metal backend\n", __func__); + backend = ggml_backend_metal_init(); + if (!backend) { + fprintf(stderr, "%s: ggml_backend_metal_init() failed\n", __func__); + } +#endif + + // if there aren't GPU Backends fallback to CPU backend + if (!backend) { + fprintf(stderr, "%s: using CPU backend\n", __func__); + backend = ggml_backend_cpu_init(); + } + return backend; +} + +static int create_server_socket(const char * host, int port) { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { + return -1; + } + + struct sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = inet_addr(host); + serv_addr.sin_port = htons(port); + + if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + return -1; + } + if (listen(sockfd, 5) < 0) { + return -1; + } + return sockfd; +} + +int main(int argc, char * argv[]) +{ + if (argc < 3) { + fprintf(stderr, "Usage: %s \n", argv[0]); + return 1; + } + const char * host = argv[1]; + int port = std::stoi(argv[2]); + + ggml_backend_t backend = create_backend(); + if (!backend) { + fprintf(stderr, "Failed to create backend\n"); + return 1; + } + + printf("Starting RPC server on %s:%d\n", host, port); + int server_socket = create_server_socket(host, port); + if (server_socket < 0) { + fprintf(stderr, "Failed to create server socket\n"); + return 1; + } + while (true) { + struct sockaddr_in cli_addr; + socklen_t clilen = sizeof(cli_addr); + int client_socket = accept(server_socket, (struct sockaddr *) &cli_addr, &clilen); + if (client_socket < 0) { + fprintf(stderr, "Failed to accept client connection\n"); + return 1; + } + printf("Accepted client connection\n"); + rpc_serve_client(backend, client_socket); + printf("Client connection closed\n"); + close(client_socket); + } + return 0; +} diff --git a/ggml-rpc.cpp b/ggml-rpc.cpp new file mode 100644 index 000000000..c1c1ffcec --- /dev/null +++ b/ggml-rpc.cpp @@ -0,0 +1,767 @@ +#include "ggml-rpc.h" +#include "ggml.h" +#include "ggml-backend-impl.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define UNUSED GGML_UNUSED + +#define GGML_DEBUG 1 +#if (GGML_DEBUG >= 1) +#define GGML_PRINT_DEBUG(...) printf(__VA_ARGS__) +#else +#define GGML_PRINT_DEBUG(...) +#endif + +// RPC data structures + +static ggml_guid_t ggml_backend_rpc_guid() { + static ggml_guid guid = { 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}; + return &guid; +} + +struct ggml_backend_rpc_buffer_type_context { + int sockfd; + std::string name; +}; + +struct ggml_backend_rpc_context { + std::string endpoint; + std::string name; + int sockfd; + ggml_backend_buffer_type_t buft; +}; + +struct ggml_backend_rpc_buffer_context { + int sockfd; + uint64_t remote_ptr; + std::string name; +}; + + +// RPC helper functions + +static int socket_connect(const char * host, int port) { + struct sockaddr_in addr; + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + return -1; + } + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + struct hostent * server = gethostbyname(host); + if (server == NULL) { + fprintf(stderr, "Cannot resolve host '%s'\n", host); + return -1; + } + bcopy((char *)server->h_addr, (char *)&addr.sin_addr.s_addr, server->h_length); + if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + return -1; + } + return sock; +} + +static bool send_data(int sockfd, const void * data, size_t size) { + size_t bytes_sent = 0; + while (bytes_sent < size) { + ssize_t n = send(sockfd, (const uint8_t *)data + bytes_sent, size - bytes_sent, 0); + if (n < 0) { + return false; + } + bytes_sent += n; + } + return true; +} + +static bool recv_data(int sockfd, void * data, size_t size) { + size_t bytes_recv = 0; + while (bytes_recv < size) { + ssize_t n = recv(sockfd, (uint8_t *)data + bytes_recv, size - bytes_recv, 0); + if (n <= 0) { + return false; + } + bytes_recv += n; + } + return true; +} + +static bool send_rpc_cmd(int sockfd, enum rpc_cmd cmd, const std::vector & input, std::vector & output) { + uint8_t cmd_byte = cmd; + if (!send_data(sockfd, &cmd_byte, sizeof(cmd_byte))) { + return false; + } + uint64_t input_size = input.size(); + if (!send_data(sockfd, &input_size, sizeof(input_size))) { + return false; + } + if (!send_data(sockfd, input.data(), input.size())) { + return false; + } + uint64_t output_size; + if (!recv_data(sockfd, &output_size, sizeof(output_size))) { + return false; + } + if (output_size == 0) { + output.clear(); + return true; + } + output.resize(output_size); + if (!recv_data(sockfd, output.data(), output_size)) { + return false; + } + return true; +} + +// RPC client-side implementation + +GGML_CALL static const char * ggml_backend_rpc_buffer_get_name(ggml_backend_buffer_t buffer) { + ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; + return ctx->name.c_str(); +} + +GGML_CALL static void ggml_backend_rpc_buffer_free_buffer(ggml_backend_buffer_t buffer) { + ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; + // input serialization format: | remote_ptr (8 bytes) | + std::vector input(sizeof(uint64_t), 0); + uint64_t remote_ptr = ctx->remote_ptr; + memcpy(input.data(), &remote_ptr, sizeof(remote_ptr)); + std::vector output; + bool status = send_rpc_cmd(ctx->sockfd, FREE_BUFFER, input, output); + GGML_ASSERT(status); + GGML_ASSERT(output.empty()); + delete ctx; +} + +GGML_CALL static void * ggml_backend_rpc_buffer_get_base(ggml_backend_buffer_t buffer) { + static std::unordered_map cache; + if (cache.find(buffer) != cache.end()) { + return cache[buffer]; + } + ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; + // input serialization format: | remote_ptr (8 bytes) | + std::vector input(sizeof(uint64_t), 0); + uint64_t remote_ptr = ctx->remote_ptr; + memcpy(input.data(), &remote_ptr, sizeof(remote_ptr)); + std::vector output; + bool status = send_rpc_cmd(ctx->sockfd, BUFFER_GET_BASE, input, output); + GGML_ASSERT(status); + GGML_ASSERT(output.size() == sizeof(uint64_t)); + // output serialization format: | base_ptr (8 bytes) | + uint64_t base_ptr; + memcpy(&base_ptr, output.data(), sizeof(base_ptr)); + void * base = reinterpret_cast(base_ptr); + cache[buffer] = base; + return base; +} + +static rpc_tensor serialize_tensor(const ggml_tensor * tensor) { + rpc_tensor result; + result.id = reinterpret_cast(tensor); + result.type = tensor->type; + if (tensor->buffer) { + ggml_backend_buffer_t buffer = tensor->buffer; + ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; + result.buffer = ctx->remote_ptr; + } else { + result.buffer = 0; + } + for (uint32_t i = 0; i < GGML_MAX_DIMS; i++) { + result.ne[i] = tensor->ne[i]; + result.nb[i] = tensor->nb[i]; + } + result.op = tensor->op; + for (uint32_t i = 0; i < GGML_MAX_OP_PARAMS / sizeof(int32_t); i++) { + result.op_params[i] = tensor->op_params[i]; + } + result.flags = tensor->flags; + for (uint32_t i = 0; i < GGML_MAX_SRC; i++) { + result.src[i] = reinterpret_cast(tensor->src[i]); + } + result.view_src = reinterpret_cast(tensor->view_src); + result.view_offs = tensor->view_offs; + result.data = reinterpret_cast(tensor->data); + snprintf(result.name, GGML_MAX_NAME, "%s", tensor->name); + return result; +} + +static ggml_tensor * deserialize_tensor(struct ggml_context * ctx, const rpc_tensor * tensor) { + ggml_tensor * result = ggml_new_tensor_4d(ctx, (ggml_type) tensor->type, + tensor->ne[0], tensor->ne[1], tensor->ne[2], tensor->ne[3]); + for (uint32_t i = 0; i < GGML_MAX_DIMS; i++) { + result->nb[i] = tensor->nb[i]; + } + result->buffer = reinterpret_cast(tensor->buffer); + result->op = (ggml_op) tensor->op; + for (uint32_t i = 0; i < GGML_MAX_OP_PARAMS / sizeof(int32_t); i++) { + result->op_params[i] = tensor->op_params[i]; + } + result->flags = tensor->flags; + result->data = reinterpret_cast(tensor->data); + snprintf(result->name, GGML_MAX_NAME, "%s", tensor->name); + return result; +} + +GGML_CALL static void ggml_backend_rpc_buffer_init_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor) { + UNUSED(buffer); + if (ggml_is_quantized(tensor->type)) { + GGML_ASSERT(tensor->ne[0] % 512 == 0 && "unsupported quantized tensor"); + } +} + +GGML_CALL static void ggml_backend_rpc_buffer_set_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor, const void * data, size_t offset, size_t size) { + ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; + // input serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes) | + int input_size = sizeof(rpc_tensor) + sizeof(uint64_t) + size; + std::vector input(input_size, 0); + rpc_tensor rpc_tensor = serialize_tensor(tensor); + memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor)); + memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset)); + memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), data, size); + std::vector output; + bool status = send_rpc_cmd(ctx->sockfd, SET_TENSOR, input, output); + GGML_ASSERT(status); +} + +GGML_CALL static void ggml_backend_rpc_buffer_get_tensor(ggml_backend_buffer_t buffer, const ggml_tensor * tensor, void * data, size_t offset, size_t size) { + ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; + // input serialization format: | rpc_tensor | offset (8 bytes) | size (8 bytes) | + int input_size = sizeof(rpc_tensor) + 2*sizeof(uint64_t); + std::vector input(input_size, 0); + rpc_tensor rpc_tensor = serialize_tensor(tensor); + memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor)); + memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset)); + memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), &size, sizeof(size)); + std::vector output; + bool status = send_rpc_cmd(ctx->sockfd, GET_TENSOR, input, output); + GGML_ASSERT(status); + GGML_ASSERT(output.size() == size); + // output serialization format: | data (size bytes) | + memcpy(data, output.data(), size); +} + +GGML_CALL static bool ggml_backend_rpc_buffer_cpy_tensor(ggml_backend_buffer_t buffer, const ggml_tensor * src, ggml_tensor * dst) { + // check if src and dst are on the same server + ggml_backend_buffer_t src_buffer = src->buffer; + ggml_backend_rpc_buffer_context * src_ctx = (ggml_backend_rpc_buffer_context *)src_buffer->context; + ggml_backend_buffer_t dst_buffer = dst->buffer; + ggml_backend_rpc_buffer_context * dst_ctx = (ggml_backend_rpc_buffer_context *)dst_buffer->context; + if (src_ctx->sockfd != dst_ctx->sockfd) { + return false; + } + ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; + // input serialization format: | rpc_tensor src | rpc_tensor dst | + int input_size = 2*sizeof(rpc_tensor); + std::vector input(input_size, 0); + rpc_tensor rpc_src = serialize_tensor(src); + rpc_tensor rpc_dst = serialize_tensor(dst); + memcpy(input.data(), &rpc_src, sizeof(rpc_src)); + memcpy(input.data() + sizeof(rpc_src), &rpc_dst, sizeof(rpc_dst)); + std::vector output; + bool status = send_rpc_cmd(ctx->sockfd, COPY_TENSOR, input, output); + GGML_ASSERT(status); + // output serialization format: | result (1 byte) | + GGML_ASSERT(output.size() == 1); + return output[0]; +} + +GGML_CALL static void ggml_backend_rpc_buffer_clear(ggml_backend_buffer_t buffer, uint8_t value) { + ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; + // serialization format: | bufptr (8 bytes) | value (1 byte) | + int input_size = sizeof(uint64_t) + sizeof(uint8_t); + std::vector input(input_size, 0); + memcpy(input.data(), &ctx->remote_ptr, sizeof(ctx->remote_ptr)); + memcpy(input.data() + sizeof(ctx->remote_ptr), &value, sizeof(value)); + std::vector output; + bool status = send_rpc_cmd(ctx->sockfd, BUFFER_CLEAR, input, output); + GGML_ASSERT(status); +} + +static ggml_backend_buffer_i ggml_backend_rpc_buffer_interface = { + /* .get_name = */ ggml_backend_rpc_buffer_get_name, + /* .free_buffer = */ ggml_backend_rpc_buffer_free_buffer, + /* .get_base = */ ggml_backend_rpc_buffer_get_base, + /* .init_tensor = */ ggml_backend_rpc_buffer_init_tensor, + /* .set_tensor = */ ggml_backend_rpc_buffer_set_tensor, + /* .get_tensor = */ ggml_backend_rpc_buffer_get_tensor, + /* .cpy_tensor = */ ggml_backend_rpc_buffer_cpy_tensor, + /* .clear = */ ggml_backend_rpc_buffer_clear, + /* .reset = */ NULL, +}; + +GGML_CALL static const char * ggml_backend_rpc_buffer_type_name(ggml_backend_buffer_type_t buft) { + ggml_backend_rpc_buffer_type_context * buft_ctx = (ggml_backend_rpc_buffer_type_context *)buft->context; + return buft_ctx->name.c_str(); +} + +GGML_CALL static ggml_backend_buffer_t ggml_backend_rpc_buffer_type_alloc_buffer(ggml_backend_buffer_type_t buft, size_t size) { + ggml_backend_rpc_buffer_type_context * buft_ctx = (ggml_backend_rpc_buffer_type_context *)buft->context; + // input serialization format: | size (8 bytes) | + int input_size = sizeof(uint64_t); + std::vector input(input_size, 0); + memcpy(input.data(), &size, sizeof(size)); + std::vector output; + bool status = send_rpc_cmd(buft_ctx->sockfd, ALLOC_BUFFER, input, output); + GGML_ASSERT(status); + GGML_ASSERT(output.size() == 2*sizeof(uint64_t)); + // output serialization format: | remote_ptr (8 bytes) | remote_size (8 bytes) | + uint64_t remote_ptr; + memcpy(&remote_ptr, output.data(), sizeof(remote_ptr)); + size_t remote_size; + memcpy(&remote_size, output.data() + sizeof(uint64_t), sizeof(remote_size)); + + ggml_backend_buffer_t buffer = ggml_backend_buffer_init(buft, + ggml_backend_rpc_buffer_interface, + new ggml_backend_rpc_buffer_context{buft_ctx->sockfd, remote_ptr, "RPC"}, + remote_size); + + return buffer; +} + +GGML_CALL static size_t ggml_backend_rpc_buffer_type_get_alignment(ggml_backend_buffer_type_t buft) { + UNUSED(buft); + // TODO: this is hardcoded for now but it should come from the remote backend + return 32; +} + +GGML_CALL static size_t ggml_backend_rpc_buffer_type_get_alloc_size(ggml_backend_buffer_type_t buft, const ggml_tensor * tensor) { + UNUSED(buft); + return ggml_nbytes(tensor); +} + +GGML_CALL static bool ggml_backend_rpc_buffer_type_supports_backend(ggml_backend_buffer_type_t buft, ggml_backend_t backend) { + if (!ggml_backend_is_rpc(backend)) { + return false; + } + ggml_backend_rpc_buffer_type_context * buft_ctx = (ggml_backend_rpc_buffer_type_context *)buft->context; + ggml_backend_rpc_context * rpc_ctx = (ggml_backend_rpc_context *)backend->context; + return buft_ctx->sockfd == rpc_ctx->sockfd; +} + +static ggml_backend_buffer_type_i ggml_backend_rpc_buffer_type_interface = { + /* .get_name = */ ggml_backend_rpc_buffer_type_name, + /* .alloc_buffer = */ ggml_backend_rpc_buffer_type_alloc_buffer, + /* .get_alignment = */ ggml_backend_rpc_buffer_type_get_alignment, + /* .get_max_size = */ NULL, // defaults to SIZE_MAX + /* .get_alloc_size = */ ggml_backend_rpc_buffer_type_get_alloc_size, + /* .supports_backend = */ ggml_backend_rpc_buffer_type_supports_backend, + /* .is_host = */ NULL, +}; + + +GGML_CALL static const char * ggml_backend_rpc_name(ggml_backend_t backend) { + ggml_backend_rpc_context * rpc_ctx = (ggml_backend_rpc_context *)backend->context; + + return rpc_ctx->name.c_str(); +} + +GGML_CALL static void ggml_backend_rpc_free(ggml_backend_t backend) { + ggml_backend_rpc_context * rpc_ctx = (ggml_backend_rpc_context *)backend->context; + ggml_backend_rpc_buffer_type_context * buft_ctx = (ggml_backend_rpc_buffer_type_context *)rpc_ctx->buft->context; + //close(rpc_ctx->sockfd); + delete buft_ctx; + delete rpc_ctx->buft; + delete rpc_ctx; + delete backend; +} + +GGML_CALL static ggml_backend_buffer_type_t ggml_backend_rpc_get_default_buffer_type(ggml_backend_t backend) { + ggml_backend_rpc_context * ctx = (ggml_backend_rpc_context *)backend->context; + return ctx->buft; +} + +GGML_CALL static void ggml_backend_rpc_synchronize(ggml_backend_t backend) { + UNUSED(backend); + // this is no-op because we don't have any async operations +} + +static void add_tensor(ggml_tensor * tensor, std::vector & tensors, std::unordered_set & visited) { + if (tensor == nullptr) { + return; + } + if (visited.find(tensor) != visited.end()) { + return; + } + visited.insert(tensor); + for (int i = 0; i < GGML_MAX_SRC; i++) { + add_tensor(tensor->src[i], tensors, visited); + } + add_tensor(tensor->view_src, tensors, visited); + tensors.push_back(serialize_tensor(tensor)); +} + +static void serialize_graph(const ggml_cgraph * cgraph, std::vector & output) { + uint32_t n_nodes = cgraph->n_nodes; + std::vector tensors; + std::unordered_set visited; + for (uint32_t i = 0; i < n_nodes; i++) { + add_tensor(cgraph->nodes[i], tensors, visited); + } + // serialization format: + // | n_nodes (4 bytes) | nodes (n_nodes * sizeof(uint64_t) | n_tensors (4 bytes) | tensors (n_tensors * sizeof(rpc_tensor)) | + uint32_t n_tensors = tensors.size(); + int output_size = sizeof(uint32_t) + n_nodes * sizeof(uint64_t) + sizeof(uint32_t) + n_tensors * sizeof(rpc_tensor); + output.resize(output_size, 0); + memcpy(output.data(), &n_nodes, sizeof(n_nodes)); + uint64_t * out_nodes = (uint64_t *)(output.data() + sizeof(n_nodes)); + for (uint32_t i = 0; i < n_nodes; i++) { + out_nodes[i] = reinterpret_cast(cgraph->nodes[i]); + } + uint32_t * out_ntensors = (uint32_t *)(output.data() + sizeof(n_nodes) + n_nodes * sizeof(uint64_t)); + *out_ntensors = n_tensors; + rpc_tensor * out_tensors = (rpc_tensor *)(output.data() + sizeof(n_nodes) + n_nodes * sizeof(uint64_t) + sizeof(uint32_t)); + memcpy(out_tensors, tensors.data(), n_tensors * sizeof(rpc_tensor)); +} + +GGML_CALL static enum ggml_status ggml_backend_rpc_graph_compute(ggml_backend_t backend, ggml_cgraph * cgraph) { + ggml_backend_rpc_context * rpc_ctx = (ggml_backend_rpc_context *)backend->context; + std::vector input; + serialize_graph(cgraph, input); + std::vector output; + bool status = send_rpc_cmd(rpc_ctx->sockfd, GRAPH_COMPUTE, input, output); + GGML_ASSERT(status); + GGML_ASSERT(output.size() == 1); + return (enum ggml_status)output[0]; +} + +GGML_CALL static bool ggml_backend_rpc_supports_op(ggml_backend_t backend, const ggml_tensor * op) { + UNUSED(backend); + UNUSED(op); + GGML_ASSERT(false && "not implemented"); + return false; +} + +static ggml_backend_i ggml_backend_rpc_interface = { + /* .get_name = */ ggml_backend_rpc_name, + /* .free = */ ggml_backend_rpc_free, + /* .get_default_buffer_type = */ ggml_backend_rpc_get_default_buffer_type, + /* .set_tensor_async = */ NULL, + /* .get_tensor_async = */ NULL, + /* .cpy_tensor_async = */ NULL, + /* .synchronize = */ ggml_backend_rpc_synchronize, + /* .graph_plan_create = */ NULL, + /* .graph_plan_free = */ NULL, + /* .graph_plan_compute = */ NULL, + /* .graph_compute = */ ggml_backend_rpc_graph_compute, + /* .supports_op = */ ggml_backend_rpc_supports_op, + /* .offload_op = */ NULL, + /* .event_new = */ NULL, + /* .event_free = */ NULL, + /* .event_record = */ NULL, + /* .event_wait = */ NULL, + /* .event_synchronize = */ NULL, +}; + +static std::vector endpoints; + +GGML_API GGML_CALL void ggml_rpc_init(const char * rpc_servers) { + endpoints.clear(); + GGML_ASSERT(rpc_servers != NULL); + std::string servers(rpc_servers); + size_t pos = 0; + while ((pos = servers.find(",")) != std::string::npos) { + std::string server = servers.substr(0, pos); + endpoints.push_back(server); + servers.erase(0, pos + 1); + } + endpoints.push_back(servers); +} + +static ggml_backend_t instances[GGML_RPC_MAX_SERVERS] = {0}; + +GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(int server_id) { + ggml_backend_rpc_init(server_id); + return ggml_backend_rpc_get_default_buffer_type(instances[server_id]); +} + +GGML_CALL ggml_backend_t ggml_backend_rpc_init(int server_id) { + if (server_id < 0 || server_id >= ggml_backend_rpc_get_server_count()) { + return nullptr; + } + if (instances[server_id]) { + return instances[server_id]; + } + std::string endpoint = endpoints[server_id]; + GGML_PRINT_DEBUG("Connecting to %s\n", endpoint.c_str()); + // split the endpoint into host and port + size_t pos = endpoint.find(":"); + std::string host = endpoint.substr(0, pos); + int port = std::stoi(endpoint.substr(pos + 1)); + int sockfd = socket_connect(host.c_str(), port); + GGML_ASSERT(sockfd >= 0 && "failed to connect to the server"); + + ggml_backend_rpc_buffer_type_context * buft_ctx = new ggml_backend_rpc_buffer_type_context { + /* .sockfd = */ sockfd, + /* .name = */ "RPC" + std::to_string(server_id) + }; + + ggml_backend_buffer_type_t buft = new ggml_backend_buffer_type { + /* .iface = */ ggml_backend_rpc_buffer_type_interface, + /* .context = */ buft_ctx + }; + + ggml_backend_rpc_context * ctx = new ggml_backend_rpc_context { + /* .endpoint = */ endpoint, + /* .name = */ "RPC", + /* .sockfd = */ sockfd, + /* .buft = */ buft + }; + + instances[server_id] = new ggml_backend { + /* .guid = */ ggml_backend_rpc_guid(), + /* .interface = */ ggml_backend_rpc_interface, + /* .context = */ ctx + }; + + return instances[server_id]; +} + +GGML_API GGML_CALL bool ggml_backend_is_rpc(ggml_backend_t backend) { + return backend != NULL && ggml_guid_matches(backend->guid, ggml_backend_rpc_guid()); +} + +GGML_API GGML_CALL int ggml_backend_rpc_get_server_count(void) { + return endpoints.size(); +} + +// RPC server-side implementation + +static void rpc_alloc_buffer(ggml_backend_t backend, const std::vector & input, std::vector & output) { + // input serialization format: | size (8 bytes) | + uint64_t size; + memcpy(&size, input.data(), sizeof(size)); + ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend); + ggml_backend_buffer_t buffer = ggml_backend_buft_alloc_buffer(buft, size); + uint64_t remote_ptr = reinterpret_cast(buffer); + uint64_t remote_size = buffer->size; + GGML_PRINT_DEBUG("[%s] size: %lu -> remote_ptr: %lx, remote_size: %lu\n", __func__, size, remote_ptr, remote_size); + // output serialization format: | remote_ptr (8 bytes) | remote_size (8 bytes) | + output.resize(2*sizeof(uint64_t), 0); + memcpy(output.data(), &remote_ptr, sizeof(remote_ptr)); + memcpy(output.data() + sizeof(uint64_t), &remote_size, sizeof(remote_size)); +} + +static void rpc_buffer_get_base(const std::vector & input, std::vector & output) { + // input serialization format: | remote_ptr (8 bytes) | + uint64_t remote_ptr; + memcpy(&remote_ptr, input.data(), sizeof(remote_ptr)); + GGML_PRINT_DEBUG("[%s] remote_ptr: %lx\n", __func__, remote_ptr); + ggml_backend_buffer_t buffer = reinterpret_cast(remote_ptr); + void * base = ggml_backend_buffer_get_base(buffer); + // output serialization format: | base_ptr (8 bytes) | + uint64_t base_ptr = reinterpret_cast(base); + output.resize(sizeof(uint64_t), 0); + memcpy(output.data(), &base_ptr, sizeof(base_ptr)); +} + +static void rpc_free_buffer(const std::vector & input) { + // input serialization format: | remote_ptr (8 bytes) | + uint64_t remote_ptr; + memcpy(&remote_ptr, input.data(), sizeof(remote_ptr)); + GGML_PRINT_DEBUG("[%s] remote_ptr: %lx\n", __func__, remote_ptr); + ggml_backend_buffer_t buffer = reinterpret_cast(remote_ptr); + ggml_backend_buffer_free(buffer); +} + +static void rpc_buffer_clear(const std::vector & input) { + // input serialization format: | remote_ptr (8 bytes) | value (1 byte) | + uint64_t remote_ptr; + memcpy(&remote_ptr, input.data(), sizeof(remote_ptr)); + uint8_t value; + memcpy(&value, input.data() + sizeof(uint64_t), sizeof(value)); + GGML_PRINT_DEBUG("[%s] remote_ptr: %lx, value: %u\n", __func__, remote_ptr, value); + ggml_backend_buffer_t buffer = reinterpret_cast(remote_ptr); + ggml_backend_buffer_clear(buffer, value); +} + +static void rpc_set_tensor(const std::vector & input) { + // serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes) | + const rpc_tensor * in_tensor = (const rpc_tensor *)input.data(); + uint64_t offset; + memcpy(&offset, input.data() + sizeof(rpc_tensor), sizeof(offset)); + size_t size = input.size() - sizeof(rpc_tensor) - sizeof(offset); + + struct ggml_init_params params { + /*.mem_size =*/ ggml_tensor_overhead(), + /*.mem_buffer =*/ NULL, + /*.no_alloc =*/ true, + }; + struct ggml_context * ctx = ggml_init(params); + ggml_tensor * tensor = deserialize_tensor(ctx, in_tensor); + GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %lu, size: %lu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size); + const void * data = input.data() + sizeof(rpc_tensor) + sizeof(offset); + ggml_backend_tensor_set(tensor, data, offset, size); + ggml_free(ctx); +} + +static void rpc_get_tensor(const std::vector & input, std::vector & output) { + // serialization format: | rpc_tensor | offset (8 bytes) | size (8 bytes) | + const rpc_tensor * in_tensor = (const rpc_tensor *)input.data(); + uint64_t offset; + memcpy(&offset, input.data() + sizeof(rpc_tensor), sizeof(offset)); + uint64_t size; + memcpy(&size, input.data() + sizeof(rpc_tensor) + sizeof(offset), sizeof(size)); + + struct ggml_init_params params { + /*.mem_size =*/ ggml_tensor_overhead(), + /*.mem_buffer =*/ NULL, + /*.no_alloc =*/ true, + }; + struct ggml_context * ctx = ggml_init(params); + ggml_tensor * tensor = deserialize_tensor(ctx, in_tensor); + GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %lu, size: %lu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size); + // output serialization format: | data (size bytes) | + output.resize(size, 0); + ggml_backend_tensor_get(tensor, output.data(), offset, size); + ggml_free(ctx); +} + +static void rpc_copy_tensor(const std::vector & input, std::vector & output) { + // serialization format: | rpc_tensor src | rpc_tensor dst | + const rpc_tensor * rpc_src = (const rpc_tensor *)input.data(); + const rpc_tensor * rpc_dst = (const rpc_tensor *)(input.data() + sizeof(rpc_src)); + + struct ggml_init_params params { + /*.mem_size =*/ 2*ggml_tensor_overhead(), + /*.mem_buffer =*/ NULL, + /*.no_alloc =*/ true, + }; + struct ggml_context * ctx = ggml_init(params); + ggml_tensor * src = deserialize_tensor(ctx, rpc_src); + ggml_tensor * dst = deserialize_tensor(ctx, rpc_dst); + GGML_PRINT_DEBUG("[%s] src->buffer: %p, dst->buffer: %p\n", __func__, (void*)src->buffer, (void*)dst->buffer); + bool result = ggml_backend_buffer_copy_tensor(src, dst); + // output serialization format: | result (1 byte) | + output.resize(1, 0); + output[0] = result; + ggml_free(ctx); +} + +static struct ggml_tensor * create_node(uint64_t id, + struct ggml_context * ctx, + const std::unordered_map & tensor_ptrs, + std::unordered_map & tensor_map) { + if (id == 0) { + return nullptr; + } + if (tensor_map.find(id) != tensor_map.end()) { + return tensor_map[id]; + } + const rpc_tensor * tensor = tensor_ptrs.at(id); + struct ggml_tensor * result = deserialize_tensor(ctx, tensor); + tensor_map[id] = result; + for (int i = 0; i < GGML_MAX_SRC; i++) { + result->src[i] = create_node(tensor->src[i], ctx, tensor_ptrs, tensor_map); + } + result->view_src = create_node(tensor->view_src, ctx, tensor_ptrs, tensor_map); + result->view_offs = tensor->view_offs; + return result; +} + +static void rpc_graph_compute(ggml_backend_t backend, const std::vector & input, std::vector & output) { + // serialization format: + // | n_nodes (4 bytes) | nodes (n_nodes * sizeof(uint64_t) | n_tensors (4 bytes) | tensors (n_tensors * sizeof(rpc_tensor)) | + uint32_t n_nodes; + memcpy(&n_nodes, input.data(), sizeof(n_nodes)); + const uint64_t * nodes = (const uint64_t *)(input.data() + sizeof(n_nodes)); + uint32_t n_tensors; + memcpy(&n_tensors, input.data() + sizeof(n_nodes) + n_nodes*sizeof(uint64_t), sizeof(n_tensors)); + const rpc_tensor * tensors = (const rpc_tensor *)(input.data() + sizeof(n_nodes) + n_nodes*sizeof(uint64_t) + sizeof(n_tensors)); + GGML_PRINT_DEBUG("[%s] n_nodes: %u, n_tensors: %u\n", __func__, n_nodes, n_tensors); + + static size_t buf_size = ggml_tensor_overhead()*(n_nodes + n_tensors) + ggml_graph_overhead(); + struct ggml_init_params params = { + /*.mem_size =*/ buf_size, + /*.mem_buffer =*/ NULL, + /*.no_alloc =*/ true, + }; + struct ggml_context * ctx = ggml_init(params); + struct ggml_cgraph * graph = ggml_new_graph_custom(ctx, n_nodes, false); + graph->n_nodes = n_nodes; + std::unordered_map tensor_ptrs; + for (uint32_t i = 0; i < n_tensors; i++) { + tensor_ptrs[tensors[i].id] = &tensors[i]; + } + std::unordered_map tensor_map; + for (uint32_t i = 0; i < n_nodes; i++) { + graph->nodes[i] = create_node(nodes[i], ctx, tensor_ptrs, tensor_map); + } + ggml_status status = ggml_backend_graph_compute(backend, graph); + // output serialization format: | status (1 byte) | + output.resize(1, 0); + output[0] = status; + ggml_free(ctx); +} + +void rpc_serve_client(ggml_backend_t backend, int sockfd) { + while (true) { + uint8_t cmd; + if (!recv_data(sockfd, &cmd, 1)) { + break; + } + std::vector input; + std::vector output; + uint64_t input_size; + if (!recv_data(sockfd, &input_size, sizeof(input_size))) { + break; + } + input.resize(input_size); + if (!recv_data(sockfd, input.data(), input_size)) { + break; + } + switch (cmd) { + case ALLOC_BUFFER: { + rpc_alloc_buffer(backend, input, output); + break; + } + case BUFFER_GET_BASE: { + rpc_buffer_get_base(input, output); + break; + } + case FREE_BUFFER: { + rpc_free_buffer(input); + break; + } + case BUFFER_CLEAR: { + rpc_buffer_clear(input); + break; + } + case SET_TENSOR: { + rpc_set_tensor(input); + break; + } + case GET_TENSOR: { + rpc_get_tensor(input, output); + break; + } + case COPY_TENSOR: { + rpc_copy_tensor(input, output); + break; + } + case GRAPH_COMPUTE: { + rpc_graph_compute(backend, input, output); + break; + } + default: { + fprintf(stderr, "Unknown command: %d\n", cmd); + break; + } + } + uint64_t output_size = output.size(); + if (!send_data(sockfd, &output_size, sizeof(output_size))) { + break; + } + if (!send_data(sockfd, output.data(), output_size)) { + break; + } + } +} diff --git a/ggml-rpc.h b/ggml-rpc.h new file mode 100644 index 000000000..5695a37c5 --- /dev/null +++ b/ggml-rpc.h @@ -0,0 +1,53 @@ +#pragma once + +#include "ggml.h" +#include "ggml-backend.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define GGML_RPC_MAX_SERVERS 16 + +struct rpc_tensor { + uint64_t id; + uint32_t type; + uint64_t buffer; + uint32_t ne[GGML_MAX_DIMS]; + uint32_t nb[GGML_MAX_DIMS]; + uint32_t op; + int32_t op_params[GGML_MAX_OP_PARAMS / sizeof(int32_t)]; + int32_t flags; + uint64_t src[GGML_MAX_SRC]; + uint64_t view_src; + uint64_t view_offs; + uint64_t data; + char name[GGML_MAX_NAME]; +}; + +enum rpc_cmd { + ALLOC_BUFFER = 0, + BUFFER_GET_BASE, + FREE_BUFFER, + BUFFER_CLEAR, + SET_TENSOR, + GET_TENSOR, + COPY_TENSOR, + GRAPH_COMPUTE, +}; + +GGML_API GGML_CALL void ggml_rpc_init(const char * rpc_servers); + +// backend API +GGML_API GGML_CALL ggml_backend_t ggml_backend_rpc_init(int server_id); +GGML_API GGML_CALL bool ggml_backend_is_rpc(ggml_backend_t backend); + +GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(int server_id); + +GGML_API GGML_CALL int ggml_backend_rpc_get_server_count(void); + +GGML_API GGML_CALL void rpc_serve_client(ggml_backend_t backend, int sockfd); + +#ifdef __cplusplus +} +#endif diff --git a/llama.cpp b/llama.cpp index ad35e4a2e..75101fd13 100644 --- a/llama.cpp +++ b/llama.cpp @@ -7,6 +7,10 @@ #include "ggml-alloc.h" #include "ggml-backend.h" +#ifdef GGML_USE_RPC +# include "ggml-rpc.h" +#endif + #ifdef GGML_USE_CUDA # include "ggml-cuda.h" #elif defined(GGML_USE_CLBLAST) @@ -1690,6 +1694,8 @@ static ggml_backend_buffer_type_t llama_default_buffer_type_offload(int gpu) { #ifdef GGML_USE_METAL buft = ggml_backend_metal_buffer_type(); +#elif defined(GGML_USE_RPC) + buft = ggml_backend_rpc_buffer_type(gpu); #elif defined(GGML_USE_CUDA) buft = ggml_backend_cuda_buffer_type(gpu); #elif defined(GGML_USE_VULKAN) @@ -1739,6 +1745,8 @@ static ggml_backend_buffer_type_t llama_default_buffer_type_split(int fallback_g static size_t llama_get_device_count() { #if defined(GGML_USE_CUDA) return ggml_backend_cuda_get_device_count(); +#elif defined(GGML_USE_RPC) + return ggml_backend_rpc_get_server_count(); #elif defined(GGML_USE_SYCL) return ggml_backend_sycl_get_device_count(); #elif defined(GGML_USE_VULKAN) @@ -1754,6 +1762,10 @@ static size_t llama_get_device_memory(int device) { size_t free; ggml_backend_cuda_get_device_memory(device, &free, &total); return free; +#elif defined(GGML_USE_RPC) + // TODO: implement + GGML_UNUSED(device); + return 1; #elif defined(GGML_USE_SYCL) size_t total; size_t free; @@ -15483,7 +15495,7 @@ bool llama_supports_mlock(void) { bool llama_supports_gpu_offload(void) { #if defined(GGML_USE_CUDA) || defined(GGML_USE_CLBLAST) || defined(GGML_USE_METAL) || defined(GGML_USE_VULKAN) || \ - defined(GGML_USE_SYCL) || defined(GGML_USE_KOMPUTE) + defined(GGML_USE_SYCL) || defined(GGML_USE_KOMPUTE) || defined(GGML_USE_RPC) // Defined when llama.cpp is compiled with support for offloading model layers to GPU. return true; #else @@ -15512,6 +15524,16 @@ void llama_numa_init(enum ggml_numa_strategy numa) { } } +void llama_rpc_init(const char * rpc_servers) { +#ifdef GGML_USE_RPC + ggml_rpc_init(rpc_servers); +#else + if (rpc_servers != nullptr) { + LLAMA_LOG_WARN("%s: RPC support is not enabled in this build\n", __func__); + } +#endif +} + void llama_backend_free(void) { #ifdef GGML_USE_MPI ggml_mpi_backend_free(); @@ -15703,6 +15725,16 @@ struct llama_context * llama_new_context_with_model( } ctx->backends.push_back(ctx->backend_metal); } +#elif defined(GGML_USE_RPC) + for (int server = 0; server < ggml_backend_rpc_get_server_count(); ++server) { + ggml_backend_t backend = ggml_backend_rpc_init(server); + if (backend == nullptr) { + LLAMA_LOG_ERROR("%s: failed to initialize RPC%d backend\n", __func__, server); + llama_free(ctx); + return nullptr; + } + ctx->backends.push_back(backend); + } #elif defined(GGML_USE_CUDA) if (model->split_mode == LLAMA_SPLIT_MODE_NONE || model->split_mode == LLAMA_SPLIT_MODE_ROW) { // with split_mode LLAMA_SPLIT_MODE_NONE or LLAMA_SPLIT_MODE_ROW, only the main GPU backend is used diff --git a/llama.h b/llama.h index 0b2e708d0..7b93ac202 100644 --- a/llama.h +++ b/llama.h @@ -383,6 +383,7 @@ extern "C" { //optional: LLAMA_API void llama_numa_init(enum ggml_numa_strategy numa); + LLAMA_API void llama_rpc_init(const char * rpc_servers); // Call once at the end of the program - currently only used for MPI LLAMA_API void llama_backend_free(void);