diff --git a/parsers/filters/parse.go b/parsers/filters/parse.go index 6c394f1..7444201 100644 --- a/parsers/filters/parse.go +++ b/parsers/filters/parse.go @@ -5,6 +5,7 @@ package filters import ( "encoding/json" "errors" + "fmt" "regexp" "strings" ) @@ -15,7 +16,14 @@ import ( // in an slice. // e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu' // the args will be {'label': {'label1=1','label2=2'}, 'image.name', {'ubuntu'}} -type Args map[string][]string +type Args struct { + fields map[string]map[string]bool +} + +// NewArgs initializes a new Args struct. +func NewArgs() Args { + return Args{fields: map[string]map[string]bool{}} +} // ParseFlag parses the argument to the filter flag. Like // @@ -25,9 +33,6 @@ type Args map[string][]string // map is created. func ParseFlag(arg string, prev Args) (Args, error) { filters := prev - if prev == nil { - filters = Args{} - } if len(arg) == 0 { return filters, nil } @@ -37,9 +42,11 @@ func ParseFlag(arg string, prev Args) (Args, error) { } f := strings.SplitN(arg, "=", 2) + name := strings.ToLower(strings.TrimSpace(f[0])) value := strings.TrimSpace(f[1]) - filters[name] = append(filters[name], value) + + filters.Add(name, value) return filters, nil } @@ -50,11 +57,11 @@ var ErrBadFormat = errors.New("bad format of filter (expected name=value)") // ToParam packs the Args into an string for easy transport from client to server. func ToParam(a Args) (string, error) { // this way we don't URL encode {}, just empty space - if len(a) == 0 { + if a.Len() == 0 { return "", nil } - buf, err := json.Marshal(a) + buf, err := json.Marshal(a.fields) if err != nil { return "", err } @@ -63,23 +70,71 @@ func ToParam(a Args) (string, error) { // FromParam unpacks the filter Args. func FromParam(p string) (Args, error) { - args := Args{} if len(p) == 0 { - return args, nil + return NewArgs(), nil } - if err := json.NewDecoder(strings.NewReader(p)).Decode(&args); err != nil { - return nil, err + + r := strings.NewReader(p) + d := json.NewDecoder(r) + + m := map[string]map[string]bool{} + if err := d.Decode(&m); err != nil { + r.Seek(0, 0) + + // Allow parsing old arguments in slice format. + // Because other libraries might be sending them in this format. + deprecated := map[string][]string{} + if deprecatedErr := d.Decode(&deprecated); deprecatedErr == nil { + m = deprecatedArgs(deprecated) + } else { + return NewArgs(), err + } } - return args, nil + return Args{m}, nil +} + +// Get returns the list of values associates with a field. +// It returns a slice of strings to keep backwards compatibility with old code. +func (filters Args) Get(field string) []string { + values := filters.fields[field] + if values == nil { + return make([]string, 0) + } + slice := make([]string, 0, len(values)) + for key := range values { + slice = append(slice, key) + } + return slice +} + +// Add adds a new value to a filter field. +func (filters Args) Add(name, value string) { + if _, ok := filters.fields[name]; ok { + filters.fields[name][value] = true + } else { + filters.fields[name] = map[string]bool{value: true} + } +} + +// Del removes a value from a filter field. +func (filters Args) Del(name, value string) { + if _, ok := filters.fields[name]; ok { + delete(filters.fields[name], value) + } +} + +// Len returns the number of fields in the arguments. +func (filters Args) Len() int { + return len(filters.fields) } // MatchKVList returns true if the values for the specified field maches the ones // from the sources. // e.g. given Args are {'label': {'label1=1','label2=1'}, 'image.name', {'ubuntu'}}, -// field is 'label' and sources are {'label':{'label1=1','label2=2','label3=3'}} +// field is 'label' and sources are {'label1': '1', 'label2': '2'} // it returns true. func (filters Args) MatchKVList(field string, sources map[string]string) bool { - fieldValues := filters[field] + fieldValues := filters.fields[field] //do not filter if there is no filter set or cannot determine filter if len(fieldValues) == 0 { @@ -90,21 +145,16 @@ func (filters Args) MatchKVList(field string, sources map[string]string) bool { return false } -outer: - for _, name2match := range fieldValues { + for name2match := range fieldValues { testKV := strings.SplitN(name2match, "=", 2) - for k, v := range sources { - if len(testKV) == 1 { - if k == testKV[0] { - continue outer - } - } else if k == testKV[0] && v == testKV[1] { - continue outer - } + v, ok := sources[testKV[0]] + if !ok { + return false + } + if len(testKV) == 2 && testKV[1] != v { + return false } - - return false } return true @@ -115,13 +165,12 @@ outer: // field is 'image.name' and source is 'ubuntu' // it returns true. func (filters Args) Match(field, source string) bool { - fieldValues := filters[field] - - //do not filter if there is no filter set or cannot determine filter - if len(fieldValues) == 0 { + if filters.ExactMatch(field, source) { return true } - for _, name2match := range fieldValues { + + fieldValues := filters.fields[field] + for name2match := range fieldValues { match, err := regexp.MatchString(name2match, source) if err != nil { continue @@ -132,3 +181,61 @@ func (filters Args) Match(field, source string) bool { } return false } + +// ExactMatch returns true if the source matches exactly one of the filters. +func (filters Args) ExactMatch(field, source string) bool { + fieldValues, ok := filters.fields[field] + //do not filter if there is no filter set or cannot determine filter + if !ok || len(fieldValues) == 0 { + return true + } + + // try to march full name value to avoid O(N) regular expression matching + if fieldValues[source] { + return true + } + return false +} + +// Include returns true if the name of the field to filter is in the filters. +func (filters Args) Include(field string) bool { + _, ok := filters.fields[field] + return ok +} + +// Validate ensures that all the fields in the filter are valid. +// It returns an error as soon as it finds an invalid field. +func (filters Args) Validate(accepted map[string]bool) error { + for name := range filters.fields { + if !accepted[name] { + return fmt.Errorf("Invalid filter '%s'", name) + } + } + return nil +} + +// WalkValues iterates over the list of filtered values for a field. +// It stops the iteration if it finds an error and it returns that error. +func (filters Args) WalkValues(field string, op func(value string) error) error { + if _, ok := filters.fields[field]; !ok { + return nil + } + for v := range filters.fields[field] { + if err := op(v); err != nil { + return err + } + } + return nil +} + +func deprecatedArgs(d map[string][]string) map[string]map[string]bool { + m := map[string]map[string]bool{} + for k, v := range d { + values := map[string]bool{} + for _, vv := range v { + values[vv] = true + } + m[k] = values + } + return m +} diff --git a/parsers/filters/parse_test.go b/parsers/filters/parse_test.go index eb9fcef..308d1bc 100644 --- a/parsers/filters/parse_test.go +++ b/parsers/filters/parse_test.go @@ -1,7 +1,7 @@ package filters import ( - "sort" + "fmt" "testing" ) @@ -13,7 +13,7 @@ func TestParseArgs(t *testing.T) { "image.name=*untu", } var ( - args = Args{} + args = NewArgs() err error ) for i := range flagArgs { @@ -22,10 +22,10 @@ func TestParseArgs(t *testing.T) { t.Errorf("failed to parse %s: %s", flagArgs[i], err) } } - if len(args["created"]) != 1 { + if len(args.Get("created")) != 1 { t.Errorf("failed to set this arg") } - if len(args["image.name"]) != 2 { + if len(args.Get("image.name")) != 2 { t.Errorf("the args should have collapsed") } } @@ -36,7 +36,7 @@ func TestParseArgsEdgeCase(t *testing.T) { if err != nil { t.Fatal(err) } - if args == nil || len(args) != 0 { + if args.Len() != 0 { t.Fatalf("Expected an empty Args (map), got %v", args) } if args, err = ParseFlag("anything", args); err == nil || err != ErrBadFormat { @@ -45,10 +45,11 @@ func TestParseArgsEdgeCase(t *testing.T) { } func TestToParam(t *testing.T) { - a := Args{ - "created": []string{"today"}, - "image.name": []string{"ubuntu*", "*untu"}, + fields := map[string]map[string]bool{ + "created": {"today": true}, + "image.name": {"ubuntu*": true, "*untu": true}, } + a := Args{fields: fields} _, err := ToParam(a) if err != nil { @@ -63,42 +64,48 @@ func TestFromParam(t *testing.T) { "{'key': 'value'}", `{"key": "value"}`, } - valids := map[string]Args{ - `{"key": ["value"]}`: { - "key": {"value"}, + valid := map[*Args][]string{ + &Args{fields: map[string]map[string]bool{"key": {"value": true}}}: { + `{"key": ["value"]}`, + `{"key": {"value": true}}`, }, - `{"key": ["value1", "value2"]}`: { - "key": {"value1", "value2"}, + &Args{fields: map[string]map[string]bool{"key": {"value1": true, "value2": true}}}: { + `{"key": ["value1", "value2"]}`, + `{"key": {"value1": true, "value2": true}}`, }, - `{"key1": ["value1"], "key2": ["value2"]}`: { - "key1": {"value1"}, - "key2": {"value2"}, + &Args{fields: map[string]map[string]bool{"key1": {"value1": true}, "key2": {"value2": true}}}: { + `{"key1": ["value1"], "key2": ["value2"]}`, + `{"key1": {"value1": true}, "key2": {"value2": true}}`, }, } + for _, invalid := range invalids { if _, err := FromParam(invalid); err == nil { t.Fatalf("Expected an error with %v, got nothing", invalid) } } - for json, expectedArgs := range valids { - args, err := FromParam(json) - if err != nil { - t.Fatal(err) - } - if len(args) != len(expectedArgs) { - t.Fatalf("Expected %v, go %v", expectedArgs, args) - } - for key, expectedValues := range expectedArgs { - values := args[key] - sort.Strings(values) - sort.Strings(expectedValues) - if len(values) != len(expectedValues) { + + for expectedArgs, matchers := range valid { + for _, json := range matchers { + args, err := FromParam(json) + if err != nil { + t.Fatal(err) + } + if args.Len() != expectedArgs.Len() { t.Fatalf("Expected %v, go %v", expectedArgs, args) } - for index, expectedValue := range expectedValues { - if values[index] != expectedValue { + for key, expectedValues := range expectedArgs.fields { + values := args.Get(key) + + if len(values) != len(expectedValues) { t.Fatalf("Expected %v, go %v", expectedArgs, args) } + + for _, v := range values { + if !expectedValues[v] { + t.Fatalf("Expected %v, go %v", expectedArgs, args) + } + } } } } @@ -114,54 +121,63 @@ func TestEmpty(t *testing.T) { if err != nil { t.Errorf("%s", err) } - if len(a) != len(v1) { + if a.Len() != v1.Len() { t.Errorf("these should both be empty sets") } } -func TestArgsMatchKVList(t *testing.T) { - // empty sources - args := Args{ - "created": []string{"today"}, +func TestArgsMatchKVListEmptySources(t *testing.T) { + args := NewArgs() + if !args.MatchKVList("created", map[string]string{}) { + t.Fatalf("Expected true for (%v,created), got true", args) } + + args = Args{map[string]map[string]bool{"created": {"today": true}}} if args.MatchKVList("created", map[string]string{}) { t.Fatalf("Expected false for (%v,created), got true", args) } +} + +func TestArgsMatchKVList(t *testing.T) { // Not empty sources sources := map[string]string{ "key1": "value1", "key2": "value2", "key3": "value3", } + matches := map[*Args]string{ &Args{}: "field", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1": true}}, }: "labels", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1=value1"}, - }: "labels", - } - differs := map[*Args]string{ - &Args{ - "created": []string{"today"}, - }: "created", - &Args{ - "created": []string{"today"}, - "labels": []string{"key4"}, - }: "labels", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1=value3"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1=value1": true}}, }: "labels", } + for args, field := range matches { if args.MatchKVList(field, sources) != true { t.Fatalf("Expected true for %v on %v, got false", sources, args) } } + + differs := map[*Args]string{ + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key4": true}}, + }: "labels", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1=value3": true}}, + }: "labels", + } + for args, field := range differs { if args.MatchKVList(field, sources) != false { t.Fatalf("Expected false for %v on %v, got true", sources, args) @@ -171,48 +187,165 @@ func TestArgsMatchKVList(t *testing.T) { func TestArgsMatch(t *testing.T) { source := "today" + matches := map[*Args]string{ &Args{}: "field", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}}, }: "today", - &Args{ - "created": []string{"to*"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to*": true}}, }: "created", - &Args{ - "created": []string{"to(.*)"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to(.*)": true}}, }: "created", - &Args{ - "created": []string{"tod"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tod": true}}, }: "created", - &Args{ - "created": []string{"anything", "to*"}, - }: "created", - } - differs := map[*Args]string{ - &Args{ - "created": []string{"tomorrow"}, - }: "created", - &Args{ - "created": []string{"to(day"}, - }: "created", - &Args{ - "created": []string{"tom(.*)"}, - }: "created", - &Args{ - "created": []string{"today1"}, - "labels": []string{"today"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"anyting": true, "to*": true}}, }: "created", } + for args, field := range matches { if args.Match(field, source) != true { t.Fatalf("Expected true for %v on %v, got false", source, args) } } + + differs := map[*Args]string{ + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tomorrow": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to(day": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tom(.*)": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tom": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today1": true}, + "labels": map[string]bool{"today": true}}, + }: "created", + } + for args, field := range differs { if args.Match(field, source) != false { t.Fatalf("Expected false for %v on %v, got true", source, args) } } } + +func TestAdd(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + v := f.fields["status"] + if len(v) != 1 || !v["running"] { + t.Fatalf("Expected to include a running status, got %v", v) + } + + f.Add("status", "paused") + if len(v) != 2 || !v["paused"] { + t.Fatalf("Expected to include a paused status, got %v", v) + } +} + +func TestDel(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Del("status", "running") + v := f.fields["status"] + if v["running"] { + t.Fatalf("Expected to not include a running status filter, got true") + } +} + +func TestLen(t *testing.T) { + f := NewArgs() + if f.Len() != 0 { + t.Fatalf("Expected to not include any field") + } + f.Add("status", "running") + if f.Len() != 1 { + t.Fatalf("Expected to include one field") + } +} + +func TestExactMatch(t *testing.T) { + f := NewArgs() + + if !f.ExactMatch("status", "running") { + t.Fatalf("Expected to match `running` when there are no filters, got false") + } + + f.Add("status", "running") + f.Add("status", "pause*") + + if !f.ExactMatch("status", "running") { + t.Fatalf("Expected to match `running` with one of the filters, got false") + } + + if f.ExactMatch("status", "paused") { + t.Fatalf("Expected to not match `paused` with one of the filters, got true") + } +} + +func TestInclude(t *testing.T) { + f := NewArgs() + if f.Include("status") { + t.Fatalf("Expected to not include a status key, got true") + } + f.Add("status", "running") + if !f.Include("status") { + t.Fatalf("Expected to include a status key, got false") + } +} + +func TestValidate(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + + valid := map[string]bool{ + "status": true, + "dangling": true, + } + + if err := f.Validate(valid); err != nil { + t.Fatal(err) + } + + f.Add("bogus", "running") + if err := f.Validate(valid); err == nil { + t.Fatalf("Expected to return an error, got nil") + } +} + +func TestWalkValues(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Add("status", "paused") + + f.WalkValues("status", func(value string) error { + if value != "running" && value != "paused" { + t.Fatalf("Unexpected value %s", value) + } + return nil + }) + + err := f.WalkValues("status", func(value string) error { + return fmt.Errorf("return") + }) + if err == nil { + t.Fatalf("Expected to get an error, got nil") + } + + err = f.WalkValues("foo", func(value string) error { + return fmt.Errorf("return") + }) + if err != nil { + t.Fatalf("Expected to not iterate when the field doesn't exist, got %v", err) + } +} diff --git a/pubsub/publisher.go b/pubsub/publisher.go index ab457cf..8529ffa 100644 --- a/pubsub/publisher.go +++ b/pubsub/publisher.go @@ -13,11 +13,12 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, - subscribers: make(map[subscriber]struct{}), + subscribers: make(map[subscriber]topicFunc), } } type subscriber chan interface{} +type topicFunc func(v interface{}) bool // Publisher is basic pub/sub structure. Allows to send events and subscribe // to them. Can be safely used from multiple goroutines. @@ -25,7 +26,7 @@ type Publisher struct { m sync.RWMutex buffer int timeout time.Duration - subscribers map[subscriber]struct{} + subscribers map[subscriber]topicFunc } // Len returns the number of subscribers for the publisher @@ -38,9 +39,14 @@ func (p *Publisher) Len() int { // Subscribe adds a new subscriber to the publisher returning the channel. func (p *Publisher) Subscribe() chan interface{} { + return p.SubscribeTopic(nil) +} + +// SubscribeTopic adds a new subscriber that filters messages sent by a topic. +func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() - p.subscribers[ch] = struct{}{} + p.subscribers[ch] = topic p.m.Unlock() return ch } @@ -56,20 +62,13 @@ func (p *Publisher) Evict(sub chan interface{}) { // Publish sends the data in v to all subscribers currently registered with the publisher. func (p *Publisher) Publish(v interface{}) { p.m.RLock() - for sub := range p.subscribers { - // send under a select as to not block if the receiver is unavailable - if p.timeout > 0 { - select { - case sub <- v: - case <-time.After(p.timeout): - } - continue - } - select { - case sub <- v: - default: - } + wg := new(sync.WaitGroup) + for sub, topic := range p.subscribers { + wg.Add(1) + + go p.sendTopic(sub, topic, v, wg) } + wg.Wait() p.m.RUnlock() } @@ -82,3 +81,24 @@ func (p *Publisher) Close() { } p.m.Unlock() } + +func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { + defer wg.Done() + if topic != nil && !topic(v) { + return + } + + // send under a select as to not block if the receiver is unavailable + if p.timeout > 0 { + select { + case sub <- v: + case <-time.After(p.timeout): + } + return + } + + select { + case sub <- v: + default: + } +}