From 36231f23a3c25ae74970016ddb129894370664a9 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Wed, 2 Apr 2014 16:38:36 -0700 Subject: [PATCH] beam: Router can route beam messages with a convenient set of rules and handlers Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- beam/beam.go | 11 +++ beam/examples/beamsh/beamsh.go | 8 +- beam/router.go | 137 +++++++++++++++++++++++++++++++++ beam/router_test.go | 96 +++++++++++++++++++++++ 4 files changed, 246 insertions(+), 6 deletions(-) create mode 100644 beam/router.go create mode 100644 beam/router_test.go diff --git a/beam/beam.go b/beam/beam.go index f93c11b..fab5f62 100644 --- a/beam/beam.go +++ b/beam/beam.go @@ -1,6 +1,7 @@ package beam import ( + "fmt" "io" "os" ) @@ -100,3 +101,13 @@ func Copy(dst Sender, src Receiver) (int, error) { panic("impossibru!") return n, nil } + +// MsgDesc returns a human readable description of a beam message, usually +// for debugging purposes. +func MsgDesc(payload []byte, attachment *os.File) string { + var filedesc string = "" + if attachment != nil { + filedesc = fmt.Sprintf("%d", attachment.Fd()) + } + return fmt.Sprintf("'%s'[%s]", payload, filedesc) +} diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go index 72e506a..f3930e0 100644 --- a/beam/examples/beamsh/beamsh.go +++ b/beam/examples/beamsh/beamsh.go @@ -851,13 +851,9 @@ func SendToConn(connections chan net.Conn, src beam.Receiver) error { return nil } -func msgDesc(payload []byte, attachment *os.File) string { - var filedesc string = "" - if attachment != nil { - filedesc = fmt.Sprintf("%d", attachment.Fd()) - } - return fmt.Sprintf("'%s'[%s]", payload, filedesc) +func msgDesc(payload []byte, attachment *os.File) string { + return beam.MsgDesc(payload, attachment) } func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error { diff --git a/beam/router.go b/beam/router.go new file mode 100644 index 0000000..3dcc3cc --- /dev/null +++ b/beam/router.go @@ -0,0 +1,137 @@ +package beam + +import ( + "io" + "fmt" + "os" + "github.com/dotcloud/docker/pkg/beam/data" +) + +type Router struct { + routes []*Route + sink Sender +} + +func NewRouter(sink Sender) *Router { + return &Router{sink:sink} +} + +func (r *Router) Send(payload []byte, attachment *os.File) (err error) { + //fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment)) + defer func() { + //fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err) + }() + for _, route := range r.routes { + if route.Match(payload, attachment) { + return route.Handle(payload, attachment) + } + } + if r.sink != nil { + //fmt.Printf("[Router.Send] no match. sending to sink\n") + return r.sink.Send(payload, attachment) + } + //fmt.Printf("[Router.Send] no match. return error.\n") + return fmt.Errorf("no matching route") +} + +func (r *Router) NewRoute() *Route { + route := &Route{} + r.routes = append(r.routes, route) + return route +} + + + +type Route struct { + rules []func([]byte, *os.File) bool + handler func([]byte, *os.File) error +} + +func (route *Route) Match(payload []byte, attachment *os.File) bool { + for _, rule := range route.rules { + if !rule(payload, attachment) { + return false + } + } + return true +} + +func (route *Route) Handle(payload []byte, attachment *os.File) error { + if route.handler == nil { + return nil + } + return route.handler(payload, attachment) +} + +func (r *Route) HasAttachment() *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + return attachment != nil + }) + return r +} + + +func (route *Route) Tee(dst Sender) *Route { + inner := route.handler + route.handler = func(payload []byte, attachment *os.File) error { + if inner == nil { + return nil + } + if attachment == nil { + return inner(payload, attachment) + } + // Setup the tee + w, err := SendPipe(dst, payload) + if err != nil { + return err + } + teeR, teeW, err := os.Pipe() + if err != nil { + w.Close() + return err + } + go func() { + io.Copy(io.MultiWriter(teeW, w), attachment) + attachment.Close() + w.Close() + teeW.Close() + }() + return inner(payload, teeR) + } + return route +} + +func (r *Route) KeyStartsWith(k string, beginning ...string) *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + values := data.Message(payload).Get(k) + if len(values) < len(beginning) { + return false + } + for i, v := range beginning { + if v != values[i] { + return false + } + } + return true + }) + return r +} + +func (r *Route) Passthrough(dst Sender) *Route { + r.handler = func(payload []byte, attachment *os.File) error { + return dst.Send(payload, attachment) + } + return r +} + +func (r *Route) All() *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + return true + }) + return r +} + +func (r *Route) Handler(h func([]byte, *os.File) error) *Route { + r.handler = h + return r +} diff --git a/beam/router_test.go b/beam/router_test.go new file mode 100644 index 0000000..f4c0cb1 --- /dev/null +++ b/beam/router_test.go @@ -0,0 +1,96 @@ +package beam + +import ( + "fmt" + "io/ioutil" + "testing" + "os" + "sync" +) + +type msg struct { + payload []byte + attachment *os.File +} + +func (m msg) String() string { + return MsgDesc(m.payload, m.attachment) +} + + +type mockReceiver []msg + +func (r *mockReceiver) Send(p []byte, a *os.File) error { + (*r) = append((*r), msg{p, a}) + return nil +} + +func TestSendNoSinkNoRoute(t *testing.T) { + r := NewRouter(nil) + if err := r.Send([]byte("hello"), nil); err == nil { + t.Fatalf("error expected") + } + a, b, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + if err := r.Send([]byte("foo bar baz"), a); err == nil { + t.Fatalf("error expected") + } +} + +func TestSendSinkNoRoute(t *testing.T) { + var sink mockReceiver + r := NewRouter(&sink) + if err := r.Send([]byte("hello"), nil); err != nil { + t.Fatal(err) + } + a, b, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + if err := r.Send([]byte("world"), a); err != nil { + t.Fatal(err) + } + if len(sink) != 2 { + t.Fatalf("%#v\n", sink) + } + if string(sink[0].payload) != "hello" { + t.Fatalf("%#v\n", sink) + } + if sink[0].attachment != nil { + t.Fatalf("%#v\n", sink) + } + if string(sink[1].payload) != "world" { + t.Fatalf("%#v\n", sink) + } + if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 { + t.Fatalf("%v\n", sink) + } + var tasks sync.WaitGroup + tasks.Add(2) + go func() { + defer tasks.Done() + fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd()) + data, err := ioutil.ReadAll(sink[1].attachment) + if err != nil { + t.Fatal(err) + } + if string(data) != "foo bar\n" { + t.Fatalf("%v\n", string(data)) + } + }() + go func() { + defer tasks.Done() + fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd()) + if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil { + t.Fatal(err) + } + b.Close() + }() + tasks.Wait() +}