diff --git a/add_process.go b/add_process.go index 862f289..84c0fb5 100644 --- a/add_process.go +++ b/add_process.go @@ -17,7 +17,7 @@ func (h *AddProcessEvent) Handle(e *Event) error { if err != nil { return err } - l, err := h.s.log(ci.container.Path(), io) + l, err := h.s.copyIO(e.Stdout, e.Stderr, io) if err != nil { // log the error but continue with the other commands logrus.WithFields(logrus.Fields{ @@ -30,7 +30,7 @@ func (h *AddProcessEvent) Handle(e *Event) error { } h.s.processes[e.Pid] = &containerInfo{ container: ci.container, - logger: l, + copier: l, } return nil } diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 0eae0c2..102e886 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -33,6 +33,8 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine e := containerd.NewEvent(containerd.StartContainerEventType) e.ID = c.Id e.BundlePath = c.BundlePath + e.Stdout = c.Stdout + e.Stderr = c.Stderr if c.Checkpoint != "" { e.Checkpoint = &runtime.Checkpoint{ Name: c.Checkpoint, diff --git a/ctr/container.go b/ctr/container.go index 522bd2b..9958fc7 100644 --- a/ctr/container.go +++ b/ctr/container.go @@ -2,7 +2,12 @@ package main import ( "fmt" + "io" + "io/ioutil" "os" + "path/filepath" + "sync" + "syscall" "text/tabwriter" "github.com/codegangsta/cli" @@ -63,6 +68,10 @@ var StartCommand = cli.Command{ Value: "", Usage: "checkpoint to start the container from", }, + cli.BoolFlag{ + Name: "interactive,i", + Usage: "connect to the stdio of the container", + }, }, Action: func(context *cli.Context) { var ( @@ -75,17 +84,71 @@ var StartCommand = cli.Command{ if id == "" { fatal("container id cannot be empty", 1) } - c := getClient() - if _, err := c.CreateContainer(netcontext.Background(), &types.CreateContainerRequest{ + r := &types.CreateContainerRequest{ Id: id, BundlePath: path, Checkpoint: context.String("checkpoint"), - }); err != nil { + } + wg := &sync.WaitGroup{} + if context.Bool("interactive") { + if err := attachStdio(r, wg); err != nil { + fatal(err.Error(), 1) + } + } + c := getClient() + if _, err := c.CreateContainer(netcontext.Background(), r); err != nil { fatal(err.Error(), 1) } + wg.Wait() }, } +func attachStdio(r *types.CreateContainerRequest, wg *sync.WaitGroup) error { + dir, err := ioutil.TempDir("", "ctr-") + if err != nil { + return err + } + wg.Add(2) + for _, p := range []struct { + path string + flag int + done func(f *os.File) + }{ + { + path: filepath.Join(dir, "stdout"), + flag: syscall.O_RDWR, + done: func(f *os.File) { + r.Stdout = filepath.Join(dir, "stdout") + go func() { + io.Copy(os.Stdout, f) + wg.Done() + }() + }, + }, + { + path: filepath.Join(dir, "stderr"), + flag: syscall.O_RDWR, + done: func(f *os.File) { + r.Stderr = filepath.Join(dir, "stderr") + go func() { + io.Copy(os.Stderr, f) + wg.Done() + }() + }, + }, + } { + if err := syscall.Mkfifo(p.path, 0755); err != nil { + return fmt.Errorf("mkfifo: %s %v", p.path, err) + } + f, err := os.OpenFile(p.path, p.flag, 0) + if err != nil { + return fmt.Errorf("open: %s %v", p.path, err) + } + p.done(f) + } + return nil +} + var KillCommand = cli.Command{ Name: "kill", Usage: "send a signal to a container or it's processes", diff --git a/ctr/logs.go b/ctr/logs.go deleted file mode 100644 index 65e04bd..0000000 --- a/ctr/logs.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "encoding/json" - "io" - "os" - "time" - - "github.com/codegangsta/cli" - "github.com/docker/containerd" -) - -var LogsCommand = cli.Command{ - Name: "logs", - Usage: "view binary container logs generated by containerd", - Flags: []cli.Flag{ - cli.BoolFlag{ - Name: "follow,f", - Usage: "follow/tail the logs", - }, - }, - Action: func(context *cli.Context) { - path := context.Args().First() - if path == "" { - fatal("path to the log cannot be empty", 1) - } - if err := readLogs(path, context.Bool("follow")); err != nil { - fatal(err.Error(), 1) - } - }, -} - -func readLogs(path string, follow bool) error { - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - dec := json.NewDecoder(f) - for { - var msg *containerd.Message - if err := dec.Decode(&msg); err != nil { - if err == io.EOF { - if follow { - time.Sleep(100 * time.Millisecond) - continue - } - return nil - } - return err - } - switch msg.Stream { - case "stdout": - os.Stdout.Write(msg.Data) - case "stderr": - os.Stderr.Write(msg.Data) - } - } - return nil -} diff --git a/ctr/main.go b/ctr/main.go index e966e04..632f168 100644 --- a/ctr/main.go +++ b/ctr/main.go @@ -37,7 +37,6 @@ func main() { CheckpointCommand, ContainersCommand, EventsCommand, - LogsCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { diff --git a/delete.go b/delete.go index 722da8a..c940cb8 100644 --- a/delete.go +++ b/delete.go @@ -14,9 +14,9 @@ func (h *DeleteEvent) Handle(e *Event) error { if err := h.deleteContainer(i.container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } - if i.logger != nil { - if err := i.logger.Close(); err != nil { - logrus.WithField("error", err).Error("containerd: close container logger") + if i.copier != nil { + if err := i.copier.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close container copier") } } h.s.notifySubscribers(&Event{ diff --git a/event.go b/event.go index 7239b15..c2346cd 100644 --- a/event.go +++ b/event.go @@ -36,6 +36,8 @@ type Event struct { Timestamp time.Time ID string BundlePath string + Stdout string + Stderr string Pid int Status int Signal os.Signal diff --git a/exit.go b/exit.go index a997f7f..27557cd 100644 --- a/exit.go +++ b/exit.go @@ -46,7 +46,7 @@ func (h *ExecExitEvent) Handle(e *Event) error { if err := info.container.RemoveProcess(e.Pid); err != nil { logrus.WithField("error", err).Error("containerd: find container for pid") } - if err := info.logger.Close(); err != nil { + if err := info.copier.Close(); err != nil { logrus.WithField("error", err).Error("containerd: close process IO") } delete(h.s.processes, e.Pid) diff --git a/io.go b/io.go new file mode 100644 index 0000000..221dc95 --- /dev/null +++ b/io.go @@ -0,0 +1,52 @@ +package containerd + +import ( + "io" + "os" +) + +type ioConfig struct { + StdoutPath string + StderrPath string + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func newCopier(i *ioConfig) (*copier, error) { + l := &copier{ + config: i, + } + if i.StdoutPath != "" { + f, err := os.OpenFile(i.StdoutPath, os.O_RDWR, 0) + if err != nil { + return nil, err + } + go io.Copy(f, i.Stdout) + } + if i.StderrPath != "" { + f, err := os.OpenFile(i.StderrPath, os.O_RDWR, 0) + if err != nil { + return nil, err + } + go io.Copy(f, i.Stderr) + } + return l, nil +} + +type copier struct { + config *ioConfig +} + +func (l *copier) Close() (err error) { + for _, c := range []io.Closer{ + l.config.Stdin, + l.config.Stdout, + l.config.Stderr, + } { + if cerr := c.Close(); err == nil { + err = cerr + } + } + return err +} diff --git a/log.go b/log.go deleted file mode 100644 index 2dc96e7..0000000 --- a/log.go +++ /dev/null @@ -1,110 +0,0 @@ -package containerd - -import ( - "encoding/json" - "io" - "os" - "path/filepath" - "sync" - "time" - - "github.com/Sirupsen/logrus" -) - -type logConfig struct { - BundlePath string - LogSize int64 // in bytes - Stdin io.WriteCloser - Stdout io.ReadCloser - Stderr io.ReadCloser -} - -func newLogger(i *logConfig) (*logger, error) { - l := &logger{ - config: i, - messages: make(chan *Message, DefaultBufferSize), - } - f, err := os.OpenFile( - filepath.Join(l.config.BundlePath, "logs.json"), - os.O_CREATE|os.O_WRONLY|os.O_APPEND, - 0655, - ) - if err != nil { - return nil, err - } - l.f = f - hout := &logHandler{ - stream: "stdout", - messages: l.messages, - } - herr := &logHandler{ - stream: "stderr", - messages: l.messages, - } - go func() { - io.Copy(hout, i.Stdout) - }() - go func() { - io.Copy(herr, i.Stderr) - }() - l.start() - return l, nil -} - -type Message struct { - Stream string `json:"stream"` - Timestamp time.Time `json:"timestamp"` - Data []byte `json:"data"` -} - -type logger struct { - config *logConfig - f *os.File - wg sync.WaitGroup - messages chan *Message -} - -type logHandler struct { - stream string - messages chan *Message -} - -func (h *logHandler) Write(b []byte) (int, error) { - h.messages <- &Message{ - Stream: h.stream, - Timestamp: time.Now(), - Data: b, - } - return len(b), nil -} - -func (l *logger) start() { - l.wg.Add(1) - go func() { - l.wg.Done() - enc := json.NewEncoder(l.f) - for m := range l.messages { - if err := enc.Encode(m); err != nil { - logrus.WithField("error", err).Error("write log message") - } - } - }() -} - -func (l *logger) Close() (err error) { - for _, c := range []io.Closer{ - l.config.Stdin, - l.config.Stdout, - l.config.Stderr, - } { - if cerr := c.Close(); err == nil { - err = cerr - } - } - close(l.messages) - l.wg.Wait() - if ferr := l.f.Close(); err == nil { - err = ferr - } - return err -} diff --git a/runtime/container.go b/runtime/container.go index b25b69b..16d5283 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -44,6 +44,7 @@ func (i *IO) Close() error { i.Stdin, i.Stdout, i.Stderr, + i.Console, } { if c != nil { if err := c.Close(); oerr == nil { @@ -51,9 +52,6 @@ func (i *IO) Close() error { } } } - if i.Console != nil { - oerr = i.Console.Close() - } return oerr } diff --git a/start.go b/start.go index f84542a..94a8da7 100644 --- a/start.go +++ b/start.go @@ -18,6 +18,8 @@ func (h *StartEvent) Handle(e *Event) error { Err: e.Err, IO: io, Container: container, + Stdout: e.Stdout, + Stderr: e.Stderr, } if e.Checkpoint != nil { task.Checkpoint = e.Checkpoint.Name diff --git a/supervisor.go b/supervisor.go index b73bc02..8988235 100644 --- a/supervisor.go +++ b/supervisor.go @@ -56,7 +56,7 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err type containerInfo struct { container runtime.Container - logger *logger + copier *copier } type Supervisor struct { @@ -222,14 +222,15 @@ func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } -func (s *Supervisor) log(path string, i *runtime.IO) (*logger, error) { - config := &logConfig{ - BundlePath: path, +func (s *Supervisor) copyIO(stdout, stderr string, i *runtime.IO) (*copier, error) { + config := &ioConfig{ Stdin: i.Stdin, Stdout: i.Stdout, Stderr: i.Stderr, + StdoutPath: stdout, + StderrPath: stderr, } - l, err := newLogger(config) + l, err := newCopier(config) if err != nil { return nil, err } diff --git a/worker.go b/worker.go index 9f14d33..c7d4dee 100644 --- a/worker.go +++ b/worker.go @@ -15,6 +15,8 @@ type StartTask struct { Container runtime.Container Checkpoint string IO *runtime.IO + Stdout string + Stderr string Err chan error } @@ -34,8 +36,7 @@ func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { started := time.Now() - // start logging the container's stdio - l, err := w.s.log(t.Container.Path(), t.IO) + l, err := w.s.copyIO(t.Stdout, t.Stderr, t.IO) if err != nil { evt := NewEvent(DeleteEventType) evt.ID = t.Container.ID() @@ -43,7 +44,7 @@ func (w *worker) Start() { t.Err <- err continue } - w.s.containers[t.Container.ID()].logger = l + w.s.containers[t.Container.ID()].copier = l if t.Checkpoint != "" { if err := t.Container.Restore(t.Checkpoint); err != nil { evt := NewEvent(DeleteEventType)