beam/examples/beamsh: catch introspection calls from jobs for proper nesting

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-25 10:14:27 -07:00
parent 5df3aaeda2
commit 8b8e477ede

View file

@ -201,29 +201,37 @@ func CmdLog(args []string, f *os.File) {
} }
} }
type Msg struct {
payload []byte
attachment *os.File
}
func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error { func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error {
Debugf("[Serve %#v]\n", handler) Debugf("[Serve %#v]\n", handler)
defer Debugf("[Serve %#v] done\n", handler) defer Debugf("[Serve %#v] done\n", handler)
var tasks sync.WaitGroup var tasks sync.WaitGroup
defer tasks.Wait() defer tasks.Wait()
for { in := make(chan *Msg)
payload, attachment, err := beam.Receive(endpoint) go func() {
if err != nil { for {
return err Debugf("[Serve] waiting for next message on endpoint...\n")
payload, attachment, err := beam.Receive(endpoint)
if err != nil {
break
}
in<-&Msg{payload, attachment}
} }
Debugf("[Serve] endpoint closed. Waiting for tasks to complete\n")
tasks.Wait()
Debugf("[Serve] endpoint closed AND tasks complete\n")
close(in)
}()
for msg := range in {
tasks.Add(1) tasks.Add(1)
// Handle new message // Handle new message
go func(payload []byte, attachment *os.File) { go func(payload []byte, attachment *os.File) {
Debugf("---> Handling '%s' [fd=%d]\n", payload, attachment.Fd()) Debugf("---> Handling '%s' [fd=%d]\n", payload, attachment.Fd())
defer tasks.Done() defer tasks.Done()
if attachment != nil {
defer func() {
Debugf("---> Closing attachment [fd=%d] for msg '%s'\n", attachment.Fd(), payload)
attachment.Close()
}()
} else {
defer Debugf("---> No attachment to close for msg '%s'\n", payload)
}
args, err := parseMsgPayload(payload) args, err := parseMsgPayload(payload)
if err != nil { if err != nil {
Logf("error parsing beam message: %s\n", err) Logf("error parsing beam message: %s\n", err)
@ -233,10 +241,70 @@ func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error {
return return
} }
Debugf("---> calling handler for '%s'\n", args[0]) Debugf("---> calling handler for '%s'\n", args[0])
handler(args, attachment) handlerAttachment := attachment
var iotasks sync.WaitGroup
if attachment != nil {
if caller, err := beam.FdConn(int(attachment.Fd())); err == nil {
Debugf("[Serve] message '%s' has a valid beam endpoint as attachment. Setting up indirection\n", payload)
defer caller.Close()
jobpub, jobpriv, err := beam.USocketPair()
if err != nil {
return
}
defer jobpub.Close()
if f, err := jobpriv.File(); err != nil {
jobpriv.Close()
return
} else {
handlerAttachment = f
defer attachment.Close()
}
jobpriv.Close()
// Read messages from the job and re-insert them for handling
iotasks.Add(1)
go func(job *net.UnixConn) {
defer iotasks.Done()
for {
payload, attachment, err := beam.Receive(job)
if err != nil {
return
}
var fd int = -1
if attachment != nil {
fd = int(attachment.Fd())
}
Debugf("[Serve] received introspection message '%s'[%d]\n", payload, fd)
// Send messages back in for introspection
// Note that we don't scope introspection: jobs have full access to the
// context in which they were called.
in <-&Msg{payload, attachment}
}
}(jobpub)
// Read messages from the caller to the job
go func(caller *net.UnixConn, job *net.UnixConn) {
for {
payload, f, err := beam.Receive(caller)
if err != nil {
return
}
if err := beam.Send(job, payload, f); err != nil {
return
}
}
}(caller, jobpub)
}
}
handler(args, handlerAttachment)
Debugf("---> handler returned for '%s'\n", args[0]) Debugf("---> handler returned for '%s'\n", args[0])
}(payload, attachment) if handlerAttachment != nil {
handlerAttachment.Close()
}
Debugf("---> waiting for iotasks to complete for '%s'\n", args[0])
iotasks.Wait()
Debugf("---> iotasks complete for '%s'\n", args[0])
}(msg.payload, msg.attachment)
} }
Debugf("[Serve] main serve loop completed\n")
return nil return nil
} }