beam/examples/beamsh: 'exec' can communicate with its child via beam.
Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
parent
9598cba7c0
commit
1794406033
4 changed files with 154 additions and 14 deletions
22
beam/beam.go
22
beam/beam.go
|
@ -24,6 +24,11 @@ type SendCloser interface {
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ReceiveSender interface {
|
||||||
|
Receiver
|
||||||
|
Sender
|
||||||
|
}
|
||||||
|
|
||||||
func SendPipe(dst Sender, data []byte) (*os.File, error) {
|
func SendPipe(dst Sender, data []byte) (*os.File, error) {
|
||||||
r, w, err := os.Pipe()
|
r, w, err := os.Pipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -111,3 +116,20 @@ func MsgDesc(payload []byte, attachment *os.File) string {
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
|
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type devnull struct{}
|
||||||
|
|
||||||
|
func Devnull() ReceiveSender {
|
||||||
|
return devnull{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d devnull) Send(p []byte, a *os.File) error {
|
||||||
|
if a != nil {
|
||||||
|
a.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d devnull) Receive() ([]byte, *os.File, error) {
|
||||||
|
return nil, nil, io.EOF
|
||||||
|
}
|
||||||
|
|
|
@ -23,9 +23,19 @@ var rootPlugins = []string{
|
||||||
|
|
||||||
var (
|
var (
|
||||||
flX bool
|
flX bool
|
||||||
|
flPing bool
|
||||||
|
introspect beam.ReceiveSender = beam.Devnull()
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
fd3 := os.NewFile(3, "beam-introspect")
|
||||||
|
if introsp, err := beam.FileConn(fd3); err == nil {
|
||||||
|
introspect = introsp
|
||||||
|
Logf("introspection enabled\n")
|
||||||
|
} else {
|
||||||
|
Logf("introspection disabled\n")
|
||||||
|
}
|
||||||
|
fd3.Close()
|
||||||
flag.BoolVar(&flX, "x", false, "print commands as they are being executed")
|
flag.BoolVar(&flX, "x", false, "print commands as they are being executed")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
if flag.NArg() == 0{
|
if flag.NArg() == 0{
|
||||||
|
@ -33,7 +43,7 @@ func main() {
|
||||||
// No arguments, stdin is terminal --> interactive mode
|
// No arguments, stdin is terminal --> interactive mode
|
||||||
input := bufio.NewScanner(os.Stdin)
|
input := bufio.NewScanner(os.Stdin)
|
||||||
for {
|
for {
|
||||||
os.Stdout.Write([]byte("beamsh> "))
|
fmt.Printf("[%d] beamsh> ", os.Getpid())
|
||||||
if !input.Scan() {
|
if !input.Scan() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -103,11 +113,34 @@ func executeRootScript(script []*dockerscript.Command) error {
|
||||||
lastCmd.Children = script
|
lastCmd.Children = script
|
||||||
script = []*dockerscript.Command{rootCmd}
|
script = []*dockerscript.Command{rootCmd}
|
||||||
}
|
}
|
||||||
handlers, err := Handlers()
|
handlers, err := Handlers(introspect)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer handlers.Close()
|
defer handlers.Close()
|
||||||
|
var tasks sync.WaitGroup
|
||||||
|
defer func() {
|
||||||
|
Debugf("Waiting for introspection...\n")
|
||||||
|
tasks.Wait()
|
||||||
|
Debugf("DONE Waiting for introspection\n")
|
||||||
|
}()
|
||||||
|
if introspect != nil {
|
||||||
|
tasks.Add(1)
|
||||||
|
go func() {
|
||||||
|
Debugf("starting introspection\n")
|
||||||
|
defer Debugf("done with introspection\n")
|
||||||
|
defer tasks.Done()
|
||||||
|
introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil)
|
||||||
|
Debugf("XXX starting reading introspection messages\n")
|
||||||
|
r := beam.NewRouter(handlers)
|
||||||
|
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
|
||||||
|
Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a))
|
||||||
|
return handlers.Send(p, a)
|
||||||
|
})
|
||||||
|
n, err := beam.Copy(r, introspect)
|
||||||
|
Debugf("XXX done reading %d introspection messages: %v\n", n, err)
|
||||||
|
}()
|
||||||
|
}
|
||||||
if err := executeScript(handlers, script); err != nil {
|
if err := executeScript(handlers, script); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -188,7 +221,7 @@ func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
|
||||||
type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender)
|
type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender)
|
||||||
|
|
||||||
|
|
||||||
func Handlers() (*beam.UnixConn, error) {
|
func Handlers(sink beam.Sender) (*beam.UnixConn, error) {
|
||||||
var tasks sync.WaitGroup
|
var tasks sync.WaitGroup
|
||||||
pub, priv, err := beam.USocketPair()
|
pub, priv, err := beam.USocketPair()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -202,15 +235,16 @@ func Handlers() (*beam.UnixConn, error) {
|
||||||
priv.CloseWrite()
|
priv.CloseWrite()
|
||||||
Debugf("[handlers] done closewrite() on endpoint\n")
|
Debugf("[handlers] done closewrite() on endpoint\n")
|
||||||
}()
|
}()
|
||||||
for {
|
r := beam.NewRouter(sink)
|
||||||
Debugf("[handlers] waiting for next job...\n")
|
r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error {
|
||||||
payload, conn, err := beam.ReceiveConn(priv)
|
conn, err := beam.FileConn(attachment)
|
||||||
Debugf("[handlers] ReceiveConn() returned %v\n", err)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
attachment.Close()
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
// attachment.Close()
|
||||||
tasks.Add(1)
|
tasks.Add(1)
|
||||||
go func(payload []byte, conn *beam.UnixConn) {
|
go func() {
|
||||||
defer tasks.Done()
|
defer tasks.Done()
|
||||||
defer func() {
|
defer func() {
|
||||||
Debugf("[handlers] '%s' closewrite\n", payload)
|
Debugf("[handlers] '%s' closewrite\n", payload)
|
||||||
|
@ -239,8 +273,10 @@ func Handlers() (*beam.UnixConn, error) {
|
||||||
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
|
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
|
||||||
handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn))
|
handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn))
|
||||||
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
|
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
|
||||||
}(payload, conn)
|
}()
|
||||||
}
|
return nil
|
||||||
|
})
|
||||||
|
beam.Copy(r, priv)
|
||||||
Debugf("[handlers] waiting for all tasks\n")
|
Debugf("[handlers] waiting for all tasks\n")
|
||||||
tasks.Wait()
|
tasks.Wait()
|
||||||
Debugf("[handlers] all tasks returned\n")
|
Debugf("[handlers] all tasks returned\n")
|
||||||
|
@ -285,6 +321,8 @@ func GetHandler(name string) Handler {
|
||||||
return CmdConnect
|
return CmdConnect
|
||||||
} else if name == "openfile" {
|
} else if name == "openfile" {
|
||||||
return CmdOpenfile
|
return CmdOpenfile
|
||||||
|
} else if name == "spawn" {
|
||||||
|
return CmdSpawn
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"github.com/dotcloud/docker/pkg/beam"
|
"github.com/dotcloud/docker/pkg/beam"
|
||||||
"github.com/dotcloud/docker/pkg/beam/data"
|
"github.com/dotcloud/docker/pkg/beam/data"
|
||||||
"github.com/dotcloud/docker/pkg/term"
|
"github.com/dotcloud/docker/pkg/term"
|
||||||
|
"github.com/dotcloud/docker/utils"
|
||||||
"text/template"
|
"text/template"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -167,6 +168,27 @@ func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||||
|
c := exec.Command(utils.SelfPath())
|
||||||
|
r, w, err := os.Pipe()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(stderr, "%v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.Stdin = r
|
||||||
|
c.Stdout = stdout
|
||||||
|
c.Stderr = stderr
|
||||||
|
go func() {
|
||||||
|
fmt.Fprintf(w, strings.Join(args[1:], " "))
|
||||||
|
w.Sync()
|
||||||
|
w.Close()
|
||||||
|
}()
|
||||||
|
if err := c.Run(); err != nil {
|
||||||
|
fmt.Fprintf(stderr, "%v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||||
os.Chdir(args[1])
|
os.Chdir(args[1])
|
||||||
GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out)
|
GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out)
|
||||||
|
@ -176,8 +198,46 @@ func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam
|
||||||
cmd := exec.Command(args[1], args[2:]...)
|
cmd := exec.Command(args[1], args[2:]...)
|
||||||
cmd.Stdout = stdout
|
cmd.Stdout = stdout
|
||||||
cmd.Stderr = stderr
|
cmd.Stderr = stderr
|
||||||
cmd.Stdin = os.Stdin
|
//cmd.Stdin = os.Stdin
|
||||||
|
local, remote, err := beam.SocketPair()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(stderr, "%v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
child, err := beam.FileConn(local)
|
||||||
|
if err != nil {
|
||||||
|
local.Close()
|
||||||
|
remote.Close()
|
||||||
|
fmt.Fprintf(stderr, "%v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
local.Close()
|
||||||
|
cmd.ExtraFiles = append(cmd.ExtraFiles, remote)
|
||||||
|
|
||||||
|
var tasks sync.WaitGroup
|
||||||
|
tasks.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer Debugf("done copying to child\n")
|
||||||
|
defer tasks.Done()
|
||||||
|
defer child.CloseWrite()
|
||||||
|
beam.Copy(child, in)
|
||||||
|
}()
|
||||||
|
|
||||||
|
tasks.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer Debugf("done copying from child %d\n")
|
||||||
|
defer tasks.Done()
|
||||||
|
r := beam.NewRouter(out)
|
||||||
|
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
|
||||||
|
return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a)
|
||||||
|
})
|
||||||
|
beam.Copy(r, child)
|
||||||
|
}()
|
||||||
execErr := cmd.Run()
|
execErr := cmd.Run()
|
||||||
|
// We can close both ends of the socket without worrying about data stuck in the buffer,
|
||||||
|
// because unix socket writes are fully synchronous.
|
||||||
|
child.Close()
|
||||||
|
tasks.Wait()
|
||||||
var status string
|
var status string
|
||||||
if execErr != nil {
|
if execErr != nil {
|
||||||
status = execErr.Error()
|
status = execErr.Error()
|
||||||
|
@ -190,7 +250,11 @@ func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam
|
||||||
func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||||
r := beam.NewRouter(out)
|
r := beam.NewRouter(out)
|
||||||
r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error {
|
r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error {
|
||||||
fmt.Printf("===> %s\n", beam.MsgDesc(payload, attachment))
|
var sfd string = "nil"
|
||||||
|
if attachment != nil {
|
||||||
|
sfd = fmt.Sprintf("%d", attachment.Fd())
|
||||||
|
}
|
||||||
|
fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd)
|
||||||
out.Send(payload, attachment)
|
out.Send(payload, attachment)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -27,7 +27,7 @@ func (r *Router) Send(payload []byte, attachment *os.File) (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if r.sink != nil {
|
if r.sink != nil {
|
||||||
//fmt.Printf("[Router.Send] no match. sending to sink\n")
|
// fmt.Printf("[%d] [Router.Send] no match. sending %s to sink %#v\n", os.Getpid(), MsgDesc(payload, attachment), r.sink)
|
||||||
return r.sink.Send(payload, attachment)
|
return r.sink.Send(payload, attachment)
|
||||||
}
|
}
|
||||||
//fmt.Printf("[Router.Send] no match. return error.\n")
|
//fmt.Printf("[Router.Send] no match. return error.\n")
|
||||||
|
@ -101,9 +101,17 @@ func (route *Route) Tee(dst Sender) *Route {
|
||||||
return route
|
return route
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Route) Filter(f func([]byte, *os.File) bool) *Route {
|
||||||
|
r.rules = append(r.rules, f)
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
|
func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
|
||||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||||
values := data.Message(payload).Get(k)
|
values := data.Message(payload).Get(k)
|
||||||
|
if values == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
if len(values) < len(beginning) {
|
if len(values) < len(beginning) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -117,6 +125,7 @@ func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (r *Route) KeyEquals(k string, full...string) *Route {
|
func (r *Route) KeyEquals(k string, full...string) *Route {
|
||||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||||
values := data.Message(payload).Get(k)
|
values := data.Message(payload).Get(k)
|
||||||
|
@ -152,6 +161,13 @@ func (r *Route) NoKey(k string) *Route {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Route) KeyExists(k string) *Route {
|
||||||
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||||
|
return data.Message(payload).Get(k) != nil
|
||||||
|
})
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Route) Passthrough(dst Sender) *Route {
|
func (r *Route) Passthrough(dst Sender) *Route {
|
||||||
r.handler = func(payload []byte, attachment *os.File) error {
|
r.handler = func(payload []byte, attachment *os.File) error {
|
||||||
return dst.Send(payload, attachment)
|
return dst.Send(payload, attachment)
|
||||||
|
|
Loading…
Reference in a new issue