From f7971cfc1e139a4ca098d8a282cf77a776cc5e8a Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Fri, 28 Mar 2014 15:42:57 -0700 Subject: [PATCH] beam/examples/beamsh: move code around for readability Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- beam/examples/beamsh/beamsh.go | 458 ++++++++++++++++----------------- 1 file changed, 221 insertions(+), 237 deletions(-) diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index 2fe8685..cc1b37e 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -2,8 +2,6 @@ package main import ( "bufio" - "crypto/rand" - "encoding/hex" "fmt" "github.com/dotcloud/docker/pkg/beam" "github.com/dotcloud/docker/pkg/beam/data" @@ -35,6 +33,7 @@ func main() { flag.Parse() if flag.NArg() == 0{ if term.IsTerminal(0) { + // No arguments, stdin is terminal --> interactive mode input := bufio.NewScanner(os.Stdin) for { os.Stdout.Write([]byte("beamsh> ")) @@ -59,6 +58,7 @@ func main() { } } } else { + // No arguments, stdin not terminal --> batch mode script, err := dockerscript.Parse(os.Stdin) if err != nil { Fatal("parse error: %v\n", err) @@ -68,6 +68,7 @@ func main() { } } } else { + // 1+ arguments: parse them as script files for _, scriptpath := range flag.Args() { f, err := os.Open(scriptpath) if err != nil { @@ -84,63 +85,6 @@ func main() { } } -func beamCopy(dst *net.UnixConn, src *net.UnixConn) (int, error) { - var n int - for { - payload, attachment, err := beam.Receive(src) - if err == io.EOF { - return n, nil - } else if err != nil { - return n, err - } - if err := beam.Send(dst, payload, attachment); err != nil { - if attachment != nil { - attachment.Close() - } - return n, err - } - n++ - } - panic("impossibru!") - return n, nil -} - -type Handler func([]string, *net.UnixConn, *net.UnixConn) - -func Devnull() (*net.UnixConn, error) { - priv, pub, err := beam.USocketPair() - if err != nil { - return nil, err - } - go func() { - defer priv.Close() - for { - _, attachment, err := beam.Receive(priv) - if err != nil { - return - } - if attachment != nil { - attachment.Close() - } - } - }() - return pub, nil -} - -func scriptString(script []*dockerscript.Command) string { - lines := make([]string, 0, len(script)) - for _, cmd := range script { - line := strings.Join(cmd.Args, " ") - if len(cmd.Children) > 0 { - line += fmt.Sprintf(" { %s }", scriptString(cmd.Children)) - } else { - line += " {}" - } - lines = append(lines, line) - } - return fmt.Sprintf("'%s'", strings.Join(lines, "; ")) -} - func executeRootScript(script []*dockerscript.Command) error { if len(rootPlugins) > 0 { var ( @@ -185,167 +129,6 @@ func executeScript(client *net.UnixConn, script []*dockerscript.Command) error { return nil } -func dialer(addr string) (chan net.Conn, error) { - u, err := url.Parse(addr) - if err != nil { - return nil, err - } - connections := make(chan net.Conn) - go func() { - defer close(connections) - for { - conn, err := net.Dial(u.Scheme, u.Host) - if err != nil { - return - } - connections <-conn - } - }() - return connections, nil -} - -func listener(addr string) (chan net.Conn, error) { - u, err := url.Parse(addr) - if err != nil { - return nil, err - } - l, err := net.Listen(u.Scheme, u.Host) - if err != nil { - return nil, err - } - connections := make(chan net.Conn) - go func() { - defer close(connections) - for { - conn, err := l.Accept() - if err != nil { - return - } - Logf("new connection\n") - connections<-conn - } - }() - return connections, nil -} - -func msgDesc(payload []byte, attachment *os.File) string { - var filedesc string = "" - if attachment != nil { - filedesc = fmt.Sprintf("%d", attachment.Fd()) - } - return fmt.Sprintf("'%s'[%s]", payload, filedesc) - -} - -func SendToConn(connections chan net.Conn, src *net.UnixConn) error { - var tasks sync.WaitGroup - defer tasks.Wait() - for { - payload, attachment, err := beam.Receive(src) - if err == io.EOF { - return nil - } else if err != nil { - return err - } - conn, ok := <-connections - if !ok { - break - } - Logf("Sending %s\n", msgDesc(payload, attachment)) - tasks.Add(1) - go func(payload []byte, attachment *os.File, conn net.Conn) { - defer tasks.Done() - if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { - return - } - if attachment == nil { - conn.Close() - return - } - var iotasks sync.WaitGroup - iotasks.Add(2) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - Debugf("copying the connection to [%d]\n", attachment.Fd()) - io.Copy(attachment, conn) - attachment.Close() - Debugf("done copying the connection to [%d]\n", attachment.Fd()) - }(attachment, conn) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - Debugf("copying [%d] to the connection\n", attachment.Fd()) - io.Copy(conn, attachment) - conn.Close() - Debugf("done copying [%d] to the connection\n", attachment.Fd()) - }(attachment, conn) - iotasks.Wait() - }(payload, attachment, conn) - } - return nil -} - -func bicopy(a, b io.ReadWriteCloser) { - var iotasks sync.WaitGroup - oneCopy := func(dst io.WriteCloser, src io.Reader) { - defer iotasks.Done() - io.Copy(dst, src) - dst.Close() - } - iotasks.Add(2) - go oneCopy(a, b) - go oneCopy(b, a) - iotasks.Wait() -} - -func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error { - for conn := range connections { - err := func () error { - Logf("parsing message from network...\n") - defer Logf("done parsing message from network\n") - buf := make([]byte, 4098) - n, err := conn.Read(buf) - if n == 0 { - conn.Close() - if err == io.EOF { - return nil - } else { - return err - } - } - Logf("decoding message from '%s'\n", buf[:n]) - header, skip, err := data.DecodeString(string(buf[:n])) - if err != nil { - conn.Close() - return err - } - pub, priv, err := beam.SocketPair() - if err != nil { - return err - } - Logf("decoded message: %s\n", data.Message(header).Pretty()) - go func(skipped []byte, conn net.Conn, f *os.File) { - // this closes both conn and f - if len(skipped) > 0 { - if _, err := f.Write(skipped); err != nil { - Logf("ERROR: %v\n", err) - f.Close() - conn.Close() - return - } - } - bicopy(conn, f) - }(buf[skip:n], conn, pub) - if err := beam.Send(dst, []byte(header), priv); err != nil { - return err - } - return nil - }() - if err != nil { - Logf("Error reading from connection: %v\n", err) - } - } - return nil -} // 1) Find a handler for the command (if no handler, fail) // 2) Attach new in & out pair to the handler @@ -415,24 +198,8 @@ func executeCommand(client *net.UnixConn, cmd *dockerscript.Command) error { return nil } -func randomId() string { - id := make([]byte, 4) - io.ReadFull(rand.Reader, id) - return hex.EncodeToString(id) -} -func sendWPipe(conn *net.UnixConn, payload []byte) (*os.File, error) { - r, w, err := os.Pipe() - if err != nil { - return nil, err - } - if err := beam.Send(conn, payload, r); err != nil { - r.Close() - w.Close() - return nil, err - } - return w, nil -} +type Handler func([]string, *net.UnixConn, *net.UnixConn) func GetHandler(name string) Handler { if name == "log" { @@ -836,6 +603,9 @@ func GetHandler(name string) Handler { return nil } + +// VARIOUS HELPER FUNCTIONS: + func connToFile(conn net.Conn) (f *os.File, err error) { if connWithFile, ok := conn.(interface { File() (*os.File, error) }); !ok { return nil, fmt.Errorf("no file descriptor available") @@ -875,3 +645,217 @@ func Fatalf(msg string, args ...interface{}) { func Fatal(args ...interface{}) { Fatalf("%v", args[0]) } + +func scriptString(script []*dockerscript.Command) string { + lines := make([]string, 0, len(script)) + for _, cmd := range script { + line := strings.Join(cmd.Args, " ") + if len(cmd.Children) > 0 { + line += fmt.Sprintf(" { %s }", scriptString(cmd.Children)) + } else { + line += " {}" + } + lines = append(lines, line) + } + return fmt.Sprintf("'%s'", strings.Join(lines, "; ")) +} + +func beamCopy(dst *net.UnixConn, src *net.UnixConn) (int, error) { + var n int + for { + payload, attachment, err := beam.Receive(src) + if err == io.EOF { + return n, nil + } else if err != nil { + return n, err + } + if err := beam.Send(dst, payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return n, err + } + n++ + } + panic("impossibru!") + return n, nil +} + +func sendWPipe(conn *net.UnixConn, payload []byte) (*os.File, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + if err := beam.Send(conn, payload, r); err != nil { + r.Close() + w.Close() + return nil, err + } + return w, nil +} + +func dialer(addr string) (chan net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + connections := make(chan net.Conn) + go func() { + defer close(connections) + for { + conn, err := net.Dial(u.Scheme, u.Host) + if err != nil { + return + } + connections <-conn + } + }() + return connections, nil +} + +func listener(addr string) (chan net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + l, err := net.Listen(u.Scheme, u.Host) + if err != nil { + return nil, err + } + connections := make(chan net.Conn) + go func() { + defer close(connections) + for { + conn, err := l.Accept() + if err != nil { + return + } + Logf("new connection\n") + connections<-conn + } + }() + return connections, nil +} + + + +func SendToConn(connections chan net.Conn, src *net.UnixConn) error { + var tasks sync.WaitGroup + defer tasks.Wait() + for { + payload, attachment, err := beam.Receive(src) + if err == io.EOF { + return nil + } else if err != nil { + return err + } + conn, ok := <-connections + if !ok { + break + } + Logf("Sending %s\n", msgDesc(payload, attachment)) + tasks.Add(1) + go func(payload []byte, attachment *os.File, conn net.Conn) { + defer tasks.Done() + if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { + return + } + if attachment == nil { + conn.Close() + return + } + var iotasks sync.WaitGroup + iotasks.Add(2) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying the connection to [%d]\n", attachment.Fd()) + io.Copy(attachment, conn) + attachment.Close() + Debugf("done copying the connection to [%d]\n", attachment.Fd()) + }(attachment, conn) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying [%d] to the connection\n", attachment.Fd()) + io.Copy(conn, attachment) + conn.Close() + Debugf("done copying [%d] to the connection\n", attachment.Fd()) + }(attachment, conn) + iotasks.Wait() + }(payload, attachment, conn) + } + return nil +} + +func msgDesc(payload []byte, attachment *os.File) string { + var filedesc string = "" + if attachment != nil { + filedesc = fmt.Sprintf("%d", attachment.Fd()) + } + return fmt.Sprintf("'%s'[%s]", payload, filedesc) + +} + +func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error { + for conn := range connections { + err := func () error { + Logf("parsing message from network...\n") + defer Logf("done parsing message from network\n") + buf := make([]byte, 4098) + n, err := conn.Read(buf) + if n == 0 { + conn.Close() + if err == io.EOF { + return nil + } else { + return err + } + } + Logf("decoding message from '%s'\n", buf[:n]) + header, skip, err := data.DecodeString(string(buf[:n])) + if err != nil { + conn.Close() + return err + } + pub, priv, err := beam.SocketPair() + if err != nil { + return err + } + Logf("decoded message: %s\n", data.Message(header).Pretty()) + go func(skipped []byte, conn net.Conn, f *os.File) { + // this closes both conn and f + if len(skipped) > 0 { + if _, err := f.Write(skipped); err != nil { + Logf("ERROR: %v\n", err) + f.Close() + conn.Close() + return + } + } + bicopy(conn, f) + }(buf[skip:n], conn, pub) + if err := beam.Send(dst, []byte(header), priv); err != nil { + return err + } + return nil + }() + if err != nil { + Logf("Error reading from connection: %v\n", err) + } + } + return nil +} + + +func bicopy(a, b io.ReadWriteCloser) { + var iotasks sync.WaitGroup + oneCopy := func(dst io.WriteCloser, src io.Reader) { + defer iotasks.Done() + io.Copy(dst, src) + dst.Close() + } + iotasks.Add(2) + go oneCopy(a, b) + go oneCopy(b, a) + iotasks.Wait() +} +