7bb957bf75
We use a SOCK_SEQPACKET socket for the attach unix domain socket, which means the kernel will ensure that the reading side only ever gets the data from one write operation. We use this for framing, where the first byte identifies the pipe that the following bytes are for. We have to make sure that all reads from the socket use a buffer at least as large as the one used on the write side, because otherwise the extra data in the message will be dropped. This also adds a stdin pipe for the container, similar to the ones we use for stdout/err, because we need a way for an attached client to write to stdin, even if not using a tty. This fixes https://github.com/kubernetes-incubator/cri-o/issues/569 Signed-off-by: Alexander Larsson <alexl@redhat.com>
147 lines
3.5 KiB
Go
147 lines
3.5 KiB
Go
package server
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"syscall"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/kubernetes-incubator/cri-o/oci"
|
|
"github.com/kubernetes-incubator/cri-o/utils"
|
|
"golang.org/x/net/context"
|
|
pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
|
"k8s.io/kubernetes/pkg/util/term"
|
|
)
|
|
|
|
// Identifiers for the stdio pipes multiplexed over the attach socket. The
// first byte of every message read from the socket names the pipe that the
// remaining bytes belong to.
//
// Keep in sync with stdpipe_t in conmon.c. The numeric values are part of the
// wire format, so do not insert or reorder entries.
const (
	// AttachPipeStdin (1) identifies the container's stdin pipe.
	AttachPipeStdin = iota + 1
	// AttachPipeStdout (2) identifies the container's stdout pipe.
	AttachPipeStdout
	// AttachPipeStderr (3) identifies the container's stderr pipe.
	AttachPipeStderr
)
|
|
|
|
// Attach prepares a streaming endpoint to attach to a running container.
|
|
func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachResponse, error) {
|
|
logrus.Debugf("AttachRequest %+v", req)
|
|
|
|
resp, err := s.GetAttach(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to prepare attach endpoint")
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// Attach endpoint for streaming.Runtime
|
|
func (ss streamService) Attach(containerID string, inputStream io.Reader, outputStream, errorStream io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
|
c := ss.runtimeServer.GetContainer(containerID)
|
|
|
|
if c == nil {
|
|
return fmt.Errorf("could not find container %q", containerID)
|
|
}
|
|
|
|
if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil {
|
|
return err
|
|
}
|
|
|
|
cState := ss.runtimeServer.runtime.ContainerStatus(c)
|
|
if !(cState.Status == oci.ContainerStateRunning || cState.Status == oci.ContainerStateCreated) {
|
|
return fmt.Errorf("container is not created or running")
|
|
}
|
|
|
|
controlPath := filepath.Join(c.BundlePath(), "ctl")
|
|
controlFile, err := os.OpenFile(controlPath, syscall.O_WRONLY, 0)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open container ctl file: %v", err)
|
|
}
|
|
|
|
kubecontainer.HandleResizing(resize, func(size term.Size) {
|
|
logrus.Infof("Got a resize event: %+v", size)
|
|
_, err := fmt.Fprintf(controlFile, "%d %d %d\n", 1, size.Height, size.Width)
|
|
if err != nil {
|
|
logrus.Infof("Failed to write to control file to resize terminal: %v", err)
|
|
}
|
|
})
|
|
|
|
attachSocketPath := filepath.Join("/var/run/crio", c.ID(), "attach")
|
|
conn, err := net.DialUnix("unixpacket", nil, &net.UnixAddr{Name: attachSocketPath, Net: "unixpacket"})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to container %s attach socket: %v", c.ID(), err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
receiveStdout := make(chan error)
|
|
if outputStream != nil || errorStream != nil {
|
|
go func() {
|
|
receiveStdout <- redirectResponseToOutputStreams(outputStream, errorStream, conn)
|
|
}()
|
|
}
|
|
|
|
stdinDone := make(chan error)
|
|
go func() {
|
|
var err error
|
|
if inputStream != nil {
|
|
_, err = utils.CopyDetachable(conn, inputStream, nil)
|
|
conn.CloseWrite()
|
|
}
|
|
stdinDone <- err
|
|
}()
|
|
|
|
select {
|
|
case err := <-receiveStdout:
|
|
return err
|
|
case err := <-stdinDone:
|
|
if _, ok := err.(utils.DetachError); ok {
|
|
return nil
|
|
}
|
|
if outputStream != nil || errorStream != nil {
|
|
return <-receiveStdout
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func redirectResponseToOutputStreams(outputStream, errorStream io.Writer, conn io.Reader) error {
|
|
var err error
|
|
buf := make([]byte, 8192+1) /* Sync with conmon STDIO_BUF_SIZE */
|
|
|
|
for {
|
|
nr, er := conn.Read(buf)
|
|
if nr > 0 {
|
|
var dst io.Writer
|
|
if buf[0] == AttachPipeStdout {
|
|
dst = outputStream
|
|
} else if buf[0] == AttachPipeStderr {
|
|
dst = errorStream
|
|
} else {
|
|
logrus.Infof("Got unexpected attach type %+d", buf[0])
|
|
}
|
|
|
|
if dst != nil {
|
|
nw, ew := dst.Write(buf[1:nr])
|
|
if ew != nil {
|
|
err = ew
|
|
break
|
|
}
|
|
if nr != nw+1 {
|
|
err = io.ErrShortWrite
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if er == io.EOF {
|
|
break
|
|
}
|
|
if er != nil {
|
|
err = er
|
|
break
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|