linux-stable/fs/dlm/ast.c
Alexander Aring 7d28c870a7 fs: dlm: fix DLM_IFL_CB_PENDING gets overwritten
commit a034c1370d upstream.

This patch introduce a new internal flag per lkb value to handle
internal flags which are handled not on wire. The current lkb internal
flags stored as lkb->lkb_flags are split in upper and lower bits, the
lower bits are used to share internal flags over wire for other cluster
wide lkb copies on other nodes.

In commit 61bed0baa4 ("fs: dlm: use a non-static queue for callbacks")
we introduced a new internal flag for pending callbacks for the dlm
callback queue. This flag is protected by the lkb->lkb_cb_lock lock.
This patch overlooked that on dlm receive path and the mentioned upper
and lower bits, that dlm will read the flags, mask it and write it
back. As example receive_flags() in fs/dlm/lock.c. This flag
manipulation is not done atomically and is not protected by
lkb->lkb_cb_lock. This has unknown side effects of the current callback
handling.

In future we should move to set/clear/test bit functionality and avoid
read, mask and writing back flag values. In later patches we will move
the upper parts to the new introduced internal lkb flags which are not
shared between other cluster nodes to the new non shared internal flag
field to avoid similar issues.

Cc: stable@vger.kernel.org
Fixes: 61bed0baa4 ("fs: dlm: use a non-static queue for callbacks")
Reported-by: Bob Peterson <rpeterso@redhat.com>
Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
2023-05-11 23:10:55 +09:00

286 lines
6.9 KiB
C

// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2010 Red Hat, Inc. All rights reserved.
**
**
*******************************************************************************
******************************************************************************/
#include <trace/events/dlm.h>
#include "dlm_internal.h"
#include "memory.h"
#include "lock.h"
#include "user.h"
#include "ast.h"
void dlm_release_callback(struct kref *ref)
{
struct dlm_callback *cb = container_of(ref, struct dlm_callback, ref);
dlm_free_cb(cb);
}
void dlm_callback_set_last_ptr(struct dlm_callback **from,
struct dlm_callback *to)
{
if (*from)
kref_put(&(*from)->ref, dlm_release_callback);
if (to)
kref_get(&to->ref);
*from = to;
}
void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
{
struct dlm_callback *cb, *safe;
list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
list_del(&cb->list);
kref_put(&cb->ref, dlm_release_callback);
}
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
/* invalidate */
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
lkb->lkb_last_bast_mode = -1;
}
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
struct dlm_callback *cb;
int prev_mode;
if (flags & DLM_CB_BAST) {
/* if cb is a bast, it should be skipped if the blocking mode is
* compatible with the last granted mode
*/
if (lkb->lkb_last_cast) {
if (dlm_modes_compat(mode, lkb->lkb_last_cast->mode)) {
log_debug(ls, "skip %x bast mode %d for cast mode %d",
lkb->lkb_id, mode,
lkb->lkb_last_cast->mode);
goto out;
}
}
/*
* Suppress some redundant basts here, do more on removal.
* Don't even add a bast if the callback just before it
* is a bast for the same mode or a more restrictive mode.
* (the addional > PR check is needed for PR/CW inversion)
*/
if (lkb->lkb_last_cb && lkb->lkb_last_cb->flags & DLM_CB_BAST) {
prev_mode = lkb->lkb_last_cb->mode;
if ((prev_mode == mode) ||
(prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
log_debug(ls, "skip %x add bast mode %d for bast mode %d",
lkb->lkb_id, mode, prev_mode);
goto out;
}
}
}
cb = dlm_allocate_cb();
if (!cb) {
rv = DLM_ENQUEUE_CALLBACK_FAILURE;
goto out;
}
cb->flags = flags;
cb->mode = mode;
cb->sb_status = status;
cb->sb_flags = (sbflags & 0x000000FF);
kref_init(&cb->ref);
if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
list_add_tail(&cb->list, &lkb->lkb_callbacks);
if (flags & DLM_CB_CAST)
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb);
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
out:
return rv;
}
int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
{
/* oldest undelivered cb is callbacks first entry */
*cb = list_first_entry_or_null(&lkb->lkb_callbacks,
struct dlm_callback, list);
if (!*cb)
return DLM_DEQUEUE_CALLBACK_EMPTY;
/* remove it from callbacks so shift others down */
list_del(&(*cb)->list);
if (list_empty(&lkb->lkb_callbacks))
return DLM_DEQUEUE_CALLBACK_LAST;
return DLM_DEQUEUE_CALLBACK_SUCCESS;
}
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
uint32_t sbflags)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int rv;
if (lkb->lkb_flags & DLM_IFL_USER) {
dlm_user_add_ast(lkb, flags, mode, status, sbflags);
return;
}
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
kref_get(&lkb->lkb_ref);
spin_lock(&ls->ls_cb_lock);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
} else {
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
}
spin_unlock(&ls->ls_cb_lock);
break;
case DLM_ENQUEUE_CALLBACK_FAILURE:
WARN_ON_ONCE(1);
break;
case DLM_ENQUEUE_CALLBACK_SUCCESS:
break;
default:
WARN_ON_ONCE(1);
break;
}
spin_unlock(&lkb->lkb_cb_lock);
}
void dlm_callback_work(struct work_struct *work)
{
struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
void (*castfn) (void *astparam);
void (*bastfn) (void *astparam, int mode);
struct dlm_callback *cb;
int rv;
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
spin_unlock(&lkb->lkb_cb_lock);
if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
goto out;
for (;;) {
castfn = lkb->lkb_astfn;
bastfn = lkb->lkb_bastfn;
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(ls, lkb, cb->mode);
lkb->lkb_last_bast_time = ktime_get();
lkb->lkb_last_bast_mode = cb->mode;
bastfn(lkb->lkb_astparam, cb->mode);
} else if (cb->flags & DLM_CB_CAST) {
lkb->lkb_lksb->sb_status = cb->sb_status;
lkb->lkb_lksb->sb_flags = cb->sb_flags;
trace_dlm_ast(ls, lkb);
lkb->lkb_last_cast_time = ktime_get();
castfn(lkb->lkb_astparam);
}
kref_put(&cb->ref, dlm_release_callback);
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
spin_unlock(&lkb->lkb_cb_lock);
break;
}
spin_unlock(&lkb->lkb_cb_lock);
}
out:
/* undo kref_get from dlm_add_callback, may cause lkb to be freed */
dlm_put_lkb(lkb);
}
int dlm_callback_start(struct dlm_ls *ls)
{
ls->ls_callback_wq = alloc_workqueue("dlm_callback",
WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
if (!ls->ls_callback_wq) {
log_print("can't start dlm_callback workqueue");
return -ENOMEM;
}
return 0;
}
void dlm_callback_stop(struct dlm_ls *ls)
{
if (ls->ls_callback_wq)
destroy_workqueue(ls->ls_callback_wq);
}
void dlm_callback_suspend(struct dlm_ls *ls)
{
if (ls->ls_callback_wq) {
spin_lock(&ls->ls_cb_lock);
set_bit(LSFL_CB_DELAY, &ls->ls_flags);
spin_unlock(&ls->ls_cb_lock);
flush_workqueue(ls->ls_callback_wq);
}
}
#define MAX_CB_QUEUE 25
void dlm_callback_resume(struct dlm_ls *ls)
{
struct dlm_lkb *lkb, *safe;
int count = 0, sum = 0;
bool empty;
if (!ls->ls_callback_wq)
return;
more:
spin_lock(&ls->ls_cb_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
list_del_init(&lkb->lkb_cb_list);
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
count++;
if (count == MAX_CB_QUEUE)
break;
}
empty = list_empty(&ls->ls_cb_delay);
if (empty)
clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
spin_unlock(&ls->ls_cb_lock);
sum += count;
if (!empty) {
count = 0;
cond_resched();
goto more;
}
if (sum)
log_rinfo(ls, "%s %d", __func__, sum);
}