diff options
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/dir.c | 76 | ||||
-rw-r--r-- | fs/dlm/dlm_internal.h | 16 | ||||
-rw-r--r-- | fs/dlm/lock.c | 249 | ||||
-rw-r--r-- | fs/dlm/lock.h | 2 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 66 | ||||
-rw-r--r-- | fs/dlm/lowcomms.c | 21 | ||||
-rw-r--r-- | fs/dlm/main.c | 10 | ||||
-rw-r--r-- | fs/dlm/member.c | 4 | ||||
-rw-r--r-- | fs/dlm/member.h | 3 | ||||
-rw-r--r-- | fs/dlm/memory.c | 32 | ||||
-rw-r--r-- | fs/dlm/memory.h | 16 | ||||
-rw-r--r-- | fs/dlm/midcomms.c | 15 | ||||
-rw-r--r-- | fs/dlm/rcom.c | 25 | ||||
-rw-r--r-- | fs/dlm/recover.c | 27 | ||||
-rw-r--r-- | fs/dlm/recoverd.c | 11 | ||||
-rw-r--r-- | fs/dlm/user.c | 29 | ||||
-rw-r--r-- | fs/dlm/util.c | 82 |
17 files changed, 410 insertions, 274 deletions
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index 46754553fdc..ff97ba92433 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -49,7 +49,7 @@ static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len) spin_unlock(&ls->ls_recover_list_lock); if (!found) - de = allocate_direntry(ls, len); + de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_KERNEL); return de; } @@ -62,7 +62,7 @@ void dlm_clear_free_entries(struct dlm_ls *ls) de = list_entry(ls->ls_recover_list.next, struct dlm_direntry, list); list_del(&de->list); - free_direntry(de); + kfree(de); } spin_unlock(&ls->ls_recover_list_lock); } @@ -171,7 +171,7 @@ void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen } list_del(&de->list); - free_direntry(de); + kfree(de); out: write_unlock(&ls->ls_dirtbl[bucket].lock); } @@ -302,7 +302,7 @@ static int get_entry(struct dlm_ls *ls, int nodeid, char *name, write_unlock(&ls->ls_dirtbl[bucket].lock); - de = allocate_direntry(ls, namelen); + de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_KERNEL); if (!de) return -ENOMEM; @@ -313,7 +313,7 @@ static int get_entry(struct dlm_ls *ls, int nodeid, char *name, write_lock(&ls->ls_dirtbl[bucket].lock); tmp = search_bucket(ls, name, namelen, bucket); if (tmp) { - free_direntry(de); + kfree(de); de = tmp; } else { list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); @@ -329,49 +329,47 @@ int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen, return get_entry(ls, nodeid, name, namelen, r_nodeid); } -/* Copy the names of master rsb's into the buffer provided. - Only select names whose dir node is the given nodeid. */ +static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) +{ + struct dlm_rsb *r; + + down_read(&ls->ls_root_sem); + list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + if (len == r->res_length && !memcmp(name, r->res_name, len)) { + up_read(&ls->ls_root_sem); + return r; + } + } + up_read(&ls->ls_root_sem); + return NULL; +} + +/* Find the rsb where we left off (or start again), then send rsb names + for rsb's we're master of and whose directory node matches the requesting + node. inbuf is the rsb name last sent, inlen is the name's length */ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, char *outbuf, int outlen, int nodeid) { struct list_head *list; - struct dlm_rsb *start_r = NULL, *r = NULL; - int offset = 0, start_namelen, error, dir_nodeid; - char *start_name; + struct dlm_rsb *r; + int offset = 0, dir_nodeid; uint16_t be_namelen; - /* - * Find the rsb where we left off (or start again) - */ - - start_namelen = inlen; - start_name = inbuf; - - if (start_namelen > 1) { - /* - * We could also use a find_rsb_root() function here that - * searched the ls_root_list. - */ - error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER, - &start_r); - DLM_ASSERT(!error && start_r, - printk("error %d\n", error);); - DLM_ASSERT(!list_empty(&start_r->res_root_list), - dlm_print_rsb(start_r);); - dlm_put_rsb(start_r); - } - - /* - * Send rsb names for rsb's we're master of and whose directory node - * matches the requesting node. - */ - down_read(&ls->ls_root_sem); - if (start_r) - list = start_r->res_root_list.next; - else + + if (inlen > 1) { + r = find_rsb_root(ls, inbuf, inlen); + if (!r) { + inbuf[inlen - 1] = '\0'; + log_error(ls, "copy_master_names from %d start %d %s", + nodeid, inlen, inbuf); + goto out; + } + list = r->res_root_list.next; + } else { list = ls->ls_root_list.next; + } for (offset = 0; list != &ls->ls_root_list; list = list->next) { r = list_entry(list, struct dlm_rsb, res_root_list); diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index d2fc2384c3b..ec61bbaf25d 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -570,5 +570,21 @@ static inline int dlm_no_directory(struct dlm_ls *ls) return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0; } +int dlm_netlink_init(void); +void dlm_netlink_exit(void); +void dlm_timeout_warn(struct dlm_lkb *lkb); + +#ifdef CONFIG_DLM_DEBUG +int dlm_register_debugfs(void); +void dlm_unregister_debugfs(void); +int dlm_create_debug_file(struct dlm_ls *ls); +void dlm_delete_debug_file(struct dlm_ls *ls); +#else +static inline int dlm_register_debugfs(void) { return 0; } +static inline void dlm_unregister_debugfs(void) { } +static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; } +static inline void dlm_delete_debug_file(struct dlm_ls *ls) { } +#endif + #endif /* __DLM_INTERNAL_DOT_H__ */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 3915b8e1414..ff4a198fa67 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -88,7 +88,6 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, static int receive_extralen(struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void del_timeout(struct dlm_lkb *lkb); -void dlm_timeout_warn(struct dlm_lkb *lkb); /* * Lock compatibilty matrix - thanks Steve @@ -335,7 +334,7 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) { struct dlm_rsb *r; - r = allocate_rsb(ls, len); + r = dlm_allocate_rsb(ls, len); if (!r) return NULL; @@ -478,7 +477,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, error = _search_rsb(ls, name, namelen, bucket, 0, &tmp); if (!error) { write_unlock(&ls->ls_rsbtbl[bucket].lock); - free_rsb(r); + dlm_free_rsb(r); r = tmp; goto out; } @@ -490,12 +489,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, return error; } -int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, - unsigned int flags, struct dlm_rsb **r_ret) -{ - return find_rsb(ls, name, namelen, flags, r_ret); -} - /* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */ @@ -519,7 +512,7 @@ static void toss_rsb(struct kref *kref) list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss); r->res_toss_time = jiffies; if (r->res_lvbptr) { - free_lvb(r->res_lvbptr); + dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } @@ -589,7 +582,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) uint32_t lkid = 0; uint16_t bucket; - lkb = allocate_lkb(ls); + lkb = dlm_allocate_lkb(ls); if (!lkb) return -ENOMEM; @@ -683,8 +676,8 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) /* for local/process lkbs, lvbptr points to caller's lksb */ if (lkb->lkb_lvbptr && is_master_copy(lkb)) - free_lvb(lkb->lkb_lvbptr); - free_lkb(lkb); + dlm_free_lvb(lkb->lkb_lvbptr); + dlm_free_lkb(lkb); return 1; } else { write_unlock(&ls->ls_lkbtbl[bucket].lock); @@ -988,7 +981,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b) if (is_master(r)) dir_remove(r); - free_rsb(r); + dlm_free_rsb(r); count++; } else { write_unlock(&ls->ls_rsbtbl[b].lock); @@ -1171,7 +1164,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) return; if (!r->res_lvbptr) - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) return; @@ -1203,7 +1196,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) return; if (!r->res_lvbptr) - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) return; @@ -1852,7 +1845,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) { struct dlm_ls *ls = r->res_ls; - int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); + int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); @@ -1886,7 +1879,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 1; } - for (;;) { + for (i = 0; i < 2; i++) { /* It's possible for dlm_scand to remove an old rsb for this same resource from the toss list, us to create a new one, look up the master locally, and find it @@ -1900,6 +1893,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) log_debug(ls, "dir_lookup error %d %s", error, r->res_name); schedule(); } + if (error && error != -EEXIST) + return error; if (ret_nodeid == our_nodeid) { r->res_first_lkid = 0; @@ -1941,8 +1936,11 @@ static void confirm_master(struct dlm_rsb *r, int error) break; case -EAGAIN: - /* the remote master didn't queue our NOQUEUE request; - make a waiting lkb the first_lkid */ + case -EBADR: + case -ENOTBLK: + /* the remote request failed and won't be retried (it was + a NOQUEUE, or has been canceled/unlocked); make a waiting + lkb the first_lkid */ r->res_first_lkid = 0; @@ -2108,17 +2106,18 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) /* an lkb may be waiting for an rsb lookup to complete where the lookup was initiated by another lock */ - if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { - if (!list_empty(&lkb->lkb_rsb_lookup)) { + if (!list_empty(&lkb->lkb_rsb_lookup)) { + if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); list_del_init(&lkb->lkb_rsb_lookup); queue_cast(lkb->lkb_resource, lkb, args->flags & DLM_LKF_CANCEL ? -DLM_ECANCEL : -DLM_EUNLOCK); unhold_lkb(lkb); /* undoes create_lkb() */ - rv = -EBUSY; - goto out; } + /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ + rv = -EBUSY; + goto out; } /* cancel not allowed with another cancel/unlock in progress */ @@ -2986,7 +2985,7 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, if (lkb->lkb_exflags & DLM_LKF_VALBLK) { if (!lkb->lkb_lvbptr) - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; len = receive_extralen(ms); @@ -3006,11 +3005,9 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST); lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP); - DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb);); - if (lkb->lkb_exflags & DLM_LKF_VALBLK) { /* lkb was just created so there won't be an lvb yet */ - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; } @@ -3021,16 +3018,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - if (lkb->lkb_nodeid != ms->m_header.h_nodeid) { - log_error(ls, "convert_args nodeid %d %d lkid %x %x", - lkb->lkb_nodeid, ms->m_header.h_nodeid, - lkb->lkb_id, lkb->lkb_remid); - return -EINVAL; - } - - if (!is_master_copy(lkb)) - return -EINVAL; - if (lkb->lkb_status != DLM_LKSTS_GRANTED) return -EBUSY; @@ -3046,8 +3033,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - if (!is_master_copy(lkb)) - return -EINVAL; if (receive_lvb(ls, lkb, ms)) return -ENOMEM; return 0; @@ -3063,6 +3048,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) lkb->lkb_remid = ms->m_lkid; } +/* This is called after the rsb is locked so that we can safely inspect + fields in the lkb. */ + +static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + int from = ms->m_header.h_nodeid; + int error = 0; + + switch (ms->m_type) { + case DLM_MSG_CONVERT: + case DLM_MSG_UNLOCK: + case DLM_MSG_CANCEL: + if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + case DLM_MSG_CONVERT_REPLY: + case DLM_MSG_UNLOCK_REPLY: + case DLM_MSG_CANCEL_REPLY: + case DLM_MSG_GRANT: + case DLM_MSG_BAST: + if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + case DLM_MSG_REQUEST_REPLY: + if (!is_process_copy(lkb)) + error = -EINVAL; + else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + default: + error = -EINVAL; + } + + if (error) + log_error(lkb->lkb_resource->res_ls, + "ignore invalid message %d from %d %x %x %x %d", + ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, + lkb->lkb_flags, lkb->lkb_nodeid); + return error; +} + static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; @@ -3124,17 +3153,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags(lkb, ms); error = receive_convert_args(ls, lkb, ms); if (error) - goto out; + goto out_reply; reply = !down_conversion(lkb); error = do_convert(r, lkb); - out: + out_reply: if (reply) send_convert_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3160,15 +3193,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags(lkb, ms); error = receive_unlock_args(ls, lkb, ms); if (error) - goto out; + goto out_reply; error = do_unlock(r, lkb); - out: + out_reply: send_unlock_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3196,9 +3233,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + error = do_cancel(r, lkb); send_cancel_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3217,22 +3258,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_grant no lkb"); + log_debug(ls, "receive_grant from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags_reply(lkb, ms); if (is_altmode(lkb)) munge_altmode(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3246,18 +3291,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_bast no lkb"); + log_debug(ls, "receive_bast from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); - queue_bast(r, lkb, ms->m_bastmode); + error = validate_message(lkb, ms); + if (error) + goto out; + queue_bast(r, lkb, ms->m_bastmode); + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3323,15 +3372,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_request_reply no lkb"); + log_debug(ls, "receive_request_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + mstype = lkb->lkb_wait_type; error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); if (error) @@ -3383,6 +3436,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) if (is_overlap(lkb)) { /* we'll ignore error in cancel/unlock reply */ queue_cast_overlap(r, lkb); + confirm_master(r, result); unhold_lkb(lkb); /* undoes create_lkb() */ } else _request_lock(r, lkb); @@ -3463,6 +3517,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3481,10 +3539,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_convert_reply no lkb"); + log_debug(ls, "receive_convert_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_convert_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3498,6 +3556,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3529,10 +3591,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_unlock_reply no lkb"); + log_debug(ls, "receive_unlock_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_unlock_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3546,6 +3608,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3577,10 +3643,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_cancel_reply no lkb"); + log_debug(ls, "receive_cancel_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_cancel_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3640,6 +3706,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) { + if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { + log_debug(ls, "ignore non-member message %d from %d %x %x %d", + ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, + ms->m_remid, ms->m_result); + return; + } + switch (ms->m_type) { /* messages sent to a master node */ @@ -3778,8 +3851,9 @@ void dlm_receive_buffer(struct dlm_header *hd, int nodeid) ls = dlm_find_lockspace_global(hd->h_lockspace); if (!ls) { - log_print("invalid h_lockspace %x from %d cmd %d type %d", - hd->h_lockspace, nodeid, hd->h_cmd, type); + if (dlm_config.ci_log_debug) + log_print("invalid lockspace %x from %d cmd %d type %d", + hd->h_lockspace, nodeid, hd->h_cmd, type); if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) dlm_send_ls_not_ready(nodeid, rc); @@ -3806,6 +3880,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; ls->ls_stub_ms.m_result = -EINPROGRESS; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_convert_reply(lkb, &ls->ls_stub_ms); /* Same special case as in receive_rcom_lock_args() */ @@ -3847,6 +3922,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) void dlm_recover_waiters_pre(struct dlm_ls *ls) { struct dlm_lkb *lkb, *safe; + int wait_type, stub_unlock_result, stub_cancel_result; mutex_lock(&ls->ls_waiters_mutex); @@ -3865,7 +3941,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) if (!waiter_needs_recovery(ls, lkb)) continue; - switch (lkb->lkb_wait_type) { + wait_type = lkb->lkb_wait_type; + stub_unlock_result = -DLM_EUNLOCK; + stub_cancel_result = -DLM_ECANCEL; + + /* Main reply may have been received leaving a zero wait_type, + but a reply for the overlapping op may not have been + received. In that case we need to fake the appropriate + reply for the overlap op. */ + + if (!wait_type) { + if (is_overlap_cancel(lkb)) { + wait_type = DLM_MSG_CANCEL; + if (lkb->lkb_grmode == DLM_LOCK_IV) + stub_cancel_result = 0; + } + if (is_overlap_unlock(lkb)) { + wait_type = DLM_MSG_UNLOCK; + if (lkb->lkb_grmode == DLM_LOCK_IV) + stub_unlock_result = -ENOENT; + } + + log_debug(ls, "rwpre overlap %x %x %d %d %d", + lkb->lkb_id, lkb->lkb_flags, wait_type, + stub_cancel_result, stub_unlock_result); + } + + switch (wait_type) { case DLM_MSG_REQUEST: lkb->lkb_flags |= DLM_IFL_RESEND; @@ -3878,8 +3980,9 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_UNLOCK: hold_lkb(lkb); ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; - ls->ls_stub_ms.m_result = -DLM_EUNLOCK; + ls->ls_stub_ms.m_result = stub_unlock_result; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_unlock_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; @@ -3887,15 +3990,16 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_CANCEL: hold_lkb(lkb); ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; - ls->ls_stub_ms.m_result = -DLM_ECANCEL; + ls->ls_stub_ms.m_result = stub_cancel_result; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_cancel_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; default: - log_error(ls, "invalid lkb wait_type %d", - lkb->lkb_wait_type); + log_error(ls, "invalid lkb wait_type %d %d", + lkb->lkb_wait_type, wait_type); } schedule(); } @@ -4184,7 +4288,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP); if (lkb->lkb_exflags & DLM_LKF_VALBLK) { - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - @@ -4259,7 +4363,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) put_rsb(r); out: if (error) - log_print("recover_master_copy %d %x", error, rl->rl_lkid); + log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid); rl->rl_result = error; return error; } @@ -4342,7 +4446,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, } } - /* After ua is attached to lkb it will be freed by free_lkb(). + /* After ua is attached to lkb it will be freed by dlm_free_lkb(). When DLM_IFL_USER is set, the dlm knows that this is a userspace lock and that lkb_astparam is the dlm_user_args structure. */ @@ -4679,6 +4783,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) } list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { + lkb->lkb_ast_type = 0; list_del(&lkb->lkb_astqueue); dlm_put_lkb(lkb); } diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index ada04680a1e..27b6ed30291 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -19,8 +19,6 @@ void dlm_print_lkb(struct dlm_lkb *lkb); void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms); void dlm_receive_buffer(struct dlm_header *hd, int nodeid); int dlm_modes_compat(int mode1, int mode2); -int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, - unsigned int flags, struct dlm_rsb **r_ret); void dlm_put_rsb(struct dlm_rsb *r); void dlm_hold_rsb(struct dlm_rsb *r); int dlm_put_lkb(struct dlm_lkb *lkb); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 6353a838452..b180fdc5108 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -24,14 +24,6 @@ #include "recover.h" #include "requestqueue.h" -#ifdef CONFIG_DLM_DEBUG -int dlm_create_debug_file(struct dlm_ls *ls); -void dlm_delete_debug_file(struct dlm_ls *ls); -#else -static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; } -static inline void dlm_delete_debug_file(struct dlm_ls *ls) { } -#endif - static int ls_count; static struct mutex ls_lock; static struct list_head lslist; @@ -166,26 +158,7 @@ static struct kobj_type dlm_ktype = { .release = lockspace_kobj_release, }; -static struct kset dlm_kset = { - .ktype = &dlm_ktype, -}; - -static int kobject_setup(struct dlm_ls *ls) -{ - char lsname[DLM_LOCKSPACE_LEN]; - int error; - - memset(lsname, 0, DLM_LOCKSPACE_LEN); - snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name); - - error = kobject_set_name(&ls->ls_kobj, "%s", lsname); - if (error) - return error; - - ls->ls_kobj.kset = &dlm_kset; - ls->ls_kobj.ktype = &dlm_ktype; - return 0; -} +static struct kset *dlm_kset; static int do_uevent(struct dlm_ls *ls, int in) { @@ -220,24 +193,22 @@ static int do_uevent(struct dlm_ls *ls, int in) int dlm_lockspace_init(void) { - int error; - ls_count = 0; mutex_init(&ls_lock); INIT_LIST_HEAD(&lslist); spin_lock_init(&lslist_lock); - kobject_set_name(&dlm_kset.kobj, "dlm"); - kobj_set_kset_s(&dlm_kset, kernel_subsys); - error = kset_register(&dlm_kset); - if (error) - printk("dlm_lockspace_init: cannot register kset %d\n", error); - return error; + dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj); + if (!dlm_kset) { + printk(KERN_WARNING "%s: can not create kset\n", __FUNCTION__); + return -ENOMEM; + } + return 0; } void dlm_lockspace_exit(void) { - kset_unregister(&dlm_kset); + kset_unregister(dlm_kset); } static int dlm_scand(void *data) @@ -549,13 +520,12 @@ static int new_lockspace(char *name, int namelen, void **lockspace, goto out_delist; } - error = kobject_setup(ls); - if (error) - goto out_stop; - - error = kobject_register(&ls->ls_kobj); + ls->ls_kobj.kset = dlm_kset; + error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, + "%s", ls->ls_name); if (error) goto out_stop; + kobject_uevent(&ls->ls_kobj, KOBJ_ADD); /* let kobject handle freeing of ls if there's an error */ do_unreg = 1; @@ -601,7 +571,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace, kfree(ls->ls_rsbtbl); out_lsfree: if (do_unreg) - kobject_unregister(&ls->ls_kobj); + kobject_put(&ls->ls_kobj); else kfree(ls); out: @@ -706,9 +676,9 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_del_ast(lkb); if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY) - free_lvb(lkb->lkb_lvbptr); + dlm_free_lvb(lkb->lkb_lvbptr); - free_lkb(lkb); + dlm_free_lkb(lkb); } } dlm_astd_resume(); @@ -726,7 +696,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) res_hashchain); list_del(&rsb->res_hashchain); - free_rsb(rsb); + dlm_free_rsb(rsb); } head = &ls->ls_rsbtbl[i].toss; @@ -734,7 +704,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) rsb = list_entry(head->next, struct dlm_rsb, res_hashchain); list_del(&rsb->res_hashchain); - free_rsb(rsb); + dlm_free_rsb(rsb); } } @@ -750,7 +720,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_clear_members(ls); dlm_clear_members_gone(ls); kfree(ls->ls_node_array); - kobject_unregister(&ls->ls_kobj); + kobject_put(&ls->ls_kobj); /* The ls structure will be freed when the kobject is done with */ mutex_lock(&ls_lock); diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 58bf3f5cdbe..7c1e5e5cccd 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -864,7 +864,7 @@ static void sctp_init_assoc(struct connection *con) static void tcp_connect_to_sock(struct connection *con) { int result = -EHOSTUNREACH; - struct sockaddr_storage saddr; + struct sockaddr_storage saddr, src_addr; int addr_len; struct socket *sock; @@ -898,6 +898,17 @@ static void tcp_connect_to_sock(struct connection *con) con->connect_action = tcp_connect_to_sock; add_sock(sock, con); + /* Bind to our cluster-known address connecting to avoid + routing problems */ + memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); + make_sockaddr(&src_addr, 0, &addr_len); + result = sock->ops->bind(sock, (struct sockaddr *) &src_addr, + addr_len); + if (result < 0) { + log_print("could not bind for connect: %d", result); + /* This *may* not indicate a critical error */ + } + make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); log_print("connecting to %d", con->nodeid); @@ -1062,7 +1073,7 @@ static int sctp_listen_for_all(void) subscribe.sctp_shutdown_event = 1; subscribe.sctp_partial_delivery_event = 1; - result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF, + result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, (char *)&bufsize, sizeof(bufsize)); if (result) log_print("Error increasing buffer space on socket %d", result); @@ -1426,6 +1437,8 @@ void dlm_lowcomms_stop(void) con = __nodeid2con(i, 0); if (con) { close_connection(con, true); + if (con->othercon) + kmem_cache_free(con_cache, con->othercon); kmem_cache_free(con_cache, con); } } @@ -1454,10 +1467,6 @@ int dlm_lowcomms_start(void) if (!con_cache) goto out; - /* Set some sysctl minima */ - if (sysctl_rmem_max < NEEDED_RMEM) - sysctl_rmem_max = NEEDED_RMEM; - /* Start listening */ if (dlm_config.ci_protocol == 0) error = tcp_listen_for_all(); diff --git a/fs/dlm/main.c b/fs/dlm/main.c index eca2907f238..58487fb95a4 100644 --- a/fs/dlm/main.c +++ b/fs/dlm/main.c @@ -18,16 +18,6 @@ #include "memory.h" #include "config.h" -#ifdef CONFIG_DLM_DEBUG -int dlm_register_debugfs(void); -void dlm_unregister_debugfs(void); -#else -static inline int dlm_register_debugfs(void) { return 0; } -static inline void dlm_unregister_debugfs(void) { } -#endif -int dlm_netlink_init(void); -void dlm_netlink_exit(void); - static int __init init_dlm(void) { int error; diff --git a/fs/dlm/member.c b/fs/dlm/member.c index e9cdcab306e..fa17f5a2788 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -70,7 +70,7 @@ static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) ls->ls_num_nodes--; } -static int dlm_is_member(struct dlm_ls *ls, int nodeid) +int dlm_is_member(struct dlm_ls *ls, int nodeid) { struct dlm_member *memb; diff --git a/fs/dlm/member.h b/fs/dlm/member.h index 927c08c1921..7a26fca1e0b 100644 --- a/fs/dlm/member.h +++ b/fs/dlm/member.h @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -19,6 +19,7 @@ void dlm_clear_members(struct dlm_ls *ls); void dlm_clear_members_gone(struct dlm_ls *ls); int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out); int dlm_is_removed(struct dlm_ls *ls, int nodeid); +int dlm_is_member(struct dlm_ls *ls, int nodeid); #endif /* __MEMBER_DOT_H__ */ diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c index ecf0e5cb203..f7783867491 100644 --- a/fs/dlm/memory.c +++ b/fs/dlm/memory.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -35,7 +35,7 @@ void dlm_memory_exit(void) kmem_cache_destroy(lkb_cache); } -char *allocate_lvb(struct dlm_ls *ls) +char *dlm_allocate_lvb(struct dlm_ls *ls) { char *p; @@ -43,7 +43,7 @@ char *allocate_lvb(struct dlm_ls *ls) return p; } -void free_lvb(char *p) +void dlm_free_lvb(char *p) { kfree(p); } @@ -51,7 +51,7 @@ void free_lvb(char *p) /* FIXME: have some minimal space built-in to rsb for the name and kmalloc a separate name if needed, like dentries are done */ -struct dlm_rsb *allocate_rsb(struct dlm_ls *ls, int namelen) +struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen) { struct dlm_rsb *r; @@ -61,14 +61,14 @@ struct dlm_rsb *allocate_rsb(struct dlm_ls *ls, int namelen) return r; } -void free_rsb(struct dlm_rsb *r) +void dlm_free_rsb(struct dlm_rsb *r) { if (r->res_lvbptr) - free_lvb(r->res_lvbptr); + dlm_free_lvb(r->res_lvbptr); kfree(r); } -struct dlm_lkb *allocate_lkb(struct dlm_ls *ls) +struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls) { struct dlm_lkb *lkb; @@ -76,7 +76,7 @@ struct dlm_lkb *allocate_lkb(struct dlm_ls *ls) return lkb; } -void free_lkb(struct dlm_lkb *lkb) +void dlm_free_lkb(struct dlm_lkb *lkb) { if (lkb->lkb_flags & DLM_IFL_USER) { struct dlm_user_args *ua; @@ -90,19 +90,3 @@ void free_lkb(struct dlm_lkb *lkb) kmem_cache_free(lkb_cache, lkb); } -struct dlm_direntry *allocate_direntry(struct dlm_ls *ls, int namelen) -{ - struct dlm_direntry *de; - - DLM_ASSERT(namelen <= DLM_RESNAME_MAXLEN, - printk("namelen = %d\n", namelen);); - - de = kzalloc(sizeof(*de) + namelen, GFP_KERNEL); - return de; -} - -void free_direntry(struct dlm_direntry *de) -{ - kfree(de); -} - diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h index 6ead158ccc5..485fb29143b 100644 --- a/fs/dlm/memory.h +++ b/fs/dlm/memory.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -16,14 +16,12 @@ int dlm_memory_init(void); void dlm_memory_exit(void); -struct dlm_rsb *allocate_rsb(struct dlm_ls *ls, int namelen); -void free_rsb(struct dlm_rsb *r); -struct dlm_lkb *allocate_lkb(struct dlm_ls *ls); -void free_lkb(struct dlm_lkb *l); -struct dlm_direntry *allocate_direntry(struct dlm_ls *ls, int namelen); -void free_direntry(struct dlm_direntry *de); -char *allocate_lvb(struct dlm_ls *ls); -void free_lvb(char *l); +struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen); +void dlm_free_rsb(struct dlm_rsb *r); +struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls); +void dlm_free_lkb(struct dlm_lkb *l); +char *dlm_allocate_lvb(struct dlm_ls *ls); +void dlm_free_lvb(char *l); #endif /* __MEMORY_DOT_H__ */ diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index f8c69dda16a..e69926e984d 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -58,8 +58,12 @@ static void copy_from_cb(void *dst, const void *base, unsigned offset, int dlm_process_incoming_buffer(int nodeid, const void *base, unsigned offset, unsigned len, unsigned limit) { - unsigned char __tmp[DLM_INBUF_LEN]; - struct dlm_header *msg = (struct dlm_header *) __tmp; + union { + unsigned char __buf[DLM_INBUF_LEN]; + /* this is to force proper alignment on some arches */ + struct dlm_header dlm; + } __tmp; + struct dlm_header *msg = &__tmp.dlm; int ret = 0; int err = 0; uint16_t msglen; @@ -100,8 +104,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base, in the buffer on the stack (which should work for most ordinary messages). */ - if (msglen > sizeof(__tmp) && - msg == (struct dlm_header *) __tmp) { + if (msglen > DLM_INBUF_LEN && msg == &__tmp.dlm) { msg = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL); if (msg == NULL) return ret; @@ -119,7 +122,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base, dlm_receive_buffer(msg, nodeid); } - if (msg != (struct dlm_header *) __tmp) + if (msg != &__tmp.dlm) kfree(msg); return err ? err : ret; diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index ae2fd97fa4a..026824cd3ac 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -197,11 +197,6 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) spin_unlock(&ls->ls_rcom_spin); } -static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) -{ - receive_sync_reply(ls, rc_in); -} - int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) { struct dlm_rcom *rc; @@ -254,11 +249,6 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) send_rcom(ls, mh, rc); } -static void receive_rcom_names_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) -{ - receive_sync_reply(ls, rc_in); -} - int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) { struct dlm_rcom *rc; @@ -381,11 +371,6 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) send_rcom(ls, mh, rc); } -static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) -{ - dlm_recover_process_copy(ls, rc_in); -} - /* If the lockspace doesn't exist then still send a status message back; it's possible that it just doesn't have its global_id yet. */ @@ -481,11 +466,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) break; case DLM_RCOM_STATUS_REPLY: - receive_rcom_status_reply(ls, rc); + receive_sync_reply(ls, rc); break; case DLM_RCOM_NAMES_REPLY: - receive_rcom_names_reply(ls, rc); + receive_sync_reply(ls, rc); break; case DLM_RCOM_LOOKUP_REPLY: @@ -493,11 +478,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) break; case DLM_RCOM_LOCK_REPLY: - receive_rcom_lock_reply(ls, rc); + dlm_recover_process_copy(ls, rc); break; default: - DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type);); + log_error(ls, "receive_rcom bad type %d", rc->rc_type); } out: return; diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index c2cc7694cd1..df075dc300f 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -629,7 +629,7 @@ static void recover_lvb(struct dlm_rsb *r) goto out; if (!r->res_lvbptr) { - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) goto out; } @@ -731,6 +731,20 @@ int dlm_create_root_list(struct dlm_ls *ls) list_add(&r->res_root_list, &ls->ls_root_list); dlm_hold_rsb(r); } + + /* If we're using a directory, add tossed rsbs to the root + list; they'll have entries created in the new directory, + but no other recovery steps should do anything with them. */ + + if (dlm_no_directory(ls)) { + read_unlock(&ls->ls_rsbtbl[i].lock); + continue; + } + + list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) { + list_add(&r->res_root_list, &ls->ls_root_list); + dlm_hold_rsb(r); + } read_unlock(&ls->ls_rsbtbl[i].lock); } out: @@ -750,6 +764,11 @@ void dlm_release_root_list(struct dlm_ls *ls) up_write(&ls->ls_root_sem); } +/* If not using a directory, clear the entire toss list, there's no benefit to + caching the master value since it's fixed. If we are using a dir, keep the + rsb's we're the master of. Recovery will add them to the root list and from + there they'll be entered in the rebuilt directory. */ + void dlm_clear_toss_list(struct dlm_ls *ls) { struct dlm_rsb *r, *safe; @@ -759,8 +778,10 @@ void dlm_clear_toss_list(struct dlm_ls *ls) write_lock(&ls->ls_rsbtbl[i].lock); list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss, res_hashchain) { - list_del(&r->res_hashchain); - free_rsb(r); + if (dlm_no_directory(ls) || !is_master(r)) { + list_del(&r->res_hashchain); + dlm_free_rsb(r); + } } write_unlock(&ls->ls_rsbtbl[i].lock); } diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 4b89e20eebe..997f9531d59 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -67,17 +67,18 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_astd_resume(); /* - * This list of root rsb's will be the basis of most of the recovery - * routines. + * Free non-master tossed rsb's. Master rsb's are kept on toss + * list and put on root list to be included in resdir recovery. */ - dlm_create_root_list(ls); + dlm_clear_toss_list(ls); /* - * Free all the tossed rsb's so we don't have to recover them. + * This list of root rsb's will be the basis of most of the recovery + * routines. */ - dlm_clear_toss_list(ls); + dlm_create_root_list(ls); /* * Add or remove nodes from the lockspace's ls_nodes list. diff --git a/fs/dlm/user.c b/fs/dlm/user.c index 4f741546f4b..7cbc6826239 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -24,8 +24,7 @@ #include "lvb_table.h" #include "user.h" -static const char *name_prefix="dlm"; -static struct miscdevice ctl_device; +static const char name_prefix[] = "dlm"; static const struct file_operations device_fops; #ifdef CONFIG_COMPAT @@ -82,7 +81,8 @@ struct dlm_lock_result32 { }; static void compat_input(struct dlm_write_request *kb, - struct dlm_write_request32 *kb32) + struct dlm_write_request32 *kb32, + int max_namelen) { kb->version[0] = kb32->version[0]; kb->version[1] = kb32->version[1]; @@ -112,7 +112,11 @@ static void compat_input(struct dlm_write_request *kb, kb->i.lock.bastaddr = (void *)(long)kb32->i.lock.bastaddr; kb->i.lock.lksb = (void *)(long)kb32->i.lock.lksb; memcpy(kb->i.lock.lvb, kb32->i.lock.lvb, DLM_USER_LVB_LEN); - memcpy(kb->i.lock.name, kb32->i.lock.name, kb->i.lock.namelen); + if (kb->i.lock.namelen <= max_namelen) + memcpy(kb->i.lock.name, kb32->i.lock.name, + kb->i.lock.namelen); + else + kb->i.lock.namelen = max_namelen; } } @@ -236,12 +240,12 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type) spin_unlock(&proc->asts_spin); if (eol) { - spin_lock(&ua->proc->locks_spin); + spin_lock(&proc->locks_spin); if (!list_empty(&lkb->lkb_ownqueue)) { list_del_init(&lkb->lkb_ownqueue); dlm_put_lkb(lkb); } - spin_unlock(&ua->proc->locks_spin); + spin_unlock(&proc->locks_spin); } out: mutex_unlock(&ls->ls_clear_proc_locks); @@ -529,7 +533,8 @@ static ssize_t device_write(struct file *file, const char __user *buf, if (proc) set_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags); - compat_input(kbuf, k32buf); + compat_input(kbuf, k32buf, + count - sizeof(struct dlm_write_request32)); kfree(k32buf); } #endif @@ -896,14 +901,16 @@ static const struct file_operations ctl_device_fops = { .owner = THIS_MODULE, }; +static struct miscdevice ctl_device = { + .name = "dlm-control", + .fops = &ctl_device_fops, + .minor = MISC_DYNAMIC_MINOR, +}; + int dlm_user_init(void) { int error; - ctl_device.name = "dlm-control"; - ctl_device.fops = &ctl_device_fops; - ctl_device.minor = MISC_DYNAMIC_MINOR; - error = misc_register(&ctl_device); if (error) log_print("misc_register failed for control device"); diff --git a/fs/dlm/util.c b/fs/dlm/util.c index 963889cf674..4d9c1f4e1bd 100644 --- a/fs/dlm/util.c +++ b/fs/dlm/util.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -14,6 +14,14 @@ #include "rcom.h" #include "util.h" +#define DLM_ERRNO_EDEADLK 35 +#define DLM_ERRNO_EBADR 53 +#define DLM_ERRNO_EBADSLT 57 +#define DLM_ERRNO_EPROTO 71 +#define DLM_ERRNO_EOPNOTSUPP 95 +#define DLM_ERRNO_ETIMEDOUT 110 +#define DLM_ERRNO_EINPROGRESS 115 + static void header_out(struct dlm_header *hd) { hd->h_version = cpu_to_le32(hd->h_version); @@ -30,11 +38,54 @@ static void header_in(struct dlm_header *hd) hd->h_length = le16_to_cpu(hd->h_length); } -void dlm_message_out(struct dlm_message *ms) +/* higher errno values are inconsistent across architectures, so select + one set of values for on the wire */ + +static int to_dlm_errno(int err) +{ + switch (err) { + case -EDEADLK: + return -DLM_ERRNO_EDEADLK; + case -EBADR: + return -DLM_ERRNO_EBADR; + case -EBADSLT: + return -DLM_ERRNO_EBADSLT; + case -EPROTO: + return -DLM_ERRNO_EPROTO; + case -EOPNOTSUPP: + return -DLM_ERRNO_EOPNOTSUPP; + case -ETIMEDOUT: + return -DLM_ERRNO_ETIMEDOUT; + case -EINPROGRESS: + return -DLM_ERRNO_EINPROGRESS; + } + return err; +} + +static int from_dlm_errno(int err) { - struct dlm_header *hd = (struct dlm_header *) ms; + switch (err) { + case -DLM_ERRNO_EDEADLK: + return -EDEADLK; + case -DLM_ERRNO_EBADR: + return -EBADR; + case -DLM_ERRNO_EBADSLT: + return -EBADSLT; + case -DLM_ERRNO_EPROTO: + return -EPROTO; + case -DLM_ERRNO_EOPNOTSUPP: + return -EOPNOTSUPP; + case -DLM_ERRNO_ETIMEDOUT: + return -ETIMEDOUT; + case -DLM_ERRNO_EINPROGRESS: + return -EINPROGRESS; + } + return err; +} - header_out(hd); +void dlm_message_out(struct dlm_message *ms) +{ + header_out(&ms->m_header); ms->m_type = cpu_to_le32(ms->m_type); ms->m_nodeid = cpu_to_le32(ms->m_nodeid); @@ -53,14 +104,12 @@ void dlm_message_out(struct dlm_message *ms) ms->m_rqmode = cpu_to_le32(ms->m_rqmode); ms->m_bastmode = cpu_to_le32(ms->m_bastmode); ms->m_asts = cpu_to_le32(ms->m_asts); - ms->m_result = cpu_to_le32(ms->m_result); + ms->m_result = cpu_to_le32(to_dlm_errno(ms->m_result)); } void dlm_message_in(struct dlm_message *ms) { - struct dlm_header *hd = (struct dlm_header *) ms; - - header_in(hd); + header_in(&ms->m_header); ms->m_type = le32_to_cpu(ms->m_type); ms->m_nodeid = le32_to_cpu(ms->m_nodeid); @@ -79,7 +128,7 @@ void dlm_message_in(struct dlm_message *ms) ms->m_rqmode = le32_to_cpu(ms->m_rqmode); ms->m_bastmode = le32_to_cpu(ms->m_bastmode); ms->m_asts = le32_to_cpu(ms->m_asts); - ms->m_result = le32_to_cpu(ms->m_result); + ms->m_result = from_dlm_errno(le32_to_cpu(ms->m_result)); } static void rcom_lock_out(struct rcom_lock *rl) @@ -126,10 +175,9 @@ static void rcom_config_in(struct rcom_config *rf) void dlm_rcom_out(struct dlm_rcom *rc) { - struct dlm_header *hd = (struct dlm_header *) rc; int type = rc->rc_type; - header_out(hd); + header_out(&rc->rc_header); rc->rc_type = cpu_to_le32(rc->rc_type); rc->rc_result = cpu_to_le32(rc->rc_result); @@ -137,7 +185,7 @@ void dlm_rcom_out(struct dlm_rcom *rc) rc->rc_seq = cpu_to_le64(rc->rc_seq); rc->rc_seq_reply = cpu_to_le64(rc->rc_seq_reply); - if (type == DLM_RCOM_LOCK) + if ((type == DLM_RCOM_LOCK) || (type == DLM_RCOM_LOCK_REPLY)) rcom_lock_out((struct rcom_lock *) rc->rc_buf); else if (type == DLM_RCOM_STATUS_REPLY) @@ -146,9 +194,9 @@ void dlm_rcom_out(struct dlm_rcom *rc) void dlm_rcom_in(struct dlm_rcom *rc) { - struct dlm_header *hd = (struct dlm_header *) rc; + int type; - header_in(hd); + header_in(&rc->rc_header); rc->rc_type = le32_to_cpu(rc->rc_type); rc->rc_result = le32_to_cpu(rc->rc_result); @@ -156,10 +204,12 @@ void dlm_rcom_in(struct dlm_rcom *rc) rc->rc_seq = le64_to_cpu(rc->rc_seq); rc->rc_seq_reply = le64_to_cpu(rc->rc_seq_reply); - if (rc->rc_type == DLM_RCOM_LOCK) + type = rc->rc_type; + + if ((type == DLM_RCOM_LOCK) || (type == DLM_RCOM_LOCK_REPLY)) rcom_lock_in((struct rcom_lock *) rc->rc_buf); - else if (rc->rc_type == DLM_RCOM_STATUS_REPLY) + else if (type == DLM_RCOM_STATUS_REPLY) rcom_config_in((struct rcom_config *) rc->rc_buf); } |