pkg/beam/router.go

189 lines
3.9 KiB
Go
Raw Normal View History

package beam
import (
"io"
"fmt"
"os"
"github.com/dotcloud/docker/pkg/beam/data"
)
// Router holds an ordered list of routes and an optional fallback sink.
// Send offers each message to the routes in order and uses the sink only
// when no route matches.
type Router struct {
// routes are the registered routes, in the order NewRoute appended them.
routes []*Route
// sink receives messages that match no route; Send checks it for nil
// before use, so it may be left unset.
sink Sender
}
// NewRouter returns a Router that has no routes yet and uses sink as its
// fallback destination. sink may be nil; Send then returns an error for
// any message that no route matches.
func NewRouter(sink Sender) *Router {
	router := new(Router)
	router.sink = sink
	return router
}
// Send delivers a message to the first route that matches it.
//
// Routes are tried in registration order; the first one whose Match returns
// true handles the message, and its Handle result is returned. If no route
// matches, the message is forwarded to the sink when one is configured.
// Otherwise Send returns a "no matching route" error.
func (r *Router) Send(payload []byte, attachment *os.File) error {
	for _, route := range r.routes {
		if route.Match(payload, attachment) {
			return route.Handle(payload, attachment)
		}
	}
	if r.sink == nil {
		return fmt.Errorf("no matching route")
	}
	return r.sink.Send(payload, attachment)
}
// NewRoute registers a fresh Route at the end of the router's route list
// and returns it so the caller can attach rules and a handler. The new
// route starts with no rules and no handler.
func (r *Router) NewRoute() *Route {
	created := new(Route)
	r.routes = append(r.routes, created)
	return created
}
// Route pairs a list of match rules with a handler. A message is accepted
// by the route only if every rule returns true for it.
type Route struct {
// rules are predicates over (payload, attachment); Match evaluates them
// in order and stops at the first one that returns false.
rules []func([]byte, *os.File) bool
// handler is called by Handle for an accepted message. When it is nil,
// Handle does nothing and returns nil.
handler func([]byte, *os.File) error
}
// Match reports whether every rule of the route accepts the message.
// Rules run in the order they were added and evaluation stops at the first
// rejection. A route without rules matches every message.
func (r *Route) Match(payload []byte, attachment *os.File) bool {
	rules := r.rules
	for i := 0; i < len(rules); i++ {
		if accepted := rules[i](payload, attachment); !accepted {
			return false
		}
	}
	return true
}
// Handle passes the message to the route's handler and returns its result.
// A route with no handler silently accepts the message and returns nil.
func (r *Route) Handle(payload []byte, attachment *os.File) error {
	if h := r.handler; h != nil {
		return h(payload, attachment)
	}
	return nil
}
// HasAttachment adds a rule that accepts only messages carrying a non-nil
// attachment. It returns the route to allow chaining.
func (r *Route) HasAttachment() *Route {
	hasFile := func(_ []byte, attachment *os.File) bool {
		return attachment != nil
	}
	r.rules = append(r.rules, hasFile)
	return r
}
// Tee wraps the handler currently installed on the route so that the
// contents of a message's attachment are also copied to dst. It returns the
// route to allow chaining.
//
// The handler is captured when Tee is called. If none is installed at that
// moment, the resulting handler does nothing and returns nil. Handler and
// Passthrough overwrite the route's handler, so calling them after Tee
// discards the tee.
//
// For a message without an attachment the wrapped handler is called
// unchanged and nothing is sent to dst. For a message with an attachment:
//   - SendPipe(dst, payload) opens a writer towards dst;
//   - an os.Pipe is created and the wrapped handler receives its read end
//     in place of the original attachment;
//   - a goroutine copies the original attachment into both the pipe's write
//     end and the dst writer, then closes the attachment, the dst writer and
//     the pipe's write end, in that order.
//
// Errors from SendPipe or os.Pipe are returned before the wrapped handler
// runs. Errors from the copy itself are discarded.
//
// NOTE(review): the copy writes through io.MultiWriter, so it presumably
// stops as soon as either destination fails, and the goroutine may block if
// the wrapped handler never drains the read end. Confirm against callers.
func (r *Route) Tee(dst Sender) *Route {
	next := r.handler
	if next == nil {
		// Nothing to wrap: install a handler that accepts and ignores messages.
		r.handler = func([]byte, *os.File) error { return nil }
		return r
	}
	r.handler = func(payload []byte, attachment *os.File) error {
		if attachment == nil {
			return next(payload, attachment)
		}
		dstPipe, err := SendPipe(dst, payload)
		if err != nil {
			return err
		}
		pipeR, pipeW, err := os.Pipe()
		if err != nil {
			dstPipe.Close()
			return err
		}
		// Duplicate the attachment stream in the background while the
		// wrapped handler consumes the read end of the pipe.
		pump := func() {
			io.Copy(io.MultiWriter(pipeW, dstPipe), attachment)
			attachment.Close()
			dstPipe.Close()
			pipeW.Close()
		}
		go pump()
		return next(payload, pipeR)
	}
	return r
}
// Filter adds the caller-supplied predicate f as a rule of the route. The
// route matches a message only if f, along with every other rule, returns
// true for it. It returns the route to allow chaining.
func (r *Route) Filter(f func([]byte, *os.File) bool) *Route {
	r.rules = append(r.rules, f)
	return r
}
// KeyStartsWith adds a rule that accepts a message when the values that
// data.Message(payload).Get(k) returns begin with the given sequence, item
// for item. A nil result from Get never matches, even when beginning is
// empty. It returns the route to allow chaining.
func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
	hasPrefix := func(payload []byte, _ *os.File) bool {
		values := data.Message(payload).Get(k)
		if values == nil || len(values) < len(beginning) {
			return false
		}
		for i := range beginning {
			if values[i] != beginning[i] {
				return false
			}
		}
		return true
	}
	r.rules = append(r.rules, hasPrefix)
	return r
}
// KeyEquals adds a rule that accepts a message when the values that
// data.Message(payload).Get(k) returns are exactly the given sequence: same
// length and equal items at every position. It returns the route to allow
// chaining.
func (r *Route) KeyEquals(k string, full ...string) *Route {
	sameValues := func(payload []byte, _ *os.File) bool {
		values := data.Message(payload).Get(k)
		if len(full) != len(values) {
			return false
		}
		for i := range full {
			if values[i] != full[i] {
				return false
			}
		}
		return true
	}
	r.rules = append(r.rules, sameValues)
	return r
}
// KeyIncludes adds a rule that accepts a message when v is one of the
// values that data.Message(payload).Get(k) returns. It returns the route to
// allow chaining.
func (r *Route) KeyIncludes(k, v string) *Route {
	contains := func(payload []byte, _ *os.File) bool {
		values := data.Message(payload).Get(k)
		for _, candidate := range values {
			if candidate == v {
				return true
			}
		}
		return false
	}
	r.rules = append(r.rules, contains)
	return r
}
// NoKey adds a rule that accepts a message when
// data.Message(payload).Get(k) returns nothing, i.e. a result of length
// zero. It returns the route to allow chaining.
func (r *Route) NoKey(k string) *Route {
	absent := func(payload []byte, _ *os.File) bool {
		values := data.Message(payload).Get(k)
		return len(values) == 0
	}
	r.rules = append(r.rules, absent)
	return r
}
// KeyExists adds a rule that accepts a message when
// data.Message(payload).Get(k) returns a non-nil result. It returns the
// route to allow chaining.
//
// NOTE(review): the test is "non-nil", not "non-empty", so this is not the
// exact inverse of NoKey if Get can return an empty non-nil result. Confirm
// against the data package.
func (r *Route) KeyExists(k string) *Route {
	present := func(payload []byte, _ *os.File) bool {
		values := data.Message(payload).Get(k)
		return values != nil
	}
	r.rules = append(r.rules, present)
	return r
}
// Passthrough replaces the route's handler with one that forwards each
// message, payload and attachment unchanged, to dst and returns the result
// of dst.Send. It returns the route to allow chaining.
func (r *Route) Passthrough(dst Sender) *Route {
	// A closure rather than the method value dst.Send, so that dst is only
	// dereferenced when a message is actually handled.
	forward := func(payload []byte, attachment *os.File) error {
		return dst.Send(payload, attachment)
	}
	r.handler = forward
	return r
}
// All adds a rule that accepts every message. On its own it makes the route
// a catch-all; combined with other rules it does not change the outcome. It
// returns the route to allow chaining.
func (r *Route) All() *Route {
	always := func([]byte, *os.File) bool { return true }
	r.rules = append(r.rules, always)
	return r
}
// Handler installs h as the route's handler, replacing any handler set
// earlier. Passing nil clears the handler, after which Handle does nothing
// and returns nil. It returns the route to allow chaining.
func (r *Route) Handler(h func([]byte, *os.File) error) *Route {
	r.handler = h
	return r
}