From d955d77dc8c040178cf6e4a55c746c2e925432f2 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Wed, 26 Mar 2014 15:48:12 -0700 Subject: [PATCH] beam/examples/beamsh: 'beamsend' command serializes all messages and sends them over a network connection Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- beam/examples/beamsh/beamsh.go | 60 ++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index 83c0de5..4c82410 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -305,6 +305,66 @@ func GetHandler(name string) Handler { beam.Send(out, data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f) } } + } else if name == "beamsend" { + return func(args []string, in *net.UnixConn, out *net.UnixConn) { + if len(args) != 2 { + if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { + Fatal(err) + } + return + } + u, err := url.Parse(args[1]) + if err != nil { + beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + var tasks sync.WaitGroup + for { + payload, attachment, err := beam.Receive(in) + if err != nil { + Logf("receive failed with err=%v\n", err) + break + } + conn, err := net.Dial(u.Scheme, u.Host) + if err != nil { + beam.Send(out, data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil) + return + } + beam.Send(out, data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil) + tasks.Add(1) + func(payload []byte, attachment *os.File, conn net.Conn) { + defer tasks.Done() + defer conn.Close() + // even when successful, conn.File() returns a duplicate, + // so we must close the original + if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { + return + } + if attachment == nil { + return + } + var iotasks sync.WaitGroup + iotasks.Add(2) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying the connection to [%d]\n", attachment.Fd()) + io.Copy(attachment, conn) + attachment.Close() + Debugf("done copying the connection to [%d]\n", attachment.Fd()) + }(attachment, conn) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying [%d] to the connection\n", attachment.Fd()) + io.Copy(conn, attachment) + conn.Close() + Debugf("done copying [%d] to the connection\n", attachment.Fd()) + }(attachment, conn) + iotasks.Wait() + attachment.Close() + }(payload, attachment, conn) + } + tasks.Wait() + } } else if name == "connect" { return func(args []string, in *net.UnixConn, out *net.UnixConn) { if len(args) != 2 {