diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index f4bc2bf..bbe0c8a 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -243,7 +243,6 @@ func Handlers() (*beam.UnixConn, error) { func GetHandler(name string) Handler { if name == "logger" { return func(args []string, in beam.Receiver, out beam.Sender) { - var tasks sync.WaitGroup stdout, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes()) if err != nil { return @@ -258,35 +257,17 @@ func GetHandler(name string) Handler { fmt.Fprintf(stderr, "%v\n", err) return } + var tasks sync.WaitGroup + defer tasks.Wait() var n int = 1 - for { - payload, attachment, err := in.Receive() - if err != nil { - return - } - if attachment == nil { - continue - } - w, err := beam.SendPipe(out, payload) - if err != nil { - fmt.Fprintf(stderr, "%v\n", err) - attachment.Close() - return - } + r := beam.NewRouter(out) + r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func (payload []byte, attachment *os.File) error { tasks.Add(1) - go func(payload []byte, attachment *os.File, n int, sink *os.File) { + go func(n int) { defer tasks.Done() defer attachment.Close() - defer sink.Close() - cmd := data.Message(payload).Get("cmd") - if cmd == nil || len(cmd) == 0 { - return - } - if cmd[0] != "log" { - return - } var streamname string - if len(cmd) == 1 || cmd[1] == "stdout" { + if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" { streamname = "stdout" } else { streamname = cmd[1] @@ -299,13 +280,17 @@ func GetHandler(name string) Handler { fmt.Fprintf(stderr, "%v\n", err) return } - io.Copy(io.MultiWriter(logfile, sink), attachment) + defer logfile.Close() + io.Copy(logfile, attachment) logfile.Sync() - logfile.Close() - }(payload, attachment, n, w) + }(n) n++ + return nil + }).Tee(out) + if _, err := beam.Copy(r, in); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return } - tasks.Wait() } } else if name == "render" { return func(args []string, in beam.Receiver, out beam.Sender) {