From 956199efcdbc4a808b7fe0f0d69d12bb4b2835a2 Mon Sep 17 00:00:00 2001 From: clowwindy Date: Wed, 5 Aug 2015 18:12:38 +0800 Subject: [PATCH] add control manager --- shadowsocks/manager.py | 152 ++++++++++++++++++++++++++++++++++++++++ shadowsocks/server.py | 12 +++- shadowsocks/shell.py | 5 +- shadowsocks/tcprelay.py | 2 + shadowsocks/udprelay.py | 2 + 5 files changed, 170 insertions(+), 3 deletions(-) create mode 100644 shadowsocks/manager.py diff --git a/shadowsocks/manager.py b/shadowsocks/manager.py new file mode 100644 index 0000000..199a96d --- /dev/null +++ b/shadowsocks/manager.py @@ -0,0 +1,152 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Copyright 2015 clowwindy +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function, \ + with_statement + +import errno +import traceback +import socket +import logging +import json +import collections + +from shadowsocks import common, eventloop, tcprelay, udprelay, asyncdns, shell + + +BUF_SIZE = 2048 + + +class Manager(object): + + def __init__(self, config): + self._config = config + self._relays = {} # (tcprelay, udprelay) + self._loop = eventloop.EventLoop() + self._dns_resolver = asyncdns.DNSResolver() + self._dns_resolver.add_to_loop(self._loop) + self._control_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, + socket.IPPROTO_UDP) + self._statistics = collections.defaultdict(int) + self._control_client_addr = None + try: + self._control_socket.bind(('127.0.0.1', + int(config['manager_port']))) + self._control_socket.setblocking(False) + except (OSError, IOError) as e: + logging.error(e) + logging.error('can not bind to manager port') + exit(1) + self._loop.add(self._control_socket, + eventloop.POLL_IN, self) + + port_password = config['port_password'] + del config['port_password'] + for port, password in port_password.items(): + a_config = config.copy() + a_config['server_port'] = int(port) + a_config['password'] = password + self.add_port(a_config) + + def add_port(self, config): + port = int(config['server_port']) + servers = self._relays.get(port, None) + if servers: + logging.error("server already exists at %s:%d" % (config['server'], + port)) + return + logging.info("adding server at %s:%d" % (config['server'], port)) + t = tcprelay.TCPRelay(config, self._dns_resolver, False) + u = udprelay.UDPRelay(config, self._dns_resolver, False) + t.add_to_loop(self._loop) + u.add_to_loop(self._loop) + self._relays[port] = (t, u) + + def remove_port(self, config): + port = int(config['server_port']) + servers = self._relays.get(port, None) + if servers: + logging.info("removing server at %s:%d" % (config['server'], port)) + t, u = servers + t.close(next_tick=False) + u.close(next_tick=False) + del self._relays[port] + else: + logging.error("server not exist at %s:%d" % (config['server'], + port)) + + def handle_event(self, sock, fd, event): + if sock == self._control_socket and event == eventloop.POLL_IN: + data, self._control_client_addr = sock.recvfrom(BUF_SIZE) + parsed = self._parse_command(data) + if parsed: + command, config = parsed + a_config = self._config.copy() + if config: + a_config.update(config) + if 'server_port' not in a_config: + logging.error('can not find server_port in config') + else: + if command == 'add': + self.add_port(a_config) + elif command == 'remove': + self.remove_port(a_config) + elif command == 'ping': + self._send_control_data(b'pong') + else: + logging.error('unknown command %s', command) + + def _parse_command(self, data): + # commands: + # add: {"server_port": 8000, "password": "foobar"} + # remove: {"server_port": 8000"} + data = common.to_str(data) + parts = data.split(':', 1) + if len(parts) < 2: + return data, None + command, config_json = parts + try: + config = json.loads(config_json) + return command, config + except Exception as e: + logging.error(e) + return None + + def handle_periodic(self): + # TODO send statistics + pass + + def _send_control_data(self, data): + if self._control_client_addr: + try: + self._control_socket.sendto(data, self._control_client_addr) + except (socket.error, OSError, IOError) as e: + error_no = eventloop.errno_from_exception(e) + if error_no in (errno.EAGAIN, errno.EINPROGRESS, + errno.EWOULDBLOCK): + return + else: + shell.print_exception(e) + if self._config['verbose']: + traceback.print_exc() + + def run(self): + self._loop.run() + + +def run(config): + Manager(config).run() \ No newline at end of file diff --git a/shadowsocks/server.py b/shadowsocks/server.py index 429a20a..8ff13d6 100755 --- a/shadowsocks/server.py +++ b/shadowsocks/server.py @@ -24,7 +24,8 @@ import logging import signal sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../')) -from shadowsocks import shell, daemon, eventloop, tcprelay, udprelay, asyncdns +from shadowsocks import shell, daemon, eventloop, tcprelay, udprelay, \ + asyncdns, manager def main(): @@ -48,10 +49,17 @@ def main(): else: config['port_password'][str(server_port)] = config['password'] + if config['manager_port']: + logging.info('entering manager mode') + manager.run(config) + return + tcp_servers = [] udp_servers = [] dns_resolver = asyncdns.DNSResolver() - for port, password in config['port_password'].items(): + port_password = config['port_password'] + del config['port_password'] + for port, password in port_password.items(): a_config = config.copy() a_config['server_port'] = int(port) a_config['password'] = password diff --git a/shadowsocks/shell.py b/shadowsocks/shell.py index f8ae81f..586c2f6 100644 --- a/shadowsocks/shell.py +++ b/shadowsocks/shell.py @@ -136,7 +136,7 @@ def get_config(is_local): else: shortopts = 'hd:s:p:k:m:c:t:vq' longopts = ['help', 'fast-open', 'pid-file=', 'log-file=', 'workers=', - 'forbidden-ip=', 'user=', 'version'] + 'forbidden-ip=', 'user=', 'manager-port=', 'version'] try: config_path = find_config() optlist, args = getopt.getopt(sys.argv[1:], shortopts, longopts) @@ -181,6 +181,8 @@ def get_config(is_local): config['fast_open'] = True elif key == '--workers': config['workers'] = int(value) + elif key == '--manager-port': + config['manager_port'] = int(value) elif key == '--user': config['user'] = to_str(value) elif key == '--forbidden-ip': @@ -317,6 +319,7 @@ Proxy options: --fast-open use TCP_FASTOPEN, requires Linux 3.7+ --workers WORKERS number of workers, available on Unix/Linux --forbidden-ip IPLIST comma seperated IP list forbidden to connect + --manager-port PORT optional server manager UDP port General options: -h, --help show this help message and exit diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index e61b8ac..e14dcaa 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -708,3 +708,5 @@ class TCPRelay(object): self._eventloop.remove_periodic(self.handle_periodic) self._eventloop.remove(self._server_socket) self._server_socket.close() + for handler in list(self._fd_to_handlers.values()): + handler.destroy() diff --git a/shadowsocks/udprelay.py b/shadowsocks/udprelay.py index e3fe77c..a90df9f 100644 --- a/shadowsocks/udprelay.py +++ b/shadowsocks/udprelay.py @@ -290,3 +290,5 @@ class UDPRelay(object): self._eventloop.remove_periodic(self.handle_periodic) self._eventloop.remove(self._server_socket) self._server_socket.close() + for client in list(self._cache.values()): + client.close()