Merge pull request #391 from mlaventure/restore-and-cleanup

Add restore and small cleanup
This commit is contained in:
Michael Crosby 2016-12-16 11:54:53 -08:00 committed by GitHub
commit 4171ca0ca1
15 changed files with 247 additions and 82 deletions

View file

@ -119,7 +119,10 @@ high performance container runtime
}
defer nec.Close()
execService, err := execution.New(executor)
ctx := log.WithModule(gocontext.Background(), "containerd")
ctx = log.WithModule(ctx, "execution")
ctx = events.WithPoster(ctx, events.GetNATSPoster(nec))
execService, err := execution.New(ctx, executor)
if err != nil {
return err
}

61
cmd/ctr/events.go Normal file
View file

@ -0,0 +1,61 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/nats-io/go-nats"
"github.com/urfave/cli"
)
var eventsCommand = cli.Command{
Name: "events",
Usage: "display containerd events",
Flags: []cli.Flag{
cli.StringFlag{
Name: "subject, s",
Usage: "subjects filter",
Value: "containerd.>",
},
},
Action: func(context *cli.Context) error {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return err
}
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
nc.Close()
return err
}
defer nec.Close()
evCh := make(chan *nats.Msg, 64)
sub, err := nec.Subscribe(context.String("subject"), func(e *nats.Msg) {
evCh <- e
})
if err != nil {
return err
}
defer sub.Unsubscribe()
for {
e, more := <-evCh
if !more {
break
}
var prettyJSON bytes.Buffer
err := json.Indent(&prettyJSON, e.Data, "", "\t")
if err != nil {
fmt.Println(string(e.Data))
} else {
fmt.Println(prettyJSON.String())
}
}
return nil
},
}

View file

@ -18,6 +18,10 @@ var execCommand = cli.Command{
Name: "id, i",
Usage: "target container id",
},
cli.StringFlag{
Name: "pid, p",
Usage: "new process id",
},
cli.StringFlag{
Name: "cwd, c",
Usage: "current working directory for the process",
@ -48,6 +52,7 @@ var execCommand = cli.Command{
sOpts := &execution.StartProcessRequest{
ContainerID: id,
Process: &execution.Process{
ID: context.String("pid"),
Cwd: context.String("cwd"),
Terminal: context.Bool("tty"),
Args: context.Args(),

View file

@ -36,6 +36,7 @@ containerd client
app.Commands = []cli.Command{
runCommand,
execCommand,
eventsCommand,
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {

View file

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"time"
gocontext "context"
@ -110,15 +111,23 @@ var runCommand = cli.Command{
}
var ec uint32
eventLoop:
for {
e, more := <-evCh
select {
case e, more := <-evCh:
if !more {
break
fmt.Println("No More!")
break eventLoop
}
if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID {
ec = e.StatusCode
break
break eventLoop
}
case <-time.After(1 * time.Second):
if nec.Conn.Status() != nats.CONNECTED {
break eventLoop
}
}
}

View file

@ -11,12 +11,12 @@ func NewContainer(stateRoot, id, bundle string) (*Container, error) {
id: id,
bundle: bundle,
stateDir: stateDir,
status: "created",
status: Created,
processes: make(map[string]Process),
}, nil
}
func LoadContainer(dir StateDir, id, bundle, status string, initPid int64) *Container {
func LoadContainer(dir StateDir, id, bundle string, status Status, initPid int64) *Container {
return &Container{
id: id,
stateDir: dir,
@ -32,7 +32,7 @@ type Container struct {
bundle string
stateDir StateDir
initPid int64
status string
status Status
processes map[string]Process
}
@ -41,7 +41,13 @@ func (c *Container) ID() string {
return c.id
}
func (c *Container) Status() string {
func (c *Container) Status() Status {
for _, p := range c.processes {
if p.Pid() == c.initPid {
c.status = p.Status()
break
}
}
return c.status
}

7
execution/error.go Normal file
View file

@ -0,0 +1,7 @@
package execution
import "fmt"
var (
ErrProcessNotFound = fmt.Errorf("process not found")
)

View file

@ -16,6 +16,7 @@ type CreateOpts struct {
}
type StartProcessOpts struct {
ID string
Spec specs.Process
Console bool
Stdin string

View file

@ -12,6 +12,15 @@ import (
"github.com/docker/containerd/execution"
)
const (
initProcessID = "init"
)
const (
PidFilename = "pid"
StartTimeFilename = "starttime"
)
var (
ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string")
)
@ -59,11 +68,11 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp
}
}(container)
initProcID, initStateDir, err := container.StateDir().NewProcess()
initStateDir, err := container.StateDir().NewProcess(initProcessID)
if err != nil {
return nil, err
}
pidFile := filepath.Join(initStateDir, "pid")
pidFile := filepath.Join(initStateDir, PidFilename)
err = r.runc.Create(ctx, id, o.Bundle, &runc.CreateOpts{
PidFile: pidFile,
Console: oio.console,
@ -79,11 +88,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp
}
}()
pid, err := runc.ReadPidFile(pidFile)
if err != nil {
return nil, err
}
process, err := newProcess(initProcID, pid)
process, err := newProcess(initProcessID, initStateDir, execution.Created)
if err != nil {
return nil, err
}
@ -112,7 +117,7 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) {
execution.StateDir(filepath.Join(r.root, runcC.ID)),
runcC.ID,
runcC.Bundle,
runcC.Status,
execution.Status(runcC.Status),
int64(runcC.Pid),
)
@ -121,19 +126,11 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) {
return nil, err
}
for _, d := range dirs {
pid, err := runc.ReadPidFile(filepath.Join(d, "pid"))
if err != nil {
if os.IsNotExist(err) {
// Process died in between
continue
}
return nil, err
}
process, err := newProcess(filepath.Base(d), pid)
process, err := newProcess(filepath.Base(d), d, execution.Running)
if err != nil {
return nil, err
}
container.AddProcess(process, pid == runcC.Pid)
container.AddProcess(process, process.Pid() == int64(runcC.Pid))
}
return container, nil
@ -201,17 +198,17 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o
}
}()
procID, procStateDir, err := c.StateDir().NewProcess()
procStateDir, err := c.StateDir().NewProcess(o.ID)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
c.StateDir().DeleteProcess(procID)
c.StateDir().DeleteProcess(o.ID)
}
}()
pidFile := filepath.Join(procStateDir, "pid")
pidFile := filepath.Join(procStateDir, PidFilename)
if err := r.runc.Exec(ctx, c.ID(), o.Spec, &runc.ExecOpts{
PidFile: pidFile,
Detach: false,
@ -221,12 +218,8 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o
}); err != nil {
return nil, err
}
pid, err := runc.ReadPidFile(pidFile)
if err != nil {
return nil, err
}
process, err := newProcess(procID, pid)
process, err := newProcess(o.ID, procStateDir, execution.Running)
if err != nil {
return nil, err
}

View file

@ -1,26 +1,64 @@
package oci
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"syscall"
"github.com/crosbymichael/go-runc"
"github.com/docker/containerd/execution"
starttime "github.com/opencontainers/runc/libcontainer/system"
)
func newProcess(id string, pid int) (execution.Process, error) {
proc, err := os.FindProcess(pid)
func newProcess(id, stateDir string, status execution.Status) (execution.Process, error) {
pid, err := runc.ReadPidFile(filepath.Join(stateDir, PidFilename))
if err != nil {
return nil, err
}
if err := syscall.Kill(pid, 0); err != nil {
if err == syscall.ESRCH {
status = execution.Stopped
} else {
return nil, err
}
}
if status != execution.Stopped {
stime, err := starttime.GetProcessStartTime(pid)
switch {
case os.IsNotExist(err):
status = execution.Stopped
case err != nil:
return nil, err
default:
b, err := ioutil.ReadFile(filepath.Join(stateDir, StartTimeFilename))
switch {
case os.IsNotExist(err):
err = ioutil.WriteFile(filepath.Join(stateDir, StartTimeFilename), []byte(stime), 0600)
if err != nil {
return nil, err
}
case err != nil:
return nil, err
case string(b) != stime:
status = execution.Stopped
}
}
}
return &process{
id: id,
proc: proc,
pid: pid,
status: status,
exitCode: execution.UnknownStatusCode,
}, nil
}
type process struct {
id string
proc *os.Process
pid int
status execution.Status
exitCode uint32
}
func (p *process) ID() string {
@ -28,18 +66,37 @@ func (p *process) ID() string {
}
func (p *process) Pid() int64 {
return int64(p.proc.Pid)
return int64(p.pid)
}
func (p *process) Wait() (uint32, error) {
state, err := p.proc.Wait()
if p.status != execution.Stopped {
var wstatus syscall.WaitStatus
_, err := syscall.Wait4(p.pid, &wstatus, 0, nil)
if err != nil {
return 0, nil
// This process doesn't belong to us
p.exitCode = execution.UnknownStatusCode
return p.exitCode, nil
}
// TODO: implement kill-all if we are the init pid
return uint32(state.Sys().(syscall.WaitStatus).ExitStatus()), nil
// TODO: implement kill-all if we are the init pid?
p.status = execution.Stopped
p.exitCode = uint32(wstatus.ExitStatus())
}
return p.exitCode, nil
}
func (p *process) Signal(s os.Signal) error {
return p.proc.Signal(s)
if p.status != execution.Stopped {
sig, ok := s.(syscall.Signal)
if !ok {
return fmt.Errorf("invalid signal %v", s)
}
return syscall.Kill(p.pid, sig)
}
return execution.ErrProcessNotFound
}
func (p *process) Status() execution.Status {
return p.status
}

View file

@ -1,19 +0,0 @@
package execution
import (
"context"
"github.com/docker/containerd/log"
"github.com/sirupsen/logrus"
)
var ctx context.Context
func GetLogger(module string) *logrus.Entry {
if ctx == nil {
ctx = log.WithModule(context.Background(), "execution")
}
subCtx := log.WithModule(ctx, module)
return log.GetLogger(subCtx)
}

View file

@ -8,4 +8,5 @@ type Process interface {
//Spec() *specs.Process
Wait() (uint32, error)
Signal(os.Signal) error
Status() Status
}

View file

@ -2,6 +2,7 @@ package execution
import (
"fmt"
"os"
"syscall"
"time"
@ -14,18 +15,53 @@ import (
var (
emptyResponse = &google_protobuf.Empty{}
ErrProcessNotFound = fmt.Errorf("Process not found")
)
func New(executor Executor) (*Service, error) {
return &Service{
func New(ctx context.Context, executor Executor) (*Service, error) {
svc := &Service{
executor: executor,
}, nil
}
// List existing container, some of them may have died away if
// we've been restarted
containers, err := executor.List(ctx)
if err != nil {
return nil, err
}
for _, c := range containers {
status := c.Status()
if status == Stopped || status == Deleted {
// generate exit event for all processes, (generate event for init last)
processes := c.Processes()
processes = append(processes[1:], processes[0])
for _, p := range c.Processes() {
if p.Status() != Stopped {
p.Signal(os.Kill)
}
sc, err := p.Wait()
if err != nil {
sc = UnknownStatusCode
}
topic := GetContainerProcessEventTopic(c.ID(), p.ID())
svc.publishEvent(ctx, topic, &ContainerExitEvent{
ContainerEvent: ContainerEvent{
Timestamp: time.Now(),
ID: c.ID(),
Action: "exit",
},
PID: p.ID(),
StatusCode: sc,
})
}
}
}
return svc, nil
}
type Service struct {
executor Executor
supervisor *Supervisor
}
func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
@ -134,6 +170,7 @@ func (s *Service) StartProcess(ctx context.Context, r *api.StartProcessRequest)
}
process, err := s.executor.StartProcess(ctx, container, StartProcessOpts{
ID: r.Process.ID,
Spec: spec,
Console: r.Console,
Stdin: r.Stdin,

View file

@ -26,13 +26,13 @@ func (s StateDir) Delete() error {
return os.RemoveAll(string(s))
}
func (s StateDir) NewProcess() (id, dir string, err error) {
dir, err = ioutil.TempDir(s.processesDir(), "")
if err != nil {
return "", "", err
func (s StateDir) NewProcess(id string) (dir string, err error) {
dir = filepath.Join(s.processesDir(), id)
if err = os.Mkdir(dir, 0700); err != nil {
return "", err
}
return filepath.Base(dir), dir, err
return dir, nil
}
func (s StateDir) ProcessDir(id string) string {

View file

@ -3,8 +3,11 @@ package execution
type Status string
const (
Created Status = "created"
Paused Status = "paused"
Running Status = "running"
Stopped Status = "stopped"
Deleted Status = "deleted"
UnknownStatusCode = 255
)