From 17d9c10e2d1e786b4e5845ea560d8e8ca6a982da Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 10 Nov 2015 14:57:10 -0800 Subject: [PATCH] Implement journal replay Add addprocess event for addtional processes Add more api process information --- api/v1/server.go | 93 +++++++++++++++++++++++++++++++++++++++---- api/v1/types.go | 21 ++++++++-- container.go | 9 ++++- containerd/journal.go | 85 +++++++++++++++++++++++++++++++++++++++ containerd/main.go | 1 + errors.go | 7 ++-- event.go | 32 ++++++++------- runtime.go | 3 ++ runtime_linux.go | 76 +++++++++++++++++++++++------------ supervisor.go | 31 ++++++++++++--- 10 files changed, 296 insertions(+), 62 deletions(-) create mode 100644 containerd/journal.go diff --git a/api/v1/server.go b/api/v1/server.go index cb2a099..9e34484 100644 --- a/api/v1/server.go +++ b/api/v1/server.go @@ -9,6 +9,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/crosbymichael/containerd" "github.com/gorilla/mux" + "github.com/opencontainers/specs" ) func NewServer(supervisor *containerd.Supervisor) http.Handler { @@ -18,9 +19,10 @@ func NewServer(supervisor *containerd.Supervisor) http.Handler { r: r, } r.HandleFunc("/containers/{id:.*}/process/{pid:.*}", s.signalPid).Methods("POST") + r.HandleFunc("/containers/{id:.*}/process", s.addProcess).Methods("PUT") r.HandleFunc("/containers/{id:.*}", s.createContainer).Methods("POST") + r.HandleFunc("/event", s.event).Methods("POST") r.HandleFunc("/containers", s.containers).Methods("GET") - // r.HandleFunc("/containers/{id:.*}", s.deleteContainer).Methods("DELETE") return s } @@ -33,6 +35,57 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.r.ServeHTTP(w, r) } +func (s *server) event(w http.ResponseWriter, r *http.Request) { + var e containerd.Event + if err := json.NewDecoder(r.Body).Decode(&e); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + e.Err = make(chan error, 1) + s.supervisor.SendEvent(&e) + if err := <-e.Err; err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if e.Containers != nil && len(e.Containers) > 0 { + if err := writeContainers(w, &e); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +func (s *server) addProcess(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + var process specs.Process + if err := json.NewDecoder(r.Body).Decode(&process); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + e := containerd.NewEvent(containerd.AddProcessEventType) + e.ID = id + e.Process = &process + s.supervisor.SendEvent(e) + if err := <-e.Err; err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + p := Process{ + Pid: e.Pid, + Terminal: process.Terminal, + Args: process.Args, + Env: process.Env, + Cwd: process.Cwd, + } + p.User.UID = process.User.UID + p.User.GID = process.User.GID + p.User.AdditionalGids = process.User.AdditionalGids + if err := json.NewEncoder(w).Encode(p); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + func (s *server) signalPid(w http.ResponseWriter, r *http.Request) { var ( vars = mux.Vars(r) @@ -66,14 +119,21 @@ func (s *server) signalPid(w http.ResponseWriter, r *http.Request) { } func (s *server) containers(w http.ResponseWriter, r *http.Request) { - var state State - state.Containers = []Container{} e := containerd.NewEvent(containerd.GetContainerEventType) s.supervisor.SendEvent(e) if err := <-e.Err; err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } + if err := writeContainers(w, e); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func writeContainers(w http.ResponseWriter, e *containerd.Event) error { + var state State + state.Containers = []Container{} for _, c := range e.Containers { processes, err := c.Processes() if err != nil { @@ -82,9 +142,10 @@ func (s *server) containers(w http.ResponseWriter, r *http.Request) { "container": c.ID(), }).Error("get processes for container") } - var pids []int + var pids []Process for _, p := range processes { - pids = append(pids, p.Pid()) + proc := createProcess(p) + pids = append(pids, proc) } state.Containers = append(state.Containers, Container{ ID: c.ID(), @@ -92,10 +153,26 @@ func (s *server) containers(w http.ResponseWriter, r *http.Request) { Processes: pids, }) } - if err := json.NewEncoder(w).Encode(&state); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + return json.NewEncoder(w).Encode(&state) +} + +func createProcess(in containerd.Process) Process { + pid, err := in.Pid() + if err != nil { + logrus.WithField("error", err).Error("get process pid") } + process := in.Spec() + p := Process{ + Pid: pid, + Terminal: process.Terminal, + Args: process.Args, + Env: process.Env, + Cwd: process.Cwd, + } + p.User.UID = process.User.UID + p.User.GID = process.User.GID + p.User.AdditionalGids = process.User.AdditionalGids + return p } func (s *server) createContainer(w http.ResponseWriter, r *http.Request) { diff --git a/api/v1/types.go b/api/v1/types.go index 4ca8fdb..545d6db 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -5,9 +5,24 @@ type State struct { } type Container struct { - ID string `json:"id,omitempty"` - BundlePath string `json:"bundlePath,omitempty"` - Processes []int `json:"processes,omitempty"` + ID string `json:"id,omitempty"` + BundlePath string `json:"bundlePath,omitempty"` + Processes []Process `json:"processes,omitempty"` +} + +type User struct { + UID uint32 `json:"uid"` + GID uint32 `json:"gid"` + AdditionalGids []uint32 `json:"additionalGids,omitempty"` +} + +type Process struct { + Terminal bool `json:"terminal,omitempty"` + User User `json:"user,omitempty"` + Args []string `json:"args,omitempty"` + Env []string `json:"env,omitempty"` + Cwd string `json:"cwd,omitempty"` + Pid int `json:"pid,omitempty"` } type Signal struct { diff --git a/container.go b/container.go index 4fc8424..f115be8 100644 --- a/container.go +++ b/container.go @@ -1,9 +1,14 @@ package containerd -import "os" +import ( + "os" + + "github.com/opencontainers/specs" +) type Process interface { - Pid() int + Pid() (int, error) + Spec() specs.Process Signal(os.Signal) error } diff --git a/containerd/journal.go b/containerd/journal.go new file mode 100644 index 0000000..1c876dd --- /dev/null +++ b/containerd/journal.go @@ -0,0 +1,85 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + + "github.com/Sirupsen/logrus" + "github.com/codegangsta/cli" + "github.com/crosbymichael/containerd" +) + +var JournalCommand = cli.Command{ + Name: "journal", + Usage: "interact with the containerd journal", + Subcommands: []cli.Command{ + JournalReplyCommand, + }, +} + +var JournalReplyCommand = cli.Command{ + Name: "replay", + Usage: "replay a journal to get containerd's state syncronized after a crash", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "addr", + Value: "localhost:8888", + Usage: "address of the containerd daemon", + }, + }, + Action: func(context *cli.Context) { + if err := replay(context.Args().First(), context.String("addr")); err != nil { + logrus.Fatal(err) + } + }, +} + +func replay(path, addr string) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + dec := json.NewDecoder(f) + var events []*containerd.Event + type entry struct { + Event *containerd.Event `json:"event"` + } + for dec.More() { + var e entry + if err := dec.Decode(&e); err != nil { + if err == io.EOF { + break + } + return err + } + events = append(events, e.Event) + } + c := &http.Client{} + for _, e := range events { + switch e.Type { + case containerd.ExitEventType, containerd.DeleteEventType: + // ignore these types of events + continue + } + data, err := json.Marshal(e) + if err != nil { + return err + } + fmt.Printf("sending %q event\n", e.Type) + r, err := c.Post("http://"+filepath.Join(addr, "event"), "application/json", bytes.NewBuffer(data)) + if err != nil { + return err + } + if r.Body != nil { + io.Copy(os.Stdout, r.Body) + r.Body.Close() + } + } + return nil +} diff --git a/containerd/main.go b/containerd/main.go index 9480d35..eae2d81 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -25,6 +25,7 @@ func main() { } app.Commands = []cli.Command{ DaemonCommand, + JournalCommand, } app.Flags = []cli.Flag{ cli.BoolFlag{Name: "debug", Usage: "enable debug output in the logs"}, diff --git a/errors.go b/errors.go index 8cd3dc4..1863752 100644 --- a/errors.go +++ b/errors.go @@ -11,7 +11,8 @@ var ( ErrProcessNotFound = errors.New("containerd: processs not found for container") // Internal errors - errShutdown = errors.New("containerd: supervisor is shutdown") - errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path") - errNoContainerForPid = errors.New("containerd: pid not registered for any container") + errShutdown = errors.New("containerd: supervisor is shutdown") + errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path") + errNoContainerForPid = errors.New("containerd: pid not registered for any container") + errInvalidContainerType = errors.New("containerd: invalid container type for runtime") ) diff --git a/event.go b/event.go index d8ef85e..8e427b5 100644 --- a/event.go +++ b/event.go @@ -3,16 +3,19 @@ package containerd import ( "os" "time" + + "github.com/opencontainers/specs" ) type EventType string const ( - ExitEventType EventType = "exit" - StartContainerEventType EventType = "startContainer" - ContainerStartErrorEventType EventType = "startContainerError" - GetContainerEventType EventType = "getContainer" - SignalEventType EventType = "signal" + ExitEventType EventType = "exit" + StartContainerEventType EventType = "startContainer" + DeleteEventType EventType = "deleteContainerEvent" + GetContainerEventType EventType = "getContainer" + SignalEventType EventType = "signal" + AddProcessEventType EventType = "addProcess" ) func NewEvent(t EventType) *Event { @@ -24,13 +27,14 @@ func NewEvent(t EventType) *Event { } type Event struct { - Type EventType `json:"type"` - Timestamp time.Time `json:"timestamp"` - ID string `json:"id,omitempty"` - BundlePath string `json:"bundlePath,omitempty"` - Pid int `json:"pid,omitempty"` - Status int `json:"status,omitempty"` - Signal os.Signal `json:"signal,omitempty"` - Containers []Container `json:"-"` - Err chan error `json:"-"` + Type EventType `json:"type"` + Timestamp time.Time `json:"timestamp"` + ID string `json:"id,omitempty"` + BundlePath string `json:"bundlePath,omitempty"` + Pid int `json:"pid,omitempty"` + Status int `json:"status,omitempty"` + Signal os.Signal `json:"signal,omitempty"` + Process *specs.Process `json:"process,omitempty"` + Containers []Container `json:"-"` + Err chan error `json:"-"` } diff --git a/runtime.go b/runtime.go index 4ffc838..72daf9a 100644 --- a/runtime.go +++ b/runtime.go @@ -1,6 +1,9 @@ package containerd +import "github.com/opencontainers/specs" + // runtime handles containers, containers handle their own actions. type Runtime interface { Create(id, bundlePath string) (Container, error) + StartProcess(Container, specs.Process) (Process, error) } diff --git a/runtime_linux.go b/runtime_linux.go index 797b33b..d4580df 100644 --- a/runtime_linux.go +++ b/runtime_linux.go @@ -14,6 +14,7 @@ import ( "github.com/opencontainers/runc/libcontainer" "github.com/opencontainers/runc/libcontainer/configs" + _ "github.com/opencontainers/runc/libcontainer/nsenter" "github.com/opencontainers/runc/libcontainer/seccomp" "github.com/opencontainers/specs" ) @@ -159,27 +160,34 @@ func init() { } type libcontainerProcess struct { - pid int + process *libcontainer.Process + spec specs.Process } -func (p *libcontainerProcess) Pid() int { - return p.pid +// change interface to support an error +func (p *libcontainerProcess) Pid() (int, error) { + pid, err := p.process.Pid() + if err != nil { + return -1, err + } + return pid, nil +} + +func (p *libcontainerProcess) Spec() specs.Process { + return p.spec } func (p *libcontainerProcess) Signal(s os.Signal) error { - proc, err := os.FindProcess(p.pid) - if err != nil { - return err - } - return proc.Signal(s) + return p.process.Signal(s) } type libcontainerContainer struct { - c libcontainer.Container - initProcess *libcontainer.Process - exitStatus int - exited bool - path string + c libcontainer.Container + initProcess *libcontainerProcess + additionalProcesses []*libcontainerProcess + exitStatus int + exited bool + path string } func (c *libcontainerContainer) ID() string { @@ -195,7 +203,7 @@ func (c *libcontainerContainer) Pid() (int, error) { } func (c *libcontainerContainer) Start() error { - return c.c.Start(c.initProcess) + return c.c.Start(c.initProcess.process) } func (c *libcontainerContainer) SetExited(status int) { @@ -209,17 +217,13 @@ func (c *libcontainerContainer) Delete() error { } func (c *libcontainerContainer) Processes() ([]Process, error) { - pids, err := c.c.Processes() - if err != nil { - return nil, err + procs := []Process{ + c.initProcess, } - var proceses []Process - for _, pid := range pids { - proceses = append(proceses, &libcontainerProcess{ - pid: pid, - }) + for _, p := range c.additionalProcesses { + procs = append(procs, p) } - return proceses, nil + return procs, nil } func NewRuntime(stateDir string) (Runtime, error) { @@ -258,13 +262,33 @@ func (r *libcontainerRuntime) Create(id, bundlePath string) (Container, error) { } process := r.newProcess(spec.Process) c := &libcontainerContainer{ - c: container, - initProcess: process, - path: bundlePath, + c: container, + initProcess: &libcontainerProcess{ + process: process, + spec: spec.Process, + }, + path: bundlePath, } return c, nil } +func (r *libcontainerRuntime) StartProcess(ci Container, p specs.Process) (Process, error) { + c, ok := ci.(*libcontainerContainer) + if !ok { + return nil, errInvalidContainerType + } + process := r.newProcess(p) + if err := c.c.Start(process); err != nil { + return nil, err + } + lp := &libcontainerProcess{ + process: process, + spec: p, + } + c.additionalProcesses = append(c.additionalProcesses, lp) + return lp, nil +} + // newProcess returns a new libcontainer Process with the arguments from the // spec and stdio from the current process. func (r *libcontainerRuntime) newProcess(p specs.Process) *libcontainer.Process { diff --git a/supervisor.go b/supervisor.go index 1aa4486..d7cf649 100644 --- a/supervisor.go +++ b/supervisor.go @@ -52,6 +52,8 @@ type Supervisor struct { workerGroup sync.WaitGroup } +// need proper close logic for jobs and stuff so that sending to the channels dont panic +// but can complete jobs func (s *Supervisor) Close() error { return s.journal.Close() } @@ -82,9 +84,9 @@ func (s *Supervisor) Start(events chan *Event) error { continue } container.SetExited(e.Status) - if err := s.deleteContainer(container); err != nil { - logrus.WithField("error", err).Error("containerd: deleting container") - } + ne := NewEvent(DeleteEventType) + ne.ID = container.ID() + s.SendEvent(ne) case StartContainerEventType: container, err := s.runtime.Create(e.ID, e.BundlePath) if err != nil { @@ -96,7 +98,8 @@ func (s *Supervisor) Start(events chan *Event) error { err: e.Err, container: container, } - case ContainerStartErrorEventType: + continue + case DeleteEventType: if container, ok := s.containers[e.ID]; ok { if err := s.deleteContainer(container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") @@ -106,7 +109,6 @@ func (s *Supervisor) Start(events chan *Event) error { for _, c := range s.containers { e.Containers = append(e.Containers, c) } - e.Err <- nil case SignalEventType: container, ok := s.containers[e.ID] if !ok { @@ -119,13 +121,30 @@ func (s *Supervisor) Start(events chan *Event) error { continue } for _, p := range processes { - if p.Pid() == e.Pid { + if pid, err := p.Pid(); err == nil && pid == e.Pid { e.Err <- p.Signal(e.Signal) continue } } e.Err <- ErrProcessNotFound + continue + case AddProcessEventType: + container, ok := s.containers[e.ID] + if !ok { + e.Err <- ErrContainerNotFound + continue + } + p, err := s.runtime.StartProcess(container, *e.Process) + if err != nil { + e.Err <- err + continue + } + if e.Pid, err = p.Pid(); err != nil { + e.Err <- err + continue + } } + close(e.Err) } }() return nil