package oci import ( "bytes" "encoding/json" "fmt" "io/ioutil" "os" "os/exec" "path/filepath" "strconv" "strings" "syscall" "time" "github.com/kubernetes-incubator/cri-o/utils" rspec "github.com/opencontainers/runtime-spec/specs-go" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" kwait "k8s.io/apimachinery/pkg/util/wait" ) const ( // ContainerStateCreated represents the created state of a container ContainerStateCreated = "created" // ContainerStateRunning represents the running state of a container ContainerStateRunning = "running" // ContainerStateStopped represents the stopped state of a container ContainerStateStopped = "stopped" // ContainerCreateTimeout represents the value of container creating timeout ContainerCreateTimeout = 10 * time.Second ) // New creates a new Runtime with options provided func New(runtimeTrustedPath string, runtimeUntrustedPath string, trustLevel string, conmonPath string, conmonEnv []string, cgroupManager string, containerExitsDir string) (*Runtime, error) { r := &Runtime{ name: filepath.Base(runtimeTrustedPath), trustedPath: runtimeTrustedPath, untrustedPath: runtimeUntrustedPath, trustLevel: trustLevel, conmonPath: conmonPath, conmonEnv: conmonEnv, cgroupManager: cgroupManager, containerExitsDir: containerExitsDir, } return r, nil } // Runtime stores the information about a oci runtime type Runtime struct { name string trustedPath string untrustedPath string trustLevel string conmonPath string conmonEnv []string cgroupManager string containerExitsDir string } // syncInfo is used to return data from monitor process to daemon type syncInfo struct { Pid int `json:"pid"` Message string `json:"message,omitempty"` } // exitCodeInfo is used to return the monitored process exit code to the daemon type exitCodeInfo struct { ExitCode int32 `json:"exit_code"` Message string `json:"message,omitempty"` } // Name returns the name of the OCI Runtime func (r *Runtime) Name() string { return r.name } // Path returns the full path the OCI Runtime executable. // Depending if the container is privileged and/or trusted, // this will return either the trusted or untrusted runtime path. func (r *Runtime) Path(c *Container) string { if !c.trusted { // We have an explicitly untrusted container. if c.privileged { logrus.Warnf("Running an untrusted but privileged container") return r.trustedPath } if r.untrustedPath != "" { return r.untrustedPath } return r.trustedPath } // Our container is trusted. Let's look at the configured trust level. if r.trustLevel == "trusted" { return r.trustedPath } // Our container is trusted, but we are running untrusted. // We will use the untrusted container runtime if it's set // and if it's not a privileged container. if c.privileged || r.untrustedPath == "" { return r.trustedPath } return r.untrustedPath } // Version returns the version of the OCI Runtime func (r *Runtime) Version() (string, error) { runtimeVersion, err := getOCIVersion(r.trustedPath, "-v") if err != nil { return "", err } return runtimeVersion, nil } func getOCIVersion(name string, args ...string) (string, error) { out, err := utils.ExecCmd(name, args...) if err != nil { return "", err } firstLine := out[:strings.Index(out, "\n")] v := firstLine[strings.LastIndex(firstLine, " ")+1:] return v, nil } // CreateContainer creates a container. func (r *Runtime) CreateContainer(c *Container, cgroupParent string) error { var stderrBuf bytes.Buffer parentPipe, childPipe, err := newPipe() childStartPipe, parentStartPipe, err := newPipe() if err != nil { return fmt.Errorf("error creating socket pair: %v", err) } defer parentPipe.Close() defer parentStartPipe.Close() var args []string if r.cgroupManager == "systemd" { args = append(args, "-s") } args = append(args, "-c", c.id) args = append(args, "-u", c.id) args = append(args, "-r", r.Path(c)) args = append(args, "-b", c.bundlePath) args = append(args, "-p", filepath.Join(c.bundlePath, "pidfile")) args = append(args, "-l", c.logPath) args = append(args, "--exit-dir", r.containerExitsDir) if c.terminal { args = append(args, "-t") } else if c.stdin { args = append(args, "-i") } logrus.WithFields(logrus.Fields{ "args": args, }).Debugf("running conmon: %s", r.conmonPath) cmd := exec.Command(r.conmonPath, args...) cmd.Dir = c.bundlePath cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, } cmd.Stdin = os.Stdin cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if c.terminal { cmd.Stderr = &stderrBuf } cmd.ExtraFiles = append(cmd.ExtraFiles, childPipe, childStartPipe) // 0, 1 and 2 are stdin, stdout and stderr cmd.Env = append(r.conmonEnv, fmt.Sprintf("_OCI_SYNCPIPE=%d", 3)) cmd.Env = append(cmd.Env, fmt.Sprintf("_OCI_STARTPIPE=%d", 4)) err = cmd.Start() if err != nil { childPipe.Close() return err } // We don't need childPipe on the parent side childPipe.Close() childStartPipe.Close() // Move conmon to specified cgroup if cgroupParent != "" { if r.cgroupManager == "systemd" { logrus.Infof("Running conmon under slice %s and unitName %s", cgroupParent, createUnitName("crio-conmon", c.id)) if err = utils.RunUnderSystemdScope(cmd.Process.Pid, cgroupParent, createUnitName("crio-conmon", c.id)); err != nil { logrus.Warnf("Failed to add conmon to sandbox cgroup: %v", err) } } } /* We set the cgroup, now the child can start creating children */ someData := []byte{0} _, err = parentStartPipe.Write(someData) if err != nil { return err } /* Wait for initial setup and fork, and reap child */ err = cmd.Wait() if err != nil { return err } // Wait to get container pid from conmon type syncStruct struct { si *syncInfo err error } ch := make(chan syncStruct) go func() { var si *syncInfo if err = json.NewDecoder(parentPipe).Decode(&si); err != nil { ch <- syncStruct{err: err} return } ch <- syncStruct{si: si} }() select { case ss := <-ch: if ss.err != nil { return fmt.Errorf("error reading container (probably exited) json message: %v", ss.err) } logrus.Debugf("Received container pid: %d", ss.si.Pid) if ss.si.Pid == -1 { if ss.si.Message != "" { logrus.Debugf("Container creation error: %s", ss.si.Message) return fmt.Errorf("container create failed: %s", ss.si.Message) } logrus.Debugf("Container creation failed") return fmt.Errorf("container create failed") } case <-time.After(ContainerCreateTimeout): return fmt.Errorf("create container timeout") } return nil } func createUnitName(prefix string, name string) string { return fmt.Sprintf("%s-%s.scope", prefix, name) } // StartContainer starts a container. func (r *Runtime) StartContainer(c *Container) error { c.opLock.Lock() defer c.opLock.Unlock() if err := utils.ExecCmdWithStdStreams(os.Stdin, os.Stdout, os.Stderr, r.Path(c), "start", c.id); err != nil { return err } c.state.Started = time.Now() return nil } // ExecSyncResponse is returned from ExecSync. type ExecSyncResponse struct { Stdout []byte Stderr []byte ExitCode int32 } // ExecSyncError wraps command's streams, exit code and error on ExecSync error. type ExecSyncError struct { Stdout bytes.Buffer Stderr bytes.Buffer ExitCode int32 Err error } func (e ExecSyncError) Error() string { return fmt.Sprintf("command error: %+v, stdout: %s, stderr: %s, exit code %d", e.Err, e.Stdout.Bytes(), e.Stderr.Bytes(), e.ExitCode) } func prepareExec() (pidFile, parentPipe, childPipe *os.File, err error) { parentPipe, childPipe, err = os.Pipe() if err != nil { return nil, nil, nil, err } pidFile, err = ioutil.TempFile("", "pidfile") if err != nil { parentPipe.Close() childPipe.Close() return nil, nil, nil, err } return } func parseLog(log []byte) (stdout, stderr []byte) { // Split the log on newlines, which is what separates entries. lines := bytes.SplitAfter(log, []byte{'\n'}) for _, line := range lines { // Ignore empty lines. if len(line) == 0 { continue } // The format of log lines is "DATE pipe REST". parts := bytes.SplitN(line, []byte{' '}, 3) if len(parts) < 3 { // Ignore the line if it's formatted incorrectly, but complain // about it so it can be debugged. logrus.Warnf("hit invalid log format: %q", string(line)) continue } pipe := string(parts[1]) content := parts[2] switch pipe { case "stdout": stdout = append(stdout, content...) case "stderr": stderr = append(stderr, content...) default: // Complain about unknown pipes. logrus.Warnf("hit invalid log format [unknown pipe %s]: %q", pipe, string(line)) continue } } return stdout, stderr } // ExecSync execs a command in a container and returns it's stdout, stderr and return code. func (r *Runtime) ExecSync(c *Container, command []string, timeout int64) (resp *ExecSyncResponse, err error) { pidFile, parentPipe, childPipe, err := prepareExec() if err != nil { return nil, ExecSyncError{ ExitCode: -1, Err: err, } } defer parentPipe.Close() defer func() { if e := os.Remove(pidFile.Name()); e != nil { logrus.Warnf("could not remove temporary PID file %s", pidFile.Name()) } }() logFile, err := ioutil.TempFile("", "crio-log-"+c.id) if err != nil { return nil, ExecSyncError{ ExitCode: -1, Err: err, } } logPath := logFile.Name() defer func() { logFile.Close() os.RemoveAll(logPath) }() f, err := ioutil.TempFile("", "exec-process") if err != nil { return nil, ExecSyncError{ ExitCode: -1, Err: err, } } defer os.RemoveAll(f.Name()) var args []string args = append(args, "-c", c.id) args = append(args, "-r", r.Path(c)) args = append(args, "-p", pidFile.Name()) args = append(args, "-e") if c.terminal { args = append(args, "-t") } if timeout > 0 { args = append(args, "-T") args = append(args, fmt.Sprintf("%d", timeout)) } args = append(args, "-l", logPath) pspec := rspec.Process{ Env: r.conmonEnv, Args: command, Cwd: "/", } processJSON, err := json.Marshal(pspec) if err != nil { return nil, ExecSyncError{ ExitCode: -1, Err: err, } } if err := ioutil.WriteFile(f.Name(), processJSON, 0644); err != nil { return nil, ExecSyncError{ ExitCode: -1, Err: err, } } args = append(args, "--exec-process-spec", f.Name()) cmd := exec.Command(r.conmonPath, args...) var stdoutBuf, stderrBuf bytes.Buffer cmd.Stdout = &stdoutBuf cmd.Stderr = &stderrBuf cmd.ExtraFiles = append(cmd.ExtraFiles, childPipe) // 0, 1 and 2 are stdin, stdout and stderr cmd.Env = append(r.conmonEnv, fmt.Sprintf("_OCI_SYNCPIPE=%d", 3)) err = cmd.Start() if err != nil { childPipe.Close() return nil, ExecSyncError{ Stdout: stdoutBuf, Stderr: stderrBuf, ExitCode: -1, Err: err, } } // We don't need childPipe on the parent side childPipe.Close() err = cmd.Wait() if err != nil { if exitErr, ok := err.(*exec.ExitError); ok { if status, ok := exitErr.Sys().(unix.WaitStatus); ok { return nil, ExecSyncError{ Stdout: stdoutBuf, Stderr: stderrBuf, ExitCode: int32(status.ExitStatus()), Err: err, } } } else { return nil, ExecSyncError{ Stdout: stdoutBuf, Stderr: stderrBuf, ExitCode: -1, Err: err, } } } var ec *exitCodeInfo if err := json.NewDecoder(parentPipe).Decode(&ec); err != nil { return nil, ExecSyncError{ Stdout: stdoutBuf, Stderr: stderrBuf, ExitCode: -1, Err: err, } } logrus.Infof("Received container exit code: %v, message: %s", ec.ExitCode, ec.Message) if ec.ExitCode == -1 { return nil, ExecSyncError{ Stdout: stdoutBuf, Stderr: stderrBuf, ExitCode: -1, Err: fmt.Errorf(ec.Message), } } // The actual logged output is not the same as stdoutBuf and stderrBuf, // which are used for getting error information. For the actual // ExecSyncResponse we have to read the logfile. // XXX: Currently runC dups the same console over both stdout and stderr, // so we can't differentiate between the two. logBytes, err := ioutil.ReadFile(logPath) if err != nil { return nil, ExecSyncError{ Stdout: stdoutBuf, Stderr: stderrBuf, ExitCode: -1, Err: err, } } // We have to parse the log output into {stdout, stderr} buffers. stdoutBytes, stderrBytes := parseLog(logBytes) return &ExecSyncResponse{ Stdout: stdoutBytes, Stderr: stderrBytes, ExitCode: ec.ExitCode, }, nil } // StopContainer stops a container. Timeout is given in seconds. func (r *Runtime) StopContainer(c *Container, timeout int64) error { c.opLock.Lock() defer c.opLock.Unlock() // Check if the process is around before sending a signal err := unix.Kill(c.state.Pid, 0) if err == unix.ESRCH { c.state.Finished = time.Now() return nil } if err := utils.ExecCmdWithStdStreams(os.Stdin, os.Stdout, os.Stderr, r.Path(c), "kill", c.id, c.GetStopSignal()); err != nil { return fmt.Errorf("failed to stop container %s, %v", c.id, err) } if timeout == -1 { // default 10 seconds delay timeout = 10 } done := make(chan struct{}) // we could potentially re-use "done" channel to exit the loop on timeout // but we use another channel "chControl" so that we won't never incur in the // case the "done" channel is closed in the "default" select case and we also // reach the timeout in the select below. If that happens we could raise // a panic closing a closed channel so better be safe and use another new // channel just to control the loop. chControl := make(chan struct{}) go func() { for { select { case <-chControl: return default: // Check if the process is still around err := unix.Kill(c.state.Pid, 0) if err == unix.ESRCH { close(done) return } time.Sleep(100 * time.Millisecond) } } }() select { case <-done: return nil case <-time.After(time.Duration(timeout) * time.Second): close(chControl) err := unix.Kill(c.state.Pid, unix.SIGKILL) if err != nil && err != unix.ESRCH { return fmt.Errorf("failed to kill process: %v", err) } } c.state.Finished = time.Now() return nil } // DeleteContainer deletes a container. func (r *Runtime) DeleteContainer(c *Container) error { c.opLock.Lock() defer c.opLock.Unlock() _, err := utils.ExecCmd(r.Path(c), "delete", "--force", c.id) return err } // SetStartFailed sets the container state appropriately after a start failure func (r *Runtime) SetStartFailed(c *Container, err error) { c.opLock.Lock() defer c.opLock.Unlock() // adjust finished and started times c.state.Finished, c.state.Started = c.state.Created, c.state.Created c.state.Error = err.Error() } // UpdateStatus refreshes the status of the container. func (r *Runtime) UpdateStatus(c *Container) error { c.opLock.Lock() defer c.opLock.Unlock() out, err := exec.Command(r.Path(c), "state", c.id).CombinedOutput() if err != nil { // there are many code paths that could lead to have a bad state in the // underlying runtime. // On any error like a container went away or we rebooted and containers // went away we do not error out stopping kubernetes to recover. // We always populate the fields below so kube can restart/reschedule // containers failing. c.state.Status = ContainerStateStopped c.state.Finished = time.Now() c.state.ExitCode = 255 return nil } if err := json.NewDecoder(bytes.NewBuffer(out)).Decode(&c.state); err != nil { return fmt.Errorf("failed to decode container status for %s: %s", c.id, err) } if c.state.Status == ContainerStateStopped { exitFilePath := filepath.Join(r.containerExitsDir, c.id) var fi os.FileInfo err = kwait.ExponentialBackoff( kwait.Backoff{ Duration: 500 * time.Millisecond, Factor: 1.2, Steps: 6, }, func() (bool, error) { var err error fi, err = os.Stat(exitFilePath) if err != nil { // wait longer return false, nil } return true, nil }) if err != nil { logrus.Warnf("failed to find container exit file: %v", err) c.state.ExitCode = -1 } else { c.state.Finished = getFinishedTime(fi) statusCodeStr, err := ioutil.ReadFile(exitFilePath) if err != nil { return fmt.Errorf("failed to read exit file: %v", err) } statusCode, err := strconv.Atoi(string(statusCodeStr)) if err != nil { return fmt.Errorf("status code conversion failed: %v", err) } c.state.ExitCode = int32(statusCode) } oomFilePath := filepath.Join(c.bundlePath, "oom") if _, err = os.Stat(oomFilePath); err == nil { c.state.OOMKilled = true } } return nil } // ContainerStatus returns the state of a container. func (r *Runtime) ContainerStatus(c *Container) *ContainerState { c.opLock.Lock() defer c.opLock.Unlock() return c.state } // newPipe creates a unix socket pair for communication func newPipe() (parent *os.File, child *os.File, err error) { fds, err := unix.Socketpair(unix.AF_LOCAL, unix.SOCK_STREAM|unix.SOCK_CLOEXEC, 0) if err != nil { return nil, nil, err } return os.NewFile(uintptr(fds[1]), "parent"), os.NewFile(uintptr(fds[0]), "child"), nil } // RuntimeReady checks if the runtime is up and ready to accept // basic containers e.g. container only needs host network. func (r *Runtime) RuntimeReady() (bool, error) { return true, nil } // NetworkReady checks if the runtime network is up and ready to // accept containers which require container network. func (r *Runtime) NetworkReady() (bool, error) { return true, nil }