diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h index aba0f0f429be..e3d080c299f6 100644 --- a/include/linux/skmsg.h +++ b/include/linux/skmsg.h @@ -126,8 +126,6 @@ int sk_msg_zerocopy_from_iter(struct sock *sk, struct iov_iter *from, struct sk_msg *msg, u32 bytes); int sk_msg_memcopy_from_iter(struct sock *sk, struct iov_iter *from, struct sk_msg *msg, u32 bytes); -int sk_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags, - long timeo, int *err); int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, int len, int flags); diff --git a/net/core/skmsg.c b/net/core/skmsg.c index 43ce17a6a585..f9a81b314e4c 100644 --- a/net/core/skmsg.c +++ b/net/core/skmsg.c @@ -399,29 +399,6 @@ out: } EXPORT_SYMBOL_GPL(sk_msg_memcopy_from_iter); -int sk_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags, - long timeo, int *err) -{ - DEFINE_WAIT_FUNC(wait, woken_wake_function); - int ret = 0; - - if (sk->sk_shutdown & RCV_SHUTDOWN) - return 1; - - if (!timeo) - return ret; - - add_wait_queue(sk_sleep(sk), &wait); - sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk); - ret = sk_wait_event(sk, &timeo, - !list_empty(&psock->ingress_msg) || - !skb_queue_empty(&sk->sk_receive_queue), &wait); - sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk); - remove_wait_queue(sk_sleep(sk), &wait); - return ret; -} -EXPORT_SYMBOL_GPL(sk_msg_wait_data); - /* Receive sk_msg from psock->ingress_msg to @msg. */ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, int len, int flags) diff --git a/net/ipv4/tcp_bpf.c b/net/ipv4/tcp_bpf.c index ad9d17923fc5..bb49b52d7be8 100644 --- a/net/ipv4/tcp_bpf.c +++ b/net/ipv4/tcp_bpf.c @@ -163,6 +163,28 @@ static bool tcp_bpf_stream_read(const struct sock *sk) return !empty; } +static int tcp_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags, + long timeo, int *err) +{ + DEFINE_WAIT_FUNC(wait, woken_wake_function); + int ret = 0; + + if (sk->sk_shutdown & RCV_SHUTDOWN) + return 1; + + if (!timeo) + return ret; + + add_wait_queue(sk_sleep(sk), &wait); + sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk); + ret = sk_wait_event(sk, &timeo, + !list_empty(&psock->ingress_msg) || + !skb_queue_empty(&sk->sk_receive_queue), &wait); + sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk); + remove_wait_queue(sk_sleep(sk), &wait); + return ret; +} + static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock, int flags, int *addr_len) { @@ -188,7 +210,7 @@ msg_bytes_ready: long timeo; timeo = sock_rcvtimeo(sk, nonblock); - data = sk_msg_wait_data(sk, psock, flags, timeo, &err); + data = tcp_msg_wait_data(sk, psock, flags, timeo, &err); if (data) { if (!sk_psock_queue_empty(psock)) goto msg_bytes_ready; diff --git a/net/ipv4/udp_bpf.c b/net/ipv4/udp_bpf.c index 954c4591a6fd..565a70040c57 100644 --- a/net/ipv4/udp_bpf.c +++ b/net/ipv4/udp_bpf.c @@ -21,6 +21,45 @@ static int sk_udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, return udp_prot.recvmsg(sk, msg, len, noblock, flags, addr_len); } +static bool udp_sk_has_data(struct sock *sk) +{ + return !skb_queue_empty(&udp_sk(sk)->reader_queue) || + !skb_queue_empty(&sk->sk_receive_queue); +} + +static bool psock_has_data(struct sk_psock *psock) +{ + return !skb_queue_empty(&psock->ingress_skb) || + !sk_psock_queue_empty(psock); +} + +#define udp_msg_has_data(__sk, __psock) \ + ({ udp_sk_has_data(__sk) || psock_has_data(__psock); }) + +static int udp_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags, + long timeo, int *err) +{ + DEFINE_WAIT_FUNC(wait, woken_wake_function); + int ret = 0; + + if (sk->sk_shutdown & RCV_SHUTDOWN) + return 1; + + if (!timeo) + return ret; + + add_wait_queue(sk_sleep(sk), &wait); + sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk); + ret = udp_msg_has_data(sk, psock); + if (!ret) { + wait_woken(&wait, TASK_INTERRUPTIBLE, timeo); + ret = udp_msg_has_data(sk, psock); + } + sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk); + remove_wait_queue(sk_sleep(sk), &wait); + return ret; +} + static int udp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock, int flags, int *addr_len) { @@ -34,8 +73,7 @@ static int udp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, if (unlikely(!psock)) return sk_udp_recvmsg(sk, msg, len, nonblock, flags, addr_len); - lock_sock(sk); - if (sk_psock_queue_empty(psock)) { + if (!psock_has_data(psock)) { ret = sk_udp_recvmsg(sk, msg, len, nonblock, flags, addr_len); goto out; } @@ -47,9 +85,9 @@ msg_bytes_ready: long timeo; timeo = sock_rcvtimeo(sk, nonblock); - data = sk_msg_wait_data(sk, psock, flags, timeo, &err); + data = udp_msg_wait_data(sk, psock, flags, timeo, &err); if (data) { - if (!sk_psock_queue_empty(psock)) + if (psock_has_data(psock)) goto msg_bytes_ready; ret = sk_udp_recvmsg(sk, msg, len, nonblock, flags, addr_len); goto out; @@ -62,7 +100,6 @@ msg_bytes_ready: } ret = copied; out: - release_sock(sk); sk_psock_put(sk, psock); return ret; }