diff --git a/shadowsocks/lru_cache.py b/shadowsocks/lru_cache.py new file mode 100644 index 0000000..399c810 --- /dev/null +++ b/shadowsocks/lru_cache.py @@ -0,0 +1,62 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +import collections +import logging +import heapq +import time + + +class LRUCache(collections.MutableMapping): + """This class is not thread safe""" + + def __init__(self, timeout=60, *args, **kwargs): + self.timeout = timeout + self.store = {} + self.time_to_keys = collections.defaultdict(list) + self.last_visits = [] + self.update(dict(*args, **kwargs)) # use the free update to set keys + + def __getitem__(self, key): + "O(logm)" + t = time.time() + self.time_to_keys[t].append(key) + heapq.heappush(self.last_visits, t) + return self.store[key] + + def __setitem__(self, key, value): + "O(logm)" + t = time.time() + self.store[key] = value + self.time_to_keys[t].append(key) + heapq.heappush(self.last_visits, t) + + def __delitem__(self, key): + "O(1)" + del self.store[key] + + def __iter__(self): + return iter(self.store) + + def __len__(self): + return len(self.store) + + def sweep(self): + "O(m)" + now = time.time() + c = 0 + while len(self.last_visits) > 0: + least = self.last_visits[0] + if now - least <= self.timeout: + break + for key in self.time_to_keys[least]: + heapq.heappop(self.last_visits) + if self.store.__contains__(key): + value = self.store[key] + if hasattr(value, 'close'): + value.close() + del self.store[key] + c += 1 + del self.time_to_keys[least] + if c: + logging.debug('%d keys swept' % c) diff --git a/shadowsocks/udprelay.py b/shadowsocks/udprelay.py index 36cd411..eb5c677 100644 --- a/shadowsocks/udprelay.py +++ b/shadowsocks/udprelay.py @@ -66,12 +66,15 @@ # `server` means the UDP server that handles user requests +import time import threading import socket import logging import struct import encrypt import eventloop +import lru_cache + BUF_SIZE = 65536 @@ -131,8 +134,8 @@ class UDPRelay(object): self._timeout = timeout self._is_local = is_local self._eventloop = eventloop.EventLoop() - self._cache = {} # TODO replace ith an LRU cache - self._client_fd_to_server_addr = {} # TODO replace ith an LRU cache + self._cache = lru_cache.LRUCache(timeout=timeout) + self._client_fd_to_server_addr = lru_cache.LRUCache(timeout=timeout) def _handle_server(self): server = self._server_socket @@ -223,13 +226,20 @@ class UDPRelay(object): def _run(self): server_socket = self._server_socket self._eventloop.add(server_socket, eventloop.MODE_IN) + last_time = time.time() while True: - events = self._eventloop.poll() + events = self._eventloop.poll(10) for sock, event in events: if sock == self._server_socket: self._handle_server() else: self._handle_client(sock) + now = time.time() + if now - last_time > 3.5: + self._cache.sweep() + if now - last_time > 7: + self._client_fd_to_server_addr.sweep() + last_time = now def start(self): addrs = socket.getaddrinfo(self._listen_addr, self._listen_port, 0,