? io-flow-post.patch
? src/server/.io.sm.swp
Index: src/server/io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
retrieving revision 1.65
diff -r1.65 io.sm
21a22,41
> enum 
> {
>     IO_READ = 1,
>     IO_WRITE = 2,
>     IO_FLOW_COMPLETED = 3
> };
> 
> enum
> {
>     IO_ACK_TAG = 1,
>     IO_FLOW_TAG = 2
> };
> 
> static int PINT_io_post_response(
>     PINT_server_op *s_op, job_status_s *js_p, job_aint user_tag);
> static int PINT_io_setup_flow(
>     PINT_server_op *s_op);
> 
> static int io_start(
>     PINT_server_op *s_op, job_status_s *js_p);
23,25c43,45
<     PINT_server_op *s_op, job_status_s* js_p);
< static int io_send_completion_ack(
<     PINT_server_op *s_op, job_status_s* js_p);
---
>     PINT_server_op *s_op, job_status_s *js_p);
> static int io_send_write_completion_ack(
>     PINT_server_op *s_op, job_status_s *js_p);
27c47,51
<     PINT_server_op *s_op, job_status_s* js_p);
---
>     PINT_server_op *s_op, job_status_s *js_p);
> static int io_start_flow_and_positive_ack(
>     PINT_server_op *s_op, job_status_s *js_p);
> static int io_check_write_completion(
>     PINT_server_op *s_op, job_status_s *js_p);
29c53
<     PINT_server_op *s_op, job_status_s* js_p);
---
>     PINT_server_op *s_op, job_status_s *js_p);
31c55
<     PINT_server_op *s_op, job_status_s* js_p);
---
>     PINT_server_op *s_op, job_status_s *js_p);
36a61,63
>     start,
>     start_flow_and_positive_ack,
>     check_write_completion,
38d64
<     send_negative_ack, 
39a66,67
>     send_negative_ack, 
>     send_write_completion_ack,
41,42c69
<     release,
<     send_completion_ack)
---
>     release)
47c74
<         success => send_positive_ack;
---
>         success => start;
51c78
<     state send_positive_ack
---
>     state start
53,54c80,97
<         run io_send_ack;
<         success => start_flow;
---
>         run io_start;
>         IO_WRITE => start_flow_and_positive_ack;
>         IO_READ  => send_positive_ack;
>         default => send_negative_ack;
>     }
> 
>     state start_flow_and_positive_ack
>     {
>         run io_start_flow_and_positive_ack;
>         success => check_write_completion;
>         default => send_negative_ack;
>     }
> 
>     state check_write_completion
>     {
>         run io_check_write_completion;
>         IO_FLOW_COMPLETED => send_write_completion_ack;
>         success => check_write_completion;
58c101
<     state send_negative_ack 
---
>     state send_positive_ack
60a104
>         success => start_flow;
67c111,116
<         success => send_completion_ack;
---
>         success => release;
>     }
> 
>     state send_negative_ack 
>     {
>         run io_send_ack;
71c120
<     state send_completion_ack
---
>     state send_write_completion_ack
73c122
<         run io_send_completion_ack;
---
>         run io_send_write_completion_ack;
92a142,170
>  * Function: io_start()
>  *
>  * Params:   server_op *s_op, 
>  *           job_status_s* js_p
>  *
>  * Returns:  int
>  *
>  * Synopsis: starts the IO, determines the IO type (read or write) and
>  *           sets the error code appropriately.
>  */
> static int io_start(PINT_server_op *s_op, job_status_s *js_p)
> {
>     if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
>     {
>         js_p->error_code = IO_WRITE;
>     }
>     else if(s_op->req->u.io.io_type == PVFS_IO_READ)
>     {
>         js_p->error_code = IO_READ;
>     }
>     else
>     {
>         gossip_lerr("Server: IO SM: unknown IO type requested.\n");
>         js_p->error_code = -PVFS_EINVAL;
>     }
>     return 1;
> }
> 
> /*
112,121c190
<     int err = -PVFS_EIO;
<     job_id_t tmp_id;
<     struct server_configuration_s *user_opts = get_server_config_struct();
<         
<     /* this is where we report the file size to the client before
<      * starting the I/O transfer, or else report an error if we
<      * failed to get the size, or failed for permission reasons
<      */
<     s_op->resp.status = js_p->error_code;
<     s_op->resp.u.io.bstream_size = s_op->ds_attr.b_size;
---
>     int ret;
123,125c192,193
<     err = PINT_encode(&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
<                       s_op->addr, s_op->decoded.enc_type);
<     if (err < 0)
---
>     if(js_p->error_code == IO_READ ||
>        js_p->error_code == IO_WRITE)
127,128c195,201
<         gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
<         js_p->error_code = err;
---
>         js_p->error_code = 0;
>     }
>     
>     ret = PINT_io_post_response(s_op, js_p, IO_ACK_TAG);
>     if(ret < 0)
>     {
>         js_p->error_code = ret;
131,138c204
< 
<     err = job_bmi_send_list(
<         s_op->addr, s_op->encoded.buffer_list, s_op->encoded.size_list,
<         s_op->encoded.list_count, s_op->encoded.total_size,
<         s_op->tag, s_op->encoded.buffer_type, 0, s_op, 0, js_p,
<         &tmp_id, server_job_context, user_opts->server_job_bmi_timeout);
< 
<     return err;
---
>     return ret;
161c227
<     int err = -PVFS_EIO;
---
>     int ret;
164,167c230,232
<     struct filesystem_configuration_s *fs_conf;
<         
<     s_op->u.io.flow_d = PINT_flow_alloc();
<     if (!s_op->u.io.flow_d)
---
>     
>     ret = PINT_io_setup_flow(s_op);
>     if(ret < 0)
169c234
<         js_p->error_code = -PVFS_ENOMEM;
---
>         js_p->error_code = ret;
173,175c238,239
<     /* we still have the file size stored in the response structure 
<      * that we sent in the previous state, other details come from
<      * request
---
>     /* We start the flow separately only on reads, so this state
>      * only gets called if we're doing a read.
177,180c241,251
<     s_op->u.io.flow_d->file_data.fsize = s_op->resp.u.io.bstream_size;
<     s_op->u.io.flow_d->file_data.dist = s_op->req->u.io.io_dist;
<     s_op->u.io.flow_d->file_data.server_nr = s_op->req->u.io.server_nr;
<     s_op->u.io.flow_d->file_data.server_ct = s_op->req->u.io.server_ct;
---
>     s_op->u.io.flow_d->src.endpoint_id = TROVE_ENDPOINT;
>     s_op->u.io.flow_d->src.u.trove.handle = s_op->req->u.io.handle;
>     s_op->u.io.flow_d->src.u.trove.coll_id = s_op->req->u.io.fs_id;
>     s_op->u.io.flow_d->dest.endpoint_id = BMI_ENDPOINT;
>     s_op->u.io.flow_d->dest.u.bmi.address = s_op->addr;
>     s_op->u.io.flow_d->file_data.extend_flag = 0;
> 
>     gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
>                  "read data.\n");
>     ret = job_flow(s_op->u.io.flow_d, s_op, IO_FLOW_TAG, js_p, &tmp_id,
>                    server_job_context, user_opts->server_job_flow_timeout);
182,183c253
<     /* on writes, we allow the bstream to be extended at EOF */
<     if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
---
>     if(ret < 0)
185,187c255,256
<         gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
<                      "write data.\n");
<         s_op->u.io.flow_d->file_data.extend_flag = 1;
---
>         js_p->error_code = ret;
>         return 1;
189c258,288
<     else
---
>     return 0;
> }
> 
> /*
>  * Function: io_start_flow_and_positive_ack()
>  *
>  * Params:   server_op *s_op,
>  *           job_status_s* js_p
>  *
>  * Pre:      determined the IO is a write, so we can post the flow
>  *           and send the ack together
>  *
>  * Post:     the flow and the positive ack have both been posted
>  *
>  * Returns:  int
>  *
>  * Synopsis: sets up and posts the write flow, then posts the positive ack
>  */
> static int io_start_flow_and_positive_ack(
>     PINT_server_op *s_op, job_status_s *js_p)
> {
>     int ret;
>     struct server_configuration_s *user_opts = get_server_config_struct();
>     job_id_t tmp_id;
> 
>     js_p->error_code = 0;
>     s_op->u.io.positive_ack_done = 0;
>     s_op->u.io.flow_done = 0;
>     
>     ret = PINT_io_setup_flow(s_op);
>     if(ret < 0)
191,193c290,291
<         gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
<                      "read data.\n");
<         s_op->u.io.flow_d->file_data.extend_flag = 0;
---
>         js_p->error_code = ret;
>         return 1;
196,202c294,299
<     s_op->u.io.flow_d->file_req = s_op->req->u.io.file_req;
<     s_op->u.io.flow_d->file_req_offset = s_op->req->u.io.file_req_offset;
<     s_op->u.io.flow_d->mem_req = NULL;
<     s_op->u.io.flow_d->aggregate_size = s_op->req->u.io.aggregate_size;
<     s_op->u.io.flow_d->tag = s_op->tag;
<     s_op->u.io.flow_d->user_ptr = NULL;
<     s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
---
>     s_op->u.io.flow_d->src.endpoint_id = BMI_ENDPOINT;
>     s_op->u.io.flow_d->src.u.bmi.address = s_op->addr;
>     s_op->u.io.flow_d->dest.endpoint_id = TROVE_ENDPOINT;
>     s_op->u.io.flow_d->dest.u.trove.handle = s_op->req->u.io.handle;
>     s_op->u.io.flow_d->dest.u.trove.coll_id = s_op->req->u.io.fs_id;
>     s_op->u.io.flow_d->file_data.extend_flag = 1;
204,206c301,305
<     fs_conf = PINT_config_find_fs_id(user_opts, 
<         s_op->req->u.io.fs_id);
<     if(fs_conf)
---
>     gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
>                      "write data.\n");
>     ret = job_flow(s_op->u.io.flow_d, s_op, IO_FLOW_TAG, js_p, &tmp_id,
>                    server_job_context, user_opts->server_job_flow_timeout);
>     if(ret < 0)
208,210c307,308
<         /* pick up any buffer settings overrides from fs conf */
<         s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
<         s_op->u.io.flow_d->buffers_per_flow = fs_conf->fp_buffers_per_flow;
---
>         js_p->error_code = ret;
>         return 1;
213,217c311,321
<     gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, " 
<         "server_nr: %d, server_ct: %d\n",
<         lld(s_op->u.io.flow_d->file_data.fsize),
<         (int)s_op->u.io.flow_d->file_data.server_nr,
<         (int)s_op->u.io.flow_d->file_data.server_ct);
---
>     if(ret == 1)
>     {
>         s_op->u.io.flow_done = 1;
>     }
>     
>     ret = PINT_io_post_response(s_op, js_p, IO_ACK_TAG);
>     if(ret < 0)
>     {
>         js_p->error_code = ret;
>         return 1;
>     }
219,223c323,331
<     gossip_debug(GOSSIP_IO_DEBUG, "      file_req_offset: %lld, "
<         "aggregate_size: %lld, handle: %llu\n", 
<         lld(s_op->u.io.flow_d->file_req_offset),
<         lld(s_op->u.io.flow_d->aggregate_size),
<         llu(s_op->req->u.io.handle));
---
>     if(ret == 1)
>     {
>         /* ack completed already! */
>         s_op->u.io.positive_ack_done = 1;
>         if(s_op->u.io.flow_done)
>         {
>             return 1;
>         }
>     }
225,226c333,360
<     /* set endpoints depending on type of io requested */
<     if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
---
>     return 0;
> }
> 
> /*
>  * Function: io_check_write_completion()
>  *
>  * Params:   server_op *s_op,
>  *           job_status_s* js_p
>  *
>  * Pre:      both the flow and positive ack have been posted, and we
>  *           need to wait for both to complete.
>  *
>  * Post:     either the flow or positive ack (or both) have completed.
>  *
>  * Returns:  int
>  *
>  * Synopsis: In the case of writes, both the positive ack and the flow
>  *           get posted together.  This state allows us to wait
>  *           for completion of both jobs.
>  */
> static int io_check_write_completion(
>     PINT_server_op *s_op, job_status_s *js_p)
> {
>     if(js_p->status_user_tag == IO_FLOW_TAG)
>     {
>         s_op->u.io.flow_done = 1;
>     }
>     else if(js_p->status_user_tag == IO_ACK_TAG)
228,240c362
<         s_op->u.io.flow_d->src.endpoint_id = BMI_ENDPOINT;
<         s_op->u.io.flow_d->src.u.bmi.address = s_op->addr;
<         s_op->u.io.flow_d->dest.endpoint_id = TROVE_ENDPOINT;
<         s_op->u.io.flow_d->dest.u.trove.handle = s_op->req->u.io.handle;
<         s_op->u.io.flow_d->dest.u.trove.coll_id = s_op->req->u.io.fs_id;
<     }
<     else if (s_op->req->u.io.io_type == PVFS_IO_READ)
<     {
<         s_op->u.io.flow_d->src.endpoint_id = TROVE_ENDPOINT;
<         s_op->u.io.flow_d->src.u.trove.handle = s_op->req->u.io.handle;
<         s_op->u.io.flow_d->src.u.trove.coll_id = s_op->req->u.io.fs_id;
<         s_op->u.io.flow_d->dest.endpoint_id = BMI_ENDPOINT;
<         s_op->u.io.flow_d->dest.u.bmi.address = s_op->addr;
---
>         s_op->u.io.positive_ack_done = 1;
244c366,368
<         gossip_lerr("Server: IO SM: unknown IO type requested.\n");
---
>         gossip_lerr("io_check_write_completion: "
>                     "Invalid value for status_user_tag: %d\n", 
>                     js_p->status_user_tag);
249,250c373,377
<     err = job_flow(s_op->u.io.flow_d, s_op, 0, js_p, &tmp_id,
<                    server_job_context, user_opts->server_job_flow_timeout);
---
>     if(s_op->u.io.flow_done && s_op->u.io.positive_ack_done)
>     {
>         js_p->error_code = IO_FLOW_COMPLETED;
>         return 1;
>     }
252c379,380
<     return err;
---
> 
>     return 0;
330c458
<  * Function: io_send_completion_ack()
---
>  * Function: io_send_write_completion_ack()
347c475
< static int io_send_completion_ack(
---
> static int io_send_write_completion_ack(
362a491,495
>     if(js_p->error_code == IO_FLOW_COMPLETED)
>     {
>         js_p->error_code = 0;
>     }
> 
397a531,615
> static int PINT_io_post_response(
>     PINT_server_op *s_op, job_status_s *js_p, job_aint user_tag)
> {
>     int err = -PVFS_EIO;
>     job_id_t tmp_id;
>     struct server_configuration_s *user_opts = get_server_config_struct();
>         
>     /* this is where we report the file size to the client before
>      * starting the I/O transfer, or else report an error if we
>      * failed to get the size, or failed for permission reasons
>      */
>     s_op->resp.status = js_p->error_code;
>     s_op->resp.u.io.bstream_size = s_op->ds_attr.b_size;
> 
>     err = PINT_encode(&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
>                       s_op->addr, s_op->decoded.enc_type);
>     if (err < 0)
>     {
>         gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
>         js_p->error_code = err;
>         return 1;
>     }
> 
>     err = job_bmi_send_list(
>         s_op->addr, s_op->encoded.buffer_list, s_op->encoded.size_list,
>         s_op->encoded.list_count, s_op->encoded.total_size,
>         s_op->tag, s_op->encoded.buffer_type, 0, s_op, user_tag, js_p,
>         &tmp_id, server_job_context, user_opts->server_job_bmi_timeout);
> 
>     return err;
> }
> 
> static int PINT_io_setup_flow(
>     PINT_server_op *s_op)
> {
>     struct server_configuration_s *user_opts = get_server_config_struct();
>     struct filesystem_configuration_s *fs_conf;
> 
>     s_op->u.io.flow_d = PINT_flow_alloc();
>     if (!s_op->u.io.flow_d)
>     {
>         return -PVFS_ENOMEM;
>     }
> 
>     /* File size is read from the response structure; other details come
>      * from the request.  NOTE(review): on writes this runs before
>      * PINT_io_post_response() sets bstream_size -- confirm it is valid.
>      */
>     s_op->u.io.flow_d->file_data.fsize = s_op->resp.u.io.bstream_size;
>     s_op->u.io.flow_d->file_data.dist = s_op->req->u.io.io_dist;
>     s_op->u.io.flow_d->file_data.server_nr = s_op->req->u.io.server_nr;
>     s_op->u.io.flow_d->file_data.server_ct = s_op->req->u.io.server_ct;
> 
>     s_op->u.io.flow_d->file_req = s_op->req->u.io.file_req;
>     s_op->u.io.flow_d->file_req_offset = s_op->req->u.io.file_req_offset;
>     s_op->u.io.flow_d->mem_req = NULL;
>     s_op->u.io.flow_d->aggregate_size = s_op->req->u.io.aggregate_size;
>     s_op->u.io.flow_d->tag = s_op->tag;
>     s_op->u.io.flow_d->user_ptr = NULL;
>     s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
> 
>     fs_conf = PINT_config_find_fs_id(user_opts, 
>         s_op->req->u.io.fs_id);
>     if(fs_conf)
>     {
>         /* pick up any buffer settings overrides from fs conf */
>         s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
>         s_op->u.io.flow_d->buffers_per_flow = fs_conf->fp_buffers_per_flow;
>     }
> 
>     gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, " 
>         "server_nr: %d, server_ct: %d\n",
>         lld(s_op->u.io.flow_d->file_data.fsize),
>         (int)s_op->u.io.flow_d->file_data.server_nr,
>         (int)s_op->u.io.flow_d->file_data.server_ct);
> 
>     gossip_debug(GOSSIP_IO_DEBUG, "      file_req_offset: %lld, "
>         "aggregate_size: %lld, handle: %llu\n", 
>         lld(s_op->u.io.flow_d->file_req_offset),
>         lld(s_op->u.io.flow_d->aggregate_size),
>         llu(s_op->req->u.io.handle));
> 
>     return 0;
> }
>  
Index: src/server/pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
retrieving revision 1.143
diff -r1.143 pvfs2-server.h
274a275,276
>     int positive_ack_done;
>     int flow_done;
