diff --git a/fs/io-wq.c b/fs/io-wq.c
index 315b54eb0548..8ba4ccaafbaf 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -68,6 +68,7 @@ struct io_worker {
 struct io_wqe_acct {
 	unsigned nr_workers;
 	unsigned max_workers;
+	int index;
 	atomic_t nr_running;
 };
 
@@ -108,19 +109,16 @@ struct io_wq {
 	free_work_fn *free_work;
 	io_wq_work_fn *do_work;
 
-	struct task_struct *manager;
-
 	struct io_wq_hash *hash;
 
 	refcount_t refs;
-	struct completion exited;
 
 	atomic_t worker_refs;
 	struct completion worker_done;
 
 	struct hlist_node cpuhp_node;
 
-	pid_t task_pid;
+	struct task_struct *task;
 };
 
 static enum cpuhp_state io_wq_online;
@@ -133,8 +131,7 @@ struct io_cb_cancel_data {
 	bool cancel_all;
 };
 
-static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
-				       struct io_cb_cancel_data *match);
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
 
 static bool io_worker_get(struct io_worker *worker)
 {
@@ -163,6 +160,12 @@ static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
 	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
 }
 
+static void io_worker_ref_put(struct io_wq *wq)
+{
+	if (atomic_dec_and_test(&wq->worker_refs))
+		complete(&wq->worker_done);
+}
+
 static void io_worker_exit(struct io_worker *worker)
 {
 	struct io_wqe *wqe = worker->wqe;
@@ -190,8 +193,7 @@ static void io_worker_exit(struct io_worker *worker)
 	raw_spin_unlock_irq(&wqe->lock);
 
 	kfree_rcu(worker, rcu);
-	if (atomic_dec_and_test(&wqe->wq->worker_refs))
-		complete(&wqe->wq->worker_done);
+	io_worker_ref_put(wqe->wq);
 	do_exit(0);
 }
 
@@ -206,7 +208,7 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
 
 /*
  * Check head of free list for an available worker. If one isn't available,
- * caller must wake up the wq manager to create one.
+ * caller must create one.
  */
 static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 	__must_hold(RCU)
@@ -230,7 +232,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 
 /*
  * We need a worker. If we find a free one, we're good. If not, and we're
- * below the max number of workers, wake up the manager to create one.
+ * below the max number of workers, create one.
  */
 static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 {
@@ -246,8 +248,11 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 	ret = io_wqe_activate_free_worker(wqe);
 	rcu_read_unlock();
 
-	if (!ret && acct->nr_workers < acct->max_workers)
-		wake_up_process(wqe->wq->manager);
+	if (!ret && acct->nr_workers < acct->max_workers) {
+		atomic_inc(&acct->nr_running);
+		atomic_inc(&wqe->wq->worker_refs);
+		create_io_worker(wqe->wq, wqe, acct->index);
+	}
 }
 
 static void io_wqe_inc_running(struct io_worker *worker)
@@ -257,14 +262,61 @@ static void io_wqe_inc_running(struct io_worker *worker)
 	atomic_inc(&acct->nr_running);
 }
 
+struct create_worker_data {
+	struct callback_head work;
+	struct io_wqe *wqe;
+	int index;
+};
+
+static void create_worker_cb(struct callback_head *cb)
+{
+	struct create_worker_data *cwd;
+	struct io_wq *wq;
+
+	cwd = container_of(cb, struct create_worker_data, work);
+	wq = cwd->wqe->wq;
+	create_io_worker(wq, cwd->wqe, cwd->index);
+	kfree(cwd);
+}
+
+static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
+{
+	struct create_worker_data *cwd;
+	struct io_wq *wq = wqe->wq;
+
+	/* raced with exit, just ignore create call */
+	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
+		goto fail;
+
+	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
+	if (cwd) {
+		init_task_work(&cwd->work, create_worker_cb);
+		cwd->wqe = wqe;
+		cwd->index = acct->index;
+		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
+			return;
+
+		kfree(cwd);
+	}
+fail:
+	atomic_dec(&acct->nr_running);
+	io_worker_ref_put(wq);
+}
+
 static void io_wqe_dec_running(struct io_worker *worker)
 	__must_hold(wqe->lock)
 {
 	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 
-	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
-		io_wqe_wake_worker(wqe, acct);
+	if (!(worker->flags & IO_WORKER_F_UP))
+		return;
+
+	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+		atomic_inc(&acct->nr_running);
+		atomic_inc(&wqe->wq->worker_refs);
+		io_queue_worker_create(wqe, acct);
+	}
 }
 
 /*
@@ -483,9 +535,8 @@ static int io_wqe_worker(void *data)
 	char buf[TASK_COMM_LEN];
 
 	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
-	io_wqe_inc_running(worker);
 
-	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task_pid);
+	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
 	set_task_comm(current, buf);
 
 	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
@@ -549,8 +600,7 @@ void io_wq_worker_running(struct task_struct *tsk)
 
 /*
  * Called when worker is going to sleep. If there are no workers currently
- * running and we have work pending, wake up a free one or have the manager
- * set one up.
+ * running and we have work pending, wake up a free one or create a new one.
  */
 void io_wq_worker_sleeping(struct task_struct *tsk)
 {
@@ -570,7 +620,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 	raw_spin_unlock_irq(&worker->wqe->lock);
 }
 
-static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
 	struct io_wqe_acct *acct = &wqe->acct[index];
 	struct io_worker *worker;
@@ -580,7 +630,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
 	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
 	if (!worker)
-		return false;
+		goto fail;
 
 	refcount_set(&worker->ref, 1);
 	worker->nulls_node.pprev = NULL;
@@ -588,14 +638,13 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	spin_lock_init(&worker->lock);
 	init_completion(&worker->ref_done);
 
-	atomic_inc(&wq->worker_refs);
-
 	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
 	if (IS_ERR(tsk)) {
-		if (atomic_dec_and_test(&wq->worker_refs))
-			complete(&wq->worker_done);
 		kfree(worker);
-		return false;
+fail:
+		atomic_dec(&acct->nr_running);
+		io_worker_ref_put(wq);
+		return;
 	}
 
 	tsk->pf_io_worker = worker;
@@ -614,7 +663,6 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	acct->nr_workers++;
 	raw_spin_unlock_irq(&wqe->lock);
 	wake_up_new_task(tsk);
-	return true;
 }
 
 static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
@@ -662,93 +710,11 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 	return false;
 }
 
-static void io_wq_check_workers(struct io_wq *wq)
-{
-	int node;
-
-	for_each_node(node) {
-		struct io_wqe *wqe = wq->wqes[node];
-		bool fork_worker[2] = { false, false };
-
-		if (!node_online(node))
-			continue;
-
-		raw_spin_lock_irq(&wqe->lock);
-		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
-			fork_worker[IO_WQ_ACCT_BOUND] = true;
-		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
-			fork_worker[IO_WQ_ACCT_UNBOUND] = true;
-		raw_spin_unlock_irq(&wqe->lock);
-		if (fork_worker[IO_WQ_ACCT_BOUND])
-			create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
-		if (fork_worker[IO_WQ_ACCT_UNBOUND])
-			create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
-	}
-}
-
 static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
 {
 	return true;
 }
 
-static void io_wq_cancel_pending(struct io_wq *wq)
-{
-	struct io_cb_cancel_data match = {
-		.fn		= io_wq_work_match_all,
-		.cancel_all	= true,
-	};
-	int node;
-
-	for_each_node(node)
-		io_wqe_cancel_pending_work(wq->wqes[node], &match);
-}
-
-/*
- * Manager thread. Tasked with creating new workers, if we need them.
- */
-static int io_wq_manager(void *data)
-{
-	struct io_wq *wq = data;
-	char buf[TASK_COMM_LEN];
-	int node;
-
-	snprintf(buf, sizeof(buf), "iou-mgr-%d", wq->task_pid);
-	set_task_comm(current, buf);
-
-	do {
-		set_current_state(TASK_INTERRUPTIBLE);
-		io_wq_check_workers(wq);
-		schedule_timeout(HZ);
-		if (signal_pending(current)) {
-			struct ksignal ksig;
-
-			if (!get_signal(&ksig))
-				continue;
-			set_bit(IO_WQ_BIT_EXIT, &wq->state);
-		}
-	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
-
-	io_wq_check_workers(wq);
-
-	rcu_read_lock();
-	for_each_node(node)
-		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
-	rcu_read_unlock();
-
-	if (atomic_dec_and_test(&wq->worker_refs))
-		complete(&wq->worker_done);
-	wait_for_completion(&wq->worker_done);
-
-	spin_lock_irq(&wq->hash->wait.lock);
-	for_each_node(node)
-		list_del_init(&wq->wqes[node]->wait.entry);
-	spin_unlock_irq(&wq->hash->wait.lock);
-
-	io_wq_cancel_pending(wq);
-	complete(&wq->exited);
-	do_exit(0);
-}
-
 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 {
 	struct io_wq *wq = wqe->wq;
@@ -780,39 +746,13 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
 }
 
-static int io_wq_fork_manager(struct io_wq *wq)
-{
-	struct task_struct *tsk;
-
-	if (wq->manager)
-		return 0;
-
-	WARN_ON_ONCE(test_bit(IO_WQ_BIT_EXIT, &wq->state));
-
-	init_completion(&wq->worker_done);
-	atomic_set(&wq->worker_refs, 1);
-	tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
-	if (!IS_ERR(tsk)) {
-		wq->manager = get_task_struct(tsk);
-		wake_up_new_task(tsk);
-		return 0;
-	}
-
-	if (atomic_dec_and_test(&wq->worker_refs))
-		complete(&wq->worker_done);
-
-	return PTR_ERR(tsk);
-}
-
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
 	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	int work_flags;
 	unsigned long flags;
 
-	/* Can only happen if manager creation fails after exec */
-	if (io_wq_fork_manager(wqe->wq) ||
-	    test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
+	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
 		io_run_cancel(work, wqe);
 		return;
 	}
@@ -967,17 +907,12 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 			    int sync, void *key)
 {
 	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
-	int ret;
 
 	list_del_init(&wait->entry);
 
 	rcu_read_lock();
-	ret = io_wqe_activate_free_worker(wqe);
+	io_wqe_activate_free_worker(wqe);
 	rcu_read_unlock();
-
-	if (!ret)
-		wake_up_process(wqe->wq->manager);
-
 	return 1;
 }
 
@@ -1018,6 +953,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 			goto err;
 		wq->wqes[node] = wqe;
 		wqe->node = alloc_node;
+		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
+		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
 		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
 		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
@@ -1032,13 +969,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 		INIT_LIST_HEAD(&wqe->all_list);
 	}
 
-	wq->task_pid = current->pid;
-	init_completion(&wq->exited);
+	wq->task = get_task_struct(data->task);
 	refcount_set(&wq->refs, 1);
-
-	ret = io_wq_fork_manager(wq);
-	if (!ret)
-		return wq;
+	atomic_set(&wq->worker_refs, 1);
+	init_completion(&wq->worker_done);
+	return wq;
 err:
 	io_wq_put_hash(data->hash);
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
@@ -1051,14 +986,39 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	return ERR_PTR(ret);
 }
 
-static void io_wq_destroy_manager(struct io_wq *wq)
+static void io_wq_exit_workers(struct io_wq *wq)
 {
-	if (wq->manager) {
-		wake_up_process(wq->manager);
-		wait_for_completion(&wq->exited);
-		put_task_struct(wq->manager);
-		wq->manager = NULL;
+	struct callback_head *cb;
+	int node;
+
+	set_bit(IO_WQ_BIT_EXIT, &wq->state);
+
+	if (!wq->task)
+		return;
+
+	while ((cb = task_work_cancel(wq->task, create_worker_cb)) != NULL) {
+		struct create_worker_data *cwd;
+
+		cwd = container_of(cb, struct create_worker_data, work);
+		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
+		io_worker_ref_put(wq);
+		kfree(cwd);
 	}
+
+	rcu_read_lock();
+	for_each_node(node) {
+		struct io_wqe *wqe = wq->wqes[node];
+
+		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
+		spin_lock_irq(&wq->hash->wait.lock);
+		list_del_init(&wq->wqes[node]->wait.entry);
+		spin_unlock_irq(&wq->hash->wait.lock);
+	}
+	rcu_read_unlock();
+	io_worker_ref_put(wq);
+	wait_for_completion(&wq->worker_done);
+	put_task_struct(wq->task);
+	wq->task = NULL;
 }
 
 static void io_wq_destroy(struct io_wq *wq)
@@ -1067,8 +1027,7 @@ static void io_wq_destroy(struct io_wq *wq)
 
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 
-	set_bit(IO_WQ_BIT_EXIT, &wq->state);
-	io_wq_destroy_manager(wq);
+	io_wq_exit_workers(wq);
 
 	for_each_node(node) {
 		struct io_wqe *wqe = wq->wqes[node];
@@ -1092,8 +1051,7 @@ void io_wq_put(struct io_wq *wq)
 
 void io_wq_put_and_exit(struct io_wq *wq)
 {
-	set_bit(IO_WQ_BIT_EXIT, &wq->state);
-	io_wq_destroy_manager(wq);
+	io_wq_exit_workers(wq);
 
 	io_wq_put(wq);
 }
diff --git a/fs/io-wq.h b/fs/io-wq.h
index 80d590564ff9..0e6d310999e8 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -116,6 +116,7 @@ static inline void io_wq_put_hash(struct io_wq_hash *hash)
 
 struct io_wq_data {
 	struct io_wq_hash *hash;
+	struct task_struct *task;
 	io_wq_work_fn *do_work;
 	free_work_fn *free_work;
 };
diff --git a/fs/io_uring.c b/fs/io_uring.c
index eeb165253491..9a8504294ab7 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -7911,7 +7911,8 @@ static struct io_wq_work *io_free_work(struct io_wq_work *work)
 	return req ? &req->work : NULL;
 }
 
-static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx)
+static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx,
+					struct task_struct *task)
 {
 	struct io_wq_hash *hash;
 	struct io_wq_data data;
@@ -7928,6 +7929,7 @@ static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx)
 	}
 
 	data.hash = hash;
+	data.task = task;
 	data.free_work = io_free_work;
 	data.do_work = io_wq_submit_work;
 
@@ -7953,7 +7955,7 @@ static int io_uring_alloc_task_context(struct task_struct *task,
 		return ret;
 	}
 
-	tctx->io_wq = io_init_wq_offload(ctx);
+	tctx->io_wq = io_init_wq_offload(ctx, task);
 	if (IS_ERR(tctx->io_wq)) {
 		ret = PTR_ERR(tctx->io_wq);
 		percpu_counter_destroy(&tctx->inflight);
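
Reading aid, not part of the patch: the core of the change is that a worker noticing it is about to sleep with work still pending no longer wakes a manager thread; it takes a reference on the wq and queues a small create request to the owning io_uring task via task_work, and shutdown cancels any still-queued requests before waiting on worker_refs. Below is a minimal userspace sketch of that take-a-ref / defer-create / wait-on-refs pattern, using pthreads; the types and helpers (struct wq, worker_ref_put, queue_worker_create, wq_exit_workers) are hypothetical illustrations, not the kernel task_work or io-wq API.

/*
 * Illustrative sketch only: reference-counted deferred worker creation,
 * loosely mirroring io_queue_worker_create()/io_worker_ref_put() above.
 */
#include <pthread.h>
#include <stdatomic.h>

struct wq {
	atomic_int worker_refs;		/* live workers + pending creates, starts at 1 */
	atomic_int exiting;
	pthread_mutex_t lock;
	pthread_cond_t all_done;	/* signalled when worker_refs drops to zero */
};

static void worker_ref_put(struct wq *wq)
{
	if (atomic_fetch_sub(&wq->worker_refs, 1) == 1) {
		pthread_mutex_lock(&wq->lock);
		pthread_cond_broadcast(&wq->all_done);
		pthread_mutex_unlock(&wq->lock);
	}
}

static void *worker_fn(void *arg)
{
	struct wq *wq = arg;

	/* ... pull work items and run them ... */
	worker_ref_put(wq);		/* drop the reference taken before creation */
	return NULL;
}

/* Runs in the owning context, analogous to create_worker_cb() above. */
static void create_worker(struct wq *wq)
{
	pthread_t tid;

	if (atomic_load(&wq->exiting) ||
	    pthread_create(&tid, NULL, worker_fn, wq) != 0) {
		worker_ref_put(wq);	/* creation raced with exit or failed */
		return;
	}
	pthread_detach(tid);
}

/* Caller that must not block: take a reference first, then defer the create. */
static void queue_worker_create(struct wq *wq)
{
	atomic_fetch_add(&wq->worker_refs, 1);
	create_worker(wq);		/* in the kernel this hop goes through task_work */
}

/* Shutdown: drop the initial reference and wait for everything to drain. */
static void wq_exit_workers(struct wq *wq)
{
	atomic_store(&wq->exiting, 1);
	worker_ref_put(wq);		/* pairs with the initial count of 1 */
	pthread_mutex_lock(&wq->lock);
	while (atomic_load(&wq->worker_refs) != 0)
		pthread_cond_wait(&wq->all_done, &wq->lock);
	pthread_mutex_unlock(&wq->lock);
}

/*
 * Usage sketch:
 *	struct wq wq = { .worker_refs = 1, .lock = PTHREAD_MUTEX_INITIALIZER,
 *			 .all_done = PTHREAD_COND_INITIALIZER };
 *	queue_worker_create(&wq);
 *	wq_exit_workers(&wq);
 */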