diff --git a/add_process.go b/add_process.go index 85573be..253eab0 100644 --- a/add_process.go +++ b/add_process.go @@ -1,18 +1,29 @@ package containerd +import "github.com/Sirupsen/logrus" + type AddProcessEvent struct { s *Supervisor } +// TODO: add this to worker for concurrent starts??? maybe not because of races where the container +// could be stopped and removed... func (h *AddProcessEvent) Handle(e *Event) error { container, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - p, err := h.s.runtime.StartProcess(container, *e.Process, e.Stdio) + p, io, err := h.s.runtime.StartProcess(container, *e.Process) if err != nil { return err } + if err := h.s.log(container.Path(), io); err != nil { + // log the error but continue with the other commands + logrus.WithFields(logrus.Fields{ + "error": err, + "id": e.ID, + }).Error("log stdio") + } if e.Pid, err = p.Pid(); err != nil { return err } diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 5a345b1..0eae0c2 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -38,10 +38,6 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine Name: c.Checkpoint, } } - e.Stdio = &runtime.Stdio{ - Stderr: c.Stderr, - Stdout: c.Stdout, - } s.sv.SendEvent(e) if err := <-e.Err; err != nil { return nil, err diff --git a/ctr/logs.go b/ctr/logs.go new file mode 100644 index 0000000..65e04bd --- /dev/null +++ b/ctr/logs.go @@ -0,0 +1,60 @@ +package main + +import ( + "encoding/json" + "io" + "os" + "time" + + "github.com/codegangsta/cli" + "github.com/docker/containerd" +) + +var LogsCommand = cli.Command{ + Name: "logs", + Usage: "view binary container logs generated by containerd", + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "follow,f", + Usage: "follow/tail the logs", + }, + }, + Action: func(context *cli.Context) { + path := context.Args().First() + if path == "" { + fatal("path to the log cannot be empty", 1) + } + if err := readLogs(path, context.Bool("follow")); err != nil { + fatal(err.Error(), 1) + } + }, +} + +func readLogs(path string, follow bool) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + dec := json.NewDecoder(f) + for { + var msg *containerd.Message + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + if follow { + time.Sleep(100 * time.Millisecond) + continue + } + return nil + } + return err + } + switch msg.Stream { + case "stdout": + os.Stdout.Write(msg.Data) + case "stderr": + os.Stderr.Write(msg.Data) + } + } + return nil +} diff --git a/ctr/main.go b/ctr/main.go index 3575f17..e966e04 100644 --- a/ctr/main.go +++ b/ctr/main.go @@ -34,9 +34,10 @@ func main() { }, } app.Commands = []cli.Command{ - ContainersCommand, CheckpointCommand, + ContainersCommand, EventsCommand, + LogsCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { diff --git a/event.go b/event.go index 2680955..7239b15 100644 --- a/event.go +++ b/event.go @@ -36,7 +36,6 @@ type Event struct { Timestamp time.Time ID string BundlePath string - Stdio *runtime.Stdio Pid int Status int Signal os.Signal diff --git a/linux/linux.go b/linux/linux.go index 8a86db5..5809f43 100644 --- a/linux/linux.go +++ b/linux/linux.go @@ -5,7 +5,6 @@ package linux import ( "encoding/json" "fmt" - "io" "io/ioutil" "os" "path/filepath" @@ -363,25 +362,29 @@ func (r *libcontainerRuntime) Type() string { return "libcontainer" } -func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio) (runtime.Container, error) { +func (r *libcontainerRuntime) Create(id, bundlePath string) (runtime.Container, *runtime.IO, error) { spec, rspec, err := r.loadSpec( filepath.Join(bundlePath, "config.json"), filepath.Join(bundlePath, "runtime.json"), ) if err != nil { - return nil, err + return nil, nil, err } config, err := r.createLibcontainerConfig(id, bundlePath, spec, rspec) if err != nil { - return nil, err + return nil, nil, err } container, err := r.factory.Create(id, config) if err != nil { - return nil, fmt.Errorf("create container: %v", err) + return nil, nil, fmt.Errorf("create container: %v", err) } - process, err := r.newProcess(spec.Process, stdio) + process, err := r.newProcess(spec.Process) if err != nil { - return nil, err + return nil, nil, err + } + i, err := process.InitializeIO(int(spec.Process.User.UID)) + if err != nil { + return nil, nil, err } c := &libcontainerContainer{ c: container, @@ -392,20 +395,28 @@ func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio }, path: bundlePath, } - return c, nil + return c, &runtime.IO{ + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + }, nil } -func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process, stdio *runtime.Stdio) (runtime.Process, error) { +func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process) (runtime.Process, *runtime.IO, error) { c, ok := ci.(*libcontainerContainer) if !ok { - return nil, runtime.ErrInvalidContainerType + return nil, nil, runtime.ErrInvalidContainerType } - process, err := r.newProcess(p, stdio) + process, err := r.newProcess(p) if err != nil { - return nil, err + return nil, nil, err + } + i, err := process.InitializeIO(int(p.User.UID)) + if err != nil { + return nil, nil, err } if err := c.c.Start(process); err != nil { - return nil, err + return nil, nil, err } lp := &libcontainerProcess{ process: process, @@ -413,42 +424,29 @@ func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process } pid, err := process.Pid() if err != nil { - return nil, err + return nil, nil, err } c.additionalProcesses[pid] = lp - return lp, nil + return lp, &runtime.IO{ + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + }, nil } // newProcess returns a new libcontainer Process with the arguments from the // spec and stdio from the current process. -func (r *libcontainerRuntime) newProcess(p specs.Process, stdio *runtime.Stdio) (*libcontainer.Process, error) { - var ( - stderr, stdout io.Writer - ) - if stdio != nil { - if stdio.Stdout != "" { - f, err := os.OpenFile(stdio.Stdout, os.O_CREATE|os.O_WRONLY, 0755) - if err != nil { - return nil, fmt.Errorf("open stdout: %v", err) - } - stdout = f - } - if stdio.Stderr != "" { - f, err := os.OpenFile(stdio.Stderr, os.O_CREATE|os.O_WRONLY, 0755) - if err != nil { - return nil, fmt.Errorf("open stderr: %v", err) - } - stderr = f - } +func (r *libcontainerRuntime) newProcess(p specs.Process) (*libcontainer.Process, error) { + // TODO: support terminals + if p.Terminal { + return nil, runtime.ErrTerminalsNotSupported } return &libcontainer.Process{ Args: p.Args, Env: p.Env, // TODO: fix libcontainer's API to better support uid/gid in a typesafe way. - User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID), - Cwd: p.Cwd, - Stderr: stderr, - Stdout: stdout, + User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID), + Cwd: p.Cwd, }, nil } diff --git a/log.go b/log.go new file mode 100644 index 0000000..1a84871 --- /dev/null +++ b/log.go @@ -0,0 +1,113 @@ +package containerd + +import ( + "encoding/json" + "io" + "os" + "path/filepath" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +type logConfig struct { + BundlePath string + LogSize int64 // in bytes + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func newLogger(i *logConfig) (*logger, error) { + l := &logger{ + config: i, + messages: make(chan *Message, DefaultBufferSize), + } + hout := &logHandler{ + stream: "stdout", + messages: l.messages, + } + herr := &logHandler{ + stream: "stderr", + messages: l.messages, + } + l.wg.Add(2) + go func() { + defer l.wg.Done() + io.Copy(hout, i.Stdout) + }() + go func() { + defer l.wg.Done() + io.Copy(herr, i.Stderr) + }() + return l, l.start() +} + +type Message struct { + Stream string `json:"stream"` + Timestamp time.Time `json:"timestamp"` + Data []byte `json:"data"` +} + +type logger struct { + config *logConfig + f *os.File + wg sync.WaitGroup + messages chan *Message +} + +type logHandler struct { + stream string + messages chan *Message +} + +func (h *logHandler) Write(b []byte) (int, error) { + h.messages <- &Message{ + Stream: h.stream, + Timestamp: time.Now(), + Data: b, + } + return len(b), nil +} + +func (l *logger) start() error { + f, err := os.OpenFile( + filepath.Join(l.config.BundlePath, "logs.json"), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, + 0655, + ) + if err != nil { + return err + } + l.f = f + l.wg.Add(1) + go func() { + l.wg.Done() + enc := json.NewEncoder(f) + for m := range l.messages { + if err := enc.Encode(m); err != nil { + logrus.WithField("error", err).Error("write log message") + } + } + }() + return nil +} + +func (l *logger) Close() (err error) { + for _, c := range []io.Closer{ + l.config.Stdin, + l.config.Stdout, + l.config.Stderr, + } { + if cerr := c.Close(); err == nil { + err = cerr + } + } + close(l.messages) + l.wg.Wait() + if ferr := l.f.Close(); err == nil { + err = ferr + } + return err +} diff --git a/runtime/container.go b/runtime/container.go index e7c20de..863ffe4 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -1,6 +1,7 @@ package runtime import ( + "io" "os" "time" @@ -24,9 +25,24 @@ type State struct { Status Status } -type Stdio struct { - Stderr string - Stdout string +type IO struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func (i *IO) Close() error { + var oerr error + for _, c := range []io.Closer{ + i.Stdin, + i.Stdout, + i.Stderr, + } { + if err := c.Close(); oerr == nil { + oerr = err + } + } + return oerr } type Stat struct { diff --git a/runtime/runtime.go b/runtime/runtime.go index 883d7c4..aabe055 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -7,18 +7,20 @@ import ( ) var ( - ErrNotChildProcess = errors.New("containerd: not a child process for container") - ErrInvalidContainerType = errors.New("containerd: invalid container type for runtime") - ErrCheckpointNotExists = errors.New("containerd: checkpoint does not exist for container") - ErrCheckpointExists = errors.New("containerd: checkpoint already exists") - ErrContainerExited = errors.New("containerd: container has exited") + ErrNotChildProcess = errors.New("containerd: not a child process for container") + ErrInvalidContainerType = errors.New("containerd: invalid container type for runtime") + ErrCheckpointNotExists = errors.New("containerd: checkpoint does not exist for container") + ErrCheckpointExists = errors.New("containerd: checkpoint already exists") + ErrContainerExited = errors.New("containerd: container has exited") + ErrTerminalsNotSupported = errors.New("containerd: terminals are not supported for runtime") ) // Runtime handles containers, containers handle their own actions type Runtime interface { - // Create creates a new container initialized but without it starting it - Create(id, bundlePath string, stdio *Stdio) (Container, error) - // StartProcess adds a new process to the container - StartProcess(Container, specs.Process, *Stdio) (Process, error) + // Type of the runtime Type() string + // Create creates a new container initialized but without it starting it + Create(id, bundlePath string) (Container, *IO, error) + // StartProcess adds a new process to the container + StartProcess(Container, specs.Process) (Process, *IO, error) } diff --git a/start.go b/start.go index 9400e08..4c725ad 100644 --- a/start.go +++ b/start.go @@ -5,7 +5,7 @@ type StartEvent struct { } func (h *StartEvent) Handle(e *Event) error { - container, err := h.s.runtime.Create(e.ID, e.BundlePath, e.Stdio) + container, io, err := h.s.runtime.Create(e.ID, e.BundlePath) if err != nil { return err } @@ -14,6 +14,7 @@ func (h *StartEvent) Handle(e *Event) error { ContainersCounter.Inc(1) task := &StartTask{ Err: e.Err, + IO: io, Container: container, } if e.Checkpoint != nil { diff --git a/supervisor.go b/supervisor.go index bbbbfa5..b2761df 100644 --- a/supervisor.go +++ b/supervisor.go @@ -214,3 +214,17 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } + +func (s *Supervisor) log(path string, i *runtime.IO) error { + config := &logConfig{ + BundlePath: path, + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + } + // TODO: save logger to call close after its all done + if _, err := newLogger(config); err != nil { + return err + } + return nil +} diff --git a/worker.go b/worker.go index d77ebef..c3ca8be 100644 --- a/worker.go +++ b/worker.go @@ -14,6 +14,7 @@ type Worker interface { type StartTask struct { Container runtime.Container Checkpoint string + IO *runtime.IO Err chan error } @@ -33,6 +34,14 @@ func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { started := time.Now() + // start logging the container's stdio + if err := w.s.log(t.Container.Path(), t.IO); err != nil { + evt := NewEvent(DeleteEventType) + evt.ID = t.Container.ID() + w.s.SendEvent(evt) + t.Err <- err + continue + } if t.Checkpoint != "" { if err := t.Container.Restore(t.Checkpoint); err != nil { evt := NewEvent(DeleteEventType)