// Copyright 2017 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pubsub // TODO(jba): test keepalive // TODO(jba): test that expired messages are not kept alive // TODO(jba): test that when all messages expire, Stop returns. import ( "io" "reflect" "strconv" "testing" "time" tspb "github.com/golang/protobuf/ptypes/timestamp" "golang.org/x/net/context" "google.golang.org/api/iterator" "google.golang.org/api/option" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) var ( timestamp = &tspb.Timestamp{} testMessages = []*pb.ReceivedMessage{ {AckId: "1", Message: &pb.PubsubMessage{Data: []byte{1}, PublishTime: timestamp}}, {AckId: "2", Message: &pb.PubsubMessage{Data: []byte{2}, PublishTime: timestamp}}, {AckId: "3", Message: &pb.PubsubMessage{Data: []byte{3}, PublishTime: timestamp}}, } ) func TestStreamingPullBasic(t *testing.T) { client, server := newFake(t) server.addStreamingPullMessages(testMessages) testStreamingPullIteration(t, client, server, testMessages) } func TestStreamingPullMultipleFetches(t *testing.T) { client, server := newFake(t) server.addStreamingPullMessages(testMessages[:1]) server.addStreamingPullMessages(testMessages[1:]) testStreamingPullIteration(t, client, server, testMessages) } func testStreamingPullIteration(t *testing.T, client *Client, server *fakeServer, msgs []*pb.ReceivedMessage) { if !useStreamingPull { t.SkipNow() } sub := client.Subscription("s") iter, err := sub.Pull(context.Background()) if err != nil { t.Fatal(err) } for i := 0; i < len(msgs); i++ { got, err := iter.Next() if err != nil { t.Fatal(err) } got.Done(i%2 == 0) // ack evens, nack odds want, err := toMessage(msgs[i]) if err != nil { t.Fatal(err) } want.calledDone = true // Don't compare done; it's a function. got.done = nil if !reflect.DeepEqual(got, want) { t.Errorf("%d: got\n%#v\nwant\n%#v", i, got, want) } } iter.Stop() server.wait() for i := 0; i < len(msgs); i++ { id := msgs[i].AckId if i%2 == 0 { if !server.Acked[id] { t.Errorf("msg %q should have been acked but wasn't", id) } } else { if dl, ok := server.Deadlines[id]; !ok || dl != 0 { t.Errorf("msg %q should have been nacked but wasn't", id) } } } } func TestStreamingPullStop(t *testing.T) { if !useStreamingPull { t.SkipNow() } // After Stop is called, Next returns iterator.Done. client, server := newFake(t) server.addStreamingPullMessages(testMessages) sub := client.Subscription("s") iter, err := sub.Pull(context.Background()) if err != nil { t.Fatal(err) } msg, err := iter.Next() if err != nil { t.Fatal(err) } msg.Done(true) iter.Stop() // Next should always return the same error. for i := 0; i < 3; i++ { _, err = iter.Next() if want := iterator.Done; err != want { t.Fatalf("got <%v> %p, want <%v> %p", err, err, want, want) } } } func TestStreamingPullError(t *testing.T) { if !useStreamingPull { t.SkipNow() } client, server := newFake(t) server.addStreamingPullError(grpc.Errorf(codes.Internal, "")) sub := client.Subscription("s") iter, err := sub.Pull(context.Background()) if err != nil { t.Fatal(err) } // Next should always return the same error. for i := 0; i < 3; i++ { _, err = iter.Next() if want := codes.Internal; grpc.Code(err) != want { t.Fatalf("got <%v>, want code %v", err, want) } } } func TestStreamingPullCancel(t *testing.T) { if !useStreamingPull { t.SkipNow() } // Test that canceling the iterator's context behaves correctly. client, server := newFake(t) server.addStreamingPullMessages(testMessages) sub := client.Subscription("s") ctx, cancel := context.WithCancel(context.Background()) iter, err := sub.Pull(ctx) if err != nil { t.Fatal(err) } _, err = iter.Next() if err != nil { t.Fatal(err) } // Here we have one message read (but not acked), and two // in the iterator's buffer. cancel() // Further calls to Next will return Canceled. _, err = iter.Next() if got, want := err, context.Canceled; got != want { t.Errorf("got %v, want %v", got, want) } // Despite the unacked message, Stop will still return promptly. done := make(chan struct{}) go func() { iter.Stop() close(done) }() select { case <-done: case <-time.After(1 * time.Second): t.Fatal("iter.Stop timed out") } } func TestStreamingPullRetry(t *testing.T) { if !useStreamingPull { t.SkipNow() } // Check that we retry on io.EOF or Unavailable. client, server := newFake(t) server.addStreamingPullMessages(testMessages[:1]) server.addStreamingPullError(io.EOF) server.addStreamingPullError(io.EOF) server.addStreamingPullMessages(testMessages[1:2]) server.addStreamingPullError(grpc.Errorf(codes.Unavailable, "")) server.addStreamingPullError(grpc.Errorf(codes.Unavailable, "")) server.addStreamingPullMessages(testMessages[2:]) testStreamingPullIteration(t, client, server, testMessages) } func TestStreamingPullConcurrent(t *testing.T) { if !useStreamingPull { t.SkipNow() } newMsg := func(i int) *pb.ReceivedMessage { return &pb.ReceivedMessage{ AckId: strconv.Itoa(i), Message: &pb.PubsubMessage{Data: []byte{byte(i)}, PublishTime: timestamp}, } } // Multiple goroutines should be able to read from the same iterator. client, server := newFake(t) // Add a lot of messages, a few at a time, to make sure both threads get a chance. nMessages := 100 for i := 0; i < nMessages; i += 2 { server.addStreamingPullMessages([]*pb.ReceivedMessage{newMsg(i), newMsg(i + 1)}) } sub := client.Subscription("s") iter, err := sub.Pull(context.Background()) if err != nil { t.Fatal(err) } seenc := make(chan string) errc := make(chan error, 2) for i := 0; i < 2; i++ { go func() { for { msg, err := iter.Next() if err == iterator.Done { return } if err != nil { errc <- err return } // Must ack before sending to channel, or Stop may hang. msg.Done(true) seenc <- msg.ackID } }() } seen := map[string]bool{} for i := 0; i < nMessages; i++ { select { case err := <-errc: t.Fatal(err) case id := <-seenc: if seen[id] { t.Fatalf("duplicate ID %q", id) } seen[id] = true } } iter.Stop() if len(seen) != nMessages { t.Fatalf("got %d messages, want %d", len(seen), nMessages) } } func newFake(t *testing.T) (*Client, *fakeServer) { srv, err := newFakeServer() if err != nil { t.Fatal(err) } conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) if err != nil { t.Fatal(err) } client, err := NewClient(context.Background(), "projectID", option.WithGRPCConn(conn)) if err != nil { t.Fatal(err) } return client, srv }