pipe: fix and clarify pipe write wakeup logic

The pipe rework ends up having been extra painful, partly becaused of
actual bugs with ordering and caching of the pipe state, but also
because of subtle performance issues.

In particular, the pipe rework caused the kernel build to inexplicably
slow down.

The reason turns out to be that the GNU make jobserver (which limits the
parallelism of the build) uses a pipe to implement a "token" system: a
parallel submake will read a character from the pipe to get the job
token before starting a new job, and will write a character back to the
pipe when it is done.  The overall job limit is thus easily controlled
by just writing the appropriate number of initial token characters into
the pipe.

But to work well, that really means that the old behavior of write
wakeups being synchronous (WF_SYNC) is very important - when the pipe
writer wakes up a reader, we want the reader to actually get scheduled
immediately.  Otherwise you lose the parallelism of the build.

The pipe rework lost that synchronous wakeup on write, and we had
clearly all forgotten the reasons and rules for it.

This rewrites the pipe write wakeup logic to do the required Wsync
wakeups, but also clarifies the logic and avoids extraneous wakeups.

It also ends up addign a number of comments about what oit does and why,
so that we hopefully don't end up forgetting about this next time we
change this code.

Cc: David Howells <dhowells@redhat.com>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
This commit is contained in:
Linus Torvalds 2019-12-07 12:14:28 -08:00
parent ad910e36da
commit 1b6b26ae70
1 changed files with 41 additions and 18 deletions

View File

@ -391,9 +391,9 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
struct pipe_inode_info *pipe = filp->private_data;
unsigned int head;
ssize_t ret = 0;
int do_wakeup = 0;
size_t total_len = iov_iter_count(from);
ssize_t chars;
bool was_empty = false;
/* Null write succeeds. */
if (unlikely(total_len == 0))
@ -407,11 +407,21 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
goto out;
}
/*
* Only wake up if the pipe started out empty, since
* otherwise there should be no readers waiting.
*
* If it wasn't empty we try to merge new data into
* the last buffer.
*
* That naturally merges small writes, but it also
* page-aligs the rest of the writes for large writes
* spanning multiple pages.
*/
head = pipe->head;
/* We try to merge small writes */
chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
if (!pipe_empty(head, pipe->tail) && chars != 0) {
was_empty = pipe_empty(head, pipe->tail);
chars = total_len & (PAGE_SIZE-1);
if (chars && !was_empty) {
unsigned int mask = pipe->ring_size - 1;
struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
int offset = buf->offset + buf->len;
@ -426,7 +436,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
ret = -EFAULT;
goto out;
}
do_wakeup = 1;
buf->len += ret;
if (!iov_iter_count(from))
goto out;
@ -471,17 +481,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
}
pipe->head = head + 1;
/* Always wake up, even if the copy fails. Otherwise
* we lock up (O_NONBLOCK-)readers that sleep due to
* syscall merging.
* FIXME! Is this really true?
*/
wake_up_locked_poll(
&pipe->wait, EPOLLIN | EPOLLRDNORM);
spin_unlock_irq(&pipe->wait.lock);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
/* Insert it into the buffer array */
buf = &pipe->bufs[head & mask];
@ -524,14 +524,37 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
ret = -ERESTARTSYS;
break;
}
/*
* We're going to release the pipe lock and wait for more
* space. We wake up any readers if necessary, and then
* after waiting we need to re-check whether the pipe
* become empty while we dropped the lock.
*/
if (was_empty) {
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}
pipe->waiting_writers++;
pipe_wait(pipe);
pipe->waiting_writers--;
was_empty = pipe_empty(head, pipe->tail);
}
out:
__pipe_unlock(pipe);
if (do_wakeup) {
wake_up_interruptible_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
/*
* If we do do a wakeup event, we do a 'sync' wakeup, because we
* want the reader to start processing things asap, rather than
* leave the data pending.
*
* This is particularly important for small writes, because of
* how (for example) the GNU make jobserver uses small writes to
* wake up pending jobs
*/
if (was_empty) {
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}
if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {