Update shim for exec
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
6808dbc02f
commit
835f3b6a97
37 changed files with 786 additions and 709 deletions
|
@ -1,5 +1,7 @@
|
|||
package supervisor
|
||||
|
||||
import "time"
|
||||
|
||||
type AddProcessEvent struct {
|
||||
s *Supervisor
|
||||
}
|
||||
|
@ -7,32 +9,23 @@ type AddProcessEvent struct {
|
|||
// TODO: add this to worker for concurrent starts??? maybe not because of races where the container
|
||||
// could be stopped and removed...
|
||||
func (h *AddProcessEvent) Handle(e *Event) error {
|
||||
/*
|
||||
start := time.Now()
|
||||
ci, ok := h.s.containers[e.ID]
|
||||
if !ok {
|
||||
return ErrContainerNotFound
|
||||
}
|
||||
p, io, err := h.s.runtime.StartProcess(ci.container, *e.Process, e.Console)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if e.Pid, err = p.Pid(); err != nil {
|
||||
return err
|
||||
}
|
||||
h.s.processes[e.Pid] = &containerInfo{
|
||||
container: ci.container,
|
||||
}
|
||||
l, err := h.s.copyIO(e.Stdin, e.Stdout, e.Stderr, io)
|
||||
if err != nil {
|
||||
// log the error but continue with the other commands
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"error": err,
|
||||
"id": e.ID,
|
||||
}).Error("log stdio")
|
||||
}
|
||||
h.s.processes[e.Pid].copier = l
|
||||
ExecProcessTimer.UpdateSince(start)
|
||||
*/
|
||||
start := time.Now()
|
||||
ci, ok := h.s.containers[e.ID]
|
||||
if !ok {
|
||||
return ErrContainerNotFound
|
||||
}
|
||||
process, err := ci.container.Exec(e.Pid, *e.ProcessSpec)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.s.monitorProcess(process); err != nil {
|
||||
return err
|
||||
}
|
||||
ExecProcessTimer.UpdateSince(start)
|
||||
e.StartResponse <- StartResponse{
|
||||
Stdin: process.Stdin(),
|
||||
Stdout: process.Stdout(),
|
||||
Stderr: process.Stderr(),
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -17,11 +17,6 @@ func (h *DeleteEvent) Handle(e *Event) error {
|
|||
if err := h.deleteContainer(i.container); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: deleting container")
|
||||
}
|
||||
if i.copier != nil {
|
||||
if err := i.copier.Close(); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: close container copier")
|
||||
}
|
||||
}
|
||||
h.s.notifySubscribers(&Event{
|
||||
Type: ExitEventType,
|
||||
ID: e.ID,
|
||||
|
|
|
@ -25,7 +25,9 @@ func (h *ExitEvent) Handle(e *Event) error {
|
|||
if proc.ID() != runtime.InitProcessID {
|
||||
ne := NewEvent(ExecExitEventType)
|
||||
ne.ID = proc.Container().ID()
|
||||
ne.Pid = proc.ID()
|
||||
ne.Status = status
|
||||
ne.Process = proc
|
||||
h.s.SendEvent(ne)
|
||||
|
||||
return nil
|
||||
|
@ -51,17 +53,11 @@ type ExecExitEvent struct {
|
|||
}
|
||||
|
||||
func (h *ExecExitEvent) Handle(e *Event) error {
|
||||
container := e.Process.Container()
|
||||
// exec process: we remove this process without notifying the main event loop
|
||||
/*
|
||||
info := h.s.processes[e.Pid]
|
||||
if err := info.container.RemoveProcess(e.Pid); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: find container for pid")
|
||||
}
|
||||
if err := info.copier.Close(); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: close process IO")
|
||||
}
|
||||
delete(h.s.processes, e.Pid)
|
||||
h.s.notifySubscribers(e)
|
||||
*/
|
||||
if err := container.RemoveProcess(e.Pid); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: find container for pid")
|
||||
}
|
||||
h.s.notifySubscribers(e)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,66 +0,0 @@
|
|||
package supervisor
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
type ioConfig struct {
|
||||
StdoutPath string
|
||||
StderrPath string
|
||||
StdinPath string
|
||||
|
||||
Stdin io.WriteCloser
|
||||
Stdout io.ReadCloser
|
||||
Stderr io.ReadCloser
|
||||
}
|
||||
|
||||
func newCopier(i *ioConfig) (*copier, error) {
|
||||
l := &copier{
|
||||
config: i,
|
||||
}
|
||||
if i.StdinPath != "" {
|
||||
f, err := os.OpenFile(i.StdinPath, os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.closers = append(l.closers, f)
|
||||
go func() {
|
||||
io.Copy(i.Stdin, f)
|
||||
i.Stdin.Close()
|
||||
}()
|
||||
}
|
||||
if i.StdoutPath != "" {
|
||||
f, err := os.OpenFile(i.StdoutPath, os.O_RDWR, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.closers = append(l.closers, f)
|
||||
go io.Copy(f, i.Stdout)
|
||||
}
|
||||
if i.StderrPath != "" {
|
||||
f, err := os.OpenFile(i.StderrPath, os.O_RDWR, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.closers = append(l.closers, f)
|
||||
go io.Copy(f, i.Stderr)
|
||||
}
|
||||
return l, nil
|
||||
}
|
||||
|
||||
type copier struct {
|
||||
config *ioConfig
|
||||
closers []io.Closer
|
||||
}
|
||||
|
||||
func (l *copier) Close() (err error) {
|
||||
for _, c := range append(l.closers, l.config.Stdin, l.config.Stdout, l.config.Stderr) {
|
||||
if c != nil {
|
||||
if cerr := c.Close(); err == nil {
|
||||
err = cerr
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -11,6 +11,7 @@ var (
|
|||
EventsCounter = metrics.NewCounter()
|
||||
ExecProcessTimer = metrics.NewTimer()
|
||||
ExitProcessTimer = metrics.NewTimer()
|
||||
EpollFdCounter = metrics.NewCounter()
|
||||
)
|
||||
|
||||
func Metrics() map[string]interface{} {
|
||||
|
@ -23,5 +24,6 @@ func Metrics() map[string]interface{} {
|
|||
"events": EventsCounter,
|
||||
"exec-process-time": ExecProcessTimer,
|
||||
"exit-process-time": ExitProcessTimer,
|
||||
"epoll-fds": EpollFdCounter,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ func (m *Monitor) Monitor(p runtime.Process) error {
|
|||
if err := syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
|
||||
return err
|
||||
}
|
||||
EpollFdCounter.Inc(1)
|
||||
m.processes[fd] = p
|
||||
return nil
|
||||
}
|
||||
|
@ -75,6 +76,7 @@ func (m *Monitor) start() {
|
|||
}); err != nil {
|
||||
logrus.WithField("error", err).Fatal("containerd: epoll remove fd")
|
||||
}
|
||||
EpollFdCounter.Dec(1)
|
||||
if err := proc.Close(); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: close process IO")
|
||||
}
|
||||
|
|
|
@ -5,20 +5,18 @@ type SignalEvent struct {
|
|||
}
|
||||
|
||||
func (h *SignalEvent) Handle(e *Event) error {
|
||||
/*
|
||||
i, ok := h.s.containers[e.ID]
|
||||
if !ok {
|
||||
return ErrContainerNotFound
|
||||
i, ok := h.s.containers[e.ID]
|
||||
if !ok {
|
||||
return ErrContainerNotFound
|
||||
}
|
||||
processes, err := i.container.Processes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, p := range processes {
|
||||
if p.ID() == e.Pid {
|
||||
return p.Signal(e.Signal)
|
||||
}
|
||||
processes, err := i.container.Processes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, p := range processes {
|
||||
if pid, err := p.Pid(); err == nil && pid == e.Pid {
|
||||
return p.Signal(e.Signal)
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
return ErrProcessNotFound
|
||||
}
|
||||
|
|
69
supervisor/sort_test.go
Normal file
69
supervisor/sort_test.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package supervisor
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/containerd/runtime"
|
||||
"github.com/opencontainers/specs"
|
||||
)
|
||||
|
||||
type testProcess struct {
|
||||
id string
|
||||
}
|
||||
|
||||
func (p *testProcess) ID() string {
|
||||
return p.id
|
||||
}
|
||||
|
||||
func (p *testProcess) Stdin() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (p *testProcess) Stdout() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (p *testProcess) Stderr() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (p *testProcess) ExitFD() int {
|
||||
return -1
|
||||
}
|
||||
|
||||
func (p *testProcess) ExitStatus() (int, error) {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
func (p *testProcess) Container() runtime.Container {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *testProcess) Spec() specs.Process {
|
||||
return specs.Process{}
|
||||
}
|
||||
|
||||
func (p *testProcess) Signal(os.Signal) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *testProcess) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSortProcesses(t *testing.T) {
|
||||
p := []runtime.Process{
|
||||
&testProcess{"ls"},
|
||||
&testProcess{"other"},
|
||||
&testProcess{"init"},
|
||||
&testProcess{"other2"},
|
||||
}
|
||||
s := &processSorter{p}
|
||||
sort.Sort(s)
|
||||
|
||||
if id := p[len(p)-1].ID(); id != "init" {
|
||||
t.Fatalf("expected init but received %q", id)
|
||||
}
|
||||
}
|
|
@ -3,7 +3,7 @@ package supervisor
|
|||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -76,7 +76,6 @@ func New(stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error)
|
|||
|
||||
type containerInfo struct {
|
||||
container runtime.Container
|
||||
copier *copier
|
||||
}
|
||||
|
||||
type Supervisor struct {
|
||||
|
@ -195,14 +194,6 @@ func (s *Supervisor) restore() error {
|
|||
id := d.Name()
|
||||
container, err := runtime.Load(s.stateDir, id)
|
||||
if err != nil {
|
||||
if err == runtime.ErrContainerExited {
|
||||
logrus.WithField("id", id).Debug("containerd: container exited while away")
|
||||
// TODO: fire events to do the removal
|
||||
if err := os.RemoveAll(filepath.Join(s.stateDir, id)); err != nil {
|
||||
logrus.WithField("error", err).Warn("containerd: remove container state")
|
||||
}
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
processes, err := container.Processes()
|
||||
|
@ -214,11 +205,42 @@ func (s *Supervisor) restore() error {
|
|||
container: container,
|
||||
}
|
||||
logrus.WithField("id", id).Debug("containerd: container restored")
|
||||
var exitedProcesses []runtime.Process
|
||||
for _, p := range processes {
|
||||
if err := s.monitorProcess(p); err != nil {
|
||||
return err
|
||||
if _, err := p.ExitStatus(); err == nil {
|
||||
exitedProcesses = append(exitedProcesses, p)
|
||||
} else {
|
||||
if err := s.monitorProcess(p); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(exitedProcesses) > 0 {
|
||||
// sort processes so that init is fired last because that is how the kernel sends the
|
||||
// exit events
|
||||
sort.Sort(&processSorter{exitedProcesses})
|
||||
for _, p := range exitedProcesses {
|
||||
e := NewEvent(ExitEventType)
|
||||
e.Process = p
|
||||
s.SendEvent(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type processSorter struct {
|
||||
processes []runtime.Process
|
||||
}
|
||||
|
||||
func (s *processSorter) Len() int {
|
||||
return len(s.processes)
|
||||
}
|
||||
|
||||
func (s *processSorter) Swap(i, j int) {
|
||||
s.processes[i], s.processes[j] = s.processes[j], s.processes[i]
|
||||
}
|
||||
|
||||
func (s *processSorter) Less(i, j int) bool {
|
||||
return s.processes[j].ID() == "init"
|
||||
}
|
||||
|
|
|
@ -27,17 +27,17 @@ func (h *UpdateEvent) Handle(e *Event) error {
|
|||
}
|
||||
}
|
||||
if e.Signal != nil {
|
||||
/*
|
||||
// signal the pid1/main process of the container
|
||||
processes, err := container.Processes()
|
||||
if err != nil {
|
||||
return err
|
||||
// signal the pid1/main process of the container
|
||||
processes, err := container.Processes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, p := range processes {
|
||||
if p.ID() == runtime.InitProcessID {
|
||||
return p.Signal(e.Signal)
|
||||
}
|
||||
if len(processes) == 0 {
|
||||
return ErrProcessNotFound
|
||||
}
|
||||
return processes[0].Signal(e.Signal)
|
||||
*/
|
||||
}
|
||||
return ErrProcessNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@ type Worker interface {
|
|||
type StartTask struct {
|
||||
Container runtime.Container
|
||||
Checkpoint string
|
||||
IO *runtime.IO
|
||||
Stdin string
|
||||
Stdout string
|
||||
Stderr string
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue