Add events support in client

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2015-12-10 12:30:04 -08:00
parent 288b9a0cc3
commit 71ef776082
7 changed files with 70 additions and 20 deletions

View file

@ -217,12 +217,14 @@ func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContaine
func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error { func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
events := s.sv.Events() events := s.sv.Events()
defer s.sv.Unsubscribe(events)
for evt := range events { for evt := range events {
switch evt.Type { switch evt.Type {
case containerd.ExitEventType: case containerd.ExitEventType:
ev := &types.Event{ ev := &types.Event{
Type: "exit", Type: "exit",
Id: evt.ID, Id: evt.ID,
Pid: uint32(evt.Pid),
Status: uint32(evt.Status), Status: uint32(evt.Status),
} }
if err := stream.Send(ev); err != nil { if err := stream.Send(ev); err != nil {

34
ctr/events.go Normal file
View file

@ -0,0 +1,34 @@
package main
import (
"fmt"
"os"
"text/tabwriter"
"github.com/codegangsta/cli"
"github.com/docker/containerd/api/grpc/types"
netcontext "golang.org/x/net/context"
)
var EventsCommand = cli.Command{
Name: "events",
Usage: "receive events from the containerd daemon",
Action: func(context *cli.Context) {
c := getClient()
events, err := c.Events(netcontext.Background(), &types.EventsRequest{})
if err != nil {
fatal(err.Error(), 1)
}
w := tabwriter.NewWriter(os.Stdout, 20, 1, 3, ' ', 0)
fmt.Fprint(w, "TYPE\tID\tPID\tSTATUS\n")
w.Flush()
for {
e, err := events.Recv()
if err != nil {
fatal(err.Error(), 1)
}
fmt.Fprintf(w, "%s\t%s\t%d\t%d\n", e.Type, e.Id, e.Pid, e.Status)
w.Flush()
}
},
}

View file

@ -36,6 +36,7 @@ func main() {
app.Commands = []cli.Command{ app.Commands = []cli.Command{
ContainersCommand, ContainersCommand,
CheckpointCommand, CheckpointCommand,
EventsCommand,
} }
app.Before = func(context *cli.Context) error { app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") { if context.GlobalBool("debug") {

View file

@ -14,6 +14,12 @@ func (h *DeleteEvent) Handle(e *Event) error {
if err := h.deleteContainer(container); err != nil { if err := h.deleteContainer(container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container") logrus.WithField("error", err).Error("containerd: deleting container")
} }
h.s.NotifySubscribers(&Event{
Type: ExitEventType,
ID: e.ID,
Status: e.Status,
Pid: e.Pid,
})
ContainersCounter.Dec(1) ContainersCounter.Dec(1)
h.s.containerGroup.Done() h.s.containerGroup.Done()
} }

10
exit.go
View file

@ -6,10 +6,6 @@ type ExitEvent struct {
s *Supervisor s *Supervisor
} }
type ExecExitEvent struct {
s *Supervisor
}
func (h *ExitEvent) Handle(e *Event) error { func (h *ExitEvent) Handle(e *Event) error {
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
Debug("containerd: process exited") Debug("containerd: process exited")
@ -34,10 +30,16 @@ func (h *ExitEvent) Handle(e *Event) error {
container.SetExited(e.Status) container.SetExited(e.Status)
ne := NewEvent(DeleteEventType) ne := NewEvent(DeleteEventType)
ne.ID = container.ID() ne.ID = container.ID()
ne.Pid = e.Pid
ne.Status = e.Status
h.s.SendEvent(ne) h.s.SendEvent(ne)
return nil return nil
} }
type ExecExitEvent struct {
s *Supervisor
}
func (h *ExecExitEvent) Handle(e *Event) error { func (h *ExecExitEvent) Handle(e *Event) error {
// exec process: we remove this process without notifying the main event loop // exec process: we remove this process without notifying the main event loop
container := h.s.processes[e.Pid] container := h.s.processes[e.Pid]

View file

@ -3,9 +3,10 @@ package containerd
import "github.com/rcrowley/go-metrics" import "github.com/rcrowley/go-metrics"
var ( var (
ContainerStartTimer = metrics.NewTimer() ContainerStartTimer = metrics.NewTimer()
ContainersCounter = metrics.NewCounter() ContainersCounter = metrics.NewCounter()
EventsCounter = metrics.NewCounter() EventsCounter = metrics.NewCounter()
EventSubscriberCounter = metrics.NewCounter()
) )
func Metrics() map[string]interface{} { func Metrics() map[string]interface{} {
@ -13,5 +14,6 @@ func Metrics() map[string]interface{} {
"container-start-time": ContainerStartTimer, "container-start-time": ContainerStartTimer,
"containers": ContainersCounter, "containers": ContainersCounter,
"events": EventsCounter, "events": EventsCounter,
"events-subscribers": EventSubscriberCounter,
} }
} }

View file

@ -28,13 +28,14 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err
return nil, err return nil, err
} }
s := &Supervisor{ s := &Supervisor{
stateDir: stateDir, stateDir: stateDir,
containers: make(map[string]runtime.Container), containers: make(map[string]runtime.Container),
processes: make(map[int]runtime.Container), processes: make(map[int]runtime.Container),
runtime: r, runtime: r,
tasks: tasks, tasks: tasks,
events: make(chan *Event, 2048), events: make(chan *Event, 2048),
machine: machine, machine: machine,
subscribers: make(map[chan *Event]struct{}),
} }
// register default event handlers // register default event handlers
s.handlers = map[EventType]Handler{ s.handlers = map[EventType]Handler{
@ -62,13 +63,11 @@ type Supervisor struct {
runtime runtime.Runtime runtime runtime.Runtime
events chan *Event events chan *Event
tasks chan *StartTask tasks chan *StartTask
subscribers map[subscriber]bool subscribers map[chan *Event]struct{}
machine Machine machine Machine
containerGroup sync.WaitGroup containerGroup sync.WaitGroup
} }
type subscriber chan *Event
func (s *Supervisor) Stop(sig chan os.Signal) { func (s *Supervisor) Stop(sig chan os.Signal) {
// Close the tasks channel so that no new containers get started // Close the tasks channel so that no new containers get started
close(s.tasks) close(s.tasks)
@ -109,12 +108,16 @@ func (s *Supervisor) Close() error {
return nil return nil
} }
func (s *Supervisor) Events() subscriber { func (s *Supervisor) Events() chan *Event {
return subscriber(make(chan *Event)) c := make(chan *Event, 2048)
EventSubscriberCounter.Inc(1)
s.subscribers[c] = struct{}{}
return c
} }
func (s *Supervisor) Unsubscribe(sub subscriber) { func (s *Supervisor) Unsubscribe(sub chan *Event) {
delete(s.subscribers, sub) delete(s.subscribers, sub)
EventSubscriberCounter.Dec(1)
} }
func (s *Supervisor) NotifySubscribers(e *Event) { func (s *Supervisor) NotifySubscribers(e *Event) {