add ipv6 eth in server pool

This commit is contained in:
mengskysama 2014-09-17 20:37:16 +08:00
parent 7f8621f153
commit 1f05b378ac
4 changed files with 89 additions and 61 deletions

View file

@ -1,5 +1,6 @@
{
"server":"127.0.0.1",
"server":"0.0.0.0",
"server_ipv6": "[::]",
"server_port":8388,
"local_address": "127.0.0.1",
"local_port":1080,

View file

@ -29,14 +29,14 @@ class DbTransfer(object):
dt_transfer = {}
for id in curr_transfer.keys():
if id in last_transfer:
if last_transfer[id][0] == curr_transfer[id][0] and \
last_transfer[id][1] == curr_transfer[id][1]:
if last_transfer[id][0] == curr_transfer[id][0] and last_transfer[id][1] == curr_transfer[id][1]:
continue
elif curr_transfer[id][0] == 0 and curr_transfer[id][1] == 0:
continue
elif last_transfer[id][0] <= curr_transfer[id][0] and \
last_transfer[id][1] <= curr_transfer[id][1]:
dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0] ,curr_transfer[id][1] - last_transfer[id][1]]
dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0],
curr_transfer[id][1] - last_transfer[id][1]]
else:
dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]]
else:
@ -64,7 +64,8 @@ class DbTransfer(object):
' END, t = ' + str(int(last_time)) + \
' WHERE port IN (%s)' % query_sub_in
#print query_sql
conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER, passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8')
conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER,
passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8')
cur = conn.cursor()
cur.execute(query_sql)
cur.close()
@ -74,7 +75,8 @@ class DbTransfer(object):
@staticmethod
def pull_db_all_user():
#数据库所有用户信息
conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER, passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8')
conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER,
passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8')
cur = conn.cursor()
cur.execute("SELECT port, u, d, transfer_enable, passwd, switch, enable FROM user")
rows = []
@ -88,7 +90,7 @@ class DbTransfer(object):
def del_server_out_of_bound_safe(rows):
#停止超流量的服务
for row in rows:
if ServerPool.get_instance().server_is_run(row[0]) is True:
if ServerPool.get_instance().server_is_run(row[0]) > 0:
if row[1] + row[2] >= row[3]:
logging.info('db stop server at port [%s]' % (row[0]))
ServerPool.get_instance().del_server(row[0])

View file

@ -10,6 +10,7 @@ import db_transfer
if __name__ == '__main__':
#server_pool.ServerPool.get_instance()
#server_pool.ServerPool.get_instance().new_server(2333, '2333')
thread.start_new_thread(db_transfer.DbTransfer.thread_db, ())
while True:
time.sleep(99999)

View file

@ -47,6 +47,7 @@ class ServerPool(object):
self.dns_resolver = asyncdns.DNSResolver()
self.mgr = asyncmgr.ServerMgr()
self.tcp_servers_pool = {}
self.tcp_ipv6_servers_pool = {}
#self.udp_servers_pool = {}
self.loop = eventloop.EventLoop()
@ -72,83 +73,106 @@ class ServerPool(object):
def server_is_run(self, port):
port = int(port)
ret = 0
if port in self.tcp_servers_pool:
return True
return False
ret = 1
if port in self.tcp_ipv6_servers_pool:
ret |= 2
return ret
def new_server(self, port, password):
ret = True
port = int(port)
if self.server_is_run(port):
logging.info("server already at %s:%d" %(self.config['server'], port))
return 'this port server is already running'
a_config = self.config.copy()
a_config['server_port'] = port
a_config['password'] = password
logging.info("starting server at %s:%d" %(a_config['server'], port))
try:
tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False)
#udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False)
except Exception, e:
logging.warn(e)
return e
try:
#add is safe
tcp_server.add_to_loop(self.loop)
self.tcp_servers_pool.update({port: tcp_server})
#self.udp_servers_pool.update({port: tcp_server})
except Exception, e:
logging.warn(e)
ret = e
return ret
if 'server' in self.config:
if port in self.tcp_servers_pool:
logging.info("server already at %s:%d" % (self.config['server'], port))
return 'this port server is already running'
else:
a_config = self.config.copy()
a_config['server_port'] = port
a_config['password'] = password
try:
logging.info("starting server at %s:%d" % (a_config['server'], port))
tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False)
tcp_server.add_to_loop(self.loop)
self.tcp_servers_pool.update({port: tcp_server})
#udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False)
except Exception, e:
logging.warn(e)
if 'server_ipv6' in self.config:
if port in self.tcp_ipv6_servers_pool:
logging.info("server already at %s:%d" % (self.config['server_ipv6'], port))
return 'this port server is already running'
else:
a_config = self.config.copy()
a_config['server'] = a_config['server_ipv6']
a_config['server_port'] = port
a_config['password'] = password
try:
logging.info("starting server at %s:%d" % (a_config['server'], port))
tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False)
tcp_server.add_to_loop(self.loop)
self.tcp_ipv6_servers_pool.update({port: tcp_server})
#udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False)
except Exception, e:
logging.warn(e)
return True
def del_server(self, port):
port = int(port)
ret = True
if port not in self.tcp_servers_pool:
logging.info("stopped server at %s:%d already stop" % (self.config['server'], int(port)))
return True
logging.info("stopping server at %s:%d" % (self.config['server'], int(port)))
logging.info("del server at %d" % int(port))
try:
udpsock = socket(AF_INET, SOCK_DGRAM)
udpsock.sendto('%s:%s:0:0' % (Config.MANAGE_PASS, port), (Config.MANAGE_BIND_IP, Config.MANAGE_PORT))
udpsock.close()
except Exception, e:
import traceback
traceback.print_exc()
ret = e
logging.warn(e)
return ret
return True
def cb_del_server(self, port):
port = int(port)
ret = True
if port not in self.tcp_servers_pool:
logging.info("stopped server at %s:%d already stop" % (self.config['server'], int(port)))
return True
logging.info("stopped server at %s:%d" % (self.config['server'], int(port)))
try:
self.tcp_servers_pool[int(port)].destroy()
del self.tcp_servers_pool[int(port)]
#del self.udp_servers_pool[int(port)]
except Exception, e:
ret = e
logging.warn(e)
import traceback
traceback.print_exc()
return ret
else:
logging.info("stopped server at %s:%d" % (self.config['server'], int(port)))
try:
self.tcp_servers_pool[int(port)].destroy()
del self.tcp_servers_pool[int(port)]
#del self.udp_servers_pool[int(port)]
except Exception, e:
logging.warn(e)
if port not in self.tcp_ipv6_servers_pool:
logging.info("stopped server at %s:%d already stop" % (self.config['server_ipv6'], int(port)))
else:
logging.info("stopped server at %s:%d" % (self.config['server_ipv6'], int(port)))
try:
self.tcp_servers_pool[int(port)].destroy()
del self.tcp_servers_pool[int(port)]
except Exception, e:
logging.warn(e)
return True
def get_server_transfer(self, port):
port = int(port)
ret = [0, 0]
if port in self.tcp_servers_pool:
return [self.tcp_servers_pool[port].server_transfer_ul, self.tcp_servers_pool[port].server_transfer_dl]
return [0,0]
def get_servers_transfer(self):
ret = {}
for server_port in self.tcp_servers_pool.keys():
ret[server_port] = [self.tcp_servers_pool[server_port].server_transfer_ul
,self.tcp_servers_pool[server_port].server_transfer_dl]
ret[0] = self.tcp_servers_pool[port].server_transfer_ul
ret[1] = self.tcp_servers_pool[port].server_transfer_dl
if port in self.tcp_ipv6_servers_pool:
ret[0] += self.tcp_ipv6_servers_pool[port].server_transfer_ul
ret[1] += self.tcp_ipv6_servers_pool[port].server_transfer_dl
return ret
def get_servers_transfer(self):
servers = self.tcp_servers_pool.copy()
servers.update(self.tcp_ipv6_servers_pool)
ret = {}
for port in servers.keys():
ret[port] = self.get_server_transfer(port)
return ret