From 5c383d13d25a328367c6f632104cf992d8b8d959 Mon Sep 17 00:00:00 2001 From: Mrunal Patel Date: Tue, 30 May 2017 14:30:45 -0700 Subject: [PATCH 1/4] conmon: Add info/warn to syslog as well Signed-off-by: Mrunal Patel --- conmon/conmon.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/conmon/conmon.c b/conmon/conmon.c index c1667697..cc42903b 100644 --- a/conmon/conmon.c +++ b/conmon/conmon.c @@ -40,11 +40,13 @@ #define nwarn(fmt, ...) \ do { \ fprintf(stderr, "[conmon:w]: " fmt "\n", ##__VA_ARGS__); \ + syslog(LOG_INFO, "conmon : " fmt " \n", ##__VA_ARGS__); \ } while (0) #define ninfo(fmt, ...) \ do { \ fprintf(stderr, "[conmon:i]: " fmt "\n", ##__VA_ARGS__); \ + syslog(LOG_INFO, "conmon : " fmt " \n", ##__VA_ARGS__); \ } while (0) #define _cleanup_(x) __attribute__((cleanup(x))) From 065f12490ca6542d2f3e0c3fcd73e15954adc9c1 Mon Sep 17 00:00:00 2001 From: Mrunal Patel Date: Tue, 30 May 2017 14:35:12 -0700 Subject: [PATCH 2/4] conmon: Add unix domain socket for attach Signed-off-by: Mrunal Patel --- conmon/conmon.c | 104 ++++++++++++++++++++++++++++++++++++++++++++--- oci/oci.go | 1 + server/server.go | 4 ++ 3 files changed, 104 insertions(+), 5 deletions(-) diff --git a/conmon/conmon.c b/conmon/conmon.c index cc42903b..3e814c81 100644 --- a/conmon/conmon.c +++ b/conmon/conmon.c @@ -93,6 +93,7 @@ static inline void strv_cleanup(char ***strv) static bool terminal = false; static char *cid = NULL; +static char *cuuid = NULL; static char *runtime_path = NULL; static char *bundle_path = NULL; static char *pid_file = NULL; @@ -104,6 +105,7 @@ static GOptionEntry entries[] = { { "terminal", 't', 0, G_OPTION_ARG_NONE, &terminal, "Terminal", NULL }, { "cid", 'c', 0, G_OPTION_ARG_STRING, &cid, "Container ID", NULL }, + { "cuuid", 'u', 0, G_OPTION_ARG_STRING, &cuuid, "Container UUID", NULL }, { "runtime", 'r', 0, G_OPTION_ARG_STRING, &runtime_path, "Runtime path", NULL }, { "bundle", 'b', 0, G_OPTION_ARG_STRING, &bundle_path, "Bundle path", NULL }, { "pidfile", 'p', 0, G_OPTION_ARG_STRING, &pid_file, "PID file", NULL }, @@ -436,6 +438,7 @@ int main(int argc, char *argv[]) int ret, runtime_status; char cwd[PATH_MAX]; char default_pid_file[PATH_MAX]; + char attach_sock_path[PATH_MAX]; GError *err = NULL; _cleanup_free_ char *contents; int cpid = -1; @@ -470,6 +473,9 @@ int main(int argc, char *argv[]) int wb; uint64_t oom_event; + /* Used for attach */ + _cleanup_close_ int conn_sock = -1; + /* Command line parameters */ context = g_option_context_new("- conmon utility"); g_option_context_add_main_entries(context, entries, "conmon"); @@ -481,6 +487,9 @@ int main(int argc, char *argv[]) if (cid == NULL) nexit("Container ID not provided. Use --cid"); + if (!exec && cuuid == NULL) + nexit("Container UUID not provided. Use --cuuid"); + if (runtime_path == NULL) nexit("Runtime path not provided. Use --runtime"); @@ -553,7 +562,7 @@ int main(int argc, char *argv[]) csfd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); if (csfd < 0) pexit("Failed to create console-socket"); - if (fchmod(csfd, 0700)) + if (fchmod(csfd, 0700)) pexit("Failed to change console-socket permissions"); /* XXX: This should be handled with a rename(2). */ if (unlink(csname) < 0) @@ -745,6 +754,50 @@ int main(int argc, char *argv[]) cpid = atoi(contents); ninfo("container PID: %d", cpid); + /* Setup endpoint for attach */ + char attach_symlink_dir_path[PATH_MAX]; + struct sockaddr_un attach_addr = {0}; + _cleanup_close_ int afd = -1; + + if (!exec) { + attach_addr.sun_family = AF_UNIX; + + /* + * Create a symlink so we don't exceed unix domain socket + * path length limit. + */ + snprintf(attach_symlink_dir_path, PATH_MAX, "/var/run/crio/%s", cuuid); + if (unlink(attach_symlink_dir_path) == -1 && errno != ENOENT) { + pexit("Failed to remove existing symlink for attach socket directory"); + } + if (symlink(bundle_path, attach_symlink_dir_path) == -1) + pexit("Failed to create symlink for attach socket"); + + snprintf(attach_sock_path, PATH_MAX, "/var/run/crio/%s/attach", cuuid); + ninfo("attach sock path: %s", attach_sock_path); + + strncpy(attach_addr.sun_path, attach_sock_path, sizeof(attach_addr.sun_path) - 1); + ninfo("addr{sun_family=AF_UNIX, sun_path=%s}", attach_addr.sun_path); + + /* + * We make the socket non-blocking to avoid a race where client aborts connection + * before the server gets a chance to call accept. In that scenario, the server + * accept blocks till a new client connection comes in. + */ + afd = socket(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); + if (afd == -1) + pexit("Failed to create attach socket"); + + if (fchmod(afd, 0700)) + pexit("Failed to change attach socket permissions"); + + if (bind(afd, (struct sockaddr *)&attach_addr, sizeof(struct sockaddr_un)) == -1) + pexit("Failed to bind attach socket: %s", attach_sock_path); + + if (listen(afd, 10) == -1) + pexit("Failed to listen on attach socket: %s", attach_sock_path); + } + /* Send the container pid back to parent */ if (sync_pipe_fd > 0 && !exec) { len = snprintf(buf, BUF_SIZE, "{\"pid\": %d}\n", cpid); @@ -810,6 +863,13 @@ int main(int argc, char *argv[]) pexit("Failed to add OOM eventfd to epoll"); } + /* Add the attach socket to epoll */ + if (afd > 0) { + ev.data.fd = afd; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) + pexit("Failed to add attach socket fd to epoll"); + } + /* Log all of the container's output. */ while (num_stdio_fds > 0) { int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1); @@ -831,10 +891,28 @@ int main(int argc, char *argv[]) if (open("oom", O_CREAT, 0666) < 0) { nwarn("Failed to write oom file"); } - } - else { - nwarn("unknown pipe fd"); - goto out; + } else if (evlist[i].data.fd == afd) { + conn_sock = accept(afd, NULL, NULL); + if (conn_sock == -1) { + nwarn("Failed to accept client connection on attach socket"); + continue; + } + ev.events = EPOLLIN; + ev.data.fd = conn_sock; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) { + pexit("Failed to add client socket fd to epoll"); + } + ninfo("Accepted connection"); + } else { + num_read = read(masterfd, buf, BUF_SIZE); + if (num_read <= 0) + goto out; + ninfo("got data on connection: %d", num_read); + if (terminal) { + if (write_all(masterfd_stdout, buf, num_read) < 0) { + nwarn("Failed to write to master pty"); + } + } } if (masterfd == masterfd_stdout || masterfd == masterfd_stderr) { @@ -846,11 +924,21 @@ int main(int argc, char *argv[]) nwarn("write_k8s_log failed"); goto out; } + + if (conn_sock > 0) { + if (write_all(conn_sock, buf, num_read) < 0) { + nwarn("Failed to write to socket"); + } + } } } else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) { printf("closing fd %d\n", evlist[i].data.fd); if (close(evlist[i].data.fd) < 0) pexit("close"); + if (!exec && evlist[i].data.fd == conn_sock) { + conn_sock = -1; + continue; + } num_stdio_fds--; } } @@ -906,5 +994,11 @@ out: } } + if (!exec) { + if (unlink(attach_symlink_dir_path) == -1 && errno != ENOENT) { + pexit("Failed to remove symlink for attach socket directory"); + } + } + return EXIT_SUCCESS; } diff --git a/oci/oci.go b/oci/oci.go index 7bd57959..bc64b34c 100644 --- a/oci/oci.go +++ b/oci/oci.go @@ -114,6 +114,7 @@ func (r *Runtime) CreateContainer(c *Container, cgroupParent string) error { args = append(args, "-s") } args = append(args, "-c", c.name) + args = append(args, "-u", c.id) args = append(args, "-r", r.Path(c)) args = append(args, "-b", c.bundlePath) args = append(args, "-p", filepath.Join(c.bundlePath, "pidfile")) diff --git a/server/server.go b/server/server.go index 7c62afde..b6d61dc6 100644 --- a/server/server.go +++ b/server/server.go @@ -555,6 +555,10 @@ func New(config *Config) (*Server, error) { return nil, err } + if err := os.MkdirAll("/var/run/crio", 0755); err != nil { + return nil, err + } + r, err := oci.New(config.Runtime, config.RuntimeHostPrivileged, config.Conmon, config.ConmonEnv, config.CgroupManager) if err != nil { return nil, err From 1a6825758cdef038cc17f0fbc70a247c7e4e21be Mon Sep 17 00:00:00 2001 From: Mrunal Patel Date: Tue, 30 May 2017 15:14:44 -0700 Subject: [PATCH 3/4] conmon: Add control fifo for terminal resize handling Signed-off-by: Mrunal Patel --- conmon/conmon.c | 101 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/conmon/conmon.c b/conmon/conmon.c index 3e814c81..f766f609 100644 --- a/conmon/conmon.c +++ b/conmon/conmon.c @@ -12,10 +12,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include @@ -439,6 +441,7 @@ int main(int argc, char *argv[]) char cwd[PATH_MAX]; char default_pid_file[PATH_MAX]; char attach_sock_path[PATH_MAX]; + char ctl_fifo_path[PATH_MAX]; GError *err = NULL; _cleanup_free_ char *contents; int cpid = -1; @@ -798,6 +801,40 @@ int main(int argc, char *argv[]) pexit("Failed to listen on attach socket: %s", attach_sock_path); } + /* Setup fifo for reading in terminal resize and other stdio control messages */ + _cleanup_close_ int ctlfd = -1; + _cleanup_close_ int dummyfd = -1; + int ctl_msg_type = -1; + int height = -1; + int width = -1; + struct winsize ws; + #define CTLBUFSZ 200 + char ctlbuf[CTLBUFSZ]; + char *readptr = ctlbuf; + int readsz = CTLBUFSZ - 1; + int cp_rem = 0; + if (!exec) { + snprintf(ctl_fifo_path, PATH_MAX, "%s/ctl", bundle_path); + ninfo("ctl fifo path: %s", ctl_fifo_path); + + if (mkfifo(ctl_fifo_path, 0666) == -1) + pexit("Failed to mkfifo at %s", ctl_fifo_path); + + ctlfd = open(ctl_fifo_path, O_RDONLY|O_NONBLOCK|O_CLOEXEC); + if (ctlfd == -1) + pexit("Failed to open control fifo"); + + /* + * Open a dummy writer to prevent getting flood of POLLHUPs when + * last writer closes. + */ + dummyfd = open(ctl_fifo_path, O_WRONLY|O_CLOEXEC); + if (dummyfd == -1) + pexit("Failed to open dummy writer for fifo"); + + ninfo("ctlfd: %d", ctlfd); + } + /* Send the container pid back to parent */ if (sync_pipe_fd > 0 && !exec) { len = snprintf(buf, BUF_SIZE, "{\"pid\": %d}\n", cpid); @@ -870,6 +907,13 @@ int main(int argc, char *argv[]) pexit("Failed to add attach socket fd to epoll"); } + /* Add control fifo fd to epoll */ + if (ctlfd > 0) { + ev.data.fd = ctlfd; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) + pexit("Failed to add control fifo fd to epoll"); + } + /* Log all of the container's output. */ while (num_stdio_fds > 0) { int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1); @@ -903,6 +947,59 @@ int main(int argc, char *argv[]) pexit("Failed to add client socket fd to epoll"); } ninfo("Accepted connection"); + } else if (!exec && evlist[i].data.fd == ctlfd) { + num_read = read(ctlfd, readptr, readsz); + if (num_read <= 0) { + nwarn("Failed to read from control fd"); + continue; + } + readptr[num_read] = '\0'; + ninfo("Got ctl message: %s\n", ctlbuf); + + char *beg = ctlbuf; + char *newline = strchrnul(beg, '\n'); + /* Process each message which ends with a line */ + while (*newline != '\0') { + ret = sscanf(ctlbuf, "%d %d %d\n", &ctl_msg_type, &height, &width); + if (ret != 3) { + nwarn("Failed to sscanf message"); + continue; + } + ninfo("Message type: %d, Height: %d, Width: %d", ctl_msg_type, height, width); + ret = ioctl(masterfd_stdout, TIOCGWINSZ, &ws); + ninfo("Existing size: %d %d", ws.ws_row, ws.ws_col); + ws.ws_row = height; + ws.ws_col = width; + ret = ioctl(masterfd_stdout, TIOCSWINSZ, &ws); + if (ret == -1) { + nwarn("Failed to set process pty terminal size"); + } + beg = newline + 1; + newline = strchrnul(beg, '\n'); + } + if (num_read == (CTLBUFSZ - 1) && beg == ctlbuf) { + /* + * We did not find a newline in the entire buffer. + * This shouldn't happen as our buffer is larger than + * the message that we expect to receive. + */ + nwarn("Could not find newline in entire buffer\n"); + } else if (*beg == '\0') { + /* We exhausted all messages that were complete */ + readptr = ctlbuf; + readsz = CTLBUFSZ - 1; + } else { + /* + * We copy remaining data to beginning of buffer + * and advance readptr after that. + */ + cp_rem = 0; + do { + ctlbuf[cp_rem++] = *beg++; + } while (*beg != '\0'); + readptr = ctlbuf + cp_rem; + readsz = CTLBUFSZ - 1 - cp_rem; + } } else { num_read = read(masterfd, buf, BUF_SIZE); if (num_read <= 0) @@ -932,6 +1029,10 @@ int main(int argc, char *argv[]) } } } else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) { + if (!exec && evlist[i].data.fd == ctlfd) { + ninfo("Remote writer to control fd closed"); + continue; + } printf("closing fd %d\n", evlist[i].data.fd); if (close(evlist[i].data.fd) < 0) pexit("close"); From 8f5f7aa5e2dd28c5cddbe50d96efe07048ac7d11 Mon Sep 17 00:00:00 2001 From: Mrunal Patel Date: Tue, 30 May 2017 15:15:30 -0700 Subject: [PATCH 4/4] Add code to handle CRI attach A goroutine is started to forward terminal resize requests from the resize channel. Also, data is copied back/forth between stdin, stdout, stderr streams and the attach socket for the container. Signed-off-by: Mrunal Patel --- server/container_attach.go | 102 ++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/server/container_attach.go b/server/container_attach.go index 96e2676b..a94690e0 100644 --- a/server/container_attach.go +++ b/server/container_attach.go @@ -1,11 +1,111 @@ package server import ( + "fmt" + "io" + "io/ioutil" + "net" + "os" + "path/filepath" + "syscall" + + "github.com/Sirupsen/logrus" + "github.com/kubernetes-incubator/cri-o/oci" "golang.org/x/net/context" pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/term" ) // Attach prepares a streaming endpoint to attach to a running container. func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachResponse, error) { - return nil, nil + logrus.Debugf("AttachRequest %+v", req) + + resp, err := s.GetAttach(req) + if err != nil { + return nil, fmt.Errorf("unable to prepare attach endpoint") + } + + return resp, nil +} + +// Attach endpoint for streaming.Runtime +func (ss streamService) Attach(containerID string, inputStream io.Reader, outputStream, errorStream io.WriteCloser, tty bool, resize <-chan term.Size) error { + c := ss.runtimeServer.GetContainer(containerID) + + if c == nil { + return fmt.Errorf("could not find container %q", containerID) + } + + if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil { + return err + } + + cState := ss.runtimeServer.runtime.ContainerStatus(c) + if !(cState.Status == oci.ContainerStateRunning || cState.Status == oci.ContainerStateCreated) { + return fmt.Errorf("container is not created or running") + } + + controlPath := filepath.Join(c.BundlePath(), "ctl") + controlFile, err := os.OpenFile(controlPath, syscall.O_WRONLY, 0) + if err != nil { + return fmt.Errorf("failed to open container ctl file: %v", err) + } + + kubecontainer.HandleResizing(resize, func(size term.Size) { + logrus.Infof("Got a resize event: %+v", size) + _, err := fmt.Fprintf(controlFile, "%d %d %d\n", 1, size.Height, size.Width) + if err != nil { + logrus.Infof("Failed to write to control file to resize terminal: %v", err) + } + }) + + attachSocketPath := filepath.Join("/var/run/crio", c.ID(), "attach") + conn, err := net.Dial("unix", attachSocketPath) + if err != nil { + return fmt.Errorf("failed to connect to container %s attach socket: %v", c.ID(), err) + } + defer conn.Close() + + receiveStdout := make(chan error) + if outputStream != nil || errorStream != nil { + go func() { + receiveStdout <- redirectResponseToOutputStream(tty, outputStream, errorStream, conn) + }() + } + + stdinDone := make(chan struct{}) + go func() { + if inputStream != nil { + io.Copy(conn, inputStream) + } + close(stdinDone) + }() + + select { + case err := <-receiveStdout: + return err + case <-stdinDone: + if outputStream != nil || errorStream != nil { + return <-receiveStdout + } + } + + return nil +} + +func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, conn io.Reader) error { + if outputStream == nil { + outputStream = ioutil.Discard + } + if errorStream == nil { + // errorStream = ioutil.Discard + } + var err error + if tty { + _, err = io.Copy(outputStream, conn) + } else { + // TODO + } + return err }