Update the kubernetes api to latest

Signed-off-by: Mrunal Patel <mrunalp@gmail.com>
This commit is contained in:
Mrunal Patel 2016-08-30 15:27:31 -07:00
parent f5f2ff63b2
commit 303a3929b2
56 changed files with 6885 additions and 2610 deletions

View file

@ -34,13 +34,14 @@
package grpc
import (
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"os"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
@ -60,7 +61,7 @@ type Codec interface {
String() string
}
// protoCodec is a Codec implemetation with protobuf. It is the default codec for gRPC.
// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC.
type protoCodec struct{}
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
@ -75,6 +76,73 @@ func (protoCodec) String() string {
return "proto"
}
// Compressor defines the interface gRPC uses to compress a message.
type Compressor interface {
// Do compresses p into w.
Do(w io.Writer, p []byte) error
// Type returns the compression algorithm the Compressor uses.
Type() string
}
// NewGZIPCompressor creates a Compressor based on GZIP.
func NewGZIPCompressor() Compressor {
return &gzipCompressor{}
}
type gzipCompressor struct {
}
func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
z := gzip.NewWriter(w)
if _, err := z.Write(p); err != nil {
return err
}
return z.Close()
}
func (c *gzipCompressor) Type() string {
return "gzip"
}
// Decompressor defines the interface gRPC uses to decompress a message.
type Decompressor interface {
// Do reads the data from r and uncompress them.
Do(r io.Reader) ([]byte, error)
// Type returns the compression algorithm the Decompressor uses.
Type() string
}
type gzipDecompressor struct {
}
// NewGZIPDecompressor creates a Decompressor based on GZIP.
func NewGZIPDecompressor() Decompressor {
return &gzipDecompressor{}
}
func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
z, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
defer z.Close()
return ioutil.ReadAll(z)
}
func (d *gzipDecompressor) Type() string {
return "gzip"
}
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
traceInfo traceInfo // in trace.go
}
var defaultCallInfo = callInfo{failFast: true}
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
@ -113,41 +181,70 @@ func Trailer(md *metadata.MD) CallOption {
})
}
// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failfast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will retry
// the call if it fails due to a transient error. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
return nil
})
}
// The format of the payload: compressed or not?
type payloadFormat uint8
const (
compressionNone payloadFormat = iota // no compression
compressionFlate
// More formats
compressionMade
)
// parser reads complelete gRPC messages from the underlying reader.
// parser reads complete gRPC messages from the underlying reader.
type parser struct {
s io.Reader
}
// r is the underlying reader.
// See the comment on recvMsg for the permissible
// error types.
r io.Reader
// recvMsg is to read a complete gRPC message from the stream. It is blocking if
// the message has not been complete yet. It returns the message and its type,
// EOF is returned with nil msg and 0 pf if the entire stream is done. Other
// non-nil error is returned if something is wrong on reading.
func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
// The header of a gRPC message. Find more detail
// at http://www.grpc.io/docs/guides/wire.html.
var buf [5]byte
header [5]byte
}
if _, err := io.ReadFull(p.s, buf[:]); err != nil {
// recvMsg reads a complete gRPC message from the stream.
//
// It returns the message and its payload (compression/encoding)
// format. The caller owns the returned msg memory.
//
// If there is an error, possible values are:
// * io.EOF, when no messages remain
// * io.ErrUnexpectedEOF
// * of type transport.ConnectionError
// * of type transport.StreamError
// No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible
// error.
func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
return 0, nil, err
}
pf = payloadFormat(buf[0])
length := binary.BigEndian.Uint32(buf[1:])
pf = payloadFormat(p.header[0])
length := binary.BigEndian.Uint32(p.header[1:])
if length == 0 {
return pf, nil, nil
}
if length > uint32(maxMsgSize) {
return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
if _, err := io.ReadFull(p.s, msg); err != nil {
if _, err := io.ReadFull(p.r, msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
@ -158,7 +255,7 @@ func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
// encode serializes msg and prepends the message header. If msg is nil, it
// generates the message header of 0 message length.
func encode(c Codec, msg interface{}, pf payloadFormat) ([]byte, error) {
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) {
var b []byte
var length uint
if msg != nil {
@ -168,6 +265,12 @@ func encode(c Codec, msg interface{}, pf payloadFormat) ([]byte, error) {
if err != nil {
return nil, err
}
if cp != nil {
if err := cp.Do(cbuf, b); err != nil {
return nil, err
}
b = cbuf.Bytes()
}
length = uint(len(b))
}
if length > math.MaxUint32 {
@ -182,7 +285,11 @@ func encode(c Codec, msg interface{}, pf payloadFormat) ([]byte, error) {
var buf = make([]byte, payloadLen+sizeLen+len(b))
// Write payload format
buf[0] = byte(pf)
if cp == nil {
buf[0] = byte(compressionNone)
} else {
buf[0] = byte(compressionMade)
}
// Write length of b into buf
binary.BigEndian.PutUint32(buf[1:], uint32(length))
// Copy encoded msg to buf
@ -191,22 +298,40 @@ func encode(c Codec, msg interface{}, pf payloadFormat) ([]byte, error) {
return buf, nil
}
func recv(p *parser, c Codec, m interface{}) error {
pf, d, err := p.recvMsg()
func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error {
switch pf {
case compressionNone:
case compressionMade:
if dc == nil || recvCompress != dc.Type() {
return transport.StreamErrorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
return transport.StreamErrorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
}
return nil
}
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
switch pf {
case compressionNone:
if err := c.Unmarshal(d, m); err != nil {
if rErr, ok := err.(rpcError); ok {
return rErr
} else {
return Errorf(codes.Internal, "grpc: %v", err)
}
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err
}
if pf == compressionMade {
d, err = dc.Do(bytes.NewReader(d))
if err != nil {
return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
default:
return Errorf(codes.Internal, "gprc: compression is not supported yet.")
}
if len(d) > maxMsgSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize)
}
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
return nil
}
@ -217,8 +342,8 @@ type rpcError struct {
desc string
}
func (e rpcError) Error() string {
return fmt.Sprintf("rpc error: code = %d desc = %q", e.code, e.desc)
func (e *rpcError) Error() string {
return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
}
// Code returns the error code for err if it was produced by the rpc system.
@ -227,7 +352,7 @@ func Code(err error) codes.Code {
if err == nil {
return codes.OK
}
if e, ok := err.(rpcError); ok {
if e, ok := err.(*rpcError); ok {
return e.code
}
return codes.Unknown
@ -239,7 +364,7 @@ func ErrorDesc(err error) string {
if err == nil {
return ""
}
if e, ok := err.(rpcError); ok {
if e, ok := err.(*rpcError); ok {
return e.desc
}
return err.Error()
@ -251,7 +376,7 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
if c == codes.OK {
return nil
}
return rpcError{
return &rpcError{
code: c,
desc: fmt.Sprintf(format, a...),
}
@ -260,18 +385,37 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
// toRPCErr converts an error into a rpcError.
func toRPCErr(err error) error {
switch e := err.(type) {
case rpcError:
case *rpcError:
return err
case transport.StreamError:
return rpcError{
return &rpcError{
code: e.Code,
desc: e.Desc,
}
case transport.ConnectionError:
return rpcError{
return &rpcError{
code: codes.Internal,
desc: e.Desc,
}
default:
switch err {
case context.DeadlineExceeded:
return &rpcError{
code: codes.DeadlineExceeded,
desc: err.Error(),
}
case context.Canceled:
return &rpcError{
code: codes.Canceled,
desc: err.Error(),
}
case ErrClientConnClosing:
return &rpcError{
code: codes.FailedPrecondition,
desc: err.Error(),
}
}
}
return Errorf(codes.Unknown, "%v", err)
}
@ -304,34 +448,10 @@ func convertCode(err error) codes.Code {
return codes.Unknown
}
const (
// how long to wait after the first failure before retrying
baseDelay = 1.0 * time.Second
// upper bound of backoff delay
maxDelay = 120 * time.Second
// backoff increases by this factor on each retry
backoffFactor = 1.6
// backoff is randomized downwards by this factor
backoffJitter = 0.2
)
func backoff(retries int) (t time.Duration) {
if retries == 0 {
return baseDelay
}
backoff, max := float64(baseDelay), float64(maxDelay)
for backoff < max && retries > 0 {
backoff *= backoffFactor
retries--
}
if backoff > max {
backoff = max
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
backoff *= 1 + backoffJitter*(rand.Float64()*2-1)
if backoff < 0 {
return 0
}
return time.Duration(backoff)
}
// SupportPackageIsVersion3 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//
// This constant may be renamed in the future if a change in the generated code
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
const SupportPackageIsVersion3 = true