add statistics

This commit is contained in:
clowwindy 2015-08-05 22:43:21 +08:00
parent e08845d6f3
commit 9c3af61433
3 changed files with 45 additions and 14 deletions

View file

@ -28,7 +28,8 @@ import collections
from shadowsocks import common, eventloop, tcprelay, udprelay, asyncdns, shell from shadowsocks import common, eventloop, tcprelay, udprelay, asyncdns, shell
BUF_SIZE = 2048 BUF_SIZE = 1506
STAT_SEND_LIMIT = 100
class Manager(object): class Manager(object):
@ -44,6 +45,7 @@ class Manager(object):
self._statistics = collections.defaultdict(int) self._statistics = collections.defaultdict(int)
self._control_client_addr = None self._control_client_addr = None
try: try:
# TODO use address instead of port
self._control_socket.bind(('127.0.0.1', self._control_socket.bind(('127.0.0.1',
int(config['manager_port']))) int(config['manager_port'])))
self._control_socket.setblocking(False) self._control_socket.setblocking(False)
@ -53,6 +55,7 @@ class Manager(object):
exit(1) exit(1)
self._loop.add(self._control_socket, self._loop.add(self._control_socket,
eventloop.POLL_IN, self) eventloop.POLL_IN, self)
self._loop.add_periodic(self.handle_periodic)
port_password = config['port_password'] port_password = config['port_password']
del config['port_password'] del config['port_password']
@ -70,8 +73,10 @@ class Manager(object):
port)) port))
return return
logging.info("adding server at %s:%d" % (config['server'], port)) logging.info("adding server at %s:%d" % (config['server'], port))
t = tcprelay.TCPRelay(config, self._dns_resolver, False) t = tcprelay.TCPRelay(config, self._dns_resolver, False,
u = udprelay.UDPRelay(config, self._dns_resolver, False) self.stat_callback)
u = udprelay.UDPRelay(config, self._dns_resolver, False,
self.stat_callback)
t.add_to_loop(self._loop) t.add_to_loop(self._loop)
u.add_to_loop(self._loop) u.add_to_loop(self._loop)
self._relays[port] = (t, u) self._relays[port] = (t, u)
@ -126,9 +131,27 @@ class Manager(object):
logging.error(e) logging.error(e)
return None return None
def stat_callback(self, port, data_len):
self._statistics[port] += data_len
def handle_periodic(self): def handle_periodic(self):
# TODO send statistics r = {}
pass i = 0
def send_data(data_dict):
if data_dict:
data = common.to_bytes(json.dumps(data_dict,
separators=(',', ':')))
self._send_control_data(b'stat: ' + data)
for k, v in self._statistics.items():
r[k] = v
i += 1
if i >= STAT_SEND_LIMIT:
send_data(r)
r.clear()
send_data(r)
self._statistics.clear()
def _send_control_data(self, data): def _send_control_data(self, data):
if self._control_client_addr: if self._control_client_addr:

View file

@ -147,10 +147,10 @@ class TCPRelayHandler(object):
logging.debug('chosen server: %s:%d', server, server_port) logging.debug('chosen server: %s:%d', server, server_port)
return server, server_port return server, server_port
def _update_activity(self): def _update_activity(self, data_len=0):
# tell the TCP Relay we have activities recently # tell the TCP Relay we have activities recently
# else it will think we are inactive and timed out # else it will think we are inactive and timed out
self._server.update_activity(self) self._server.update_activity(self, data_len)
def _update_stream(self, stream, status): def _update_stream(self, stream, status):
# update a stream to a new waiting status # update a stream to a new waiting status
@ -317,7 +317,6 @@ class TCPRelayHandler(object):
self._log_error(e) self._log_error(e)
if self._config['verbose']: if self._config['verbose']:
traceback.print_exc() traceback.print_exc()
# TODO use logging when debug completed
self.destroy() self.destroy()
def _create_remote_socket(self, ip, port): def _create_remote_socket(self, ip, port):
@ -388,7 +387,6 @@ class TCPRelayHandler(object):
def _on_local_read(self): def _on_local_read(self):
# handle all local read events and dispatch them to methods for # handle all local read events and dispatch them to methods for
# each stage # each stage
self._update_activity()
if not self._local_sock: if not self._local_sock:
return return
is_local = self._is_local is_local = self._is_local
@ -402,6 +400,7 @@ class TCPRelayHandler(object):
if not data: if not data:
self.destroy() self.destroy()
return return
self._update_activity(len(data))
if not is_local: if not is_local:
data = self._encryptor.decrypt(data) data = self._encryptor.decrypt(data)
if not data: if not data:
@ -424,10 +423,10 @@ class TCPRelayHandler(object):
def _on_remote_read(self): def _on_remote_read(self):
# handle all remote read events # handle all remote read events
self._update_activity()
data = None data = None
try: try:
data = self._remote_sock.recv(BUF_SIZE) data = self._remote_sock.recv(BUF_SIZE)
except (OSError, IOError) as e: except (OSError, IOError) as e:
if eventloop.errno_from_exception(e) in \ if eventloop.errno_from_exception(e) in \
(errno.ETIMEDOUT, errno.EAGAIN, errno.EWOULDBLOCK): (errno.ETIMEDOUT, errno.EAGAIN, errno.EWOULDBLOCK):
@ -435,6 +434,7 @@ class TCPRelayHandler(object):
if not data: if not data:
self.destroy() self.destroy()
return return
self._update_activity(len(data))
if self._is_local: if self._is_local:
data = self._encryptor.decrypt(data) data = self._encryptor.decrypt(data)
else: else:
@ -549,7 +549,7 @@ class TCPRelayHandler(object):
class TCPRelay(object): class TCPRelay(object):
def __init__(self, config, dns_resolver, is_local): def __init__(self, config, dns_resolver, is_local, stat_callback=None):
self._config = config self._config = config
self._is_local = is_local self._is_local = is_local
self._dns_resolver = dns_resolver self._dns_resolver = dns_resolver
@ -589,6 +589,7 @@ class TCPRelay(object):
self._config['fast_open'] = False self._config['fast_open'] = False
server_socket.listen(1024) server_socket.listen(1024)
self._server_socket = server_socket self._server_socket = server_socket
self._stat_callback = stat_callback
def add_to_loop(self, loop): def add_to_loop(self, loop):
if self._eventloop: if self._eventloop:
@ -607,7 +608,10 @@ class TCPRelay(object):
self._timeouts[index] = None self._timeouts[index] = None
del self._handler_to_timeouts[hash(handler)] del self._handler_to_timeouts[hash(handler)]
def update_activity(self, handler): def update_activity(self, handler, data_len):
if data_len and self._stat_callback:
self._stat_callback(self._listen_port, data_len)
# set handler to active # set handler to active
now = int(time.time()) now = int(time.time())
if now - handler.last_activity < eventloop.TIMEOUT_PRECISION: if now - handler.last_activity < eventloop.TIMEOUT_PRECISION:

View file

@ -81,7 +81,7 @@ def client_key(source_addr, server_af):
class UDPRelay(object): class UDPRelay(object):
def __init__(self, config, dns_resolver, is_local): def __init__(self, config, dns_resolver, is_local, stat_callback=None):
self._config = config self._config = config
if is_local: if is_local:
self._listen_addr = config['local_address'] self._listen_addr = config['local_address']
@ -121,6 +121,7 @@ class UDPRelay(object):
server_socket.bind((self._listen_addr, self._listen_port)) server_socket.bind((self._listen_addr, self._listen_port))
server_socket.setblocking(False) server_socket.setblocking(False)
self._server_socket = server_socket self._server_socket = server_socket
self._stat_callback = stat_callback
def _get_a_server(self): def _get_a_server(self):
server = self._config['server'] server = self._config['server']
@ -146,6 +147,8 @@ class UDPRelay(object):
data, r_addr = server.recvfrom(BUF_SIZE) data, r_addr = server.recvfrom(BUF_SIZE)
if not data: if not data:
logging.debug('UDP handle_server: data is empty') logging.debug('UDP handle_server: data is empty')
if self._stat_callback:
self._stat_callback(self._listen_port, len(data))
if self._is_local: if self._is_local:
frag = common.ord(data[2]) frag = common.ord(data[2])
if frag != 0: if frag != 0:
@ -181,7 +184,6 @@ class UDPRelay(object):
af, socktype, proto, canonname, sa = addrs[0] af, socktype, proto, canonname, sa = addrs[0]
key = client_key(r_addr, af) key = client_key(r_addr, af)
logging.debug(key)
client = self._cache.get(key, None) client = self._cache.get(key, None)
if not client: if not client:
# TODO async getaddrinfo # TODO async getaddrinfo
@ -221,6 +223,8 @@ class UDPRelay(object):
if not data: if not data:
logging.debug('UDP handle_client: data is empty') logging.debug('UDP handle_client: data is empty')
return return
if self._stat_callback:
self._stat_callback(self._listen_port, len(data))
if not self._is_local: if not self._is_local:
addrlen = len(r_addr[0]) addrlen = len(r_addr[0])
if addrlen > 255: if addrlen > 255: