conmon: Use polling_fd_set helper

This commit is contained in:
Alexander Larsson 2017-06-05 13:14:54 +02:00
parent 22b78314e6
commit 5fc1b0629c

View file

@ -25,6 +25,8 @@
#include "cmsg.h"
#include "pollset.h"
#define pexit(fmt, ...) \
do { \
fprintf(stderr, "[conmon:e]: " fmt " %m\n", ##__VA_ARGS__); \
@ -432,6 +434,169 @@ static char *escape_json_string(const char *str)
return g_string_free (escaped, FALSE);
}
/* Global state */
static int masterfd_stdout = -1;
static int masterfd_stderr = -1;
static int num_stdio_fds = 0;
/* Used for attach */
static int conn_sock = -1;
static int logfd = -1;
static int efd = -1;
static int cfd = -1;
/* Used for OOM notification API */
static int ofd = -1;
static void stdio_input (G_GNUC_UNUSED polling_set_t *set, int fd, gpointer user_data)
{
char buf[BUF_SIZE];
stdpipe_t pipe = GPOINTER_TO_INT(user_data);
ssize_t num_read;
num_read = read(fd, buf, BUF_SIZE);
if (num_read <= 0) {
nwarn("stdio_input read failed");
return;
}
if (write_k8s_log(logfd, pipe, buf, num_read) < 0) {
nwarn("write_k8s_log failed");
return;
}
if (conn_sock > 0) {
if (write_all(conn_sock, buf, num_read) < 0) {
nwarn("Failed to write to socket");
}
}
}
static bool stdio_error (G_GNUC_UNUSED polling_set_t *set, int fd, gpointer user_data)
{
stdpipe_t pipe = GPOINTER_TO_INT(user_data);
printf("closing stdio fd %d\n", fd);
if (pipe == STDOUT_PIPE)
masterfd_stdout = -1;
if (pipe == STDERR_PIPE)
masterfd_stderr = -1;
num_stdio_fds--;
return true;
}
static void oom_input (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
{
uint64_t oom_event;
if (read(fd, &oom_event, sizeof(uint64_t)) != sizeof(uint64_t))
nwarn("Failed to read event from eventfd");
ninfo("OOM received");
if (open("oom", O_CREAT, 0666) < 0) {
nwarn("Failed to write oom file");
}
}
static void conn_sock_input (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
{
ssize_t num_read;
char buf[BUF_SIZE];
num_read = read(fd, buf, BUF_SIZE);
if (num_read <= 0)
return;
ninfo("got data on connection: %zd", num_read);
if (terminal) {
if (write_all(masterfd_stdout, buf, num_read) < 0) {
nwarn("Failed to write to master pty");
}
}
}
static bool conn_sock_error (G_GNUC_UNUSED polling_set_t *set, G_GNUC_UNUSED int fd, G_GNUC_UNUSED gpointer user_data)
{
conn_sock = -1;
return true;
}
static void attach_input (polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
{
conn_sock = accept(fd, NULL, NULL);
if (conn_sock == -1) {
nwarn("Failed to accept client connection on attach socket");
return;
}
if (polling_set_add_fd(set, conn_sock, conn_sock_input, conn_sock_error, NULL) < 0)
pexit("Failed to add client socket fd to epoll");
ninfo("Accepted connection");
}
static void ctl_input (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
{
#define CTLBUFSZ 200
static char ctlbuf[CTLBUFSZ];
static int readsz = CTLBUFSZ - 1;
static char *readptr = ctlbuf;
ssize_t num_read;
int ctl_msg_type = -1;
int height = -1;
int width = -1;
struct winsize ws;
int ret;
num_read = read(fd, readptr, readsz);
if (num_read <= 0) {
nwarn("Failed to read from control fd");
return;
}
readptr[num_read] = '\0';
ninfo("Got ctl message: %s\n", ctlbuf);
char *beg = ctlbuf;
char *newline = strchrnul(beg, '\n');
/* Process each message which ends with a line */
while (*newline != '\0') {
ret = sscanf(ctlbuf, "%d %d %d\n", &ctl_msg_type, &height, &width);
if (ret != 3) {
nwarn("Failed to sscanf message");
continue;
}
ninfo("Message type: %d, Height: %d, Width: %d", ctl_msg_type, height, width);
ret = ioctl(masterfd_stdout, TIOCGWINSZ, &ws);
ninfo("Existing size: %d %d", ws.ws_row, ws.ws_col);
ws.ws_row = height;
ws.ws_col = width;
ret = ioctl(masterfd_stdout, TIOCSWINSZ, &ws);
if (ret == -1) {
nwarn("Failed to set process pty terminal size");
}
beg = newline + 1;
newline = strchrnul(beg, '\n');
}
if (*beg == '\0') {
/* We exhausted all messages that were complete */
readptr = ctlbuf;
readsz = CTLBUFSZ - 1;
} else {
/*
* We copy remaining data to beginning of buffer
* and advance readptr after that.
*/
int cp_rem = 0;
do {
ctlbuf[cp_rem++] = *beg++;
} while (*beg != '\0');
readptr = ctlbuf + cp_rem;
readsz = CTLBUFSZ - 1 - cp_rem;
}
}
static bool ctl_error (G_GNUC_UNUSED polling_set_t *set, G_GNUC_UNUSED int fd, G_GNUC_UNUSED gpointer user_data)
{
ninfo("Remote writer to control fd closed");
return false; /* Don't remove */
}
int main(int argc, char *argv[])
{
@ -445,10 +610,6 @@ int main(int argc, char *argv[])
int cpid = -1;
int status;
pid_t pid, create_pid;
_cleanup_close_ int logfd = -1;
_cleanup_close_ int masterfd_stdout = -1;
_cleanup_close_ int masterfd_stderr = -1;
_cleanup_close_ int epfd = -1;
_cleanup_close_ int csfd = -1;
/* Used for !terminal cases. */
int slavefd_stdout = -1;
@ -456,26 +617,17 @@ int main(int argc, char *argv[])
char csname[PATH_MAX] = "/tmp/conmon-term.XXXXXXXX";
char buf[BUF_SIZE];
int num_read;
struct epoll_event ev;
struct epoll_event evlist[MAX_EVENTS];
_cleanup_pollset_ polling_set_t pollset = POLLING_SET_INIT;
int sync_pipe_fd = -1;
char *sync_pipe, *endptr;
int len;
int num_stdio_fds = 0;
GError *error = NULL;
GOptionContext *context;
GPtrArray *runtime_argv = NULL;
/* Used for OOM notification API */
_cleanup_close_ int efd = -1;
_cleanup_close_ int cfd = -1;
_cleanup_close_ int ofd = -1;
_cleanup_free_ char *memory_cgroup_path = NULL;
int wb;
uint64_t oom_event;
/* Used for attach */
_cleanup_close_ int conn_sock = -1;
/* Command line parameters */
context = g_option_context_new("- conmon utility");
@ -796,15 +948,6 @@ int main(int argc, char *argv[])
/* Setup fifo for reading in terminal resize and other stdio control messages */
_cleanup_close_ int ctlfd = -1;
_cleanup_close_ int dummyfd = -1;
int ctl_msg_type = -1;
int height = -1;
int width = -1;
struct winsize ws;
#define CTLBUFSZ 200
char ctlbuf[CTLBUFSZ];
char *readptr = ctlbuf;
int readsz = CTLBUFSZ - 1;
int cp_rem = 0;
if (!exec) {
snprintf(ctl_fifo_path, PATH_MAX, "%s/ctl", bundle_path);
ninfo("ctl fifo path: %s", ctl_fifo_path);
@ -862,174 +1005,43 @@ int main(int argc, char *argv[])
pexit("Failed to write to cgroup.event_control");
}
/* Create epoll_ctl so that we can handle read/write events. */
/*
* TODO: Switch to libuv so that we can also implement exec as well as
* attach and other important things. Using epoll directly is just
* really nasty.
*/
epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0)
pexit("epoll_create");
ev.events = EPOLLIN;
if (polling_set_init(&pollset) < 0)
pexit("polling_set_init");
if (masterfd_stdout >= 0) {
ev.data.fd = masterfd_stdout;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
if (polling_set_add_fd(&pollset, masterfd_stdout, stdio_input, stdio_error, GINT_TO_POINTER(STDOUT_PIPE)) < 0)
pexit("Failed to add console masterfd_stdout to epoll");
num_stdio_fds++;
}
if (masterfd_stderr >= 0) {
ev.data.fd = masterfd_stderr;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
pexit("Failed to add console masterfd_stderr to epoll");
if (polling_set_add_fd(&pollset, masterfd_stderr, stdio_input, stdio_error, GINT_TO_POINTER(STDERR_PIPE)) < 0)
pexit("Failed to add console masterfd_stdout to epoll");
num_stdio_fds++;
}
/* Add the OOM event fd to epoll */
if (oom_handling_enabled) {
ev.data.fd = efd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
if (polling_set_add_fd(&pollset, efd, oom_input, NULL, NULL) < 0)
pexit("Failed to add OOM eventfd to epoll");
}
/* Add the attach socket to epoll */
if (afd > 0) {
ev.data.fd = afd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
if (polling_set_add_fd(&pollset, afd, attach_input, NULL, NULL) < 0)
pexit("Failed to add attach socket fd to epoll");
}
/* Add control fifo fd to epoll */
if (ctlfd > 0) {
ev.data.fd = ctlfd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
if (polling_set_add_fd(&pollset, ctlfd, ctl_input, ctl_error, NULL) < 0)
pexit("Failed to add control fifo fd to epoll");
}
/* Log all of the container's output. */
while (num_stdio_fds > 0) {
int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
if (ready < 0)
continue;
for (int i = 0; i < ready; i++) {
if (evlist[i].events & EPOLLIN) {
int masterfd = evlist[i].data.fd;
stdpipe_t pipe = NO_PIPE;
if (masterfd == masterfd_stdout)
pipe = STDOUT_PIPE;
else if (masterfd == masterfd_stderr)
pipe = STDERR_PIPE;
else if (oom_handling_enabled && masterfd == efd) {
if (read(efd, &oom_event, sizeof(uint64_t)) != sizeof(uint64_t))
nwarn("Failed to read event from eventfd");
ninfo("OOM received");
if (open("oom", O_CREAT, 0666) < 0) {
nwarn("Failed to write oom file");
}
} else if (evlist[i].data.fd == afd) {
conn_sock = accept(afd, NULL, NULL);
if (conn_sock == -1) {
nwarn("Failed to accept client connection on attach socket");
continue;
}
ev.events = EPOLLIN;
ev.data.fd = conn_sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) {
pexit("Failed to add client socket fd to epoll");
}
ninfo("Accepted connection");
} else if (!exec && evlist[i].data.fd == ctlfd) {
num_read = read(ctlfd, readptr, readsz);
if (num_read <= 0) {
nwarn("Failed to read from control fd");
continue;
}
ctlbuf[num_read] = '\0';
ninfo("Got ctl message: %s\n", ctlbuf);
char *beg = ctlbuf;
char *newline = strchrnul(beg, '\n');
/* Process each message which ends with a line */
while (*newline != '\0') {
ret = sscanf(ctlbuf, "%d %d %d\n", &ctl_msg_type, &height, &width);
if (ret != 3) {
nwarn("Failed to sscanf message");
continue;
}
ninfo("Message type: %d, Height: %d, Width: %d", ctl_msg_type, height, width);
ret = ioctl(masterfd_stdout, TIOCGWINSZ, &ws);
ninfo("Existing size: %d %d", ws.ws_row, ws.ws_col);
ws.ws_row = height;
ws.ws_col = width;
ret = ioctl(masterfd_stdout, TIOCSWINSZ, &ws);
if (ret == -1) {
nwarn("Failed to set process pty terminal size");
}
beg = newline + 1;
newline = strchrnul(beg, '\n');
}
if (*beg == '\0') {
/* We exhausted all messages that were complete */
readptr = ctlbuf;
readsz = CTLBUFSZ - 1;
} else {
/*
* We copy remaining data to beginning of buffer
* and advance readptr after that.
*/
cp_rem = 0;
do {
ctlbuf[cp_rem++] = *beg++;
} while (*beg != '\0');
readptr = ctlbuf + cp_rem;
readsz = CTLBUFSZ - 1 - cp_rem;
}
} else {
num_read = read(masterfd, buf, BUF_SIZE);
if (num_read <= 0)
goto out;
ninfo("got data on connection: %d", num_read);
if (terminal) {
if (write_all(masterfd_stdout, buf, num_read) < 0) {
nwarn("Failed to write to master pty");
}
}
polling_set_iterate(&pollset);
}
if (masterfd == masterfd_stdout || masterfd == masterfd_stderr) {
num_read = read(masterfd, buf, BUF_SIZE);
if (num_read <= 0)
goto out;
if (write_k8s_log(logfd, pipe, buf, num_read) < 0) {
nwarn("write_k8s_log failed");
goto out;
}
if (conn_sock > 0) {
if (write_all(conn_sock, buf, num_read) < 0) {
nwarn("Failed to write to socket");
}
}
}
} else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) {
if (!exec && evlist[i].data.fd == ctlfd) {
ninfo("Remote writer to control fd closed");
continue;
}
printf("closing fd %d\n", evlist[i].data.fd);
if (close(evlist[i].data.fd) < 0)
pexit("close");
if (!exec && evlist[i].data.fd == conn_sock) {
conn_sock = -1;
}
num_stdio_fds--;
}
}
}
out:
/* Wait for the container process and record its exit code */
while ((pid = waitpid(-1, &status, 0)) > 0) {
int exit_status = WEXITSTATUS(status);