From ae77c61a8a7e4b74ecae9d073ed5ce98c26a895e Mon Sep 17 00:00:00 2001 From: Mrunal Patel Date: Tue, 30 May 2017 15:15:30 -0700 Subject: [PATCH] Add code to handle CRI attach A goroutine is started to forward terminal resize requests from the resize channel. Also, data is copied back/forth between stdin, stdout, stderr streams and the attach socket for the container. Signed-off-by: Mrunal Patel --- server/container_attach.go | 102 ++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/server/container_attach.go b/server/container_attach.go index 96e2676b..d4fe9d26 100644 --- a/server/container_attach.go +++ b/server/container_attach.go @@ -1,11 +1,111 @@ package server import ( + "fmt" + "io" + "io/ioutil" + "net" + "os" + "path/filepath" + "syscall" + + "github.com/Sirupsen/logrus" + "github.com/kubernetes-incubator/cri-o/oci" "golang.org/x/net/context" pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/term" ) // Attach prepares a streaming endpoint to attach to a running container. func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachResponse, error) { - return nil, nil + logrus.Debugf("AttachRequest %+v", req) + + resp, err := s.GetAttach(req) + if err != nil { + return nil, fmt.Errorf("unable to prepare attach endpoint") + } + + return resp, nil +} + +// Attach endpoint for streaming.Runtime +func (ss streamService) Attach(containerID string, inputStream io.Reader, outputStream, errorStream io.WriteCloser, tty bool, resize <-chan term.Size) error { + c := ss.runtimeServer.GetContainer(containerID) + + if c == nil { + return fmt.Errorf("could not find container %q", containerID) + } + + if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil { + return err + } + + cState := ss.runtimeServer.runtime.ContainerStatus(c) + if !(cState.Status == oci.ContainerStateRunning || cState.Status == oci.ContainerStateCreated) { + return fmt.Errorf("container is not created or running") + } + + controlPath := filepath.Join(c.BundlePath(), "ctl") + controlFile, err := os.OpenFile(controlPath, syscall.O_WRONLY, 0) + if err != nil { + return fmt.Errorf("failed to open container ctl file: %v", err) + } + + kubecontainer.HandleResizing(resize, func(size term.Size) { + logrus.Infof("Got a resize event: %+v", size) + _, err := fmt.Fprintf(controlFile, "%d %d %d\n", 1, size.Height, size.Width) + if err != nil { + logrus.Infof("Failed to write to control file to resize terminal: %v", err) + } + }) + + attachSocketPath := filepath.Join("/run", c.Name(), "attach") + conn, err := net.Dial("unix", attachSocketPath) + if err != nil { + return fmt.Errorf("failed to connect to container %s attach socket: %v", c.ID(), err) + } + defer conn.Close() + + receiveStdout := make(chan error) + if outputStream != nil || errorStream != nil { + go func() { + receiveStdout <- redirectResponseToOutputStream(tty, outputStream, errorStream, conn) + }() + } + + stdinDone := make(chan struct{}) + go func() { + if inputStream != nil { + io.Copy(conn, inputStream) + } + close(stdinDone) + }() + + select { + case err := <-receiveStdout: + return err + case <-stdinDone: + if outputStream != nil || errorStream != nil { + return <-receiveStdout + } + } + + return nil +} + +func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, conn io.Reader) error { + if outputStream == nil { + outputStream = ioutil.Discard + } + if errorStream == nil { + // errorStream = ioutil.Discard + } + var err error + if tty { + _, err = io.Copy(outputStream, conn) + } else { + // TODO + } + return err }