diff --git a/containerd/main.go b/containerd/main.go index 8a78328..e067f2c 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -69,6 +69,11 @@ var daemonFlags = []cli.Flag{ Value: 15 * time.Second, Usage: "timeout duration for waiting on a container to start before it is killed", }, + cli.IntFlag{ + Name: "retain-count", + Value: 500, + Usage: "number of past events to keep in the event log", + }, } func main() { @@ -85,15 +90,7 @@ func main() { setAppBefore(app) app.Action = func(context *cli.Context) { - if err := daemon( - context.String("listen"), - context.String("state-dir"), - 10, - context.String("runtime"), - context.StringSlice("runtime-args"), - context.String("shim"), - context.Duration("start-timeout"), - ); err != nil { + if err := daemon(context); err != nil { logrus.Fatal(err) } } @@ -102,7 +99,7 @@ func main() { } } -func daemon(address, stateDir string, concurrency int, runtimeName string, runtimeArgs []string, shimName string, timeout time.Duration) error { +func daemon(context *cli.Context) error { // setup a standard reaper so that we don't leave any zombies if we are still alive // this is just good practice because we are spawning new processes s := make(chan os.Signal, 2048) @@ -110,12 +107,18 @@ func daemon(address, stateDir string, concurrency int, runtimeName string, runti if err := osutils.SetSubreaper(1); err != nil { logrus.WithField("error", err).Error("containerd: set subpreaper") } - sv, err := supervisor.New(stateDir, runtimeName, shimName, runtimeArgs, timeout) + sv, err := supervisor.New( + context.String("state-dir"), + context.String("runtime"), + context.String("shim"), + context.StringSlice("runtime-args"), + context.Duration("start-timeout"), + context.Int("retain-count")) if err != nil { return err } wg := &sync.WaitGroup{} - for i := 0; i < concurrency; i++ { + for i := 0; i < 10; i++ { wg.Add(1) w := supervisor.NewWorker(sv, wg) go w.Start() @@ -123,7 +126,7 @@ func daemon(address, stateDir string, concurrency 
int, runtimeName string, runti if err := sv.Start(); err != nil { return err } - server, err := startServer(address, sv) + server, err := startServer(context.String("listen"), sv) if err != nil { return err } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index ea245fb..6c582e0 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -18,7 +18,7 @@ const ( ) // New returns an initialized Process supervisor. -func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration) (*Supervisor, error) { +func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) { startTasks := make(chan *startTask, 10) if err := os.MkdirAll(stateDir, 0755); err != nil { return nil, err @@ -44,7 +44,7 @@ func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, ti shim: shimName, timeout: timeout, } - if err := setupEventLog(s); err != nil { + if err := setupEventLog(s, retainCount); err != nil { return nil, err } go s.exitHandler() @@ -59,20 +59,60 @@ type containerInfo struct { container runtime.Container } -func setupEventLog(s *Supervisor) error { +func setupEventLog(s *Supervisor, retainCount int) error { if err := readEventLog(s); err != nil { return err } logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events") events := s.Events(time.Time{}) - f, err := os.OpenFile(filepath.Join(s.stateDir, "events.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755) + return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount) +} + +func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error { + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755) if err != nil { return err } - enc := json.NewEncoder(f) go func() { + var ( + count = len(s.eventLog) + enc = json.NewEncoder(f) + ) for e := range events { + // if we have a 
specified retain count make sure to truncate the event + // log if it grows past the specified number of events to keep. + if retainCount > 0 { + if count > retainCount { + logrus.Debug("truncating event log") + // close the log file + if f != nil { + f.Close() + } + slice := retainCount - 1 + l := len(s.eventLog) + if slice >= l { + slice = l + } + s.eventLock.Lock() + s.eventLog = s.eventLog[len(s.eventLog)-slice:] + s.eventLock.Unlock() + if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil { + logrus.WithField("error", err).Error("containerd: open event to journal") + continue + } + enc = json.NewEncoder(f) + count = 0 + for _, le := range s.eventLog { + if err := enc.Encode(le); err != nil { + logrus.WithField("error", err).Error("containerd: write event to journal") + } + } + } + } + s.eventLock.Lock() s.eventLog = append(s.eventLog, e) + s.eventLock.Unlock() + count++ if err := enc.Encode(e); err != nil { logrus.WithField("error", err).Error("containerd: write event to journal") } @@ -121,6 +161,7 @@ type Supervisor struct { tasks chan Task monitor *Monitor eventLog []Event + eventLock sync.Mutex timeout time.Duration } @@ -156,7 +197,10 @@ func (s *Supervisor) Events(from time.Time) chan Event { s.subscribers[c] = struct{}{} if !from.IsZero() { // replay old event - for _, e := range s.eventLog { + s.eventLock.Lock() + past := s.eventLog[:] + s.eventLock.Unlock() + for _, e := range past { if e.Timestamp.After(from) { c <- e }