diff options
Diffstat (limited to 'drivers/staging/dst')
-rw-r--r-- | drivers/staging/dst/Kconfig | 67 | ||||
-rw-r--r-- | drivers/staging/dst/Makefile | 3 | ||||
-rw-r--r-- | drivers/staging/dst/crypto.c | 731 | ||||
-rw-r--r-- | drivers/staging/dst/dcore.c | 995 | ||||
-rw-r--r-- | drivers/staging/dst/export.c | 657 | ||||
-rw-r--r-- | drivers/staging/dst/state.c | 839 | ||||
-rw-r--r-- | drivers/staging/dst/thread_pool.c | 345 | ||||
-rw-r--r-- | drivers/staging/dst/trans.c | 335 |
8 files changed, 3972 insertions, 0 deletions
diff --git a/drivers/staging/dst/Kconfig b/drivers/staging/dst/Kconfig new file mode 100644 index 00000000000..448d342ac2a --- /dev/null +++ b/drivers/staging/dst/Kconfig @@ -0,0 +1,67 @@ +config DST + tristate "Distributed storage" + depends on NET && CRYPTO && SYSFS && BLK_DEV + select CONNECTOR + ---help--- + DST is a network block device storage, which can be used to organize + exported storage on the remote nodes into the local block device. + + DST works on top of any network media and protocol; it is just a matter + of configuration utility to understand the correct addresses. The most + common example is TCP over IP, which allows to pass through firewalls and + create remote backup storage in a different datacenter. DST requires + single port to be enabled on the exporting node and outgoing connections + on the local node. + + DST works with in-kernel client and server, which improves performance by + eliminating unneded data copies and by not depending on the version + of the external IO components. It requires userspace configuration utility + though. + + DST uses transaction model, when each store has to be explicitly acked + from the remote node to be considered as successfully written. There + may be lots of in-flight transactions. When remote host does not ack + the transaction it will be resent predefined number of times with specified + timeouts between them. All those parameters are configurable. Transactions + are marked as failed after all resends complete unsuccessfully; having + long enough resend timeout and/or large number of resends allows not to + return error to the higher (FS usually) layer in case of short network + problems or remote node outages. In case of network RAID setup this means + that storage will not degrade until transactions are marked as failed, and + thus will not force checksum recalculation and data rebuild. In case of + connection failure DST will try to reconnect to the remote node automatically. + DST sends ping commands at idle time to detect if remote node is alive. + + Because of transactional model it is possible to use zero-copy sending + without worry of data corruption (which in turn could be detected by the + strong checksums though). + + DST may fully encrypt the data channel in case of untrusted channel and implement + strong checksum of the transferred data. It is possible to configure algorithms + and crypto keys; they should match on both sides of the network channel. + Crypto processing does not introduce noticeble performance overhead, since DST + uses configurable pool of threads to perform crypto processing. + + DST utilizes memory pool model of all its transaction allocations (it is the + only additional allocation on the client) and server allocations (bio pools, + while pages are allocated from the slab). + + At startup DST performs a simple negotiation with the export node to determine + access permissions and size of the exported storage. It can be extended if + new parameters should be autonegotiated. + + DST carries block IO flags in the protocol, which allows to transparently implement + barriers and sync/flush operations. Those flags are used in the export node where + IO against the local storage is performed, which means that sync write will be sync + on the remote node too, which in turn improves data integrity and improved resistance + to errors and data corruption during power outages or storage damages. + + Homepage: http://www.ioremap.net/projects/dst + Userspace configuration utility and the latest releases: http://www.ioremap.net/archive/dst/ + +config DST_DEBUG + bool "DST debug" + depends on DST + ---help--- + This option will enable HEAVY debugging of the DST. + Turn it on ONLY if you have to debug some really obscure problem. diff --git a/drivers/staging/dst/Makefile b/drivers/staging/dst/Makefile new file mode 100644 index 00000000000..3a8b0cf9643 --- /dev/null +++ b/drivers/staging/dst/Makefile @@ -0,0 +1,3 @@ +obj-$(CONFIG_DST) += nst.o + +nst-y := dcore.o state.o export.o thread_pool.o crypto.o trans.o diff --git a/drivers/staging/dst/crypto.c b/drivers/staging/dst/crypto.c new file mode 100644 index 00000000000..7250f90f592 --- /dev/null +++ b/drivers/staging/dst/crypto.c @@ -0,0 +1,731 @@ +/* + * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include <linux/bio.h> +#include <linux/crypto.h> +#include <linux/dst.h> +#include <linux/kernel.h> +#include <linux/scatterlist.h> +#include <linux/slab.h> + +/* + * Tricky bastard, but IV can be more complex with time... + */ +static inline u64 dst_gen_iv(struct dst_trans *t) +{ + return t->gen; +} + +/* + * Crypto machinery: hash/cipher support for the given crypto controls. + */ +static struct crypto_hash *dst_init_hash(struct dst_crypto_ctl *ctl, u8 *key) +{ + int err; + struct crypto_hash *hash; + + hash = crypto_alloc_hash(ctl->hash_algo, 0, CRYPTO_ALG_ASYNC); + if (IS_ERR(hash)) { + err = PTR_ERR(hash); + dprintk("%s: failed to allocate hash '%s', err: %d.\n", + __func__, ctl->hash_algo, err); + goto err_out_exit; + } + + ctl->crypto_attached_size = crypto_hash_digestsize(hash); + + if (!ctl->hash_keysize) + return hash; + + err = crypto_hash_setkey(hash, key, ctl->hash_keysize); + if (err) { + dprintk("%s: failed to set key for hash '%s', err: %d.\n", + __func__, ctl->hash_algo, err); + goto err_out_free; + } + + return hash; + +err_out_free: + crypto_free_hash(hash); +err_out_exit: + return ERR_PTR(err); +} + +static struct crypto_ablkcipher *dst_init_cipher(struct dst_crypto_ctl *ctl, u8 *key) +{ + int err = -EINVAL; + struct crypto_ablkcipher *cipher; + + if (!ctl->cipher_keysize) + goto err_out_exit; + + cipher = crypto_alloc_ablkcipher(ctl->cipher_algo, 0, 0); + if (IS_ERR(cipher)) { + err = PTR_ERR(cipher); + dprintk("%s: failed to allocate cipher '%s', err: %d.\n", + __func__, ctl->cipher_algo, err); + goto err_out_exit; + } + + crypto_ablkcipher_clear_flags(cipher, ~0); + + err = crypto_ablkcipher_setkey(cipher, key, ctl->cipher_keysize); + if (err) { + dprintk("%s: failed to set key for cipher '%s', err: %d.\n", + __func__, ctl->cipher_algo, err); + goto err_out_free; + } + + return cipher; + +err_out_free: + crypto_free_ablkcipher(cipher); +err_out_exit: + return ERR_PTR(err); +} + +/* + * Crypto engine has a pool of pages to encrypt data into before sending + * it over the network. This pool is freed/allocated here. + */ +static void dst_crypto_pages_free(struct dst_crypto_engine *e) +{ + unsigned int i; + + for (i=0; i<e->page_num; ++i) + __free_page(e->pages[i]); + kfree(e->pages); +} + +static int dst_crypto_pages_alloc(struct dst_crypto_engine *e, int num) +{ + int i; + + e->pages = kmalloc(num * sizeof(struct page **), GFP_KERNEL); + if (!e->pages) + return -ENOMEM; + + for (i=0; i<num; ++i) { + e->pages[i] = alloc_page(GFP_KERNEL); + if (!e->pages[i]) + goto err_out_free_pages; + } + + e->page_num = num; + return 0; + +err_out_free_pages: + while (--i >= 0) + __free_page(e->pages[i]); + + kfree(e->pages); + return -ENOMEM; +} + +/* + * Initialize crypto engine for given node. + * Setup cipher/hash, keys, pool of threads and private data. + */ +static int dst_crypto_engine_init(struct dst_crypto_engine *e, struct dst_node *n) +{ + int err; + struct dst_crypto_ctl *ctl = &n->crypto; + + err = dst_crypto_pages_alloc(e, n->max_pages); + if (err) + goto err_out_exit; + + e->size = PAGE_SIZE; + e->data = kmalloc(e->size, GFP_KERNEL); + if (!e->data) { + err = -ENOMEM; + goto err_out_free_pages; + } + + if (ctl->hash_algo[0]) { + e->hash = dst_init_hash(ctl, n->hash_key); + if (IS_ERR(e->hash)) { + err = PTR_ERR(e->hash); + e->hash = NULL; + goto err_out_free; + } + } + + if (ctl->cipher_algo[0]) { + e->cipher = dst_init_cipher(ctl, n->cipher_key); + if (IS_ERR(e->cipher)) { + err = PTR_ERR(e->cipher); + e->cipher = NULL; + goto err_out_free_hash; + } + } + + return 0; + +err_out_free_hash: + crypto_free_hash(e->hash); +err_out_free: + kfree(e->data); +err_out_free_pages: + dst_crypto_pages_free(e); +err_out_exit: + return err; +} + +static void dst_crypto_engine_exit(struct dst_crypto_engine *e) +{ + if (e->hash) + crypto_free_hash(e->hash); + if (e->cipher) + crypto_free_ablkcipher(e->cipher); + dst_crypto_pages_free(e); + kfree(e->data); +} + +/* + * Waiting for cipher processing to be completed. + */ +struct dst_crypto_completion +{ + struct completion complete; + int error; +}; + +static void dst_crypto_complete(struct crypto_async_request *req, int err) +{ + struct dst_crypto_completion *c = req->data; + + if (err == -EINPROGRESS) + return; + + dprintk("%s: req: %p, err: %d.\n", __func__, req, err); + c->error = err; + complete(&c->complete); +} + +static int dst_crypto_process(struct ablkcipher_request *req, + struct scatterlist *sg_dst, struct scatterlist *sg_src, + void *iv, int enc, unsigned long timeout) +{ + struct dst_crypto_completion c; + int err; + + init_completion(&c.complete); + c.error = -EINPROGRESS; + + ablkcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_BACKLOG, + dst_crypto_complete, &c); + + ablkcipher_request_set_crypt(req, sg_src, sg_dst, sg_src->length, iv); + + if (enc) + err = crypto_ablkcipher_encrypt(req); + else + err = crypto_ablkcipher_decrypt(req); + + switch (err) { + case -EINPROGRESS: + case -EBUSY: + err = wait_for_completion_interruptible_timeout(&c.complete, + timeout); + if (!err) + err = -ETIMEDOUT; + else + err = c.error; + break; + default: + break; + } + + return err; +} + +/* + * DST uses generic iteration approach for data crypto processing. + * Single block IO request is switched into array of scatterlists, + * which are submitted to the crypto processing iterator. + * + * Input and output iterator initialization are different, since + * in output case we can not encrypt data in-place and need a + * temporary storage, which is then being sent to the remote peer. + */ +static int dst_trans_iter_out(struct bio *bio, struct dst_crypto_engine *e, + int (* iterator) (struct dst_crypto_engine *e, + struct scatterlist *dst, + struct scatterlist *src)) +{ + struct bio_vec *bv; + int err, i; + + sg_init_table(e->src, bio->bi_vcnt); + sg_init_table(e->dst, bio->bi_vcnt); + + bio_for_each_segment(bv, bio, i) { + sg_set_page(&e->src[i], bv->bv_page, bv->bv_len, bv->bv_offset); + sg_set_page(&e->dst[i], e->pages[i], bv->bv_len, bv->bv_offset); + + err = iterator(e, &e->dst[i], &e->src[i]); + if (err) + return err; + } + + return 0; +} + +static int dst_trans_iter_in(struct bio *bio, struct dst_crypto_engine *e, + int (* iterator) (struct dst_crypto_engine *e, + struct scatterlist *dst, + struct scatterlist *src)) +{ + struct bio_vec *bv; + int err, i; + + sg_init_table(e->src, bio->bi_vcnt); + sg_init_table(e->dst, bio->bi_vcnt); + + bio_for_each_segment(bv, bio, i) { + sg_set_page(&e->src[i], bv->bv_page, bv->bv_len, bv->bv_offset); + sg_set_page(&e->dst[i], bv->bv_page, bv->bv_len, bv->bv_offset); + + err = iterator(e, &e->dst[i], &e->src[i]); + if (err) + return err; + } + + return 0; +} + +static int dst_crypt_iterator(struct dst_crypto_engine *e, + struct scatterlist *sg_dst, struct scatterlist *sg_src) +{ + struct ablkcipher_request *req = e->data; + u8 iv[32]; + + memset(iv, 0, sizeof(iv)); + + memcpy(iv, &e->iv, sizeof(e->iv)); + + return dst_crypto_process(req, sg_dst, sg_src, iv, e->enc, e->timeout); +} + +static int dst_crypt(struct dst_crypto_engine *e, struct bio *bio) +{ + struct ablkcipher_request *req = e->data; + + memset(req, 0, sizeof(struct ablkcipher_request)); + ablkcipher_request_set_tfm(req, e->cipher); + + if (e->enc) + return dst_trans_iter_out(bio, e, dst_crypt_iterator); + else + return dst_trans_iter_in(bio, e, dst_crypt_iterator); +} + +static int dst_hash_iterator(struct dst_crypto_engine *e, + struct scatterlist *sg_dst, struct scatterlist *sg_src) +{ + return crypto_hash_update(e->data, sg_src, sg_src->length); +} + +static int dst_hash(struct dst_crypto_engine *e, struct bio *bio, void *dst) +{ + struct hash_desc *desc = e->data; + int err; + + desc->tfm = e->hash; + desc->flags = 0; + + err = crypto_hash_init(desc); + if (err) + return err; + + err = dst_trans_iter_in(bio, e, dst_hash_iterator); + if (err) + return err; + + err = crypto_hash_final(desc, dst); + if (err) + return err; + + return 0; +} + +/* + * Initialize/cleanup a crypto thread. The only thing it should + * do is to allocate a pool of pages as temporary storage. + * And to setup cipher and/or hash. + */ +static void *dst_crypto_thread_init(void *data) +{ + struct dst_node *n = data; + struct dst_crypto_engine *e; + int err = -ENOMEM; + + e = kzalloc(sizeof(struct dst_crypto_engine), GFP_KERNEL); + if (!e) + goto err_out_exit; + e->src = kcalloc(2 * n->max_pages, sizeof(struct scatterlist), + GFP_KERNEL); + if (!e->src) + goto err_out_free; + + e->dst = e->src + n->max_pages; + + err = dst_crypto_engine_init(e, n); + if (err) + goto err_out_free_all; + + return e; + +err_out_free_all: + kfree(e->src); +err_out_free: + kfree(e); +err_out_exit: + return ERR_PTR(err); +} + +static void dst_crypto_thread_cleanup(void *private) +{ + struct dst_crypto_engine *e = private; + + dst_crypto_engine_exit(e); + kfree(e->src); + kfree(e); +} + +/* + * Initialize crypto engine for given node: store keys, create pool + * of threads, initialize each one. + * + * Each thread has unique ID, but 0 and 1 are reserved for receiving and accepting + * threads (if export node), so IDs could start from 2, but starting them + * from 10 allows easily understand what this thread is for. + */ +int dst_node_crypto_init(struct dst_node *n, struct dst_crypto_ctl *ctl) +{ + void *key = (ctl + 1); + int err = -ENOMEM, i; + char name[32]; + + if (ctl->hash_keysize) { + n->hash_key = kmalloc(ctl->hash_keysize, GFP_KERNEL); + if (!n->hash_key) + goto err_out_exit; + memcpy(n->hash_key, key, ctl->hash_keysize); + } + + if (ctl->cipher_keysize) { + n->cipher_key = kmalloc(ctl->cipher_keysize, GFP_KERNEL); + if (!n->cipher_key) + goto err_out_free_hash; + memcpy(n->cipher_key, key, ctl->cipher_keysize); + } + memcpy(&n->crypto, ctl, sizeof(struct dst_crypto_ctl)); + + for (i=0; i<ctl->thread_num; ++i) { + snprintf(name, sizeof(name), "%s-crypto-%d", n->name, i); + /* Unique ids... */ + err = thread_pool_add_worker(n->pool, name, i+10, + dst_crypto_thread_init, dst_crypto_thread_cleanup, n); + if (err) + goto err_out_free_threads; + } + + return 0; + +err_out_free_threads: + while (--i >= 0) + thread_pool_del_worker_id(n->pool, i+10); + + if (ctl->cipher_keysize) + kfree(n->cipher_key); + ctl->cipher_keysize = 0; +err_out_free_hash: + if (ctl->hash_keysize) + kfree(n->hash_key); + ctl->hash_keysize = 0; +err_out_exit: + return err; +} + +void dst_node_crypto_exit(struct dst_node *n) +{ + struct dst_crypto_ctl *ctl = &n->crypto; + + if (ctl->cipher_algo[0] || ctl->hash_algo[0]) { + kfree(n->hash_key); + kfree(n->cipher_key); + } +} + +/* + * Thrad pool setup callback. Just stores a transaction in private data. + */ +static int dst_trans_crypto_setup(void *crypto_engine, void *trans) +{ + struct dst_crypto_engine *e = crypto_engine; + + e->private = trans; + return 0; +} + +#if 0 +static void dst_dump_bio(struct bio *bio) +{ + u8 *p; + struct bio_vec *bv; + int i; + + bio_for_each_segment(bv, bio, i) { + dprintk("%s: %llu/%u: size: %u, offset: %u, data: ", + __func__, bio->bi_sector, bio->bi_size, + bv->bv_len, bv->bv_offset); + + p = kmap(bv->bv_page) + bv->bv_offset; + for (i=0; i<bv->bv_len; ++i) + printk("%02x ", p[i]); + kunmap(bv->bv_page); + printk("\n"); + } +} +#endif + +/* + * Encrypt/hash data and send it to the network. + */ +static int dst_crypto_process_sending(struct dst_crypto_engine *e, + struct bio *bio, u8 *hash) +{ + int err; + + if (e->cipher) { + err = dst_crypt(e, bio); + if (err) + goto err_out_exit; + } + + if (e->hash) { + err = dst_hash(e, bio, hash); + if (err) + goto err_out_exit; + +#ifdef CONFIG_DST_DEBUG + { + unsigned int i; + + /* dst_dump_bio(bio); */ + + printk(KERN_DEBUG "%s: bio: %llu/%u, rw: %lu, hash: ", + __func__, (u64)bio->bi_sector, + bio->bi_size, bio_data_dir(bio)); + for (i=0; i<crypto_hash_digestsize(e->hash); ++i) + printk("%02x ", hash[i]); + printk("\n"); + } +#endif + } + + return 0; + +err_out_exit: + return err; +} + +/* + * Check if received data is valid. Decipher if it is. + */ +static int dst_crypto_process_receiving(struct dst_crypto_engine *e, + struct bio *bio, u8 *hash, u8 *recv_hash) +{ + int err; + + if (e->hash) { + int mismatch; + + err = dst_hash(e, bio, hash); + if (err) + goto err_out_exit; + + mismatch = !!memcmp(recv_hash, hash, + crypto_hash_digestsize(e->hash)); +#ifdef CONFIG_DST_DEBUG + /* dst_dump_bio(bio); */ + + printk(KERN_DEBUG "%s: bio: %llu/%u, rw: %lu, hash mismatch: %d", + __func__, (u64)bio->bi_sector, bio->bi_size, + bio_data_dir(bio), mismatch); + if (mismatch) { + unsigned int i; + + printk(", recv/calc: "); + for (i=0; i<crypto_hash_digestsize(e->hash); ++i) { + printk("%02x/%02x ", recv_hash[i], hash[i]); + } + } + printk("\n"); +#endif + err = -1; + if (mismatch) + goto err_out_exit; + } + + if (e->cipher) { + err = dst_crypt(e, bio); + if (err) + goto err_out_exit; + } + + return 0; + +err_out_exit: + return err; +} + +/* + * Thread pool callback to encrypt data and send it to the netowork. + */ +static int dst_trans_crypto_action(void *crypto_engine, void *schedule_data) +{ + struct dst_crypto_engine *e = crypto_engine; + struct dst_trans *t = schedule_data; + struct bio *bio = t->bio; + int err; + + dprintk("%s: t: %p, gen: %llu, cipher: %p, hash: %p.\n", + __func__, t, t->gen, e->cipher, e->hash); + + e->enc = t->enc; + e->iv = dst_gen_iv(t); + + if (bio_data_dir(bio) == WRITE) { + err = dst_crypto_process_sending(e, bio, t->cmd.hash); + if (err) + goto err_out_exit; + + if (e->hash) { + t->cmd.csize = crypto_hash_digestsize(e->hash); + t->cmd.size += t->cmd.csize; + } + + return dst_trans_send(t); + } else { + u8 *hash = e->data + e->size/2; + + err = dst_crypto_process_receiving(e, bio, hash, t->cmd.hash); + if (err) + goto err_out_exit; + + dst_trans_remove(t); + dst_trans_put(t); + } + + return 0; + +err_out_exit: + t->error = err; + dst_trans_put(t); + return err; +} + +/* + * Schedule crypto processing for given transaction. + */ +int dst_trans_crypto(struct dst_trans *t) +{ + struct dst_node *n = t->n; + int err; + + err = thread_pool_schedule(n->pool, + dst_trans_crypto_setup, dst_trans_crypto_action, + t, MAX_SCHEDULE_TIMEOUT); + if (err) + goto err_out_exit; + + return 0; + +err_out_exit: + dst_trans_put(t); + return err; +} + +/* + * Crypto machinery for the export node. + */ +static int dst_export_crypto_setup(void *crypto_engine, void *bio) +{ + struct dst_crypto_engine *e = crypto_engine; + + e->private = bio; + return 0; +} + +static int dst_export_crypto_action(void *crypto_engine, void *schedule_data) +{ + struct dst_crypto_engine *e = crypto_engine; + struct bio *bio = schedule_data; + struct dst_export_priv *p = bio->bi_private; + int err; + + dprintk("%s: e: %p, data: %p, bio: %llu/%u, dir: %lu.\n", __func__, + e, e->data, (u64)bio->bi_sector, bio->bi_size, bio_data_dir(bio)); + + e->enc = (bio_data_dir(bio) == READ); + e->iv = p->cmd.id; + + if (bio_data_dir(bio) == WRITE) { + u8 *hash = e->data + e->size/2; + + err = dst_crypto_process_receiving(e, bio, hash, p->cmd.hash); + if (err) + goto err_out_exit; + + generic_make_request(bio); + } else { + err = dst_crypto_process_sending(e, bio, p->cmd.hash); + if (err) + goto err_out_exit; + + if (e->hash) { + p->cmd.csize = crypto_hash_digestsize(e->hash); + p->cmd.size += p->cmd.csize; + } + + err = dst_export_send_bio(bio); + } + return 0; + +err_out_exit: + bio_put(bio); + return err; +} + +int dst_export_crypto(struct dst_node *n, struct bio *bio) +{ + int err; + + err = thread_pool_schedule(n->pool, + dst_export_crypto_setup, dst_export_crypto_action, + bio, MAX_SCHEDULE_TIMEOUT); + if (err) + goto err_out_exit; + + return 0; + +err_out_exit: + bio_put(bio); + return err; +} diff --git a/drivers/staging/dst/dcore.c b/drivers/staging/dst/dcore.c new file mode 100644 index 00000000000..fad25b75304 --- /dev/null +++ b/drivers/staging/dst/dcore.c @@ -0,0 +1,995 @@ +/* + * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include <linux/module.h> +#include <linux/kernel.h> +#include <linux/blkdev.h> +#include <linux/bio.h> +#include <linux/buffer_head.h> +#include <linux/connector.h> +#include <linux/dst.h> +#include <linux/device.h> +#include <linux/jhash.h> +#include <linux/idr.h> +#include <linux/init.h> +#include <linux/namei.h> +#include <linux/slab.h> +#include <linux/socket.h> + +#include <linux/in.h> +#include <linux/in6.h> + +#include <net/sock.h> + +static int dst_major; + +static DEFINE_MUTEX(dst_hash_lock); +static struct list_head *dst_hashtable; +static unsigned int dst_hashtable_size = 128; +module_param(dst_hashtable_size, uint, 0644); + +static char dst_name[] = "Dementianting goldfish"; + +static DEFINE_IDR(dst_index_idr); +static struct cb_id cn_dst_id = { CN_DST_IDX, CN_DST_VAL }; + +/* + * DST sysfs tree for device called 'storage': + * + * /sys/bus/dst/devices/storage/ + * /sys/bus/dst/devices/storage/type : 192.168.4.80:1025 + * /sys/bus/dst/devices/storage/size : 800 + * /sys/bus/dst/devices/storage/name : storage + */ + +static int dst_dev_match(struct device *dev, struct device_driver *drv) +{ + return 1; +} + +static struct bus_type dst_dev_bus_type = { + .name = "dst", + .match = &dst_dev_match, +}; + +static void dst_node_release(struct device *dev) +{ + struct dst_info *info = container_of(dev, struct dst_info, device); + + kfree(info); +} + +static struct device dst_node_dev = { + .bus = &dst_dev_bus_type, + .release = &dst_node_release +}; + +/* + * Setting size of the node after it was changed. + */ +static void dst_node_set_size(struct dst_node *n) +{ + struct block_device *bdev; + + set_capacity(n->disk, n->size >> 9); + + bdev = bdget_disk(n->disk, 0); + if (bdev) { + mutex_lock(&bdev->bd_inode->i_mutex); + i_size_write(bdev->bd_inode, n->size); + mutex_unlock(&bdev->bd_inode->i_mutex); + bdput(bdev); + } +} + +/* + * Distributed storage request processing function. + */ +static int dst_request(struct request_queue *q, struct bio *bio) +{ + struct dst_node *n = q->queuedata; + int err = -EIO; + + if (bio_empty_barrier(bio) && !q->prepare_discard_fn) { + /* + * This is a dirty^Wnice hack, but if we complete this + * operation with -EOPNOTSUPP like intended, XFS + * will stuck and freeze the machine. This may be + * not particulary XFS problem though, but it is the + * only FS which sends empty barrier at umount time + * I worked with. + * + * Empty barriers are not allowed anyway, see 51fd77bd9f512 + * for example, although later it was changed to bio_discard() + * only, which does not work in this case. + */ + //err = -EOPNOTSUPP; + err = 0; + goto end_io; + } + + bio_get(bio); + + return dst_process_bio(n, bio); + +end_io: + bio_endio(bio, err); + return err; +} + +/* + * Open/close callbacks for appropriate block device. + */ +static int dst_bdev_open(struct block_device *bdev, fmode_t mode) +{ + struct dst_node *n = bdev->bd_disk->private_data; + + dst_node_get(n); + return 0; +} + +static int dst_bdev_release(struct gendisk *disk, fmode_t mode) +{ + struct dst_node *n = disk->private_data; + + dst_node_put(n); + return 0; +} + +static struct block_device_operations dst_blk_ops = { + .open = dst_bdev_open, + .release = dst_bdev_release, + .owner = THIS_MODULE, +}; + +/* + * Block layer binding - disk is created when array is fully configured + * by userspace request. + */ +static int dst_node_create_disk(struct dst_node *n) +{ + int err = -ENOMEM; + u32 index = 0; + + n->queue = blk_init_queue(NULL, NULL); + if (!n->queue) + goto err_out_exit; + + n->queue->queuedata = n; + blk_queue_make_request(n->queue, dst_request); + blk_queue_max_phys_segments(n->queue, n->max_pages); + blk_queue_max_hw_segments(n->queue, n->max_pages); + + err = -ENOMEM; + n->disk = alloc_disk(1); + if (!n->disk) + goto err_out_free_queue; + + if (!(n->state->permissions & DST_PERM_WRITE)) { + printk(KERN_INFO "DST node %s attached read-only.\n", n->name); + set_disk_ro(n->disk, 1); + } + + if (!idr_pre_get(&dst_index_idr, GFP_KERNEL)) + goto err_out_put; + + mutex_lock(&dst_hash_lock); + err = idr_get_new(&dst_index_idr, NULL, &index); + mutex_unlock(&dst_hash_lock); + if (err) + goto err_out_put; + + n->disk->major = dst_major; + n->disk->first_minor = index; + n->disk->fops = &dst_blk_ops; + n->disk->queue = n->queue; + n->disk->private_data = n; + snprintf(n->disk->disk_name, sizeof(n->disk->disk_name), "dst-%s", n->name); + + return 0; + +err_out_put: + put_disk(n->disk); +err_out_free_queue: + blk_cleanup_queue(n->queue); +err_out_exit: + return err; +} + +/* + * Sysfs machinery: show device's size. + */ +static ssize_t dst_show_size(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct dst_info *info = container_of(dev, struct dst_info, device); + + return sprintf(buf, "%llu\n", info->size); +} + +/* + * Show local exported device. + */ +static ssize_t dst_show_local(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct dst_info *info = container_of(dev, struct dst_info, device); + + return sprintf(buf, "%s\n", info->local); +} + +/* + * Shows type of the remote node - device major/minor number + * for local nodes and address (af_inet ipv4/ipv6 only) for remote nodes. + */ +static ssize_t dst_show_type(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct dst_info *info = container_of(dev, struct dst_info, device); + int family = info->net.addr.sa_family; + + if (family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in *)&info->net.addr; + return sprintf(buf, "%u.%u.%u.%u:%d\n", + NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port)); + } else if (family == AF_INET6) { + struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&info->net.addr; + return sprintf(buf, + "%pi6:%d\n", + &sin->sin6_addr, ntohs(sin->sin6_port)); + } else { + int i, sz = PAGE_SIZE - 2; /* 0 symbol and '\n' below */ + int size, addrlen = info->net.addr.sa_data_len; + unsigned char *a = (unsigned char *)&info->net.addr.sa_data; + char *buf_orig = buf; + + size = snprintf(buf, sz, "family: %d, addrlen: %u, addr: ", + family, addrlen); + sz -= size; + buf += size; + + for (i=0; i<addrlen; ++i) { + if (sz < 3) + break; + + size = snprintf(buf, sz, "%02x ", a[i]); + sz -= size; + buf += size; + } + buf += sprintf(buf, "\n"); + + return buf - buf_orig; + } + return 0; +} + +static struct device_attribute dst_node_attrs[] = { + __ATTR(size, 0444, dst_show_size, NULL), + __ATTR(type, 0444, dst_show_type, NULL), + __ATTR(local, 0444, dst_show_local, NULL), +}; + +static int dst_create_node_attributes(struct dst_node *n) +{ + int err, i; + + for (i=0; i<ARRAY_SIZE(dst_node_attrs); ++i) { + err = device_create_file(&n->info->device, + &dst_node_attrs[i]); + if (err) + goto err_out_remove_all; + } + return 0; + +err_out_remove_all: + while (--i >= 0) + device_remove_file(&n->info->device, + &dst_node_attrs[i]); + + return err; +} + +static void dst_remove_node_attributes(struct dst_node *n) +{ + int i; + + for (i=0; i<ARRAY_SIZE(dst_node_attrs); ++i) + device_remove_file(&n->info->device, + &dst_node_attrs[i]); +} + +/* + * Sysfs cleanup and initialization. + * Shows number of useful parameters. + */ +static void dst_node_sysfs_exit(struct dst_node *n) +{ + if (n->info) { + dst_remove_node_attributes(n); + device_unregister(&n->info->device); + n->info = NULL; + } +} + +static int dst_node_sysfs_init(struct dst_node *n) +{ + int err; + + n->info = kzalloc(sizeof(struct dst_info), GFP_KERNEL); + if (!n->info) + return -ENOMEM; + + memcpy(&n->info->device, &dst_node_dev, sizeof(struct device)); + n->info->size = n->size; + + dev_set_name(&n->info->device, "dst-%s", n->name); + err = device_register(&n->info->device); + if (err) { + dprintk(KERN_ERR "Failed to register node '%s', err: %d.\n", + n->name, err); + goto err_out_exit; + } + + dst_create_node_attributes(n); + + return 0; + +err_out_exit: + kfree(n->info); + n->info = NULL; + return err; +} + +/* + * DST node hash tables machinery. + */ +static inline unsigned int dst_hash(char *str, unsigned int size) +{ + return (jhash(str, size, 0) % dst_hashtable_size); +} + +static void dst_node_remove(struct dst_node *n) +{ + mutex_lock(&dst_hash_lock); + list_del_init(&n->node_entry); + mutex_unlock(&dst_hash_lock); +} + +static void dst_node_add(struct dst_node *n) +{ + unsigned hash = dst_hash(n->name, sizeof(n->name)); + + mutex_lock(&dst_hash_lock); + list_add_tail(&n->node_entry, &dst_hashtable[hash]); + mutex_unlock(&dst_hash_lock); +} + +/* + * Cleaning node when it is about to be freed. + * There are still users of the socket though, + * so connection cleanup should be protected. + */ +static void dst_node_cleanup(struct dst_node *n) +{ + struct dst_state *st = n->state; + + if (!st) + return; + + if (n->queue) { + blk_cleanup_queue(n->queue); + + mutex_lock(&dst_hash_lock); + idr_remove(&dst_index_idr, n->disk->first_minor); + mutex_unlock(&dst_hash_lock); + + put_disk(n->disk); + } + + if (n->bdev) { + sync_blockdev(n->bdev); + blkdev_put(n->bdev, FMODE_READ|FMODE_WRITE); + } + + dst_state_lock(st); + st->need_exit = 1; + dst_state_exit_connected(st); + dst_state_unlock(st); + + wake_up(&st->thread_wait); + + dst_state_put(st); + n->state = NULL; +} + +/* + * Free security attributes attached to given node. + */ +static void dst_security_exit(struct dst_node *n) +{ + struct dst_secure *s, *tmp; + + list_for_each_entry_safe(s, tmp, &n->security_list, sec_entry) { + list_del(&s->sec_entry); + kfree(s); + } +} + +/* + * Free node when there are no more users. + * Actually node has to be freed on behalf od userspace process, + * since there are number of threads, which are embedded in the + * node, so they can not exit and free node from there, that is + * why there is a wakeup if reference counter is not equal to zero. + */ +void dst_node_put(struct dst_node *n) +{ + if (unlikely(!n)) + return; + + dprintk("%s: n: %p, refcnt: %d.\n", + __func__, n, atomic_read(&n->refcnt)); + + if (atomic_dec_and_test(&n->refcnt)) { + dst_node_remove(n); + n->trans_scan_timeout = 0; + dst_node_cleanup(n); + thread_pool_destroy(n->pool); + dst_node_sysfs_exit(n); + dst_node_crypto_exit(n); + dst_security_exit(n); + dst_node_trans_exit(n); + + kfree(n); + + dprintk("%s: freed n: %p.\n", __func__, n); + } else { + wake_up(&n->wait); + } +} + +/* + * This function finds devices major/minor numbers for given pathname. + */ +static int dst_lookup_device(const char *path, dev_t *dev) +{ + int err; + struct nameidata nd; + struct inode *inode; + + err = path_lookup(path, LOOKUP_FOLLOW, &nd); + if (err) + return err; + + inode = nd.path.dentry->d_inode; + if (!inode) { + err = -ENOENT; + goto out; + } + + if (!S_ISBLK(inode->i_mode)) { + err = -ENOTBLK; + goto out; + } + + *dev = inode->i_rdev; + +out: + path_put(&nd.path); + return err; +} + +/* + * Setting up export device: lookup by the name, get its size + * and setup listening socket, which will accept clients, which + * will submit IO for given storage. + */ +static int dst_setup_export(struct dst_node *n, struct dst_ctl *ctl, + struct dst_export_ctl *le) +{ + int err; + dev_t dev = 0; /* gcc likes to scream here */ + + snprintf(n->info->local, sizeof(n->info->local), "%s", le->device); + + err = dst_lookup_device(le->device, &dev); + if (err) + return err; + + n->bdev = open_by_devnum(dev, FMODE_READ|FMODE_WRITE); + if (!n->bdev) + return -ENODEV; + + if (n->size != 0) + n->size = min_t(loff_t, n->bdev->bd_inode->i_size, n->size); + else + n->size = n->bdev->bd_inode->i_size; + + n->info->size = n->size; + err = dst_node_init_listened(n, le); + if (err) + goto err_out_cleanup; + + return 0; + +err_out_cleanup: + blkdev_put(n->bdev, FMODE_READ|FMODE_WRITE); + n->bdev = NULL; + + return err; +} + +/* Empty thread pool callbacks for the network processing threads. */ +static inline void *dst_thread_network_init(void *data) +{ + dprintk("%s: data: %p.\n", __func__, data); + return data; +} + +static inline void dst_thread_network_cleanup(void *data) +{ + dprintk("%s: data: %p.\n", __func__, data); +} + +/* + * Allocate DST node and initialize some of its parameters. + */ +static struct dst_node *dst_alloc_node(struct dst_ctl *ctl, + int (*start)(struct dst_node *), + int num) +{ + struct dst_node *n; + int err; + + n = kzalloc(sizeof(struct dst_node), GFP_KERNEL); + if (!n) + return NULL; + + INIT_LIST_HEAD(&n->node_entry); + + INIT_LIST_HEAD(&n->security_list); + mutex_init(&n->security_lock); + + init_waitqueue_head(&n->wait); + + n->trans_scan_timeout = msecs_to_jiffies(ctl->trans_scan_timeout); + if (!n->trans_scan_timeout) + n->trans_scan_timeout = HZ; + + n->trans_max_retries = ctl->trans_max_retries; + if (!n->trans_max_retries) + n->trans_max_retries = 10; + + /* + * Pretty much arbitrary default numbers. + * 32 matches maximum number of pages in bio originated from ext3 (31). + */ + n->max_pages = ctl->max_pages; + if (!n->max_pages) + n->max_pages = 32; + + if (n->max_pages > 1024) + n->max_pages = 1024; + + n->start = start; + n->size = ctl->size; + + atomic_set(&n->refcnt, 1); + atomic_long_set(&n->gen, 0); + snprintf(n->name, sizeof(n->name), "%s", ctl->name); + + err = dst_node_sysfs_init(n); + if (err) + goto err_out_free; + + n->pool = thread_pool_create(num, n->name, dst_thread_network_init, + dst_thread_network_cleanup, n); + if (IS_ERR(n->pool)) { + err = PTR_ERR(n->pool); + goto err_out_sysfs_exit; + } + + dprintk("%s: n: %p, name: %s.\n", __func__, n, n->name); + + return n; + +err_out_sysfs_exit: + dst_node_sysfs_exit(n); +err_out_free: + kfree(n); + return NULL; +} + +/* + * Starting a node, connected to the remote server: + * register block device and initialize transaction mechanism. + * In revers order though. + * + * It will autonegotiate some parameters with the remote node + * and update local if needed. + * + * Transaction initialization should be the last thing before + * starting the node, since transaction should include not only + * block IO, but also crypto related data (if any), which are + * initialized separately. + */ +static int dst_start_remote(struct dst_node *n) +{ + int err; + + err = dst_node_trans_init(n, sizeof(struct dst_trans)); + if (err) + return err; + + err = dst_node_create_disk(n); + if (err) + return err; + + dst_node_set_size(n); + add_disk(n->disk); + + dprintk("DST: started remote node '%s', minor: %d.\n", n->name, n->disk->first_minor); + + return 0; +} + +/* + * Adding remote node and initialize connection. + */ +static int dst_add_remote(struct dst_node *n, struct dst_ctl *ctl, + void *data, unsigned int size) +{ + int err; + struct dst_network_ctl *rctl = data; + + if (n) + return -EEXIST; + + if (size != sizeof(struct dst_network_ctl)) + return -EINVAL; + + n = dst_alloc_node(ctl, dst_start_remote, 1); + if (!n) + return -ENOMEM; + + memcpy(&n->info->net, rctl, sizeof(struct dst_network_ctl)); + err = dst_node_init_connected(n, rctl); + if (err) + goto err_out_free; + + dst_node_add(n); + + return 0; + +err_out_free: + dst_node_put(n); + return err; +} + +/* + * Adding export node: initializing block device and listening socket. + */ +static int dst_add_export(struct dst_node *n, struct dst_ctl *ctl, + void *data, unsigned int size) +{ + int err; + struct dst_export_ctl *le = data; + + if (n) + return -EEXIST; + + if (size != sizeof(struct dst_export_ctl)) + return -EINVAL; + + n = dst_alloc_node(ctl, dst_start_export, 2); + if (!n) + return -EINVAL; + + err = dst_setup_export(n, ctl, le); + if (err) + goto err_out_free; + + dst_node_add(n); + + return 0; + +err_out_free: + dst_node_put(n); + return err; +} + +static int dst_node_remove_unload(struct dst_node *n) +{ + printk(KERN_INFO "STOPPED name: '%s', size: %llu.\n", + n->name, n->size); + + if (n->disk) + del_gendisk(n->disk); + + dst_node_remove(n); + dst_node_sysfs_exit(n); + + /* + * This is not a hack. Really. + * Node's reference counter allows to implement fine grained + * node freeing, but since all transactions (which hold node's + * reference counter) are processed in the dedicated thread, + * it is possible that reference will hit zero in that thread, + * so we will not be able to exit thread and cleanup the node. + * + * So, we remove disk, so no new activity is possible, and + * wait until all pending transaction are completed (either + * in receiving thread or by timeout in workqueue), in this + * case reference counter will be less or equal to 2 (once set in + * dst_alloc_node() and then in connector message parser; + * or when we force module unloading, and connector message + * parser does not hold a reference, in this case reference + * counter will be equal to 1), + * and subsequent dst_node_put() calls will free the node. + */ + dprintk("%s: going to sleep with %d refcnt.\n", __func__, atomic_read(&n->refcnt)); + wait_event(n->wait, atomic_read(&n->refcnt) <= 2); + + dst_node_put(n); + return 0; +} + +/* + * Remove node from the hash table. + */ +static int dst_del_node(struct dst_node *n, struct dst_ctl *ctl, + void *data, unsigned int size) +{ + if (!n) + return -ENODEV; + + return dst_node_remove_unload(n); +} + +/* + * Initialize crypto processing for given node. + */ +static int dst_crypto_init(struct dst_node *n, struct dst_ctl *ctl, + void *data, unsigned int size) +{ + struct dst_crypto_ctl *crypto = data; + + if (!n) + return -ENODEV; + + if (size != sizeof(struct dst_crypto_ctl) + crypto->hash_keysize + + crypto->cipher_keysize) + return -EINVAL; + + if (n->trans_cache) + return -EEXIST; + + return dst_node_crypto_init(n, crypto); +} + +/* + * Security attributes for given node. + */ +static int dst_security_init(struct dst_node *n, struct dst_ctl *ctl, + void *data, unsigned int size) +{ + struct dst_secure *s; + + if (!n) + return -ENODEV; + + if (size != sizeof(struct dst_secure_user)) + return -EINVAL; + + s = kmalloc(sizeof(struct dst_secure), GFP_KERNEL); + if (!s) + return -ENOMEM; + + memcpy(&s->sec, data, size); + + mutex_lock(&n->security_lock); + list_add_tail(&s->sec_entry, &n->security_list); + mutex_unlock(&n->security_lock); + + return 0; +} + +/* + * Kill'em all! + */ +static int dst_start_node(struct dst_node *n, struct dst_ctl *ctl, + void *data, unsigned int size) +{ + int err; + + if (!n) + return -ENODEV; + + if (n->trans_cache) + return 0; + + err = n->start(n); + if (err) + return err; + + printk(KERN_INFO "STARTED name: '%s', size: %llu.\n", n->name, n->size); + return 0; +} + +typedef int (*dst_command_func)(struct dst_node *n, struct dst_ctl *ctl, + void *data, unsigned int size); + +/* + * List of userspace commands. + */ +static dst_command_func dst_commands[] = { + [DST_ADD_REMOTE] = &dst_add_remote, + [DST_ADD_EXPORT] = &dst_add_export, + [DST_DEL_NODE] = &dst_del_node, + [DST_CRYPTO] = &dst_crypto_init, + [DST_SECURITY] = &dst_security_init, + [DST_START] = &dst_start_node, +}; + +/* + * Configuration parser. + */ +static void cn_dst_callback(void *data) +{ + struct dst_ctl *ctl; + struct cn_msg *msg = data; + int err; + struct dst_ctl_ack ack; + struct dst_node *n = NULL, *tmp; + unsigned int hash; + + if (msg->len < sizeof(struct dst_ctl)) { + err = -EBADMSG; + goto out; + } + + ctl = (struct dst_ctl *)msg->data; + + if (ctl->cmd >= DST_CMD_MAX) { + err = -EINVAL; + goto out; + } + hash = dst_hash(ctl->name, sizeof(ctl->name)); + + mutex_lock(&dst_hash_lock); + list_for_each_entry(tmp, &dst_hashtable[hash], node_entry) { + if (!memcmp(tmp->name, ctl->name, sizeof(tmp->name))) { + n = tmp; + dst_node_get(n); + break; + } + } + mutex_unlock(&dst_hash_lock); + + err = dst_commands[ctl->cmd](n, ctl, msg->data + sizeof(struct dst_ctl), + msg->len - sizeof(struct dst_ctl)); + + dst_node_put(n); +out: + memcpy(&ack.msg, msg, sizeof(struct cn_msg)); + + ack.msg.ack = msg->ack + 1; + ack.msg.len = sizeof(struct dst_ctl_ack) - sizeof(struct cn_msg); + + ack.error = err; + + cn_netlink_send(&ack.msg, 0, GFP_KERNEL); +} + +/* + * Global initialization: sysfs, hash table, block device registration, + * connector and various caches. + */ +static int __init dst_sysfs_init(void) +{ + return bus_register(&dst_dev_bus_type); +} + +static void dst_sysfs_exit(void) +{ + bus_unregister(&dst_dev_bus_type); +} + +static int __init dst_hashtable_init(void) +{ + unsigned int i; + + dst_hashtable = kcalloc(dst_hashtable_size, sizeof(struct list_head), + GFP_KERNEL); + if (!dst_hashtable) + return -ENOMEM; + + for (i=0; i<dst_hashtable_size; ++i) + INIT_LIST_HEAD(&dst_hashtable[i]); + + return 0; +} + +static void dst_hashtable_exit(void) +{ + unsigned int i; + struct dst_node *n, *tmp; + + for (i=0; i<dst_hashtable_size; ++i) { + list_for_each_entry_safe(n, tmp, &dst_hashtable[i], node_entry) { + dst_node_remove_unload(n); + } + } + + kfree(dst_hashtable); +} + +static int __init dst_sys_init(void) +{ + int err = -ENOMEM; + + err = dst_hashtable_init(); + if (err) + goto err_out_exit; + + err = dst_export_init(); + if (err) + goto err_out_hashtable_exit; + + err = register_blkdev(dst_major, DST_NAME); + if (err < 0) + goto err_out_export_exit; + if (err) + dst_major = err; + + err = dst_sysfs_init(); + if (err) + goto err_out_unregister; + + err = cn_add_callback(&cn_dst_id, "DST", cn_dst_callback); + if (err) + goto err_out_sysfs_exit; + + printk(KERN_INFO "Distributed storage, '%s' release.\n", dst_name); + + return 0; + +err_out_sysfs_exit: + dst_sysfs_exit(); +err_out_unregister: + unregister_blkdev(dst_major, DST_NAME); +err_out_export_exit: + dst_export_exit(); +err_out_hashtable_exit: + dst_hashtable_exit(); +err_out_exit: + return err; +} + +static void __exit dst_sys_exit(void) +{ + cn_del_callback(&cn_dst_id); + unregister_blkdev(dst_major, DST_NAME); + dst_hashtable_exit(); + dst_sysfs_exit(); + dst_export_exit(); +} + +module_init(dst_sys_init); +module_exit(dst_sys_exit); + +MODULE_DESCRIPTION("Distributed storage"); +MODULE_AUTHOR("Evgeniy Polyakov <zbr@ioremap.net>"); +MODULE_LICENSE("GPL"); diff --git a/drivers/staging/dst/export.c b/drivers/staging/dst/export.c new file mode 100644 index 00000000000..80ae4ebe610 --- /dev/null +++ b/drivers/staging/dst/export.c @@ -0,0 +1,657 @@ +/* + * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include <linux/blkdev.h> +#include <linux/bio.h> +#include <linux/dst.h> +#include <linux/in.h> +#include <linux/in6.h> +#include <linux/poll.h> +#include <linux/slab.h> +#include <linux/socket.h> + +#include <net/sock.h> + +/* + * Export bioset is used for server block IO requests. + */ +static struct bio_set *dst_bio_set; + +int __init dst_export_init(void) +{ + int err = -ENOMEM; + + dst_bio_set = bioset_create(32, sizeof(struct dst_export_priv)); + if (!dst_bio_set) + goto err_out_exit; + + return 0; + +err_out_exit: + return err; +} + +void dst_export_exit(void) +{ + bioset_free(dst_bio_set); +} + +/* + * When client connects and autonegotiates with the server node, + * its permissions are checked in a security attributes and sent + * back. + */ +static unsigned int dst_check_permissions(struct dst_state *main, struct dst_state *st) +{ + struct dst_node *n = main->node; + struct dst_secure *sentry; + struct dst_secure_user *s; + struct saddr *sa = &st->ctl.addr; + unsigned int perm = 0; + + mutex_lock(&n->security_lock); + list_for_each_entry(sentry, &n->security_list, sec_entry) { + s = &sentry->sec; + + if (s->addr.sa_family != sa->sa_family) + continue; + + if (s->addr.sa_data_len != sa->sa_data_len) + continue; + + /* + * This '2' below is a port field. This may be very wrong to do + * in atalk for example though. If there will be any need to extent + * protocol to something else, I can create per-family helpers and + * use them instead of this memcmp. + */ + if (memcmp(s->addr.sa_data + 2, sa->sa_data + 2, + sa->sa_data_len - 2)) + continue; + + perm = s->permissions; + } + mutex_unlock(&n->security_lock); + + return perm; +} + +/* + * Accept new client: allocate appropriate network state and check permissions. + */ +static struct dst_state *dst_accept_client(struct dst_state *st) +{ + unsigned int revents = 0; + unsigned int err_mask = POLLERR | POLLHUP | POLLRDHUP; + unsigned int mask = err_mask | POLLIN; + struct dst_node *n = st->node; + int err = 0; + struct socket *sock = NULL; + struct dst_state *new; + + while (!err && !sock) { + revents = dst_state_poll(st); + + if (!(revents & mask)) { + DEFINE_WAIT(wait); + + for (;;) { + prepare_to_wait(&st->thread_wait, + &wait, TASK_INTERRUPTIBLE); + if (!n->trans_scan_timeout || st->need_exit) + break; + + revents = dst_state_poll(st); + + if (revents & mask) + break; + + if (signal_pending(current)) + break; + + /* + * Magic HZ? Polling check above is not safe in + * all cases (like socket reset in BH context), + * so it is simpler just to postpone it to the + * process context instead of implementing special + * locking there. + */ + schedule_timeout(HZ); + } + finish_wait(&st->thread_wait, &wait); + } + + err = -ECONNRESET; + dst_state_lock(st); + + dprintk("%s: st: %p, revents: %x [err: %d, in: %d].\n", + __func__, st, revents, revents & err_mask, + revents & POLLIN); + + if (revents & err_mask) { + dprintk("%s: revents: %x, socket: %p, err: %d.\n", + __func__, revents, st->socket, err); + err = -ECONNRESET; + } + + if (!n->trans_scan_timeout || st->need_exit) + err = -ENODEV; + + if (st->socket && (revents & POLLIN)) + err = kernel_accept(st->socket, &sock, 0); + + dst_state_unlock(st); + } + + if (err) + goto err_out_exit; + + new = dst_state_alloc(st->node); + if (!new) { + err = -ENOMEM; + goto err_out_release; + } + new->socket = sock; + + new->ctl.addr.sa_data_len = sizeof(struct sockaddr); + err = kernel_getpeername(sock, (struct sockaddr *)&new->ctl.addr, + (int *)&new->ctl.addr.sa_data_len); + if (err) + goto err_out_put; + + new->permissions = dst_check_permissions(st, new); + if (new->permissions == 0) { + err = -EPERM; + dst_dump_addr(sock, (struct sockaddr *)&new->ctl.addr, + "Client is not allowed to connect"); + goto err_out_put; + } + + err = dst_poll_init(new); + if (err) + goto err_out_put; + + dst_dump_addr(sock, (struct sockaddr *)&new->ctl.addr, + "Connected client"); + + return new; + +err_out_put: + dst_state_put(new); +err_out_release: + sock_release(sock); +err_out_exit: + return ERR_PTR(err); +} + +/* + * Each server's block request sometime finishes. + * Usually it happens in hard irq context of the appropriate controller, + * so to play good with all cases we just queue BIO into the queue + * and wake up processing thread, which gets completed request and + * send (encrypting if needed) it back to the client (if it was a read + * request), or sends back reply that writing succesfully completed. + */ +static int dst_export_process_request_queue(struct dst_state *st) +{ + unsigned long flags; + struct dst_export_priv *p = NULL; + struct bio *bio; + int err = 0; + + while (!list_empty(&st->request_list)) { + spin_lock_irqsave(&st->request_lock, flags); + if (!list_empty(&st->request_list)) { + p = list_first_entry(&st->request_list, + struct dst_export_priv, request_entry); + list_del(&p->request_entry); + } + spin_unlock_irqrestore(&st->request_lock, flags); + + if (!p) + break; + + bio = p->bio; + + if (dst_need_crypto(st->node) && (bio_data_dir(bio) == READ)) + err = dst_export_crypto(st->node, bio); + else + err = dst_export_send_bio(bio); + + if (err) + break; + } + + return err; +} + +/* + * Cleanup export state. + * It has to wait until all requests are finished, + * and then free them all. + */ +static void dst_state_cleanup_export(struct dst_state *st) +{ + struct dst_export_priv *p; + unsigned long flags; + + /* + * This loop waits for all pending bios to be completed and freed. + */ + while (atomic_read(&st->refcnt) > 1) { + dprintk("%s: st: %p, refcnt: %d, list_empty: %d.\n", + __func__, st, atomic_read(&st->refcnt), + list_empty(&st->request_list)); + wait_event_timeout(st->thread_wait, + (atomic_read(&st->refcnt) == 1) || + !list_empty(&st->request_list), + HZ/2); + + while (!list_empty(&st->request_list)) { + p = NULL; + spin_lock_irqsave(&st->request_lock, flags); + if (!list_empty(&st->request_list)) { + p = list_first_entry(&st->request_list, + struct dst_export_priv, request_entry); + list_del(&p->request_entry); + } + spin_unlock_irqrestore(&st->request_lock, flags); + + if (p) + bio_put(p->bio); + + dprintk("%s: st: %p, refcnt: %d, list_empty: %d, p: %p.\n", + __func__, st, atomic_read(&st->refcnt), + list_empty(&st->request_list), p); + } + } + + dst_state_put(st); +} + +/* + * Client accepting thread. + * Not only accepts new connection, but also schedules receiving thread + * and performs request completion described above. + */ +static int dst_accept(void *init_data, void *schedule_data) +{ + struct dst_state *main_st = schedule_data; + struct dst_node *n = init_data; + struct dst_state *st; + int err; + + while (n->trans_scan_timeout && !main_st->need_exit) { + dprintk("%s: main_st: %p, n: %p.\n", __func__, main_st, n); + st = dst_accept_client(main_st); + if (IS_ERR(st)) + continue; + + err = dst_state_schedule_receiver(st); + if (!err) { + while (n->trans_scan_timeout) { + err = wait_event_interruptible_timeout(st->thread_wait, + !list_empty(&st->request_list) || + !n->trans_scan_timeout || + st->need_exit, + HZ); + + if (!n->trans_scan_timeout || st->need_exit) + break; + + if (list_empty(&st->request_list)) + continue; + + err = dst_export_process_request_queue(st); + if (err) + break; + } + + st->need_exit = 1; + wake_up(&st->thread_wait); + } + + dst_state_cleanup_export(st); + } + + dprintk("%s: freeing listening socket st: %p.\n", __func__, main_st); + + dst_state_lock(main_st); + dst_poll_exit(main_st); + dst_state_socket_release(main_st); + dst_state_unlock(main_st); + dst_state_put(main_st); + dprintk("%s: freed listening socket st: %p.\n", __func__, main_st); + + return 0; +} + +int dst_start_export(struct dst_node *n) +{ + if (list_empty(&n->security_list)) { + printk(KERN_ERR "You are trying to export node '%s' without security attributes.\n" + "No clients will be allowed to connect. Exiting.\n", n->name); + return -EINVAL; + } + return dst_node_trans_init(n, sizeof(struct dst_export_priv)); +} + +/* + * Initialize listening state and schedule accepting thread. + */ +int dst_node_init_listened(struct dst_node *n, struct dst_export_ctl *le) +{ + struct dst_state *st; + int err = -ENOMEM; + struct dst_network_ctl *ctl = &le->ctl; + + memcpy(&n->info->net, ctl, sizeof(struct dst_network_ctl)); + + st = dst_state_alloc(n); + if (IS_ERR(st)) { + err = PTR_ERR(st); + goto err_out_exit; + } + memcpy(&st->ctl, ctl, sizeof(struct dst_network_ctl)); + + err = dst_state_socket_create(st); + if (err) + goto err_out_put; + + st->socket->sk->sk_reuse = 1; + + err = kernel_bind(st->socket, (struct sockaddr *)&ctl->addr, + ctl->addr.sa_data_len); + if (err) + goto err_out_socket_release; + + err = kernel_listen(st->socket, 1024); + if (err) + goto err_out_socket_release; + n->state = st; + + err = dst_poll_init(st); + if (err) + goto err_out_socket_release; + + dst_state_get(st); + + err = thread_pool_schedule(n->pool, dst_thread_setup, + dst_accept, st, MAX_SCHEDULE_TIMEOUT); + if (err) + goto err_out_poll_exit; + + return 0; + +err_out_poll_exit: + dst_poll_exit(st); +err_out_socket_release: + dst_state_socket_release(st); +err_out_put: + dst_state_put(st); +err_out_exit: + n->state = NULL; + return err; +} + +/* + * Free bio and related private data. + * Also drop a reference counter for appropriate state, + * which waits when there are no more block IOs in-flight. + */ +static void dst_bio_destructor(struct bio *bio) +{ + struct bio_vec *bv; + struct dst_export_priv *priv = bio->bi_private; + int i; + + bio_for_each_segment(bv, bio, i) { + if (!bv->bv_page) + break; + + __free_page(bv->bv_page); + } + + if (priv) + dst_state_put(priv->state); + bio_free(bio, dst_bio_set); +} + +/* + * Block IO completion. Queue request to be sent back to + * the client (or just confirmation). + */ +static void dst_bio_end_io(struct bio *bio, int err) +{ + struct dst_export_priv *p = bio->bi_private; + struct dst_state *st = p->state; + unsigned long flags; + + spin_lock_irqsave(&st->request_lock, flags); + list_add_tail(&p->request_entry, &st->request_list); + spin_unlock_irqrestore(&st->request_lock, flags); + + wake_up(&st->thread_wait); +} + +/* + * Allocate read request for the server. + */ +static int dst_export_read_request(struct bio *bio, unsigned int total_size) +{ + unsigned int size; + struct page *page; + int err; + + while (total_size) { + err = -ENOMEM; + page = alloc_page(GFP_KERNEL); + if (!page) + goto err_out_exit; + + size = min_t(unsigned int, PAGE_SIZE, total_size); + + err = bio_add_page(bio, page, size, 0); + dprintk("%s: bio: %llu/%u, size: %u, err: %d.\n", + __func__, (u64)bio->bi_sector, bio->bi_size, + size, err); + if (err <= 0) + goto err_out_free_page; + + total_size -= size; + } + + return 0; + +err_out_free_page: + __free_page(page); +err_out_exit: + return err; +} + +/* + * Allocate write request for the server. + * Should not only get pages, but also read data from the network. + */ +static int dst_export_write_request(struct dst_state *st, + struct bio *bio, unsigned int total_size) +{ + unsigned int size; + struct page *page; + void *data; + int err; + + while (total_size) { + err = -ENOMEM; + page = alloc_page(GFP_KERNEL); + if (!page) + goto err_out_exit; + + data = kmap(page); + if (!data) + goto err_out_free_page; + + size = min_t(unsigned int, PAGE_SIZE, total_size); + + err = dst_data_recv(st, data, size); + if (err) + goto err_out_unmap_page; + + err = bio_add_page(bio, page, size, 0); + if (err <= 0) + goto err_out_unmap_page; + + kunmap(page); + + total_size -= size; + } + + return 0; + +err_out_unmap_page: + kunmap(page); +err_out_free_page: + __free_page(page); +err_out_exit: + return err; +} + +/* + * Groovy, we've gotten an IO request from the client. + * Allocate BIO from the bioset, private data from the mempool + * and lots of pages for IO. + */ +int dst_process_io(struct dst_state *st) +{ + struct dst_node *n = st->node; + struct dst_cmd *cmd = st->data; + struct bio *bio; + struct dst_export_priv *priv; + int err = -ENOMEM; + + if (unlikely(!n->bdev)) { + err = -EINVAL; + goto err_out_exit; + } + + bio = bio_alloc_bioset(GFP_KERNEL, + PAGE_ALIGN(cmd->size) >> PAGE_SHIFT, + dst_bio_set); + if (!bio) + goto err_out_exit; + + priv = (struct dst_export_priv *)(((void *)bio) - sizeof (struct dst_export_priv)); + + priv->state = dst_state_get(st); + priv->bio = bio; + + bio->bi_private = priv; + bio->bi_end_io = dst_bio_end_io; + bio->bi_destructor = dst_bio_destructor; + bio->bi_bdev = n->bdev; + + /* + * Server side is only interested in two low bits: + * uptodate (set by itself actually) and rw block + */ + bio->bi_flags |= cmd->flags & 3; + + bio->bi_rw = cmd->rw; + bio->bi_size = 0; + bio->bi_sector = cmd->sector; + + dst_bio_to_cmd(bio, &priv->cmd, DST_IO_RESPONSE, cmd->id); + + priv->cmd.flags = 0; + priv->cmd.size = cmd->size; + + if (bio_data_dir(bio) == WRITE) { + err = dst_recv_cdata(st, priv->cmd.hash); + if (err) + goto err_out_free; + + err = dst_export_write_request(st, bio, cmd->size); + if (err) + goto err_out_free; + + if (dst_need_crypto(n)) + return dst_export_crypto(n, bio); + } else { + err = dst_export_read_request(bio, cmd->size); + if (err) + goto err_out_free; + } + + dprintk("%s: bio: %llu/%u, rw: %lu, dir: %lu, flags: %lx, phys: %d.\n", + __func__, (u64)bio->bi_sector, bio->bi_size, + bio->bi_rw, bio_data_dir(bio), + bio->bi_flags, bio->bi_phys_segments); + + generic_make_request(bio); + + return 0; + +err_out_free: + bio_put(bio); +err_out_exit: + return err; +} + +/* + * Ok, block IO is ready, let's send it back to the client... + */ +int dst_export_send_bio(struct bio *bio) +{ + struct dst_export_priv *p = bio->bi_private; + struct dst_state *st = p->state; + struct dst_cmd *cmd = &p->cmd; + int err; + + dprintk("%s: id: %llu, bio: %llu/%u, csize: %u, flags: %lu, rw: %lu.\n", + __func__, cmd->id, (u64)bio->bi_sector, bio->bi_size, + cmd->csize, bio->bi_flags, bio->bi_rw); + + dst_convert_cmd(cmd); + + dst_state_lock(st); + if (!st->socket) { + err = -ECONNRESET; + goto err_out_unlock; + } + + if (bio_data_dir(bio) == WRITE) { + /* ... or just confirmation that writing has completed. */ + cmd->size = cmd->csize = 0; + err = dst_data_send_header(st->socket, cmd, + sizeof(struct dst_cmd), 0); + if (err) + goto err_out_unlock; + } else { + err = dst_send_bio(st, cmd, bio); + if (err) + goto err_out_unlock; + } + + dst_state_unlock(st); + + bio_put(bio); + return 0; + +err_out_unlock: + dst_state_unlock(st); + + bio_put(bio); + return err; +} diff --git a/drivers/staging/dst/state.c b/drivers/staging/dst/state.c new file mode 100644 index 00000000000..d057e52f3b6 --- /dev/null +++ b/drivers/staging/dst/state.c @@ -0,0 +1,839 @@ +/* + * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include <linux/buffer_head.h> +#include <linux/blkdev.h> +#include <linux/bio.h> +#include <linux/connector.h> +#include <linux/dst.h> +#include <linux/device.h> +#include <linux/in.h> +#include <linux/in6.h> +#include <linux/socket.h> +#include <linux/slab.h> + +#include <net/sock.h> + +/* + * Polling machinery. + */ + +struct dst_poll_helper +{ + poll_table pt; + struct dst_state *st; +}; + +static int dst_queue_wake(wait_queue_t *wait, unsigned mode, int sync, void *key) +{ + struct dst_state *st = container_of(wait, struct dst_state, wait); + + wake_up(&st->thread_wait); + return 1; +} + +static void dst_queue_func(struct file *file, wait_queue_head_t *whead, + poll_table *pt) +{ + struct dst_state *st = container_of(pt, struct dst_poll_helper, pt)->st; + + st->whead = whead; + init_waitqueue_func_entry(&st->wait, dst_queue_wake); + add_wait_queue(whead, &st->wait); +} + +void dst_poll_exit(struct dst_state *st) +{ + if (st->whead) { + remove_wait_queue(st->whead, &st->wait); + st->whead = NULL; + } +} + +int dst_poll_init(struct dst_state *st) +{ + struct dst_poll_helper ph; + + ph.st = st; + init_poll_funcptr(&ph.pt, &dst_queue_func); + + st->socket->ops->poll(NULL, st->socket, &ph.pt); + return 0; +} + +/* + * Header receiving function - may block. + */ +static int dst_data_recv_header(struct socket *sock, + void *data, unsigned int size, int block) +{ + struct msghdr msg; + struct kvec iov; + int err; + + iov.iov_base = data; + iov.iov_len = size; + + msg.msg_iov = (struct iovec *)&iov; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = (block)?MSG_WAITALL:MSG_DONTWAIT; + + err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, + msg.msg_flags); + if (err != size) + return -1; + + return 0; +} + +/* + * Header sending function - may block. + */ +int dst_data_send_header(struct socket *sock, + void *data, unsigned int size, int more) +{ + struct msghdr msg; + struct kvec iov; + int err; + + iov.iov_base = data; + iov.iov_len = size; + + msg.msg_iov = (struct iovec *)&iov; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = MSG_WAITALL | (more)?MSG_MORE:0; + + err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); + if (err != size) { + dprintk("%s: size: %u, more: %d, err: %d.\n", + __func__, size, more, err); + return -1; + } + + return 0; +} + +/* + * Block autoconfiguration: request size of the storage and permissions. + */ +static int dst_request_remote_config(struct dst_state *st) +{ + struct dst_node *n = st->node; + int err = -EINVAL; + struct dst_cmd *cmd = st->data; + + memset(cmd, 0, sizeof(struct dst_cmd)); + cmd->cmd = DST_CFG; + + dst_convert_cmd(cmd); + + err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0); + if (err) + goto out; + + err = dst_data_recv_header(st->socket, cmd, sizeof(struct dst_cmd), 1); + if (err) + goto out; + + dst_convert_cmd(cmd); + + if (cmd->cmd != DST_CFG) { + err = -EINVAL; + dprintk("%s: checking result: cmd: %d, size reported: %llu.\n", + __func__, cmd->cmd, cmd->sector); + goto out; + } + + if (n->size != 0) + n->size = min_t(loff_t, n->size, cmd->sector); + else + n->size = cmd->sector; + + n->info->size = n->size; + st->permissions = cmd->rw; + +out: + dprintk("%s: n: %p, err: %d, size: %llu, permission: %x.\n", + __func__, n, err, n->size, st->permissions); + return err; +} + +/* + * Socket machinery. + */ + +#define DST_DEFAULT_TIMEO 20000 + +int dst_state_socket_create(struct dst_state *st) +{ + int err; + struct socket *sock; + struct dst_network_ctl *ctl = &st->ctl; + + err = sock_create(ctl->addr.sa_family, ctl->type, ctl->proto, &sock); + if (err < 0) + return err; + + sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo = + msecs_to_jiffies(DST_DEFAULT_TIMEO); + sock->sk->sk_allocation = GFP_NOIO; + + st->socket = st->read_socket = sock; + return 0; +} + +void dst_state_socket_release(struct dst_state *st) +{ + dprintk("%s: st: %p, socket: %p, n: %p.\n", + __func__, st, st->socket, st->node); + if (st->socket) { + sock_release(st->socket); + st->socket = NULL; + st->read_socket = NULL; + } +} + +void dst_dump_addr(struct socket *sk, struct sockaddr *sa, char *str) +{ + if (sk->ops->family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in *)sa; + printk(KERN_INFO "%s %u.%u.%u.%u:%d.\n", + str, NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port)); + } else if (sk->ops->family == AF_INET6) { + struct sockaddr_in6 *sin = (struct sockaddr_in6 *)sa; + printk(KERN_INFO "%s %pi6:%d", + str, &sin->sin6_addr, ntohs(sin->sin6_port)); + } +} + +void dst_state_exit_connected(struct dst_state *st) +{ + if (st->socket) { + dst_poll_exit(st); + st->socket->ops->shutdown(st->socket, 2); + + dst_dump_addr(st->socket, (struct sockaddr *)&st->ctl.addr, + "Disconnected peer"); + dst_state_socket_release(st); + } +} + +static int dst_state_init_connected(struct dst_state *st) +{ + int err; + struct dst_network_ctl *ctl = &st->ctl; + + err = dst_state_socket_create(st); + if (err) + goto err_out_exit; + + err = kernel_connect(st->socket, (struct sockaddr *)&st->ctl.addr, + st->ctl.addr.sa_data_len, 0); + if (err) + goto err_out_release; + + err = dst_poll_init(st); + if (err) + goto err_out_release; + + dst_dump_addr(st->socket, (struct sockaddr *)&ctl->addr, + "Connected to peer"); + + return 0; + +err_out_release: + dst_state_socket_release(st); +err_out_exit: + return err; +} + +/* + * State reset is used to reconnect to the remote peer. + * May fail, but who cares, we will try again later. + */ +static void inline dst_state_reset_nolock(struct dst_state *st) +{ + dst_state_exit_connected(st); + dst_state_init_connected(st); +} + +static void inline dst_state_reset(struct dst_state *st) +{ + dst_state_lock(st); + dst_state_reset_nolock(st); + dst_state_unlock(st); +} + +/* + * Basic network sending/receiving functions. + * Blocked mode is used. + */ +static int dst_data_recv_raw(struct dst_state *st, void *buf, u64 size) +{ + struct msghdr msg; + struct kvec iov; + int err; + + BUG_ON(!size); + + iov.iov_base = buf; + iov.iov_len = size; + + msg.msg_iov = (struct iovec *)&iov; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = MSG_DONTWAIT; + + err = kernel_recvmsg(st->socket, &msg, &iov, 1, iov.iov_len, + msg.msg_flags); + if (err <= 0) { + dprintk("%s: failed to recv data: size: %llu, err: %d.\n", + __func__, size, err); + if (err == 0) + err = -ECONNRESET; + + dst_state_exit_connected(st); + } + + return err; +} + +/* + * Ping command to early detect failed nodes. + */ +static int dst_send_ping(struct dst_state *st) +{ + struct dst_cmd *cmd = st->data; + int err = -ECONNRESET; + + dst_state_lock(st); + if (st->socket) { + memset(cmd, 0, sizeof(struct dst_cmd)); + + cmd->cmd = __cpu_to_be32(DST_PING); + + err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0); + } + dprintk("%s: st: %p, socket: %p, err: %d.\n", __func__, st, st->socket, err); + dst_state_unlock(st); + + return err; +} + +/* + * Receiving function, which should either return error or read + * whole block request. If there was no traffic for a one second, + * send a ping, since remote node may die. + */ +int dst_data_recv(struct dst_state *st, void *data, unsigned int size) +{ + unsigned int revents = 0; + unsigned int err_mask = POLLERR | POLLHUP | POLLRDHUP; + unsigned int mask = err_mask | POLLIN; + struct dst_node *n = st->node; + int err = 0; + + while (size && !err) { + revents = dst_state_poll(st); + + if (!(revents & mask)) { + DEFINE_WAIT(wait); + + for (;;) { + prepare_to_wait(&st->thread_wait, &wait, + TASK_INTERRUPTIBLE); + if (!n->trans_scan_timeout || st->need_exit) + break; + + revents = dst_state_poll(st); + + if (revents & mask) + break; + + if (signal_pending(current)) + break; + + if (!schedule_timeout(HZ)) { + err = dst_send_ping(st); + if (err) + return err; + } + + continue; + } + finish_wait(&st->thread_wait, &wait); + } + + err = -ECONNRESET; + dst_state_lock(st); + + if ( st->socket && + (st->read_socket == st->socket) && + (revents & POLLIN)) { + err = dst_data_recv_raw(st, data, size); + if (err > 0) { + data += err; + size -= err; + err = 0; + } + } + + if (revents & err_mask || !st->socket) { + dprintk("%s: revents: %x, socket: %p, size: %u, err: %d.\n", + __func__, revents, st->socket, size, err); + err = -ECONNRESET; + } + + dst_state_unlock(st); + + if (!n->trans_scan_timeout) + err = -ENODEV; + } + + return err; +} + +/* + * Send block autoconf reply. + */ +static int dst_process_cfg(struct dst_state *st) +{ + struct dst_node *n = st->node; + struct dst_cmd *cmd = st->data; + int err; + + cmd->sector = n->size; + cmd->rw = st->permissions; + + dst_convert_cmd(cmd); + + dst_state_lock(st); + err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0); + dst_state_unlock(st); + + return err; +} + +/* + * Receive block IO from the network. + */ +static int dst_recv_bio(struct dst_state *st, struct bio *bio, unsigned int total_size) +{ + struct bio_vec *bv; + int i, err; + void *data; + unsigned int sz; + + bio_for_each_segment(bv, bio, i) { + sz = min(total_size, bv->bv_len); + + dprintk("%s: bio: %llu/%u, total: %u, len: %u, sz: %u, off: %u.\n", + __func__, (u64)bio->bi_sector, bio->bi_size, total_size, + bv->bv_len, sz, bv->bv_offset); + + data = kmap(bv->bv_page) + bv->bv_offset; + err = dst_data_recv(st, data, sz); + kunmap(bv->bv_page); + + bv->bv_len = sz; + + if (err) + return err; + + total_size -= sz; + if (total_size == 0) + break; + } + + return 0; +} + +/* + * Our block IO has just completed and arrived: get it. + */ +static int dst_process_io_response(struct dst_state *st) +{ + struct dst_node *n = st->node; + struct dst_cmd *cmd = st->data; + struct dst_trans *t; + int err = 0; + struct bio *bio; + + mutex_lock(&n->trans_lock); + t = dst_trans_search(n, cmd->id); + mutex_unlock(&n->trans_lock); + + if (!t) + goto err_out_exit; + + bio = t->bio; + + dprintk("%s: bio: %llu/%u, cmd_size: %u, csize: %u, dir: %lu.\n", + __func__, (u64)bio->bi_sector, bio->bi_size, cmd->size, + cmd->csize, bio_data_dir(bio)); + + if (bio_data_dir(bio) == READ) { + if (bio->bi_size != cmd->size - cmd->csize) + goto err_out_exit; + + if (dst_need_crypto(n)) { + err = dst_recv_cdata(st, t->cmd.hash); + if (err) + goto err_out_exit; + } + + err = dst_recv_bio(st, t->bio, bio->bi_size); + if (err) + goto err_out_exit; + + if (dst_need_crypto(n)) + return dst_trans_crypto(t); + } else { + err = -EBADMSG; + if (cmd->size || cmd->csize) + goto err_out_exit; + } + + dst_trans_remove(t); + dst_trans_put(t); + + return 0; + +err_out_exit: + return err; +} + +/* + * Receive crypto data. + */ +int dst_recv_cdata(struct dst_state *st, void *cdata) +{ + struct dst_cmd *cmd = st->data; + struct dst_node *n = st->node; + struct dst_crypto_ctl *c = &n->crypto; + int err; + + if (cmd->csize != c->crypto_attached_size) { + dprintk("%s: cmd: cmd: %u, sector: %llu, size: %u, " + "csize: %u != digest size %u.\n", + __func__, cmd->cmd, cmd->sector, cmd->size, + cmd->csize, c->crypto_attached_size); + err = -EINVAL; + goto err_out_exit; + } + + err = dst_data_recv(st, cdata, cmd->csize); + if (err) + goto err_out_exit; + + cmd->size -= cmd->csize; + return 0; + +err_out_exit: + return err; +} + +/* + * Receive the command and start its processing. + */ +static int dst_recv_processing(struct dst_state *st) +{ + int err = -EINTR; + struct dst_cmd *cmd = st->data; + + /* + * If socket will be reset after this statement, then + * dst_data_recv() will just fail and loop will + * start again, so it can be done without any locks. + * + * st->read_socket is needed to prevents state machine + * breaking between this data reading and subsequent one + * in protocol specific functions during connection reset. + * In case of reset we have to read next command and do + * not expect data for old command to magically appear in + * new connection. + */ + st->read_socket = st->socket; + err = dst_data_recv(st, cmd, sizeof(struct dst_cmd)); + if (err) + goto out_exit; + + dst_convert_cmd(cmd); + + dprintk("%s: cmd: %u, size: %u, csize: %u, id: %llu, " + "sector: %llu, flags: %llx, rw: %llx.\n", + __func__, cmd->cmd, cmd->size, + cmd->csize, cmd->id, cmd->sector, + cmd->flags, cmd->rw); + + /* + * This should catch protocol breakage and random garbage instead of commands. + */ + if (unlikely(cmd->csize > st->size - sizeof(struct dst_cmd))) { + err = -EBADMSG; + goto out_exit; + } + + err = -EPROTO; + switch (cmd->cmd) { + case DST_IO_RESPONSE: + err = dst_process_io_response(st); + break; + case DST_IO: + err = dst_process_io(st); + break; + case DST_CFG: + err = dst_process_cfg(st); + break; + case DST_PING: + err = 0; + break; + default: + break; + } + +out_exit: + return err; +} + +/* + * Receiving thread. For the client node we should try to reconnect, + * for accepted client we just drop the state and expect it to reconnect. + */ +static int dst_recv(void *init_data, void *schedule_data) +{ + struct dst_state *st = schedule_data; + struct dst_node *n = init_data; + int err = 0; + + dprintk("%s: start st: %p, n: %p, scan: %lu, need_exit: %d.\n", + __func__, st, n, n->trans_scan_timeout, st->need_exit); + + while (n->trans_scan_timeout && !st->need_exit) { + err = dst_recv_processing(st); + if (err < 0) { + if (!st->ctl.type) + break; + + if (!n->trans_scan_timeout || st->need_exit) + break; + + dst_state_reset(st); + msleep(1000); + } + } + + st->need_exit = 1; + wake_up(&st->thread_wait); + + dprintk("%s: freeing receiving socket st: %p.\n", __func__, st); + dst_state_lock(st); + dst_state_exit_connected(st); + dst_state_unlock(st); + dst_state_put(st); + + dprintk("%s: freed receiving socket st: %p.\n", __func__, st); + + return err; +} + +/* + * Network state dies here and borns couple of lines below. + * This object is the main network state processing engine: + * sending, receiving, reconnections, all network related + * tasks are handled on behalf of the state. + */ +static void dst_state_free(struct dst_state *st) +{ + dprintk("%s: st: %p.\n", __func__, st); + if (st->cleanup) + st->cleanup(st); + kfree(st->data); + kfree(st); +} + +struct dst_state *dst_state_alloc(struct dst_node *n) +{ + struct dst_state *st; + int err = -ENOMEM; + + st = kzalloc(sizeof(struct dst_state), GFP_KERNEL); + if (!st) + goto err_out_exit; + + st->node = n; + st->need_exit = 0; + + st->size = PAGE_SIZE; + st->data = kmalloc(st->size, GFP_KERNEL); + if (!st->data) + goto err_out_free; + + spin_lock_init(&st->request_lock); + INIT_LIST_HEAD(&st->request_list); + + mutex_init(&st->state_lock); + init_waitqueue_head(&st->thread_wait); + + /* + * One for processing thread, another one for node itself. + */ + atomic_set(&st->refcnt, 2); + + dprintk("%s: st: %p, n: %p.\n", __func__, st, st->node); + + return st; + +err_out_free: + kfree(st); +err_out_exit: + return ERR_PTR(err); +} + +int dst_state_schedule_receiver(struct dst_state *st) +{ + return thread_pool_schedule_private(st->node->pool, dst_thread_setup, + dst_recv, st, MAX_SCHEDULE_TIMEOUT, st->node); +} + +/* + * Initialize client's connection to the remote peer: allocate state, + * connect and perform block IO autoconfiguration. + */ +int dst_node_init_connected(struct dst_node *n, struct dst_network_ctl *r) +{ + struct dst_state *st; + int err = -ENOMEM; + + st = dst_state_alloc(n); + if (IS_ERR(st)) { + err = PTR_ERR(st); + goto err_out_exit; + } + memcpy(&st->ctl, r, sizeof(struct dst_network_ctl)); + + err = dst_state_init_connected(st); + if (err) + goto err_out_free_data; + + err = dst_request_remote_config(st); + if (err) + goto err_out_exit_connected; + n->state = st; + + err = dst_state_schedule_receiver(st); + if (err) + goto err_out_exit_connected; + + return 0; + +err_out_exit_connected: + dst_state_exit_connected(st); +err_out_free_data: + dst_state_free(st); +err_out_exit: + n->state = NULL; + return err; +} + +void dst_state_put(struct dst_state *st) +{ + dprintk("%s: st: %p, refcnt: %d.\n", + __func__, st, atomic_read(&st->refcnt)); + if (atomic_dec_and_test(&st->refcnt)) + dst_state_free(st); +} + +/* + * Send block IO to the network one by one using zero-copy ->sendpage(). + */ +int dst_send_bio(struct dst_state *st, struct dst_cmd *cmd, struct bio *bio) +{ + struct bio_vec *bv; + struct dst_crypto_ctl *c = &st->node->crypto; + int err, i = 0; + int flags = MSG_WAITALL; + + err = dst_data_send_header(st->socket, cmd, + sizeof(struct dst_cmd) + c->crypto_attached_size, bio->bi_vcnt); + if (err) + goto err_out_exit; + + bio_for_each_segment(bv, bio, i) { + if (i < bio->bi_vcnt - 1) + flags |= MSG_MORE; + + err = kernel_sendpage(st->socket, bv->bv_page, bv->bv_offset, + bv->bv_len, flags); + if (err <= 0) + goto err_out_exit; + } + + return 0; + +err_out_exit: + dprintk("%s: %d/%d, flags: %x, err: %d.\n", + __func__, i, bio->bi_vcnt, flags, err); + return err; +} + +/* + * Send transaction to the remote peer. + */ +int dst_trans_send(struct dst_trans *t) +{ + int err; + struct dst_state *st = t->n->state; + struct bio *bio = t->bio; + + dst_convert_cmd(&t->cmd); + + dst_state_lock(st); + if (!st->socket) { + err = dst_state_init_connected(st); + if (err) + goto err_out_unlock; + } + + if (bio_data_dir(bio) == WRITE) { + err = dst_send_bio(st, &t->cmd, t->bio); + } else { + err = dst_data_send_header(st->socket, &t->cmd, + sizeof(struct dst_cmd), 0); + } + if (err) + goto err_out_reset; + + dst_state_unlock(st); + return 0; + +err_out_reset: + dst_state_reset_nolock(st); +err_out_unlock: + dst_state_unlock(st); + + return err; +} diff --git a/drivers/staging/dst/thread_pool.c b/drivers/staging/dst/thread_pool.c new file mode 100644 index 00000000000..7bed4e85102 --- /dev/null +++ b/drivers/staging/dst/thread_pool.c @@ -0,0 +1,345 @@ +/* + * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include <linux/kernel.h> +#include <linux/dst.h> +#include <linux/kthread.h> +#include <linux/slab.h> + +/* + * Thread pool abstraction allows to schedule a work to be performed + * on behalf of kernel thread. One does not operate with threads itself, + * instead user provides setup and cleanup callbacks for thread pool itself, + * and action and cleanup callbacks for each submitted work. + * + * Each worker has private data initialized at creation time and data, + * provided by user at scheduling time. + * + * When action is being performed, thread can not be used by other users, + * instead they will sleep until there is free thread to pick their work. + */ +struct thread_pool_worker +{ + struct list_head worker_entry; + + struct task_struct *thread; + + struct thread_pool *pool; + + int error; + int has_data; + int need_exit; + unsigned int id; + + wait_queue_head_t wait; + + void *private; + void *schedule_data; + + int (* action)(void *private, void *schedule_data); + void (* cleanup)(void *private); +}; + +static void thread_pool_exit_worker(struct thread_pool_worker *w) +{ + kthread_stop(w->thread); + + w->cleanup(w->private); + kfree(w); +} + +/* + * Called to mark thread as ready and allow users to schedule new work. + */ +static void thread_pool_worker_make_ready(struct thread_pool_worker *w) +{ + struct thread_pool *p = w->pool; + + mutex_lock(&p->thread_lock); + + if (!w->need_exit) { + list_move_tail(&w->worker_entry, &p->ready_list); + w->has_data = 0; + mutex_unlock(&p->thread_lock); + + wake_up(&p->wait); + } else { + p->thread_num--; + list_del(&w->worker_entry); + mutex_unlock(&p->thread_lock); + + thread_pool_exit_worker(w); + } +} + +/* + * Thread action loop: waits until there is new work. + */ +static int thread_pool_worker_func(void *data) +{ + struct thread_pool_worker *w = data; + + while (!kthread_should_stop()) { + wait_event_interruptible(w->wait, + kthread_should_stop() || w->has_data); + + if (kthread_should_stop()) + break; + + if (!w->has_data) + continue; + + w->action(w->private, w->schedule_data); + thread_pool_worker_make_ready(w); + } + + return 0; +} + +/* + * Remove single worker without specifying which one. + */ +void thread_pool_del_worker(struct thread_pool *p) +{ + struct thread_pool_worker *w = NULL; + + while (!w && p->thread_num) { + wait_event(p->wait, !list_empty(&p->ready_list) || !p->thread_num); + + dprintk("%s: locking list_empty: %d, thread_num: %d.\n", + __func__, list_empty(&p->ready_list), p->thread_num); + + mutex_lock(&p->thread_lock); + if (!list_empty(&p->ready_list)) { + w = list_first_entry(&p->ready_list, + struct thread_pool_worker, + worker_entry); + + dprintk("%s: deleting w: %p, thread_num: %d, list: %p [%p.%p].\n", + __func__, w, p->thread_num, &p->ready_list, + p->ready_list.prev, p->ready_list.next); + + p->thread_num--; + list_del(&w->worker_entry); + } + mutex_unlock(&p->thread_lock); + } + + if (w) + thread_pool_exit_worker(w); + dprintk("%s: deleted w: %p, thread_num: %d.\n", + __func__, w, p->thread_num); +} + +/* + * Remove a worker with given ID. + */ +void thread_pool_del_worker_id(struct thread_pool *p, unsigned int id) +{ + struct thread_pool_worker *w; + int found = 0; + + mutex_lock(&p->thread_lock); + list_for_each_entry(w, &p->ready_list, worker_entry) { + if (w->id == id) { + found = 1; + p->thread_num--; + list_del(&w->worker_entry); + break; + } + } + + if (!found) { + list_for_each_entry(w, &p->active_list, worker_entry) { + if (w->id == id) { + w->need_exit = 1; + break; + } + } + } + mutex_unlock(&p->thread_lock); + + if (found) + thread_pool_exit_worker(w); +} + +/* + * Add new worker thread with given parameters. + * If initialization callback fails, return error. + */ +int thread_pool_add_worker(struct thread_pool *p, + char *name, + unsigned int id, + void *(* init)(void *private), + void (* cleanup)(void *private), + void *private) +{ + struct thread_pool_worker *w; + int err = -ENOMEM; + + w = kzalloc(sizeof(struct thread_pool_worker), GFP_KERNEL); + if (!w) + goto err_out_exit; + + w->pool = p; + init_waitqueue_head(&w->wait); + w->cleanup = cleanup; + w->id = id; + + w->thread = kthread_run(thread_pool_worker_func, w, "%s", name); + if (IS_ERR(w->thread)) { + err = PTR_ERR(w->thread); + goto err_out_free; + } + + w->private = init(private); + if (IS_ERR(w->private)) { + err = PTR_ERR(w->private); + goto err_out_stop_thread; + } + + mutex_lock(&p->thread_lock); + list_add_tail(&w->worker_entry, &p->ready_list); + p->thread_num++; + mutex_unlock(&p->thread_lock); + + return 0; + +err_out_stop_thread: + kthread_stop(w->thread); +err_out_free: + kfree(w); +err_out_exit: + return err; +} + +/* + * Destroy the whole pool. + */ +void thread_pool_destroy(struct thread_pool *p) +{ + while (p->thread_num) { + dprintk("%s: num: %d.\n", __func__, p->thread_num); + thread_pool_del_worker(p); + } + + kfree(p); +} + +/* + * Create a pool with given number of threads. + * They will have sequential IDs started from zero. + */ +struct thread_pool *thread_pool_create(int num, char *name, + void *(* init)(void *private), + void (* cleanup)(void *private), + void *private) +{ + struct thread_pool_worker *w, *tmp; + struct thread_pool *p; + int err = -ENOMEM; + int i; + + p = kzalloc(sizeof(struct thread_pool), GFP_KERNEL); + if (!p) + goto err_out_exit; + + init_waitqueue_head(&p->wait); + mutex_init(&p->thread_lock); + INIT_LIST_HEAD(&p->ready_list); + INIT_LIST_HEAD(&p->active_list); + p->thread_num = 0; + + for (i=0; i<num; ++i) { + err = thread_pool_add_worker(p, name, i, init, + cleanup, private); + if (err) + goto err_out_free_all; + } + + return p; + +err_out_free_all: + list_for_each_entry_safe(w, tmp, &p->ready_list, worker_entry) { + list_del(&w->worker_entry); + thread_pool_exit_worker(w); + } + kfree(p); +err_out_exit: + return ERR_PTR(err); +} + +/* + * Schedule execution of the action on a given thread, + * provided ID pointer has to match previously stored + * private data. + */ +int thread_pool_schedule_private(struct thread_pool *p, + int (* setup)(void *private, void *data), + int (* action)(void *private, void *data), + void *data, long timeout, void *id) +{ + struct thread_pool_worker *w, *tmp, *worker = NULL; + int err = 0; + + while (!worker && !err) { + timeout = wait_event_interruptible_timeout(p->wait, + !list_empty(&p->ready_list), + timeout); + + if (!timeout) { + err = -ETIMEDOUT; + break; + } + + worker = NULL; + mutex_lock(&p->thread_lock); + list_for_each_entry_safe(w, tmp, &p->ready_list, worker_entry) { + if (id && id != w->private) + continue; + + worker = w; + + list_move_tail(&w->worker_entry, &p->active_list); + + err = setup(w->private, data); + if (!err) { + w->schedule_data = data; + w->action = action; + w->has_data = 1; + wake_up(&w->wait); + } else { + list_move_tail(&w->worker_entry, &p->ready_list); + } + + break; + } + mutex_unlock(&p->thread_lock); + } + + return err; +} + +/* + * Schedule execution on arbitrary thread from the pool. + */ +int thread_pool_schedule(struct thread_pool *p, + int (* setup)(void *private, void *data), + int (* action)(void *private, void *data), + void *data, long timeout) +{ + return thread_pool_schedule_private(p, setup, + action, data, timeout, NULL); +} diff --git a/drivers/staging/dst/trans.c b/drivers/staging/dst/trans.c new file mode 100644 index 00000000000..557d372a496 --- /dev/null +++ b/drivers/staging/dst/trans.c @@ -0,0 +1,335 @@ +/* + * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include <linux/bio.h> +#include <linux/dst.h> +#include <linux/slab.h> +#include <linux/mempool.h> + +/* + * Transaction memory pool size. + */ +static int dst_mempool_num = 32; +module_param(dst_mempool_num, int, 0644); + +/* + * Transaction tree management. + */ +static inline int dst_trans_cmp(dst_gen_t gen, dst_gen_t new) +{ + if (gen < new) + return 1; + if (gen > new) + return -1; + return 0; +} + +struct dst_trans *dst_trans_search(struct dst_node *node, dst_gen_t gen) +{ + struct rb_root *root = &node->trans_root; + struct rb_node *n = root->rb_node; + struct dst_trans *t, *ret = NULL; + int cmp; + + while (n) { + t = rb_entry(n, struct dst_trans, trans_entry); + + cmp = dst_trans_cmp(t->gen, gen); + if (cmp < 0) + n = n->rb_left; + else if (cmp > 0) + n = n->rb_right; + else { + ret = t; + break; + } + } + + dprintk("%s: %s transaction: id: %llu.\n", __func__, + (ret)?"found":"not found", gen); + + return ret; +} + +static int dst_trans_insert(struct dst_trans *new) +{ + struct rb_root *root = &new->n->trans_root; + struct rb_node **n = &root->rb_node, *parent = NULL; + struct dst_trans *ret = NULL, *t; + int cmp; + + while (*n) { + parent = *n; + + t = rb_entry(parent, struct dst_trans, trans_entry); + + cmp = dst_trans_cmp(t->gen, new->gen); + if (cmp < 0) + n = &parent->rb_left; + else if (cmp > 0) + n = &parent->rb_right; + else { + ret = t; + break; + } + } + + new->send_time = jiffies; + if (ret) { + printk("%s: exist: old: gen: %llu, bio: %llu/%u, send_time: %lu, " + "new: gen: %llu, bio: %llu/%u, send_time: %lu.\n", + __func__, + ret->gen, (u64)ret->bio->bi_sector, + ret->bio->bi_size, ret->send_time, + new->gen, (u64)new->bio->bi_sector, + new->bio->bi_size, new->send_time); + return -EEXIST; + } + + rb_link_node(&new->trans_entry, parent, n); + rb_insert_color(&new->trans_entry, root); + + dprintk("%s: inserted: gen: %llu, bio: %llu/%u, send_time: %lu.\n", + __func__, new->gen, (u64)new->bio->bi_sector, + new->bio->bi_size, new->send_time); + + return 0; +} + +int dst_trans_remove_nolock(struct dst_trans *t) +{ + struct dst_node *n = t->n; + + if (t->trans_entry.rb_parent_color) { + rb_erase(&t->trans_entry, &n->trans_root); + t->trans_entry.rb_parent_color = 0; + } + return 0; +} + +int dst_trans_remove(struct dst_trans *t) +{ + int ret; + struct dst_node *n = t->n; + + mutex_lock(&n->trans_lock); + ret = dst_trans_remove_nolock(t); + mutex_unlock(&n->trans_lock); + + return ret; +} + +/* + * When transaction is completed and there are no more users, + * we complete appriate block IO request with given error status. + */ +void dst_trans_put(struct dst_trans *t) +{ + if (atomic_dec_and_test(&t->refcnt)) { + struct bio *bio = t->bio; + + dprintk("%s: completed t: %p, gen: %llu, bio: %p.\n", + __func__, t, t->gen, bio); + + bio_endio(bio, t->error); + bio_put(bio); + + dst_node_put(t->n); + mempool_free(t, t->n->trans_pool); + } +} + +/* + * Process given block IO request: allocate transaction, insert it into the tree + * and send/schedule crypto processing. + */ +int dst_process_bio(struct dst_node *n, struct bio *bio) +{ + struct dst_trans *t; + int err = -ENOMEM; + + t = mempool_alloc(n->trans_pool, GFP_NOFS); + if (!t) + goto err_out_exit; + + t->n = dst_node_get(n); + t->bio = bio; + t->error = 0; + t->retries = 0; + atomic_set(&t->refcnt, 1); + t->gen = atomic_long_inc_return(&n->gen); + + t->enc = bio_data_dir(bio); + dst_bio_to_cmd(bio, &t->cmd, DST_IO, t->gen); + + mutex_lock(&n->trans_lock); + err = dst_trans_insert(t); + mutex_unlock(&n->trans_lock); + if (err) + goto err_out_free; + + dprintk("%s: gen: %llu, bio: %llu/%u, dir/enc: %d, need_crypto: %d.\n", + __func__, t->gen, (u64)bio->bi_sector, + bio->bi_size, t->enc, dst_need_crypto(n)); + + if (dst_need_crypto(n) && t->enc) + dst_trans_crypto(t); + else + dst_trans_send(t); + + return 0; + +err_out_free: + dst_node_put(n); + mempool_free(t, n->trans_pool); +err_out_exit: + bio_endio(bio, err); + bio_put(bio); + return err; +} + +/* + * Scan for timeout/stale transactions. + * Each transaction is being resent multiple times before error completion. + */ +static void dst_trans_scan(struct work_struct *work) +{ + struct dst_node *n = container_of(work, struct dst_node, trans_work.work); + struct rb_node *rb_node; + struct dst_trans *t; + unsigned long timeout = n->trans_scan_timeout; + int num = 10 * n->trans_max_retries; + + mutex_lock(&n->trans_lock); + + for (rb_node = rb_first(&n->trans_root); rb_node; ) { + t = rb_entry(rb_node, struct dst_trans, trans_entry); + + if (timeout && time_after(t->send_time + timeout, jiffies) + && t->retries == 0) + break; +#if 0 + dprintk("%s: t: %p, gen: %llu, n: %s, retries: %u, max: %u.\n", + __func__, t, t->gen, n->name, + t->retries, n->trans_max_retries); +#endif + if (--num == 0) + break; + + dst_trans_get(t); + + rb_node = rb_next(rb_node); + + if (timeout && (++t->retries < n->trans_max_retries)) { + dst_trans_send(t); + } else { + t->error = -ETIMEDOUT; + dst_trans_remove_nolock(t); + dst_trans_put(t); + } + + dst_trans_put(t); + } + + mutex_unlock(&n->trans_lock); + + /* + * If no timeout specified then system is in the middle of exiting process, + * so no need to reschedule scanning process again. + */ + if (timeout) { + if (!num) + timeout = HZ; + schedule_delayed_work(&n->trans_work, timeout); + } +} + +/* + * Flush all transactions and mark them as timed out. + * Destroy transaction pools. + */ +void dst_node_trans_exit(struct dst_node *n) +{ + struct dst_trans *t; + struct rb_node *rb_node; + + if (!n->trans_cache) + return; + + dprintk("%s: n: %p, cancelling the work.\n", __func__, n); + cancel_delayed_work_sync(&n->trans_work); + flush_scheduled_work(); + dprintk("%s: n: %p, work has been cancelled.\n", __func__, n); + + for (rb_node = rb_first(&n->trans_root); rb_node; ) { + t = rb_entry(rb_node, struct dst_trans, trans_entry); + + dprintk("%s: t: %p, gen: %llu, n: %s.\n", + __func__, t, t->gen, n->name); + + rb_node = rb_next(rb_node); + + t->error = -ETIMEDOUT; + dst_trans_remove_nolock(t); + dst_trans_put(t); + } + + mempool_destroy(n->trans_pool); + kmem_cache_destroy(n->trans_cache); +} + +/* + * Initialize transaction storage for given node. + * Transaction stores not only control information, + * but also network command and crypto data (if needed) + * to reduce number of allocations. Thus transaction size + * differs from node to node. + */ +int dst_node_trans_init(struct dst_node *n, unsigned int size) +{ + /* + * We need this, since node with given name can be dropped from the + * hash table, but be still alive, so subsequent creation of the node + * with the same name may collide with existing cache name. + */ + + snprintf(n->cache_name, sizeof(n->cache_name), "%s-%p", n->name, n); + + n->trans_cache = kmem_cache_create(n->cache_name, + size + n->crypto.crypto_attached_size, + 0, 0, NULL); + if (!n->trans_cache) + goto err_out_exit; + + n->trans_pool = mempool_create_slab_pool(dst_mempool_num, n->trans_cache); + if (!n->trans_pool) + goto err_out_cache_destroy; + + mutex_init(&n->trans_lock); + n->trans_root = RB_ROOT; + + INIT_DELAYED_WORK(&n->trans_work, dst_trans_scan); + schedule_delayed_work(&n->trans_work, n->trans_scan_timeout); + + dprintk("%s: n: %p, size: %u, crypto: %u.\n", + __func__, n, size, n->crypto.crypto_attached_size); + + return 0; + +err_out_cache_destroy: + kmem_cache_destroy(n->trans_cache); +err_out_exit: + return -ENOMEM; +} |