Hi All,
Attached patch (very, very experimental — I haven't tested it out at all) builds the necessary infrastructure for doing efficient read-caching (at least for immutable files) in the page-cache of the kernel. Write-caching/buffering is not hard per se, but making it efficient is hard, I think (especially if we wish to avoid read-modify-writebacks). I haven't thought that through completely. Without modifying the VFS core to keep track of partially valid pages, I think this might be tricky to do. Please correct me if I am wrong here.

Let me know if you have any comments/suggestions, etc.
I will test this out sometime this week/next weekend whenever I get time, but if someone feels like jumping in and testing/modifying it, that would be great too!
Thanks,
Murali

Index: src/kernel/linux-2.6/file.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/kernel/linux-2.6/file.c,v
retrieving revision 1.129
diff -u -r1.129 file.c
--- src/kernel/linux-2.6/file.c 29 Sep 2006 16:48:13 -0000      1.129
+++ src/kernel/linux-2.6/file.c 30 Oct 2006 06:54:02 -0000
@@ -26,10 +26,14 @@
     IO_WRITEX = 1,
 };
 
+struct rw_options;
+
 #ifdef PVFS2_LINUX_KERNEL_2_4
 static int pvfs2_precheck_file_write(struct file *file, struct inode *inode,
     size_t *count, loff_t *ppos);
 #endif
+static ssize_t wait_for_iox(struct rw_options *rw, struct iovec *vec, int nr_segs,
+        struct xtvec *xtvec, int xtnr_segs, size_t total_size);
 
 #define wake_up_daemon_for_return(op)             \
 do {                                              \
@@ -120,6 +124,7 @@
     /* whether the destination addresses are in user/kernel */
     int copy_to_user;
     const char *fnstr;
+    ssize_t count;
     /* Asynch I/O control block */
     struct kiocb *iocb;
     union {
@@ -128,8 +133,13 @@
             unsigned long nr_segs;
         } address;
         struct {
+            /* All pages spanning a given I/O operation */
             struct page  **pages;
             unsigned long nr_pages;
+            /* Only those pages that need to be fetched */
+            unsigned long nr_issue_pages;
+            struct page  **issue_pages;
+            struct list_head page_list; /* list of pages for which I/O needs to be done; needed by read_cache_pages */
         } pages;
     } dest;
     union {
@@ -143,6 +153,191 @@
     } off;
 };
 
+static ssize_t precopy_buffers(int buffer_index, struct rw_options *rw, 
+        const struct iovec *vec, int nr_segs, size_t total_size)
+{
+    ssize_t ret = 0;
+
+    if (rw->type == IO_WRITEV)
+    {
+        /* 
+         * copy data from application/kernel by pulling it out 
+         * of the iovec. NOTE: target buffers can be addresses
+         * or struct page pointers
+         */
+        if (rw->copy_dest_type == COPY_TO_ADDRESSES) {
+            if (rw->copy_to_user)
+            {
+                ret = pvfs_bufmap_copy_iovec_from_user(
+                        buffer_index, vec, nr_segs, total_size);
+            }
+            else {
+                ret = pvfs_bufmap_copy_iovec_from_kernel(
+                        buffer_index, vec, nr_segs, total_size);
+            }
+        }
+        else {
+            ret = pvfs_bufmap_copy_from_pages(
+                        buffer_index, vec, nr_segs, total_size);
+        }
+        if (ret < 0)
+        {
+            gossip_err("%s: Failed to copy-in buffers. Please make sure "
+                        "that the pvfs2-client is running. %ld\n", 
+                        rw->fnstr, (long) ret);
+        }
+    }
+    return ret;
+}
+
+static ssize_t postcopy_buffers(int buffer_index, struct rw_options *rw,
+        const struct iovec *vec, int nr_segs, size_t total_size)
+{
+    ssize_t ret = 0;
+
+    if (rw->type == IO_READV)
+    {
+        /*
+         * copy data to application/kernel by pushing it out to the iovec.
+         * NOTE: target buffers can be addresses or struct page pointers
+         */
+        if (total_size)
+        {
+            if (rw->copy_dest_type == COPY_TO_ADDRESSES)
+            {
+                if (rw->copy_to_user)
+                {
+                    ret = pvfs_bufmap_copy_to_user_iovec(buffer_index, vec, 
+                            nr_segs, total_size);
+
+                }
+                else
+                {
+                    ret = pvfs_bufmap_copy_to_kernel_iovec(buffer_index, vec,
+                            nr_segs, total_size);
+                }
+            }
+            else {
+                ret = pvfs_bufmap_copy_to_pages(buffer_index, vec,
+                            nr_segs, total_size);
+            }
+            if (ret < 0)
+            {
+                gossip_err("%s: Failed to copy-out buffers.  Please make sure "
+                            "that the pvfs2-client is running (%ld)\n",
+                            rw->fnstr, (long) ret);
+            }
+        }
+    }
+    return ret;
+}
+
+/* Copy from page-cache to application address space */
+static ssize_t pagecache_read_actor(struct rw_options *rw, const struct iovec *vec,
+        int nr_segs, size_t total_actual_io)
+{
+    struct iovec *copied_iovec = NULL;
+    size_t ret = 0, amt_copied = 0, cur_copy_size = 0;
+    unsigned int seg, page_offset = 0;
+    int index = 0;
+    void *from_kaddr = NULL;
+    void __user *to_addr = NULL;
+    /*
+     * copy the passed in iovec so that we can change some of its fields
+     */
+    copied_iovec = (struct iovec *) kmalloc(nr_segs * sizeof(struct iovec),
+            PVFS2_BUFMAP_GFP_FLAGS);
+    if (copied_iovec == NULL)
+    {
+        gossip_err("pagecache_read_actor: failed allocating memory\n");
+        return -ENOMEM;
+    }
+    memcpy(copied_iovec, vec, nr_segs * sizeof(struct iovec));
+    /*
+     * Go through each segment in the iovec and make sure that
+     * the summation of iov_len is greater than the given size.
+     */
+    for (seg = 0, amt_copied = 0; seg < nr_segs; seg++)
+    {
+        amt_copied += copied_iovec[seg].iov_len;
+    }
+    if (amt_copied < total_actual_io)
+    {
+        gossip_err("pagecache_read_actor: computed total (%zd) is less than (%zd)\n",
+                amt_copied, total_actual_io);
+        kfree(copied_iovec);
+        return -EINVAL;
+    }
+    index = 0;
+    amt_copied = 0;
+    seg = 0;
+    page_offset = 0;
+    /* 
+     * Go through each segment in the iovec and copy from the page-cache,
+     * but make sure that we do so one page at a time.
+     */
+    while (amt_copied < total_actual_io)
+    {
+       struct iovec *iv = &copied_iovec[seg];
+        int inc_index = 0;
+
+        if (index >= rw->dest.pages.nr_pages) {
+            gossip_err("index cannot exceed number of allocated pages %ld\n", 
+                    (long) rw->dest.pages.nr_pages);
+            kfree(copied_iovec);
+            return -EINVAL;
+        }
+
+        if (iv->iov_len < (PAGE_CACHE_SIZE - page_offset))
+        {
+            cur_copy_size = iv->iov_len;
+            seg++;
+            to_addr = iv->iov_base;
+            inc_index = 0;
+        }
+        else if (iv->iov_len == (PAGE_CACHE_SIZE - page_offset))
+        {
+            cur_copy_size = iv->iov_len;
+            seg++;
+            to_addr = iv->iov_base;
+            inc_index = 1;
+        }
+        else 
+        {
+            cur_copy_size = (PAGE_CACHE_SIZE - page_offset);
+            to_addr = iv->iov_base;
+            iv->iov_base += cur_copy_size;
+            iv->iov_len  -= cur_copy_size;
+            inc_index = 1;
+        }
+        from_kaddr = pvfs2_kmap(rw->dest.pages.pages[index]);
+        ret = copy_to_user(to_addr, from_kaddr + page_offset, cur_copy_size);
+        pvfs2_kunmap(rw->dest.pages.pages[index]);
+#if 0
+        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pagecache_read_actor: copying to user %p from "
+                "kernel %p %d bytes (from_kaddr:%p, page_offset:%d)\n",
+                to_addr, from_kaddr + page_offset, cur_copy_size, from_kaddr, page_offset);
+#endif
+        if (ret)
+        {
+            gossip_err("Failed to copy data to user space\n");
+            kfree(copied_iovec);
+            return -EFAULT;
+        }
+
+        amt_copied += cur_copy_size;
+        if (inc_index) {
+            page_offset = 0;
+            index++;
+        }
+        else {
+            page_offset += cur_copy_size;
+        }
+    }
+    kfree(copied_iovec);
+    return 0;
+}
+
 /*
  * Post and wait for the I/O upcall to finish
  * @rw - contains state information to initiate the I/O operation
@@ -194,29 +389,12 @@
     gossip_debug(GOSSIP_FILE_DEBUG, "%s: copy_to_user %d nr_segs %u, "
             "offset: %llu total_size: %zd\n", rw->fnstr, rw->copy_to_user, 
             nr_segs, llu(*(rw->off.io.offset)), total_size);
-    if (rw->type == IO_WRITEV)
+    /* Stage 1: copy the buffers into client-core's address space */
+    if ((ret = precopy_buffers(buffer_index, rw, vec, nr_segs, total_size)) < 0)
     {
-        /* 
-         * copy data from application/kernel by pulling it out 
-         * of the iovec.
-         */
-        if (rw->copy_to_user)
-        {
-            ret = pvfs_bufmap_copy_iovec_from_user(
-                    buffer_index, vec, nr_segs, total_size);
-        }
-        else {
-            ret = pvfs_bufmap_copy_iovec_from_kernel(
-                    buffer_index, vec, nr_segs, total_size);
-        }
-        if (ret < 0)
-        {
-            gossip_lerr("Failed to copy-in buffers. Please make sure "
-                        "that the pvfs2-client is running. %ld\n", 
-                        (long) ret);
-            goto out;
-        }
+        goto out;
     }
+    /* Stage 2: Service the I/O operation */
     ret = service_operation(new_op, rw->fnstr,
          get_interruptible_flag(rw->inode));
 
@@ -249,38 +427,18 @@
           }
           goto out;
     }
-
-    if (rw->type == IO_READV)
-    {
-        /*
-         * copy data to application/kernel by pushing it out to the iovec.
+    /* Stage 3: Post copy buffers from client-core's address space */
+    if ((ret = postcopy_buffers(buffer_index, rw, vec, nr_segs, 
+                    new_op->downcall.resp.io.amt_complete)) < 0) {
+        /* put error codes in downcall so that handle_io_error()
+         * preserves it properly 
+         */
-        if (new_op->downcall.resp.io.amt_complete)
-        {
-            if (rw->copy_to_user)
-            {
-                ret = pvfs_bufmap_copy_to_user_iovec(buffer_index, vec, 
-                        nr_segs, new_op->downcall.resp.io.amt_complete);
-            }
-            else
-            {
-                ret = pvfs_bufmap_copy_to_kernel_iovec(buffer_index, vec,
-                        nr_segs, new_op->downcall.resp.io.amt_complete);
-            }
-            if (ret < 0)
-            {
-        gossip_lerr("%s: Failed to copy-out buffers.  Please make sure "
-                            "that the pvfs2-client is running (%ld)\n",
-                            rw->fnstr, (long) ret);
-                /* put error codes in downcall so that handle_io_error()
-                 * preserves it properly */
-                new_op->downcall.status = ret;
-                handle_io_error();
-                goto out;
-            }
-        }
+        new_op->downcall.status = ret;
+        handle_io_error();
+        goto out;
     }
     ret = new_op->downcall.resp.io.amt_complete;
+    gossip_debug(GOSSIP_FILE_DEBUG, "wait_for_io returning %ld\n", (long) ret);
     /*
       tell the device file owner waiting on I/O that this read has
       completed and it can return now.  in this exact case, on
@@ -456,6 +614,210 @@
     return max_nr_iovecs;
 }
 
+static void cleanup_cache_pages(unsigned long page_idx, struct rw_options *rw, int error)
+{
+    unsigned long j;
+    struct page *page;
+
+    /* Release any newly allocated pages */
+    list_for_each_entry (page, &rw->dest.pages.page_list, lru) {
+        list_del(&page->lru);
+        page_cache_release(page);
+    }
+    /* and pinned existing ones as well */
+    for (j = 0; j < page_idx; j++) {
+        if (rw->dest.pages.pages[j]) {
+            if (error < 0)
+                SetPageError(rw->dest.pages.pages[j]);
+            else
+                SetPageUptodate(rw->dest.pages.pages[j]);
+            unlock_page(rw->dest.pages.pages[j]);
+        }
+    }
+    kfree(rw->dest.pages.pages);
+    rw->dest.pages.pages = NULL;
+    rw->dest.pages.nr_pages = 0;
+    kfree(rw->dest.pages.issue_pages);
+    rw->dest.pages.issue_pages = NULL;
+    rw->dest.pages.nr_issue_pages = 0;
+    return;
+}
+
+/* callback from read_cache_pages */
+static int pvfs2_readpages_fill_cb(void *_data, struct page *page)
+{
+    struct rw_options *rw = (struct rw_options *) _data;
+    rw->dest.pages.issue_pages[rw->dest.pages.nr_issue_pages++] = page;
+    return 0;
+}
+
+/* Locate the pages of the file blocks from the page-cache and 
+ * store them in the rw_options control block.
+ * Note: if we don't locate, we allocate them.
+ * After that we increment their ref count so that we know for sure that
+ * they won't get swapped out.
+ */
+static int locate_file_pages(struct rw_options *rw, size_t total_size)
+{
+    struct address_space *mapping;
+    loff_t offset, isize;
+    unsigned long page_idx, begin_index, end_index, nr_to_read;
+    int ret = 0;
+    struct page *page;
+    
+    if (!rw || !rw->file || !rw->inode) {
+        gossip_err("locate_file_pages: invalid options\n");
+        return -EINVAL;
+    }
+    isize = i_size_read(rw->inode);
+    rw->copy_dest_type = COPY_TO_PAGES;
+    INIT_LIST_HEAD(&rw->dest.pages.page_list);
+    mapping = rw->file->f_mapping;
+    offset = *(rw->off.io.offset);
+    if (isize == 0) {
+        rw->dest.pages.nr_pages = 0;
+        rw->dest.pages.pages = NULL;
+        rw->dest.pages.nr_issue_pages = 0;
+        rw->dest.pages.issue_pages = NULL;
+        return 0;
+    }
+    begin_index = offset >> PAGE_CACHE_SHIFT;
+    end_index = (offset + total_size) >> PAGE_CACHE_SHIFT;
+    nr_to_read = end_index - begin_index + 1;
+    rw->dest.pages.nr_pages = nr_to_read;
+    rw->dest.pages.pages = (struct page **) kmalloc(nr_to_read * sizeof(struct page *),
+            PVFS2_BUFMAP_GFP_FLAGS);
+    if (!rw->dest.pages.pages) {
+        gossip_err("locate_file_pages: could not allocate memory\n");
+        return -ENOMEM;
+    }
+    memset(rw->dest.pages.pages, 0, nr_to_read * sizeof(struct page *));
+    gossip_debug(GOSSIP_FILE_DEBUG, "locate_file_pages: read %ld pages\n",
+            nr_to_read);
+    /* Preallocate all pages, and increase their ref counts if they are in cache */
+    for (page_idx = 0; page_idx < nr_to_read; page_idx++) {
+        pgoff_t page_offset = begin_index + page_idx;
+
+        if (page_offset > end_index)
+            break;
+        page = find_get_page(mapping, page_offset);
+        /* NOTE: page is now pinned. we need to call page_cache_release() */
+        if (page) {
+            rw->dest.pages.pages[page_idx] = page;
+            continue;
+        }
+        /* Allocate, but don't add it to the LRU list yet */
+        page = page_cache_alloc_cold(mapping);
+        if (!page) {
+            gossip_err("locate_file_pages: could not allocate page cache\n");
+            break;
+        }
+        page->index = page_offset;
+        /* Add it to our internal private list */
+        list_add_tail(&page->lru, &rw->dest.pages.page_list);
+        rw->dest.pages.pages[page_idx] = page;
+        ret++;
+    }
+    /* cleanup in case of error */
+    if (page_idx != nr_to_read) {
+        ret = -ENOMEM;
+        goto cleanup;
+    }
+    /* Allocate memory for the pages for which I/O needs to be issued */
+    rw->dest.pages.nr_issue_pages = 0;
+    rw->dest.pages.issue_pages = (struct page **) kmalloc(ret * sizeof(struct page *),
+            PVFS2_BUFMAP_GFP_FLAGS);
+    if (!rw->dest.pages.issue_pages) {
+        ret = -ENOMEM;
+        goto cleanup;
+    }
+    memset(rw->dest.pages.issue_pages, 0, ret * sizeof(struct page *));
+    gossip_debug(GOSSIP_FILE_DEBUG, "locate_file_pages: issue %ld I/O\n", (long) ret);
+    /* read_cache_pages can now be called on the list of pages */
+    ret = read_cache_pages(mapping, &rw->dest.pages.page_list, pvfs2_readpages_fill_cb, rw);
+    if (ret) {
+        goto cleanup;
+    }
+    /* NOTE: After a successful call to read_cache_pages(), we will have all pages ref counted by 1 more,
+     * so that they won't be replaced, and the ones on which we need to do I/O are locked as well.
+     */
+out:
+    return ret;
+cleanup:
+    cleanup_cache_pages(page_idx, rw, ret);
+    goto out;
+}
+
+/*
+ * NOTE: Currently only immutable files pass their I/O
+ * through the cache.
+ * Preparation for cached I/O requires that we locate all the file block
+ * in the page-cache and stashing those pointers.
+ * Returns the actual size of completed I/O.
+ */
+static ssize_t wait_for_cached_io(struct rw_options *old_rw, struct iovec *vec,
+        int nr_segs, size_t total_size)
+{
+    ssize_t err = 0, total_actual_io;
+    struct rw_options rw;
+
+    memcpy(&rw, old_rw, sizeof(struct rw_options));
+    if (rw.type != IO_READV) {
+        gossip_err("do_cached_readv_writev: writes are not handled yet!\n");
+        return -EOPNOTSUPP;
+    }
+    /* (Al)locate all the pages in the pagecache first */
+    if ((err = locate_file_pages(&rw, total_size)) < 0) {
+        gossip_err("do_cached_readv_writev: error in locating pages %d\n", err);
+        return err;
+    }
+    /* Issue and wait for I/O only for pages that are not uptodate 
+     * or are not found in the cache 
+     */
+    if (rw.dest.pages.nr_issue_pages) {
+        struct iovec *uncached_vec;
+        struct xtvec *uncached_xtvec;
+        int i;
+
+        uncached_vec = (struct iovec *) kmalloc(rw.dest.pages.nr_issue_pages * sizeof(struct iovec *), PVFS2_BUFMAP_GFP_FLAGS);
+        if (!uncached_vec) {
+            gossip_err("do_cached_readv_writev: out of memory\n");
+            err = -ENOMEM;
+            goto cleanup;
+        }
+        uncached_xtvec = (struct xtvec *) kmalloc(rw.dest.pages.nr_issue_pages * sizeof(struct xtvec *), PVFS2_BUFMAP_GFP_FLAGS);
+        if (!uncached_xtvec) {
+            gossip_err("do_cached_readv_writev: out of memory\n");
+            kfree(uncached_vec);
+            err = -ENOMEM;
+            goto cleanup;
+        }
+        for (i = 0; i < rw.dest.pages.nr_issue_pages; i++) {
+            uncached_vec[i].iov_base = rw.dest.pages.issue_pages[i];
+            uncached_vec[i].iov_len = PAGE_CACHE_SIZE;
+            uncached_xtvec[i].xtv_off = (rw.dest.pages.issue_pages[i]->index << PAGE_CACHE_SHIFT);
+            uncached_xtvec[i].xtv_len = PAGE_CACHE_SIZE;
+        }
+        err = wait_for_iox(&rw, 
+                uncached_vec, rw.dest.pages.nr_issue_pages, 
+                uncached_xtvec, rw.dest.pages.nr_issue_pages, 
+                (rw.dest.pages.nr_issue_pages << PAGE_CACHE_SHIFT));
+        kfree(uncached_xtvec);
+        kfree(uncached_vec);
+        if (err < 0) {
+            gossip_err("do_cached_readv_writev: wait_for_iox failed with error %ld\n", (long) err);
+            goto cleanup;
+        }
+    }
+    /* total I/O size = uncached I/O + cached I/O sizes */
+    total_actual_io = err + ((rw.dest.pages.nr_pages - rw.dest.pages.nr_issue_pages) << PAGE_CACHE_SHIFT);
+    /* Copy the data from the page-cache to the application's address space */
+    err = pagecache_read_actor(&rw, vec, nr_segs, total_actual_io);
+cleanup:
+    cleanup_cache_pages(rw.dest.pages.nr_pages, &rw, err);
+    return err == 0 ? total_actual_io : err;
+}
+
 /*
  * Common entry point for read/write/readv/writev
  */
@@ -550,6 +912,7 @@
         ret = 0;
         goto out;
     }
+    rw->count = count;
     /*
      * if the total size of data transfer requested is greater than
      * the kernel-set blocksize of PVFS2, then we split the iovecs
@@ -619,8 +982,14 @@
         /* how much to transfer in this loop iteration */
         each_count = (((count - total_count) > pvfs_bufmap_size_query()) ?
                       pvfs_bufmap_size_query() : (count - total_count));
-        /* and push the I/O through */
-        ret = wait_for_io(rw, ptr, seg_array[seg], each_count);
+        /* if a file is immutable, make sure its I/O is staged through the cache */
+        if (IS_IMMUTABLE(rw->inode)) {
+            ret = wait_for_cached_io(rw, ptr, seg_array[seg], each_count);
+        }
+        else {
+            /* push the I/O through */
+            ret = wait_for_io(rw, ptr, seg_array[seg], each_count);
+        }
         if (ret < 0)
         {
             goto out;
@@ -728,13 +1097,12 @@
     if (IS_IMMUTABLE(rw.inode)) 
     {
         rw.readahead_size = (rw.inode)->i_size;
-        return generic_file_read(file, buf, count, offset);
     }
     else 
     {
         rw.readahead_size = 0;
-        return do_direct_readv_writev(&rw);
     }
+    return do_direct_readv_writev(&rw);
 }
 
 /** Write data from a contiguous user buffer into a file at a specified
@@ -1067,34 +1435,17 @@
     }
     gossip_debug(GOSSIP_FILE_DEBUG, "%s: copy_to_user %d nr_segs %d, "
             "xtnr_segs: %d "
-            "total_size: %zd\n",
+            "total_size: %zd "
+            "copy_dst_type %d\n",
             rw->fnstr, rw->copy_to_user, 
             nr_segs, xtnr_segs,
-            total_size);
+            total_size, rw->copy_dest_type);
 
-    if (rw->type == IO_WRITEX)
-    {
-        /* copy data from application by pulling it out
-         * of the iovec.
-         */
-        if (rw->copy_to_user)
-        {
-            ret = pvfs_bufmap_copy_iovec_from_user(
-                    buffer_index, vec, nr_segs, total_size);
-        }
-        else {
-            ret = pvfs_bufmap_copy_iovec_from_kernel(
-                    buffer_index, vec, nr_segs, total_size);
-        }
-        if (ret < 0)
-        {
-            gossip_lerr("%s: failed to copy-in user buffer. Please make sure "
-                    " that the pvfs2-client is running. %ld\n",
-                    rw->fnstr, (long) ret);
-            goto out;
-        }
+    /* Stage 1: Copy in buffers */
+    if ((ret = precopy_buffers(buffer_index, rw, vec, nr_segs, total_size)) < 0) {
+        goto out;
     }
-    /* whew! finally service this operation */
+    /* Stage 2: whew! finally service this operation */
     ret = service_operation(new_op, rw->fnstr,
             get_interruptible_flag(rw->inode));
     if (ret < 0)
@@ -1128,34 +1479,14 @@
     }
     gossip_debug(GOSSIP_FILE_DEBUG, "downcall returned %lld\n",
             llu(new_op->downcall.resp.iox.amt_complete));
-    if (rw->type == IO_READX)
-    {
-        /* copy data to application by pushing it out to the iovec.
-         */
-        if (new_op->downcall.resp.iox.amt_complete)
-        {
-            if (rw->copy_to_user)
-            {
-                ret = pvfs_bufmap_copy_to_user_iovec(buffer_index, vec,
-                        nr_segs, new_op->downcall.resp.iox.amt_complete);
-            }
-            else
-            {
-                ret = pvfs_bufmap_copy_to_kernel_iovec(buffer_index, vec, 
-                        nr_segs, new_op->downcall.resp.iox.amt_complete);
-            }
-            if (ret < 0)
-            {
-                gossip_lerr("%s: failed to copy-out user buffers. Please make sure "
-                        " that the pvfs2-client is running. (%ld)\n", 
-                        rw->fnstr, (long) ret);
-                /* put error codes in downcall so that handle_io_error()
-                 * preserves it properly */
-                new_op->downcall.status = ret;
-                handle_io_error();
-                goto out;
-            }
-        }
+    /* Stage 3: Post copy buffers */
+    if ((ret = postcopy_buffers(buffer_index, rw, vec, nr_segs, 
+                    new_op->downcall.resp.iox.amt_complete)) < 0) {
+        /* put error codes in downcall so that handle_io_error()
+         * preserves it properly */
+        new_op->downcall.status = ret;
+        handle_io_error();
+        goto out;
     }
     ret = new_op->downcall.resp.iox.amt_complete;
     gossip_debug(GOSSIP_FILE_DEBUG, "wait_for_iox returning %ld\n", (long) ret);
Index: src/kernel/linux-2.6/pvfs2-bufmap.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/kernel/linux-2.6/pvfs2-bufmap.c,v
retrieving revision 1.46
diff -u -r1.46 pvfs2-bufmap.c
--- src/kernel/linux-2.6/pvfs2-bufmap.c 28 Sep 2006 05:13:41 -0000      1.46
+++ src/kernel/linux-2.6/pvfs2-bufmap.c 30 Oct 2006 06:54:03 -0000
@@ -472,6 +472,120 @@
     return 0;
 }
 
+/*
+ * pvfs_bufmap_copy_to_pages() 
+ * Copies data from client-core's address space to the specified pages (typically page-cache pages)
+ * for a specified size and number of pages.
+ */
+int pvfs_bufmap_copy_to_pages(int buffer_index, const struct iovec *vec, 
+        unsigned long nr_segs, size_t size)
+{
+    size_t amt_copied = 0, amt_remaining = 0, cur_copy_size = 0;
+    int index = 0;
+    void *from_kaddr, *to_kaddr;
+    struct pvfs_bufmap_desc *from = &desc_array[buffer_index];
+    struct page *page;
+
+    gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_to_pages: nr_pages %lu,"
+            "index %d, size %zd\n", nr_segs, buffer_index, size);
+
+    if (bufmap_init == 0)
+    {
+        gossip_err("pvfs_bufmap_copy_to_pages: not yet "
+                    "initialized.\n");
+        gossip_err("pvfs2: please confirm that pvfs2-client daemon is running.\n");
+        return -EIO;
+    }
+
+    while (amt_copied < size)
+    {
+        if (index >= nr_segs) 
+        {
+            gossip_err("pvfs_bufmap_copy_to_pages: count cannot exceed number of"
+                    "pages(%lu)\n", nr_segs);
+            return -EIO;
+        }
+        page = (struct page *) vec[index].iov_base;
+        if (page == NULL) {
+            gossip_err("pvfs_bufmap_copy_to_pages: invalid page pointer\n");
+            return -EIO;
+        }
+        amt_remaining = (size - amt_copied);
+        cur_copy_size =
+            ((amt_remaining > PAGE_SIZE) ? PAGE_SIZE : amt_remaining);
+
+        from_kaddr = pvfs2_kmap(from->page_array[index]);
+        to_kaddr = pvfs2_kmap(page);
+        memcpy(to_kaddr, from_kaddr, cur_copy_size);
+        /* zero out remaining page */
+        if (cur_copy_size < PAGE_SIZE) {
+            memset(to_kaddr + cur_copy_size, 0, PAGE_SIZE - cur_copy_size);
+        }
+        pvfs2_kunmap(page);
+        pvfs2_kunmap(from->page_array[index]);
+
+        amt_copied += cur_copy_size;
+        index++;
+    }
+    return 0;
+}
+
+/*
+ * pvfs_bufmap_copy_from_pages() 
+ * Copies data to client-core's address space from the specified target 
+ * pages (typically the kernel's page-cache)
+ * for a given size and number of pages.
+ * NOTE: iovec is expected to store pointers to struct page
+ */
+int pvfs_bufmap_copy_from_pages(int buffer_index, const struct iovec *vec, 
+        unsigned long nr_segs, size_t size)
+{
+    size_t amt_copied = 0, amt_remaining = 0, cur_copy_size = 0;
+    int index = 0;
+    void *from_kaddr, *to_kaddr;
+    struct pvfs_bufmap_desc *to = &desc_array[buffer_index];
+    struct page *page;
+
+    gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_from_pages: nr_pages %lu,"
+            "index %d, size %zd\n", nr_segs, buffer_index, size);
+
+    if (bufmap_init == 0)
+    {
+        gossip_err("pvfs_bufmap_copy_from_pages: not yet "
+                    "initialized.\n");
+        gossip_err("pvfs2: please confirm that pvfs2-client daemon is running.\n");
+        return -EIO;
+    }
+
+    while (amt_copied < size)
+    {
+        if (index >= nr_segs) {
+            gossip_err("pvfs_bufmap_copy_from_pages: count cannot exceed number of"
+                    "pages(%lu)\n", nr_segs);
+            return -EIO;
+        }
+        page = (struct page *) vec[index].iov_base;
+        if (page == NULL) {
+            gossip_err("pvfs_bufmap_copy_from_pages: invalid page pointer\n");
+            return -EIO;
+        }
+        amt_remaining = (size - amt_copied);
+        cur_copy_size =
+            ((amt_remaining > PAGE_SIZE) ? PAGE_SIZE : amt_remaining);
+
+        to_kaddr = pvfs2_kmap(to->page_array[index]);
+        from_kaddr = pvfs2_kmap(page);
+        memcpy(to_kaddr, from_kaddr, cur_copy_size);
+        pvfs2_kunmap(page);
+        pvfs2_kunmap(to->page_array[index]);
+
+        amt_copied += cur_copy_size;
+        index++;
+    }
+    return 0;
+}
+
+
 /* pvfs_bufmap_copy_iovec_from_user()
  *
  * copies data from several user space address's in an iovec
Index: src/kernel/linux-2.6/pvfs2-bufmap.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/kernel/linux-2.6/pvfs2-bufmap.h,v
retrieving revision 1.16
diff -u -r1.16 pvfs2-bufmap.h
--- src/kernel/linux-2.6/pvfs2-bufmap.h 28 Sep 2006 05:13:41 -0000      1.16
+++ src/kernel/linux-2.6/pvfs2-bufmap.h 30 Oct 2006 06:54:03 -0000
@@ -77,6 +77,18 @@
     int buffer_index,
     size_t size);
 
+int pvfs_bufmap_copy_to_pages(
+    int buffer_index, 
+    const struct iovec *vec, 
+    unsigned long nr_segs, 
+    size_t size);
+
+int pvfs_bufmap_copy_from_pages(
+    int buffer_index, 
+    const struct iovec *vec, 
+    unsigned long nr_segs, 
+    size_t size);
+
 #ifdef HAVE_AIO_VFS_SUPPORT
 size_t pvfs_bufmap_copy_to_user_task(
         struct task_struct *tsk,
_______________________________________________
Pvfs2-developers mailing list
[email protected]
http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers

Reply via email to