Add journaling

This commit is contained in:
Michael Crosby 2015-11-10 14:24:34 -08:00
parent d34d482a5f
commit 6ff2239019
5 changed files with 115 additions and 86 deletions

View File

@ -49,12 +49,11 @@ func (s *server) signalPid(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
e := &containerd.SignalEvent{
ID: id,
Pid: pid,
Signal: syscall.Signal(signal.Signal),
Err: make(chan error, 1),
}
e := containerd.NewEvent(containerd.SignalEventType)
e.ID = id
e.Pid = pid
e.Signal = syscall.Signal(signal.Signal)
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
status := http.StatusInternalServerError
@ -69,9 +68,7 @@ func (s *server) signalPid(w http.ResponseWriter, r *http.Request) {
func (s *server) containers(w http.ResponseWriter, r *http.Request) {
var state State
state.Containers = []Container{}
e := &containerd.GetContainersEvent{
Err: make(chan error, 1),
}
e := containerd.NewEvent(containerd.GetContainerEventType)
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
@ -112,11 +109,9 @@ func (s *server) createContainer(w http.ResponseWriter, r *http.Request) {
http.Error(w, "empty bundle path", http.StatusBadRequest)
return
}
e := &containerd.StartContainerEvent{
ID: id,
BundlePath: c.BundlePath,
Err: make(chan error, 1),
}
e := containerd.NewEvent(containerd.StartContainerEventType)
e.ID = id
e.BundlePath = c.BundlePath
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
code := http.StatusInternalServerError

View File

@ -54,7 +54,7 @@ func daemon(stateDir string, concurrency, bufferSize int) error {
if err != nil {
return err
}
events := make(chan containerd.Event, bufferSize)
events := make(chan *containerd.Event, bufferSize)
// start the signal handler in the background.
go startSignalHandler(supervisor, bufferSize)
if err := supervisor.Start(events); err != nil {
@ -71,6 +71,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) {
for s := range signals {
switch s {
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP:
supervisor.Close()
os.Exit(0)
case syscall.SIGCHLD:
exits, err := reap()
@ -84,7 +85,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) {
}
}
func reap() (exits []*containerd.ExitEvent, err error) {
func reap() (exits []*containerd.Event, err error) {
var (
ws syscall.WaitStatus
rus syscall.Rusage
@ -100,9 +101,9 @@ func reap() (exits []*containerd.ExitEvent, err error) {
if pid <= 0 {
return exits, nil
}
exits = append(exits, &containerd.ExitEvent{
Pid: pid,
Status: utils.ExitStatus(ws),
})
e := containerd.NewEvent(containerd.ExitEventType)
e.Pid = pid
e.Status = utils.ExitStatus(ws)
exits = append(exits, e)
}
}

View File

@ -1,59 +1,36 @@
package containerd
import "os"
import (
"os"
"time"
)
type Event interface {
String() string
type EventType string
const (
ExitEventType EventType = "exit"
StartContainerEventType EventType = "startContainer"
ContainerStartErrorEventType EventType = "startContainerError"
GetContainerEventType EventType = "getContainer"
SignalEventType EventType = "signal"
)
func NewEvent(t EventType) *Event {
return &Event{
Type: t,
Timestamp: time.Now(),
Err: make(chan error, 1),
}
}
type CallbackEvent interface {
Event() Event
Callback() chan Event
}
type ExitEvent struct {
Pid int
Status int
}
func (e *ExitEvent) String() string {
return "exit event"
}
type StartContainerEvent struct {
ID string
BundlePath string
Err chan error
}
func (c *StartContainerEvent) String() string {
return "create container"
}
type ContainerStartErrorEvent struct {
ID string
}
func (c *ContainerStartErrorEvent) String() string {
return "container start error"
}
type GetContainersEvent struct {
Containers []Container
Err chan error
}
func (c *GetContainersEvent) String() string {
return "get containers"
}
type SignalEvent struct {
ID string
Pid int
Signal os.Signal
Err chan error
}
func (s *SignalEvent) String() string {
return "signal event"
type Event struct {
Type EventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
ID string `json:"id,omitempty"`
BundlePath string `json:"bundlePath,omitempty"`
Pid int `json:"pid,omitempty"`
Status int `json:"status,omitempty"`
Signal os.Signal `json:"signal,omitempty"`
Containers []Container `json:"-"`
Err chan error `json:"-"`
}

41
journal.go Normal file
View File

@ -0,0 +1,41 @@
package containerd
import (
"encoding/json"
"os"
"path/filepath"
)
type entry struct {
Event *Event `json:"event"`
}
func newJournal(path string) (*journal, error) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, err
}
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0755)
if err != nil {
return nil, err
}
return &journal{
f: f,
enc: json.NewEncoder(f),
}, nil
}
type journal struct {
f *os.File
enc *json.Encoder
}
func (j *journal) write(e *Event) error {
et := &entry{
Event: e,
}
return j.enc.Encode(et)
}
func (j *journal) Close() error {
return j.f.Close()
}

View File

@ -2,6 +2,7 @@ package containerd
import (
"os"
"path/filepath"
"sync"
"github.com/Sirupsen/logrus"
@ -18,11 +19,16 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) {
if err != nil {
return nil, err
}
j, err := newJournal(filepath.Join(stateDir, "journal.json"))
if err != nil {
return nil, err
}
s := &Supervisor{
stateDir: stateDir,
containers: make(map[string]Container),
runtime: runtime,
tasks: make(chan *startTask, concurrency*100),
journal: j,
}
for i := 0; i < concurrency; i++ {
s.workerGroup.Add(1)
@ -39,24 +45,33 @@ type Supervisor struct {
runtime Runtime
events chan Event
journal *journal
events chan *Event
tasks chan *startTask
workerGroup sync.WaitGroup
}
func (s *Supervisor) Close() error {
return s.journal.Close()
}
// Start is a non-blocking call that runs the supervisor for monitoring contianer processes and
// executing new containers.
//
// This event loop is the only thing that is allowed to modify state of containers and processes.
func (s *Supervisor) Start(events chan Event) error {
func (s *Supervisor) Start(events chan *Event) error {
if events == nil {
return ErrEventChanNil
}
s.events = events
go func() {
for evt := range events {
switch e := evt.(type) {
case *ExitEvent:
for e := range events {
if err := s.journal.write(e); err != nil {
logrus.WithField("error", err).Error("write journal entry")
}
switch e.Type {
case ExitEventType:
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
Debug("containerd: process exited")
container, err := s.getContainerForPid(e.Pid)
@ -70,7 +85,7 @@ func (s *Supervisor) Start(events chan Event) error {
if err := s.deleteContainer(container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container")
}
case *StartContainerEvent:
case StartContainerEventType:
container, err := s.runtime.Create(e.ID, e.BundlePath)
if err != nil {
e.Err <- err
@ -81,18 +96,18 @@ func (s *Supervisor) Start(events chan Event) error {
err: e.Err,
container: container,
}
case *ContainerStartErrorEvent:
case ContainerStartErrorEventType:
if container, ok := s.containers[e.ID]; ok {
if err := s.deleteContainer(container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container")
}
}
case *GetContainersEvent:
case GetContainerEventType:
for _, c := range s.containers {
e.Containers = append(e.Containers, c)
}
e.Err <- nil
case *SignalEvent:
case SignalEventType:
container, ok := s.containers[e.ID]
if !ok {
e.Err <- ErrContainerNotFound
@ -139,7 +154,7 @@ func (s *Supervisor) getContainerForPid(pid int) (Container, error) {
return nil, errNoContainerForPid
}
func (s *Supervisor) SendEvent(evt Event) {
func (s *Supervisor) SendEvent(evt *Event) {
s.events <- evt
}
@ -152,9 +167,9 @@ func (s *Supervisor) startContainerWorker(tasks chan *startTask) {
defer s.workerGroup.Done()
for t := range tasks {
if err := t.container.Start(); err != nil {
s.SendEvent(&ContainerStartErrorEvent{
ID: t.container.ID(),
})
e := NewEvent(StartContainerEventType)
e.ID = t.container.ID()
s.SendEvent(e)
t.err <- err
continue
}