diff --git a/conmon/conmon.c b/conmon/conmon.c
index e391c194..960650d9 100644
--- a/conmon/conmon.c
+++ b/conmon/conmon.c
@@ -25,6 +25,8 @@
 
 #include "cmsg.h"
 
+#include "pollset.h"
+
 #define pexit(fmt, ...) \
 	do { \
 		fprintf(stderr, "[conmon:e]: " fmt " %m\n", ##__VA_ARGS__); \
@@ -432,6 +434,227 @@ static char *escape_json_string(const char *str)
 
 	return g_string_free (escaped, FALSE);
 }
+/* Global state */
+
+static int masterfd_stdout = -1;
+static int masterfd_stderr = -1;
+static int num_stdio_fds = 0;
+
+/* Used for attach */
+static int conn_sock = -1;
+
+static int logfd = -1;
+static int efd = -1;
+static int cfd = -1;
+/* Used for OOM notification API */
+static int ofd = -1;
+
+
+/*
+ * Read callback for the container's stdout/stderr fds: reads one chunk,
+ * appends it to the log via write_k8s_log() and, when an attach client is
+ * connected (conn_sock > 0), forwards the same bytes to that client.
+ * user_data carries the stdpipe_t of the fd, packed with GINT_TO_POINTER.
+ */
+static void stdio_input (G_GNUC_UNUSED polling_set_t *set, int fd, gpointer user_data)
+{
+	char buf[BUF_SIZE];
+	stdpipe_t pipe = GPOINTER_TO_INT(user_data);
+	ssize_t num_read;
+
+	num_read = read(fd, buf, BUF_SIZE);
+	if (num_read <= 0) {
+		nwarn("stdio_input read failed");
+		return;
+	}
+
+	if (write_k8s_log(logfd, pipe, buf, num_read) < 0) {
+		nwarn("write_k8s_log failed");
+		return;
+	}
+
+	if (conn_sock > 0) {
+		if (write_all(conn_sock, buf, num_read) < 0) {
+			nwarn("Failed to write to socket");
+		}
+	}
+}
+
+/*
+ * Error/hangup callback for the stdio fds: forgets the fd and decrements
+ * num_stdio_fds, the counter the polling loop in main() runs on.
+ * NOTE(review): returning true presumably tells the polling set to remove
+ * the fd (compare ctl_error) - confirm against pollset.h.
+ */
+static bool stdio_error (G_GNUC_UNUSED polling_set_t *set, int fd, gpointer user_data)
+{
+	stdpipe_t pipe = GPOINTER_TO_INT(user_data);
+	printf("closing stdio fd %d\n", fd);
+	if (pipe == STDOUT_PIPE)
+		masterfd_stdout = -1;
+	if (pipe == STDERR_PIPE)
+		masterfd_stderr = -1;
+	num_stdio_fds--;
+	return true;
+}
+
+/*
+ * Read callback for the OOM eventfd: consumes the event counter and
+ * creates (if missing) an "oom" marker file via a relative path.
+ */
+static void oom_input (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	uint64_t oom_event;
+	int oom_fd;
+
+	if (read(fd, &oom_event, sizeof(uint64_t)) != sizeof(uint64_t))
+		nwarn("Failed to read event from eventfd");
+	ninfo("OOM received");
+	oom_fd = open("oom", O_CREAT, 0666);
+	if (oom_fd < 0) {
+		nwarn("Failed to write oom file");
+	} else {
+		/* Only the file's existence matters; don't leak an fd per event */
+		close(oom_fd);
+	}
+}
+
+/*
+ * Read callback for an attached client connection: bytes received from the
+ * client are written to masterfd_stdout when a terminal is in use and are
+ * otherwise read and dropped.
+ */
+static void conn_sock_input (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	ssize_t num_read;
+	char buf[BUF_SIZE];
+
+	num_read = read(fd, buf, BUF_SIZE);
+	if (num_read <= 0)
+		return;
+	ninfo("got data on connection: %zd", num_read);
+	if (terminal) {
+		if (write_all(masterfd_stdout, buf, num_read) < 0) {
+			nwarn("Failed to write to master pty");
+		}
+	}
+}
+
+/*
+ * Error/hangup callback for an attached client connection.
+ * conn_sock is only cleared when the failing fd is still the current client;
+ * attach_input() may already have replaced it with a newer connection.
+ */
+static bool conn_sock_error (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	if (fd == conn_sock)
+		conn_sock = -1;
+	return true;
+}
+
+/*
+ * Read callback for the listening attach socket: accepts the pending
+ * client, stores it in conn_sock and registers it with the polling set.
+ */
+static void attach_input (polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	conn_sock = accept(fd, NULL, NULL);
+	if (conn_sock == -1) {
+		nwarn("Failed to accept client connection on attach socket");
+		return;
+	}
+	if (polling_set_add_fd(set, conn_sock, conn_sock_input, conn_sock_error, NULL) < 0)
+		pexit("Failed to add client socket fd to epoll");
+	ninfo("Accepted connection");
+}
+
+/*
+ * Read callback for the control fifo.
+ *
+ * Messages are newline-terminated lines of the form "<type> <height> <width>".
+ * Input is accumulated in a static buffer across calls, so a message split
+ * over several reads is parsed once its newline arrives. Every complete,
+ * well-formed message resizes the pty behind masterfd_stdout.
+ */
+static void ctl_input (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	#define CTLBUFSZ 200
+	static char ctlbuf[CTLBUFSZ];
+	static int readsz = CTLBUFSZ - 1;
+	static char *readptr = ctlbuf;
+	ssize_t num_read;
+	int ctl_msg_type = -1;
+	int height = -1;
+	int width = -1;
+	struct winsize ws;
+	int ret;
+
+	num_read = read(fd, readptr, readsz);
+	if (num_read <= 0) {
+		nwarn("Failed to read from control fd");
+		return;
+	}
+	readptr[num_read] = '\0';
+	ninfo("Got ctl message: %s\n", ctlbuf);
+
+	char *beg = ctlbuf;
+	char *newline = strchrnul(beg, '\n');
+	/* Process each message which ends with a newline */
+	while (*newline != '\0') {
+		/* Terminate the line so sscanf cannot run into the next message */
+		*newline = '\0';
+		/* Parse the current message, not the start of the buffer */
+		ret = sscanf(beg, "%d %d %d\n", &ctl_msg_type, &height, &width);
+		if (ret != 3) {
+			/* Skip the malformed line; the loop must still advance past it */
+			nwarn("Failed to sscanf message");
+		} else {
+			ninfo("Message type: %d, Height: %d, Width: %d", ctl_msg_type, height, width);
+			ret = ioctl(masterfd_stdout, TIOCGWINSZ, &ws);
+			ninfo("Existing size: %d %d", ws.ws_row, ws.ws_col);
+			ws.ws_row = height;
+			ws.ws_col = width;
+			ret = ioctl(masterfd_stdout, TIOCSWINSZ, &ws);
+			if (ret == -1) {
+				nwarn("Failed to set process pty terminal size");
+			}
+		}
+		beg = newline + 1;
+		newline = strchrnul(beg, '\n');
+	}
+	if (*beg == '\0') {
+		/* We exhausted all messages that were complete */
+		readptr = ctlbuf;
+		readsz = CTLBUFSZ - 1;
+	} else {
+		/*
+		 * We copy remaining data to beginning of buffer
+		 * and advance readptr after that.
+		 */
+		int cp_rem = 0;
+		do {
+			ctlbuf[cp_rem++] = *beg++;
+		} while (*beg != '\0');
+		readptr = ctlbuf + cp_rem;
+		readsz = CTLBUFSZ - 1 - cp_rem;
+		if (readsz == 0) {
+			/* A full buffer with no newline can never complete; drop it */
+			nwarn("Discarding oversized control message");
+			readptr = ctlbuf;
+			readsz = CTLBUFSZ - 1;
+		}
+	}
+}
+
+/*
+ * Error/hangup callback for the control fifo: logs that the remote writer
+ * closed and returns false so the fd is not removed from the polling set.
+ */
+static bool ctl_error (G_GNUC_UNUSED polling_set_t *set, G_GNUC_UNUSED int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	ninfo("Remote writer to control fd closed");
+	return false; /* Don't remove */
+}
 
 int main(int argc, char *argv[])
 {
@@ -445,10 +668,6 @@ int main(int argc, char *argv[])
 	int cpid = -1;
 	int status;
 	pid_t pid, create_pid;
-	_cleanup_close_ int logfd = -1;
-	_cleanup_close_ int masterfd_stdout = -1;
-	_cleanup_close_ int masterfd_stderr = -1;
-	_cleanup_close_ int epfd = -1;
 	_cleanup_close_ int csfd = -1;
 	/* Used for !terminal cases. */
 	int slavefd_stdout = -1;
@@ -456,26 +675,17 @@ int main(int argc, char *argv[])
 	char csname[PATH_MAX] = "/tmp/conmon-term.XXXXXXXX";
 	char buf[BUF_SIZE];
 	int num_read;
-	struct epoll_event ev;
-	struct epoll_event evlist[MAX_EVENTS];
+	_cleanup_pollset_ polling_set_t pollset = POLLING_SET_INIT;
 	int sync_pipe_fd = -1;
 	char *sync_pipe, *endptr;
 	int len;
-	int num_stdio_fds = 0;
 	GError *error = NULL;
 	GOptionContext *context;
 	GPtrArray *runtime_argv = NULL;
 
-	/* Used for OOM notification API */
-	_cleanup_close_ int efd = -1;
-	_cleanup_close_ int cfd = -1;
-	_cleanup_close_ int ofd = -1;
 	_cleanup_free_ char *memory_cgroup_path = NULL;
 	int wb;
-	uint64_t oom_event;
 
-	/* Used for attach */
-	_cleanup_close_ int conn_sock = -1;
 
 	/* Command line parameters */
 	context = g_option_context_new("- conmon utility");
@@ -796,15 +1006,6 @@ int main(int argc, char *argv[])
 	/* Setup fifo for reading in terminal resize and other stdio control messages */
 	_cleanup_close_ int ctlfd = -1;
 	_cleanup_close_ int dummyfd = -1;
-	int ctl_msg_type = -1;
-	int height = -1;
-	int width = -1;
-	struct winsize ws;
-	#define CTLBUFSZ 200
-	char ctlbuf[CTLBUFSZ];
-	char *readptr = ctlbuf;
-	int readsz = CTLBUFSZ - 1;
-	int cp_rem = 0;
 	if (!exec) {
 		snprintf(ctl_fifo_path, PATH_MAX, "%s/ctl", bundle_path);
 		ninfo("ctl fifo path: %s", ctl_fifo_path);
@@ -862,174 +1063,43 @@ int main(int argc, char *argv[])
 			pexit("Failed to write to cgroup.event_control");
 	}
 
-	/* Create epoll_ctl so that we can handle read/write events. */
-	/*
-	 * TODO: Switch to libuv so that we can also implement exec as well as
-	 * attach and other important things. Using epoll directly is just
-	 * really nasty.
-	 */
-	epfd = epoll_create1(EPOLL_CLOEXEC);
-	if (epfd < 0)
-		pexit("epoll_create");
-	ev.events = EPOLLIN;
+	if (polling_set_init(&pollset) < 0)
+		pexit("polling_set_init");
+
 	if (masterfd_stdout >= 0) {
-		ev.data.fd = masterfd_stdout;
-		if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		if (polling_set_add_fd(&pollset, masterfd_stdout, stdio_input, stdio_error, GINT_TO_POINTER(STDOUT_PIPE)) < 0)
 			pexit("Failed to add console masterfd_stdout to epoll");
 		num_stdio_fds++;
 	}
 	if (masterfd_stderr >= 0) {
-		ev.data.fd = masterfd_stderr;
-		if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		if (polling_set_add_fd(&pollset, masterfd_stderr, stdio_input, stdio_error, GINT_TO_POINTER(STDERR_PIPE)) < 0)
 			pexit("Failed to add console masterfd_stderr to epoll");
 		num_stdio_fds++;
 	}
 
 	/* Add the OOM event fd to epoll */
 	if (oom_handling_enabled) {
-		ev.data.fd = efd;
-		if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		if (polling_set_add_fd(&pollset, efd, oom_input, NULL, NULL) < 0)
 			pexit("Failed to add OOM eventfd to epoll");
 	}
 
 	/* Add the attach socket to epoll */
 	if (afd > 0) {
-		ev.data.fd = afd;
-		if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		if (polling_set_add_fd(&pollset, afd, attach_input, NULL, NULL) < 0)
 			pexit("Failed to add attach socket fd to epoll");
 	}
 
 	/* Add control fifo fd to epoll */
 	if (ctlfd > 0) {
-		ev.data.fd = ctlfd;
-		if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		if (polling_set_add_fd(&pollset, ctlfd, ctl_input, ctl_error, NULL) < 0)
 			pexit("Failed to add control fifo fd to epoll");
 	}
 
 	/* Log all of the container's output. */
 	while (num_stdio_fds > 0) {
-		int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
-		if (ready < 0)
-			continue;
-
-		for (int i = 0; i < ready; i++) {
-			if (evlist[i].events & EPOLLIN) {
-				int masterfd = evlist[i].data.fd;
-				stdpipe_t pipe = NO_PIPE;
-				if (masterfd == masterfd_stdout)
-					pipe = STDOUT_PIPE;
-				else if (masterfd == masterfd_stderr)
-					pipe = STDERR_PIPE;
-				else if (oom_handling_enabled && masterfd == efd) {
-					if (read(efd, &oom_event, sizeof(uint64_t)) != sizeof(uint64_t))
-						nwarn("Failed to read event from eventfd");
-					ninfo("OOM received");
-					if (open("oom", O_CREAT, 0666) < 0) {
-						nwarn("Failed to write oom file");
-					}
-				} else if (evlist[i].data.fd == afd) {
-					conn_sock = accept(afd, NULL, NULL);
-					if (conn_sock == -1) {
-						nwarn("Failed to accept client connection on attach socket");
-						continue;
-					}
-					ev.events = EPOLLIN;
-					ev.data.fd = conn_sock;
-					if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) {
-						pexit("Failed to add client socket fd to epoll");
-					}
-					ninfo("Accepted connection");
-				} else if (!exec && evlist[i].data.fd == ctlfd) {
-					num_read = read(ctlfd, readptr, readsz);
-					if (num_read <= 0) {
-						nwarn("Failed to read from control fd");
-						continue;
-					}
-					ctlbuf[num_read] = '\0';
-					ninfo("Got ctl message: %s\n", ctlbuf);
-
-					char *beg = ctlbuf;
-					char *newline = strchrnul(beg, '\n');
-					/* Process each message which ends with a line */
-					while (*newline != '\0') {
-						ret = sscanf(ctlbuf, "%d %d %d\n", &ctl_msg_type, &height, &width);
-						if (ret != 3) {
-							nwarn("Failed to sscanf message");
-							continue;
-						}
-						ninfo("Message type: %d, Height: %d, Width: %d", ctl_msg_type, height, width);
-						ret = ioctl(masterfd_stdout, TIOCGWINSZ, &ws);
-						ninfo("Existing size: %d %d", ws.ws_row, ws.ws_col);
-						ws.ws_row = height;
-						ws.ws_col = width;
-						ret = ioctl(masterfd_stdout, TIOCSWINSZ, &ws);
-						if (ret == -1) {
-							nwarn("Failed to set process pty terminal size");
-						}
-						beg = newline + 1;
-						newline = strchrnul(beg, '\n');
-					}
-					if (*beg == '\0') {
-						/* We exhausted all messages that were complete */
-						readptr = ctlbuf;
-						readsz = CTLBUFSZ - 1;
-					} else {
-						/*
-						 * We copy remaining data to beginning of buffer
-						 * and advance readptr after that.
-						 */
-						cp_rem = 0;
-						do {
-							ctlbuf[cp_rem++] = *beg++;
-						} while (*beg != '\0');
-						readptr = ctlbuf + cp_rem;
-						readsz = CTLBUFSZ - 1 - cp_rem;
-					}
-				} else {
-					num_read = read(masterfd, buf, BUF_SIZE);
-					if (num_read <= 0)
-						goto out;
-					ninfo("got data on connection: %d", num_read);
-					if (terminal) {
-						if (write_all(masterfd_stdout, buf, num_read) < 0) {
-							nwarn("Failed to write to master pty");
-						}
-					}
-				}
-
-				if (masterfd == masterfd_stdout || masterfd == masterfd_stderr) {
-					num_read = read(masterfd, buf, BUF_SIZE);
-					if (num_read <= 0)
-						goto out;
-
-					if (write_k8s_log(logfd, pipe, buf, num_read) < 0) {
-						nwarn("write_k8s_log failed");
-						goto out;
-					}
-
-					if (conn_sock > 0) {
-						if (write_all(conn_sock, buf, num_read) < 0) {
-							nwarn("Failed to write to socket");
-						}
-					}
-				}
-			} else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) {
-				if (!exec && evlist[i].data.fd == ctlfd) {
-					ninfo("Remote writer to control fd closed");
-					continue;
-				}
-				printf("closing fd %d\n", evlist[i].data.fd);
-				if (close(evlist[i].data.fd) < 0)
-					pexit("close");
-				if (!exec && evlist[i].data.fd == conn_sock) {
-					conn_sock = -1;
-				}
-				num_stdio_fds--;
-			}
-		}
+		polling_set_iterate(&pollset);
 	}
 
-out:
 	/* Wait for the container process and record its exit code */
 	while ((pid = waitpid(-1, &status, 0)) > 0) {
 		int exit_status = WEXITSTATUS(status);