diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index 7512b36..d83fbe7 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -11,6 +11,8 @@ import ( "net" "bytes" "path" + "bufio" + "strconv" ) func main() { @@ -32,20 +34,25 @@ func main() { if err != nil { Fatal(err) } - // Synchronize on the job handler - for { - _, _, err := beam.Receive(job) - if err == io.EOF { - break - } else if err != nil { - Fatalf("error reading from job handler: %#v\n", err) - } - } + Serve(job) Logf("[%s] done\n", strings.Join(cmd.Args, " ")) } client.Close() } +func parseMsgPayload(payload []byte) ([]string, error) { + // FIXME: send structured message instead of a text script + cmds, err := dockerscript.Parse(bytes.NewReader(payload)) + if err != nil { + return nil, err + } + if len(cmds) == 0 { + return nil, fmt.Errorf("empty command") + } + // We don't care about multiple commands or children + return cmds[0].Args, nil +} + func CmdCat(args []string, f *os.File) { for _, name := range args[1:] { f, err := os.Open(name) @@ -58,17 +65,55 @@ func CmdCat(args []string, f *os.File) { } func CmdEcho(args []string, f *os.File) { - fmt.Println(strings.Join(args[1:], " ")) + resp, err := beam.FdConn(int(f.Fd())) + if err != nil { + return + } + r, w, err := os.Pipe() + if err != nil { + return + } + if err := beam.Send(resp, []byte("log stdout"), r); err != nil { + return + } + fmt.Fprintln(w, strings.Join(args[1:], " ")) + fmt.Printf("waiting 5 seconds...\n") + w.Close() } -func CmdDie(args []string, f *os.File) { - Fatal(strings.Join(args[1:], " ")) +func CmdExit(args []string, f *os.File) { + var status int + if len(args) > 1 { + val, err := strconv.ParseInt(args[1], 10, 32) + if err == nil { + status = int(val) + } + } + os.Exit(status) +} + +func CmdLog(args []string, f *os.File) { + name := args[1] + input := bufio.NewScanner(f) + for input.Scan() { + line := input.Text() + if len(line) > 0 { + fmt.Printf("[%s] %s\n", name, line) + } + if err := input.Err(); err != nil { + fmt.Printf("[%s:%s]\n", name, err) + break + } + } } func Serve(endpoint *net.UnixConn) error { + Logf("[Serve]\n") + defer Logf("[Serve] done\n") var tasks sync.WaitGroup defer tasks.Wait() for { + Logf("[Serve] waiting for next message...\n") payload, attachment, err := beam.Receive(endpoint) if err != nil { return err @@ -76,13 +121,11 @@ func Serve(endpoint *net.UnixConn) error { tasks.Add(1) go func(payload []byte, attachment *os.File) { defer tasks.Done() - defer func() { - if attachment != nil { - attachment.Close() - } - }() - // FIXME: send structured message instead of a text script - cmds, err := dockerscript.Parse(bytes.NewReader(payload)) + if attachment != nil { + defer fmt.Printf("closing request attachment %d\n", attachment.Fd()) + attachment.Close() + } + args, err := parseMsgPayload(payload) if err != nil { Logf("error parsing beam message: %s\n", err) if attachment != nil { @@ -90,18 +133,15 @@ func Serve(endpoint *net.UnixConn) error { } return } - if len(cmds) == 0 { - Logf("ignoring empty command\n", err) - } - // We don't care about multiple commands or children - args := cmds[0].Args Logf("beam message: %v\n", args) - if args[0] == "die" { - CmdDie(args, attachment) + if args[0] == "exit" { + CmdExit(args, attachment) } else if args[0] == "cat" { CmdCat(args, attachment) } else if args[0] == "echo" { CmdEcho(args, attachment) + } else if args[0] == "log" { + CmdLog(args, attachment) } }(payload, attachment) }