dlm for 3.5

This set includes some minor fixes and improvements.
 The one large patch addresses the special "nodir" mode,
 which has been a long neglected proof of concept, but
 with these fixes seems to be quite usable.  It allows
 the resource master to be assigned statically instead of
 dynamically, which can improve performance if there is
 little locality and most resources are shared.
 -----BEGIN PGP SIGNATURE-----
 Version: GnuPG v1.4.12 (GNU/Linux)
 
 iQIcBAABAgAGBQJPu/MlAAoJEDgbc8f8gGmq860P/0o+tYG2pAUz87WnKg92cGwm
 ajaI78ydY6qOjndcEjbgdX6uWqVQ7f/OKo3drzVH8KFQ67eiaXC4wv2xTL3aymbX
 2Ua55oiVsW+k9d9yK5Dzfa4qAlR5QPV1WEAnoVkiEDNoiGCGecjmVebhK1/Sb5Lu
 1gaIJ3C+3L1ngfAzpfeB+7LwuVB36UlIyBrvPOj6yWiSDgpPaVbTrEU0NaDDDDIi
 oo7tTiqivCZf/GH+ZcIjPE/LBen/lVqXSDU2YShiac/ErRfpRk9rnDFIUeN2nYPd
 JwPjzutFWM+N6HIA2RCBXKo7FkK2rvYXw84/RVMvA4goEH/Qu8yDtBww20BmvFYY
 3guU1udka0/NR7/ap98Btdqsvqco6R2X/rpzx8y1eD1jzUvb6El6yg3PM1Qvd8zQ
 72aVzcdgAI4qtEAVziy5X4omNeQ6a55sUYXlCcvkiwZJQdPzkDuzntC28q3bgJva
 QD0ugX7ltBpHuZZZb2tbBN9hfMqyo7gneaY2OoGVCTb1U9ibb5JgfZOswTC2gQsE
 17vykdL5owQ8bbBj2tkRQiJ8dZoxn23hV+sZrvLm3TR8xF4oJtDqUdRs9K7iX8It
 YxTTCL1LmxHRFG/0Cy2l7VhoqkIKsoVFdavW7pivFNkzp/yQNHk4r2iJWhR9YArV
 qaE2HqIxJsev/B/lBPyo
 =mHOh
 -----END PGP SIGNATURE-----

Merge tag 'dlm-3.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set includes some minor fixes and improvements.  The one large
  patch addresses the special "nodir" mode, which has been a long
  neglected proof of concept, but with these fixes seems to be quite
  usable.  It allows the resource master to be assigned statically
  instead of dynamically, which can improve performance if there is
  little locality and most resources are shared."

* tag 'dlm-3.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: NULL dereference on failure in kmem_cache_create()
  gfs2: fix recovery during unmount
  dlm: fixes for nodir mode
  dlm: improve error and debug messages
  dlm: avoid unnecessary search in search_rsb
  dlm: limit rcom debug messages
  dlm: fix waiter recovery
  dlm: prevent connections during shutdown
This commit is contained in:
Linus Torvalds 2012-05-22 19:31:38 -07:00
commit 6101167727
16 changed files with 544 additions and 304 deletions

View File

@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
}
mutex_unlock(&ls->ls_cb_mutex);
log_debug(ls, "dlm_callback_resume %d", count);
if (count)
log_debug(ls, "dlm_callback_resume %d", count);
}

View File

@ -38,6 +38,7 @@
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/idr.h>
#include <linux/ratelimit.h>
#include <asm/uaccess.h>
#include <linux/dlm.h>
@ -74,6 +75,13 @@ do { \
(ls)->ls_name , ##args); \
} while (0)
#define log_limit(ls, fmt, args...) \
do { \
if (dlm_config.ci_log_debug) \
printk_ratelimited(KERN_DEBUG "dlm: %s: " fmt "\n", \
(ls)->ls_name , ##args); \
} while (0)
#define DLM_ASSERT(x, do) \
{ \
if (!(x)) \
@ -263,6 +271,8 @@ struct dlm_lkb {
ktime_t lkb_last_cast_time; /* for debugging */
ktime_t lkb_last_bast_time; /* for debugging */
uint64_t lkb_recover_seq; /* from ls_recover_seq */
char *lkb_lvbptr;
struct dlm_lksb *lkb_lksb; /* caller's status block */
void (*lkb_astfn) (void *astparam);
@ -317,7 +327,7 @@ enum rsb_flags {
RSB_NEW_MASTER,
RSB_NEW_MASTER2,
RSB_RECOVER_CONVERT,
RSB_LOCKS_PURGED,
RSB_RECOVER_GRANT,
};
static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
@ -563,6 +573,7 @@ struct dlm_ls {
struct mutex ls_requestqueue_mutex;
struct dlm_rcom *ls_recover_buf;
int ls_recover_nodeid; /* for debugging */
unsigned int ls_recover_locks_in; /* for log info */
uint64_t ls_rcom_seq;
spinlock_t ls_rcom_spin;
struct list_head ls_recover_list;
@ -589,6 +600,7 @@ struct dlm_ls {
#define LSFL_UEVENT_WAIT 5
#define LSFL_TIMEWARN 6
#define LSFL_CB_DELAY 7
#define LSFL_NODIR 8
/* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */
@ -636,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls)
static inline int dlm_no_directory(struct dlm_ls *ls)
{
return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0;
return test_bit(LSFL_NODIR, &ls->ls_flags);
}
int dlm_netlink_init(void);

File diff suppressed because it is too large Load Diff

View File

@ -15,7 +15,8 @@
void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_print_lkb(struct dlm_lkb *lkb);
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
uint32_t saved_seq);
void dlm_receive_buffer(union dlm_packet *p, int nodeid);
int dlm_modes_compat(int mode1, int mode2);
void dlm_put_rsb(struct dlm_rsb *r);
@ -31,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls);
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
unsigned int flags, struct dlm_rsb **r_ret);
int dlm_purge_locks(struct dlm_ls *ls);
void dlm_recover_purge(struct dlm_ls *ls);
void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
void dlm_grant_after_purge(struct dlm_ls *ls);
void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls);
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);

View File

@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
return len;
}
static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}
static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
int val = simple_strtoul(buf, NULL, 0);
if (val == 1)
set_bit(LSFL_NODIR, &ls->ls_flags);
return len;
}
static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
uint32_t status = dlm_recover_status(ls);
@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = {
.store = dlm_id_store
};
static struct dlm_attr dlm_attr_nodir = {
.attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
.show = dlm_nodir_show,
.store = dlm_nodir_store
};
static struct dlm_attr dlm_attr_recover_status = {
.attr = {.name = "recover_status", .mode = S_IRUGO},
.show = dlm_recover_status_show
@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = {
&dlm_attr_control.attr,
&dlm_attr_event.attr,
&dlm_attr_id.attr,
&dlm_attr_nodir.attr,
&dlm_attr_recover_status.attr,
&dlm_attr_recover_nodeid.attr,
NULL,

View File

@ -142,6 +142,7 @@ struct writequeue_entry {
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;
/* Work queues */
static struct workqueue_struct *recv_workqueue;
@ -710,6 +711,13 @@ static int tcp_accept_from_sock(struct connection *con)
struct connection *newcon;
struct connection *addcon;
mutex_lock(&connections_lock);
if (!dlm_allow_conn) {
mutex_unlock(&connections_lock);
return -1;
}
mutex_unlock(&connections_lock);
memset(&peeraddr, 0, sizeof(peeraddr));
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
IPPROTO_TCP, &newsock);
@ -1503,6 +1511,7 @@ void dlm_lowcomms_stop(void)
socket activity.
*/
mutex_lock(&connections_lock);
dlm_allow_conn = 0;
foreach_conn(stop_conn);
mutex_unlock(&connections_lock);
@ -1530,7 +1539,7 @@ int dlm_lowcomms_start(void)
if (!dlm_local_count) {
error = -ENOTCONN;
log_print("no local IP address has been set");
goto out;
goto fail;
}
error = -ENOMEM;
@ -1538,7 +1547,13 @@ int dlm_lowcomms_start(void)
__alignof__(struct connection), 0,
NULL);
if (!con_cache)
goto out;
goto fail;
error = work_start();
if (error)
goto fail_destroy;
dlm_allow_conn = 1;
/* Start listening */
if (dlm_config.ci_protocol == 0)
@ -1548,20 +1563,17 @@ int dlm_lowcomms_start(void)
if (error)
goto fail_unlisten;
error = work_start();
if (error)
goto fail_unlisten;
return 0;
fail_unlisten:
dlm_allow_conn = 0;
con = nodeid2con(0,0);
if (con) {
close_connection(con, false);
kmem_cache_free(con_cache, con);
}
fail_destroy:
kmem_cache_destroy(con_cache);
out:
fail:
return error;
}

View File

@ -21,21 +21,19 @@ static struct kmem_cache *rsb_cache;
int __init dlm_memory_init(void)
{
int ret = 0;
lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb),
__alignof__(struct dlm_lkb), 0, NULL);
if (!lkb_cache)
ret = -ENOMEM;
return -ENOMEM;
rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
__alignof__(struct dlm_rsb), 0, NULL);
if (!rsb_cache) {
kmem_cache_destroy(lkb_cache);
ret = -ENOMEM;
return -ENOMEM;
}
return ret;
return 0;
}
void dlm_memory_exit(void)

View File

@ -486,47 +486,50 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
return 0;
}
static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
uint64_t seq;
int rv = 0;
switch (rc->rc_type) {
case DLM_RCOM_STATUS_REPLY:
case DLM_RCOM_NAMES_REPLY:
case DLM_RCOM_LOOKUP_REPLY:
case DLM_RCOM_LOCK_REPLY:
spin_lock(&ls->ls_recover_lock);
seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
if (rc->rc_seq_reply != seq) {
log_debug(ls, "ignoring old reply %x from %d "
"seq_reply %llx expect %llx",
rc->rc_type, rc->rc_header.h_nodeid,
(unsigned long long)rc->rc_seq_reply,
(unsigned long long)seq);
rv = 1;
}
}
return rv;
}
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
int stop, reply = 0, lock = 0;
uint32_t status;
uint64_t seq;
if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
log_debug(ls, "ignoring recovery message %x from %d",
rc->rc_type, nodeid);
switch (rc->rc_type) {
case DLM_RCOM_LOCK:
lock = 1;
break;
case DLM_RCOM_LOCK_REPLY:
lock = 1;
reply = 1;
break;
case DLM_RCOM_STATUS_REPLY:
case DLM_RCOM_NAMES_REPLY:
case DLM_RCOM_LOOKUP_REPLY:
reply = 1;
};
spin_lock(&ls->ls_recover_lock);
status = ls->ls_recover_status;
stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
(reply && (rc->rc_seq_reply != seq)) ||
(lock && !(status & DLM_RS_DIR))) {
log_limit(ls, "dlm_receive_rcom ignore msg %d "
"from %d %llu %llu recover seq %llu sts %x gen %u",
rc->rc_type,
nodeid,
(unsigned long long)rc->rc_seq,
(unsigned long long)rc->rc_seq_reply,
(unsigned long long)seq,
status, ls->ls_generation);
goto out;
}
if (is_old_reply(ls, rc))
goto out;
switch (rc->rc_type) {
case DLM_RCOM_STATUS:
receive_rcom_status(ls, rc);

View File

@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid)
{
struct dlm_lkb *lkb;
list_for_each_entry(lkb, queue, lkb_statequeue)
if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
list_for_each_entry(lkb, queue, lkb_statequeue) {
if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
lkb->lkb_nodeid = nodeid;
lkb->lkb_remid = 0;
}
}
}
static void set_master_lkbs(struct dlm_rsb *r)
@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r)
/*
* Propagate the new master nodeid to locks
* The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
* The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
* The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
* rsb's to consider.
*/
static void set_new_master(struct dlm_rsb *r, int nodeid)
{
lock_rsb(r);
r->res_nodeid = nodeid;
set_master_lkbs(r);
rsb_set_flag(r, RSB_NEW_MASTER);
rsb_set_flag(r, RSB_NEW_MASTER2);
unlock_rsb(r);
}
/*
@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
static int recover_master(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
dir_nodeid = dlm_dir_nodeid(r);
int error, ret_nodeid;
int our_nodeid = dlm_our_nodeid();
int dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid == our_nodeid) {
error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r)
if (ret_nodeid == our_nodeid)
ret_nodeid = 0;
lock_rsb(r);
set_new_master(r, ret_nodeid);
unlock_rsb(r);
} else {
recover_list_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid);
@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r)
}
/*
* When not using a directory, most resource names will hash to a new static
* master nodeid and the resource will need to be remastered.
* All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
* This is necessary because recovery can be started, aborted and restarted,
* causing the master nodeid to briefly change during the aborted recovery, and
* change back to the original value in the second recovery. The MSTCPY locks
* may or may not have been purged during the aborted recovery. Another node
* with an outstanding request in waiters list and a request reply saved in the
* requestqueue, cannot know whether it should ignore the reply and resend the
* request, or accept the reply and complete the request. It must do the
* former if the remote node purged MSTCPY locks, and it must do the later if
* the remote node did not. This is solved by always purging MSTCPY locks, in
* which case, the request reply would always be ignored and the request
* resent.
*/
static int recover_master_static(struct dlm_rsb *r)
{
int master = dlm_dir_nodeid(r);
int dir_nodeid = dlm_dir_nodeid(r);
int new_master = dir_nodeid;
if (master == dlm_our_nodeid())
master = 0;
if (dir_nodeid == dlm_our_nodeid())
new_master = 0;
if (r->res_nodeid != master) {
if (is_master(r))
dlm_purge_mstcpy_locks(r);
set_new_master(r, master);
return 1;
}
return 0;
lock_rsb(r);
dlm_purge_mstcpy_locks(r);
set_new_master(r, new_master);
unlock_rsb(r);
return 1;
}
/*
@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
if (nodeid == dlm_our_nodeid())
nodeid = 0;
lock_rsb(r);
set_new_master(r, nodeid);
unlock_rsb(r);
recover_list_del(r);
if (recover_list_empty(ls))
@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
struct dlm_rsb *r;
int error, count = 0;
log_debug(ls, "dlm_recover_locks");
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (is_master(r)) {
@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
}
up_read(&ls->ls_root_sem);
log_debug(ls, "dlm_recover_locks %d locks", count);
log_debug(ls, "dlm_recover_locks %d out", count);
error = dlm_wait_function(ls, &recover_list_empty);
out:
@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r)
}
/* We've become the new master for this rsb and waiting/converting locks may
need to be granted in dlm_grant_after_purge() due to locks that may have
need to be granted in dlm_recover_grant() due to locks that may have
existed from a removed node. */
static void set_locks_purged(struct dlm_rsb *r)
static void recover_grant(struct dlm_rsb *r)
{
if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
rsb_set_flag(r, RSB_LOCKS_PURGED);
rsb_set_flag(r, RSB_RECOVER_GRANT);
}
void dlm_recover_rsbs(struct dlm_ls *ls)
{
struct dlm_rsb *r;
int count = 0;
log_debug(ls, "dlm_recover_rsbs");
unsigned int count = 0;
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
if (rsb_flag(r, RSB_RECOVER_CONVERT))
recover_conversion(r);
if (rsb_flag(r, RSB_NEW_MASTER2))
set_locks_purged(r);
recover_grant(r);
recover_lvb(r);
count++;
}
@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
}
up_read(&ls->ls_root_sem);
log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
if (count)
log_debug(ls, "dlm_recover_rsbs %d done", count);
}
/* Create a single list of all root rsb's to be used during recovery */

View File

@ -54,7 +54,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
unsigned long start;
int error, neg = 0;
log_debug(ls, "dlm_recover %llx", (unsigned long long)rv->seq);
log_debug(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
mutex_lock(&ls->ls_recoverd_active);
@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
ls->ls_recover_locks_in = 0;
dlm_set_recover_status(ls, DLM_RS_NODES);
error = dlm_recover_members_wait(ls);
@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* Clear lkb's for departed nodes.
*/
dlm_purge_locks(ls);
dlm_recover_purge(ls);
/*
* Get new master nodeid's for rsb's that were mastered on
@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
log_debug(ls, "dlm_recover_locks %u in",
ls->ls_recover_locks_in);
/*
* Finalize state in master rsb's now that all locks can be
* checked. This includes conversion resolution and lvb
@ -225,9 +230,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
dlm_grant_after_purge(ls);
dlm_recover_grant(ls);
log_debug(ls, "dlm_recover %llx generation %u done: %u ms",
log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
(unsigned long long)rv->seq, ls->ls_generation,
jiffies_to_msecs(jiffies - start));
mutex_unlock(&ls->ls_recoverd_active);
@ -237,7 +242,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
fail:
dlm_release_root_list(ls);
log_debug(ls, "dlm_recover %llx error %d",
log_debug(ls, "dlm_recover %llu error %d",
(unsigned long long)rv->seq, error);
mutex_unlock(&ls->ls_recoverd_active);
return error;

View File

@ -19,6 +19,7 @@
struct rq_entry {
struct list_head list;
uint32_t recover_seq;
int nodeid;
struct dlm_message request;
};
@ -41,6 +42,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
return;
}
e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
e->nodeid = nodeid;
memcpy(&e->request, ms, ms->m_header.h_length);
@ -63,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
int dlm_process_requestqueue(struct dlm_ls *ls)
{
struct rq_entry *e;
struct dlm_message *ms;
int error = 0;
mutex_lock(&ls->ls_requestqueue_mutex);
@ -76,7 +79,15 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
mutex_unlock(&ls->ls_requestqueue_mutex);
dlm_receive_message_saved(ls, &e->request);
ms = &e->request;
log_limit(ls, "dlm_process_requestqueue msg %d from %d "
"lkid %x remid %x result %d seq %u",
ms->m_type, ms->m_header.h_nodeid,
ms->m_lkid, ms->m_remid, ms->m_result,
e->recover_seq);
dlm_receive_message_saved(ls, &e->request, e->recover_seq);
mutex_lock(&ls->ls_requestqueue_mutex);
list_del(&e->list);
@ -138,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
if (!dlm_no_directory(ls))
return 0;
/* with no directory, the master is likely to change as a part of
recovery; requests to/from the defunct master need to be purged */
switch (type) {
case DLM_MSG_REQUEST:
case DLM_MSG_CONVERT:
case DLM_MSG_UNLOCK:
case DLM_MSG_CANCEL:
/* we're no longer the master of this resource, the sender
will resend to the new master (see waiter_needs_recovery) */
if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
return 1;
break;
case DLM_MSG_REQUEST_REPLY:
case DLM_MSG_CONVERT_REPLY:
case DLM_MSG_UNLOCK_REPLY:
case DLM_MSG_CANCEL_REPLY:
case DLM_MSG_GRANT:
/* this reply is from the former master of the resource,
we'll resend to the new master if needed */
if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
return 1;
break;
}
return 0;
return 1;
}
void dlm_purge_requestqueue(struct dlm_ls *ls)

View File

@ -543,7 +543,6 @@ struct gfs2_sb_host {
struct lm_lockstruct {
int ls_jid;
unsigned int ls_first;
unsigned int ls_nodir;
const struct lm_lockops *ls_ops;
dlm_lockspace_t *ls_dlm;

View File

@ -1209,8 +1209,6 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
fsname++;
flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
if (ls->ls_nodir)
flags |= DLM_LSFL_NODIR;
/*
* create/join lockspace

View File

@ -993,6 +993,7 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
ls->ls_jid = option;
break;
case Opt_id:
case Opt_nodir:
/* Obsolete, but left for backward compat purposes */
break;
case Opt_first:
@ -1001,12 +1002,6 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
goto hostdata_error;
ls->ls_first = option;
break;
case Opt_nodir:
ret = match_int(&tmp[0], &option);
if (ret || (option != 0 && option != 1))
goto hostdata_error;
ls->ls_nodir = option;
break;
case Opt_err:
default:
hostdata_error:

View File

@ -368,10 +368,7 @@ int gfs2_recover_set(struct gfs2_sbd *sdp, unsigned jid)
struct gfs2_jdesc *jd;
int rv;
rv = -ESHUTDOWN;
spin_lock(&sdp->sd_jindex_spin);
if (test_bit(SDF_NORECOVERY, &sdp->sd_flags))
goto out;
rv = -EBUSY;
if (sdp->sd_jdesc->jd_jid == jid)
goto out;
@ -396,8 +393,13 @@ static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len)
if (rv != 1)
return -EINVAL;
rv = gfs2_recover_set(sdp, jid);
if (test_bit(SDF_NORECOVERY, &sdp->sd_flags)) {
rv = -ESHUTDOWN;
goto out;
}
rv = gfs2_recover_set(sdp, jid);
out:
return rv ? rv : len;
}

View File

@ -67,7 +67,6 @@ struct dlm_lksb {
/* dlm_new_lockspace() flags */
#define DLM_LSFL_NODIR 0x00000001
#define DLM_LSFL_TIMEWARN 0x00000002
#define DLM_LSFL_FS 0x00000004
#define DLM_LSFL_NEWEXCL 0x00000008