diff --git a/.gitignore b/.gitignore index f24cd99..7da4331 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,8 @@ pip-log.txt #Mr Developer .mr.developer.cfg + +# +config.json +*.json +config.* diff --git a/config.json b/config.json index 7adf48d..61e4f55 100644 --- a/config.json +++ b/config.json @@ -2,7 +2,7 @@ "server":"localhost", "server_port":8388, "local_port":1080, - "password":"barfoo!", + "password":"barfoo!xx", "timeout":600, "method":"table" } diff --git a/shadowsocks/TODO.md b/shadowsocks/TODO.md new file mode 100644 index 0000000..f2bbfc6 --- /dev/null +++ b/shadowsocks/TODO.md @@ -0,0 +1,4 @@ +1. fd 不在 fd_map 中却注册到了 epoll (可能跟 unrigxxxx 的时效有关,明天来再看下 epoll 的示例代码) +2. epoll 出现 MY_POLLEV_ERR 事件 +3. 大部分 SSL 连接时浏览器会报: ERR_SSL_PROTOCOL_ERROR, +但是当尝试把 read() 及 write() 的数据打印出来时又可以正常连接(应该是跟打印输出会消耗一定的时间有关) diff --git a/shadowsocks/ioloop.py b/shadowsocks/ioloop.py new file mode 100644 index 0000000..2a781e3 --- /dev/null +++ b/shadowsocks/ioloop.py @@ -0,0 +1,292 @@ +#coding:utf8 +""" +Created on Oct 24, 2013 + +@author: xing +@ +""" + +import sys +import os +import select +import logging +import socket +import errno +import binascii +import time + +try: + from cStringIO import StringIO +except ImportError, e: + from StringIO import StringIO + +try: + from select import epoll as pollerFact + MY_POLLEV_IN = select.EPOLLIN + MY_POLLEV_PRI = select.EPOLLPRI + MY_POLLEV_OUT = select.EPOLLOUT + MY_POLLEV_ERR = select.EPOLLERR +except ImportError, e: + print >> sys.stderr, e + pollerFact = select.poll + MY_POLLEV_IN = select.POLLIN + MY_POLLEV_PRI = select.POLLPRI + MY_POLLEV_OUT = select.POLLOUT + MY_POLLEV_ERR = select.POLLERR + + +class IOLoopError(Exception): + pass + + +class IOLoop(object): + _instance = None + + @classmethod + def instance(cls): + if not cls._instance: + cls._instance = cls() + return cls._instance + + def __init__(self): + logging.debug('IOLoop.__init__()') + self._fd_map = {} + self._poller = pollerFact() + + def add_handler(self, fd, handler, m_read=False, m_write=False): + if fd in self._fd_map: + raise IOLoopError(u'fd(%s) handler is %s' % (fd, self._fd_map[fd])) + + flags = MY_POLLEV_ERR + if m_read: + flags |= MY_POLLEV_IN | MY_POLLEV_PRI + if m_write: + flags |= MY_POLLEV_OUT + + self._poller.register(fd, flags) + #self._set_nonblocking(fd) + self._fd_map[fd] = handler + logging.debug('len(ioloop._fd_map) = %d', len(self._fd_map)) + + def remove_handler(self, fd): + handler = self._fd_map.pop(fd) + del handler + self._poller.unregister(fd) + logging.debug('unregister %d,current len(ioloop._fd_map) = %d', fd, len(self._fd_map)) + + + def modify_handler(self, fd, handler, m_read=False, m_write=False): + self.remove_handler(fd) + self.add_handler(fd, handler, m_read=False, m_write=False) + + def wait_events(self, timeout): + events_list = self._poller.poll(timeout) + for fd, events in events_list: + if fd not in self._fd_map: + logging.warn('fd %d not in fd_map', fd) + self._poller.unregister(fd) + continue + # logging.info('fd %d, events %d', fd, events) + handler = self._fd_map[fd] + if events & MY_POLLEV_ERR: + # logging.debug("fd[%s] events MY_POLLEV_ERR | MY_POLLEV_HUP", fd) + handler.handle_error(fd, events) + elif events & MY_POLLEV_IN or events & MY_POLLEV_PRI: + # logging.debug("fd[%s] events MY_POLLEV_IN | MY_POLLEV_PRI", fd) + handler.handle_read() + elif events & MY_POLLEV_OUT: + # logging.debug("fd[%s] events MY_POLLEV_OUT", fd) + handler.handle_write() + else: + logging.error("unknow events %d", events) + + #@staticmethod + #def _set_nonblocking(fd): + # flags = fcntl.fcntl(fd, fcntl.F_GETFL) + # fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + +class IOStream(object): + def __init__(self, obj): + self._wbuf = StringIO() + self._fd = obj.fileno() + self._obj = obj + + def read(self, *args, **kwargs): + # logging.debug("IOStream[%s].read()", self._obj.fileno()) + return self._obj.read(*args, **kwargs) + + def write(self, s): + """write to buffer, unit IOHandler.handle_write() call real_write() to write it""" + self._wbuf.write(s) + + def real_write(self): + if self._wbuf.tell() > 0: + self._obj.write(self._wbuf.getvalue()) + self._wbuf.truncate(0) + + def close(self): + return self._obj.close() + + def fileno(self): + return self._fd + + +class SocketStream(IOStream): + def read(self, size): + return self._obj.recv(size) + + def real_write(self): + if self._wbuf.tell() > 0: + data = self._wbuf.getvalue() + try: + self._obj.sendall(data) + except socket.error, _e: + if _e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): + return + + self._wbuf.truncate(0) + + def close(self): + if isinstance(self._obj, socket._socketobject): + self._obj.shutdown(socket.SHUT_RDWR) + self._obj.close() + + +class BaseHandler(object): + monitor_read = True + monitor_write = True + def __init__(self): + raise + + def handle_read(self): + raise + + def handle_write(self): + raise + + def handle_error(self, fd, events): + logging.warn("socket error, fd: %d, events: %d", fd, events) + + +class IOHandler(BaseHandler): + monitor_read = True + monitor_write = True + def __init__(self, ioloop, iostream): + self._ioloop = ioloop + self._ios = iostream + self._fd = self._ios.fileno() + + def handle_read(self): + """fd 可读事件出现""" + # logging.debug("read from fd %s", self._fd) + try: + s = self.do_stream_read() + if len(s) == 0: + logging.debug('iostream[%s].read() return len(s) == 0, close it', self._fd) + self._ioloop.remove_handler(self._fd) + self._ios.close() + return s + except socket.error, _e: + if _e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): + logging.debug('socket error, %s', _e) + return + else: + raise + + def do_stream_read(self, size=None): + # 定义这个方法是为了方便重载 + if size: + return self._ios.read(size) + else: + return self._ios.read() + + def handle_write(self): + """fd 可写事件出现""" + self._ios.real_write() + + def handle_error(self, fd, events): + logging.error("handle_error fd(%s), events: %r", fd, events) + try: + self._ios.close() + except Exception, e: + loggin.error("handle_error() close() exception: %s", e) + + +class SimpleCopyFileHandler(IOHandler): + monitor_read = True + monitor_write = True + def __init__(self, outfile, *args, **kwargs): + super(self.__class__, self).__init__(*args, **kwargs) + self._outfile = outfile + self._outfp = open(self._outfile, 'wb') + self.last_len = 0 + + def handle_read(self): + s = super(self.__class__, self).handle_read() + if s: + self._outfp.write(s) + curr_len = self._outfp.tell() + if curr_len - self.last_len >= 1024*1024: + self._ios.write(str(curr_len/1024/1024)+'M\n') + self.last_len = curr_len + + def do_stream_read(self, size=4096): + # 定义这个方法是为了方便重载 + return self._ios.read(size) + + +class SimpleAcceptHandler(BaseHandler): + monitor_read = True + monitor_write = False + def __init__(self, ioloop, srv_socket): + self._ioloop = ioloop + self._srv_socket = srv_socket + + def handle_read(self): + cli_socket, cli_addr = self._srv_socket.accept() + logging.debug("accept connect[%s] from %s:%s" % ( + cli_socket.fileno(), cli_addr[0], cli_addr[1])) + cli_socket.setblocking(0) + #handler = SimpleCopyFileHandler('/dev/null', self._ioloop, SocketStream(cli_socket)) + handler = SimpleCopyFileHandler('/data/SimpleCopyFileHandler.fd%s.out.txt' % ( + cli_socket.fileno()), self._ioloop, SocketStream(cli_socket)) + self._ioloop.add_handler(cli_socket.fileno(), handler, m_read=True, m_write=False) + + +def test_pipe(): + ioloop = IOLoop() + io_stdin = IOStream(sys.stdin) + + import random + fifo_filename = ''.join([chr(random.randint(0, 25)+ord('A')) for _ in range(10)]) + fifo_filepath = os.path.join('/tmp', fifo_filename) + if not os.path.exists(fifo_filepath): + os.mkfifo(fifo_filepath) + io_pipe = IOStream(open(fifo_filepath, 'rb+')) + import atexit + atexit.register(lambda: os.unlink(fifo_filepath)) + + ioloop.add_handler(io_pipe.fileno(), IOHandler(ioloop, io_pipe), m_read=True, m_write=True) + ioloop.add_handler(io_stdin.fileno(), IOHandler(ioloop, io_stdin), m_read=True, m_write=True) + while True: + ioloop.wait_events(0.1) + + +def test_copyfilehandler(): + ioloop = IOLoop() + import socket + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setblocking(0) + sock.bind(("0.0.0.0", 64433)) + logging.info("listing on %s", str(sock.getsockname())) + sock.listen(1024) + ioloop.add_handler(sock.fileno(), SimpleAcceptHandler(ioloop, sock), m_read=True) + while True: + ioloop.wait_events(0.1) + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + #logging.basicConfig(level=logging.INFO) + test_copyfilehandler() diff --git a/shadowsocks/server-epoll.py b/shadowsocks/server-epoll.py new file mode 100644 index 0000000..177902d --- /dev/null +++ b/shadowsocks/server-epoll.py @@ -0,0 +1,373 @@ +#!/usr/bin/env python +#coding: utf8 + +# Copyright (c) 2014 clowwindy +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import sys +if sys.version_info < (2, 6): + import simplejson as json +else: + import json + +""" +try: + import gevent + import gevent.monkey + gevent.monkey.patch_all(dns=gevent.version_info[0] >= 1) +except ImportError: + gevent = None + print >>sys.stderr, 'warning: gevent not found, using threading instead' +""" + +sys.setrecursionlimit(30) +import socket +import struct +import os +import time +import errno +import logging +import getopt +import encrypt +import utils +import ioloop + +def send_all(sock, data): + bytes_sent = 0 + while True: + r = sock.send(data[bytes_sent:]) + if r < 0: + return r + bytes_sent += r + if bytes_sent == len(data): + return bytes_sent + +def parse_options(): + version = '' + try: + import pkg_resources + version = pkg_resources.get_distribution('shadowsocks').version + except: + pass + print 'shadowsocks %s' % version + + KEY = None + METHOD = None + IPv6 = False + + config_path = utils.find_config() + try: + optlist, args = getopt.getopt(sys.argv[1:], 's:p:k:m:c:6') + for key, value in optlist: + if key == '-c': + config_path = value + + if config_path: + logging.info('loading config from %s' % config_path) + try: + f = open(config_path, 'rb') + config = json.load(f) + except ValueError as e: + logging.error('found an error in config.json: %s', e.message) + sys.exit(1) + else: + config = {} + + optlist, args = getopt.getopt(sys.argv[1:], 's:p:k:m:c:6') + for key, value in optlist: + if key == '-p': + config['server_port'] = int(value) + elif key == '-k': + config['password'] = value + elif key == '-s': + config['server'] = value + elif key == '-m': + config['method'] = value + elif key == '-6': + IPv6 = True + except getopt.GetoptError: + utils.print_server_help() + sys.exit(2) + + return config + +class TunnelStream(ioloop.SocketStream): + def read(self, *args, **kwargs): + s = ioloop.SocketStream.read(self, *args, **kwargs) + return s + def real_write(self, *args, **kwargs): + ioloop.SocketStream.real_write(self, *args, **kwargs) + +class BaseTunnelHandler(ioloop.IOHandler): + def __init__(self, *args, **kwargs): + ioloop.IOHandler.__init__(self, *args, **kwargs) + self.encryptor = encrypt.Encryptor(G_CONFIG["password"], G_CONFIG["method"]) + self._remote_ios = None + self._rs_connecting = False + + def encrypt(self, data): + return self.encryptor.encrypt(data) + + def decrypt(self, data): + return self.encryptor.decrypt(data) + + def close_tunnel(self): + if self._remote_ios: + logging.debug('!!!!!!!!!!! close remote ios %d', self._remote_ios.fileno()) + self._ioloop.remove_handler(self._remote_ios.fileno()) + self._remote_ios._obj.close() + + logging.debug('!!!!!!!!!!! close local ios %d', self._ios.fileno()) + self._ioloop.remove_handler(self._ios.fileno()) + self._ios.close() + + def left_to_local(self, data): + """解密""" + return self.decrypt(data) + + def local_to_left(self, data): + """加密""" + return self.encrypt(data) + + def do_stream_read(self): + raise + + def write_to_remote(self, data): + raise + + def connect_to_remote(self): + raise + + def set_remote_ts(self, sock): + raise + + def handle_read(self): + """fd 可读事件出现""" + # logging.info("%r, remote_ios: %r, _rs_connecting: %r", self, self._remote_ios, self._rs_connecting) + if not self._remote_ios: + if not self._rs_connecting: + self.connect_to_remote() + return + + logging.debug("handle_read(), local:%d, remote:%d, Handler:%r", + self._ios.fileno(), self._remote_ios.fileno(), self) + try: + _s = time.time() + s = self.do_stream_read() + # logging.debug('do_stream_read() cast time %f', time.time()-_s) + if len(s) == 0: + logging.debug('iostream[%s].read() return len(s) == 0, close it', self._fd) + self.close_tunnel() + + _s = time.time() + self.write_to_remote(s) + # logging.debug('write_to_remote() cast time %f', time.time()-_s) + return + + except socket.error, _e: + if _e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): + # logging.debug('socket error, %s', _e) + return + else: + raise + +class LeftTunnelHandler(BaseTunnelHandler): + def __init__(self, *args, **kwargs): + super(self.__class__, self).__init__(*args, **kwargs) + self._remote_ios = None + + def set_remote_ios(self, sock_stream): + self._remote_ios = sock_stream + logging.info("self._remote_ios: %r", self._remote_ios) + + def connect_to_remote(self): + rfile = self._ios + iv_len = self.encryptor.iv_len() + if iv_len: + self.decrypt(rfile.read(iv_len)) + addrtype = ord(self.decrypt(rfile.read(1))) + if addrtype == 1: + addr = socket.inet_ntoa(self.decrypt(rfile.read(4))) + elif addrtype == 3: + addr = self.decrypt(rfile.read(ord(self.decrypt(rfile.read(1))))) + elif addrtype == 4: + addr = socket.inet_ntop(socket.AF_INET6, self.decrypt(rfile.read(16))) + else: + # not supported + logging.warn('addr_type not supported, maybe wrong password') + return None + port = struct.unpack('>H', self.decrypt(rfile.read(2)))[0] + try: + logging.info('connecting to remote %s:%d', addr, port) + _start_time = time.time() + remote_socket = socket.socket() + remote_socket.setblocking(0) + + try: + remote_socket.connect((addr, port)) + except socket.error, _e: + if _e.errno != errno.EINPROGRESS: + raise _e + + logging.info('socket.connect() cost time: %f', time.time()-_start_time) + except socket.error, e: + # Connection refused + logging.warn(e) + return None + + remote_ts = TunnelStream(remote_socket) + handler = ShadowConnectHandler(self._ioloop, self, remote_ts) + self._ioloop.add_handler(remote_ts.fileno(), handler, m_read=True, m_write=True) + self._rs_connecting = True + return None + + def do_stream_read(self, size=4096): + """从客户端读""" + return self.left_to_local(self._ios.read(size)) + + def write_to_remote(self, data): + """发送到目标服务器""" + return self._remote_ios.write(data) + +class RightTunnelHandler(BaseTunnelHandler): + def __init__(self, remote_ios, *args, **kwargs): + super(self.__class__, self).__init__(*args, **kwargs) + self._remote_ios = remote_ios + + def do_stream_read(self, size=4096): + """从目标服务器读""" + data = self._ios.read(size) + # logging.debug('recv from right: %s', list(data)) + return data + + def write_to_remote(self, data): + """发送到客户端""" + data = self.local_to_left(data) + # logging.debug('send to left: %s', list(data)) + self._remote_ios.write(data) + +class ShadowConnectHandler(ioloop.BaseHandler): + def __init__(self, _ioloop, left_handler, right_ts): + self._ioloop = _ioloop + self._left_handler = left_handler + self._left_ts = self._left_handler._ios + self._right_ts = right_ts + def handle_write(self): + self.handle_connect_res() + + def handle_read(self): + self.handle_connect_res() + + def handle_connect_res(self): + self._left_handler.set_remote_ios(self._right_ts) + print self._left_handler._remote_ios + + handler = RightTunnelHandler( self._left_ts, self._ioloop, self._right_ts) + self._ioloop.modify_handler( self._right_ts.fileno(), handler, m_read=True, m_write=True) + + logging.info('New tunnel (%d,%d) <=> (%d,%d)' % ( + self._left_handler._ios.fileno(), self._left_handler._remote_ios.fileno(), + handler._ios.fileno(), handler._remote_ios.fileno(), + )) + +class ShadowAcceptHandler(ioloop.BaseHandler): + def __init__(self, _ioloop, srv_socket): + self._ioloop = _ioloop + self._srv_socket = srv_socket + + def handle_read(self): + cli_socket, cli_addr = self._srv_socket.accept() + logging.debug("accept connect[%s] from %s:%s" % ( + cli_socket.fileno(), cli_addr[0], cli_addr[1])) + cli_socket.setblocking(0) + ts = TunnelStream(cli_socket) + handler = LeftTunnelHandler( self._ioloop, ts) + self._ioloop.add_handler(cli_socket.fileno(), handler, m_read=True, m_write=True) + +def main(): + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(levelname)-8s # %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', filemode='a+') + + config = parse_options() + + SERVER = config['server'] + PORT = config['server_port'] + KEY = config['password'] + + config['method'] = config.get('method', None) + METHOD = config.get('method') + + config['port_password'] = config.get('port_password', None) + PORTPASSWORD = config.get('port_password') + + config['timeout'] = config.get('timeout', 600) + + if not KEY and not config_path: + sys.exit('config not specified, please read https://github.com/clowwindy/shadowsocks') + + utils.check_config(config) + + global G_CONFIG + G_CONFIG = config + + if PORTPASSWORD: + if PORT or KEY: + logging.warn('warning: port_password should not be used with server_port and password. server_port and password will be ignored') + else: + PORTPASSWORD = {} + PORTPASSWORD[str(PORT)] = KEY + + encrypt.init_table(KEY, METHOD) + + io = ioloop.IOLoop() + import socket + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setblocking(0) + sock.bind((SERVER, PORT)) + logging.info("listing on %s", str(sock.getsockname())) + sock.listen(1024) + io.add_handler(sock.fileno(), ShadowAcceptHandler(io, sock), m_read=True) + next_tick = time.time() + 10 + count = 0 + while True: + count += 1 + if time.time() >= next_tick: + logging.info("loop count %d", count) + next_tick = time.time() + 10 + pass + _s = time.time() + io.wait_events(0.1) + use_time = time.time() - _s + if use_time > 0.2: + logging.error("events process cost time: %f", _e-_s) + elif use_time < 0.1: + time.sleep(0.1-use_time) + +global G_CONFIG +if __name__ == '__main__': + while 1: + try: + main() + except (socket.error, ioloop.IOLoopError), e: + import traceback + logging.error(traceback.format_exc()) + break