Add stdin support for client and daemon

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2015-12-14 14:40:50 -08:00
parent c4aa39a818
commit 92c0790899
8 changed files with 43 additions and 26 deletions

View file

@ -17,7 +17,7 @@ func (h *AddProcessEvent) Handle(e *Event) error {
if err != nil { if err != nil {
return err return err
} }
l, err := h.s.copyIO(e.Stdout, e.Stderr, io) l, err := h.s.copyIO(e.Stdin, e.Stdout, e.Stderr, io)
if err != nil { if err != nil {
// log the error but continue with the other commands // log the error but continue with the other commands
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{

View file

@ -35,6 +35,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
e.BundlePath = c.BundlePath e.BundlePath = c.BundlePath
e.Stdout = c.Stdout e.Stdout = c.Stdout
e.Stderr = c.Stderr e.Stderr = c.Stderr
e.Stdin = c.Stdin
if c.Checkpoint != "" { if c.Checkpoint != "" {
e.Checkpoint = &runtime.Checkpoint{ e.Checkpoint = &runtime.Checkpoint{
Name: c.Checkpoint, Name: c.Checkpoint,

View file

@ -6,7 +6,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"syscall" "syscall"
"text/tabwriter" "text/tabwriter"
@ -89,9 +88,8 @@ var StartCommand = cli.Command{
BundlePath: path, BundlePath: path,
Checkpoint: context.String("checkpoint"), Checkpoint: context.String("checkpoint"),
} }
wg := &sync.WaitGroup{}
if context.Bool("interactive") { if context.Bool("interactive") {
if err := attachStdio(r, wg); err != nil { if err := attachStdio(r); err != nil {
fatal(err.Error(), 1) fatal(err.Error(), 1)
} }
} }
@ -99,30 +97,38 @@ var StartCommand = cli.Command{
if _, err := c.CreateContainer(netcontext.Background(), r); err != nil { if _, err := c.CreateContainer(netcontext.Background(), r); err != nil {
fatal(err.Error(), 1) fatal(err.Error(), 1)
} }
wg.Wait() if stdin != nil {
io.Copy(stdin, os.Stdin)
}
}, },
} }
func attachStdio(r *types.CreateContainerRequest, wg *sync.WaitGroup) error { var stdin io.WriteCloser
func attachStdio(r *types.CreateContainerRequest) error {
dir, err := ioutil.TempDir("", "ctr-") dir, err := ioutil.TempDir("", "ctr-")
if err != nil { if err != nil {
return err return err
} }
wg.Add(2)
for _, p := range []struct { for _, p := range []struct {
path string path string
flag int flag int
done func(f *os.File) done func(f *os.File)
}{ }{
{
path: filepath.Join(dir, "stdin"),
flag: syscall.O_RDWR,
done: func(f *os.File) {
r.Stdin = filepath.Join(dir, "stdin")
stdin = f
},
},
{ {
path: filepath.Join(dir, "stdout"), path: filepath.Join(dir, "stdout"),
flag: syscall.O_RDWR, flag: syscall.O_RDWR,
done: func(f *os.File) { done: func(f *os.File) {
r.Stdout = filepath.Join(dir, "stdout") r.Stdout = filepath.Join(dir, "stdout")
go func() { go io.Copy(os.Stdout, f)
io.Copy(os.Stdout, f)
wg.Done()
}()
}, },
}, },
{ {
@ -130,10 +136,7 @@ func attachStdio(r *types.CreateContainerRequest, wg *sync.WaitGroup) error {
flag: syscall.O_RDWR, flag: syscall.O_RDWR,
done: func(f *os.File) { done: func(f *os.File) {
r.Stderr = filepath.Join(dir, "stderr") r.Stderr = filepath.Join(dir, "stderr")
go func() { go io.Copy(os.Stderr, f)
io.Copy(os.Stderr, f)
wg.Done()
}()
}, },
}, },
} { } {

View file

@ -38,6 +38,7 @@ type Event struct {
BundlePath string BundlePath string
Stdout string Stdout string
Stderr string Stderr string
Stdin string
Pid int Pid int
Status int Status int
Signal os.Signal Signal os.Signal

19
io.go
View file

@ -8,6 +8,8 @@ import (
type ioConfig struct { type ioConfig struct {
StdoutPath string StdoutPath string
StderrPath string StderrPath string
StdinPath string
Stdin io.WriteCloser Stdin io.WriteCloser
Stdout io.ReadCloser Stdout io.ReadCloser
Stderr io.ReadCloser Stderr io.ReadCloser
@ -17,11 +19,20 @@ func newCopier(i *ioConfig) (*copier, error) {
l := &copier{ l := &copier{
config: i, config: i,
} }
if i.StdinPath != "" {
f, err := os.OpenFile(i.StdinPath, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
l.closers = append(l.closers, f)
go io.Copy(i.Stdin, f)
}
if i.StdoutPath != "" { if i.StdoutPath != "" {
f, err := os.OpenFile(i.StdoutPath, os.O_RDWR, 0) f, err := os.OpenFile(i.StdoutPath, os.O_RDWR, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
l.closers = append(l.closers, f)
go io.Copy(f, i.Stdout) go io.Copy(f, i.Stdout)
} }
if i.StderrPath != "" { if i.StderrPath != "" {
@ -29,6 +40,7 @@ func newCopier(i *ioConfig) (*copier, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
l.closers = append(l.closers, f)
go io.Copy(f, i.Stderr) go io.Copy(f, i.Stderr)
} }
return l, nil return l, nil
@ -36,14 +48,11 @@ func newCopier(i *ioConfig) (*copier, error) {
type copier struct { type copier struct {
config *ioConfig config *ioConfig
closers []io.Closer
} }
func (l *copier) Close() (err error) { func (l *copier) Close() (err error) {
for _, c := range []io.Closer{ for _, c := range append(l.closers, l.config.Stdin, l.config.Stdout, l.config.Stderr) {
l.config.Stdin,
l.config.Stdout,
l.config.Stderr,
} {
if cerr := c.Close(); err == nil { if cerr := c.Close(); err == nil {
err = cerr err = cerr
} }

View file

@ -18,6 +18,7 @@ func (h *StartEvent) Handle(e *Event) error {
Err: e.Err, Err: e.Err,
IO: io, IO: io,
Container: container, Container: container,
Stdin: e.Stdin,
Stdout: e.Stdout, Stdout: e.Stdout,
Stderr: e.Stderr, Stderr: e.Stderr,
} }

View file

@ -222,13 +222,14 @@ func (s *Supervisor) SendEvent(evt *Event) {
s.events <- evt s.events <- evt
} }
func (s *Supervisor) copyIO(stdout, stderr string, i *runtime.IO) (*copier, error) { func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) {
config := &ioConfig{ config := &ioConfig{
Stdin: i.Stdin, Stdin: i.Stdin,
Stdout: i.Stdout, Stdout: i.Stdout,
Stderr: i.Stderr, Stderr: i.Stderr,
StdoutPath: stdout, StdoutPath: stdout,
StderrPath: stderr, StderrPath: stderr,
StdinPath: stdin,
} }
l, err := newCopier(config) l, err := newCopier(config)
if err != nil { if err != nil {

View file

@ -15,6 +15,7 @@ type StartTask struct {
Container runtime.Container Container runtime.Container
Checkpoint string Checkpoint string
IO *runtime.IO IO *runtime.IO
Stdin string
Stdout string Stdout string
Stderr string Stderr string
Err chan error Err chan error
@ -36,7 +37,7 @@ func (w *worker) Start() {
defer w.wg.Done() defer w.wg.Done()
for t := range w.s.tasks { for t := range w.s.tasks {
started := time.Now() started := time.Now()
l, err := w.s.copyIO(t.Stdout, t.Stderr, t.IO) l, err := w.s.copyIO(t.Stdin, t.Stdout, t.Stderr, t.IO)
if err != nil { if err != nil {
evt := NewEvent(DeleteEventType) evt := NewEvent(DeleteEventType)
evt.ID = t.Container.ID() evt.ID = t.Container.ID()