diff options
author | Len Brown <len.brown@intel.com> | 2009-04-05 02:14:15 -0400 |
---|---|---|
committer | Len Brown <len.brown@intel.com> | 2009-04-05 02:14:15 -0400 |
commit | 478c6a43fcbc6c11609f8cee7c7b57223907754f (patch) | |
tree | a7f7952099da60d33032aed6de9c0c56c9f8779e /fs/ocfs2/dlm | |
parent | 8a3f257c704e02aee9869decd069a806b45be3f1 (diff) | |
parent | 6bb597507f9839b13498781e481f5458aea33620 (diff) |
Merge branch 'linus' into release
Conflicts:
arch/x86/kernel/cpu/cpufreq/longhaul.c
Signed-off-by: Len Brown <len.brown@intel.com>
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r-- | fs/ocfs2/dlm/dlmcommon.h | 58 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmdebug.c | 87 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmdomain.c | 29 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmmaster.c | 387 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmthread.c | 20 |
5 files changed, 335 insertions, 246 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index bb53714813a..0102be35980 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -52,16 +52,12 @@ enum dlm_mle_type { DLM_MLE_BLOCK, DLM_MLE_MASTER, - DLM_MLE_MIGRATION -}; - -struct dlm_lock_name { - u8 len; - u8 name[DLM_LOCKID_NAME_MAX]; + DLM_MLE_MIGRATION, + DLM_MLE_NUM_TYPES }; struct dlm_master_list_entry { - struct list_head list; + struct hlist_node master_hash_node; struct list_head hb_events; struct dlm_ctxt *dlm; spinlock_t spinlock; @@ -78,10 +74,10 @@ struct dlm_master_list_entry { enum dlm_mle_type type; struct o2hb_callback_func mle_hb_up; struct o2hb_callback_func mle_hb_down; - union { - struct dlm_lock_resource *res; - struct dlm_lock_name name; - } u; + struct dlm_lock_resource *mleres; + unsigned char mname[DLM_LOCKID_NAME_MAX]; + unsigned int mnamelen; + unsigned int mnamehash; }; enum dlm_ast_type { @@ -151,13 +147,14 @@ struct dlm_ctxt unsigned long recovery_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; struct dlm_recovery_ctxt reco; spinlock_t master_lock; - struct list_head master_list; + struct hlist_head **master_hash; struct list_head mle_hb_events; /* these give a really vague idea of the system load */ - atomic_t local_resources; - atomic_t remote_resources; - atomic_t unknown_resources; + atomic_t mle_tot_count[DLM_MLE_NUM_TYPES]; + atomic_t mle_cur_count[DLM_MLE_NUM_TYPES]; + atomic_t res_tot_count; + atomic_t res_cur_count; struct dlm_debug_ctxt *dlm_debug_ctxt; struct dentry *dlm_debugfs_subroot; @@ -195,6 +192,13 @@ static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned return dlm->lockres_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + (i % DLM_BUCKETS_PER_PAGE); } +static inline struct hlist_head *dlm_master_hash(struct dlm_ctxt *dlm, + unsigned i) +{ + return dlm->master_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + + (i % DLM_BUCKETS_PER_PAGE); +} + /* these keventd work queue items are for less-frequently * called functions that cannot be directly called from the * net message handlers for some reason, usually because @@ -848,9 +852,7 @@ struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, unsigned int len); int dlm_is_host_down(int errno); -void dlm_change_lockres_owner(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 owner); + struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, const char *lockid, int namelen, @@ -1008,6 +1010,9 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res) DLM_LOCK_RES_MIGRATING)); } +void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle); +void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle); + /* create/destroy slab caches */ int dlm_init_master_caches(void); void dlm_destroy_master_caches(void); @@ -1110,6 +1115,23 @@ static inline int dlm_node_iter_next(struct dlm_node_iter *iter) return bit; } +static inline void dlm_set_lockres_owner(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + u8 owner) +{ + assert_spin_locked(&res->spinlock); + + res->owner = owner; +} +static inline void dlm_change_lockres_owner(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + u8 owner) +{ + assert_spin_locked(&res->spinlock); + + if (owner != res->owner) + dlm_set_lockres_owner(dlm, res, owner); +} #endif /* DLMCOMMON_H */ diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index b32f60a5acf..df52f706f66 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -287,18 +287,8 @@ static int stringify_nodemap(unsigned long *nodemap, int maxnodes, static int dump_mle(struct dlm_master_list_entry *mle, char *buf, int len) { int out = 0; - unsigned int namelen; - const char *name; char *mle_type; - if (mle->type != DLM_MLE_MASTER) { - namelen = mle->u.name.len; - name = mle->u.name.name; - } else { - namelen = mle->u.res->lockname.len; - name = mle->u.res->lockname.name; - } - if (mle->type == DLM_MLE_BLOCK) mle_type = "BLK"; else if (mle->type == DLM_MLE_MASTER) @@ -306,7 +296,7 @@ static int dump_mle(struct dlm_master_list_entry *mle, char *buf, int len) else mle_type = "MIG"; - out += stringify_lockname(name, namelen, buf + out, len - out); + out += stringify_lockname(mle->mname, mle->mnamelen, buf + out, len - out); out += snprintf(buf + out, len - out, "\t%3s\tmas=%3u\tnew=%3u\tevt=%1d\tuse=%1d\tref=%3d\n", mle_type, mle->master, mle->new_master, @@ -501,23 +491,33 @@ static struct file_operations debug_purgelist_fops = { static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db) { struct dlm_master_list_entry *mle; - int out = 0; - unsigned long total = 0; + struct hlist_head *bucket; + struct hlist_node *list; + int i, out = 0; + unsigned long total = 0, longest = 0, bktcnt; out += snprintf(db->buf + out, db->len - out, "Dumping MLEs for Domain: %s\n", dlm->name); spin_lock(&dlm->master_lock); - list_for_each_entry(mle, &dlm->master_list, list) { - ++total; - if (db->len - out < 200) - continue; - out += dump_mle(mle, db->buf + out, db->len - out); + for (i = 0; i < DLM_HASH_BUCKETS; i++) { + bucket = dlm_master_hash(dlm, i); + hlist_for_each(list, bucket) { + mle = hlist_entry(list, struct dlm_master_list_entry, + master_hash_node); + ++total; + ++bktcnt; + if (db->len - out < 200) + continue; + out += dump_mle(mle, db->buf + out, db->len - out); + } + longest = max(longest, bktcnt); + bktcnt = 0; } spin_unlock(&dlm->master_lock); out += snprintf(db->buf + out, db->len - out, - "Total on list: %ld\n", total); + "Total: %ld, Longest: %ld\n", total, longest); return out; } @@ -756,12 +756,8 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) int out = 0; struct dlm_reco_node_data *node; char *state; - int lres, rres, ures, tres; - - lres = atomic_read(&dlm->local_resources); - rres = atomic_read(&dlm->remote_resources); - ures = atomic_read(&dlm->unknown_resources); - tres = lres + rres + ures; + int cur_mles = 0, tot_mles = 0; + int i; spin_lock(&dlm->spinlock); @@ -804,21 +800,48 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) db->buf + out, db->len - out); out += snprintf(db->buf + out, db->len - out, "\n"); - /* Mastered Resources Total: xxx Locally: xxx Remotely: ... */ + /* Lock Resources: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + "Lock Resources: %d (%d)\n", + atomic_read(&dlm->res_cur_count), + atomic_read(&dlm->res_tot_count)); + + for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) + tot_mles += atomic_read(&dlm->mle_tot_count[i]); + + for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) + cur_mles += atomic_read(&dlm->mle_cur_count[i]); + + /* MLEs: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + "MLEs: %d (%d)\n", cur_mles, tot_mles); + + /* Blocking: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + " Blocking: %d (%d)\n", + atomic_read(&dlm->mle_cur_count[DLM_MLE_BLOCK]), + atomic_read(&dlm->mle_tot_count[DLM_MLE_BLOCK])); + + /* Mastery: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + " Mastery: %d (%d)\n", + atomic_read(&dlm->mle_cur_count[DLM_MLE_MASTER]), + atomic_read(&dlm->mle_tot_count[DLM_MLE_MASTER])); + + /* Migration: xxx (xxx) */ out += snprintf(db->buf + out, db->len - out, - "Mastered Resources Total: %d Locally: %d " - "Remotely: %d Unknown: %d\n", - tres, lres, rres, ures); + " Migration: %d (%d)\n", + atomic_read(&dlm->mle_cur_count[DLM_MLE_MIGRATION]), + atomic_read(&dlm->mle_tot_count[DLM_MLE_MIGRATION])); /* Lists: Dirty=Empty Purge=InUse PendingASTs=Empty ... */ out += snprintf(db->buf + out, db->len - out, "Lists: Dirty=%s Purge=%s PendingASTs=%s " - "PendingBASTs=%s Master=%s\n", + "PendingBASTs=%s\n", (list_empty(&dlm->dirty_list) ? "Empty" : "InUse"), (list_empty(&dlm->purge_list) ? "Empty" : "InUse"), (list_empty(&dlm->pending_asts) ? "Empty" : "InUse"), - (list_empty(&dlm->pending_basts) ? "Empty" : "InUse"), - (list_empty(&dlm->master_list) ? "Empty" : "InUse")); + (list_empty(&dlm->pending_basts) ? "Empty" : "InUse")); /* Purge Count: xxx Refs: xxx */ out += snprintf(db->buf + out, db->len - out, diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index d8d578f4561..4d9e6b288dd 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -304,6 +304,9 @@ static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) if (dlm->lockres_hash) dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); + if (dlm->master_hash) + dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); + if (dlm->name) kfree(dlm->name); @@ -1534,12 +1537,27 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, for (i = 0; i < DLM_HASH_BUCKETS; i++) INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i)); + dlm->master_hash = (struct hlist_head **) + dlm_alloc_pagevec(DLM_HASH_PAGES); + if (!dlm->master_hash) { + mlog_errno(-ENOMEM); + dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); + kfree(dlm->name); + kfree(dlm); + dlm = NULL; + goto leave; + } + + for (i = 0; i < DLM_HASH_BUCKETS; i++) + INIT_HLIST_HEAD(dlm_master_hash(dlm, i)); + strcpy(dlm->name, domain); dlm->key = key; dlm->node_num = o2nm_this_node(); ret = dlm_create_debugfs_subroot(dlm); if (ret < 0) { + dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); kfree(dlm->name); kfree(dlm); @@ -1579,7 +1597,6 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, init_waitqueue_head(&dlm->reco.event); init_waitqueue_head(&dlm->ast_wq); init_waitqueue_head(&dlm->migration_wq); - INIT_LIST_HEAD(&dlm->master_list); INIT_LIST_HEAD(&dlm->mle_hb_events); dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN; @@ -1587,9 +1604,13 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, dlm->reco.new_master = O2NM_INVALID_NODE_NUM; dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; - atomic_set(&dlm->local_resources, 0); - atomic_set(&dlm->remote_resources, 0); - atomic_set(&dlm->unknown_resources, 0); + + atomic_set(&dlm->res_tot_count, 0); + atomic_set(&dlm->res_cur_count, 0); + for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) { + atomic_set(&dlm->mle_tot_count[i], 0); + atomic_set(&dlm->mle_cur_count[i], 0); + } spin_lock_init(&dlm->work_lock); INIT_LIST_HEAD(&dlm->work_list); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 0a281394785..f8b653fcd4d 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -73,22 +73,13 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, const char *name, unsigned int namelen) { - struct dlm_lock_resource *res; - if (dlm != mle->dlm) return 0; - if (mle->type == DLM_MLE_BLOCK || - mle->type == DLM_MLE_MIGRATION) { - if (namelen != mle->u.name.len || - memcmp(name, mle->u.name.name, namelen)!=0) - return 0; - } else { - res = mle->u.res; - if (namelen != res->lockname.len || - memcmp(res->lockname.name, name, namelen) != 0) - return 0; - } + if (namelen != mle->mnamelen || + memcmp(name, mle->mname, namelen) != 0) + return 0; + return 1; } @@ -283,7 +274,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, mle->dlm = dlm; mle->type = type; - INIT_LIST_HEAD(&mle->list); + INIT_HLIST_NODE(&mle->master_hash_node); INIT_LIST_HEAD(&mle->hb_events); memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); spin_lock_init(&mle->spinlock); @@ -295,19 +286,27 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, mle->new_master = O2NM_MAX_NODES; mle->inuse = 0; + BUG_ON(mle->type != DLM_MLE_BLOCK && + mle->type != DLM_MLE_MASTER && + mle->type != DLM_MLE_MIGRATION); + if (mle->type == DLM_MLE_MASTER) { BUG_ON(!res); - mle->u.res = res; - } else if (mle->type == DLM_MLE_BLOCK) { - BUG_ON(!name); - memcpy(mle->u.name.name, name, namelen); - mle->u.name.len = namelen; - } else /* DLM_MLE_MIGRATION */ { + mle->mleres = res; + memcpy(mle->mname, res->lockname.name, res->lockname.len); + mle->mnamelen = res->lockname.len; + mle->mnamehash = res->lockname.hash; + } else { BUG_ON(!name); - memcpy(mle->u.name.name, name, namelen); - mle->u.name.len = namelen; + mle->mleres = NULL; + memcpy(mle->mname, name, namelen); + mle->mnamelen = namelen; + mle->mnamehash = dlm_lockid_hash(name, namelen); } + atomic_inc(&dlm->mle_tot_count[mle->type]); + atomic_inc(&dlm->mle_cur_count[mle->type]); + /* copy off the node_map and register hb callbacks on our copy */ memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map)); memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map)); @@ -318,6 +317,24 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, __dlm_mle_attach_hb_events(dlm, mle); } +void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) +{ + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&dlm->master_lock); + + if (!hlist_unhashed(&mle->master_hash_node)) + hlist_del_init(&mle->master_hash_node); +} + +void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) +{ + struct hlist_head *bucket; + + assert_spin_locked(&dlm->master_lock); + + bucket = dlm_master_hash(dlm, mle->mnamehash); + hlist_add_head(&mle->master_hash_node, bucket); +} /* returns 1 if found, 0 if not */ static int dlm_find_mle(struct dlm_ctxt *dlm, @@ -325,10 +342,17 @@ static int dlm_find_mle(struct dlm_ctxt *dlm, char *name, unsigned int namelen) { struct dlm_master_list_entry *tmpmle; + struct hlist_head *bucket; + struct hlist_node *list; + unsigned int hash; assert_spin_locked(&dlm->master_lock); - list_for_each_entry(tmpmle, &dlm->master_list, list) { + hash = dlm_lockid_hash(name, namelen); + bucket = dlm_master_hash(dlm, hash); + hlist_for_each(list, bucket) { + tmpmle = hlist_entry(list, struct dlm_master_list_entry, + master_hash_node); if (!dlm_mle_equal(dlm, tmpmle, name, namelen)) continue; dlm_get_mle(tmpmle); @@ -408,24 +432,20 @@ static void dlm_mle_release(struct kref *kref) mle = container_of(kref, struct dlm_master_list_entry, mle_refs); dlm = mle->dlm; - if (mle->type != DLM_MLE_MASTER) { - mlog(0, "calling mle_release for %.*s, type %d\n", - mle->u.name.len, mle->u.name.name, mle->type); - } else { - mlog(0, "calling mle_release for %.*s, type %d\n", - mle->u.res->lockname.len, - mle->u.res->lockname.name, mle->type); - } assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); + mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname, + mle->type); + /* remove from list if not already */ - if (!list_empty(&mle->list)) - list_del_init(&mle->list); + __dlm_unlink_mle(dlm, mle); /* detach the mle from the domain node up/down events */ __dlm_mle_detach_hb_events(dlm, mle); + atomic_dec(&dlm->mle_cur_count[mle->type]); + /* NOTE: kfree under spinlock here. * if this is bad, we can move this to a freelist. */ kmem_cache_free(dlm_mle_cache, mle); @@ -465,43 +485,6 @@ void dlm_destroy_master_caches(void) kmem_cache_destroy(dlm_lockres_cache); } -static void dlm_set_lockres_owner(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 owner) -{ - assert_spin_locked(&res->spinlock); - - mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner); - - if (owner == dlm->node_num) - atomic_inc(&dlm->local_resources); - else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN) - atomic_inc(&dlm->unknown_resources); - else - atomic_inc(&dlm->remote_resources); - - res->owner = owner; -} - -void dlm_change_lockres_owner(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, u8 owner) -{ - assert_spin_locked(&res->spinlock); - - if (owner == res->owner) - return; - - if (res->owner == dlm->node_num) - atomic_dec(&dlm->local_resources); - else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) - atomic_dec(&dlm->unknown_resources); - else - atomic_dec(&dlm->remote_resources); - - dlm_set_lockres_owner(dlm, res, owner); -} - - static void dlm_lockres_release(struct kref *kref) { struct dlm_lock_resource *res; @@ -527,6 +510,8 @@ static void dlm_lockres_release(struct kref *kref) } spin_unlock(&dlm->track_lock); + atomic_dec(&dlm->res_cur_count); + dlm_put(dlm); if (!hlist_unhashed(&res->hash_node) || @@ -607,6 +592,9 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, kref_init(&res->refs); + atomic_inc(&dlm->res_tot_count); + atomic_inc(&dlm->res_cur_count); + /* just for consistency */ spin_lock(&res->spinlock); dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN); @@ -843,7 +831,7 @@ lookup: alloc_mle = NULL; dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0); set_bit(dlm->node_num, mle->maybe_map); - list_add(&mle->list, &dlm->master_list); + __dlm_insert_mle(dlm, mle); /* still holding the dlm spinlock, check the recovery map * to see if there are any nodes that still need to be @@ -1270,7 +1258,7 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, res->lockname.len, res->lockname.name); mle->type = DLM_MLE_MASTER; - mle->u.res = res; + mle->mleres = res; } } } @@ -1315,14 +1303,8 @@ static int dlm_do_master_request(struct dlm_lock_resource *res, BUG_ON(mle->type == DLM_MLE_MIGRATION); - if (mle->type != DLM_MLE_MASTER) { - request.namelen = mle->u.name.len; - memcpy(request.name, mle->u.name.name, request.namelen); - } else { - request.namelen = mle->u.res->lockname.len; - memcpy(request.name, mle->u.res->lockname.name, - request.namelen); - } + request.namelen = (u8)mle->mnamelen; + memcpy(request.name, mle->mname, request.namelen); again: ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, @@ -1575,7 +1557,7 @@ way_up_top: // "add the block.\n"); dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); set_bit(request->node_idx, mle->maybe_map); - list_add(&mle->list, &dlm->master_list); + __dlm_insert_mle(dlm, mle); response = DLM_MASTER_RESP_NO; } else { // mlog(0, "mle was found\n"); @@ -1967,7 +1949,7 @@ ok: assert->node_idx, rr, extra_ref, mle->inuse); dlm_print_one_mle(mle); } - list_del_init(&mle->list); + __dlm_unlink_mle(dlm, mle); __dlm_mle_detach_hb_events(dlm, mle); __dlm_put_mle(mle); if (extra_ref) { @@ -3159,10 +3141,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, tmp->master = master; atomic_set(&tmp->woken, 1); wake_up(&tmp->wq); - /* remove it from the list so that only one - * mle will be found */ - list_del_init(&tmp->list); - /* this was obviously WRONG. mle is uninited here. should be tmp. */ + /* remove it so that only one mle will be found */ + __dlm_unlink_mle(dlm, tmp); __dlm_mle_detach_hb_events(dlm, tmp); ret = DLM_MIGRATE_RESPONSE_MASTERY_REF; mlog(0, "%s:%.*s: master=%u, newmaster=%u, " @@ -3181,137 +3161,164 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, mle->master = master; /* do this for consistency with other mle types */ set_bit(new_master, mle->maybe_map); - list_add(&mle->list, &dlm->master_list); + __dlm_insert_mle(dlm, mle); return ret; } - -void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) +/* + * Sets the owner of the lockres, associated to the mle, to UNKNOWN + */ +static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm, + struct dlm_master_list_entry *mle) { - struct dlm_master_list_entry *mle, *next; struct dlm_lock_resource *res; - unsigned int hash; - mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); -top: - assert_spin_locked(&dlm->spinlock); + /* Find the lockres associated to the mle and set its owner to UNK */ + res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen, + mle->mnamehash); + if (res) { + spin_unlock(&dlm->master_lock); - /* clean the master list */ - spin_lock(&dlm->master_lock); - list_for_each_entry_safe(mle, next, &dlm->master_list, list) { - BUG_ON(mle->type != DLM_MLE_BLOCK && - mle->type != DLM_MLE_MASTER && - mle->type != DLM_MLE_MIGRATION); - - /* MASTER mles are initiated locally. the waiting - * process will notice the node map change - * shortly. let that happen as normal. */ - if (mle->type == DLM_MLE_MASTER) - continue; + /* move lockres onto recovery list */ + spin_lock(&res->spinlock); + dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN); + dlm_move_lockres_to_recovery_list(dlm, res); + spin_unlock(&res->spinlock); + dlm_lockres_put(res); + /* about to get rid of mle, detach from heartbeat */ + __dlm_mle_detach_hb_events(dlm, mle); - /* BLOCK mles are initiated by other nodes. - * need to clean up if the dead node would have - * been the master. */ - if (mle->type == DLM_MLE_BLOCK) { - int bit; + /* dump the mle */ + spin_lock(&dlm->master_lock); + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + } - spin_lock(&mle->spinlock); - bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); - if (bit != dead_node) { - mlog(0, "mle found, but dead node %u would " - "not have been master\n", dead_node); - spin_unlock(&mle->spinlock); - } else { - /* must drop the refcount by one since the - * assert_master will never arrive. this - * may result in the mle being unlinked and - * freed, but there may still be a process - * waiting in the dlmlock path which is fine. */ - mlog(0, "node %u was expected master\n", - dead_node); - atomic_set(&mle->woken, 1); - spin_unlock(&mle->spinlock); - wake_up(&mle->wq); - /* do not need events any longer, so detach - * from heartbeat */ - __dlm_mle_detach_hb_events(dlm, mle); - __dlm_put_mle(mle); - } - continue; - } + return res; +} - /* everything else is a MIGRATION mle */ - - /* the rule for MIGRATION mles is that the master - * becomes UNKNOWN if *either* the original or - * the new master dies. all UNKNOWN lockreses - * are sent to whichever node becomes the recovery - * master. the new master is responsible for - * determining if there is still a master for - * this lockres, or if he needs to take over - * mastery. either way, this node should expect - * another message to resolve this. */ - if (mle->master != dead_node && - mle->new_master != dead_node) - continue; +static void dlm_clean_migration_mle(struct dlm_ctxt *dlm, + struct dlm_master_list_entry *mle) +{ + __dlm_mle_detach_hb_events(dlm, mle); - /* if we have reached this point, this mle needs to - * be removed from the list and freed. */ + spin_lock(&mle->spinlock); + __dlm_unlink_mle(dlm, mle); + atomic_set(&mle->woken, 1); + spin_unlock(&mle->spinlock); - /* remove from the list early. NOTE: unlinking - * list_head while in list_for_each_safe */ - __dlm_mle_detach_hb_events(dlm, mle); - spin_lock(&mle->spinlock); - list_del_init(&mle->list); + wake_up(&mle->wq); +} + +static void dlm_clean_block_mle(struct dlm_ctxt *dlm, + struct dlm_master_list_entry *mle, u8 dead_node) +{ + int bit; + + BUG_ON(mle->type != DLM_MLE_BLOCK); + + spin_lock(&mle->spinlock); + bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); + if (bit != dead_node) { + mlog(0, "mle found, but dead node %u would not have been " + "master\n", dead_node); + spin_unlock(&mle->spinlock); + } else { + /* Must drop the refcount by one since the assert_master will + * never arrive. This may result in the mle being unlinked and + * freed, but there may still be a process waiting in the + * dlmlock path which is fine. */ + mlog(0, "node %u was expected master\n", dead_node); atomic_set(&mle->woken, 1); spin_unlock(&mle->spinlock); wake_up(&mle->wq); - mlog(0, "%s: node %u died during migration from " - "%u to %u!\n", dlm->name, dead_node, - mle->master, mle->new_master); - /* if there is a lockres associated with this - * mle, find it and set its owner to UNKNOWN */ - hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len); - res = __dlm_lookup_lockres(dlm, mle->u.name.name, - mle->u.name.len, hash); - if (res) { - /* unfortunately if we hit this rare case, our - * lock ordering is messed. we need to drop - * the master lock so that we can take the - * lockres lock, meaning that we will have to - * restart from the head of list. */ - spin_unlock(&dlm->master_lock); + /* Do not need events any longer, so detach from heartbeat */ + __dlm_mle_detach_hb_events(dlm, mle); + __dlm_put_mle(mle); + } +} - /* move lockres onto recovery list */ - spin_lock(&res->spinlock); - dlm_set_lockres_owner(dlm, res, - DLM_LOCK_RES_OWNER_UNKNOWN); - dlm_move_lockres_to_recovery_list(dlm, res); - spin_unlock(&res->spinlock); - dlm_lockres_put(res); +void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) +{ + struct dlm_master_list_entry *mle; + struct dlm_lock_resource *res; + struct hlist_head *bucket; + struct hlist_node *list; + unsigned int i; - /* about to get rid of mle, detach from heartbeat */ - __dlm_mle_detach_hb_events(dlm, mle); + mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); +top: + assert_spin_locked(&dlm->spinlock); - /* dump the mle */ - spin_lock(&dlm->master_lock); - __dlm_put_mle(mle); - spin_unlock(&dlm->master_lock); + /* clean the master list */ + spin_lock(&dlm->master_lock); + for (i = 0; i < DLM_HASH_BUCKETS; i++) { + bucket = dlm_master_hash(dlm, i); + hlist_for_each(list, bucket) { + mle = hlist_entry(list, struct dlm_master_list_entry, + master_hash_node); + + BUG_ON(mle->type != DLM_MLE_BLOCK && + mle->type != DLM_MLE_MASTER && + mle->type != DLM_MLE_MIGRATION); + + /* MASTER mles are initiated locally. The waiting + * process will notice the node map change shortly. + * Let that happen as normal. */ + if (mle->type == DLM_MLE_MASTER) + continue; + + /* BLOCK mles are initiated by other nodes. Need to + * clean up if the dead node would have been the + * master. */ + if (mle->type == DLM_MLE_BLOCK) { + dlm_clean_block_mle(dlm, mle, dead_node); + continue; + } - /* restart */ - goto top; - } + /* Everything else is a MIGRATION mle */ + + /* The rule for MIGRATION mles is that the master + * becomes UNKNOWN if *either* the original or the new + * master dies. All UNKNOWN lockres' are sent to + * whichever node becomes the recovery master. The new + * master is responsible for determining if there is + * still a master for this lockres, or if he needs to + * take over mastery. Either way, this node should + * expect another message to resolve this. */ + + if (mle->master != dead_node && + mle->new_master != dead_node) + continue; + + /* If we have reached this point, this mle needs to be + * removed from the list and freed. */ + dlm_clean_migration_mle(dlm, mle); + + mlog(0, "%s: node %u died during migration from " + "%u to %u!\n", dlm->name, dead_node, mle->master, + mle->new_master); + + /* If we find a lockres associated with the mle, we've + * hit this rare case that messes up our lock ordering. + * If so, we need to drop the master lock so that we can + * take the lockres lock, meaning that we will have to + * restart from the head of list. */ + res = dlm_reset_mleres_owner(dlm, mle); + if (res) + /* restart */ + goto top; - /* this may be the last reference */ - __dlm_put_mle(mle); + /* This may be the last reference */ + __dlm_put_mle(mle); + } } spin_unlock(&dlm->master_lock); } - int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 old_master) { diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 4060bb328bc..d490b66ad9d 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -162,12 +162,28 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm, spin_lock(&res->spinlock); if (!__dlm_lockres_unused(res)) { - spin_unlock(&res->spinlock); mlog(0, "%s:%.*s: tried to purge but not unused\n", dlm->name, res->lockname.len, res->lockname.name); - return -ENOTEMPTY; + __dlm_print_one_lock_resource(res); + spin_unlock(&res->spinlock); + BUG(); } + + if (res->state & DLM_LOCK_RES_MIGRATING) { + mlog(0, "%s:%.*s: Delay dropref as this lockres is " + "being remastered\n", dlm->name, res->lockname.len, + res->lockname.name); + /* Re-add the lockres to the end of the purge list */ + if (!list_empty(&res->purge)) { + list_del_init(&res->purge); + list_add_tail(&res->purge, &dlm->purge_list); + } + spin_unlock(&res->spinlock); + return 0; + } + master = (res->owner == dlm->node_num); + if (!master) res->state |= DLM_LOCK_RES_DROPPING_REF; spin_unlock(&res->spinlock); |