diff --git a/beam/data/data.go b/beam/data/data.go index cf0cb7c..5604259 100644 --- a/beam/data/data.go +++ b/beam/data/data.go @@ -24,6 +24,7 @@ func encodeString(s string) string { } var EncodeString = encodeString +var DecodeString = decodeString func encodeList(l []string) string { values := make([]string, 0, len(l)) diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index c96a75d..05cec7c 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -124,6 +124,168 @@ func executeScript(client *net.UnixConn, script []*dockerscript.Command) error { return nil } +func dialer(addr string) (chan net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + connections := make(chan net.Conn) + go func() { + defer close(connections) + for { + conn, err := net.Dial(u.Scheme, u.Host) + if err != nil { + return + } + connections <-conn + } + }() + return connections, nil +} + +func listener(addr string) (chan net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + l, err := net.Listen(u.Scheme, u.Host) + if err != nil { + return nil, err + } + connections := make(chan net.Conn) + go func() { + defer close(connections) + for { + conn, err := l.Accept() + if err != nil { + return + } + Logf("new connection\n") + connections<-conn + } + }() + return connections, nil +} + +func msgDesc(payload []byte, attachment *os.File) string { + var filedesc string = "" + if attachment != nil { + filedesc = fmt.Sprintf("%d", attachment.Fd()) + } + return fmt.Sprintf("'%s'[%s]", payload, filedesc) + +} + +func SendToConn(connections chan net.Conn, src *net.UnixConn) error { + var tasks sync.WaitGroup + defer tasks.Wait() + for { + payload, attachment, err := beam.Receive(src) + if err == io.EOF { + return nil + } else if err != nil { + return err + } + conn, ok := <-connections + if !ok { + break + } + Logf("Sending %s\n", msgDesc(payload, attachment)) + tasks.Add(1) + go func(payload []byte, attachment *os.File, conn net.Conn) { + defer tasks.Done() + if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { + return + } + if attachment == nil { + conn.Close() + return + } + var iotasks sync.WaitGroup + iotasks.Add(2) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying the connection to [%d]\n", attachment.Fd()) + io.Copy(attachment, conn) + attachment.Close() + Debugf("done copying the connection to [%d]\n", attachment.Fd()) + }(attachment, conn) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying [%d] to the connection\n", attachment.Fd()) + io.Copy(conn, attachment) + conn.Close() + Debugf("done copying [%d] to the connection\n", attachment.Fd()) + }(attachment, conn) + iotasks.Wait() + }(payload, attachment, conn) + } + return nil +} + +func bicopy(a, b io.ReadWriteCloser) { + var iotasks sync.WaitGroup + oneCopy := func(dst io.WriteCloser, src io.Reader) { + defer iotasks.Done() + io.Copy(dst, src) + dst.Close() + } + iotasks.Add(2) + go oneCopy(a, b) + go oneCopy(b, a) + iotasks.Wait() +} + +func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error { + for conn := range connections { + err := func () error { + Logf("parsing message from network...\n") + defer Logf("done parsing message from network\n") + buf := make([]byte, 4098) + n, err := conn.Read(buf) + if n == 0 { + conn.Close() + if err == io.EOF { + return nil + } else { + return err + } + } + Logf("decoding message from '%s'\n", buf[:n]) + header, skip, err := data.DecodeString(string(buf[:n])) + if err != nil { + conn.Close() + return err + } + pub, priv, err := beam.SocketPair() + if err != nil { + return err + } + Logf("decoded message: %s\n", data.Message(header).Pretty()) + go func(skipped []byte, conn net.Conn, f *os.File) { + // this closes both conn and f + if len(skipped) > 0 { + if _, err := f.Write(skipped); err != nil { + Logf("ERROR: %v\n", err) + f.Close() + conn.Close() + return + } + } + bicopy(conn, f) + }(buf[skip:n], conn, pub) + if err := beam.Send(dst, []byte(header), priv); err != nil { + return err + } + return nil + }() + if err != nil { + Logf("Error reading from connection: %v\n", err) + } + } + return nil +} + // 1) Find a handler for the command (if no handler, fail) // 2) Attach new in & out pair to the handler // 3) [in the background] Copy handler output to our own output @@ -224,6 +386,7 @@ func GetHandler(name string) Handler { return } cmd.Stderr = errW + cmd.Stdin = os.Stdin beam.Send(out, data.Empty().Set("cmd", "log", "stdout").Bytes(), outR) beam.Send(out, data.Empty().Set("cmd", "log", "stderr").Bytes(), errR) execErr := cmd.Run() @@ -326,6 +489,24 @@ func GetHandler(name string) Handler { } } } else if name == "beamsend" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + if len(args) < 2 { + if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { + Fatal(err) + } + return + } + var connector func(string) (chan net.Conn, error) + connector = dialer + connections, err := connector(args[1]) + if err != nil { + beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + // Copy in to conn + SendToConn(connections, in) + } + } else if name == "beamreceive" { return func(args []string, in *net.UnixConn, out *net.UnixConn) { if len(args) != 2 { if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { @@ -333,57 +514,15 @@ func GetHandler(name string) Handler { } return } - u, err := url.Parse(args[1]) + var connector func(string) (chan net.Conn, error) + connector = listener + connections, err := connector(args[1]) if err != nil { beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) return } - var tasks sync.WaitGroup - for { - payload, attachment, err := beam.Receive(in) - if err != nil { - Logf("receive failed with err=%v\n", err) - break - } - conn, err := net.Dial(u.Scheme, u.Host) - if err != nil { - beam.Send(out, data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil) - return - } - beam.Send(out, data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil) - tasks.Add(1) - func(payload []byte, attachment *os.File, conn net.Conn) { - defer tasks.Done() - defer conn.Close() - // even when successful, conn.File() returns a duplicate, - // so we must close the original - if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { - return - } - if attachment == nil { - return - } - var iotasks sync.WaitGroup - iotasks.Add(2) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - Debugf("copying the connection to [%d]\n", attachment.Fd()) - io.Copy(attachment, conn) - attachment.Close() - Debugf("done copying the connection to [%d]\n", attachment.Fd()) - }(attachment, conn) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - Debugf("copying [%d] to the connection\n", attachment.Fd()) - io.Copy(conn, attachment) - conn.Close() - Debugf("done copying [%d] to the connection\n", attachment.Fd()) - }(attachment, conn) - iotasks.Wait() - attachment.Close() - }(payload, attachment, conn) - } - tasks.Wait() + // Copy in to conn + ReceiveFromConn(connections, out) } } else if name == "connect" { return func(args []string, in *net.UnixConn, out *net.UnixConn) {