Signed-off-by: Olivier Gambier <olivier@docker.com>
This commit is contained in:
Olivier Gambier 2016-03-22 10:42:33 -07:00
parent 59401e277b
commit d1444b56e9
141 changed files with 19483 additions and 4205 deletions

View file

@ -34,13 +34,13 @@
package grpc
import (
"bytes"
"io"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/transport"
)
@ -48,16 +48,16 @@ import (
// On error, it returns the error and indicates whether the call should be retried.
//
// TODO(zhaoq): Check whether the received message sequence is valid.
func recvResponse(codec Codec, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
// Try to acquire header metadata from the server if there is any.
var err error
c.headerMD, err = stream.Header()
if err != nil {
return err
}
p := &parser{s: stream}
p := &parser{r: stream}
for {
if err = recv(p, codec, reply); err != nil {
if err = recv(p, dopts.codec, stream, dopts.dc, reply); err != nil {
if err == io.EOF {
break
}
@ -69,7 +69,7 @@ func recvResponse(codec Codec, t transport.ClientTransport, c *callInfo, stream
}
// sendRequest writes out various information of an RPC such as Context and Message.
func sendRequest(ctx context.Context, codec Codec, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
stream, err := t.NewStream(ctx, callHdr)
if err != nil {
return nil, err
@ -81,8 +81,11 @@ func sendRequest(ctx context.Context, codec Codec, callHdr *transport.CallHdr, t
}
}
}()
// TODO(zhaoq): Support compression.
outBuf, err := encode(codec, args, compressionNone)
var cbuf *bytes.Buffer
if compressor != nil {
cbuf = new(bytes.Buffer)
}
outBuf, err := encode(codec, args, compressor, cbuf)
if err != nil {
return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
}
@ -94,14 +97,6 @@ func sendRequest(ctx context.Context, codec Codec, callHdr *transport.CallHdr, t
return stream, nil
}
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
traceInfo traceInfo // in trace.go
}
// Invoke is called by the generated code. It sends the RPC request on the
// wire and returns after response is received.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
@ -116,9 +111,8 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
o.after(&c)
}
}()
if EnableTracing {
c.traceInfo.tr = trace.New("Sent."+methodFamily(method), method)
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
c.traceInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
@ -133,17 +127,11 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
}()
}
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
topts := &transport.Options{
Last: true,
Delay: false,
}
var (
ts int // track the transport sequence number
lastErr error // record the error that happened
)
for {
@ -156,7 +144,14 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if lastErr != nil && c.failFast {
return toRPCErr(lastErr)
}
t, ts, err = cc.wait(ctx, ts)
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
t, err = cc.dopts.picker.Pick(ctx)
if err != nil {
if lastErr != nil {
// This was a retry; return the error from the last attempt.
@ -164,10 +159,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
if EnableTracing {
c.traceInfo.tr.LazyLog(payload{args}, true)
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts)
stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
if err != nil {
if _, ok := err.(transport.ConnectionError); ok {
lastErr = err
@ -179,12 +174,12 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
return toRPCErr(err)
}
// Receive the response
lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply)
lastErr = recvResponse(cc.dopts, t, &c, stream, reply)
if _, ok := lastErr.(transport.ConnectionError); ok {
continue
}
if EnableTracing {
c.traceInfo.tr.LazyLog(payload{reply}, true)
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
}
t.CloseStream(stream, lastErr)
if lastErr != nil {