From f295ccc72ac0989e8defa670ebd005ef458fc4ae Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 25 Mar 2014 18:11:06 -0700 Subject: [PATCH] beam/examples/beamsh: basic implementation of the pipeline design, with static handlers for now. In the pipeline design, several beam commands can be run concurrently, with their respective inputs and outputs connected in such a way that beam messages flow from the first to last. This is similar to the way a unix shell executes commands in a pipeline: instead of STDIN and STDOUT, each beam command has a "BEAMIN" and "BEAMOUT". Since beam allows for richer communication than plain byte streams, beam pipelines can express more powerful computation, while retaining the fundamental elegance and ease of use of unix-style composition. Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- beam/examples/beamsh/beamsh.go | 431 +++++++++++++-------------------- 1 file changed, 168 insertions(+), 263 deletions(-) diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index 207bc3e..d4f9ac8 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -4,7 +4,6 @@ import ( "io" "fmt" "os" - "os/exec" "github.com/dotcloud/docker/pkg/dockerscript" "github.com/dotcloud/docker/pkg/beam" "github.com/dotcloud/docker/pkg/beam/data" @@ -14,20 +13,16 @@ import ( "net" "path" "bufio" - "strconv" + "crypto/rand" + "encoding/hex" ) func main() { - client, engine, err := beam.USocketPair() + devnull, err := Devnull() if err != nil { Fatal(err) } - defer client.Close() - go func() { - Serve(engine, builtinsHandler) - Debugf("Shutting down engine\n") - engine.Close() - }() + defer devnull.Close() if term.IsTerminal(0) { input := bufio.NewScanner(os.Stdin) for { @@ -42,7 +37,7 @@ func main() { fmt.Fprintf(os.Stderr, "error: %v\n", err) continue } - executeScript(client, cmd) + executeScript(devnull, cmd) } if err := input.Err(); err == io.EOF { break @@ -55,84 +50,188 @@ func main() { if err != nil { Fatal("parse error: %v\n", err) } - executeScript(client, script) + executeScript(devnull, script) } } -func executeScript(client *net.UnixConn, script []*dockerscript.Command) { - Debugf("%d commands:\n", len(script)) - for _, cmd := range script { - job, err := beam.SendPipe(client, data.Empty().Set("cmd", cmd.Args...).Bytes()) - if err != nil { - Fatal(err) +func beamCopy(dst *net.UnixConn, src *net.UnixConn) error { + for { + payload, attachment, err := beam.Receive(src) + if err == io.EOF { + return nil + } else if err != nil { + return err + } + if err := beam.Send(dst, payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return err } - // Recursively execute child-commands as commands to the new job - // executeScript blocks until commands are done, so this is depth-first recursion. - executeScript(job, cmd.Children) - // TODO: pass a default handler to deal with 'status' - // --> use beam chaining? - Debugf("[%s] Listening for reply messages\n", strings.Join(cmd.Args, " ")) - Serve(job, builtinsHandler) - Debugf("[%s] done listening for reply messages\n", strings.Join(cmd.Args, " ")) } + panic("impossibru!") + return nil } -func parseMsgPayload(payload []byte) ([]string, error) { - msg, err := data.Decode(string(payload)) +type Handler func([]string, *net.UnixConn, *net.UnixConn) + +func Devnull() (*net.UnixConn, error) { + priv, pub, err := beam.USocketPair() if err != nil { return nil, err } - var cmd []string - if c, exists := msg["cmd"]; exists { - cmd = c - } - if len(cmd) == 0 { - return nil, fmt.Errorf("empty command") - } - return cmd, nil -} - -func CmdCat(args []string, f *os.File) { - for _, name := range args[1:] { - f, err := os.Open(name) - if err != nil { - continue + go func() { + defer priv.Close() + for { + payload, attachment, err := beam.Receive(priv) + if err != nil { + return + } + fmt.Fprintf(os.Stderr, "[devnull] discarding '%s'\n", payload) + if attachment != nil { + attachment.Close() + } } - io.Copy(os.Stdout, f) - f.Close() - } + }() + return pub, nil } -func CmdEcho(args []string, f *os.File) { - resp, err := beam.FdConn(int(f.Fd())) - if err != nil { - Fatal(err) - return +func scriptString(script []*dockerscript.Command) string { + lines := make([]string, 0, len(script)) + for _, cmd := range script { + line := strings.Join(cmd.Args, " ") + if len(cmd.Children) > 0 { + line += fmt.Sprintf(" { %s }", scriptString(cmd.Children)) + } else { + line += " {}" + } + lines = append(lines, line) } - defer resp.Close() - r, w, err := os.Pipe() - if err != nil { - return - } - Debugf("[CmdEcho] stdout pipe() r=%d w=%d\n", r.Fd(), w.Fd()) - if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stdout").Bytes(), r); err != nil { - return - } - fmt.Fprintln(w, strings.Join(args[1:], " ")) - w.Close() + return fmt.Sprintf("'%s'", strings.Join(lines, "; ")) } -func CmdExit(args []string, f *os.File) { - var status int - if len(args) > 1 { - val, err := strconv.ParseInt(args[1], 10, 32) - if err == nil { - status = int(val) +func executeScript(client *net.UnixConn, script []*dockerscript.Command) error { + Debugf("executeScript(%s)\n", scriptString(script)) + defer Debugf("executeScript(%s) DONE\n", scriptString(script)) + for _, cmd := range script { + if err := executeCommand(client, cmd); err != nil { + return err } } - os.Exit(status) + return nil } +// 1) Find a handler for the command (if no handler, fail) +// 2) Attach new in & out pair to the handler +// 3) [in the background] Copy handler output to our own output +// 4) [in the background] Run the handler +// 5) Recursively executeScript() all children commands and wait for them to complete +// 6) Wait for handler to return and (shortly afterwards) output copy to complete +// 7) +func executeCommand(client *net.UnixConn, cmd *dockerscript.Command) error { + Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " ")) + defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " ")) + handler := GetHandler(cmd.Args[0]) + if handler == nil { + return fmt.Errorf("no such command: %s", cmd.Args[0]) + } + inPub, inPriv, err := beam.USocketPair() + if err != nil { + return err + } + // Don't close inPub here. We close it to signify the end of input once + // all children are completed (guaranteeing that no more input will be sent + // by children). + // Otherwise we get a deadlock. + defer inPriv.Close() + outPub, outPriv, err := beam.USocketPair() + if err != nil { + return err + } + defer outPub.Close() + // don't close outPriv here. It must be closed after the handler is called, + // but before the copy tasks associated with it completes. + // Otherwise we get a deadlock. + var tasks sync.WaitGroup + tasks.Add(2) + go func() { + handler(cmd.Args, inPriv, outPriv) + // FIXME: do we need to outPriv.sync before closing it? + Debugf("[%s] handler returned, closing output\n", strings.Join(cmd.Args, " ")) + outPriv.Close() + tasks.Done() + }() + go func() { + Debugf("[%s] copy start...\n", strings.Join(cmd.Args, " ")) + beamCopy(client, outPub) + Debugf("[%s] copy done\n", strings.Join(cmd.Args, " ")) + tasks.Done() + }() + // depth-first execution of children commands + // executeScript() blocks until all commands are completed + executeScript(inPub, cmd.Children) + inPub.Close() + Debugf("[%s] waiting for handler and output copy to complete...\n", strings.Join(cmd.Args, " ")) + tasks.Wait() + Debugf("[%s] handler and output copy complete!\n", strings.Join(cmd.Args, " ")) + return nil +} + +func randomId() string { + id := make([]byte, 4) + io.ReadFull(rand.Reader, id) + return hex.EncodeToString(id) +} + +func GetHandler(name string) Handler { + if name == "trace" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + for { + p, a, err := beam.Receive(in) + if err != nil { + return + } + fd := -1 + if a != nil { + fd = int(a.Fd()) + } + fmt.Printf("===> [TRACE] %s [%d]\n", p, fd) + beam.Send(out, p, a) + } + } + } else if name == "emit" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + beam.Send(out, data.Empty().Set("foo", args[1:]...).Bytes(), nil) + } + } else if name == "print" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + for { + _, a, err := beam.Receive(in) + if err != nil { + return + } + if a != nil { + io.Copy(os.Stdout, a) + } + } + } + } else if name == "openfile" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + for _, name := range args { + f, err := os.Open(name) + if err != nil { + continue + } + if err := beam.Send(out, data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil { + f.Close() + } + } + } + } + return nil +} + + // 'status' is a notification of a job's status. // func parseEnv(args []string) ([]string, map[string]string) { @@ -154,205 +253,11 @@ func parseEnv(args []string) ([]string, map[string]string) { return argsOut, env } -func CmdTrace(args []string, f *os.File) { - resp, err := beam.FdConn(int(f.Fd())) - if err != nil { - Fatal(err) - return - } - defer resp.Close() - for { - payload, attachment, err := beam.Receive(resp) - if err != nil { - Logf("[trace] error waiting for message\n") - return - } - msg, err := data.Decode(string(payload)) - if err != nil { - fmt.Printf("===> %s\n", payload) - } else { - fmt.Printf("===> %v\n", msg) - } - if err := beam.Send(resp, payload, attachment); err != nil { - return - } - } -} - - -func CmdExec(args []string, f *os.File) { - resp, err := beam.FdConn(int(f.Fd())) - if err != nil { - Fatal(err) - return - } - defer resp.Close() - cmd := exec.Command(args[1], args[2:]...) - Logf("EXEC %s %s\n", cmd.Path, cmd.Args) - stdoutR, stdoutW, err := os.Pipe() - if err != nil { - Fatal(err) - return - } - cmd.Stdout = stdoutW - stderrR, stderrW, err := os.Pipe() - if err != nil { - Fatal(err) - return - } - cmd.Stderr = stderrW - if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stdout").Bytes(), stdoutR); err != nil { - Fatal(err) - } - if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stderr").Bytes(), stderrR); err != nil { - Fatal(err) - } - cmd.Run() -} - -func CmdLog(args []string, f *os.File) { - defer Debugf("CmdLog done\n") - var name string - if len(args) > 0 { - name = args[1] - } - input := bufio.NewScanner(f) - for input.Scan() { - line := input.Text() - if len(line) > 0 { - fmt.Printf("[%s] %s\n", name, line) - } - if err := input.Err(); err != nil { - fmt.Printf("[%s:%s]\n", name, err) - break - } - } -} - type Msg struct { payload []byte attachment *os.File } -func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error { - Debugf("[Serve %#v]\n", handler) - defer Debugf("[Serve %#v] done\n", handler) - var tasks sync.WaitGroup - defer tasks.Wait() - in := make(chan *Msg) - go func() { - for { - Debugf("[Serve] waiting for next message on endpoint...\n") - payload, attachment, err := beam.Receive(endpoint) - if err != nil { - break - } - in<-&Msg{payload, attachment} - } - Debugf("[Serve] endpoint closed. Waiting for tasks to complete\n") - tasks.Wait() - Debugf("[Serve] endpoint closed AND tasks complete\n") - close(in) - }() - for msg := range in { - tasks.Add(1) - // Handle new message - go func(payload []byte, attachment *os.File) { - Debugf("---> Handling '%s' [fd=%d]\n", payload, attachment.Fd()) - defer tasks.Done() - args, err := parseMsgPayload(payload) - if err != nil { - Logf("error parsing beam message: %s\n", err) - if attachment != nil { - attachment.Close() - } - return - } - Debugf("---> calling handler for '%s'\n", args[0]) - handlerAttachment := attachment - var iotasks sync.WaitGroup - if attachment != nil { - if caller, err := beam.FdConn(int(attachment.Fd())); err == nil { - Debugf("[Serve] message '%s' has a valid beam endpoint as attachment. Setting up indirection\n", payload) - defer caller.Close() - jobpub, jobpriv, err := beam.USocketPair() - if err != nil { - return - } - defer jobpub.Close() - if f, err := jobpriv.File(); err != nil { - jobpriv.Close() - return - } else { - handlerAttachment = f - defer attachment.Close() - } - jobpriv.Close() - // Read messages from the job and re-insert them for handling - iotasks.Add(1) - go func(job *net.UnixConn) { - defer iotasks.Done() - for { - payload, attachment, err := beam.Receive(job) - if err != nil { - return - } - var fd int = -1 - if attachment != nil { - fd = int(attachment.Fd()) - } - Debugf("[Serve] received introspection message '%s'[%d]\n", payload, fd) - // Send messages back in for introspection - // Note that we don't scope introspection: jobs have full access to the - // context in which they were called. - in <-&Msg{payload, attachment} - } - }(jobpub) - // Read messages from the caller to the job - go func(caller *net.UnixConn, job *net.UnixConn) { - for { - payload, f, err := beam.Receive(caller) - if err != nil { - return - } - if err := beam.Send(job, payload, f); err != nil { - return - } - } - }(caller, jobpub) - } - } - handler(args, handlerAttachment) - Debugf("---> handler returned for '%s'\n", args[0]) - if handlerAttachment != nil { - handlerAttachment.Close() - } - Debugf("---> waiting for iotasks to complete for '%s'\n", args[0]) - iotasks.Wait() - Debugf("---> iotasks complete for '%s'\n", args[0]) - }(msg.payload, msg.attachment) - } - Debugf("[Serve] main serve loop completed\n") - return nil -} - -func builtinsHandler(args []string, attachment *os.File) { - if args[0] == "exit" { - CmdExit(args, attachment) - } else if args[0] == "cat" { - CmdCat(args, attachment) - } else if args[0] == "echo" { - CmdEcho(args, attachment) - } else if args[0] == "log" { - CmdLog(args, attachment) - } else if args[0] == "trace" { - CmdTrace(args, attachment) - } else if args[0] == "exec" { - CmdExec(args, attachment) - } -} - - func Logf(msg string, args ...interface{}) (int, error) { if len(msg) == 0 || msg[len(msg) - 1] != '\n' { msg = msg + "\n" @@ -362,7 +267,7 @@ func Logf(msg string, args ...interface{}) (int, error) { } func Debugf(msg string, args ...interface{}) { - if os.Getenv("DEBUG") != "" { + if os.Getenv("BEAMDEBUG") != "" { Logf(msg, args...) } }