Hi Sam,

> I had made the same change in Julian's branch, there are still a
> couple things that aren't clear to me about this max value though.
> First, it's a global value for all outstanding lio_listio calls the
> pvfs server makes, but based on your previous email comments about
> glibc's   one-thread-per-fd oddity, it seems like we only want that
> value to max out per datafile.  Also, after we hit the max we just
> queue the operations and post them once current ops have completed.
> If librt just queues ops and does them in FIFO order though, it's
> pretty much the same thing.  Why not just let librt handle the
> queuing?  If we were to do ordering of the operations based on
> offsets, then it would make sense for us to queue, but we don't.
> Are we better at queueing than librt?

I think we could (in theory at least) do a better job than librt.
Coalescing adjacent file block requests, using the bmap ioctl() to order
I/O requests (RobR's thesis approach, although I don't know if these ioctls
work on non-ext* file systems)
etc are something that we can do if we do the queueing and dispatching
ourselves.
thanks,
Murali

>
> I know Julian was looking at performance of aio and found results
> somewhere (I don't have a reference, sorry) that showed lio_listio
> did better in cases where multiple fds were passed to one lio_listio
> operation (right now we just do one fd with multiple segments to one
> lio_listio).  I wonder if that difference is based on the glibc
> queuing behavior that you describe.  Just a curiosity, but I wonder
> if the aio performance would change if we were to post multiple trove
> operations in the same lio_listio call, or possibly even break up the
> bstream into multiple files based on strip size...sounds crazy
> right? :-)
>
> -sam
>
> > As a side note, I generally am not including any pvfs2-genconfig
> > changes with these types of patches, but I can make them available
> > if anyone is interested.  We typically add command line arguments
> > to pvfs2-genconfig for every parameter so that we can reproduce
> > consistent configuration files by calling pvfs2-genconfig from a
> > script, but I don't know if this is generally useful or just adds
> > clutter :)
> >
> > -Phil
> > diff -Naur pvfs2/src/client/sysint/sys-io.sm pvfs2-new/src/client/
> > sysint/sys-io.sm
> > --- pvfs2/src/client/sysint/sys-io.sm       2006-06-16 19:22:47.000000000
> > +0200
> > +++ pvfs2-new/src/client/sysint/sys-io.sm   2006-08-02
> > 23:26:24.000000000 +0200
> > @@ -1639,6 +1639,7 @@
> >      PVFS_object_attr *attr = NULL;
> >      struct server_configuration_s *server_config = NULL;
> >      unsigned long status_user_tag = 0;
> > +    struct filesystem_configuration_s * fs_config;
> >
> >      gossip_debug(GOSSIP_IO_DEBUG, "- build_context_flow called\n");
> >
> > @@ -1703,7 +1704,16 @@
> >
> >      status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);
> >
> > -    server_config = PINT_get_server_config_struct(sm_p-
> > >object_ref.fs_id);
> > +    server_config = PINT_get_server_config_struct(sm_p-
> > >object_ref.fs_id);
> > +
> > +    fs_config = PINT_config_find_fs_id(server_config, cur_ctx-
> > >msg.fs_id);
> > +    if(fs_config)
> > +    {
> > +        /* pick up any buffer settings overrides from fs conf */
> > +        cur_ctx->flow_desc.buffer_size = fs_config->fp_buffer_size;
> > +        cur_ctx->flow_desc.buffers_per_flow = fs_config-
> > >fp_buffers_per_flow;
> > +    }
> > +
> >      ret = job_flow(
> >          &cur_ctx->flow_desc, sm_p, status_user_tag,
> >          &cur_ctx->flow_status, &cur_ctx->flow_job_id,
> > diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/
> > common/misc/server-config.c
> > --- pvfs2/src/common/misc/server-config.c   2006-08-02
> > 17:13:00.000000000 +0200
> > +++ pvfs2-new/src/common/misc/server-config.c       2006-08-02
> > 23:29:17.000000000 +0200
> > @@ -67,6 +67,8 @@
> >  static DOTCONF_CB(get_bmi_module_list);
> >  static DOTCONF_CB(get_flow_module_list);
> >  static DOTCONF_CB(get_handle_recycle_timeout_seconds);
> > +static DOTCONF_CB(get_flow_buffer_size_bytes);
> > +static DOTCONF_CB(get_flow_buffers_per_flow);
> >  static DOTCONF_CB(get_attr_cache_keywords_list);
> >  static DOTCONF_CB(get_attr_cache_size);
> >  static DOTCONF_CB(get_attr_cache_max_num_elems);
> > @@ -573,6 +575,14 @@
> >      {"FlowModules",ARG_LIST, get_flow_module_list,NULL,
> >          CTX_DEFAULTS|CTX_GLOBAL,"flowproto_multiqueue,"},
> >
> > +    /* buffer size to use for bulk data transfers */
> > +    {"FlowBufferSizeBytes", ARG_INT,
> > +         get_flow_buffer_size_bytes, NULL, CTX_FILESYSTEM,"262144"},
> > +
> > +    /* number of buffers to use for bulk data transfers */
> > +    {"FlowBuffersPerFlow", ARG_INT,
> > +         get_flow_buffers_per_flow, NULL, CTX_FILESYSTEM,"8"},
> > +
> >      /* The TROVE storage layer has a management component that
> > deals with
> >       * allocating handle values for new metafiles and datafiles.
> > The underlying
> >       * trove module can be given a hint to tell it how long to
> > wait before
> > @@ -979,6 +989,8 @@
> >      fs_conf->encoding = ENCODING_DEFAULT;
> >      fs_conf->trove_sync_meta = TROVE_SYNC;
> >      fs_conf->trove_sync_data = TROVE_SYNC;
> > +    fs_conf->fp_buffer_size = -1;
> > +    fs_conf->fp_buffers_per_flow = -1;
> >
> >      if (!config_s->file_systems)
> >      {
> > @@ -1382,6 +1394,31 @@
> >  }
> >
> >
> > +DOTCONF_CB(get_flow_buffer_size_bytes)
> > +{
> > +    struct filesystem_configuration_s *fs_conf = NULL;
> > +    struct server_configuration_s *config_s =
> > +        (struct server_configuration_s *)cmd->context;
> > +
> > +    fs_conf = (struct filesystem_configuration_s *)
> > +        PINT_llist_head(config_s->file_systems);
> > +    fs_conf->fp_buffer_size = cmd->data.value;
> > +    return NULL;
> > +}
> > +
> > +DOTCONF_CB(get_flow_buffers_per_flow)
> > +{
> > +    struct filesystem_configuration_s *fs_conf = NULL;
> > +    struct server_configuration_s *config_s =
> > +        (struct server_configuration_s *)cmd->context;
> > +
> > +    fs_conf = (struct filesystem_configuration_s *)
> > +        PINT_llist_head(config_s->file_systems);
> > +    fs_conf->fp_buffers_per_flow = cmd->data.value;
> > +
> > +    return NULL;
> > +}
> > +
> >  DOTCONF_CB(get_attr_cache_keywords_list)
> >  {
> >      int i = 0, len = 0;
> > @@ -2298,6 +2335,9 @@
> >              src_fs->attr_cache_max_num_elems;
> >          dest_fs->trove_sync_meta = src_fs->trove_sync_meta;
> >          dest_fs->trove_sync_data = src_fs->trove_sync_data;
> > +
> > +        dest_fs->fp_buffer_size = src_fs->fp_buffer_size;
> > +        dest_fs->fp_buffers_per_flow = src_fs->fp_buffers_per_flow;
> >      }
> >  }
> >
> > diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/
> > common/misc/server-config.h
> > --- pvfs2/src/common/misc/server-config.h   2006-07-13
> > 07:11:40.000000000 +0200
> > +++ pvfs2-new/src/common/misc/server-config.h       2006-08-02
> > 23:28:17.000000000 +0200
> > @@ -85,6 +85,9 @@
> >      int coalescing_high_watermark;
> >      int coalescing_low_watermark;
> >
> > +    int fp_buffer_size;
> > +    int fp_buffers_per_flow;
> > +
> >  } filesystem_configuration_s;
> >
> >  typedef struct distribution_param_configuration_s
> > diff -Naur pvfs2/src/io/flow/flow.c pvfs2-new/src/io/flow/flow.c
> > --- pvfs2/src/io/flow/flow.c        2006-06-28 23:03:08.000000000 +0200
> > +++ pvfs2-new/src/io/flow/flow.c    2006-08-02 23:26:24.000000000 +0200
> > @@ -306,6 +306,8 @@
> >      flow_d->aggregate_size = -1;
> >      flow_d->state = FLOW_INITIAL;
> >      flow_d->type = FLOWPROTO_DEFAULT;
> > +    flow_d->buffers_per_flow = -1;
> > +    flow_d->buffer_size = -1;
> >
> >      flow_d->flow_mutex = (tmp_mutex ? tmp_mutex : gen_mutex_build());
> >      assert(flow_d->flow_mutex);
> > diff -Naur pvfs2/src/io/flow/flow.h pvfs2-new/src/io/flow/flow.h
> > --- pvfs2/src/io/flow/flow.h        2005-12-14 22:50:25.000000000 +0100
> > +++ pvfs2-new/src/io/flow/flow.h    2006-08-02 23:26:24.000000000 +0200
> > @@ -120,6 +120,10 @@
> >      /* information about the datafile that this flow will access */
> >      PINT_request_file_data file_data;
> >
> > +    /* the buffer settings may be ignored by some protocols */
> > +    int buffer_size;            /* buffer size to use */
> > +    int buffers_per_flow;       /* number of buffers to allow per
> > flow */
> > +
> >     /***********************************************************/
> >      /* fields that can be read publicly upon completion */
> >
> > diff -Naur pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-
> > multiqueue.c pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto-
> > multiqueue.c
> > --- pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c
> > 2006-05-28 18:52:08.000000000 +0200
> > +++ pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto-
> > multiqueue.c        2006-08-02 23:26:24.000000000 +0200
> > @@ -22,8 +22,12 @@
> >  #include "pint-perf-counter.h"
> >  #include "pvfs2-internal.h"
> >
> > +/* the following buffer settings are used by default if none are
> > specified in
> > + * the flow descriptor
> > + */
> >  #define BUFFERS_PER_FLOW 8
> >  #define BUFFER_SIZE (256*1024)
> > +
> >  #define MAX_REGIONS 64
> >
> >  #define FLOW_CLEANUP
> > (__flow_data)                                     \
> > @@ -72,7 +76,7 @@
> >  struct fp_private_data
> >  {
> >      flow_descriptor *parent;
> > -    struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
> > +    struct fp_queue_item* prealloc_array;
> >      struct qlist_head list_link;
> >      PVFS_size total_bytes_processed;
> >      int next_seq;
> > @@ -540,7 +544,21 @@
> >              PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
> >      }
> >
> > -    for(i=0; i<BUFFERS_PER_FLOW; i++)
> > +    if(flow_d->buffer_size < 1)
> > +        flow_d->buffer_size = BUFFER_SIZE;
> > +    if(flow_d->buffers_per_flow < 1)
> > +        flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
> > +
> > +    flow_data->prealloc_array = (struct fp_queue_item*)
> > +        malloc(flow_d->buffers_per_flow*sizeof(struct
> > fp_queue_item));
> > +    if(!flow_data->prealloc_array)
> > +    {
> > +        free(flow_data);
> > +        return(-PVFS_ENOMEM);
> > +    }
> > +    memset(flow_data->prealloc_array, 0,
> > +        flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
> > +    for(i=0; i<flow_d->buffers_per_flow; i++)
> >      {
> >          flow_data->prealloc_array[i].parent = flow_d;
> >          flow_data->prealloc_array[i].bmi_callback.data =
> > @@ -557,7 +575,7 @@
> >          /* put all of the buffers on empty list, we don't really
> > do any
> >           * queueing for this type of flow
> >           */
> > -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> > +        for(i=0; i<flow_d->buffers_per_flow; i++)
> >          {
> >              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
> >                  &flow_data->empty_list);
> > @@ -583,7 +601,7 @@
> >          /* put all of the buffers on empty list, we don't really
> > do any
> >           * queueing for this type of flow
> >           */
> > -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> > +        for(i=0; i<flow_d->buffers_per_flow; i++)
> >          {
> >              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
> >                  &flow_data->empty_list);
> > @@ -604,9 +622,9 @@
> >      else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
> >          flow_d->dest.endpoint_id == BMI_ENDPOINT)
> >      {
> > -        flow_data->initial_posts = BUFFERS_PER_FLOW;
> > +        flow_data->initial_posts = flow_d->buffers_per_flow;
> >          gen_mutex_lock(flow_data->parent->flow_mutex);
> > -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> > +        for(i=0; i<flow_d->buffers_per_flow; i++)
> >          {
> >              gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
> >                  "flowproto-multiqueue forcing bmi_send_callback_fn.
> > \n");
> > @@ -635,7 +653,7 @@
> >          flow_data->initial_posts = 1;
> >
> >          /* place remaining buffers on "empty" queue */
> > -        for(i=1; i<BUFFERS_PER_FLOW; i++)
> > +        for(i=1; i<flow_d->buffers_per_flow; i++)
> >          {
> >              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
> >                  &flow_data->empty_list);
> > @@ -760,7 +778,7 @@
> >          {
> >              /* if the q_item has not been used, allocate a buffer */
> >              q_item->buffer = BMI_memalloc(q_item->parent-
> > >src.u.bmi.address,
> > -                BUFFER_SIZE, BMI_RECV);
> > +                q_item->parent->buffer_size, BMI_RECV);
> >              /* TODO: error handling */
> >              assert(q_item->buffer);
> >              q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
> > @@ -784,7 +802,8 @@
> >                  result_tmp->offset_list;
> >              result_tmp->result.size_array =
> >                  result_tmp->size_list;
> > -            result_tmp->result.bytemax = BUFFER_SIZE -
> > bytes_processed;
> > +            result_tmp->result.bytemax = flow_data->parent-
> > >buffer_size -
> > +                bytes_processed;
> >              result_tmp->result.bytes = 0;
> >              result_tmp->result.segmax = MAX_REGIONS;
> >              result_tmp->result.segs = 0;
> > @@ -813,10 +832,10 @@
> >                  tmp_buffer = (void*)((char*)tmp_buffer +
> > old_result_tmp->result.bytes);
> >                  bytes_processed += old_result_tmp->result.bytes;
> >              }
> > -        }while(bytes_processed < BUFFER_SIZE &&
> > +        }while(bytes_processed < flow_data->parent->buffer_size &&
> >              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
> >
> > -        assert(bytes_processed <= BUFFER_SIZE);
> > +        assert(bytes_processed <= flow_data->parent->buffer_size);
> >          if(bytes_processed == 0)
> >          {
> >              qlist_del(&q_item->list_link);
> > @@ -830,7 +849,7 @@
> >          ret = BMI_post_recv(&q_item->posted_id,
> >              q_item->parent->src.u.bmi.address,
> >              q_item->buffer,
> > -            BUFFER_SIZE,
> > +            flow_data->parent->buffer_size,
> >              &tmp_actual_size,
> >              BMI_PRE_ALLOC,
> >              q_item->parent->tag,
> > @@ -1065,7 +1084,7 @@
> >      {
> >          /* if the q_item has not been used, allocate a buffer */
> >          q_item->buffer = BMI_memalloc(q_item->parent-
> > >dest.u.bmi.address,
> > -            BUFFER_SIZE, BMI_SEND);
> > +            q_item->parent->buffer_size, BMI_SEND);
> >          /* TODO: error handling */
> >          assert(q_item->buffer);
> >          q_item->bmi_callback.fn = bmi_send_callback_wrapper;
> > @@ -1093,7 +1112,8 @@
> >              result_tmp->offset_list;
> >          result_tmp->result.size_array =
> >              result_tmp->size_list;
> > -        result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
> > +        result_tmp->result.bytemax = q_item->parent->buffer_size
> > +            - bytes_processed;
> >          result_tmp->result.bytes = 0;
> >          result_tmp->result.segmax = MAX_REGIONS;
> >          result_tmp->result.segs = 0;
> > @@ -1125,10 +1145,10 @@
> >              q_item->buffer_used += old_result_tmp->result.bytes;
> >          }
> >
> > -    }while(bytes_processed < BUFFER_SIZE &&
> > +    }while(bytes_processed < flow_data->parent->buffer_size &&
> >          !PINT_REQUEST_DONE(q_item->parent->file_req_state));
> >
> > -    assert(bytes_processed <= BUFFER_SIZE);
> > +    assert(bytes_processed <= flow_data->parent->buffer_size);
> >
> >      /* important to update the sequence /after/ request processed */
> >      q_item->seq = flow_data->next_seq;
> > @@ -1328,7 +1348,7 @@
> >      {
> >          /* if the q_item has not been used, allocate a buffer */
> >          q_item->buffer = BMI_memalloc(q_item->parent-
> > >src.u.bmi.address,
> > -            BUFFER_SIZE, BMI_RECV);
> > +            q_item->parent->buffer_size, BMI_RECV);
> >          /* TODO: error handling */
> >          assert(q_item->buffer);
> >          q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
> > @@ -1360,7 +1380,8 @@
> >                  result_tmp->offset_list;
> >              result_tmp->result.size_array =
> >                  result_tmp->size_list;
> > -            result_tmp->result.bytemax = BUFFER_SIZE -
> > bytes_processed;
> > +            result_tmp->result.bytemax = flow_data->parent-
> > >buffer_size
> > +                - bytes_processed;
> >              result_tmp->result.bytes = 0;
> >              result_tmp->result.segmax = MAX_REGIONS;
> >              result_tmp->result.segs = 0;
> > @@ -1391,10 +1412,10 @@
> >                      ((char*)tmp_buffer + old_result_tmp-
> > >result.bytes);
> >                  bytes_processed += old_result_tmp->result.bytes;
> >              }
> > -        }while(bytes_processed < BUFFER_SIZE &&
> > +        }while(bytes_processed < flow_data->parent->buffer_size &&
> >              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
> >
> > -        assert(bytes_processed <= BUFFER_SIZE);
> > +        assert(bytes_processed <= flow_data->parent->buffer_size);
> >
> >          flow_data->total_bytes_processed += bytes_processed;
> >
> > @@ -1414,7 +1435,7 @@
> >          ret = BMI_post_recv(&q_item->posted_id,
> >              q_item->parent->src.u.bmi.address,
> >              q_item->buffer,
> > -            BUFFER_SIZE,
> > +            flow_data->parent->buffer_size,
> >              &tmp_actual_size,
> >              BMI_PRE_ALLOC,
> >              q_item->parent->tag,
> > @@ -1458,13 +1479,13 @@
> >      if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
> >          flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
> >      {
> > -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> > +        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
> >          {
> >              if(flow_data->prealloc_array[i].buffer)
> >              {
> >                  BMI_memfree(flow_data->parent->src.u.bmi.address,
> >                      flow_data->prealloc_array[i].buffer,
> > -                    BUFFER_SIZE,
> > +                    flow_data->parent->buffer_size,
> >                      BMI_RECV);
> >              }
> >              result_tmp = &(flow_data->prealloc_array
> > [i].result_chain);
> > @@ -1481,13 +1502,13 @@
> >      else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
> >          flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
> >      {
> > -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> > +        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
> >          {
> >              if(flow_data->prealloc_array[i].buffer)
> >              {
> >                  BMI_memfree(flow_data->parent->dest.u.bmi.address,
> >                      flow_data->prealloc_array[i].buffer,
> > -                    BUFFER_SIZE,
> > +                    flow_data->parent->buffer_size,
> >                      BMI_SEND);
> >              }
> >              result_tmp = &(flow_data->prealloc_array
> > [i].result_chain);
> > @@ -1507,7 +1528,7 @@
> >          if(flow_data->intermediate)
> >          {
> >              BMI_memfree(flow_data->parent->dest.u.bmi.address,
> > -                flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
> > +                flow_data->intermediate, flow_data->parent-
> > >buffer_size, BMI_SEND);
> >          }
> >      }
> >      else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
> > @@ -1516,9 +1537,11 @@
> >          if(flow_data->intermediate)
> >          {
> >              BMI_memfree(flow_data->parent->src.u.bmi.address,
> > -                flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
> > +                flow_data->intermediate, flow_data->parent-
> > >buffer_size, BMI_RECV);
> >          }
> >      }
> > +
> > +    free(flow_data->prealloc_array);
> >  }
> >
> >  /* mem_to_bmi_callback()
> > @@ -1574,7 +1597,7 @@
> >          q_item->result_chain.offset_list;
> >      q_item->result_chain.result.size_array =
> >          q_item->result_chain.size_list;
> > -    q_item->result_chain.result.bytemax = BUFFER_SIZE;
> > +    q_item->result_chain.result.bytemax = flow_data->parent-
> > >buffer_size;
> >      q_item->result_chain.result.bytes = 0;
> >      q_item->result_chain.result.segmax = MAX_REGIONS;
> >      q_item->result_chain.result.segs = 0;
> > @@ -1590,14 +1613,14 @@
> >
> >      /* was MAX_REGIONS enough to satisfy this step? */
> >      if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
> > -        q_item->result_chain.result.bytes < BUFFER_SIZE)
> > +        q_item->result_chain.result.bytes < flow_data->parent-
> > >buffer_size)
> >      {
> >          /* create an intermediate buffer */
> >          if(!flow_data->intermediate)
> >          {
> >              flow_data->intermediate = BMI_memalloc(
> >                  flow_data->parent->dest.u.bmi.address,
> > -                BUFFER_SIZE, BMI_SEND);
> > +                flow_data->parent->buffer_size, BMI_SEND);
> >              /* TODO: error handling */
> >              assert(flow_data->intermediate);
> >          }
> > @@ -1615,7 +1638,7 @@
> >          do
> >          {
> >              q_item->result_chain.result.bytemax =
> > -                (BUFFER_SIZE - bytes_processed);
> > +                (flow_data->parent->buffer_size - bytes_processed);
> >              q_item->result_chain.result.bytes = 0;
> >              q_item->result_chain.result.segmax = MAX_REGIONS;
> >              q_item->result_chain.result.segs = 0;
> > @@ -1638,10 +1661,10 @@
> >                  memcpy(dest_ptr, src_ptr, q_item-
> > >result_chain.size_list[i]);
> >                  bytes_processed += q_item->result_chain.size_list[i];
> >              }
> > -        }while(bytes_processed < BUFFER_SIZE &&
> > +        }while(bytes_processed < flow_data->parent->buffer_size &&
> >              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
> >
> > -        assert (bytes_processed <= BUFFER_SIZE);
> > +        assert (bytes_processed <= flow_data->parent->buffer_size);
> >
> >          /* setup for BMI operation */
> >          flow_data->tmp_buffer_list[0] = flow_data->intermediate;
> > @@ -1761,7 +1784,7 @@
> >          do
> >          {
> >              q_item->result_chain.result.bytemax =
> > -                (BUFFER_SIZE - bytes_processed);
> > +                (q_item->parent->buffer_size - bytes_processed);
> >              q_item->result_chain.result.bytes = 0;
> >              q_item->result_chain.result.segmax = MAX_REGIONS;
> >              q_item->result_chain.result.segs = 0;
> > @@ -1785,10 +1808,10 @@
> >                  memcpy(dest_ptr, src_ptr, region_size);
> >                  bytes_processed += region_size;
> >              }
> > -        }while(bytes_processed < BUFFER_SIZE &&
> > +        }while(bytes_processed < flow_data->parent->buffer_size &&
> >              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
> >
> > -        assert(bytes_processed <= BUFFER_SIZE);
> > +        assert(bytes_processed <= flow_data->parent->buffer_size);
> >      }
> >
> >      /* are we done? */
> > @@ -1807,7 +1830,7 @@
> >          q_item->result_chain.offset_list;
> >      q_item->result_chain.result.size_array =
> >          q_item->result_chain.size_list;
> > -    q_item->result_chain.result.bytemax = BUFFER_SIZE;
> > +    q_item->result_chain.result.bytemax = flow_data->parent-
> > >buffer_size;
> >      q_item->result_chain.result.bytes = 0;
> >      q_item->result_chain.result.segmax = MAX_REGIONS;
> >      q_item->result_chain.result.segs = 0;
> > @@ -1822,22 +1845,22 @@
> >
> >      /* was MAX_REGIONS enough to satisfy this step? */
> >      if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
> > -        q_item->result_chain.result.bytes < BUFFER_SIZE)
> > +        q_item->result_chain.result.bytes < flow_data->parent-
> > >buffer_size)
> >      {
> >          /* create an intermediate buffer */
> >          if(!flow_data->intermediate)
> >          {
> >              flow_data->intermediate = BMI_memalloc(
> >                  flow_data->parent->src.u.bmi.address,
> > -                BUFFER_SIZE, BMI_RECV);
> > +                flow_data->parent->buffer_size, BMI_RECV);
> >              /* TODO: error handling */
> >              assert(flow_data->intermediate);
> >          }
> >          /* setup for BMI operation */
> >          flow_data->tmp_buffer_list[0] = flow_data->intermediate;
> >          buffer_type = BMI_PRE_ALLOC;
> > -        q_item->buffer_used = BUFFER_SIZE;
> > -        total_size = BUFFER_SIZE;
> > +        q_item->buffer_used = flow_data->parent->buffer_size;
> > +        total_size = flow_data->parent->buffer_size;
> >          size_array = &q_item->buffer_used;
> >          segs = 1;
> >          /* we will copy data out on next iteration */
> > diff -Naur pvfs2/src/server/io.sm pvfs2-new/src/server/io.sm
> > --- pvfs2/src/server/io.sm  2006-06-05 21:57:28.000000000 +0200
> > +++ pvfs2-new/src/server/io.sm      2006-08-02 23:27:24.000000000 +0200
> > @@ -162,6 +162,7 @@
> >      int err = -PVFS_EIO;
> >      job_id_t tmp_id;
> >      struct server_configuration_s *user_opts =
> > get_server_config_struct();
> > +    struct filesystem_configuration_s *fs_conf;
> >
> >      s_op->u.io.flow_d = PINT_flow_alloc();
> >      if (!s_op->u.io.flow_d)
> > @@ -201,6 +202,15 @@
> >      s_op->u.io.flow_d->user_ptr = NULL;
> >      s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
> >
> > +    fs_conf = PINT_config_find_fs_id(user_opts,
> > +        s_op->req->u.io.fs_id);
> > +    if(fs_conf)
> > +    {
> > +        /* pick up any buffer settings overrides from fs conf */
> > +        s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
> > +        s_op->u.io.flow_d->buffers_per_flow = fs_conf-
> > >fp_buffers_per_flow;
> > +    }
> > +
> >      gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, "
> >          "server_nr: %d, server_ct: %d\n",
> >          lld(s_op->u.io.flow_d->file_data.fsize),
> > diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/
> > common/misc/server-config.c
> > --- pvfs2/src/common/misc/server-config.c   2006-08-02
> > 17:13:00.000000000 +0200
> > +++ pvfs2-new/src/common/misc/server-config.c       2006-08-03
> > 23:10:28.000000000 +0200
> > @@ -73,6 +73,7 @@
> >  static DOTCONF_CB(get_trove_sync_meta);
> >  static DOTCONF_CB(get_trove_sync_data);
> >  static DOTCONF_CB(get_db_cache_size_bytes);
> > +static DOTCONF_CB(get_trove_max_concurrent_io);
> >  static DOTCONF_CB(get_db_cache_type);
> >  static DOTCONF_CB(get_param);
> >  static DOTCONF_CB(get_value);
> > @@ -423,6 +424,12 @@
> >      {"ID",ARG_INT, get_filesystem_collid,NULL,
> >          CTX_FILESYSTEM,NULL},
> >
> > +    /* maximum number of AIO operations that Trove will allow to run
> > +     * concurrently
> > +     */
> > +    {"TroveMaxConcurrentIO", ARG_INT, get_trove_max_concurrent_io,
> > NULL,
> > +        CTX_DEFAULTS|CTX_GLOBAL,"16"},
> > +
> >      /* The gossip interface in pvfs allows users to specify different
> >       * levels of logging for the pvfs server.  The output of these
> >       * different log levels is written to a file, which is
> > specified in
> > @@ -742,6 +749,7 @@
> >      config_s->client_job_flow_timeout =
> > PVFS2_CLIENT_JOB_FLOW_TIMEOUT_DEFAULT;
> >      config_s->client_retry_limit = PVFS2_CLIENT_RETRY_LIMIT_DEFAULT;
> >      config_s->client_retry_delay_ms =
> > PVFS2_CLIENT_RETRY_DELAY_MS_DEFAULT;
> > +    config_s->trove_max_concurrent_io = 16;
> >
> >      if (cache_config_files(
> >              config_s, global_config_filename,
> > server_config_filename))
> > @@ -1548,6 +1556,14 @@
> >      return NULL;
> >  }
> >
> > +DOTCONF_CB(get_trove_max_concurrent_io)
> > +{
> > +    struct server_configuration_s *config_s =
> > +        (struct server_configuration_s *)cmd->context;
> > +    config_s->trove_max_concurrent_io = cmd->data.value;
> > +    return NULL;
> > +}
> > +
> >  DOTCONF_CB(get_db_cache_type)
> >  {
> >      struct server_configuration_s *config_s =
> > diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/
> > common/misc/server-config.h
> > --- pvfs2/src/common/misc/server-config.h   2006-07-13
> > 07:11:40.000000000 +0200
> > +++ pvfs2-new/src/common/misc/server-config.h       2006-08-03
> > 23:26:23.000000000 +0200
> > @@ -147,6 +147,8 @@
> >                                         if zero, use defaults */
> >      char * db_cache_type;
> >
> > +    int trove_max_concurrent_io;    /* maximum number of
> > simultaneous I/O ops */
> > +
> >  } server_configuration_s;
> >
> >  int PINT_parse_config(
> > diff -Naur pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c pvfs2-new/
> > src/io/trove/trove-dbpf/dbpf-bstream.c
> > --- pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c    2006-06-23
> > 22:59:29.000000000 +0200
> > +++ pvfs2-new/src/io/trove/trove-dbpf/dbpf-bstream.c        2006-08-03
> > 23:05:37.000000000 +0200
> > @@ -31,7 +31,7 @@
> >
> >  #define AIOCB_ARRAY_SZ 64
> >
> > -#define DBPF_MAX_IOS_IN_PROGRESS  16
> > +extern int TROVE_max_concurrent_io;
> >  static int s_dbpf_ios_in_progress = 0;
> >  static dbpf_op_queue_p s_dbpf_io_ready_queue = NULL;
> >  static gen_mutex_t s_dbpf_io_mutex = GEN_MUTEX_INITIALIZER;
> > @@ -277,9 +277,6 @@
> >                 (cur_op->op.type == BSTREAM_WRITE_LIST));
> >          dbpf_op_queue_remove(cur_op);
> >
> > -        assert(s_dbpf_ios_in_progress <
> > -               (DBPF_MAX_IOS_IN_PROGRESS + 1));
> > -
> >          gossip_debug(GOSSIP_TROVE_DEBUG, "starting delayed I/O "
> >                       "operation %p (%d in progress)\n", cur_op,
> >                       s_dbpf_ios_in_progress);
> > @@ -363,7 +360,7 @@
> >      {
> >          s_dbpf_ios_in_progress--;
> >      }
> > -    if (s_dbpf_ios_in_progress < DBPF_MAX_IOS_IN_PROGRESS)
> > +    if (s_dbpf_ios_in_progress < TROVE_max_concurrent_io)
> >      {
> >          s_dbpf_ios_in_progress++;
> >      }
> > diff -Naur pvfs2/src/io/trove/trove.c pvfs2-new/src/io/trove/trove.c
> > --- pvfs2/src/io/trove/trove.c      2006-06-16 23:01:13.000000000 +0200
> > +++ pvfs2-new/src/io/trove/trove.c  2006-08-03 23:08:08.000000000 +0200
> > @@ -31,6 +31,7 @@
> >
> >  int TROVE_db_cache_size_bytes = 0;
> >  int TROVE_shm_key_hint = 0;
> > +int TROVE_max_concurrent_io = 16;
> >
> >  /** Initiate reading from a contiguous region in a bstream into a
> >   *  contiguous region in memory.
> > @@ -964,6 +965,11 @@
> >          TROVE_shm_key_hint = *((int*)parameter);
> >     return(0);
> >      }
> > +    if(option == TROVE_MAX_CONCURRENT_IO)
> > +    {
> > +        TROVE_max_concurrent_io = *((int*)parameter);
> > +   return(0);
> > +    }
> >
> >      method_id = map_coll_id_to_method(coll_id);
> >      if (method_id < 0) {
> > diff -Naur pvfs2/src/io/trove/trove.h pvfs2-new/src/io/trove/trove.h
> > --- pvfs2/src/io/trove/trove.h      2006-07-13 07:11:41.000000000 +0200
> > +++ pvfs2-new/src/io/trove/trove.h  2006-08-03 23:07:37.000000000 +0200
> > @@ -72,6 +72,7 @@
> >      TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS,
> >      TROVE_COLLECTION_ATTR_CACHE_INITIALIZE,
> >      TROVE_DB_CACHE_SIZE_BYTES,
> > +    TROVE_MAX_CONCURRENT_IO,
> >      TROVE_COLLECTION_COALESCING_HIGH_WATERMARK,
> >      TROVE_COLLECTION_COALESCING_LOW_WATERMARK,
> >      TROVE_COLLECTION_META_SYNC_MODE,
> > diff -Naur pvfs2/src/server/pvfs2-server.c pvfs2-new/src/server/
> > pvfs2-server.c
> > --- pvfs2/src/server/pvfs2-server.c 2006-07-13 07:11:42.000000000
> > +0200
> > +++ pvfs2-new/src/server/pvfs2-server.c     2006-08-03
> > 23:07:01.000000000 +0200
> > @@ -950,6 +950,10 @@
> >
> > &server_config.db_cache_size_bytes);
> >      /* this should never fail */
> >      assert(ret == 0);
> > +    ret = trove_collection_setinfo(0, 0, TROVE_MAX_CONCURRENT_IO,
> > +        &server_config.trove_max_concurrent_io);
> > +    /* this should never fail */
> > +    assert(ret == 0);
> >
> >      /* parse port number and allow trove to use it to help
> > differentiate
> >       * shmem regions if needed
> > _______________________________________________
> > Pvfs2-developers mailing list
> > [email protected]
> > http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers
>
> _______________________________________________
> Pvfs2-developers mailing list
> [email protected]
> http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers
>
>
_______________________________________________
Pvfs2-developers mailing list
[email protected]
http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers

Reply via email to