add manyuser

This commit is contained in:
bianzhifu 2016-08-27 17:22:51 +08:00
parent f35590b2e2
commit c738ebb128
5 changed files with 269 additions and 16 deletions

22
shadowsocks/config.py Normal file
View file

@ -0,0 +1,22 @@
import logging
#Config
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASS = ''
MYSQL_DB = 'shadowsocks'
SS_BIND_IP = '0.0.0.0'
SS_METHOD = 'rc4-md5'
MANAGE_BIND_IP = '127.0.0.1'
MANAGE_PORT = 3333
CHECKTIME = 15
SYNCTIME = 600
#LOG CONFIG
LOG_ENABLE = False
LOG_LEVEL = logging.INFO
LOG_FILE = '/val/log/shadowsocks.log'

143
shadowsocks/dbtransfer.py Normal file
View file

@ -0,0 +1,143 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import logging
import cymysql
import time
import socket
import config
import json
class DbTransfer(object):
instance = None
def __init__(self):
self.last_get_transfer = {}
@staticmethod
def get_instance():
if DbTransfer.instance is None:
DbTransfer.instance = DbTransfer()
return DbTransfer.instance
@staticmethod
def get_mysql_conn():
conn = cymysql.connect(host=config.MYSQL_HOST, port=config.MYSQL_PORT, user=config.MYSQL_USER,
passwd=config.MYSQL_PASS, db=config.MYSQL_DB, charset='utf8')
return conn;
@staticmethod
def send_command(cmd):
data = ''
try:
cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cli.settimeout(1)
cli.sendto(cmd, ('%s' % (config.MANAGE_BIND_IP), config.MANAGE_PORT))
data, addr = cli.recvfrom(1500)
cli.close()
# TODO: bad way solve timed out
# time.sleep(0.05)
except:
logging.warn('send_command response')
return data
@staticmethod
def get_servers_transfer():
dt_transfer = {}
cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cli.settimeout(2)
cli.sendto('transfer: {}', ('%s' % (config.MANAGE_BIND_IP), config.MANAGE_PORT))
while True:
data, addr = cli.recvfrom(1500)
if data == 'e':
break
data = json.loads(data)
dt_transfer.update(data)
cli.close()
return dt_transfer
def push_db_all_user(self):
logging.info("push_db_all_user")
dt_transfer = self.get_servers_transfer()
last_time = time.time()
conn = DbTransfer.get_mysql_conn()
cur = conn.cursor()
for port in dt_transfer.keys():
update_sql='UPDATE user '+\
'set b_usage=b_usage+'+str(dt_transfer[port])+\
' where ss_port = '+str(port)
insert_sql='insert bandwidth_log value(null,"node1",'+str(port)+','+str(dt_transfer[port])+','+str(int(last_time))+')'
logging.info(update_sql)
cur.execute(update_sql)
cur.execute(insert_sql)
cur.close()
conn.commit()
conn.close()
@staticmethod
def pull_db_all_user():
conn = DbTransfer.get_mysql_conn()
cur = conn.cursor()
cur.execute("SELECT ss_port, ss_pwd, b_usage, b_max, u_status FROM user")
rows = []
for r in cur.fetchall():
rows.append(list(r))
cur.close()
conn.close()
return rows
@staticmethod
def del_server_out_of_bound_safe(rows):
for row in rows:
server = json.loads(DbTransfer.get_instance().send_command('stat: {"server_port":%s}' % row[0]))
if server['stat'] != 'ko':
if row[4] < 0:
# stop disable or switch off user
logging.info('db stop server at port [%s] reason: disable' % (row[0]))
DbTransfer.send_command('remove: {"server_port":%s}' % row[0])
elif row[2] >= row[3]:
# stop out bandwidth user
logging.info('db stop server at port [%s] reason: out bandwidth' % (row[0]))
DbTransfer.send_command('remove: {"server_port":%s}' % row[0])
if server['password'] != row[1]:
# password changed
logging.info('db stop server at port [%s] reason: password changed' % (row[0]))
DbTransfer.send_command('remove: {"server_port":%s}' % row[0])
else:
if row[4] > 0 and row[2] < row[3]:
logging.info('db start server at port [%s] pass [%s]' % (row[0], row[1]))
DbTransfer.send_command('add: {"server_port": %s, "password":"%s"}' % (row[0], row[1]))
@staticmethod
def thread_db():
import socket
import time
timeout = 30
socket.setdefaulttimeout(timeout)
while True:
logging.info('db thread_db')
try:
rows = DbTransfer.get_instance().pull_db_all_user()
DbTransfer.del_server_out_of_bound_safe(rows)
except Exception as e:
import traceback
traceback.print_exc()
logging.warn('db thread except:%s' % e)
finally:
time.sleep(config.CHECKTIME)
@staticmethod
def thread_push():
import socket
import time
timeout = 30
socket.setdefaulttimeout(timeout)
while True:
logging.info('db thread_push')
try:
DbTransfer.get_instance().push_db_all_user()
except Exception as e:
import traceback
traceback.print_exc()
logging.warn('db thread except:%s' % e)
finally:
time.sleep(config.SYNCTIME)

View file

@ -67,7 +67,7 @@ class Manager(object):
exit(1)
self._loop.add(self._control_socket,
eventloop.POLL_IN, self)
self._loop.add_periodic(self.handle_periodic)
# self._loop.add_periodic(self.handle_periodic)
port_password = config['port_password']
del config['port_password']
@ -106,6 +106,14 @@ class Manager(object):
logging.error("server not exist at %s:%d" % (config['server'],
port))
def stat_port(self, config):
port = int(config['server_port'])
servers = self._relays.get(port, None)
if servers:
self._send_control_data(b'{"stat":"ok", "password":"%s"}' % servers[0]._config['password'])
else:
self._send_control_data(b'{"stat":"ko"}')
def handle_event(self, sock, fd, event):
if sock == self._control_socket and event == eventloop.POLL_IN:
data, self._control_client_addr = sock.recvfrom(BUF_SIZE)
@ -113,22 +121,27 @@ class Manager(object):
if parsed:
command, config = parsed
a_config = self._config.copy()
if config:
# let the command override the configuration file
a_config.update(config)
if 'server_port' not in a_config:
logging.error('can not find server_port in config')
if command == 'transfer':
self.handle_periodic()
else:
if command == 'add':
self.add_port(a_config)
self._send_control_data(b'ok')
elif command == 'remove':
self.remove_port(a_config)
self._send_control_data(b'ok')
elif command == 'ping':
self._send_control_data(b'pong')
if config:
# let the command override the configuration file
a_config.update(config)
if 'server_port' not in a_config:
logging.error('can not find server_port in config')
else:
logging.error('unknown command %s', command)
if command == 'add':
self.add_port(a_config)
self._send_control_data(b'ok')
elif command == 'remove':
self.remove_port(a_config)
self._send_control_data(b'ok')
elif command == 'stat':
self.stat_port(a_config)
elif command == 'ping':
self._send_control_data(b'pong')
else:
logging.error('unknown command %s', command)
def _parse_command(self, data):
# commands:
@ -158,7 +171,7 @@ class Manager(object):
# use compact JSON format (without space)
data = common.to_bytes(json.dumps(data_dict,
separators=(',', ':')))
self._send_control_data(b'stat: ' + data)
self._send_control_data(data)
for k, v in self._statistics.items():
r[k] = v
@ -170,6 +183,7 @@ class Manager(object):
i = 0
if len(r) > 0:
send_data(r)
self._send_control_data('e')
self._statistics.clear()
def _send_control_data(self, data):

37
shadowsocks/servers.py Executable file
View file

@ -0,0 +1,37 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import thread
import time
import manager
import config
from dbtransfer import DbTransfer
if config.LOG_ENABLE:
logging.basicConfig(
filename=config.LOG_FILE,
level=config.LOG_LEVEL,
datefmt='%Y-%m-%d %H:%M:%S',
format='%(asctime)s %(levelname)s %(filename)s[%(lineno)d] %(message)s'
)
def main():
configer = {
'server': '%s' % config.SS_BIND_IP,
'local_port': 1081,
'port_password': {
},
'method': '%s' % config.SS_METHOD,
'manager_address': '%s:%s' % (config.MANAGE_BIND_IP, config.MANAGE_PORT),
'timeout': 60, # some protocol keepalive packet 3 min Eg bt
'fast_open': False,
'verbose': 1
}
start_shadowsock = thread.start_new_thread(manager.run, (configer,))
time.sleep(1)
sync_users = thread.start_new_thread(DbTransfer.thread_db, ())
time.sleep(1)
sysc_transfer = thread.start_new_thread(DbTransfer.thread_push, ())
while True:
time.sleep(3600)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,37 @@
SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for `bandwidth_log`
-- ----------------------------
DROP TABLE IF EXISTS `bandwidth_log`;
CREATE TABLE `bandwidth_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`node` varchar(255) NOT NULL,
`port` int(11) NOT NULL,
`data` bigint(20) NOT NULL,
`time` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for `user`
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`u_email` varchar(32) NOT NULL COMMENT '邮件',
`u_pwd` varchar(32) NOT NULL COMMENT '密码',
`u_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '用户状态',
`ss_port` int(11) NOT NULL COMMENT 'ss端口',
`ss_pwd` varchar(32) NOT NULL COMMENT 'ss密码',
`b_usage` bigint(20) NOT NULL COMMENT '使用流量',
`b_max` bigint(20) NOT NULL COMMENT '可使用最大流量',
`update_time` int(11) NOT NULL DEFAULT '0' COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `pk_ss_port` (`ss_port`) USING HASH,
UNIQUE KEY `pk_u_email` (`u_email`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=422 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;