diff --git a/cmd/containerd-shim/main.go b/cmd/containerd-shim/main.go index 8a45ad8..b7cc6d6 100644 --- a/cmd/containerd-shim/main.go +++ b/cmd/containerd-shim/main.go @@ -57,16 +57,20 @@ func main() { if err := setupRoot(); err != nil { return err } + path, err := os.Getwd() + if err != nil { + return err + } var ( server = grpc.NewServer() - sv = shim.New() + sv = shim.New(path) ) logrus.Debug("registering grpc server") shimapi.RegisterShimServer(server, sv) if err := serve(server, "shim.sock"); err != nil { return err } - return handleSignals(signals, server, sv) + return handleSignals(signals, server) } if err := app.Run(os.Args); err != nil { fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err) @@ -107,24 +111,14 @@ func serve(server *grpc.Server, path string) error { return nil } -func handleSignals(signals chan os.Signal, server *grpc.Server, service *shim.Service) error { +func handleSignals(signals chan os.Signal, server *grpc.Server) error { for s := range signals { logrus.WithField("signal", s).Debug("received signal") switch s { case syscall.SIGCHLD: - exits, err := reaper.Reap() - if err != nil { + if err := reaper.Reap(); err != nil { logrus.WithError(err).Error("reap exit status") } - for _, e := range exits { - logrus.WithFields(logrus.Fields{ - "status": e.Status, - "pid": e.Pid, - }).Debug("process exited") - if err := service.ProcessExit(e); err != nil { - return err - } - } case syscall.SIGTERM, syscall.SIGINT: // TODO: should we forward signals to the processes if they are still running? // i.e. machine reboot diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 0f8947a..b308573 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -409,7 +409,7 @@ func handleSignals(signals chan os.Signal, server *grpc.Server) error { log.G(global).WithField("signal", s).Debug("received signal") switch s { case syscall.SIGCHLD: - if _, err := reaper.Reap(); err != nil { + if err := reaper.Reap(); err != nil { log.G(global).WithError(err).Error("reap containerd processes") } default: diff --git a/linux/runtime.go b/linux/runtime.go index df4c292..ff9c396 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -36,6 +36,8 @@ func init() { type Config struct { // Runtime is a path or name of an OCI runtime used by the shim Runtime string `toml:"runtime"` + // NoShim calls runc directly from within the pkg + NoShim bool `toml:"no_shim"` } func New(ic *plugin.InitContext) (interface{}, error) { @@ -50,6 +52,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { c, cancel := context.WithCancel(ic.Context) return &Runtime{ root: path, + remote: !cfg.NoShim, runtime: cfg.Runtime, events: make(chan *containerd.Event, 2048), eventsContext: c, @@ -61,6 +64,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { type Runtime struct { root string runtime string + remote bool events chan *containerd.Event eventsContext context.Context @@ -73,7 +77,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts containerd.CreateO if err != nil { return nil, err } - s, err := newShim(path) + s, err := newShim(path, r.remote) if err != nil { os.RemoveAll(path) return nil, err @@ -217,7 +221,7 @@ func (r *Runtime) deleteBundle(id string) error { func (r *Runtime) loadContainer(path string) (*Container, error) { id := filepath.Base(path) - s, err := loadShim(path) + s, err := loadShim(path, r.remote) if err != nil { return nil, err } diff --git a/linux/shim.go b/linux/shim.go index a011796..62c1b94 100644 --- a/linux/shim.go +++ b/linux/shim.go @@ -14,12 +14,16 @@ import ( "google.golang.org/grpc/grpclog" "github.com/docker/containerd/api/services/shim" + localShim "github.com/docker/containerd/linux/shim" "github.com/docker/containerd/reaper" "github.com/docker/containerd/utils" "github.com/pkg/errors" ) -func newShim(path string) (shim.ShimClient, error) { +func newShim(path string, remote bool) (shim.ShimClient, error) { + if !remote { + return localShim.Client(path), nil + } socket := filepath.Join(path, "shim.sock") l, err := utils.CreateUnixSocket(socket) if err != nil { @@ -48,7 +52,10 @@ func newShim(path string) (shim.ShimClient, error) { return connectShim(socket) } -func loadShim(path string) (shim.ShimClient, error) { +func loadShim(path string, remote bool) (shim.ShimClient, error) { + if !remote { + return localShim.Client(path), nil + } socket := filepath.Join(path, "shim.sock") return connectShim(socket) // TODO: failed to connect to the shim, check if it's alive diff --git a/linux/shim/client.go b/linux/shim/client.go new file mode 100644 index 0000000..2e190aa --- /dev/null +++ b/linux/shim/client.go @@ -0,0 +1,105 @@ +package shim + +import ( + "path/filepath" + "syscall" + + shimapi "github.com/docker/containerd/api/services/shim" + "github.com/docker/containerd/api/types/container" + google_protobuf "github.com/golang/protobuf/ptypes/empty" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +func Client(path string) shimapi.ShimClient { + return &client{ + s: New(path), + } +} + +type client struct { + s *Service +} + +func (c *client) Create(ctx context.Context, in *shimapi.CreateRequest, opts ...grpc.CallOption) (*shimapi.CreateResponse, error) { + return c.s.Create(ctx, in) +} + +func (c *client) Start(ctx context.Context, in *shimapi.StartRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + return c.s.Start(ctx, in) +} + +func (c *client) Delete(ctx context.Context, in *shimapi.DeleteRequest, opts ...grpc.CallOption) (*shimapi.DeleteResponse, error) { + return c.s.Delete(ctx, in) +} + +func (c *client) Exec(ctx context.Context, in *shimapi.ExecRequest, opts ...grpc.CallOption) (*shimapi.ExecResponse, error) { + return c.s.Exec(ctx, in) +} + +func (c *client) Pty(ctx context.Context, in *shimapi.PtyRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + return c.s.Pty(ctx, in) +} + +func (c *client) Events(ctx context.Context, in *shimapi.EventsRequest, opts ...grpc.CallOption) (shimapi.Shim_EventsClient, error) { + return &events{ + c: c.s.events, + ctx: ctx, + }, nil +} + +func (c *client) State(ctx context.Context, in *shimapi.StateRequest, opts ...grpc.CallOption) (*shimapi.StateResponse, error) { + return c.s.State(ctx, in) +} + +func (c *client) Pause(ctx context.Context, in *shimapi.PauseRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + return c.s.Pause(ctx, in) +} + +func (c *client) Resume(ctx context.Context, in *shimapi.ResumeRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + return c.s.Resume(ctx, in) +} + +func (c *client) Exit(ctx context.Context, in *shimapi.ExitRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + // don't exit the calling process for the client + // but make sure we unmount the containers rootfs for this client + if err := syscall.Unmount(filepath.Join(c.s.path, "rootfs"), 0); err != nil { + return nil, err + } + return empty, nil +} + +type events struct { + c chan *container.Event + ctx context.Context +} + +func (e *events) Recv() (*container.Event, error) { + ev := <-e.c + return ev, nil +} + +func (e *events) Header() (metadata.MD, error) { + return nil, nil +} + +func (e *events) Trailer() metadata.MD { + return nil +} + +func (e *events) CloseSend() error { + return nil +} + +func (e *events) Context() context.Context { + return e.ctx +} + +func (e *events) SendMsg(m interface{}) error { + return nil +} + +func (e *events) RecvMsg(m interface{}) error { + return nil +} diff --git a/linux/shim/exec.go b/linux/shim/exec.go index 0a94ce1..4b45273 100644 --- a/linux/shim/exec.go +++ b/linux/shim/exec.go @@ -26,22 +26,19 @@ type execProcess struct { parent *initProcess } -func newExecProcess(context context.Context, r *shimapi.ExecRequest, parent *initProcess, id int) (process, error) { - cwd, err := os.Getwd() - if err != nil { - return nil, err - } +func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest, parent *initProcess, id int) (process, error) { e := &execProcess{ id: id, parent: parent, } var ( + err error socket *runc.ConsoleSocket io runc.IO - pidfile = filepath.Join(cwd, fmt.Sprintf("%d.pid", id)) + pidfile = filepath.Join(path, fmt.Sprintf("%d.pid", id)) ) if r.Terminal { - if socket, err = runc.NewConsoleSocket(filepath.Join(cwd, "pty.sock")); err != nil { + if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil { return nil, err } defer os.Remove(socket.Path()) diff --git a/linux/shim/init.go b/linux/shim/init.go index e7b9347..aba5c60 100644 --- a/linux/shim/init.go +++ b/linux/shim/init.go @@ -25,24 +25,20 @@ type initProcess struct { pid int } -func newInitProcess(context context.Context, r *shimapi.CreateRequest) (*initProcess, error) { - cwd, err := os.Getwd() - if err != nil { - return nil, err - } +func newInitProcess(context context.Context, path string, r *shimapi.CreateRequest) (*initProcess, error) { for _, rm := range r.Rootfs { m := &containerd.Mount{ Type: rm.Type, Source: rm.Source, Options: rm.Options, } - if err := m.Mount(filepath.Join(cwd, "rootfs")); err != nil { + if err := m.Mount(filepath.Join(path, "rootfs")); err != nil { return nil, err } } runtime := &runc.Runc{ Command: r.Runtime, - Log: filepath.Join(cwd, "log.json"), + Log: filepath.Join(path, "log.json"), LogFormat: runc.JSON, PdeathSignal: syscall.SIGKILL, } @@ -52,11 +48,12 @@ func newInitProcess(context context.Context, r *shimapi.CreateRequest) (*initPro runc: runtime, } var ( + err error socket *runc.ConsoleSocket io runc.IO ) if r.Terminal { - if socket, err = runc.NewConsoleSocket(filepath.Join(cwd, "pty.sock")); err != nil { + if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil { return nil, err } defer os.Remove(socket.Path()) @@ -68,7 +65,7 @@ func newInitProcess(context context.Context, r *shimapi.CreateRequest) (*initPro p.io = io } opts := &runc.CreateOpts{ - PidFile: filepath.Join(cwd, "init.pid"), + PidFile: filepath.Join(path, "init.pid"), ConsoleSocket: socket, IO: io, NoPivot: r.NoPivot, diff --git a/linux/shim/service.go b/linux/shim/service.go index 4cc8fd0..4d7d61c 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -5,10 +5,11 @@ import ( "sync" "syscall" + "github.com/Sirupsen/logrus" "github.com/crosbymichael/console" shimapi "github.com/docker/containerd/api/services/shim" "github.com/docker/containerd/api/types/container" - "github.com/docker/containerd/utils" + "github.com/docker/containerd/reaper" google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "golang.org/x/net/context" @@ -17,8 +18,9 @@ import ( var empty = &google_protobuf.Empty{} // New returns a new shim service that can be used via GRPC -func New() *Service { +func New(path string) *Service { return &Service{ + path: path, processes: make(map[int]process), events: make(chan *container.Event, 4096), } @@ -26,6 +28,7 @@ func New() *Service { type Service struct { initProcess *initProcess + path string id string bundle string mu sync.Mutex @@ -35,7 +38,7 @@ type Service struct { } func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimapi.CreateResponse, error) { - process, err := newInitProcess(ctx, r) + process, err := newInitProcess(ctx, s.path, r) if err != nil { return nil, err } @@ -46,6 +49,23 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimap pid := process.Pid() s.processes[pid] = process s.mu.Unlock() + cmd := &reaper.Cmd{ + ExitCh: make(chan int, 1), + } + reaper.Default.Register(pid, cmd) + go func() { + status, err := reaper.Default.WaitPid(pid) + if err != nil { + logrus.WithError(err).Error("waitpid") + } + process.Exited(status) + s.events <- &container.Event{ + Type: container.Event_EXIT, + ID: s.id, + Pid: uint32(pid), + ExitStatus: uint32(status), + } + }() s.events <- &container.Event{ Type: container.Event_CREATE, ID: r.ID, @@ -90,7 +110,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecRequest) (*shimapi.Ex s.mu.Lock() defer s.mu.Unlock() s.execID++ - process, err := newExecProcess(ctx, r, s.initProcess, s.execID) + process, err := newExecProcess(ctx, s.path, r, s.initProcess, s.execID) if err != nil { return nil, err } @@ -197,18 +217,3 @@ func (s *Service) Exit(ctx context.Context, r *shimapi.ExitRequest) (*google_pro } return empty, nil } - -func (s *Service) ProcessExit(e utils.Exit) error { - s.mu.Lock() - if p, ok := s.processes[e.Pid]; ok { - p.Exited(e.Status) - s.events <- &container.Event{ - Type: container.Event_EXIT, - ID: s.id, - Pid: uint32(p.Pid()), - ExitStatus: uint32(e.Status), - } - } - s.mu.Unlock() - return nil -} diff --git a/reaper/reaper.go b/reaper/reaper.go index 38894fe..94e278e 100644 --- a/reaper/reaper.go +++ b/reaper/reaper.go @@ -11,33 +11,35 @@ import ( // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels -func Reap() ([]utils.Exit, error) { +func Reap() error { exits, err := utils.Reap(false) for _, e := range exits { - Default.mu.Lock() + Default.Lock() c, ok := Default.cmds[e.Pid] - Default.mu.Unlock() + Default.Unlock() if !ok { continue } - // after we get an exit, call wait on the go process to make sure all - // pipes are closed and finalizers are run on the process - c.c.Wait() - c.exitCh <- e.Status - Default.mu.Lock() + if c.c != nil { + // after we get an exit, call wait on the go process to make sure all + // pipes are closed and finalizers are run on the process + c.c.Wait() + } + c.ExitCh <- e.Status + Default.Lock() delete(Default.cmds, e.Pid) - Default.mu.Unlock() + Default.Unlock() } - return exits, err + return err } var Default = &Monitor{ - cmds: make(map[int]*cmd), + cmds: make(map[int]*Cmd), } type Monitor struct { - mu sync.Mutex - cmds map[int]*cmd + sync.Mutex + cmds map[int]*Cmd } func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) { @@ -61,18 +63,18 @@ func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) { // Start starts the command a registers the process with the reaper func (m *Monitor) Start(c *exec.Cmd) error { - rc := &cmd{ + rc := &Cmd{ c: c, - exitCh: make(chan int, 1), + ExitCh: make(chan int, 1), } - m.mu.Lock() + m.Lock() // start the process if err := rc.c.Start(); err != nil { - m.mu.Unlock() + m.Unlock() return err } m.cmds[rc.c.Process.Pid] = rc - m.mu.Unlock() + m.Unlock() return nil } @@ -86,16 +88,26 @@ func (m *Monitor) Run(c *exec.Cmd) error { } func (m *Monitor) Wait(c *exec.Cmd) (int, error) { - m.mu.Lock() - rc, ok := m.cmds[c.Process.Pid] - m.mu.Unlock() + return m.WaitPid(c.Process.Pid) +} + +func (m *Monitor) Register(pid int, c *Cmd) { + m.Lock() + m.cmds[pid] = c + m.Unlock() +} + +func (m *Monitor) WaitPid(pid int) (int, error) { + m.Lock() + rc, ok := m.cmds[pid] + m.Unlock() if !ok { return 255, fmt.Errorf("process does not exist") } - return <-rc.exitCh, nil + return <-rc.ExitCh, nil } -type cmd struct { +type Cmd struct { c *exec.Cmd - exitCh chan int + ExitCh chan int }