325 lines
8.9 KiB
Go
325 lines
8.9 KiB
Go
|
// Copyright 2014 Google Inc. All Rights Reserved.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package pubsub
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"math/rand"
|
||
|
"reflect"
|
||
|
"sync"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"golang.org/x/net/context"
|
||
|
|
||
|
"cloud.google.com/go/internal/testutil"
|
||
|
"google.golang.org/api/iterator"
|
||
|
"google.golang.org/api/option"
|
||
|
)
|
||
|
|
||
|
const timeout = time.Minute * 10
|
||
|
const ackDeadline = time.Second * 10
|
||
|
|
||
|
const batchSize = 100
|
||
|
const batches = 100
|
||
|
|
||
|
// messageCounter keeps track of how many times a given message has been received.
|
||
|
type messageCounter struct {
|
||
|
mu sync.Mutex
|
||
|
counts map[string]int
|
||
|
// A value is sent to recv each time Inc is called.
|
||
|
recv chan struct{}
|
||
|
}
|
||
|
|
||
|
func (mc *messageCounter) Inc(msgID string) {
|
||
|
mc.mu.Lock()
|
||
|
mc.counts[msgID] += 1
|
||
|
mc.mu.Unlock()
|
||
|
mc.recv <- struct{}{}
|
||
|
}
|
||
|
|
||
|
// process pulls messages from an iterator and records them in mc.
|
||
|
func process(t *testing.T, it *MessageIterator, mc *messageCounter) {
|
||
|
for {
|
||
|
m, err := it.Next()
|
||
|
if err == iterator.Done {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
t.Errorf("unexpected err from iterator: %v", err)
|
||
|
return
|
||
|
}
|
||
|
mc.Inc(m.ID)
|
||
|
// Simulate time taken to process m, while continuing to process more messages.
|
||
|
go func() {
|
||
|
// Some messages will need to have their ack deadline extended due to this delay.
|
||
|
delay := rand.Intn(int(ackDeadline * 3))
|
||
|
time.After(time.Duration(delay))
|
||
|
m.Done(true)
|
||
|
}()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// newIter constructs a new MessageIterator.
|
||
|
func newIter(t *testing.T, ctx context.Context, sub *Subscription) *MessageIterator {
|
||
|
it, err := sub.Pull(ctx)
|
||
|
if err != nil {
|
||
|
t.Fatalf("error constructing iterator: %v", err)
|
||
|
}
|
||
|
return it
|
||
|
}
|
||
|
|
||
|
// launchIter launches a number of goroutines to pull from the supplied MessageIterator.
|
||
|
func launchIter(t *testing.T, ctx context.Context, it *MessageIterator, mc *messageCounter, n int, wg *sync.WaitGroup) {
|
||
|
for j := 0; j < n; j++ {
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
process(t, it, mc)
|
||
|
}()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// iteratorLifetime controls how long iterators live for before they are stopped.
|
||
|
type iteratorLifetimes interface {
|
||
|
// lifetimeChan should be called when an iterator is started. The
|
||
|
// returned channel will send when the iterator should be stopped.
|
||
|
lifetimeChan() <-chan time.Time
|
||
|
}
|
||
|
|
||
|
var immortal = &explicitLifetimes{}
|
||
|
|
||
|
// explicitLifetimes implements iteratorLifetime with hard-coded lifetimes, falling back
|
||
|
// to indefinite lifetimes when no explicit lifetimes remain.
|
||
|
type explicitLifetimes struct {
|
||
|
mu sync.Mutex
|
||
|
lifetimes []time.Duration
|
||
|
}
|
||
|
|
||
|
func (el *explicitLifetimes) lifetimeChan() <-chan time.Time {
|
||
|
el.mu.Lock()
|
||
|
defer el.mu.Unlock()
|
||
|
if len(el.lifetimes) == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
lifetime := el.lifetimes[0]
|
||
|
el.lifetimes = el.lifetimes[1:]
|
||
|
return time.After(lifetime)
|
||
|
}
|
||
|
|
||
|
// consumer consumes messages according to its configuration.
|
||
|
type consumer struct {
|
||
|
// How many goroutines should pull from the subscription.
|
||
|
iteratorsInFlight int
|
||
|
// How many goroutines should pull from each iterator.
|
||
|
concurrencyPerIterator int
|
||
|
|
||
|
lifetimes iteratorLifetimes
|
||
|
}
|
||
|
|
||
|
// consume reads messages from a subscription, and keeps track of what it receives in mc.
|
||
|
// After consume returns, the caller should wait on wg to ensure that no more updates to mc will be made.
|
||
|
func (c *consumer) consume(t *testing.T, ctx context.Context, sub *Subscription, mc *messageCounter, wg *sync.WaitGroup, stop <-chan struct{}) {
|
||
|
for i := 0; i < c.iteratorsInFlight; i++ {
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
for {
|
||
|
it := newIter(t, ctx, sub)
|
||
|
launchIter(t, ctx, it, mc, c.concurrencyPerIterator, wg)
|
||
|
|
||
|
select {
|
||
|
case <-c.lifetimes.lifetimeChan():
|
||
|
it.Stop()
|
||
|
case <-stop:
|
||
|
it.Stop()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// publish publishes many messages to topic, and returns the published message ids.
|
||
|
func publish(t *testing.T, ctx context.Context, topic *Topic) []string {
|
||
|
var published []string
|
||
|
msgs := make([]*Message, batchSize)
|
||
|
for i := 0; i < batches; i++ {
|
||
|
for j := 0; j < batchSize; j++ {
|
||
|
text := fmt.Sprintf("msg %02d-%02d", i, j)
|
||
|
msgs[j] = &Message{Data: []byte(text)}
|
||
|
}
|
||
|
ids, err := topic.Publish(ctx, msgs...)
|
||
|
if err != nil {
|
||
|
t.Errorf("Publish error: %v", err)
|
||
|
}
|
||
|
published = append(published, ids...)
|
||
|
}
|
||
|
return published
|
||
|
}
|
||
|
|
||
|
// diff returns counts of the differences between got and want.
|
||
|
func diff(got, want map[string]int) map[string]int {
|
||
|
ids := make(map[string]struct{})
|
||
|
for k := range got {
|
||
|
ids[k] = struct{}{}
|
||
|
}
|
||
|
for k := range want {
|
||
|
ids[k] = struct{}{}
|
||
|
}
|
||
|
|
||
|
gotWantCount := make(map[string]int)
|
||
|
for k := range ids {
|
||
|
if got[k] == want[k] {
|
||
|
continue
|
||
|
}
|
||
|
desc := fmt.Sprintf("<got: %v ; want: %v>", got[k], want[k])
|
||
|
gotWantCount[desc] += 1
|
||
|
}
|
||
|
return gotWantCount
|
||
|
}
|
||
|
|
||
|
// TestEndToEnd pumps many messages into a topic and tests that they are all delivered to each subscription for the topic.
|
||
|
// It also tests that messages are not unexpectedly redelivered.
|
||
|
func TestEndToEnd(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Integration tests skipped in short mode")
|
||
|
}
|
||
|
ctx := context.Background()
|
||
|
ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
|
||
|
if ts == nil {
|
||
|
t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
|
||
|
}
|
||
|
|
||
|
now := time.Now()
|
||
|
topicName := fmt.Sprintf("endtoend-%d", now.Unix())
|
||
|
subPrefix := fmt.Sprintf("endtoend-%d", now.Unix())
|
||
|
|
||
|
client, err := NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
|
||
|
if err != nil {
|
||
|
t.Fatalf("Creating client error: %v", err)
|
||
|
}
|
||
|
|
||
|
var topic *Topic
|
||
|
if topic, err = client.CreateTopic(ctx, topicName); err != nil {
|
||
|
t.Fatalf("CreateTopic error: %v", err)
|
||
|
}
|
||
|
defer topic.Delete(ctx)
|
||
|
|
||
|
// Three subscriptions to the same topic.
|
||
|
var subA, subB, subC *Subscription
|
||
|
if subA, err = client.CreateSubscription(ctx, subPrefix+"-a", topic, ackDeadline, nil); err != nil {
|
||
|
t.Fatalf("CreateSub error: %v", err)
|
||
|
}
|
||
|
defer subA.Delete(ctx)
|
||
|
|
||
|
if subB, err = client.CreateSubscription(ctx, subPrefix+"-b", topic, ackDeadline, nil); err != nil {
|
||
|
t.Fatalf("CreateSub error: %v", err)
|
||
|
}
|
||
|
defer subB.Delete(ctx)
|
||
|
|
||
|
if subC, err = client.CreateSubscription(ctx, subPrefix+"-c", topic, ackDeadline, nil); err != nil {
|
||
|
t.Fatalf("CreateSub error: %v", err)
|
||
|
}
|
||
|
defer subC.Delete(ctx)
|
||
|
|
||
|
expectedCounts := make(map[string]int)
|
||
|
for _, id := range publish(t, ctx, topic) {
|
||
|
expectedCounts[id] = 1
|
||
|
}
|
||
|
|
||
|
// recv provides an indication that messages are still arriving.
|
||
|
recv := make(chan struct{})
|
||
|
|
||
|
// Keep track of the number of times each message (by message id) was
|
||
|
// seen from each subscription.
|
||
|
mcA := &messageCounter{counts: make(map[string]int), recv: recv}
|
||
|
mcB := &messageCounter{counts: make(map[string]int), recv: recv}
|
||
|
mcC := &messageCounter{counts: make(map[string]int), recv: recv}
|
||
|
|
||
|
stopC := make(chan struct{})
|
||
|
|
||
|
// We have three subscriptions to our topic.
|
||
|
// Each subscription will get a copy of each pulished message.
|
||
|
//
|
||
|
// subA has just one iterator, while subB has two. The subB iterators
|
||
|
// will each process roughly half of the messages for subB. All of
|
||
|
// these iterators live until all messages have been consumed. subC is
|
||
|
// processed by a series of short-lived iterators.
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
|
||
|
con := &consumer{
|
||
|
concurrencyPerIterator: 1,
|
||
|
iteratorsInFlight: 2,
|
||
|
lifetimes: immortal,
|
||
|
}
|
||
|
con.consume(t, ctx, subA, mcA, &wg, stopC)
|
||
|
|
||
|
con = &consumer{
|
||
|
concurrencyPerIterator: 1,
|
||
|
iteratorsInFlight: 2,
|
||
|
lifetimes: immortal,
|
||
|
}
|
||
|
con.consume(t, ctx, subB, mcB, &wg, stopC)
|
||
|
|
||
|
con = &consumer{
|
||
|
concurrencyPerIterator: 1,
|
||
|
iteratorsInFlight: 2,
|
||
|
lifetimes: &explicitLifetimes{
|
||
|
lifetimes: []time.Duration{ackDeadline, ackDeadline, ackDeadline / 2, ackDeadline / 2},
|
||
|
},
|
||
|
}
|
||
|
con.consume(t, ctx, subC, mcC, &wg, stopC)
|
||
|
|
||
|
go func() {
|
||
|
timeoutC := time.After(timeout)
|
||
|
// Every time this ticker ticks, we will check if we have received any
|
||
|
// messages since the last time it ticked. We check less frequently
|
||
|
// than the ack deadline, so that we can detect if messages are
|
||
|
// redelivered after having their ack deadline extended.
|
||
|
checkQuiescence := time.NewTicker(ackDeadline * 3)
|
||
|
defer checkQuiescence.Stop()
|
||
|
|
||
|
var received bool
|
||
|
for {
|
||
|
select {
|
||
|
case <-recv:
|
||
|
received = true
|
||
|
case <-checkQuiescence.C:
|
||
|
if received {
|
||
|
received = false
|
||
|
} else {
|
||
|
close(stopC)
|
||
|
return
|
||
|
}
|
||
|
case <-timeoutC:
|
||
|
t.Errorf("timed out")
|
||
|
close(stopC)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
wg.Wait()
|
||
|
|
||
|
for _, mc := range []*messageCounter{mcA, mcB, mcC} {
|
||
|
if got, want := mc.counts, expectedCounts; !reflect.DeepEqual(got, want) {
|
||
|
t.Errorf("message counts: %v\n", diff(got, want))
|
||
|
}
|
||
|
}
|
||
|
}
|