beam/examples/beamsh: remote communication over beam (experimental).

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-27 14:54:53 -07:00
parent d4fb68ee0f
commit 6344b09af3
2 changed files with 187 additions and 47 deletions

View file

@ -24,6 +24,7 @@ func encodeString(s string) string {
}
var EncodeString = encodeString
var DecodeString = decodeString
func encodeList(l []string) string {
values := make([]string, 0, len(l))

View file

@ -124,6 +124,168 @@ func executeScript(client *net.UnixConn, script []*dockerscript.Command) error {
return nil
}
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
return
}
connections <-conn
}
}()
return connections, nil
}
func listener(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := l.Accept()
if err != nil {
return
}
Logf("new connection\n")
connections<-conn
}
}()
return connections, nil
}
func msgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
func SendToConn(connections chan net.Conn, src *net.UnixConn) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := beam.Receive(src)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
conn, ok := <-connections
if !ok {
break
}
Logf("Sending %s\n", msgDesc(payload, attachment))
tasks.Add(1)
go func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
conn.Close()
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
}(payload, attachment, conn)
}
return nil
}
func bicopy(a, b io.ReadWriteCloser) {
var iotasks sync.WaitGroup
oneCopy := func(dst io.WriteCloser, src io.Reader) {
defer iotasks.Done()
io.Copy(dst, src)
dst.Close()
}
iotasks.Add(2)
go oneCopy(a, b)
go oneCopy(b, a)
iotasks.Wait()
}
func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error {
for conn := range connections {
err := func () error {
Logf("parsing message from network...\n")
defer Logf("done parsing message from network\n")
buf := make([]byte, 4098)
n, err := conn.Read(buf)
if n == 0 {
conn.Close()
if err == io.EOF {
return nil
} else {
return err
}
}
Logf("decoding message from '%s'\n", buf[:n])
header, skip, err := data.DecodeString(string(buf[:n]))
if err != nil {
conn.Close()
return err
}
pub, priv, err := beam.SocketPair()
if err != nil {
return err
}
Logf("decoded message: %s\n", data.Message(header).Pretty())
go func(skipped []byte, conn net.Conn, f *os.File) {
// this closes both conn and f
if len(skipped) > 0 {
if _, err := f.Write(skipped); err != nil {
Logf("ERROR: %v\n", err)
f.Close()
conn.Close()
return
}
}
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := beam.Send(dst, []byte(header), priv); err != nil {
return err
}
return nil
}()
if err != nil {
Logf("Error reading from connection: %v\n", err)
}
}
return nil
}
// 1) Find a handler for the command (if no handler, fail)
// 2) Attach new in & out pair to the handler
// 3) [in the background] Copy handler output to our own output
@ -224,6 +386,7 @@ func GetHandler(name string) Handler {
return
}
cmd.Stderr = errW
cmd.Stdin = os.Stdin
beam.Send(out, data.Empty().Set("cmd", "log", "stdout").Bytes(), outR)
beam.Send(out, data.Empty().Set("cmd", "log", "stderr").Bytes(), errR)
execErr := cmd.Run()
@ -326,6 +489,24 @@ func GetHandler(name string) Handler {
}
}
} else if name == "beamsend" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
if len(args) < 2 {
if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = dialer
connections, err := connector(args[1])
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
SendToConn(connections, in)
}
} else if name == "beamreceive" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
if len(args) != 2 {
if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
@ -333,57 +514,15 @@ func GetHandler(name string) Handler {
}
return
}
u, err := url.Parse(args[1])
var connector func(string) (chan net.Conn, error)
connector = listener
connections, err := connector(args[1])
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
var tasks sync.WaitGroup
for {
payload, attachment, err := beam.Receive(in)
if err != nil {
Logf("receive failed with err=%v\n", err)
break
}
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
beam.Send(out, data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil)
return
}
beam.Send(out, data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
tasks.Add(1)
func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
defer conn.Close()
// even when successful, conn.File() returns a duplicate,
// so we must close the original
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
attachment.Close()
}(payload, attachment, conn)
}
tasks.Wait()
// Copy in to conn
ReceiveFromConn(connections, out)
}
} else if name == "connect" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {