diff --git a/shadowsocks/manager.py b/shadowsocks/manager.py index 96dcb8b..767c003 100644 --- a/shadowsocks/manager.py +++ b/shadowsocks/manager.py @@ -28,7 +28,8 @@ import collections from shadowsocks import common, eventloop, tcprelay, udprelay, asyncdns, shell -BUF_SIZE = 2048 +BUF_SIZE = 1506 +STAT_SEND_LIMIT = 100 class Manager(object): @@ -44,6 +45,7 @@ class Manager(object): self._statistics = collections.defaultdict(int) self._control_client_addr = None try: + # TODO use address instead of port self._control_socket.bind(('127.0.0.1', int(config['manager_port']))) self._control_socket.setblocking(False) @@ -53,6 +55,7 @@ class Manager(object): exit(1) self._loop.add(self._control_socket, eventloop.POLL_IN, self) + self._loop.add_periodic(self.handle_periodic) port_password = config['port_password'] del config['port_password'] @@ -70,8 +73,10 @@ class Manager(object): port)) return logging.info("adding server at %s:%d" % (config['server'], port)) - t = tcprelay.TCPRelay(config, self._dns_resolver, False) - u = udprelay.UDPRelay(config, self._dns_resolver, False) + t = tcprelay.TCPRelay(config, self._dns_resolver, False, + self.stat_callback) + u = udprelay.UDPRelay(config, self._dns_resolver, False, + self.stat_callback) t.add_to_loop(self._loop) u.add_to_loop(self._loop) self._relays[port] = (t, u) @@ -126,9 +131,27 @@ class Manager(object): logging.error(e) return None + def stat_callback(self, port, data_len): + self._statistics[port] += data_len + def handle_periodic(self): - # TODO send statistics - pass + r = {} + i = 0 + + def send_data(data_dict): + if data_dict: + data = common.to_bytes(json.dumps(data_dict, + separators=(',', ':'))) + self._send_control_data(b'stat: ' + data) + + for k, v in self._statistics.items(): + r[k] = v + i += 1 + if i >= STAT_SEND_LIMIT: + send_data(r) + r.clear() + send_data(r) + self._statistics.clear() def _send_control_data(self, data): if self._control_client_addr: diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index e14dcaa..d11af31 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -147,10 +147,10 @@ class TCPRelayHandler(object): logging.debug('chosen server: %s:%d', server, server_port) return server, server_port - def _update_activity(self): + def _update_activity(self, data_len=0): # tell the TCP Relay we have activities recently # else it will think we are inactive and timed out - self._server.update_activity(self) + self._server.update_activity(self, data_len) def _update_stream(self, stream, status): # update a stream to a new waiting status @@ -317,7 +317,6 @@ class TCPRelayHandler(object): self._log_error(e) if self._config['verbose']: traceback.print_exc() - # TODO use logging when debug completed self.destroy() def _create_remote_socket(self, ip, port): @@ -388,7 +387,6 @@ class TCPRelayHandler(object): def _on_local_read(self): # handle all local read events and dispatch them to methods for # each stage - self._update_activity() if not self._local_sock: return is_local = self._is_local @@ -402,6 +400,7 @@ class TCPRelayHandler(object): if not data: self.destroy() return + self._update_activity(len(data)) if not is_local: data = self._encryptor.decrypt(data) if not data: @@ -424,10 +423,10 @@ class TCPRelayHandler(object): def _on_remote_read(self): # handle all remote read events - self._update_activity() data = None try: data = self._remote_sock.recv(BUF_SIZE) + except (OSError, IOError) as e: if eventloop.errno_from_exception(e) in \ (errno.ETIMEDOUT, errno.EAGAIN, errno.EWOULDBLOCK): @@ -435,6 +434,7 @@ class TCPRelayHandler(object): if not data: self.destroy() return + self._update_activity(len(data)) if self._is_local: data = self._encryptor.decrypt(data) else: @@ -549,7 +549,7 @@ class TCPRelayHandler(object): class TCPRelay(object): - def __init__(self, config, dns_resolver, is_local): + def __init__(self, config, dns_resolver, is_local, stat_callback=None): self._config = config self._is_local = is_local self._dns_resolver = dns_resolver @@ -589,6 +589,7 @@ class TCPRelay(object): self._config['fast_open'] = False server_socket.listen(1024) self._server_socket = server_socket + self._stat_callback = stat_callback def add_to_loop(self, loop): if self._eventloop: @@ -607,7 +608,10 @@ class TCPRelay(object): self._timeouts[index] = None del self._handler_to_timeouts[hash(handler)] - def update_activity(self, handler): + def update_activity(self, handler, data_len): + if data_len and self._stat_callback: + self._stat_callback(self._listen_port, data_len) + # set handler to active now = int(time.time()) if now - handler.last_activity < eventloop.TIMEOUT_PRECISION: diff --git a/shadowsocks/udprelay.py b/shadowsocks/udprelay.py index a90df9f..a36f981 100644 --- a/shadowsocks/udprelay.py +++ b/shadowsocks/udprelay.py @@ -81,7 +81,7 @@ def client_key(source_addr, server_af): class UDPRelay(object): - def __init__(self, config, dns_resolver, is_local): + def __init__(self, config, dns_resolver, is_local, stat_callback=None): self._config = config if is_local: self._listen_addr = config['local_address'] @@ -121,6 +121,7 @@ class UDPRelay(object): server_socket.bind((self._listen_addr, self._listen_port)) server_socket.setblocking(False) self._server_socket = server_socket + self._stat_callback = stat_callback def _get_a_server(self): server = self._config['server'] @@ -146,6 +147,8 @@ class UDPRelay(object): data, r_addr = server.recvfrom(BUF_SIZE) if not data: logging.debug('UDP handle_server: data is empty') + if self._stat_callback: + self._stat_callback(self._listen_port, len(data)) if self._is_local: frag = common.ord(data[2]) if frag != 0: @@ -181,7 +184,6 @@ class UDPRelay(object): af, socktype, proto, canonname, sa = addrs[0] key = client_key(r_addr, af) - logging.debug(key) client = self._cache.get(key, None) if not client: # TODO async getaddrinfo @@ -221,6 +223,8 @@ class UDPRelay(object): if not data: logging.debug('UDP handle_client: data is empty') return + if self._stat_callback: + self._stat_callback(self._listen_port, len(data)) if not self._is_local: addrlen = len(r_addr[0]) if addrlen > 255: