flow-proto-tuning.patch:
-----------
This patch adds "FlowBufferSizeBytes" and "FlowBuffersPerFlow" options to the configuration file format. They allow you to specify the buffer size that the default flow protocol will use as well as the maximum number of buffers to use per flow. Note that if you change either of these parameters, then you need to remount any active clients so that they pick up the configuration change before performing any I/O.

max-aio.patch:
----------
This patch adds "TroveMaxConcurrentIO" to the configuration file format. It allows you to specify the maximum number of I/O operations that trove will allow to proceed concurrently (the default is 16). As noted in the previous email regarding AIO, depending on your access pattern, AIO may queue all of your operations anyway regardless of this setting. It probably doesn't have much effect unless you are accessing more than one file at a time, or unless you are using an alternative to the stock AIO implementation.

As a side note, I generally am not including any pvfs2-genconfig changes with these types of patches, but I can make them available if anyone is interested. We typically add command line arguments to pvfs2-genconfig for every parameter so that we can reproduce consistent configuration files by calling pvfs2-genconfig from a script, but I don't know if this is generally useful or just adds clutter :)

-Phil
diff -Naur pvfs2/src/client/sysint/sys-io.sm pvfs2-new/src/client/sysint/sys-io.sm
--- pvfs2/src/client/sysint/sys-io.sm	2006-06-16 19:22:47.000000000 +0200
+++ pvfs2-new/src/client/sysint/sys-io.sm	2006-08-02 23:26:24.000000000 +0200
@@ -1639,6 +1639,7 @@
     PVFS_object_attr *attr = NULL;
     struct server_configuration_s *server_config = NULL;
     unsigned long status_user_tag = 0;
+    struct filesystem_configuration_s * fs_config;
 
     gossip_debug(GOSSIP_IO_DEBUG, "- build_context_flow called\n");
 
@@ -1703,7 +1704,16 @@
 
     status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);
 
-    server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
+    server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);    
+
+    fs_config = PINT_config_find_fs_id(server_config, cur_ctx->msg.fs_id);
+    if(fs_config)
+    {
+        /* pick up any buffer settings overrides from fs conf */
+        cur_ctx->flow_desc.buffer_size = fs_config->fp_buffer_size;
+        cur_ctx->flow_desc.buffers_per_flow = fs_config->fp_buffers_per_flow;
+    }
+
     ret = job_flow(
         &cur_ctx->flow_desc, sm_p, status_user_tag,
         &cur_ctx->flow_status, &cur_ctx->flow_job_id,
diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/common/misc/server-config.c
--- pvfs2/src/common/misc/server-config.c	2006-08-02 17:13:00.000000000 +0200
+++ pvfs2-new/src/common/misc/server-config.c	2006-08-02 23:29:17.000000000 +0200
@@ -67,6 +67,8 @@
 static DOTCONF_CB(get_bmi_module_list);
 static DOTCONF_CB(get_flow_module_list);
 static DOTCONF_CB(get_handle_recycle_timeout_seconds);
+static DOTCONF_CB(get_flow_buffer_size_bytes);
+static DOTCONF_CB(get_flow_buffers_per_flow);
 static DOTCONF_CB(get_attr_cache_keywords_list);
 static DOTCONF_CB(get_attr_cache_size);
 static DOTCONF_CB(get_attr_cache_max_num_elems);
@@ -573,6 +575,14 @@
     {"FlowModules",ARG_LIST, get_flow_module_list,NULL,
         CTX_DEFAULTS|CTX_GLOBAL,"flowproto_multiqueue,"},
 
+    /* buffer size to use for bulk data transfers */
+    {"FlowBufferSizeBytes", ARG_INT,
+         get_flow_buffer_size_bytes, NULL, CTX_FILESYSTEM,"262144"},
+
+    /* number of buffers to use for bulk data transfers */
+    {"FlowBuffersPerFlow", ARG_INT,
+         get_flow_buffers_per_flow, NULL, CTX_FILESYSTEM,"8"},
+
     /* The TROVE storage layer has a management component that deals with
      * allocating handle values for new metafiles and datafiles.  The underlying
      * trove module can be given a hint to tell it how long to wait before
@@ -979,6 +989,8 @@
     fs_conf->encoding = ENCODING_DEFAULT;
     fs_conf->trove_sync_meta = TROVE_SYNC;
     fs_conf->trove_sync_data = TROVE_SYNC;
+    fs_conf->fp_buffer_size = -1;
+    fs_conf->fp_buffers_per_flow = -1;
 
     if (!config_s->file_systems)
     {
@@ -1382,6 +1394,31 @@
 }
 
 
+DOTCONF_CB(get_flow_buffer_size_bytes)
+{
+    struct filesystem_configuration_s *fs_conf = NULL;
+    struct server_configuration_s *config_s = 
+        (struct server_configuration_s *)cmd->context;
+
+    fs_conf = (struct filesystem_configuration_s *)
+        PINT_llist_head(config_s->file_systems);
+    fs_conf->fp_buffer_size = cmd->data.value;
+    return NULL;
+}
+
+DOTCONF_CB(get_flow_buffers_per_flow)
+{
+    struct filesystem_configuration_s *fs_conf = NULL;
+    struct server_configuration_s *config_s = 
+        (struct server_configuration_s *)cmd->context;
+
+    fs_conf = (struct filesystem_configuration_s *)
+        PINT_llist_head(config_s->file_systems);
+    fs_conf->fp_buffers_per_flow = cmd->data.value;
+
+    return NULL;
+}
+
 DOTCONF_CB(get_attr_cache_keywords_list)
 {
     int i = 0, len = 0;
@@ -2298,6 +2335,9 @@
             src_fs->attr_cache_max_num_elems;
         dest_fs->trove_sync_meta = src_fs->trove_sync_meta;
         dest_fs->trove_sync_data = src_fs->trove_sync_data;
+
+        dest_fs->fp_buffer_size = src_fs->fp_buffer_size;
+        dest_fs->fp_buffers_per_flow = src_fs->fp_buffers_per_flow;
     }
 }
 
diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/common/misc/server-config.h
--- pvfs2/src/common/misc/server-config.h	2006-07-13 07:11:40.000000000 +0200
+++ pvfs2-new/src/common/misc/server-config.h	2006-08-02 23:28:17.000000000 +0200
@@ -85,6 +85,9 @@
     int coalescing_high_watermark;
     int coalescing_low_watermark;
 
+    int fp_buffer_size;
+    int fp_buffers_per_flow;
+
 } filesystem_configuration_s;
 
 typedef struct distribution_param_configuration_s
diff -Naur pvfs2/src/io/flow/flow.c pvfs2-new/src/io/flow/flow.c
--- pvfs2/src/io/flow/flow.c	2006-06-28 23:03:08.000000000 +0200
+++ pvfs2-new/src/io/flow/flow.c	2006-08-02 23:26:24.000000000 +0200
@@ -306,6 +306,8 @@
     flow_d->aggregate_size = -1;
     flow_d->state = FLOW_INITIAL;
     flow_d->type = FLOWPROTO_DEFAULT;
+    flow_d->buffers_per_flow = -1;
+    flow_d->buffer_size = -1;
 
     flow_d->flow_mutex = (tmp_mutex ? tmp_mutex : gen_mutex_build());
     assert(flow_d->flow_mutex);
diff -Naur pvfs2/src/io/flow/flow.h pvfs2-new/src/io/flow/flow.h
--- pvfs2/src/io/flow/flow.h	2005-12-14 22:50:25.000000000 +0100
+++ pvfs2-new/src/io/flow/flow.h	2006-08-02 23:26:24.000000000 +0200
@@ -120,6 +120,10 @@
     /* information about the datafile that this flow will access */
     PINT_request_file_data file_data;
 
+    /* the buffer settings may be ignored by some protocols */
+    int buffer_size;            /* buffer size to use */
+    int buffers_per_flow;       /* number of buffers to allow per flow */
+
 	/***********************************************************/
     /* fields that can be read publicly upon completion */
 
diff -Naur pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c
--- pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c	2006-05-28 18:52:08.000000000 +0200
+++ pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c	2006-08-02 23:26:24.000000000 +0200
@@ -22,8 +22,12 @@
 #include "pint-perf-counter.h"
 #include "pvfs2-internal.h"
 
+/* the following buffer settings are used by default if none are specified in
+ * the flow descriptor
+ */
 #define BUFFERS_PER_FLOW 8
 #define BUFFER_SIZE (256*1024)
+
 #define MAX_REGIONS 64
 
 #define FLOW_CLEANUP(__flow_data)                                     \
@@ -72,7 +76,7 @@
 struct fp_private_data
 {
     flow_descriptor *parent;
-    struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
+    struct fp_queue_item* prealloc_array;
     struct qlist_head list_link;
     PVFS_size total_bytes_processed;
     int next_seq;
@@ -540,7 +544,21 @@
             PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
     }
 
-    for(i=0; i<BUFFERS_PER_FLOW; i++)
+    if(flow_d->buffer_size < 1)
+        flow_d->buffer_size = BUFFER_SIZE;
+    if(flow_d->buffers_per_flow < 1)
+        flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
+        
+    flow_data->prealloc_array = (struct fp_queue_item*)
+        malloc(flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    if(!flow_data->prealloc_array)
+    {
+        free(flow_data);
+        return(-PVFS_ENOMEM);
+    }
+    memset(flow_data->prealloc_array, 0,
+        flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    for(i=0; i<flow_d->buffers_per_flow; i++)
     {
         flow_data->prealloc_array[i].parent = flow_d;
         flow_data->prealloc_array[i].bmi_callback.data = 
@@ -557,7 +575,7 @@
         /* put all of the buffers on empty list, we don't really do any
          * queueing for this type of flow
          */
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -583,7 +601,7 @@
         /* put all of the buffers on empty list, we don't really do any
          * queueing for this type of flow
          */
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -604,9 +622,9 @@
     else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
         flow_d->dest.endpoint_id == BMI_ENDPOINT)
     {
-        flow_data->initial_posts = BUFFERS_PER_FLOW;
+        flow_data->initial_posts = flow_d->buffers_per_flow;
         gen_mutex_lock(flow_data->parent->flow_mutex);
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
                 "flowproto-multiqueue forcing bmi_send_callback_fn.\n");
@@ -635,7 +653,7 @@
         flow_data->initial_posts = 1;
 
         /* place remaining buffers on "empty" queue */
-        for(i=1; i<BUFFERS_PER_FLOW; i++)
+        for(i=1; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -760,7 +778,7 @@
         {
             /* if the q_item has not been used, allocate a buffer */
             q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
-                BUFFER_SIZE, BMI_RECV);
+                q_item->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(q_item->buffer);
             q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -784,7 +802,8 @@
                 result_tmp->offset_list;
             result_tmp->result.size_array = 
                 result_tmp->size_list;
-            result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+            result_tmp->result.bytemax = flow_data->parent->buffer_size - 
+                bytes_processed;
             result_tmp->result.bytes = 0;
             result_tmp->result.segmax = MAX_REGIONS;
             result_tmp->result.segs = 0;
@@ -813,10 +832,10 @@
                 tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
                 bytes_processed += old_result_tmp->result.bytes;
             }
-        }while(bytes_processed < BUFFER_SIZE && 
+        }while(bytes_processed < flow_data->parent->buffer_size && 
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
         if(bytes_processed == 0)
         {        
             qlist_del(&q_item->list_link);
@@ -830,7 +849,7 @@
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
             q_item->buffer,
-            BUFFER_SIZE,
+            flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
@@ -1065,7 +1084,7 @@
     {
         /* if the q_item has not been used, allocate a buffer */
         q_item->buffer = BMI_memalloc(q_item->parent->dest.u.bmi.address,
-            BUFFER_SIZE, BMI_SEND);
+            q_item->parent->buffer_size, BMI_SEND);
         /* TODO: error handling */
         assert(q_item->buffer);
         q_item->bmi_callback.fn = bmi_send_callback_wrapper;
@@ -1093,7 +1112,8 @@
             result_tmp->offset_list;
         result_tmp->result.size_array = 
             result_tmp->size_list;
-        result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+        result_tmp->result.bytemax = q_item->parent->buffer_size 
+            - bytes_processed;
         result_tmp->result.bytes = 0;
         result_tmp->result.segmax = MAX_REGIONS;
         result_tmp->result.segs = 0;
@@ -1125,10 +1145,10 @@
             q_item->buffer_used += old_result_tmp->result.bytes;
         }
 
-    }while(bytes_processed < BUFFER_SIZE && 
+    }while(bytes_processed < flow_data->parent->buffer_size && 
         !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-    assert(bytes_processed <= BUFFER_SIZE);
+    assert(bytes_processed <= flow_data->parent->buffer_size);
 
     /* important to update the sequence /after/ request processed */
     q_item->seq = flow_data->next_seq;
@@ -1328,7 +1348,7 @@
     {
         /* if the q_item has not been used, allocate a buffer */
         q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
-            BUFFER_SIZE, BMI_RECV);
+            q_item->parent->buffer_size, BMI_RECV);
         /* TODO: error handling */
         assert(q_item->buffer);
         q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -1360,7 +1380,8 @@
                 result_tmp->offset_list;
             result_tmp->result.size_array = 
                 result_tmp->size_list;
-            result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+            result_tmp->result.bytemax = flow_data->parent->buffer_size 
+                - bytes_processed;
             result_tmp->result.bytes = 0;
             result_tmp->result.segmax = MAX_REGIONS;
             result_tmp->result.segs = 0;
@@ -1391,10 +1412,10 @@
                     ((char*)tmp_buffer + old_result_tmp->result.bytes);
                 bytes_processed += old_result_tmp->result.bytes;
             }
-        }while(bytes_processed < BUFFER_SIZE && 
+        }while(bytes_processed < flow_data->parent->buffer_size && 
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
  
         flow_data->total_bytes_processed += bytes_processed;
 
@@ -1414,7 +1435,7 @@
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
             q_item->buffer,
-            BUFFER_SIZE,
+            flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
@@ -1458,13 +1479,13 @@
     if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
         flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
     {
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
         {
             if(flow_data->prealloc_array[i].buffer)
             {
                 BMI_memfree(flow_data->parent->src.u.bmi.address,
                     flow_data->prealloc_array[i].buffer,
-                    BUFFER_SIZE,
+                    flow_data->parent->buffer_size,
                     BMI_RECV);
             }
             result_tmp = &(flow_data->prealloc_array[i].result_chain);
@@ -1481,13 +1502,13 @@
     else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
         flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
     {
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
         {
             if(flow_data->prealloc_array[i].buffer)
             {
                 BMI_memfree(flow_data->parent->dest.u.bmi.address,
                     flow_data->prealloc_array[i].buffer,
-                    BUFFER_SIZE,
+                    flow_data->parent->buffer_size,
                     BMI_SEND);
             }
             result_tmp = &(flow_data->prealloc_array[i].result_chain);
@@ -1507,7 +1528,7 @@
         if(flow_data->intermediate)
         {
             BMI_memfree(flow_data->parent->dest.u.bmi.address,
-                flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
+                flow_data->intermediate, flow_data->parent->buffer_size, BMI_SEND);
         }
     }
     else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
@@ -1516,9 +1537,11 @@
         if(flow_data->intermediate)
         {
             BMI_memfree(flow_data->parent->src.u.bmi.address,
-                flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
+                flow_data->intermediate, flow_data->parent->buffer_size, BMI_RECV);
         }
     }
+
+    free(flow_data->prealloc_array);
 }
 
 /* mem_to_bmi_callback()
@@ -1574,7 +1597,7 @@
         q_item->result_chain.offset_list;
     q_item->result_chain.result.size_array = 
         q_item->result_chain.size_list;
-    q_item->result_chain.result.bytemax = BUFFER_SIZE;
+    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
     q_item->result_chain.result.bytes = 0;
     q_item->result_chain.result.segmax = MAX_REGIONS;
     q_item->result_chain.result.segs = 0;
@@ -1590,14 +1613,14 @@
 
     /* was MAX_REGIONS enough to satisfy this step? */
     if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
-        q_item->result_chain.result.bytes < BUFFER_SIZE)
+        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
     {
         /* create an intermediate buffer */
         if(!flow_data->intermediate)
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->dest.u.bmi.address,
-                BUFFER_SIZE, BMI_SEND);
+                flow_data->parent->buffer_size, BMI_SEND);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
@@ -1615,7 +1638,7 @@
         do
         {
             q_item->result_chain.result.bytemax =
-                (BUFFER_SIZE - bytes_processed);
+                (flow_data->parent->buffer_size - bytes_processed);
             q_item->result_chain.result.bytes = 0;
             q_item->result_chain.result.segmax = MAX_REGIONS;
             q_item->result_chain.result.segs = 0;
@@ -1638,10 +1661,10 @@
                 memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
                 bytes_processed += q_item->result_chain.size_list[i];
             }
-        }while(bytes_processed < BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert (bytes_processed <= BUFFER_SIZE);
+        assert (bytes_processed <= flow_data->parent->buffer_size);
 
         /* setup for BMI operation */
         flow_data->tmp_buffer_list[0] = flow_data->intermediate;
@@ -1761,7 +1784,7 @@
         do
         {
             q_item->result_chain.result.bytemax =
-                (BUFFER_SIZE - bytes_processed);
+                (q_item->parent->buffer_size - bytes_processed);
             q_item->result_chain.result.bytes = 0;
             q_item->result_chain.result.segmax = MAX_REGIONS;
             q_item->result_chain.result.segs = 0;
@@ -1785,10 +1808,10 @@
                 memcpy(dest_ptr, src_ptr, region_size);
                 bytes_processed += region_size;
             }
-        }while(bytes_processed < BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
     }
 
     /* are we done? */
@@ -1807,7 +1830,7 @@
         q_item->result_chain.offset_list;
     q_item->result_chain.result.size_array = 
         q_item->result_chain.size_list;
-    q_item->result_chain.result.bytemax = BUFFER_SIZE;
+    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
     q_item->result_chain.result.bytes = 0;
     q_item->result_chain.result.segmax = MAX_REGIONS;
     q_item->result_chain.result.segs = 0;
@@ -1822,22 +1845,22 @@
 
     /* was MAX_REGIONS enough to satisfy this step? */
     if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
-        q_item->result_chain.result.bytes < BUFFER_SIZE)
+        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
     {
         /* create an intermediate buffer */
         if(!flow_data->intermediate)
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->src.u.bmi.address,
-                BUFFER_SIZE, BMI_RECV);
+                flow_data->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
         /* setup for BMI operation */
         flow_data->tmp_buffer_list[0] = flow_data->intermediate;
         buffer_type = BMI_PRE_ALLOC;
-        q_item->buffer_used = BUFFER_SIZE;
-        total_size = BUFFER_SIZE;
+        q_item->buffer_used = flow_data->parent->buffer_size;
+        total_size = flow_data->parent->buffer_size;
         size_array = &q_item->buffer_used;
         segs = 1;
         /* we will copy data out on next iteration */
diff -Naur pvfs2/src/server/io.sm pvfs2-new/src/server/io.sm
--- pvfs2/src/server/io.sm	2006-06-05 21:57:28.000000000 +0200
+++ pvfs2-new/src/server/io.sm	2006-08-02 23:27:24.000000000 +0200
@@ -162,6 +162,7 @@
     int err = -PVFS_EIO;
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
+    struct filesystem_configuration_s *fs_conf;
         
     s_op->u.io.flow_d = PINT_flow_alloc();
     if (!s_op->u.io.flow_d)
@@ -201,6 +202,15 @@
     s_op->u.io.flow_d->user_ptr = NULL;
     s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
 
+    fs_conf = PINT_config_find_fs_id(user_opts, 
+        s_op->req->u.io.fs_id);
+    if(fs_conf)
+    {
+        /* pick up any buffer settings overrides from fs conf */
+        s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
+        s_op->u.io.flow_d->buffers_per_flow = fs_conf->fp_buffers_per_flow;
+    }
+
     gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, " 
         "server_nr: %d, server_ct: %d\n",
         lld(s_op->u.io.flow_d->file_data.fsize),
diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/common/misc/server-config.c
--- pvfs2/src/common/misc/server-config.c	2006-08-02 17:13:00.000000000 +0200
+++ pvfs2-new/src/common/misc/server-config.c	2006-08-03 23:10:28.000000000 +0200
@@ -73,6 +73,7 @@
 static DOTCONF_CB(get_trove_sync_meta);
 static DOTCONF_CB(get_trove_sync_data);
 static DOTCONF_CB(get_db_cache_size_bytes);
+static DOTCONF_CB(get_trove_max_concurrent_io);
 static DOTCONF_CB(get_db_cache_type);
 static DOTCONF_CB(get_param);
 static DOTCONF_CB(get_value);
@@ -423,6 +424,12 @@
     {"ID",ARG_INT, get_filesystem_collid,NULL,
         CTX_FILESYSTEM,NULL},
 
+    /* maximum number of AIO operations that Trove will allow to run
+     * concurrently 
+     */
+    {"TroveMaxConcurrentIO", ARG_INT, get_trove_max_concurrent_io, NULL,
+        CTX_DEFAULTS|CTX_GLOBAL,"16"},
+
     /* The gossip interface in pvfs allows users to specify different
      * levels of logging for the pvfs server.  The output of these
      * different log levels is written to a file, which is specified in
@@ -742,6 +749,7 @@
     config_s->client_job_flow_timeout = PVFS2_CLIENT_JOB_FLOW_TIMEOUT_DEFAULT;
     config_s->client_retry_limit = PVFS2_CLIENT_RETRY_LIMIT_DEFAULT;
     config_s->client_retry_delay_ms = PVFS2_CLIENT_RETRY_DELAY_MS_DEFAULT;
+    config_s->trove_max_concurrent_io = 16;
 
     if (cache_config_files(
             config_s, global_config_filename, server_config_filename))
@@ -1548,6 +1556,14 @@
     return NULL;
 }
 
+DOTCONF_CB(get_trove_max_concurrent_io)
+{
+    struct server_configuration_s *config_s = 
+        (struct server_configuration_s *)cmd->context;
+    config_s->trove_max_concurrent_io = cmd->data.value;
+    return NULL;
+}
+
 DOTCONF_CB(get_db_cache_type)
 {
     struct server_configuration_s *config_s =
diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/common/misc/server-config.h
--- pvfs2/src/common/misc/server-config.h	2006-07-13 07:11:40.000000000 +0200
+++ pvfs2-new/src/common/misc/server-config.h	2006-08-03 23:26:23.000000000 +0200
@@ -147,6 +147,8 @@
                                        if zero, use defaults */
     char * db_cache_type;
 
+    int trove_max_concurrent_io;    /* maximum number of simultaneous I/O ops */
+
 } server_configuration_s;
 
 int PINT_parse_config(
diff -Naur pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c pvfs2-new/src/io/trove/trove-dbpf/dbpf-bstream.c
--- pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c	2006-06-23 22:59:29.000000000 +0200
+++ pvfs2-new/src/io/trove/trove-dbpf/dbpf-bstream.c	2006-08-03 23:05:37.000000000 +0200
@@ -31,7 +31,7 @@
 
 #define AIOCB_ARRAY_SZ 64
 
-#define DBPF_MAX_IOS_IN_PROGRESS  16
+extern int TROVE_max_concurrent_io;
 static int s_dbpf_ios_in_progress = 0;
 static dbpf_op_queue_p s_dbpf_io_ready_queue = NULL;
 static gen_mutex_t s_dbpf_io_mutex = GEN_MUTEX_INITIALIZER;
@@ -277,9 +277,6 @@
                (cur_op->op.type == BSTREAM_WRITE_LIST));
         dbpf_op_queue_remove(cur_op);
 
-        assert(s_dbpf_ios_in_progress <
-               (DBPF_MAX_IOS_IN_PROGRESS + 1));
-
         gossip_debug(GOSSIP_TROVE_DEBUG, "starting delayed I/O "
                      "operation %p (%d in progress)\n", cur_op,
                      s_dbpf_ios_in_progress);
@@ -363,7 +360,7 @@
     {
         s_dbpf_ios_in_progress--;
     }
-    if (s_dbpf_ios_in_progress < DBPF_MAX_IOS_IN_PROGRESS)
+    if (s_dbpf_ios_in_progress < TROVE_max_concurrent_io)
     {
         s_dbpf_ios_in_progress++;
     }
diff -Naur pvfs2/src/io/trove/trove.c pvfs2-new/src/io/trove/trove.c
--- pvfs2/src/io/trove/trove.c	2006-06-16 23:01:13.000000000 +0200
+++ pvfs2-new/src/io/trove/trove.c	2006-08-03 23:08:08.000000000 +0200
@@ -31,6 +31,7 @@
 
 int TROVE_db_cache_size_bytes = 0;
 int TROVE_shm_key_hint = 0;
+int TROVE_max_concurrent_io = 16;
 
 /** Initiate reading from a contiguous region in a bstream into a
  *  contiguous region in memory.
@@ -964,6 +965,11 @@
         TROVE_shm_key_hint = *((int*)parameter);
 	return(0);
     }
+    if(option == TROVE_MAX_CONCURRENT_IO)
+    {
+        TROVE_max_concurrent_io = *((int*)parameter);
+	return(0);
+    }
 
     method_id = map_coll_id_to_method(coll_id);
     if (method_id < 0) {
diff -Naur pvfs2/src/io/trove/trove.h pvfs2-new/src/io/trove/trove.h
--- pvfs2/src/io/trove/trove.h	2006-07-13 07:11:41.000000000 +0200
+++ pvfs2-new/src/io/trove/trove.h	2006-08-03 23:07:37.000000000 +0200
@@ -72,6 +72,7 @@
     TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS,
     TROVE_COLLECTION_ATTR_CACHE_INITIALIZE,
     TROVE_DB_CACHE_SIZE_BYTES,
+    TROVE_MAX_CONCURRENT_IO,
     TROVE_COLLECTION_COALESCING_HIGH_WATERMARK,
     TROVE_COLLECTION_COALESCING_LOW_WATERMARK,
     TROVE_COLLECTION_META_SYNC_MODE,
diff -Naur pvfs2/src/server/pvfs2-server.c pvfs2-new/src/server/pvfs2-server.c
--- pvfs2/src/server/pvfs2-server.c	2006-07-13 07:11:42.000000000 +0200
+++ pvfs2-new/src/server/pvfs2-server.c	2006-08-03 23:07:01.000000000 +0200
@@ -950,6 +950,10 @@
                                    &server_config.db_cache_size_bytes);
     /* this should never fail */
     assert(ret == 0);
+    ret = trove_collection_setinfo(0, 0, TROVE_MAX_CONCURRENT_IO,
+        &server_config.trove_max_concurrent_io);
+    /* this should never fail */
+    assert(ret == 0);
 
     /* parse port number and allow trove to use it to help differentiate
      * shmem regions if needed
_______________________________________________
Pvfs2-developers mailing list
[email protected]
http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers

Reply via email to