fs: dlm: fix race in mhandle deletion

This patch fixes a race between mhandle deletion in case of receiving an
acknowledge and flush of all pending mhandle in cases of an timeout or
resetting node states.

Fixes: 489d8e559c ("fs: dlm: add reliable connection if reconnect")
Reported-by: Guillaume Nault <gnault@redhat.com>
Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2021-06-11 12:55:40 -04:00 committed by David Teigland
parent d10a0b8875
commit f5fe8d5107

View file

@ -287,6 +287,14 @@ static void dlm_mhandle_release(struct rcu_head *rcu)
kfree(mh);
}
static void dlm_mhandle_delete(struct midcomms_node *node,
struct dlm_mhandle *mh)
{
list_del_rcu(&mh->list);
atomic_dec(&node->send_queue_cnt);
call_rcu(&mh->rcu, dlm_mhandle_release);
}
static void dlm_send_queue_flush(struct midcomms_node *node)
{
struct dlm_mhandle *mh;
@ -294,15 +302,11 @@ static void dlm_send_queue_flush(struct midcomms_node *node)
pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
rcu_read_lock();
spin_lock(&node->send_queue_lock);
list_for_each_entry_rcu(mh, &node->send_queue, list) {
spin_lock(&node->send_queue_lock);
list_del_rcu(&mh->list);
spin_unlock(&node->send_queue_lock);
atomic_dec(&node->send_queue_cnt);
call_rcu(&mh->rcu, dlm_mhandle_release);
dlm_mhandle_delete(node, mh);
}
spin_unlock(&node->send_queue_lock);
rcu_read_unlock();
}
@ -424,21 +428,24 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
rcu_read_lock();
list_for_each_entry_rcu(mh, &node->send_queue, list) {
if (before(mh->seq, seq)) {
spin_lock(&node->send_queue_lock);
list_del_rcu(&mh->list);
spin_unlock(&node->send_queue_lock);
atomic_dec(&node->send_queue_cnt);
if (mh->ack_rcv)
mh->ack_rcv(node);
call_rcu(&mh->rcu, dlm_mhandle_release);
} else {
/* send queue should be ordered */
break;
}
}
spin_lock(&node->send_queue_lock);
list_for_each_entry_rcu(mh, &node->send_queue, list) {
if (before(mh->seq, seq)) {
dlm_mhandle_delete(node, mh);
} else {
/* send queue should be ordered */
break;
}
}
spin_unlock(&node->send_queue_lock);
rcu_read_unlock();
}