home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / client / sysint / sys-small-io.sm < prev    next >
Text File  |  2010-03-08  |  22KB  |  652 lines

  1. /* 
  2.  * (C) 2003 Clemson University and The University of Chicago 
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. /** \file
  8.  *  \ingroup sysint
  9.  *
  10.  *  PVFS2 system interface routines for reading and writing small files.
  11.  */
  12.  
  13. #include <string.h>
  14. #include <assert.h>
  15.  
  16. #include "client-state-machine.h"
  17. #include "pvfs2-debug.h"
  18. #include "job.h"
  19. #include "gossip.h"
  20. #include "str-utils.h"
  21. #include "pint-cached-config.h"
  22. #include "PINT-reqproto-encode.h"
  23. #include "pint-util.h"
  24. #include "pvfs2-internal.h"
  25.  
  26. /* The small-io state machine should only be invoked/jumped-to from the
  27.  * sys-io state machine.  We make this assumption and expect io parameters
  28.  * to be initialized already.
  29.  */
  30.  
  31. static int small_io_completion_fn(void * user_args,
  32.                                   struct PVFS_server_resp * resp_p,
  33.                                   int index);
  34.  
  35. enum {
  36.   MIRROR_RETRY = 132
  37. };
  38.  
  39. %%
  40.  
  41. nested machine pvfs2_client_small_io_sm
  42. {
  43.     state setup_msgpairs 
  44.     {
  45.         run small_io_setup_msgpairs;
  46.         success => xfer_msgpairs;
  47.         default => return;
  48.     }
  49.  
  50.     state xfer_msgpairs
  51.     {
  52.         jump pvfs2_msgpairarray_sm;
  53.         default => check_for_retries;
  54.     }
  55.  
  56.     state check_for_retries
  57.     {
  58.         run small_io_check_for_retries;
  59.         MIRROR_RETRY => xfer_msgpairs;
  60.         default => cleanup;  /*no mirroring, done, or out of retries*/ 
  61.     }
  62.  
  63.     state cleanup 
  64.     {
  65.         run small_io_cleanup;
  66.         default => return;
  67.     }
  68. }
  69.  
  70. %%
  71.  
  72. /**
  73.  * Small I/O is done in cases where the size of data transferred between
  74.  * client and server is smaller than the maximum unexpected message size
  75.  * accepted by the BMI transport interface in use.  In this case, we don't
  76.  * need to perform initial rendezvous setup messages before sending the
  77.  * actual data (the 'flow'), instead we just pack the data in the unexpected
  78.  * message.  The sys-io state machine checks for possible 'small i/o' cases
  79.  * and routes to the small i/o state actions in case.
  80.  */
  81. static PINT_sm_action small_io_setup_msgpairs(
  82.     struct PINT_smcb *smcb, job_status_s *js_p)
  83. {
  84.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  85.     PVFS_object_attr *attr = NULL;
  86.     int i = 0;
  87.     int ret;
  88.     PVFS_handle datafile_handle;
  89.     int regions = 0;
  90.     PINT_sm_msgpair_state *msg_p;
  91.     uint32_t server_nr;
  92.  
  93.     js_p->error_code = 0;
  94.  
  95.     attr = &sm_p->getattr.attr;
  96.     assert(attr);
  97.  
  98.     /* initialize msgarray. one msgpair for each handle with data. */
  99.     ret = PINT_msgpairarray_init(&sm_p->msgarray_op, sm_p->u.io.datafile_count);
  100.     if(ret < 0)
  101.     {
  102.         js_p->error_code = ret;
  103.         return SM_ACTION_COMPLETE;
  104.     }
  105.  
  106.     /*initialize small_io_ctx array.  one context for each handle in the file.*/
  107.     sm_p->u.io.small_io_ctx = malloc(attr->u.meta.dfile_count *
  108.                                      sizeof(*sm_p->u.io.small_io_ctx));
  109.     if (!sm_p->u.io.small_io_ctx)
  110.     {
  111.         PINT_msgpairarray_destroy(&sm_p->msgarray_op);
  112.         js_p->error_code = -PVFS_ENOMEM;
  113.         return SM_ACTION_COMPLETE;
  114.     }
  115.     memset(sm_p->u.io.small_io_ctx,0,sizeof(*sm_p->u.io.small_io_ctx) *
  116.                                      attr->u.meta.dfile_count);
  117.  
  118.  
  119.     foreach_msgpair(&sm_p->msgarray_op, msg_p, i)
  120.     {
  121.         datafile_handle = attr->u.meta.dfile_array[
  122.             sm_p->u.io.datafile_index_array[i]];
  123.  
  124.         gossip_debug(GOSSIP_IO_DEBUG, "   small_io_setup_msgpairs: "
  125.                      "handle: %llu\n", llu(datafile_handle));
  126.  
  127.         if(sm_p->u.io.io_type == PVFS_IO_WRITE)
  128.         {
  129.             PINT_Request_state * file_req_state = NULL;
  130.             PINT_Request_state * mem_req_state = NULL;
  131.             PINT_request_file_data file_data;
  132.             PINT_Request_result result;
  133.  
  134.             memset(&file_data, 0, sizeof(PINT_request_file_data));
  135.             file_data.server_ct = attr->u.meta.dfile_count;
  136.             file_data.fsize = 0;
  137.             file_data.dist = attr->u.meta.dist;
  138.             file_data.extend_flag = 1;
  139.             result.segmax = IO_MAX_REGIONS;
  140.  
  141.             result.bytemax = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req);
  142.             file_req_state = PINT_new_request_state(sm_p->u.io.file_req);
  143.             mem_req_state = PINT_new_request_state(sm_p->u.io.mem_req);
  144.  
  145.             PINT_REQUEST_STATE_SET_TARGET(file_req_state, 
  146.                                           sm_p->u.io.file_req_offset);
  147.  
  148.             PINT_REQUEST_STATE_SET_FINAL(file_req_state, 
  149.                                          sm_p->u.io.file_req_offset + 
  150.                                          result.bytemax);
  151.  
  152.             file_data.server_nr = sm_p->u.io.datafile_index_array[i]; 
  153.  
  154.             result.segs = 0;
  155.             result.bytes = 0;
  156.             result.offset_array = msg_p->req.u.small_io.offsets;
  157.             result.size_array = msg_p->req.u.small_io.sizes;
  158.             msg_p->req.u.small_io.buffer = sm_p->u.io.buffer;
  159.  
  160.             ret = PINT_process_request(
  161.                 file_req_state, mem_req_state,
  162.                 &file_data,
  163.                 &result,
  164.                 PINT_CLIENT);
  165.             if(ret < 0)
  166.             {
  167.                 js_p->error_code = ret;
  168.                 PINT_free_request_state(file_req_state);
  169.                 PINT_free_request_state(mem_req_state);
  170.                 return SM_ACTION_COMPLETE;
  171.             }
  172.  
  173.             regions = result.segs;
  174.  
  175.             PINT_free_request_state(file_req_state);
  176.             PINT_free_request_state(mem_req_state);
  177.         }
  178.  
  179.         /* if this is a write operation, the appropriate offset and size
  180.          * arrays will have been filled in by the request processing above.
  181.          * reads don't require processing of the memory request until
  182.          * the response.  
  183.          */ 
  184.         PINT_SERVREQ_SMALL_IO_FILL(
  185.             msg_p->req,
  186.             *sm_p->cred_p,
  187.             sm_p->object_ref.fs_id,
  188.             datafile_handle,
  189.             sm_p->u.io.io_type,
  190.             sm_p->u.io.datafile_index_array[i], 
  191.             attr->u.meta.dfile_count,
  192.             attr->u.meta.dist,
  193.             sm_p->u.io.file_req,
  194.             sm_p->u.io.file_req_offset,
  195.             regions,
  196.             PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
  197.             sm_p->hints);
  198.  
  199.         msg_p->fs_id  = sm_p->object_ref.fs_id;
  200.         msg_p->handle = datafile_handle;
  201.  
  202.         /*if we are processing a read request and the source file has mirrored
  203.          *handles, then bypass msgpairarray's retry mechanism.  SMALL-IO will
  204.          *prepare another set of msgpairs using the mirrors and then retry.
  205.         */
  206.         if (sm_p->u.io.io_type == PVFS_IO_READ &&
  207.             attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
  208.         {
  209.             msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;  
  210.         }
  211.         else
  212.         {
  213.             msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  214.         }
  215.         msg_p->comp_fn = small_io_completion_fn;
  216.  
  217.         ret = PINT_cached_config_map_to_server(
  218.             &msg_p->svr_addr, datafile_handle,
  219.             sm_p->object_ref.fs_id);
  220.         if(ret < 0)
  221.         {
  222.             gossip_lerr("Failed to map data server address\n");
  223.             js_p->error_code = ret;
  224.             return SM_ACTION_COMPLETE;
  225.         }
  226.  
  227.         /*store the original datahandle for later use.*/
  228.         server_nr = msg_p->req.u.small_io.server_nr;
  229.         sm_p->u.io.small_io_ctx[server_nr].original_datahandle = msg_p->handle;
  230.     }
  231.  
  232.     js_p->error_code = 0;
  233.  
  234.     PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  235.     return SM_ACTION_COMPLETE;
  236. }
  237.  
  238. /**
  239.  * We assume that the response buffer hasn't been freed yet (before
  240.  * the completion function is called.   The msgpairarray.sm doesn't
  241.  * free the response buffer until after the completion function is
  242.  * called.
  243.  */
  244. static int small_io_completion_fn(void * user_args,
  245.                                   struct PVFS_server_resp * resp_p,
  246.                                   int index)
  247. {
  248.     struct PINT_smcb *smcb = (struct PINT_smcb *)user_args;
  249.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  250.     PINT_sm_msgarray_op   *mop   = &(sm_p->msgarray_op);
  251.     PINT_sm_msgpair_state *msg_p = &(mop->msgarray[index]);
  252.     PINT_client_small_io_ctx *ctx = 
  253.          &(sm_p->u.io.small_io_ctx[msg_p->req.u.small_io.server_nr]);
  254.     uint32_t server_nr = msg_p->req.u.small_io.server_nr;
  255.     PVFS_object_attr *attr __attribute__((unused)) = &(sm_p->getattr.attr);
  256.     PVFS_metafile_attr *meta __attribute__((unused)) = &(attr->u.meta);
  257.  
  258.     int ret = 0;
  259.  
  260.     assert(resp_p->op == PVFS_SERV_SMALL_IO);
  261.  
  262.     if(resp_p->status != 0)
  263.     {
  264.         return resp_p->status;
  265.     }
  266.  
  267.     if(resp_p->u.small_io.io_type == PVFS_IO_READ)
  268.     {
  269.         PVFS_size sizes[IO_MAX_REGIONS];
  270.         PVFS_offset offsets[IO_MAX_REGIONS];
  271.         PVFS_object_attr * attr = &sm_p->getattr.attr;
  272.         PINT_request_file_data fdata;
  273.         PINT_Request_result result;
  274.         PINT_Request_state * file_req_state;
  275.         PINT_Request_state * mem_req_state;
  276.         int i = 0;
  277.         int done = 0;
  278.         PVFS_size bytes_processed = 0;
  279.  
  280.         memset(&fdata, 0, sizeof(fdata));
  281.  
  282.         if(resp_p->u.small_io.result_size != 0)
  283.         {
  284.             memset(&fdata, 0, sizeof(PINT_request_file_data));
  285.             fdata.server_ct = attr->u.meta.dfile_count;
  286.  
  287.             fdata.server_nr = server_nr;
  288.             fdata.dist = attr->u.meta.dist;
  289.             fdata.fsize = resp_p->u.small_io.bstream_size;
  290.  
  291.             result.segmax = IO_MAX_REGIONS;
  292.             result.bytemax = resp_p->u.small_io.result_size;
  293.             result.size_array = sizes;
  294.             result.offset_array = offsets;
  295.  
  296.             file_req_state = PINT_new_request_state(sm_p->u.io.file_req);
  297.             mem_req_state = PINT_new_request_state(sm_p->u.io.mem_req);
  298.  
  299.             PINT_REQUEST_STATE_SET_TARGET(file_req_state, 
  300.                                           sm_p->u.io.file_req_offset);
  301.             PINT_REQUEST_STATE_SET_FINAL(
  302.                 file_req_state, sm_p->u.io.file_req_offset + 
  303.                 PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req));
  304.  
  305.             do
  306.             {
  307.                 result.segs = 0;
  308.                 result.bytes = 0;
  309.  
  310.                 ret = PINT_process_request(
  311.                     file_req_state,
  312.                     mem_req_state,
  313.                     &fdata,
  314.                     &result,
  315.                     PINT_CLIENT);
  316.                 if(ret < 0)
  317.                 {
  318.                     gossip_err("Failed processing request in small I/O read\n");
  319.                     return ret;
  320.                 }
  321.  
  322.                 for(i = 0; i < result.segs && !done; ++i)
  323.                 {
  324.                     int tmp_size;
  325.                     char * src_ptr;
  326.                     char * dest_ptr;
  327.  
  328.                     dest_ptr = (char *)sm_p->u.io.buffer + offsets[i];
  329.                     src_ptr = (char *)resp_p->u.small_io.buffer + 
  330.                         bytes_processed;
  331.  
  332.                     if((bytes_processed + sizes[i]) <= 
  333.                        resp_p->u.small_io.result_size)
  334.                     {
  335.                         tmp_size = sizes[i];
  336.                     }
  337.                     else
  338.                     {
  339.                         tmp_size = resp_p->u.small_io.result_size - 
  340.                             bytes_processed;
  341.                         done = 1;
  342.                     }
  343.  
  344.                     memcpy(dest_ptr, src_ptr, sizes[i]);
  345.                     bytes_processed += tmp_size;
  346.                 }
  347.             } while(!PINT_REQUEST_DONE(file_req_state) && !done);
  348.  
  349.             if(resp_p->u.small_io.result_size != bytes_processed)
  350.             {
  351.                 gossip_err("size of bytes copied to user buffer "
  352.                            "(%llu) do not match size of response (%llu)\n", 
  353.                            llu(bytes_processed),
  354.                            llu(resp_p->u.small_io.result_size));
  355.                 return -PVFS_EINVAL;
  356.             }
  357.             PINT_free_request_state(file_req_state);
  358.             PINT_free_request_state(mem_req_state);
  359.         }
  360.     } /*if PVFS_IO_READ*/
  361.  
  362.     sm_p->u.io.dfile_size_array[server_nr] = resp_p->u.small_io.bstream_size;
  363.     //sm_p->u.io.dfile_size_array[index] = resp_p->u.small_io.bstream_size;
  364.  
  365.     sm_p->u.io.total_size += resp_p->u.small_io.result_size;
  366.  
  367.     /* Let's SMALL-IO know that the msg completed. */
  368.     ctx->msg_completed = 1;
  369.  
  370.     /* To test fail-over with small-io, uncomment the following code.  This
  371.      * will force each primary handle to have each of its mirrored handles
  372.      * tried on a READ io.  If you are testing a file with many primary
  373.      * handles and/or many copies and don't want to wade through such a large
  374.      * test, then don't set the retry-limit.  By default, it is normally set
  375.      * at 5.  You can tweak this value in the pvfs2-fs.conf file, if you
  376.      * want.
  377.     */
  378.  
  379.     gossip_debug(GOSSIP_IO_DEBUG,"handle=%llu \toperation=%d \toffset=%ld "
  380.                                  "\taggregate_size=%ld\n"
  381.                                 ,llu(msg_p->req.u.small_io.handle)
  382.                                 ,msg_p->req.u.small_io.io_type
  383.                                 ,msg_p->req.u.small_io.file_req_offset
  384.                                 ,msg_p->req.u.small_io.aggregate_size);
  385.  
  386. /*
  387.     if (   (sm_p->u.io.io_type == PVFS_IO_READ) 
  388.         && (attr->mask & PVFS_ATTR_META_MIRROR_DFILES) 
  389.                                == PVFS_ATTR_META_MIRROR_DFILES
  390.         && (sm_p->u.io.retry_count < mop->params.retry_limit))
  391.     {
  392.        mop->params.retry_limit = meta->mirror_copies_count;
  393.        ctx->msg_completed = 0;
  394.     } 
  395. */
  396.  
  397.     return 0;
  398. }
  399.  
  400.  
  401. static int small_io_check_for_retries( struct PINT_smcb *smcb
  402.                                      , job_status_s *js_p)
  403. {
  404.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s..\n",__func__);
  405.  
  406.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  407.     struct PVFS_object_attr *attr = &(sm_p->getattr.attr);
  408.     PVFS_metafile_attr *meta = &(attr->u.meta);
  409.     PINT_client_small_io_ctx *ctx = NULL;
  410.     PINT_sm_msgarray_op *mop = &sm_p->msgarray_op;
  411.     PINT_sm_msgpair_state *msgarray = mop->msgarray;
  412.     PINT_sm_msgpair_state *msg = NULL;
  413.     PINT_sm_msgpair_state *new_msg = NULL;
  414.     PINT_sm_msgarray_op new_mop = {{0},0,0,{0}};
  415.     char *enc_req_bytes = NULL;
  416.  
  417.     uint32_t retry_msg_count = 0;
  418.     uint32_t index = 0;
  419.     uint32_t copies = 0;
  420.     uint32_t server_nr = 0;
  421.     int i   = 0;
  422.     int j   = 0;
  423.     int k   = 0;
  424.     int ret = 0;
  425.  
  426.     /*if we are processing a write request, then msgpairarray handles retries.
  427.      *if we are processing a read and the source file is mirrored, then
  428.      *SMALL-IO handles the retries.
  429.     */
  430.     if (sm_p->u.io.io_type == PVFS_IO_WRITE ||
  431.         ((attr->mask & PVFS_ATTR_META_MIRROR_DFILES) != 
  432.           PVFS_ATTR_META_MIRROR_DFILES))
  433.     {
  434.        return SM_ACTION_COMPLETE;
  435.     }
  436.  
  437.     /* Do any messages need to be retried? */
  438.     for (i=0; i<mop->count; i++)
  439.     {
  440.         server_nr = msgarray[i].req.u.small_io.server_nr;
  441.         ctx = &sm_p->u.io.small_io_ctx[server_nr];
  442.         if (!ctx->msg_completed)
  443.             retry_msg_count++;
  444.     }
  445.  
  446.     /* no retries needed */
  447.     if (!retry_msg_count)
  448.     {
  449.         return SM_ACTION_COMPLETE;
  450.     }
  451.  
  452.     /* do we have any retries available? */
  453.     if (sm_p->u.io.retry_count >= mop->params.retry_limit)
  454.     {
  455.         return SM_ACTION_COMPLETE;
  456.     }
  457.  
  458.     /* okay. let's setup new msgpairs to retry. we will modify the incomplete 
  459.      * msg pairs stored in msgarray and then copy them into a new msgarray 
  460.      * before calling msgpairarray.sm.
  461.     */
  462.     for (i=0; i<mop->count; i++)
  463.     {
  464.         msg = &msgarray[i];
  465.         server_nr = msg->req.u.small_io.server_nr;
  466.         ctx = &sm_p->u.io.small_io_ctx[server_nr];
  467.      
  468.         /* don't process completed messages */
  469.         if (ctx->msg_completed)
  470.            continue;
  471.  
  472.         /* for incomplete messages, cleanup memory, if necessary */
  473.         enc_req_bytes = (char *)&(msg->encoded_req);
  474.         for (k=0; k<sizeof(msg->encoded_req); k++)
  475.         {
  476.            if (enc_req_bytes[k] != '\0')
  477.            {
  478.               PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
  479.               break;
  480.            }
  481.         }/*end for*/
  482.  
  483.         if (msg->encoded_resp_p)
  484.         {
  485.             BMI_memfree(msg->svr_addr
  486.                        ,msg->encoded_resp_p
  487.                        ,msg->max_resp_sz
  488.                        ,BMI_RECV);
  489.         }
  490.         
  491.         /* Should we use the original datahandle? */
  492.         if (ctx->retry_original)
  493.         {
  494.             ctx->retry_original = 0;
  495.             msg->handle                = ctx->original_datahandle;
  496.             msg->req.u.small_io.handle = ctx->original_datahandle;
  497.             msg->svr_addr = 0;
  498.             ret = PINT_cached_config_map_to_server(&msg->svr_addr
  499.                                                   ,msg->handle
  500.                                                   ,msg->fs_id);
  501.             if (ret)
  502.             {
  503.                gossip_lerr("Unable to determine the server address "
  504.                            "for this handle (%llu)"
  505.                            ,llu(msg->handle));
  506.                js_p->error_code = ret;
  507.                return SM_ACTION_COMPLETE;
  508.             }
  509.             continue;
  510.         }/*end retry_original*/
  511.  
  512.         /* get next mirrored handle.  note:  if a mirrored handle is zero, then
  513.          * this means that the creation of this mirrored object failed for its
  514.          * particular server.  if so, then get the next valid handle.  as a 
  515.          * last resort, retry the original handle.
  516.         */
  517.         copies = ctx->current_copies_count;
  518.         for (;copies < meta->mirror_copies_count; copies++)
  519.         {
  520.             index = (copies*meta->dfile_count) + server_nr;
  521.             if (meta->mirror_dfile_array[index] != 0)
  522.             {  /* we have found a valid mirrored handle */
  523.                msg->handle = meta->mirror_dfile_array[index];
  524.                break;
  525.             }
  526.         }
  527.  
  528.         /* if we haven't found a valid mirrored handle, retry the original
  529.          * datahandle.
  530.         */
  531.         if ( copies == meta->mirror_copies_count )
  532.         {
  533.            msg->handle = ctx->original_datahandle;
  534.            ctx->retry_original = 0;
  535.            ctx->current_copies_count = 0;
  536.            sm_p->u.io.retry_count++;
  537.            msg->req.u.small_io.handle = ctx->original_datahandle;
  538.            msg->svr_addr = 0;
  539.            ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
  540.                                                ,msg->handle
  541.                                                ,msg->fs_id);
  542.            if (ret)
  543.            {
  544.               gossip_lerr("Unable to determine the server address "
  545.                           "for this handle (%llu)"
  546.                           ,llu(msg->handle));
  547.               js_p->error_code = ret;
  548.               return SM_ACTION_COMPLETE;
  549.            }
  550.            continue;
  551.         }/*end if we have to use the original*/
  552.  
  553.         /* Otherwise, use the discovered mirrored handle */
  554.         msg->req.u.small_io.handle = msg->handle;
  555.         msg->svr_addr = 0;
  556.         ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
  557.                                             ,msg->handle
  558.                                             ,msg->fs_id);
  559.         if (ret)
  560.         {
  561.            gossip_lerr("Unable to determine the server address "
  562.                        "for this handle (%llu)"
  563.                        ,llu(msg->handle));
  564.            js_p->error_code = ret;
  565.            return SM_ACTION_COMPLETE;
  566.         }
  567.  
  568.         /* and setup for the next retry event */
  569.         ctx->current_copies_count++;
  570.         if (ctx->current_copies_count == meta->mirror_copies_count)
  571.         {
  572.            ctx->current_copies_count = 0;
  573.            ctx->retry_original = 1;
  574.            sm_p->u.io.retry_count++;
  575.         }
  576.     }/*end for each msgpair*/
  577.  
  578.     /* Now, create a new msgpair array and populate from the above modified
  579.      * messages.
  580.     */
  581.     ret = PINT_msgpairarray_init(&new_mop,retry_msg_count);
  582.     if (ret)
  583.     {
  584.         gossip_lerr("Unable to initialize msgarray_op:new_op\n");
  585.         js_p->error_code = ret;
  586.         return SM_ACTION_COMPLETE;
  587.     }
  588.  
  589.     /* populate the new msgarray with the modified messages */
  590.     for (i=0, j=0; i<mop->count && j<new_mop.count; i++)
  591.     {
  592.         msg = &msgarray[i];
  593.         server_nr = msg->req.u.small_io.server_nr;
  594.         ctx = &sm_p->u.io.small_io_ctx[server_nr];
  595.      
  596.         /* don't populate with completed messages */
  597.         if (ctx->msg_completed)
  598.            continue;
  599.  
  600.         new_msg = &new_mop.msgarray[j];
  601.         j++;
  602.  
  603.         new_msg->fs_id       = msg->fs_id;
  604.         new_msg->handle      = msg->handle;
  605.         new_msg->comp_fn     = msg->comp_fn;
  606.         new_msg->svr_addr    = msg->svr_addr;
  607.         new_msg->req         = msg->req;
  608.         new_msg->enc_type    = msg->enc_type;
  609.         new_msg->retry_flag  = msg->retry_flag;
  610.     }/*end for*/
  611.  
  612.     /* Destroy the old msgarray and substitute with the new. Params are left
  613.      * in tact.
  614.     */
  615.     PINT_msgpairarray_destroy(mop);
  616.     mop->count    = new_mop.count;
  617.     mop->msgarray = new_mop.msgarray;
  618.     mop->msgpair  = new_mop.msgpair;
  619.  
  620.     /* Push the msgarray_op and jump to msgpairarray.sm */
  621.     PINT_sm_push_frame(smcb,0,mop);
  622.     js_p->error_code=MIRROR_RETRY;
  623.     return SM_ACTION_COMPLETE;
  624. }/*end small_io_check_for_retries*/
  625.  
  626.  
  627.  
  628. static int small_io_cleanup(
  629.     struct PINT_smcb *smcb, job_status_s *js_p)
  630. {
  631.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  632.  
  633.     PINT_msgpairarray_destroy(&sm_p->msgarray_op);
  634.  
  635.     /*release the ctx array; this array is allocated whether or not the 
  636.      *file to read is mirrored.
  637.     */
  638.     free(sm_p->u.io.small_io_ctx);
  639.  
  640.     return SM_ACTION_COMPLETE;
  641. }
  642.  
  643. /*
  644.  * Local variables:
  645.  *  mode: c
  646.  *  c-indent-level: 4
  647.  *  c-basic-offset: 4
  648.  * End:
  649.  *
  650.  * vim: ft=c ts=8 sts=4 sw=4 expandtab
  651.  */
  652.