diff --git a/net/tipc/msg.c b/net/tipc/msg.c index c69fb99163fc..23809039dda1 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -235,9 +235,6 @@ int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen, msg_set_size(hdr, MIN_H_SIZE); __skb_queue_tail(txq, skb); total += 1; - if (prev) - msg_set_ack_required(buf_msg(prev), 0); - msg_set_ack_required(hdr, 1); } hdr = buf_msg(skb); curr = msg_blocks(hdr); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index cd4281779468..58660d56bc83 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -340,9 +340,19 @@ static inline int msg_ack_required(struct tipc_msg *m) return msg_bits(m, 0, 18, 1); } -static inline void msg_set_ack_required(struct tipc_msg *m, u32 d) +static inline void msg_set_ack_required(struct tipc_msg *m) { - msg_set_bits(m, 0, 18, 1, d); + msg_set_bits(m, 0, 18, 1, 1); +} + +static inline int msg_nagle_ack(struct tipc_msg *m) +{ + return msg_bits(m, 0, 18, 1); +} + +static inline void msg_set_nagle_ack(struct tipc_msg *m) +{ + msg_set_bits(m, 0, 18, 1, 1); } static inline bool msg_is_rcast(struct tipc_msg *m) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index e370ad0edd76..d6b67d07d22e 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -48,6 +48,8 @@ #include "group.h" #include "trace.h" +#define NAGLE_START_INIT 4 +#define NAGLE_START_MAX 1024 #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ #define CONN_PROBING_INTV msecs_to_jiffies(3600000) /* [ms] => 1 h */ #define TIPC_FWD_MSG 1 @@ -119,7 +121,10 @@ struct tipc_sock { struct rcu_head rcu; struct tipc_group *group; u32 oneway; + u32 nagle_start; u16 snd_backlog; + u16 msg_acc; + u16 pkt_cnt; bool expect_ack; bool nodelay; bool group_is_open; @@ -143,7 +148,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz); static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); -static void tipc_sk_push_backlog(struct tipc_sock *tsk); +static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -474,6 +479,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, tsk = tipc_sk(sk); tsk->max_pkt = MAX_PKT_DEFAULT; tsk->maxnagle = 0; + tsk->nagle_start = NAGLE_START_INIT; INIT_LIST_HEAD(&tsk->publications); INIT_LIST_HEAD(&tsk->cong_links); msg = &tsk->phdr; @@ -541,7 +547,7 @@ static void __tipc_shutdown(struct socket *sock, int error) !tsk_conn_cong(tsk))); /* Push out delayed messages if in Nagle mode */ - tipc_sk_push_backlog(tsk); + tipc_sk_push_backlog(tsk, false); /* Remove pending SYN */ __skb_queue_purge(&sk->sk_write_queue); @@ -1252,14 +1258,37 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, /* tipc_sk_push_backlog(): send accumulated buffers in socket write queue * when socket is in Nagle mode */ -static void tipc_sk_push_backlog(struct tipc_sock *tsk) +static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack) { struct sk_buff_head *txq = &tsk->sk.sk_write_queue; + struct sk_buff *skb = skb_peek_tail(txq); struct net *net = sock_net(&tsk->sk); u32 dnode = tsk_peer_node(tsk); - struct sk_buff *skb = skb_peek(txq); int rc; + if (nagle_ack) { + tsk->pkt_cnt += skb_queue_len(txq); + if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) { + tsk->oneway = 0; + if (tsk->nagle_start < NAGLE_START_MAX) + tsk->nagle_start *= 2; + tsk->expect_ack = false; + pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n", + tsk->portid, tsk->msg_acc, tsk->pkt_cnt, + tsk->nagle_start); + } else { + tsk->nagle_start = NAGLE_START_INIT; + if (skb) { + msg_set_ack_required(buf_msg(skb)); + tsk->expect_ack = true; + } else { + tsk->expect_ack = false; + } + } + tsk->msg_acc = 0; + tsk->pkt_cnt = 0; + } + if (!skb || tsk->cong_link_cnt) return; @@ -1267,9 +1296,10 @@ static void tipc_sk_push_backlog(struct tipc_sock *tsk) if (msg_is_syn(buf_msg(skb))) return; + if (tsk->msg_acc) + tsk->pkt_cnt += skb_queue_len(txq); tsk->snt_unacked += tsk->snd_backlog; tsk->snd_backlog = 0; - tsk->expect_ack = true; rc = tipc_node_xmit(net, txq, dnode, tsk->portid); if (rc == -ELINKCONG) tsk->cong_link_cnt = 1; @@ -1322,8 +1352,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, return; } else if (mtyp == CONN_ACK) { was_cong = tsk_conn_cong(tsk); - tsk->expect_ack = false; - tipc_sk_push_backlog(tsk); + tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr)); tsk->snt_unacked -= msg_conn_ack(hdr); if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) tsk->snd_win = msg_adv_win(hdr); @@ -1516,6 +1545,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = &tsk->phdr; struct net *net = sock_net(sk); + struct sk_buff *skb; u32 dnode = tsk_peer_node(tsk); int maxnagle = tsk->maxnagle; int maxpkt = tsk->max_pkt; @@ -1544,17 +1574,25 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) break; send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); blocks = tsk->snd_backlog; - if (tsk->oneway++ >= 4 && send <= maxnagle) { + if (tsk->oneway++ >= tsk->nagle_start && send <= maxnagle) { rc = tipc_msg_append(hdr, m, send, maxnagle, txq); if (unlikely(rc < 0)) break; blocks += rc; + tsk->msg_acc++; if (blocks <= 64 && tsk->expect_ack) { tsk->snd_backlog = blocks; sent += send; break; + } else if (blocks > 64) { + tsk->pkt_cnt += skb_queue_len(txq); + } else { + skb = skb_peek_tail(txq); + msg_set_ack_required(buf_msg(skb)); + tsk->expect_ack = true; + tsk->msg_acc = 0; + tsk->pkt_cnt = 0; } - tsk->expect_ack = true; } else { rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq); if (unlikely(rc != send)) @@ -2091,7 +2129,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, smp_wmb(); tsk->cong_link_cnt--; wakeup = true; - tipc_sk_push_backlog(tsk); + tipc_sk_push_backlog(tsk, false); break; case GROUP_PROTOCOL: tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq); @@ -2180,7 +2218,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb, return false; case TIPC_ESTABLISHED: if (!skb_queue_empty(&sk->sk_write_queue)) - tipc_sk_push_backlog(tsk); + tipc_sk_push_backlog(tsk, false); /* Accept only connection-based messages sent by peer */ if (likely(con_msg && !err && pport == oport && pnode == onode)) { @@ -2188,8 +2226,10 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb, struct sk_buff *skb; skb = tipc_sk_build_ack(tsk); - if (skb) + if (skb) { + msg_set_nagle_ack(buf_msg(skb)); __skb_queue_tail(xmitq, skb); + } } return true; }