home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / test / io / bmi / driver-fs-read.c < prev    next >
C/C++ Source or Header  |  2006-09-11  |  29KB  |  910 lines

  1. /*
  2.  * (C) 2001 Clemson University and The University of Chicago
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <unistd.h>
  10. #include <errno.h>
  11. #include <assert.h>
  12. #include <string.h>
  13. #include <mpi.h>
  14. #include "gossip.h"
  15. #include "bmi.h"
  16. #include "bench-initialize.h"
  17. #include "bench-args.h"
  18. #include "bench-mem.h"
  19.  
  20. #define TESTCOUNT 10
  21. #define MSG_SIZE (256*1024)
  22.  
  23. struct svr_xfer_state
  24. {
  25.     int step;
  26.  
  27.     PVFS_BMI_addr_t addr;
  28.     bmi_msg_tag_t tag;
  29.     int mpi_addr;
  30.     int mpi_tag;
  31.  
  32.     void* unexp_buffer;
  33.     bmi_size_t unexp_size;
  34.     void* mpi_unexp_buffer;
  35.     int mpi_unexp_size;
  36.  
  37.     struct request* req;
  38.     struct response* resp;
  39.  
  40.     void** buffer_array;
  41.     int buffer_array_size;
  42.  
  43.     int list_factor;
  44.     void** buffer_list;
  45.     bmi_size_t* size_list;
  46.     MPI_Datatype dtype;
  47.     MPI_Aint* displace_array;
  48.     int* blocklen_array;
  49. };
  50.  
  51. struct request
  52. {
  53.     char filler[25];
  54. };
  55.  
  56. struct response
  57. {
  58.     int filler[400];
  59. };
  60.  
  61. int num_done = 0;
  62.  
  63. int svr_handle_next(struct svr_xfer_state* state, bmi_context_id context);
  64. int svr_handle_next_mpi(struct svr_xfer_state* state);
  65. int client_handle_next_mpi(struct svr_xfer_state* state);
  66. int client_handle_next(struct svr_xfer_state* state, bmi_context_id context);
  67. int prepare_states(struct svr_xfer_state* state_array, PVFS_BMI_addr_t*
  68.     addr_array, struct bench_options* opts, int count, int svr_flag);
  69. int prepare_states_mpi(struct svr_xfer_state* state_array, int*
  70.     addr_array, struct bench_options* opts, int count, int svr_flag);
  71. int teardown_states(struct svr_xfer_state* state_array, PVFS_BMI_addr_t*
  72.     addr_array, struct bench_options* opts, int count, int svr_flag);
  73. int teardown_states_mpi(struct svr_xfer_state* state_array, int*
  74.     addr_array, struct bench_options* opts, int count, int svr_flag);
  75.  
  76. MPI_Request* request_array;
  77. int* index_array;
  78. MPI_Status* status_array;
  79. void** stupid_array;
  80. int request_count;
  81.  
  82. int main(
  83.     int argc,
  84.     char *argv[])
  85. {
  86.     int ret = -1;
  87.     int world_rank = 0;
  88.     MPI_Comm comm;
  89.     PVFS_BMI_addr_t *bmi_peer_array;
  90.     int *mpi_peer_array;
  91.     int num_clients;
  92.     struct bench_options opts;
  93.     int im_a_server = 0;
  94.     int num_messages = 0;
  95.     double bmi_time, mpi_time;
  96.     double max_bmi_time, max_mpi_time;
  97.     double min_bmi_time, min_mpi_time;
  98.     double sum_bmi_time, sum_mpi_time;
  99.     double sq_bmi_time, sq_mpi_time;
  100.     double sumsq_bmi_time, sumsq_mpi_time;
  101.     double var_bmi_time, var_mpi_time;
  102.     double stddev_bmi_time, stddev_mpi_time;
  103.     double agg_bmi_bw, agg_mpi_bw;
  104.     double ave_bmi_time, ave_mpi_time;
  105.     int total_data_xfer = 0;
  106.     bmi_context_id context = -1;
  107.     struct svr_xfer_state* state_array = NULL;
  108.     struct svr_xfer_state* tmp_state = NULL;
  109.     int num_requested = 0;
  110.     void* user_ptr_array[TESTCOUNT];
  111.     struct BMI_unexpected_info info_array[TESTCOUNT];
  112.     bmi_op_id_t id_array[TESTCOUNT];
  113.     bmi_error_code_t error_array[TESTCOUNT];
  114.     bmi_size_t size_array[TESTCOUNT];
  115.     int outcount = 0;
  116.     int indexer = 0;
  117.     int state_size;
  118.     int i;
  119.     double time1, time2;
  120.     int flag;
  121.     MPI_Status status;
  122.  
  123.     /* start up benchmark environment */
  124.     ret = bench_init(&opts, argc, argv, &num_clients, &world_rank, &comm,
  125.              &bmi_peer_array, &mpi_peer_array, &context);
  126.     if (ret < 0)
  127.     {
  128.     fprintf(stderr, "bench_init() failure.\n");
  129.     return (-1);
  130.     }
  131.  
  132.     /* note whether we are a "server" or not */
  133.     if (world_rank < opts.num_servers)
  134.     {
  135.     im_a_server = 1;
  136.     }
  137.  
  138.     num_messages = opts.total_len / MSG_SIZE;
  139.     if(opts.total_len%MSG_SIZE)
  140.         num_messages++;
  141.  
  142.     if(im_a_server)
  143.         state_size = num_clients;
  144.     else
  145.         state_size = opts.num_servers;
  146.  
  147.     /* allocate array to hold state of concurrent xfers */
  148.     state_array = (struct
  149.         svr_xfer_state*)malloc(state_size*sizeof(struct svr_xfer_state));
  150.     assert(state_array);
  151.     memset(state_array, 0, state_size*sizeof(struct svr_xfer_state));
  152.  
  153.     if(im_a_server)
  154.         prepare_states(state_array, bmi_peer_array, &opts, state_size, 1);
  155.     else
  156.         prepare_states(state_array, bmi_peer_array, &opts, state_size, 0);
  157.  
  158.     MPI_Barrier(MPI_COMM_WORLD);
  159.  
  160.     time1 = MPI_Wtime();
  161.     
  162.     if(im_a_server)
  163.     {
  164.         while(num_done < num_clients)
  165.         {
  166.             outcount = 0;
  167.             ret = BMI_testunexpected(TESTCOUNT, &outcount, info_array, 0);
  168.             assert(ret >= 0);
  169.             indexer = 0;
  170.             while(indexer < outcount)
  171.             {
  172.                 assert(info_array[indexer].error_code == 0);
  173.                 state_array[num_requested].addr = info_array[indexer].addr;
  174.                 state_array[num_requested].tag = info_array[indexer].tag;
  175.                 state_array[num_requested].unexp_buffer = info_array[indexer].buffer;
  176.                 state_array[num_requested].unexp_size = info_array[indexer].size;
  177.                 ret = svr_handle_next(&state_array[num_requested], context);
  178.                 assert(ret == 0);
  179.                 indexer++;
  180.                 num_requested++;
  181.             }
  182.  
  183.             outcount = 0;
  184.             ret = BMI_testcontext(TESTCOUNT, id_array, &outcount,
  185.                 error_array, size_array, user_ptr_array, 0, context);
  186.             assert(ret >= 0);
  187.             indexer = 0;
  188.             while(indexer < outcount)
  189.             {
  190.                 assert(error_array[indexer] == 0);
  191.                 tmp_state = user_ptr_array[indexer];
  192.                 ret = svr_handle_next(tmp_state, context);
  193.                 indexer++;
  194.             }
  195.         }
  196.     }
  197.     else
  198.     {
  199.         for(i=0; i< opts.num_servers; i++)
  200.         {
  201.             ret = client_handle_next(&state_array[i], context);
  202.             assert(ret == 0);
  203.         }
  204.  
  205.         while(num_done < opts.num_servers)
  206.         {
  207.             outcount = 0;
  208.             ret = BMI_testcontext(TESTCOUNT, id_array, &outcount,
  209.                 error_array, size_array, user_ptr_array, 0, context);
  210.             assert(ret >= 0);
  211.             indexer = 0;
  212.             while(indexer < outcount)
  213.             {
  214.                 assert(error_array[indexer] == 0);
  215.                 tmp_state = user_ptr_array[indexer];
  216.                 ret = client_handle_next(tmp_state, context);
  217.                 indexer++;
  218.             }
  219.         }
  220.     }
  221.     time2 = MPI_Wtime();
  222.     bmi_time = (time2-time1);
  223.  
  224.     MPI_Barrier(MPI_COMM_WORLD);
  225.  
  226.     if(im_a_server)
  227.         teardown_states(state_array, bmi_peer_array, &opts, state_size, 1);
  228.     else
  229.         teardown_states(state_array, bmi_peer_array, &opts, state_size, 0);
  230.  
  231.     if(im_a_server)
  232.     {
  233.         request_array =
  234.         (MPI_Request*)malloc(((num_messages+1)*num_clients)*sizeof(MPI_Request));
  235.         index_array =
  236.         (int*)malloc(((num_messages+1)*num_clients)*sizeof(int));
  237.         status_array =
  238.         (MPI_Status*)malloc(((num_messages+1)*num_clients)*sizeof(MPI_Status));
  239.         stupid_array =
  240.         (void**)malloc(((num_messages+1)*num_clients)*sizeof(void*));
  241.         assert(request_array);
  242.         assert(index_array);
  243.         assert(status_array);
  244.         assert(stupid_array);
  245.         prepare_states_mpi(state_array, mpi_peer_array, &opts, state_size, 1);
  246.     }
  247.     else
  248.     {
  249.         request_array =
  250.         (MPI_Request*)malloc(((num_messages+2)*num_clients)*sizeof(MPI_Request));
  251.         index_array =
  252.         (int*)malloc(((num_messages+2)*num_clients)*sizeof(int));
  253.         status_array =
  254.         (MPI_Status*)malloc(((num_messages+2)*num_clients)*sizeof(MPI_Status));
  255.         stupid_array =
  256.         (void**)malloc(((num_messages+2)*num_clients)*sizeof(void*));
  257.         assert(request_array);
  258.         assert(index_array);
  259.         assert(status_array);
  260.         assert(stupid_array);
  261.         prepare_states_mpi(state_array, mpi_peer_array, &opts, state_size, 0);
  262.     }
  263.  
  264.     fflush(NULL);
  265.     MPI_Barrier(MPI_COMM_WORLD);
  266.  
  267.     time1 = MPI_Wtime();
  268.  
  269.     num_done = 0;
  270.     num_requested = 0;
  271.     if(im_a_server)
  272.     {
  273.         while(num_done < num_clients)
  274.         {
  275.             /* check for requests (tag 0) */
  276.             ret = MPI_Iprobe(MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &flag,
  277.                 &status);
  278.             assert(ret == MPI_SUCCESS);
  279.             if(flag)
  280.             {
  281.                 int foo;
  282.                 /* got an "unexpected" message */
  283.                 state_array[num_requested].mpi_addr = status.MPI_SOURCE;
  284.                 state_array[num_requested].mpi_tag = status.MPI_TAG;
  285.                 MPI_Get_count(&status, MPI_BYTE, &foo);
  286.                     state_array[num_requested].mpi_unexp_size = foo;
  287.                 ret = svr_handle_next_mpi(&state_array[num_requested]);
  288.                 assert(ret == 0);
  289.                 num_requested++;
  290.             }
  291.             /* check for data progress */
  292.             ret = MPI_Testsome(request_count, request_array, &outcount, 
  293.                 index_array, status_array);
  294.             assert(ret == MPI_SUCCESS);
  295.             indexer = 0;
  296.             while(indexer < outcount)
  297.             {
  298.                 ret = svr_handle_next_mpi((struct svr_xfer_state*)stupid_array[index_array[indexer]]);
  299.                 assert(ret == 0);
  300.                 indexer++;
  301.             }
  302.         }
  303.     }
  304.     else
  305.     {
  306.         /* client */
  307.         for(i=0; i<opts.num_servers; i++)
  308.         {
  309.             ret = client_handle_next_mpi(&state_array[i]);
  310.             assert(ret == 0);
  311.         }
  312.  
  313.         while(num_done < opts.num_servers)
  314.         {
  315.             /* check for progress */
  316.             ret = MPI_Testsome(request_count, request_array, &outcount, 
  317.                 index_array, status_array);
  318.             assert(ret == MPI_SUCCESS);
  319.             indexer = 0;
  320.             while(indexer < outcount)
  321.             {
  322.                 ret = client_handle_next_mpi((struct svr_xfer_state*)stupid_array[index_array[indexer]]);
  323.                 assert(ret == 0);
  324.                 indexer++;
  325.             }
  326.         }
  327.     }
  328.  
  329.     time2 = MPI_Wtime();
  330.     mpi_time = time2-time1;
  331.     
  332.     MPI_Barrier(MPI_COMM_WORLD);
  333.  
  334.     if(im_a_server)
  335.         teardown_states_mpi(state_array, mpi_peer_array, &opts, state_size, 1);
  336.     else
  337.         teardown_states_mpi(state_array, mpi_peer_array, &opts, state_size, 0);
  338.     free(request_array);
  339.     free(status_array);
  340.     free(index_array);
  341.  
  342.     /* reduce seperately among clients and servers to get min 
  343.      * times and max times
  344.      */
  345.     MPI_Allreduce(&mpi_time, &max_mpi_time, 1, MPI_DOUBLE, MPI_MAX, comm);
  346.     MPI_Allreduce(&mpi_time, &min_mpi_time, 1, MPI_DOUBLE, MPI_MIN, comm);
  347.     MPI_Allreduce(&mpi_time, &sum_mpi_time, 1, MPI_DOUBLE, MPI_SUM, comm);
  348.     MPI_Allreduce(&bmi_time, &max_bmi_time, 1, MPI_DOUBLE, MPI_MAX, comm);
  349.     MPI_Allreduce(&bmi_time, &min_bmi_time, 1, MPI_DOUBLE, MPI_MIN, comm);
  350.     MPI_Allreduce(&bmi_time, &sum_bmi_time, 1, MPI_DOUBLE, MPI_SUM, comm);
  351.     sq_bmi_time = bmi_time * bmi_time;
  352.     sq_mpi_time = mpi_time * mpi_time;
  353.     MPI_Allreduce(&sq_bmi_time, &sumsq_bmi_time, 1, MPI_DOUBLE, MPI_SUM, comm);
  354.     MPI_Allreduce(&sq_mpi_time, &sumsq_mpi_time, 1, MPI_DOUBLE, MPI_SUM, comm);
  355.  
  356.     /* exactly one "client" and one "server" compute and print
  357.      * statistics
  358.      */
  359.     if (world_rank == 0)
  360.     {
  361.     bench_args_dump(&opts);
  362.  
  363.     total_data_xfer = opts.num_servers * num_clients * num_messages *
  364.         MSG_SIZE;
  365.     if (opts.num_servers > 1)
  366.     {
  367.         var_bmi_time = sumsq_bmi_time -
  368.         sum_bmi_time * sum_bmi_time / (double) (opts.num_servers);
  369.         var_mpi_time = sumsq_mpi_time -
  370.         sum_mpi_time * sum_mpi_time / (double) (opts.num_servers);
  371.     }
  372.     else
  373.     {
  374.         var_bmi_time = 0;
  375.         var_mpi_time = 0;
  376.     }
  377.     ave_bmi_time = sum_bmi_time / opts.num_servers;
  378.     ave_mpi_time = sum_mpi_time / opts.num_servers;
  379.     stddev_bmi_time = sqrt(var_bmi_time);
  380.     stddev_mpi_time = sqrt(var_mpi_time);
  381.     agg_bmi_bw = (double) total_data_xfer / max_bmi_time;
  382.     agg_mpi_bw = (double) total_data_xfer / max_mpi_time;
  383.  
  384.     printf
  385.         ("%d %d %f %f %f %f %f (msg_len,servers,min,max,ave,stddev,agg_MB/s) bmi server\n",
  386.          MSG_SIZE, opts.num_servers, min_bmi_time, max_bmi_time,
  387.          ave_bmi_time, stddev_bmi_time, agg_bmi_bw / (1024 * 1024));
  388.     printf
  389.         ("%d %d %f %f %f %f %f (msg_len,servers,min,max,ave,stddev,agg_MB/s) mpi server\n",
  390.          MSG_SIZE, opts.num_servers, min_mpi_time, max_mpi_time,
  391.          ave_mpi_time, stddev_mpi_time, agg_mpi_bw / (1024 * 1024));
  392.     }
  393.  
  394.     if (world_rank == opts.num_servers)
  395.     {
  396.     total_data_xfer = opts.num_servers * num_clients * num_messages *
  397.         MSG_SIZE;
  398.  
  399.     if (num_clients > 1)
  400.     {
  401.         var_bmi_time = sumsq_bmi_time -
  402.         sum_bmi_time * sum_bmi_time / (double) (num_clients);
  403.         var_mpi_time = sumsq_mpi_time -
  404.         sum_mpi_time * sum_mpi_time / (double) (num_clients);
  405.     }
  406.     else
  407.     {
  408.         var_bmi_time = 0;
  409.         var_mpi_time = 0;
  410.     }
  411.     stddev_bmi_time = sqrt(var_bmi_time);
  412.     stddev_mpi_time = sqrt(var_mpi_time);
  413.     agg_bmi_bw = (double) total_data_xfer / max_bmi_time;
  414.     agg_mpi_bw = (double) total_data_xfer / max_mpi_time;
  415.     ave_bmi_time = sum_bmi_time / num_clients;
  416.     ave_mpi_time = sum_mpi_time / num_clients;
  417.  
  418.     printf
  419.         ("%d %d %f %f %f %f %f (msg_len,clients,min,max,ave,stddev,agg_MB/s) bmi client\n",
  420.          MSG_SIZE, num_clients, min_bmi_time, max_bmi_time,
  421.          ave_bmi_time, stddev_bmi_time, agg_bmi_bw / (1024 * 1024));
  422.     printf
  423.         ("%d %d %f %f %f %f %f (msg_len,clients,min,max,ave,stddev,agg_MB/s) mpi client\n",
  424.          MSG_SIZE, num_clients, min_mpi_time, max_mpi_time,
  425.          ave_mpi_time, stddev_mpi_time, agg_mpi_bw / (1024 * 1024));
  426.     }
  427.  
  428.     /* shutdown interfaces */
  429.     BMI_close_context(context);
  430.     BMI_finalize();
  431.     MPI_Finalize();
  432.     return 0;
  433. }
  434.  
  435. int client_handle_next(struct svr_xfer_state* state, bmi_context_id context)
  436. {
  437.     int ret = 0;
  438.     bmi_op_id_t tmp_id;
  439.     bmi_size_t actual_size;
  440.     int i;
  441.  
  442.     switch(state->step)
  443.     {
  444.         case 0:
  445.             /* post recv for response */
  446.             ret = BMI_post_recv(&tmp_id, state->addr, state->resp,
  447.                 sizeof(struct response), &actual_size, BMI_PRE_ALLOC,
  448.                 state->tag, state, context);
  449.             if(ret < 0)
  450.             {
  451.                 PVFS_perror("BMI_post_recv", ret);
  452.             }
  453.             assert(ret == 0);
  454.  
  455.             /* send request */
  456.             ret = BMI_post_sendunexpected(&tmp_id, state->addr, state->req,
  457.                 sizeof(struct request), BMI_PRE_ALLOC, state->tag,
  458.                 state, context);
  459.             assert(ret >= 0);        
  460.             state->step++;
  461.             if(ret == 1)
  462.                 return(client_handle_next(state, context));
  463.             else
  464.                 return(0);
  465.             break;
  466.  
  467.         case 1:
  468.             /* send completed */
  469.             state->step++;
  470.             return(0);
  471.             break;
  472.  
  473.         default:
  474.  
  475.             /* recv completed (resp or bulk) */
  476.             state->step++;
  477.             if(state->step == (state->buffer_array_size + 3))
  478.             {
  479.                 num_done++;
  480.                 return(0);
  481.             }
  482.  
  483.             for(i=0; i<state->list_factor; i++)
  484.             {
  485.                 state->buffer_list[i] = (char *) state->buffer_array[state->step-3] +
  486.                     ((MSG_SIZE/state->list_factor)*i);
  487.             }
  488.             ret = BMI_post_recv_list(&tmp_id, state->addr,
  489.                 state->buffer_list, state->size_list, state->list_factor,
  490.                 MSG_SIZE, &actual_size, BMI_PRE_ALLOC,
  491.                 state->tag, state, context);
  492.             assert(ret >= 0);
  493.             if(ret == 1)
  494.                 return(client_handle_next(state, context));
  495.             else
  496.                 return(0);
  497.  
  498.         break;
  499.     }
  500.  
  501.     return(0);
  502. };
  503.  
  504. int svr_handle_next(struct svr_xfer_state* state, bmi_context_id context)
  505. {
  506.     int ret = 0;
  507.     bmi_op_id_t tmp_id;
  508.     int i;
  509.  
  510.     switch(state->step)
  511.     {
  512.         case 0:
  513.             /* received a request */
  514.             BMI_unexpected_free(state->addr, state->unexp_buffer);
  515.             /* post a response send */
  516.             ret = BMI_post_send(&tmp_id, state->addr, state->resp,
  517.                 sizeof(struct response), BMI_PRE_ALLOC, state->tag,
  518.                 state, context);
  519.             assert(ret >= 0);
  520.             state->step++;
  521.             if(ret == 1)
  522.                 return(svr_handle_next(state, context));
  523.             else
  524.                 return(0);
  525.             break;
  526.  
  527.         case 1:
  528.             /* response send completed */
  529.             state->step++;
  530.  
  531.             /* post data sends */
  532.             for(i=0; i<state->buffer_array_size; i++)
  533.             {
  534.                 ret = BMI_post_send(&tmp_id, state->addr,
  535.                     state->buffer_array[i], MSG_SIZE, BMI_PRE_ALLOC,
  536.                     state->tag, state, context);
  537.                 assert(ret >= 0);
  538.                 if(ret == 1)
  539.                     state->step++;
  540.             }
  541.  
  542.             if(state->step == (state->buffer_array_size + 2))
  543.                 num_done++;
  544.             return(0);
  545.             break;
  546.  
  547.         default:
  548.             state->step++;
  549.             if(state->step == (state->buffer_array_size + 2))
  550.             {
  551.                 num_done++;
  552.             }
  553.             return(0);
  554.             break;
  555.     }
  556.     
  557.     return(0);
  558. };
  559.  
  560. int prepare_states(struct svr_xfer_state* state_array, PVFS_BMI_addr_t*
  561.     addr_array, struct bench_options* opts, int count, int svr_flag)
  562. {
  563.     int i,j;
  564.  
  565.     if(svr_flag == 0)
  566.     {
  567.         /* CLIENT */
  568.         for(i=0; i<count; i++)
  569.         {
  570.             state_array[i].addr = addr_array[i];
  571.             state_array[i].tag = 0;
  572.             /* allocate request */
  573.             state_array[i].req = BMI_memalloc(addr_array[i],
  574.                 sizeof(struct request), BMI_SEND);
  575.             assert(state_array[i].req);
  576.             /* allocate response */
  577.             state_array[i].resp = BMI_memalloc(addr_array[i],
  578.                 sizeof(struct response), BMI_RECV);
  579.             assert(state_array[i].resp);
  580.  
  581.             /* allocate array of buffers for bulk transfer */
  582.             state_array[i].buffer_array_size = opts->total_len/MSG_SIZE;
  583.             if(opts->total_len%MSG_SIZE)
  584.                 state_array[i].buffer_array_size++;
  585.             state_array[i].buffer_array =
  586.                 (void**)malloc(state_array[i].buffer_array_size*sizeof(void*));
  587.             assert(state_array[i].buffer_array);
  588.             for(j=0; j<state_array[i].buffer_array_size; j++)
  589.             {
  590.                 state_array[i].buffer_array[j] =
  591.                    malloc(MSG_SIZE);
  592.                 assert(state_array[i].buffer_array[j]);
  593.             }
  594.  
  595.             /* setup scratch area for list transfers */
  596.             state_array[i].list_factor = opts->list_io_factor;
  597.             state_array[i].buffer_list =
  598.                 (void**)malloc(opts->list_io_factor*sizeof(void*));
  599.             state_array[i].size_list =
  600.                 (bmi_size_t*)malloc(opts->list_io_factor*sizeof(bmi_size_t));
  601.             assert(state_array[i].buffer_list);
  602.             assert(state_array[i].size_list);
  603.             /* preset size list */
  604.             for(j=0; j<opts->list_io_factor; j++)
  605.             {
  606.                 state_array[i].size_list[j] = MSG_SIZE/opts->list_io_factor;
  607.             }
  608.             state_array[i].size_list[opts->list_io_factor-1] +=
  609.                 (MSG_SIZE%opts->list_io_factor);
  610.  
  611.         }
  612.     }
  613.     else
  614.     {
  615.         /* SERVER */
  616.         for(i=0; i<count; i++)
  617.         {
  618.             /* allocate response */
  619.             state_array[i].resp = BMI_memalloc(addr_array[i],
  620.                 sizeof(struct response), BMI_SEND);
  621.             assert(state_array[i].resp);
  622.  
  623.             /* allocate array of buffers for bulk transfer */
  624.             state_array[i].buffer_array_size = opts->total_len/MSG_SIZE;
  625.             if(opts->total_len%MSG_SIZE)
  626.                 state_array[i].buffer_array_size++;
  627.             state_array[i].buffer_array =
  628.                 (void**)malloc(state_array[i].buffer_array_size*sizeof(void*));
  629.             assert(state_array[i].buffer_array);
  630.             for(j=0; j<state_array[i].buffer_array_size; j++)
  631.             {
  632.                 state_array[i].buffer_array[j] =
  633.                     BMI_memalloc(addr_array[i], MSG_SIZE, BMI_SEND);
  634.                 assert(state_array[i].buffer_array[j]);
  635.             }
  636.         }
  637.     }
  638.  
  639.     return(0);
  640. }
  641.  
  642. int teardown_states(struct svr_xfer_state* state_array, PVFS_BMI_addr_t*
  643.     addr_array, struct bench_options* opts, int count, int svr_flag)
  644. {
  645.     int i,j;
  646.  
  647.     if(svr_flag == 0)
  648.     {
  649.         /* client */
  650.         for(i=0; i<count; i++)
  651.         {
  652.             /* free request */
  653.             BMI_memfree(addr_array[i], state_array[i].req, 
  654.                 sizeof(struct request), BMI_SEND);
  655.             /* free response */
  656.             BMI_memfree(addr_array[i], state_array[i].resp, 
  657.                 sizeof(struct request), BMI_RECV);
  658.             /* free data buffers */
  659.             for(j=0; j<state_array[i].buffer_array_size; j++)
  660.             {
  661.                 free(state_array[i].buffer_array[j]);
  662.             }
  663.             free(state_array[i].buffer_list);
  664.         }
  665.     }
  666.     else
  667.     {
  668.         /* server */
  669.         for(i=0; i<count; i++)
  670.         {
  671.             /* free response */
  672.             BMI_memfree(addr_array[i], state_array[i].resp, 
  673.                 sizeof(struct request), BMI_SEND);
  674.             /* free data buffers */
  675.             for(j=0; j<state_array[i].buffer_array_size; j++)
  676.             {
  677.                 BMI_memfree(addr_array[i], 
  678.                     state_array[i].buffer_array[j], MSG_SIZE, BMI_SEND);
  679.             }
  680.         }
  681.     }
  682.  
  683.     return(0);
  684. }
  685.  
  686.  
  687. int svr_handle_next_mpi(struct svr_xfer_state* state)
  688. {
  689.     int ret = 0;
  690.     MPI_Status status;
  691.     int i;
  692.  
  693.     switch(state->step)
  694.     {
  695.         case 0:
  696.             /* received a request */ 
  697.             /* need actual blocking receive to pull in what iprobe saw */
  698.             state->mpi_unexp_buffer = (void*)malloc(state->mpi_unexp_size);
  699.             assert(state->mpi_unexp_buffer);
  700.             ret = MPI_Recv(state->mpi_unexp_buffer, state->mpi_unexp_size, 
  701.                 MPI_BYTE, state->mpi_addr, state->mpi_tag, MPI_COMM_WORLD,
  702.                 &status);
  703.             assert(ret == MPI_SUCCESS);
  704.             assert(status.MPI_ERROR == MPI_SUCCESS);
  705.             free(state->mpi_unexp_buffer);
  706.  
  707.             /* post a response send */
  708.             stupid_array[request_count] = state;
  709.             ret = MPI_Isend(state->resp, sizeof(struct response), MPI_BYTE,
  710.                 state->mpi_addr, 0, MPI_COMM_WORLD,
  711.                 &request_array[request_count]);
  712.             request_count++;
  713.             assert(ret == MPI_SUCCESS);
  714.  
  715.             state->step++;
  716.             return(0);
  717.             break;
  718.         case 1:
  719.             /* response send completed */
  720.             state->step++;
  721.  
  722.             /* post data sends */
  723.             for(i=0; i<state->buffer_array_size; i++)
  724.             {
  725.                 stupid_array[request_count] = state;
  726.                 ret = MPI_Isend(state->buffer_array[i],
  727.                     MSG_SIZE, MPI_BYTE, state->mpi_addr, 0, MPI_COMM_WORLD,
  728.                     &request_array[request_count]);
  729.                 request_count++;
  730.                 assert(ret == MPI_SUCCESS);
  731.             }
  732.  
  733.             return(0);
  734.             break;
  735.  
  736.         default:
  737.             state->step++;
  738.             if(state->step == (state->buffer_array_size + 2))
  739.             {
  740.                 num_done++;
  741.             }
  742.             return(0);
  743.             break;
  744.     }
  745.  
  746.     assert(0);
  747.     return(0);
  748. }
  749.  
  750. int client_handle_next_mpi(struct svr_xfer_state* state)
  751. {
  752.  
  753.     int ret = 0;
  754.  
  755.     switch(state->step)
  756.     {
  757.         case 0:
  758.             /* post recv for response */
  759.             stupid_array[request_count] = state;
  760.             ret = MPI_Irecv(state->resp, sizeof(struct response), MPI_BYTE,
  761.                 state->mpi_addr, 0, MPI_COMM_WORLD,
  762.                 &request_array[request_count]);
  763.             request_count++;
  764.             assert(ret == MPI_SUCCESS);
  765.  
  766.             /* send request */
  767.             stupid_array[request_count] = state;
  768.             ret = MPI_Isend(state->req, sizeof(struct request), MPI_BYTE,
  769.                 state->mpi_addr, 0, MPI_COMM_WORLD,
  770.                 &request_array[request_count]);
  771.             request_count++;
  772.             assert(ret == MPI_SUCCESS);
  773.             state->step++;
  774.             return(0);
  775.             break;
  776.  
  777.         case 1:
  778.             /* send completed */
  779.             state->step++;
  780.             return(0);
  781.             break;
  782.         default:
  783.             /* recv completed (resp or bulk) */
  784.             state->step++;
  785.             if(state->step == (state->buffer_array_size + 3))
  786.             {
  787.                 num_done++;
  788.                 return(0);
  789.             }
  790.  
  791.             stupid_array[request_count] = state;
  792.             ret = MPI_Irecv(state->buffer_array[state->step-3], 
  793.                 1, state->dtype,
  794.                 state->mpi_addr, 0, MPI_COMM_WORLD,
  795.                 &request_array[request_count]);
  796.             request_count++;
  797.             assert(ret == MPI_SUCCESS);
  798.             return(0);
  799.             break;
  800.     }
  801.  
  802.     assert(0);
  803.     return(0);
  804. }
  805.  
  806. int prepare_states_mpi(struct svr_xfer_state* state_array, int*
  807.     addr_array, struct bench_options* opts, int count, int svr_flag)
  808. {
  809.     int i,j;
  810.     int ret;
  811.  
  812.     if(svr_flag == 0)
  813.     {
  814.         /* CLIENT */
  815.         for(i=0; i<count; i++)
  816.         {
  817.             state_array[i].mpi_addr = addr_array[i];
  818.             state_array[i].mpi_tag = 0;
  819.             state_array[i].step = 0;
  820.             /* allocate request and response */
  821.             state_array[i].req = (struct request*)malloc(sizeof(struct
  822.                 request));
  823.             state_array[i].resp = (struct response*)malloc(sizeof(struct
  824.                 response));
  825.             assert(state_array[i].req); 
  826.             assert(state_array[i].resp); 
  827.  
  828.             /* allocate array of buffers for bulk transfer */
  829.             state_array[i].buffer_array_size = opts->total_len/MSG_SIZE;
  830.             if(opts->total_len%MSG_SIZE)
  831.                 state_array[i].buffer_array_size++;
  832.             state_array[i].buffer_array =
  833.                 (void**)malloc(state_array[i].buffer_array_size*sizeof(void*));
  834.             assert(state_array[i].buffer_array);
  835.             for(j=0; j<state_array[i].buffer_array_size; j++)
  836.             {
  837.                 state_array[i].buffer_array[j] =
  838.                    malloc(MSG_SIZE);
  839.                 assert(state_array[i].buffer_array[j]);
  840.             }
  841.  
  842.             /* set up data types */
  843.             state_array[i].displace_array = 
  844.                 (MPI_Aint*)malloc(opts->list_io_factor* sizeof(MPI_Aint));
  845.             assert(state_array[i].displace_array);
  846.             state_array[i].blocklen_array = 
  847.                 (int*)malloc(opts->list_io_factor* sizeof(int));
  848.             assert(state_array[i].blocklen_array);
  849.             for(j=0; j<opts->list_io_factor; j++)
  850.             {
  851.                 state_array[i].blocklen_array[j] = state_array[i].size_list[j];
  852.             }
  853.             state_array[i].displace_array[0] = 0;
  854.             for(j=1; j<opts->list_io_factor; j++)
  855.             {
  856.                 state_array[i].displace_array[j] = state_array[i].size_list[j-1];
  857.             }
  858.             ret = MPI_Type_hindexed(opts->list_io_factor, 
  859.                 state_array[i].blocklen_array, state_array[i].displace_array,
  860.                 MPI_BYTE, &state_array[i].dtype);
  861.             assert(ret == MPI_SUCCESS);
  862.             ret = MPI_Type_commit(&state_array[i].dtype);
  863.             assert(ret == MPI_SUCCESS);
  864.         }
  865.     }
  866.     else
  867.     {
  868.         /* SERVER */
  869.         for(i=0; i<count; i++)
  870.         {
  871.             state_array[i].step = 0;
  872.             /* allocate response */
  873.             state_array[i].resp = (struct response*)malloc(sizeof(struct
  874.                 response));
  875.             assert(state_array[i].resp);
  876.  
  877.             /* allocate array of buffers for bulk transfer */
  878.             state_array[i].buffer_array_size = opts->total_len/MSG_SIZE;
  879.             if(opts->total_len%MSG_SIZE)
  880.                 state_array[i].buffer_array_size++;
  881.             state_array[i].buffer_array =
  882.                 (void**)malloc(state_array[i].buffer_array_size*sizeof(void*));
  883.             assert(state_array[i].buffer_array);
  884.             for(j=0; j<state_array[i].buffer_array_size; j++)
  885.             {
  886.                 state_array[i].buffer_array[j] =
  887.                    malloc(MSG_SIZE);
  888.                 assert(state_array[i].buffer_array[j]);
  889.             }
  890.         }
  891.     }
  892.     return(0);
  893. }
  894.  
  895. int teardown_states_mpi(struct svr_xfer_state* state_array, int*
  896.     addr_array, struct bench_options* opts, int count, int svr_flag)
  897. {
  898.     /* TODO: fill this in */
  899.     return(0);
  900. }
  901.  
  902. /*
  903.  * Local variables:
  904.  *  c-indent-level: 4
  905.  *  c-basic-offset: 4
  906.  * End:
  907.  *
  908.  * vim: ts=8 sts=4 sw=4 expandtab
  909.  */
  910.