diff --git a/conmon/conmon.c b/conmon/conmon.c index dbcf1230..a82267a5 100644 --- a/conmon/conmon.c +++ b/conmon/conmon.c @@ -94,6 +94,7 @@ static inline void strv_cleanup(char ***strv) #define MAX_EVENTS 10 static bool terminal = false; +static bool opt_stdin = false; static char *cid = NULL; static char *cuuid = NULL; static char *runtime_path = NULL; @@ -106,6 +107,7 @@ static char *log_path = NULL; static GOptionEntry entries[] = { { "terminal", 't', 0, G_OPTION_ARG_NONE, &terminal, "Terminal", NULL }, + { "stdin", 'i', 0, G_OPTION_ARG_NONE, &opt_stdin, "Stdin", NULL }, { "cid", 'c', 0, G_OPTION_ARG_STRING, &cid, "Container ID", NULL }, { "cuuid", 'u', 0, G_OPTION_ARG_STRING, &cuuid, "Container UUID", NULL }, { "runtime", 'r', 0, G_OPTION_ARG_STRING, &runtime_path, "Runtime path", NULL }, @@ -245,7 +247,8 @@ int set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename) return err; } -/* stdpipe_t represents one of the std pipes (or NONE). */ +/* stdpipe_t represents one of the std pipes (or NONE). + * Sync with const in container_attach.go */ typedef enum { NO_PIPE, STDIN_PIPE, /* unused */ @@ -439,12 +442,15 @@ static char *escape_json_string(const char *str) static int runtime_status = -1; +static int masterfd_stdin = -1; static int masterfd_stdout = -1; static int masterfd_stderr = -1; static int num_stdio_fds = 0; /* Used for attach */ static int conn_sock = -1; +static int conn_sock_readable; +static int conn_sock_writable; static int logfd = -1; static int oom_efd = -1; @@ -455,9 +461,28 @@ static int ofd = -1; static GMainLoop *main_loop = NULL; +static void conn_sock_shutdown(int how) +{ + if (conn_sock == -1) + return; + shutdown(conn_sock, how); + if (how & SHUT_RD) + conn_sock_readable = false; + if (how & SHUT_WR) + conn_sock_writable = false; + if (!conn_sock_writable && !conn_sock_readable) { + close(conn_sock); + conn_sock = -1; + } +} + static gboolean stdio_cb(int fd, GIOCondition condition, gpointer user_data) { - char buf[BUF_SIZE]; + #define STDIO_BUF_SIZE 8192 /* Sync with redirectResponseToOutputStreams() */ + /* We use one extra byte at the start, which we don't read into, instead + we use that for marking the pipe when we write to the attached socket */ + char real_buf[STDIO_BUF_SIZE + 1]; + char *buf = real_buf + 1; stdpipe_t pipe = GPOINTER_TO_INT(user_data); ssize_t num_read = 0; @@ -474,8 +499,10 @@ static gboolean stdio_cb(int fd, GIOCondition condition, gpointer user_data) return G_SOURCE_CONTINUE; } - if (conn_sock > 0 && write_all(conn_sock, buf, num_read) < 0) { + real_buf[0] = pipe; + if (conn_sock_writable && write_all(conn_sock, real_buf, num_read+1) < 0) { nwarn("Failed to write to socket"); + conn_sock_shutdown(SHUT_WR); } return G_SOURCE_CONTINUE; @@ -528,44 +555,49 @@ static gboolean oom_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer us static gboolean conn_sock_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data) { - char buf[BUF_SIZE]; + #define CONN_SOCK_BUF_SIZE 32*1024 /* Match the write size in CopyDetachable */ + char buf[CONN_SOCK_BUF_SIZE]; ssize_t num_read = 0; if ((condition & G_IO_IN) != 0) { - num_read = read(fd, buf, BUF_SIZE); + num_read = read(fd, buf, CONN_SOCK_BUF_SIZE); if (num_read < 0) return G_SOURCE_CONTINUE; - if (num_read > 0) { - ninfo("got data on connection: %zd", num_read); - if (terminal && write_all(masterfd_stdout, buf, num_read) < 0) { - nwarn("Failed to write to master pty"); + if (num_read > 0 && masterfd_stdin >= 0) { + if (write_all(masterfd_stdin, buf, num_read) < 0) { + nwarn("Failed to write to container stdin"); } return G_SOURCE_CONTINUE; } } /* End of input */ - close (fd); - conn_sock = -1; + conn_sock_shutdown(SHUT_RD); + if (masterfd_stdin >= 0 && opt_stdin) { + close(masterfd_stdin); + masterfd_stdin = -1; + } return G_SOURCE_REMOVE; } -static gboolean attach_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data) +static gboolean attach_cb(int fd, G_GNUC_UNUSED GIOCondition condition, G_GNUC_UNUSED gpointer user_data) { conn_sock = accept(fd, NULL, NULL); if (conn_sock == -1) { if (errno != EWOULDBLOCK) nwarn("Failed to accept client connection on attach socket"); } else { - g_unix_fd_add (conn_sock, G_IO_IN, conn_sock_cb, GINT_TO_POINTER(STDOUT_PIPE)); + conn_sock_readable = true; + conn_sock_writable = true; + g_unix_fd_add (conn_sock, G_IO_IN|G_IO_HUP|G_IO_ERR, conn_sock_cb, GINT_TO_POINTER(STDOUT_PIPE)); ninfo("Accepted connection %d", conn_sock); } return G_SOURCE_CONTINUE; } -static gboolean ctrl_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data) +static gboolean ctrl_cb(int fd, G_GNUC_UNUSED GIOCondition condition, G_GNUC_UNUSED gpointer user_data) { #define CTLBUFSZ 200 static char ctlbuf[CTLBUFSZ]; @@ -659,6 +691,7 @@ static gboolean terminal_accept_cb(int fd, G_GNUC_UNUSED GIOCondition condition, /* We only have a single fd for both pipes, so we just treat it as * stdout. stderr is ignored. */ + masterfd_stdin = console.fd; masterfd_stdout = console.fd; masterfd_stderr = -1; @@ -669,7 +702,7 @@ static gboolean terminal_accept_cb(int fd, G_GNUC_UNUSED GIOCondition condition, } static void -runtime_exit_cb (GPid pid, int status, G_GNUC_UNUSED gpointer user_data) +runtime_exit_cb (G_GNUC_UNUSED GPid pid, int status, G_GNUC_UNUSED gpointer user_data) { runtime_status = status; g_main_loop_quit (main_loop); @@ -690,6 +723,7 @@ int main(int argc, char *argv[]) _cleanup_close_ int epfd = -1; _cleanup_close_ int csfd = -1; /* Used for !terminal cases. */ + int slavefd_stdin = -1; int slavefd_stdout = -1; int slavefd_stderr = -1; char csname[PATH_MAX] = "/tmp/conmon-term.XXXXXXXX"; @@ -814,6 +848,15 @@ int main(int argc, char *argv[]) * used anything else (and it wouldn't be a good idea to create a new * pty pair in the host). */ + + if (opt_stdin) { + if (pipe2(fds, O_CLOEXEC) < 0) + pexit("Failed to create !terminal stdin pipe"); + + masterfd_stdin = fds[1]; + slavefd_stdin = fds[0]; + } + if (pipe2(fds, O_CLOEXEC) < 0) pexit("Failed to create !terminal stdout pipe"); @@ -875,8 +918,17 @@ int main(int argc, char *argv[]) if (create_pid < 0) { pexit("Failed to fork the create command"); } else if (!create_pid) { - /* We only need to touch the stdio if we have terminal=false. */ + _cleanup_close_ int dev_null = -1; /* FIXME: This results in us not outputting runc error messages to crio's log. */ + if (slavefd_stdin < 0) { + dev_null = open("/dev/null", O_RDONLY); + if (dev_null < 0) + pexit("Failed to open /dev/null"); + slavefd_stdin = dev_null; + } + if (dup2(slavefd_stdin, STDIN_FILENO) < 0) + pexit("Failed to dup over stdout"); + if (slavefd_stdout >= 0) { if (dup2(slavefd_stdout, STDOUT_FILENO) < 0) pexit("Failed to dup over stdout"); @@ -893,6 +945,7 @@ int main(int argc, char *argv[]) g_ptr_array_free (runtime_argv, TRUE); /* The runtime has that fd now. We don't need to touch it anymore. */ + close(slavefd_stdin); close(slavefd_stdout); close(slavefd_stderr); @@ -989,7 +1042,7 @@ int main(int argc, char *argv[]) * before the server gets a chance to call accept. In that scenario, the server * accept blocks till a new client connection comes in. */ - afd = socket(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); + afd = socket(AF_UNIX, SOCK_SEQPACKET|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); if (afd == -1) pexit("Failed to create attach socket"); diff --git a/oci/container.go b/oci/container.go index 42885db4..6b99e487 100644 --- a/oci/container.go +++ b/oci/container.go @@ -31,6 +31,8 @@ type Container struct { sandbox string netns ns.NetNS terminal bool + stdin bool + stdinOnce bool privileged bool state *ContainerState metadata *pb.ContainerMetadata @@ -54,7 +56,7 @@ type ContainerState struct { } // NewContainer creates a container object. -func NewContainer(id string, name string, bundlePath string, logPath string, netns ns.NetNS, labels map[string]string, annotations map[string]string, image *pb.ImageSpec, metadata *pb.ContainerMetadata, sandbox string, terminal bool, privileged bool, dir string, created time.Time, stopSignal string) (*Container, error) { +func NewContainer(id string, name string, bundlePath string, logPath string, netns ns.NetNS, labels map[string]string, annotations map[string]string, image *pb.ImageSpec, metadata *pb.ContainerMetadata, sandbox string, terminal bool, stdin bool, stdinOnce bool, privileged bool, dir string, created time.Time, stopSignal string) (*Container, error) { state := &ContainerState{} state.Created = created c := &Container{ @@ -66,6 +68,8 @@ func NewContainer(id string, name string, bundlePath string, logPath string, net sandbox: sandbox, netns: netns, terminal: terminal, + stdin: stdin, + stdinOnce: stdinOnce, privileged: privileged, metadata: metadata, annotations: annotations, diff --git a/oci/oci.go b/oci/oci.go index 5eb8339e..8b33141c 100644 --- a/oci/oci.go +++ b/oci/oci.go @@ -121,6 +121,8 @@ func (r *Runtime) CreateContainer(c *Container, cgroupParent string) error { args = append(args, "-l", c.logPath) if c.terminal { args = append(args, "-t") + } else if c.stdin { + args = append(args, "-i") } logrus.WithFields(logrus.Fields{ "args": args, diff --git a/pkg/annotations/annotations.go b/pkg/annotations/annotations.go index 5f85a204..40916a25 100644 --- a/pkg/annotations/annotations.go +++ b/pkg/annotations/annotations.go @@ -54,6 +54,12 @@ const ( // TTY is the terminal path annotation TTY = "io.kubernetes.cri-o.TTY" + + // Stdin is the stdin annotation + Stdin = "io.kubernetes.cri-o.Stdin" + + // StdinOnce is the stdin_once annotation + StdinOnce = "io.kubernetes.cri-o.StdinOnce" ) // ContainerType values diff --git a/server/container_attach.go b/server/container_attach.go index 81c6cc3c..e7823e1e 100644 --- a/server/container_attach.go +++ b/server/container_attach.go @@ -3,14 +3,12 @@ package server import ( "fmt" "io" - "io/ioutil" "net" "os" "path/filepath" "syscall" "github.com/Sirupsen/logrus" - "github.com/docker/docker/pkg/pools" "github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/utils" "golang.org/x/net/context" @@ -19,6 +17,13 @@ import ( "k8s.io/kubernetes/pkg/util/term" ) +/* Sync with stdpipe_t in conmon.c */ +const ( + AttachPipeStdin = 1 + AttachPipeStdout = 2 + AttachPipeStderr = 3 +) + // Attach prepares a streaming endpoint to attach to a running container. func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachResponse, error) { logrus.Debugf("AttachRequest %+v", req) @@ -63,7 +68,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output }) attachSocketPath := filepath.Join("/var/run/crio", c.ID(), "attach") - conn, err := net.Dial("unix", attachSocketPath) + conn, err := net.DialUnix("unixpacket", nil, &net.UnixAddr{Name: attachSocketPath, Net: "unixpacket"}) if err != nil { return fmt.Errorf("failed to connect to container %s attach socket: %v", c.ID(), err) } @@ -72,7 +77,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output receiveStdout := make(chan error) if outputStream != nil || errorStream != nil { go func() { - receiveStdout <- redirectResponseToOutputStream(tty, outputStream, errorStream, conn) + receiveStdout <- redirectResponseToOutputStreams(outputStream, errorStream, conn) }() } @@ -81,6 +86,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output var err error if inputStream != nil { _, err = utils.CopyDetachable(conn, inputStream, nil) + conn.CloseWrite() } stdinDone <- err }() @@ -100,18 +106,42 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output return nil } -func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, conn io.Reader) error { - if outputStream == nil { - outputStream = ioutil.Discard - } - if errorStream == nil { - // errorStream = ioutil.Discard - } +func redirectResponseToOutputStreams(outputStream, errorStream io.Writer, conn io.Reader) error { var err error - if tty { - _, err = pools.Copy(outputStream, conn) - } else { - // TODO + buf := make([]byte, 8192+1) /* Sync with conmon STDIO_BUF_SIZE */ + + for { + nr, er := conn.Read(buf) + if nr > 0 { + var dst io.Writer + if buf[0] == AttachPipeStdout { + dst = outputStream + } else if buf[0] == AttachPipeStderr { + dst = errorStream + } else { + logrus.Infof("Got unexpected attach type %+d", buf[0]) + } + + if dst != nil { + nw, ew := dst.Write(buf[1:nr]) + if ew != nil { + err = ew + break + } + if nr != nw+1 { + err = io.ErrShortWrite + break + } + } + } + if er == io.EOF { + break + } + if er != nil { + err = er + break + } } + return err } diff --git a/server/container_create.go b/server/container_create.go index f2946d92..57624d08 100644 --- a/server/container_create.go +++ b/server/container_create.go @@ -537,6 +537,8 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string, specgen.AddAnnotation(annotations.ContainerType, annotations.ContainerTypeContainer) specgen.AddAnnotation(annotations.LogPath, logPath) specgen.AddAnnotation(annotations.TTY, fmt.Sprintf("%v", containerConfig.Tty)) + specgen.AddAnnotation(annotations.Stdin, fmt.Sprintf("%v", containerConfig.Stdin)) + specgen.AddAnnotation(annotations.StdinOnce, fmt.Sprintf("%v", containerConfig.StdinOnce)) specgen.AddAnnotation(annotations.Image, image) created := time.Now() @@ -671,7 +673,7 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string, return nil, err } - container, err := oci.NewContainer(containerID, containerName, containerInfo.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, imageSpec, metadata, sb.id, containerConfig.Tty, sb.privileged, containerInfo.Dir, created, containerImageConfig.Config.StopSignal) + container, err := oci.NewContainer(containerID, containerName, containerInfo.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, imageSpec, metadata, sb.id, containerConfig.Tty, containerConfig.Stdin, containerConfig.StdinOnce, sb.privileged, containerInfo.Dir, created, containerImageConfig.Config.StopSignal) if err != nil { return nil, err } diff --git a/server/sandbox_run.go b/server/sandbox_run.go index faa93e1e..1353a616 100644 --- a/server/sandbox_run.go +++ b/server/sandbox_run.go @@ -438,7 +438,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest return nil, fmt.Errorf("failed to write runtime configuration for pod sandbox %s(%s): %v", sb.name, id, err) } - container, err := oci.NewContainer(id, containerName, podContainer.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, sb.privileged, podContainer.Dir, created, podContainer.Config.Config.StopSignal) + container, err := oci.NewContainer(id, containerName, podContainer.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, false, false, sb.privileged, podContainer.Dir, created, podContainer.Config.Config.StopSignal) if err != nil { return nil, err } diff --git a/server/server.go b/server/server.go index 82e96f4e..93a81e26 100644 --- a/server/server.go +++ b/server/server.go @@ -34,6 +34,10 @@ const ( shutdownFile = "/var/lib/crio/crio.shutdown" ) +func isTrue(annotaton string) bool { + return annotaton == "true" +} + // streamService implements streaming.Runtime. type streamService struct { runtimeServer *Server // needed by Exec() endpoint @@ -116,10 +120,10 @@ func (s *Server) loadContainer(id string) error { return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID]) } - var tty bool - if v := m.Annotations[annotations.TTY]; v == "true" { - tty = true - } + tty := isTrue(m.Annotations[annotations.TTY]) + stdin := isTrue(m.Annotations[annotations.Stdin]) + stdinOnce := isTrue(m.Annotations[annotations.StdinOnce]) + containerPath, err := s.store.ContainerRunDirectory(id) if err != nil { return err @@ -148,7 +152,7 @@ func (s *Server) loadContainer(id string) error { return err } - ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, img, &metadata, sb.id, tty, sb.privileged, containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) + ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, img, &metadata, sb.id, tty, stdin, stdinOnce, sb.privileged, containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) if err != nil { return err } @@ -238,7 +242,7 @@ func (s *Server) loadSandbox(id string) error { return err } - privileged := m.Annotations[annotations.PrivilegedRuntime] == "true" + privileged := isTrue(m.Annotations[annotations.PrivilegedRuntime]) sb := &sandbox{ id: id, @@ -304,7 +308,7 @@ func (s *Server) loadSandbox(id string) error { return err } - scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, privileged, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) + scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, false, false, privileged, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) if err != nil { return err }