beam/examples/beamsh: commands are messages.

Commands in the pipeline should either implement or pass-through command messages.

This amounts to a proof-of-concept implementation of the "pipeline"
design of Docker plugins.

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-31 12:04:39 -07:00
parent 2edf4802d8
commit 5220dbeffd

View file

@ -106,7 +106,15 @@ func executeRootScript(script []*dockerscript.Command) error {
lastCmd.Children = script
script = []*dockerscript.Command{rootCmd}
}
return executeScript(nil, script)
handlers, err := Handlers()
if err != nil {
return err
}
defer handlers.Close()
if err := executeScript(handlers, script); err != nil {
return err
}
return nil
}
func executeScript(out beam.Sender, script []*dockerscript.Command) error {
@ -147,61 +155,92 @@ func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
if len(cmd.Args) == 0 {
return fmt.Errorf("empty command")
}
handler := GetHandler(cmd.Args[0])
if handler == nil {
return fmt.Errorf("no such command: %s", cmd.Args[0])
}
inPub, inPriv, err := beam.USocketPair()
Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " "))
job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Bytes())
if err != nil {
return err
return fmt.Errorf("%v\n", err)
}
// Don't close inPub here. We close it to signify the end of input once
// all children are completed (guaranteeing that no more input will be sent
// by children).
// Otherwise we get a deadlock.
defer inPriv.Close()
outPub, outPriv, err := beam.USocketPair()
if err != nil {
return err
}
defer outPub.Close()
// don't close outPriv here. It must be closed after the handler is called,
// but before the copy tasks associated with it completes.
// Otherwise we get a deadlock.
var tasks sync.WaitGroup
tasks.Add(2)
go func() {
handler(cmd.Args, inPriv, outPriv)
// FIXME: do we need to outPriv.sync before closing it?
Debugf("[%s] handler returned, closing output\n", strings.Join(cmd.Args, " "))
outPriv.Close()
tasks.Done()
}()
tasks.Add(1)
Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
go func() {
if out != nil {
Debugf("[%s] copy start...\n", strings.Join(cmd.Args, " "))
n, err := beam.Copy(out, outPub)
Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
n, err := beam.Copy(out, job)
if err != nil {
Fatal(err)
Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err)
}
Debugf("[%s] copied %d messages\n", strings.Join(cmd.Args, " "), n)
Debugf("[%s] copy done\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n)
}
tasks.Done()
}()
// depth-first execution of children commands
// executeScript() blocks until all commands are completed
executeScript(inPub, cmd.Children)
inPub.Close()
Debugf("[%s] waiting for handler and output copy to complete...\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
executeScript(job, cmd.Children)
Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
job.CloseWrite()
Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " "))
tasks.Wait()
Debugf("[%s] handler and output copy complete!\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " "))
return nil
}
type Handler func([]string, beam.Receiver, beam.Sender)
func Handlers() (*beam.UnixConn, error) {
var tasks sync.WaitGroup
pub, priv, err := beam.USocketPair()
if err != nil {
return nil, err
}
go func() {
defer func() {
Debugf("[handlers] closewrite() on endpoint\n")
// FIXME: this is not yet necessary but will be once
// there is synchronization over standard beam messages
priv.CloseWrite()
Debugf("[handlers] done closewrite() on endpoint\n")
}()
for {
Debugf("[handlers] waiting for next job...\n")
payload, conn, err := beam.ReceiveConn(priv)
Debugf("[handlers] ReceiveConn() returned %v\n", err)
if err != nil {
return
}
tasks.Add(1)
go func(payload []byte, conn *beam.UnixConn) {
defer tasks.Done()
defer func() {
Debugf("[handlers] '%s' closewrite\n", payload)
conn.CloseWrite()
Debugf("[handlers] '%s' done closewrite\n", payload)
}()
cmd := data.Message(payload).Get("cmd")
Debugf("[handlers] received %s\n", strings.Join(cmd, " "))
if len(cmd) == 0 {
return
}
handler := GetHandler(cmd[0])
if handler == nil {
return
}
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
handler(cmd, beam.Receiver(conn), beam.Sender(conn))
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
}(payload, conn)
}
Debugf("[handlers] waiting for all tasks\n")
tasks.Wait()
Debugf("[handlers] all tasks returned\n")
}()
return pub, nil
}
func GetHandler(name string) Handler {
if name == "log" {
return func(args []string, in beam.Receiver, out beam.Sender) {
@ -324,6 +363,16 @@ func GetHandler(name string) Handler {
}
} else if name == "stdio" {
return func(args []string, in beam.Receiver, out beam.Sender) {
stdout, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
defer stdout.Close()
stderr, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
defer stderr.Close()
var tasks sync.WaitGroup
defer tasks.Wait()
for {
@ -331,26 +380,39 @@ func GetHandler(name string) Handler {
if err != nil {
return
}
tasks.Add(1)
go func(payload []byte, attachment *os.File) {
defer tasks.Done()
if attachment == nil {
cmd := data.Message(payload).Get("cmd")
if attachment != nil && len(cmd) > 0 && cmd[0] == "log" {
w, err := beam.SendPipe(out, payload)
if err != nil {
attachment.Close()
fmt.Fprintf(stderr, "sendpipe: %v\n", err)
return
}
defer attachment.Close()
cmd := data.Message(payload).Get("cmd")
if cmd == nil || len(cmd) == 0 {
tasks.Add(1)
go func(payload []byte, attachment *os.File, sink *os.File) {
defer tasks.Done()
defer attachment.Close()
defer sink.Close()
cmd := data.Message(payload).Get("cmd")
if cmd == nil || len(cmd) == 0 {
return
}
if cmd[0] != "log" {
return
}
var output io.Writer
if len(cmd) == 1 || cmd[1] == "stdout" {
output = os.Stdout
} else if cmd[1] == "stderr" {
output = os.Stderr
}
io.Copy(io.MultiWriter(output, sink), attachment)
}(payload, attachment, w)
} else {
if err := out.Send(payload, attachment); err != nil {
return
}
if cmd[0] != "log" {
return
}
if len(cmd) == 1 || cmd[1] == "stdout" {
io.Copy(os.Stdout, attachment)
} else if cmd[1] == "stderr" {
io.Copy(os.Stderr, attachment)
}
}(payload, attachment)
}
}
}
} else if name == "echo" {