add event log and timestamp to events api

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-02-12 10:17:59 -08:00
parent 3dc59d565a
commit 9341a95c26
6 changed files with 191 additions and 111 deletions

View file

@ -1,8 +1,11 @@
package supervisor
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
"time"
@ -40,6 +43,9 @@ func New(stateDir string, oom bool) (*Supervisor, error) {
el: eventloop.NewChanLoop(defaultBufferSize),
monitor: monitor,
}
if err := setupEventLog(s); err != nil {
return nil, err
}
if oom {
s.notifier = chanotify.New()
go func() {
@ -76,6 +82,51 @@ type containerInfo struct {
container runtime.Container
}
func setupEventLog(s *Supervisor) error {
if err := readEventLog(s); err != nil {
return err
}
logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
events := s.Events(time.Time{})
f, err := os.OpenFile(filepath.Join(s.stateDir, "events.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0755)
if err != nil {
return err
}
enc := json.NewEncoder(f)
go func() {
for e := range events {
s.eventLog = append(s.eventLog, e)
if err := enc.Encode(e); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
}
}()
return nil
}
func readEventLog(s *Supervisor) error {
f, err := os.Open(filepath.Join(s.stateDir, "events.log"))
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer f.Close()
dec := json.NewDecoder(f)
for {
var e Event
if err := dec.Decode(&e); err != nil {
if err == io.EOF {
return nil
}
return err
}
s.eventLog = append(s.eventLog, e)
}
return nil
}
type Supervisor struct {
// stateDir is the directory on the system to store container runtime state information.
stateDir string
@ -91,6 +142,7 @@ type Supervisor struct {
notifier *chanotify.Notifier
el eventloop.EventLoop
monitor *Monitor
eventLog []Event
}
// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to
@ -117,12 +169,20 @@ type Event struct {
// Events returns an event channel that external consumers can use to receive updates
// on container events
func (s *Supervisor) Events() chan Event {
func (s *Supervisor) Events(from time.Time) chan Event {
s.subscriberLock.Lock()
defer s.subscriberLock.Unlock()
c := make(chan Event, defaultBufferSize)
EventSubscriberCounter.Inc(1)
s.subscribers[c] = struct{}{}
if !from.IsZero() {
// replay old event
for _, e := range s.eventLog {
if e.Timestamp.After(from) {
c <- e
}
}
}
return c
}
@ -145,7 +205,7 @@ func (s *Supervisor) notifySubscribers(e Event) {
select {
case sub <- e:
default:
logrus.WithField("event", e.Type).Warn("event not sent to subscriber")
logrus.WithField("event", e.Type).Warn("containerd: event not sent to subscriber")
}
}
}
@ -157,9 +217,7 @@ func (s *Supervisor) notifySubscribers(e Event) {
// therefore it is save to do operations in the handlers that modify state of the system or
// state of the Supervisor
func (s *Supervisor) Start() error {
logrus.WithFields(logrus.Fields{
"stateDir": s.stateDir,
}).Debug("Supervisor started")
logrus.WithField("stateDir", s.stateDir).Debug("containerd: supervisor running")
return s.el.Start()
}