diff --git a/fs/splice.c b/fs/splice.c index bfe62ae40f40..4f355a1c1a9e 100644 --- a/fs/splice.c +++ b/fs/splice.c @@ -261,6 +261,7 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe, return ret; } +EXPORT_SYMBOL_GPL(splice_to_pipe); void spd_release_page(struct splice_pipe_desc *spd, unsigned int i) { diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h index b617095adb88..6b41c15efa27 100644 --- a/include/linux/skbuff.h +++ b/include/linux/skbuff.h @@ -35,6 +35,7 @@ #include #include #include +#include /* A. Checksumming of received packets by device. * @@ -861,6 +862,9 @@ int skb_append_datato_frags(struct sock *sk, struct sk_buff *skb, int len, int odd, struct sk_buff *skb), void *from, int length); +int skb_append_pagefrags(struct sk_buff *skb, struct page *page, + int offset, size_t size); + struct skb_seq_state { __u32 lower_offset; __u32 upper_offset; @@ -2696,9 +2700,15 @@ int skb_copy_bits(const struct sk_buff *skb, int offset, void *to, int len); int skb_store_bits(struct sk_buff *skb, int offset, const void *from, int len); __wsum skb_copy_and_csum_bits(const struct sk_buff *skb, int offset, u8 *to, int len, __wsum csum); -int skb_splice_bits(struct sk_buff *skb, unsigned int offset, +ssize_t skb_socket_splice(struct sock *sk, + struct pipe_inode_info *pipe, + struct splice_pipe_desc *spd); +int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset, struct pipe_inode_info *pipe, unsigned int len, - unsigned int flags); + unsigned int flags, + ssize_t (*splice_cb)(struct sock *, + struct pipe_inode_info *, + struct splice_pipe_desc *)); void skb_copy_and_csum_dev(const struct sk_buff *skb, u8 *to); unsigned int skb_zerocopy_headlen(const struct sk_buff *from); int skb_zerocopy(struct sk_buff *to, struct sk_buff *from, diff --git a/net/core/skbuff.c b/net/core/skbuff.c index f3fe9bd9e672..9bac0e6f8dfa 100644 --- a/net/core/skbuff.c +++ b/net/core/skbuff.c @@ -1870,15 +1870,39 @@ static bool __skb_splice_bits(struct sk_buff *skb, struct pipe_inode_info *pipe, return false; } +ssize_t skb_socket_splice(struct sock *sk, + struct pipe_inode_info *pipe, + struct splice_pipe_desc *spd) +{ + int ret; + + /* Drop the socket lock, otherwise we have reverse + * locking dependencies between sk_lock and i_mutex + * here as compared to sendfile(). We enter here + * with the socket lock held, and splice_to_pipe() will + * grab the pipe inode lock. For sendfile() emulation, + * we call into ->sendpage() with the i_mutex lock held + * and networking will grab the socket lock. + */ + release_sock(sk); + ret = splice_to_pipe(pipe, spd); + lock_sock(sk); + + return ret; +} + /* * Map data from the skb to a pipe. Should handle both the linear part, * the fragments, and the frag list. It does NOT handle frag lists within * the frag list, if such a thing exists. We'd probably need to recurse to * handle that cleanly. */ -int skb_splice_bits(struct sk_buff *skb, unsigned int offset, +int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset, struct pipe_inode_info *pipe, unsigned int tlen, - unsigned int flags) + unsigned int flags, + ssize_t (*splice_cb)(struct sock *, + struct pipe_inode_info *, + struct splice_pipe_desc *)) { struct partial_page partial[MAX_SKB_FRAGS]; struct page *pages[MAX_SKB_FRAGS]; @@ -1891,7 +1915,6 @@ int skb_splice_bits(struct sk_buff *skb, unsigned int offset, .spd_release = sock_spd_release, }; struct sk_buff *frag_iter; - struct sock *sk = skb->sk; int ret = 0; /* @@ -1914,23 +1937,12 @@ int skb_splice_bits(struct sk_buff *skb, unsigned int offset, } done: - if (spd.nr_pages) { - /* - * Drop the socket lock, otherwise we have reverse - * locking dependencies between sk_lock and i_mutex - * here as compared to sendfile(). We enter here - * with the socket lock held, and splice_to_pipe() will - * grab the pipe inode lock. For sendfile() emulation, - * we call into ->sendpage() with the i_mutex lock held - * and networking will grab the socket lock. - */ - release_sock(sk); - ret = splice_to_pipe(pipe, &spd); - lock_sock(sk); - } + if (spd.nr_pages) + ret = splice_cb(sk, pipe, &spd); return ret; } +EXPORT_SYMBOL_GPL(skb_splice_bits); /** * skb_store_bits - store bits from kernel buffer to skb @@ -2915,6 +2927,24 @@ int skb_append_datato_frags(struct sock *sk, struct sk_buff *skb, } EXPORT_SYMBOL(skb_append_datato_frags); +int skb_append_pagefrags(struct sk_buff *skb, struct page *page, + int offset, size_t size) +{ + int i = skb_shinfo(skb)->nr_frags; + + if (skb_can_coalesce(skb, i, page, offset)) { + skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], size); + } else if (i < MAX_SKB_FRAGS) { + get_page(page); + skb_fill_page_desc(skb, i, page, offset, size); + } else { + return -EMSGSIZE; + } + + return 0; +} +EXPORT_SYMBOL_GPL(skb_append_pagefrags); + /** * skb_pull_rcsum - pull skb and update receive checksum * @skb: buffer to update diff --git a/net/ipv4/ip_output.c b/net/ipv4/ip_output.c index 8d91b922fcfe..451b009dae75 100644 --- a/net/ipv4/ip_output.c +++ b/net/ipv4/ip_output.c @@ -1233,11 +1233,9 @@ ssize_t ip_append_page(struct sock *sk, struct flowi4 *fl4, struct page *page, } while (size > 0) { - int i; - - if (skb_is_gso(skb)) + if (skb_is_gso(skb)) { len = size; - else { + } else { /* Check if the remaining data fits into current packet. */ len = mtu - skb->len; @@ -1289,15 +1287,10 @@ ssize_t ip_append_page(struct sock *sk, struct flowi4 *fl4, struct page *page, continue; } - i = skb_shinfo(skb)->nr_frags; if (len > size) len = size; - if (skb_can_coalesce(skb, i, page, offset)) { - skb_frag_size_add(&skb_shinfo(skb)->frags[i-1], len); - } else if (i < MAX_SKB_FRAGS) { - get_page(page); - skb_fill_page_desc(skb, i, page, offset, len); - } else { + + if (skb_append_pagefrags(skb, page, offset, len)) { err = -EMSGSIZE; goto error; } diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c index 90afcec3f427..65f791f74845 100644 --- a/net/ipv4/tcp.c +++ b/net/ipv4/tcp.c @@ -695,8 +695,9 @@ static int tcp_splice_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, struct tcp_splice_state *tss = rd_desc->arg.data; int ret; - ret = skb_splice_bits(skb, offset, tss->pipe, min(rd_desc->count, len), - tss->flags); + ret = skb_splice_bits(skb, skb->sk, offset, tss->pipe, + min(rd_desc->count, len), tss->flags, + skb_socket_splice); if (ret > 0) rd_desc->count -= ret; return ret; diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c index 941b3d26e3bf..b8c44076c776 100644 --- a/net/unix/af_unix.c +++ b/net/unix/af_unix.c @@ -518,6 +518,11 @@ static int unix_ioctl(struct socket *, unsigned int, unsigned long); static int unix_shutdown(struct socket *, int); static int unix_stream_sendmsg(struct socket *, struct msghdr *, size_t); static int unix_stream_recvmsg(struct socket *, struct msghdr *, size_t, int); +static ssize_t unix_stream_sendpage(struct socket *, struct page *, int offset, + size_t size, int flags); +static ssize_t unix_stream_splice_read(struct socket *, loff_t *ppos, + struct pipe_inode_info *, size_t size, + unsigned int flags); static int unix_dgram_sendmsg(struct socket *, struct msghdr *, size_t); static int unix_dgram_recvmsg(struct socket *, struct msghdr *, size_t, int); static int unix_dgram_connect(struct socket *, struct sockaddr *, @@ -558,7 +563,8 @@ static const struct proto_ops unix_stream_ops = { .sendmsg = unix_stream_sendmsg, .recvmsg = unix_stream_recvmsg, .mmap = sock_no_mmap, - .sendpage = sock_no_sendpage, + .sendpage = unix_stream_sendpage, + .splice_read = unix_stream_splice_read, .set_peek_off = unix_set_peek_off, }; @@ -1720,6 +1726,101 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg, return sent ? : err; } +static ssize_t unix_stream_sendpage(struct socket *socket, struct page *page, + int offset, size_t size, int flags) +{ + int err = 0; + bool send_sigpipe = true; + struct sock *other, *sk = socket->sk; + struct sk_buff *skb, *newskb = NULL, *tail = NULL; + + if (flags & MSG_OOB) + return -EOPNOTSUPP; + + other = unix_peer(sk); + if (!other || sk->sk_state != TCP_ESTABLISHED) + return -ENOTCONN; + + if (false) { +alloc_skb: + unix_state_unlock(other); + mutex_unlock(&unix_sk(other)->readlock); + newskb = sock_alloc_send_pskb(sk, 0, 0, flags & MSG_DONTWAIT, + &err, 0); + if (!newskb) + return err; + } + + /* we must acquire readlock as we modify already present + * skbs in the sk_receive_queue and mess with skb->len + */ + err = mutex_lock_interruptible(&unix_sk(other)->readlock); + if (err) { + err = flags & MSG_DONTWAIT ? -EAGAIN : -ERESTARTSYS; + send_sigpipe = false; + goto err; + } + + if (sk->sk_shutdown & SEND_SHUTDOWN) { + err = -EPIPE; + goto err_unlock; + } + + unix_state_lock(other); + + if (sock_flag(other, SOCK_DEAD) || + other->sk_shutdown & RCV_SHUTDOWN) { + err = -EPIPE; + goto err_state_unlock; + } + + skb = skb_peek_tail(&other->sk_receive_queue); + if (tail && tail == skb) { + skb = newskb; + } else if (!skb) { + if (newskb) + skb = newskb; + else + goto alloc_skb; + } else if (newskb) { + /* this is fast path, we don't necessarily need to + * call to kfree_skb even though with newskb == NULL + * this - does no harm + */ + consume_skb(newskb); + } + + if (skb_append_pagefrags(skb, page, offset, size)) { + tail = skb; + goto alloc_skb; + } + + skb->len += size; + skb->data_len += size; + skb->truesize += size; + atomic_add(size, &sk->sk_wmem_alloc); + + if (newskb) + __skb_queue_tail(&other->sk_receive_queue, newskb); + + unix_state_unlock(other); + mutex_unlock(&unix_sk(other)->readlock); + + other->sk_data_ready(other); + + return size; + +err_state_unlock: + unix_state_unlock(other); +err_unlock: + mutex_unlock(&unix_sk(other)->readlock); +err: + kfree_skb(newskb); + if (send_sigpipe && !(flags & MSG_NOSIGNAL)) + send_sig(SIGPIPE, current, 0); + return err; +} + static int unix_seqpacket_sendmsg(struct socket *sock, struct msghdr *msg, size_t len) { @@ -1860,8 +1961,9 @@ static int unix_dgram_recvmsg(struct socket *sock, struct msghdr *msg, * Sleep until more data has arrived. But check for races.. */ static long unix_stream_data_wait(struct sock *sk, long timeo, - struct sk_buff *last) + struct sk_buff *last, unsigned int last_len) { + struct sk_buff *tail; DEFINE_WAIT(wait); unix_state_lock(sk); @@ -1869,7 +1971,9 @@ static long unix_stream_data_wait(struct sock *sk, long timeo, for (;;) { prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); - if (skb_peek_tail(&sk->sk_receive_queue) != last || + tail = skb_peek_tail(&sk->sk_receive_queue); + if (tail != last || + (tail && tail->len != last_len) || sk->sk_err || (sk->sk_shutdown & RCV_SHUTDOWN) || signal_pending(current) || @@ -1893,38 +1997,50 @@ static unsigned int unix_skb_len(const struct sk_buff *skb) return skb->len - UNIXCB(skb).consumed; } -static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg, - size_t size, int flags) +struct unix_stream_read_state { + int (*recv_actor)(struct sk_buff *, int, int, + struct unix_stream_read_state *); + struct socket *socket; + struct msghdr *msg; + struct pipe_inode_info *pipe; + size_t size; + int flags; + unsigned int splice_flags; +}; + +static int unix_stream_read_generic(struct unix_stream_read_state *state) { struct scm_cookie scm; + struct socket *sock = state->socket; struct sock *sk = sock->sk; struct unix_sock *u = unix_sk(sk); - DECLARE_SOCKADDR(struct sockaddr_un *, sunaddr, msg->msg_name); int copied = 0; + int flags = state->flags; int noblock = flags & MSG_DONTWAIT; - int check_creds = 0; + bool check_creds = false; int target; int err = 0; long timeo; int skip; + size_t size = state->size; + unsigned int last_len; err = -EINVAL; if (sk->sk_state != TCP_ESTABLISHED) goto out; err = -EOPNOTSUPP; - if (flags&MSG_OOB) + if (flags & MSG_OOB) goto out; - target = sock_rcvlowat(sk, flags&MSG_WAITALL, size); + target = sock_rcvlowat(sk, flags & MSG_WAITALL, size); timeo = sock_rcvtimeo(sk, noblock); + memset(&scm, 0, sizeof(scm)); + /* Lock the socket to prevent queue disordering * while sleeps in memcpy_tomsg */ - - memset(&scm, 0, sizeof(scm)); - err = mutex_lock_interruptible(&u->readlock); if (unlikely(err)) { /* recvmsg() in non blocking mode is supposed to return -EAGAIN @@ -1940,6 +2056,7 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg, unix_state_lock(sk); last = skb = skb_peek(&sk->sk_receive_queue); + last_len = last ? last->len : 0; again: if (skb == NULL) { unix_sk(sk)->recursion_level = 0; @@ -1962,16 +2079,17 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg, break; mutex_unlock(&u->readlock); - timeo = unix_stream_data_wait(sk, timeo, last); + timeo = unix_stream_data_wait(sk, timeo, last, + last_len); - if (signal_pending(current) - || mutex_lock_interruptible(&u->readlock)) { + if (signal_pending(current) || + mutex_lock_interruptible(&u->readlock)) { err = sock_intr_errno(timeo); goto out; } continue; - unlock: +unlock: unix_state_unlock(sk); break; } @@ -1980,6 +2098,7 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg, while (skip >= unix_skb_len(skb)) { skip -= unix_skb_len(skb); last = skb; + last_len = skb->len; skb = skb_peek_next(skb, &sk->sk_receive_queue); if (!skb) goto again; @@ -1996,18 +2115,20 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg, } else if (test_bit(SOCK_PASSCRED, &sock->flags)) { /* Copy credentials */ scm_set_cred(&scm, UNIXCB(skb).pid, UNIXCB(skb).uid, UNIXCB(skb).gid); - check_creds = 1; + check_creds = true; } /* Copy address just once */ - if (sunaddr) { - unix_copy_addr(msg, skb->sk); + if (state->msg && state->msg->msg_name) { + DECLARE_SOCKADDR(struct sockaddr_un *, sunaddr, + state->msg->msg_name); + unix_copy_addr(state->msg, skb->sk); sunaddr = NULL; } chunk = min_t(unsigned int, unix_skb_len(skb) - skip, size); - if (skb_copy_datagram_msg(skb, UNIXCB(skb).consumed + skip, - msg, chunk)) { + chunk = state->recv_actor(skb, skip, chunk, state); + if (chunk < 0) { if (copied == 0) copied = -EFAULT; break; @@ -2045,11 +2166,85 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg, } while (size); mutex_unlock(&u->readlock); - scm_recv(sock, msg, &scm, flags); + if (state->msg) + scm_recv(sock, state->msg, &scm, flags); + else + scm_destroy(&scm); out: return copied ? : err; } +static int unix_stream_read_actor(struct sk_buff *skb, + int skip, int chunk, + struct unix_stream_read_state *state) +{ + int ret; + + ret = skb_copy_datagram_msg(skb, UNIXCB(skb).consumed + skip, + state->msg, chunk); + return ret ?: chunk; +} + +static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg, + size_t size, int flags) +{ + struct unix_stream_read_state state = { + .recv_actor = unix_stream_read_actor, + .socket = sock, + .msg = msg, + .size = size, + .flags = flags + }; + + return unix_stream_read_generic(&state); +} + +static ssize_t skb_unix_socket_splice(struct sock *sk, + struct pipe_inode_info *pipe, + struct splice_pipe_desc *spd) +{ + int ret; + struct unix_sock *u = unix_sk(sk); + + mutex_unlock(&u->readlock); + ret = splice_to_pipe(pipe, spd); + mutex_lock(&u->readlock); + + return ret; +} + +static int unix_stream_splice_actor(struct sk_buff *skb, + int skip, int chunk, + struct unix_stream_read_state *state) +{ + return skb_splice_bits(skb, state->socket->sk, + UNIXCB(skb).consumed + skip, + state->pipe, chunk, state->splice_flags, + skb_unix_socket_splice); +} + +static ssize_t unix_stream_splice_read(struct socket *sock, loff_t *ppos, + struct pipe_inode_info *pipe, + size_t size, unsigned int flags) +{ + struct unix_stream_read_state state = { + .recv_actor = unix_stream_splice_actor, + .socket = sock, + .pipe = pipe, + .size = size, + .splice_flags = flags, + }; + + if (unlikely(*ppos)) + return -ESPIPE; + + if (sock->file->f_flags & O_NONBLOCK || + flags & SPLICE_F_NONBLOCK) + state.flags = MSG_DONTWAIT; + + return unix_stream_read_generic(&state); +} + static int unix_shutdown(struct socket *sock, int mode) { struct sock *sk = sock->sk;