From 92c0790899d3f3c4e0d465e6ea667bb93a272b95 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 14 Dec 2015 14:40:50 -0800 Subject: [PATCH] Add stdin support for client and daemon Signed-off-by: Michael Crosby --- add_process.go | 2 +- api/grpc/server/server.go | 1 + ctr/container.go | 31 +++++++++++++++++-------------- event.go | 1 + io.go | 27 ++++++++++++++++++--------- start.go | 1 + supervisor.go | 3 ++- worker.go | 3 ++- 8 files changed, 43 insertions(+), 26 deletions(-) diff --git a/add_process.go b/add_process.go index 84c0fb5..d458b0b 100644 --- a/add_process.go +++ b/add_process.go @@ -17,7 +17,7 @@ func (h *AddProcessEvent) Handle(e *Event) error { if err != nil { return err } - l, err := h.s.copyIO(e.Stdout, e.Stderr, io) + l, err := h.s.copyIO(e.Stdin, e.Stdout, e.Stderr, io) if err != nil { // log the error but continue with the other commands logrus.WithFields(logrus.Fields{ diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 102e886..3e08bad 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -35,6 +35,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine e.BundlePath = c.BundlePath e.Stdout = c.Stdout e.Stderr = c.Stderr + e.Stdin = c.Stdin if c.Checkpoint != "" { e.Checkpoint = &runtime.Checkpoint{ Name: c.Checkpoint, diff --git a/ctr/container.go b/ctr/container.go index 9958fc7..f931111 100644 --- a/ctr/container.go +++ b/ctr/container.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "os" "path/filepath" - "sync" "syscall" "text/tabwriter" @@ -89,9 +88,8 @@ var StartCommand = cli.Command{ BundlePath: path, Checkpoint: context.String("checkpoint"), } - wg := &sync.WaitGroup{} if context.Bool("interactive") { - if err := attachStdio(r, wg); err != nil { + if err := attachStdio(r); err != nil { fatal(err.Error(), 1) } } @@ -99,30 +97,38 @@ var StartCommand = cli.Command{ if _, err := c.CreateContainer(netcontext.Background(), r); err != nil { fatal(err.Error(), 1) } - wg.Wait() + if stdin != nil { + io.Copy(stdin, os.Stdin) + } }, } -func attachStdio(r *types.CreateContainerRequest, wg *sync.WaitGroup) error { +var stdin io.WriteCloser + +func attachStdio(r *types.CreateContainerRequest) error { dir, err := ioutil.TempDir("", "ctr-") if err != nil { return err } - wg.Add(2) for _, p := range []struct { path string flag int done func(f *os.File) }{ + { + path: filepath.Join(dir, "stdin"), + flag: syscall.O_RDWR, + done: func(f *os.File) { + r.Stdin = filepath.Join(dir, "stdin") + stdin = f + }, + }, { path: filepath.Join(dir, "stdout"), flag: syscall.O_RDWR, done: func(f *os.File) { r.Stdout = filepath.Join(dir, "stdout") - go func() { - io.Copy(os.Stdout, f) - wg.Done() - }() + go io.Copy(os.Stdout, f) }, }, { @@ -130,10 +136,7 @@ func attachStdio(r *types.CreateContainerRequest, wg *sync.WaitGroup) error { flag: syscall.O_RDWR, done: func(f *os.File) { r.Stderr = filepath.Join(dir, "stderr") - go func() { - io.Copy(os.Stderr, f) - wg.Done() - }() + go io.Copy(os.Stderr, f) }, }, } { diff --git a/event.go b/event.go index c2346cd..6a5c5ea 100644 --- a/event.go +++ b/event.go @@ -38,6 +38,7 @@ type Event struct { BundlePath string Stdout string Stderr string + Stdin string Pid int Status int Signal os.Signal diff --git a/io.go b/io.go index 221dc95..0c4a0d0 100644 --- a/io.go +++ b/io.go @@ -8,20 +8,31 @@ import ( type ioConfig struct { StdoutPath string StderrPath string - Stdin io.WriteCloser - Stdout io.ReadCloser - Stderr io.ReadCloser + StdinPath string + + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser } func newCopier(i *ioConfig) (*copier, error) { l := &copier{ config: i, } + if i.StdinPath != "" { + f, err := os.OpenFile(i.StdinPath, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + l.closers = append(l.closers, f) + go io.Copy(i.Stdin, f) + } if i.StdoutPath != "" { f, err := os.OpenFile(i.StdoutPath, os.O_RDWR, 0) if err != nil { return nil, err } + l.closers = append(l.closers, f) go io.Copy(f, i.Stdout) } if i.StderrPath != "" { @@ -29,21 +40,19 @@ func newCopier(i *ioConfig) (*copier, error) { if err != nil { return nil, err } + l.closers = append(l.closers, f) go io.Copy(f, i.Stderr) } return l, nil } type copier struct { - config *ioConfig + config *ioConfig + closers []io.Closer } func (l *copier) Close() (err error) { - for _, c := range []io.Closer{ - l.config.Stdin, - l.config.Stdout, - l.config.Stderr, - } { + for _, c := range append(l.closers, l.config.Stdin, l.config.Stdout, l.config.Stderr) { if cerr := c.Close(); err == nil { err = cerr } diff --git a/start.go b/start.go index 94a8da7..1f9ec8b 100644 --- a/start.go +++ b/start.go @@ -18,6 +18,7 @@ func (h *StartEvent) Handle(e *Event) error { Err: e.Err, IO: io, Container: container, + Stdin: e.Stdin, Stdout: e.Stdout, Stderr: e.Stderr, } diff --git a/supervisor.go b/supervisor.go index 8988235..c186c6d 100644 --- a/supervisor.go +++ b/supervisor.go @@ -222,13 +222,14 @@ func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } -func (s *Supervisor) copyIO(stdout, stderr string, i *runtime.IO) (*copier, error) { +func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) { config := &ioConfig{ Stdin: i.Stdin, Stdout: i.Stdout, Stderr: i.Stderr, StdoutPath: stdout, StderrPath: stderr, + StdinPath: stdin, } l, err := newCopier(config) if err != nil { diff --git a/worker.go b/worker.go index c7d4dee..eb15c69 100644 --- a/worker.go +++ b/worker.go @@ -15,6 +15,7 @@ type StartTask struct { Container runtime.Container Checkpoint string IO *runtime.IO + Stdin string Stdout string Stderr string Err chan error @@ -36,7 +37,7 @@ func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { started := time.Now() - l, err := w.s.copyIO(t.Stdout, t.Stderr, t.IO) + l, err := w.s.copyIO(t.Stdin, t.Stdout, t.Stderr, t.IO) if err != nil { evt := NewEvent(DeleteEventType) evt.ID = t.Container.ID()