test pass ota in udp

This commit is contained in:
mengskysama 2016-01-12 12:10:40 +08:00
parent 248582c932
commit 269e3dd82d
4 changed files with 60 additions and 23 deletions

View file

@ -133,6 +133,42 @@ class Encryptor(object):
return self.decipher.update(buf)
def gen_key_iv(password, method):
method = method.lower()
(key_len, iv_len, m) = method_supported[method]
key = None
if key_len > 0:
key, _ = EVP_BytesToKey(password, key_len, iv_len)
else:
key = password
iv = random_string(iv_len)
return key, iv, m
def encrypt_all_m(key, iv, m, method, data):
result = []
result.append(iv)
cipher = m(method, key, iv, 1)
result.append(cipher.update(data))
return b''.join(result)
def dencrypt_all(password, method, data):
result = []
method = method.lower()
(key_len, iv_len, m) = method_supported[method]
key = None
if key_len > 0:
key, _ = EVP_BytesToKey(password, key_len, iv_len)
else:
key = password
iv = data[:iv_len]
data = data[iv_len:]
cipher = m(method, key, iv, 0)
result.append(cipher.update(data))
return b''.join(result), key, iv
def encrypt_all(password, method, op, data):
result = []
method = method.lower()
@ -185,6 +221,18 @@ def test_encrypt_all():
assert plain == plain2
def test_encrypt_all_m():
from os import urandom
plain = urandom(10240)
for method in CIPHERS_TO_TEST:
logging.warn(method)
key, iv, m = gen_key_iv(b'key', method)
cipher = encrypt_all_m(key, iv, m, method, plain)
plain2, key, iv = dencrypt_all(b'key', method, cipher)
assert plain == plain2
if __name__ == '__main__':
test_encrypt_all()
test_encryptor()
test_encrypt_all_m()

View file

@ -1,11 +0,0 @@
{
"server":"127.0.0.1",
"server_port":8899,
"local_port":1081,
"password":"aes_password",
"timeout":60,
"method":"rc4-md5",
"local_address":"127.0.0.1",
"fast_open":false,
"one_time_auth":true
}

View file

@ -150,6 +150,8 @@ class UDPRelay(object):
def _handle_server(self):
server = self._server_socket
data, r_addr = server.recvfrom(BUF_SIZE)
key = None
iv = None
if not data:
logging.debug('UDP handle_server: data is empty')
if self._stat_callback:
@ -162,7 +164,7 @@ class UDPRelay(object):
else:
data = data[3:]
else:
data = encrypt.encrypt_all(self._password, self._method, 0, data)
data, key, iv = encrypt.dencrypt_all(self._password, self._method, data)
# decrypt data
if not data:
logging.debug(
@ -184,13 +186,11 @@ class UDPRelay(object):
logging.warn('UDP one time auth header is too short')
return
_hash = data[-ONETIMEAUTH_BYTES:]
_data = data[header_length: -ONETIMEAUTH_BYTES]
_key = self._encryptor.decipher_iv + self._encryptor.key
if onetimeauth_verify(_hash, _data, _key) is False:
data = data[: -ONETIMEAUTH_BYTES]
_key = iv + key
if onetimeauth_verify(_hash, data, _key) is False:
logging.warn('UDP one time auth fail')
return
self._one_time_authed = True
header_length += ONETIMEAUTH_BYTES
addrs = self._dns_cache.get(server_addr, None)
if addrs is None:
addrs = socket.getaddrinfo(server_addr, server_port, 0,
@ -221,10 +221,11 @@ class UDPRelay(object):
self._eventloop.add(client, eventloop.POLL_IN, self)
if self._is_local:
key, iv, m = gen_key_iv(self._password, self._method)
# spec https://shadowsocks.org/en/spec/one-time-auth.html
if self._one_time_auth_enable:
data = self._one_time_auth_chunk_data_gen(data)
data = encrypt.encrypt_all(self._password, self._method, 1, data)
data = self._ota_chunk_data_gen(key, iv, data)
data = encrypt.encrypt_all_m(key, iv, m, method, data)
if not data:
return
else:
@ -275,9 +276,9 @@ class UDPRelay(object):
# simply drop that packet
pass
def _one_time_auth_chunk_data_gen(self, data):
def _ota_chunk_data_gen(self, key, iv, data):
data = chr(ord(data[0]) | ADDRTYPE_AUTH) + data[1:]
key = self._encryptor.cipher_iv + self._encryptor.key
key = iv + key
return data + onetimeauth_gen(data, key)
def add_to_loop(self, loop):

View file

@ -7,6 +7,5 @@
"method":"rc4-md5",
"local_address":"127.0.0.1",
"fast_open":false,
"one_time_auth":true,
"verbose":1
"one_time_auth":true
}