From 39006d4cddbec62aef9842536e7e27fd2cdbf26b Mon Sep 17 00:00:00 2001 From: Antonio Murdaca Date: Fri, 1 Sep 2017 12:37:10 +0200 Subject: [PATCH] serve grpc and http on the same socket Signed-off-by: Antonio Murdaca --- cmd/crio/main.go | 38 ++- server/inspect.go | 8 +- vendor.conf | 1 + vendor/github.com/soheilhy/cmux/LICENSE | 202 +++++++++++++++ vendor/github.com/soheilhy/cmux/README.md | 83 ++++++ vendor/github.com/soheilhy/cmux/buffer.go | 67 +++++ vendor/github.com/soheilhy/cmux/cmux.go | 269 ++++++++++++++++++++ vendor/github.com/soheilhy/cmux/doc.go | 18 ++ vendor/github.com/soheilhy/cmux/matchers.go | 262 +++++++++++++++++++ vendor/github.com/soheilhy/cmux/patricia.go | 179 +++++++++++++ 10 files changed, 1109 insertions(+), 18 deletions(-) create mode 100644 vendor/github.com/soheilhy/cmux/LICENSE create mode 100644 vendor/github.com/soheilhy/cmux/README.md create mode 100644 vendor/github.com/soheilhy/cmux/buffer.go create mode 100644 vendor/github.com/soheilhy/cmux/cmux.go create mode 100644 vendor/github.com/soheilhy/cmux/doc.go create mode 100644 vendor/github.com/soheilhy/cmux/matchers.go create mode 100644 vendor/github.com/soheilhy/cmux/patricia.go diff --git a/cmd/crio/main.go b/cmd/crio/main.go index 6f586ecb..5aa98066 100644 --- a/cmd/crio/main.go +++ b/cmd/crio/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "net" "net/http" @@ -15,6 +16,7 @@ import ( "github.com/kubernetes-incubator/cri-o/server" "github.com/opencontainers/selinux/go-selinux" "github.com/sirupsen/logrus" + "github.com/soheilhy/cmux" "github.com/urfave/cli" "golang.org/x/sys/unix" "google.golang.org/grpc" @@ -126,7 +128,7 @@ func mergeConfig(config *server.Config, ctx *cli.Context) error { return nil } -func catchShutdown(gserver *grpc.Server, sserver *server.Server, signalled *bool) { +func catchShutdown(gserver *grpc.Server, sserver *server.Server, hserver *http.Server, signalled *bool) { sig := make(chan os.Signal, 10) signal.Notify(sig, unix.SIGINT, unix.SIGTERM) go func() { @@ -141,6 +143,7 @@ func catchShutdown(gserver *grpc.Server, sserver *server.Server, signalled *bool } *signalled = true gserver.GracefulStop() + hserver.Shutdown(context.Background()) return } }() @@ -400,8 +403,6 @@ func main() { }() } - graceful := false - catchShutdown(s, service, &graceful) runtime.RegisterRuntimeServiceServer(s, service) runtime.RegisterImageServiceServer(s, service) @@ -412,17 +413,28 @@ func main() { service.StartExitMonitor() }() - go func() { - err := service.StartInfoEndpoints(lis) - // graceful shutdown doesn't quite work with unix domain sockets - if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - logrus.Fatalf("Failed to start container inspect endpoint: %v", err) - } - }() + m := cmux.New(lis) + grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) + httpL := m.Match(cmux.HTTP1Fast()) - err = s.Serve(lis) - if graceful && strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { - err = nil + infoMux := service.GetInfoMux() + srv := &http.Server{ + Handler: infoMux, + } + + graceful := false + catchShutdown(s, service, srv, &graceful) + + go s.Serve(grpcL) + go srv.Serve(httpL) + + err = m.Serve() + if err != nil { + if graceful && strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { + err = nil + } else { + logrus.Fatal(err) + } } if err2 := service.Shutdown(); err2 != nil { diff --git a/server/inspect.go b/server/inspect.go index 7441d5ae..6de6d4fc 100644 --- a/server/inspect.go +++ b/server/inspect.go @@ -3,7 +3,6 @@ package server import ( "encoding/json" "fmt" - "net" "net/http" "path/filepath" @@ -27,9 +26,8 @@ type CrioInfo struct { StorageRoot string `json:"storage_root"` } -// StartInfoEndpoints starts a http server that -// serves container information requests and crio daemon information -func (s *Server) StartInfoEndpoints(l net.Listener) error { +// GetInfoMux returns the mux used to serve info requests +func (s *Server) GetInfoMux() *bone.Mux { mux := bone.New() mux.Get("/info", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -77,5 +75,5 @@ func (s *Server) StartInfoEndpoints(l net.Listener) error { w.Write(js) })) - return http.Serve(l, mux) + return mux } diff --git a/vendor.conf b/vendor.conf index 51103c37..9ca085aa 100644 --- a/vendor.conf +++ b/vendor.conf @@ -98,3 +98,4 @@ github.com/matttproud/golang_protobuf_extensions fc2b8d3a73c4867e51861bbdd5ae3c1 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 github.com/containerd/cgroups 7a5fdd8330119dc70d850260db8f3594d89d6943 github.com/go-zoo/bone 031b4005dfe248ccba241a0c9de0f9e112fd6b7c +github.com/soheilhy/cmux v0.1.3 diff --git a/vendor/github.com/soheilhy/cmux/LICENSE b/vendor/github.com/soheilhy/cmux/LICENSE new file mode 100644 index 00000000..d6456956 --- /dev/null +++ b/vendor/github.com/soheilhy/cmux/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/soheilhy/cmux/README.md b/vendor/github.com/soheilhy/cmux/README.md new file mode 100644 index 00000000..70306e6a --- /dev/null +++ b/vendor/github.com/soheilhy/cmux/README.md @@ -0,0 +1,83 @@ +# cmux: Connection Mux ![Travis Build Status](https://api.travis-ci.org/soheilhy/args.svg?branch=master "Travis Build Status") [![GoDoc](https://godoc.org/github.com/soheilhy/cmux?status.svg)](http://godoc.org/github.com/soheilhy/cmux) + +cmux is a generic Go library to multiplex connections based on +their payload. Using cmux, you can serve gRPC, SSH, HTTPS, HTTP, +Go RPC, and pretty much any other protocol on the same TCP listener. + +## How-To +Simply create your main listener, create a cmux for that listener, +and then match connections: +```go +// Create the main listener. +l, err := net.Listen("tcp", ":23456") +if err != nil { + log.Fatal(err) +} + +// Create a cmux. +m := cmux.New(l) + +// Match connections in order: +// First grpc, then HTTP, and otherwise Go RPC/TCP. +grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) +httpL := m.Match(cmux.HTTP1Fast()) +trpcL := m.Match(cmux.Any()) // Any means anything that is not yet matched. + +// Create your protocol servers. +grpcS := grpc.NewServer() +grpchello.RegisterGreeterServer(grpcs, &server{}) + +httpS := &http.Server{ + Handler: &helloHTTP1Handler{}, +} + +trpcS := rpc.NewServer() +trpcS.Register(&ExampleRPCRcvr{}) + +// Use the muxed listeners for your servers. +go grpcS.Serve(grpcL) +go httpS.Serve(httpL) +go trpcS.Accept(trpcL) + +// Start serving! +m.Serve() +``` + +Take a look at [other examples in the GoDoc](http://godoc.org/github.com/soheilhy/cmux/#pkg-examples). + +## Docs +* [GoDocs](https://godoc.org/github.com/soheilhy/cmux) + +## Performance +There is room for improvment but, since we are only matching +the very first bytes of a connection, the performance overheads on +long-lived connections (i.e., RPCs and pipelined HTTP streams) +is negligible. + +*TODO(soheil)*: Add benchmarks. + +## Limitations +* *TLS*: `net/http` uses a type assertion to identify TLS connections; since +cmux's lookahead-implementing connection wraps the underlying TLS connection, +this type assertion fails. +Because of that, you can serve HTTPS using cmux but `http.Request.TLS` +would not be set in your handlers. + +* *Different Protocols on The Same Connection*: `cmux` matches the connection +when it's accepted. For example, one connection can be either gRPC or REST, but +not both. That is, we assume that a client connection is either used for gRPC +or REST. + +* *Java gRPC Clients*: Java gRPC client blocks until it receives a SETTINGS +frame from the server. If you are using the Java client to connect to a cmux'ed +gRPC server please match with writers: +```go +grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) +``` + +# Copyright and License +Copyright 2016 The CMux Authors. All rights reserved. + +See [CONTRIBUTORS](https://github.com/soheilhy/cmux/blob/master/CONTRIBUTORS) +for the CMux Authors. Code is released under +[the Apache 2 license](https://github.com/soheilhy/cmux/blob/master/LICENSE). diff --git a/vendor/github.com/soheilhy/cmux/buffer.go b/vendor/github.com/soheilhy/cmux/buffer.go new file mode 100644 index 00000000..f8cf30a1 --- /dev/null +++ b/vendor/github.com/soheilhy/cmux/buffer.go @@ -0,0 +1,67 @@ +// Copyright 2016 The CMux Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cmux + +import ( + "bytes" + "io" +) + +// bufferedReader is an optimized implementation of io.Reader that behaves like +// ``` +// io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer)) +// ``` +// without allocating. +type bufferedReader struct { + source io.Reader + buffer bytes.Buffer + bufferRead int + bufferSize int + sniffing bool + lastErr error +} + +func (s *bufferedReader) Read(p []byte) (int, error) { + if s.bufferSize > s.bufferRead { + // If we have already read something from the buffer before, we return the + // same data and the last error if any. We need to immediately return, + // otherwise we may block for ever, if we try to be smart and call + // source.Read() seeking a little bit of more data. + bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize]) + s.bufferRead += bn + return bn, s.lastErr + } else if !s.sniffing && s.buffer.Cap() != 0 { + // We don't need the buffer anymore. + // Reset it to release the internal slice. + s.buffer = bytes.Buffer{} + } + + // If there is nothing more to return in the sniffed buffer, read from the + // source. + sn, sErr := s.source.Read(p) + if sn > 0 && s.sniffing { + s.lastErr = sErr + if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil { + return wn, wErr + } + } + return sn, sErr +} + +func (s *bufferedReader) reset(snif bool) { + s.sniffing = snif + s.bufferRead = 0 + s.bufferSize = s.buffer.Len() +} diff --git a/vendor/github.com/soheilhy/cmux/cmux.go b/vendor/github.com/soheilhy/cmux/cmux.go new file mode 100644 index 00000000..9de6b0a3 --- /dev/null +++ b/vendor/github.com/soheilhy/cmux/cmux.go @@ -0,0 +1,269 @@ +// Copyright 2016 The CMux Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cmux + +import ( + "fmt" + "io" + "net" + "sync" + "time" +) + +// Matcher matches a connection based on its content. +type Matcher func(io.Reader) bool + +// MatchWriter is a match that can also write response (say to do handshake). +type MatchWriter func(io.Writer, io.Reader) bool + +// ErrorHandler handles an error and returns whether +// the mux should continue serving the listener. +type ErrorHandler func(error) bool + +var _ net.Error = ErrNotMatched{} + +// ErrNotMatched is returned whenever a connection is not matched by any of +// the matchers registered in the multiplexer. +type ErrNotMatched struct { + c net.Conn +} + +func (e ErrNotMatched) Error() string { + return fmt.Sprintf("mux: connection %v not matched by an matcher", + e.c.RemoteAddr()) +} + +// Temporary implements the net.Error interface. +func (e ErrNotMatched) Temporary() bool { return true } + +// Timeout implements the net.Error interface. +func (e ErrNotMatched) Timeout() bool { return false } + +type errListenerClosed string + +func (e errListenerClosed) Error() string { return string(e) } +func (e errListenerClosed) Temporary() bool { return false } +func (e errListenerClosed) Timeout() bool { return false } + +// ErrListenerClosed is returned from muxListener.Accept when the underlying +// listener is closed. +var ErrListenerClosed = errListenerClosed("mux: listener closed") + +// for readability of readTimeout +var noTimeout time.Duration + +// New instantiates a new connection multiplexer. +func New(l net.Listener) CMux { + return &cMux{ + root: l, + bufLen: 1024, + errh: func(_ error) bool { return true }, + donec: make(chan struct{}), + readTimeout: noTimeout, + } +} + +// CMux is a multiplexer for network connections. +type CMux interface { + // Match returns a net.Listener that sees (i.e., accepts) only + // the connections matched by at least one of the matcher. + // + // The order used to call Match determines the priority of matchers. + Match(...Matcher) net.Listener + // MatchWithWriters returns a net.Listener that accepts only the + // connections that matched by at least of the matcher writers. + // + // Prefer Matchers over MatchWriters, since the latter can write on the + // connection before the actual handler. + // + // The order used to call Match determines the priority of matchers. + MatchWithWriters(...MatchWriter) net.Listener + // Serve starts multiplexing the listener. Serve blocks and perhaps + // should be invoked concurrently within a go routine. + Serve() error + // HandleError registers an error handler that handles listener errors. + HandleError(ErrorHandler) + // sets a timeout for the read of matchers + SetReadTimeout(time.Duration) +} + +type matchersListener struct { + ss []MatchWriter + l muxListener +} + +type cMux struct { + root net.Listener + bufLen int + errh ErrorHandler + donec chan struct{} + sls []matchersListener + readTimeout time.Duration +} + +func matchersToMatchWriters(matchers []Matcher) []MatchWriter { + mws := make([]MatchWriter, 0, len(matchers)) + for _, m := range matchers { + mws = append(mws, func(w io.Writer, r io.Reader) bool { + return m(r) + }) + } + return mws +} + +func (m *cMux) Match(matchers ...Matcher) net.Listener { + mws := matchersToMatchWriters(matchers) + return m.MatchWithWriters(mws...) +} + +func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener { + ml := muxListener{ + Listener: m.root, + connc: make(chan net.Conn, m.bufLen), + } + m.sls = append(m.sls, matchersListener{ss: matchers, l: ml}) + return ml +} + +func (m *cMux) SetReadTimeout(t time.Duration) { + m.readTimeout = t +} + +func (m *cMux) Serve() error { + var wg sync.WaitGroup + + defer func() { + close(m.donec) + wg.Wait() + + for _, sl := range m.sls { + close(sl.l.connc) + // Drain the connections enqueued for the listener. + for c := range sl.l.connc { + _ = c.Close() + } + } + }() + + for { + c, err := m.root.Accept() + if err != nil { + if !m.handleErr(err) { + return err + } + continue + } + + wg.Add(1) + go m.serve(c, m.donec, &wg) + } +} + +func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + + muc := newMuxConn(c) + if m.readTimeout > noTimeout { + _ = c.SetReadDeadline(time.Now().Add(m.readTimeout)) + } + for _, sl := range m.sls { + for _, s := range sl.ss { + matched := s(muc.Conn, muc.startSniffing()) + if matched { + muc.doneSniffing() + if m.readTimeout > noTimeout { + _ = c.SetReadDeadline(time.Time{}) + } + select { + case sl.l.connc <- muc: + case <-donec: + _ = c.Close() + } + return + } + } + } + + _ = c.Close() + err := ErrNotMatched{c: c} + if !m.handleErr(err) { + _ = m.root.Close() + } +} + +func (m *cMux) HandleError(h ErrorHandler) { + m.errh = h +} + +func (m *cMux) handleErr(err error) bool { + if !m.errh(err) { + return false + } + + if ne, ok := err.(net.Error); ok { + return ne.Temporary() + } + + return false +} + +type muxListener struct { + net.Listener + connc chan net.Conn +} + +func (l muxListener) Accept() (net.Conn, error) { + c, ok := <-l.connc + if !ok { + return nil, ErrListenerClosed + } + return c, nil +} + +// MuxConn wraps a net.Conn and provides transparent sniffing of connection data. +type MuxConn struct { + net.Conn + buf bufferedReader +} + +func newMuxConn(c net.Conn) *MuxConn { + return &MuxConn{ + Conn: c, + buf: bufferedReader{source: c}, + } +} + +// From the io.Reader documentation: +// +// When Read encounters an error or end-of-file condition after +// successfully reading n > 0 bytes, it returns the number of +// bytes read. It may return the (non-nil) error from the same call +// or return the error (and n == 0) from a subsequent call. +// An instance of this general case is that a Reader returning +// a non-zero number of bytes at the end of the input stream may +// return either err == EOF or err == nil. The next Read should +// return 0, EOF. +func (m *MuxConn) Read(p []byte) (int, error) { + return m.buf.Read(p) +} + +func (m *MuxConn) startSniffing() io.Reader { + m.buf.reset(true) + return &m.buf +} + +func (m *MuxConn) doneSniffing() { + m.buf.reset(false) +} diff --git a/vendor/github.com/soheilhy/cmux/doc.go b/vendor/github.com/soheilhy/cmux/doc.go new file mode 100644 index 00000000..aaa8f315 --- /dev/null +++ b/vendor/github.com/soheilhy/cmux/doc.go @@ -0,0 +1,18 @@ +// Copyright 2016 The CMux Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +// Package cmux is a library to multiplex network connections based on +// their payload. Using cmux, you can serve different protocols from the +// same listener. +package cmux diff --git a/vendor/github.com/soheilhy/cmux/matchers.go b/vendor/github.com/soheilhy/cmux/matchers.go new file mode 100644 index 00000000..652fd869 --- /dev/null +++ b/vendor/github.com/soheilhy/cmux/matchers.go @@ -0,0 +1,262 @@ +// Copyright 2016 The CMux Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cmux + +import ( + "bufio" + "crypto/tls" + "io" + "io/ioutil" + "net/http" + "strings" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" +) + +// Any is a Matcher that matches any connection. +func Any() Matcher { + return func(r io.Reader) bool { return true } +} + +// PrefixMatcher returns a matcher that matches a connection if it +// starts with any of the strings in strs. +func PrefixMatcher(strs ...string) Matcher { + pt := newPatriciaTreeString(strs...) + return pt.matchPrefix +} + +func prefixByteMatcher(list ...[]byte) Matcher { + pt := newPatriciaTree(list...) + return pt.matchPrefix +} + +var defaultHTTPMethods = []string{ + "OPTIONS", + "GET", + "HEAD", + "POST", + "PUT", + "DELETE", + "TRACE", + "CONNECT", +} + +// HTTP1Fast only matches the methods in the HTTP request. +// +// This matcher is very optimistic: if it returns true, it does not mean that +// the request is a valid HTTP response. If you want a correct but slower HTTP1 +// matcher, use HTTP1 instead. +func HTTP1Fast(extMethods ...string) Matcher { + return PrefixMatcher(append(defaultHTTPMethods, extMethods...)...) +} + +// TLS matches HTTPS requests. +// +// By default, any TLS handshake packet is matched. An optional whitelist +// of versions can be passed in to restrict the matcher, for example: +// TLS(tls.VersionTLS11, tls.VersionTLS12) +func TLS(versions ...int) Matcher { + if len(versions) == 0 { + versions = []int{ + tls.VersionSSL30, + tls.VersionTLS10, + tls.VersionTLS11, + tls.VersionTLS12, + } + } + prefixes := [][]byte{} + for _, v := range versions { + prefixes = append(prefixes, []byte{22, byte(v >> 8 & 0xff), byte(v & 0xff)}) + } + return prefixByteMatcher(prefixes...) +} + +const maxHTTPRead = 4096 + +// HTTP1 parses the first line or upto 4096 bytes of the request to see if +// the conection contains an HTTP request. +func HTTP1() Matcher { + return func(r io.Reader) bool { + br := bufio.NewReader(&io.LimitedReader{R: r, N: maxHTTPRead}) + l, part, err := br.ReadLine() + if err != nil || part { + return false + } + + _, _, proto, ok := parseRequestLine(string(l)) + if !ok { + return false + } + + v, _, ok := http.ParseHTTPVersion(proto) + return ok && v == 1 + } +} + +// grabbed from net/http. +func parseRequestLine(line string) (method, uri, proto string, ok bool) { + s1 := strings.Index(line, " ") + s2 := strings.Index(line[s1+1:], " ") + if s1 < 0 || s2 < 0 { + return + } + s2 += s1 + 1 + return line[:s1], line[s1+1 : s2], line[s2+1:], true +} + +// HTTP2 parses the frame header of the first frame to detect whether the +// connection is an HTTP2 connection. +func HTTP2() Matcher { + return hasHTTP2Preface +} + +// HTTP1HeaderField returns a matcher matching the header fields of the first +// request of an HTTP 1 connection. +func HTTP1HeaderField(name, value string) Matcher { + return func(r io.Reader) bool { + return matchHTTP1Field(r, name, func(gotValue string) bool { + return gotValue == value + }) + } +} + +// HTTP1HeaderFieldPrefix returns a matcher matching the header fields of the +// first request of an HTTP 1 connection. If the header with key name has a +// value prefixed with valuePrefix, this will match. +func HTTP1HeaderFieldPrefix(name, valuePrefix string) Matcher { + return func(r io.Reader) bool { + return matchHTTP1Field(r, name, func(gotValue string) bool { + return strings.HasPrefix(gotValue, valuePrefix) + }) + } +} + +// HTTP2HeaderField returns a matcher matching the header fields of the first +// headers frame. +func HTTP2HeaderField(name, value string) Matcher { + return func(r io.Reader) bool { + return matchHTTP2Field(ioutil.Discard, r, name, func(gotValue string) bool { + return gotValue == value + }) + } +} + +// HTTP2HeaderFieldPrefix returns a matcher matching the header fields of the +// first headers frame. If the header with key name has a value prefixed with +// valuePrefix, this will match. +func HTTP2HeaderFieldPrefix(name, valuePrefix string) Matcher { + return func(r io.Reader) bool { + return matchHTTP2Field(ioutil.Discard, r, name, func(gotValue string) bool { + return strings.HasPrefix(gotValue, valuePrefix) + }) + } +} + +// HTTP2MatchHeaderFieldSendSettings matches the header field and writes the +// settings to the server. Prefer HTTP2HeaderField over this one, if the client +// does not block on receiving a SETTING frame. +func HTTP2MatchHeaderFieldSendSettings(name, value string) MatchWriter { + return func(w io.Writer, r io.Reader) bool { + return matchHTTP2Field(w, r, name, func(gotValue string) bool { + return gotValue == value + }) + } +} + +// HTTP2MatchHeaderFieldPrefixSendSettings matches the header field prefix +// and writes the settings to the server. Prefer HTTP2HeaderFieldPrefix over +// this one, if the client does not block on receiving a SETTING frame. +func HTTP2MatchHeaderFieldPrefixSendSettings(name, valuePrefix string) MatchWriter { + return func(w io.Writer, r io.Reader) bool { + return matchHTTP2Field(w, r, name, func(gotValue string) bool { + return strings.HasPrefix(gotValue, valuePrefix) + }) + } +} + +func hasHTTP2Preface(r io.Reader) bool { + var b [len(http2.ClientPreface)]byte + last := 0 + + for { + n, err := r.Read(b[last:]) + if err != nil { + return false + } + + last += n + eq := string(b[:last]) == http2.ClientPreface[:last] + if last == len(http2.ClientPreface) { + return eq + } + if !eq { + return false + } + } +} + +func matchHTTP1Field(r io.Reader, name string, matches func(string) bool) (matched bool) { + req, err := http.ReadRequest(bufio.NewReader(r)) + if err != nil { + return false + } + + return matches(req.Header.Get(name)) +} + +func matchHTTP2Field(w io.Writer, r io.Reader, name string, matches func(string) bool) (matched bool) { + if !hasHTTP2Preface(r) { + return false + } + + done := false + framer := http2.NewFramer(w, r) + hdec := hpack.NewDecoder(uint32(4<<10), func(hf hpack.HeaderField) { + if hf.Name == name { + done = true + if matches(hf.Value) { + matched = true + } + } + }) + for { + f, err := framer.ReadFrame() + if err != nil { + return false + } + + switch f := f.(type) { + case *http2.SettingsFrame: + if err := framer.WriteSettings(); err != nil { + return false + } + case *http2.ContinuationFrame: + if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil { + return false + } + done = done || f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0 + case *http2.HeadersFrame: + if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil { + return false + } + done = done || f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0 + } + + if done { + return matched + } + } +} diff --git a/vendor/github.com/soheilhy/cmux/patricia.go b/vendor/github.com/soheilhy/cmux/patricia.go new file mode 100644 index 00000000..c3e3d85b --- /dev/null +++ b/vendor/github.com/soheilhy/cmux/patricia.go @@ -0,0 +1,179 @@ +// Copyright 2016 The CMux Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cmux + +import ( + "bytes" + "io" +) + +// patriciaTree is a simple patricia tree that handles []byte instead of string +// and cannot be changed after instantiation. +type patriciaTree struct { + root *ptNode + maxDepth int // max depth of the tree. +} + +func newPatriciaTree(bs ...[]byte) *patriciaTree { + max := 0 + for _, b := range bs { + if max < len(b) { + max = len(b) + } + } + return &patriciaTree{ + root: newNode(bs), + maxDepth: max + 1, + } +} + +func newPatriciaTreeString(strs ...string) *patriciaTree { + b := make([][]byte, len(strs)) + for i, s := range strs { + b[i] = []byte(s) + } + return newPatriciaTree(b...) +} + +func (t *patriciaTree) matchPrefix(r io.Reader) bool { + buf := make([]byte, t.maxDepth) + n, _ := io.ReadFull(r, buf) + return t.root.match(buf[:n], true) +} + +func (t *patriciaTree) match(r io.Reader) bool { + buf := make([]byte, t.maxDepth) + n, _ := io.ReadFull(r, buf) + return t.root.match(buf[:n], false) +} + +type ptNode struct { + prefix []byte + next map[byte]*ptNode + terminal bool +} + +func newNode(strs [][]byte) *ptNode { + if len(strs) == 0 { + return &ptNode{ + prefix: []byte{}, + terminal: true, + } + } + + if len(strs) == 1 { + return &ptNode{ + prefix: strs[0], + terminal: true, + } + } + + p, strs := splitPrefix(strs) + n := &ptNode{ + prefix: p, + } + + nexts := make(map[byte][][]byte) + for _, s := range strs { + if len(s) == 0 { + n.terminal = true + continue + } + nexts[s[0]] = append(nexts[s[0]], s[1:]) + } + + n.next = make(map[byte]*ptNode) + for first, rests := range nexts { + n.next[first] = newNode(rests) + } + + return n +} + +func splitPrefix(bss [][]byte) (prefix []byte, rest [][]byte) { + if len(bss) == 0 || len(bss[0]) == 0 { + return prefix, bss + } + + if len(bss) == 1 { + return bss[0], [][]byte{{}} + } + + for i := 0; ; i++ { + var cur byte + eq := true + for j, b := range bss { + if len(b) <= i { + eq = false + break + } + + if j == 0 { + cur = b[i] + continue + } + + if cur != b[i] { + eq = false + break + } + } + + if !eq { + break + } + + prefix = append(prefix, cur) + } + + rest = make([][]byte, 0, len(bss)) + for _, b := range bss { + rest = append(rest, b[len(prefix):]) + } + + return prefix, rest +} + +func (n *ptNode) match(b []byte, prefix bool) bool { + l := len(n.prefix) + if l > 0 { + if l > len(b) { + l = len(b) + } + if !bytes.Equal(b[:l], n.prefix) { + return false + } + } + + if n.terminal && (prefix || len(n.prefix) == len(b)) { + return true + } + + if l >= len(b) { + return false + } + + nextN, ok := n.next[b[l]] + if !ok { + return false + } + + if l == len(b) { + b = b[l:l] + } else { + b = b[l+1:] + } + return nextN.match(b, prefix) +}