add monitor registry for rpc instance endpoint

This commit is contained in:
Zihao Chen 2024-07-15 15:51:23 -05:00
parent e3e86419ef
commit 7899cff811

View file

@ -12,10 +12,111 @@
#else
# include <unistd.h>
#endif
#include <ctime>
#include <string>
#include <stdio.h>
#include <sstream>
#include <iostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <map>
#include <fstream>
#include <chrono>
#include <iomanip>
class Registry {
public:
Registry(const std::string& server_ip, int port) : server_ip(server_ip), port(port) {
log("Registry initialized");
}
void add_field(const std::string& key, const std::string& value) {
payload[key] = value;
log("Added field: " + key + " = " + value);
}
bool register_with_central(const std::string& endpoint) {
payload["endpoint"] = endpoint;
payload["status"] = "unknown";
payload["last_checked"] = "unknown";
std::string json_payload = create_json_payload();
log("Attempting to register with payload: " + json_payload);
return send_request(json_payload);
}
private:
std::string server_ip;
int port;
std::map<std::string, std::string> payload;
std::string create_json_payload() const {
std::ostringstream json;
json << "{";
for (auto it = payload.begin(); it != payload.end(); ++it) {
json << "\"" << it->first << "\":\"" << it->second << "\"";
if (std::next(it) != payload.end()) {
json << ",";
}
}
json << "}";
return json.str();
}
std::string build_http_request(const std::string& data) const {
std::ostringstream request;
request << "POST /register HTTP/1.1\r\n";
request << "Host: " << server_ip << "\r\n";
request << "Content-Type: application/json\r\n";
request << "Content-Length: " << data.length() << "\r\n";
request << "Connection: close\r\n\r\n";
request << data;
return request.str();
}
bool send_request(const std::string& data) const {
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
log("Socket creation error");
return false;
}
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
if (inet_pton(AF_INET, server_ip.c_str(), &serv_addr.sin_addr) <= 0) {
log("Invalid address / Address not supported");
return false;
}
if (connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
log("Connection failed");
return false;
}
std::string request = build_http_request(data);
send(sock, request.c_str(), request.length(), 0);
log("Sent request: " + request);
char buffer[1024] = {0};
read(sock, buffer, 1024);
close(sock);
log("Received response: " + std::string(buffer));
return true;
}
void log(const std::string& message) const {
std::ofstream log_file("registry_log.txt", std::ios_base::app);
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
log_file << std::put_time(std::localtime(&now), "%Y-%m-%d %H:%M:%S") << " - " << message << std::endl;
}
};
void print_colored(const std::string& text, const std::string& color_code) {
std::cout << "\033[" << color_code << "m" << text << "\033[0m" << std::endl;
}
struct rpc_server_params {
std::string host = "0.0.0.0";
@ -128,6 +229,8 @@ static void get_backend_memory(size_t * free_mem, size_t * total_mem) {
int main(int argc, char * argv[]) {
rpc_server_params params;
Registry registry("0.0.0.0", 5000);
if (!rpc_server_params_parse(argc, argv, params)) {
fprintf(stderr, "Invalid parameters\n");
return 1;
@ -138,6 +241,7 @@ int main(int argc, char * argv[]) {
return 1;
}
std::string endpoint = params.host + ":" + std::to_string(params.port);
size_t free_mem, total_mem;
if (params.backend_mem > 0) {
free_mem = params.backend_mem;
@ -146,6 +250,11 @@ int main(int argc, char * argv[]) {
get_backend_memory(&free_mem, &total_mem);
}
printf("\nStarting Antigma node on %s, backend memory: %zu MB\n", endpoint.c_str(), free_mem / (1024 * 1024));
if (registry.register_with_central(endpoint)) {
print_colored("Registered successfully", "32");
} else {
print_colored("Registered successfully", "31");
}
start_rpc_server(backend, endpoint.c_str(), free_mem, total_mem);
ggml_backend_free(backend);
return 0;