diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c index f5628ada47b5..e6a68d66f3b2 100644 --- a/net/ipv4/udp.c +++ b/net/ipv4/udp.c @@ -1195,10 +1195,36 @@ void udp_skb_destructor(struct sock *sk, struct sk_buff *skb) } EXPORT_SYMBOL(udp_skb_destructor); +/* Idea of busylocks is to let producers grab an extra spinlock + * to relieve pressure on the receive_queue spinlock shared by consumer. + * Under flood, this means that only one producer can be in line + * trying to acquire the receive_queue spinlock. + * These busylock can be allocated on a per cpu manner, instead of a + * per socket one (that would consume a cache line per socket) + */ +static int udp_busylocks_log __read_mostly; +static spinlock_t *udp_busylocks __read_mostly; + +static spinlock_t *busylock_acquire(void *ptr) +{ + spinlock_t *busy; + + busy = udp_busylocks + hash_ptr(ptr, udp_busylocks_log); + spin_lock(busy); + return busy; +} + +static void busylock_release(spinlock_t *busy) +{ + if (busy) + spin_unlock(busy); +} + int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) { struct sk_buff_head *list = &sk->sk_receive_queue; int rmem, delta, amt, err = -ENOMEM; + spinlock_t *busy = NULL; int size; /* try to avoid the costly atomic add/sub pair when the receive @@ -1214,8 +1240,11 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) * - Less cache line misses at copyout() time * - Less work at consume_skb() (less alien page frag freeing) */ - if (rmem > (sk->sk_rcvbuf >> 1)) + if (rmem > (sk->sk_rcvbuf >> 1)) { skb_condense(skb); + + busy = busylock_acquire(sk); + } size = skb->truesize; /* we drop only if the receive buf is full and the receive @@ -1252,6 +1281,7 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) if (!sock_flag(sk, SOCK_DEAD)) sk->sk_data_ready(sk); + busylock_release(busy); return 0; uncharge_drop: @@ -1259,6 +1289,7 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) drop: atomic_inc(&sk->sk_drops); + busylock_release(busy); return err; } EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb); @@ -2613,6 +2644,7 @@ EXPORT_SYMBOL(udp_flow_hashrnd); void __init udp_init(void) { unsigned long limit; + unsigned int i; udp_table_init(&udp_table, "UDP"); limit = nr_free_buffer_pages() / 8; @@ -2623,4 +2655,13 @@ void __init udp_init(void) sysctl_udp_rmem_min = SK_MEM_QUANTUM; sysctl_udp_wmem_min = SK_MEM_QUANTUM; + + /* 16 spinlocks per cpu */ + udp_busylocks_log = ilog2(nr_cpu_ids) + 4; + udp_busylocks = kmalloc(sizeof(spinlock_t) << udp_busylocks_log, + GFP_KERNEL); + if (!udp_busylocks) + panic("UDP: failed to alloc udp_busylocks\n"); + for (i = 0; i < (1U << udp_busylocks_log); i++) + spin_lock_init(udp_busylocks + i); }