home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / client / sysint / sys-io.sm < prev    next >
Text File  |  2010-06-28  |  107KB  |  3,273 lines

  1. /* 
  2.  * (C) 2003 Clemson University and The University of Chicago 
  3.  *
  4.  * See COPYING in top-level directory.
  5.  *
  6.  */
  7.  
  8. /** \file
  9.  *  \ingroup sysint
  10.  *
  11.  *  PVFS2 system interface routines for reading and writing files.
  12.  */
  13.  
  14. #include <string.h>
  15. #include <assert.h>
  16. #include <unistd.h>
  17.  
  18. #include "client-state-machine.h"
  19. #include "pvfs2-debug.h"
  20. #include "job.h"
  21. #include "gossip.h"
  22. #include "str-utils.h"
  23. #include "pint-cached-config.h"
  24. #include "PINT-reqproto-encode.h"
  25. #include "pint-util.h"
  26. #include "pvfs2-internal.h"
  27.  
  28. #define IO_MAX_SEGMENT_NUM 50 
  29. #define IO_ATTR_MASKS (PVFS_ATTR_META_ALL|PVFS_ATTR_COMMON_TYPE)
  30.  
  31. extern job_context_id pint_client_sm_context;
  32.  
  33. enum
  34. {
  35.     IO_NO_DATA = 132,
  36.     IO_DATAFILE_TRANSFERS_COMPLETE,
  37.     IO_RETRY,
  38.     IO_RETRY_NODELAY,
  39.     IO_GET_DATAFILE_SIZE,
  40.     IO_ANALYZE_SIZE_RESULTS,
  41.     IO_DO_SMALL_IO,
  42.     IO_UNSTUFF,
  43.     IO_GETATTR_SERVER,
  44.     IO_MIRRORING,
  45.     IO_NO_MIRRORING,
  46.     IO_FATAL_ERROR,
  47. };
  48.  
  49. /* Helper functions local to sys-io.sm. */
  50.  
  51. static inline int io_complete_context_send_or_recv(
  52.     PINT_smcb *smcb, job_status_s *js_p);
  53.  
  54. static inline int io_decode_ack_response(
  55.     PINT_client_io_ctx *cur_ctx,
  56.     struct PINT_decoded_msg *decoded_resp,
  57.     struct PVFS_server_resp **resp);
  58.  
  59. static inline int io_post_flow(
  60.     PINT_smcb *smcb, PINT_client_io_ctx *cur_ctx);
  61.  
  62. static inline int io_post_write_ack_recv(
  63.     PINT_smcb *smcb, PINT_client_io_ctx * cur_ctx);
  64.  
  65. static inline int io_process_context_recv(
  66.     PINT_client_sm *sm_p, job_status_s *js_p, PINT_client_io_ctx **out_ctx);
  67.  
  68. static inline int io_check_context_status(
  69.     PINT_client_io_ctx *cur_ctx, int io_type,
  70.     PVFS_size *total_size);
  71.  
  72. static int io_find_target_datafiles(
  73.     PVFS_Request mem_req,
  74.     PVFS_Request file_req,
  75.     PVFS_offset file_req_offset,
  76.     PINT_dist *dist_p,
  77.     PVFS_fs_id fs_id,
  78.     enum PVFS_io_type io_type,
  79.     PVFS_handle *input_handle_array,
  80.     int input_handle_count,
  81.     int *handle_index_array,
  82.     int *handle_index_out_count,
  83.     int *sio_handle_index_array,
  84.     int *sio_handle_index_count);
  85.  
  86. static int io_find_total_size(
  87.     PINT_client_sm * sm_p,
  88.     PVFS_offset final_offset,
  89.     PVFS_size * total_return_size);
  90.  
  91. static int io_find_offset(
  92.     PINT_client_sm * sm_p,
  93.     PVFS_size contig_size,
  94.     PVFS_size * total_return_offset);
  95.  
  96. static int io_get_max_unexp_size(
  97.     struct PINT_Request * file_req,
  98.     PVFS_handle handle, 
  99.     PVFS_fs_id fs_id, 
  100.     enum PVFS_io_type type,
  101.     int * max_unexp_payload);
  102.  
  103. static int io_zero_fill_holes(
  104.     PINT_client_sm *sm_p,
  105.     PVFS_size eof,
  106.     int datafile_count,
  107.     PVFS_size * datafile_size_array,
  108.     int * datafile_index_array);
  109.  
  110. static int io_contexts_init(PINT_client_sm *sm_p, int count,
  111.                             PVFS_object_attr *attr);
  112.  
  113. static void io_contexts_destroy(PINT_client_sm *sm_p);
  114.  
  115. static int unstuff_needed(
  116.     PVFS_Request mem_req,
  117.     PVFS_offset file_req_offset,
  118.     PINT_dist *dist_p,
  119.     uint32_t mask,
  120.     enum PVFS_io_type io_type);
  121.  
  122. static int unstuff_comp_fn(
  123.     void *v_p,
  124.     struct PVFS_server_resp *resp_p,
  125.     int i);
  126.  
  127. /* misc constants and helper macros */
  128. #define IO_RECV_COMPLETED                                    1
  129.  
  130. /* possible I/O state machine phases (status_user_tag) */
  131. #define IO_SM_PHASE_REQ_MSGPAIR_RECV                         0
  132. #define IO_SM_PHASE_REQ_MSGPAIR_SEND                         1
  133. #define IO_SM_PHASE_FLOW                                     2
  134. #define IO_SM_PHASE_FINAL_ACK                                3
  135. #define IO_SM_NUM_PHASES                                     4
  136.  
  137. #define STATUS_USER_TAG_TYPE(tag, type)                      \
  138. ((tag % IO_SM_NUM_PHASES) == type)
  139. #define STATUS_USER_TAG_GET_INDEX(tag, type)                 \
  140. (tag / IO_SM_NUM_PHASES)
  141. #define STATUS_USER_TAG_IS_SEND_OR_RECV(tag)                 \
  142. (STATUS_USER_TAG_TYPE(tag, IO_SM_PHASE_REQ_MSGPAIR_RECV) ||  \
  143.  STATUS_USER_TAG_TYPE(tag, IO_SM_PHASE_REQ_MSGPAIR_SEND))
  144.  
  145. static int io_datafile_index_array_init(
  146.     PINT_client_sm *sm_p,
  147.     int datafile_count);
  148.  
  149. static void io_datafile_index_array_destroy(
  150.     PINT_client_sm *sm_p);
  151.  
  152. %%
  153.  
  154. machine pvfs2_client_io_sm
  155.  
  156. {
  157.     state init
  158.     {
  159.         run io_init;
  160.         default => io_getattr;
  161.     }
  162.  
  163.     state io_getattr
  164.     {   
  165.         jump pvfs2_client_getattr_sm;
  166.         success => inspect_attr;
  167.         default => io_cleanup;
  168.     }
  169.  
  170.     state inspect_attr
  171.     {   
  172.         run io_inspect_attr;
  173.         IO_UNSTUFF => unstuff_setup_msgpair;
  174.         IO_GETATTR_SERVER => unstuff_setup_msgpair;
  175.         success => io_datafile_setup_msgpairs;
  176.         default => io_cleanup;
  177.     }
  178.  
  179.     state unstuff_setup_msgpair
  180.     {
  181.         run io_unstuff_setup_msgpair;
  182.         success => unstuff_xfer_msgpair;
  183.         default => io_cleanup;
  184.     }
  185.  
  186.     state unstuff_xfer_msgpair
  187.     {
  188.         jump pvfs2_msgpairarray_sm;
  189.         success => io_datafile_setup_msgpairs; 
  190.         default => io_cleanup;
  191.     }
  192.  
  193.     state io_datafile_setup_msgpairs
  194.     {
  195.         run io_datafile_setup_msgpairs;
  196.         IO_NO_DATA => io_cleanup;
  197.         IO_DO_SMALL_IO => small_io;
  198.         success => io_datafile_post_msgpairs;
  199.         default => io_cleanup;
  200.     }
  201.  
  202.     state small_io
  203.     {
  204.         jump pvfs2_client_small_io_sm;
  205.         success => io_analyze_results;
  206.         default => io_cleanup;
  207.     }
  208.  
  209.     state io_datafile_post_msgpairs
  210.     { 
  211.         run io_datafile_post_msgpairs;
  212.         IO_RETRY => io_datafile_post_msgpairs_retry;
  213.         IO_FATAL_ERROR => io_cleanup;
  214.         default => io_datafile_complete_operations;
  215.     }
  216.  
  217.     state io_datafile_post_msgpairs_retry
  218.     {
  219.         run io_datafile_post_msgpairs_retry;
  220.         IO_MIRRORING    => io_datafile_mirror_retry;
  221.         IO_NO_MIRRORING => io_datafile_no_mirror_retry;
  222.         default => io_datafile_no_mirror_retry;
  223.     }
  224.  
  225.     state io_datafile_no_mirror_retry
  226.     {
  227.         run io_datafile_no_mirror_retry;
  228.         IO_DATAFILE_TRANSFERS_COMPLETE => io_analyze_results;
  229.         default => io_datafile_post_msgpairs;
  230.     }
  231.  
  232.     state io_datafile_mirror_retry
  233.     {
  234.         run io_datafile_mirror_retry;
  235.         IO_DATAFILE_TRANSFERS_COMPLETE => io_analyze_results;
  236.         default => io_datafile_post_msgpairs;
  237.     }
  238.  
  239.     state io_datafile_complete_operations
  240.     {
  241.         run io_datafile_complete_operations;
  242.         IO_DATAFILE_TRANSFERS_COMPLETE => io_analyze_results;
  243.         IO_RETRY => io_datafile_post_msgpairs_retry;
  244.         default => io_datafile_complete_operations;
  245.     }
  246.  
  247.     state io_analyze_results
  248.     {
  249.         run io_analyze_results;
  250.         IO_RETRY => init;
  251.         /*IO_ANALYZE_SIZE_RESULTS => io_analyze_size_results;*/
  252.         IO_GET_DATAFILE_SIZE => io_datafile_size;
  253.         default => io_cleanup;
  254.     }
  255.  
  256.     state io_datafile_size
  257.     {
  258.         jump pvfs2_client_datafile_getattr_sizes_sm;
  259.         success => io_analyze_size_results;
  260.         default => io_cleanup;
  261.     }
  262.  
  263.     state io_analyze_size_results
  264.     {
  265.         run io_analyze_size_results;
  266.         default => io_cleanup;
  267.     }
  268.  
  269.     state io_cleanup
  270.     {
  271.         run io_cleanup;
  272.         default => terminate;
  273.     }
  274. }
  275.  
  276. %%
  277.  
  278. /** Initiate a read or write operation.
  279.  *
  280.  *  \param type specifies if the operation is a read or write.
  281.  */
  282. PVFS_error PVFS_isys_io(
  283.     PVFS_object_ref ref,
  284.     PVFS_Request file_req,
  285.     PVFS_offset file_req_offset,
  286.     void *buffer,
  287.     PVFS_Request mem_req,
  288.     const PVFS_credentials *credentials,
  289.     PVFS_sysresp_io *resp_p,
  290.     enum PVFS_io_type io_type,
  291.     PVFS_sys_op_id *op_id,
  292.     PVFS_hint hints,
  293.     void *user_ptr)
  294. {
  295.     PVFS_error ret = -PVFS_EINVAL;
  296.     PINT_smcb *smcb = NULL;
  297.     PINT_client_sm *sm_p = NULL;
  298.     struct filesystem_configuration_s* cur_fs = NULL;
  299.     struct server_configuration_s *server_config = NULL;
  300.  
  301.     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_io entered [%llu]\n",
  302.                  llu(ref.handle));
  303.  
  304.     if ((ref.handle == PVFS_HANDLE_NULL) ||
  305.         (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
  306.     {
  307.         gossip_err("invalid (NULL) required argument\n");
  308.         return ret;
  309.     }
  310.  
  311.     if ((io_type != PVFS_IO_READ) && (io_type != PVFS_IO_WRITE))
  312.     {
  313.         gossip_err("invalid (unknown) I/O type specified\n");
  314.         return ret;
  315.     }
  316.  
  317.     server_config = PINT_get_server_config_struct(ref.fs_id);
  318.     cur_fs = PINT_config_find_fs_id(server_config, ref.fs_id);
  319.     PINT_put_server_config_struct(server_config);
  320.  
  321.     if (!cur_fs)
  322.     {
  323.         gossip_err("invalid (unknown) fs id specified\n");
  324.         return ret;
  325.     }
  326.  
  327.     /* look for zero byte operations */
  328.     if ((PINT_REQUEST_TOTAL_BYTES(mem_req) == 0) ||
  329.         (PINT_REQUEST_TOTAL_BYTES(file_req) == 0))
  330.     {
  331.         gossip_ldebug(GOSSIP_IO_DEBUG, "Warning: 0 byte I/O operation "
  332.                       "attempted.\n");
  333.         resp_p->total_completed = 0;
  334.         return 1; 
  335.     }
  336.  
  337.     PINT_smcb_alloc(&smcb, PVFS_SYS_IO,
  338.              sizeof(struct PINT_client_sm),
  339.              client_op_state_get_machine,
  340.              client_state_machine_terminate,
  341.              pint_client_sm_context);
  342.     if (smcb == NULL)
  343.     {
  344.         return -PVFS_ENOMEM;
  345.     }
  346.     sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  347.  
  348.     PINT_init_msgarray_params(sm_p, ref.fs_id);
  349.     PINT_init_sysint_credentials(sm_p->cred_p, credentials);
  350.     sm_p->u.io.io_type = io_type;
  351.     sm_p->u.io.file_req = file_req;
  352.     sm_p->u.io.file_req_offset = file_req_offset;
  353.     sm_p->u.io.io_resp_p = resp_p;
  354.     sm_p->u.io.mem_req = mem_req;
  355.     sm_p->u.io.buffer = buffer; 
  356.     sm_p->u.io.flowproto_type = cur_fs->flowproto;
  357.     sm_p->u.io.encoding = cur_fs->encoding;
  358.     sm_p->u.io.stored_error_code = 0;
  359.     sm_p->u.io.retry_count = 0;
  360.     sm_p->msgarray_op.msgarray = NULL;
  361.     sm_p->msgarray_op.count = 0;
  362.     sm_p->u.io.datafile_index_array = NULL;
  363.     sm_p->u.io.datafile_count = 0;
  364.     sm_p->u.io.total_size = 0;
  365.     sm_p->u.io.small_io = 0;
  366.     sm_p->object_ref = ref;
  367.     PVFS_hint_copy(hints, &sm_p->hints);
  368.     PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &ref.handle);
  369.  
  370.     return PINT_client_state_machine_post(
  371.         smcb,  op_id, user_ptr);
  372. }
  373.  
  374. /** Perform a read or write operation.
  375.  *
  376.  *  \param type specifies if the operation is a read or write.
  377.  */
  378. PVFS_error PVFS_sys_io(
  379.     PVFS_object_ref ref,
  380.     PVFS_Request file_req,
  381.     PVFS_offset file_req_offset,
  382.     void *buffer,
  383.     PVFS_Request mem_req,
  384.     const PVFS_credentials *credentials,
  385.     PVFS_sysresp_io *resp_p,
  386.     enum PVFS_io_type io_type,
  387.     PVFS_hint hints)
  388. {
  389.     PVFS_error ret = -PVFS_EINVAL, error = 0;
  390.     PVFS_sys_op_id op_id;
  391.  
  392.     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_io entered\n");
  393.  
  394.     ret = PVFS_isys_io(ref, file_req, file_req_offset, buffer, mem_req,
  395.                        credentials, resp_p, io_type, &op_id, hints, NULL);
  396.     if (ret == 1)
  397.         return 0;
  398.     else if (ret < 0)
  399.     {
  400.         PVFS_perror_gossip("PVFS_isys_io call", ret);
  401.         error = ret;
  402.     }
  403.     else 
  404.     {
  405.         ret = PVFS_sys_wait(op_id, "io", &error);
  406.         if (ret)
  407.         {
  408.             PVFS_perror_gossip("PVFS_sys_wait call", ret);
  409.             error = ret;
  410.         }
  411.         PINT_sys_release(op_id);
  412.     } 
  413.  
  414.     return error;
  415. }
  416.  
  417. /*******************************************************************/
  418.  
  419. static PINT_sm_action io_init(
  420.         struct PINT_smcb *smcb, job_status_s *js_p)
  421. {
  422.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  423.     job_id_t tmp_id;
  424.  
  425.     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: io_init\n", sm_p);
  426.  
  427.     assert((js_p->error_code == 0) ||
  428.            (js_p->error_code == IO_RETRY));
  429.  
  430.     PINT_SM_GETATTR_STATE_FILL(
  431.         sm_p->getattr,
  432.         sm_p->object_ref,
  433.         IO_ATTR_MASKS, 
  434.         PVFS_TYPE_METAFILE,
  435.         0);
  436.        
  437.     if (js_p->error_code == IO_RETRY ||
  438.         (js_p->error_code == IO_RETRY_NODELAY))
  439.     {
  440.         js_p->error_code = 0;
  441.  
  442.         io_datafile_index_array_destroy(sm_p);
  443.         io_contexts_destroy(sm_p);
  444.  
  445.         if (PINT_smcb_cancelled(smcb))
  446.         {
  447.             js_p->error_code = -PVFS_ECANCEL;
  448.             return SM_ACTION_COMPLETE;
  449.         }
  450.  
  451.         if(js_p->error_code == IO_RETRY_NODELAY)
  452.         {
  453.             gossip_debug(GOSSIP_IO_DEBUG, "  sys-io retrying without delay.\n");
  454.             js_p->error_code = 0;
  455.             return 1;
  456.         }
  457.         gossip_debug(GOSSIP_IO_DEBUG, "  sys-io retrying with delay.\n");
  458.         return job_req_sched_post_timer(
  459.             sm_p->msgarray_op.params.retry_delay, smcb, 0, js_p, &tmp_id,
  460.             pint_client_sm_context);
  461.     }
  462.     return SM_ACTION_COMPLETE;
  463. }
  464.  
  465. static PINT_sm_action io_inspect_attr(
  466.         struct PINT_smcb *smcb, job_status_s *js_p)
  467. {
  468.  
  469.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  470.     if (PINT_smcb_cancelled(smcb))
  471.     {
  472.         js_p->error_code = -PVFS_ECANCEL;
  473.         return SM_ACTION_COMPLETE;
  474.     }
  475.  
  476.     /* determine if we need to unstuff or not to service this request */
  477.     js_p->error_code = unstuff_needed(
  478.         sm_p->u.io.mem_req,
  479.         sm_p->u.io.file_req_offset,
  480.         sm_p->getattr.attr.u.meta.dist,
  481.         sm_p->getattr.attr.mask,
  482.         sm_p->u.io.io_type);
  483.     return(SM_ACTION_COMPLETE);
  484. }
  485.  
  486. static PINT_sm_action io_unstuff_setup_msgpair(
  487.         struct PINT_smcb *smcb, job_status_s *js_p)
  488. {
  489.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  490.     int ret = -PVFS_EINVAL;
  491.     PINT_sm_msgpair_state *msg_p = NULL;
  492.  
  493.     PINT_msgpair_init(&sm_p->msgarray_op);
  494.     msg_p = &sm_p->msgarray_op.msgpair;
  495.  
  496.     if(js_p->error_code == IO_UNSTUFF)
  497.     {
  498.         /* note that unstuff must request the same attr mask that we requested
  499.          * earlier.  If the file has already been unstuffed then we need an 
  500.          * updated authoritative copy of all of the attrs relevant to I/O.
  501.          */
  502.         PINT_SERVREQ_UNSTUFF_FILL(
  503.                 msg_p->req,
  504.                 (*sm_p->cred_p),
  505.                 sm_p->object_ref.fs_id,
  506.                 sm_p->object_ref.handle,
  507.                 IO_ATTR_MASKS);
  508.     }
  509.     else if(js_p->error_code == IO_GETATTR_SERVER)
  510.     {
  511.         PINT_SERVREQ_GETATTR_FILL(
  512.                 msg_p->req,
  513.                 (*sm_p->cred_p),
  514.                 sm_p->object_ref.fs_id,
  515.                 sm_p->object_ref.handle,
  516.                 IO_ATTR_MASKS,
  517.                 sm_p->hints);
  518.     }
  519.     else
  520.     {
  521.         assert(0);
  522.     }
  523.     js_p->error_code = 0;
  524.  
  525.     msg_p->fs_id = sm_p->object_ref.fs_id;
  526.     msg_p->handle = sm_p->object_ref.handle;
  527.     msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  528.     msg_p->comp_fn = unstuff_comp_fn;
  529.  
  530.     ret = PINT_cached_config_map_to_server(
  531.             &msg_p->svr_addr,
  532.             msg_p->handle,
  533.             msg_p->fs_id);
  534.     if (ret)
  535.     {
  536.         gossip_err("Failed to map meta server address\n");
  537.         js_p->error_code = ret;
  538.     }
  539.  
  540.     PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  541.     return SM_ACTION_COMPLETE;
  542. }
  543.  
  544.  
  545. static PINT_sm_action io_datafile_setup_msgpairs(
  546.         struct PINT_smcb *smcb, job_status_s *js_p)
  547. {
  548.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  549.     int ret = -PVFS_EINVAL, i = 0;
  550.     PVFS_object_attr *attr = NULL;
  551.     int target_datafile_count = 0;
  552.     int * sio_array;
  553.     int sio_count;
  554.  
  555.     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
  556.                  "io_datafile_setup_msgpairs\n", sm_p);
  557.  
  558.     if (PINT_smcb_cancelled(smcb))
  559.     {
  560.         js_p->error_code = -PVFS_ECANCEL;
  561.         goto exit;
  562.     }
  563.  
  564.     js_p->error_code = 0;
  565.  
  566.     attr = &sm_p->getattr.attr;
  567.     assert(attr);
  568.  
  569.     switch(attr->objtype)
  570.     {
  571.         case PVFS_TYPE_METAFILE:
  572.  
  573.             assert(attr->mask & PVFS_ATTR_META_DFILES);
  574.             assert(attr->mask & PVFS_ATTR_META_DIST);
  575.             assert(attr->u.meta.dist_size > 0);
  576.             assert(attr->u.meta.dfile_array);
  577.             assert(attr->u.meta.dfile_count > 0);
  578.             if (attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
  579.             {
  580.                 assert(attr->u.meta.mirror_dfile_array);
  581.                 assert(attr->u.meta.mirror_copies_count);
  582.             }
  583.             break;
  584.         case PVFS_TYPE_DIRECTORY:
  585.             js_p->error_code = -PVFS_EISDIR;
  586.             goto exit;
  587.         default:
  588.             js_p->error_code = -PVFS_EBADF;
  589.             goto exit;
  590.     }
  591.     /* cannot write to an immutable file */
  592.     if (sm_p->u.io.io_type == PVFS_IO_WRITE
  593.         && (attr->u.meta.hint.flags & PVFS_IMMUTABLE_FL))
  594.     {
  595.         js_p->error_code = -PVFS_EPERM;
  596.         goto exit;
  597.     }
  598.     ret = PINT_dist_lookup(attr->u.meta.dist);
  599.     if (ret)
  600.     {
  601.         PVFS_perror_gossip("PINT_dist_lookup failed; aborting I/O", ret);
  602.         js_p->error_code = -PVFS_EBADF;
  603.         goto exit;
  604.     }
  605.  
  606.     ret = io_datafile_index_array_init(sm_p, attr->u.meta.dfile_count);
  607.     if(ret < 0)
  608.     {
  609.         js_p->error_code = ret;
  610.         goto error_exit;
  611.     }
  612.  
  613.     PINT_SM_DATAFILE_SIZE_ARRAY_INIT(
  614.         &sm_p->u.io.dfile_size_array, 
  615.         attr->u.meta.dfile_count);
  616.  
  617.     /* initialize the array of indexes to datafiles in the file request
  618.      * that have requests small enough to do small I/O 
  619.      * (pack data in unexpected message)
  620.      */
  621.     sio_array = malloc(sizeof(int) * attr->u.meta.dfile_count);
  622.     if(!sio_array)
  623.     {
  624.         js_p->error_code = -PVFS_ENOMEM;
  625.         goto datafile_index_array_destroy;
  626.     }
  627.    
  628.     ret = io_find_target_datafiles(
  629.         sm_p->u.io.mem_req,
  630.         sm_p->u.io.file_req,
  631.         sm_p->u.io.file_req_offset,
  632.         attr->u.meta.dist,
  633.         sm_p->getattr.object_ref.fs_id,
  634.         sm_p->u.io.io_type,
  635.         attr->u.meta.dfile_array,
  636.         attr->u.meta.dfile_count,
  637.         sm_p->u.io.datafile_index_array,
  638.         &target_datafile_count,
  639.         sio_array, 
  640.         &sio_count);
  641.     if(ret < 0)
  642.     {
  643.         js_p->error_code = ret;
  644.         goto sio_array_destroy;
  645.     }
  646.  
  647.     sm_p->u.io.datafile_count = target_datafile_count;
  648.  
  649.     if (target_datafile_count == 0)
  650.     {
  651.         gossip_debug(GOSSIP_IO_DEBUG, "  datafile_setup_msgpairs: no "
  652.                      "datafiles have data; aborting\n");
  653.  
  654.         js_p->error_code = IO_NO_DATA;
  655.         goto sio_array_destroy;
  656.     }
  657.  
  658.     gossip_debug(GOSSIP_IO_DEBUG,
  659.                  "  %s: %d datafiles "
  660.                  "might have data\n", __func__, target_datafile_count);
  661.  
  662.     /* look at sio_array and sio_count to see if there are any
  663.      * servers that we can do small I/O to, instead of setting up
  664.      * flows.  For now, we're going to stick with the semantics that
  665.      * small I/O is only done if all of the sizes for the target datafiles
  666.      * are small enough (sio_count == target_datafile_count).  This can
  667.      * be changed in the future, for example, if sio_count is some
  668.      * percentage of the target_datafile_count, then do small I/O to
  669.      * the sio_array servers, etc.
  670.      */
  671.     if(sio_count == target_datafile_count)
  672.     {
  673.         gossip_debug(GOSSIP_IO_DEBUG, "  %s: doing small I/O\n", __func__);
  674.  
  675.         sm_p->u.io.small_io = 1;
  676.         js_p->error_code = IO_DO_SMALL_IO;
  677.         goto sio_array_destroy;
  678.     }
  679.  
  680.     ret = io_contexts_init(sm_p, target_datafile_count, attr);
  681.     if(ret < 0)
  682.     {
  683.         js_p->error_code = ret;
  684.         goto sio_array_destroy;
  685.     }
  686.  
  687.     sm_p->u.io.total_cancellations_remaining = 0;
  688.  
  689.     /* initialize all per server I/O operation contexts and requests */
  690.     for(i = 0; i < target_datafile_count; i++)
  691.     {
  692.         gossip_debug(GOSSIP_IO_DEBUG, "  filling I/O request "
  693.                      "for %llu\n", llu(sm_p->u.io.contexts[i].data_handle));
  694.  
  695.         PINT_SERVREQ_IO_FILL(
  696.             sm_p->u.io.contexts[i].msg.req,
  697.             *sm_p->cred_p,
  698.             sm_p->object_ref.fs_id,
  699.             sm_p->u.io.contexts[i].data_handle,
  700.             sm_p->u.io.io_type,
  701.             sm_p->u.io.flowproto_type,
  702.             sm_p->u.io.datafile_index_array[i],
  703.             attr->u.meta.dfile_count,
  704.             attr->u.meta.dist,
  705.             sm_p->u.io.file_req,
  706.             sm_p->u.io.file_req_offset,
  707.             PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
  708.             sm_p->hints);
  709.     }
  710.  
  711.     js_p->error_code = 0;
  712.  
  713. sio_array_destroy:
  714.     free(sio_array);
  715.     
  716.     goto exit;
  717.  
  718. datafile_index_array_destroy:
  719.     io_datafile_index_array_destroy(sm_p);
  720. error_exit:
  721. exit:
  722.     return SM_ACTION_COMPLETE;
  723. }
  724.  
  725. /*
  726.   This is based on msgpairarray_post() in msgpairarray.c.  It's
  727.   different enough in that we don't have to wait on the msgpairarray
  728.   operations to all complete before posting flows as we can do so for each
  729.   server individually when we're ready.  this avoids the msgpairarray
  730.   sync point implicit in the design
  731. */
  732. static PINT_sm_action io_datafile_post_msgpairs(
  733.         struct PINT_smcb *smcb, job_status_s *js_p)
  734. {
  735.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  736.     int ret = -PVFS_EINVAL, i = 0;
  737.     unsigned long status_user_tag = 0;
  738.     int must_loop_encodings = 0;
  739.     struct server_configuration_s *server_config = NULL;
  740.  
  741.     gossip_debug(GOSSIP_CLIENT_DEBUG, "io_datafile_post_msgpairs "
  742.                  "state: post (%d message(s))\n", sm_p->u.io.datafile_count);
  743.  
  744.     if (PINT_smcb_cancelled(smcb))
  745.     {
  746.         js_p->error_code = -PVFS_ECANCEL;
  747.         return SM_ACTION_COMPLETE;
  748.     }
  749.  
  750.     js_p->error_code = 0;
  751.  
  752.     /* completion count tracks sends/recvs separately, will increment
  753.      * as we go through the loop to maintain a count of outstanding msgpairs */
  754.     sm_p->u.io.msgpair_completion_count = 0;
  755.  
  756.     for(i = 0; i < sm_p->u.io.context_count; i++)
  757.     {
  758.         PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
  759.         PINT_sm_msgpair_state *msg = &cur_ctx->msg;
  760.  
  761.         /* do not do this one again in retry case */
  762.         if (cur_ctx->msg_recv_has_been_posted &&
  763.             cur_ctx->msg_recv_in_progress)
  764.         {
  765.             ++sm_p->u.io.msgpair_completion_count;
  766.             goto recv_already_posted;
  767.         }
  768.  
  769.         if (!ENCODING_IS_VALID(sm_p->u.io.encoding))
  770.         {
  771.             PRINT_ENCODING_ERROR("supported", sm_p->u.io.encoding);
  772.             must_loop_encodings = 1;
  773.             sm_p->u.io.encoding = (ENCODING_INVALID_MIN + 1);
  774.         }
  775.         else if (!ENCODING_IS_SUPPORTED(sm_p->u.io.encoding))
  776.         {
  777.             PRINT_ENCODING_ERROR("supported", sm_p->u.io.encoding);
  778.             must_loop_encodings = 1;
  779.             sm_p->u.io.encoding = ENCODING_SUPPORTED_MIN;
  780.         }
  781.  
  782.       try_next_encoding:
  783.         assert(ENCODING_IS_VALID(sm_p->u.io.encoding));
  784.  
  785.         ret = PINT_encode(&msg->req, PINT_ENCODE_REQ, &msg->encoded_req,
  786.                           msg->svr_addr, sm_p->u.io.encoding);
  787.         if (ret)
  788.         {
  789.             if (must_loop_encodings)
  790.             {
  791.                 gossip_debug(GOSSIP_CLIENT_DEBUG, "Looping through "
  792.                              "encodings [%d/%d]\n", sm_p->u.io.encoding,
  793.                              ENCODING_INVALID_MAX);
  794.  
  795.                 sm_p->u.io.encoding++;
  796.                 if (ENCODING_IS_VALID(sm_p->u.io.encoding))
  797.                 {
  798.                     goto try_next_encoding;
  799.                 }
  800.             }     
  801.             /*
  802.               FIXME: make this a clean error transition by adjusting
  803.               the completion count and/or (not) exiting
  804.             */
  805.             /* If one of the msgpairs gets this type of error, then the entire
  806.              * request should be aborted. Becky Ligon.
  807.             */
  808.             PVFS_perror_gossip("PINT_encode failed", ret);
  809.             sm_p->u.io.stored_error_code = ret;
  810.             js_p->error_code = IO_FATAL_ERROR;
  811.             return SM_ACTION_COMPLETE;
  812.         }
  813.  
  814.         /* calculate maximum response message size and allocate it */
  815.         msg->max_resp_sz = PINT_encode_calc_max_size(
  816.             PINT_ENCODE_RESP, msg->req.op, sm_p->u.io.encoding);
  817.         msg->encoded_resp_p = BMI_memalloc(
  818.             msg->svr_addr, msg->max_resp_sz, BMI_RECV);
  819.         if (!msg->encoded_resp_p)
  820.         {
  821.             /* FIXME: see above FIXME */
  822.             sm_p->u.io.stored_error_code = -PVFS_ENOMEM;
  823.             js_p->error_code = IO_FATAL_ERROR;
  824.             return SM_ACTION_COMPLETE;
  825.         }
  826.  
  827.         /*
  828.           recalculate the status user tag based on this the progress
  829.           of the current context like this: status_user_tag = (4 *
  830.           (context index) + context phase)
  831.         */
  832.         assert(cur_ctx->index == i);
  833.         status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_RECV);
  834.  
  835.         gossip_debug(GOSSIP_IO_DEBUG," posting recv with "
  836.                      "status_user_tag=%lu (max_size %d)\n",
  837.                      status_user_tag, msg->max_resp_sz);
  838.  
  839.         cur_ctx->session_tag = PINT_util_get_next_tag();
  840.         cur_ctx->msg_recv_has_been_posted = 0;
  841.         cur_ctx->msg_recv_in_progress = 0;
  842.  
  843.         server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
  844.         ret = job_bmi_recv(
  845.             msg->svr_addr, msg->encoded_resp_p, msg->max_resp_sz,
  846.             cur_ctx->session_tag, BMI_PRE_ALLOC, smcb, status_user_tag,
  847.             &msg->recv_status, &msg->recv_id, pint_client_sm_context,
  848.             server_config->client_job_bmi_timeout, sm_p->hints);
  849.         PINT_put_server_config_struct(server_config);
  850.  
  851.         /* ret -1: problem, do not look at msg recv_status */
  852.         /* ret 1: immediate completion, see status */
  853.         /* ret 0: okay */
  854.  
  855.         if (ret < 0) {
  856.             PVFS_perror_gossip("Post of receive failed", ret);
  857.             js_p->error_code = ret;
  858.             continue;
  859.  
  860.         }
  861.  
  862.         if (ret == 0) {
  863.             int tmp = 0;
  864.             /* perform a quick test to see if the recv failed before
  865.              * posting the send; if it reports an error quickly then
  866.              * we can save the confusion of sending a request for
  867.              * which we can't recv a response
  868.              */
  869.             ret = job_test(msg->recv_id, &tmp, NULL,
  870.                            &msg->recv_status, 0,
  871.                            pint_client_sm_context);
  872.             if (ret < 0) {
  873.                 PVFS_perror_gossip("Post of receive failed", ret);
  874.                 js_p->error_code = ret;
  875.                 continue;
  876.             }
  877.         }
  878.  
  879.         /* either from job_bmi_recv or from job_test finding something */
  880.         if (ret == 1) {
  881.             /*
  882.              * This recv must have completed with an error because the
  883.              * server has not yet been sent our request.
  884.              */
  885.             PVFS_perror_gossip("Receive immediately failed",
  886.                                msg->recv_status.error_code);
  887.  
  888.             //ret = msg->recv_status.error_code;
  889.             js_p->error_code = IO_RETRY;
  890.             continue;
  891.         }
  892.  
  893.         cur_ctx->msg_recv_has_been_posted = 1;
  894.         cur_ctx->msg_recv_in_progress = 1;
  895.  
  896.         /* posted the receive okay */
  897.         ++sm_p->u.io.msgpair_completion_count;
  898.  
  899.       recv_already_posted:
  900.  
  901.         if (cur_ctx->msg_send_has_been_posted &&
  902.             cur_ctx->msg_send_in_progress)
  903.         {
  904.             ++sm_p->u.io.msgpair_completion_count;
  905.             continue;
  906.         }
  907.  
  908.         status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_SEND);
  909.  
  910.         cur_ctx->msg_send_has_been_posted = 0;
  911.         cur_ctx->msg_send_in_progress = 0;
  912.  
  913.         gossip_debug(GOSSIP_IO_DEBUG," posting send with "
  914.                      "status_user_tag=%lu\n", status_user_tag);
  915.  
  916.         server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
  917.         ret = job_bmi_send_list(
  918.             msg->encoded_req.dest, msg->encoded_req.buffer_list,
  919.             msg->encoded_req.size_list, msg->encoded_req.list_count,
  920.             msg->encoded_req.total_size, cur_ctx->session_tag,
  921.             msg->encoded_req.buffer_type, 1, smcb, status_user_tag,
  922.             &msg->send_status, &msg->send_id, pint_client_sm_context,
  923.             server_config->client_job_bmi_timeout, sm_p->hints);
  924.         PINT_put_server_config_struct(server_config);
  925.  
  926.         if (ret < 0) {
  927.             PVFS_perror_gossip("Post of send failed, cancelling recv", ret);
  928.             msg->op_status = msg->send_status.error_code;
  929.             msg->send_id = 0;
  930.             job_bmi_cancel(msg->recv_id, pint_client_sm_context);
  931.  
  932.             js_p->error_code = ret;
  933.             continue;
  934.         }
  935.  
  936.         if (ret == 1) {
  937.             if (msg->send_status.error_code == 0) {
  938.                 gossip_debug(GOSSIP_IO_DEBUG, "  io_datafile_post_msgpairs: "
  939.                     "send completed immediately.\n");
  940.  
  941.                 /* 0 is the valid "completed job id" value */
  942.                 cur_ctx->msg_send_has_been_posted = 1;
  943.                 msg->send_id = 0;
  944.  
  945.             } else {
  946.                 PVFS_perror_gossip("Send immediately failed, cancelling recv",
  947.                     msg->recv_status.error_code);
  948.                 msg->op_status = msg->send_status.error_code;
  949.                 msg->send_id = 0;
  950.  
  951.                 /* still wait for the recv to complete */
  952.                 job_bmi_cancel(msg->recv_id, pint_client_sm_context);
  953.  
  954.                 js_p->error_code = msg->send_status.error_code;
  955.                 continue;
  956.             }
  957.         } else {
  958.             /* posted the send */
  959.             cur_ctx->msg_send_in_progress = 1;
  960.             cur_ctx->msg_send_has_been_posted = 1;
  961.             ++sm_p->u.io.msgpair_completion_count;
  962.         }
  963.     }/*end for*/
  964.  
  965.     gossip_debug(GOSSIP_IO_DEBUG, "io_datafile_post_msgpairs: "
  966.                  "completion count is %d\n",
  967.                  sm_p->u.io.msgpair_completion_count);
  968.  
  969.  
  970.     /* if anything posted, just wait for that to complete, else
  971.      * go sleep then try the remaining msgpairs again */
  972.     if (sm_p->u.io.msgpair_completion_count
  973.      || sm_p->u.io.flow_completion_count
  974.      || sm_p->u.io.write_ack_completion_count)
  975.         return SM_ACTION_DEFERRED;  /* means go find another machine to run */
  976.     else {
  977.         js_p->error_code = IO_RETRY;
  978.         return SM_ACTION_COMPLETE;  /* means look at error_code and run my */
  979.                                     /* machine again. */
  980.     }
  981. }
  982.  
  983. /*
  984.  * For IO retry, come here to sleep a bit then go back and post
  985.  * some more msgpairs.  If mirroring, then we have more setup before a
  986.  * retry can happen.  Also, the retry-count is calculated differently.
  987.  */
  988. static int io_datafile_post_msgpairs_retry (struct PINT_smcb *smcb
  989.                                           ,job_status_s *js_p)
  990. {
  991.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  992.     PVFS_object_attr *attr = &(sm_p->getattr.attr);
  993.     struct PINT_client_io_sm *io = &(sm_p->u.io);
  994.  
  995.     gossip_debug(GOSSIP_IO_DEBUG,"Executing io_datafile_post_msgpairs_retry...\n");
  996.  
  997.     /* Are we mirroring on a READ request? */
  998.     if ( (attr->mask & PVFS_ATTR_META_MIRROR_DFILES) &&
  999.          io->io_type == PVFS_IO_READ )
  1000.     {  
  1001.          js_p->error_code = IO_MIRRORING;
  1002.          return SM_ACTION_COMPLETE;
  1003.     }
  1004.  
  1005.     js_p->error_code = IO_NO_MIRRORING;
  1006.     return SM_ACTION_COMPLETE;
  1007. }
  1008.  
  1009.  
  1010. static int io_datafile_mirror_retry(struct PINT_smcb *smcb
  1011.                                    ,job_status_s *js_p )
  1012. {
  1013.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s...\n",__func__);
  1014.    gossip_debug(GOSSIP_IO_DEBUG,"Executing io_datafile_mirror_retry...\n");
  1015.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1016.     PVFS_object_attr *attr = &(sm_p->getattr.attr);
  1017.     PVFS_metafile_attr *meta = &(attr->u.meta);
  1018.     struct PINT_client_io_sm *io = &(sm_p->u.io);
  1019.     PINT_sm_msgpair_state *msg = NULL;
  1020.     PINT_client_io_ctx *ctx = NULL;
  1021.     uint32_t index = 0;
  1022.     uint32_t copies = 0;
  1023.     int i,j,ret;
  1024.     char *enc_req_bytes = NULL;
  1025.  
  1026.     /* Have we exhausted the number of retries */
  1027.     if (io->retry_count >= sm_p->msgarray_op.params.retry_limit)
  1028.     {
  1029.         js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
  1030.         return SM_ACTION_COMPLETE;
  1031.     }
  1032.  
  1033.     /* Find failed contexts and prepare them for retry.
  1034.     */
  1035.     for (i=0; i<io->context_count; i++)
  1036.     {
  1037.         ctx = &(io->contexts[i]);
  1038.         msg = &(ctx->msg);
  1039.         if (ctx->msg_recv_has_been_posted && ctx->msg_send_has_been_posted)
  1040.            /* this context has not failed. */
  1041.            continue;
  1042.  
  1043.         /* cleanup the failed context */
  1044.         enc_req_bytes = (char *)&(msg->encoded_req);
  1045.         for (j=0; j<sizeof(msg->encoded_req); j++)
  1046.         {
  1047.            if (enc_req_bytes[j] != '\0')
  1048.            {
  1049.               PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
  1050.               break;
  1051.            }
  1052.         }/*end for*/
  1053.         if (msg->encoded_resp_p)
  1054.         {
  1055.            BMI_memfree(msg->svr_addr
  1056.                       ,msg->encoded_resp_p
  1057.                       ,msg->max_resp_sz
  1058.                       ,BMI_RECV);
  1059.         }
  1060.         memset(&(msg->encoded_req),0,sizeof(msg->encoded_req));
  1061.         memset(&(msg->svr_addr),0,sizeof(msg->svr_addr));
  1062.         msg->encoded_resp_p = NULL;
  1063.         
  1064.         /* use the primary data handle */
  1065.         if (ctx->retry_original)
  1066.         {
  1067.             /* setup context to retry the original */
  1068.            ctx->data_handle     = meta->dfile_array[ctx->server_nr];
  1069.            ctx->retry_original  =  0;
  1070.            msg->handle          = ctx->data_handle;
  1071.            msg->req.u.io.handle = ctx->data_handle;
  1072.            ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
  1073.                                                ,msg->handle
  1074.                                                ,msg->fs_id);
  1075.            if (ret)
  1076.            {
  1077.               gossip_lerr("Unable to determine the server address "
  1078.                           "for this handle (%llu)"
  1079.                           ,llu(msg->handle));
  1080.               js_p->error_code = ret;
  1081.               return SM_ACTION_COMPLETE;
  1082.            }
  1083.            PINT_flow_reset(&(ctx->flow_desc));
  1084.            continue;
  1085.         }
  1086.  
  1087.         /* get next mirrored handle.  note:  if a mirrored handle is zero, then
  1088.          * this means that the creation of this mirrored object failed for its
  1089.          * particular server.  if so, then get the next valid handle.  as a 
  1090.          * last resort, retry the original handle.
  1091.         */
  1092.         copies = ctx->current_copies_count;
  1093.         for (;copies < meta->mirror_copies_count; copies++)
  1094.         {
  1095.             index = (copies*meta->dfile_count) + ctx->server_nr;
  1096.             if (meta->mirror_dfile_array[index] != 0)
  1097.             {  /* we have found a valid mirrored handle */
  1098.                ctx->data_handle = meta->mirror_dfile_array[index];
  1099.                break;
  1100.             }
  1101.         }
  1102.  
  1103.         /* we have NOT found a valid mirrored handle, so retry the primary */
  1104.         if ( copies == meta->mirror_copies_count )
  1105.         {
  1106.            ctx->data_handle = meta->dfile_array[ctx->server_nr];
  1107.            ctx->retry_original = 0;
  1108.            ctx->current_copies_count = 0;
  1109.            io->retry_count++;
  1110.            /* setup context to retry original */
  1111.            msg->handle          = ctx->data_handle;
  1112.            msg->req.u.io.handle = ctx->data_handle;
  1113.            ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
  1114.                                                ,msg->handle
  1115.                                                ,msg->fs_id);
  1116.            if (ret)
  1117.            {
  1118.               gossip_lerr("Unable to determine the server address "
  1119.                           "for this handle (%llu)"
  1120.                           ,llu(msg->handle));
  1121.               js_p->error_code = ret;
  1122.               return SM_ACTION_COMPLETE;
  1123.            }
  1124.            PINT_flow_reset(&(ctx->flow_desc));
  1125.            continue;
  1126.         }
  1127.  
  1128.         /* setup the context for the discovered mirrored handle */
  1129.         msg->handle          = ctx->data_handle;
  1130.         msg->req.u.io.handle = ctx->data_handle;
  1131.         ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
  1132.                                             ,msg->handle
  1133.                                             ,msg->fs_id);
  1134.         if (ret)
  1135.         {
  1136.            gossip_lerr("Unable to determine the server address "
  1137.                        "for this handle (%llu)"
  1138.                        ,llu(msg->handle));
  1139.            js_p->error_code = ret;
  1140.            return SM_ACTION_COMPLETE;
  1141.         }
  1142.         PINT_flow_reset(&(ctx->flow_desc));
  1143.  
  1144.  
  1145.         /* setup for the NEXT io-retry event for this context */
  1146.         ctx->current_copies_count++;
  1147.         if ( ctx->current_copies_count == meta->mirror_copies_count )
  1148.         {/* we have gone through all of the mirrored handles, after this 
  1149.           * iteration executes; so, indicate original for the next retry event.
  1150.          */
  1151.             ctx->current_copies_count = 0;
  1152.             ctx->retry_original = 1;
  1153.             io->retry_count++;
  1154.         }
  1155.     }/*end for each context*/
  1156.  
  1157.     /* sleep a small while before starting the next round of retries */
  1158.     return (job_req_sched_post_timer(sm_p->msgarray_op.params.retry_delay
  1159.                                     ,smcb
  1160.                                     ,0
  1161.                                     ,js_p
  1162.                                     ,NULL
  1163.                                     ,pint_client_sm_context));
  1164.  
  1165. }/*end io_datafile_mirror_retry*/
  1166.  
  1167. static int io_datafile_no_mirror_retry(struct PINT_smcb *smcb
  1168.                                       ,job_status_s *js_p)
  1169. {
  1170.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1171.     /* give up if beyond retry limit */
  1172.     ++sm_p->u.io.retry_count;
  1173.     if (sm_p->u.io.retry_count > sm_p->msgarray_op.params.retry_limit) {
  1174.         gossip_debug(GOSSIP_CLIENT_DEBUG, "%s: retry %d exceeds limit %d\n"
  1175.                                         ,__func__
  1176.                                         ,sm_p->u.io.retry_count
  1177.                                         ,sm_p->msgarray_op.params.retry_delay);
  1178.         js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
  1179.         return SM_ACTION_COMPLETE;
  1180.     }
  1181.  
  1182.     gossip_debug(GOSSIP_CLIENT_DEBUG, "%s: retry %d, wait %d ms\n", __func__,
  1183.       sm_p->u.io.retry_count, sm_p->msgarray_op.params.retry_delay);
  1184.  
  1185.     return job_req_sched_post_timer(sm_p->msgarray_op.params.retry_delay,
  1186.         smcb, 0, js_p, NULL, pint_client_sm_context);
  1187. }
  1188.  
  1189.  
  1190. /*
  1191.   This state allows us to make sure all posted operations complete and
  1192.   are accounted for.  since this handles ALL operation completions,
  1193.   there's special case handling of completing the msgpair recv.  in
  1194.   this case we post the flow operations as soon as we see them (the
  1195.   main motivation for not using the common msgpairarray code).
  1196. */
  1197. static PINT_sm_action io_datafile_complete_operations(
  1198.         struct PINT_smcb *smcb, job_status_s *js_p)
  1199. {
  1200.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1201.     int ret = -PVFS_EINVAL, index = 0, i;
  1202.     unsigned long status_user_tag = (unsigned long)
  1203.         js_p->status_user_tag;
  1204.     PINT_client_io_ctx *cur_ctx = NULL;
  1205.     PVFS_object_attr * attr;
  1206.     int matched_send_or_recv = 0;
  1207.     struct server_configuration_s *server_config = NULL;
  1208.  
  1209.     gossip_debug(
  1210.         GOSSIP_CLIENT_DEBUG, "(%p) io_datafile_complete_operations "
  1211.         "(tag %lu)\n", sm_p, status_user_tag);
  1212.  
  1213.     assert(sm_p->u.io.msgpair_completion_count > -1);
  1214.     assert(sm_p->u.io.flow_completion_count > -1);
  1215.     assert(sm_p->u.io.write_ack_completion_count > -1);
  1216.  
  1217.     attr = &sm_p->getattr.attr;
  1218.     assert(attr);
  1219.  
  1220.     /* check if we're completing a send or recv msgpair */
  1221.     if (STATUS_USER_TAG_IS_SEND_OR_RECV(status_user_tag))
  1222.     {
  1223.         /*
  1224.          * The completion count might validly be zero when recovering from
  1225.          * a cancellation.
  1226.          */
  1227.         if (sm_p->u.io.msgpair_completion_count)
  1228.         {
  1229.             ret = io_complete_context_send_or_recv(smcb, js_p);
  1230.             if (ret < 0) {
  1231.                 /* problem */
  1232.                 PVFS_perror_gossip(
  1233.                     "io_complete_context_send_or_recv failed", ret);
  1234.                 js_p->error_code = ret;
  1235.                 return SM_ACTION_COMPLETE;
  1236.             } else if (ret == 0) {
  1237.                 /* is a send */
  1238.                 gossip_debug(GOSSIP_IO_DEBUG, "  matched send in context "
  1239.                              "%d; continuing.\n", index);
  1240.                 js_p->error_code = 0;
  1241.                 /* If send had problem, BMI will apparently ensure that the
  1242.                  * recv will fail too, so handle the retry stuff there.
  1243.                  */
  1244.                 return SM_ACTION_DEFERRED;
  1245.             } else {
  1246.                 /* is a recv */
  1247.                 assert(ret == IO_RECV_COMPLETED);
  1248.                 matched_send_or_recv = 1;
  1249.             }
  1250.         }
  1251.     }
  1252.  
  1253.     /* if we've just completed a recv above, process the receive
  1254.      * and post the flow if we're doing a read 
  1255.      */
  1256.     if (ret == IO_RECV_COMPLETED)
  1257.     {
  1258.         ret = io_process_context_recv(sm_p, js_p, &cur_ctx);
  1259.         if (ret < 0)
  1260.         {
  1261.             char buf[64] = {0};
  1262.             PVFS_strerror_r(ret, buf, 64);
  1263.  
  1264.             gossip_debug(GOSSIP_IO_DEBUG,
  1265.               "%s: io_process_context_recv failed: "
  1266.               "%s (%d remaining msgpairs)\n",
  1267.               __func__, buf, sm_p->u.io.msgpair_completion_count);
  1268.  
  1269.             js_p->error_code = ret;
  1270.             /* if recv failed, probably have to do the send again too */
  1271.             cur_ctx->msg_send_has_been_posted = 0;
  1272.             cur_ctx->msg_recv_has_been_posted = 0;
  1273.             goto check_next_step;
  1274.         }
  1275.  
  1276.         if(sm_p->u.io.io_type == PVFS_IO_WRITE)
  1277.         {
  1278.             /* we expect this write to _not_ succeed immediately, because we
  1279.              * have not posted the flow yet.
  1280.              */
  1281.             ret = io_post_write_ack_recv(smcb, cur_ctx);
  1282.             if(ret < 0)
  1283.             {
  1284.                 PVFS_perror_gossip("Post of write-ack recv failed", ret);
  1285.                 js_p->error_code = ret;
  1286.                 goto check_next_step;
  1287.             }
  1288.         }
  1289.  
  1290.         /* for now we wait to post the flow until we get back
  1291.          * the response from the server for both reads and writes
  1292.          */
  1293.         ret = io_post_flow(smcb, cur_ctx);
  1294.         if(ret < 0)
  1295.         {
  1296.             char buf[64] = {0};
  1297.             PVFS_strerror_r(ret, buf, 64);
  1298.  
  1299.             gossip_debug(GOSSIP_IO_DEBUG,
  1300.                          "%s: io_post_flow failed: "
  1301.                          "%s (%d remaining msgpairs)\n",
  1302.                          __func__, 
  1303.                          buf, 
  1304.                          sm_p->u.io.msgpair_completion_count);
  1305.  
  1306.             PVFS_perror_gossip("Flow post failed", ret);
  1307.             cur_ctx->msg_send_has_been_posted = 0;
  1308.             cur_ctx->msg_recv_has_been_posted = 0;
  1309.             js_p->error_code = ret;
  1310.             goto check_next_step;
  1311.         }
  1312.     }
  1313.  
  1314.     /* check if we've completed all msgpairs and posted all flows */
  1315.     if (matched_send_or_recv)
  1316.     {
  1317.         if (sm_p->u.io.msgpair_completion_count == 0)
  1318.         {
  1319.             gossip_debug(GOSSIP_IO_DEBUG, "*** all msgpairs complete "
  1320.                          "(all flows posted)\n");
  1321.         }
  1322.         else
  1323.         {
  1324.             gossip_debug(
  1325.                 GOSSIP_IO_DEBUG, "*** %d msgpair completions "
  1326.                 "pending\n", sm_p->u.io.msgpair_completion_count);
  1327.         }
  1328.     }
  1329.  
  1330.     /* at this point, we're either completing a flow or a write ack */
  1331.     if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FLOW))
  1332.     {
  1333.         assert(sm_p->u.io.flow_completion_count);
  1334.  
  1335.         index = STATUS_USER_TAG_GET_INDEX(
  1336.             status_user_tag, IO_SM_PHASE_FLOW);
  1337.         cur_ctx = &sm_p->u.io.contexts[index];
  1338.         assert(cur_ctx);
  1339.  
  1340.         cur_ctx->flow_status = *js_p;
  1341.  
  1342.         if (cur_ctx->write_ack_in_progress)
  1343.         {
  1344.             int ret = 0;
  1345.  
  1346.             assert(sm_p->u.io.write_ack_completion_count);
  1347.             server_config = PINT_get_server_config_struct(
  1348.                 sm_p->object_ref.fs_id);
  1349.             ret = job_reset_timeout(cur_ctx->write_ack.recv_id,
  1350.                 server_config->client_job_bmi_timeout);
  1351.             PINT_put_server_config_struct(server_config);
  1352.  
  1353.             /*
  1354.               allow -PVFS_EINVAL errors in case the recv has already
  1355.               completed (before we've processed it)
  1356.             */
  1357.             assert((ret == 0) || (ret == -PVFS_EINVAL));
  1358.         }
  1359.  
  1360.         gossip_debug(GOSSIP_IO_DEBUG, "  matched completed flow for "
  1361.                      "context %p%s\n", cur_ctx,
  1362.                      ((cur_ctx->write_ack_in_progress ?
  1363.  
  1364.                       " and reset write_recv timeout" : "")));
  1365.  
  1366.         cur_ctx->flow_in_progress = 0;
  1367.         sm_p->u.io.flow_completion_count--;
  1368.         assert(sm_p->u.io.flow_completion_count > -1);
  1369.  
  1370.         /* look for flow error when no write ack is in progress (usually a
  1371.          * read case) 
  1372.          */
  1373.         if (js_p->error_code < 0 && !cur_ctx->write_ack_in_progress) 
  1374.         {
  1375.             if ((PVFS_ERROR_CLASS(-js_p->error_code) == PVFS_ERROR_BMI) ||
  1376.                  (PVFS_ERROR_CLASS(-js_p->error_code) == PVFS_ERROR_FLOW) ||
  1377.                  (js_p->error_code == -ECONNRESET) || 
  1378.                  (js_p->error_code == -PVFS_EPROTO))
  1379.             {
  1380.                 /* if this is a an error that we can retry */
  1381.                 gossip_err(
  1382.                    "%s: flow failed, retrying from msgpair\n", __func__);
  1383.                 cur_ctx->msg_send_has_been_posted = 0;
  1384.                 cur_ctx->msg_recv_has_been_posted = 0;
  1385.             }
  1386.             else
  1387.             {
  1388.                 /* do not retry on remaining error codes */
  1389.                 gossip_err(
  1390.                    "%s: flow failed, not retrying\n", __func__);
  1391.  
  1392.                 /* forcing the count high insures that the sm won't restart */
  1393.                 sm_p->u.io.retry_count = sm_p->msgarray_op.params.retry_limit;
  1394.             }
  1395.  
  1396.         }
  1397.  
  1398.         /*To test fail-over uncomment the following. This will allow the code
  1399.          * to go through the retry state at least one time on a read operation.
  1400.         */
  1401.         //if (!cur_ctx->write_ack_in_progress && sm_p->u.io.retry_count==0)
  1402.         //{
  1403.         //   cur_ctx->msg_send_has_been_posted = 0;
  1404.         //   cur_ctx->msg_recv_has_been_posted = 0;
  1405.         //}
  1406.  
  1407.  
  1408.     }
  1409.     else if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FINAL_ACK))
  1410.     {
  1411.         assert(sm_p->u.io.write_ack_completion_count);
  1412.  
  1413.         index = STATUS_USER_TAG_GET_INDEX(
  1414.             status_user_tag, IO_SM_PHASE_FINAL_ACK);
  1415.         cur_ctx = &sm_p->u.io.contexts[index];
  1416.         assert(cur_ctx);
  1417.  
  1418.         assert(cur_ctx->write_ack.recv_status.actual_size <=
  1419.                cur_ctx->write_ack.max_resp_sz);
  1420.  
  1421.         cur_ctx->write_ack.recv_id = 0;
  1422.         cur_ctx->write_ack.recv_status = *js_p;
  1423.  
  1424.         gossip_debug(GOSSIP_IO_DEBUG, "  matched completed ack for "
  1425.                      "context %p\n", cur_ctx);
  1426.  
  1427.         cur_ctx->write_ack_in_progress = 0;
  1428.         sm_p->u.io.write_ack_completion_count--;
  1429.         assert(sm_p->u.io.write_ack_completion_count > -1);
  1430.  
  1431.         if (js_p->error_code < 0) {
  1432.             gossip_debug(GOSSIP_IO_DEBUG,
  1433.               "%s: write-ack failed, retrying from msgpair\n", __func__);
  1434.             cur_ctx->msg_send_has_been_posted = 0;
  1435.             cur_ctx->msg_recv_has_been_posted = 0;
  1436.         }
  1437.         else
  1438.         {
  1439.             /* if we successfully received an ack.  If the flow has _not_
  1440.              * finished, then we should go ahead and cancel it (the ack is
  1441.              * reporting an error, no point in waiting for flow timeout).  
  1442.              */
  1443.             if(cur_ctx->flow_in_progress != 0)
  1444.             {
  1445.                 job_flow_cancel(cur_ctx->flow_job_id,
  1446.                     pint_client_sm_context);
  1447.                 /* bump up the retry count to prevent the state machine from
  1448.                  * restarting after this error propigates
  1449.                  */
  1450.                 sm_p->u.io.retry_count = sm_p->msgarray_op.params.retry_limit;
  1451.             }
  1452.         }
  1453.     }
  1454.  
  1455.   check_next_step:
  1456.  
  1457.     /*
  1458.      * If something is pending, return SM_ACTION_DEFERRED to let SM find the
  1459.      * next thing to do.
  1460.      */
  1461.     if (sm_p->u.io.msgpair_completion_count
  1462.      || sm_p->u.io.flow_completion_count
  1463.      || sm_p->u.io.write_ack_completion_count) {
  1464.         if (PINT_smcb_cancelled(smcb))
  1465.             gossip_debug(GOSSIP_IO_DEBUG, "detected I/O cancellation with "
  1466.                          "%d flows and %d write acks pending\n",
  1467.                          sm_p->u.io.flow_completion_count,
  1468.                          sm_p->u.io.write_ack_completion_count);
  1469.         else
  1470.             gossip_debug(GOSSIP_IO_DEBUG, " %d flows pending, %d write acks "
  1471.                          "pending, %d msgpair\n", 
  1472.                          sm_p->u.io.flow_completion_count,
  1473.                          sm_p->u.io.write_ack_completion_count,
  1474.                          sm_p->u.io.msgpair_completion_count);
  1475.         return SM_ACTION_DEFERRED;
  1476.     }
  1477.  
  1478.     /*
  1479.      * Else either we've finished it all or have some msgpairs to retry
  1480.      * that failed earlier.
  1481.      */
  1482.     for (i=0; i < sm_p->u.io.datafile_count; i++) {
  1483.         PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
  1484.         if (!cur_ctx->msg_recv_has_been_posted)
  1485.             break;
  1486.         if (!cur_ctx->msg_send_has_been_posted)
  1487.             break;
  1488.     }
  1489.     if (i < sm_p->u.io.datafile_count && !PINT_smcb_cancelled(smcb)) {
  1490.         gossip_debug(GOSSIP_IO_DEBUG,
  1491.           "*** %s: some msgpairs to repost\n", __func__);
  1492.         js_p->error_code = IO_RETRY;
  1493.     } else {
  1494.         gossip_debug(GOSSIP_IO_DEBUG, "*** all operations %s "
  1495.                      "(msgpairs, flows, write acks)\n",
  1496.                      (PINT_smcb_cancelled(smcb) ? "cancelled" : "completed"));
  1497.         js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
  1498.     }
  1499.     return SM_ACTION_COMPLETE;
  1500. }
  1501.  
  1502. static PINT_sm_action io_analyze_results(
  1503.         struct PINT_smcb *smcb, job_status_s *js_p)
  1504. {
  1505.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1506.     int ret = -PVFS_EINVAL, i = 0;
  1507.     PVFS_object_attr *attr;
  1508.  
  1509.     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
  1510.                  "io_analyze_results\n", sm_p);
  1511.  
  1512.     attr = &sm_p->getattr.attr;
  1513.  
  1514.     /* I/O op failed or cancelled if the transfers didn't complete or
  1515.      * the error code is non-zero when returning from small I/O sm
  1516.      */
  1517.     if (js_p->error_code != IO_DATAFILE_TRANSFERS_COMPLETE &&
  1518.         (js_p->error_code == 0 && !sm_p->u.io.small_io))
  1519.     {
  1520.         ret = (sm_p->u.io.stored_error_code ?
  1521.                sm_p->u.io.stored_error_code :
  1522.                js_p->error_code);
  1523.  
  1524.         if (ret == 0)
  1525.         {
  1526.             ret = (PINT_smcb_cancelled(smcb) ? -PVFS_ECANCEL : -PVFS_EIO);
  1527.         }
  1528.     }
  1529.     else if (!PINT_smcb_cancelled(smcb))
  1530.     {
  1531.         /*
  1532.          * look through all the contexts for errors, saving the first
  1533.          * one to return (if any) while adding up the size of the
  1534.          * transfer (in case things actually completed).
  1535.          *
  1536.          * in the case of small I/O, we don't have to add up the size
  1537.          * based on the contexts (which weren't initialized), since the
  1538.          * small I/O state machine sets the total_size directly
  1539.          */
  1540.  
  1541.         ret = 0;
  1542.         if(!sm_p->u.io.small_io)
  1543.         {
  1544.             for(i = 0; i < sm_p->u.io.context_count; i++)
  1545.             {
  1546.                 PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
  1547.                 assert(cur_ctx);
  1548.  
  1549.                 ret = io_check_context_status(
  1550.                     cur_ctx, sm_p->u.io.io_type, &sm_p->u.io.total_size);
  1551.                 if (ret < 0)
  1552.                 {
  1553.                     if (ret == -PVFS_ECANCEL)
  1554.                     {
  1555.                         gossip_debug(GOSSIP_IO_DEBUG, "*** I/O operation "
  1556.                                      "cancelled\n");
  1557.                     }
  1558.                     else
  1559.                     {
  1560.                         gossip_debug(GOSSIP_IO_DEBUG, 
  1561.                             "io_check_context_status found error: %d", ret);
  1562.                     }
  1563.                     break;
  1564.                 }
  1565.  
  1566.                 gossip_debug(
  1567.                     GOSSIP_IO_DEBUG, "[%d/%d] running size is %lld\n",
  1568.                     (i + 1), sm_p->u.io.datafile_count,
  1569.                     lld(sm_p->u.io.total_size));
  1570.             }
  1571.             gossip_debug(GOSSIP_IO_DEBUG, "[%d/%d] running size is %lld\n",
  1572.                 (i + 1), sm_p->u.io.datafile_count,
  1573.                 lld(sm_p->u.io.total_size));
  1574.         }
  1575.  
  1576.         /*
  1577.           at this point, we may know an error occurred.  if we
  1578.           couldn't find any errors in the context, use the preserved
  1579.           error code from the complete_operations state (which may be
  1580.           success)
  1581.         */
  1582.         if (ret == 0)
  1583.         {
  1584.             char buf[64] = {0};
  1585.  
  1586.             ret = (PINT_smcb_cancelled(smcb) ? -PVFS_ECANCEL :
  1587.                    sm_p->u.io.stored_error_code);
  1588.  
  1589.             PVFS_strerror_r(ret, buf, 64);
  1590.             gossip_debug(GOSSIP_IO_DEBUG, "no context errors found; "
  1591.                          "using: %s\n", buf);
  1592.         }
  1593.     }
  1594.     else
  1595.     {
  1596.         ret = (PINT_smcb_cancelled(smcb) ? -PVFS_ECANCEL : -PVFS_EIO);
  1597.     }
  1598.  
  1599.     /* be sure there are no jobs still laying around */
  1600.     assert((sm_p->u.io.msgpair_completion_count == 0) &&
  1601.            (sm_p->u.io.flow_completion_count == 0) &&
  1602.            (sm_p->u.io.write_ack_completion_count == 0));
  1603.  
  1604.     /*
  1605.       FIXME: non bmi errors pop out in flow failures above -- they are
  1606.       not properly marked as flow errors either, so we check for them
  1607.       explicitly here (but not all -- fix it for real).
  1608.     */
  1609.     if (((PVFS_ERROR_CLASS(-ret) == PVFS_ERROR_BMI) ||
  1610.          (PVFS_ERROR_CLASS(-ret) == PVFS_ERROR_FLOW) ||
  1611.          (ret == -ECONNRESET) || (ret == -PVFS_EPROTO)) &&
  1612.         (sm_p->u.io.retry_count < sm_p->msgarray_op.params.retry_limit))
  1613.     {
  1614.         assert(!PINT_smcb_cancelled(smcb));
  1615.  
  1616.         sm_p->u.io.stored_error_code = 0;
  1617.         sm_p->u.io.total_size = 0;  /* start from the beginning again */
  1618.         sm_p->u.io.retry_count++;
  1619.  
  1620.         gossip_debug(GOSSIP_IO_DEBUG, "Retrying I/O operation "
  1621.                      "(attempt number %d)\n", sm_p->u.io.retry_count);
  1622.  
  1623.         if(ret == -BMI_ECANCEL)
  1624.         {
  1625.             /* if we got a BMI cancellation, then it probably indicates a
  1626.              * that a BMI timeout has expired; we should retry without
  1627.              * introducing another delay
  1628.              */
  1629.             js_p->error_code = IO_RETRY_NODELAY;
  1630.         }
  1631.         else
  1632.         {
  1633.             js_p->error_code = IO_RETRY;
  1634.         }
  1635.         goto analyze_results_exit;
  1636.     }
  1637.  
  1638.     /* all other errors we should just propigate immediately */
  1639.     if(ret != 0)
  1640.     {
  1641.         js_p->error_code = ret;
  1642.         goto analyze_results_exit;
  1643.     }
  1644.  
  1645.     gossip_debug(GOSSIP_IO_DEBUG, "total bytes transferred is %lld\n",
  1646.                  lld(sm_p->u.io.total_size));
  1647.  
  1648.     if(sm_p->u.io.io_type == PVFS_IO_WRITE)
  1649.     {
  1650.         js_p->error_code = 0;
  1651.         sm_p->u.io.io_resp_p->total_completed = sm_p->u.io.total_size;
  1652.  
  1653.         /* we don't know if the file size has changed here so we invalidate
  1654.          * the size in the attribute cache.  The only sure-fire way to
  1655.          * recompute the file size (if our write was past eof) is to
  1656.          * get all the datafile sizes (we could have written to what was
  1657.          * previously a hole).  That's too expensive for just a cached
  1658.          * size update.
  1659.          */
  1660.         PINT_acache_invalidate_size(sm_p->object_ref);
  1661.  
  1662.         /* we can skip the check for holes since its only needed in the
  1663.          * case of reads
  1664.          */
  1665.         goto analyze_results_exit;
  1666.     }
  1667.  
  1668.     /* In order to give the sysint caller the correct value for length
  1669.      * of bytes read, we have to check for holes in the logical file.  
  1670.      * The algorithm is as follows:
  1671.      *
  1672.      * 1. If the size of the memory request is equivalent to the number of
  1673.      * bytes read, we know there are no holes and the total size
  1674.      * is the correct value to return back to the caller.
  1675.      *
  1676.      * 2. If 1. is false, either there's a hole in the file within the
  1677.      * region of the file request, or the request is past EOF.  If the request
  1678.      * is NOT past EOF, then the return value for length of bytes read
  1679.      * is equivalent to the size of the memory request.  To check that the
  1680.      * request is not past EOF, we iterate through the target datafiles,
  1681.      * calculate the logical file offsets of each based on the their physical
  1682.      * bstream size, looking for a logical offset that is >= the upper bound
  1683.      * of the memory request.  If we find a target datafile that matches this
  1684.      * criteria, we know that the request is not past EOF, and that
  1685.      * returned bytes read is equivalent to the size of the memory request.
  1686.      *
  1687.      * 3. If none of the target datafile logical offsets are >= the upper
  1688.      * bound of the file request, we still must check all the datafiles
  1689.      * that are beyond the last target datafile.  To do this we have to
  1690.      * go and get the sizes of each one, and perform the above comparison
  1691.      * on them as well.  Again, if one of their logical offsets >= the upper
  1692.      * bound of the file request, we know the returned bytes read is the
  1693.      * size of the file request.  
  1694.      *
  1695.      * 4. If we don't find any datafiles with 
  1696.      * logical offset >= the upper bound of the file request, the returned
  1697.      * bytes read value is the size of the file request minus the last offset
  1698.      * of the datafiles (where the EOF occurs).
  1699.      */
  1700.     if(sm_p->u.io.total_size == PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req))
  1701.     {
  1702.         sm_p->u.io.io_resp_p->total_completed = sm_p->u.io.total_size;
  1703.     }
  1704.     else
  1705.     {
  1706.         PVFS_offset eor; /* end-of-request */
  1707.         PVFS_offset max_datafile_logical_offset;
  1708.         PVFS_offset filereq_ub_offset;
  1709.  
  1710.         /* compute the upper bound of the file request (based on the
  1711.          * memory request.  This is the logical offset used to compare 
  1712.          * against all the datafile logical offsets
  1713.          */
  1714.         ret = io_find_offset(
  1715.             sm_p, 
  1716.             PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req), 
  1717.             &filereq_ub_offset);
  1718.         if(ret)
  1719.         {
  1720.            js_p->error_code = ret;
  1721.            goto analyze_results_exit;
  1722.         }
  1723.  
  1724.         eor = filereq_ub_offset + sm_p->u.io.file_req_offset;
  1725.  
  1726.         max_datafile_logical_offset = attr->u.meta.dist->methods->
  1727.             logical_file_size(
  1728.                 attr->u.meta.dist->params,
  1729.                 attr->u.meta.dfile_count,
  1730.                 sm_p->u.io.dfile_size_array);
  1731.         if(max_datafile_logical_offset > eor)
  1732.         {
  1733.             eor = max_datafile_logical_offset;
  1734.  
  1735.             /* we found a logical offset that is past the end of the
  1736.              * request, so we know the request is not past EOF.
  1737.              *
  1738.              * In this case, we don't need to change the size of the
  1739.              * file stored in the attribute cache
  1740.              */
  1741.             sm_p->u.io.io_resp_p->total_completed = 
  1742.                 PINT_REQUEST_TOTAL_BYTES(
  1743.                     sm_p->u.io.mem_req);
  1744.  
  1745.             
  1746.             ret = io_zero_fill_holes(sm_p, eor,
  1747.                                      sm_p->u.io.datafile_count,
  1748.                                      sm_p->u.io.dfile_size_array,
  1749.                                      sm_p->u.io.datafile_index_array);
  1750.             if(ret < 0)
  1751.             {
  1752.                 js_p->error_code = ret;
  1753.                 goto analyze_results_exit;
  1754.             }
  1755.         }
  1756.         else
  1757.         {
  1758.             /* if we fail to find a datafile that matches, we still don't
  1759.              * know if the request is past EOF or we just don't have all
  1760.              * the datafiles sizes.  If it turns out we already
  1761.              * have all the datafile sizes, we
  1762.              * can go straight to computing the total size.  Otherwise, we
  1763.              * need to get the rest of the datafiles.  
  1764.              * At some point we should fix
  1765.              * this and the getattr state machine to allow us to only get
  1766.              * the remaining datafile sizes that we need, instead of 
  1767.              * getting all of them.  Right now the getattr state machine
  1768.              * just gets them all.
  1769.              */
  1770.             if(sm_p->u.io.datafile_count == attr->u.meta.dfile_count)
  1771.             {
  1772.                 /* we skip getting all the datafile sizes (since we already
  1773.                  * have them) and just compute the total size.
  1774.                  */
  1775.                 PVFS_size total_size;
  1776.                 
  1777.                 ret = io_find_total_size(
  1778.                     sm_p, max_datafile_logical_offset, &total_size);
  1779.                 if(ret < 0)
  1780.                 {
  1781.                     js_p->error_code = ret;
  1782.                     goto analyze_results_exit;
  1783.                 }
  1784.  
  1785.                 sm_p->u.io.io_resp_p->total_completed = total_size;
  1786.  
  1787.                 ret = io_zero_fill_holes(sm_p, eor,
  1788.                                          sm_p->u.io.datafile_count,
  1789.                                          sm_p->u.io.dfile_size_array,
  1790.                                          NULL);
  1791.                 if(ret < 0)
  1792.                 {
  1793.                     js_p->error_code = ret;
  1794.                     goto analyze_results_exit;
  1795.                 }
  1796.  
  1797.                 js_p->error_code = 0;
  1798.                 goto analyze_results_exit;
  1799.             }
  1800.  
  1801.             /* looks like we don't have all the datafile sizes, so
  1802.              * we need to go and get them
  1803.              */
  1804.  
  1805.             /* NOTE:  when jumping to getattr_datafile_sizes, results will be */
  1806.             /* allocated and stored in sm_p->getattr.size_array.              */
  1807.  
  1808.             /* setting this state result will cause the state machine to
  1809.              * jump to getattr_datafile_sizes and get all the 
  1810.              * datafile sizes from all the
  1811.              * servers.  Once complete, we will return back to the
  1812.              * io state machine at the analyze_size_results state.
  1813.              */
  1814.             js_p->error_code = IO_GET_DATAFILE_SIZE;
  1815.             goto analyze_results_exit;
  1816.         }
  1817.     }
  1818.  
  1819.     js_p->error_code = ret;
  1820.  
  1821. analyze_results_exit:
  1822.  
  1823.     return SM_ACTION_COMPLETE;
  1824. }
  1825.  
  1826. static PINT_sm_action io_analyze_size_results(
  1827.         struct PINT_smcb *smcb, job_status_s *js_p)
  1828. {
  1829.     /* Now that we have all the datafile sizes, 
  1830.      * this state allows us to finish our check that the file request
  1831.      * is not beyond EOF, and return the appropriate value for bytes
  1832.      * read to the sysint caller.
  1833.      *
  1834.      * The check iterates through all the datafiles and compares
  1835.      * their logical sizes with the upper bound of the file request.
  1836.      * If one of the datafile's logical sizes is >= than the ub,
  1837.      * we know the request is not past EOF.  Otherwise it must be, and
  1838.      * the return value for bytes read is calculated from the size
  1839.      * of the file request and the greatest logical offset of 
  1840.      * the datafiles (the actual EOF).
  1841.      */
  1842.     
  1843.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1844.     PVFS_offset eof = 0;
  1845.     PVFS_offset eor;
  1846.     PVFS_offset filereq_ub_offset;
  1847.     int ret;
  1848.     PVFS_object_attr * attr;
  1849.  
  1850.     attr = &sm_p->getattr.attr;
  1851.     assert(attr);
  1852.     
  1853.     ret = io_find_offset(
  1854.         sm_p, 
  1855.         PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
  1856.         &filereq_ub_offset);
  1857.     if(ret < 0)
  1858.     {
  1859.         js_p->error_code = ret;
  1860.         goto error_exit;
  1861.     }
  1862.  
  1863.     eor = filereq_ub_offset + sm_p->u.io.file_req_offset;
  1864.  
  1865.     eof = attr->u.meta.dist->methods->
  1866.         logical_file_size(
  1867.             attr->u.meta.dist->params,
  1868.             attr->u.meta.dfile_count,
  1869.             sm_p->getattr.size_array);
  1870.     if(eof > eor)
  1871.     {
  1872.         eor = eof;
  1873.  
  1874.         /* we found a logical offset that is past the end of the
  1875.          * request, so we know the request is not past EOF
  1876.          */
  1877.         sm_p->u.io.io_resp_p->total_completed = 
  1878.             PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req);
  1879.  
  1880.         ret = io_zero_fill_holes(sm_p, eor,
  1881.                                  attr->u.meta.dfile_count,
  1882.                                  sm_p->getattr.size_array,
  1883.                                  NULL);
  1884.         if(ret < 0)
  1885.         {
  1886.             js_p->error_code = ret;
  1887.             goto error_exit;
  1888.         }
  1889.     }
  1890.     else
  1891.     {
  1892.         PVFS_size total_size;
  1893.         
  1894.         ret = io_find_total_size(sm_p, eof, &total_size);
  1895.         if(ret < 0)
  1896.         {
  1897.             js_p->error_code = ret;
  1898.             goto error_exit;
  1899.         }
  1900.         
  1901.         sm_p->u.io.io_resp_p->total_completed = total_size;
  1902.  
  1903.         ret = io_zero_fill_holes(sm_p, eof,
  1904.                                  attr->u.meta.dfile_count,
  1905.                                  sm_p->getattr.size_array,
  1906.                                  NULL);
  1907.         if(ret < 0)
  1908.         {
  1909.             js_p->error_code = ret;
  1910.             goto error_exit;
  1911.         }
  1912.     }
  1913.  
  1914.  
  1915.     js_p->error_code = 0;
  1916.  
  1917. error_exit:
  1918.     return SM_ACTION_COMPLETE;
  1919. }
  1920.  
  1921. static PINT_sm_action io_cleanup(
  1922.         struct PINT_smcb *smcb, job_status_s *js_p)
  1923. {
  1924.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1925.     gossip_debug(GOSSIP_CLIENT_DEBUG,
  1926.                  "(%p) io state: io_cleanup\n", sm_p);
  1927.  
  1928.     io_contexts_destroy(sm_p);
  1929.  
  1930.     io_datafile_index_array_destroy(sm_p);
  1931.  
  1932.     PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr); 
  1933.     
  1934.     if(sm_p->u.io.dfile_size_array)
  1935.     {
  1936.         PINT_SM_DATAFILE_SIZE_ARRAY_DESTROY(&sm_p->u.io.dfile_size_array);
  1937.     }
  1938.  
  1939.     /*these errors occur only within THIS machine and indicate an error that
  1940.      *occurred BEFORE starting msgpairs or small-io.
  1941.     */
  1942.     if (js_p->error_code == IO_FATAL_ERROR)
  1943.         js_p->error_code = sm_p->u.io.stored_error_code;
  1944.  
  1945.     sm_p->error_code = js_p->error_code;
  1946.  
  1947.     if (sm_p->error_code)
  1948.     {
  1949.         char buf[64] = {0};
  1950.         PINT_acache_invalidate(sm_p->object_ref);
  1951.  
  1952.         PVFS_strerror_r(sm_p->error_code, buf, 64);
  1953.         gossip_debug(GOSSIP_IO_DEBUG,
  1954.                      "*** Final I/O operation error is %s\n", buf);
  1955.     }
  1956.  
  1957.     PINT_SET_OP_COMPLETE;
  1958.     return SM_ACTION_TERMINATE;
  1959. }
  1960.  
  1961. static inline int io_decode_ack_response(
  1962.     PINT_client_io_ctx *cur_ctx,
  1963.     struct PINT_decoded_msg *decoded_resp,
  1964.     struct PVFS_server_resp **resp)
  1965. {
  1966.     int ret = -PVFS_EINVAL;
  1967.  
  1968.     gossip_debug(GOSSIP_IO_DEBUG, "- io_process_context_recv called\n");
  1969.  
  1970.     assert(cur_ctx && decoded_resp && resp);
  1971.  
  1972.     ret = PINT_serv_decode_resp(
  1973.         cur_ctx->msg.fs_id, cur_ctx->msg.encoded_resp_p, decoded_resp,
  1974.         &cur_ctx->msg.svr_addr,
  1975.         cur_ctx->msg.recv_status.actual_size, resp);
  1976.  
  1977.     if (ret)
  1978.     {
  1979.         PVFS_perror("PINT_server_decode_resp failed", ret);
  1980.         return ret;
  1981.     }
  1982.  
  1983.     assert((*resp)->status < 1);
  1984.     cur_ctx->msg.op_status = (*resp)->status;
  1985.  
  1986.     if (cur_ctx->msg.recv_status.error_code || cur_ctx->msg.op_status)
  1987.     {
  1988.         gossip_debug(
  1989.             GOSSIP_IO_DEBUG, "  error %d with status %d related "
  1990.             "to response from context %p; not submitting flow.\n",
  1991.             cur_ctx->msg.recv_status.error_code,
  1992.             cur_ctx->msg.op_status, cur_ctx);
  1993.  
  1994.         if (cur_ctx->msg.recv_status.error_code)
  1995.         {
  1996.             PVFS_perror_gossip(
  1997.                 "io_process_context_recv (recv_status.error_code)",
  1998.                 cur_ctx->msg.recv_status.error_code);
  1999.             ret = cur_ctx->msg.recv_status.error_code;
  2000.         }
  2001.         else if (cur_ctx->msg.op_status)
  2002.         {
  2003.             PVFS_perror_gossip("io_process_context_recv (op_status)",
  2004.                                cur_ctx->msg.op_status);
  2005.             gossip_err("server: %s\n"
  2006.                       , BMI_addr_rev_lookup(cur_ctx->msg.svr_addr));
  2007.             ret = cur_ctx->msg.op_status;
  2008.         }
  2009.  
  2010.         PINT_serv_free_msgpair_resources(
  2011.             &cur_ctx->msg.encoded_req, cur_ctx->msg.encoded_resp_p,
  2012.             decoded_resp, &cur_ctx->msg.svr_addr,
  2013.             cur_ctx->msg.max_resp_sz);
  2014.         memset(&cur_ctx->msg.encoded_req,0,sizeof(cur_ctx->msg.encoded_req));
  2015.         cur_ctx->msg.encoded_resp_p = NULL;
  2016.     }
  2017.     return ret;
  2018. }
  2019.  
  2020. /* post flow sets up the flow and posts it.  This may be called
  2021.  * either immediately after the request is posted (in the case of writes
  2022.  * at present), or not until the ack from the request is received (as
  2023.  * in the case of reads).
  2024.  */
  2025. static inline int io_post_flow(
  2026.     PINT_smcb *smcb,
  2027.     PINT_client_io_ctx *cur_ctx)
  2028. {
  2029.     PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  2030.     int ret = 0;
  2031.     PVFS_object_attr *attr = NULL;
  2032.     struct server_configuration_s *server_config = NULL;
  2033.     unsigned long status_user_tag = 0;
  2034.     struct filesystem_configuration_s * fs_config;
  2035.  
  2036.     gossip_debug(GOSSIP_IO_DEBUG, "%s: entry\n", __func__);
  2037.  
  2038.     if (!sm_p || !cur_ctx)
  2039.     {
  2040.         return -PVFS_EINVAL;
  2041.     }
  2042.  
  2043.     /* We need the file's metadata info (distribution and datafile count) */
  2044.     attr = &sm_p->getattr.attr;
  2045.     assert(attr);
  2046.     
  2047.     /*
  2048.      * Notify BMI about the memory buffer the user passed in.  For transports
  2049.      * that need registration, this allows them to work with one large region
  2050.      * rather than lots of small stripe-size regions.  But only bother if the
  2051.      * request is contiguous; too complex and likely no faster in the highly
  2052.      * fragmented case.
  2053.      */
  2054.     if (sm_p->u.io.mem_req->num_contig_chunks == 1)
  2055.     {
  2056.         struct bmi_optimistic_buffer_info binfo;
  2057.  
  2058.         binfo.buffer = sm_p->u.io.buffer;
  2059.         binfo.len = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
  2060.         binfo.rw = sm_p->u.io.io_type;
  2061.         BMI_set_info(cur_ctx->msg.svr_addr, BMI_OPTIMISTIC_BUFFER_REG, &binfo);
  2062.     }
  2063.  
  2064.     gossip_debug(GOSSIP_IO_DEBUG, "* mem req size is %lld, "
  2065.                  "file_req size is %lld (bytes)\n",
  2066.                  lld(PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req)),
  2067.                  lld(PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.file_req)));
  2068.  
  2069.     /* must reset the error_code and internal PINT_distribute fields
  2070.      * in case of a retry */
  2071.     PINT_flow_reset(&cur_ctx->flow_desc);
  2072.  
  2073.     cur_ctx->flow_desc.file_data.fsize = 
  2074.         sm_p->u.io.dfile_size_array[cur_ctx->index];
  2075.     cur_ctx->flow_desc.file_data.dist = attr->u.meta.dist;
  2076.     cur_ctx->flow_desc.file_data.server_nr = cur_ctx->server_nr;
  2077.     cur_ctx->flow_desc.file_data.server_ct = attr->u.meta.dfile_count;
  2078.  
  2079.     cur_ctx->flow_desc.file_req = sm_p->u.io.file_req;
  2080.     cur_ctx->flow_desc.file_req_offset = sm_p->u.io.file_req_offset;
  2081.  
  2082.     cur_ctx->flow_desc.mem_req = sm_p->u.io.mem_req;
  2083.  
  2084.     cur_ctx->flow_desc.tag = cur_ctx->session_tag;
  2085.     cur_ctx->flow_desc.type = sm_p->u.io.flowproto_type;
  2086.     cur_ctx->flow_desc.user_ptr = NULL;
  2087.  
  2088.     gossip_debug(GOSSIP_IO_DEBUG, "  bstream_size = %lld, datafile "
  2089.                  "nr=%d, ct=%d, file_req_off = %lld\n",
  2090.                  lld(cur_ctx->flow_desc.file_data.fsize),
  2091.                  cur_ctx->flow_desc.file_data.server_nr,
  2092.                  cur_ctx->flow_desc.file_data.server_ct,
  2093.                  lld(cur_ctx->flow_desc.file_req_offset));
  2094.  
  2095.     if (sm_p->u.io.io_type == PVFS_IO_READ)
  2096.     {
  2097.         cur_ctx->flow_desc.file_data.extend_flag = 0;
  2098.         cur_ctx->flow_desc.src.endpoint_id = BMI_ENDPOINT;
  2099.         cur_ctx->flow_desc.src.u.bmi.address = cur_ctx->msg.svr_addr;
  2100.         cur_ctx->flow_desc.dest.endpoint_id = MEM_ENDPOINT;
  2101.         cur_ctx->flow_desc.dest.u.mem.buffer = sm_p->u.io.buffer;
  2102.     }
  2103.     else
  2104.     {
  2105.         assert(sm_p->u.io.io_type == PVFS_IO_WRITE);
  2106.  
  2107.         cur_ctx->flow_desc.file_data.extend_flag = 1;
  2108.         cur_ctx->flow_desc.src.endpoint_id = MEM_ENDPOINT;
  2109.         cur_ctx->flow_desc.src.u.mem.buffer = sm_p->u.io.buffer;
  2110.         cur_ctx->flow_desc.dest.endpoint_id = BMI_ENDPOINT;
  2111.         cur_ctx->flow_desc.dest.u.bmi.address = cur_ctx->msg.svr_addr;
  2112.     }
  2113.  
  2114.     status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);
  2115.  
  2116.     server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);    
  2117.  
  2118.     fs_config = PINT_config_find_fs_id(server_config, cur_ctx->msg.fs_id);
  2119.     if(fs_config)
  2120.     {
  2121.         /* pick up any buffer settings overrides from fs conf */
  2122.         cur_ctx->flow_desc.buffer_size = fs_config->fp_buffer_size;
  2123.         cur_ctx->flow_desc.buffers_per_flow = fs_config->fp_buffers_per_flow;
  2124.     }
  2125.  
  2126.     ret = job_flow(
  2127.         &cur_ctx->flow_desc, smcb, status_user_tag,
  2128.         &cur_ctx->flow_status, &cur_ctx->flow_job_id,
  2129.         pint_client_sm_context,
  2130.         server_config->client_job_flow_timeout, sm_p->hints);
  2131.     PINT_put_server_config_struct(server_config);
  2132.  
  2133.     /* if the flow fails immediately, then we have to do some special
  2134.      * handling.  This function is not equiped to handle the failure
  2135.      * directly, so we instead post a null job that will propigate the error
  2136.      * to the normal state where we interpret flow errors
  2137.      */
  2138.     if((ret < 0) || (ret == 1 && cur_ctx->flow_status.error_code != 0))
  2139.     {
  2140.         /* make sure the error code is stored in the flow descriptor */
  2141.         if(ret == 1)
  2142.         {
  2143.             cur_ctx->flow_desc.error_code = cur_ctx->flow_status.error_code;
  2144.         }
  2145.         else
  2146.         {
  2147.             cur_ctx->flow_desc.error_code = ret;
  2148.         }
  2149.  
  2150.         gossip_debug(GOSSIP_IO_DEBUG, "  immediate flow failure for "
  2151.                      "context %p, error code: %d\n", cur_ctx,
  2152.                      cur_ctx->flow_desc.error_code);
  2153.         gossip_debug(GOSSIP_IO_DEBUG, "  posting job_null() to propigate.\n");
  2154.  
  2155.         /* post a fake job to propigate the flow failure to a later state */
  2156.         ret = job_null(cur_ctx->flow_desc.error_code, sm_p, 
  2157.             status_user_tag, &cur_ctx->flow_status,
  2158.             &cur_ctx->flow_job_id, pint_client_sm_context);
  2159.         if(ret !=0)
  2160.         {
  2161.             return(ret);
  2162.         }
  2163.     }
  2164.     else
  2165.     {
  2166.         gossip_debug(GOSSIP_IO_DEBUG, "  posted flow for "
  2167.                      "context %p\n", cur_ctx);
  2168.     }
  2169.  
  2170.     cur_ctx->flow_has_been_posted = 1;
  2171.     cur_ctx->flow_in_progress = 1;
  2172.     sm_p->u.io.flow_completion_count++;
  2173.  
  2174.     return 0;
  2175. }
  2176.  
  2177. static inline int io_post_write_ack_recv(
  2178.     PINT_smcb *smcb,
  2179.     PINT_client_io_ctx * cur_ctx)
  2180. {
  2181.     PINT_client_sm * sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  2182.     int ret;
  2183.     unsigned long status_user_tag;
  2184.  
  2185.     gossip_debug(GOSSIP_IO_DEBUG, "  preposting write "
  2186.                  "ack for context %p.\n", cur_ctx);
  2187.  
  2188.     cur_ctx->write_ack.max_resp_sz = PINT_encode_calc_max_size(
  2189.         PINT_ENCODE_RESP, PVFS_SERV_WRITE_COMPLETION,
  2190.         sm_p->u.io.encoding);
  2191.     cur_ctx->write_ack.encoded_resp_p = BMI_memalloc(
  2192.         cur_ctx->msg.svr_addr, cur_ctx->write_ack.max_resp_sz,
  2193.         BMI_RECV);
  2194.  
  2195.     if (!cur_ctx->write_ack.encoded_resp_p)
  2196.     {
  2197.         gossip_err("BMI_memalloc (for write ack) failed\n");
  2198.         return -PVFS_ENOMEM;
  2199.     }
  2200.  
  2201.     /*
  2202.        we're pre-posting the final write ack here, even though it's
  2203.        ahead of the flow phase; reads are at the flow phase.
  2204.  
  2205.        the timeout used here is a scaling one that needs to be long
  2206.        enough for the entire flow to occur
  2207.        */
  2208.     status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FINAL_ACK);
  2209.  
  2210.     /*
  2211.        pre-post this recv with an infinite timeout and adjust it
  2212.        after the flow completes since we don't know how long a flow
  2213.        can take at this point
  2214.        */ 
  2215.     ret = job_bmi_recv(
  2216.         cur_ctx->msg.svr_addr, cur_ctx->write_ack.encoded_resp_p,
  2217.         cur_ctx->write_ack.max_resp_sz, cur_ctx->session_tag,
  2218.         BMI_PRE_ALLOC, smcb, status_user_tag,
  2219.         &cur_ctx->write_ack.recv_status, &cur_ctx->write_ack.recv_id,
  2220.         pint_client_sm_context, JOB_TIMEOUT_INF, sm_p->hints);
  2221.  
  2222.     if (ret < 0)
  2223.     {
  2224.         gossip_err("job_bmi_recv (write ack) failed\n");
  2225.         return ret;
  2226.     }
  2227.  
  2228.     assert(ret == 0);
  2229.     cur_ctx->write_ack_has_been_posted = 1;
  2230.     cur_ctx->write_ack_in_progress = 1;
  2231.     sm_p->u.io.write_ack_completion_count++;
  2232.  
  2233.     return 0;
  2234. }
  2235.  
  2236. /*
  2237.   returns 0 on send completion; IO_RECV_COMPLETED on recv completion,
  2238.   and -PVFS_error on failure
  2239. */
  2240. static inline int io_complete_context_send_or_recv(
  2241.     PINT_smcb *smcb,
  2242.     job_status_s *js_p)
  2243. {
  2244.     int ret = -PVFS_EINVAL, index = 0;
  2245.     unsigned long status_user_tag = 0;
  2246.     PINT_client_io_ctx *cur_ctx = NULL;
  2247.     PINT_sm_msgpair_state *msg = NULL;
  2248.     PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  2249.  
  2250.     gossip_debug(GOSSIP_IO_DEBUG,
  2251.                  "- complete_context_send_or_recv called\n");
  2252.  
  2253.     assert(smcb && js_p);
  2254.     assert(smcb->op == PVFS_SYS_IO);
  2255.  
  2256.     status_user_tag = (unsigned long)js_p->status_user_tag;
  2257.  
  2258.     if (STATUS_USER_TAG_TYPE(
  2259.             status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV))
  2260.     {
  2261.         index = STATUS_USER_TAG_GET_INDEX(
  2262.             status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
  2263.  
  2264.         gossip_debug(GOSSIP_IO_DEBUG, "got a recv completion with "
  2265.                      "context index %d\n", index);
  2266.  
  2267.         cur_ctx = &sm_p->u.io.contexts[index];
  2268.         assert(cur_ctx);
  2269.  
  2270.         msg = &cur_ctx->msg;
  2271.  
  2272.         msg->recv_id = 0;
  2273.         msg->recv_status = *js_p;
  2274.  
  2275.         assert(msg->recv_status.error_code <= 0);
  2276.         assert(msg->recv_status.actual_size <= msg->max_resp_sz);
  2277.  
  2278.         cur_ctx->msg_recv_in_progress = 0;
  2279.         sm_p->u.io.msgpair_completion_count--;
  2280.  
  2281.         ret = IO_RECV_COMPLETED;
  2282.     }
  2283.     else if (STATUS_USER_TAG_TYPE(
  2284.                  status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_SEND))
  2285.     {
  2286.         index = STATUS_USER_TAG_GET_INDEX(
  2287.             status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
  2288.  
  2289.         gossip_debug(GOSSIP_IO_DEBUG, "got a send completion with "
  2290.                      "context index %d\n", index);
  2291.  
  2292.         cur_ctx = &sm_p->u.io.contexts[index];
  2293.         assert(cur_ctx);
  2294.  
  2295.         msg = &cur_ctx->msg;
  2296.  
  2297.         msg->send_id = 0;
  2298.         msg->send_status = *js_p;
  2299.  
  2300.         assert(msg->send_status.error_code <= 0);
  2301.  
  2302.         cur_ctx->msg_send_in_progress = 0;
  2303.         sm_p->u.io.msgpair_completion_count--;
  2304.  
  2305.         ret = 0;
  2306.     }
  2307.     return ret;
  2308. }
  2309.  
  2310. /**
  2311.  * process_context_recv handles the ack or nack from the server
  2312.  * in response to the I/O request.  This is called for each I/O context
  2313.  * i.e. each specific server response for each datafile.
  2314.  */
  2315. static inline int io_process_context_recv(
  2316.     PINT_client_sm *sm_p,
  2317.     job_status_s *js_p, 
  2318.     PINT_client_io_ctx **out_ctx)
  2319. {
  2320.     int ret = -PVFS_EINVAL, index = 0;
  2321.     unsigned long status_user_tag = 0;
  2322.     struct PINT_decoded_msg decoded_resp;
  2323.     struct PVFS_server_resp *resp = NULL;
  2324.     PINT_client_io_ctx *cur_ctx = NULL;
  2325.  
  2326.     gossip_debug(GOSSIP_IO_DEBUG,
  2327.                  "- io_process_context_recv called\n");
  2328.  
  2329.     assert(sm_p && js_p);
  2330.     assert(STATUS_USER_TAG_TYPE(
  2331.                status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV));
  2332.  
  2333.     status_user_tag = (unsigned long)js_p->status_user_tag;
  2334.  
  2335.     index = STATUS_USER_TAG_GET_INDEX(
  2336.         status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
  2337.  
  2338.     cur_ctx = &sm_p->u.io.contexts[index];
  2339.     assert(cur_ctx);
  2340.     *out_ctx = cur_ctx;
  2341.  
  2342.     if (js_p->error_code)
  2343.     {
  2344.         {
  2345.             char buf[1024];
  2346.             PVFS_strerror_r(js_p->error_code, buf, sizeof(buf));
  2347.             buf[sizeof(buf)-1] = '\0';
  2348.             gossip_debug(GOSSIP_IO_DEBUG, "%s: entered with error: %s\n",
  2349.               __func__, buf);
  2350.         }
  2351.         return js_p->error_code;
  2352.     }
  2353.  
  2354.     /* decode the response from the server */
  2355.     ret = io_decode_ack_response(cur_ctx, &decoded_resp, &resp);
  2356.     if (ret)
  2357.     {
  2358.         {
  2359.             char buf[1024];
  2360.             PVFS_strerror_r(js_p->error_code, buf, sizeof(buf));
  2361.             buf[sizeof(buf)-1] = '\0';
  2362.             gossip_debug(GOSSIP_IO_DEBUG, "%s: failed: %s\n", __func__, buf);
  2363.         }
  2364.         return ret;
  2365.     }
  2366.  
  2367.     /* save the datafile size */
  2368.     sm_p->u.io.dfile_size_array[cur_ctx->index] = resp->u.io.bstream_size;
  2369.  
  2370.     /* now we can destroy I/O request response resources */
  2371.     ret = PINT_serv_free_msgpair_resources(
  2372.         &cur_ctx->msg.encoded_req, cur_ctx->msg.encoded_resp_p,
  2373.         &decoded_resp, &cur_ctx->msg.svr_addr,
  2374.         cur_ctx->msg.max_resp_sz);
  2375.     memset(&cur_ctx->msg.encoded_req,0,sizeof(cur_ctx->msg.encoded_req));
  2376.     cur_ctx->msg.encoded_resp_p = NULL;
  2377.  
  2378.     if (ret)
  2379.     {
  2380.         PVFS_perror_gossip("PINT_serv_free_msgpair_resources "
  2381.                            "failed", ret);
  2382.         return ret;
  2383.     }
  2384.  
  2385.     return ret;
  2386. }
  2387.  
  2388. static inline int io_check_context_status(
  2389.     PINT_client_io_ctx *cur_ctx,
  2390.     int io_type,
  2391.     PVFS_size *total_size)
  2392. {
  2393.     int ret = 0;
  2394.  
  2395.     gossip_debug(GOSSIP_IO_DEBUG, "- io_check_context_status called\n");
  2396.  
  2397.     assert(cur_ctx && total_size);
  2398.  
  2399.     if (cur_ctx->msg.send_status.error_code)
  2400.     {
  2401.         gossip_debug(GOSSIP_IO_DEBUG,
  2402.                      "  error (%d) in msgpair send for context %p\n",
  2403.                      cur_ctx->msg.send_status.error_code, cur_ctx);
  2404.         ret = cur_ctx->msg.send_status.error_code;
  2405.     }
  2406.     else if (cur_ctx->msg.recv_status.error_code)
  2407.     {
  2408.         gossip_debug(GOSSIP_IO_DEBUG,
  2409.                      "  error (%d) in msgpair recv for context %p\n",
  2410.                      cur_ctx->msg.recv_status.error_code, cur_ctx);
  2411.         ret = cur_ctx->msg.recv_status.error_code;
  2412.     }
  2413.     else if (io_type == PVFS_IO_WRITE)
  2414.     {
  2415.         /* we check the write ack status before the flow status so that the
  2416.          * error code that the server reported in the ack takes precedence
  2417.          */
  2418.         if (cur_ctx->write_ack.recv_status.error_code)
  2419.         {
  2420.             gossip_debug(
  2421.                 GOSSIP_IO_DEBUG,
  2422.                 "  error (%d) in final ack for context %p\n",
  2423.                 cur_ctx->write_ack.recv_status.error_code, cur_ctx);
  2424.  
  2425.             assert(cur_ctx->write_ack_has_been_posted);
  2426.             ret = cur_ctx->write_ack.recv_status.error_code;
  2427.         }
  2428.         else if (cur_ctx->write_ack_has_been_posted)
  2429.         {
  2430.             struct PINT_decoded_msg decoded_resp;
  2431.             struct PVFS_server_resp *resp = NULL;
  2432.             /*
  2433.               size for writes are reported in the final ack, but we
  2434.               have to decode it first
  2435.             */
  2436.             ret = PINT_serv_decode_resp(
  2437.                 cur_ctx->msg.fs_id, cur_ctx->write_ack.encoded_resp_p,
  2438.                 &decoded_resp, &cur_ctx->msg.svr_addr,
  2439.                 cur_ctx->write_ack.recv_status.actual_size, &resp);
  2440.             if (ret == 0)
  2441.             {
  2442.                 gossip_debug(
  2443.                     GOSSIP_IO_DEBUG,
  2444.                     "  %lld bytes written to context %p\n",
  2445.                     lld(resp->u.write_completion.total_completed),
  2446.                     cur_ctx);
  2447.  
  2448.                 *total_size += resp->u.write_completion.total_completed;
  2449.                 
  2450.                 /* pass along the error code from the server as well */
  2451.                 ret = resp->status;
  2452.  
  2453.                 PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
  2454.             }
  2455.             else
  2456.             {
  2457.                 PVFS_perror_gossip("PINT_serv_decode_resp failed", ret);
  2458.             }
  2459.  
  2460.             PINT_flow_reset(&cur_ctx->flow_desc);
  2461.             BMI_memfree(cur_ctx->msg.svr_addr,
  2462.                         cur_ctx->write_ack.encoded_resp_p,
  2463.                         cur_ctx->write_ack.max_resp_sz, BMI_RECV);
  2464.         }
  2465.         else if (cur_ctx->flow_status.error_code)
  2466.         {
  2467.             gossip_debug(GOSSIP_IO_DEBUG,
  2468.                          "  error (%d) in flow for context %p\n",
  2469.                          cur_ctx->flow_status.error_code, cur_ctx);
  2470.             PINT_flow_reset(&cur_ctx->flow_desc);
  2471.             ret = cur_ctx->flow_status.error_code;
  2472.         }
  2473.     }
  2474.     else if (cur_ctx->flow_status.error_code)
  2475.     {
  2476.         gossip_debug(GOSSIP_IO_DEBUG,
  2477.                      "  error (%d) in flow for context %p\n",
  2478.                      cur_ctx->flow_status.error_code, cur_ctx);
  2479.         PINT_flow_reset(&cur_ctx->flow_desc);
  2480.         ret = cur_ctx->flow_status.error_code;
  2481.     }
  2482.     else if (io_type == PVFS_IO_READ)
  2483.     {
  2484.         gossip_debug(
  2485.             GOSSIP_IO_DEBUG, "  %lld bytes read from context %p\n",
  2486.             lld(cur_ctx->flow_desc.total_transferred), cur_ctx);
  2487.  
  2488.         /* size for reads are reported in the flow */
  2489.         *total_size += cur_ctx->flow_desc.total_transferred;
  2490.  
  2491.         /*
  2492.           we can't reset the flow here in case we have to do a zero
  2493.           fill adjustment that we haven't detected yet
  2494.         */
  2495.     }
  2496.  
  2497.     return ret;
  2498. }
  2499.  
  2500. static int io_get_max_unexp_size(
  2501.     struct PINT_Request * file_req,
  2502.     PVFS_handle handle, 
  2503.     PVFS_fs_id fs_id, 
  2504.     enum PVFS_io_type type,
  2505.     int * max_unexp_payload)
  2506. {
  2507.     int ret;
  2508.     PVFS_BMI_addr_t server_addr;
  2509.     int bmi_max_unexp_payload;
  2510.  
  2511.     /* we need to get the server address for the particular server
  2512.      * with this datafile so that we can use it to get the size
  2513.      * of the unexpected payload for the bmi interface associated
  2514.      * with that server.  We used the payload size to determine
  2515.      * whether to do small I/O.
  2516.      */
  2517.     ret = PINT_cached_config_map_to_server(
  2518.         &server_addr, 
  2519.         handle,
  2520.         fs_id);
  2521.     if(ret < 0)
  2522.     {
  2523.         return ret; 
  2524.     }
  2525.  
  2526.     /* now get the value for the BMI interface in use for that server */
  2527.     ret = BMI_get_info(server_addr, BMI_GET_UNEXP_SIZE, 
  2528.                        (void *)&bmi_max_unexp_payload);
  2529.     if(ret < 0)
  2530.     {
  2531.         return ret;
  2532.     }
  2533.  
  2534.     *max_unexp_payload = bmi_max_unexp_payload;
  2535.     return 0;
  2536. }
  2537.  
  2538. /*
  2539.   determines what subset of the datafiles actually contain data that
  2540.   we are interested in for this request. returns 0 on success,
  2541.   -PVFS_error on failure
  2542. */
  2543. static int io_find_target_datafiles(
  2544.     PVFS_Request mem_req,
  2545.     PVFS_Request file_req,
  2546.     PVFS_offset file_req_offset,
  2547.     PINT_dist *dist_p,
  2548.     PVFS_fs_id fs_id,
  2549.     enum PVFS_io_type io_type,
  2550.     PVFS_handle *input_handle_array,
  2551.     int input_handle_count,
  2552.     int *handle_index_array,
  2553.     int *handle_index_out_count,
  2554.     int *sio_handle_index_array,
  2555.     int *sio_handle_index_count)
  2556. {
  2557.     int ret = -PVFS_EINVAL, i = 0;
  2558.     struct PINT_Request_state *req_state = NULL;
  2559.     struct PINT_Request_state *mem_req_state = NULL;
  2560.     PINT_request_file_data tmp_file_data;
  2561.     PINT_Request_result tmp_result;
  2562.     PVFS_offset offsets;
  2563.     PVFS_size sizes;
  2564.     int total_bytes = 0;
  2565.     struct server_configuration_s * server_config;
  2566.     struct filesystem_configuration_s * fs_config;
  2567.     int small_io_size;
  2568.  
  2569.     gossip_debug(GOSSIP_IO_DEBUG, "- io_find_target_datafiles called\n");
  2570.  
  2571.     if (!handle_index_array || !handle_index_out_count)
  2572.     {
  2573.         return ret;
  2574.     }
  2575.     *handle_index_out_count = 0;
  2576.     *sio_handle_index_count = 0;
  2577.  
  2578.     req_state = PINT_new_request_state(file_req);
  2579.     if (!req_state)
  2580.     {
  2581.         return -PVFS_ENOMEM;
  2582.     }
  2583.     mem_req_state = PINT_new_request_state(mem_req);
  2584.     if (!mem_req_state)
  2585.     {
  2586.         PINT_free_request_state(req_state);
  2587.         return -PVFS_ENOMEM;
  2588.     }
  2589.  
  2590.     tmp_file_data.dist = dist_p;
  2591.     tmp_file_data.server_ct = input_handle_count;
  2592.     tmp_file_data.extend_flag = 1;
  2593.  
  2594.     /* for each datafile handle, calculate the unexp request or response
  2595.      * size (may be different for each server), and then calculate if
  2596.      * any data exists on that server (in the case of reads) 
  2597.      * or if any data should be written
  2598.      * (in the case of writes).
  2599.      */
  2600.     for(i = 0; i < input_handle_count; i++)
  2601.     {
  2602.         int max_unexp_payload;
  2603.  
  2604.         ret = io_get_max_unexp_size(
  2605.             file_req, input_handle_array[i], fs_id, 
  2606.             io_type, &max_unexp_payload);
  2607.         if(ret < 0)
  2608.         {
  2609.             PINT_free_request_state(mem_req_state);
  2610.             PINT_free_request_state(req_state);
  2611.             return ret;
  2612.         }
  2613.  
  2614.         /* NOTE: we don't have to give an accurate file size here, as
  2615.          * long as we set the extend flag to tell the I/O req
  2616.          * processor to continue past eof if needed
  2617.          */
  2618.         tmp_file_data.fsize = 0;  
  2619.         tmp_file_data.server_nr = i;
  2620.  
  2621.         PINT_REQUEST_STATE_RESET(req_state);
  2622.         PINT_REQUEST_STATE_RESET(mem_req_state);
  2623.  
  2624.         /* if a file datatype offset was specified, go ahead and skip
  2625.          * ahead before calculating
  2626.          */
  2627.         if (file_req_offset)
  2628.         {
  2629.             PINT_REQUEST_STATE_SET_TARGET(req_state, file_req_offset);
  2630.         }
  2631.  
  2632.         PINT_REQUEST_STATE_SET_FINAL(req_state,
  2633.             file_req_offset+PINT_REQUEST_TOTAL_BYTES(mem_req));
  2634.  
  2635.         memset(&tmp_result, 0, sizeof(PINT_Request_result));
  2636.  
  2637.         /* + 1 here so that the total_bytes processed can exist
  2638.          * be > max_unexp_payload
  2639.          */
  2640.         tmp_result.bytemax = max_unexp_payload + 1;
  2641.         tmp_result.segmax = 1;
  2642.         
  2643.         tmp_result.offset_array = &offsets;
  2644.         tmp_result.size_array = &sizes;
  2645.         total_bytes = 0;
  2646.  
  2647.         /* we need to keep processing the request (not just check for non-zero)
  2648.          * so that we can figure out whether to do small I/O.
  2649.          */
  2650.         do
  2651.         {
  2652.             tmp_result.bytes = 0;
  2653.             tmp_result.segs = 0;
  2654.  
  2655.             /* PINT_process_request() returns number of bytes processed */
  2656.             ret = PINT_process_request(
  2657.                 req_state, mem_req_state, &tmp_file_data,
  2658.                 &tmp_result, PINT_CLIENT);
  2659.             if (ret < 0)
  2660.             {
  2661.                 PINT_free_request_state(mem_req_state);
  2662.                 PINT_free_request_state(req_state);
  2663.                 return ret;
  2664.             }
  2665.  
  2666.             total_bytes += tmp_result.bytes;
  2667.  
  2668.             /* we limit the request processing for each datafile to only
  2669.              * check that the size is as least as big as max_unexp_size.
  2670.              * That way we know whether to do small I/O.  Calculating the
  2671.              * entire size for each datafile isn't necessary (and may be
  2672.              * expensive).
  2673.              */
  2674.         } while(!PINT_REQUEST_DONE(req_state) 
  2675.                 && total_bytes <= max_unexp_payload); 
  2676.  
  2677.         /* check if we found data that belongs to this handle */
  2678.         if (total_bytes != 0)
  2679.         {
  2680.             handle_index_array[(*handle_index_out_count)++] = i;
  2681.  
  2682. #ifndef PVFS2_SMALL_IO_OFF
  2683.             /* we need the encoding type from the server config */
  2684.             server_config = PINT_get_server_config_struct(fs_id);
  2685.             if(!server_config)
  2686.             {
  2687.                 return -PVFS_EINVAL;
  2688.             }
  2689.  
  2690.             fs_config = PINT_config_find_fs_id(server_config, fs_id);
  2691.             if(!fs_config)
  2692.             {
  2693.                 return -PVFS_EINVAL;
  2694.             }
  2695.  
  2696.             /* finally, compute the exact payload from the bmi_max_unexp_payload
  2697.              * and the small io message size
  2698.              */
  2699.             small_io_size = PINT_encode_calc_max_size(
  2700.                 (io_type == PVFS_IO_READ ? PINT_ENCODE_RESP : PINT_ENCODE_REQ), 
  2701.                 PVFS_SERV_SMALL_IO, 
  2702.                 fs_config->encoding);
  2703.  
  2704.             PINT_put_server_config_struct(server_config);
  2705.  
  2706.             if(io_type == PVFS_IO_WRITE)
  2707.             {
  2708.                 /* add the size of the entire file request */
  2709.                 small_io_size += (PVFS_REQUEST_ENCODED_SIZE * 
  2710.                                   (file_req->num_nested_req + 1));
  2711.             }
  2712.  
  2713.             /* encode/decode funcs malloc a max size for small I/O independent
  2714.              * of the bmi max unexp payload, so we need to subtract that
  2715.              * extra here
  2716.              */
  2717.             small_io_size -= (io_type == PVFS_IO_READ ? 
  2718.                               extra_size_PVFS_servreq_small_io :
  2719.                               extra_size_PVFS_servresp_small_io);
  2720.  
  2721.             if(total_bytes + small_io_size <= max_unexp_payload)
  2722.             {
  2723.                 sio_handle_index_array[(*sio_handle_index_count)++] = i;
  2724.             }
  2725. #endif
  2726.  
  2727.             gossip_debug(GOSSIP_IO_DEBUG, "%s: "
  2728.                          "datafile[%d] might have data (out=%d)\n",
  2729.                          __func__, i, *handle_index_out_count);
  2730.         }
  2731.     }
  2732.     PINT_free_request_state(req_state);
  2733.     PINT_free_request_state(mem_req_state);
  2734.  
  2735.     return 0;
  2736. }
  2737.  
  2738. /* If there are no datafiles that have a logical
  2739.  * offset past the upper bound of the file request, we know that
  2740.  * the request is beyond the EOF of the file.  We compute
  2741.  * the return value for bytes read by finding the upper bound of the
  2742.  * memory request *within* the logical file (before EOF).  This is
  2743.  * the end of the contiguous segment in the file request < EOF.
  2744.  * The number of bytes read is then the length of the file request
  2745.  * from start to this point.
  2746.  */
  2747. static int
  2748. io_find_total_size(
  2749.     PINT_client_sm * sm_p,
  2750.     PVFS_offset final_offset,
  2751.     PVFS_size * total_return_size)
  2752. {
  2753.     int res;
  2754.     PVFS_offset current_offset;
  2755.     PVFS_offset offsets[IO_MAX_SEGMENT_NUM];
  2756.     PVFS_size sizes[IO_MAX_SEGMENT_NUM];
  2757.     PINT_Request_state * filereq_state;
  2758.     PINT_Request_state * memreq_state;
  2759.     PINT_request_file_data rfdata;
  2760.     PINT_Request_result result;
  2761.     PVFS_size total_size = 0;
  2762.     PVFS_object_attr * attr;
  2763.     int index = 0;
  2764.  
  2765.     /* if the final offset is zero, then the file size is zero */
  2766.     if(final_offset == 0)
  2767.     {
  2768.         *total_return_size = 0;
  2769.         return 0;
  2770.     }
  2771.  
  2772.     attr = &sm_p->getattr.attr;
  2773.  
  2774.     filereq_state = PINT_new_request_state(sm_p->u.io.file_req);
  2775.     memreq_state = PINT_new_request_state(sm_p->u.io.mem_req);
  2776.  
  2777.     rfdata.server_nr = 0;
  2778.     rfdata.server_ct = 1;
  2779.     rfdata.fsize = final_offset;
  2780.     rfdata.dist = attr->u.meta.dist;
  2781.     rfdata.extend_flag = 0;
  2782.  
  2783.     result.offset_array = offsets;
  2784.     result.size_array = sizes;
  2785.     result.segmax = IO_MAX_SEGMENT_NUM;
  2786.     result.bytemax = final_offset; 
  2787.  
  2788.     PINT_REQUEST_STATE_SET_FINAL(
  2789.         filereq_state, final_offset);
  2790.  
  2791.     do
  2792.     {
  2793.         result.segs = 0;
  2794.         result.bytes = 0;
  2795.  
  2796.         res = PINT_process_request(filereq_state, memreq_state,
  2797.                                    &rfdata, &result, PINT_SERVER);
  2798.         if(res < 0)
  2799.         {
  2800.             goto exit;
  2801.         }
  2802.  
  2803.         for(index = 0; index < result.segs; ++index)
  2804.         {
  2805.             current_offset = sm_p->u.io.file_req_offset + offsets[index];
  2806.             if((final_offset >= current_offset) &&
  2807.                (final_offset <= (current_offset + sizes[index])))
  2808.             {
  2809.                 total_size += (final_offset - current_offset);
  2810.                 break;
  2811.             }
  2812.             else if(final_offset < current_offset)
  2813.             {
  2814.                 break;
  2815.             }
  2816.             else
  2817.             {
  2818.                 total_size += sizes[index];
  2819.             }
  2820.         }
  2821.  
  2822.     } while(!PINT_REQUEST_DONE(filereq_state) && result.segs);
  2823.  
  2824.     *total_return_size = total_size;
  2825.  
  2826. exit:
  2827.  
  2828.     PINT_free_request_state(filereq_state);
  2829.     PINT_free_request_state(memreq_state);
  2830.     return 0;
  2831. }
  2832.  
  2833. /* computes the logical offset in the file request from the size
  2834.  * of contiguous buffer.  This function acts only on the file request
  2835.  * since the actual size of the file doesn't matter.
  2836.  */
  2837. static int
  2838. io_find_offset(
  2839.     PINT_client_sm * sm_p,
  2840.     PVFS_size contig_size,
  2841.     PVFS_offset * total_return_offset)
  2842. {
  2843.     PINT_Request_state * filereq_state;
  2844.     PINT_Request_state * memreq_state;
  2845.     PINT_request_file_data rfdata;
  2846.     PINT_Request_result result;
  2847.     int res;
  2848.     PVFS_offset offsets[IO_MAX_SEGMENT_NUM];
  2849.     PVFS_size sizes[IO_MAX_SEGMENT_NUM];
  2850.     PVFS_offset total_offset = 0;
  2851.     PVFS_size total_size = 0;
  2852.     PVFS_object_attr * attr;
  2853.     int index = 0;
  2854.  
  2855.     attr = &sm_p->getattr.attr;
  2856.  
  2857.     filereq_state = PINT_new_request_state(sm_p->u.io.file_req);
  2858.     memreq_state = PINT_new_request_state(sm_p->u.io.mem_req);
  2859.  
  2860.     rfdata.server_nr = 0;
  2861.     rfdata.server_ct = 1;
  2862.     rfdata.fsize = 0;
  2863.     rfdata.dist = attr->u.meta.dist;
  2864.     rfdata.extend_flag = 1;
  2865.  
  2866.     result.offset_array = offsets;
  2867.     result.size_array = sizes;
  2868.     result.segmax = IO_MAX_SEGMENT_NUM;
  2869.     result.bytemax = contig_size; 
  2870.  
  2871.     PINT_REQUEST_STATE_SET_FINAL(
  2872.         filereq_state, contig_size);
  2873.     do
  2874.     {
  2875.         result.segs = 0;
  2876.         result.bytes = 0;
  2877.  
  2878.         res = PINT_process_request(filereq_state, memreq_state,
  2879.                                    &rfdata, &result, PINT_SERVER);
  2880.         if(res < 0)
  2881.         {
  2882.             PINT_free_request_state(filereq_state);
  2883.             PINT_free_request_state(memreq_state);
  2884.             return res;
  2885.         }
  2886.  
  2887.         for(index = 0; index < result.segs; ++index)
  2888.         {
  2889.             if(contig_size <= (total_size + sizes[index]))
  2890.             {
  2891.                 total_offset = offsets[index] + (contig_size - total_size);
  2892.                 break;
  2893.             }
  2894.             else
  2895.             {
  2896.                 total_offset = offsets[index] + sizes[index];
  2897.                 total_size += sizes[index];
  2898.             }
  2899.         }
  2900.  
  2901.     } while(!PINT_REQUEST_DONE(filereq_state) && result.segs);
  2902.  
  2903.     *total_return_offset = total_offset;
  2904.  
  2905.     PINT_free_request_state(filereq_state);
  2906.     PINT_free_request_state(memreq_state);
  2907.     return 0;
  2908. }
  2909.  
  2910. static int io_zero_fill_holes(
  2911.     PINT_client_sm *sm_p,
  2912.     PVFS_size eof,
  2913.     int datafile_count,
  2914.     PVFS_size * datafile_size_array,
  2915.     int * datafile_index_array)
  2916. {
  2917.     PVFS_object_attr * attr;
  2918.     PINT_request_file_data fdata;
  2919.     PINT_Request_result result;
  2920.     PINT_Request_state * file_req_state = NULL;
  2921.     PINT_Request_state * mem_req_state = NULL;
  2922.     int i = 0;
  2923.     int j = 0;
  2924.     int ret;
  2925.     PVFS_offset offsets[IO_MAX_SEGMENT_NUM];
  2926.     PVFS_size sizes[IO_MAX_SEGMENT_NUM];
  2927.    
  2928.     attr = &sm_p->getattr.attr;
  2929.     fdata.server_ct = attr->u.meta.dfile_count;
  2930.     fdata.dist = attr->u.meta.dist;
  2931.     fdata.extend_flag = 0;
  2932.  
  2933.     result.bytemax = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req); 
  2934.     result.segmax = IO_MAX_SEGMENT_NUM;
  2935.     result.offset_array = offsets;
  2936.     result.size_array = sizes;
  2937.  
  2938.     file_req_state = PINT_new_request_state(sm_p->u.io.file_req);
  2939.     mem_req_state = PINT_new_request_state(sm_p->u.io.mem_req);
  2940.  
  2941.     for(; i < datafile_count; ++i)
  2942.     {
  2943.         PVFS_size real_bstream_size, actual_bstream_size;
  2944.  
  2945.         fdata.server_nr = (datafile_index_array ? datafile_index_array[i] : i);
  2946.         actual_bstream_size = datafile_size_array[i];
  2947.  
  2948.         fdata.fsize = actual_bstream_size;
  2949.  
  2950.         result.bytemax = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req);
  2951.         result.segmax = IO_MAX_SEGMENT_NUM;
  2952.         fdata.extend_flag = 0;
  2953.         fdata.dist = attr->u.meta.dist;
  2954.  
  2955.         real_bstream_size = 
  2956.             (PVFS_size) fdata.dist->methods->logical_to_physical_offset(
  2957.                 fdata.dist->params, &fdata, eof);
  2958.  
  2959.        if(actual_bstream_size < real_bstream_size)
  2960.        {
  2961.            /* We've got holes! */
  2962.  
  2963.            PVFS_offset real_logical_offset = 
  2964.                fdata.dist->methods->physical_to_logical_offset(
  2965.                    fdata.dist->params, &fdata, real_bstream_size);
  2966.  
  2967.            PVFS_offset actual_logical_offset = 
  2968.                fdata.dist->methods->physical_to_logical_offset(
  2969.                 fdata.dist->params, &fdata, actual_bstream_size);
  2970.  
  2971.            PINT_REQUEST_STATE_SET_TARGET(
  2972.                file_req_state,
  2973.                sm_p->u.io.file_req_offset);
  2974.  
  2975.            PINT_REQUEST_STATE_SET_FINAL(
  2976.                file_req_state,
  2977.                actual_logical_offset);
  2978.  
  2979.            do
  2980.            {
  2981.                result.bytes = 0;
  2982.                result.segs = 0;
  2983.  
  2984.                ret = PINT_process_request(file_req_state, mem_req_state,
  2985.                                           &fdata, &result, PINT_CLIENT);
  2986.                if(ret < 0)
  2987.                {
  2988.                    PVFS_perror_gossip("PINT_process_request failed", ret);
  2989.                    return ret;
  2990.                }
  2991.            } while(!PINT_REQUEST_DONE(file_req_state) && result.segs);
  2992.                
  2993.            PINT_REQUEST_STATE_SET_FINAL(
  2994.                file_req_state,
  2995.                real_logical_offset);
  2996.  
  2997.            fdata.extend_flag = 1;
  2998.  
  2999.            result.bytes = 0;
  3000.            result.segs = 0;
  3001.  
  3002.            do
  3003.            {
  3004.                result.bytes = 0;
  3005.                result.segs = 0;
  3006.                ret = PINT_process_request(file_req_state, mem_req_state,
  3007.                                           &fdata, &result, PINT_CLIENT);
  3008.                if(ret < 0)
  3009.                {
  3010.                    PVFS_perror_gossip("PINT_process_request failed", ret);
  3011.                    return ret;
  3012.                }
  3013.  
  3014.                if(result.segs)
  3015.                {
  3016.                    for(j = 0; j < result.segs; ++j)
  3017.                    {
  3018.                        memset((void *)(((size_t)sm_p->u.io.buffer) + 
  3019.                                        ((size_t)offsets[j])),
  3020.                               0, sizes[j]);
  3021.                    }
  3022.                }
  3023.            } while(!PINT_REQUEST_DONE(file_req_state) && result.segs);
  3024.  
  3025.            PINT_REQUEST_STATE_RESET(file_req_state);
  3026.            PINT_REQUEST_STATE_RESET(mem_req_state);
  3027.        }
  3028.     }
  3029.  
  3030.     PINT_free_request_state(file_req_state);
  3031.     PINT_free_request_state(mem_req_state);
  3032.  
  3033.     return 0;
  3034. }
  3035.  
  3036. static int io_datafile_index_array_init(
  3037.     PINT_client_sm *sm_p,
  3038.     int datafile_count)
  3039. {
  3040.     sm_p->u.io.datafile_index_array = 
  3041.         (int *)malloc(datafile_count * sizeof(int));
  3042.     if(!sm_p->u.io.datafile_index_array)
  3043.     {
  3044.         return -PVFS_ENOMEM;
  3045.     }
  3046.  
  3047.     memset(sm_p->u.io.datafile_index_array, 0,
  3048.            (datafile_count * sizeof(int)));
  3049.     sm_p->u.io.datafile_count = datafile_count;
  3050.     return 0;
  3051. }
  3052.  
  3053. static void io_datafile_index_array_destroy(
  3054.     PINT_client_sm *sm_p)
  3055. {
  3056.     free(sm_p->u.io.datafile_index_array);
  3057.     sm_p->u.io.datafile_index_array = NULL;
  3058.     sm_p->u.io.datafile_count = 0;
  3059. }
  3060.  
  3061. static int io_contexts_init(
  3062.     PINT_client_sm *sm_p,
  3063.     int context_count,
  3064.     PVFS_object_attr *attr)
  3065. {
  3066.     int ret;
  3067.     int i = 0;
  3068.  
  3069.     sm_p->u.io.contexts = (PINT_client_io_ctx *)malloc(
  3070.         context_count * sizeof(PINT_client_io_ctx));
  3071.     if(!sm_p->u.io.contexts)
  3072.     {
  3073.         return -PVFS_ENOMEM;
  3074.     }
  3075.  
  3076.     memset(sm_p->u.io.contexts, 0,
  3077.            (context_count * sizeof(PINT_client_io_ctx)));
  3078.     sm_p->u.io.context_count = context_count;
  3079.  
  3080.     for(i = 0; i < context_count; ++i)
  3081.     {
  3082.         PINT_client_io_ctx * cur_ctx = &sm_p->u.io.contexts[i];
  3083.         PINT_sm_msgpair_state *msg = &cur_ctx->msg;
  3084.  
  3085.         msg->fs_id = sm_p->object_ref.fs_id;
  3086.         msg->handle =
  3087.             attr->u.meta.dfile_array[
  3088.                 sm_p->u.io.datafile_index_array[i]];
  3089.         msg->retry_flag = PVFS_MSGPAIR_NO_RETRY;
  3090.         msg->comp_fn = NULL;
  3091.  
  3092.         ret = PINT_cached_config_map_to_server(
  3093.             &msg->svr_addr, msg->handle, msg->fs_id);
  3094.         if(ret)
  3095.         {
  3096.             gossip_err("Failed to map meta server address\n");
  3097.             free(sm_p->u.io.contexts);
  3098.             return ret;
  3099.         }
  3100.         
  3101.         gossip_debug(GOSSIP_IO_DEBUG, "initializing context[%d] %p\n",
  3102.                      i, cur_ctx);
  3103.  
  3104.         cur_ctx->index = i;
  3105.         cur_ctx->server_nr = sm_p->u.io.datafile_index_array[i];
  3106.         cur_ctx->data_handle =
  3107.             attr->u.meta.dfile_array[cur_ctx->server_nr];
  3108.  
  3109.         PINT_flow_reset(&cur_ctx->flow_desc);
  3110.     }
  3111.  
  3112.     return 0;
  3113. }
  3114.  
  3115. static void io_contexts_destroy(PINT_client_sm *sm_p)
  3116. {
  3117.     int i = 0;
  3118.     for(; i < sm_p->u.io.context_count; ++i)
  3119.     {
  3120.         PINT_flow_clear(&(sm_p->u.io.contexts[i].flow_desc));
  3121.     }
  3122.  
  3123.     free(sm_p->u.io.contexts);
  3124.     sm_p->u.io.contexts = NULL;
  3125.     sm_p->u.io.context_count = 0;
  3126. }
  3127.  
  3128. /* unstuff_needed()
  3129.  *
  3130.  * looks at the I/O pattern requested and compares against the distribution
  3131.  * to determine if a stuffed file would have to be "unstuffed" in order to
  3132.  * service the request
  3133.  *
  3134.  * returns IO_UNSTUFF if unstuff is needed
  3135.  * returns IO_GETATTR_SERVER if current stuffed status needs to be confirmed
  3136.  * returns 0 otherwise
  3137.  */
  3138. static int unstuff_needed(
  3139.     PVFS_Request mem_req,
  3140.     PVFS_offset file_req_offset,
  3141.     PINT_dist *dist_p,
  3142.     uint32_t mask,
  3143.     enum PVFS_io_type io_type)
  3144. {
  3145.     PVFS_offset max_offset = 0;
  3146.     PVFS_offset first_unstuffed_offset = 0;
  3147.     PINT_request_file_data fake_file_data;
  3148.  
  3149.     gossip_debug(GOSSIP_IO_DEBUG, "sys-io checking to see if file should be unstuffed.\n");
  3150.  
  3151.     /* check the flag first to see if file is already explicitly marked as
  3152.      * unstuffed
  3153.      */
  3154.     if(mask & PVFS_ATTR_META_UNSTUFFED)
  3155.     {
  3156.         gossip_debug(GOSSIP_IO_DEBUG, "sys-io detected file is already unstuffed.\n");
  3157.         return(0);
  3158.     }
  3159.  
  3160.     /* calculate maximum logical file offset from the callers's parameters */
  3161.     /* file request is tiled, so we only need to know the beginning file
  3162.      * offset and size of the memory offset */
  3163.     max_offset = file_req_offset + PINT_REQUEST_TOTAL_BYTES(mem_req);
  3164.  
  3165.     gossip_debug(GOSSIP_IO_DEBUG, "sys-io calculated max offset of I/O request as %lld.\n", lld(max_offset));
  3166.  
  3167.  
  3168.     /* we need to query the distribution to determine what the first offset
  3169.      * is that does not belong to the first server/datafile.  We construct a
  3170.      * fake server data struct for 2 servers and find out what the first
  3171.      * offset (above zero) is that hits the second server */
  3172.     fake_file_data.dist = dist_p;
  3173.     fake_file_data.server_ct = 2;
  3174.     fake_file_data.extend_flag = 1;
  3175.     fake_file_data.fsize = 0;  
  3176.     fake_file_data.server_nr = 1;
  3177.  
  3178.     /* call next mapped offset to find the next logical offset that appears
  3179.      * on the 2nd server 
  3180.      */
  3181.     first_unstuffed_offset = dist_p->methods->next_mapped_offset(
  3182.         dist_p->params,
  3183.         &fake_file_data,
  3184.         0);
  3185.     
  3186.     gossip_debug(GOSSIP_IO_DEBUG, "sys-io calculated first unstuffed offset as %lld.\n", lld(first_unstuffed_offset));
  3187.  
  3188.     /* compare to see if the file needs to be unstuffed yet */
  3189.     if(max_offset > first_unstuffed_offset)
  3190.     {
  3191.         if(io_type == PVFS_IO_READ)
  3192.         {
  3193.             /* reads should not unstuff, but we do need to confirm that the
  3194.              * attributes are up to date before proceeding
  3195.              */
  3196.             gossip_debug(GOSSIP_IO_DEBUG, "sys-io will perform an extra getattr to confirm file is still stuffed.\n");
  3197.             return(IO_GETATTR_SERVER);
  3198.         }
  3199.         else
  3200.         {
  3201.             gossip_debug(GOSSIP_IO_DEBUG, "sys-io will unstuff the file.\n");
  3202.             return(IO_UNSTUFF);
  3203.         }
  3204.     }
  3205.  
  3206.     gossip_debug(GOSSIP_IO_DEBUG, "sys-io will not unstuff the file.\n");
  3207.     return(0);
  3208. }
  3209.  
  3210. /* unstuff_comp_fn()
  3211.  *
  3212.  * completion function for unstuff msgpair array
  3213.  */
  3214. static int unstuff_comp_fn(
  3215.     void *v_p,
  3216.     struct PVFS_server_resp *resp_p,
  3217.     int i)
  3218. {
  3219.     PINT_smcb *smcb = v_p;
  3220.     PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  3221.  
  3222.     gossip_debug(GOSSIP_IO_DEBUG,
  3223.         "unstuff/getattr completion fn: unstuff_comp_fn\n");
  3224.  
  3225.     /* only posted one msgpair */
  3226.     assert(i==0);
  3227.  
  3228.     if (resp_p->status != 0)
  3229.     {
  3230.         gossip_debug(GOSSIP_IO_DEBUG,
  3231.             "unstuff negative response with error code: %d\n", 
  3232.             resp_p->status);
  3233.         return resp_p->status;
  3234.     }
  3235.  
  3236.     assert(resp_p->op == PVFS_SERV_UNSTUFF || resp_p->op ==
  3237.         PVFS_SERV_GETATTR);
  3238.     
  3239.     if(resp_p->op == PVFS_SERV_UNSTUFF)
  3240.     {
  3241.         PINT_acache_update(sm_p->object_ref,
  3242.             &resp_p->u.unstuff.attr,
  3243.             NULL);
  3244.  
  3245.         /* replace attrs found by getattr */
  3246.         /* PINT_copy_object_attr() takes care of releasing old memory */
  3247.         PINT_copy_object_attr(&sm_p->getattr.attr, &resp_p->u.unstuff.attr);
  3248.     }
  3249.     else 
  3250.     {
  3251.         gossip_debug(GOSSIP_CLIENT_DEBUG, "Updating attributes before reading beyond stuffing boundary.\n");
  3252.         PINT_acache_update(sm_p->object_ref,
  3253.             &resp_p->u.getattr.attr,
  3254.             NULL);
  3255.  
  3256.         /* replace attrs found by getattr */
  3257.         /* PINT_copy_object_attr() takes care of releasing old memory */
  3258.         PINT_copy_object_attr(&sm_p->getattr.attr, &resp_p->u.getattr.attr);
  3259.     }
  3260.  
  3261.     return(0);
  3262. }
  3263.  
  3264. /*
  3265.  * Local variables:
  3266.  *  mode: c
  3267.  *  c-indent-level: 4
  3268.  *  c-basic-offset: 4
  3269.  * End:
  3270.  *
  3271.  * vim: ft=c ts=8 sts=4 sw=4 expandtab
  3272.  */
  3273.