Implement non-terminal attach

We use a SOCK_SEQPACKET socket for the attach unix domain socket, which
means the kernel will ensure that the reading side only ever get the
data from one write operation. We use this for frameing, where the
first byte is the pipe that the next bytes are for. We have to make sure
that all reads from the socket are using at least the same size of buffer
as the write side, because otherwise the extra data in the message
will be dropped.

This also adds a stdin pipe for the container, similar to the ones we
use for stdout/err, because we need a way for an attached client
to write to stdin, even if not using a tty.

This fixes https://github.com/kubernetes-incubator/cri-o/issues/569

Signed-off-by: Alexander Larsson <alexl@redhat.com>
This commit is contained in:
Alexander Larsson 2017-06-08 22:08:29 +02:00
parent e3170caa2e
commit 7bb957bf75
8 changed files with 143 additions and 42 deletions

View file

@ -94,6 +94,7 @@ static inline void strv_cleanup(char ***strv)
#define MAX_EVENTS 10
static bool terminal = false;
static bool opt_stdin = false;
static char *cid = NULL;
static char *cuuid = NULL;
static char *runtime_path = NULL;
@ -106,6 +107,7 @@ static char *log_path = NULL;
static GOptionEntry entries[] =
{
{ "terminal", 't', 0, G_OPTION_ARG_NONE, &terminal, "Terminal", NULL },
{ "stdin", 'i', 0, G_OPTION_ARG_NONE, &opt_stdin, "Stdin", NULL },
{ "cid", 'c', 0, G_OPTION_ARG_STRING, &cid, "Container ID", NULL },
{ "cuuid", 'u', 0, G_OPTION_ARG_STRING, &cuuid, "Container UUID", NULL },
{ "runtime", 'r', 0, G_OPTION_ARG_STRING, &runtime_path, "Runtime path", NULL },
@ -245,7 +247,8 @@ int set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename)
return err;
}
/* stdpipe_t represents one of the std pipes (or NONE). */
/* stdpipe_t represents one of the std pipes (or NONE).
* Sync with const in container_attach.go */
typedef enum {
NO_PIPE,
STDIN_PIPE, /* unused */
@ -439,12 +442,15 @@ static char *escape_json_string(const char *str)
static int runtime_status = -1;
static int masterfd_stdin = -1;
static int masterfd_stdout = -1;
static int masterfd_stderr = -1;
static int num_stdio_fds = 0;
/* Used for attach */
static int conn_sock = -1;
static int conn_sock_readable;
static int conn_sock_writable;
static int logfd = -1;
static int oom_efd = -1;
@ -455,9 +461,28 @@ static int ofd = -1;
static GMainLoop *main_loop = NULL;
static void conn_sock_shutdown(int how)
{
if (conn_sock == -1)
return;
shutdown(conn_sock, how);
if (how & SHUT_RD)
conn_sock_readable = false;
if (how & SHUT_WR)
conn_sock_writable = false;
if (!conn_sock_writable && !conn_sock_readable) {
close(conn_sock);
conn_sock = -1;
}
}
static gboolean stdio_cb(int fd, GIOCondition condition, gpointer user_data)
{
char buf[BUF_SIZE];
#define STDIO_BUF_SIZE 8192 /* Sync with redirectResponseToOutputStreams() */
/* We use one extra byte at the start, which we don't read into, instead
we use that for marking the pipe when we write to the attached socket */
char real_buf[STDIO_BUF_SIZE + 1];
char *buf = real_buf + 1;
stdpipe_t pipe = GPOINTER_TO_INT(user_data);
ssize_t num_read = 0;
@ -474,8 +499,10 @@ static gboolean stdio_cb(int fd, GIOCondition condition, gpointer user_data)
return G_SOURCE_CONTINUE;
}
if (conn_sock > 0 && write_all(conn_sock, buf, num_read) < 0) {
real_buf[0] = pipe;
if (conn_sock_writable && write_all(conn_sock, real_buf, num_read+1) < 0) {
nwarn("Failed to write to socket");
conn_sock_shutdown(SHUT_WR);
}
return G_SOURCE_CONTINUE;
@ -528,44 +555,49 @@ static gboolean oom_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer us
static gboolean conn_sock_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
char buf[BUF_SIZE];
#define CONN_SOCK_BUF_SIZE 32*1024 /* Match the write size in CopyDetachable */
char buf[CONN_SOCK_BUF_SIZE];
ssize_t num_read = 0;
if ((condition & G_IO_IN) != 0) {
num_read = read(fd, buf, BUF_SIZE);
num_read = read(fd, buf, CONN_SOCK_BUF_SIZE);
if (num_read < 0)
return G_SOURCE_CONTINUE;
if (num_read > 0) {
ninfo("got data on connection: %zd", num_read);
if (terminal && write_all(masterfd_stdout, buf, num_read) < 0) {
nwarn("Failed to write to master pty");
if (num_read > 0 && masterfd_stdin >= 0) {
if (write_all(masterfd_stdin, buf, num_read) < 0) {
nwarn("Failed to write to container stdin");
}
return G_SOURCE_CONTINUE;
}
}
/* End of input */
close (fd);
conn_sock = -1;
conn_sock_shutdown(SHUT_RD);
if (masterfd_stdin >= 0 && opt_stdin) {
close(masterfd_stdin);
masterfd_stdin = -1;
}
return G_SOURCE_REMOVE;
}
static gboolean attach_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
static gboolean attach_cb(int fd, G_GNUC_UNUSED GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
conn_sock = accept(fd, NULL, NULL);
if (conn_sock == -1) {
if (errno != EWOULDBLOCK)
nwarn("Failed to accept client connection on attach socket");
} else {
g_unix_fd_add (conn_sock, G_IO_IN, conn_sock_cb, GINT_TO_POINTER(STDOUT_PIPE));
conn_sock_readable = true;
conn_sock_writable = true;
g_unix_fd_add (conn_sock, G_IO_IN|G_IO_HUP|G_IO_ERR, conn_sock_cb, GINT_TO_POINTER(STDOUT_PIPE));
ninfo("Accepted connection %d", conn_sock);
}
return G_SOURCE_CONTINUE;
}
static gboolean ctrl_cb(int fd, GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
static gboolean ctrl_cb(int fd, G_GNUC_UNUSED GIOCondition condition, G_GNUC_UNUSED gpointer user_data)
{
#define CTLBUFSZ 200
static char ctlbuf[CTLBUFSZ];
@ -659,6 +691,7 @@ static gboolean terminal_accept_cb(int fd, G_GNUC_UNUSED GIOCondition condition,
/* We only have a single fd for both pipes, so we just treat it as
* stdout. stderr is ignored. */
masterfd_stdin = console.fd;
masterfd_stdout = console.fd;
masterfd_stderr = -1;
@ -669,7 +702,7 @@ static gboolean terminal_accept_cb(int fd, G_GNUC_UNUSED GIOCondition condition,
}
static void
runtime_exit_cb (GPid pid, int status, G_GNUC_UNUSED gpointer user_data)
runtime_exit_cb (G_GNUC_UNUSED GPid pid, int status, G_GNUC_UNUSED gpointer user_data)
{
runtime_status = status;
g_main_loop_quit (main_loop);
@ -690,6 +723,7 @@ int main(int argc, char *argv[])
_cleanup_close_ int epfd = -1;
_cleanup_close_ int csfd = -1;
/* Used for !terminal cases. */
int slavefd_stdin = -1;
int slavefd_stdout = -1;
int slavefd_stderr = -1;
char csname[PATH_MAX] = "/tmp/conmon-term.XXXXXXXX";
@ -814,6 +848,15 @@ int main(int argc, char *argv[])
* used anything else (and it wouldn't be a good idea to create a new
* pty pair in the host).
*/
if (opt_stdin) {
if (pipe2(fds, O_CLOEXEC) < 0)
pexit("Failed to create !terminal stdin pipe");
masterfd_stdin = fds[1];
slavefd_stdin = fds[0];
}
if (pipe2(fds, O_CLOEXEC) < 0)
pexit("Failed to create !terminal stdout pipe");
@ -875,8 +918,17 @@ int main(int argc, char *argv[])
if (create_pid < 0) {
pexit("Failed to fork the create command");
} else if (!create_pid) {
/* We only need to touch the stdio if we have terminal=false. */
_cleanup_close_ int dev_null = -1;
/* FIXME: This results in us not outputting runc error messages to crio's log. */
if (slavefd_stdin < 0) {
dev_null = open("/dev/null", O_RDONLY);
if (dev_null < 0)
pexit("Failed to open /dev/null");
slavefd_stdin = dev_null;
}
if (dup2(slavefd_stdin, STDIN_FILENO) < 0)
pexit("Failed to dup over stdout");
if (slavefd_stdout >= 0) {
if (dup2(slavefd_stdout, STDOUT_FILENO) < 0)
pexit("Failed to dup over stdout");
@ -893,6 +945,7 @@ int main(int argc, char *argv[])
g_ptr_array_free (runtime_argv, TRUE);
/* The runtime has that fd now. We don't need to touch it anymore. */
close(slavefd_stdin);
close(slavefd_stdout);
close(slavefd_stderr);
@ -989,7 +1042,7 @@ int main(int argc, char *argv[])
* before the server gets a chance to call accept. In that scenario, the server
* accept blocks till a new client connection comes in.
*/
afd = socket(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
afd = socket(AF_UNIX, SOCK_SEQPACKET|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
if (afd == -1)
pexit("Failed to create attach socket");

View file

@ -31,6 +31,8 @@ type Container struct {
sandbox string
netns ns.NetNS
terminal bool
stdin bool
stdinOnce bool
privileged bool
state *ContainerState
metadata *pb.ContainerMetadata
@ -54,7 +56,7 @@ type ContainerState struct {
}
// NewContainer creates a container object.
func NewContainer(id string, name string, bundlePath string, logPath string, netns ns.NetNS, labels map[string]string, annotations map[string]string, image *pb.ImageSpec, metadata *pb.ContainerMetadata, sandbox string, terminal bool, privileged bool, dir string, created time.Time, stopSignal string) (*Container, error) {
func NewContainer(id string, name string, bundlePath string, logPath string, netns ns.NetNS, labels map[string]string, annotations map[string]string, image *pb.ImageSpec, metadata *pb.ContainerMetadata, sandbox string, terminal bool, stdin bool, stdinOnce bool, privileged bool, dir string, created time.Time, stopSignal string) (*Container, error) {
state := &ContainerState{}
state.Created = created
c := &Container{
@ -66,6 +68,8 @@ func NewContainer(id string, name string, bundlePath string, logPath string, net
sandbox: sandbox,
netns: netns,
terminal: terminal,
stdin: stdin,
stdinOnce: stdinOnce,
privileged: privileged,
metadata: metadata,
annotations: annotations,

View file

@ -121,6 +121,8 @@ func (r *Runtime) CreateContainer(c *Container, cgroupParent string) error {
args = append(args, "-l", c.logPath)
if c.terminal {
args = append(args, "-t")
} else if c.stdin {
args = append(args, "-i")
}
logrus.WithFields(logrus.Fields{
"args": args,

View file

@ -54,6 +54,12 @@ const (
// TTY is the terminal path annotation
TTY = "io.kubernetes.cri-o.TTY"
// Stdin is the stdin annotation
Stdin = "io.kubernetes.cri-o.Stdin"
// StdinOnce is the stdin_once annotation
StdinOnce = "io.kubernetes.cri-o.StdinOnce"
)
// ContainerType values

View file

@ -3,14 +3,12 @@ package server
import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"path/filepath"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/pools"
"github.com/kubernetes-incubator/cri-o/oci"
"github.com/kubernetes-incubator/cri-o/utils"
"golang.org/x/net/context"
@ -19,6 +17,13 @@ import (
"k8s.io/kubernetes/pkg/util/term"
)
/* Sync with stdpipe_t in conmon.c */
const (
AttachPipeStdin = 1
AttachPipeStdout = 2
AttachPipeStderr = 3
)
// Attach prepares a streaming endpoint to attach to a running container.
func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachResponse, error) {
logrus.Debugf("AttachRequest %+v", req)
@ -63,7 +68,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output
})
attachSocketPath := filepath.Join("/var/run/crio", c.ID(), "attach")
conn, err := net.Dial("unix", attachSocketPath)
conn, err := net.DialUnix("unixpacket", nil, &net.UnixAddr{Name: attachSocketPath, Net: "unixpacket"})
if err != nil {
return fmt.Errorf("failed to connect to container %s attach socket: %v", c.ID(), err)
}
@ -72,7 +77,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output
receiveStdout := make(chan error)
if outputStream != nil || errorStream != nil {
go func() {
receiveStdout <- redirectResponseToOutputStream(tty, outputStream, errorStream, conn)
receiveStdout <- redirectResponseToOutputStreams(outputStream, errorStream, conn)
}()
}
@ -81,6 +86,7 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output
var err error
if inputStream != nil {
_, err = utils.CopyDetachable(conn, inputStream, nil)
conn.CloseWrite()
}
stdinDone <- err
}()
@ -100,18 +106,42 @@ func (ss streamService) Attach(containerID string, inputStream io.Reader, output
return nil
}
func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, conn io.Reader) error {
if outputStream == nil {
outputStream = ioutil.Discard
}
if errorStream == nil {
// errorStream = ioutil.Discard
}
func redirectResponseToOutputStreams(outputStream, errorStream io.Writer, conn io.Reader) error {
var err error
if tty {
_, err = pools.Copy(outputStream, conn)
} else {
// TODO
buf := make([]byte, 8192+1) /* Sync with conmon STDIO_BUF_SIZE */
for {
nr, er := conn.Read(buf)
if nr > 0 {
var dst io.Writer
if buf[0] == AttachPipeStdout {
dst = outputStream
} else if buf[0] == AttachPipeStderr {
dst = errorStream
} else {
logrus.Infof("Got unexpected attach type %+d", buf[0])
}
if dst != nil {
nw, ew := dst.Write(buf[1:nr])
if ew != nil {
err = ew
break
}
if nr != nw+1 {
err = io.ErrShortWrite
break
}
}
}
if er == io.EOF {
break
}
if er != nil {
err = er
break
}
}
return err
}

View file

@ -531,6 +531,8 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string,
specgen.AddAnnotation(annotations.ContainerType, annotations.ContainerTypeContainer)
specgen.AddAnnotation(annotations.LogPath, logPath)
specgen.AddAnnotation(annotations.TTY, fmt.Sprintf("%v", containerConfig.Tty))
specgen.AddAnnotation(annotations.Stdin, fmt.Sprintf("%v", containerConfig.Stdin))
specgen.AddAnnotation(annotations.StdinOnce, fmt.Sprintf("%v", containerConfig.StdinOnce))
specgen.AddAnnotation(annotations.Image, image)
created := time.Now()
@ -660,7 +662,7 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string,
return nil, err
}
container, err := oci.NewContainer(containerID, containerName, containerInfo.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, imageSpec, metadata, sb.id, containerConfig.Tty, sb.privileged, containerInfo.Dir, created, containerImageConfig.Config.StopSignal)
container, err := oci.NewContainer(containerID, containerName, containerInfo.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, imageSpec, metadata, sb.id, containerConfig.Tty, containerConfig.Stdin, containerConfig.StdinOnce, sb.privileged, containerInfo.Dir, created, containerImageConfig.Config.StopSignal)
if err != nil {
return nil, err
}

View file

@ -438,7 +438,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
return nil, fmt.Errorf("failed to write runtime configuration for pod sandbox %s(%s): %v", sb.name, id, err)
}
container, err := oci.NewContainer(id, containerName, podContainer.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, sb.privileged, podContainer.Dir, created, podContainer.Config.Config.StopSignal)
container, err := oci.NewContainer(id, containerName, podContainer.RunDir, logPath, sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, false, false, sb.privileged, podContainer.Dir, created, podContainer.Config.Config.StopSignal)
if err != nil {
return nil, err
}

View file

@ -34,6 +34,10 @@ const (
shutdownFile = "/var/lib/crio/crio.shutdown"
)
func isTrue(annotaton string) bool {
return annotaton == "true"
}
// streamService implements streaming.Runtime.
type streamService struct {
runtimeServer *Server // needed by Exec() endpoint
@ -116,10 +120,10 @@ func (s *Server) loadContainer(id string) error {
return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID])
}
var tty bool
if v := m.Annotations[annotations.TTY]; v == "true" {
tty = true
}
tty := isTrue(m.Annotations[annotations.TTY])
stdin := isTrue(m.Annotations[annotations.Stdin])
stdinOnce := isTrue(m.Annotations[annotations.StdinOnce])
containerPath, err := s.store.ContainerRunDirectory(id)
if err != nil {
return err
@ -148,7 +152,7 @@ func (s *Server) loadContainer(id string) error {
return err
}
ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, img, &metadata, sb.id, tty, sb.privileged, containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, img, &metadata, sb.id, tty, stdin, stdinOnce, sb.privileged, containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
if err != nil {
return err
}
@ -238,7 +242,7 @@ func (s *Server) loadSandbox(id string) error {
return err
}
privileged := m.Annotations[annotations.PrivilegedRuntime] == "true"
privileged := isTrue(m.Annotations[annotations.PrivilegedRuntime])
sb := &sandbox{
id: id,
@ -304,7 +308,7 @@ func (s *Server) loadSandbox(id string) error {
return err
}
scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, privileged, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, false, false, privileged, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
if err != nil {
return err
}