diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2010-05-12 18:47:29 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2010-05-12 18:47:29 -0700 |
commit | cdf5f61ed1d64d50eb9cf10071ab40836f5f9f91 (patch) | |
tree | d67166525b89055b44ecf52cd9ae395a676877f5 | |
parent | 769d9968e42c995eaaf61ac5583d998f32e0769a (diff) | |
parent | e84346b726ea90a8ed470bc81c4136a7b8710ea5 (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
ceph: preserve seq # on requeued messages after transient transport errors
ceph: fix cap removal races
ceph: zero unused message header, footer fields
ceph: fix locking for waking session requests after reconnect
ceph: resubmit requests on pg mapping change (not just primary change)
ceph: fix open file counting on snapped inodes when mds returns no caps
ceph: unregister osd request on failure
ceph: don't use writeback_control in writepages completion
ceph: unregister bdi before kill_anon_super releases device name
-rw-r--r-- | fs/ceph/addr.c | 6 | ||||
-rw-r--r-- | fs/ceph/caps.c | 19 | ||||
-rw-r--r-- | fs/ceph/inode.c | 4 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 34 | ||||
-rw-r--r-- | fs/ceph/messenger.c | 17 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 1 | ||||
-rw-r--r-- | fs/ceph/osd_client.c | 26 | ||||
-rw-r--r-- | fs/ceph/osd_client.h | 3 | ||||
-rw-r--r-- | fs/ceph/osdmap.c | 29 | ||||
-rw-r--r-- | fs/ceph/osdmap.h | 2 | ||||
-rw-r--r-- | fs/ceph/rados.h | 1 | ||||
-rw-r--r-- | fs/ceph/super.c | 23 |
12 files changed, 116 insertions, 49 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 4b42c2bb603..a9005d862ed 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -504,7 +504,6 @@ static void writepages_finish(struct ceph_osd_request *req, int i; struct ceph_snap_context *snapc = req->r_snapc; struct address_space *mapping = inode->i_mapping; - struct writeback_control *wbc = req->r_wbc; __s32 rc = -EIO; u64 bytes = 0; struct ceph_client *client = ceph_inode_to_client(inode); @@ -546,10 +545,6 @@ static void writepages_finish(struct ceph_osd_request *req, clear_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC); - if (i >= wrote) { - dout("inode %p skipping page %p\n", inode, page); - wbc->pages_skipped++; - } ceph_put_snap_context((void *)page->private); page->private = 0; ClearPagePrivate(page); @@ -799,7 +794,6 @@ get_more_pages: alloc_page_vec(client, req); req->r_callback = writepages_finish; req->r_inode = inode; - req->r_wbc = wbc; } /* note position of first page in pvec */ diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 0c168180686..d9400534b27 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -858,6 +858,8 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci) } /* + * Remove a cap. Take steps to deal with a racing iterate_session_caps. + * * caller should hold i_lock. * caller will not hold session s_mutex if called from destroy_inode. */ @@ -866,15 +868,10 @@ void __ceph_remove_cap(struct ceph_cap *cap) struct ceph_mds_session *session = cap->session; struct ceph_inode_info *ci = cap->ci; struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc; + int removed = 0; dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode); - /* remove from inode list */ - rb_erase(&cap->ci_node, &ci->i_caps); - cap->ci = NULL; - if (ci->i_auth_cap == cap) - ci->i_auth_cap = NULL; - /* remove from session list */ spin_lock(&session->s_cap_lock); if (session->s_cap_iterator == cap) { @@ -885,10 +882,18 @@ void __ceph_remove_cap(struct ceph_cap *cap) list_del_init(&cap->session_caps); session->s_nr_caps--; cap->session = NULL; + removed = 1; } + /* protect backpointer with s_cap_lock: see iterate_session_caps */ + cap->ci = NULL; spin_unlock(&session->s_cap_lock); - if (cap->session == NULL) + /* remove from inode list */ + rb_erase(&cap->ci_node, &ci->i_caps); + if (ci->i_auth_cap == cap) + ci->i_auth_cap = NULL; + + if (removed) ceph_put_cap(cap); if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) { diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 261f3e6c0bc..85b4d2ffdeb 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -733,6 +733,10 @@ no_change: __ceph_get_fmode(ci, cap_fmode); spin_unlock(&inode->i_lock); } + } else if (cap_fmode >= 0) { + pr_warning("mds issued no caps on %llx.%llx\n", + ceph_vinop(inode)); + __ceph_get_fmode(ci, cap_fmode); } /* update delegation info? */ diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 60a9a4ae47b..24561a557e0 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -736,9 +736,10 @@ static void cleanup_cap_releases(struct ceph_mds_session *session) } /* - * Helper to safely iterate over all caps associated with a session. + * Helper to safely iterate over all caps associated with a session, with + * special care taken to handle a racing __ceph_remove_cap(). * - * caller must hold session s_mutex + * Caller must hold session s_mutex. */ static int iterate_session_caps(struct ceph_mds_session *session, int (*cb)(struct inode *, struct ceph_cap *, @@ -2136,7 +2137,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) struct ceph_mds_session *session = NULL; struct ceph_msg *reply; struct rb_node *p; - int err; + int err = -ENOMEM; struct ceph_pagelist *pagelist; pr_info("reconnect to recovering mds%d\n", mds); @@ -2185,7 +2186,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) goto fail; err = iterate_session_caps(session, encode_caps_cb, pagelist); if (err < 0) - goto out; + goto fail; /* * snaprealms. we provide mds with the ino, seq (version), and @@ -2213,28 +2214,31 @@ send: reply->nr_pages = calc_pages_for(0, pagelist->length); ceph_con_send(&session->s_con, reply); - if (session) { - session->s_state = CEPH_MDS_SESSION_OPEN; - __wake_requests(mdsc, &session->s_waiting); - } + session->s_state = CEPH_MDS_SESSION_OPEN; + mutex_unlock(&session->s_mutex); + + mutex_lock(&mdsc->mutex); + __wake_requests(mdsc, &session->s_waiting); + mutex_unlock(&mdsc->mutex); + + ceph_put_mds_session(session); -out: up_read(&mdsc->snap_rwsem); - if (session) { - mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); - } mutex_lock(&mdsc->mutex); return; fail: ceph_msg_put(reply); + up_read(&mdsc->snap_rwsem); + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); fail_nomsg: ceph_pagelist_release(pagelist); kfree(pagelist); fail_nopagelist: - pr_err("ENOMEM preparing reconnect for mds%d\n", mds); - goto out; + pr_err("error %d preparing reconnect for mds%d\n", err, mds); + mutex_lock(&mdsc->mutex); + return; } diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 509f57d9ccb..cd4fadb6491 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -492,7 +492,14 @@ static void prepare_write_message(struct ceph_connection *con) list_move_tail(&m->list_head, &con->out_sent); } - m->hdr.seq = cpu_to_le64(++con->out_seq); + /* + * only assign outgoing seq # if we haven't sent this message + * yet. if it is requeued, resend with it's original seq. + */ + if (m->needs_out_seq) { + m->hdr.seq = cpu_to_le64(++con->out_seq); + m->needs_out_seq = false; + } dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", m, con->out_seq, le16_to_cpu(m->hdr.type), @@ -1986,6 +1993,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); + msg->needs_out_seq = true; + /* queue */ mutex_lock(&con->mutex); BUG_ON(!list_empty(&msg->list_head)); @@ -2085,15 +2094,19 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, kref_init(&m->kref); INIT_LIST_HEAD(&m->list_head); + m->hdr.tid = 0; m->hdr.type = cpu_to_le16(type); + m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); + m->hdr.version = 0; m->hdr.front_len = cpu_to_le32(front_len); m->hdr.middle_len = 0; m->hdr.data_len = cpu_to_le32(page_len); m->hdr.data_off = cpu_to_le16(page_off); - m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); + m->hdr.reserved = 0; m->footer.front_crc = 0; m->footer.middle_crc = 0; m->footer.data_crc = 0; + m->footer.flags = 0; m->front_max = front_len; m->front_is_vmalloc = false; m->more_to_follow = false; diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index a343dae73cd..a5caf91cc97 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -86,6 +86,7 @@ struct ceph_msg { struct kref kref; bool front_is_vmalloc; bool more_to_follow; + bool needs_out_seq; int front_max; struct ceph_msgpool *pool; diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index c7b4dedaace..3514f71ff85 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -565,7 +565,8 @@ static int __map_osds(struct ceph_osd_client *osdc, { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; - int o = -1; + int acting[CEPH_PG_MAX_SIZE]; + int o = -1, num = 0; int err; dout("map_osds %p tid %lld\n", req, req->r_tid); @@ -576,10 +577,16 @@ static int __map_osds(struct ceph_osd_client *osdc, pgid = reqhead->layout.ol_pgid; req->r_pgid = pgid; - o = ceph_calc_pg_primary(osdc->osdmap, pgid); + err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); + if (err > 0) { + o = acting[0]; + num = err; + } if ((req->r_osd && req->r_osd->o_osd == o && - req->r_sent >= req->r_osd->o_incarnation) || + req->r_sent >= req->r_osd->o_incarnation && + req->r_num_pg_osds == num && + memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || (req->r_osd == NULL && o == -1)) return 0; /* no change */ @@ -587,6 +594,10 @@ static int __map_osds(struct ceph_osd_client *osdc, req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, req->r_osd ? req->r_osd->o_osd : -1); + /* record full pg acting set */ + memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); + req->r_num_pg_osds = num; + if (req->r_osd) { __cancel_request(req); list_del_init(&req->r_osd_item); @@ -612,7 +623,7 @@ static int __map_osds(struct ceph_osd_client *osdc, __remove_osd_from_lru(req->r_osd); list_add(&req->r_osd_item, &req->r_osd->o_requests); } - err = 1; /* osd changed */ + err = 1; /* osd or pg changed */ out: return err; @@ -779,16 +790,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, struct ceph_osd_request *req; u64 tid; int numops, object_len, flags; + s32 result; tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*rhead)) goto bad; numops = le32_to_cpu(rhead->num_ops); object_len = le32_to_cpu(rhead->object_len); + result = le32_to_cpu(rhead->result); if (msg->front.iov_len != sizeof(*rhead) + object_len + numops * sizeof(struct ceph_osd_op)) goto bad; - dout("handle_reply %p tid %llu\n", msg, tid); + dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); /* lookup */ mutex_lock(&osdc->request_mutex); @@ -834,7 +847,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, dout("handle_reply tid %llu flags %d\n", tid, flags); /* either this is a read, or we got the safe response */ - if ((flags & CEPH_OSD_FLAG_ONDISK) || + if (result < 0 || + (flags & CEPH_OSD_FLAG_ONDISK) || ((flags & CEPH_OSD_FLAG_WRITE) == 0)) __unregister_request(osdc, req); diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h index b0759911e7c..ce776989ef6 100644 --- a/fs/ceph/osd_client.h +++ b/fs/ceph/osd_client.h @@ -48,6 +48,8 @@ struct ceph_osd_request { struct list_head r_osd_item; struct ceph_osd *r_osd; struct ceph_pg r_pgid; + int r_pg_osds[CEPH_PG_MAX_SIZE]; + int r_num_pg_osds; struct ceph_connection *r_con_filling_msg; @@ -66,7 +68,6 @@ struct ceph_osd_request { struct list_head r_unsafe_item; struct inode *r_inode; /* for use by callbacks */ - struct writeback_control *r_wbc; /* ditto */ char r_oid[40]; /* object name */ int r_oid_len; diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c index 2e2c15eed82..cfdd8f4388b 100644 --- a/fs/ceph/osdmap.c +++ b/fs/ceph/osdmap.c @@ -1041,12 +1041,33 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } /* + * Return acting set for given pgid. + */ +int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, + int *acting) +{ + int rawosds[CEPH_PG_MAX_SIZE], *osds; + int i, o, num = CEPH_PG_MAX_SIZE; + + osds = calc_pg_raw(osdmap, pgid, rawosds, &num); + if (!osds) + return -1; + + /* primary is first up osd */ + o = 0; + for (i = 0; i < num; i++) + if (ceph_osd_is_up(osdmap, osds[i])) + acting[o++] = osds[i]; + return o; +} + +/* * Return primary osd for given pgid, or -1 if none. */ int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) { - int rawosds[10], *osds; - int i, num = ARRAY_SIZE(rawosds); + int rawosds[CEPH_PG_MAX_SIZE], *osds; + int i, num = CEPH_PG_MAX_SIZE; osds = calc_pg_raw(osdmap, pgid, rawosds, &num); if (!osds) @@ -1054,9 +1075,7 @@ int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) /* primary is first up osd */ for (i = 0; i < num; i++) - if (ceph_osd_is_up(osdmap, osds[i])) { + if (ceph_osd_is_up(osdmap, osds[i])) return osds[i]; - break; - } return -1; } diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h index 8bc9f1e4f56..970b547e510 100644 --- a/fs/ceph/osdmap.h +++ b/fs/ceph/osdmap.h @@ -120,6 +120,8 @@ extern int ceph_calc_object_layout(struct ceph_object_layout *ol, const char *oid, struct ceph_file_layout *fl, struct ceph_osdmap *osdmap); +extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, + int *acting); extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid); diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h index a1fc1d017b5..fd56451a871 100644 --- a/fs/ceph/rados.h +++ b/fs/ceph/rados.h @@ -58,6 +58,7 @@ struct ceph_timespec { #define CEPH_PG_LAYOUT_LINEAR 2 #define CEPH_PG_LAYOUT_HYBRID 3 +#define CEPH_PG_MAX_SIZE 16 /* max # osds in a single pg */ /* * placement group. diff --git a/fs/ceph/super.c b/fs/ceph/super.c index f888cf487b7..110857ba926 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -47,10 +47,20 @@ const char *ceph_file_part(const char *s, int len) */ static void ceph_put_super(struct super_block *s) { - struct ceph_client *cl = ceph_client(s); + struct ceph_client *client = ceph_sb_to_client(s); dout("put_super\n"); - ceph_mdsc_close_sessions(&cl->mdsc); + ceph_mdsc_close_sessions(&client->mdsc); + + /* + * ensure we release the bdi before put_anon_super releases + * the device name. + */ + if (s->s_bdi == &client->backing_dev_info) { + bdi_unregister(&client->backing_dev_info); + s->s_bdi = NULL; + } + return; } @@ -636,6 +646,8 @@ static void ceph_destroy_client(struct ceph_client *client) destroy_workqueue(client->pg_inv_wq); destroy_workqueue(client->trunc_wq); + bdi_destroy(&client->backing_dev_info); + if (client->msgr) ceph_messenger_destroy(client->msgr); mempool_destroy(client->wb_pagevec_pool); @@ -876,14 +888,14 @@ static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client) { int err; - sb->s_bdi = &client->backing_dev_info; - /* set ra_pages based on rsize mount option? */ if (client->mount_args->rsize >= PAGE_CACHE_SIZE) client->backing_dev_info.ra_pages = (client->mount_args->rsize + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT; err = bdi_register_dev(&client->backing_dev_info, sb->s_dev); + if (!err) + sb->s_bdi = &client->backing_dev_info; return err; } @@ -957,9 +969,6 @@ static void ceph_kill_sb(struct super_block *s) dout("kill_sb %p\n", s); ceph_mdsc_pre_umount(&client->mdsc); kill_anon_super(s); /* will call put_super after sb is r/o */ - if (s->s_bdi == &client->backing_dev_info) - bdi_unregister(&client->backing_dev_info); - bdi_destroy(&client->backing_dev_info); ceph_destroy_client(client); } |