home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / test / io / bmi / driver-bw-multi.c < prev    next >
C/C++ Source or Header  |  2006-08-11  |  26KB  |  1,041 lines

  1. /*
  2.  * (C) 2001 Clemson University and The University of Chicago
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <unistd.h>
  10. #include <errno.h>
  11. #include "gossip.h"
  12. #include <mpi.h>
  13. #include "bmi.h"
  14. #include "bench-initialize.h"
  15. #include "bench-args.h"
  16. #include "bench-mem.h"
  17.  
  18. static int bmi_server_postall(
  19.     struct bench_options *opts,
  20.     struct mem_buffers *bmi_buf_array,
  21.     int num_clients,
  22.     PVFS_BMI_addr_t * addr_array,
  23.     enum bmi_buffer_type buffer_type,
  24.     double *wtime,
  25.     int world_rank,
  26.     bmi_context_id context);
  27. static int bmi_client_postall(
  28.     struct bench_options *opts,
  29.     struct mem_buffers *bmi_buf_array,
  30.     int num_servers,
  31.     PVFS_BMI_addr_t * addr_array,
  32.     enum bmi_buffer_type buffer_type,
  33.     double *wtime,
  34.     int world_rank,
  35.     bmi_context_id context);
  36. static int mpi_client_postall(
  37.     struct bench_options *opts,
  38.     struct mem_buffers *mpi_buf_array,
  39.     int num_servers,
  40.     int *addr_array,
  41.     double *wtime,
  42.     int world_rank);
  43. static int mpi_server_postall(
  44.     struct bench_options *opts,
  45.     struct mem_buffers *mpi_buf_array,
  46.     int num_clients,
  47.     int *addr_array,
  48.     double *wtime,
  49.     int world_rank);
  50.  
  51. int main(
  52.     int argc,
  53.     char *argv[])
  54. {
  55.     int ret = -1;
  56.     int world_rank = 0;
  57.     MPI_Comm comm;
  58.     PVFS_BMI_addr_t *bmi_peer_array;
  59.     int *mpi_peer_array;
  60.     int num_clients;
  61.     struct bench_options opts;
  62.     int i = 0;
  63.     enum bmi_buffer_type buffer_type = BMI_EXT_ALLOC;
  64.     struct mem_buffers *mpi_buf_array = NULL;
  65.     struct mem_buffers *bmi_buf_array = NULL;
  66.     int im_a_server = 0;
  67.     int num_messages = 0;
  68.     double bmi_time, mpi_time;
  69.     double max_bmi_time, max_mpi_time;
  70.     double min_bmi_time, min_mpi_time;
  71.     double sum_bmi_time, sum_mpi_time;
  72.     double sq_bmi_time, sq_mpi_time;
  73.     double sumsq_bmi_time, sumsq_mpi_time;
  74.     double var_bmi_time, var_mpi_time;
  75.     double stddev_bmi_time, stddev_mpi_time;
  76.     double agg_bmi_bw, agg_mpi_bw;
  77.     double ave_bmi_time, ave_mpi_time;
  78.     double total_data_xfer = 0;
  79.     bmi_context_id context = -1;
  80.  
  81.     /* start up benchmark environment */
  82.     ret = bench_init(&opts, argc, argv, &num_clients, &world_rank, &comm,
  83.              &bmi_peer_array, &mpi_peer_array, &context);
  84.     if (ret < 0)
  85.     {
  86.     fprintf(stderr, "bench_init() failure.\n");
  87.     return (-1);
  88.     }
  89.  
  90.     /* note whether we are a "server" or not */
  91.     if (world_rank < opts.num_servers)
  92.     {
  93.     im_a_server = 1;
  94.     }
  95.  
  96.     num_messages = opts.total_len / opts.message_len;
  97.  
  98.     /* setup buffers */
  99.     if (im_a_server)
  100.     {
  101.     /* allocate array to track buffer sets */
  102.     mpi_buf_array = (struct mem_buffers *) malloc(num_clients *
  103.                               sizeof(struct
  104.                                  mem_buffers));
  105.     bmi_buf_array =
  106.         (struct mem_buffers *) malloc(num_clients *
  107.                       sizeof(struct mem_buffers));
  108.     if (!mpi_buf_array || !bmi_buf_array)
  109.     {
  110.         fprintf(stderr, "malloc failure.\n");
  111.         return (-1);
  112.     }
  113.  
  114.     /* actually allocate buffers */
  115.     for (i = 0; i < num_clients; i++)
  116.     {
  117.         if (opts.flags & BMI_ALLOCATE_MEMORY)
  118.         {
  119.         buffer_type = BMI_PRE_ALLOC;
  120.         ret = BMI_alloc_buffers(&(bmi_buf_array[i]), num_messages,
  121.                     opts.message_len, bmi_peer_array[i],
  122.                     BMI_RECV);
  123.         }
  124.         else
  125.         {
  126.         ret = alloc_buffers(&(bmi_buf_array[i]), num_messages,
  127.                     opts.message_len);
  128.         }
  129.  
  130.         ret += alloc_buffers(&(mpi_buf_array[i]), num_messages,
  131.                  opts.message_len);
  132.  
  133.         if (ret < 0)
  134.         {
  135.         fprintf(stderr, "alloc_buffers failure.\n");
  136.         return (-1);
  137.         }
  138.     }
  139.     }
  140.     else
  141.     {
  142.     /* allocate array to track buffer sets */
  143.     mpi_buf_array = (struct mem_buffers *) malloc(opts.num_servers *
  144.                               sizeof(struct
  145.                                  mem_buffers));
  146.     bmi_buf_array =
  147.         (struct mem_buffers *) malloc(opts.num_servers *
  148.                       sizeof(struct mem_buffers));
  149.     if (!mpi_buf_array || !bmi_buf_array)
  150.     {
  151.         fprintf(stderr, "malloc failure.\n");
  152.         return (-1);
  153.     }
  154.  
  155.     /* actually allocate buffers */
  156.     for (i = 0; i < opts.num_servers; i++)
  157.     {
  158.         if (opts.flags & BMI_ALLOCATE_MEMORY)
  159.         {
  160.         buffer_type = BMI_PRE_ALLOC;
  161.         ret = BMI_alloc_buffers(&(bmi_buf_array[i]), num_messages,
  162.                     opts.message_len, bmi_peer_array[i],
  163.                     BMI_SEND);
  164.         }
  165.         else
  166.         {
  167.         ret = alloc_buffers(&(bmi_buf_array[i]), num_messages,
  168.                     opts.message_len);
  169.         }
  170.  
  171.         ret += alloc_buffers(&(mpi_buf_array[i]), num_messages,
  172.                  opts.message_len);
  173.  
  174.         if (ret < 0)
  175.         {
  176.         fprintf(stderr, "alloc_buffers failure.\n");
  177.         return (-1);
  178.         }
  179.  
  180.         /* only the "client" marks its buffers */
  181.         ret = mark_buffers(&(bmi_buf_array[i]));
  182.         ret += mark_buffers(&(mpi_buf_array[i]));
  183.         if (ret < 0)
  184.         {
  185.         fprintf(stderr, "mark_buffers() failure.\n");
  186.         return (-1);
  187.         }
  188.     }
  189.     }
  190.  
  191.     /********************************************************/
  192.     /* Actually measure some stuff */
  193.  
  194.     if (im_a_server)
  195.     {
  196.     ret = bmi_server_postall(&opts, bmi_buf_array, num_clients,
  197.                  bmi_peer_array, buffer_type, &bmi_time,
  198.                  world_rank, context);
  199.     }
  200.     else
  201.     {
  202.     ret = bmi_client_postall(&opts, bmi_buf_array, opts.num_servers,
  203.                  bmi_peer_array, buffer_type, &bmi_time,
  204.                  world_rank, context);
  205.     }
  206.     if (ret < 0)
  207.     {
  208.     fprintf(stderr, "failure in main routine, MPI task %d.\n", world_rank);
  209.     return (-1);
  210.     }
  211.  
  212.     MPI_Barrier(MPI_COMM_WORLD);
  213.  
  214.     if (im_a_server)
  215.     {
  216.     ret = mpi_server_postall(&opts, mpi_buf_array, num_clients,
  217.                  mpi_peer_array, &mpi_time, world_rank);
  218.     }
  219.     else
  220.     {
  221.     ret = mpi_client_postall(&opts, mpi_buf_array, opts.num_servers,
  222.                  mpi_peer_array, &mpi_time, world_rank);
  223.     }
  224.  
  225.     if (ret < 0)
  226.     {
  227.     fprintf(stderr, "failure in main routine, MPI task %d.\n", world_rank);
  228.     return (-1);
  229.     }
  230.  
  231.     MPI_Barrier(MPI_COMM_WORLD);
  232.  
  233.     /********************************************************/
  234.     /* Done measuring */
  235. #if 0
  236.     /* server verifies buffers that it receives */
  237.     if (!(opts.flags & REUSE_BUFFERS) && im_a_server)
  238.     {
  239.     for (i = 0; i < num_clients; i++)
  240.     {
  241.         ret = check_buffers(&(mpi_buf_array[i]));
  242.         if (ret < 0)
  243.         {
  244.         fprintf(stderr, "*********************************\n");
  245.         fprintf(stderr, "MPI buffer verification failed.\n");
  246.         return (-1);
  247.         }
  248.         ret = check_buffers(&(bmi_buf_array[i]));
  249.         if (ret < 0)
  250.         {
  251.         fprintf(stderr, "**********************************\n");
  252.         fprintf(stderr, "BMI buffer verification failed.\n");
  253.         return (-1);
  254.         }
  255.     }
  256.     }
  257. #endif
  258.     /* release buffers */
  259.     if (im_a_server)
  260.     {
  261.     for (i = 0; i < num_clients; i++)
  262.     {
  263.         free_buffers(&(mpi_buf_array[i]));
  264.         if (opts.flags & BMI_ALLOCATE_MEMORY)
  265.         {
  266.         BMI_free_buffers(&(bmi_buf_array[i]), bmi_peer_array[i],
  267.                  BMI_RECV);
  268.         }
  269.         else
  270.         {
  271.         free_buffers(&(bmi_buf_array[i]));
  272.         }
  273.     }
  274.     }
  275.     else
  276.     {
  277.     for (i = 0; i < opts.num_servers; i++)
  278.     {
  279.         free_buffers(&(mpi_buf_array[i]));
  280.         if (opts.flags & BMI_ALLOCATE_MEMORY)
  281.         {
  282.         BMI_free_buffers(&(bmi_buf_array[i]), bmi_peer_array[i],
  283.                  BMI_SEND);
  284.         }
  285.         else
  286.         {
  287.         free_buffers(&(bmi_buf_array[i]));
  288.         }
  289.     }
  290.     }
  291.  
  292.     /* reduce seperately among clients and servers to get min 
  293.      * times and max times
  294.      */
  295.     MPI_Allreduce(&mpi_time, &max_mpi_time, 1, MPI_DOUBLE, MPI_MAX, comm);
  296.     MPI_Allreduce(&mpi_time, &min_mpi_time, 1, MPI_DOUBLE, MPI_MIN, comm);
  297.     MPI_Allreduce(&mpi_time, &sum_mpi_time, 1, MPI_DOUBLE, MPI_SUM, comm);
  298.     MPI_Allreduce(&bmi_time, &max_bmi_time, 1, MPI_DOUBLE, MPI_MAX, comm);
  299.     MPI_Allreduce(&bmi_time, &min_bmi_time, 1, MPI_DOUBLE, MPI_MIN, comm);
  300.     MPI_Allreduce(&bmi_time, &sum_bmi_time, 1, MPI_DOUBLE, MPI_SUM, comm);
  301.     sq_bmi_time = bmi_time * bmi_time;
  302.     sq_mpi_time = mpi_time * mpi_time;
  303.     MPI_Allreduce(&sq_bmi_time, &sumsq_bmi_time, 1, MPI_DOUBLE, MPI_SUM, comm);
  304.     MPI_Allreduce(&sq_mpi_time, &sumsq_mpi_time, 1, MPI_DOUBLE, MPI_SUM, comm);
  305.  
  306.     /* do this first to get nice output ordering */
  307.     if (world_rank == 0) {
  308.     bench_args_dump(&opts);
  309.         fflush(stdout);
  310.     }
  311.     MPI_Barrier(MPI_COMM_WORLD);
  312.  
  313.     /* exactly one "client" and one "server" compute and print
  314.      * statistics
  315.      */
  316.     if (world_rank == 0)
  317.     {
  318.     total_data_xfer = (double)opts.num_servers * (double)num_clients *
  319.             (double)num_messages * (double)opts.message_len;
  320.     if (opts.num_servers > 1)
  321.     {
  322.         var_bmi_time = sumsq_bmi_time -
  323.         sum_bmi_time * sum_bmi_time / (double) (opts.num_servers);
  324.         var_mpi_time = sumsq_mpi_time -
  325.         sum_mpi_time * sum_mpi_time / (double) (opts.num_servers);
  326.     }
  327.     else
  328.     {
  329.         var_bmi_time = 0;
  330.         var_mpi_time = 0;
  331.     }
  332.     ave_bmi_time = sum_bmi_time / opts.num_servers;
  333.     ave_mpi_time = sum_mpi_time / opts.num_servers;
  334.     stddev_bmi_time = sqrt(var_bmi_time);
  335.     stddev_mpi_time = sqrt(var_mpi_time);
  336.     agg_bmi_bw = (double) total_data_xfer / max_bmi_time;
  337.     agg_mpi_bw = (double) total_data_xfer / max_mpi_time;
  338.  
  339.     printf
  340.         ("%d %d %f %f %f %f %f (msg_len,servers,min,max,ave,stddev,agg_MB/s) bmi server\n",
  341.          opts.message_len, opts.num_servers, min_bmi_time, max_bmi_time,
  342.          ave_bmi_time, stddev_bmi_time, agg_bmi_bw / (1024 * 1024));
  343.     printf
  344.         ("%d %d %f %f %f %f %f (msg_len,servers,min,max,ave,stddev,agg_MB/s) mpi server\n",
  345.          opts.message_len, opts.num_servers, min_mpi_time, max_mpi_time,
  346.          ave_mpi_time, stddev_mpi_time, agg_mpi_bw / (1024 * 1024));
  347.     }
  348.  
  349.     /* enforce output ordering */
  350.     fflush(stdout);
  351.     MPI_Barrier(MPI_COMM_WORLD);
  352.  
  353.     if (world_rank == opts.num_servers)
  354.     {
  355.     total_data_xfer = (double)opts.num_servers * (double)num_clients *
  356.             (double)num_messages * (double)opts.message_len;
  357.  
  358.     if (num_clients > 1)
  359.     {
  360.         var_bmi_time = sumsq_bmi_time -
  361.         sum_bmi_time * sum_bmi_time / (double) (num_clients);
  362.         var_mpi_time = sumsq_mpi_time -
  363.         sum_mpi_time * sum_mpi_time / (double) (num_clients);
  364.     }
  365.     else
  366.     {
  367.         var_bmi_time = 0;
  368.         var_mpi_time = 0;
  369.     }
  370.     stddev_bmi_time = sqrt(var_bmi_time);
  371.     stddev_mpi_time = sqrt(var_mpi_time);
  372.     agg_bmi_bw = (double) total_data_xfer / max_bmi_time;
  373.     agg_mpi_bw = (double) total_data_xfer / max_mpi_time;
  374.     ave_bmi_time = sum_bmi_time / num_clients;
  375.     ave_mpi_time = sum_mpi_time / num_clients;
  376.  
  377.     printf
  378.         ("%d %d %f %f %f %f %f (msg_len,clients,min,max,ave,stddev,agg_MB/s) bmi client\n",
  379.          opts.message_len, num_clients, min_bmi_time, max_bmi_time,
  380.          ave_bmi_time, stddev_bmi_time, agg_bmi_bw / (1024 * 1024));
  381.     printf
  382.         ("%d %d %f %f %f %f %f (msg_len,clients,min,max,ave,stddev,agg_MB/s) mpi client\n",
  383.          opts.message_len, num_clients, min_mpi_time, max_mpi_time,
  384.          ave_mpi_time, stddev_mpi_time, agg_mpi_bw / (1024 * 1024));
  385.     }
  386.  
  387. #if 0
  388.     MPI_Allreduce(&read_tim, &max_read_tim, 1, MPI_DOUBLE, MPI_MAX,
  389.           MPI_COMM_WORLD);
  390.     MPI_Allreduce(&read_tim, &min_read_tim, 1, MPI_DOUBLE, MPI_MIN,
  391.           MPI_COMM_WORLD);
  392.     MPI_Allreduce(&read_tim, &sum_read_tim, 1, MPI_DOUBLE, MPI_SUM,
  393.           MPI_COMM_WORLD);
  394.  
  395.     ret = MPI_Comm_size(comm, &comm_size);
  396.     if (ret != MPI_SUCCESS)
  397.     {
  398.     fprintf(stderr, "Comm_size failure.\n");
  399.     return (-1);
  400.     }
  401.     printf("comm size: %d\n", comm_size);
  402. #endif
  403.     /* shutdown interfaces */
  404.     BMI_close_context(context);
  405.     BMI_finalize();
  406.     MPI_Finalize();
  407.     return 0;
  408. }
  409.  
  410.  
  411. static int bmi_server_postall(
  412.     struct bench_options *opts,
  413.     struct mem_buffers *bmi_buf_array,
  414.     int num_clients,
  415.     PVFS_BMI_addr_t * addr_array,
  416.     enum bmi_buffer_type buffer_type,
  417.     double *wtime,
  418.     int world_rank,
  419.     bmi_context_id context)
  420. {
  421.     double time1, time2;
  422.     int i, j;
  423.     int **done;
  424.     int *done_index;
  425.     bmi_op_id_t **ids;
  426.     int num_buffers = bmi_buf_array[0].num_buffers;
  427.     void *recv_buffer;
  428.     int ret;
  429.     bmi_size_t actual_size;
  430.     int outcount;
  431.     bmi_op_id_t *id_array;
  432.     int *index_array;
  433.     bmi_error_code_t *error_code_array;
  434.     bmi_size_t *actual_size_array;
  435.     int done_clients = 0;
  436.  
  437.     /* allocate a lot of arrays to keep up with what message we are
  438.      * on for each peer
  439.      */
  440.     done = (int **) malloc(num_clients * sizeof(int *));
  441.     done_index = (int *) malloc(num_clients * sizeof(int));
  442.     ids = (bmi_op_id_t **) malloc(num_clients * sizeof(bmi_op_id_t *));
  443.     id_array = (bmi_op_id_t *) malloc(num_clients * sizeof(bmi_op_id_t));
  444.     actual_size_array = (bmi_size_t *) malloc(num_clients * sizeof(bmi_size_t));
  445.     error_code_array = (bmi_error_code_t *) malloc(num_clients *
  446.                            sizeof(bmi_error_code_t));
  447.     index_array = (int *) malloc(num_clients * sizeof(int));
  448.  
  449.     if (!done || !done_index || !ids || !id_array || !actual_size_array
  450.     || !error_code_array || !index_array)
  451.     {
  452.     fprintf(stderr, "malloc error.\n");
  453.     return (-1);
  454.     }
  455.     for (i = 0; i < num_clients; i++)
  456.     {
  457.     done[i] = (int *) malloc(num_buffers * sizeof(int));
  458.     ids[i] = (bmi_op_id_t *) malloc(num_buffers * sizeof(bmi_op_id_t));
  459.     if (!done[i] || !ids[i])
  460.     {
  461.         fprintf(stderr, "malloc error.\n");
  462.         return (-1);
  463.     }
  464.     }
  465.  
  466.     /* barrier and then start timing */
  467.     MPI_Barrier(MPI_COMM_WORLD);
  468.     time1 = MPI_Wtime();
  469.  
  470.     /* post everything at once */
  471.     for (i = 0; i < num_buffers; i++)
  472.     {
  473.     for (j = 0; j < num_clients; j++)
  474.     {
  475.         if (opts->flags & REUSE_BUFFERS)
  476.         {
  477.         recv_buffer = bmi_buf_array[j].buffers[0];
  478.         }
  479.         else
  480.         {
  481.         recv_buffer = bmi_buf_array[j].buffers[i];
  482.         }
  483.  
  484.         ret = BMI_post_recv(&(ids[j][i]), addr_array[j], recv_buffer,
  485.                 bmi_buf_array[0].size, &actual_size,
  486.                 buffer_type, 0, NULL, context);
  487.         if (ret < 0)
  488.         {
  489.         fprintf(stderr, "Server: BMI recv error.\n");
  490.         return (-1);
  491.         }
  492.         else if (ret == 0)
  493.         {
  494.         done[j][i] = 0;
  495.         }
  496.         else
  497.         {
  498.         /* mark that this message completed immediately */
  499.         done[j][i] = 1;
  500.         }
  501.     }
  502.     }
  503.  
  504.     /* find the first message to test for each client */
  505.     for (i = 0; i < num_clients; i++)
  506.     {
  507.     for (j = 0; j < num_buffers; j++)
  508.     {
  509.         if (done[i][j] == 0)
  510.         {
  511.         done_index[i] = j;
  512.         id_array[i] = ids[i][j];
  513.         break;
  514.         }
  515.     }
  516.     if (j == num_buffers)
  517.     {
  518.         /* this client is completely done */
  519.         done_index[i] = -1;
  520.         id_array[i] = 0;
  521.         done_clients++;
  522.     }
  523.     }
  524.  
  525.     /* while there is still work to do */
  526.     while (done_clients < num_clients)
  527.     {
  528.     outcount = 0;
  529.     do
  530.     {
  531.         /* test the earliest uncompleted message for each host-
  532.          * there are zero entries for hosts that are already
  533.          * finished
  534.          */
  535.         ret = BMI_testsome(num_clients, id_array, &outcount, index_array,
  536.                    error_code_array, actual_size_array, NULL, 0,
  537.                    context);
  538.     } while (ret == 0 && outcount == 0);
  539.  
  540.     if (ret < 0)
  541.     {
  542.         fprintf(stderr, "testsome error.\n");
  543.         return (-1);
  544.     }
  545.  
  546.     /* for each completed message, mark that it is done and
  547.      * adjust indexes to test for next uncompleted message
  548.      */
  549.     for (i = 0; i < outcount; i++)
  550.     {
  551.         if (error_code_array[i] != 0)
  552.         {
  553.         fprintf(stderr, "BMI op failure.\n");
  554.         return (-1);
  555.         }
  556.  
  557.         (done_index[index_array[i]])++;
  558.         if (done_index[index_array[i]] == num_buffers)
  559.         {
  560.         done_index[index_array[i]] = -1;
  561.         id_array[index_array[i]] = 0;
  562.         done_clients++;
  563.         }
  564.         else
  565.         {
  566.         while (done[index_array[i]][done_index[index_array[i]]] == 1)
  567.         {
  568.             (done_index[index_array[i]])++;
  569.         }
  570.         if (done_index[index_array[i]] == num_buffers)
  571.         {
  572.             done_index[index_array[i]] = -1;
  573.             id_array[index_array[i]] = 0;
  574.             done_clients++;
  575.         }
  576.         else
  577.         {
  578.             id_array[index_array[i]] =
  579.             ids[index_array[i]][done_index[index_array[i]]];
  580.         }
  581.         }
  582.     }
  583.     }
  584.  
  585.     /* stop timing */
  586.     time2 = MPI_Wtime();
  587.  
  588.     *wtime = time2 - time1;
  589.  
  590.     return (0);
  591. }
  592.  
  593.  
  594. static int bmi_client_postall(
  595.     struct bench_options *opts,
  596.     struct mem_buffers *bmi_buf_array,
  597.     int num_servers,
  598.     PVFS_BMI_addr_t * addr_array,
  599.     enum bmi_buffer_type buffer_type,
  600.     double *wtime,
  601.     int world_rank,
  602.     bmi_context_id context)
  603. {
  604.     double time1, time2;
  605.     int i, j;
  606.     int **done;
  607.     int *done_index;
  608.     bmi_op_id_t **ids;
  609.     int num_buffers = bmi_buf_array[0].num_buffers;
  610.     void *send_buffer;
  611.     int ret;
  612.     int outcount;
  613.     bmi_op_id_t *id_array;
  614.     int *index_array;
  615.     bmi_error_code_t *error_code_array;
  616.     bmi_size_t *actual_size_array;
  617.     int done_servers = 0;
  618.  
  619.     /* allocate a lot of arrays to keep up with what message we are
  620.      * on for each peer
  621.      */
  622.     done = (int **) malloc(num_servers * sizeof(int *));
  623.     done_index = (int *) malloc(num_servers * sizeof(int));
  624.     ids = (bmi_op_id_t **) malloc(num_servers * sizeof(bmi_op_id_t *));
  625.     id_array = (bmi_op_id_t *) malloc(num_servers * sizeof(bmi_op_id_t));
  626.     actual_size_array = (bmi_size_t *) malloc(num_servers * sizeof(bmi_size_t));
  627.     error_code_array = (bmi_error_code_t *) malloc(num_servers *
  628.                            sizeof(bmi_error_code_t));
  629.     index_array = (int *) malloc(num_servers * sizeof(int));
  630.  
  631.     if (!done || !done_index || !ids || !id_array || !actual_size_array
  632.     || !error_code_array || !index_array)
  633.     {
  634.     fprintf(stderr, "malloc error.\n");
  635.     return (-1);
  636.     }
  637.     for (i = 0; i < num_servers; i++)
  638.     {
  639.     done[i] = (int *) malloc(num_buffers * sizeof(int));
  640.     ids[i] = (bmi_op_id_t *) malloc(num_buffers * sizeof(bmi_op_id_t));
  641.     if (!done[i] || !ids[i])
  642.     {
  643.         fprintf(stderr, "malloc error.\n");
  644.         return (-1);
  645.     }
  646.     }
  647.  
  648.     /* barrier and then start timing */
  649.     MPI_Barrier(MPI_COMM_WORLD);
  650.     time1 = MPI_Wtime();
  651.  
  652.     /* post everything at once */
  653.     for (i = 0; i < num_buffers; i++)
  654.     {
  655.     for (j = 0; j < num_servers; j++)
  656.     {
  657.         if (opts->flags & REUSE_BUFFERS)
  658.         {
  659.         send_buffer = bmi_buf_array[j].buffers[0];
  660.         }
  661.         else
  662.         {
  663.         send_buffer = bmi_buf_array[j].buffers[i];
  664.         }
  665.  
  666.         ret = BMI_post_send(&(ids[j][i]), addr_array[j], send_buffer,
  667.                 bmi_buf_array[0].size, buffer_type, 0, NULL,
  668.                 context);
  669.         if (ret < 0)
  670.         {
  671.         fprintf(stderr, "Client: BMI send error.\n");
  672.         return (-1);
  673.         }
  674.         else if (ret == 0)
  675.         {
  676.         done[j][i] = 0;
  677.         }
  678.         else
  679.         {
  680.         /* mark that this message completed immediately */
  681.         done[j][i] = 1;
  682.         }
  683.     }
  684.     }
  685.  
  686.     /* find the first message to test for each client */
  687.     for (i = 0; i < num_servers; i++)
  688.     {
  689.     for (j = 0; j < num_buffers; j++)
  690.     {
  691.         if (done[i][j] == 0)
  692.         {
  693.         done_index[i] = j;
  694.         id_array[i] = ids[i][j];
  695.         break;
  696.         }
  697.     }
  698.     if (j == num_buffers)
  699.     {
  700.         /* this client is completely done */
  701.         done_index[i] = -1;
  702.         id_array[i] = 0;
  703.         done_servers++;
  704.     }
  705.     }
  706.  
  707.     /* while there is still work to do */
  708.     while (done_servers < num_servers)
  709.     {
  710.     outcount = 0;
  711.     do
  712.     {
  713.         /* test the earliest uncompleted message for each host-
  714.          * there are zero entries for hosts that are already
  715.          * finished
  716.          */
  717.         ret = BMI_testsome(num_servers, id_array, &outcount, index_array,
  718.                    error_code_array, actual_size_array, NULL, 0,
  719.                    context);
  720.     } while (ret == 0 && outcount == 0);
  721.  
  722.     if (ret < 0)
  723.     {
  724.         fprintf(stderr, "testsome error.\n");
  725.         return (-1);
  726.     }
  727.  
  728.     /* for each completed message, mark that it is done and
  729.      * adjust indexes to test for next uncompleted message
  730.      */
  731.     for (i = 0; i < outcount; i++)
  732.     {
  733.         if (error_code_array[i] != 0)
  734.         {
  735.         fprintf(stderr, "BMI op failure.\n");
  736.         return (-1);
  737.         }
  738.  
  739.         (done_index[index_array[i]])++;
  740.         if (done_index[index_array[i]] == num_buffers)
  741.         {
  742.         done_index[index_array[i]] = -1;
  743.         id_array[index_array[i]] = 0;
  744.         done_servers++;
  745.         }
  746.         else
  747.         {
  748.         while (done[index_array[i]][done_index[index_array[i]]] == 1)
  749.         {
  750.             (done_index[index_array[i]])++;
  751.         }
  752.         if (done_index[index_array[i]] == num_buffers)
  753.         {
  754.             done_index[index_array[i]] = -1;
  755.             id_array[index_array[i]] = 0;
  756.             done_servers++;
  757.         }
  758.         else
  759.         {
  760.             id_array[index_array[i]] =
  761.             ids[index_array[i]][done_index[index_array[i]]];
  762.         }
  763.         }
  764.     }
  765.     }
  766.  
  767.     /* stop timing */
  768.     time2 = MPI_Wtime();
  769.  
  770.     *wtime = time2 - time1;
  771.  
  772.     return (0);
  773. }
  774.  
  775.  
  776. static int mpi_server_postall(
  777.     struct bench_options *opts,
  778.     struct mem_buffers *mpi_buf_array,
  779.     int num_clients,
  780.     int *addr_array,
  781.     double *wtime,
  782.     int world_rank)
  783. {
  784.     int num_buffers = mpi_buf_array[0].num_buffers;
  785.     double time1, time2;
  786.     int i, j;
  787.     void *recv_buffer;
  788.     int ret = -1;
  789.     MPI_Request **requests = NULL;
  790.     MPI_Request *req_array = NULL;
  791.     int *request_index = NULL;
  792.     int done_clients = 0;
  793.     int outcount = 0;
  794.     int *index_array = NULL;
  795.     MPI_Status *status_array = NULL;
  796.  
  797.  
  798.     /* allocate a lot of arrays to keep up with what message we are
  799.      * on for each peer
  800.      */
  801.     requests = (MPI_Request **) malloc(num_clients * sizeof(MPI_Request *));
  802.     request_index = (int *) malloc(num_clients * sizeof(int));
  803.     req_array = (MPI_Request *) malloc(num_clients * sizeof(MPI_Request));
  804.     index_array = (int *) malloc(num_clients * sizeof(int));
  805.     status_array = (MPI_Status *) malloc(num_clients * sizeof(MPI_Status));
  806.  
  807.     if (!requests || !request_index || !req_array || !index_array ||
  808.     !status_array)
  809.     {
  810.     fprintf(stderr, "malloc error.\n");
  811.     return (-1);
  812.     }
  813.     for (i = 0; i < num_clients; i++)
  814.     {
  815.     request_index[i] = 0;
  816.     requests[i] = (MPI_Request *) malloc(num_buffers * sizeof(MPI_Request));
  817.     if (!requests[i])
  818.     {
  819.         fprintf(stderr, "malloc error.\n");
  820.         return (-1);
  821.     }
  822.     }
  823.  
  824.     /* barrier and then start timing */
  825.     MPI_Barrier(MPI_COMM_WORLD);
  826.     time1 = MPI_Wtime();
  827.  
  828.     /* post everything at once */
  829.     for (i = 0; i < num_buffers; i++)
  830.     {
  831.     for (j = 0; j < num_clients; j++)
  832.     {
  833.         if (opts->flags & REUSE_BUFFERS)
  834.         {
  835.         recv_buffer = mpi_buf_array[j].buffers[0];
  836.         }
  837.         else
  838.         {
  839.         recv_buffer = mpi_buf_array[j].buffers[i];
  840.         }
  841.  
  842.         ret = MPI_Irecv(recv_buffer, mpi_buf_array[0].size,
  843.                 MPI_BYTE, addr_array[j], 0, MPI_COMM_WORLD,
  844.                 &requests[j][i]);
  845.         if (ret != MPI_SUCCESS)
  846.         {
  847.         fprintf(stderr, "MPI_Irecv failure.\n");
  848.         return (-1);
  849.         }
  850.     }
  851.     }
  852.  
  853.     /* go until all peers have finished */
  854.     while (done_clients < num_clients)
  855.     {
  856.     /* build array of requests to test for */
  857.     for (i = 0; i < num_clients; i++)
  858.     {
  859.         if (request_index[i] == -1)
  860.         {
  861.         req_array[i] = MPI_REQUEST_NULL;
  862.         }
  863.         else
  864.         {
  865.         req_array[i] = requests[i][request_index[i]];
  866.         }
  867.     }
  868.  
  869.     /* test for completion */
  870.     do
  871.     {
  872.         ret = MPI_Testsome(num_clients, req_array, &outcount,
  873.                    index_array, status_array);
  874.     } while (ret == MPI_SUCCESS && outcount == 0);
  875.  
  876.     if (ret != MPI_SUCCESS)
  877.     {
  878.         fprintf(stderr, "MPI_Testsome failure.\n");
  879.         return (-1);
  880.     }
  881.  
  882.     /* for each completed message, mark that it is done and
  883.      * adjust indexes to test for next uncompleted message
  884.      */
  885.     for (i = 0; i < outcount; i++)
  886.     {
  887.         (request_index[index_array[i]])++;
  888.         if (request_index[index_array[i]] == num_buffers)
  889.         {
  890.         request_index[index_array[i]] = -1;
  891.         done_clients++;
  892.         }
  893.     }
  894.     }
  895.  
  896.     /* stop timing */
  897.     time2 = MPI_Wtime();
  898.  
  899.     *wtime = time2 - time1;
  900.  
  901.     return (0);
  902. }
  903.  
  904.  
  905. static int mpi_client_postall(
  906.     struct bench_options *opts,
  907.     struct mem_buffers *mpi_buf_array,
  908.     int num_servers,
  909.     int *addr_array,
  910.     double *wtime,
  911.     int world_rank)
  912. {
  913.     int num_buffers = mpi_buf_array[0].num_buffers;
  914.     double time1, time2;
  915.     int i, j;
  916.     void *send_buffer;
  917.     int ret = -1;
  918.     MPI_Request **requests = NULL;
  919.     MPI_Request *req_array = NULL;
  920.     int *request_index = NULL;
  921.     int done_servers = 0;
  922.     int outcount = 0;
  923.     int *index_array = NULL;
  924.     MPI_Status *status_array = NULL;
  925.  
  926.  
  927.     /* allocate a lot of arrays to keep up with what message we are
  928.      * on for each peer
  929.      */
  930.     requests = (MPI_Request **) malloc(num_servers * sizeof(MPI_Request *));
  931.     request_index = (int *) malloc(num_servers * sizeof(int));
  932.     req_array = (MPI_Request *) malloc(num_servers * sizeof(MPI_Request));
  933.     index_array = (int *) malloc(num_servers * sizeof(int));
  934.     status_array = (MPI_Status *) malloc(num_servers * sizeof(MPI_Status));
  935.  
  936.     if (!requests || !request_index || !req_array || !index_array ||
  937.     !status_array)
  938.     {
  939.     fprintf(stderr, "malloc error.\n");
  940.     return (-1);
  941.     }
  942.     for (i = 0; i < num_servers; i++)
  943.     {
  944.     request_index[i] = 0;
  945.     requests[i] = (MPI_Request *) malloc(num_buffers * sizeof(MPI_Request));
  946.     if (!requests[i])
  947.     {
  948.         fprintf(stderr, "malloc error.\n");
  949.         return (-1);
  950.     }
  951.     }
  952.  
  953.     /* barrier and then start timing */
  954.     MPI_Barrier(MPI_COMM_WORLD);
  955.     time1 = MPI_Wtime();
  956.  
  957.     /* post everything at once */
  958.     for (i = 0; i < num_buffers; i++)
  959.     {
  960.     for (j = 0; j < num_servers; j++)
  961.     {
  962.         if (opts->flags & REUSE_BUFFERS)
  963.         {
  964.         send_buffer = mpi_buf_array[j].buffers[0];
  965.         }
  966.         else
  967.         {
  968.         send_buffer = mpi_buf_array[j].buffers[i];
  969.         }
  970.  
  971.         ret = MPI_Isend(send_buffer, mpi_buf_array[0].size,
  972.                 MPI_BYTE, addr_array[j], 0, MPI_COMM_WORLD,
  973.                 &(requests[j][i]));
  974.         if (ret != MPI_SUCCESS)
  975.         {
  976.         fprintf(stderr, "MPI_Isend failure.\n");
  977.         return (-1);
  978.         }
  979.     }
  980.     }
  981.  
  982.     /* go until all peers have finished */
  983.     while (done_servers < num_servers)
  984.     {
  985.     /* build array of requests to test for */
  986.     for (i = 0; i < num_servers; i++)
  987.     {
  988.         if (request_index[i] == -1)
  989.         {
  990.         req_array[i] = MPI_REQUEST_NULL;
  991.         }
  992.         else
  993.         {
  994.         req_array[i] = requests[i][request_index[i]];
  995.         }
  996.     }
  997.  
  998.     /* test for completion */
  999.     do
  1000.     {
  1001.         ret = MPI_Testsome(num_servers, req_array, &outcount,
  1002.                    index_array, status_array);
  1003.     } while (ret == MPI_SUCCESS && outcount == 0);
  1004.  
  1005.     if (ret != MPI_SUCCESS)
  1006.     {
  1007.         fprintf(stderr, "MPI_Testsome failure.\n");
  1008.         return (-1);
  1009.     }
  1010.  
  1011.     /* for each completed message, mark that it is done and
  1012.      * adjust indexes to test for next uncompleted message
  1013.      */
  1014.     for (i = 0; i < outcount; i++)
  1015.     {
  1016.         (request_index[index_array[i]])++;
  1017.         if (request_index[index_array[i]] == num_buffers)
  1018.         {
  1019.         request_index[index_array[i]] = -1;
  1020.         done_servers++;
  1021.         }
  1022.     }
  1023.     }
  1024.  
  1025.     /* stop timing */
  1026.     time2 = MPI_Wtime();
  1027.  
  1028.     *wtime = time2 - time1;
  1029.  
  1030.     return (0);
  1031. }
  1032.  
  1033. /*
  1034.  * Local variables:
  1035.  *  c-indent-level: 4
  1036.  *  c-basic-offset: 4
  1037.  * End:
  1038.  *
  1039.  * vim: ts=8 sts=4 sw=4 expandtab
  1040.  */
  1041.