linux-stable/fs/btrfs/async-thread.c
Nikolay Borisov 670f6b3867 btrfs: fix memory ordering between normal and ordered work functions
commit 45da9c1767 upstream.

Ordered work functions aren't guaranteed to be handled by the same thread
which executed the normal work functions. The only way execution between
normal/ordered functions is synchronized is via the WORK_DONE_BIT,
unfortunately the used bitops don't guarantee any ordering whatsoever.

This manifested as seemingly inexplicable crashes on ARM64, where
async_chunk::inode is seen as non-null in async_cow_submit which causes
submit_compressed_extents to be called and crash occurs because
async_chunk::inode suddenly became NULL. The call trace was similar to:

    pc : submit_compressed_extents+0x38/0x3d0
    lr : async_cow_submit+0x50/0xd0
    sp : ffff800015d4bc20

    <registers omitted for brevity>

    Call trace:
     submit_compressed_extents+0x38/0x3d0
     async_cow_submit+0x50/0xd0
     run_ordered_work+0xc8/0x280
     btrfs_work_helper+0x98/0x250
     process_one_work+0x1f0/0x4ac
     worker_thread+0x188/0x504
     kthread+0x110/0x114
     ret_from_fork+0x10/0x18

Fix this by adding respective barrier calls which ensure that all
accesses preceding setting of WORK_DONE_BIT are strictly ordered before
setting the flag. At the same time add a read barrier after reading of
WORK_DONE_BIT in run_ordered_work which ensures all subsequent loads
would be strictly ordered after reading the bit. This in turn ensures
are all accesses before WORK_DONE_BIT are going to be strictly ordered
before any access that can occur in ordered_func.

Reported-by: Chris Murphy <lists@colorremedies.com>
Fixes: 08a9ff3264 ("btrfs: Added btrfs_workqueue_struct implemented ordered execution based on kernel workqueue")
CC: stable@vger.kernel.org # 4.4+
Link: https://bugzilla.redhat.com/show_bug.cgi?id=2011928
Reviewed-by: Josef Bacik <josef@toxicpanda.com>
Tested-by: Chris Murphy <chris@colorremedies.com>
Signed-off-by: Nikolay Borisov <nborisov@suse.com>
Signed-off-by: David Sterba <dsterba@suse.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
2021-11-26 10:47:21 +01:00

426 lines
11 KiB
C

// SPDX-License-Identifier: GPL-2.0
/*
* Copyright (C) 2007 Oracle. All rights reserved.
* Copyright (C) 2014 Fujitsu. All rights reserved.
*/
#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"
#include "ctree.h"
enum {
WORK_DONE_BIT,
WORK_ORDER_DONE_BIT,
WORK_HIGH_PRIO_BIT,
};
#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)
struct __btrfs_workqueue {
struct workqueue_struct *normal_wq;
/* File system this workqueue services */
struct btrfs_fs_info *fs_info;
/* List head pointing to ordered work list */
struct list_head ordered_list;
/* Spinlock for ordered_list */
spinlock_t list_lock;
/* Thresholding related variants */
atomic_t pending;
/* Up limit of concurrency workers */
int limit_active;
/* Current number of concurrency workers */
int current_active;
/* Threshold to change current_active */
int thresh;
unsigned int count;
spinlock_t thres_lock;
};
struct btrfs_workqueue {
struct __btrfs_workqueue *normal;
struct __btrfs_workqueue *high;
};
struct btrfs_fs_info *
btrfs_workqueue_owner(const struct __btrfs_workqueue *wq)
{
return wq->fs_info;
}
struct btrfs_fs_info *
btrfs_work_owner(const struct btrfs_work *work)
{
return work->wq->fs_info;
}
bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
/*
* We could compare wq->normal->pending with num_online_cpus()
* to support "thresh == NO_THRESHOLD" case, but it requires
* moving up atomic_inc/dec in thresh_queue/exec_hook. Let's
* postpone it until someone needs the support of that case.
*/
if (wq->normal->thresh == NO_THRESHOLD)
return false;
return atomic_read(&wq->normal->pending) > wq->normal->thresh * 2;
}
static struct __btrfs_workqueue *
__btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info, const char *name,
unsigned int flags, int limit_active, int thresh)
{
struct __btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);
if (!ret)
return NULL;
ret->fs_info = fs_info;
ret->limit_active = limit_active;
atomic_set(&ret->pending, 0);
if (thresh == 0)
thresh = DFT_THRESHOLD;
/* For low threshold, disabling threshold is a better choice */
if (thresh < DFT_THRESHOLD) {
ret->current_active = limit_active;
ret->thresh = NO_THRESHOLD;
} else {
/*
* For threshold-able wq, let its concurrency grow on demand.
* Use minimal max_active at alloc time to reduce resource
* usage.
*/
ret->current_active = 1;
ret->thresh = thresh;
}
if (flags & WQ_HIGHPRI)
ret->normal_wq = alloc_workqueue("btrfs-%s-high", flags,
ret->current_active, name);
else
ret->normal_wq = alloc_workqueue("btrfs-%s", flags,
ret->current_active, name);
if (!ret->normal_wq) {
kfree(ret);
return NULL;
}
INIT_LIST_HEAD(&ret->ordered_list);
spin_lock_init(&ret->list_lock);
spin_lock_init(&ret->thres_lock);
trace_btrfs_workqueue_alloc(ret, name, flags & WQ_HIGHPRI);
return ret;
}
static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq);
struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
const char *name,
unsigned int flags,
int limit_active,
int thresh)
{
struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);
if (!ret)
return NULL;
ret->normal = __btrfs_alloc_workqueue(fs_info, name,
flags & ~WQ_HIGHPRI,
limit_active, thresh);
if (!ret->normal) {
kfree(ret);
return NULL;
}
if (flags & WQ_HIGHPRI) {
ret->high = __btrfs_alloc_workqueue(fs_info, name, flags,
limit_active, thresh);
if (!ret->high) {
__btrfs_destroy_workqueue(ret->normal);
kfree(ret);
return NULL;
}
}
return ret;
}
/*
* Hook for threshold which will be called in btrfs_queue_work.
* This hook WILL be called in IRQ handler context,
* so workqueue_set_max_active MUST NOT be called in this hook
*/
static inline void thresh_queue_hook(struct __btrfs_workqueue *wq)
{
if (wq->thresh == NO_THRESHOLD)
return;
atomic_inc(&wq->pending);
}
/*
* Hook for threshold which will be called before executing the work,
* This hook is called in kthread content.
* So workqueue_set_max_active is called here.
*/
static inline void thresh_exec_hook(struct __btrfs_workqueue *wq)
{
int new_current_active;
long pending;
int need_change = 0;
if (wq->thresh == NO_THRESHOLD)
return;
atomic_dec(&wq->pending);
spin_lock(&wq->thres_lock);
/*
* Use wq->count to limit the calling frequency of
* workqueue_set_max_active.
*/
wq->count++;
wq->count %= (wq->thresh / 4);
if (!wq->count)
goto out;
new_current_active = wq->current_active;
/*
* pending may be changed later, but it's OK since we really
* don't need it so accurate to calculate new_max_active.
*/
pending = atomic_read(&wq->pending);
if (pending > wq->thresh)
new_current_active++;
if (pending < wq->thresh / 2)
new_current_active--;
new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
if (new_current_active != wq->current_active) {
need_change = 1;
wq->current_active = new_current_active;
}
out:
spin_unlock(&wq->thres_lock);
if (need_change) {
workqueue_set_max_active(wq->normal_wq, wq->current_active);
}
}
static void run_ordered_work(struct __btrfs_workqueue *wq,
struct btrfs_work *self)
{
struct list_head *list = &wq->ordered_list;
struct btrfs_work *work;
spinlock_t *lock = &wq->list_lock;
unsigned long flags;
void *wtag;
bool free_self = false;
while (1) {
spin_lock_irqsave(lock, flags);
if (list_empty(list))
break;
work = list_entry(list->next, struct btrfs_work,
ordered_list);
if (!test_bit(WORK_DONE_BIT, &work->flags))
break;
/*
* Orders all subsequent loads after reading WORK_DONE_BIT,
* paired with the smp_mb__before_atomic in btrfs_work_helper
* this guarantees that the ordered function will see all
* updates from ordinary work function.
*/
smp_rmb();
/*
* we are going to call the ordered done function, but
* we leave the work item on the list as a barrier so
* that later work items that are done don't have their
* functions called before this one returns
*/
if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
break;
trace_btrfs_ordered_sched(work);
spin_unlock_irqrestore(lock, flags);
work->ordered_func(work);
/* now take the lock again and drop our item from the list */
spin_lock_irqsave(lock, flags);
list_del(&work->ordered_list);
spin_unlock_irqrestore(lock, flags);
if (work == self) {
/*
* This is the work item that the worker is currently
* executing.
*
* The kernel workqueue code guarantees non-reentrancy
* of work items. I.e., if a work item with the same
* address and work function is queued twice, the second
* execution is blocked until the first one finishes. A
* work item may be freed and recycled with the same
* work function; the workqueue code assumes that the
* original work item cannot depend on the recycled work
* item in that case (see find_worker_executing_work()).
*
* Note that different types of Btrfs work can depend on
* each other, and one type of work on one Btrfs
* filesystem may even depend on the same type of work
* on another Btrfs filesystem via, e.g., a loop device.
* Therefore, we must not allow the current work item to
* be recycled until we are really done, otherwise we
* break the above assumption and can deadlock.
*/
free_self = true;
} else {
/*
* We don't want to call the ordered free functions with
* the lock held though. Save the work as tag for the
* trace event, because the callback could free the
* structure.
*/
wtag = work;
work->ordered_free(work);
trace_btrfs_all_work_done(wq->fs_info, wtag);
}
}
spin_unlock_irqrestore(lock, flags);
if (free_self) {
wtag = self;
self->ordered_free(self);
trace_btrfs_all_work_done(wq->fs_info, wtag);
}
}
static void btrfs_work_helper(struct work_struct *normal_work)
{
struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
normal_work);
struct __btrfs_workqueue *wq;
void *wtag;
int need_order = 0;
/*
* We should not touch things inside work in the following cases:
* 1) after work->func() if it has no ordered_free
* Since the struct is freed in work->func().
* 2) after setting WORK_DONE_BIT
* The work may be freed in other threads almost instantly.
* So we save the needed things here.
*/
if (work->ordered_func)
need_order = 1;
wq = work->wq;
/* Safe for tracepoints in case work gets freed by the callback */
wtag = work;
trace_btrfs_work_sched(work);
thresh_exec_hook(wq);
work->func(work);
if (need_order) {
/*
* Ensures all memory accesses done in the work function are
* ordered before setting the WORK_DONE_BIT. Ensuring the thread
* which is going to executed the ordered work sees them.
* Pairs with the smp_rmb in run_ordered_work.
*/
smp_mb__before_atomic();
set_bit(WORK_DONE_BIT, &work->flags);
run_ordered_work(wq, work);
}
if (!need_order)
trace_btrfs_all_work_done(wq->fs_info, wtag);
}
void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
btrfs_func_t ordered_func, btrfs_func_t ordered_free)
{
work->func = func;
work->ordered_func = ordered_func;
work->ordered_free = ordered_free;
INIT_WORK(&work->normal_work, btrfs_work_helper);
INIT_LIST_HEAD(&work->ordered_list);
work->flags = 0;
}
static inline void __btrfs_queue_work(struct __btrfs_workqueue *wq,
struct btrfs_work *work)
{
unsigned long flags;
work->wq = wq;
thresh_queue_hook(wq);
if (work->ordered_func) {
spin_lock_irqsave(&wq->list_lock, flags);
list_add_tail(&work->ordered_list, &wq->ordered_list);
spin_unlock_irqrestore(&wq->list_lock, flags);
}
trace_btrfs_work_queued(work);
queue_work(wq->normal_wq, &work->normal_work);
}
void btrfs_queue_work(struct btrfs_workqueue *wq,
struct btrfs_work *work)
{
struct __btrfs_workqueue *dest_wq;
if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags) && wq->high)
dest_wq = wq->high;
else
dest_wq = wq->normal;
__btrfs_queue_work(dest_wq, work);
}
static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq)
{
destroy_workqueue(wq->normal_wq);
trace_btrfs_workqueue_destroy(wq);
kfree(wq);
}
void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
if (!wq)
return;
if (wq->high)
__btrfs_destroy_workqueue(wq->high);
__btrfs_destroy_workqueue(wq->normal);
kfree(wq);
}
void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
if (!wq)
return;
wq->normal->limit_active = limit_active;
if (wq->high)
wq->high->limit_active = limit_active;
}
void btrfs_set_work_high_priority(struct btrfs_work *work)
{
set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}
void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
{
if (wq->high)
flush_workqueue(wq->high->normal_wq);
flush_workqueue(wq->normal->normal_wq);
}