Merge pull request #1 from orvice/manyuser

更正一个拼写错误
This commit is contained in:
mengskysama 2015-01-26 09:46:34 +08:00
commit 043b2e62dd
12 changed files with 599 additions and 120 deletions

View file

@ -1,3 +1,91 @@
shadowsocks manyuser branch
===========
Which people need this branch
------------------
1.share shadowsocks server
2.create multi server by shadowsocks
3.manage server (transfer / account)
Install
-------
install MySQL 5.x.x
`pip install cymysql`
create a database named `shadowsocks`
import `shadowsocks.sql` into `shadowsocks`
edit Config.py
Example:
#Config
MYSQL_HOST = 'mdss.mengsky.net'
MYSQL_PORT = 3306
MYSQL_USER = 'ss'
MYSQL_PASS = 'ss'
MYSQL_DB = 'shadowsocks'
MANAGE_PASS = 'ss233333333'
#if you want manage in other server you should set this value to global ip
MANAGE_BIND_IP = '127.0.0.1'
#make sure this port is idle
MANAGE_PORT = 23333
TestRun `cd shadowsocks` ` python server.py`
if no exception server will startup. you will see such like
Example:
db start server at port [%s] pass [%s]
Database user table column
------------------
`passwd` server pass
`port` server port
`t` last keepalive time
`u` upload transfer
`d` download transer
`transfer_enable` if u + d > transfer_enable this server will be stop (db_transfer.py del_server_out_of_bound_safe)
Manage socket
------------------
Manage server work in UDP at `MANAGE_BIND_IP` `MANAGE_PORT`
use `MANAGE_PASS:port:passwd:0` to del a server at port `port`
use `MANAGE_PASS:port:passwd:1` to run a server at port `port` password is `passwd`
Python Eg:
udpsock.sendto('MANAGE_PASS:65535:123456:1', (MANAGE_BIND_IP, MANAGE_PORT))
PHP Eg:
$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
$msg = 'MANAGE_PASS:65535:123456:1';
$len = strlen($msg);
socket_sendto($sock, $msg, $len, 0, MANAGE_BIND_IP, MANAGE_PORT);
socket_close($sock);
NOTICE
------------------
If error such like `2014-09-18 09:02:37 ERROR [Errno 24] Too many open files`
edit /etc/security/limits.conf
Add:
* soft nofile 8192
* hard nofile 65535
shadowsocks shadowsocks
=========== ===========

12
shadowsocks/Config.py Normal file
View file

@ -0,0 +1,12 @@
#Config
MYSQL_HOST = 'mdss.mengsky.net'
MYSQL_PORT = 3306
MYSQL_USER = 'ss'
MYSQL_PASS = 'ss'
MYSQL_DB = 'shadowsocks'
MANAGE_PASS = 'ss233333333'
#if you want manage in other server you should set this value to global ip
MANAGE_BIND_IP = '127.0.0.1'
#make sure this port is idle
MANAGE_PORT = 23333

100
shadowsocks/asyncmgr.py Normal file
View file

@ -0,0 +1,100 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014 clowwindy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import time
import os
import socket
import struct
import re
import logging
import common
import lru_cache
import eventloop
import server_pool
import Config
class ServerMgr(object):
def __init__(self):
self._loop = None
self._request_id = 1
self._hosts = {}
self._hostname_status = {}
self._hostname_to_cb = {}
self._cb_to_hostname = {}
self._last_time = time.time()
self._sock = None
self._servers = None
def add_to_loop(self, loop):
if self._loop:
raise Exception('already add to loop')
self._loop = loop
# TODO when dns server is IPv6
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.SOL_UDP)
self._sock.bind((Config.MANAGE_BIND_IP, Config.MANAGE_PORT))
self._sock.setblocking(False)
loop.add(self._sock, eventloop.POLL_IN)
loop.add_handler(self.handle_events)
def _handle_data(self, sock):
data, addr = sock.recvfrom(128)
#manage pwd:port:passwd:action
args = data.split(':')
if len(args) < 4:
return
if args[0] == Config.MANAGE_PASS:
if args[3] == '0':
server_pool.ServerPool.get_instance().cb_del_server(args[1])
elif args[3] == '1':
server_pool.ServerPool.get_instance().new_server(args[1], args[2])
def handle_events(self, events):
for sock, fd, event in events:
if sock != self._sock:
continue
if event & eventloop.POLL_ERR:
logging.error('mgr socket err')
self._loop.remove(self._sock)
self._sock.close()
# TODO when dns server is IPv6
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.SOL_UDP)
self._sock.setblocking(False)
self._loop.add(self._sock, eventloop.POLL_IN)
else:
self._handle_data(sock)
break
def close(self):
if self._sock:
self._sock.close()
self._sock = None
def test():
pass
if __name__ == '__main__':
test()

12
shadowsocks/config.json Normal file
View file

@ -0,0 +1,12 @@
{
"server":"0.0.0.0",
"server_ipv6": "[::]",
"server_port":8388,
"local_address": "127.0.0.1",
"local_port":1080,
"password":"m",
"timeout":300,
"method":"aes-256-cfb",
"fast_open": false,
"workers": 1
}

121
shadowsocks/db_transfer.py Normal file
View file

@ -0,0 +1,121 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import logging
import cymysql
import time
import sys
from server_pool import ServerPool
import Config
class DbTransfer(object):
instance = None
def __init__(self):
self.last_get_transfer = {}
@staticmethod
def get_instance():
if DbTransfer.instance is None:
DbTransfer.instance = DbTransfer()
return DbTransfer.instance
def push_db_all_user(self):
#更新用户流量到数据库
last_transfer = self.last_get_transfer
curr_transfer = ServerPool.get_instance().get_servers_transfer()
#上次和本次的增量
dt_transfer = {}
for id in curr_transfer.keys():
if id in last_transfer:
if last_transfer[id][0] == curr_transfer[id][0] and last_transfer[id][1] == curr_transfer[id][1]:
continue
elif curr_transfer[id][0] == 0 and curr_transfer[id][1] == 0:
continue
elif last_transfer[id][0] <= curr_transfer[id][0] and \
last_transfer[id][1] <= curr_transfer[id][1]:
dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0],
curr_transfer[id][1] - last_transfer[id][1]]
else:
dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]]
else:
if curr_transfer[id][0] == 0 and curr_transfer[id][1] == 0:
continue
dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]]
self.last_get_transfer = curr_transfer
query_head = 'UPDATE user'
query_sub_when = ''
query_sub_when2 = ''
query_sub_in = None
last_time = time.time()
for id in dt_transfer.keys():
query_sub_when += ' WHEN %s THEN u+%s' % (id, dt_transfer[id][0])
query_sub_when2 += ' WHEN %s THEN d+%s' % (id, dt_transfer[id][1])
if query_sub_in is not None:
query_sub_in += ',%s' % id
else:
query_sub_in = '%s' % id
if query_sub_when == '':
return
query_sql = query_head + ' SET u = CASE port' + query_sub_when + \
' END, d = CASE port' + query_sub_when2 + \
' END, t = ' + str(int(last_time)) + \
' WHERE port IN (%s)' % query_sub_in
#print query_sql
conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER,
passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8')
cur = conn.cursor()
cur.execute(query_sql)
cur.close()
conn.commit()
conn.close()
@staticmethod
def pull_db_all_user():
#数据库所有用户信息
conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER,
passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8')
cur = conn.cursor()
cur.execute("SELECT port, u, d, transfer_enable, passwd, switch, enable FROM user")
rows = []
for r in cur.fetchall():
rows.append(list(r))
cur.close()
conn.close()
return rows
@staticmethod
def del_server_out_of_bound_safe(rows):
#停止超流量的服务
#启动没超流量的服务
for row in rows:
if ServerPool.get_instance().server_is_run(row[0]) > 0:
if row[1] + row[2] >= row[3]:
logging.info('db stop server at port [%s]' % (row[0]))
ServerPool.get_instance().del_server(row[0])
elif ServerPool.get_instance().server_run_status(row[0]) is False:
if row[5] == 1 and row[6] == 1 and row[1] + row[2] < row[3]:
logging.info('db start server at port [%s] pass [%s]' % (row[0], row[4]))
ServerPool.get_instance().new_server(row[0], row[4])
@staticmethod
def thread_db():
import socket
import time
timeout = 60
socket.setdefaulttimeout(timeout)
while True:
#logging.warn('db loop')
try:
DbTransfer.get_instance().push_db_all_user()
rows = DbTransfer.get_instance().pull_db_all_user()
DbTransfer.del_server_out_of_bound_safe(rows)
except Exception as e:
logging.warn('db thread except:%s' % e)
finally:
time.sleep(15)
#SQLData.pull_db_all_user()
#print DbTransfer.get_instance().test()

View file

@ -192,6 +192,9 @@ class EventLoop(object):
def add_handler(self, handler): def add_handler(self, handler):
self._handlers.append(handler) self._handlers.append(handler)
def remove_handler(self, handler):
self._handlers.remove(handler)
def run(self): def run(self):
while not self.stopping: while not self.stopping:
try: try:

View file

@ -1,123 +1,16 @@
#!/usr/bin/env python import time
# -*- coding: utf-8 -*-
# Copyright (c) 2014 clowwindy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import sys import sys
import os import thread
import logging import server_pool
import utils import db_transfer
import encrypt
import eventloop
import tcprelay
import udprelay
import asyncdns
def main():
utils.check_python()
config = utils.get_config(False)
utils.print_shadowsocks()
if config['port_password']:
if config['server_port'] or config['password']:
logging.warn('warning: port_password should not be used with '
'server_port and password. server_port and password '
'will be ignored')
else:
config['port_password'] = {}
server_port = config['server_port']
if type(server_port) == list:
for a_server_port in server_port:
config['port_password'][a_server_port] = config['password']
else:
config['port_password'][str(server_port)] = config['password']
encrypt.init_table(config['password'], config['method'])
tcp_servers = []
udp_servers = []
dns_resolver = asyncdns.DNSResolver()
for port, password in config['port_password'].items():
a_config = config.copy()
a_config['server_port'] = int(port)
a_config['password'] = password
logging.info("starting server at %s:%d" %
(a_config['server'], int(port)))
tcp_servers.append(tcprelay.TCPRelay(a_config, dns_resolver, False))
udp_servers.append(udprelay.UDPRelay(a_config, dns_resolver, False))
def run_server():
try:
loop = eventloop.EventLoop()
dns_resolver.add_to_loop(loop)
for tcp_server in tcp_servers:
tcp_server.add_to_loop(loop)
for udp_server in udp_servers:
udp_server.add_to_loop(loop)
loop.run()
except (KeyboardInterrupt, IOError, OSError) as e:
logging.error(e)
import traceback
traceback.print_exc()
os._exit(0)
if int(config['workers']) > 1:
if os.name == 'posix':
children = []
is_child = False
for i in xrange(0, int(config['workers'])):
r = os.fork()
if r == 0:
logging.info('worker started')
is_child = True
run_server()
break
else:
children.append(r)
if not is_child:
def handler(signum, _):
for pid in children:
os.kill(pid, signum)
os.waitpid(pid, 0)
sys.exit()
import signal
signal.signal(signal.SIGTERM, handler)
# master
for a_tcp_server in tcp_servers:
a_tcp_server.close()
for a_udp_server in udp_servers:
a_udp_server.close()
dns_resolver.close()
for child in children:
os.waitpid(child, 0)
else:
logging.warn('worker is only available on Unix/Linux')
run_server()
else:
run_server()
#def test():
# thread.start_new_thread(DbTransfer.thread_db, ())
# Api.web_server()
if __name__ == '__main__': if __name__ == '__main__':
main() #server_pool.ServerPool.get_instance()
#server_pool.ServerPool.get_instance().new_server(2333, '2333')
thread.start_new_thread(db_transfer.DbTransfer.thread_db, ())
while True:
time.sleep(99999)

196
shadowsocks/server_pool.py Normal file
View file

@ -0,0 +1,196 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014 clowwindy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import logging
import utils
import time
import eventloop
import tcprelay
import udprelay
import asyncdns
import thread
import threading
import sys
import asyncmgr
import Config
from socket import *
class ServerPool(object):
instance = None
def __init__(self):
utils.check_python()
self.config = utils.get_config(False)
utils.print_shadowsocks()
self.dns_resolver = asyncdns.DNSResolver()
self.mgr = asyncmgr.ServerMgr()
self.tcp_servers_pool = {}
self.tcp_ipv6_servers_pool = {}
#self.udp_servers_pool = {}
#self.udp_ipv6_servers_pool = {}
self.loop = eventloop.EventLoop()
thread.start_new_thread(ServerPool._loop, (self.loop, self.dns_resolver, self.mgr))
@staticmethod
def get_instance():
if ServerPool.instance is None:
ServerPool.instance = ServerPool()
return ServerPool.instance
@staticmethod
def _loop(loop, dns_resolver, mgr):
try:
mgr.add_to_loop(loop)
dns_resolver.add_to_loop(loop)
loop.run()
except (KeyboardInterrupt, IOError, OSError) as e:
logging.error(e)
import traceback
traceback.print_exc()
os.exit(0)
def server_is_run(self, port):
port = int(port)
ret = 0
if port in self.tcp_servers_pool:
ret = 1
if port in self.tcp_ipv6_servers_pool:
ret |= 2
return ret
def server_run_status(self, port):
if 'server' in self.config:
if port not in self.tcp_servers_pool:
return False
if 'server_ipv6' in self.config:
if port not in self.tcp_ipv6_servers_pool:
return False
return True
def new_server(self, port, password):
ret = True
port = int(port)
if 'server' in self.config:
if port in self.tcp_servers_pool:
logging.info("server already at %s:%d" % (self.config['server'], port))
return 'this port server is already running'
else:
a_config = self.config.copy()
a_config['server_port'] = port
a_config['password'] = password
try:
logging.info("starting server at %s:%d" % (a_config['server'], port))
tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False)
tcp_server.add_to_loop(self.loop)
self.tcp_servers_pool.update({port: tcp_server})
#udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False)
#udp_server.add_to_loop(self.loop)
#self.udp_servers_pool.update({port: udp_server})
except Exception, e:
logging.warn(e)
if 'server_ipv6' in self.config:
if port in self.tcp_ipv6_servers_pool:
logging.info("server already at %s:%d" % (self.config['server_ipv6'], port))
return 'this port server is already running'
else:
a_config = self.config.copy()
a_config['server'] = a_config['server_ipv6']
a_config['server_port'] = port
a_config['password'] = password
try:
logging.info("starting server at %s:%d" % (a_config['server'], port))
tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False)
tcp_server.add_to_loop(self.loop)
self.tcp_ipv6_servers_pool.update({port: tcp_server})
#udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False)
#udp_server.add_to_loop(self.loop)
#self.udp_ipv6_servers_pool.update({port: udp_server})
except Exception, e:
logging.warn(e)
return True
def del_server(self, port):
port = int(port)
logging.info("del server at %d" % port)
try:
udpsock = socket(AF_INET, SOCK_DGRAM)
udpsock.sendto('%s:%s:0:0' % (Config.MANAGE_PASS, port), (Config.MANAGE_BIND_IP, Config.MANAGE_PORT))
udpsock.close()
except Exception, e:
logging.warn(e)
return True
def cb_del_server(self, port):
port = int(port)
if port not in self.tcp_servers_pool:
logging.info("stopped server at %s:%d already stop" % (self.config['server'], port))
else:
logging.info("stopped server at %s:%d" % (self.config['server'], port))
try:
self.tcp_servers_pool[port].destroy()
del self.tcp_servers_pool[port]
#self.udp_servers_pool[port].destroy()
#del self.udp_servers_pool[port]
except Exception, e:
logging.warn(e)
if 'server_ipv6' in self.config:
if port not in self.tcp_ipv6_servers_pool:
logging.info("stopped server at %s:%d already stop" % (self.config['server_ipv6'], port))
else:
logging.info("stopped server at %s:%d" % (self.config['server_ipv6'], port))
try:
self.tcp_ipv6_servers_pool[port].destroy()
del self.tcp_ipv6_servers_pool[port]
#self.udp_ipv6_servers_pool[port].destroy()
#del self.udp_ipv6_servers_pool[port]
except Exception, e:
logging.warn(e)
return True
def get_server_transfer(self, port):
port = int(port)
ret = [0, 0]
if port in self.tcp_servers_pool:
ret[0] = self.tcp_servers_pool[port].server_transfer_ul
ret[1] = self.tcp_servers_pool[port].server_transfer_dl
if port in self.tcp_ipv6_servers_pool:
ret[0] += self.tcp_ipv6_servers_pool[port].server_transfer_ul
ret[1] += self.tcp_ipv6_servers_pool[port].server_transfer_dl
return ret
def get_servers_transfer(self):
servers = self.tcp_servers_pool.copy()
servers.update(self.tcp_ipv6_servers_pool)
ret = {}
for port in servers.keys():
ret[port] = self.get_server_transfer(port)
return ret

View file

@ -0,0 +1,24 @@
SET FOREIGN_KEY_CHECKS=0;
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`email` varchar(32) NOT NULL,
`pass` varchar(16) NOT NULL,
`passwd` varchar(16) NOT NULL,
`t` int(11) NOT NULL DEFAULT '0',
`u` bigint(20) NOT NULL,
`d` bigint(20) NOT NULL,
`transfer_enable` bigint(20) NOT NULL,
`port` int(11) NOT NULL,
`switch` tinyint(4) NOT NULL DEFAULT '1',
`enable` tinyint(4) NOT NULL DEFAULT '1',
`type` tinyint(4) NOT NULL DEFAULT '1',
`last_get_gift_time` int(11) NOT NULL DEFAULT '0',
`last_rest_pass_time` int(11) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`,`port`)
) ENGINE=InnoDB AUTO_INCREMENT=415 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES ('7', 'test@test.com', '123456', '0000000', '1410609560', '0', '0', '9320666234', '50000', '1', '1', '7', '0', '0');

View file

@ -356,6 +356,7 @@ class TCPRelayHandler(object):
if not data: if not data:
self.destroy() self.destroy()
return return
self._server.server_transfer_ul += len(data)
if not is_local: if not is_local:
data = self._encryptor.decrypt(data) data = self._encryptor.decrypt(data)
if not data: if not data:
@ -388,6 +389,7 @@ class TCPRelayHandler(object):
if not data: if not data:
self.destroy() self.destroy()
return return
self._server.server_transfer_dl += len(data)
if self._is_local: if self._is_local:
data = self._encryptor.decrypt(data) data = self._encryptor.decrypt(data)
else: else:
@ -495,6 +497,8 @@ class TCPRelay(object):
self._eventloop = None self._eventloop = None
self._fd_to_handlers = {} self._fd_to_handlers = {}
self._last_time = time.time() self._last_time = time.time()
self.server_transfer_ul = 0L
self.server_transfer_dl = 0L
self._timeout = config['timeout'] self._timeout = config['timeout']
self._timeouts = [] # a list for all the handlers self._timeouts = [] # a list for all the handlers
@ -539,6 +543,21 @@ class TCPRelay(object):
self._eventloop.add(self._server_socket, self._eventloop.add(self._server_socket,
eventloop.POLL_IN | eventloop.POLL_ERR) eventloop.POLL_IN | eventloop.POLL_ERR)
def remove_to_loop(self):
self._eventloop.remove(self._server_socket)
self._eventloop.remove_handler(self._handle_events)
def destroy(self):
#destroy all conn and server conn at this tcprelay
self.remove_to_loop()
for fd in self._fd_to_handlers.keys():
try:
self._fd_to_handlers[fd].destroy()
except Exception, e:
#already destroy
pass
self.close()
def remove_handler(self, handler): def remove_handler(self, handler):
index = self._handler_to_timeouts.get(hash(handler), -1) index = self._handler_to_timeouts.get(hash(handler), -1)
if index >= 0: if index >= 0:

View file

@ -249,6 +249,17 @@ class UDPRelay(object):
self._eventloop.add(server_socket, self._eventloop.add(server_socket,
eventloop.POLL_IN | eventloop.POLL_ERR) eventloop.POLL_IN | eventloop.POLL_ERR)
def remove_to_loop(self):
self._eventloop.remove(self._server_socket)
self._eventloop.remove_handler(self._handle_events)
def destroy(self):
#destroy all conn and server conn
self.remove_to_loop()
self.close()
#GC
self._cache = None
def _handle_events(self, events): def _handle_events(self, events):
for sock, fd, event in events: for sock, fd, event in events:
if sock == self._server_socket: if sock == self._server_socket: