Merge pull request #18266 from calavera/events_pub_sub
Event PubSub topics + linear filtering.
This commit is contained in:
commit
de5a23ebff
3 changed files with 388 additions and 128 deletions
|
@ -5,6 +5,7 @@ package filters
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
@ -15,7 +16,14 @@ import (
|
||||||
// in an slice.
|
// in an slice.
|
||||||
// e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu'
|
// e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu'
|
||||||
// the args will be {'label': {'label1=1','label2=2'}, 'image.name', {'ubuntu'}}
|
// the args will be {'label': {'label1=1','label2=2'}, 'image.name', {'ubuntu'}}
|
||||||
type Args map[string][]string
|
type Args struct {
|
||||||
|
fields map[string]map[string]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewArgs initializes a new Args struct.
|
||||||
|
func NewArgs() Args {
|
||||||
|
return Args{fields: map[string]map[string]bool{}}
|
||||||
|
}
|
||||||
|
|
||||||
// ParseFlag parses the argument to the filter flag. Like
|
// ParseFlag parses the argument to the filter flag. Like
|
||||||
//
|
//
|
||||||
|
@ -25,9 +33,6 @@ type Args map[string][]string
|
||||||
// map is created.
|
// map is created.
|
||||||
func ParseFlag(arg string, prev Args) (Args, error) {
|
func ParseFlag(arg string, prev Args) (Args, error) {
|
||||||
filters := prev
|
filters := prev
|
||||||
if prev == nil {
|
|
||||||
filters = Args{}
|
|
||||||
}
|
|
||||||
if len(arg) == 0 {
|
if len(arg) == 0 {
|
||||||
return filters, nil
|
return filters, nil
|
||||||
}
|
}
|
||||||
|
@ -37,9 +42,11 @@ func ParseFlag(arg string, prev Args) (Args, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
f := strings.SplitN(arg, "=", 2)
|
f := strings.SplitN(arg, "=", 2)
|
||||||
|
|
||||||
name := strings.ToLower(strings.TrimSpace(f[0]))
|
name := strings.ToLower(strings.TrimSpace(f[0]))
|
||||||
value := strings.TrimSpace(f[1])
|
value := strings.TrimSpace(f[1])
|
||||||
filters[name] = append(filters[name], value)
|
|
||||||
|
filters.Add(name, value)
|
||||||
|
|
||||||
return filters, nil
|
return filters, nil
|
||||||
}
|
}
|
||||||
|
@ -50,11 +57,11 @@ var ErrBadFormat = errors.New("bad format of filter (expected name=value)")
|
||||||
// ToParam packs the Args into an string for easy transport from client to server.
|
// ToParam packs the Args into an string for easy transport from client to server.
|
||||||
func ToParam(a Args) (string, error) {
|
func ToParam(a Args) (string, error) {
|
||||||
// this way we don't URL encode {}, just empty space
|
// this way we don't URL encode {}, just empty space
|
||||||
if len(a) == 0 {
|
if a.Len() == 0 {
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
buf, err := json.Marshal(a)
|
buf, err := json.Marshal(a.fields)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -63,23 +70,71 @@ func ToParam(a Args) (string, error) {
|
||||||
|
|
||||||
// FromParam unpacks the filter Args.
|
// FromParam unpacks the filter Args.
|
||||||
func FromParam(p string) (Args, error) {
|
func FromParam(p string) (Args, error) {
|
||||||
args := Args{}
|
|
||||||
if len(p) == 0 {
|
if len(p) == 0 {
|
||||||
return args, nil
|
return NewArgs(), nil
|
||||||
}
|
}
|
||||||
if err := json.NewDecoder(strings.NewReader(p)).Decode(&args); err != nil {
|
|
||||||
return nil, err
|
r := strings.NewReader(p)
|
||||||
|
d := json.NewDecoder(r)
|
||||||
|
|
||||||
|
m := map[string]map[string]bool{}
|
||||||
|
if err := d.Decode(&m); err != nil {
|
||||||
|
r.Seek(0, 0)
|
||||||
|
|
||||||
|
// Allow parsing old arguments in slice format.
|
||||||
|
// Because other libraries might be sending them in this format.
|
||||||
|
deprecated := map[string][]string{}
|
||||||
|
if deprecatedErr := d.Decode(&deprecated); deprecatedErr == nil {
|
||||||
|
m = deprecatedArgs(deprecated)
|
||||||
|
} else {
|
||||||
|
return NewArgs(), err
|
||||||
}
|
}
|
||||||
return args, nil
|
}
|
||||||
|
return Args{m}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the list of values associates with a field.
|
||||||
|
// It returns a slice of strings to keep backwards compatibility with old code.
|
||||||
|
func (filters Args) Get(field string) []string {
|
||||||
|
values := filters.fields[field]
|
||||||
|
if values == nil {
|
||||||
|
return make([]string, 0)
|
||||||
|
}
|
||||||
|
slice := make([]string, 0, len(values))
|
||||||
|
for key := range values {
|
||||||
|
slice = append(slice, key)
|
||||||
|
}
|
||||||
|
return slice
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add adds a new value to a filter field.
|
||||||
|
func (filters Args) Add(name, value string) {
|
||||||
|
if _, ok := filters.fields[name]; ok {
|
||||||
|
filters.fields[name][value] = true
|
||||||
|
} else {
|
||||||
|
filters.fields[name] = map[string]bool{value: true}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Del removes a value from a filter field.
|
||||||
|
func (filters Args) Del(name, value string) {
|
||||||
|
if _, ok := filters.fields[name]; ok {
|
||||||
|
delete(filters.fields[name], value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len returns the number of fields in the arguments.
|
||||||
|
func (filters Args) Len() int {
|
||||||
|
return len(filters.fields)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MatchKVList returns true if the values for the specified field maches the ones
|
// MatchKVList returns true if the values for the specified field maches the ones
|
||||||
// from the sources.
|
// from the sources.
|
||||||
// e.g. given Args are {'label': {'label1=1','label2=1'}, 'image.name', {'ubuntu'}},
|
// e.g. given Args are {'label': {'label1=1','label2=1'}, 'image.name', {'ubuntu'}},
|
||||||
// field is 'label' and sources are {'label':{'label1=1','label2=2','label3=3'}}
|
// field is 'label' and sources are {'label1': '1', 'label2': '2'}
|
||||||
// it returns true.
|
// it returns true.
|
||||||
func (filters Args) MatchKVList(field string, sources map[string]string) bool {
|
func (filters Args) MatchKVList(field string, sources map[string]string) bool {
|
||||||
fieldValues := filters[field]
|
fieldValues := filters.fields[field]
|
||||||
|
|
||||||
//do not filter if there is no filter set or cannot determine filter
|
//do not filter if there is no filter set or cannot determine filter
|
||||||
if len(fieldValues) == 0 {
|
if len(fieldValues) == 0 {
|
||||||
|
@ -90,22 +145,17 @@ func (filters Args) MatchKVList(field string, sources map[string]string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
outer:
|
for name2match := range fieldValues {
|
||||||
for _, name2match := range fieldValues {
|
|
||||||
testKV := strings.SplitN(name2match, "=", 2)
|
testKV := strings.SplitN(name2match, "=", 2)
|
||||||
|
|
||||||
for k, v := range sources {
|
v, ok := sources[testKV[0]]
|
||||||
if len(testKV) == 1 {
|
if !ok {
|
||||||
if k == testKV[0] {
|
|
||||||
continue outer
|
|
||||||
}
|
|
||||||
} else if k == testKV[0] && v == testKV[1] {
|
|
||||||
continue outer
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
if len(testKV) == 2 && testKV[1] != v {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -115,13 +165,12 @@ outer:
|
||||||
// field is 'image.name' and source is 'ubuntu'
|
// field is 'image.name' and source is 'ubuntu'
|
||||||
// it returns true.
|
// it returns true.
|
||||||
func (filters Args) Match(field, source string) bool {
|
func (filters Args) Match(field, source string) bool {
|
||||||
fieldValues := filters[field]
|
if filters.ExactMatch(field, source) {
|
||||||
|
|
||||||
//do not filter if there is no filter set or cannot determine filter
|
|
||||||
if len(fieldValues) == 0 {
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
for _, name2match := range fieldValues {
|
|
||||||
|
fieldValues := filters.fields[field]
|
||||||
|
for name2match := range fieldValues {
|
||||||
match, err := regexp.MatchString(name2match, source)
|
match, err := regexp.MatchString(name2match, source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
|
@ -132,3 +181,61 @@ func (filters Args) Match(field, source string) bool {
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExactMatch returns true if the source matches exactly one of the filters.
|
||||||
|
func (filters Args) ExactMatch(field, source string) bool {
|
||||||
|
fieldValues, ok := filters.fields[field]
|
||||||
|
//do not filter if there is no filter set or cannot determine filter
|
||||||
|
if !ok || len(fieldValues) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to march full name value to avoid O(N) regular expression matching
|
||||||
|
if fieldValues[source] {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Include returns true if the name of the field to filter is in the filters.
|
||||||
|
func (filters Args) Include(field string) bool {
|
||||||
|
_, ok := filters.fields[field]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate ensures that all the fields in the filter are valid.
|
||||||
|
// It returns an error as soon as it finds an invalid field.
|
||||||
|
func (filters Args) Validate(accepted map[string]bool) error {
|
||||||
|
for name := range filters.fields {
|
||||||
|
if !accepted[name] {
|
||||||
|
return fmt.Errorf("Invalid filter '%s'", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WalkValues iterates over the list of filtered values for a field.
|
||||||
|
// It stops the iteration if it finds an error and it returns that error.
|
||||||
|
func (filters Args) WalkValues(field string, op func(value string) error) error {
|
||||||
|
if _, ok := filters.fields[field]; !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for v := range filters.fields[field] {
|
||||||
|
if err := op(v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deprecatedArgs(d map[string][]string) map[string]map[string]bool {
|
||||||
|
m := map[string]map[string]bool{}
|
||||||
|
for k, v := range d {
|
||||||
|
values := map[string]bool{}
|
||||||
|
for _, vv := range v {
|
||||||
|
values[vv] = true
|
||||||
|
}
|
||||||
|
m[k] = values
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package filters
|
package filters
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@ func TestParseArgs(t *testing.T) {
|
||||||
"image.name=*untu",
|
"image.name=*untu",
|
||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
args = Args{}
|
args = NewArgs()
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
for i := range flagArgs {
|
for i := range flagArgs {
|
||||||
|
@ -22,10 +22,10 @@ func TestParseArgs(t *testing.T) {
|
||||||
t.Errorf("failed to parse %s: %s", flagArgs[i], err)
|
t.Errorf("failed to parse %s: %s", flagArgs[i], err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(args["created"]) != 1 {
|
if len(args.Get("created")) != 1 {
|
||||||
t.Errorf("failed to set this arg")
|
t.Errorf("failed to set this arg")
|
||||||
}
|
}
|
||||||
if len(args["image.name"]) != 2 {
|
if len(args.Get("image.name")) != 2 {
|
||||||
t.Errorf("the args should have collapsed")
|
t.Errorf("the args should have collapsed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ func TestParseArgsEdgeCase(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if args == nil || len(args) != 0 {
|
if args.Len() != 0 {
|
||||||
t.Fatalf("Expected an empty Args (map), got %v", args)
|
t.Fatalf("Expected an empty Args (map), got %v", args)
|
||||||
}
|
}
|
||||||
if args, err = ParseFlag("anything", args); err == nil || err != ErrBadFormat {
|
if args, err = ParseFlag("anything", args); err == nil || err != ErrBadFormat {
|
||||||
|
@ -45,10 +45,11 @@ func TestParseArgsEdgeCase(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestToParam(t *testing.T) {
|
func TestToParam(t *testing.T) {
|
||||||
a := Args{
|
fields := map[string]map[string]bool{
|
||||||
"created": []string{"today"},
|
"created": {"today": true},
|
||||||
"image.name": []string{"ubuntu*", "*untu"},
|
"image.name": {"ubuntu*": true, "*untu": true},
|
||||||
}
|
}
|
||||||
|
a := Args{fields: fields}
|
||||||
|
|
||||||
_, err := ToParam(a)
|
_, err := ToParam(a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -63,45 +64,51 @@ func TestFromParam(t *testing.T) {
|
||||||
"{'key': 'value'}",
|
"{'key': 'value'}",
|
||||||
`{"key": "value"}`,
|
`{"key": "value"}`,
|
||||||
}
|
}
|
||||||
valids := map[string]Args{
|
valid := map[*Args][]string{
|
||||||
`{"key": ["value"]}`: {
|
&Args{fields: map[string]map[string]bool{"key": {"value": true}}}: {
|
||||||
"key": {"value"},
|
`{"key": ["value"]}`,
|
||||||
|
`{"key": {"value": true}}`,
|
||||||
},
|
},
|
||||||
`{"key": ["value1", "value2"]}`: {
|
&Args{fields: map[string]map[string]bool{"key": {"value1": true, "value2": true}}}: {
|
||||||
"key": {"value1", "value2"},
|
`{"key": ["value1", "value2"]}`,
|
||||||
|
`{"key": {"value1": true, "value2": true}}`,
|
||||||
},
|
},
|
||||||
`{"key1": ["value1"], "key2": ["value2"]}`: {
|
&Args{fields: map[string]map[string]bool{"key1": {"value1": true}, "key2": {"value2": true}}}: {
|
||||||
"key1": {"value1"},
|
`{"key1": ["value1"], "key2": ["value2"]}`,
|
||||||
"key2": {"value2"},
|
`{"key1": {"value1": true}, "key2": {"value2": true}}`,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, invalid := range invalids {
|
for _, invalid := range invalids {
|
||||||
if _, err := FromParam(invalid); err == nil {
|
if _, err := FromParam(invalid); err == nil {
|
||||||
t.Fatalf("Expected an error with %v, got nothing", invalid)
|
t.Fatalf("Expected an error with %v, got nothing", invalid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for json, expectedArgs := range valids {
|
|
||||||
|
for expectedArgs, matchers := range valid {
|
||||||
|
for _, json := range matchers {
|
||||||
args, err := FromParam(json)
|
args, err := FromParam(json)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(args) != len(expectedArgs) {
|
if args.Len() != expectedArgs.Len() {
|
||||||
t.Fatalf("Expected %v, go %v", expectedArgs, args)
|
t.Fatalf("Expected %v, go %v", expectedArgs, args)
|
||||||
}
|
}
|
||||||
for key, expectedValues := range expectedArgs {
|
for key, expectedValues := range expectedArgs.fields {
|
||||||
values := args[key]
|
values := args.Get(key)
|
||||||
sort.Strings(values)
|
|
||||||
sort.Strings(expectedValues)
|
|
||||||
if len(values) != len(expectedValues) {
|
if len(values) != len(expectedValues) {
|
||||||
t.Fatalf("Expected %v, go %v", expectedArgs, args)
|
t.Fatalf("Expected %v, go %v", expectedArgs, args)
|
||||||
}
|
}
|
||||||
for index, expectedValue := range expectedValues {
|
|
||||||
if values[index] != expectedValue {
|
for _, v := range values {
|
||||||
|
if !expectedValues[v] {
|
||||||
t.Fatalf("Expected %v, go %v", expectedArgs, args)
|
t.Fatalf("Expected %v, go %v", expectedArgs, args)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEmpty(t *testing.T) {
|
func TestEmpty(t *testing.T) {
|
||||||
|
@ -114,54 +121,63 @@ func TestEmpty(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%s", err)
|
t.Errorf("%s", err)
|
||||||
}
|
}
|
||||||
if len(a) != len(v1) {
|
if a.Len() != v1.Len() {
|
||||||
t.Errorf("these should both be empty sets")
|
t.Errorf("these should both be empty sets")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestArgsMatchKVList(t *testing.T) {
|
func TestArgsMatchKVListEmptySources(t *testing.T) {
|
||||||
// empty sources
|
args := NewArgs()
|
||||||
args := Args{
|
if !args.MatchKVList("created", map[string]string{}) {
|
||||||
"created": []string{"today"},
|
t.Fatalf("Expected true for (%v,created), got true", args)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
args = Args{map[string]map[string]bool{"created": {"today": true}}}
|
||||||
if args.MatchKVList("created", map[string]string{}) {
|
if args.MatchKVList("created", map[string]string{}) {
|
||||||
t.Fatalf("Expected false for (%v,created), got true", args)
|
t.Fatalf("Expected false for (%v,created), got true", args)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArgsMatchKVList(t *testing.T) {
|
||||||
// Not empty sources
|
// Not empty sources
|
||||||
sources := map[string]string{
|
sources := map[string]string{
|
||||||
"key1": "value1",
|
"key1": "value1",
|
||||||
"key2": "value2",
|
"key2": "value2",
|
||||||
"key3": "value3",
|
"key3": "value3",
|
||||||
}
|
}
|
||||||
|
|
||||||
matches := map[*Args]string{
|
matches := map[*Args]string{
|
||||||
&Args{}: "field",
|
&Args{}: "field",
|
||||||
&Args{
|
&Args{map[string]map[string]bool{
|
||||||
"created": []string{"today"},
|
"created": map[string]bool{"today": true},
|
||||||
"labels": []string{"key1"},
|
"labels": map[string]bool{"key1": true}},
|
||||||
}: "labels",
|
}: "labels",
|
||||||
&Args{
|
&Args{map[string]map[string]bool{
|
||||||
"created": []string{"today"},
|
"created": map[string]bool{"today": true},
|
||||||
"labels": []string{"key1=value1"},
|
"labels": map[string]bool{"key1=value1": true}},
|
||||||
}: "labels",
|
|
||||||
}
|
|
||||||
differs := map[*Args]string{
|
|
||||||
&Args{
|
|
||||||
"created": []string{"today"},
|
|
||||||
}: "created",
|
|
||||||
&Args{
|
|
||||||
"created": []string{"today"},
|
|
||||||
"labels": []string{"key4"},
|
|
||||||
}: "labels",
|
|
||||||
&Args{
|
|
||||||
"created": []string{"today"},
|
|
||||||
"labels": []string{"key1=value3"},
|
|
||||||
}: "labels",
|
}: "labels",
|
||||||
}
|
}
|
||||||
|
|
||||||
for args, field := range matches {
|
for args, field := range matches {
|
||||||
if args.MatchKVList(field, sources) != true {
|
if args.MatchKVList(field, sources) != true {
|
||||||
t.Fatalf("Expected true for %v on %v, got false", sources, args)
|
t.Fatalf("Expected true for %v on %v, got false", sources, args)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
differs := map[*Args]string{
|
||||||
|
&Args{map[string]map[string]bool{
|
||||||
|
"created": map[string]bool{"today": true}},
|
||||||
|
}: "created",
|
||||||
|
&Args{map[string]map[string]bool{
|
||||||
|
"created": map[string]bool{"today": true},
|
||||||
|
"labels": map[string]bool{"key4": true}},
|
||||||
|
}: "labels",
|
||||||
|
&Args{map[string]map[string]bool{
|
||||||
|
"created": map[string]bool{"today": true},
|
||||||
|
"labels": map[string]bool{"key1=value3": true}},
|
||||||
|
}: "labels",
|
||||||
|
}
|
||||||
|
|
||||||
for args, field := range differs {
|
for args, field := range differs {
|
||||||
if args.MatchKVList(field, sources) != false {
|
if args.MatchKVList(field, sources) != false {
|
||||||
t.Fatalf("Expected false for %v on %v, got true", sources, args)
|
t.Fatalf("Expected false for %v on %v, got true", sources, args)
|
||||||
|
@ -171,48 +187,165 @@ func TestArgsMatchKVList(t *testing.T) {
|
||||||
|
|
||||||
func TestArgsMatch(t *testing.T) {
|
func TestArgsMatch(t *testing.T) {
|
||||||
source := "today"
|
source := "today"
|
||||||
|
|
||||||
matches := map[*Args]string{
|
matches := map[*Args]string{
|
||||||
&Args{}: "field",
|
&Args{}: "field",
|
||||||
&Args{
|
&Args{map[string]map[string]bool{
|
||||||
"created": []string{"today"},
|
"created": map[string]bool{"today": true}},
|
||||||
"labels": []string{"key1"},
|
|
||||||
}: "today",
|
}: "today",
|
||||||
&Args{
|
&Args{map[string]map[string]bool{
|
||||||
"created": []string{"to*"},
|
"created": map[string]bool{"to*": true}},
|
||||||
}: "created",
|
}: "created",
|
||||||
&Args{
|
&Args{map[string]map[string]bool{
|
||||||
"created": []string{"to(.*)"},
|
"created": map[string]bool{"to(.*)": true}},
|
||||||
}: "created",
|
}: "created",
|
||||||
&Args{
|
&Args{map[string]map[string]bool{
|
||||||
"created": []string{"tod"},
|
"created": map[string]bool{"tod": true}},
|
||||||
}: "created",
|
}: "created",
|
||||||
&Args{
|
&Args{map[string]map[string]bool{
|
||||||
"created": []string{"anything", "to*"},
|
"created": map[string]bool{"anyting": true, "to*": true}},
|
||||||
}: "created",
|
|
||||||
}
|
|
||||||
differs := map[*Args]string{
|
|
||||||
&Args{
|
|
||||||
"created": []string{"tomorrow"},
|
|
||||||
}: "created",
|
|
||||||
&Args{
|
|
||||||
"created": []string{"to(day"},
|
|
||||||
}: "created",
|
|
||||||
&Args{
|
|
||||||
"created": []string{"tom(.*)"},
|
|
||||||
}: "created",
|
|
||||||
&Args{
|
|
||||||
"created": []string{"today1"},
|
|
||||||
"labels": []string{"today"},
|
|
||||||
}: "created",
|
}: "created",
|
||||||
}
|
}
|
||||||
|
|
||||||
for args, field := range matches {
|
for args, field := range matches {
|
||||||
if args.Match(field, source) != true {
|
if args.Match(field, source) != true {
|
||||||
t.Fatalf("Expected true for %v on %v, got false", source, args)
|
t.Fatalf("Expected true for %v on %v, got false", source, args)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
differs := map[*Args]string{
|
||||||
|
&Args{map[string]map[string]bool{
|
||||||
|
"created": map[string]bool{"tomorrow": true}},
|
||||||
|
}: "created",
|
||||||
|
&Args{map[string]map[string]bool{
|
||||||
|
"created": map[string]bool{"to(day": true}},
|
||||||
|
}: "created",
|
||||||
|
&Args{map[string]map[string]bool{
|
||||||
|
"created": map[string]bool{"tom(.*)": true}},
|
||||||
|
}: "created",
|
||||||
|
&Args{map[string]map[string]bool{
|
||||||
|
"created": map[string]bool{"tom": true}},
|
||||||
|
}: "created",
|
||||||
|
&Args{map[string]map[string]bool{
|
||||||
|
"created": map[string]bool{"today1": true},
|
||||||
|
"labels": map[string]bool{"today": true}},
|
||||||
|
}: "created",
|
||||||
|
}
|
||||||
|
|
||||||
for args, field := range differs {
|
for args, field := range differs {
|
||||||
if args.Match(field, source) != false {
|
if args.Match(field, source) != false {
|
||||||
t.Fatalf("Expected false for %v on %v, got true", source, args)
|
t.Fatalf("Expected false for %v on %v, got true", source, args)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAdd(t *testing.T) {
|
||||||
|
f := NewArgs()
|
||||||
|
f.Add("status", "running")
|
||||||
|
v := f.fields["status"]
|
||||||
|
if len(v) != 1 || !v["running"] {
|
||||||
|
t.Fatalf("Expected to include a running status, got %v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
f.Add("status", "paused")
|
||||||
|
if len(v) != 2 || !v["paused"] {
|
||||||
|
t.Fatalf("Expected to include a paused status, got %v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDel(t *testing.T) {
|
||||||
|
f := NewArgs()
|
||||||
|
f.Add("status", "running")
|
||||||
|
f.Del("status", "running")
|
||||||
|
v := f.fields["status"]
|
||||||
|
if v["running"] {
|
||||||
|
t.Fatalf("Expected to not include a running status filter, got true")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLen(t *testing.T) {
|
||||||
|
f := NewArgs()
|
||||||
|
if f.Len() != 0 {
|
||||||
|
t.Fatalf("Expected to not include any field")
|
||||||
|
}
|
||||||
|
f.Add("status", "running")
|
||||||
|
if f.Len() != 1 {
|
||||||
|
t.Fatalf("Expected to include one field")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExactMatch(t *testing.T) {
|
||||||
|
f := NewArgs()
|
||||||
|
|
||||||
|
if !f.ExactMatch("status", "running") {
|
||||||
|
t.Fatalf("Expected to match `running` when there are no filters, got false")
|
||||||
|
}
|
||||||
|
|
||||||
|
f.Add("status", "running")
|
||||||
|
f.Add("status", "pause*")
|
||||||
|
|
||||||
|
if !f.ExactMatch("status", "running") {
|
||||||
|
t.Fatalf("Expected to match `running` with one of the filters, got false")
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.ExactMatch("status", "paused") {
|
||||||
|
t.Fatalf("Expected to not match `paused` with one of the filters, got true")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInclude(t *testing.T) {
|
||||||
|
f := NewArgs()
|
||||||
|
if f.Include("status") {
|
||||||
|
t.Fatalf("Expected to not include a status key, got true")
|
||||||
|
}
|
||||||
|
f.Add("status", "running")
|
||||||
|
if !f.Include("status") {
|
||||||
|
t.Fatalf("Expected to include a status key, got false")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidate(t *testing.T) {
|
||||||
|
f := NewArgs()
|
||||||
|
f.Add("status", "running")
|
||||||
|
|
||||||
|
valid := map[string]bool{
|
||||||
|
"status": true,
|
||||||
|
"dangling": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := f.Validate(valid); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
f.Add("bogus", "running")
|
||||||
|
if err := f.Validate(valid); err == nil {
|
||||||
|
t.Fatalf("Expected to return an error, got nil")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalkValues(t *testing.T) {
|
||||||
|
f := NewArgs()
|
||||||
|
f.Add("status", "running")
|
||||||
|
f.Add("status", "paused")
|
||||||
|
|
||||||
|
f.WalkValues("status", func(value string) error {
|
||||||
|
if value != "running" && value != "paused" {
|
||||||
|
t.Fatalf("Unexpected value %s", value)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
err := f.WalkValues("status", func(value string) error {
|
||||||
|
return fmt.Errorf("return")
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected to get an error, got nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = f.WalkValues("foo", func(value string) error {
|
||||||
|
return fmt.Errorf("return")
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected to not iterate when the field doesn't exist, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -13,11 +13,12 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
|
||||||
return &Publisher{
|
return &Publisher{
|
||||||
buffer: buffer,
|
buffer: buffer,
|
||||||
timeout: publishTimeout,
|
timeout: publishTimeout,
|
||||||
subscribers: make(map[subscriber]struct{}),
|
subscribers: make(map[subscriber]topicFunc),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type subscriber chan interface{}
|
type subscriber chan interface{}
|
||||||
|
type topicFunc func(v interface{}) bool
|
||||||
|
|
||||||
// Publisher is basic pub/sub structure. Allows to send events and subscribe
|
// Publisher is basic pub/sub structure. Allows to send events and subscribe
|
||||||
// to them. Can be safely used from multiple goroutines.
|
// to them. Can be safely used from multiple goroutines.
|
||||||
|
@ -25,7 +26,7 @@ type Publisher struct {
|
||||||
m sync.RWMutex
|
m sync.RWMutex
|
||||||
buffer int
|
buffer int
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
subscribers map[subscriber]struct{}
|
subscribers map[subscriber]topicFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len returns the number of subscribers for the publisher
|
// Len returns the number of subscribers for the publisher
|
||||||
|
@ -38,9 +39,14 @@ func (p *Publisher) Len() int {
|
||||||
|
|
||||||
// Subscribe adds a new subscriber to the publisher returning the channel.
|
// Subscribe adds a new subscriber to the publisher returning the channel.
|
||||||
func (p *Publisher) Subscribe() chan interface{} {
|
func (p *Publisher) Subscribe() chan interface{} {
|
||||||
|
return p.SubscribeTopic(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
|
||||||
|
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
|
||||||
ch := make(chan interface{}, p.buffer)
|
ch := make(chan interface{}, p.buffer)
|
||||||
p.m.Lock()
|
p.m.Lock()
|
||||||
p.subscribers[ch] = struct{}{}
|
p.subscribers[ch] = topic
|
||||||
p.m.Unlock()
|
p.m.Unlock()
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
@ -56,20 +62,13 @@ func (p *Publisher) Evict(sub chan interface{}) {
|
||||||
// Publish sends the data in v to all subscribers currently registered with the publisher.
|
// Publish sends the data in v to all subscribers currently registered with the publisher.
|
||||||
func (p *Publisher) Publish(v interface{}) {
|
func (p *Publisher) Publish(v interface{}) {
|
||||||
p.m.RLock()
|
p.m.RLock()
|
||||||
for sub := range p.subscribers {
|
wg := new(sync.WaitGroup)
|
||||||
// send under a select as to not block if the receiver is unavailable
|
for sub, topic := range p.subscribers {
|
||||||
if p.timeout > 0 {
|
wg.Add(1)
|
||||||
select {
|
|
||||||
case sub <- v:
|
go p.sendTopic(sub, topic, v, wg)
|
||||||
case <-time.After(p.timeout):
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case sub <- v:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
p.m.RUnlock()
|
p.m.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,3 +81,24 @@ func (p *Publisher) Close() {
|
||||||
}
|
}
|
||||||
p.m.Unlock()
|
p.m.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
|
||||||
|
defer wg.Done()
|
||||||
|
if topic != nil && !topic(v) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// send under a select as to not block if the receiver is unavailable
|
||||||
|
if p.timeout > 0 {
|
||||||
|
select {
|
||||||
|
case sub <- v:
|
||||||
|
case <-time.After(p.timeout):
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case sub <- v:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue