Add no_shim
config for not running with a shim
This reuses the exiting shim code and services to let containerd run as the reaper for all container processes without the use of a shim. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
a160a6a068
commit
1d7fa45403
9 changed files with 199 additions and 78 deletions
cmd
linux
reaper
|
@ -57,16 +57,20 @@ func main() {
|
|||
if err := setupRoot(); err != nil {
|
||||
return err
|
||||
}
|
||||
path, err := os.Getwd()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var (
|
||||
server = grpc.NewServer()
|
||||
sv = shim.New()
|
||||
sv = shim.New(path)
|
||||
)
|
||||
logrus.Debug("registering grpc server")
|
||||
shimapi.RegisterShimServer(server, sv)
|
||||
if err := serve(server, "shim.sock"); err != nil {
|
||||
return err
|
||||
}
|
||||
return handleSignals(signals, server, sv)
|
||||
return handleSignals(signals, server)
|
||||
}
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
|
||||
|
@ -107,24 +111,14 @@ func serve(server *grpc.Server, path string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func handleSignals(signals chan os.Signal, server *grpc.Server, service *shim.Service) error {
|
||||
func handleSignals(signals chan os.Signal, server *grpc.Server) error {
|
||||
for s := range signals {
|
||||
logrus.WithField("signal", s).Debug("received signal")
|
||||
switch s {
|
||||
case syscall.SIGCHLD:
|
||||
exits, err := reaper.Reap()
|
||||
if err != nil {
|
||||
if err := reaper.Reap(); err != nil {
|
||||
logrus.WithError(err).Error("reap exit status")
|
||||
}
|
||||
for _, e := range exits {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"status": e.Status,
|
||||
"pid": e.Pid,
|
||||
}).Debug("process exited")
|
||||
if err := service.ProcessExit(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case syscall.SIGTERM, syscall.SIGINT:
|
||||
// TODO: should we forward signals to the processes if they are still running?
|
||||
// i.e. machine reboot
|
||||
|
|
|
@ -372,7 +372,7 @@ func handleSignals(signals chan os.Signal, server *grpc.Server) error {
|
|||
log.G(global).WithField("signal", s).Debug("received signal")
|
||||
switch s {
|
||||
case syscall.SIGCHLD:
|
||||
if _, err := reaper.Reap(); err != nil {
|
||||
if err := reaper.Reap(); err != nil {
|
||||
log.G(global).WithError(err).Error("reap containerd processes")
|
||||
}
|
||||
default:
|
||||
|
|
|
@ -36,6 +36,8 @@ func init() {
|
|||
type Config struct {
|
||||
// Runtime is a path or name of an OCI runtime used by the shim
|
||||
Runtime string `toml:"runtime"`
|
||||
// NoShim calls runc directly from within the pkg
|
||||
NoShim bool `toml:"no_shim"`
|
||||
}
|
||||
|
||||
func New(ic *plugin.InitContext) (interface{}, error) {
|
||||
|
@ -50,6 +52,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||
c, cancel := context.WithCancel(ic.Context)
|
||||
return &Runtime{
|
||||
root: path,
|
||||
remote: !cfg.NoShim,
|
||||
runtime: cfg.Runtime,
|
||||
events: make(chan *containerd.Event, 2048),
|
||||
eventsContext: c,
|
||||
|
@ -60,6 +63,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||
type Runtime struct {
|
||||
root string
|
||||
runtime string
|
||||
remote bool
|
||||
|
||||
events chan *containerd.Event
|
||||
eventsContext context.Context
|
||||
|
@ -71,7 +75,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts containerd.CreateO
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s, err := newShim(path)
|
||||
s, err := newShim(path, r.remote)
|
||||
if err != nil {
|
||||
os.RemoveAll(path)
|
||||
return nil, err
|
||||
|
@ -205,7 +209,7 @@ func (r *Runtime) deleteBundle(id string) error {
|
|||
|
||||
func (r *Runtime) loadContainer(path string) (*Container, error) {
|
||||
id := filepath.Base(path)
|
||||
s, err := loadShim(path)
|
||||
s, err := loadShim(path, r.remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -14,12 +14,16 @@ import (
|
|||
"google.golang.org/grpc/grpclog"
|
||||
|
||||
"github.com/docker/containerd/api/services/shim"
|
||||
localShim "github.com/docker/containerd/linux/shim"
|
||||
"github.com/docker/containerd/reaper"
|
||||
"github.com/docker/containerd/utils"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func newShim(path string) (shim.ShimClient, error) {
|
||||
func newShim(path string, remote bool) (shim.ShimClient, error) {
|
||||
if !remote {
|
||||
return localShim.Client(path), nil
|
||||
}
|
||||
socket := filepath.Join(path, "shim.sock")
|
||||
l, err := utils.CreateUnixSocket(socket)
|
||||
if err != nil {
|
||||
|
@ -48,7 +52,10 @@ func newShim(path string) (shim.ShimClient, error) {
|
|||
return connectShim(socket)
|
||||
}
|
||||
|
||||
func loadShim(path string) (shim.ShimClient, error) {
|
||||
func loadShim(path string, remote bool) (shim.ShimClient, error) {
|
||||
if !remote {
|
||||
return localShim.Client(path), nil
|
||||
}
|
||||
socket := filepath.Join(path, "shim.sock")
|
||||
return connectShim(socket)
|
||||
// TODO: failed to connect to the shim, check if it's alive
|
||||
|
|
105
linux/shim/client.go
Normal file
105
linux/shim/client.go
Normal file
|
@ -0,0 +1,105 @@
|
|||
package shim
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
|
||||
shimapi "github.com/docker/containerd/api/services/shim"
|
||||
"github.com/docker/containerd/api/types/container"
|
||||
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func Client(path string) shimapi.ShimClient {
|
||||
return &client{
|
||||
s: New(path),
|
||||
}
|
||||
}
|
||||
|
||||
type client struct {
|
||||
s *Service
|
||||
}
|
||||
|
||||
func (c *client) Create(ctx context.Context, in *shimapi.CreateRequest, opts ...grpc.CallOption) (*shimapi.CreateResponse, error) {
|
||||
return c.s.Create(ctx, in)
|
||||
}
|
||||
|
||||
func (c *client) Start(ctx context.Context, in *shimapi.StartRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
|
||||
return c.s.Start(ctx, in)
|
||||
}
|
||||
|
||||
func (c *client) Delete(ctx context.Context, in *shimapi.DeleteRequest, opts ...grpc.CallOption) (*shimapi.DeleteResponse, error) {
|
||||
return c.s.Delete(ctx, in)
|
||||
}
|
||||
|
||||
func (c *client) Exec(ctx context.Context, in *shimapi.ExecRequest, opts ...grpc.CallOption) (*shimapi.ExecResponse, error) {
|
||||
return c.s.Exec(ctx, in)
|
||||
}
|
||||
|
||||
func (c *client) Pty(ctx context.Context, in *shimapi.PtyRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
|
||||
return c.s.Pty(ctx, in)
|
||||
}
|
||||
|
||||
func (c *client) Events(ctx context.Context, in *shimapi.EventsRequest, opts ...grpc.CallOption) (shimapi.Shim_EventsClient, error) {
|
||||
return &events{
|
||||
c: c.s.events,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *client) State(ctx context.Context, in *shimapi.StateRequest, opts ...grpc.CallOption) (*shimapi.StateResponse, error) {
|
||||
return c.s.State(ctx, in)
|
||||
}
|
||||
|
||||
func (c *client) Pause(ctx context.Context, in *shimapi.PauseRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
|
||||
return c.s.Pause(ctx, in)
|
||||
}
|
||||
|
||||
func (c *client) Resume(ctx context.Context, in *shimapi.ResumeRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
|
||||
return c.s.Resume(ctx, in)
|
||||
}
|
||||
|
||||
func (c *client) Exit(ctx context.Context, in *shimapi.ExitRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
|
||||
// don't exit the calling process for the client
|
||||
// but make sure we unmount the containers rootfs for this client
|
||||
if err := syscall.Unmount(filepath.Join(c.s.path, "rootfs"), 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
type events struct {
|
||||
c chan *container.Event
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (e *events) Recv() (*container.Event, error) {
|
||||
ev := <-e.c
|
||||
return ev, nil
|
||||
}
|
||||
|
||||
func (e *events) Header() (metadata.MD, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (e *events) Trailer() metadata.MD {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *events) CloseSend() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *events) Context() context.Context {
|
||||
return e.ctx
|
||||
}
|
||||
|
||||
func (e *events) SendMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *events) RecvMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
|
@ -25,22 +25,19 @@ type execProcess struct {
|
|||
parent *initProcess
|
||||
}
|
||||
|
||||
func newExecProcess(context context.Context, r *shimapi.ExecRequest, parent *initProcess, id int) (process, error) {
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest, parent *initProcess, id int) (process, error) {
|
||||
e := &execProcess{
|
||||
id: id,
|
||||
parent: parent,
|
||||
}
|
||||
var (
|
||||
err error
|
||||
socket *runc.ConsoleSocket
|
||||
io runc.IO
|
||||
pidfile = filepath.Join(cwd, fmt.Sprintf("%d.pid", id))
|
||||
pidfile = filepath.Join(path, fmt.Sprintf("%d.pid", id))
|
||||
)
|
||||
if r.Terminal {
|
||||
if socket, err = runc.NewConsoleSocket(filepath.Join(cwd, "pty.sock")); err != nil {
|
||||
if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer os.Remove(socket.Path())
|
||||
|
|
|
@ -25,24 +25,20 @@ type initProcess struct {
|
|||
pid int
|
||||
}
|
||||
|
||||
func newInitProcess(context context.Context, r *shimapi.CreateRequest) (*initProcess, error) {
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func newInitProcess(context context.Context, path string, r *shimapi.CreateRequest) (*initProcess, error) {
|
||||
for _, rm := range r.Rootfs {
|
||||
m := &containerd.Mount{
|
||||
Type: rm.Type,
|
||||
Source: rm.Source,
|
||||
Options: rm.Options,
|
||||
}
|
||||
if err := m.Mount(filepath.Join(cwd, "rootfs")); err != nil {
|
||||
if err := m.Mount(filepath.Join(path, "rootfs")); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
runtime := &runc.Runc{
|
||||
Command: r.Runtime,
|
||||
Log: filepath.Join(cwd, "log.json"),
|
||||
Log: filepath.Join(path, "log.json"),
|
||||
LogFormat: runc.JSON,
|
||||
PdeathSignal: syscall.SIGKILL,
|
||||
}
|
||||
|
@ -52,11 +48,12 @@ func newInitProcess(context context.Context, r *shimapi.CreateRequest) (*initPro
|
|||
runc: runtime,
|
||||
}
|
||||
var (
|
||||
err error
|
||||
socket *runc.ConsoleSocket
|
||||
io runc.IO
|
||||
)
|
||||
if r.Terminal {
|
||||
if socket, err = runc.NewConsoleSocket(filepath.Join(cwd, "pty.sock")); err != nil {
|
||||
if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer os.Remove(socket.Path())
|
||||
|
@ -68,7 +65,7 @@ func newInitProcess(context context.Context, r *shimapi.CreateRequest) (*initPro
|
|||
p.io = io
|
||||
}
|
||||
opts := &runc.CreateOpts{
|
||||
PidFile: filepath.Join(cwd, "init.pid"),
|
||||
PidFile: filepath.Join(path, "init.pid"),
|
||||
ConsoleSocket: socket,
|
||||
IO: io,
|
||||
NoPivot: r.NoPivot,
|
||||
|
|
|
@ -5,10 +5,11 @@ import (
|
|||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/crosbymichael/console"
|
||||
shimapi "github.com/docker/containerd/api/services/shim"
|
||||
"github.com/docker/containerd/api/types/container"
|
||||
"github.com/docker/containerd/utils"
|
||||
"github.com/docker/containerd/reaper"
|
||||
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -17,8 +18,9 @@ import (
|
|||
var empty = &google_protobuf.Empty{}
|
||||
|
||||
// New returns a new shim service that can be used via GRPC
|
||||
func New() *Service {
|
||||
func New(path string) *Service {
|
||||
return &Service{
|
||||
path: path,
|
||||
processes: make(map[int]process),
|
||||
events: make(chan *container.Event, 4096),
|
||||
}
|
||||
|
@ -26,6 +28,7 @@ func New() *Service {
|
|||
|
||||
type Service struct {
|
||||
initProcess *initProcess
|
||||
path string
|
||||
id string
|
||||
bundle string
|
||||
mu sync.Mutex
|
||||
|
@ -35,7 +38,7 @@ type Service struct {
|
|||
}
|
||||
|
||||
func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimapi.CreateResponse, error) {
|
||||
process, err := newInitProcess(ctx, r)
|
||||
process, err := newInitProcess(ctx, s.path, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -46,6 +49,23 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimap
|
|||
pid := process.Pid()
|
||||
s.processes[pid] = process
|
||||
s.mu.Unlock()
|
||||
cmd := &reaper.Cmd{
|
||||
ExitCh: make(chan int, 1),
|
||||
}
|
||||
reaper.Default.Register(pid, cmd)
|
||||
go func() {
|
||||
status, err := reaper.Default.WaitPid(pid)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("waitpid")
|
||||
}
|
||||
process.Exited(status)
|
||||
s.events <- &container.Event{
|
||||
Type: container.Event_EXIT,
|
||||
ID: s.id,
|
||||
Pid: uint32(pid),
|
||||
ExitStatus: uint32(status),
|
||||
}
|
||||
}()
|
||||
s.events <- &container.Event{
|
||||
Type: container.Event_CREATE,
|
||||
ID: r.ID,
|
||||
|
@ -90,7 +110,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecRequest) (*shimapi.Ex
|
|||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.execID++
|
||||
process, err := newExecProcess(ctx, r, s.initProcess, s.execID)
|
||||
process, err := newExecProcess(ctx, s.path, r, s.initProcess, s.execID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -197,18 +217,3 @@ func (s *Service) Exit(ctx context.Context, r *shimapi.ExitRequest) (*google_pro
|
|||
}
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
func (s *Service) ProcessExit(e utils.Exit) error {
|
||||
s.mu.Lock()
|
||||
if p, ok := s.processes[e.Pid]; ok {
|
||||
p.Exited(e.Status)
|
||||
s.events <- &container.Event{
|
||||
Type: container.Event_EXIT,
|
||||
ID: s.id,
|
||||
Pid: uint32(p.Pid()),
|
||||
ExitStatus: uint32(e.Status),
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -11,33 +11,35 @@ import (
|
|||
|
||||
// Reap should be called when the process receives an SIGCHLD. Reap will reap
|
||||
// all exited processes and close their wait channels
|
||||
func Reap() ([]utils.Exit, error) {
|
||||
func Reap() error {
|
||||
exits, err := utils.Reap(false)
|
||||
for _, e := range exits {
|
||||
Default.mu.Lock()
|
||||
Default.Lock()
|
||||
c, ok := Default.cmds[e.Pid]
|
||||
Default.mu.Unlock()
|
||||
Default.Unlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if c.c != nil {
|
||||
// after we get an exit, call wait on the go process to make sure all
|
||||
// pipes are closed and finalizers are run on the process
|
||||
c.c.Wait()
|
||||
c.exitCh <- e.Status
|
||||
Default.mu.Lock()
|
||||
delete(Default.cmds, e.Pid)
|
||||
Default.mu.Unlock()
|
||||
}
|
||||
return exits, err
|
||||
c.ExitCh <- e.Status
|
||||
Default.Lock()
|
||||
delete(Default.cmds, e.Pid)
|
||||
Default.Unlock()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var Default = &Monitor{
|
||||
cmds: make(map[int]*cmd),
|
||||
cmds: make(map[int]*Cmd),
|
||||
}
|
||||
|
||||
type Monitor struct {
|
||||
mu sync.Mutex
|
||||
cmds map[int]*cmd
|
||||
sync.Mutex
|
||||
cmds map[int]*Cmd
|
||||
}
|
||||
|
||||
func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) {
|
||||
|
@ -61,18 +63,18 @@ func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) {
|
|||
|
||||
// Start starts the command a registers the process with the reaper
|
||||
func (m *Monitor) Start(c *exec.Cmd) error {
|
||||
rc := &cmd{
|
||||
rc := &Cmd{
|
||||
c: c,
|
||||
exitCh: make(chan int, 1),
|
||||
ExitCh: make(chan int, 1),
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.Lock()
|
||||
// start the process
|
||||
if err := rc.c.Start(); err != nil {
|
||||
m.mu.Unlock()
|
||||
m.Unlock()
|
||||
return err
|
||||
}
|
||||
m.cmds[rc.c.Process.Pid] = rc
|
||||
m.mu.Unlock()
|
||||
m.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -86,16 +88,26 @@ func (m *Monitor) Run(c *exec.Cmd) error {
|
|||
}
|
||||
|
||||
func (m *Monitor) Wait(c *exec.Cmd) (int, error) {
|
||||
m.mu.Lock()
|
||||
rc, ok := m.cmds[c.Process.Pid]
|
||||
m.mu.Unlock()
|
||||
return m.WaitPid(c.Process.Pid)
|
||||
}
|
||||
|
||||
func (m *Monitor) Register(pid int, c *Cmd) {
|
||||
m.Lock()
|
||||
m.cmds[pid] = c
|
||||
m.Unlock()
|
||||
}
|
||||
|
||||
func (m *Monitor) WaitPid(pid int) (int, error) {
|
||||
m.Lock()
|
||||
rc, ok := m.cmds[pid]
|
||||
m.Unlock()
|
||||
if !ok {
|
||||
return 255, fmt.Errorf("process does not exist")
|
||||
}
|
||||
return <-rc.exitCh, nil
|
||||
return <-rc.ExitCh, nil
|
||||
}
|
||||
|
||||
type cmd struct {
|
||||
type Cmd struct {
|
||||
c *exec.Cmd
|
||||
exitCh chan int
|
||||
ExitCh chan int
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue