From 6ff2239019175a4ca99f2df86412e4c3866a0776 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 10 Nov 2015 14:24:34 -0800 Subject: [PATCH] Add journaling --- api/v1/server.go | 23 +++++------- containerd/daemon.go | 13 +++---- event.go | 83 ++++++++++++++++---------------------------- journal.go | 41 ++++++++++++++++++++++ supervisor.go | 41 +++++++++++++++------- 5 files changed, 115 insertions(+), 86 deletions(-) create mode 100644 journal.go diff --git a/api/v1/server.go b/api/v1/server.go index c0211ca..cb2a099 100644 --- a/api/v1/server.go +++ b/api/v1/server.go @@ -49,12 +49,11 @@ func (s *server) signalPid(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - e := &containerd.SignalEvent{ - ID: id, - Pid: pid, - Signal: syscall.Signal(signal.Signal), - Err: make(chan error, 1), - } + + e := containerd.NewEvent(containerd.SignalEventType) + e.ID = id + e.Pid = pid + e.Signal = syscall.Signal(signal.Signal) s.supervisor.SendEvent(e) if err := <-e.Err; err != nil { status := http.StatusInternalServerError @@ -69,9 +68,7 @@ func (s *server) signalPid(w http.ResponseWriter, r *http.Request) { func (s *server) containers(w http.ResponseWriter, r *http.Request) { var state State state.Containers = []Container{} - e := &containerd.GetContainersEvent{ - Err: make(chan error, 1), - } + e := containerd.NewEvent(containerd.GetContainerEventType) s.supervisor.SendEvent(e) if err := <-e.Err; err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -112,11 +109,9 @@ func (s *server) createContainer(w http.ResponseWriter, r *http.Request) { http.Error(w, "empty bundle path", http.StatusBadRequest) return } - e := &containerd.StartContainerEvent{ - ID: id, - BundlePath: c.BundlePath, - Err: make(chan error, 1), - } + e := containerd.NewEvent(containerd.StartContainerEventType) + e.ID = id + e.BundlePath = c.BundlePath s.supervisor.SendEvent(e) if err := <-e.Err; err != nil { code := http.StatusInternalServerError diff --git a/containerd/daemon.go b/containerd/daemon.go index 4a9f029..f632df4 100644 --- a/containerd/daemon.go +++ b/containerd/daemon.go @@ -54,7 +54,7 @@ func daemon(stateDir string, concurrency, bufferSize int) error { if err != nil { return err } - events := make(chan containerd.Event, bufferSize) + events := make(chan *containerd.Event, bufferSize) // start the signal handler in the background. go startSignalHandler(supervisor, bufferSize) if err := supervisor.Start(events); err != nil { @@ -71,6 +71,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { for s := range signals { switch s { case syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP: + supervisor.Close() os.Exit(0) case syscall.SIGCHLD: exits, err := reap() @@ -84,7 +85,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { } } -func reap() (exits []*containerd.ExitEvent, err error) { +func reap() (exits []*containerd.Event, err error) { var ( ws syscall.WaitStatus rus syscall.Rusage @@ -100,9 +101,9 @@ func reap() (exits []*containerd.ExitEvent, err error) { if pid <= 0 { return exits, nil } - exits = append(exits, &containerd.ExitEvent{ - Pid: pid, - Status: utils.ExitStatus(ws), - }) + e := containerd.NewEvent(containerd.ExitEventType) + e.Pid = pid + e.Status = utils.ExitStatus(ws) + exits = append(exits, e) } } diff --git a/event.go b/event.go index c9490a8..d8ef85e 100644 --- a/event.go +++ b/event.go @@ -1,59 +1,36 @@ package containerd -import "os" +import ( + "os" + "time" +) -type Event interface { - String() string +type EventType string + +const ( + ExitEventType EventType = "exit" + StartContainerEventType EventType = "startContainer" + ContainerStartErrorEventType EventType = "startContainerError" + GetContainerEventType EventType = "getContainer" + SignalEventType EventType = "signal" +) + +func NewEvent(t EventType) *Event { + return &Event{ + Type: t, + Timestamp: time.Now(), + Err: make(chan error, 1), + } } -type CallbackEvent interface { - Event() Event - Callback() chan Event -} - -type ExitEvent struct { - Pid int - Status int -} - -func (e *ExitEvent) String() string { - return "exit event" -} - -type StartContainerEvent struct { - ID string - BundlePath string - Err chan error -} - -func (c *StartContainerEvent) String() string { - return "create container" -} - -type ContainerStartErrorEvent struct { - ID string -} - -func (c *ContainerStartErrorEvent) String() string { - return "container start error" -} - -type GetContainersEvent struct { - Containers []Container - Err chan error -} - -func (c *GetContainersEvent) String() string { - return "get containers" -} - -type SignalEvent struct { - ID string - Pid int - Signal os.Signal - Err chan error -} - -func (s *SignalEvent) String() string { - return "signal event" +type Event struct { + Type EventType `json:"type"` + Timestamp time.Time `json:"timestamp"` + ID string `json:"id,omitempty"` + BundlePath string `json:"bundlePath,omitempty"` + Pid int `json:"pid,omitempty"` + Status int `json:"status,omitempty"` + Signal os.Signal `json:"signal,omitempty"` + Containers []Container `json:"-"` + Err chan error `json:"-"` } diff --git a/journal.go b/journal.go new file mode 100644 index 0000000..5889f0f --- /dev/null +++ b/journal.go @@ -0,0 +1,41 @@ +package containerd + +import ( + "encoding/json" + "os" + "path/filepath" +) + +type entry struct { + Event *Event `json:"event"` +} + +func newJournal(path string) (*journal, error) { + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return nil, err + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0755) + if err != nil { + return nil, err + } + return &journal{ + f: f, + enc: json.NewEncoder(f), + }, nil +} + +type journal struct { + f *os.File + enc *json.Encoder +} + +func (j *journal) write(e *Event) error { + et := &entry{ + Event: e, + } + return j.enc.Encode(et) +} + +func (j *journal) Close() error { + return j.f.Close() +} diff --git a/supervisor.go b/supervisor.go index c62fd19..1aa4486 100644 --- a/supervisor.go +++ b/supervisor.go @@ -2,6 +2,7 @@ package containerd import ( "os" + "path/filepath" "sync" "github.com/Sirupsen/logrus" @@ -18,11 +19,16 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { if err != nil { return nil, err } + j, err := newJournal(filepath.Join(stateDir, "journal.json")) + if err != nil { + return nil, err + } s := &Supervisor{ stateDir: stateDir, containers: make(map[string]Container), runtime: runtime, tasks: make(chan *startTask, concurrency*100), + journal: j, } for i := 0; i < concurrency; i++ { s.workerGroup.Add(1) @@ -39,24 +45,33 @@ type Supervisor struct { runtime Runtime - events chan Event + journal *journal + + events chan *Event tasks chan *startTask workerGroup sync.WaitGroup } +func (s *Supervisor) Close() error { + return s.journal.Close() +} + // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and // executing new containers. // // This event loop is the only thing that is allowed to modify state of containers and processes. -func (s *Supervisor) Start(events chan Event) error { +func (s *Supervisor) Start(events chan *Event) error { if events == nil { return ErrEventChanNil } s.events = events go func() { - for evt := range events { - switch e := evt.(type) { - case *ExitEvent: + for e := range events { + if err := s.journal.write(e); err != nil { + logrus.WithField("error", err).Error("write journal entry") + } + switch e.Type { + case ExitEventType: logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). Debug("containerd: process exited") container, err := s.getContainerForPid(e.Pid) @@ -70,7 +85,7 @@ func (s *Supervisor) Start(events chan Event) error { if err := s.deleteContainer(container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } - case *StartContainerEvent: + case StartContainerEventType: container, err := s.runtime.Create(e.ID, e.BundlePath) if err != nil { e.Err <- err @@ -81,18 +96,18 @@ func (s *Supervisor) Start(events chan Event) error { err: e.Err, container: container, } - case *ContainerStartErrorEvent: + case ContainerStartErrorEventType: if container, ok := s.containers[e.ID]; ok { if err := s.deleteContainer(container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } } - case *GetContainersEvent: + case GetContainerEventType: for _, c := range s.containers { e.Containers = append(e.Containers, c) } e.Err <- nil - case *SignalEvent: + case SignalEventType: container, ok := s.containers[e.ID] if !ok { e.Err <- ErrContainerNotFound @@ -139,7 +154,7 @@ func (s *Supervisor) getContainerForPid(pid int) (Container, error) { return nil, errNoContainerForPid } -func (s *Supervisor) SendEvent(evt Event) { +func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } @@ -152,9 +167,9 @@ func (s *Supervisor) startContainerWorker(tasks chan *startTask) { defer s.workerGroup.Done() for t := range tasks { if err := t.container.Start(); err != nil { - s.SendEvent(&ContainerStartErrorEvent{ - ID: t.container.ID(), - }) + e := NewEvent(StartContainerEventType) + e.ID = t.container.ID() + s.SendEvent(e) t.err <- err continue }