Merge pull request #579 from alexlarsson/non-terminal-attach

Implement non-terminal attach
This commit is contained in:
Mrunal Patel 2017-06-14 21:45:44 -07:00 committed by GitHub
commit 7b9032bac7
8 changed files with 143 additions and 42 deletions

View file

@ -94,6 +94,7 @@ static inline void strv_cleanup(char ***strv)
#define MAX_EVENTS 10
static bool terminal = false;
static bool opt_stdin = false;
static char *cid = NULL;
static char *cuuid = NULL;
static char *runtime_path = NULL;
@ -106,6 +107,7 @@ static char *log_path = NULL;
static GOptionEntry entries[] =
{
{ "terminal", 't', 0, G_OPTION_ARG_NONE, &terminal, "Terminal", NULL },
{ "stdin", 'i', 0, G_OPTION_ARG_NONE, &opt_stdin, "Stdin", NULL },
{ "cid", 'c', 0, G_OPTION_ARG_STRING, &cid, "Container ID", NULL },
{ "cuuid", 'u', 0, G_OPTION_ARG_STRING, &cuuid, "Container UUID", NULL },
{ "runtime", 'r', 0, G_OPTION_ARG_STRING, &runtime_path, "Runtime path", NULL },
@ -245,7 +247,8 @@ int set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename)
return err;
}
/* stdpipe_t represents one of the std pipes (or NONE). */
/* stdpipe_t represents one of the std pipes (or NONE).
* Sync with const in container_attach.go */
typedef enum {
NO_PIPE,
STDIN_PIPE, /* unused */
@ -439,12 +442,15 @@ static char *escape_json_string(const char *str)
static int runtime_status = -1;
static int masterfd_stdin = -1;
static int masterfd_stdout = -1;
static int masterfd_stderr = -1;
static int num_stdio_fds = 0;
/* Used for attach */
static int conn_sock = -1;
static int conn_sock_readable;
static int conn_sock_writable;
static int logfd = -1;
static int oom_efd = -1;
@ -455,9 +461,28 @@ static int ofd = -1;
static GMainLoop *main_loop = NULL;
static void conn_sock_shutdown(int how)
{
if (conn_sock == -1)
return;
shutdown(conn_sock, how);
if (how & SHUT_RD)
conn_sock_readable = false;
if (how & SHUT_WR)
conn_sock_writable = false;
if (!conn_sock_writable && !conn_sock_readable) {
close(conn_sock);
conn_sock = -1;
}
}
static gboolean stdio_cb(int fd, GIOCondition condition, gpointer user_data)
{
char buf[BUF_SIZE];
#define STDIO_BUF_SIZE 8192 /* Sync with redirectResponseToOutputStreams() */
/* We use one extra byte at the start, which we don't read into, instead
we use that for marking the pipe when we write to the attached socket */
char real_buf[STDIO_BUF_SIZE + 1];
char *buf = real_buf + 1;
stdpipe_t pipe = GPOINTER_TO_INT(user_data);
ssize_t num_read = 0;
@ -474,8 +499,10 @@ static gboolean stdio_cb(int fd, GIOCondition condition, gpointer user_data)
return G_SOURCE_CONTINUE;
}
if (conn_sock > 0 && write_all(conn_sock, buf, num_read) < 0) {
real_buf[0] = pipe;
if (conn_sock_writable && write_all(conn_sock, real_buf, num_read+1) < 0) {
nwarn("Failed to write to socket");
conn_sock_shutdown(SHUT_WR);
}
return G_SOURCE_CONTINUE;
@ -528,44 +555,49 @@ static gboolean oom_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer us
static gboolean conn_sock_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
char buf[BUF_SIZE];
#define CONN_SOCK_BUF_SIZE 32*1024 /* Match the write size in CopyDetachable */
char buf[CONN_SOCK_BUF_SIZE];
ssize_t num_read = 0;
if ((condition & G_IO_IN) != 0) {
num_read = read(fd, buf, BUF_SIZE);
num_read = read(fd, buf, CONN_SOCK_BUF_SIZE);
if (num_read < 0)
return G_SOURCE_CONTINUE;
if (num_read > 0) {
ninfo("got data on connection: %zd", num_read);
if (terminal && write_all(masterfd_stdout, buf, num_read) < 0) {
nwarn("Failed to write to master pty");
if (num_read > 0 && masterfd_stdin >= 0) {
if (write_all(masterfd_stdin, buf, num_read) < 0) {
nwarn("Failed to write to container stdin");
}
return G_SOURCE_CONTINUE;
}
}
/* End of input */
close (fd);
conn_sock = -1;
conn_sock_shutdown(SHUT_RD);
if (masterfd_stdin >= 0 && opt_stdin) {
close(masterfd_stdin);
masterfd_stdin = -1;
}
return G_SOURCE_REMOVE;
}
static gboolean attach_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
static gboolean attach_cb(int fd, G_GNUC_UNUSED GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
conn_sock = accept(fd, NULL, NULL);
if (conn_sock == -1) {
if (errno != EWOULDBLOCK)
nwarn("Failed to accept client connection on attach socket");
} else {
g_unix_fd_add (conn_sock, G_IO_IN, conn_sock_cb, GINT_TO_POINTER(STDOUT_PIPE));
conn_sock_readable = true;
conn_sock_writable = true;
g_unix_fd_add (conn_sock, G_IO_IN|G_IO_HUP|G_IO_ERR, conn_sock_cb, GINT_TO_POINTER(STDOUT_PIPE));
ninfo("Accepted connection %d", conn_sock);
}
return G_SOURCE_CONTINUE;
}
static gboolean ctrl_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
static gboolean ctrl_cb(int fd, G_GNUC_UNUSED GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
#define CTLBUFSZ 200
static char ctlbuf[CTLBUFSZ];
@ -659,6 +691,7 @@ static gboolean terminal_accept_cb(int fd, G_GNUC_UNUSED GIOCondition condition,
/* We only have a single fd for both pipes, so we just treat it as
* stdout. stderr is ignored. */
masterfd_stdin = console.fd;
masterfd_stdout = console.fd;
masterfd_stderr = -1;
@ -669,7 +702,7 @@ static gboolean terminal_accept_cb(int fd, G_GNUC_UNUSED GIOCondition condition,
}
static void
runtime_exit_cb (GPid pid, int status, G_GNUC_UNUSED gpointer user_data)
runtime_exit_cb (G_GNUC_UNUSED GPid pid, int status, G_GNUC_UNUSED gpointer user_data)
{
runtime_status = status;
g_main_loop_quit (main_loop);
@ -690,6 +723,7 @@ int main(int argc, char *argv[])
_cleanup_close_ int epfd = -1;
_cleanup_close_ int csfd = -1;
/* Used for !terminal cases. */
int slavefd_stdin = -1;
int slavefd_stdout = -1;
int slavefd_stderr = -1;
char csname[PATH_MAX] = "/tmp/conmon-term.XXXXXXXX";
@ -814,6 +848,15 @@ int main(int argc, char *argv[])
* used anything else (and it wouldn't be a good idea to create a new
* pty pair in the host).
*/
if (opt_stdin) {
if (pipe2(fds, O_CLOEXEC) < 0)
pexit("Failed to create !terminal stdin pipe");
masterfd_stdin = fds[1];
slavefd_stdin = fds[0];
}
if (pipe2(fds, O_CLOEXEC) < 0)
pexit("Failed to create !terminal stdout pipe");
@ -875,8 +918,17 @@ int main(int argc, char *argv[])
if (create_pid < 0) {
pexit("Failed to fork the create command");
} else if (!create_pid) {
/* We only need to touch the stdio if we have terminal=false. */
_cleanup_close_ int dev_null = -1;
/* FIXME: This results in us not outputting runc error messages to crio's log. */
if (slavefd_stdin < 0) {
dev_null = open("/dev/null", O_RDONLY);
if (dev_null < 0)
pexit("Failed to open /dev/null");
slavefd_stdin = dev_null;
}
if (dup2(slavefd_stdin, STDIN_FILENO) < 0)
pexit("Failed to dup over stdout");
if (slavefd_stdout >= 0) {
if (dup2(slavefd_stdout, STDOUT_FILENO) < 0)
pexit("Failed to dup over stdout");
@ -893,6 +945,7 @@ int main(int argc, char *argv[])
g_ptr_array_free (runtime_argv, TRUE);
/* The runtime has that fd now. We don't need to touch it anymore. */
close(slavefd_stdin);
close(slavefd_stdout);
close(slavefd_stderr);
@ -989,7 +1042,7 @@ int main(int argc, char *argv[])
* before the server gets a chance to call accept. In that scenario, the server
* accept blocks till a new client connection comes in.
*/
afd = socket(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
afd = socket(AF_UNIX, SOCK_SEQPACKET|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
if (afd == -1)
pexit("Failed to create attach socket");

View file

@ -31,6 +31,8 @@ type Container struct {
sandbox string
netns ns.NetNS
terminal bool
stdin bool
stdinOnce bool
privileged bool
state *ContainerState
metadata *pb.ContainerMetadata
@ -54,7 +56,7 @@ type ContainerState struct {
}
// NewContainer creates a container object.
func NewContainer(id string, name string, bundlePath string, logPath string, netns ns.NetNS, labels map[string]string, annotations map[string]string, image *pb.ImageSpec, metadata *pb.ContainerMetadata, sandbox string, terminal bool, privileged bool, dir string, created time.Time, stopSignal string) (*Container, error) {
func NewContainer(id string, name string, bundlePath string, logPath string, netns ns.NetNS, labels map[string]string, annotations map[string]string, image *pb.ImageSpec, metadata *pb.ContainerMetadata, sandbox string, terminal bool, stdin bool, stdinOnce bool, privileged bool, dir string, created time.Time, stopSignal string) (*Container, error) {
state := &ContainerState{}
state.Created = created
c := &Container{
@ -66,6 +68,8 @@ func NewContainer(id string, name string, bundlePath string, logPath string, net
sandbox: sandbox,
netns: netns,
terminal: terminal,
stdin: stdin,
stdinOnce: stdinOnce,
privileged: privileged,
metadata: metadata,
annotations: annotations,

View file

@ -121,6 +121,8 @@ func (r *Runtime) CreateContainer(c *Container, cgroupParent string) error {
args = append(args, "-l", c.logPath)
if c.terminal {
args = append(args, "-t")
} else if c.stdin {
args = append(args, "-i")
}
logrus.WithFields(logrus.Fields{
"args": args,

View file

@ -54,6 +54,12 @@ const (
// TTY is the terminal path annotation
TTY = "io.kubernetes.cri-o.TTY"
// Stdin is the stdin annotation
Stdin = "io.kubernetes.cri-o.Stdin"
// StdinOnce is the stdin_once annotation
StdinOnce = "io.kubernetes.cri-o.StdinOnce"
)
// ContainerType values

View file

@ -3,14 +3,12 @@ package server
import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"path/filepath"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/pools"
"github.com/kubernetes-incubator/cri-o/oci"
"github.com/kubernetes-incubator/cri-o/utils"
"golang.org/x/net/context"
@ -19,6 +17,13 @@ import (
"k8s.io/kubernetes/pkg/util/term"
)
/* Sync with stdpipe_t in conmon.c */
const (
AttachPipeStdin = 1
AttachPipeStdout = 2
AttachPipeStderr = 3
)
// Attach prepares a streaming endpoint to attach to a running container.
func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachResponse, error) {
logrus.Debugf("AttachRequest %+v", req)
@ -63,7 +68,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output
})
attachSocketPath := filepath.Join("/var/run/crio", c.ID(), "attach")
conn, err := net.Dial("unix", attachSocketPath)
conn, err := net.DialUnix("unixpacket", nil, &net.UnixAddr{Name: attachSocketPath, Net: "unixpacket"})
if err != nil {
return fmt.Errorf("failed to connect to container %s attach socket: %v", c.ID(), err)
}
@ -72,7 +77,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output
receiveStdout := make(chan error)
if outputStream != nil || errorStream != nil {
go func() {
receiveStdout <- redirectResponseToOutputStream(tty, outputStream, errorStream, conn)
receiveStdout <- redirectResponseToOutputStreams(outputStream, errorStream, conn)
}()
}
@ -81,6 +86,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output
var err error
if inputStream != nil {
_, err = utils.CopyDetachable(conn, inputStream, nil)
conn.CloseWrite()
}
stdinDone <- err
}()
@ -100,18 +106,42 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output
return nil
}
func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, conn io.Reader) error {
if outputStream == nil {
outputStream = ioutil.Discard
}
if errorStream == nil {
// errorStream = ioutil.Discard
}
func redirectResponseToOutputStreams(outputStream, errorStream io.Writer, conn io.Reader) error {
var err error
if tty {
_, err = pools.Copy(outputStream, conn)
} else {
// TODO
buf := make([]byte, 8192+1) /* Sync with conmon STDIO_BUF_SIZE */
for {
nr, er := conn.Read(buf)
if nr > 0 {
var dst io.Writer
if buf[0] == AttachPipeStdout {
dst = outputStream
} else if buf[0] == AttachPipeStderr {
dst = errorStream
} else {
logrus.Infof("Got unexpected attach type %+d", buf[0])
}
if dst != nil {
nw, ew := dst.Write(buf[1:nr])
if ew != nil {
err = ew
break
}
if nr != nw+1 {
err = io.ErrShortWrite
break
}
}
}
if er == io.EOF {
break
}
if er != nil {
err = er
break
}
}
return err
}

View file

@ -537,6 +537,8 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string,
specgen.AddAnnotation(annotations.ContainerType, annotations.ContainerTypeContainer)
specgen.AddAnnotation(annotations.LogPath, logPath)
specgen.AddAnnotation(annotations.TTY, fmt.Sprintf("%v", containerConfig.Tty))
specgen.AddAnnotation(annotations.Stdin, fmt.Sprintf("%v", containerConfig.Stdin))
specgen.AddAnnotation(annotations.StdinOnce, fmt.Sprintf("%v", containerConfig.StdinOnce))
specgen.AddAnnotation(annotations.Image, image)
created := time.Now()
@ -671,7 +673,7 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string,
return nil, err
}
container, err := oci.NewContainer(containerID, containerName, containerInfo.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, imageSpec, metadata, sb.id, containerConfig.Tty, sb.privileged, containerInfo.Dir, created, containerImageConfig.Config.StopSignal)
container, err := oci.NewContainer(containerID, containerName, containerInfo.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, imageSpec, metadata, sb.id, containerConfig.Tty, containerConfig.Stdin, containerConfig.StdinOnce, sb.privileged, containerInfo.Dir, created, containerImageConfig.Config.StopSignal)
if err != nil {
return nil, err
}

View file

@ -438,7 +438,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
return nil, fmt.Errorf("failed to write runtime configuration for pod sandbox %s(%s): %v", sb.name, id, err)
}
container, err := oci.NewContainer(id, containerName, podContainer.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, sb.privileged, podContainer.Dir, created, podContainer.Config.Config.StopSignal)
container, err := oci.NewContainer(id, containerName, podContainer.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, false, false, sb.privileged, podContainer.Dir, created, podContainer.Config.Config.StopSignal)
if err != nil {
return nil, err
}

View file

@ -34,6 +34,10 @@ const (
shutdownFile = "/var/lib/crio/crio.shutdown"
)
func isTrue(annotaton string) bool {
return annotaton == "true"
}
// streamService implements streaming.Runtime.
type streamService struct {
runtimeServer *Server // needed by Exec() endpoint
@ -116,10 +120,10 @@ func (s *Server) loadContainer(id string) error {
return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID])
}
var tty bool
if v := m.Annotations[annotations.TTY]; v == "true" {
tty = true
}
tty := isTrue(m.Annotations[annotations.TTY])
stdin := isTrue(m.Annotations[annotations.Stdin])
stdinOnce := isTrue(m.Annotations[annotations.StdinOnce])
containerPath, err := s.store.ContainerRunDirectory(id)
if err != nil {
return err
@ -148,7 +152,7 @@ func (s *Server) loadContainer(id string) error {
return err
}
ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, img, &metadata, sb.id, tty, sb.privileged, containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, img, &metadata, sb.id, tty, stdin, stdinOnce, sb.privileged, containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
if err != nil {
return err
}
@ -238,7 +242,7 @@ func (s *Server) loadSandbox(id string) error {
return err
}
privileged := m.Annotations[annotations.PrivilegedRuntime] == "true"
privileged := isTrue(m.Annotations[annotations.PrivilegedRuntime])
sb := &sandbox{
id: id,
@ -304,7 +308,7 @@ func (s *Server) loadSandbox(id string) error {
return err
}
scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, privileged, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, false, false, privileged, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
if err != nil {
return err
}