From e9f63fc9a41c057a8245f7bfc090d7f210f602df Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 14 Dec 2015 14:15:26 -0800 Subject: [PATCH 1/3] Add basic fifo support for IO copy Signed-off-by: Michael Crosby --- add_process.go | 4 +- api/grpc/server/server.go | 2 + ctr/container.go | 69 ++++++++++++++++++++++-- ctr/logs.go | 60 --------------------- ctr/main.go | 1 - delete.go | 6 +-- event.go | 2 + exit.go | 2 +- io.go | 52 ++++++++++++++++++ log.go | 110 -------------------------------------- runtime/container.go | 4 +- start.go | 2 + supervisor.go | 11 ++-- worker.go | 7 +-- 14 files changed, 141 insertions(+), 191 deletions(-) delete mode 100644 ctr/logs.go create mode 100644 io.go delete mode 100644 log.go diff --git a/add_process.go b/add_process.go index 862f289..84c0fb5 100644 --- a/add_process.go +++ b/add_process.go @@ -17,7 +17,7 @@ func (h *AddProcessEvent) Handle(e *Event) error { if err != nil { return err } - l, err := h.s.log(ci.container.Path(), io) + l, err := h.s.copyIO(e.Stdout, e.Stderr, io) if err != nil { // log the error but continue with the other commands logrus.WithFields(logrus.Fields{ @@ -30,7 +30,7 @@ func (h *AddProcessEvent) Handle(e *Event) error { } h.s.processes[e.Pid] = &containerInfo{ container: ci.container, - logger: l, + copier: l, } return nil } diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 0eae0c2..102e886 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -33,6 +33,8 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine e := containerd.NewEvent(containerd.StartContainerEventType) e.ID = c.Id e.BundlePath = c.BundlePath + e.Stdout = c.Stdout + e.Stderr = c.Stderr if c.Checkpoint != "" { e.Checkpoint = &runtime.Checkpoint{ Name: c.Checkpoint, diff --git a/ctr/container.go b/ctr/container.go index 522bd2b..9958fc7 100644 --- a/ctr/container.go +++ b/ctr/container.go @@ -2,7 +2,12 @@ package main import ( "fmt" + "io" + "io/ioutil" "os" + "path/filepath" + "sync" + "syscall" "text/tabwriter" "github.com/codegangsta/cli" @@ -63,6 +68,10 @@ var StartCommand = cli.Command{ Value: "", Usage: "checkpoint to start the container from", }, + cli.BoolFlag{ + Name: "interactive,i", + Usage: "connect to the stdio of the container", + }, }, Action: func(context *cli.Context) { var ( @@ -75,17 +84,71 @@ var StartCommand = cli.Command{ if id == "" { fatal("container id cannot be empty", 1) } - c := getClient() - if _, err := c.CreateContainer(netcontext.Background(), &types.CreateContainerRequest{ + r := &types.CreateContainerRequest{ Id: id, BundlePath: path, Checkpoint: context.String("checkpoint"), - }); err != nil { + } + wg := &sync.WaitGroup{} + if context.Bool("interactive") { + if err := attachStdio(r, wg); err != nil { + fatal(err.Error(), 1) + } + } + c := getClient() + if _, err := c.CreateContainer(netcontext.Background(), r); err != nil { fatal(err.Error(), 1) } + wg.Wait() }, } +func attachStdio(r *types.CreateContainerRequest, wg *sync.WaitGroup) error { + dir, err := ioutil.TempDir("", "ctr-") + if err != nil { + return err + } + wg.Add(2) + for _, p := range []struct { + path string + flag int + done func(f *os.File) + }{ + { + path: filepath.Join(dir, "stdout"), + flag: syscall.O_RDWR, + done: func(f *os.File) { + r.Stdout = filepath.Join(dir, "stdout") + go func() { + io.Copy(os.Stdout, f) + wg.Done() + }() + }, + }, + { + path: filepath.Join(dir, "stderr"), + flag: syscall.O_RDWR, + done: func(f *os.File) { + r.Stderr = filepath.Join(dir, "stderr") + go func() { + io.Copy(os.Stderr, f) + wg.Done() + }() + }, + }, + } { + if err := syscall.Mkfifo(p.path, 0755); err != nil { + return fmt.Errorf("mkfifo: %s %v", p.path, err) + } + f, err := os.OpenFile(p.path, p.flag, 0) + if err != nil { + return fmt.Errorf("open: %s %v", p.path, err) + } + p.done(f) + } + return nil +} + var KillCommand = cli.Command{ Name: "kill", Usage: "send a signal to a container or it's processes", diff --git a/ctr/logs.go b/ctr/logs.go deleted file mode 100644 index 65e04bd..0000000 --- a/ctr/logs.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "encoding/json" - "io" - "os" - "time" - - "github.com/codegangsta/cli" - "github.com/docker/containerd" -) - -var LogsCommand = cli.Command{ - Name: "logs", - Usage: "view binary container logs generated by containerd", - Flags: []cli.Flag{ - cli.BoolFlag{ - Name: "follow,f", - Usage: "follow/tail the logs", - }, - }, - Action: func(context *cli.Context) { - path := context.Args().First() - if path == "" { - fatal("path to the log cannot be empty", 1) - } - if err := readLogs(path, context.Bool("follow")); err != nil { - fatal(err.Error(), 1) - } - }, -} - -func readLogs(path string, follow bool) error { - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - dec := json.NewDecoder(f) - for { - var msg *containerd.Message - if err := dec.Decode(&msg); err != nil { - if err == io.EOF { - if follow { - time.Sleep(100 * time.Millisecond) - continue - } - return nil - } - return err - } - switch msg.Stream { - case "stdout": - os.Stdout.Write(msg.Data) - case "stderr": - os.Stderr.Write(msg.Data) - } - } - return nil -} diff --git a/ctr/main.go b/ctr/main.go index e966e04..632f168 100644 --- a/ctr/main.go +++ b/ctr/main.go @@ -37,7 +37,6 @@ func main() { CheckpointCommand, ContainersCommand, EventsCommand, - LogsCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { diff --git a/delete.go b/delete.go index 722da8a..c940cb8 100644 --- a/delete.go +++ b/delete.go @@ -14,9 +14,9 @@ func (h *DeleteEvent) Handle(e *Event) error { if err := h.deleteContainer(i.container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } - if i.logger != nil { - if err := i.logger.Close(); err != nil { - logrus.WithField("error", err).Error("containerd: close container logger") + if i.copier != nil { + if err := i.copier.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close container copier") } } h.s.notifySubscribers(&Event{ diff --git a/event.go b/event.go index 7239b15..c2346cd 100644 --- a/event.go +++ b/event.go @@ -36,6 +36,8 @@ type Event struct { Timestamp time.Time ID string BundlePath string + Stdout string + Stderr string Pid int Status int Signal os.Signal diff --git a/exit.go b/exit.go index a997f7f..27557cd 100644 --- a/exit.go +++ b/exit.go @@ -46,7 +46,7 @@ func (h *ExecExitEvent) Handle(e *Event) error { if err := info.container.RemoveProcess(e.Pid); err != nil { logrus.WithField("error", err).Error("containerd: find container for pid") } - if err := info.logger.Close(); err != nil { + if err := info.copier.Close(); err != nil { logrus.WithField("error", err).Error("containerd: close process IO") } delete(h.s.processes, e.Pid) diff --git a/io.go b/io.go new file mode 100644 index 0000000..221dc95 --- /dev/null +++ b/io.go @@ -0,0 +1,52 @@ +package containerd + +import ( + "io" + "os" +) + +type ioConfig struct { + StdoutPath string + StderrPath string + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func newCopier(i *ioConfig) (*copier, error) { + l := &copier{ + config: i, + } + if i.StdoutPath != "" { + f, err := os.OpenFile(i.StdoutPath, os.O_RDWR, 0) + if err != nil { + return nil, err + } + go io.Copy(f, i.Stdout) + } + if i.StderrPath != "" { + f, err := os.OpenFile(i.StderrPath, os.O_RDWR, 0) + if err != nil { + return nil, err + } + go io.Copy(f, i.Stderr) + } + return l, nil +} + +type copier struct { + config *ioConfig +} + +func (l *copier) Close() (err error) { + for _, c := range []io.Closer{ + l.config.Stdin, + l.config.Stdout, + l.config.Stderr, + } { + if cerr := c.Close(); err == nil { + err = cerr + } + } + return err +} diff --git a/log.go b/log.go deleted file mode 100644 index 2dc96e7..0000000 --- a/log.go +++ /dev/null @@ -1,110 +0,0 @@ -package containerd - -import ( - "encoding/json" - "io" - "os" - "path/filepath" - "sync" - "time" - - "github.com/Sirupsen/logrus" -) - -type logConfig struct { - BundlePath string - LogSize int64 // in bytes - Stdin io.WriteCloser - Stdout io.ReadCloser - Stderr io.ReadCloser -} - -func newLogger(i *logConfig) (*logger, error) { - l := &logger{ - config: i, - messages: make(chan *Message, DefaultBufferSize), - } - f, err := os.OpenFile( - filepath.Join(l.config.BundlePath, "logs.json"), - os.O_CREATE|os.O_WRONLY|os.O_APPEND, - 0655, - ) - if err != nil { - return nil, err - } - l.f = f - hout := &logHandler{ - stream: "stdout", - messages: l.messages, - } - herr := &logHandler{ - stream: "stderr", - messages: l.messages, - } - go func() { - io.Copy(hout, i.Stdout) - }() - go func() { - io.Copy(herr, i.Stderr) - }() - l.start() - return l, nil -} - -type Message struct { - Stream string `json:"stream"` - Timestamp time.Time `json:"timestamp"` - Data []byte `json:"data"` -} - -type logger struct { - config *logConfig - f *os.File - wg sync.WaitGroup - messages chan *Message -} - -type logHandler struct { - stream string - messages chan *Message -} - -func (h *logHandler) Write(b []byte) (int, error) { - h.messages <- &Message{ - Stream: h.stream, - Timestamp: time.Now(), - Data: b, - } - return len(b), nil -} - -func (l *logger) start() { - l.wg.Add(1) - go func() { - l.wg.Done() - enc := json.NewEncoder(l.f) - for m := range l.messages { - if err := enc.Encode(m); err != nil { - logrus.WithField("error", err).Error("write log message") - } - } - }() -} - -func (l *logger) Close() (err error) { - for _, c := range []io.Closer{ - l.config.Stdin, - l.config.Stdout, - l.config.Stderr, - } { - if cerr := c.Close(); err == nil { - err = cerr - } - } - close(l.messages) - l.wg.Wait() - if ferr := l.f.Close(); err == nil { - err = ferr - } - return err -} diff --git a/runtime/container.go b/runtime/container.go index b25b69b..16d5283 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -44,6 +44,7 @@ func (i *IO) Close() error { i.Stdin, i.Stdout, i.Stderr, + i.Console, } { if c != nil { if err := c.Close(); oerr == nil { @@ -51,9 +52,6 @@ func (i *IO) Close() error { } } } - if i.Console != nil { - oerr = i.Console.Close() - } return oerr } diff --git a/start.go b/start.go index f84542a..94a8da7 100644 --- a/start.go +++ b/start.go @@ -18,6 +18,8 @@ func (h *StartEvent) Handle(e *Event) error { Err: e.Err, IO: io, Container: container, + Stdout: e.Stdout, + Stderr: e.Stderr, } if e.Checkpoint != nil { task.Checkpoint = e.Checkpoint.Name diff --git a/supervisor.go b/supervisor.go index b73bc02..8988235 100644 --- a/supervisor.go +++ b/supervisor.go @@ -56,7 +56,7 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err type containerInfo struct { container runtime.Container - logger *logger + copier *copier } type Supervisor struct { @@ -222,14 +222,15 @@ func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } -func (s *Supervisor) log(path string, i *runtime.IO) (*logger, error) { - config := &logConfig{ - BundlePath: path, +func (s *Supervisor) copyIO(stdout, stderr string, i *runtime.IO) (*copier, error) { + config := &ioConfig{ Stdin: i.Stdin, Stdout: i.Stdout, Stderr: i.Stderr, + StdoutPath: stdout, + StderrPath: stderr, } - l, err := newLogger(config) + l, err := newCopier(config) if err != nil { return nil, err } diff --git a/worker.go b/worker.go index 9f14d33..c7d4dee 100644 --- a/worker.go +++ b/worker.go @@ -15,6 +15,8 @@ type StartTask struct { Container runtime.Container Checkpoint string IO *runtime.IO + Stdout string + Stderr string Err chan error } @@ -34,8 +36,7 @@ func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { started := time.Now() - // start logging the container's stdio - l, err := w.s.log(t.Container.Path(), t.IO) + l, err := w.s.copyIO(t.Stdout, t.Stderr, t.IO) if err != nil { evt := NewEvent(DeleteEventType) evt.ID = t.Container.ID() @@ -43,7 +44,7 @@ func (w *worker) Start() { t.Err <- err continue } - w.s.containers[t.Container.ID()].logger = l + w.s.containers[t.Container.ID()].copier = l if t.Checkpoint != "" { if err := t.Container.Restore(t.Checkpoint); err != nil { evt := NewEvent(DeleteEventType) From c4aa39a81814f06d87562a448c548026f23140d0 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 14 Dec 2015 14:18:42 -0800 Subject: [PATCH 2/3] Update protos with stdin for container create Signed-off-by: Michael Crosby --- api/grpc/types/api.pb.go | 101 ++++++++++++++++++++------------------- api/grpc/types/api.proto | 1 + 2 files changed, 52 insertions(+), 50 deletions(-) diff --git a/api/grpc/types/api.pb.go b/api/grpc/types/api.pb.go index 062db99..a1d4419 100644 --- a/api/grpc/types/api.pb.go +++ b/api/grpc/types/api.pb.go @@ -53,6 +53,7 @@ var _ = math.Inf type CreateContainerRequest struct { Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` BundlePath string `protobuf:"bytes,2,opt,name=bundlePath" json:"bundlePath,omitempty"` + Stdin string `protobuf:"bytes,3,opt,name=stdin" json:"stdin,omitempty"` Stdout string `protobuf:"bytes,4,opt,name=stdout" json:"stdout,omitempty"` Stderr string `protobuf:"bytes,5,opt,name=stderr" json:"stderr,omitempty"` Checkpoint string `protobuf:"bytes,7,opt,name=checkpoint" json:"checkpoint,omitempty"` @@ -711,54 +712,54 @@ var _API_serviceDesc = grpc.ServiceDesc{ } var fileDescriptor0 = []byte{ - // 773 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x4f, 0xdc, 0x3a, - 0x10, 0xdd, 0x25, 0x9b, 0xfd, 0x98, 0x65, 0xc3, 0x62, 0x60, 0x09, 0xd1, 0xe5, 0x02, 0xb9, 0xb7, - 0x55, 0x9f, 0x10, 0x82, 0x4a, 0xe5, 0xb1, 0x08, 0xaa, 0xaa, 0x12, 0x55, 0x11, 0x88, 0x4a, 0x7d, - 0x0c, 0x89, 0xc5, 0x46, 0x64, 0x93, 0x34, 0x76, 0x28, 0xfc, 0x8d, 0xfe, 0xb2, 0xfe, 0xa4, 0x3a, - 0x8e, 0xed, 0x7c, 0x6c, 0x52, 0x9e, 0xfa, 0xe8, 0xb1, 0x7d, 0xce, 0xcc, 0x99, 0xf1, 0x49, 0x60, - 0xe4, 0xc4, 0xfe, 0x61, 0x9c, 0x44, 0x34, 0x42, 0x3a, 0x7d, 0x8e, 0x31, 0xb1, 0x03, 0x98, 0x9d, - 0x27, 0xd8, 0xa1, 0xf8, 0x3c, 0x0a, 0xa9, 0xe3, 0x87, 0x38, 0xb9, 0xc6, 0xdf, 0x53, 0x4c, 0x28, - 0x02, 0x58, 0xf1, 0x3d, 0xb3, 0xbb, 0xdf, 0x7d, 0x33, 0x42, 0x6c, 0x71, 0x97, 0x86, 0x5e, 0x80, - 0xaf, 0x1c, 0x3a, 0x37, 0x57, 0x78, 0xcc, 0x80, 0x3e, 0xa1, 0x5e, 0x94, 0x52, 0xb3, 0x57, 0x5a, - 0xe3, 0x24, 0x31, 0x75, 0x79, 0xc7, 0x9d, 0x63, 0xf7, 0x21, 0x8e, 0xfc, 0x90, 0x9a, 0x83, 0x2c, - 0x66, 0xef, 0xc0, 0xf6, 0x12, 0x1b, 0x89, 0xa3, 0x90, 0x60, 0xfb, 0x14, 0x26, 0x37, 0xfe, 0x7d, - 0xe8, 0x04, 0x4d, 0xfc, 0x63, 0xd0, 0x62, 0xb6, 0xc8, 0x88, 0x27, 0x9c, 0x88, 0x9f, 0x34, 0xb5, - 0x6c, 0x6d, 0x4f, 0xc1, 0x90, 0x37, 0x05, 0x16, 0x85, 0xf5, 0x33, 0xcf, 0xbb, 0x4a, 0x22, 0x17, - 0x13, 0xd2, 0x84, 0x37, 0x85, 0x21, 0xc5, 0xc9, 0xc2, 0xcf, 0x40, 0x32, 0xd0, 0x21, 0xda, 0x81, - 0x5e, 0x4a, 0x70, 0xc2, 0x21, 0xc7, 0xc7, 0xe3, 0x43, 0xae, 0xce, 0xe1, 0x2d, 0x0b, 0xa1, 0x55, - 0xe8, 0x39, 0xc9, 0x3d, 0x61, 0x65, 0x6a, 0x79, 0x2a, 0x38, 0x7c, 0x64, 0x35, 0x8a, 0x85, 0xfb, - 0xc3, 0x33, 0xfb, 0xbc, 0xb8, 0x53, 0xe8, 0xf1, 0xf3, 0x2c, 0x98, 0x0a, 0xa6, 0x49, 0xb6, 0xb8, - 0x57, 0x99, 0xcf, 0xc0, 0x70, 0x3c, 0xcf, 0xa7, 0x7e, 0xc4, 0x88, 0x3f, 0xfa, 0x1e, 0x61, 0x74, - 0x1a, 0xab, 0xe0, 0x00, 0x50, 0x39, 0xdf, 0xbc, 0x0a, 0x59, 0x34, 0xc7, 0xb1, 0x2f, 0x95, 0x72, - 0x4a, 0xd3, 0xa6, 0xc2, 0x5e, 0x55, 0x44, 0x5f, 0xe1, 0xc5, 0xac, 0x8b, 0x62, 0x8a, 0x9b, 0xb6, - 0x05, 0xe6, 0x32, 0x9a, 0x10, 0xef, 0x04, 0xb6, 0x2f, 0x70, 0x80, 0x5f, 0x62, 0x62, 0xaa, 0x84, - 0xce, 0x02, 0xe7, 0xc3, 0x90, 0x01, 0x2e, 0x5f, 0x12, 0x80, 0xff, 0xc1, 0xd6, 0xa5, 0x4f, 0xe8, - 0x1f, 0xe1, 0xec, 0x6f, 0x00, 0xc5, 0x01, 0x05, 0xae, 0xa8, 0xf0, 0x93, 0x4f, 0x45, 0xa7, 0x98, - 0x2c, 0xd4, 0x8d, 0x79, 0xa3, 0x86, 0x68, 0x03, 0xc6, 0x69, 0xe8, 0x3f, 0xdd, 0x44, 0xee, 0x03, - 0xa6, 0x84, 0x4f, 0xe2, 0x10, 0x4d, 0x40, 0x27, 0x73, 0x1c, 0x04, 0x7c, 0x10, 0x87, 0xf6, 0x7b, - 0x98, 0xd5, 0xf9, 0x85, 0xc2, 0xaf, 0x61, 0x5c, 0xa8, 0x45, 0x18, 0x9b, 0xd6, 0x2c, 0x97, 0x01, - 0xab, 0x37, 0x94, 0xa9, 0x25, 0x12, 0xb7, 0xf7, 0xc1, 0x50, 0x03, 0xcc, 0x37, 0xf2, 0xe1, 0x77, - 0x68, 0x4a, 0x44, 0x39, 0x0f, 0x30, 0x10, 0xed, 0xac, 0xb4, 0xf1, 0xef, 0x0c, 0x5e, 0x00, 0x23, - 0x95, 0x4e, 0x7b, 0x8f, 0x6a, 0x8f, 0x58, 0xe3, 0xb1, 0x03, 0x18, 0xc5, 0x79, 0x9e, 0x38, 0xe7, - 0x19, 0x1f, 0x1b, 0x22, 0x05, 0x99, 0x7f, 0x51, 0x1a, 0x7f, 0xd7, 0x6c, 0x3e, 0x06, 0x9f, 0x1d, - 0x77, 0xce, 0xc8, 0xea, 0x5c, 0x6e, 0xcc, 0x0e, 0xa9, 0x37, 0xba, 0xc0, 0x8b, 0x28, 0x79, 0xe6, - 0x3c, 0x3d, 0xfb, 0x2b, 0x7b, 0xdd, 0xb9, 0x82, 0x42, 0xfa, 0xff, 0xd9, 0xa0, 0xca, 0x9c, 0xa5, - 0xf2, 0x53, 0xa9, 0xbc, 0x2a, 0x66, 0x0f, 0x06, 0x8b, 0x9c, 0x4b, 0xcc, 0xb2, 0x4c, 0x4e, 0x64, - 0x60, 0x5f, 0xc0, 0xec, 0x36, 0xf6, 0x5e, 0xb2, 0xaf, 0xc2, 0x31, 0x0a, 0x07, 0xc9, 0x4b, 0xd2, - 0xa4, 0x2d, 0x2d, 0xa1, 0x88, 0xe1, 0x5d, 0x83, 0xc9, 0x87, 0x47, 0xcc, 0xa6, 0x43, 0xf6, 0xfe, - 0x57, 0x17, 0x74, 0x1e, 0xc9, 0x2a, 0xce, 0x92, 0x11, 0x1c, 0x39, 0x5f, 0xc9, 0x1a, 0x15, 0xfe, - 0xa4, 0xa6, 0x7c, 0xaf, 0x6c, 0x69, 0x7a, 0xcd, 0xd2, 0x06, 0x7c, 0xcd, 0xea, 0x16, 0x6d, 0x31, - 0x87, 0x95, 0xba, 0x65, 0x53, 0xaa, 0xf2, 0x8d, 0x5a, 0xe4, 0xab, 0xba, 0x01, 0xb4, 0xb8, 0xc1, - 0xf1, 0x4f, 0x1d, 0xb4, 0xb3, 0xab, 0x4f, 0xe8, 0x1a, 0xd6, 0x6a, 0xee, 0x8c, 0x76, 0xe5, 0xe9, - 0xc6, 0x6f, 0x84, 0xf5, 0x6f, 0xdb, 0xb6, 0x50, 0xaf, 0x93, 0x61, 0xd6, 0xa4, 0x55, 0x98, 0xcd, - 0x8d, 0x53, 0x98, 0x6d, 0x1d, 0xe9, 0xa0, 0x77, 0xd0, 0xcf, 0x0d, 0x1f, 0x6d, 0x8a, 0xb3, 0x95, - 0x2f, 0x87, 0xb5, 0x55, 0x8b, 0xaa, 0x8b, 0xe7, 0x00, 0x85, 0xcf, 0x22, 0x53, 0x1c, 0x5b, 0xfa, - 0x54, 0x58, 0x3b, 0x0d, 0x3b, 0x0a, 0xe4, 0x16, 0xa6, 0x75, 0xef, 0x44, 0x35, 0x1d, 0xea, 0x4e, - 0x67, 0xed, 0xb5, 0xee, 0x97, 0x61, 0xeb, 0x0e, 0xaa, 0x60, 0x5b, 0xfc, 0x58, 0xc1, 0xb6, 0x5a, - 0x6f, 0x07, 0x7d, 0x01, 0xa3, 0x6a, 0x7e, 0xe8, 0x1f, 0x71, 0xa9, 0xd1, 0x93, 0xad, 0xdd, 0x96, - 0x5d, 0x05, 0xf8, 0x16, 0xf4, 0xdc, 0xf2, 0x36, 0xa4, 0xca, 0x25, 0x67, 0xb4, 0x36, 0xab, 0x41, - 0x75, 0xeb, 0x08, 0xfa, 0xf9, 0x33, 0x52, 0x2d, 0xab, 0xbc, 0x2a, 0x6b, 0xb5, 0x1c, 0xb5, 0x3b, - 0x47, 0xdd, 0xbb, 0x3e, 0xff, 0x4d, 0x39, 0xf9, 0x1d, 0x00, 0x00, 0xff, 0xff, 0xd4, 0x82, 0xb4, - 0x20, 0xb3, 0x08, 0x00, 0x00, + // 782 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0x5f, 0x4f, 0xdb, 0x3e, + 0x14, 0x6d, 0x9b, 0xa6, 0x7f, 0x6e, 0x49, 0x28, 0x06, 0x4a, 0x88, 0x7e, 0xfc, 0x80, 0xec, 0x8f, + 0xf6, 0x84, 0x10, 0x4c, 0x1a, 0x8f, 0x43, 0x30, 0x4d, 0x93, 0x98, 0x86, 0x40, 0x4c, 0xda, 0x63, + 0x48, 0x2c, 0x1a, 0xd1, 0x26, 0x59, 0xec, 0x30, 0xd0, 0xbe, 0xc5, 0x3e, 0xd9, 0x3e, 0xd2, 0x1c, + 0xdb, 0x71, 0x9a, 0x34, 0x19, 0x4f, 0x7b, 0xf4, 0xb5, 0x7d, 0xee, 0x3d, 0xe7, 0x5e, 0x9f, 0x04, + 0x86, 0x6e, 0x1c, 0x1c, 0xc4, 0x49, 0x44, 0x23, 0xa4, 0xd3, 0xa7, 0x18, 0x13, 0xe7, 0x27, 0x4c, + 0xce, 0x12, 0xec, 0x52, 0x7c, 0x16, 0x85, 0xd4, 0x0d, 0x42, 0x9c, 0x5c, 0xe1, 0xef, 0x29, 0x26, + 0x14, 0x01, 0x74, 0x02, 0xdf, 0x6a, 0xef, 0xb5, 0xdf, 0x0c, 0x11, 0x5b, 0xdc, 0xa6, 0xa1, 0x3f, + 0xc3, 0x97, 0x2e, 0x9d, 0x5a, 0x1d, 0x1e, 0x33, 0x40, 0x27, 0xd4, 0x0f, 0x42, 0x4b, 0xe3, 0x4b, + 0x13, 0x7a, 0x6c, 0x19, 0xa5, 0xd4, 0xea, 0x2e, 0xac, 0x71, 0x92, 0x58, 0x7a, 0x0e, 0xe1, 0x4d, + 0xb1, 0x77, 0x1f, 0x47, 0x41, 0x48, 0xad, 0x7e, 0x16, 0x73, 0xb6, 0x61, 0x6b, 0x29, 0x39, 0x89, + 0xa3, 0x90, 0x60, 0xe7, 0x04, 0x8c, 0xeb, 0xe0, 0x2e, 0x74, 0x67, 0x75, 0xe5, 0x8c, 0x40, 0x8b, + 0xd9, 0x22, 0xab, 0xc3, 0xe0, 0x89, 0xf8, 0x49, 0x5e, 0x88, 0xe1, 0x8c, 0xc1, 0xcc, 0x6f, 0x4a, + 0x2c, 0x0a, 0x6b, 0xa7, 0xbe, 0x7f, 0x99, 0x44, 0x1e, 0x26, 0xa4, 0x0e, 0x6f, 0x0c, 0x03, 0x8a, + 0x93, 0x79, 0x90, 0x81, 0x64, 0xa0, 0x03, 0xb4, 0x0d, 0xdd, 0x94, 0xe0, 0x84, 0x43, 0x8e, 0x8e, + 0x46, 0x07, 0x5c, 0xac, 0x83, 0x1b, 0x16, 0x42, 0x2b, 0xd0, 0x75, 0x93, 0x3b, 0xc2, 0x68, 0x6a, + 0xa2, 0x14, 0x1c, 0x3e, 0x30, 0x8e, 0x72, 0xe1, 0xfd, 0xf0, 0xad, 0x1e, 0x27, 0x77, 0x02, 0x5d, + 0x7e, 0x9e, 0x05, 0x53, 0x99, 0xc9, 0xc8, 0x16, 0x77, 0xaa, 0xf2, 0x09, 0x98, 0xae, 0xef, 0x07, + 0x34, 0x88, 0x58, 0xe2, 0x8f, 0x81, 0x4f, 0x58, 0x3a, 0x8d, 0x31, 0xd8, 0x07, 0xb4, 0x58, 0xaf, + 0x60, 0x91, 0x93, 0xe6, 0x38, 0xce, 0x85, 0x52, 0x4e, 0x69, 0x5a, 0x47, 0xec, 0x55, 0x49, 0xf4, + 0x0e, 0x27, 0xb3, 0x26, 0xc9, 0x14, 0x37, 0x1d, 0x1b, 0xac, 0x65, 0x34, 0x29, 0xde, 0x31, 0x6c, + 0x9d, 0xe3, 0x19, 0x7e, 0x2e, 0x13, 0x53, 0x25, 0x74, 0xe7, 0x58, 0xcc, 0x46, 0x06, 0xb8, 0x7c, + 0x49, 0x02, 0xbe, 0x80, 0xcd, 0x8b, 0x80, 0xd0, 0xbf, 0xc2, 0x39, 0xdf, 0x00, 0x8a, 0x03, 0x0a, + 0x5c, 0xa5, 0xc2, 0x8f, 0x01, 0x95, 0x9d, 0x62, 0xb2, 0x50, 0x2f, 0xe6, 0x8d, 0x1a, 0xa0, 0x75, + 0x18, 0xa5, 0x61, 0xf0, 0x78, 0x1d, 0x79, 0xf7, 0x98, 0x12, 0x3e, 0x89, 0x03, 0x3e, 0xa8, 0x53, + 0x3c, 0x9b, 0xf1, 0x41, 0x1c, 0x38, 0xef, 0x61, 0x52, 0xcd, 0x2f, 0x15, 0x7e, 0x0d, 0xa3, 0x42, + 0x2d, 0xc2, 0xb2, 0x69, 0xf5, 0x72, 0x99, 0xb0, 0x72, 0x4d, 0x99, 0x5a, 0xb2, 0x70, 0x67, 0x0f, + 0x4c, 0x35, 0xc0, 0x7c, 0x43, 0x0c, 0xbf, 0x4b, 0x53, 0x22, 0xe9, 0xdc, 0x43, 0x5f, 0xb6, 0xb3, + 0xd4, 0xc6, 0x7f, 0x33, 0x78, 0x33, 0x18, 0xaa, 0x72, 0x9a, 0x7b, 0x54, 0x79, 0xd3, 0xe2, 0x11, + 0xef, 0xc3, 0x30, 0x16, 0x75, 0x62, 0x91, 0x67, 0x74, 0x64, 0xca, 0x12, 0xf2, 0xfa, 0x0b, 0x6a, + 0xfc, 0x5d, 0xb3, 0xf9, 0xe8, 0x7f, 0x76, 0xbd, 0x29, 0x4b, 0x56, 0xcd, 0xe5, 0xc5, 0xec, 0x90, + 0x7a, 0xa3, 0x73, 0x3c, 0x8f, 0x92, 0x27, 0x9e, 0xa7, 0xeb, 0x7c, 0x65, 0xaf, 0x5b, 0x28, 0x28, + 0xa5, 0x7f, 0xc9, 0x06, 0x35, 0xaf, 0x39, 0x57, 0x7e, 0x9c, 0x2b, 0xaf, 0xc8, 0xec, 0x42, 0x7f, + 0x2e, 0x72, 0xc9, 0x59, 0xce, 0x8b, 0x93, 0x15, 0x38, 0xe7, 0x30, 0xb9, 0x89, 0xfd, 0xe7, 0xdc, + 0xac, 0x70, 0x8c, 0xc2, 0x41, 0x04, 0x25, 0x2d, 0xb7, 0xa5, 0x25, 0x14, 0x39, 0xbc, 0xab, 0x60, + 0x7c, 0x78, 0xc0, 0x6c, 0x3a, 0xf2, 0xde, 0xff, 0x6e, 0x83, 0xce, 0x23, 0x19, 0xe3, 0xac, 0x18, + 0x99, 0x43, 0xe4, 0xeb, 0x14, 0x56, 0xa8, 0xf0, 0x8d, 0x8a, 0xf2, 0xdd, 0x45, 0x4b, 0xd3, 0x2b, + 0x96, 0xd6, 0xe7, 0x6b, 0xc6, 0x5b, 0xb6, 0xc5, 0x1a, 0x94, 0x78, 0xe7, 0x4d, 0x29, 0xcb, 0x37, + 0x6c, 0x90, 0xaf, 0xec, 0x06, 0xd0, 0xe0, 0x06, 0x47, 0xbf, 0x74, 0xd0, 0x4e, 0x2f, 0x3f, 0xa1, + 0x2b, 0x58, 0xad, 0xb8, 0x33, 0xda, 0xc9, 0x4f, 0xd7, 0x7e, 0x32, 0xec, 0xff, 0x9b, 0xb6, 0xa5, + 0x7a, 0xad, 0x0c, 0xb3, 0x22, 0xad, 0xc2, 0xac, 0x6f, 0x9c, 0xc2, 0x6c, 0xea, 0x48, 0x0b, 0xbd, + 0x83, 0x9e, 0x30, 0x7c, 0xb4, 0x21, 0xcf, 0x96, 0xbe, 0x1c, 0xf6, 0x66, 0x25, 0xaa, 0x2e, 0x9e, + 0x01, 0x14, 0x3e, 0x8b, 0x2c, 0x79, 0x6c, 0xe9, 0x53, 0x61, 0x6f, 0xd7, 0xec, 0x28, 0x90, 0x1b, + 0x18, 0x57, 0xbd, 0x13, 0x55, 0x74, 0xa8, 0x3a, 0x9d, 0xbd, 0xdb, 0xb8, 0xbf, 0x08, 0x5b, 0x75, + 0x50, 0x05, 0xdb, 0xe0, 0xc7, 0x0a, 0xb6, 0xd1, 0x7a, 0x5b, 0xe8, 0x0b, 0x98, 0x65, 0xf3, 0x43, + 0xff, 0xc9, 0x4b, 0xb5, 0x9e, 0x6c, 0xef, 0x34, 0xec, 0x2a, 0xc0, 0xb7, 0xa0, 0x0b, 0xcb, 0x5b, + 0xcf, 0x55, 0x5e, 0x70, 0x46, 0x7b, 0xa3, 0x1c, 0x54, 0xb7, 0x0e, 0xa1, 0x27, 0x9e, 0x91, 0x6a, + 0x59, 0xe9, 0x55, 0xd9, 0x2b, 0x8b, 0x51, 0xa7, 0x75, 0xd8, 0xbe, 0xed, 0xf1, 0xbf, 0x96, 0xe3, + 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x36, 0x89, 0xf8, 0xbd, 0xc2, 0x08, 0x00, 0x00, } diff --git a/api/grpc/types/api.proto b/api/grpc/types/api.proto index 6c9faa1..8342e26 100644 --- a/api/grpc/types/api.proto +++ b/api/grpc/types/api.proto @@ -17,6 +17,7 @@ service API { message CreateContainerRequest { string id = 1; // ID of container string bundlePath = 2; // path to OCI bundle + string stdin = 3; // path to the file where stdin will be read (optional) string stdout = 4; // path to file where stdout will be written (optional) string stderr = 5; // path to file where stderr will be written (optional) string checkpoint = 7; // checkpoint name if you want to create immediate checkpoint (optional) From 92c0790899d3f3c4e0d465e6ea667bb93a272b95 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 14 Dec 2015 14:40:50 -0800 Subject: [PATCH 3/3] Add stdin support for client and daemon Signed-off-by: Michael Crosby --- add_process.go | 2 +- api/grpc/server/server.go | 1 + ctr/container.go | 31 +++++++++++++++++-------------- event.go | 1 + io.go | 27 ++++++++++++++++++--------- start.go | 1 + supervisor.go | 3 ++- worker.go | 3 ++- 8 files changed, 43 insertions(+), 26 deletions(-) diff --git a/add_process.go b/add_process.go index 84c0fb5..d458b0b 100644 --- a/add_process.go +++ b/add_process.go @@ -17,7 +17,7 @@ func (h *AddProcessEvent) Handle(e *Event) error { if err != nil { return err } - l, err := h.s.copyIO(e.Stdout, e.Stderr, io) + l, err := h.s.copyIO(e.Stdin, e.Stdout, e.Stderr, io) if err != nil { // log the error but continue with the other commands logrus.WithFields(logrus.Fields{ diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 102e886..3e08bad 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -35,6 +35,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine e.BundlePath = c.BundlePath e.Stdout = c.Stdout e.Stderr = c.Stderr + e.Stdin = c.Stdin if c.Checkpoint != "" { e.Checkpoint = &runtime.Checkpoint{ Name: c.Checkpoint, diff --git a/ctr/container.go b/ctr/container.go index 9958fc7..f931111 100644 --- a/ctr/container.go +++ b/ctr/container.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "os" "path/filepath" - "sync" "syscall" "text/tabwriter" @@ -89,9 +88,8 @@ var StartCommand = cli.Command{ BundlePath: path, Checkpoint: context.String("checkpoint"), } - wg := &sync.WaitGroup{} if context.Bool("interactive") { - if err := attachStdio(r, wg); err != nil { + if err := attachStdio(r); err != nil { fatal(err.Error(), 1) } } @@ -99,30 +97,38 @@ var StartCommand = cli.Command{ if _, err := c.CreateContainer(netcontext.Background(), r); err != nil { fatal(err.Error(), 1) } - wg.Wait() + if stdin != nil { + io.Copy(stdin, os.Stdin) + } }, } -func attachStdio(r *types.CreateContainerRequest, wg *sync.WaitGroup) error { +var stdin io.WriteCloser + +func attachStdio(r *types.CreateContainerRequest) error { dir, err := ioutil.TempDir("", "ctr-") if err != nil { return err } - wg.Add(2) for _, p := range []struct { path string flag int done func(f *os.File) }{ + { + path: filepath.Join(dir, "stdin"), + flag: syscall.O_RDWR, + done: func(f *os.File) { + r.Stdin = filepath.Join(dir, "stdin") + stdin = f + }, + }, { path: filepath.Join(dir, "stdout"), flag: syscall.O_RDWR, done: func(f *os.File) { r.Stdout = filepath.Join(dir, "stdout") - go func() { - io.Copy(os.Stdout, f) - wg.Done() - }() + go io.Copy(os.Stdout, f) }, }, { @@ -130,10 +136,7 @@ func attachStdio(r *types.CreateContainerRequest, wg *sync.WaitGroup) error { flag: syscall.O_RDWR, done: func(f *os.File) { r.Stderr = filepath.Join(dir, "stderr") - go func() { - io.Copy(os.Stderr, f) - wg.Done() - }() + go io.Copy(os.Stderr, f) }, }, } { diff --git a/event.go b/event.go index c2346cd..6a5c5ea 100644 --- a/event.go +++ b/event.go @@ -38,6 +38,7 @@ type Event struct { BundlePath string Stdout string Stderr string + Stdin string Pid int Status int Signal os.Signal diff --git a/io.go b/io.go index 221dc95..0c4a0d0 100644 --- a/io.go +++ b/io.go @@ -8,20 +8,31 @@ import ( type ioConfig struct { StdoutPath string StderrPath string - Stdin io.WriteCloser - Stdout io.ReadCloser - Stderr io.ReadCloser + StdinPath string + + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser } func newCopier(i *ioConfig) (*copier, error) { l := &copier{ config: i, } + if i.StdinPath != "" { + f, err := os.OpenFile(i.StdinPath, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + l.closers = append(l.closers, f) + go io.Copy(i.Stdin, f) + } if i.StdoutPath != "" { f, err := os.OpenFile(i.StdoutPath, os.O_RDWR, 0) if err != nil { return nil, err } + l.closers = append(l.closers, f) go io.Copy(f, i.Stdout) } if i.StderrPath != "" { @@ -29,21 +40,19 @@ func newCopier(i *ioConfig) (*copier, error) { if err != nil { return nil, err } + l.closers = append(l.closers, f) go io.Copy(f, i.Stderr) } return l, nil } type copier struct { - config *ioConfig + config *ioConfig + closers []io.Closer } func (l *copier) Close() (err error) { - for _, c := range []io.Closer{ - l.config.Stdin, - l.config.Stdout, - l.config.Stderr, - } { + for _, c := range append(l.closers, l.config.Stdin, l.config.Stdout, l.config.Stderr) { if cerr := c.Close(); err == nil { err = cerr } diff --git a/start.go b/start.go index 94a8da7..1f9ec8b 100644 --- a/start.go +++ b/start.go @@ -18,6 +18,7 @@ func (h *StartEvent) Handle(e *Event) error { Err: e.Err, IO: io, Container: container, + Stdin: e.Stdin, Stdout: e.Stdout, Stderr: e.Stderr, } diff --git a/supervisor.go b/supervisor.go index 8988235..c186c6d 100644 --- a/supervisor.go +++ b/supervisor.go @@ -222,13 +222,14 @@ func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } -func (s *Supervisor) copyIO(stdout, stderr string, i *runtime.IO) (*copier, error) { +func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) { config := &ioConfig{ Stdin: i.Stdin, Stdout: i.Stdout, Stderr: i.Stderr, StdoutPath: stdout, StderrPath: stderr, + StdinPath: stdin, } l, err := newCopier(config) if err != nil { diff --git a/worker.go b/worker.go index c7d4dee..eb15c69 100644 --- a/worker.go +++ b/worker.go @@ -15,6 +15,7 @@ type StartTask struct { Container runtime.Container Checkpoint string IO *runtime.IO + Stdin string Stdout string Stderr string Err chan error @@ -36,7 +37,7 @@ func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { started := time.Now() - l, err := w.s.copyIO(t.Stdout, t.Stderr, t.IO) + l, err := w.s.copyIO(t.Stdin, t.Stdout, t.Stderr, t.IO) if err != nil { evt := NewEvent(DeleteEventType) evt.ID = t.Container.ID()