From 2ab3d705ca5d4f7ea345a21c3da41a447a549649 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 12 Mar 2024 13:05:07 -0400 Subject: [PATCH 1/3] dlm: fix user space lkb refcounting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch fixes to check on the right return value if it was the last callback. The rv variable got overwritten by the return of copy_result_to_user(). Fixing it by introducing a second variable for the return value and don't let rv being overwritten. Cc: stable@vger.kernel.org Fixes: 61bed0baa4db ("fs: dlm: use a non-static queue for callbacks") Reported-by: Valentin Vidić Closes: https://lore.kernel.org/gfs2/Ze4qSvzGJDt5yxC3@valentin-vidic.from.hr Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/user.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fs/dlm/user.c b/fs/dlm/user.c index 695e691b38b3..9f9b68448830 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -806,7 +806,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, struct dlm_lkb *lkb; DECLARE_WAITQUEUE(wait, current); struct dlm_callback *cb; - int rv, copy_lvb = 0; + int rv, ret, copy_lvb = 0; int old_mode, new_mode; if (count == sizeof(struct dlm_device_version)) { @@ -906,9 +906,9 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, trace_dlm_ast(lkb->lkb_resource->res_ls, lkb); } - rv = copy_result_to_user(lkb->lkb_ua, - test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags), - cb->flags, cb->mode, copy_lvb, buf, count); + ret = copy_result_to_user(lkb->lkb_ua, + test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags), + cb->flags, cb->mode, copy_lvb, buf, count); kref_put(&cb->ref, dlm_release_callback); @@ -916,7 +916,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, if (rv == DLM_DEQUEUE_CALLBACK_LAST) dlm_put_lkb(lkb); - return rv; + return ret; } static __poll_t device_poll(struct file *file, poll_table *wait) From 484b4f90c28f441e71e9d7df407c5b76578d2658 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Fri, 15 Mar 2024 14:11:45 -0500 Subject: [PATCH 2/3] dlm: revert atomic_t lkb_wait_count Revert "fs: dlm: handle lkb wait count as atomic_t" This reverts commit 75a7d60134ce84209f2c61ec4619ee543aa8f466. This counter does not need to be atomic. As the comment in the reverted commit mentions, the counter is protected by the rsb lock. Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 2 +- fs/dlm/lock.c | 32 ++++++++++++++++++-------------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index dfc444dad329..3b4dbce849f0 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -246,7 +246,7 @@ struct dlm_lkb { int8_t lkb_highbast; /* highest mode bast sent for */ int8_t lkb_wait_type; /* type of reply waiting for */ - atomic_t lkb_wait_count; + int8_t lkb_wait_count; int lkb_wait_nodeid; /* for debugging */ struct list_head lkb_statequeue; /* rsb g/c/w list */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 652c51fbbf76..6dca5cdbbc5d 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1407,7 +1407,6 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error = 0; - int wc; mutex_lock(&ls->ls_waiters_mutex); @@ -1429,17 +1428,20 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) error = -EBUSY; goto out; } - wc = atomic_inc_return(&lkb->lkb_wait_count); + lkb->lkb_wait_count++; hold_lkb(lkb); log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", - lkb->lkb_id, lkb->lkb_wait_type, mstype, wc, - dlm_iflags_val(lkb)); + lkb->lkb_id, lkb->lkb_wait_type, mstype, + lkb->lkb_wait_count, dlm_iflags_val(lkb)); goto out; } - wc = atomic_fetch_inc(&lkb->lkb_wait_count); - DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc);); + DLM_ASSERT(!lkb->lkb_wait_count, + dlm_print_lkb(lkb); + printk("wait_count %d\n", lkb->lkb_wait_count);); + + lkb->lkb_wait_count++; lkb->lkb_wait_type = mstype; lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ hold_lkb(lkb); @@ -1502,7 +1504,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, log_debug(ls, "remwait %x convert_reply zap overlap_cancel", lkb->lkb_id); lkb->lkb_wait_type = 0; - atomic_dec(&lkb->lkb_wait_count); + lkb->lkb_wait_count--; unhold_lkb(lkb); goto out_del; } @@ -1529,15 +1531,16 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, if (overlap_done && lkb->lkb_wait_type) { log_error(ls, "remwait error %x reply %d wait_type %d overlap", lkb->lkb_id, mstype, lkb->lkb_wait_type); - atomic_dec(&lkb->lkb_wait_count); + lkb->lkb_wait_count--; unhold_lkb(lkb); lkb->lkb_wait_type = 0; } - DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb);); + DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); - if (atomic_dec_and_test(&lkb->lkb_wait_count)) + lkb->lkb_wait_count--; + if (!lkb->lkb_wait_count) list_del_init(&lkb->lkb_wait_reply); unhold_lkb(lkb); return 0; @@ -2666,7 +2669,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, goto out; /* lock not allowed if there's any op in progress */ - if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)) + if (lkb->lkb_wait_type || lkb->lkb_wait_count) goto out; if (is_overlap(lkb)) @@ -2728,7 +2731,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) /* normal unlock not allowed if there's any op in progress */ if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) && - (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))) + (lkb->lkb_wait_type || lkb->lkb_wait_count)) goto out; /* an lkb may be waiting for an rsb lookup to complete where the @@ -5070,9 +5073,10 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) /* drop all wait_count references we still * hold a reference for this iteration. */ - while (!atomic_dec_and_test(&lkb->lkb_wait_count)) + while (lkb->lkb_wait_count) { + lkb->lkb_wait_count--; unhold_lkb(lkb); - + } mutex_lock(&ls->ls_waiters_mutex); list_del_init(&lkb->lkb_wait_reply); mutex_unlock(&ls->ls_waiters_mutex); From c53309b912fdd652eb3c2de44626aac4b0fc9bf9 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Fri, 15 Mar 2024 17:11:39 -0500 Subject: [PATCH 3/3] dlm: add comments about forced waiters reset When a lock is waiting for a reply for a remote operation, and recovery interrupts this "waiters" state, the remote operation is voided by the recovery, and no reply will be processed. The lkb waiters state for the remote operation is forcibly reset/cleared, so that the lock operation can be restarted after recovery. Improve the comments describing this. Signed-off-by: David Teigland --- fs/dlm/lock.c | 78 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 20 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 6dca5cdbbc5d..fd752dd03896 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -5014,21 +5014,32 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) return lkb; } -/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the - master or dir-node for r. Processing the lkb may result in it being placed - back on waiters. */ - -/* We do this after normal locking has been enabled and any saved messages - (in requestqueue) have been processed. We should be confident that at - this point we won't get or process a reply to any of these waiting - operations. But, new ops may be coming in on the rsbs/locks here from - userspace or remotely. */ - -/* there may have been an overlap unlock/cancel prior to recovery or after - recovery. if before, the lkb may still have a pos wait_count; if after, the - overlap flag would just have been set and nothing new sent. we can be - confident here than any replies to either the initial op or overlap ops - prior to recovery have been received. */ +/* + * Forced state reset for locks that were in the middle of remote operations + * when recovery happened (i.e. lkbs that were on the waiters list, waiting + * for a reply from a remote operation.) The lkbs remaining on the waiters + * list need to be reevaluated; some may need resending to a different node + * than previously, and some may now need local handling rather than remote. + * + * First, the lkb state for the voided remote operation is forcibly reset, + * equivalent to what remove_from_waiters() would normally do: + * . lkb removed from ls_waiters list + * . lkb wait_type cleared + * . lkb waiters_count cleared + * . lkb ref count decremented for each waiters_count (almost always 1, + * but possibly 2 in case of cancel/unlock overlapping, which means + * two remote replies were being expected for the lkb.) + * + * Second, the lkb is reprocessed like an original operation would be, + * by passing it to _request_lock or _convert_lock, which will either + * process the lkb operation locally, or send it to a remote node again + * and put the lkb back onto the waiters list. + * + * When reprocessing the lkb, we may find that it's flagged for an overlapping + * force-unlock or cancel, either from before recovery began, or after recovery + * finished. If this is the case, the unlock/cancel is done directly, and the + * original operation is not initiated again (no _request_lock/_convert_lock.) + */ int dlm_recover_waiters_post(struct dlm_ls *ls) { @@ -5043,6 +5054,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) break; } + /* + * Find an lkb from the waiters list that's been affected by + * recovery node changes, and needs to be reprocessed. Does + * hold_lkb(), adding a refcount. + */ lkb = find_resend_waiter(ls); if (!lkb) break; @@ -5051,6 +5067,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) hold_rsb(r); lock_rsb(r); + /* + * If the lkb has been flagged for a force unlock or cancel, + * then the reprocessing below will be replaced by just doing + * the unlock/cancel directly. + */ mstype = lkb->lkb_wait_type; oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); @@ -5064,23 +5085,40 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, dlm_dir_nodeid(r), oc, ou); - /* At this point we assume that we won't get a reply to any - previous op or overlap op on this lock. First, do a big - remove_from_waiters() for all previous ops. */ + /* + * No reply to the pre-recovery operation will now be received, + * so a forced equivalent of remove_from_waiters() is needed to + * reset the waiters state that was in place before recovery. + */ clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); + + /* Forcibly clear wait_type */ lkb->lkb_wait_type = 0; - /* drop all wait_count references we still - * hold a reference for this iteration. + + /* + * Forcibly reset wait_count and associated refcount. The + * wait_count will almost always be 1, but in case of an + * overlapping unlock/cancel it could be 2: see where + * add_to_waiters() finds the lkb is already on the waiters + * list and does lkb_wait_count++; hold_lkb(). */ while (lkb->lkb_wait_count) { lkb->lkb_wait_count--; unhold_lkb(lkb); } + + /* Forcibly remove from waiters list */ mutex_lock(&ls->ls_waiters_mutex); list_del_init(&lkb->lkb_wait_reply); mutex_unlock(&ls->ls_waiters_mutex); + /* + * The lkb is now clear of all prior waiters state and can be + * processed locally, or sent to remote node again, or directly + * cancelled/unlocked. + */ + if (oc || ou) { /* do an unlock or cancel instead of resending */ switch (mstype) {