From 572952d6cecd92cdfdca8bebf1d41642377741b1 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Sun, 23 Mar 2014 20:46:31 -0700 Subject: [PATCH] beam/examples/beamsh: more bells and whistles for demos * Automatically switch to interactive mode when stdin is a terminal * Basic implementation of "responses" Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- beam/examples/beamsh/beamsh.go | 116 ++++++++++++++++++++++++++------- 1 file changed, 91 insertions(+), 25 deletions(-) diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index 6f88914..6f149ce 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -6,6 +6,7 @@ import ( "os" "github.com/dotcloud/docker/pkg/dockerscript" "github.com/dotcloud/docker/pkg/beam" + "github.com/dotcloud/docker/pkg/term" "strings" "sync" "net" @@ -16,28 +17,61 @@ import ( ) func main() { - script, err := dockerscript.Parse(os.Stdin) - if err != nil { - Fatal("parse error: %v\n", err) - } - Logf("%d commands:\n", len(script)) + Logf("Initializing engine\n") client, engine, err := beam.USocketPair() if err != nil { Fatal(err) } + defer client.Close() go func() { - Serve(engine) + Serve(engine, builtinsHandler) + Logf("Shutting down engine\n") engine.Close() }() + if term.IsTerminal(0) { + input := bufio.NewScanner(os.Stdin) + for { + os.Stdout.Write([]byte("beamsh> ")) + if !input.Scan() { + break + } + line := input.Text() + if len(line) != 0 { + cmd, err := dockerscript.Parse(strings.NewReader(line)) + if err != nil { + Logf("error: %v\n", err) + continue + } + executeScript(client, cmd) + } + if err := input.Err(); err == io.EOF { + break + } else if err != nil { + Fatal(err) + } + } + } else { + script, err := dockerscript.Parse(os.Stdin) + if err != nil { + Fatal("parse error: %v\n", err) + } + executeScript(client, script) + } +} + +func executeScript(client *net.UnixConn, script []*dockerscript.Command) { + Logf("%d commands:\n", len(script)) for _, cmd := range script { job, err := beam.SendPipe(client, []byte(strings.Join(cmd.Args, " "))) if err != nil { Fatal(err) } - Serve(job) + // TODO: pass a default handler to deal with 'status' + // --> use beam chaining? + Logf("Listening for reply messages\n") + Serve(job, builtinsHandler) Logf("[%s] done\n", strings.Join(cmd.Args, " ")) } - client.Close() } func parseMsgPayload(payload []byte) ([]string, error) { @@ -75,11 +109,11 @@ func CmdEcho(args []string, f *os.File) { if err != nil { return } + Logf("[CmdEcho] stdout pipe() r=%d w=%d\n", r.Fd(), w.Fd()) if err := beam.Send(resp, []byte("log stdout"), r); err != nil { return } fmt.Fprintln(w, strings.Join(args[1:], " ")) - fmt.Printf("waiting 5 seconds...\n") w.Close() } @@ -94,7 +128,29 @@ func CmdExit(args []string, f *os.File) { os.Exit(status) } +// 'status' is a notification of a job's status. +// +func parseEnv(args []string) ([]string, map[string]string) { + var argsOut []string + env := make(map[string]string) + for _, word := range args[1:] { + if strings.Contains(word, "=") { + kv := strings.SplitN(word, "=", 2) + key := kv[0] + var val string + if len(kv) == 2 { + val = kv[1] + } + env[key] = val + } else { + argsOut = append(argsOut, word) + } + } + return argsOut, env +} + func CmdLog(args []string, f *os.File) { + defer Logf("CmdLog done\n") name := args[1] input := bufio.NewScanner(f) for input.Scan() { @@ -109,23 +165,28 @@ func CmdLog(args []string, f *os.File) { } } -func Serve(endpoint *net.UnixConn) error { - Logf("[Serve]\n") - defer Logf("[Serve] done\n") +func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error { + Logf("[Serve %#v]\n", handler) + defer Logf("[Serve %#v] done\n", handler) var tasks sync.WaitGroup defer tasks.Wait() for { - Logf("[Serve] waiting for next message...\n") payload, attachment, err := beam.Receive(endpoint) if err != nil { return err } tasks.Add(1) + // Handle new message go func(payload []byte, attachment *os.File) { + Logf("---> Handling '%s' [fd=%d]\n", payload, attachment.Fd()) defer tasks.Done() if attachment != nil { - defer fmt.Printf("closing request attachment %d\n", attachment.Fd()) - attachment.Close() + defer func() { + Logf("---> Closing attachment [fd=%d] for msg '%s'\n", attachment.Fd(), payload) + attachment.Close() + }() + } else { + defer Logf("---> No attachment to close for msg '%s'\n", payload) } args, err := parseMsgPayload(payload) if err != nil { @@ -135,21 +196,26 @@ func Serve(endpoint *net.UnixConn) error { } return } - Logf("beam message: %v\n", args) - if args[0] == "exit" { - CmdExit(args, attachment) - } else if args[0] == "cat" { - CmdCat(args, attachment) - } else if args[0] == "echo" { - CmdEcho(args, attachment) - } else if args[0] == "log" { - CmdLog(args, attachment) - } + Logf("---> calling handler for '%s'\n", args[0]) + handler(args, attachment) + Logf("---> handler returned for '%s'\n", args[0]) }(payload, attachment) } return nil } +func builtinsHandler(args []string, attachment *os.File) { + if args[0] == "exit" { + CmdExit(args, attachment) + } else if args[0] == "cat" { + CmdCat(args, attachment) + } else if args[0] == "echo" { + CmdEcho(args, attachment) + } else if args[0] == "log" { + CmdLog(args, attachment) + } +} + func Logf(msg string, args ...interface{}) (int, error) { if len(msg) == 0 || msg[len(msg) - 1] != '\n' {