diff options
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r-- | fs/ocfs2/dlm/dlmrecovery.c | 65 |
1 files changed, 54 insertions, 11 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 186e9a76aa5..1e232000f3f 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -278,6 +278,24 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) return dead; } +int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) +{ + if (timeout) { + mlog(ML_NOTICE, "%s: waiting %dms for notification of " + "death of node %u\n", dlm->name, timeout, node); + wait_event_timeout(dlm->dlm_reco_thread_wq, + dlm_is_node_dead(dlm, node), + msecs_to_jiffies(timeout)); + } else { + mlog(ML_NOTICE, "%s: waiting indefinitely for notification " + "of death of node %u\n", dlm->name, node); + wait_event(dlm->dlm_reco_thread_wq, + dlm_is_node_dead(dlm, node)); + } + /* for now, return 0 */ + return 0; +} + /* callers of the top-level api calls (dlmlock/dlmunlock) should * block on the dlm->reco.event when recovery is in progress. * the dlm recovery thread will set this state when it begins @@ -1675,7 +1693,10 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, u8 dead_node, u8 new_master) { int i; - struct list_head *iter, *iter2, *bucket; + struct list_head *iter, *iter2; + struct hlist_node *hash_iter; + struct hlist_head *bucket; + struct dlm_lock_resource *res; mlog_entry_void(); @@ -1699,10 +1720,9 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, * for now we need to run the whole hash, clear * the RECOVERING state and set the owner * if necessary */ - for (i=0; i<DLM_HASH_SIZE; i++) { - bucket = &(dlm->resources[i]); - list_for_each(iter, bucket) { - res = list_entry (iter, struct dlm_lock_resource, list); + for (i = 0; i < DLM_HASH_BUCKETS; i++) { + bucket = &(dlm->lockres_hash[i]); + hlist_for_each_entry(res, hash_iter, bucket, hash_node) { if (res->state & DLM_LOCK_RES_RECOVERING) { if (res->owner == dead_node) { mlog(0, "(this=%u) res %.*s owner=%u " @@ -1834,10 +1854,10 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) { - struct list_head *iter; + struct hlist_node *iter; struct dlm_lock_resource *res; int i; - struct list_head *bucket; + struct hlist_head *bucket; struct dlm_lock *lock; @@ -1858,10 +1878,9 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) * can be kicked again to see if any ASTs or BASTs * need to be fired as a result. */ - for (i=0; i<DLM_HASH_SIZE; i++) { - bucket = &(dlm->resources[i]); - list_for_each(iter, bucket) { - res = list_entry (iter, struct dlm_lock_resource, list); + for (i = 0; i < DLM_HASH_BUCKETS; i++) { + bucket = &(dlm->lockres_hash[i]); + hlist_for_each_entry(res, iter, bucket, hash_node) { /* always prune any $RECOVERY entries for dead nodes, * otherwise hangs can occur during later recovery */ if (dlm_is_recovery_lock(res->lockname.name, @@ -2032,6 +2051,30 @@ again: dlm->reco.new_master); status = -EEXIST; } else { + status = 0; + + /* see if recovery was already finished elsewhere */ + spin_lock(&dlm->spinlock); + if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { + status = -EINVAL; + mlog(0, "%s: got reco EX lock, but " + "node got recovered already\n", dlm->name); + if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { + mlog(ML_ERROR, "%s: new master is %u " + "but no dead node!\n", + dlm->name, dlm->reco.new_master); + BUG(); + } + } + spin_unlock(&dlm->spinlock); + } + + /* if this node has actually become the recovery master, + * set the master and send the messages to begin recovery */ + if (!status) { + mlog(0, "%s: dead=%u, this=%u, sending " + "begin_reco now\n", dlm->name, + dlm->reco.dead_node, dlm->node_num); status = dlm_send_begin_reco_message(dlm, dlm->reco.dead_node); /* this always succeeds */ |