From 8b8e477ede6e429f9bfa0ebaa2e771d8dd35c06b Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 25 Mar 2014 10:14:27 -0700 Subject: [PATCH] beam/examples/beamsh: catch introspection calls from jobs for proper nesting Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- beam/examples/beamsh/beamsh.go | 96 +++++++++++++++++++++++++++++----- 1 file changed, 82 insertions(+), 14 deletions(-) diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index be07fdd..9b7fa93 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -201,29 +201,37 @@ func CmdLog(args []string, f *os.File) { } } +type Msg struct { + payload []byte + attachment *os.File +} + func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error { Debugf("[Serve %#v]\n", handler) defer Debugf("[Serve %#v] done\n", handler) var tasks sync.WaitGroup defer tasks.Wait() - for { - payload, attachment, err := beam.Receive(endpoint) - if err != nil { - return err + in := make(chan *Msg) + go func() { + for { + Debugf("[Serve] waiting for next message on endpoint...\n") + payload, attachment, err := beam.Receive(endpoint) + if err != nil { + break + } + in<-&Msg{payload, attachment} } + Debugf("[Serve] endpoint closed. Waiting for tasks to complete\n") + tasks.Wait() + Debugf("[Serve] endpoint closed AND tasks complete\n") + close(in) + }() + for msg := range in { tasks.Add(1) // Handle new message go func(payload []byte, attachment *os.File) { Debugf("---> Handling '%s' [fd=%d]\n", payload, attachment.Fd()) defer tasks.Done() - if attachment != nil { - defer func() { - Debugf("---> Closing attachment [fd=%d] for msg '%s'\n", attachment.Fd(), payload) - attachment.Close() - }() - } else { - defer Debugf("---> No attachment to close for msg '%s'\n", payload) - } args, err := parseMsgPayload(payload) if err != nil { Logf("error parsing beam message: %s\n", err) @@ -233,10 +241,70 @@ func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error { return } Debugf("---> calling handler for '%s'\n", args[0]) - handler(args, attachment) + handlerAttachment := attachment + var iotasks sync.WaitGroup + if attachment != nil { + if caller, err := beam.FdConn(int(attachment.Fd())); err == nil { + Debugf("[Serve] message '%s' has a valid beam endpoint as attachment. Setting up indirection\n", payload) + defer caller.Close() + jobpub, jobpriv, err := beam.USocketPair() + if err != nil { + return + } + defer jobpub.Close() + if f, err := jobpriv.File(); err != nil { + jobpriv.Close() + return + } else { + handlerAttachment = f + defer attachment.Close() + } + jobpriv.Close() + // Read messages from the job and re-insert them for handling + iotasks.Add(1) + go func(job *net.UnixConn) { + defer iotasks.Done() + for { + payload, attachment, err := beam.Receive(job) + if err != nil { + return + } + var fd int = -1 + if attachment != nil { + fd = int(attachment.Fd()) + } + Debugf("[Serve] received introspection message '%s'[%d]\n", payload, fd) + // Send messages back in for introspection + // Note that we don't scope introspection: jobs have full access to the + // context in which they were called. + in <-&Msg{payload, attachment} + } + }(jobpub) + // Read messages from the caller to the job + go func(caller *net.UnixConn, job *net.UnixConn) { + for { + payload, f, err := beam.Receive(caller) + if err != nil { + return + } + if err := beam.Send(job, payload, f); err != nil { + return + } + } + }(caller, jobpub) + } + } + handler(args, handlerAttachment) Debugf("---> handler returned for '%s'\n", args[0]) - }(payload, attachment) + if handlerAttachment != nil { + handlerAttachment.Close() + } + Debugf("---> waiting for iotasks to complete for '%s'\n", args[0]) + iotasks.Wait() + Debugf("---> iotasks complete for '%s'\n", args[0]) + }(msg.payload, msg.attachment) } + Debugf("[Serve] main serve loop completed\n") return nil }