fs: dlm: parallelize lowcomms socket handling

This patch is rework of lowcomms handling, the main goal was here to
handle recvmsg() and sendpage() to run parallel. Parallel in two senses:
1. per connection and 2. that recvmsg()/sendpage() doesn't block each
other.

Currently recvmsg()/sendpage() cannot run parallel because two
workqueues "dlm_recv" and "dlm_send" are ordered workqueues. That means
only one work item can be executed. The amount of queue items will be
increased about the amount of nodes being inside the cluster. The current
two workqueues for sending and receiving can also block each other if the
same connection is executed at the same time in dlm_recv and dlm_send
workqueue because a per connection mutex for the socket handling.

To make it more parallel we introduce one "dlm_io" workqueue which is
not an ordered workqueue, the amount of workers are not limited. Due
per connection flags SEND/RECV pending we schedule workers ordered per
connection and per send and receive task. To get rid of the mutex
blocking same workers to do socket handling we switched to a semaphore
which handles socket operations as read lock and sock releases as write
operations, to prevent sock_release() being called while the socket is
being used.

There might be more optimization removing the semaphore and replacing it
with other synchronization mechanism, however due other circumstances
e.g. othercon behaviour it seems complicated to doing this change. I
added comments to remove the othercon handling and moving to a different
synchronization mechanism as this is done. We need to do that to the next
dlm major version upgrade because it is not backwards compatible with the
current connect mechanism.

The processing of dlm messages need to be still handled by a ordered
workqueue. An dlm_process ordered workqueue was introduced which gets
filled by the receive worker. This is probably the next bottleneck of
DLM but the application can't currently parse dlm messages parallel. A
comment was introduced to lift the workqueue context of dlm processing
in a non-sleepable softirq to get messages processing done fast.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2022-11-17 17:11:57 -05:00 committed by David Teigland
parent 1351975ac1
commit dbb751ffab
3 changed files with 590 additions and 488 deletions

File diff suppressed because it is too large Load diff

View file

@ -306,11 +306,11 @@ static void dlm_send_queue_flush(struct midcomms_node *node)
pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
rcu_read_lock();
spin_lock(&node->send_queue_lock);
spin_lock_bh(&node->send_queue_lock);
list_for_each_entry_rcu(mh, &node->send_queue, list) {
dlm_mhandle_delete(node, mh);
}
spin_unlock(&node->send_queue_lock);
spin_unlock_bh(&node->send_queue_lock);
rcu_read_unlock();
}
@ -437,7 +437,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
}
}
spin_lock(&node->send_queue_lock);
spin_lock_bh(&node->send_queue_lock);
list_for_each_entry_rcu(mh, &node->send_queue, list) {
if (before(mh->seq, seq)) {
dlm_mhandle_delete(node, mh);
@ -446,7 +446,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
break;
}
}
spin_unlock(&node->send_queue_lock);
spin_unlock_bh(&node->send_queue_lock);
rcu_read_unlock();
}
@ -890,12 +890,7 @@ static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
dlm_receive_buffer(p, nodeid);
}
/*
* Called from the low-level comms layer to process a buffer of
* commands.
*/
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
const unsigned char *ptr = buf;
const struct dlm_header *hd;
@ -930,6 +925,32 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
if (msglen > len)
break;
ret += msglen;
len -= msglen;
ptr += msglen;
}
return ret;
}
/*
* Called from the low-level comms layer to process a buffer of
* commands.
*/
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
const unsigned char *ptr = buf;
const struct dlm_header *hd;
uint16_t msglen;
int ret = 0;
while (len >= sizeof(struct dlm_header)) {
hd = (struct dlm_header *)ptr;
msglen = le16_to_cpu(hd->h_length);
if (msglen > len)
break;
switch (hd->h_version) {
case cpu_to_le32(DLM_VERSION_3_1):
dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
@ -1046,9 +1067,9 @@ static void midcomms_new_msg_cb(void *data)
atomic_inc(&mh->node->send_queue_cnt);
spin_lock(&mh->node->send_queue_lock);
spin_lock_bh(&mh->node->send_queue_lock);
list_add_tail_rcu(&mh->list, &mh->node->send_queue);
spin_unlock(&mh->node->send_queue_lock);
spin_unlock_bh(&mh->node->send_queue_lock);
mh->seq = mh->node->seq_send++;
}

View file

@ -14,6 +14,7 @@
struct midcomms_node;
int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);