From 7899cff8117a4ac633f0c378dfda4bc7589631bb Mon Sep 17 00:00:00 2001 From: Zihao Chen Date: Mon, 15 Jul 2024 15:51:23 -0500 Subject: [PATCH] add monitor registry for rpc instance endpoint --- examples/rpc/rpc-server.cpp | 109 ++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/examples/rpc/rpc-server.cpp b/examples/rpc/rpc-server.cpp index 243f7546f..f75066636 100644 --- a/examples/rpc/rpc-server.cpp +++ b/examples/rpc/rpc-server.cpp @@ -12,10 +12,111 @@ #else # include #endif +#include #include #include +#include #include +#include +#include +#include +#include +#include +#include +#include +class Registry { +public: + Registry(const std::string& server_ip, int port) : server_ip(server_ip), port(port) { + log("Registry initialized"); + } + + void add_field(const std::string& key, const std::string& value) { + payload[key] = value; + log("Added field: " + key + " = " + value); + } + + bool register_with_central(const std::string& endpoint) { + payload["endpoint"] = endpoint; + payload["status"] = "unknown"; + payload["last_checked"] = "unknown"; + std::string json_payload = create_json_payload(); + log("Attempting to register with payload: " + json_payload); + return send_request(json_payload); + } + +private: + std::string server_ip; + int port; + std::map payload; + + std::string create_json_payload() const { + std::ostringstream json; + json << "{"; + for (auto it = payload.begin(); it != payload.end(); ++it) { + json << "\"" << it->first << "\":\"" << it->second << "\""; + if (std::next(it) != payload.end()) { + json << ","; + } + } + json << "}"; + return json.str(); + } + + std::string build_http_request(const std::string& data) const { + std::ostringstream request; + request << "POST /register HTTP/1.1\r\n"; + request << "Host: " << server_ip << "\r\n"; + request << "Content-Type: application/json\r\n"; + request << "Content-Length: " << data.length() << "\r\n"; + request << "Connection: close\r\n\r\n"; + request << data; + return request.str(); + } + + bool send_request(const std::string& data) const { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + log("Socket creation error"); + return false; + } + + struct sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(port); + + if (inet_pton(AF_INET, server_ip.c_str(), &serv_addr.sin_addr) <= 0) { + log("Invalid address / Address not supported"); + return false; + } + + if (connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) { + log("Connection failed"); + return false; + } + + std::string request = build_http_request(data); + send(sock, request.c_str(), request.length(), 0); + log("Sent request: " + request); + + char buffer[1024] = {0}; + read(sock, buffer, 1024); + close(sock); + + log("Received response: " + std::string(buffer)); + return true; + } + + void log(const std::string& message) const { + std::ofstream log_file("registry_log.txt", std::ios_base::app); + auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + log_file << std::put_time(std::localtime(&now), "%Y-%m-%d %H:%M:%S") << " - " << message << std::endl; + } +}; + +void print_colored(const std::string& text, const std::string& color_code) { + std::cout << "\033[" << color_code << "m" << text << "\033[0m" << std::endl; +} struct rpc_server_params { std::string host = "0.0.0.0"; @@ -128,6 +229,8 @@ static void get_backend_memory(size_t * free_mem, size_t * total_mem) { int main(int argc, char * argv[]) { rpc_server_params params; + Registry registry("0.0.0.0", 5000); + if (!rpc_server_params_parse(argc, argv, params)) { fprintf(stderr, "Invalid parameters\n"); return 1; @@ -138,6 +241,7 @@ int main(int argc, char * argv[]) { return 1; } std::string endpoint = params.host + ":" + std::to_string(params.port); + size_t free_mem, total_mem; if (params.backend_mem > 0) { free_mem = params.backend_mem; @@ -146,6 +250,11 @@ int main(int argc, char * argv[]) { get_backend_memory(&free_mem, &total_mem); } printf("\nStarting Antigma node on %s, backend memory: %zu MB\n", endpoint.c_str(), free_mem / (1024 * 1024)); + if (registry.register_with_central(endpoint)) { + print_colored("Registered successfully", "32"); + } else { + print_colored("Registered successfully", "31"); + } start_rpc_server(backend, endpoint.c_str(), free_mem, total_mem); ggml_backend_free(backend); return 0;