From 7c52b166c588c98cf3d2b2e7e6a0468a98e84d0d Mon Sep 17 00:00:00 2001 From: Robert Peterson Date: Fri, 16 Mar 2007 10:26:37 +0000 Subject: [GFS2] Add gfs2_tool lockdump support to gfs2 (bz 228540) The attached patch resolves bz 228540. This adds the capability for gfs2 to dump gfs2 locks through the debugfs file system. This used to exist in gfs1 as "gfs_tool lockdump" but it's missing from gfs2 because all the ioctls were stripped out. Please see the bugzilla for more history about the fix. This patch is also attached to the bugzilla record. The patch is against Steve Whitehouse's latest nmw git tree kernel (2.6.21-rc1) and has been tested on system trin-10. Signed-off-by: Robert Peterson Signed-off-by: Steven Whitehouse --- fs/gfs2/glock.c | 314 ++++++++++++++++++++++++++++++++++++++++----------- fs/gfs2/glock.h | 4 + fs/gfs2/incore.h | 1 + fs/gfs2/main.c | 3 + fs/gfs2/ops_fstype.c | 3 + 5 files changed, 262 insertions(+), 63 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index 12accb08fe0..9f203ef4da6 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include "gfs2.h" #include "incore.h" @@ -40,14 +42,22 @@ struct gfs2_gl_hash_bucket { struct hlist_head hb_list; }; +struct glock_iter { + int hash; /* hash bucket index */ + struct gfs2_sbd *sdp; /* incore superblock */ + struct gfs2_glock *gl; /* current glock struct */ + struct hlist_head *hb_list; /* current hash bucket ptr */ + struct seq_file *seq; /* sequence file for debugfs */ + char string[512]; /* scratch space */ +}; + typedef void (*glock_examiner) (struct gfs2_glock * gl); static int gfs2_dump_lockstate(struct gfs2_sbd *sdp); -static int dump_glock(struct gfs2_glock *gl); -static int dump_inode(struct gfs2_inode *ip); static void gfs2_glock_xmote_th(struct gfs2_holder *gh); static void gfs2_glock_drop_th(struct gfs2_glock *gl); static DECLARE_RWSEM(gfs2_umount_flush_sem); +static struct dentry *gfs2_root; #define GFS2_GL_HASH_SHIFT 15 #define GFS2_GL_HASH_SIZE (1 << GFS2_GL_HASH_SHIFT) @@ -1109,6 +1119,20 @@ find_holder_by_owner(struct list_head *head, struct task_struct *owner) return NULL; } +static void print_dbg(struct glock_iter *gi, const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + if (gi) { + vsprintf(gi->string, fmt, args); + seq_printf(gi->seq, gi->string); + } + else + vprintk(fmt, args); + va_end(args); +} + /** * add_to_queue - Add a holder to the wait queue (but look for recursion) * @gh: the holder structure to add @@ -1849,31 +1873,32 @@ void gfs2_gl_hash_clear(struct gfs2_sbd *sdp, int wait) * Returns: 0 on success, -ENOBUFS when we run out of space */ -static int dump_holder(char *str, struct gfs2_holder *gh) +static int dump_holder(struct glock_iter *gi, char *str, + struct gfs2_holder *gh) { unsigned int x; - int error = -ENOBUFS; - printk(KERN_INFO " %s\n", str); - printk(KERN_INFO " owner = %ld\n", + print_dbg(gi, " %s\n", str); + print_dbg(gi, " owner = %ld\n", (gh->gh_owner) ? (long)gh->gh_owner->pid : -1); - printk(KERN_INFO " gh_state = %u\n", gh->gh_state); - printk(KERN_INFO " gh_flags ="); + print_dbg(gi, " gh_state = %u\n", gh->gh_state); + print_dbg(gi, " gh_flags ="); for (x = 0; x < 32; x++) if (gh->gh_flags & (1 << x)) - printk(" %u", x); - printk(" \n"); - printk(KERN_INFO " error = %d\n", gh->gh_error); - printk(KERN_INFO " gh_iflags ="); + print_dbg(gi, " %u", x); + print_dbg(gi, " \n"); + print_dbg(gi, " error = %d\n", gh->gh_error); + print_dbg(gi, " gh_iflags ="); for (x = 0; x < 32; x++) if (test_bit(x, &gh->gh_iflags)) - printk(" %u", x); - printk(" \n"); - print_symbol(KERN_INFO " initialized at: %s\n", gh->gh_ip); - - error = 0; + print_dbg(gi, " %u", x); + print_dbg(gi, " \n"); + if (gi) + print_dbg(gi, " initialized at: 0x%x\n", gh->gh_ip); + else + print_symbol(KERN_INFO " initialized at: %s\n", gh->gh_ip); - return error; + return 0; } /** @@ -1883,25 +1908,20 @@ static int dump_holder(char *str, struct gfs2_holder *gh) * Returns: 0 on success, -ENOBUFS when we run out of space */ -static int dump_inode(struct gfs2_inode *ip) +static int dump_inode(struct glock_iter *gi, struct gfs2_inode *ip) { unsigned int x; - int error = -ENOBUFS; - printk(KERN_INFO " Inode:\n"); - printk(KERN_INFO " num = %llu %llu\n", - (unsigned long long)ip->i_num.no_formal_ino, - (unsigned long long)ip->i_num.no_addr); - printk(KERN_INFO " type = %u\n", IF2DT(ip->i_inode.i_mode)); - printk(KERN_INFO " i_flags ="); + print_dbg(gi, " Inode:\n"); + print_dbg(gi, " num = %llu/%llu\n", + ip->i_num.no_formal_ino, ip->i_num.no_addr); + print_dbg(gi, " type = %u\n", IF2DT(ip->i_inode.i_mode)); + print_dbg(gi, " i_flags ="); for (x = 0; x < 32; x++) if (test_bit(x, &ip->i_flags)) - printk(" %u", x); - printk(" \n"); - - error = 0; - - return error; + print_dbg(gi, " %u", x); + print_dbg(gi, " \n"); + return 0; } /** @@ -1912,7 +1932,7 @@ static int dump_inode(struct gfs2_inode *ip) * Returns: 0 on success, -ENOBUFS when we run out of space */ -static int dump_glock(struct gfs2_glock *gl) +static int dump_glock(struct glock_iter *gi, struct gfs2_glock *gl) { struct gfs2_holder *gh; unsigned int x; @@ -1920,66 +1940,66 @@ static int dump_glock(struct gfs2_glock *gl) spin_lock(&gl->gl_spin); - printk(KERN_INFO "Glock 0x%p (%u, %llu)\n", gl, gl->gl_name.ln_type, - (unsigned long long)gl->gl_name.ln_number); - printk(KERN_INFO " gl_flags ="); + print_dbg(gi, "Glock 0x%p (%u, %llu)\n", gl, gl->gl_name.ln_type, + (unsigned long long)gl->gl_name.ln_number); + print_dbg(gi, " gl_flags ="); for (x = 0; x < 32; x++) { if (test_bit(x, &gl->gl_flags)) - printk(" %u", x); - } - printk(" \n"); - printk(KERN_INFO " gl_ref = %d\n", atomic_read(&gl->gl_ref)); - printk(KERN_INFO " gl_state = %u\n", gl->gl_state); - printk(KERN_INFO " gl_owner = %s\n", gl->gl_owner->comm); - print_symbol(KERN_INFO " gl_ip = %s\n", gl->gl_ip); - printk(KERN_INFO " req_gh = %s\n", (gl->gl_req_gh) ? "yes" : "no"); - printk(KERN_INFO " req_bh = %s\n", (gl->gl_req_bh) ? "yes" : "no"); - printk(KERN_INFO " lvb_count = %d\n", atomic_read(&gl->gl_lvb_count)); - printk(KERN_INFO " object = %s\n", (gl->gl_object) ? "yes" : "no"); - printk(KERN_INFO " le = %s\n", + print_dbg(gi, " %u", x); + } + print_dbg(gi, " \n"); + print_dbg(gi, " gl_ref = %d\n", atomic_read(&gl->gl_ref)); + print_dbg(gi, " gl_state = %u\n", gl->gl_state); + print_dbg(gi, " gl_owner = %s\n", gl->gl_owner->comm); + print_dbg(gi, " gl_ip = %lu\n", gl->gl_ip); + print_dbg(gi, " req_gh = %s\n", (gl->gl_req_gh) ? "yes" : "no"); + print_dbg(gi, " req_bh = %s\n", (gl->gl_req_bh) ? "yes" : "no"); + print_dbg(gi, " lvb_count = %d\n", atomic_read(&gl->gl_lvb_count)); + print_dbg(gi, " object = %s\n", (gl->gl_object) ? "yes" : "no"); + print_dbg(gi, " le = %s\n", (list_empty(&gl->gl_le.le_list)) ? "no" : "yes"); - printk(KERN_INFO " reclaim = %s\n", - (list_empty(&gl->gl_reclaim)) ? "no" : "yes"); + print_dbg(gi, " reclaim = %s\n", + (list_empty(&gl->gl_reclaim)) ? "no" : "yes"); if (gl->gl_aspace) - printk(KERN_INFO " aspace = 0x%p nrpages = %lu\n", gl->gl_aspace, - gl->gl_aspace->i_mapping->nrpages); + print_dbg(gi, " aspace = 0x%p nrpages = %lu\n", gl->gl_aspace, + gl->gl_aspace->i_mapping->nrpages); else - printk(KERN_INFO " aspace = no\n"); - printk(KERN_INFO " ail = %d\n", atomic_read(&gl->gl_ail_count)); + print_dbg(gi, " aspace = no\n"); + print_dbg(gi, " ail = %d\n", atomic_read(&gl->gl_ail_count)); if (gl->gl_req_gh) { - error = dump_holder("Request", gl->gl_req_gh); + error = dump_holder(gi, "Request", gl->gl_req_gh); if (error) goto out; } list_for_each_entry(gh, &gl->gl_holders, gh_list) { - error = dump_holder("Holder", gh); + error = dump_holder(gi, "Holder", gh); if (error) goto out; } list_for_each_entry(gh, &gl->gl_waiters1, gh_list) { - error = dump_holder("Waiter1", gh); + error = dump_holder(gi, "Waiter1", gh); if (error) goto out; } list_for_each_entry(gh, &gl->gl_waiters2, gh_list) { - error = dump_holder("Waiter2", gh); + error = dump_holder(gi, "Waiter2", gh); if (error) goto out; } list_for_each_entry(gh, &gl->gl_waiters3, gh_list) { - error = dump_holder("Waiter3", gh); + error = dump_holder(gi, "Waiter3", gh); if (error) goto out; } if (gl->gl_ops == &gfs2_inode_glops && gl->gl_object) { if (!test_bit(GLF_LOCK, &gl->gl_flags) && - list_empty(&gl->gl_holders)) { - error = dump_inode(gl->gl_object); + list_empty(&gl->gl_holders)) { + error = dump_inode(gi, gl->gl_object); if (error) goto out; } else { error = -ENOBUFS; - printk(KERN_INFO " Inode: busy\n"); + print_dbg(gi, " Inode: busy\n"); } } @@ -2014,7 +2034,7 @@ static int gfs2_dump_lockstate(struct gfs2_sbd *sdp) if (gl->gl_sbd != sdp) continue; - error = dump_glock(gl); + error = dump_glock(NULL, gl); if (error) break; } @@ -2043,3 +2063,171 @@ int __init gfs2_glock_init(void) return 0; } +static int gfs2_glock_iter_next(struct glock_iter *gi) +{ + while (1) { + if (!gi->hb_list) { /* If we don't have a hash bucket yet */ + gi->hb_list = &gl_hash_table[gi->hash].hb_list; + if (hlist_empty(gi->hb_list)) { + gi->hash++; + gi->hb_list = NULL; + if (gi->hash >= GFS2_GL_HASH_SIZE) + return 1; + else + continue; + } + if (!hlist_empty(gi->hb_list)) { + gi->gl = list_entry(gi->hb_list->first, + struct gfs2_glock, + gl_list); + } + } else { + if (gi->gl->gl_list.next == NULL) { + gi->hash++; + gi->hb_list = NULL; + continue; + } + gi->gl = list_entry(gi->gl->gl_list.next, + struct gfs2_glock, gl_list); + } + if (gi->gl) + break; + } + return 0; +} + +static void gfs2_glock_iter_free(struct glock_iter *gi) +{ + kfree(gi); +} + +static struct glock_iter *gfs2_glock_iter_init(struct gfs2_sbd *sdp) +{ + struct glock_iter *gi; + + gi = kmalloc(sizeof (*gi), GFP_KERNEL); + if (!gi) + return NULL; + + gi->sdp = sdp; + gi->hash = 0; + gi->gl = NULL; + gi->hb_list = NULL; + gi->seq = NULL; + memset(gi->string, 0, sizeof(gi->string)); + + if (gfs2_glock_iter_next(gi)) { + gfs2_glock_iter_free(gi); + return NULL; + } + + return gi; +} + +static void *gfs2_glock_seq_start(struct seq_file *file, loff_t *pos) +{ + struct glock_iter *gi; + loff_t n = *pos; + + gi = gfs2_glock_iter_init(file->private); + if (!gi) + return NULL; + + while (n--) { + if (gfs2_glock_iter_next(gi)) { + gfs2_glock_iter_free(gi); + return NULL; + } + } + + return gi; +} + +static void *gfs2_glock_seq_next(struct seq_file *file, void *iter_ptr, + loff_t *pos) +{ + struct glock_iter *gi = iter_ptr; + + (*pos)++; + + if (gfs2_glock_iter_next(gi)) { + gfs2_glock_iter_free(gi); + return NULL; + } + + return gi; +} + +static void gfs2_glock_seq_stop(struct seq_file *file, void *iter_ptr) +{ + /* nothing for now */ +} + +static int gfs2_glock_seq_show(struct seq_file *file, void *iter_ptr) +{ + struct glock_iter *gi = iter_ptr; + + gi->seq = file; + dump_glock(gi, gi->gl); + + return 0; +} + +static struct seq_operations gfs2_glock_seq_ops = { + .start = gfs2_glock_seq_start, + .next = gfs2_glock_seq_next, + .stop = gfs2_glock_seq_stop, + .show = gfs2_glock_seq_show, +}; + +static int gfs2_debugfs_open(struct inode *inode, struct file *file) +{ + struct seq_file *seq; + int ret; + + ret = seq_open(file, &gfs2_glock_seq_ops); + if (ret) + return ret; + + seq = file->private_data; + seq->private = inode->i_private; + + return 0; +} + +static const struct file_operations gfs2_debug_fops = { + .owner = THIS_MODULE, + .open = gfs2_debugfs_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release +}; + +int gfs2_create_debugfs_file(struct gfs2_sbd *sdp) +{ + sdp->debugfs_dentry = debugfs_create_file(sdp->sd_table_name, + S_IFREG | S_IRUGO, + gfs2_root, sdp, + &gfs2_debug_fops); + if (!sdp->debugfs_dentry) + return -ENOMEM; + + return 0; +} + +void gfs2_delete_debugfs_file(struct gfs2_sbd *sdp) +{ + if (sdp->debugfs_dentry) + debugfs_remove(sdp->debugfs_dentry); +} + +int gfs2_register_debugfs(void) +{ + gfs2_root = debugfs_create_dir("gfs2", NULL); + return gfs2_root ? 0 : -ENOMEM; +} + +void gfs2_unregister_debugfs(void) +{ + debugfs_remove(gfs2_root); +} diff --git a/fs/gfs2/glock.h b/fs/gfs2/glock.h index f50e40ceca4..d7cef740872 100644 --- a/fs/gfs2/glock.h +++ b/fs/gfs2/glock.h @@ -135,5 +135,9 @@ void gfs2_scand_internal(struct gfs2_sbd *sdp); void gfs2_gl_hash_clear(struct gfs2_sbd *sdp, int wait); int __init gfs2_glock_init(void); +int gfs2_create_debugfs_file(struct gfs2_sbd *sdp); +void gfs2_delete_debugfs_file(struct gfs2_sbd *sdp); +int gfs2_register_debugfs(void); +void gfs2_unregister_debugfs(void); #endif /* __GLOCK_DOT_H__ */ diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h index 49f0dbf40d8..7555261d911 100644 --- a/fs/gfs2/incore.h +++ b/fs/gfs2/incore.h @@ -611,6 +611,7 @@ struct gfs2_sbd { unsigned long sd_last_warning; struct vfsmount *sd_gfs2mnt; + struct dentry *debugfs_dentry; /* for debugfs */ }; #endif /* __INCORE_DOT_H__ */ diff --git a/fs/gfs2/main.c b/fs/gfs2/main.c index 6e8a59809ab..218395371db 100644 --- a/fs/gfs2/main.c +++ b/fs/gfs2/main.c @@ -103,6 +103,8 @@ static int __init init_gfs2_fs(void) if (error) goto fail_unregister; + gfs2_register_debugfs(); + printk("GFS2 (built %s %s) installed\n", __DATE__, __TIME__); return 0; @@ -130,6 +132,7 @@ fail: static void __exit exit_gfs2_fs(void) { + gfs2_unregister_debugfs(); unregister_filesystem(&gfs2_fs_type); unregister_filesystem(&gfs2meta_fs_type); diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c index ee54cb66708..ecb8b18de0e 100644 --- a/fs/gfs2/ops_fstype.c +++ b/fs/gfs2/ops_fstype.c @@ -690,6 +690,8 @@ static int fill_super(struct super_block *sb, void *data, int silent) if (error) goto fail; + gfs2_create_debugfs_file(sdp); + error = gfs2_sys_fs_add(sdp); if (error) goto fail; @@ -896,6 +898,7 @@ error: static void gfs2_kill_sb(struct super_block *sb) { + gfs2_delete_debugfs_file(sb->s_fs_info); kill_block_super(sb); } -- cgit v1.2.3 From 5c7342d894973636f03270673e1fb7b908a421a8 Mon Sep 17 00:00:00 2001 From: Josef Whiter Date: Wed, 7 Mar 2007 17:09:10 -0500 Subject: [GFS2] fix bz 231369, gfs2 will oops if you specify an invalid mount option If you specify an invalid mount option when trying to mount a gfs2 filesystem, gfs2 will oops. The attached patch resolves this problem. Signed-off-by: Josef Whiter Signed-off-by: Steven Whitehouse --- fs/gfs2/glock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index 9f203ef4da6..a3a24f2e99d 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -2217,7 +2217,7 @@ int gfs2_create_debugfs_file(struct gfs2_sbd *sdp) void gfs2_delete_debugfs_file(struct gfs2_sbd *sdp) { - if (sdp->debugfs_dentry) + if (sdp && sdp->debugfs_dentry) debugfs_remove(sdp->debugfs_dentry); } -- cgit v1.2.3 From 89adc934f3f96600e7f31447426c7e99d62c5460 Mon Sep 17 00:00:00 2001 From: Patrick Caulfield Date: Tue, 13 Mar 2007 17:08:45 +0000 Subject: [DLM] Fix uninitialised variable in receiving The length of the second element of the kvec array was not initialised before being added to the first one. This could cause invalid lengths to be passed to kernel_recvmsg Signed-Off-By: Patrick Caulfield Signed-off-by: Steven Whitehouse --- fs/dlm/lowcomms-tcp.c | 1 + 1 file changed, 1 insertion(+) (limited to 'fs') diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c index 07e0a122c32..9bfe7fb721e 100644 --- a/fs/dlm/lowcomms-tcp.c +++ b/fs/dlm/lowcomms-tcp.c @@ -299,6 +299,7 @@ static int receive_from_sock(struct connection *con) */ iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); + iov[1].iov_len = 0; nvec = 1; /* -- cgit v1.2.3 From 1de913909263ba7f7054debeda1b79771a7233db Mon Sep 17 00:00:00 2001 From: Josef Whiter Date: Mon, 12 Mar 2007 16:55:07 -0500 Subject: [GFS2] Fix bz 231380, unlock page before dequeing glocks in gfs2_commit_write If we are writing a file, and in the middle of writing the file another node attempts to get a shared lock on that file (by doing a du for example) the process doing the writing will hang waiting on lock_page. The reason for this is because when we have waiters on a exclusive glock, we will go through and flush out all dirty pages associated with that inode and release the lock. The problem is that when we flush the dirty pages, we could hit a page that we have locked durring the generic_file_buffered_write part of this operation. This patch unlocks the page before we go to dequeue the lock and locks it immediatly afterwards, since generic_file_buffered_write needs the page locked when the commit_write is completed. This patch resolves the problem, however if somebody sees a better way to do this please don't hesistate to yell. Signed-off-by: Josef Whiter Signed-off-by: Steven Whitehouse --- fs/gfs2/ops_address.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'fs') diff --git a/fs/gfs2/ops_address.c b/fs/gfs2/ops_address.c index b3b7e847535..90c287932d5 100644 --- a/fs/gfs2/ops_address.c +++ b/fs/gfs2/ops_address.c @@ -507,7 +507,9 @@ static int gfs2_commit_write(struct file *file, struct page *page, gfs2_quota_unlock(ip); gfs2_alloc_put(ip); } + unlock_page(page); gfs2_glock_dq_m(1, &ip->i_gh); + lock_page(page); gfs2_holder_uninit(&ip->i_gh); return 0; @@ -520,7 +522,9 @@ fail_endtrans: gfs2_quota_unlock(ip); gfs2_alloc_put(ip); } + unlock_page(page); gfs2_glock_dq_m(1, &ip->i_gh); + lock_page(page); gfs2_holder_uninit(&ip->i_gh); fail_nounlock: ClearPageUptodate(page); -- cgit v1.2.3 From 3b8249f6178cb2b68b9d683587797270125cc06a Mon Sep 17 00:00:00 2001 From: Steven Whitehouse Date: Fri, 16 Mar 2007 09:40:31 +0000 Subject: [GFS2] Fix bz 224480 and cleanup glock demotion code This patch prevents the printing of a warning message in cases where the fs is functioning normally by handing off responsibility for unlinked, but still open inodes, to another node for eventual deallocation. Also, there is now an improved system for ensuring that such requests to other nodes do not get lost. The callback on the iopen lock is only ever called when i_nlink == 0 and when a node is unable to deallocate it due to it still being in use on another node. When a node receives the callback therefore, it knows that i_nlink must be zero, so we mark it as such (in gfs2_drop_inode) in order that it will then attempt deallocation of the inode itself. As an additional benefit, queuing a demote request no longer requires a memory allocation. This simplifies the code for dealing with gfs2_holders as it removes one special case. There are two new fields in struct gfs2_glock. gl_demote_state is the state which the remote node has requested and gl_demote_time is the time when the request came in. Both fields are only valid when the GLF_DEMOTE flag is set in gl_flags. Signed-off-by: Steven Whitehouse --- fs/gfs2/glock.c | 233 ++++++++++++++-------------------------------------- fs/gfs2/glock.h | 2 +- fs/gfs2/incore.h | 8 +- fs/gfs2/main.c | 1 - fs/gfs2/ops_super.c | 28 ++++++- 5 files changed, 93 insertions(+), 179 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index a3a24f2e99d..e7075945b05 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -54,7 +54,7 @@ struct glock_iter { typedef void (*glock_examiner) (struct gfs2_glock * gl); static int gfs2_dump_lockstate(struct gfs2_sbd *sdp); -static void gfs2_glock_xmote_th(struct gfs2_holder *gh); +static void gfs2_glock_xmote_th(struct gfs2_glock *gl, struct gfs2_holder *gh); static void gfs2_glock_drop_th(struct gfs2_glock *gl); static DECLARE_RWSEM(gfs2_umount_flush_sem); static struct dentry *gfs2_root; @@ -212,7 +212,6 @@ int gfs2_glock_put(struct gfs2_glock *gl) gfs2_assert(sdp, list_empty(&gl->gl_reclaim)); gfs2_assert(sdp, list_empty(&gl->gl_holders)); gfs2_assert(sdp, list_empty(&gl->gl_waiters1)); - gfs2_assert(sdp, list_empty(&gl->gl_waiters2)); gfs2_assert(sdp, list_empty(&gl->gl_waiters3)); glock_free(gl); rv = 1; @@ -399,7 +398,7 @@ void gfs2_holder_reinit(unsigned int state, unsigned flags, struct gfs2_holder * { gh->gh_state = state; gh->gh_flags = flags; - gh->gh_iflags &= 1 << HIF_ALLOCED; + gh->gh_iflags = 0; gh->gh_ip = (unsigned long)__builtin_return_address(0); } @@ -416,54 +415,8 @@ void gfs2_holder_uninit(struct gfs2_holder *gh) gh->gh_ip = 0; } -/** - * gfs2_holder_get - get a struct gfs2_holder structure - * @gl: the glock - * @state: the state we're requesting - * @flags: the modifier flags - * @gfp_flags: - * - * Figure out how big an impact this function has. Either: - * 1) Replace it with a cache of structures hanging off the struct gfs2_sbd - * 2) Leave it like it is - * - * Returns: the holder structure, NULL on ENOMEM - */ - -static struct gfs2_holder *gfs2_holder_get(struct gfs2_glock *gl, - unsigned int state, - int flags, gfp_t gfp_flags) -{ - struct gfs2_holder *gh; - - gh = kmalloc(sizeof(struct gfs2_holder), gfp_flags); - if (!gh) - return NULL; - - gfs2_holder_init(gl, state, flags, gh); - set_bit(HIF_ALLOCED, &gh->gh_iflags); - gh->gh_ip = (unsigned long)__builtin_return_address(0); - return gh; -} - -/** - * gfs2_holder_put - get rid of a struct gfs2_holder structure - * @gh: the holder structure - * - */ - -static void gfs2_holder_put(struct gfs2_holder *gh) -{ - gfs2_holder_uninit(gh); - kfree(gh); -} - -static void gfs2_holder_dispose_or_wake(struct gfs2_holder *gh) +static void gfs2_holder_wake(struct gfs2_holder *gh) { - if (test_bit(HIF_DEALLOC, &gh->gh_iflags)) { - gfs2_holder_put(gh); - return; - } clear_bit(HIF_WAIT, &gh->gh_iflags); smp_mb(); wake_up_bit(&gh->gh_iflags, HIF_WAIT); @@ -529,7 +482,7 @@ static int rq_promote(struct gfs2_holder *gh) gfs2_reclaim_glock(sdp); } - gfs2_glock_xmote_th(gh); + gfs2_glock_xmote_th(gh->gh_gl, gh); spin_lock(&gl->gl_spin); } return 1; @@ -552,7 +505,7 @@ static int rq_promote(struct gfs2_holder *gh) gh->gh_error = 0; set_bit(HIF_HOLDER, &gh->gh_iflags); - gfs2_holder_dispose_or_wake(gh); + gfs2_holder_wake(gh); return 0; } @@ -564,32 +517,24 @@ static int rq_promote(struct gfs2_holder *gh) * Returns: 1 if the queue is blocked */ -static int rq_demote(struct gfs2_holder *gh) +static int rq_demote(struct gfs2_glock *gl) { - struct gfs2_glock *gl = gh->gh_gl; - if (!list_empty(&gl->gl_holders)) return 1; - if (gl->gl_state == gh->gh_state || gl->gl_state == LM_ST_UNLOCKED) { - list_del_init(&gh->gh_list); - gh->gh_error = 0; - spin_unlock(&gl->gl_spin); - gfs2_holder_dispose_or_wake(gh); - spin_lock(&gl->gl_spin); - } else { - gl->gl_req_gh = gh; - set_bit(GLF_LOCK, &gl->gl_flags); - spin_unlock(&gl->gl_spin); - - if (gh->gh_state == LM_ST_UNLOCKED || - gl->gl_state != LM_ST_EXCLUSIVE) - gfs2_glock_drop_th(gl); - else - gfs2_glock_xmote_th(gh); - - spin_lock(&gl->gl_spin); + if (gl->gl_state == gl->gl_demote_state || + gl->gl_state == LM_ST_UNLOCKED) { + clear_bit(GLF_DEMOTE, &gl->gl_flags); + return 0; } + set_bit(GLF_LOCK, &gl->gl_flags); + spin_unlock(&gl->gl_spin); + if (gl->gl_demote_state == LM_ST_UNLOCKED || + gl->gl_state != LM_ST_EXCLUSIVE) + gfs2_glock_drop_th(gl); + else + gfs2_glock_xmote_th(gl, NULL); + spin_lock(&gl->gl_spin); return 0; } @@ -617,16 +562,8 @@ static void run_queue(struct gfs2_glock *gl) else gfs2_assert_warn(gl->gl_sbd, 0); - } else if (!list_empty(&gl->gl_waiters2) && - !test_bit(GLF_SKIP_WAITERS2, &gl->gl_flags)) { - gh = list_entry(gl->gl_waiters2.next, - struct gfs2_holder, gh_list); - - if (test_bit(HIF_DEMOTE, &gh->gh_iflags)) - blocked = rq_demote(gh); - else - gfs2_assert_warn(gl->gl_sbd, 0); - + } else if (test_bit(GLF_DEMOTE, &gl->gl_flags)) { + blocked = rq_demote(gl); } else if (!list_empty(&gl->gl_waiters3)) { gh = list_entry(gl->gl_waiters3.next, struct gfs2_holder, gh_list); @@ -717,50 +654,24 @@ static void gfs2_glmutex_unlock(struct gfs2_glock *gl) } /** - * handle_callback - add a demote request to a lock's queue + * handle_callback - process a demote request * @gl: the glock * @state: the state the caller wants us to change to * - * Note: This may fail sliently if we are out of memory. + * There are only two requests that we are going to see in actual + * practise: LM_ST_SHARED and LM_ST_UNLOCKED */ static void handle_callback(struct gfs2_glock *gl, unsigned int state) { - struct gfs2_holder *gh, *new_gh = NULL; - -restart: spin_lock(&gl->gl_spin); - - list_for_each_entry(gh, &gl->gl_waiters2, gh_list) { - if (test_bit(HIF_DEMOTE, &gh->gh_iflags) && - gl->gl_req_gh != gh) { - if (gh->gh_state != state) - gh->gh_state = LM_ST_UNLOCKED; - goto out; - } - } - - if (new_gh) { - list_add_tail(&new_gh->gh_list, &gl->gl_waiters2); - new_gh = NULL; - } else { - spin_unlock(&gl->gl_spin); - - new_gh = gfs2_holder_get(gl, state, LM_FLAG_TRY, GFP_NOFS); - if (!new_gh) - return; - set_bit(HIF_DEMOTE, &new_gh->gh_iflags); - set_bit(HIF_DEALLOC, &new_gh->gh_iflags); - set_bit(HIF_WAIT, &new_gh->gh_iflags); - - goto restart; + if (test_and_set_bit(GLF_DEMOTE, &gl->gl_flags) == 0) { + gl->gl_demote_state = state; + gl->gl_demote_time = jiffies; + } else if (gl->gl_demote_state != LM_ST_UNLOCKED) { + gl->gl_demote_state = state; } - -out: spin_unlock(&gl->gl_spin); - - if (new_gh) - gfs2_holder_put(new_gh); } /** @@ -820,56 +731,37 @@ static void xmote_bh(struct gfs2_glock *gl, unsigned int ret) /* Deal with each possible exit condition */ - if (!gh) + if (!gh) { gl->gl_stamp = jiffies; - else if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags))) { + if (ret & LM_OUT_CANCELED) + op_done = 0; + else + clear_bit(GLF_DEMOTE, &gl->gl_flags); + } else { spin_lock(&gl->gl_spin); list_del_init(&gh->gh_list); gh->gh_error = -EIO; - spin_unlock(&gl->gl_spin); - } else if (test_bit(HIF_DEMOTE, &gh->gh_iflags)) { - spin_lock(&gl->gl_spin); - list_del_init(&gh->gh_list); - if (gl->gl_state == gh->gh_state || - gl->gl_state == LM_ST_UNLOCKED) { + if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags))) + goto out; + gh->gh_error = GLR_CANCELED; + if (ret & LM_OUT_CANCELED) + goto out; + if (relaxed_state_ok(gl->gl_state, gh->gh_state, gh->gh_flags)) { + list_add_tail(&gh->gh_list, &gl->gl_holders); gh->gh_error = 0; - } else { - if (gfs2_assert_warn(sdp, gh->gh_flags & - (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) == -1) - fs_warn(sdp, "ret = 0x%.8X\n", ret); - gh->gh_error = GLR_TRYFAILED; + set_bit(HIF_HOLDER, &gh->gh_iflags); + set_bit(HIF_FIRST, &gh->gh_iflags); + op_done = 0; + goto out; } - spin_unlock(&gl->gl_spin); - - if (ret & LM_OUT_CANCELED) - handle_callback(gl, LM_ST_UNLOCKED); - - } else if (ret & LM_OUT_CANCELED) { - spin_lock(&gl->gl_spin); - list_del_init(&gh->gh_list); - gh->gh_error = GLR_CANCELED; - spin_unlock(&gl->gl_spin); - - } else if (relaxed_state_ok(gl->gl_state, gh->gh_state, gh->gh_flags)) { - spin_lock(&gl->gl_spin); - list_move_tail(&gh->gh_list, &gl->gl_holders); - gh->gh_error = 0; - set_bit(HIF_HOLDER, &gh->gh_iflags); - spin_unlock(&gl->gl_spin); - - set_bit(HIF_FIRST, &gh->gh_iflags); - - op_done = 0; - - } else if (gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) { - spin_lock(&gl->gl_spin); - list_del_init(&gh->gh_list); gh->gh_error = GLR_TRYFAILED; - spin_unlock(&gl->gl_spin); - - } else { + if (gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) + goto out; + gh->gh_error = -EINVAL; if (gfs2_assert_withdraw(sdp, 0) == -1) fs_err(sdp, "ret = 0x%.8X\n", ret); +out: + spin_unlock(&gl->gl_spin); } if (glops->go_xmote_bh) @@ -887,7 +779,7 @@ static void xmote_bh(struct gfs2_glock *gl, unsigned int ret) gfs2_glock_put(gl); if (gh) - gfs2_holder_dispose_or_wake(gh); + gfs2_holder_wake(gh); } /** @@ -898,12 +790,11 @@ static void xmote_bh(struct gfs2_glock *gl, unsigned int ret) * */ -void gfs2_glock_xmote_th(struct gfs2_holder *gh) +void gfs2_glock_xmote_th(struct gfs2_glock *gl, struct gfs2_holder *gh) { - struct gfs2_glock *gl = gh->gh_gl; struct gfs2_sbd *sdp = gl->gl_sbd; - int flags = gh->gh_flags; - unsigned state = gh->gh_state; + int flags = gh ? gh->gh_flags : 0; + unsigned state = gh ? gh->gh_state : gl->gl_demote_state; const struct gfs2_glock_operations *glops = gl->gl_ops; int lck_flags = flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB | LM_FLAG_NOEXP | LM_FLAG_ANY | @@ -953,6 +844,7 @@ static void drop_bh(struct gfs2_glock *gl, unsigned int ret) gfs2_assert_warn(sdp, !ret); state_change(gl, LM_ST_UNLOCKED); + clear_bit(GLF_DEMOTE, &gl->gl_flags); if (glops->go_inval) glops->go_inval(gl, DIO_METADATA); @@ -974,7 +866,7 @@ static void drop_bh(struct gfs2_glock *gl, unsigned int ret) gfs2_glock_put(gl); if (gh) - gfs2_holder_dispose_or_wake(gh); + gfs2_holder_wake(gh); } /** @@ -1291,9 +1183,8 @@ void gfs2_glock_dq(struct gfs2_holder *gh) if (glops->go_unlock) glops->go_unlock(gh); - gl->gl_stamp = jiffies; - spin_lock(&gl->gl_spin); + gl->gl_stamp = jiffies; } clear_bit(GLF_LOCK, &gl->gl_flags); @@ -1981,16 +1872,16 @@ static int dump_glock(struct glock_iter *gi, struct gfs2_glock *gl) if (error) goto out; } - list_for_each_entry(gh, &gl->gl_waiters2, gh_list) { - error = dump_holder(gi, "Waiter2", gh); - if (error) - goto out; - } list_for_each_entry(gh, &gl->gl_waiters3, gh_list) { error = dump_holder(gi, "Waiter3", gh); if (error) goto out; } + if (test_bit(GLF_DEMOTE, &gl->gl_flags)) { + print_dbg(gi, " Demotion req to state %u (%llu uS ago)\n", + gl->gl_demote_state, + (u64)(jiffies - gl->gl_demote_time)*1000000/HZ); + } if (gl->gl_ops == &gfs2_inode_glops && gl->gl_object) { if (!test_bit(GLF_LOCK, &gl->gl_flags) && list_empty(&gl->gl_holders)) { diff --git a/fs/gfs2/glock.h b/fs/gfs2/glock.h index d7cef740872..5e662eadc6f 100644 --- a/fs/gfs2/glock.h +++ b/fs/gfs2/glock.h @@ -67,7 +67,7 @@ static inline int gfs2_glock_is_blocking(struct gfs2_glock *gl) { int ret; spin_lock(&gl->gl_spin); - ret = !list_empty(&gl->gl_waiters2) || !list_empty(&gl->gl_waiters3); + ret = test_bit(GLF_DEMOTE, &gl->gl_flags) || !list_empty(&gl->gl_waiters3); spin_unlock(&gl->gl_spin); return ret; } diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h index 7555261d911..9c125823d76 100644 --- a/fs/gfs2/incore.h +++ b/fs/gfs2/incore.h @@ -115,11 +115,8 @@ enum { /* Actions */ HIF_MUTEX = 0, HIF_PROMOTE = 1, - HIF_DEMOTE = 2, /* States */ - HIF_ALLOCED = 4, - HIF_DEALLOC = 5, HIF_HOLDER = 6, HIF_FIRST = 7, HIF_ABORTED = 9, @@ -142,8 +139,8 @@ struct gfs2_holder { enum { GLF_LOCK = 1, GLF_STICKY = 2, + GLF_DEMOTE = 3, GLF_DIRTY = 5, - GLF_SKIP_WAITERS2 = 6, }; struct gfs2_glock { @@ -156,11 +153,12 @@ struct gfs2_glock { unsigned int gl_state; unsigned int gl_hash; + unsigned int gl_demote_state; /* state requested by remote node */ + unsigned long gl_demote_time; /* time of first demote request */ struct task_struct *gl_owner; unsigned long gl_ip; struct list_head gl_holders; struct list_head gl_waiters1; /* HIF_MUTEX */ - struct list_head gl_waiters2; /* HIF_DEMOTE */ struct list_head gl_waiters3; /* HIF_PROMOTE */ const struct gfs2_glock_operations *gl_ops; diff --git a/fs/gfs2/main.c b/fs/gfs2/main.c index 218395371db..c4bb374eaf9 100644 --- a/fs/gfs2/main.c +++ b/fs/gfs2/main.c @@ -45,7 +45,6 @@ static void gfs2_init_glock_once(void *foo, struct kmem_cache *cachep, unsigned spin_lock_init(&gl->gl_spin); INIT_LIST_HEAD(&gl->gl_holders); INIT_LIST_HEAD(&gl->gl_waiters1); - INIT_LIST_HEAD(&gl->gl_waiters2); INIT_LIST_HEAD(&gl->gl_waiters3); gl->gl_lvb = NULL; atomic_set(&gl->gl_lvb_count, 0); diff --git a/fs/gfs2/ops_super.c b/fs/gfs2/ops_super.c index b89999d3a76..485ce3d4992 100644 --- a/fs/gfs2/ops_super.c +++ b/fs/gfs2/ops_super.c @@ -283,6 +283,31 @@ static int gfs2_remount_fs(struct super_block *sb, int *flags, char *data) return error; } +/** + * gfs2_drop_inode - Drop an inode (test for remote unlink) + * @inode: The inode to drop + * + * If we've received a callback on an iopen lock then its because a + * remote node tried to deallocate the inode but failed due to this node + * still having the inode open. Here we mark the link count zero + * since we know that it must have reached zero if the GLF_DEMOTE flag + * is set on the iopen glock. If we didn't do a disk read since the + * remote node removed the final link then we might otherwise miss + * this event. This check ensures that this node will deallocate the + * inode's blocks, or alternatively pass the baton on to another + * node for later deallocation. + */ +static void gfs2_drop_inode(struct inode *inode) +{ + if (inode->i_private && inode->i_nlink) { + struct gfs2_inode *ip = GFS2_I(inode); + struct gfs2_glock *gl = ip->i_iopen_gh.gh_gl; + if (gl && test_bit(GLF_DEMOTE, &gl->gl_flags)) + clear_nlink(inode); + } + generic_drop_inode(inode); +} + /** * gfs2_clear_inode - Deallocate an inode when VFS is done with it * @inode: The VFS inode @@ -441,7 +466,7 @@ out_unlock: out_uninit: gfs2_holder_uninit(&ip->i_iopen_gh); gfs2_glock_dq_uninit(&gh); - if (error) + if (error && error != GLR_TRYFAILED) fs_warn(sdp, "gfs2_delete_inode: %d\n", error); out: truncate_inode_pages(&inode->i_data, 0); @@ -481,6 +506,7 @@ const struct super_operations gfs2_super_ops = { .statfs = gfs2_statfs, .remount_fs = gfs2_remount_fs, .clear_inode = gfs2_clear_inode, + .drop_inode = gfs2_drop_inode, .show_options = gfs2_show_options, }; -- cgit v1.2.3 From 420d2a1028b906f24e836e37089a6ad55ab1848f Mon Sep 17 00:00:00 2001 From: Steven Whitehouse Date: Sun, 18 Mar 2007 16:05:27 +0000 Subject: [GFS2] Fix a bug on i386 due to evaluation order Since gcc didn't evaluate the last two terms of the expression in glock.c:1881 as a constant expression, it resulted in an error on i386 due to the lack of a 64bit divide instruction. This adds some brackets to fix the problem. This was reported by Andrew Morton. Signed-off-by: Steven Whitehouse Cc: Andrew Morton --- fs/gfs2/glock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index e7075945b05..b8aa816bb6e 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -1880,7 +1880,7 @@ static int dump_glock(struct glock_iter *gi, struct gfs2_glock *gl) if (test_bit(GLF_DEMOTE, &gl->gl_flags)) { print_dbg(gi, " Demotion req to state %u (%llu uS ago)\n", gl->gl_demote_state, - (u64)(jiffies - gl->gl_demote_time)*1000000/HZ); + (u64)(jiffies - gl->gl_demote_time)*(1000000/HZ)); } if (gl->gl_ops == &gfs2_inode_glops && gl->gl_object) { if (!test_bit(GLF_LOCK, &gl->gl_flags) && -- cgit v1.2.3 From 254da030dfb1b13d42d07e4292a4790d88c6874f Mon Sep 17 00:00:00 2001 From: Patrick Caulfield Date: Wed, 21 Mar 2007 09:23:53 +0000 Subject: [DLM] Don't delete misc device if lockspace removal fails Currently if the lockspace removal fails the misc device associated with a lockspace is left deleted. After that there is no way to access the orphaned lockspace from userland. This patch recreates the misc device if th dlm_release_lockspace fails. I believe this is better than attempting to remove the lockspace first because that leaves an unattached device lying around. The potential gap in which there is no access to the lockspace between removing the misc device and recreating it is acceptable ... after all the application is trying to remove it, and only new users of the lockspace will be affected. Signed-Off-By: Patrick Caulfield Signed-off-by: Steven Whitehouse --- fs/dlm/user.c | 58 ++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 22 deletions(-) (limited to 'fs') diff --git a/fs/dlm/user.c b/fs/dlm/user.c index 3870150b83a..27a75ce571c 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -286,11 +286,34 @@ static int device_user_unlock(struct dlm_user_proc *proc, return error; } +static int create_misc_device(struct dlm_ls *ls, char *name) +{ + int error, len; + + error = -ENOMEM; + len = strlen(name) + strlen(name_prefix) + 2; + ls->ls_device.name = kzalloc(len, GFP_KERNEL); + if (!ls->ls_device.name) + goto fail; + + snprintf((char *)ls->ls_device.name, len, "%s_%s", name_prefix, + name); + ls->ls_device.fops = &device_fops; + ls->ls_device.minor = MISC_DYNAMIC_MINOR; + + error = misc_register(&ls->ls_device); + if (error) { + kfree(ls->ls_device.name); + } +fail: + return error; +} + static int device_create_lockspace(struct dlm_lspace_params *params) { dlm_lockspace_t *lockspace; struct dlm_ls *ls; - int error, len; + int error; if (!capable(CAP_SYS_ADMIN)) return -EPERM; @@ -304,29 +327,14 @@ static int device_create_lockspace(struct dlm_lspace_params *params) if (!ls) return -ENOENT; - error = -ENOMEM; - len = strlen(params->name) + strlen(name_prefix) + 2; - ls->ls_device.name = kzalloc(len, GFP_KERNEL); - if (!ls->ls_device.name) - goto fail; - snprintf((char *)ls->ls_device.name, len, "%s_%s", name_prefix, - params->name); - ls->ls_device.fops = &device_fops; - ls->ls_device.minor = MISC_DYNAMIC_MINOR; - - error = misc_register(&ls->ls_device); - if (error) { - kfree(ls->ls_device.name); - goto fail; - } - - error = ls->ls_device.minor; + error = create_misc_device(ls, params->name); dlm_put_lockspace(ls); - return error; - fail: - dlm_put_lockspace(ls); - dlm_release_lockspace(lockspace, 0); + if (error) + dlm_release_lockspace(lockspace, 0); + else + error = ls->ls_device.minor; + return error; } @@ -343,6 +351,10 @@ static int device_remove_lockspace(struct dlm_lspace_params *params) if (!ls) return -ENOENT; + /* Deregister the misc device first, so we don't have + * a device that's not attached to a lockspace. If + * dlm_release_lockspace fails then we can recreate it + */ error = misc_deregister(&ls->ls_device); if (error) { dlm_put_lockspace(ls); @@ -361,6 +373,8 @@ static int device_remove_lockspace(struct dlm_lspace_params *params) dlm_put_lockspace(ls); error = dlm_release_lockspace(lockspace, force); + if (error) + create_misc_device(ls, ls->ls_name); out: return error; } -- cgit v1.2.3 From f35ac346bc48b2086aa94f031baf1f6237a89de6 Mon Sep 17 00:00:00 2001 From: Steven Whitehouse Date: Sun, 18 Mar 2007 17:04:15 +0000 Subject: [GFS2] Speed up lock_dlm's locking (move sprintf) The following patch speeds up lock_dlm's locking by moving the sprintf out from the lock acquisition path and into the lock creation path. This reduces the amount of CPU time used in acquiring locks by a fair amount. Signed-off-by: Steven Whitehouse Acked-by: David Teigland --- fs/gfs2/locking/dlm/lock.c | 10 ++++------ fs/gfs2/locking/dlm/lock_dlm.h | 1 + 2 files changed, 5 insertions(+), 6 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/locking/dlm/lock.c b/fs/gfs2/locking/dlm/lock.c index b167addf9fd..f9c8bda289a 100644 --- a/fs/gfs2/locking/dlm/lock.c +++ b/fs/gfs2/locking/dlm/lock.c @@ -151,7 +151,7 @@ static inline unsigned int make_flags(struct gdlm_lock *lp, /* make_strname - convert GFS lock numbers to a string */ -static inline void make_strname(struct lm_lockname *lockname, +static inline void make_strname(const struct lm_lockname *lockname, struct gdlm_strname *str) { sprintf(str->name, "%8x%16llx", lockname->ln_type, @@ -169,6 +169,7 @@ static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name, return -ENOMEM; lp->lockname = *name; + make_strname(name, &lp->strname); lp->ls = ls; lp->cur = DLM_LOCK_IV; lp->lvb = NULL; @@ -227,7 +228,6 @@ void gdlm_put_lock(void *lock) unsigned int gdlm_do_lock(struct gdlm_lock *lp) { struct gdlm_ls *ls = lp->ls; - struct gdlm_strname str; int error, bast = 1; /* @@ -249,8 +249,6 @@ unsigned int gdlm_do_lock(struct gdlm_lock *lp) if (test_bit(LFL_NOBAST, &lp->flags)) bast = 0; - make_strname(&lp->lockname, &str); - set_bit(LFL_ACTIVE, &lp->flags); log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type, @@ -258,8 +256,8 @@ unsigned int gdlm_do_lock(struct gdlm_lock *lp) lp->cur, lp->req, lp->lkf); error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf, - str.name, str.namelen, 0, gdlm_ast, lp, - bast ? gdlm_bast : NULL); + lp->strname.name, lp->strname.namelen, 0, gdlm_ast, + lp, bast ? gdlm_bast : NULL); if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) { lp->lksb.sb_status = -EAGAIN; diff --git a/fs/gfs2/locking/dlm/lock_dlm.h b/fs/gfs2/locking/dlm/lock_dlm.h index a87c7bf3c56..6888bd49c31 100644 --- a/fs/gfs2/locking/dlm/lock_dlm.h +++ b/fs/gfs2/locking/dlm/lock_dlm.h @@ -106,6 +106,7 @@ enum { struct gdlm_lock { struct gdlm_ls *ls; struct lm_lockname lockname; + struct gdlm_strname strname; char *lvb; struct dlm_lksb lksb; -- cgit v1.2.3 From 6883562588bc6c70776ecc396ee7eda36c2c8da9 Mon Sep 17 00:00:00 2001 From: Benjamin Marzinski Date: Fri, 23 Mar 2007 09:05:12 +0000 Subject: [GFS2] Fix log entry list corruption When glock_lo_add and rg_lo_add attempt to add an element to the log, they check to see if has already been added before locking the log. If another process adds that element to the log in this window between the check and locking the log, the element will be added to the list twice. This causes the log element list to become corrupted in such a way that the log element can never be successfully removed from the list. This patch pulls the list_empty() check inside the log lock, to remove this window. Signed-off-by: Benjamin E. Marzinski Signed-off-by: Steven Whitehouse --- fs/gfs2/lops.c | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/lops.c b/fs/gfs2/lops.c index 16bb4b4561a..f82d84d05d2 100644 --- a/fs/gfs2/lops.c +++ b/fs/gfs2/lops.c @@ -33,16 +33,17 @@ static void glock_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le) tr->tr_touched = 1; - if (!list_empty(&le->le_list)) - return; - gl = container_of(le, struct gfs2_glock, gl_le); if (gfs2_assert_withdraw(sdp, gfs2_glock_is_held_excl(gl))) return; - gfs2_glock_hold(gl); - set_bit(GLF_DIRTY, &gl->gl_flags); gfs2_log_lock(sdp); + if (!list_empty(&le->le_list)){ + gfs2_log_unlock(sdp); + return; + } + gfs2_glock_hold(gl); + set_bit(GLF_DIRTY, &gl->gl_flags); sdp->sd_log_num_gl++; list_add(&le->le_list, &sdp->sd_log_le_gl); gfs2_log_unlock(sdp); @@ -415,13 +416,14 @@ static void rg_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le) tr->tr_touched = 1; - if (!list_empty(&le->le_list)) - return; - rgd = container_of(le, struct gfs2_rgrpd, rd_le); - gfs2_rgrp_bh_hold(rgd); gfs2_log_lock(sdp); + if (!list_empty(&le->le_list)){ + gfs2_log_unlock(sdp); + return; + } + gfs2_rgrp_bh_hold(rgd); sdp->sd_log_num_rg++; list_add(&le->le_list, &sdp->sd_log_le_rg); gfs2_log_unlock(sdp); -- cgit v1.2.3 From 172e045a7fcc3ee647fa70dbd585a3c247b49cb2 Mon Sep 17 00:00:00 2001 From: Benjamin Marzinski Date: Fri, 23 Mar 2007 14:51:56 -0600 Subject: [GFS2] flush the log if a transaction can't allocate space This is a fix for bz #208514. When GFS2 frees up space, the freed blocks aren't available for reuse until the resource group is successfully written to the ondisk journal. So in rare cases, GFS2 operations will fail, saying that the filesystem is out of space, when in reality, you are just waiting for a log flush. For instance, on a 1Gig filesystem, if I continually write 10 Mb to a file, and then truncate it, after a hundred interations, the write will fail with -ENOSPC, even though the filesystem is just 1% full. The attached patch calls a log flush in these cases. I tested this patch fairly heavily to check if there were any locking issues that I missed, and it seems to work just fine. Also, this patch only does the log flush if get_local_rgrp makes a complete loop of resource groups without skipping any do to locking issues. The code would be slightly simpler if it just always did the log flush after the first failed pass, and you could only ever have to go through the loop twice, instead of up to three times. However, I guessed that failing to find a rg simply do to locking issues would be common enough to skip the log flush in that case, but I'm not certain that this is the right way to go. Either way, I don't suppose this code will be hit all that often. Signed-off-by: Benjamin E. Marzinski Signed-off-by: Steven Whitehouse --- fs/gfs2/rgrp.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/gfs2/rgrp.c b/fs/gfs2/rgrp.c index 8d9c08b5c4b..2ce48d4f246 100644 --- a/fs/gfs2/rgrp.c +++ b/fs/gfs2/rgrp.c @@ -27,6 +27,7 @@ #include "trans.h" #include "ops_file.h" #include "util.h" +#include "log.h" #define BFITNOENT ((u32)~0) @@ -941,9 +942,13 @@ static int get_local_rgrp(struct gfs2_inode *ip) rgd = gfs2_rgrpd_get_first(sdp); if (rgd == begin) { - if (++loops >= 2 || !skipped) + if (++loops >= 3) return -ENOSPC; + if (!skipped) + loops++; flags = 0; + if (loops == 2) + gfs2_log_flush(sdp, NULL); } } -- cgit v1.2.3 From 04b933f27bc8e7f3f6423020cec58a4eab3bb7a7 Mon Sep 17 00:00:00 2001 From: Robert Peterson Date: Fri, 23 Mar 2007 17:05:15 -0500 Subject: [GFS2] Red Hat bz 228540: owner references In Testing the previously posted and accepted patch for https://bugzilla.redhat.com/bugzilla/show_bug.cgi?id=228540 I uncovered some gfs2 badness. It turns out that the current gfs2 code saves off a process pointer when glocks is taken in both the glock and glock holder structures. Those structures will persist in memory long after the process has ended; pointers to poisoned memory. This problem isn't caused by the 228540 fix; the new capability introduced by the fix just uncovered the problem. I wrote this patch that avoids saving process pointers and instead saves off the process pid. Rather than referencing the bad pointers, it now does process lookups. There is special code that makes the output nicer for printing holder information for processes that have ended. This patch also adds a stub for the new "sprint_symbol" function that exists in Andrew Morton's -mm patch set, but won't go into the base kernel until 2.6.22, since it adds functionality but doesn't fix a bug. Signed-off-by: Bob Peterson Signed-off-by: Steven Whitehouse --- fs/gfs2/glock.c | 75 ++++++++++++++++++++++++++++++++++++++++++-------------- fs/gfs2/glock.h | 2 +- fs/gfs2/incore.h | 4 +-- 3 files changed, 59 insertions(+), 22 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index b8aa816bb6e..d2e3094c40f 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -25,6 +25,8 @@ #include #include #include +#include +#include #include "gfs2.h" #include "incore.h" @@ -54,6 +56,7 @@ struct glock_iter { typedef void (*glock_examiner) (struct gfs2_glock * gl); static int gfs2_dump_lockstate(struct gfs2_sbd *sdp); +static int dump_glock(struct glock_iter *gi, struct gfs2_glock *gl); static void gfs2_glock_xmote_th(struct gfs2_glock *gl, struct gfs2_holder *gh); static void gfs2_glock_drop_th(struct gfs2_glock *gl); static DECLARE_RWSEM(gfs2_umount_flush_sem); @@ -64,6 +67,7 @@ static struct dentry *gfs2_root; #define GFS2_GL_HASH_MASK (GFS2_GL_HASH_SIZE - 1) static struct gfs2_gl_hash_bucket gl_hash_table[GFS2_GL_HASH_SIZE]; +static struct dentry *gfs2_root; /* * Despite what you might think, the numbers below are not arbitrary :-) @@ -312,7 +316,7 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, atomic_set(&gl->gl_ref, 1); gl->gl_state = LM_ST_UNLOCKED; gl->gl_hash = hash; - gl->gl_owner = NULL; + gl->gl_owner_pid = 0; gl->gl_ip = 0; gl->gl_ops = glops; gl->gl_req_gh = NULL; @@ -376,7 +380,7 @@ void gfs2_holder_init(struct gfs2_glock *gl, unsigned int state, unsigned flags, INIT_LIST_HEAD(&gh->gh_list); gh->gh_gl = gl; gh->gh_ip = (unsigned long)__builtin_return_address(0); - gh->gh_owner = current; + gh->gh_owner_pid = current->pid; gh->gh_state = state; gh->gh_flags = flags; gh->gh_error = 0; @@ -601,7 +605,7 @@ static void gfs2_glmutex_lock(struct gfs2_glock *gl) if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) { list_add_tail(&gh.gh_list, &gl->gl_waiters1); } else { - gl->gl_owner = current; + gl->gl_owner_pid = current->pid; gl->gl_ip = (unsigned long)__builtin_return_address(0); clear_bit(HIF_WAIT, &gh.gh_iflags); smp_mb(); @@ -628,7 +632,7 @@ static int gfs2_glmutex_trylock(struct gfs2_glock *gl) if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) { acquired = 0; } else { - gl->gl_owner = current; + gl->gl_owner_pid = current->pid; gl->gl_ip = (unsigned long)__builtin_return_address(0); } spin_unlock(&gl->gl_spin); @@ -646,7 +650,7 @@ static void gfs2_glmutex_unlock(struct gfs2_glock *gl) { spin_lock(&gl->gl_spin); clear_bit(GLF_LOCK, &gl->gl_flags); - gl->gl_owner = NULL; + gl->gl_owner_pid = 0; gl->gl_ip = 0; run_queue(gl); BUG_ON(!spin_is_locked(&gl->gl_spin)); @@ -999,12 +1003,12 @@ static int glock_wait_internal(struct gfs2_holder *gh) } static inline struct gfs2_holder * -find_holder_by_owner(struct list_head *head, struct task_struct *owner) +find_holder_by_owner(struct list_head *head, pid_t pid) { struct gfs2_holder *gh; list_for_each_entry(gh, head, gh_list) { - if (gh->gh_owner == owner) + if (gh->gh_owner_pid == pid) return gh; } @@ -1036,24 +1040,24 @@ static void add_to_queue(struct gfs2_holder *gh) struct gfs2_glock *gl = gh->gh_gl; struct gfs2_holder *existing; - BUG_ON(!gh->gh_owner); + BUG_ON(!gh->gh_owner_pid); if (test_and_set_bit(HIF_WAIT, &gh->gh_iflags)) BUG(); - existing = find_holder_by_owner(&gl->gl_holders, gh->gh_owner); + existing = find_holder_by_owner(&gl->gl_holders, gh->gh_owner_pid); if (existing) { print_symbol(KERN_WARNING "original: %s\n", existing->gh_ip); - printk(KERN_INFO "pid : %d\n", existing->gh_owner->pid); + printk(KERN_INFO "pid : %d\n", existing->gh_owner_pid); printk(KERN_INFO "lock type : %d lock state : %d\n", existing->gh_gl->gl_name.ln_type, existing->gh_gl->gl_state); print_symbol(KERN_WARNING "new: %s\n", gh->gh_ip); - printk(KERN_INFO "pid : %d\n", gh->gh_owner->pid); + printk(KERN_INFO "pid : %d\n", gh->gh_owner_pid); printk(KERN_INFO "lock type : %d lock state : %d\n", gl->gl_name.ln_type, gl->gl_state); BUG(); } - existing = find_holder_by_owner(&gl->gl_waiters3, gh->gh_owner); + existing = find_holder_by_owner(&gl->gl_waiters3, gh->gh_owner_pid); if (existing) { print_symbol(KERN_WARNING "original: %s\n", existing->gh_ip); print_symbol(KERN_WARNING "new: %s\n", gh->gh_ip); @@ -1756,6 +1760,22 @@ void gfs2_gl_hash_clear(struct gfs2_sbd *sdp, int wait) * Diagnostic routines to help debug distributed deadlock */ +static void gfs2_print_symbol(struct glock_iter *gi, const char *fmt, + unsigned long address) +{ +/* when sprint_symbol becomes available in the new kernel, replace this */ +/* function with: + char buffer[KSYM_SYMBOL_LEN]; + + sprint_symbol(buffer, address); + print_dbg(gi, fmt, buffer); +*/ + if (gi) + print_dbg(gi, fmt, address); + else + print_symbol(fmt, address); +} + /** * dump_holder - print information about a glock holder * @str: a string naming the type of holder @@ -1768,10 +1788,18 @@ static int dump_holder(struct glock_iter *gi, char *str, struct gfs2_holder *gh) { unsigned int x; + struct task_struct *gh_owner; print_dbg(gi, " %s\n", str); - print_dbg(gi, " owner = %ld\n", - (gh->gh_owner) ? (long)gh->gh_owner->pid : -1); + if (gh->gh_owner_pid) { + print_dbg(gi, " owner = %ld ", (long)gh->gh_owner_pid); + gh_owner = find_task_by_pid(gh->gh_owner_pid); + if (gh_owner) + print_dbg(gi, "(%s)\n", gh_owner->comm); + else + print_dbg(gi, "(ended)\n"); + } else + print_dbg(gi, " owner = -1\n"); print_dbg(gi, " gh_state = %u\n", gh->gh_state); print_dbg(gi, " gh_flags ="); for (x = 0; x < 32; x++) @@ -1784,10 +1812,7 @@ static int dump_holder(struct glock_iter *gi, char *str, if (test_bit(x, &gh->gh_iflags)) print_dbg(gi, " %u", x); print_dbg(gi, " \n"); - if (gi) - print_dbg(gi, " initialized at: 0x%x\n", gh->gh_ip); - else - print_symbol(KERN_INFO " initialized at: %s\n", gh->gh_ip); + gfs2_print_symbol(gi, " initialized at: %s\n", gh->gh_ip); return 0; } @@ -1828,6 +1853,7 @@ static int dump_glock(struct glock_iter *gi, struct gfs2_glock *gl) struct gfs2_holder *gh; unsigned int x; int error = -ENOBUFS; + struct task_struct *gl_owner; spin_lock(&gl->gl_spin); @@ -1838,10 +1864,21 @@ static int dump_glock(struct glock_iter *gi, struct gfs2_glock *gl) if (test_bit(x, &gl->gl_flags)) print_dbg(gi, " %u", x); } + if (!test_bit(GLF_LOCK, &gl->gl_flags)) + print_dbg(gi, " (unlocked)"); print_dbg(gi, " \n"); print_dbg(gi, " gl_ref = %d\n", atomic_read(&gl->gl_ref)); print_dbg(gi, " gl_state = %u\n", gl->gl_state); - print_dbg(gi, " gl_owner = %s\n", gl->gl_owner->comm); + if (gl->gl_owner_pid) { + gl_owner = find_task_by_pid(gl->gl_owner_pid); + if (gl_owner) + print_dbg(gi, " gl_owner = pid %d (%s)\n", + gl->gl_owner_pid, gl_owner->comm); + else + print_dbg(gi, " gl_owner = %d (ended)\n", + gl->gl_owner_pid); + } else + print_dbg(gi, " gl_owner = -1\n"); print_dbg(gi, " gl_ip = %lu\n", gl->gl_ip); print_dbg(gi, " req_gh = %s\n", (gl->gl_req_gh) ? "yes" : "no"); print_dbg(gi, " req_bh = %s\n", (gl->gl_req_bh) ? "yes" : "no"); diff --git a/fs/gfs2/glock.h b/fs/gfs2/glock.h index 5e662eadc6f..11477ca3a3c 100644 --- a/fs/gfs2/glock.h +++ b/fs/gfs2/glock.h @@ -38,7 +38,7 @@ static inline int gfs2_glock_is_locked_by_me(struct gfs2_glock *gl) /* Look in glock's list of holders for one with current task as owner */ spin_lock(&gl->gl_spin); list_for_each_entry(gh, &gl->gl_holders, gh_list) { - if (gh->gh_owner == current) { + if (gh->gh_owner_pid == current->pid) { locked = 1; break; } diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h index 9c125823d76..fdf04705906 100644 --- a/fs/gfs2/incore.h +++ b/fs/gfs2/incore.h @@ -127,7 +127,7 @@ struct gfs2_holder { struct list_head gh_list; struct gfs2_glock *gh_gl; - struct task_struct *gh_owner; + pid_t gh_owner_pid; unsigned int gh_state; unsigned gh_flags; @@ -155,7 +155,7 @@ struct gfs2_glock { unsigned int gl_hash; unsigned int gl_demote_state; /* state requested by remote node */ unsigned long gl_demote_time; /* time of first demote request */ - struct task_struct *gl_owner; + pid_t gl_owner_pid; unsigned long gl_ip; struct list_head gl_holders; struct list_head gl_waiters1; /* HIF_MUTEX */ -- cgit v1.2.3 From 032067270295cfca11975c0f7b467244aa170c14 Mon Sep 17 00:00:00 2001 From: Patrick Caulfield Date: Mon, 26 Mar 2007 09:56:00 +0100 Subject: [DLM] fix coverity-spotted stupidity Replacement patch to remove redundant code rather than moving it around. Signed-Off-By: Patrick Caulfield Signed-off-by: Steven Whitehouse --- fs/dlm/lowcomms-tcp.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'fs') diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c index 9bfe7fb721e..919e92a6aeb 100644 --- a/fs/dlm/lowcomms-tcp.c +++ b/fs/dlm/lowcomms-tcp.c @@ -319,8 +319,6 @@ static int receive_from_sock(struct connection *con) if (ret <= 0) goto out_close; - if (ret == -EAGAIN) - goto out_resched; if (ret == len) call_again_soon = 1; -- cgit v1.2.3 From ef0c2bb05f40f9a0cd2deae63e199bfa62faa7fa Mon Sep 17 00:00:00 2001 From: David Teigland Date: Wed, 28 Mar 2007 09:56:46 -0500 Subject: [DLM] overlapping cancel and unlock Full cancel and force-unlock support. In the past, cancel and force-unlock wouldn't work if there was another operation in progress on the lock. Now, both cancel and unlock-force can overlap an operation on a lock, meaning there may be 2 or 3 operations in progress on a lock in parallel. This support is important not only because cancel and force-unlock are explicit operations that an app can use, but both are used implicitly when a process exits while holding locks. Summary of changes: - add-to and remove-from waiters functions were rewritten to handle situations with more than one remote operation outstanding on a lock - validate_unlock_args detects when an overlapping cancel/unlock-force can be sent and when it needs to be delayed until a request/lookup reply is received - processing request/lookup replies detects when cancel/unlock-force occured during the op, and carries out the delayed cancel/unlock-force - manipulation of the "waiters" (remote operation) state of a lock moved under the standard rsb mutex that protects all the other lock state - the two recovery routines related to locks on the waiters list changed according to the way lkb's are now locked before accessing waiters state - waiters recovery detects when lkb's being recovered have overlapping cancel/unlock-force, and may not recover such locks - revert_lock (cancel) returns a value to distinguish cases where it did nothing vs cases where it actually did a cancel; the cancel completion ast should only be done when cancel did something - orphaned locks put on new list so they can be found later for purging - cancel must be called on a lock when making it an orphan - flag user locks (ENDOFLIFE) at the end of their useful life (to the application) so we can return an error for any further cancel/unlock-force - we weren't setting COMP/BAST ast flags if one was already set, so we'd lose either a completion or blocking ast - clear an unread bast on a lock that's become unlocked Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/dlm/dlm_internal.h | 10 +- fs/dlm/lock.c | 710 +++++++++++++++++++++++++++++++++++++------------- fs/dlm/lockspace.c | 4 +- fs/dlm/user.c | 77 +++--- 4 files changed, 577 insertions(+), 224 deletions(-) (limited to 'fs') diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 61d93201e1b..178931cca67 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -210,6 +210,9 @@ struct dlm_args { #define DLM_IFL_MSTCPY 0x00010000 #define DLM_IFL_RESEND 0x00020000 #define DLM_IFL_DEAD 0x00040000 +#define DLM_IFL_OVERLAP_UNLOCK 0x00080000 +#define DLM_IFL_OVERLAP_CANCEL 0x00100000 +#define DLM_IFL_ENDOFLIFE 0x00200000 #define DLM_IFL_USER 0x00000001 #define DLM_IFL_ORPHAN 0x00000002 @@ -230,8 +233,8 @@ struct dlm_lkb { int8_t lkb_grmode; /* granted lock mode */ int8_t lkb_bastmode; /* requested mode */ int8_t lkb_highbast; /* highest mode bast sent for */ - int8_t lkb_wait_type; /* type of reply waiting for */ + int8_t lkb_wait_count; int8_t lkb_ast_type; /* type of ast queued for */ struct list_head lkb_idtbl_list; /* lockspace lkbtbl */ @@ -440,6 +443,9 @@ struct dlm_ls { struct mutex ls_waiters_mutex; struct list_head ls_waiters; /* lkbs needing a reply */ + struct mutex ls_orphans_mutex; + struct list_head ls_orphans; + struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index e725005fafd..b865a46059d 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -254,6 +254,22 @@ static inline int down_conversion(struct dlm_lkb *lkb) return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); } +static inline int is_overlap_unlock(struct dlm_lkb *lkb) +{ + return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; +} + +static inline int is_overlap_cancel(struct dlm_lkb *lkb) +{ + return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; +} + +static inline int is_overlap(struct dlm_lkb *lkb) +{ + return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | + DLM_IFL_OVERLAP_CANCEL)); +} + static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) { if (is_master_copy(lkb)) @@ -267,6 +283,12 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) dlm_add_ast(lkb, AST_COMP); } +static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) +{ + queue_cast(r, lkb, + is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL); +} + static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) { if (is_master_copy(lkb)) @@ -547,6 +569,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) lkb->lkb_grmode = DLM_LOCK_IV; kref_init(&lkb->lkb_ref); INIT_LIST_HEAD(&lkb->lkb_ownqueue); + INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); get_random_bytes(&bucket, sizeof(bucket)); bucket &= (ls->ls_lkbtbl_size - 1); @@ -735,23 +758,75 @@ static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) unhold_lkb(lkb); } +static int msg_reply_type(int mstype) +{ + switch (mstype) { + case DLM_MSG_REQUEST: + return DLM_MSG_REQUEST_REPLY; + case DLM_MSG_CONVERT: + return DLM_MSG_CONVERT_REPLY; + case DLM_MSG_UNLOCK: + return DLM_MSG_UNLOCK_REPLY; + case DLM_MSG_CANCEL: + return DLM_MSG_CANCEL_REPLY; + case DLM_MSG_LOOKUP: + return DLM_MSG_LOOKUP_REPLY; + } + return -1; +} + /* add/remove lkb from global waiters list of lkb's waiting for a reply from a remote node */ -static void add_to_waiters(struct dlm_lkb *lkb, int mstype) +static int add_to_waiters(struct dlm_lkb *lkb, int mstype) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int error = 0; mutex_lock(&ls->ls_waiters_mutex); - if (lkb->lkb_wait_type) { - log_print("add_to_waiters error %d", lkb->lkb_wait_type); + + if (is_overlap_unlock(lkb) || + (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { + error = -EINVAL; + goto out; + } + + if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { + switch (mstype) { + case DLM_MSG_UNLOCK: + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + break; + case DLM_MSG_CANCEL: + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + break; + default: + error = -EBUSY; + goto out; + } + lkb->lkb_wait_count++; + hold_lkb(lkb); + + log_debug(ls, "add overlap %x cur %d new %d count %d flags %x", + lkb->lkb_id, lkb->lkb_wait_type, mstype, + lkb->lkb_wait_count, lkb->lkb_flags); goto out; } + + DLM_ASSERT(!lkb->lkb_wait_count, + dlm_print_lkb(lkb); + printk("wait_count %d\n", lkb->lkb_wait_count);); + + lkb->lkb_wait_count++; lkb->lkb_wait_type = mstype; - kref_get(&lkb->lkb_ref); + hold_lkb(lkb); list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: + if (error) + log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s", + lkb->lkb_id, error, lkb->lkb_flags, mstype, + lkb->lkb_wait_type, lkb->lkb_resource->res_name); mutex_unlock(&ls->ls_waiters_mutex); + return error; } /* We clear the RESEND flag because we might be taking an lkb off the waiters @@ -759,34 +834,85 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype) request reply on the requestqueue) between dlm_recover_waiters_pre() which set RESEND and dlm_recover_waiters_post() */ -static int _remove_from_waiters(struct dlm_lkb *lkb) +static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype) { - int error = 0; + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int overlap_done = 0; - if (!lkb->lkb_wait_type) { - log_print("remove_from_waiters error"); - error = -EINVAL; - goto out; + if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + overlap_done = 1; + goto out_del; } - lkb->lkb_wait_type = 0; + + if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + overlap_done = 1; + goto out_del; + } + + /* N.B. type of reply may not always correspond to type of original + msg due to lookup->request optimization, verify others? */ + + if (lkb->lkb_wait_type) { + lkb->lkb_wait_type = 0; + goto out_del; + } + + log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d", + lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type); + return -1; + + out_del: + /* the force-unlock/cancel has completed and we haven't recvd a reply + to the op that was in progress prior to the unlock/cancel; we + give up on any reply to the earlier op. FIXME: not sure when/how + this would happen */ + + if (overlap_done && lkb->lkb_wait_type) { + log_error(ls, "remove_from_waiters %x reply %d give up on %d", + lkb->lkb_id, mstype, lkb->lkb_wait_type); + lkb->lkb_wait_count--; + lkb->lkb_wait_type = 0; + } + + DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); + lkb->lkb_flags &= ~DLM_IFL_RESEND; - list_del(&lkb->lkb_wait_reply); + lkb->lkb_wait_count--; + if (!lkb->lkb_wait_count) + list_del_init(&lkb->lkb_wait_reply); unhold_lkb(lkb); - out: - return error; + return 0; } -static int remove_from_waiters(struct dlm_lkb *lkb) +static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; mutex_lock(&ls->ls_waiters_mutex); - error = _remove_from_waiters(lkb); + error = _remove_from_waiters(lkb, mstype); mutex_unlock(&ls->ls_waiters_mutex); return error; } +/* Handles situations where we might be processing a "fake" or "stub" reply in + which we can't try to take waiters_mutex again. */ + +static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int error; + + if (ms != &ls->ls_stub_ms) + mutex_lock(&ls->ls_waiters_mutex); + error = _remove_from_waiters(lkb, ms->m_type); + if (ms != &ls->ls_stub_ms) + mutex_unlock(&ls->ls_waiters_mutex); + return error; +} + static void dir_remove(struct dlm_rsb *r) { int to_nodeid; @@ -988,8 +1114,14 @@ static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) _remove_lock(r, lkb); } -static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) +/* returns: 0 did nothing + 1 moved lock to granted + -1 removed lock */ + +static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) { + int rv = 0; + lkb->lkb_rqmode = DLM_LOCK_IV; switch (lkb->lkb_status) { @@ -997,6 +1129,7 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) break; case DLM_LKSTS_CONVERT: move_lkb(r, lkb, DLM_LKSTS_GRANTED); + rv = 1; break; case DLM_LKSTS_WAITING: del_lkb(r, lkb); @@ -1004,15 +1137,17 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) /* this unhold undoes the original ref from create_lkb() so this leads to the lkb being freed */ unhold_lkb(lkb); + rv = -1; break; default: log_print("invalid status for revert %d", lkb->lkb_status); } + return rv; } -static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) +static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) { - revert_lock(r, lkb); + return revert_lock(r, lkb); } static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -1499,7 +1634,7 @@ static void process_lookup_list(struct dlm_rsb *r) struct dlm_lkb *lkb, *safe; list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { - list_del(&lkb->lkb_rsb_lookup); + list_del_init(&lkb->lkb_rsb_lookup); _request_lock(r, lkb); schedule(); } @@ -1530,7 +1665,7 @@ static void confirm_master(struct dlm_rsb *r, int error) if (!list_empty(&r->res_lookup)) { lkb = list_entry(r->res_lookup.next, struct dlm_lkb, lkb_rsb_lookup); - list_del(&lkb->lkb_rsb_lookup); + list_del_init(&lkb->lkb_rsb_lookup); r->res_first_lkid = lkb->lkb_id; _request_lock(r, lkb); } else @@ -1614,6 +1749,9 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) DLM_LKF_FORCEUNLOCK)) return -EINVAL; + if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) + return -EINVAL; + args->flags = flags; args->astparam = (long) astarg; return 0; @@ -1638,6 +1776,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, if (lkb->lkb_wait_type) goto out; + + if (is_overlap(lkb)) + goto out; } lkb->lkb_exflags = args->flags; @@ -1654,35 +1795,126 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, return rv; } +/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 + for success */ + +/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here + because there may be a lookup in progress and it's valid to do + cancel/unlockf on it */ + static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) { + struct dlm_ls *ls = lkb->lkb_resource->res_ls; int rv = -EINVAL; - if (lkb->lkb_flags & DLM_IFL_MSTCPY) + if (lkb->lkb_flags & DLM_IFL_MSTCPY) { + log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); + dlm_print_lkb(lkb); goto out; + } - if (args->flags & DLM_LKF_FORCEUNLOCK) - goto out_ok; + /* an lkb may still exist even though the lock is EOL'ed due to a + cancel, unlock or failed noqueue request; an app can't use these + locks; return same error as if the lkid had not been found at all */ - if (args->flags & DLM_LKF_CANCEL && - lkb->lkb_status == DLM_LKSTS_GRANTED) + if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { + log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); + rv = -ENOENT; goto out; + } - if (!(args->flags & DLM_LKF_CANCEL) && - lkb->lkb_status != DLM_LKSTS_GRANTED) - goto out; + /* an lkb may be waiting for an rsb lookup to complete where the + lookup was initiated by another lock */ + + if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { + if (!list_empty(&lkb->lkb_rsb_lookup)) { + log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); + list_del_init(&lkb->lkb_rsb_lookup); + queue_cast(lkb->lkb_resource, lkb, + args->flags & DLM_LKF_CANCEL ? + -DLM_ECANCEL : -DLM_EUNLOCK); + unhold_lkb(lkb); /* undoes create_lkb() */ + rv = -EBUSY; + goto out; + } + } + + /* cancel not allowed with another cancel/unlock in progress */ + + if (args->flags & DLM_LKF_CANCEL) { + if (lkb->lkb_exflags & DLM_LKF_CANCEL) + goto out; + + if (is_overlap(lkb)) + goto out; + + if (lkb->lkb_flags & DLM_IFL_RESEND) { + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + rv = -EBUSY; + goto out; + } + + switch (lkb->lkb_wait_type) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + rv = -EBUSY; + goto out; + case DLM_MSG_UNLOCK: + case DLM_MSG_CANCEL: + goto out; + } + /* add_to_waiters() will set OVERLAP_CANCEL */ + goto out_ok; + } + + /* do we need to allow a force-unlock if there's a normal unlock + already in progress? in what conditions could the normal unlock + fail such that we'd want to send a force-unlock to be sure? */ + + if (args->flags & DLM_LKF_FORCEUNLOCK) { + if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) + goto out; + + if (is_overlap_unlock(lkb)) + goto out; + if (lkb->lkb_flags & DLM_IFL_RESEND) { + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + rv = -EBUSY; + goto out; + } + + switch (lkb->lkb_wait_type) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + rv = -EBUSY; + goto out; + case DLM_MSG_UNLOCK: + goto out; + } + /* add_to_waiters() will set OVERLAP_UNLOCK */ + goto out_ok; + } + + /* normal unlock not allowed if there's any op in progress */ rv = -EBUSY; - if (lkb->lkb_wait_type) + if (lkb->lkb_wait_type || lkb->lkb_wait_count) goto out; out_ok: - lkb->lkb_exflags = args->flags; + /* an overlapping op shouldn't blow away exflags from other op */ + lkb->lkb_exflags |= args->flags; lkb->lkb_sbflags = 0; lkb->lkb_astparam = args->astparam; - rv = 0; out: + if (rv) + log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, + lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, + args->flags, lkb->lkb_wait_type, + lkb->lkb_resource->res_name); return rv; } @@ -1759,17 +1991,19 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) return -DLM_EUNLOCK; } -/* FIXME: if revert_lock() finds that the lkb is granted, we should - skip the queue_cast(ECANCEL). It indicates that the request/convert - completed (and queued a normal ast) just before the cancel; we don't - want to clobber the sb_result for the normal ast with ECANCEL. */ +/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) { - revert_lock(r, lkb); - queue_cast(r, lkb, -DLM_ECANCEL); - grant_pending_locks(r); - return -DLM_ECANCEL; + int error; + + error = revert_lock(r, lkb); + if (error) { + queue_cast(r, lkb, -DLM_ECANCEL); + grant_pending_locks(r); + return -DLM_ECANCEL; + } + return 0; } /* @@ -2035,6 +2269,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) error = 0; + if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) + error = 0; out_put: dlm_put_lkb(lkb); out: @@ -2176,7 +2412,9 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) struct dlm_mhandle *mh; int to_nodeid, error; - add_to_waiters(lkb, mstype); + error = add_to_waiters(lkb, mstype); + if (error) + return error; to_nodeid = r->res_nodeid; @@ -2192,7 +2430,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) return 0; fail: - remove_from_waiters(lkb); + remove_from_waiters(lkb, msg_reply_type(mstype)); return error; } @@ -2209,7 +2447,8 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) /* down conversions go without a reply from the master */ if (!error && down_conversion(lkb)) { - remove_from_waiters(lkb); + remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); + r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; r->res_ls->ls_stub_ms.m_result = 0; r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); @@ -2280,7 +2519,9 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) struct dlm_mhandle *mh; int to_nodeid, error; - add_to_waiters(lkb, DLM_MSG_LOOKUP); + error = add_to_waiters(lkb, DLM_MSG_LOOKUP); + if (error) + return error; to_nodeid = dlm_dir_nodeid(r); @@ -2296,7 +2537,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) return 0; fail: - remove_from_waiters(lkb); + remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); return error; } @@ -2740,7 +2981,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; - int error, mstype; + int error, mstype, result; error = find_lkb(ls, ms->m_remid, &lkb); if (error) { @@ -2749,20 +2990,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - mstype = lkb->lkb_wait_type; - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_request_reply not on waiters"); - goto out; - } - - /* this is the value returned from do_request() on the master */ - error = ms->m_result; - r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + mstype = lkb->lkb_wait_type; + error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); + if (error) + goto out; + /* Optimization: the dir node was also the master, so it took our lookup as a request and sent request reply instead of lookup reply */ if (mstype == DLM_MSG_LOOKUP) { @@ -2770,14 +3006,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) lkb->lkb_nodeid = r->res_nodeid; } - switch (error) { + /* this is the value returned from do_request() on the master */ + result = ms->m_result; + + switch (result) { case -EAGAIN: - /* request would block (be queued) on remote master; - the unhold undoes the original ref from create_lkb() - so it leads to the lkb being freed */ + /* request would block (be queued) on remote master */ queue_cast(r, lkb, -EAGAIN); confirm_master(r, -EAGAIN); - unhold_lkb(lkb); + unhold_lkb(lkb); /* undoes create_lkb() */ break; case -EINPROGRESS: @@ -2785,41 +3022,62 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) /* request was queued or granted on remote master */ receive_flags_reply(lkb, ms); lkb->lkb_remid = ms->m_lkid; - if (error) + if (result) add_lkb(r, lkb, DLM_LKSTS_WAITING); else { grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); } - confirm_master(r, error); + confirm_master(r, result); break; case -EBADR: case -ENOTBLK: /* find_rsb failed to find rsb or rsb wasn't master */ + log_debug(ls, "receive_request_reply %x %x master diff %d %d", + lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result); r->res_nodeid = -1; lkb->lkb_nodeid = -1; - _request_lock(r, lkb); + + if (is_overlap(lkb)) { + /* we'll ignore error in cancel/unlock reply */ + queue_cast_overlap(r, lkb); + unhold_lkb(lkb); /* undoes create_lkb() */ + } else + _request_lock(r, lkb); break; default: - log_error(ls, "receive_request_reply error %d", error); + log_error(ls, "receive_request_reply %x error %d", + lkb->lkb_id, result); } + if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { + log_debug(ls, "receive_request_reply %x result %d unlock", + lkb->lkb_id, result); + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + send_unlock(r, lkb); + } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { + log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + send_cancel(r, lkb); + } else { + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + } + out: unlock_rsb(r); put_rsb(r); - out: dlm_put_lkb(lkb); } static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms) { - int error = ms->m_result; - /* this is the value returned from do_convert() on the master */ - - switch (error) { + switch (ms->m_result) { case -EAGAIN: /* convert would block (be queued) on remote master */ queue_cast(r, lkb, -EAGAIN); @@ -2839,19 +3097,26 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, break; default: - log_error(r->res_ls, "receive_convert_reply error %d", error); + log_error(r->res_ls, "receive_convert_reply %x error %d", + lkb->lkb_id, ms->m_result); } } static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) { struct dlm_rsb *r = lkb->lkb_resource; + int error; hold_rsb(r); lock_rsb(r); - __receive_convert_reply(r, lkb, ms); + /* stub reply can happen with waiters_mutex held */ + error = remove_from_waiters_ms(lkb, ms); + if (error) + goto out; + __receive_convert_reply(r, lkb, ms); + out: unlock_rsb(r); put_rsb(r); } @@ -2868,37 +3133,38 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_convert_reply not on waiters"); - goto out; - } - _receive_convert_reply(lkb, ms); - out: dlm_put_lkb(lkb); } static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) { struct dlm_rsb *r = lkb->lkb_resource; - int error = ms->m_result; + int error; hold_rsb(r); lock_rsb(r); + /* stub reply can happen with waiters_mutex held */ + error = remove_from_waiters_ms(lkb, ms); + if (error) + goto out; + /* this is the value returned from do_unlock() on the master */ - switch (error) { + switch (ms->m_result) { case -DLM_EUNLOCK: receive_flags_reply(lkb, ms); remove_lock_pc(r, lkb); queue_cast(r, lkb, -DLM_EUNLOCK); break; + case -ENOENT: + break; default: - log_error(r->res_ls, "receive_unlock_reply error %d", error); + log_error(r->res_ls, "receive_unlock_reply %x error %d", + lkb->lkb_id, ms->m_result); } - + out: unlock_rsb(r); put_rsb(r); } @@ -2915,37 +3181,39 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_unlock_reply not on waiters"); - goto out; - } - _receive_unlock_reply(lkb, ms); - out: dlm_put_lkb(lkb); } static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) { struct dlm_rsb *r = lkb->lkb_resource; - int error = ms->m_result; + int error; hold_rsb(r); lock_rsb(r); + /* stub reply can happen with waiters_mutex held */ + error = remove_from_waiters_ms(lkb, ms); + if (error) + goto out; + /* this is the value returned from do_cancel() on the master */ - switch (error) { + switch (ms->m_result) { case -DLM_ECANCEL: receive_flags_reply(lkb, ms); revert_lock_pc(r, lkb); - queue_cast(r, lkb, -DLM_ECANCEL); + if (ms->m_result) + queue_cast(r, lkb, -DLM_ECANCEL); + break; + case 0: break; default: - log_error(r->res_ls, "receive_cancel_reply error %d", error); + log_error(r->res_ls, "receive_cancel_reply %x error %d", + lkb->lkb_id, ms->m_result); } - + out: unlock_rsb(r); put_rsb(r); } @@ -2962,14 +3230,7 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_cancel_reply not on waiters"); - goto out; - } - _receive_cancel_reply(lkb, ms); - out: dlm_put_lkb(lkb); } @@ -2985,20 +3246,17 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) return; } - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_lookup_reply not on waiters"); - goto out; - } - - /* this is the value returned by dlm_dir_lookup on dir node + /* ms->m_result is the value returned by dlm_dir_lookup on dir node FIXME: will a non-zero error ever be returned? */ - error = ms->m_result; r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); + if (error) + goto out; + ret_nodeid = ms->m_nodeid; if (ret_nodeid == dlm_our_nodeid()) { r->res_nodeid = 0; @@ -3009,14 +3267,22 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) r->res_nodeid = ret_nodeid; } + if (is_overlap(lkb)) { + log_debug(ls, "receive_lookup_reply %x unlock %x", + lkb->lkb_id, lkb->lkb_flags); + queue_cast_overlap(r, lkb); + unhold_lkb(lkb); /* undoes create_lkb() */ + goto out_list; + } + _request_lock(r, lkb); + out_list: if (!ret_nodeid) process_lookup_list(r); - + out: unlock_rsb(r); put_rsb(r); - out: dlm_put_lkb(lkb); } @@ -3153,9 +3419,9 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) { if (middle_conversion(lkb)) { hold_lkb(lkb); + ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; ls->ls_stub_ms.m_result = -EINPROGRESS; ls->ls_stub_ms.m_flags = lkb->lkb_flags; - _remove_from_waiters(lkb); _receive_convert_reply(lkb, &ls->ls_stub_ms); /* Same special case as in receive_rcom_lock_args() */ @@ -3227,18 +3493,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_UNLOCK: hold_lkb(lkb); + ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; ls->ls_stub_ms.m_result = -DLM_EUNLOCK; ls->ls_stub_ms.m_flags = lkb->lkb_flags; - _remove_from_waiters(lkb); _receive_unlock_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; case DLM_MSG_CANCEL: hold_lkb(lkb); + ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; ls->ls_stub_ms.m_result = -DLM_ECANCEL; ls->ls_stub_ms.m_flags = lkb->lkb_flags; - _remove_from_waiters(lkb); _receive_cancel_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; @@ -3252,37 +3518,47 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) mutex_unlock(&ls->ls_waiters_mutex); } -static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) +static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) { struct dlm_lkb *lkb; - int rv = 0; + int found = 0; mutex_lock(&ls->ls_waiters_mutex); list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { if (lkb->lkb_flags & DLM_IFL_RESEND) { - rv = lkb->lkb_wait_type; - _remove_from_waiters(lkb); - lkb->lkb_flags &= ~DLM_IFL_RESEND; + hold_lkb(lkb); + found = 1; break; } } mutex_unlock(&ls->ls_waiters_mutex); - if (!rv) + if (!found) lkb = NULL; - *lkb_ret = lkb; - return rv; + return lkb; } /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the master or dir-node for r. Processing the lkb may result in it being placed back on waiters. */ +/* We do this after normal locking has been enabled and any saved messages + (in requestqueue) have been processed. We should be confident that at + this point we won't get or process a reply to any of these waiting + operations. But, new ops may be coming in on the rsbs/locks here from + userspace or remotely. */ + +/* there may have been an overlap unlock/cancel prior to recovery or after + recovery. if before, the lkb may still have a pos wait_count; if after, the + overlap flag would just have been set and nothing new sent. we can be + confident here than any replies to either the initial op or overlap ops + prior to recovery have been received. */ + int dlm_recover_waiters_post(struct dlm_ls *ls) { struct dlm_lkb *lkb; struct dlm_rsb *r; - int error = 0, mstype; + int error = 0, mstype, err, oc, ou; while (1) { if (dlm_locking_stopped(ls)) { @@ -3291,48 +3567,78 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) break; } - mstype = remove_resend_waiter(ls, &lkb); - if (!mstype) + lkb = find_resend_waiter(ls); + if (!lkb) break; r = lkb->lkb_resource; + hold_rsb(r); + lock_rsb(r); + + mstype = lkb->lkb_wait_type; + oc = is_overlap_cancel(lkb); + ou = is_overlap_unlock(lkb); + err = 0; log_debug(ls, "recover_waiters_post %x type %d flags %x %s", lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); - switch (mstype) { - - case DLM_MSG_LOOKUP: - hold_rsb(r); - lock_rsb(r); - _request_lock(r, lkb); - if (is_master(r)) - confirm_master(r, 0); - unlock_rsb(r); - put_rsb(r); - break; - - case DLM_MSG_REQUEST: - hold_rsb(r); - lock_rsb(r); - _request_lock(r, lkb); - if (is_master(r)) - confirm_master(r, 0); - unlock_rsb(r); - put_rsb(r); - break; - - case DLM_MSG_CONVERT: - hold_rsb(r); - lock_rsb(r); - _convert_lock(r, lkb); - unlock_rsb(r); - put_rsb(r); - break; - - default: - log_error(ls, "recover_waiters_post type %d", mstype); + /* At this point we assume that we won't get a reply to any + previous op or overlap op on this lock. First, do a big + remove_from_waiters() for all previous ops. */ + + lkb->lkb_flags &= ~DLM_IFL_RESEND; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + lkb->lkb_wait_type = 0; + lkb->lkb_wait_count = 0; + mutex_lock(&ls->ls_waiters_mutex); + list_del_init(&lkb->lkb_wait_reply); + mutex_unlock(&ls->ls_waiters_mutex); + unhold_lkb(lkb); /* for waiters list */ + + if (oc || ou) { + /* do an unlock or cancel instead of resending */ + switch (mstype) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + queue_cast(r, lkb, ou ? -DLM_EUNLOCK : + -DLM_ECANCEL); + unhold_lkb(lkb); /* undoes create_lkb() */ + break; + case DLM_MSG_CONVERT: + if (oc) { + queue_cast(r, lkb, -DLM_ECANCEL); + } else { + lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; + _unlock_lock(r, lkb); + } + break; + default: + err = 1; + } + } else { + switch (mstype) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + _request_lock(r, lkb); + if (is_master(r)) + confirm_master(r, 0); + break; + case DLM_MSG_CONVERT: + _convert_lock(r, lkb); + break; + default: + err = 1; + } } + + if (err) + log_error(ls, "recover_waiters_post %x %d %x %d %d", + lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou); + unlock_rsb(r); + put_rsb(r); + dlm_put_lkb(lkb); } return error; @@ -3684,7 +3990,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, /* add this new lkb to the per-process list of locks */ spin_lock(&ua->proc->locks_spin); - kref_get(&lkb->lkb_ref); + hold_lkb(lkb); list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); spin_unlock(&ua->proc->locks_spin); out: @@ -3774,6 +4080,9 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error == -DLM_EUNLOCK) error = 0; + /* from validate_unlock_args() */ + if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) + error = 0; if (error) goto out_put; @@ -3786,6 +4095,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, dlm_put_lkb(lkb); out: unlock_recovery(ls); + kfree(ua_tmp); return error; } @@ -3815,33 +4125,37 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error == -DLM_ECANCEL) error = 0; - if (error) - goto out_put; - - /* this lkb was removed from the WAITING queue */ - if (lkb->lkb_grmode == DLM_LOCK_IV) { - spin_lock(&ua->proc->locks_spin); - list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); - spin_unlock(&ua->proc->locks_spin); - } + /* from validate_unlock_args() */ + if (error == -EBUSY) + error = 0; out_put: dlm_put_lkb(lkb); out: unlock_recovery(ls); + kfree(ua_tmp); return error; } +/* lkb's that are removed from the waiters list by revert are just left on the + orphans list with the granted orphan locks, to be freed by purge */ + static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; + struct dlm_args args; + int error; - if (ua->lksb.sb_lvbptr) - kfree(ua->lksb.sb_lvbptr); - kfree(ua); - lkb->lkb_astparam = (long)NULL; + hold_lkb(lkb); + mutex_lock(&ls->ls_orphans_mutex); + list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); + mutex_unlock(&ls->ls_orphans_mutex); - /* TODO: propogate to master if needed */ - return 0; + set_unlock_args(0, ua, &args); + + error = cancel_lock(ls, lkb, &args); + if (error == -DLM_ECANCEL) + error = 0; + return error; } /* The force flag allows the unlock to go ahead even if the lkb isn't granted. @@ -3853,10 +4167,6 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) struct dlm_args args; int error; - /* FIXME: we need to handle the case where the lkb is in limbo - while the rsb is being looked up, currently we assert in - _unlock_lock/is_remote because rsb nodeid is -1. */ - set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args); error = unlock_lock(ls, lkb, &args); @@ -3865,6 +4175,31 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) return error; } +/* We have to release clear_proc_locks mutex before calling unlock_proc_lock() + (which does lock_rsb) due to deadlock with receiving a message that does + lock_rsb followed by dlm_user_add_ast() */ + +static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, + struct dlm_user_proc *proc) +{ + struct dlm_lkb *lkb = NULL; + + mutex_lock(&ls->ls_clear_proc_locks); + if (list_empty(&proc->locks)) + goto out; + + lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue); + list_del_init(&lkb->lkb_ownqueue); + + if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) + lkb->lkb_flags |= DLM_IFL_ORPHAN; + else + lkb->lkb_flags |= DLM_IFL_DEAD; + out: + mutex_unlock(&ls->ls_clear_proc_locks); + return lkb; +} + /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, which we clear here. */ @@ -3880,18 +4215,15 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) struct dlm_lkb *lkb, *safe; lock_recovery(ls); - mutex_lock(&ls->ls_clear_proc_locks); - list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) { - list_del_init(&lkb->lkb_ownqueue); - - if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) { - lkb->lkb_flags |= DLM_IFL_ORPHAN; + while (1) { + lkb = del_proc_lock(ls, proc); + if (!lkb) + break; + if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) orphan_proc_lock(ls, lkb); - } else { - lkb->lkb_flags |= DLM_IFL_DEAD; + else unlock_proc_lock(ls, lkb); - } /* this removes the reference for the proc->locks list added by dlm_user_request, it may result in the lkb @@ -3900,6 +4232,8 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); } + mutex_lock(&ls->ls_clear_proc_locks); + /* in-progress unlocks */ list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { list_del_init(&lkb->lkb_ownqueue); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index f40817b53c6..f607ca2f079 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -459,6 +459,8 @@ static int new_lockspace(char *name, int namelen, void **lockspace, INIT_LIST_HEAD(&ls->ls_waiters); mutex_init(&ls->ls_waiters_mutex); + INIT_LIST_HEAD(&ls->ls_orphans); + mutex_init(&ls->ls_orphans_mutex); INIT_LIST_HEAD(&ls->ls_nodes); INIT_LIST_HEAD(&ls->ls_nodes_gone); diff --git a/fs/dlm/user.c b/fs/dlm/user.c index 27a75ce571c..c978c67b1ef 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2006 Red Hat, Inc. All rights reserved. + * Copyright (C) 2006-2007 Red Hat, Inc. All rights reserved. * * This copyrighted material is made available to anyone wishing to use, * modify, copy, or redistribute it subject to the terms and conditions @@ -128,35 +128,30 @@ static void compat_output(struct dlm_lock_result *res, } #endif +/* we could possibly check if the cancel of an orphan has resulted in the lkb + being removed and then remove that lkb from the orphans list and free it */ void dlm_user_add_ast(struct dlm_lkb *lkb, int type) { struct dlm_ls *ls; struct dlm_user_args *ua; struct dlm_user_proc *proc; - int remove_ownqueue = 0; + int eol = 0, ast_type; - /* dlm_clear_proc_locks() sets ORPHAN/DEAD flag on each - lkb before dealing with it. We need to check this - flag before taking ls_clear_proc_locks mutex because if - it's set, dlm_clear_proc_locks() holds the mutex. */ - - if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) { - /* log_print("user_add_ast skip1 %x", lkb->lkb_flags); */ + if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) return; - } ls = lkb->lkb_resource->res_ls; mutex_lock(&ls->ls_clear_proc_locks); /* If ORPHAN/DEAD flag is set, it means the process is dead so an ast can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed - lkb->ua so we can't try to use it. */ + lkb->ua so we can't try to use it. This second check is necessary + for cases where a completion ast is received for an operation that + began before clear_proc_locks did its cancel/unlock. */ - if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) { - /* log_print("user_add_ast skip2 %x", lkb->lkb_flags); */ + if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) goto out; - } DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb);); ua = (struct dlm_user_args *)lkb->lkb_astparam; @@ -166,28 +161,42 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type) goto out; spin_lock(&proc->asts_spin); - if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) { + + ast_type = lkb->lkb_ast_type; + lkb->lkb_ast_type |= type; + + if (!ast_type) { kref_get(&lkb->lkb_ref); list_add_tail(&lkb->lkb_astqueue, &proc->asts); - lkb->lkb_ast_type |= type; wake_up_interruptible(&proc->wait); } - - /* noqueue requests that fail may need to be removed from the - proc's locks list, there should be a better way of detecting - this situation than checking all these things... */ - - if (type == AST_COMP && lkb->lkb_grmode == DLM_LOCK_IV && - ua->lksb.sb_status == -EAGAIN && !list_empty(&lkb->lkb_ownqueue)) - remove_ownqueue = 1; - - /* unlocks or cancels of waiting requests need to be removed from the - proc's unlocking list, again there must be a better way... */ - - if (ua->lksb.sb_status == -DLM_EUNLOCK || + if (type == AST_COMP && (ast_type & AST_COMP)) + log_debug(ls, "ast overlap %x status %x %x", + lkb->lkb_id, ua->lksb.sb_status, lkb->lkb_flags); + + /* Figure out if this lock is at the end of its life and no longer + available for the application to use. The lkb still exists until + the final ast is read. A lock becomes EOL in three situations: + 1. a noqueue request fails with EAGAIN + 2. an unlock completes with EUNLOCK + 3. a cancel of a waiting request completes with ECANCEL + An EOL lock needs to be removed from the process's list of locks. + And we can't allow any new operation on an EOL lock. This is + not related to the lifetime of the lkb struct which is managed + entirely by refcount. */ + + if (type == AST_COMP && + lkb->lkb_grmode == DLM_LOCK_IV && + ua->lksb.sb_status == -EAGAIN) + eol = 1; + else if (ua->lksb.sb_status == -DLM_EUNLOCK || (ua->lksb.sb_status == -DLM_ECANCEL && lkb->lkb_grmode == DLM_LOCK_IV)) - remove_ownqueue = 1; + eol = 1; + if (eol) { + lkb->lkb_ast_type &= ~AST_BAST; + lkb->lkb_flags |= DLM_IFL_ENDOFLIFE; + } /* We want to copy the lvb to userspace when the completion ast is read if the status is 0, the lock has an lvb and @@ -204,11 +213,13 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type) spin_unlock(&proc->asts_spin); - if (remove_ownqueue) { + if (eol) { spin_lock(&ua->proc->locks_spin); - list_del_init(&lkb->lkb_ownqueue); + if (!list_empty(&lkb->lkb_ownqueue)) { + list_del_init(&lkb->lkb_ownqueue); + dlm_put_lkb(lkb); + } spin_unlock(&ua->proc->locks_spin); - dlm_put_lkb(lkb); } out: mutex_unlock(&ls->ls_clear_proc_locks); -- cgit v1.2.3 From b9af8a788ade3435b53667873774b5366cf73f58 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Wed, 28 Mar 2007 11:08:04 -0500 Subject: [GFS2] use log_error before LM_OUT_ERROR We always want to see the details of the error returned to gfs, but log_debug is often turned off, so use log_error (printk). Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/gfs2/locking/dlm/lock.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/locking/dlm/lock.c b/fs/gfs2/locking/dlm/lock.c index f9c8bda289a..c305255bfe8 100644 --- a/fs/gfs2/locking/dlm/lock.c +++ b/fs/gfs2/locking/dlm/lock.c @@ -266,7 +266,7 @@ unsigned int gdlm_do_lock(struct gdlm_lock *lp) } if (error) { - log_debug("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x " + log_error("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x " "flags=%lx", ls->fsname, lp->lockname.ln_type, (unsigned long long)lp->lockname.ln_number, error, lp->cur, lp->req, lp->lkf, lp->flags); @@ -294,7 +294,7 @@ static unsigned int gdlm_do_unlock(struct gdlm_lock *lp) error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp); if (error) { - log_debug("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x " + log_error("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x " "flags=%lx", ls->fsname, lp->lockname.ln_type, (unsigned long long)lp->lockname.ln_number, error, lp->cur, lp->req, lp->lkf, lp->flags); -- cgit v1.2.3 From f01963f2648cfd708ee8d521b3737cfa55ea8795 Mon Sep 17 00:00:00 2001 From: Steven Whitehouse Date: Mon, 2 Apr 2007 10:03:24 +0100 Subject: [GFS2] Set drop_count to 0 (off) by default This sets the drop_count to 0 by default which is a better default for most people. Signed-off-by: Steven Whitehouse --- fs/gfs2/locking/dlm/lock_dlm.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/gfs2/locking/dlm/lock_dlm.h b/fs/gfs2/locking/dlm/lock_dlm.h index 6888bd49c31..d074c6e6f9b 100644 --- a/fs/gfs2/locking/dlm/lock_dlm.h +++ b/fs/gfs2/locking/dlm/lock_dlm.h @@ -36,7 +36,7 @@ #define GDLM_STRNAME_BYTES 24 #define GDLM_LVB_SIZE 32 -#define GDLM_DROP_COUNT 200000 +#define GDLM_DROP_COUNT 0 #define GDLM_DROP_PERIOD 60 #define GDLM_NAME_LEN 128 -- cgit v1.2.3 From 7e4dac33594468153c38b5c94d8ebcafb0e0a68d Mon Sep 17 00:00:00 2001 From: David Teigland Date: Mon, 2 Apr 2007 09:06:41 -0500 Subject: [DLM] split create_message function This splits the current create_message() function into two parts so that later patches can call the new lower-level _create_message() function when they don't have an rsb struct. No functional change in this patch. Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/dlm/lock.c | 54 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 22 deletions(-) (limited to 'fs') diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index b865a46059d..7807958846c 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -2301,31 +2301,14 @@ int dlm_unlock(dlm_lockspace_t *lockspace, * receive_lookup_reply send_lookup_reply */ -static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, - int to_nodeid, int mstype, - struct dlm_message **ms_ret, - struct dlm_mhandle **mh_ret) +static int _create_message(struct dlm_ls *ls, int mb_len, + int to_nodeid, int mstype, + struct dlm_message **ms_ret, + struct dlm_mhandle **mh_ret) { struct dlm_message *ms; struct dlm_mhandle *mh; char *mb; - int mb_len = sizeof(struct dlm_message); - - switch (mstype) { - case DLM_MSG_REQUEST: - case DLM_MSG_LOOKUP: - case DLM_MSG_REMOVE: - mb_len += r->res_length; - break; - case DLM_MSG_CONVERT: - case DLM_MSG_UNLOCK: - case DLM_MSG_REQUEST_REPLY: - case DLM_MSG_CONVERT_REPLY: - case DLM_MSG_GRANT: - if (lkb && lkb->lkb_lvbptr) - mb_len += r->res_ls->ls_lvblen; - break; - } /* get_buffer gives us a message handle (mh) that we need to pass into lowcomms_commit and a message buffer (mb) that we @@ -2340,7 +2323,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, ms = (struct dlm_message *) mb; ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); - ms->m_header.h_lockspace = r->res_ls->ls_global_id; + ms->m_header.h_lockspace = ls->ls_global_id; ms->m_header.h_nodeid = dlm_our_nodeid(); ms->m_header.h_length = mb_len; ms->m_header.h_cmd = DLM_MSG; @@ -2352,6 +2335,33 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, return 0; } +static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, + int to_nodeid, int mstype, + struct dlm_message **ms_ret, + struct dlm_mhandle **mh_ret) +{ + int mb_len = sizeof(struct dlm_message); + + switch (mstype) { + case DLM_MSG_REQUEST: + case DLM_MSG_LOOKUP: + case DLM_MSG_REMOVE: + mb_len += r->res_length; + break; + case DLM_MSG_CONVERT: + case DLM_MSG_UNLOCK: + case DLM_MSG_REQUEST_REPLY: + case DLM_MSG_CONVERT_REPLY: + case DLM_MSG_GRANT: + if (lkb && lkb->lkb_lvbptr) + mb_len += r->res_ls->ls_lvblen; + break; + } + + return _create_message(r->res_ls, mb_len, to_nodeid, mstype, + ms_ret, mh_ret); +} + /* further lowcomms enhancements or alternate implementations may make the return value from this function useful at some point */ -- cgit v1.2.3 From 8499137d4ef1829281e04838113b6b09a0bf1269 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Fri, 30 Mar 2007 15:02:40 -0500 Subject: [DLM] add orphan purging code (1/2) Add code for purging orphan locks. A process can also purge all of its own non-orphan locks by passing a pid of zero. Code already exists for processes to create persistent locks that become orphans when the process exits, but the complimentary capability for another process to then purge these orphans has been missing. Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/dlm/dlm_internal.h | 1 + fs/dlm/lock.c | 101 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) (limited to 'fs') diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 178931cca67..30994d68f6a 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -342,6 +342,7 @@ struct dlm_header { #define DLM_MSG_LOOKUP 11 #define DLM_MSG_REMOVE 12 #define DLM_MSG_LOOKUP_REPLY 13 +#define DLM_MSG_PURGE 14 struct dlm_message { struct dlm_header m_header; diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 7807958846c..9d26b3a3967 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -85,6 +85,7 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms); static int receive_extralen(struct dlm_message *ms); +static void do_purge(struct dlm_ls *ls, int nodeid, int pid); /* * Lock compatibilty matrix - thanks Steve @@ -2987,6 +2988,11 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); } +static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) +{ + do_purge(ls, ms->m_nodeid, ms->m_pid); +} + static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; @@ -3409,6 +3415,12 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) receive_lookup_reply(ls, ms); break; + /* other messages */ + + case DLM_MSG_PURGE: + receive_purge(ls, ms); + break; + default: log_error(ls, "unknown message type %d", ms->m_type); } @@ -4260,3 +4272,92 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) unlock_recovery(ls); } +static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) +{ + struct dlm_lkb *lkb, *safe; + + while (1) { + lkb = NULL; + spin_lock(&proc->locks_spin); + if (!list_empty(&proc->locks)) { + lkb = list_entry(proc->locks.next, struct dlm_lkb, + lkb_ownqueue); + list_del_init(&lkb->lkb_ownqueue); + } + spin_unlock(&proc->locks_spin); + + if (!lkb) + break; + + lkb->lkb_flags |= DLM_IFL_DEAD; + unlock_proc_lock(ls, lkb); + dlm_put_lkb(lkb); /* ref from proc->locks list */ + } + + spin_lock(&proc->locks_spin); + list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { + list_del_init(&lkb->lkb_ownqueue); + lkb->lkb_flags |= DLM_IFL_DEAD; + dlm_put_lkb(lkb); + } + spin_unlock(&proc->locks_spin); + + spin_lock(&proc->asts_spin); + list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { + list_del(&lkb->lkb_astqueue); + dlm_put_lkb(lkb); + } + spin_unlock(&proc->asts_spin); +} + +/* pid of 0 means purge all orphans */ + +static void do_purge(struct dlm_ls *ls, int nodeid, int pid) +{ + struct dlm_lkb *lkb, *safe; + + mutex_lock(&ls->ls_orphans_mutex); + list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) { + if (pid && lkb->lkb_ownpid != pid) + continue; + unlock_proc_lock(ls, lkb); + list_del_init(&lkb->lkb_ownqueue); + dlm_put_lkb(lkb); + } + mutex_unlock(&ls->ls_orphans_mutex); +} + +static int send_purge(struct dlm_ls *ls, int nodeid, int pid) +{ + struct dlm_message *ms; + struct dlm_mhandle *mh; + int error; + + error = _create_message(ls, sizeof(struct dlm_message), nodeid, + DLM_MSG_PURGE, &ms, &mh); + if (error) + return error; + ms->m_nodeid = nodeid; + ms->m_pid = pid; + + return send_message(mh, ms); +} + +int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, + int nodeid, int pid) +{ + int error = 0; + + if (nodeid != dlm_our_nodeid()) { + error = send_purge(ls, nodeid, pid); + } else { + lock_recovery(ls); + if (pid == current->pid) + purge_proc_locks(ls, proc); + else + do_purge(ls, nodeid, pid); + unlock_recovery(ls); + } + return error; +} + -- cgit v1.2.3 From 72c2be776bd6eec5186e316e6d9dd4aab78d314d Mon Sep 17 00:00:00 2001 From: David Teigland Date: Fri, 30 Mar 2007 15:06:16 -0500 Subject: [DLM] interface for purge (2/2) Add code to accept purge commands from userland. Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/dlm/lock.h | 2 ++ fs/dlm/user.c | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+) (limited to 'fs') diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index 0843a3073ec..64fc4ec4066 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -41,6 +41,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, uint32_t flags, uint32_t lkid, char *lvb_in); int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, uint32_t flags, uint32_t lkid); +int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, + int nodeid, int pid); void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc); static inline int is_master(struct dlm_rsb *r) diff --git a/fs/dlm/user.c b/fs/dlm/user.c index c978c67b1ef..3e746a6ebac 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -56,6 +56,7 @@ struct dlm_write_request32 { union { struct dlm_lock_params32 lock; struct dlm_lspace_params lspace; + struct dlm_purge_params purge; } i; }; @@ -92,6 +93,9 @@ static void compat_input(struct dlm_write_request *kb, kb->i.lspace.flags = kb32->i.lspace.flags; kb->i.lspace.minor = kb32->i.lspace.minor; strcpy(kb->i.lspace.name, kb32->i.lspace.name); + } else if (kb->cmd == DLM_USER_PURGE) { + kb->i.purge.nodeid = kb32->i.purge.nodeid; + kb->i.purge.pid = kb32->i.purge.pid; } else { kb->i.lock.mode = kb32->i.lock.mode; kb->i.lock.namelen = kb32->i.lock.namelen; @@ -320,6 +324,22 @@ fail: return error; } +static int device_user_purge(struct dlm_user_proc *proc, + struct dlm_purge_params *params) +{ + struct dlm_ls *ls; + int error; + + ls = dlm_find_lockspace_local(proc->lockspace); + if (!ls) + return -ENOENT; + + error = dlm_user_purge(ls, proc, params->nodeid, params->pid); + + dlm_put_lockspace(ls); + return error; +} + static int device_create_lockspace(struct dlm_lspace_params *params) { dlm_lockspace_t *lockspace; @@ -522,6 +542,14 @@ static ssize_t device_write(struct file *file, const char __user *buf, error = device_remove_lockspace(&kbuf->i.lspace); break; + case DLM_USER_PURGE: + if (!proc) { + log_print("no locking on control device"); + goto out_sig; + } + error = device_user_purge(proc, &kbuf->i.purge); + break; + default: log_print("Unknown command passed to DLM device : %d\n", kbuf->cmd); -- cgit v1.2.3 From ce03f12b37c0bd81ad707d3642241cc55c944711 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Mon, 2 Apr 2007 12:12:55 -0500 Subject: [DLM] change lkid format A lock id is a uint32 and is used as an opaque reference to the lock. For userland apps, the lkid is passed up, through libdlm, as the return value from a write() on the dlm device. This created a problem when the high bit was 1, making the lkid look like an error. This is fixed by changing how the lkid is composed. The low 16 bits identified the hash bucket for the lock and the high 16 bits were a per-bucket counter (which eventually hit 0x8000 causing the problem). These are simply swapped around; the number of hash table buckets is far below 0x8000, making all lkid's positive when viewed as signed. Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/dlm/lock.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'fs') diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 9d26b3a3967..eac54d230fd 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -580,7 +580,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) /* counter can roll over so we must verify lkid is not in use */ while (lkid == 0) { - lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16); + lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++; list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { @@ -601,8 +601,8 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) { - uint16_t bucket = lkid & 0xFFFF; struct dlm_lkb *lkb; + uint16_t bucket = (lkid >> 16); list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { if (lkb->lkb_id == lkid) @@ -614,7 +614,7 @@ static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - uint16_t bucket = lkid & 0xFFFF; + uint16_t bucket = (lkid >> 16); if (bucket >= ls->ls_lkbtbl_size) return -EBADSLT; @@ -644,7 +644,7 @@ static void kill_lkb(struct kref *kref) static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) { - uint16_t bucket = lkb->lkb_id & 0xFFFF; + uint16_t bucket = (lkb->lkb_id >> 16); write_lock(&ls->ls_lkbtbl[bucket].lock); if (kref_put(&lkb->lkb_ref, kill_lkb)) { -- cgit v1.2.3 From a43a49066d36612f3bb46653cdb265a89c235eff Mon Sep 17 00:00:00 2001 From: Steven Whitehouse Date: Mon, 2 Apr 2007 10:48:17 +0100 Subject: [GFS2] Fix bz 234168 (ignoring rgrp flags) Ths following patch makes GFS2 use the rgrp flags properly. Although there are also separate flags for both data and metadata as well, I've not implemented these as there seems little use for them. On the otherhand, the "noalloc" flag is generally useful for future changes we might which to make, so this ensures that we interpret it correctly. In addition I fixed the comment above the function which was incorrect. Signed-off-by: Steven Whitehouse --- fs/gfs2/rgrp.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/rgrp.c b/fs/gfs2/rgrp.c index 2ce48d4f246..1727f5012ef 100644 --- a/fs/gfs2/rgrp.c +++ b/fs/gfs2/rgrp.c @@ -698,8 +698,6 @@ struct gfs2_alloc *gfs2_alloc_get(struct gfs2_inode *ip) * @al: the struct gfs2_alloc structure describing the reservation * * If there's room for the requested blocks to be allocated from the RG: - * Sets the $al_reserved_data field in @al. - * Sets the $al_reserved_meta field in @al. * Sets the $al_rgd field in @al. * * Returns: 1 on success (it fits), 0 on failure (it doesn't fit) @@ -710,6 +708,9 @@ static int try_rgrp_fit(struct gfs2_rgrpd *rgd, struct gfs2_alloc *al) struct gfs2_sbd *sdp = rgd->rd_sbd; int ret = 0; + if (rgd->rd_rg.rg_flags & GFS2_RGF_NOALLOC) + return 0; + spin_lock(&sdp->sd_rindex_spin); if (rgd->rd_free_clone >= al->al_requested) { al->al_rgd = rgd; -- cgit v1.2.3 From fc7c44f03d95f20b5446d06f5bb9605cddd53203 Mon Sep 17 00:00:00 2001 From: Patrick Caulfield Date: Tue, 10 Apr 2007 09:40:19 +0100 Subject: [DLM] Remove redundant assignment This patch removes a redundant (and incorrect) assignment from compat_output Signed-Off-By: Patrick Caulfield Signed-off-by: Steven Whitehouse --- fs/dlm/user.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'fs') diff --git a/fs/dlm/user.c b/fs/dlm/user.c index 3e746a6ebac..b0201ec325a 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -115,8 +115,6 @@ static void compat_input(struct dlm_write_request *kb, static void compat_output(struct dlm_lock_result *res, struct dlm_lock_result32 *res32) { - res32->length = res->length - (sizeof(struct dlm_lock_result) - - sizeof(struct dlm_lock_result32)); res32->user_astaddr = (__u32)(long)res->user_astaddr; res32->user_astparam = (__u32)(long)res->user_astparam; res32->user_lksb = (__u32)(long)res->user_lksb; -- cgit v1.2.3 From 6ed7257b46709e87d79ac2b6b819b7e0c9184998 Mon Sep 17 00:00:00 2001 From: Patrick Caulfield Date: Tue, 17 Apr 2007 15:39:57 +0100 Subject: [DLM] Consolidate transport protocols This patch consolidates the TCP & SCTP protocols for the DLM into a single file and makes it switchable at run-time (well, at least before the DLM actually starts up!) For RHEL5 this patch requires Neil Horman's patch that expands the in-kernel socket API but that has already been twice ACKed so it should be OK. The patch adds a new lowcomms.c file that replaces the existing lowcomms-sctp.c & lowcomms-tcp.c files. Signed-off-By: Patrick Caulfield Signed-off-by: Steven Whitehouse --- fs/dlm/Kconfig | 31 +- fs/dlm/Makefile | 6 +- fs/dlm/config.c | 10 +- fs/dlm/config.h | 3 +- fs/dlm/lowcomms-sctp.c | 1210 --------------------------------------- fs/dlm/lowcomms-tcp.c | 1006 --------------------------------- fs/dlm/lowcomms.c | 1468 ++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 1487 insertions(+), 2247 deletions(-) delete mode 100644 fs/dlm/lowcomms-sctp.c delete mode 100644 fs/dlm/lowcomms-tcp.c create mode 100644 fs/dlm/lowcomms.c (limited to 'fs') diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index 6fa7b0d5c04..69a94690e49 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig @@ -3,36 +3,19 @@ menu "Distributed Lock Manager" config DLM tristate "Distributed Lock Manager (DLM)" - depends on SYSFS && (IPV6 || IPV6=n) + depends on IPV6 || IPV6=n select CONFIGFS_FS - select IP_SCTP if DLM_SCTP + select IP_SCTP help - A general purpose distributed lock manager for kernel or userspace - applications. - -choice - prompt "Select DLM communications protocol" - depends on DLM - default DLM_TCP - help - The DLM Can use TCP or SCTP for it's network communications. - SCTP supports multi-homed operations whereas TCP doesn't. - However, SCTP seems to have stability problems at the moment. - -config DLM_TCP - bool "TCP/IP" - -config DLM_SCTP - bool "SCTP" - -endchoice + A general purpose distributed lock manager for kernel or userspace + applications. config DLM_DEBUG bool "DLM debugging" depends on DLM help - Under the debugfs mount point, the name of each lockspace will - appear as a file in the "dlm" directory. The output is the - list of resource and locks the local node knows about. + Under the debugfs mount point, the name of each lockspace will + appear as a file in the "dlm" directory. The output is the + list of resource and locks the local node knows about. endmenu diff --git a/fs/dlm/Makefile b/fs/dlm/Makefile index 65388944eba..604cf7dc5f3 100644 --- a/fs/dlm/Makefile +++ b/fs/dlm/Makefile @@ -8,14 +8,12 @@ dlm-y := ast.o \ member.o \ memory.o \ midcomms.o \ + lowcomms.o \ rcom.o \ recover.o \ recoverd.o \ requestqueue.o \ user.o \ - util.o + util.o dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o -dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o - -dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o \ No newline at end of file diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 8665c88e5af..822abdcd143 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -89,6 +89,7 @@ struct cluster { unsigned int cl_toss_secs; unsigned int cl_scan_secs; unsigned int cl_log_debug; + unsigned int cl_protocol; }; enum { @@ -101,6 +102,7 @@ enum { CLUSTER_ATTR_TOSS_SECS, CLUSTER_ATTR_SCAN_SECS, CLUSTER_ATTR_LOG_DEBUG, + CLUSTER_ATTR_PROTOCOL, }; struct cluster_attribute { @@ -159,6 +161,7 @@ CLUSTER_ATTR(recover_timer, 1); CLUSTER_ATTR(toss_secs, 1); CLUSTER_ATTR(scan_secs, 1); CLUSTER_ATTR(log_debug, 0); +CLUSTER_ATTR(protocol, 0); static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, @@ -170,6 +173,7 @@ static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr, + [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr, NULL, }; @@ -904,6 +908,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_TOSS_SECS 10 #define DEFAULT_SCAN_SECS 5 #define DEFAULT_LOG_DEBUG 0 +#define DEFAULT_PROTOCOL 0 struct dlm_config_info dlm_config = { .ci_tcp_port = DEFAULT_TCP_PORT, @@ -914,6 +919,7 @@ struct dlm_config_info dlm_config = { .ci_recover_timer = DEFAULT_RECOVER_TIMER, .ci_toss_secs = DEFAULT_TOSS_SECS, .ci_scan_secs = DEFAULT_SCAN_SECS, - .ci_log_debug = DEFAULT_LOG_DEBUG + .ci_log_debug = DEFAULT_LOG_DEBUG, + .ci_protocol = DEFAULT_PROTOCOL }; diff --git a/fs/dlm/config.h b/fs/dlm/config.h index 1e978611a96..967cc3d72e5 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -26,6 +26,7 @@ struct dlm_config_info { int ci_toss_secs; int ci_scan_secs; int ci_log_debug; + int ci_protocol; }; extern struct dlm_config_info dlm_config; diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c deleted file mode 100644 index dc83a9d979b..00000000000 --- a/fs/dlm/lowcomms-sctp.c +++ /dev/null @@ -1,1210 +0,0 @@ -/****************************************************************************** -******************************************************************************* -** -** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved. -** -** This copyrighted material is made available to anyone wishing to use, -** modify, copy, or redistribute it subject to the terms and conditions -** of the GNU General Public License v.2. -** -******************************************************************************* -******************************************************************************/ - -/* - * lowcomms.c - * - * This is the "low-level" comms layer. - * - * It is responsible for sending/receiving messages - * from other nodes in the cluster. - * - * Cluster nodes are referred to by their nodeids. nodeids are - * simply 32 bit numbers to the locking module - if they need to - * be expanded for the cluster infrastructure then that is it's - * responsibility. It is this layer's - * responsibility to resolve these into IP address or - * whatever it needs for inter-node communication. - * - * The comms level is two kernel threads that deal mainly with - * the receiving of messages from other nodes and passing them - * up to the mid-level comms layer (which understands the - * message format) for execution by the locking core, and - * a send thread which does all the setting up of connections - * to remote nodes and the sending of data. Threads are not allowed - * to send their own data because it may cause them to wait in times - * of high load. Also, this way, the sending thread can collect together - * messages bound for one node and send them in one block. - * - * I don't see any problem with the recv thread executing the locking - * code on behalf of remote processes as the locking code is - * short, efficient and never (well, hardly ever) waits. - * - */ - -#include -#include -#include -#include -#include -#include -#include - -#include "dlm_internal.h" -#include "lowcomms.h" -#include "config.h" -#include "midcomms.h" - -static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; -static int dlm_local_count; -static int dlm_local_nodeid; - -/* One of these per connected node */ - -#define NI_INIT_PENDING 1 -#define NI_WRITE_PENDING 2 - -struct nodeinfo { - spinlock_t lock; - sctp_assoc_t assoc_id; - unsigned long flags; - struct list_head write_list; /* nodes with pending writes */ - struct list_head writequeue; /* outgoing writequeue_entries */ - spinlock_t writequeue_lock; - int nodeid; - struct work_struct swork; /* Send workqueue */ - struct work_struct lwork; /* Locking workqueue */ -}; - -static DEFINE_IDR(nodeinfo_idr); -static DECLARE_RWSEM(nodeinfo_lock); -static int max_nodeid; - -struct cbuf { - unsigned int base; - unsigned int len; - unsigned int mask; -}; - -/* Just the one of these, now. But this struct keeps - the connection-specific variables together */ - -#define CF_READ_PENDING 1 - -struct connection { - struct socket *sock; - unsigned long flags; - struct page *rx_page; - atomic_t waiting_requests; - struct cbuf cb; - int eagain_flag; - struct work_struct work; /* Send workqueue */ -}; - -/* An entry waiting to be sent */ - -struct writequeue_entry { - struct list_head list; - struct page *page; - int offset; - int len; - int end; - int users; - struct nodeinfo *ni; -}; - -static void cbuf_add(struct cbuf *cb, int n) -{ - cb->len += n; -} - -static int cbuf_data(struct cbuf *cb) -{ - return ((cb->base + cb->len) & cb->mask); -} - -static void cbuf_init(struct cbuf *cb, int size) -{ - cb->base = cb->len = 0; - cb->mask = size-1; -} - -static void cbuf_eat(struct cbuf *cb, int n) -{ - cb->len -= n; - cb->base += n; - cb->base &= cb->mask; -} - -/* List of nodes which have writes pending */ -static LIST_HEAD(write_nodes); -static DEFINE_SPINLOCK(write_nodes_lock); - - -/* Maximum number of incoming messages to process before - * doing a schedule() - */ -#define MAX_RX_MSG_COUNT 25 - -/* Work queues */ -static struct workqueue_struct *recv_workqueue; -static struct workqueue_struct *send_workqueue; -static struct workqueue_struct *lock_workqueue; - -/* The SCTP connection */ -static struct connection sctp_con; - -static void process_send_sockets(struct work_struct *work); -static void process_recv_sockets(struct work_struct *work); -static void process_lock_request(struct work_struct *work); - -static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) -{ - struct sockaddr_storage addr; - int error; - - if (!dlm_local_count) - return -1; - - error = dlm_nodeid_to_addr(nodeid, &addr); - if (error) - return error; - - if (dlm_local_addr[0]->ss_family == AF_INET) { - struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; - struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; - ret4->sin_addr.s_addr = in4->sin_addr.s_addr; - } else { - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; - struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; - memcpy(&ret6->sin6_addr, &in6->sin6_addr, - sizeof(in6->sin6_addr)); - } - - return 0; -} - -/* If alloc is 0 here we will not attempt to allocate a new - nodeinfo struct */ -static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) -{ - struct nodeinfo *ni; - int r; - int n; - - down_read(&nodeinfo_lock); - ni = idr_find(&nodeinfo_idr, nodeid); - up_read(&nodeinfo_lock); - - if (ni || !alloc) - return ni; - - down_write(&nodeinfo_lock); - - ni = idr_find(&nodeinfo_idr, nodeid); - if (ni) - goto out_up; - - r = idr_pre_get(&nodeinfo_idr, alloc); - if (!r) - goto out_up; - - ni = kmalloc(sizeof(struct nodeinfo), alloc); - if (!ni) - goto out_up; - - r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n); - if (r) { - kfree(ni); - ni = NULL; - goto out_up; - } - if (n != nodeid) { - idr_remove(&nodeinfo_idr, n); - kfree(ni); - ni = NULL; - goto out_up; - } - memset(ni, 0, sizeof(struct nodeinfo)); - spin_lock_init(&ni->lock); - INIT_LIST_HEAD(&ni->writequeue); - spin_lock_init(&ni->writequeue_lock); - INIT_WORK(&ni->lwork, process_lock_request); - INIT_WORK(&ni->swork, process_send_sockets); - ni->nodeid = nodeid; - - if (nodeid > max_nodeid) - max_nodeid = nodeid; -out_up: - up_write(&nodeinfo_lock); - - return ni; -} - -/* Don't call this too often... */ -static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) -{ - int i; - struct nodeinfo *ni; - - for (i=1; i<=max_nodeid; i++) { - ni = nodeid2nodeinfo(i, 0); - if (ni && ni->assoc_id == assoc) - return ni; - } - return NULL; -} - -/* Data or notification available on socket */ -static void lowcomms_data_ready(struct sock *sk, int count_unused) -{ - if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) - queue_work(recv_workqueue, &sctp_con.work); -} - - -/* Add the port number to an IP6 or 4 sockaddr and return the address length. - Also padd out the struct with zeros to make comparisons meaningful */ - -static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, - int *addr_len) -{ - struct sockaddr_in *local4_addr; - struct sockaddr_in6 *local6_addr; - - if (!dlm_local_count) - return; - - if (!port) { - if (dlm_local_addr[0]->ss_family == AF_INET) { - local4_addr = (struct sockaddr_in *)dlm_local_addr[0]; - port = be16_to_cpu(local4_addr->sin_port); - } else { - local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0]; - port = be16_to_cpu(local6_addr->sin6_port); - } - } - - saddr->ss_family = dlm_local_addr[0]->ss_family; - if (dlm_local_addr[0]->ss_family == AF_INET) { - struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; - in4_addr->sin_port = cpu_to_be16(port); - memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); - memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) - - sizeof(struct sockaddr_in)); - *addr_len = sizeof(struct sockaddr_in); - } else { - struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; - in6_addr->sin6_port = cpu_to_be16(port); - memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) - - sizeof(struct sockaddr_in6)); - *addr_len = sizeof(struct sockaddr_in6); - } -} - -/* Close the connection and tidy up */ -static void close_connection(void) -{ - if (sctp_con.sock) { - sock_release(sctp_con.sock); - sctp_con.sock = NULL; - } - - if (sctp_con.rx_page) { - __free_page(sctp_con.rx_page); - sctp_con.rx_page = NULL; - } -} - -/* We only send shutdown messages to nodes that are not part of the cluster */ -static void send_shutdown(sctp_assoc_t associd) -{ - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct msghdr outmessage; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - int ret; - - outmessage.msg_name = NULL; - outmessage.msg_namelen = 0; - outmessage.msg_control = outcmsg; - outmessage.msg_controllen = sizeof(outcmsg); - outmessage.msg_flags = MSG_EOR; - - cmsg = CMSG_FIRSTHDR(&outmessage); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - outmessage.msg_controllen = cmsg->cmsg_len; - sinfo = CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - - sinfo->sinfo_flags |= MSG_EOF; - sinfo->sinfo_assoc_id = associd; - - ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0); - - if (ret != 0) - log_print("send EOF to node failed: %d", ret); -} - - -/* INIT failed but we don't know which node... - restart INIT on all pending nodes */ -static void init_failed(void) -{ - int i; - struct nodeinfo *ni; - - for (i=1; i<=max_nodeid; i++) { - ni = nodeid2nodeinfo(i, 0); - if (!ni) - continue; - - if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) { - ni->assoc_id = 0; - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - queue_work(send_workqueue, &ni->swork); - } - } - } -} - -/* Something happened to an association */ -static void process_sctp_notification(struct msghdr *msg, char *buf) -{ - union sctp_notification *sn = (union sctp_notification *)buf; - - if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { - switch (sn->sn_assoc_change.sac_state) { - - case SCTP_COMM_UP: - case SCTP_RESTART: - { - /* Check that the new node is in the lockspace */ - struct sctp_prim prim; - mm_segment_t fs; - int nodeid; - int prim_len, ret; - int addr_len; - struct nodeinfo *ni; - - /* This seems to happen when we received a connection - * too early... or something... anyway, it happens but - * we always seem to get a real message too, see - * receive_from_sock */ - - if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { - log_print("COMM_UP for invalid assoc ID %d", - (int)sn->sn_assoc_change.sac_assoc_id); - init_failed(); - return; - } - memset(&prim, 0, sizeof(struct sctp_prim)); - prim_len = sizeof(struct sctp_prim); - prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; - - fs = get_fs(); - set_fs(get_ds()); - ret = sctp_con.sock->ops->getsockopt(sctp_con.sock, - IPPROTO_SCTP, - SCTP_PRIMARY_ADDR, - (char*)&prim, - &prim_len); - set_fs(fs); - if (ret < 0) { - struct nodeinfo *ni; - - log_print("getsockopt/sctp_primary_addr on " - "new assoc %d failed : %d", - (int)sn->sn_assoc_change.sac_assoc_id, - ret); - - /* Retry INIT later */ - ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); - if (ni) - clear_bit(NI_INIT_PENDING, &ni->flags); - return; - } - make_sockaddr(&prim.ssp_addr, 0, &addr_len); - if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { - log_print("reject connect from unknown addr"); - send_shutdown(prim.ssp_assoc_id); - return; - } - - ni = nodeid2nodeinfo(nodeid, GFP_KERNEL); - if (!ni) - return; - - /* Save the assoc ID */ - ni->assoc_id = sn->sn_assoc_change.sac_assoc_id; - - log_print("got new/restarted association %d nodeid %d", - (int)sn->sn_assoc_change.sac_assoc_id, nodeid); - - /* Send any pending writes */ - clear_bit(NI_INIT_PENDING, &ni->flags); - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - queue_work(send_workqueue, &ni->swork); - } - } - break; - - case SCTP_COMM_LOST: - case SCTP_SHUTDOWN_COMP: - { - struct nodeinfo *ni; - - ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); - if (ni) { - spin_lock(&ni->lock); - ni->assoc_id = 0; - spin_unlock(&ni->lock); - } - } - break; - - /* We don't know which INIT failed, so clear the PENDING flags - * on them all. if assoc_id is zero then it will then try - * again */ - - case SCTP_CANT_STR_ASSOC: - { - log_print("Can't start SCTP association - retrying"); - init_failed(); - } - break; - - default: - log_print("unexpected SCTP assoc change id=%d state=%d", - (int)sn->sn_assoc_change.sac_assoc_id, - sn->sn_assoc_change.sac_state); - } - } -} - -/* Data received from remote end */ -static int receive_from_sock(void) -{ - int ret = 0; - struct msghdr msg; - struct kvec iov[2]; - unsigned len; - int r; - struct sctp_sndrcvinfo *sinfo; - struct cmsghdr *cmsg; - struct nodeinfo *ni; - - /* These two are marginally too big for stack allocation, but this - * function is (currently) only called by dlm_recvd so static should be - * OK. - */ - static struct sockaddr_storage msgname; - static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - - if (sctp_con.sock == NULL) - goto out; - - if (sctp_con.rx_page == NULL) { - /* - * This doesn't need to be atomic, but I think it should - * improve performance if it is. - */ - sctp_con.rx_page = alloc_page(GFP_ATOMIC); - if (sctp_con.rx_page == NULL) - goto out_resched; - cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE); - } - - memset(&incmsg, 0, sizeof(incmsg)); - memset(&msgname, 0, sizeof(msgname)); - - msg.msg_name = &msgname; - msg.msg_namelen = sizeof(msgname); - msg.msg_flags = 0; - msg.msg_control = incmsg; - msg.msg_controllen = sizeof(incmsg); - msg.msg_iovlen = 1; - - /* I don't see why this circular buffer stuff is necessary for SCTP - * which is a packet-based protocol, but the whole thing breaks under - * load without it! The overhead is minimal (and is in the TCP lowcomms - * anyway, of course) so I'll leave it in until I can figure out what's - * really happening. - */ - - /* - * iov[0] is the bit of the circular buffer between the current end - * point (cb.base + cb.len) and the end of the buffer. - */ - iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb); - iov[0].iov_base = page_address(sctp_con.rx_page) + - cbuf_data(&sctp_con.cb); - iov[1].iov_len = 0; - - /* - * iov[1] is the bit of the circular buffer between the start of the - * buffer and the start of the currently used section (cb.base) - */ - if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) { - iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb); - iov[1].iov_len = sctp_con.cb.base; - iov[1].iov_base = page_address(sctp_con.rx_page); - msg.msg_iovlen = 2; - } - len = iov[0].iov_len + iov[1].iov_len; - - r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len, - MSG_NOSIGNAL | MSG_DONTWAIT); - if (ret <= 0) - goto out_close; - - msg.msg_control = incmsg; - msg.msg_controllen = sizeof(incmsg); - cmsg = CMSG_FIRSTHDR(&msg); - sinfo = CMSG_DATA(cmsg); - - if (msg.msg_flags & MSG_NOTIFICATION) { - process_sctp_notification(&msg, page_address(sctp_con.rx_page)); - return 0; - } - - /* Is this a new association ? */ - ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL); - if (ni) { - ni->assoc_id = sinfo->sinfo_assoc_id; - if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) { - - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - queue_work(send_workqueue, &ni->swork); - } - } - } - - /* INIT sends a message with length of 1 - ignore it */ - if (r == 1) - return 0; - - cbuf_add(&sctp_con.cb, ret); - // PJC: TODO: Add to node's workqueue....can we ?? - ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), - page_address(sctp_con.rx_page), - sctp_con.cb.base, sctp_con.cb.len, - PAGE_CACHE_SIZE); - if (ret < 0) - goto out_close; - cbuf_eat(&sctp_con.cb, ret); - -out: - ret = 0; - goto out_ret; - -out_resched: - lowcomms_data_ready(sctp_con.sock->sk, 0); - ret = 0; - cond_resched(); - goto out_ret; - -out_close: - if (ret != -EAGAIN) - log_print("error reading from sctp socket: %d", ret); -out_ret: - return ret; -} - -/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */ -static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num) -{ - mm_segment_t fs; - int result = 0; - - fs = get_fs(); - set_fs(get_ds()); - if (num == 1) - result = sctp_con.sock->ops->bind(sctp_con.sock, - (struct sockaddr *) addr, - addr_len); - else - result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP, - SCTP_SOCKOPT_BINDX_ADD, - (char *)addr, addr_len); - set_fs(fs); - - if (result < 0) - log_print("Can't bind to port %d addr number %d", - dlm_config.ci_tcp_port, num); - - return result; -} - -static void init_local(void) -{ - struct sockaddr_storage sas, *addr; - int i; - - dlm_local_nodeid = dlm_our_nodeid(); - - for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { - if (dlm_our_addr(&sas, i)) - break; - - addr = kmalloc(sizeof(*addr), GFP_KERNEL); - if (!addr) - break; - memcpy(addr, &sas, sizeof(*addr)); - dlm_local_addr[dlm_local_count++] = addr; - } -} - -/* Initialise SCTP socket and bind to all interfaces */ -static int init_sock(void) -{ - mm_segment_t fs; - struct socket *sock = NULL; - struct sockaddr_storage localaddr; - struct sctp_event_subscribe subscribe; - int result = -EINVAL, num = 1, i, addr_len; - - if (!dlm_local_count) { - init_local(); - if (!dlm_local_count) { - log_print("no local IP address has been set"); - goto out; - } - } - - result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, - IPPROTO_SCTP, &sock); - if (result < 0) { - log_print("Can't create comms socket, check SCTP is loaded"); - goto out; - } - - /* Listen for events */ - memset(&subscribe, 0, sizeof(subscribe)); - subscribe.sctp_data_io_event = 1; - subscribe.sctp_association_event = 1; - subscribe.sctp_send_failure_event = 1; - subscribe.sctp_shutdown_event = 1; - subscribe.sctp_partial_delivery_event = 1; - - fs = get_fs(); - set_fs(get_ds()); - result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS, - (char *)&subscribe, sizeof(subscribe)); - set_fs(fs); - - if (result < 0) { - log_print("Failed to set SCTP_EVENTS on socket: result=%d", - result); - goto create_delsock; - } - - /* Init con struct */ - sock->sk->sk_user_data = &sctp_con; - sctp_con.sock = sock; - sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready; - - /* Bind to all interfaces. */ - for (i = 0; i < dlm_local_count; i++) { - memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); - make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); - - result = add_bind_addr(&localaddr, addr_len, num); - if (result) - goto create_delsock; - ++num; - } - - result = sock->ops->listen(sock, 5); - if (result < 0) { - log_print("Can't set socket listening"); - goto create_delsock; - } - - return 0; - -create_delsock: - sock_release(sock); - sctp_con.sock = NULL; -out: - return result; -} - - -static struct writequeue_entry *new_writequeue_entry(gfp_t allocation) -{ - struct writequeue_entry *entry; - - entry = kmalloc(sizeof(struct writequeue_entry), allocation); - if (!entry) - return NULL; - - entry->page = alloc_page(allocation); - if (!entry->page) { - kfree(entry); - return NULL; - } - - entry->offset = 0; - entry->len = 0; - entry->end = 0; - entry->users = 0; - - return entry; -} - -void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) -{ - struct writequeue_entry *e; - int offset = 0; - int users = 0; - struct nodeinfo *ni; - - ni = nodeid2nodeinfo(nodeid, allocation); - if (!ni) - return NULL; - - spin_lock(&ni->writequeue_lock); - e = list_entry(ni->writequeue.prev, struct writequeue_entry, list); - if ((&e->list == &ni->writequeue) || - (PAGE_CACHE_SIZE - e->end < len)) { - e = NULL; - } else { - offset = e->end; - e->end += len; - users = e->users++; - } - spin_unlock(&ni->writequeue_lock); - - if (e) { - got_one: - if (users == 0) - kmap(e->page); - *ppc = page_address(e->page) + offset; - return e; - } - - e = new_writequeue_entry(allocation); - if (e) { - spin_lock(&ni->writequeue_lock); - offset = e->end; - e->end += len; - e->ni = ni; - users = e->users++; - list_add_tail(&e->list, &ni->writequeue); - spin_unlock(&ni->writequeue_lock); - goto got_one; - } - return NULL; -} - -void dlm_lowcomms_commit_buffer(void *arg) -{ - struct writequeue_entry *e = (struct writequeue_entry *) arg; - int users; - struct nodeinfo *ni = e->ni; - - spin_lock(&ni->writequeue_lock); - users = --e->users; - if (users) - goto out; - e->len = e->end - e->offset; - kunmap(e->page); - spin_unlock(&ni->writequeue_lock); - - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - - queue_work(send_workqueue, &ni->swork); - } - return; - -out: - spin_unlock(&ni->writequeue_lock); - return; -} - -static void free_entry(struct writequeue_entry *e) -{ - __free_page(e->page); - kfree(e); -} - -/* Initiate an SCTP association. In theory we could just use sendmsg() on - the first IP address and it should work, but this allows us to set up the - association before sending any valuable data that we can't afford to lose. - It also keeps the send path clean as it can now always use the association ID */ -static void initiate_association(int nodeid) -{ - struct sockaddr_storage rem_addr; - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct msghdr outmessage; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - int ret; - int addrlen; - char buf[1]; - struct kvec iov[1]; - struct nodeinfo *ni; - - log_print("Initiating association with node %d", nodeid); - - ni = nodeid2nodeinfo(nodeid, GFP_KERNEL); - if (!ni) - return; - - if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) { - log_print("no address for nodeid %d", nodeid); - return; - } - - make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); - - outmessage.msg_name = &rem_addr; - outmessage.msg_namelen = addrlen; - outmessage.msg_control = outcmsg; - outmessage.msg_controllen = sizeof(outcmsg); - outmessage.msg_flags = MSG_EOR; - - iov[0].iov_base = buf; - iov[0].iov_len = 1; - - /* Real INIT messages seem to cause trouble. Just send a 1 byte message - we can afford to lose */ - cmsg = CMSG_FIRSTHDR(&outmessage); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - sinfo = CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); - - outmessage.msg_controllen = cmsg->cmsg_len; - ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1); - if (ret < 0) { - log_print("send INIT to node failed: %d", ret); - /* Try again later */ - clear_bit(NI_INIT_PENDING, &ni->flags); - } -} - -/* Send a message */ -static void send_to_sock(struct nodeinfo *ni) -{ - int ret = 0; - struct writequeue_entry *e; - int len, offset; - struct msghdr outmsg; - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - struct kvec iov; - - /* See if we need to init an association before we start - sending precious messages */ - spin_lock(&ni->lock); - if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { - spin_unlock(&ni->lock); - initiate_association(ni->nodeid); - return; - } - spin_unlock(&ni->lock); - - outmsg.msg_name = NULL; /* We use assoc_id */ - outmsg.msg_namelen = 0; - outmsg.msg_control = outcmsg; - outmsg.msg_controllen = sizeof(outcmsg); - outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR; - - cmsg = CMSG_FIRSTHDR(&outmsg); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - sinfo = CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); - sinfo->sinfo_assoc_id = ni->assoc_id; - outmsg.msg_controllen = cmsg->cmsg_len; - - spin_lock(&ni->writequeue_lock); - for (;;) { - if (list_empty(&ni->writequeue)) - break; - e = list_entry(ni->writequeue.next, struct writequeue_entry, - list); - len = e->len; - offset = e->offset; - BUG_ON(len == 0 && e->users == 0); - spin_unlock(&ni->writequeue_lock); - kmap(e->page); - - ret = 0; - if (len) { - iov.iov_base = page_address(e->page)+offset; - iov.iov_len = len; - - ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1, - len); - if (ret == -EAGAIN) { - sctp_con.eagain_flag = 1; - goto out; - } else if (ret < 0) - goto send_error; - } else { - /* Don't starve people filling buffers */ - cond_resched(); - } - - spin_lock(&ni->writequeue_lock); - e->offset += ret; - e->len -= ret; - - if (e->len == 0 && e->users == 0) { - list_del(&e->list); - kunmap(e->page); - free_entry(e); - continue; - } - } - spin_unlock(&ni->writequeue_lock); -out: - return; - -send_error: - log_print("Error sending to node %d %d", ni->nodeid, ret); - spin_lock(&ni->lock); - if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { - ni->assoc_id = 0; - spin_unlock(&ni->lock); - initiate_association(ni->nodeid); - } else - spin_unlock(&ni->lock); - - return; -} - -/* Try to send any messages that are pending */ -static void process_output_queue(void) -{ - struct list_head *list; - struct list_head *temp; - - spin_lock_bh(&write_nodes_lock); - list_for_each_safe(list, temp, &write_nodes) { - struct nodeinfo *ni = - list_entry(list, struct nodeinfo, write_list); - clear_bit(NI_WRITE_PENDING, &ni->flags); - list_del(&ni->write_list); - - spin_unlock_bh(&write_nodes_lock); - - send_to_sock(ni); - spin_lock_bh(&write_nodes_lock); - } - spin_unlock_bh(&write_nodes_lock); -} - -/* Called after we've had -EAGAIN and been woken up */ -static void refill_write_queue(void) -{ - int i; - - for (i=1; i<=max_nodeid; i++) { - struct nodeinfo *ni = nodeid2nodeinfo(i, 0); - - if (ni) { - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - } - } - } -} - -static void clean_one_writequeue(struct nodeinfo *ni) -{ - struct list_head *list; - struct list_head *temp; - - spin_lock(&ni->writequeue_lock); - list_for_each_safe(list, temp, &ni->writequeue) { - struct writequeue_entry *e = - list_entry(list, struct writequeue_entry, list); - list_del(&e->list); - free_entry(e); - } - spin_unlock(&ni->writequeue_lock); -} - -static void clean_writequeues(void) -{ - int i; - - for (i=1; i<=max_nodeid; i++) { - struct nodeinfo *ni = nodeid2nodeinfo(i, 0); - if (ni) - clean_one_writequeue(ni); - } -} - - -static void dealloc_nodeinfo(void) -{ - int i; - - for (i=1; i<=max_nodeid; i++) { - struct nodeinfo *ni = nodeid2nodeinfo(i, 0); - if (ni) { - idr_remove(&nodeinfo_idr, i); - kfree(ni); - } - } -} - -int dlm_lowcomms_close(int nodeid) -{ - struct nodeinfo *ni; - - ni = nodeid2nodeinfo(nodeid, 0); - if (!ni) - return -1; - - spin_lock(&ni->lock); - if (ni->assoc_id) { - ni->assoc_id = 0; - /* Don't send shutdown here, sctp will just queue it - till the node comes back up! */ - } - spin_unlock(&ni->lock); - - clean_one_writequeue(ni); - clear_bit(NI_INIT_PENDING, &ni->flags); - return 0; -} - -// PJC: The work queue function for receiving. -static void process_recv_sockets(struct work_struct *work) -{ - if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { - int ret; - int count = 0; - - do { - ret = receive_from_sock(); - - /* Don't starve out everyone else */ - if (++count >= MAX_RX_MSG_COUNT) { - cond_resched(); - count = 0; - } - } while (!kthread_should_stop() && ret >=0); - } - cond_resched(); -} - -// PJC: the work queue function for sending -static void process_send_sockets(struct work_struct *work) -{ - if (sctp_con.eagain_flag) { - sctp_con.eagain_flag = 0; - refill_write_queue(); - } - process_output_queue(); -} - -// PJC: Process lock requests from a particular node. -// TODO: can we optimise this out on UP ?? -static void process_lock_request(struct work_struct *work) -{ -} - -static void daemons_stop(void) -{ - destroy_workqueue(recv_workqueue); - destroy_workqueue(send_workqueue); - destroy_workqueue(lock_workqueue); -} - -static int daemons_start(void) -{ - int error; - recv_workqueue = create_workqueue("dlm_recv"); - error = IS_ERR(recv_workqueue); - if (error) { - log_print("can't start dlm_recv %d", error); - return error; - } - - send_workqueue = create_singlethread_workqueue("dlm_send"); - error = IS_ERR(send_workqueue); - if (error) { - log_print("can't start dlm_send %d", error); - destroy_workqueue(recv_workqueue); - return error; - } - - lock_workqueue = create_workqueue("dlm_rlock"); - error = IS_ERR(lock_workqueue); - if (error) { - log_print("can't start dlm_rlock %d", error); - destroy_workqueue(send_workqueue); - destroy_workqueue(recv_workqueue); - return error; - } - - return 0; -} - -/* - * This is quite likely to sleep... - */ -int dlm_lowcomms_start(void) -{ - int error; - - INIT_WORK(&sctp_con.work, process_recv_sockets); - - error = init_sock(); - if (error) - goto fail_sock; - error = daemons_start(); - if (error) - goto fail_sock; - return 0; - -fail_sock: - close_connection(); - return error; -} - -void dlm_lowcomms_stop(void) -{ - int i; - - sctp_con.flags = 0x7; - daemons_stop(); - clean_writequeues(); - close_connection(); - dealloc_nodeinfo(); - max_nodeid = 0; - - dlm_local_count = 0; - dlm_local_nodeid = 0; - - for (i = 0; i < dlm_local_count; i++) - kfree(dlm_local_addr[i]); -} diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c deleted file mode 100644 index 919e92a6aeb..00000000000 --- a/fs/dlm/lowcomms-tcp.c +++ /dev/null @@ -1,1006 +0,0 @@ -/****************************************************************************** -******************************************************************************* -** -** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. -** -** This copyrighted material is made available to anyone wishing to use, -** modify, copy, or redistribute it subject to the terms and conditions -** of the GNU General Public License v.2. -** -******************************************************************************* -******************************************************************************/ - -/* - * lowcomms.c - * - * This is the "low-level" comms layer. - * - * It is responsible for sending/receiving messages - * from other nodes in the cluster. - * - * Cluster nodes are referred to by their nodeids. nodeids are - * simply 32 bit numbers to the locking module - if they need to - * be expanded for the cluster infrastructure then that is it's - * responsibility. It is this layer's - * responsibility to resolve these into IP address or - * whatever it needs for inter-node communication. - * - * The comms level is two kernel threads that deal mainly with - * the receiving of messages from other nodes and passing them - * up to the mid-level comms layer (which understands the - * message format) for execution by the locking core, and - * a send thread which does all the setting up of connections - * to remote nodes and the sending of data. Threads are not allowed - * to send their own data because it may cause them to wait in times - * of high load. Also, this way, the sending thread can collect together - * messages bound for one node and send them in one block. - * - * I don't see any problem with the recv thread executing the locking - * code on behalf of remote processes as the locking code is - * short, efficient and never waits. - * - */ - - -#include -#include -#include -#include - -#include "dlm_internal.h" -#include "lowcomms.h" -#include "midcomms.h" -#include "config.h" - -struct cbuf { - unsigned int base; - unsigned int len; - unsigned int mask; -}; - -#define NODE_INCREMENT 32 -static void cbuf_add(struct cbuf *cb, int n) -{ - cb->len += n; -} - -static int cbuf_data(struct cbuf *cb) -{ - return ((cb->base + cb->len) & cb->mask); -} - -static void cbuf_init(struct cbuf *cb, int size) -{ - cb->base = cb->len = 0; - cb->mask = size-1; -} - -static void cbuf_eat(struct cbuf *cb, int n) -{ - cb->len -= n; - cb->base += n; - cb->base &= cb->mask; -} - -static bool cbuf_empty(struct cbuf *cb) -{ - return cb->len == 0; -} - -/* Maximum number of incoming messages to process before - doing a cond_resched() -*/ -#define MAX_RX_MSG_COUNT 25 - -struct connection { - struct socket *sock; /* NULL if not connected */ - uint32_t nodeid; /* So we know who we are in the list */ - struct mutex sock_mutex; - unsigned long flags; /* bit 1,2 = We are on the read/write lists */ -#define CF_READ_PENDING 1 -#define CF_WRITE_PENDING 2 -#define CF_CONNECT_PENDING 3 -#define CF_IS_OTHERCON 4 - struct list_head writequeue; /* List of outgoing writequeue_entries */ - struct list_head listenlist; /* List of allocated listening sockets */ - spinlock_t writequeue_lock; - int (*rx_action) (struct connection *); /* What to do when active */ - struct page *rx_page; - struct cbuf cb; - int retries; -#define MAX_CONNECT_RETRIES 3 - struct connection *othercon; - struct work_struct rwork; /* Receive workqueue */ - struct work_struct swork; /* Send workqueue */ -}; -#define sock2con(x) ((struct connection *)(x)->sk_user_data) - -/* An entry waiting to be sent */ -struct writequeue_entry { - struct list_head list; - struct page *page; - int offset; - int len; - int end; - int users; - struct connection *con; -}; - -static struct sockaddr_storage dlm_local_addr; - -/* Work queues */ -static struct workqueue_struct *recv_workqueue; -static struct workqueue_struct *send_workqueue; - -/* An array of pointers to connections, indexed by NODEID */ -static struct connection **connections; -static DECLARE_MUTEX(connections_lock); -static struct kmem_cache *con_cache; -static int conn_array_size; - -static void process_recv_sockets(struct work_struct *work); -static void process_send_sockets(struct work_struct *work); - -static struct connection *nodeid2con(int nodeid, gfp_t allocation) -{ - struct connection *con = NULL; - - down(&connections_lock); - if (nodeid >= conn_array_size) { - int new_size = nodeid + NODE_INCREMENT; - struct connection **new_conns; - - new_conns = kzalloc(sizeof(struct connection *) * - new_size, allocation); - if (!new_conns) - goto finish; - - memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); - conn_array_size = new_size; - kfree(connections); - connections = new_conns; - - } - - con = connections[nodeid]; - if (con == NULL && allocation) { - con = kmem_cache_zalloc(con_cache, allocation); - if (!con) - goto finish; - - con->nodeid = nodeid; - mutex_init(&con->sock_mutex); - INIT_LIST_HEAD(&con->writequeue); - spin_lock_init(&con->writequeue_lock); - INIT_WORK(&con->swork, process_send_sockets); - INIT_WORK(&con->rwork, process_recv_sockets); - - connections[nodeid] = con; - } - -finish: - up(&connections_lock); - return con; -} - -/* Data available on socket or listen socket received a connect */ -static void lowcomms_data_ready(struct sock *sk, int count_unused) -{ - struct connection *con = sock2con(sk); - - if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) - queue_work(recv_workqueue, &con->rwork); -} - -static void lowcomms_write_space(struct sock *sk) -{ - struct connection *con = sock2con(sk); - - if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) - queue_work(send_workqueue, &con->swork); -} - -static inline void lowcomms_connect_sock(struct connection *con) -{ - if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) - queue_work(send_workqueue, &con->swork); -} - -static void lowcomms_state_change(struct sock *sk) -{ - if (sk->sk_state == TCP_ESTABLISHED) - lowcomms_write_space(sk); -} - -/* Make a socket active */ -static int add_sock(struct socket *sock, struct connection *con) -{ - con->sock = sock; - - /* Install a data_ready callback */ - con->sock->sk->sk_data_ready = lowcomms_data_ready; - con->sock->sk->sk_write_space = lowcomms_write_space; - con->sock->sk->sk_state_change = lowcomms_state_change; - - return 0; -} - -/* Add the port number to an IP6 or 4 sockaddr and return the address - length */ -static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, - int *addr_len) -{ - saddr->ss_family = dlm_local_addr.ss_family; - if (saddr->ss_family == AF_INET) { - struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; - in4_addr->sin_port = cpu_to_be16(port); - *addr_len = sizeof(struct sockaddr_in); - } else { - struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; - in6_addr->sin6_port = cpu_to_be16(port); - *addr_len = sizeof(struct sockaddr_in6); - } -} - -/* Close a remote connection and tidy up */ -static void close_connection(struct connection *con, bool and_other) -{ - mutex_lock(&con->sock_mutex); - - if (con->sock) { - sock_release(con->sock); - con->sock = NULL; - } - if (con->othercon && and_other) { - /* Will only re-enter once. */ - close_connection(con->othercon, false); - } - if (con->rx_page) { - __free_page(con->rx_page); - con->rx_page = NULL; - } - con->retries = 0; - mutex_unlock(&con->sock_mutex); -} - -/* Data received from remote end */ -static int receive_from_sock(struct connection *con) -{ - int ret = 0; - struct msghdr msg = {}; - struct kvec iov[2]; - unsigned len; - int r; - int call_again_soon = 0; - int nvec; - - mutex_lock(&con->sock_mutex); - - if (con->sock == NULL) { - ret = -EAGAIN; - goto out_close; - } - - if (con->rx_page == NULL) { - /* - * This doesn't need to be atomic, but I think it should - * improve performance if it is. - */ - con->rx_page = alloc_page(GFP_ATOMIC); - if (con->rx_page == NULL) - goto out_resched; - cbuf_init(&con->cb, PAGE_CACHE_SIZE); - } - - /* - * iov[0] is the bit of the circular buffer between the current end - * point (cb.base + cb.len) and the end of the buffer. - */ - iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); - iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); - iov[1].iov_len = 0; - nvec = 1; - - /* - * iov[1] is the bit of the circular buffer between the start of the - * buffer and the start of the currently used section (cb.base) - */ - if (cbuf_data(&con->cb) >= con->cb.base) { - iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb); - iov[1].iov_len = con->cb.base; - iov[1].iov_base = page_address(con->rx_page); - nvec = 2; - } - len = iov[0].iov_len + iov[1].iov_len; - - r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, - MSG_DONTWAIT | MSG_NOSIGNAL); - - if (ret <= 0) - goto out_close; - - if (ret == len) - call_again_soon = 1; - cbuf_add(&con->cb, ret); - ret = dlm_process_incoming_buffer(con->nodeid, - page_address(con->rx_page), - con->cb.base, con->cb.len, - PAGE_CACHE_SIZE); - if (ret == -EBADMSG) { - printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, " - "iov_len=%u, iov_base[0]=%p, read=%d\n", - page_address(con->rx_page), con->cb.base, con->cb.len, - len, iov[0].iov_base, r); - } - if (ret < 0) - goto out_close; - cbuf_eat(&con->cb, ret); - - if (cbuf_empty(&con->cb) && !call_again_soon) { - __free_page(con->rx_page); - con->rx_page = NULL; - } - - if (call_again_soon) - goto out_resched; - mutex_unlock(&con->sock_mutex); - return 0; - -out_resched: - if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) - queue_work(recv_workqueue, &con->rwork); - mutex_unlock(&con->sock_mutex); - return -EAGAIN; - -out_close: - mutex_unlock(&con->sock_mutex); - if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { - close_connection(con, false); - /* Reconnect when there is something to send */ - } - /* Don't return success if we really got EOF */ - if (ret == 0) - ret = -EAGAIN; - - return ret; -} - -/* Listening socket is busy, accept a connection */ -static int accept_from_sock(struct connection *con) -{ - int result; - struct sockaddr_storage peeraddr; - struct socket *newsock; - int len; - int nodeid; - struct connection *newcon; - struct connection *addcon; - - memset(&peeraddr, 0, sizeof(peeraddr)); - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, - IPPROTO_TCP, &newsock); - if (result < 0) - return -ENOMEM; - - mutex_lock_nested(&con->sock_mutex, 0); - - result = -ENOTCONN; - if (con->sock == NULL) - goto accept_err; - - newsock->type = con->sock->type; - newsock->ops = con->sock->ops; - - result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); - if (result < 0) - goto accept_err; - - /* Get the connected socket's peer */ - memset(&peeraddr, 0, sizeof(peeraddr)); - if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, - &len, 2)) { - result = -ECONNABORTED; - goto accept_err; - } - - /* Get the new node's NODEID */ - make_sockaddr(&peeraddr, 0, &len); - if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { - printk("dlm: connect from non cluster node\n"); - sock_release(newsock); - mutex_unlock(&con->sock_mutex); - return -1; - } - - log_print("got connection from %d", nodeid); - - /* Check to see if we already have a connection to this node. This - * could happen if the two nodes initiate a connection at roughly - * the same time and the connections cross on the wire. - * TEMPORARY FIX: - * In this case we store the incoming one in "othercon" - */ - newcon = nodeid2con(nodeid, GFP_KERNEL); - if (!newcon) { - result = -ENOMEM; - goto accept_err; - } - mutex_lock_nested(&newcon->sock_mutex, 1); - if (newcon->sock) { - struct connection *othercon = newcon->othercon; - - if (!othercon) { - othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); - if (!othercon) { - printk("dlm: failed to allocate incoming socket\n"); - mutex_unlock(&newcon->sock_mutex); - result = -ENOMEM; - goto accept_err; - } - othercon->nodeid = nodeid; - othercon->rx_action = receive_from_sock; - mutex_init(&othercon->sock_mutex); - INIT_WORK(&othercon->swork, process_send_sockets); - INIT_WORK(&othercon->rwork, process_recv_sockets); - set_bit(CF_IS_OTHERCON, &othercon->flags); - newcon->othercon = othercon; - } - othercon->sock = newsock; - newsock->sk->sk_user_data = othercon; - add_sock(newsock, othercon); - addcon = othercon; - } - else { - newsock->sk->sk_user_data = newcon; - newcon->rx_action = receive_from_sock; - add_sock(newsock, newcon); - addcon = newcon; - } - - mutex_unlock(&newcon->sock_mutex); - - /* - * Add it to the active queue in case we got data - * beween processing the accept adding the socket - * to the read_sockets list - */ - if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) - queue_work(recv_workqueue, &addcon->rwork); - mutex_unlock(&con->sock_mutex); - - return 0; - -accept_err: - mutex_unlock(&con->sock_mutex); - sock_release(newsock); - - if (result != -EAGAIN) - printk("dlm: error accepting connection from node: %d\n", result); - return result; -} - -/* Connect a new socket to its peer */ -static void connect_to_sock(struct connection *con) -{ - int result = -EHOSTUNREACH; - struct sockaddr_storage saddr; - int addr_len; - struct socket *sock; - - if (con->nodeid == 0) { - log_print("attempt to connect sock 0 foiled"); - return; - } - - mutex_lock(&con->sock_mutex); - if (con->retries++ > MAX_CONNECT_RETRIES) - goto out; - - /* Some odd races can cause double-connects, ignore them */ - if (con->sock) { - result = 0; - goto out; - } - - /* Create a socket to communicate with */ - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, - IPPROTO_TCP, &sock); - if (result < 0) - goto out_err; - - memset(&saddr, 0, sizeof(saddr)); - if (dlm_nodeid_to_addr(con->nodeid, &saddr)) - goto out_err; - - sock->sk->sk_user_data = con; - con->rx_action = receive_from_sock; - - make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); - - add_sock(sock, con); - - log_print("connecting to %d", con->nodeid); - result = - sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, - O_NONBLOCK); - if (result == -EINPROGRESS) - result = 0; - if (result == 0) - goto out; - -out_err: - if (con->sock) { - sock_release(con->sock); - con->sock = NULL; - } - /* - * Some errors are fatal and this list might need adjusting. For other - * errors we try again until the max number of retries is reached. - */ - if (result != -EHOSTUNREACH && result != -ENETUNREACH && - result != -ENETDOWN && result != EINVAL - && result != -EPROTONOSUPPORT) { - lowcomms_connect_sock(con); - result = 0; - } -out: - mutex_unlock(&con->sock_mutex); - return; -} - -static struct socket *create_listen_sock(struct connection *con, - struct sockaddr_storage *saddr) -{ - struct socket *sock = NULL; - mm_segment_t fs; - int result = 0; - int one = 1; - int addr_len; - - if (dlm_local_addr.ss_family == AF_INET) - addr_len = sizeof(struct sockaddr_in); - else - addr_len = sizeof(struct sockaddr_in6); - - /* Create a socket to communicate with */ - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); - if (result < 0) { - printk("dlm: Can't create listening comms socket\n"); - goto create_out; - } - - fs = get_fs(); - set_fs(get_ds()); - result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&one, sizeof(one)); - set_fs(fs); - if (result < 0) { - printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n", - result); - } - sock->sk->sk_user_data = con; - con->rx_action = accept_from_sock; - con->sock = sock; - - /* Bind to our port */ - make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); - result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); - if (result < 0) { - printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port); - sock_release(sock); - sock = NULL; - con->sock = NULL; - goto create_out; - } - - fs = get_fs(); - set_fs(get_ds()); - - result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, - (char *)&one, sizeof(one)); - set_fs(fs); - if (result < 0) { - printk("dlm: Set keepalive failed: %d\n", result); - } - - result = sock->ops->listen(sock, 5); - if (result < 0) { - printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port); - sock_release(sock); - sock = NULL; - goto create_out; - } - -create_out: - return sock; -} - - -/* Listen on all interfaces */ -static int listen_for_all(void) -{ - struct socket *sock = NULL; - struct connection *con = nodeid2con(0, GFP_KERNEL); - int result = -EINVAL; - - /* We don't support multi-homed hosts */ - set_bit(CF_IS_OTHERCON, &con->flags); - - sock = create_listen_sock(con, &dlm_local_addr); - if (sock) { - add_sock(sock, con); - result = 0; - } - else { - result = -EADDRINUSE; - } - - return result; -} - - - -static struct writequeue_entry *new_writequeue_entry(struct connection *con, - gfp_t allocation) -{ - struct writequeue_entry *entry; - - entry = kmalloc(sizeof(struct writequeue_entry), allocation); - if (!entry) - return NULL; - - entry->page = alloc_page(allocation); - if (!entry->page) { - kfree(entry); - return NULL; - } - - entry->offset = 0; - entry->len = 0; - entry->end = 0; - entry->users = 0; - entry->con = con; - - return entry; -} - -void *dlm_lowcomms_get_buffer(int nodeid, int len, - gfp_t allocation, char **ppc) -{ - struct connection *con; - struct writequeue_entry *e; - int offset = 0; - int users = 0; - - con = nodeid2con(nodeid, allocation); - if (!con) - return NULL; - - spin_lock(&con->writequeue_lock); - e = list_entry(con->writequeue.prev, struct writequeue_entry, list); - if ((&e->list == &con->writequeue) || - (PAGE_CACHE_SIZE - e->end < len)) { - e = NULL; - } else { - offset = e->end; - e->end += len; - users = e->users++; - } - spin_unlock(&con->writequeue_lock); - - if (e) { - got_one: - if (users == 0) - kmap(e->page); - *ppc = page_address(e->page) + offset; - return e; - } - - e = new_writequeue_entry(con, allocation); - if (e) { - spin_lock(&con->writequeue_lock); - offset = e->end; - e->end += len; - users = e->users++; - list_add_tail(&e->list, &con->writequeue); - spin_unlock(&con->writequeue_lock); - goto got_one; - } - return NULL; -} - -void dlm_lowcomms_commit_buffer(void *mh) -{ - struct writequeue_entry *e = (struct writequeue_entry *)mh; - struct connection *con = e->con; - int users; - - spin_lock(&con->writequeue_lock); - users = --e->users; - if (users) - goto out; - e->len = e->end - e->offset; - kunmap(e->page); - spin_unlock(&con->writequeue_lock); - - if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { - queue_work(send_workqueue, &con->swork); - } - return; - -out: - spin_unlock(&con->writequeue_lock); - return; -} - -static void free_entry(struct writequeue_entry *e) -{ - __free_page(e->page); - kfree(e); -} - -/* Send a message */ -static void send_to_sock(struct connection *con) -{ - int ret = 0; - ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); - const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; - struct writequeue_entry *e; - int len, offset; - - mutex_lock(&con->sock_mutex); - if (con->sock == NULL) - goto out_connect; - - sendpage = con->sock->ops->sendpage; - - spin_lock(&con->writequeue_lock); - for (;;) { - e = list_entry(con->writequeue.next, struct writequeue_entry, - list); - if ((struct list_head *) e == &con->writequeue) - break; - - len = e->len; - offset = e->offset; - BUG_ON(len == 0 && e->users == 0); - spin_unlock(&con->writequeue_lock); - kmap(e->page); - - ret = 0; - if (len) { - ret = sendpage(con->sock, e->page, offset, len, - msg_flags); - if (ret == -EAGAIN || ret == 0) - goto out; - if (ret <= 0) - goto send_error; - } - else { - /* Don't starve people filling buffers */ - cond_resched(); - } - - spin_lock(&con->writequeue_lock); - e->offset += ret; - e->len -= ret; - - if (e->len == 0 && e->users == 0) { - list_del(&e->list); - kunmap(e->page); - free_entry(e); - continue; - } - } - spin_unlock(&con->writequeue_lock); -out: - mutex_unlock(&con->sock_mutex); - return; - -send_error: - mutex_unlock(&con->sock_mutex); - close_connection(con, false); - lowcomms_connect_sock(con); - return; - -out_connect: - mutex_unlock(&con->sock_mutex); - connect_to_sock(con); - return; -} - -static void clean_one_writequeue(struct connection *con) -{ - struct list_head *list; - struct list_head *temp; - - spin_lock(&con->writequeue_lock); - list_for_each_safe(list, temp, &con->writequeue) { - struct writequeue_entry *e = - list_entry(list, struct writequeue_entry, list); - list_del(&e->list); - free_entry(e); - } - spin_unlock(&con->writequeue_lock); -} - -/* Called from recovery when it knows that a node has - left the cluster */ -int dlm_lowcomms_close(int nodeid) -{ - struct connection *con; - - if (!connections) - goto out; - - log_print("closing connection to node %d", nodeid); - con = nodeid2con(nodeid, 0); - if (con) { - clean_one_writequeue(con); - close_connection(con, true); - } - return 0; - -out: - return -1; -} - -/* Look for activity on active sockets */ -static void process_recv_sockets(struct work_struct *work) -{ - struct connection *con = container_of(work, struct connection, rwork); - int err; - - clear_bit(CF_READ_PENDING, &con->flags); - do { - err = con->rx_action(con); - } while (!err); -} - - -static void process_send_sockets(struct work_struct *work) -{ - struct connection *con = container_of(work, struct connection, swork); - - if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { - connect_to_sock(con); - } - - clear_bit(CF_WRITE_PENDING, &con->flags); - send_to_sock(con); -} - - -/* Discard all entries on the write queues */ -static void clean_writequeues(void) -{ - int nodeid; - - for (nodeid = 1; nodeid < conn_array_size; nodeid++) { - struct connection *con = nodeid2con(nodeid, 0); - - if (con) - clean_one_writequeue(con); - } -} - -static void work_stop(void) -{ - destroy_workqueue(recv_workqueue); - destroy_workqueue(send_workqueue); -} - -static int work_start(void) -{ - int error; - recv_workqueue = create_workqueue("dlm_recv"); - error = IS_ERR(recv_workqueue); - if (error) { - log_print("can't start dlm_recv %d", error); - return error; - } - - send_workqueue = create_singlethread_workqueue("dlm_send"); - error = IS_ERR(send_workqueue); - if (error) { - log_print("can't start dlm_send %d", error); - destroy_workqueue(recv_workqueue); - return error; - } - - return 0; -} - -void dlm_lowcomms_stop(void) -{ - int i; - - /* Set all the flags to prevent any - socket activity. - */ - for (i = 0; i < conn_array_size; i++) { - if (connections[i]) - connections[i]->flags |= 0xFF; - } - - work_stop(); - clean_writequeues(); - - for (i = 0; i < conn_array_size; i++) { - if (connections[i]) { - close_connection(connections[i], true); - if (connections[i]->othercon) - kmem_cache_free(con_cache, connections[i]->othercon); - kmem_cache_free(con_cache, connections[i]); - } - } - - kfree(connections); - connections = NULL; - - kmem_cache_destroy(con_cache); -} - -/* This is quite likely to sleep... */ -int dlm_lowcomms_start(void) -{ - int error = 0; - - error = -ENOMEM; - connections = kzalloc(sizeof(struct connection *) * - NODE_INCREMENT, GFP_KERNEL); - if (!connections) - goto out; - - conn_array_size = NODE_INCREMENT; - - if (dlm_our_addr(&dlm_local_addr, 0)) { - log_print("no local IP address has been set"); - goto fail_free_conn; - } - if (!dlm_our_addr(&dlm_local_addr, 1)) { - log_print("This dlm comms module does not support multi-homed clustering"); - goto fail_free_conn; - } - - con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), - __alignof__(struct connection), 0, - NULL, NULL); - if (!con_cache) - goto fail_free_conn; - - - /* Start listening */ - error = listen_for_all(); - if (error) - goto fail_unlisten; - - error = work_start(); - if (error) - goto fail_unlisten; - - return 0; - -fail_unlisten: - close_connection(connections[0], false); - kmem_cache_free(con_cache, connections[0]); - kmem_cache_destroy(con_cache); - -fail_free_conn: - kfree(connections); - -out: - return error; -} - -/* - * Overrides for Emacs so that we follow Linus's tabbing style. - * Emacs will notice this stuff at the end of the file and automatically - * adjust the settings for this buffer only. This must remain at the end - * of the file. - * --------------------------------------------------------------------------- - * Local variables: - * c-file-style: "linux" - * End: - */ diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c new file mode 100644 index 00000000000..76399b7819b --- /dev/null +++ b/fs/dlm/lowcomms.c @@ -0,0 +1,1468 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +/* + * lowcomms.c + * + * This is the "low-level" comms layer. + * + * It is responsible for sending/receiving messages + * from other nodes in the cluster. + * + * Cluster nodes are referred to by their nodeids. nodeids are + * simply 32 bit numbers to the locking module - if they need to + * be expanded for the cluster infrastructure then that is it's + * responsibility. It is this layer's + * responsibility to resolve these into IP address or + * whatever it needs for inter-node communication. + * + * The comms level is two kernel threads that deal mainly with + * the receiving of messages from other nodes and passing them + * up to the mid-level comms layer (which understands the + * message format) for execution by the locking core, and + * a send thread which does all the setting up of connections + * to remote nodes and the sending of data. Threads are not allowed + * to send their own data because it may cause them to wait in times + * of high load. Also, this way, the sending thread can collect together + * messages bound for one node and send them in one block. + * + * lowcomms will choose to use wither TCP or SCTP as its transport layer + * depending on the configuration variable 'protocol'. This should be set + * to 0 (default) for TCP or 1 for SCTP. It shouldbe configured using a + * cluster-wide mechanism as it must be the same on all nodes of the cluster + * for the DLM to function. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "dlm_internal.h" +#include "lowcomms.h" +#include "midcomms.h" +#include "config.h" + +#define NEEDED_RMEM (4*1024*1024) + +struct cbuf { + unsigned int base; + unsigned int len; + unsigned int mask; +}; + +static void cbuf_add(struct cbuf *cb, int n) +{ + cb->len += n; +} + +static int cbuf_data(struct cbuf *cb) +{ + return ((cb->base + cb->len) & cb->mask); +} + +static void cbuf_init(struct cbuf *cb, int size) +{ + cb->base = cb->len = 0; + cb->mask = size-1; +} + +static void cbuf_eat(struct cbuf *cb, int n) +{ + cb->len -= n; + cb->base += n; + cb->base &= cb->mask; +} + +static bool cbuf_empty(struct cbuf *cb) +{ + return cb->len == 0; +} + +struct connection { + struct socket *sock; /* NULL if not connected */ + uint32_t nodeid; /* So we know who we are in the list */ + struct mutex sock_mutex; + unsigned long flags; +#define CF_READ_PENDING 1 +#define CF_WRITE_PENDING 2 +#define CF_CONNECT_PENDING 3 +#define CF_INIT_PENDING 4 +#define CF_IS_OTHERCON 5 + struct list_head writequeue; /* List of outgoing writequeue_entries */ + spinlock_t writequeue_lock; + int (*rx_action) (struct connection *); /* What to do when active */ + void (*connect_action) (struct connection *); /* What to do to connect */ + struct page *rx_page; + struct cbuf cb; + int retries; +#define MAX_CONNECT_RETRIES 3 + int sctp_assoc; + struct connection *othercon; + struct work_struct rwork; /* Receive workqueue */ + struct work_struct swork; /* Send workqueue */ +}; +#define sock2con(x) ((struct connection *)(x)->sk_user_data) + +/* An entry waiting to be sent */ +struct writequeue_entry { + struct list_head list; + struct page *page; + int offset; + int len; + int end; + int users; + struct connection *con; +}; + +static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; +static int dlm_local_count; + +/* Work queues */ +static struct workqueue_struct *recv_workqueue; +static struct workqueue_struct *send_workqueue; + +static DEFINE_IDR(connections_idr); +static DECLARE_MUTEX(connections_lock); +static int max_nodeid; +static struct kmem_cache *con_cache; + +static void process_recv_sockets(struct work_struct *work); +static void process_send_sockets(struct work_struct *work); + +/* + * If 'allocation' is zero then we don't attempt to create a new + * connection structure for this node. + */ +static struct connection *__nodeid2con(int nodeid, gfp_t alloc) +{ + struct connection *con = NULL; + int r; + int n; + + con = idr_find(&connections_idr, nodeid); + if (con || !alloc) + return con; + + r = idr_pre_get(&connections_idr, alloc); + if (!r) + return NULL; + + con = kmem_cache_zalloc(con_cache, alloc); + if (!con) + return NULL; + + r = idr_get_new_above(&connections_idr, con, nodeid, &n); + if (r) { + kmem_cache_free(con_cache, con); + return NULL; + } + + if (n != nodeid) { + idr_remove(&connections_idr, n); + kmem_cache_free(con_cache, con); + return NULL; + } + + con->nodeid = nodeid; + mutex_init(&con->sock_mutex); + INIT_LIST_HEAD(&con->writequeue); + spin_lock_init(&con->writequeue_lock); + INIT_WORK(&con->swork, process_send_sockets); + INIT_WORK(&con->rwork, process_recv_sockets); + + /* Setup action pointers for child sockets */ + if (con->nodeid) { + struct connection *zerocon = idr_find(&connections_idr, 0); + + con->connect_action = zerocon->connect_action; + if (!con->rx_action) + con->rx_action = zerocon->rx_action; + } + + if (nodeid > max_nodeid) + max_nodeid = nodeid; + + return con; +} + +static struct connection *nodeid2con(int nodeid, gfp_t allocation) +{ + struct connection *con; + + down(&connections_lock); + con = __nodeid2con(nodeid, allocation); + up(&connections_lock); + + return con; +} + +/* This is a bit drastic, but only called when things go wrong */ +static struct connection *assoc2con(int assoc_id) +{ + int i; + struct connection *con; + + down(&connections_lock); + for (i=0; isctp_assoc == assoc_id) { + up(&connections_lock); + return con; + } + } + up(&connections_lock); + return NULL; +} + +static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) +{ + struct sockaddr_storage addr; + int error; + + if (!dlm_local_count) + return -1; + + error = dlm_nodeid_to_addr(nodeid, &addr); + if (error) + return error; + + if (dlm_local_addr[0]->ss_family == AF_INET) { + struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; + struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; + ret4->sin_addr.s_addr = in4->sin_addr.s_addr; + } else { + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; + struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; + memcpy(&ret6->sin6_addr, &in6->sin6_addr, + sizeof(in6->sin6_addr)); + } + + return 0; +} + +/* Data available on socket or listen socket received a connect */ +static void lowcomms_data_ready(struct sock *sk, int count_unused) +{ + struct connection *con = sock2con(sk); + if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) + queue_work(recv_workqueue, &con->rwork); +} + +static void lowcomms_write_space(struct sock *sk) +{ + struct connection *con = sock2con(sk); + + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); +} + +static inline void lowcomms_connect_sock(struct connection *con) +{ + if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); +} + +static void lowcomms_state_change(struct sock *sk) +{ + if (sk->sk_state == TCP_ESTABLISHED) + lowcomms_write_space(sk); +} + +/* Make a socket active */ +static int add_sock(struct socket *sock, struct connection *con) +{ + con->sock = sock; + + /* Install a data_ready callback */ + con->sock->sk->sk_data_ready = lowcomms_data_ready; + con->sock->sk->sk_write_space = lowcomms_write_space; + con->sock->sk->sk_state_change = lowcomms_state_change; + con->sock->sk->sk_user_data = con; + return 0; +} + +/* Add the port number to an IPv6 or 4 sockaddr and return the address + length */ +static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, + int *addr_len) +{ + saddr->ss_family = dlm_local_addr[0]->ss_family; + if (saddr->ss_family == AF_INET) { + struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; + in4_addr->sin_port = cpu_to_be16(port); + *addr_len = sizeof(struct sockaddr_in); + memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); + } else { + struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; + in6_addr->sin6_port = cpu_to_be16(port); + *addr_len = sizeof(struct sockaddr_in6); + } +} + +/* Close a remote connection and tidy up */ +static void close_connection(struct connection *con, bool and_other) +{ + mutex_lock(&con->sock_mutex); + + if (con->sock) { + sock_release(con->sock); + con->sock = NULL; + } + if (con->othercon && and_other) { + /* Will only re-enter once. */ + close_connection(con->othercon, false); + } + if (con->rx_page) { + __free_page(con->rx_page); + con->rx_page = NULL; + } + con->retries = 0; + mutex_unlock(&con->sock_mutex); +} + +/* We only send shutdown messages to nodes that are not part of the cluster */ +static void sctp_send_shutdown(sctp_assoc_t associd) +{ + static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + struct msghdr outmessage; + struct cmsghdr *cmsg; + struct sctp_sndrcvinfo *sinfo; + int ret; + struct connection *con; + + con = nodeid2con(0,0); + BUG_ON(con == NULL); + + outmessage.msg_name = NULL; + outmessage.msg_namelen = 0; + outmessage.msg_control = outcmsg; + outmessage.msg_controllen = sizeof(outcmsg); + outmessage.msg_flags = MSG_EOR; + + cmsg = CMSG_FIRSTHDR(&outmessage); + cmsg->cmsg_level = IPPROTO_SCTP; + cmsg->cmsg_type = SCTP_SNDRCV; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); + outmessage.msg_controllen = cmsg->cmsg_len; + sinfo = CMSG_DATA(cmsg); + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); + + sinfo->sinfo_flags |= MSG_EOF; + sinfo->sinfo_assoc_id = associd; + + ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0); + + if (ret != 0) + log_print("send EOF to node failed: %d", ret); +} + +/* INIT failed but we don't know which node... + restart INIT on all pending nodes */ +static void sctp_init_failed(void) +{ + int i; + struct connection *con; + + down(&connections_lock); + for (i=1; i<=max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (!con) + continue; + con->sctp_assoc = 0; + if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { + queue_work(send_workqueue, &con->swork); + } + } + } + up(&connections_lock); +} + +/* Something happened to an association */ +static void process_sctp_notification(struct connection *con, struct msghdr *msg, char *buf) +{ + union sctp_notification *sn = (union sctp_notification *)buf; + + if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { + switch (sn->sn_assoc_change.sac_state) { + + case SCTP_COMM_UP: + case SCTP_RESTART: + { + /* Check that the new node is in the lockspace */ + struct sctp_prim prim; + int nodeid; + int prim_len, ret; + int addr_len; + struct connection *new_con; + struct file *file; + sctp_peeloff_arg_t parg; + int parglen = sizeof(parg); + + /* + * We get this before any data for an association. + * We verify that the node is in the cluster and + * then peel off a socket for it. + */ + if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { + log_print("COMM_UP for invalid assoc ID %d", + (int)sn->sn_assoc_change.sac_assoc_id); + sctp_init_failed(); + return; + } + memset(&prim, 0, sizeof(struct sctp_prim)); + prim_len = sizeof(struct sctp_prim); + prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; + + ret = kernel_getsockopt(con->sock, + IPPROTO_SCTP, + SCTP_PRIMARY_ADDR, + (char*)&prim, + &prim_len); + if (ret < 0) { + log_print("getsockopt/sctp_primary_addr on " + "new assoc %d failed : %d", + (int)sn->sn_assoc_change.sac_assoc_id, + ret); + + /* Retry INIT later */ + new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); + if (new_con) + clear_bit(CF_CONNECT_PENDING, &con->flags); + return; + } + make_sockaddr(&prim.ssp_addr, 0, &addr_len); + if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { + int i; + unsigned char *b=(unsigned char *)&prim.ssp_addr; + log_print("reject connect from unknown addr"); + for (i=0; isn_assoc_change.sac_assoc_id; + ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, SCTP_SOCKOPT_PEELOFF, + (void *)&parg, &parglen); + if (ret < 0) { + log_print("Can't peel off a socket for connection %d to node %d: err=%d\n", + parg.associd, nodeid, ret); + return; + } + + file = fget(parg.sd); + new_con->sock = SOCKET_I(file->f_dentry->d_inode); + add_sock(new_con->sock, new_con); + fput(file); + put_unused_fd(parg.sd); + + log_print("got new/restarted association %d nodeid %d", + (int)sn->sn_assoc_change.sac_assoc_id, nodeid); + + /* Send any pending writes */ + clear_bit(CF_CONNECT_PENDING, &new_con->flags); + clear_bit(CF_INIT_PENDING, &con->flags); + if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { + queue_work(send_workqueue, &new_con->swork); + } + if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags)) + queue_work(recv_workqueue, &new_con->rwork); + } + break; + + case SCTP_COMM_LOST: + case SCTP_SHUTDOWN_COMP: + { + con = assoc2con(sn->sn_assoc_change.sac_assoc_id); + if (con) { + con->sctp_assoc = 0; + } + } + break; + + /* We don't know which INIT failed, so clear the PENDING flags + * on them all. if assoc_id is zero then it will then try + * again */ + + case SCTP_CANT_STR_ASSOC: + { + log_print("Can't start SCTP association - retrying"); + sctp_init_failed(); + } + break; + + default: + log_print("unexpected SCTP assoc change id=%d state=%d", + (int)sn->sn_assoc_change.sac_assoc_id, + sn->sn_assoc_change.sac_state); + } + } +} + +/* Data received from remote end */ +static int receive_from_sock(struct connection *con) +{ + int ret = 0; + struct msghdr msg = {}; + struct kvec iov[2]; + unsigned len; + int r; + int call_again_soon = 0; + int nvec; + char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + + mutex_lock(&con->sock_mutex); + + if (con->sock == NULL) { + ret = -EAGAIN; + goto out_close; + } + + if (con->rx_page == NULL) { + /* + * This doesn't need to be atomic, but I think it should + * improve performance if it is. + */ + con->rx_page = alloc_page(GFP_ATOMIC); + if (con->rx_page == NULL) + goto out_resched; + cbuf_init(&con->cb, PAGE_CACHE_SIZE); + } + + /* Only SCTP needs these really */ + memset(&incmsg, 0, sizeof(incmsg)); + msg.msg_control = incmsg; + msg.msg_controllen = sizeof(incmsg); + + /* + * iov[0] is the bit of the circular buffer between the current end + * point (cb.base + cb.len) and the end of the buffer. + */ + iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); + iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); + iov[1].iov_len = 0; + nvec = 1; + + /* + * iov[1] is the bit of the circular buffer between the start of the + * buffer and the start of the currently used section (cb.base) + */ + if (cbuf_data(&con->cb) >= con->cb.base) { + iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb); + iov[1].iov_len = con->cb.base; + iov[1].iov_base = page_address(con->rx_page); + nvec = 2; + } + len = iov[0].iov_len + iov[1].iov_len; + + r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, + MSG_DONTWAIT | MSG_NOSIGNAL); + if (ret <= 0) + goto out_close; + + /* Process SCTP notifications */ + if (msg.msg_flags & MSG_NOTIFICATION) { + BUG_ON(con->nodeid != 0); + msg.msg_control = incmsg; + msg.msg_controllen = sizeof(incmsg); + + process_sctp_notification(con, &msg, + page_address(con->rx_page) + con->cb.base); + mutex_unlock(&con->sock_mutex); + return 0; + } + BUG_ON(con->nodeid == 0); + + if (ret == len) + call_again_soon = 1; + cbuf_add(&con->cb, ret); + ret = dlm_process_incoming_buffer(con->nodeid, + page_address(con->rx_page), + con->cb.base, con->cb.len, + PAGE_CACHE_SIZE); + if (ret == -EBADMSG) { + printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, " + "iov_len=%u, iov_base[0]=%p, read=%d\n", + page_address(con->rx_page), con->cb.base, con->cb.len, + len, iov[0].iov_base, r); + } + if (ret < 0) + goto out_close; + cbuf_eat(&con->cb, ret); + + if (cbuf_empty(&con->cb) && !call_again_soon) { + __free_page(con->rx_page); + con->rx_page = NULL; + } + + if (call_again_soon) + goto out_resched; + mutex_unlock(&con->sock_mutex); + return 0; + +out_resched: + if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) + queue_work(recv_workqueue, &con->rwork); + mutex_unlock(&con->sock_mutex); + return -EAGAIN; + +out_close: + mutex_unlock(&con->sock_mutex); + if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { + close_connection(con, false); + /* Reconnect when there is something to send */ + } + /* Don't return success if we really got EOF */ + if (ret == 0) + ret = -EAGAIN; + + return ret; +} + +/* Listening socket is busy, accept a connection */ +static int tcp_accept_from_sock(struct connection *con) +{ + int result; + struct sockaddr_storage peeraddr; + struct socket *newsock; + int len; + int nodeid; + struct connection *newcon; + struct connection *addcon; + + memset(&peeraddr, 0, sizeof(peeraddr)); + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, + IPPROTO_TCP, &newsock); + if (result < 0) + return -ENOMEM; + + mutex_lock_nested(&con->sock_mutex, 0); + + result = -ENOTCONN; + if (con->sock == NULL) + goto accept_err; + + newsock->type = con->sock->type; + newsock->ops = con->sock->ops; + + result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); + if (result < 0) + goto accept_err; + + /* Get the connected socket's peer */ + memset(&peeraddr, 0, sizeof(peeraddr)); + if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, + &len, 2)) { + result = -ECONNABORTED; + goto accept_err; + } + + /* Get the new node's NODEID */ + make_sockaddr(&peeraddr, 0, &len); + if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { + printk("dlm: connect from non cluster node\n"); + sock_release(newsock); + mutex_unlock(&con->sock_mutex); + return -1; + } + + log_print("got connection from %d", nodeid); + + /* Check to see if we already have a connection to this node. This + * could happen if the two nodes initiate a connection at roughly + * the same time and the connections cross on the wire. + * In this case we store the incoming one in "othercon" + */ + newcon = nodeid2con(nodeid, GFP_KERNEL); + if (!newcon) { + result = -ENOMEM; + goto accept_err; + } + mutex_lock_nested(&newcon->sock_mutex, 1); + if (newcon->sock) { + struct connection *othercon = newcon->othercon; + + if (!othercon) { + othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); + if (!othercon) { + printk("dlm: failed to allocate incoming socket\n"); + mutex_unlock(&newcon->sock_mutex); + result = -ENOMEM; + goto accept_err; + } + othercon->nodeid = nodeid; + othercon->rx_action = receive_from_sock; + mutex_init(&othercon->sock_mutex); + INIT_WORK(&othercon->swork, process_send_sockets); + INIT_WORK(&othercon->rwork, process_recv_sockets); + set_bit(CF_IS_OTHERCON, &othercon->flags); + newcon->othercon = othercon; + } + othercon->sock = newsock; + newsock->sk->sk_user_data = othercon; + add_sock(newsock, othercon); + addcon = othercon; + } + else { + newsock->sk->sk_user_data = newcon; + newcon->rx_action = receive_from_sock; + add_sock(newsock, newcon); + addcon = newcon; + } + + mutex_unlock(&newcon->sock_mutex); + + /* + * Add it to the active queue in case we got data + * beween processing the accept adding the socket + * to the read_sockets list + */ + if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) + queue_work(recv_workqueue, &addcon->rwork); + mutex_unlock(&con->sock_mutex); + + return 0; + +accept_err: + mutex_unlock(&con->sock_mutex); + sock_release(newsock); + + if (result != -EAGAIN) + printk("dlm: error accepting connection from node: %d\n", result); + return result; +} + +static void free_entry(struct writequeue_entry *e) +{ + __free_page(e->page); + kfree(e); +} + +/* Initiate an SCTP association. + This is a special case of send_to_sock() in that we don't yet have a + peeled-off socket for this association, so we use the listening socket + and add the primary IP address of the remote node. + */ +static void sctp_init_assoc(struct connection *con) +{ + struct sockaddr_storage rem_addr; + char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + struct msghdr outmessage; + struct cmsghdr *cmsg; + struct sctp_sndrcvinfo *sinfo; + struct connection *base_con; + struct writequeue_entry *e; + int len, offset; + int ret; + int addrlen; + struct kvec iov[1]; + + if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) + return; + + if (con->retries++ > MAX_CONNECT_RETRIES) + return; + + log_print("Initiating association with node %d", con->nodeid); + + if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) { + log_print("no address for nodeid %d", con->nodeid); + return; + } + base_con = nodeid2con(0, 0); + BUG_ON(base_con == NULL); + + make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); + + outmessage.msg_name = &rem_addr; + outmessage.msg_namelen = addrlen; + outmessage.msg_control = outcmsg; + outmessage.msg_controllen = sizeof(outcmsg); + outmessage.msg_flags = MSG_EOR; + + spin_lock(&con->writequeue_lock); + e = list_entry(con->writequeue.next, struct writequeue_entry, + list); + + BUG_ON((struct list_head *) e == &con->writequeue); + + len = e->len; + offset = e->offset; + spin_unlock(&con->writequeue_lock); + kmap(e->page); + + /* Send the first block off the write queue */ + iov[0].iov_base = page_address(e->page)+offset; + iov[0].iov_len = len; + + cmsg = CMSG_FIRSTHDR(&outmessage); + cmsg->cmsg_level = IPPROTO_SCTP; + cmsg->cmsg_type = SCTP_SNDRCV; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); + sinfo = CMSG_DATA(cmsg); + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); + sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); + outmessage.msg_controllen = cmsg->cmsg_len; + + ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); + if (ret < 0) { + log_print("Send first packet to node %d failed: %d", con->nodeid, ret); + + /* Try again later */ + clear_bit(CF_CONNECT_PENDING, &con->flags); + clear_bit(CF_INIT_PENDING, &con->flags); + } + else { + spin_lock(&con->writequeue_lock); + e->offset += ret; + e->len -= ret; + + if (e->len == 0 && e->users == 0) { + list_del(&e->list); + kunmap(e->page); + free_entry(e); + } + spin_unlock(&con->writequeue_lock); + } +} + +/* Connect a new socket to its peer */ +static void tcp_connect_to_sock(struct connection *con) +{ + int result = -EHOSTUNREACH; + struct sockaddr_storage saddr; + int addr_len; + struct socket *sock; + + if (con->nodeid == 0) { + log_print("attempt to connect sock 0 foiled"); + return; + } + + mutex_lock(&con->sock_mutex); + if (con->retries++ > MAX_CONNECT_RETRIES) + goto out; + + /* Some odd races can cause double-connects, ignore them */ + if (con->sock) { + result = 0; + goto out; + } + + /* Create a socket to communicate with */ + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, + IPPROTO_TCP, &sock); + if (result < 0) + goto out_err; + + memset(&saddr, 0, sizeof(saddr)); + if (dlm_nodeid_to_addr(con->nodeid, &saddr)) + goto out_err; + + sock->sk->sk_user_data = con; + con->rx_action = receive_from_sock; + con->connect_action = tcp_connect_to_sock; + add_sock(sock, con); + + make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); + + log_print("connecting to %d", con->nodeid); + result = + sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, + O_NONBLOCK); + if (result == -EINPROGRESS) + result = 0; + if (result == 0) + goto out; + +out_err: + if (con->sock) { + sock_release(con->sock); + con->sock = NULL; + } + /* + * Some errors are fatal and this list might need adjusting. For other + * errors we try again until the max number of retries is reached. + */ + if (result != -EHOSTUNREACH && result != -ENETUNREACH && + result != -ENETDOWN && result != EINVAL + && result != -EPROTONOSUPPORT) { + lowcomms_connect_sock(con); + result = 0; + } +out: + mutex_unlock(&con->sock_mutex); + return; +} + +static struct socket *tcp_create_listen_sock(struct connection *con, + struct sockaddr_storage *saddr) +{ + struct socket *sock = NULL; + int result = 0; + int one = 1; + int addr_len; + + if (dlm_local_addr[0]->ss_family == AF_INET) + addr_len = sizeof(struct sockaddr_in); + else + addr_len = sizeof(struct sockaddr_in6); + + /* Create a socket to communicate with */ + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); + if (result < 0) { + printk("dlm: Can't create listening comms socket\n"); + goto create_out; + } + + result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&one, sizeof(one)); + + if (result < 0) { + printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n", + result); + } + sock->sk->sk_user_data = con; + con->rx_action = tcp_accept_from_sock; + con->connect_action = tcp_connect_to_sock; + con->sock = sock; + + /* Bind to our port */ + make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); + result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); + if (result < 0) { + printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port); + sock_release(sock); + sock = NULL; + con->sock = NULL; + goto create_out; + } + result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + (char *)&one, sizeof(one)); + if (result < 0) { + printk("dlm: Set keepalive failed: %d\n", result); + } + + result = sock->ops->listen(sock, 5); + if (result < 0) { + printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port); + sock_release(sock); + sock = NULL; + goto create_out; + } + +create_out: + return sock; +} + +/* Get local addresses */ +static void init_local(void) +{ + struct sockaddr_storage sas, *addr; + int i; + + for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { + if (dlm_our_addr(&sas, i)) + break; + + addr = kmalloc(sizeof(*addr), GFP_KERNEL); + if (!addr) + break; + memcpy(addr, &sas, sizeof(*addr)); + dlm_local_addr[dlm_local_count++] = addr; + } +} + +/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */ +static int add_sctp_bind_addr(struct connection *sctp_con, struct sockaddr_storage *addr, int addr_len, int num) +{ + int result = 0; + + if (num == 1) + result = kernel_bind(sctp_con->sock, + (struct sockaddr *) addr, + addr_len); + else + result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, + SCTP_SOCKOPT_BINDX_ADD, + (char *)addr, addr_len); + + if (result < 0) + log_print("Can't bind to port %d addr number %d", + dlm_config.ci_tcp_port, num); + + return result; +} + +/* Initialise SCTP socket and bind to all interfaces */ +static int sctp_listen_for_all(void) +{ + struct socket *sock = NULL; + struct sockaddr_storage localaddr; + struct sctp_event_subscribe subscribe; + int result = -EINVAL, num = 1, i, addr_len; + struct connection *con = nodeid2con(0, GFP_KERNEL); + int bufsize = NEEDED_RMEM; + + if (!con) + return -ENOMEM; + + log_print("Using SCTP for communications"); + + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, + IPPROTO_SCTP, &sock); + if (result < 0) { + log_print("Can't create comms socket, check SCTP is loaded"); + goto out; + } + + /* Listen for events */ + memset(&subscribe, 0, sizeof(subscribe)); + subscribe.sctp_data_io_event = 1; + subscribe.sctp_association_event = 1; + subscribe.sctp_send_failure_event = 1; + subscribe.sctp_shutdown_event = 1; + subscribe.sctp_partial_delivery_event = 1; + + result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF, + (char *)&bufsize, sizeof(bufsize)); + if (result) + log_print("Error increasing buffer space on socket: %d", result); + + result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, + (char *)&subscribe, sizeof(subscribe)); + if (result < 0) { + log_print("Failed to set SCTP_EVENTS on socket: result=%d", + result); + goto create_delsock; + } + + /* Init con struct */ + sock->sk->sk_user_data = con; + con->sock = sock; + con->sock->sk->sk_data_ready = lowcomms_data_ready; + con->rx_action = receive_from_sock; + con->connect_action = sctp_init_assoc; + + /* Bind to all interfaces. */ + for (i = 0; i < dlm_local_count; i++) { + memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); + make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); + + result = add_sctp_bind_addr(con, &localaddr, addr_len, num); + if (result) + goto create_delsock; + ++num; + } + + result = sock->ops->listen(sock, 5); + if (result < 0) { + log_print("Can't set socket listening"); + goto create_delsock; + } + + return 0; + +create_delsock: + sock_release(sock); + con->sock = NULL; +out: + return result; +} + +static int tcp_listen_for_all(void) +{ + struct socket *sock = NULL; + struct connection *con = nodeid2con(0, GFP_KERNEL); + int result = -EINVAL; + + if (!con) + return -ENOMEM; + + /* We don't support multi-homed hosts */ + if (dlm_local_addr[1] != NULL) { + log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); + return -EINVAL; + } + + log_print("Using TCP for communications"); + + set_bit(CF_IS_OTHERCON, &con->flags); + + sock = tcp_create_listen_sock(con, dlm_local_addr[0]); + if (sock) { + add_sock(sock, con); + result = 0; + } + else { + result = -EADDRINUSE; + } + + return result; +} + + + +static struct writequeue_entry *new_writequeue_entry(struct connection *con, + gfp_t allocation) +{ + struct writequeue_entry *entry; + + entry = kmalloc(sizeof(struct writequeue_entry), allocation); + if (!entry) + return NULL; + + entry->page = alloc_page(allocation); + if (!entry->page) { + kfree(entry); + return NULL; + } + + entry->offset = 0; + entry->len = 0; + entry->end = 0; + entry->users = 0; + entry->con = con; + + return entry; +} + +void *dlm_lowcomms_get_buffer(int nodeid, int len, + gfp_t allocation, char **ppc) +{ + struct connection *con; + struct writequeue_entry *e; + int offset = 0; + int users = 0; + + con = nodeid2con(nodeid, allocation); + if (!con) + return NULL; + + spin_lock(&con->writequeue_lock); + e = list_entry(con->writequeue.prev, struct writequeue_entry, list); + if ((&e->list == &con->writequeue) || + (PAGE_CACHE_SIZE - e->end < len)) { + e = NULL; + } else { + offset = e->end; + e->end += len; + users = e->users++; + } + spin_unlock(&con->writequeue_lock); + + if (e) { + got_one: + if (users == 0) + kmap(e->page); + *ppc = page_address(e->page) + offset; + return e; + } + + e = new_writequeue_entry(con, allocation); + if (e) { + spin_lock(&con->writequeue_lock); + offset = e->end; + e->end += len; + users = e->users++; + list_add_tail(&e->list, &con->writequeue); + spin_unlock(&con->writequeue_lock); + goto got_one; + } + return NULL; +} + +void dlm_lowcomms_commit_buffer(void *mh) +{ + struct writequeue_entry *e = (struct writequeue_entry *)mh; + struct connection *con = e->con; + int users; + + spin_lock(&con->writequeue_lock); + users = --e->users; + if (users) + goto out; + e->len = e->end - e->offset; + kunmap(e->page); + spin_unlock(&con->writequeue_lock); + + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { + queue_work(send_workqueue, &con->swork); + } + return; + +out: + spin_unlock(&con->writequeue_lock); + return; +} + +/* Send a message */ +static void send_to_sock(struct connection *con) +{ + int ret = 0; + ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); + const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; + struct writequeue_entry *e; + int len, offset; + + mutex_lock(&con->sock_mutex); + if (con->sock == NULL) + goto out_connect; + + sendpage = con->sock->ops->sendpage; + + spin_lock(&con->writequeue_lock); + for (;;) { + e = list_entry(con->writequeue.next, struct writequeue_entry, + list); + if ((struct list_head *) e == &con->writequeue) + break; + + len = e->len; + offset = e->offset; + BUG_ON(len == 0 && e->users == 0); + spin_unlock(&con->writequeue_lock); + kmap(e->page); + + ret = 0; + if (len) { + ret = sendpage(con->sock, e->page, offset, len, + msg_flags); + if (ret == -EAGAIN || ret == 0) + goto out; + if (ret <= 0) + goto send_error; + } + else { + /* Don't starve people filling buffers */ + cond_resched(); + } + + spin_lock(&con->writequeue_lock); + e->offset += ret; + e->len -= ret; + + if (e->len == 0 && e->users == 0) { + list_del(&e->list); + kunmap(e->page); + free_entry(e); + continue; + } + } + spin_unlock(&con->writequeue_lock); +out: + mutex_unlock(&con->sock_mutex); + return; + +send_error: + mutex_unlock(&con->sock_mutex); + close_connection(con, false); + lowcomms_connect_sock(con); + return; + +out_connect: + mutex_unlock(&con->sock_mutex); + if (!test_bit(CF_INIT_PENDING, &con->flags)) + lowcomms_connect_sock(con); + return; +} + +static void clean_one_writequeue(struct connection *con) +{ + struct list_head *list; + struct list_head *temp; + + spin_lock(&con->writequeue_lock); + list_for_each_safe(list, temp, &con->writequeue) { + struct writequeue_entry *e = + list_entry(list, struct writequeue_entry, list); + list_del(&e->list); + free_entry(e); + } + spin_unlock(&con->writequeue_lock); +} + +/* Called from recovery when it knows that a node has + left the cluster */ +int dlm_lowcomms_close(int nodeid) +{ + struct connection *con; + + log_print("closing connection to node %d", nodeid); + con = nodeid2con(nodeid, 0); + if (con) { + clean_one_writequeue(con); + close_connection(con, true); + } + return 0; +} + +/* Receive workqueue function */ +static void process_recv_sockets(struct work_struct *work) +{ + struct connection *con = container_of(work, struct connection, rwork); + int err; + + clear_bit(CF_READ_PENDING, &con->flags); + do { + err = con->rx_action(con); + } while (!err); +} + +/* Send workqueue function */ +static void process_send_sockets(struct work_struct *work) +{ + struct connection *con = container_of(work, struct connection, swork); + + if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { + con->connect_action(con); + } + clear_bit(CF_WRITE_PENDING, &con->flags); + send_to_sock(con); +} + + +/* Discard all entries on the write queues */ +static void clean_writequeues(void) +{ + int nodeid; + + for (nodeid = 1; nodeid < max_nodeid; nodeid++) { + struct connection *con = nodeid2con(nodeid, 0); + + if (con) + clean_one_writequeue(con); + } +} + +static void work_stop(void) +{ + destroy_workqueue(recv_workqueue); + destroy_workqueue(send_workqueue); +} + +static int work_start(void) +{ + int error; + recv_workqueue = create_workqueue("dlm_recv"); + error = IS_ERR(recv_workqueue); + if (error) { + log_print("can't start dlm_recv %d", error); + return error; + } + + send_workqueue = create_singlethread_workqueue("dlm_send"); + error = IS_ERR(send_workqueue); + if (error) { + log_print("can't start dlm_send %d", error); + destroy_workqueue(recv_workqueue); + return error; + } + + return 0; +} + +void dlm_lowcomms_stop(void) +{ + int i; + struct connection *con; + + /* Set all the flags to prevent any + socket activity. + */ + down(&connections_lock); + for (i = 0; i < max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (con) + con->flags |= 0xFF; + } + up(&connections_lock); + + work_stop(); + + down(&connections_lock); + clean_writequeues(); + + for (i = 0; i < max_nodeid; i++) { + con = nodeid2con(i, 0); + if (con) { + close_connection(con, true); + if (con->othercon) + kmem_cache_free(con_cache, con->othercon); + kmem_cache_free(con_cache, con); + } + } + up(&connections_lock); + kmem_cache_destroy(con_cache); +} + +int dlm_lowcomms_start(void) +{ + int error = -EINVAL; + struct connection *con; + + init_local(); + if (!dlm_local_count) { + log_print("no local IP address has been set"); + goto out; + } + + error = -ENOMEM; + con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), + __alignof__(struct connection), 0, + NULL, NULL); + if (!con_cache) + goto out; + + /* Set some sysctl minima */ + if (sysctl_rmem_max < NEEDED_RMEM) + sysctl_rmem_max = NEEDED_RMEM; + + /* Start listening */ + if (dlm_config.ci_protocol == 0) + error = tcp_listen_for_all(); + else + error = sctp_listen_for_all(); + if (error) + goto fail_unlisten; + + error = work_start(); + if (error) + goto fail_unlisten; + + return 0; + +fail_unlisten: + con = nodeid2con(0,0); + if (con) { + close_connection(con, false); + kmem_cache_free(con_cache, con); + } + kmem_cache_destroy(con_cache); + +out: + return error; +} -- cgit v1.2.3 From 8fa1de386f4d72f0710b389ccf96308fef87df78 Mon Sep 17 00:00:00 2001 From: Adrian Bunk Date: Wed, 4 Apr 2007 17:25:29 +0200 Subject: [DLM] fs/dlm/ast.c should #include "ast.h" Every file should include the headers containing the prototypes for it's global functions. Signed-off-by: Adrian Bunk Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/dlm/ast.c | 1 + 1 file changed, 1 insertion(+) (limited to 'fs') diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index f91d39cb1e0..6308122890c 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -14,6 +14,7 @@ #include "dlm_internal.h" #include "lock.h" #include "user.h" +#include "ast.h" #define WAKE_ASTS 0 -- cgit v1.2.3 From 7a0079d9e3fe8826475a08785f3d348c4b509774 Mon Sep 17 00:00:00 2001 From: Robert Peterson Date: Tue, 17 Apr 2007 11:37:11 -0500 Subject: [GFS2] bz 236008: Kernel gpf doing cat /debugfs/gfs2/xxx (lock dump) This is for Bugzilla Bug 236008: Kernel gpf doing cat /debugfs/gfs2/xxx (lock dump) seen at the "gfs2 summit". This also fixes the bug that caused garbage to be printed by the "initialized at" field. I apologize for the kludge, but that code will all be ripped out anyway when the official sprint_symbol function becomes available in the Linux kernel. I also changed some formatting so that spaces are replaced by proper tabs. Signed-off-by: Robert Peterson Signed-off-by: Steven Whitehouse --- fs/gfs2/glock.c | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index d2e3094c40f..b075f9359c6 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -1765,15 +1765,20 @@ static void gfs2_print_symbol(struct glock_iter *gi, const char *fmt, { /* when sprint_symbol becomes available in the new kernel, replace this */ /* function with: - char buffer[KSYM_SYMBOL_LEN]; + char buffer[KSYM_SYMBOL_LEN]; - sprint_symbol(buffer, address); - print_dbg(gi, fmt, buffer); + sprint_symbol(buffer, address); + print_dbg(gi, fmt, buffer); */ - if (gi) - print_dbg(gi, fmt, address); - else - print_symbol(fmt, address); + char buffer[256]; + + if (gi) { + memset(buffer, 0, sizeof(buffer)); + sprintf(buffer, "%p", address); + print_dbg(gi, fmt, buffer); + } + else + print_symbol(fmt, address); } /** @@ -1993,14 +1998,19 @@ int __init gfs2_glock_init(void) static int gfs2_glock_iter_next(struct glock_iter *gi) { + read_lock(gl_lock_addr(gi->hash)); while (1) { if (!gi->hb_list) { /* If we don't have a hash bucket yet */ gi->hb_list = &gl_hash_table[gi->hash].hb_list; if (hlist_empty(gi->hb_list)) { + read_unlock(gl_lock_addr(gi->hash)); gi->hash++; + read_lock(gl_lock_addr(gi->hash)); gi->hb_list = NULL; - if (gi->hash >= GFS2_GL_HASH_SIZE) + if (gi->hash >= GFS2_GL_HASH_SIZE) { + read_unlock(gl_lock_addr(gi->hash)); return 1; + } else continue; } @@ -2011,7 +2021,9 @@ static int gfs2_glock_iter_next(struct glock_iter *gi) } } else { if (gi->gl->gl_list.next == NULL) { + read_unlock(gl_lock_addr(gi->hash)); gi->hash++; + read_lock(gl_lock_addr(gi->hash)); gi->hb_list = NULL; continue; } @@ -2021,6 +2033,7 @@ static int gfs2_glock_iter_next(struct glock_iter *gi) if (gi->gl) break; } + read_unlock(gl_lock_addr(gi->hash)); return 0; } -- cgit v1.2.3 From bdd19a22f85a7039e01accd8717eaec4addd9dfd Mon Sep 17 00:00:00 2001 From: Steven Whitehouse Date: Wed, 18 Apr 2007 09:38:42 +0100 Subject: [GFS2] Patch to detect corrupt number of dir entries in leaf and/or inode blocks This patch detects when the number of entries in a leaf block or inode block (in the case of stuffed directories) is corrupt and informs the user. It prevents us from running off the end of the array thats been allocated for the sorting in this case, Signed-off-by: Steven Whitehouse --- fs/gfs2/dir.c | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/dir.c b/fs/gfs2/dir.c index 82a1ac7895a..6c3ed7674a9 100644 --- a/fs/gfs2/dir.c +++ b/fs/gfs2/dir.c @@ -1262,9 +1262,10 @@ static int gfs2_dir_read_leaf(struct inode *inode, u64 *offset, void *opaque, u64 leaf_no) { struct gfs2_inode *ip = GFS2_I(inode); + struct gfs2_sbd *sdp = GFS2_SB(inode); struct buffer_head *bh; struct gfs2_leaf *lf; - unsigned entries = 0; + unsigned entries = 0, entries2 = 0; unsigned leaves = 0; const struct gfs2_dirent **darr, *dent; struct dirent_gather g; @@ -1290,7 +1291,13 @@ static int gfs2_dir_read_leaf(struct inode *inode, u64 *offset, void *opaque, return 0; error = -ENOMEM; - larr = vmalloc((leaves + entries) * sizeof(void *)); + /* + * The extra 99 entries are not normally used, but are a buffer + * zone in case the number of entries in the leaf is corrupt. + * 99 is the maximum number of entries that can fit in a single + * leaf block. + */ + larr = vmalloc((leaves + entries + 99) * sizeof(void *)); if (!larr) goto out; darr = (const struct gfs2_dirent **)(larr + leaves); @@ -1305,10 +1312,18 @@ static int gfs2_dir_read_leaf(struct inode *inode, u64 *offset, void *opaque, lf = (struct gfs2_leaf *)bh->b_data; lfn = be64_to_cpu(lf->lf_next); if (lf->lf_entries) { + entries2 += be16_to_cpu(lf->lf_entries); dent = gfs2_dirent_scan(inode, bh->b_data, bh->b_size, gfs2_dirent_gather, NULL, &g); error = PTR_ERR(dent); - if (IS_ERR(dent)) { + if (IS_ERR(dent)) + goto out_kfree; + if (entries2 != g.offset) { + fs_warn(sdp, "Number of entries corrupt in dir leaf %llu, " + "entries2 (%u) != g.offset (%u)\n", + (u64)bh->b_blocknr, entries2, g.offset); + + error = -EIO; goto out_kfree; } error = 0; @@ -1318,6 +1333,7 @@ static int gfs2_dir_read_leaf(struct inode *inode, u64 *offset, void *opaque, } } while(lfn); + BUG_ON(entries2 != entries); error = do_filldir_main(ip, offset, opaque, filldir, darr, entries, copied); out_kfree: @@ -1401,6 +1417,7 @@ int gfs2_dir_read(struct inode *inode, u64 *offset, void *opaque, filldir_t filldir) { struct gfs2_inode *dip = GFS2_I(inode); + struct gfs2_sbd *sdp = GFS2_SB(inode); struct dirent_gather g; const struct gfs2_dirent **darr, *dent; struct buffer_head *dibh; @@ -1423,8 +1440,8 @@ int gfs2_dir_read(struct inode *inode, u64 *offset, void *opaque, return error; error = -ENOMEM; - darr = kmalloc(dip->i_di.di_entries * sizeof(struct gfs2_dirent *), - GFP_KERNEL); + /* 96 is max number of dirents which can be stuffed into an inode */ + darr = kmalloc(96 * sizeof(struct gfs2_dirent *), GFP_KERNEL); if (darr) { g.pdent = darr; g.offset = 0; @@ -1434,6 +1451,14 @@ int gfs2_dir_read(struct inode *inode, u64 *offset, void *opaque, error = PTR_ERR(dent); goto out; } + if (dip->i_di.di_entries != g.offset) { + fs_warn(sdp, "Number of entries corrupt in dir %llu, " + "ip->i_di.di_entries (%u) != g.offset (%u)\n", + dip->i_num.no_addr, dip->i_di.di_entries, + g.offset); + error = -EIO; + goto out; + } error = do_filldir_main(dip, offset, opaque, filldir, darr, dip->i_di.di_entries, &copied); out: -- cgit v1.2.3 From 5f8820960cf4fb621483d4a37c24939ad831bfe7 Mon Sep 17 00:00:00 2001 From: Robert Peterson Date: Wed, 18 Apr 2007 11:41:11 -0500 Subject: [GFS2] lockdump improvements The patch below consists of the following changes (in code order): 1. I fixed a minor compiler warning regarding the printing of a kernel symbol address. 2. I implemented a suggestion from Dave Teigland that moves the debugfs information for gfs2 into a subdirectory so we can easily expand our use of debugfs in the future. The current code keeps the glock information in: /debug/gfs2/ With the patch, the new code keeps the glock information in: /debug/gfs2//glock That will allow us to create more debugfs files in the future. 3. This fixes a bug whereby a failed mount attempt causes the debugfs file to not be deleted. Failed mount attempts should always clean up after themselves, including deleting the debugfs file and/or directory. Signed-off-by: Bob Peterson Signed-off-by: Steven Whitehouse --- fs/gfs2/glock.c | 26 ++++++++++++++++++-------- fs/gfs2/incore.h | 3 ++- fs/gfs2/ops_fstype.c | 1 + 3 files changed, 21 insertions(+), 9 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index b075f9359c6..7988715b7a0 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -1774,7 +1774,7 @@ static void gfs2_print_symbol(struct glock_iter *gi, const char *fmt, if (gi) { memset(buffer, 0, sizeof(buffer)); - sprintf(buffer, "%p", address); + sprintf(buffer, "0x%08lx", address); print_dbg(gi, fmt, buffer); } else @@ -2146,11 +2146,14 @@ static const struct file_operations gfs2_debug_fops = { int gfs2_create_debugfs_file(struct gfs2_sbd *sdp) { - sdp->debugfs_dentry = debugfs_create_file(sdp->sd_table_name, - S_IFREG | S_IRUGO, - gfs2_root, sdp, - &gfs2_debug_fops); - if (!sdp->debugfs_dentry) + sdp->debugfs_dir = debugfs_create_dir(sdp->sd_table_name, gfs2_root); + if (!sdp->debugfs_dir) + return -ENOMEM; + sdp->debugfs_dentry_glocks = debugfs_create_file("glocks", + S_IFREG | S_IRUGO, + sdp->debugfs_dir, sdp, + &gfs2_debug_fops); + if (!sdp->debugfs_dentry_glocks) return -ENOMEM; return 0; @@ -2158,8 +2161,14 @@ int gfs2_create_debugfs_file(struct gfs2_sbd *sdp) void gfs2_delete_debugfs_file(struct gfs2_sbd *sdp) { - if (sdp && sdp->debugfs_dentry) - debugfs_remove(sdp->debugfs_dentry); + if (sdp && sdp->debugfs_dir) { + if (sdp->debugfs_dentry_glocks) { + debugfs_remove(sdp->debugfs_dentry_glocks); + sdp->debugfs_dentry_glocks = NULL; + } + debugfs_remove(sdp->debugfs_dir); + sdp->debugfs_dir = NULL; + } } int gfs2_register_debugfs(void) @@ -2171,4 +2180,5 @@ int gfs2_register_debugfs(void) void gfs2_unregister_debugfs(void) { debugfs_remove(gfs2_root); + gfs2_root = NULL; } diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h index fdf04705906..d995441373a 100644 --- a/fs/gfs2/incore.h +++ b/fs/gfs2/incore.h @@ -609,7 +609,8 @@ struct gfs2_sbd { unsigned long sd_last_warning; struct vfsmount *sd_gfs2mnt; - struct dentry *debugfs_dentry; /* for debugfs */ + struct dentry *debugfs_dir; /* debugfs directory */ + struct dentry *debugfs_dentry_glocks; /* for debugfs */ }; #endif /* __INCORE_DOT_H__ */ diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c index ecb8b18de0e..2c5f8e7def0 100644 --- a/fs/gfs2/ops_fstype.c +++ b/fs/gfs2/ops_fstype.c @@ -756,6 +756,7 @@ fail_lm: fail_sys: gfs2_sys_fs_del(sdp); fail: + gfs2_delete_debugfs_file(sdp); kfree(sdp); sb->s_fs_info = NULL; return error; -- cgit v1.2.3 From 7d3c1feb80913ba4253c3517d48b9b3741c44fc9 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Thu, 19 Apr 2007 10:30:41 -0500 Subject: [DLM] fix mode munging There are flags to enable two specialized features in the dlm: 1. CONVDEADLK causes the dlm to resolve conversion deadlocks internally by changing the granted mode of locks to NL. 2. ALTPR/ALTCW cause the dlm to change the requested mode of locks to PR or CW to grant them if the normal requested mode can't be granted. GFS direct i/o exercises both of these features, especially when mixed with buffered i/o. The dlm has problems with them. The first problem is on the master node. If it demotes a lock as a part of converting it, the actual step of converting the lock isn't being done after the demotion, the lock is just left sitting on the granted queue with a granted mode of NL. I think the mistaken assumption was that the call to grant_pending_locks() would grant it, but that function naturally doesn't look at locks on the granted queue. The second problem is on the process node. If the master either demotes or gives an altmode, the munging of the gr/rq modes is never done in the process copy of the lock, leaving the master/process copies out of sync. Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/dlm/lock.c | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 2 deletions(-) (limited to 'fs') diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index eac54d230fd..d8d6e729f96 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -224,6 +224,16 @@ static inline int is_demoted(struct dlm_lkb *lkb) return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); } +static inline int is_altmode(struct dlm_lkb *lkb) +{ + return (lkb->lkb_sbflags & DLM_SBF_ALTMODE); +} + +static inline int is_granted(struct dlm_lkb *lkb) +{ + return (lkb->lkb_status == DLM_LKSTS_GRANTED); +} + static inline int is_remote(struct dlm_rsb *r) { DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); @@ -1191,6 +1201,50 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) queue_cast(r, lkb, 0); } +/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to + change the granted/requested modes. We're munging things accordingly in + the process copy. + CONVDEADLK: our grmode may have been forced down to NL to resolve a + conversion deadlock + ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become + compatible with other granted locks */ + +static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + if (ms->m_type != DLM_MSG_CONVERT_REPLY) { + log_print("munge_demoted %x invalid reply type %d", + lkb->lkb_id, ms->m_type); + return; + } + + if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { + log_print("munge_demoted %x invalid modes gr %d rq %d", + lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); + return; + } + + lkb->lkb_grmode = DLM_LOCK_NL; +} + +static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + if (ms->m_type != DLM_MSG_REQUEST_REPLY && + ms->m_type != DLM_MSG_GRANT) { + log_print("munge_altmode %x invalid reply type %d", + lkb->lkb_id, ms->m_type); + return; + } + + if (lkb->lkb_exflags & DLM_LKF_ALTPR) + lkb->lkb_rqmode = DLM_LOCK_PR; + else if (lkb->lkb_exflags & DLM_LKF_ALTCW) + lkb->lkb_rqmode = DLM_LOCK_CW; + else { + log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); + dlm_print_lkb(lkb); + } +} + static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) { struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, @@ -1965,9 +2019,24 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) goto out; } - if (can_be_queued(lkb)) { - if (is_demoted(lkb)) + /* is_demoted() means the can_be_granted() above set the grmode + to NL, and left us on the granted queue. This auto-demotion + (due to CONVDEADLK) might mean other locks, and/or this lock, are + now grantable. We have to try to grant other converting locks + before we try again to grant this one. */ + + if (is_demoted(lkb)) { + grant_pending_convert(r, DLM_LOCK_IV); + if (_can_be_granted(r, lkb, 1)) { + grant_lock(r, lkb); + queue_cast(r, lkb, 0); grant_pending_locks(r); + goto out; + } + /* else fall through and move to convert queue */ + } + + if (can_be_queued(lkb)) { error = -EINPROGRESS; del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); @@ -2908,6 +2977,8 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) lock_rsb(r); receive_flags_reply(lkb, ms); + if (is_altmode(lkb)) + munge_altmode(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); @@ -3038,6 +3109,8 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) /* request was queued or granted on remote master */ receive_flags_reply(lkb, ms); lkb->lkb_remid = ms->m_lkid; + if (is_altmode(lkb)) + munge_altmode(lkb, ms); if (result) add_lkb(r, lkb, DLM_LKSTS_WAITING); else { @@ -3101,6 +3174,9 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, case -EINPROGRESS: /* convert was queued on remote master */ + receive_flags_reply(lkb, ms); + if (is_demoted(lkb)) + munge_demoted(lkb, ms); del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); break; @@ -3108,6 +3184,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, case 0: /* convert was granted on remote master */ receive_flags_reply(lkb, ms); + if (is_demoted(lkb)) + munge_demoted(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); break; -- cgit v1.2.3 From 2439fe50724e8693e8b933b3f8125d870bfbdb25 Mon Sep 17 00:00:00 2001 From: Josef Bacik Date: Thu, 19 Apr 2007 17:59:05 -0400 Subject: [DLM] Fix dlm_lowcoms_stop hang When you attempt to release a lockspace in DLM, it will hang trying to down a semaphore that has already been downed. The attached patch fixes the problem. Signed-off-by: Josef Bacik Signed-off-by: Steven Whitehouse Cc: Patrick Caulfield --- fs/dlm/lowcomms.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 76399b7819b..2b32f3c82fe 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -1407,7 +1407,7 @@ void dlm_lowcomms_stop(void) clean_writequeues(); for (i = 0; i < max_nodeid; i++) { - con = nodeid2con(i, 0); + con = __nodeid2con(i, 0); if (con) { close_connection(con, true); if (con->othercon) -- cgit v1.2.3 From 30d3a2373f171e62e4032819f55fed2ec887d0b8 Mon Sep 17 00:00:00 2001 From: Patrick Caulfield Date: Mon, 23 Apr 2007 16:26:21 +0100 Subject: [DLM] Lowcomms nodeid range & initialisation fixes Fix a few range & initialization bugs in lowcomms. - max_nodeid is really the highest nodeid encountered, so all loops must include it in their iterations. - clean dlm_local_count & connection_idr so we can do a clean restart. - Remove a spurious BUG_ON Signed-Off-By: Patrick Caulfield Signed-off-by: Steven Whitehouse --- fs/dlm/lowcomms.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'fs') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 2b32f3c82fe..5c33233bc2d 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -219,7 +219,7 @@ static struct connection *assoc2con(int assoc_id) struct connection *con; down(&connections_lock); - for (i=0; isctp_assoc == assoc_id) { up(&connections_lock); @@ -467,12 +467,10 @@ static void process_sctp_notification(struct connection *con, struct msghdr *msg parg.associd = sn->sn_assoc_change.sac_assoc_id; ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, SCTP_SOCKOPT_PEELOFF, (void *)&parg, &parglen); - if (ret < 0) { + if (ret) { log_print("Can't peel off a socket for connection %d to node %d: err=%d\n", parg.associd, nodeid, ret); - return; } - file = fget(parg.sd); new_con->sock = SOCKET_I(file->f_dentry->d_inode); add_sock(new_con->sock, new_con); @@ -585,7 +583,6 @@ static int receive_from_sock(struct connection *con) /* Process SCTP notifications */ if (msg.msg_flags & MSG_NOTIFICATION) { - BUG_ON(con->nodeid != 0); msg.msg_control = incmsg; msg.msg_controllen = sizeof(incmsg); @@ -984,6 +981,7 @@ static void init_local(void) struct sockaddr_storage sas, *addr; int i; + dlm_local_count = 0; for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { if (dlm_our_addr(&sas, i)) break; @@ -1350,8 +1348,8 @@ static void clean_writequeues(void) { int nodeid; - for (nodeid = 1; nodeid < max_nodeid; nodeid++) { - struct connection *con = nodeid2con(nodeid, 0); + for (nodeid = 1; nodeid <= max_nodeid; nodeid++) { + struct connection *con = __nodeid2con(nodeid, 0); if (con) clean_one_writequeue(con); @@ -1394,7 +1392,7 @@ void dlm_lowcomms_stop(void) socket activity. */ down(&connections_lock); - for (i = 0; i < max_nodeid; i++) { + for (i = 0; i <= max_nodeid; i++) { con = __nodeid2con(i, 0); if (con) con->flags |= 0xFF; @@ -1406,7 +1404,7 @@ void dlm_lowcomms_stop(void) down(&connections_lock); clean_writequeues(); - for (i = 0; i < max_nodeid; i++) { + for (i = 0; i <= max_nodeid; i++) { con = __nodeid2con(i, 0); if (con) { close_connection(con, true); @@ -1415,8 +1413,10 @@ void dlm_lowcomms_stop(void) kmem_cache_free(con_cache, con); } } + max_nodeid = 0; up(&connections_lock); kmem_cache_destroy(con_cache); + idr_init(&connections_idr); } int dlm_lowcomms_start(void) -- cgit v1.2.3 From 476c006be009d4121e401a9e9f49a3362a7a272f Mon Sep 17 00:00:00 2001 From: Josef Bacik Date: Mon, 23 Apr 2007 11:55:39 -0400 Subject: [GFS2] use lib/parser for parsing mount options This patch converts the mount option parsing to use the kernels lib/parser stuff like all of the other filesystems. I tested this and it works well. Thank you, Signed-off-by: Josef Bacik Signed-off-by: Steven Whitehouse --- fs/gfs2/mount.c | 239 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 143 insertions(+), 96 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/mount.c b/fs/gfs2/mount.c index 32caecd2030..4864659555d 100644 --- a/fs/gfs2/mount.c +++ b/fs/gfs2/mount.c @@ -13,6 +13,7 @@ #include #include #include +#include #include "gfs2.h" #include "incore.h" @@ -20,6 +21,52 @@ #include "sys.h" #include "util.h" +enum { + Opt_lockproto, + Opt_locktable, + Opt_hostdata, + Opt_spectator, + Opt_ignore_local_fs, + Opt_localflocks, + Opt_localcaching, + Opt_debug, + Opt_nodebug, + Opt_upgrade, + Opt_num_glockd, + Opt_acl, + Opt_noacl, + Opt_quota_off, + Opt_quota_account, + Opt_quota_on, + Opt_suiddir, + Opt_nosuiddir, + Opt_data_writeback, + Opt_data_ordered, +}; + +static match_table_t tokens = { + {Opt_lockproto, "lockproto=%s"}, + {Opt_locktable, "locktable=%s"}, + {Opt_hostdata, "hostdata=%s"}, + {Opt_spectator, "spectator"}, + {Opt_ignore_local_fs, "ignore_local_fs"}, + {Opt_localflocks, "localflocks"}, + {Opt_localcaching, "localcaching"}, + {Opt_debug, "debug"}, + {Opt_nodebug, "nodebug"}, + {Opt_upgrade, "upgrade"}, + {Opt_num_glockd, "num_glockd=%d"}, + {Opt_acl, "acl"}, + {Opt_noacl, "noacl"}, + {Opt_quota_off, "quota=off"}, + {Opt_quota_account, "quota=account"}, + {Opt_quota_on, "quota=on"}, + {Opt_suiddir, "suiddir"}, + {Opt_nosuiddir, "nosuiddir"}, + {Opt_data_writeback, "data=writeback"}, + {Opt_data_ordered, "data=ordered"} +}; + /** * gfs2_mount_args - Parse mount options * @sdp: @@ -54,146 +101,150 @@ int gfs2_mount_args(struct gfs2_sbd *sdp, char *data_arg, int remount) process them */ for (options = data; (o = strsep(&options, ",")); ) { + int token, option; + substring_t tmp[MAX_OPT_ARGS]; + if (!*o) continue; - v = strchr(o, '='); - if (v) - *v++ = 0; + token = match_token(o, tokens, tmp); + switch (token) { + case Opt_lockproto: + v = match_strdup(&tmp[0]); + if (!v) { + fs_info(sdp, "no memory for lockproto\n"); + error = -ENOMEM; + goto out_error; + } - if (!strcmp(o, "lockproto")) { - if (!v) - goto need_value; - if (remount && strcmp(v, args->ar_lockproto)) + if (remount && strcmp(v, args->ar_lockproto)) { + kfree(v); goto cant_remount; + } + strncpy(args->ar_lockproto, v, GFS2_LOCKNAME_LEN); args->ar_lockproto[GFS2_LOCKNAME_LEN - 1] = 0; - } + kfree(v); + break; + case Opt_locktable: + v = match_strdup(&tmp[0]); + if (!v) { + fs_info(sdp, "no memory for locktable\n"); + error = -ENOMEM; + goto out_error; + } - else if (!strcmp(o, "locktable")) { - if (!v) - goto need_value; - if (remount && strcmp(v, args->ar_locktable)) + if (remount && strcmp(v, args->ar_locktable)) { + kfree(v); goto cant_remount; + } + strncpy(args->ar_locktable, v, GFS2_LOCKNAME_LEN); - args->ar_locktable[GFS2_LOCKNAME_LEN - 1] = 0; - } + args->ar_locktable[GFS2_LOCKNAME_LEN - 1] = 0; + kfree(v); + break; + case Opt_hostdata: + v = match_strdup(&tmp[0]); + if (!v) { + fs_info(sdp, "no memory for hostdata\n"); + error = -ENOMEM; + goto out_error; + } - else if (!strcmp(o, "hostdata")) { - if (!v) - goto need_value; - if (remount && strcmp(v, args->ar_hostdata)) + if (remount && strcmp(v, args->ar_hostdata)) { + kfree(v); goto cant_remount; + } + strncpy(args->ar_hostdata, v, GFS2_LOCKNAME_LEN); args->ar_hostdata[GFS2_LOCKNAME_LEN - 1] = 0; - } - - else if (!strcmp(o, "spectator")) { + kfree(v); + break; + case Opt_spectator: if (remount && !args->ar_spectator) goto cant_remount; args->ar_spectator = 1; sdp->sd_vfs->s_flags |= MS_RDONLY; - } - - else if (!strcmp(o, "ignore_local_fs")) { + break; + case Opt_ignore_local_fs: if (remount && !args->ar_ignore_local_fs) goto cant_remount; args->ar_ignore_local_fs = 1; - } - - else if (!strcmp(o, "localflocks")) { + break; + case Opt_localflocks: if (remount && !args->ar_localflocks) goto cant_remount; args->ar_localflocks = 1; - } - - else if (!strcmp(o, "localcaching")) { + break; + case Opt_localcaching: if (remount && !args->ar_localcaching) goto cant_remount; args->ar_localcaching = 1; - } - - else if (!strcmp(o, "debug")) + break; + case Opt_debug: args->ar_debug = 1; - - else if (!strcmp(o, "nodebug")) + break; + case Opt_nodebug: args->ar_debug = 0; - - else if (!strcmp(o, "upgrade")) { + break; + case Opt_upgrade: if (remount && !args->ar_upgrade) goto cant_remount; args->ar_upgrade = 1; - } + break; + case Opt_num_glockd: + if ((error = match_int(&tmp[0], &option))) { + fs_info(sdp, "problem getting num_glockd\n"); + goto out_error; + } - else if (!strcmp(o, "num_glockd")) { - unsigned int x; - if (!v) - goto need_value; - sscanf(v, "%u", &x); - if (remount && x != args->ar_num_glockd) + if (remount && option != args->ar_num_glockd) goto cant_remount; - if (!x || x > GFS2_GLOCKD_MAX) { + if (!option || option > GFS2_GLOCKD_MAX) { fs_info(sdp, "0 < num_glockd <= %u (not %u)\n", - GFS2_GLOCKD_MAX, x); + GFS2_GLOCKD_MAX, option); error = -EINVAL; - break; + goto out_error; } - args->ar_num_glockd = x; - } - - else if (!strcmp(o, "acl")) { + args->ar_num_glockd = option; + break; + case Opt_acl: args->ar_posix_acl = 1; sdp->sd_vfs->s_flags |= MS_POSIXACL; - } - - else if (!strcmp(o, "noacl")) { + break; + case Opt_noacl: args->ar_posix_acl = 0; sdp->sd_vfs->s_flags &= ~MS_POSIXACL; - } - - else if (!strcmp(o, "quota")) { - if (!v) - goto need_value; - if (!strcmp(v, "off")) - args->ar_quota = GFS2_QUOTA_OFF; - else if (!strcmp(v, "account")) - args->ar_quota = GFS2_QUOTA_ACCOUNT; - else if (!strcmp(v, "on")) - args->ar_quota = GFS2_QUOTA_ON; - else { - fs_info(sdp, "invalid value for quota\n"); - error = -EINVAL; - break; - } - } - - else if (!strcmp(o, "suiddir")) + break; + case Opt_quota_off: + args->ar_quota = GFS2_QUOTA_OFF; + break; + case Opt_quota_account: + args->ar_quota = GFS2_QUOTA_ACCOUNT; + break; + case Opt_quota_on: + args->ar_quota = GFS2_QUOTA_ON; + break; + case Opt_suiddir: args->ar_suiddir = 1; - - else if (!strcmp(o, "nosuiddir")) + break; + case Opt_nosuiddir: args->ar_suiddir = 0; - - else if (!strcmp(o, "data")) { - if (!v) - goto need_value; - if (!strcmp(v, "writeback")) - args->ar_data = GFS2_DATA_WRITEBACK; - else if (!strcmp(v, "ordered")) - args->ar_data = GFS2_DATA_ORDERED; - else { - fs_info(sdp, "invalid value for data\n"); - error = -EINVAL; - break; - } - } - - else { + break; + case Opt_data_writeback: + args->ar_data = GFS2_DATA_WRITEBACK; + break; + case Opt_data_ordered: + args->ar_data = GFS2_DATA_ORDERED; + break; + default: fs_info(sdp, "unknown option: %s\n", o); error = -EINVAL; - break; + goto out_error; } } +out_error: if (error) fs_info(sdp, "invalid mount option(s)\n"); @@ -202,10 +253,6 @@ int gfs2_mount_args(struct gfs2_sbd *sdp, char *data_arg, int remount) return error; -need_value: - fs_info(sdp, "need value for option %s\n", o); - return -EINVAL; - cant_remount: fs_info(sdp, "can't remount with option %s\n", o); return -EINVAL; -- cgit v1.2.3 From bf126aee6d54fe1e509846abf3b27aba84c6d7ce Mon Sep 17 00:00:00 2001 From: Steven Whitehouse Date: Fri, 20 Apr 2007 09:18:30 +0100 Subject: [GFS2] Patch to fix mmap of stuffed files If a stuffed file is mmaped and a page fault is generated at some offset above the initial page, we need to create a zero page to hang the buffer heads off before we can unstuff the file. This is a fix for bz #236087 Signed-off-by: Steven Whitehouse --- fs/gfs2/ops_address.c | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/ops_address.c b/fs/gfs2/ops_address.c index 90c287932d5..30c15622174 100644 --- a/fs/gfs2/ops_address.c +++ b/fs/gfs2/ops_address.c @@ -197,7 +197,19 @@ static int stuffed_readpage(struct gfs2_inode *ip, struct page *page) void *kaddr; int error; - BUG_ON(page->index); + /* + * Due to the order of unstuffing files and ->nopage(), we can be + * asked for a zero page in the case of a stuffed file being extended, + * so we need to supply one here. It doesn't happen often. + */ + if (unlikely(page->index)) { + kaddr = kmap_atomic(page, KM_USER0); + memset(kaddr, 0, PAGE_CACHE_SIZE); + kunmap_atomic(kaddr, KM_USER0); + flush_dcache_page(page); + SetPageUptodate(page); + return 0; + } error = gfs2_meta_inode_buffer(ip, &dibh); if (error) @@ -208,9 +220,8 @@ static int stuffed_readpage(struct gfs2_inode *ip, struct page *page) ip->i_di.di_size); memset(kaddr + ip->i_di.di_size, 0, PAGE_CACHE_SIZE - ip->i_di.di_size); kunmap_atomic(kaddr, KM_USER0); - + flush_dcache_page(page); brelse(dibh); - SetPageUptodate(page); return 0; -- cgit v1.2.3 From f391a4ead61e4510ff385815ddaf3c0777fbad1b Mon Sep 17 00:00:00 2001 From: "akpm@linux-foundation.org" Date: Wed, 25 Apr 2007 21:08:02 -0700 Subject: [GFS2] printk warning fixes alpha: fs/gfs2/dir.c: In function 'gfs2_dir_read_leaf': fs/gfs2/dir.c:1322: warning: format '%llu' expects type 'long long unsigned int', but argument 3 has type 'sector_t' fs/gfs2/dir.c: In function 'gfs2_dir_read': fs/gfs2/dir.c:1455: warning: format '%llu' expects type 'long long unsigned int', but argument 3 has type '__u64' Cc: Steven Whitehouse Signed-off-by: Andrew Morton Signed-off-by: Steven Whitehouse --- fs/gfs2/dir.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/dir.c b/fs/gfs2/dir.c index 6c3ed7674a9..a96fa07b3f3 100644 --- a/fs/gfs2/dir.c +++ b/fs/gfs2/dir.c @@ -1319,9 +1319,11 @@ static int gfs2_dir_read_leaf(struct inode *inode, u64 *offset, void *opaque, if (IS_ERR(dent)) goto out_kfree; if (entries2 != g.offset) { - fs_warn(sdp, "Number of entries corrupt in dir leaf %llu, " - "entries2 (%u) != g.offset (%u)\n", - (u64)bh->b_blocknr, entries2, g.offset); + fs_warn(sdp, "Number of entries corrupt in dir " + "leaf %llu, entries2 (%u) != " + "g.offset (%u)\n", + (unsigned long long)bh->b_blocknr, + entries2, g.offset); error = -EIO; goto out_kfree; @@ -1454,7 +1456,8 @@ int gfs2_dir_read(struct inode *inode, u64 *offset, void *opaque, if (dip->i_di.di_entries != g.offset) { fs_warn(sdp, "Number of entries corrupt in dir %llu, " "ip->i_di.di_entries (%u) != g.offset (%u)\n", - dip->i_num.no_addr, dip->i_di.di_entries, + (unsigned long long)dip->i_num.no_addr, + dip->i_di.di_entries, g.offset); error = -EIO; goto out; -- cgit v1.2.3 From 617e82e10ccf96a13eb2efd5eac4abef44a87d02 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Thu, 26 Apr 2007 13:46:49 -0500 Subject: [DLM] lowcomms style Replace some printk with log_print, and fix some simple cases of lines over 80. Also, return -ENOTCONN if lowcomms_start fails due to no local IP address being available. Signed-off-by: David Teigland Signed-off-by: Steven Whitehouse --- fs/dlm/lowcomms.c | 67 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 30 deletions(-) (limited to 'fs') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 5c33233bc2d..27970a58d29 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -395,7 +395,8 @@ static void sctp_init_failed(void) } /* Something happened to an association */ -static void process_sctp_notification(struct connection *con, struct msghdr *msg, char *buf) +static void process_sctp_notification(struct connection *con, + struct msghdr *msg, char *buf) { union sctp_notification *sn = (union sctp_notification *)buf; @@ -422,7 +423,7 @@ static void process_sctp_notification(struct connection *con, struct msghdr *msg */ if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { log_print("COMM_UP for invalid assoc ID %d", - (int)sn->sn_assoc_change.sac_assoc_id); + (int)sn->sn_assoc_change.sac_assoc_id); sctp_init_failed(); return; } @@ -465,10 +466,12 @@ static void process_sctp_notification(struct connection *con, struct msghdr *msg /* Peel off a new sock */ parg.associd = sn->sn_assoc_change.sac_assoc_id; - ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, SCTP_SOCKOPT_PEELOFF, + ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, + SCTP_SOCKOPT_PEELOFF, (void *)&parg, &parglen); if (ret) { - log_print("Can't peel off a socket for connection %d to node %d: err=%d\n", + log_print("Can't peel off a socket for " + "connection %d to node %d: err=%d\n", parg.associd, nodeid, ret); } file = fget(parg.sd); @@ -478,7 +481,7 @@ static void process_sctp_notification(struct connection *con, struct msghdr *msg put_unused_fd(parg.sd); log_print("got new/restarted association %d nodeid %d", - (int)sn->sn_assoc_change.sac_assoc_id, nodeid); + (int)sn->sn_assoc_change.sac_assoc_id, nodeid); /* Send any pending writes */ clear_bit(CF_CONNECT_PENDING, &new_con->flags); @@ -587,7 +590,7 @@ static int receive_from_sock(struct connection *con) msg.msg_controllen = sizeof(incmsg); process_sctp_notification(con, &msg, - page_address(con->rx_page) + con->cb.base); + page_address(con->rx_page) + con->cb.base); mutex_unlock(&con->sock_mutex); return 0; } @@ -601,10 +604,10 @@ static int receive_from_sock(struct connection *con) con->cb.base, con->cb.len, PAGE_CACHE_SIZE); if (ret == -EBADMSG) { - printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, " - "iov_len=%u, iov_base[0]=%p, read=%d\n", - page_address(con->rx_page), con->cb.base, con->cb.len, - len, iov[0].iov_base, r); + log_print("lowcomms: addr=%p, base=%u, len=%u, " + "iov_len=%u, iov_base[0]=%p, read=%d", + page_address(con->rx_page), con->cb.base, con->cb.len, + len, iov[0].iov_base, r); } if (ret < 0) goto out_close; @@ -680,7 +683,7 @@ static int tcp_accept_from_sock(struct connection *con) /* Get the new node's NODEID */ make_sockaddr(&peeraddr, 0, &len); if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { - printk("dlm: connect from non cluster node\n"); + log_print("connect from non cluster node"); sock_release(newsock); mutex_unlock(&con->sock_mutex); return -1; @@ -705,7 +708,7 @@ static int tcp_accept_from_sock(struct connection *con) if (!othercon) { othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); if (!othercon) { - printk("dlm: failed to allocate incoming socket\n"); + log_print("failed to allocate incoming socket"); mutex_unlock(&newcon->sock_mutex); result = -ENOMEM; goto accept_err; @@ -748,7 +751,7 @@ accept_err: sock_release(newsock); if (result != -EAGAIN) - printk("dlm: error accepting connection from node: %d\n", result); + log_print("error accepting connection from node: %d", result); return result; } @@ -826,7 +829,8 @@ static void sctp_init_assoc(struct connection *con) ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); if (ret < 0) { - log_print("Send first packet to node %d failed: %d", con->nodeid, ret); + log_print("Send first packet to node %d failed: %d", + con->nodeid, ret); /* Try again later */ clear_bit(CF_CONNECT_PENDING, &con->flags); @@ -929,9 +933,10 @@ static struct socket *tcp_create_listen_sock(struct connection *con, addr_len = sizeof(struct sockaddr_in6); /* Create a socket to communicate with */ - result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, + IPPROTO_TCP, &sock); if (result < 0) { - printk("dlm: Can't create listening comms socket\n"); + log_print("Can't create listening comms socket"); goto create_out; } @@ -939,8 +944,7 @@ static struct socket *tcp_create_listen_sock(struct connection *con, (char *)&one, sizeof(one)); if (result < 0) { - printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n", - result); + log_print("Failed to set SO_REUSEADDR on socket: %d", result); } sock->sk->sk_user_data = con; con->rx_action = tcp_accept_from_sock; @@ -951,7 +955,7 @@ static struct socket *tcp_create_listen_sock(struct connection *con, make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); if (result < 0) { - printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port); + log_print("Can't bind to port %d", dlm_config.ci_tcp_port); sock_release(sock); sock = NULL; con->sock = NULL; @@ -960,12 +964,12 @@ static struct socket *tcp_create_listen_sock(struct connection *con, result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); if (result < 0) { - printk("dlm: Set keepalive failed: %d\n", result); + log_print("Set keepalive failed: %d", result); } result = sock->ops->listen(sock, 5); if (result < 0) { - printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port); + log_print("Can't listen on port %d", dlm_config.ci_tcp_port); sock_release(sock); sock = NULL; goto create_out; @@ -994,8 +998,11 @@ static void init_local(void) } } -/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */ -static int add_sctp_bind_addr(struct connection *sctp_con, struct sockaddr_storage *addr, int addr_len, int num) +/* Bind to an IP address. SCTP allows multiple address so it can do + multi-homing */ +static int add_sctp_bind_addr(struct connection *sctp_con, + struct sockaddr_storage *addr, + int addr_len, int num) { int result = 0; @@ -1048,10 +1055,10 @@ static int sctp_listen_for_all(void) result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&bufsize, sizeof(bufsize)); if (result) - log_print("Error increasing buffer space on socket: %d", result); + log_print("Error increasing buffer space on socket %d", result); result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, - (char *)&subscribe, sizeof(subscribe)); + (char *)&subscribe, sizeof(subscribe)); if (result < 0) { log_print("Failed to set SCTP_EVENTS on socket: result=%d", result); @@ -1102,7 +1109,8 @@ static int tcp_listen_for_all(void) /* We don't support multi-homed hosts */ if (dlm_local_addr[1] != NULL) { - log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); + log_print("TCP protocol can't handle multi-homed hosts, " + "try SCTP"); return -EINVAL; } @@ -1148,8 +1156,7 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con, return entry; } -void *dlm_lowcomms_get_buffer(int nodeid, int len, - gfp_t allocation, char **ppc) +void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) { struct connection *con; struct writequeue_entry *e; @@ -1253,8 +1260,7 @@ static void send_to_sock(struct connection *con) goto out; if (ret <= 0) goto send_error; - } - else { + } else { /* Don't starve people filling buffers */ cond_resched(); } @@ -1426,6 +1432,7 @@ int dlm_lowcomms_start(void) init_local(); if (!dlm_local_count) { + error = -ENOTCONN; log_print("no local IP address has been set"); goto out; } -- cgit v1.2.3 From 37fde8ca6c60ea61f5e9d7cb877c25ac60e74167 Mon Sep 17 00:00:00 2001 From: Steven Whitehouse Date: Tue, 1 May 2007 09:51:39 +0100 Subject: [GFS2] Uncomment sprintf_symbol calling code Now that the patch from -mm has gone upstream, we can uncomment the code in GFS2 which uses sprintf_symbol. Signed-off-by: Steven Whitehouse Cc: Robert Peterson --- fs/gfs2/glock.c | 12 ------------ 1 file changed, 12 deletions(-) (limited to 'fs') diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index 7988715b7a0..1815429a297 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -1763,22 +1763,10 @@ void gfs2_gl_hash_clear(struct gfs2_sbd *sdp, int wait) static void gfs2_print_symbol(struct glock_iter *gi, const char *fmt, unsigned long address) { -/* when sprint_symbol becomes available in the new kernel, replace this */ -/* function with: char buffer[KSYM_SYMBOL_LEN]; sprint_symbol(buffer, address); print_dbg(gi, fmt, buffer); -*/ - char buffer[256]; - - if (gi) { - memset(buffer, 0, sizeof(buffer)); - sprintf(buffer, "0x%08lx", address); - print_dbg(gi, fmt, buffer); - } - else - print_symbol(fmt, address); } /** -- cgit v1.2.3