Streamline events (#287)

* Sync process.State() with the matching events

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>

* Allow requesting events for a specific container

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>

* Sync container state retrieval with other events

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>

* Let containerd take care of calling runtime delete on exit

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>

* Take care of possible race in TestBusyboxTopExecTopKillInit

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickaël Laventure 2016-07-13 11:01:07 -07:00 committed by Michael Crosby
parent 6dd2f1c422
commit 90f827ca10
11 changed files with 342 additions and 212 deletions

View file

@ -41,7 +41,7 @@ func (s *Supervisor) exit(t *ExitTask) error {
Status: status,
Process: proc,
}
s.SendTask(ne)
s.execExit(ne)
return nil
}
container := proc.Container()
@ -50,7 +50,7 @@ func (s *Supervisor) exit(t *ExitTask) error {
Status: status,
PID: proc.ID(),
}
s.SendTask(ne)
s.delete(ne)
ExitProcessTimer.UpdateSince(start)

View file

@ -6,8 +6,11 @@ import "github.com/docker/containerd/runtime"
// containers
type GetContainersTask struct {
baseTask
ID string
ID string
GetState func(c runtime.Container) (interface{}, error)
Containers []runtime.Container
States []interface{}
}
func (s *Supervisor) getContainers(t *GetContainersTask) error {
@ -18,12 +21,26 @@ func (s *Supervisor) getContainers(t *GetContainersTask) error {
return ErrContainerNotFound
}
t.Containers = append(t.Containers, ci.container)
if t.GetState != nil {
st, err := t.GetState(ci.container)
if err != nil {
return err
}
t.States = append(t.States, st)
}
return nil
}
for _, ci := range s.containers {
t.Containers = append(t.Containers, ci.container)
if t.GetState != nil {
st, err := t.GetState(ci.container)
if err != nil {
return err
}
t.States = append(t.States, st)
}
}
return nil

View file

@ -64,7 +64,7 @@ func setupEventLog(s *Supervisor, retainCount int) error {
return err
}
logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
events := s.Events(time.Time{})
events := s.Events(time.Time{}, false, "")
return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount)
}
@ -191,12 +191,13 @@ type Event struct {
// Events returns an event channel that external consumers can use to receive updates
// on container events
func (s *Supervisor) Events(from time.Time) chan Event {
func (s *Supervisor) Events(from time.Time, storedOnly bool, id string) chan Event {
c := make(chan Event, defaultBufferSize)
if storedOnly {
defer s.Unsubscribe(c)
}
s.subscriberLock.Lock()
defer s.subscriberLock.Unlock()
c := make(chan Event, defaultBufferSize)
EventSubscriberCounter.Inc(1)
s.subscribers[c] = struct{}{}
if !from.IsZero() {
// replay old event
s.eventLock.Lock()
@ -204,14 +205,17 @@ func (s *Supervisor) Events(from time.Time) chan Event {
s.eventLock.Unlock()
for _, e := range past {
if e.Timestamp.After(from) {
c <- e
if id == "" || e.ID == id {
c <- e
}
}
}
// Notify the client that from now on it's live events
c <- Event{
Type: StateLive,
Timestamp: time.Now(),
}
}
if storedOnly {
close(c)
} else {
EventSubscriberCounter.Inc(1)
s.subscribers[c] = struct{}{}
}
return c
}
@ -220,9 +224,11 @@ func (s *Supervisor) Events(from time.Time) chan Event {
func (s *Supervisor) Unsubscribe(sub chan Event) {
s.subscriberLock.Lock()
defer s.subscriberLock.Unlock()
delete(s.subscribers, sub)
close(sub)
EventSubscriberCounter.Dec(1)
if _, ok := s.subscribers[sub]; ok {
delete(s.subscribers, sub)
close(sub)
EventSubscriberCounter.Dec(1)
}
}
// notifySubscribers will send the provided event to the external subscribers
@ -364,8 +370,6 @@ func (s *Supervisor) handleTask(i Task) {
err = s.delete(t)
case *ExitTask:
err = s.exit(t)
case *ExecExitTask:
err = s.execExit(t)
case *GetContainersTask:
err = s.getContainers(t)
case *SignalTask: