Refactor container info in supervisor

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2015-12-11 11:27:33 -08:00
parent e5545a1461
commit 3010f209ff
11 changed files with 67 additions and 48 deletions

View file

@ -9,15 +9,16 @@ type AddProcessEvent struct {
// TODO: add this to worker for concurrent starts??? maybe not because of races where the container // TODO: add this to worker for concurrent starts??? maybe not because of races where the container
// could be stopped and removed... // could be stopped and removed...
func (h *AddProcessEvent) Handle(e *Event) error { func (h *AddProcessEvent) Handle(e *Event) error {
container, ok := h.s.containers[e.ID] ci, ok := h.s.containers[e.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
p, io, err := h.s.runtime.StartProcess(container, *e.Process) p, io, err := h.s.runtime.StartProcess(ci.container, *e.Process)
if err != nil { if err != nil {
return err return err
} }
if err := h.s.log(container.Path(), io); err != nil { l, err := h.s.log(ci.container.Path(), io)
if err != nil {
// log the error but continue with the other commands // log the error but continue with the other commands
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"error": err, "error": err,
@ -27,6 +28,9 @@ func (h *AddProcessEvent) Handle(e *Event) error {
if e.Pid, err = p.Pid(); err != nil { if e.Pid, err = p.Pid(); err != nil {
return err return err
} }
h.s.processes[e.Pid] = container h.s.processes[e.Pid] = &containerInfo{
container: ci.container,
logger: l,
}
return nil return nil
} }

View file

@ -5,11 +5,11 @@ type CreateCheckpointEvent struct {
} }
func (h *CreateCheckpointEvent) Handle(e *Event) error { func (h *CreateCheckpointEvent) Handle(e *Event) error {
container, ok := h.s.containers[e.ID] i, ok := h.s.containers[e.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
return container.Checkpoint(*e.Checkpoint) return i.container.Checkpoint(*e.Checkpoint)
} }
type DeleteCheckpointEvent struct { type DeleteCheckpointEvent struct {
@ -17,9 +17,9 @@ type DeleteCheckpointEvent struct {
} }
func (h *DeleteCheckpointEvent) Handle(e *Event) error { func (h *DeleteCheckpointEvent) Handle(e *Event) error {
container, ok := h.s.containers[e.ID] i, ok := h.s.containers[e.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
return container.DeleteCheckpoint(e.Checkpoint.Name) return i.container.DeleteCheckpoint(e.Checkpoint.Name)
} }

View file

@ -10,10 +10,13 @@ type DeleteEvent struct {
} }
func (h *DeleteEvent) Handle(e *Event) error { func (h *DeleteEvent) Handle(e *Event) error {
if container, ok := h.s.containers[e.ID]; ok { if i, ok := h.s.containers[e.ID]; ok {
if err := h.deleteContainer(container); err != nil { if err := h.deleteContainer(i.container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container") logrus.WithField("error", err).Error("containerd: deleting container")
} }
if err := i.logger.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close container logger")
}
h.s.notifySubscribers(&Event{ h.s.notifySubscribers(&Event{
Type: ExitEventType, Type: ExitEventType,
ID: e.ID, ID: e.ID,

11
exit.go
View file

@ -10,9 +10,9 @@ func (h *ExitEvent) Handle(e *Event) error {
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
Debug("containerd: process exited") Debug("containerd: process exited")
// is it the child process of a container // is it the child process of a container
if container, ok := h.s.processes[e.Pid]; ok { if info, ok := h.s.processes[e.Pid]; ok {
ne := NewEvent(ExecExitEventType) ne := NewEvent(ExecExitEventType)
ne.ID = container.ID() ne.ID = info.container.ID()
ne.Pid = e.Pid ne.Pid = e.Pid
ne.Status = e.Status ne.Status = e.Status
h.s.SendEvent(ne) h.s.SendEvent(ne)
@ -42,10 +42,13 @@ type ExecExitEvent struct {
func (h *ExecExitEvent) Handle(e *Event) error { func (h *ExecExitEvent) Handle(e *Event) error {
// exec process: we remove this process without notifying the main event loop // exec process: we remove this process without notifying the main event loop
container := h.s.processes[e.Pid] info := h.s.processes[e.Pid]
if err := container.RemoveProcess(e.Pid); err != nil { if err := info.container.RemoveProcess(e.Pid); err != nil {
logrus.WithField("error", err).Error("containerd: find container for pid") logrus.WithField("error", err).Error("containerd: find container for pid")
} }
if err := info.logger.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close process IO")
}
delete(h.s.processes, e.Pid) delete(h.s.processes, e.Pid)
h.s.notifySubscribers(e) h.s.notifySubscribers(e)
return nil return nil

View file

@ -5,8 +5,8 @@ type GetContainersEvent struct {
} }
func (h *GetContainersEvent) Handle(e *Event) error { func (h *GetContainersEvent) Handle(e *Event) error {
for _, c := range h.s.containers { for _, i := range h.s.containers {
e.Containers = append(e.Containers, c) e.Containers = append(e.Containers, i.container)
} }
return nil return nil
} }

29
log.go
View file

@ -24,6 +24,15 @@ func newLogger(i *logConfig) (*logger, error) {
config: i, config: i,
messages: make(chan *Message, DefaultBufferSize), messages: make(chan *Message, DefaultBufferSize),
} }
f, err := os.OpenFile(
filepath.Join(l.config.BundlePath, "logs.json"),
os.O_CREATE|os.O_WRONLY|os.O_APPEND,
0655,
)
if err != nil {
return nil, err
}
l.f = f
hout := &logHandler{ hout := &logHandler{
stream: "stdout", stream: "stdout",
messages: l.messages, messages: l.messages,
@ -32,16 +41,14 @@ func newLogger(i *logConfig) (*logger, error) {
stream: "stderr", stream: "stderr",
messages: l.messages, messages: l.messages,
} }
l.wg.Add(2)
go func() { go func() {
defer l.wg.Done()
io.Copy(hout, i.Stdout) io.Copy(hout, i.Stdout)
}() }()
go func() { go func() {
defer l.wg.Done()
io.Copy(herr, i.Stderr) io.Copy(herr, i.Stderr)
}() }()
return l, l.start() l.start()
return l, nil
} }
type Message struct { type Message struct {
@ -71,27 +78,17 @@ func (h *logHandler) Write(b []byte) (int, error) {
return len(b), nil return len(b), nil
} }
func (l *logger) start() error { func (l *logger) start() {
f, err := os.OpenFile(
filepath.Join(l.config.BundlePath, "logs.json"),
os.O_CREATE|os.O_WRONLY|os.O_APPEND,
0655,
)
if err != nil {
return err
}
l.f = f
l.wg.Add(1) l.wg.Add(1)
go func() { go func() {
l.wg.Done() l.wg.Done()
enc := json.NewEncoder(f) enc := json.NewEncoder(l.f)
for m := range l.messages { for m := range l.messages {
if err := enc.Encode(m); err != nil { if err := enc.Encode(m); err != nil {
logrus.WithField("error", err).Error("write log message") logrus.WithField("error", err).Error("write log message")
} }
} }
}() }()
return nil
} }
func (l *logger) Close() (err error) { func (l *logger) Close() (err error) {

View file

@ -5,11 +5,11 @@ type SignalEvent struct {
} }
func (h *SignalEvent) Handle(e *Event) error { func (h *SignalEvent) Handle(e *Event) error {
container, ok := h.s.containers[e.ID] i, ok := h.s.containers[e.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
processes, err := container.Processes() processes, err := i.container.Processes()
if err != nil { if err != nil {
return err return err
} }

View file

@ -10,7 +10,9 @@ func (h *StartEvent) Handle(e *Event) error {
return err return err
} }
h.s.containerGroup.Add(1) h.s.containerGroup.Add(1)
h.s.containers[e.ID] = container h.s.containers[e.ID] = &containerInfo{
container: container,
}
ContainersCounter.Inc(1) ContainersCounter.Inc(1)
task := &StartTask{ task := &StartTask{
Err: e.Err, Err: e.Err,

View file

@ -29,8 +29,8 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err
} }
s := &Supervisor{ s := &Supervisor{
stateDir: stateDir, stateDir: stateDir,
containers: make(map[string]runtime.Container), containers: make(map[string]*containerInfo),
processes: make(map[int]runtime.Container), processes: make(map[int]*containerInfo),
runtime: r, runtime: r,
tasks: tasks, tasks: tasks,
events: make(chan *Event, DefaultBufferSize), events: make(chan *Event, DefaultBufferSize),
@ -54,11 +54,16 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err
return s, nil return s, nil
} }
type containerInfo struct {
container runtime.Container
logger *logger
}
type Supervisor struct { type Supervisor struct {
// stateDir is the directory on the system to store container runtime state information. // stateDir is the directory on the system to store container runtime state information.
stateDir string stateDir string
containers map[string]runtime.Container containers map[string]*containerInfo
processes map[int]runtime.Container processes map[int]*containerInfo
handlers map[EventType]Handler handlers map[EventType]Handler
runtime runtime.Runtime runtime runtime.Runtime
events chan *Event events chan *Event
@ -78,7 +83,8 @@ func (s *Supervisor) Stop(sig chan os.Signal) {
// Close the tasks channel so that no new containers get started // Close the tasks channel so that no new containers get started
close(s.tasks) close(s.tasks)
// send a SIGTERM to all containers // send a SIGTERM to all containers
for id, c := range s.containers { for id, i := range s.containers {
c := i.container
logrus.WithField("id", id).Debug("sending TERM to container processes") logrus.WithField("id", id).Debug("sending TERM to container processes")
procs, err := c.Processes() procs, err := c.Processes()
if err != nil { if err != nil {
@ -193,7 +199,8 @@ func (s *Supervisor) Machine() Machine {
// getContainerForPid returns the container where the provided pid is the pid1 or main // getContainerForPid returns the container where the provided pid is the pid1 or main
// process in the container // process in the container
func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
for _, container := range s.containers { for _, i := range s.containers {
container := i.container
cpid, err := container.Pid() cpid, err := container.Pid()
if err != nil { if err != nil {
if lerr, ok := err.(libcontainer.Error); ok { if lerr, ok := err.(libcontainer.Error); ok {
@ -215,16 +222,16 @@ func (s *Supervisor) SendEvent(evt *Event) {
s.events <- evt s.events <- evt
} }
func (s *Supervisor) log(path string, i *runtime.IO) error { func (s *Supervisor) log(path string, i *runtime.IO) (*logger, error) {
config := &logConfig{ config := &logConfig{
BundlePath: path, BundlePath: path,
Stdin: i.Stdin, Stdin: i.Stdin,
Stdout: i.Stdout, Stdout: i.Stdout,
Stderr: i.Stderr, Stderr: i.Stderr,
} }
// TODO: save logger to call close after its all done l, err := newLogger(config)
if _, err := newLogger(config); err != nil { if err != nil {
return err return nil, err
} }
return nil return l, nil
} }

View file

@ -7,10 +7,11 @@ type UpdateEvent struct {
} }
func (h *UpdateEvent) Handle(e *Event) error { func (h *UpdateEvent) Handle(e *Event) error {
container, ok := h.s.containers[e.ID] i, ok := h.s.containers[e.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
container := i.container
if e.State.Status != "" { if e.State.Status != "" {
switch e.State.Status { switch e.State.Status {
case runtime.Running: case runtime.Running:

View file

@ -35,13 +35,15 @@ func (w *worker) Start() {
for t := range w.s.tasks { for t := range w.s.tasks {
started := time.Now() started := time.Now()
// start logging the container's stdio // start logging the container's stdio
if err := w.s.log(t.Container.Path(), t.IO); err != nil { l, err := w.s.log(t.Container.Path(), t.IO)
if err != nil {
evt := NewEvent(DeleteEventType) evt := NewEvent(DeleteEventType)
evt.ID = t.Container.ID() evt.ID = t.Container.ID()
w.s.SendEvent(evt) w.s.SendEvent(evt)
t.Err <- err t.Err <- err
continue continue
} }
w.s.containers[t.Container.ID()].logger = l
if t.Checkpoint != "" { if t.Checkpoint != "" {
if err := t.Container.Restore(t.Checkpoint); err != nil { if err := t.Container.Restore(t.Checkpoint); err != nil {
evt := NewEvent(DeleteEventType) evt := NewEvent(DeleteEventType)