diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index 207bc3e..d4f9ac8 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -4,7 +4,6 @@ import ( "io" "fmt" "os" - "os/exec" "github.com/dotcloud/docker/pkg/dockerscript" "github.com/dotcloud/docker/pkg/beam" "github.com/dotcloud/docker/pkg/beam/data" @@ -14,20 +13,16 @@ import ( "net" "path" "bufio" - "strconv" + "crypto/rand" + "encoding/hex" ) func main() { - client, engine, err := beam.USocketPair() + devnull, err := Devnull() if err != nil { Fatal(err) } - defer client.Close() - go func() { - Serve(engine, builtinsHandler) - Debugf("Shutting down engine\n") - engine.Close() - }() + defer devnull.Close() if term.IsTerminal(0) { input := bufio.NewScanner(os.Stdin) for { @@ -42,7 +37,7 @@ func main() { fmt.Fprintf(os.Stderr, "error: %v\n", err) continue } - executeScript(client, cmd) + executeScript(devnull, cmd) } if err := input.Err(); err == io.EOF { break @@ -55,84 +50,188 @@ func main() { if err != nil { Fatal("parse error: %v\n", err) } - executeScript(client, script) + executeScript(devnull, script) } } -func executeScript(client *net.UnixConn, script []*dockerscript.Command) { - Debugf("%d commands:\n", len(script)) - for _, cmd := range script { - job, err := beam.SendPipe(client, data.Empty().Set("cmd", cmd.Args...).Bytes()) - if err != nil { - Fatal(err) +func beamCopy(dst *net.UnixConn, src *net.UnixConn) error { + for { + payload, attachment, err := beam.Receive(src) + if err == io.EOF { + return nil + } else if err != nil { + return err + } + if err := beam.Send(dst, payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return err } - // Recursively execute child-commands as commands to the new job - // executeScript blocks until commands are done, so this is depth-first recursion. - executeScript(job, cmd.Children) - // TODO: pass a default handler to deal with 'status' - // --> use beam chaining? - Debugf("[%s] Listening for reply messages\n", strings.Join(cmd.Args, " ")) - Serve(job, builtinsHandler) - Debugf("[%s] done listening for reply messages\n", strings.Join(cmd.Args, " ")) } + panic("impossibru!") + return nil } -func parseMsgPayload(payload []byte) ([]string, error) { - msg, err := data.Decode(string(payload)) +type Handler func([]string, *net.UnixConn, *net.UnixConn) + +func Devnull() (*net.UnixConn, error) { + priv, pub, err := beam.USocketPair() if err != nil { return nil, err } - var cmd []string - if c, exists := msg["cmd"]; exists { - cmd = c - } - if len(cmd) == 0 { - return nil, fmt.Errorf("empty command") - } - return cmd, nil -} - -func CmdCat(args []string, f *os.File) { - for _, name := range args[1:] { - f, err := os.Open(name) - if err != nil { - continue + go func() { + defer priv.Close() + for { + payload, attachment, err := beam.Receive(priv) + if err != nil { + return + } + fmt.Fprintf(os.Stderr, "[devnull] discarding '%s'\n", payload) + if attachment != nil { + attachment.Close() + } } - io.Copy(os.Stdout, f) - f.Close() - } + }() + return pub, nil } -func CmdEcho(args []string, f *os.File) { - resp, err := beam.FdConn(int(f.Fd())) - if err != nil { - Fatal(err) - return +func scriptString(script []*dockerscript.Command) string { + lines := make([]string, 0, len(script)) + for _, cmd := range script { + line := strings.Join(cmd.Args, " ") + if len(cmd.Children) > 0 { + line += fmt.Sprintf(" { %s }", scriptString(cmd.Children)) + } else { + line += " {}" + } + lines = append(lines, line) } - defer resp.Close() - r, w, err := os.Pipe() - if err != nil { - return - } - Debugf("[CmdEcho] stdout pipe() r=%d w=%d\n", r.Fd(), w.Fd()) - if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stdout").Bytes(), r); err != nil { - return - } - fmt.Fprintln(w, strings.Join(args[1:], " ")) - w.Close() + return fmt.Sprintf("'%s'", strings.Join(lines, "; ")) } -func CmdExit(args []string, f *os.File) { - var status int - if len(args) > 1 { - val, err := strconv.ParseInt(args[1], 10, 32) - if err == nil { - status = int(val) +func executeScript(client *net.UnixConn, script []*dockerscript.Command) error { + Debugf("executeScript(%s)\n", scriptString(script)) + defer Debugf("executeScript(%s) DONE\n", scriptString(script)) + for _, cmd := range script { + if err := executeCommand(client, cmd); err != nil { + return err } } - os.Exit(status) + return nil } +// 1) Find a handler for the command (if no handler, fail) +// 2) Attach new in & out pair to the handler +// 3) [in the background] Copy handler output to our own output +// 4) [in the background] Run the handler +// 5) Recursively executeScript() all children commands and wait for them to complete +// 6) Wait for handler to return and (shortly afterwards) output copy to complete +// 7) +func executeCommand(client *net.UnixConn, cmd *dockerscript.Command) error { + Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " ")) + defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " ")) + handler := GetHandler(cmd.Args[0]) + if handler == nil { + return fmt.Errorf("no such command: %s", cmd.Args[0]) + } + inPub, inPriv, err := beam.USocketPair() + if err != nil { + return err + } + // Don't close inPub here. We close it to signify the end of input once + // all children are completed (guaranteeing that no more input will be sent + // by children). + // Otherwise we get a deadlock. + defer inPriv.Close() + outPub, outPriv, err := beam.USocketPair() + if err != nil { + return err + } + defer outPub.Close() + // don't close outPriv here. It must be closed after the handler is called, + // but before the copy tasks associated with it completes. + // Otherwise we get a deadlock. + var tasks sync.WaitGroup + tasks.Add(2) + go func() { + handler(cmd.Args, inPriv, outPriv) + // FIXME: do we need to outPriv.sync before closing it? + Debugf("[%s] handler returned, closing output\n", strings.Join(cmd.Args, " ")) + outPriv.Close() + tasks.Done() + }() + go func() { + Debugf("[%s] copy start...\n", strings.Join(cmd.Args, " ")) + beamCopy(client, outPub) + Debugf("[%s] copy done\n", strings.Join(cmd.Args, " ")) + tasks.Done() + }() + // depth-first execution of children commands + // executeScript() blocks until all commands are completed + executeScript(inPub, cmd.Children) + inPub.Close() + Debugf("[%s] waiting for handler and output copy to complete...\n", strings.Join(cmd.Args, " ")) + tasks.Wait() + Debugf("[%s] handler and output copy complete!\n", strings.Join(cmd.Args, " ")) + return nil +} + +func randomId() string { + id := make([]byte, 4) + io.ReadFull(rand.Reader, id) + return hex.EncodeToString(id) +} + +func GetHandler(name string) Handler { + if name == "trace" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + for { + p, a, err := beam.Receive(in) + if err != nil { + return + } + fd := -1 + if a != nil { + fd = int(a.Fd()) + } + fmt.Printf("===> [TRACE] %s [%d]\n", p, fd) + beam.Send(out, p, a) + } + } + } else if name == "emit" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + beam.Send(out, data.Empty().Set("foo", args[1:]...).Bytes(), nil) + } + } else if name == "print" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + for { + _, a, err := beam.Receive(in) + if err != nil { + return + } + if a != nil { + io.Copy(os.Stdout, a) + } + } + } + } else if name == "openfile" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + for _, name := range args { + f, err := os.Open(name) + if err != nil { + continue + } + if err := beam.Send(out, data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil { + f.Close() + } + } + } + } + return nil +} + + // 'status' is a notification of a job's status. // func parseEnv(args []string) ([]string, map[string]string) { @@ -154,205 +253,11 @@ func parseEnv(args []string) ([]string, map[string]string) { return argsOut, env } -func CmdTrace(args []string, f *os.File) { - resp, err := beam.FdConn(int(f.Fd())) - if err != nil { - Fatal(err) - return - } - defer resp.Close() - for { - payload, attachment, err := beam.Receive(resp) - if err != nil { - Logf("[trace] error waiting for message\n") - return - } - msg, err := data.Decode(string(payload)) - if err != nil { - fmt.Printf("===> %s\n", payload) - } else { - fmt.Printf("===> %v\n", msg) - } - if err := beam.Send(resp, payload, attachment); err != nil { - return - } - } -} - - -func CmdExec(args []string, f *os.File) { - resp, err := beam.FdConn(int(f.Fd())) - if err != nil { - Fatal(err) - return - } - defer resp.Close() - cmd := exec.Command(args[1], args[2:]...) - Logf("EXEC %s %s\n", cmd.Path, cmd.Args) - stdoutR, stdoutW, err := os.Pipe() - if err != nil { - Fatal(err) - return - } - cmd.Stdout = stdoutW - stderrR, stderrW, err := os.Pipe() - if err != nil { - Fatal(err) - return - } - cmd.Stderr = stderrW - if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stdout").Bytes(), stdoutR); err != nil { - Fatal(err) - } - if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stderr").Bytes(), stderrR); err != nil { - Fatal(err) - } - cmd.Run() -} - -func CmdLog(args []string, f *os.File) { - defer Debugf("CmdLog done\n") - var name string - if len(args) > 0 { - name = args[1] - } - input := bufio.NewScanner(f) - for input.Scan() { - line := input.Text() - if len(line) > 0 { - fmt.Printf("[%s] %s\n", name, line) - } - if err := input.Err(); err != nil { - fmt.Printf("[%s:%s]\n", name, err) - break - } - } -} - type Msg struct { payload []byte attachment *os.File } -func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error { - Debugf("[Serve %#v]\n", handler) - defer Debugf("[Serve %#v] done\n", handler) - var tasks sync.WaitGroup - defer tasks.Wait() - in := make(chan *Msg) - go func() { - for { - Debugf("[Serve] waiting for next message on endpoint...\n") - payload, attachment, err := beam.Receive(endpoint) - if err != nil { - break - } - in<-&Msg{payload, attachment} - } - Debugf("[Serve] endpoint closed. Waiting for tasks to complete\n") - tasks.Wait() - Debugf("[Serve] endpoint closed AND tasks complete\n") - close(in) - }() - for msg := range in { - tasks.Add(1) - // Handle new message - go func(payload []byte, attachment *os.File) { - Debugf("---> Handling '%s' [fd=%d]\n", payload, attachment.Fd()) - defer tasks.Done() - args, err := parseMsgPayload(payload) - if err != nil { - Logf("error parsing beam message: %s\n", err) - if attachment != nil { - attachment.Close() - } - return - } - Debugf("---> calling handler for '%s'\n", args[0]) - handlerAttachment := attachment - var iotasks sync.WaitGroup - if attachment != nil { - if caller, err := beam.FdConn(int(attachment.Fd())); err == nil { - Debugf("[Serve] message '%s' has a valid beam endpoint as attachment. Setting up indirection\n", payload) - defer caller.Close() - jobpub, jobpriv, err := beam.USocketPair() - if err != nil { - return - } - defer jobpub.Close() - if f, err := jobpriv.File(); err != nil { - jobpriv.Close() - return - } else { - handlerAttachment = f - defer attachment.Close() - } - jobpriv.Close() - // Read messages from the job and re-insert them for handling - iotasks.Add(1) - go func(job *net.UnixConn) { - defer iotasks.Done() - for { - payload, attachment, err := beam.Receive(job) - if err != nil { - return - } - var fd int = -1 - if attachment != nil { - fd = int(attachment.Fd()) - } - Debugf("[Serve] received introspection message '%s'[%d]\n", payload, fd) - // Send messages back in for introspection - // Note that we don't scope introspection: jobs have full access to the - // context in which they were called. - in <-&Msg{payload, attachment} - } - }(jobpub) - // Read messages from the caller to the job - go func(caller *net.UnixConn, job *net.UnixConn) { - for { - payload, f, err := beam.Receive(caller) - if err != nil { - return - } - if err := beam.Send(job, payload, f); err != nil { - return - } - } - }(caller, jobpub) - } - } - handler(args, handlerAttachment) - Debugf("---> handler returned for '%s'\n", args[0]) - if handlerAttachment != nil { - handlerAttachment.Close() - } - Debugf("---> waiting for iotasks to complete for '%s'\n", args[0]) - iotasks.Wait() - Debugf("---> iotasks complete for '%s'\n", args[0]) - }(msg.payload, msg.attachment) - } - Debugf("[Serve] main serve loop completed\n") - return nil -} - -func builtinsHandler(args []string, attachment *os.File) { - if args[0] == "exit" { - CmdExit(args, attachment) - } else if args[0] == "cat" { - CmdCat(args, attachment) - } else if args[0] == "echo" { - CmdEcho(args, attachment) - } else if args[0] == "log" { - CmdLog(args, attachment) - } else if args[0] == "trace" { - CmdTrace(args, attachment) - } else if args[0] == "exec" { - CmdExec(args, attachment) - } -} - - func Logf(msg string, args ...interface{}) (int, error) { if len(msg) == 0 || msg[len(msg) - 1] != '\n' { msg = msg + "\n" @@ -362,7 +267,7 @@ func Logf(msg string, args ...interface{}) (int, error) { } func Debugf(msg string, args ...interface{}) { - if os.Getenv("DEBUG") != "" { + if os.Getenv("BEAMDEBUG") != "" { Logf(msg, args...) } }