beam/examples/beamsh: basic implementation of the pipeline design, with static handlers for now.

In the pipeline design, several beam commands can be run concurrently,
with their respective inputs and outputs connected in such a way that
beam messages flow from the first to last. This is similar to the way
a unix shell executes commands in a pipeline: instead of STDIN and
STDOUT, each beam command has a "BEAMIN" and "BEAMOUT".

Since beam allows for richer communication than plain byte streams, beam
pipelines can express more powerful computation, while retaining the
fundamental elegance and ease of use of unix-style composition.

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-25 18:11:06 -07:00
parent 9f4bcf010a
commit f295ccc72a

View file

@ -4,7 +4,6 @@ import (
"io"
"fmt"
"os"
"os/exec"
"github.com/dotcloud/docker/pkg/dockerscript"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
@ -14,20 +13,16 @@ import (
"net"
"path"
"bufio"
"strconv"
"crypto/rand"
"encoding/hex"
)
func main() {
client, engine, err := beam.USocketPair()
devnull, err := Devnull()
if err != nil {
Fatal(err)
}
defer client.Close()
go func() {
Serve(engine, builtinsHandler)
Debugf("Shutting down engine\n")
engine.Close()
}()
defer devnull.Close()
if term.IsTerminal(0) {
input := bufio.NewScanner(os.Stdin)
for {
@ -42,7 +37,7 @@ func main() {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
continue
}
executeScript(client, cmd)
executeScript(devnull, cmd)
}
if err := input.Err(); err == io.EOF {
break
@ -55,84 +50,188 @@ func main() {
if err != nil {
Fatal("parse error: %v\n", err)
}
executeScript(client, script)
executeScript(devnull, script)
}
}
func executeScript(client *net.UnixConn, script []*dockerscript.Command) {
Debugf("%d commands:\n", len(script))
for _, cmd := range script {
job, err := beam.SendPipe(client, data.Empty().Set("cmd", cmd.Args...).Bytes())
if err != nil {
Fatal(err)
func beamCopy(dst *net.UnixConn, src *net.UnixConn) error {
for {
payload, attachment, err := beam.Receive(src)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
if err := beam.Send(dst, payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return err
}
// Recursively execute child-commands as commands to the new job
// executeScript blocks until commands are done, so this is depth-first recursion.
executeScript(job, cmd.Children)
// TODO: pass a default handler to deal with 'status'
// --> use beam chaining?
Debugf("[%s] Listening for reply messages\n", strings.Join(cmd.Args, " "))
Serve(job, builtinsHandler)
Debugf("[%s] done listening for reply messages\n", strings.Join(cmd.Args, " "))
}
panic("impossibru!")
return nil
}
func parseMsgPayload(payload []byte) ([]string, error) {
msg, err := data.Decode(string(payload))
type Handler func([]string, *net.UnixConn, *net.UnixConn)
func Devnull() (*net.UnixConn, error) {
priv, pub, err := beam.USocketPair()
if err != nil {
return nil, err
}
var cmd []string
if c, exists := msg["cmd"]; exists {
cmd = c
}
if len(cmd) == 0 {
return nil, fmt.Errorf("empty command")
}
return cmd, nil
}
func CmdCat(args []string, f *os.File) {
for _, name := range args[1:] {
f, err := os.Open(name)
if err != nil {
continue
go func() {
defer priv.Close()
for {
payload, attachment, err := beam.Receive(priv)
if err != nil {
return
}
fmt.Fprintf(os.Stderr, "[devnull] discarding '%s'\n", payload)
if attachment != nil {
attachment.Close()
}
}
io.Copy(os.Stdout, f)
f.Close()
}
}()
return pub, nil
}
func CmdEcho(args []string, f *os.File) {
resp, err := beam.FdConn(int(f.Fd()))
if err != nil {
Fatal(err)
return
func scriptString(script []*dockerscript.Command) string {
lines := make([]string, 0, len(script))
for _, cmd := range script {
line := strings.Join(cmd.Args, " ")
if len(cmd.Children) > 0 {
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
} else {
line += " {}"
}
lines = append(lines, line)
}
defer resp.Close()
r, w, err := os.Pipe()
if err != nil {
return
}
Debugf("[CmdEcho] stdout pipe() r=%d w=%d\n", r.Fd(), w.Fd())
if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stdout").Bytes(), r); err != nil {
return
}
fmt.Fprintln(w, strings.Join(args[1:], " "))
w.Close()
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
}
func CmdExit(args []string, f *os.File) {
var status int
if len(args) > 1 {
val, err := strconv.ParseInt(args[1], 10, 32)
if err == nil {
status = int(val)
func executeScript(client *net.UnixConn, script []*dockerscript.Command) error {
Debugf("executeScript(%s)\n", scriptString(script))
defer Debugf("executeScript(%s) DONE\n", scriptString(script))
for _, cmd := range script {
if err := executeCommand(client, cmd); err != nil {
return err
}
}
os.Exit(status)
return nil
}
// 1) Find a handler for the command (if no handler, fail)
// 2) Attach new in & out pair to the handler
// 3) [in the background] Copy handler output to our own output
// 4) [in the background] Run the handler
// 5) Recursively executeScript() all children commands and wait for them to complete
// 6) Wait for handler to return and (shortly afterwards) output copy to complete
// 7)
func executeCommand(client *net.UnixConn, cmd *dockerscript.Command) error {
Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " "))
defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " "))
handler := GetHandler(cmd.Args[0])
if handler == nil {
return fmt.Errorf("no such command: %s", cmd.Args[0])
}
inPub, inPriv, err := beam.USocketPair()
if err != nil {
return err
}
// Don't close inPub here. We close it to signify the end of input once
// all children are completed (guaranteeing that no more input will be sent
// by children).
// Otherwise we get a deadlock.
defer inPriv.Close()
outPub, outPriv, err := beam.USocketPair()
if err != nil {
return err
}
defer outPub.Close()
// don't close outPriv here. It must be closed after the handler is called,
// but before the copy tasks associated with it completes.
// Otherwise we get a deadlock.
var tasks sync.WaitGroup
tasks.Add(2)
go func() {
handler(cmd.Args, inPriv, outPriv)
// FIXME: do we need to outPriv.sync before closing it?
Debugf("[%s] handler returned, closing output\n", strings.Join(cmd.Args, " "))
outPriv.Close()
tasks.Done()
}()
go func() {
Debugf("[%s] copy start...\n", strings.Join(cmd.Args, " "))
beamCopy(client, outPub)
Debugf("[%s] copy done\n", strings.Join(cmd.Args, " "))
tasks.Done()
}()
// depth-first execution of children commands
// executeScript() blocks until all commands are completed
executeScript(inPub, cmd.Children)
inPub.Close()
Debugf("[%s] waiting for handler and output copy to complete...\n", strings.Join(cmd.Args, " "))
tasks.Wait()
Debugf("[%s] handler and output copy complete!\n", strings.Join(cmd.Args, " "))
return nil
}
func randomId() string {
id := make([]byte, 4)
io.ReadFull(rand.Reader, id)
return hex.EncodeToString(id)
}
func GetHandler(name string) Handler {
if name == "trace" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
for {
p, a, err := beam.Receive(in)
if err != nil {
return
}
fd := -1
if a != nil {
fd = int(a.Fd())
}
fmt.Printf("===> [TRACE] %s [%d]\n", p, fd)
beam.Send(out, p, a)
}
}
} else if name == "emit" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
beam.Send(out, data.Empty().Set("foo", args[1:]...).Bytes(), nil)
}
} else if name == "print" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
for {
_, a, err := beam.Receive(in)
if err != nil {
return
}
if a != nil {
io.Copy(os.Stdout, a)
}
}
}
} else if name == "openfile" {
return func(args []string, in *net.UnixConn, out *net.UnixConn) {
for _, name := range args {
f, err := os.Open(name)
if err != nil {
continue
}
if err := beam.Send(out, data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
f.Close()
}
}
}
}
return nil
}
// 'status' is a notification of a job's status.
//
func parseEnv(args []string) ([]string, map[string]string) {
@ -154,205 +253,11 @@ func parseEnv(args []string) ([]string, map[string]string) {
return argsOut, env
}
func CmdTrace(args []string, f *os.File) {
resp, err := beam.FdConn(int(f.Fd()))
if err != nil {
Fatal(err)
return
}
defer resp.Close()
for {
payload, attachment, err := beam.Receive(resp)
if err != nil {
Logf("[trace] error waiting for message\n")
return
}
msg, err := data.Decode(string(payload))
if err != nil {
fmt.Printf("===> %s\n", payload)
} else {
fmt.Printf("===> %v\n", msg)
}
if err := beam.Send(resp, payload, attachment); err != nil {
return
}
}
}
func CmdExec(args []string, f *os.File) {
resp, err := beam.FdConn(int(f.Fd()))
if err != nil {
Fatal(err)
return
}
defer resp.Close()
cmd := exec.Command(args[1], args[2:]...)
Logf("EXEC %s %s\n", cmd.Path, cmd.Args)
stdoutR, stdoutW, err := os.Pipe()
if err != nil {
Fatal(err)
return
}
cmd.Stdout = stdoutW
stderrR, stderrW, err := os.Pipe()
if err != nil {
Fatal(err)
return
}
cmd.Stderr = stderrW
if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stdout").Bytes(), stdoutR); err != nil {
Fatal(err)
}
if err := beam.Send(resp, data.Empty().Set("cmd", "log", "stderr").Bytes(), stderrR); err != nil {
Fatal(err)
}
cmd.Run()
}
func CmdLog(args []string, f *os.File) {
defer Debugf("CmdLog done\n")
var name string
if len(args) > 0 {
name = args[1]
}
input := bufio.NewScanner(f)
for input.Scan() {
line := input.Text()
if len(line) > 0 {
fmt.Printf("[%s] %s\n", name, line)
}
if err := input.Err(); err != nil {
fmt.Printf("[%s:%s]\n", name, err)
break
}
}
}
type Msg struct {
payload []byte
attachment *os.File
}
func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error {
Debugf("[Serve %#v]\n", handler)
defer Debugf("[Serve %#v] done\n", handler)
var tasks sync.WaitGroup
defer tasks.Wait()
in := make(chan *Msg)
go func() {
for {
Debugf("[Serve] waiting for next message on endpoint...\n")
payload, attachment, err := beam.Receive(endpoint)
if err != nil {
break
}
in<-&Msg{payload, attachment}
}
Debugf("[Serve] endpoint closed. Waiting for tasks to complete\n")
tasks.Wait()
Debugf("[Serve] endpoint closed AND tasks complete\n")
close(in)
}()
for msg := range in {
tasks.Add(1)
// Handle new message
go func(payload []byte, attachment *os.File) {
Debugf("---> Handling '%s' [fd=%d]\n", payload, attachment.Fd())
defer tasks.Done()
args, err := parseMsgPayload(payload)
if err != nil {
Logf("error parsing beam message: %s\n", err)
if attachment != nil {
attachment.Close()
}
return
}
Debugf("---> calling handler for '%s'\n", args[0])
handlerAttachment := attachment
var iotasks sync.WaitGroup
if attachment != nil {
if caller, err := beam.FdConn(int(attachment.Fd())); err == nil {
Debugf("[Serve] message '%s' has a valid beam endpoint as attachment. Setting up indirection\n", payload)
defer caller.Close()
jobpub, jobpriv, err := beam.USocketPair()
if err != nil {
return
}
defer jobpub.Close()
if f, err := jobpriv.File(); err != nil {
jobpriv.Close()
return
} else {
handlerAttachment = f
defer attachment.Close()
}
jobpriv.Close()
// Read messages from the job and re-insert them for handling
iotasks.Add(1)
go func(job *net.UnixConn) {
defer iotasks.Done()
for {
payload, attachment, err := beam.Receive(job)
if err != nil {
return
}
var fd int = -1
if attachment != nil {
fd = int(attachment.Fd())
}
Debugf("[Serve] received introspection message '%s'[%d]\n", payload, fd)
// Send messages back in for introspection
// Note that we don't scope introspection: jobs have full access to the
// context in which they were called.
in <-&Msg{payload, attachment}
}
}(jobpub)
// Read messages from the caller to the job
go func(caller *net.UnixConn, job *net.UnixConn) {
for {
payload, f, err := beam.Receive(caller)
if err != nil {
return
}
if err := beam.Send(job, payload, f); err != nil {
return
}
}
}(caller, jobpub)
}
}
handler(args, handlerAttachment)
Debugf("---> handler returned for '%s'\n", args[0])
if handlerAttachment != nil {
handlerAttachment.Close()
}
Debugf("---> waiting for iotasks to complete for '%s'\n", args[0])
iotasks.Wait()
Debugf("---> iotasks complete for '%s'\n", args[0])
}(msg.payload, msg.attachment)
}
Debugf("[Serve] main serve loop completed\n")
return nil
}
func builtinsHandler(args []string, attachment *os.File) {
if args[0] == "exit" {
CmdExit(args, attachment)
} else if args[0] == "cat" {
CmdCat(args, attachment)
} else if args[0] == "echo" {
CmdEcho(args, attachment)
} else if args[0] == "log" {
CmdLog(args, attachment)
} else if args[0] == "trace" {
CmdTrace(args, attachment)
} else if args[0] == "exec" {
CmdExec(args, attachment)
}
}
func Logf(msg string, args ...interface{}) (int, error) {
if len(msg) == 0 || msg[len(msg) - 1] != '\n' {
msg = msg + "\n"
@ -362,7 +267,7 @@ func Logf(msg string, args ...interface{}) (int, error) {
}
func Debugf(msg string, args ...interface{}) {
if os.Getenv("DEBUG") != "" {
if os.Getenv("BEAMDEBUG") != "" {
Logf(msg, args...)
}
}