beam/examples/beamsh: move code around for readability

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-28 15:42:57 -07:00
parent 11c2feae5b
commit f7971cfc1e

View file

@ -2,8 +2,6 @@ package main
import ( import (
"bufio" "bufio"
"crypto/rand"
"encoding/hex"
"fmt" "fmt"
"github.com/dotcloud/docker/pkg/beam" "github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data" "github.com/dotcloud/docker/pkg/beam/data"
@ -35,6 +33,7 @@ func main() {
flag.Parse() flag.Parse()
if flag.NArg() == 0{ if flag.NArg() == 0{
if term.IsTerminal(0) { if term.IsTerminal(0) {
// No arguments, stdin is terminal --> interactive mode
input := bufio.NewScanner(os.Stdin) input := bufio.NewScanner(os.Stdin)
for { for {
os.Stdout.Write([]byte("beamsh> ")) os.Stdout.Write([]byte("beamsh> "))
@ -59,6 +58,7 @@ func main() {
} }
} }
} else { } else {
// No arguments, stdin not terminal --> batch mode
script, err := dockerscript.Parse(os.Stdin) script, err := dockerscript.Parse(os.Stdin)
if err != nil { if err != nil {
Fatal("parse error: %v\n", err) Fatal("parse error: %v\n", err)
@ -68,6 +68,7 @@ func main() {
} }
} }
} else { } else {
// 1+ arguments: parse them as script files
for _, scriptpath := range flag.Args() { for _, scriptpath := range flag.Args() {
f, err := os.Open(scriptpath) f, err := os.Open(scriptpath)
if err != nil { if err != nil {
@ -84,63 +85,6 @@ func main() {
} }
} }
func beamCopy(dst *net.UnixConn, src *net.UnixConn) (int, error) {
var n int
for {
payload, attachment, err := beam.Receive(src)
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := beam.Send(dst, payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
type Handler func([]string, *net.UnixConn, *net.UnixConn)
func Devnull() (*net.UnixConn, error) {
priv, pub, err := beam.USocketPair()
if err != nil {
return nil, err
}
go func() {
defer priv.Close()
for {
_, attachment, err := beam.Receive(priv)
if err != nil {
return
}
if attachment != nil {
attachment.Close()
}
}
}()
return pub, nil
}
func scriptString(script []*dockerscript.Command) string {
lines := make([]string, 0, len(script))
for _, cmd := range script {
line := strings.Join(cmd.Args, " ")
if len(cmd.Children) > 0 {
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
} else {
line += " {}"
}
lines = append(lines, line)
}
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
}
func executeRootScript(script []*dockerscript.Command) error { func executeRootScript(script []*dockerscript.Command) error {
if len(rootPlugins) > 0 { if len(rootPlugins) > 0 {
var ( var (
@ -185,167 +129,6 @@ func executeScript(client *net.UnixConn, script []*dockerscript.Command) error {
return nil return nil
} }
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
return
}
connections <-conn
}
}()
return connections, nil
}
func listener(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := l.Accept()
if err != nil {
return
}
Logf("new connection\n")
connections<-conn
}
}()
return connections, nil
}
func msgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
func SendToConn(connections chan net.Conn, src *net.UnixConn) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := beam.Receive(src)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
conn, ok := <-connections
if !ok {
break
}
Logf("Sending %s\n", msgDesc(payload, attachment))
tasks.Add(1)
go func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
conn.Close()
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
}(payload, attachment, conn)
}
return nil
}
func bicopy(a, b io.ReadWriteCloser) {
var iotasks sync.WaitGroup
oneCopy := func(dst io.WriteCloser, src io.Reader) {
defer iotasks.Done()
io.Copy(dst, src)
dst.Close()
}
iotasks.Add(2)
go oneCopy(a, b)
go oneCopy(b, a)
iotasks.Wait()
}
func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error {
for conn := range connections {
err := func () error {
Logf("parsing message from network...\n")
defer Logf("done parsing message from network\n")
buf := make([]byte, 4098)
n, err := conn.Read(buf)
if n == 0 {
conn.Close()
if err == io.EOF {
return nil
} else {
return err
}
}
Logf("decoding message from '%s'\n", buf[:n])
header, skip, err := data.DecodeString(string(buf[:n]))
if err != nil {
conn.Close()
return err
}
pub, priv, err := beam.SocketPair()
if err != nil {
return err
}
Logf("decoded message: %s\n", data.Message(header).Pretty())
go func(skipped []byte, conn net.Conn, f *os.File) {
// this closes both conn and f
if len(skipped) > 0 {
if _, err := f.Write(skipped); err != nil {
Logf("ERROR: %v\n", err)
f.Close()
conn.Close()
return
}
}
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := beam.Send(dst, []byte(header), priv); err != nil {
return err
}
return nil
}()
if err != nil {
Logf("Error reading from connection: %v\n", err)
}
}
return nil
}
// 1) Find a handler for the command (if no handler, fail) // 1) Find a handler for the command (if no handler, fail)
// 2) Attach new in & out pair to the handler // 2) Attach new in & out pair to the handler
@ -415,24 +198,8 @@ func executeCommand(client *net.UnixConn, cmd *dockerscript.Command) error {
return nil return nil
} }
func randomId() string {
id := make([]byte, 4)
io.ReadFull(rand.Reader, id)
return hex.EncodeToString(id)
}
func sendWPipe(conn *net.UnixConn, payload []byte) (*os.File, error) { type Handler func([]string, *net.UnixConn, *net.UnixConn)
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
if err := beam.Send(conn, payload, r); err != nil {
r.Close()
w.Close()
return nil, err
}
return w, nil
}
func GetHandler(name string) Handler { func GetHandler(name string) Handler {
if name == "log" { if name == "log" {
@ -836,6 +603,9 @@ func GetHandler(name string) Handler {
return nil return nil
} }
// VARIOUS HELPER FUNCTIONS:
func connToFile(conn net.Conn) (f *os.File, err error) { func connToFile(conn net.Conn) (f *os.File, err error) {
if connWithFile, ok := conn.(interface { File() (*os.File, error) }); !ok { if connWithFile, ok := conn.(interface { File() (*os.File, error) }); !ok {
return nil, fmt.Errorf("no file descriptor available") return nil, fmt.Errorf("no file descriptor available")
@ -875,3 +645,217 @@ func Fatalf(msg string, args ...interface{}) {
func Fatal(args ...interface{}) { func Fatal(args ...interface{}) {
Fatalf("%v", args[0]) Fatalf("%v", args[0])
} }
func scriptString(script []*dockerscript.Command) string {
lines := make([]string, 0, len(script))
for _, cmd := range script {
line := strings.Join(cmd.Args, " ")
if len(cmd.Children) > 0 {
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
} else {
line += " {}"
}
lines = append(lines, line)
}
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
}
func beamCopy(dst *net.UnixConn, src *net.UnixConn) (int, error) {
var n int
for {
payload, attachment, err := beam.Receive(src)
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := beam.Send(dst, payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
func sendWPipe(conn *net.UnixConn, payload []byte) (*os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
if err := beam.Send(conn, payload, r); err != nil {
r.Close()
w.Close()
return nil, err
}
return w, nil
}
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
return
}
connections <-conn
}
}()
return connections, nil
}
func listener(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := l.Accept()
if err != nil {
return
}
Logf("new connection\n")
connections<-conn
}
}()
return connections, nil
}
func SendToConn(connections chan net.Conn, src *net.UnixConn) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := beam.Receive(src)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
conn, ok := <-connections
if !ok {
break
}
Logf("Sending %s\n", msgDesc(payload, attachment))
tasks.Add(1)
go func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
conn.Close()
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
}(payload, attachment, conn)
}
return nil
}
func msgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error {
for conn := range connections {
err := func () error {
Logf("parsing message from network...\n")
defer Logf("done parsing message from network\n")
buf := make([]byte, 4098)
n, err := conn.Read(buf)
if n == 0 {
conn.Close()
if err == io.EOF {
return nil
} else {
return err
}
}
Logf("decoding message from '%s'\n", buf[:n])
header, skip, err := data.DecodeString(string(buf[:n]))
if err != nil {
conn.Close()
return err
}
pub, priv, err := beam.SocketPair()
if err != nil {
return err
}
Logf("decoded message: %s\n", data.Message(header).Pretty())
go func(skipped []byte, conn net.Conn, f *os.File) {
// this closes both conn and f
if len(skipped) > 0 {
if _, err := f.Write(skipped); err != nil {
Logf("ERROR: %v\n", err)
f.Close()
conn.Close()
return
}
}
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := beam.Send(dst, []byte(header), priv); err != nil {
return err
}
return nil
}()
if err != nil {
Logf("Error reading from connection: %v\n", err)
}
}
return nil
}
func bicopy(a, b io.ReadWriteCloser) {
var iotasks sync.WaitGroup
oneCopy := func(dst io.WriteCloser, src io.Reader) {
defer iotasks.Done()
io.Copy(dst, src)
dst.Close()
}
iotasks.Add(2)
go oneCopy(a, b)
go oneCopy(b, a)
iotasks.Wait()
}