diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index fa6a8d1..b7eccc3 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -106,7 +106,15 @@ func executeRootScript(script []*dockerscript.Command) error { lastCmd.Children = script script = []*dockerscript.Command{rootCmd} } - return executeScript(nil, script) + handlers, err := Handlers() + if err != nil { + return err + } + defer handlers.Close() + if err := executeScript(handlers, script); err != nil { + return err + } + return nil } func executeScript(out beam.Sender, script []*dockerscript.Command) error { @@ -147,61 +155,92 @@ func executeCommand(out beam.Sender, cmd *dockerscript.Command) error { if len(cmd.Args) == 0 { return fmt.Errorf("empty command") } - handler := GetHandler(cmd.Args[0]) - if handler == nil { - return fmt.Errorf("no such command: %s", cmd.Args[0]) - } - inPub, inPriv, err := beam.USocketPair() + Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " ")) + job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Bytes()) if err != nil { - return err + return fmt.Errorf("%v\n", err) } - // Don't close inPub here. We close it to signify the end of input once - // all children are completed (guaranteeing that no more input will be sent - // by children). - // Otherwise we get a deadlock. - defer inPriv.Close() - outPub, outPriv, err := beam.USocketPair() - if err != nil { - return err - } - defer outPub.Close() - // don't close outPriv here. It must be closed after the handler is called, - // but before the copy tasks associated with it completes. - // Otherwise we get a deadlock. var tasks sync.WaitGroup - tasks.Add(2) - go func() { - handler(cmd.Args, inPriv, outPriv) - // FIXME: do we need to outPriv.sync before closing it? - Debugf("[%s] handler returned, closing output\n", strings.Join(cmd.Args, " ")) - outPriv.Close() - tasks.Done() - }() + tasks.Add(1) + Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) go func() { if out != nil { - Debugf("[%s] copy start...\n", strings.Join(cmd.Args, " ")) - n, err := beam.Copy(out, outPub) + Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) + n, err := beam.Copy(out, job) if err != nil { - Fatal(err) + Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err) } - Debugf("[%s] copied %d messages\n", strings.Join(cmd.Args, " "), n) - Debugf("[%s] copy done\n", strings.Join(cmd.Args, " ")) + Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n) } tasks.Done() }() // depth-first execution of children commands // executeScript() blocks until all commands are completed - executeScript(inPub, cmd.Children) - inPub.Close() - Debugf("[%s] waiting for handler and output copy to complete...\n", strings.Join(cmd.Args, " ")) + Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) + executeScript(job, cmd.Children) + Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) + job.CloseWrite() + Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " ")) + Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " ")) tasks.Wait() - Debugf("[%s] handler and output copy complete!\n", strings.Join(cmd.Args, " ")) + Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " ")) return nil } type Handler func([]string, beam.Receiver, beam.Sender) + +func Handlers() (*beam.UnixConn, error) { + var tasks sync.WaitGroup + pub, priv, err := beam.USocketPair() + if err != nil { + return nil, err + } + go func() { + defer func() { + Debugf("[handlers] closewrite() on endpoint\n") + // FIXME: this is not yet necessary but will be once + // there is synchronization over standard beam messages + priv.CloseWrite() + Debugf("[handlers] done closewrite() on endpoint\n") + }() + for { + Debugf("[handlers] waiting for next job...\n") + payload, conn, err := beam.ReceiveConn(priv) + Debugf("[handlers] ReceiveConn() returned %v\n", err) + if err != nil { + return + } + tasks.Add(1) + go func(payload []byte, conn *beam.UnixConn) { + defer tasks.Done() + defer func() { + Debugf("[handlers] '%s' closewrite\n", payload) + conn.CloseWrite() + Debugf("[handlers] '%s' done closewrite\n", payload) + }() + cmd := data.Message(payload).Get("cmd") + Debugf("[handlers] received %s\n", strings.Join(cmd, " ")) + if len(cmd) == 0 { + return + } + handler := GetHandler(cmd[0]) + if handler == nil { + return + } + Debugf("[handlers] calling %s\n", strings.Join(cmd, " ")) + handler(cmd, beam.Receiver(conn), beam.Sender(conn)) + Debugf("[handlers] returned: %s\n", strings.Join(cmd, " ")) + }(payload, conn) + } + Debugf("[handlers] waiting for all tasks\n") + tasks.Wait() + Debugf("[handlers] all tasks returned\n") + }() + return pub, nil +} + func GetHandler(name string) Handler { if name == "log" { return func(args []string, in beam.Receiver, out beam.Sender) { @@ -324,6 +363,16 @@ func GetHandler(name string) Handler { } } else if name == "stdio" { return func(args []string, in beam.Receiver, out beam.Sender) { + stdout, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes()) + if err != nil { + return + } + defer stdout.Close() + stderr, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes()) + if err != nil { + return + } + defer stderr.Close() var tasks sync.WaitGroup defer tasks.Wait() for { @@ -331,26 +380,39 @@ func GetHandler(name string) Handler { if err != nil { return } - tasks.Add(1) - go func(payload []byte, attachment *os.File) { - defer tasks.Done() - if attachment == nil { + cmd := data.Message(payload).Get("cmd") + if attachment != nil && len(cmd) > 0 && cmd[0] == "log" { + w, err := beam.SendPipe(out, payload) + if err != nil { + attachment.Close() + fmt.Fprintf(stderr, "sendpipe: %v\n", err) return } - defer attachment.Close() - cmd := data.Message(payload).Get("cmd") - if cmd == nil || len(cmd) == 0 { + tasks.Add(1) + go func(payload []byte, attachment *os.File, sink *os.File) { + defer tasks.Done() + defer attachment.Close() + defer sink.Close() + cmd := data.Message(payload).Get("cmd") + if cmd == nil || len(cmd) == 0 { + return + } + if cmd[0] != "log" { + return + } + var output io.Writer + if len(cmd) == 1 || cmd[1] == "stdout" { + output = os.Stdout + } else if cmd[1] == "stderr" { + output = os.Stderr + } + io.Copy(io.MultiWriter(output, sink), attachment) + }(payload, attachment, w) + } else { + if err := out.Send(payload, attachment); err != nil { return } - if cmd[0] != "log" { - return - } - if len(cmd) == 1 || cmd[1] == "stdout" { - io.Copy(os.Stdout, attachment) - } else if cmd[1] == "stderr" { - io.Copy(os.Stderr, attachment) - } - }(payload, attachment) + } } } } else if name == "echo" {