From 1b8cc65462afc44b84c90fc487ce94329b18f12f Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 18 Apr 2016 17:28:13 -0700 Subject: [PATCH] Set fixed event log size Truncate the event log on disk and in memory so that it does not grow forever. This is mainly used for higher levels to receive past events if they miss any. Signed-off-by: Michael Crosby --- containerd/main.go | 29 +++++++++++---------- supervisor/supervisor.go | 56 +++++++++++++++++++++++++++++++++++----- 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/containerd/main.go b/containerd/main.go index 8a78328..e067f2c 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -69,6 +69,11 @@ var daemonFlags = []cli.Flag{ Value: 15 * time.Second, Usage: "timeout duration for waiting on a container to start before it is killed", }, + cli.IntFlag{ + Name: "retain-count", + Value: 500, + Usage: "number of past events to keep in the event log", + }, } func main() { @@ -85,15 +90,7 @@ func main() { setAppBefore(app) app.Action = func(context *cli.Context) { - if err := daemon( - context.String("listen"), - context.String("state-dir"), - 10, - context.String("runtime"), - context.StringSlice("runtime-args"), - context.String("shim"), - context.Duration("start-timeout"), - ); err != nil { + if err := daemon(context); err != nil { logrus.Fatal(err) } } @@ -102,7 +99,7 @@ func main() { } } -func daemon(address, stateDir string, concurrency int, runtimeName string, runtimeArgs []string, shimName string, timeout time.Duration) error { +func daemon(context *cli.Context) error { // setup a standard reaper so that we don't leave any zombies if we are still alive // this is just good practice because we are spawning new processes s := make(chan os.Signal, 2048) @@ -110,12 +107,18 @@ func daemon(address, stateDir string, concurrency int, runtimeName string, runti if err := osutils.SetSubreaper(1); err != nil { logrus.WithField("error", err).Error("containerd: set subpreaper") } - sv, err := supervisor.New(stateDir, runtimeName, shimName, runtimeArgs, timeout) + sv, err := supervisor.New( + context.String("state-dir"), + context.String("runtime"), + context.String("shim"), + context.StringSlice("runtime-args"), + context.Duration("start-timeout"), + context.Int("retain-count")) if err != nil { return err } wg := &sync.WaitGroup{} - for i := 0; i < concurrency; i++ { + for i := 0; i < 10; i++ { wg.Add(1) w := supervisor.NewWorker(sv, wg) go w.Start() @@ -123,7 +126,7 @@ func daemon(address, stateDir string, concurrency int, runtimeName string, runti if err := sv.Start(); err != nil { return err } - server, err := startServer(address, sv) + server, err := startServer(context.String("listen"), sv) if err != nil { return err } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index ea245fb..6c582e0 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -18,7 +18,7 @@ const ( ) // New returns an initialized Process supervisor. -func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration) (*Supervisor, error) { +func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) { startTasks := make(chan *startTask, 10) if err := os.MkdirAll(stateDir, 0755); err != nil { return nil, err @@ -44,7 +44,7 @@ func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, ti shim: shimName, timeout: timeout, } - if err := setupEventLog(s); err != nil { + if err := setupEventLog(s, retainCount); err != nil { return nil, err } go s.exitHandler() @@ -59,20 +59,60 @@ type containerInfo struct { container runtime.Container } -func setupEventLog(s *Supervisor) error { +func setupEventLog(s *Supervisor, retainCount int) error { if err := readEventLog(s); err != nil { return err } logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events") events := s.Events(time.Time{}) - f, err := os.OpenFile(filepath.Join(s.stateDir, "events.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755) + return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount) +} + +func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error { + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755) if err != nil { return err } - enc := json.NewEncoder(f) go func() { + var ( + count = len(s.eventLog) + enc = json.NewEncoder(f) + ) for e := range events { + // if we have a specified retain count make sure the truncate the event + // log if it grows past the specified number of events to keep. + if retainCount > 0 { + if count > retainCount { + logrus.Debug("truncating event log") + // close the log file + if f != nil { + f.Close() + } + slice := retainCount - 1 + l := len(s.eventLog) + if slice >= l { + slice = l + } + s.eventLock.Lock() + s.eventLog = s.eventLog[len(s.eventLog)-slice:] + s.eventLock.Unlock() + if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil { + logrus.WithField("error", err).Error("containerd: open event to journal") + continue + } + enc = json.NewEncoder(f) + count = 0 + for _, le := range s.eventLog { + if err := enc.Encode(le); err != nil { + logrus.WithField("error", err).Error("containerd: write event to journal") + } + } + } + } + s.eventLock.Lock() s.eventLog = append(s.eventLog, e) + s.eventLock.Unlock() + count++ if err := enc.Encode(e); err != nil { logrus.WithField("error", err).Error("containerd: write event to journal") } @@ -121,6 +161,7 @@ type Supervisor struct { tasks chan Task monitor *Monitor eventLog []Event + eventLock sync.Mutex timeout time.Duration } @@ -156,7 +197,10 @@ func (s *Supervisor) Events(from time.Time) chan Event { s.subscribers[c] = struct{}{} if !from.IsZero() { // replay old event - for _, e := range s.eventLog { + s.eventLock.Lock() + past := s.eventLog[:] + s.eventLock.Unlock() + for _, e := range past { if e.Timestamp.After(from) { c <- e }