add container lib

Signed-off-by: Jess Frazelle <acidburn@microsoft.com>
This commit is contained in:
Jess Frazelle 2018-03-19 22:54:29 -04:00
parent d3fbdd212e
commit 7743701943
10 changed files with 445 additions and 397 deletions

135
container/container.go Normal file
View file

@ -0,0 +1,135 @@
// Package container allows for running a process in a container.
package container
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"github.com/coreos/go-systemd/activation"
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/cgroups/systemd"
"github.com/opencontainers/runc/libcontainer/specconv"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// Container defines the behavior and settings for a container object.
type Container struct {
// ID identifies the container. Run uses it as the cgroup name, as the id
// handed to the libcontainer factory and as the directory name (under Root)
// that holds the notify socket.
ID string
// Spec is the runtime spec to run. Run converts it to a libcontainer config
// and runs Spec.Process; when NOTIFY_SOCKET is set Run also appends a mount
// and an environment variable to it.
Spec *specs.Spec
// PIDFile, when non-empty, is the file the process pid is written to. Run
// rewrites it as an absolute path.
PIDFile string
// ConsoleSocket is the path of the unix socket that is dialed to hand over
// the console when the process has a terminal and Detach is set.
ConsoleSocket string
// Root is the directory passed to the libcontainer factory. Run rewrites it
// as an absolute path.
Root string
// Detach makes Run return once the process has been started instead of
// waiting for it to exit.
Detach bool
// UseSystemdCgroup selects the systemd cgroup manager instead of cgroupfs.
// Run fails if it is set but systemd cgroup support is unavailable.
UseSystemdCgroup bool
// NoPivotRoot is passed through to specconv.CreateOpts.
NoPivotRoot bool
// NoNewKeyring is passed through to specconv.CreateOpts.
NoNewKeyring bool
// Rootless is passed through to specconv.CreateOpts.
Rootless bool
}
// Run starts the container. It returns the exit status or -1 and an
// error. Signals sent to the current process will be forwarded to container.
//
// Run mutates its receiver: PIDFile and Root are rewritten as absolute paths,
// and when NOTIFY_SOCKET is set in the environment the spec gains a bind
// mount and a NOTIFY_SOCKET variable for the notify socket.
func (c *Container) Run() (int, error) {
	// The spec and its process are dereferenced unconditionally below, so
	// reject a missing one with an error rather than a nil-pointer panic.
	if c.Spec == nil || c.Spec.Process == nil {
		return -1, fmt.Errorf("container spec and its process must not be nil")
	}

	var err error

	// Convert pid-file to an absolute path so we can write to the
	// right file after chdir to bundle.
	if c.PIDFile != "" {
		c.PIDFile, err = filepath.Abs(c.PIDFile)
		if err != nil {
			return -1, err
		}
	}

	// Get the absolute path to the root.
	c.Root, err = filepath.Abs(c.Root)
	if err != nil {
		return -1, err
	}

	// notifySocket is nil when NOTIFY_SOCKET is not set in the environment.
	notifySocket := newNotifySocket(c.ID, c.Root)
	if notifySocket != nil {
		// Setup the spec for the notify socket.
		notifySocket.setupSpec(c.Spec)
	}

	// Create the libcontainer config.
	config, err := specconv.CreateLibcontainerConfig(&specconv.CreateOpts{
		CgroupName:       c.ID,
		UseSystemdCgroup: c.UseSystemdCgroup,
		NoPivotRoot:      c.NoPivotRoot,
		NoNewKeyring:     c.NoNewKeyring,
		Spec:             c.Spec,
		Rootless:         c.Rootless,
	})
	if err != nil {
		return -1, err
	}

	// Setup the cgroups manager. Default is cgroupfs.
	cgroupManager := libcontainer.Cgroupfs
	if c.UseSystemdCgroup {
		if !systemd.UseSystemd() {
			return -1, fmt.Errorf("systemd cgroup flag passed, but systemd support for managing cgroups is not available")
		}
		cgroupManager = libcontainer.SystemdCgroups
	}

	// We resolve the paths for {newuidmap,newgidmap} from the context of runc,
	// to avoid doing a path lookup in the nsexec context. TODO: The binary
	// names are not currently configurable.
	// A failed lookup is not an error here: the path is simply left empty.
	newuidmap, err := exec.LookPath("newuidmap")
	if err != nil {
		newuidmap = ""
	}
	newgidmap, err := exec.LookPath("newgidmap")
	if err != nil {
		newgidmap = ""
	}

	// Create the new libcontainer factory.
	factory, err := libcontainer.New(c.Root, cgroupManager, nil, nil,
		libcontainer.NewuidmapPath(newuidmap),
		libcontainer.NewgidmapPath(newgidmap))
	if err != nil {
		return -1, err
	}

	// Create the container.
	container, err := factory.Create(c.ID, config)
	if err != nil {
		return -1, err
	}

	if notifySocket != nil {
		// Setup the socket for the notify socket.
		if err := notifySocket.setupSocket(); err != nil {
			// The container has already been created at this point; do not
			// leave it behind when we bail out.
			destroy(container)
			return -1, err
		}
	}

	// Support on-demand socket activation by passing file descriptors into
	// the container init process.
	var listenFDs []*os.File
	if os.Getenv("LISTEN_FDS") != "" {
		listenFDs = activation.Files(false)
	}

	// Initialize the runner.
	r := &runner{
		enableSubreaper: true,
		shouldDestroy:   true,
		container:       container,
		listenFDs:       listenFDs,
		notifySocket:    notifySocket,
		consoleSocket:   c.ConsoleSocket,
		detach:          c.Detach,
		pidFile:         c.PIDFile,
	}

	// Run the process.
	return r.run(c.Spec.Process)
}

106
container/notify_socket.go Normal file
View file

@ -0,0 +1,106 @@
package container
import (
"bytes"
"fmt"
"net"
"os"
"path/filepath"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/sirupsen/logrus"
)
// notifySocket holds the state needed to relay notifications received on a
// per-container socket to the socket named by NOTIFY_SOCKET.
type notifySocket struct {
// socket is the datagram socket bound at socketPath; it stays nil until
// setupSocket has been called.
socket *net.UnixConn
// host is the value NOTIFY_SOCKET had when the notifySocket was created. It
// is used both as the mount destination in the spec and as the address
// that run dials.
host string
// socketPath is <root>/<id>/notify.sock.
socketPath string
}
// newNotifySocket returns a notifySocket whose socket path is
// <root>/<id>/notify.sock and whose host is the current value of the
// NOTIFY_SOCKET environment variable. It returns nil when NOTIFY_SOCKET is
// empty or unset. The socket itself is not opened here.
func newNotifySocket(id, root string) *notifySocket {
	host := os.Getenv("NOTIFY_SOCKET")
	if host == "" {
		// Nothing to relay to.
		return nil
	}

	stateDir := filepath.Join(root, id)
	return &notifySocket{
		host:       host,
		socketPath: filepath.Join(stateDir, "notify.sock"),
	}
}
// Close closes the socket opened by setupSocket and returns the error, if
// any, reported by the underlying connection.
func (s *notifySocket) Close() error {
	err := s.socket.Close()
	return err
}
// setupSpec adds sd_notify support to spec: it appends a bind mount that
// exposes the per-container socket (s.socketPath) at the host's notify socket
// path (s.host), and appends NOTIFY_SOCKET=<s.host> to the process
// environment.
func (s *notifySocket) setupSpec(spec *specs.Spec) {
	bind := specs.Mount{
		Destination: s.host,
		Type:        "bind",
		Source:      s.socketPath,
		Options:     []string{"bind"},
	}
	spec.Mounts = append(spec.Mounts, bind)

	env := "NOTIFY_SOCKET=" + s.host
	spec.Process.Env = append(spec.Process.Env, env)
}
// setupSocket binds a unixgram socket at s.socketPath and stores it in
// s.socket. On failure s.socket is left untouched and the error is returned.
func (s *notifySocket) setupSocket() error {
	conn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
		Name: s.socketPath,
		Net:  "unixgram",
	})
	if err != nil {
		return err
	}

	s.socket = conn
	return nil
}
// run relays the container's readiness notification to the host notify
// socket (s.host). It reads datagrams from s.socket until one contains a line
// starting with "READY=", forwards that line to the host socket and returns.
// It also returns when reading from s.socket or writing to the host fails.
//
// pid1 must be set only with -d, as it is used to set the new process as the
// main process for the service in systemd: when pid1 > 0 a "MAINPID=<pid1>"
// message is sent after the READY line.
func (s *notifySocket) run(pid1 int) {
	hostAddr := net.UnixAddr{Name: s.host, Net: "unixgram"}
	client, err := net.DialUnix("unixgram", nil, &hostAddr)
	if err != nil {
		logrus.Error(err)
		return
	}
	// Release the host-side connection on every exit path; run may be called
	// from a long-lived process, so leaving it open would leak a descriptor.
	defer client.Close()

	buf := make([]byte, 512)
	for {
		n, err := s.socket.Read(buf)
		if err != nil {
			// Includes the socket being closed by the signal handler.
			return
		}

		for _, line := range bytes.Split(buf[:n], []byte{'\n'}) {
			if !bytes.HasPrefix(line, []byte("READY=")) {
				continue
			}

			// Forward only the READY line, newline terminated.
			var out bytes.Buffer
			out.Write(line)
			out.WriteByte('\n')
			if _, err := client.Write(out.Bytes()); err != nil {
				return
			}

			// now we can inform systemd to use pid1 as the pid to monitor
			if pid1 > 0 {
				newPid := fmt.Sprintf("MAINPID=%d\n", pid1)
				if _, err := client.Write([]byte(newPid)); err != nil {
					// Readiness was already delivered; report but do not
					// treat this as fatal.
					logrus.Error(err)
				}
			}
			return
		}
	}
}

143
container/process.go Normal file
View file

@ -0,0 +1,143 @@
package container
import (
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/configs"
"github.com/opencontainers/runc/libcontainer/utils"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/sirupsen/logrus"
)
// newProcess translates the runtime-spec process p into a libcontainer
// Process: arguments, environment, user ("uid:gid"), working directory,
// SELinux label, AppArmor profile, no-new-privileges flag, console size,
// capabilities, additional groups and rlimits are copied over. It fails only
// when one of the rlimits cannot be converted.
func newProcess(p specs.Process) (*libcontainer.Process, error) {
	user := fmt.Sprintf("%d:%d", p.User.UID, p.User.GID)
	lp := &libcontainer.Process{
		Args:            p.Args,
		Env:             p.Env,
		User:            user,
		Cwd:             p.Cwd,
		Label:           p.SelinuxLabel,
		NoNewPrivileges: &p.NoNewPrivileges,
		AppArmorProfile: p.ApparmorProfile,
	}

	// Console size is optional in the spec.
	if size := p.ConsoleSize; size != nil {
		lp.ConsoleWidth = uint16(size.Width)
		lp.ConsoleHeight = uint16(size.Height)
	}

	// Capabilities are optional as well; copy all five sets when present.
	if caps := p.Capabilities; caps != nil {
		lp.Capabilities = &configs.Capabilities{
			Bounding:    caps.Bounding,
			Effective:   caps.Effective,
			Inheritable: caps.Inheritable,
			Permitted:   caps.Permitted,
			Ambient:     caps.Ambient,
		}
	}

	// libcontainer wants the supplementary groups as decimal strings.
	for _, gid := range p.User.AdditionalGids {
		group := strconv.FormatUint(uint64(gid), 10)
		lp.AdditionalGroups = append(lp.AdditionalGroups, group)
	}

	// Convert each rlimit, stopping at the first one that is not recognised.
	for _, specLimit := range p.Rlimits {
		converted, err := createLibContainerRlimit(specLimit)
		if err != nil {
			return nil, err
		}
		lp.Rlimits = append(lp.Rlimits, converted)
	}

	return lp, nil
}
// destroy destroys container and logs, rather than returns, any error.
func destroy(container libcontainer.Container) {
	err := container.Destroy()
	if err == nil {
		return
	}
	logrus.Error(err)
}
// setupIO wires up the stdio of process and returns the tty that tracks the
// resources involved. The behaviour depends on createTTY and detach:
//
//   - createTTY, not detached: a "console" socket pair is created, the child
//     end is handed to the process and recvtty runs in a goroutine on the
//     parent end; its result is delivered on the tty's consoleC channel.
//   - createTTY, detached: the unix socket at sockpath is dialed and handed to
//     the process as its console socket.
//   - no TTY, detached: the process inherits this process's stdio.
//   - no TTY, not detached: pipes are set up via setupProcessPipes, using
//     rootuid and rootgid.
func setupIO(process *libcontainer.Process, rootuid, rootgid int, createTTY, detach bool, sockpath string) (*tty, error) {
	if createTTY {
		process.Stdin = nil
		process.Stdout = nil
		process.Stderr = nil
		t := &tty{}
		if !detach {
			parent, child, err := utils.NewSockPair("console")
			if err != nil {
				return nil, err
			}
			process.ConsoleSocket = child
			t.postStart = append(t.postStart, parent, child)
			t.consoleC = make(chan error, 1)
			go func() {
				// Send exactly one result. The channel has room for one
				// value, so this goroutine can always finish even when
				// nobody ends up waiting for the console.
				t.consoleC <- t.recvtty(process, parent)
			}()
		} else {
			// the caller of runc will handle receiving the console master
			conn, err := net.Dial("unix", sockpath)
			if err != nil {
				return nil, err
			}
			uc, ok := conn.(*net.UnixConn)
			if !ok {
				// The tty is not returned on failure, so close the
				// connection here instead of leaking it.
				conn.Close()
				return nil, fmt.Errorf("casting to UnixConn failed")
			}
			socket, err := uc.File()
			if err != nil {
				uc.Close()
				return nil, err
			}
			t.postStart = append(t.postStart, uc, socket)
			process.ConsoleSocket = socket
		}
		return t, nil
	}
	// when runc will detach the caller provides the stdio to runc via runc's 0,1,2
	// and the container's process inherits runc's stdio.
	if detach {
		if err := inheritStdio(process); err != nil {
			return nil, err
		}
		return &tty{}, nil
	}
	return setupProcessPipes(process, rootuid, rootgid)
}
// createPidFile atomically creates a file at path containing the pid of
// process. It first writes a temporary file named like path's file name with
// a '.' in front of it, in the same directory, and then renames it to path.
//
// The temporary file is created exclusively, so it is removed again on any
// failure; otherwise a leftover would make every later attempt fail.
func createPidFile(path string, process *libcontainer.Process) error {
	pid, err := process.Pid()
	if err != nil {
		return err
	}

	tmpName := filepath.Join(filepath.Dir(path), "."+filepath.Base(path))
	f, err := os.OpenFile(tmpName, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
	if err != nil {
		return err
	}

	_, err = fmt.Fprintf(f, "%d", pid)
	// Always close the file, but keep the write error if there was one.
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	if err == nil {
		err = os.Rename(tmpName, path)
	}
	if err != nil {
		// Best-effort cleanup: the original error is the one worth reporting.
		_ = os.Remove(tmpName)
		return err
	}
	return nil
}

66
container/rlimit.go Normal file
View file

@ -0,0 +1,66 @@
package container
import (
"fmt"
"github.com/opencontainers/runc/libcontainer/configs"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// Numeric resource identifiers used as the Type of a libcontainer rlimit.
// They are numbered consecutively from 0 in declaration order via iota.
// NOTE(review): presumably these mirror the Linux RLIMIT_* resource numbers;
// confirm for the target architecture.
const (
rLimitCPU = iota // CPU time in sec
rLimitFsize // Maximum filesize
rLimitData // max data size
rLimitStack // max stack size
rLimitCore // max core file size
rLimitRss // max resident set size
rLimitNproc // max number of processes
rLimitNofile // max number of open files
rLimitMemlock // max locked-in-memory address space
rLimitAs // address space limit
rLimitLocks // maximum file locks held
rLimitSigpending // max number of pending signals
rLimitMsgqueue // maximum bytes in POSIX mqueues
rLimitNice // max nice prio allowed to raise to
rLimitRtprio // maximum realtime priority
rLimitRttime // timeout for RT tasks in us
)
// rlimitMap maps the rlimit type names accepted in a spec ("RLIMIT_*") to
// their numeric resource identifiers. strToRlimit looks names up here; a name
// that is missing from the map is rejected.
var rlimitMap = map[string]int{
"RLIMIT_CPU": rLimitCPU,
"RLIMIT_FSIZE": rLimitFsize,
"RLIMIT_DATA": rLimitData,
"RLIMIT_STACK": rLimitStack,
"RLIMIT_CORE": rLimitCore,
"RLIMIT_RSS": rLimitRss,
"RLIMIT_NPROC": rLimitNproc,
"RLIMIT_NOFILE": rLimitNofile,
"RLIMIT_MEMLOCK": rLimitMemlock,
"RLIMIT_AS": rLimitAs,
"RLIMIT_LOCKS": rLimitLocks,
"RLIMIT_SIGPENDING": rLimitSigpending,
"RLIMIT_MSGQUEUE": rLimitMsgqueue,
"RLIMIT_NICE": rLimitNice,
"RLIMIT_RTPRIO": rLimitRtprio,
"RLIMIT_RTTIME": rLimitRttime,
}
// strToRlimit returns the numeric resource identifier for the rlimit name
// key, or 0 and an error when key is not present in rlimitMap.
func strToRlimit(key string) (int, error) {
	if resource, known := rlimitMap[key]; known {
		return resource, nil
	}
	return 0, fmt.Errorf("wrong rlimit value: %s", key)
}
// createLibContainerRlimit converts a runtime-spec rlimit into its
// libcontainer form, resolving the type name through strToRlimit and copying
// the hard and soft limits. It returns a zero Rlimit and an error when the
// type name is unknown.
func createLibContainerRlimit(rlimit specs.POSIXRlimit) (configs.Rlimit, error) {
	var converted configs.Rlimit

	resource, err := strToRlimit(rlimit.Type)
	if err != nil {
		return converted, err
	}

	converted.Type = resource
	converted.Hard = rlimit.Hard
	converted.Soft = rlimit.Soft
	return converted, nil
}

129
container/runner.go Normal file
View file

@ -0,0 +1,129 @@
package container
import (
"fmt"
"os"
"github.com/opencontainers/runc/libcontainer"
specs "github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/sys/unix"
)
// runner bundles everything needed to start a process in a container and
// supervise it until it exits or is detached.
type runner struct {
// enableSubreaper is passed to newSignalHandler, which then marks this
// process as a subreaper.
enableSubreaper bool
// detach makes run return after the process has been started instead of
// waiting for it to exit.
detach bool
// shouldDestroy controls whether destroy actually destroys the container.
shouldDestroy bool
// consoleSocket is the path handed to setupIO for the detached-with-terminal
// case; it must be empty in every other case.
consoleSocket string
// pidFile, when non-empty, is where the process pid is written after start.
pidFile string
// container is the libcontainer container the process runs in.
container libcontainer.Container
// listenFDs are extra files passed to the process, announced through the
// LISTEN_FDS and LISTEN_PID environment variables.
listenFDs []*os.File
// notifySocket may be nil; it is handed to the signal handler.
notifySocket *notifySocket
}
// run starts config as a process in the runner's container and supervises it.
//
// It validates the terminal/console-socket combination, builds the
// libcontainer process, passes on any listen file descriptors, sets up stdio,
// runs the process, optionally writes the pid file and finally hands control
// to the signal handler. It returns the exit status reported by the signal
// handler, or -1 and an error. When detaching it returns 0 and a nil error
// once the signal handler returns.
//
// Every failure path destroys the container (subject to shouldDestroy); once
// the process has been started it is also killed and waited for first.
func (r *runner) run(config *specs.Process) (int, error) {
	// Check the terminal settings. The container already exists when run is
	// called, so it has to be destroyed on these failures as well.
	if r.detach && config.Terminal && r.consoleSocket == "" {
		r.destroy()
		return -1, fmt.Errorf("cannot allocate tty if runc will detach without setting console socket")
	}
	if (!r.detach || !config.Terminal) && r.consoleSocket != "" {
		r.destroy()
		return -1, fmt.Errorf("cannot use console socket if runc will not detach or allocate tty")
	}

	// Create the process.
	process, err := newProcess(*config)
	if err != nil {
		r.destroy()
		return -1, err
	}

	// Setup the listen file descriptors.
	if len(r.listenFDs) > 0 {
		process.Env = append(process.Env, fmt.Sprintf("LISTEN_FDS=%d", len(r.listenFDs)), "LISTEN_PID=1")
		process.ExtraFiles = append(process.ExtraFiles, r.listenFDs...)
	}

	// Get the rootuid.
	rootuid, err := r.container.Config().HostRootUID()
	if err != nil {
		r.destroy()
		return -1, err
	}

	// Get the rootgid.
	rootgid, err := r.container.Config().HostRootGID()
	if err != nil {
		r.destroy()
		return -1, err
	}

	// Setting up IO is a two stage process. We need to modify process to deal
	// with detaching containers, and then we get a tty after the container has
	// started.
	handler := newSignalHandler(r.enableSubreaper, r.notifySocket)
	tty, err := setupIO(process, rootuid, rootgid, config.Terminal, r.detach, r.consoleSocket)
	if err != nil {
		r.destroy()
		return -1, err
	}
	// From here on the tty is closed exactly once, on every return path,
	// after any terminate/destroy below has run.
	defer tty.Close()

	// Run the container.
	if err := r.container.Run(process); err != nil {
		r.destroy()
		return -1, err
	}

	// Wait for the tty.
	if err := tty.waitConsole(); err != nil {
		r.terminate(process)
		r.destroy()
		return -1, err
	}

	// Close the post-start descriptors now that the process has them.
	if err = tty.ClosePostStart(); err != nil {
		r.terminate(process)
		r.destroy()
		return -1, err
	}

	// Create the pid file.
	if r.pidFile != "" {
		if err := createPidFile(r.pidFile, process); err != nil {
			r.terminate(process)
			r.destroy()
			return -1, err
		}
	}

	// Forward signals until the process exits (or return at once if detached).
	status, err := handler.forward(process, tty, r.detach)
	if err != nil {
		r.terminate(process)
	}

	// Return early if we are detaching. The container is deliberately not
	// destroyed here.
	if r.detach {
		return 0, nil
	}

	// Cleanup.
	r.destroy()
	return status, err
}
// destroy destroys the runner's container, but only when shouldDestroy is
// set; otherwise it does nothing.
func (r *runner) destroy() {
	if !r.shouldDestroy {
		return
	}
	destroy(r.container)
}
// terminate sends SIGKILL to p and then waits for it. Both results are
// discarded: this is best-effort cleanup on a path that is already failing.
func (r *runner) terminate(p *libcontainer.Process) {
_ = p.Signal(unix.SIGKILL)
_, _ = p.Wait()
}

138
container/signals.go Normal file
View file

@ -0,0 +1,138 @@
package container
import (
"os"
"os/signal"
"syscall" // only for Signal
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/system"
"github.com/opencontainers/runc/libcontainer/utils"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
// signalBufferSize is the capacity of the channel on which newSignalHandler
// receives signals.
const signalBufferSize = 2048
// newSignalHandler returns a signal handler for processing SIGCHLD and
// SIGWINCH signals while still forwarding all other signals to the process.
// When enableSubreaper is set, the current process is first marked as a
// subreaper; a failure to do so is logged as a warning and otherwise ignored.
// If notifySocket is present, the handler uses it to read systemd
// notifications from the container and forward them to the host notify
// socket.
func newSignalHandler(enableSubreaper bool, notifySocket *notifySocket) *signalHandler {
	// Become the subreaper before any signal is registered for the container.
	if enableSubreaper {
		err := system.SetSubreaper(1)
		if err != nil {
			logrus.Warn(err)
		}
	}

	h := &signalHandler{
		// A large buffer so that signals are not missed when they arrive
		// faster than they are processed.
		signals:      make(chan os.Signal, signalBufferSize),
		notifySocket: notifySocket,
	}

	// No signal list: every incoming signal is delivered to the channel.
	signal.Notify(h.signals)
	return h
}
// exit models a process exit status with the pid and
// exit status.
type exit struct {
// pid is the process id returned by wait4.
pid int
// status is the exit status derived from the wait status via
// utils.ExitStatus.
status int
}
// signalHandler receives the signals delivered to this process and, in
// forward, resizes the tty, reaps children or forwards them to the container
// process.
type signalHandler struct {
// signals is the buffered channel registered with signal.Notify.
signals chan os.Signal
// notifySocket may be nil; when set, forward runs its relay loop and closes
// it once the main process has exited.
notifySocket *notifySocket
}
// forward handles the main signal event loop forwarding, resizing, or reaping depending
// on the signal received.
//
// When detach is set it does not enter the loop: without a notify socket it
// returns (0, nil) immediately; with one it runs the notify relay
// synchronously, passing the process pid, and then returns (0, nil).
// Otherwise it loops over incoming signals until the main process is reaped
// and returns that process's exit status. It returns -1 and an error when the
// pid of process cannot be determined.
func (h *signalHandler) forward(process *libcontainer.Process, tty *tty, detach bool) (int, error) {
// Nothing to supervise when detaching without a notify socket.
if detach && h.notifySocket == nil {
return 0, nil
}
// make sure we know the pid of our main process so that we can return
// after it dies.
pid1, err := process.Pid()
if err != nil {
return -1, err
}
if h.notifySocket != nil {
if detach {
// Blocks until the relay returns; pid1 lets it send MAINPID.
h.notifySocket.run(pid1)
return 0, nil
}
// Foreground: relay in the background and do not send a MAINPID.
go h.notifySocket.run(0)
}
// Perform the initial tty resize. Always ignore errors resizing because
// stdout might have disappeared (due to races with when SIGHUP is sent).
_ = tty.resize()
// Handle and forward signals.
for s := range h.signals {
switch s {
case unix.SIGWINCH:
// Ignore errors resizing, as above.
_ = tty.resize()
case unix.SIGCHLD:
// A reap error is only logged; whatever exits were returned are
// still processed.
exits, err := h.reap()
if err != nil {
logrus.Error(err)
}
for _, e := range exits {
logrus.WithFields(logrus.Fields{
"pid": e.pid,
"status": e.status,
}).Debug("process exited")
if e.pid == pid1 {
// call Wait() on the process even though we already have the exit
// status because we must ensure that any Go-specific process
// handling such as flushing pipes is complete before we return.
process.Wait()
if h.notifySocket != nil {
h.notifySocket.Close()
}
return e.status, nil
}
}
default:
// Every other signal is passed on to the main process.
logrus.Debugf("sending signal to process %s", s)
if err := unix.Kill(pid1, s.(syscall.Signal)); err != nil {
logrus.Error(err)
}
}
}
// Only reached if the signals channel is closed; nothing in this file
// closes it.
return -1, nil
}
// reap collects every child that has already exited. It calls wait4 without
// blocking (WNOHANG) for any child until there is nothing left to collect,
// and returns the pid and exit status of each one to the main event loop.
// Having no children at all (ECHILD) is not an error; any other wait4 error
// is returned together with a nil slice.
func (h *signalHandler) reap() (exits []exit, err error) {
	var (
		status unix.WaitStatus
		usage  unix.Rusage
	)
	for {
		pid, waitErr := unix.Wait4(-1, &status, unix.WNOHANG, &usage)
		switch {
		case waitErr == unix.ECHILD:
			return exits, nil
		case waitErr != nil:
			return nil, waitErr
		case pid <= 0:
			// No further child has changed state.
			return exits, nil
		}
		exits = append(exits, exit{
			pid:    pid,
			status: utils.ExitStatus(status),
		})
	}
}

163
container/tty.go Normal file
View file

@ -0,0 +1,163 @@
package container
import (
"fmt"
"io"
"os"
"os/signal"
"sync"
"github.com/containerd/console"
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/utils"
)
// tty tracks the console and stdio resources set up for a container process
// so that they can be closed and restored later.
type tty struct {
// epoller and console are set by recvtty once the console has been
// received; both are nil otherwise.
epoller *console.Epoller
console *console.EpollConsole
// stdin is this process's stdin wrapped as a console; recvtty puts it in raw
// mode and Close resets it.
stdin console.Console
// closers are closed by Close after all copy goroutines have finished.
closers []io.Closer
// postStart holds descriptors that are closed by ClosePostStart, and again
// by Close.
postStart []io.Closer
// wg counts the running copyIO goroutines.
wg sync.WaitGroup
// consoleC carries the result of recvtty; it is nil when no console is
// being received.
consoleC chan error
}
// copyIO copies everything from r to w, closes r and then marks one unit of
// t.wg as done. The caller must have incremented t.wg beforehand. Errors from
// copying and closing are ignored.
func (t *tty) copyIO(w io.Writer, r io.ReadCloser) {
	defer t.wg.Done()

	_, _ = io.Copy(w, r)
	_ = r.Close()
}
// setupProcessPipes sets up pipes for the process so that advanced features
// like c/r are able to easily checkpoint and restore the process's IO without
// depending on a host specific path or device.
//
// The parent ends of the pipes go into the returned tty's closers, and any of
// the process's own stdio that can be closed goes into postStart. This
// process's stdin is copied into the process, and the process's stdout and
// stderr are copied to this process's stdout and stderr, in background
// goroutines.
func setupProcessPipes(p *libcontainer.Process, rootuid, rootgid int) (*tty, error) {
	pipes, err := p.InitializeIO(rootuid, rootgid)
	if err != nil {
		return nil, err
	}

	t := &tty{
		closers: []io.Closer{pipes.Stdin, pipes.Stdout, pipes.Stderr},
	}

	// add the process's io to the post start closers if they support close
	for _, stream := range []interface{}{p.Stdin, p.Stdout, p.Stderr} {
		closer, ok := stream.(io.Closer)
		if !ok {
			continue
		}
		t.postStart = append(t.postStart, closer)
	}

	// stdin is not tracked by the wait group: Close does not wait for it.
	go func() {
		io.Copy(pipes.Stdin, os.Stdin)
		pipes.Stdin.Close()
	}()

	// One wait-group unit per output stream.
	t.wg.Add(2)
	go t.copyIO(os.Stdout, pipes.Stdout)
	go t.copyIO(os.Stderr, pipes.Stderr)

	return t, nil
}
// inheritStdio makes process use this process's stdin, stdout and stderr.
// It always returns nil.
func inheritStdio(process *libcontainer.Process) error {
	process.Stdin, process.Stdout, process.Stderr = os.Stdin, os.Stdout, os.Stderr
	return nil
}
// recvtty receives the console file descriptor over socket and connects it to
// this process's terminal: stdin is copied to the console, the console is
// copied to stdout, and stdin is switched to raw mode. On success the epoller,
// the console and the stdin console are stored on t so that Close and resize
// can use them.
//
// The process parameter is not used by this function.
//
// NOTE(review): the error returns below do not release what was already
// acquired (the received file, the epoller, the started goroutines) — confirm
// whether callers rely on process teardown for that.
func (t *tty) recvtty(process *libcontainer.Process, socket *os.File) error {
// Receive the console's file descriptor from the other end of the socket.
f, err := utils.RecvFd(socket)
if err != nil {
return err
}
cons, err := console.ConsoleFromFile(f)
if err != nil {
return err
}
// The result of clearing ONLCR is not checked.
console.ClearONLCR(cons.Fd())
epoller, err := console.NewEpoller()
if err != nil {
return err
}
epollConsole, err := epoller.Add(cons)
if err != nil {
return err
}
// Start the epoll loop and the two copy directions. Only the
// console-to-stdout copy is tracked by the wait group.
go epoller.Wait()
go io.Copy(epollConsole, os.Stdin)
t.wg.Add(1)
go t.copyIO(os.Stdout, epollConsole)
// set raw mode to stdin and also handle interrupt
stdin, err := console.ConsoleFromFile(os.Stdin)
if err != nil {
return err
}
if err := stdin.SetRaw(); err != nil {
return fmt.Errorf("failed to set the terminal from the stdin: %v", err)
}
go handleInterrupt(stdin)
// Publish the state only once everything has been set up.
t.epoller = epoller
t.stdin = stdin
t.console = epollConsole
t.closers = []io.Closer{epollConsole}
return nil
}
// handleInterrupt blocks until this process receives an interrupt signal,
// then resets the console c and exits the process with status 0.
func handleInterrupt(c console.Console) {
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)

	select {
	case <-interrupts:
	}

	c.Reset()
	os.Exit(0)
}
// waitConsole blocks until a result arrives on consoleC and returns it. It
// returns nil immediately when no console is being received (consoleC is
// nil).
func (t *tty) waitConsole() error {
	if t.consoleC == nil {
		return nil
	}
	return <-t.consoleC
}
// ClosePostStart closes any fds that are provided to the container and dup2'd
// so that we no longer have a copy in our process. Errors from the individual
// closes are ignored and nil is always returned.
func (t *tty) ClosePostStart() error {
	for i := range t.postStart {
		t.postStart[i].Close()
	}
	return nil
}
// Close closes all open fds for the tty and/or restores the original
// stdin state to what it was prior to the container execution. It waits for
// the copy goroutines to finish before closing the remaining closers. Errors
// from the individual steps are ignored and nil is always returned.
func (t *tty) Close() error {
	closeAll := func(closers []io.Closer) {
		for _, c := range closers {
			c.Close()
		}
	}

	// ensure that our side of the fds are always closed
	closeAll(t.postStart)

	// the process is gone at this point, shutting down the console if we have
	// one and wait for all IO to be finished
	if t.console != nil && t.epoller != nil {
		t.console.Shutdown(t.epoller.CloseConsole)
	}
	t.wg.Wait()

	closeAll(t.closers)

	if t.stdin != nil {
		t.stdin.Reset()
	}
	return nil
}
// resize resizes the container console to match the current console of this
// process. It does nothing and returns nil when there is no container
// console.
func (t *tty) resize() error {
	if t.console != nil {
		return t.console.ResizeFrom(console.Current())
	}
	return nil
}