Implement journal replay

Add addprocess event for addtional processes

Add more api process information
This commit is contained in:
Michael Crosby 2015-11-10 14:57:10 -08:00
parent 6ff2239019
commit 17d9c10e2d
10 changed files with 296 additions and 62 deletions

View file

@ -9,6 +9,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/crosbymichael/containerd"
"github.com/gorilla/mux"
"github.com/opencontainers/specs"
)
func NewServer(supervisor *containerd.Supervisor) http.Handler {
@ -18,9 +19,10 @@ func NewServer(supervisor *containerd.Supervisor) http.Handler {
r: r,
}
r.HandleFunc("/containers/{id:.*}/process/{pid:.*}", s.signalPid).Methods("POST")
r.HandleFunc("/containers/{id:.*}/process", s.addProcess).Methods("PUT")
r.HandleFunc("/containers/{id:.*}", s.createContainer).Methods("POST")
r.HandleFunc("/event", s.event).Methods("POST")
r.HandleFunc("/containers", s.containers).Methods("GET")
// r.HandleFunc("/containers/{id:.*}", s.deleteContainer).Methods("DELETE")
return s
}
@ -33,6 +35,57 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.r.ServeHTTP(w, r)
}
func (s *server) event(w http.ResponseWriter, r *http.Request) {
var e containerd.Event
if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
e.Err = make(chan error, 1)
s.supervisor.SendEvent(&e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if e.Containers != nil && len(e.Containers) > 0 {
if err := writeContainers(w, &e); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func (s *server) addProcess(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
var process specs.Process
if err := json.NewDecoder(r.Body).Decode(&process); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
e := containerd.NewEvent(containerd.AddProcessEventType)
e.ID = id
e.Process = &process
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
p := Process{
Pid: e.Pid,
Terminal: process.Terminal,
Args: process.Args,
Env: process.Env,
Cwd: process.Cwd,
}
p.User.UID = process.User.UID
p.User.GID = process.User.GID
p.User.AdditionalGids = process.User.AdditionalGids
if err := json.NewEncoder(w).Encode(p); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *server) signalPid(w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
@ -66,14 +119,21 @@ func (s *server) signalPid(w http.ResponseWriter, r *http.Request) {
}
func (s *server) containers(w http.ResponseWriter, r *http.Request) {
var state State
state.Containers = []Container{}
e := containerd.NewEvent(containerd.GetContainerEventType)
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := writeContainers(w, e); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func writeContainers(w http.ResponseWriter, e *containerd.Event) error {
var state State
state.Containers = []Container{}
for _, c := range e.Containers {
processes, err := c.Processes()
if err != nil {
@ -82,9 +142,10 @@ func (s *server) containers(w http.ResponseWriter, r *http.Request) {
"container": c.ID(),
}).Error("get processes for container")
}
var pids []int
var pids []Process
for _, p := range processes {
pids = append(pids, p.Pid())
proc := createProcess(p)
pids = append(pids, proc)
}
state.Containers = append(state.Containers, Container{
ID: c.ID(),
@ -92,10 +153,26 @@ func (s *server) containers(w http.ResponseWriter, r *http.Request) {
Processes: pids,
})
}
if err := json.NewEncoder(w).Encode(&state); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return json.NewEncoder(w).Encode(&state)
}
func createProcess(in containerd.Process) Process {
pid, err := in.Pid()
if err != nil {
logrus.WithField("error", err).Error("get process pid")
}
process := in.Spec()
p := Process{
Pid: pid,
Terminal: process.Terminal,
Args: process.Args,
Env: process.Env,
Cwd: process.Cwd,
}
p.User.UID = process.User.UID
p.User.GID = process.User.GID
p.User.AdditionalGids = process.User.AdditionalGids
return p
}
func (s *server) createContainer(w http.ResponseWriter, r *http.Request) {

View file

@ -5,9 +5,24 @@ type State struct {
}
type Container struct {
ID string `json:"id,omitempty"`
BundlePath string `json:"bundlePath,omitempty"`
Processes []int `json:"processes,omitempty"`
ID string `json:"id,omitempty"`
BundlePath string `json:"bundlePath,omitempty"`
Processes []Process `json:"processes,omitempty"`
}
type User struct {
UID uint32 `json:"uid"`
GID uint32 `json:"gid"`
AdditionalGids []uint32 `json:"additionalGids,omitempty"`
}
type Process struct {
Terminal bool `json:"terminal,omitempty"`
User User `json:"user,omitempty"`
Args []string `json:"args,omitempty"`
Env []string `json:"env,omitempty"`
Cwd string `json:"cwd,omitempty"`
Pid int `json:"pid,omitempty"`
}
type Signal struct {

View file

@ -1,9 +1,14 @@
package containerd
import "os"
import (
"os"
"github.com/opencontainers/specs"
)
type Process interface {
Pid() int
Pid() (int, error)
Spec() specs.Process
Signal(os.Signal) error
}

85
containerd/journal.go Normal file
View file

@ -0,0 +1,85 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/crosbymichael/containerd"
)
var JournalCommand = cli.Command{
Name: "journal",
Usage: "interact with the containerd journal",
Subcommands: []cli.Command{
JournalReplyCommand,
},
}
var JournalReplyCommand = cli.Command{
Name: "replay",
Usage: "replay a journal to get containerd's state syncronized after a crash",
Flags: []cli.Flag{
cli.StringFlag{
Name: "addr",
Value: "localhost:8888",
Usage: "address of the containerd daemon",
},
},
Action: func(context *cli.Context) {
if err := replay(context.Args().First(), context.String("addr")); err != nil {
logrus.Fatal(err)
}
},
}
func replay(path, addr string) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
dec := json.NewDecoder(f)
var events []*containerd.Event
type entry struct {
Event *containerd.Event `json:"event"`
}
for dec.More() {
var e entry
if err := dec.Decode(&e); err != nil {
if err == io.EOF {
break
}
return err
}
events = append(events, e.Event)
}
c := &http.Client{}
for _, e := range events {
switch e.Type {
case containerd.ExitEventType, containerd.DeleteEventType:
// ignore these types of events
continue
}
data, err := json.Marshal(e)
if err != nil {
return err
}
fmt.Printf("sending %q event\n", e.Type)
r, err := c.Post("http://"+filepath.Join(addr, "event"), "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}
if r.Body != nil {
io.Copy(os.Stdout, r.Body)
r.Body.Close()
}
}
return nil
}

View file

@ -25,6 +25,7 @@ func main() {
}
app.Commands = []cli.Command{
DaemonCommand,
JournalCommand,
}
app.Flags = []cli.Flag{
cli.BoolFlag{Name: "debug", Usage: "enable debug output in the logs"},

View file

@ -11,7 +11,8 @@ var (
ErrProcessNotFound = errors.New("containerd: processs not found for container")
// Internal errors
errShutdown = errors.New("containerd: supervisor is shutdown")
errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path")
errNoContainerForPid = errors.New("containerd: pid not registered for any container")
errShutdown = errors.New("containerd: supervisor is shutdown")
errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path")
errNoContainerForPid = errors.New("containerd: pid not registered for any container")
errInvalidContainerType = errors.New("containerd: invalid container type for runtime")
)

View file

@ -3,16 +3,19 @@ package containerd
import (
"os"
"time"
"github.com/opencontainers/specs"
)
type EventType string
const (
ExitEventType EventType = "exit"
StartContainerEventType EventType = "startContainer"
ContainerStartErrorEventType EventType = "startContainerError"
GetContainerEventType EventType = "getContainer"
SignalEventType EventType = "signal"
ExitEventType EventType = "exit"
StartContainerEventType EventType = "startContainer"
DeleteEventType EventType = "deleteContainerEvent"
GetContainerEventType EventType = "getContainer"
SignalEventType EventType = "signal"
AddProcessEventType EventType = "addProcess"
)
func NewEvent(t EventType) *Event {
@ -24,13 +27,14 @@ func NewEvent(t EventType) *Event {
}
type Event struct {
Type EventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
ID string `json:"id,omitempty"`
BundlePath string `json:"bundlePath,omitempty"`
Pid int `json:"pid,omitempty"`
Status int `json:"status,omitempty"`
Signal os.Signal `json:"signal,omitempty"`
Containers []Container `json:"-"`
Err chan error `json:"-"`
Type EventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
ID string `json:"id,omitempty"`
BundlePath string `json:"bundlePath,omitempty"`
Pid int `json:"pid,omitempty"`
Status int `json:"status,omitempty"`
Signal os.Signal `json:"signal,omitempty"`
Process *specs.Process `json:"process,omitempty"`
Containers []Container `json:"-"`
Err chan error `json:"-"`
}

View file

@ -1,6 +1,9 @@
package containerd
import "github.com/opencontainers/specs"
// runtime handles containers, containers handle their own actions.
type Runtime interface {
Create(id, bundlePath string) (Container, error)
StartProcess(Container, specs.Process) (Process, error)
}

View file

@ -14,6 +14,7 @@ import (
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/configs"
_ "github.com/opencontainers/runc/libcontainer/nsenter"
"github.com/opencontainers/runc/libcontainer/seccomp"
"github.com/opencontainers/specs"
)
@ -159,27 +160,34 @@ func init() {
}
type libcontainerProcess struct {
pid int
process *libcontainer.Process
spec specs.Process
}
func (p *libcontainerProcess) Pid() int {
return p.pid
// change interface to support an error
func (p *libcontainerProcess) Pid() (int, error) {
pid, err := p.process.Pid()
if err != nil {
return -1, err
}
return pid, nil
}
func (p *libcontainerProcess) Spec() specs.Process {
return p.spec
}
func (p *libcontainerProcess) Signal(s os.Signal) error {
proc, err := os.FindProcess(p.pid)
if err != nil {
return err
}
return proc.Signal(s)
return p.process.Signal(s)
}
type libcontainerContainer struct {
c libcontainer.Container
initProcess *libcontainer.Process
exitStatus int
exited bool
path string
c libcontainer.Container
initProcess *libcontainerProcess
additionalProcesses []*libcontainerProcess
exitStatus int
exited bool
path string
}
func (c *libcontainerContainer) ID() string {
@ -195,7 +203,7 @@ func (c *libcontainerContainer) Pid() (int, error) {
}
func (c *libcontainerContainer) Start() error {
return c.c.Start(c.initProcess)
return c.c.Start(c.initProcess.process)
}
func (c *libcontainerContainer) SetExited(status int) {
@ -209,17 +217,13 @@ func (c *libcontainerContainer) Delete() error {
}
func (c *libcontainerContainer) Processes() ([]Process, error) {
pids, err := c.c.Processes()
if err != nil {
return nil, err
procs := []Process{
c.initProcess,
}
var proceses []Process
for _, pid := range pids {
proceses = append(proceses, &libcontainerProcess{
pid: pid,
})
for _, p := range c.additionalProcesses {
procs = append(procs, p)
}
return proceses, nil
return procs, nil
}
func NewRuntime(stateDir string) (Runtime, error) {
@ -258,13 +262,33 @@ func (r *libcontainerRuntime) Create(id, bundlePath string) (Container, error) {
}
process := r.newProcess(spec.Process)
c := &libcontainerContainer{
c: container,
initProcess: process,
path: bundlePath,
c: container,
initProcess: &libcontainerProcess{
process: process,
spec: spec.Process,
},
path: bundlePath,
}
return c, nil
}
func (r *libcontainerRuntime) StartProcess(ci Container, p specs.Process) (Process, error) {
c, ok := ci.(*libcontainerContainer)
if !ok {
return nil, errInvalidContainerType
}
process := r.newProcess(p)
if err := c.c.Start(process); err != nil {
return nil, err
}
lp := &libcontainerProcess{
process: process,
spec: p,
}
c.additionalProcesses = append(c.additionalProcesses, lp)
return lp, nil
}
// newProcess returns a new libcontainer Process with the arguments from the
// spec and stdio from the current process.
func (r *libcontainerRuntime) newProcess(p specs.Process) *libcontainer.Process {

View file

@ -52,6 +52,8 @@ type Supervisor struct {
workerGroup sync.WaitGroup
}
// need proper close logic for jobs and stuff so that sending to the channels dont panic
// but can complete jobs
func (s *Supervisor) Close() error {
return s.journal.Close()
}
@ -82,9 +84,9 @@ func (s *Supervisor) Start(events chan *Event) error {
continue
}
container.SetExited(e.Status)
if err := s.deleteContainer(container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container")
}
ne := NewEvent(DeleteEventType)
ne.ID = container.ID()
s.SendEvent(ne)
case StartContainerEventType:
container, err := s.runtime.Create(e.ID, e.BundlePath)
if err != nil {
@ -96,7 +98,8 @@ func (s *Supervisor) Start(events chan *Event) error {
err: e.Err,
container: container,
}
case ContainerStartErrorEventType:
continue
case DeleteEventType:
if container, ok := s.containers[e.ID]; ok {
if err := s.deleteContainer(container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container")
@ -106,7 +109,6 @@ func (s *Supervisor) Start(events chan *Event) error {
for _, c := range s.containers {
e.Containers = append(e.Containers, c)
}
e.Err <- nil
case SignalEventType:
container, ok := s.containers[e.ID]
if !ok {
@ -119,13 +121,30 @@ func (s *Supervisor) Start(events chan *Event) error {
continue
}
for _, p := range processes {
if p.Pid() == e.Pid {
if pid, err := p.Pid(); err == nil && pid == e.Pid {
e.Err <- p.Signal(e.Signal)
continue
}
}
e.Err <- ErrProcessNotFound
continue
case AddProcessEventType:
container, ok := s.containers[e.ID]
if !ok {
e.Err <- ErrContainerNotFound
continue
}
p, err := s.runtime.StartProcess(container, *e.Process)
if err != nil {
e.Err <- err
continue
}
if e.Pid, err = p.Pid(); err != nil {
e.Err <- err
continue
}
}
close(e.Err)
}
}()
return nil