execution: "restore" container on service creation

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2016-12-15 10:40:24 -08:00
parent 73cb78fae3
commit 0fdd2469f6
8 changed files with 90 additions and 36 deletions

View file

@ -119,7 +119,10 @@ high performance container runtime
} }
defer nec.Close() defer nec.Close()
execService, err := execution.New(executor) ctx := log.WithModule(gocontext.Background(), "containerd")
ctx = log.WithModule(ctx, "execution")
ctx = events.WithPoster(ctx, events.GetNATSPoster(nec))
execService, err := execution.New(ctx, executor)
if err != nil { if err != nil {
return err return err
} }

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"time"
gocontext "context" gocontext "context"
@ -110,15 +111,23 @@ var runCommand = cli.Command{
} }
var ec uint32 var ec uint32
eventLoop:
for { for {
e, more := <-evCh select {
if !more { case e, more := <-evCh:
break if !more {
} fmt.Println("No More!")
break eventLoop
}
if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID { if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID {
ec = e.StatusCode ec = e.StatusCode
break break eventLoop
}
case <-time.After(1 * time.Second):
if nec.Conn.Status() != nats.CONNECTED {
break eventLoop
}
} }
} }

View file

@ -42,6 +42,12 @@ func (c *Container) ID() string {
} }
func (c *Container) Status() Status { func (c *Container) Status() Status {
for _, p := range c.processes {
if p.Pid() == c.initPid {
c.status = p.Status()
break
}
}
return c.status return c.status
} }

View file

@ -85,7 +85,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp
} }
}() }()
process, err := newProcess(container, initProcessID, initStateDir) process, err := newProcess(initProcessID, initStateDir, execution.Created)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -123,7 +123,7 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) {
return nil, err return nil, err
} }
for _, d := range dirs { for _, d := range dirs {
process, err := newProcess(container, filepath.Base(d), d) process, err := newProcess(filepath.Base(d), d, execution.Running)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -216,7 +216,7 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o
return nil, err return nil, err
} }
process, err := newProcess(c, o.ID, procStateDir) process, err := newProcess(o.ID, procStateDir, execution.Running)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -12,12 +12,11 @@ import (
starttime "github.com/opencontainers/runc/libcontainer/system" starttime "github.com/opencontainers/runc/libcontainer/system"
) )
func newProcess(c *execution.Container, id, stateDir string) (execution.Process, error) { func newProcess(id, stateDir string, status execution.Status) (execution.Process, error) {
pid, err := runc.ReadPidFile(filepath.Join(stateDir, PidFilename)) pid, err := runc.ReadPidFile(filepath.Join(stateDir, PidFilename))
if err != nil { if err != nil {
return nil, err return nil, err
} }
status := execution.Running
if err := syscall.Kill(pid, 0); err != nil { if err := syscall.Kill(pid, 0); err != nil {
if err == syscall.ESRCH { if err == syscall.ESRCH {
status = execution.Stopped status = execution.Stopped
@ -25,7 +24,7 @@ func newProcess(c *execution.Container, id, stateDir string) (execution.Process,
return nil, err return nil, err
} }
} }
if status == execution.Running { if status != execution.Stopped {
stime, err := starttime.GetProcessStartTime(pid) stime, err := starttime.GetProcessStartTime(pid)
switch { switch {
case os.IsNotExist(err): case os.IsNotExist(err):
@ -48,22 +47,18 @@ func newProcess(c *execution.Container, id, stateDir string) (execution.Process,
} }
} }
return &process{ return &process{
c: c, id: id,
id: id, pid: pid,
pid: pid, status: status,
status: status, exitCode: execution.UnknownStatusCode,
}, nil }, nil
} }
type process struct { type process struct {
c *execution.Container id string
id string pid int
pid int status execution.Status
status execution.Status exitCode uint32
}
func (p *process) Container() *execution.Container {
return p.c
} }
func (p *process) ID() string { func (p *process) ID() string {
@ -75,22 +70,24 @@ func (p *process) Pid() int64 {
} }
func (p *process) Wait() (uint32, error) { func (p *process) Wait() (uint32, error) {
if p.status == execution.Running { if p.status != execution.Stopped {
var wstatus syscall.WaitStatus var wstatus syscall.WaitStatus
_, err := syscall.Wait4(p.pid, &wstatus, 0, nil) _, err := syscall.Wait4(p.pid, &wstatus, 0, nil)
if err != nil { if err != nil {
return 255, nil // This process doesn't belong to us
p.exitCode = execution.UnknownStatusCode
return p.exitCode, nil
} }
// TODO: implement kill-all if we are the init pid // TODO: implement kill-all if we are the init pid?
p.status = execution.Stopped p.status = execution.Stopped
return uint32(wstatus.ExitStatus()), nil p.exitCode = uint32(wstatus.ExitStatus())
} }
return p.exitCode, nil
return 255, execution.ErrProcessNotFound
} }
func (p *process) Signal(s os.Signal) error { func (p *process) Signal(s os.Signal) error {
if p.status == execution.Running { if p.status != execution.Stopped {
sig, ok := s.(syscall.Signal) sig, ok := s.(syscall.Signal)
if !ok { if !ok {
return fmt.Errorf("invalid signal %v", s) return fmt.Errorf("invalid signal %v", s)

View file

@ -3,7 +3,6 @@ package execution
import "os" import "os"
type Process interface { type Process interface {
Container() *Container
ID() string ID() string
Pid() int64 Pid() int64
//Spec() *specs.Process //Spec() *specs.Process

View file

@ -2,6 +2,7 @@ package execution
import ( import (
"fmt" "fmt"
"os"
"syscall" "syscall"
"time" "time"
@ -16,10 +17,47 @@ var (
emptyResponse = &google_protobuf.Empty{} emptyResponse = &google_protobuf.Empty{}
) )
func New(executor Executor) (*Service, error) { func New(ctx context.Context, executor Executor) (*Service, error) {
return &Service{ svc := &Service{
executor: executor, executor: executor,
}, nil }
// List the existing containers; some of them may have died while
// we were being restarted.
containers, err := executor.List(ctx)
if err != nil {
return nil, err
}
for _, c := range containers {
status := c.Status()
if status == Stopped || status == Deleted {
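// Generate an exit event for every process; the init process's event is meant to be generated last.
// NOTE(review): the loop below ranges over c.Processes() rather than the reordered `processes`
// slice, so the reordering appears to have no effect; it also indexes processes[0] and
// processes[1:] without an empty-slice guard. Confirm the intended behavior.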
processes := c.Processes()
processes = append(processes[1:], processes[0])
for _, p := range c.Processes() {
if p.Status() != Stopped {
p.Signal(os.Kill)
}
sc, err := p.Wait()
if err != nil {
sc = UnknownStatusCode
}
topic := GetContainerProcessEventTopic(c.ID(), p.ID())
svc.publishEvent(ctx, topic, &ContainerExitEvent{
ContainerEvent: ContainerEvent{
Timestamp: time.Now(),
ID: c.ID(),
Action: "exit",
},
PID: p.ID(),
StatusCode: sc,
})
}
}
}
return svc, nil
} }
type Service struct { type Service struct {

View file

@ -8,4 +8,6 @@ const (
Running Status = "running" Running Status = "running"
Stopped Status = "stopped" Stopped Status = "stopped"
Deleted Status = "deleted" Deleted Status = "deleted"
UnknownStatusCode = 255
) )