update vendor

Signed-off-by: Jess Frazelle <acidburn@microsoft.com>
Jess Frazelle 2018-09-25 12:27:46 -04:00
parent 19a32db84d
commit 94d1cfbfbf
No known key found for this signature in database
GPG key ID: 18F3685C0022BFF3
10501 changed files with 2307943 additions and 29279 deletions

vendor/google.golang.org/grpc/balancer/balancer.go generated vendored Normal file

@@ -0,0 +1,274 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package balancer defines APIs for load balancing in gRPC.
// All APIs in this package are experimental.
package balancer
import (
"errors"
"net"
"strings"
"golang.org/x/net/context"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/resolver"
)
var (
// m is a map from name to balancer builder.
m = make(map[string]Builder)
)
// Register registers the balancer builder to the balancer map. b.Name
// (lowercased) will be used as the name registered with this builder.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Balancers are
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
m[strings.ToLower(b.Name())] = b
}
// Get returns the balancer builder registered with the given name.
// Note that the comparison is done in a case-insensitive fashion.
// If no builder is registered with the name, nil will be returned.
func Get(name string) Builder {
if b, ok := m[strings.ToLower(name)]; ok {
return b
}
return nil
}
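// Illustrative sketch, not part of the vendored file: how a builder is
// typically registered and looked up. "fooBuilder", the policy name "foo",
// and "cc" (a ClientConn) are assumptions made only for this example.
//
//	func init() {
//		Register(&fooBuilder{}) // fooBuilder.Name() returns "foo"
//	}
//
//	// e.g. when a service config selects the "foo" policy:
//	if b := Get("FOO"); b != nil { // lookup is case-insensitive
//		_ = b.Build(cc, BuildOptions{})
//	}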
// SubConn represents a gRPC sub connection.
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
//
// The reconnect backoff will be applied on the list, not a single address.
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger
// the connecting, Balancers must call Connect.
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
// If it's in the list, the connection will be kept.
// If it's not in the list, the connection will be gracefully closed, and
// a new connection will be created.
//
// This will trigger a state transition for the SubConn.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
}
// NewSubConnOptions contains options to create new SubConn.
type NewSubConnOptions struct{}
// ClientConn represents a gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
// NewSubConn is called by balancer to create a new SubConn.
// It doesn't block and wait for the connections to be established.
// Behaviors of the SubConn can be controlled by options.
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
RemoveSubConn(SubConn)
// UpdateBalancerState is called by balancer to notify gRPC that some internal
// state in balancer has changed.
//
// gRPC will update the connectivity state of the ClientConn, and will call pick
// on the new picker to pick new SubConn.
UpdateBalancerState(s connectivity.State, p Picker)
// ResolveNow is called by balancer to notify gRPC to do a name resolving.
ResolveNow(resolver.ResolveNowOption)
// Target returns the dial target for this ClientConn.
Target() string
}
// BuildOptions contains additional information for Build.
type BuildOptions struct {
// DialCreds is the transport credential the Balancer implementation can
// use to dial to a remote load balancer server. The Balancer implementations
// can ignore this if it does not need to talk to another party securely.
DialCreds credentials.TransportCredentials
// Dialer is the custom dialer the Balancer implementation can use to dial
// to a remote load balancer server. The Balancer implementations
// can ignore this if it doesn't need to talk to remote balancer.
Dialer func(context.Context, string) (net.Conn, error)
// ChannelzParentID is the unique channelz identification number of the parent entity.
ChannelzParentID int64
}
// Builder creates a balancer.
type Builder interface {
// Build creates a new balancer with the ClientConn.
Build(cc ClientConn, opts BuildOptions) Balancer
// Name returns the name of balancers built by this builder.
// It will be used to pick balancers (for example in service config).
Name() string
}
// PickOptions contains additional information for the Pick operation.
type PickOptions struct {
// FullMethodName is the method name that NewClientStream() is called
// with. The canonical format is /service/Method.
FullMethodName string
}
// DoneInfo contains additional information for done.
type DoneInfo struct {
// Err is the error the RPC finished with. It could be nil.
Err error
// BytesSent indicates if any bytes have been sent to the server.
BytesSent bool
// BytesReceived indicates if any bytes have been received from the server.
BytesReceived bool
}
var (
// ErrNoSubConnAvailable indicates no SubConn is available for pick().
// gRPC will block the RPC until a new picker is available via UpdateBalancerState().
ErrNoSubConnAvailable = errors.New("no SubConn is available")
// ErrTransientFailure indicates all SubConns are in TransientFailure.
// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
)
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
type Picker interface {
// Pick returns the SubConn to be used to send the RPC.
// The returned SubConn must be one returned by NewSubConn().
//
// This function is expected to return:
// - a SubConn that is known to be READY;
// - ErrNoSubConnAvailable if no SubConn is available, but progress is being
// made (for example, some SubConn is in CONNECTING mode);
// - other errors if no active connecting is happening (for example, all SubConns
// are in TRANSIENT_FAILURE mode).
//
// If a SubConn is returned:
// - If it is READY, gRPC will send the RPC on it;
// - If it is not ready, or becomes not ready after it's returned, gRPC will block
// until UpdateBalancerState() is called and will call pick on the new picker.
//
// If the returned error is not nil:
// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
//   is called to pick again;
// - If the error is ErrTransientFailure:
// - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
// is called to pick again;
// - Otherwise, RPC will fail with unavailable error.
// - Else (error is other non-nil error):
// - The RPC will fail with unavailable error.
//
// The returned done() function will be called once the RPC has finished, with the
// final status of that RPC.
// done may be nil if the balancer doesn't care about the RPC status.
Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
}
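// Illustrative sketch, not part of the vendored file: a minimal round-robin
// picker satisfying the contract above. Pick may be called concurrently, so
// the index is guarded by a mutex. The name rrPickerSketch is an assumption
// made only for this example, and the "sync" import is assumed.
//
//	type rrPickerSketch struct {
//		mu   sync.Mutex
//		scs  []SubConn // READY SubConns snapshotted by the balancer
//		next int
//	}
//
//	func (p *rrPickerSketch) Pick(ctx context.Context, opts PickOptions) (SubConn, func(DoneInfo), error) {
//		p.mu.Lock()
//		defer p.mu.Unlock()
//		if len(p.scs) == 0 {
//			return nil, nil, ErrNoSubConnAvailable
//		}
//		sc := p.scs[p.next]
//		p.next = (p.next + 1) % len(p.scs)
//		return sc, nil, nil // done is nil: this picker ignores RPC completion
//	}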
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
// HandleSubConnStateChange is called by gRPC when the connectivity state
// of sc has changed.
// Balancer is expected to aggregate all the state of SubConn and report
// that back to gRPC.
// Balancer should also generate and update Pickers when its internal state has
// been changed by the new state.
HandleSubConnStateChange(sc SubConn, state connectivity.State)
// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
// balancers.
// Balancer can create new SubConn or remove SubConn with the addresses.
// An empty address slice and a non-nil error will be passed if the resolver returns
// non-nil error to gRPC.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
// RecordTransition records a state change of a SubConn and, based on that,
// evaluates what the aggregated state should be.
//
// - If at least one SubConn is in Ready, the aggregated state is Ready;
// - Else if at least one SubConn is in Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
//
// Idle and Shutdown are not considered.
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}
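// Illustrative sketch, not part of the vendored file: how the aggregation
// rules above play out as a single SubConn moves through its states.
//
//	cse := &ConnectivityStateEvaluator{}
//	cse.RecordTransition(connectivity.Idle, connectivity.Connecting)        // returns Connecting
//	cse.RecordTransition(connectivity.Connecting, connectivity.Ready)       // returns Ready
//	cse.RecordTransition(connectivity.Ready, connectivity.TransientFailure) // returns TransientFailure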

vendor/google.golang.org/grpc/balancer/base/balancer.go generated vendored Normal file

@@ -0,0 +1,208 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package base
import (
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
type baseBuilder struct {
name string
pickerBuilder PickerBuilder
}
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &connectivityStateEvaluator{},
// Initialize picker to a picker that always returns
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
// may call UpdateBalancerState with this picker.
picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
}
}
func (bb *baseBuilder) Name() string {
return bb.name
}
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
csEvltr *connectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
}
func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
grpclog.Infof("base.baseBalancer: HandleResolvedAddrs called with error %v", err)
return
}
grpclog.Infoln("base.baseBalancer: got new resolved addresses: ", addrs)
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[a] = sc
b.scStates[sc] = connectivity.Idle
sc.Connect()
}
}
for a, sc := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(sc)
delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
}
}
}
// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker is
// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
// - built by the pickerBuilder with all READY SubConns otherwise.
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = NewErrPicker(balancer.ErrTransientFailure)
return
}
readySCs := make(map[resolver.Address]balancer.SubConn)
// Collect all ready SCs from the full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[addr] = sc
}
}
b.picker = b.pickerBuilder.Build(readySCs)
}
func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
oldS, ok := b.scStates[sc]
if !ok {
grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
return
}
b.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}
oldAggrState := b.state
b.state = b.csEvltr.recordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
b.regeneratePicker()
}
b.cc.UpdateBalancerState(b.state, b.picker)
}
// Close is a no-op because the base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (b *baseBalancer) Close() {
}
// NewErrPicker returns a picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}
}
type errPicker struct {
err error // Pick() always returns this err.
}
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
// connectivityStateEvaluator gets updated by addrConns when their
// states transition, based on which it evaluates the state of
// ClientConn.
type connectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
// recordTransition records a state change of a subConn and, based on that,
// evaluates what the aggregated state should be.
// It can only transition between Ready, Connecting and TransientFailure. The other states,
// Idle and Shutdown, are transitioned into by ClientConn: at the beginning of the connection,
// before any subConn is created, ClientConn is in Idle state; at the end, when ClientConn
// closes, it is in Shutdown state.
//
// recordTransition should only be called synchronously from the same goroutine.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}

vendor/google.golang.org/grpc/balancer/base/base.go generated vendored Normal file

@@ -0,0 +1,52 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package base defines a balancer base that can be used to build balancers with
// different picking algorithms.
//
// The base balancer creates a new SubConn for each resolved address. The
// provided picker will only be notified about READY SubConns.
//
// This package is the base of the round_robin balancer; its purpose is to be used
// to build round_robin-like balancers with complex picking algorithms.
// Balancers with more complicated logic should try to implement a balancer
// builder from scratch.
//
// All APIs in this package are experimental.
package base
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
// Build takes a slice of ready SubConns, and returns a picker that will be
// used by gRPC to pick a SubConn.
Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
}
}
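// Illustrative sketch, not part of the vendored file: wiring a custom picking
// algorithm into the base balancer. The names firstReadyPickerBuilder,
// firstReadyPicker and the policy name "first_ready" are assumptions made only
// for this example (round_robin is wired up the same way); the context import
// is assumed.
//
//	type firstReadyPickerBuilder struct{}
//
//	func (firstReadyPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
//		for _, sc := range readySCs {
//			return &firstReadyPicker{sc: sc} // pick any one READY SubConn
//		}
//		return NewErrPicker(balancer.ErrNoSubConnAvailable)
//	}
//
//	type firstReadyPicker struct{ sc balancer.SubConn }
//
//	func (p *firstReadyPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
//		return p.sc, nil, nil
//	}
//
//	func init() {
//		balancer.Register(NewBalancerBuilder("first_ready", firstReadyPickerBuilder{}))
//	}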

vendor/google.golang.org/grpc/balancer/grpclb/grpc_lb_v1/load_balancer.pb.go generated vendored Normal file

@@ -0,0 +1,839 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: grpc/lb/v1/load_balancer.proto
package grpc_lb_v1 // import "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import duration "github.com/golang/protobuf/ptypes/duration"
import timestamp "github.com/golang/protobuf/ptypes/timestamp"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type LoadBalanceRequest struct {
// Types that are valid to be assigned to LoadBalanceRequestType:
// *LoadBalanceRequest_InitialRequest
// *LoadBalanceRequest_ClientStats
LoadBalanceRequestType isLoadBalanceRequest_LoadBalanceRequestType `protobuf_oneof:"load_balance_request_type"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadBalanceRequest) Reset() { *m = LoadBalanceRequest{} }
func (m *LoadBalanceRequest) String() string { return proto.CompactTextString(m) }
func (*LoadBalanceRequest) ProtoMessage() {}
func (*LoadBalanceRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_load_balancer_12026aec3f0251ba, []int{0}
}
func (m *LoadBalanceRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadBalanceRequest.Unmarshal(m, b)
}
func (m *LoadBalanceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LoadBalanceRequest.Marshal(b, m, deterministic)
}
func (dst *LoadBalanceRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_LoadBalanceRequest.Merge(dst, src)
}
func (m *LoadBalanceRequest) XXX_Size() int {
return xxx_messageInfo_LoadBalanceRequest.Size(m)
}
func (m *LoadBalanceRequest) XXX_DiscardUnknown() {
xxx_messageInfo_LoadBalanceRequest.DiscardUnknown(m)
}
var xxx_messageInfo_LoadBalanceRequest proto.InternalMessageInfo
type isLoadBalanceRequest_LoadBalanceRequestType interface {
isLoadBalanceRequest_LoadBalanceRequestType()
}
type LoadBalanceRequest_InitialRequest struct {
InitialRequest *InitialLoadBalanceRequest `protobuf:"bytes,1,opt,name=initial_request,json=initialRequest,proto3,oneof"`
}
type LoadBalanceRequest_ClientStats struct {
ClientStats *ClientStats `protobuf:"bytes,2,opt,name=client_stats,json=clientStats,proto3,oneof"`
}
func (*LoadBalanceRequest_InitialRequest) isLoadBalanceRequest_LoadBalanceRequestType() {}
func (*LoadBalanceRequest_ClientStats) isLoadBalanceRequest_LoadBalanceRequestType() {}
func (m *LoadBalanceRequest) GetLoadBalanceRequestType() isLoadBalanceRequest_LoadBalanceRequestType {
if m != nil {
return m.LoadBalanceRequestType
}
return nil
}
func (m *LoadBalanceRequest) GetInitialRequest() *InitialLoadBalanceRequest {
if x, ok := m.GetLoadBalanceRequestType().(*LoadBalanceRequest_InitialRequest); ok {
return x.InitialRequest
}
return nil
}
func (m *LoadBalanceRequest) GetClientStats() *ClientStats {
if x, ok := m.GetLoadBalanceRequestType().(*LoadBalanceRequest_ClientStats); ok {
return x.ClientStats
}
return nil
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*LoadBalanceRequest) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _LoadBalanceRequest_OneofMarshaler, _LoadBalanceRequest_OneofUnmarshaler, _LoadBalanceRequest_OneofSizer, []interface{}{
(*LoadBalanceRequest_InitialRequest)(nil),
(*LoadBalanceRequest_ClientStats)(nil),
}
}
func _LoadBalanceRequest_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
m := msg.(*LoadBalanceRequest)
// load_balance_request_type
switch x := m.LoadBalanceRequestType.(type) {
case *LoadBalanceRequest_InitialRequest:
b.EncodeVarint(1<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.InitialRequest); err != nil {
return err
}
case *LoadBalanceRequest_ClientStats:
b.EncodeVarint(2<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.ClientStats); err != nil {
return err
}
case nil:
default:
return fmt.Errorf("LoadBalanceRequest.LoadBalanceRequestType has unexpected type %T", x)
}
return nil
}
func _LoadBalanceRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*LoadBalanceRequest)
switch tag {
case 1: // load_balance_request_type.initial_request
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(InitialLoadBalanceRequest)
err := b.DecodeMessage(msg)
m.LoadBalanceRequestType = &LoadBalanceRequest_InitialRequest{msg}
return true, err
case 2: // load_balance_request_type.client_stats
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(ClientStats)
err := b.DecodeMessage(msg)
m.LoadBalanceRequestType = &LoadBalanceRequest_ClientStats{msg}
return true, err
default:
return false, nil
}
}
func _LoadBalanceRequest_OneofSizer(msg proto.Message) (n int) {
m := msg.(*LoadBalanceRequest)
// load_balance_request_type
switch x := m.LoadBalanceRequestType.(type) {
case *LoadBalanceRequest_InitialRequest:
s := proto.Size(x.InitialRequest)
n += 1 // tag and wire
n += proto.SizeVarint(uint64(s))
n += s
case *LoadBalanceRequest_ClientStats:
s := proto.Size(x.ClientStats)
n += 1 // tag and wire
n += proto.SizeVarint(uint64(s))
n += s
case nil:
default:
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
}
return n
}
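// Illustrative sketch, not part of the generated file: populating the oneof.
// A client first sends an initial request naming the service, then periodic
// client stats; the literal values here are assumptions for this example.
//
//	initial := &LoadBalanceRequest{
//		LoadBalanceRequestType: &LoadBalanceRequest_InitialRequest{
//			InitialRequest: &InitialLoadBalanceRequest{Name: "service.example.com"},
//		},
//	}
//	stats := &LoadBalanceRequest{
//		LoadBalanceRequestType: &LoadBalanceRequest_ClientStats{
//			ClientStats: &ClientStats{NumCallsStarted: 10, NumCallsFinished: 8},
//		},
//	}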
type InitialLoadBalanceRequest struct {
// The name of the load balanced service (e.g., service.googleapis.com). Its
// length should be less than 256 bytes.
// The name might include a port number. How to handle the port number is up
// to the balancer.
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *InitialLoadBalanceRequest) Reset() { *m = InitialLoadBalanceRequest{} }
func (m *InitialLoadBalanceRequest) String() string { return proto.CompactTextString(m) }
func (*InitialLoadBalanceRequest) ProtoMessage() {}
func (*InitialLoadBalanceRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_load_balancer_12026aec3f0251ba, []int{1}
}
func (m *InitialLoadBalanceRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InitialLoadBalanceRequest.Unmarshal(m, b)
}
func (m *InitialLoadBalanceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_InitialLoadBalanceRequest.Marshal(b, m, deterministic)
}
func (dst *InitialLoadBalanceRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_InitialLoadBalanceRequest.Merge(dst, src)
}
func (m *InitialLoadBalanceRequest) XXX_Size() int {
return xxx_messageInfo_InitialLoadBalanceRequest.Size(m)
}
func (m *InitialLoadBalanceRequest) XXX_DiscardUnknown() {
xxx_messageInfo_InitialLoadBalanceRequest.DiscardUnknown(m)
}
var xxx_messageInfo_InitialLoadBalanceRequest proto.InternalMessageInfo
func (m *InitialLoadBalanceRequest) GetName() string {
if m != nil {
return m.Name
}
return ""
}
// Contains the number of calls finished for a particular load balance token.
type ClientStatsPerToken struct {
// See Server.load_balance_token.
LoadBalanceToken string `protobuf:"bytes,1,opt,name=load_balance_token,json=loadBalanceToken,proto3" json:"load_balance_token,omitempty"`
// The total number of RPCs that finished associated with the token.
NumCalls int64 `protobuf:"varint,2,opt,name=num_calls,json=numCalls,proto3" json:"num_calls,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ClientStatsPerToken) Reset() { *m = ClientStatsPerToken{} }
func (m *ClientStatsPerToken) String() string { return proto.CompactTextString(m) }
func (*ClientStatsPerToken) ProtoMessage() {}
func (*ClientStatsPerToken) Descriptor() ([]byte, []int) {
return fileDescriptor_load_balancer_12026aec3f0251ba, []int{2}
}
func (m *ClientStatsPerToken) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ClientStatsPerToken.Unmarshal(m, b)
}
func (m *ClientStatsPerToken) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ClientStatsPerToken.Marshal(b, m, deterministic)
}
func (dst *ClientStatsPerToken) XXX_Merge(src proto.Message) {
xxx_messageInfo_ClientStatsPerToken.Merge(dst, src)
}
func (m *ClientStatsPerToken) XXX_Size() int {
return xxx_messageInfo_ClientStatsPerToken.Size(m)
}
func (m *ClientStatsPerToken) XXX_DiscardUnknown() {
xxx_messageInfo_ClientStatsPerToken.DiscardUnknown(m)
}
var xxx_messageInfo_ClientStatsPerToken proto.InternalMessageInfo
func (m *ClientStatsPerToken) GetLoadBalanceToken() string {
if m != nil {
return m.LoadBalanceToken
}
return ""
}
func (m *ClientStatsPerToken) GetNumCalls() int64 {
if m != nil {
return m.NumCalls
}
return 0
}
// Contains client level statistics that are useful to load balancing. Each
// count except the timestamp should be reset to zero after reporting the stats.
type ClientStats struct {
// The timestamp of generating the report.
Timestamp *timestamp.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// The total number of RPCs that started.
NumCallsStarted int64 `protobuf:"varint,2,opt,name=num_calls_started,json=numCallsStarted,proto3" json:"num_calls_started,omitempty"`
// The total number of RPCs that finished.
NumCallsFinished int64 `protobuf:"varint,3,opt,name=num_calls_finished,json=numCallsFinished,proto3" json:"num_calls_finished,omitempty"`
// The total number of RPCs that failed to reach a server except dropped RPCs.
NumCallsFinishedWithClientFailedToSend int64 `protobuf:"varint,6,opt,name=num_calls_finished_with_client_failed_to_send,json=numCallsFinishedWithClientFailedToSend,proto3" json:"num_calls_finished_with_client_failed_to_send,omitempty"`
// The total number of RPCs that finished and are known to have been received
// by a server.
NumCallsFinishedKnownReceived int64 `protobuf:"varint,7,opt,name=num_calls_finished_known_received,json=numCallsFinishedKnownReceived,proto3" json:"num_calls_finished_known_received,omitempty"`
// The list of dropped calls.
CallsFinishedWithDrop []*ClientStatsPerToken `protobuf:"bytes,8,rep,name=calls_finished_with_drop,json=callsFinishedWithDrop,proto3" json:"calls_finished_with_drop,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ClientStats) Reset() { *m = ClientStats{} }
func (m *ClientStats) String() string { return proto.CompactTextString(m) }
func (*ClientStats) ProtoMessage() {}
func (*ClientStats) Descriptor() ([]byte, []int) {
return fileDescriptor_load_balancer_12026aec3f0251ba, []int{3}
}
func (m *ClientStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ClientStats.Unmarshal(m, b)
}
func (m *ClientStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ClientStats.Marshal(b, m, deterministic)
}
func (dst *ClientStats) XXX_Merge(src proto.Message) {
xxx_messageInfo_ClientStats.Merge(dst, src)
}
func (m *ClientStats) XXX_Size() int {
return xxx_messageInfo_ClientStats.Size(m)
}
func (m *ClientStats) XXX_DiscardUnknown() {
xxx_messageInfo_ClientStats.DiscardUnknown(m)
}
var xxx_messageInfo_ClientStats proto.InternalMessageInfo
func (m *ClientStats) GetTimestamp() *timestamp.Timestamp {
if m != nil {
return m.Timestamp
}
return nil
}
func (m *ClientStats) GetNumCallsStarted() int64 {
if m != nil {
return m.NumCallsStarted
}
return 0
}
func (m *ClientStats) GetNumCallsFinished() int64 {
if m != nil {
return m.NumCallsFinished
}
return 0
}
func (m *ClientStats) GetNumCallsFinishedWithClientFailedToSend() int64 {
if m != nil {
return m.NumCallsFinishedWithClientFailedToSend
}
return 0
}
func (m *ClientStats) GetNumCallsFinishedKnownReceived() int64 {
if m != nil {
return m.NumCallsFinishedKnownReceived
}
return 0
}
func (m *ClientStats) GetCallsFinishedWithDrop() []*ClientStatsPerToken {
if m != nil {
return m.CallsFinishedWithDrop
}
return nil
}
type LoadBalanceResponse struct {
// Types that are valid to be assigned to LoadBalanceResponseType:
// *LoadBalanceResponse_InitialResponse
// *LoadBalanceResponse_ServerList
LoadBalanceResponseType isLoadBalanceResponse_LoadBalanceResponseType `protobuf_oneof:"load_balance_response_type"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadBalanceResponse) Reset() { *m = LoadBalanceResponse{} }
func (m *LoadBalanceResponse) String() string { return proto.CompactTextString(m) }
func (*LoadBalanceResponse) ProtoMessage() {}
func (*LoadBalanceResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_load_balancer_12026aec3f0251ba, []int{4}
}
func (m *LoadBalanceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadBalanceResponse.Unmarshal(m, b)
}
func (m *LoadBalanceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LoadBalanceResponse.Marshal(b, m, deterministic)
}
func (dst *LoadBalanceResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_LoadBalanceResponse.Merge(dst, src)
}
func (m *LoadBalanceResponse) XXX_Size() int {
return xxx_messageInfo_LoadBalanceResponse.Size(m)
}
func (m *LoadBalanceResponse) XXX_DiscardUnknown() {
xxx_messageInfo_LoadBalanceResponse.DiscardUnknown(m)
}
var xxx_messageInfo_LoadBalanceResponse proto.InternalMessageInfo
type isLoadBalanceResponse_LoadBalanceResponseType interface {
isLoadBalanceResponse_LoadBalanceResponseType()
}
type LoadBalanceResponse_InitialResponse struct {
InitialResponse *InitialLoadBalanceResponse `protobuf:"bytes,1,opt,name=initial_response,json=initialResponse,proto3,oneof"`
}
type LoadBalanceResponse_ServerList struct {
ServerList *ServerList `protobuf:"bytes,2,opt,name=server_list,json=serverList,proto3,oneof"`
}
func (*LoadBalanceResponse_InitialResponse) isLoadBalanceResponse_LoadBalanceResponseType() {}
func (*LoadBalanceResponse_ServerList) isLoadBalanceResponse_LoadBalanceResponseType() {}
func (m *LoadBalanceResponse) GetLoadBalanceResponseType() isLoadBalanceResponse_LoadBalanceResponseType {
if m != nil {
return m.LoadBalanceResponseType
}
return nil
}
func (m *LoadBalanceResponse) GetInitialResponse() *InitialLoadBalanceResponse {
if x, ok := m.GetLoadBalanceResponseType().(*LoadBalanceResponse_InitialResponse); ok {
return x.InitialResponse
}
return nil
}
func (m *LoadBalanceResponse) GetServerList() *ServerList {
if x, ok := m.GetLoadBalanceResponseType().(*LoadBalanceResponse_ServerList); ok {
return x.ServerList
}
return nil
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*LoadBalanceResponse) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _LoadBalanceResponse_OneofMarshaler, _LoadBalanceResponse_OneofUnmarshaler, _LoadBalanceResponse_OneofSizer, []interface{}{
(*LoadBalanceResponse_InitialResponse)(nil),
(*LoadBalanceResponse_ServerList)(nil),
}
}
func _LoadBalanceResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
m := msg.(*LoadBalanceResponse)
// load_balance_response_type
switch x := m.LoadBalanceResponseType.(type) {
case *LoadBalanceResponse_InitialResponse:
b.EncodeVarint(1<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.InitialResponse); err != nil {
return err
}
case *LoadBalanceResponse_ServerList:
b.EncodeVarint(2<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.ServerList); err != nil {
return err
}
case nil:
default:
return fmt.Errorf("LoadBalanceResponse.LoadBalanceResponseType has unexpected type %T", x)
}
return nil
}
func _LoadBalanceResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*LoadBalanceResponse)
switch tag {
case 1: // load_balance_response_type.initial_response
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(InitialLoadBalanceResponse)
err := b.DecodeMessage(msg)
m.LoadBalanceResponseType = &LoadBalanceResponse_InitialResponse{msg}
return true, err
case 2: // load_balance_response_type.server_list
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(ServerList)
err := b.DecodeMessage(msg)
m.LoadBalanceResponseType = &LoadBalanceResponse_ServerList{msg}
return true, err
default:
return false, nil
}
}
func _LoadBalanceResponse_OneofSizer(msg proto.Message) (n int) {
m := msg.(*LoadBalanceResponse)
// load_balance_response_type
switch x := m.LoadBalanceResponseType.(type) {
case *LoadBalanceResponse_InitialResponse:
s := proto.Size(x.InitialResponse)
n += 1 // tag and wire
n += proto.SizeVarint(uint64(s))
n += s
case *LoadBalanceResponse_ServerList:
s := proto.Size(x.ServerList)
n += 1 // tag and wire
n += proto.SizeVarint(uint64(s))
n += s
case nil:
default:
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
}
return n
}
type InitialLoadBalanceResponse struct {
// This is an application layer redirect that indicates the client should use
// the specified server for load balancing. When this field is non-empty in
// the response, the client should open a separate connection to the
// load_balancer_delegate and call the BalanceLoad method. Its length should
// be less than 64 bytes.
LoadBalancerDelegate string `protobuf:"bytes,1,opt,name=load_balancer_delegate,json=loadBalancerDelegate,proto3" json:"load_balancer_delegate,omitempty"`
// This interval defines how often the client should send the client stats
// to the load balancer. Stats should only be reported when the duration is
// positive.
ClientStatsReportInterval *duration.Duration `protobuf:"bytes,2,opt,name=client_stats_report_interval,json=clientStatsReportInterval,proto3" json:"client_stats_report_interval,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *InitialLoadBalanceResponse) Reset() { *m = InitialLoadBalanceResponse{} }
func (m *InitialLoadBalanceResponse) String() string { return proto.CompactTextString(m) }
func (*InitialLoadBalanceResponse) ProtoMessage() {}
func (*InitialLoadBalanceResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_load_balancer_12026aec3f0251ba, []int{5}
}
func (m *InitialLoadBalanceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InitialLoadBalanceResponse.Unmarshal(m, b)
}
func (m *InitialLoadBalanceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_InitialLoadBalanceResponse.Marshal(b, m, deterministic)
}
func (dst *InitialLoadBalanceResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_InitialLoadBalanceResponse.Merge(dst, src)
}
func (m *InitialLoadBalanceResponse) XXX_Size() int {
return xxx_messageInfo_InitialLoadBalanceResponse.Size(m)
}
func (m *InitialLoadBalanceResponse) XXX_DiscardUnknown() {
xxx_messageInfo_InitialLoadBalanceResponse.DiscardUnknown(m)
}
var xxx_messageInfo_InitialLoadBalanceResponse proto.InternalMessageInfo
func (m *InitialLoadBalanceResponse) GetLoadBalancerDelegate() string {
if m != nil {
return m.LoadBalancerDelegate
}
return ""
}
func (m *InitialLoadBalanceResponse) GetClientStatsReportInterval() *duration.Duration {
if m != nil {
return m.ClientStatsReportInterval
}
return nil
}
type ServerList struct {
// Contains a list of servers selected by the load balancer. The list will
// be updated when server resolutions change or as needed to balance load
// across more servers. The client should consume the server list in order
// unless instructed otherwise via the client_config.
Servers []*Server `protobuf:"bytes,1,rep,name=servers,proto3" json:"servers,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ServerList) Reset() { *m = ServerList{} }
func (m *ServerList) String() string { return proto.CompactTextString(m) }
func (*ServerList) ProtoMessage() {}
func (*ServerList) Descriptor() ([]byte, []int) {
return fileDescriptor_load_balancer_12026aec3f0251ba, []int{6}
}
func (m *ServerList) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ServerList.Unmarshal(m, b)
}
func (m *ServerList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ServerList.Marshal(b, m, deterministic)
}
func (dst *ServerList) XXX_Merge(src proto.Message) {
xxx_messageInfo_ServerList.Merge(dst, src)
}
func (m *ServerList) XXX_Size() int {
return xxx_messageInfo_ServerList.Size(m)
}
func (m *ServerList) XXX_DiscardUnknown() {
xxx_messageInfo_ServerList.DiscardUnknown(m)
}
var xxx_messageInfo_ServerList proto.InternalMessageInfo
func (m *ServerList) GetServers() []*Server {
if m != nil {
return m.Servers
}
return nil
}
// Contains server information. When the drop field is not true, use the other
// fields.
type Server struct {
// A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address.
IpAddress []byte `protobuf:"bytes,1,opt,name=ip_address,json=ipAddress,proto3" json:"ip_address,omitempty"`
// A resolved port number for the server.
Port int32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"`
// An opaque but printable token for load reporting. The client must include
// the token of the picked server into the initial metadata when it starts a
// call to that server. The token is used by the server to verify the request
// and to allow the server to report load to the gRPC LB system. The token is
// also used in client stats for reporting dropped calls.
//
// Its length can be variable but must be less than 50 bytes.
LoadBalanceToken string `protobuf:"bytes,3,opt,name=load_balance_token,json=loadBalanceToken,proto3" json:"load_balance_token,omitempty"`
// Indicates whether this particular request should be dropped by the client.
// If the request is dropped, there will be a corresponding entry in
// ClientStats.calls_finished_with_drop.
Drop bool `protobuf:"varint,4,opt,name=drop,proto3" json:"drop,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Server) Reset() { *m = Server{} }
func (m *Server) String() string { return proto.CompactTextString(m) }
func (*Server) ProtoMessage() {}
func (*Server) Descriptor() ([]byte, []int) {
return fileDescriptor_load_balancer_12026aec3f0251ba, []int{7}
}
func (m *Server) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Server.Unmarshal(m, b)
}
func (m *Server) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Server.Marshal(b, m, deterministic)
}
func (dst *Server) XXX_Merge(src proto.Message) {
xxx_messageInfo_Server.Merge(dst, src)
}
func (m *Server) XXX_Size() int {
return xxx_messageInfo_Server.Size(m)
}
func (m *Server) XXX_DiscardUnknown() {
xxx_messageInfo_Server.DiscardUnknown(m)
}
var xxx_messageInfo_Server proto.InternalMessageInfo
func (m *Server) GetIpAddress() []byte {
if m != nil {
return m.IpAddress
}
return nil
}
func (m *Server) GetPort() int32 {
if m != nil {
return m.Port
}
return 0
}
func (m *Server) GetLoadBalanceToken() string {
if m != nil {
return m.LoadBalanceToken
}
return ""
}
func (m *Server) GetDrop() bool {
if m != nil {
return m.Drop
}
return false
}
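// Illustrative sketch, not part of the generated file: how a client-side
// balancer might turn a Server entry into a dialable address and attach the
// load-reporting token (the grpclb balancer in this commit does this
// internally). srv is an assumed *Server, and the net, strconv and
// google.golang.org/grpc/metadata imports are assumed.
//
//	host := net.IP(srv.IpAddress).String() // handles 4- and 16-byte addresses
//	addr := net.JoinHostPort(host, strconv.Itoa(int(srv.Port)))
//	ctx = metadata.AppendToOutgoingContext(ctx, "lb-token", srv.LoadBalanceToken)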
func init() {
proto.RegisterType((*LoadBalanceRequest)(nil), "grpc.lb.v1.LoadBalanceRequest")
proto.RegisterType((*InitialLoadBalanceRequest)(nil), "grpc.lb.v1.InitialLoadBalanceRequest")
proto.RegisterType((*ClientStatsPerToken)(nil), "grpc.lb.v1.ClientStatsPerToken")
proto.RegisterType((*ClientStats)(nil), "grpc.lb.v1.ClientStats")
proto.RegisterType((*LoadBalanceResponse)(nil), "grpc.lb.v1.LoadBalanceResponse")
proto.RegisterType((*InitialLoadBalanceResponse)(nil), "grpc.lb.v1.InitialLoadBalanceResponse")
proto.RegisterType((*ServerList)(nil), "grpc.lb.v1.ServerList")
proto.RegisterType((*Server)(nil), "grpc.lb.v1.Server")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// LoadBalancerClient is the client API for LoadBalancer service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type LoadBalancerClient interface {
// Bidirectional rpc to get a list of servers.
BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (LoadBalancer_BalanceLoadClient, error)
}
type loadBalancerClient struct {
cc *grpc.ClientConn
}
func NewLoadBalancerClient(cc *grpc.ClientConn) LoadBalancerClient {
return &loadBalancerClient{cc}
}
func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (LoadBalancer_BalanceLoadClient, error) {
stream, err := c.cc.NewStream(ctx, &_LoadBalancer_serviceDesc.Streams[0], "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
if err != nil {
return nil, err
}
x := &loadBalancerBalanceLoadClient{stream}
return x, nil
}
type LoadBalancer_BalanceLoadClient interface {
Send(*LoadBalanceRequest) error
Recv() (*LoadBalanceResponse, error)
grpc.ClientStream
}
type loadBalancerBalanceLoadClient struct {
grpc.ClientStream
}
func (x *loadBalancerBalanceLoadClient) Send(m *LoadBalanceRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *loadBalancerBalanceLoadClient) Recv() (*LoadBalanceResponse, error) {
m := new(LoadBalanceResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// LoadBalancerServer is the server API for LoadBalancer service.
type LoadBalancerServer interface {
// Bidirectional rpc to get a list of servers.
BalanceLoad(LoadBalancer_BalanceLoadServer) error
}
func RegisterLoadBalancerServer(s *grpc.Server, srv LoadBalancerServer) {
s.RegisterService(&_LoadBalancer_serviceDesc, srv)
}
func _LoadBalancer_BalanceLoad_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(LoadBalancerServer).BalanceLoad(&loadBalancerBalanceLoadServer{stream})
}
type LoadBalancer_BalanceLoadServer interface {
Send(*LoadBalanceResponse) error
Recv() (*LoadBalanceRequest, error)
grpc.ServerStream
}
type loadBalancerBalanceLoadServer struct {
grpc.ServerStream
}
func (x *loadBalancerBalanceLoadServer) Send(m *LoadBalanceResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *loadBalancerBalanceLoadServer) Recv() (*LoadBalanceRequest, error) {
m := new(LoadBalanceRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _LoadBalancer_serviceDesc = grpc.ServiceDesc{
ServiceName: "grpc.lb.v1.LoadBalancer",
HandlerType: (*LoadBalancerServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "BalanceLoad",
Handler: _LoadBalancer_BalanceLoad_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "grpc/lb/v1/load_balancer.proto",
}
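// Illustrative sketch, not part of the generated file: driving the
// bidirectional BalanceLoad stream from a client. Error handling is
// abbreviated, and conn is an assumed, already-dialed *grpc.ClientConn.
//
//	client := NewLoadBalancerClient(conn)
//	stream, err := client.BalanceLoad(ctx)
//	if err != nil {
//		return err
//	}
//	if err := stream.Send(&LoadBalanceRequest{
//		LoadBalanceRequestType: &LoadBalanceRequest_InitialRequest{
//			InitialRequest: &InitialLoadBalanceRequest{Name: "service.example.com"},
//		},
//	}); err != nil {
//		return err
//	}
//	resp, err := stream.Recv() // the first response carries the InitialResponse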
func init() {
proto.RegisterFile("grpc/lb/v1/load_balancer.proto", fileDescriptor_load_balancer_12026aec3f0251ba)
}
var fileDescriptor_load_balancer_12026aec3f0251ba = []byte{
// 752 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x55, 0xdd, 0x6e, 0x23, 0x35,
0x14, 0xee, 0x90, 0x69, 0x36, 0x39, 0x29, 0x34, 0xeb, 0x85, 0x65, 0x92, 0xdd, 0x6d, 0x4b, 0x24,
0x56, 0x11, 0x2a, 0x13, 0x52, 0xb8, 0x00, 0x89, 0x0b, 0x48, 0xab, 0x2a, 0x2d, 0xbd, 0x88, 0x9c,
0x4a, 0x45, 0x95, 0x90, 0x99, 0xc9, 0xb8, 0xa9, 0x55, 0xc7, 0x1e, 0x3c, 0x4e, 0x2a, 0xae, 0x79,
0x1f, 0xc4, 0x2b, 0x20, 0x5e, 0x0c, 0x8d, 0xed, 0x49, 0xa6, 0x49, 0xa3, 0xbd, 0xca, 0xf8, 0x9c,
0xcf, 0xdf, 0xf9, 0xfd, 0x1c, 0x38, 0x98, 0xaa, 0x74, 0xd2, 0xe3, 0x71, 0x6f, 0xd1, 0xef, 0x71,
0x19, 0x25, 0x24, 0x8e, 0x78, 0x24, 0x26, 0x54, 0x85, 0xa9, 0x92, 0x5a, 0x22, 0xc8, 0xfd, 0x21,
0x8f, 0xc3, 0x45, 0xbf, 0x7d, 0x30, 0x95, 0x72, 0xca, 0x69, 0xcf, 0x78, 0xe2, 0xf9, 0x5d, 0x2f,
0x99, 0xab, 0x48, 0x33, 0x29, 0x2c, 0xb6, 0x7d, 0xb8, 0xee, 0xd7, 0x6c, 0x46, 0x33, 0x1d, 0xcd,
0x52, 0x0b, 0xe8, 0xfc, 0xeb, 0x01, 0xba, 0x92, 0x51, 0x32, 0xb0, 0x31, 0x30, 0xfd, 0x63, 0x4e,
0x33, 0x8d, 0x46, 0xb0, 0xcf, 0x04, 0xd3, 0x2c, 0xe2, 0x44, 0x59, 0x53, 0xe0, 0x1d, 0x79, 0xdd,
0xc6, 0xc9, 0x97, 0xe1, 0x2a, 0x7a, 0x78, 0x61, 0x21, 0x9b, 0xf7, 0x87, 0x3b, 0xf8, 0x13, 0x77,
0xbf, 0x60, 0xfc, 0x11, 0xf6, 0x26, 0x9c, 0x51, 0xa1, 0x49, 0xa6, 0x23, 0x9d, 0x05, 0x1f, 0x19,
0xba, 0xcf, 0xcb, 0x74, 0xa7, 0xc6, 0x3f, 0xce, 0xdd, 0xc3, 0x1d, 0xdc, 0x98, 0xac, 0x8e, 0x83,
0x37, 0xd0, 0x2a, 0xb7, 0xa2, 0x48, 0x8a, 0xe8, 0x3f, 0x53, 0xda, 0xe9, 0x41, 0x6b, 0x6b, 0x26,
0x08, 0x81, 0x2f, 0xa2, 0x19, 0x35, 0xe9, 0xd7, 0xb1, 0xf9, 0xee, 0xfc, 0x0e, 0xaf, 0x4a, 0xb1,
0x46, 0x54, 0x5d, 0xcb, 0x07, 0x2a, 0xd0, 0x31, 0xa0, 0x27, 0x41, 0x74, 0x6e, 0x75, 0x17, 0x9b,
0x7c, 0x45, 0x6d, 0xd1, 0x6f, 0xa0, 0x2e, 0xe6, 0x33, 0x32, 0x89, 0x38, 0xb7, 0xd5, 0x54, 0x70,
0x4d, 0xcc, 0x67, 0xa7, 0xf9, 0xb9, 0xf3, 0x4f, 0x05, 0x1a, 0xa5, 0x10, 0xe8, 0x7b, 0xa8, 0x2f,
0x3b, 0xef, 0x3a, 0xd9, 0x0e, 0xed, 0x6c, 0xc2, 0x62, 0x36, 0xe1, 0x75, 0x81, 0xc0, 0x2b, 0x30,
0xfa, 0x0a, 0x5e, 0x2e, 0xc3, 0xe4, 0xad, 0x53, 0x9a, 0x26, 0x2e, 0xdc, 0x7e, 0x11, 0x6e, 0x6c,
0xcd, 0x79, 0x01, 0x2b, 0xec, 0x1d, 0x13, 0x2c, 0xbb, 0xa7, 0x49, 0x50, 0x31, 0xe0, 0x66, 0x01,
0x3e, 0x77, 0x76, 0xf4, 0x1b, 0x7c, 0xbd, 0x89, 0x26, 0x8f, 0x4c, 0xdf, 0x13, 0x37, 0xa9, 0xbb,
0x88, 0x71, 0x9a, 0x10, 0x2d, 0x49, 0x46, 0x45, 0x12, 0x54, 0x0d, 0xd1, 0xfb, 0x75, 0xa2, 0x1b,
0xa6, 0xef, 0x6d, 0xad, 0xe7, 0x06, 0x7f, 0x2d, 0xc7, 0x54, 0x24, 0x68, 0x08, 0x5f, 0x3c, 0x43,
0xff, 0x20, 0xe4, 0xa3, 0x20, 0x8a, 0x4e, 0x28, 0x5b, 0xd0, 0x24, 0x78, 0x61, 0x28, 0xdf, 0xad,
0x53, 0xfe, 0x92, 0xa3, 0xb0, 0x03, 0xa1, 0x5f, 0x21, 0x78, 0x2e, 0xc9, 0x44, 0xc9, 0x34, 0xa8,
0x1d, 0x55, 0xba, 0x8d, 0x93, 0xc3, 0x2d, 0x6b, 0x54, 0x8c, 0x16, 0x7f, 0x36, 0x59, 0xcf, 0xf8,
0x4c, 0xc9, 0xf4, 0xd2, 0xaf, 0xf9, 0xcd, 0xdd, 0x4b, 0xbf, 0xb6, 0xdb, 0xac, 0x76, 0xfe, 0xf3,
0xe0, 0xd5, 0x93, 0xfd, 0xc9, 0x52, 0x29, 0x32, 0x8a, 0xc6, 0xd0, 0x5c, 0x49, 0xc1, 0xda, 0xdc,
0x04, 0xdf, 0x7f, 0x48, 0x0b, 0x16, 0x3d, 0xdc, 0xc1, 0xfb, 0x4b, 0x31, 0x38, 0xd2, 0x1f, 0xa0,
0x91, 0x51, 0xb5, 0xa0, 0x8a, 0x70, 0x96, 0x69, 0x27, 0x86, 0xd7, 0x65, 0xbe, 0xb1, 0x71, 0x5f,
0x31, 0x23, 0x26, 0xc8, 0x96, 0xa7, 0xc1, 0x5b, 0x68, 0xaf, 0x49, 0xc1, 0x72, 0x5a, 0x2d, 0xfc,
0xed, 0x41, 0x7b, 0x7b, 0x2a, 0xe8, 0x3b, 0x78, 0xfd, 0xe4, 0x49, 0x21, 0x09, 0xe5, 0x74, 0x1a,
0xe9, 0x42, 0x1f, 0x9f, 0x96, 0xd6, 0x5c, 0x9d, 0x39, 0x1f, 0xba, 0x85, 0xb7, 0x65, 0xed, 0x12,
0x45, 0x53, 0xa9, 0x34, 0x61, 0x42, 0x53, 0xb5, 0x88, 0xb8, 0x4b, 0xbf, 0xb5, 0xb1, 0xd0, 0x67,
0xee, 0x31, 0xc2, 0xad, 0x92, 0x96, 0xb1, 0xb9, 0x7c, 0xe1, 0xee, 0x76, 0x7e, 0x02, 0x58, 0x95,
0x8a, 0x8e, 0xe1, 0x85, 0x2d, 0x35, 0x0b, 0x3c, 0x33, 0x59, 0xb4, 0xd9, 0x13, 0x5c, 0x40, 0x2e,
0xfd, 0x5a, 0xa5, 0xe9, 0x77, 0xfe, 0xf2, 0xa0, 0x6a, 0x3d, 0xe8, 0x1d, 0x00, 0x4b, 0x49, 0x94,
0x24, 0x8a, 0x66, 0x99, 0x29, 0x69, 0x0f, 0xd7, 0x59, 0xfa, 0xb3, 0x35, 0xe4, 0x6f, 0x41, 0x1e,
0xdb, 0xe4, 0xbb, 0x8b, 0xcd, 0xf7, 0x16, 0xd1, 0x57, 0xb6, 0x88, 0x1e, 0x81, 0x6f, 0xd6, 0xce,
0x3f, 0xf2, 0xba, 0x35, 0x6c, 0xbe, 0xed, 0xfa, 0x9c, 0xc4, 0xb0, 0x57, 0x6a, 0xb8, 0x42, 0x18,
0x1a, 0xee, 0x3b, 0x37, 0xa3, 0x83, 0x72, 0x1d, 0x9b, 0xcf, 0x54, 0xfb, 0x70, 0xab, 0xdf, 0x4e,
0xae, 0xeb, 0x7d, 0xe3, 0x0d, 0x6e, 0xe0, 0x63, 0x26, 0x4b, 0xc0, 0xc1, 0xcb, 0x72, 0xc8, 0x51,
0xde, 0xf6, 0x91, 0x77, 0xdb, 0x77, 0x63, 0x98, 0x4a, 0x1e, 0x89, 0x69, 0x28, 0xd5, 0xb4, 0x67,
0xfe, 0x51, 0x8a, 0x99, 0x9b, 0x13, 0x8f, 0xcd, 0x0f, 0xe1, 0x31, 0x59, 0xf4, 0xe3, 0xaa, 0x19,
0xd9, 0xb7, 0xff, 0x07, 0x00, 0x00, 0xff, 0xff, 0x81, 0x14, 0xee, 0xd1, 0x7b, 0x06, 0x00, 0x00,
}

vendor/google.golang.org/grpc/balancer/grpclb/grpclb.go generated vendored Normal file

@@ -0,0 +1,368 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//go:generate ./regenerate.sh
// Package grpclb defines a grpclb balancer.
//
// To install grpclb balancer, import this package as:
// import _ "google.golang.org/grpc/balancer/grpclb"
package grpclb
import (
"errors"
"strconv"
"strings"
"sync"
"time"
durationpb "github.com/golang/protobuf/ptypes/duration"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/resolver"
)
const (
lbTokeyKey = "lb-token"
defaultFallbackTimeout = 10 * time.Second
grpclbName = "grpclb"
)
var (
// defaultBackoffConfig configures the backoff strategy that's used when the
// init handshake in the RPC is unsuccessful. It's not for the clientconn
// reconnect backoff.
//
// It has the same value as the default grpc.DefaultBackoffConfig.
//
// TODO: make backoff configurable.
defaultBackoffConfig = backoff.Exponential{
MaxDelay: 120 * time.Second,
}
errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
)
func convertDuration(d *durationpb.Duration) time.Duration {
if d == nil {
return 0
}
return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}
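// Illustrative sketch, not part of the vendored file: a proto Duration of
// 1 second and 500 million nanoseconds converts to 1.5s.
//
//	d := convertDuration(&durationpb.Duration{Seconds: 1, Nanos: 500000000})
//	// d == 1500*time.Millisecond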
// loadBalancerClient is the client API for the LoadBalancer service.
// It is mostly copied from the generated pb.go file to avoid a circular dependency.
type loadBalancerClient struct {
cc *grpc.ClientConn
}
func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
desc := &grpc.StreamDesc{
StreamName: "BalanceLoad",
ServerStreams: true,
ClientStreams: true,
}
stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
if err != nil {
return nil, err
}
x := &balanceLoadClientStream{stream}
return x, nil
}
type balanceLoadClientStream struct {
grpc.ClientStream
}
func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
m := new(lbpb.LoadBalanceResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func init() {
balancer.Register(newLBBuilder())
}
// newLBBuilder creates a builder for grpclb.
func newLBBuilder() balancer.Builder {
return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
}
// newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
// fallbackTimeout. If no response is received from the remote balancer within
// fallbackTimeout, the backend addresses from the resolved address list will be
// used.
//
// Only call this function when a non-default fallback timeout is needed.
func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
return &lbBuilder{
fallbackTimeout: fallbackTimeout,
}
}
type lbBuilder struct {
fallbackTimeout time.Duration
}
func (b *lbBuilder) Name() string {
return grpclbName
}
func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
// This generates a manual resolver builder with a random scheme. This
// scheme will be used to dial to the remote LB, so we can send filtered address
// updates to the remote LB ClientConn using this manual resolver.
scheme := "grpclb_internal_" + strconv.FormatInt(time.Now().UnixNano(), 36)
r := &lbManualResolver{scheme: scheme, ccb: cc}
var target string
targetSplitted := strings.Split(cc.Target(), ":///")
if len(targetSplitted) < 2 {
target = cc.Target()
} else {
target = targetSplitted[1]
}
lb := &lbBalancer{
cc: newLBCacheClientConn(cc),
target: target,
opt: opt,
fallbackTimeout: b.fallbackTimeout,
doneCh: make(chan struct{}),
manualResolver: r,
csEvltr: &balancer.ConnectivityStateEvaluator{},
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
clientStats: newRPCStats(),
backoff: defaultBackoffConfig, // TODO: make backoff configurable.
}
return lb
}
type lbBalancer struct {
cc *lbCacheClientConn
target string
opt balancer.BuildOptions
fallbackTimeout time.Duration
doneCh chan struct{}
// manualResolver is used in the remote LB ClientConn inside grpclb. When
// resolved address updates are received by grpclb, filtered updates will be
// sent to the remote LB ClientConn through this resolver.
manualResolver *lbManualResolver
// The ClientConn to talk to the remote balancer.
ccRemoteLB *grpc.ClientConn
// backoff for calling remote balancer.
backoff backoff.Strategy
// Support client side load reporting. Each picker gets a reference to this,
// and will update its content.
clientStats *rpcStats
mu sync.Mutex // guards everything following.
// The full server list including drops, used to check if the newly received
// serverList contains anything new. Each generated picker will also have a
// reference to this list to do the first layer pick.
fullServerList []*lbpb.Server
// All backend addresses, with metadata set to nil. This list contains all
// backend addresses in the same order and with the same duplicates as in the
// serverlist. When generating the picker, a SubConn slice with the same order
// but with only READY SCs will be generated.
backendAddrs []resolver.Address
// Roundrobin functionalities.
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
picker balancer.Picker
// Support fallback to resolved backend addresses if there's no response
// from remote balancer within fallbackTimeout.
fallbackTimerExpired bool
serverListReceived bool
// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
// when resolved address updates are received, and read in the goroutine
// handling fallback.
resolvedBackendAddrs []resolver.Address
}
// regeneratePicker takes a snapshot of the balancer, and generates a picker from
// it. The picker
//  - always returns ErrTransientFailure if the balancer is in TransientFailure,
//  - does a two-layer roundrobin pick otherwise.
// Caller must hold lb.mu.
func (lb *lbBalancer) regeneratePicker() {
if lb.state == connectivity.TransientFailure {
lb.picker = &errPicker{err: balancer.ErrTransientFailure}
return
}
var readySCs []balancer.SubConn
for _, a := range lb.backendAddrs {
if sc, ok := lb.subConns[a]; ok {
if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
readySCs = append(readySCs, sc)
}
}
}
if len(lb.fullServerList) <= 0 {
if len(readySCs) <= 0 {
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}
lb.picker = &rrPicker{subConns: readySCs}
return
}
lb.picker = &lbPicker{
serverList: lb.fullServerList,
subConns: readySCs,
stats: lb.clientStats,
}
}
func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
lb.mu.Lock()
defer lb.mu.Unlock()
oldS, ok := lb.scStates[sc]
if !ok {
grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
return
}
lb.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by the resolver, the balancer called
// RemoveSubConn but kept the sc's state in scStates. Remove the state for
// this sc here.
delete(lb.scStates, sc)
}
oldAggrState := lb.state
lb.state = lb.csEvltr.RecordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (oldS == connectivity.Ready) != (s == connectivity.Ready) ||
(lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
lb.regeneratePicker()
}
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}
// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
// resolved backends (backends received from resolver, not from remote balancer)
// if no connection to remote balancers was successful.
func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
timer := time.NewTimer(fallbackTimeout)
defer timer.Stop()
select {
case <-timer.C:
case <-lb.doneCh:
return
}
lb.mu.Lock()
if lb.serverListReceived {
lb.mu.Unlock()
return
}
lb.fallbackTimerExpired = true
lb.refreshSubConns(lb.resolvedBackendAddrs)
lb.mu.Unlock()
}
// HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB
// clientConn. The remoteLB clientConn will handle creating/removing remoteLB
// connections.
func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
grpclog.Infof("lbBalancer: handleResolvedResult: %+v", addrs)
if len(addrs) <= 0 {
return
}
var remoteBalancerAddrs, backendAddrs []resolver.Address
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
remoteBalancerAddrs = append(remoteBalancerAddrs, a)
} else {
backendAddrs = append(backendAddrs, a)
}
}
if lb.ccRemoteLB == nil {
if len(remoteBalancerAddrs) <= 0 {
grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
return
}
// First time receiving resolved addresses, create a cc to remote
// balancers.
lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName)
// Start the fallback goroutine.
go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
}
// cc to remote balancers uses lb.manualResolver. Send the updated remote
// balancer addresses to it through manualResolver.
lb.manualResolver.NewAddress(remoteBalancerAddrs)
lb.mu.Lock()
lb.resolvedBackendAddrs = backendAddrs
// If serverListReceived is true, connection to remote balancer was
// successful and there's no need to do fallback anymore.
// If fallbackTimerExpired is false, fallback hasn't happened yet.
if !lb.serverListReceived && lb.fallbackTimerExpired {
// This means we received a new list of resolved backends, and we are
// still in fallback mode. Need to update the list of backends we are
// using to the new list of backends.
lb.refreshSubConns(lb.resolvedBackendAddrs)
}
lb.mu.Unlock()
}
func (lb *lbBalancer) Close() {
select {
case <-lb.doneCh:
return
default:
}
close(lb.doneCh)
if lb.ccRemoteLB != nil {
lb.ccRemoteLB.Close()
}
lb.cc.close()
}
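Because init() above registers the builder under the name "grpclb", an application can also select this balancer explicitly when dialing. A minimal sketch, with an assumed target and insecure credentials for illustration only:

conn, err := grpc.Dial(
"dns:///lb.example.com", // assumed target; any resolver that returns GRPCLB addresses works
grpc.WithInsecure(), // sketch only; use real transport credentials in practice
grpc.WithBalancerName("grpclb"), // same value as the grpclbName constant above
)
if err != nil {
// handle the dial error
}
defer conn.Close()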

View file

@ -0,0 +1,170 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"sync"
"sync/atomic"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// rpcStats is the same as lbpb.ClientStats, except that numCallsDropped is a
// map instead of a slice.
type rpcStats struct {
// Only access the following fields atomically.
numCallsStarted int64
numCallsFinished int64
numCallsFinishedWithClientFailedToSend int64
numCallsFinishedKnownReceived int64
mu sync.Mutex
// map load_balance_token -> num_calls_dropped
numCallsDropped map[string]int64
}
func newRPCStats() *rpcStats {
return &rpcStats{
numCallsDropped: make(map[string]int64),
}
}
// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
func (s *rpcStats) toClientStats() *lbpb.ClientStats {
stats := &lbpb.ClientStats{
NumCallsStarted: atomic.SwapInt64(&s.numCallsStarted, 0),
NumCallsFinished: atomic.SwapInt64(&s.numCallsFinished, 0),
NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.numCallsFinishedWithClientFailedToSend, 0),
NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.numCallsFinishedKnownReceived, 0),
}
s.mu.Lock()
dropped := s.numCallsDropped
s.numCallsDropped = make(map[string]int64)
s.mu.Unlock()
for token, count := range dropped {
stats.CallsFinishedWithDrop = append(stats.CallsFinishedWithDrop, &lbpb.ClientStatsPerToken{
LoadBalanceToken: token,
NumCalls: count,
})
}
return stats
}
func (s *rpcStats) drop(token string) {
atomic.AddInt64(&s.numCallsStarted, 1)
s.mu.Lock()
s.numCallsDropped[token]++
s.mu.Unlock()
atomic.AddInt64(&s.numCallsFinished, 1)
}
func (s *rpcStats) failedToSend() {
atomic.AddInt64(&s.numCallsStarted, 1)
atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, 1)
atomic.AddInt64(&s.numCallsFinished, 1)
}
func (s *rpcStats) knownReceived() {
atomic.AddInt64(&s.numCallsStarted, 1)
atomic.AddInt64(&s.numCallsFinishedKnownReceived, 1)
atomic.AddInt64(&s.numCallsFinished, 1)
}
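// Hedged illustration (not in the original source): the counters above roll up
// into a single lbpb.ClientStats snapshot and are reset on read. For example,
// inside this package:
//
// s := newRPCStats()
// s.drop("token-a")    // started+1, finished+1, dropped["token-a"]+1
// s.knownReceived()    // started+1, finished+1, knownReceived+1
// cs := s.toClientStats()
// // cs.NumCallsStarted == 2, cs.NumCallsFinished == 2,
// // cs.NumCallsFinishedKnownReceived == 1, and cs.CallsFinishedWithDrop has
// // one entry {LoadBalanceToken: "token-a", NumCalls: 1}. A second call to
// // toClientStats() would report all zeros.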
type errPicker struct {
// Pick always returns this err.
err error
}
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
// rrPicker does roundrobin on subConns. It's typically used when there's no
// response from remote balancer, and grpclb falls back to the resolved
// backends.
//
// It's guaranteed that len(subConns) > 0.
type rrPicker struct {
mu sync.Mutex
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int
}
func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
defer p.mu.Unlock()
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
return sc, nil, nil
}
// lbPicker does two layers of picks:
//
// First layer: roundrobin on all servers in serverList, including drops and backends.
// - If it picks a drop, the RPC will fail as being dropped.
// - If it picks a backend, do a second layer pick to pick the real backend.
//
// Second layer: roundrobin on all READY backends.
//
// It's guaranteed that len(serverList) > 0.
type lbPicker struct {
mu sync.Mutex
serverList []*lbpb.Server
serverListNext int
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int
stats *rpcStats
}
func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
defer p.mu.Unlock()
// Layer one roundrobin on serverList.
s := p.serverList[p.serverListNext]
p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
// If it's a drop, return an error and fail the RPC.
if s.Drop {
p.stats.drop(s.LoadBalanceToken)
return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
}
// If not a drop but there's no ready subConns.
if len(p.subConns) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
// Return the next ready subConn in the list, also collect rpc stats.
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
done := func(info balancer.DoneInfo) {
if !info.BytesSent {
p.stats.failedToSend()
} else if info.BytesReceived {
p.stats.knownReceived()
}
}
return sc, done, nil
}
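// Hedged walk-through (not in the original source): with a serverList of
// [drop, backendA, backendB] and two READY SubConns, successive Picks return
// drop(error) -> sc0 -> sc1 -> drop(error) -> sc0 -> ... The drop slot fails
// the RPC with codes.Unavailable and records the drop in rpcStats; backend
// slots do an independent second-layer roundrobin over the READY SubConns, so
// the weighting comes from how often backends (vs. drops) appear in the list.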

View file

@ -0,0 +1,294 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"fmt"
"io"
"net"
"reflect"
"time"
timestamppb "github.com/golang/protobuf/ptypes/timestamp"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
// processServerList updates the balancer's internal state, creates/removes
// SubConns, and regenerates the picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
grpclog.Infof("lbBalancer: processing server list: %+v", l)
lb.mu.Lock()
defer lb.mu.Unlock()
// Set serverListReceived to true so that fallback will not take effect if the
// fallback timer has not fired yet.
lb.serverListReceived = true
// If the new server list == old server list, do nothing.
if reflect.DeepEqual(lb.fullServerList, l.Servers) {
grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
return
}
lb.fullServerList = l.Servers
var backendAddrs []resolver.Address
for i, s := range l.Servers {
if s.Drop {
continue
}
md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
ip := net.IP(s.IpAddress)
ipStr := ip.String()
if ip.To4() == nil {
// Add square brackets to ipv6 addresses, otherwise net.Dial() and
// net.SplitHostPort() will return too many colons error.
ipStr = fmt.Sprintf("[%s]", ipStr)
}
addr := resolver.Address{
Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
Metadata: &md,
}
grpclog.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
i, ipStr, s.Port, s.LoadBalanceToken)
backendAddrs = append(backendAddrs, addr)
}
// Call refreshSubConns to create/remove SubConns.
lb.refreshSubConns(backendAddrs)
// Regenerate and update the picker regardless of whether the backends changed
// (whether any SubConn was created/removed). Since the full serverList was
// different, there might be updates in drops or pick weights (a different
// number of duplicates), so the picker needs to be rebuilt from the full list.
//
// Now with cache, even if SubConn was newed/removed, there might be no
// state changes.
lb.regeneratePicker()
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}
// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
// indicating whether the backendAddrs are different from the cached
// backendAddrs (whether any SubConn was created/removed).
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
lb.backendAddrs = nil
var backendsUpdated bool
// addrsSet is the set converted from backendAddrs; it's used for quick
// lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
// Create new SubConns.
for _, addr := range backendAddrs {
addrWithoutMD := addr
addrWithoutMD.Metadata = nil
addrsSet[addrWithoutMD] = struct{}{}
lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
if _, ok := lb.subConns[addrWithoutMD]; !ok {
backendsUpdated = true
// Use addrWithMD to create the SubConn.
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("roundrobinBalancer: failed to create new SubConn: %v", err)
continue
}
lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
if _, ok := lb.scStates[sc]; !ok {
// Only set state of new sc to IDLE. The state could already be
// READY for cached SubConns.
lb.scStates[sc] = connectivity.Idle
}
sc.Connect()
}
}
for a, sc := range lb.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
backendsUpdated = true
lb.cc.RemoveSubConn(sc)
delete(lb.subConns, a)
// Keep the state of this sc in lb.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
}
}
return backendsUpdated
}
func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
for {
reply, err := s.Recv()
if err != nil {
if err == io.EOF {
return errServerTerminatedConnection
}
return fmt.Errorf("grpclb: failed to recv server list: %v", err)
}
if serverList := reply.GetServerList(); serverList != nil {
lb.processServerList(serverList)
}
}
}
func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-s.Context().Done():
return
}
stats := lb.clientStats.toClientStats()
t := time.Now()
stats.Timestamp = &timestamppb.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
if err := s.Send(&lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
ClientStats: stats,
},
}); err != nil {
return
}
}
}
func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false))
if err != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}
// grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbpb.InitialLoadBalanceRequest{
Name: lb.target,
},
},
}
if err := stream.Send(initReq); err != nil {
return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
}
reply, err := stream.Recv()
if err != nil {
return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
}
initResp := reply.GetInitialResponse()
if initResp == nil {
return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
}
if initResp.LoadBalancerDelegate != "" {
return true, fmt.Errorf("grpclb: Delegation is not supported")
}
go func() {
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d)
}
}()
// No backoff if init req/resp handshake was successful.
return false, lb.readServerList(stream)
}
func (lb *lbBalancer) watchRemoteBalancer() {
var retryCount int
for {
doBackoff, err := lb.callRemoteBalancer()
select {
case <-lb.doneCh:
return
default:
if err != nil {
if err == errServerTerminatedConnection {
grpclog.Info(err)
} else {
grpclog.Error(err)
}
}
}
if !doBackoff {
retryCount = 0
continue
}
timer := time.NewTimer(lb.backoff.Backoff(retryCount))
select {
case <-timer.C:
case <-lb.doneCh:
timer.Stop()
return
}
retryCount++
}
}
func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
var dopts []grpc.DialOption
if creds := lb.opt.DialCreds; creds != nil {
if err := creds.OverrideServerName(remoteLBName); err == nil {
dopts = append(dopts, grpc.WithTransportCredentials(creds))
} else {
grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
dopts = append(dopts, grpc.WithInsecure())
}
} else {
dopts = append(dopts, grpc.WithInsecure())
}
if lb.opt.Dialer != nil {
// WithDialer takes a different type of function, so we instead use a
// special DialOption here.
wcd := internal.WithContextDialer.(func(func(context.Context, string) (net.Conn, error)) grpc.DialOption)
dopts = append(dopts, wcd(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption)
dopts = append(dopts, wrb(lb.manualResolver))
if channelz.IsOn() {
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
}
// DialContext using manualResolver.Scheme, which is a random scheme generated
// when grpclb was initialized. The target name is not important.
cc, err := grpc.DialContext(context.Background(), "grpclb:///grpclb.server", dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
}
lb.ccRemoteLB = cc
go lb.watchRemoteBalancer()
}
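// Hedged note (not in the original source): watchRemoteBalancer retries
// callRemoteBalancer in a loop. If the stream fails before the init handshake
// completes, the retry waits lb.backoff.Backoff(retryCount), i.e. the
// exponential defaultBackoffConfig capped at 120s (roughly a 1s base and 1.6x
// growth per retry under the usual gRPC connection-backoff defaults, plus
// jitter); once a handshake succeeds, retryCount resets and the next attempt
// happens immediately after the stream terminates.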

View file

@ -0,0 +1,970 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
durationpb "github.com/golang/protobuf/ptypes/duration"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)
var (
lbServerName = "bar.com"
beServerName = "foo.com"
lbToken = "iamatoken"
// Resolver replaces localhost with fakeName in Next().
// Dialer replaces fakeName with localhost when dialing.
// This will test that custom dialer is passed from Dial to grpclb.
fakeName = "fake.Name"
)
type serverNameCheckCreds struct {
mu sync.Mutex
sn string
expected string
}
func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if _, err := io.WriteString(rawConn, c.sn); err != nil {
fmt.Printf("Failed to write the server name %s to the client %v", c.sn, err)
return nil, nil, err
}
return rawConn, nil, nil
}
func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
b := make([]byte, len(c.expected))
errCh := make(chan error, 1)
go func() {
_, err := rawConn.Read(b)
errCh <- err
}()
select {
case err := <-errCh:
if err != nil {
fmt.Printf("Failed to read the server name from the server %v", err)
return nil, nil, err
}
case <-ctx.Done():
return nil, nil, ctx.Err()
}
if c.expected != string(b) {
fmt.Printf("Read the server name %s want %s", string(b), c.expected)
return nil, nil, errors.New("received unexpected server name")
}
return rawConn, nil, nil
}
func (c *serverNameCheckCreds) Info() credentials.ProtocolInfo {
c.mu.Lock()
defer c.mu.Unlock()
return credentials.ProtocolInfo{}
}
func (c *serverNameCheckCreds) Clone() credentials.TransportCredentials {
c.mu.Lock()
defer c.mu.Unlock()
return &serverNameCheckCreds{
expected: c.expected,
}
}
func (c *serverNameCheckCreds) OverrideServerName(s string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.expected = s
return nil
}
// fakeNameDialer replaces fakeName with localhost when dialing.
// This will test that custom dialer is passed from Dial to grpclb.
func fakeNameDialer(addr string, timeout time.Duration) (net.Conn, error) {
addr = strings.Replace(addr, fakeName, "localhost", 1)
return net.DialTimeout("tcp", addr, timeout)
}
// merge merges the new client stats into current stats.
//
// It's a test-only method. rpcStats is defined in grpclb_picker.
func (s *rpcStats) merge(cs *lbpb.ClientStats) {
atomic.AddInt64(&s.numCallsStarted, cs.NumCallsStarted)
atomic.AddInt64(&s.numCallsFinished, cs.NumCallsFinished)
atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, cs.NumCallsFinishedWithClientFailedToSend)
atomic.AddInt64(&s.numCallsFinishedKnownReceived, cs.NumCallsFinishedKnownReceived)
s.mu.Lock()
for _, perToken := range cs.CallsFinishedWithDrop {
s.numCallsDropped[perToken.LoadBalanceToken] += perToken.NumCalls
}
s.mu.Unlock()
}
func mapsEqual(a, b map[string]int64) bool {
if len(a) != len(b) {
return false
}
for k, v1 := range a {
if v2, ok := b[k]; !ok || v1 != v2 {
return false
}
}
return true
}
func atomicEqual(a, b *int64) bool {
return atomic.LoadInt64(a) == atomic.LoadInt64(b)
}
// equal compares two rpcStats.
//
// It's a test-only method. rpcStats is defined in grpclb_picker.
func (s *rpcStats) equal(o *rpcStats) bool {
if !atomicEqual(&s.numCallsStarted, &o.numCallsStarted) {
return false
}
if !atomicEqual(&s.numCallsFinished, &o.numCallsFinished) {
return false
}
if !atomicEqual(&s.numCallsFinishedWithClientFailedToSend, &o.numCallsFinishedWithClientFailedToSend) {
return false
}
if !atomicEqual(&s.numCallsFinishedKnownReceived, &o.numCallsFinishedKnownReceived) {
return false
}
s.mu.Lock()
defer s.mu.Unlock()
o.mu.Lock()
defer o.mu.Unlock()
if !mapsEqual(s.numCallsDropped, o.numCallsDropped) {
return false
}
return true
}
type remoteBalancer struct {
sls chan *lbpb.ServerList
statsDura time.Duration
done chan struct{}
stats *rpcStats
}
func newRemoteBalancer(intervals []time.Duration) *remoteBalancer {
return &remoteBalancer{
sls: make(chan *lbpb.ServerList, 1),
done: make(chan struct{}),
stats: newRPCStats(),
}
}
func (b *remoteBalancer) stop() {
close(b.sls)
close(b.done)
}
func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
initReq := req.GetInitialRequest()
if initReq.Name != beServerName {
return status.Errorf(codes.InvalidArgument, "invalid service name: %v", initReq.Name)
}
resp := &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
InitialResponse: &lbpb.InitialLoadBalanceResponse{
ClientStatsReportInterval: &durationpb.Duration{
Seconds: int64(b.statsDura.Seconds()),
Nanos: int32(b.statsDura.Nanoseconds() - int64(b.statsDura.Seconds())*1e9),
},
},
},
}
if err := stream.Send(resp); err != nil {
return err
}
go func() {
for {
var (
req *lbpb.LoadBalanceRequest
err error
)
if req, err = stream.Recv(); err != nil {
return
}
b.stats.merge(req.GetClientStats())
}
}()
for v := range b.sls {
resp = &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: v,
},
}
if err := stream.Send(resp); err != nil {
return err
}
}
<-b.done
return nil
}
type testServer struct {
testpb.TestServiceServer
addr string
fallback bool
}
const testmdkey = "testmd"
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Internal, "failed to receive metadata")
}
if !s.fallback && (md == nil || md["lb-token"][0] != lbToken) {
return nil, status.Errorf(codes.Internal, "received unexpected metadata: %v", md)
}
grpc.SetTrailer(ctx, metadata.Pairs(testmdkey, s.addr))
return &testpb.Empty{}, nil
}
func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
return nil
}
func startBackends(sn string, fallback bool, lis ...net.Listener) (servers []*grpc.Server) {
for _, l := range lis {
creds := &serverNameCheckCreds{
sn: sn,
}
s := grpc.NewServer(grpc.Creds(creds))
testpb.RegisterTestServiceServer(s, &testServer{addr: l.Addr().String(), fallback: fallback})
servers = append(servers, s)
go func(s *grpc.Server, l net.Listener) {
s.Serve(l)
}(s, l)
}
return
}
func stopBackends(servers []*grpc.Server) {
for _, s := range servers {
s.Stop()
}
}
type testServers struct {
lbAddr string
ls *remoteBalancer
lb *grpc.Server
beIPs []net.IP
bePorts []int
}
func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) {
var (
beListeners []net.Listener
ls *remoteBalancer
lb *grpc.Server
beIPs []net.IP
bePorts []int
)
for i := 0; i < numberOfBackends; i++ {
// Start a backend.
beLis, e := net.Listen("tcp", "localhost:0")
if e != nil {
err = fmt.Errorf("Failed to listen %v", err)
return
}
beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)
beListeners = append(beListeners, beLis)
}
backends := startBackends(beServerName, false, beListeners...)
// Start a load balancer.
lbLis, err := net.Listen("tcp", "localhost:0")
if err != nil {
err = fmt.Errorf("Failed to create the listener for the load balancer %v", err)
return
}
lbCreds := &serverNameCheckCreds{
sn: lbServerName,
}
lb = grpc.NewServer(grpc.Creds(lbCreds))
ls = newRemoteBalancer(nil)
lbgrpc.RegisterLoadBalancerServer(lb, ls)
go func() {
lb.Serve(lbLis)
}()
tss = &testServers{
lbAddr: fakeName + ":" + strconv.Itoa(lbLis.Addr().(*net.TCPAddr).Port),
ls: ls,
lb: lb,
beIPs: beIPs,
bePorts: bePorts,
}
cleanup = func() {
defer stopBackends(backends)
defer func() {
ls.stop()
lb.Stop()
}()
}
return
}
func TestGRPCLB(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
tss, cleanup, err := newLoadBalancer(1)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()
be := &lbpb.Server{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}
var bes []*lbpb.Server
bes = append(bes, be)
sl := &lbpb.ServerList{
Servers: bes,
}
tss.ls.sls <- sl
creds := serverNameCheckCreds{
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
}
// The remote balancer sends response with duplicates to grpclb client.
func TestGRPCLBWeighted(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
tss, cleanup, err := newLoadBalancer(2)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()
beServers := []*lbpb.Server{{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}, {
IpAddress: tss.beIPs[1],
Port: int32(tss.bePorts[1]),
LoadBalanceToken: lbToken,
}}
portsToIndex := make(map[int]int)
for i := range beServers {
portsToIndex[tss.bePorts[i]] = i
}
creds := serverNameCheckCreds{
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
sequences := []string{"00101", "00011"}
for _, seq := range sequences {
var (
bes []*lbpb.Server
p peer.Peer
result string
)
for _, s := range seq {
bes = append(bes, beServers[s-'0'])
}
tss.ls.sls <- &lbpb.ServerList{Servers: bes}
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
}
// The generated result will be in the format of "0010100101".
if !strings.Contains(result, strings.Repeat(seq, 2)) {
t.Errorf("got result sequence %q, want pattern %q", result, seq)
}
}
}
func TestDropRequest(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
tss, cleanup, err := newLoadBalancer(1)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()
tss.ls.sls <- &lbpb.ServerList{
Servers: []*lbpb.Server{{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
Drop: false,
}, {
Drop: true,
}},
}
creds := serverNameCheckCreds{
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
// Wait for the 1st, non-fail-fast RPC to succeed. This ensures the connection
// to the backend is up, because the second entry in the server list is a
// drop and would otherwise fail the RPC.
var i int
for i = 0; i < 1000; i++ {
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
break
}
time.Sleep(time.Millisecond)
}
if i >= 1000 {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", testC, err)
}
select {
case <-ctx.Done():
t.Fatal("timed out", ctx.Err())
default:
}
for _, failfast := range []bool{true, false} {
for i := 0; i < 3; i++ {
// Even-numbered RPCs should fail, because the second entry in the server
// list has Drop set to true.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(failfast)); status.Code(err) != codes.Unavailable {
t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
}
// Odd-numbered RPCs should succeed since they choose the non-drop
// backend according to the round robin policy.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(failfast)); err != nil {
t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
}
}
}
// When the balancer in use disconnects, grpclb should connect to the next address from the resolved balancer address list.
func TestBalancerDisconnects(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
var (
tests []*testServers
lbs []*grpc.Server
)
for i := 0; i < 2; i++ {
tss, cleanup, err := newLoadBalancer(1)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()
be := &lbpb.Server{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}
var bes []*lbpb.Server
bes = append(bes, be)
sl := &lbpb.ServerList{
Servers: bes,
}
tss.ls.sls <- sl
tests = append(tests, tss)
lbs = append(lbs, tss.lb)
}
creds := serverNameCheckCreds{
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
Addr: tests[0].lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}, {
Addr: tests[1].lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
var p peer.Peer
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port != tests[0].bePorts[0] {
t.Fatalf("got peer: %v, want peer port: %v", p.Addr, tests[0].bePorts[0])
}
lbs[0].Stop()
// Stop balancer[0], balancer[1] should be used by grpclb.
// Check peer address to see if that happened.
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tests[1].bePorts[0] {
return
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC sent to second backend after 1 second")
}
type customGRPCLBBuilder struct {
balancer.Builder
name string
}
func (b *customGRPCLBBuilder) Name() string {
return b.name
}
const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout"
func init() {
balancer.Register(&customGRPCLBBuilder{
Builder: newLBBuilderWithFallbackTimeout(100 * time.Millisecond),
name: grpclbCustomFallbackName,
})
}
func TestFallback(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
tss, cleanup, err := newLoadBalancer(1)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()
// Start a standalone backend.
beLis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen %v", err)
}
defer beLis.Close()
standaloneBEs := startBackends(beServerName, true, beLis)
defer stopBackends(standaloneBEs)
be := &lbpb.Server{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}
var bes []*lbpb.Server
bes = append(bes, be)
sl := &lbpb.ServerList{
Servers: bes,
}
tss.ls.sls <- sl
creds := serverNameCheckCreds{
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerName(grpclbCustomFallbackName),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
Addr: "",
Type: resolver.GRPCLB,
ServerName: lbServerName,
}, {
Addr: beLis.Addr().String(),
Type: resolver.Backend,
ServerName: beServerName,
}})
var p peer.Peer
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.String() != beLis.Addr().String() {
t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
}
r.NewAddress([]resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}, {
Addr: beLis.Addr().String(),
Type: resolver.Backend,
ServerName: beServerName,
}})
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
return
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
}
type failPreRPCCred struct{}
func (failPreRPCCred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
if strings.Contains(uri[0], failtosendURI) {
return nil, fmt.Errorf("rpc should fail to send")
}
return nil, nil
}
func (failPreRPCCred) RequireTransportSecurity() bool {
return false
}
func checkStats(stats, expected *rpcStats) error {
if !stats.equal(expected) {
return fmt.Errorf("stats not equal: got %+v, want %+v", stats, expected)
}
return nil
}
func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rpcStats {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
tss, cleanup, err := newLoadBalancer(1)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()
tss.ls.sls <- &lbpb.ServerList{
Servers: []*lbpb.Server{{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
Drop: drop,
}},
}
tss.ls.statsDura = 100 * time.Millisecond
creds := serverNameCheckCreds{expected: beServerName}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithTransportCredentials(&creds),
grpc.WithPerRPCCredentials(failPreRPCCred{}),
grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
runRPCs(cc)
time.Sleep(1 * time.Second)
stats := tss.ls.stats
return stats
}
const (
countRPC = 40
failtosendURI = "failtosend"
dropErrDesc = "request dropped by grpclb"
)
func TestGRPCLBStatsUnarySuccess(t *testing.T) {
defer leakcheck.Check(t)
stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
for i := 0; i < countRPC-1; i++ {
testC.EmptyCall(context.Background(), &testpb.Empty{})
}
})
if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC),
numCallsFinished: int64(countRPC),
numCallsFinishedKnownReceived: int64(countRPC),
}); err != nil {
t.Fatal(err)
}
}
func TestGRPCLBStatsUnaryDrop(t *testing.T) {
defer leakcheck.Check(t)
c := 0
stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
for {
c++
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
if strings.Contains(err.Error(), dropErrDesc) {
break
}
}
}
for i := 0; i < countRPC; i++ {
testC.EmptyCall(context.Background(), &testpb.Empty{})
}
})
if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC + c),
numCallsFinished: int64(countRPC + c),
numCallsFinishedWithClientFailedToSend: int64(c - 1),
numCallsDropped: map[string]int64{lbToken: int64(countRPC + 1)},
}); err != nil {
t.Fatal(err)
}
}
func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
defer leakcheck.Check(t)
stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
for i := 0; i < countRPC-1; i++ {
cc.Invoke(context.Background(), failtosendURI, &testpb.Empty{}, nil)
}
})
if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC),
numCallsFinished: int64(countRPC),
numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
numCallsFinishedKnownReceived: 1,
}); err != nil {
t.Fatal(err)
}
}
func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
defer leakcheck.Check(t)
stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up.
stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
}
for {
if _, err = stream.Recv(); err == io.EOF {
break
}
}
for i := 0; i < countRPC-1; i++ {
stream, err = testC.FullDuplexCall(context.Background())
if err == nil {
// Wait for stream to end if err is nil.
for {
if _, err = stream.Recv(); err == io.EOF {
break
}
}
}
}
})
if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC),
numCallsFinished: int64(countRPC),
numCallsFinishedKnownReceived: int64(countRPC),
}); err != nil {
t.Fatal(err)
}
}
func TestGRPCLBStatsStreamingDrop(t *testing.T) {
defer leakcheck.Check(t)
c := 0
stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
for {
c++
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
if strings.Contains(err.Error(), dropErrDesc) {
break
}
}
}
for i := 0; i < countRPC; i++ {
testC.FullDuplexCall(context.Background())
}
})
if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC + c),
numCallsFinished: int64(countRPC + c),
numCallsFinishedWithClientFailedToSend: int64(c - 1),
numCallsDropped: map[string]int64{lbToken: int64(countRPC + 1)},
}); err != nil {
t.Fatal(err)
}
}
func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
defer leakcheck.Check(t)
stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up.
stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
}
for {
if _, err = stream.Recv(); err == io.EOF {
break
}
}
for i := 0; i < countRPC-1; i++ {
cc.NewStream(context.Background(), &grpc.StreamDesc{}, failtosendURI)
}
})
if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC),
numCallsFinished: int64(countRPC),
numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
numCallsFinishedKnownReceived: 1,
}); err != nil {
t.Fatal(err)
}
}

View file

@ -0,0 +1,214 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"fmt"
"sync"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
)
// The parent ClientConn should re-resolve when grpclb loses connection to the
// remote balancer. When the ClientConn inside grpclb gets a TransientFailure,
// it calls lbManualResolver.ResolveNow(), which calls parent ClientConn's
// ResolveNow, and eventually results in re-resolve happening in parent
// ClientConn's resolver (DNS for example).
//
// parent
// ClientConn
// +-----------------------------------------------------------------+
// | parent +---------------------------------+ |
// | DNS ClientConn | grpclb | |
// | resolver balancerWrapper | | |
// | + + | grpclb grpclb | |
// | | | | ManualResolver ClientConn | |
// | | | | + + | |
// | | | | | | Transient | |
// | | | | | | Failure | |
// | | | | | <--------- | | |
// | | | <--------------- | ResolveNow | | |
// | | <--------- | ResolveNow | | | | |
// | | ResolveNow | | | | | |
// | | | | | | | |
// | + + | + + | |
// | +---------------------------------+ |
// +-----------------------------------------------------------------+
// lbManualResolver is used by the ClientConn inside grpclb. It's a manual
// resolver with a special ResolveNow() function.
//
// When ResolveNow() is called, it calls ResolveNow() on the parent ClientConn,
// so when grpclb client lose contact with remote balancers, the parent
// ClientConn's resolver will re-resolve.
type lbManualResolver struct {
scheme string
ccr resolver.ClientConn
ccb balancer.ClientConn
}
func (r *lbManualResolver) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
r.ccr = cc
return r, nil
}
func (r *lbManualResolver) Scheme() string {
return r.scheme
}
// ResolveNow calls resolveNow on the parent ClientConn.
func (r *lbManualResolver) ResolveNow(o resolver.ResolveNowOption) {
r.ccb.ResolveNow(o)
}
// Close is a noop for Resolver.
func (*lbManualResolver) Close() {}
// NewAddress calls cc.NewAddress.
func (r *lbManualResolver) NewAddress(addrs []resolver.Address) {
r.ccr.NewAddress(addrs)
}
// NewServiceConfig calls cc.NewServiceConfig.
func (r *lbManualResolver) NewServiceConfig(sc string) {
r.ccr.NewServiceConfig(sc)
}
const subConnCacheTime = time.Second * 10
// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache.
// SubConns will be kept in cache for subConnCacheTime before being removed.
//
// Its new and remove methods are updated to do cache first.
type lbCacheClientConn struct {
cc balancer.ClientConn
timeout time.Duration
mu sync.Mutex
// subConnCache only keeps subConns that are being deleted.
subConnCache map[resolver.Address]*subConnCacheEntry
subConnToAddr map[balancer.SubConn]resolver.Address
}
type subConnCacheEntry struct {
sc balancer.SubConn
cancel func()
abortDeleting bool
}
func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn {
return &lbCacheClientConn{
cc: cc,
timeout: subConnCacheTime,
subConnCache: make(map[resolver.Address]*subConnCacheEntry),
subConnToAddr: make(map[balancer.SubConn]resolver.Address),
}
}
func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) != 1 {
return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs))
}
addrWithoutMD := addrs[0]
addrWithoutMD.Metadata = nil
ccc.mu.Lock()
defer ccc.mu.Unlock()
if entry, ok := ccc.subConnCache[addrWithoutMD]; ok {
// If entry is in subConnCache, the SubConn was being deleted.
// cancel function will never be nil.
entry.cancel()
delete(ccc.subConnCache, addrWithoutMD)
return entry.sc, nil
}
scNew, err := ccc.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
ccc.subConnToAddr[scNew] = addrWithoutMD
return scNew, nil
}
func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
ccc.mu.Lock()
defer ccc.mu.Unlock()
addr, ok := ccc.subConnToAddr[sc]
if !ok {
return
}
if entry, ok := ccc.subConnCache[addr]; ok {
if entry.sc != sc {
// This could happen if NewSubConn was called multiple times for the
// same address, and those SubConns are all removed. We remove sc
// immediately here.
delete(ccc.subConnToAddr, sc)
ccc.cc.RemoveSubConn(sc)
}
return
}
entry := &subConnCacheEntry{
sc: sc,
}
ccc.subConnCache[addr] = entry
timer := time.AfterFunc(ccc.timeout, func() {
ccc.mu.Lock()
// Unlock via defer so the early return on abortDeleting does not leave
// the mutex held.
defer ccc.mu.Unlock()
if entry.abortDeleting {
return
}
ccc.cc.RemoveSubConn(sc)
delete(ccc.subConnToAddr, sc)
delete(ccc.subConnCache, addr)
})
entry.cancel = func() {
if !timer.Stop() {
// If stop was not successful, the timer has fired (this can only
// happen in a race). But the deleting function is blocked on ccc.mu
// because the mutex was held by the caller of this function.
//
// Set abortDeleting to true to abort the deleting function. When
// the lock is released, the deleting function will acquire the
// lock, check the value of abortDeleting and return.
entry.abortDeleting = true
}
}
}
func (ccc *lbCacheClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
ccc.cc.UpdateBalancerState(s, p)
}
func (ccc *lbCacheClientConn) close() {
ccc.mu.Lock()
// Only cancel all existing timers. There's no need to remove SubConns.
for _, entry := range ccc.subConnCache {
entry.cancel()
}
ccc.mu.Unlock()
}
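// Hedged sketch (not in the original source) of the cache lifecycle, assuming
// in-package use with some balancer.ClientConn implementation cc:
//
// ccc := newLBCacheClientConn(cc)
// sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "10.0.0.1:80"}}, balancer.NewSubConnOptions{})
// ccc.RemoveSubConn(sc) // not removed yet; parked in subConnCache for subConnCacheTime (10s)
// sc2, _ := ccc.NewSubConn([]resolver.Address{{Addr: "10.0.0.1:80"}}, balancer.NewSubConnOptions{})
// // sc2 == sc: the pending removal is cancelled and the cached SubConn is reused.
//
// If no NewSubConn for the address arrives within the timeout, the underlying
// cc.RemoveSubConn finally runs and the cache entry is dropped.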

View file

@ -0,0 +1,219 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"fmt"
"sync"
"testing"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
type mockSubConn struct {
balancer.SubConn
}
type mockClientConn struct {
balancer.ClientConn
mu sync.Mutex
subConns map[balancer.SubConn]resolver.Address
}
func newMockClientConn() *mockClientConn {
return &mockClientConn{
subConns: make(map[balancer.SubConn]resolver.Address),
}
}
func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &mockSubConn{}
mcc.mu.Lock()
defer mcc.mu.Unlock()
mcc.subConns[sc] = addrs[0]
return sc, nil
}
func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
mcc.mu.Lock()
defer mcc.mu.Unlock()
delete(mcc.subConns, sc)
}
const testCacheTimeout = 100 * time.Millisecond
func checkMockCC(mcc *mockClientConn, scLen int) error {
mcc.mu.Lock()
defer mcc.mu.Unlock()
if len(mcc.subConns) != scLen {
return fmt.Errorf("mcc = %+v, want len(mcc.subConns) = %v", mcc.subConns, scLen)
}
return nil
}
func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error {
ccc.mu.Lock()
defer ccc.mu.Unlock()
if len(ccc.subConnCache) != sccLen {
return fmt.Errorf("ccc = %+v, want len(ccc.subConnCache) = %v", ccc.subConnCache, sccLen)
}
if len(ccc.subConnToAddr) != sctaLen {
return fmt.Errorf("ccc = %+v, want len(ccc.subConnToAddr) = %v", ccc.subConnToAddr, sctaLen)
}
return nil
}
// Test that SubConn won't be immediately removed.
func TestLBCacheClientConnExpire(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
t.Fatal(err)
}
ccc := newLBCacheClientConn(mcc)
ccc.timeout = testCacheTimeout
if err := checkCacheCC(ccc, 0, 0); err != nil {
t.Fatal(err)
}
sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
// One subconn in MockCC.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// No subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 0, 1); err != nil {
t.Fatal(err)
}
ccc.RemoveSubConn(sc)
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// One subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 1, 1); err != nil {
t.Fatal(err)
}
// Should all become empty after timeout.
var err error
for i := 0; i < 2; i++ {
time.Sleep(testCacheTimeout)
err = checkMockCC(mcc, 0)
if err != nil {
continue
}
err = checkCacheCC(ccc, 0, 0)
if err != nil {
continue
}
}
if err != nil {
t.Fatal(err)
}
}
// Test that NewSubConn with the same address as a SubConn being removed will
// reuse the SubConn and cancel the removal.
func TestLBCacheClientConnReuse(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
t.Fatal(err)
}
ccc := newLBCacheClientConn(mcc)
ccc.timeout = testCacheTimeout
if err := checkCacheCC(ccc, 0, 0); err != nil {
t.Fatal(err)
}
sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
// One subconn in MockCC.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// No subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 0, 1); err != nil {
t.Fatal(err)
}
ccc.RemoveSubConn(sc)
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// One subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 1, 1); err != nil {
t.Fatal(err)
}
// Recreate the old subconn, this should cancel the deleting process.
sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
// One subconn in MockCC.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// No subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 0, 1); err != nil {
t.Fatal(err)
}
var err error
// Should not become empty after 2*timeout.
time.Sleep(2 * testCacheTimeout)
err = checkMockCC(mcc, 1)
if err != nil {
t.Fatal(err)
}
err = checkCacheCC(ccc, 0, 1)
if err != nil {
t.Fatal(err)
}
// Call remove again, will delete after timeout.
ccc.RemoveSubConn(sc)
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// One subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 1, 1); err != nil {
t.Fatal(err)
}
// Should all become empty after timeout.
for i := 0; i < 2; i++ {
time.Sleep(testCacheTimeout)
err = checkMockCC(mcc, 0)
if err != nil {
continue
}
err = checkCacheCC(ccc, 0, 0)
if err != nil {
continue
}
}
if err != nil {
t.Fatal(err)
}
}

33
vendor/google.golang.org/grpc/balancer/grpclb/regenerate.sh generated vendored Executable file
View file

@ -0,0 +1,33 @@
#!/bin/bash
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -eux -o pipefail
TMP=$(mktemp -d)
function finish {
rm -rf "$TMP"
}
trap finish EXIT
pushd "$TMP"
mkdir -p grpc/lb/v1
curl https://raw.githubusercontent.com/grpc/grpc-proto/master/grpc/lb/v1/load_balancer.proto > grpc/lb/v1/load_balancer.proto
protoc --go_out=plugins=grpc,paths=source_relative:. -I. grpc/lb/v1/*.proto
popd
rm -f grpc_lb_v1/*.pb.go
cp "$TMP"/grpc/lb/v1/*.pb.go grpc_lb_v1/


@ -0,0 +1,79 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package roundrobin defines a roundrobin balancer. The roundrobin balancer is
// installed as one of the default balancers in gRPC, so users don't need to
// install it explicitly.
package roundrobin
import (
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
// Name is the name of the round_robin balancer.
const Name = "round_robin"
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}
func init() {
balancer.Register(newBuilder())
}
type rrPickerBuilder struct{}
func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
var scs []balancer.SubConn
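	// Note: Go randomizes map iteration order, so each picker starts its
	// rotation at an arbitrary SubConn.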
for _, sc := range readySCs {
scs = append(scs, sc)
}
return &rrPicker{
subConns: scs,
}
}
type rrPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Pick() does a round-robin selection
// over it and returns the selected SubConn.
subConns []balancer.SubConn
mu sync.Mutex
next int
}
func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
if len(p.subConns) == 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
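	// Advance the round-robin index under the lock and return the selected SubConn.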
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
return sc, nil, nil
}
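For quick reference, a minimal client-side sketch of opting into this balancer by name (the target address and insecure credentials are placeholders; it assumes the "log", "google.golang.org/grpc", and "google.golang.org/grpc/balancer/roundrobin" imports). The tests that follow use the same WithBalancerName option with a manual resolver:

	conn, err := grpc.Dial(
		"example.com:50051",                    // placeholder target
		grpc.WithInsecure(),                    // plaintext, for illustration only
		grpc.WithBalancerName(roundrobin.Name), // select the registered round_robin balancer
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()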


@ -0,0 +1,477 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package roundrobin_test
import (
"fmt"
"net"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)
type testServer struct {
testpb.TestServiceServer
}
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}
func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
return nil
}
type test struct {
servers []*grpc.Server
addresses []string
}
func (t *test) cleanup() {
for _, s := range t.servers {
s.Stop()
}
}
func startTestServers(count int) (_ *test, err error) {
t := &test{}
defer func() {
if err != nil {
for _, s := range t.servers {
s.Stop()
}
}
}()
for i := 0; i < count; i++ {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("Failed to listen %v", err)
}
s := grpc.NewServer()
testpb.RegisterTestServiceServer(s, &testServer{})
t.servers = append(t.servers, s)
t.addresses = append(t.addresses, lis.Addr().String())
go func(s *grpc.Server, l net.Listener) {
s.Serve(l)
}(s, lis)
}
return t, nil
}
func TestOneBackend(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
}
func TestBackendsRoundRobin(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
backendCount := 5
test, err := startTestServers(backendCount)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
var resolvedAddrs []resolver.Address
for i := 0; i < backendCount; i++ {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
if p.Addr.String() == test.addresses[si] {
connected = true
break
}
time.Sleep(time.Millisecond)
}
if !connected {
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
}
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
}
}
}
func TestAddressesRemoved(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
r.NewAddress([]resolver.Address{})
for i := 0; i < 1000; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
return
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
}
func TestCloseWithPendingRPC(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
testc := testpb.NewTestServiceClient(cc)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// This RPC blocks until cc is closed.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
}
cancel()
}()
}
cc.Close()
wg.Wait()
}
func TestNewAddressWhileBlocking(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
}
r.NewAddress([]resolver.Address{})
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// This RPC blocks until NewAddress is called.
testc.EmptyCall(context.Background(), &testpb.Empty{})
}()
}
time.Sleep(50 * time.Millisecond)
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
wg.Wait()
}
func TestOneServerDown(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
backendCount := 3
test, err := startTestServers(backendCount)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
var resolvedAddrs []resolver.Address
for i := 0; i < backendCount; i++ {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
if p.Addr.String() == test.addresses[si] {
connected = true
break
}
time.Sleep(time.Millisecond)
}
if !connected {
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
}
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
}
}
// Stop one server; RPCs should round-robin among the remaining servers.
backendCount--
test.servers[backendCount].Stop()
// Loop until server[backendCount-1] is seen twice without server[backendCount] being seen in between.
var targetSeen int
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
targetSeen = 0
t.Logf("EmptyCall() = _, %v, want _, <nil>", err)
// Due to a race, this RPC could possibly get the connection that
// was closing, and this RPC may fail. Keep trying when this
// happens.
continue
}
switch p.Addr.String() {
case test.addresses[backendCount-1]:
targetSeen++
case test.addresses[backendCount]:
// Reset targetSeen if peer is server[backendCount].
targetSeen = 0
}
// Break here so the last picked address is server[backendCount-1], which keeps the following loop from being flaky.
if targetSeen >= 2 {
break
}
}
if targetSeen != 2 {
t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
}
}
}
func TestAllServersDown(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
backendCount := 3
test, err := startTestServers(backendCount)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
var resolvedAddrs []resolver.Address
for i := 0; i < backendCount; i++ {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
if p.Addr.String() == test.addresses[si] {
connected = true
break
}
time.Sleep(time.Millisecond)
}
if !connected {
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
}
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
}
}
// All servers are stopped; failfast RPCs should fail with Unavailable.
for i := 0; i < backendCount; i++ {
test.servers[i].Stop()
}
time.Sleep(100 * time.Millisecond)
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable {
return
}
time.Sleep(time.Millisecond)
}
t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
}