From 1dbb699e6aea1f389b89d583ba67e39f5b2c17b8 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Wed, 2 Apr 2014 19:18:00 -0700 Subject: [PATCH] beam/examples/beamsh: move builtins to a separate file for readability Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- beam/examples/beamsh/beamsh.go | 367 ++--------------------------- beam/examples/beamsh/builtins.go | 384 +++++++++++++++++++++++++++++++ 2 files changed, 402 insertions(+), 349 deletions(-) create mode 100644 beam/examples/beamsh/builtins.go diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index 957dda6..353ef68 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -11,11 +11,9 @@ import ( "net" "net/url" "os" - "os/exec" "path" "strings" "sync" - "text/template" "flag" ) @@ -252,370 +250,41 @@ func Handlers() (*beam.UnixConn, error) { func GetHandler(name string) Handler { if name == "logger" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if err := os.MkdirAll("logs", 0700); err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - var tasks sync.WaitGroup - defer tasks.Wait() - var n int = 1 - r := beam.NewRouter(out) - r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func (payload []byte, attachment *os.File) error { - tasks.Add(1) - go func(n int) { - defer tasks.Done() - defer attachment.Close() - var streamname string - if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" { - streamname = "stdout" - } else { - streamname = cmd[1] - } - if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 { - streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname) - } - logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700) - if err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - defer logfile.Close() - io.Copy(logfile, attachment) - logfile.Sync() - }(n) - n++ - return nil - }).Tee(out) - if _, err := beam.Copy(r, in); err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - } + return CmdLogger } else if name == "render" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0]) - out.Send(data.Empty().Set("status", "1").Bytes(), nil) - return - } - txt := args[1] - if !strings.HasSuffix(txt, "\n") { - txt += "\n" - } - t := template.Must(template.New("render").Parse(txt)) - for { - payload, attachment, err := in.Receive() - if err != nil { - return - } - msg, err := data.Decode(string(payload)) - if err != nil { - fmt.Fprintf(stderr, "decode error: %v\n") - } - if err := t.Execute(stdout, msg); err != nil { - fmt.Fprintf(stderr, "rendering error: %v\n", err) - out.Send(data.Empty().Set("status", "1").Bytes(), nil) - return - } - if err := out.Send(payload, attachment); err != nil { - return - } - } - } + return CmdRender } else if name == "devnull" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - _, attachment, err := in.Receive() - if err != nil { - return - } - if attachment != nil { - attachment.Close() - } - } - } + return CmdDevnull } else if name == "prompt" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) < 2 { - fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0]) - return - } - if !term.IsTerminal(0) { - fmt.Fprintf(stderr, "can't prompt: no tty available...\n") - return - } - fmt.Printf("%s: ", strings.Join(args[1:], " ")) - oldState, _ := term.SaveState(0) - term.DisableEcho(0, oldState) - line, _, err := bufio.NewReader(os.Stdin).ReadLine() - if err != nil { - fmt.Fprintln(stderr, err.Error()) - return - } - val := string(line) - fmt.Printf("\n") - term.RestoreTerminal(0, oldState) - out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil) - } + return CmdPrompt } else if name == "stdio" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - var tasks sync.WaitGroup - defer tasks.Wait() - - r := beam.NewRouter(out) - r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { - tasks.Add(1) - go func() { - defer tasks.Done() - defer attachment.Close() - io.Copy(os.Stdout, attachment) - attachment.Close() - }() - return nil - }).Tee(out) - - if _, err := beam.Copy(r, in); err != nil { - Fatal(err) - fmt.Fprintf(stderr, "%v\n", err) - return - } - } + return CmdStdio } else if name == "echo" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - fmt.Fprintln(stdout, strings.Join(args[1:], " ")) - } + return CmdEcho } else if name == "pass" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - payload, attachment, err := in.Receive() - if err != nil { - return - } - if err := out.Send(payload, attachment); err != nil { - if attachment != nil { - attachment.Close() - } - return - } - } - } + return CmdPass } else if name == "in" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - os.Chdir(args[1]) - GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out) - } + return CmdIn } else if name == "exec" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - cmd := exec.Command(args[1], args[2:]...) - cmd.Stdout = stdout - cmd.Stderr = stderr - cmd.Stdin = os.Stdin - execErr := cmd.Run() - var status string - if execErr != nil { - status = execErr.Error() - } else { - status = "ok" - } - out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil) - } + return CmdExec } else if name == "trace" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - p, a, err := in.Receive() - if err != nil { - return - } - var msg string - if pretty := data.Message(string(p)).Pretty(); pretty != "" { - msg = pretty - } else { - msg = string(p) - } - if a != nil { - msg = fmt.Sprintf("%s [%d]", msg, a.Fd()) - } - fmt.Printf("===> %s\n", msg) - out.Send(p, a) - } - } + return CmdTrace } else if name == "emit" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - out.Send(data.Parse(args[1:]).Bytes(), nil) - } + return CmdEmit } else if name == "print" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - payload, a, err := in.Receive() - if err != nil { - return - } - // Skip commands - if a != nil && data.Message(payload).Get("cmd") == nil { - dup, err := beam.SendPipe(out, payload) - if err != nil { - a.Close() - return - } - io.Copy(io.MultiWriter(os.Stdout, dup), a) - dup.Close() - } else { - if err := out.Send(payload, a); err != nil { - return - } - } - } - } + return CmdPrint } else if name == "multiprint" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - var tasks sync.WaitGroup - for { - payload, a, err := in.Receive() - if err != nil { - return - } - if a != nil { - tasks.Add(1) - go func(payload []byte, attachment *os.File) { - defer tasks.Done() - msg := data.Message(string(payload)) - input := bufio.NewScanner(attachment) - for input.Scan() { - fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text()) - } - }(payload, a) - } - } - tasks.Wait() - } + return CmdMultiprint } else if name == "listen" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) - return - } - u, err := url.Parse(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - l, err := net.Listen(u.Scheme, u.Host) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - for { - conn, err := l.Accept() - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - f, err := connToFile(conn) - if err != nil { - conn.Close() - continue - } - out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f) - } - } + return CmdListen } else if name == "beamsend" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) < 2 { - if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { - Fatal(err) - } - return - } - var connector func(string) (chan net.Conn, error) - connector = dialer - connections, err := connector(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - // Copy in to conn - SendToConn(connections, in) - } + return CmdBeamsend } else if name == "beamreceive" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { - Fatal(err) - } - return - } - var connector func(string) (chan net.Conn, error) - connector = listener - connections, err := connector(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - // Copy in to conn - ReceiveFromConn(connections, out) - } + return CmdBeamreceive } else if name == "connect" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) - return - } - u, err := url.Parse(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - var tasks sync.WaitGroup - for { - _, attachment, err := in.Receive() - if err != nil { - break - } - if attachment == nil { - continue - } - Logf("connecting to %s/%s\n", u.Scheme, u.Host) - conn, err := net.Dial(u.Scheme, u.Host) - if err != nil { - out.Send(data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil) - return - } - out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil) - tasks.Add(1) - go func(attachment *os.File, conn net.Conn) { - defer tasks.Done() - // even when successful, conn.File() returns a duplicate, - // so we must close the original - var iotasks sync.WaitGroup - iotasks.Add(2) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - io.Copy(attachment, conn) - }(attachment, conn) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - io.Copy(conn, attachment) - }(attachment, conn) - iotasks.Wait() - conn.Close() - attachment.Close() - }(attachment, conn) - } - tasks.Wait() - } + return CmdConnect } else if name == "openfile" { - return func(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for _, name := range args { - f, err := os.Open(name) - if err != nil { - continue - } - if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil { - f.Close() - } - } - } + return CmdOpenfile } return nil } diff --git a/beam/examples/beamsh/builtins.go b/beam/examples/beamsh/builtins.go new file mode 100644 index 0000000..d85bafd --- /dev/null +++ b/beam/examples/beamsh/builtins.go @@ -0,0 +1,384 @@ +package main + +import ( + "io" + "os/exec" + "github.com/dotcloud/docker/pkg/beam" + "github.com/dotcloud/docker/pkg/beam/data" + "github.com/dotcloud/docker/pkg/term" + "text/template" + "fmt" + "sync" + "os" + "strings" + "path" + "bufio" + "net" + "net/url" +) + + +func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if err := os.MkdirAll("logs", 0700); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + var tasks sync.WaitGroup + defer tasks.Wait() + var n int = 1 + r := beam.NewRouter(out) + r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func (payload []byte, attachment *os.File) error { + tasks.Add(1) + go func(n int) { + defer tasks.Done() + defer attachment.Close() + var streamname string + if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" { + streamname = "stdout" + } else { + streamname = cmd[1] + } + if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 { + streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname) + } + logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700) + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + defer logfile.Close() + io.Copy(logfile, attachment) + logfile.Sync() + }(n) + n++ + return nil + }).Tee(out) + if _, err := beam.Copy(r, in); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + +func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0]) + out.Send(data.Empty().Set("status", "1").Bytes(), nil) + return + } + txt := args[1] + if !strings.HasSuffix(txt, "\n") { + txt += "\n" + } + t := template.Must(template.New("render").Parse(txt)) + for { + payload, attachment, err := in.Receive() + if err != nil { + return + } + msg, err := data.Decode(string(payload)) + if err != nil { + fmt.Fprintf(stderr, "decode error: %v\n") + } + if err := t.Execute(stdout, msg); err != nil { + fmt.Fprintf(stderr, "rendering error: %v\n", err) + out.Send(data.Empty().Set("status", "1").Bytes(), nil) + return + } + if err := out.Send(payload, attachment); err != nil { + return + } + } +} + +func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + _, attachment, err := in.Receive() + if err != nil { + return + } + if attachment != nil { + attachment.Close() + } + } +} + +func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) < 2 { + fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0]) + return + } + if !term.IsTerminal(0) { + fmt.Fprintf(stderr, "can't prompt: no tty available...\n") + return + } + fmt.Printf("%s: ", strings.Join(args[1:], " ")) + oldState, _ := term.SaveState(0) + term.DisableEcho(0, oldState) + line, _, err := bufio.NewReader(os.Stdin).ReadLine() + if err != nil { + fmt.Fprintln(stderr, err.Error()) + return + } + val := string(line) + fmt.Printf("\n") + term.RestoreTerminal(0, oldState) + out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil) +} + +func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + var tasks sync.WaitGroup + defer tasks.Wait() + + r := beam.NewRouter(out) + r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { + tasks.Add(1) + go func() { + defer tasks.Done() + defer attachment.Close() + io.Copy(os.Stdout, attachment) + attachment.Close() + }() + return nil + }).Tee(out) + + if _, err := beam.Copy(r, in); err != nil { + Fatal(err) + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + +func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + fmt.Fprintln(stdout, strings.Join(args[1:], " ")) +} + +func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + payload, attachment, err := in.Receive() + if err != nil { + return + } + if err := out.Send(payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return + } + } +} + +func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + os.Chdir(args[1]) + GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out) +} + +func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + cmd := exec.Command(args[1], args[2:]...) + cmd.Stdout = stdout + cmd.Stderr = stderr + cmd.Stdin = os.Stdin + execErr := cmd.Run() + var status string + if execErr != nil { + status = execErr.Error() + } else { + status = "ok" + } + out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil) +} + +func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + p, a, err := in.Receive() + if err != nil { + return + } + var msg string + if pretty := data.Message(string(p)).Pretty(); pretty != "" { + msg = pretty + } else { + msg = string(p) + } + if a != nil { + msg = fmt.Sprintf("%s [%d]", msg, a.Fd()) + } + fmt.Printf("===> %s\n", msg) + out.Send(p, a) + } +} + +func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + out.Send(data.Parse(args[1:]).Bytes(), nil) +} + +func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + payload, a, err := in.Receive() + if err != nil { + return + } + // Skip commands + if a != nil && data.Message(payload).Get("cmd") == nil { + dup, err := beam.SendPipe(out, payload) + if err != nil { + a.Close() + return + } + io.Copy(io.MultiWriter(os.Stdout, dup), a) + dup.Close() + } else { + if err := out.Send(payload, a); err != nil { + return + } + } + } +} + +func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + var tasks sync.WaitGroup + for { + payload, a, err := in.Receive() + if err != nil { + return + } + if a != nil { + tasks.Add(1) + go func(payload []byte, attachment *os.File) { + defer tasks.Done() + msg := data.Message(string(payload)) + input := bufio.NewScanner(attachment) + for input.Scan() { + fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text()) + } + }(payload, a) + } + } + tasks.Wait() +} + +func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) + return + } + u, err := url.Parse(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + l, err := net.Listen(u.Scheme, u.Host) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + for { + conn, err := l.Accept() + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + f, err := connToFile(conn) + if err != nil { + conn.Close() + continue + } + out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f) + } +} + +func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) < 2 { + if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { + Fatal(err) + } + return + } + var connector func(string) (chan net.Conn, error) + connector = dialer + connections, err := connector(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + // Copy in to conn + SendToConn(connections, in) +} + +func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { + Fatal(err) + } + return + } + var connector func(string) (chan net.Conn, error) + connector = listener + connections, err := connector(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + // Copy in to conn + ReceiveFromConn(connections, out) +} + +func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) + return + } + u, err := url.Parse(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + var tasks sync.WaitGroup + for { + _, attachment, err := in.Receive() + if err != nil { + break + } + if attachment == nil { + continue + } + Logf("connecting to %s/%s\n", u.Scheme, u.Host) + conn, err := net.Dial(u.Scheme, u.Host) + if err != nil { + out.Send(data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil) + return + } + out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil) + tasks.Add(1) + go func(attachment *os.File, conn net.Conn) { + defer tasks.Done() + // even when successful, conn.File() returns a duplicate, + // so we must close the original + var iotasks sync.WaitGroup + iotasks.Add(2) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + io.Copy(attachment, conn) + }(attachment, conn) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + io.Copy(conn, attachment) + }(attachment, conn) + iotasks.Wait() + conn.Close() + attachment.Close() + }(attachment, conn) + } + tasks.Wait() +} + +func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for _, name := range args { + f, err := os.Open(name) + if err != nil { + continue + } + if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil { + f.Close() + } + } +}