diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 3e2b2fb..0cb954f 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -119,7 +119,10 @@ high performance container runtime } defer nec.Close() - execService, err := execution.New(executor) + ctx := log.WithModule(gocontext.Background(), "containerd") + ctx = log.WithModule(ctx, "execution") + ctx = events.WithPoster(ctx, events.GetNATSPoster(nec)) + execService, err := execution.New(ctx, executor) if err != nil { return err } diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go index c6b57fa..16ec614 100644 --- a/cmd/ctr/run.go +++ b/cmd/ctr/run.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "time" gocontext "context" @@ -110,15 +111,23 @@ var runCommand = cli.Command{ } var ec uint32 + eventLoop: for { - e, more := <-evCh - if !more { - break - } + select { + case e, more := <-evCh: + if !more { + fmt.Println("No More!") + break eventLoop + } - if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID { - ec = e.StatusCode - break + if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID { + ec = e.StatusCode + break eventLoop + } + case <-time.After(1 * time.Second): + if nec.Conn.Status() != nats.CONNECTED { + break eventLoop + } } } diff --git a/execution/container.go b/execution/container.go index 68b17de..c9c2db0 100644 --- a/execution/container.go +++ b/execution/container.go @@ -42,6 +42,12 @@ func (c *Container) ID() string { } func (c *Container) Status() Status { + for _, p := range c.processes { + if p.Pid() == c.initPid { + c.status = p.Status() + break + } + } return c.status } diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index 07d4490..5a6dc5f 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -85,7 +85,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp } }() - process, err := newProcess(container, initProcessID, initStateDir) + process, err := newProcess(initProcessID, initStateDir, execution.Created) if err != nil { return nil, err } @@ -123,7 +123,7 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { return nil, err } for _, d := range dirs { - process, err := newProcess(container, filepath.Base(d), d) + process, err := newProcess(filepath.Base(d), d, execution.Running) if err != nil { return nil, err } @@ -216,7 +216,7 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o return nil, err } - process, err := newProcess(c, o.ID, procStateDir) + process, err := newProcess(o.ID, procStateDir, execution.Running) if err != nil { return nil, err } diff --git a/execution/executors/oci/process.go b/execution/executors/oci/process.go index b3f73f8..6e6095d 100644 --- a/execution/executors/oci/process.go +++ b/execution/executors/oci/process.go @@ -12,12 +12,11 @@ import ( starttime "github.com/opencontainers/runc/libcontainer/system" ) -func newProcess(c *execution.Container, id, stateDir string) (execution.Process, error) { +func newProcess(id, stateDir string, status execution.Status) (execution.Process, error) { pid, err := runc.ReadPidFile(filepath.Join(stateDir, PidFilename)) if err != nil { return nil, err } - status := execution.Running if err := syscall.Kill(pid, 0); err != nil { if err == syscall.ESRCH { status = execution.Stopped @@ -25,7 +24,7 @@ func newProcess(c *execution.Container, id, stateDir string) (execution.Process, return nil, err } } - if status == execution.Running { + if status != execution.Stopped { stime, err := starttime.GetProcessStartTime(pid) switch { case os.IsNotExist(err): @@ -48,22 +47,18 @@ func newProcess(c *execution.Container, id, stateDir string) (execution.Process, } } return &process{ - c: c, - id: id, - pid: pid, - status: status, + id: id, + pid: pid, + status: status, + exitCode: execution.UnknownStatusCode, }, nil } type process struct { - c *execution.Container - id string - pid int - status execution.Status -} - -func (p *process) Container() *execution.Container { - return p.c + id string + pid int + status execution.Status + exitCode uint32 } func (p *process) ID() string { @@ -75,22 +70,24 @@ func (p *process) Pid() int64 { } func (p *process) Wait() (uint32, error) { - if p.status == execution.Running { + if p.status != execution.Stopped { var wstatus syscall.WaitStatus _, err := syscall.Wait4(p.pid, &wstatus, 0, nil) if err != nil { - return 255, nil + // This process doesn't belong to us + p.exitCode = execution.UnknownStatusCode + return p.exitCode, nil } - // TODO: implement kill-all if we are the init pid + // TODO: implement kill-all if we are the init pid? p.status = execution.Stopped - return uint32(wstatus.ExitStatus()), nil + p.exitCode = uint32(wstatus.ExitStatus()) } + return p.exitCode, nil - return 255, execution.ErrProcessNotFound } func (p *process) Signal(s os.Signal) error { - if p.status == execution.Running { + if p.status != execution.Stopped { sig, ok := s.(syscall.Signal) if !ok { return fmt.Errorf("invalid signal %v", s) diff --git a/execution/process.go b/execution/process.go index 9f365a6..f23de9c 100644 --- a/execution/process.go +++ b/execution/process.go @@ -3,7 +3,6 @@ package execution import "os" type Process interface { - Container() *Container ID() string Pid() int64 //Spec() *specs.Process diff --git a/execution/service.go b/execution/service.go index f812183..cb3303b 100644 --- a/execution/service.go +++ b/execution/service.go @@ -2,6 +2,7 @@ package execution import ( "fmt" + "os" "syscall" "time" @@ -16,10 +17,47 @@ var ( emptyResponse = &google_protobuf.Empty{} ) -func New(executor Executor) (*Service, error) { - return &Service{ +func New(ctx context.Context, executor Executor) (*Service, error) { + svc := &Service{ executor: executor, - }, nil + } + + // List existing container, some of them may have died away if + // we've been restarted + containers, err := executor.List(ctx) + if err != nil { + return nil, err + } + + for _, c := range containers { + status := c.Status() + if status == Stopped || status == Deleted { + // generate exit event for all processes, (generate event for init last) + processes := c.Processes() + processes = append(processes[1:], processes[0]) + for _, p := range c.Processes() { + if p.Status() != Stopped { + p.Signal(os.Kill) + } + sc, err := p.Wait() + if err != nil { + sc = UnknownStatusCode + } + topic := GetContainerProcessEventTopic(c.ID(), p.ID()) + svc.publishEvent(ctx, topic, &ContainerExitEvent{ + ContainerEvent: ContainerEvent{ + Timestamp: time.Now(), + ID: c.ID(), + Action: "exit", + }, + PID: p.ID(), + StatusCode: sc, + }) + } + } + } + + return svc, nil } type Service struct { diff --git a/execution/status.go b/execution/status.go index a6be45a..f3bc83b 100644 --- a/execution/status.go +++ b/execution/status.go @@ -8,4 +8,6 @@ const ( Running Status = "running" Stopped Status = "stopped" Deleted Status = "deleted" + + UnknownStatusCode = 255 )