Merge pull request #209 from crosbymichael/event-path

Set fixed event log size
This commit is contained in:
Michael Crosby 2016-04-21 10:23:37 -07:00
commit 6a33c8a49b
2 changed files with 66 additions and 19 deletions

View file

@ -69,6 +69,11 @@ var daemonFlags = []cli.Flag{
Value: 15 * time.Second,
Usage: "timeout duration for waiting on a container to start before it is killed",
},
cli.IntFlag{
Name: "retain-count",
Value: 500,
Usage: "number of past events to keep in the event log",
},
}
func main() {
@ -85,15 +90,7 @@ func main() {
setAppBefore(app)
app.Action = func(context *cli.Context) {
if err := daemon(
context.String("listen"),
context.String("state-dir"),
10,
context.String("runtime"),
context.StringSlice("runtime-args"),
context.String("shim"),
context.Duration("start-timeout"),
); err != nil {
if err := daemon(context); err != nil {
logrus.Fatal(err)
}
}
@ -102,7 +99,7 @@ func main() {
}
}
func daemon(address, stateDir string, concurrency int, runtimeName string, runtimeArgs []string, shimName string, timeout time.Duration) error {
func daemon(context *cli.Context) error {
// setup a standard reaper so that we don't leave any zombies if we are still alive
// this is just good practice because we are spawning new processes
s := make(chan os.Signal, 2048)
@ -110,12 +107,18 @@ func daemon(address, stateDir string, concurrency int, runtimeName string, runti
if err := osutils.SetSubreaper(1); err != nil {
logrus.WithField("error", err).Error("containerd: set subpreaper")
}
sv, err := supervisor.New(stateDir, runtimeName, shimName, runtimeArgs, timeout)
sv, err := supervisor.New(
context.String("state-dir"),
context.String("runtime"),
context.String("shim"),
context.StringSlice("runtime-args"),
context.Duration("start-timeout"),
context.Int("retain-count"))
if err != nil {
return err
}
wg := &sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
for i := 0; i < 10; i++ {
wg.Add(1)
w := supervisor.NewWorker(sv, wg)
go w.Start()
@ -123,7 +126,7 @@ func daemon(address, stateDir string, concurrency int, runtimeName string, runti
if err := sv.Start(); err != nil {
return err
}
server, err := startServer(address, sv)
server, err := startServer(context.String("listen"), sv)
if err != nil {
return err
}

View file

@ -18,7 +18,7 @@ const (
)
// New returns an initialized Process supervisor.
func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration) (*Supervisor, error) {
func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
startTasks := make(chan *startTask, 10)
if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, err
@ -44,7 +44,7 @@ func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, ti
shim: shimName,
timeout: timeout,
}
if err := setupEventLog(s); err != nil {
if err := setupEventLog(s, retainCount); err != nil {
return nil, err
}
go s.exitHandler()
@ -59,20 +59,60 @@ type containerInfo struct {
container runtime.Container
}
func setupEventLog(s *Supervisor) error {
func setupEventLog(s *Supervisor, retainCount int) error {
if err := readEventLog(s); err != nil {
return err
}
logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
events := s.Events(time.Time{})
f, err := os.OpenFile(filepath.Join(s.stateDir, "events.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount)
}
func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
if err != nil {
return err
}
enc := json.NewEncoder(f)
go func() {
var (
count = len(s.eventLog)
enc = json.NewEncoder(f)
)
for e := range events {
// if we have a specified retain count make sure the truncate the event
// log if it grows past the specified number of events to keep.
if retainCount > 0 {
if count > retainCount {
logrus.Debug("truncating event log")
// close the log file
if f != nil {
f.Close()
}
slice := retainCount - 1
l := len(s.eventLog)
if slice >= l {
slice = l
}
s.eventLock.Lock()
s.eventLog = s.eventLog[len(s.eventLog)-slice:]
s.eventLock.Unlock()
if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
logrus.WithField("error", err).Error("containerd: open event to journal")
continue
}
enc = json.NewEncoder(f)
count = 0
for _, le := range s.eventLog {
if err := enc.Encode(le); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
}
}
}
s.eventLock.Lock()
s.eventLog = append(s.eventLog, e)
s.eventLock.Unlock()
count++
if err := enc.Encode(e); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
@ -121,6 +161,7 @@ type Supervisor struct {
tasks chan Task
monitor *Monitor
eventLog []Event
eventLock sync.Mutex
timeout time.Duration
}
@ -156,7 +197,10 @@ func (s *Supervisor) Events(from time.Time) chan Event {
s.subscribers[c] = struct{}{}
if !from.IsZero() {
// replay old event
for _, e := range s.eventLog {
s.eventLock.Lock()
past := s.eventLog[:]
s.eventLock.Unlock()
for _, e := range past {
if e.Timestamp.After(from) {
c <- e
}