beam/examples/beamsh: more bells and whistles for demos

* Automatically switch to interactive mode when stdin is a terminal

* Basic implementation of "responses"

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-23 20:46:31 -07:00
parent b1090576ae
commit 572952d6ce

View file

@ -6,6 +6,7 @@ import (
"os" "os"
"github.com/dotcloud/docker/pkg/dockerscript" "github.com/dotcloud/docker/pkg/dockerscript"
"github.com/dotcloud/docker/pkg/beam" "github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/term"
"strings" "strings"
"sync" "sync"
"net" "net"
@ -16,28 +17,61 @@ import (
) )
func main() { func main() {
script, err := dockerscript.Parse(os.Stdin) Logf("Initializing engine\n")
if err != nil {
Fatal("parse error: %v\n", err)
}
Logf("%d commands:\n", len(script))
client, engine, err := beam.USocketPair() client, engine, err := beam.USocketPair()
if err != nil { if err != nil {
Fatal(err) Fatal(err)
} }
defer client.Close()
go func() { go func() {
Serve(engine) Serve(engine, builtinsHandler)
Logf("Shutting down engine\n")
engine.Close() engine.Close()
}() }()
if term.IsTerminal(0) {
input := bufio.NewScanner(os.Stdin)
for {
os.Stdout.Write([]byte("beamsh> "))
if !input.Scan() {
break
}
line := input.Text()
if len(line) != 0 {
cmd, err := dockerscript.Parse(strings.NewReader(line))
if err != nil {
Logf("error: %v\n", err)
continue
}
executeScript(client, cmd)
}
if err := input.Err(); err == io.EOF {
break
} else if err != nil {
Fatal(err)
}
}
} else {
script, err := dockerscript.Parse(os.Stdin)
if err != nil {
Fatal("parse error: %v\n", err)
}
executeScript(client, script)
}
}
func executeScript(client *net.UnixConn, script []*dockerscript.Command) {
Logf("%d commands:\n", len(script))
for _, cmd := range script { for _, cmd := range script {
job, err := beam.SendPipe(client, []byte(strings.Join(cmd.Args, " "))) job, err := beam.SendPipe(client, []byte(strings.Join(cmd.Args, " ")))
if err != nil { if err != nil {
Fatal(err) Fatal(err)
} }
Serve(job) // TODO: pass a default handler to deal with 'status'
// --> use beam chaining?
Logf("Listening for reply messages\n")
Serve(job, builtinsHandler)
Logf("[%s] done\n", strings.Join(cmd.Args, " ")) Logf("[%s] done\n", strings.Join(cmd.Args, " "))
} }
client.Close()
} }
func parseMsgPayload(payload []byte) ([]string, error) { func parseMsgPayload(payload []byte) ([]string, error) {
@ -75,11 +109,11 @@ func CmdEcho(args []string, f *os.File) {
if err != nil { if err != nil {
return return
} }
Logf("[CmdEcho] stdout pipe() r=%d w=%d\n", r.Fd(), w.Fd())
if err := beam.Send(resp, []byte("log stdout"), r); err != nil { if err := beam.Send(resp, []byte("log stdout"), r); err != nil {
return return
} }
fmt.Fprintln(w, strings.Join(args[1:], " ")) fmt.Fprintln(w, strings.Join(args[1:], " "))
fmt.Printf("waiting 5 seconds...\n")
w.Close() w.Close()
} }
@ -94,7 +128,29 @@ func CmdExit(args []string, f *os.File) {
os.Exit(status) os.Exit(status)
} }
// 'status' is a notification of a job's status.
//
func parseEnv(args []string) ([]string, map[string]string) {
var argsOut []string
env := make(map[string]string)
for _, word := range args[1:] {
if strings.Contains(word, "=") {
kv := strings.SplitN(word, "=", 2)
key := kv[0]
var val string
if len(kv) == 2 {
val = kv[1]
}
env[key] = val
} else {
argsOut = append(argsOut, word)
}
}
return argsOut, env
}
func CmdLog(args []string, f *os.File) { func CmdLog(args []string, f *os.File) {
defer Logf("CmdLog done\n")
name := args[1] name := args[1]
input := bufio.NewScanner(f) input := bufio.NewScanner(f)
for input.Scan() { for input.Scan() {
@ -109,23 +165,28 @@ func CmdLog(args []string, f *os.File) {
} }
} }
func Serve(endpoint *net.UnixConn) error { func Serve(endpoint *net.UnixConn, handler func([]string, *os.File)) error {
Logf("[Serve]\n") Logf("[Serve %#v]\n", handler)
defer Logf("[Serve] done\n") defer Logf("[Serve %#v] done\n", handler)
var tasks sync.WaitGroup var tasks sync.WaitGroup
defer tasks.Wait() defer tasks.Wait()
for { for {
Logf("[Serve] waiting for next message...\n")
payload, attachment, err := beam.Receive(endpoint) payload, attachment, err := beam.Receive(endpoint)
if err != nil { if err != nil {
return err return err
} }
tasks.Add(1) tasks.Add(1)
// Handle new message
go func(payload []byte, attachment *os.File) { go func(payload []byte, attachment *os.File) {
Logf("---> Handling '%s' [fd=%d]\n", payload, attachment.Fd())
defer tasks.Done() defer tasks.Done()
if attachment != nil { if attachment != nil {
defer fmt.Printf("closing request attachment %d\n", attachment.Fd()) defer func() {
attachment.Close() Logf("---> Closing attachment [fd=%d] for msg '%s'\n", attachment.Fd(), payload)
attachment.Close()
}()
} else {
defer Logf("---> No attachment to close for msg '%s'\n", payload)
} }
args, err := parseMsgPayload(payload) args, err := parseMsgPayload(payload)
if err != nil { if err != nil {
@ -135,21 +196,26 @@ func Serve(endpoint *net.UnixConn) error {
} }
return return
} }
Logf("beam message: %v\n", args) Logf("---> calling handler for '%s'\n", args[0])
if args[0] == "exit" { handler(args, attachment)
CmdExit(args, attachment) Logf("---> handler returned for '%s'\n", args[0])
} else if args[0] == "cat" {
CmdCat(args, attachment)
} else if args[0] == "echo" {
CmdEcho(args, attachment)
} else if args[0] == "log" {
CmdLog(args, attachment)
}
}(payload, attachment) }(payload, attachment)
} }
return nil return nil
} }
func builtinsHandler(args []string, attachment *os.File) {
if args[0] == "exit" {
CmdExit(args, attachment)
} else if args[0] == "cat" {
CmdCat(args, attachment)
} else if args[0] == "echo" {
CmdEcho(args, attachment)
} else if args[0] == "log" {
CmdLog(args, attachment)
}
}
func Logf(msg string, args ...interface{}) (int, error) { func Logf(msg string, args ...interface{}) (int, error) {
if len(msg) == 0 || msg[len(msg) - 1] != '\n' { if len(msg) == 0 || msg[len(msg) - 1] != '\n' {