beam/examples/beamsh: 'beamsend' command serializes all messages and sends them over a network connection

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-26 15:48:12 -07:00
parent f49734592c
commit d955d77dc8

View file

@ -305,6 +305,66 @@ func GetHandler(name string) Handler {
beam.Send(out, data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
}
}
} else if name == "beamsend" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
if len(args) != 2 {
if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
u, err := url.Parse(args[1])
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
var tasks sync.WaitGroup
for {
payload, attachment, err := beam.Receive(in)
if err != nil {
Logf("receive failed with err=%v\n", err)
break
}
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
beam.Send(out, data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil)
return
}
beam.Send(out, data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
tasks.Add(1)
func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
defer conn.Close()
// even when successful, conn.File() returns a duplicate,
// so we must close the original
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
attachment.Close()
}(payload, attachment, conn)
}
tasks.Wait()
}
} else if name == "connect" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
if len(args) != 2 {