beam: improve the API with Sender/Receiver interfaces and utilities: Copy/SendPipe/SendPair

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-28 16:57:48 -07:00
parent 071e5e5a65
commit 9c5fdb249f
5 changed files with 207 additions and 176 deletions

View file

@ -87,6 +87,7 @@ func main() {
func executeRootScript(script []*dockerscript.Command) error {
if len(rootPlugins) > 0 {
// If there are root plugins, wrap the script inside them
var (
rootCmd *dockerscript.Command
lastCmd *dockerscript.Command
@ -108,7 +109,7 @@ func executeRootScript(script []*dockerscript.Command) error {
return executeScript(nil, script)
}
func executeScript(client *net.UnixConn, script []*dockerscript.Command) error {
func executeScript(out beam.Sender, script []*dockerscript.Command) error {
Debugf("executeScript(%s)\n", scriptString(script))
defer Debugf("executeScript(%s) DONE\n", scriptString(script))
var background sync.WaitGroup
@ -116,12 +117,12 @@ func executeScript(client *net.UnixConn, script []*dockerscript.Command) error {
for _, cmd := range script {
if cmd.Background {
background.Add(1)
go func(client *net.UnixConn, cmd *dockerscript.Command) {
executeCommand(client, cmd)
go func(out beam.Sender, cmd *dockerscript.Command) {
executeCommand(out, cmd)
background.Done()
}(client, cmd)
}(out, cmd)
} else {
if err := executeCommand(client, cmd); err != nil {
if err := executeCommand(out, cmd); err != nil {
return err
}
}
@ -136,8 +137,8 @@ func executeScript(client *net.UnixConn, script []*dockerscript.Command) error {
// 4) [in the background] Run the handler
// 5) Recursively executeScript() all children commands and wait for them to complete
// 6) Wait for handler to return and (shortly afterwards) output copy to complete
// 7)
func executeCommand(client *net.UnixConn, cmd *dockerscript.Command) error {
// 7) Profit
func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
if flX {
fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1))
}
@ -177,9 +178,9 @@ func executeCommand(client *net.UnixConn, cmd *dockerscript.Command) error {
tasks.Done()
}()
go func() {
if client != nil {
if out != nil {
Debugf("[%s] copy start...\n", strings.Join(cmd.Args, " "))
n, err := beamCopy(client, outPub)
n, err := beam.Copy(out, outPub)
if err != nil {
Fatal(err)
}
@ -199,18 +200,18 @@ func executeCommand(client *net.UnixConn, cmd *dockerscript.Command) error {
}
type Handler func([]string, *net.UnixConn, *net.UnixConn)
type Handler func([]string, beam.Receiver, beam.Sender)
func GetHandler(name string) Handler {
if name == "log" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
stdout, err := sendWPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes())
stdout, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
defer stdout.Close()
stderr, err := sendWPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes())
stderr, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
@ -221,14 +222,14 @@ func GetHandler(name string) Handler {
}
var n int = 1
for {
payload, attachment, err := beam.Receive(in)
payload, attachment, err := in.Receive()
if err != nil {
return
}
if attachment == nil {
continue
}
w, err := sendWPipe(out, payload)
w, err := beam.SendPipe(out, payload)
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
attachment.Close()
@ -269,20 +270,20 @@ func GetHandler(name string) Handler {
tasks.Wait()
}
} else if name == "render" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
stdout, err := sendWPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes())
return func(args []string, in beam.Receiver, out beam.Sender) {
stdout, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
defer stdout.Close()
stderr, err := sendWPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes())
stderr, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
defer stderr.Close()
if len(args) != 2 {
fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0])
beam.Send(out, data.Empty().Set("status", "1").Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
txt := args[1]
@ -291,7 +292,7 @@ func GetHandler(name string) Handler {
}
t := template.Must(template.New("render").Parse(txt))
for {
payload, attachment, err := beam.Receive(in)
payload, attachment, err := in.Receive()
if err != nil {
return
}
@ -301,18 +302,18 @@ func GetHandler(name string) Handler {
}
if err := t.Execute(stdout, msg); err != nil {
fmt.Fprintf(stderr, "rendering error: %v\n", err)
beam.Send(out, data.Empty().Set("status", "1").Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
if err := beam.Send(out, payload, attachment); err != nil {
if err := out.Send(payload, attachment); err != nil {
return
}
}
}
} else if name == "devnull" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
for {
_, attachment, err := beam.Receive(in)
_, attachment, err := in.Receive()
if err != nil {
return
}
@ -322,11 +323,11 @@ func GetHandler(name string) Handler {
}
}
} else if name == "stdio" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := beam.Receive(in)
payload, attachment, err := in.Receive()
if err != nil {
return
}
@ -353,8 +354,8 @@ func GetHandler(name string) Handler {
}
}
} else if name == "echo" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
stdout, err := sendWPipe(out, data.Empty().Set("cmd", "log", "stdout").Bytes())
return func(args []string, in beam.Receiver, out beam.Sender) {
stdout, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stdout").Bytes())
if err != nil {
return
}
@ -362,13 +363,13 @@ func GetHandler(name string) Handler {
stdout.Close()
}
} else if name == "pass" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
for {
payload, attachment, err := beam.Receive(in)
payload, attachment, err := in.Receive()
if err != nil {
return
}
if err := beam.Send(out, payload, attachment); err != nil {
if err := out.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
@ -377,20 +378,20 @@ func GetHandler(name string) Handler {
}
}
} else if name == "in" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
GetHandler("pass")([]string{"pass"}, in, out)
}
} else if name == "exec" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
cmd := exec.Command(args[1], args[2:]...)
stdout, err := sendWPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes())
stdout, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
defer stdout.Close()
cmd.Stdout = stdout
stderr, err := sendWPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes())
stderr, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
@ -404,12 +405,12 @@ func GetHandler(name string) Handler {
} else {
status = "ok"
}
beam.Send(out, data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil)
out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil)
}
} else if name == "trace" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
for {
p, a, err := beam.Receive(in)
p, a, err := in.Receive()
if err != nil {
return
}
@ -423,17 +424,17 @@ func GetHandler(name string) Handler {
msg = fmt.Sprintf("%s [%d]", msg, a.Fd())
}
fmt.Printf("===> %s\n", msg)
beam.Send(out, p, a)
out.Send(p, a)
}
}
} else if name == "emit" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
beam.Send(out, data.Parse(args[1:]).Bytes(), nil)
return func(args []string, in beam.Receiver, out beam.Sender) {
out.Send(data.Parse(args[1:]).Bytes(), nil)
}
} else if name == "print" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
for {
_, a, err := beam.Receive(in)
_, a, err := in.Receive()
if err != nil {
return
}
@ -443,10 +444,10 @@ func GetHandler(name string) Handler {
}
}
} else if name == "multiprint" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
for {
payload, a, err := beam.Receive(in)
payload, a, err := in.Receive()
if err != nil {
return
}
@ -465,25 +466,25 @@ func GetHandler(name string) Handler {
tasks.Wait()
}
} else if name == "listen" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
for {
conn, err := l.Accept()
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
f, err := connToFile(conn)
@ -491,13 +492,13 @@ func GetHandler(name string) Handler {
conn.Close()
continue
}
beam.Send(out, data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
}
}
} else if name == "beamsend" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
@ -506,16 +507,16 @@ func GetHandler(name string) Handler {
connector = dialer
connections, err := connector(args[1])
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
SendToConn(connections, in)
}
} else if name == "beamreceive" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
@ -524,26 +525,26 @@ func GetHandler(name string) Handler {
connector = listener
connections, err := connector(args[1])
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
ReceiveFromConn(connections, out)
}
} else if name == "connect" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
var tasks sync.WaitGroup
for {
_, attachment, err := beam.Receive(in)
_, attachment, err := in.Receive()
if err != nil {
break
}
@ -553,10 +554,10 @@ func GetHandler(name string) Handler {
Logf("connecting to %s/%s\n", u.Scheme, u.Host)
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
beam.Send(out, data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil)
out.Send(data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil)
return
}
beam.Send(out, data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
tasks.Add(1)
go func(attachment *os.File, conn net.Conn) {
defer tasks.Done()
@ -580,13 +581,13 @@ func GetHandler(name string) Handler {
tasks.Wait()
}
} else if name == "openfile" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
return func(args []string, in beam.Receiver, out beam.Sender) {
for _, name := range args {
f, err := os.Open(name)
if err != nil {
continue
}
if err := beam.Send(out, data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
f.Close()
}
}
@ -652,39 +653,6 @@ func scriptString(script []*dockerscript.Command) string {
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
}
func beamCopy(dst *net.UnixConn, src *net.UnixConn) (int, error) {
var n int
for {
payload, attachment, err := beam.Receive(src)
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := beam.Send(dst, payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
func sendWPipe(conn *net.UnixConn, payload []byte) (*os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
if err := beam.Send(conn, payload, r); err != nil {
r.Close()
w.Close()
return nil, err
}
return w, nil
}
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
@ -731,11 +699,11 @@ func listener(addr string) (chan net.Conn, error) {
func SendToConn(connections chan net.Conn, src *net.UnixConn) error {
func SendToConn(connections chan net.Conn, src beam.Receiver) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := beam.Receive(src)
payload, attachment, err := src.Receive()
if err == io.EOF {
return nil
} else if err != nil {
@ -787,7 +755,7 @@ func msgDesc(payload []byte, attachment *os.File) string {
}
func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error {
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
for conn := range connections {
err := func () error {
Logf("parsing message from network...\n")
@ -825,7 +793,7 @@ func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error {
}
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := beam.Send(dst, []byte(header), priv); err != nil {
if err := dst.Send([]byte(header), priv); err != nil {
return err
}
return nil