Rename Events to task in supervisor

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-02-11 17:26:24 -08:00
parent d2bf71043f
commit 3dc59d565a
18 changed files with 274 additions and 281 deletions

View file

@ -6,13 +6,13 @@ import (
"github.com/docker/containerd/runtime"
)
type AddProcessEvent struct {
type AddProcessTask struct {
s *Supervisor
}
// TODO: add this to worker for concurrent starts??? maybe not because of races where the container
// could be stopped and removed...
func (h *AddProcessEvent) Handle(e *Event) error {
func (h *AddProcessTask) Handle(e *Task) error {
start := time.Now()
ci, ok := h.s.containers[e.ID]
if !ok {
@ -27,5 +27,11 @@ func (h *AddProcessEvent) Handle(e *Event) error {
}
ExecProcessTimer.UpdateSince(start)
e.StartResponse <- StartResponse{}
h.s.notifySubscribers(Event{
Timestamp: time.Now(),
Type: "start-process",
Pid: e.Pid,
ID: e.ID,
})
return nil
}

View file

@ -1,10 +1,10 @@
package supervisor
type CreateCheckpointEvent struct {
type CreateCheckpointTask struct {
s *Supervisor
}
func (h *CreateCheckpointEvent) Handle(e *Event) error {
func (h *CreateCheckpointTask) Handle(e *Task) error {
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
@ -12,11 +12,11 @@ func (h *CreateCheckpointEvent) Handle(e *Event) error {
return i.container.Checkpoint(*e.Checkpoint)
}
type DeleteCheckpointEvent struct {
type DeleteCheckpointTask struct {
s *Supervisor
}
func (h *DeleteCheckpointEvent) Handle(e *Event) error {
func (h *DeleteCheckpointTask) Handle(e *Task) error {
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound

View file

@ -6,11 +6,11 @@ import (
"github.com/docker/containerd/runtime"
)
type StartEvent struct {
type StartTask struct {
s *Supervisor
}
func (h *StartEvent) Handle(e *Event) error {
func (h *StartTask) Handle(e *Task) error {
start := time.Now()
container, err := runtime.New(h.s.stateDir, e.ID, e.BundlePath, e.Labels)
if err != nil {
@ -20,7 +20,7 @@ func (h *StartEvent) Handle(e *Event) error {
container: container,
}
ContainersCounter.Inc(1)
task := &StartTask{
task := &startTask{
Err: e.Err,
Container: container,
StartResponse: e.StartResponse,

View file

@ -7,21 +7,22 @@ import (
"github.com/docker/containerd/runtime"
)
type DeleteEvent struct {
type DeleteTask struct {
s *Supervisor
}
func (h *DeleteEvent) Handle(e *Event) error {
func (h *DeleteTask) Handle(e *Task) error {
if i, ok := h.s.containers[e.ID]; ok {
start := time.Now()
if err := h.deleteContainer(i.container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container")
}
h.s.notifySubscribers(&Event{
Type: ExitEventType,
ID: e.ID,
Status: e.Status,
Pid: e.Pid,
h.s.notifySubscribers(Event{
Type: "exit",
Timestamp: time.Now(),
ID: e.ID,
Status: e.Status,
Pid: e.Pid,
})
ContainersCounter.Dec(1)
ContainerDeleteTimer.UpdateSince(start)
@ -29,7 +30,7 @@ func (h *DeleteEvent) Handle(e *Event) error {
return nil
}
func (h *DeleteEvent) deleteContainer(container runtime.Container) error {
func (h *DeleteTask) deleteContainer(container runtime.Container) error {
delete(h.s.containers, container.ID())
return container.Delete()
}

View file

@ -4,13 +4,13 @@ import "errors"
var (
// External errors
ErrEventChanNil = errors.New("containerd: event channel is nil")
ErrTaskChanNil = errors.New("containerd: task channel is nil")
ErrBundleNotFound = errors.New("containerd: bundle not found")
ErrContainerNotFound = errors.New("containerd: container not found")
ErrContainerExists = errors.New("containerd: container already exists")
ErrProcessNotFound = errors.New("containerd: processs not found for container")
ErrUnknownContainerStatus = errors.New("containerd: unknown container status ")
ErrUnknownEvent = errors.New("containerd: unknown event type")
ErrUnknownTask = errors.New("containerd: unknown task type")
// Internal errors
errShutdown = errors.New("containerd: supervisor is shutdown")

View file

@ -7,11 +7,11 @@ import (
"github.com/docker/containerd/runtime"
)
type ExitEvent struct {
type ExitTask struct {
s *Supervisor
}
func (h *ExitEvent) Handle(e *Event) error {
func (h *ExitTask) Handle(e *Task) error {
start := time.Now()
proc := e.Process
status, err := proc.ExitStatus()
@ -23,37 +23,43 @@ func (h *ExitEvent) Handle(e *Event) error {
// if the process is the the init process of the container then
// fire a separate event for this process
if proc.ID() != runtime.InitProcessID {
ne := NewEvent(ExecExitEventType)
ne := NewTask(ExecExitTaskType)
ne.ID = proc.Container().ID()
ne.Pid = proc.ID()
ne.Status = status
ne.Process = proc
h.s.SendEvent(ne)
h.s.SendTask(ne)
return nil
}
container := proc.Container()
ne := NewEvent(DeleteEventType)
ne := NewTask(DeleteTaskType)
ne.ID = container.ID()
ne.Status = status
ne.Pid = proc.ID()
h.s.SendEvent(ne)
h.s.SendTask(ne)
ExitProcessTimer.UpdateSince(start)
return nil
}
type ExecExitEvent struct {
type ExecExitTask struct {
s *Supervisor
}
func (h *ExecExitEvent) Handle(e *Event) error {
func (h *ExecExitTask) Handle(e *Task) error {
container := e.Process.Container()
// exec process: we remove this process without notifying the main event loop
if err := container.RemoveProcess(e.Pid); err != nil {
logrus.WithField("error", err).Error("containerd: find container for pid")
}
h.s.notifySubscribers(e)
h.s.notifySubscribers(Event{
Timestamp: time.Now(),
ID: e.ID,
Type: "exit",
Pid: e.Pid,
Status: e.Status,
})
return nil
}

View file

@ -1,10 +1,10 @@
package supervisor
type GetContainersEvent struct {
type GetContainersTask struct {
s *Supervisor
}
func (h *GetContainersEvent) Handle(e *Event) error {
func (h *GetContainersTask) Handle(e *Task) error {
if e.ID != "" {
ci := h.s.containers[e.ID]
if ci == nil {

View file

@ -9,7 +9,7 @@ var (
ContainerStatsTimer = metrics.NewTimer()
ContainersCounter = metrics.NewCounter()
EventSubscriberCounter = metrics.NewCounter()
EventsCounter = metrics.NewCounter()
TasksCounter = metrics.NewCounter()
ExecProcessTimer = metrics.NewTimer()
ExitProcessTimer = metrics.NewTimer()
EpollFdCounter = metrics.NewCounter()
@ -23,7 +23,7 @@ func Metrics() map[string]interface{} {
"container-stats-time": ContainerStatsTimer,
"containers": ContainersCounter,
"event-subscribers": EventSubscriberCounter,
"events": EventsCounter,
"tasks": TasksCounter,
"exec-process-time": ExecProcessTimer,
"exit-process-time": ExitProcessTimer,
"epoll-fds": EpollFdCounter,

View file

@ -1,10 +1,10 @@
package supervisor
type SignalEvent struct {
type SignalTask struct {
s *Supervisor
}
func (h *SignalEvent) Handle(e *Event) error {
func (h *SignalTask) Handle(e *Task) error {
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound

View file

@ -2,11 +2,11 @@ package supervisor
import "time"
type StatsEvent struct {
type StatsTask struct {
s *Supervisor
}
func (h *StatsEvent) Handle(e *Event) error {
func (h *StatsTask) Handle(e *Task) error {
start := time.Now()
i, ok := h.s.containers[e.ID]
if !ok {

View file

@ -5,6 +5,7 @@ import (
"os"
"sort"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/chanotify"
@ -17,7 +18,8 @@ const (
)
// New returns an initialized Process supervisor.
func New(stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) {
func New(stateDir string, oom bool) (*Supervisor, error) {
tasks := make(chan *startTask, 10)
if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, err
}
@ -34,7 +36,7 @@ func New(stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error)
containers: make(map[string]*containerInfo),
tasks: tasks,
machine: machine,
subscribers: make(map[chan *Event]struct{}),
subscribers: make(map[chan Event]struct{}),
el: eventloop.NewChanLoop(defaultBufferSize),
monitor: monitor,
}
@ -42,26 +44,26 @@ func New(stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error)
s.notifier = chanotify.New()
go func() {
for id := range s.notifier.Chan() {
e := NewEvent(OOMEventType)
e := NewTask(OOMTaskType)
e.ID = id.(string)
s.SendEvent(e)
s.SendTask(e)
}
}()
}
// register default event handlers
s.handlers = map[EventType]Handler{
ExecExitEventType: &ExecExitEvent{s},
ExitEventType: &ExitEvent{s},
StartContainerEventType: &StartEvent{s},
DeleteEventType: &DeleteEvent{s},
GetContainerEventType: &GetContainersEvent{s},
SignalEventType: &SignalEvent{s},
AddProcessEventType: &AddProcessEvent{s},
UpdateContainerEventType: &UpdateEvent{s},
CreateCheckpointEventType: &CreateCheckpointEvent{s},
DeleteCheckpointEventType: &DeleteCheckpointEvent{s},
StatsEventType: &StatsEvent{s},
UpdateProcessEventType: &UpdateProcessEvent{s},
s.handlers = map[TaskType]Handler{
ExecExitTaskType: &ExecExitTask{s},
ExitTaskType: &ExitTask{s},
StartContainerTaskType: &StartTask{s},
DeleteTaskType: &DeleteTask{s},
GetContainerTaskType: &GetContainersTask{s},
SignalTaskType: &SignalTask{s},
AddProcessTaskType: &AddProcessTask{s},
UpdateContainerTaskType: &UpdateTask{s},
CreateCheckpointTaskType: &CreateCheckpointTask{s},
DeleteCheckpointTaskType: &DeleteCheckpointTask{s},
StatsTaskType: &StatsTask{s},
UpdateProcessTaskType: &UpdateProcessTask{s},
}
go s.exitHandler()
if err := s.restore(); err != nil {
@ -78,13 +80,13 @@ type Supervisor struct {
// stateDir is the directory on the system to store container runtime state information.
stateDir string
containers map[string]*containerInfo
handlers map[EventType]Handler
events chan *Event
tasks chan *StartTask
handlers map[TaskType]Handler
events chan *Task
tasks chan *startTask
// we need a lock around the subscribers map only because additions and deletions from
// the map are via the API so we cannot really control the concurrency
subscriberLock sync.RWMutex
subscribers map[chan *Event]struct{}
subscribers map[chan Event]struct{}
machine Machine
notifier *chanotify.Notifier
el eventloop.EventLoop
@ -105,19 +107,27 @@ func (s *Supervisor) Close() error {
return nil
}
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Pid string `json:"pid,omitempty"`
Status int `json:"status,omitempty"`
}
// Events returns an event channel that external consumers can use to receive updates
// on container events
func (s *Supervisor) Events() chan *Event {
func (s *Supervisor) Events() chan Event {
s.subscriberLock.Lock()
defer s.subscriberLock.Unlock()
c := make(chan *Event, defaultBufferSize)
c := make(chan Event, defaultBufferSize)
EventSubscriberCounter.Inc(1)
s.subscribers[c] = struct{}{}
return c
}
// Unsubscribe removes the provided channel from receiving any more events
func (s *Supervisor) Unsubscribe(sub chan *Event) {
func (s *Supervisor) Unsubscribe(sub chan Event) {
s.subscriberLock.Lock()
defer s.subscriberLock.Unlock()
delete(s.subscribers, sub)
@ -127,7 +137,7 @@ func (s *Supervisor) Unsubscribe(sub chan *Event) {
// notifySubscribers will send the provided event to the external subscribers
// of the events channel
func (s *Supervisor) notifySubscribers(e *Event) {
func (s *Supervisor) notifySubscribers(e Event) {
s.subscriberLock.RLock()
defer s.subscriberLock.RUnlock()
for sub := range s.subscribers {
@ -159,17 +169,17 @@ func (s *Supervisor) Machine() Machine {
return s.machine
}
// SendEvent sends the provided event the the supervisors main event loop
func (s *Supervisor) SendEvent(evt *Event) {
EventsCounter.Inc(1)
s.el.Send(&commonEvent{data: evt, sv: s})
// SendTask sends the provided event the the supervisors main event loop
func (s *Supervisor) SendTask(evt *Task) {
TasksCounter.Inc(1)
s.el.Send(&commonTask{data: evt, sv: s})
}
func (s *Supervisor) exitHandler() {
for p := range s.monitor.Exits() {
e := NewEvent(ExitEventType)
e := NewTask(ExitTaskType)
e.Process = p
s.SendEvent(e)
s.SendTask(e)
}
}
@ -215,9 +225,9 @@ func (s *Supervisor) restore() error {
// exit events
sort.Sort(&processSorter{exitedProcesses})
for _, p := range exitedProcesses {
e := NewEvent(ExitEventType)
e := NewTask(ExitTaskType)
e.Process = p
s.SendEvent(e)
s.SendTask(e)
}
}
}

View file

@ -8,26 +8,26 @@ import (
"github.com/opencontainers/specs"
)
type EventType string
type TaskType string
const (
ExecExitEventType EventType = "execExit"
ExitEventType EventType = "exit"
StartContainerEventType EventType = "startContainer"
DeleteEventType EventType = "deleteContainerEvent"
GetContainerEventType EventType = "getContainer"
SignalEventType EventType = "signal"
AddProcessEventType EventType = "addProcess"
UpdateContainerEventType EventType = "updateContainer"
UpdateProcessEventType EventType = "updateProcess"
CreateCheckpointEventType EventType = "createCheckpoint"
DeleteCheckpointEventType EventType = "deleteCheckpoint"
StatsEventType EventType = "events"
OOMEventType EventType = "oom"
ExecExitTaskType TaskType = "execExit"
ExitTaskType TaskType = "exit"
StartContainerTaskType TaskType = "startContainer"
DeleteTaskType TaskType = "deleteContainerEvent"
GetContainerTaskType TaskType = "getContainer"
SignalTaskType TaskType = "signal"
AddProcessTaskType TaskType = "addProcess"
UpdateContainerTaskType TaskType = "updateContainer"
UpdateProcessTaskType TaskType = "updateProcess"
CreateCheckpointTaskType TaskType = "createCheckpoint"
DeleteCheckpointTaskType TaskType = "deleteCheckpoint"
StatsTaskType TaskType = "events"
OOMTaskType TaskType = "oom"
)
func NewEvent(t EventType) *Event {
return &Event{
func NewTask(t TaskType) *Task {
return &Task{
Type: t,
Timestamp: time.Now(),
Err: make(chan error, 1),
@ -38,8 +38,8 @@ type StartResponse struct {
Container runtime.Container
}
type Event struct {
Type EventType
type Task struct {
Type TaskType
Timestamp time.Time
ID string
BundlePath string
@ -66,18 +66,18 @@ type Event struct {
}
type Handler interface {
Handle(*Event) error
Handle(*Task) error
}
type commonEvent struct {
data *Event
type commonTask struct {
data *Task
sv *Supervisor
}
func (e *commonEvent) Handle() {
func (e *commonTask) Handle() {
h, ok := e.sv.handlers[e.data.Type]
if !ok {
e.data.Err <- ErrUnknownEvent
e.data.Err <- ErrUnknownTask
return
}
err := h.Handle(e.data)

View file

@ -1,12 +1,16 @@
package supervisor
import "github.com/docker/containerd/runtime"
import (
"time"
type UpdateEvent struct {
"github.com/docker/containerd/runtime"
)
type UpdateTask struct {
s *Supervisor
}
func (h *UpdateEvent) Handle(e *Event) error {
func (h *UpdateTask) Handle(e *Task) error {
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
@ -18,10 +22,20 @@ func (h *UpdateEvent) Handle(e *Event) error {
if err := container.Resume(); err != nil {
return ErrUnknownContainerStatus
}
h.s.notifySubscribers(Event{
ID: e.ID,
Type: "resume",
Timestamp: time.Now(),
})
case runtime.Paused:
if err := container.Pause(); err != nil {
return ErrUnknownContainerStatus
}
h.s.notifySubscribers(Event{
ID: e.ID,
Type: "pause",
Timestamp: time.Now(),
})
default:
return ErrUnknownContainerStatus
}
@ -29,11 +43,11 @@ func (h *UpdateEvent) Handle(e *Event) error {
return nil
}
type UpdateProcessEvent struct {
type UpdateProcessTask struct {
s *Supervisor
}
func (h *UpdateProcessEvent) Handle(e *Event) error {
func (h *UpdateProcessTask) Handle(e *Task) error {
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound

View file

@ -12,7 +12,7 @@ type Worker interface {
Start()
}
type StartTask struct {
type startTask struct {
Container runtime.Container
Checkpoint string
Stdin string
@ -40,9 +40,9 @@ func (w *worker) Start() {
started := time.Now()
process, err := t.Container.Start(t.Checkpoint, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil {
evt := NewEvent(DeleteEventType)
evt := NewTask(DeleteTaskType)
evt.ID = t.Container.ID()
w.s.SendEvent(evt)
w.s.SendTask(evt)
t.Err <- err
continue
}
@ -64,5 +64,10 @@ func (w *worker) Start() {
t.StartResponse <- StartResponse{
Container: t.Container,
}
w.s.notifySubscribers(Event{
Timestamp: time.Now(),
ID: t.Container.ID(),
Type: "start-container",
})
}
}