On Aug 10, 2006, at 4:04 PM, Phil Carns wrote:

flow-proto-tuning.patch:
-----------
This patch adds "FlowBufferSizeBytes" and "FlowBuffersPerFlow" options to the configuration file format. They allow you to specify the buffer size that the default flow protocol will use as well as the maximum number of buffers to use per flow. Note that if you change either of these parameters, then you need to remount any active clients so that they pick up the configuration change before performing any I/O.

max-aio.patch:
----------
This patch adds "TroveMaxConcurrentIO" to the configuration file format. It allows you to specify the maximum number of I/O operations that trove will allow to proceed concurrently (currently 16). Note from the previous email regarding AIO that depending on your access pattern, AIO may queue all of your operations anyway regardless of this setting. It probably doesn't have much effect unless you are accessing more than one file at a time, or if you are using an alternative to the stock AIO implementation.


I had made the same change in Julian's branch, there are still a couple things that aren't clear to me about this max value though. First, it's a global value for all outstanding lio_listio calls the pvfs server makes, but based on your previous email comments about glibc's one-thread-per-fd oddity, it seems like we only want that value to max out per datafile. Also, after we hit the max we just queue the operations and post them once current ops have completed. If librt just queues ops and does them in FIFO order though, it's pretty much the same thing. Why not just let librt handle the queuing? If we were to do ordering of the operations based on offsets, then it would make sense for us to queue, but we don't. Are we better at queueing than librt?

I know Julian was looking at performance of aio and found results somewhere (I don't have a reference, sorry) that showed lio_listio did better in cases where multiple fds were passed to one lio_listio operation (right now we just do one fd with multiple segments to one lio_listio). I wonder if that difference is based on the glibc queuing behavior that you describe. Just a curiosity, but I wonder if the aio performance would change if we were to post multiple trove operations in the same lio_listio call, or possibly even break up the bstream into multiple files based on strip size...sounds crazy right? :-)

-sam

As a side note, I generally am not including any pvfs2-genconfig changes with these types of patches, but I can make them available if anyone is interested. We typically add command line arguments to pvfs2-genconfig for every parameter so that we can reproduce consistent configuration files by calling pvfs2-genconfig from a script, but I don't know if this is generally useful or just adds clutter :)

-Phil
diff -Naur pvfs2/src/client/sysint/sys-io.sm pvfs2-new/src/client/ sysint/sys-io.sm --- pvfs2/src/client/sysint/sys-io.sm 2006-06-16 19:22:47.000000000 +0200 +++ pvfs2-new/src/client/sysint/sys-io.sm 2006-08-02 23:26:24.000000000 +0200
@@ -1639,6 +1639,7 @@
     PVFS_object_attr *attr = NULL;
     struct server_configuration_s *server_config = NULL;
     unsigned long status_user_tag = 0;
+    struct filesystem_configuration_s * fs_config;

     gossip_debug(GOSSIP_IO_DEBUG, "- build_context_flow called\n");

@@ -1703,7 +1704,16 @@

     status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);

-    server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
+    server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
+
+    fs_config = PINT_config_find_fs_id(server_config, cur_ctx->msg.fs_id);
+    if(fs_config)
+    {
+        /* pick up any buffer settings overrides from fs conf */
+        cur_ctx->flow_desc.buffer_size = fs_config->fp_buffer_size;
+        cur_ctx->flow_desc.buffers_per_flow = fs_config->fp_buffers_per_flow;
+    }
+
     ret = job_flow(
         &cur_ctx->flow_desc, sm_p, status_user_tag,
         &cur_ctx->flow_status, &cur_ctx->flow_job_id,
diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/ common/misc/server-config.c --- pvfs2/src/common/misc/server-config.c 2006-08-02 17:13:00.000000000 +0200 +++ pvfs2-new/src/common/misc/server-config.c 2006-08-02 23:29:17.000000000 +0200
@@ -67,6 +67,8 @@
 static DOTCONF_CB(get_bmi_module_list);
 static DOTCONF_CB(get_flow_module_list);
 static DOTCONF_CB(get_handle_recycle_timeout_seconds);
+static DOTCONF_CB(get_flow_buffer_size_bytes);
+static DOTCONF_CB(get_flow_buffers_per_flow);
 static DOTCONF_CB(get_attr_cache_keywords_list);
 static DOTCONF_CB(get_attr_cache_size);
 static DOTCONF_CB(get_attr_cache_max_num_elems);
@@ -573,6 +575,14 @@
     {"FlowModules",ARG_LIST, get_flow_module_list,NULL,
         CTX_DEFAULTS|CTX_GLOBAL,"flowproto_multiqueue,"},

+    /* buffer size to use for bulk data transfers */
+    {"FlowBufferSizeBytes", ARG_INT,
+         get_flow_buffer_size_bytes, NULL, CTX_FILESYSTEM,"262144"},
+
+    /* number of buffers to use for bulk data transfers */
+    {"FlowBuffersPerFlow", ARG_INT,
+         get_flow_buffers_per_flow, NULL, CTX_FILESYSTEM,"8"},
+
/* The TROVE storage layer has a management component that deals with * allocating handle values for new metafiles and datafiles. The underlying * trove module can be given a hint to tell it how long to wait before
@@ -979,6 +989,8 @@
     fs_conf->encoding = ENCODING_DEFAULT;
     fs_conf->trove_sync_meta = TROVE_SYNC;
     fs_conf->trove_sync_data = TROVE_SYNC;
+    fs_conf->fp_buffer_size = -1;
+    fs_conf->fp_buffers_per_flow = -1;

     if (!config_s->file_systems)
     {
@@ -1382,6 +1394,31 @@
 }


+DOTCONF_CB(get_flow_buffer_size_bytes)
+{
+    struct filesystem_configuration_s *fs_conf = NULL;
+    struct server_configuration_s *config_s =
+        (struct server_configuration_s *)cmd->context;
+
+    fs_conf = (struct filesystem_configuration_s *)
+        PINT_llist_head(config_s->file_systems);
+    fs_conf->fp_buffer_size = cmd->data.value;
+    return NULL;
+}
+
+DOTCONF_CB(get_flow_buffers_per_flow)
+{
+    struct filesystem_configuration_s *fs_conf = NULL;
+    struct server_configuration_s *config_s =
+        (struct server_configuration_s *)cmd->context;
+
+    fs_conf = (struct filesystem_configuration_s *)
+        PINT_llist_head(config_s->file_systems);
+    fs_conf->fp_buffers_per_flow = cmd->data.value;
+
+    return NULL;
+}
+
 DOTCONF_CB(get_attr_cache_keywords_list)
 {
     int i = 0, len = 0;
@@ -2298,6 +2335,9 @@
             src_fs->attr_cache_max_num_elems;
         dest_fs->trove_sync_meta = src_fs->trove_sync_meta;
         dest_fs->trove_sync_data = src_fs->trove_sync_data;
+
+        dest_fs->fp_buffer_size = src_fs->fp_buffer_size;
+        dest_fs->fp_buffers_per_flow = src_fs->fp_buffers_per_flow;
     }
 }

diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/ common/misc/server-config.h --- pvfs2/src/common/misc/server-config.h 2006-07-13 07:11:40.000000000 +0200 +++ pvfs2-new/src/common/misc/server-config.h 2006-08-02 23:28:17.000000000 +0200
@@ -85,6 +85,9 @@
     int coalescing_high_watermark;
     int coalescing_low_watermark;

+    int fp_buffer_size;
+    int fp_buffers_per_flow;
+
 } filesystem_configuration_s;

 typedef struct distribution_param_configuration_s
diff -Naur pvfs2/src/io/flow/flow.c pvfs2-new/src/io/flow/flow.c
--- pvfs2/src/io/flow/flow.c    2006-06-28 23:03:08.000000000 +0200
+++ pvfs2-new/src/io/flow/flow.c        2006-08-02 23:26:24.000000000 +0200
@@ -306,6 +306,8 @@
     flow_d->aggregate_size = -1;
     flow_d->state = FLOW_INITIAL;
     flow_d->type = FLOWPROTO_DEFAULT;
+    flow_d->buffers_per_flow = -1;
+    flow_d->buffer_size = -1;

     flow_d->flow_mutex = (tmp_mutex ? tmp_mutex : gen_mutex_build());
     assert(flow_d->flow_mutex);
diff -Naur pvfs2/src/io/flow/flow.h pvfs2-new/src/io/flow/flow.h
--- pvfs2/src/io/flow/flow.h    2005-12-14 22:50:25.000000000 +0100
+++ pvfs2-new/src/io/flow/flow.h        2006-08-02 23:26:24.000000000 +0200
@@ -120,6 +120,10 @@
     /* information about the datafile that this flow will access */
     PINT_request_file_data file_data;

+    /* the buffer settings may be ignored by some protocols */
+    int buffer_size;            /* buffer size to use */
+ int buffers_per_flow; /* number of buffers to allow per flow */
+
        /***********************************************************/
     /* fields that can be read publicly upon completion */

diff -Naur pvfs2/src/io/flow/flowproto-bmi-trove/flowproto- multiqueue.c pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto- multiqueue.c --- pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c 2006-05-28 18:52:08.000000000 +0200 +++ pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto- multiqueue.c 2006-08-02 23:26:24.000000000 +0200
@@ -22,8 +22,12 @@
 #include "pint-perf-counter.h"
 #include "pvfs2-internal.h"

+/* the following buffer settings are used by default if none are specified in
+ * the flow descriptor
+ */
 #define BUFFERS_PER_FLOW 8
 #define BUFFER_SIZE (256*1024)
+
 #define MAX_REGIONS 64

#define FLOW_CLEANUP (__flow_data) \
@@ -72,7 +76,7 @@
 struct fp_private_data
 {
     flow_descriptor *parent;
-    struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
+    struct fp_queue_item* prealloc_array;
     struct qlist_head list_link;
     PVFS_size total_bytes_processed;
     int next_seq;
@@ -540,7 +544,21 @@
             PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
     }

-    for(i=0; i<BUFFERS_PER_FLOW; i++)
+    if(flow_d->buffer_size < 1)
+        flow_d->buffer_size = BUFFER_SIZE;
+    if(flow_d->buffers_per_flow < 1)
+        flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
+
+    flow_data->prealloc_array = (struct fp_queue_item*)
+ malloc(flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    if(!flow_data->prealloc_array)
+    {
+        free(flow_data);
+        return(-PVFS_ENOMEM);
+    }
+    memset(flow_data->prealloc_array, 0,
+        flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    for(i=0; i<flow_d->buffers_per_flow; i++)
     {
         flow_data->prealloc_array[i].parent = flow_d;
         flow_data->prealloc_array[i].bmi_callback.data =
@@ -557,7 +575,7 @@
/* put all of the buffers on empty list, we don't really do any
          * queueing for this type of flow
          */
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -583,7 +601,7 @@
/* put all of the buffers on empty list, we don't really do any
          * queueing for this type of flow
          */
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -604,9 +622,9 @@
     else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
         flow_d->dest.endpoint_id == BMI_ENDPOINT)
     {
-        flow_data->initial_posts = BUFFERS_PER_FLOW;
+        flow_data->initial_posts = flow_d->buffers_per_flow;
         gen_mutex_lock(flow_data->parent->flow_mutex);
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue forcing bmi_send_callback_fn. \n");
@@ -635,7 +653,7 @@
         flow_data->initial_posts = 1;

         /* place remaining buffers on "empty" queue */
-        for(i=1; i<BUFFERS_PER_FLOW; i++)
+        for(i=1; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -760,7 +778,7 @@
         {
             /* if the q_item has not been used, allocate a buffer */
q_item->buffer = BMI_memalloc(q_item->parent- >src.u.bmi.address,
-                BUFFER_SIZE, BMI_RECV);
+                q_item->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(q_item->buffer);
             q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -784,7 +802,8 @@
                 result_tmp->offset_list;
             result_tmp->result.size_array =
                 result_tmp->size_list;
-            result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+            result_tmp->result.bytemax = flow_data->parent->buffer_size -
+                bytes_processed;
             result_tmp->result.bytes = 0;
             result_tmp->result.segmax = MAX_REGIONS;
             result_tmp->result.segs = 0;
@@ -813,10 +832,10 @@
tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
                 bytes_processed += old_result_tmp->result.bytes;
             }
-        }while(bytes_processed < BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));

-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
         if(bytes_processed == 0)
         {
             qlist_del(&q_item->list_link);
@@ -830,7 +849,7 @@
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
             q_item->buffer,
-            BUFFER_SIZE,
+            flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
@@ -1065,7 +1084,7 @@
     {
         /* if the q_item has not been used, allocate a buffer */
q_item->buffer = BMI_memalloc(q_item->parent- >dest.u.bmi.address,
-            BUFFER_SIZE, BMI_SEND);
+            q_item->parent->buffer_size, BMI_SEND);
         /* TODO: error handling */
         assert(q_item->buffer);
         q_item->bmi_callback.fn = bmi_send_callback_wrapper;
@@ -1093,7 +1112,8 @@
             result_tmp->offset_list;
         result_tmp->result.size_array =
             result_tmp->size_list;
-        result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+        result_tmp->result.bytemax = q_item->parent->buffer_size
+            - bytes_processed;
         result_tmp->result.bytes = 0;
         result_tmp->result.segmax = MAX_REGIONS;
         result_tmp->result.segs = 0;
@@ -1125,10 +1145,10 @@
             q_item->buffer_used += old_result_tmp->result.bytes;
         }

-    }while(bytes_processed < BUFFER_SIZE &&
+    }while(bytes_processed < flow_data->parent->buffer_size &&
         !PINT_REQUEST_DONE(q_item->parent->file_req_state));

-    assert(bytes_processed <= BUFFER_SIZE);
+    assert(bytes_processed <= flow_data->parent->buffer_size);

     /* important to update the sequence /after/ request processed */
     q_item->seq = flow_data->next_seq;
@@ -1328,7 +1348,7 @@
     {
         /* if the q_item has not been used, allocate a buffer */
q_item->buffer = BMI_memalloc(q_item->parent- >src.u.bmi.address,
-            BUFFER_SIZE, BMI_RECV);
+            q_item->parent->buffer_size, BMI_RECV);
         /* TODO: error handling */
         assert(q_item->buffer);
         q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -1360,7 +1380,8 @@
                 result_tmp->offset_list;
             result_tmp->result.size_array =
                 result_tmp->size_list;
-                result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+                result_tmp->result.bytemax = flow_data->parent->buffer_size
+                - bytes_processed;
             result_tmp->result.bytes = 0;
             result_tmp->result.segmax = MAX_REGIONS;
             result_tmp->result.segs = 0;
@@ -1391,10 +1412,10 @@
((char*)tmp_buffer + old_result_tmp- >result.bytes);
                 bytes_processed += old_result_tmp->result.bytes;
             }
-        }while(bytes_processed < BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));

-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);

         flow_data->total_bytes_processed += bytes_processed;

@@ -1414,7 +1435,7 @@
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
             q_item->buffer,
-            BUFFER_SIZE,
+            flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
@@ -1458,13 +1479,13 @@
     if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
         flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
     {
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
         {
             if(flow_data->prealloc_array[i].buffer)
             {
                 BMI_memfree(flow_data->parent->src.u.bmi.address,
                     flow_data->prealloc_array[i].buffer,
-                    BUFFER_SIZE,
+                    flow_data->parent->buffer_size,
                     BMI_RECV);
             }
result_tmp = &(flow_data->prealloc_array [i].result_chain);
@@ -1481,13 +1502,13 @@
     else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
         flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
     {
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
         {
             if(flow_data->prealloc_array[i].buffer)
             {
                 BMI_memfree(flow_data->parent->dest.u.bmi.address,
                     flow_data->prealloc_array[i].buffer,
-                    BUFFER_SIZE,
+                    flow_data->parent->buffer_size,
                     BMI_SEND);
             }
result_tmp = &(flow_data->prealloc_array [i].result_chain);
@@ -1507,7 +1528,7 @@
         if(flow_data->intermediate)
         {
             BMI_memfree(flow_data->parent->dest.u.bmi.address,
-                flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
+                flow_data->intermediate, flow_data->parent->buffer_size, BMI_SEND);
         }
     }
     else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
@@ -1516,9 +1537,11 @@
         if(flow_data->intermediate)
         {
             BMI_memfree(flow_data->parent->src.u.bmi.address,
-                flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
+                flow_data->intermediate, flow_data->parent->buffer_size, BMI_RECV);
         }
     }
+
+    free(flow_data->prealloc_array);
 }

 /* mem_to_bmi_callback()
@@ -1574,7 +1597,7 @@
         q_item->result_chain.offset_list;
     q_item->result_chain.result.size_array =
         q_item->result_chain.size_list;
-    q_item->result_chain.result.bytemax = BUFFER_SIZE;
+    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
     q_item->result_chain.result.bytes = 0;
     q_item->result_chain.result.segmax = MAX_REGIONS;
     q_item->result_chain.result.segs = 0;
@@ -1590,14 +1613,14 @@

     /* was MAX_REGIONS enough to satisfy this step? */
     if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
-        q_item->result_chain.result.bytes < BUFFER_SIZE)
+        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
     {
         /* create an intermediate buffer */
         if(!flow_data->intermediate)
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->dest.u.bmi.address,
-                BUFFER_SIZE, BMI_SEND);
+                flow_data->parent->buffer_size, BMI_SEND);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
@@ -1615,7 +1638,7 @@
         do
         {
             q_item->result_chain.result.bytemax =
-                (BUFFER_SIZE - bytes_processed);
+                (flow_data->parent->buffer_size - bytes_processed);
             q_item->result_chain.result.bytes = 0;
             q_item->result_chain.result.segmax = MAX_REGIONS;
             q_item->result_chain.result.segs = 0;
@@ -1638,10 +1661,10 @@
                memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
                 bytes_processed += q_item->result_chain.size_list[i];
             }
-        }while(bytes_processed < BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));

-        assert (bytes_processed <= BUFFER_SIZE);
+        assert (bytes_processed <= flow_data->parent->buffer_size);

         /* setup for BMI operation */
         flow_data->tmp_buffer_list[0] = flow_data->intermediate;
@@ -1761,7 +1784,7 @@
         do
         {
             q_item->result_chain.result.bytemax =
-                (BUFFER_SIZE - bytes_processed);
+                (q_item->parent->buffer_size - bytes_processed);
             q_item->result_chain.result.bytes = 0;
             q_item->result_chain.result.segmax = MAX_REGIONS;
             q_item->result_chain.result.segs = 0;
@@ -1785,10 +1808,10 @@
                 memcpy(dest_ptr, src_ptr, region_size);
                 bytes_processed += region_size;
             }
-        }while(bytes_processed < BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));

-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
     }

     /* are we done? */
@@ -1807,7 +1830,7 @@
         q_item->result_chain.offset_list;
     q_item->result_chain.result.size_array =
         q_item->result_chain.size_list;
-    q_item->result_chain.result.bytemax = BUFFER_SIZE;
+    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
     q_item->result_chain.result.bytes = 0;
     q_item->result_chain.result.segmax = MAX_REGIONS;
     q_item->result_chain.result.segs = 0;
@@ -1822,22 +1845,22 @@

     /* was MAX_REGIONS enough to satisfy this step? */
     if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
-        q_item->result_chain.result.bytes < BUFFER_SIZE)
+        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
     {
         /* create an intermediate buffer */
         if(!flow_data->intermediate)
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->src.u.bmi.address,
-                BUFFER_SIZE, BMI_RECV);
+                flow_data->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
         /* setup for BMI operation */
         flow_data->tmp_buffer_list[0] = flow_data->intermediate;
         buffer_type = BMI_PRE_ALLOC;
-        q_item->buffer_used = BUFFER_SIZE;
-        total_size = BUFFER_SIZE;
+        q_item->buffer_used = flow_data->parent->buffer_size;
+        total_size = flow_data->parent->buffer_size;
         size_array = &q_item->buffer_used;
         segs = 1;
         /* we will copy data out on next iteration */
diff -Naur pvfs2/src/server/io.sm pvfs2-new/src/server/io.sm
--- pvfs2/src/server/io.sm      2006-06-05 21:57:28.000000000 +0200
+++ pvfs2-new/src/server/io.sm  2006-08-02 23:27:24.000000000 +0200
@@ -162,6 +162,7 @@
     int err = -PVFS_EIO;
     job_id_t tmp_id;
struct server_configuration_s *user_opts = get_server_config_struct();
+    struct filesystem_configuration_s *fs_conf;

     s_op->u.io.flow_d = PINT_flow_alloc();
     if (!s_op->u.io.flow_d)
@@ -201,6 +202,15 @@
     s_op->u.io.flow_d->user_ptr = NULL;
     s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;

+    fs_conf = PINT_config_find_fs_id(user_opts,
+        s_op->req->u.io.fs_id);
+    if(fs_conf)
+    {
+        /* pick up any buffer settings overrides from fs conf */
+        s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
+    s_op->u.io.flow_d->buffers_per_flow = fs_conf->fp_buffers_per_flow;
+    }
+
     gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, "
         "server_nr: %d, server_ct: %d\n",
         lld(s_op->u.io.flow_d->file_data.fsize),
diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/ common/misc/server-config.c --- pvfs2/src/common/misc/server-config.c 2006-08-02 17:13:00.000000000 +0200 +++ pvfs2-new/src/common/misc/server-config.c 2006-08-03 23:10:28.000000000 +0200
@@ -73,6 +73,7 @@
 static DOTCONF_CB(get_trove_sync_meta);
 static DOTCONF_CB(get_trove_sync_data);
 static DOTCONF_CB(get_db_cache_size_bytes);
+static DOTCONF_CB(get_trove_max_concurrent_io);
 static DOTCONF_CB(get_db_cache_type);
 static DOTCONF_CB(get_param);
 static DOTCONF_CB(get_value);
@@ -423,6 +424,12 @@
     {"ID",ARG_INT, get_filesystem_collid,NULL,
         CTX_FILESYSTEM,NULL},

+    /* maximum number of AIO operations that Trove will allow to run
+     * concurrently
+     */
+ {"TroveMaxConcurrentIO", ARG_INT, get_trove_max_concurrent_io, NULL,
+        CTX_DEFAULTS|CTX_GLOBAL,"16"},
+
     /* The gossip interface in pvfs allows users to specify different
      * levels of logging for the pvfs server.  The output of these
* different log levels is written to a file, which is specified in
@@ -742,6 +749,7 @@
config_s->client_job_flow_timeout = PVFS2_CLIENT_JOB_FLOW_TIMEOUT_DEFAULT;
     config_s->client_retry_limit = PVFS2_CLIENT_RETRY_LIMIT_DEFAULT;
config_s->client_retry_delay_ms = PVFS2_CLIENT_RETRY_DELAY_MS_DEFAULT;
+    config_s->trove_max_concurrent_io = 16;

     if (cache_config_files(
config_s, global_config_filename, server_config_filename))
@@ -1548,6 +1556,14 @@
     return NULL;
 }

+DOTCONF_CB(get_trove_max_concurrent_io)
+{
+    struct server_configuration_s *config_s =
+        (struct server_configuration_s *)cmd->context;
+    config_s->trove_max_concurrent_io = cmd->data.value;
+    return NULL;
+}
+
 DOTCONF_CB(get_db_cache_type)
 {
     struct server_configuration_s *config_s =
diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/ common/misc/server-config.h --- pvfs2/src/common/misc/server-config.h 2006-07-13 07:11:40.000000000 +0200 +++ pvfs2-new/src/common/misc/server-config.h 2006-08-03 23:26:23.000000000 +0200
@@ -147,6 +147,8 @@
                                        if zero, use defaults */
     char * db_cache_type;

+ int trove_max_concurrent_io; /* maximum number of simultaneous I/O ops */
+
 } server_configuration_s;

 int PINT_parse_config(
diff -Naur pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c pvfs2-new/ src/io/trove/trove-dbpf/dbpf-bstream.c --- pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c 2006-06-23 22:59:29.000000000 +0200 +++ pvfs2-new/src/io/trove/trove-dbpf/dbpf-bstream.c 2006-08-03 23:05:37.000000000 +0200
@@ -31,7 +31,7 @@

 #define AIOCB_ARRAY_SZ 64

-#define DBPF_MAX_IOS_IN_PROGRESS  16
+extern int TROVE_max_concurrent_io;
 static int s_dbpf_ios_in_progress = 0;
 static dbpf_op_queue_p s_dbpf_io_ready_queue = NULL;
 static gen_mutex_t s_dbpf_io_mutex = GEN_MUTEX_INITIALIZER;
@@ -277,9 +277,6 @@
                (cur_op->op.type == BSTREAM_WRITE_LIST));
         dbpf_op_queue_remove(cur_op);

-        assert(s_dbpf_ios_in_progress <
-               (DBPF_MAX_IOS_IN_PROGRESS + 1));
-
         gossip_debug(GOSSIP_TROVE_DEBUG, "starting delayed I/O "
                      "operation %p (%d in progress)\n", cur_op,
                      s_dbpf_ios_in_progress);
@@ -363,7 +360,7 @@
     {
         s_dbpf_ios_in_progress--;
     }
-    if (s_dbpf_ios_in_progress < DBPF_MAX_IOS_IN_PROGRESS)
+    if (s_dbpf_ios_in_progress < TROVE_max_concurrent_io)
     {
         s_dbpf_ios_in_progress++;
     }
diff -Naur pvfs2/src/io/trove/trove.c pvfs2-new/src/io/trove/trove.c
--- pvfs2/src/io/trove/trove.c  2006-06-16 23:01:13.000000000 +0200
+++ pvfs2-new/src/io/trove/trove.c      2006-08-03 23:08:08.000000000 +0200
@@ -31,6 +31,7 @@

 int TROVE_db_cache_size_bytes = 0;
 int TROVE_shm_key_hint = 0;
+int TROVE_max_concurrent_io = 16;

 /** Initiate reading from a contiguous region in a bstream into a
  *  contiguous region in memory.
@@ -964,6 +965,11 @@
         TROVE_shm_key_hint = *((int*)parameter);
        return(0);
     }
+    if(option == TROVE_MAX_CONCURRENT_IO)
+    {
+        TROVE_max_concurrent_io = *((int*)parameter);
+       return(0);
+    }

     method_id = map_coll_id_to_method(coll_id);
     if (method_id < 0) {
diff -Naur pvfs2/src/io/trove/trove.h pvfs2-new/src/io/trove/trove.h
--- pvfs2/src/io/trove/trove.h  2006-07-13 07:11:41.000000000 +0200
+++ pvfs2-new/src/io/trove/trove.h      2006-08-03 23:07:37.000000000 +0200
@@ -72,6 +72,7 @@
     TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS,
     TROVE_COLLECTION_ATTR_CACHE_INITIALIZE,
     TROVE_DB_CACHE_SIZE_BYTES,
+    TROVE_MAX_CONCURRENT_IO,
     TROVE_COLLECTION_COALESCING_HIGH_WATERMARK,
     TROVE_COLLECTION_COALESCING_LOW_WATERMARK,
     TROVE_COLLECTION_META_SYNC_MODE,
diff -Naur pvfs2/src/server/pvfs2-server.c pvfs2-new/src/server/ pvfs2-server.c --- pvfs2/src/server/pvfs2-server.c 2006-07-13 07:11:42.000000000 +0200 +++ pvfs2-new/src/server/pvfs2-server.c 2006-08-03 23:07:01.000000000 +0200
@@ -950,6 +950,10 @@
&server_config.db_cache_size_bytes);
     /* this should never fail */
     assert(ret == 0);
+    ret = trove_collection_setinfo(0, 0, TROVE_MAX_CONCURRENT_IO,
+        &server_config.trove_max_concurrent_io);
+    /* this should never fail */
+    assert(ret == 0);

/* parse port number and allow trove to use it to help differentiate
      * shmem regions if needed
_______________________________________________
Pvfs2-developers mailing list
[email protected]
http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers

_______________________________________________
Pvfs2-developers mailing list
[email protected]
http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers

Reply via email to