Merge pull request #571 from alexlarsson/conmon-glib-mainloop

conmon: Use glib mainloop instead of epoll
This commit is contained in:
Antonio Murdaca 2017-06-09 12:03:56 +02:00 committed by GitHub
commit 952ae0db1c

View file

@ -7,7 +7,6 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/epoll.h>
#include <sys/prctl.h> #include <sys/prctl.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
@ -22,6 +21,7 @@
#include <unistd.h> #include <unistd.h>
#include <glib.h> #include <glib.h>
#include <glib-unix.h>
#include "cmsg.h" #include "cmsg.h"
@ -435,9 +435,249 @@ static char *escape_json_string(const char *str)
} }
/* Global state */
static int runtime_status = -1;
static int masterfd_stdout = -1;
static int masterfd_stderr = -1;
static int num_stdio_fds = 0;
/* Used for attach */
static int conn_sock = -1;
static int logfd = -1;
static int oom_efd = -1;
static int afd = -1;
static int cfd = -1;
/* Used for OOM notification API */
static int ofd = -1;
static GMainLoop *main_loop = NULL;
static gboolean stdio_cb(int fd, GIOCondition condition, gpointer user_data)
{
char buf[BUF_SIZE];
stdpipe_t pipe = GPOINTER_TO_INT(user_data);
ssize_t num_read = 0;
if ((condition & G_IO_IN) != 0) {
num_read = read(fd, buf, BUF_SIZE);
if (num_read < 0) {
nwarn("stdio_input read failed %s", strerror(errno));
return G_SOURCE_CONTINUE;
}
if (num_read > 0) {
if (write_k8s_log(logfd, pipe, buf, num_read) < 0) {
nwarn("write_k8s_log failed");
return G_SOURCE_CONTINUE;
}
if (conn_sock > 0 && write_all(conn_sock, buf, num_read) < 0) {
nwarn("Failed to write to socket");
}
return G_SOURCE_CONTINUE;
}
}
/* End of input */
if (pipe == STDOUT_PIPE)
masterfd_stdout = -1;
if (pipe == STDERR_PIPE)
masterfd_stderr = -1;
num_stdio_fds--;
if (num_stdio_fds == 0) {
ninfo ("No more stdio, killing main loop");
g_main_loop_quit (main_loop);
}
close (fd);
return G_SOURCE_REMOVE;
}
static gboolean oom_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
uint64_t oom_event;
ssize_t num_read = 0;
if ((condition & G_IO_IN) != 0) {
num_read = read(fd, &oom_event, sizeof(uint64_t));
if (num_read < 0) {
nwarn("Failed to read oom event from eventfd");
return G_SOURCE_CONTINUE;
}
if (num_read > 0) {
if (num_read != sizeof(uint64_t))
nwarn("Failed to read full oom event from eventfd");
ninfo("OOM received");
if (open("oom", O_CREAT, 0666) < 0) {
nwarn("Failed to write oom file");
}
return G_SOURCE_CONTINUE;
}
}
/* End of input */
close (fd);
oom_efd = -1;
return G_SOURCE_REMOVE;
}
static gboolean conn_sock_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
char buf[BUF_SIZE];
ssize_t num_read = 0;
if ((condition & G_IO_IN) != 0) {
num_read = read(fd, buf, BUF_SIZE);
if (num_read < 0)
return G_SOURCE_CONTINUE;
if (num_read > 0) {
ninfo("got data on connection: %zd", num_read);
if (terminal && write_all(masterfd_stdout, buf, num_read) < 0) {
nwarn("Failed to write to master pty");
}
return G_SOURCE_CONTINUE;
}
}
/* End of input */
close (fd);
conn_sock = -1;
return G_SOURCE_REMOVE;
}
static gboolean attach_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
conn_sock = accept(fd, NULL, NULL);
if (conn_sock == -1) {
if (errno != EWOULDBLOCK)
nwarn("Failed to accept client connection on attach socket");
} else {
g_unix_fd_add (conn_sock, G_IO_IN, conn_sock_cb, GINT_TO_POINTER(STDOUT_PIPE));
ninfo("Accepted connection %d", conn_sock);
}
return G_SOURCE_CONTINUE;
}
static gboolean ctrl_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
#define CTLBUFSZ 200
static char ctlbuf[CTLBUFSZ];
static int readsz = CTLBUFSZ - 1;
static char *readptr = ctlbuf;
ssize_t num_read = 0;
int ctl_msg_type = -1;
int height = -1;
int width = -1;
struct winsize ws;
int ret;
num_read = read(fd, readptr, readsz);
if (num_read <= 0) {
nwarn("Failed to read from control fd");
return G_SOURCE_CONTINUE;
}
readptr[num_read] = '\0';
ninfo("Got ctl message: %s\n", ctlbuf);
char *beg = ctlbuf;
char *newline = strchrnul(beg, '\n');
/* Process each message which ends with a line */
while (*newline != '\0') {
ret = sscanf(ctlbuf, "%d %d %d\n", &ctl_msg_type, &height, &width);
if (ret != 3) {
nwarn("Failed to sscanf message");
return G_SOURCE_CONTINUE;
}
ninfo("Message type: %d, Height: %d, Width: %d", ctl_msg_type, height, width);
ret = ioctl(masterfd_stdout, TIOCGWINSZ, &ws);
ninfo("Existing size: %d %d", ws.ws_row, ws.ws_col);
ws.ws_row = height;
ws.ws_col = width;
ret = ioctl(masterfd_stdout, TIOCSWINSZ, &ws);
if (ret == -1) {
nwarn("Failed to set process pty terminal size");
}
beg = newline + 1;
newline = strchrnul(beg, '\n');
}
if (num_read == (CTLBUFSZ - 1) && beg == ctlbuf) {
/*
* We did not find a newline in the entire buffer.
* This shouldn't happen as our buffer is larger than
* the message that we expect to receive.
*/
nwarn("Could not find newline in entire buffer\n");
} else if (*beg == '\0') {
/* We exhausted all messages that were complete */
readptr = ctlbuf;
readsz = CTLBUFSZ - 1;
} else {
/*
* We copy remaining data to beginning of buffer
* and advance readptr after that.
*/
int cp_rem = 0;
do {
ctlbuf[cp_rem++] = *beg++;
} while (*beg != '\0');
readptr = ctlbuf + cp_rem;
readsz = CTLBUFSZ - 1 - cp_rem;
}
return G_SOURCE_CONTINUE;
}
static gboolean terminal_accept_cb(int fd, G_GNUC_UNUSED GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
const char *csname = user_data;
struct file_t console;
int connfd = -1;
ninfo("about to accept from csfd: %d", fd);
connfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
if (connfd < 0)
pexit("Failed to accept console-socket connection");
/* Not accepting anything else. */
close(fd);
unlink(csname);
/* We exit if this fails. */
ninfo("about to recvfd from connfd: %d", connfd);
console = recvfd(connfd);
ninfo("console = {.name = '%s'; .fd = %d}", console.name, console.fd);
free(console.name);
/* We only have a single fd for both pipes, so we just treat it as
* stdout. stderr is ignored. */
masterfd_stdout = console.fd;
masterfd_stderr = -1;
/* Clean up everything */
close(connfd);
return G_SOURCE_CONTINUE;
}
static void
runtime_exit_cb (GPid pid, int status, G_GNUC_UNUSED gpointer user_data)
{
runtime_status = status;
g_main_loop_quit (main_loop);
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
int ret, runtime_status; int ret;
char cwd[PATH_MAX]; char cwd[PATH_MAX];
char default_pid_file[PATH_MAX]; char default_pid_file[PATH_MAX];
char attach_sock_path[PATH_MAX]; char attach_sock_path[PATH_MAX];
@ -447,9 +687,6 @@ int main(int argc, char *argv[])
int cpid = -1; int cpid = -1;
int status; int status;
pid_t pid, create_pid; pid_t pid, create_pid;
_cleanup_close_ int logfd = -1;
_cleanup_close_ int masterfd_stdout = -1;
_cleanup_close_ int masterfd_stderr = -1;
_cleanup_close_ int epfd = -1; _cleanup_close_ int epfd = -1;
_cleanup_close_ int csfd = -1; _cleanup_close_ int csfd = -1;
/* Used for !terminal cases. */ /* Used for !terminal cases. */
@ -458,26 +695,17 @@ int main(int argc, char *argv[])
char csname[PATH_MAX] = "/tmp/conmon-term.XXXXXXXX"; char csname[PATH_MAX] = "/tmp/conmon-term.XXXXXXXX";
char buf[BUF_SIZE]; char buf[BUF_SIZE];
int num_read; int num_read;
struct epoll_event ev;
struct epoll_event evlist[MAX_EVENTS];
int sync_pipe_fd = -1; int sync_pipe_fd = -1;
char *sync_pipe, *endptr; char *sync_pipe, *endptr;
int len; int len;
int num_stdio_fds = 0;
GError *error = NULL; GError *error = NULL;
GOptionContext *context; GOptionContext *context;
GPtrArray *runtime_argv = NULL; GPtrArray *runtime_argv = NULL;
/* Used for OOM notification API */
_cleanup_close_ int efd = -1;
_cleanup_close_ int cfd = -1;
_cleanup_close_ int ofd = -1;
_cleanup_free_ char *memory_cgroup_path = NULL; _cleanup_free_ char *memory_cgroup_path = NULL;
int wb; int wb;
uint64_t oom_event;
/* Used for attach */ main_loop = g_main_loop_new (NULL, FALSE);
_cleanup_close_ int conn_sock = -1;
/* Command line parameters */ /* Command line parameters */
context = g_option_context_new("- conmon utility"); context = g_option_context_new("- conmon utility");
@ -668,50 +896,22 @@ int main(int argc, char *argv[])
close(slavefd_stdout); close(slavefd_stdout);
close(slavefd_stderr); close(slavefd_stderr);
/* Get the console fd. */
/*
* FIXME: If runc fails to start a container, we won't bail because we're
* busy waiting for requests. The solution probably involves
* epoll(2) and a signalfd(2). This causes a lot of issues.
*/
if (terminal) {
struct file_t console;
int connfd = -1;
ninfo("about to accept from csfd: %d", csfd);
connfd = accept4(csfd, NULL, NULL, SOCK_CLOEXEC);
if (connfd < 0)
pexit("Failed to accept console-socket connection");
/* Not accepting anything else. */
close(csfd);
unlink(csname);
/* We exit if this fails. */
ninfo("about to recvfd from connfd: %d", connfd);
console = recvfd(connfd);
ninfo("console = {.name = '%s'; .fd = %d}", console.name, console.fd);
free(console.name);
/* We only have a single fd for both pipes, so we just treat it as
* stdout. stderr is ignored. */
masterfd_stdout = console.fd;
masterfd_stderr = -1;
/* Clean up everything */
close(connfd);
}
ninfo("about to waitpid: %d", create_pid); ninfo("about to waitpid: %d", create_pid);
if (terminal) {
/* Wait for our create child to exit with the return code. */ guint terminal_watch = g_unix_fd_add (csfd, G_IO_IN, terminal_accept_cb, csname);
if (waitpid(create_pid, &runtime_status, 0) < 0) { g_child_watch_add (create_pid, runtime_exit_cb, NULL);
int old_errno = errno; g_main_loop_run (main_loop);
kill(create_pid, SIGKILL); g_source_remove (terminal_watch);
errno = old_errno; } else {
pexit("Failed to wait for `runtime %s`", exec ? "exec" : "create"); /* Wait for our create child to exit with the return code. */
if (waitpid(create_pid, &runtime_status, 0) < 0) {
int old_errno = errno;
kill(create_pid, SIGKILL);
errno = old_errno;
pexit("Failed to wait for `runtime %s`", exec ? "exec" : "create");
}
} }
if (!WIFEXITED(runtime_status) || WEXITSTATUS(runtime_status) != 0) { if (!WIFEXITED(runtime_status) || WEXITSTATUS(runtime_status) != 0) {
if (sync_pipe_fd > 0 && !exec) { if (sync_pipe_fd > 0 && !exec) {
if (terminal) { if (terminal) {
@ -746,6 +946,9 @@ int main(int argc, char *argv[])
nexit("Failed to create container: exit status %d", WEXITSTATUS(runtime_status)); nexit("Failed to create container: exit status %d", WEXITSTATUS(runtime_status));
} }
if (terminal && masterfd_stdout == -1)
pexit("Runtime did not set up terminal");
/* Read the pid so we can wait for the process to exit */ /* Read the pid so we can wait for the process to exit */
g_file_get_contents(pid_file, &contents, NULL, &err); g_file_get_contents(pid_file, &contents, NULL, &err);
if (err) { if (err) {
@ -760,7 +963,6 @@ int main(int argc, char *argv[])
/* Setup endpoint for attach */ /* Setup endpoint for attach */
char attach_symlink_dir_path[PATH_MAX]; char attach_symlink_dir_path[PATH_MAX];
struct sockaddr_un attach_addr = {0}; struct sockaddr_un attach_addr = {0};
_cleanup_close_ int afd = -1;
if (!exec) { if (!exec) {
attach_addr.sun_family = AF_UNIX; attach_addr.sun_family = AF_UNIX;
@ -804,15 +1006,6 @@ int main(int argc, char *argv[])
/* Setup fifo for reading in terminal resize and other stdio control messages */ /* Setup fifo for reading in terminal resize and other stdio control messages */
_cleanup_close_ int ctlfd = -1; _cleanup_close_ int ctlfd = -1;
_cleanup_close_ int dummyfd = -1; _cleanup_close_ int dummyfd = -1;
int ctl_msg_type = -1;
int height = -1;
int width = -1;
struct winsize ws;
#define CTLBUFSZ 200
char ctlbuf[CTLBUFSZ];
char *readptr = ctlbuf;
int readsz = CTLBUFSZ - 1;
int cp_rem = 0;
if (!exec) { if (!exec) {
snprintf(ctl_fifo_path, PATH_MAX, "%s/ctl", bundle_path); snprintf(ctl_fifo_path, PATH_MAX, "%s/ctl", bundle_path);
ninfo("ctl fifo path: %s", ctl_fifo_path); ninfo("ctl fifo path: %s", ctl_fifo_path);
@ -862,196 +1055,40 @@ int main(int argc, char *argv[])
if ((ofd = open(memory_cgroup_file_path, O_RDONLY | O_CLOEXEC)) == -1) if ((ofd = open(memory_cgroup_file_path, O_RDONLY | O_CLOEXEC)) == -1)
pexit("Failed to open %s", memory_cgroup_file_path); pexit("Failed to open %s", memory_cgroup_file_path);
if ((efd = eventfd(0, EFD_CLOEXEC)) == -1) if ((oom_efd = eventfd(0, EFD_CLOEXEC)) == -1)
pexit("Failed to create eventfd"); pexit("Failed to create eventfd");
wb = snprintf(buf, BUF_SIZE, "%d %d", efd, ofd); wb = snprintf(buf, BUF_SIZE, "%d %d", oom_efd, ofd);
if (write_all(cfd, buf, wb) < 0) if (write_all(cfd, buf, wb) < 0)
pexit("Failed to write to cgroup.event_control"); pexit("Failed to write to cgroup.event_control");
} }
/* Create epoll_ctl so that we can handle read/write events. */
/*
* TODO: Switch to libuv so that we can also implement exec as well as
* attach and other important things. Using epoll directly is just
* really nasty.
*/
epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0)
pexit("epoll_create");
ev.events = EPOLLIN;
if (masterfd_stdout >= 0) { if (masterfd_stdout >= 0) {
ev.data.fd = masterfd_stdout; g_unix_fd_add (masterfd_stdout, G_IO_IN, stdio_cb, GINT_TO_POINTER(STDOUT_PIPE));
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
pexit("Failed to add console masterfd_stdout to epoll");
num_stdio_fds++; num_stdio_fds++;
} }
if (masterfd_stderr >= 0) { if (masterfd_stderr >= 0) {
ev.data.fd = masterfd_stderr; g_unix_fd_add (masterfd_stderr, G_IO_IN, stdio_cb, GINT_TO_POINTER(STDERR_PIPE));
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
pexit("Failed to add console masterfd_stderr to epoll");
num_stdio_fds++; num_stdio_fds++;
} }
/* Add the OOM event fd to epoll */ /* Add the OOM event fd to epoll */
if (oom_handling_enabled) { if (oom_handling_enabled) {
ev.data.fd = efd; g_unix_fd_add (oom_efd, G_IO_IN, oom_cb, NULL);
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
pexit("Failed to add OOM eventfd to epoll");
} }
/* Add the attach socket to epoll */ /* Add the attach socket to epoll */
if (afd > 0) { if (afd > 0) {
ev.data.fd = afd; g_unix_fd_add (afd, G_IO_IN, attach_cb, NULL);
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
pexit("Failed to add attach socket fd to epoll");
} }
/* Add control fifo fd to epoll */ /* Add control fifo fd to epoll */
if (ctlfd > 0) { if (ctlfd > 0) {
ev.data.fd = ctlfd; g_unix_fd_add (ctlfd, G_IO_IN, ctrl_cb, NULL);
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
pexit("Failed to add control fifo fd to epoll");
} }
/* Log all of the container's output. */ g_main_loop_run (main_loop);
while (num_stdio_fds > 0) {
int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
if (ready < 0)
continue;
for (int i = 0; i < ready; i++) {
if (evlist[i].events & EPOLLIN) {
int masterfd = evlist[i].data.fd;
stdpipe_t pipe = NO_PIPE;
if (masterfd == masterfd_stdout)
pipe = STDOUT_PIPE;
else if (masterfd == masterfd_stderr)
pipe = STDERR_PIPE;
else if (oom_handling_enabled && masterfd == efd) {
if (read(efd, &oom_event, sizeof(uint64_t)) != sizeof(uint64_t))
nwarn("Failed to read event from eventfd");
ninfo("OOM received");
if (open("oom", O_CREAT, 0666) < 0) {
nwarn("Failed to write oom file");
}
} else if (evlist[i].data.fd == afd) {
conn_sock = accept(afd, NULL, NULL);
if (conn_sock == -1) {
nwarn("Failed to accept client connection on attach socket");
continue;
}
ev.events = EPOLLIN;
ev.data.fd = conn_sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) {
pexit("Failed to add client socket fd to epoll");
}
ninfo("Accepted connection");
} else if (!exec && evlist[i].data.fd == ctlfd) {
num_read = read(ctlfd, readptr, readsz);
if (num_read <= 0) {
nwarn("Failed to read from control fd");
continue;
}
readptr[num_read] = '\0';
ninfo("Got ctl message: %s\n", ctlbuf);
char *beg = ctlbuf;
char *newline = strchrnul(beg, '\n');
/* Process each message which ends with a line */
while (*newline != '\0') {
ret = sscanf(ctlbuf, "%d %d %d\n", &ctl_msg_type, &height, &width);
if (ret != 3) {
nwarn("Failed to sscanf message");
continue;
}
ninfo("Message type: %d, Height: %d, Width: %d", ctl_msg_type, height, width);
ret = ioctl(masterfd_stdout, TIOCGWINSZ, &ws);
ninfo("Existing size: %d %d", ws.ws_row, ws.ws_col);
ws.ws_row = height;
ws.ws_col = width;
ret = ioctl(masterfd_stdout, TIOCSWINSZ, &ws);
if (ret == -1) {
nwarn("Failed to set process pty terminal size");
}
beg = newline + 1;
newline = strchrnul(beg, '\n');
}
if (num_read == (CTLBUFSZ - 1) && beg == ctlbuf) {
/*
* We did not find a newline in the entire buffer.
* This shouldn't happen as our buffer is larger than
* the message that we expect to receive.
*/
nwarn("Could not find newline in entire buffer\n");
} else if (*beg == '\0') {
/* We exhausted all messages that were complete */
readptr = ctlbuf;
readsz = CTLBUFSZ - 1;
} else {
/*
* We copy remaining data to beginning of buffer
* and advance readptr after that.
*/
cp_rem = 0;
do {
ctlbuf[cp_rem++] = *beg++;
} while (*beg != '\0');
readptr = ctlbuf + cp_rem;
readsz = CTLBUFSZ - 1 - cp_rem;
}
} else {
num_read = read(masterfd, buf, BUF_SIZE);
if (num_read == 0) {
ninfo("Remote socket closed");
close(conn_sock);
conn_sock = -1;
continue;
}
if (num_read < 0)
goto out;
ninfo("got data on connection: %d", num_read);
if (terminal) {
if (write_all(masterfd_stdout, buf, num_read) < 0) {
nwarn("Failed to write to master pty");
}
}
}
if (masterfd == masterfd_stdout || masterfd == masterfd_stderr) {
num_read = read(masterfd, buf, BUF_SIZE);
if (num_read <= 0)
goto out;
if (write_k8s_log(logfd, pipe, buf, num_read) < 0) {
nwarn("write_k8s_log failed");
goto out;
}
if (conn_sock > 0) {
if (write_all(conn_sock, buf, num_read) < 0) {
nwarn("Failed to write to socket");
}
}
}
} else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) {
if (!exec && evlist[i].data.fd == ctlfd) {
ninfo("Remote writer to control fd closed");
continue;
}
printf("closing fd %d\n", evlist[i].data.fd);
if (close(evlist[i].data.fd) < 0)
pexit("close");
if (!exec && evlist[i].data.fd == conn_sock) {
conn_sock = -1;
continue;
}
num_stdio_fds--;
}
}
}
out:
/* Wait for the container process and record its exit code */ /* Wait for the container process and record its exit code */
while ((pid = waitpid(-1, &status, 0)) > 0) { while ((pid = waitpid(-1, &status, 0)) > 0) {
int exit_status = WEXITSTATUS(status); int exit_status = WEXITSTATUS(status);