diff --git a/README.md b/README.md index 9f78680..473e84f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,58 @@ +shadowsocks manyuser branch +=========== +Install +------- +install MySQL 5.x.x + +`pip install cymysql` + +create a database named `shadowsocks` + +import `shadowsocks.sql` + +edit Config.py +Example: + + #Config + MYSQL_HOST = 'mdss.mengsky.net' + MYSQL_PORT = 3306 + MYSQL_USER = 'ss' + MYSQL_PASS = 'ss' + MYSQL_DB = 'shadowsocks' + + MANAGE_PASS = 'ss233333333' + #if you want manage in other server you should set this value to global ip + MANAGE_BIND_IP = '127.0.0.1' + #make sure this port is idle + MANAGE_PORT = 23333 + +TestRun `cd shadowsocks` ` python server.py` + +if no exception server will startup. you will see such like +Example: + + db start server at port [%s] pass [%s] + +User table colum +------------------ +`passwd` server pass +`port` server port +`t` last keepalive time +`u` upload transfer +`d` download transer +`transfer_enable` if u + d > transfer_enable this server will be stop (db_transfer.py del_server_out_of_bound_safe) + +Manage socket +------------------ +Manage server work in UDP at `MANAGE_BIND_IP` `MANAGE_PORT` + +use `MANAGE_PASS:port:passwd:0` to del a server at port `port` + +use `MANAGE_PASS:port:passwd:1` to run a server at port `port` password is `passwd` +eg: + + udpCliSock.sendto('MANAGE_PASS:65535:123456:1', (MANAGE_BIND_IP, MANAGE_PORT)) + shadowsocks =========== diff --git a/shadowsocks/Config.py b/shadowsocks/Config.py new file mode 100644 index 0000000..cb3ad45 --- /dev/null +++ b/shadowsocks/Config.py @@ -0,0 +1,12 @@ +#Config +MYSQL_HOST = 'mdss.mengsky.net' +MYSQL_PORT = 3306 +MYSQL_USER = 'ss' +MYSQL_PASS = 'ss' +MYSQL_DB = 'shadowsocks' + +MANAGE_PASS = 'ss233333333' +#if you want manage in other server you should set this value to global ip +MANAGE_BIND_IP = '127.0.0.1' +#make sure this port is idle +MANAGE_PORT = 23333 \ No newline at end of file diff --git a/shadowsocks/asyncmgr.py b/shadowsocks/asyncmgr.py new file mode 100644 index 0000000..cf23e7b --- /dev/null +++ b/shadowsocks/asyncmgr.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2014 clowwindy +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import time +import os +import socket +import struct +import re +import logging +import common +import lru_cache +import eventloop +import server_pool +import Config + +class ServerMgr(object): + + def __init__(self): + self._loop = None + self._request_id = 1 + self._hosts = {} + self._hostname_status = {} + self._hostname_to_cb = {} + self._cb_to_hostname = {} + self._last_time = time.time() + self._sock = None + self._servers = None + + def add_to_loop(self, loop): + if self._loop: + raise Exception('already add to loop') + self._loop = loop + # TODO when dns server is IPv6 + self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, + socket.SOL_UDP) + self._sock.bind((Config.MANAGE_BIND_IP, Config.MANAGE_PORT)) + self._sock.setblocking(False) + loop.add(self._sock, eventloop.POLL_IN) + loop.add_handler(self.handle_events) + + def _handle_data(self, sock): + data, addr = sock.recvfrom(128) + #manage pwd:port:passwd:action + args = data.split(':') + if len(args) < 4: + return + if args[0] == Config.MANAGE_PASS: + if args[3] == '0': + server_pool.ServerPool.get_instance().cb_del_server(args[1]) + elif args[3] == '1': + server_pool.ServerPool.get_instance().new_server(args[1], args[2]) + + def handle_events(self, events): + for sock, fd, event in events: + if sock != self._sock: + continue + if event & eventloop.POLL_ERR: + logging.error('mgr socket err') + self._loop.remove(self._sock) + self._sock.close() + # TODO when dns server is IPv6 + self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, + socket.SOL_UDP) + self._sock.setblocking(False) + self._loop.add(self._sock, eventloop.POLL_IN) + else: + self._handle_data(sock) + break + + def close(self): + if self._sock: + self._sock.close() + self._sock = None + + +def test(): + pass + +if __name__ == '__main__': + test() diff --git a/shadowsocks/config.json b/shadowsocks/config.json new file mode 100644 index 0000000..1607364 --- /dev/null +++ b/shadowsocks/config.json @@ -0,0 +1,11 @@ +{ + "server":"127.0.0.1", + "server_port":8388, + "local_address": "127.0.0.1", + "local_port":1080, + "password":"m", + "timeout":300, + "method":"aes-256-cfb", + "fast_open": false, + "workers": 1 +} \ No newline at end of file diff --git a/shadowsocks/db_transfer.py b/shadowsocks/db_transfer.py new file mode 100644 index 0000000..4ce72fe --- /dev/null +++ b/shadowsocks/db_transfer.py @@ -0,0 +1,119 @@ +#!/usr/bin/python +# -*- coding: UTF-8 -*- + +import logging +import cymysql +import time +import sys +from server_pool import ServerPool +import Config + +class DbTransfer(object): + + instance = None + + def __init__(self): + self.last_get_transfer = {} + + @staticmethod + def get_instance(): + if DbTransfer.instance is None: + DbTransfer.instance = DbTransfer() + return DbTransfer.instance + + def push_db_all_user(self): + #更新用户流量到数据库 + last_transfer = self.last_get_transfer + curr_transfer = ServerPool.get_instance().get_servers_transfer() + #上次和本次的增量 + dt_transfer = {} + for id in curr_transfer.keys(): + if id in last_transfer: + if last_transfer[id][0] == curr_transfer[id][0] and \ + last_transfer[id][1] == curr_transfer[id][1]: + continue + elif curr_transfer[id][0] == 0 and curr_transfer[id][1] == 0: + continue + elif last_transfer[id][0] <= curr_transfer[id][0] and \ + last_transfer[id][1] <= curr_transfer[id][1]: + dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0] ,curr_transfer[id][1] - last_transfer[id][1]] + else: + dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]] + else: + if curr_transfer[id][0] == 0 and curr_transfer[id][1] == 0: + continue + dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]] + + self.last_get_transfer = curr_transfer + query_head = 'UPDATE user' + query_sub_when = '' + query_sub_when2 = '' + query_sub_in = None + last_time = time.time() + for id in dt_transfer.keys(): + query_sub_when += ' WHEN %s THEN u+%s' % (id, dt_transfer[id][0]) + query_sub_when2 += ' WHEN %s THEN d+%s' % (id, dt_transfer[id][1]) + if query_sub_in is not None: + query_sub_in += ',%s' % id + else: + query_sub_in = '%s' % id + if query_sub_when == '': + return + query_sql = query_head + ' SET u = CASE port' + query_sub_when + \ + ' END, d = CASE port' + query_sub_when2 + \ + ' END, t = ' + str(int(last_time)) + \ + ' WHERE port IN (%s)' % query_sub_in + #print query_sql + conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER, passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8') + cur = conn.cursor() + cur.execute(query_sql) + cur.close() + conn.commit() + conn.close() + + @staticmethod + def pull_db_all_user(): + #数据库所有用户信息 + conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER, passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8') + cur = conn.cursor() + cur.execute("SELECT port, u, d, transfer_enable, passwd, switch, enable FROM user") + rows = [] + for r in cur.fetchall(): + rows.append(list(r)) + cur.close() + conn.close() + return rows + + @staticmethod + def del_server_out_of_bound_safe(rows): + #停止超流量的服务 + for row in rows: + if ServerPool.get_instance().server_is_run(row[0]) is True: + if row[1] + row[2] >= row[3]: + logging.info('db stop server at port [%s]' % (row[0])) + ServerPool.get_instance().del_server(row[0]) + else: + if row[5] == 1 and row[6] == 1 and row[1] + row[2] < row[3]: + logging.info('db start server at port [%s] pass [%s]' % (row[0], row[4])) + ServerPool.get_instance().new_server(row[0], row[4]) + + @staticmethod + def thread_db(): + import socket + import time + timeout = 60 + socket.setdefaulttimeout(timeout) + while True: + #logging.warn('db loop') + try: + DbTransfer.get_instance().push_db_all_user() + rows = DbTransfer.get_instance().pull_db_all_user() + DbTransfer.del_server_out_of_bound_safe(rows) + except Exception as e: + logging.warn('db thread except:%s' % e) + finally: + time.sleep(15) + + +#SQLData.pull_db_all_user() +#print DbTransfer.get_instance().test() diff --git a/shadowsocks/eventloop.py b/shadowsocks/eventloop.py index 0b628e0..52d890b 100644 --- a/shadowsocks/eventloop.py +++ b/shadowsocks/eventloop.py @@ -192,6 +192,9 @@ class EventLoop(object): def add_handler(self, handler): self._handlers.append(handler) + def remove_handler(self, handler): + self._handlers.remove(handler) + def run(self): while not self.stopping: try: diff --git a/shadowsocks/server.py b/shadowsocks/server.py index d5648bc..2ca3186 100755 --- a/shadowsocks/server.py +++ b/shadowsocks/server.py @@ -1,123 +1,15 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -# Copyright (c) 2014 clowwindy -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - +import time import sys -import os -import logging -import utils -import encrypt -import eventloop -import tcprelay -import udprelay -import asyncdns - - -def main(): - utils.check_python() - - config = utils.get_config(False) - - utils.print_shadowsocks() - - if config['port_password']: - if config['server_port'] or config['password']: - logging.warn('warning: port_password should not be used with ' - 'server_port and password. server_port and password ' - 'will be ignored') - else: - config['port_password'] = {} - server_port = config['server_port'] - if type(server_port) == list: - for a_server_port in server_port: - config['port_password'][a_server_port] = config['password'] - else: - config['port_password'][str(server_port)] = config['password'] - - encrypt.init_table(config['password'], config['method']) - tcp_servers = [] - udp_servers = [] - dns_resolver = asyncdns.DNSResolver() - for port, password in config['port_password'].items(): - a_config = config.copy() - a_config['server_port'] = int(port) - a_config['password'] = password - logging.info("starting server at %s:%d" % - (a_config['server'], int(port))) - tcp_servers.append(tcprelay.TCPRelay(a_config, dns_resolver, False)) - udp_servers.append(udprelay.UDPRelay(a_config, dns_resolver, False)) - - def run_server(): - try: - loop = eventloop.EventLoop() - dns_resolver.add_to_loop(loop) - for tcp_server in tcp_servers: - tcp_server.add_to_loop(loop) - for udp_server in udp_servers: - udp_server.add_to_loop(loop) - loop.run() - except (KeyboardInterrupt, IOError, OSError) as e: - logging.error(e) - import traceback - traceback.print_exc() - os._exit(0) - - if int(config['workers']) > 1: - if os.name == 'posix': - children = [] - is_child = False - for i in xrange(0, int(config['workers'])): - r = os.fork() - if r == 0: - logging.info('worker started') - is_child = True - run_server() - break - else: - children.append(r) - if not is_child: - def handler(signum, _): - for pid in children: - os.kill(pid, signum) - os.waitpid(pid, 0) - sys.exit() - import signal - signal.signal(signal.SIGTERM, handler) - - # master - for a_tcp_server in tcp_servers: - a_tcp_server.close() - for a_udp_server in udp_servers: - a_udp_server.close() - dns_resolver.close() - - for child in children: - os.waitpid(child, 0) - else: - logging.warn('worker is only available on Unix/Linux') - run_server() - else: - run_server() +import thread +import server_pool +import db_transfer +#def test(): +# thread.start_new_thread(DbTransfer.thread_db, ()) +# Api.web_server() if __name__ == '__main__': - main() + #server_pool.ServerPool.get_instance() + thread.start_new_thread(db_transfer.DbTransfer.thread_db, ()) + while True: + time.sleep(99999) diff --git a/shadowsocks/server_pool.py b/shadowsocks/server_pool.py new file mode 100644 index 0000000..61f5ca0 --- /dev/null +++ b/shadowsocks/server_pool.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2014 clowwindy +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import os +import logging +import utils +import time +import eventloop +import tcprelay +import udprelay +import asyncdns +import thread +import threading +import sys +import asyncmgr + + +class ServerPool(object): + + instance = None + + def __init__(self): + utils.check_python() + self.config = utils.get_config(False) + utils.print_shadowsocks() + self.dns_resolver = asyncdns.DNSResolver() + + self.tcp_servers_pool = {} + #self.udp_servers_pool = {} + + self.loop = eventloop.EventLoop() + thread.start_new_thread(ServerPool.run_server, (self.loop, self.dns_resolver)) + + @staticmethod + def get_instance(): + if ServerPool.instance is None: + ServerPool.instance = ServerPool() + return ServerPool.instance + + @staticmethod + def run_server(loop, dns_resolver): + try: + mgr = asyncmgr.ServerMgr() + mgr.add_to_loop(loop) + dns_resolver.add_to_loop(loop) + loop.run() + except (KeyboardInterrupt, IOError, OSError) as e: + logging.error(e) + import traceback + traceback.print_exc() + os.exit(0) + + def server_is_run(self, port): + port = int(port) + if port in self.tcp_servers_pool: + return True + return False + + def new_server(self, port, password): + ret = True + port = int(port) + if self.server_is_run(port): + logging.info("server already at %s:%d" %(self.config['server'], port)) + return 'this port server is already running' + + a_config = self.config.copy() + a_config['server_port'] = port + a_config['password'] = password + logging.info("starting server at %s:%d" %(a_config['server'], port)) + try: + tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False) + #udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False) + except Exception, e: + logging.warn(e) + return e + try: + #add is safe + tcp_server.add_to_loop(self.loop) + self.tcp_servers_pool.update({port: tcp_server}) + #self.udp_servers_pool.update({port: tcp_server}) + except Exception, e: + logging.warn(e) + ret = e + return ret + + def cb_del_server(self, port): + port = int(port) + ret = True + if port not in self.tcp_servers_pool: + logging.info("stopped server at %s:%d already stop" % (self.config['server'], int(port))) + return True + logging.info("stopped server at %s:%d" % (self.config['server'], int(port))) + try: + self.tcp_servers_pool[int(port)].destroy() + del self.tcp_servers_pool[int(port)] + #del self.udp_servers_pool[int(port)] + except Exception, e: + ret = e + logging.warn(e) + import traceback + traceback.print_exc() + return ret + + def get_server_transfer(self, port): + port = int(port) + if port in self.tcp_servers_pool: + return [self.tcp_servers_pool[port].server_transfer_ul, self.tcp_servers_pool[port].server_transfer_dl] + return [0,0] + + def get_servers_transfer(self): + ret = {} + for server_port in self.tcp_servers_pool.keys(): + ret[server_port] = [self.tcp_servers_pool[server_port].server_transfer_ul + ,self.tcp_servers_pool[server_port].server_transfer_dl] + return ret + diff --git a/shadowsocks/shadowsocks.sql b/shadowsocks/shadowsocks.sql new file mode 100644 index 0000000..b828eaa --- /dev/null +++ b/shadowsocks/shadowsocks.sql @@ -0,0 +1,24 @@ +SET FOREIGN_KEY_CHECKS=0; + +CREATE TABLE `user` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `email` varchar(32) NOT NULL, + `pass` varchar(16) NOT NULL, + `passwd` varchar(16) NOT NULL, + `t` int(11) NOT NULL DEFAULT '0', + `u` int(20) NOT NULL, + `d` int(20) NOT NULL, + `transfer_enable` bigint(20) NOT NULL, + `port` int(11) NOT NULL, + `switch` tinyint(4) NOT NULL DEFAULT '1', + `enable` tinyint(4) NOT NULL DEFAULT '1', + `type` tinyint(4) NOT NULL DEFAULT '1', + `last_get_gitf_time` int(11) NOT NULL DEFAULT '0', + `last_rest_pass_time` int(11) NOT NULL DEFAULT '0', + PRIMARY KEY (`id`,`port`) +) ENGINE=InnoDB AUTO_INCREMENT=415 DEFAULT CHARSET=utf8; + +-- ---------------------------- +-- Records of user +-- ---------------------------- +INSERT INTO `user` VALUES ('7', 'test@test.com', '123456', '0000000', '1410609560', '0', '0', '9320666234', '50000', '1', '1', '7', '0', '0'); \ No newline at end of file diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index 36b1c56..b2bc44d 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -539,6 +539,21 @@ class TCPRelay(object): self._eventloop.add(self._server_socket, eventloop.POLL_IN | eventloop.POLL_ERR) + def remove_to_loop(self): + self._eventloop.remove(self._server_socket) + self._eventloop.remove_handler(self._handle_events) + + def destroy(self): + #destroy all conn and server conn at this tcprelay + self.remove_to_loop() + for fd in self._fd_to_handlers.keys(): + try: + self._fd_to_handlers[fd].destroy() + except Exception, e: + #already destroy + pass + self.close() + def remove_handler(self, handler): index = self._handler_to_timeouts.get(hash(handler), -1) if index >= 0: