diff --git a/conmon/conmon.c b/conmon/conmon.c
index c1667697..930f8280 100644
--- a/conmon/conmon.c
+++ b/conmon/conmon.c
@@ -12,10 +12,12 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -23,6 +25,8 @@
 
 #include "cmsg.h"
 
+#include "pollset.h"
+
 #define pexit(fmt, ...) \
 	do { \
 		fprintf(stderr, "[conmon:e]: " fmt " %m\n", ##__VA_ARGS__); \
@@ -40,11 +44,13 @@
 #define nwarn(fmt, ...) \
 	do { \
 		fprintf(stderr, "[conmon:w]: " fmt "\n", ##__VA_ARGS__); \
+		syslog(LOG_INFO, "conmon : " fmt " \n", ##__VA_ARGS__); \
 	} while (0)
 
 #define ninfo(fmt, ...) \
 	do { \
 		fprintf(stderr, "[conmon:i]: " fmt "\n", ##__VA_ARGS__); \
+		syslog(LOG_INFO, "conmon : " fmt " \n", ##__VA_ARGS__); \
 	} while (0)
 
 #define _cleanup_(x) __attribute__((cleanup(x)))
@@ -428,21 +434,244 @@ static char *escape_json_string(const char *str)
 	return g_string_free (escaped, FALSE);
 }
 
+/* Global state */
+
+static int masterfd_stdout = -1;
+static int masterfd_stderr = -1;
+static int num_stdio_fds = 0;
+
+/* Used for attach */
+static int conn_sock = -1;
+
+static int logfd = -1;
+static int efd = -1;
+static int cfd = -1;
+/* Used for OOM notification API */
+static int ofd = -1;
+
+
+/*
+ * Input callback for the container's stdout/stderr: appends the data to the
+ * log file and mirrors it to the attached client, if there is one.
+ */
+static void stdio_input (G_GNUC_UNUSED polling_set_t *set, int fd, gpointer user_data)
+{
+	char buf[BUF_SIZE];
+	stdpipe_t pipe = GPOINTER_TO_INT(user_data);
+	ssize_t num_read;
+
+	num_read = read(fd, buf, BUF_SIZE);
+	if (num_read <= 0) {
+		nwarn("stdio_input read failed");
+		return;
+	}
+
+	if (write_k8s_log(logfd, pipe, buf, num_read) < 0) {
+		nwarn("write_k8s_log failed");
+		return;
+	}
+
+	if (conn_sock >= 0) {
+		if (write_all(conn_sock, buf, num_read) < 0) {
+			nwarn("Failed to write to socket");
+		}
+	}
+}
+
+/*
+ * Error/hangup callback for the container's stdout/stderr. Returning true
+ * makes the polling set drop (and close) the fd; main() stops polling once
+ * num_stdio_fds reaches zero.
+ */
+static bool stdio_error (G_GNUC_UNUSED polling_set_t *set, int fd, gpointer user_data)
+{
+	stdpipe_t pipe = GPOINTER_TO_INT(user_data);
+	printf("closing stdio fd %d\n", fd);
+	if (pipe == STDOUT_PIPE)
+		masterfd_stdout = -1;
+	if (pipe == STDERR_PIPE)
+		masterfd_stderr = -1;
+	num_stdio_fds--;
+	return true;
+}
+
+/* Input callback for the OOM eventfd: records the event by creating an "oom" file. */
+static void oom_input (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	uint64_t oom_event;
+	int oom_fd;
+
+	if (read(fd, &oom_event, sizeof(uint64_t)) != sizeof(uint64_t))
+		nwarn("Failed to read event from eventfd");
+	ninfo("OOM received");
+	/* Nothing is written to the file; close the fd so it is not leaked */
+	oom_fd = open("oom", O_CREAT, 0666);
+	if (oom_fd < 0) {
+		nwarn("Failed to write oom file");
+	} else {
+		close(oom_fd);
+	}
+}
+
+/*
+ * Input callback for the attached client: forwards what the client sends to
+ * the container's pty. On EOF or a read error the connection is dropped here,
+ * because a closed peer keeps the socket readable and would otherwise make
+ * the poll loop spin.
+ */
+static void conn_sock_input (polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	ssize_t num_read;
+	char buf[BUF_SIZE];
+
+	num_read = read(fd, buf, BUF_SIZE);
+	if (num_read < 0 && errno == EINTR)
+		return;
+	if (num_read <= 0) {
+		if (fd == conn_sock)
+			conn_sock = -1;
+		polling_set_remove_fd(set, fd);
+		return;
+	}
+	ninfo("got data on connection: %zd", num_read);
+	if (terminal) {
+		if (write_all(masterfd_stdout, buf, num_read) < 0) {
+			nwarn("Failed to write to master pty");
+		}
+	}
+}
+
+/*
+ * Error/hangup callback for an attached client. conn_sock is only forgotten
+ * when the failing fd is the current client; an older connection going away
+ * must not detach a newer one.
+ */
+static bool conn_sock_error (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	if (fd == conn_sock)
+		conn_sock = -1;
+	return true;
+}
+
+/*
+ * Input callback for the listening attach socket: accepts a client and makes
+ * it the destination for container output. conn_sock is only updated after a
+ * successful accept, so a failed or aborted connection attempt does not
+ * detach the current client.
+ */
+static void attach_input (polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	int new_sock = accept(fd, NULL, NULL);
+	if (new_sock == -1) {
+		nwarn("Failed to accept client connection on attach socket");
+		return;
+	}
+	if (polling_set_add_fd(set, new_sock, conn_sock_input, conn_sock_error, NULL) < 0)
+		pexit("Failed to add client socket fd to epoll");
+	conn_sock = new_sock;
+	ninfo("Accepted connection");
+}
+
+/*
+ * Input callback for the control fifo. Messages are lines of the form
+ * "<type> <height> <width>\n"; each complete line resizes the container's
+ * pty. A trailing partial line is kept in the static buffer until the rest
+ * of it arrives.
+ */
+static void ctl_input (G_GNUC_UNUSED polling_set_t *set, int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	#define CTLBUFSZ 200
+	static char ctlbuf[CTLBUFSZ];
+	static int readsz = CTLBUFSZ - 1;
+	static char *readptr = ctlbuf;
+	ssize_t num_read;
+	int ctl_msg_type = -1;
+	int height = -1;
+	int width = -1;
+	struct winsize ws;
+	int ret;
+
+	num_read = read(fd, readptr, readsz);
+	if (num_read <= 0) {
+		nwarn("Failed to read from control fd");
+		return;
+	}
+	readptr[num_read] = '\0';
+	ninfo("Got ctl message: %s\n", ctlbuf);
+
+	char *beg = ctlbuf;
+	char *newline = strchrnul(beg, '\n');
+	/* Process each message which ends with a newline */
+	for (; *newline != '\0'; beg = newline + 1, newline = strchrnul(beg, '\n')) {
+		/* Terminate the line so sscanf cannot read into the next message */
+		*newline = '\0';
+		ret = sscanf(beg, "%d %d %d\n", &ctl_msg_type, &height, &width);
+		if (ret != 3) {
+			/* Skip the malformed line; the for-step still advances */
+			nwarn("Failed to sscanf message");
+			continue;
+		}
+		ninfo("Message type: %d, Height: %d, Width: %d", ctl_msg_type, height, width);
+		ret = ioctl(masterfd_stdout, TIOCGWINSZ, &ws);
+		if (ret == -1) {
+			/* ws was not filled in, so there is nothing valid to modify */
+			nwarn("Failed to get process pty terminal size");
+			continue;
+		}
+		ninfo("Existing size: %d %d", ws.ws_row, ws.ws_col);
+		ws.ws_row = height;
+		ws.ws_col = width;
+		ret = ioctl(masterfd_stdout, TIOCSWINSZ, &ws);
+		if (ret == -1) {
+			nwarn("Failed to set process pty terminal size");
+		}
+	}
+	if (*beg == '\0') {
+		/* We exhausted all messages that were complete */
+		readptr = ctlbuf;
+		readsz = CTLBUFSZ - 1;
+	} else {
+		/*
+		 * We copy remaining data to beginning of buffer
+		 * and advance readptr after that.
+		 */
+		int cp_rem = 0;
+		do {
+			ctlbuf[cp_rem++] = *beg++;
+		} while (*beg != '\0');
+		readptr = ctlbuf + cp_rem;
+		readsz = CTLBUFSZ - 1 - cp_rem;
+		if (readsz == 0) {
+			/* Buffer full without a newline: drop it so reads can make progress */
+			nwarn("Control message too long, discarding");
+			readptr = ctlbuf;
+			readsz = CTLBUFSZ - 1;
+		}
+	}
+}
+
+/*
+ * Hangup callback for the control fifo. Returning false keeps the fifo
+ * registered in the polling set.
+ */
+static bool ctl_error (G_GNUC_UNUSED polling_set_t *set, G_GNUC_UNUSED int fd, G_GNUC_UNUSED gpointer user_data)
+{
+	ninfo("Remote writer to control fd closed");
+	return false; /* Don't remove */
+}
 int main(int argc, char *argv[])
 {
 	int ret, runtime_status;
 	char cwd[PATH_MAX];
 	char default_pid_file[PATH_MAX];
+	char attach_sock_path[PATH_MAX];
+	char ctl_fifo_path[PATH_MAX];
 	GError *err = NULL;
 	_cleanup_free_ char *contents;
 	int cpid = -1;
 	int status;
 	pid_t pid, create_pid;
-	_cleanup_close_ int logfd = -1;
-	_cleanup_close_ int masterfd_stdout = -1;
-	_cleanup_close_ int masterfd_stderr = -1;
-	_cleanup_close_ int epfd = -1;
 	_cleanup_close_ int csfd = -1;
 	/* Used for !terminal cases. */
 	int slavefd_stdout = -1;
@@ -450,23 +679,17 @@ int main(int argc, char *argv[])
 	char csname[PATH_MAX] = "/tmp/conmon-term.XXXXXXXX";
 	char buf[BUF_SIZE];
 	int num_read;
-	struct epoll_event ev;
-	struct epoll_event evlist[MAX_EVENTS];
+	_cleanup_pollset_ polling_set_t pollset = POLLING_SET_INIT;
 	int sync_pipe_fd = -1;
 	char *sync_pipe, *endptr;
 	int len;
-	int num_stdio_fds = 0;
 	GError *error = NULL;
 	GOptionContext *context;
 	GPtrArray *runtime_argv = NULL;
 
-	/* Used for OOM notification API */
-	_cleanup_close_ int efd = -1;
-	_cleanup_close_ int cfd = -1;
-	_cleanup_close_ int ofd = -1;
 	_cleanup_free_ char *memory_cgroup_path = NULL;
 	int wb;
-	uint64_t oom_event;
+
 
 	/* Command line parameters */
 	context = g_option_context_new("- conmon utility");
@@ -551,7 +774,7 @@ int main(int argc, char *argv[])
 		csfd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
 		if (csfd < 0)
 			pexit("Failed to create console-socket");
-                if (fchmod(csfd, 0700))
+		if (fchmod(csfd, 0700))
 			pexit("Failed to change console-socket permissions");
 		/* XXX: This should be handled with a rename(2). */
 		if (unlink(csname) < 0)
@@ -743,6 +966,72 @@ int main(int argc, char *argv[])
 	cpid = atoi(contents);
 	ninfo("container PID: %d", cpid);
 
+	/* Setup endpoint for attach */
+	char attach_symlink_dir_path[PATH_MAX];
+	struct sockaddr_un attach_addr = {0};
+	_cleanup_close_ int afd = -1;
+
+	if (!exec) {
+		attach_addr.sun_family = AF_UNIX;
+
+		/*
+		 * Create a symlink so we don't exceed unix domain socket
+		 * path length limit.
+		 */
+		snprintf(attach_symlink_dir_path, PATH_MAX, "/run/%s", cid);
+		if (symlink(bundle_path, attach_symlink_dir_path) == -1)
+			pexit("Failed to create symlink for attach socket");
+
+		snprintf(attach_sock_path, PATH_MAX, "/run/%s/attach", cid);
+		ninfo("attach sock path: %s", attach_sock_path);
+
+		strncpy(attach_addr.sun_path, attach_sock_path, sizeof(attach_addr.sun_path) - 1);
+		ninfo("addr{sun_family=AF_UNIX, sun_path=%s}", attach_addr.sun_path);
+
+		/*
+		 * We make the socket non-blocking to avoid a race where client aborts connection
+		 * before the server gets a chance to call accept. In that scenario, the server
+		 * accept blocks till a new client connection comes in.
+		 */
+		afd = socket(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
+		if (afd == -1)
+			pexit("Failed to create attach socket");
+
+		if (fchmod(afd, 0700))
+			pexit("Failed to change attach socket permissions");
+
+		if (bind(afd, (struct sockaddr *)&attach_addr, sizeof(struct sockaddr_un)) == -1)
+			pexit("Failed to bind attach socket: %s", attach_sock_path);
+
+		if (listen(afd, 10) == -1)
+			pexit("Failed to listen on attach socket: %s", attach_sock_path);
+	}
+
+	/* Setup fifo for reading in terminal resize and other stdio control messages */
+	_cleanup_close_ int ctlfd = -1;
+	_cleanup_close_ int dummyfd = -1;
+	if (!exec) {
+		snprintf(ctl_fifo_path, PATH_MAX, "%s/ctl", bundle_path);
+		ninfo("ctl fifo path: %s", ctl_fifo_path);
+
+		if (mkfifo(ctl_fifo_path, 0666) == -1)
+			pexit("Failed to mkfifo at %s", ctl_fifo_path);
+
+		ctlfd = open(ctl_fifo_path, O_RDONLY|O_NONBLOCK|O_CLOEXEC);
+		if (ctlfd == -1)
+			pexit("Failed to open control fifo");
+
+		/*
+		 * Open a dummy writer to prevent getting flood of POLLHUPs when
+		 * last writer closes.
+		 */
+		dummyfd = open(ctl_fifo_path, O_WRONLY|O_CLOEXEC);
+		if (dummyfd == -1)
+			pexit("Failed to open dummy writer for fifo");
+
+		ninfo("ctlfd: %d", ctlfd);
+	}
+
 	/* Send the container pid back to parent */
 	if (sync_pipe_fd > 0 && !exec) {
 		len = snprintf(buf, BUF_SIZE, "{\"pid\": %d}\n", cpid);
@@ -778,83 +1067,47 @@ int main(int argc, char *argv[])
 			pexit("Failed to write to cgroup.event_control");
 	}
 
-	/* Create epoll_ctl so that we can handle read/write events. */
-	/*
-	 * TODO: Switch to libuv so that we can also implement exec as well as
-	 * attach and other important things. Using epoll directly is just
-	 * really nasty.
-	 */
-	epfd = epoll_create1(EPOLL_CLOEXEC);
-	if (epfd < 0)
-		pexit("epoll_create");
-	ev.events = EPOLLIN;
+	if (polling_set_init(&pollset) < 0)
+		pexit("polling_set_init");
+
 	if (masterfd_stdout >= 0) {
-		ev.data.fd = masterfd_stdout;
-		if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		if (polling_set_add_fd(&pollset, masterfd_stdout, stdio_input, stdio_error, GINT_TO_POINTER(STDOUT_PIPE)) < 0)
 			pexit("Failed to add console masterfd_stdout to epoll");
 		num_stdio_fds++;
 	}
 	if (masterfd_stderr >= 0) {
-		ev.data.fd = masterfd_stderr;
-		if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		if (polling_set_add_fd(&pollset, masterfd_stderr, stdio_input, stdio_error, GINT_TO_POINTER(STDERR_PIPE)) < 0)
 			pexit("Failed to add console masterfd_stderr to epoll");
 		num_stdio_fds++;
 	}
 
 	/* Add the OOM event fd to epoll */
 	if (oom_handling_enabled) {
-		ev.data.fd = efd;
-		if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		if (polling_set_add_fd(&pollset, efd, oom_input, NULL, NULL) < 0)
 			pexit("Failed to add OOM eventfd to epoll");
 	}
 
+	/* Add the attach socket to epoll */
+	if (afd >= 0) {
+		if (polling_set_add_fd(&pollset, afd, attach_input, NULL, NULL) < 0)
+			pexit("Failed to add attach socket fd to epoll");
+	}
+
+	/* Add control fifo fd to epoll */
+	if (ctlfd >= 0) {
+		if (polling_set_add_fd(&pollset, ctlfd, ctl_input, ctl_error, NULL) < 0)
+			pexit("Failed to add control fifo fd to epoll");
+	}
+
 	/* Log all of the container's output. */
 	while (num_stdio_fds > 0) {
-		int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
-		if (ready < 0)
-			continue;
-
-		for (int i = 0; i < ready; i++) {
-			if (evlist[i].events & EPOLLIN) {
-				int masterfd = evlist[i].data.fd;
-				stdpipe_t pipe = NO_PIPE;
-				if (masterfd == masterfd_stdout)
-					pipe = STDOUT_PIPE;
-				else if (masterfd == masterfd_stderr)
-					pipe = STDERR_PIPE;
-				else if (oom_handling_enabled && masterfd == efd) {
-					if (read(efd, &oom_event, sizeof(uint64_t)) != sizeof(uint64_t))
-						nwarn("Failed to read event from eventfd");
-					ninfo("OOM received");
-					if (open("oom", O_CREAT, 0666) < 0) {
-						nwarn("Failed to write oom file");
-					}
-				}
-				else {
-					nwarn("unknown pipe fd");
-					goto out;
-				}
-
-				if (masterfd == masterfd_stdout || masterfd == masterfd_stderr) {
-					num_read = read(masterfd, buf, BUF_SIZE);
-					if (num_read <= 0)
-						goto out;
-
-					if (write_k8s_log(logfd, pipe, buf, num_read) < 0) {
-						nwarn("write_k8s_log failed");
-						goto out;
-					}
-				}
-			} else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) {
-				printf("closing fd %d\n", evlist[i].data.fd);
-				if (close(evlist[i].data.fd) < 0)
-					pexit("close");
-				num_stdio_fds--;
-			}
-		}
+		/* Stop instead of spinning if epoll_wait keeps failing */
+		if (polling_set_iterate(&pollset) < 0) {
+			nwarn("polling_set_iterate failed");
+			break;
+		}
 	}
 
-out:
 	/* Wait for the container process and record its exit code */
 	while ((pid = waitpid(-1, &status, 0)) > 0) {
 		int exit_status = WEXITSTATUS(status);
@@ -904,5 +1157,11 @@ out:
 		}
 	}
 
+	if (!exec) {
+		if (unlink(attach_symlink_dir_path) == -1 && errno != ENOENT) {
+			pexit("Failed to remove symlink");
+		}
+	}
+
 	return EXIT_SUCCESS;
 }
diff --git a/conmon/pollset.c b/conmon/pollset.c
new file mode 100644
index 00000000..851fe3e6
--- /dev/null
+++ b/conmon/pollset.c
@@ -0,0 +1,121 @@
+#define _GNU_SOURCE
+
+#include
+#include
+#include
+#include
+
+#include "pollset.h"
+
+#define MAX_EVENTS 10
+
+/* One registered fd together with its callbacks; stored as a value in fd_hash. */
+typedef struct _polling_fd_t polling_fd_t;
+struct _polling_fd_t {
+	int fd;
+	polling_set_input_cb input_cb;
+	polling_set_error_cb error_cb;
+	gpointer user_data;
+};
+
+/* Value destructor for fd_hash: closes the entry's fd and frees the entry. */
+static void polling_fd_free(polling_fd_t *polling_fd)
+{
+	if (polling_fd->fd >= 0)
+		close(polling_fd->fd);
+	g_free(polling_fd);
+}
+
+/* Creates the epoll instance and the fd table. Returns 0 on success, -1 on failure. */
+int polling_set_init(polling_set_t *set)
+{
+	set->epfd = epoll_create1(EPOLL_CLOEXEC);
+	if (set->epfd < 0)
+		return -1;
+	set->fd_hash = g_hash_table_new_full(g_direct_hash, g_direct_equal,
+					     NULL, (GDestroyNotify)polling_fd_free);
+	return 0;
+}
+
+/* Closes the epoll instance and every registered fd. Safe on a set that still holds POLLING_SET_INIT. */
+void polling_set_destroy(polling_set_t *set)
+{
+	if (set->epfd >= 0)
+		close(set->epfd);
+	if (set->fd_hash)
+		g_hash_table_destroy(set->fd_hash);
+}
+
+/*
+ * Registers fd for input events. The set takes ownership of fd and closes it
+ * when the entry is removed or the set is destroyed. input_cb runs when fd is
+ * readable; otherwise, on hangup/error, error_cb decides whether the fd is
+ * removed (a NULL error_cb means "remove"). Returns 0 on success, -1 on failure.
+ */
+int polling_set_add_fd(polling_set_t *set, int fd, polling_set_input_cb input_cb, polling_set_error_cb error_cb, gpointer user_data)
+{
+	struct epoll_event ev;
+	polling_fd_t *polling_fd;
+
+	ev.events = EPOLLIN;
+	ev.data.fd = fd;
+
+	if (epoll_ctl(set->epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
+		return -1;
+
+	polling_fd = g_new0(polling_fd_t, 1);
+	polling_fd->fd = fd;
+	polling_fd->input_cb = input_cb;
+	polling_fd->error_cb = error_cb;
+	polling_fd->user_data = user_data;
+
+	g_hash_table_insert(set->fd_hash, GINT_TO_POINTER(fd), polling_fd);
+	return 0;
+}
+
+/* Unregisters fd and closes it through the table's value destructor. */
+void polling_set_remove_fd(polling_set_t *set, int fd)
+{
+	/* close() alone leaves the epoll entry behind if a duplicate of the descriptor is still open */
+	epoll_ctl(set->epfd, EPOLL_CTL_DEL, fd, NULL);
+	g_hash_table_remove(set->fd_hash, GINT_TO_POINTER(fd));
+}
+
+/*
+ * Waits for events and dispatches one batch of callbacks. An input callback
+ * may remove its own fd: the entry is not touched again after the call.
+ * Returns 0 on success, -1 if epoll_wait fails with anything but EINTR.
+ */
+int polling_set_iterate(polling_set_t *set)
+{
+	struct epoll_event evlist[MAX_EVENTS];
+	int ready;
+
+	do {
+		ready = epoll_wait(set->epfd, evlist, MAX_EVENTS, -1);
+	} while (ready == -1 && errno == EINTR);
+
+	if (ready < 0)
+		return -1;
+
+	for (int i = 0; i < ready; i++) {
+		int fd = evlist[i].data.fd;
+		polling_fd_t *polling_fd = g_hash_table_lookup(set->fd_hash, GINT_TO_POINTER(fd));
+
+		if (polling_fd == NULL)
+			continue;
+
+		if (evlist[i].events & EPOLLIN &&
+		    polling_fd->input_cb)
+			polling_fd->input_cb(set, fd, polling_fd->user_data);
+		else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) {
+			bool remove = true;
+			if (polling_fd->error_cb)
+				remove = polling_fd->error_cb(set, fd, polling_fd->user_data);
+			if (remove)
+				polling_set_remove_fd(set, fd);
+		}
+	}
+
+	return 0;
+}
diff --git a/conmon/pollset.h b/conmon/pollset.h
new file mode 100644
index 00000000..d37d311f
--- /dev/null
+++ b/conmon/pollset.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include
+
+typedef struct _polling_set_t polling_set_t;
+struct _polling_set_t {
+	int epfd;
+	GHashTable *fd_hash;
+};
+
+#define POLLING_SET_INIT { -1, NULL }
+
+/* Called when the fd is readable. */
+typedef void (*polling_set_input_cb) (polling_set_t *set, int fd, gpointer user_data);
+/* Called on hangup or error when no input is pending; return true to have the fd removed and closed. */
+typedef bool (*polling_set_error_cb) (polling_set_t *set, int fd, gpointer user_data);
+
+int polling_set_init(polling_set_t *set);
+void polling_set_destroy(polling_set_t *set);
+int polling_set_add_fd(polling_set_t *set, int fd, polling_set_input_cb input_cb, polling_set_error_cb error_cb, gpointer user_data);
+void polling_set_remove_fd(polling_set_t *set, int fd);
+
+int polling_set_iterate(polling_set_t *set);
+
+#define _cleanup_pollset_ __attribute__((cleanup(polling_set_destroy)))
diff --git a/server/container_attach.go b/server/container_attach.go
index 96e2676b..d4fe9d26 100644
--- a/server/container_attach.go
+++ b/server/container_attach.go
@@ -1,11 +1,111 @@
 package server
 
 import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net"
+	"os"
+	"path/filepath"
+	"syscall"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/kubernetes-incubator/cri-o/oci"
 	"golang.org/x/net/context"
 	pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/util/term"
 )
 
 // Attach prepares a streaming endpoint to attach to a running container.
func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachResponse, error) { - return nil, nil + logrus.Debugf("AttachRequest %+v", req) + + resp, err := s.GetAttach(req) + if err != nil { + return nil, fmt.Errorf("unable to prepare attach endpoint") + } + + return resp, nil +} + +// Attach endpoint for streaming.Runtime +func (ss streamService) Attach(containerID string, inputStream io.Reader, outputStream, errorStream io.WriteCloser, tty bool, resize <-chan term.Size) error { + c := ss.runtimeServer.GetContainer(containerID) + + if c == nil { + return fmt.Errorf("could not find container %q", containerID) + } + + if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil { + return err + } + + cState := ss.runtimeServer.runtime.ContainerStatus(c) + if !(cState.Status == oci.ContainerStateRunning || cState.Status == oci.ContainerStateCreated) { + return fmt.Errorf("container is not created or running") + } + + controlPath := filepath.Join(c.BundlePath(), "ctl") + controlFile, err := os.OpenFile(controlPath, syscall.O_WRONLY, 0) + if err != nil { + return fmt.Errorf("failed to open container ctl file: %v", err) + } + + kubecontainer.HandleResizing(resize, func(size term.Size) { + logrus.Infof("Got a resize event: %+v", size) + _, err := fmt.Fprintf(controlFile, "%d %d %d\n", 1, size.Height, size.Width) + if err != nil { + logrus.Infof("Failed to write to control file to resize terminal: %v", err) + } + }) + + attachSocketPath := filepath.Join("/run", c.Name(), "attach") + conn, err := net.Dial("unix", attachSocketPath) + if err != nil { + return fmt.Errorf("failed to connect to container %s attach socket: %v", c.ID(), err) + } + defer conn.Close() + + receiveStdout := make(chan error) + if outputStream != nil || errorStream != nil { + go func() { + receiveStdout <- redirectResponseToOutputStream(tty, outputStream, errorStream, conn) + }() + } + + stdinDone := make(chan struct{}) + go func() { + if inputStream != nil { + io.Copy(conn, 
inputStream) + } + close(stdinDone) + }() + + select { + case err := <-receiveStdout: + return err + case <-stdinDone: + if outputStream != nil || errorStream != nil { + return <-receiveStdout + } + } + + return nil +} + +func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, conn io.Reader) error { + if outputStream == nil { + outputStream = ioutil.Discard + } + if errorStream == nil { + // errorStream = ioutil.Discard + } + var err error + if tty { + _, err = io.Copy(outputStream, conn) + } else { + // TODO + } + return err }