diff --git a/kernel/workqueue.c b/kernel/workqueue.c index d9a4aeb844d5..57cd77de4a4f 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -45,6 +45,7 @@ #include #include #include +#include #include "workqueue_internal.h" @@ -245,6 +246,7 @@ struct workqueue_struct { int saved_max_active; /* WQ: saved pwq max_active */ struct workqueue_attrs *unbound_attrs; /* WQ: only for unbound wqs */ + struct pool_workqueue *dfl_pwq; /* WQ: only for unbound wqs */ #ifdef CONFIG_SYSFS struct wq_device *wq_dev; /* I: for sysfs interface */ @@ -268,6 +270,9 @@ static cpumask_var_t *wq_numa_possible_cpumask; static bool wq_numa_enabled; /* unbound NUMA affinity enabled */ +/* buf for wq_update_unbound_numa_attrs(), protected by CPU hotplug exclusion */ +static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf; + static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */ @@ -3710,6 +3715,61 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq, return pwq; } +/* undo alloc_unbound_pwq(), used only in the error path */ +static void free_unbound_pwq(struct pool_workqueue *pwq) +{ + lockdep_assert_held(&wq_pool_mutex); + + if (pwq) { + put_unbound_pool(pwq->pool); + kfree(pwq); + } +} + +/** + * wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node + * @attrs: the wq_attrs of interest + * @node: the target NUMA node + * @cpu_going_down: if >= 0, the CPU to consider as offline + * @cpumask: outarg, the resulting cpumask + * + * Calculate the cpumask a workqueue with @attrs should use on @node. If + * @cpu_going_down is >= 0, that cpu is considered offline during + * calculation. The result is stored in @cpumask. This function returns + * %true if the resulting @cpumask is different from @attrs->cpumask, + * %false if equal. + * + * If NUMA affinity is not enabled, @attrs->cpumask is always used. If + * enabled and @node has online CPUs requested by @attrs, the returned + * cpumask is the intersection of the possible CPUs of @node and + * @attrs->cpumask. + * + * The caller is responsible for ensuring that the cpumask of @node stays + * stable. + */ +static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node, + int cpu_going_down, cpumask_t *cpumask) +{ + if (!wq_numa_enabled) + goto use_dfl; + + /* does @node have any online CPUs @attrs wants? */ + cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask); + if (cpu_going_down >= 0) + cpumask_clear_cpu(cpu_going_down, cpumask); + + if (cpumask_empty(cpumask)) + goto use_dfl; + + /* yeap, return possible CPUs in @node that @attrs wants */ + cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]); + return !cpumask_equal(cpumask, attrs->cpumask); + +use_dfl: + cpumask_copy(cpumask, attrs->cpumask); + return false; +} + /* install @pwq into @wq's numa_pwq_tbl[] for @node and return the old pwq */ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq, int node, @@ -3732,11 +3792,12 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq, * @wq: the target workqueue * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs() * - * Apply @attrs to an unbound workqueue @wq. If @attrs doesn't match the - * current attributes, a new pwq is created and made the first pwq which - * will serve all new work items. Older pwqs are released as in-flight - * work items finish. Note that a work item which repeatedly requeues - * itself back-to-back will stay on its current pwq. + * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA + * machines, this function maps a separate pwq to each NUMA node with + * possibles CPUs in @attrs->cpumask so that work items are affine to the + * NUMA node it was issued on. Older pwqs are released as in-flight work + * items finish. Note that a work item which repeatedly requeues itself + * back-to-back will stay on its current pwq. * * Performs GFP_KERNEL allocations. Returns 0 on success and -errno on * failure. @@ -3744,8 +3805,8 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq, int apply_workqueue_attrs(struct workqueue_struct *wq, const struct workqueue_attrs *attrs) { - struct workqueue_attrs *new_attrs; - struct pool_workqueue *pwq, *last_pwq = NULL; + struct workqueue_attrs *new_attrs, *tmp_attrs; + struct pool_workqueue **pwq_tbl, *dfl_pwq; int node, ret; /* only unbound workqueues can change attributes */ @@ -3756,40 +3817,191 @@ int apply_workqueue_attrs(struct workqueue_struct *wq, if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs))) return -EINVAL; - /* make a copy of @attrs and sanitize it */ + pwq_tbl = kzalloc(wq_numa_tbl_len * sizeof(pwq_tbl[0]), GFP_KERNEL); new_attrs = alloc_workqueue_attrs(GFP_KERNEL); - if (!new_attrs) + tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL); + if (!pwq_tbl || !new_attrs || !tmp_attrs) goto enomem; + /* make a copy of @attrs and sanitize it */ copy_workqueue_attrs(new_attrs, attrs); cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask); - mutex_lock(&wq_pool_mutex); - pwq = alloc_unbound_pwq(wq, new_attrs); - mutex_unlock(&wq_pool_mutex); - if (!pwq) - goto enomem; + /* + * We may create multiple pwqs with differing cpumasks. Make a + * copy of @new_attrs which will be modified and used to obtain + * pools. + */ + copy_workqueue_attrs(tmp_attrs, new_attrs); + /* + * CPUs should stay stable across pwq creations and installations. + * Pin CPUs, determine the target cpumask for each node and create + * pwqs accordingly. + */ + get_online_cpus(); + + mutex_lock(&wq_pool_mutex); + + /* + * If something goes wrong during CPU up/down, we'll fall back to + * the default pwq covering whole @attrs->cpumask. Always create + * it even if we don't use it immediately. + */ + dfl_pwq = alloc_unbound_pwq(wq, new_attrs); + if (!dfl_pwq) + goto enomem_pwq; + + for_each_node(node) { + if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) { + pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs); + if (!pwq_tbl[node]) + goto enomem_pwq; + } else { + dfl_pwq->refcnt++; + pwq_tbl[node] = dfl_pwq; + } + } + + mutex_unlock(&wq_pool_mutex); + + /* all pwqs have been created successfully, let's install'em */ mutex_lock(&wq->mutex); copy_workqueue_attrs(wq->unbound_attrs, new_attrs); + + /* save the previous pwq and install the new one */ for_each_node(node) - last_pwq = numa_pwq_tbl_install(wq, node, pwq); + pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]); + + /* @dfl_pwq might not have been used, ensure it's linked */ + link_pwq(dfl_pwq); + swap(wq->dfl_pwq, dfl_pwq); mutex_unlock(&wq->mutex); - put_pwq_unlocked(last_pwq); + /* put the old pwqs */ + for_each_node(node) + put_pwq_unlocked(pwq_tbl[node]); + put_pwq_unlocked(dfl_pwq); + + put_online_cpus(); ret = 0; /* fall through */ out_free: + free_workqueue_attrs(tmp_attrs); free_workqueue_attrs(new_attrs); + kfree(pwq_tbl); return ret; +enomem_pwq: + free_unbound_pwq(dfl_pwq); + for_each_node(node) + if (pwq_tbl && pwq_tbl[node] != dfl_pwq) + free_unbound_pwq(pwq_tbl[node]); + mutex_unlock(&wq_pool_mutex); + put_online_cpus(); enomem: ret = -ENOMEM; goto out_free; } +/** + * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug + * @wq: the target workqueue + * @cpu: the CPU coming up or going down + * @online: whether @cpu is coming up or going down + * + * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and + * %CPU_DOWN_FAILED. @cpu is being hot[un]plugged, update NUMA affinity of + * @wq accordingly. + * + * If NUMA affinity can't be adjusted due to memory allocation failure, it + * falls back to @wq->dfl_pwq which may not be optimal but is always + * correct. + * + * Note that when the last allowed CPU of a NUMA node goes offline for a + * workqueue with a cpumask spanning multiple nodes, the workers which were + * already executing the work items for the workqueue will lose their CPU + * affinity and may execute on any CPU. This is similar to how per-cpu + * workqueues behave on CPU_DOWN. If a workqueue user wants strict + * affinity, it's the user's responsibility to flush the work item from + * CPU_DOWN_PREPARE. + */ +static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu, + bool online) +{ + int node = cpu_to_node(cpu); + int cpu_off = online ? -1 : cpu; + struct pool_workqueue *old_pwq = NULL, *pwq; + struct workqueue_attrs *target_attrs; + cpumask_t *cpumask; + + lockdep_assert_held(&wq_pool_mutex); + + if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND)) + return; + + /* + * We don't wanna alloc/free wq_attrs for each wq for each CPU. + * Let's use a preallocated one. The following buf is protected by + * CPU hotplug exclusion. + */ + target_attrs = wq_update_unbound_numa_attrs_buf; + cpumask = target_attrs->cpumask; + + mutex_lock(&wq->mutex); + + copy_workqueue_attrs(target_attrs, wq->unbound_attrs); + pwq = unbound_pwq_by_node(wq, node); + + /* + * Let's determine what needs to be done. If the target cpumask is + * different from wq's, we need to compare it to @pwq's and create + * a new one if they don't match. If the target cpumask equals + * wq's, the default pwq should be used. If @pwq is already the + * default one, nothing to do; otherwise, install the default one. + */ + if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) { + if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask)) + goto out_unlock; + } else { + if (pwq == wq->dfl_pwq) + goto out_unlock; + else + goto use_dfl_pwq; + } + + mutex_unlock(&wq->mutex); + + /* create a new pwq */ + pwq = alloc_unbound_pwq(wq, target_attrs); + if (!pwq) { + pr_warning("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n", + wq->name); + goto out_unlock; + } + + /* + * Install the new pwq. As this function is called only from CPU + * hotplug callbacks and applying a new attrs is wrapped with + * get/put_online_cpus(), @wq->unbound_attrs couldn't have changed + * inbetween. + */ + mutex_lock(&wq->mutex); + old_pwq = numa_pwq_tbl_install(wq, node, pwq); + goto out_unlock; + +use_dfl_pwq: + spin_lock_irq(&wq->dfl_pwq->pool->lock); + get_pwq(wq->dfl_pwq); + spin_unlock_irq(&wq->dfl_pwq->pool->lock); + old_pwq = numa_pwq_tbl_install(wq, node, wq->dfl_pwq); +out_unlock: + mutex_unlock(&wq->mutex); + put_pwq_unlocked(old_pwq); +} + static int alloc_and_link_pwqs(struct workqueue_struct *wq) { bool highpri = wq->flags & WQ_HIGHPRI; @@ -3942,6 +4154,7 @@ EXPORT_SYMBOL_GPL(__alloc_workqueue_key); void destroy_workqueue(struct workqueue_struct *wq) { struct pool_workqueue *pwq; + int node; /* drain it before proceeding with destruction */ drain_workqueue(wq); @@ -3993,11 +4206,21 @@ void destroy_workqueue(struct workqueue_struct *wq) } else { /* * We're the sole accessor of @wq at this point. Directly - * access the first pwq and put the base ref. @wq will be - * freed when the last pwq is released. + * access numa_pwq_tbl[] and dfl_pwq to put the base refs. + * @wq will be freed when the last pwq is released. */ - pwq = list_first_entry(&wq->pwqs, struct pool_workqueue, - pwqs_node); + for_each_node(node) { + pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]); + RCU_INIT_POINTER(wq->numa_pwq_tbl[node], NULL); + put_pwq_unlocked(pwq); + } + + /* + * Put dfl_pwq. @wq may be freed any time after dfl_pwq is + * put. Don't access it afterwards. + */ + pwq = wq->dfl_pwq; + wq->dfl_pwq = NULL; put_pwq_unlocked(pwq); } } @@ -4285,6 +4508,7 @@ static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb, { int cpu = (unsigned long)hcpu; struct worker_pool *pool; + struct workqueue_struct *wq; int pi; switch (action & ~CPU_TASKS_FROZEN) { @@ -4317,6 +4541,10 @@ static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb, mutex_unlock(&pool->manager_mutex); } + /* update NUMA affinity of unbound workqueues */ + list_for_each_entry(wq, &workqueues, list) + wq_update_unbound_numa(wq, cpu, true); + mutex_unlock(&wq_pool_mutex); break; } @@ -4333,12 +4561,21 @@ static int __cpuinit workqueue_cpu_down_callback(struct notifier_block *nfb, { int cpu = (unsigned long)hcpu; struct work_struct unbind_work; + struct workqueue_struct *wq; switch (action & ~CPU_TASKS_FROZEN) { case CPU_DOWN_PREPARE: - /* unbinding should happen on the local CPU */ + /* unbinding per-cpu workers should happen on the local CPU */ INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn); queue_work_on(cpu, system_highpri_wq, &unbind_work); + + /* update NUMA affinity of unbound workqueues */ + mutex_lock(&wq_pool_mutex); + list_for_each_entry(wq, &workqueues, list) + wq_update_unbound_numa(wq, cpu, false); + mutex_unlock(&wq_pool_mutex); + + /* wait for per-cpu unbinding to finish */ flush_work(&unbind_work); break; } @@ -4526,6 +4763,9 @@ static void __init wq_numa_init(void) if (num_possible_nodes() <= 1) return; + wq_update_unbound_numa_attrs_buf = alloc_workqueue_attrs(GFP_KERNEL); + BUG_ON(!wq_update_unbound_numa_attrs_buf); + /* * We want masks of possible CPUs of each node which isn't readily * available. Build one from cpu_to_node() which should have been