138 lines
2.8 KiB
Go
138 lines
2.8 KiB
Go
|
package beam
|
||
|
|
||
|
import (
|
||
|
"io"
|
||
|
"fmt"
|
||
|
"os"
|
||
|
"github.com/dotcloud/docker/pkg/beam/data"
|
||
|
)
|
||
|
|
||
|
type Router struct {
|
||
|
routes []*Route
|
||
|
sink Sender
|
||
|
}
|
||
|
|
||
|
func NewRouter(sink Sender) *Router {
|
||
|
return &Router{sink:sink}
|
||
|
}
|
||
|
|
||
|
func (r *Router) Send(payload []byte, attachment *os.File) (err error) {
|
||
|
//fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment))
|
||
|
defer func() {
|
||
|
//fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err)
|
||
|
}()
|
||
|
for _, route := range r.routes {
|
||
|
if route.Match(payload, attachment) {
|
||
|
return route.Handle(payload, attachment)
|
||
|
}
|
||
|
}
|
||
|
if r.sink != nil {
|
||
|
//fmt.Printf("[Router.Send] no match. sending to sink\n")
|
||
|
return r.sink.Send(payload, attachment)
|
||
|
}
|
||
|
//fmt.Printf("[Router.Send] no match. return error.\n")
|
||
|
return fmt.Errorf("no matching route")
|
||
|
}
|
||
|
|
||
|
func (r *Router) NewRoute() *Route {
|
||
|
route := &Route{}
|
||
|
r.routes = append(r.routes, route)
|
||
|
return route
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
// Route pairs a set of matching rules with the handler to run for messages
// that satisfy all of them.
type Route struct {
	// rules are the predicates evaluated by Match; a message matches only
	// if every rule returns true. With no rules, every message matches.
	rules []func([]byte, *os.File) bool

	// handler is invoked by Handle for a matched message. When it is nil,
	// Handle does nothing and reports success.
	handler func([]byte, *os.File) error
}
|
||
|
|
||
|
func (route *Route) Match(payload []byte, attachment *os.File) bool {
|
||
|
for _, rule := range route.rules {
|
||
|
if !rule(payload, attachment) {
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (route *Route) Handle(payload []byte, attachment *os.File) error {
|
||
|
if route.handler == nil {
|
||
|
return nil
|
||
|
}
|
||
|
return route.handler(payload, attachment)
|
||
|
}
|
||
|
|
||
|
func (r *Route) HasAttachment() *Route {
|
||
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||
|
return attachment != nil
|
||
|
})
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
|
||
|
func (route *Route) Tee(dst Sender) *Route {
|
||
|
inner := route.handler
|
||
|
route.handler = func(payload []byte, attachment *os.File) error {
|
||
|
if inner == nil {
|
||
|
return nil
|
||
|
}
|
||
|
if attachment == nil {
|
||
|
return inner(payload, attachment)
|
||
|
}
|
||
|
// Setup the tee
|
||
|
w, err := SendPipe(dst, payload)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
teeR, teeW, err := os.Pipe()
|
||
|
if err != nil {
|
||
|
w.Close()
|
||
|
return err
|
||
|
}
|
||
|
go func() {
|
||
|
io.Copy(io.MultiWriter(teeW, w), attachment)
|
||
|
attachment.Close()
|
||
|
w.Close()
|
||
|
teeW.Close()
|
||
|
}()
|
||
|
return inner(payload, teeR)
|
||
|
}
|
||
|
return route
|
||
|
}
|
||
|
|
||
|
func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
|
||
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||
|
values := data.Message(payload).Get(k)
|
||
|
if len(values) < len(beginning) {
|
||
|
return false
|
||
|
}
|
||
|
for i, v := range beginning {
|
||
|
if v != values[i] {
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
return true
|
||
|
})
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
func (r *Route) Passthrough(dst Sender) *Route {
|
||
|
r.handler = func(payload []byte, attachment *os.File) error {
|
||
|
return dst.Send(payload, attachment)
|
||
|
}
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
func (r *Route) All() *Route {
|
||
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||
|
return true
|
||
|
})
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
func (r *Route) Handler(h func([]byte, *os.File) error) *Route {
|
||
|
r.handler = h
|
||
|
return r
|
||
|
}
|