From be362ef8edd1d626895d7d32f47f879f65429266 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Wed, 25 Nov 2015 20:27:11 -0500 Subject: [PATCH 1/2] Make filtering a linear operation. Improves the current filtering implementation complixity. Currently, the best case is O(N) and worst case O(N^2) for key-value filtering. In the new implementation, the best case is O(1) and worst case O(N), again for key-value filtering. Signed-off-by: David Calavera --- parsers/filters/parse.go | 169 +++++++++++++++---- parsers/filters/parse_test.go | 295 ++++++++++++++++++++++++---------- 2 files changed, 352 insertions(+), 112 deletions(-) diff --git a/parsers/filters/parse.go b/parsers/filters/parse.go index 6c394f1..7444201 100644 --- a/parsers/filters/parse.go +++ b/parsers/filters/parse.go @@ -5,6 +5,7 @@ package filters import ( "encoding/json" "errors" + "fmt" "regexp" "strings" ) @@ -15,7 +16,14 @@ import ( // in an slice. // e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu' // the args will be {'label': {'label1=1','label2=2'}, 'image.name', {'ubuntu'}} -type Args map[string][]string +type Args struct { + fields map[string]map[string]bool +} + +// NewArgs initializes a new Args struct. +func NewArgs() Args { + return Args{fields: map[string]map[string]bool{}} +} // ParseFlag parses the argument to the filter flag. Like // @@ -25,9 +33,6 @@ type Args map[string][]string // map is created. func ParseFlag(arg string, prev Args) (Args, error) { filters := prev - if prev == nil { - filters = Args{} - } if len(arg) == 0 { return filters, nil } @@ -37,9 +42,11 @@ func ParseFlag(arg string, prev Args) (Args, error) { } f := strings.SplitN(arg, "=", 2) + name := strings.ToLower(strings.TrimSpace(f[0])) value := strings.TrimSpace(f[1]) - filters[name] = append(filters[name], value) + + filters.Add(name, value) return filters, nil } @@ -50,11 +57,11 @@ var ErrBadFormat = errors.New("bad format of filter (expected name=value)") // ToParam packs the Args into an string for easy transport from client to server. func ToParam(a Args) (string, error) { // this way we don't URL encode {}, just empty space - if len(a) == 0 { + if a.Len() == 0 { return "", nil } - buf, err := json.Marshal(a) + buf, err := json.Marshal(a.fields) if err != nil { return "", err } @@ -63,23 +70,71 @@ func ToParam(a Args) (string, error) { // FromParam unpacks the filter Args. func FromParam(p string) (Args, error) { - args := Args{} if len(p) == 0 { - return args, nil + return NewArgs(), nil } - if err := json.NewDecoder(strings.NewReader(p)).Decode(&args); err != nil { - return nil, err + + r := strings.NewReader(p) + d := json.NewDecoder(r) + + m := map[string]map[string]bool{} + if err := d.Decode(&m); err != nil { + r.Seek(0, 0) + + // Allow parsing old arguments in slice format. + // Because other libraries might be sending them in this format. + deprecated := map[string][]string{} + if deprecatedErr := d.Decode(&deprecated); deprecatedErr == nil { + m = deprecatedArgs(deprecated) + } else { + return NewArgs(), err + } } - return args, nil + return Args{m}, nil +} + +// Get returns the list of values associates with a field. +// It returns a slice of strings to keep backwards compatibility with old code. +func (filters Args) Get(field string) []string { + values := filters.fields[field] + if values == nil { + return make([]string, 0) + } + slice := make([]string, 0, len(values)) + for key := range values { + slice = append(slice, key) + } + return slice +} + +// Add adds a new value to a filter field. +func (filters Args) Add(name, value string) { + if _, ok := filters.fields[name]; ok { + filters.fields[name][value] = true + } else { + filters.fields[name] = map[string]bool{value: true} + } +} + +// Del removes a value from a filter field. +func (filters Args) Del(name, value string) { + if _, ok := filters.fields[name]; ok { + delete(filters.fields[name], value) + } +} + +// Len returns the number of fields in the arguments. +func (filters Args) Len() int { + return len(filters.fields) } // MatchKVList returns true if the values for the specified field maches the ones // from the sources. // e.g. given Args are {'label': {'label1=1','label2=1'}, 'image.name', {'ubuntu'}}, -// field is 'label' and sources are {'label':{'label1=1','label2=2','label3=3'}} +// field is 'label' and sources are {'label1': '1', 'label2': '2'} // it returns true. func (filters Args) MatchKVList(field string, sources map[string]string) bool { - fieldValues := filters[field] + fieldValues := filters.fields[field] //do not filter if there is no filter set or cannot determine filter if len(fieldValues) == 0 { @@ -90,21 +145,16 @@ func (filters Args) MatchKVList(field string, sources map[string]string) bool { return false } -outer: - for _, name2match := range fieldValues { + for name2match := range fieldValues { testKV := strings.SplitN(name2match, "=", 2) - for k, v := range sources { - if len(testKV) == 1 { - if k == testKV[0] { - continue outer - } - } else if k == testKV[0] && v == testKV[1] { - continue outer - } + v, ok := sources[testKV[0]] + if !ok { + return false + } + if len(testKV) == 2 && testKV[1] != v { + return false } - - return false } return true @@ -115,13 +165,12 @@ outer: // field is 'image.name' and source is 'ubuntu' // it returns true. func (filters Args) Match(field, source string) bool { - fieldValues := filters[field] - - //do not filter if there is no filter set or cannot determine filter - if len(fieldValues) == 0 { + if filters.ExactMatch(field, source) { return true } - for _, name2match := range fieldValues { + + fieldValues := filters.fields[field] + for name2match := range fieldValues { match, err := regexp.MatchString(name2match, source) if err != nil { continue @@ -132,3 +181,61 @@ func (filters Args) Match(field, source string) bool { } return false } + +// ExactMatch returns true if the source matches exactly one of the filters. +func (filters Args) ExactMatch(field, source string) bool { + fieldValues, ok := filters.fields[field] + //do not filter if there is no filter set or cannot determine filter + if !ok || len(fieldValues) == 0 { + return true + } + + // try to march full name value to avoid O(N) regular expression matching + if fieldValues[source] { + return true + } + return false +} + +// Include returns true if the name of the field to filter is in the filters. +func (filters Args) Include(field string) bool { + _, ok := filters.fields[field] + return ok +} + +// Validate ensures that all the fields in the filter are valid. +// It returns an error as soon as it finds an invalid field. +func (filters Args) Validate(accepted map[string]bool) error { + for name := range filters.fields { + if !accepted[name] { + return fmt.Errorf("Invalid filter '%s'", name) + } + } + return nil +} + +// WalkValues iterates over the list of filtered values for a field. +// It stops the iteration if it finds an error and it returns that error. +func (filters Args) WalkValues(field string, op func(value string) error) error { + if _, ok := filters.fields[field]; !ok { + return nil + } + for v := range filters.fields[field] { + if err := op(v); err != nil { + return err + } + } + return nil +} + +func deprecatedArgs(d map[string][]string) map[string]map[string]bool { + m := map[string]map[string]bool{} + for k, v := range d { + values := map[string]bool{} + for _, vv := range v { + values[vv] = true + } + m[k] = values + } + return m +} diff --git a/parsers/filters/parse_test.go b/parsers/filters/parse_test.go index eb9fcef..308d1bc 100644 --- a/parsers/filters/parse_test.go +++ b/parsers/filters/parse_test.go @@ -1,7 +1,7 @@ package filters import ( - "sort" + "fmt" "testing" ) @@ -13,7 +13,7 @@ func TestParseArgs(t *testing.T) { "image.name=*untu", } var ( - args = Args{} + args = NewArgs() err error ) for i := range flagArgs { @@ -22,10 +22,10 @@ func TestParseArgs(t *testing.T) { t.Errorf("failed to parse %s: %s", flagArgs[i], err) } } - if len(args["created"]) != 1 { + if len(args.Get("created")) != 1 { t.Errorf("failed to set this arg") } - if len(args["image.name"]) != 2 { + if len(args.Get("image.name")) != 2 { t.Errorf("the args should have collapsed") } } @@ -36,7 +36,7 @@ func TestParseArgsEdgeCase(t *testing.T) { if err != nil { t.Fatal(err) } - if args == nil || len(args) != 0 { + if args.Len() != 0 { t.Fatalf("Expected an empty Args (map), got %v", args) } if args, err = ParseFlag("anything", args); err == nil || err != ErrBadFormat { @@ -45,10 +45,11 @@ func TestParseArgsEdgeCase(t *testing.T) { } func TestToParam(t *testing.T) { - a := Args{ - "created": []string{"today"}, - "image.name": []string{"ubuntu*", "*untu"}, + fields := map[string]map[string]bool{ + "created": {"today": true}, + "image.name": {"ubuntu*": true, "*untu": true}, } + a := Args{fields: fields} _, err := ToParam(a) if err != nil { @@ -63,42 +64,48 @@ func TestFromParam(t *testing.T) { "{'key': 'value'}", `{"key": "value"}`, } - valids := map[string]Args{ - `{"key": ["value"]}`: { - "key": {"value"}, + valid := map[*Args][]string{ + &Args{fields: map[string]map[string]bool{"key": {"value": true}}}: { + `{"key": ["value"]}`, + `{"key": {"value": true}}`, }, - `{"key": ["value1", "value2"]}`: { - "key": {"value1", "value2"}, + &Args{fields: map[string]map[string]bool{"key": {"value1": true, "value2": true}}}: { + `{"key": ["value1", "value2"]}`, + `{"key": {"value1": true, "value2": true}}`, }, - `{"key1": ["value1"], "key2": ["value2"]}`: { - "key1": {"value1"}, - "key2": {"value2"}, + &Args{fields: map[string]map[string]bool{"key1": {"value1": true}, "key2": {"value2": true}}}: { + `{"key1": ["value1"], "key2": ["value2"]}`, + `{"key1": {"value1": true}, "key2": {"value2": true}}`, }, } + for _, invalid := range invalids { if _, err := FromParam(invalid); err == nil { t.Fatalf("Expected an error with %v, got nothing", invalid) } } - for json, expectedArgs := range valids { - args, err := FromParam(json) - if err != nil { - t.Fatal(err) - } - if len(args) != len(expectedArgs) { - t.Fatalf("Expected %v, go %v", expectedArgs, args) - } - for key, expectedValues := range expectedArgs { - values := args[key] - sort.Strings(values) - sort.Strings(expectedValues) - if len(values) != len(expectedValues) { + + for expectedArgs, matchers := range valid { + for _, json := range matchers { + args, err := FromParam(json) + if err != nil { + t.Fatal(err) + } + if args.Len() != expectedArgs.Len() { t.Fatalf("Expected %v, go %v", expectedArgs, args) } - for index, expectedValue := range expectedValues { - if values[index] != expectedValue { + for key, expectedValues := range expectedArgs.fields { + values := args.Get(key) + + if len(values) != len(expectedValues) { t.Fatalf("Expected %v, go %v", expectedArgs, args) } + + for _, v := range values { + if !expectedValues[v] { + t.Fatalf("Expected %v, go %v", expectedArgs, args) + } + } } } } @@ -114,54 +121,63 @@ func TestEmpty(t *testing.T) { if err != nil { t.Errorf("%s", err) } - if len(a) != len(v1) { + if a.Len() != v1.Len() { t.Errorf("these should both be empty sets") } } -func TestArgsMatchKVList(t *testing.T) { - // empty sources - args := Args{ - "created": []string{"today"}, +func TestArgsMatchKVListEmptySources(t *testing.T) { + args := NewArgs() + if !args.MatchKVList("created", map[string]string{}) { + t.Fatalf("Expected true for (%v,created), got true", args) } + + args = Args{map[string]map[string]bool{"created": {"today": true}}} if args.MatchKVList("created", map[string]string{}) { t.Fatalf("Expected false for (%v,created), got true", args) } +} + +func TestArgsMatchKVList(t *testing.T) { // Not empty sources sources := map[string]string{ "key1": "value1", "key2": "value2", "key3": "value3", } + matches := map[*Args]string{ &Args{}: "field", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1": true}}, }: "labels", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1=value1"}, - }: "labels", - } - differs := map[*Args]string{ - &Args{ - "created": []string{"today"}, - }: "created", - &Args{ - "created": []string{"today"}, - "labels": []string{"key4"}, - }: "labels", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1=value3"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1=value1": true}}, }: "labels", } + for args, field := range matches { if args.MatchKVList(field, sources) != true { t.Fatalf("Expected true for %v on %v, got false", sources, args) } } + + differs := map[*Args]string{ + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key4": true}}, + }: "labels", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1=value3": true}}, + }: "labels", + } + for args, field := range differs { if args.MatchKVList(field, sources) != false { t.Fatalf("Expected false for %v on %v, got true", sources, args) @@ -171,48 +187,165 @@ func TestArgsMatchKVList(t *testing.T) { func TestArgsMatch(t *testing.T) { source := "today" + matches := map[*Args]string{ &Args{}: "field", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}}, }: "today", - &Args{ - "created": []string{"to*"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to*": true}}, }: "created", - &Args{ - "created": []string{"to(.*)"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to(.*)": true}}, }: "created", - &Args{ - "created": []string{"tod"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tod": true}}, }: "created", - &Args{ - "created": []string{"anything", "to*"}, - }: "created", - } - differs := map[*Args]string{ - &Args{ - "created": []string{"tomorrow"}, - }: "created", - &Args{ - "created": []string{"to(day"}, - }: "created", - &Args{ - "created": []string{"tom(.*)"}, - }: "created", - &Args{ - "created": []string{"today1"}, - "labels": []string{"today"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"anyting": true, "to*": true}}, }: "created", } + for args, field := range matches { if args.Match(field, source) != true { t.Fatalf("Expected true for %v on %v, got false", source, args) } } + + differs := map[*Args]string{ + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tomorrow": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to(day": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tom(.*)": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tom": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today1": true}, + "labels": map[string]bool{"today": true}}, + }: "created", + } + for args, field := range differs { if args.Match(field, source) != false { t.Fatalf("Expected false for %v on %v, got true", source, args) } } } + +func TestAdd(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + v := f.fields["status"] + if len(v) != 1 || !v["running"] { + t.Fatalf("Expected to include a running status, got %v", v) + } + + f.Add("status", "paused") + if len(v) != 2 || !v["paused"] { + t.Fatalf("Expected to include a paused status, got %v", v) + } +} + +func TestDel(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Del("status", "running") + v := f.fields["status"] + if v["running"] { + t.Fatalf("Expected to not include a running status filter, got true") + } +} + +func TestLen(t *testing.T) { + f := NewArgs() + if f.Len() != 0 { + t.Fatalf("Expected to not include any field") + } + f.Add("status", "running") + if f.Len() != 1 { + t.Fatalf("Expected to include one field") + } +} + +func TestExactMatch(t *testing.T) { + f := NewArgs() + + if !f.ExactMatch("status", "running") { + t.Fatalf("Expected to match `running` when there are no filters, got false") + } + + f.Add("status", "running") + f.Add("status", "pause*") + + if !f.ExactMatch("status", "running") { + t.Fatalf("Expected to match `running` with one of the filters, got false") + } + + if f.ExactMatch("status", "paused") { + t.Fatalf("Expected to not match `paused` with one of the filters, got true") + } +} + +func TestInclude(t *testing.T) { + f := NewArgs() + if f.Include("status") { + t.Fatalf("Expected to not include a status key, got true") + } + f.Add("status", "running") + if !f.Include("status") { + t.Fatalf("Expected to include a status key, got false") + } +} + +func TestValidate(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + + valid := map[string]bool{ + "status": true, + "dangling": true, + } + + if err := f.Validate(valid); err != nil { + t.Fatal(err) + } + + f.Add("bogus", "running") + if err := f.Validate(valid); err == nil { + t.Fatalf("Expected to return an error, got nil") + } +} + +func TestWalkValues(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Add("status", "paused") + + f.WalkValues("status", func(value string) error { + if value != "running" && value != "paused" { + t.Fatalf("Unexpected value %s", value) + } + return nil + }) + + err := f.WalkValues("status", func(value string) error { + return fmt.Errorf("return") + }) + if err == nil { + t.Fatalf("Expected to get an error, got nil") + } + + err = f.WalkValues("foo", func(value string) error { + return fmt.Errorf("return") + }) + if err != nil { + t.Fatalf("Expected to not iterate when the field doesn't exist, got %v", err) + } +} From a72c316bf4661347de25d0b731e404261c47393a Mon Sep 17 00:00:00 2001 From: David Calavera Date: Wed, 25 Nov 2015 21:03:10 -0500 Subject: [PATCH 2/2] Add PubSub topics. A TopicFunc is an interface to let the pubisher decide whether it needs to send a message to a subscriber or not. It returns true if the publisher must send the message and false otherwise. Users of the pubsub package can create a subscriber with a topic function by calling `pubsub.SubscribeTopic`. Message delivery has also been modified to use concurrent channels per subscriber. That way, topic verification and message delivery is not o(N+M) anymore, based on the number of subscribers and topic verification complexity. Using pubsub topics, the API stops controlling the message delivery, delegating that function to a topic generated with the filtering provided by the user. The publisher sends every message to the subscriber if there is no filter, but the api doesn't have to select messages to return anymore. Signed-off-by: David Calavera --- pubsub/publisher.go | 52 +++++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/pubsub/publisher.go b/pubsub/publisher.go index ab457cf..8529ffa 100644 --- a/pubsub/publisher.go +++ b/pubsub/publisher.go @@ -13,11 +13,12 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, - subscribers: make(map[subscriber]struct{}), + subscribers: make(map[subscriber]topicFunc), } } type subscriber chan interface{} +type topicFunc func(v interface{}) bool // Publisher is basic pub/sub structure. Allows to send events and subscribe // to them. Can be safely used from multiple goroutines. @@ -25,7 +26,7 @@ type Publisher struct { m sync.RWMutex buffer int timeout time.Duration - subscribers map[subscriber]struct{} + subscribers map[subscriber]topicFunc } // Len returns the number of subscribers for the publisher @@ -38,9 +39,14 @@ func (p *Publisher) Len() int { // Subscribe adds a new subscriber to the publisher returning the channel. func (p *Publisher) Subscribe() chan interface{} { + return p.SubscribeTopic(nil) +} + +// SubscribeTopic adds a new subscriber that filters messages sent by a topic. +func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() - p.subscribers[ch] = struct{}{} + p.subscribers[ch] = topic p.m.Unlock() return ch } @@ -56,20 +62,13 @@ func (p *Publisher) Evict(sub chan interface{}) { // Publish sends the data in v to all subscribers currently registered with the publisher. func (p *Publisher) Publish(v interface{}) { p.m.RLock() - for sub := range p.subscribers { - // send under a select as to not block if the receiver is unavailable - if p.timeout > 0 { - select { - case sub <- v: - case <-time.After(p.timeout): - } - continue - } - select { - case sub <- v: - default: - } + wg := new(sync.WaitGroup) + for sub, topic := range p.subscribers { + wg.Add(1) + + go p.sendTopic(sub, topic, v, wg) } + wg.Wait() p.m.RUnlock() } @@ -82,3 +81,24 @@ func (p *Publisher) Close() { } p.m.Unlock() } + +func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { + defer wg.Done() + if topic != nil && !topic(v) { + return + } + + // send under a select as to not block if the receiver is unavailable + if p.timeout > 0 { + select { + case sub <- v: + case <-time.After(p.timeout): + } + return + } + + select { + case sub <- v: + default: + } +}