fix push_db_all_user
This commit is contained in:
parent
933e75b9a7
commit
52041387f9
1 changed files with 24 additions and 16 deletions
|
@ -41,16 +41,14 @@ class TransferBase(object):
|
||||||
dt_transfer[id] = [self.last_get_transfer[id][0] - last_transfer[id][0], self.last_get_transfer[id][1] - last_transfer[id][1]]
|
dt_transfer[id] = [self.last_get_transfer[id][0] - last_transfer[id][0], self.last_get_transfer[id][1] - last_transfer[id][1]]
|
||||||
|
|
||||||
for id in curr_transfer.keys():
|
for id in curr_transfer.keys():
|
||||||
|
if id in self.force_update_transfer:
|
||||||
|
continue
|
||||||
#算出与上次记录的流量差值,保存于dt_transfer表
|
#算出与上次记录的流量差值,保存于dt_transfer表
|
||||||
if id in last_transfer:
|
if id in last_transfer:
|
||||||
if curr_transfer[id][0] + curr_transfer[id][1] - last_transfer[id][0] - last_transfer[id][1] <= 0:
|
if curr_transfer[id][0] + curr_transfer[id][1] - last_transfer[id][0] - last_transfer[id][1] <= 0:
|
||||||
continue
|
continue
|
||||||
if last_transfer[id][0] <= curr_transfer[id][0] and \
|
dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0],
|
||||||
last_transfer[id][1] <= curr_transfer[id][1]:
|
curr_transfer[id][1] - last_transfer[id][1]]
|
||||||
dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0],
|
|
||||||
curr_transfer[id][1] - last_transfer[id][1]]
|
|
||||||
else:
|
|
||||||
dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]]
|
|
||||||
else:
|
else:
|
||||||
if curr_transfer[id][0] + curr_transfer[id][1] <= 0:
|
if curr_transfer[id][0] + curr_transfer[id][1] <= 0:
|
||||||
continue
|
continue
|
||||||
|
@ -67,14 +65,16 @@ class TransferBase(object):
|
||||||
|
|
||||||
update_transfer = self.update_all_user(dt_transfer) #返回有更新的表
|
update_transfer = self.update_all_user(dt_transfer) #返回有更新的表
|
||||||
for id in update_transfer.keys(): #其增量加在此表
|
for id in update_transfer.keys(): #其增量加在此表
|
||||||
if id in self.force_update_transfer: #但排除在force_update_transfer内的
|
if id not in self.force_update_transfer: #但排除在force_update_transfer内的
|
||||||
if id in self.last_update_transfer:
|
|
||||||
del self.last_update_transfer[id]
|
|
||||||
self.force_update_transfer.remove(id)
|
|
||||||
else:
|
|
||||||
last = self.last_update_transfer.get(id, [0,0])
|
last = self.last_update_transfer.get(id, [0,0])
|
||||||
self.last_update_transfer[id] = [last[0] + update_transfer[id][0], last[1] + update_transfer[id][1]]
|
self.last_update_transfer[id] = [last[0] + update_transfer[id][0], last[1] + update_transfer[id][1]]
|
||||||
self.last_get_transfer = curr_transfer
|
self.last_get_transfer = curr_transfer
|
||||||
|
for id in self.force_update_transfer:
|
||||||
|
if id in self.last_update_transfer:
|
||||||
|
del self.last_update_transfer[id]
|
||||||
|
if id in self.last_get_transfer:
|
||||||
|
del self.last_get_transfer[id]
|
||||||
|
self.force_update_transfer = set()
|
||||||
|
|
||||||
def del_server_out_of_bound_safe(self, last_rows, rows):
|
def del_server_out_of_bound_safe(self, last_rows, rows):
|
||||||
#停止超流量的服务
|
#停止超流量的服务
|
||||||
|
@ -117,6 +117,7 @@ class TransferBase(object):
|
||||||
if not allow:
|
if not allow:
|
||||||
logging.info('db stop server at port [%s]' % (port,))
|
logging.info('db stop server at port [%s]' % (port,))
|
||||||
ServerPool.get_instance().cb_del_server(port)
|
ServerPool.get_instance().cb_del_server(port)
|
||||||
|
self.force_update_transfer.add(port)
|
||||||
else:
|
else:
|
||||||
cfgchange = False
|
cfgchange = False
|
||||||
if port in ServerPool.get_instance().tcp_servers_pool:
|
if port in ServerPool.get_instance().tcp_servers_pool:
|
||||||
|
@ -135,6 +136,7 @@ class TransferBase(object):
|
||||||
if cfgchange:
|
if cfgchange:
|
||||||
logging.info('db stop server at port [%s] reason: config changed: %s' % (port, cfg))
|
logging.info('db stop server at port [%s] reason: config changed: %s' % (port, cfg))
|
||||||
ServerPool.get_instance().cb_del_server(port)
|
ServerPool.get_instance().cb_del_server(port)
|
||||||
|
self.force_update_transfer.add(port)
|
||||||
new_servers[port] = (passwd, cfg)
|
new_servers[port] = (passwd, cfg)
|
||||||
|
|
||||||
elif allow and ServerPool.get_instance().server_run_status(port) is False:
|
elif allow and ServerPool.get_instance().server_run_status(port) is False:
|
||||||
|
@ -146,6 +148,7 @@ class TransferBase(object):
|
||||||
else:
|
else:
|
||||||
logging.info('db stop server at port [%s] reason: port not exist' % (row['port']))
|
logging.info('db stop server at port [%s] reason: port not exist' % (row['port']))
|
||||||
ServerPool.get_instance().cb_del_server(row['port'])
|
ServerPool.get_instance().cb_del_server(row['port'])
|
||||||
|
self.clear_cache(row['port'])
|
||||||
if row['port'] in self.port_uid_table:
|
if row['port'] in self.port_uid_table:
|
||||||
del self.port_uid_table[row['port']]
|
del self.port_uid_table[row['port']]
|
||||||
|
|
||||||
|
@ -156,13 +159,17 @@ class TransferBase(object):
|
||||||
passwd, cfg = new_servers[port]
|
passwd, cfg = new_servers[port]
|
||||||
self.new_server(port, passwd, cfg)
|
self.new_server(port, passwd, cfg)
|
||||||
|
|
||||||
|
def clear_cache(self, port):
|
||||||
|
if port in self.force_update_transfer: del self.force_update_transfer[port]
|
||||||
|
if port in self.last_get_transfer: del self.last_get_transfer[port]
|
||||||
|
if port in self.last_update_transfer: del self.last_update_transfer[port]
|
||||||
|
|
||||||
def new_server(self, port, passwd, cfg):
|
def new_server(self, port, passwd, cfg):
|
||||||
protocol = cfg.get('protocol', ServerPool.get_instance().config.get('protocol', 'origin'))
|
protocol = cfg.get('protocol', ServerPool.get_instance().config.get('protocol', 'origin'))
|
||||||
method = cfg.get('method', ServerPool.get_instance().config.get('method', 'None'))
|
method = cfg.get('method', ServerPool.get_instance().config.get('method', 'None'))
|
||||||
obfs = cfg.get('obfs', ServerPool.get_instance().config.get('obfs', 'plain'))
|
obfs = cfg.get('obfs', ServerPool.get_instance().config.get('obfs', 'plain'))
|
||||||
logging.info('db start server at port [%s] pass [%s] protocol [%s] method [%s] obfs [%s]' % (port, passwd, protocol, method, obfs))
|
logging.info('db start server at port [%s] pass [%s] protocol [%s] method [%s] obfs [%s]' % (port, passwd, protocol, method, obfs))
|
||||||
ServerPool.get_instance().new_server(port, cfg)
|
ServerPool.get_instance().new_server(port, cfg)
|
||||||
self.force_update_transfer.add(port)
|
|
||||||
|
|
||||||
def cmp(self, val1, val2):
|
def cmp(self, val1, val2):
|
||||||
if type(val1) is bytes:
|
if type(val1) is bytes:
|
||||||
|
@ -230,7 +237,7 @@ class DbTransfer(TransferBase):
|
||||||
"user": "ss",
|
"user": "ss",
|
||||||
"password": "pass",
|
"password": "pass",
|
||||||
"db": "shadowsocks",
|
"db": "shadowsocks",
|
||||||
"node_id": 1,
|
"node_id": 0,
|
||||||
"transfer_mul": 1.0,
|
"transfer_mul": 1.0,
|
||||||
"ssl_enable": 0,
|
"ssl_enable": 0,
|
||||||
"ssl_ca": "",
|
"ssl_ca": "",
|
||||||
|
@ -261,8 +268,9 @@ class DbTransfer(TransferBase):
|
||||||
for id in dt_transfer.keys():
|
for id in dt_transfer.keys():
|
||||||
transfer = dt_transfer[id]
|
transfer = dt_transfer[id]
|
||||||
#小于最低更新流量的先不更新
|
#小于最低更新流量的先不更新
|
||||||
update_trs = 1024 * max(2048 - self.user_pass.get(id, 0) * 64, 16)
|
update_trs = 1024 * (2048 - self.user_pass.get(id, 0) * 64)
|
||||||
if transfer[0] + transfer[1] < update_trs:
|
if transfer[0] + transfer[1] < update_trs and id not in self.force_update_transfer:
|
||||||
|
self.user_pass[id] = self.user_pass.get(id, 0) + 1
|
||||||
continue
|
continue
|
||||||
if id in self.user_pass:
|
if id in self.user_pass:
|
||||||
del self.user_pass[id]
|
del self.user_pass[id]
|
||||||
|
@ -372,7 +380,7 @@ class Dbv3Transfer(DbTransfer):
|
||||||
transfer = dt_transfer[id]
|
transfer = dt_transfer[id]
|
||||||
bandwidth_thistime = bandwidth_thistime + transfer[0] + transfer[1]
|
bandwidth_thistime = bandwidth_thistime + transfer[0] + transfer[1]
|
||||||
|
|
||||||
update_trs = 1024 * max(2048 - self.user_pass.get(id, 0) * 64, 16)
|
update_trs = 1024 * (2048 - self.user_pass.get(id, 0) * 64)
|
||||||
if transfer[0] + transfer[1] < update_trs:
|
if transfer[0] + transfer[1] < update_trs:
|
||||||
self.user_pass[id] = self.user_pass.get(id, 0) + 1
|
self.user_pass[id] = self.user_pass.get(id, 0) + 1
|
||||||
continue
|
continue
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue