On 14:47 Mon 10 Dec     , Evgeniy Polyakov wrote:
> 
> Network state machine.
> 
> Includes network async processing state machine and related tasks.
Hi, I've tried to play a little bit with DST and discovered a huge memory
leak. Every read request from a remote node results in a bio + bio's pages leak.

Data flow:
->kst_export_ready              ## prepare and submit bio 
  ->generic_make_request(bio)   ## submit it

->kst_export_read_end_io        ## block layer call bio_end_io callback

->kst_thread_process_state      ## process ready requests
  ->kst_data_callback
     ->kst_data_process_bio     ## submit pages to network layer
  ->kst_complete_req
     ->kst_bio_endio
       ->kst_export_read_end_io ## WoW, we're calling the same bio_end_io
                                ## callback twice
     ->dst_free_request(req);   ## request will be destroyed, but its bio
                                ## and all the bio's pages weren't released.
We may release the bio's pages after they were sent to the network; it is safe
because sendpage() already called get_page(). I've attached a simple patch
which fixes this.
> 
> Signed-off-by: Evgeniy Polyakov <[EMAIL PROTECTED]>
> 
> 
> diff --git a/drivers/block/dst/kst.c b/drivers/block/dst/kst.c
> new file mode 100644
> index 0000000..8fa3387
> --- /dev/null
> +++ b/drivers/block/dst/kst.c
> @@ -0,0 +1,1513 @@
> +/*
> + * 2007+ Copyright (c) Evgeniy Polyakov <[EMAIL PROTECTED]>> + * All rights 
> reserved.
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU General Public License for more details.
> + */
> +
> +#include <linux/kernel.h>
> +#include <linux/module.h>
> +#include <linux/list.h>
> +#include <linux/slab.h>
> +#include <linux/socket.h>
> +#include <linux/kthread.h>
> +#include <linux/net.h>
> +#include <linux/in.h>
> +#include <linux/poll.h>
> +#include <linux/bio.h>
> +#include <linux/dst.h>
> +
> +#include <net/sock.h>
> +
> +struct kst_poll_helper
> +{
> +     poll_table              pt;
> +     struct kst_state        *st;
> +};
> +
> +static LIST_HEAD(kst_worker_list);
> +static DEFINE_MUTEX(kst_worker_mutex);
> +
> +/*
> + * This function creates bound socket for local export node.
> + */
> +static int kst_sock_create(struct kst_state *st, struct saddr *addr,
> +             int type, int proto, int backlog)
> +{
> +     int err;
> +
> +     err = sock_create(addr->sa_family, type, proto, &st->socket);
> +     if (err)
> +             goto err_out_exit;
> +
> +     err = st->socket->ops->bind(st->socket, (struct sockaddr *)addr,
> +                     addr->sa_data_len);
> +
> +     err = st->socket->ops->listen(st->socket, backlog);
> +     if (err)
> +             goto err_out_release;
> +
> +     st->socket->sk->sk_allocation = GFP_NOIO;
> +
> +     return 0;
> +
> +err_out_release:
> +     sock_release(st->socket);
> +err_out_exit:
> +     return err;
> +}
> +
> +static void kst_sock_release(struct kst_state *st)
> +{
> +     if (st->socket) {
> +             sock_release(st->socket);
> +             st->socket = NULL;
> +     }
> +}
> +
> +void kst_wake(struct kst_state *st)
> +{
> +     if (st) {
> +             struct kst_worker *w = st->node->w;
> +             unsigned long flags;
> +
> +             spin_lock_irqsave(&w->ready_lock, flags);
> +             if (list_empty(&st->ready_entry))
> +                     list_add_tail(&st->ready_entry, &w->ready_list);
> +             spin_unlock_irqrestore(&w->ready_lock, flags);
> +
> +             wake_up(&w->wait);
> +     }
> +}
> +EXPORT_SYMBOL_GPL(kst_wake);
> +
> +/*
> + * Polling machinery.
> + */
> +static int kst_state_wake_callback(wait_queue_t *wait, unsigned mode,
> +             int sync, void *key)
> +{
> +     struct kst_state *st = container_of(wait, struct kst_state, wait);
> +     kst_wake(st);
> +     return 1;
> +}
> +
> +static void kst_queue_func(struct file *file, wait_queue_head_t *whead,
> +                              poll_table *pt)
> +{
> +     struct kst_state *st = container_of(pt, struct kst_poll_helper, pt)->st;
> +
> +     st->whead = whead;
> +     init_waitqueue_func_entry(&st->wait, kst_state_wake_callback);
> +     add_wait_queue(whead, &st->wait);
> +}
> +
> +static void kst_poll_exit(struct kst_state *st)
> +{
> +     if (st->whead) {
> +             remove_wait_queue(st->whead, &st->wait);
> +             st->whead = NULL;
> +     }
> +}
> +
> +/*
> + * This function removes request from state tree and ordering list.
> + */
> +void kst_del_req(struct dst_request *req)
> +{
> +     list_del_init(&req->request_list_entry);
> +}
> +EXPORT_SYMBOL_GPL(kst_del_req);
> +
> +static struct dst_request *kst_req_first(struct kst_state *st)
> +{
> +     struct dst_request *req = NULL;
> +
> +     if (!list_empty(&st->request_list))
> +             req = list_entry(st->request_list.next, struct dst_request,
> +                             request_list_entry);
> +     return req;
> +}
> +
> +/*
> + * This function dequeues first request from the queue and tree.
> + */
> +static struct dst_request *kst_dequeue_req(struct kst_state *st)
> +{
> +     struct dst_request *req;
> +
> +     mutex_lock(&st->request_lock);
> +     req = kst_req_first(st);
> +     if (req)
> +             kst_del_req(req);
> +     mutex_unlock(&st->request_lock);
> +     return req;
> +}
> +
> +/*
> + * This function enqueues request into tree, indexed by start of the request,
> + * and also puts request into ordered queue.
> + */
> +int kst_enqueue_req(struct kst_state *st, struct dst_request *req)
> +{
> +     if (unlikely(req->flags & DST_REQ_CHECK_QUEUE)) {
> +             struct dst_request *r;
> +
> +             list_for_each_entry(r, &st->request_list, request_list_entry) {
> +                     if (bio_rw(r->bio) != bio_rw(req->bio))
> +                             continue;
> +
> +                     if (r->start >= req->start + req->size)
> +                             continue;
> +
> +                     if (r->start + r->size <= req->start)
> +                             continue;
> +
> +                     return -EEXIST;
> +             }
> +     }
> +
> +     list_add_tail(&req->request_list_entry, &st->request_list);
> +     return 0;
> +}
> +EXPORT_SYMBOL_GPL(kst_enqueue_req);
> +
> +/*
> + * BIOs for local exporting node are freed via this function.
> + */
> +static void kst_export_put_bio(struct bio *bio)
> +{
> +     int i;
> +     struct bio_vec *bv;
> +
> +     dprintk("%s: bio: %p, size: %u, idx: %d, num: %d, req: %p.\n",
> +                     __func__, bio, bio->bi_size, bio->bi_idx,
> +                     bio->bi_vcnt, bio->bi_private);
> +
> +     bio_for_each_segment(bv, bio, i)
> +             __free_page(bv->bv_page);
> +     bio_put(bio);
> +}
> +
> +/*
> + * This is a generic request completion function for requests,
> + * queued for async processing.
> + * If it is local export node, state machine is different,
> + * see details below.
> + */
> +void kst_complete_req(struct dst_request *req, int err)
> +{
> +     dprintk("%s: bio: %p, req: %p, size: %llu, orig_size: %llu, "
> +                     "bi_size: %u, err: %d, flags: %u.\n",
> +                     __func__, req->bio, req, req->size, req->orig_size,
> +                     req->bio->bi_size, err, req->flags);
> +
> +     if (req->flags & DST_REQ_EXPORT) {
> +             if (err || !(req->flags & DST_REQ_EXPORT_WRITE)) {
> +                     req->bio_endio(req, err);
> +                     goto out;
> +             }
> +
> +             req->bio->bi_rw = WRITE;
> +             generic_make_request(req->bio);
> +     } else {
> +             req->bio_endio(req, err);
> +     }
> +out:
> +     dst_free_request(req);
> +}
> +EXPORT_SYMBOL_GPL(kst_complete_req);
> +
> +static void kst_flush_requests(struct kst_state *st)
> +{
> +     struct dst_request *req;
> +
> +     while ((req = kst_dequeue_req(st)) != NULL)
> +             kst_complete_req(req, -EIO);
> +}
> +
> +static int kst_poll_init(struct kst_state *st)
> +{
> +     struct kst_poll_helper ph;
> +
> +     ph.st = st;
> +     init_poll_funcptr(&ph.pt, &kst_queue_func);
> +
> +     st->socket->ops->poll(NULL, st->socket, &ph.pt);
> +     return 0;
> +}
> +
> +/*
> + * Main state creation function.
> + * It creates new state according to given operations
> + * and links it into worker structure and node.
> + */
> +static struct kst_state *kst_state_init(struct dst_node *node,
> +             unsigned int permissions,
> +             struct kst_state_ops *ops, void *data)
> +{
> +     struct kst_state *st;
> +     int err;
> +
> +     st = kzalloc(sizeof(struct kst_state), GFP_KERNEL);
> +     if (!st)
> +             return ERR_PTR(-ENOMEM);
> +
> +     st->permissions = permissions;
> +     st->node = node;
> +     st->ops = ops;
> +     INIT_LIST_HEAD(&st->ready_entry);
> +     INIT_LIST_HEAD(&st->entry);
> +     INIT_LIST_HEAD(&st->request_list);
> +     mutex_init(&st->request_lock);
> +
> +     err = st->ops->init(st, data);
> +     if (err)
> +             goto err_out_free;
> +     mutex_lock(&node->w->state_mutex);
> +     list_add_tail(&st->entry, &node->w->state_list);
> +     mutex_unlock(&node->w->state_mutex);
> +
> +     kst_wake(st);
> +
> +     return st;
> +
> +err_out_free:
> +     kfree(st);
> +     return ERR_PTR(err);
> +}
> +
> +/*
> + * This function is called when node is removed,
> + * or when state is destroyed for connected to local exporting
> + * node client.
> + */
> +void kst_state_exit(struct kst_state *st)
> +{
> +     struct kst_worker *w = st->node->w;
> +
> +     mutex_lock(&w->state_mutex);
> +     list_del_init(&st->entry);
> +     mutex_unlock(&w->state_mutex);
> +
> +     st->ops->exit(st);
> +
> +     if (st == st->node->state)
> +             st->node->state = NULL;
> +
> +     kfree(st);
> +}
> +
> +static int kst_error(struct kst_state *st, int err)
> +{
> +     if ((err == -ECONNRESET || err == -EPIPE) && st->ops->recovery)
> +             err = st->ops->recovery(st, err);
> +
> +     return st->node->st->alg->ops->error(st, err);
> +}
> +
> +/*
> + * This is main state processing function.
> + * It tries to complete request and invoke appropriate
> + * callbacks in case of errors or successfull operation finish.
> + */
> +static int kst_thread_process_state(struct kst_state *st)
> +{
> +     int err, empty;
> +     unsigned int revents;
> +     struct dst_request *req, *tmp;
> +
> +     mutex_lock(&st->request_lock);
> +     if (st->ops->ready) {
> +             err = st->ops->ready(st);
> +             if (err) {
> +                     mutex_unlock(&st->request_lock);
> +                     if (err < 0)
> +                             kst_state_exit(st);
> +                     return err;
> +             }
> +     }
> +
> +     err = 0;
> +     empty = 1;
> +     req = NULL;
> +     list_for_each_entry_safe(req, tmp, &st->request_list, 
> request_list_entry) {
> +             empty = 0;
> +             revents = st->socket->ops->poll(st->socket->file,
> +                             st->socket, NULL);
> +             if (!revents)
> +                     break;
> +             err = req->callback(req, revents);
> +             if (req->size && !err)
> +                     err = 1;
> +
> +             if (err < 0 || !req->size) {
> +                     if (!req->size)
> +                             err = 0;
> +                     kst_del_req(req);
> +                     kst_complete_req(req, err);
> +             }
> +
> +             if (err)
> +                     break;
> +     }
> +
> +     dprintk("%s: broke the loop: err: %d, list_empty: %d.\n",
> +                     __func__, err, list_empty(&st->request_list));
> +     mutex_unlock(&st->request_lock);
> +
> +     if (err < 0) {
> +             dprintk("%s: req: %p, err: %d, st: %p, node->state: %p.\n",
> +                     __func__, req, err, st, st->node->state);
> +
> +             if (st != st->node->state) {
> +                     /*
> +                      * Accepted client has state not related to storage
> +                      * node, so it must be freed explicitely.
> +                      * We do not try to fix clients connections to local
> +                      * export nodes, just drop the client.
> +                      */
> +
> +                     kst_state_exit(st);
> +                     return err;
> +             }
> +
> +             err = kst_error(st, err);
> +             if (err)
> +                     return err;
> +
> +             kst_wake(st);
> +     }
> +
> +     if (list_empty(&st->request_list) && !empty)
> +             kst_wake(st);
> +
> +     return err;
> +}
> +
> +/*
> + * Main worker thread - one per storage.
> + */
> +static int kst_thread_func(void *data)
> +{
> +     struct kst_worker *w = data;
> +     struct kst_state *st;
> +     unsigned long flags;
> +     int err = 0;
> +
> +     while (!kthread_should_stop()) {
> +             wait_event_interruptible_timeout(w->wait,
> +                             !list_empty(&w->ready_list) ||
> +                             kthread_should_stop(),
> +                             HZ);
> +
> +             st = NULL;
> +             spin_lock_irqsave(&w->ready_lock, flags);
> +             if (!list_empty(&w->ready_list)) {
> +                     st = list_entry(w->ready_list.next, struct kst_state,
> +                                     ready_entry);
> +                     list_del_init(&st->ready_entry);
> +             }
> +             spin_unlock_irqrestore(&w->ready_lock, flags);
> +
> +             if (!st)
> +                     continue;
> +
> +             err = kst_thread_process_state(st);
> +     }
> +
> +     return err;
> +}
> +
> +/*
> + * Worker initialization - this object will host andprocess all states,
> + * which in turn host requests for remote targets.
> + */
> +struct kst_worker *kst_worker_init(int id)
> +{
> +     struct kst_worker *w;
> +     int err;
> +
> +     w = kzalloc(sizeof(struct kst_worker), GFP_KERNEL);
> +     if (!w)
> +             return ERR_PTR(-ENOMEM);
> +
> +     w->id = id;
> +     init_waitqueue_head(&w->wait);
> +     spin_lock_init(&w->ready_lock);
> +     mutex_init(&w->state_mutex);
> +
> +     INIT_LIST_HEAD(&w->ready_list);
> +     INIT_LIST_HEAD(&w->state_list);
> +
> +     w->req_pool = mempool_create_slab_pool(256, dst_request_cache);
> +     if (!w->req_pool) {
> +             err = -ENOMEM;
> +             goto err_out_free;
> +     }
> +
> +     w->thread = kthread_run(&kst_thread_func, w, "kst%d", w->id);
> +     if (IS_ERR(w->thread)) {
> +             err = PTR_ERR(w->thread);
> +             goto err_out_destroy;
> +     }
> +
> +     mutex_lock(&kst_worker_mutex);
> +     list_add_tail(&w->entry, &kst_worker_list);
> +     mutex_unlock(&kst_worker_mutex);
> +
> +     return w;
> +
> +err_out_destroy:
> +     mempool_destroy(w->req_pool);
> +err_out_free:
> +     kfree(w);
> +     return ERR_PTR(err);
> +}
> +
> +void kst_worker_exit(struct kst_worker *w)
> +{
> +     struct kst_state *st, *n;
> +
> +     mutex_lock(&kst_worker_mutex);
> +     list_del(&w->entry);
> +     mutex_unlock(&kst_worker_mutex);
> +
> +     kthread_stop(w->thread);
> +
> +     list_for_each_entry_safe(st, n, &w->state_list, entry) {
> +             kst_state_exit(st);
> +     }
> +
> +     mempool_destroy(w->req_pool);
> +     kfree(w);
> +}
> +
> +/*
> + * Common state exit callback.
> + * Removes itself from worker's list of states,
> + * releases socket and flushes all requests.
> + */
> +static void kst_common_exit(struct kst_state *st)
> +{
> +     unsigned long flags;
> +
> +     kst_poll_exit(st);
> +
> +     spin_lock_irqsave(&st->node->w->ready_lock, flags);
> +     list_del_init(&st->ready_entry);
> +     spin_unlock_irqrestore(&st->node->w->ready_lock, flags);
> +
> +     kst_flush_requests(st);
> +     kst_sock_release(st);
> +}
> +
> +/*
> + * Listen socket contains security attributes in request_list,
> + * so it can not be flushed via usual way.
> + */
> +static void kst_listen_flush(struct kst_state *st)
> +{
> +     struct dst_secure *s, *tmp;
> +
> +     list_for_each_entry_safe(s, tmp, &st->request_list, sec_entry) {
> +             list_del(&s->sec_entry);
> +             kfree(s);
> +     }
> +}
> +
> +static void kst_listen_exit(struct kst_state *st)
> +{
> +     kst_listen_flush(st);
> +     kst_common_exit(st);
> +}
> +
> +/*
> + * BIO vector receiving function - does not block, but may sleep because
> + * of scheduling policy.
> + */
> +static int kst_data_recv_bio_vec(struct kst_state *st, struct bio_vec *bv,
> +             unsigned int offset, unsigned int size)
> +{
> +     struct msghdr msg;
> +     struct kvec iov;
> +     void *kaddr;
> +     int err;
> +
> +     kaddr = kmap(bv->bv_page);
> +
> +     iov.iov_base = kaddr + bv->bv_offset + offset;
> +     iov.iov_len = size;
> +
> +     msg.msg_iov = (struct iovec *)&iov;
> +     msg.msg_iovlen = 1;
> +     msg.msg_name = NULL;
> +     msg.msg_namelen = 0;
> +     msg.msg_control = NULL;
> +     msg.msg_controllen = 0;
> +     msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
> +
> +     err = kernel_recvmsg(st->socket, &msg, &iov, 1, iov.iov_len,
> +                     msg.msg_flags);
> +     kunmap(bv->bv_page);
> +
> +     return err;
> +}
> +
> +/*
> + * BIO vector sending function - does not block, but may sleep because
> + * of scheduling policy.
> + */
> +static int kst_data_send_bio_vec(struct kst_state *st, struct bio_vec *bv,
> +             unsigned int offset, unsigned int size)
> +{
> +     return kernel_sendpage(st->socket, bv->bv_page,
> +                     bv->bv_offset + offset, size,
> +                     MSG_DONTWAIT | MSG_NOSIGNAL);
> +}
> +
> +static int kst_data_send_bio_vec_slow(struct kst_state *st, struct bio_vec 
> *bv,
> +             unsigned int offset, unsigned int size)
> +{
> +     struct msghdr msg;
> +     struct kvec iov;
> +     void *addr;
> +     int err;
> +
> +     addr = kmap(bv->bv_page);
> +     iov.iov_base = addr + bv->bv_offset + offset;
> +     iov.iov_len = size;
> +
> +     msg.msg_iov = (struct iovec *)&iov;
> +     msg.msg_iovlen = 1;
> +     msg.msg_name = NULL;
> +     msg.msg_namelen = 0;
> +     msg.msg_control = NULL;
> +     msg.msg_controllen = 0;
> +     msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
> +
> +     err = kernel_sendmsg(st->socket, &msg, &iov, 1, iov.iov_len);
> +     kunmap(bv->bv_page);
> +
> +     return err;
> +}
> +
> +static u32 dst_csum_bvec(struct bio_vec *bv, unsigned int offset, unsigned 
> int size)
> +{
> +     void *addr;
> +     u32 csum;
> +
> +     addr = kmap_atomic(bv->bv_page, KM_USER0);
> +     csum =  dst_csum_data(addr + bv->bv_offset + offset, size);
> +     kunmap_atomic(addr, KM_USER0);
> +
> +     return csum;
> +}
> +
> +typedef int (*kst_data_process_bio_vec_t)(struct kst_state *st,
> +             struct bio_vec *bv, unsigned int offset, unsigned int size);
> +
> +/*
> + * @req: processing request.
> + * Contains BIO and all related to its processing info.
> + *
> + * This function sends or receives requested number of pages from given BIO.
> + *
> + * In case of errors negative value is returned and @size,
> + * @index and @off are set to the:
> + * - number of bytes not yet processed (i.e. the rest of the bytes to be
> + *   processed).
> + * - index of the last bio_vec started to be processed (header sent).
> + * - offset of the first byte to be processed in the bio_vec.
> + *
> + * If there are no errors, zero is returned.
> + * -EAGAIN is not an error and is transformed into zero return value,
> + * called must check if @size is zero, in that case whole BIO is processed
> + * and thus req->bio_endio() can be called, othervise new request must be 
> allocated
> + * to be processed later.
> + */
> +static int kst_data_process_bio(struct dst_request *req)
> +{
> +     int err = -ENOSPC;
> +     struct dst_remote_request r;
> +     kst_data_process_bio_vec_t func;
> +     unsigned int cur_size;
> +     int use_csum = test_bit(DST_NODE_USE_CSUM, &req->node->flags);
> +
> +     if (bio_rw(req->bio) == WRITE) {
> +             int i;
> +
> +             func = kst_data_send_bio_vec;
> +             for (i=req->idx; i<req->num; ++i) {
> +                     struct bio_vec *bv = bio_iovec_idx(req->bio, i);
> +
> +                     if (PageSlab(bv->bv_page)) {
> +                             func = kst_data_send_bio_vec_slow;
> +                             break;
> +                     }
> +             }
> +             r.cmd = cpu_to_be32(DST_WRITE);
> +     } else {
> +             r.cmd = cpu_to_be32(DST_READ);
> +             func = kst_data_recv_bio_vec;
> +     }
> +
> +     dprintk("%s: start: [%c], start: %llu, idx: %d, num: %d, "
> +                     "size: %llu, offset: %u, flags: %x, use_csum: %d.\n",
> +                     __func__, (bio_rw(req->bio) == WRITE)?'W':'R',
> +                     req->start, req->idx, req->num, req->size, req->offset,
> +                     req->flags, use_csum);
> +
> +     while (req->idx < req->num) {
> +             struct bio_vec *bv = bio_iovec_idx(req->bio, req->idx);
> +
> +             cur_size = min_t(u64, bv->bv_len - req->offset, req->size);
> +
> +             dprintk("%s: page: %p, slab: %d, count: %d, max: %d, off: %u, 
> len: %u, req->offset: %u, "
> +                             "req->size: %llu, cur_size: %u, flags: %x, "
> +                             "use_csum: %d, req->csum: %x.\n",
> +                             __func__, bv->bv_page, PageSlab(bv->bv_page),
> +                             atomic_read(&bv->bv_page->_count), 
> req->bio->bi_vcnt,
> +                             bv->bv_offset, bv->bv_len,
> +                             req->offset, req->size, cur_size,
> +                             req->flags, use_csum, req->tmp_csum);
> +
> +             if (cur_size == 0) {
> +                     printk(KERN_ERR "%s: %d/%d: start: %llu, "
> +                             "bv_offset: %u, bv_len: %u, "
> +                             "req_offset: %u, req_size: %llu, "
> +                             "req: %p, bio: %p, err: %d.\n",
> +                             __func__, req->idx, req->num, req->start,
> +                             bv->bv_offset, bv->bv_len,
> +                             req->offset, req->size,
> +                             req, req->bio, err);
> +                     BUG();
> +             }
> +
> +             if (!(req->flags & DST_REQ_HEADER_SENT)) {
> +                     r.sector = cpu_to_be64(req->start);
> +                     r.offset = cpu_to_be32(bv->bv_offset + req->offset);
> +                     r.size = cpu_to_be32(cur_size);
> +                     r.csum = 0;
> +
> +                     if (use_csum && bio_rw(req->bio) == WRITE &&
> +                                     !req->tmp_offset) {
> +                             req->tmp_offset = req->offset;
> +                             r.csum = cpu_to_be32(dst_csum_bvec(bv,
> +                                             req->offset, cur_size));
> +                     }
> +
> +                     err = dst_data_send_header(req->state->socket, &r);
> +                     dprintk("%s: %d/%d: sending header: cmd: %u, start: 
> %llu, "
> +                             "bv_offset: %u, bv_len: %u, "
> +                             "a offset: %u, offset: %u, "
> +                             "cur_size: %u, err: %d.\n",
> +                             __func__, req->idx, req->num, 
> be32_to_cpu(r.cmd),
> +                             req->start, bv->bv_offset, bv->bv_len,
> +                             bv->bv_offset + req->offset,
> +                             req->offset, cur_size, err);
> +
> +                     if (err != sizeof(struct dst_remote_request)) {
> +                             if (err >= 0)
> +                                     err = -EINVAL;
> +                             break;
> +                     }
> +
> +                     req->flags |= DST_REQ_HEADER_SENT;
> +             }
> +
> +             if (use_csum && (bio_rw(req->bio) != WRITE) &&
> +                             !(req->flags & DST_REQ_CHEKSUM_RECV)) {
> +                     struct dst_remote_request tmp_req;
> +
> +                     err = dst_data_recv_header(req->state->socket, 
> &tmp_req, 0);
> +                     dprintk("%s: %d/%d: receiving header: start: %llu, "
> +                             "bv_offset: %u, bv_len: %u, "
> +                             "a offset: %u, offset: %u, "
> +                             "cur_size: %u, err: %d.\n",
> +                             __func__, req->idx, req->num,
> +                             req->start, bv->bv_offset, bv->bv_len,
> +                             bv->bv_offset + req->offset,
> +                             req->offset, cur_size, err);
> +
> +                     if (err != sizeof(struct dst_remote_request)) {
> +                             if (err >= 0)
> +                                     err = -EINVAL;
> +                             break;
> +                     }
> +
> +                     if (req->tmp_csum) {
> +                             printk("%s: req: %p, old csum: %x, new: %x.\n",
> +                                             __func__, req, req->tmp_csum,
> +                                             be32_to_cpu(tmp_req.csum));
> +                             BUG_ON(1);
> +                     }
> +
> +                     dprintk("%s: req: %p, old csum: %x, new: %x.\n",
> +                                     __func__, req, req->tmp_csum,
> +                                     be32_to_cpu(tmp_req.csum));
> +                     req->tmp_csum = be32_to_cpu(tmp_req.csum);
> +
> +                     req->flags |= DST_REQ_CHEKSUM_RECV;
> +             }
> +
> +             err = func(req->state, bv, req->offset, cur_size);
> +             if (err <= 0)
> +                     break;
> +
> +             req->offset += err;
> +             req->size -= err;
> +
> +             if (req->offset != bv->bv_len) {
> +                     dprintk("%s: %d/%d: this: start: %llu, bv_offset: %u, "
> +                             "bv_len: %u, offset: %u, "
> +                             "cur_size: %u, err: %d.\n",
> +                             __func__, req->idx, req->num, req->start,
> +                             bv->bv_offset, bv->bv_len,
> +                             req->offset, cur_size, err);
> +                     err = -EAGAIN;
> +                     break;
> +             }
> +
> +             if (use_csum && bio_rw(req->bio) != WRITE) {
> +                     u32 csum = dst_csum_bvec(bv, req->tmp_offset,
> +                                     bv->bv_len - req->tmp_offset);
> +
> +                     dprintk("%s: req: %p, csum: %x, received csum: %x.\n",
> +                                     __func__, req, csum, req->tmp_csum);
> +
> +                     if (csum != req->tmp_csum) {
> +                             printk("%s: %d/%d: broken checksum: start: 
> %llu, "
> +                                     "bv_offset: %u, bv_len: %u, "
> +                                     "a offset: %u, offset: %u, "
> +                                     "cur_size: %u, orig_size: %llu.\n",
> +                                     __func__, req->idx, req->num,
> +                                     req->start, bv->bv_offset, bv->bv_len,
> +                                     bv->bv_offset + req->offset,
> +                                     req->offset, cur_size, req->orig_size);
> +                             printk("%s: broken checksum: req: %p, csum: %x, 
> "
> +                                     "should be: %x, flags: %x, "
> +                                     "req->tmp_offset: %u, rw: %lu.\n",
> +                                     __func__, req, csum, req->tmp_csum,
> +                                     req->flags, req->tmp_offset, 
> bio_rw(req->bio));
> +
> +                             req->offset -= err;
> +                             req->size += err;
> +
> +                             err = -EREMOTEIO;
> +                             break;
> +                     }
> +             }
> +
> +             req->offset = 0;
> +             req->idx++;
> +             req->flags &= ~(DST_REQ_HEADER_SENT | DST_REQ_CHEKSUM_RECV);
> +             req->tmp_csum = 0;
> +             req->start += to_sector(bv->bv_len);
> +     }
> +
> +     if (err <= 0 && err != -EAGAIN) {
> +             if (err == 0)
> +                     err = -ECONNRESET;
> +     } else
> +             err = 0;
> +
> +     if (err < 0 || (req->idx == req->num && req->size)) {
> +             dprintk("%s: return: idx: %d, num: %d, offset: %u, "
> +                             "size: %llu, err: %d.\n",
> +                     __func__, req->idx, req->num, req->offset,
> +                     req->size, err);
> +     }
> +     dprintk("%s: end: start: %llu, idx: %d, num: %d, "
> +                     "size: %llu, offset: %u.\n",
> +             __func__, req->start, req->idx, req->num,
> +             req->size, req->offset);
> +
> +     return err;
> +}
> +
> +void kst_bio_endio(struct dst_request *req, int err)
> +{
> +     if (err && printk_ratelimit())
> +             printk("%s: freeing bio: %p, bi_size: %u, "
> +                     "orig_size: %llu, req: %p, err: %d.\n",
> +             __func__, req->bio, req->bio->bi_size, req->orig_size,
> +             req, err);
> +     bio_endio(req->bio, req->orig_size, err);
> +}
> +EXPORT_SYMBOL_GPL(kst_bio_endio);
> +
> +/*
> + * This callback is invoked by worker thread to process given request.
> + */
> +int kst_data_callback(struct dst_request *req, unsigned int revents)
> +{
> +     int err;
> +
> +     dprintk("%s: req: %p, num: %d, idx: %d, bio: %p, "
> +                     "revents: %x, flags: %x.\n",
> +                     __func__, req, req->num, req->idx, req->bio,
> +                     revents, req->flags);
> +
> +     if (req->flags & DST_REQ_EXPORT_READ)
> +             return 1;
> +
> +     err = kst_data_process_bio(req);
> +
> +     if (revents & (POLLERR | POLLHUP | POLLRDHUP))
> +             err = -EPIPE;
> +
> +     return err;
> +}
> +EXPORT_SYMBOL_GPL(kst_data_callback);
> +
> +struct dst_request *dst_clone_request(struct dst_request *req, mempool_t 
> *pool)
> +{
> +     struct dst_request *new_req;
> +
> +     new_req = mempool_alloc(pool, GFP_NOIO);
> +     if (!new_req)
> +             return NULL;
> +
> +     memset(new_req, 0, sizeof(struct dst_request));
> +
> +     dprintk("%s: req: %p, new_req: %p.\n", __func__, req, new_req);
> +
> +     if (req) {
> +             new_req->bio = req->bio;
> +             new_req->state = req->state;
> +             new_req->node = req->node;
> +             new_req->idx = req->idx;
> +             new_req->num = req->num;
> +             new_req->size = req->size;
> +             new_req->orig_size = req->orig_size;
> +             new_req->offset = req->offset;
> +             new_req->tmp_offset = req->tmp_offset;
> +             new_req->tmp_csum = req->tmp_csum;
> +             new_req->start = req->start;
> +             new_req->flags = req->flags;
> +             new_req->bio_endio = req->bio_endio;
> +             new_req->priv = req->priv;
> +     }
> +
> +     return new_req;
> +}
> +EXPORT_SYMBOL_GPL(dst_clone_request);
> +
> +void dst_free_request(struct dst_request *req)
> +{
> +     dprintk("%s: free req: %p, pool: %p, bio: %p, state: %p, node: %p.\n",
> +                     __func__, req, req->node->w->req_pool,
> +                     req->bio, req->state, req->node);
> +     mempool_free(req, req->node->w->req_pool);
> +}
> +EXPORT_SYMBOL_GPL(dst_free_request);
> +
> +/*
> + * This is main data processing function, eventually invoked from block 
> layer.
> + * It tries to complete the request, but if it is about to block, it allocates
> + * new request and queues it to main worker to be processed when events 
> allow.
> + */
> +static int kst_data_push(struct dst_request *req)
> +{
> +     struct kst_state *st = req->state;
> +     struct dst_request *new_req;
> +     unsigned int revents;
> +     int err, locked = 0;
> +
> +     dprintk("%s: start: %llu, size: %llu, bio: %p.\n",
> +                     __func__, req->start, req->size, req->bio);
> +
> +     if (!list_empty(&st->request_list) || (req->flags & 
> DST_REQ_ALWAYS_QUEUE))
> +             goto alloc_new_req;
> +
> +     if (mutex_trylock(&st->request_lock)) {
> +             locked = 1;
> +
> +             if (!list_empty(&st->request_list))
> +                     goto alloc_new_req;
> +
> +             revents = st->socket->ops->poll(NULL, st->socket, NULL);
> +             if (revents & POLLOUT) {
> +                     err = kst_data_process_bio(req);
> +                     if (err < 0)
> +                             goto out_unlock;
> +
> +                     if (!req->size)
> +                             goto out_bio_endio;
> +             }
> +     }
> +
> +alloc_new_req:
> +     err = -ENOMEM;
> +     new_req = dst_clone_request(req, req->node->w->req_pool);
> +     if (!new_req)
> +             goto out_unlock;
> +
> +     new_req->callback = &kst_data_callback;
> +
> +     if (!locked)
> +             mutex_lock(&st->request_lock);
> +
> +     locked = 1;
> +
> +     err = kst_enqueue_req(st, new_req);
> +     if (err)
> +             goto out_unlock;
> +     mutex_unlock(&st->request_lock);
> +
> +     err = 0;
> +     goto out;
> +
> +out_bio_endio:
> +     req->bio_endio(req, err);
> +out_unlock:
> +     if (locked)
> +             mutex_unlock(&st->request_lock);
> +     locked = 0;
> +
> +     if (err) {
> +             err = kst_error(st, err);
> +             if (!err)
> +                     goto alloc_new_req;
> +     }
> +
> +     if (err && printk_ratelimit()) {
> +             printk("%s: error [%c], start: %llu, idx: %d, num: %d, "
> +                             "size: %llu, offset: %u, err: %d.\n",
> +                     __func__, (bio_rw(req->bio) == WRITE)?'W':'R',
> +                     req->start, req->idx, req->num, req->size,
> +                     req->offset, err);
> +     }
> +
> +out:
> +
> +     kst_wake(st);
> +     return err;
> +}
> +
> +/*
> + * Remote node initialization callback.
> + */
> +static int kst_data_init(struct kst_state *st, void *data)
> +{
> +     int err;
> +
> +     st->socket = data;
> +     st->socket->sk->sk_allocation = GFP_NOIO;
> +     /*
> +      * Why not?
> +      */
> +     st->socket->sk->sk_sndbuf = st->socket->sk->sk_sndbuf = 1024*1024*10;
> +
> +     err = kst_poll_init(st);
> +     if (err)
> +             return err;
> +
> +     return 0;
> +}
> +
> +/*
> + * Remote node recovery function - tries to reconnect to given target.
> + */
> +static int kst_data_recovery(struct kst_state *st, int err)
> +{
> +     struct socket *sock;
> +     struct sockaddr addr;
> +     int addrlen;
> +     struct dst_request *req;
> +
> +     if (err != -ECONNRESET && err != -EPIPE) {
> +             dprintk("%s: state %p does not know how "
> +                             "to recover from error %d.\n",
> +                             __func__, st, err);
> +             return err;
> +     }
> +
> +     err = sock_create(st->socket->ops->family, st->socket->type,
> +                     st->socket->sk->sk_protocol, &sock);
> +     if (err < 0)
> +             goto err_out_exit;
> +
> +     sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo =
> +             msecs_to_jiffies(DST_DEFAULT_TIMEO);
> +
> +     err = sock->ops->getname(st->socket, &addr, &addrlen, 2);
> +     if (err)
> +             goto err_out_destroy;
> +
> +     err = sock->ops->connect(sock, &addr, addrlen, 0);
> +     if (err)
> +             goto err_out_destroy;
> +
> +     kst_poll_exit(st);
> +     kst_sock_release(st);
> +
> +     mutex_lock(&st->request_lock);
> +     err = st->ops->init(st, sock);
> +     if (!err) {
> +             /*
> +              * After reconnection is completed all requests
> +              * must be resent from the state they were finished previously,
> +              * but with new headers.
> +              */
> +             list_for_each_entry(req, &st->request_list, request_list_entry)
> +                     req->flags &= ~(DST_REQ_HEADER_SENT | 
> DST_REQ_CHEKSUM_RECV);
> +     }
> +     mutex_unlock(&st->request_lock);
> +     if (err < 0)
> +             goto err_out_destroy;
> +
> +     kst_wake(st);
> +     dprintk("%s: reconnected.\n", __func__);
> +
> +     return 0;
> +
> +err_out_destroy:
> +     sock_release(sock);
> +err_out_exit:
> +     dprintk("%s: revovery failed: st: %p, err: %d.\n", __func__, st, err);
> +     return err;
> +}
> +
> +/*
> + * Local exporting node end IO callbacks.
> + */
> +static int kst_export_write_end_io(struct bio *bio, unsigned int size, int 
> err)
> +{
> +     dprintk("%s: bio: %p, size: %u, idx: %d, num: %d, err: %d.\n",
> +             __func__, bio, bio->bi_size, bio->bi_idx, bio->bi_vcnt, err);
> +
> +     if (bio->bi_size)
> +             return 1;
> +
> +     kst_export_put_bio(bio);
> +     return 0;
> +}
> +
> +static int kst_export_read_end_io(struct bio *bio, unsigned int size, int 
> err)
> +{
> +     struct dst_request *req = bio->bi_private;
> +     struct kst_state *st = req->state;
> +     int use_csum = test_bit(DST_NODE_USE_CSUM, &req->node->flags);
> +
> +     dprintk("%s: bio: %p, req: %p, size: %u, idx: %d, num: %d, err: %d.\n",
> +             __func__, bio, req, bio->bi_size, bio->bi_idx,
> +             bio->bi_vcnt, err);
> +
> +     if (bio->bi_size)
> +             return 1;
> +
> +     if (err) {
> +             kst_export_put_bio(bio);
> +             return 0;
> +     }
> +
> +     bio->bi_size = req->size = req->orig_size;
> +     bio->bi_rw = WRITE;
> +     if (use_csum)
> +             req->flags &= ~(DST_REQ_HEADER_SENT | DST_REQ_CHEKSUM_RECV);
> +
> +     /*
> +      * This is a race with kst_data_callback(), which checks
> +      * this bit to determine if it can or can not process given
> +      * request. This does not harm actually, since subsequent
> +      * state wakeup will call it again and thus will pick
> +      * given request in time.
> +      */
> +     req->flags &= ~DST_REQ_EXPORT_READ;
> +     kst_wake(st);
> +     return 0;
> +}
> +
> +/*
> + * This callback is invoked each time new request from remote
> + * node to given local export node is received.
> + * It allocates new block IO request and queues it for processing.
> + */
> +static int kst_export_ready(struct kst_state *st)
> +{
> +     struct dst_remote_request r;
> +     struct bio *bio;
> +     int err, nr, i;
> +     struct dst_request *req;
> +     unsigned int revents = st->socket->ops->poll(NULL, st->socket, NULL);
> +
> +     if (revents & (POLLERR | POLLHUP)) {
> +             err = -EPIPE;
> +             goto err_out_exit;
> +     }
> +
> +     if (!(revents & POLLIN) || !list_empty(&st->request_list))
> +             return 0;
> +
> +     err = dst_data_recv_header(st->socket, &r, 1);
> +     if (err != sizeof(struct dst_remote_request)) {
> +             err = -ECONNRESET;
> +             goto err_out_exit;
> +     }
> +
> +     kst_convert_header(&r);
> +
> +     dprintk("\n%s: st: %p, cmd: %u, sector: %llu, size: %u, "
> +                     "csum: %x, offset: %u.\n",
> +                     __func__, st, r.cmd, r.sector,
> +                     r.size, r.csum, r.offset);
> +
> +     err = -EINVAL;
> +     if (r.cmd != DST_READ && r.cmd != DST_WRITE && r.cmd != DST_REMOTE_CFG)
> +             goto err_out_exit;
> +
> +     if ((s64)(r.sector + to_sector(r.size)) < 0 ||
> +             (r.sector + to_sector(r.size)) > st->node->size ||
> +             r.offset >= PAGE_SIZE)
> +             goto err_out_exit;
> +
> +     if (r.cmd == DST_REMOTE_CFG) {
> +             r.sector = st->node->size;
> +
> +             if (test_bit(DST_NODE_USE_CSUM, &st->node->flags))
> +                     r.csum = 1;
> +
> +             kst_convert_header(&r);
> +
> +             err = dst_data_send_header(st->socket, &r);
> +             if (err != sizeof(struct dst_remote_request)) {
> +                     err = -EINVAL;
> +                     goto err_out_exit;
> +             }
> +             kst_wake(st);
> +             return 0;
> +     }
> +
> +     nr = DIV_ROUND_UP(r.size, PAGE_SIZE);
> +
> +     while (r.size) {
> +             int nr_pages = min(BIO_MAX_PAGES, nr);
> +             unsigned int size;
> +             struct page *page;
> +
> +             err = -ENOMEM;
> +             req = dst_clone_request(NULL, st->node->w->req_pool);
> +             if (!req)
> +                     goto err_out_exit;
> +
> +             bio = bio_alloc(GFP_NOIO, nr_pages);
> +             if (!bio)
> +                     goto err_out_free_req;
> +
> +             req->flags = DST_REQ_EXPORT | DST_REQ_HEADER_SENT |
> +                             DST_REQ_CHEKSUM_RECV;
> +             req->bio = bio;
> +             req->state = st;
> +             req->node = st->node;
> +             req->callback = &kst_data_callback;
> +             req->bio_endio = &kst_bio_endio;
> +
> +             req->tmp_offset = 0;
> +             req->tmp_csum = r.csum;
> +
> +             /*
> +              * Yes, looks a bit weird.
> +              * Logic is simple - for local exporting node all operations
> +              * are reversed compared to usual nodes, since usual nodes
> +              * process remote data and local export node process remote
> +              * requests, so that writing data means sending data to
> +              * remote node and receiving on the local export one.
> +              *
> +              * So, to process writing to the exported node we need first
> +              * to receive data from the net (i.e. to perform READ
> +              * operation in terms of usual node), and then put it to the
> +              * storage (WRITE command, so it will be changed before
> +              * calling generic_make_request()).
> +              *
> +              * To process read request from the exported node we need
> +              * first to read it from storage (READ command for BIO)
> +              * and then send it over the net (perform WRITE operation
> +              * in terms of network).
> +              */
> +             if (r.cmd == DST_WRITE) {
> +                     req->flags |= DST_REQ_EXPORT_WRITE;
> +                     bio->bi_end_io = kst_export_write_end_io;
> +             } else {
> +                     req->flags |= DST_REQ_EXPORT_READ;
> +                     bio->bi_end_io = kst_export_read_end_io;
> +             }
> +             bio->bi_rw = READ;
> +             bio->bi_private = req;
> +             bio->bi_sector = r.sector;
> +             bio->bi_bdev = st->node->bdev;
> +
> +             for (i = 0; i < nr_pages; ++i) {
> +                     page = alloc_page(GFP_NOIO);
> +                     if (!page)
> +                             break;
> +
> +                     size = min_t(u32, PAGE_SIZE - r.offset, r.size);
> +
> +                     err = bio_add_page(bio, page, size, 0);
> +                     dprintk("%s: %d/%d: page: %p, size: %u, "
> +                                     "offset: %u (used zero), err: %d.\n",
> +                                     __func__, i, nr_pages, page, size,
> +                                     r.offset, err);
> +                     if (err <= 0)
> +                             break;
> +
> +                     if (err == size)
> +                             nr--;
> +
> +                     r.size -= err;
> +                     r.sector += to_sector(err);
> +
> +                     if (!r.size)
> +                             break;
> +             }
> +
> +             if (!bio->bi_vcnt) {
> +                     err = -ENOMEM;
> +                     goto err_out_put;
> +             }
> +
> +             req->size = req->orig_size = bio->bi_size;
> +             req->start = bio->bi_sector;
> +             req->idx = 0;
> +             req->num = bio->bi_vcnt;
> +
> +             dprintk("%s: submitting: bio: %p, req: %p, start: %llu, "
> +                     "size: %llu, idx: %d, num: %d, offset: %u, csum: %x.\n",
> +                     __func__, bio, req, req->start, req->size,
> +                     req->idx, req->num, req->offset, req->tmp_csum);
> +
> +             err = kst_enqueue_req(st, req);
> +             if (err)
> +                     goto err_out_put;
> +
> +             if (r.cmd == DST_READ) {
> +                     generic_make_request(bio);
> +             }
> +     }
> +
> +     kst_wake(st);
> +     return 0;
> +
> +err_out_put:
> +     bio_put(bio);
> +err_out_free_req:
> +     dst_free_request(req);
> +err_out_exit:
> +     return err;
> +}
> +
> +static void kst_export_exit(struct kst_state *st)
> +{
> +     struct dst_node *n = st->node;
> +
> +     kst_common_exit(st);
> +     dst_node_put(n);
> +}
> +
> +static struct kst_state_ops kst_data_export_ops = {
> +     .init = &kst_data_init,
> +     .push = &kst_data_push,
> +     .exit = &kst_export_exit,
> +     .ready = &kst_export_ready,
> +};
> +
> +/*
> + * This callback is invoked each time listening socket for
> + * given local export node becomes ready.
> + * It creates new state for connected client and queues for processing.
> + */
> +static int kst_listen_ready(struct kst_state *st)
> +{
> +     struct socket *newsock;
> +     struct saddr addr;
> +     struct kst_state *newst;
> +     int err;
> +     unsigned int revents, permissions = 0;
> +     struct dst_secure *s;
> +
> +     revents = st->socket->ops->poll(NULL, st->socket, NULL);
> +     if (!(revents & POLLIN))
> +             return 1;
> +
> +     err = sock_create(st->socket->ops->family, st->socket->type,
> +                     st->socket->sk->sk_protocol, &newsock);
> +     if (err)
> +             goto err_out_exit;
> +
> +     err = st->socket->ops->accept(st->socket, newsock, 0);
> +     if (err)
> +             goto err_out_put;
> +
> +     if (newsock->ops->getname(newsock, (struct sockaddr *)&addr,
> +                               (int *)&addr.sa_data_len, 2) < 0) {
> +             err = -ECONNABORTED;
> +             goto err_out_put;
> +     }
> +
> +     list_for_each_entry(s, &st->request_list, sec_entry) {
> +             void *sec_addr, *new_addr;
> +
> +             sec_addr = ((void *)&s->sec.addr) + s->sec.check_offset;
> +             new_addr = ((void *)&addr) + s->sec.check_offset;
> +
> +             if (!memcmp(sec_addr, new_addr,
> +                             addr.sa_data_len - s->sec.check_offset)) {
> +                     permissions = s->sec.permissions;
> +                     break;
> +             }
> +     }
> +
> +     /*
> +      * So far only reading and writing are supported.
> +      * Block device does not know about anything else,
> +      * but as far as I recall, there was a prognosis,
> +      * that computer will never require more than 640kb of RAM.
> +      */
> +     if (permissions == 0) {
> +             err = -EPERM;
> +             goto err_out_put;
> +     }
> +
> +     if (st->socket->ops->family == AF_INET) {
> +             struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
> +             printk(KERN_INFO "%s: Client: %u.%u.%u.%u:%d.\n", __func__,
> +                     NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
> +     } else if (st->socket->ops->family == AF_INET6) {
> +             struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr;
> +             printk(KERN_INFO "%s: Client: "
> +                     "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d",
> +                     __func__,
> +                     NIP6(sin->sin6_addr), ntohs(sin->sin6_port));
> +     }
> +
> +     dst_node_get(st->node);
> +     newst = kst_state_init(st->node, permissions,
> +                     &kst_data_export_ops, newsock);
> +     if (IS_ERR(newst)) {
> +             err = PTR_ERR(newst);
> +             goto err_out_put;
> +     }
> +
> +     /*
> +      * Negative return value means error, positive - stop this state
> +      * processing. Zero allows to check state for pending requests.
> +      * Listening socket contains security objects in request list,
> +      * since it does not have any requests.
> +      */
> +     return 1;
> +
> +err_out_put:
> +     sock_release(newsock);
> +err_out_exit:
> +     return 1;
> +}
> +
> +static int kst_listen_init(struct kst_state *st, void *data)
> +{
> +     int err = -ENOMEM, i;
> +     struct dst_le_template *tmp = data;
> +     struct dst_secure *s;
> +
> +     for (i=0; i<tmp->le->secure_attr_num; ++i) {
> +             s = kmalloc(sizeof(struct dst_secure), GFP_KERNEL);
> +             if (!s)
> +                     goto err_out_exit;
> +
> +             memcpy(&s->sec, tmp->data, sizeof(struct dst_secure_user));
> +
> +             list_add_tail(&s->sec_entry, &st->request_list);
> +             tmp->data += sizeof(struct dst_secure_user);
> +
> +             if (s->sec.addr.sa_family == AF_INET) {
> +                     struct sockaddr_in *sin =
> +                             (struct sockaddr_in *)&s->sec.addr;
> +                     printk(KERN_INFO "%s: Client: %u.%u.%u.%u:%d, "
> +                                     "permissions: %x.\n",
> +                             __func__, NIPQUAD(sin->sin_addr.s_addr),
> +                             ntohs(sin->sin_port), s->sec.permissions);
> +             } else if (s->sec.addr.sa_family == AF_INET6) {
> +                     struct sockaddr_in6 *sin =
> +                             (struct sockaddr_in6 *)&s->sec.addr;
> +                     printk(KERN_INFO "%s: Client: "
> +                             "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d, "
> +                             "permissions: %x.\n",
> +                             __func__, NIP6(sin->sin6_addr),
> +                             ntohs(sin->sin6_port), s->sec.permissions);
> +             }
> +     }
> +
> +     err = kst_sock_create(st, &tmp->le->rctl.addr, tmp->le->rctl.type,
> +                     tmp->le->rctl.proto, tmp->le->backlog);
> +     if (err)
> +             goto err_out_exit;
> +
> +     err = kst_poll_init(st);
> +     if (err)
> +             goto err_out_release;
> +
> +     return 0;
> +
> +err_out_release:
> +     kst_sock_release(st);
> +err_out_exit:
> +     kst_listen_flush(st);
> +     return err;
> +}
> +
> +/*
> + * Operations for different types of states.
> + * There are three:
> + * data state - created for remote node, when distributed storage connects
> + *   to remote node, which contain data.
> + * listen state - created for local export node, when remote distributed
> + *   storage's node connects to given node to get/put data.
> + * data export state - created for each client connected to above listen
> + *   state.
> + */
> +static struct kst_state_ops kst_listen_ops = {
> +     .init = &kst_listen_init,
> +     .exit = &kst_listen_exit,
> +     .ready = &kst_listen_ready,
> +};
> +static struct kst_state_ops kst_data_ops = {
> +     .init = &kst_data_init,
> +     .push = &kst_data_push,
> +     .exit = &kst_common_exit,
> +     .recovery = &kst_data_recovery,
> +};
> +
> +struct kst_state *kst_listener_state_init(struct dst_node *node,
> +             struct dst_le_template *tmp)
> +{
> +     return kst_state_init(node, DST_PERM_READ | DST_PERM_WRITE,
> +                     &kst_listen_ops, tmp);
> +}
> +
> +struct kst_state *kst_data_state_init(struct dst_node *node,
> +             struct socket *newsock)
> +{
> +     return kst_state_init(node, DST_PERM_READ | DST_PERM_WRITE,
> +                     &kst_data_ops, newsock);
> +}
> +
> +/*
> + * Remove all workers and associated states.
> + */
> +void kst_exit_all(void)
> +{
> +     struct kst_worker *w, *n;
> +
> +     list_for_each_entry_safe(w, n, &kst_worker_list, entry) {
> +             kst_worker_exit(w);
> +     }
> +}
> 
> -
> To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
> the body of a message to [EMAIL PROTECTED]
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
 drivers/block/dst/kst.c |    9 +++++++++
 1 files changed, 9 insertions(+), 0 deletions(-)

diff --git a/drivers/block/dst/kst.c b/drivers/block/dst/kst.c
index 8fa3387..d275bb9 100644
--- a/drivers/block/dst/kst.c
+++ b/drivers/block/dst/kst.c
@@ -1111,8 +1111,17 @@ static int kst_export_read_end_io(struct bio *bio, 
unsigned int size, int err)
                return 0;
        }
 
+       /* FIXME: This is a little bit strange, but bio_end_io will
+          be called one more time for this bio later from here:
+               ->kst_complete_req
+                 ->kst_bio_endio
+          At this moment the network layer has already pinned bio's pages
+          and we may safely release all pages, so let's reuse the existing
+          kst_export_write_end_io method instead of writing a new one.
+       */
        bio->bi_size = req->size = req->orig_size;
        bio->bi_rw = WRITE;
+       bio->bi_end_io = kst_export_write_end_io;
        if (use_csum)
                req->flags &= ~(DST_REQ_HEADER_SENT | DST_REQ_CHEKSUM_RECV);
 

Reply via email to