diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 86761ec623f9..cd9a137ad6ce 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -137,6 +137,14 @@ struct io_defer_entry {
 #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
 #define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
 
+/*
+ * No waiters. It's larger than any valid value of the tw counter
+ * so that tests against ->cq_wait_nr would fail and skip wake_up().
+ */
+#define IO_CQ_WAKE_INIT		(-1U)
+/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
+#define IO_CQ_WAKE_FORCE	(IO_CQ_WAKE_INIT >> 1)
+
 static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
 					 struct task_struct *task,
 					 bool cancel_all);
@@ -303,6 +311,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 		goto err;
 
 	ctx->flags = p->flags;
+	atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
 	init_waitqueue_head(&ctx->sqo_sq_wait);
 	INIT_LIST_HEAD(&ctx->sqd_list);
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
@@ -1304,16 +1313,23 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	unsigned nr_wait, nr_tw, nr_tw_prev;
-	struct llist_node *first;
+	struct llist_node *head;
 
+	/* See comment above IO_CQ_WAKE_INIT */
+	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
+
+	/*
+	 * We don't know how many requests are in the link and whether
+	 * they can even be queued lazily, fall back to non-lazy.
+	 */
 	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
 		flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-	first = READ_ONCE(ctx->work_llist.first);
+	head = READ_ONCE(ctx->work_llist.first);
 	do {
 		nr_tw_prev = 0;
-		if (first) {
-			struct io_kiocb *first_req = container_of(first,
+		if (head) {
+			struct io_kiocb *first_req = container_of(head,
 							struct io_kiocb,
 							io_task_work.node);
 			/*
@@ -1322,17 +1338,29 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 			 */
 			nr_tw_prev = READ_ONCE(first_req->nr_tw);
 		}
+
+		/*
+		 * Theoretically, it can overflow, but that's fine as one of
+		 * previous adds should've tried to wake the task.
+		 */
 		nr_tw = nr_tw_prev + 1;
-		/* Large enough to fail the nr_wait comparison below */
 		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
-			nr_tw = -1U;
+			nr_tw = IO_CQ_WAKE_FORCE;
 
 		req->nr_tw = nr_tw;
-		req->io_task_work.node.next = first;
-	} while (!try_cmpxchg(&ctx->work_llist.first, &first,
+		req->io_task_work.node.next = head;
+	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
 			      &req->io_task_work.node));
 
-	if (!first) {
+	/*
+	 * cmpxchg implies a full barrier, which pairs with the barrier
+	 * in set_current_state() on the io_cqring_wait() side. It's used
+	 * to ensure that either we see updated ->cq_wait_nr, or waiters
+	 * going to sleep will observe the work added to the list, which
+	 * is similar to the wait/wake task state sync.
+	 */
+
+	if (!head) {
 		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 		if (ctx->has_evfd)
@@ -1340,14 +1368,12 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 	}
 
 	nr_wait = atomic_read(&ctx->cq_wait_nr);
-	/* no one is waiting */
-	if (!nr_wait)
+	/* not enough or no one is waiting */
+	if (nr_tw < nr_wait)
 		return;
-	/* either not enough or the previous add has already woken it up */
-	if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+	/* the previous add has already woken it up */
+	if (nr_tw_prev >= nr_wait)
 		return;
-	/* pairs with set_current_state() in io_cqring_wait() */
-	smp_mb__after_atomic();
 	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
 }
 
@@ -2000,9 +2026,10 @@ inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
 		goto out;
 	fd = array_index_nospec(fd, ctx->nr_user_files);
 	slot = io_fixed_file_slot(&ctx->file_table, fd);
-	file = io_slot_file(slot);
+	if (!req->rsrc_node)
+		__io_req_set_rsrc_node(req, ctx);
 	req->flags |= io_slot_flags(slot);
-	io_req_set_rsrc_node(req, ctx, 0);
+	file = io_slot_file(slot);
 out:
 	io_ring_submit_unlock(ctx, issue_flags);
 	return file;
@@ -2613,7 +2640,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 
 		ret = io_cqring_wait_schedule(ctx, &iowq);
 		__set_current_state(TASK_RUNNING);
-		atomic_set(&ctx->cq_wait_nr, 0);
+		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
 
 		/*
 		 * Run task_work after scheduling and before io_should_wake().
diff --git a/io_uring/register.c b/io_uring/register.c
index 708dd1d89add..5e62c1208996 100644
--- a/io_uring/register.c
+++ b/io_uring/register.c
@@ -14,6 +14,7 @@
 #include <linux/slab.h>
 #include <linux/uaccess.h>
 #include <linux/nospec.h>
+#include <linux/compat.h>
 #include <linux/io_uring.h>
 #include <linux/io_uring_types.h>
 
@@ -278,13 +279,14 @@ static __cold int io_register_iowq_aff(struct io_ring_ctx *ctx,
 	if (len > cpumask_size())
 		len = cpumask_size();
 
-	if (in_compat_syscall()) {
+#ifdef CONFIG_COMPAT
+	if (in_compat_syscall())
 		ret = compat_get_bitmap(cpumask_bits(new_mask),
 					(const compat_ulong_t __user *)arg,
 					len * 8 /* CHAR_BIT */);
-	} else {
+	else
+#endif
 		ret = copy_from_user(new_mask, arg, len);
-	}
 
 	if (ret) {
 		free_cpumask_var(new_mask);
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index 7238b9cfe33b..c6f199bbee28 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -102,17 +102,21 @@ static inline void io_charge_rsrc_node(struct io_ring_ctx *ctx,
 	node->refs++;
 }
 
+static inline void __io_req_set_rsrc_node(struct io_kiocb *req,
+					  struct io_ring_ctx *ctx)
+{
+	lockdep_assert_held(&ctx->uring_lock);
+	req->rsrc_node = ctx->rsrc_node;
+	io_charge_rsrc_node(ctx, ctx->rsrc_node);
+}
+
 static inline void io_req_set_rsrc_node(struct io_kiocb *req,
 					struct io_ring_ctx *ctx,
 					unsigned int issue_flags)
 {
 	if (!req->rsrc_node) {
 		io_ring_submit_lock(ctx, issue_flags);
-
-		lockdep_assert_held(&ctx->uring_lock);
-
-		req->rsrc_node = ctx->rsrc_node;
-		io_charge_rsrc_node(ctx, ctx->rsrc_node);
+		__io_req_set_rsrc_node(req, ctx);
 		io_ring_submit_unlock(ctx, issue_flags);
 	}
 }
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 0c856726b15d..118cc9f1cf16 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -168,27 +168,6 @@ void io_readv_writev_cleanup(struct io_kiocb *req)
 	kfree(io->free_iovec);
 }
 
-static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
-{
-	switch (ret) {
-	case -EIOCBQUEUED:
-		break;
-	case -ERESTARTSYS:
-	case -ERESTARTNOINTR:
-	case -ERESTARTNOHAND:
-	case -ERESTART_RESTARTBLOCK:
-		/*
-		 * We can't just restart the syscall, since previously
-		 * submitted sqes may already be in progress. Just fail this
-		 * IO with EINTR.
-		 */
-		ret = -EINTR;
-		fallthrough;
-	default:
-		kiocb->ki_complete(kiocb, ret);
-	}
-}
-
 static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
 {
 	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
@@ -371,6 +350,33 @@ static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
 		smp_store_release(&req->iopoll_completed, 1);
 }
 
+static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
+{
+	/* IO was queued async, completion will happen later */
+	if (ret == -EIOCBQUEUED)
+		return;
+
+	/* transform internal restart error codes */
+	if (unlikely(ret < 0)) {
+		switch (ret) {
+		case -ERESTARTSYS:
+		case -ERESTARTNOINTR:
+		case -ERESTARTNOHAND:
+		case -ERESTART_RESTARTBLOCK:
+			/*
+			 * We can't just restart the syscall, since previously
+			 * submitted sqes may already be in progress. Just fail
+			 * this IO with EINTR.
+			 */
+			ret = -EINTR;
+			break;
+		}
+	}
+
+	INDIRECT_CALL_2(kiocb->ki_complete, io_complete_rw_iopoll,
+			io_complete_rw, kiocb, ret);
+}
+
 static int kiocb_done(struct io_kiocb *req, ssize_t ret,
 		      unsigned int issue_flags)
 {
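
A note on the reworked wake logic in io_req_local_work_add(): ->cq_wait_nr now idles at IO_CQ_WAKE_INIT (-1U), so the single "nr_tw < nr_wait" test covers both "no waiter" and "not enough task work yet", and a non-lazy add substitutes IO_CQ_WAKE_FORCE, which the new BUILD_BUG_ON guarantees is larger than any wait count a waiter can legitimately publish. The userspace sketch below only replays those comparisons; should_wake() and the values in main() are invented for illustration and are not part of the patch.

/*
 * Userspace model of the wake decision above; should_wake() and the
 * test values are illustrative only.
 */
#include <assert.h>
#include <stdbool.h>

#define IO_CQ_WAKE_INIT		(-1U)			/* ->cq_wait_nr when nobody waits */
#define IO_CQ_WAKE_FORCE	(IO_CQ_WAKE_INIT >> 1)	/* non-lazy add beats any wait count */

/* mirrors the two tests at the end of io_req_local_work_add() */
static bool should_wake(unsigned nr_wait, unsigned nr_tw, unsigned nr_tw_prev)
{
	/* not enough queued yet, or nr_wait is IO_CQ_WAKE_INIT (no waiter) */
	if (nr_tw < nr_wait)
		return false;
	/* a previous add already crossed the threshold and issued the wake */
	if (nr_tw_prev >= nr_wait)
		return false;
	return true;
}

int main(void)
{
	/* no waiter: no tw count can ever reach IO_CQ_WAKE_INIT */
	assert(!should_wake(IO_CQ_WAKE_INIT, 3, 2));
	/* waiter wants 4 CQEs, this is only the 3rd piece of task work */
	assert(!should_wake(4, 3, 2));
	/* the 4th add crosses the threshold and nobody woke the task before */
	assert(should_wake(4, 4, 3));
	/* threshold already crossed by a previous add: skip the duplicate wake */
	assert(!should_wake(4, 5, 4));
	/* non-lazy add: IO_CQ_WAKE_FORCE beats any real wait count */
	assert(should_wake(4, IO_CQ_WAKE_FORCE, 0));
	return 0;
}

The explicit smp_mb__after_atomic() before the wake could be dropped because, as the added comment notes, the cmpxchg in the push loop already provides the full barrier that pairs with set_current_state() in io_cqring_wait().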
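On the rw.c side, moving io_rw_done() below io_complete_rw_iopoll()/io_complete_rw lets it finish requests through INDIRECT_CALL_2(), so on retpoline builds the two expected ->ki_complete targets are reached by comparisons and direct calls instead of an indirect branch. The stand-in below is a hand-rolled userspace sketch of that pattern only; CALL_2 and the callbacks are made up, the real macro lives in include/linux/indirect_call_wrapper.h.

/*
 * Userspace sketch of the INDIRECT_CALL_2() pattern used by the new
 * io_rw_done(); CALL_2 and the callbacks here are invented for illustration.
 */
#include <stdio.h>

static void complete_rw(long res)        { printf("complete_rw(%ld)\n", res); }
static void complete_rw_iopoll(long res) { printf("complete_rw_iopoll(%ld)\n", res); }

/* compare against the two expected targets, fall back to an indirect call */
#define CALL_2(f, f2, f1, ...)					\
	((f) == (f2) ? (f2)(__VA_ARGS__) :			\
	 (f) == (f1) ? (f1)(__VA_ARGS__) : (f)(__VA_ARGS__))

int main(void)
{
	void (*ki_complete)(long) = complete_rw;

	/* second comparison hits, so complete_rw() is invoked directly */
	CALL_2(ki_complete, complete_rw_iopoll, complete_rw, -4 /* e.g. -EINTR */);
	return 0;
}

The behavioural part of the move is unchanged: -EIOCBQUEUED still returns early and the -ERESTART* family is still collapsed to -EINTR; only the shape of the final completion call differs.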