diff --git a/proxy/MAINTAINERS b/proxy/MAINTAINERS new file mode 100644 index 0000000..1e998f8 --- /dev/null +++ b/proxy/MAINTAINERS @@ -0,0 +1 @@ +Michael Crosby (@crosbymichael) diff --git a/proxy/network_proxy_test.go b/proxy/network_proxy_test.go new file mode 100644 index 0000000..9e38256 --- /dev/null +++ b/proxy/network_proxy_test.go @@ -0,0 +1,216 @@ +package proxy + +import ( + "bytes" + "fmt" + "io" + "net" + "strings" + "testing" + "time" +) + +var testBuf = []byte("Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo") +var testBufSize = len(testBuf) + +type EchoServer interface { + Run() + Close() + LocalAddr() net.Addr +} + +type TCPEchoServer struct { + listener net.Listener + testCtx *testing.T +} + +type UDPEchoServer struct { + conn net.PacketConn + testCtx *testing.T +} + +func NewEchoServer(t *testing.T, proto, address string) EchoServer { + var server EchoServer + if strings.HasPrefix(proto, "tcp") { + listener, err := net.Listen(proto, address) + if err != nil { + t.Fatal(err) + } + server = &TCPEchoServer{listener: listener, testCtx: t} + } else { + socket, err := net.ListenPacket(proto, address) + if err != nil { + t.Fatal(err) + } + server = &UDPEchoServer{conn: socket, testCtx: t} + } + return server +} + +func (server *TCPEchoServer) Run() { + go func() { + for { + client, err := server.listener.Accept() + if err != nil { + return + } + go func(client net.Conn) { + if _, err := io.Copy(client, client); err != nil { + server.testCtx.Logf("can't echo to the client: %v\n", err.Error()) + } + client.Close() + }(client) + } + }() +} + +func (server *TCPEchoServer) LocalAddr() net.Addr { return server.listener.Addr() } +func (server *TCPEchoServer) Close() { server.listener.Addr() } + +func (server *UDPEchoServer) Run() { + go func() { + readBuf := make([]byte, 1024) + for { + read, from, err := server.conn.ReadFrom(readBuf) + if err != nil { + return + } + for i := 0; i != read; { + written, err := server.conn.WriteTo(readBuf[i:read], from) + if err != nil { + break + } + i += written + } + } + }() +} + +func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() } +func (server *UDPEchoServer) Close() { server.conn.Close() } + +func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string) { + defer proxy.Close() + go proxy.Run() + client, err := net.Dial(proto, addr) + if err != nil { + t.Fatalf("Can't connect to the proxy: %v", err) + } + defer client.Close() + client.SetDeadline(time.Now().Add(10 * time.Second)) + if _, err = client.Write(testBuf); err != nil { + t.Fatal(err) + } + recvBuf := make([]byte, testBufSize) + if _, err = client.Read(recvBuf); err != nil { + t.Fatal(err) + } + if !bytes.Equal(testBuf, recvBuf) { + t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf)) + } +} + +func testProxy(t *testing.T, proto string, proxy Proxy) { + testProxyAt(t, proto, proxy, proxy.FrontendAddr().String()) +} + +func TestTCP4Proxy(t *testing.T) { + backend := NewEchoServer(t, "tcp", "127.0.0.1:0") + defer backend.Close() + backend.Run() + frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + proxy, err := NewProxy(frontendAddr, backend.LocalAddr()) + if err != nil { + t.Fatal(err) + } + testProxy(t, "tcp", proxy) +} + +func TestTCP6Proxy(t *testing.T) { + backend := NewEchoServer(t, "tcp", "[::1]:0") + defer backend.Close() + backend.Run() + frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0} + proxy, err := NewProxy(frontendAddr, backend.LocalAddr()) + if err != nil { + t.Fatal(err) + } + testProxy(t, "tcp", proxy) +} + +func TestTCPDualStackProxy(t *testing.T) { + // If I understand `godoc -src net favoriteAddrFamily` (used by the + // net.Listen* functions) correctly this should work, but it doesn't. + t.Skip("No support for dual stack yet") + backend := NewEchoServer(t, "tcp", "[::1]:0") + defer backend.Close() + backend.Run() + frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0} + proxy, err := NewProxy(frontendAddr, backend.LocalAddr()) + if err != nil { + t.Fatal(err) + } + ipv4ProxyAddr := &net.TCPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: proxy.FrontendAddr().(*net.TCPAddr).Port, + } + testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String()) +} + +func TestUDP4Proxy(t *testing.T) { + backend := NewEchoServer(t, "udp", "127.0.0.1:0") + defer backend.Close() + backend.Run() + frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + proxy, err := NewProxy(frontendAddr, backend.LocalAddr()) + if err != nil { + t.Fatal(err) + } + testProxy(t, "udp", proxy) +} + +func TestUDP6Proxy(t *testing.T) { + backend := NewEchoServer(t, "udp", "[::1]:0") + defer backend.Close() + backend.Run() + frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0} + proxy, err := NewProxy(frontendAddr, backend.LocalAddr()) + if err != nil { + t.Fatal(err) + } + testProxy(t, "udp", proxy) +} + +func TestUDPWriteError(t *testing.T) { + frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + // Hopefully, this port will be free: */ + backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587} + proxy, err := NewProxy(frontendAddr, backendAddr) + if err != nil { + t.Fatal(err) + } + defer proxy.Close() + go proxy.Run() + client, err := net.Dial("udp", "127.0.0.1:25587") + if err != nil { + t.Fatalf("Can't connect to the proxy: %v", err) + } + defer client.Close() + // Make sure the proxy doesn't stop when there is no actual backend: + client.Write(testBuf) + client.Write(testBuf) + backend := NewEchoServer(t, "udp", "127.0.0.1:25587") + defer backend.Close() + backend.Run() + client.SetDeadline(time.Now().Add(10 * time.Second)) + if _, err = client.Write(testBuf); err != nil { + t.Fatal(err) + } + recvBuf := make([]byte, testBufSize) + if _, err = client.Read(recvBuf); err != nil { + t.Fatal(err) + } + if !bytes.Equal(testBuf, recvBuf) { + t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf)) + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 0000000..7a711f6 --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,29 @@ +package proxy + +import ( + "fmt" + "net" +) + +type Proxy interface { + // Start forwarding traffic back and forth the front and back-end + // addresses. + Run() + // Stop forwarding traffic and close both ends of the Proxy. + Close() + // Return the address on which the proxy is listening. + FrontendAddr() net.Addr + // Return the proxied address. + BackendAddr() net.Addr +} + +func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) { + switch frontendAddr.(type) { + case *net.UDPAddr: + return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr)) + case *net.TCPAddr: + return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr)) + default: + panic(fmt.Errorf("Unsupported protocol")) + } +} diff --git a/proxy/stub_proxy.go b/proxy/stub_proxy.go new file mode 100644 index 0000000..7684427 --- /dev/null +++ b/proxy/stub_proxy.go @@ -0,0 +1,22 @@ +package proxy + +import ( + "net" +) + +type StubProxy struct { + frontendAddr net.Addr + backendAddr net.Addr +} + +func (p *StubProxy) Run() {} +func (p *StubProxy) Close() {} +func (p *StubProxy) FrontendAddr() net.Addr { return p.frontendAddr } +func (p *StubProxy) BackendAddr() net.Addr { return p.backendAddr } + +func NewStubProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) { + return &StubProxy{ + frontendAddr: frontendAddr, + backendAddr: backendAddr, + }, nil +} diff --git a/proxy/tcp_proxy.go b/proxy/tcp_proxy.go new file mode 100644 index 0000000..b84483e --- /dev/null +++ b/proxy/tcp_proxy.go @@ -0,0 +1,93 @@ +package proxy + +import ( + "io" + "log" + "net" + "syscall" +) + +type TCPProxy struct { + listener *net.TCPListener + frontendAddr *net.TCPAddr + backendAddr *net.TCPAddr +} + +func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) { + listener, err := net.ListenTCP("tcp", frontendAddr) + if err != nil { + return nil, err + } + // If the port in frontendAddr was 0 then ListenTCP will have a picked + // a port to listen on, hence the call to Addr to get that actual port: + return &TCPProxy{ + listener: listener, + frontendAddr: listener.Addr().(*net.TCPAddr), + backendAddr: backendAddr, + }, nil +} + +func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) { + backend, err := net.DialTCP("tcp", nil, proxy.backendAddr) + if err != nil { + log.Printf("Can't forward traffic to backend tcp/%v: %v\n", proxy.backendAddr, err.Error()) + client.Close() + return + } + + event := make(chan int64) + var broker = func(to, from *net.TCPConn) { + written, err := io.Copy(to, from) + if err != nil { + // If the socket we are writing to is shutdown with + // SHUT_WR, forward it to the other end of the pipe: + if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE { + from.CloseWrite() + } + } + to.CloseRead() + event <- written + } + + log.Printf("Forwarding traffic between tcp/%v and tcp/%v", client.RemoteAddr(), backend.RemoteAddr()) + go broker(client, backend) + go broker(backend, client) + + var transferred int64 = 0 + for i := 0; i < 2; i++ { + select { + case written := <-event: + transferred += written + case <-quit: + // Interrupt the two brokers and "join" them. + client.Close() + backend.Close() + for ; i < 2; i++ { + transferred += <-event + } + goto done + } + } + client.Close() + backend.Close() +done: + log.Printf("%v bytes transferred between tcp/%v and tcp/%v", transferred, client.RemoteAddr(), backend.RemoteAddr()) +} + +func (proxy *TCPProxy) Run() { + quit := make(chan bool) + defer close(quit) + log.Printf("Starting proxy on tcp/%v for tcp/%v", proxy.frontendAddr, proxy.backendAddr) + for { + client, err := proxy.listener.Accept() + if err != nil { + log.Printf("Stopping proxy on tcp/%v for tcp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error()) + return + } + go proxy.clientLoop(client.(*net.TCPConn), quit) + } +} + +func (proxy *TCPProxy) Close() { proxy.listener.Close() } +func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr } +func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr } diff --git a/proxy/udp_proxy.go b/proxy/udp_proxy.go new file mode 100644 index 0000000..9395516 --- /dev/null +++ b/proxy/udp_proxy.go @@ -0,0 +1,162 @@ +package proxy + +import ( + "encoding/binary" + "log" + "net" + "strings" + "sync" + "syscall" + "time" +) + +const ( + UDPConnTrackTimeout = 90 * time.Second + UDPBufSize = 2048 +) + +// A net.Addr where the IP is split into two fields so you can use it as a key +// in a map: +type connTrackKey struct { + IPHigh uint64 + IPLow uint64 + Port int +} + +func newConnTrackKey(addr *net.UDPAddr) *connTrackKey { + if len(addr.IP) == net.IPv4len { + return &connTrackKey{ + IPHigh: 0, + IPLow: uint64(binary.BigEndian.Uint32(addr.IP)), + Port: addr.Port, + } + } + return &connTrackKey{ + IPHigh: binary.BigEndian.Uint64(addr.IP[:8]), + IPLow: binary.BigEndian.Uint64(addr.IP[8:]), + Port: addr.Port, + } +} + +type connTrackMap map[connTrackKey]*net.UDPConn + +type UDPProxy struct { + listener *net.UDPConn + frontendAddr *net.UDPAddr + backendAddr *net.UDPAddr + connTrackTable connTrackMap + connTrackLock sync.Mutex +} + +func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) { + listener, err := net.ListenUDP("udp", frontendAddr) + if err != nil { + return nil, err + } + return &UDPProxy{ + listener: listener, + frontendAddr: listener.LocalAddr().(*net.UDPAddr), + backendAddr: backendAddr, + connTrackTable: make(connTrackMap), + }, nil +} + +func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) { + defer func() { + proxy.connTrackLock.Lock() + delete(proxy.connTrackTable, *clientKey) + proxy.connTrackLock.Unlock() + log.Printf("Done proxying between udp/%v and udp/%v", clientAddr.String(), proxy.backendAddr.String()) + proxyConn.Close() + }() + + readBuf := make([]byte, UDPBufSize) + for { + proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout)) + again: + read, err := proxyConn.Read(readBuf) + if err != nil { + if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED { + // This will happen if the last write failed + // (e.g: nothing is actually listening on the + // proxied port on the container), ignore it + // and continue until UDPConnTrackTimeout + // expires: + goto again + } + return + } + for i := 0; i != read; { + written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr) + if err != nil { + return + } + i += written + log.Printf("Forwarded %v/%v bytes to udp/%v", i, read, clientAddr.String()) + } + } +} + +func (proxy *UDPProxy) Run() { + readBuf := make([]byte, UDPBufSize) + log.Printf("Starting proxy on udp/%v for udp/%v", proxy.frontendAddr, proxy.backendAddr) + for { + read, from, err := proxy.listener.ReadFromUDP(readBuf) + if err != nil { + // NOTE: Apparently ReadFrom doesn't return + // ECONNREFUSED like Read do (see comment in + // UDPProxy.replyLoop) + if isClosedError(err) { + log.Printf("Stopping proxy on udp/%v for udp/%v (socket was closed)", proxy.frontendAddr, proxy.backendAddr) + } else { + log.Printf("Stopping proxy on udp/%v for udp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error()) + } + break + } + + fromKey := newConnTrackKey(from) + proxy.connTrackLock.Lock() + proxyConn, hit := proxy.connTrackTable[*fromKey] + if !hit { + proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr) + if err != nil { + log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err) + continue + } + proxy.connTrackTable[*fromKey] = proxyConn + go proxy.replyLoop(proxyConn, from, fromKey) + } + proxy.connTrackLock.Unlock() + for i := 0; i != read; { + written, err := proxyConn.Write(readBuf[i:read]) + if err != nil { + log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err) + break + } + i += written + log.Printf("Forwarded %v/%v bytes to udp/%v", i, read, proxy.backendAddr.String()) + } + } +} + +func (proxy *UDPProxy) Close() { + proxy.listener.Close() + proxy.connTrackLock.Lock() + defer proxy.connTrackLock.Unlock() + for _, conn := range proxy.connTrackTable { + conn.Close() + } +} + +func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr } +func (proxy *UDPProxy) BackendAddr() net.Addr { return proxy.backendAddr } + +func isClosedError(err error) bool { + /* This comparison is ugly, but unfortunately, net.go doesn't export errClosing. + * See: + * http://golang.org/src/pkg/net/net.go + * https://code.google.com/p/go/issues/detail?id=4337 + * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ + */ + return strings.HasSuffix(err.Error(), "use of closed network connection") +}