diff --git a/beam/beam.go b/beam/beam.go index fab5f62..b1e4667 100644 --- a/beam/beam.go +++ b/beam/beam.go @@ -24,6 +24,11 @@ type SendCloser interface { Close() error } +type ReceiveSender interface { + Receiver + Sender +} + func SendPipe(dst Sender, data []byte) (*os.File, error) { r, w, err := os.Pipe() if err != nil { @@ -111,3 +116,20 @@ func MsgDesc(payload []byte, attachment *os.File) string { } return fmt.Sprintf("'%s'[%s]", payload, filedesc) } + +type devnull struct{} + +func Devnull() ReceiveSender { + return devnull{} +} + +func (d devnull) Send(p []byte, a *os.File) error { + if a != nil { + a.Close() + } + return nil +} + +func (d devnull) Receive() ([]byte, *os.File, error) { + return nil, nil, io.EOF +} diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index af20916..c444d0a 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -23,9 +23,19 @@ var rootPlugins = []string{ var ( flX bool + flPing bool + introspect beam.ReceiveSender = beam.Devnull() ) func main() { + fd3 := os.NewFile(3, "beam-introspect") + if introsp, err := beam.FileConn(fd3); err == nil { + introspect = introsp + Logf("introspection enabled\n") + } else { + Logf("introspection disabled\n") + } + fd3.Close() flag.BoolVar(&flX, "x", false, "print commands as they are being executed") flag.Parse() if flag.NArg() == 0{ @@ -33,7 +43,7 @@ func main() { // No arguments, stdin is terminal --> interactive mode input := bufio.NewScanner(os.Stdin) for { - os.Stdout.Write([]byte("beamsh> ")) + fmt.Printf("[%d] beamsh> ", os.Getpid()) if !input.Scan() { break } @@ -103,11 +113,34 @@ func executeRootScript(script []*dockerscript.Command) error { lastCmd.Children = script script = []*dockerscript.Command{rootCmd} } - handlers, err := Handlers() + handlers, err := Handlers(introspect) if err != nil { return err } defer handlers.Close() + var tasks sync.WaitGroup + defer func() { + Debugf("Waiting for introspection...\n") + tasks.Wait() + Debugf("DONE Waiting for introspection\n") + }() + if introspect != nil { + tasks.Add(1) + go func() { + Debugf("starting introspection\n") + defer Debugf("done with introspection\n") + defer tasks.Done() + introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil) + Debugf("XXX starting reading introspection messages\n") + r := beam.NewRouter(handlers) + r.NewRoute().All().Handler(func(p []byte, a *os.File) error { + Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a)) + return handlers.Send(p, a) + }) + n, err := beam.Copy(r, introspect) + Debugf("XXX done reading %d introspection messages: %v\n", n, err) + }() + } if err := executeScript(handlers, script); err != nil { return err } @@ -188,7 +221,7 @@ func executeCommand(out beam.Sender, cmd *dockerscript.Command) error { type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender) -func Handlers() (*beam.UnixConn, error) { +func Handlers(sink beam.Sender) (*beam.UnixConn, error) { var tasks sync.WaitGroup pub, priv, err := beam.USocketPair() if err != nil { @@ -202,15 +235,16 @@ func Handlers() (*beam.UnixConn, error) { priv.CloseWrite() Debugf("[handlers] done closewrite() on endpoint\n") }() - for { - Debugf("[handlers] waiting for next job...\n") - payload, conn, err := beam.ReceiveConn(priv) - Debugf("[handlers] ReceiveConn() returned %v\n", err) + r := beam.NewRouter(sink) + r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error { + conn, err := beam.FileConn(attachment) if err != nil { - return + attachment.Close() + return err } + // attachment.Close() tasks.Add(1) - go func(payload []byte, conn *beam.UnixConn) { + go func() { defer tasks.Done() defer func() { Debugf("[handlers] '%s' closewrite\n", payload) @@ -239,8 +273,10 @@ func Handlers() (*beam.UnixConn, error) { Debugf("[handlers] calling %s\n", strings.Join(cmd, " ")) handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn)) Debugf("[handlers] returned: %s\n", strings.Join(cmd, " ")) - }(payload, conn) - } + }() + return nil + }) + beam.Copy(r, priv) Debugf("[handlers] waiting for all tasks\n") tasks.Wait() Debugf("[handlers] all tasks returned\n") @@ -285,6 +321,8 @@ func GetHandler(name string) Handler { return CmdConnect } else if name == "openfile" { return CmdOpenfile + } else if name == "spawn" { + return CmdSpawn } return nil } diff --git a/beam/examples/beamsh/builtins.go b/beam/examples/beamsh/builtins.go index da767f1..d4e084d 100644 --- a/beam/examples/beamsh/builtins.go +++ b/beam/examples/beamsh/builtins.go @@ -6,6 +6,7 @@ import ( "github.com/dotcloud/docker/pkg/beam" "github.com/dotcloud/docker/pkg/beam/data" "github.com/dotcloud/docker/pkg/term" + "github.com/dotcloud/docker/utils" "text/template" "fmt" "sync" @@ -167,6 +168,27 @@ func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam } } +func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + c := exec.Command(utils.SelfPath()) + r, w, err := os.Pipe() + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + c.Stdin = r + c.Stdout = stdout + c.Stderr = stderr + go func() { + fmt.Fprintf(w, strings.Join(args[1:], " ")) + w.Sync() + w.Close() + }() + if err := c.Run(); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { os.Chdir(args[1]) GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out) @@ -176,8 +198,46 @@ func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam cmd := exec.Command(args[1], args[2:]...) cmd.Stdout = stdout cmd.Stderr = stderr - cmd.Stdin = os.Stdin + //cmd.Stdin = os.Stdin + local, remote, err := beam.SocketPair() + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + child, err := beam.FileConn(local) + if err != nil { + local.Close() + remote.Close() + fmt.Fprintf(stderr, "%v\n", err) + return + } + local.Close() + cmd.ExtraFiles = append(cmd.ExtraFiles, remote) + + var tasks sync.WaitGroup + tasks.Add(1) + go func() { + defer Debugf("done copying to child\n") + defer tasks.Done() + defer child.CloseWrite() + beam.Copy(child, in) + }() + + tasks.Add(1) + go func() { + defer Debugf("done copying from child %d\n") + defer tasks.Done() + r := beam.NewRouter(out) + r.NewRoute().All().Handler(func(p []byte, a *os.File) error { + return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a) + }) + beam.Copy(r, child) + }() execErr := cmd.Run() + // We can close both ends of the socket without worrying about data stuck in the buffer, + // because unix socket writes are fully synchronous. + child.Close() + tasks.Wait() var status string if execErr != nil { status = execErr.Error() @@ -190,7 +250,11 @@ func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { r := beam.NewRouter(out) r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error { - fmt.Printf("===> %s\n", beam.MsgDesc(payload, attachment)) + var sfd string = "nil" + if attachment != nil { + sfd = fmt.Sprintf("%d", attachment.Fd()) + } + fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd) out.Send(payload, attachment) return nil }) diff --git a/beam/router.go b/beam/router.go index e19ec9d..abdfe61 100644 --- a/beam/router.go +++ b/beam/router.go @@ -27,7 +27,7 @@ func (r *Router) Send(payload []byte, attachment *os.File) (err error) { } } if r.sink != nil { - //fmt.Printf("[Router.Send] no match. sending to sink\n") + // fmt.Printf("[%d] [Router.Send] no match. sending %s to sink %#v\n", os.Getpid(), MsgDesc(payload, attachment), r.sink) return r.sink.Send(payload, attachment) } //fmt.Printf("[Router.Send] no match. return error.\n") @@ -101,9 +101,17 @@ func (route *Route) Tee(dst Sender) *Route { return route } +func (r *Route) Filter(f func([]byte, *os.File) bool) *Route { + r.rules = append(r.rules, f) + return r +} + func (r *Route) KeyStartsWith(k string, beginning ...string) *Route { r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { values := data.Message(payload).Get(k) + if values == nil { + return false + } if len(values) < len(beginning) { return false } @@ -117,6 +125,7 @@ func (r *Route) KeyStartsWith(k string, beginning ...string) *Route { return r } + func (r *Route) KeyEquals(k string, full...string) *Route { r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { values := data.Message(payload).Get(k) @@ -152,6 +161,13 @@ func (r *Route) NoKey(k string) *Route { return r } +func (r *Route) KeyExists(k string) *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + return data.Message(payload).Get(k) != nil + }) + return r +} + func (r *Route) Passthrough(dst Sender) *Route { r.handler = func(payload []byte, attachment *os.File) error { return dst.Send(payload, attachment)